//! Canonical session bundle export/import support.
//!
//! A session bundle is the portable JSON envelope that downstream hosts use
//! for local exports, sanitized shares, and replay-only handoffs. The bundle
//! is intentionally built from existing Harn run-record surfaces so replay,
//! transcripts, tool calls, HITL questions, and observability do not fork into
//! a second persistence model.

9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt;
11use std::fs;
12use std::path::Path;
13
14use chrono::{DateTime, SecondsFormat, Utc};
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value as JsonValue};
17
18use crate::agent_events::AgentEvent;
19use crate::event_log::sanitize_topic_component;
20use crate::orchestration::{
21    derive_run_observability, new_id, now_rfc3339, AgentSessionReplayEvent, ReplayFixture,
22    RunCheckpointRecord, RunChildRecord, RunExecutionRecord, RunHitlQuestionRecord,
23    RunObservabilityRecord, RunRecord, RunTraceSpanRecord, RunTransitionRecord,
24    RunVerificationOutcomeRecord, RunWorkerLineageRecord, ToolCallRecord,
25};
26use crate::redact::{json_path_child, RedactionEntry, RedactionPolicy, REDACTED_PLACEHOLDER};
27use crate::workspace_anchor::{anchor_from_transcript_metadata_json, MountedRoot, WorkspaceAnchor};
28
29mod schema;
30pub use schema::{session_bundle_schema, session_bundle_schema_pretty};
31
32#[cfg(test)]
33mod tests;
34
/// Value of the `_type` discriminator every session bundle carries; bundle
/// validation rejects any other value.
pub const SESSION_BUNDLE_TYPE: &str = "harn_session_bundle";
/// Newest bundle `schema_version` this build understands. Validation rejects
/// `0` and anything greater.
pub const SESSION_BUNDLE_SCHEMA_VERSION: u32 = 1;
/// Identifier of the version-1 session bundle JSON schema.
pub const SESSION_BUNDLE_SCHEMA_ID: &str = "https://harnlang.com/schemas/session-bundle.v1.json";
/// Placeholder text for content withheld from replay-only exports
/// (presumably substituted by the replay-only withholding pass — confirm).
pub const REPLAY_ONLY_PLACEHOLDER: &str = "[withheld]";

/// `source.status` for a session rebuilt from an event stream that never
/// reached a terminal `SessionClosed` event. That is either a live loop frozen
/// mid-turn for cross-compute migration (#3682), or a time-traveled prefix cut
/// off before the close. Its next action is still pending, so consumers must
/// CONTINUE the loop rather than treat it as finished.
pub const SESSION_BUNDLE_STATUS_SUSPENDED: &str = "suspended";

/// `source.status` used when a session's terminal `SessionClosed` event carried
/// an empty status.
pub const SESSION_BUNDLE_STATUS_COMPLETED: &str = "completed";

/// `metadata` key holding the machine-readable liveness tag (`"suspended"` or
/// `"closed"`). A resume host can choose continue-vs-replay from it without
/// string-matching the human-facing `status`.
pub const SESSION_BUNDLE_LIVENESS_KEY: &str = "session_liveness";

/// Whether an agent session, reconstructed from its event log, has finished.
///
/// The verdict comes only from the event stream. A stream containing a
/// terminal `SessionClosed` event is [`Closed`]. Every other stream is
/// [`Suspended`], meaning its next action has not happened yet.
///
/// [`Closed`]: AgentSessionLiveness::Closed
/// [`Suspended`]: AgentSessionLiveness::Suspended
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AgentSessionLiveness {
    /// A `SessionClosed` event ended the stream. `status` carries that event's
    /// status, falling back to [`SESSION_BUNDLE_STATUS_COMPLETED`] when the
    /// event's status was empty.
    Closed { status: String, finished_at_ms: i64 },
    /// No terminal `SessionClosed` event: the loop is frozen mid-flight, so a
    /// consumer must resume it instead of replaying it as a finished run.
    Suspended,
}

impl AgentSessionLiveness {
    /// Human-facing `source.status`: the close status for a closed session,
    /// or [`SESSION_BUNDLE_STATUS_SUSPENDED`] for a suspended one.
    pub fn status(&self) -> &str {
        match self {
            Self::Suspended => SESSION_BUNDLE_STATUS_SUSPENDED,
            Self::Closed { status, .. } => status.as_str(),
        }
    }

    /// Machine-readable tag stored under [`SESSION_BUNDLE_LIVENESS_KEY`].
    pub fn tag(&self) -> &'static str {
        match self {
            Self::Suspended => "suspended",
            Self::Closed { .. } => "closed",
        }
    }

    /// `true` when the consumer has to CONTINUE the loop rather than treat it
    /// as finished.
    pub fn is_suspended(&self) -> bool {
        matches!(self, Self::Suspended)
    }
}
96
97/// Classify an agent session's liveness from its replay event stream by finding
98/// the last terminal `SessionClosed` event. The keystone discriminator behind
99/// portable suspend/resume (#3682): without it, a frozen-mid-flight or
100/// time-traveled session is silently mislabeled `completed` and a resume host
101/// replays it instead of continuing the pending turn.
102pub fn agent_session_liveness(events: &[AgentSessionReplayEvent]) -> AgentSessionLiveness {
103    events
104        .iter()
105        .rev()
106        .find_map(|entry| match &entry.event {
107            AgentEvent::SessionClosed { status, .. } => Some(AgentSessionLiveness::Closed {
108                status: if status.is_empty() {
109                    SESSION_BUNDLE_STATUS_COMPLETED.to_string()
110                } else {
111                    status.clone()
112                },
113                finished_at_ms: entry.occurred_at_ms,
114            }),
115            _ => None,
116        })
117        .unwrap_or(AgentSessionLiveness::Suspended)
118}
119
/// How much of a run a bundle export keeps versus redacts or withholds.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionBundleExportMode {
    /// Verbatim export: no redaction is applied and the manifest policy is
    /// recorded as `"none"`.
    Local,
    /// Applies the bundle redaction policy and redacts event-log pointer paths.
    /// This is the default mode of [`SessionBundleExportOptions`].
    Sanitized,
    /// Everything `Sanitized` does, plus replay-only withholding of content.
    ReplayOnly,
}

