Skip to main content

agent_runtime/
event.rs

1use crate::types::{EventId, EventOffset, JsonValue, WorkflowId};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::sync::{Arc, RwLock};
5use tokio::sync::broadcast;
6
7#[cfg(test)]
8#[path = "event_test.rs"]
9mod event_test;
10
11/// Event types that can occur in the system
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "snake_case")]
14pub enum EventType {
15    // Workflow events
16    WorkflowStarted,
17    WorkflowStepStarted,
18    WorkflowStepCompleted,
19    WorkflowCompleted,
20    WorkflowFailed,
21
22    // Agent events
23    AgentInitialized,
24    AgentProcessing,
25    AgentCompleted,
26    AgentFailed,
27
28    // LLM events
29    AgentLlmRequestStarted,
30    AgentLlmStreamChunk,
31    AgentLlmRequestCompleted,
32    AgentLlmRequestFailed,
33
34    // Tool events
35    ToolCallStarted,
36    ToolCallCompleted,
37    ToolCallFailed,
38
39    // System events
40    SystemError,
41    StateSaved,
42}
43
44/// An immutable event record
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct Event {
47    pub id: EventId,
48    pub offset: EventOffset,
49    pub timestamp: DateTime<Utc>,
50    #[serde(rename = "type")]
51    pub event_type: EventType,
52    pub workflow_id: WorkflowId,
53
54    /// Optional parent workflow ID for nested workflows
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub parent_workflow_id: Option<WorkflowId>,
57
58    pub data: JsonValue,
59}
60
61impl Event {
62    pub fn new(
63        offset: EventOffset,
64        event_type: EventType,
65        workflow_id: WorkflowId,
66        data: JsonValue,
67    ) -> Self {
68        Self {
69            id: format!("evt_{}", uuid::Uuid::new_v4()),
70            offset,
71            timestamp: Utc::now(),
72            event_type,
73            workflow_id,
74            parent_workflow_id: None,
75            data,
76        }
77    }
78
79    pub fn with_parent(
80        offset: EventOffset,
81        event_type: EventType,
82        workflow_id: WorkflowId,
83        parent_workflow_id: Option<WorkflowId>,
84        data: JsonValue,
85    ) -> Self {
86        Self {
87            id: format!("evt_{}", uuid::Uuid::new_v4()),
88            offset,
89            timestamp: Utc::now(),
90            event_type,
91            workflow_id,
92            parent_workflow_id,
93            data,
94        }
95    }
96}
97
98/// Event stream with broadcast capability for real-time subscribers
99pub struct EventStream {
100    /// Broadcast sender for real-time event streaming
101    sender: broadcast::Sender<Event>,
102
103    /// Historical events for replay (thread-safe)
104    history: Arc<RwLock<Vec<Event>>>,
105
106    /// Next offset to assign
107    next_offset: Arc<RwLock<EventOffset>>,
108}
109
110impl EventStream {
111    /// Create a new event stream with specified channel capacity
112    pub fn new() -> Self {
113        Self::with_capacity(1000)
114    }
115
116    /// Create event stream with custom channel capacity
117    pub fn with_capacity(capacity: usize) -> Self {
118        let (sender, _) = broadcast::channel(capacity);
119
120        Self {
121            sender,
122            history: Arc::new(RwLock::new(Vec::new())),
123            next_offset: Arc::new(RwLock::new(0)),
124        }
125    }
126
127    /// Append a new event and broadcast to all subscribers
128    pub fn append(&self, event_type: EventType, workflow_id: WorkflowId, data: JsonValue) -> Event {
129        self.append_with_parent(event_type, workflow_id, None, data)
130    }
131
132    /// Append event with optional parent workflow ID
133    pub fn append_with_parent(
134        &self,
135        event_type: EventType,
136        workflow_id: WorkflowId,
137        parent_workflow_id: Option<WorkflowId>,
138        data: JsonValue,
139    ) -> Event {
140        // Get and increment offset atomically
141        let offset = {
142            let mut next_offset = self.next_offset.write().unwrap();
143            let current = *next_offset;
144            *next_offset += 1;
145            current
146        };
147
148        let event = Event::with_parent(offset, event_type, workflow_id, parent_workflow_id, data);
149
150        // Store in history
151        self.history.write().unwrap().push(event.clone());
152
153        // Broadcast to subscribers (ignore if no active receivers)
154        let _ = self.sender.send(event.clone());
155
156        event
157    }
158
159    /// Subscribe to real-time event stream
160    /// Returns a receiver that will get all future events
161    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
162        self.sender.subscribe()
163    }
164
165    /// Get events from a specific offset (for replay)
166    pub fn from_offset(&self, offset: EventOffset) -> Vec<Event> {
167        let history = self.history.read().unwrap();
168        history
169            .iter()
170            .filter(|e| e.offset >= offset)
171            .cloned()
172            .collect()
173    }
174
175    /// Get all events
176    pub fn all(&self) -> Vec<Event> {
177        self.history.read().unwrap().clone()
178    }
179
180    /// Get event count
181    pub fn len(&self) -> usize {
182        self.history.read().unwrap().len()
183    }
184
185    pub fn is_empty(&self) -> bool {
186        self.history.read().unwrap().is_empty()
187    }
188
189    /// Get the current offset (next event will have this offset)
190    pub fn current_offset(&self) -> EventOffset {
191        *self.next_offset.read().unwrap()
192    }
193}
194
195impl Default for EventStream {
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
201impl Clone for EventStream {
202    fn clone(&self) -> Self {
203        Self {
204            sender: self.sender.clone(),
205            history: Arc::clone(&self.history),
206            next_offset: Arc::clone(&self.next_offset),
207        }
208    }
209}