oxify_model/
event.rs

1//! Execution event tracking for workflows
2//!
3//! This module provides detailed event tracking for workflow execution,
4//! enabling debugging, monitoring, and audit trails.
5
6use crate::{ExecutionResult, NodeId, NodeKind, NodeMetrics, WorkflowId, WorkflowMetadata};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::collections::HashMap;
11
12#[cfg(feature = "openapi")]
13use utoipa::ToSchema;
14
15/// Unique identifier for an execution event
16pub type EventId = uuid::Uuid;
17
18/// Unique identifier for a workflow execution
19pub type ExecutionId = uuid::Uuid;
20
21/// Execution event tracking node-level and workflow-level activities
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[cfg_attr(feature = "openapi", derive(ToSchema))]
24pub struct ExecutionEvent {
25    /// Unique event identifier
26    #[cfg_attr(feature = "openapi", schema(value_type = uuid::Uuid))]
27    pub id: EventId,
28    /// Execution this event belongs to
29    #[cfg_attr(feature = "openapi", schema(value_type = uuid::Uuid))]
30    pub execution_id: ExecutionId,
31    /// Workflow being executed
32    #[cfg_attr(feature = "openapi", schema(value_type = uuid::Uuid))]
33    pub workflow_id: WorkflowId,
34    /// Node associated with this event (if applicable)
35    #[cfg_attr(feature = "openapi", schema(value_type = Option<uuid::Uuid>))]
36    pub node_id: Option<NodeId>,
37    /// Event timestamp
38    pub timestamp: DateTime<Utc>,
39    /// Event type classification
40    pub event_type: EventType,
41    /// Detailed event information
42    pub details: EventDetails,
43}
44
45/// Event type classification for filtering and querying
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
47#[cfg_attr(feature = "openapi", derive(ToSchema))]
48pub enum EventType {
49    /// Workflow execution started
50    WorkflowStarted,
51    /// Workflow execution completed successfully
52    WorkflowCompleted,
53    /// Workflow execution failed
54    WorkflowFailed,
55    /// Workflow execution cancelled
56    WorkflowCancelled,
57    /// Node execution started
58    NodeStarted,
59    /// Node execution completed successfully
60    NodeCompleted,
61    /// Node execution failed
62    NodeFailed,
63    /// Node execution skipped
64    NodeSkipped,
65    /// Variable value changed
66    VariableChanged,
67    /// Error occurred during execution
68    ErrorOccurred,
69    /// Checkpoint created
70    CheckpointCreated,
71    /// Execution resumed from checkpoint
72    ExecutionResumed,
73}
74
75/// Detailed event information
76#[derive(Debug, Clone, Serialize, Deserialize)]
77#[cfg_attr(feature = "openapi", derive(ToSchema))]
78#[serde(tag = "type")]
79pub enum EventDetails {
80    /// Workflow started event
81    WorkflowStarted {
82        /// Workflow metadata
83        metadata: WorkflowMetadata,
84        /// Input parameters
85        #[serde(default)]
86        input: HashMap<String, Value>,
87    },
88    /// Workflow completed successfully
89    WorkflowCompleted {
90        /// Total execution duration in milliseconds
91        duration_ms: u64,
92        /// Final execution result
93        result: ExecutionResult,
94    },
95    /// Workflow failed
96    WorkflowFailed {
97        /// Error message
98        error: String,
99        /// Total execution duration in milliseconds
100        duration_ms: u64,
101        /// Stack trace if available
102        #[serde(skip_serializing_if = "Option::is_none")]
103        stack_trace: Option<String>,
104    },
105    /// Workflow cancelled by user
106    WorkflowCancelled {
107        /// Reason for cancellation
108        reason: String,
109        /// Partial execution duration
110        duration_ms: u64,
111    },
112    /// Node started execution
113    NodeStarted {
114        /// Type of node being executed
115        node_kind: NodeKind,
116        /// Node input data
117        #[serde(default)]
118        input: HashMap<String, Value>,
119    },
120    /// Node completed successfully
121    NodeCompleted {
122        /// Type of node executed
123        node_kind: NodeKind,
124        /// Node execution duration in milliseconds
125        duration_ms: u64,
126        /// Performance metrics
127        metrics: NodeMetrics,
128        /// Node output data
129        #[serde(default)]
130        output: HashMap<String, Value>,
131    },
132    /// Node failed
133    NodeFailed {
134        /// Type of node that failed
135        node_kind: NodeKind,
136        /// Error message
137        error: String,
138        /// Stack trace if available
139        #[serde(skip_serializing_if = "Option::is_none")]
140        stack_trace: Option<String>,
141        /// Retry attempt number (0 for first attempt)
142        retry_attempt: u32,
143    },
144    /// Node skipped (e.g., due to conditional logic)
145    NodeSkipped {
146        /// Type of node skipped
147        node_kind: NodeKind,
148        /// Reason for skipping
149        reason: String,
150    },
151    /// Variable changed
152    VariableChanged {
153        /// Variable name
154        variable_name: String,
155        /// Previous value (None if newly created)
156        #[serde(skip_serializing_if = "Option::is_none")]
157        old_value: Option<Value>,
158        /// New value
159        new_value: Value,
160        /// Source of the change (node ID or "system")
161        source: String,
162    },
163    /// Error occurred
164    ErrorOccurred {
165        /// Error message
166        error: String,
167        /// Stack trace if available
168        #[serde(skip_serializing_if = "Option::is_none")]
169        stack_trace: Option<String>,
170        /// Additional error context
171        #[serde(default)]
172        context: HashMap<String, Value>,
173    },
174    /// Checkpoint created
175    CheckpointCreated {
176        /// Checkpoint identifier
177        checkpoint_id: String,
178        /// Number of nodes completed
179        nodes_completed: usize,
180        /// Current execution state
181        state: String,
182    },
183    /// Execution resumed
184    ExecutionResumed {
185        /// Checkpoint used for resumption
186        checkpoint_id: String,
187        /// Number of nodes to skip
188        nodes_to_skip: usize,
189    },
190}
191
192impl ExecutionEvent {
193    /// Create a new workflow started event
194    pub fn workflow_started(
195        execution_id: ExecutionId,
196        workflow_id: WorkflowId,
197        metadata: WorkflowMetadata,
198        input: HashMap<String, Value>,
199    ) -> Self {
200        Self {
201            id: EventId::new_v4(),
202            execution_id,
203            workflow_id,
204            node_id: None,
205            timestamp: Utc::now(),
206            event_type: EventType::WorkflowStarted,
207            details: EventDetails::WorkflowStarted { metadata, input },
208        }
209    }
210
211    /// Create a new workflow completed event
212    pub fn workflow_completed(
213        execution_id: ExecutionId,
214        workflow_id: WorkflowId,
215        duration_ms: u64,
216        result: ExecutionResult,
217    ) -> Self {
218        Self {
219            id: EventId::new_v4(),
220            execution_id,
221            workflow_id,
222            node_id: None,
223            timestamp: Utc::now(),
224            event_type: EventType::WorkflowCompleted,
225            details: EventDetails::WorkflowCompleted {
226                duration_ms,
227                result,
228            },
229        }
230    }
231
232    /// Create a new workflow failed event
233    pub fn workflow_failed(
234        execution_id: ExecutionId,
235        workflow_id: WorkflowId,
236        duration_ms: u64,
237        error: String,
238        stack_trace: Option<String>,
239    ) -> Self {
240        Self {
241            id: EventId::new_v4(),
242            execution_id,
243            workflow_id,
244            node_id: None,
245            timestamp: Utc::now(),
246            event_type: EventType::WorkflowFailed,
247            details: EventDetails::WorkflowFailed {
248                error,
249                duration_ms,
250                stack_trace,
251            },
252        }
253    }
254
255    /// Create a new workflow cancelled event
256    pub fn workflow_cancelled(
257        execution_id: ExecutionId,
258        workflow_id: WorkflowId,
259        duration_ms: u64,
260        reason: String,
261    ) -> Self {
262        Self {
263            id: EventId::new_v4(),
264            execution_id,
265            workflow_id,
266            node_id: None,
267            timestamp: Utc::now(),
268            event_type: EventType::WorkflowCancelled,
269            details: EventDetails::WorkflowCancelled {
270                reason,
271                duration_ms,
272            },
273        }
274    }
275
276    /// Create a new node started event
277    pub fn node_started(
278        execution_id: ExecutionId,
279        workflow_id: WorkflowId,
280        node_id: NodeId,
281        node_kind: NodeKind,
282        input: HashMap<String, Value>,
283    ) -> Self {
284        Self {
285            id: EventId::new_v4(),
286            execution_id,
287            workflow_id,
288            node_id: Some(node_id),
289            timestamp: Utc::now(),
290            event_type: EventType::NodeStarted,
291            details: EventDetails::NodeStarted { node_kind, input },
292        }
293    }
294
295    /// Create a new node completed event
296    pub fn node_completed(
297        execution_id: ExecutionId,
298        workflow_id: WorkflowId,
299        node_id: NodeId,
300        node_kind: NodeKind,
301        duration_ms: u64,
302        metrics: NodeMetrics,
303        output: HashMap<String, Value>,
304    ) -> Self {
305        Self {
306            id: EventId::new_v4(),
307            execution_id,
308            workflow_id,
309            node_id: Some(node_id),
310            timestamp: Utc::now(),
311            event_type: EventType::NodeCompleted,
312            details: EventDetails::NodeCompleted {
313                node_kind,
314                duration_ms,
315                metrics,
316                output,
317            },
318        }
319    }
320
321    /// Create a new node failed event
322    pub fn node_failed(
323        execution_id: ExecutionId,
324        workflow_id: WorkflowId,
325        node_id: NodeId,
326        node_kind: NodeKind,
327        error: String,
328        stack_trace: Option<String>,
329        retry_attempt: u32,
330    ) -> Self {
331        Self {
332            id: EventId::new_v4(),
333            execution_id,
334            workflow_id,
335            node_id: Some(node_id),
336            timestamp: Utc::now(),
337            event_type: EventType::NodeFailed,
338            details: EventDetails::NodeFailed {
339                node_kind,
340                error,
341                stack_trace,
342                retry_attempt,
343            },
344        }
345    }
346
347    /// Create a new node skipped event
348    pub fn node_skipped(
349        execution_id: ExecutionId,
350        workflow_id: WorkflowId,
351        node_id: NodeId,
352        node_kind: NodeKind,
353        reason: String,
354    ) -> Self {
355        Self {
356            id: EventId::new_v4(),
357            execution_id,
358            workflow_id,
359            node_id: Some(node_id),
360            timestamp: Utc::now(),
361            event_type: EventType::NodeSkipped,
362            details: EventDetails::NodeSkipped { node_kind, reason },
363        }
364    }
365
366    /// Create a new variable changed event
367    pub fn variable_changed(
368        execution_id: ExecutionId,
369        workflow_id: WorkflowId,
370        node_id: Option<NodeId>,
371        variable_name: String,
372        old_value: Option<Value>,
373        new_value: Value,
374        source: String,
375    ) -> Self {
376        Self {
377            id: EventId::new_v4(),
378            execution_id,
379            workflow_id,
380            node_id,
381            timestamp: Utc::now(),
382            event_type: EventType::VariableChanged,
383            details: EventDetails::VariableChanged {
384                variable_name,
385                old_value,
386                new_value,
387                source,
388            },
389        }
390    }
391
392    /// Create a new error occurred event
393    pub fn error_occurred(
394        execution_id: ExecutionId,
395        workflow_id: WorkflowId,
396        node_id: Option<NodeId>,
397        error: String,
398        stack_trace: Option<String>,
399        context: HashMap<String, Value>,
400    ) -> Self {
401        Self {
402            id: EventId::new_v4(),
403            execution_id,
404            workflow_id,
405            node_id,
406            timestamp: Utc::now(),
407            event_type: EventType::ErrorOccurred,
408            details: EventDetails::ErrorOccurred {
409                error,
410                stack_trace,
411                context,
412            },
413        }
414    }
415
416    /// Create a new checkpoint created event
417    pub fn checkpoint_created(
418        execution_id: ExecutionId,
419        workflow_id: WorkflowId,
420        checkpoint_id: String,
421        nodes_completed: usize,
422        state: String,
423    ) -> Self {
424        Self {
425            id: EventId::new_v4(),
426            execution_id,
427            workflow_id,
428            node_id: None,
429            timestamp: Utc::now(),
430            event_type: EventType::CheckpointCreated,
431            details: EventDetails::CheckpointCreated {
432                checkpoint_id,
433                nodes_completed,
434                state,
435            },
436        }
437    }
438
439    /// Create a new execution resumed event
440    pub fn execution_resumed(
441        execution_id: ExecutionId,
442        workflow_id: WorkflowId,
443        checkpoint_id: String,
444        nodes_to_skip: usize,
445    ) -> Self {
446        Self {
447            id: EventId::new_v4(),
448            execution_id,
449            workflow_id,
450            node_id: None,
451            timestamp: Utc::now(),
452            event_type: EventType::ExecutionResumed,
453            details: EventDetails::ExecutionResumed {
454                checkpoint_id,
455                nodes_to_skip,
456            },
457        }
458    }
459}
460
461/// Event timeline for an execution
462#[derive(Debug, Clone, Default, Serialize, Deserialize)]
463#[cfg_attr(feature = "openapi", derive(ToSchema))]
464pub struct EventTimeline {
465    /// All events in chronological order
466    pub events: Vec<ExecutionEvent>,
467}
468
469impl EventTimeline {
470    /// Create a new empty timeline
471    pub fn new() -> Self {
472        Self { events: Vec::new() }
473    }
474
475    /// Add an event to the timeline
476    pub fn push(&mut self, event: ExecutionEvent) {
477        self.events.push(event);
478    }
479
480    /// Get events by type
481    pub fn filter_by_type(&self, event_type: EventType) -> Vec<&ExecutionEvent> {
482        self.events
483            .iter()
484            .filter(|e| e.event_type == event_type)
485            .collect()
486    }
487
488    /// Get events by node
489    pub fn filter_by_node(&self, node_id: NodeId) -> Vec<&ExecutionEvent> {
490        self.events
491            .iter()
492            .filter(|e| e.node_id == Some(node_id))
493            .collect()
494    }
495
496    /// Get events in time range
497    pub fn filter_by_time_range(
498        &self,
499        start: DateTime<Utc>,
500        end: DateTime<Utc>,
501    ) -> Vec<&ExecutionEvent> {
502        self.events
503            .iter()
504            .filter(|e| e.timestamp >= start && e.timestamp <= end)
505            .collect()
506    }
507
508    /// Get total execution duration in milliseconds
509    pub fn total_duration_ms(&self) -> Option<u64> {
510        let start = self.events.first()?.timestamp;
511        let end = self.events.last()?.timestamp;
512        Some((end - start).num_milliseconds() as u64)
513    }
514
515    /// Count events by type
516    pub fn count_by_type(&self, event_type: EventType) -> usize {
517        self.events
518            .iter()
519            .filter(|e| e.event_type == event_type)
520            .count()
521    }
522
523    /// Get all error events
524    pub fn errors(&self) -> Vec<&ExecutionEvent> {
525        self.events
526            .iter()
527            .filter(|e| {
528                matches!(
529                    e.event_type,
530                    EventType::NodeFailed | EventType::WorkflowFailed | EventType::ErrorOccurred
531                )
532            })
533            .collect()
534    }
535
536    /// Check if execution was successful
537    pub fn is_successful(&self) -> bool {
538        self.events
539            .iter()
540            .any(|e| e.event_type == EventType::WorkflowCompleted)
541    }
542
543    /// Check if execution failed
544    pub fn is_failed(&self) -> bool {
545        self.events
546            .iter()
547            .any(|e| e.event_type == EventType::WorkflowFailed)
548    }
549}
550
551#[cfg(test)]
552mod tests {
553    use super::*;
554
555    #[test]
556    fn test_workflow_started_event() {
557        let execution_id = ExecutionId::new_v4();
558        let workflow_id = WorkflowId::new_v4();
559        let metadata = WorkflowMetadata::new("test-workflow".to_string());
560
561        let event = ExecutionEvent::workflow_started(
562            execution_id,
563            workflow_id,
564            metadata.clone(),
565            HashMap::new(),
566        );
567
568        assert_eq!(event.execution_id, execution_id);
569        assert_eq!(event.workflow_id, workflow_id);
570        assert_eq!(event.event_type, EventType::WorkflowStarted);
571        assert!(event.node_id.is_none());
572    }
573
574    #[test]
575    fn test_node_events() {
576        let execution_id = ExecutionId::new_v4();
577        let workflow_id = WorkflowId::new_v4();
578        let node_id = NodeId::new_v4();
579
580        let started = ExecutionEvent::node_started(
581            execution_id,
582            workflow_id,
583            node_id,
584            NodeKind::Start,
585            HashMap::new(),
586        );
587        assert_eq!(started.event_type, EventType::NodeStarted);
588        assert_eq!(started.node_id, Some(node_id));
589
590        let metrics = NodeMetrics::default();
591        let completed = ExecutionEvent::node_completed(
592            execution_id,
593            workflow_id,
594            node_id,
595            NodeKind::Start,
596            100,
597            metrics,
598            HashMap::new(),
599        );
600        assert_eq!(completed.event_type, EventType::NodeCompleted);
601    }
602
603    #[test]
604    fn test_event_timeline() {
605        let mut timeline = EventTimeline::new();
606        let execution_id = ExecutionId::new_v4();
607        let workflow_id = WorkflowId::new_v4();
608
609        let metadata = WorkflowMetadata::new("test".to_string());
610
611        timeline.push(ExecutionEvent::workflow_started(
612            execution_id,
613            workflow_id,
614            metadata,
615            HashMap::new(),
616        ));
617
618        assert_eq!(timeline.events.len(), 1);
619        assert_eq!(timeline.count_by_type(EventType::WorkflowStarted), 1);
620    }
621
622    #[test]
623    fn test_timeline_filtering() {
624        let mut timeline = EventTimeline::new();
625        let execution_id = ExecutionId::new_v4();
626        let workflow_id = WorkflowId::new_v4();
627        let node_id = NodeId::new_v4();
628
629        // Add various events
630        timeline.push(ExecutionEvent::node_started(
631            execution_id,
632            workflow_id,
633            node_id,
634            NodeKind::Start,
635            HashMap::new(),
636        ));
637
638        timeline.push(ExecutionEvent::node_failed(
639            execution_id,
640            workflow_id,
641            node_id,
642            NodeKind::Start,
643            "Test error".to_string(),
644            None,
645            0,
646        ));
647
648        let node_events = timeline.filter_by_node(node_id);
649        assert_eq!(node_events.len(), 2);
650
651        let errors = timeline.errors();
652        assert_eq!(errors.len(), 1);
653    }
654
655    #[test]
656    fn test_variable_changed_event() {
657        let execution_id = ExecutionId::new_v4();
658        let workflow_id = WorkflowId::new_v4();
659        let node_id = NodeId::new_v4();
660
661        let event = ExecutionEvent::variable_changed(
662            execution_id,
663            workflow_id,
664            Some(node_id),
665            "counter".to_string(),
666            Some(Value::from(0)),
667            Value::from(1),
668            node_id.to_string(),
669        );
670
671        assert_eq!(event.event_type, EventType::VariableChanged);
672        if let EventDetails::VariableChanged { variable_name, .. } = &event.details {
673            assert_eq!(variable_name, "counter");
674        } else {
675            panic!("Expected VariableChanged event details");
676        }
677    }
678
679    #[test]
680    fn test_timeline_success_check() {
681        let mut timeline = EventTimeline::new();
682        let execution_id = ExecutionId::new_v4();
683        let workflow_id = WorkflowId::new_v4();
684
685        let metadata = WorkflowMetadata::new("test".to_string());
686
687        timeline.push(ExecutionEvent::workflow_started(
688            execution_id,
689            workflow_id,
690            metadata,
691            HashMap::new(),
692        ));
693
694        assert!(!timeline.is_successful());
695        assert!(!timeline.is_failed());
696
697        let result = ExecutionResult::Success(Value::Null);
698        timeline.push(ExecutionEvent::workflow_completed(
699            execution_id,
700            workflow_id,
701            1000,
702            result,
703        ));
704
705        assert!(timeline.is_successful());
706        assert!(!timeline.is_failed());
707    }
708}