Skip to main content

harn_vm/
session_bundle.rs

1//! Canonical session bundle export/import support.
2//!
3//! A session bundle is the portable JSON envelope that downstream hosts use
4//! for local exports, sanitized shares, and replay-only handoffs. The bundle
5//! is intentionally built from existing Harn run-record surfaces so replay,
6//! transcripts, tool calls, HITL questions, and observability do not fork into
7//! a second persistence model.
8
9use std::borrow::Cow;
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12use std::fs;
13use std::path::Path;
14
15use chrono::{DateTime, SecondsFormat, Utc};
16use serde::{Deserialize, Serialize};
17use serde_json::{json, Value as JsonValue};
18
19use crate::agent_events::AgentEvent;
20use crate::event_log::sanitize_topic_component;
21use crate::orchestration::{
22    derive_run_observability, new_id, now_rfc3339, AgentSessionReplayEvent, ReplayFixture,
23    RunCheckpointRecord, RunHitlQuestionRecord, RunObservabilityRecord, RunRecord,
24    RunTraceSpanRecord, RunTransitionRecord, RunVerificationOutcomeRecord, ToolCallRecord,
25};
26use crate::redact::{RedactionPolicy, REDACTED_PLACEHOLDER};
27use crate::workspace_anchor::{anchor_from_transcript_metadata_json, MountedRoot, WorkspaceAnchor};
28
29mod schema;
30pub use schema::{session_bundle_schema, session_bundle_schema_pretty};
31
32#[cfg(test)]
33mod tests;
34
/// Value of the `_type` discriminator on a session bundle. It is the default
/// for `SessionBundle::type_name`, and validation rejects any other value.
pub const SESSION_BUNDLE_TYPE: &str = "harn_session_bundle";
/// Schema version stamped on default bundles; validation rejects `0` and
/// anything greater than this value.
pub const SESSION_BUNDLE_SCHEMA_VERSION: u32 = 1;
/// Schema identifier recorded in `producer.schema_id`.
pub const SESSION_BUNDLE_SCHEMA_ID: &str = "https://harnlang.com/schemas/session-bundle.v1.json";
/// Placeholder text for withheld content.
/// NOTE(review): presumably substituted by the replay-only export path; the
/// code that consumes it is not visible in this part of the file — confirm.
pub const REPLAY_ONLY_PLACEHOLDER: &str = "[withheld]";

/// Bundle `source.status` for a session reconstructed from an event stream that
/// has no terminal `SessionClosed` event — a live loop frozen mid-turn for
/// cross-compute migration (#3682), or a time-traveled prefix truncated before
/// the close. The next action is still pending, so a consumer must CONTINUE the
/// loop, not treat it as finished.
pub const SESSION_BUNDLE_STATUS_SUSPENDED: &str = "suspended";

/// Bundle `source.status` fallback for a session that closed without an
/// explicit status on its terminal `SessionClosed` event.
pub const SESSION_BUNDLE_STATUS_COMPLETED: &str = "completed";

/// `metadata` key carrying the machine-readable liveness verdict
/// (`"suspended"` | `"closed"`) so a resume host can decide continue-vs-replay
/// without string-matching the human-facing `status`.
pub const SESSION_BUNDLE_LIVENESS_KEY: &str = "session_liveness";
55
/// Liveness of an agent session reconstructed from its event log.
///
/// Derived purely from the event stream: a session is [`Closed`] only when the
/// stream contains a terminal `SessionClosed` event. Any other stream is
/// [`Suspended`] — its next action is still pending.
///
/// [`Closed`]: AgentSessionLiveness::Closed
/// [`Suspended`]: AgentSessionLiveness::Suspended
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AgentSessionLiveness {
    /// A `SessionClosed` event was found. `status` is that event's status, or
    /// [`SESSION_BUNDLE_STATUS_COMPLETED`] when it was empty; `finished_at_ms`
    /// is the `occurred_at_ms` of the closing event.
    Closed { status: String, finished_at_ms: i64 },
    /// No terminal `SessionClosed` event: the loop is frozen mid-flight and a
    /// consumer must resume it rather than replay it as a finished run.
    Suspended,
}
73
74impl AgentSessionLiveness {
75    /// The human-facing `source.status` string for this liveness.
76    pub fn status(&self) -> &str {
77        match self {
78            AgentSessionLiveness::Closed { status, .. } => status,
79            AgentSessionLiveness::Suspended => SESSION_BUNDLE_STATUS_SUSPENDED,
80        }
81    }
82
83    /// The machine-readable [`SESSION_BUNDLE_LIVENESS_KEY`] tag.
84    pub fn tag(&self) -> &'static str {
85        match self {
86            AgentSessionLiveness::Closed { .. } => "closed",
87            AgentSessionLiveness::Suspended => "suspended",
88        }
89    }
90
91    /// Whether a consumer must CONTINUE the loop (vs. treat it as finished).
92    pub fn is_suspended(&self) -> bool {
93        matches!(self, AgentSessionLiveness::Suspended)
94    }
95}
96
97/// Classify an agent session's liveness from its replay event stream by finding
98/// the last terminal `SessionClosed` event. The keystone discriminator behind
99/// portable suspend/resume (#3682): without it, a frozen-mid-flight or
100/// time-traveled session is silently mislabeled `completed` and a resume host
101/// replays it instead of continuing the pending turn.
102pub fn agent_session_liveness(events: &[AgentSessionReplayEvent]) -> AgentSessionLiveness {
103    events
104        .iter()
105        .rev()
106        .find_map(|entry| match &entry.event {
107            AgentEvent::SessionClosed { status, .. } => Some(AgentSessionLiveness::Closed {
108                status: if status.is_empty() {
109                    SESSION_BUNDLE_STATUS_COMPLETED.to_string()
110                } else {
111                    status.clone()
112                },
113                finished_at_ms: entry.occurred_at_ms,
114            }),
115            _ => None,
116        })
117        .unwrap_or(AgentSessionLiveness::Suspended)
118}
119
/// How much of a run is exposed when it is exported as a session bundle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionBundleExportMode {
    /// Export without running any redaction pass.
    Local,
    /// Export after running the redaction passes.
    Sanitized,
    /// Export after redaction plus the additional replay-only withholding pass.
    ReplayOnly,
}