impl SessionBundleExportMode {
    /// Stable wire label written to `redaction.mode`.
    pub fn as_str(self) -> &'static str {
        match self {
            SessionBundleExportMode::Local => "local",
            SessionBundleExportMode::Sanitized => "sanitized",
            SessionBundleExportMode::ReplayOnly => "replay_only",
        }
    }
}
136
137#[derive(Clone, Debug)]
138pub struct SessionBundleExportOptions {
139    pub mode: SessionBundleExportMode,
140    pub include_attachments: bool,
141    pub redaction_policy: RedactionPolicy,
142}
143
144impl Default for SessionBundleExportOptions {
145    fn default() -> Self {
146        Self {
147            mode: SessionBundleExportMode::Sanitized,
148            include_attachments: false,
149            redaction_policy: RedactionPolicy::default(),
150        }
151    }
152}
153
/// Knobs for [`validate_session_bundle_value`] / [`validate_session_bundle_str`].
#[derive(Clone, Debug, Default)]
pub struct SessionBundleValidationOptions {
    /// When `true`, skip the unredacted-secret scan entirely.
    pub allow_unsafe_secret_markers: bool,
    /// Policy used to scan the bundle for unredacted secrets.
    pub redaction_policy: RedactionPolicy,
}
159
/// The portable session bundle envelope.
///
/// `#[serde(default)]` lets partially populated JSON decode. Presence of the
/// required fields is enforced separately by [`validate_session_bundle_value`].
/// Field order here is the serialized field order.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct SessionBundle {
    /// Always [`SESSION_BUNDLE_TYPE`] for a valid bundle; serialized as `_type`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Must be in `1..=SESSION_BUNDLE_SCHEMA_VERSION` to validate.
    pub schema_version: u32,
    pub bundle_id: String,
    pub created_at: String,
    pub producer: BundleProducer,
    pub source: BundleSource,
    pub runtime: BundleRuntime,
    pub workspace: Option<BundleWorkspace>,
    pub transcript: BundleTranscript,
    pub tools: BundleTools,
    pub permissions: Vec<BundlePermission>,
    pub replay: BundleReplay,
    /// Describes what an export redacted or withheld.
    pub redaction: RedactionManifest,
    pub attachments: Vec<BundleAttachment>,
    pub metadata: BTreeMap<String, JsonValue>,
}
180
181impl Default for SessionBundle {
182    fn default() -> Self {
183        Self {
184            type_name: SESSION_BUNDLE_TYPE.to_string(),
185            schema_version: SESSION_BUNDLE_SCHEMA_VERSION,
186            bundle_id: String::new(),
187            created_at: String::new(),
188            producer: BundleProducer::default(),
189            source: BundleSource::default(),
190            runtime: BundleRuntime::default(),
191            workspace: None,
192            transcript: BundleTranscript::default(),
193            tools: BundleTools::default(),
194            permissions: Vec::new(),
195            replay: BundleReplay::default(),
196            redaction: RedactionManifest::default(),
197            attachments: Vec::new(),
198            metadata: BTreeMap::new(),
199        }
200    }
201}
202
/// Identifies the software that produced a bundle.
///
/// All three fields are required by [`validate_session_bundle_value`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleProducer {
    pub name: String,
    pub version: String,
    /// Schema identifier the producer wrote against. Presumably
    /// [`SESSION_BUNDLE_SCHEMA_ID`]; confirm in `raw_bundle_from_run`.
    pub schema_id: String,
}
210
/// Provenance of the run a bundle was exported from.
///
/// `kind`, `run_record_id`, `workflow_id`, `task`, and `status` are required by
/// [`validate_session_bundle_value`]. When a bundle has no embedded run record,
/// [`import_run_record_value`] rebuilds one from the id/workflow/task/status
/// and timestamp fields here.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleSource {
    pub kind: String,
    pub run_record_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    /// Run status. For sessions reconstructed from an event log this may be
    /// [`SESSION_BUNDLE_STATUS_SUSPENDED`] or a close status.
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub persisted_path: Option<String>,
    pub root_run_id: Option<String>,
    pub parent_run_id: Option<String>,
    pub child_run_count: usize,
}
227
/// Runtime environment the run executed under.
///
/// `harn_version` and `provider_models` are required by validation. `usage`
/// is copied into run records synthesized on import. Not `Eq`, because
/// [`BundleUsage`] carries an `f64`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleRuntime {
    pub harn_version: String,
    pub provider_models: Vec<String>,
    pub usage: Option<BundleUsage>,
    pub metadata: BTreeMap<String, JsonValue>,
}
236
/// Aggregate token/call usage totals for the run.
///
/// Only `PartialEq` (not `Eq`) because `total_cost` is an `f64`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleUsage {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub call_count: i64,
    pub total_duration_ms: i64,
    pub total_cost: f64,
    pub models: Vec<String>,
}
247
248#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
249#[serde(default)]
250pub struct BundleWorkspace {
251    /// Primary workspace path the session is anchored against.
252    pub primary: Option<String>,
253    /// Additional roots mounted alongside the primary anchor.
254    #[serde(default)]
255    pub additional_roots: Vec<BundleMountedRoot>,
256    /// RFC3339 timestamp when the anchor was set.
257    pub anchored_at: Option<String>,
258    /// Bundle workspace policy label. Currently always
259    /// `"safe_identity_only"`; future revisions may tie this to mount
260    /// modes once the PathScope matcher (#2216) lands.
261    pub policy: String,
262}
263
/// One additional workspace root, captured from a [`MountedRoot`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleMountedRoot {
    /// Root path, lossily converted to UTF-8.
    pub path: String,
    /// The mount mode's string label.
    pub mount_mode: String,
    pub mounted_at: String,
}
271
272impl From<&MountedRoot> for BundleMountedRoot {
273    fn from(root: &MountedRoot) -> Self {
274        Self {
275            path: root.path.to_string_lossy().into_owned(),
276            mount_mode: root.mount_mode.as_str().to_string(),
277            mounted_at: root.mounted_at.clone(),
278        }
279    }
280}
281
282impl From<&WorkspaceAnchor> for BundleWorkspace {
283    fn from(anchor: &WorkspaceAnchor) -> Self {
284        Self {
285            primary: Some(anchor.primary.to_string_lossy().into_owned()),
286            additional_roots: anchor.additional_roots.iter().map(Into::into).collect(),
287            anchored_at: Some(anchor.anchored_at.clone()),
288            policy: "safe_identity_only".to_string(),
289        }
290    }
291}
292
/// Transcript sections carried by the bundle.
///
/// When a run record is synthesized on import, only the first section is used.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscript {
    pub sections: Vec<BundleTranscriptSection>,
}
298
/// One transcript section.
///
/// `messages`, `events`, `assets`, `summary`, and `metadata` become the
/// transcript of a run record synthesized on import. The `id`, `label`,
/// `scope`, and `location` fields are not used by that import path.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscriptSection {
    pub id: String,
    pub label: String,
    pub scope: String,
    pub location: String,
    pub summary: Option<String>,
    pub messages: Vec<JsonValue>,
    pub events: Vec<JsonValue>,
    pub assets: Vec<JsonValue>,
    pub metadata: BTreeMap<String, JsonValue>,
}
312
/// Tool schemas and recorded tool calls (both lists required by validation).
///
/// `calls` become `tool_recordings` of a run record synthesized on import.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTools {
    pub schemas: Vec<BundleJsonEntry>,
    pub calls: Vec<BundleToolCall>,
}
319
/// An opaque JSON value tagged with its origin.
///
/// Used for tool schemas and deterministic replay events. `index` is presumably
/// the value's position within `source`; confirm in `raw_bundle_from_run`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleJsonEntry {
    pub source: String,
    pub index: usize,
    pub value: JsonValue,
}
327
/// Portable copy of a [`ToolCallRecord`], field for field (see its `From` impl).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleToolCall {
    pub tool_name: String,
    pub tool_use_id: String,
    pub args_hash: String,
    pub result: String,
    pub is_rejected: bool,
    pub duration_ms: u64,
    pub iteration: usize,
    pub timestamp: String,
}
340
341impl From<&ToolCallRecord> for BundleToolCall {
342    fn from(record: &ToolCallRecord) -> Self {
343        Self {
344            tool_name: record.tool_name.clone(),
345            tool_use_id: record.tool_use_id.clone(),
346            args_hash: record.args_hash.clone(),
347            result: record.result.clone(),
348            is_rejected: record.is_rejected,
349            duration_ms: record.duration_ms,
350            iteration: record.iteration,
351            timestamp: record.timestamp.clone(),
352        }
353    }
354}
355
/// A permission-related record attached to the session.
///
/// When a run record is synthesized on import, the `payload`s of entries with
/// `kind == "hitl_question"` become its `hitl_questions`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundlePermission {
    pub kind: String,
    pub source: String,
    pub request_id: Option<String>,
    pub agent: Option<String>,
    pub payload: JsonValue,
}
365
// Not `Eq`: embeds `RunTraceSpanRecord`, which carries a float `cost_usd`
// and is therefore only `PartialEq`.
/// Replay material for the session.
///
/// On import, `run_record` is used verbatim when present. Otherwise
/// `replay_fixture` plus the transitions, checkpoints, and trace spans here
/// are used to synthesize a run record. `observability` and `worker_snapshots`
/// are left out of serialized output when `None` or empty.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleReplay {
    pub replay_fixture: Option<ReplayFixture>,
    pub run_record: Option<JsonValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observability: Option<RunObservabilityRecord>,
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub worker_snapshots: Vec<BundleWorkerSnapshot>,
    pub event_log_pointers: Vec<BundleEventLogPointer>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub deterministic_events: Vec<BundleJsonEntry>,
}
384
/// A worker snapshot embedded in the bundle for replay/resume.
///
/// `value` presumably holds the raw snapshot JSON, and `source_path` the file
/// it was read from. Confirm both in `raw_bundle_from_run`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleWorkerSnapshot {
    pub worker_id: String,
    pub worker_name: String,
    pub status: String,
    pub snapshot_ref: String,
    pub source_path: Option<String>,
    pub value: JsonValue,
}
395
/// Maps a worker id to the path its snapshot was materialized at. Presumably
/// this is a file written during import; confirm with the caller.
///
/// Consumed by [`import_run_record_value_with_materialized_worker_snapshots`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MaterializedWorkerSnapshot {
    pub worker_id: String,
    pub path: String,
}
402
/// Reference to an event log associated with the run.
///
/// Non-local exports run `redact_bundle_pointer_paths_json` over the bundle.
/// That presumably redacts local `path`s here; confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleEventLogPointer {
    pub kind: String,
    pub topic: Option<String>,
    pub path: Option<String>,
    pub location: String,
    pub available: bool,
}
412
/// Record of what an export redacted or withheld.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RedactionManifest {
    /// [`SessionBundleExportMode::as_str`] of the export mode.
    pub mode: String,
    /// `"none"` for local exports; otherwise a label naming the redaction policy.
    pub policy: String,
    /// Placeholder text substituted for redacted values.
    pub placeholder: String,
    /// One entry per redaction or withholding performed.
    pub entries: Vec<RedactionEntry>,
    /// `true` for every export mode except `local`.
    pub unsafe_secret_markers_rejected: bool,
}
422
/// An attachment carried alongside the session.
///
/// Presumably only populated when the export sets `include_attachments`;
/// confirm in `raw_bundle_from_run`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleAttachment {
    pub id: String,
    pub kind: String,
    pub title: Option<String>,
    pub stage: Option<String>,
    pub text: Option<String>,
    pub data: Option<JsonValue>,
    pub metadata: BTreeMap<String, JsonValue>,
}
434
/// Errors raised while exporting, validating, or importing session bundles.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionBundleError {
    /// Input could not be read, parsed, or decoded into the expected shape.
    Decode(String),
    /// A value could not be serialized to JSON.
    Encode(String),
    /// A required field (named by the payload) is absent.
    MissingRequired(String),
    /// `schema_version` is `0` or newer than this build supports.
    UnsupportedSchemaVersion { found: u64, supported: u32 },
    /// The field at `path` is not of the `expected` type or value.
    InvalidType { path: String, expected: String },
    /// A worker snapshot is not in the `suspended` state required to checkpoint it.
    UnsupportedCheckpointState { status: String },
    /// Validation found an unredacted secret at `path`.
    UnsafeSecretMarker { path: String, excerpt: String },
    /// The bundle carries neither a run record nor a replay fixture.
    MissingRunRecord,
    /// An event log has no replayable events for `session_id`.
    MissingSessionEvents { session_id: String },
}

