Skip to main content

aivcs_core/
recording.rs

1//! Graph lifecycle adapter: bridges domain `Event` types to `RunLedger` persistence.
2
3use std::sync::Arc;
4
5use oxidized_state::{
6    ContentDigest, RunEvent, RunId, RunLedger, RunMetadata, RunSummary, StorageResult,
7};
8
9use crate::domain::run::{Event, EventKind};
10
11/// Extract the snake_case kind string from an `EventKind` via its serde tag.
12fn event_kind_str(kind: &EventKind) -> String {
13    serde_json::to_value(kind)
14        .ok()
15        .and_then(|v| v["type"].as_str().map(str::to_string))
16        .unwrap_or_else(|| "unknown".to_string())
17}
18
19/// Adapter that records graph lifecycle [`Event`]s into a [`RunLedger`].
20///
21/// Usage:
22/// 1. Call [`GraphRunRecorder::start`] to create a new run.
23/// 2. Call [`GraphRunRecorder::record`] for each domain event.
24/// 3. Call [`GraphRunRecorder::finish_ok`] or [`GraphRunRecorder::finish_err`] to finalize.
25pub struct GraphRunRecorder {
26    ledger: Arc<dyn RunLedger>,
27    run_id: RunId,
28}
29
30impl GraphRunRecorder {
31    /// Start a new run in the ledger, returning a recorder bound to that run.
32    pub async fn start(
33        ledger: Arc<dyn RunLedger>,
34        spec_digest: &ContentDigest,
35        metadata: RunMetadata,
36    ) -> StorageResult<Self> {
37        let run_id = ledger.create_run(spec_digest, metadata.clone()).await?;
38        crate::obs::emit_run_started(run_id.to_string().as_str(), &metadata.agent_name);
39        Ok(Self { ledger, run_id })
40    }
41
42    /// Record a single domain event into the ledger.
43    pub async fn record(&self, event: &Event) -> StorageResult<()> {
44        let kind_str = event_kind_str(&event.kind);
45        let run_event = RunEvent {
46            seq: event.seq,
47            kind: kind_str.clone(),
48            payload: event.payload.clone(),
49            timestamp: event.timestamp,
50        };
51        self.ledger.append_event(&self.run_id, run_event).await?;
52        crate::obs::emit_event_appended(&self.run_id.to_string(), &kind_str, event.seq);
53        Ok(())
54    }
55
56    /// Finalize the run as completed.
57    pub async fn finish_ok(self, summary: RunSummary) -> StorageResult<()> {
58        let duration_ms = summary.duration_ms;
59        let total_events = summary.total_events;
60        self.ledger.complete_run(&self.run_id, summary).await?;
61        crate::obs::emit_run_finished(&self.run_id.to_string(), duration_ms, total_events, true);
62        Ok(())
63    }
64
65    /// Finalize the run as failed.
66    pub async fn finish_err(self, summary: RunSummary) -> StorageResult<()> {
67        let duration_ms = summary.duration_ms;
68        let total_events = summary.total_events;
69        self.ledger.fail_run(&self.run_id, summary).await?;
70        crate::obs::emit_run_finished(&self.run_id.to_string(), duration_ms, total_events, false);
71        Ok(())
72    }
73
74    /// Return a reference to the run ID.
75    pub fn run_id(&self) -> &RunId {
76        &self.run_id
77    }
78}