1use crate::state::TodoItem;
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9#[serde(tag = "event_type", rename_all = "snake_case")]
10pub enum AgentEvent {
11 AgentStarted(AgentStartedEvent),
12 AgentCompleted(AgentCompletedEvent),
13 ToolStarted(ToolStartedEvent),
14 ToolCompleted(ToolCompletedEvent),
15 ToolFailed(ToolFailedEvent),
16 SubAgentStarted(SubAgentStartedEvent),
17 SubAgentCompleted(SubAgentCompletedEvent),
18 TodosUpdated(TodosUpdatedEvent),
19 StateCheckpointed(StateCheckpointedEvent),
20 PlanningComplete(PlanningCompleteEvent),
21}
22
23impl AgentEvent {
24 pub fn event_type_name(&self) -> &'static str {
25 match self {
26 AgentEvent::AgentStarted(_) => "agent_started",
27 AgentEvent::AgentCompleted(_) => "agent_completed",
28 AgentEvent::ToolStarted(_) => "tool_started",
29 AgentEvent::ToolCompleted(_) => "tool_completed",
30 AgentEvent::ToolFailed(_) => "tool_failed",
31 AgentEvent::SubAgentStarted(_) => "sub_agent_started",
32 AgentEvent::SubAgentCompleted(_) => "sub_agent_completed",
33 AgentEvent::TodosUpdated(_) => "todos_updated",
34 AgentEvent::StateCheckpointed(_) => "state_checkpointed",
35 AgentEvent::PlanningComplete(_) => "planning_complete",
36 }
37 }
38
39 pub fn metadata(&self) -> &EventMetadata {
40 match self {
41 AgentEvent::AgentStarted(e) => &e.metadata,
42 AgentEvent::AgentCompleted(e) => &e.metadata,
43 AgentEvent::ToolStarted(e) => &e.metadata,
44 AgentEvent::ToolCompleted(e) => &e.metadata,
45 AgentEvent::ToolFailed(e) => &e.metadata,
46 AgentEvent::SubAgentStarted(e) => &e.metadata,
47 AgentEvent::SubAgentCompleted(e) => &e.metadata,
48 AgentEvent::TodosUpdated(e) => &e.metadata,
49 AgentEvent::StateCheckpointed(e) => &e.metadata,
50 AgentEvent::PlanningComplete(e) => &e.metadata,
51 }
52 }
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct EventMetadata {
57 pub thread_id: String,
58 pub correlation_id: String,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub customer_id: Option<String>,
61 pub timestamp: String,
62}
63
64impl EventMetadata {
65 pub fn new(thread_id: String, correlation_id: String, customer_id: Option<String>) -> Self {
66 Self {
67 thread_id,
68 correlation_id,
69 customer_id,
70 timestamp: chrono::Utc::now().to_rfc3339(),
71 }
72 }
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct AgentStartedEvent {
77 pub metadata: EventMetadata,
78 pub agent_name: String,
79 pub message_preview: String,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct AgentCompletedEvent {
84 pub metadata: EventMetadata,
85 pub agent_name: String,
86 pub duration_ms: u64,
87 pub response_preview: String, pub response: String, }
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct ToolStartedEvent {
93 pub metadata: EventMetadata,
94 pub tool_name: String,
95 pub input_summary: String,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct ToolCompletedEvent {
100 pub metadata: EventMetadata,
101 pub tool_name: String,
102 pub duration_ms: u64,
103 pub result_summary: String,
104 pub success: bool,
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct ToolFailedEvent {
109 pub metadata: EventMetadata,
110 pub tool_name: String,
111 pub duration_ms: u64,
112 pub error_message: String,
113 pub is_recoverable: bool,
114 pub retry_count: u32,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct SubAgentStartedEvent {
119 pub metadata: EventMetadata,
120 pub agent_name: String,
121 pub instruction_summary: String,
122 pub delegation_depth: u32,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct SubAgentCompletedEvent {
127 pub metadata: EventMetadata,
128 pub agent_name: String,
129 pub duration_ms: u64,
130 pub result_summary: String,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct TodosUpdatedEvent {
135 pub metadata: EventMetadata,
136 pub todos: Vec<TodoItem>,
137 pub pending_count: usize,
138 pub in_progress_count: usize,
139 pub completed_count: usize,
140 pub last_updated: String,
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct StateCheckpointedEvent {
145 pub metadata: EventMetadata,
146 pub checkpoint_id: String,
147 pub state_size_bytes: usize,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct PlanningCompleteEvent {
152 pub metadata: EventMetadata,
153 pub action_type: String,
154 pub action_summary: String,
155}
156
157#[async_trait]
158pub trait EventBroadcaster: Send + Sync {
159 fn id(&self) -> &str;
160 async fn broadcast(&self, event: &AgentEvent) -> anyhow::Result<()>;
161 fn should_broadcast(&self, _event: &AgentEvent) -> bool {
162 true
163 }
164}
165
166pub struct EventDispatcher {
167 broadcasters: std::sync::RwLock<Vec<Arc<dyn EventBroadcaster>>>,
168}
169
170impl EventDispatcher {
171 pub fn new() -> Self {
172 Self {
173 broadcasters: std::sync::RwLock::new(Vec::new()),
174 }
175 }
176
177 pub fn add_broadcaster(&self, broadcaster: Arc<dyn EventBroadcaster>) {
179 if let Ok(mut broadcasters) = self.broadcasters.write() {
180 broadcasters.push(broadcaster);
181 } else {
182 tracing::error!("Failed to acquire write lock on broadcasters");
183 }
184 }
185
186 pub async fn dispatch(&self, event: AgentEvent) {
187 let broadcasters = {
188 if let Ok(guard) = self.broadcasters.read() {
189 guard.clone()
190 } else {
191 tracing::error!("Failed to acquire read lock on broadcasters");
192 return;
193 }
194 };
195
196 for broadcaster in broadcasters {
197 let event_clone = event.clone();
198 tokio::spawn(async move {
199 if broadcaster.should_broadcast(&event_clone) {
200 if let Err(e) = broadcaster.broadcast(&event_clone).await {
201 tracing::warn!(
202 broadcaster_id = broadcaster.id(),
203 error = %e,
204 "Failed to broadcast event"
205 );
206 }
207 }
208 });
209 }
210 }
211}
212
213impl Default for EventDispatcher {
214 fn default() -> Self {
215 Self::new()
216 }
217}