Skip to main content

aivcs_core/
recording.rs

1//! Graph lifecycle adapter: bridges domain `Event` types to `RunLedger` persistence.
2
3use std::sync::Arc;
4
5use oxidized_state::{
6    ContentDigest, RunEvent, RunId, RunLedger, RunMetadata, RunSummary, StorageResult,
7};
8
9use crate::domain::run::{Event, EventKind};
10
11/// Extract the snake_case kind string from an `EventKind` via its serde tag.
12fn event_kind_str(kind: &EventKind) -> String {
13    serde_json::to_value(kind)
14        .ok()
15        .and_then(|v| v["type"].as_str().map(str::to_string))
16        .unwrap_or_else(|| "unknown".to_string())
17}
18
19/// Adapter that records graph lifecycle [`Event`]s into a [`RunLedger`].
20///
21/// Usage:
22/// 1. Call [`GraphRunRecorder::start`] to create a new run.
23/// 2. Call [`GraphRunRecorder::record`] for each domain event.
24/// 3. Call [`GraphRunRecorder::finish_ok`] or [`GraphRunRecorder::finish_err`] to finalize.
25pub struct GraphRunRecorder {
26    ledger: Arc<dyn RunLedger>,
27    run_id: RunId,
28}
29
30impl GraphRunRecorder {
31    /// Start a new run in the ledger, returning a recorder bound to that run.
32    pub async fn start(
33        ledger: Arc<dyn RunLedger>,
34        spec_digest: &ContentDigest,
35        metadata: RunMetadata,
36    ) -> StorageResult<Self> {
37        let run_id = ledger.create_run(spec_digest, metadata.clone()).await?;
38        crate::obs::emit_run_started(run_id.to_string().as_str(), &metadata.agent_name);
39        Ok(Self { ledger, run_id })
40    }
41
42    /// Record a single domain event into the ledger.
43    pub async fn record(&self, event: &Event) -> StorageResult<()> {
44        let kind_str = event_kind_str(&event.kind);
45
46        // Merge fields from EventKind into payload so they are preserved in the ledger.
47        // This ensures that tool_name, node_id, etc. are queryable from the payload.
48        let mut payload = event.payload.clone();
49        if let serde_json::Value::Object(ref mut map) = payload {
50            if let Ok(serde_json::Value::Object(kind_map)) = serde_json::to_value(&event.kind) {
51                for (k, v) in kind_map {
52                    if k != "type" {
53                        map.insert(k, v);
54                    }
55                }
56            }
57        }
58
59        let run_event = RunEvent {
60            seq: event.seq,
61            kind: kind_str.clone(),
62            payload,
63            timestamp: event.timestamp,
64        };
65        self.ledger.append_event(&self.run_id, run_event).await?;
66        crate::obs::emit_event_appended(&self.run_id.to_string(), &kind_str, event.seq);
67        Ok(())
68    }
69
70    /// Finalize the run as completed.
71    pub async fn finish_ok(self, summary: RunSummary) -> StorageResult<()> {
72        let duration_ms = summary.duration_ms;
73        let total_events = summary.total_events;
74        self.ledger.complete_run(&self.run_id, summary).await?;
75        crate::obs::emit_run_finished(&self.run_id.to_string(), duration_ms, total_events, true);
76        Ok(())
77    }
78
79    /// Finalize the run as failed.
80    pub async fn finish_err(self, summary: RunSummary) -> StorageResult<()> {
81        let duration_ms = summary.duration_ms;
82        let total_events = summary.total_events;
83        self.ledger.fail_run(&self.run_id, summary).await?;
84        crate::obs::emit_run_finished(&self.run_id.to_string(), duration_ms, total_events, false);
85        Ok(())
86    }
87
88    /// Return a reference to the run ID.
89    pub fn run_id(&self) -> &RunId {
90        &self.run_id
91    }
92}
93
94#[cfg(test)]
95mod tests {
96    use super::*;
97    use crate::domain::run::{Event, EventKind};
98    use oxidized_state::SurrealRunLedger;
99    use serde_json::json;
100    use std::sync::Arc;
101    use uuid::Uuid;
102
103    #[tokio::test]
104    async fn test_tool_name_is_preserved_in_ledger() {
105        let ledger = Arc::new(SurrealRunLedger::in_memory().await.unwrap());
106        let spec_digest = oxidized_state::ContentDigest::from_bytes(b"spec");
107        let metadata = oxidized_state::RunMetadata {
108            git_sha: None,
109            agent_name: "test".to_string(),
110            tags: json!({}),
111        };
112
113        let recorder = GraphRunRecorder::start(ledger.clone(), &spec_digest, metadata)
114            .await
115            .unwrap();
116        let run_id = recorder.run_id().clone();
117
118        let event = Event::new(
119            Uuid::new_v4(),
120            1,
121            EventKind::ToolCalled {
122                tool_name: "my_tool".to_string(),
123            },
124            json!({"param": "value"}),
125        );
126
127        recorder.record(&event).await.unwrap();
128
129        let events = ledger.get_events(&run_id).await.unwrap();
130        assert_eq!(events.len(), 1);
131        let recorded_event = &events[0];
132
133        assert_eq!(recorded_event.kind, "tool_called");
134
135        // Regression check: tool_name must be persisted in payload for downstream diffing.
136        assert_eq!(
137            recorded_event
138                .payload
139                .get("tool_name")
140                .and_then(|v| v.as_str()),
141            Some("my_tool"),
142            "tool_name missing from recorded payload"
143        );
144    }
145}