Skip to main content

harn_vm/
session_bundle.rs

1//! Canonical session bundle export/import support.
2//!
3//! A session bundle is the portable JSON envelope that downstream hosts use
4//! for local exports, sanitized shares, and replay-only handoffs. The bundle
5//! is intentionally built from existing Harn run-record surfaces so replay,
6//! transcripts, tool calls, HITL questions, and observability do not fork into
7//! a second persistence model.
8
9use std::borrow::Cow;
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12use std::fs;
13use std::path::Path;
14
15use chrono::{DateTime, SecondsFormat, Utc};
16use serde::{Deserialize, Serialize};
17use serde_json::{json, Value as JsonValue};
18
19use crate::agent_events::AgentEvent;
20use crate::event_log::sanitize_topic_component;
21use crate::orchestration::{
22    derive_run_observability, new_id, now_rfc3339, AgentSessionReplayEvent, ReplayFixture,
23    RunCheckpointRecord, RunChildRecord, RunExecutionRecord, RunHitlQuestionRecord,
24    RunObservabilityRecord, RunRecord, RunTraceSpanRecord, RunTransitionRecord,
25    RunVerificationOutcomeRecord, RunWorkerLineageRecord, ToolCallRecord,
26};
27use crate::redact::{RedactionPolicy, REDACTED_PLACEHOLDER};
28use crate::workspace_anchor::{anchor_from_transcript_metadata_json, MountedRoot, WorkspaceAnchor};
29
30mod schema;
31pub use schema::{session_bundle_schema, session_bundle_schema_pretty};
32
33#[cfg(test)]
34mod tests;
35
/// `_type` discriminator of a session bundle: the default value of
/// [`SessionBundle::type_name`] and the only value validation accepts.
pub const SESSION_BUNDLE_TYPE: &str = "harn_session_bundle";
/// Default `schema_version` of a [`SessionBundle`]. Validation rejects `0`
/// and any version greater than this constant.
pub const SESSION_BUNDLE_SCHEMA_VERSION: u32 = 1;
/// URL identifying the v1 session-bundle JSON schema.
// NOTE(review): presumably written into `producer.schema_id` by the bundle builder — confirm.
pub const SESSION_BUNDLE_SCHEMA_ID: &str = "https://harnlang.com/schemas/session-bundle.v1.json";
/// Placeholder text `"[withheld]"`.
// NOTE(review): presumably substituted for content withheld from replay-only exports — confirm.
pub const REPLAY_ONLY_PLACEHOLDER: &str = "[withheld]";

/// Bundle `source.status` for a session reconstructed from an event stream that
/// has no terminal `SessionClosed` event — a live loop frozen mid-turn for
/// cross-compute migration (#3682), or a time-traveled prefix truncated before
/// the close. The next action is still pending, so a consumer must CONTINUE the
/// loop, not treat it as finished.
pub const SESSION_BUNDLE_STATUS_SUSPENDED: &str = "suspended";

/// Bundle `source.status` fallback for a session that closed without an
/// explicit status on its terminal `SessionClosed` event.
pub const SESSION_BUNDLE_STATUS_COMPLETED: &str = "completed";

/// `metadata` key carrying the machine-readable liveness verdict
/// (`"suspended"` | `"closed"`) so a resume host can decide continue-vs-replay
/// without string-matching the human-facing `status`.
pub const SESSION_BUNDLE_LIVENESS_KEY: &str = "session_liveness";
56
57/// Liveness of an agent session reconstructed from its event log.
58///
59/// Derived purely from the event stream: a session is [`Closed`] only when the
60/// stream contains a terminal `SessionClosed` event. Any other stream is
61/// [`Suspended`] — its next action is still pending.
62///
63/// [`Closed`]: AgentSessionLiveness::Closed
64/// [`Suspended`]: AgentSessionLiveness::Suspended
65#[derive(Clone, Debug, PartialEq, Eq)]
66pub enum AgentSessionLiveness {
67    /// The stream ends in a `SessionClosed` event. `status` is that event's
68    /// status, or [`SESSION_BUNDLE_STATUS_COMPLETED`] when it was empty.
69    Closed { status: String, finished_at_ms: i64 },
70    /// No terminal `SessionClosed` event: the loop is frozen mid-flight and a
71    /// consumer must resume it rather than replay it as a finished run.
72    Suspended,
73}
74
75impl AgentSessionLiveness {
76    /// The human-facing `source.status` string for this liveness.
77    pub fn status(&self) -> &str {
78        match self {
79            AgentSessionLiveness::Closed { status, .. } => status,
80            AgentSessionLiveness::Suspended => SESSION_BUNDLE_STATUS_SUSPENDED,
81        }
82    }
83
84    /// The machine-readable [`SESSION_BUNDLE_LIVENESS_KEY`] tag.
85    pub fn tag(&self) -> &'static str {
86        match self {
87            AgentSessionLiveness::Closed { .. } => "closed",
88            AgentSessionLiveness::Suspended => "suspended",
89        }
90    }
91
92    /// Whether a consumer must CONTINUE the loop (vs. treat it as finished).
93    pub fn is_suspended(&self) -> bool {
94        matches!(self, AgentSessionLiveness::Suspended)
95    }
96}
97
98/// Classify an agent session's liveness from its replay event stream by finding
99/// the last terminal `SessionClosed` event. The keystone discriminator behind
100/// portable suspend/resume (#3682): without it, a frozen-mid-flight or
101/// time-traveled session is silently mislabeled `completed` and a resume host
102/// replays it instead of continuing the pending turn.
103pub fn agent_session_liveness(events: &[AgentSessionReplayEvent]) -> AgentSessionLiveness {
104    events
105        .iter()
106        .rev()
107        .find_map(|entry| match &entry.event {
108            AgentEvent::SessionClosed { status, .. } => Some(AgentSessionLiveness::Closed {
109                status: if status.is_empty() {
110                    SESSION_BUNDLE_STATUS_COMPLETED.to_string()
111                } else {
112                    status.clone()
113                },
114                finished_at_ms: entry.occurred_at_ms,
115            }),
116            _ => None,
117        })
118        .unwrap_or(AgentSessionLiveness::Suspended)
119}
120
/// How much of a session is exposed when it is exported as a bundle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionBundleExportMode {
    /// Export without redaction.
    Local,
    /// Export with the redaction passes applied.
    Sanitized,
    /// Export with the redaction passes applied and replay-only content
    /// additionally withheld.
    ReplayOnly,
}

