harn_vm/orchestration/records/
persistence.rs1use std::collections::BTreeMap;
4use std::path::{Path, PathBuf};
5
6use super::super::{default_run_dir, new_id, now_rfc3339, parse_json_payload, sync_run_handoffs};
7use super::action_graph::{publish_action_graph_event, refresh_run_observability};
8use super::eval_pack::replay_fixture_from_run;
9use super::json::json_usize;
10use super::types::{
11 run_child_record_from_worker_metadata, CompactionEventRecord, DaemonEventRecord,
12 RunChildRecord, RunHitlQuestionRecord, RunRecord, RunStageRecord,
13};
14use crate::agent_events::AgentEvent;
15use crate::event_log::{
16 active_event_log, sanitize_topic_component, AnyEventLog, EventId, EventLog,
17 LogEvent as EventLogRecord, Topic,
18};
19use crate::llm::vm_value_to_json;
20use crate::triggers::{SignatureStatus, TriggerEvent};
21use crate::value::{VmError, VmValue};
22
23pub(super) fn run_child_from_stage_metadata(stage: &RunStageRecord) -> Option<RunChildRecord> {
24 let parent_stage_id = if stage.id.is_empty() {
25 None
26 } else {
27 Some(stage.id.clone())
28 };
29 run_child_record_from_worker_metadata(parent_stage_id, stage.metadata.get("worker")?)
30}
31
32pub(super) fn fill_missing_child_run_fields(existing: &mut RunChildRecord, child: RunChildRecord) {
33 if existing.worker_name.is_empty() {
34 existing.worker_name = child.worker_name;
35 }
36 if existing.parent_stage_id.is_none() {
37 existing.parent_stage_id = child.parent_stage_id;
38 }
39 if existing.session_id.is_none() {
40 existing.session_id = child.session_id;
41 }
42 if existing.parent_session_id.is_none() {
43 existing.parent_session_id = child.parent_session_id;
44 }
45 if existing.mutation_scope.is_none() {
46 existing.mutation_scope = child.mutation_scope;
47 }
48 if existing.approval_policy.is_none() {
49 existing.approval_policy = child.approval_policy;
50 }
51 if existing.task.is_empty() {
52 existing.task = child.task;
53 }
54 if existing.request.is_none() {
55 existing.request = child.request;
56 }
57 if existing.provenance.is_none() {
58 existing.provenance = child.provenance;
59 }
60 if existing.status.is_empty() {
61 existing.status = child.status;
62 }
63 if existing.started_at.is_empty() {
64 existing.started_at = child.started_at;
65 }
66 if existing.finished_at.is_none() {
67 existing.finished_at = child.finished_at;
68 }
69 if existing.run_id.is_none() {
70 existing.run_id = child.run_id;
71 }
72 if existing.run_path.is_none() {
73 existing.run_path = child.run_path;
74 }
75 if existing.snapshot_path.is_none() {
76 existing.snapshot_path = child.snapshot_path;
77 }
78 if existing.execution.is_none() {
79 existing.execution = child.execution;
80 }
81}
82
83pub(super) fn materialize_child_runs_from_stage_metadata(run: &mut RunRecord) {
84 for child in run
85 .stages
86 .iter()
87 .filter_map(run_child_from_stage_metadata)
88 .collect::<Vec<_>>()
89 {
90 match run
91 .child_runs
92 .iter_mut()
93 .find(|existing| existing.worker_id == child.worker_id)
94 {
95 Some(existing) => fill_missing_child_run_fields(existing, child),
96 None => run.child_runs.push(child),
97 }
98 }
99}
100
101pub(super) fn read_topic_records(
102 log: &AnyEventLog,
103 topic: &Topic,
104) -> Vec<(crate::event_log::EventId, EventLogRecord)> {
105 let mut from = None;
106 let mut records = Vec::new();
107 loop {
108 let batch =
109 futures::executor::block_on(log.read_range(topic, from, 256)).unwrap_or_default();
110 if batch.is_empty() {
111 break;
112 }
113 from = batch.last().map(|(event_id, _)| *event_id);
114 records.extend(batch);
115 }
116 records
117}
118
119#[derive(Clone, Debug)]
120pub struct AgentSessionReplayEvent {
121 pub event_id: EventId,
122 pub event: AgentEvent,
123}
124
125pub async fn load_agent_session_replay_events(
126 session_id: &str,
127) -> Result<Vec<AgentSessionReplayEvent>, VmError> {
128 let Some(log) = active_event_log() else {
129 return Ok(Vec::new());
130 };
131 let topic = Topic::new(format!(
132 "observability.agent_events.{}",
133 sanitize_topic_component(session_id)
134 ))
135 .map_err(|error| VmError::Runtime(format!("failed to build agent event topic: {error}")))?;
136
137 let mut events = Vec::new();
138 let mut from = None;
139 loop {
140 let batch = log.read_range(&topic, from, 1024).await.map_err(|error| {
141 VmError::Runtime(format!(
142 "failed to read agent event replay topic {}: {error}",
143 topic.as_str()
144 ))
145 })?;
146 let batch_len = batch.len();
147 for (event_id, record) in batch {
148 from = Some(event_id);
149 if record.headers.get("session_id").map(String::as_str) != Some(session_id) {
150 continue;
151 }
152 let Some(event_value) = record.payload.get("event").cloned() else {
153 continue;
154 };
155 let event = serde_json::from_value::<AgentEvent>(event_value).map_err(|error| {
156 VmError::Runtime(format!(
157 "failed to decode agent event replay record {event_id}: {error}"
158 ))
159 })?;
160 if event.session_id() == session_id {
161 events.push(AgentSessionReplayEvent { event_id, event });
162 }
163 }
164 if batch_len < 1024 {
165 break;
166 }
167 }
168 Ok(events)
169}
170
171pub(super) fn merge_hitl_questions_from_active_log(run: &mut RunRecord) {
172 let Some(log) = active_event_log() else {
173 return;
174 };
175 let topic = Topic::new(crate::HITL_QUESTIONS_TOPIC)
176 .expect("static hitl.questions topic should always be valid");
177 let mut merged = run
178 .hitl_questions
179 .iter()
180 .cloned()
181 .map(|question| (question.request_id.clone(), question))
182 .collect::<BTreeMap<_, _>>();
183
184 for (_, event) in read_topic_records(log.as_ref(), &topic) {
185 if event.kind != "hitl.question_asked" {
186 continue;
187 }
188 let payload = &event.payload;
189 let matches_run = event
190 .headers
191 .get("run_id")
192 .is_some_and(|value| value == &run.id)
193 || payload
194 .get("run_id")
195 .and_then(|value| value.as_str())
196 .is_some_and(|value| value == run.id);
197 if !matches_run {
198 continue;
199 }
200 let request_id = payload
201 .get("request_id")
202 .and_then(|value| value.as_str())
203 .or_else(|| event.headers.get("request_id").map(String::as_str))
204 .unwrap_or_default();
205 let prompt = payload
206 .get("payload")
207 .and_then(|value| value.get("prompt"))
208 .and_then(|value| value.as_str())
209 .unwrap_or_default();
210 if request_id.is_empty() || prompt.is_empty() {
211 continue;
212 }
213 merged.insert(
214 request_id.to_string(),
215 RunHitlQuestionRecord {
216 request_id: request_id.to_string(),
217 prompt: prompt.to_string(),
218 agent: payload
219 .get("agent")
220 .and_then(|value| value.as_str())
221 .unwrap_or_default()
222 .to_string(),
223 trace_id: payload
224 .get("trace_id")
225 .and_then(|value| value.as_str())
226 .map(str::to_string),
227 asked_at: payload
228 .get("requested_at")
229 .and_then(|value| value.as_str())
230 .unwrap_or_default()
231 .to_string(),
232 },
233 );
234 }
235
236 run.hitl_questions = merged.into_values().collect();
237 run.hitl_questions.sort_by(|left, right| {
238 (left.asked_at.as_str(), left.request_id.as_str())
239 .cmp(&(right.asked_at.as_str(), right.request_id.as_str()))
240 });
241}
242
243pub(super) fn signature_status_label(status: &SignatureStatus) -> &'static str {
244 match status {
245 SignatureStatus::Verified => "verified",
246 SignatureStatus::Unsigned => "unsigned",
247 SignatureStatus::Failed { .. } => "failed",
248 }
249}
250
251pub(super) fn trigger_event_from_run(run: &RunRecord) -> Option<TriggerEvent> {
252 run.metadata
253 .get("trigger_event")
254 .cloned()
255 .and_then(|value| serde_json::from_value(value).ok())
256}
257
258pub(super) fn run_trace_id(
259 run: &RunRecord,
260 trigger_event: Option<&TriggerEvent>,
261) -> Option<String> {
262 trigger_event
263 .map(|event| event.trace_id.0.clone())
264 .or_else(|| {
265 run.metadata
266 .get("trace_id")
267 .and_then(|value| value.as_str())
268 .map(str::to_string)
269 })
270}
271
272pub(super) fn replay_of_event_id_from_run(run: &RunRecord) -> Option<String> {
273 run.metadata
274 .get("replay_of_event_id")
275 .and_then(|value| value.as_str())
276 .map(str::to_string)
277}
278
279pub(super) fn llm_transcript_sidecar_path(run_path: &Path) -> Option<PathBuf> {
280 let stem = run_path.file_stem()?.to_str()?;
281 let parent = run_path.parent().unwrap_or_else(|| Path::new("."));
282 Some(parent.join(format!("{stem}-llm/llm_transcript.jsonl")))
283}
284
285pub(super) fn compaction_events_from_transcript(
286 transcript: &serde_json::Value,
287 stage_id: Option<&str>,
288 node_id: Option<&str>,
289 location_prefix: &str,
290 persisted_path: Option<&Path>,
291) -> Vec<CompactionEventRecord> {
292 use std::collections::BTreeSet;
293 let transcript_id = transcript
294 .get("id")
295 .and_then(|value| value.as_str())
296 .map(str::to_string);
297 let asset_ids = transcript
298 .get("assets")
299 .and_then(|value| value.as_array())
300 .map(|assets| {
301 assets
302 .iter()
303 .filter_map(|asset| {
304 asset
305 .get("id")
306 .and_then(|value| value.as_str())
307 .map(str::to_string)
308 })
309 .collect::<BTreeSet<_>>()
310 })
311 .unwrap_or_default();
312 transcript
313 .get("events")
314 .and_then(|value| value.as_array())
315 .map(|events| {
316 events
317 .iter()
318 .filter(|event| {
319 event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
320 })
321 .map(|event| {
322 let metadata = event.get("metadata");
323 let snapshot_asset_id = metadata
324 .and_then(|value| value.get("snapshot_asset_id"))
325 .and_then(|value| value.as_str())
326 .map(str::to_string);
327 let available = snapshot_asset_id
328 .as_ref()
329 .is_some_and(|asset_id| asset_ids.contains(asset_id));
330 let snapshot_location = snapshot_asset_id
331 .as_ref()
332 .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
333 .unwrap_or_else(|| location_prefix.to_string());
334 CompactionEventRecord {
335 id: event
336 .get("id")
337 .and_then(|value| value.as_str())
338 .unwrap_or_default()
339 .to_string(),
340 transcript_id: transcript_id.clone(),
341 stage_id: stage_id.map(str::to_string),
342 node_id: node_id.map(str::to_string),
343 mode: metadata
344 .and_then(|value| value.get("mode"))
345 .and_then(|value| value.as_str())
346 .unwrap_or_default()
347 .to_string(),
348 strategy: metadata
349 .and_then(|value| value.get("strategy"))
350 .and_then(|value| value.as_str())
351 .unwrap_or_default()
352 .to_string(),
353 archived_messages: json_usize(
354 metadata.and_then(|value| value.get("archived_messages")),
355 ),
356 estimated_tokens_before: json_usize(
357 metadata.and_then(|value| value.get("estimated_tokens_before")),
358 ),
359 estimated_tokens_after: json_usize(
360 metadata.and_then(|value| value.get("estimated_tokens_after")),
361 ),
362 snapshot_asset_id,
363 snapshot_location,
364 snapshot_path: persisted_path
365 .map(|path| path.to_string_lossy().into_owned()),
366 available,
367 }
368 })
369 .collect()
370 })
371 .unwrap_or_default()
372}
373
374pub(super) fn daemon_events_from_sidecar(run_path: &Path) -> Vec<DaemonEventRecord> {
375 let Some(sidecar_path) = llm_transcript_sidecar_path(run_path) else {
376 return Vec::new();
377 };
378 let Ok(content) = std::fs::read_to_string(sidecar_path) else {
379 return Vec::new();
380 };
381
382 content
383 .lines()
384 .filter(|line| !line.trim().is_empty())
385 .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
386 .filter(|event| event.get("type").and_then(|value| value.as_str()) == Some("daemon_event"))
387 .filter_map(|event| serde_json::from_value::<DaemonEventRecord>(event).ok())
388 .collect()
389}
390
391pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
392 let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
393 if run.type_name.is_empty() {
394 run.type_name = "run_record".to_string();
395 }
396 if run.id.is_empty() {
397 run.id = new_id("run");
398 }
399 if run.started_at.is_empty() {
400 run.started_at = now_rfc3339();
401 }
402 if run.status.is_empty() {
403 run.status = "running".to_string();
404 }
405 if run.root_run_id.is_none() {
406 run.root_run_id = Some(run.id.clone());
407 }
408 if run.replay_fixture.is_none() {
409 run.replay_fixture = Some(replay_fixture_from_run(&run));
410 }
411 merge_hitl_questions_from_active_log(&mut run);
412 materialize_child_runs_from_stage_metadata(&mut run);
413 sync_run_handoffs(&mut run);
414 if run.observability.is_none() {
415 let persisted_path = run.persisted_path.clone();
416 let persisted = persisted_path.as_deref().map(Path::new);
417 refresh_run_observability(&mut run, persisted);
418 }
419 Ok(run)
420}
421
422pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
423 let path = path
424 .map(PathBuf::from)
425 .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
426 let mut materialized = run.clone();
427 merge_hitl_questions_from_active_log(&mut materialized);
428 materialize_child_runs_from_stage_metadata(&mut materialized);
429 if materialized.replay_fixture.is_none() {
430 materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
431 }
432 materialized.persisted_path = Some(path.to_string_lossy().into_owned());
433 sync_run_handoffs(&mut materialized);
434 refresh_run_observability(&mut materialized, Some(&path));
435 if let Some(parent) = path.parent() {
436 std::fs::create_dir_all(parent)
437 .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
438 }
439 let json = serde_json::to_string_pretty(&materialized)
440 .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
441 crate::atomic_io::atomic_write(&path, json.as_bytes())
442 .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
443 if let Some(observability) = materialized.observability.as_ref() {
444 publish_action_graph_event(&materialized, observability, &path);
445 }
446 Ok(path.to_string_lossy().into_owned())
447}
448
449pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
450 let content = std::fs::read_to_string(path)
451 .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
452 let mut run: RunRecord = serde_json::from_str(&content)
453 .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
454 materialize_child_runs_from_stage_metadata(&mut run);
455 if run.replay_fixture.is_none() {
456 run.replay_fixture = Some(replay_fixture_from_run(&run));
457 }
458 run.persisted_path
459 .get_or_insert_with(|| path.to_string_lossy().into_owned());
460 sync_run_handoffs(&mut run);
461 refresh_run_observability(&mut run, Some(path));
462 Ok(run)
463}