1use std::borrow::Cow;
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt;
12use std::fs;
13use std::path::Path;
14
15use chrono::{DateTime, SecondsFormat, Utc};
16use serde::{Deserialize, Serialize};
17use serde_json::{json, Value as JsonValue};
18
19use crate::agent_events::AgentEvent;
20use crate::event_log::sanitize_topic_component;
21use crate::orchestration::{
22 derive_run_observability, new_id, now_rfc3339, AgentSessionReplayEvent, ReplayFixture,
23 RunCheckpointRecord, RunHitlQuestionRecord, RunObservabilityRecord, RunRecord,
24 RunTraceSpanRecord, RunTransitionRecord, RunVerificationOutcomeRecord, ToolCallRecord,
25};
26use crate::redact::{RedactionPolicy, REDACTED_PLACEHOLDER};
27use crate::workspace_anchor::{anchor_from_transcript_metadata_json, MountedRoot, WorkspaceAnchor};
28
29mod schema;
30pub use schema::{session_bundle_schema, session_bundle_schema_pretty};
31
32#[cfg(test)]
33mod tests;
34
/// Value of the bundle's `_type` field; validation rejects any other value.
pub const SESSION_BUNDLE_TYPE: &str = "harn_session_bundle";
/// Newest `schema_version` this build writes; validation rejects `0` and anything newer.
pub const SESSION_BUNDLE_SCHEMA_VERSION: u32 = 1;
/// JSON Schema identifier recorded in `producer.schema_id`.
pub const SESSION_BUNDLE_SCHEMA_ID: &str = "https://harnlang.com/schemas/session-bundle.v1.json";
/// Placeholder for content withheld from replay-only exports
/// (presumably applied by the replay-only withholding pass — confirm).
pub const REPLAY_ONLY_PLACEHOLDER: &str = "[withheld]";

/// Status reported for an agent session whose event log has no `SessionClosed` event.
pub const SESSION_BUNDLE_STATUS_SUSPENDED: &str = "suspended";

/// Status substituted when a `SessionClosed` event carries an empty status.
pub const SESSION_BUNDLE_STATUS_COMPLETED: &str = "completed";

/// Bundle `metadata` key under which the session's liveness tag is stored.
pub const SESSION_BUNDLE_LIVENESS_KEY: &str = "session_liveness";

/// Whether an agent session has been closed, as derived from its event log.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AgentSessionLiveness {
    /// The log contains a `SessionClosed` event; carries its status and timestamp.
    Closed { status: String, finished_at_ms: i64 },
    /// No `SessionClosed` event was found.
    Suspended,
}

impl AgentSessionLiveness {
    /// Status string for the bundle: the close status, or `"suspended"`.
    pub fn status(&self) -> &str {
        if let Self::Closed { status, .. } = self {
            status.as_str()
        } else {
            SESSION_BUNDLE_STATUS_SUSPENDED
        }
    }

    /// Short variant tag (`"closed"` / `"suspended"`) stored in bundle metadata.
    pub fn tag(&self) -> &'static str {
        if self.is_suspended() {
            "suspended"
        } else {
            "closed"
        }
    }

    /// `true` when no close event has been observed.
    pub fn is_suspended(&self) -> bool {
        matches!(self, Self::Suspended)
    }
}
96
97pub fn agent_session_liveness(events: &[AgentSessionReplayEvent]) -> AgentSessionLiveness {
103 events
104 .iter()
105 .rev()
106 .find_map(|entry| match &entry.event {
107 AgentEvent::SessionClosed { status, .. } => Some(AgentSessionLiveness::Closed {
108 status: if status.is_empty() {
109 SESSION_BUNDLE_STATUS_COMPLETED.to_string()
110 } else {
111 status.clone()
112 },
113 finished_at_ms: entry.occurred_at_ms,
114 }),
115 _ => None,
116 })
117 .unwrap_or(AgentSessionLiveness::Suspended)
118}
119
/// How much of a run is preserved when exporting a session bundle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionBundleExportMode {
    /// Export as-is; no redaction pass runs.
    Local,
    /// Apply the redaction policy and local-path redaction (the default mode).
    Sanitized,
    /// Sanitize as above, then additionally withhold replay-only content.
    ReplayOnly,
}