impl SessionBundleExportMode {
    /// Stable snake_case name of the mode, as recorded in the redaction
    /// manifest's `mode` field.
    pub fn as_str(self) -> &'static str {
        match self {
            SessionBundleExportMode::ReplayOnly => "replay_only",
            SessionBundleExportMode::Sanitized => "sanitized",
            SessionBundleExportMode::Local => "local",
        }
    }
}
137
138#[derive(Clone, Debug)]
139pub struct SessionBundleExportOptions {
140    pub mode: SessionBundleExportMode,
141    pub include_attachments: bool,
142    pub redaction_policy: RedactionPolicy,
143}
144
145impl Default for SessionBundleExportOptions {
146    fn default() -> Self {
147        Self {
148            mode: SessionBundleExportMode::Sanitized,
149            include_attachments: false,
150            redaction_policy: RedactionPolicy::default(),
151        }
152    }
153}
154
/// Options for validating an incoming session bundle.
#[derive(Clone, Debug, Default)]
pub struct SessionBundleValidationOptions {
    /// When `false` (the default), validation runs the unredacted-secret-marker
    /// check and fails if it reports a marker; when `true` that check is skipped.
    pub allow_unsafe_secret_markers: bool,
    /// Policy handed to the unredacted-secret-marker check.
    pub redaction_policy: RedactionPolicy,
}
160
/// Top-level session bundle document.
///
/// `#[serde(default)]` means members absent from the JSON fall back to the
/// values produced by this type's `Default` impl during deserialization.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct SessionBundle {
    /// Type discriminator, serialized as `_type`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Bundle schema version.
    pub schema_version: u32,
    /// Identifier of this bundle; recorded in an imported run record's
    /// metadata as `imported_from_session_bundle`.
    pub bundle_id: String,
    pub created_at: String,
    pub producer: BundleProducer,
    pub source: BundleSource,
    pub runtime: BundleRuntime,
    /// Optional workspace section; not among the members validation requires.
    pub workspace: Option<BundleWorkspace>,
    pub transcript: BundleTranscript,
    pub tools: BundleTools,
    pub permissions: Vec<BundlePermission>,
    pub replay: BundleReplay,
    /// Redaction manifest; overwritten at the end of every export.
    pub redaction: RedactionManifest,
    pub attachments: Vec<BundleAttachment>,
    /// Free-form metadata; not among the members validation requires.
    pub metadata: BTreeMap<String, JsonValue>,
}
181
182impl Default for SessionBundle {
183    fn default() -> Self {
184        Self {
185            type_name: SESSION_BUNDLE_TYPE.to_string(),
186            schema_version: SESSION_BUNDLE_SCHEMA_VERSION,
187            bundle_id: String::new(),
188            created_at: String::new(),
189            producer: BundleProducer::default(),
190            source: BundleSource::default(),
191            runtime: BundleRuntime::default(),
192            workspace: None,
193            transcript: BundleTranscript::default(),
194            tools: BundleTools::default(),
195            permissions: Vec::new(),
196            replay: BundleReplay::default(),
197            redaction: RedactionManifest::default(),
198            attachments: Vec::new(),
199            metadata: BTreeMap::new(),
200        }
201    }
202}
203
/// `producer` section of a [`SessionBundle`]. Validation requires all three
/// members (`name`, `version`, `schema_id`) to be present in the JSON.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleProducer {
    pub name: String,
    pub version: String,
    pub schema_id: String,
}
211
/// `source` section of a [`SessionBundle`].
///
/// Validation requires `kind`, `run_record_id`, `workflow_id`, `task`, and
/// `status`. When a bundle is imported from its replay fixture (no embedded
/// run record), `run_record_id`, `workflow_id`, `workflow_name`, `task`,
/// `status`, `started_at`, and `finished_at` seed the rebuilt run record.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleSource {
    pub kind: String,
    pub run_record_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub persisted_path: Option<String>,
    pub root_run_id: Option<String>,
    pub parent_run_id: Option<String>,
    pub child_run_count: usize,
}
228
/// `runtime` section of a [`SessionBundle`]. Validation requires
/// `harn_version` and `provider_models`; `usage` is copied into the rebuilt
/// run record when importing from a replay fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleRuntime {
    pub harn_version: String,
    pub provider_models: Vec<String>,
    pub usage: Option<BundleUsage>,
    pub metadata: BTreeMap<String, JsonValue>,
}
237
/// Usage figures carried in [`BundleRuntime::usage`]. Only `PartialEq` is
/// derived because `total_cost` is an `f64`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleUsage {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub call_count: i64,
    pub total_duration_ms: i64,
    pub total_cost: f64,
    pub models: Vec<String>,
}
248
/// `workspace` section of a [`SessionBundle`]; can be built from a
/// [`WorkspaceAnchor`] through the `From<&WorkspaceAnchor>` impl.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleWorkspace {
    /// Primary workspace path the session is anchored against.
    pub primary: Option<String>,
    /// Additional roots mounted alongside the primary anchor.
    #[serde(default)]
    pub additional_roots: Vec<BundleMountedRoot>,
    /// RFC3339 timestamp when the anchor was set.
    pub anchored_at: Option<String>,
    /// Bundle workspace policy label. Currently always
    /// `"safe_identity_only"`; future revisions may tie this to mount
    /// modes once the PathScope matcher (#2216) lands.
    pub policy: String,
}
264
/// One entry of [`BundleWorkspace::additional_roots`]; can be built from a
/// [`MountedRoot`] through the `From<&MountedRoot>` impl.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleMountedRoot {
    pub path: String,
    pub mount_mode: String,
    pub mounted_at: String,
}
272
273impl From<&MountedRoot> for BundleMountedRoot {
274    fn from(root: &MountedRoot) -> Self {
275        Self {
276            path: root.path.to_string_lossy().into_owned(),
277            mount_mode: root.mount_mode.as_str().to_string(),
278            mounted_at: root.mounted_at.clone(),
279        }
280    }
281}
282
283impl From<&WorkspaceAnchor> for BundleWorkspace {
284    fn from(anchor: &WorkspaceAnchor) -> Self {
285        Self {
286            primary: Some(anchor.primary.to_string_lossy().into_owned()),
287            additional_roots: anchor.additional_roots.iter().map(Into::into).collect(),
288            anchored_at: Some(anchor.anchored_at.clone()),
289            policy: "safe_identity_only".to_string(),
290        }
291    }
292}
293
/// `transcript` section of a [`SessionBundle`]. Validation requires
/// `sections`; importing from a replay fixture rebuilds the run transcript
/// from the first section only.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscript {
    pub sections: Vec<BundleTranscriptSection>,
}
299
/// One entry of [`BundleTranscript::sections`]. When a run record is rebuilt
/// from a replay fixture, `messages`, `events`, `assets`, `summary`, and
/// `metadata` are copied into the rebuilt `transcript` object.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscriptSection {
    pub id: String,
    pub label: String,
    pub scope: String,
    pub location: String,
    pub summary: Option<String>,
    pub messages: Vec<JsonValue>,
    pub events: Vec<JsonValue>,
    pub assets: Vec<JsonValue>,
    pub metadata: BTreeMap<String, JsonValue>,
}
313
/// `tools` section of a [`SessionBundle`]. Validation requires both members;
/// `calls` becomes `tool_recordings` when a run record is rebuilt from a
/// replay fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTools {
    pub schemas: Vec<BundleJsonEntry>,
    pub calls: Vec<BundleToolCall>,
}
320
/// A JSON value tagged with a `source` label and an `index`; the element type
/// of [`BundleTools::schemas`] and [`BundleReplay::deterministic_events`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleJsonEntry {
    pub source: String,
    pub index: usize,
    pub value: JsonValue,
}
328
/// One entry of [`BundleTools::calls`]; a field-for-field copy of a
/// [`ToolCallRecord`] (see the `From<&ToolCallRecord>` impl).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleToolCall {
    pub tool_name: String,
    pub tool_use_id: String,
    pub args_hash: String,
    pub result: String,
    pub is_rejected: bool,
    pub duration_ms: u64,
    pub iteration: usize,
    pub timestamp: String,
}
341
342impl From<&ToolCallRecord> for BundleToolCall {
343    fn from(record: &ToolCallRecord) -> Self {
344        Self {
345            tool_name: record.tool_name.clone(),
346            tool_use_id: record.tool_use_id.clone(),
347            args_hash: record.args_hash.clone(),
348            result: record.result.clone(),
349            is_rejected: record.is_rejected,
350            duration_ms: record.duration_ms,
351            iteration: record.iteration,
352            timestamp: record.timestamp.clone(),
353        }
354    }
355}
356
/// One entry of [`SessionBundle::permissions`]. Entries whose `kind` is
/// `"hitl_question"` contribute their `payload` to `hitl_questions` when a
/// run record is rebuilt from a replay fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundlePermission {
    pub kind: String,
    pub source: String,
    pub request_id: Option<String>,
    pub agent: Option<String>,
    pub payload: JsonValue,
}
366
/// `replay` section of a [`SessionBundle`].
///
/// Import prefers `run_record`; when it is absent, a run record is rebuilt
/// from `replay_fixture`; when both are absent, import fails with
/// [`SessionBundleError::MissingRunRecord`]. Validation requires
/// `event_log_pointers`, `transitions`, `checkpoints`, `trace_spans`, and
/// `deterministic_events` to be present.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleReplay {
    pub replay_fixture: Option<ReplayFixture>,
    pub run_record: Option<JsonValue>,
    /// Omitted from the serialized bundle when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observability: Option<RunObservabilityRecord>,
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    /// Omitted from the serialized bundle when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub worker_snapshots: Vec<BundleWorkerSnapshot>,
    pub event_log_pointers: Vec<BundleEventLogPointer>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub deterministic_events: Vec<BundleJsonEntry>,
}
383
/// One entry of [`BundleReplay::worker_snapshots`]. The number of entries is
/// recorded as `worker_snapshot_count` in the metadata of a run record
/// rebuilt from a replay fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleWorkerSnapshot {
    pub worker_id: String,
    pub worker_name: String,
    pub status: String,
    pub snapshot_ref: String,
    pub source_path: Option<String>,
    pub value: JsonValue,
}
394
/// Pairs a `worker_id` with a `path`; a slice of these is accepted by
/// [`import_run_record_value_with_materialized_worker_snapshots`].
// NOTE(review): presumably `path` is where the worker's snapshot was written on the importing host — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MaterializedWorkerSnapshot {
    pub worker_id: String,
    pub path: String,
}
401
/// One entry of [`BundleReplay::event_log_pointers`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleEventLogPointer {
    pub kind: String,
    pub topic: Option<String>,
    pub path: Option<String>,
    pub location: String,
    pub available: bool,
}
411
/// `redaction` section of a [`SessionBundle`], rewritten on every export.
/// Validation requires all five members to be present.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RedactionManifest {
    /// Export mode name (`SessionBundleExportMode::as_str`).
    pub mode: String,
    /// `"none"` for local exports; a policy label otherwise.
    pub policy: String,
    /// Set to `REDACTED_PLACEHOLDER` on export.
    pub placeholder: String,
    /// Entries appended by the redaction and withholding passes.
    pub entries: Vec<RedactionEntry>,
    /// `false` for local exports, `true` for every other mode.
    pub unsafe_secret_markers_rejected: bool,
}
421
/// One entry of [`RedactionManifest::entries`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RedactionEntry {
    pub path: String,
    pub class: String,
    pub action: String,
    pub replacement: Option<String>,
}
430
/// One entry of [`SessionBundle::attachments`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleAttachment {
    pub id: String,
    pub kind: String,
    pub title: Option<String>,
    pub stage: Option<String>,
    pub text: Option<String>,
    pub data: Option<JsonValue>,
    pub metadata: BTreeMap<String, JsonValue>,
}
442
/// Errors raised while exporting, validating, or importing session bundles.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionBundleError {
    /// Input could not be read, parsed, or deserialized.
    Decode(String),
    /// A value could not be serialized to JSON.
    Encode(String),
    /// A required member (identified by its path) is absent.
    MissingRequired(String),
    /// `schema_version` is `0` or newer than this build supports.
    UnsupportedSchemaVersion { found: u64, supported: u32 },
    /// The member at `path` does not have the `expected` type or value.
    InvalidType { path: String, expected: String },
    /// A worker snapshot was in a status other than `"suspended"`.
    UnsupportedCheckpointState { status: String },
    /// An unredacted secret marker was found at `path`.
    UnsafeSecretMarker { path: String, excerpt: String },
    /// The bundle carries neither a run record nor a replay fixture.
    MissingRunRecord,
    /// The event log holds no replayable events for `session_id`.
    MissingSessionEvents { session_id: String },
}