impl SessionBundleExportMode {
    /// Stable lowercase label for this mode, as recorded in the redaction
    /// manifest's `mode` field.
    pub fn as_str(self) -> &'static str {
        match self {
            SessionBundleExportMode::ReplayOnly => "replay_only",
            SessionBundleExportMode::Sanitized => "sanitized",
            SessionBundleExportMode::Local => "local",
        }
    }
}
136
/// Options controlling how `export_run_record_bundle` builds a bundle.
#[derive(Clone, Debug)]
pub struct SessionBundleExportOptions {
    /// Export mode; every mode other than `Local` runs the redaction passes.
    pub mode: SessionBundleExportMode,
    /// Forwarded to the raw bundle builder as its attachment flag.
    pub include_attachments: bool,
    /// Base policy from which the bundle redaction policy is derived; only
    /// consulted for non-`Local` exports.
    pub redaction_policy: RedactionPolicy,
}
143
144impl Default for SessionBundleExportOptions {
145    fn default() -> Self {
146        Self {
147            mode: SessionBundleExportMode::Sanitized,
148            include_attachments: false,
149            redaction_policy: RedactionPolicy::default(),
150        }
151    }
152}
153
/// Options for validating an incoming session bundle.
#[derive(Clone, Debug, Default)]
pub struct SessionBundleValidationOptions {
    /// When `false` (the default), validation runs the unredacted-secret-marker
    /// check and fails if it reports a marker.
    pub allow_unsafe_secret_markers: bool,
    /// Policy handed to the secret-marker check.
    pub redaction_policy: RedactionPolicy,
}
159
/// The portable session bundle envelope.
///
/// `#[serde(default)]` lets deserialization fill any absent field from
/// [`Default`]; `validate_session_bundle_value` separately enforces which
/// fields must be present before it deserializes.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct SessionBundle {
    /// Type discriminator, serialized as `_type`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Bundle schema version.
    pub schema_version: u32,
    /// Identifier of this bundle.
    pub bundle_id: String,
    /// Creation timestamp string.
    pub created_at: String,
    /// Who produced the bundle.
    pub producer: BundleProducer,
    /// Identity and status of the run the bundle was built from.
    pub source: BundleSource,
    /// Runtime details (version, models, usage).
    pub runtime: BundleRuntime,
    /// Workspace anchor information, when available.
    pub workspace: Option<BundleWorkspace>,
    /// Transcript sections.
    pub transcript: BundleTranscript,
    /// Tool schemas and recorded tool calls.
    pub tools: BundleTools,
    /// Permission-related records; entries of kind `"hitl_question"` are
    /// restored as HITL questions on import.
    pub permissions: Vec<BundlePermission>,
    /// Replay material (run record, fixture, observability, ...).
    pub replay: BundleReplay,
    /// Manifest describing the redaction applied at export time.
    pub redaction: RedactionManifest,
    /// Attachments carried with the bundle.
    pub attachments: Vec<BundleAttachment>,
    /// Free-form bundle metadata.
    pub metadata: BTreeMap<String, JsonValue>,
}
180
181impl Default for SessionBundle {
182    fn default() -> Self {
183        Self {
184            type_name: SESSION_BUNDLE_TYPE.to_string(),
185            schema_version: SESSION_BUNDLE_SCHEMA_VERSION,
186            bundle_id: String::new(),
187            created_at: String::new(),
188            producer: BundleProducer::default(),
189            source: BundleSource::default(),
190            runtime: BundleRuntime::default(),
191            workspace: None,
192            transcript: BundleTranscript::default(),
193            tools: BundleTools::default(),
194            permissions: Vec::new(),
195            replay: BundleReplay::default(),
196            redaction: RedactionManifest::default(),
197            attachments: Vec::new(),
198            metadata: BTreeMap::new(),
199        }
200    }
201}
202
/// Identifies the software that produced a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleProducer {
    /// Producer name (bundles built from agent session events use `"harn"`).
    pub name: String,
    /// Producer version (bundles built from agent session events use the
    /// crate's `CARGO_PKG_VERSION`).
    pub version: String,
    /// Identifier of the bundle schema the producer targeted.
    pub schema_id: String,
}
210
/// Identity and status of the run a bundle was derived from.
///
/// When a run record has to be rebuilt from the replay fixture on import,
/// `run_record_id`, `workflow_id`, `workflow_name`, `task`, `status`,
/// `started_at`, and `finished_at` are copied into it.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleSource {
    /// Kind of source the bundle was built from.
    pub kind: String,
    /// Identifier of the originating run record.
    pub run_record_id: String,
    /// Workflow identifier.
    pub workflow_id: String,
    /// Optional human-readable workflow name.
    pub workflow_name: Option<String>,
    /// Task text of the run.
    pub task: String,
    /// Human-facing run status.
    pub status: String,
    /// Start timestamp string.
    pub started_at: String,
    /// Finish timestamp string; `None` when the run has not finished.
    pub finished_at: Option<String>,
    /// Where the run record was persisted, if known.
    pub persisted_path: Option<String>,
    /// Identifier of the root run, if any.
    pub root_run_id: Option<String>,
    /// Identifier of the parent run, if any.
    pub parent_run_id: Option<String>,
    /// Number of child runs.
    pub child_run_count: usize,
}
227
/// Runtime details recorded alongside a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleRuntime {
    /// Harn version string.
    pub harn_version: String,
    /// Provider/model identifiers associated with the run.
    pub provider_models: Vec<String>,
    /// Aggregate usage, when available; copied into the run record's `usage`
    /// when a record is rebuilt from the replay fixture.
    pub usage: Option<BundleUsage>,
    /// Free-form runtime metadata.
    pub metadata: BTreeMap<String, JsonValue>,
}
236
/// Aggregate usage counters for a run.
///
/// NOTE(review): the meanings below are inferred from the field names only;
/// the code that fills them is not visible here — confirm units (notably the
/// currency of `total_cost`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleUsage {
    /// Input token count.
    pub input_tokens: i64,
    /// Output token count.
    pub output_tokens: i64,
    /// Number of calls.
    pub call_count: i64,
    /// Total duration, presumably in milliseconds.
    pub total_duration_ms: i64,
    /// Total cost.
    pub total_cost: f64,
    /// Model identifiers.
    pub models: Vec<String>,
}
247
/// Workspace identity carried by a bundle; convertible from a
/// `WorkspaceAnchor`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleWorkspace {
    /// Primary workspace path the session is anchored against.
    pub primary: Option<String>,
    /// Additional roots mounted alongside the primary anchor.
    #[serde(default)]
    pub additional_roots: Vec<BundleMountedRoot>,
    /// RFC3339 timestamp when the anchor was set.
    pub anchored_at: Option<String>,
    /// Bundle workspace policy label. Currently always
    /// `"safe_identity_only"`; future revisions may tie this to mount
    /// modes once the PathScope matcher (#2216) lands.
    pub policy: String,
}
263
/// An additional mounted root of a workspace; convertible from a
/// `MountedRoot`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleMountedRoot {
    /// Root path, stored as a (lossily converted) string.
    pub path: String,
    /// String form of the mount mode.
    pub mount_mode: String,
    /// When the root was mounted, copied verbatim from the source record.
    pub mounted_at: String,
}
271
272impl From<&MountedRoot> for BundleMountedRoot {
273    fn from(root: &MountedRoot) -> Self {
274        Self {
275            path: root.path.to_string_lossy().into_owned(),
276            mount_mode: root.mount_mode.as_str().to_string(),
277            mounted_at: root.mounted_at.clone(),
278        }
279    }
280}
281
282impl From<&WorkspaceAnchor> for BundleWorkspace {
283    fn from(anchor: &WorkspaceAnchor) -> Self {
284        Self {
285            primary: Some(anchor.primary.to_string_lossy().into_owned()),
286            additional_roots: anchor.additional_roots.iter().map(Into::into).collect(),
287            anchored_at: Some(anchor.anchored_at.clone()),
288            policy: "safe_identity_only".to_string(),
289        }
290    }
291}
292
/// Transcript content of a bundle, split into sections.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscript {
    /// Transcript sections; when a run record is rebuilt from the replay
    /// fixture, only the first section is used as its transcript.
    pub sections: Vec<BundleTranscriptSection>,
}
298
/// One section of a bundle transcript.
///
/// When a run record is rebuilt from the replay fixture, `messages`,
/// `events`, `assets`, `summary`, and `metadata` of the first section are
/// copied into the record's transcript.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscriptSection {
    /// Section identifier.
    pub id: String,
    /// Display label.
    pub label: String,
    /// Scope label of the section.
    pub scope: String,
    /// Location label of the section.
    pub location: String,
    /// Optional summary text.
    pub summary: Option<String>,
    /// Transcript messages as raw JSON values.
    pub messages: Vec<JsonValue>,
    /// Transcript events as raw JSON values.
    pub events: Vec<JsonValue>,
    /// Transcript assets as raw JSON values.
    pub assets: Vec<JsonValue>,
    /// Free-form section metadata.
    pub metadata: BTreeMap<String, JsonValue>,
}
312
/// Tool information carried by a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTools {
    /// Tool schema entries.
    pub schemas: Vec<BundleJsonEntry>,
    /// Recorded tool calls; restored as `tool_recordings` when a run record is
    /// rebuilt from the replay fixture.
    pub calls: Vec<BundleToolCall>,
}
319
/// A raw JSON value tagged with where it came from.
///
/// NOTE(review): the producers of these entries are not visible here; the
/// field meanings are inferred from their names — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleJsonEntry {
    /// Label of the place the value was taken from.
    pub source: String,
    /// Position of the value within that source.
    pub index: usize,
    /// The JSON payload itself.
    pub value: JsonValue,
}
327
/// Bundle form of a recorded tool call; a field-for-field copy of a
/// `ToolCallRecord`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleToolCall {
    /// Name of the tool.
    pub tool_name: String,
    /// Identifier of the tool use.
    pub tool_use_id: String,
    /// Hash of the call arguments.
    pub args_hash: String,
    /// Recorded result text.
    pub result: String,
    /// Whether the call was rejected.
    pub is_rejected: bool,
    /// Call duration, presumably in milliseconds (per the field name).
    pub duration_ms: u64,
    /// Iteration counter copied from the source record.
    pub iteration: usize,
    /// Timestamp string copied from the source record.
    pub timestamp: String,
}
340
341impl From<&ToolCallRecord> for BundleToolCall {
342    fn from(record: &ToolCallRecord) -> Self {
343        Self {
344            tool_name: record.tool_name.clone(),
345            tool_use_id: record.tool_use_id.clone(),
346            args_hash: record.args_hash.clone(),
347            result: record.result.clone(),
348            is_rejected: record.is_rejected,
349            duration_ms: record.duration_ms,
350            iteration: record.iteration,
351            timestamp: record.timestamp.clone(),
352        }
353    }
354}
355
/// A permission-related record carried by a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundlePermission {
    /// Record kind. Entries whose kind is `"hitl_question"` have their
    /// `payload` restored as a HITL question when a run record is rebuilt from
    /// the replay fixture.
    pub kind: String,
    /// Label of where the record came from.
    pub source: String,
    /// Associated request identifier, if any.
    pub request_id: Option<String>,
    /// Associated agent, if any.
    pub agent: Option<String>,
    /// Raw JSON payload of the record.
    pub payload: JsonValue,
}
365
/// Replay material carried by a bundle.
///
/// Import prefers `run_record`; when it is absent, a run record is rebuilt
/// from `replay_fixture` and the other bundle fields. With neither present,
/// import fails.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleReplay {
    /// Replay fixture, used as the import fallback.
    pub replay_fixture: Option<ReplayFixture>,
    /// Full run record as raw JSON, when included.
    pub run_record: Option<JsonValue>,
    /// Observability record; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observability: Option<RunObservabilityRecord>,
    /// Verification outcomes; on import they back-fill the observability
    /// record when it has none of its own.
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    /// Worker snapshots; omitted from the serialized form when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub worker_snapshots: Vec<BundleWorkerSnapshot>,
    /// Pointers to event logs related to the run.
    pub event_log_pointers: Vec<BundleEventLogPointer>,
    /// Run transitions.
    pub transitions: Vec<RunTransitionRecord>,
    /// Run checkpoints.
    pub checkpoints: Vec<RunCheckpointRecord>,
    /// Trace spans.
    pub trace_spans: Vec<RunTraceSpanRecord>,
    /// Deterministic event entries.
    pub deterministic_events: Vec<BundleJsonEntry>,
}
382
/// A worker snapshot embedded in a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleWorkerSnapshot {
    /// Worker identifier; when blank, materialization substitutes
    /// `worker_<index>`.
    pub worker_id: String,
    /// Worker name.
    pub worker_name: String,
    /// Worker status label.
    pub status: String,
    /// Reference label for the snapshot.
    pub snapshot_ref: String,
    /// Original location of the snapshot, if recorded.
    pub source_path: Option<String>,
    /// Snapshot content; this is what gets written to disk on materialization.
    pub value: JsonValue,
}
393
/// Result of writing one bundled worker snapshot to disk.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MaterializedWorkerSnapshot {
    /// Worker the snapshot belongs to.
    pub worker_id: String,
    /// Path of the written snapshot file, as a string.
    pub path: String,
}
400
/// Pointer to an event log associated with a run.
///
/// NOTE(review): the producers of these pointers are not visible here; the
/// field meanings are inferred from their names — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleEventLogPointer {
    /// Kind of event log.
    pub kind: String,
    /// Event log topic, if any.
    pub topic: Option<String>,
    /// Path to the log, if any.
    pub path: Option<String>,
    /// Location label.
    pub location: String,
    /// Whether the pointed-to log is available.
    pub available: bool,
}
410
/// Describes the redaction that was applied when a bundle was exported.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RedactionManifest {
    /// Export mode label (`"local"`, `"sanitized"`, or `"replay_only"`).
    pub mode: String,
    /// Label of the redaction policy; `"none"` for local exports.
    pub policy: String,
    /// Placeholder text recorded for redacted values.
    pub placeholder: String,
    /// Individual redaction records collected by the export passes.
    pub entries: Vec<RedactionEntry>,
    /// Set by export to `true` for every mode other than local.
    pub unsafe_secret_markers_rejected: bool,
}
420
/// One record in a redaction manifest.
///
/// NOTE(review): the redaction passes that emit these entries are not visible
/// here; the field meanings are inferred from their names — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RedactionEntry {
    /// Path of the affected value within the bundle.
    pub path: String,
    /// Classification of the redacted content.
    pub class: String,
    /// Action that was taken.
    pub action: String,
    /// Replacement text, if one was substituted.
    pub replacement: Option<String>,
}
429
/// An attachment carried by a bundle.
///
/// NOTE(review): the code that builds attachments is not visible here; the
/// field meanings are inferred from their names — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleAttachment {
    /// Attachment identifier.
    pub id: String,
    /// Attachment kind.
    pub kind: String,
    /// Optional title.
    pub title: Option<String>,
    /// Optional stage the attachment is associated with.
    pub stage: Option<String>,
    /// Optional textual content.
    pub text: Option<String>,
    /// Optional structured content.
    pub data: Option<JsonValue>,
    /// Free-form attachment metadata.
    pub metadata: BTreeMap<String, JsonValue>,
}
441
/// Errors produced while exporting, validating, or importing session bundles.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionBundleError {
    /// The bundle (or its JSON text) could not be decoded.
    Decode(String),
    /// A value could not be encoded; also used for snapshot file I/O failures.
    Encode(String),
    /// A required field is absent; the payload names the field.
    MissingRequired(String),
    /// The bundle's `schema_version` is outside the supported range.
    UnsupportedSchemaVersion { found: u64, supported: u32 },
    /// A field holds a value of the wrong type or an unexpected value.
    InvalidType { path: String, expected: String },
    /// An unredacted secret marker was found at `path`.
    UnsafeSecretMarker { path: String, excerpt: String },
    /// The bundle has neither a run record nor a replay fixture to import.
    MissingRunRecord,
    /// No replayable events were supplied for the session.
    MissingSessionEvents { session_id: String },
}

