Skip to main content

agent_runtime/
event.rs

1use crate::types::{EventId, EventOffset, JsonValue, WorkflowId};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use std::sync::{Arc, RwLock};
5use tokio::sync::broadcast;
6
7#[cfg(test)]
8#[path = "event_test.rs"]
9mod event_test;
10
/// Event scope - which component is emitting the event.
///
/// Serialized in `snake_case` (for example `WorkflowStep` becomes
/// `"workflow_step"`). The scope also selects the `component_id` format that
/// `Event`'s constructors accept; an empty `component_id` is rejected for
/// every scope.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum EventScope {
    /// A workflow; any non-empty `component_id` is accepted.
    Workflow,
    /// A workflow step; `component_id` must look like `workflow_name:step:N`.
    WorkflowStep,
    /// An agent; any non-empty `component_id` is accepted.
    Agent,
    /// An LLM request; `component_id` must look like `agent_name:llm:N`.
    LlmRequest,
    /// A tool; any non-empty `component_id` is accepted.
    Tool,
    /// A system component; `component_id` must start with `system:`.
    System,
}
22
/// Event type - standard lifecycle events.
///
/// Serialized in `snake_case`. On `Event` this value is stored in the
/// `event_type` field, which serializes under the key `type`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum EventType {
    /// Emitted by the `*_started` helpers on `EventStream`.
    Started,
    /// Emitted by the `*_progress` helpers on `EventStream`.
    Progress,
    /// Emitted by the `*_completed` helpers on `EventStream`.
    Completed,
    /// Emitted by the `*_failed` helpers on `EventStream`.
    Failed,
    /// Not emitted by any `EventStream` helper in this file; use
    /// `EventStream::append` directly.
    Canceled,
}
33
/// Component status after event.
///
/// Serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ComponentStatus {
    /// Not set by any `EventStream` helper in this file.
    Pending,
    /// Set by the `*_started` and `*_progress` helpers on `EventStream`.
    Running,
    /// Set by the `*_completed` helpers on `EventStream`.
    Completed,
    /// Set by the `*_failed` helpers on `EventStream`.
    Failed,
    /// Not set by any `EventStream` helper in this file.
    Canceled,
}
44
/// An immutable event record.
///
/// Build one with `Event::new` or `Event::with_parent`; both validate
/// `component_id` against `scope` and fill in `id` and `timestamp`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    /// Identifier generated by the constructors: `evt_` followed by a v4 UUID.
    pub id: EventId,
    /// Position of the event in its stream; `EventStream` starts at 0 and
    /// increments by one per event.
    pub offset: EventOffset,
    /// UTC time at which the constructor ran.
    pub timestamp: DateTime<Utc>,

    /// Event scope (component type)
    pub scope: EventScope,

    /// Event type (lifecycle stage); serialized under the key `type`
    #[serde(rename = "type")]
    pub event_type: EventType,

    /// Component identifier (follows standardized format per scope)
    pub component_id: String,

    /// Current status of the component
    pub status: ComponentStatus,

    /// Workflow context
    pub workflow_id: WorkflowId,

    /// Optional parent workflow ID for nested workflows; omitted from the
    /// serialized form when `None`
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_workflow_id: Option<WorkflowId>,

    /// Optional human-readable message; omitted from the serialized form
    /// when `None`
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,

    /// Flexible payload for component-specific data
    pub data: JsonValue,
}
79
80impl Event {
81    #[allow(clippy::too_many_arguments)]
82    pub fn new(
83        offset: EventOffset,
84        scope: EventScope,
85        event_type: EventType,
86        component_id: String,
87        status: ComponentStatus,
88        workflow_id: WorkflowId,
89        message: Option<String>,
90        data: JsonValue,
91    ) -> Result<Self, String> {
92        // Validate component_id format
93        Self::validate_component_id(&scope, &component_id)?;
94
95        Ok(Self {
96            id: format!("evt_{}", uuid::Uuid::new_v4()),
97            offset,
98            timestamp: Utc::now(),
99            scope,
100            event_type,
101            component_id,
102            status,
103            workflow_id,
104            parent_workflow_id: None,
105            message,
106            data,
107        })
108    }
109
110    #[allow(clippy::too_many_arguments)]
111    pub fn with_parent(
112        offset: EventOffset,
113        scope: EventScope,
114        event_type: EventType,
115        component_id: String,
116        status: ComponentStatus,
117        workflow_id: WorkflowId,
118        parent_workflow_id: Option<WorkflowId>,
119        message: Option<String>,
120        data: JsonValue,
121    ) -> Result<Self, String> {
122        // Validate component_id format
123        Self::validate_component_id(&scope, &component_id)?;
124
125        Ok(Self {
126            id: format!("evt_{}", uuid::Uuid::new_v4()),
127            offset,
128            timestamp: Utc::now(),
129            scope,
130            event_type,
131            component_id,
132            status,
133            workflow_id,
134            parent_workflow_id,
135            message,
136            data,
137        })
138    }
139
140    /// Validate component_id follows the required format for the scope
141    fn validate_component_id(scope: &EventScope, component_id: &str) -> Result<(), String> {
142        if component_id.is_empty() {
143            return Err(format!("{:?} component_id cannot be empty", scope));
144        }
145
146        match scope {
147            EventScope::Workflow => {
148                // Simple name, no special format required
149                Ok(())
150            }
151            EventScope::WorkflowStep => {
152                // Must match: name:step:N
153                let parts: Vec<&str> = component_id.split(':').collect();
154                if parts.len() != 3 || parts[1] != "step" {
155                    return Err(format!(
156                        "WorkflowStep component_id must be 'workflow_name:step:N', got '{}'",
157                        component_id
158                    ));
159                }
160                // Validate N is a number
161                if parts[2].parse::<usize>().is_err() {
162                    return Err(format!(
163                        "WorkflowStep index must be a number, got '{}'",
164                        parts[2]
165                    ));
166                }
167                Ok(())
168            }
169            EventScope::Agent => {
170                // Simple name, no special format required
171                Ok(())
172            }
173            EventScope::LlmRequest => {
174                // Must match: agent_name:llm:N
175                let parts: Vec<&str> = component_id.split(':').collect();
176                if parts.len() != 3 || parts[1] != "llm" {
177                    return Err(format!(
178                        "LlmRequest component_id must be 'agent_name:llm:N', got '{}'",
179                        component_id
180                    ));
181                }
182                // Validate N is a number
183                if parts[2].parse::<usize>().is_err() {
184                    return Err(format!(
185                        "LlmRequest iteration must be a number, got '{}'",
186                        parts[2]
187                    ));
188                }
189                Ok(())
190            }
191            EventScope::Tool => {
192                // tool_name or tool_name:N
193                // No strict format required, but validate not empty
194                Ok(())
195            }
196            EventScope::System => {
197                // Must start with 'system:'
198                if !component_id.starts_with("system:") {
199                    return Err(format!(
200                        "System component_id must start with 'system:', got '{}'",
201                        component_id
202                    ));
203                }
204                Ok(())
205            }
206        }
207    }
208}
209
/// Event stream with broadcast capability for real-time subscribers.
///
/// Every appended event is stored in an in-memory history (for replay via
/// `from_offset` / `all`) and sent on a Tokio broadcast channel (for live
/// consumers obtained through `subscribe`). The history and the offset
/// counter sit behind `Arc`s, so clones of a stream share both.
pub struct EventStream {
    /// Broadcast sender for real-time event streaming
    sender: broadcast::Sender<Event>,

