agents_core/
events.rs

1//! Event system for agent lifecycle tracking and progress broadcasting
2
3use crate::state::TodoItem;
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9#[serde(tag = "event_type", rename_all = "snake_case")]
10pub enum AgentEvent {
11    AgentStarted(AgentStartedEvent),
12    AgentCompleted(AgentCompletedEvent),
13    ToolStarted(ToolStartedEvent),
14    ToolCompleted(ToolCompletedEvent),
15    ToolFailed(ToolFailedEvent),
16    SubAgentStarted(SubAgentStartedEvent),
17    SubAgentCompleted(SubAgentCompletedEvent),
18    TodosUpdated(TodosUpdatedEvent),
19    StateCheckpointed(StateCheckpointedEvent),
20    PlanningComplete(PlanningCompleteEvent),
21    TokenUsage(TokenUsageEvent),
22}
23
24impl AgentEvent {
25    pub fn event_type_name(&self) -> &'static str {
26        match self {
27            AgentEvent::AgentStarted(_) => "agent_started",
28            AgentEvent::AgentCompleted(_) => "agent_completed",
29            AgentEvent::ToolStarted(_) => "tool_started",
30            AgentEvent::ToolCompleted(_) => "tool_completed",
31            AgentEvent::ToolFailed(_) => "tool_failed",
32            AgentEvent::SubAgentStarted(_) => "sub_agent_started",
33            AgentEvent::SubAgentCompleted(_) => "sub_agent_completed",
34            AgentEvent::TodosUpdated(_) => "todos_updated",
35            AgentEvent::StateCheckpointed(_) => "state_checkpointed",
36            AgentEvent::PlanningComplete(_) => "planning_complete",
37            AgentEvent::TokenUsage(_) => "token_usage",
38        }
39    }
40
41    pub fn metadata(&self) -> &EventMetadata {
42        match self {
43            AgentEvent::AgentStarted(e) => &e.metadata,
44            AgentEvent::AgentCompleted(e) => &e.metadata,
45            AgentEvent::ToolStarted(e) => &e.metadata,
46            AgentEvent::ToolCompleted(e) => &e.metadata,
47            AgentEvent::ToolFailed(e) => &e.metadata,
48            AgentEvent::SubAgentStarted(e) => &e.metadata,
49            AgentEvent::SubAgentCompleted(e) => &e.metadata,
50            AgentEvent::TodosUpdated(e) => &e.metadata,
51            AgentEvent::StateCheckpointed(e) => &e.metadata,
52            AgentEvent::PlanningComplete(e) => &e.metadata,
53            AgentEvent::TokenUsage(e) => &e.metadata,
54        }
55    }
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct EventMetadata {
60    pub thread_id: String,
61    pub correlation_id: String,
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub customer_id: Option<String>,
64    pub timestamp: String,
65}
66
67impl EventMetadata {
68    pub fn new(thread_id: String, correlation_id: String, customer_id: Option<String>) -> Self {
69        Self {
70            thread_id,
71            correlation_id,
72            customer_id,
73            timestamp: chrono::Utc::now().to_rfc3339(),
74        }
75    }
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct AgentStartedEvent {
80    pub metadata: EventMetadata,
81    pub agent_name: String,
82    pub message_preview: String,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct AgentCompletedEvent {
87    pub metadata: EventMetadata,
88    pub agent_name: String,
89    pub duration_ms: u64,
90    pub response_preview: String, // Truncated for logs (~100 chars)
91    pub response: String,         // Full response text
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct ToolStartedEvent {
96    pub metadata: EventMetadata,
97    pub tool_name: String,
98    pub input_summary: String,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct ToolCompletedEvent {
103    pub metadata: EventMetadata,
104    pub tool_name: String,
105    pub duration_ms: u64,
106    pub result_summary: String,
107    pub success: bool,
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct ToolFailedEvent {
112    pub metadata: EventMetadata,
113    pub tool_name: String,
114    pub duration_ms: u64,
115    pub error_message: String,
116    pub is_recoverable: bool,
117    pub retry_count: u32,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct SubAgentStartedEvent {
122    pub metadata: EventMetadata,
123    pub agent_name: String,
124    pub instruction_summary: String,
125    pub delegation_depth: u32,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct SubAgentCompletedEvent {
130    pub metadata: EventMetadata,
131    pub agent_name: String,
132    pub duration_ms: u64,
133    pub result_summary: String,
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct TodosUpdatedEvent {
138    pub metadata: EventMetadata,
139    pub todos: Vec<TodoItem>,
140    pub pending_count: usize,
141    pub in_progress_count: usize,
142    pub completed_count: usize,
143    pub last_updated: String,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct StateCheckpointedEvent {
148    pub metadata: EventMetadata,
149    pub checkpoint_id: String,
150    pub state_size_bytes: usize,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct PlanningCompleteEvent {
155    pub metadata: EventMetadata,
156    pub action_type: String,
157    pub action_summary: String,
158}
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct TokenUsageEvent {
162    pub metadata: EventMetadata,
163    pub usage: TokenUsage,
164}
165
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct TokenUsage {
168    /// Number of input tokens
169    pub input_tokens: u32,
170    /// Number of output tokens
171    pub output_tokens: u32,
172    /// Total tokens used
173    pub total_tokens: u32,
174    /// Estimated cost in USD
175    pub estimated_cost: f64,
176    /// Provider name
177    pub provider: String,
178    /// Model name
179    pub model: String,
180    /// Request duration in milliseconds
181    pub duration_ms: u64,
182    /// Timestamp of the request
183    pub timestamp: String,
184}
185
186impl TokenUsage {
187    pub fn new(
188        input_tokens: u32,
189        output_tokens: u32,
190        provider: impl Into<String>,
191        model: impl Into<String>,
192        duration_ms: u64,
193        estimated_cost: f64,
194    ) -> Self {
195        let provider = provider.into();
196        let model = model.into();
197        let total_tokens = input_tokens + output_tokens;
198
199        Self {
200            input_tokens,
201            output_tokens,
202            total_tokens,
203            estimated_cost,
204            provider,
205            model,
206            duration_ms,
207            timestamp: chrono::Utc::now().to_rfc3339(),
208        }
209    }
210}
211
212#[async_trait]
213pub trait EventBroadcaster: Send + Sync {
214    fn id(&self) -> &str;
215    async fn broadcast(&self, event: &AgentEvent) -> anyhow::Result<()>;
216    fn should_broadcast(&self, _event: &AgentEvent) -> bool {
217        true
218    }
219}
220
221pub struct EventDispatcher {
222    broadcasters: std::sync::RwLock<Vec<Arc<dyn EventBroadcaster>>>,
223}
224
225impl EventDispatcher {
226    pub fn new() -> Self {
227        Self {
228            broadcasters: std::sync::RwLock::new(Vec::new()),
229        }
230    }
231
232    /// Add a broadcaster (supports dynamic addition with interior mutability)
233    pub fn add_broadcaster(&self, broadcaster: Arc<dyn EventBroadcaster>) {
234        if let Ok(mut broadcasters) = self.broadcasters.write() {
235            broadcasters.push(broadcaster);
236        } else {
237            tracing::error!("Failed to acquire write lock on broadcasters");
238        }
239    }
240
241    pub async fn dispatch(&self, event: AgentEvent) {
242        let broadcasters = {
243            if let Ok(guard) = self.broadcasters.read() {
244                guard.clone()
245            } else {
246                tracing::error!("Failed to acquire read lock on broadcasters");
247                return;
248            }
249        };
250
251        for broadcaster in broadcasters {
252            let event_clone = event.clone();
253            tokio::spawn(async move {
254                if broadcaster.should_broadcast(&event_clone) {
255                    if let Err(e) = broadcaster.broadcast(&event_clone).await {
256                        tracing::warn!(
257                            broadcaster_id = broadcaster.id(),
258                            error = %e,
259                            "Failed to broadcast event"
260                        );
261                    }
262                }
263            });
264        }
265    }
266}
267
268impl Default for EventDispatcher {
269    fn default() -> Self {
270        Self::new()
271    }
272}