impl SessionBundleExportMode {
    /// Stable lowercase name recorded in the redaction manifest's `mode` field.
    pub fn as_str(self) -> &'static str {
        match self {
            SessionBundleExportMode::Local => "local",
            SessionBundleExportMode::Sanitized => "sanitized",
            SessionBundleExportMode::ReplayOnly => "replay_only",
        }
    }
}
136
137#[derive(Clone, Debug)]
138pub struct SessionBundleExportOptions {
139 pub mode: SessionBundleExportMode,
140 pub include_attachments: bool,
141 pub redaction_policy: RedactionPolicy,
142}
143
144impl Default for SessionBundleExportOptions {
145 fn default() -> Self {
146 Self {
147 mode: SessionBundleExportMode::Sanitized,
148 include_attachments: false,
149 redaction_policy: RedactionPolicy::default(),
150 }
151 }
152}
153
/// Options controlling [`validate_session_bundle_value`].
#[derive(Clone, Debug, Default)]
pub struct SessionBundleValidationOptions {
    /// When `true`, the unredacted-secret-marker scan is skipped entirely.
    pub allow_unsafe_secret_markers: bool,
    /// Policy handed to the secret-marker scan when it runs.
    pub redaction_policy: RedactionPolicy,
}
159
/// A self-contained, JSON-serializable snapshot of a run or agent session.
///
/// `#[serde(default)]` lets partially populated JSON deserialize. Required fields
/// are enforced separately by [`validate_session_bundle_value`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct SessionBundle {
    /// Serialized as `_type`; must equal [`SESSION_BUNDLE_TYPE`] to validate.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Bundle format version (see [`SESSION_BUNDLE_SCHEMA_VERSION`]).
    pub schema_version: u32,
    /// Identifier of this bundle.
    pub bundle_id: String,
    /// RFC 3339 timestamp of when the bundle was produced.
    pub created_at: String,
    /// Tool and version that wrote the bundle.
    pub producer: BundleProducer,
    /// What the bundle was exported from (run record or event-log session).
    pub source: BundleSource,
    /// Runtime version, provider models and usage totals.
    pub runtime: BundleRuntime,
    /// Workspace identity, when known.
    pub workspace: Option<BundleWorkspace>,
    /// Transcript sections (messages, events, assets).
    pub transcript: BundleTranscript,
    /// Tool schemas and recorded tool calls.
    pub tools: BundleTools,
    /// Permission / human-in-the-loop records.
    pub permissions: Vec<BundlePermission>,
    /// Data used to replay or re-import the run.
    pub replay: BundleReplay,
    /// Manifest describing what the export redacted or withheld.
    pub redaction: RedactionManifest,
    /// Optional attachments.
    pub attachments: Vec<BundleAttachment>,
    /// Free-form metadata (e.g. the liveness tag under [`SESSION_BUNDLE_LIVENESS_KEY`]).
    pub metadata: BTreeMap<String, JsonValue>,
}
180
181impl Default for SessionBundle {
182 fn default() -> Self {
183 Self {
184 type_name: SESSION_BUNDLE_TYPE.to_string(),
185 schema_version: SESSION_BUNDLE_SCHEMA_VERSION,
186 bundle_id: String::new(),
187 created_at: String::new(),
188 producer: BundleProducer::default(),
189 source: BundleSource::default(),
190 runtime: BundleRuntime::default(),
191 workspace: None,
192 transcript: BundleTranscript::default(),
193 tools: BundleTools::default(),
194 permissions: Vec::new(),
195 replay: BundleReplay::default(),
196 redaction: RedactionManifest::default(),
197 attachments: Vec::new(),
198 metadata: BTreeMap::new(),
199 }
200 }
201}
202
/// Identifies the software that produced a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleProducer {
    /// Producer name (this module writes `"harn"`).
    pub name: String,
    /// Producer version (this module writes the crate's `CARGO_PKG_VERSION`).
    pub version: String,
    /// Schema identifier the bundle claims to follow.
    pub schema_id: String,
}
210
/// Describes the run or session a bundle was exported from.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleSource {
    /// Source kind; this module writes `"run_record"` or `"event_log_session"`.
    pub kind: String,
    /// Id of the run record (or the session id for event-log sources).
    pub run_record_id: String,
    /// Workflow identifier.
    pub workflow_id: String,
    /// Human-readable workflow name, if any.
    pub workflow_name: Option<String>,
    /// Task description.
    pub task: String,
    /// Run / session status.
    pub status: String,
    /// Start timestamp.
    pub started_at: String,
    /// Finish timestamp; `None` for runs or sessions that have not finished.
    pub finished_at: Option<String>,
    /// Where the run record was persisted, if known.
    pub persisted_path: Option<String>,
    /// Root run id for nested runs.
    pub root_run_id: Option<String>,
    /// Parent run id for nested runs.
    pub parent_run_id: Option<String>,
    /// Number of child runs recorded on the source run.
    pub child_run_count: usize,
}
227
/// Runtime information captured alongside the bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleRuntime {
    /// Harn version that produced the bundle.
    pub harn_version: String,
    /// Provider models used by the run (copied from its usage record when present).
    pub provider_models: Vec<String>,
    /// Aggregated usage totals, if the run recorded any.
    pub usage: Option<BundleUsage>,
    /// Free-form runtime metadata.
    pub metadata: BTreeMap<String, JsonValue>,
}
236
/// Usage totals copied field-for-field from a run's usage record.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleUsage {
    /// Total input tokens.
    pub input_tokens: i64,
    /// Total output tokens.
    pub output_tokens: i64,
    /// Number of model calls.
    pub call_count: i64,
    /// Summed call duration in milliseconds.
    pub total_duration_ms: i64,
    /// Total cost as recorded by the run (unit not established here).
    pub total_cost: f64,
    /// Models that contributed to the totals.
    pub models: Vec<String>,
}
247
/// Workspace identity recorded in a bundle (paths rendered as lossy UTF-8 strings).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleWorkspace {
    /// Primary workspace root.
    pub primary: Option<String>,
    /// Extra mounted roots beyond the primary one.
    #[serde(default)]
    pub additional_roots: Vec<BundleMountedRoot>,
    /// When the workspace anchor was established.
    pub anchored_at: Option<String>,
    /// Capture policy label; conversion from a `WorkspaceAnchor` sets `"safe_identity_only"`.
    pub policy: String,
}
263
/// One additional workspace root, flattened to strings.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleMountedRoot {
    /// Root path (lossy UTF-8).
    pub path: String,
    /// String form of the root's mount mode.
    pub mount_mode: String,
    /// When the root was mounted.
    pub mounted_at: String,
}
271
272impl From<&MountedRoot> for BundleMountedRoot {
273 fn from(root: &MountedRoot) -> Self {
274 Self {
275 path: root.path.to_string_lossy().into_owned(),
276 mount_mode: root.mount_mode.as_str().to_string(),
277 mounted_at: root.mounted_at.clone(),
278 }
279 }
280}
281
282impl From<&WorkspaceAnchor> for BundleWorkspace {
283 fn from(anchor: &WorkspaceAnchor) -> Self {
284 Self {
285 primary: Some(anchor.primary.to_string_lossy().into_owned()),
286 additional_roots: anchor.additional_roots.iter().map(Into::into).collect(),
287 anchored_at: Some(anchor.anchored_at.clone()),
288 policy: "safe_identity_only".to_string(),
289 }
290 }
291}
292
/// Transcript content of a bundle, split into sections.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscript {
    /// Ordered sections; import uses only the first one when rebuilding a run record.
    pub sections: Vec<BundleTranscriptSection>,
}
298
/// One transcript section (e.g. the agent event log of a session).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscriptSection {
    /// Section identifier.
    pub id: String,
    /// Human-readable label.
    pub label: String,
    /// Scope of the section (e.g. `"session"`).
    pub scope: String,
    /// Where the section's content came from (e.g. an event-log topic).
    pub location: String,
    /// Optional summary text.
    pub summary: Option<String>,
    /// Chat-style messages as JSON values.
    pub messages: Vec<JsonValue>,
    /// Raw events as JSON values.
    pub events: Vec<JsonValue>,
    /// Associated assets as JSON values.
    pub assets: Vec<JsonValue>,
    /// Free-form section metadata.
    pub metadata: BTreeMap<String, JsonValue>,
}
312
/// Tool information captured in a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTools {
    /// Tool schemas, each tagged with where it was found.
    pub schemas: Vec<BundleJsonEntry>,
    /// Recorded tool calls.
    pub calls: Vec<BundleToolCall>,
}
319
/// An arbitrary JSON value tagged with its origin and position.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleJsonEntry {
    /// Label of the collection the value came from.
    pub source: String,
    /// Zero-based position of the value within that source.
    pub index: usize,
    /// The value itself.
    pub value: JsonValue,
}
327
/// A recorded tool call, copied field-for-field from a `ToolCallRecord`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleToolCall {
    /// Name of the invoked tool.
    pub tool_name: String,
    /// Tool-use identifier of the call.
    pub tool_use_id: String,
    /// Hash of the call arguments.
    pub args_hash: String,
    /// Recorded result text.
    pub result: String,
    /// Whether the call was rejected.
    pub is_rejected: bool,
    /// Call duration in milliseconds.
    pub duration_ms: u64,
    /// Iteration in which the call happened.
    pub iteration: usize,
    /// When the call was recorded.
    pub timestamp: String,
}
340
341impl From<&ToolCallRecord> for BundleToolCall {
342 fn from(record: &ToolCallRecord) -> Self {
343 Self {
344 tool_name: record.tool_name.clone(),
345 tool_use_id: record.tool_use_id.clone(),
346 args_hash: record.args_hash.clone(),
347 result: record.result.clone(),
348 is_rejected: record.is_rejected,
349 duration_ms: record.duration_ms,
350 iteration: record.iteration,
351 timestamp: record.timestamp.clone(),
352 }
353 }
354}
355
/// A permission or human-in-the-loop record.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundlePermission {
    /// Record kind; entries with `"hitl_question"` are re-imported as HITL questions.
    pub kind: String,
    /// Where the record came from (e.g. `"agent_events"`).
    pub source: String,
    /// Originating request id, if any.
    pub request_id: Option<String>,
    /// Agent associated with the record, if any.
    pub agent: Option<String>,
    /// Record payload.
    pub payload: JsonValue,
}
365
/// Everything needed to replay or re-import the bundled run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleReplay {
    /// Replay fixture; used to synthesize a run record when `run_record` is absent.
    pub replay_fixture: Option<ReplayFixture>,
    /// Full serialized run record; when present, import returns it directly.
    pub run_record: Option<JsonValue>,
    /// Observability record (omitted from JSON when `None`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observability: Option<RunObservabilityRecord>,
    /// Verification outcomes, merged into observability on import when it lacks them.
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    /// Worker snapshots (omitted from JSON when empty).
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub worker_snapshots: Vec<BundleWorkerSnapshot>,
    /// Pointers to the event logs backing this run.
    pub event_log_pointers: Vec<BundleEventLogPointer>,
    /// Recorded run transitions.
    pub transitions: Vec<RunTransitionRecord>,
    /// Recorded run checkpoints.
    pub checkpoints: Vec<RunCheckpointRecord>,
    /// Recorded trace spans.
    pub trace_spans: Vec<RunTraceSpanRecord>,
    /// Ordered event payloads for deterministic replay.
    pub deterministic_events: Vec<BundleJsonEntry>,
}
382
/// A worker snapshot embedded in the bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleWorkerSnapshot {
    /// Worker id; a blank id is replaced by `worker_{index}` when materialized.
    pub worker_id: String,
    /// Worker display name.
    pub worker_name: String,
    /// Worker status.
    pub status: String,
    /// Reference to the snapshot.
    pub snapshot_ref: String,
    /// Path the snapshot was read from, if known.
    pub source_path: Option<String>,
    /// Snapshot contents; written out to disk by `materialize_worker_snapshots`.
    pub value: JsonValue,
}
393
/// A worker snapshot that has been written to disk during import.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MaterializedWorkerSnapshot {
    /// Worker id the file belongs to.
    pub worker_id: String,
    /// Path of the written snapshot file (lossy UTF-8).
    pub path: String,
}
400
/// Points at an event log that backs the bundled run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleEventLogPointer {
    /// Log kind (e.g. `"agent_events"`).
    pub kind: String,
    /// Event-log topic, if the log is topic-addressed.
    pub topic: Option<String>,
    /// Filesystem path, if the log is file-addressed.
    pub path: Option<String>,
    /// Storage location label (e.g. `"events.sqlite"`).
    pub location: String,
    /// Whether the log was available when the bundle was produced.
    pub available: bool,
}
410
/// Records how an exported bundle was redacted.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RedactionManifest {
    /// Export mode name (see `SessionBundleExportMode::as_str`).
    pub mode: String,
    /// Policy label; `"none"` for local exports.
    pub policy: String,
    /// Placeholder string substituted for redacted values.
    pub placeholder: String,
    /// One entry per redacted or withheld value.
    pub entries: Vec<RedactionEntry>,
    /// `true` for every export mode other than `Local`.
    pub unsafe_secret_markers_rejected: bool,
}
420
/// A single redaction applied during export (entries are produced by the
/// redaction helpers; field meanings below are inferred from names — confirm there).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RedactionEntry {
    /// JSON path of the affected value (export paths are rooted at `"$"`).
    pub path: String,
    /// Classification of what was redacted.
    pub class: String,
    /// Action taken on the value.
    pub action: String,
    /// Replacement written in place of the value, if any.
    pub replacement: Option<String>,
}
429
/// An attachment carried inside the bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleAttachment {
    /// Attachment identifier.
    pub id: String,
    /// Attachment kind.
    pub kind: String,
    /// Optional title.
    pub title: Option<String>,
    /// Optional stage the attachment belongs to.
    pub stage: Option<String>,
    /// Optional text content.
    pub text: Option<String>,
    /// Optional structured content.
    pub data: Option<JsonValue>,
    /// Free-form attachment metadata.
    pub metadata: BTreeMap<String, JsonValue>,
}
441
/// Errors produced while exporting, validating, importing or materializing session bundles.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionBundleError {
    /// JSON could not be parsed, or did not deserialize into the expected type.
    Decode(String),
    /// A value could not be serialized, or a snapshot file/directory could not be written.
    Encode(String),
    /// A required field is missing; the payload is the field path.
    MissingRequired(String),
    /// `schema_version` is outside the range this build supports.
    UnsupportedSchemaVersion { found: u64, supported: u32 },
    /// A field at `path` does not have the `expected` type or value.
    InvalidType { path: String, expected: String },
    /// An apparently unredacted secret marker was found at `path`.
    UnsafeSecretMarker { path: String, excerpt: String },
    /// The bundle has neither an embedded run record nor a replay fixture.
    MissingRunRecord,
    /// No replayable events were supplied for the session.
    MissingSessionEvents { session_id: String },
}