impl fmt::Display for SessionBundleError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Decode(error) => write!(f, "failed to decode session bundle: {error}"),
            Self::Encode(error) => write!(f, "failed to encode session bundle: {error}"),
            Self::MissingRequired(path) => {
                write!(f, "session bundle is missing required field {path}")
            }
            // Version 0 is rejected too, so spell out the full accepted range
            // instead of claiming everything `<= supported` is fine.
            Self::UnsupportedSchemaVersion { found, supported } => write!(
                f,
                "unsupported session bundle schema_version {found}; this build supports versions 1 through {supported}"
            ),
            Self::InvalidType { path, expected } => {
                write!(f, "session bundle field {path} must be {expected}")
            }
            Self::UnsupportedCheckpointState { status } => write!(
                f,
                "worker snapshot status {status:?} is not checkpointable; suspend the worker at a turn boundary first"
            ),
            Self::UnsafeSecretMarker { path, excerpt } => write!(
                f,
                "session bundle contains an unsafe unredacted secret marker at {path}: {excerpt}"
            ),
            Self::MissingRunRecord => {
                f.write_str("session bundle does not include an importable run record")
            }
            Self::MissingSessionEvents { session_id } => write!(
                f,
                "event log does not contain replayable events for session_id {session_id:?}"
            ),
        }
    }
}

