1use std::sync::Arc;
4
5use oxidized_state::{
6 ContentDigest, RunEvent, RunId, RunLedger, RunMetadata, RunSummary, StorageResult,
7};
8
9use crate::domain::run::{Event, EventKind};
10
11fn event_kind_str(kind: &EventKind) -> String {
13 serde_json::to_value(kind)
14 .ok()
15 .and_then(|v| v["type"].as_str().map(str::to_string))
16 .unwrap_or_else(|| "unknown".to_string())
17}
18
19pub struct GraphRunRecorder {
26 ledger: Arc<dyn RunLedger>,
27 run_id: RunId,
28}
29
30impl GraphRunRecorder {
31 pub async fn start(
33 ledger: Arc<dyn RunLedger>,
34 spec_digest: &ContentDigest,
35 metadata: RunMetadata,
36 ) -> StorageResult<Self> {
37 let run_id = ledger.create_run(spec_digest, metadata.clone()).await?;
38 crate::obs::emit_run_started(run_id.to_string().as_str(), &metadata.agent_name);
39 Ok(Self { ledger, run_id })
40 }
41
42 pub async fn record(&self, event: &Event) -> StorageResult<()> {
44 let kind_str = event_kind_str(&event.kind);
45 let run_event = RunEvent {
46 seq: event.seq,
47 kind: kind_str.clone(),
48 payload: event.payload.clone(),
49 timestamp: event.timestamp,
50 };
51 self.ledger.append_event(&self.run_id, run_event).await?;
52 crate::obs::emit_event_appended(&self.run_id.to_string(), &kind_str, event.seq);
53 Ok(())
54 }
55
56 pub async fn finish_ok(self, summary: RunSummary) -> StorageResult<()> {
58 let duration_ms = summary.duration_ms;
59 let total_events = summary.total_events;
60 self.ledger.complete_run(&self.run_id, summary).await?;
61 crate::obs::emit_run_finished(&self.run_id.to_string(), duration_ms, total_events, true);
62 Ok(())
63 }
64
65 pub async fn finish_err(self, summary: RunSummary) -> StorageResult<()> {
67 let duration_ms = summary.duration_ms;
68 let total_events = summary.total_events;
69 self.ledger.fail_run(&self.run_id, summary).await?;
70 crate::obs::emit_run_finished(&self.run_id.to_string(), duration_ms, total_events, false);
71 Ok(())
72 }
73
74 pub fn run_id(&self) -> &RunId {
76 &self.run_id
77 }
78}