1use crate::types::{EventId, EventOffset, JsonValue, WorkflowId};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::sync::{Arc, RwLock};
5use tokio::sync::broadcast;
6
7#[cfg(test)]
8#[path = "event_test.rs"]
9mod event_test;
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "snake_case")]
14pub enum EventScope {
15 Workflow,
16 WorkflowStep,
17 Agent,
18 LlmRequest,
19 Tool,
20 System,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
25#[serde(rename_all = "snake_case")]
26pub enum EventType {
27 Started,
28 Progress,
29 Completed,
30 Failed,
31 Canceled,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
36#[serde(rename_all = "snake_case")]
37pub enum ComponentStatus {
38 Pending,
39 Running,
40 Completed,
41 Failed,
42 Canceled,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct Event {
48 pub id: EventId,
49 pub offset: EventOffset,
50 pub timestamp: DateTime<Utc>,
51
52 pub scope: EventScope,
54
55 #[serde(rename = "type")]
57 pub event_type: EventType,
58
59 pub component_id: String,
61
62 pub status: ComponentStatus,
64
65 pub workflow_id: WorkflowId,
67
68 #[serde(skip_serializing_if = "Option::is_none")]
70 pub parent_workflow_id: Option<WorkflowId>,
71
72 #[serde(skip_serializing_if = "Option::is_none")]
74 pub message: Option<String>,
75
76 pub data: JsonValue,
78}
79
80impl Event {
81 #[allow(clippy::too_many_arguments)]
82 pub fn new(
83 offset: EventOffset,
84 scope: EventScope,
85 event_type: EventType,
86 component_id: String,
87 status: ComponentStatus,
88 workflow_id: WorkflowId,
89 message: Option<String>,
90 data: JsonValue,
91 ) -> Result<Self, String> {
92 Self::validate_component_id(&scope, &component_id)?;
94
95 Ok(Self {
96 id: format!("evt_{}", uuid::Uuid::new_v4()),
97 offset,
98 timestamp: Utc::now(),
99 scope,
100 event_type,
101 component_id,
102 status,
103 workflow_id,
104 parent_workflow_id: None,
105 message,
106 data,
107 })
108 }
109
110 #[allow(clippy::too_many_arguments)]
111 pub fn with_parent(
112 offset: EventOffset,
113 scope: EventScope,
114 event_type: EventType,
115 component_id: String,
116 status: ComponentStatus,
117 workflow_id: WorkflowId,
118 parent_workflow_id: Option<WorkflowId>,
119 message: Option<String>,
120 data: JsonValue,
121 ) -> Result<Self, String> {
122 Self::validate_component_id(&scope, &component_id)?;
124
125 Ok(Self {
126 id: format!("evt_{}", uuid::Uuid::new_v4()),
127 offset,
128 timestamp: Utc::now(),
129 scope,
130 event_type,
131 component_id,
132 status,
133 workflow_id,
134 parent_workflow_id,
135 message,
136 data,
137 })
138 }
139
140 fn validate_component_id(scope: &EventScope, component_id: &str) -> Result<(), String> {
142 if component_id.is_empty() {
143 return Err(format!("{:?} component_id cannot be empty", scope));
144 }
145
146 match scope {
147 EventScope::Workflow => {
148 Ok(())
150 }
151 EventScope::WorkflowStep => {
152 let parts: Vec<&str> = component_id.split(':').collect();
154 if parts.len() != 3 || parts[1] != "step" {
155 return Err(format!(
156 "WorkflowStep component_id must be 'workflow_name:step:N', got '{}'",
157 component_id
158 ));
159 }
160 if parts[2].parse::<usize>().is_err() {
162 return Err(format!(
163 "WorkflowStep index must be a number, got '{}'",
164 parts[2]
165 ));
166 }
167 Ok(())
168 }
169 EventScope::Agent => {
170 Ok(())
172 }
173 EventScope::LlmRequest => {
174 let parts: Vec<&str> = component_id.split(':').collect();
176 if parts.len() != 3 || parts[1] != "llm" {
177 return Err(format!(
178 "LlmRequest component_id must be 'agent_name:llm:N', got '{}'",
179 component_id
180 ));
181 }
182 if parts[2].parse::<usize>().is_err() {
184 return Err(format!(
185 "LlmRequest iteration must be a number, got '{}'",
186 parts[2]
187 ));
188 }
189 Ok(())
190 }
191 EventScope::Tool => {
192 Ok(())
195 }
196 EventScope::System => {
197 if !component_id.starts_with("system:") {
199 return Err(format!(
200 "System component_id must start with 'system:', got '{}'",
201 component_id
202 ));
203 }
204 Ok(())
205 }
206 }
207 }
208}
209
210pub struct EventStream {
212 sender: broadcast::Sender<Event>,
214
215 history: Arc<RwLock<Vec<Event>>>,
217
218 next_offset: Arc<RwLock<EventOffset>>,
220}
221
222impl EventStream {
223 pub fn new() -> Self {
225 Self::with_capacity(1000)
226 }
227
228 pub fn with_capacity(capacity: usize) -> Self {
230 let (sender, _) = broadcast::channel(capacity);
231
232 Self {
233 sender,
234 history: Arc::new(RwLock::new(Vec::new())),
235 next_offset: Arc::new(RwLock::new(0)),
236 }
237 }
238
239 #[allow(clippy::too_many_arguments)]
277 pub fn append(
278 &self,
279 scope: EventScope,
280 event_type: EventType,
281 component_id: String,
282 status: ComponentStatus,
283 workflow_id: WorkflowId,
284 message: Option<String>,
285 data: JsonValue,
286 ) -> tokio::task::JoinHandle<Result<Event, String>> {
287 self.append_with_parent(
288 scope,
289 event_type,
290 component_id,
291 status,
292 workflow_id,
293 None,
294 message,
295 data,
296 )
297 }
298
299 #[allow(clippy::too_many_arguments)]
304 pub fn append_with_parent(
305 &self,
306 scope: EventScope,
307 event_type: EventType,
308 component_id: String,
309 status: ComponentStatus,
310 workflow_id: WorkflowId,
311 parent_workflow_id: Option<WorkflowId>,
312 message: Option<String>,
313 data: JsonValue,
314 ) -> tokio::task::JoinHandle<Result<Event, String>> {
315 let sender = self.sender.clone();
316 let history = self.history.clone();
317 let next_offset = self.next_offset.clone();
318
319 tokio::spawn(async move {
321 let offset = {
323 let mut next_offset = next_offset.write().unwrap();
324 let current = *next_offset;
325 *next_offset += 1;
326 current
327 };
328
329 let event = Event::with_parent(
330 offset,
331 scope,
332 event_type,
333 component_id,
334 status,
335 workflow_id,
336 parent_workflow_id,
337 message,
338 data,
339 )?;
340
341 history.write().unwrap().push(event.clone());
343
344 let _ = sender.send(event.clone());
346
347 Ok(event)
348 })
349 }
350
351 pub fn agent_started(
355 &self,
356 agent_name: &str,
357 workflow_id: WorkflowId,
358 data: JsonValue,
359 ) -> tokio::task::JoinHandle<Result<Event, String>> {
360 self.append(
361 EventScope::Agent,
362 EventType::Started,
363 agent_name.to_string(),
364 ComponentStatus::Running,
365 workflow_id,
366 None,
367 data,
368 )
369 }
370
371 pub fn agent_completed(
373 &self,
374 agent_name: &str,
375 workflow_id: WorkflowId,
376 message: Option<String>,
377 data: JsonValue,
378 ) -> tokio::task::JoinHandle<Result<Event, String>> {
379 self.append(
380 EventScope::Agent,
381 EventType::Completed,
382 agent_name.to_string(),
383 ComponentStatus::Completed,
384 workflow_id,
385 message,
386 data,
387 )
388 }
389
390 pub fn agent_failed(
392 &self,
393 agent_name: &str,
394 workflow_id: WorkflowId,
395 error: &str,
396 data: JsonValue,
397 ) -> tokio::task::JoinHandle<Result<Event, String>> {
398 self.append(
399 EventScope::Agent,
400 EventType::Failed,
401 agent_name.to_string(),
402 ComponentStatus::Failed,
403 workflow_id,
404 Some(error.to_string()),
405 data,
406 )
407 }
408
409 pub fn llm_started(
411 &self,
412 agent_name: &str,
413 iteration: usize,
414 workflow_id: WorkflowId,
415 data: JsonValue,
416 ) -> tokio::task::JoinHandle<Result<Event, String>> {
417 self.append(
418 EventScope::LlmRequest,
419 EventType::Started,
420 format!("{}:llm:{}", agent_name, iteration),
421 ComponentStatus::Running,
422 workflow_id,
423 None,
424 data,
425 )
426 }
427
428 pub fn llm_progress(
430 &self,
431 agent_name: &str,
432 iteration: usize,
433 workflow_id: WorkflowId,
434 chunk: String,
435 ) -> tokio::task::JoinHandle<Result<Event, String>> {
436 self.append(
437 EventScope::LlmRequest,
438 EventType::Progress,
439 format!("{}:llm:{}", agent_name, iteration),
440 ComponentStatus::Running,
441 workflow_id,
442 None,
443 serde_json::json!({ "chunk": chunk }),
444 )
445 }
446
447 pub fn llm_completed(
449 &self,
450 agent_name: &str,
451 iteration: usize,
452 workflow_id: WorkflowId,
453 data: JsonValue,
454 ) -> tokio::task::JoinHandle<Result<Event, String>> {
455 self.append(
456 EventScope::LlmRequest,
457 EventType::Completed,
458 format!("{}:llm:{}", agent_name, iteration),
459 ComponentStatus::Completed,
460 workflow_id,
461 None,
462 data,
463 )
464 }
465
466 pub fn llm_failed(
468 &self,
469 agent_name: &str,
470 iteration: usize,
471 workflow_id: WorkflowId,
472 error: &str,
473 ) -> tokio::task::JoinHandle<Result<Event, String>> {
474 self.append(
475 EventScope::LlmRequest,
476 EventType::Failed,
477 format!("{}:llm:{}", agent_name, iteration),
478 ComponentStatus::Failed,
479 workflow_id,
480 Some(error.to_string()),
481 serde_json::json!({}),
482 )
483 }
484
485 pub fn tool_started(
487 &self,
488 tool_name: &str,
489 workflow_id: WorkflowId,
490 data: JsonValue,
491 ) -> tokio::task::JoinHandle<Result<Event, String>> {
492 self.append(
493 EventScope::Tool,
494 EventType::Started,
495 tool_name.to_string(),
496 ComponentStatus::Running,
497 workflow_id,
498 None,
499 data,
500 )
501 }
502
503 pub fn tool_progress(
505 &self,
506 tool_name: &str,
507 workflow_id: WorkflowId,
508 message: &str,
509 percent: Option<u8>,
510 ) -> tokio::task::JoinHandle<Result<Event, String>> {
511 self.append(
512 EventScope::Tool,
513 EventType::Progress,
514 tool_name.to_string(),
515 ComponentStatus::Running,
516 workflow_id,
517 Some(message.to_string()),
518 serde_json::json!({ "percent": percent }),
519 )
520 }
521
522 pub fn tool_completed(
524 &self,
525 tool_name: &str,
526 workflow_id: WorkflowId,
527 data: JsonValue,
528 ) -> tokio::task::JoinHandle<Result<Event, String>> {
529 self.append(
530 EventScope::Tool,
531 EventType::Completed,
532 tool_name.to_string(),
533 ComponentStatus::Completed,
534 workflow_id,
535 None,
536 data,
537 )
538 }
539
540 pub fn tool_failed(
542 &self,
543 tool_name: &str,
544 workflow_id: WorkflowId,
545 error: &str,
546 data: JsonValue,
547 ) -> tokio::task::JoinHandle<Result<Event, String>> {
548 self.append(
549 EventScope::Tool,
550 EventType::Failed,
551 tool_name.to_string(),
552 ComponentStatus::Failed,
553 workflow_id,
554 Some(error.to_string()),
555 data,
556 )
557 }
558
559 pub fn workflow_started(
561 &self,
562 workflow_name: &str,
563 data: JsonValue,
564 ) -> tokio::task::JoinHandle<Result<Event, String>> {
565 self.append(
566 EventScope::Workflow,
567 EventType::Started,
568 workflow_name.to_string(),
569 ComponentStatus::Running,
570 workflow_name.to_string(),
571 None,
572 data,
573 )
574 }
575
576 pub fn workflow_completed(
578 &self,
579 workflow_name: &str,
580 data: JsonValue,
581 ) -> tokio::task::JoinHandle<Result<Event, String>> {
582 self.append(
583 EventScope::Workflow,
584 EventType::Completed,
585 workflow_name.to_string(),
586 ComponentStatus::Completed,
587 workflow_name.to_string(),
588 None,
589 data,
590 )
591 }
592
593 pub fn workflow_failed(
595 &self,
596 workflow_name: &str,
597 error: &str,
598 data: JsonValue,
599 ) -> tokio::task::JoinHandle<Result<Event, String>> {
600 self.append(
601 EventScope::Workflow,
602 EventType::Failed,
603 workflow_name.to_string(),
604 ComponentStatus::Failed,
605 workflow_name.to_string(),
606 Some(error.to_string()),
607 data,
608 )
609 }
610
611 pub fn step_started(
613 &self,
614 workflow_name: &str,
615 step_index: usize,
616 data: JsonValue,
617 ) -> tokio::task::JoinHandle<Result<Event, String>> {
618 self.append(
619 EventScope::WorkflowStep,
620 EventType::Started,
621 format!("{}:step:{}", workflow_name, step_index),
622 ComponentStatus::Running,
623 workflow_name.to_string(),
624 None,
625 data,
626 )
627 }
628
629 pub fn step_completed(
631 &self,
632 workflow_name: &str,
633 step_index: usize,
634 data: JsonValue,
635 ) -> tokio::task::JoinHandle<Result<Event, String>> {
636 self.append(
637 EventScope::WorkflowStep,
638 EventType::Completed,
639 format!("{}:step:{}", workflow_name, step_index),
640 ComponentStatus::Completed,
641 workflow_name.to_string(),
642 None,
643 data,
644 )
645 }
646
647 pub fn step_failed(
649 &self,
650 workflow_name: &str,
651 step_index: usize,
652 error: &str,
653 data: JsonValue,
654 ) -> tokio::task::JoinHandle<Result<Event, String>> {
655 self.append(
656 EventScope::WorkflowStep,
657 EventType::Failed,
658 format!("{}:step:{}", workflow_name, step_index),
659 ComponentStatus::Failed,
660 workflow_name.to_string(),
661 Some(error.to_string()),
662 data,
663 )
664 }
665
666 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
669 self.sender.subscribe()
670 }
671
672 pub fn from_offset(&self, offset: EventOffset) -> Vec<Event> {
674 let history = self.history.read().unwrap();
675 history
676 .iter()
677 .filter(|e| e.offset >= offset)
678 .cloned()
679 .collect()
680 }
681
682 pub fn all(&self) -> Vec<Event> {
684 self.history.read().unwrap().clone()
685 }
686
687 pub fn len(&self) -> usize {
689 self.history.read().unwrap().len()
690 }
691
692 pub fn is_empty(&self) -> bool {
693 self.history.read().unwrap().is_empty()
694 }
695
696 pub fn current_offset(&self) -> EventOffset {
698 *self.next_offset.read().unwrap()
699 }
700}
701
702impl Default for EventStream {
703 fn default() -> Self {
704 Self::new()
705 }
706}
707
708impl Clone for EventStream {
709 fn clone(&self) -> Self {
710 Self {
711 sender: self.sender.clone(),
712 history: Arc::clone(&self.history),
713 next_offset: Arc::clone(&self.next_offset),
714 }
715 }
716}