Skip to main content

mur_common/skill/
event_log.rs

1//! Per-skill append-only event log (`~/.mur/skills/<name>/events.jsonl`).
2//! Each line is a JSON-serialized `SkillEvent`. Used by fleet-sync for
3//! set-union merge of evolved usage state across devices.
4//!
5//! Also provides manifest conflict resolution via Last-Writer-Wins (LWW)
6//! for fleet-sync: when two devices have divergent manifests, the one
7//! with the later `updated_at` timestamp wins.
8
9use crate::skill::manifest::Skill;
10use crate::skill::stats::SkillStats;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use std::collections::HashSet;
15use std::io::Write;
16use std::path::{Path, PathBuf};
17
/// One record in a skill's event log.
///
/// Serialized as an internally tagged JSON object: the variant name is stored
/// in a `"kind"` field in snake_case (e.g. `{"kind":"retrieval", ...}`).
/// Every variant carries the event timestamp `ts` and the `device_id` string
/// recorded with the event.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SkillEvent {
    /// A retrieval event. `apply_new_events_to_stats` counts it as one usage.
    Retrieval {
        ts: DateTime<Utc>,
        device_id: String,
    },
    /// An execution event. `apply_new_events_to_stats` counts it as one usage
    /// and as either a success or a failure depending on `outcome`.
    Execution {
        ts: DateTime<Utc>,
        device_id: String,
        /// "success" | "failure"
        ///
        /// `apply_new_events_to_stats` treats any value other than
        /// `"success"` as a failure.
        outcome: String,
        /// Optional error text; omitted from the JSON when `None` and
        /// defaulted to `None` when absent on read.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error: Option<String>,
        /// Optional step label; omitted from the JSON when `None` and
        /// defaulted to `None` when absent on read.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        step: Option<String>,
    },
    /// A dismissal event. Ignored by `apply_new_events_to_stats`.
    Dismissed {
        ts: DateTime<Utc>,
        device_id: String,
    },
    /// A supersession event. Ignored by `apply_new_events_to_stats`.
    Superseded {
        ts: DateTime<Utc>,
        device_id: String,
    },
}
44
45impl SkillEvent {
46    /// Stable key for set-dedup: timestamp-micros + kind + device.
47    pub fn dedup_key(&self) -> String {
48        match self {
49            Self::Retrieval { ts, device_id } => {
50                format!("{}:retrieval:{}", ts.timestamp_micros(), device_id)
51            }
52            Self::Execution { ts, device_id, .. } => {
53                format!("{}:execution:{}", ts.timestamp_micros(), device_id)
54            }
55            Self::Dismissed { ts, device_id } => {
56                format!("{}:dismissed:{}", ts.timestamp_micros(), device_id)
57            }
58            Self::Superseded { ts, device_id } => {
59                format!("{}:superseded:{}", ts.timestamp_micros(), device_id)
60            }
61        }
62    }
63
64    pub fn ts(&self) -> DateTime<Utc> {
65        match self {
66            Self::Retrieval { ts, .. }
67            | Self::Execution { ts, .. }
68            | Self::Dismissed { ts, .. }
69            | Self::Superseded { ts, .. } => *ts,
70        }
71    }
72}
73
/// Location of the event log for `skill_name` under `mur_home`:
/// `<mur_home>/skills/<skill_name>/events.jsonl`.
///
/// This only builds the path; nothing is created or checked on disk.
pub fn event_log_path(mur_home: &Path, skill_name: &str) -> PathBuf {
    let mut log_path = mur_home.to_path_buf();
    log_path.push("skills");
    log_path.push(skill_name);
    log_path.push("events.jsonl");
    log_path
}
80
81pub fn append_event(path: &Path, event: &SkillEvent) -> Result<()> {
82    if let Some(parent) = path.parent() {
83        std::fs::create_dir_all(parent)?;
84    }
85    let line = serde_json::to_string(event)?;
86    let mut f = std::fs::OpenOptions::new()
87        .create(true)
88        .append(true)
89        .open(path)?;
90    writeln!(f, "{line}")?;
91    Ok(())
92}
93
94pub fn read_events(path: &Path) -> Result<Vec<SkillEvent>> {
95    match std::fs::read_to_string(path) {
96        Ok(s) => parse_events_jsonl(&s),
97        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
98        Err(e) => Err(anyhow::Error::from(e)),
99    }
100}
101
102pub fn parse_events_jsonl(raw: &str) -> Result<Vec<SkillEvent>> {
103    raw.lines()
104        .filter(|l| !l.is_empty())
105        .map(|l| serde_json::from_str(l).map_err(anyhow::Error::from))
106        .collect()
107}
108
109/// Set-union of two event logs, deduped by `dedup_key`, sorted by timestamp.
110/// Commutative and idempotent.
111pub fn union_events(mut a: Vec<SkillEvent>, b: Vec<SkillEvent>) -> Vec<SkillEvent> {
112    let seen: HashSet<String> = a.iter().map(|e| e.dedup_key()).collect();
113    for event in b {
114        if !seen.contains(&event.dedup_key()) {
115            a.push(event);
116        }
117    }
118    a.sort_by_key(|e| e.ts());
119    a
120}
121
122/// Apply a slice of new events to an existing `SkillStats`, updating only
123/// usage counters. Lifecycle state, pinned, and anchor_confidence are
124/// preserved — they are managed by the lifecycle module, not by events.
125pub fn apply_new_events_to_stats(stats: &mut SkillStats, new_events: &[SkillEvent]) {
126    for event in new_events {
127        match event {
128            SkillEvent::Retrieval { ts, .. } => {
129                stats.usage_count += 1;
130                stats.last_used_at = Some(stats.last_used_at.map(|e| e.max(*ts)).unwrap_or(*ts));
131            }
132            SkillEvent::Execution { ts, outcome, .. } => {
133                stats.usage_count += 1;
134                stats.last_used_at = Some(stats.last_used_at.map(|e| e.max(*ts)).unwrap_or(*ts));
135                if outcome == "success" {
136                    stats.success_count += 1;
137                    stats.last_success_at =
138                        Some(stats.last_success_at.map(|e| e.max(*ts)).unwrap_or(*ts));
139                    if stats.first_successful_use_at.is_none() {
140                        stats.first_successful_use_at = Some(*ts);
141                    }
142                } else {
143                    stats.failure_count += 1;
144                }
145            }
146            SkillEvent::Dismissed { .. } | SkillEvent::Superseded { .. } => {}
147        }
148    }
149}
150
151/// Resolve manifest conflict via Last-Writer-Wins (LWW).
152/// Returns the winning skill and the reason (local_wins, remote_wins, or force_local).
153pub fn resolve_manifest_lww(
154    local: Skill,
155    remote: Skill,
156    force_local: bool,
157) -> (Skill, &'static str) {
158    if force_local {
159        return (local, "force_local");
160    }
161    if remote.manifest.updated_at > local.manifest.updated_at {
162        (remote, "remote_newer")
163    } else {
164        (local, "local_newer_or_equal")
165    }
166}
167
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    fn device() -> String {
        "dev-a".into()
    }

    /// Fixed base instant shifted by `ts_offset_secs`, so fixtures are
    /// deterministic and easy to order.
    fn at(ts_offset_secs: i64) -> DateTime<Utc> {
        DateTime::<Utc>::from_timestamp(1_748_000_000 + ts_offset_secs, 0).unwrap()
    }

    fn retrieval(ts_offset_secs: i64) -> SkillEvent {
        SkillEvent::Retrieval {
            ts: at(ts_offset_secs),
            device_id: device(),
        }
    }

    fn exec_ok(ts_offset_secs: i64) -> SkillEvent {
        SkillEvent::Execution {
            ts: at(ts_offset_secs),
            device_id: device(),
            outcome: "success".into(),
            error: None,
            step: None,
        }
    }

    fn exec_fail(ts_offset_secs: i64) -> SkillEvent {
        SkillEvent::Execution {
            ts: at(ts_offset_secs),
            device_id: device(),
            outcome: "failure".into(),
            error: Some("oops".into()),
            step: None,
        }
    }

    /// Minimal `Skill` fixture whose manifest carries the given `updated_at`.
    /// Shared by the LWW tests so they differ only in the timestamps.
    fn skill_updated_at(updated_at: DateTime<Utc>) -> Skill {
        use crate::skill::manifest::{Content, SkillManifest};
        use crate::skill::types::Category;

        Skill {
            manifest: SkillManifest {
                name: "test".into(),
                version: "1.0".into(),
                publisher: "p".into(),
                description: "d".into(),
                category: Category::Context,
                provenance: Default::default(),
                hosts: vec![],
                content: Content {
                    r#abstract: "a".into(),
                    context: Some("c".into()),
                    procedure: None,
                    command: None,
                    note: None,
                },
                requires: vec![],
                tags: vec![],
                triggers: vec![],
                priority: Default::default(),
                evolution_log: vec![],
                transfer_chain: vec![],
                mcp_requirements: vec![],
                updated_at,
            },
            content_sha256: Some("hash".into()),
            trust_level: Default::default(),
            capabilities_declared: vec![],
            publisher_signature: None,
        }
    }

    #[test]
    fn append_then_read_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        append_event(&path, &retrieval(0)).unwrap();
        append_event(&path, &exec_ok(1)).unwrap();
        let events = read_events(&path).unwrap();
        // Check content and order, not just the count.
        assert_eq!(events, vec![retrieval(0), exec_ok(1)]);
    }

    #[test]
    fn union_deduplicates_identical_events() {
        let a = vec![retrieval(0), exec_ok(1)];
        let b = vec![exec_ok(1), exec_fail(2)];
        let merged = union_events(a, b);
        assert_eq!(merged.len(), 3); // dedup exec_ok(1)
    }

    #[test]
    fn union_is_commutative() {
        let a = vec![retrieval(0), exec_ok(1)];
        let b = vec![exec_ok(1), exec_fail(2)];
        let ab = union_events(a.clone(), b.clone());
        let ba = union_events(b, a);
        let ab_keys: Vec<_> = ab.iter().map(|e| e.dedup_key()).collect();
        let ba_keys: Vec<_> = ba.iter().map(|e| e.dedup_key()).collect();
        assert_eq!(ab_keys, ba_keys);
    }

    #[test]
    fn apply_new_events_updates_counters() {
        let mut stats = SkillStats::new("test-skill", "1.0.0", "digest", Utc::now());
        let events = vec![exec_ok(1), exec_fail(2), retrieval(3)];
        apply_new_events_to_stats(&mut stats, &events);
        assert_eq!(stats.usage_count, 3);
        assert_eq!(stats.success_count, 1);
        assert_eq!(stats.failure_count, 1);
        assert!(stats.last_success_at.is_some());
        assert!(stats.first_successful_use_at.is_some());
    }

    #[test]
    fn read_events_returns_empty_for_missing_file() {
        let dir = tempdir().unwrap();
        let events = read_events(&dir.path().join("missing.jsonl")).unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn parse_events_jsonl_handles_multiline() {
        let raw = "{\"kind\":\"retrieval\",\"ts\":\"2026-05-30T00:00:00Z\",\"device_id\":\"d\"}\n\
                   {\"kind\":\"retrieval\",\"ts\":\"2026-05-30T00:01:00Z\",\"device_id\":\"d\"}\n";
        let events = parse_events_jsonl(raw).unwrap();
        assert_eq!(events.len(), 2);
    }

    #[test]
    fn parse_events_jsonl_skips_empty_lines() {
        let raw = "\n{\"kind\":\"retrieval\",\"ts\":\"2026-05-30T00:00:00Z\",\"device_id\":\"d\"}\n\n";
        let events = parse_events_jsonl(raw).unwrap();
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn parse_events_jsonl_rejects_malformed_line() {
        assert!(parse_events_jsonl("not json\n").is_err());
    }

    #[test]
    fn manifest_lww_prefers_remote_when_newer() {
        let t1 = at(0);
        let t2 = at(1_000);

        let local = skill_updated_at(t1);
        let remote = skill_updated_at(t2);

        let (winner, reason) = resolve_manifest_lww(local, remote, false);
        assert_eq!(reason, "remote_newer");
        assert_eq!(winner.manifest.updated_at, t2);
    }

    #[test]
    fn manifest_lww_keeps_local_on_tie() {
        let t1 = at(0);

        let (winner, reason) =
            resolve_manifest_lww(skill_updated_at(t1), skill_updated_at(t1), false);
        assert_eq!(reason, "local_newer_or_equal");
        assert_eq!(winner.manifest.updated_at, t1);
    }

    #[test]
    fn manifest_lww_respects_force_local() {
        let t1 = at(0);
        let t2 = at(1_000);

        let local = skill_updated_at(t1);
        let remote = skill_updated_at(t2);

        let (winner, reason) = resolve_manifest_lww(local, remote, true);
        assert_eq!(reason, "force_local");
        assert_eq!(winner.manifest.updated_at, t1);
    }
}