1use std::borrow::Cow;
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12use std::fs;
13use std::path::Path;
14
15use chrono::{DateTime, SecondsFormat, Utc};
16use serde::{Deserialize, Serialize};
17use serde_json::{json, Value as JsonValue};
18
19use crate::agent_events::AgentEvent;
20use crate::event_log::sanitize_topic_component;
21use crate::orchestration::{
22 derive_run_observability, new_id, now_rfc3339, AgentSessionReplayEvent, ReplayFixture,
23 RunCheckpointRecord, RunChildRecord, RunExecutionRecord, RunHitlQuestionRecord,
24 RunObservabilityRecord, RunRecord, RunTraceSpanRecord, RunTransitionRecord,
25 RunVerificationOutcomeRecord, RunWorkerLineageRecord, ToolCallRecord,
26};
27use crate::redact::{RedactionPolicy, REDACTED_PLACEHOLDER};
28use crate::workspace_anchor::{anchor_from_transcript_metadata_json, MountedRoot, WorkspaceAnchor};
29
30mod schema;
31pub use schema::{session_bundle_schema, session_bundle_schema_pretty};
32
33#[cfg(test)]
34mod tests;
35
/// Value expected in a bundle's `_type` field; validation rejects any other value.
pub const SESSION_BUNDLE_TYPE: &str = "harn_session_bundle";
/// Schema version written by default; validation accepts versions `1..=` this value.
pub const SESSION_BUNDLE_SCHEMA_VERSION: u32 = 1;
/// Identifier (URL) of the version 1 session bundle JSON schema.
pub const SESSION_BUNDLE_SCHEMA_ID: &str = "https://harnlang.com/schemas/session-bundle.v1.json";
/// Placeholder text for replay-only exports.
/// NOTE(review): presumably substituted for withheld values by the replay-only
/// export pass; the usage is outside this section, so confirm.
pub const REPLAY_ONLY_PLACEHOLDER: &str = "[withheld]";

/// Status string reported by [`AgentSessionLiveness::Suspended`].
pub const SESSION_BUNDLE_STATUS_SUSPENDED: &str = "suspended";

/// Status used for a closed session whose close event carries an empty status.
pub const SESSION_BUNDLE_STATUS_COMPLETED: &str = "completed";

/// Key name for session liveness information.
/// NOTE(review): the usage is outside this section; presumably a metadata key, so confirm.
pub const SESSION_BUNDLE_LIVENESS_KEY: &str = "session_liveness";
56
/// Whether an agent session's event stream ended with an explicit close.
///
/// Computed by [`agent_session_liveness`] from a session's replay events.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AgentSessionLiveness {
    /// A `SessionClosed` event was found. `status` is the status carried by that
    /// event (or [`SESSION_BUNDLE_STATUS_COMPLETED`] when it was empty) and
    /// `finished_at_ms` is that event's `occurred_at_ms`.
    Closed { status: String, finished_at_ms: i64 },
    /// No `SessionClosed` event was found in the event list.
    Suspended,
}
74
75impl AgentSessionLiveness {
76 pub fn status(&self) -> &str {
78 match self {
79 AgentSessionLiveness::Closed { status, .. } => status,
80 AgentSessionLiveness::Suspended => SESSION_BUNDLE_STATUS_SUSPENDED,
81 }
82 }
83
84 pub fn tag(&self) -> &'static str {
86 match self {
87 AgentSessionLiveness::Closed { .. } => "closed",
88 AgentSessionLiveness::Suspended => "suspended",
89 }
90 }
91
92 pub fn is_suspended(&self) -> bool {
94 matches!(self, AgentSessionLiveness::Suspended)
95 }
96}
97
98pub fn agent_session_liveness(events: &[AgentSessionReplayEvent]) -> AgentSessionLiveness {
104 events
105 .iter()
106 .rev()
107 .find_map(|entry| match &entry.event {
108 AgentEvent::SessionClosed { status, .. } => Some(AgentSessionLiveness::Closed {
109 status: if status.is_empty() {
110 SESSION_BUNDLE_STATUS_COMPLETED.to_string()
111 } else {
112 status.clone()
113 },
114 finished_at_ms: entry.occurred_at_ms,
115 }),
116 _ => None,
117 })
118 .unwrap_or(AgentSessionLiveness::Suspended)
119}
120
/// How much of a run is exposed when exporting a session bundle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionBundleExportMode {
    Local,
    Sanitized,
    ReplayOnly,
}

