agents_core/
events.rs

1//! Event system for agent lifecycle tracking and progress broadcasting
2
3use crate::state::TodoItem;
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use std::sync::Arc;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9#[serde(tag = "event_type", rename_all = "snake_case")]
10pub enum AgentEvent {
11    AgentStarted(AgentStartedEvent),
12    AgentCompleted(AgentCompletedEvent),
13    ToolStarted(ToolStartedEvent),
14    ToolCompleted(ToolCompletedEvent),
15    ToolFailed(ToolFailedEvent),
16    SubAgentStarted(SubAgentStartedEvent),
17    SubAgentCompleted(SubAgentCompletedEvent),
18    TodosUpdated(TodosUpdatedEvent),
19    StateCheckpointed(StateCheckpointedEvent),
20    PlanningComplete(PlanningCompleteEvent),
21}
22
23impl AgentEvent {
24    pub fn event_type_name(&self) -> &'static str {
25        match self {
26            AgentEvent::AgentStarted(_) => "agent_started",
27            AgentEvent::AgentCompleted(_) => "agent_completed",
28            AgentEvent::ToolStarted(_) => "tool_started",
29            AgentEvent::ToolCompleted(_) => "tool_completed",
30            AgentEvent::ToolFailed(_) => "tool_failed",
31            AgentEvent::SubAgentStarted(_) => "sub_agent_started",
32            AgentEvent::SubAgentCompleted(_) => "sub_agent_completed",
33            AgentEvent::TodosUpdated(_) => "todos_updated",
34            AgentEvent::StateCheckpointed(_) => "state_checkpointed",
35            AgentEvent::PlanningComplete(_) => "planning_complete",
36        }
37    }
38
39    pub fn metadata(&self) -> &EventMetadata {
40        match self {
41            AgentEvent::AgentStarted(e) => &e.metadata,
42            AgentEvent::AgentCompleted(e) => &e.metadata,
43            AgentEvent::ToolStarted(e) => &e.metadata,
44            AgentEvent::ToolCompleted(e) => &e.metadata,
45            AgentEvent::ToolFailed(e) => &e.metadata,
46            AgentEvent::SubAgentStarted(e) => &e.metadata,
47            AgentEvent::SubAgentCompleted(e) => &e.metadata,
48            AgentEvent::TodosUpdated(e) => &e.metadata,
49            AgentEvent::StateCheckpointed(e) => &e.metadata,
50            AgentEvent::PlanningComplete(e) => &e.metadata,
51        }
52    }
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct EventMetadata {
57    pub thread_id: String,
58    pub correlation_id: String,
59    #[serde(skip_serializing_if = "Option::is_none")]
60    pub customer_id: Option<String>,
61    pub timestamp: String,
62}
63
64impl EventMetadata {
65    pub fn new(thread_id: String, correlation_id: String, customer_id: Option<String>) -> Self {
66        Self {
67            thread_id,
68            correlation_id,
69            customer_id,
70            timestamp: chrono::Utc::now().to_rfc3339(),
71        }
72    }
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct AgentStartedEvent {
77    pub metadata: EventMetadata,
78    pub agent_name: String,
79    pub message_preview: String,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct AgentCompletedEvent {
84    pub metadata: EventMetadata,
85    pub agent_name: String,
86    pub duration_ms: u64,
87    pub response_preview: String, // Truncated for logs (~100 chars)
88    pub response: String,         // Full response text
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct ToolStartedEvent {
93    pub metadata: EventMetadata,
94    pub tool_name: String,
95    pub input_summary: String,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct ToolCompletedEvent {
100    pub metadata: EventMetadata,
101    pub tool_name: String,
102    pub duration_ms: u64,
103    pub result_summary: String,
104    pub success: bool,
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct ToolFailedEvent {
109    pub metadata: EventMetadata,
110    pub tool_name: String,
111    pub duration_ms: u64,
112    pub error_message: String,
113    pub is_recoverable: bool,
114    pub retry_count: u32,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct SubAgentStartedEvent {
119    pub metadata: EventMetadata,
120    pub agent_name: String,
121    pub instruction_summary: String,
122    pub delegation_depth: u32,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct SubAgentCompletedEvent {
127    pub metadata: EventMetadata,
128    pub agent_name: String,
129    pub duration_ms: u64,
130    pub result_summary: String,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct TodosUpdatedEvent {
135    pub metadata: EventMetadata,
136    pub todos: Vec<TodoItem>,
137    pub pending_count: usize,
138    pub in_progress_count: usize,
139    pub completed_count: usize,
140    pub last_updated: String,
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct StateCheckpointedEvent {
145    pub metadata: EventMetadata,
146    pub checkpoint_id: String,
147    pub state_size_bytes: usize,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct PlanningCompleteEvent {
152    pub metadata: EventMetadata,
153    pub action_type: String,
154    pub action_summary: String,
155}
156
157#[async_trait]
158pub trait EventBroadcaster: Send + Sync {
159    fn id(&self) -> &str;
160    async fn broadcast(&self, event: &AgentEvent) -> anyhow::Result<()>;
161    fn should_broadcast(&self, _event: &AgentEvent) -> bool {
162        true
163    }
164}
165
166pub struct EventDispatcher {
167    broadcasters: std::sync::RwLock<Vec<Arc<dyn EventBroadcaster>>>,
168}
169
170impl EventDispatcher {
171    pub fn new() -> Self {
172        Self {
173            broadcasters: std::sync::RwLock::new(Vec::new()),
174        }
175    }
176
177    /// Add a broadcaster (supports dynamic addition with interior mutability)
178    pub fn add_broadcaster(&self, broadcaster: Arc<dyn EventBroadcaster>) {
179        if let Ok(mut broadcasters) = self.broadcasters.write() {
180            broadcasters.push(broadcaster);
181        } else {
182            tracing::error!("Failed to acquire write lock on broadcasters");
183        }
184    }
185
186    pub async fn dispatch(&self, event: AgentEvent) {
187        let broadcasters = {
188            if let Ok(guard) = self.broadcasters.read() {
189                guard.clone()
190            } else {
191                tracing::error!("Failed to acquire read lock on broadcasters");
192                return;
193            }
194        };
195
196        for broadcaster in broadcasters {
197            let event_clone = event.clone();
198            tokio::spawn(async move {
199                if broadcaster.should_broadcast(&event_clone) {
200                    if let Err(e) = broadcaster.broadcast(&event_clone).await {
201                        tracing::warn!(
202                            broadcaster_id = broadcaster.id(),
203                            error = %e,
204                            "Failed to broadcast event"
205                        );
206                    }
207                }
208            });
209        }
210    }
211}
212
213impl Default for EventDispatcher {
214    fn default() -> Self {
215        Self::new()
216    }
217}