impl fmt::Display for SessionBundleError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use SessionBundleError as E;
        match self {
            E::Decode(detail) => write!(f, "failed to decode session bundle: {detail}"),
            E::Encode(detail) => write!(f, "failed to encode session bundle: {detail}"),
            E::MissingRequired(field) => {
                write!(f, "session bundle is missing required field {field}")
            }
            E::UnsupportedSchemaVersion { found, supported } => write!(
                f,
                "unsupported session bundle schema_version {found}; this build supports <= {supported}"
            ),
            E::InvalidType { path, expected } => {
                write!(f, "session bundle field {path} must be {expected}")
            }
            E::UnsupportedCheckpointState { status } => write!(
                f,
                "worker snapshot status {status:?} is not checkpointable; suspend the worker at a turn boundary first"
            ),
            E::UnsafeSecretMarker { path, excerpt } => write!(
                f,
                "session bundle contains an unsafe unredacted secret marker at {path}: {excerpt}"
            ),
            E::MissingRunRecord => {
                f.write_str("session bundle does not include an importable run record")
            }
            E::MissingSessionEvents { session_id } => write!(
                f,
                "event log does not contain replayable events for session_id {session_id:?}"
            ),
        }
    }
}

impl std::error::Error for SessionBundleError {}
481
482pub fn export_run_record_bundle(
483    run: &RunRecord,
484    options: &SessionBundleExportOptions,
485) -> Result<SessionBundle, SessionBundleError> {
486    let run_record_value =
487        serde_json::to_value(run).map_err(|error| SessionBundleError::Encode(error.to_string()))?;
488    let mut bundle = raw_bundle_from_run(run, run_record_value, options.include_attachments)?;
489    let mut bundle_value = serde_json::to_value(&bundle)
490        .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
491
492    let mut manifest = RedactionManifest {
493        mode: options.mode.as_str().to_string(),
494        policy: if matches!(options.mode, SessionBundleExportMode::Local) {
495            "none".to_string()
496        } else {
497            "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths".to_string()
498        },
499        placeholder: REDACTED_PLACEHOLDER.to_string(),
500        entries: Vec::new(),
501        unsafe_secret_markers_rejected: !matches!(options.mode, SessionBundleExportMode::Local),
502    };
503
504    if !matches!(options.mode, SessionBundleExportMode::Local) {
505        let redaction_policy = bundle_redaction_policy(&options.redaction_policy);
506        manifest
507            .entries
508            .extend(redaction_policy.redact_json_manifest(&mut bundle_value));
509        redact_bundle_pointer_paths_json(&mut bundle_value, "$", &mut manifest.entries);
510    }
511    if matches!(options.mode, SessionBundleExportMode::ReplayOnly) {
512        withhold_replay_only_json(&mut bundle_value, "$", &mut manifest.entries);
513    }
514    set_json_path(
515        &mut bundle_value,
516        &["redaction"],
517        serde_json::to_value(&manifest)
518            .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
519    );
520    bundle = serde_json::from_value(bundle_value)
521        .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
522    Ok(bundle)
523}
524
525pub fn export_worker_snapshot_bundle(
526    snapshot_path: &Path,
527    options: &SessionBundleExportOptions,
528) -> Result<SessionBundle, SessionBundleError> {
529    let run = run_record_from_worker_snapshot(snapshot_path)?;
530    export_run_record_bundle(&run, options)
531}
532
533pub fn run_record_from_worker_snapshot(
534    snapshot_path: &Path,
535) -> Result<RunRecord, SessionBundleError> {
536    let content = fs::read_to_string(snapshot_path).map_err(|error| {
537        SessionBundleError::Decode(format!(
538            "failed to read worker snapshot {}: {error}",
539            snapshot_path.display()
540        ))
541    })?;
542    let value: JsonValue = serde_json::from_str(&content)
543        .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
544    run_record_from_worker_snapshot_value(snapshot_path, value)
545}
546
547pub fn validate_session_bundle_value(
548    value: &JsonValue,
549    options: &SessionBundleValidationOptions,
550) -> Result<SessionBundle, SessionBundleError> {
551    require_field(value, "_type")?;
552    require_field(value, "schema_version")?;
553    require_field(value, "bundle_id")?;
554    require_field(value, "created_at")?;
555    require_field(value, "producer")?;
556    require_field(value, "source")?;
557    require_field(value, "runtime")?;
558    require_field(value, "transcript")?;
559    require_field(value, "tools")?;
560    require_field(value, "permissions")?;
561    require_field(value, "replay")?;
562    require_field(value, "redaction")?;
563    require_field(value, "attachments")?;
564    require_nested_field(value, &["producer", "name"])?;
565    require_nested_field(value, &["producer", "version"])?;
566    require_nested_field(value, &["producer", "schema_id"])?;
567    require_nested_field(value, &["source", "kind"])?;
568    require_nested_field(value, &["source", "run_record_id"])?;
569    require_nested_field(value, &["source", "workflow_id"])?;
570    require_nested_field(value, &["source", "task"])?;
571    require_nested_field(value, &["source", "status"])?;
572    require_nested_field(value, &["runtime", "harn_version"])?;
573    require_nested_field(value, &["runtime", "provider_models"])?;
574    require_nested_field(value, &["transcript", "sections"])?;
575    require_nested_field(value, &["tools", "schemas"])?;
576    require_nested_field(value, &["tools", "calls"])?;
577    require_nested_field(value, &["replay", "event_log_pointers"])?;
578    require_nested_field(value, &["replay", "transitions"])?;
579    require_nested_field(value, &["replay", "checkpoints"])?;
580    require_nested_field(value, &["replay", "trace_spans"])?;
581    require_nested_field(value, &["replay", "deterministic_events"])?;
582    require_nested_field(value, &["redaction", "mode"])?;
583    require_nested_field(value, &["redaction", "policy"])?;
584    require_nested_field(value, &["redaction", "placeholder"])?;
585    require_nested_field(value, &["redaction", "entries"])?;
586    require_nested_field(value, &["redaction", "unsafe_secret_markers_rejected"])?;
587
588    let type_name = value
589        .get("_type")
590        .and_then(JsonValue::as_str)
591        .ok_or_else(|| SessionBundleError::InvalidType {
592            path: "$._type".to_string(),
593            expected: "string".to_string(),
594        })?;
595    if type_name != SESSION_BUNDLE_TYPE {
596        return Err(SessionBundleError::InvalidType {
597            path: "$._type".to_string(),
598            expected: format!("\"{SESSION_BUNDLE_TYPE}\""),
599        });
600    }
601
602    let version = value
603        .get("schema_version")
604        .and_then(JsonValue::as_u64)
605        .ok_or_else(|| SessionBundleError::InvalidType {
606            path: "$.schema_version".to_string(),
607            expected: "positive integer".to_string(),
608        })?;
609    if version == 0 || version > u64::from(SESSION_BUNDLE_SCHEMA_VERSION) {
610        return Err(SessionBundleError::UnsupportedSchemaVersion {
611            found: version,
612            supported: SESSION_BUNDLE_SCHEMA_VERSION,
613        });
614    }
615
616    if !options.allow_unsafe_secret_markers {
617        if let Some(found) = options.redaction_policy.find_unredacted_secret(value) {
618            return Err(SessionBundleError::UnsafeSecretMarker {
619                path: found.path,
620                excerpt: found.excerpt,
621            });
622        }
623    }
624
625    serde_json::from_value::<SessionBundle>(value.clone())
626        .map_err(|error| SessionBundleError::Decode(error.to_string()))
627}
628
629pub fn validate_session_bundle_str(
630    content: &str,
631    options: &SessionBundleValidationOptions,
632) -> Result<SessionBundle, SessionBundleError> {
633    let value: JsonValue = serde_json::from_str(content)
634        .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
635    validate_session_bundle_value(&value, options)
636}
637
638pub fn import_run_record_value(bundle: &SessionBundle) -> Result<JsonValue, SessionBundleError> {
639    let replay_observability = replay_observability_for_import(&bundle.replay);
640    if let Some(mut run_record) = bundle.replay.run_record.clone() {
641        let should_fill_observability = match run_record.get("observability") {
642            Some(value) => value.is_null(),
643            None => true,
644        };
645        if should_fill_observability {
646            if let (JsonValue::Object(map), Some(observability)) =
647                (&mut run_record, replay_observability.as_ref())
648            {
649                map.insert(
650                    "observability".to_string(),
651                    serde_json::to_value(observability)
652                        .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
653                );
654            }
655        }
656        return Ok(run_record);
657    }
658    if let Some(fixture) = &bundle.replay.replay_fixture {
659        let transcript = bundle.transcript.sections.first().map(|section| {
660            json!({
661                "_type": "transcript",
662                "messages": section.messages.clone(),
663                "events": section.events.clone(),
664                "assets": section.assets.clone(),
665                "summary": section.summary.clone(),
666                "metadata": section.metadata.clone(),
667            })
668        });
669        let hitl_questions = bundle
670            .permissions
671            .iter()
672            .filter(|permission| permission.kind == "hitl_question")
673            .map(|permission| permission.payload.clone())
674            .collect::<Vec<_>>();
675        return Ok(json!({
676            "_type": "run_record",
677            "id": bundle.source.run_record_id.clone(),
678            "workflow_id": bundle.source.workflow_id.clone(),
679            "workflow_name": bundle.source.workflow_name.clone(),
680            "task": bundle.source.task.clone(),
681            "status": bundle.source.status.clone(),
682            "started_at": bundle.source.started_at.clone(),
683            "finished_at": bundle.source.finished_at.clone(),
684            "stages": [],
685            "transitions": bundle.replay.transitions.clone(),
686            "checkpoints": bundle.replay.checkpoints.clone(),
687            "pending_nodes": [],
688            "completed_nodes": [],
689            "child_runs": [],
690            "artifacts": [],
691            "handoffs": [],
692            "policy": {},
693            "transcript": transcript,
694            "usage": bundle.runtime.usage.clone(),
695            "replay_fixture": fixture,
696            "observability": replay_observability,
697            "trace_spans": bundle.replay.trace_spans.clone(),
698            "tool_recordings": bundle.tools.calls.clone(),
699            "hitl_questions": hitl_questions,
700            "persona_runtime": [],
701            "metadata": {
702                "imported_from_session_bundle": bundle.bundle_id.clone(),
703                "session_bundle_schema_version": bundle.schema_version,
704                "worker_snapshot_count": bundle.replay.worker_snapshots.len(),
705            }
706        }));
707    }
708    Err(SessionBundleError::MissingRunRecord)
709}
710
711pub fn import_run_record_value_with_materialized_worker_snapshots(
712    bundle: &SessionBundle,
713    materialized: &[MaterializedWorkerSnapshot],
714) -> Result<JsonValue, SessionBundleError> {
715    let mut run_record = import_run_record_value(bundle)?;
716    apply_materialized_worker_snapshot_paths(&mut run_record, materialized);
717    Ok(run_record)
718}
719
720fn run_record_from_worker_snapshot_value(
721    snapshot_path: &Path,
722    value: JsonValue,
723) -> Result<RunRecord, SessionBundleError> {
724    require_worker_snapshot_marker(&value)?;
725    let status =
726        snapshot_string(&value, "status").ok_or_else(|| missing_worker_snapshot_field("status"))?;
727    if status != "suspended" {
728        return Err(SessionBundleError::UnsupportedCheckpointState { status });
729    }
730    require_worker_snapshot_object_field(&value, "config")?;
731    require_worker_snapshot_object_field(&value, "suspension")?;
732
733    let snapshot_path_string = snapshot_path.to_string_lossy().into_owned();
734    let worker_id =
735        snapshot_string(&value, "id").ok_or_else(|| missing_worker_snapshot_field("id"))?;
736    let worker_name = snapshot_string(&value, "name").unwrap_or_else(|| "worker".to_string());
737    let task = snapshot_string(&value, "task").unwrap_or_else(|| "Suspended worker".to_string());
738    let suspended_at = snapshot_pointer_string(&value, &["suspension", "suspended_at"]);
739    let started_at = snapshot_string(&value, "started_at")
740        .or_else(|| snapshot_string(&value, "created_at"))
741        .or_else(|| suspended_at.clone())
742        .unwrap_or_else(now_rfc3339);
743    let finished_at = snapshot_string(&value, "finished_at");
744    let session_id = snapshot_pointer_string(&value, &["config", "spec", "session_id"])
745        .or_else(|| snapshot_pointer_string(&value, &["audit", "session_id"]));
746    let parent_session_id =
747        snapshot_pointer_string(&value, &["config", "spec", "parent_session_id"])
748            .or_else(|| snapshot_pointer_string(&value, &["audit", "parent_session_id"]));
749    let child_run_id = snapshot_string(&value, "child_run_id");
750    let child_run_path = snapshot_string(&value, "child_run_path");
751    let execution = value
752        .get("execution")
753        .cloned()
754        .and_then(|value| serde_json::from_value::<RunExecutionRecord>(value).ok());
755
756    let child = RunChildRecord {
757        worker_id: worker_id.clone(),
758        worker_name: worker_name.clone(),
759        parent_stage_id: snapshot_string(&value, "parent_stage_id"),
760        session_id: session_id.clone(),
761        parent_session_id: parent_session_id.clone(),
762        mutation_scope: snapshot_pointer_string(&value, &["audit", "mutation_scope"]),
763        approval_policy: None,
764        task: task.clone(),
765        request: value.get("request").cloned(),
766        provenance: value.get("provenance").cloned(),
767        status: status.clone(),
768        started_at: started_at.clone(),
769        finished_at: finished_at.clone(),
770        run_id: child_run_id.clone(),
771        run_path: child_run_path.clone(),
772        snapshot_path: Some(snapshot_path_string.clone()),
773        execution,
774    };
775    let lineage = RunWorkerLineageRecord {
776        worker_id: worker_id.clone(),
777        worker_name,
778        parent_stage_id: child.parent_stage_id.clone(),
779        task: task.clone(),
780        status: status.clone(),
781        session_id,
782        parent_session_id,
783        run_id: child_run_id,
784        run_path: child_run_path,
785        snapshot_path: Some(snapshot_path_string.clone()),
786    };
787
788    let run_id = format!("checkpoint_{}", sanitize_topic_component(&worker_id));
789    let workflow_id = "worker_snapshot_checkpoint".to_string();
790    let workflow_name = Some("Worker snapshot checkpoint".to_string());
791    let checkpoint_id = format!("{run_id}_turn_boundary");
792    let checkpointed_at = suspended_at
793        .or_else(|| finished_at.clone())
794        .unwrap_or_else(|| started_at.clone());
795
796    Ok(RunRecord {
797        type_name: "run_record".to_string(),
798        id: run_id.clone(),
799        workflow_id: workflow_id.clone(),
800        workflow_name: workflow_name.clone(),
801        task,
802        status,
803        started_at,
804        finished_at,
805        checkpoints: vec![RunCheckpointRecord {
806            id: checkpoint_id,
807            ready_nodes: vec!["worker_snapshot_resume".to_string()],
808            completed_nodes: Vec::new(),
809            last_stage_id: None,
810            persisted_at: checkpointed_at.clone(),
811            reason: "suspended_worker_snapshot_turn_boundary".to_string(),
812        }],
813        child_runs: vec![child],
814        transcript: value.get("transcript").cloned(),
815        replay_fixture: Some(ReplayFixture {
816            type_name: "replay_fixture".to_string(),
817            id: format!("fixture_{run_id}"),
818            source_run_id: run_id,
819            workflow_id,
820            workflow_name,
821            created_at: checkpointed_at,
822            eval_kind: Some("worker_snapshot_checkpoint".to_string()),
823            expected_status: "suspended".to_string(),
824            ..ReplayFixture::default()
825        }),
826        observability: Some(RunObservabilityRecord {
827            schema_version: 4,
828            worker_lineage: vec![lineage],
829            ..RunObservabilityRecord::default()
830        }),
831        metadata: BTreeMap::from([
832            ("checkpoint_kind".to_string(), json!("worker_snapshot")),
833            (
834                "worker_snapshot_path".to_string(),
835                json!(snapshot_path_string),
836            ),
837        ]),
838        ..RunRecord::default()
839    })
840}
841
842fn snapshot_string(value: &JsonValue, key: &str) -> Option<String> {
843    value
844        .get(key)
845        .and_then(JsonValue::as_str)
846        .filter(|value| !value.is_empty())
847        .map(str::to_string)
848}
849
850fn missing_worker_snapshot_field(field: &str) -> SessionBundleError {
851    SessionBundleError::MissingRequired(format!("$.worker_snapshot.{field}"))
852}
853
854fn require_worker_snapshot_marker(value: &JsonValue) -> Result<(), SessionBundleError> {
855    match snapshot_string(value, "_type").as_deref() {
856        Some("worker_snapshot") => Ok(()),
857        Some(_) => Err(SessionBundleError::InvalidType {
858            path: "$.worker_snapshot._type".to_string(),
859            expected: "\"worker_snapshot\"".to_string(),
860        }),
861        None => Err(missing_worker_snapshot_field("_type")),
862    }
863}
864
865fn require_worker_snapshot_object_field(
866    value: &JsonValue,
867    field: &str,
868) -> Result<(), SessionBundleError> {
869    match value.get(field) {
870        Some(JsonValue::Object(_)) => Ok(()),
871        Some(_) => Err(SessionBundleError::InvalidType {
872            path: format!("$.worker_snapshot.{field}"),
873            expected: "object".to_string(),
874        }),
875        None => Err(missing_worker_snapshot_field(field)),
876    }
877}
878
879fn snapshot_pointer_string(value: &JsonValue, path: &[&str]) -> Option<String> {
880    let mut current = value;
881    for component in path {
882        current = current.get(*component)?;
883    }
884    current
885        .as_str()
886        .filter(|value| !value.is_empty())
887        .map(str::to_string)
888}
889
890pub fn materialize_worker_snapshots(
891    bundle: &SessionBundle,
892    out_dir: &Path,
893) -> Result<Vec<MaterializedWorkerSnapshot>, SessionBundleError> {
894    if bundle.replay.worker_snapshots.is_empty() {
895        return Ok(Vec::new());
896    }
897    fs::create_dir_all(out_dir).map_err(|error| {
898        SessionBundleError::Encode(format!(
899            "failed to create worker snapshot directory {}: {error}",
900            out_dir.display()
901        ))
902    })?;
903
904    let mut materialized = Vec::new();
905    for (index, snapshot) in bundle.replay.worker_snapshots.iter().enumerate() {
906        let worker_id = if snapshot.worker_id.trim().is_empty() {
907            format!("worker_{index}")
908        } else {
909            snapshot.worker_id.clone()
910        };
911        let path = out_dir.join(worker_snapshot_file_name(&worker_id, index));
912        let value = worker_snapshot_value_for_import(&snapshot.value, &path);
913        let rendered = serde_json::to_string_pretty(&value)
914            .map(|json| format!("{json}\n"))
915            .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
916        fs::write(&path, rendered).map_err(|error| {
917            SessionBundleError::Encode(format!(
918                "failed to write worker snapshot {}: {error}",
919                path.display()
920            ))
921        })?;
922        materialized.push(MaterializedWorkerSnapshot {
923            worker_id,
924            path: path.to_string_lossy().into_owned(),
925        });
926    }
927    Ok(materialized)
928}
929
930fn apply_materialized_worker_snapshot_paths(
931    run_record: &mut JsonValue,
932    materialized: &[MaterializedWorkerSnapshot],
933) {
934    if materialized.is_empty() {
935        return;
936    }
937
938    let paths_by_worker_id = materialized
939        .iter()
940        .filter(|snapshot| !snapshot.worker_id.is_empty())
941        .map(|snapshot| (snapshot.worker_id.as_str(), snapshot.path.as_str()))
942        .collect::<BTreeMap<_, _>>();
943    if paths_by_worker_id.is_empty() {
944        return;
945    }
946
947    rewrite_worker_snapshot_paths(run_record.get_mut("child_runs"), &paths_by_worker_id);
948    rewrite_worker_snapshot_paths(
949        run_record
950            .get_mut("observability")
951            .and_then(|observability| observability.get_mut("worker_lineage")),
952        &paths_by_worker_id,
953    );
954    rewrite_checkpoint_metadata_snapshot_path(run_record, materialized);
955}
956
957fn rewrite_worker_snapshot_paths(
958    records: Option<&mut JsonValue>,
959    paths_by_worker_id: &BTreeMap<&str, &str>,
960) {
961    let Some(records) = records.and_then(JsonValue::as_array_mut) else {
962        return;
963    };
964    for record in records {
965        let Some(worker_id) = record.get("worker_id").and_then(JsonValue::as_str) else {
966            continue;
967        };
968        let Some(path) = paths_by_worker_id.get(worker_id) else {
969            continue;
970        };
971        if let JsonValue::Object(map) = record {
972            map.insert(
973                "snapshot_path".to_string(),
974                JsonValue::String((*path).to_string()),
975            );
976        }
977    }
978}
979
980fn rewrite_checkpoint_metadata_snapshot_path(
981    run_record: &mut JsonValue,
982    materialized: &[MaterializedWorkerSnapshot],
983) {
984    let Some(snapshot) = materialized.first() else {
985        return;
986    };
987    let Some(metadata) = run_record
988        .get_mut("metadata")
989        .and_then(JsonValue::as_object_mut)
990    else {
991        return;
992    };
993    if metadata.contains_key("worker_snapshot_path") {
994        metadata.insert(
995            "worker_snapshot_path".to_string(),
996            JsonValue::String(snapshot.path.clone()),
997        );
998    }
999}
1000
1001fn replay_observability_for_import(replay: &BundleReplay) -> Option<RunObservabilityRecord> {
1002    let mut observability = replay.observability.clone().unwrap_or_default();
1003    let has_observability = replay.observability.is_some();
1004    let has_verification_outcomes = !replay.verification_outcomes.is_empty();
1005    if !has_observability && !has_verification_outcomes {
1006        return None;
1007    }
1008    if observability.schema_version == 0 {
1009        observability.schema_version = 4;
1010    }
1011    if observability.verification_outcomes.is_empty() && has_verification_outcomes {
1012        observability.verification_outcomes = replay.verification_outcomes.clone();
1013    }
1014    Some(observability)
1015}
1016
1017pub fn session_bundle_from_agent_session_events(
1018    session_id: &str,
1019    events: &[AgentSessionReplayEvent],
1020) -> Result<SessionBundle, SessionBundleError> {
1021    if events.is_empty() {
1022        return Err(SessionBundleError::MissingSessionEvents {
1023            session_id: session_id.to_string(),
1024        });
1025    }
1026
1027    let stable_id = sanitize_topic_component(session_id);
1028    let started_at = rfc3339_from_epoch_ms(events[0].occurred_at_ms);
1029    // Liveness, not a `completed` default: a stream without a terminal
1030    // `SessionClosed` event (a frozen-mid-flight loop, or a time-traveled
1031    // prefix) is `suspended`, so `finished_at` stays null and a resume host
1032    // continues the pending turn rather than replaying a "finished" run.
1033    let liveness = agent_session_liveness(events);
1034    let finished_at = match &liveness {
1035        AgentSessionLiveness::Closed { finished_at_ms, .. } => {
1036            Some(rfc3339_from_epoch_ms(*finished_at_ms))
1037        }
1038        AgentSessionLiveness::Suspended => None,
1039    };
1040    let status = liveness.status().to_string();
1041    let run_id = session_id.to_string();
1042    let workflow_id = "agent_session".to_string();
1043    let created_at = finished_at.clone().unwrap_or_else(|| started_at.clone());
1044    let transcript_events = transcript_events_from_agent_session(events)?;
1045    let transcript_messages = transcript_messages_from_agent_session(events);
1046    let mut transcript_metadata = BTreeMap::new();
1047    transcript_metadata.insert("session_id".to_string(), json!(session_id));
1048    transcript_metadata.insert(
1049        "source".to_string(),
1050        json!("events.sqlite observability.agent_events topic"),
1051    );
1052
1053    let replay_fixture = ReplayFixture {
1054        type_name: "replay_fixture".to_string(),
1055        id: format!("fixture_from_session_{stable_id}"),
1056        source_run_id: run_id.clone(),
1057        workflow_id: workflow_id.clone(),
1058        workflow_name: Some(format!("Agent session {session_id}")),
1059        created_at: created_at.clone(),
1060        eval_kind: Some("replay".to_string()),
1061        expected_status: status.clone(),
1062        ..ReplayFixture::default()
1063    };
1064
1065    Ok(SessionBundle {
1066        bundle_id: format!("bundle_from_session_{stable_id}"),
1067        created_at,
1068        producer: BundleProducer {
1069            name: "harn".to_string(),
1070            version: env!("CARGO_PKG_VERSION").to_string(),
1071            schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
1072        },
1073        source: BundleSource {
1074            kind: "event_log_session".to_string(),
1075            run_record_id: run_id,
1076            workflow_id,
1077            workflow_name: Some(format!("Agent session {session_id}")),
1078            task: task_from_agent_session(events)
1079                .unwrap_or_else(|| format!("Agent session {session_id}")),
1080            status,
1081            started_at,
1082            finished_at,
1083            ..BundleSource::default()
1084        },
1085        runtime: BundleRuntime {
1086            harn_version: env!("CARGO_PKG_VERSION").to_string(),
1087            ..BundleRuntime::default()
1088        },
1089        transcript: BundleTranscript {
1090            sections: vec![BundleTranscriptSection {
1091                id: "agent_events".to_string(),
1092                label: "Agent event log".to_string(),
1093                scope: "session".to_string(),
1094                location: format!(
1095                    "observability.agent_events.{}",
1096                    sanitize_topic_component(session_id)
1097                ),
1098                summary: None,
1099                messages: transcript_messages,
1100                events: transcript_events,
1101                assets: Vec::new(),
1102                metadata: transcript_metadata,
1103            }],
1104        },
1105        permissions: permissions_from_agent_session(events),
1106        replay: BundleReplay {
1107            replay_fixture: Some(replay_fixture),
1108            event_log_pointers: vec![BundleEventLogPointer {
1109                kind: "agent_events".to_string(),
1110                topic: Some(format!(
1111                    "observability.agent_events.{}",
1112                    sanitize_topic_component(session_id)
1113                )),
1114                path: None,
1115                location: "events.sqlite".to_string(),
1116                available: true,
1117            }],
1118            deterministic_events: deterministic_events_from_agent_session(events)?,
1119            ..BundleReplay::default()
1120        },
1121        metadata: BTreeMap::from([(
1122            SESSION_BUNDLE_LIVENESS_KEY.to_string(),
1123            json!(liveness.tag()),
1124        )]),
1125        ..SessionBundle::default()
1126    })
1127}
1128
1129pub fn import_run_record_from_agent_session_events(
1130    session_id: &str,
1131    events: &[AgentSessionReplayEvent],
1132) -> Result<RunRecord, SessionBundleError> {
1133    let bundle = session_bundle_from_agent_session_events(session_id, events)?;
1134    let run_record = import_run_record_value(&bundle)?;
1135    serde_json::from_value(run_record)
1136        .map_err(|error| SessionBundleError::Decode(error.to_string()))
1137}
1138
1139fn transcript_events_from_agent_session(
1140    events: &[AgentSessionReplayEvent],
1141) -> Result<Vec<JsonValue>, SessionBundleError> {
1142    events
1143        .iter()
1144        .map(|entry| {
1145            let event = serde_json::to_value(&entry.event)
1146                .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
1147            Ok(json!({
1148                "event_id": entry.event_id,
1149                "kind": entry.kind,
1150                "occurred_at_ms": entry.occurred_at_ms,
1151                "event": event,
1152            }))
1153        })
1154        .collect()
1155}
1156
1157fn transcript_messages_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<JsonValue> {
1158    events
1159        .iter()
1160        .filter_map(|entry| match &entry.event {
1161            AgentEvent::UserMessage { content, .. } => Some(json!({
1162                "role": "user",
1163                "content": content,
1164            })),
1165            AgentEvent::AgentMessageChunk { content, .. } if !content.is_empty() => Some(json!({
1166                "role": "assistant",
1167                "content": content,
1168            })),
1169            _ => None,
1170        })
1171        .collect()
1172}
1173
1174fn permissions_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<BundlePermission> {
1175    let mut permissions = Vec::new();
1176    for entry in events {
1177        if let AgentEvent::HitlRequested {
1178            request_id,
1179            kind,
1180            payload,
1181            ..
1182        } = &entry.event
1183        {
1184            permissions.push(BundlePermission {
1185                kind: "hitl_question".to_string(),
1186                source: "agent_events".to_string(),
1187                request_id: Some(request_id.clone()),
1188                agent: None,
1189                payload: json!({
1190                    "kind": kind,
1191                    "payload": payload,
1192                    "event_id": entry.event_id,
1193                    "occurred_at_ms": entry.occurred_at_ms,
1194                }),
1195            });
1196        }
1197    }
1198    permissions
1199}
1200
1201fn deterministic_events_from_agent_session(
1202    events: &[AgentSessionReplayEvent],
1203) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
1204    transcript_events_from_agent_session(events).map(|entries| {
1205        entries
1206            .into_iter()
1207            .enumerate()
1208            .map(|(index, value)| BundleJsonEntry {
1209                source: "events.sqlite.agent_events".to_string(),
1210                index,
1211                value,
1212            })
1213            .collect()
1214    })
1215}
1216
1217fn task_from_agent_session(events: &[AgentSessionReplayEvent]) -> Option<String> {
1218    events.iter().find_map(|entry| match &entry.event {
1219        AgentEvent::UserMessage { content, .. } => user_message_text(content),
1220        _ => None,
1221    })
1222}
1223
1224fn user_message_text(content: &[JsonValue]) -> Option<String> {
1225    let parts = content
1226        .iter()
1227        .filter_map(|value| {
1228            value
1229                .get("text")
1230                .and_then(JsonValue::as_str)
1231                .or_else(|| value.as_str())
1232                .map(str::to_string)
1233        })
1234        .filter(|text| !text.trim().is_empty())
1235        .collect::<Vec<_>>();
1236    if parts.is_empty() {
1237        None
1238    } else {
1239        Some(parts.join("\n"))
1240    }
1241}
1242
1243fn rfc3339_from_epoch_ms(ms: i64) -> String {
1244    DateTime::<Utc>::from_timestamp_millis(ms)
1245        .unwrap_or_else(|| DateTime::<Utc>::from_timestamp(0, 0).expect("unix epoch is valid"))
1246        .to_rfc3339_opts(SecondsFormat::Millis, true)
1247}
1248
1249fn raw_bundle_from_run(
1250    run: &RunRecord,
1251    run_record_value: JsonValue,
1252    include_attachments: bool,
1253) -> Result<SessionBundle, SessionBundleError> {
1254    let mut bundle = SessionBundle {
1255        bundle_id: new_id("bundle"),
1256        created_at: now_rfc3339(),
1257        producer: BundleProducer {
1258            name: "harn".to_string(),
1259            version: env!("CARGO_PKG_VERSION").to_string(),
1260            schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
1261        },
1262        source: BundleSource {
1263            kind: "run_record".to_string(),
1264            run_record_id: run.id.clone(),
1265            workflow_id: run.workflow_id.clone(),
1266            workflow_name: run.workflow_name.clone(),
1267            task: run.task.clone(),
1268            status: run.status.clone(),
1269            started_at: run.started_at.clone(),
1270            finished_at: run.finished_at.clone(),
1271            persisted_path: run.persisted_path.clone(),
1272            root_run_id: run.root_run_id.clone(),
1273            parent_run_id: run.parent_run_id.clone(),
1274            child_run_count: run.child_runs.len(),
1275        },
1276        runtime: BundleRuntime {
1277            harn_version: env!("CARGO_PKG_VERSION").to_string(),
1278            provider_models: run
1279                .usage
1280                .as_ref()
1281                .map(|usage| usage.models.clone())
1282                .unwrap_or_default(),
1283            usage: run.usage.as_ref().map(|usage| BundleUsage {
1284                input_tokens: usage.input_tokens,
1285                output_tokens: usage.output_tokens,
1286                call_count: usage.call_count,
1287                total_duration_ms: usage.total_duration_ms,
1288                total_cost: usage.total_cost,
1289                models: usage.models.clone(),
1290            }),
1291            metadata: BTreeMap::new(),
1292        },
1293        workspace: workspace_from_run(run),
1294        transcript: transcript_from_run(run),
1295        tools: BundleTools {
1296            schemas: tool_schema_entries(run),
1297            calls: run
1298                .tool_recordings
1299                .iter()
1300                .map(BundleToolCall::from)
1301                .collect(),
1302        },
1303        permissions: permissions_from_run(run),
1304        replay: BundleReplay {
1305            replay_fixture: run.replay_fixture.clone(),
1306            run_record: Some(run_record_value),
1307            observability: run.observability.clone(),
1308            verification_outcomes: verification_outcomes_for_run(run),
1309            worker_snapshots: worker_snapshots_from_run(run),
1310            event_log_pointers: event_log_pointers_from_run(run),
1311            transitions: run.transitions.clone(),
1312            checkpoints: run.checkpoints.clone(),
1313            trace_spans: run.trace_spans.clone(),
1314            deterministic_events: deterministic_events_from_run(run)?,
1315        },
1316        redaction: RedactionManifest {
1317            mode: "sanitized".to_string(),
1318            policy: "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths"
1319                .to_string(),
1320            placeholder: REDACTED_PLACEHOLDER.to_string(),
1321            entries: Vec::new(),
1322            unsafe_secret_markers_rejected: true,
1323        },
1324        attachments: if include_attachments {
1325            attachments_from_run(run)
1326        } else {
1327            Vec::new()
1328        },
1329        ..SessionBundle::default()
1330    };
1331    bundle.metadata.insert(
1332        "format_note".to_string(),
1333        json!("Session bundles are portable JSON envelopes; hosted share links should reference sanitized bundles rather than raw run records."),
1334    );
1335    Ok(bundle)
1336}
1337
1338fn verification_outcomes_for_run(run: &RunRecord) -> Vec<RunVerificationOutcomeRecord> {
1339    if let Some(observability) = run.observability.as_ref() {
1340        return observability.verification_outcomes.clone();
1341    }
1342    derive_run_observability(run, run.persisted_path.as_deref().map(Path::new))
1343        .verification_outcomes
1344}
1345
1346fn bundle_redaction_policy(base: &RedactionPolicy) -> RedactionPolicy {
1347    base.clone()
1348        .with_extra_field("persisted_path")
1349        .with_extra_field("primary")
1350        .with_extra_field("run_path")
1351        .with_extra_field("snapshot_ref")
1352        .with_extra_field("snapshot_path")
1353        .with_extra_field("source_path")
1354}
1355
1356fn workspace_from_run(run: &RunRecord) -> Option<BundleWorkspace> {
1357    let anchor = run
1358        .transcript
1359        .as_ref()
1360        .and_then(|transcript| transcript.get("metadata"))
1361        .and_then(anchor_from_transcript_metadata_json)?;
1362    Some(BundleWorkspace::from(&anchor))
1363}
1364
1365fn transcript_from_run(run: &RunRecord) -> BundleTranscript {
1366    let mut sections = Vec::new();
1367    if let Some(transcript) = &run.transcript {
1368        sections.push(transcript_section(
1369            "run",
1370            "Run transcript",
1371            "run",
1372            "$.transcript",
1373            transcript,
1374        ));
1375    }
1376    for (index, stage) in run.stages.iter().enumerate() {
1377        if let Some(transcript) = &stage.transcript {
1378            sections.push(transcript_section(
1379                &stage.id,
1380                &format!("Stage {}", stage.node_id),
1381                "stage",
1382                &format!("$.stages[{index}].transcript"),
1383                transcript,
1384            ));
1385        }
1386    }
1387    BundleTranscript { sections }
1388}
1389
1390fn transcript_section(
1391    id: &str,
1392    label: &str,
1393    scope: &str,
1394    location: &str,
1395    transcript: &JsonValue,
1396) -> BundleTranscriptSection {
1397    BundleTranscriptSection {
1398        id: id.to_string(),
1399        label: label.to_string(),
1400        scope: scope.to_string(),
1401        location: location.to_string(),
1402        summary: transcript
1403            .get("summary")
1404            .and_then(JsonValue::as_str)
1405            .map(str::to_string),
1406        messages: json_array(transcript.get("messages")),
1407        events: json_array(transcript.get("events")),
1408        assets: json_array(transcript.get("assets")),
1409        metadata: transcript
1410            .get("metadata")
1411            .and_then(JsonValue::as_object)
1412            .map(|map| {
1413                map.iter()
1414                    .map(|(key, value)| (key.clone(), value.clone()))
1415                    .collect()
1416            })
1417            .unwrap_or_default(),
1418    }
1419}
1420
1421fn json_array(value: Option<&JsonValue>) -> Vec<JsonValue> {
1422    value
1423        .and_then(JsonValue::as_array)
1424        .cloned()
1425        .unwrap_or_default()
1426}
1427
1428fn tool_schema_entries(run: &RunRecord) -> Vec<BundleJsonEntry> {
1429    let mut entries = Vec::new();
1430    collect_tool_schema_entries_from_transcript(&mut entries, "run.transcript", &run.transcript);
1431    for stage in &run.stages {
1432        collect_tool_schema_entries_from_transcript(
1433            &mut entries,
1434            &format!("stage.{}.transcript", stage.node_id),
1435            &stage.transcript,
1436        );
1437        if let Some(tools) = stage
1438            .metadata
1439            .get("tool_schemas")
1440            .or_else(|| stage.metadata.get("tools"))
1441        {
1442            entries.push(BundleJsonEntry {
1443                source: format!("stage.{}.metadata", stage.node_id),
1444                index: entries.len(),
1445                value: tools.clone(),
1446            });
1447        }
1448    }
1449    entries
1450}
1451
1452fn collect_tool_schema_entries_from_transcript(
1453    entries: &mut Vec<BundleJsonEntry>,
1454    source: &str,
1455    transcript: &Option<JsonValue>,
1456) {
1457    let Some(transcript) = transcript else {
1458        return;
1459    };
1460    for event in transcript
1461        .get("events")
1462        .and_then(JsonValue::as_array)
1463        .into_iter()
1464        .flatten()
1465    {
1466        let kind = event
1467            .get("type")
1468            .or_else(|| event.get("kind"))
1469            .and_then(JsonValue::as_str)
1470            .unwrap_or_default();
1471        if kind == "tool_schemas" || kind == "tool_schema" {
1472            entries.push(BundleJsonEntry {
1473                source: source.to_string(),
1474                index: entries.len(),
1475                value: event.clone(),
1476            });
1477        }
1478    }
1479}
1480
1481fn permissions_from_run(run: &RunRecord) -> Vec<BundlePermission> {
1482    let mut permissions = run
1483        .hitl_questions
1484        .iter()
1485        .map(permission_from_hitl_question)
1486        .collect::<Vec<_>>();
1487    collect_permission_events(&mut permissions, "run.transcript", &run.transcript);
1488    for stage in &run.stages {
1489        collect_permission_events(
1490            &mut permissions,
1491            &format!("stage.{}.transcript", stage.node_id),
1492            &stage.transcript,
1493        );
1494        if let Some(worker) = stage.metadata.get("worker") {
1495            if let Some(policy) = worker
1496                .get("audit")
1497                .and_then(|audit| audit.get("approval_policy"))
1498            {
1499                permissions.push(BundlePermission {
1500                    kind: "approval_policy".to_string(),
1501                    source: format!("stage.{}.worker.audit", stage.node_id),
1502                    request_id: None,
1503                    agent: worker
1504                        .get("name")
1505                        .and_then(JsonValue::as_str)
1506                        .map(str::to_string),
1507                    payload: policy.clone(),
1508                });
1509            }
1510        }
1511    }
1512    permissions
1513}
1514
1515fn permission_from_hitl_question(question: &RunHitlQuestionRecord) -> BundlePermission {
1516    BundlePermission {
1517        kind: "hitl_question".to_string(),
1518        source: "run.hitl_questions".to_string(),
1519        request_id: Some(question.request_id.clone()),
1520        agent: if question.agent.is_empty() {
1521            None
1522        } else {
1523            Some(question.agent.clone())
1524        },
1525        payload: serde_json::to_value(question).unwrap_or(JsonValue::Null),
1526    }
1527}
1528
1529fn collect_permission_events(
1530    permissions: &mut Vec<BundlePermission>,
1531    source: &str,
1532    transcript: &Option<JsonValue>,
1533) {
1534    let Some(transcript) = transcript else {
1535        return;
1536    };
1537    for event in transcript
1538        .get("events")
1539        .and_then(JsonValue::as_array)
1540        .into_iter()
1541        .flatten()
1542    {
1543        let kind = event
1544            .get("type")
1545            .or_else(|| event.get("kind"))
1546            .and_then(JsonValue::as_str)
1547            .unwrap_or_default();
1548        if kind.contains("permission") || kind.contains("approval") || kind.starts_with("hitl_") {
1549            permissions.push(BundlePermission {
1550                kind: kind.to_string(),
1551                source: source.to_string(),
1552                request_id: event
1553                    .get("request_id")
1554                    .or_else(|| event.get("id"))
1555                    .and_then(JsonValue::as_str)
1556                    .map(str::to_string),
1557                agent: event
1558                    .get("agent")
1559                    .and_then(JsonValue::as_str)
1560                    .map(str::to_string),
1561                payload: event.clone(),
1562            });
1563        }
1564    }
1565}
1566
1567fn event_log_pointers_from_run(run: &RunRecord) -> Vec<BundleEventLogPointer> {
1568    let mut pointers = Vec::new();
1569    if let Some(observability) = &run.observability {
1570        for pointer in &observability.transcript_pointers {
1571            pointers.push(BundleEventLogPointer {
1572                kind: pointer.kind.clone(),
1573                topic: None,
1574                path: pointer.path.clone(),
1575                location: pointer.location.clone(),
1576                available: pointer.available,
1577            });
1578        }
1579        for worker in &observability.worker_lineage {
1580            if let Some(session_id) = &worker.session_id {
1581                pointers.push(BundleEventLogPointer {
1582                    kind: "agent_events".to_string(),
1583                    topic: Some(format!("observability.agent_events.{session_id}")),
1584                    path: worker.snapshot_path.clone(),
1585                    location: format!("worker.{}.session", worker.worker_id),
1586                    available: worker.snapshot_path.is_some(),
1587                });
1588            }
1589        }
1590    }
1591    pointers
1592}
1593
1594fn worker_snapshots_from_run(run: &RunRecord) -> Vec<BundleWorkerSnapshot> {
1595    let mut snapshots = Vec::new();
1596    let mut seen_paths = BTreeSet::new();
1597    for child in &run.child_runs {
1598        let Some(path) = child.snapshot_path.as_deref() else {
1599            continue;
1600        };
1601        if !seen_paths.insert(path.to_string()) {
1602            continue;
1603        }
1604        if let Some(snapshot) = worker_snapshot_from_path(
1605            &child.worker_id,
1606            &child.worker_name,
1607            &child.status,
1608            Path::new(path),
1609        ) {
1610            snapshots.push(snapshot);
1611        }
1612    }
1613    if let Some(observability) = run.observability.as_ref() {
1614        for worker in &observability.worker_lineage {
1615            let Some(path) = worker.snapshot_path.as_deref() else {
1616                continue;
1617            };
1618            if !seen_paths.insert(path.to_string()) {
1619                continue;
1620            }
1621            if let Some(snapshot) = worker_snapshot_from_path(
1622                &worker.worker_id,
1623                &worker.worker_name,
1624                &worker.status,
1625                Path::new(path),
1626            ) {
1627                snapshots.push(snapshot);
1628            }
1629        }
1630    }
1631    snapshots
1632}
1633
1634fn worker_snapshot_from_path(
1635    worker_id: &str,
1636    worker_name: &str,
1637    status: &str,
1638    path: &Path,
1639) -> Option<BundleWorkerSnapshot> {
1640    let content = fs::read_to_string(path).ok()?;
1641    let value = serde_json::from_str::<JsonValue>(&content).ok()?;
1642    Some(BundleWorkerSnapshot {
1643        worker_id: if worker_id.is_empty() {
1644            value
1645                .get("id")
1646                .and_then(JsonValue::as_str)
1647                .unwrap_or_default()
1648                .to_string()
1649        } else {
1650            worker_id.to_string()
1651        },
1652        worker_name: if worker_name.is_empty() {
1653            value
1654                .get("name")
1655                .and_then(JsonValue::as_str)
1656                .unwrap_or("worker")
1657                .to_string()
1658        } else {
1659            worker_name.to_string()
1660        },
1661        status: if status.is_empty() {
1662            value
1663                .get("status")
1664                .and_then(JsonValue::as_str)
1665                .unwrap_or_default()
1666                .to_string()
1667        } else {
1668            status.to_string()
1669        },
1670        snapshot_ref: value
1671            .get("suspension")
1672            .and_then(|value| value.get("snapshot_ref"))
1673            .and_then(JsonValue::as_str)
1674            .or_else(|| value.get("snapshot_path").and_then(JsonValue::as_str))
1675            .unwrap_or_else(|| path.to_str().unwrap_or_default())
1676            .to_string(),
1677        source_path: Some(path.to_string_lossy().into_owned()),
1678        value,
1679    })
1680}
1681
1682fn worker_snapshot_file_name(worker_id: &str, index: usize) -> String {
1683    let component = sanitize_topic_component(worker_id);
1684    let component = if component.is_empty() {
1685        format!("worker_{index}")
1686    } else {
1687        component
1688    };
1689    format!("{component}.json")
1690}
1691
1692fn worker_snapshot_value_for_import(value: &JsonValue, path: &Path) -> JsonValue {
1693    let mut value = value.clone();
1694    let path = path.to_string_lossy().into_owned();
1695    if let JsonValue::Object(map) = &mut value {
1696        map.insert("snapshot_path".to_string(), JsonValue::String(path.clone()));
1697        if let Some(JsonValue::Object(suspension)) = map.get_mut("suspension") {
1698            suspension.insert("snapshot_ref".to_string(), JsonValue::String(path));
1699        }
1700    }
1701    value
1702}
1703
1704fn deterministic_events_from_run(
1705    run: &RunRecord,
1706) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
1707    let mut events = Vec::new();
1708    for (index, transition) in run.transitions.iter().enumerate() {
1709        events.push(BundleJsonEntry {
1710            source: "run.transitions".to_string(),
1711            index,
1712            value: serde_json::to_value(transition)
1713                .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1714        });
1715    }
1716    for (index, checkpoint) in run.checkpoints.iter().enumerate() {
1717        events.push(BundleJsonEntry {
1718            source: "run.checkpoints".to_string(),
1719            index,
1720            value: serde_json::to_value(checkpoint)
1721                .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1722        });
1723    }
1724    Ok(events)
1725}
1726
1727fn attachments_from_run(run: &RunRecord) -> Vec<BundleAttachment> {
1728    run.artifacts
1729        .iter()
1730        .map(|artifact| BundleAttachment {
1731            id: artifact.id.clone(),
1732            kind: artifact.kind.clone(),
1733            title: artifact.title.clone(),
1734            stage: artifact.stage.clone(),
1735            text: artifact.text.clone(),
1736            data: artifact.data.clone(),
1737            metadata: artifact.metadata.clone(),
1738        })
1739        .collect()
1740}
1741
1742fn redact_bundle_pointer_paths_json(
1743    value: &mut JsonValue,
1744    path: &str,
1745    entries: &mut Vec<RedactionEntry>,
1746) {
1747    match value {
1748        JsonValue::Object(map) => {
1749            let keys = map.keys().cloned().collect::<Vec<_>>();
1750            for key in keys {
1751                let child_path = json_path_child(path, &key);
1752                if key == "path" && bundle_pointer_path_should_redact(&child_path) {
1753                    if !map.get(&key).is_some_and(JsonValue::is_null) {
1754                        map.insert(key, JsonValue::String(REDACTED_PLACEHOLDER.to_string()));
1755                        entries.push(RedactionEntry {
1756                            path: child_path,
1757                            class: "local_pointer_path".to_string(),
1758                            action: "replaced".to_string(),
1759                            replacement: Some(REDACTED_PLACEHOLDER.to_string()),
1760                        });
1761                    }
1762                } else if let Some(child) = map.get_mut(&key) {
1763                    redact_bundle_pointer_paths_json(child, &child_path, entries);
1764                }
1765            }
1766        }
1767        JsonValue::Array(items) => {
1768            for (index, item) in items.iter_mut().enumerate() {
1769                redact_bundle_pointer_paths_json(item, &format!("{path}[{index}]"), entries);
1770            }
1771        }
1772        _ => {}
1773    }
1774}
1775
/// Reports whether a JSON path points inside an event-log or transcript pointer list.
///
/// Matches any path containing `.event_log_pointers[` or `.transcript_pointers[`.
fn bundle_pointer_path_should_redact(path: &str) -> bool {
    const POINTER_LIST_MARKERS: [&str; 2] = [".event_log_pointers[", ".transcript_pointers["];
    POINTER_LIST_MARKERS
        .iter()
        .any(|&marker| path.contains(marker))
}
1779
1780fn withhold_replay_only_json(value: &mut JsonValue, path: &str, entries: &mut Vec<RedactionEntry>) {
1781    match value {
1782        JsonValue::Object(map) => {
1783            let keys = map.keys().cloned().collect::<Vec<_>>();
1784            for key in keys {
1785                let child_path = json_path_child(path, &key);
1786                if replay_only_field_is_prompt_payload(&key) {
1787                    if !map.get(&key).is_some_and(JsonValue::is_null) {
1788                        map.insert(key, JsonValue::String(REPLAY_ONLY_PLACEHOLDER.to_string()));
1789                        entries.push(RedactionEntry {
1790                            path: child_path,
1791                            class: "prompt_or_tool_payload".to_string(),
1792                            action: "withheld".to_string(),
1793                            replacement: Some(REPLAY_ONLY_PLACEHOLDER.to_string()),
1794                        });
1795                    }
1796                } else if let Some(child) = map.get_mut(&key) {
1797                    withhold_replay_only_json(child, &child_path, entries);
1798                }
1799            }
1800        }
1801        JsonValue::Array(items) => {
1802            for (index, item) in items.iter_mut().enumerate() {
1803                withhold_replay_only_json(item, &format!("{path}[{index}]"), entries);
1804            }
1805        }
1806        _ => {}
1807    }
1808}
1809
/// Reports whether `key` names a field that carries prompt, reasoning, or tool payload text.
///
/// Replay-only bundles withhold these fields.
fn replay_only_field_is_prompt_payload(key: &str) -> bool {
    const PROMPT_PAYLOAD_FIELDS: &[&str] = &[
        "args",
        "arguments",
        "blocks",
        "content",
        "data",
        "private_reasoning",
        "prompt",
        "raw_input",
        "raw_output",
        "result",
        "response_text",
        "summary",
        "system",
        "system_prompt",
        "task",
        "text",
        "thinking",
        "visible_text",
    ];
    PROMPT_PAYLOAD_FIELDS.contains(&key)
}
1833
1834fn set_json_path(value: &mut JsonValue, path: &[&str], replacement: JsonValue) {
1835    let Some((head, tail)) = path.split_first() else {
1836        *value = replacement;
1837        return;
1838    };
1839    if tail.is_empty() {
1840        if let JsonValue::Object(map) = value {
1841            map.insert((*head).to_string(), replacement);
1842        }
1843        return;
1844    }
1845    if let JsonValue::Object(map) = value {
1846        if let Some(child) = map.get_mut(*head) {
1847            set_json_path(child, tail, replacement);
1848        }
1849    }
1850}
1851
1852fn require_field(value: &JsonValue, field: &str) -> Result<(), SessionBundleError> {
1853    if value.get(field).is_some() {
1854        Ok(())
1855    } else {
1856        Err(SessionBundleError::MissingRequired(format!("$.{field}")))
1857    }
1858}
1859
1860fn require_nested_field(value: &JsonValue, path: &[&str]) -> Result<(), SessionBundleError> {
1861    let mut current = value;
1862    for segment in path {
1863        current = current
1864            .get(*segment)
1865            .ok_or_else(|| SessionBundleError::MissingRequired(json_path_from_segments(path)))?;
1866    }
1867    Ok(())
1868}
1869
1870fn json_path_from_segments(path: &[&str]) -> String {
1871    path.iter().fold("$".to_string(), |parent, segment| {
1872        json_path_child(&parent, segment)
1873    })
1874}