1use crate::{ExecutionResult, NodeId, NodeKind, NodeMetrics, WorkflowId, WorkflowMetadata};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::collections::HashMap;
11
12#[cfg(feature = "openapi")]
13use utoipa::ToSchema;
14
/// Identifier of a single [`ExecutionEvent`]; an alias for [`uuid::Uuid`].
pub type EventId = uuid::Uuid;
17
/// Identifier stored in [`ExecutionEvent::execution_id`]; an alias for [`uuid::Uuid`].
pub type ExecutionId = uuid::Uuid;
20
/// One event in the history of a workflow execution.
///
/// Every event carries the ids of the execution and workflow it belongs to,
/// an optional node id, a UTC timestamp, a kind tag ([`EventType`]) and a
/// kind-specific payload ([`EventDetails`]). The constructors on this type
/// always set `event_type` and `details` to same-named variants; nothing in
/// the type itself enforces that pairing for hand-built or deserialized values.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct ExecutionEvent {
    /// Identifier of this event; the constructors generate a fresh v4 UUID.
    #[cfg_attr(feature = "openapi", schema(value_type = uuid::Uuid))]
    pub id: EventId,
    /// Identifier of the execution this event belongs to.
    #[cfg_attr(feature = "openapi", schema(value_type = uuid::Uuid))]
    pub execution_id: ExecutionId,
    /// Identifier of the workflow this event belongs to.
    #[cfg_attr(feature = "openapi", schema(value_type = uuid::Uuid))]
    pub workflow_id: WorkflowId,
    /// Node the event refers to, if any. The workflow-level, checkpoint and
    /// resume constructors leave this as `None`.
    #[cfg_attr(feature = "openapi", schema(value_type = Option<uuid::Uuid>))]
    pub node_id: Option<NodeId>,
    /// UTC time of the event; the constructors use `Utc::now()` at creation.
    pub timestamp: DateTime<Utc>,
    /// Kind of the event.
    pub event_type: EventType,
    /// Kind-specific payload.
    pub details: EventDetails,
}
44
/// Tag naming the kind of an [`ExecutionEvent`].
///
/// Each variant has a same-named [`EventDetails`] variant, and each
/// `ExecutionEvent` constructor sets the two together.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub enum EventType {
    /// Set by [`ExecutionEvent::workflow_started`].
    WorkflowStarted,
    /// Set by [`ExecutionEvent::workflow_completed`].
    WorkflowCompleted,
    /// Set by [`ExecutionEvent::workflow_failed`].
    WorkflowFailed,
    /// Set by [`ExecutionEvent::workflow_cancelled`].
    WorkflowCancelled,
    /// Set by [`ExecutionEvent::node_started`].
    NodeStarted,
    /// Set by [`ExecutionEvent::node_completed`].
    NodeCompleted,
    /// Set by [`ExecutionEvent::node_failed`].
    NodeFailed,
    /// Set by [`ExecutionEvent::node_skipped`].
    NodeSkipped,
    /// Set by [`ExecutionEvent::variable_changed`].
    VariableChanged,
    /// Set by [`ExecutionEvent::error_occurred`].
    ErrorOccurred,
    /// Set by [`ExecutionEvent::checkpoint_created`].
    CheckpointCreated,
    /// Set by [`ExecutionEvent::execution_resumed`].
    ExecutionResumed,
}
74
/// Kind-specific payload of an [`ExecutionEvent`].
///
/// Serialized in serde's internally tagged form: the variant name is written
/// to a `"type"` field next to the variant's own fields. Map fields marked
/// `#[serde(default)]` become empty when missing on input, and `Option`
/// fields marked `skip_serializing_if` are left out of the output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
#[serde(tag = "type")]
pub enum EventDetails {
    /// Payload built by [`ExecutionEvent::workflow_started`].
    WorkflowStarted {
        /// Metadata of the workflow.
        metadata: WorkflowMetadata,
        /// Caller-supplied input values keyed by name; empty if absent on input.
        #[serde(default)]
        input: HashMap<String, Value>,
    },
    /// Payload built by [`ExecutionEvent::workflow_completed`].
    WorkflowCompleted {
        /// Caller-supplied duration in milliseconds.
        duration_ms: u64,
        /// Caller-supplied execution result.
        result: ExecutionResult,
    },
    /// Payload built by [`ExecutionEvent::workflow_failed`].
    WorkflowFailed {
        /// Error message.
        error: String,
        /// Caller-supplied duration in milliseconds.
        duration_ms: u64,
        /// Optional stack trace; omitted from serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        stack_trace: Option<String>,
    },
    /// Payload built by [`ExecutionEvent::workflow_cancelled`].
    WorkflowCancelled {
        /// Caller-supplied cancellation reason.
        reason: String,
        /// Caller-supplied duration in milliseconds.
        duration_ms: u64,
    },
    /// Payload built by [`ExecutionEvent::node_started`].
    NodeStarted {
        /// Kind of the node.
        node_kind: NodeKind,
        /// Caller-supplied input values keyed by name; empty if absent on input.
        #[serde(default)]
        input: HashMap<String, Value>,
    },
    /// Payload built by [`ExecutionEvent::node_completed`].
    NodeCompleted {
        /// Kind of the node.
        node_kind: NodeKind,
        /// Caller-supplied duration in milliseconds.
        duration_ms: u64,
        /// Caller-supplied metrics for the node.
        metrics: NodeMetrics,
        /// Caller-supplied output values keyed by name; empty if absent on input.
        #[serde(default)]
        output: HashMap<String, Value>,
    },
    /// Payload built by [`ExecutionEvent::node_failed`].
    NodeFailed {
        /// Kind of the node.
        node_kind: NodeKind,
        /// Error message.
        error: String,
        /// Optional stack trace; omitted from serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        stack_trace: Option<String>,
        /// Caller-supplied retry attempt number (its base is not defined in this file).
        retry_attempt: u32,
    },
    /// Payload built by [`ExecutionEvent::node_skipped`].
    NodeSkipped {
        /// Kind of the node.
        node_kind: NodeKind,
        /// Caller-supplied reason for skipping.
        reason: String,
    },
    /// Payload built by [`ExecutionEvent::variable_changed`].
    VariableChanged {
        /// Name of the variable.
        variable_name: String,
        /// Previous value, if any; omitted from serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        old_value: Option<Value>,
        /// New value.
        new_value: Value,
        /// Caller-supplied description of where the change came from
        /// (free-form; its format is not defined in this file).
        source: String,
    },
    /// Payload built by [`ExecutionEvent::error_occurred`].
    ErrorOccurred {
        /// Error message.
        error: String,
        /// Optional stack trace; omitted from serialized output when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        stack_trace: Option<String>,
        /// Caller-supplied context values keyed by name; empty if absent on input.
        #[serde(default)]
        context: HashMap<String, Value>,
    },
    /// Payload built by [`ExecutionEvent::checkpoint_created`].
    CheckpointCreated {
        /// Caller-supplied checkpoint identifier.
        checkpoint_id: String,
        /// Caller-supplied count of completed nodes.
        nodes_completed: usize,
        /// Caller-supplied state string (its format is not defined in this file).
        state: String,
    },
    /// Payload built by [`ExecutionEvent::execution_resumed`].
    ExecutionResumed {
        /// Caller-supplied checkpoint identifier.
        checkpoint_id: String,
        /// Caller-supplied count of nodes to skip.
        nodes_to_skip: usize,
    },
}
191
192impl ExecutionEvent {
193 pub fn workflow_started(
195 execution_id: ExecutionId,
196 workflow_id: WorkflowId,
197 metadata: WorkflowMetadata,
198 input: HashMap<String, Value>,
199 ) -> Self {
200 Self {
201 id: EventId::new_v4(),
202 execution_id,
203 workflow_id,
204 node_id: None,
205 timestamp: Utc::now(),
206 event_type: EventType::WorkflowStarted,
207 details: EventDetails::WorkflowStarted { metadata, input },
208 }
209 }
210
211 pub fn workflow_completed(
213 execution_id: ExecutionId,
214 workflow_id: WorkflowId,
215 duration_ms: u64,
216 result: ExecutionResult,
217 ) -> Self {
218 Self {
219 id: EventId::new_v4(),
220 execution_id,
221 workflow_id,
222 node_id: None,
223 timestamp: Utc::now(),
224 event_type: EventType::WorkflowCompleted,
225 details: EventDetails::WorkflowCompleted {
226 duration_ms,
227 result,
228 },
229 }
230 }
231
232 pub fn workflow_failed(
234 execution_id: ExecutionId,
235 workflow_id: WorkflowId,
236 duration_ms: u64,
237 error: String,
238 stack_trace: Option<String>,
239 ) -> Self {
240 Self {
241 id: EventId::new_v4(),
242 execution_id,
243 workflow_id,
244 node_id: None,
245 timestamp: Utc::now(),
246 event_type: EventType::WorkflowFailed,
247 details: EventDetails::WorkflowFailed {
248 error,
249 duration_ms,
250 stack_trace,
251 },
252 }
253 }
254
255 pub fn workflow_cancelled(
257 execution_id: ExecutionId,
258 workflow_id: WorkflowId,
259 duration_ms: u64,
260 reason: String,
261 ) -> Self {
262 Self {
263 id: EventId::new_v4(),
264 execution_id,
265 workflow_id,
266 node_id: None,
267 timestamp: Utc::now(),
268 event_type: EventType::WorkflowCancelled,
269 details: EventDetails::WorkflowCancelled {
270 reason,
271 duration_ms,
272 },
273 }
274 }
275
276 pub fn node_started(
278 execution_id: ExecutionId,
279 workflow_id: WorkflowId,
280 node_id: NodeId,
281 node_kind: NodeKind,
282 input: HashMap<String, Value>,
283 ) -> Self {
284 Self {
285 id: EventId::new_v4(),
286 execution_id,
287 workflow_id,
288 node_id: Some(node_id),
289 timestamp: Utc::now(),
290 event_type: EventType::NodeStarted,
291 details: EventDetails::NodeStarted { node_kind, input },
292 }
293 }
294
295 pub fn node_completed(
297 execution_id: ExecutionId,
298 workflow_id: WorkflowId,
299 node_id: NodeId,
300 node_kind: NodeKind,
301 duration_ms: u64,
302 metrics: NodeMetrics,
303 output: HashMap<String, Value>,
304 ) -> Self {
305 Self {
306 id: EventId::new_v4(),
307 execution_id,
308 workflow_id,
309 node_id: Some(node_id),
310 timestamp: Utc::now(),
311 event_type: EventType::NodeCompleted,
312 details: EventDetails::NodeCompleted {
313 node_kind,
314 duration_ms,
315 metrics,
316 output,
317 },
318 }
319 }
320
321 pub fn node_failed(
323 execution_id: ExecutionId,
324 workflow_id: WorkflowId,
325 node_id: NodeId,
326 node_kind: NodeKind,
327 error: String,
328 stack_trace: Option<String>,
329 retry_attempt: u32,
330 ) -> Self {
331 Self {
332 id: EventId::new_v4(),
333 execution_id,
334 workflow_id,
335 node_id: Some(node_id),
336 timestamp: Utc::now(),
337 event_type: EventType::NodeFailed,
338 details: EventDetails::NodeFailed {
339 node_kind,
340 error,
341 stack_trace,
342 retry_attempt,
343 },
344 }
345 }
346
347 pub fn node_skipped(
349 execution_id: ExecutionId,
350 workflow_id: WorkflowId,
351 node_id: NodeId,
352 node_kind: NodeKind,
353 reason: String,
354 ) -> Self {
355 Self {
356 id: EventId::new_v4(),
357 execution_id,
358 workflow_id,
359 node_id: Some(node_id),
360 timestamp: Utc::now(),
361 event_type: EventType::NodeSkipped,
362 details: EventDetails::NodeSkipped { node_kind, reason },
363 }
364 }
365
366 pub fn variable_changed(
368 execution_id: ExecutionId,
369 workflow_id: WorkflowId,
370 node_id: Option<NodeId>,
371 variable_name: String,
372 old_value: Option<Value>,
373 new_value: Value,
374 source: String,
375 ) -> Self {
376 Self {
377 id: EventId::new_v4(),
378 execution_id,
379 workflow_id,
380 node_id,
381 timestamp: Utc::now(),
382 event_type: EventType::VariableChanged,
383 details: EventDetails::VariableChanged {
384 variable_name,
385 old_value,
386 new_value,
387 source,
388 },
389 }
390 }
391
392 pub fn error_occurred(
394 execution_id: ExecutionId,
395 workflow_id: WorkflowId,
396 node_id: Option<NodeId>,
397 error: String,
398 stack_trace: Option<String>,
399 context: HashMap<String, Value>,
400 ) -> Self {
401 Self {
402 id: EventId::new_v4(),
403 execution_id,
404 workflow_id,
405 node_id,
406 timestamp: Utc::now(),
407 event_type: EventType::ErrorOccurred,
408 details: EventDetails::ErrorOccurred {
409 error,
410 stack_trace,
411 context,
412 },
413 }
414 }
415
416 pub fn checkpoint_created(
418 execution_id: ExecutionId,
419 workflow_id: WorkflowId,
420 checkpoint_id: String,
421 nodes_completed: usize,
422 state: String,
423 ) -> Self {
424 Self {
425 id: EventId::new_v4(),
426 execution_id,
427 workflow_id,
428 node_id: None,
429 timestamp: Utc::now(),
430 event_type: EventType::CheckpointCreated,
431 details: EventDetails::CheckpointCreated {
432 checkpoint_id,
433 nodes_completed,
434 state,
435 },
436 }
437 }
438
439 pub fn execution_resumed(
441 execution_id: ExecutionId,
442 workflow_id: WorkflowId,
443 checkpoint_id: String,
444 nodes_to_skip: usize,
445 ) -> Self {
446 Self {
447 id: EventId::new_v4(),
448 execution_id,
449 workflow_id,
450 node_id: None,
451 timestamp: Utc::now(),
452 event_type: EventType::ExecutionResumed,
453 details: EventDetails::ExecutionResumed {
454 checkpoint_id,
455 nodes_to_skip,
456 },
457 }
458 }
459}
460
/// A list of [`ExecutionEvent`]s with helpers for filtering and summarizing.
///
/// Events are kept in the order they were pushed (or deserialized); the type
/// does not sort them by timestamp.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct EventTimeline {
    /// The recorded events, in insertion order.
    pub events: Vec<ExecutionEvent>,
}
468
469impl EventTimeline {
470 pub fn new() -> Self {
472 Self { events: Vec::new() }
473 }
474
475 pub fn push(&mut self, event: ExecutionEvent) {
477 self.events.push(event);
478 }
479
480 pub fn filter_by_type(&self, event_type: EventType) -> Vec<&ExecutionEvent> {
482 self.events
483 .iter()
484 .filter(|e| e.event_type == event_type)
485 .collect()
486 }
487
488 pub fn filter_by_node(&self, node_id: NodeId) -> Vec<&ExecutionEvent> {
490 self.events
491 .iter()
492 .filter(|e| e.node_id == Some(node_id))
493 .collect()
494 }
495
496 pub fn filter_by_time_range(
498 &self,
499 start: DateTime<Utc>,
500 end: DateTime<Utc>,
501 ) -> Vec<&ExecutionEvent> {
502 self.events
503 .iter()
504 .filter(|e| e.timestamp >= start && e.timestamp <= end)
505 .collect()
506 }
507
508 pub fn total_duration_ms(&self) -> Option<u64> {
510 let start = self.events.first()?.timestamp;
511 let end = self.events.last()?.timestamp;
512 Some((end - start).num_milliseconds() as u64)
513 }
514
515 pub fn count_by_type(&self, event_type: EventType) -> usize {
517 self.events
518 .iter()
519 .filter(|e| e.event_type == event_type)
520 .count()
521 }
522
523 pub fn errors(&self) -> Vec<&ExecutionEvent> {
525 self.events
526 .iter()
527 .filter(|e| {
528 matches!(
529 e.event_type,
530 EventType::NodeFailed | EventType::WorkflowFailed | EventType::ErrorOccurred
531 )
532 })
533 .collect()
534 }
535
536 pub fn is_successful(&self) -> bool {
538 self.events
539 .iter()
540 .any(|e| e.event_type == EventType::WorkflowCompleted)
541 }
542
543 pub fn is_failed(&self) -> bool {
545 self.events
546 .iter()
547 .any(|e| e.event_type == EventType::WorkflowFailed)
548 }
549}
550
#[cfg(test)]
mod tests {
    use super::*;

