1use std::sync::Arc;
4
5use oxidized_state::{
6 ContentDigest, RunEvent, RunId, RunLedger, RunMetadata, RunSummary, StorageResult,
7};
8
9use crate::domain::run::{Event, EventKind};
10
11fn event_kind_str(kind: &EventKind) -> String {
13 serde_json::to_value(kind)
14 .ok()
15 .and_then(|v| v["type"].as_str().map(str::to_string))
16 .unwrap_or_else(|| "unknown".to_string())
17}
18
19pub struct GraphRunRecorder {
26 ledger: Arc<dyn RunLedger>,
27 run_id: RunId,
28}
29
30impl GraphRunRecorder {
31 pub async fn start(
33 ledger: Arc<dyn RunLedger>,
34 spec_digest: &ContentDigest,
35 metadata: RunMetadata,
36 ) -> StorageResult<Self> {
37 let run_id = ledger.create_run(spec_digest, metadata.clone()).await?;
38 crate::obs::emit_run_started(run_id.to_string().as_str(), &metadata.agent_name);
39 Ok(Self { ledger, run_id })
40 }
41
42 pub async fn record(&self, event: &Event) -> StorageResult<()> {
44 let kind_str = event_kind_str(&event.kind);
45
46 let mut payload = event.payload.clone();
49 if let serde_json::Value::Object(ref mut map) = payload {
50 if let Ok(serde_json::Value::Object(kind_map)) = serde_json::to_value(&event.kind) {
51 for (k, v) in kind_map {
52 if k != "type" {
53 map.insert(k, v);
54 }
55 }
56 }
57 }
58
59 let run_event = RunEvent {
60 seq: event.seq,
61 kind: kind_str.clone(),
62 payload,
63 timestamp: event.timestamp,
64 };
65 self.ledger.append_event(&self.run_id, run_event).await?;
66 crate::obs::emit_event_appended(&self.run_id.to_string(), &kind_str, event.seq);
67 Ok(())
68 }
69
70 pub async fn finish_ok(self, summary: RunSummary) -> StorageResult<()> {
72 let duration_ms = summary.duration_ms;
73 let total_events = summary.total_events;
74 self.ledger.complete_run(&self.run_id, summary).await?;
75 crate::obs::emit_run_finished(&self.run_id.to_string(), duration_ms, total_events, true);
76 Ok(())
77 }
78
79 pub async fn finish_err(self, summary: RunSummary) -> StorageResult<()> {
81 let duration_ms = summary.duration_ms;
82 let total_events = summary.total_events;
83 self.ledger.fail_run(&self.run_id, summary).await?;
84 crate::obs::emit_run_finished(&self.run_id.to_string(), duration_ms, total_events, false);
85 Ok(())
86 }
87
88 pub fn run_id(&self) -> &RunId {
90 &self.run_id
91 }
92}
93
94#[cfg(test)]
95mod tests {
96 use super::*;
97 use crate::domain::run::{Event, EventKind};
98 use oxidized_state::SurrealRunLedger;
99 use serde_json::json;
100 use std::sync::Arc;
101 use uuid::Uuid;
102
103 #[tokio::test]
104 async fn test_tool_name_is_preserved_in_ledger() {
105 let ledger = Arc::new(SurrealRunLedger::in_memory().await.unwrap());
106 let spec_digest = oxidized_state::ContentDigest::from_bytes(b"spec");
107 let metadata = oxidized_state::RunMetadata {
108 git_sha: None,
109 agent_name: "test".to_string(),
110 tags: json!({}),
111 };
112
113 let recorder = GraphRunRecorder::start(ledger.clone(), &spec_digest, metadata)
114 .await
115 .unwrap();
116 let run_id = recorder.run_id().clone();
117
118 let event = Event::new(
119 Uuid::new_v4(),
120 1,
121 EventKind::ToolCalled {
122 tool_name: "my_tool".to_string(),
123 },
124 json!({"param": "value"}),
125 );
126
127 recorder.record(&event).await.unwrap();
128
129 let events = ledger.get_events(&run_id).await.unwrap();
130 assert_eq!(events.len(), 1);
131 let recorded_event = &events[0];
132
133 assert_eq!(recorded_event.kind, "tool_called");
134
135 assert_eq!(
137 recorded_event
138 .payload
139 .get("tool_name")
140 .and_then(|v| v.as_str()),
141 Some("my_tool"),
142 "tool_name missing from recorded payload"
143 );
144 }
145}