Skip to main content

mur_common/skill/
event_log.rs

1//! Per-skill append-only event log (`~/.mur/skills/<name>/events.jsonl`).
2//! Each line is a JSON-serialized `SkillEvent`. Used by fleet-sync for
3//! set-union merge of evolved usage state across devices.
4//!
5//! Also provides manifest conflict resolution via Last-Writer-Wins (LWW)
6//! for fleet-sync: when two devices have divergent manifests, the one
7//! with the later `updated_at` timestamp wins.
8
9use crate::skill::manifest::Skill;
10use crate::skill::stats::SkillStats;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use std::collections::HashSet;
15use std::io::Write;
16use std::path::{Path, PathBuf};
17
18#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
19#[serde(tag = "kind", rename_all = "snake_case")]
20pub enum SkillEvent {
21    Retrieval {
22        ts: DateTime<Utc>,
23        device_id: String,
24    },
25    Execution {
26        ts: DateTime<Utc>,
27        device_id: String,
28        /// "success" | "failure"
29        outcome: String,
30        #[serde(default, skip_serializing_if = "Option::is_none")]
31        error: Option<String>,
32        #[serde(default, skip_serializing_if = "Option::is_none")]
33        step: Option<String>,
34        // ── Run-ledger enrichment (workflow-engine v2 P2; all default so
35        //    existing events.jsonl lines keep parsing and fleet-sync's
36        //    dedup_key (ts+kind+device) is unaffected) ──
37        #[serde(default, skip_serializing_if = "Option::is_none")]
38        duration_ms: Option<u64>,
39        #[serde(default, skip_serializing_if = "Option::is_none")]
40        exit_code: Option<i32>,
41        /// "workflow" (the skill is broken) | "env" (network/credentials/…).
42        /// The Broken fast-path (P4) only triggers on "workflow" with
43        /// confidence ≥ threshold.
44        #[serde(default, skip_serializing_if = "Option::is_none")]
45        env_class: Option<String>,
46        #[serde(default, skip_serializing_if = "Option::is_none")]
47        confidence: Option<f64>,
48        /// "manual" | "schedule" | "agent"
49        #[serde(default, skip_serializing_if = "Option::is_none")]
50        trigger: Option<String>,
51    },
52    Dismissed {
53        ts: DateTime<Utc>,
54        device_id: String,
55    },
56    Superseded {
57        ts: DateTime<Utc>,
58        device_id: String,
59    },
60}
61
62impl SkillEvent {
63    /// Stable key for set-dedup: timestamp-micros + kind + device.
64    pub fn dedup_key(&self) -> String {
65        match self {
66            Self::Retrieval { ts, device_id } => {
67                format!("{}:retrieval:{}", ts.timestamp_micros(), device_id)
68            }
69            Self::Execution { ts, device_id, .. } => {
70                format!("{}:execution:{}", ts.timestamp_micros(), device_id)
71            }
72            Self::Dismissed { ts, device_id } => {
73                format!("{}:dismissed:{}", ts.timestamp_micros(), device_id)
74            }
75            Self::Superseded { ts, device_id } => {
76                format!("{}:superseded:{}", ts.timestamp_micros(), device_id)
77            }
78        }
79    }
80
81    pub fn ts(&self) -> DateTime<Utc> {
82        match self {
83            Self::Retrieval { ts, .. }
84            | Self::Execution { ts, .. }
85            | Self::Dismissed { ts, .. }
86            | Self::Superseded { ts, .. } => *ts,
87        }
88    }
89}
90
/// Location of a skill's event log: `<mur_home>/skills/<skill_name>/events.jsonl`.
///
/// Pure path construction — nothing is created or checked on disk.
pub fn event_log_path(mur_home: &Path, skill_name: &str) -> PathBuf {
    let mut log_path = mur_home.to_path_buf();
    log_path.push("skills");
    log_path.push(skill_name);
    log_path.push("events.jsonl");
    log_path
}
97
98pub fn append_event(path: &Path, event: &SkillEvent) -> Result<()> {
99    if let Some(parent) = path.parent() {
100        std::fs::create_dir_all(parent)?;
101    }
102    let line = serde_json::to_string(event)?;
103    let mut f = std::fs::OpenOptions::new()
104        .create(true)
105        .append(true)
106        .open(path)?;
107    writeln!(f, "{line}")?;
108    Ok(())
109}
110
111pub fn read_events(path: &Path) -> Result<Vec<SkillEvent>> {
112    match std::fs::read_to_string(path) {
113        Ok(s) => parse_events_jsonl(&s),
114        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
115        Err(e) => Err(anyhow::Error::from(e)),
116    }
117}
118
119pub fn parse_events_jsonl(raw: &str) -> Result<Vec<SkillEvent>> {
120    raw.lines()
121        .filter(|l| !l.is_empty())
122        .map(|l| serde_json::from_str(l).map_err(anyhow::Error::from))
123        .collect()
124}
125
126/// Set-union of two event logs, deduped by `dedup_key`, sorted by timestamp.
127/// Commutative and idempotent.
128pub fn union_events(mut a: Vec<SkillEvent>, b: Vec<SkillEvent>) -> Vec<SkillEvent> {
129    let seen: HashSet<String> = a.iter().map(|e| e.dedup_key()).collect();
130    for event in b {
131        if !seen.contains(&event.dedup_key()) {
132            a.push(event);
133        }
134    }
135    a.sort_by_key(|e| e.ts());
136    a
137}
138
139/// Apply a slice of new events to an existing `SkillStats`, updating only
140/// usage counters. Lifecycle state, pinned, and anchor_confidence are
141/// preserved — they are managed by the lifecycle module, not by events.
142pub fn apply_new_events_to_stats(stats: &mut SkillStats, new_events: &[SkillEvent]) {
143    for event in new_events {
144        match event {
145            SkillEvent::Retrieval { ts, .. } => {
146                stats.usage_count += 1;
147                stats.last_used_at = Some(stats.last_used_at.map(|e| e.max(*ts)).unwrap_or(*ts));
148            }
149            SkillEvent::Execution { ts, outcome, .. } => {
150                stats.usage_count += 1;
151                stats.last_used_at = Some(stats.last_used_at.map(|e| e.max(*ts)).unwrap_or(*ts));
152                if outcome == "success" {
153                    stats.success_count += 1;
154                    stats.last_success_at =
155                        Some(stats.last_success_at.map(|e| e.max(*ts)).unwrap_or(*ts));
156                    if stats.first_successful_use_at.is_none() {
157                        stats.first_successful_use_at = Some(*ts);
158                    }
159                } else {
160                    stats.failure_count += 1;
161                }
162            }
163            SkillEvent::Dismissed { .. } | SkillEvent::Superseded { .. } => {}
164        }
165    }
166}
167
/// Outcome of one workflow/skill run, recorded into the per-skill ledger.
///
/// Borrowed fields tie the record to the caller's buffers for `'a`, so it
/// is meant to be built right before the call to `record_run`.
#[derive(Debug, Clone)]
pub struct RunRecord<'a> {
    /// true = success
    pub success: bool,
    /// Duration of the run in milliseconds, if measured.
    pub duration_ms: Option<u64>,
    /// Exit code of the run, if one is available.
    pub exit_code: Option<i32>,
    /// stderr (or combined output) of the failing step; used to classify
    /// workflow-vs-environment failure. Ignored on success.
    pub stderr: Option<&'a str>,
    /// Step id/description that failed, if any.
    pub failed_step: Option<String>,
    /// "manual" | "schedule" | "agent"
    pub trigger: &'a str,
    /// Explicit user override of the env classification
    /// (`mur run --env-class workflow|env`).
    pub env_class_override: Option<&'a str>,
}
185
186/// Append one enriched Execution event for a completed run — the run-ledger
187/// write path (workflow-engine v2 P2). Returns the event written.
188pub fn record_run(
189    mur_home: &Path,
190    skill_name: &str,
191    device_id: &str,
192    rec: &RunRecord<'_>,
193) -> Result<SkillEvent> {
194    let (env_class, confidence) = if rec.success {
195        (None, None)
196    } else if let Some(forced) = rec.env_class_override {
197        (Some(forced.to_string()), Some(1.0))
198    } else {
199        let c = crate::skill::env_class::classify_failure(rec.stderr.unwrap_or(""));
200        (Some(c.class.to_string()), Some(c.confidence))
201    };
202
203    let event = SkillEvent::Execution {
204        ts: Utc::now(),
205        device_id: device_id.to_string(),
206        outcome: if rec.success { "success" } else { "failure" }.to_string(),
207        error: (!rec.success)
208            .then(|| rec.stderr.map(|s| s.chars().take(500).collect()))
209            .flatten(),
210        step: rec.failed_step.clone(),
211        duration_ms: rec.duration_ms,
212        exit_code: rec.exit_code,
213        env_class,
214        confidence,
215        trigger: Some(rec.trigger.to_string()),
216    };
217    append_event(&event_log_path(mur_home, skill_name), &event)?;
218    Ok(event)
219}
220
221/// Resolve manifest conflict via Last-Writer-Wins (LWW).
222/// Returns the winning skill and the reason (local_wins, remote_wins, or force_local).
223pub fn resolve_manifest_lww(
224    local: Skill,
225    remote: Skill,
226    force_local: bool,
227) -> (Skill, &'static str) {
228    if force_local {
229        return (local, "force_local");
230    }
231    if remote.manifest.updated_at > local.manifest.updated_at {
232        (remote, "remote_newer")
233    } else {
234        (local, "local_newer_or_equal")
235    }
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use crate::skill::manifest::{Content, Skill, SkillManifest};
    use crate::skill::stats::SkillStats;
    use crate::skill::types::Category;
    use chrono::{DateTime, Utc};
    use tempfile::tempdir;

    // ── Fixtures ────────────────────────────────────────────────────────

    fn device() -> String {
        "dev-a".into()
    }

    /// Fixed base instant shifted by `offset_secs`, so event fixtures get
    /// deterministic, distinct timestamps.
    fn at(offset_secs: i64) -> DateTime<Utc> {
        DateTime::from_timestamp(1_748_000_000 + offset_secs, 0).unwrap()
    }

    fn retrieval(ts_offset_secs: i64) -> SkillEvent {
        SkillEvent::Retrieval {
            ts: at(ts_offset_secs),
            device_id: device(),
        }
    }

    /// Bare Execution event (no run-ledger fields) with the given outcome.
    fn exec(ts_offset_secs: i64, outcome: &str, error: Option<&str>) -> SkillEvent {
        SkillEvent::Execution {
            ts: at(ts_offset_secs),
            device_id: device(),
            outcome: outcome.into(),
            error: error.map(Into::into),
            step: None,
            duration_ms: None,
            exit_code: None,
            env_class: None,
            confidence: None,
            trigger: None,
        }
    }

    fn exec_ok(ts_offset_secs: i64) -> SkillEvent {
        exec(ts_offset_secs, "success", None)
    }

    fn exec_fail(ts_offset_secs: i64) -> SkillEvent {
        exec(ts_offset_secs, "failure", Some("oops"))
    }

    /// Minimal skill whose manifest carries the given `updated_at`; every
    /// other field is a fixed placeholder shared by the LWW tests.
    fn skill_updated_at(updated_at: DateTime<Utc>) -> Skill {
        Skill {
            manifest: SkillManifest {
                name: "test".into(),
                version: "1.0".into(),
                publisher: "p".into(),
                description: "d".into(),
                category: Category::Context,
                provenance: Default::default(),
                hosts: vec![],
                content: Content {
                    r#abstract: "a".into(),
                    context: Some("c".into()),
                    procedure: None,
                    command: None,
                    note: None,
                },
                requires: vec![],
                tags: vec![],
                triggers: vec![],
                priority: Default::default(),
                evolution_log: vec![],
                transfer_chain: vec![],
                mcp_requirements: vec![],
                updated_at,
            },
            content_sha256: Some("hash".into()),
            trust_level: Default::default(),
            capabilities_declared: vec![],
            publisher_signature: None,
        }
    }

    // ── Run ledger ──────────────────────────────────────────────────────

    #[test]
    fn record_run_classifies_and_appends() {
        let tmp = tempdir().unwrap();
        let ev = record_run(
            tmp.path(),
            "deploy-api",
            "dev-a",
            &RunRecord {
                success: false,
                duration_ms: Some(1200),
                exit_code: Some(7),
                stderr: Some("curl: (7) Connection refused"),
                failed_step: Some("health-check".into()),
                trigger: "manual",
                env_class_override: None,
            },
        )
        .unwrap();
        match &ev {
            SkillEvent::Execution {
                env_class, trigger, ..
            } => {
                assert_eq!(env_class.as_deref(), Some("env"));
                assert_eq!(trigger.as_deref(), Some("manual"));
            }
            _ => panic!("wrong kind"),
        }
        let events = read_events(&event_log_path(tmp.path(), "deploy-api")).unwrap();
        assert_eq!(events.len(), 1);

        // Success run records no env_class.
        let ev2 = record_run(
            tmp.path(),
            "deploy-api",
            "dev-a",
            &RunRecord {
                success: true,
                duration_ms: Some(900),
                exit_code: Some(0),
                stderr: None,
                failed_step: None,
                trigger: "schedule",
                env_class_override: None,
            },
        )
        .unwrap();
        match &ev2 {
            SkillEvent::Execution {
                env_class, outcome, ..
            } => {
                assert!(env_class.is_none());
                assert_eq!(outcome, "success");
            }
            _ => panic!("wrong kind"),
        }
    }

    #[test]
    fn legacy_execution_line_parses_and_enriched_roundtrips() {
        // Pre-P2 line without the run-ledger fields must keep parsing.
        let legacy = r#"{"kind":"execution","ts":"2026-05-30T00:00:00Z","device_id":"d","outcome":"success"}"#;
        let ev: SkillEvent = serde_json::from_str(legacy).unwrap();
        match &ev {
            SkillEvent::Execution {
                duration_ms,
                env_class,
                ..
            } => {
                assert!(duration_ms.is_none());
                assert!(env_class.is_none());
            }
            _ => panic!("wrong kind"),
        }

        // Enriched event round-trips.
        let enriched = SkillEvent::Execution {
            ts: at(0),
            device_id: "d".into(),
            outcome: "failure".into(),
            error: Some("boom".into()),
            step: Some("deploy".into()),
            duration_ms: Some(8421),
            exit_code: Some(1),
            env_class: Some("workflow".into()),
            confidence: Some(0.6),
            trigger: Some("manual".into()),
        };
        let line = serde_json::to_string(&enriched).unwrap();
        let back: SkillEvent = serde_json::from_str(&line).unwrap();
        assert_eq!(back, enriched);
        // dedup_key shape unchanged (ts+kind+device) — fleet-sync compatible.
        assert!(enriched.dedup_key().ends_with(":execution:d"));
    }

    // ── Log I/O ─────────────────────────────────────────────────────────

    #[test]
    fn append_then_read_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("events.jsonl");
        append_event(&path, &retrieval(0)).unwrap();
        append_event(&path, &exec_ok(1)).unwrap();
        let events = read_events(&path).unwrap();
        assert_eq!(events.len(), 2);
    }

    #[test]
    fn read_events_returns_empty_for_missing_file() {
        let dir = tempdir().unwrap();
        let events = read_events(&dir.path().join("missing.jsonl")).unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn parse_events_jsonl_handles_multiline() {
        let raw = "{\"kind\":\"retrieval\",\"ts\":\"2026-05-30T00:00:00Z\",\"device_id\":\"d\"}\n\
                   {\"kind\":\"retrieval\",\"ts\":\"2026-05-30T00:01:00Z\",\"device_id\":\"d\"}\n";
        let events = parse_events_jsonl(raw).unwrap();
        assert_eq!(events.len(), 2);
    }

    // ── Merge and stats ─────────────────────────────────────────────────

    #[test]
    fn union_deduplicates_identical_events() {
        let a = vec![retrieval(0), exec_ok(1)];
        let b = vec![exec_ok(1), exec_fail(2)];
        let merged = union_events(a, b);
        assert_eq!(merged.len(), 3); // dedup exec_ok(1)
    }

    #[test]
    fn union_is_commutative() {
        let a = vec![retrieval(0), exec_ok(1)];
        let b = vec![exec_ok(1), exec_fail(2)];
        let ab = union_events(a.clone(), b.clone());
        let ba = union_events(b, a);
        let ab_keys: Vec<_> = ab.iter().map(|e| e.dedup_key()).collect();
        let ba_keys: Vec<_> = ba.iter().map(|e| e.dedup_key()).collect();
        assert_eq!(ab_keys, ba_keys);
    }

    #[test]
    fn apply_new_events_updates_counters() {
        let mut stats = SkillStats::new("test-skill", "1.0.0", "digest", Utc::now());
        let events = vec![exec_ok(1), exec_fail(2), retrieval(3)];
        apply_new_events_to_stats(&mut stats, &events);
        assert_eq!(stats.usage_count, 3);
        assert_eq!(stats.success_count, 1);
        assert_eq!(stats.failure_count, 1);
        assert!(stats.last_success_at.is_some());
        assert!(stats.first_successful_use_at.is_some());
    }

    // ── Manifest LWW ────────────────────────────────────────────────────

    #[test]
    fn manifest_lww_prefers_remote_when_newer() {
        let t1 = DateTime::from_timestamp(1_000, 0).unwrap();
        let t2 = DateTime::from_timestamp(2_000, 0).unwrap();

        let local = skill_updated_at(t1);
        let remote = skill_updated_at(t2);

        let (winner, reason) = resolve_manifest_lww(local, remote, false);
        assert_eq!(reason, "remote_newer");
        assert_eq!(winner.manifest.updated_at, t2);
    }

    #[test]
    fn manifest_lww_respects_force_local() {
        let t1 = DateTime::from_timestamp(1_000, 0).unwrap();
        let t2 = DateTime::from_timestamp(2_000, 0).unwrap();

        let local = skill_updated_at(t1);
        let remote = skill_updated_at(t2);

        let (winner, reason) = resolve_manifest_lww(local, remote, true);
        assert_eq!(reason, "force_local");
        assert_eq!(winner.manifest.updated_at, t1);
    }
}