1use crate::skill::manifest::Skill;
10use crate::skill::stats::SkillStats;
11use anyhow::Result;
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use std::collections::HashSet;
15use std::io::Write;
16use std::path::{Path, PathBuf};
17
18#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
19#[serde(tag = "kind", rename_all = "snake_case")]
20pub enum SkillEvent {
21 Retrieval {
22 ts: DateTime<Utc>,
23 device_id: String,
24 },
25 Execution {
26 ts: DateTime<Utc>,
27 device_id: String,
28 outcome: String,
30 #[serde(default, skip_serializing_if = "Option::is_none")]
31 error: Option<String>,
32 #[serde(default, skip_serializing_if = "Option::is_none")]
33 step: Option<String>,
34 #[serde(default, skip_serializing_if = "Option::is_none")]
38 duration_ms: Option<u64>,
39 #[serde(default, skip_serializing_if = "Option::is_none")]
40 exit_code: Option<i32>,
41 #[serde(default, skip_serializing_if = "Option::is_none")]
45 env_class: Option<String>,
46 #[serde(default, skip_serializing_if = "Option::is_none")]
47 confidence: Option<f64>,
48 #[serde(default, skip_serializing_if = "Option::is_none")]
50 trigger: Option<String>,
51 },
52 Dismissed {
53 ts: DateTime<Utc>,
54 device_id: String,
55 },
56 Superseded {
57 ts: DateTime<Utc>,
58 device_id: String,
59 },
60}
61
62impl SkillEvent {
63 pub fn dedup_key(&self) -> String {
65 match self {
66 Self::Retrieval { ts, device_id } => {
67 format!("{}:retrieval:{}", ts.timestamp_micros(), device_id)
68 }
69 Self::Execution { ts, device_id, .. } => {
70 format!("{}:execution:{}", ts.timestamp_micros(), device_id)
71 }
72 Self::Dismissed { ts, device_id } => {
73 format!("{}:dismissed:{}", ts.timestamp_micros(), device_id)
74 }
75 Self::Superseded { ts, device_id } => {
76 format!("{}:superseded:{}", ts.timestamp_micros(), device_id)
77 }
78 }
79 }
80
81 pub fn ts(&self) -> DateTime<Utc> {
82 match self {
83 Self::Retrieval { ts, .. }
84 | Self::Execution { ts, .. }
85 | Self::Dismissed { ts, .. }
86 | Self::Superseded { ts, .. } => *ts,
87 }
88 }
89}
90
91pub fn event_log_path(mur_home: &Path, skill_name: &str) -> PathBuf {
92 mur_home
93 .join("skills")
94 .join(skill_name)
95 .join("events.jsonl")
96}
97
98pub fn append_event(path: &Path, event: &SkillEvent) -> Result<()> {
99 if let Some(parent) = path.parent() {
100 std::fs::create_dir_all(parent)?;
101 }
102 let line = serde_json::to_string(event)?;
103 let mut f = std::fs::OpenOptions::new()
104 .create(true)
105 .append(true)
106 .open(path)?;
107 writeln!(f, "{line}")?;
108 Ok(())
109}
110
111pub fn read_events(path: &Path) -> Result<Vec<SkillEvent>> {
112 match std::fs::read_to_string(path) {
113 Ok(s) => parse_events_jsonl(&s),
114 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
115 Err(e) => Err(anyhow::Error::from(e)),
116 }
117}
118
119pub fn parse_events_jsonl(raw: &str) -> Result<Vec<SkillEvent>> {
120 raw.lines()
121 .filter(|l| !l.is_empty())
122 .map(|l| serde_json::from_str(l).map_err(anyhow::Error::from))
123 .collect()
124}
125
126pub fn union_events(mut a: Vec<SkillEvent>, b: Vec<SkillEvent>) -> Vec<SkillEvent> {
129 let seen: HashSet<String> = a.iter().map(|e| e.dedup_key()).collect();
130 for event in b {
131 if !seen.contains(&event.dedup_key()) {
132 a.push(event);
133 }
134 }
135 a.sort_by_key(|e| e.ts());
136 a
137}
138
139pub fn apply_new_events_to_stats(stats: &mut SkillStats, new_events: &[SkillEvent]) {
143 for event in new_events {
144 match event {
145 SkillEvent::Retrieval { ts, .. } => {
146 stats.usage_count += 1;
147 stats.last_used_at = Some(stats.last_used_at.map(|e| e.max(*ts)).unwrap_or(*ts));
148 }
149 SkillEvent::Execution { ts, outcome, .. } => {
150 stats.usage_count += 1;
151 stats.last_used_at = Some(stats.last_used_at.map(|e| e.max(*ts)).unwrap_or(*ts));
152 if outcome == "success" {
153 stats.success_count += 1;
154 stats.last_success_at =
155 Some(stats.last_success_at.map(|e| e.max(*ts)).unwrap_or(*ts));
156 if stats.first_successful_use_at.is_none() {
157 stats.first_successful_use_at = Some(*ts);
158 }
159 } else {
160 stats.failure_count += 1;
161 }
162 }
163 SkillEvent::Dismissed { .. } | SkillEvent::Superseded { .. } => {}
164 }
165 }
166}
167
168pub struct RunRecord<'a> {
170 pub success: bool,
172 pub duration_ms: Option<u64>,
173 pub exit_code: Option<i32>,
174 pub stderr: Option<&'a str>,
177 pub failed_step: Option<String>,
179 pub trigger: &'a str,
181 pub env_class_override: Option<&'a str>,
184}
185
186pub fn record_run(
189 mur_home: &Path,
190 skill_name: &str,
191 device_id: &str,
192 rec: &RunRecord<'_>,
193) -> Result<SkillEvent> {
194 let (env_class, confidence) = if rec.success {
195 (None, None)
196 } else if let Some(forced) = rec.env_class_override {
197 (Some(forced.to_string()), Some(1.0))
198 } else {
199 let c = crate::skill::env_class::classify_failure(rec.stderr.unwrap_or(""));
200 (Some(c.class.to_string()), Some(c.confidence))
201 };
202
203 let event = SkillEvent::Execution {
204 ts: Utc::now(),
205 device_id: device_id.to_string(),
206 outcome: if rec.success { "success" } else { "failure" }.to_string(),
207 error: (!rec.success)
208 .then(|| rec.stderr.map(|s| s.chars().take(500).collect()))
209 .flatten(),
210 step: rec.failed_step.clone(),
211 duration_ms: rec.duration_ms,
212 exit_code: rec.exit_code,
213 env_class,
214 confidence,
215 trigger: Some(rec.trigger.to_string()),
216 };
217 append_event(&event_log_path(mur_home, skill_name), &event)?;
218 Ok(event)
219}
220
221pub fn resolve_manifest_lww(
224 local: Skill,
225 remote: Skill,
226 force_local: bool,
227) -> (Skill, &'static str) {
228 if force_local {
229 return (local, "force_local");
230 }
231 if remote.manifest.updated_at > local.manifest.updated_at {
232 (remote, "remote_newer")
233 } else {
234 (local, "local_newer_or_equal")
235 }
236}
237
238#[cfg(test)]
239mod tests {
240 use super::*;
241 use tempfile::tempdir;
242
243 #[test]
244 fn record_run_classifies_and_appends() {
245 let tmp = tempdir().unwrap();
246 let ev = record_run(
247 tmp.path(),
248 "deploy-api",
249 "dev-a",
250 &RunRecord {
251 success: false,
252 duration_ms: Some(1200),
253 exit_code: Some(7),
254 stderr: Some("curl: (7) Connection refused"),
255 failed_step: Some("health-check".into()),
256 trigger: "manual",
257 env_class_override: None,
258 },
259 )
260 .unwrap();
261 match &ev {
262 SkillEvent::Execution {
263 env_class, trigger, ..
264 } => {
265 assert_eq!(env_class.as_deref(), Some("env"));
266 assert_eq!(trigger.as_deref(), Some("manual"));
267 }
268 _ => panic!("wrong kind"),
269 }
270 let events = read_events(&event_log_path(tmp.path(), "deploy-api")).unwrap();
271 assert_eq!(events.len(), 1);
272
273 let ev2 = record_run(
275 tmp.path(),
276 "deploy-api",
277 "dev-a",
278 &RunRecord {
279 success: true,
280 duration_ms: Some(900),
281 exit_code: Some(0),
282 stderr: None,
283 failed_step: None,
284 trigger: "schedule",
285 env_class_override: None,
286 },
287 )
288 .unwrap();
289 match &ev2 {
290 SkillEvent::Execution {
291 env_class, outcome, ..
292 } => {
293 assert!(env_class.is_none());
294 assert_eq!(outcome, "success");
295 }
296 _ => panic!("wrong kind"),
297 }
298 }
299
300 #[test]
301 fn legacy_execution_line_parses_and_enriched_roundtrips() {
302 let legacy = r#"{"kind":"execution","ts":"2026-05-30T00:00:00Z","device_id":"d","outcome":"success"}"#;
304 let ev: SkillEvent = serde_json::from_str(legacy).unwrap();
305 match &ev {
306 SkillEvent::Execution {
307 duration_ms,
308 env_class,
309 ..
310 } => {
311 assert!(duration_ms.is_none());
312 assert!(env_class.is_none());
313 }
314 _ => panic!("wrong kind"),
315 }
316
317 let enriched = SkillEvent::Execution {
319 ts: chrono::DateTime::from_timestamp(1_748_000_000, 0).unwrap(),
320 device_id: "d".into(),
321 outcome: "failure".into(),
322 error: Some("boom".into()),
323 step: Some("deploy".into()),
324 duration_ms: Some(8421),
325 exit_code: Some(1),
326 env_class: Some("workflow".into()),
327 confidence: Some(0.6),
328 trigger: Some("manual".into()),
329 };
330 let line = serde_json::to_string(&enriched).unwrap();
331 let back: SkillEvent = serde_json::from_str(&line).unwrap();
332 assert_eq!(back, enriched);
333 assert!(enriched.dedup_key().ends_with(":execution:d"));
335 }
336
337 fn device() -> String {
338 "dev-a".into()
339 }
340
341 fn retrieval(ts_offset_secs: i64) -> SkillEvent {
342 let base = chrono::DateTime::from_timestamp(1_748_000_000 + ts_offset_secs, 0).unwrap();
343 SkillEvent::Retrieval {
344 ts: base,
345 device_id: device(),
346 }
347 }
348
349 fn exec_ok(ts_offset_secs: i64) -> SkillEvent {
350 let base = chrono::DateTime::from_timestamp(1_748_000_000 + ts_offset_secs, 0).unwrap();
351 SkillEvent::Execution {
352 ts: base,
353 device_id: device(),
354 outcome: "success".into(),
355 error: None,
356 step: None,
357 duration_ms: None,
358 exit_code: None,
359 env_class: None,
360 confidence: None,
361 trigger: None,
362 }
363 }
364
365 fn exec_fail(ts_offset_secs: i64) -> SkillEvent {
366 let base = chrono::DateTime::from_timestamp(1_748_000_000 + ts_offset_secs, 0).unwrap();
367 SkillEvent::Execution {
368 ts: base,
369 device_id: device(),
370 outcome: "failure".into(),
371 error: Some("oops".into()),
372 step: None,
373 duration_ms: None,
374 exit_code: None,
375 env_class: None,
376 confidence: None,
377 trigger: None,
378 }
379 }
380
381 #[test]
382 fn append_then_read_roundtrip() {
383 let dir = tempdir().unwrap();
384 let path = dir.path().join("events.jsonl");
385 append_event(&path, &retrieval(0)).unwrap();
386 append_event(&path, &exec_ok(1)).unwrap();
387 let events = read_events(&path).unwrap();
388 assert_eq!(events.len(), 2);
389 }
390
391 #[test]
392 fn union_deduplicates_identical_events() {
393 let a = vec![retrieval(0), exec_ok(1)];
394 let b = vec![exec_ok(1), exec_fail(2)];
395 let merged = union_events(a, b);
396 assert_eq!(merged.len(), 3); }
398
399 #[test]
400 fn union_is_commutative() {
401 let a = vec![retrieval(0), exec_ok(1)];
402 let b = vec![exec_ok(1), exec_fail(2)];
403 let ab = union_events(a.clone(), b.clone());
404 let ba = union_events(b, a);
405 let ab_keys: Vec<_> = ab.iter().map(|e| e.dedup_key()).collect();
406 let ba_keys: Vec<_> = ba.iter().map(|e| e.dedup_key()).collect();
407 assert_eq!(ab_keys, ba_keys);
408 }
409
410 #[test]
411 fn apply_new_events_updates_counters() {
412 use crate::skill::stats::SkillStats;
413 use chrono::Utc;
414 let mut stats = SkillStats::new("test-skill", "1.0.0", "digest", Utc::now());
415 let events = vec![exec_ok(1), exec_fail(2), retrieval(3)];
416 apply_new_events_to_stats(&mut stats, &events);
417 assert_eq!(stats.usage_count, 3);
418 assert_eq!(stats.success_count, 1);
419 assert_eq!(stats.failure_count, 1);
420 assert!(stats.last_success_at.is_some());
421 assert!(stats.first_successful_use_at.is_some());
422 }
423
424 #[test]
425 fn read_events_returns_empty_for_missing_file() {
426 let dir = tempdir().unwrap();
427 let events = read_events(&dir.path().join("missing.jsonl")).unwrap();
428 assert!(events.is_empty());
429 }
430
431 #[test]
432 fn parse_events_jsonl_handles_multiline() {
433 let raw = "{\"kind\":\"retrieval\",\"ts\":\"2026-05-30T00:00:00Z\",\"device_id\":\"d\"}\n\
434 {\"kind\":\"retrieval\",\"ts\":\"2026-05-30T00:01:00Z\",\"device_id\":\"d\"}\n";
435 let events = parse_events_jsonl(raw).unwrap();
436 assert_eq!(events.len(), 2);
437 }
438
439 #[test]
440 fn manifest_lww_prefers_remote_when_newer() {
441 use crate::skill::manifest::{Content, Skill, SkillManifest};
442 use crate::skill::types::Category;
443 let t1 = chrono::DateTime::from_timestamp(1_000, 0).unwrap();
444 let t2 = chrono::DateTime::from_timestamp(2_000, 0).unwrap();
445
446 let local = Skill {
447 manifest: SkillManifest {
448 name: "test".into(),
449 version: "1.0".into(),
450 publisher: "p".into(),
451 description: "d".into(),
452 category: Category::Context,
453 provenance: Default::default(),
454 hosts: vec![],
455 content: Content {
456 r#abstract: "a".into(),
457 context: Some("c".into()),
458 procedure: None,
459 command: None,
460 note: None,
461 },
462 requires: vec![],
463 tags: vec![],
464 triggers: vec![],
465 priority: Default::default(),
466 evolution_log: vec![],
467 transfer_chain: vec![],
468 mcp_requirements: vec![],
469 updated_at: t1,
470 },
471 content_sha256: Some("hash".into()),
472 trust_level: Default::default(),
473 capabilities_declared: vec![],
474 publisher_signature: None,
475 };
476
477 let mut remote = local.clone();
478 remote.manifest.updated_at = t2;
479
480 let (winner, reason) = resolve_manifest_lww(local, remote, false);
481 assert_eq!(reason, "remote_newer");
482 assert_eq!(winner.manifest.updated_at, t2);
483 }
484
485 #[test]
486 fn manifest_lww_respects_force_local() {
487 use crate::skill::manifest::{Content, Skill, SkillManifest};
488 use crate::skill::types::Category;
489 let t1 = chrono::DateTime::from_timestamp(1_000, 0).unwrap();
490 let t2 = chrono::DateTime::from_timestamp(2_000, 0).unwrap();
491
492 let local = Skill {
493 manifest: SkillManifest {
494 name: "test".into(),
495 version: "1.0".into(),
496 publisher: "p".into(),
497 description: "d".into(),
498 category: Category::Context,
499 provenance: Default::default(),
500 hosts: vec![],
501 content: Content {
502 r#abstract: "a".into(),
503 context: Some("c".into()),
504 procedure: None,
505 command: None,
506 note: None,
507 },
508 requires: vec![],
509 tags: vec![],
510 triggers: vec![],
511 priority: Default::default(),
512 evolution_log: vec![],
513 transfer_chain: vec![],
514 mcp_requirements: vec![],
515 updated_at: t1,
516 },
517 content_sha256: Some("hash".into()),
518 trust_level: Default::default(),
519 capabilities_declared: vec![],
520 publisher_signature: None,
521 };
522
523 let mut remote = local.clone();
524 remote.manifest.updated_at = t2;
525
526 let (winner, reason) = resolve_manifest_lww(local.clone(), remote, true);
527 assert_eq!(reason, "force_local");
528 assert_eq!(winner.manifest.updated_at, t1);
529 }
530}