impl std::error::Error for SessionBundleError {}
489
490pub fn export_run_record_bundle(
491    run: &RunRecord,
492    options: &SessionBundleExportOptions,
493) -> Result<SessionBundle, SessionBundleError> {
494    let run_record_value =
495        serde_json::to_value(run).map_err(|error| SessionBundleError::Encode(error.to_string()))?;
496    let mut bundle = raw_bundle_from_run(run, run_record_value, options.include_attachments)?;
497    let mut bundle_value = serde_json::to_value(&bundle)
498        .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
499
500    let mut manifest = RedactionManifest {
501        mode: options.mode.as_str().to_string(),
502        policy: if matches!(options.mode, SessionBundleExportMode::Local) {
503            "none".to_string()
504        } else {
505            "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths".to_string()
506        },
507        placeholder: REDACTED_PLACEHOLDER.to_string(),
508        entries: Vec::new(),
509        unsafe_secret_markers_rejected: !matches!(options.mode, SessionBundleExportMode::Local),
510    };
511
512    if !matches!(options.mode, SessionBundleExportMode::Local) {
513        let redaction_policy = bundle_redaction_policy(&options.redaction_policy);
514        redact_json_with_manifest(
515            &mut bundle_value,
516            "$",
517            &redaction_policy,
518            &mut manifest.entries,
519        );
520        redact_bundle_pointer_paths_json(&mut bundle_value, "$", &mut manifest.entries);
521    }
522    if matches!(options.mode, SessionBundleExportMode::ReplayOnly) {
523        withhold_replay_only_json(&mut bundle_value, "$", &mut manifest.entries);
524    }
525    set_json_path(
526        &mut bundle_value,
527        &["redaction"],
528        serde_json::to_value(&manifest)
529            .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
530    );
531    bundle = serde_json::from_value(bundle_value)
532        .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
533    Ok(bundle)
534}
535
536pub fn export_worker_snapshot_bundle(
537    snapshot_path: &Path,
538    options: &SessionBundleExportOptions,
539) -> Result<SessionBundle, SessionBundleError> {
540    let run = run_record_from_worker_snapshot(snapshot_path)?;
541    export_run_record_bundle(&run, options)
542}
543
544pub fn run_record_from_worker_snapshot(
545    snapshot_path: &Path,
546) -> Result<RunRecord, SessionBundleError> {
547    let content = fs::read_to_string(snapshot_path).map_err(|error| {
548        SessionBundleError::Decode(format!(
549            "failed to read worker snapshot {}: {error}",
550            snapshot_path.display()
551        ))
552    })?;
553    let value: JsonValue = serde_json::from_str(&content)
554        .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
555    run_record_from_worker_snapshot_value(snapshot_path, value)
556}
557
558pub fn validate_session_bundle_value(
559    value: &JsonValue,
560    options: &SessionBundleValidationOptions,
561) -> Result<SessionBundle, SessionBundleError> {
562    require_field(value, "_type")?;
563    require_field(value, "schema_version")?;
564    require_field(value, "bundle_id")?;
565    require_field(value, "created_at")?;
566    require_field(value, "producer")?;
567    require_field(value, "source")?;
568    require_field(value, "runtime")?;
569    require_field(value, "transcript")?;
570    require_field(value, "tools")?;
571    require_field(value, "permissions")?;
572    require_field(value, "replay")?;
573    require_field(value, "redaction")?;
574    require_field(value, "attachments")?;
575    require_nested_field(value, &["producer", "name"])?;
576    require_nested_field(value, &["producer", "version"])?;
577    require_nested_field(value, &["producer", "schema_id"])?;
578    require_nested_field(value, &["source", "kind"])?;
579    require_nested_field(value, &["source", "run_record_id"])?;
580    require_nested_field(value, &["source", "workflow_id"])?;
581    require_nested_field(value, &["source", "task"])?;
582    require_nested_field(value, &["source", "status"])?;
583    require_nested_field(value, &["runtime", "harn_version"])?;
584    require_nested_field(value, &["runtime", "provider_models"])?;
585    require_nested_field(value, &["transcript", "sections"])?;
586    require_nested_field(value, &["tools", "schemas"])?;
587    require_nested_field(value, &["tools", "calls"])?;
588    require_nested_field(value, &["replay", "event_log_pointers"])?;
589    require_nested_field(value, &["replay", "transitions"])?;
590    require_nested_field(value, &["replay", "checkpoints"])?;
591    require_nested_field(value, &["replay", "trace_spans"])?;
592    require_nested_field(value, &["replay", "deterministic_events"])?;
593    require_nested_field(value, &["redaction", "mode"])?;
594    require_nested_field(value, &["redaction", "policy"])?;
595    require_nested_field(value, &["redaction", "placeholder"])?;
596    require_nested_field(value, &["redaction", "entries"])?;
597    require_nested_field(value, &["redaction", "unsafe_secret_markers_rejected"])?;
598
599    let type_name = value
600        .get("_type")
601        .and_then(JsonValue::as_str)
602        .ok_or_else(|| SessionBundleError::InvalidType {
603            path: "$._type".to_string(),
604            expected: "string".to_string(),
605        })?;
606    if type_name != SESSION_BUNDLE_TYPE {
607        return Err(SessionBundleError::InvalidType {
608            path: "$._type".to_string(),
609            expected: format!("\"{SESSION_BUNDLE_TYPE}\""),
610        });
611    }
612
613    let version = value
614        .get("schema_version")
615        .and_then(JsonValue::as_u64)
616        .ok_or_else(|| SessionBundleError::InvalidType {
617            path: "$.schema_version".to_string(),
618            expected: "positive integer".to_string(),
619        })?;
620    if version == 0 || version > u64::from(SESSION_BUNDLE_SCHEMA_VERSION) {
621        return Err(SessionBundleError::UnsupportedSchemaVersion {
622            found: version,
623            supported: SESSION_BUNDLE_SCHEMA_VERSION,
624        });
625    }
626
627    if !options.allow_unsafe_secret_markers {
628        reject_unredacted_secret_markers(value, "$", &options.redaction_policy)?;
629    }
630
631    serde_json::from_value::<SessionBundle>(value.clone())
632        .map_err(|error| SessionBundleError::Decode(error.to_string()))
633}
634
635pub fn validate_session_bundle_str(
636    content: &str,
637    options: &SessionBundleValidationOptions,
638) -> Result<SessionBundle, SessionBundleError> {
639    let value: JsonValue = serde_json::from_str(content)
640        .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
641    validate_session_bundle_value(&value, options)
642}
643
644pub fn import_run_record_value(bundle: &SessionBundle) -> Result<JsonValue, SessionBundleError> {
645    let replay_observability = replay_observability_for_import(&bundle.replay);
646    if let Some(mut run_record) = bundle.replay.run_record.clone() {
647        let should_fill_observability = match run_record.get("observability") {
648            Some(value) => value.is_null(),
649            None => true,
650        };
651        if should_fill_observability {
652            if let (JsonValue::Object(map), Some(observability)) =
653                (&mut run_record, replay_observability.as_ref())
654            {
655                map.insert(
656                    "observability".to_string(),
657                    serde_json::to_value(observability)
658                        .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
659                );
660            }
661        }
662        return Ok(run_record);
663    }
664    if let Some(fixture) = &bundle.replay.replay_fixture {
665        let transcript = bundle.transcript.sections.first().map(|section| {
666            json!({
667                "_type": "transcript",
668                "messages": section.messages.clone(),
669                "events": section.events.clone(),
670                "assets": section.assets.clone(),
671                "summary": section.summary.clone(),
672                "metadata": section.metadata.clone(),
673            })
674        });
675        let hitl_questions = bundle
676            .permissions
677            .iter()
678            .filter(|permission| permission.kind == "hitl_question")
679            .map(|permission| permission.payload.clone())
680            .collect::<Vec<_>>();
681        return Ok(json!({
682            "_type": "run_record",
683            "id": bundle.source.run_record_id.clone(),
684            "workflow_id": bundle.source.workflow_id.clone(),
685            "workflow_name": bundle.source.workflow_name.clone(),
686            "task": bundle.source.task.clone(),
687            "status": bundle.source.status.clone(),
688            "started_at": bundle.source.started_at.clone(),
689            "finished_at": bundle.source.finished_at.clone(),
690            "stages": [],
691            "transitions": bundle.replay.transitions.clone(),
692            "checkpoints": bundle.replay.checkpoints.clone(),
693            "pending_nodes": [],
694            "completed_nodes": [],
695            "child_runs": [],
696            "artifacts": [],
697            "handoffs": [],
698            "policy": {},
699            "transcript": transcript,
700            "usage": bundle.runtime.usage.clone(),
701            "replay_fixture": fixture,
702            "observability": replay_observability,
703            "trace_spans": bundle.replay.trace_spans.clone(),
704            "tool_recordings": bundle.tools.calls.clone(),
705            "hitl_questions": hitl_questions,
706            "persona_runtime": [],
707            "metadata": {
708                "imported_from_session_bundle": bundle.bundle_id.clone(),
709                "session_bundle_schema_version": bundle.schema_version,
710                "worker_snapshot_count": bundle.replay.worker_snapshots.len(),
711            }
712        }));
713    }
714    Err(SessionBundleError::MissingRunRecord)
715}
716
717pub fn import_run_record_value_with_materialized_worker_snapshots(
718    bundle: &SessionBundle,
719    materialized: &[MaterializedWorkerSnapshot],
720) -> Result<JsonValue, SessionBundleError> {
721    let mut run_record = import_run_record_value(bundle)?;
722    apply_materialized_worker_snapshot_paths(&mut run_record, materialized);
723    Ok(run_record)
724}
725
726fn run_record_from_worker_snapshot_value(
727    snapshot_path: &Path,
728    value: JsonValue,
729) -> Result<RunRecord, SessionBundleError> {
730    require_worker_snapshot_marker(&value)?;
731    let status =
732        snapshot_string(&value, "status").ok_or_else(|| missing_worker_snapshot_field("status"))?;
733    if status != "suspended" {
734        return Err(SessionBundleError::UnsupportedCheckpointState { status });
735    }
736    require_worker_snapshot_object_field(&value, "config")?;
737    require_worker_snapshot_object_field(&value, "suspension")?;
738
739    let snapshot_path_string = snapshot_path.to_string_lossy().into_owned();
740    let worker_id =
741        snapshot_string(&value, "id").ok_or_else(|| missing_worker_snapshot_field("id"))?;
742    let worker_name = snapshot_string(&value, "name").unwrap_or_else(|| "worker".to_string());
743    let task = snapshot_string(&value, "task").unwrap_or_else(|| "Suspended worker".to_string());
744    let suspended_at = snapshot_pointer_string(&value, &["suspension", "suspended_at"]);
745    let started_at = snapshot_string(&value, "started_at")
746        .or_else(|| snapshot_string(&value, "created_at"))
747        .or_else(|| suspended_at.clone())
748        .unwrap_or_else(now_rfc3339);
749    let finished_at = snapshot_string(&value, "finished_at");
750    let session_id = snapshot_pointer_string(&value, &["config", "spec", "session_id"])
751        .or_else(|| snapshot_pointer_string(&value, &["audit", "session_id"]));
752    let parent_session_id =
753        snapshot_pointer_string(&value, &["config", "spec", "parent_session_id"])
754            .or_else(|| snapshot_pointer_string(&value, &["audit", "parent_session_id"]));
755    let child_run_id = snapshot_string(&value, "child_run_id");
756    let child_run_path = snapshot_string(&value, "child_run_path");
757    let execution = value
758        .get("execution")
759        .cloned()
760        .and_then(|value| serde_json::from_value::<RunExecutionRecord>(value).ok());
761
762    let child = RunChildRecord {
763        worker_id: worker_id.clone(),
764        worker_name: worker_name.clone(),
765        parent_stage_id: snapshot_string(&value, "parent_stage_id"),
766        session_id: session_id.clone(),
767        parent_session_id: parent_session_id.clone(),
768        mutation_scope: snapshot_pointer_string(&value, &["audit", "mutation_scope"]),
769        approval_policy: None,
770        task: task.clone(),
771        request: value.get("request").cloned(),
772        provenance: value.get("provenance").cloned(),
773        status: status.clone(),
774        started_at: started_at.clone(),
775        finished_at: finished_at.clone(),
776        run_id: child_run_id.clone(),
777        run_path: child_run_path.clone(),
778        snapshot_path: Some(snapshot_path_string.clone()),
779        execution,
780    };
781    let lineage = RunWorkerLineageRecord {
782        worker_id: worker_id.clone(),
783        worker_name,
784        parent_stage_id: child.parent_stage_id.clone(),
785        task: task.clone(),
786        status: status.clone(),
787        session_id,
788        parent_session_id,
789        run_id: child_run_id,
790        run_path: child_run_path,
791        snapshot_path: Some(snapshot_path_string.clone()),
792    };
793
794    let run_id = format!("checkpoint_{}", sanitize_topic_component(&worker_id));
795    let workflow_id = "worker_snapshot_checkpoint".to_string();
796    let workflow_name = Some("Worker snapshot checkpoint".to_string());
797    let checkpoint_id = format!("{run_id}_turn_boundary");
798    let checkpointed_at = suspended_at
799        .or_else(|| finished_at.clone())
800        .unwrap_or_else(|| started_at.clone());
801
802    Ok(RunRecord {
803        type_name: "run_record".to_string(),
804        id: run_id.clone(),
805        workflow_id: workflow_id.clone(),
806        workflow_name: workflow_name.clone(),
807        task,
808        status,
809        started_at,
810        finished_at,
811        checkpoints: vec![RunCheckpointRecord {
812            id: checkpoint_id,
813            ready_nodes: vec!["worker_snapshot_resume".to_string()],
814            completed_nodes: Vec::new(),
815            last_stage_id: None,
816            persisted_at: checkpointed_at.clone(),
817            reason: "suspended_worker_snapshot_turn_boundary".to_string(),
818        }],
819        child_runs: vec![child],
820        transcript: value.get("transcript").cloned(),
821        replay_fixture: Some(ReplayFixture {
822            type_name: "replay_fixture".to_string(),
823            id: format!("fixture_{run_id}"),
824            source_run_id: run_id,
825            workflow_id,
826            workflow_name,
827            created_at: checkpointed_at,
828            eval_kind: Some("worker_snapshot_checkpoint".to_string()),
829            expected_status: "suspended".to_string(),
830            ..ReplayFixture::default()
831        }),
832        observability: Some(RunObservabilityRecord {
833            schema_version: 4,
834            worker_lineage: vec![lineage],
835            ..RunObservabilityRecord::default()
836        }),
837        metadata: BTreeMap::from([
838            ("checkpoint_kind".to_string(), json!("worker_snapshot")),
839            (
840                "worker_snapshot_path".to_string(),
841                json!(snapshot_path_string),
842            ),
843        ]),
844        ..RunRecord::default()
845    })
846}
847
848fn snapshot_string(value: &JsonValue, key: &str) -> Option<String> {
849    value
850        .get(key)
851        .and_then(JsonValue::as_str)
852        .filter(|value| !value.is_empty())
853        .map(str::to_string)
854}
855
856fn missing_worker_snapshot_field(field: &str) -> SessionBundleError {
857    SessionBundleError::MissingRequired(format!("$.worker_snapshot.{field}"))
858}
859
860fn require_worker_snapshot_marker(value: &JsonValue) -> Result<(), SessionBundleError> {
861    match snapshot_string(value, "_type").as_deref() {
862        Some("worker_snapshot") => Ok(()),
863        Some(_) => Err(SessionBundleError::InvalidType {
864            path: "$.worker_snapshot._type".to_string(),
865            expected: "\"worker_snapshot\"".to_string(),
866        }),
867        None => Err(missing_worker_snapshot_field("_type")),
868    }
869}
870
871fn require_worker_snapshot_object_field(
872    value: &JsonValue,
873    field: &str,
874) -> Result<(), SessionBundleError> {
875    match value.get(field) {
876        Some(JsonValue::Object(_)) => Ok(()),
877        Some(_) => Err(SessionBundleError::InvalidType {
878            path: format!("$.worker_snapshot.{field}"),
879            expected: "object".to_string(),
880        }),
881        None => Err(missing_worker_snapshot_field(field)),
882    }
883}
884
885fn snapshot_pointer_string(value: &JsonValue, path: &[&str]) -> Option<String> {
886    let mut current = value;
887    for component in path {
888        current = current.get(*component)?;
889    }
890    current
891        .as_str()
892        .filter(|value| !value.is_empty())
893        .map(str::to_string)
894}
895
896pub fn materialize_worker_snapshots(
897    bundle: &SessionBundle,
898    out_dir: &Path,
899) -> Result<Vec<MaterializedWorkerSnapshot>, SessionBundleError> {
900    if bundle.replay.worker_snapshots.is_empty() {
901        return Ok(Vec::new());
902    }
903    fs::create_dir_all(out_dir).map_err(|error| {
904        SessionBundleError::Encode(format!(
905            "failed to create worker snapshot directory {}: {error}",
906            out_dir.display()
907        ))
908    })?;
909
910    let mut materialized = Vec::new();
911    for (index, snapshot) in bundle.replay.worker_snapshots.iter().enumerate() {
912        let worker_id = if snapshot.worker_id.trim().is_empty() {
913            format!("worker_{index}")
914        } else {
915            snapshot.worker_id.clone()
916        };
917        let path = out_dir.join(worker_snapshot_file_name(&worker_id, index));
918        let value = worker_snapshot_value_for_import(&snapshot.value, &path);
919        let rendered = serde_json::to_string_pretty(&value)
920            .map(|json| format!("{json}\n"))
921            .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
922        fs::write(&path, rendered).map_err(|error| {
923            SessionBundleError::Encode(format!(
924                "failed to write worker snapshot {}: {error}",
925                path.display()
926            ))
927        })?;
928        materialized.push(MaterializedWorkerSnapshot {
929            worker_id,
930            path: path.to_string_lossy().into_owned(),
931        });
932    }
933    Ok(materialized)
934}
935
936fn apply_materialized_worker_snapshot_paths(
937    run_record: &mut JsonValue,
938    materialized: &[MaterializedWorkerSnapshot],
939) {
940    if materialized.is_empty() {
941        return;
942    }
943
944    let paths_by_worker_id = materialized
945        .iter()
946        .filter(|snapshot| !snapshot.worker_id.is_empty())
947        .map(|snapshot| (snapshot.worker_id.as_str(), snapshot.path.as_str()))
948        .collect::<BTreeMap<_, _>>();
949    if paths_by_worker_id.is_empty() {
950        return;
951    }
952
953    rewrite_worker_snapshot_paths(run_record.get_mut("child_runs"), &paths_by_worker_id);
954    rewrite_worker_snapshot_paths(
955        run_record
956            .get_mut("observability")
957            .and_then(|observability| observability.get_mut("worker_lineage")),
958        &paths_by_worker_id,
959    );
960    rewrite_checkpoint_metadata_snapshot_path(run_record, materialized);
961}
962
963fn rewrite_worker_snapshot_paths(
964    records: Option<&mut JsonValue>,
965    paths_by_worker_id: &BTreeMap<&str, &str>,
966) {
967    let Some(records) = records.and_then(JsonValue::as_array_mut) else {
968        return;
969    };
970    for record in records {
971        let Some(worker_id) = record.get("worker_id").and_then(JsonValue::as_str) else {
972            continue;
973        };
974        let Some(path) = paths_by_worker_id.get(worker_id) else {
975            continue;
976        };
977        if let JsonValue::Object(map) = record {
978            map.insert(
979                "snapshot_path".to_string(),
980                JsonValue::String((*path).to_string()),
981            );
982        }
983    }
984}
985
986fn rewrite_checkpoint_metadata_snapshot_path(
987    run_record: &mut JsonValue,
988    materialized: &[MaterializedWorkerSnapshot],
989) {
990    let Some(snapshot) = materialized.first() else {
991        return;
992    };
993    let Some(metadata) = run_record
994        .get_mut("metadata")
995        .and_then(JsonValue::as_object_mut)
996    else {
997        return;
998    };
999    if metadata.contains_key("worker_snapshot_path") {
1000        metadata.insert(
1001            "worker_snapshot_path".to_string(),
1002            JsonValue::String(snapshot.path.clone()),
1003        );
1004    }
1005}
1006
1007fn replay_observability_for_import(replay: &BundleReplay) -> Option<RunObservabilityRecord> {
1008    let mut observability = replay.observability.clone().unwrap_or_default();
1009    let has_observability = replay.observability.is_some();
1010    let has_verification_outcomes = !replay.verification_outcomes.is_empty();
1011    if !has_observability && !has_verification_outcomes {
1012        return None;
1013    }
1014    if observability.schema_version == 0 {
1015        observability.schema_version = 4;
1016    }
1017    if observability.verification_outcomes.is_empty() && has_verification_outcomes {
1018        observability.verification_outcomes = replay.verification_outcomes.clone();
1019    }
1020    Some(observability)
1021}
1022
1023pub fn session_bundle_from_agent_session_events(
1024    session_id: &str,
1025    events: &[AgentSessionReplayEvent],
1026) -> Result<SessionBundle, SessionBundleError> {
1027    if events.is_empty() {
1028        return Err(SessionBundleError::MissingSessionEvents {
1029            session_id: session_id.to_string(),
1030        });
1031    }
1032
1033    let stable_id = sanitize_topic_component(session_id);
1034    let started_at = rfc3339_from_epoch_ms(events[0].occurred_at_ms);
1035    // Liveness, not a `completed` default: a stream without a terminal
1036    // `SessionClosed` event (a frozen-mid-flight loop, or a time-traveled
1037    // prefix) is `suspended`, so `finished_at` stays null and a resume host
1038    // continues the pending turn rather than replaying a "finished" run.
1039    let liveness = agent_session_liveness(events);
1040    let finished_at = match &liveness {
1041        AgentSessionLiveness::Closed { finished_at_ms, .. } => {
1042            Some(rfc3339_from_epoch_ms(*finished_at_ms))
1043        }
1044        AgentSessionLiveness::Suspended => None,
1045    };
1046    let status = liveness.status().to_string();
1047    let run_id = session_id.to_string();
1048    let workflow_id = "agent_session".to_string();
1049    let created_at = finished_at.clone().unwrap_or_else(|| started_at.clone());
1050    let transcript_events = transcript_events_from_agent_session(events)?;
1051    let transcript_messages = transcript_messages_from_agent_session(events);
1052    let mut transcript_metadata = BTreeMap::new();
1053    transcript_metadata.insert("session_id".to_string(), json!(session_id));
1054    transcript_metadata.insert(
1055        "source".to_string(),
1056        json!("events.sqlite observability.agent_events topic"),
1057    );
1058
1059    let replay_fixture = ReplayFixture {
1060        type_name: "replay_fixture".to_string(),
1061        id: format!("fixture_from_session_{stable_id}"),
1062        source_run_id: run_id.clone(),
1063        workflow_id: workflow_id.clone(),
1064        workflow_name: Some(format!("Agent session {session_id}")),
1065        created_at: created_at.clone(),
1066        eval_kind: Some("replay".to_string()),
1067        expected_status: status.clone(),
1068        ..ReplayFixture::default()
1069    };
1070
1071    Ok(SessionBundle {
1072        bundle_id: format!("bundle_from_session_{stable_id}"),
1073        created_at,
1074        producer: BundleProducer {
1075            name: "harn".to_string(),
1076            version: env!("CARGO_PKG_VERSION").to_string(),
1077            schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
1078        },
1079        source: BundleSource {
1080            kind: "event_log_session".to_string(),
1081            run_record_id: run_id,
1082            workflow_id,
1083            workflow_name: Some(format!("Agent session {session_id}")),
1084            task: task_from_agent_session(events)
1085                .unwrap_or_else(|| format!("Agent session {session_id}")),
1086            status,
1087            started_at,
1088            finished_at,
1089            ..BundleSource::default()
1090        },
1091        runtime: BundleRuntime {
1092            harn_version: env!("CARGO_PKG_VERSION").to_string(),
1093            ..BundleRuntime::default()
1094        },
1095        transcript: BundleTranscript {
1096            sections: vec![BundleTranscriptSection {
1097                id: "agent_events".to_string(),
1098                label: "Agent event log".to_string(),
1099                scope: "session".to_string(),
1100                location: format!(
1101                    "observability.agent_events.{}",
1102                    sanitize_topic_component(session_id)
1103                ),
1104                summary: None,
1105                messages: transcript_messages,
1106                events: transcript_events,
1107                assets: Vec::new(),
1108                metadata: transcript_metadata,
1109            }],
1110        },
1111        permissions: permissions_from_agent_session(events),
1112        replay: BundleReplay {
1113            replay_fixture: Some(replay_fixture),
1114            event_log_pointers: vec![BundleEventLogPointer {
1115                kind: "agent_events".to_string(),
1116                topic: Some(format!(
1117                    "observability.agent_events.{}",
1118                    sanitize_topic_component(session_id)
1119                )),
1120                path: None,
1121                location: "events.sqlite".to_string(),
1122                available: true,
1123            }],
1124            deterministic_events: deterministic_events_from_agent_session(events)?,
1125            ..BundleReplay::default()
1126        },
1127        metadata: BTreeMap::from([(
1128            SESSION_BUNDLE_LIVENESS_KEY.to_string(),
1129            json!(liveness.tag()),
1130        )]),
1131        ..SessionBundle::default()
1132    })
1133}
1134
1135pub fn import_run_record_from_agent_session_events(
1136    session_id: &str,
1137    events: &[AgentSessionReplayEvent],
1138) -> Result<RunRecord, SessionBundleError> {
1139    let bundle = session_bundle_from_agent_session_events(session_id, events)?;
1140    let run_record = import_run_record_value(&bundle)?;
1141    serde_json::from_value(run_record)
1142        .map_err(|error| SessionBundleError::Decode(error.to_string()))
1143}
1144
1145fn transcript_events_from_agent_session(
1146    events: &[AgentSessionReplayEvent],
1147) -> Result<Vec<JsonValue>, SessionBundleError> {
1148    events
1149        .iter()
1150        .map(|entry| {
1151            let event = serde_json::to_value(&entry.event)
1152                .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
1153            Ok(json!({
1154                "event_id": entry.event_id,
1155                "kind": entry.kind,
1156                "occurred_at_ms": entry.occurred_at_ms,
1157                "event": event,
1158            }))
1159        })
1160        .collect()
1161}
1162
1163fn transcript_messages_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<JsonValue> {
1164    events
1165        .iter()
1166        .filter_map(|entry| match &entry.event {
1167            AgentEvent::UserMessage { content, .. } => Some(json!({
1168                "role": "user",
1169                "content": content,
1170            })),
1171            AgentEvent::AgentMessageChunk { content, .. } if !content.is_empty() => Some(json!({
1172                "role": "assistant",
1173                "content": content,
1174            })),
1175            _ => None,
1176        })
1177        .collect()
1178}
1179
1180fn permissions_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<BundlePermission> {
1181    let mut permissions = Vec::new();
1182    for entry in events {
1183        if let AgentEvent::HitlRequested {
1184            request_id,
1185            kind,
1186            payload,
1187            ..
1188        } = &entry.event
1189        {
1190            permissions.push(BundlePermission {
1191                kind: "hitl_question".to_string(),
1192                source: "agent_events".to_string(),
1193                request_id: Some(request_id.clone()),
1194                agent: None,
1195                payload: json!({
1196                    "kind": kind,
1197                    "payload": payload,
1198                    "event_id": entry.event_id,
1199                    "occurred_at_ms": entry.occurred_at_ms,
1200                }),
1201            });
1202        }
1203    }
1204    permissions
1205}
1206
1207fn deterministic_events_from_agent_session(
1208    events: &[AgentSessionReplayEvent],
1209) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
1210    transcript_events_from_agent_session(events).map(|entries| {
1211        entries
1212            .into_iter()
1213            .enumerate()
1214            .map(|(index, value)| BundleJsonEntry {
1215                source: "events.sqlite.agent_events".to_string(),
1216                index,
1217                value,
1218            })
1219            .collect()
1220    })
1221}
1222
1223fn task_from_agent_session(events: &[AgentSessionReplayEvent]) -> Option<String> {
1224    events.iter().find_map(|entry| match &entry.event {
1225        AgentEvent::UserMessage { content, .. } => user_message_text(content),
1226        _ => None,
1227    })
1228}
1229
1230fn user_message_text(content: &[JsonValue]) -> Option<String> {
1231    let parts = content
1232        .iter()
1233        .filter_map(|value| {
1234            value
1235                .get("text")
1236                .and_then(JsonValue::as_str)
1237                .or_else(|| value.as_str())
1238                .map(str::to_string)
1239        })
1240        .filter(|text| !text.trim().is_empty())
1241        .collect::<Vec<_>>();
1242    if parts.is_empty() {
1243        None
1244    } else {
1245        Some(parts.join("\n"))
1246    }
1247}
1248
1249fn rfc3339_from_epoch_ms(ms: i64) -> String {
1250    DateTime::<Utc>::from_timestamp_millis(ms)
1251        .unwrap_or_else(|| DateTime::<Utc>::from_timestamp(0, 0).expect("unix epoch is valid"))
1252        .to_rfc3339_opts(SecondsFormat::Millis, true)
1253}
1254
1255fn raw_bundle_from_run(
1256    run: &RunRecord,
1257    run_record_value: JsonValue,
1258    include_attachments: bool,
1259) -> Result<SessionBundle, SessionBundleError> {
1260    let mut bundle = SessionBundle {
1261        bundle_id: new_id("bundle"),
1262        created_at: now_rfc3339(),
1263        producer: BundleProducer {
1264            name: "harn".to_string(),
1265            version: env!("CARGO_PKG_VERSION").to_string(),
1266            schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
1267        },
1268        source: BundleSource {
1269            kind: "run_record".to_string(),
1270            run_record_id: run.id.clone(),
1271            workflow_id: run.workflow_id.clone(),
1272            workflow_name: run.workflow_name.clone(),
1273            task: run.task.clone(),
1274            status: run.status.clone(),
1275            started_at: run.started_at.clone(),
1276            finished_at: run.finished_at.clone(),
1277            persisted_path: run.persisted_path.clone(),
1278            root_run_id: run.root_run_id.clone(),
1279            parent_run_id: run.parent_run_id.clone(),
1280            child_run_count: run.child_runs.len(),
1281        },
1282        runtime: BundleRuntime {
1283            harn_version: env!("CARGO_PKG_VERSION").to_string(),
1284            provider_models: run
1285                .usage
1286                .as_ref()
1287                .map(|usage| usage.models.clone())
1288                .unwrap_or_default(),
1289            usage: run.usage.as_ref().map(|usage| BundleUsage {
1290                input_tokens: usage.input_tokens,
1291                output_tokens: usage.output_tokens,
1292                call_count: usage.call_count,
1293                total_duration_ms: usage.total_duration_ms,
1294                total_cost: usage.total_cost,
1295                models: usage.models.clone(),
1296            }),
1297            metadata: BTreeMap::new(),
1298        },
1299        workspace: workspace_from_run(run),
1300        transcript: transcript_from_run(run),
1301        tools: BundleTools {
1302            schemas: tool_schema_entries(run),
1303            calls: run
1304                .tool_recordings
1305                .iter()
1306                .map(BundleToolCall::from)
1307                .collect(),
1308        },
1309        permissions: permissions_from_run(run),
1310        replay: BundleReplay {
1311            replay_fixture: run.replay_fixture.clone(),
1312            run_record: Some(run_record_value),
1313            observability: run.observability.clone(),
1314            verification_outcomes: verification_outcomes_for_run(run),
1315            worker_snapshots: worker_snapshots_from_run(run),
1316            event_log_pointers: event_log_pointers_from_run(run),
1317            transitions: run.transitions.clone(),
1318            checkpoints: run.checkpoints.clone(),
1319            trace_spans: run.trace_spans.clone(),
1320            deterministic_events: deterministic_events_from_run(run)?,
1321        },
1322        redaction: RedactionManifest {
1323            mode: "sanitized".to_string(),
1324            policy: "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths"
1325                .to_string(),
1326            placeholder: REDACTED_PLACEHOLDER.to_string(),
1327            entries: Vec::new(),
1328            unsafe_secret_markers_rejected: true,
1329        },
1330        attachments: if include_attachments {
1331            attachments_from_run(run)
1332        } else {
1333            Vec::new()
1334        },
1335        ..SessionBundle::default()
1336    };
1337    bundle.metadata.insert(
1338        "format_note".to_string(),
1339        json!("Session bundles are portable JSON envelopes; hosted share links should reference sanitized bundles rather than raw run records."),
1340    );
1341    Ok(bundle)
1342}
1343
1344fn verification_outcomes_for_run(run: &RunRecord) -> Vec<RunVerificationOutcomeRecord> {
1345    if let Some(observability) = run.observability.as_ref() {
1346        return observability.verification_outcomes.clone();
1347    }
1348    derive_run_observability(run, run.persisted_path.as_deref().map(Path::new))
1349        .verification_outcomes
1350}
1351
1352fn bundle_redaction_policy(base: &RedactionPolicy) -> RedactionPolicy {
1353    base.clone()
1354        .with_extra_field("persisted_path")
1355        .with_extra_field("primary")
1356        .with_extra_field("run_path")
1357        .with_extra_field("snapshot_ref")
1358        .with_extra_field("snapshot_path")
1359        .with_extra_field("source_path")
1360}
1361
1362fn workspace_from_run(run: &RunRecord) -> Option<BundleWorkspace> {
1363    let anchor = run
1364        .transcript
1365        .as_ref()
1366        .and_then(|transcript| transcript.get("metadata"))
1367        .and_then(anchor_from_transcript_metadata_json)?;
1368    Some(BundleWorkspace::from(&anchor))
1369}
1370
1371fn transcript_from_run(run: &RunRecord) -> BundleTranscript {
1372    let mut sections = Vec::new();
1373    if let Some(transcript) = &run.transcript {
1374        sections.push(transcript_section(
1375            "run",
1376            "Run transcript",
1377            "run",
1378            "$.transcript",
1379            transcript,
1380        ));
1381    }
1382    for (index, stage) in run.stages.iter().enumerate() {
1383        if let Some(transcript) = &stage.transcript {
1384            sections.push(transcript_section(
1385                &stage.id,
1386                &format!("Stage {}", stage.node_id),
1387                "stage",
1388                &format!("$.stages[{index}].transcript"),
1389                transcript,
1390            ));
1391        }
1392    }
1393    BundleTranscript { sections }
1394}
1395
1396fn transcript_section(
1397    id: &str,
1398    label: &str,
1399    scope: &str,
1400    location: &str,
1401    transcript: &JsonValue,
1402) -> BundleTranscriptSection {
1403    BundleTranscriptSection {
1404        id: id.to_string(),
1405        label: label.to_string(),
1406        scope: scope.to_string(),
1407        location: location.to_string(),
1408        summary: transcript
1409            .get("summary")
1410            .and_then(JsonValue::as_str)
1411            .map(str::to_string),
1412        messages: json_array(transcript.get("messages")),
1413        events: json_array(transcript.get("events")),
1414        assets: json_array(transcript.get("assets")),
1415        metadata: transcript
1416            .get("metadata")
1417            .and_then(JsonValue::as_object)
1418            .map(|map| {
1419                map.iter()
1420                    .map(|(key, value)| (key.clone(), value.clone()))
1421                    .collect()
1422            })
1423            .unwrap_or_default(),
1424    }
1425}
1426
1427fn json_array(value: Option<&JsonValue>) -> Vec<JsonValue> {
1428    value
1429        .and_then(JsonValue::as_array)
1430        .cloned()
1431        .unwrap_or_default()
1432}
1433
1434fn tool_schema_entries(run: &RunRecord) -> Vec<BundleJsonEntry> {
1435    let mut entries = Vec::new();
1436    collect_tool_schema_entries_from_transcript(&mut entries, "run.transcript", &run.transcript);
1437    for stage in &run.stages {
1438        collect_tool_schema_entries_from_transcript(
1439            &mut entries,
1440            &format!("stage.{}.transcript", stage.node_id),
1441            &stage.transcript,
1442        );
1443        if let Some(tools) = stage
1444            .metadata
1445            .get("tool_schemas")
1446            .or_else(|| stage.metadata.get("tools"))
1447        {
1448            entries.push(BundleJsonEntry {
1449                source: format!("stage.{}.metadata", stage.node_id),
1450                index: entries.len(),
1451                value: tools.clone(),
1452            });
1453        }
1454    }
1455    entries
1456}
1457
1458fn collect_tool_schema_entries_from_transcript(
1459    entries: &mut Vec<BundleJsonEntry>,
1460    source: &str,
1461    transcript: &Option<JsonValue>,
1462) {
1463    let Some(transcript) = transcript else {
1464        return;
1465    };
1466    for event in transcript
1467        .get("events")
1468        .and_then(JsonValue::as_array)
1469        .into_iter()
1470        .flatten()
1471    {
1472        let kind = event
1473            .get("type")
1474            .or_else(|| event.get("kind"))
1475            .and_then(JsonValue::as_str)
1476            .unwrap_or_default();
1477        if kind == "tool_schemas" || kind == "tool_schema" {
1478            entries.push(BundleJsonEntry {
1479                source: source.to_string(),
1480                index: entries.len(),
1481                value: event.clone(),
1482            });
1483        }
1484    }
1485}
1486
1487fn permissions_from_run(run: &RunRecord) -> Vec<BundlePermission> {
1488    let mut permissions = run
1489        .hitl_questions
1490        .iter()
1491        .map(permission_from_hitl_question)
1492        .collect::<Vec<_>>();
1493    collect_permission_events(&mut permissions, "run.transcript", &run.transcript);
1494    for stage in &run.stages {
1495        collect_permission_events(
1496            &mut permissions,
1497            &format!("stage.{}.transcript", stage.node_id),
1498            &stage.transcript,
1499        );
1500        if let Some(worker) = stage.metadata.get("worker") {
1501            if let Some(policy) = worker
1502                .get("audit")
1503                .and_then(|audit| audit.get("approval_policy"))
1504            {
1505                permissions.push(BundlePermission {
1506                    kind: "approval_policy".to_string(),
1507                    source: format!("stage.{}.worker.audit", stage.node_id),
1508                    request_id: None,
1509                    agent: worker
1510                        .get("name")
1511                        .and_then(JsonValue::as_str)
1512                        .map(str::to_string),
1513                    payload: policy.clone(),
1514                });
1515            }
1516        }
1517    }
1518    permissions
1519}
1520
1521fn permission_from_hitl_question(question: &RunHitlQuestionRecord) -> BundlePermission {
1522    BundlePermission {
1523        kind: "hitl_question".to_string(),
1524        source: "run.hitl_questions".to_string(),
1525        request_id: Some(question.request_id.clone()),
1526        agent: if question.agent.is_empty() {
1527            None
1528        } else {
1529            Some(question.agent.clone())
1530        },
1531        payload: serde_json::to_value(question).unwrap_or(JsonValue::Null),
1532    }
1533}
1534
1535fn collect_permission_events(
1536    permissions: &mut Vec<BundlePermission>,
1537    source: &str,
1538    transcript: &Option<JsonValue>,
1539) {
1540    let Some(transcript) = transcript else {
1541        return;
1542    };
1543    for event in transcript
1544        .get("events")
1545        .and_then(JsonValue::as_array)
1546        .into_iter()
1547        .flatten()
1548    {
1549        let kind = event
1550            .get("type")
1551            .or_else(|| event.get("kind"))
1552            .and_then(JsonValue::as_str)
1553            .unwrap_or_default();
1554        if kind.contains("permission") || kind.contains("approval") || kind.starts_with("hitl_") {
1555            permissions.push(BundlePermission {
1556                kind: kind.to_string(),
1557                source: source.to_string(),
1558                request_id: event
1559                    .get("request_id")
1560                    .or_else(|| event.get("id"))
1561                    .and_then(JsonValue::as_str)
1562                    .map(str::to_string),
1563                agent: event
1564                    .get("agent")
1565                    .and_then(JsonValue::as_str)
1566                    .map(str::to_string),
1567                payload: event.clone(),
1568            });
1569        }
1570    }
1571}
1572
1573fn event_log_pointers_from_run(run: &RunRecord) -> Vec<BundleEventLogPointer> {
1574    let mut pointers = Vec::new();
1575    if let Some(observability) = &run.observability {
1576        for pointer in &observability.transcript_pointers {
1577            pointers.push(BundleEventLogPointer {
1578                kind: pointer.kind.clone(),
1579                topic: None,
1580                path: pointer.path.clone(),
1581                location: pointer.location.clone(),
1582                available: pointer.available,
1583            });
1584        }
1585        for worker in &observability.worker_lineage {
1586            if let Some(session_id) = &worker.session_id {
1587                pointers.push(BundleEventLogPointer {
1588                    kind: "agent_events".to_string(),
1589                    topic: Some(format!("observability.agent_events.{session_id}")),
1590                    path: worker.snapshot_path.clone(),
1591                    location: format!("worker.{}.session", worker.worker_id),
1592                    available: worker.snapshot_path.is_some(),
1593                });
1594            }
1595        }
1596    }
1597    pointers
1598}
1599
1600fn worker_snapshots_from_run(run: &RunRecord) -> Vec<BundleWorkerSnapshot> {
1601    let mut snapshots = Vec::new();
1602    let mut seen_paths = BTreeSet::new();
1603    for child in &run.child_runs {
1604        let Some(path) = child.snapshot_path.as_deref() else {
1605            continue;
1606        };
1607        if !seen_paths.insert(path.to_string()) {
1608            continue;
1609        }
1610        if let Some(snapshot) = worker_snapshot_from_path(
1611            &child.worker_id,
1612            &child.worker_name,
1613            &child.status,
1614            Path::new(path),
1615        ) {
1616            snapshots.push(snapshot);
1617        }
1618    }
1619    if let Some(observability) = run.observability.as_ref() {
1620        for worker in &observability.worker_lineage {
1621            let Some(path) = worker.snapshot_path.as_deref() else {
1622                continue;
1623            };
1624            if !seen_paths.insert(path.to_string()) {
1625                continue;
1626            }
1627            if let Some(snapshot) = worker_snapshot_from_path(
1628                &worker.worker_id,
1629                &worker.worker_name,
1630                &worker.status,
1631                Path::new(path),
1632            ) {
1633                snapshots.push(snapshot);
1634            }
1635        }
1636    }
1637    snapshots
1638}
1639
1640fn worker_snapshot_from_path(
1641    worker_id: &str,
1642    worker_name: &str,
1643    status: &str,
1644    path: &Path,
1645) -> Option<BundleWorkerSnapshot> {
1646    let content = fs::read_to_string(path).ok()?;
1647    let value = serde_json::from_str::<JsonValue>(&content).ok()?;
1648    Some(BundleWorkerSnapshot {
1649        worker_id: if worker_id.is_empty() {
1650            value
1651                .get("id")
1652                .and_then(JsonValue::as_str)
1653                .unwrap_or_default()
1654                .to_string()
1655        } else {
1656            worker_id.to_string()
1657        },
1658        worker_name: if worker_name.is_empty() {
1659            value
1660                .get("name")
1661                .and_then(JsonValue::as_str)
1662                .unwrap_or("worker")
1663                .to_string()
1664        } else {
1665            worker_name.to_string()
1666        },
1667        status: if status.is_empty() {
1668            value
1669                .get("status")
1670                .and_then(JsonValue::as_str)
1671                .unwrap_or_default()
1672                .to_string()
1673        } else {
1674            status.to_string()
1675        },
1676        snapshot_ref: value
1677            .get("suspension")
1678            .and_then(|value| value.get("snapshot_ref"))
1679            .and_then(JsonValue::as_str)
1680            .or_else(|| value.get("snapshot_path").and_then(JsonValue::as_str))
1681            .unwrap_or_else(|| path.to_str().unwrap_or_default())
1682            .to_string(),
1683        source_path: Some(path.to_string_lossy().into_owned()),
1684        value,
1685    })
1686}
1687
1688fn worker_snapshot_file_name(worker_id: &str, index: usize) -> String {
1689    let component = sanitize_topic_component(worker_id);
1690    let component = if component.is_empty() {
1691        format!("worker_{index}")
1692    } else {
1693        component
1694    };
1695    format!("{component}.json")
1696}
1697
1698fn worker_snapshot_value_for_import(value: &JsonValue, path: &Path) -> JsonValue {
1699    let mut value = value.clone();
1700    let path = path.to_string_lossy().into_owned();
1701    if let JsonValue::Object(map) = &mut value {
1702        map.insert("snapshot_path".to_string(), JsonValue::String(path.clone()));
1703        if let Some(JsonValue::Object(suspension)) = map.get_mut("suspension") {
1704            suspension.insert("snapshot_ref".to_string(), JsonValue::String(path));
1705        }
1706    }
1707    value
1708}
1709
1710fn deterministic_events_from_run(
1711    run: &RunRecord,
1712) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
1713    let mut events = Vec::new();
1714    for (index, transition) in run.transitions.iter().enumerate() {
1715        events.push(BundleJsonEntry {
1716            source: "run.transitions".to_string(),
1717            index,
1718            value: serde_json::to_value(transition)
1719                .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1720        });
1721    }
1722    for (index, checkpoint) in run.checkpoints.iter().enumerate() {
1723        events.push(BundleJsonEntry {
1724            source: "run.checkpoints".to_string(),
1725            index,
1726            value: serde_json::to_value(checkpoint)
1727                .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1728        });
1729    }
1730    Ok(events)
1731}
1732
1733fn attachments_from_run(run: &RunRecord) -> Vec<BundleAttachment> {
1734    run.artifacts
1735        .iter()
1736        .map(|artifact| BundleAttachment {
1737            id: artifact.id.clone(),
1738            kind: artifact.kind.clone(),
1739            title: artifact.title.clone(),
1740            stage: artifact.stage.clone(),
1741            text: artifact.text.clone(),
1742            data: artifact.data.clone(),
1743            metadata: artifact.metadata.clone(),
1744        })
1745        .collect()
1746}
1747
1748fn redact_json_with_manifest(
1749    value: &mut JsonValue,
1750    path: &str,
1751    policy: &RedactionPolicy,
1752    entries: &mut Vec<RedactionEntry>,
1753) {
1754    match value {
1755        JsonValue::Object(map) => {
1756            let keys = map.keys().cloned().collect::<Vec<_>>();
1757            for key in keys {
1758                let child_path = json_path_child(path, &key);
1759                if policy.field_is_sensitive(&key) {
1760                    map.insert(key, JsonValue::String(REDACTED_PLACEHOLDER.to_string()));
1761                    entries.push(RedactionEntry {
1762                        path: child_path,
1763                        class: "sensitive_field".to_string(),
1764                        action: "replaced".to_string(),
1765                        replacement: Some(REDACTED_PLACEHOLDER.to_string()),
1766                    });
1767                } else if let Some(child) = map.get_mut(&key) {
1768                    redact_json_with_manifest(child, &child_path, policy, entries);
1769                }
1770            }
1771        }
1772        JsonValue::Array(items) => {
1773            for (index, item) in items.iter_mut().enumerate() {
1774                redact_json_with_manifest(item, &format!("{path}[{index}]"), policy, entries);
1775            }
1776        }
1777        JsonValue::String(text) => {
1778            let redacted = policy.redact_string(text);
1779            if let Cow::Owned(replacement) = redacted {
1780                // Record the actual replacement string (now a named
1781                // `<redacted:<pattern>:<len>>` placeholder from the
1782                // OA-06 catalog) in the manifest so audit consumers
1783                // can attribute the leak to a specific provider.
1784                let manifest_replacement = replacement.clone();
1785                *text = replacement;
1786                entries.push(RedactionEntry {
1787                    path: path.to_string(),
1788                    class: "secret_pattern_or_url".to_string(),
1789                    action: "replaced".to_string(),
1790                    replacement: Some(manifest_replacement),
1791                });
1792            }
1793        }
1794        _ => {}
1795    }
1796}
1797
1798fn redact_bundle_pointer_paths_json(
1799    value: &mut JsonValue,
1800    path: &str,
1801    entries: &mut Vec<RedactionEntry>,
1802) {
1803    match value {
1804        JsonValue::Object(map) => {
1805            let keys = map.keys().cloned().collect::<Vec<_>>();
1806            for key in keys {
1807                let child_path = json_path_child(path, &key);
1808                if key == "path" && bundle_pointer_path_should_redact(&child_path) {
1809                    if !map.get(&key).is_some_and(JsonValue::is_null) {
1810                        map.insert(key, JsonValue::String(REDACTED_PLACEHOLDER.to_string()));
1811                        entries.push(RedactionEntry {
1812                            path: child_path,
1813                            class: "local_pointer_path".to_string(),
1814                            action: "replaced".to_string(),
1815                            replacement: Some(REDACTED_PLACEHOLDER.to_string()),
1816                        });
1817                    }
1818                } else if let Some(child) = map.get_mut(&key) {
1819                    redact_bundle_pointer_paths_json(child, &child_path, entries);
1820                }
1821            }
1822        }
1823        JsonValue::Array(items) => {
1824            for (index, item) in items.iter_mut().enumerate() {
1825                redact_bundle_pointer_paths_json(item, &format!("{path}[{index}]"), entries);
1826            }
1827        }
1828        _ => {}
1829    }
1830}
1831
/// Reports whether a JSON path points inside an `event_log_pointers` or
/// `transcript_pointers` array (it contains `.<name>[` for one of them).
fn bundle_pointer_path_should_redact(path: &str) -> bool {
    const POINTER_ARRAY_MARKERS: [&str; 2] = [".event_log_pointers[", ".transcript_pointers["];
    POINTER_ARRAY_MARKERS
        .iter()
        .any(|marker| path.contains(*marker))
}
1835
1836fn withhold_replay_only_json(value: &mut JsonValue, path: &str, entries: &mut Vec<RedactionEntry>) {
1837    match value {
1838        JsonValue::Object(map) => {
1839            let keys = map.keys().cloned().collect::<Vec<_>>();
1840            for key in keys {
1841                let child_path = json_path_child(path, &key);
1842                if replay_only_field_is_prompt_payload(&key) {
1843                    if !map.get(&key).is_some_and(JsonValue::is_null) {
1844                        map.insert(key, JsonValue::String(REPLAY_ONLY_PLACEHOLDER.to_string()));
1845                        entries.push(RedactionEntry {
1846                            path: child_path,
1847                            class: "prompt_or_tool_payload".to_string(),
1848                            action: "withheld".to_string(),
1849                            replacement: Some(REPLAY_ONLY_PLACEHOLDER.to_string()),
1850                        });
1851                    }
1852                } else if let Some(child) = map.get_mut(&key) {
1853                    withhold_replay_only_json(child, &child_path, entries);
1854                }
1855            }
1856        }
1857        JsonValue::Array(items) => {
1858            for (index, item) in items.iter_mut().enumerate() {
1859                withhold_replay_only_json(item, &format!("{path}[{index}]"), entries);
1860            }
1861        }
1862        _ => {}
1863    }
1864}
1865
/// Reports whether a JSON object key names a prompt or tool payload field.
///
/// The comparison is exact and case-sensitive against a fixed key list.
fn replay_only_field_is_prompt_payload(key: &str) -> bool {
    const PAYLOAD_KEYS: [&str; 18] = [
        "args",
        "arguments",
        "blocks",
        "content",
        "data",
        "private_reasoning",
        "prompt",
        "raw_input",
        "raw_output",
        "result",
        "response_text",
        "summary",
        "system",
        "system_prompt",
        "task",
        "text",
        "thinking",
        "visible_text",
    ];
    PAYLOAD_KEYS.iter().any(|candidate| *candidate == key)
}
1889
1890fn set_json_path(value: &mut JsonValue, path: &[&str], replacement: JsonValue) {
1891    let Some((head, tail)) = path.split_first() else {
1892        *value = replacement;
1893        return;
1894    };
1895    if tail.is_empty() {
1896        if let JsonValue::Object(map) = value {
1897            map.insert((*head).to_string(), replacement);
1898        }
1899        return;
1900    }
1901    if let JsonValue::Object(map) = value {
1902        if let Some(child) = map.get_mut(*head) {
1903            set_json_path(child, tail, replacement);
1904        }
1905    }
1906}
1907
1908fn require_field(value: &JsonValue, field: &str) -> Result<(), SessionBundleError> {
1909    if value.get(field).is_some() {
1910        Ok(())
1911    } else {
1912        Err(SessionBundleError::MissingRequired(format!("$.{field}")))
1913    }
1914}
1915
1916fn require_nested_field(value: &JsonValue, path: &[&str]) -> Result<(), SessionBundleError> {
1917    let mut current = value;
1918    for segment in path {
1919        current = current
1920            .get(*segment)
1921            .ok_or_else(|| SessionBundleError::MissingRequired(json_path_from_segments(path)))?;
1922    }
1923    Ok(())
1924}
1925
1926fn json_path_from_segments(path: &[&str]) -> String {
1927    path.iter().fold("$".to_string(), |parent, segment| {
1928        json_path_child(&parent, segment)
1929    })
1930}
1931
1932fn reject_unredacted_secret_markers(
1933    value: &JsonValue,
1934    path: &str,
1935    policy: &RedactionPolicy,
1936) -> Result<(), SessionBundleError> {
1937    match value {
1938        JsonValue::Object(map) => {
1939            for (key, child) in map {
1940                reject_unredacted_secret_markers(child, &json_path_child(path, key), policy)?;
1941            }
1942        }
1943        JsonValue::Array(items) => {
1944            for (index, item) in items.iter().enumerate() {
1945                reject_unredacted_secret_markers(item, &format!("{path}[{index}]"), policy)?;
1946            }
1947        }
1948        JsonValue::String(text) => {
1949            if matches!(policy.redact_string(text), Cow::Owned(_)) {
1950                return Err(SessionBundleError::UnsafeSecretMarker {
1951                    path: path.to_string(),
1952                    excerpt: secret_excerpt(text),
1953                });
1954            }
1955        }
1956        _ => {}
1957    }
1958    Ok(())
1959}
1960
/// Returns at most the first 80 characters of `text`, followed by `...` when
/// anything was cut off.
///
/// The cut is made on a character boundary. Only the first 81 characters are
/// inspected, so the cost does not grow with the length of `text`.
///
/// NOTE(review): callers pass text that matched a secret pattern, so the
/// excerpt may itself contain secret material — confirm that wherever the
/// resulting error is surfaced is an acceptable sink.
fn secret_excerpt(text: &str) -> String {
    const MAX_EXCERPT_CHARS: usize = 80;
    // The byte offset of the 81st character, if there is one, is exactly
    // where the 80-character prefix ends.
    match text.char_indices().nth(MAX_EXCERPT_CHARS) {
        Some((cut, _)) => format!("{}...", &text[..cut]),
        None => text.to_string(),
    }
}
1969
1970fn json_path_child(parent: &str, key: &str) -> String {
1971    if key
1972        .chars()
1973        .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
1974    {
1975        format!("{parent}.{key}")
1976    } else {
1977        format!(
1978            "{parent}[{}]",
1979            serde_json::to_string(key).unwrap_or_default()
1980        )
1981    }
1982}