    /// `workflow_started` passes the ids through, tags the event as
    /// `WorkflowStarted` and leaves the node id unset.
    #[test]
    fn test_workflow_started_event() {
        let execution_id = ExecutionId::new_v4();
        let workflow_id = WorkflowId::new_v4();
        let metadata = WorkflowMetadata::new("test-workflow".to_string());

        // `metadata` is not used again, so it is moved rather than cloned.
        let event = ExecutionEvent::workflow_started(
            execution_id,
            workflow_id,
            metadata,
            HashMap::new(),
        );

        assert_eq!(event.execution_id, execution_id);
        assert_eq!(event.workflow_id, workflow_id);
        assert_eq!(event.event_type, EventType::WorkflowStarted);
        assert!(event.node_id.is_none());
    }

    /// Node-level constructors set the matching event type and record the
    /// node id.
    #[test]
    fn test_node_events() {
        let execution_id = ExecutionId::new_v4();
        let workflow_id = WorkflowId::new_v4();
        let node_id = NodeId::new_v4();

        let started = ExecutionEvent::node_started(
            execution_id,
            workflow_id,
            node_id,
            NodeKind::Start,
            HashMap::new(),
        );
        assert_eq!(started.event_type, EventType::NodeStarted);
        assert_eq!(started.node_id, Some(node_id));

        let metrics = NodeMetrics::default();
        let completed = ExecutionEvent::node_completed(
            execution_id,
            workflow_id,
            node_id,
            NodeKind::Start,
            100,
            metrics,
            HashMap::new(),
        );
        assert_eq!(completed.event_type, EventType::NodeCompleted);
    }