impl SessionBundleExportMode {
    /// Returns the stable lowercase name of the mode, as recorded in the
    /// bundle's redaction manifest.
    pub fn as_str(self) -> &'static str {
        match self {
            SessionBundleExportMode::ReplayOnly => "replay_only",
            SessionBundleExportMode::Sanitized => "sanitized",
            SessionBundleExportMode::Local => "local",
        }
    }
}
137
/// Options controlling [`export_run_record_bundle`].
#[derive(Clone, Debug)]
pub struct SessionBundleExportOptions {
    /// Export mode; every mode other than `Local` runs the redaction passes.
    pub mode: SessionBundleExportMode,
    /// Forwarded to the raw bundle builder to decide whether attachments are included.
    pub include_attachments: bool,
    /// Base redaction policy from which the bundle redaction policy is derived
    /// in non-`Local` modes.
    pub redaction_policy: RedactionPolicy,
}
144
145impl Default for SessionBundleExportOptions {
146 fn default() -> Self {
147 Self {
148 mode: SessionBundleExportMode::Sanitized,
149 include_attachments: false,
150 redaction_policy: RedactionPolicy::default(),
151 }
152 }
153}
154
/// Options controlling [`validate_session_bundle_value`].
#[derive(Clone, Debug, Default)]
pub struct SessionBundleValidationOptions {
    /// When `true`, validation skips the scan for unredacted secret markers.
    pub allow_unsafe_secret_markers: bool,
    /// Policy handed to the unredacted-secret-marker scan.
    pub redaction_policy: RedactionPolicy,
}
160
/// Top-level session bundle document.
///
/// Because of `#[serde(default)]`, any field missing during deserialization
/// takes the value from this type's `Default` implementation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct SessionBundle {
    /// Document type marker, serialized under the JSON key `_type`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Schema version of this document.
    pub schema_version: u32,
    pub bundle_id: String,
    pub created_at: String,
    pub producer: BundleProducer,
    pub source: BundleSource,
    pub runtime: BundleRuntime,
    /// Workspace description; `None` when not recorded.
    pub workspace: Option<BundleWorkspace>,
    pub transcript: BundleTranscript,
    pub tools: BundleTools,
    pub permissions: Vec<BundlePermission>,
    pub replay: BundleReplay,
    /// Manifest describing the redaction applied during export.
    pub redaction: RedactionManifest,
    pub attachments: Vec<BundleAttachment>,
    /// Free-form metadata, kept in key order by the `BTreeMap`.
    pub metadata: BTreeMap<String, JsonValue>,
}
181
182impl Default for SessionBundle {
183 fn default() -> Self {
184 Self {
185 type_name: SESSION_BUNDLE_TYPE.to_string(),
186 schema_version: SESSION_BUNDLE_SCHEMA_VERSION,
187 bundle_id: String::new(),
188 created_at: String::new(),
189 producer: BundleProducer::default(),
190 source: BundleSource::default(),
191 runtime: BundleRuntime::default(),
192 workspace: None,
193 transcript: BundleTranscript::default(),
194 tools: BundleTools::default(),
195 permissions: Vec::new(),
196 replay: BundleReplay::default(),
197 redaction: RedactionManifest::default(),
198 attachments: Vec::new(),
199 metadata: BTreeMap::new(),
200 }
201 }
202}
203
/// Identifies what produced a bundle. Missing fields deserialize to empty strings.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleProducer {
    pub name: String,
    pub version: String,
    /// Identifier of the schema the producer wrote against.
    pub schema_id: String,
}
211
/// Describes the run a bundle was exported from.
///
/// When a bundle without an embedded run record is imported, the run record's
/// `id`, `workflow_id`, `workflow_name`, `task`, `status`, `started_at` and
/// `finished_at` are taken from this struct.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleSource {
    pub kind: String,
    /// Becomes the `id` of a run record rebuilt on import.
    pub run_record_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub persisted_path: Option<String>,
    pub root_run_id: Option<String>,
    pub parent_run_id: Option<String>,
    pub child_run_count: usize,
}
228
/// Runtime information recorded alongside a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleRuntime {
    pub harn_version: String,
    pub provider_models: Vec<String>,
    /// Usage totals; copied into the `usage` field of a run record rebuilt on import.
    pub usage: Option<BundleUsage>,
    pub metadata: BTreeMap<String, JsonValue>,
}
237
/// Aggregated usage counters. Missing fields deserialize to zero / empty.
///
/// Derives only `PartialEq` (not `Eq`) because `total_cost` is an `f64`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleUsage {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub call_count: i64,
    pub total_duration_ms: i64,
    pub total_cost: f64,
    pub models: Vec<String>,
}
248
/// Workspace description stored in a bundle.
///
/// Can be built from a `WorkspaceAnchor` via `From<&WorkspaceAnchor>`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleWorkspace {
    /// Primary workspace path rendered as a (lossy) string.
    pub primary: Option<String>,
    /// Additional mounted roots; an absent key deserializes to an empty list.
    #[serde(default)]
    pub additional_roots: Vec<BundleMountedRoot>,
    pub anchored_at: Option<String>,
    /// Policy label; the anchor conversion sets it to `"safe_identity_only"`.
    pub policy: String,
}
264
/// String-only form of a `MountedRoot`, suitable for serialization.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleMountedRoot {
    /// Root path rendered as a (lossy) string.
    pub path: String,
    /// Mount mode rendered through the source mode's `as_str()`.
    pub mount_mode: String,
    pub mounted_at: String,
}
272
273impl From<&MountedRoot> for BundleMountedRoot {
274 fn from(root: &MountedRoot) -> Self {
275 Self {
276 path: root.path.to_string_lossy().into_owned(),
277 mount_mode: root.mount_mode.as_str().to_string(),
278 mounted_at: root.mounted_at.clone(),
279 }
280 }
281}
282
283impl From<&WorkspaceAnchor> for BundleWorkspace {
284 fn from(anchor: &WorkspaceAnchor) -> Self {
285 Self {
286 primary: Some(anchor.primary.to_string_lossy().into_owned()),
287 additional_roots: anchor.additional_roots.iter().map(Into::into).collect(),
288 anchored_at: Some(anchor.anchored_at.clone()),
289 policy: "safe_identity_only".to_string(),
290 }
291 }
292}
293
/// Transcript content of a bundle, split into sections.
///
/// Import uses only the first section when it has to rebuild a run record's
/// transcript.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscript {
    pub sections: Vec<BundleTranscriptSection>,
}
299
/// One section of a bundle transcript.
///
/// `messages`, `events`, `assets`, `summary` and `metadata` are the parts copied
/// into a transcript object when a run record is rebuilt on import.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscriptSection {
    pub id: String,
    pub label: String,
    pub scope: String,
    pub location: String,
    pub summary: Option<String>,
    /// Messages kept as raw JSON values.
    pub messages: Vec<JsonValue>,
    /// Events kept as raw JSON values.
    pub events: Vec<JsonValue>,
    /// Assets kept as raw JSON values.
    pub assets: Vec<JsonValue>,
    pub metadata: BTreeMap<String, JsonValue>,
}
313
/// Tool information carried by a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTools {
    /// Tool schemas as indexed raw JSON entries.
    pub schemas: Vec<BundleJsonEntry>,
    /// Recorded tool calls; exported as `tool_recordings` when a run record is
    /// rebuilt on import.
    pub calls: Vec<BundleToolCall>,
}
320
/// A raw JSON value tagged with a source label and an index.
/// NOTE(review): `index` is presumably the position of the value within
/// `source`; confirm against the code that builds these entries.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleJsonEntry {
    pub source: String,
    pub index: usize,
    pub value: JsonValue,
}
328
/// Bundle form of a recorded tool call.
///
/// Every field is a direct copy of the same-named field on `ToolCallRecord`
/// (see the `From<&ToolCallRecord>` conversion).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleToolCall {
    pub tool_name: String,
    pub tool_use_id: String,
    pub args_hash: String,
    pub result: String,
    pub is_rejected: bool,
    pub duration_ms: u64,
    pub iteration: usize,
    pub timestamp: String,
}
341
342impl From<&ToolCallRecord> for BundleToolCall {
343 fn from(record: &ToolCallRecord) -> Self {
344 Self {
345 tool_name: record.tool_name.clone(),
346 tool_use_id: record.tool_use_id.clone(),
347 args_hash: record.args_hash.clone(),
348 result: record.result.clone(),
349 is_rejected: record.is_rejected,
350 duration_ms: record.duration_ms,
351 iteration: record.iteration,
352 timestamp: record.timestamp.clone(),
353 }
354 }
355}
356
/// A permission-related record carried by a bundle.
///
/// Entries whose `kind` is `"hitl_question"` have their `payload` exported as
/// `hitl_questions` when a run record is rebuilt on import.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundlePermission {
    pub kind: String,
    pub source: String,
    pub request_id: Option<String>,
    pub agent: Option<String>,
    /// Raw JSON payload of the record.
    pub payload: JsonValue,
}
366
/// Replay data carried by a bundle.
///
/// Import prefers `run_record`; when it is absent a run record is rebuilt
/// around `replay_fixture`. With neither present, import fails.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleReplay {
    pub replay_fixture: Option<ReplayFixture>,
    /// Embedded run record as raw JSON.
    pub run_record: Option<JsonValue>,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observability: Option<RunObservabilityRecord>,
    /// Used on import to fill the observability record's outcomes when that
    /// record has none.
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    /// Omitted from the serialized output when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub worker_snapshots: Vec<BundleWorkerSnapshot>,
    pub event_log_pointers: Vec<BundleEventLogPointer>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub deterministic_events: Vec<BundleJsonEntry>,
}
383
/// A worker snapshot embedded in a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleWorkerSnapshot {
    /// Worker identifier; when blank, materialization substitutes `worker_{index}`.
    pub worker_id: String,
    pub worker_name: String,
    pub status: String,
    pub snapshot_ref: String,
    pub source_path: Option<String>,
    /// Snapshot content as raw JSON; this is what gets written to disk on
    /// materialization (after import-time adjustment).
    pub value: JsonValue,
}
394
/// Result of writing one embedded worker snapshot to disk: the worker it
/// belongs to and the file path it was written to.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MaterializedWorkerSnapshot {
    pub worker_id: String,
    /// Path of the written file, rendered as a (lossy) string.
    pub path: String,
}
401
/// Pointer to an event log associated with a bundle.
/// NOTE(review): field semantics are not visible in this section; presumably
/// `available` states whether the referenced log can be read. Confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleEventLogPointer {
    pub kind: String,
    pub topic: Option<String>,
    pub path: Option<String>,
    pub location: String,
    pub available: bool,
}
411
/// Records how a bundle was redacted during export.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RedactionManifest {
    /// Export mode name (`SessionBundleExportMode::as_str`).
    pub mode: String,
    /// Policy label; export writes `"none"` for local exports.
    pub policy: String,
    /// Placeholder string recorded for redacted values.
    pub placeholder: String,
    /// Entries appended by the redaction passes.
    pub entries: Vec<RedactionEntry>,
    /// Export sets this to `true` for every mode except `Local`.
    pub unsafe_secret_markers_rejected: bool,
}
421
/// One entry of a [`RedactionManifest`].
/// NOTE(review): entries are produced by redaction helpers outside this
/// section; presumably `path` is the JSON path that was changed. Confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RedactionEntry {
    pub path: String,
    pub class: String,
    pub action: String,
    pub replacement: Option<String>,
}
430
/// An attachment carried by a bundle, with optional text and/or raw JSON data.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleAttachment {
    pub id: String,
    pub kind: String,
    pub title: Option<String>,
    pub stage: Option<String>,
    pub text: Option<String>,
    pub data: Option<JsonValue>,
    pub metadata: BTreeMap<String, JsonValue>,
}
442
/// Errors reported while exporting, validating, importing, or materializing
/// session bundles.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionBundleError {
    /// Input could not be read or decoded; carries the underlying message.
    Decode(String),
    /// Output could not be encoded or written; carries the underlying message.
    Encode(String),
    /// A required field is absent; carries the field's path.
    MissingRequired(String),
    /// `schema_version` is zero or newer than this build supports.
    UnsupportedSchemaVersion { found: u64, supported: u32 },
    /// A field has the wrong type or value; `expected` describes what is required.
    InvalidType { path: String, expected: String },
    /// A worker snapshot's status does not allow checkpointing.
    UnsupportedCheckpointState { status: String },
    /// An unredacted secret marker was found at `path`.
    UnsafeSecretMarker { path: String, excerpt: String },
    /// The bundle holds neither a run record nor a replay fixture.
    MissingRunRecord,
    /// No replayable events exist for the given session.
    MissingSessionEvents { session_id: String },
}

impl fmt::Display for SessionBundleError {
    /// Renders the human-readable message for each error variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::Decode(error) => format!("failed to decode session bundle: {error}"),
            Self::Encode(error) => format!("failed to encode session bundle: {error}"),
            Self::MissingRequired(path) => {
                format!("session bundle is missing required field {path}")
            }
            Self::UnsupportedSchemaVersion { found, supported } => format!(
                "unsupported session bundle schema_version {found}; this build supports <= {supported}"
            ),
            Self::InvalidType { path, expected } => {
                format!("session bundle field {path} must be {expected}")
            }
            Self::UnsupportedCheckpointState { status } => format!(
                "worker snapshot status {status:?} is not checkpointable; suspend the worker at a turn boundary first"
            ),
            Self::UnsafeSecretMarker { path, excerpt } => format!(
                "session bundle contains an unsafe unredacted secret marker at {path}: {excerpt}"
            ),
            Self::MissingRunRecord => {
                "session bundle does not include an importable run record".to_string()
            }
            Self::MissingSessionEvents { session_id } => format!(
                "event log does not contain replayable events for session_id {session_id:?}"
            ),
        };
        f.write_str(&message)
    }
}

