Skip to main content

harn_vm/orchestration/records/
persistence.rs

1//! Run-record I/O, child-run materialization, sidecar extraction, and trigger/trace helpers.
2
3use std::collections::BTreeMap;
4use std::path::{Path, PathBuf};
5
6use super::super::{default_run_dir, new_id, now_rfc3339, parse_json_payload, sync_run_handoffs};
7use super::action_graph::{publish_action_graph_event, refresh_run_observability};
8use super::eval_pack::replay_fixture_from_run;
9use super::json::json_usize;
10use super::types::{
11    run_child_record_from_worker_metadata, CompactionEventRecord, DaemonEventRecord,
12    RunChildRecord, RunHitlQuestionRecord, RunRecord, RunStageRecord,
13};
14use crate::agent_events::AgentEvent;
15use crate::event_log::{
16    active_event_log, sanitize_topic_component, AnyEventLog, EventId, EventLog,
17    LogEvent as EventLogRecord, Topic,
18};
19use crate::llm::vm_value_to_json;
20use crate::triggers::{SignatureStatus, TriggerEvent};
21use crate::value::{VmError, VmValue};
22
23pub(super) fn run_child_from_stage_metadata(stage: &RunStageRecord) -> Option<RunChildRecord> {
24    let parent_stage_id = if stage.id.is_empty() {
25        None
26    } else {
27        Some(stage.id.clone())
28    };
29    run_child_record_from_worker_metadata(parent_stage_id, stage.metadata.get("worker")?)
30}
31
32pub(super) fn fill_missing_child_run_fields(existing: &mut RunChildRecord, child: RunChildRecord) {
33    if existing.worker_name.is_empty() {
34        existing.worker_name = child.worker_name;
35    }
36    if existing.parent_stage_id.is_none() {
37        existing.parent_stage_id = child.parent_stage_id;
38    }
39    if existing.session_id.is_none() {
40        existing.session_id = child.session_id;
41    }
42    if existing.parent_session_id.is_none() {
43        existing.parent_session_id = child.parent_session_id;
44    }
45    if existing.mutation_scope.is_none() {
46        existing.mutation_scope = child.mutation_scope;
47    }
48    if existing.approval_policy.is_none() {
49        existing.approval_policy = child.approval_policy;
50    }
51    if existing.task.is_empty() {
52        existing.task = child.task;
53    }
54    if existing.request.is_none() {
55        existing.request = child.request;
56    }
57    if existing.provenance.is_none() {
58        existing.provenance = child.provenance;
59    }
60    if existing.status.is_empty() {
61        existing.status = child.status;
62    }
63    if existing.started_at.is_empty() {
64        existing.started_at = child.started_at;
65    }
66    if existing.finished_at.is_none() {
67        existing.finished_at = child.finished_at;
68    }
69    if existing.run_id.is_none() {
70        existing.run_id = child.run_id;
71    }
72    if existing.run_path.is_none() {
73        existing.run_path = child.run_path;
74    }
75    if existing.snapshot_path.is_none() {
76        existing.snapshot_path = child.snapshot_path;
77    }
78    if existing.execution.is_none() {
79        existing.execution = child.execution;
80    }
81}
82
83pub(super) fn materialize_child_runs_from_stage_metadata(run: &mut RunRecord) {
84    for child in run
85        .stages
86        .iter()
87        .filter_map(run_child_from_stage_metadata)
88        .collect::<Vec<_>>()
89    {
90        match run
91            .child_runs
92            .iter_mut()
93            .find(|existing| existing.worker_id == child.worker_id)
94        {
95            Some(existing) => fill_missing_child_run_fields(existing, child),
96            None => run.child_runs.push(child),
97        }
98    }
99}
100
101pub(super) fn read_topic_records(
102    log: &AnyEventLog,
103    topic: &Topic,
104) -> Vec<(crate::event_log::EventId, EventLogRecord)> {
105    let mut from = None;
106    let mut records = Vec::new();
107    loop {
108        let batch =
109            futures::executor::block_on(log.read_range(topic, from, 256)).unwrap_or_default();
110        if batch.is_empty() {
111            break;
112        }
113        from = batch.last().map(|(event_id, _)| *event_id);
114        records.extend(batch);
115    }
116    records
117}
118
119#[derive(Clone, Debug)]
120pub struct AgentSessionReplayEvent {
121    pub event_id: EventId,
122    pub event: AgentEvent,
123}
124
125pub async fn load_agent_session_replay_events(
126    session_id: &str,
127) -> Result<Vec<AgentSessionReplayEvent>, VmError> {
128    let Some(log) = active_event_log() else {
129        return Ok(Vec::new());
130    };
131    let topic = Topic::new(format!(
132        "observability.agent_events.{}",
133        sanitize_topic_component(session_id)
134    ))
135    .map_err(|error| VmError::Runtime(format!("failed to build agent event topic: {error}")))?;
136
137    let mut events = Vec::new();
138    let mut from = None;
139    loop {
140        let batch = log.read_range(&topic, from, 1024).await.map_err(|error| {
141            VmError::Runtime(format!(
142                "failed to read agent event replay topic {}: {error}",
143                topic.as_str()
144            ))
145        })?;
146        let batch_len = batch.len();
147        for (event_id, record) in batch {
148            from = Some(event_id);
149            if record.headers.get("session_id").map(String::as_str) != Some(session_id) {
150                continue;
151            }
152            let Some(event_value) = record.payload.get("event").cloned() else {
153                continue;
154            };
155            let event = serde_json::from_value::<AgentEvent>(event_value).map_err(|error| {
156                VmError::Runtime(format!(
157                    "failed to decode agent event replay record {event_id}: {error}"
158                ))
159            })?;
160            if event.session_id() == session_id {
161                events.push(AgentSessionReplayEvent { event_id, event });
162            }
163        }
164        if batch_len < 1024 {
165            break;
166        }
167    }
168    Ok(events)
169}
170
171pub(super) fn merge_hitl_questions_from_active_log(run: &mut RunRecord) {
172    let Some(log) = active_event_log() else {
173        return;
174    };
175    let topic = Topic::new(crate::HITL_QUESTIONS_TOPIC)
176        .expect("static hitl.questions topic should always be valid");
177    let mut merged = run
178        .hitl_questions
179        .iter()
180        .cloned()
181        .map(|question| (question.request_id.clone(), question))
182        .collect::<BTreeMap<_, _>>();
183
184    for (_, event) in read_topic_records(log.as_ref(), &topic) {
185        if event.kind != "hitl.question_asked" {
186            continue;
187        }
188        let payload = &event.payload;
189        let matches_run = event
190            .headers
191            .get("run_id")
192            .is_some_and(|value| value == &run.id)
193            || payload
194                .get("run_id")
195                .and_then(|value| value.as_str())
196                .is_some_and(|value| value == run.id);
197        if !matches_run {
198            continue;
199        }
200        let request_id = payload
201            .get("request_id")
202            .and_then(|value| value.as_str())
203            .or_else(|| event.headers.get("request_id").map(String::as_str))
204            .unwrap_or_default();
205        let prompt = payload
206            .get("payload")
207            .and_then(|value| value.get("prompt"))
208            .and_then(|value| value.as_str())
209            .unwrap_or_default();
210        if request_id.is_empty() || prompt.is_empty() {
211            continue;
212        }
213        merged.insert(
214            request_id.to_string(),
215            RunHitlQuestionRecord {
216                request_id: request_id.to_string(),
217                prompt: prompt.to_string(),
218                agent: payload
219                    .get("agent")
220                    .and_then(|value| value.as_str())
221                    .unwrap_or_default()
222                    .to_string(),
223                trace_id: payload
224                    .get("trace_id")
225                    .and_then(|value| value.as_str())
226                    .map(str::to_string),
227                asked_at: payload
228                    .get("requested_at")
229                    .and_then(|value| value.as_str())
230                    .unwrap_or_default()
231                    .to_string(),
232            },
233        );
234    }
235
236    run.hitl_questions = merged.into_values().collect();
237    run.hitl_questions.sort_by(|left, right| {
238        (left.asked_at.as_str(), left.request_id.as_str())
239            .cmp(&(right.asked_at.as_str(), right.request_id.as_str()))
240    });
241}
242
243pub(super) fn signature_status_label(status: &SignatureStatus) -> &'static str {
244    match status {
245        SignatureStatus::Verified => "verified",
246        SignatureStatus::Unsigned => "unsigned",
247        SignatureStatus::Failed { .. } => "failed",
248    }
249}
250
251pub(super) fn trigger_event_from_run(run: &RunRecord) -> Option<TriggerEvent> {
252    run.metadata
253        .get("trigger_event")
254        .cloned()
255        .and_then(|value| serde_json::from_value(value).ok())
256}
257
258pub(super) fn run_trace_id(
259    run: &RunRecord,
260    trigger_event: Option<&TriggerEvent>,
261) -> Option<String> {
262    trigger_event
263        .map(|event| event.trace_id.0.clone())
264        .or_else(|| {
265            run.metadata
266                .get("trace_id")
267                .and_then(|value| value.as_str())
268                .map(str::to_string)
269        })
270}
271
272pub(super) fn replay_of_event_id_from_run(run: &RunRecord) -> Option<String> {
273    run.metadata
274        .get("replay_of_event_id")
275        .and_then(|value| value.as_str())
276        .map(str::to_string)
277}
278
279pub(super) fn llm_transcript_sidecar_path(run_path: &Path) -> Option<PathBuf> {
280    let stem = run_path.file_stem()?.to_str()?;
281    let parent = run_path.parent().unwrap_or_else(|| Path::new("."));
282    Some(parent.join(format!("{stem}-llm/llm_transcript.jsonl")))
283}
284
285pub(super) fn compaction_events_from_transcript(
286    transcript: &serde_json::Value,
287    stage_id: Option<&str>,
288    node_id: Option<&str>,
289    location_prefix: &str,
290    persisted_path: Option<&Path>,
291) -> Vec<CompactionEventRecord> {
292    use std::collections::BTreeSet;
293    let transcript_id = transcript
294        .get("id")
295        .and_then(|value| value.as_str())
296        .map(str::to_string);
297    let asset_ids = transcript
298        .get("assets")
299        .and_then(|value| value.as_array())
300        .map(|assets| {
301            assets
302                .iter()
303                .filter_map(|asset| {
304                    asset
305                        .get("id")
306                        .and_then(|value| value.as_str())
307                        .map(str::to_string)
308                })
309                .collect::<BTreeSet<_>>()
310        })
311        .unwrap_or_default();
312    transcript
313        .get("events")
314        .and_then(|value| value.as_array())
315        .map(|events| {
316            events
317                .iter()
318                .filter(|event| {
319                    event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
320                })
321                .map(|event| {
322                    let metadata = event.get("metadata");
323                    let snapshot_asset_id = metadata
324                        .and_then(|value| value.get("snapshot_asset_id"))
325                        .and_then(|value| value.as_str())
326                        .map(str::to_string);
327                    let available = snapshot_asset_id
328                        .as_ref()
329                        .is_some_and(|asset_id| asset_ids.contains(asset_id));
330                    let snapshot_location = snapshot_asset_id
331                        .as_ref()
332                        .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
333                        .unwrap_or_else(|| location_prefix.to_string());
334                    CompactionEventRecord {
335                        id: event
336                            .get("id")
337                            .and_then(|value| value.as_str())
338                            .unwrap_or_default()
339                            .to_string(),
340                        transcript_id: transcript_id.clone(),
341                        stage_id: stage_id.map(str::to_string),
342                        node_id: node_id.map(str::to_string),
343                        mode: metadata
344                            .and_then(|value| value.get("mode"))
345                            .and_then(|value| value.as_str())
346                            .unwrap_or_default()
347                            .to_string(),
348                        strategy: metadata
349                            .and_then(|value| value.get("strategy"))
350                            .and_then(|value| value.as_str())
351                            .unwrap_or_default()
352                            .to_string(),
353                        archived_messages: json_usize(
354                            metadata.and_then(|value| value.get("archived_messages")),
355                        ),
356                        estimated_tokens_before: json_usize(
357                            metadata.and_then(|value| value.get("estimated_tokens_before")),
358                        ),
359                        estimated_tokens_after: json_usize(
360                            metadata.and_then(|value| value.get("estimated_tokens_after")),
361                        ),
362                        snapshot_asset_id,
363                        snapshot_location,
364                        snapshot_path: persisted_path
365                            .map(|path| path.to_string_lossy().into_owned()),
366                        available,
367                    }
368                })
369                .collect()
370        })
371        .unwrap_or_default()
372}
373
374pub(super) fn daemon_events_from_sidecar(run_path: &Path) -> Vec<DaemonEventRecord> {
375    let Some(sidecar_path) = llm_transcript_sidecar_path(run_path) else {
376        return Vec::new();
377    };
378    let Ok(content) = std::fs::read_to_string(sidecar_path) else {
379        return Vec::new();
380    };
381
382    content
383        .lines()
384        .filter(|line| !line.trim().is_empty())
385        .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
386        .filter(|event| event.get("type").and_then(|value| value.as_str()) == Some("daemon_event"))
387        .filter_map(|event| serde_json::from_value::<DaemonEventRecord>(event).ok())
388        .collect()
389}
390
391pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
392    let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
393    if run.type_name.is_empty() {
394        run.type_name = "run_record".to_string();
395    }
396    if run.id.is_empty() {
397        run.id = new_id("run");
398    }
399    if run.started_at.is_empty() {
400        run.started_at = now_rfc3339();
401    }
402    if run.status.is_empty() {
403        run.status = "running".to_string();
404    }
405    if run.root_run_id.is_none() {
406        run.root_run_id = Some(run.id.clone());
407    }
408    if run.replay_fixture.is_none() {
409        run.replay_fixture = Some(replay_fixture_from_run(&run));
410    }
411    merge_hitl_questions_from_active_log(&mut run);
412    materialize_child_runs_from_stage_metadata(&mut run);
413    sync_run_handoffs(&mut run);
414    if run.observability.is_none() {
415        let persisted_path = run.persisted_path.clone();
416        let persisted = persisted_path.as_deref().map(Path::new);
417        refresh_run_observability(&mut run, persisted);
418    }
419    Ok(run)
420}
421
422pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
423    let path = path
424        .map(PathBuf::from)
425        .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
426    let mut materialized = run.clone();
427    merge_hitl_questions_from_active_log(&mut materialized);
428    materialize_child_runs_from_stage_metadata(&mut materialized);
429    if materialized.replay_fixture.is_none() {
430        materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
431    }
432    materialized.persisted_path = Some(path.to_string_lossy().into_owned());
433    sync_run_handoffs(&mut materialized);
434    refresh_run_observability(&mut materialized, Some(&path));
435    if let Some(parent) = path.parent() {
436        std::fs::create_dir_all(parent)
437            .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
438    }
439    let json = serde_json::to_string_pretty(&materialized)
440        .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
441    crate::atomic_io::atomic_write(&path, json.as_bytes())
442        .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
443    if let Some(observability) = materialized.observability.as_ref() {
444        publish_action_graph_event(&materialized, observability, &path);
445    }
446    Ok(path.to_string_lossy().into_owned())
447}
448
449pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
450    let content = std::fs::read_to_string(path)
451        .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
452    let mut run: RunRecord = serde_json::from_str(&content)
453        .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
454    materialize_child_runs_from_stage_metadata(&mut run);
455    if run.replay_fixture.is_none() {
456        run.replay_fixture = Some(replay_fixture_from_run(&run));
457    }
458    run.persisted_path
459        .get_or_insert_with(|| path.to_string_lossy().into_owned());
460    sync_run_handoffs(&mut run);
461    refresh_run_observability(&mut run, Some(path));
462    Ok(run)
463}