    /// Historical events for replay (thread-safe)
    history: Arc<RwLock<Vec<Event>>>,

    /// Next offset to assign; starts at 0
    next_offset: Arc<RwLock<EventOffset>>,
}
221
222impl EventStream {
223    /// Create a new event stream with specified channel capacity
224    pub fn new() -> Self {
225        Self::with_capacity(1000)
226    }
227
228    /// Create event stream with custom channel capacity
229    pub fn with_capacity(capacity: usize) -> Self {
230        let (sender, _) = broadcast::channel(capacity);
231
232        Self {
233            sender,
234            history: Arc::new(RwLock::new(Vec::new())),
235            next_offset: Arc::new(RwLock::new(0)),
236        }
237    }
238
    /// Append a new event (without a parent workflow) and broadcast it to all
    /// subscribers.
    ///
    /// This is `append_with_parent` with `parent_workflow_id` set to `None`.
    /// Events are emitted asynchronously in a spawned task to avoid blocking
    /// agent execution. Returns a JoinHandle that can be awaited if the caller
    /// needs to ensure the event was processed or needs the Event object.
    ///
    /// # Errors
    /// The task resolves to `Err(String)` when `component_id` is rejected for
    /// the given `scope`. If the handle is dropped without being awaited, that
    /// error is never observed.
    ///
    /// # Examples
    /// ```no_run
    /// use agent_runtime::event::{EventStream, EventScope, EventType, ComponentStatus};
    /// use serde_json::json;
    ///
    /// # async fn example() {
    /// let stream = EventStream::new();
    ///
    /// // Fire and forget (most common)
    /// stream.append(
    ///     EventScope::Agent,
    ///     EventType::Started,
    ///     "my_agent".to_string(),
    ///     ComponentStatus::Running,
    ///     "workflow_1".to_string(),
    ///     None,
    ///     json!({})
    /// );
    ///
    /// // Wait for the task if needed. `.await` yields the join result; after
    /// // `unwrap()` the value is still a `Result<Event, String>`.
    /// let result = stream.append(
    ///     EventScope::Agent,
    ///     EventType::Completed,
    ///     "my_agent".to_string(),
    ///     ComponentStatus::Completed,
    ///     "workflow_1".to_string(),
    ///     Some("Agent completed successfully".to_string()),
    ///     json!({})
    /// ).await.unwrap();
    /// # }
    /// ```
    #[allow(clippy::too_many_arguments)]
    pub fn append(
        &self,
        scope: EventScope,
        event_type: EventType,
        component_id: String,
        status: ComponentStatus,
        workflow_id: WorkflowId,
        message: Option<String>,
        data: JsonValue,
    ) -> tokio::task::JoinHandle<Result<Event, String>> {
        self.append_with_parent(
            scope,
            event_type,
            component_id,
            status,
            workflow_id,
            None,
            message,
            data,
        )
    }
298
299    /// Append event with optional parent workflow ID
300    ///
301    /// Events are emitted asynchronously to avoid blocking execution.
302    /// Returns a JoinHandle that resolves to the created Event.
303    #[allow(clippy::too_many_arguments)]
304    pub fn append_with_parent(
305        &self,
306        scope: EventScope,
307        event_type: EventType,
308        component_id: String,
309        status: ComponentStatus,
310        workflow_id: WorkflowId,
311        parent_workflow_id: Option<WorkflowId>,
312        message: Option<String>,
313        data: JsonValue,
314    ) -> tokio::task::JoinHandle<Result<Event, String>> {
315        let sender = self.sender.clone();
316        let history = self.history.clone();
317        let next_offset = self.next_offset.clone();
318
319        // Spawn async task - never blocks the caller
320        tokio::spawn(async move {
321            // Get and increment offset atomically
322            let offset = {
323                let mut next_offset = next_offset.write().unwrap();
324                let current = *next_offset;
325                *next_offset += 1;
326                current
327            };
328
329            let event = Event::with_parent(
330                offset,
331                scope,
332                event_type,
333                component_id,
334                status,
335                workflow_id,
336                parent_workflow_id,
337                message,
338                data,
339            )?;
340
341            // Store in history
342            history.write().unwrap().push(event.clone());
343
344            // Broadcast to subscribers (ignore if no active receivers)
345            let _ = sender.send(event.clone());
346
347            Ok(event)
348        })
349    }
350
351    // Helper methods for common event patterns
352
353    /// Emit Agent::Started event
354    pub fn agent_started(
355        &self,
356        agent_name: &str,
357        workflow_id: WorkflowId,
358        data: JsonValue,
359    ) -> tokio::task::JoinHandle<Result<Event, String>> {
360        self.append(
361            EventScope::Agent,
362            EventType::Started,
363            agent_name.to_string(),
364            ComponentStatus::Running,
365            workflow_id,
366            None,
367            data,
368        )
369    }
370
371    /// Emit Agent::Completed event
372    pub fn agent_completed(
373        &self,
374        agent_name: &str,
375        workflow_id: WorkflowId,
376        message: Option<String>,
377        data: JsonValue,
378    ) -> tokio::task::JoinHandle<Result<Event, String>> {
379        self.append(
380            EventScope::Agent,
381            EventType::Completed,
382            agent_name.to_string(),
383            ComponentStatus::Completed,
384            workflow_id,
385            message,
386            data,
387        )
388    }
389
390    /// Emit Agent::Failed event
391    pub fn agent_failed(
392        &self,
393        agent_name: &str,
394        workflow_id: WorkflowId,
395        error: &str,
396        data: JsonValue,
397    ) -> tokio::task::JoinHandle<Result<Event, String>> {
398        self.append(
399            EventScope::Agent,
400            EventType::Failed,
401            agent_name.to_string(),
402            ComponentStatus::Failed,
403            workflow_id,
404            Some(error.to_string()),
405            data,
406        )
407    }
408
409    /// Emit LlmRequest::Started event
410    pub fn llm_started(
411        &self,
412        agent_name: &str,
413        iteration: usize,
414        workflow_id: WorkflowId,
415        data: JsonValue,
416    ) -> tokio::task::JoinHandle<Result<Event, String>> {
417        self.append(
418            EventScope::LlmRequest,
419            EventType::Started,
420            format!("{}:llm:{}", agent_name, iteration),
421            ComponentStatus::Running,
422            workflow_id,
423            None,
424            data,
425        )
426    }
427
428    /// Emit LlmRequest::Progress event (streaming chunk)
429    pub fn llm_progress(
430        &self,
431        agent_name: &str,
432        iteration: usize,
433        workflow_id: WorkflowId,
434        chunk: String,
435    ) -> tokio::task::JoinHandle<Result<Event, String>> {
436        self.append(
437            EventScope::LlmRequest,
438            EventType::Progress,
439            format!("{}:llm:{}", agent_name, iteration),
440            ComponentStatus::Running,
441            workflow_id,
442            None,
443            serde_json::json!({ "chunk": chunk }),
444        )
445    }
446
447    /// Emit LlmRequest::Completed event
448    pub fn llm_completed(
449        &self,
450        agent_name: &str,
451        iteration: usize,
452        workflow_id: WorkflowId,
453        data: JsonValue,
454    ) -> tokio::task::JoinHandle<Result<Event, String>> {
455        self.append(
456            EventScope::LlmRequest,
457            EventType::Completed,
458            format!("{}:llm:{}", agent_name, iteration),
459            ComponentStatus::Completed,
460            workflow_id,
461            None,
462            data,
463        )
464    }
465
466    /// Emit LlmRequest::Failed event
467    pub fn llm_failed(
468        &self,
469        agent_name: &str,
470        iteration: usize,
471        workflow_id: WorkflowId,
472        error: &str,
473    ) -> tokio::task::JoinHandle<Result<Event, String>> {
474        self.append(
475            EventScope::LlmRequest,
476            EventType::Failed,
477            format!("{}:llm:{}", agent_name, iteration),
478            ComponentStatus::Failed,
479            workflow_id,
480            Some(error.to_string()),
481            serde_json::json!({}),
482        )
483    }
484
485    /// Emit Tool::Started event
486    pub fn tool_started(
487        &self,
488        tool_name: &str,
489        workflow_id: WorkflowId,
490        data: JsonValue,
491    ) -> tokio::task::JoinHandle<Result<Event, String>> {
492        self.append(
493            EventScope::Tool,
494            EventType::Started,
495            tool_name.to_string(),
496            ComponentStatus::Running,
497            workflow_id,
498            None,
499            data,
500        )
501    }
502
503    /// Emit Tool::Progress event
504    pub fn tool_progress(
505        &self,
506        tool_name: &str,
507        workflow_id: WorkflowId,
508        message: &str,
509        percent: Option<u8>,
510    ) -> tokio::task::JoinHandle<Result<Event, String>> {
511        self.append(
512            EventScope::Tool,
513            EventType::Progress,
514            tool_name.to_string(),
515            ComponentStatus::Running,
516            workflow_id,
517            Some(message.to_string()),
518            serde_json::json!({ "percent": percent }),
519        )
520    }
521
522    /// Emit Tool::Completed event
523    pub fn tool_completed(
524        &self,
525        tool_name: &str,
526        workflow_id: WorkflowId,
527        data: JsonValue,
528    ) -> tokio::task::JoinHandle<Result<Event, String>> {
529        self.append(
530            EventScope::Tool,
531            EventType::Completed,
532            tool_name.to_string(),
533            ComponentStatus::Completed,
534            workflow_id,
535            None,
536            data,
537        )
538    }
539
540    /// Emit Tool::Failed event
541    pub fn tool_failed(
542        &self,
543        tool_name: &str,
544        workflow_id: WorkflowId,
545        error: &str,
546        data: JsonValue,
547    ) -> tokio::task::JoinHandle<Result<Event, String>> {
548        self.append(
549            EventScope::Tool,
550            EventType::Failed,
551            tool_name.to_string(),
552            ComponentStatus::Failed,
553            workflow_id,
554            Some(error.to_string()),
555            data,
556        )
557    }
558
559    /// Emit Workflow::Started event
560    pub fn workflow_started(
561        &self,
562        workflow_name: &str,
563        data: JsonValue,
564    ) -> tokio::task::JoinHandle<Result<Event, String>> {
565        self.append(
566            EventScope::Workflow,
567            EventType::Started,
568            workflow_name.to_string(),
569            ComponentStatus::Running,
570            workflow_name.to_string(),
571            None,
572            data,
573        )
574    }
575
576    /// Emit Workflow::Completed event
577    pub fn workflow_completed(
578        &self,
579        workflow_name: &str,
580        data: JsonValue,
581    ) -> tokio::task::JoinHandle<Result<Event, String>> {
582        self.append(
583            EventScope::Workflow,
584            EventType::Completed,
585            workflow_name.to_string(),
586            ComponentStatus::Completed,
587            workflow_name.to_string(),
588            None,
589            data,
590        )
591    }
592
593    /// Emit Workflow::Failed event
594    pub fn workflow_failed(
595        &self,
596        workflow_name: &str,
597        error: &str,
598        data: JsonValue,
599    ) -> tokio::task::JoinHandle<Result<Event, String>> {
600        self.append(
601            EventScope::Workflow,
602            EventType::Failed,
603            workflow_name.to_string(),
604            ComponentStatus::Failed,
605            workflow_name.to_string(),
606            Some(error.to_string()),
607            data,
608        )
609    }
610
611    /// Emit WorkflowStep::Started event
612    pub fn step_started(
613        &self,
614        workflow_name: &str,
615        step_index: usize,
616        data: JsonValue,
617    ) -> tokio::task::JoinHandle<Result<Event, String>> {
618        self.append(
619            EventScope::WorkflowStep,
620            EventType::Started,
621            format!("{}:step:{}", workflow_name, step_index),
622            ComponentStatus::Running,
623            workflow_name.to_string(),
624            None,
625            data,
626        )
627    }
628
629    /// Emit WorkflowStep::Completed event
630    pub fn step_completed(
631        &self,
632        workflow_name: &str,
633        step_index: usize,
634        data: JsonValue,
635    ) -> tokio::task::JoinHandle<Result<Event, String>> {
636        self.append(
637            EventScope::WorkflowStep,
638            EventType::Completed,
639            format!("{}:step:{}", workflow_name, step_index),
640            ComponentStatus::Completed,
641            workflow_name.to_string(),
642            None,
643            data,
644        )
645    }
646
647    /// Emit WorkflowStep::Failed event
648    pub fn step_failed(
649        &self,
650        workflow_name: &str,
651        step_index: usize,
652        error: &str,
653        data: JsonValue,
654    ) -> tokio::task::JoinHandle<Result<Event, String>> {
655        self.append(
656            EventScope::WorkflowStep,
657            EventType::Failed,
658            format!("{}:step:{}", workflow_name, step_index),
659            ComponentStatus::Failed,
660            workflow_name.to_string(),
661            Some(error.to_string()),
662            data,
663        )
664    }
665
    /// Subscribe to real-time event stream
    /// Returns a receiver that will get all future events
    ///
    /// Events appended before this call are not delivered to the new
    /// receiver; fetch those with `from_offset` or `all`.
    ///
    /// NOTE(review): tokio's broadcast channel reports a lag error to a
    /// receiver that falls more than the channel capacity behind - confirm
    /// consumers handle that case.
    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
        self.sender.subscribe()
    }
671
672    /// Get events from a specific offset (for replay)
673    pub fn from_offset(&self, offset: EventOffset) -> Vec<Event> {
674        let history = self.history.read().unwrap();
675        history
676            .iter()
677            .filter(|e| e.offset >= offset)
678            .cloned()
679            .collect()
680    }
681
682    /// Get all events
683    pub fn all(&self) -> Vec<Event> {
684        self.history.read().unwrap().clone()
685    }
686
687    /// Get event count
688    pub fn len(&self) -> usize {
689        self.history.read().unwrap().len()
690    }
691
692    pub fn is_empty(&self) -> bool {
693        self.history.read().unwrap().is_empty()
694    }
695
696    /// Get the current offset (next event will have this offset)
697    pub fn current_offset(&self) -> EventOffset {
698        *self.next_offset.read().unwrap()
699    }
700}
701
/// `EventStream::default()` is the same as `EventStream::new()`.
impl Default for EventStream {
    fn default() -> Self {
        Self::new()
    }
}
707
708impl Clone for EventStream {
709    fn clone(&self) -> Self {
710        Self {
711            sender: self.sender.clone(),
712            history: Arc::clone(&self.history),
713            next_offset: Arc::clone(&self.next_offset),
714        }
715    }
716}