mur_common/skill/
aggregator.rs1use chrono::{DateTime, Utc};
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::{Mutex, mpsc};
12
13use crate::skill::stats::SkillStats;
14
15pub const FLUSH_EVERY_EVENTS: usize = 64;
16pub const FLUSH_EVERY: Duration = Duration::from_secs(2);
17
18#[derive(Debug, Clone)]
21pub struct StatsEvent {
22 pub skill_name: String,
23 pub skill_version: String,
24 pub manifest_digest: String,
25 pub success: bool,
26 pub failure: bool,
27 pub now: DateTime<Utc>,
28}
29
30#[derive(Debug, Default)]
31struct Delta {
32 usage: u64,
33 success: u64,
34 failure: u64,
35 resolution_misses: u64,
36 last_used_at: Option<DateTime<Utc>>,
37 last_success_at: Option<DateTime<Utc>>,
38 first_success_seen: Option<DateTime<Utc>>,
39 manifest_digest: String,
40 skill_version: String,
41}
42
43pub struct StatsAggregator {
44 #[allow(dead_code)]
45 mur_home: PathBuf,
46 #[allow(dead_code)]
47 deltas: Arc<Mutex<HashMap<String, Delta>>>,
48}
49
50impl StatsAggregator {
51 pub fn spawn(mur_home: PathBuf, mut rx: mpsc::Receiver<StatsEvent>) -> Self {
55 let deltas: Arc<Mutex<HashMap<String, Delta>>> = Arc::default();
56 let deltas_clone = Arc::clone(&deltas);
57 let mur_home_clone = mur_home.clone();
58
59 tokio::spawn(async move {
60 let mut tick = tokio::time::interval(FLUSH_EVERY);
61 let mut event_budget = FLUSH_EVERY_EVENTS;
62 loop {
63 tokio::select! {
64 _ = tick.tick() => {
65 flush(&mur_home_clone, &deltas_clone).await;
66 }
67 Some(ev) = rx.recv() => {
68 merge_one(&deltas_clone, ev).await;
69 event_budget = event_budget.saturating_sub(1);
70 if event_budget == 0 {
71 flush(&mur_home_clone, &deltas_clone).await;
72 event_budget = FLUSH_EVERY_EVENTS;
73 }
74 }
75 else => break, }
77 }
78 flush(&mur_home_clone, &deltas_clone).await;
79 });
80
81 Self { mur_home, deltas }
82 }
83}
84
85async fn merge_one(deltas: &Mutex<HashMap<String, Delta>>, ev: StatsEvent) {
86 let mut map = deltas.lock().await;
87 let d = map.entry(ev.skill_name.clone()).or_default();
88 d.usage += 1;
89 if ev.success {
90 d.success += 1;
91 d.last_success_at = Some(ev.now);
92 if d.first_success_seen.is_none() {
93 d.first_success_seen = Some(ev.now);
94 }
95 } else if ev.failure {
96 d.failure += 1;
97 }
98 d.last_used_at = Some(ev.now);
100 if d.manifest_digest.is_empty() {
101 d.manifest_digest = ev.manifest_digest;
102 d.skill_version = ev.skill_version;
103 }
104}
105
106async fn flush(mur_home: &std::path::Path, deltas: &Mutex<HashMap<String, Delta>>) {
107 let map = {
108 let mut guard = deltas.lock().await;
109 std::mem::take(&mut *guard)
110 };
111
112 for (skill_name, delta) in map {
113 if delta.usage == 0 {
114 continue;
115 }
116 let path = SkillStats::path(mur_home, &skill_name);
117 let default = || {
118 SkillStats::new(
119 &skill_name,
120 &delta.skill_version,
121 &delta.manifest_digest,
122 Utc::now(),
123 )
124 };
125 let _ = SkillStats::merge_in_place(&path, default, |s| {
126 if s.is_stale(&delta.manifest_digest) {
128 s.reset_for_new_manifest(&delta.skill_version, &delta.manifest_digest, Utc::now());
129 }
130 s.usage_count += delta.usage;
131 s.success_count += delta.success;
132 s.failure_count += delta.failure;
133 s.resolution_misses += delta.resolution_misses;
134 if let Some(t) = delta.last_used_at {
135 s.last_used_at = Some(match s.last_used_at {
136 Some(existing) => existing.max(t),
137 None => t,
138 });
139 }
140 if let Some(t) = delta.last_success_at {
141 s.last_success_at = Some(match s.last_success_at {
142 Some(existing) => existing.max(t),
143 None => t,
144 });
145 }
146 if let Some(t) = delta.first_success_seen {
147 s.first_successful_use_at = Some(match s.first_successful_use_at {
148 Some(existing) => existing.min(t),
149 None => t,
150 });
151 }
152 Ok(())
153 });
154 }
155}