1use crate::types::{EventId, EventOffset, JsonValue, WorkflowId};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::sync::{Arc, RwLock};
5use tokio::sync::broadcast;
6
7#[cfg(test)]
8#[path = "event_test.rs"]
9mod event_test;
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "snake_case")]
14pub enum EventType {
15 WorkflowStarted,
17 WorkflowStepStarted,
18 WorkflowStepCompleted,
19 WorkflowCompleted,
20 WorkflowFailed,
21
22 AgentInitialized,
24 AgentProcessing,
25 AgentCompleted,
26 AgentFailed,
27
28 AgentLlmRequestStarted,
30 AgentLlmStreamChunk,
31 AgentLlmRequestCompleted,
32 AgentLlmRequestFailed,
33
34 ToolCallStarted,
36 ToolCallCompleted,
37 ToolCallFailed,
38
39 SystemError,
41 StateSaved,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct Event {
47 pub id: EventId,
48 pub offset: EventOffset,
49 pub timestamp: DateTime<Utc>,
50 #[serde(rename = "type")]
51 pub event_type: EventType,
52 pub workflow_id: WorkflowId,
53
54 #[serde(skip_serializing_if = "Option::is_none")]
56 pub parent_workflow_id: Option<WorkflowId>,
57
58 pub data: JsonValue,
59}
60
61impl Event {
62 pub fn new(
63 offset: EventOffset,
64 event_type: EventType,
65 workflow_id: WorkflowId,
66 data: JsonValue,
67 ) -> Self {
68 Self {
69 id: format!("evt_{}", uuid::Uuid::new_v4()),
70 offset,
71 timestamp: Utc::now(),
72 event_type,
73 workflow_id,
74 parent_workflow_id: None,
75 data,
76 }
77 }
78
79 pub fn with_parent(
80 offset: EventOffset,
81 event_type: EventType,
82 workflow_id: WorkflowId,
83 parent_workflow_id: Option<WorkflowId>,
84 data: JsonValue,
85 ) -> Self {
86 Self {
87 id: format!("evt_{}", uuid::Uuid::new_v4()),
88 offset,
89 timestamp: Utc::now(),
90 event_type,
91 workflow_id,
92 parent_workflow_id,
93 data,
94 }
95 }
96}
97
98pub struct EventStream {
100 sender: broadcast::Sender<Event>,
102
103 history: Arc<RwLock<Vec<Event>>>,
105
106 next_offset: Arc<RwLock<EventOffset>>,
108}
109
110impl EventStream {
111 pub fn new() -> Self {
113 Self::with_capacity(1000)
114 }
115
116 pub fn with_capacity(capacity: usize) -> Self {
118 let (sender, _) = broadcast::channel(capacity);
119
120 Self {
121 sender,
122 history: Arc::new(RwLock::new(Vec::new())),
123 next_offset: Arc::new(RwLock::new(0)),
124 }
125 }
126
127 pub fn append(&self, event_type: EventType, workflow_id: WorkflowId, data: JsonValue) -> Event {
129 self.append_with_parent(event_type, workflow_id, None, data)
130 }
131
132 pub fn append_with_parent(
134 &self,
135 event_type: EventType,
136 workflow_id: WorkflowId,
137 parent_workflow_id: Option<WorkflowId>,
138 data: JsonValue,
139 ) -> Event {
140 let offset = {
142 let mut next_offset = self.next_offset.write().unwrap();
143 let current = *next_offset;
144 *next_offset += 1;
145 current
146 };
147
148 let event = Event::with_parent(offset, event_type, workflow_id, parent_workflow_id, data);
149
150 self.history.write().unwrap().push(event.clone());
152
153 let _ = self.sender.send(event.clone());
155
156 event
157 }
158
159 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
162 self.sender.subscribe()
163 }
164
165 pub fn from_offset(&self, offset: EventOffset) -> Vec<Event> {
167 let history = self.history.read().unwrap();
168 history
169 .iter()
170 .filter(|e| e.offset >= offset)
171 .cloned()
172 .collect()
173 }
174
175 pub fn all(&self) -> Vec<Event> {
177 self.history.read().unwrap().clone()
178 }
179
180 pub fn len(&self) -> usize {
182 self.history.read().unwrap().len()
183 }
184
185 pub fn is_empty(&self) -> bool {
186 self.history.read().unwrap().is_empty()
187 }
188
189 pub fn current_offset(&self) -> EventOffset {
191 *self.next_offset.read().unwrap()
192 }
193}
194
195impl Default for EventStream {
196 fn default() -> Self {
197 Self::new()
198 }
199}
200
201impl Clone for EventStream {
202 fn clone(&self) -> Self {
203 Self {
204 sender: self.sender.clone(),
205 history: Arc::clone(&self.history),
206 next_offset: Arc::clone(&self.next_offset),
207 }
208 }
209}