1use crate::types::{EventId, EventOffset, JsonValue, WorkflowId};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::sync::{Arc, RwLock};
5use tokio::sync::broadcast;
6
7#[cfg(test)]
8#[path = "event_test.rs"]
9mod event_test;
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "snake_case")]
14pub enum EventType {
15 WorkflowStarted,
17 WorkflowStepStarted,
18 WorkflowStepCompleted,
19 WorkflowCompleted,
20 WorkflowFailed,
21
22 AgentInitialized,
24 AgentProcessing,
25 AgentCompleted,
26 AgentFailed,
27
28 AgentLlmRequestStarted,
30 AgentLlmStreamChunk,
31 AgentLlmRequestCompleted,
32 AgentLlmRequestFailed,
33
34 ToolCallStarted,
36 ToolCallCompleted,
37 ToolCallFailed,
38 AgentToolLoopDetected,
39
40 SystemError,
42 StateSaved,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct Event {
48 pub id: EventId,
49 pub offset: EventOffset,
50 pub timestamp: DateTime<Utc>,
51 #[serde(rename = "type")]
52 pub event_type: EventType,
53 pub workflow_id: WorkflowId,
54
55 #[serde(skip_serializing_if = "Option::is_none")]
57 pub parent_workflow_id: Option<WorkflowId>,
58
59 pub data: JsonValue,
60}
61
62impl Event {
63 pub fn new(
64 offset: EventOffset,
65 event_type: EventType,
66 workflow_id: WorkflowId,
67 data: JsonValue,
68 ) -> Self {
69 Self {
70 id: format!("evt_{}", uuid::Uuid::new_v4()),
71 offset,
72 timestamp: Utc::now(),
73 event_type,
74 workflow_id,
75 parent_workflow_id: None,
76 data,
77 }
78 }
79
80 pub fn with_parent(
81 offset: EventOffset,
82 event_type: EventType,
83 workflow_id: WorkflowId,
84 parent_workflow_id: Option<WorkflowId>,
85 data: JsonValue,
86 ) -> Self {
87 Self {
88 id: format!("evt_{}", uuid::Uuid::new_v4()),
89 offset,
90 timestamp: Utc::now(),
91 event_type,
92 workflow_id,
93 parent_workflow_id,
94 data,
95 }
96 }
97}
98
99pub struct EventStream {
101 sender: broadcast::Sender<Event>,
103
104 history: Arc<RwLock<Vec<Event>>>,
106
107 next_offset: Arc<RwLock<EventOffset>>,
109}
110
111impl EventStream {
112 pub fn new() -> Self {
114 Self::with_capacity(1000)
115 }
116
117 pub fn with_capacity(capacity: usize) -> Self {
119 let (sender, _) = broadcast::channel(capacity);
120
121 Self {
122 sender,
123 history: Arc::new(RwLock::new(Vec::new())),
124 next_offset: Arc::new(RwLock::new(0)),
125 }
126 }
127
128 pub fn append(&self, event_type: EventType, workflow_id: WorkflowId, data: JsonValue) -> Event {
130 self.append_with_parent(event_type, workflow_id, None, data)
131 }
132
133 pub fn append_with_parent(
135 &self,
136 event_type: EventType,
137 workflow_id: WorkflowId,
138 parent_workflow_id: Option<WorkflowId>,
139 data: JsonValue,
140 ) -> Event {
141 let offset = {
143 let mut next_offset = self.next_offset.write().unwrap();
144 let current = *next_offset;
145 *next_offset += 1;
146 current
147 };
148
149 let event = Event::with_parent(offset, event_type, workflow_id, parent_workflow_id, data);
150
151 self.history.write().unwrap().push(event.clone());
153
154 let _ = self.sender.send(event.clone());
156
157 event
158 }
159
160 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
163 self.sender.subscribe()
164 }
165
166 pub fn from_offset(&self, offset: EventOffset) -> Vec<Event> {
168 let history = self.history.read().unwrap();
169 history
170 .iter()
171 .filter(|e| e.offset >= offset)
172 .cloned()
173 .collect()
174 }
175
176 pub fn all(&self) -> Vec<Event> {
178 self.history.read().unwrap().clone()
179 }
180
181 pub fn len(&self) -> usize {
183 self.history.read().unwrap().len()
184 }
185
186 pub fn is_empty(&self) -> bool {
187 self.history.read().unwrap().is_empty()
188 }
189
190 pub fn current_offset(&self) -> EventOffset {
192 *self.next_offset.read().unwrap()
193 }
194}
195
196impl Default for EventStream {
197 fn default() -> Self {
198 Self::new()
199 }
200}
201
202impl Clone for EventStream {
203 fn clone(&self) -> Self {
204 Self {
205 sender: self.sender.clone(),
206 history: Arc::clone(&self.history),
207 next_offset: Arc::clone(&self.next_offset),
208 }
209 }
210}