impl std::error::Error for SessionBundleError {}
489
490pub fn export_run_record_bundle(
491 run: &RunRecord,
492 options: &SessionBundleExportOptions,
493) -> Result<SessionBundle, SessionBundleError> {
494 let run_record_value =
495 serde_json::to_value(run).map_err(|error| SessionBundleError::Encode(error.to_string()))?;
496 let mut bundle = raw_bundle_from_run(run, run_record_value, options.include_attachments)?;
497 let mut bundle_value = serde_json::to_value(&bundle)
498 .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
499
500 let mut manifest = RedactionManifest {
501 mode: options.mode.as_str().to_string(),
502 policy: if matches!(options.mode, SessionBundleExportMode::Local) {
503 "none".to_string()
504 } else {
505 "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths".to_string()
506 },
507 placeholder: REDACTED_PLACEHOLDER.to_string(),
508 entries: Vec::new(),
509 unsafe_secret_markers_rejected: !matches!(options.mode, SessionBundleExportMode::Local),
510 };
511
512 if !matches!(options.mode, SessionBundleExportMode::Local) {
513 let redaction_policy = bundle_redaction_policy(&options.redaction_policy);
514 redact_json_with_manifest(
515 &mut bundle_value,
516 "$",
517 &redaction_policy,
518 &mut manifest.entries,
519 );
520 redact_bundle_pointer_paths_json(&mut bundle_value, "$", &mut manifest.entries);
521 }
522 if matches!(options.mode, SessionBundleExportMode::ReplayOnly) {
523 withhold_replay_only_json(&mut bundle_value, "$", &mut manifest.entries);
524 }
525 set_json_path(
526 &mut bundle_value,
527 &["redaction"],
528 serde_json::to_value(&manifest)
529 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
530 );
531 bundle = serde_json::from_value(bundle_value)
532 .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
533 Ok(bundle)
534}
535
536pub fn export_worker_snapshot_bundle(
537 snapshot_path: &Path,
538 options: &SessionBundleExportOptions,
539) -> Result<SessionBundle, SessionBundleError> {
540 let run = run_record_from_worker_snapshot(snapshot_path)?;
541 export_run_record_bundle(&run, options)
542}
543
544pub fn run_record_from_worker_snapshot(
545 snapshot_path: &Path,
546) -> Result<RunRecord, SessionBundleError> {
547 let content = fs::read_to_string(snapshot_path).map_err(|error| {
548 SessionBundleError::Decode(format!(
549 "failed to read worker snapshot {}: {error}",
550 snapshot_path.display()
551 ))
552 })?;
553 let value: JsonValue = serde_json::from_str(&content)
554 .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
555 run_record_from_worker_snapshot_value(snapshot_path, value)
556}
557
558pub fn validate_session_bundle_value(
559 value: &JsonValue,
560 options: &SessionBundleValidationOptions,
561) -> Result<SessionBundle, SessionBundleError> {
562 require_field(value, "_type")?;
563 require_field(value, "schema_version")?;
564 require_field(value, "bundle_id")?;
565 require_field(value, "created_at")?;
566 require_field(value, "producer")?;
567 require_field(value, "source")?;
568 require_field(value, "runtime")?;
569 require_field(value, "transcript")?;
570 require_field(value, "tools")?;
571 require_field(value, "permissions")?;
572 require_field(value, "replay")?;
573 require_field(value, "redaction")?;
574 require_field(value, "attachments")?;
575 require_nested_field(value, &["producer", "name"])?;
576 require_nested_field(value, &["producer", "version"])?;
577 require_nested_field(value, &["producer", "schema_id"])?;
578 require_nested_field(value, &["source", "kind"])?;
579 require_nested_field(value, &["source", "run_record_id"])?;
580 require_nested_field(value, &["source", "workflow_id"])?;
581 require_nested_field(value, &["source", "task"])?;
582 require_nested_field(value, &["source", "status"])?;
583 require_nested_field(value, &["runtime", "harn_version"])?;
584 require_nested_field(value, &["runtime", "provider_models"])?;
585 require_nested_field(value, &["transcript", "sections"])?;
586 require_nested_field(value, &["tools", "schemas"])?;
587 require_nested_field(value, &["tools", "calls"])?;
588 require_nested_field(value, &["replay", "event_log_pointers"])?;
589 require_nested_field(value, &["replay", "transitions"])?;
590 require_nested_field(value, &["replay", "checkpoints"])?;
591 require_nested_field(value, &["replay", "trace_spans"])?;
592 require_nested_field(value, &["replay", "deterministic_events"])?;
593 require_nested_field(value, &["redaction", "mode"])?;
594 require_nested_field(value, &["redaction", "policy"])?;
595 require_nested_field(value, &["redaction", "placeholder"])?;
596 require_nested_field(value, &["redaction", "entries"])?;
597 require_nested_field(value, &["redaction", "unsafe_secret_markers_rejected"])?;
598
599 let type_name = value
600 .get("_type")
601 .and_then(JsonValue::as_str)
602 .ok_or_else(|| SessionBundleError::InvalidType {
603 path: "$._type".to_string(),
604 expected: "string".to_string(),
605 })?;
606 if type_name != SESSION_BUNDLE_TYPE {
607 return Err(SessionBundleError::InvalidType {
608 path: "$._type".to_string(),
609 expected: format!("\"{SESSION_BUNDLE_TYPE}\""),
610 });
611 }
612
613 let version = value
614 .get("schema_version")
615 .and_then(JsonValue::as_u64)
616 .ok_or_else(|| SessionBundleError::InvalidType {
617 path: "$.schema_version".to_string(),
618 expected: "positive integer".to_string(),
619 })?;
620 if version == 0 || version > u64::from(SESSION_BUNDLE_SCHEMA_VERSION) {
621 return Err(SessionBundleError::UnsupportedSchemaVersion {
622 found: version,
623 supported: SESSION_BUNDLE_SCHEMA_VERSION,
624 });
625 }
626
627 if !options.allow_unsafe_secret_markers {
628 reject_unredacted_secret_markers(value, "$", &options.redaction_policy)?;
629 }
630
631 serde_json::from_value::<SessionBundle>(value.clone())
632 .map_err(|error| SessionBundleError::Decode(error.to_string()))
633}
634
635pub fn validate_session_bundle_str(
636 content: &str,
637 options: &SessionBundleValidationOptions,
638) -> Result<SessionBundle, SessionBundleError> {
639 let value: JsonValue = serde_json::from_str(content)
640 .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
641 validate_session_bundle_value(&value, options)
642}
643
644pub fn import_run_record_value(bundle: &SessionBundle) -> Result<JsonValue, SessionBundleError> {
645 let replay_observability = replay_observability_for_import(&bundle.replay);
646 if let Some(mut run_record) = bundle.replay.run_record.clone() {
647 let should_fill_observability = match run_record.get("observability") {
648 Some(value) => value.is_null(),
649 None => true,
650 };
651 if should_fill_observability {
652 if let (JsonValue::Object(map), Some(observability)) =
653 (&mut run_record, replay_observability.as_ref())
654 {
655 map.insert(
656 "observability".to_string(),
657 serde_json::to_value(observability)
658 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
659 );
660 }
661 }
662 return Ok(run_record);
663 }
664 if let Some(fixture) = &bundle.replay.replay_fixture {
665 let transcript = bundle.transcript.sections.first().map(|section| {
666 json!({
667 "_type": "transcript",
668 "messages": section.messages.clone(),
669 "events": section.events.clone(),
670 "assets": section.assets.clone(),
671 "summary": section.summary.clone(),
672 "metadata": section.metadata.clone(),
673 })
674 });
675 let hitl_questions = bundle
676 .permissions
677 .iter()
678 .filter(|permission| permission.kind == "hitl_question")
679 .map(|permission| permission.payload.clone())
680 .collect::<Vec<_>>();
681 return Ok(json!({
682 "_type": "run_record",
683 "id": bundle.source.run_record_id.clone(),
684 "workflow_id": bundle.source.workflow_id.clone(),
685 "workflow_name": bundle.source.workflow_name.clone(),
686 "task": bundle.source.task.clone(),
687 "status": bundle.source.status.clone(),
688 "started_at": bundle.source.started_at.clone(),
689 "finished_at": bundle.source.finished_at.clone(),
690 "stages": [],
691 "transitions": bundle.replay.transitions.clone(),
692 "checkpoints": bundle.replay.checkpoints.clone(),
693 "pending_nodes": [],
694 "completed_nodes": [],
695 "child_runs": [],
696 "artifacts": [],
697 "handoffs": [],
698 "policy": {},
699 "transcript": transcript,
700 "usage": bundle.runtime.usage.clone(),
701 "replay_fixture": fixture,
702 "observability": replay_observability,
703 "trace_spans": bundle.replay.trace_spans.clone(),
704 "tool_recordings": bundle.tools.calls.clone(),
705 "hitl_questions": hitl_questions,
706 "persona_runtime": [],
707 "metadata": {
708 "imported_from_session_bundle": bundle.bundle_id.clone(),
709 "session_bundle_schema_version": bundle.schema_version,
710 "worker_snapshot_count": bundle.replay.worker_snapshots.len(),
711 }
712 }));
713 }
714 Err(SessionBundleError::MissingRunRecord)
715}
716
717pub fn import_run_record_value_with_materialized_worker_snapshots(
718 bundle: &SessionBundle,
719 materialized: &[MaterializedWorkerSnapshot],
720) -> Result<JsonValue, SessionBundleError> {
721 let mut run_record = import_run_record_value(bundle)?;
722 apply_materialized_worker_snapshot_paths(&mut run_record, materialized);
723 Ok(run_record)
724}
725
/// Converts a parsed worker snapshot into a synthetic checkpoint run record.
///
/// The snapshot must carry the `worker_snapshot` type marker, have status
/// `"suspended"`, and contain `config` and `suspension` objects plus a non-empty
/// `id`. The resulting run record has one checkpoint, one child run describing
/// the worker, a replay fixture expecting the `"suspended"` status, and an
/// observability record holding the worker's lineage.
///
/// # Errors
/// Returns `MissingRequired` / `InvalidType` for a malformed snapshot and
/// `UnsupportedCheckpointState` when the status is not `"suspended"`.
fn run_record_from_worker_snapshot_value(
    snapshot_path: &Path,
    value: JsonValue,
) -> Result<RunRecord, SessionBundleError> {
    require_worker_snapshot_marker(&value)?;
    let status =
        snapshot_string(&value, "status").ok_or_else(|| missing_worker_snapshot_field("status"))?;
    // Only suspended workers can be turned into a checkpoint.
    if status != "suspended" {
        return Err(SessionBundleError::UnsupportedCheckpointState { status });
    }
    require_worker_snapshot_object_field(&value, "config")?;
    require_worker_snapshot_object_field(&value, "suspension")?;

    let snapshot_path_string = snapshot_path.to_string_lossy().into_owned();
    let worker_id =
        snapshot_string(&value, "id").ok_or_else(|| missing_worker_snapshot_field("id"))?;
    // Name and task are optional; fixed fallbacks are used when absent or empty.
    let worker_name = snapshot_string(&value, "name").unwrap_or_else(|| "worker".to_string());
    let task = snapshot_string(&value, "task").unwrap_or_else(|| "Suspended worker".to_string());
    let suspended_at = snapshot_pointer_string(&value, &["suspension", "suspended_at"]);
    // Start time falls back through `created_at`, the suspension time, then "now".
    let started_at = snapshot_string(&value, "started_at")
        .or_else(|| snapshot_string(&value, "created_at"))
        .or_else(|| suspended_at.clone())
        .unwrap_or_else(now_rfc3339);
    let finished_at = snapshot_string(&value, "finished_at");
    // Session identifiers are read from `config.spec` first, then from `audit`.
    let session_id = snapshot_pointer_string(&value, &["config", "spec", "session_id"])
        .or_else(|| snapshot_pointer_string(&value, &["audit", "session_id"]));
    let parent_session_id =
        snapshot_pointer_string(&value, &["config", "spec", "parent_session_id"])
            .or_else(|| snapshot_pointer_string(&value, &["audit", "parent_session_id"]));
    let child_run_id = snapshot_string(&value, "child_run_id");
    let child_run_path = snapshot_string(&value, "child_run_path");
    // Best effort: an `execution` value that does not decode is dropped (`None`).
    let execution = value
        .get("execution")
        .cloned()
        .and_then(|value| serde_json::from_value::<RunExecutionRecord>(value).ok());

    let child = RunChildRecord {
        worker_id: worker_id.clone(),
        worker_name: worker_name.clone(),
        parent_stage_id: snapshot_string(&value, "parent_stage_id"),
        session_id: session_id.clone(),
        parent_session_id: parent_session_id.clone(),
        mutation_scope: snapshot_pointer_string(&value, &["audit", "mutation_scope"]),
        approval_policy: None,
        task: task.clone(),
        request: value.get("request").cloned(),
        provenance: value.get("provenance").cloned(),
        status: status.clone(),
        started_at: started_at.clone(),
        finished_at: finished_at.clone(),
        run_id: child_run_id.clone(),
        run_path: child_run_path.clone(),
        snapshot_path: Some(snapshot_path_string.clone()),
        execution,
    };
    // The lineage record repeats the child's identity fields; the owned values
    // cloned into `child` above are moved here.
    let lineage = RunWorkerLineageRecord {
        worker_id: worker_id.clone(),
        worker_name,
        parent_stage_id: child.parent_stage_id.clone(),
        task: task.clone(),
        status: status.clone(),
        session_id,
        parent_session_id,
        run_id: child_run_id,
        run_path: child_run_path,
        snapshot_path: Some(snapshot_path_string.clone()),
    };

    // The run id is derived from the sanitized worker id.
    let run_id = format!("checkpoint_{}", sanitize_topic_component(&worker_id));
    let workflow_id = "worker_snapshot_checkpoint".to_string();
    let workflow_name = Some("Worker snapshot checkpoint".to_string());
    let checkpoint_id = format!("{run_id}_turn_boundary");
    // Checkpoint time: suspension time, else finish time, else start time.
    let checkpointed_at = suspended_at
        .or_else(|| finished_at.clone())
        .unwrap_or_else(|| started_at.clone());

    Ok(RunRecord {
        type_name: "run_record".to_string(),
        id: run_id.clone(),
        workflow_id: workflow_id.clone(),
        workflow_name: workflow_name.clone(),
        task,
        status,
        started_at,
        finished_at,
        checkpoints: vec![RunCheckpointRecord {
            id: checkpoint_id,
            ready_nodes: vec!["worker_snapshot_resume".to_string()],
            completed_nodes: Vec::new(),
            last_stage_id: None,
            persisted_at: checkpointed_at.clone(),
            reason: "suspended_worker_snapshot_turn_boundary".to_string(),
        }],
        child_runs: vec![child],
        transcript: value.get("transcript").cloned(),
        replay_fixture: Some(ReplayFixture {
            type_name: "replay_fixture".to_string(),
            id: format!("fixture_{run_id}"),
            source_run_id: run_id,
            workflow_id,
            workflow_name,
            created_at: checkpointed_at,
            eval_kind: Some("worker_snapshot_checkpoint".to_string()),
            expected_status: "suspended".to_string(),
            ..ReplayFixture::default()
        }),
        observability: Some(RunObservabilityRecord {
            // NOTE(review): 4 matches the fallback used by
            // `replay_observability_for_import`; presumably the current
            // observability schema version, so confirm.
            schema_version: 4,
            worker_lineage: vec![lineage],
            ..RunObservabilityRecord::default()
        }),
        metadata: BTreeMap::from([
            ("checkpoint_kind".to_string(), json!("worker_snapshot")),
            (
                "worker_snapshot_path".to_string(),
                json!(snapshot_path_string),
            ),
        ]),
        ..RunRecord::default()
    })
}
847
848fn snapshot_string(value: &JsonValue, key: &str) -> Option<String> {
849 value
850 .get(key)
851 .and_then(JsonValue::as_str)
852 .filter(|value| !value.is_empty())
853 .map(str::to_string)
854}
855
856fn missing_worker_snapshot_field(field: &str) -> SessionBundleError {
857 SessionBundleError::MissingRequired(format!("$.worker_snapshot.{field}"))
858}
859
860fn require_worker_snapshot_marker(value: &JsonValue) -> Result<(), SessionBundleError> {
861 match snapshot_string(value, "_type").as_deref() {
862 Some("worker_snapshot") => Ok(()),
863 Some(_) => Err(SessionBundleError::InvalidType {
864 path: "$.worker_snapshot._type".to_string(),
865 expected: "\"worker_snapshot\"".to_string(),
866 }),
867 None => Err(missing_worker_snapshot_field("_type")),
868 }
869}
870
871fn require_worker_snapshot_object_field(
872 value: &JsonValue,
873 field: &str,
874) -> Result<(), SessionBundleError> {
875 match value.get(field) {
876 Some(JsonValue::Object(_)) => Ok(()),
877 Some(_) => Err(SessionBundleError::InvalidType {
878 path: format!("$.worker_snapshot.{field}"),
879 expected: "object".to_string(),
880 }),
881 None => Err(missing_worker_snapshot_field(field)),
882 }
883}
884
885fn snapshot_pointer_string(value: &JsonValue, path: &[&str]) -> Option<String> {
886 let mut current = value;
887 for component in path {
888 current = current.get(*component)?;
889 }
890 current
891 .as_str()
892 .filter(|value| !value.is_empty())
893 .map(str::to_string)
894}
895
896pub fn materialize_worker_snapshots(
897 bundle: &SessionBundle,
898 out_dir: &Path,
899) -> Result<Vec<MaterializedWorkerSnapshot>, SessionBundleError> {
900 if bundle.replay.worker_snapshots.is_empty() {
901 return Ok(Vec::new());
902 }
903 fs::create_dir_all(out_dir).map_err(|error| {
904 SessionBundleError::Encode(format!(
905 "failed to create worker snapshot directory {}: {error}",
906 out_dir.display()
907 ))
908 })?;
909
910 let mut materialized = Vec::new();
911 for (index, snapshot) in bundle.replay.worker_snapshots.iter().enumerate() {
912 let worker_id = if snapshot.worker_id.trim().is_empty() {
913 format!("worker_{index}")
914 } else {
915 snapshot.worker_id.clone()
916 };
917 let path = out_dir.join(worker_snapshot_file_name(&worker_id, index));
918 let value = worker_snapshot_value_for_import(&snapshot.value, &path);
919 let rendered = serde_json::to_string_pretty(&value)
920 .map(|json| format!("{json}\n"))
921 .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
922 fs::write(&path, rendered).map_err(|error| {
923 SessionBundleError::Encode(format!(
924 "failed to write worker snapshot {}: {error}",
925 path.display()
926 ))
927 })?;
928 materialized.push(MaterializedWorkerSnapshot {
929 worker_id,
930 path: path.to_string_lossy().into_owned(),
931 });
932 }
933 Ok(materialized)
934}
935
936fn apply_materialized_worker_snapshot_paths(
937 run_record: &mut JsonValue,
938 materialized: &[MaterializedWorkerSnapshot],
939) {
940 if materialized.is_empty() {
941 return;
942 }
943
944 let paths_by_worker_id = materialized
945 .iter()
946 .filter(|snapshot| !snapshot.worker_id.is_empty())
947 .map(|snapshot| (snapshot.worker_id.as_str(), snapshot.path.as_str()))
948 .collect::<BTreeMap<_, _>>();
949 if paths_by_worker_id.is_empty() {
950 return;
951 }
952
953 rewrite_worker_snapshot_paths(run_record.get_mut("child_runs"), &paths_by_worker_id);
954 rewrite_worker_snapshot_paths(
955 run_record
956 .get_mut("observability")
957 .and_then(|observability| observability.get_mut("worker_lineage")),
958 &paths_by_worker_id,
959 );
960 rewrite_checkpoint_metadata_snapshot_path(run_record, materialized);
961}
962
963fn rewrite_worker_snapshot_paths(
964 records: Option<&mut JsonValue>,
965 paths_by_worker_id: &BTreeMap<&str, &str>,
966) {
967 let Some(records) = records.and_then(JsonValue::as_array_mut) else {
968 return;
969 };
970 for record in records {
971 let Some(worker_id) = record.get("worker_id").and_then(JsonValue::as_str) else {
972 continue;
973 };
974 let Some(path) = paths_by_worker_id.get(worker_id) else {
975 continue;
976 };
977 if let JsonValue::Object(map) = record {
978 map.insert(
979 "snapshot_path".to_string(),
980 JsonValue::String((*path).to_string()),
981 );
982 }
983 }
984}
985
986fn rewrite_checkpoint_metadata_snapshot_path(
987 run_record: &mut JsonValue,
988 materialized: &[MaterializedWorkerSnapshot],
989) {
990 let Some(snapshot) = materialized.first() else {
991 return;
992 };
993 let Some(metadata) = run_record
994 .get_mut("metadata")
995 .and_then(JsonValue::as_object_mut)
996 else {
997 return;
998 };
999 if metadata.contains_key("worker_snapshot_path") {
1000 metadata.insert(
1001 "worker_snapshot_path".to_string(),
1002 JsonValue::String(snapshot.path.clone()),
1003 );
1004 }
1005}
1006
1007fn replay_observability_for_import(replay: &BundleReplay) -> Option<RunObservabilityRecord> {
1008 let mut observability = replay.observability.clone().unwrap_or_default();
1009 let has_observability = replay.observability.is_some();
1010 let has_verification_outcomes = !replay.verification_outcomes.is_empty();
1011 if !has_observability && !has_verification_outcomes {
1012 return None;
1013 }
1014 if observability.schema_version == 0 {
1015 observability.schema_version = 4;
1016 }
1017 if observability.verification_outcomes.is_empty() && has_verification_outcomes {
1018 observability.verification_outcomes = replay.verification_outcomes.clone();
1019 }
1020 Some(observability)
1021}
1022
1023pub fn session_bundle_from_agent_session_events(
1024 session_id: &str,
1025 events: &[AgentSessionReplayEvent],
1026) -> Result<SessionBundle, SessionBundleError> {
1027 if events.is_empty() {
1028 return Err(SessionBundleError::MissingSessionEvents {
1029 session_id: session_id.to_string(),
1030 });
1031 }
1032
1033 let stable_id = sanitize_topic_component(session_id);
1034 let started_at = rfc3339_from_epoch_ms(events[0].occurred_at_ms);
1035 let liveness = agent_session_liveness(events);
1040 let finished_at = match &liveness {
1041 AgentSessionLiveness::Closed { finished_at_ms, .. } => {
1042 Some(rfc3339_from_epoch_ms(*finished_at_ms))
1043 }
1044 AgentSessionLiveness::Suspended => None,
1045 };
1046 let status = liveness.status().to_string();
1047 let run_id = session_id.to_string();
1048 let workflow_id = "agent_session".to_string();
1049 let created_at = finished_at.clone().unwrap_or_else(|| started_at.clone());
1050 let transcript_events = transcript_events_from_agent_session(events)?;
1051 let transcript_messages = transcript_messages_from_agent_session(events);
1052 let mut transcript_metadata = BTreeMap::new();
1053 transcript_metadata.insert("session_id".to_string(), json!(session_id));
1054 transcript_metadata.insert(
1055 "source".to_string(),
1056 json!("events.sqlite observability.agent_events topic"),
1057 );
1058
1059 let replay_fixture = ReplayFixture {
1060 type_name: "replay_fixture".to_string(),
1061 id: format!("fixture_from_session_{stable_id}"),
1062 source_run_id: run_id.clone(),
1063 workflow_id: workflow_id.clone(),
1064 workflow_name: Some(format!("Agent session {session_id}")),
1065 created_at: created_at.clone(),
1066 eval_kind: Some("replay".to_string()),
1067 expected_status: status.clone(),
1068 ..ReplayFixture::default()
1069 };
1070
1071 Ok(SessionBundle {
1072 bundle_id: format!("bundle_from_session_{stable_id}"),
1073 created_at,
1074 producer: BundleProducer {
1075 name: "harn".to_string(),
1076 version: env!("CARGO_PKG_VERSION").to_string(),
1077 schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
1078 },
1079 source: BundleSource {
1080 kind: "event_log_session".to_string(),
1081 run_record_id: run_id,
1082 workflow_id,
1083 workflow_name: Some(format!("Agent session {session_id}")),
1084 task: task_from_agent_session(events)
1085 .unwrap_or_else(|| format!("Agent session {session_id}")),
1086 status,
1087 started_at,
1088 finished_at,
1089 ..BundleSource::default()
1090 },
1091 runtime: BundleRuntime {
1092 harn_version: env!("CARGO_PKG_VERSION").to_string(),
1093 ..BundleRuntime::default()
1094 },
1095 transcript: BundleTranscript {
1096 sections: vec![BundleTranscriptSection {
1097 id: "agent_events".to_string(),
1098 label: "Agent event log".to_string(),
1099 scope: "session".to_string(),
1100 location: format!(
1101 "observability.agent_events.{}",
1102 sanitize_topic_component(session_id)
1103 ),
1104 summary: None,
1105 messages: transcript_messages,
1106 events: transcript_events,
1107 assets: Vec::new(),
1108 metadata: transcript_metadata,
1109 }],
1110 },
1111 permissions: permissions_from_agent_session(events),
1112 replay: BundleReplay {
1113 replay_fixture: Some(replay_fixture),
1114 event_log_pointers: vec![BundleEventLogPointer {
1115 kind: "agent_events".to_string(),
1116 topic: Some(format!(
1117 "observability.agent_events.{}",
1118 sanitize_topic_component(session_id)
1119 )),
1120 path: None,
1121 location: "events.sqlite".to_string(),
1122 available: true,
1123 }],
1124 deterministic_events: deterministic_events_from_agent_session(events)?,
1125 ..BundleReplay::default()
1126 },
1127 metadata: BTreeMap::from([(
1128 SESSION_BUNDLE_LIVENESS_KEY.to_string(),
1129 json!(liveness.tag()),
1130 )]),
1131 ..SessionBundle::default()
1132 })
1133}
1134
1135pub fn import_run_record_from_agent_session_events(
1136 session_id: &str,
1137 events: &[AgentSessionReplayEvent],
1138) -> Result<RunRecord, SessionBundleError> {
1139 let bundle = session_bundle_from_agent_session_events(session_id, events)?;
1140 let run_record = import_run_record_value(&bundle)?;
1141 serde_json::from_value(run_record)
1142 .map_err(|error| SessionBundleError::Decode(error.to_string()))
1143}
1144
1145fn transcript_events_from_agent_session(
1146 events: &[AgentSessionReplayEvent],
1147) -> Result<Vec<JsonValue>, SessionBundleError> {
1148 events
1149 .iter()
1150 .map(|entry| {
1151 let event = serde_json::to_value(&entry.event)
1152 .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
1153 Ok(json!({
1154 "event_id": entry.event_id,
1155 "kind": entry.kind,
1156 "occurred_at_ms": entry.occurred_at_ms,
1157 "event": event,
1158 }))
1159 })
1160 .collect()
1161}
1162
1163fn transcript_messages_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<JsonValue> {
1164 events
1165 .iter()
1166 .filter_map(|entry| match &entry.event {
1167 AgentEvent::UserMessage { content, .. } => Some(json!({
1168 "role": "user",
1169 "content": content,
1170 })),
1171 AgentEvent::AgentMessageChunk { content, .. } if !content.is_empty() => Some(json!({
1172 "role": "assistant",
1173 "content": content,
1174 })),
1175 _ => None,
1176 })
1177 .collect()
1178}
1179
1180fn permissions_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<BundlePermission> {
1181 let mut permissions = Vec::new();
1182 for entry in events {
1183 if let AgentEvent::HitlRequested {
1184 request_id,
1185 kind,
1186 payload,
1187 ..
1188 } = &entry.event
1189 {
1190 permissions.push(BundlePermission {
1191 kind: "hitl_question".to_string(),
1192 source: "agent_events".to_string(),
1193 request_id: Some(request_id.clone()),
1194 agent: None,
1195 payload: json!({
1196 "kind": kind,
1197 "payload": payload,
1198 "event_id": entry.event_id,
1199 "occurred_at_ms": entry.occurred_at_ms,
1200 }),
1201 });
1202 }
1203 }
1204 permissions
1205}
1206
1207fn deterministic_events_from_agent_session(
1208 events: &[AgentSessionReplayEvent],
1209) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
1210 transcript_events_from_agent_session(events).map(|entries| {
1211 entries
1212 .into_iter()
1213 .enumerate()
1214 .map(|(index, value)| BundleJsonEntry {
1215 source: "events.sqlite.agent_events".to_string(),
1216 index,
1217 value,
1218 })
1219 .collect()
1220 })
1221}
1222
1223fn task_from_agent_session(events: &[AgentSessionReplayEvent]) -> Option<String> {
1224 events.iter().find_map(|entry| match &entry.event {
1225 AgentEvent::UserMessage { content, .. } => user_message_text(content),
1226 _ => None,
1227 })
1228}
1229
1230fn user_message_text(content: &[JsonValue]) -> Option<String> {
1231 let parts = content
1232 .iter()
1233 .filter_map(|value| {
1234 value
1235 .get("text")
1236 .and_then(JsonValue::as_str)
1237 .or_else(|| value.as_str())
1238 .map(str::to_string)
1239 })
1240 .filter(|text| !text.trim().is_empty())
1241 .collect::<Vec<_>>();
1242 if parts.is_empty() {
1243 None
1244 } else {
1245 Some(parts.join("\n"))
1246 }
1247}
1248
1249fn rfc3339_from_epoch_ms(ms: i64) -> String {
1250 DateTime::<Utc>::from_timestamp_millis(ms)
1251 .unwrap_or_else(|| DateTime::<Utc>::from_timestamp(0, 0).expect("unix epoch is valid"))
1252 .to_rfc3339_opts(SecondsFormat::Millis, true)
1253}
1254
1255fn raw_bundle_from_run(
1256 run: &RunRecord,
1257 run_record_value: JsonValue,
1258 include_attachments: bool,
1259) -> Result<SessionBundle, SessionBundleError> {
1260 let mut bundle = SessionBundle {
1261 bundle_id: new_id("bundle"),
1262 created_at: now_rfc3339(),
1263 producer: BundleProducer {
1264 name: "harn".to_string(),
1265 version: env!("CARGO_PKG_VERSION").to_string(),
1266 schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
1267 },
1268 source: BundleSource {
1269 kind: "run_record".to_string(),
1270 run_record_id: run.id.clone(),
1271 workflow_id: run.workflow_id.clone(),
1272 workflow_name: run.workflow_name.clone(),
1273 task: run.task.clone(),
1274 status: run.status.clone(),
1275 started_at: run.started_at.clone(),
1276 finished_at: run.finished_at.clone(),
1277 persisted_path: run.persisted_path.clone(),
1278 root_run_id: run.root_run_id.clone(),
1279 parent_run_id: run.parent_run_id.clone(),
1280 child_run_count: run.child_runs.len(),
1281 },
1282 runtime: BundleRuntime {
1283 harn_version: env!("CARGO_PKG_VERSION").to_string(),
1284 provider_models: run
1285 .usage
1286 .as_ref()
1287 .map(|usage| usage.models.clone())
1288 .unwrap_or_default(),
1289 usage: run.usage.as_ref().map(|usage| BundleUsage {
1290 input_tokens: usage.input_tokens,
1291 output_tokens: usage.output_tokens,
1292 call_count: usage.call_count,
1293 total_duration_ms: usage.total_duration_ms,
1294 total_cost: usage.total_cost,
1295 models: usage.models.clone(),
1296 }),
1297 metadata: BTreeMap::new(),
1298 },
1299 workspace: workspace_from_run(run),
1300 transcript: transcript_from_run(run),
1301 tools: BundleTools {
1302 schemas: tool_schema_entries(run),
1303 calls: run
1304 .tool_recordings
1305 .iter()
1306 .map(BundleToolCall::from)
1307 .collect(),
1308 },
1309 permissions: permissions_from_run(run),
1310 replay: BundleReplay {
1311 replay_fixture: run.replay_fixture.clone(),
1312 run_record: Some(run_record_value),
1313 observability: run.observability.clone(),
1314 verification_outcomes: verification_outcomes_for_run(run),
1315 worker_snapshots: worker_snapshots_from_run(run),
1316 event_log_pointers: event_log_pointers_from_run(run),
1317 transitions: run.transitions.clone(),
1318 checkpoints: run.checkpoints.clone(),
1319 trace_spans: run.trace_spans.clone(),
1320 deterministic_events: deterministic_events_from_run(run)?,
1321 },
1322 redaction: RedactionManifest {
1323 mode: "sanitized".to_string(),
1324 policy: "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths"
1325 .to_string(),
1326 placeholder: REDACTED_PLACEHOLDER.to_string(),
1327 entries: Vec::new(),
1328 unsafe_secret_markers_rejected: true,
1329 },
1330 attachments: if include_attachments {
1331 attachments_from_run(run)
1332 } else {
1333 Vec::new()
1334 },
1335 ..SessionBundle::default()
1336 };
1337 bundle.metadata.insert(
1338 "format_note".to_string(),
1339 json!("Session bundles are portable JSON envelopes; hosted share links should reference sanitized bundles rather than raw run records."),
1340 );
1341 Ok(bundle)
1342}
1343
1344fn verification_outcomes_for_run(run: &RunRecord) -> Vec<RunVerificationOutcomeRecord> {
1345 if let Some(observability) = run.observability.as_ref() {
1346 return observability.verification_outcomes.clone();
1347 }
1348 derive_run_observability(run, run.persisted_path.as_deref().map(Path::new))
1349 .verification_outcomes
1350}
1351
1352fn bundle_redaction_policy(base: &RedactionPolicy) -> RedactionPolicy {
1353 base.clone()
1354 .with_extra_field("persisted_path")
1355 .with_extra_field("primary")
1356 .with_extra_field("run_path")
1357 .with_extra_field("snapshot_ref")
1358 .with_extra_field("snapshot_path")
1359 .with_extra_field("source_path")
1360}
1361
1362fn workspace_from_run(run: &RunRecord) -> Option<BundleWorkspace> {
1363 let anchor = run
1364 .transcript
1365 .as_ref()
1366 .and_then(|transcript| transcript.get("metadata"))
1367 .and_then(anchor_from_transcript_metadata_json)?;
1368 Some(BundleWorkspace::from(&anchor))
1369}
1370
1371fn transcript_from_run(run: &RunRecord) -> BundleTranscript {
1372 let mut sections = Vec::new();
1373 if let Some(transcript) = &run.transcript {
1374 sections.push(transcript_section(
1375 "run",
1376 "Run transcript",
1377 "run",
1378 "$.transcript",
1379 transcript,
1380 ));
1381 }
1382 for (index, stage) in run.stages.iter().enumerate() {
1383 if let Some(transcript) = &stage.transcript {
1384 sections.push(transcript_section(
1385 &stage.id,
1386 &format!("Stage {}", stage.node_id),
1387 "stage",
1388 &format!("$.stages[{index}].transcript"),
1389 transcript,
1390 ));
1391 }
1392 }
1393 BundleTranscript { sections }
1394}
1395
1396fn transcript_section(
1397 id: &str,
1398 label: &str,
1399 scope: &str,
1400 location: &str,
1401 transcript: &JsonValue,
1402) -> BundleTranscriptSection {
1403 BundleTranscriptSection {
1404 id: id.to_string(),
1405 label: label.to_string(),
1406 scope: scope.to_string(),
1407 location: location.to_string(),
1408 summary: transcript
1409 .get("summary")
1410 .and_then(JsonValue::as_str)
1411 .map(str::to_string),
1412 messages: json_array(transcript.get("messages")),
1413 events: json_array(transcript.get("events")),
1414 assets: json_array(transcript.get("assets")),
1415 metadata: transcript
1416 .get("metadata")
1417 .and_then(JsonValue::as_object)
1418 .map(|map| {
1419 map.iter()
1420 .map(|(key, value)| (key.clone(), value.clone()))
1421 .collect()
1422 })
1423 .unwrap_or_default(),
1424 }
1425}
1426
1427fn json_array(value: Option<&JsonValue>) -> Vec<JsonValue> {
1428 value
1429 .and_then(JsonValue::as_array)
1430 .cloned()
1431 .unwrap_or_default()
1432}
1433
1434fn tool_schema_entries(run: &RunRecord) -> Vec<BundleJsonEntry> {
1435 let mut entries = Vec::new();
1436 collect_tool_schema_entries_from_transcript(&mut entries, "run.transcript", &run.transcript);
1437 for stage in &run.stages {
1438 collect_tool_schema_entries_from_transcript(
1439 &mut entries,
1440 &format!("stage.{}.transcript", stage.node_id),
1441 &stage.transcript,
1442 );
1443 if let Some(tools) = stage
1444 .metadata
1445 .get("tool_schemas")
1446 .or_else(|| stage.metadata.get("tools"))
1447 {
1448 entries.push(BundleJsonEntry {
1449 source: format!("stage.{}.metadata", stage.node_id),
1450 index: entries.len(),
1451 value: tools.clone(),
1452 });
1453 }
1454 }
1455 entries
1456}
1457
1458fn collect_tool_schema_entries_from_transcript(
1459 entries: &mut Vec<BundleJsonEntry>,
1460 source: &str,
1461 transcript: &Option<JsonValue>,
1462) {
1463 let Some(transcript) = transcript else {
1464 return;
1465 };
1466 for event in transcript
1467 .get("events")
1468 .and_then(JsonValue::as_array)
1469 .into_iter()
1470 .flatten()
1471 {
1472 let kind = event
1473 .get("type")
1474 .or_else(|| event.get("kind"))
1475 .and_then(JsonValue::as_str)
1476 .unwrap_or_default();
1477 if kind == "tool_schemas" || kind == "tool_schema" {
1478 entries.push(BundleJsonEntry {
1479 source: source.to_string(),
1480 index: entries.len(),
1481 value: event.clone(),
1482 });
1483 }
1484 }
1485}
1486
1487fn permissions_from_run(run: &RunRecord) -> Vec<BundlePermission> {
1488 let mut permissions = run
1489 .hitl_questions
1490 .iter()
1491 .map(permission_from_hitl_question)
1492 .collect::<Vec<_>>();
1493 collect_permission_events(&mut permissions, "run.transcript", &run.transcript);
1494 for stage in &run.stages {
1495 collect_permission_events(
1496 &mut permissions,
1497 &format!("stage.{}.transcript", stage.node_id),
1498 &stage.transcript,
1499 );
1500 if let Some(worker) = stage.metadata.get("worker") {
1501 if let Some(policy) = worker
1502 .get("audit")
1503 .and_then(|audit| audit.get("approval_policy"))
1504 {
1505 permissions.push(BundlePermission {
1506 kind: "approval_policy".to_string(),
1507 source: format!("stage.{}.worker.audit", stage.node_id),
1508 request_id: None,
1509 agent: worker
1510 .get("name")
1511 .and_then(JsonValue::as_str)
1512 .map(str::to_string),
1513 payload: policy.clone(),
1514 });
1515 }
1516 }
1517 }
1518 permissions
1519}
1520
1521fn permission_from_hitl_question(question: &RunHitlQuestionRecord) -> BundlePermission {
1522 BundlePermission {
1523 kind: "hitl_question".to_string(),
1524 source: "run.hitl_questions".to_string(),
1525 request_id: Some(question.request_id.clone()),
1526 agent: if question.agent.is_empty() {
1527 None
1528 } else {
1529 Some(question.agent.clone())
1530 },
1531 payload: serde_json::to_value(question).unwrap_or(JsonValue::Null),
1532 }
1533}
1534
1535fn collect_permission_events(
1536 permissions: &mut Vec<BundlePermission>,
1537 source: &str,
1538 transcript: &Option<JsonValue>,
1539) {
1540 let Some(transcript) = transcript else {
1541 return;
1542 };
1543 for event in transcript
1544 .get("events")
1545 .and_then(JsonValue::as_array)
1546 .into_iter()
1547 .flatten()
1548 {
1549 let kind = event
1550 .get("type")
1551 .or_else(|| event.get("kind"))
1552 .and_then(JsonValue::as_str)
1553 .unwrap_or_default();
1554 if kind.contains("permission") || kind.contains("approval") || kind.starts_with("hitl_") {
1555 permissions.push(BundlePermission {
1556 kind: kind.to_string(),
1557 source: source.to_string(),
1558 request_id: event
1559 .get("request_id")
1560 .or_else(|| event.get("id"))
1561 .and_then(JsonValue::as_str)
1562 .map(str::to_string),
1563 agent: event
1564 .get("agent")
1565 .and_then(JsonValue::as_str)
1566 .map(str::to_string),
1567 payload: event.clone(),
1568 });
1569 }
1570 }
1571}
1572
1573fn event_log_pointers_from_run(run: &RunRecord) -> Vec<BundleEventLogPointer> {
1574 let mut pointers = Vec::new();
1575 if let Some(observability) = &run.observability {
1576 for pointer in &observability.transcript_pointers {
1577 pointers.push(BundleEventLogPointer {
1578 kind: pointer.kind.clone(),
1579 topic: None,
1580 path: pointer.path.clone(),
1581 location: pointer.location.clone(),
1582 available: pointer.available,
1583 });
1584 }
1585 for worker in &observability.worker_lineage {
1586 if let Some(session_id) = &worker.session_id {
1587 pointers.push(BundleEventLogPointer {
1588 kind: "agent_events".to_string(),
1589 topic: Some(format!("observability.agent_events.{session_id}")),
1590 path: worker.snapshot_path.clone(),
1591 location: format!("worker.{}.session", worker.worker_id),
1592 available: worker.snapshot_path.is_some(),
1593 });
1594 }
1595 }
1596 }
1597 pointers
1598}
1599
1600fn worker_snapshots_from_run(run: &RunRecord) -> Vec<BundleWorkerSnapshot> {
1601 let mut snapshots = Vec::new();
1602 let mut seen_paths = BTreeSet::new();
1603 for child in &run.child_runs {
1604 let Some(path) = child.snapshot_path.as_deref() else {
1605 continue;
1606 };
1607 if !seen_paths.insert(path.to_string()) {
1608 continue;
1609 }
1610 if let Some(snapshot) = worker_snapshot_from_path(
1611 &child.worker_id,
1612 &child.worker_name,
1613 &child.status,
1614 Path::new(path),
1615 ) {
1616 snapshots.push(snapshot);
1617 }
1618 }
1619 if let Some(observability) = run.observability.as_ref() {
1620 for worker in &observability.worker_lineage {
1621 let Some(path) = worker.snapshot_path.as_deref() else {
1622 continue;
1623 };
1624 if !seen_paths.insert(path.to_string()) {
1625 continue;
1626 }
1627 if let Some(snapshot) = worker_snapshot_from_path(
1628 &worker.worker_id,
1629 &worker.worker_name,
1630 &worker.status,
1631 Path::new(path),
1632 ) {
1633 snapshots.push(snapshot);
1634 }
1635 }
1636 }
1637 snapshots
1638}
1639
1640fn worker_snapshot_from_path(
1641 worker_id: &str,
1642 worker_name: &str,
1643 status: &str,
1644 path: &Path,
1645) -> Option<BundleWorkerSnapshot> {
1646 let content = fs::read_to_string(path).ok()?;
1647 let value = serde_json::from_str::<JsonValue>(&content).ok()?;
1648 Some(BundleWorkerSnapshot {
1649 worker_id: if worker_id.is_empty() {
1650 value
1651 .get("id")
1652 .and_then(JsonValue::as_str)
1653 .unwrap_or_default()
1654 .to_string()
1655 } else {
1656 worker_id.to_string()
1657 },
1658 worker_name: if worker_name.is_empty() {
1659 value
1660 .get("name")
1661 .and_then(JsonValue::as_str)
1662 .unwrap_or("worker")
1663 .to_string()
1664 } else {
1665 worker_name.to_string()
1666 },
1667 status: if status.is_empty() {
1668 value
1669 .get("status")
1670 .and_then(JsonValue::as_str)
1671 .unwrap_or_default()
1672 .to_string()
1673 } else {
1674 status.to_string()
1675 },
1676 snapshot_ref: value
1677 .get("suspension")
1678 .and_then(|value| value.get("snapshot_ref"))
1679 .and_then(JsonValue::as_str)
1680 .or_else(|| value.get("snapshot_path").and_then(JsonValue::as_str))
1681 .unwrap_or_else(|| path.to_str().unwrap_or_default())
1682 .to_string(),
1683 source_path: Some(path.to_string_lossy().into_owned()),
1684 value,
1685 })
1686}
1687
1688fn worker_snapshot_file_name(worker_id: &str, index: usize) -> String {
1689 let component = sanitize_topic_component(worker_id);
1690 let component = if component.is_empty() {
1691 format!("worker_{index}")
1692 } else {
1693 component
1694 };
1695 format!("{component}.json")
1696}
1697
1698fn worker_snapshot_value_for_import(value: &JsonValue, path: &Path) -> JsonValue {
1699 let mut value = value.clone();
1700 let path = path.to_string_lossy().into_owned();
1701 if let JsonValue::Object(map) = &mut value {
1702 map.insert("snapshot_path".to_string(), JsonValue::String(path.clone()));
1703 if let Some(JsonValue::Object(suspension)) = map.get_mut("suspension") {
1704 suspension.insert("snapshot_ref".to_string(), JsonValue::String(path));
1705 }
1706 }
1707 value
1708}
1709
1710fn deterministic_events_from_run(
1711 run: &RunRecord,
1712) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
1713 let mut events = Vec::new();
1714 for (index, transition) in run.transitions.iter().enumerate() {
1715 events.push(BundleJsonEntry {
1716 source: "run.transitions".to_string(),
1717 index,
1718 value: serde_json::to_value(transition)
1719 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1720 });
1721 }
1722 for (index, checkpoint) in run.checkpoints.iter().enumerate() {
1723 events.push(BundleJsonEntry {
1724 source: "run.checkpoints".to_string(),
1725 index,
1726 value: serde_json::to_value(checkpoint)
1727 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1728 });
1729 }
1730 Ok(events)
1731}
1732
1733fn attachments_from_run(run: &RunRecord) -> Vec<BundleAttachment> {
1734 run.artifacts
1735 .iter()
1736 .map(|artifact| BundleAttachment {
1737 id: artifact.id.clone(),
1738 kind: artifact.kind.clone(),
1739 title: artifact.title.clone(),
1740 stage: artifact.stage.clone(),
1741 text: artifact.text.clone(),
1742 data: artifact.data.clone(),
1743 metadata: artifact.metadata.clone(),
1744 })
1745 .collect()
1746}
1747
1748fn redact_json_with_manifest(
1749 value: &mut JsonValue,
1750 path: &str,
1751 policy: &RedactionPolicy,
1752 entries: &mut Vec<RedactionEntry>,
1753) {
1754 match value {
1755 JsonValue::Object(map) => {
1756 let keys = map.keys().cloned().collect::<Vec<_>>();
1757 for key in keys {
1758 let child_path = json_path_child(path, &key);
1759 if policy.field_is_sensitive(&key) {
1760 map.insert(key, JsonValue::String(REDACTED_PLACEHOLDER.to_string()));
1761 entries.push(RedactionEntry {
1762 path: child_path,
1763 class: "sensitive_field".to_string(),
1764 action: "replaced".to_string(),
1765 replacement: Some(REDACTED_PLACEHOLDER.to_string()),
1766 });
1767 } else if let Some(child) = map.get_mut(&key) {
1768 redact_json_with_manifest(child, &child_path, policy, entries);
1769 }
1770 }
1771 }
1772 JsonValue::Array(items) => {
1773 for (index, item) in items.iter_mut().enumerate() {
1774 redact_json_with_manifest(item, &format!("{path}[{index}]"), policy, entries);
1775 }
1776 }
1777 JsonValue::String(text) => {
1778 let redacted = policy.redact_string(text);
1779 if let Cow::Owned(replacement) = redacted {
1780 let manifest_replacement = replacement.clone();
1785 *text = replacement;
1786 entries.push(RedactionEntry {
1787 path: path.to_string(),
1788 class: "secret_pattern_or_url".to_string(),
1789 action: "replaced".to_string(),
1790 replacement: Some(manifest_replacement),
1791 });
1792 }
1793 }
1794 _ => {}
1795 }
1796}
1797
1798fn redact_bundle_pointer_paths_json(
1799 value: &mut JsonValue,
1800 path: &str,
1801 entries: &mut Vec<RedactionEntry>,
1802) {
1803 match value {
1804 JsonValue::Object(map) => {
1805 let keys = map.keys().cloned().collect::<Vec<_>>();
1806 for key in keys {
1807 let child_path = json_path_child(path, &key);
1808 if key == "path" && bundle_pointer_path_should_redact(&child_path) {
1809 if !map.get(&key).is_some_and(JsonValue::is_null) {
1810 map.insert(key, JsonValue::String(REDACTED_PLACEHOLDER.to_string()));
1811 entries.push(RedactionEntry {
1812 path: child_path,
1813 class: "local_pointer_path".to_string(),
1814 action: "replaced".to_string(),
1815 replacement: Some(REDACTED_PLACEHOLDER.to_string()),
1816 });
1817 }
1818 } else if let Some(child) = map.get_mut(&key) {
1819 redact_bundle_pointer_paths_json(child, &child_path, entries);
1820 }
1821 }
1822 }
1823 JsonValue::Array(items) => {
1824 for (index, item) in items.iter_mut().enumerate() {
1825 redact_bundle_pointer_paths_json(item, &format!("{path}[{index}]"), entries);
1826 }
1827 }
1828 _ => {}
1829 }
1830}
1831
/// Reports whether a JSON path lies inside an `event_log_pointers` or
/// `transcript_pointers` array element.
fn bundle_pointer_path_should_redact(path: &str) -> bool {
    const POINTER_ARRAY_MARKERS: [&str; 2] = [".event_log_pointers[", ".transcript_pointers["];
    POINTER_ARRAY_MARKERS
        .iter()
        .any(|marker| path.contains(*marker))
}
1835
1836fn withhold_replay_only_json(value: &mut JsonValue, path: &str, entries: &mut Vec<RedactionEntry>) {
1837 match value {
1838 JsonValue::Object(map) => {
1839 let keys = map.keys().cloned().collect::<Vec<_>>();
1840 for key in keys {
1841 let child_path = json_path_child(path, &key);
1842 if replay_only_field_is_prompt_payload(&key) {
1843 if !map.get(&key).is_some_and(JsonValue::is_null) {
1844 map.insert(key, JsonValue::String(REPLAY_ONLY_PLACEHOLDER.to_string()));
1845 entries.push(RedactionEntry {
1846 path: child_path,
1847 class: "prompt_or_tool_payload".to_string(),
1848 action: "withheld".to_string(),
1849 replacement: Some(REPLAY_ONLY_PLACEHOLDER.to_string()),
1850 });
1851 }
1852 } else if let Some(child) = map.get_mut(&key) {
1853 withhold_replay_only_json(child, &child_path, entries);
1854 }
1855 }
1856 }
1857 JsonValue::Array(items) => {
1858 for (index, item) in items.iter_mut().enumerate() {
1859 withhold_replay_only_json(item, &format!("{path}[{index}]"), entries);
1860 }
1861 }
1862 _ => {}
1863 }
1864}
1865
/// Reports whether `key` names a member that carries prompt or tool payload content.
///
/// Matching is exact and case-sensitive.
fn replay_only_field_is_prompt_payload(key: &str) -> bool {
    const PAYLOAD_FIELDS: [&str; 18] = [
        "args",
        "arguments",
        "blocks",
        "content",
        "data",
        "private_reasoning",
        "prompt",
        "raw_input",
        "raw_output",
        "result",
        "response_text",
        "summary",
        "system",
        "system_prompt",
        "task",
        "text",
        "thinking",
        "visible_text",
    ];
    PAYLOAD_FIELDS.iter().any(|field| *field == key)
}
1889
1890fn set_json_path(value: &mut JsonValue, path: &[&str], replacement: JsonValue) {
1891 let Some((head, tail)) = path.split_first() else {
1892 *value = replacement;
1893 return;
1894 };
1895 if tail.is_empty() {
1896 if let JsonValue::Object(map) = value {
1897 map.insert((*head).to_string(), replacement);
1898 }
1899 return;
1900 }
1901 if let JsonValue::Object(map) = value {
1902 if let Some(child) = map.get_mut(*head) {
1903 set_json_path(child, tail, replacement);
1904 }
1905 }
1906}
1907
1908fn require_field(value: &JsonValue, field: &str) -> Result<(), SessionBundleError> {
1909 if value.get(field).is_some() {
1910 Ok(())
1911 } else {
1912 Err(SessionBundleError::MissingRequired(format!("$.{field}")))
1913 }
1914}
1915
1916fn require_nested_field(value: &JsonValue, path: &[&str]) -> Result<(), SessionBundleError> {
1917 let mut current = value;
1918 for segment in path {
1919 current = current
1920 .get(*segment)
1921 .ok_or_else(|| SessionBundleError::MissingRequired(json_path_from_segments(path)))?;
1922 }
1923 Ok(())
1924}
1925
1926fn json_path_from_segments(path: &[&str]) -> String {
1927 path.iter().fold("$".to_string(), |parent, segment| {
1928 json_path_child(&parent, segment)
1929 })
1930}
1931
1932fn reject_unredacted_secret_markers(
1933 value: &JsonValue,
1934 path: &str,
1935 policy: &RedactionPolicy,
1936) -> Result<(), SessionBundleError> {
1937 match value {
1938 JsonValue::Object(map) => {
1939 for (key, child) in map {
1940 reject_unredacted_secret_markers(child, &json_path_child(path, key), policy)?;
1941 }
1942 }
1943 JsonValue::Array(items) => {
1944 for (index, item) in items.iter().enumerate() {
1945 reject_unredacted_secret_markers(item, &format!("{path}[{index}]"), policy)?;
1946 }
1947 }
1948 JsonValue::String(text) => {
1949 if matches!(policy.redact_string(text), Cow::Owned(_)) {
1950 return Err(SessionBundleError::UnsafeSecretMarker {
1951 path: path.to_string(),
1952 excerpt: secret_excerpt(text),
1953 });
1954 }
1955 }
1956 _ => {}
1957 }
1958 Ok(())
1959}
1960
/// Returns at most the first 80 characters of `text`, followed by `...` when the text
/// was longer than that.
fn secret_excerpt(text: &str) -> String {
    // The byte offset of the 81st character, if any, is where the excerpt ends.
    match text.char_indices().nth(80) {
        Some((cut, _)) => format!("{}...", &text[..cut]),
        None => text.to_string(),
    }
}
1969
1970fn json_path_child(parent: &str, key: &str) -> String {
1971 if key
1972 .chars()
1973 .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
1974 {
1975 format!("{parent}.{key}")
1976 } else {
1977 format!(
1978 "{parent}[{}]",
1979 serde_json::to_string(key).unwrap_or_default()
1980 )
1981 }
1982}