1use crate::types::{EventId, EventOffset, JsonValue, WorkflowId};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::sync::{Arc, RwLock};
5use tokio::sync::broadcast;
6
7#[cfg(test)]
8#[path = "event_test.rs"]
9mod event_test;
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "snake_case")]
14pub enum EventType {
15 WorkflowStarted,
17 WorkflowStepStarted,
18 WorkflowStepCompleted,
19 WorkflowCompleted,
20 WorkflowFailed,
21
22 AgentInitialized,
24 AgentProcessing,
25 AgentCompleted,
26 AgentFailed,
27
28 AgentLlmRequestStarted,
30 AgentLlmStreamChunk,
31 AgentLlmRequestCompleted,
32 AgentLlmRequestFailed,
33
34 ToolCallStarted,
36 ToolCallCompleted,
37 ToolCallFailed,
38 AgentToolLoopDetected,
39
40 SystemError,
42 StateSaved,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct Event {
48 pub id: EventId,
49 pub offset: EventOffset,
50 pub timestamp: DateTime<Utc>,
51 #[serde(rename = "type")]
52 pub event_type: EventType,
53 pub workflow_id: WorkflowId,
54
55 #[serde(skip_serializing_if = "Option::is_none")]
57 pub parent_workflow_id: Option<WorkflowId>,
58
59 pub data: JsonValue,
60}
61
62impl Event {
63 pub fn new(
64 offset: EventOffset,
65 event_type: EventType,
66 workflow_id: WorkflowId,
67 data: JsonValue,
68 ) -> Self {
69 Self {
70 id: format!("evt_{}", uuid::Uuid::new_v4()),
71 offset,
72 timestamp: Utc::now(),
73 event_type,
74 workflow_id,
75 parent_workflow_id: None,
76 data,
77 }
78 }
79
80 pub fn with_parent(
81 offset: EventOffset,
82 event_type: EventType,
83 workflow_id: WorkflowId,
84 parent_workflow_id: Option<WorkflowId>,
85 data: JsonValue,
86 ) -> Self {
87 Self {
88 id: format!("evt_{}", uuid::Uuid::new_v4()),
89 offset,
90 timestamp: Utc::now(),
91 event_type,
92 workflow_id,
93 parent_workflow_id,
94 data,
95 }
96 }
97}
98
99pub struct EventStream {
101 sender: broadcast::Sender<Event>,
103
104 history: Arc<RwLock<Vec<Event>>>,
106
107 next_offset: Arc<RwLock<EventOffset>>,
109}
110
111impl EventStream {
112 pub fn new() -> Self {
114 Self::with_capacity(1000)
115 }
116
117 pub fn with_capacity(capacity: usize) -> Self {
119 let (sender, _) = broadcast::channel(capacity);
120
121 Self {
122 sender,
123 history: Arc::new(RwLock::new(Vec::new())),
124 next_offset: Arc::new(RwLock::new(0)),
125 }
126 }
127
128 pub fn append(
152 &self,
153 event_type: EventType,
154 workflow_id: WorkflowId,
155 data: JsonValue,
156 ) -> tokio::task::JoinHandle<Event> {
157 self.append_with_parent(event_type, workflow_id, None, data)
158 }
159
160 pub fn append_with_parent(
165 &self,
166 event_type: EventType,
167 workflow_id: WorkflowId,
168 parent_workflow_id: Option<WorkflowId>,
169 data: JsonValue,
170 ) -> tokio::task::JoinHandle<Event> {
171 let sender = self.sender.clone();
172 let history = self.history.clone();
173 let next_offset = self.next_offset.clone();
174
175 tokio::spawn(async move {
177 let offset = {
179 let mut next_offset = next_offset.write().unwrap();
180 let current = *next_offset;
181 *next_offset += 1;
182 current
183 };
184
185 let event =
186 Event::with_parent(offset, event_type, workflow_id, parent_workflow_id, data);
187
188 history.write().unwrap().push(event.clone());
190
191 let _ = sender.send(event.clone());
193
194 event
195 })
196 }
197
198 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
201 self.sender.subscribe()
202 }
203
204 pub fn from_offset(&self, offset: EventOffset) -> Vec<Event> {
206 let history = self.history.read().unwrap();
207 history
208 .iter()
209 .filter(|e| e.offset >= offset)
210 .cloned()
211 .collect()
212 }
213
214 pub fn all(&self) -> Vec<Event> {
216 self.history.read().unwrap().clone()
217 }
218
219 pub fn len(&self) -> usize {
221 self.history.read().unwrap().len()
222 }
223
224 pub fn is_empty(&self) -> bool {
225 self.history.read().unwrap().is_empty()
226 }
227
228 pub fn current_offset(&self) -> EventOffset {
230 *self.next_offset.read().unwrap()
231 }
232}
233
234impl Default for EventStream {
235 fn default() -> Self {
236 Self::new()
237 }
238}
239
240impl Clone for EventStream {
241 fn clone(&self) -> Self {
242 Self {
243 sender: self.sender.clone(),
244 history: Arc::clone(&self.history),
245 next_offset: Arc::clone(&self.next_offset),
246 }
247 }
248}