impl fmt::Display for SessionBundleError {
    /// Renders the human-readable message for each error variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SessionBundleError::Decode(reason) => {
                write!(f, "failed to decode session bundle: {reason}")
            }
            SessionBundleError::Encode(reason) => {
                write!(f, "failed to encode session bundle: {reason}")
            }
            SessionBundleError::MissingRequired(field) => {
                write!(f, "session bundle is missing required field {field}")
            }
            SessionBundleError::UnsupportedSchemaVersion { found, supported } => write!(
                f,
                "unsupported session bundle schema_version {found}; this build supports <= {supported}"
            ),
            SessionBundleError::InvalidType { path, expected } => {
                write!(f, "session bundle field {path} must be {expected}")
            }
            SessionBundleError::UnsafeSecretMarker { path, excerpt } => write!(
                f,
                "session bundle contains an unsafe unredacted secret marker at {path}: {excerpt}"
            ),
            SessionBundleError::MissingRunRecord => {
                f.write_str("session bundle does not include an importable run record")
            }
            SessionBundleError::MissingSessionEvents { session_id } => write!(
                f,
                "event log does not contain replayable events for session_id {session_id:?}"
            ),
        }
    }
}

impl std::error::Error for SessionBundleError {}
483
484pub fn export_run_record_bundle(
485    run: &RunRecord,
486    options: &SessionBundleExportOptions,
487) -> Result<SessionBundle, SessionBundleError> {
488    let run_record_value =
489        serde_json::to_value(run).map_err(|error| SessionBundleError::Encode(error.to_string()))?;
490    let mut bundle = raw_bundle_from_run(run, run_record_value, options.include_attachments)?;
491    let mut bundle_value = serde_json::to_value(&bundle)
492        .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
493
494    let mut manifest = RedactionManifest {
495        mode: options.mode.as_str().to_string(),
496        policy: if matches!(options.mode, SessionBundleExportMode::Local) {
497            "none".to_string()
498        } else {
499            "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths".to_string()
500        },
501        placeholder: REDACTED_PLACEHOLDER.to_string(),
502        entries: Vec::new(),
503        unsafe_secret_markers_rejected: !matches!(options.mode, SessionBundleExportMode::Local),
504    };
505
506    if !matches!(options.mode, SessionBundleExportMode::Local) {
507        let redaction_policy = bundle_redaction_policy(&options.redaction_policy);
508        redact_json_with_manifest(
509            &mut bundle_value,
510            "$",
511            &redaction_policy,
512            &mut manifest.entries,
513        );
514        redact_bundle_pointer_paths_json(&mut bundle_value, "$", &mut manifest.entries);
515    }
516    if matches!(options.mode, SessionBundleExportMode::ReplayOnly) {
517        withhold_replay_only_json(&mut bundle_value, "$", &mut manifest.entries);
518    }
519    set_json_path(
520        &mut bundle_value,
521        &["redaction"],
522        serde_json::to_value(&manifest)
523            .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
524    );
525    bundle = serde_json::from_value(bundle_value)
526        .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
527    Ok(bundle)
528}
529
530pub fn validate_session_bundle_value(
531    value: &JsonValue,
532    options: &SessionBundleValidationOptions,
533) -> Result<SessionBundle, SessionBundleError> {
534    require_field(value, "_type")?;
535    require_field(value, "schema_version")?;
536    require_field(value, "bundle_id")?;
537    require_field(value, "created_at")?;
538    require_field(value, "producer")?;
539    require_field(value, "source")?;
540    require_field(value, "runtime")?;
541    require_field(value, "transcript")?;
542    require_field(value, "tools")?;
543    require_field(value, "permissions")?;
544    require_field(value, "replay")?;
545    require_field(value, "redaction")?;
546    require_field(value, "attachments")?;
547    require_nested_field(value, &["producer", "name"])?;
548    require_nested_field(value, &["producer", "version"])?;
549    require_nested_field(value, &["producer", "schema_id"])?;
550    require_nested_field(value, &["source", "kind"])?;
551    require_nested_field(value, &["source", "run_record_id"])?;
552    require_nested_field(value, &["source", "workflow_id"])?;
553    require_nested_field(value, &["source", "task"])?;
554    require_nested_field(value, &["source", "status"])?;
555    require_nested_field(value, &["runtime", "harn_version"])?;
556    require_nested_field(value, &["runtime", "provider_models"])?;
557    require_nested_field(value, &["transcript", "sections"])?;
558    require_nested_field(value, &["tools", "schemas"])?;
559    require_nested_field(value, &["tools", "calls"])?;
560    require_nested_field(value, &["replay", "event_log_pointers"])?;
561    require_nested_field(value, &["replay", "transitions"])?;
562    require_nested_field(value, &["replay", "checkpoints"])?;
563    require_nested_field(value, &["replay", "trace_spans"])?;
564    require_nested_field(value, &["replay", "deterministic_events"])?;
565    require_nested_field(value, &["redaction", "mode"])?;
566    require_nested_field(value, &["redaction", "policy"])?;
567    require_nested_field(value, &["redaction", "placeholder"])?;
568    require_nested_field(value, &["redaction", "entries"])?;
569    require_nested_field(value, &["redaction", "unsafe_secret_markers_rejected"])?;
570
571    let type_name = value
572        .get("_type")
573        .and_then(JsonValue::as_str)
574        .ok_or_else(|| SessionBundleError::InvalidType {
575            path: "$._type".to_string(),
576            expected: "string".to_string(),
577        })?;
578    if type_name != SESSION_BUNDLE_TYPE {
579        return Err(SessionBundleError::InvalidType {
580            path: "$._type".to_string(),
581            expected: format!("\"{SESSION_BUNDLE_TYPE}\""),
582        });
583    }
584
585    let version = value
586        .get("schema_version")
587        .and_then(JsonValue::as_u64)
588        .ok_or_else(|| SessionBundleError::InvalidType {
589            path: "$.schema_version".to_string(),
590            expected: "positive integer".to_string(),
591        })?;
592    if version == 0 || version > u64::from(SESSION_BUNDLE_SCHEMA_VERSION) {
593        return Err(SessionBundleError::UnsupportedSchemaVersion {
594            found: version,
595            supported: SESSION_BUNDLE_SCHEMA_VERSION,
596        });
597    }
598
599    if !options.allow_unsafe_secret_markers {
600        reject_unredacted_secret_markers(value, "$", &options.redaction_policy)?;
601    }
602
603    serde_json::from_value::<SessionBundle>(value.clone())
604        .map_err(|error| SessionBundleError::Decode(error.to_string()))
605}
606
607pub fn validate_session_bundle_str(
608    content: &str,
609    options: &SessionBundleValidationOptions,
610) -> Result<SessionBundle, SessionBundleError> {
611    let value: JsonValue = serde_json::from_str(content)
612        .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
613    validate_session_bundle_value(&value, options)
614}
615
616pub fn import_run_record_value(bundle: &SessionBundle) -> Result<JsonValue, SessionBundleError> {
617    let replay_observability = replay_observability_for_import(&bundle.replay);
618    if let Some(mut run_record) = bundle.replay.run_record.clone() {
619        let should_fill_observability = match run_record.get("observability") {
620            Some(value) => value.is_null(),
621            None => true,
622        };
623        if should_fill_observability {
624            if let (JsonValue::Object(map), Some(observability)) =
625                (&mut run_record, replay_observability.as_ref())
626            {
627                map.insert(
628                    "observability".to_string(),
629                    serde_json::to_value(observability)
630                        .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
631                );
632            }
633        }
634        return Ok(run_record);
635    }
636    if let Some(fixture) = &bundle.replay.replay_fixture {
637        let transcript = bundle.transcript.sections.first().map(|section| {
638            json!({
639                "_type": "transcript",
640                "messages": section.messages.clone(),
641                "events": section.events.clone(),
642                "assets": section.assets.clone(),
643                "summary": section.summary.clone(),
644                "metadata": section.metadata.clone(),
645            })
646        });
647        let hitl_questions = bundle
648            .permissions
649            .iter()
650            .filter(|permission| permission.kind == "hitl_question")
651            .map(|permission| permission.payload.clone())
652            .collect::<Vec<_>>();
653        return Ok(json!({
654            "_type": "run_record",
655            "id": bundle.source.run_record_id.clone(),
656            "workflow_id": bundle.source.workflow_id.clone(),
657            "workflow_name": bundle.source.workflow_name.clone(),
658            "task": bundle.source.task.clone(),
659            "status": bundle.source.status.clone(),
660            "started_at": bundle.source.started_at.clone(),
661            "finished_at": bundle.source.finished_at.clone(),
662            "stages": [],
663            "transitions": bundle.replay.transitions.clone(),
664            "checkpoints": bundle.replay.checkpoints.clone(),
665            "pending_nodes": [],
666            "completed_nodes": [],
667            "child_runs": [],
668            "artifacts": [],
669            "handoffs": [],
670            "policy": {},
671            "transcript": transcript,
672            "usage": bundle.runtime.usage.clone(),
673            "replay_fixture": fixture,
674            "observability": replay_observability,
675            "trace_spans": bundle.replay.trace_spans.clone(),
676            "tool_recordings": bundle.tools.calls.clone(),
677            "hitl_questions": hitl_questions,
678            "persona_runtime": [],
679            "metadata": {
680                "imported_from_session_bundle": bundle.bundle_id.clone(),
681                "session_bundle_schema_version": bundle.schema_version,
682                "worker_snapshot_count": bundle.replay.worker_snapshots.len(),
683            }
684        }));
685    }
686    Err(SessionBundleError::MissingRunRecord)
687}
688
689pub fn import_run_record_value_with_materialized_worker_snapshots(
690    bundle: &SessionBundle,
691    materialized: &[MaterializedWorkerSnapshot],
692) -> Result<JsonValue, SessionBundleError> {
693    let mut run_record = import_run_record_value(bundle)?;
694    apply_materialized_worker_snapshot_paths(&mut run_record, materialized);
695    Ok(run_record)
696}
697
698pub fn materialize_worker_snapshots(
699    bundle: &SessionBundle,
700    out_dir: &Path,
701) -> Result<Vec<MaterializedWorkerSnapshot>, SessionBundleError> {
702    if bundle.replay.worker_snapshots.is_empty() {
703        return Ok(Vec::new());
704    }
705    fs::create_dir_all(out_dir).map_err(|error| {
706        SessionBundleError::Encode(format!(
707            "failed to create worker snapshot directory {}: {error}",
708            out_dir.display()
709        ))
710    })?;
711
712    let mut materialized = Vec::new();
713    for (index, snapshot) in bundle.replay.worker_snapshots.iter().enumerate() {
714        let worker_id = if snapshot.worker_id.trim().is_empty() {
715            format!("worker_{index}")
716        } else {
717            snapshot.worker_id.clone()
718        };
719        let path = out_dir.join(worker_snapshot_file_name(&worker_id, index));
720        let value = worker_snapshot_value_for_import(&snapshot.value, &path);
721        let rendered = serde_json::to_string_pretty(&value)
722            .map(|json| format!("{json}\n"))
723            .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
724        fs::write(&path, rendered).map_err(|error| {
725            SessionBundleError::Encode(format!(
726                "failed to write worker snapshot {}: {error}",
727                path.display()
728            ))
729        })?;
730        materialized.push(MaterializedWorkerSnapshot {
731            worker_id,
732            path: path.to_string_lossy().into_owned(),
733        });
734    }
735    Ok(materialized)
736}
737
738fn apply_materialized_worker_snapshot_paths(
739    run_record: &mut JsonValue,
740    materialized: &[MaterializedWorkerSnapshot],
741) {
742    if materialized.is_empty() {
743        return;
744    }
745
746    let paths_by_worker_id = materialized
747        .iter()
748        .filter(|snapshot| !snapshot.worker_id.is_empty())
749        .map(|snapshot| (snapshot.worker_id.as_str(), snapshot.path.as_str()))
750        .collect::<BTreeMap<_, _>>();
751    if paths_by_worker_id.is_empty() {
752        return;
753    }
754
755    rewrite_worker_snapshot_paths(run_record.get_mut("child_runs"), &paths_by_worker_id);
756    rewrite_worker_snapshot_paths(
757        run_record
758            .get_mut("observability")
759            .and_then(|observability| observability.get_mut("worker_lineage")),
760        &paths_by_worker_id,
761    );
762}
763
764fn rewrite_worker_snapshot_paths(
765    records: Option<&mut JsonValue>,
766    paths_by_worker_id: &BTreeMap<&str, &str>,
767) {
768    let Some(records) = records.and_then(JsonValue::as_array_mut) else {
769        return;
770    };
771    for record in records {
772        let Some(worker_id) = record.get("worker_id").and_then(JsonValue::as_str) else {
773            continue;
774        };
775        let Some(path) = paths_by_worker_id.get(worker_id) else {
776            continue;
777        };
778        if let JsonValue::Object(map) = record {
779            map.insert(
780                "snapshot_path".to_string(),
781                JsonValue::String((*path).to_string()),
782            );
783        }
784    }
785}
786
787fn replay_observability_for_import(replay: &BundleReplay) -> Option<RunObservabilityRecord> {
788    let mut observability = replay.observability.clone().unwrap_or_default();
789    let has_observability = replay.observability.is_some();
790    let has_verification_outcomes = !replay.verification_outcomes.is_empty();
791    if !has_observability && !has_verification_outcomes {
792        return None;
793    }
794    if observability.schema_version == 0 {
795        observability.schema_version = 4;
796    }
797    if observability.verification_outcomes.is_empty() && has_verification_outcomes {
798        observability.verification_outcomes = replay.verification_outcomes.clone();
799    }
800    Some(observability)
801}
802
803pub fn session_bundle_from_agent_session_events(
804    session_id: &str,
805    events: &[AgentSessionReplayEvent],
806) -> Result<SessionBundle, SessionBundleError> {
807    if events.is_empty() {
808        return Err(SessionBundleError::MissingSessionEvents {
809            session_id: session_id.to_string(),
810        });
811    }
812
813    let stable_id = sanitize_topic_component(session_id);
814    let started_at = rfc3339_from_epoch_ms(events[0].occurred_at_ms);
815    // Liveness, not a `completed` default: a stream without a terminal
816    // `SessionClosed` event (a frozen-mid-flight loop, or a time-traveled
817    // prefix) is `suspended`, so `finished_at` stays null and a resume host
818    // continues the pending turn rather than replaying a "finished" run.
819    let liveness = agent_session_liveness(events);
820    let finished_at = match &liveness {
821        AgentSessionLiveness::Closed { finished_at_ms, .. } => {
822            Some(rfc3339_from_epoch_ms(*finished_at_ms))
823        }
824        AgentSessionLiveness::Suspended => None,
825    };
826    let status = liveness.status().to_string();
827    let run_id = session_id.to_string();
828    let workflow_id = "agent_session".to_string();
829    let created_at = finished_at.clone().unwrap_or_else(|| started_at.clone());
830    let transcript_events = transcript_events_from_agent_session(events)?;
831    let transcript_messages = transcript_messages_from_agent_session(events);
832    let mut transcript_metadata = BTreeMap::new();
833    transcript_metadata.insert("session_id".to_string(), json!(session_id));
834    transcript_metadata.insert(
835        "source".to_string(),
836        json!("events.sqlite observability.agent_events topic"),
837    );
838
839    let replay_fixture = ReplayFixture {
840        type_name: "replay_fixture".to_string(),
841        id: format!("fixture_from_session_{stable_id}"),
842        source_run_id: run_id.clone(),
843        workflow_id: workflow_id.clone(),
844        workflow_name: Some(format!("Agent session {session_id}")),
845        created_at: created_at.clone(),
846        eval_kind: Some("replay".to_string()),
847        expected_status: status.clone(),
848        ..ReplayFixture::default()
849    };
850
851    Ok(SessionBundle {
852        bundle_id: format!("bundle_from_session_{stable_id}"),
853        created_at,
854        producer: BundleProducer {
855            name: "harn".to_string(),
856            version: env!("CARGO_PKG_VERSION").to_string(),
857            schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
858        },
859        source: BundleSource {
860            kind: "event_log_session".to_string(),
861            run_record_id: run_id,
862            workflow_id,
863            workflow_name: Some(format!("Agent session {session_id}")),
864            task: task_from_agent_session(events)
865                .unwrap_or_else(|| format!("Agent session {session_id}")),
866            status,
867            started_at,
868            finished_at,
869            ..BundleSource::default()
870        },
871        runtime: BundleRuntime {
872            harn_version: env!("CARGO_PKG_VERSION").to_string(),
873            ..BundleRuntime::default()
874        },
875        transcript: BundleTranscript {
876            sections: vec![BundleTranscriptSection {
877                id: "agent_events".to_string(),
878                label: "Agent event log".to_string(),
879                scope: "session".to_string(),
880                location: format!(
881                    "observability.agent_events.{}",
882                    sanitize_topic_component(session_id)
883                ),
884                summary: None,
885                messages: transcript_messages,
886                events: transcript_events,
887                assets: Vec::new(),
888                metadata: transcript_metadata,
889            }],
890        },
891        permissions: permissions_from_agent_session(events),
892        replay: BundleReplay {
893            replay_fixture: Some(replay_fixture),
894            event_log_pointers: vec![BundleEventLogPointer {
895                kind: "agent_events".to_string(),
896                topic: Some(format!(
897                    "observability.agent_events.{}",
898                    sanitize_topic_component(session_id)
899                )),
900                path: None,
901                location: "events.sqlite".to_string(),
902                available: true,
903            }],
904            deterministic_events: deterministic_events_from_agent_session(events)?,
905            ..BundleReplay::default()
906        },
907        metadata: BTreeMap::from([(
908            SESSION_BUNDLE_LIVENESS_KEY.to_string(),
909            json!(liveness.tag()),
910        )]),
911        ..SessionBundle::default()
912    })
913}
914
915pub fn import_run_record_from_agent_session_events(
916    session_id: &str,
917    events: &[AgentSessionReplayEvent],
918) -> Result<RunRecord, SessionBundleError> {
919    let bundle = session_bundle_from_agent_session_events(session_id, events)?;
920    let run_record = import_run_record_value(&bundle)?;
921    serde_json::from_value(run_record)
922        .map_err(|error| SessionBundleError::Decode(error.to_string()))
923}
924
925fn transcript_events_from_agent_session(
926    events: &[AgentSessionReplayEvent],
927) -> Result<Vec<JsonValue>, SessionBundleError> {
928    events
929        .iter()
930        .map(|entry| {
931            let event = serde_json::to_value(&entry.event)
932                .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
933            Ok(json!({
934                "event_id": entry.event_id,
935                "kind": entry.kind,
936                "occurred_at_ms": entry.occurred_at_ms,
937                "event": event,
938            }))
939        })
940        .collect()
941}
942
943fn transcript_messages_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<JsonValue> {
944    events
945        .iter()
946        .filter_map(|entry| match &entry.event {
947            AgentEvent::UserMessage { content, .. } => Some(json!({
948                "role": "user",
949                "content": content,
950            })),
951            AgentEvent::AgentMessageChunk { content, .. } if !content.is_empty() => Some(json!({
952                "role": "assistant",
953                "content": content,
954            })),
955            _ => None,
956        })
957        .collect()
958}
959
960fn permissions_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<BundlePermission> {
961    let mut permissions = Vec::new();
962    for entry in events {
963        if let AgentEvent::HitlRequested {
964            request_id,
965            kind,
966            payload,
967            ..
968        } = &entry.event
969        {
970            permissions.push(BundlePermission {
971                kind: "hitl_question".to_string(),
972                source: "agent_events".to_string(),
973                request_id: Some(request_id.clone()),
974                agent: None,
975                payload: json!({
976                    "kind": kind,
977                    "payload": payload,
978                    "event_id": entry.event_id,
979                    "occurred_at_ms": entry.occurred_at_ms,
980                }),
981            });
982        }
983    }
984    permissions
985}
986
987fn deterministic_events_from_agent_session(
988    events: &[AgentSessionReplayEvent],
989) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
990    transcript_events_from_agent_session(events).map(|entries| {
991        entries
992            .into_iter()
993            .enumerate()
994            .map(|(index, value)| BundleJsonEntry {
995                source: "events.sqlite.agent_events".to_string(),
996                index,
997                value,
998            })
999            .collect()
1000    })
1001}
1002
1003fn task_from_agent_session(events: &[AgentSessionReplayEvent]) -> Option<String> {
1004    events.iter().find_map(|entry| match &entry.event {
1005        AgentEvent::UserMessage { content, .. } => user_message_text(content),
1006        _ => None,
1007    })
1008}
1009
1010fn user_message_text(content: &[JsonValue]) -> Option<String> {
1011    let parts = content
1012        .iter()
1013        .filter_map(|value| {
1014            value
1015                .get("text")
1016                .and_then(JsonValue::as_str)
1017                .or_else(|| value.as_str())
1018                .map(str::to_string)
1019        })
1020        .filter(|text| !text.trim().is_empty())
1021        .collect::<Vec<_>>();
1022    if parts.is_empty() {
1023        None
1024    } else {
1025        Some(parts.join("\n"))
1026    }
1027}
1028
1029fn rfc3339_from_epoch_ms(ms: i64) -> String {
1030    DateTime::<Utc>::from_timestamp_millis(ms)
1031        .unwrap_or_else(|| DateTime::<Utc>::from_timestamp(0, 0).expect("unix epoch is valid"))
1032        .to_rfc3339_opts(SecondsFormat::Millis, true)
1033}
1034
1035fn raw_bundle_from_run(
1036    run: &RunRecord,
1037    run_record_value: JsonValue,
1038    include_attachments: bool,
1039) -> Result<SessionBundle, SessionBundleError> {
1040    let mut bundle = SessionBundle {
1041        bundle_id: new_id("bundle"),
1042        created_at: now_rfc3339(),
1043        producer: BundleProducer {
1044            name: "harn".to_string(),
1045            version: env!("CARGO_PKG_VERSION").to_string(),
1046            schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
1047        },
1048        source: BundleSource {
1049            kind: "run_record".to_string(),
1050            run_record_id: run.id.clone(),
1051            workflow_id: run.workflow_id.clone(),
1052            workflow_name: run.workflow_name.clone(),
1053            task: run.task.clone(),
1054            status: run.status.clone(),
1055            started_at: run.started_at.clone(),
1056            finished_at: run.finished_at.clone(),
1057            persisted_path: run.persisted_path.clone(),
1058            root_run_id: run.root_run_id.clone(),
1059            parent_run_id: run.parent_run_id.clone(),
1060            child_run_count: run.child_runs.len(),
1061        },
1062        runtime: BundleRuntime {
1063            harn_version: env!("CARGO_PKG_VERSION").to_string(),
1064            provider_models: run
1065                .usage
1066                .as_ref()
1067                .map(|usage| usage.models.clone())
1068                .unwrap_or_default(),
1069            usage: run.usage.as_ref().map(|usage| BundleUsage {
1070                input_tokens: usage.input_tokens,
1071                output_tokens: usage.output_tokens,
1072                call_count: usage.call_count,
1073                total_duration_ms: usage.total_duration_ms,
1074                total_cost: usage.total_cost,
1075                models: usage.models.clone(),
1076            }),
1077            metadata: BTreeMap::new(),
1078        },
1079        workspace: workspace_from_run(run),
1080        transcript: transcript_from_run(run),
1081        tools: BundleTools {
1082            schemas: tool_schema_entries(run),
1083            calls: run
1084                .tool_recordings
1085                .iter()
1086                .map(BundleToolCall::from)
1087                .collect(),
1088        },
1089        permissions: permissions_from_run(run),
1090        replay: BundleReplay {
1091            replay_fixture: run.replay_fixture.clone(),
1092            run_record: Some(run_record_value),
1093            observability: run.observability.clone(),
1094            verification_outcomes: verification_outcomes_for_run(run),
1095            worker_snapshots: worker_snapshots_from_run(run),
1096            event_log_pointers: event_log_pointers_from_run(run),
1097            transitions: run.transitions.clone(),
1098            checkpoints: run.checkpoints.clone(),
1099            trace_spans: run.trace_spans.clone(),
1100            deterministic_events: deterministic_events_from_run(run)?,
1101        },
1102        redaction: RedactionManifest {
1103            mode: "sanitized".to_string(),
1104            policy: "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths"
1105                .to_string(),
1106            placeholder: REDACTED_PLACEHOLDER.to_string(),
1107            entries: Vec::new(),
1108            unsafe_secret_markers_rejected: true,
1109        },
1110        attachments: if include_attachments {
1111            attachments_from_run(run)
1112        } else {
1113            Vec::new()
1114        },
1115        ..SessionBundle::default()
1116    };
1117    bundle.metadata.insert(
1118        "format_note".to_string(),
1119        json!("Session bundles are portable JSON envelopes; hosted share links should reference sanitized bundles rather than raw run records."),
1120    );
1121    Ok(bundle)
1122}
1123
1124fn verification_outcomes_for_run(run: &RunRecord) -> Vec<RunVerificationOutcomeRecord> {
1125    if let Some(observability) = run.observability.as_ref() {
1126        return observability.verification_outcomes.clone();
1127    }
1128    derive_run_observability(run, run.persisted_path.as_deref().map(Path::new))
1129        .verification_outcomes
1130}
1131
1132fn bundle_redaction_policy(base: &RedactionPolicy) -> RedactionPolicy {
1133    base.clone()
1134        .with_extra_field("persisted_path")
1135        .with_extra_field("primary")
1136        .with_extra_field("run_path")
1137        .with_extra_field("snapshot_ref")
1138        .with_extra_field("snapshot_path")
1139        .with_extra_field("source_path")
1140}
1141
1142fn workspace_from_run(run: &RunRecord) -> Option<BundleWorkspace> {
1143    let anchor = run
1144        .transcript
1145        .as_ref()
1146        .and_then(|transcript| transcript.get("metadata"))
1147        .and_then(anchor_from_transcript_metadata_json)?;
1148    Some(BundleWorkspace::from(&anchor))
1149}
1150
1151fn transcript_from_run(run: &RunRecord) -> BundleTranscript {
1152    let mut sections = Vec::new();
1153    if let Some(transcript) = &run.transcript {
1154        sections.push(transcript_section(
1155            "run",
1156            "Run transcript",
1157            "run",
1158            "$.transcript",
1159            transcript,
1160        ));
1161    }
1162    for (index, stage) in run.stages.iter().enumerate() {
1163        if let Some(transcript) = &stage.transcript {
1164            sections.push(transcript_section(
1165                &stage.id,
1166                &format!("Stage {}", stage.node_id),
1167                "stage",
1168                &format!("$.stages[{index}].transcript"),
1169                transcript,
1170            ));
1171        }
1172    }
1173    BundleTranscript { sections }
1174}
1175
1176fn transcript_section(
1177    id: &str,
1178    label: &str,
1179    scope: &str,
1180    location: &str,
1181    transcript: &JsonValue,
1182) -> BundleTranscriptSection {
1183    BundleTranscriptSection {
1184        id: id.to_string(),
1185        label: label.to_string(),
1186        scope: scope.to_string(),
1187        location: location.to_string(),
1188        summary: transcript
1189            .get("summary")
1190            .and_then(JsonValue::as_str)
1191            .map(str::to_string),
1192        messages: json_array(transcript.get("messages")),
1193        events: json_array(transcript.get("events")),
1194        assets: json_array(transcript.get("assets")),
1195        metadata: transcript
1196            .get("metadata")
1197            .and_then(JsonValue::as_object)
1198            .map(|map| {
1199                map.iter()
1200                    .map(|(key, value)| (key.clone(), value.clone()))
1201                    .collect()
1202            })
1203            .unwrap_or_default(),
1204    }
1205}
1206
1207fn json_array(value: Option<&JsonValue>) -> Vec<JsonValue> {
1208    value
1209        .and_then(JsonValue::as_array)
1210        .cloned()
1211        .unwrap_or_default()
1212}
1213
1214fn tool_schema_entries(run: &RunRecord) -> Vec<BundleJsonEntry> {
1215    let mut entries = Vec::new();
1216    collect_tool_schema_entries_from_transcript(&mut entries, "run.transcript", &run.transcript);
1217    for stage in &run.stages {
1218        collect_tool_schema_entries_from_transcript(
1219            &mut entries,
1220            &format!("stage.{}.transcript", stage.node_id),
1221            &stage.transcript,
1222        );
1223        if let Some(tools) = stage
1224            .metadata
1225            .get("tool_schemas")
1226            .or_else(|| stage.metadata.get("tools"))
1227        {
1228            entries.push(BundleJsonEntry {
1229                source: format!("stage.{}.metadata", stage.node_id),
1230                index: entries.len(),
1231                value: tools.clone(),
1232            });
1233        }
1234    }
1235    entries
1236}
1237
1238fn collect_tool_schema_entries_from_transcript(
1239    entries: &mut Vec<BundleJsonEntry>,
1240    source: &str,
1241    transcript: &Option<JsonValue>,
1242) {
1243    let Some(transcript) = transcript else {
1244        return;
1245    };
1246    for event in transcript
1247        .get("events")
1248        .and_then(JsonValue::as_array)
1249        .into_iter()
1250        .flatten()
1251    {
1252        let kind = event
1253            .get("type")
1254            .or_else(|| event.get("kind"))
1255            .and_then(JsonValue::as_str)
1256            .unwrap_or_default();
1257        if kind == "tool_schemas" || kind == "tool_schema" {
1258            entries.push(BundleJsonEntry {
1259                source: source.to_string(),
1260                index: entries.len(),
1261                value: event.clone(),
1262            });
1263        }
1264    }
1265}
1266
1267fn permissions_from_run(run: &RunRecord) -> Vec<BundlePermission> {
1268    let mut permissions = run
1269        .hitl_questions
1270        .iter()
1271        .map(permission_from_hitl_question)
1272        .collect::<Vec<_>>();
1273    collect_permission_events(&mut permissions, "run.transcript", &run.transcript);
1274    for stage in &run.stages {
1275        collect_permission_events(
1276            &mut permissions,
1277            &format!("stage.{}.transcript", stage.node_id),
1278            &stage.transcript,
1279        );
1280        if let Some(worker) = stage.metadata.get("worker") {
1281            if let Some(policy) = worker
1282                .get("audit")
1283                .and_then(|audit| audit.get("approval_policy"))
1284            {
1285                permissions.push(BundlePermission {
1286                    kind: "approval_policy".to_string(),
1287                    source: format!("stage.{}.worker.audit", stage.node_id),
1288                    request_id: None,
1289                    agent: worker
1290                        .get("name")
1291                        .and_then(JsonValue::as_str)
1292                        .map(str::to_string),
1293                    payload: policy.clone(),
1294                });
1295            }
1296        }
1297    }
1298    permissions
1299}
1300
1301fn permission_from_hitl_question(question: &RunHitlQuestionRecord) -> BundlePermission {
1302    BundlePermission {
1303        kind: "hitl_question".to_string(),
1304        source: "run.hitl_questions".to_string(),
1305        request_id: Some(question.request_id.clone()),
1306        agent: if question.agent.is_empty() {
1307            None
1308        } else {
1309            Some(question.agent.clone())
1310        },
1311        payload: serde_json::to_value(question).unwrap_or(JsonValue::Null),
1312    }
1313}
1314
1315fn collect_permission_events(
1316    permissions: &mut Vec<BundlePermission>,
1317    source: &str,
1318    transcript: &Option<JsonValue>,
1319) {
1320    let Some(transcript) = transcript else {
1321        return;
1322    };
1323    for event in transcript
1324        .get("events")
1325        .and_then(JsonValue::as_array)
1326        .into_iter()
1327        .flatten()
1328    {
1329        let kind = event
1330            .get("type")
1331            .or_else(|| event.get("kind"))
1332            .and_then(JsonValue::as_str)
1333            .unwrap_or_default();
1334        if kind.contains("permission") || kind.contains("approval") || kind.starts_with("hitl_") {
1335            permissions.push(BundlePermission {
1336                kind: kind.to_string(),
1337                source: source.to_string(),
1338                request_id: event
1339                    .get("request_id")
1340                    .or_else(|| event.get("id"))
1341                    .and_then(JsonValue::as_str)
1342                    .map(str::to_string),
1343                agent: event
1344                    .get("agent")
1345                    .and_then(JsonValue::as_str)
1346                    .map(str::to_string),
1347                payload: event.clone(),
1348            });
1349        }
1350    }
1351}
1352
1353fn event_log_pointers_from_run(run: &RunRecord) -> Vec<BundleEventLogPointer> {
1354    let mut pointers = Vec::new();
1355    if let Some(observability) = &run.observability {
1356        for pointer in &observability.transcript_pointers {
1357            pointers.push(BundleEventLogPointer {
1358                kind: pointer.kind.clone(),
1359                topic: None,
1360                path: pointer.path.clone(),
1361                location: pointer.location.clone(),
1362                available: pointer.available,
1363            });
1364        }
1365        for worker in &observability.worker_lineage {
1366            if let Some(session_id) = &worker.session_id {
1367                pointers.push(BundleEventLogPointer {
1368                    kind: "agent_events".to_string(),
1369                    topic: Some(format!("observability.agent_events.{session_id}")),
1370                    path: worker.snapshot_path.clone(),
1371                    location: format!("worker.{}.session", worker.worker_id),
1372                    available: worker.snapshot_path.is_some(),
1373                });
1374            }
1375        }
1376    }
1377    pointers
1378}
1379
1380fn worker_snapshots_from_run(run: &RunRecord) -> Vec<BundleWorkerSnapshot> {
1381    let mut snapshots = Vec::new();
1382    let mut seen_paths = BTreeSet::new();
1383    for child in &run.child_runs {
1384        let Some(path) = child.snapshot_path.as_deref() else {
1385            continue;
1386        };
1387        if !seen_paths.insert(path.to_string()) {
1388            continue;
1389        }
1390        if let Some(snapshot) = worker_snapshot_from_path(
1391            &child.worker_id,
1392            &child.worker_name,
1393            &child.status,
1394            Path::new(path),
1395        ) {
1396            snapshots.push(snapshot);
1397        }
1398    }
1399    if let Some(observability) = run.observability.as_ref() {
1400        for worker in &observability.worker_lineage {
1401            let Some(path) = worker.snapshot_path.as_deref() else {
1402                continue;
1403            };
1404            if !seen_paths.insert(path.to_string()) {
1405                continue;
1406            }
1407            if let Some(snapshot) = worker_snapshot_from_path(
1408                &worker.worker_id,
1409                &worker.worker_name,
1410                &worker.status,
1411                Path::new(path),
1412            ) {
1413                snapshots.push(snapshot);
1414            }
1415        }
1416    }
1417    snapshots
1418}
1419
1420fn worker_snapshot_from_path(
1421    worker_id: &str,
1422    worker_name: &str,
1423    status: &str,
1424    path: &Path,
1425) -> Option<BundleWorkerSnapshot> {
1426    let content = fs::read_to_string(path).ok()?;
1427    let value = serde_json::from_str::<JsonValue>(&content).ok()?;
1428    Some(BundleWorkerSnapshot {
1429        worker_id: if worker_id.is_empty() {
1430            value
1431                .get("id")
1432                .and_then(JsonValue::as_str)
1433                .unwrap_or_default()
1434                .to_string()
1435        } else {
1436            worker_id.to_string()
1437        },
1438        worker_name: if worker_name.is_empty() {
1439            value
1440                .get("name")
1441                .and_then(JsonValue::as_str)
1442                .unwrap_or("worker")
1443                .to_string()
1444        } else {
1445            worker_name.to_string()
1446        },
1447        status: if status.is_empty() {
1448            value
1449                .get("status")
1450                .and_then(JsonValue::as_str)
1451                .unwrap_or_default()
1452                .to_string()
1453        } else {
1454            status.to_string()
1455        },
1456        snapshot_ref: value
1457            .get("suspension")
1458            .and_then(|value| value.get("snapshot_ref"))
1459            .and_then(JsonValue::as_str)
1460            .or_else(|| value.get("snapshot_path").and_then(JsonValue::as_str))
1461            .unwrap_or_else(|| path.to_str().unwrap_or_default())
1462            .to_string(),
1463        source_path: Some(path.to_string_lossy().into_owned()),
1464        value,
1465    })
1466}
1467
1468fn worker_snapshot_file_name(worker_id: &str, index: usize) -> String {
1469    let component = sanitize_topic_component(worker_id);
1470    let component = if component.is_empty() {
1471        format!("worker_{index}")
1472    } else {
1473        component
1474    };
1475    format!("{component}.json")
1476}
1477
1478fn worker_snapshot_value_for_import(value: &JsonValue, path: &Path) -> JsonValue {
1479    let mut value = value.clone();
1480    let path = path.to_string_lossy().into_owned();
1481    if let JsonValue::Object(map) = &mut value {
1482        map.insert("snapshot_path".to_string(), JsonValue::String(path.clone()));
1483        if let Some(JsonValue::Object(suspension)) = map.get_mut("suspension") {
1484            suspension.insert("snapshot_ref".to_string(), JsonValue::String(path));
1485        }
1486    }
1487    value
1488}
1489
1490fn deterministic_events_from_run(
1491    run: &RunRecord,
1492) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
1493    let mut events = Vec::new();
1494    for (index, transition) in run.transitions.iter().enumerate() {
1495        events.push(BundleJsonEntry {
1496            source: "run.transitions".to_string(),
1497            index,
1498            value: serde_json::to_value(transition)
1499                .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1500        });
1501    }
1502    for (index, checkpoint) in run.checkpoints.iter().enumerate() {
1503        events.push(BundleJsonEntry {
1504            source: "run.checkpoints".to_string(),
1505            index,
1506            value: serde_json::to_value(checkpoint)
1507                .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1508        });
1509    }
1510    Ok(events)
1511}
1512
1513fn attachments_from_run(run: &RunRecord) -> Vec<BundleAttachment> {
1514    run.artifacts
1515        .iter()
1516        .map(|artifact| BundleAttachment {
1517            id: artifact.id.clone(),
1518            kind: artifact.kind.clone(),
1519            title: artifact.title.clone(),
1520            stage: artifact.stage.clone(),
1521            text: artifact.text.clone(),
1522            data: artifact.data.clone(),
1523            metadata: artifact.metadata.clone(),
1524        })
1525        .collect()
1526}
1527
1528fn redact_json_with_manifest(
1529    value: &mut JsonValue,
1530    path: &str,
1531    policy: &RedactionPolicy,
1532    entries: &mut Vec<RedactionEntry>,
1533) {
1534    match value {
1535        JsonValue::Object(map) => {
1536            let keys = map.keys().cloned().collect::<Vec<_>>();
1537            for key in keys {
1538                let child_path = json_path_child(path, &key);
1539                if policy.field_is_sensitive(&key) {
1540                    map.insert(key, JsonValue::String(REDACTED_PLACEHOLDER.to_string()));
1541                    entries.push(RedactionEntry {
1542                        path: child_path,
1543                        class: "sensitive_field".to_string(),
1544                        action: "replaced".to_string(),
1545                        replacement: Some(REDACTED_PLACEHOLDER.to_string()),
1546                    });
1547                } else if let Some(child) = map.get_mut(&key) {
1548                    redact_json_with_manifest(child, &child_path, policy, entries);
1549                }
1550            }
1551        }
1552        JsonValue::Array(items) => {
1553            for (index, item) in items.iter_mut().enumerate() {
1554                redact_json_with_manifest(item, &format!("{path}[{index}]"), policy, entries);
1555            }
1556        }
1557        JsonValue::String(text) => {
1558            let redacted = policy.redact_string(text);
1559            if let Cow::Owned(replacement) = redacted {
1560                // Record the actual replacement string (now a named
1561                // `<redacted:<pattern>:<len>>` placeholder from the
1562                // OA-06 catalog) in the manifest so audit consumers
1563                // can attribute the leak to a specific provider.
1564                let manifest_replacement = replacement.clone();
1565                *text = replacement;
1566                entries.push(RedactionEntry {
1567                    path: path.to_string(),
1568                    class: "secret_pattern_or_url".to_string(),
1569                    action: "replaced".to_string(),
1570                    replacement: Some(manifest_replacement),
1571                });
1572            }
1573        }
1574        _ => {}
1575    }
1576}
1577
1578fn redact_bundle_pointer_paths_json(
1579    value: &mut JsonValue,
1580    path: &str,
1581    entries: &mut Vec<RedactionEntry>,
1582) {
1583    match value {
1584        JsonValue::Object(map) => {
1585            let keys = map.keys().cloned().collect::<Vec<_>>();
1586            for key in keys {
1587                let child_path = json_path_child(path, &key);
1588                if key == "path" && bundle_pointer_path_should_redact(&child_path) {
1589                    if !map.get(&key).is_some_and(JsonValue::is_null) {
1590                        map.insert(key, JsonValue::String(REDACTED_PLACEHOLDER.to_string()));
1591                        entries.push(RedactionEntry {
1592                            path: child_path,
1593                            class: "local_pointer_path".to_string(),
1594                            action: "replaced".to_string(),
1595                            replacement: Some(REDACTED_PLACEHOLDER.to_string()),
1596                        });
1597                    }
1598                } else if let Some(child) = map.get_mut(&key) {
1599                    redact_bundle_pointer_paths_json(child, &child_path, entries);
1600                }
1601            }
1602        }
1603        JsonValue::Array(items) => {
1604            for (index, item) in items.iter_mut().enumerate() {
1605                redact_bundle_pointer_paths_json(item, &format!("{path}[{index}]"), entries);
1606            }
1607        }
1608        _ => {}
1609    }
1610}
1611
/// Reports whether a JSON path lies inside an `event_log_pointers` or
/// `transcript_pointers` array element.
///
/// The check is textual: the path must contain the member name preceded by
/// `.` and followed by `[`.
fn bundle_pointer_path_should_redact(path: &str) -> bool {
    const POINTER_ARRAY_MARKERS: [&str; 2] = [".event_log_pointers[", ".transcript_pointers["];
    POINTER_ARRAY_MARKERS
        .iter()
        .any(|marker| path.contains(*marker))
}
1615
1616fn withhold_replay_only_json(value: &mut JsonValue, path: &str, entries: &mut Vec<RedactionEntry>) {
1617    match value {
1618        JsonValue::Object(map) => {
1619            let keys = map.keys().cloned().collect::<Vec<_>>();
1620            for key in keys {
1621                let child_path = json_path_child(path, &key);
1622                if replay_only_field_is_prompt_payload(&key) {
1623                    if !map.get(&key).is_some_and(JsonValue::is_null) {
1624                        map.insert(key, JsonValue::String(REPLAY_ONLY_PLACEHOLDER.to_string()));
1625                        entries.push(RedactionEntry {
1626                            path: child_path,
1627                            class: "prompt_or_tool_payload".to_string(),
1628                            action: "withheld".to_string(),
1629                            replacement: Some(REPLAY_ONLY_PLACEHOLDER.to_string()),
1630                        });
1631                    }
1632                } else if let Some(child) = map.get_mut(&key) {
1633                    withhold_replay_only_json(child, &child_path, entries);
1634                }
1635            }
1636        }
1637        JsonValue::Array(items) => {
1638            for (index, item) in items.iter_mut().enumerate() {
1639                withhold_replay_only_json(item, &format!("{path}[{index}]"), entries);
1640            }
1641        }
1642        _ => {}
1643    }
1644}
1645
/// Returns `true` when `key` is one of the object member names whose value is
/// withheld from replay-only output.
///
/// Matching is exact and case-sensitive against the fixed list below.
fn replay_only_field_is_prompt_payload(key: &str) -> bool {
    const WITHHELD_KEYS: &[&str] = &[
        "args",
        "arguments",
        "blocks",
        "content",
        "data",
        "private_reasoning",
        "prompt",
        "raw_input",
        "raw_output",
        "result",
        "response_text",
        "summary",
        "system",
        "system_prompt",
        "task",
        "text",
        "thinking",
        "visible_text",
    ];
    WITHHELD_KEYS.contains(&key)
}
1669
1670fn set_json_path(value: &mut JsonValue, path: &[&str], replacement: JsonValue) {
1671    let Some((head, tail)) = path.split_first() else {
1672        *value = replacement;
1673        return;
1674    };
1675    if tail.is_empty() {
1676        if let JsonValue::Object(map) = value {
1677            map.insert((*head).to_string(), replacement);
1678        }
1679        return;
1680    }
1681    if let JsonValue::Object(map) = value {
1682        if let Some(child) = map.get_mut(*head) {
1683            set_json_path(child, tail, replacement);
1684        }
1685    }
1686}
1687
1688fn require_field(value: &JsonValue, field: &str) -> Result<(), SessionBundleError> {
1689    if value.get(field).is_some() {
1690        Ok(())
1691    } else {
1692        Err(SessionBundleError::MissingRequired(format!("$.{field}")))
1693    }
1694}
1695
1696fn require_nested_field(value: &JsonValue, path: &[&str]) -> Result<(), SessionBundleError> {
1697    let mut current = value;
1698    for segment in path {
1699        current = current
1700            .get(*segment)
1701            .ok_or_else(|| SessionBundleError::MissingRequired(json_path_from_segments(path)))?;
1702    }
1703    Ok(())
1704}
1705
1706fn json_path_from_segments(path: &[&str]) -> String {
1707    path.iter().fold("$".to_string(), |parent, segment| {
1708        json_path_child(&parent, segment)
1709    })
1710}
1711
1712fn reject_unredacted_secret_markers(
1713    value: &JsonValue,
1714    path: &str,
1715    policy: &RedactionPolicy,
1716) -> Result<(), SessionBundleError> {
1717    match value {
1718        JsonValue::Object(map) => {
1719            for (key, child) in map {
1720                reject_unredacted_secret_markers(child, &json_path_child(path, key), policy)?;
1721            }
1722        }
1723        JsonValue::Array(items) => {
1724            for (index, item) in items.iter().enumerate() {
1725                reject_unredacted_secret_markers(item, &format!("{path}[{index}]"), policy)?;
1726            }
1727        }
1728        JsonValue::String(text) => {
1729            if matches!(policy.redact_string(text), Cow::Owned(_)) {
1730                return Err(SessionBundleError::UnsafeSecretMarker {
1731                    path: path.to_string(),
1732                    excerpt: secret_excerpt(text),
1733                });
1734            }
1735        }
1736        _ => {}
1737    }
1738    Ok(())
1739}
1740
/// Returns at most the first 80 characters of `text`, followed by `...` when
/// anything was cut off.
///
/// Truncation counts Unicode scalar values (`char`s), so the cut never splits
/// a multi-byte character. Strings of 80 characters or fewer are returned
/// unchanged.
// NOTE(review): the excerpt is copied verbatim from the input, so it can
// contain the very material that triggered the error — confirm callers treat
// the resulting error as sensitive.
fn secret_excerpt(text: &str) -> String {
    const MAX_EXCERPT_CHARS: usize = 80;
    // The byte offset of the 81st character (if any) is exactly where the
    // excerpt ends. Looking it up touches at most 81 characters instead of
    // counting the whole string.
    match text.char_indices().nth(MAX_EXCERPT_CHARS) {
        Some((cut, _)) => format!("{}...", &text[..cut]),
        None => text.to_owned(),
    }
}
1749
1750fn json_path_child(parent: &str, key: &str) -> String {
1751    if key
1752        .chars()
1753        .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
1754    {
1755        format!("{parent}.{key}")
1756    } else {
1757        format!(
1758            "{parent}[{}]",
1759            serde_json::to_string(key).unwrap_or_default()
1760        )
1761    }
1762}