agents_core/
events.rs

1//! Event system for agent lifecycle tracking and progress broadcasting
2
3use crate::state::TodoItem;
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9#[serde(tag = "event_type", rename_all = "snake_case")]
10pub enum AgentEvent {
11    AgentStarted(AgentStartedEvent),
12    AgentCompleted(AgentCompletedEvent),
13    ToolStarted(ToolStartedEvent),
14    ToolCompleted(ToolCompletedEvent),
15    ToolFailed(ToolFailedEvent),
16    SubAgentStarted(SubAgentStartedEvent),
17    SubAgentCompleted(SubAgentCompletedEvent),
18    TodosUpdated(TodosUpdatedEvent),
19    StateCheckpointed(StateCheckpointedEvent),
20    PlanningComplete(PlanningCompleteEvent),
21}
22
23impl AgentEvent {
24    pub fn event_type_name(&self) -> &'static str {
25        match self {
26            AgentEvent::AgentStarted(_) => "agent_started",
27            AgentEvent::AgentCompleted(_) => "agent_completed",
28            AgentEvent::ToolStarted(_) => "tool_started",
29            AgentEvent::ToolCompleted(_) => "tool_completed",
30            AgentEvent::ToolFailed(_) => "tool_failed",
31            AgentEvent::SubAgentStarted(_) => "sub_agent_started",
32            AgentEvent::SubAgentCompleted(_) => "sub_agent_completed",
33            AgentEvent::TodosUpdated(_) => "todos_updated",
34            AgentEvent::StateCheckpointed(_) => "state_checkpointed",
35            AgentEvent::PlanningComplete(_) => "planning_complete",
36        }
37    }
38
39    pub fn metadata(&self) -> &EventMetadata {
40        match self {
41            AgentEvent::AgentStarted(e) => &e.metadata,
42            AgentEvent::AgentCompleted(e) => &e.metadata,
43            AgentEvent::ToolStarted(e) => &e.metadata,
44            AgentEvent::ToolCompleted(e) => &e.metadata,
45            AgentEvent::ToolFailed(e) => &e.metadata,
46            AgentEvent::SubAgentStarted(e) => &e.metadata,
47            AgentEvent::SubAgentCompleted(e) => &e.metadata,
48            AgentEvent::TodosUpdated(e) => &e.metadata,
49            AgentEvent::StateCheckpointed(e) => &e.metadata,
50            AgentEvent::PlanningComplete(e) => &e.metadata,
51        }
52    }
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct EventMetadata {
57    pub thread_id: String,
58    pub correlation_id: String,
59    #[serde(skip_serializing_if = "Option::is_none")]
60    pub customer_id: Option<String>,
61    pub timestamp: String,
62}
63
64impl EventMetadata {
65    pub fn new(thread_id: String, correlation_id: String, customer_id: Option<String>) -> Self {
66        Self {
67            thread_id,
68            correlation_id,
69            customer_id,
70            timestamp: chrono::Utc::now().to_rfc3339(),
71        }
72    }
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct AgentStartedEvent {
77    pub metadata: EventMetadata,
78    pub agent_name: String,
79    pub message_preview: String,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct AgentCompletedEvent {
84    pub metadata: EventMetadata,
85    pub agent_name: String,
86    pub duration_ms: u64,
87    pub response_preview: String,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct ToolStartedEvent {
92    pub metadata: EventMetadata,
93    pub tool_name: String,
94    pub input_summary: String,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct ToolCompletedEvent {
99    pub metadata: EventMetadata,
100    pub tool_name: String,
101    pub duration_ms: u64,
102    pub result_summary: String,
103    pub success: bool,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct ToolFailedEvent {
108    pub metadata: EventMetadata,
109    pub tool_name: String,
110    pub duration_ms: u64,
111    pub error_message: String,
112    pub is_recoverable: bool,
113    pub retry_count: u32,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct SubAgentStartedEvent {
118    pub metadata: EventMetadata,
119    pub agent_name: String,
120    pub instruction_summary: String,
121    pub delegation_depth: u32,
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct SubAgentCompletedEvent {
126    pub metadata: EventMetadata,
127    pub agent_name: String,
128    pub duration_ms: u64,
129    pub result_summary: String,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct TodosUpdatedEvent {
134    pub metadata: EventMetadata,
135    pub todos: Vec<TodoItem>,
136    pub pending_count: usize,
137    pub in_progress_count: usize,
138    pub completed_count: usize,
139    pub last_updated: String,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct StateCheckpointedEvent {
144    pub metadata: EventMetadata,
145    pub checkpoint_id: String,
146    pub state_size_bytes: usize,
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct PlanningCompleteEvent {
151    pub metadata: EventMetadata,
152    pub action_type: String,
153    pub action_summary: String,
154}
155
156#[async_trait]
157pub trait EventBroadcaster: Send + Sync {
158    fn id(&self) -> &str;
159    async fn broadcast(&self, event: &AgentEvent) -> anyhow::Result<()>;
160    fn should_broadcast(&self, _event: &AgentEvent) -> bool {
161        true
162    }
163}
164
165pub struct EventDispatcher {
166    broadcasters: std::sync::RwLock<Vec<Arc<dyn EventBroadcaster>>>,
167}
168
169impl EventDispatcher {
170    pub fn new() -> Self {
171        Self {
172            broadcasters: std::sync::RwLock::new(Vec::new()),
173        }
174    }
175
176    /// Add a broadcaster (supports dynamic addition with interior mutability)
177    pub fn add_broadcaster(&self, broadcaster: Arc<dyn EventBroadcaster>) {
178        if let Ok(mut broadcasters) = self.broadcasters.write() {
179            broadcasters.push(broadcaster);
180        } else {
181            tracing::error!("Failed to acquire write lock on broadcasters");
182        }
183    }
184
185    pub async fn dispatch(&self, event: AgentEvent) {
186        let broadcasters = {
187            if let Ok(guard) = self.broadcasters.read() {
188                guard.clone()
189            } else {
190                tracing::error!("Failed to acquire read lock on broadcasters");
191                return;
192            }
193        };
194
195        for broadcaster in broadcasters {
196            let event_clone = event.clone();
197            tokio::spawn(async move {
198                if broadcaster.should_broadcast(&event_clone) {
199                    if let Err(e) = broadcaster.broadcast(&event_clone).await {
200                        tracing::warn!(
201                            broadcaster_id = broadcaster.id(),
202                            error = %e,
203                            "Failed to broadcast event"
204                        );
205                    }
206                }
207            });
208        }
209    }
210}
211
212impl Default for EventDispatcher {
213    fn default() -> Self {
214        Self::new()
215    }
216}