Skip to main content

agent_runtime/
event.rs

1use crate::types::{EventId, EventOffset, JsonValue, WorkflowId};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::sync::{Arc, RwLock};
5use tokio::sync::broadcast;
6
7#[cfg(test)]
8#[path = "event_test.rs"]
9mod event_test;
10
11/// Event types that can occur in the system
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "snake_case")]
14pub enum EventType {
15    // Workflow events
16    WorkflowStarted,
17    WorkflowStepStarted,
18    WorkflowStepCompleted,
19    WorkflowCompleted,
20    WorkflowFailed,
21
22    // Agent events
23    AgentInitialized,
24    AgentProcessing,
25    AgentCompleted,
26    AgentFailed,
27
28    // LLM events
29    AgentLlmRequestStarted,
30    AgentLlmStreamChunk,
31    AgentLlmRequestCompleted,
32    AgentLlmRequestFailed,
33
34    // Tool events
35    ToolCallStarted,
36    ToolCallCompleted,
37    ToolCallFailed,
38    AgentToolLoopDetected,
39
40    // System events
41    SystemError,
42    StateSaved,
43}
44
45/// An immutable event record
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct Event {
48    pub id: EventId,
49    pub offset: EventOffset,
50    pub timestamp: DateTime<Utc>,
51    #[serde(rename = "type")]
52    pub event_type: EventType,
53    pub workflow_id: WorkflowId,
54
55    /// Optional parent workflow ID for nested workflows
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub parent_workflow_id: Option<WorkflowId>,
58
59    pub data: JsonValue,
60}
61
62impl Event {
63    pub fn new(
64        offset: EventOffset,
65        event_type: EventType,
66        workflow_id: WorkflowId,
67        data: JsonValue,
68    ) -> Self {
69        Self {
70            id: format!("evt_{}", uuid::Uuid::new_v4()),
71            offset,
72            timestamp: Utc::now(),
73            event_type,
74            workflow_id,
75            parent_workflow_id: None,
76            data,
77        }
78    }
79
80    pub fn with_parent(
81        offset: EventOffset,
82        event_type: EventType,
83        workflow_id: WorkflowId,
84        parent_workflow_id: Option<WorkflowId>,
85        data: JsonValue,
86    ) -> Self {
87        Self {
88            id: format!("evt_{}", uuid::Uuid::new_v4()),
89            offset,
90            timestamp: Utc::now(),
91            event_type,
92            workflow_id,
93            parent_workflow_id,
94            data,
95        }
96    }
97}
98
99/// Event stream with broadcast capability for real-time subscribers
100pub struct EventStream {
101    /// Broadcast sender for real-time event streaming
102    sender: broadcast::Sender<Event>,
103
104    /// Historical events for replay (thread-safe)
105    history: Arc<RwLock<Vec<Event>>>,
106
107    /// Next offset to assign
108    next_offset: Arc<RwLock<EventOffset>>,
109}
110
111impl EventStream {
112    /// Create a new event stream with specified channel capacity
113    pub fn new() -> Self {
114        Self::with_capacity(1000)
115    }
116
117    /// Create event stream with custom channel capacity
118    pub fn with_capacity(capacity: usize) -> Self {
119        let (sender, _) = broadcast::channel(capacity);
120
121        Self {
122            sender,
123            history: Arc::new(RwLock::new(Vec::new())),
124            next_offset: Arc::new(RwLock::new(0)),
125        }
126    }
127
128    /// Append a new event and broadcast to all subscribers
129    ///
130    /// Events are emitted asynchronously in a spawned task to avoid blocking
131    /// agent execution. Returns a JoinHandle that can be awaited if the caller
132    /// needs to ensure the event was processed or needs the Event object.
133    ///
134    /// # Examples
135    /// ```no_run
136    /// use agent_runtime::event::{EventStream, EventType};
137    /// use serde_json::json;
138    ///
139    /// # async fn example() {
140    /// let stream = EventStream::new();
141    ///
142    /// // Fire and forget (most common)
143    /// stream.append(EventType::AgentInitialized, "workflow_1".to_string(), json!({}));
144    ///
145    /// // Wait for event if needed
146    /// let event = stream.append(EventType::AgentCompleted, "workflow_1".to_string(), json!({}))
147    ///     .await
148    ///     .unwrap();
149    /// # }
150    /// ```
151    pub fn append(
152        &self,
153        event_type: EventType,
154        workflow_id: WorkflowId,
155        data: JsonValue,
156    ) -> tokio::task::JoinHandle<Event> {
157        self.append_with_parent(event_type, workflow_id, None, data)
158    }
159
160    /// Append event with optional parent workflow ID
161    ///
162    /// Events are emitted asynchronously to avoid blocking execution.
163    /// Returns a JoinHandle that resolves to the created Event.
164    pub fn append_with_parent(
165        &self,
166        event_type: EventType,
167        workflow_id: WorkflowId,
168        parent_workflow_id: Option<WorkflowId>,
169        data: JsonValue,
170    ) -> tokio::task::JoinHandle<Event> {
171        let sender = self.sender.clone();
172        let history = self.history.clone();
173        let next_offset = self.next_offset.clone();
174
175        // Spawn async task - never blocks the caller
176        tokio::spawn(async move {
177            // Get and increment offset atomically
178            let offset = {
179                let mut next_offset = next_offset.write().unwrap();
180                let current = *next_offset;
181                *next_offset += 1;
182                current
183            };
184
185            let event =
186                Event::with_parent(offset, event_type, workflow_id, parent_workflow_id, data);
187
188            // Store in history
189            history.write().unwrap().push(event.clone());
190
191            // Broadcast to subscribers (ignore if no active receivers)
192            let _ = sender.send(event.clone());
193
194            event
195        })
196    }
197
198    /// Subscribe to real-time event stream
199    /// Returns a receiver that will get all future events
200    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
201        self.sender.subscribe()
202    }
203
204    /// Get events from a specific offset (for replay)
205    pub fn from_offset(&self, offset: EventOffset) -> Vec<Event> {
206        let history = self.history.read().unwrap();
207        history
208            .iter()
209            .filter(|e| e.offset >= offset)
210            .cloned()
211            .collect()
212    }
213
214    /// Get all events
215    pub fn all(&self) -> Vec<Event> {
216        self.history.read().unwrap().clone()
217    }
218
219    /// Get event count
220    pub fn len(&self) -> usize {
221        self.history.read().unwrap().len()
222    }
223
224    pub fn is_empty(&self) -> bool {
225        self.history.read().unwrap().is_empty()
226    }
227
228    /// Get the current offset (next event will have this offset)
229    pub fn current_offset(&self) -> EventOffset {
230        *self.next_offset.read().unwrap()
231    }
232}
233
234impl Default for EventStream {
235    fn default() -> Self {
236        Self::new()
237    }
238}
239
240impl Clone for EventStream {
241    fn clone(&self) -> Self {
242        Self {
243            sender: self.sender.clone(),
244            history: Arc::clone(&self.history),
245            next_offset: Arc::clone(&self.next_offset),
246        }
247    }
248}