// oxify_model/analytics.rs

//! Workflow analytics and usage tracking
//!
//! This module provides detailed analytics for workflow execution patterns,
//! performance metrics, and optimization insights.
use std::collections::HashMap;

use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};

use crate::{EventTimeline, EventType, NodeId, WorkflowId};

#[cfg(feature = "openapi")]
use utoipa::ToSchema;
/// Workflow execution statistics aggregated over time.
///
/// Produced by `AnalyticsBuilder::build`; all sub-reports cover the same
/// `AnalyticsPeriod`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct WorkflowAnalytics {
    /// Workflow identifier
    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
    pub workflow_id: WorkflowId,

    /// Workflow name
    pub workflow_name: String,

    /// Time period for these analytics
    pub period: AnalyticsPeriod,

    /// Execution statistics (counts plus derived success/failure rates)
    pub execution_stats: ExecutionStats,

    /// Performance metrics (duration percentiles, token and cost totals)
    pub performance_metrics: PerformanceMetrics,

    /// Node-level analytics, sorted by total duration descending
    pub node_analytics: Vec<NodeAnalytics>,

    /// Error patterns, sorted by occurrence count descending
    pub error_patterns: Vec<ErrorPattern>,

    /// Timestamp at which this report was generated
    pub updated_at: DateTime<Utc>,
}
43
/// Time period for analytics aggregation.
///
/// Expected to satisfy `start <= end`; this is not enforced here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct AnalyticsPeriod {
    /// Period start time
    pub start: DateTime<Utc>,

    /// Period end time
    pub end: DateTime<Utc>,

    /// Period type (for display)
    pub period_type: PeriodType,
}
57
/// Period type for analytics.
///
/// Purely descriptive; the actual window is given by `AnalyticsPeriod`'s
/// `start`/`end` fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub enum PeriodType {
    /// Last hour
    Hourly,
    /// Last 24 hours
    Daily,
    /// Last 7 days
    Weekly,
    /// Last 30 days
    Monthly,
    /// Custom time range
    Custom,
}
73
/// Execution statistics.
///
/// The raw counters are filled in by the analytics builder; the derived
/// `success_rate`/`failure_rate` fields are computed by
/// `ExecutionStats::calculate_rates`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct ExecutionStats {
    /// Total number of executions
    pub total_executions: u64,

    /// Number of successful executions
    pub successful_executions: u64,

    /// Number of failed executions
    pub failed_executions: u64,

    /// Number of cancelled executions
    pub cancelled_executions: u64,

    /// Success rate (0.0 to 1.0), derived from the counters above
    pub success_rate: f64,

    /// Failure rate (0.0 to 1.0), derived from the counters above
    pub failure_rate: f64,

    /// Average executions per hour over the analytics period
    pub executions_per_hour: f64,
}
99
100impl ExecutionStats {
101    /// Calculate derived metrics
102    pub fn calculate_rates(&mut self) {
103        if self.total_executions > 0 {
104            self.success_rate = self.successful_executions as f64 / self.total_executions as f64;
105            self.failure_rate = self.failed_executions as f64 / self.total_executions as f64;
106        }
107    }
108}
109
/// Performance metrics with percentiles.
///
/// Durations are in milliseconds and are taken from completed executions.
/// NOTE(review): the builder in this module leaves the token and cost fields
/// at zero; presumably they are populated elsewhere — confirm with callers.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct PerformanceMetrics {
    /// Average execution duration in milliseconds
    pub avg_duration_ms: f64,

    /// Median (p50) execution duration in milliseconds
    pub p50_duration_ms: u64,

    /// 95th percentile execution duration in milliseconds
    pub p95_duration_ms: u64,

    /// 99th percentile execution duration in milliseconds
    pub p99_duration_ms: u64,

    /// Minimum execution duration in milliseconds
    pub min_duration_ms: u64,

    /// Maximum execution duration in milliseconds
    pub max_duration_ms: u64,

    /// Total token usage across all executions
    pub total_tokens: u64,

    /// Average tokens per execution
    pub avg_tokens: f64,

    /// Total cost in USD
    pub total_cost_usd: f64,

    /// Average cost per execution in USD
    pub avg_cost_usd: f64,
}
144
/// Node-level analytics for identifying bottlenecks.
///
/// Built from `NodeStarted`/`NodeCompleted`/`NodeFailed` events; exactly one
/// node per report (the one with the largest total duration) is flagged as
/// the bottleneck.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct NodeAnalytics {
    /// Node identifier
    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
    pub node_id: NodeId,

    /// Node name
    pub node_name: String,

    /// Node type (e.g., "LLM", "Retriever")
    pub node_type: String,

    /// Number of times this node executed (counted from start events)
    pub execution_count: u64,

    /// Number of successful executions
    pub success_count: u64,

    /// Number of failed executions
    pub failure_count: u64,

    /// Average execution duration in milliseconds (over successful runs)
    pub avg_duration_ms: f64,

    /// Maximum execution duration in milliseconds
    pub max_duration_ms: u64,

    /// Total duration across all executions, in milliseconds
    pub total_duration_ms: u64,

    /// Percentage (0–100) of total workflow time spent in this node
    pub time_percentage: f64,

    /// Is this a bottleneck (slowest node)?
    pub is_bottleneck: bool,
}
183
/// Error pattern analysis.
///
/// Errors are grouped by exact message text; occurrences of the same message
/// across nodes and timelines are aggregated into one pattern.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct ErrorPattern {
    /// Error message or pattern
    pub error_message: String,

    /// Number of occurrences
    pub occurrence_count: u64,

    /// Percentage (0–100) of total errors
    pub error_percentage: f64,

    /// Node IDs where this error occurred (deduplicated)
    #[cfg_attr(feature = "openapi", schema(value_type = Vec<String>))]
    pub affected_nodes: Vec<NodeId>,

    /// First occurrence timestamp
    pub first_seen: DateTime<Utc>,

    /// Last occurrence timestamp
    pub last_seen: DateTime<Utc>,

    /// Trend (increasing, stable, decreasing)
    pub trend: ErrorTrend,
}
210
/// Error trend classification.
///
/// NOTE: the builder in this module currently always reports `Stable`; the
/// other variants are reserved for a future time-series analysis.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub enum ErrorTrend {
    /// Error frequency is increasing
    Increasing,
    /// Error frequency is stable
    Stable,
    /// Error frequency is decreasing
    Decreasing,
}
222
/// Analytics builder that processes event timelines.
///
/// Collect one `EventTimeline` per workflow execution via `add_timeline`,
/// then call `build` to produce the aggregated `WorkflowAnalytics` report.
pub struct AnalyticsBuilder {
    // Identity of the workflow being analyzed.
    workflow_id: WorkflowId,
    workflow_name: String,
    // Time window the report covers.
    period_start: DateTime<Utc>,
    period_end: DateTime<Utc>,
    period_type: PeriodType,
    // One timeline per workflow execution.
    timelines: Vec<EventTimeline>,
}
232
233impl AnalyticsBuilder {
234    /// Create a new analytics builder
235    pub fn new(
236        workflow_id: WorkflowId,
237        workflow_name: String,
238        period_start: DateTime<Utc>,
239        period_end: DateTime<Utc>,
240        period_type: PeriodType,
241    ) -> Self {
242        Self {
243            workflow_id,
244            workflow_name,
245            period_start,
246            period_end,
247            period_type,
248            timelines: Vec::new(),
249        }
250    }
251
252    /// Add an event timeline for analysis
253    pub fn add_timeline(&mut self, timeline: EventTimeline) {
254        self.timelines.push(timeline);
255    }
256
257    /// Build the analytics report
258    pub fn build(self) -> WorkflowAnalytics {
259        let execution_stats = self.calculate_execution_stats();
260        let performance_metrics = self.calculate_performance_metrics();
261        let node_analytics = self.calculate_node_analytics();
262        let error_patterns = self.calculate_error_patterns();
263
264        WorkflowAnalytics {
265            workflow_id: self.workflow_id,
266            workflow_name: self.workflow_name,
267            period: AnalyticsPeriod {
268                start: self.period_start,
269                end: self.period_end,
270                period_type: self.period_type,
271            },
272            execution_stats,
273            performance_metrics,
274            node_analytics,
275            error_patterns,
276            updated_at: Utc::now(),
277        }
278    }
279
280    fn calculate_execution_stats(&self) -> ExecutionStats {
281        let total_executions = self.timelines.len() as u64;
282        let successful_executions =
283            self.timelines.iter().filter(|t| t.is_successful()).count() as u64;
284        let failed_executions = self.timelines.iter().filter(|t| t.is_failed()).count() as u64;
285        let cancelled_executions = self
286            .timelines
287            .iter()
288            .filter(|t| {
289                t.events
290                    .iter()
291                    .any(|e| e.event_type == EventType::WorkflowCancelled)
292            })
293            .count() as u64;
294
295        let period_hours = (self.period_end - self.period_start).num_hours() as f64;
296        let executions_per_hour = if period_hours > 0.0 {
297            total_executions as f64 / period_hours
298        } else {
299            0.0
300        };
301
302        let mut stats = ExecutionStats {
303            total_executions,
304            successful_executions,
305            failed_executions,
306            cancelled_executions,
307            success_rate: 0.0,
308            failure_rate: 0.0,
309            executions_per_hour,
310        };
311
312        stats.calculate_rates();
313        stats
314    }
315
316    fn calculate_performance_metrics(&self) -> PerformanceMetrics {
317        // Extract durations from WorkflowCompleted events
318        let mut durations: Vec<u64> = self
319            .timelines
320            .iter()
321            .filter_map(|timeline| {
322                // Look for WorkflowCompleted event
323                timeline.events.iter().find_map(|event| {
324                    if let crate::EventDetails::WorkflowCompleted { duration_ms, .. } =
325                        &event.details
326                    {
327                        Some(*duration_ms)
328                    } else {
329                        None
330                    }
331                })
332            })
333            .collect();
334
335        if durations.is_empty() {
336            return PerformanceMetrics::default();
337        }
338
339        durations.sort_unstable();
340
341        let avg_duration_ms = durations.iter().sum::<u64>() as f64 / durations.len() as f64;
342        let min_duration_ms = *durations.first().unwrap_or(&0);
343        let max_duration_ms = *durations.last().unwrap_or(&0);
344
345        let p50_idx = (durations.len() as f64 * 0.50) as usize;
346        let p95_idx = (durations.len() as f64 * 0.95) as usize;
347        let p99_idx = (durations.len() as f64 * 0.99) as usize;
348
349        let p50_duration_ms = durations.get(p50_idx).copied().unwrap_or(0);
350        let p95_duration_ms = durations.get(p95_idx).copied().unwrap_or(0);
351        let p99_duration_ms = durations.get(p99_idx).copied().unwrap_or(0);
352
353        PerformanceMetrics {
354            avg_duration_ms,
355            p50_duration_ms,
356            p95_duration_ms,
357            p99_duration_ms,
358            min_duration_ms,
359            max_duration_ms,
360            total_tokens: 0,
361            avg_tokens: 0.0,
362            total_cost_usd: 0.0,
363            avg_cost_usd: 0.0,
364        }
365    }
366
367    fn calculate_node_analytics(&self) -> Vec<NodeAnalytics> {
368        let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::new();
369
370        // Collect stats for each node
371        for timeline in &self.timelines {
372            for event in &timeline.events {
373                if let Some(node_id) = event.node_id {
374                    let stats = node_stats.entry(node_id).or_default();
375
376                    match event.event_type {
377                        EventType::NodeStarted => {
378                            stats.execution_count += 1;
379                        }
380                        EventType::NodeCompleted => {
381                            stats.success_count += 1;
382                            // Extract duration from event details
383                            if let crate::EventDetails::NodeCompleted { duration_ms, .. } =
384                                &event.details
385                            {
386                                stats.total_duration_ms += duration_ms;
387                                stats.max_duration_ms = stats.max_duration_ms.max(*duration_ms);
388                            }
389                        }
390                        EventType::NodeFailed => {
391                            stats.failure_count += 1;
392                        }
393                        _ => {}
394                    }
395                }
396            }
397        }
398
399        // Calculate total workflow time
400        let total_workflow_time: u64 = node_stats.values().map(|s| s.total_duration_ms).sum();
401
402        // Convert to NodeAnalytics
403        let mut analytics: Vec<NodeAnalytics> = node_stats
404            .into_iter()
405            .map(|(node_id, stats)| {
406                let avg_duration_ms = if stats.success_count > 0 {
407                    stats.total_duration_ms as f64 / stats.success_count as f64
408                } else {
409                    0.0
410                };
411
412                let time_percentage = if total_workflow_time > 0 {
413                    (stats.total_duration_ms as f64 / total_workflow_time as f64) * 100.0
414                } else {
415                    0.0
416                };
417
418                NodeAnalytics {
419                    node_id,
420                    node_name: format!("Node-{}", node_id),
421                    node_type: "Unknown".to_string(),
422                    execution_count: stats.execution_count,
423                    success_count: stats.success_count,
424                    failure_count: stats.failure_count,
425                    avg_duration_ms,
426                    max_duration_ms: stats.max_duration_ms,
427                    total_duration_ms: stats.total_duration_ms,
428                    time_percentage,
429                    is_bottleneck: false,
430                }
431            })
432            .collect();
433
434        // Mark the slowest node as bottleneck
435        if let Some(slowest) = analytics.iter_mut().max_by_key(|a| a.total_duration_ms) {
436            slowest.is_bottleneck = true;
437        }
438
439        // Sort by total duration descending
440        analytics.sort_by(|a, b| b.total_duration_ms.cmp(&a.total_duration_ms));
441
442        analytics
443    }
444
445    fn calculate_error_patterns(&self) -> Vec<ErrorPattern> {
446        let mut error_counts: HashMap<String, ErrorStats> = HashMap::new();
447
448        // Collect error statistics
449        for timeline in &self.timelines {
450            for event in &timeline.events {
451                if let EventType::NodeFailed
452                | EventType::WorkflowFailed
453                | EventType::ErrorOccurred = event.event_type
454                {
455                    let error_msg = self.extract_error_message(event);
456                    let stats =
457                        error_counts
458                            .entry(error_msg.clone())
459                            .or_insert_with(|| ErrorStats {
460                                message: error_msg,
461                                count: 0,
462                                affected_nodes: Vec::new(),
463                                first_seen: event.timestamp,
464                                last_seen: event.timestamp,
465                            });
466
467                    stats.count += 1;
468                    if let Some(node_id) = event.node_id {
469                        if !stats.affected_nodes.contains(&node_id) {
470                            stats.affected_nodes.push(node_id);
471                        }
472                    }
473                    stats.last_seen = stats.last_seen.max(event.timestamp);
474                    stats.first_seen = stats.first_seen.min(event.timestamp);
475                }
476            }
477        }
478
479        let total_errors: u64 = error_counts.values().map(|s| s.count).sum();
480
481        // Convert to ErrorPattern
482        let mut patterns: Vec<ErrorPattern> = error_counts
483            .into_values()
484            .map(|stats| {
485                let error_percentage = if total_errors > 0 {
486                    (stats.count as f64 / total_errors as f64) * 100.0
487                } else {
488                    0.0
489                };
490
491                ErrorPattern {
492                    error_message: stats.message,
493                    occurrence_count: stats.count,
494                    error_percentage,
495                    affected_nodes: stats.affected_nodes,
496                    first_seen: stats.first_seen,
497                    last_seen: stats.last_seen,
498                    trend: ErrorTrend::Stable, // Simplified, could be calculated from time series
499                }
500            })
501            .collect();
502
503        // Sort by occurrence count descending
504        patterns.sort_by(|a, b| b.occurrence_count.cmp(&a.occurrence_count));
505
506        patterns
507    }
508
509    fn extract_error_message(&self, event: &crate::ExecutionEvent) -> String {
510        use crate::EventDetails;
511        match &event.details {
512            EventDetails::NodeFailed { error, .. } => error.clone(),
513            EventDetails::WorkflowFailed { error, .. } => error.clone(),
514            EventDetails::ErrorOccurred { error, .. } => error.clone(),
515            _ => "Unknown error".to_string(),
516        }
517    }
518}
519
/// Per-node accumulator used while scanning timelines.
#[derive(Default)]
struct NodeStats {
    // Incremented on NodeStarted events.
    execution_count: u64,
    // Incremented on NodeCompleted events.
    success_count: u64,
    // Incremented on NodeFailed events.
    failure_count: u64,
    // Sum of NodeCompleted durations, in milliseconds.
    total_duration_ms: u64,
    // Largest single NodeCompleted duration, in milliseconds.
    max_duration_ms: u64,
}
528
/// Per-message accumulator used while grouping failure events.
struct ErrorStats {
    // The exact error message this group keys on.
    message: String,
    // Number of occurrences seen so far.
    count: u64,
    // Deduplicated list of nodes that produced this error.
    affected_nodes: Vec<NodeId>,
    // Earliest observed occurrence.
    first_seen: DateTime<Utc>,
    // Latest observed occurrence.
    last_seen: DateTime<Utc>,
}
536
537/// Helper to create common time periods
538impl AnalyticsPeriod {
539    /// Last hour
540    pub fn last_hour() -> Self {
541        let end = Utc::now();
542        let start = end - Duration::hours(1);
543        Self {
544            start,
545            end,
546            period_type: PeriodType::Hourly,
547        }
548    }
549
550    /// Last 24 hours
551    pub fn last_day() -> Self {
552        let end = Utc::now();
553        let start = end - Duration::days(1);
554        Self {
555            start,
556            end,
557            period_type: PeriodType::Daily,
558        }
559    }
560
561    /// Last 7 days
562    pub fn last_week() -> Self {
563        let end = Utc::now();
564        let start = end - Duration::weeks(1);
565        Self {
566            start,
567            end,
568            period_type: PeriodType::Weekly,
569        }
570    }
571
572    /// Last 30 days
573    pub fn last_month() -> Self {
574        let end = Utc::now();
575        let start = end - Duration::days(30);
576        Self {
577            start,
578            end,
579            period_type: PeriodType::Monthly,
580        }
581    }
582}
583
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ExecutionEvent, ExecutionId, ExecutionResult, NodeMetrics, WorkflowMetadata};
    use std::collections::HashMap;

    // Derived rates follow directly from the raw counters.
    #[test]
    fn test_execution_stats_calculation() {
        let mut stats = ExecutionStats {
            total_executions: 100,
            successful_executions: 90,
            failed_executions: 10,
            cancelled_executions: 0,
            success_rate: 0.0,
            failure_rate: 0.0,
            executions_per_hour: 0.0,
        };

        stats.calculate_rates();

        assert_eq!(stats.success_rate, 0.9);
        assert_eq!(stats.failure_rate, 0.1);
    }

    // A single started+completed timeline counts as one successful execution.
    #[test]
    fn test_analytics_builder_basic() {
        let workflow_id = WorkflowId::new_v4();
        let execution_id = ExecutionId::new_v4();

        let mut builder = AnalyticsBuilder::new(
            workflow_id,
            "test-workflow".to_string(),
            Utc::now() - Duration::hours(1),
            Utc::now(),
            PeriodType::Hourly,
        );

        // Create a successful execution timeline
        let mut timeline = EventTimeline::new();
        timeline.push(ExecutionEvent::workflow_started(
            execution_id,
            workflow_id,
            WorkflowMetadata::new("test".to_string()),
            HashMap::new(),
        ));
        timeline.push(ExecutionEvent::workflow_completed(
            execution_id,
            workflow_id,
            1000,
            ExecutionResult::Success(serde_json::Value::Null),
        ));

        builder.add_timeline(timeline);

        let analytics = builder.build();

        assert_eq!(analytics.execution_stats.total_executions, 1);
        assert_eq!(analytics.execution_stats.successful_executions, 1);
        assert_eq!(analytics.execution_stats.failed_executions, 0);
        assert_eq!(analytics.execution_stats.success_rate, 1.0);
    }

    // Percentiles/min/max are computed over the WorkflowCompleted durations.
    #[test]
    fn test_performance_metrics_percentiles() {
        let workflow_id = WorkflowId::new_v4();

        let mut builder = AnalyticsBuilder::new(
            workflow_id,
            "test-workflow".to_string(),
            Utc::now() - Duration::hours(1),
            Utc::now(),
            PeriodType::Hourly,
        );

        // Add multiple timelines with different durations
        for duration in [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000] {
            let execution_id = ExecutionId::new_v4();
            let mut timeline = EventTimeline::new();

            timeline.push(ExecutionEvent::workflow_started(
                execution_id,
                workflow_id,
                WorkflowMetadata::new("test".to_string()),
                HashMap::new(),
            ));
            timeline.push(ExecutionEvent::workflow_completed(
                execution_id,
                workflow_id,
                duration,
                ExecutionResult::Success(serde_json::Value::Null),
            ));

            builder.add_timeline(timeline);
        }

        let analytics = builder.build();

        assert_eq!(analytics.performance_metrics.min_duration_ms, 100);
        assert_eq!(analytics.performance_metrics.max_duration_ms, 1000);
        assert!(analytics.performance_metrics.avg_duration_ms > 0.0);
        assert!(analytics.performance_metrics.p50_duration_ms > 0);
        assert!(
            analytics.performance_metrics.p95_duration_ms
                > analytics.performance_metrics.p50_duration_ms
        );
    }

    // The node with the largest total duration must be flagged as bottleneck.
    #[test]
    fn test_node_analytics_bottleneck_detection() {
        let workflow_id = WorkflowId::new_v4();
        let execution_id = ExecutionId::new_v4();
        let fast_node = NodeId::new_v4();
        let slow_node = NodeId::new_v4();

        let mut builder = AnalyticsBuilder::new(
            workflow_id,
            "test-workflow".to_string(),
            Utc::now() - Duration::hours(1),
            Utc::now(),
            PeriodType::Hourly,
        );

        let mut timeline = EventTimeline::new();

        // Fast node
        timeline.push(ExecutionEvent::node_completed(
            execution_id,
            workflow_id,
            fast_node,
            crate::NodeKind::Start,
            100,
            NodeMetrics::default(),
            HashMap::new(),
        ));

        // Slow node (bottleneck)
        timeline.push(ExecutionEvent::node_completed(
            execution_id,
            workflow_id,
            slow_node,
            crate::NodeKind::End,
            1000,
            NodeMetrics::default(),
            HashMap::new(),
        ));

        builder.add_timeline(timeline);
        let analytics = builder.build();

        assert_eq!(analytics.node_analytics.len(), 2);

        // The slowest node should be marked as bottleneck
        let bottleneck = analytics.node_analytics.iter().find(|n| n.is_bottleneck);
        assert!(bottleneck.is_some());
        assert_eq!(bottleneck.unwrap().node_id, slow_node);
    }

    // Identical error messages are grouped into one pattern with a count.
    #[test]
    fn test_error_pattern_analysis() {
        let workflow_id = WorkflowId::new_v4();
        let execution_id = ExecutionId::new_v4();
        let node_id = NodeId::new_v4();

        let mut builder = AnalyticsBuilder::new(
            workflow_id,
            "test-workflow".to_string(),
            Utc::now() - Duration::hours(1),
            Utc::now(),
            PeriodType::Hourly,
        );

        let mut timeline = EventTimeline::new();

        // Add multiple failures with same error
        for _ in 0..3 {
            timeline.push(ExecutionEvent::node_failed(
                execution_id,
                workflow_id,
                node_id,
                crate::NodeKind::Start,
                "Connection timeout".to_string(),
                None,
                0,
            ));
        }

        builder.add_timeline(timeline);
        let analytics = builder.build();

        assert_eq!(analytics.error_patterns.len(), 1);
        assert_eq!(analytics.error_patterns[0].occurrence_count, 3);
        assert_eq!(analytics.error_patterns[0].error_percentage, 100.0);
        assert!(analytics.error_patterns[0]
            .affected_nodes
            .contains(&node_id));
    }

    // Each convenience constructor sets the matching period type.
    #[test]
    fn test_analytics_period_helpers() {
        let hourly = AnalyticsPeriod::last_hour();
        assert_eq!(hourly.period_type, PeriodType::Hourly);

        let daily = AnalyticsPeriod::last_day();
        assert_eq!(daily.period_type, PeriodType::Daily);

        let weekly = AnalyticsPeriod::last_week();
        assert_eq!(weekly.period_type, PeriodType::Weekly);

        let monthly = AnalyticsPeriod::last_month();
        assert_eq!(monthly.period_type, PeriodType::Monthly);
    }
}