harn_vm/orchestration/records/
persistence.rs1use std::collections::BTreeMap;
4use std::path::{Path, PathBuf};
5
6use super::super::{default_run_dir, new_id, now_rfc3339, parse_json_payload, sync_run_handoffs};
7use super::action_graph::{publish_action_graph_event, refresh_run_observability};
8use super::eval_pack::replay_fixture_from_run;
9use super::json::json_usize;
10use super::types::{
11 run_child_record_from_worker_metadata, CompactionEventRecord, DaemonEventRecord,
12 RunChildRecord, RunHitlQuestionRecord, RunRecord, RunStageRecord,
13};
14use crate::agent_events::AgentEvent;
15use crate::event_log::{
16 active_event_log, sanitize_topic_component, AnyEventLog, EventId, EventLog,
17 LogEvent as EventLogRecord, Topic,
18};
19use crate::llm::vm_value_to_json;
20use crate::triggers::{SignatureStatus, TriggerEvent};
21use crate::value::{VmError, VmValue};
22
23pub(super) fn run_child_from_stage_metadata(stage: &RunStageRecord) -> Option<RunChildRecord> {
24 let parent_stage_id = if stage.id.is_empty() {
25 None
26 } else {
27 Some(stage.id.clone())
28 };
29 run_child_record_from_worker_metadata(parent_stage_id, stage.metadata.get("worker")?)
30}
31
32pub(super) fn fill_missing_child_run_fields(existing: &mut RunChildRecord, child: RunChildRecord) {
33 if existing.worker_name.is_empty() {
34 existing.worker_name = child.worker_name;
35 }
36 if existing.parent_stage_id.is_none() {
37 existing.parent_stage_id = child.parent_stage_id;
38 }
39 if existing.session_id.is_none() {
40 existing.session_id = child.session_id;
41 }
42 if existing.parent_session_id.is_none() {
43 existing.parent_session_id = child.parent_session_id;
44 }
45 if existing.mutation_scope.is_none() {
46 existing.mutation_scope = child.mutation_scope;
47 }
48 if existing.approval_policy.is_none() {
49 existing.approval_policy = child.approval_policy;
50 }
51 if existing.task.is_empty() {
52 existing.task = child.task;
53 }
54 if existing.request.is_none() {
55 existing.request = child.request;
56 }
57 if existing.provenance.is_none() {
58 existing.provenance = child.provenance;
59 }
60 if existing.status.is_empty() {
61 existing.status = child.status;
62 }
63 if existing.started_at.is_empty() {
64 existing.started_at = child.started_at;
65 }
66 if existing.finished_at.is_none() {
67 existing.finished_at = child.finished_at;
68 }
69 if existing.run_id.is_none() {
70 existing.run_id = child.run_id;
71 }
72 if existing.run_path.is_none() {
73 existing.run_path = child.run_path;
74 }
75 if existing.snapshot_path.is_none() {
76 existing.snapshot_path = child.snapshot_path;
77 }
78 if existing.execution.is_none() {
79 existing.execution = child.execution;
80 }
81}
82
83pub(super) fn materialize_child_runs_from_stage_metadata(run: &mut RunRecord) {
84 for child in run.stages.iter().filter_map(run_child_from_stage_metadata) {
85 match run
86 .child_runs
87 .iter_mut()
88 .find(|existing| existing.worker_id == child.worker_id)
89 {
90 Some(existing) => fill_missing_child_run_fields(existing, child),
91 None => run.child_runs.push(child),
92 }
93 }
94}
95
96pub(super) fn read_topic_records(
97 log: &AnyEventLog,
98 topic: &Topic,
99) -> Vec<(crate::event_log::EventId, EventLogRecord)> {
100 let mut from = None;
101 let mut records = Vec::new();
102 loop {
103 let batch =
104 futures::executor::block_on(log.read_range(topic, from, 256)).unwrap_or_default();
105 if batch.is_empty() {
106 break;
107 }
108 from = batch.last().map(|(event_id, _)| *event_id);
109 records.extend(batch);
110 }
111 records
112}
113
/// One agent event decoded from the event log for session replay.
#[derive(Clone, Debug)]
pub struct AgentSessionReplayEvent {
    /// Id of the log record the event was decoded from.
    pub event_id: EventId,
    /// The `kind` of the log record that carried the event.
    pub kind: String,
    /// The log record's `occurred_at_ms` value (milliseconds per the name;
    /// NOTE(review): confirm the epoch/clock it is measured against).
    pub occurred_at_ms: i64,
    /// The decoded agent event taken from the record's `event` payload field.
    pub event: AgentEvent,
}
121
122pub async fn load_agent_session_replay_events(
123 session_id: &str,
124) -> Result<Vec<AgentSessionReplayEvent>, VmError> {
125 let Some(log) = active_event_log() else {
126 return Ok(Vec::new());
127 };
128 load_agent_session_replay_events_from_log(log.as_ref(), session_id).await
129}
130
131pub async fn load_agent_session_replay_events_from_log(
132 log: &AnyEventLog,
133 session_id: &str,
134) -> Result<Vec<AgentSessionReplayEvent>, VmError> {
135 let topic = Topic::new(format!(
136 "observability.agent_events.{}",
137 sanitize_topic_component(session_id)
138 ))
139 .map_err(|error| VmError::Runtime(format!("failed to build agent event topic: {error}")))?;
140
141 let mut events = Vec::new();
142 let mut from = None;
143 loop {
144 let batch = log.read_range(&topic, from, 1024).await.map_err(|error| {
145 VmError::Runtime(format!(
146 "failed to read agent event replay topic {}: {error}",
147 topic.as_str()
148 ))
149 })?;
150 let batch_len = batch.len();
151 for (event_id, record) in batch {
152 from = Some(event_id);
153 if record.headers.get("session_id").map(String::as_str) != Some(session_id) {
154 continue;
155 }
156 let Some(event_value) = record.payload.get("event").cloned() else {
157 continue;
158 };
159 let event = serde_json::from_value::<AgentEvent>(event_value).map_err(|error| {
160 VmError::Runtime(format!(
161 "failed to decode agent event replay record {event_id}: {error}"
162 ))
163 })?;
164 if event.session_id() == session_id {
165 events.push(AgentSessionReplayEvent {
166 event_id,
167 kind: record.kind,
168 occurred_at_ms: record.occurred_at_ms,
169 event,
170 });
171 }
172 }
173 if batch_len < 1024 {
174 break;
175 }
176 }
177 Ok(events)
178}
179
180pub(super) fn merge_hitl_questions_from_active_log(run: &mut RunRecord) {
181 let Some(log) = active_event_log() else {
182 return;
183 };
184 let topic = Topic::new(crate::HITL_QUESTIONS_TOPIC)
185 .expect("static hitl.questions topic should always be valid");
186 let mut merged = run
187 .hitl_questions
188 .iter()
189 .cloned()
190 .map(|question| (question.request_id.clone(), question))
191 .collect::<BTreeMap<_, _>>();
192
193 for (_, event) in read_topic_records(log.as_ref(), &topic) {
194 if event.kind != "hitl.question_asked" {
195 continue;
196 }
197 let payload = &event.payload;
198 let matches_run = event
199 .headers
200 .get("run_id")
201 .is_some_and(|value| value == &run.id)
202 || payload
203 .get("run_id")
204 .and_then(|value| value.as_str())
205 .is_some_and(|value| value == run.id);
206 if !matches_run {
207 continue;
208 }
209 let request_id = payload
210 .get("request_id")
211 .and_then(|value| value.as_str())
212 .or_else(|| event.headers.get("request_id").map(String::as_str))
213 .unwrap_or_default();
214 let prompt = payload
215 .get("payload")
216 .and_then(|value| value.get("prompt"))
217 .and_then(|value| value.as_str())
218 .unwrap_or_default();
219 if request_id.is_empty() || prompt.is_empty() {
220 continue;
221 }
222 merged.insert(
223 request_id.to_string(),
224 RunHitlQuestionRecord {
225 request_id: request_id.to_string(),
226 prompt: prompt.to_string(),
227 agent: payload
228 .get("agent")
229 .and_then(|value| value.as_str())
230 .unwrap_or_default()
231 .to_string(),
232 trace_id: payload
233 .get("trace_id")
234 .and_then(|value| value.as_str())
235 .map(str::to_string),
236 asked_at: payload
237 .get("requested_at")
238 .and_then(|value| value.as_str())
239 .unwrap_or_default()
240 .to_string(),
241 },
242 );
243 }
244
245 run.hitl_questions = merged.into_values().collect();
246 run.hitl_questions.sort_by(|left, right| {
247 (left.asked_at.as_str(), left.request_id.as_str())
248 .cmp(&(right.asked_at.as_str(), right.request_id.as_str()))
249 });
250}
251
252pub(super) fn signature_status_label(status: &SignatureStatus) -> &'static str {
253 match status {
254 SignatureStatus::Verified => "verified",
255 SignatureStatus::Unsigned => "unsigned",
256 SignatureStatus::Failed { .. } => "failed",
257 }
258}
259
260pub(super) fn trigger_event_from_run(run: &RunRecord) -> Option<TriggerEvent> {
261 run.metadata
262 .get("trigger_event")
263 .cloned()
264 .and_then(|value| serde_json::from_value(value).ok())
265}
266
267pub(super) fn run_trace_id(
268 run: &RunRecord,
269 trigger_event: Option<&TriggerEvent>,
270) -> Option<String> {
271 trigger_event
272 .map(|event| event.trace_id.0.clone())
273 .or_else(|| {
274 run.metadata
275 .get("trace_id")
276 .and_then(|value| value.as_str())
277 .map(str::to_string)
278 })
279}
280
281pub(super) fn replay_of_event_id_from_run(run: &RunRecord) -> Option<String> {
282 run.metadata
283 .get("replay_of_event_id")
284 .and_then(|value| value.as_str())
285 .map(str::to_string)
286}
287
288pub(super) fn llm_transcript_sidecar_path(run_path: &Path) -> Option<PathBuf> {
289 let stem = run_path.file_stem()?.to_str()?;
290 let parent = run_path.parent().unwrap_or_else(|| Path::new("."));
291 Some(parent.join(format!("{stem}-llm/llm_transcript.jsonl")))
292}
293
294pub(super) fn compaction_events_from_transcript(
295 transcript: &serde_json::Value,
296 stage_id: Option<&str>,
297 node_id: Option<&str>,
298 location_prefix: &str,
299 persisted_path: Option<&Path>,
300) -> Vec<CompactionEventRecord> {
301 use std::collections::BTreeSet;
302 let transcript_id = transcript
303 .get("id")
304 .and_then(|value| value.as_str())
305 .map(str::to_string);
306 let asset_ids = transcript
307 .get("assets")
308 .and_then(|value| value.as_array())
309 .map(|assets| {
310 assets
311 .iter()
312 .filter_map(|asset| {
313 asset
314 .get("id")
315 .and_then(|value| value.as_str())
316 .map(str::to_string)
317 })
318 .collect::<BTreeSet<_>>()
319 })
320 .unwrap_or_default();
321 transcript
322 .get("events")
323 .and_then(|value| value.as_array())
324 .map(|events| {
325 events
326 .iter()
327 .filter(|event| {
328 event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
329 })
330 .map(|event| {
331 let metadata = event.get("metadata");
332 let snapshot_asset_id = metadata
333 .and_then(|value| value.get("snapshot_asset_id"))
334 .and_then(|value| value.as_str())
335 .map(str::to_string);
336 let available = snapshot_asset_id
337 .as_ref()
338 .is_some_and(|asset_id| asset_ids.contains(asset_id));
339 let snapshot_location = snapshot_asset_id
340 .as_ref()
341 .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
342 .unwrap_or_else(|| location_prefix.to_string());
343 CompactionEventRecord {
344 id: event
345 .get("id")
346 .and_then(|value| value.as_str())
347 .unwrap_or_default()
348 .to_string(),
349 transcript_id: transcript_id.clone(),
350 stage_id: stage_id.map(str::to_string),
351 node_id: node_id.map(str::to_string),
352 mode: metadata
353 .and_then(|value| value.get("mode"))
354 .and_then(|value| value.as_str())
355 .unwrap_or_default()
356 .to_string(),
357 strategy: metadata
358 .and_then(|value| value.get("strategy"))
359 .and_then(|value| value.as_str())
360 .unwrap_or_default()
361 .to_string(),
362 archived_messages: json_usize(
363 metadata.and_then(|value| value.get("archived_messages")),
364 ),
365 estimated_tokens_before: json_usize(
366 metadata.and_then(|value| value.get("estimated_tokens_before")),
367 ),
368 estimated_tokens_after: json_usize(
369 metadata.and_then(|value| value.get("estimated_tokens_after")),
370 ),
371 snapshot_asset_id,
372 snapshot_location,
373 snapshot_path: persisted_path
374 .map(|path| path.to_string_lossy().into_owned()),
375 available,
376 instruction_mode: metadata
377 .and_then(|value| value.get("instruction_mode"))
378 .and_then(|value| value.as_str())
379 .unwrap_or_default()
380 .to_string(),
381 instruction_source: metadata
382 .and_then(|value| value.get("instruction_source"))
383 .and_then(|value| value.as_str())
384 .map(str::to_string),
385 compaction_policy: metadata
386 .and_then(|value| value.get("compaction_policy"))
387 .cloned(),
388 }
389 })
390 .collect()
391 })
392 .unwrap_or_default()
393}
394
395pub(super) fn daemon_events_from_sidecar(run_path: &Path) -> Vec<DaemonEventRecord> {
396 let Some(sidecar_path) = llm_transcript_sidecar_path(run_path) else {
397 return Vec::new();
398 };
399 let Ok(content) = std::fs::read_to_string(sidecar_path) else {
400 return Vec::new();
401 };
402
403 content
404 .lines()
405 .filter(|line| !line.trim().is_empty())
406 .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
407 .filter(|event| event.get("type").and_then(|value| value.as_str()) == Some("daemon_event"))
408 .filter_map(|event| serde_json::from_value::<DaemonEventRecord>(event).ok())
409 .collect()
410}
411
412pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
413 let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
414 if run.type_name.is_empty() {
415 run.type_name = "run_record".to_string();
416 }
417 if run.id.is_empty() {
418 run.id = new_id("run");
419 }
420 if run.started_at.is_empty() {
421 run.started_at = now_rfc3339();
422 }
423 if run.status.is_empty() {
424 run.status = "running".to_string();
425 }
426 if run.root_run_id.is_none() {
427 run.root_run_id = Some(run.id.clone());
428 }
429 if run.replay_fixture.is_none() {
430 run.replay_fixture = Some(replay_fixture_from_run(&run));
431 }
432 merge_hitl_questions_from_active_log(&mut run);
433 materialize_child_runs_from_stage_metadata(&mut run);
434 sync_run_handoffs(&mut run);
435 if run.observability.is_none() {
436 let persisted_path = run.persisted_path.clone();
437 let persisted = persisted_path.as_deref().map(Path::new);
438 refresh_run_observability(&mut run, persisted);
439 }
440 Ok(run)
441}
442
443pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
444 let path = path
445 .map(PathBuf::from)
446 .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
447 let mut materialized = run.clone();
448 merge_hitl_questions_from_active_log(&mut materialized);
449 materialize_child_runs_from_stage_metadata(&mut materialized);
450 if materialized.replay_fixture.is_none() {
451 materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
452 }
453 materialized.persisted_path = Some(path.to_string_lossy().into_owned());
454 sync_run_handoffs(&mut materialized);
455 refresh_run_observability(&mut materialized, Some(&path));
456 if let Some(parent) = path.parent() {
457 std::fs::create_dir_all(parent)
458 .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
459 }
460 let json = serde_json::to_string_pretty(&materialized)
461 .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
462 crate::atomic_io::atomic_write(&path, json.as_bytes())
463 .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
464 if let Some(observability) = materialized.observability.as_ref() {
465 publish_action_graph_event(&materialized, observability, &path);
466 }
467 Ok(path.to_string_lossy().into_owned())
468}
469
470pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
471 let content = std::fs::read_to_string(path)
472 .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
473 let mut run: RunRecord = serde_json::from_str(&content)
474 .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
475 materialize_child_runs_from_stage_metadata(&mut run);
476 if run.replay_fixture.is_none() {
477 run.replay_fixture = Some(replay_fixture_from_run(&run));
478 }
479 run.persisted_path
480 .get_or_insert_with(|| path.to_string_lossy().into_owned());
481 sync_run_handoffs(&mut run);
482 refresh_run_observability(&mut run, Some(path));
483 Ok(run)
484}