Skip to main content

mur_common/skill/
aggregator.rs

1//! Stats aggregator: receives `StatsEvent` from the runtime's fan-out
2//! adapter and flushes counter deltas into per-skill `stats.json` sidecars.
3//!
4//! Flush triggers: every 64 events or every 2 seconds, whichever comes first.
5
6use chrono::{DateTime, Utc};
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::{Mutex, mpsc};
12
13use crate::skill::stats::SkillStats;
14
15pub const FLUSH_EVERY_EVENTS: usize = 64;
16pub const FLUSH_EVERY: Duration = Duration::from_secs(2);
17
18/// Mirrors `Event::SkillExecuted` without coupling mur-common to the
19/// runtime's `Event` enum.
20#[derive(Debug, Clone)]
21pub struct StatsEvent {
22    pub skill_name: String,
23    pub skill_version: String,
24    pub manifest_digest: String,
25    pub success: bool,
26    pub failure: bool,
27    pub now: DateTime<Utc>,
28}
29
30#[derive(Debug, Default)]
31struct Delta {
32    usage: u64,
33    success: u64,
34    failure: u64,
35    resolution_misses: u64,
36    last_used_at: Option<DateTime<Utc>>,
37    last_success_at: Option<DateTime<Utc>>,
38    first_success_seen: Option<DateTime<Utc>>,
39    manifest_digest: String,
40    skill_version: String,
41}
42
43pub struct StatsAggregator {
44    #[allow(dead_code)]
45    mur_home: PathBuf,
46    #[allow(dead_code)]
47    deltas: Arc<Mutex<HashMap<String, Delta>>>,
48}
49
50impl StatsAggregator {
51    /// Spawn the background flush task. Returns the handle so callers can
52    /// hold a reference while the task runs. The task exits when `rx` is
53    /// closed (all senders dropped).
54    pub fn spawn(mur_home: PathBuf, mut rx: mpsc::Receiver<StatsEvent>) -> Self {
55        let deltas: Arc<Mutex<HashMap<String, Delta>>> = Arc::default();
56        let deltas_clone = Arc::clone(&deltas);
57        let mur_home_clone = mur_home.clone();
58
59        tokio::spawn(async move {
60            let mut tick = tokio::time::interval(FLUSH_EVERY);
61            let mut event_budget = FLUSH_EVERY_EVENTS;
62            loop {
63                tokio::select! {
64                    _ = tick.tick() => {
65                        flush(&mur_home_clone, &deltas_clone).await;
66                    }
67                    Some(ev) = rx.recv() => {
68                        merge_one(&deltas_clone, ev).await;
69                        event_budget = event_budget.saturating_sub(1);
70                        if event_budget == 0 {
71                            flush(&mur_home_clone, &deltas_clone).await;
72                            event_budget = FLUSH_EVERY_EVENTS;
73                        }
74                    }
75                    else => break, // channel closed — drain + exit
76                }
77            }
78            flush(&mur_home_clone, &deltas_clone).await;
79        });
80
81        Self { mur_home, deltas }
82    }
83}
84
85async fn merge_one(deltas: &Mutex<HashMap<String, Delta>>, ev: StatsEvent) {
86    let mut map = deltas.lock().await;
87    let d = map.entry(ev.skill_name.clone()).or_default();
88    d.usage += 1;
89    if ev.success {
90        d.success += 1;
91        d.last_success_at = Some(ev.now);
92        if d.first_success_seen.is_none() {
93            d.first_success_seen = Some(ev.now);
94        }
95    } else if ev.failure {
96        d.failure += 1;
97    }
98    // For NotEvaluated, count as usage only (no success/failure increment)
99    d.last_used_at = Some(ev.now);
100    if d.manifest_digest.is_empty() {
101        d.manifest_digest = ev.manifest_digest;
102        d.skill_version = ev.skill_version;
103    }
104}
105
106async fn flush(mur_home: &std::path::Path, deltas: &Mutex<HashMap<String, Delta>>) {
107    let map = {
108        let mut guard = deltas.lock().await;
109        std::mem::take(&mut *guard)
110    };
111
112    for (skill_name, delta) in map {
113        if delta.usage == 0 {
114            continue;
115        }
116        let path = SkillStats::path(mur_home, &skill_name);
117        let default = || {
118            SkillStats::new(
119                &skill_name,
120                &delta.skill_version,
121                &delta.manifest_digest,
122                Utc::now(),
123            )
124        };
125        let _ = SkillStats::merge_in_place(&path, default, |s| {
126            // If manifest changed, reset counters but preserve pinned + state
127            if s.is_stale(&delta.manifest_digest) {
128                s.reset_for_new_manifest(&delta.skill_version, &delta.manifest_digest, Utc::now());
129            }
130            s.usage_count += delta.usage;
131            s.success_count += delta.success;
132            s.failure_count += delta.failure;
133            s.resolution_misses += delta.resolution_misses;
134            if let Some(t) = delta.last_used_at {
135                s.last_used_at = Some(match s.last_used_at {
136                    Some(existing) => existing.max(t),
137                    None => t,
138                });
139            }
140            if let Some(t) = delta.last_success_at {
141                s.last_success_at = Some(match s.last_success_at {
142                    Some(existing) => existing.max(t),
143                    None => t,
144                });
145            }
146            if let Some(t) = delta.first_success_seen {
147                s.first_successful_use_at = Some(match s.first_successful_use_at {
148                    Some(existing) => existing.min(t),
149                    None => t,
150                });
151            }
152            Ok(())
153        });
154    }
155}