// sqry_core/uses/aggregator.rs
//! Diagnostics Aggregator - transforms raw events into actionable summaries
//!
//! This module provides the "local reducer" that converts daily event logs
//! into weekly `DiagnosticsSummary` objects. Raw events are useless until
//! interpreted locally.
//!
//! # What the Aggregator Produces
//!
//! The reducer output answers:
//! - **What is used**: `top_workflows` counts
//! - **What is ignored**: Low counts relative to others
//! - **What is confusing**: High per-kind `abandonment` rates
//! - **What needs refinement**: High `ai_requery_rate`
//!
//! Nothing sensitive. All local.
//!
//! # Usage
//!
//! ```rust,ignore
//! use sqry_core::uses::{DiagnosticsAggregator, IsoWeekPeriod};
//!
//! let aggregator = DiagnosticsAggregator::new(&uses_dir);
//! let summary = aggregator.summarize_week("2025-W50")?;
//! println!("Top workflow: {:?}", summary.top_workflows.first());
//! ```

27use super::storage::UsesStorage;
28use super::types::{
29    DiagnosticsSummary, GraphAbandonRate, GraphKind, IsoWeekPeriod, QueryKind, TopWorkflow,
30    UseEvent, UseEventType, ViewKind,
31};
32use chrono::{Datelike, Duration, NaiveDate, Utc};
33use std::collections::HashMap;
34use std::path::Path;
35
/// Diagnostics aggregator - transforms raw events into weekly summaries
pub struct DiagnosticsAggregator {
    // Storage layer used to read daily event logs and to read/write
    // cached weekly summaries.
    storage: UsesStorage,
}
40
41impl DiagnosticsAggregator {
42    /// Create a new aggregator for the given uses directory
43    #[must_use]
44    pub fn new(uses_dir: &Path) -> Self {
45        Self {
46            storage: UsesStorage::new(uses_dir.to_path_buf()),
47        }
48    }
49
    /// Get the underlying storage
    ///
    /// Exposes the storage layer so callers can run raw event operations
    /// (e.g. ensuring directories exist) directly.
    #[must_use]
    pub fn storage(&self) -> &UsesStorage {
        &self.storage
    }
55
56    /// Generate a summary for an ISO week
57    ///
58    /// # Arguments
59    ///
60    /// * `week` - ISO week string (e.g., "2025-W50")
61    ///
62    /// # Returns
63    ///
64    /// A `DiagnosticsSummary` containing aggregated metrics for the week.
65    ///
66    /// # Errors
67    ///
68    /// Returns an error if the week format is invalid or events cannot be loaded.
69    pub fn summarize_week(&self, week: &str) -> Result<DiagnosticsSummary, AggregatorError> {
70        let period = IsoWeekPeriod::try_new(week)
71            .map_err(|_| AggregatorError::InvalidWeekFormat(week.to_string()))?;
72
73        // Calculate date range for the week
74        let (start_date, end_date) = week_to_date_range(week)?;
75
76        // Load events for the week
77        let (events, _skipped) = self
78            .storage
79            .load_events_for_range(&start_date, &end_date)
80            .map_err(|e| AggregatorError::StorageError(e.to_string()))?;
81
82        // Aggregate the events
83        Ok(Self::aggregate_events(&events, period))
84    }
85
86    /// Generate a summary for a custom date range
87    ///
88    /// # Arguments
89    ///
90    /// * `start_date` - Start date in "YYYY-MM-DD" format
91    /// * `end_date` - End date in "YYYY-MM-DD" format
92    ///
93    /// # Returns
94    ///
95    /// A `DiagnosticsSummary` containing aggregated metrics for the range.
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if events cannot be loaded.
100    pub fn summarize_range(
101        &self,
102        start_date: &str,
103        end_date: &str,
104    ) -> Result<DiagnosticsSummary, AggregatorError> {
105        let (events, _skipped) = self
106            .storage
107            .load_events_for_range(start_date, end_date)
108            .map_err(|e| AggregatorError::StorageError(e.to_string()))?;
109
110        // Use current week as period label
111        Ok(Self::aggregate_events(&events, IsoWeekPeriod::current()))
112    }
113
114    /// Generate a summary for the current week
115    ///
116    /// # Returns
117    ///
118    /// A `DiagnosticsSummary` for the current ISO week.
119    ///
120    /// # Errors
121    ///
122    /// Returns an error if events cannot be loaded.
123    pub fn summarize_current_week(&self) -> Result<DiagnosticsSummary, AggregatorError> {
124        let week = current_iso_week();
125        self.summarize_week(&week)
126    }
127
128    /// Aggregate events into a summary
129    fn aggregate_events(events: &[UseEvent], period: IsoWeekPeriod) -> DiagnosticsSummary {
130        if events.is_empty() {
131            return DiagnosticsSummary {
132                period,
133                ..Default::default()
134            };
135        }
136
137        let top_workflows = Self::count_workflows(events);
138        let (avg_time, median_time) = Self::calculate_timing_metrics(events);
139        let abandon_rate = Self::calculate_overall_abandon_rate(events);
140        let abandonment = Self::calculate_per_kind_abandonment(events);
141        let ai_requery_rate = Self::calculate_requery_rate(events);
142
143        DiagnosticsSummary {
144            period,
145            top_workflows,
146            avg_time_to_result_sec: avg_time,
147            median_time_to_result_sec: median_time,
148            abandon_rate,
149            abandonment,
150            ai_requery_rate,
151            total_uses: events.len(),
152            dropped_events: 0, // This is populated from collector, not events
153        }
154    }
155
156    /// Count workflow usage by query kind
157    fn count_workflows(events: &[UseEvent]) -> Vec<TopWorkflow> {
158        let mut counts: HashMap<QueryKind, usize> = HashMap::new();
159
160        for event in events {
161            if let UseEventType::QueryExecuted { kind, .. } = &event.event_type {
162                *counts.entry(*kind).or_insert(0) += 1;
163            }
164        }
165
166        // Convert to TopWorkflow and sort by count descending
167        let mut workflows: Vec<TopWorkflow> = counts
168            .into_iter()
169            .map(|(kind, count)| TopWorkflow { kind, count })
170            .collect();
171
172        workflows.sort_by(|a, b| b.count.cmp(&a.count));
173        workflows
174    }
175
176    /// Calculate average and median timing metrics
177    fn calculate_timing_metrics(events: &[UseEvent]) -> (f64, f64) {
178        let durations: Vec<f64> = events
179            .iter()
180            .filter_map(|e| e.duration_ms)
181            .map(|ms| f64::from(u32::try_from(ms).unwrap_or(u32::MAX)) / 1000.0)
182            .collect();
183
184        if durations.is_empty() {
185            return (0.0, 0.0);
186        }
187
188        let avg = average_duration(&durations);
189        let mut sorted = durations.clone();
190        let median = median_duration(&mut sorted);
191
192        (avg, median)
193    }
194
195    /// Calculate overall abandonment rate
196    ///
197    /// Abandonment rate = `ViewAbandoned` events / (`ViewAbandoned` + successful interactions)
198    fn calculate_overall_abandon_rate(events: &[UseEvent]) -> f64 {
199        let abandoned = events
200            .iter()
201            .filter(|e| matches!(e.event_type, UseEventType::ViewAbandoned { .. }))
202            .count();
203
204        // Consider graph expansions as "successful interactions"
205        let graph_expansions = events
206            .iter()
207            .filter(|e| matches!(e.event_type, UseEventType::GraphExpanded { .. }))
208            .count();
209
210        let total = abandoned + graph_expansions;
211        if total == 0 {
212            return 0.0;
213        }
214
215        usize_to_f64(abandoned) / usize_to_f64(total)
216    }
217
    /// Calculate per-graph-kind abandonment rates
    ///
    /// `ViewAbandoned` events appear to carry only a `ViewKind` (not which
    /// `GraphKind` was on screen — TODO confirm against the event schema),
    /// so graph-view abandonments are attributed to each kind in proportion
    /// to its share of expansions; see `graph_abandonment_rate`.
    fn calculate_per_kind_abandonment(events: &[UseEvent]) -> Vec<GraphAbandonRate> {
        let abandoned_counts = Self::count_abandoned_by_kind(events);
        let graph_counts = Self::count_graph_expansions(events);
        // Total abandonments of the graph view; split per kind is estimated.
        let graph_abandonments = *abandoned_counts.get(&ViewKind::Graph).unwrap_or(&0);
        let total_graph_expansions: usize = graph_counts.values().sum();

        let mut rates = Vec::new();

        // NOTE(review): this list must enumerate every `GraphKind` variant;
        // a variant added to the enum later would be silently skipped here.
        for graph_kind in [
            GraphKind::CallGraph,
            GraphKind::DependencyGraph,
            GraphKind::ImportGraph,
        ] {
            if let Some(rate) = Self::graph_abandonment_rate(
                graph_kind,
                graph_abandonments,
                total_graph_expansions,
                &graph_counts,
            ) {
                rates.push(GraphAbandonRate {
                    kind: graph_kind,
                    rate,
                });
            }
        }

        rates
    }
247
248    fn count_abandoned_by_kind(events: &[UseEvent]) -> HashMap<ViewKind, usize> {
249        let mut abandoned_counts: HashMap<ViewKind, usize> = HashMap::new();
250        for event in events {
251            if let UseEventType::ViewAbandoned { kind, .. } = &event.event_type {
252                *abandoned_counts.entry(*kind).or_insert(0) += 1;
253            }
254        }
255        abandoned_counts
256    }
257
258    fn count_graph_expansions(events: &[UseEvent]) -> HashMap<GraphKind, usize> {
259        let mut graph_counts: HashMap<GraphKind, usize> = HashMap::new();
260        for event in events {
261            if let UseEventType::GraphExpanded { kind, .. } = &event.event_type {
262                *graph_counts.entry(*kind).or_insert(0) += 1;
263            }
264        }
265        graph_counts
266    }
267
    /// Estimate the abandonment rate for one graph kind.
    ///
    /// Graph-view abandonments are split across kinds proportionally to each
    /// kind's share of expansions:
    /// `abandonments * expansions / total_expansions`.
    /// The rate is then `est_abandonments / (est_abandonments + expansions)`.
    ///
    /// Returns `None` when this kind has no expansions and there were no
    /// graph abandonments at all (no signal worth reporting).
    fn graph_abandonment_rate(
        graph_kind: GraphKind,
        graph_abandonments: usize,
        total_graph_expansions: usize,
        graph_counts: &HashMap<GraphKind, usize>,
    ) -> Option<f64> {
        let expansions = *graph_counts.get(&graph_kind).unwrap_or(&0);
        if expansions == 0 && graph_abandonments == 0 {
            return None;
        }

        // Proportional share of overall graph abandonments for this kind;
        // guard against dividing by zero total expansions.
        let proportional_abandonments = if total_graph_expansions > 0 {
            (usize_to_f64(graph_abandonments) * usize_to_f64(expansions))
                / usize_to_f64(total_graph_expansions)
        } else {
            0.0
        };

        // With zero expansions (but some abandonments) report 0.0 rather
        // than divide by zero.
        let rate = if expansions > 0 {
            proportional_abandonments / (proportional_abandonments + usize_to_f64(expansions))
        } else {
            0.0
        };

        Some(rate)
    }
294
295    /// Calculate AI requery rate
296    ///
297    /// Requery rate = AI answers where requeried=true / total AI answers
298    fn calculate_requery_rate(events: &[UseEvent]) -> f64 {
299        let mut total_ai = 0;
300        let mut requeried = 0;
301
302        for event in events {
303            if let UseEventType::AiAnswerGenerated { requeried: r, .. } = &event.event_type {
304                total_ai += 1;
305                if *r {
306                    requeried += 1;
307                }
308            }
309        }
310
311        if total_ai == 0 {
312            return 0.0;
313        }
314
315        f64::from(requeried) / f64::from(total_ai)
316    }
317
318    /// Prune old event files
319    ///
320    /// Delegates to storage layer.
321    ///
322    /// # Arguments
323    ///
324    /// * `retain_days` - Number of days to retain
325    ///
326    /// # Returns
327    ///
328    /// Number of files deleted.
329    ///
330    /// # Errors
331    ///
332    /// Returns an error if pruning fails.
333    pub fn prune(&self, retain_days: u32) -> Result<usize, AggregatorError> {
334        self.storage
335            .prune_old_events(retain_days)
336            .map_err(|e| AggregatorError::StorageError(e.to_string()))
337    }
338
339    /// Save a summary to the summaries directory
340    ///
341    /// # Arguments
342    ///
343    /// * `summary` - The summary to save
344    ///
345    /// # Errors
346    ///
347    /// Returns an error if the summary cannot be serialized or written.
348    pub fn save_summary(&self, summary: &DiagnosticsSummary) -> Result<(), AggregatorError> {
349        let json = serde_json::to_vec_pretty(summary)
350            .map_err(|e| AggregatorError::SerializationError(e.to_string()))?;
351
352        self.storage
353            .write_summary(summary.period.as_str(), &json)
354            .map_err(|e| AggregatorError::StorageError(e.to_string()))
355    }
356
357    /// Load a saved summary
358    ///
359    /// # Arguments
360    ///
361    /// * `week` - ISO week string (e.g., "2025-W50")
362    ///
363    /// # Returns
364    ///
365    /// The saved summary if it exists.
366    ///
367    /// # Errors
368    ///
369    /// Returns an error if the summary cannot be read or parsed.
370    pub fn load_summary(&self, week: &str) -> Result<DiagnosticsSummary, AggregatorError> {
371        let data = self
372            .storage
373            .read_summary(week)
374            .map_err(|e| AggregatorError::StorageError(e.to_string()))?;
375
376        serde_json::from_slice(&data)
377            .map_err(|e| AggregatorError::SerializationError(e.to_string()))
378    }
379
    /// Check if a summary exists for a week
    ///
    /// Delegates to the storage layer; `week` is an ISO week string
    /// (e.g., "2025-W50").
    #[must_use]
    pub fn summary_exists(&self, week: &str) -> bool {
        self.storage.summary_exists(week)
    }
385
386    /// Get or generate summary for a week
387    ///
388    /// Loads cached summary if available, otherwise generates and saves it.
389    ///
390    /// # Arguments
391    ///
392    /// * `week` - ISO week string (e.g., "2025-W50")
393    ///
394    /// # Returns
395    ///
396    /// The summary for the week.
397    ///
398    /// # Errors
399    ///
400    /// Returns an error if the summary cannot be loaded or generated.
401    pub fn get_or_generate_summary(
402        &self,
403        week: &str,
404    ) -> Result<DiagnosticsSummary, AggregatorError> {
405        // Try to load cached
406        if self.summary_exists(week)
407            && let Ok(summary) = self.load_summary(week)
408        {
409            return Ok(summary);
410        }
411
412        // Generate fresh
413        let summary = self.summarize_week(week)?;
414
415        // Save for future use
416        let _ = self.save_summary(&summary);
417
418        Ok(summary)
419    }
420}
421
422/// Convert ISO week string to date range
423fn week_to_date_range(week: &str) -> Result<(String, String), AggregatorError> {
424    let (year, week_num) = parse_week_parts(week)?;
425
426    // Calculate first day of the ISO week (Monday)
427    // ISO week 1 is the week containing the first Thursday of the year
428    let jan4 = NaiveDate::from_ymd_opt(year, 1, 4).ok_or_else(|| invalid_week_error(week))?;
429
430    // Find Monday of week 1
431    let days_since_monday = jan4.weekday().num_days_from_monday();
432    let week1_monday = jan4 - Duration::days(i64::from(days_since_monday));
433
434    // Calculate start of requested week
435    let start = week1_monday + Duration::weeks(i64::from(week_num - 1));
436    let end = start + Duration::days(6);
437
438    Ok((
439        start.format("%Y-%m-%d").to_string(),
440        end.format("%Y-%m-%d").to_string(),
441    ))
442}
443
/// Get the current ISO week string
///
/// Uses `%G` (ISO week-based year) rather than `%Y` so dates near year
/// boundaries map to the correct ISO week, and `%V` for the 01..53 week
/// number — matching the "YYYY-Www" format parsed elsewhere in this module.
fn current_iso_week() -> String {
    Utc::now().format("%G-W%V").to_string()
}
448
/// Lossless `usize` -> `f64` conversion, saturating at `u32::MAX`.
fn usize_to_f64(value: usize) -> f64 {
    match u32::try_from(value) {
        Ok(small) => f64::from(small),
        Err(_) => f64::from(u32::MAX),
    }
}
452
453fn average_duration(durations: &[f64]) -> f64 {
454    durations.iter().sum::<f64>() / usize_to_f64(durations.len())
455}
456
/// Median of `values`, sorting the slice in place.
///
/// NaN values compare as equal to everything here, so their position after
/// the sort is arbitrary; callers feed finite durations.
///
/// Returns `0.0` for an empty slice instead of panicking on `values[0]`.
fn median_duration(values: &mut [f64]) -> f64 {
    if values.is_empty() {
        return 0.0;
    }

    values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

    let mid = values.len() / 2;
    if values.len() % 2 == 0 {
        // Even count: mean of the two middle elements.
        (values[mid - 1] + values[mid]) / 2.0
    } else {
        values[mid]
    }
}
466
467fn parse_week_parts(week: &str) -> Result<(i32, u32), AggregatorError> {
468    // Parse "YYYY-Www" format
469    if week.len() != 8 || !week.contains("-W") {
470        return Err(invalid_week_error(week));
471    }
472
473    let year: i32 = week[0..4].parse().map_err(|_| invalid_week_error(week))?;
474    let week_num: u32 = week[6..8].parse().map_err(|_| invalid_week_error(week))?;
475
476    Ok((year, week_num))
477}
478
479fn invalid_week_error(week: &str) -> AggregatorError {
480    AggregatorError::InvalidWeekFormat(week.to_string())
481}
482
/// Errors that can occur during aggregation
///
/// `Display` comes from `thiserror`'s `#[error]` attributes. Storage and
/// serde failures are stringified at the boundary, keeping this enum
/// independent of those layers' concrete error types.
#[derive(Debug, thiserror::Error)]
pub enum AggregatorError {
    /// Invalid ISO week format
    #[error("invalid ISO week format: {0}")]
    InvalidWeekFormat(String),