    /// Pushing an event makes it visible through `events` and `count_by_type`.
    #[test]
    fn test_event_timeline() {
        let mut timeline = EventTimeline::new();
        let execution_id = ExecutionId::new_v4();
        let workflow_id = WorkflowId::new_v4();

        let metadata = WorkflowMetadata::new("test".to_string());

        timeline.push(ExecutionEvent::workflow_started(
            execution_id,
            workflow_id,
            metadata,
            HashMap::new(),
        ));

        assert_eq!(timeline.events.len(), 1);
        assert_eq!(timeline.count_by_type(EventType::WorkflowStarted), 1);
    }

    /// `filter_by_node` returns every event for the node; `errors` returns
    /// only the failure event.
    #[test]
    fn test_timeline_filtering() {
        let mut timeline = EventTimeline::new();
        let execution_id = ExecutionId::new_v4();
        let workflow_id = WorkflowId::new_v4();
        let node_id = NodeId::new_v4();

        timeline.push(ExecutionEvent::node_started(
            execution_id,
            workflow_id,
            node_id,
            NodeKind::Start,
            HashMap::new(),
        ));

        timeline.push(ExecutionEvent::node_failed(
            execution_id,
            workflow_id,
            node_id,
            NodeKind::Start,
            "Test error".to_string(),
            None,
            0,
        ));

        let node_events = timeline.filter_by_node(node_id);
        assert_eq!(node_events.len(), 2);

        let errors = timeline.errors();
        assert_eq!(errors.len(), 1);
    }