impl fmt::Display for SessionBundleError {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SessionBundleError::Decode(detail) => {
                write!(formatter, "failed to decode session bundle: {detail}")
            }
            SessionBundleError::Encode(detail) => {
                write!(formatter, "failed to encode session bundle: {detail}")
            }
            SessionBundleError::MissingRequired(field_path) => {
                write!(formatter, "session bundle is missing required field {field_path}")
            }
            SessionBundleError::UnsupportedSchemaVersion { found, supported } => write!(
                formatter,
                "unsupported session bundle schema_version {found}; this build supports <= {supported}"
            ),
            SessionBundleError::InvalidType { path, expected } => {
                write!(formatter, "session bundle field {path} must be {expected}")
            }
            SessionBundleError::UnsafeSecretMarker { path, excerpt } => write!(
                formatter,
                "session bundle contains an unsafe unredacted secret marker at {path}: {excerpt}"
            ),
            SessionBundleError::MissingRunRecord => {
                formatter.write_str("session bundle does not include an importable run record")
            }
            SessionBundleError::MissingSessionEvents { session_id } => write!(
                formatter,
                "event log does not contain replayable events for session_id {session_id:?}"
            ),
        }
    }
}

impl std::error::Error for SessionBundleError {}
483
484pub fn export_run_record_bundle(
485 run: &RunRecord,
486 options: &SessionBundleExportOptions,
487) -> Result<SessionBundle, SessionBundleError> {
488 let run_record_value =
489 serde_json::to_value(run).map_err(|error| SessionBundleError::Encode(error.to_string()))?;
490 let mut bundle = raw_bundle_from_run(run, run_record_value, options.include_attachments)?;
491 let mut bundle_value = serde_json::to_value(&bundle)
492 .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
493
494 let mut manifest = RedactionManifest {
495 mode: options.mode.as_str().to_string(),
496 policy: if matches!(options.mode, SessionBundleExportMode::Local) {
497 "none".to_string()
498 } else {
499 "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths".to_string()
500 },
501 placeholder: REDACTED_PLACEHOLDER.to_string(),
502 entries: Vec::new(),
503 unsafe_secret_markers_rejected: !matches!(options.mode, SessionBundleExportMode::Local),
504 };
505
506 if !matches!(options.mode, SessionBundleExportMode::Local) {
507 let redaction_policy = bundle_redaction_policy(&options.redaction_policy);
508 redact_json_with_manifest(
509 &mut bundle_value,
510 "$",
511 &redaction_policy,
512 &mut manifest.entries,
513 );
514 redact_bundle_pointer_paths_json(&mut bundle_value, "$", &mut manifest.entries);
515 }
516 if matches!(options.mode, SessionBundleExportMode::ReplayOnly) {
517 withhold_replay_only_json(&mut bundle_value, "$", &mut manifest.entries);
518 }
519 set_json_path(
520 &mut bundle_value,
521 &["redaction"],
522 serde_json::to_value(&manifest)
523 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
524 );
525 bundle = serde_json::from_value(bundle_value)
526 .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
527 Ok(bundle)
528}
529
530pub fn validate_session_bundle_value(
531 value: &JsonValue,
532 options: &SessionBundleValidationOptions,
533) -> Result<SessionBundle, SessionBundleError> {
534 require_field(value, "_type")?;
535 require_field(value, "schema_version")?;
536 require_field(value, "bundle_id")?;
537 require_field(value, "created_at")?;
538 require_field(value, "producer")?;
539 require_field(value, "source")?;
540 require_field(value, "runtime")?;
541 require_field(value, "transcript")?;
542 require_field(value, "tools")?;
543 require_field(value, "permissions")?;
544 require_field(value, "replay")?;
545 require_field(value, "redaction")?;
546 require_field(value, "attachments")?;
547 require_nested_field(value, &["producer", "name"])?;
548 require_nested_field(value, &["producer", "version"])?;
549 require_nested_field(value, &["producer", "schema_id"])?;
550 require_nested_field(value, &["source", "kind"])?;
551 require_nested_field(value, &["source", "run_record_id"])?;
552 require_nested_field(value, &["source", "workflow_id"])?;
553 require_nested_field(value, &["source", "task"])?;
554 require_nested_field(value, &["source", "status"])?;
555 require_nested_field(value, &["runtime", "harn_version"])?;
556 require_nested_field(value, &["runtime", "provider_models"])?;
557 require_nested_field(value, &["transcript", "sections"])?;
558 require_nested_field(value, &["tools", "schemas"])?;
559 require_nested_field(value, &["tools", "calls"])?;
560 require_nested_field(value, &["replay", "event_log_pointers"])?;
561 require_nested_field(value, &["replay", "transitions"])?;
562 require_nested_field(value, &["replay", "checkpoints"])?;
563 require_nested_field(value, &["replay", "trace_spans"])?;
564 require_nested_field(value, &["replay", "deterministic_events"])?;
565 require_nested_field(value, &["redaction", "mode"])?;
566 require_nested_field(value, &["redaction", "policy"])?;
567 require_nested_field(value, &["redaction", "placeholder"])?;
568 require_nested_field(value, &["redaction", "entries"])?;
569 require_nested_field(value, &["redaction", "unsafe_secret_markers_rejected"])?;
570
571 let type_name = value
572 .get("_type")
573 .and_then(JsonValue::as_str)
574 .ok_or_else(|| SessionBundleError::InvalidType {
575 path: "$._type".to_string(),
576 expected: "string".to_string(),
577 })?;
578 if type_name != SESSION_BUNDLE_TYPE {
579 return Err(SessionBundleError::InvalidType {
580 path: "$._type".to_string(),
581 expected: format!("\"{SESSION_BUNDLE_TYPE}\""),
582 });
583 }
584
585 let version = value
586 .get("schema_version")
587 .and_then(JsonValue::as_u64)
588 .ok_or_else(|| SessionBundleError::InvalidType {
589 path: "$.schema_version".to_string(),
590 expected: "positive integer".to_string(),
591 })?;
592 if version == 0 || version > u64::from(SESSION_BUNDLE_SCHEMA_VERSION) {
593 return Err(SessionBundleError::UnsupportedSchemaVersion {
594 found: version,
595 supported: SESSION_BUNDLE_SCHEMA_VERSION,
596 });
597 }
598
599 if !options.allow_unsafe_secret_markers {
600 reject_unredacted_secret_markers(value, "$", &options.redaction_policy)?;
601 }
602
603 serde_json::from_value::<SessionBundle>(value.clone())
604 .map_err(|error| SessionBundleError::Decode(error.to_string()))
605}
606
607pub fn validate_session_bundle_str(
608 content: &str,
609 options: &SessionBundleValidationOptions,
610) -> Result<SessionBundle, SessionBundleError> {
611 let value: JsonValue = serde_json::from_str(content)
612 .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
613 validate_session_bundle_value(&value, options)
614}
615
616pub fn import_run_record_value(bundle: &SessionBundle) -> Result<JsonValue, SessionBundleError> {
617 let replay_observability = replay_observability_for_import(&bundle.replay);
618 if let Some(mut run_record) = bundle.replay.run_record.clone() {
619 let should_fill_observability = match run_record.get("observability") {
620 Some(value) => value.is_null(),
621 None => true,
622 };
623 if should_fill_observability {
624 if let (JsonValue::Object(map), Some(observability)) =
625 (&mut run_record, replay_observability.as_ref())
626 {
627 map.insert(
628 "observability".to_string(),
629 serde_json::to_value(observability)
630 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
631 );
632 }
633 }
634 return Ok(run_record);
635 }
636 if let Some(fixture) = &bundle.replay.replay_fixture {
637 let transcript = bundle.transcript.sections.first().map(|section| {
638 json!({
639 "_type": "transcript",
640 "messages": section.messages.clone(),
641 "events": section.events.clone(),
642 "assets": section.assets.clone(),
643 "summary": section.summary.clone(),
644 "metadata": section.metadata.clone(),
645 })
646 });
647 let hitl_questions = bundle
648 .permissions
649 .iter()
650 .filter(|permission| permission.kind == "hitl_question")
651 .map(|permission| permission.payload.clone())
652 .collect::<Vec<_>>();
653 return Ok(json!({
654 "_type": "run_record",
655 "id": bundle.source.run_record_id.clone(),
656 "workflow_id": bundle.source.workflow_id.clone(),
657 "workflow_name": bundle.source.workflow_name.clone(),
658 "task": bundle.source.task.clone(),
659 "status": bundle.source.status.clone(),
660 "started_at": bundle.source.started_at.clone(),
661 "finished_at": bundle.source.finished_at.clone(),
662 "stages": [],
663 "transitions": bundle.replay.transitions.clone(),
664 "checkpoints": bundle.replay.checkpoints.clone(),
665 "pending_nodes": [],
666 "completed_nodes": [],
667 "child_runs": [],
668 "artifacts": [],
669 "handoffs": [],
670 "policy": {},
671 "transcript": transcript,
672 "usage": bundle.runtime.usage.clone(),
673 "replay_fixture": fixture,
674 "observability": replay_observability,
675 "trace_spans": bundle.replay.trace_spans.clone(),
676 "tool_recordings": bundle.tools.calls.clone(),
677 "hitl_questions": hitl_questions,
678 "persona_runtime": [],
679 "metadata": {
680 "imported_from_session_bundle": bundle.bundle_id.clone(),
681 "session_bundle_schema_version": bundle.schema_version,
682 "worker_snapshot_count": bundle.replay.worker_snapshots.len(),
683 }
684 }));
685 }
686 Err(SessionBundleError::MissingRunRecord)
687}
688
689pub fn import_run_record_value_with_materialized_worker_snapshots(
690 bundle: &SessionBundle,
691 materialized: &[MaterializedWorkerSnapshot],
692) -> Result<JsonValue, SessionBundleError> {
693 let mut run_record = import_run_record_value(bundle)?;
694 apply_materialized_worker_snapshot_paths(&mut run_record, materialized);
695 Ok(run_record)
696}
697
698pub fn materialize_worker_snapshots(
699 bundle: &SessionBundle,
700 out_dir: &Path,
701) -> Result<Vec<MaterializedWorkerSnapshot>, SessionBundleError> {
702 if bundle.replay.worker_snapshots.is_empty() {
703 return Ok(Vec::new());
704 }
705 fs::create_dir_all(out_dir).map_err(|error| {
706 SessionBundleError::Encode(format!(
707 "failed to create worker snapshot directory {}: {error}",
708 out_dir.display()
709 ))
710 })?;
711
712 let mut materialized = Vec::new();
713 for (index, snapshot) in bundle.replay.worker_snapshots.iter().enumerate() {
714 let worker_id = if snapshot.worker_id.trim().is_empty() {
715 format!("worker_{index}")
716 } else {
717 snapshot.worker_id.clone()
718 };
719 let path = out_dir.join(worker_snapshot_file_name(&worker_id, index));
720 let value = worker_snapshot_value_for_import(&snapshot.value, &path);
721 let rendered = serde_json::to_string_pretty(&value)
722 .map(|json| format!("{json}\n"))
723 .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
724 fs::write(&path, rendered).map_err(|error| {
725 SessionBundleError::Encode(format!(
726 "failed to write worker snapshot {}: {error}",
727 path.display()
728 ))
729 })?;
730 materialized.push(MaterializedWorkerSnapshot {
731 worker_id,
732 path: path.to_string_lossy().into_owned(),
733 });
734 }
735 Ok(materialized)
736}
737
738fn apply_materialized_worker_snapshot_paths(
739 run_record: &mut JsonValue,
740 materialized: &[MaterializedWorkerSnapshot],
741) {
742 if materialized.is_empty() {
743 return;
744 }
745
746 let paths_by_worker_id = materialized
747 .iter()
748 .filter(|snapshot| !snapshot.worker_id.is_empty())
749 .map(|snapshot| (snapshot.worker_id.as_str(), snapshot.path.as_str()))
750 .collect::<BTreeMap<_, _>>();
751 if paths_by_worker_id.is_empty() {
752 return;
753 }
754
755 rewrite_worker_snapshot_paths(run_record.get_mut("child_runs"), &paths_by_worker_id);
756 rewrite_worker_snapshot_paths(
757 run_record
758 .get_mut("observability")
759 .and_then(|observability| observability.get_mut("worker_lineage")),
760 &paths_by_worker_id,
761 );
762}
763
764fn rewrite_worker_snapshot_paths(
765 records: Option<&mut JsonValue>,
766 paths_by_worker_id: &BTreeMap<&str, &str>,
767) {
768 let Some(records) = records.and_then(JsonValue::as_array_mut) else {
769 return;
770 };
771 for record in records {
772 let Some(worker_id) = record.get("worker_id").and_then(JsonValue::as_str) else {
773 continue;
774 };
775 let Some(path) = paths_by_worker_id.get(worker_id) else {
776 continue;
777 };
778 if let JsonValue::Object(map) = record {
779 map.insert(
780 "snapshot_path".to_string(),
781 JsonValue::String((*path).to_string()),
782 );
783 }
784 }
785}
786
787fn replay_observability_for_import(replay: &BundleReplay) -> Option<RunObservabilityRecord> {
788 let mut observability = replay.observability.clone().unwrap_or_default();
789 let has_observability = replay.observability.is_some();
790 let has_verification_outcomes = !replay.verification_outcomes.is_empty();
791 if !has_observability && !has_verification_outcomes {
792 return None;
793 }
794 if observability.schema_version == 0 {
795 observability.schema_version = 4;
796 }
797 if observability.verification_outcomes.is_empty() && has_verification_outcomes {
798 observability.verification_outcomes = replay.verification_outcomes.clone();
799 }
800 Some(observability)
801}
802
803pub fn session_bundle_from_agent_session_events(
804 session_id: &str,
805 events: &[AgentSessionReplayEvent],
806) -> Result<SessionBundle, SessionBundleError> {
807 if events.is_empty() {
808 return Err(SessionBundleError::MissingSessionEvents {
809 session_id: session_id.to_string(),
810 });
811 }
812
813 let stable_id = sanitize_topic_component(session_id);
814 let started_at = rfc3339_from_epoch_ms(events[0].occurred_at_ms);
815 let liveness = agent_session_liveness(events);
820 let finished_at = match &liveness {
821 AgentSessionLiveness::Closed { finished_at_ms, .. } => {
822 Some(rfc3339_from_epoch_ms(*finished_at_ms))
823 }
824 AgentSessionLiveness::Suspended => None,
825 };
826 let status = liveness.status().to_string();
827 let run_id = session_id.to_string();
828 let workflow_id = "agent_session".to_string();
829 let created_at = finished_at.clone().unwrap_or_else(|| started_at.clone());
830 let transcript_events = transcript_events_from_agent_session(events)?;
831 let transcript_messages = transcript_messages_from_agent_session(events);
832 let mut transcript_metadata = BTreeMap::new();
833 transcript_metadata.insert("session_id".to_string(), json!(session_id));
834 transcript_metadata.insert(
835 "source".to_string(),
836 json!("events.sqlite observability.agent_events topic"),
837 );
838
839 let replay_fixture = ReplayFixture {
840 type_name: "replay_fixture".to_string(),
841 id: format!("fixture_from_session_{stable_id}"),
842 source_run_id: run_id.clone(),
843 workflow_id: workflow_id.clone(),
844 workflow_name: Some(format!("Agent session {session_id}")),
845 created_at: created_at.clone(),
846 eval_kind: Some("replay".to_string()),
847 expected_status: status.clone(),
848 ..ReplayFixture::default()
849 };
850
851 Ok(SessionBundle {
852 bundle_id: format!("bundle_from_session_{stable_id}"),
853 created_at,
854 producer: BundleProducer {
855 name: "harn".to_string(),
856 version: env!("CARGO_PKG_VERSION").to_string(),
857 schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
858 },
859 source: BundleSource {
860 kind: "event_log_session".to_string(),
861 run_record_id: run_id,
862 workflow_id,
863 workflow_name: Some(format!("Agent session {session_id}")),
864 task: task_from_agent_session(events)
865 .unwrap_or_else(|| format!("Agent session {session_id}")),
866 status,
867 started_at,
868 finished_at,
869 ..BundleSource::default()
870 },
871 runtime: BundleRuntime {
872 harn_version: env!("CARGO_PKG_VERSION").to_string(),
873 ..BundleRuntime::default()
874 },
875 transcript: BundleTranscript {
876 sections: vec![BundleTranscriptSection {
877 id: "agent_events".to_string(),
878 label: "Agent event log".to_string(),
879 scope: "session".to_string(),
880 location: format!(
881 "observability.agent_events.{}",
882 sanitize_topic_component(session_id)
883 ),
884 summary: None,
885 messages: transcript_messages,
886 events: transcript_events,
887 assets: Vec::new(),
888 metadata: transcript_metadata,
889 }],
890 },
891 permissions: permissions_from_agent_session(events),
892 replay: BundleReplay {
893 replay_fixture: Some(replay_fixture),
894 event_log_pointers: vec![BundleEventLogPointer {
895 kind: "agent_events".to_string(),
896 topic: Some(format!(
897 "observability.agent_events.{}",
898 sanitize_topic_component(session_id)
899 )),
900 path: None,
901 location: "events.sqlite".to_string(),
902 available: true,
903 }],
904 deterministic_events: deterministic_events_from_agent_session(events)?,
905 ..BundleReplay::default()
906 },
907 metadata: BTreeMap::from([(
908 SESSION_BUNDLE_LIVENESS_KEY.to_string(),
909 json!(liveness.tag()),
910 )]),
911 ..SessionBundle::default()
912 })
913}
914
915pub fn import_run_record_from_agent_session_events(
916 session_id: &str,
917 events: &[AgentSessionReplayEvent],
918) -> Result<RunRecord, SessionBundleError> {
919 let bundle = session_bundle_from_agent_session_events(session_id, events)?;
920 let run_record = import_run_record_value(&bundle)?;
921 serde_json::from_value(run_record)
922 .map_err(|error| SessionBundleError::Decode(error.to_string()))
923}
924
925fn transcript_events_from_agent_session(
926 events: &[AgentSessionReplayEvent],
927) -> Result<Vec<JsonValue>, SessionBundleError> {
928 events
929 .iter()
930 .map(|entry| {
931 let event = serde_json::to_value(&entry.event)
932 .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
933 Ok(json!({
934 "event_id": entry.event_id,
935 "kind": entry.kind,
936 "occurred_at_ms": entry.occurred_at_ms,
937 "event": event,
938 }))
939 })
940 .collect()
941}
942
943fn transcript_messages_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<JsonValue> {
944 events
945 .iter()
946 .filter_map(|entry| match &entry.event {
947 AgentEvent::UserMessage { content, .. } => Some(json!({
948 "role": "user",
949 "content": content,
950 })),
951 AgentEvent::AgentMessageChunk { content, .. } if !content.is_empty() => Some(json!({
952 "role": "assistant",
953 "content": content,
954 })),
955 _ => None,
956 })
957 .collect()
958}
959
960fn permissions_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<BundlePermission> {
961 let mut permissions = Vec::new();
962 for entry in events {
963 if let AgentEvent::HitlRequested {
964 request_id,
965 kind,
966 payload,
967 ..
968 } = &entry.event
969 {
970 permissions.push(BundlePermission {
971 kind: "hitl_question".to_string(),
972 source: "agent_events".to_string(),
973 request_id: Some(request_id.clone()),
974 agent: None,
975 payload: json!({
976 "kind": kind,
977 "payload": payload,
978 "event_id": entry.event_id,
979 "occurred_at_ms": entry.occurred_at_ms,
980 }),
981 });
982 }
983 }
984 permissions
985}
986
987fn deterministic_events_from_agent_session(
988 events: &[AgentSessionReplayEvent],
989) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
990 transcript_events_from_agent_session(events).map(|entries| {
991 entries
992 .into_iter()
993 .enumerate()
994 .map(|(index, value)| BundleJsonEntry {
995 source: "events.sqlite.agent_events".to_string(),
996 index,
997 value,
998 })
999 .collect()
1000 })
1001}
1002
1003fn task_from_agent_session(events: &[AgentSessionReplayEvent]) -> Option<String> {
1004 events.iter().find_map(|entry| match &entry.event {
1005 AgentEvent::UserMessage { content, .. } => user_message_text(content),
1006 _ => None,
1007 })
1008}
1009
1010fn user_message_text(content: &[JsonValue]) -> Option<String> {
1011 let parts = content
1012 .iter()
1013 .filter_map(|value| {
1014 value
1015 .get("text")
1016 .and_then(JsonValue::as_str)
1017 .or_else(|| value.as_str())
1018 .map(str::to_string)
1019 })
1020 .filter(|text| !text.trim().is_empty())
1021 .collect::<Vec<_>>();
1022 if parts.is_empty() {
1023 None
1024 } else {
1025 Some(parts.join("\n"))
1026 }
1027}
1028
1029fn rfc3339_from_epoch_ms(ms: i64) -> String {
1030 DateTime::<Utc>::from_timestamp_millis(ms)
1031 .unwrap_or_else(|| DateTime::<Utc>::from_timestamp(0, 0).expect("unix epoch is valid"))
1032 .to_rfc3339_opts(SecondsFormat::Millis, true)
1033}
1034
/// Assembles a [`SessionBundle`] directly from a persisted [`RunRecord`].
///
/// * `run_record_value` is stored verbatim as `replay.run_record`. Presumably it
///   is the JSON form of `run` — NOTE(review): confirm at the call sites.
/// * `include_attachments` controls whether the run's artifacts are copied into
///   `attachments`. When it is false, the list is left empty.
///
/// No redaction happens in this function: `redaction.entries` starts empty, and
/// the manifest only names the policy and placeholder. The `raw_` prefix
/// suggests a later pass sanitizes the bundle — NOTE(review): confirm before
/// treating `mode: "sanitized"` as accurate for the value returned here.
///
/// # Errors
///
/// Returns [`SessionBundleError::Encode`] when a run transition or checkpoint
/// cannot be serialized into a deterministic replay event.
fn raw_bundle_from_run(
    run: &RunRecord,
    run_record_value: JsonValue,
    include_attachments: bool,
) -> Result<SessionBundle, SessionBundleError> {
    let mut bundle = SessionBundle {
        // Identity and creation time are minted at export time, not copied from `run`.
        bundle_id: new_id("bundle"),
        created_at: now_rfc3339(),
        producer: BundleProducer {
            name: "harn".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
            schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
        },
        // Provenance: identifies which run record this bundle was exported from.
        source: BundleSource {
            kind: "run_record".to_string(),
            run_record_id: run.id.clone(),
            workflow_id: run.workflow_id.clone(),
            workflow_name: run.workflow_name.clone(),
            task: run.task.clone(),
            status: run.status.clone(),
            started_at: run.started_at.clone(),
            finished_at: run.finished_at.clone(),
            persisted_path: run.persisted_path.clone(),
            root_run_id: run.root_run_id.clone(),
            parent_run_id: run.parent_run_id.clone(),
            child_run_count: run.child_runs.len(),
        },
        // Both `provider_models` and `usage.models` come from `run.usage.models`;
        // they are empty / `None` when the run recorded no usage.
        runtime: BundleRuntime {
            harn_version: env!("CARGO_PKG_VERSION").to_string(),
            provider_models: run
                .usage
                .as_ref()
                .map(|usage| usage.models.clone())
                .unwrap_or_default(),
            usage: run.usage.as_ref().map(|usage| BundleUsage {
                input_tokens: usage.input_tokens,
                output_tokens: usage.output_tokens,
                call_count: usage.call_count,
                total_duration_ms: usage.total_duration_ms,
                total_cost: usage.total_cost,
                models: usage.models.clone(),
            }),
            metadata: BTreeMap::new(),
        },
        workspace: workspace_from_run(run),
        transcript: transcript_from_run(run),
        tools: BundleTools {
            schemas: tool_schema_entries(run),
            calls: run
                .tool_recordings
                .iter()
                .map(BundleToolCall::from)
                .collect(),
        },
        permissions: permissions_from_run(run),
        replay: BundleReplay {
            replay_fixture: run.replay_fixture.clone(),
            run_record: Some(run_record_value),
            observability: run.observability.clone(),
            // Falls back to deriving observability when the run stored none.
            verification_outcomes: verification_outcomes_for_run(run),
            // Reads worker snapshot files from disk (missing/unparseable ones are skipped).
            worker_snapshots: worker_snapshots_from_run(run),
            event_log_pointers: event_log_pointers_from_run(run),
            transitions: run.transitions.clone(),
            checkpoints: run.checkpoints.clone(),
            trace_spans: run.trace_spans.clone(),
            // The only fallible field; fields above it have already been evaluated
            // (including `new_id`) when this returns an error.
            deterministic_events: deterministic_events_from_run(run)?,
        },
        redaction: RedactionManifest {
            mode: "sanitized".to_string(),
            policy: "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths"
                .to_string(),
            placeholder: REDACTED_PLACEHOLDER.to_string(),
            entries: Vec::new(),
            unsafe_secret_markers_rejected: true,
        },
        attachments: if include_attachments {
            attachments_from_run(run)
        } else {
            Vec::new()
        },
        ..SessionBundle::default()
    };
    bundle.metadata.insert(
        "format_note".to_string(),
        json!("Session bundles are portable JSON envelopes; hosted share links should reference sanitized bundles rather than raw run records."),
    );
    Ok(bundle)
}
1123
1124fn verification_outcomes_for_run(run: &RunRecord) -> Vec<RunVerificationOutcomeRecord> {
1125 if let Some(observability) = run.observability.as_ref() {
1126 return observability.verification_outcomes.clone();
1127 }
1128 derive_run_observability(run, run.persisted_path.as_deref().map(Path::new))
1129 .verification_outcomes
1130}
1131
1132fn bundle_redaction_policy(base: &RedactionPolicy) -> RedactionPolicy {
1133 base.clone()
1134 .with_extra_field("persisted_path")
1135 .with_extra_field("primary")
1136 .with_extra_field("run_path")
1137 .with_extra_field("snapshot_ref")
1138 .with_extra_field("snapshot_path")
1139 .with_extra_field("source_path")
1140}
1141
1142fn workspace_from_run(run: &RunRecord) -> Option<BundleWorkspace> {
1143 let anchor = run
1144 .transcript
1145 .as_ref()
1146 .and_then(|transcript| transcript.get("metadata"))
1147 .and_then(anchor_from_transcript_metadata_json)?;
1148 Some(BundleWorkspace::from(&anchor))
1149}
1150
1151fn transcript_from_run(run: &RunRecord) -> BundleTranscript {
1152 let mut sections = Vec::new();
1153 if let Some(transcript) = &run.transcript {
1154 sections.push(transcript_section(
1155 "run",
1156 "Run transcript",
1157 "run",
1158 "$.transcript",
1159 transcript,
1160 ));
1161 }
1162 for (index, stage) in run.stages.iter().enumerate() {
1163 if let Some(transcript) = &stage.transcript {
1164 sections.push(transcript_section(
1165 &stage.id,
1166 &format!("Stage {}", stage.node_id),
1167 "stage",
1168 &format!("$.stages[{index}].transcript"),
1169 transcript,
1170 ));
1171 }
1172 }
1173 BundleTranscript { sections }
1174}
1175
1176fn transcript_section(
1177 id: &str,
1178 label: &str,
1179 scope: &str,
1180 location: &str,
1181 transcript: &JsonValue,
1182) -> BundleTranscriptSection {
1183 BundleTranscriptSection {
1184 id: id.to_string(),
1185 label: label.to_string(),
1186 scope: scope.to_string(),
1187 location: location.to_string(),
1188 summary: transcript
1189 .get("summary")
1190 .and_then(JsonValue::as_str)
1191 .map(str::to_string),
1192 messages: json_array(transcript.get("messages")),
1193 events: json_array(transcript.get("events")),
1194 assets: json_array(transcript.get("assets")),
1195 metadata: transcript
1196 .get("metadata")
1197 .and_then(JsonValue::as_object)
1198 .map(|map| {
1199 map.iter()
1200 .map(|(key, value)| (key.clone(), value.clone()))
1201 .collect()
1202 })
1203 .unwrap_or_default(),
1204 }
1205}
1206
1207fn json_array(value: Option<&JsonValue>) -> Vec<JsonValue> {
1208 value
1209 .and_then(JsonValue::as_array)
1210 .cloned()
1211 .unwrap_or_default()
1212}
1213
1214fn tool_schema_entries(run: &RunRecord) -> Vec<BundleJsonEntry> {
1215 let mut entries = Vec::new();
1216 collect_tool_schema_entries_from_transcript(&mut entries, "run.transcript", &run.transcript);
1217 for stage in &run.stages {
1218 collect_tool_schema_entries_from_transcript(
1219 &mut entries,
1220 &format!("stage.{}.transcript", stage.node_id),
1221 &stage.transcript,
1222 );
1223 if let Some(tools) = stage
1224 .metadata
1225 .get("tool_schemas")
1226 .or_else(|| stage.metadata.get("tools"))
1227 {
1228 entries.push(BundleJsonEntry {
1229 source: format!("stage.{}.metadata", stage.node_id),
1230 index: entries.len(),
1231 value: tools.clone(),
1232 });
1233 }
1234 }
1235 entries
1236}
1237
1238fn collect_tool_schema_entries_from_transcript(
1239 entries: &mut Vec<BundleJsonEntry>,
1240 source: &str,
1241 transcript: &Option<JsonValue>,
1242) {
1243 let Some(transcript) = transcript else {
1244 return;
1245 };
1246 for event in transcript
1247 .get("events")
1248 .and_then(JsonValue::as_array)
1249 .into_iter()
1250 .flatten()
1251 {
1252 let kind = event
1253 .get("type")
1254 .or_else(|| event.get("kind"))
1255 .and_then(JsonValue::as_str)
1256 .unwrap_or_default();
1257 if kind == "tool_schemas" || kind == "tool_schema" {
1258 entries.push(BundleJsonEntry {
1259 source: source.to_string(),
1260 index: entries.len(),
1261 value: event.clone(),
1262 });
1263 }
1264 }
1265}
1266
1267fn permissions_from_run(run: &RunRecord) -> Vec<BundlePermission> {
1268 let mut permissions = run
1269 .hitl_questions
1270 .iter()
1271 .map(permission_from_hitl_question)
1272 .collect::<Vec<_>>();
1273 collect_permission_events(&mut permissions, "run.transcript", &run.transcript);
1274 for stage in &run.stages {
1275 collect_permission_events(
1276 &mut permissions,
1277 &format!("stage.{}.transcript", stage.node_id),
1278 &stage.transcript,
1279 );
1280 if let Some(worker) = stage.metadata.get("worker") {
1281 if let Some(policy) = worker
1282 .get("audit")
1283 .and_then(|audit| audit.get("approval_policy"))
1284 {
1285 permissions.push(BundlePermission {
1286 kind: "approval_policy".to_string(),
1287 source: format!("stage.{}.worker.audit", stage.node_id),
1288 request_id: None,
1289 agent: worker
1290 .get("name")
1291 .and_then(JsonValue::as_str)
1292 .map(str::to_string),
1293 payload: policy.clone(),
1294 });
1295 }
1296 }
1297 }
1298 permissions
1299}
1300
1301fn permission_from_hitl_question(question: &RunHitlQuestionRecord) -> BundlePermission {
1302 BundlePermission {
1303 kind: "hitl_question".to_string(),
1304 source: "run.hitl_questions".to_string(),
1305 request_id: Some(question.request_id.clone()),
1306 agent: if question.agent.is_empty() {
1307 None
1308 } else {
1309 Some(question.agent.clone())
1310 },
1311 payload: serde_json::to_value(question).unwrap_or(JsonValue::Null),
1312 }
1313}
1314
1315fn collect_permission_events(
1316 permissions: &mut Vec<BundlePermission>,
1317 source: &str,
1318 transcript: &Option<JsonValue>,
1319) {
1320 let Some(transcript) = transcript else {
1321 return;
1322 };
1323 for event in transcript
1324 .get("events")
1325 .and_then(JsonValue::as_array)
1326 .into_iter()
1327 .flatten()
1328 {
1329 let kind = event
1330 .get("type")
1331 .or_else(|| event.get("kind"))
1332 .and_then(JsonValue::as_str)
1333 .unwrap_or_default();
1334 if kind.contains("permission") || kind.contains("approval") || kind.starts_with("hitl_") {
1335 permissions.push(BundlePermission {
1336 kind: kind.to_string(),
1337 source: source.to_string(),
1338 request_id: event
1339 .get("request_id")
1340 .or_else(|| event.get("id"))
1341 .and_then(JsonValue::as_str)
1342 .map(str::to_string),
1343 agent: event
1344 .get("agent")
1345 .and_then(JsonValue::as_str)
1346 .map(str::to_string),
1347 payload: event.clone(),
1348 });
1349 }
1350 }
1351}
1352
1353fn event_log_pointers_from_run(run: &RunRecord) -> Vec<BundleEventLogPointer> {
1354 let mut pointers = Vec::new();
1355 if let Some(observability) = &run.observability {
1356 for pointer in &observability.transcript_pointers {
1357 pointers.push(BundleEventLogPointer {
1358 kind: pointer.kind.clone(),
1359 topic: None,
1360 path: pointer.path.clone(),
1361 location: pointer.location.clone(),
1362 available: pointer.available,
1363 });
1364 }
1365 for worker in &observability.worker_lineage {
1366 if let Some(session_id) = &worker.session_id {
1367 pointers.push(BundleEventLogPointer {
1368 kind: "agent_events".to_string(),
1369 topic: Some(format!("observability.agent_events.{session_id}")),
1370 path: worker.snapshot_path.clone(),
1371 location: format!("worker.{}.session", worker.worker_id),
1372 available: worker.snapshot_path.is_some(),
1373 });
1374 }
1375 }
1376 }
1377 pointers
1378}
1379
1380fn worker_snapshots_from_run(run: &RunRecord) -> Vec<BundleWorkerSnapshot> {
1381 let mut snapshots = Vec::new();
1382 let mut seen_paths = BTreeSet::new();
1383 for child in &run.child_runs {
1384 let Some(path) = child.snapshot_path.as_deref() else {
1385 continue;
1386 };
1387 if !seen_paths.insert(path.to_string()) {
1388 continue;
1389 }
1390 if let Some(snapshot) = worker_snapshot_from_path(
1391 &child.worker_id,
1392 &child.worker_name,
1393 &child.status,
1394 Path::new(path),
1395 ) {
1396 snapshots.push(snapshot);
1397 }
1398 }
1399 if let Some(observability) = run.observability.as_ref() {
1400 for worker in &observability.worker_lineage {
1401 let Some(path) = worker.snapshot_path.as_deref() else {
1402 continue;
1403 };
1404 if !seen_paths.insert(path.to_string()) {
1405 continue;
1406 }
1407 if let Some(snapshot) = worker_snapshot_from_path(
1408 &worker.worker_id,
1409 &worker.worker_name,
1410 &worker.status,
1411 Path::new(path),
1412 ) {
1413 snapshots.push(snapshot);
1414 }
1415 }
1416 }
1417 snapshots
1418}
1419
1420fn worker_snapshot_from_path(
1421 worker_id: &str,
1422 worker_name: &str,
1423 status: &str,
1424 path: &Path,
1425) -> Option<BundleWorkerSnapshot> {
1426 let content = fs::read_to_string(path).ok()?;
1427 let value = serde_json::from_str::<JsonValue>(&content).ok()?;
1428 Some(BundleWorkerSnapshot {
1429 worker_id: if worker_id.is_empty() {
1430 value
1431 .get("id")
1432 .and_then(JsonValue::as_str)
1433 .unwrap_or_default()
1434 .to_string()
1435 } else {
1436 worker_id.to_string()
1437 },
1438 worker_name: if worker_name.is_empty() {
1439 value
1440 .get("name")
1441 .and_then(JsonValue::as_str)
1442 .unwrap_or("worker")
1443 .to_string()
1444 } else {
1445 worker_name.to_string()
1446 },
1447 status: if status.is_empty() {
1448 value
1449 .get("status")
1450 .and_then(JsonValue::as_str)
1451 .unwrap_or_default()
1452 .to_string()
1453 } else {
1454 status.to_string()
1455 },
1456 snapshot_ref: value
1457 .get("suspension")
1458 .and_then(|value| value.get("snapshot_ref"))
1459 .and_then(JsonValue::as_str)
1460 .or_else(|| value.get("snapshot_path").and_then(JsonValue::as_str))
1461 .unwrap_or_else(|| path.to_str().unwrap_or_default())
1462 .to_string(),
1463 source_path: Some(path.to_string_lossy().into_owned()),
1464 value,
1465 })
1466}
1467
1468fn worker_snapshot_file_name(worker_id: &str, index: usize) -> String {
1469 let component = sanitize_topic_component(worker_id);
1470 let component = if component.is_empty() {
1471 format!("worker_{index}")
1472 } else {
1473 component
1474 };
1475 format!("{component}.json")
1476}
1477
1478fn worker_snapshot_value_for_import(value: &JsonValue, path: &Path) -> JsonValue {
1479 let mut value = value.clone();
1480 let path = path.to_string_lossy().into_owned();
1481 if let JsonValue::Object(map) = &mut value {
1482 map.insert("snapshot_path".to_string(), JsonValue::String(path.clone()));
1483 if let Some(JsonValue::Object(suspension)) = map.get_mut("suspension") {
1484 suspension.insert("snapshot_ref".to_string(), JsonValue::String(path));
1485 }
1486 }
1487 value
1488}
1489
1490fn deterministic_events_from_run(
1491 run: &RunRecord,
1492) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
1493 let mut events = Vec::new();
1494 for (index, transition) in run.transitions.iter().enumerate() {
1495 events.push(BundleJsonEntry {
1496 source: "run.transitions".to_string(),
1497 index,
1498 value: serde_json::to_value(transition)
1499 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1500 });
1501 }
1502 for (index, checkpoint) in run.checkpoints.iter().enumerate() {
1503 events.push(BundleJsonEntry {
1504 source: "run.checkpoints".to_string(),
1505 index,
1506 value: serde_json::to_value(checkpoint)
1507 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1508 });
1509 }
1510 Ok(events)
1511}
1512
1513fn attachments_from_run(run: &RunRecord) -> Vec<BundleAttachment> {
1514 run.artifacts
1515 .iter()
1516 .map(|artifact| BundleAttachment {
1517 id: artifact.id.clone(),
1518 kind: artifact.kind.clone(),
1519 title: artifact.title.clone(),
1520 stage: artifact.stage.clone(),
1521 text: artifact.text.clone(),
1522 data: artifact.data.clone(),
1523 metadata: artifact.metadata.clone(),
1524 })
1525 .collect()
1526}
1527
1528fn redact_json_with_manifest(
1529 value: &mut JsonValue,
1530 path: &str,
1531 policy: &RedactionPolicy,
1532 entries: &mut Vec<RedactionEntry>,
1533) {
1534 match value {
1535 JsonValue::Object(map) => {
1536 let keys = map.keys().cloned().collect::<Vec<_>>();
1537 for key in keys {
1538 let child_path = json_path_child(path, &key);
1539 if policy.field_is_sensitive(&key) {
1540 map.insert(key, JsonValue::String(REDACTED_PLACEHOLDER.to_string()));
1541 entries.push(RedactionEntry {
1542 path: child_path,
1543 class: "sensitive_field".to_string(),
1544 action: "replaced".to_string(),
1545 replacement: Some(REDACTED_PLACEHOLDER.to_string()),
1546 });
1547 } else if let Some(child) = map.get_mut(&key) {
1548 redact_json_with_manifest(child, &child_path, policy, entries);
1549 }
1550 }
1551 }
1552 JsonValue::Array(items) => {
1553 for (index, item) in items.iter_mut().enumerate() {
1554 redact_json_with_manifest(item, &format!("{path}[{index}]"), policy, entries);
1555 }
1556 }
1557 JsonValue::String(text) => {
1558 let redacted = policy.redact_string(text);
1559 if let Cow::Owned(replacement) = redacted {
1560 let manifest_replacement = replacement.clone();
1565 *text = replacement;
1566 entries.push(RedactionEntry {
1567 path: path.to_string(),
1568 class: "secret_pattern_or_url".to_string(),
1569 action: "replaced".to_string(),
1570 replacement: Some(manifest_replacement),
1571 });
1572 }
1573 }
1574 _ => {}
1575 }
1576}
1577
1578fn redact_bundle_pointer_paths_json(
1579 value: &mut JsonValue,
1580 path: &str,
1581 entries: &mut Vec<RedactionEntry>,
1582) {
1583 match value {
1584 JsonValue::Object(map) => {
1585 let keys = map.keys().cloned().collect::<Vec<_>>();
1586 for key in keys {
1587 let child_path = json_path_child(path, &key);
1588 if key == "path" && bundle_pointer_path_should_redact(&child_path) {
1589 if !map.get(&key).is_some_and(JsonValue::is_null) {
1590 map.insert(key, JsonValue::String(REDACTED_PLACEHOLDER.to_string()));
1591 entries.push(RedactionEntry {
1592 path: child_path,
1593 class: "local_pointer_path".to_string(),
1594 action: "replaced".to_string(),
1595 replacement: Some(REDACTED_PLACEHOLDER.to_string()),
1596 });
1597 }
1598 } else if let Some(child) = map.get_mut(&key) {
1599 redact_bundle_pointer_paths_json(child, &child_path, entries);
1600 }
1601 }
1602 }
1603 JsonValue::Array(items) => {
1604 for (index, item) in items.iter_mut().enumerate() {
1605 redact_bundle_pointer_paths_json(item, &format!("{path}[{index}]"), entries);
1606 }
1607 }
1608 _ => {}
1609 }
1610}
1611
/// Reports whether a JSONPath-like location lies inside an element of an
/// `event_log_pointers` or `transcript_pointers` list.
fn bundle_pointer_path_should_redact(path: &str) -> bool {
    const POINTER_LIST_MARKERS: [&str; 2] = [".event_log_pointers[", ".transcript_pointers["];
    POINTER_LIST_MARKERS
        .iter()
        .any(|marker| path.contains(marker))
}
1615
1616fn withhold_replay_only_json(value: &mut JsonValue, path: &str, entries: &mut Vec<RedactionEntry>) {
1617 match value {
1618 JsonValue::Object(map) => {
1619 let keys = map.keys().cloned().collect::<Vec<_>>();
1620 for key in keys {
1621 let child_path = json_path_child(path, &key);
1622 if replay_only_field_is_prompt_payload(&key) {
1623 if !map.get(&key).is_some_and(JsonValue::is_null) {
1624 map.insert(key, JsonValue::String(REPLAY_ONLY_PLACEHOLDER.to_string()));
1625 entries.push(RedactionEntry {
1626 path: child_path,
1627 class: "prompt_or_tool_payload".to_string(),
1628 action: "withheld".to_string(),
1629 replacement: Some(REPLAY_ONLY_PLACEHOLDER.to_string()),
1630 });
1631 }
1632 } else if let Some(child) = map.get_mut(&key) {
1633 withhold_replay_only_json(child, &child_path, entries);
1634 }
1635 }
1636 }
1637 JsonValue::Array(items) => {
1638 for (index, item) in items.iter_mut().enumerate() {
1639 withhold_replay_only_json(item, &format!("{path}[{index}]"), entries);
1640 }
1641 }
1642 _ => {}
1643 }
1644}
1645
/// Reports whether an object key names prompt, model-output or tool-payload
/// content that replay-only bundles withhold.
///
/// Matching is exact and case-sensitive.
fn replay_only_field_is_prompt_payload(key: &str) -> bool {
    const PROMPT_PAYLOAD_FIELDS: &[&str] = &[
        "args",
        "arguments",
        "blocks",
        "content",
        "data",
        "private_reasoning",
        "prompt",
        "raw_input",
        "raw_output",
        "result",
        "response_text",
        "summary",
        "system",
        "system_prompt",
        "task",
        "text",
        "thinking",
        "visible_text",
    ];
    PROMPT_PAYLOAD_FIELDS.contains(&key)
}
1669
1670fn set_json_path(value: &mut JsonValue, path: &[&str], replacement: JsonValue) {
1671 let Some((head, tail)) = path.split_first() else {
1672 *value = replacement;
1673 return;
1674 };
1675 if tail.is_empty() {
1676 if let JsonValue::Object(map) = value {
1677 map.insert((*head).to_string(), replacement);
1678 }
1679 return;
1680 }
1681 if let JsonValue::Object(map) = value {
1682 if let Some(child) = map.get_mut(*head) {
1683 set_json_path(child, tail, replacement);
1684 }
1685 }
1686}
1687
1688fn require_field(value: &JsonValue, field: &str) -> Result<(), SessionBundleError> {
1689 if value.get(field).is_some() {
1690 Ok(())
1691 } else {
1692 Err(SessionBundleError::MissingRequired(format!("$.{field}")))
1693 }
1694}
1695
1696fn require_nested_field(value: &JsonValue, path: &[&str]) -> Result<(), SessionBundleError> {
1697 let mut current = value;
1698 for segment in path {
1699 current = current
1700 .get(*segment)
1701 .ok_or_else(|| SessionBundleError::MissingRequired(json_path_from_segments(path)))?;
1702 }
1703 Ok(())
1704}
1705
1706fn json_path_from_segments(path: &[&str]) -> String {
1707 path.iter().fold("$".to_string(), |parent, segment| {
1708 json_path_child(&parent, segment)
1709 })
1710}
1711
1712fn reject_unredacted_secret_markers(
1713 value: &JsonValue,
1714 path: &str,
1715 policy: &RedactionPolicy,
1716) -> Result<(), SessionBundleError> {
1717 match value {
1718 JsonValue::Object(map) => {
1719 for (key, child) in map {
1720 reject_unredacted_secret_markers(child, &json_path_child(path, key), policy)?;
1721 }
1722 }
1723 JsonValue::Array(items) => {
1724 for (index, item) in items.iter().enumerate() {
1725 reject_unredacted_secret_markers(item, &format!("{path}[{index}]"), policy)?;
1726 }
1727 }
1728 JsonValue::String(text) => {
1729 if matches!(policy.redact_string(text), Cow::Owned(_)) {
1730 return Err(SessionBundleError::UnsafeSecretMarker {
1731 path: path.to_string(),
1732 excerpt: secret_excerpt(text),
1733 });
1734 }
1735 }
1736 _ => {}
1737 }
1738 Ok(())
1739}
1740
/// Returns the first 80 characters (Unicode scalar values) of `text`, with
/// `...` appended when `text` was longer.
///
/// Used to quote the offending value in `SessionBundleError::UnsafeSecretMarker`.
/// NOTE(review): the excerpt is taken from the unredacted string, so it can
/// expose up to 80 characters of the detected secret in the error; confirm that
/// is acceptable wherever that error is surfaced.
fn secret_excerpt(text: &str) -> String {
    const MAX_EXCERPT_CHARS: usize = 80;
    let mut chars = text.chars();
    let excerpt: String = chars.by_ref().take(MAX_EXCERPT_CHARS).collect();
    // Peek one character past the cut instead of counting the whole (possibly
    // very large) string just to learn whether it was truncated.
    if chars.next().is_some() {
        format!("{excerpt}...")
    } else {
        excerpt
    }
}
1749
1750fn json_path_child(parent: &str, key: &str) -> String {
1751 if key
1752 .chars()
1753 .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
1754 {
1755 format!("{parent}.{key}")
1756 } else {
1757 format!(
1758 "{parent}[{}]",
1759 serde_json::to_string(key).unwrap_or_default()
1760 )
1761 }
1762}