Skip to main content

agent_runtime/
event.rs

1use crate::types::{EventId, EventOffset, JsonValue, WorkflowId};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::sync::{Arc, RwLock};
5use tokio::sync::broadcast;
6
7#[cfg(test)]
8#[path = "event_test.rs"]
9mod event_test;
10
11/// Event types that can occur in the system
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "snake_case")]
14pub enum EventType {
15    // Workflow events
16    WorkflowStarted,
17    WorkflowStepStarted,
18    WorkflowStepCompleted,
19    WorkflowCompleted,
20    WorkflowFailed,
21
22    // Agent events
23    AgentInitialized,
24    AgentProcessing,
25    AgentCompleted,
26    AgentFailed,
27
28    // LLM events
29    AgentLlmRequestStarted,
30    AgentLlmStreamChunk,
31    AgentLlmRequestCompleted,
32    AgentLlmRequestFailed,
33
34    // Tool events
35    ToolCallStarted,
36    ToolCallCompleted,
37    ToolCallFailed,
38    AgentToolLoopDetected,
39
40    // System events
41    SystemError,
42    StateSaved,
43}
44
45/// An immutable event record
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct Event {
48    pub id: EventId,
49    pub offset: EventOffset,
50    pub timestamp: DateTime<Utc>,
51    #[serde(rename = "type")]
52    pub event_type: EventType,
53    pub workflow_id: WorkflowId,
54
55    /// Optional parent workflow ID for nested workflows
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub parent_workflow_id: Option<WorkflowId>,
58
59    pub data: JsonValue,
60}
61
62impl Event {
63    pub fn new(
64        offset: EventOffset,
65        event_type: EventType,
66        workflow_id: WorkflowId,
67        data: JsonValue,
68    ) -> Self {
69        Self {
70            id: format!("evt_{}", uuid::Uuid::new_v4()),
71            offset,
72            timestamp: Utc::now(),
73            event_type,
74            workflow_id,
75            parent_workflow_id: None,
76            data,
77        }
78    }
79
80    pub fn with_parent(
81        offset: EventOffset,
82        event_type: EventType,
83        workflow_id: WorkflowId,
84        parent_workflow_id: Option<WorkflowId>,
85        data: JsonValue,
86    ) -> Self {
87        Self {
88            id: format!("evt_{}", uuid::Uuid::new_v4()),
89            offset,
90            timestamp: Utc::now(),
91            event_type,
92            workflow_id,
93            parent_workflow_id,
94            data,
95        }
96    }
97}
98
99/// Event stream with broadcast capability for real-time subscribers
100pub struct EventStream {
101    /// Broadcast sender for real-time event streaming
102    sender: broadcast::Sender<Event>,
103
104    /// Historical events for replay (thread-safe)
105    history: Arc<RwLock<Vec<Event>>>,
106
107    /// Next offset to assign
108    next_offset: Arc<RwLock<EventOffset>>,
109}
110
111impl EventStream {
112    /// Create a new event stream with specified channel capacity
113    pub fn new() -> Self {
114        Self::with_capacity(1000)
115    }
116
117    /// Create event stream with custom channel capacity
118    pub fn with_capacity(capacity: usize) -> Self {
119        let (sender, _) = broadcast::channel(capacity);
120
121        Self {
122            sender,
123            history: Arc::new(RwLock::new(Vec::new())),
124            next_offset: Arc::new(RwLock::new(0)),
125        }
126    }
127
128    /// Append a new event and broadcast to all subscribers
129    pub fn append(&self, event_type: EventType, workflow_id: WorkflowId, data: JsonValue) -> Event {
130        self.append_with_parent(event_type, workflow_id, None, data)
131    }
132
133    /// Append event with optional parent workflow ID
134    pub fn append_with_parent(
135        &self,
136        event_type: EventType,
137        workflow_id: WorkflowId,
138        parent_workflow_id: Option<WorkflowId>,
139        data: JsonValue,
140    ) -> Event {
141        // Get and increment offset atomically
142        let offset = {
143            let mut next_offset = self.next_offset.write().unwrap();
144            let current = *next_offset;
145            *next_offset += 1;
146            current
147        };
148
149        let event = Event::with_parent(offset, event_type, workflow_id, parent_workflow_id, data);
150
151        // Store in history
152        self.history.write().unwrap().push(event.clone());
153
154        // Broadcast to subscribers (ignore if no active receivers)
155        let _ = self.sender.send(event.clone());
156
157        event
158    }
159
160    /// Subscribe to real-time event stream
161    /// Returns a receiver that will get all future events
162    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
163        self.sender.subscribe()
164    }
165
166    /// Get events from a specific offset (for replay)
167    pub fn from_offset(&self, offset: EventOffset) -> Vec<Event> {
168        let history = self.history.read().unwrap();
169        history
170            .iter()
171            .filter(|e| e.offset >= offset)
172            .cloned()
173            .collect()
174    }
175
176    /// Get all events
177    pub fn all(&self) -> Vec<Event> {
178        self.history.read().unwrap().clone()
179    }
180
181    /// Get event count
182    pub fn len(&self) -> usize {
183        self.history.read().unwrap().len()
184    }
185
186    pub fn is_empty(&self) -> bool {
187        self.history.read().unwrap().is_empty()
188    }
189
190    /// Get the current offset (next event will have this offset)
191    pub fn current_offset(&self) -> EventOffset {
192        *self.next_offset.read().unwrap()
193    }
194}
195
196impl Default for EventStream {
197    fn default() -> Self {
198        Self::new()
199    }
200}
201
202impl Clone for EventStream {
203    fn clone(&self) -> Self {
204        Self {
205            sender: self.sender.clone(),
206            history: Arc::clone(&self.history),
207            next_offset: Arc::clone(&self.next_offset),
208        }
209    }
210}