use crate::{EventTimeline, EventType, NodeId, WorkflowId};
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[cfg(feature = "openapi")]
use utoipa::ToSchema;

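/// Aggregated analytics for a single workflow over one reporting period,
/// combining execution counts, latency metrics, per-node breakdowns, and
/// recurring error patterns.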
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct WorkflowAnalytics {
    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
    pub workflow_id: WorkflowId,

    pub workflow_name: String,

    pub period: AnalyticsPeriod,

    pub execution_stats: ExecutionStats,

    pub performance_metrics: PerformanceMetrics,

    pub node_analytics: Vec<NodeAnalytics>,

    pub error_patterns: Vec<ErrorPattern>,

    pub updated_at: DateTime<Utc>,
}

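/// The time window covered by a set of analytics.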
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct AnalyticsPeriod {
    pub start: DateTime<Utc>,

    pub end: DateTime<Utc>,

    pub period_type: PeriodType,
}

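/// Granularity of an analytics period.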
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub enum PeriodType {
    Hourly,
    Daily,
    Weekly,
    Monthly,
    Custom,
}

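/// Counts and derived rates for workflow executions within a period.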
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct ExecutionStats {
    pub total_executions: u64,

    pub successful_executions: u64,

    pub failed_executions: u64,

    pub cancelled_executions: u64,

    pub success_rate: f64,

    pub failure_rate: f64,

    pub executions_per_hour: f64,
}

impl ExecutionStats {
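    /// Recomputes `success_rate` and `failure_rate` from the raw counts.
    /// The rates are left untouched when there are no executions, which
    /// avoids a division by zero.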
    pub fn calculate_rates(&mut self) {
        if self.total_executions > 0 {
            self.success_rate = self.successful_executions as f64 / self.total_executions as f64;
            self.failure_rate = self.failed_executions as f64 / self.total_executions as f64;
        }
    }
}

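/// Latency, token, and cost metrics across completed executions.
///
/// Token and cost fields are carried on the struct, but [`AnalyticsBuilder`]
/// does not derive them from timelines and currently leaves them zeroed.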
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct PerformanceMetrics {
    pub avg_duration_ms: f64,

    pub p50_duration_ms: u64,

    pub p95_duration_ms: u64,

    pub p99_duration_ms: u64,

    pub min_duration_ms: u64,

    pub max_duration_ms: u64,

    pub total_tokens: u64,

    pub avg_tokens: f64,

    pub total_cost_usd: f64,

    pub avg_cost_usd: f64,
}

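/// Per-node execution statistics within a workflow.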
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct NodeAnalytics {
    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
    pub node_id: NodeId,

    pub node_name: String,

    pub node_type: String,

    pub execution_count: u64,

    pub success_count: u64,

    pub failure_count: u64,

    pub avg_duration_ms: f64,

    pub max_duration_ms: u64,

    pub total_duration_ms: u64,

    pub time_percentage: f64,

    pub is_bottleneck: bool,
}

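/// A recurring error, grouped by identical error message.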
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct ErrorPattern {
    pub error_message: String,

    pub occurrence_count: u64,

    pub error_percentage: f64,

    #[cfg_attr(feature = "openapi", schema(value_type = Vec<String>))]
    pub affected_nodes: Vec<NodeId>,

    pub first_seen: DateTime<Utc>,

    pub last_seen: DateTime<Utc>,

    pub trend: ErrorTrend,
}

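/// Direction in which an error pattern is moving over time.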
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub enum ErrorTrend {
    Increasing,
    Stable,
    Decreasing,
}

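/// Builds a [`WorkflowAnalytics`] report from per-execution [`EventTimeline`]s.
///
/// A minimal usage sketch, mirroring the tests at the bottom of this module
/// (`workflow_id` and `timeline` are assumed to come from the caller):
///
/// ```ignore
/// use chrono::{Duration, Utc};
///
/// let mut builder = AnalyticsBuilder::new(
///     workflow_id,
///     "my-workflow".to_string(),
///     Utc::now() - Duration::hours(1),
///     Utc::now(),
///     PeriodType::Hourly,
/// );
/// builder.add_timeline(timeline); // one timeline per execution
/// let analytics = builder.build();
/// println!("success rate: {}", analytics.execution_stats.success_rate);
/// ```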
pub struct AnalyticsBuilder {
    workflow_id: WorkflowId,
    workflow_name: String,
    period_start: DateTime<Utc>,
    period_end: DateTime<Utc>,
    period_type: PeriodType,
    timelines: Vec<EventTimeline>,
}

impl AnalyticsBuilder {
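    /// Creates a builder for the given workflow and reporting window.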
    pub fn new(
        workflow_id: WorkflowId,
        workflow_name: String,
        period_start: DateTime<Utc>,
        period_end: DateTime<Utc>,
        period_type: PeriodType,
    ) -> Self {
        Self {
            workflow_id,
            workflow_name,
            period_start,
            period_end,
            period_type,
            timelines: Vec::new(),
        }
    }

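    /// Adds one execution's [`EventTimeline`] to the data set.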
    pub fn add_timeline(&mut self, timeline: EventTimeline) {
        self.timelines.push(timeline);
    }

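    /// Consumes the builder and computes all analytics from the collected
    /// timelines. `updated_at` is stamped with the current time.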
    pub fn build(self) -> WorkflowAnalytics {
        let execution_stats = self.calculate_execution_stats();
        let performance_metrics = self.calculate_performance_metrics();
        let node_analytics = self.calculate_node_analytics();
        let error_patterns = self.calculate_error_patterns();

        WorkflowAnalytics {
            workflow_id: self.workflow_id,
            workflow_name: self.workflow_name,
            period: AnalyticsPeriod {
                start: self.period_start,
                end: self.period_end,
                period_type: self.period_type,
            },
            execution_stats,
            performance_metrics,
            node_analytics,
            error_patterns,
            updated_at: Utc::now(),
        }
    }

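    /// Tallies total, successful, failed, and cancelled executions, and
    /// derives the per-hour throughput from the period length.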
    fn calculate_execution_stats(&self) -> ExecutionStats {
        let total_executions = self.timelines.len() as u64;
        let successful_executions =
            self.timelines.iter().filter(|t| t.is_successful()).count() as u64;
        let failed_executions = self.timelines.iter().filter(|t| t.is_failed()).count() as u64;
        let cancelled_executions = self
            .timelines
            .iter()
            .filter(|t| {
                t.events
                    .iter()
                    .any(|e| e.event_type == EventType::WorkflowCancelled)
            })
            .count() as u64;

        // Use minutes for sub-hour precision; `num_hours()` truncates and
        // would report a zero rate for any period shorter than an hour.
        let period_hours = (self.period_end - self.period_start).num_minutes() as f64 / 60.0;
        let executions_per_hour = if period_hours > 0.0 {
            total_executions as f64 / period_hours
        } else {
            0.0
        };

        let mut stats = ExecutionStats {
            total_executions,
            successful_executions,
            failed_executions,
            cancelled_executions,
            success_rate: 0.0,
            failure_rate: 0.0,
            executions_per_hour,
        };

        stats.calculate_rates();
        stats
    }

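    /// Computes duration statistics over executions that emitted a
    /// `WorkflowCompleted` event. Percentiles use a simple index into the
    /// sorted durations (`len * q`), which is adequate for reporting but is
    /// not an interpolated quantile.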
    fn calculate_performance_metrics(&self) -> PerformanceMetrics {
        let mut durations: Vec<u64> = self
            .timelines
            .iter()
            .filter_map(|timeline| {
                timeline.events.iter().find_map(|event| {
                    if let crate::EventDetails::WorkflowCompleted { duration_ms, .. } =
                        &event.details
                    {
                        Some(*duration_ms)
                    } else {
                        None
                    }
                })
            })
            .collect();

        if durations.is_empty() {
            return PerformanceMetrics::default();
        }

        durations.sort_unstable();

        let avg_duration_ms = durations.iter().sum::<u64>() as f64 / durations.len() as f64;
        let min_duration_ms = *durations.first().unwrap_or(&0);
        let max_duration_ms = *durations.last().unwrap_or(&0);

        let p50_idx = (durations.len() as f64 * 0.50) as usize;
        let p95_idx = (durations.len() as f64 * 0.95) as usize;
        let p99_idx = (durations.len() as f64 * 0.99) as usize;

        let p50_duration_ms = durations.get(p50_idx).copied().unwrap_or(0);
        let p95_duration_ms = durations.get(p95_idx).copied().unwrap_or(0);
        let p99_duration_ms = durations.get(p99_idx).copied().unwrap_or(0);

        PerformanceMetrics {
            avg_duration_ms,
            p50_duration_ms,
            p95_duration_ms,
            p99_duration_ms,
            min_duration_ms,
            max_duration_ms,
            total_tokens: 0,
            avg_tokens: 0.0,
            total_cost_usd: 0.0,
            avg_cost_usd: 0.0,
        }
    }

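    /// Aggregates per-node counts and durations, flags the node with the
    /// largest total duration as the bottleneck, and sorts nodes by total
    /// time descending. Node names and types are filled with placeholders,
    /// since the events processed here do not carry them.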
    fn calculate_node_analytics(&self) -> Vec<NodeAnalytics> {
        let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::new();

        for timeline in &self.timelines {
            for event in &timeline.events {
                if let Some(node_id) = event.node_id {
                    let stats = node_stats.entry(node_id).or_default();

                    match event.event_type {
                        EventType::NodeStarted => {
                            stats.execution_count += 1;
                        }
                        EventType::NodeCompleted => {
                            stats.success_count += 1;
                            if let crate::EventDetails::NodeCompleted { duration_ms, .. } =
                                &event.details
                            {
                                stats.total_duration_ms += duration_ms;
                                stats.max_duration_ms = stats.max_duration_ms.max(*duration_ms);
                            }
                        }
                        EventType::NodeFailed => {
                            stats.failure_count += 1;
                        }
                        _ => {}
                    }
                }
            }
        }

        let total_workflow_time: u64 = node_stats.values().map(|s| s.total_duration_ms).sum();

        let mut analytics: Vec<NodeAnalytics> = node_stats
            .into_iter()
            .map(|(node_id, stats)| {
                let avg_duration_ms = if stats.success_count > 0 {
                    stats.total_duration_ms as f64 / stats.success_count as f64
                } else {
                    0.0
                };

                let time_percentage = if total_workflow_time > 0 {
                    (stats.total_duration_ms as f64 / total_workflow_time as f64) * 100.0
                } else {
                    0.0
                };

                NodeAnalytics {
                    node_id,
                    node_name: format!("Node-{}", node_id),
                    node_type: "Unknown".to_string(),
                    execution_count: stats.execution_count,
                    success_count: stats.success_count,
                    failure_count: stats.failure_count,
                    avg_duration_ms,
                    max_duration_ms: stats.max_duration_ms,
                    total_duration_ms: stats.total_duration_ms,
                    time_percentage,
                    is_bottleneck: false,
                }
            })
            .collect();

        if let Some(slowest) = analytics.iter_mut().max_by_key(|a| a.total_duration_ms) {
            slowest.is_bottleneck = true;
        }

        analytics.sort_by(|a, b| b.total_duration_ms.cmp(&a.total_duration_ms));

        analytics
    }

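    /// Groups failure events by identical error message, tracking occurrence
    /// counts, affected nodes, and first/last-seen timestamps. Trend analysis
    /// is not implemented; every pattern is reported as `Stable`.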
    fn calculate_error_patterns(&self) -> Vec<ErrorPattern> {
        let mut error_counts: HashMap<String, ErrorStats> = HashMap::new();

        for timeline in &self.timelines {
            for event in &timeline.events {
                if matches!(
                    event.event_type,
                    EventType::NodeFailed | EventType::WorkflowFailed | EventType::ErrorOccurred
                ) {
                    let error_msg = self.extract_error_message(event);
                    let stats =
                        error_counts
                            .entry(error_msg.clone())
                            .or_insert_with(|| ErrorStats {
                                message: error_msg,
                                count: 0,
                                affected_nodes: Vec::new(),
                                first_seen: event.timestamp,
                                last_seen: event.timestamp,
                            });

                    stats.count += 1;
                    if let Some(node_id) = event.node_id {
                        if !stats.affected_nodes.contains(&node_id) {
                            stats.affected_nodes.push(node_id);
                        }
                    }
                    stats.last_seen = stats.last_seen.max(event.timestamp);
                    stats.first_seen = stats.first_seen.min(event.timestamp);
                }
            }
        }

        let total_errors: u64 = error_counts.values().map(|s| s.count).sum();

        let mut patterns: Vec<ErrorPattern> = error_counts
            .into_values()
            .map(|stats| {
                let error_percentage = if total_errors > 0 {
                    (stats.count as f64 / total_errors as f64) * 100.0
                } else {
                    0.0
                };

                ErrorPattern {
                    error_message: stats.message,
                    occurrence_count: stats.count,
                    error_percentage,
                    affected_nodes: stats.affected_nodes,
                    first_seen: stats.first_seen,
                    last_seen: stats.last_seen,
                    trend: ErrorTrend::Stable,
                }
            })
            .collect();

        patterns.sort_by(|a, b| b.occurrence_count.cmp(&a.occurrence_count));

        patterns
    }

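    /// Pulls the error string out of a failure event's details.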
    fn extract_error_message(&self, event: &crate::ExecutionEvent) -> String {
        use crate::EventDetails;
        match &event.details {
            EventDetails::NodeFailed { error, .. } => error.clone(),
            EventDetails::WorkflowFailed { error, .. } => error.clone(),
            EventDetails::ErrorOccurred { error, .. } => error.clone(),
            _ => "Unknown error".to_string(),
        }
    }
}

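/// Per-node accumulator used while building [`NodeAnalytics`].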
#[derive(Default)]
struct NodeStats {
    execution_count: u64,
    success_count: u64,
    failure_count: u64,
    total_duration_ms: u64,
    max_duration_ms: u64,
}

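/// Per-message accumulator used while building [`ErrorPattern`]s.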
struct ErrorStats {
    message: String,
    count: u64,
    affected_nodes: Vec<NodeId>,
    first_seen: DateTime<Utc>,
    last_seen: DateTime<Utc>,
}

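/// Convenience constructors for common reporting windows, each ending at the
/// current time. Note that `last_month` approximates a month as 30 days.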
impl AnalyticsPeriod {
    pub fn last_hour() -> Self {
        let end = Utc::now();
        let start = end - Duration::hours(1);
        Self {
            start,
            end,
            period_type: PeriodType::Hourly,
        }
    }

    pub fn last_day() -> Self {
        let end = Utc::now();
        let start = end - Duration::days(1);
        Self {
            start,
            end,
            period_type: PeriodType::Daily,
        }
    }

    pub fn last_week() -> Self {
        let end = Utc::now();
        let start = end - Duration::weeks(1);
        Self {
            start,
            end,
            period_type: PeriodType::Weekly,
        }
    }

    pub fn last_month() -> Self {
        let end = Utc::now();
        let start = end - Duration::days(30);
        Self {
            start,
            end,
            period_type: PeriodType::Monthly,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ExecutionEvent, ExecutionId, ExecutionResult, NodeMetrics, WorkflowMetadata};
    use std::collections::HashMap;

    #[test]
    fn test_execution_stats_calculation() {
        let mut stats = ExecutionStats {
            total_executions: 100,
            successful_executions: 90,
            failed_executions: 10,
            cancelled_executions: 0,
            success_rate: 0.0,
            failure_rate: 0.0,
            executions_per_hour: 0.0,
        };

        stats.calculate_rates();

        assert_eq!(stats.success_rate, 0.9);
        assert_eq!(stats.failure_rate, 0.1);
    }

    #[test]
    fn test_analytics_builder_basic() {
        let workflow_id = WorkflowId::new_v4();
        let execution_id = ExecutionId::new_v4();

        let mut builder = AnalyticsBuilder::new(
            workflow_id,
            "test-workflow".to_string(),
            Utc::now() - Duration::hours(1),
            Utc::now(),
            PeriodType::Hourly,
        );

        let mut timeline = EventTimeline::new();
        timeline.push(ExecutionEvent::workflow_started(
            execution_id,
            workflow_id,
            WorkflowMetadata::new("test".to_string()),
            HashMap::new(),
        ));
        timeline.push(ExecutionEvent::workflow_completed(
            execution_id,
            workflow_id,
            1000,
            ExecutionResult::Success(serde_json::Value::Null),
        ));

        builder.add_timeline(timeline);

        let analytics = builder.build();

        assert_eq!(analytics.execution_stats.total_executions, 1);
        assert_eq!(analytics.execution_stats.successful_executions, 1);
        assert_eq!(analytics.execution_stats.failed_executions, 0);
        assert_eq!(analytics.execution_stats.success_rate, 1.0);
    }

    #[test]
    fn test_performance_metrics_percentiles() {
        let workflow_id = WorkflowId::new_v4();

        let mut builder = AnalyticsBuilder::new(
            workflow_id,
            "test-workflow".to_string(),
            Utc::now() - Duration::hours(1),
            Utc::now(),
            PeriodType::Hourly,
        );

        for duration in [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000] {
            let execution_id = ExecutionId::new_v4();
            let mut timeline = EventTimeline::new();

            timeline.push(ExecutionEvent::workflow_started(
                execution_id,
                workflow_id,
                WorkflowMetadata::new("test".to_string()),
                HashMap::new(),
            ));
            timeline.push(ExecutionEvent::workflow_completed(
                execution_id,
                workflow_id,
                duration,
                ExecutionResult::Success(serde_json::Value::Null),
            ));

            builder.add_timeline(timeline);
        }

        let analytics = builder.build();

        assert_eq!(analytics.performance_metrics.min_duration_ms, 100);
        assert_eq!(analytics.performance_metrics.max_duration_ms, 1000);
        assert!(analytics.performance_metrics.avg_duration_ms > 0.0);
        assert!(analytics.performance_metrics.p50_duration_ms > 0);
        assert!(
            analytics.performance_metrics.p95_duration_ms
                > analytics.performance_metrics.p50_duration_ms
        );
    }

    #[test]
    fn test_node_analytics_bottleneck_detection() {
        let workflow_id = WorkflowId::new_v4();
        let execution_id = ExecutionId::new_v4();
        let fast_node = NodeId::new_v4();
        let slow_node = NodeId::new_v4();

        let mut builder = AnalyticsBuilder::new(
            workflow_id,
            "test-workflow".to_string(),
            Utc::now() - Duration::hours(1),
            Utc::now(),
            PeriodType::Hourly,
        );

        let mut timeline = EventTimeline::new();

        timeline.push(ExecutionEvent::node_completed(
            execution_id,
            workflow_id,
            fast_node,
            crate::NodeKind::Start,
            100,
            NodeMetrics::default(),
            HashMap::new(),
        ));

        timeline.push(ExecutionEvent::node_completed(
            execution_id,
            workflow_id,
            slow_node,
            crate::NodeKind::End,
            1000,
            NodeMetrics::default(),
            HashMap::new(),
        ));

        builder.add_timeline(timeline);
        let analytics = builder.build();

        assert_eq!(analytics.node_analytics.len(), 2);

        let bottleneck = analytics.node_analytics.iter().find(|n| n.is_bottleneck);
        assert!(bottleneck.is_some());
        assert_eq!(bottleneck.unwrap().node_id, slow_node);
    }

    #[test]
    fn test_error_pattern_analysis() {
        let workflow_id = WorkflowId::new_v4();
        let execution_id = ExecutionId::new_v4();
        let node_id = NodeId::new_v4();

        let mut builder = AnalyticsBuilder::new(
            workflow_id,
            "test-workflow".to_string(),
            Utc::now() - Duration::hours(1),
            Utc::now(),
            PeriodType::Hourly,
        );

        let mut timeline = EventTimeline::new();

        for _ in 0..3 {
            timeline.push(ExecutionEvent::node_failed(
                execution_id,
                workflow_id,
                node_id,
                crate::NodeKind::Start,
                "Connection timeout".to_string(),
                None,
                0,
            ));
        }

        builder.add_timeline(timeline);
        let analytics = builder.build();

        assert_eq!(analytics.error_patterns.len(), 1);
        assert_eq!(analytics.error_patterns[0].occurrence_count, 3);
        assert_eq!(analytics.error_patterns[0].error_percentage, 100.0);
        assert!(analytics.error_patterns[0]
            .affected_nodes
            .contains(&node_id));
    }

    #[test]
    fn test_analytics_period_helpers() {
        let hourly = AnalyticsPeriod::last_hour();
        assert_eq!(hourly.period_type, PeriodType::Hourly);

        let daily = AnalyticsPeriod::last_day();
        assert_eq!(daily.period_type, PeriodType::Daily);

        let weekly = AnalyticsPeriod::last_week();
        assert_eq!(weekly.period_type, PeriodType::Weekly);

        let monthly = AnalyticsPeriod::last_month();
        assert_eq!(monthly.period_type, PeriodType::Monthly);
    }
}