    /// Storage operation failed
    #[error("storage error: {0}")]
    StorageError(String),

    /// Serialization/deserialization error
    #[error("serialization error: {0}")]
    SerializationError(String),
}
498
499// ============================================================================
500// Tests
501// ============================================================================
502
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Absolute tolerance for exact float comparisons.
    const FLOAT_EPSILON: f64 = 1.0e-9;

    // Assert two f64 values are within FLOAT_EPSILON of each other.
    fn assert_f64_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < FLOAT_EPSILON,
            "expected {expected}, got {actual}"
        );
    }

    // Shared fixture: 3 queries (2 CallChain @ 100/200ms, 1 ImpactAnalysis
    // @ 300ms), 1 graph expansion, 1 graph-view abandonment, and 2 AI
    // answers of which 1 was requeried.
    fn create_test_events() -> Vec<UseEvent> {
        vec![
            UseEvent::with_duration(
                UseEventType::QueryExecuted {
                    kind: QueryKind::CallChain,
                    result_count: 10,
                },
                100,
            ),
            UseEvent::with_duration(
                UseEventType::QueryExecuted {
                    kind: QueryKind::CallChain,
                    result_count: 20,
                },
                200,
            ),
            UseEvent::with_duration(
                UseEventType::QueryExecuted {
                    kind: QueryKind::ImpactAnalysis,
                    result_count: 5,
                },
                300,
            ),
            UseEvent::new(UseEventType::GraphExpanded {
                kind: GraphKind::CallGraph,
                depth: 3,
            }),
            UseEvent::new(UseEventType::ViewAbandoned {
                kind: ViewKind::Graph,
                time_spent_ms: 5000,
            }),
            UseEvent::new(UseEventType::AiAnswerGenerated {
                accepted: true,
                requeried: false,
            }),
            UseEvent::new(UseEventType::AiAnswerGenerated {
                accepted: false,
                requeried: true,
            }),
        ]
    }

    #[test]
    fn test_count_workflows() {
        let events = create_test_events();

        let workflows = DiagnosticsAggregator::count_workflows(&events);

        assert_eq!(workflows.len(), 2);
        // CallChain should be first (2 occurrences)
        assert_eq!(workflows[0].kind, QueryKind::CallChain);
        assert_eq!(workflows[0].count, 2);
        // ImpactAnalysis second (1 occurrence)
        assert_eq!(workflows[1].kind, QueryKind::ImpactAnalysis);
        assert_eq!(workflows[1].count, 1);
    }

    #[test]
    fn test_timing_metrics() {
        let events = create_test_events();

        let (avg, median) = DiagnosticsAggregator::calculate_timing_metrics(&events);

        // Events have durations: 100ms, 200ms, 300ms
        // Avg = 200ms = 0.2s
        assert!((avg - 0.2).abs() < 0.001);
        // Median = 200ms = 0.2s (middle of 3 values)
        assert!((median - 0.2).abs() < 0.001);
    }

    #[test]
    fn test_overall_abandon_rate() {
        let events = create_test_events();

        let rate = DiagnosticsAggregator::calculate_overall_abandon_rate(&events);

        // 1 abandoned, 1 graph expansion = 0.5
        assert!((rate - 0.5).abs() < 0.001);
    }

    #[test]
    fn test_requery_rate() {
        let events = create_test_events();

        let rate = DiagnosticsAggregator::calculate_requery_rate(&events);

        // 1 requeried out of 2 AI answers = 0.5
        assert!((rate - 0.5).abs() < 0.001);
    }

    #[test]
    fn test_aggregate_events() {
        let events = create_test_events();
        let period = IsoWeekPeriod::try_new("2025-W50").unwrap();

        let summary = DiagnosticsAggregator::aggregate_events(&events, period);

        assert_eq!(summary.total_uses, 7);
        assert!(!summary.top_workflows.is_empty());
        assert!((summary.abandon_rate - 0.5).abs() < 0.001);
        assert!((summary.ai_requery_rate - 0.5).abs() < 0.001);
    }

    #[test]
    fn test_empty_events() {
        let events: Vec<UseEvent> = vec![];
        let period = IsoWeekPeriod::try_new("2025-W50").unwrap();

        let summary = DiagnosticsAggregator::aggregate_events(&events, period);

        // An empty log must produce a defaulted (all-zero) summary.
        assert_eq!(summary.total_uses, 0);
        assert!(summary.top_workflows.is_empty());
        assert_f64_close(summary.avg_time_to_result_sec, 0.0);
        assert_f64_close(summary.abandon_rate, 0.0);
    }

    #[test]
    fn test_week_to_date_range() {
        // Week 1 of 2025
        let (start, end) = week_to_date_range("2025-W01").unwrap();
        assert_eq!(start, "2024-12-30"); // Monday of ISO week 1
        assert_eq!(end, "2025-01-05"); // Sunday

        // Week 50 of 2025
        let (start, end) = week_to_date_range("2025-W50").unwrap();
        assert_eq!(start, "2025-12-08");
        assert_eq!(end, "2025-12-14");
    }

    #[test]
    fn test_invalid_week_format() {
        let result = week_to_date_range("invalid");
        assert!(result.is_err());

        // Missing the "W" separator.
        let result = week_to_date_range("2025-50");
        assert!(result.is_err());
    }

    #[test]
    fn test_save_and_load_summary() {
        let dir = tempdir().unwrap();
        let uses_dir = dir.path().join("uses");
        let aggregator = DiagnosticsAggregator::new(&uses_dir);

        // Ensure directories exist
        aggregator.storage.ensure_directories().unwrap();

        let summary = DiagnosticsSummary {
            period: IsoWeekPeriod::try_new("2025-W50").unwrap(),
            top_workflows: vec![TopWorkflow {
                kind: QueryKind::CallChain,
                count: 42,
            }],
            avg_time_to_result_sec: 1.5,
            median_time_to_result_sec: 1.2,
            abandon_rate: 0.1,
            abandonment: vec![],
            ai_requery_rate: 0.3,
            total_uses: 100,
            dropped_events: 0,
        };

        aggregator.save_summary(&summary).unwrap();
        assert!(aggregator.summary_exists("2025-W50"));

        // Round-trip: the loaded summary must match what was saved.
        let loaded = aggregator.load_summary("2025-W50").unwrap();
        assert_eq!(loaded.total_uses, 100);
        assert_eq!(loaded.top_workflows[0].count, 42);
    }

    #[test]
    fn test_current_iso_week() {
        let week = current_iso_week();
        // Should match format YYYY-Www
        assert!(week.len() == 8);
        assert!(week.contains("-W"));
    }
}
694}