1use crate::state::TodoItem;
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9#[serde(tag = "event_type", rename_all = "snake_case")]
10pub enum AgentEvent {
11 AgentStarted(AgentStartedEvent),
12 AgentCompleted(AgentCompletedEvent),
13 ToolStarted(ToolStartedEvent),
14 ToolCompleted(ToolCompletedEvent),
15 ToolFailed(ToolFailedEvent),
16 SubAgentStarted(SubAgentStartedEvent),
17 SubAgentCompleted(SubAgentCompletedEvent),
18 TodosUpdated(TodosUpdatedEvent),
19 StateCheckpointed(StateCheckpointedEvent),
20 PlanningComplete(PlanningCompleteEvent),
21 TokenUsage(TokenUsageEvent),
22}
23
24impl AgentEvent {
25 pub fn event_type_name(&self) -> &'static str {
26 match self {
27 AgentEvent::AgentStarted(_) => "agent_started",
28 AgentEvent::AgentCompleted(_) => "agent_completed",
29 AgentEvent::ToolStarted(_) => "tool_started",
30 AgentEvent::ToolCompleted(_) => "tool_completed",
31 AgentEvent::ToolFailed(_) => "tool_failed",
32 AgentEvent::SubAgentStarted(_) => "sub_agent_started",
33 AgentEvent::SubAgentCompleted(_) => "sub_agent_completed",
34 AgentEvent::TodosUpdated(_) => "todos_updated",
35 AgentEvent::StateCheckpointed(_) => "state_checkpointed",
36 AgentEvent::PlanningComplete(_) => "planning_complete",
37 AgentEvent::TokenUsage(_) => "token_usage",
38 }
39 }
40
41 pub fn metadata(&self) -> &EventMetadata {
42 match self {
43 AgentEvent::AgentStarted(e) => &e.metadata,
44 AgentEvent::AgentCompleted(e) => &e.metadata,
45 AgentEvent::ToolStarted(e) => &e.metadata,
46 AgentEvent::ToolCompleted(e) => &e.metadata,
47 AgentEvent::ToolFailed(e) => &e.metadata,
48 AgentEvent::SubAgentStarted(e) => &e.metadata,
49 AgentEvent::SubAgentCompleted(e) => &e.metadata,
50 AgentEvent::TodosUpdated(e) => &e.metadata,
51 AgentEvent::StateCheckpointed(e) => &e.metadata,
52 AgentEvent::PlanningComplete(e) => &e.metadata,
53 AgentEvent::TokenUsage(e) => &e.metadata,
54 }
55 }
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct EventMetadata {
60 pub thread_id: String,
61 pub correlation_id: String,
62 #[serde(skip_serializing_if = "Option::is_none")]
63 pub customer_id: Option<String>,
64 pub timestamp: String,
65}
66
67impl EventMetadata {
68 pub fn new(thread_id: String, correlation_id: String, customer_id: Option<String>) -> Self {
69 Self {
70 thread_id,
71 correlation_id,
72 customer_id,
73 timestamp: chrono::Utc::now().to_rfc3339(),
74 }
75 }
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct AgentStartedEvent {
80 pub metadata: EventMetadata,
81 pub agent_name: String,
82 pub message_preview: String,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct AgentCompletedEvent {
87 pub metadata: EventMetadata,
88 pub agent_name: String,
89 pub duration_ms: u64,
90 pub response_preview: String, pub response: String, }
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct ToolStartedEvent {
96 pub metadata: EventMetadata,
97 pub tool_name: String,
98 pub input_summary: String,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct ToolCompletedEvent {
103 pub metadata: EventMetadata,
104 pub tool_name: String,
105 pub duration_ms: u64,
106 pub result_summary: String,
107 pub success: bool,
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct ToolFailedEvent {
112 pub metadata: EventMetadata,
113 pub tool_name: String,
114 pub duration_ms: u64,
115 pub error_message: String,
116 pub is_recoverable: bool,
117 pub retry_count: u32,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct SubAgentStartedEvent {
122 pub metadata: EventMetadata,
123 pub agent_name: String,
124 pub instruction_summary: String,
125 pub delegation_depth: u32,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct SubAgentCompletedEvent {
130 pub metadata: EventMetadata,
131 pub agent_name: String,
132 pub duration_ms: u64,
133 pub result_summary: String,
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct TodosUpdatedEvent {
138 pub metadata: EventMetadata,
139 pub todos: Vec<TodoItem>,
140 pub pending_count: usize,
141 pub in_progress_count: usize,
142 pub completed_count: usize,
143 pub last_updated: String,
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct StateCheckpointedEvent {
148 pub metadata: EventMetadata,
149 pub checkpoint_id: String,
150 pub state_size_bytes: usize,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
154pub struct PlanningCompleteEvent {
155 pub metadata: EventMetadata,
156 pub action_type: String,
157 pub action_summary: String,
158}
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct TokenUsageEvent {
162 pub metadata: EventMetadata,
163 pub usage: TokenUsage,
164}
165
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct TokenUsage {
168 pub input_tokens: u32,
170 pub output_tokens: u32,
172 pub total_tokens: u32,
174 pub estimated_cost: f64,
176 pub provider: String,
178 pub model: String,
180 pub duration_ms: u64,
182 pub timestamp: String,
184}
185
186impl TokenUsage {
187 pub fn new(
188 input_tokens: u32,
189 output_tokens: u32,
190 provider: impl Into<String>,
191 model: impl Into<String>,
192 duration_ms: u64,
193 estimated_cost: f64,
194 ) -> Self {
195 let provider = provider.into();
196 let model = model.into();
197 let total_tokens = input_tokens + output_tokens;
198
199 Self {
200 input_tokens,
201 output_tokens,
202 total_tokens,
203 estimated_cost,
204 provider,
205 model,
206 duration_ms,
207 timestamp: chrono::Utc::now().to_rfc3339(),
208 }
209 }
210}
211
212#[async_trait]
213pub trait EventBroadcaster: Send + Sync {
214 fn id(&self) -> &str;
215 async fn broadcast(&self, event: &AgentEvent) -> anyhow::Result<()>;
216 fn should_broadcast(&self, _event: &AgentEvent) -> bool {
217 true
218 }
219}
220
221pub struct EventDispatcher {
222 broadcasters: std::sync::RwLock<Vec<Arc<dyn EventBroadcaster>>>,
223}
224
225impl EventDispatcher {
226 pub fn new() -> Self {
227 Self {
228 broadcasters: std::sync::RwLock::new(Vec::new()),
229 }
230 }
231
232 pub fn add_broadcaster(&self, broadcaster: Arc<dyn EventBroadcaster>) {
234 if let Ok(mut broadcasters) = self.broadcasters.write() {
235 broadcasters.push(broadcaster);
236 } else {
237 tracing::error!("Failed to acquire write lock on broadcasters");
238 }
239 }
240
241 pub async fn dispatch(&self, event: AgentEvent) {
242 let broadcasters = {
243 if let Ok(guard) = self.broadcasters.read() {
244 guard.clone()
245 } else {
246 tracing::error!("Failed to acquire read lock on broadcasters");
247 return;
248 }
249 };
250
251 for broadcaster in broadcasters {
252 let event_clone = event.clone();
253 tokio::spawn(async move {
254 if broadcaster.should_broadcast(&event_clone) {
255 if let Err(e) = broadcaster.broadcast(&event_clone).await {
256 tracing::warn!(
257 broadcaster_id = broadcaster.id(),
258 error = %e,
259 "Failed to broadcast event"
260 );
261 }
262 }
263 });
264 }
265 }
266}
267
268impl Default for EventDispatcher {
269 fn default() -> Self {
270 Self::new()
271 }
272}