Skip to main content

harn_vm/orchestration/records/
persistence.rs

1//! Run-record I/O, child-run materialization, sidecar extraction, and trigger/trace helpers.
2
3use std::collections::BTreeMap;
4use std::path::{Path, PathBuf};
5
6use super::super::{default_run_dir, new_id, now_rfc3339, parse_json_payload, sync_run_handoffs};
7use super::action_graph::{publish_action_graph_event, refresh_run_observability};
8use super::eval_pack::replay_fixture_from_run;
9use super::json::json_usize;
10use super::types::{
11    run_child_record_from_worker_metadata, CompactionEventRecord, DaemonEventRecord,
12    RunChildRecord, RunHitlQuestionRecord, RunRecord, RunStageRecord,
13};
14use crate::agent_events::AgentEvent;
15use crate::event_log::{
16    active_event_log, sanitize_topic_component, AnyEventLog, EventId, EventLog,
17    LogEvent as EventLogRecord, Topic,
18};
19use crate::llm::vm_value_to_json;
20use crate::triggers::{SignatureStatus, TriggerEvent};
21use crate::value::{VmError, VmValue};
22
23pub(super) fn run_child_from_stage_metadata(stage: &RunStageRecord) -> Option<RunChildRecord> {
24    let parent_stage_id = if stage.id.is_empty() {
25        None
26    } else {
27        Some(stage.id.clone())
28    };
29    run_child_record_from_worker_metadata(parent_stage_id, stage.metadata.get("worker")?)
30}
31
32pub(super) fn fill_missing_child_run_fields(existing: &mut RunChildRecord, child: RunChildRecord) {
33    if existing.worker_name.is_empty() {
34        existing.worker_name = child.worker_name;
35    }
36    if existing.parent_stage_id.is_none() {
37        existing.parent_stage_id = child.parent_stage_id;
38    }
39    if existing.session_id.is_none() {
40        existing.session_id = child.session_id;
41    }
42    if existing.parent_session_id.is_none() {
43        existing.parent_session_id = child.parent_session_id;
44    }
45    if existing.mutation_scope.is_none() {
46        existing.mutation_scope = child.mutation_scope;
47    }
48    if existing.approval_policy.is_none() {
49        existing.approval_policy = child.approval_policy;
50    }
51    if existing.task.is_empty() {
52        existing.task = child.task;
53    }
54    if existing.request.is_none() {
55        existing.request = child.request;
56    }
57    if existing.provenance.is_none() {
58        existing.provenance = child.provenance;
59    }
60    if existing.status.is_empty() {
61        existing.status = child.status;
62    }
63    if existing.started_at.is_empty() {
64        existing.started_at = child.started_at;
65    }
66    if existing.finished_at.is_none() {
67        existing.finished_at = child.finished_at;
68    }
69    if existing.run_id.is_none() {
70        existing.run_id = child.run_id;
71    }
72    if existing.run_path.is_none() {
73        existing.run_path = child.run_path;
74    }
75    if existing.snapshot_path.is_none() {
76        existing.snapshot_path = child.snapshot_path;
77    }
78    if existing.execution.is_none() {
79        existing.execution = child.execution;
80    }
81}
82
83pub(super) fn materialize_child_runs_from_stage_metadata(run: &mut RunRecord) {
84    for child in run.stages.iter().filter_map(run_child_from_stage_metadata) {
85        match run
86            .child_runs
87            .iter_mut()
88            .find(|existing| existing.worker_id == child.worker_id)
89        {
90            Some(existing) => fill_missing_child_run_fields(existing, child),
91            None => run.child_runs.push(child),
92        }
93    }
94}
95
96pub(super) fn read_topic_records(
97    log: &AnyEventLog,
98    topic: &Topic,
99) -> Vec<(crate::event_log::EventId, EventLogRecord)> {
100    let mut from = None;
101    let mut records = Vec::new();
102    loop {
103        let batch =
104            futures::executor::block_on(log.read_range(topic, from, 256)).unwrap_or_default();
105        if batch.is_empty() {
106            break;
107        }
108        from = batch.last().map(|(event_id, _)| *event_id);
109        records.extend(batch);
110    }
111    records
112}
113
/// One agent event recovered from the event log for session replay.
#[derive(Clone, Debug)]
pub struct AgentSessionReplayEvent {
    /// Event-log id of the record this event was read from.
    pub event_id: EventId,
    /// The log record's `kind` string, copied verbatim.
    pub kind: String,
    /// The log record's `occurred_at_ms` timestamp (presumably Unix epoch
    /// milliseconds — confirm against the event log).
    pub occurred_at_ms: i64,
    /// The agent event decoded from the record payload's `event` field.
    pub event: AgentEvent,
}
121
122pub async fn load_agent_session_replay_events(
123    session_id: &str,
124) -> Result<Vec<AgentSessionReplayEvent>, VmError> {
125    let Some(log) = active_event_log() else {
126        return Ok(Vec::new());
127    };
128    load_agent_session_replay_events_from_log(log.as_ref(), session_id).await
129}
130
131pub async fn load_agent_session_replay_events_from_log(
132    log: &AnyEventLog,
133    session_id: &str,
134) -> Result<Vec<AgentSessionReplayEvent>, VmError> {
135    let topic = Topic::new(format!(
136        "observability.agent_events.{}",
137        sanitize_topic_component(session_id)
138    ))
139    .map_err(|error| VmError::Runtime(format!("failed to build agent event topic: {error}")))?;
140
141    let mut events = Vec::new();
142    let mut from = None;
143    loop {
144        let batch = log.read_range(&topic, from, 1024).await.map_err(|error| {
145            VmError::Runtime(format!(
146                "failed to read agent event replay topic {}: {error}",
147                topic.as_str()
148            ))
149        })?;
150        let batch_len = batch.len();
151        for (event_id, record) in batch {
152            from = Some(event_id);
153            if record.headers.get("session_id").map(String::as_str) != Some(session_id) {
154                continue;
155            }
156            let Some(event_value) = record.payload.get("event").cloned() else {
157                continue;
158            };
159            let event = serde_json::from_value::<AgentEvent>(event_value).map_err(|error| {
160                VmError::Runtime(format!(
161                    "failed to decode agent event replay record {event_id}: {error}"
162                ))
163            })?;
164            if event.session_id() == session_id {
165                events.push(AgentSessionReplayEvent {
166                    event_id,
167                    kind: record.kind,
168                    occurred_at_ms: record.occurred_at_ms,
169                    event,
170                });
171            }
172        }
173        if batch_len < 1024 {
174            break;
175        }
176    }
177    Ok(events)
178}
179
180pub(super) fn merge_hitl_questions_from_active_log(run: &mut RunRecord) {
181    let Some(log) = active_event_log() else {
182        return;
183    };
184    let topic = Topic::new(crate::HITL_QUESTIONS_TOPIC)
185        .expect("static hitl.questions topic should always be valid");
186    let mut merged = run
187        .hitl_questions
188        .iter()
189        .cloned()
190        .map(|question| (question.request_id.clone(), question))
191        .collect::<BTreeMap<_, _>>();
192
193    for (_, event) in read_topic_records(log.as_ref(), &topic) {
194        if event.kind != "hitl.question_asked" {
195            continue;
196        }
197        let payload = &event.payload;
198        let matches_run = event
199            .headers
200            .get("run_id")
201            .is_some_and(|value| value == &run.id)
202            || payload
203                .get("run_id")
204                .and_then(|value| value.as_str())
205                .is_some_and(|value| value == run.id);
206        if !matches_run {
207            continue;
208        }
209        let request_id = payload
210            .get("request_id")
211            .and_then(|value| value.as_str())
212            .or_else(|| event.headers.get("request_id").map(String::as_str))
213            .unwrap_or_default();
214        let prompt = payload
215            .get("payload")
216            .and_then(|value| value.get("prompt"))
217            .and_then(|value| value.as_str())
218            .unwrap_or_default();
219        if request_id.is_empty() || prompt.is_empty() {
220            continue;
221        }
222        merged.insert(
223            request_id.to_string(),
224            RunHitlQuestionRecord {
225                request_id: request_id.to_string(),
226                prompt: prompt.to_string(),
227                agent: payload
228                    .get("agent")
229                    .and_then(|value| value.as_str())
230                    .unwrap_or_default()
231                    .to_string(),
232                trace_id: payload
233                    .get("trace_id")
234                    .and_then(|value| value.as_str())
235                    .map(str::to_string),
236                asked_at: payload
237                    .get("requested_at")
238                    .and_then(|value| value.as_str())
239                    .unwrap_or_default()
240                    .to_string(),
241            },
242        );
243    }
244
245    run.hitl_questions = merged.into_values().collect();
246    run.hitl_questions.sort_by(|left, right| {
247        (left.asked_at.as_str(), left.request_id.as_str())
248            .cmp(&(right.asked_at.as_str(), right.request_id.as_str()))
249    });
250}
251
252pub(super) fn signature_status_label(status: &SignatureStatus) -> &'static str {
253    match status {
254        SignatureStatus::Verified => "verified",
255        SignatureStatus::Unsigned => "unsigned",
256        SignatureStatus::Failed { .. } => "failed",
257    }
258}
259
260pub(super) fn trigger_event_from_run(run: &RunRecord) -> Option<TriggerEvent> {
261    run.metadata
262        .get("trigger_event")
263        .cloned()
264        .and_then(|value| serde_json::from_value(value).ok())
265}
266
267pub(super) fn run_trace_id(
268    run: &RunRecord,
269    trigger_event: Option<&TriggerEvent>,
270) -> Option<String> {
271    trigger_event
272        .map(|event| event.trace_id.0.clone())
273        .or_else(|| {
274            run.metadata
275                .get("trace_id")
276                .and_then(|value| value.as_str())
277                .map(str::to_string)
278        })
279}
280
281pub(super) fn replay_of_event_id_from_run(run: &RunRecord) -> Option<String> {
282    run.metadata
283        .get("replay_of_event_id")
284        .and_then(|value| value.as_str())
285        .map(str::to_string)
286}
287
288pub(super) fn llm_transcript_sidecar_path(run_path: &Path) -> Option<PathBuf> {
289    let stem = run_path.file_stem()?.to_str()?;
290    let parent = run_path.parent().unwrap_or_else(|| Path::new("."));
291    Some(parent.join(format!("{stem}-llm/llm_transcript.jsonl")))
292}
293
294pub(super) fn compaction_events_from_transcript(
295    transcript: &serde_json::Value,
296    stage_id: Option<&str>,
297    node_id: Option<&str>,
298    location_prefix: &str,
299    persisted_path: Option<&Path>,
300) -> Vec<CompactionEventRecord> {
301    use std::collections::BTreeSet;
302    let transcript_id = transcript
303        .get("id")
304        .and_then(|value| value.as_str())
305        .map(str::to_string);
306    let asset_ids = transcript
307        .get("assets")
308        .and_then(|value| value.as_array())
309        .map(|assets| {
310            assets
311                .iter()
312                .filter_map(|asset| {
313                    asset
314                        .get("id")
315                        .and_then(|value| value.as_str())
316                        .map(str::to_string)
317                })
318                .collect::<BTreeSet<_>>()
319        })
320        .unwrap_or_default();
321    transcript
322        .get("events")
323        .and_then(|value| value.as_array())
324        .map(|events| {
325            events
326                .iter()
327                .filter(|event| {
328                    event.get("kind").and_then(|value| value.as_str()) == Some("compaction")
329                })
330                .map(|event| {
331                    let metadata = event.get("metadata");
332                    let snapshot_asset_id = metadata
333                        .and_then(|value| value.get("snapshot_asset_id"))
334                        .and_then(|value| value.as_str())
335                        .map(str::to_string);
336                    let available = snapshot_asset_id
337                        .as_ref()
338                        .is_some_and(|asset_id| asset_ids.contains(asset_id));
339                    let snapshot_location = snapshot_asset_id
340                        .as_ref()
341                        .map(|asset_id| format!("{location_prefix}.assets[{asset_id}]"))
342                        .unwrap_or_else(|| location_prefix.to_string());
343                    CompactionEventRecord {
344                        id: event
345                            .get("id")
346                            .and_then(|value| value.as_str())
347                            .unwrap_or_default()
348                            .to_string(),
349                        transcript_id: transcript_id.clone(),
350                        stage_id: stage_id.map(str::to_string),
351                        node_id: node_id.map(str::to_string),
352                        mode: metadata
353                            .and_then(|value| value.get("mode"))
354                            .and_then(|value| value.as_str())
355                            .unwrap_or_default()
356                            .to_string(),
357                        strategy: metadata
358                            .and_then(|value| value.get("strategy"))
359                            .and_then(|value| value.as_str())
360                            .unwrap_or_default()
361                            .to_string(),
362                        archived_messages: json_usize(
363                            metadata.and_then(|value| value.get("archived_messages")),
364                        ),
365                        estimated_tokens_before: json_usize(
366                            metadata.and_then(|value| value.get("estimated_tokens_before")),
367                        ),
368                        estimated_tokens_after: json_usize(
369                            metadata.and_then(|value| value.get("estimated_tokens_after")),
370                        ),
371                        snapshot_asset_id,
372                        snapshot_location,
373                        snapshot_path: persisted_path
374                            .map(|path| path.to_string_lossy().into_owned()),
375                        available,
376                        instruction_mode: metadata
377                            .and_then(|value| value.get("instruction_mode"))
378                            .and_then(|value| value.as_str())
379                            .unwrap_or_default()
380                            .to_string(),
381                        instruction_source: metadata
382                            .and_then(|value| value.get("instruction_source"))
383                            .and_then(|value| value.as_str())
384                            .map(str::to_string),
385                        compaction_policy: metadata
386                            .and_then(|value| value.get("compaction_policy"))
387                            .cloned(),
388                    }
389                })
390                .collect()
391        })
392        .unwrap_or_default()
393}
394
395pub(super) fn daemon_events_from_sidecar(run_path: &Path) -> Vec<DaemonEventRecord> {
396    let Some(sidecar_path) = llm_transcript_sidecar_path(run_path) else {
397        return Vec::new();
398    };
399    let Ok(content) = std::fs::read_to_string(sidecar_path) else {
400        return Vec::new();
401    };
402
403    content
404        .lines()
405        .filter(|line| !line.trim().is_empty())
406        .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
407        .filter(|event| event.get("type").and_then(|value| value.as_str()) == Some("daemon_event"))
408        .filter_map(|event| serde_json::from_value::<DaemonEventRecord>(event).ok())
409        .collect()
410}
411
412pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
413    let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
414    if run.type_name.is_empty() {
415        run.type_name = "run_record".to_string();
416    }
417    if run.id.is_empty() {
418        run.id = new_id("run");
419    }
420    if run.started_at.is_empty() {
421        run.started_at = now_rfc3339();
422    }
423    if run.status.is_empty() {
424        run.status = "running".to_string();
425    }
426    if run.root_run_id.is_none() {
427        run.root_run_id = Some(run.id.clone());
428    }
429    if run.replay_fixture.is_none() {
430        run.replay_fixture = Some(replay_fixture_from_run(&run));
431    }
432    merge_hitl_questions_from_active_log(&mut run);
433    materialize_child_runs_from_stage_metadata(&mut run);
434    sync_run_handoffs(&mut run);
435    if run.observability.is_none() {
436        let persisted_path = run.persisted_path.clone();
437        let persisted = persisted_path.as_deref().map(Path::new);
438        refresh_run_observability(&mut run, persisted);
439    }
440    Ok(run)
441}
442
443pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
444    let path = path
445        .map(PathBuf::from)
446        .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
447    let mut materialized = run.clone();
448    merge_hitl_questions_from_active_log(&mut materialized);
449    materialize_child_runs_from_stage_metadata(&mut materialized);
450    if materialized.replay_fixture.is_none() {
451        materialized.replay_fixture = Some(replay_fixture_from_run(&materialized));
452    }
453    materialized.persisted_path = Some(path.to_string_lossy().into_owned());
454    sync_run_handoffs(&mut materialized);
455    refresh_run_observability(&mut materialized, Some(&path));
456    if let Some(parent) = path.parent() {
457        std::fs::create_dir_all(parent)
458            .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
459    }
460    let json = serde_json::to_string_pretty(&materialized)
461        .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
462    crate::atomic_io::atomic_write(&path, json.as_bytes())
463        .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
464    if let Some(observability) = materialized.observability.as_ref() {
465        publish_action_graph_event(&materialized, observability, &path);
466    }
467    Ok(path.to_string_lossy().into_owned())
468}
469
470pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
471    let content = std::fs::read_to_string(path)
472        .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
473    let mut run: RunRecord = serde_json::from_str(&content)
474        .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))?;
475    materialize_child_runs_from_stage_metadata(&mut run);
476    if run.replay_fixture.is_none() {
477        run.replay_fixture = Some(replay_fixture_from_run(&run));
478    }
479    run.persisted_path
480        .get_or_insert_with(|| path.to_string_lossy().into_owned());
481    sync_run_handoffs(&mut run);
482    refresh_run_observability(&mut run, Some(path));
483    Ok(run)
484}