1use crate::state::TodoItem;
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9#[serde(tag = "event_type", rename_all = "snake_case")]
10pub enum AgentEvent {
11 AgentStarted(AgentStartedEvent),
12 AgentCompleted(AgentCompletedEvent),
13 ToolStarted(ToolStartedEvent),
14 ToolCompleted(ToolCompletedEvent),
15 ToolFailed(ToolFailedEvent),
16 SubAgentStarted(SubAgentStartedEvent),
17 SubAgentCompleted(SubAgentCompletedEvent),
18 TodosUpdated(TodosUpdatedEvent),
19 StateCheckpointed(StateCheckpointedEvent),
20 PlanningComplete(PlanningCompleteEvent),
21}
22
23impl AgentEvent {
24 pub fn event_type_name(&self) -> &'static str {
25 match self {
26 AgentEvent::AgentStarted(_) => "agent_started",
27 AgentEvent::AgentCompleted(_) => "agent_completed",
28 AgentEvent::ToolStarted(_) => "tool_started",
29 AgentEvent::ToolCompleted(_) => "tool_completed",
30 AgentEvent::ToolFailed(_) => "tool_failed",
31 AgentEvent::SubAgentStarted(_) => "sub_agent_started",
32 AgentEvent::SubAgentCompleted(_) => "sub_agent_completed",
33 AgentEvent::TodosUpdated(_) => "todos_updated",
34 AgentEvent::StateCheckpointed(_) => "state_checkpointed",
35 AgentEvent::PlanningComplete(_) => "planning_complete",
36 }
37 }
38
39 pub fn metadata(&self) -> &EventMetadata {
40 match self {
41 AgentEvent::AgentStarted(e) => &e.metadata,
42 AgentEvent::AgentCompleted(e) => &e.metadata,
43 AgentEvent::ToolStarted(e) => &e.metadata,
44 AgentEvent::ToolCompleted(e) => &e.metadata,
45 AgentEvent::ToolFailed(e) => &e.metadata,
46 AgentEvent::SubAgentStarted(e) => &e.metadata,
47 AgentEvent::SubAgentCompleted(e) => &e.metadata,
48 AgentEvent::TodosUpdated(e) => &e.metadata,
49 AgentEvent::StateCheckpointed(e) => &e.metadata,
50 AgentEvent::PlanningComplete(e) => &e.metadata,
51 }
52 }
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct EventMetadata {
57 pub thread_id: String,
58 pub correlation_id: String,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub customer_id: Option<String>,
61 pub timestamp: String,
62}
63
64impl EventMetadata {
65 pub fn new(thread_id: String, correlation_id: String, customer_id: Option<String>) -> Self {
66 Self {
67 thread_id,
68 correlation_id,
69 customer_id,
70 timestamp: chrono::Utc::now().to_rfc3339(),
71 }
72 }
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct AgentStartedEvent {
77 pub metadata: EventMetadata,
78 pub agent_name: String,
79 pub message_preview: String,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct AgentCompletedEvent {
84 pub metadata: EventMetadata,
85 pub agent_name: String,
86 pub duration_ms: u64,
87 pub response_preview: String,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct ToolStartedEvent {
92 pub metadata: EventMetadata,
93 pub tool_name: String,
94 pub input_summary: String,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct ToolCompletedEvent {
99 pub metadata: EventMetadata,
100 pub tool_name: String,
101 pub duration_ms: u64,
102 pub result_summary: String,
103 pub success: bool,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct ToolFailedEvent {
108 pub metadata: EventMetadata,
109 pub tool_name: String,
110 pub duration_ms: u64,
111 pub error_message: String,
112 pub is_recoverable: bool,
113 pub retry_count: u32,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct SubAgentStartedEvent {
118 pub metadata: EventMetadata,
119 pub agent_name: String,
120 pub instruction_summary: String,
121 pub delegation_depth: u32,
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct SubAgentCompletedEvent {
126 pub metadata: EventMetadata,
127 pub agent_name: String,
128 pub duration_ms: u64,
129 pub result_summary: String,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct TodosUpdatedEvent {
134 pub metadata: EventMetadata,
135 pub todos: Vec<TodoItem>,
136 pub pending_count: usize,
137 pub in_progress_count: usize,
138 pub completed_count: usize,
139 pub last_updated: String,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct StateCheckpointedEvent {
144 pub metadata: EventMetadata,
145 pub checkpoint_id: String,
146 pub state_size_bytes: usize,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct PlanningCompleteEvent {
151 pub metadata: EventMetadata,
152 pub action_type: String,
153 pub action_summary: String,
154}
155
156#[async_trait]
157pub trait EventBroadcaster: Send + Sync {
158 fn id(&self) -> &str;
159 async fn broadcast(&self, event: &AgentEvent) -> anyhow::Result<()>;
160 fn should_broadcast(&self, _event: &AgentEvent) -> bool {
161 true
162 }
163}
164
165pub struct EventDispatcher {
166 broadcasters: std::sync::RwLock<Vec<Arc<dyn EventBroadcaster>>>,
167}
168
169impl EventDispatcher {
170 pub fn new() -> Self {
171 Self {
172 broadcasters: std::sync::RwLock::new(Vec::new()),
173 }
174 }
175
176 pub fn add_broadcaster(&self, broadcaster: Arc<dyn EventBroadcaster>) {
178 if let Ok(mut broadcasters) = self.broadcasters.write() {
179 broadcasters.push(broadcaster);
180 } else {
181 tracing::error!("Failed to acquire write lock on broadcasters");
182 }
183 }
184
185 pub async fn dispatch(&self, event: AgentEvent) {
186 let broadcasters = {
187 if let Ok(guard) = self.broadcasters.read() {
188 guard.clone()
189 } else {
190 tracing::error!("Failed to acquire read lock on broadcasters");
191 return;
192 }
193 };
194
195 for broadcaster in broadcasters {
196 let event_clone = event.clone();
197 tokio::spawn(async move {
198 if broadcaster.should_broadcast(&event_clone) {
199 if let Err(e) = broadcaster.broadcast(&event_clone).await {
200 tracing::warn!(
201 broadcaster_id = broadcaster.id(),
202 error = %e,
203 "Failed to broadcast event"
204 );
205 }
206 }
207 });
208 }
209 }
210}
211
212impl Default for EventDispatcher {
213 fn default() -> Self {
214 Self::new()
215 }
216}