    /// `variable_changed` tags the event correctly and stores the variable
    /// name in the details payload.
    #[test]
    fn test_variable_changed_event() {
        let execution_id = ExecutionId::new_v4();
        let workflow_id = WorkflowId::new_v4();
        let node_id = NodeId::new_v4();

        let event = ExecutionEvent::variable_changed(
            execution_id,
            workflow_id,
            Some(node_id),
            "counter".to_string(),
            Some(Value::from(0)),
            Value::from(1),
            node_id.to_string(),
        );

        assert_eq!(event.event_type, EventType::VariableChanged);
        match &event.details {
            EventDetails::VariableChanged { variable_name, .. } => {
                assert_eq!(variable_name, "counter");
            }
            _ => panic!("Expected VariableChanged event details"),
        }
    }

    /// A timeline is neither successful nor failed until a terminal event is
    /// pushed; a `WorkflowCompleted` event makes it successful only.
    #[test]
    fn test_timeline_success_check() {
        let mut timeline = EventTimeline::new();
        let execution_id = ExecutionId::new_v4();
        let workflow_id = WorkflowId::new_v4();

        let metadata = WorkflowMetadata::new("test".to_string());

        timeline.push(ExecutionEvent::workflow_started(
            execution_id,
            workflow_id,
            metadata,
            HashMap::new(),
        ));

        assert!(!timeline.is_successful());
        assert!(!timeline.is_failed());

        let result = ExecutionResult::Success(Value::Null);
        timeline.push(ExecutionEvent::workflow_completed(
            execution_id,
            workflow_id,
            1000,
            result,
        ));

        assert!(timeline.is_successful());
        assert!(!timeline.is_failed());
    }
}