1use std::collections::{BTreeMap, BTreeSet};
10use std::fmt;
11use std::fs;
12use std::path::Path;
13
14use chrono::{DateTime, SecondsFormat, Utc};
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value as JsonValue};
17
18use crate::agent_events::AgentEvent;
19use crate::event_log::sanitize_topic_component;
20use crate::orchestration::{
21 derive_run_observability, new_id, now_rfc3339, AgentSessionReplayEvent, ReplayFixture,
22 RunCheckpointRecord, RunChildRecord, RunExecutionRecord, RunHitlQuestionRecord,
23 RunObservabilityRecord, RunRecord, RunTraceSpanRecord, RunTransitionRecord,
24 RunVerificationOutcomeRecord, RunWorkerLineageRecord, ToolCallRecord,
25};
26use crate::redact::{json_path_child, RedactionEntry, RedactionPolicy, REDACTED_PLACEHOLDER};
27use crate::workspace_anchor::{anchor_from_transcript_metadata_json, MountedRoot, WorkspaceAnchor};
28
29mod schema;
30pub use schema::{session_bundle_schema, session_bundle_schema_pretty};
31
32#[cfg(test)]
33mod tests;
34
/// Value written to, and required in, the `_type` field of every session bundle.
pub const SESSION_BUNDLE_TYPE: &str = "harn_session_bundle";
/// Schema version written by this build. Validation rejects `0` and anything newer.
pub const SESSION_BUNDLE_SCHEMA_VERSION: u32 = 1;
/// URL identifying the JSON schema for v1 session bundles.
pub const SESSION_BUNDLE_SCHEMA_ID: &str = "https://harnlang.com/schemas/session-bundle.v1.json";
/// Placeholder text for content withheld from replay-only exports.
/// NOTE(review): presumably consumed by `withhold_replay_only_json`; confirm.
pub const REPLAY_ONLY_PLACEHOLDER: &str = "[withheld]";

/// Status reported for an agent session whose events contain no `SessionClosed` event.
pub const SESSION_BUNDLE_STATUS_SUSPENDED: &str = "suspended";

/// Status substituted when a `SessionClosed` event carries an empty status string.
pub const SESSION_BUNDLE_STATUS_COMPLETED: &str = "completed";

/// Key name under which session liveness is recorded.
/// NOTE(review): the consumer is outside this chunk; presumably a metadata map key. Confirm.
pub const SESSION_BUNDLE_LIVENESS_KEY: &str = "session_liveness";
55
/// Whether an agent session's replay events include a close event.
///
/// Produced by [`agent_session_liveness`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AgentSessionLiveness {
    /// A `SessionClosed` event was found.
    ///
    /// `status` is that event's status, or [`SESSION_BUNDLE_STATUS_COMPLETED`] when it
    /// was empty. `finished_at_ms` is the event's `occurred_at_ms`.
    Closed { status: String, finished_at_ms: i64 },
    /// No `SessionClosed` event exists in the session's events.
    Suspended,
}
73
74impl AgentSessionLiveness {
75 pub fn status(&self) -> &str {
77 match self {
78 AgentSessionLiveness::Closed { status, .. } => status,
79 AgentSessionLiveness::Suspended => SESSION_BUNDLE_STATUS_SUSPENDED,
80 }
81 }
82
83 pub fn tag(&self) -> &'static str {
85 match self {
86 AgentSessionLiveness::Closed { .. } => "closed",
87 AgentSessionLiveness::Suspended => "suspended",
88 }
89 }
90
91 pub fn is_suspended(&self) -> bool {
93 matches!(self, AgentSessionLiveness::Suspended)
94 }
95}
96
97pub fn agent_session_liveness(events: &[AgentSessionReplayEvent]) -> AgentSessionLiveness {
103 events
104 .iter()
105 .rev()
106 .find_map(|entry| match &entry.event {
107 AgentEvent::SessionClosed { status, .. } => Some(AgentSessionLiveness::Closed {
108 status: if status.is_empty() {
109 SESSION_BUNDLE_STATUS_COMPLETED.to_string()
110 } else {
111 status.clone()
112 },
113 finished_at_ms: entry.occurred_at_ms,
114 }),
115 _ => None,
116 })
117 .unwrap_or(AgentSessionLiveness::Suspended)
118}
119
/// How aggressively a run is scrubbed when exported as a session bundle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionBundleExportMode {
    /// No redaction is applied.
    Local,
    /// The redaction policy and bundle pointer-path redaction are applied.
    Sanitized,
    /// Redacted like `Sanitized`, with replay-only withholding applied on top.
    ReplayOnly,
}

impl SessionBundleExportMode {
    /// Stable identifier recorded as the redaction manifest's `mode`.
    pub fn as_str(self) -> &'static str {
        match self {
            SessionBundleExportMode::ReplayOnly => "replay_only",
            SessionBundleExportMode::Sanitized => "sanitized",
            SessionBundleExportMode::Local => "local",
        }
    }
}
136
137#[derive(Clone, Debug)]
138pub struct SessionBundleExportOptions {
139 pub mode: SessionBundleExportMode,
140 pub include_attachments: bool,
141 pub redaction_policy: RedactionPolicy,
142}
143
144impl Default for SessionBundleExportOptions {
145 fn default() -> Self {
146 Self {
147 mode: SessionBundleExportMode::Sanitized,
148 include_attachments: false,
149 redaction_policy: RedactionPolicy::default(),
150 }
151 }
152}
153
/// Options controlling [`validate_session_bundle_value`].
#[derive(Clone, Debug, Default)]
pub struct SessionBundleValidationOptions {
    /// When `true`, skip the scan for unredacted secret markers.
    pub allow_unsafe_secret_markers: bool,
    /// Policy used to search the bundle for unredacted secrets.
    pub redaction_policy: RedactionPolicy,
}
159
/// A self-describing, versioned export of a run or session.
///
/// The struct is `#[serde(default)]`, so any field missing from the JSON is taken
/// from the `Default` impl below. Decoding alone is therefore lenient.
/// [`validate_session_bundle_value`] enforces which fields must actually be present.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct SessionBundle {
    /// Discriminator, serialized as `_type`. Defaults to [`SESSION_BUNDLE_TYPE`].
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Defaults to [`SESSION_BUNDLE_SCHEMA_VERSION`].
    pub schema_version: u32,
    pub bundle_id: String,
    pub created_at: String,
    pub producer: BundleProducer,
    pub source: BundleSource,
    pub runtime: BundleRuntime,
    /// Workspace identity. Optional: validation does not require it.
    pub workspace: Option<BundleWorkspace>,
    pub transcript: BundleTranscript,
    pub tools: BundleTools,
    pub permissions: Vec<BundlePermission>,
    pub replay: BundleReplay,
    /// Record of the redaction performed at export time.
    pub redaction: RedactionManifest,
    pub attachments: Vec<BundleAttachment>,
    pub metadata: BTreeMap<String, JsonValue>,
}

impl Default for SessionBundle {
    /// An empty bundle with the current `_type` and `schema_version` filled in.
    fn default() -> Self {
        Self {
            type_name: SESSION_BUNDLE_TYPE.to_string(),
            schema_version: SESSION_BUNDLE_SCHEMA_VERSION,
            bundle_id: String::new(),
            created_at: String::new(),
            producer: BundleProducer::default(),
            source: BundleSource::default(),
            runtime: BundleRuntime::default(),
            workspace: None,
            transcript: BundleTranscript::default(),
            tools: BundleTools::default(),
            permissions: Vec::new(),
            replay: BundleReplay::default(),
            redaction: RedactionManifest::default(),
            attachments: Vec::new(),
            metadata: BTreeMap::new(),
        }
    }
}
202
/// Producer identity recorded in a bundle.
///
/// Validation requires `name`, `version` and `schema_id` to be present.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleProducer {
    pub name: String,
    pub version: String,
    pub schema_id: String,
}
210
/// Description of the run the bundle was exported from.
///
/// When import has to synthesize a run record from a replay fixture, the id,
/// workflow, task, status and timestamps are taken from here.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleSource {
    pub kind: String,
    /// Becomes the `id` of a run record synthesized on import.
    pub run_record_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub persisted_path: Option<String>,
    pub root_run_id: Option<String>,
    pub parent_run_id: Option<String>,
    pub child_run_count: usize,
}
227
/// Runtime environment and usage information for the exported run.
// `Eq` is not derived because `usage` (via `BundleUsage`) contains an `f64`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleRuntime {
    pub harn_version: String,
    pub provider_models: Vec<String>,
    /// Copied into the `usage` of a run record synthesized on import.
    pub usage: Option<BundleUsage>,
    pub metadata: BTreeMap<String, JsonValue>,
}
236
/// Aggregate token, call, duration and cost counters for a run.
// `Eq` is not derived because `total_cost` is an `f64`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleUsage {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub call_count: i64,
    pub total_duration_ms: i64,
    pub total_cost: f64,
    pub models: Vec<String>,
}
247
/// Serializable view of a workspace anchor.
///
/// Built from a [`WorkspaceAnchor`]. Paths are rendered lossily as UTF-8 strings.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleWorkspace {
    /// Primary workspace root path.
    pub primary: Option<String>,
    /// Extra mounted roots.
    #[serde(default)]
    pub additional_roots: Vec<BundleMountedRoot>,
    /// Timestamp string copied from the anchor.
    pub anchored_at: Option<String>,
    /// Policy label. `From<&WorkspaceAnchor>` sets it to `"safe_identity_only"`.
    pub policy: String,
}
263
/// Serializable view of a [`MountedRoot`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleMountedRoot {
    /// Root path, rendered lossily as UTF-8.
    pub path: String,
    /// String form of the mount mode.
    pub mount_mode: String,
    pub mounted_at: String,
}
271
272impl From<&MountedRoot> for BundleMountedRoot {
273 fn from(root: &MountedRoot) -> Self {
274 Self {
275 path: root.path.to_string_lossy().into_owned(),
276 mount_mode: root.mount_mode.as_str().to_string(),
277 mounted_at: root.mounted_at.clone(),
278 }
279 }
280}
281
282impl From<&WorkspaceAnchor> for BundleWorkspace {
283 fn from(anchor: &WorkspaceAnchor) -> Self {
284 Self {
285 primary: Some(anchor.primary.to_string_lossy().into_owned()),
286 additional_roots: anchor.additional_roots.iter().map(Into::into).collect(),
287 anchored_at: Some(anchor.anchored_at.clone()),
288 policy: "safe_identity_only".to_string(),
289 }
290 }
291}
292
/// Transcript content of a bundle, split into sections.
///
/// Import synthesizes a run-record transcript from the first section only.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscript {
    pub sections: Vec<BundleTranscriptSection>,
}
298
/// One transcript section: messages, events and assets kept as raw JSON values.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscriptSection {
    pub id: String,
    pub label: String,
    pub scope: String,
    pub location: String,
    pub summary: Option<String>,
    pub messages: Vec<JsonValue>,
    pub events: Vec<JsonValue>,
    pub assets: Vec<JsonValue>,
    pub metadata: BTreeMap<String, JsonValue>,
}
312
/// Tool schemas and recorded tool calls.
///
/// Validation requires both `schemas` and `calls` to be present.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTools {
    pub schemas: Vec<BundleJsonEntry>,
    /// Becomes `tool_recordings` in a run record synthesized on import.
    pub calls: Vec<BundleToolCall>,
}
319
/// A raw JSON value tagged with a `source` label and an `index`.
// NOTE(review): presumably `index` is the value's position within `source`; confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleJsonEntry {
    pub source: String,
    pub index: usize,
    pub value: JsonValue,
}
327
/// Serializable copy of a [`ToolCallRecord`], copied field for field.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleToolCall {
    pub tool_name: String,
    pub tool_use_id: String,
    /// Hash of the call arguments. The arguments themselves are not stored here.
    pub args_hash: String,
    pub result: String,
    pub is_rejected: bool,
    pub duration_ms: u64,
    pub iteration: usize,
    pub timestamp: String,
}
340
341impl From<&ToolCallRecord> for BundleToolCall {
342 fn from(record: &ToolCallRecord) -> Self {
343 Self {
344 tool_name: record.tool_name.clone(),
345 tool_use_id: record.tool_use_id.clone(),
346 args_hash: record.args_hash.clone(),
347 result: record.result.clone(),
348 is_rejected: record.is_rejected,
349 duration_ms: record.duration_ms,
350 iteration: record.iteration,
351 timestamp: record.timestamp.clone(),
352 }
353 }
354}
355
/// A permission-related record such as a HITL question, with a raw JSON payload.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundlePermission {
    /// Record kind. Payloads of kind `"hitl_question"` become `hitl_questions`
    /// in a run record synthesized on import.
    pub kind: String,
    pub source: String,
    pub request_id: Option<String>,
    pub agent: Option<String>,
    pub payload: JsonValue,
}
365
/// Replay-oriented data carried by a bundle.
///
/// `observability` is omitted from JSON when `None`, and `worker_snapshots` when
/// empty. All other fields are always serialized.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleReplay {
    /// Used to synthesize a run record on import when `run_record` is absent.
    pub replay_fixture: Option<ReplayFixture>,
    /// Serialized original run record. Import returns it, filling in
    /// `observability` when that is missing or null.
    pub run_record: Option<JsonValue>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observability: Option<RunObservabilityRecord>,
    /// Copied into imported observability when that has no outcomes of its own.
    pub verification_outcomes: Vec<RunVerificationOutcomeRecord>,
    /// Embedded worker snapshots, written to disk by `materialize_worker_snapshots`.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub worker_snapshots: Vec<BundleWorkerSnapshot>,
    pub event_log_pointers: Vec<BundleEventLogPointer>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    pub deterministic_events: Vec<BundleJsonEntry>,
}
384
/// A worker snapshot embedded in a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleWorkerSnapshot {
    /// When blank, materialization substitutes `worker_<index>`.
    pub worker_id: String,
    pub worker_name: String,
    pub status: String,
    pub snapshot_ref: String,
    pub source_path: Option<String>,
    /// Snapshot JSON. Materialization adapts it and writes it to a file.
    pub value: JsonValue,
}
395
/// Result of writing one embedded worker snapshot to disk.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MaterializedWorkerSnapshot {
    /// Worker id the file was written for. Never empty when produced by
    /// `materialize_worker_snapshots`.
    pub worker_id: String,
    /// Path of the written file, rendered lossily as UTF-8.
    pub path: String,
}
402
/// Reference to an event log related to the exported run.
// NOTE(review): `path` is presumably what `redact_bundle_pointer_paths_json`
// rewrites in non-local exports; confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleEventLogPointer {
    pub kind: String,
    pub topic: Option<String>,
    pub path: Option<String>,
    pub location: String,
    pub available: bool,
}
412
/// Record of how a bundle was redacted at export time.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RedactionManifest {
    /// Export mode name (see `SessionBundleExportMode::as_str`).
    pub mode: String,
    /// Policy description. `"none"` for local exports.
    pub policy: String,
    /// Replacement text used for redacted values.
    pub placeholder: String,
    /// One entry per redaction or withholding that was performed.
    pub entries: Vec<RedactionEntry>,
    /// `true` for every export mode other than local.
    pub unsafe_secret_markers_rejected: bool,
}
422
/// An attachment carried by a bundle, as text and/or raw JSON data.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleAttachment {
    pub id: String,
    pub kind: String,
    pub title: Option<String>,
    pub stage: Option<String>,
    pub text: Option<String>,
    pub data: Option<JsonValue>,
    pub metadata: BTreeMap<String, JsonValue>,
}
434
/// Errors raised while exporting, validating or importing session bundles.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionBundleError {
    /// Input could not be read or deserialized. Carries the underlying message.
    Decode(String),
    /// A value could not be serialized or written. Carries the underlying message.
    Encode(String),
    /// A required field is absent. Carries its JSON path.
    MissingRequired(String),
    /// `schema_version` is zero or newer than this build supports.
    UnsupportedSchemaVersion { found: u64, supported: u32 },
    /// A field has the wrong type or value.
    InvalidType { path: String, expected: String },
    /// A worker snapshot is in a state that cannot be checkpointed.
    UnsupportedCheckpointState { status: String },
    /// An unredacted secret marker was found.
    UnsafeSecretMarker { path: String, excerpt: String },
    /// The bundle has neither a run record nor a replay fixture.
    MissingRunRecord,
    /// No replayable events exist for the session.
    MissingSessionEvents { session_id: String },
}

impl fmt::Display for SessionBundleError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SessionBundleError::Decode(detail) => {
                write!(f, "failed to decode session bundle: {}", detail)
            }
            SessionBundleError::Encode(detail) => {
                write!(f, "failed to encode session bundle: {}", detail)
            }
            SessionBundleError::MissingRequired(field) => {
                write!(f, "session bundle is missing required field {}", field)
            }
            SessionBundleError::UnsupportedSchemaVersion { found, supported } => write!(
                f,
                "unsupported session bundle schema_version {}; this build supports <= {}",
                found, supported
            ),
            SessionBundleError::InvalidType { path, expected } => {
                write!(f, "session bundle field {} must be {}", path, expected)
            }
            SessionBundleError::UnsupportedCheckpointState { status } => write!(
                f,
                "worker snapshot status {:?} is not checkpointable; suspend the worker at a turn boundary first",
                status
            ),
            SessionBundleError::UnsafeSecretMarker { path, excerpt } => write!(
                f,
                "session bundle contains an unsafe unredacted secret marker at {}: {}",
                path, excerpt
            ),
            SessionBundleError::MissingRunRecord => {
                f.write_str("session bundle does not include an importable run record")
            }
            SessionBundleError::MissingSessionEvents { session_id } => write!(
                f,
                "event log does not contain replayable events for session_id {:?}",
                session_id
            ),
        }
    }
}

impl std::error::Error for SessionBundleError {}
481
482pub fn export_run_record_bundle(
483 run: &RunRecord,
484 options: &SessionBundleExportOptions,
485) -> Result<SessionBundle, SessionBundleError> {
486 let run_record_value =
487 serde_json::to_value(run).map_err(|error| SessionBundleError::Encode(error.to_string()))?;
488 let mut bundle = raw_bundle_from_run(run, run_record_value, options.include_attachments)?;
489 let mut bundle_value = serde_json::to_value(&bundle)
490 .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
491
492 let mut manifest = RedactionManifest {
493 mode: options.mode.as_str().to_string(),
494 policy: if matches!(options.mode, SessionBundleExportMode::Local) {
495 "none".to_string()
496 } else {
497 "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths".to_string()
498 },
499 placeholder: REDACTED_PLACEHOLDER.to_string(),
500 entries: Vec::new(),
501 unsafe_secret_markers_rejected: !matches!(options.mode, SessionBundleExportMode::Local),
502 };
503
504 if !matches!(options.mode, SessionBundleExportMode::Local) {
505 let redaction_policy = bundle_redaction_policy(&options.redaction_policy);
506 manifest
507 .entries
508 .extend(redaction_policy.redact_json_manifest(&mut bundle_value));
509 redact_bundle_pointer_paths_json(&mut bundle_value, "$", &mut manifest.entries);
510 }
511 if matches!(options.mode, SessionBundleExportMode::ReplayOnly) {
512 withhold_replay_only_json(&mut bundle_value, "$", &mut manifest.entries);
513 }
514 set_json_path(
515 &mut bundle_value,
516 &["redaction"],
517 serde_json::to_value(&manifest)
518 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
519 );
520 bundle = serde_json::from_value(bundle_value)
521 .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
522 Ok(bundle)
523}
524
525pub fn export_worker_snapshot_bundle(
526 snapshot_path: &Path,
527 options: &SessionBundleExportOptions,
528) -> Result<SessionBundle, SessionBundleError> {
529 let run = run_record_from_worker_snapshot(snapshot_path)?;
530 export_run_record_bundle(&run, options)
531}
532
533pub fn run_record_from_worker_snapshot(
534 snapshot_path: &Path,
535) -> Result<RunRecord, SessionBundleError> {
536 let content = fs::read_to_string(snapshot_path).map_err(|error| {
537 SessionBundleError::Decode(format!(
538 "failed to read worker snapshot {}: {error}",
539 snapshot_path.display()
540 ))
541 })?;
542 let value: JsonValue = serde_json::from_str(&content)
543 .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
544 run_record_from_worker_snapshot_value(snapshot_path, value)
545}
546
547pub fn validate_session_bundle_value(
548 value: &JsonValue,
549 options: &SessionBundleValidationOptions,
550) -> Result<SessionBundle, SessionBundleError> {
551 require_field(value, "_type")?;
552 require_field(value, "schema_version")?;
553 require_field(value, "bundle_id")?;
554 require_field(value, "created_at")?;
555 require_field(value, "producer")?;
556 require_field(value, "source")?;
557 require_field(value, "runtime")?;
558 require_field(value, "transcript")?;
559 require_field(value, "tools")?;
560 require_field(value, "permissions")?;
561 require_field(value, "replay")?;
562 require_field(value, "redaction")?;
563 require_field(value, "attachments")?;
564 require_nested_field(value, &["producer", "name"])?;
565 require_nested_field(value, &["producer", "version"])?;
566 require_nested_field(value, &["producer", "schema_id"])?;
567 require_nested_field(value, &["source", "kind"])?;
568 require_nested_field(value, &["source", "run_record_id"])?;
569 require_nested_field(value, &["source", "workflow_id"])?;
570 require_nested_field(value, &["source", "task"])?;
571 require_nested_field(value, &["source", "status"])?;
572 require_nested_field(value, &["runtime", "harn_version"])?;
573 require_nested_field(value, &["runtime", "provider_models"])?;
574 require_nested_field(value, &["transcript", "sections"])?;
575 require_nested_field(value, &["tools", "schemas"])?;
576 require_nested_field(value, &["tools", "calls"])?;
577 require_nested_field(value, &["replay", "event_log_pointers"])?;
578 require_nested_field(value, &["replay", "transitions"])?;
579 require_nested_field(value, &["replay", "checkpoints"])?;
580 require_nested_field(value, &["replay", "trace_spans"])?;
581 require_nested_field(value, &["replay", "deterministic_events"])?;
582 require_nested_field(value, &["redaction", "mode"])?;
583 require_nested_field(value, &["redaction", "policy"])?;
584 require_nested_field(value, &["redaction", "placeholder"])?;
585 require_nested_field(value, &["redaction", "entries"])?;
586 require_nested_field(value, &["redaction", "unsafe_secret_markers_rejected"])?;
587
588 let type_name = value
589 .get("_type")
590 .and_then(JsonValue::as_str)
591 .ok_or_else(|| SessionBundleError::InvalidType {
592 path: "$._type".to_string(),
593 expected: "string".to_string(),
594 })?;
595 if type_name != SESSION_BUNDLE_TYPE {
596 return Err(SessionBundleError::InvalidType {
597 path: "$._type".to_string(),
598 expected: format!("\"{SESSION_BUNDLE_TYPE}\""),
599 });
600 }
601
602 let version = value
603 .get("schema_version")
604 .and_then(JsonValue::as_u64)
605 .ok_or_else(|| SessionBundleError::InvalidType {
606 path: "$.schema_version".to_string(),
607 expected: "positive integer".to_string(),
608 })?;
609 if version == 0 || version > u64::from(SESSION_BUNDLE_SCHEMA_VERSION) {
610 return Err(SessionBundleError::UnsupportedSchemaVersion {
611 found: version,
612 supported: SESSION_BUNDLE_SCHEMA_VERSION,
613 });
614 }
615
616 if !options.allow_unsafe_secret_markers {
617 if let Some(found) = options.redaction_policy.find_unredacted_secret(value) {
618 return Err(SessionBundleError::UnsafeSecretMarker {
619 path: found.path,
620 excerpt: found.excerpt,
621 });
622 }
623 }
624
625 serde_json::from_value::<SessionBundle>(value.clone())
626 .map_err(|error| SessionBundleError::Decode(error.to_string()))
627}
628
629pub fn validate_session_bundle_str(
630 content: &str,
631 options: &SessionBundleValidationOptions,
632) -> Result<SessionBundle, SessionBundleError> {
633 let value: JsonValue = serde_json::from_str(content)
634 .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
635 validate_session_bundle_value(&value, options)
636}
637
638pub fn import_run_record_value(bundle: &SessionBundle) -> Result<JsonValue, SessionBundleError> {
639 let replay_observability = replay_observability_for_import(&bundle.replay);
640 if let Some(mut run_record) = bundle.replay.run_record.clone() {
641 let should_fill_observability = match run_record.get("observability") {
642 Some(value) => value.is_null(),
643 None => true,
644 };
645 if should_fill_observability {
646 if let (JsonValue::Object(map), Some(observability)) =
647 (&mut run_record, replay_observability.as_ref())
648 {
649 map.insert(
650 "observability".to_string(),
651 serde_json::to_value(observability)
652 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
653 );
654 }
655 }
656 return Ok(run_record);
657 }
658 if let Some(fixture) = &bundle.replay.replay_fixture {
659 let transcript = bundle.transcript.sections.first().map(|section| {
660 json!({
661 "_type": "transcript",
662 "messages": section.messages.clone(),
663 "events": section.events.clone(),
664 "assets": section.assets.clone(),
665 "summary": section.summary.clone(),
666 "metadata": section.metadata.clone(),
667 })
668 });
669 let hitl_questions = bundle
670 .permissions
671 .iter()
672 .filter(|permission| permission.kind == "hitl_question")
673 .map(|permission| permission.payload.clone())
674 .collect::<Vec<_>>();
675 return Ok(json!({
676 "_type": "run_record",
677 "id": bundle.source.run_record_id.clone(),
678 "workflow_id": bundle.source.workflow_id.clone(),
679 "workflow_name": bundle.source.workflow_name.clone(),
680 "task": bundle.source.task.clone(),
681 "status": bundle.source.status.clone(),
682 "started_at": bundle.source.started_at.clone(),
683 "finished_at": bundle.source.finished_at.clone(),
684 "stages": [],
685 "transitions": bundle.replay.transitions.clone(),
686 "checkpoints": bundle.replay.checkpoints.clone(),
687 "pending_nodes": [],
688 "completed_nodes": [],
689 "child_runs": [],
690 "artifacts": [],
691 "handoffs": [],
692 "policy": {},
693 "transcript": transcript,
694 "usage": bundle.runtime.usage.clone(),
695 "replay_fixture": fixture,
696 "observability": replay_observability,
697 "trace_spans": bundle.replay.trace_spans.clone(),
698 "tool_recordings": bundle.tools.calls.clone(),
699 "hitl_questions": hitl_questions,
700 "persona_runtime": [],
701 "metadata": {
702 "imported_from_session_bundle": bundle.bundle_id.clone(),
703 "session_bundle_schema_version": bundle.schema_version,
704 "worker_snapshot_count": bundle.replay.worker_snapshots.len(),
705 }
706 }));
707 }
708 Err(SessionBundleError::MissingRunRecord)
709}
710
711pub fn import_run_record_value_with_materialized_worker_snapshots(
712 bundle: &SessionBundle,
713 materialized: &[MaterializedWorkerSnapshot],
714) -> Result<JsonValue, SessionBundleError> {
715 let mut run_record = import_run_record_value(bundle)?;
716 apply_materialized_worker_snapshot_paths(&mut run_record, materialized);
717 Ok(run_record)
718}
719
/// Builds a synthetic checkpoint [`RunRecord`] from a parsed worker snapshot.
///
/// The snapshot must have `_type == "worker_snapshot"`, a non-empty `status` equal
/// to `"suspended"`, object-valued `config` and `suspension` fields, and a
/// non-empty `id`.
///
/// The resulting record has:
/// * id `checkpoint_<sanitized worker id>`;
/// * a single turn-boundary checkpoint;
/// * one child run and one worker-lineage entry, both pointing at `snapshot_path`;
/// * a replay fixture expecting status `"suspended"`;
/// * metadata recording the snapshot path.
///
/// # Errors
/// Returns `MissingRequired` or `InvalidType` for a malformed snapshot, and
/// `UnsupportedCheckpointState` when the status is not `"suspended"`.
fn run_record_from_worker_snapshot_value(
    snapshot_path: &Path,
    value: JsonValue,
) -> Result<RunRecord, SessionBundleError> {
    require_worker_snapshot_marker(&value)?;
    let status =
        snapshot_string(&value, "status").ok_or_else(|| missing_worker_snapshot_field("status"))?;
    // Only suspended snapshots are accepted as checkpoints.
    if status != "suspended" {
        return Err(SessionBundleError::UnsupportedCheckpointState { status });
    }
    require_worker_snapshot_object_field(&value, "config")?;
    require_worker_snapshot_object_field(&value, "suspension")?;

    let snapshot_path_string = snapshot_path.to_string_lossy().into_owned();
    let worker_id =
        snapshot_string(&value, "id").ok_or_else(|| missing_worker_snapshot_field("id"))?;
    let worker_name = snapshot_string(&value, "name").unwrap_or_else(|| "worker".to_string());
    let task = snapshot_string(&value, "task").unwrap_or_else(|| "Suspended worker".to_string());
    let suspended_at = snapshot_pointer_string(&value, &["suspension", "suspended_at"]);
    // Fallback chain: started_at -> created_at -> suspension time -> current time.
    // The last fallback makes the output depend on the wall clock.
    let started_at = snapshot_string(&value, "started_at")
        .or_else(|| snapshot_string(&value, "created_at"))
        .or_else(|| suspended_at.clone())
        .unwrap_or_else(now_rfc3339);
    let finished_at = snapshot_string(&value, "finished_at");
    // Session ids prefer `config.spec.*` and fall back to `audit.*`.
    let session_id = snapshot_pointer_string(&value, &["config", "spec", "session_id"])
        .or_else(|| snapshot_pointer_string(&value, &["audit", "session_id"]));
    let parent_session_id =
        snapshot_pointer_string(&value, &["config", "spec", "parent_session_id"])
            .or_else(|| snapshot_pointer_string(&value, &["audit", "parent_session_id"]));
    let child_run_id = snapshot_string(&value, "child_run_id");
    let child_run_path = snapshot_string(&value, "child_run_path");
    // An `execution` value that fails to deserialize is dropped (None), not reported.
    let execution = value
        .get("execution")
        .cloned()
        .and_then(|value| serde_json::from_value::<RunExecutionRecord>(value).ok());

    let child = RunChildRecord {
        worker_id: worker_id.clone(),
        worker_name: worker_name.clone(),
        parent_stage_id: snapshot_string(&value, "parent_stage_id"),
        session_id: session_id.clone(),
        parent_session_id: parent_session_id.clone(),
        mutation_scope: snapshot_pointer_string(&value, &["audit", "mutation_scope"]),
        approval_policy: None,
        task: task.clone(),
        request: value.get("request").cloned(),
        provenance: value.get("provenance").cloned(),
        status: status.clone(),
        started_at: started_at.clone(),
        finished_at: finished_at.clone(),
        run_id: child_run_id.clone(),
        run_path: child_run_path.clone(),
        snapshot_path: Some(snapshot_path_string.clone()),
        execution,
    };
    let lineage = RunWorkerLineageRecord {
        worker_id: worker_id.clone(),
        worker_name,
        parent_stage_id: child.parent_stage_id.clone(),
        task: task.clone(),
        status: status.clone(),
        session_id,
        parent_session_id,
        run_id: child_run_id,
        run_path: child_run_path,
        snapshot_path: Some(snapshot_path_string.clone()),
    };

    let run_id = format!("checkpoint_{}", sanitize_topic_component(&worker_id));
    let workflow_id = "worker_snapshot_checkpoint".to_string();
    let workflow_name = Some("Worker snapshot checkpoint".to_string());
    let checkpoint_id = format!("{run_id}_turn_boundary");
    // Checkpoint time prefers the suspension time, then finish time, then start time.
    let checkpointed_at = suspended_at
        .or_else(|| finished_at.clone())
        .unwrap_or_else(|| started_at.clone());

    Ok(RunRecord {
        type_name: "run_record".to_string(),
        id: run_id.clone(),
        workflow_id: workflow_id.clone(),
        workflow_name: workflow_name.clone(),
        task,
        status,
        started_at,
        finished_at,
        checkpoints: vec![RunCheckpointRecord {
            id: checkpoint_id,
            ready_nodes: vec!["worker_snapshot_resume".to_string()],
            completed_nodes: Vec::new(),
            last_stage_id: None,
            persisted_at: checkpointed_at.clone(),
            reason: "suspended_worker_snapshot_turn_boundary".to_string(),
        }],
        child_runs: vec![child],
        transcript: value.get("transcript").cloned(),
        replay_fixture: Some(ReplayFixture {
            type_name: "replay_fixture".to_string(),
            id: format!("fixture_{run_id}"),
            source_run_id: run_id,
            workflow_id,
            workflow_name,
            created_at: checkpointed_at,
            eval_kind: Some("worker_snapshot_checkpoint".to_string()),
            expected_status: "suspended".to_string(),
            ..ReplayFixture::default()
        }),
        observability: Some(RunObservabilityRecord {
            // NOTE(review): hard-coded observability schema version; the same
            // literal is used in `replay_observability_for_import`. Keep them in sync.
            schema_version: 4,
            worker_lineage: vec![lineage],
            ..RunObservabilityRecord::default()
        }),
        metadata: BTreeMap::from([
            ("checkpoint_kind".to_string(), json!("worker_snapshot")),
            (
                "worker_snapshot_path".to_string(),
                json!(snapshot_path_string),
            ),
        ]),
        ..RunRecord::default()
    })
}
841
842fn snapshot_string(value: &JsonValue, key: &str) -> Option<String> {
843 value
844 .get(key)
845 .and_then(JsonValue::as_str)
846 .filter(|value| !value.is_empty())
847 .map(str::to_string)
848}
849
850fn missing_worker_snapshot_field(field: &str) -> SessionBundleError {
851 SessionBundleError::MissingRequired(format!("$.worker_snapshot.{field}"))
852}
853
854fn require_worker_snapshot_marker(value: &JsonValue) -> Result<(), SessionBundleError> {
855 match snapshot_string(value, "_type").as_deref() {
856 Some("worker_snapshot") => Ok(()),
857 Some(_) => Err(SessionBundleError::InvalidType {
858 path: "$.worker_snapshot._type".to_string(),
859 expected: "\"worker_snapshot\"".to_string(),
860 }),
861 None => Err(missing_worker_snapshot_field("_type")),
862 }
863}
864
865fn require_worker_snapshot_object_field(
866 value: &JsonValue,
867 field: &str,
868) -> Result<(), SessionBundleError> {
869 match value.get(field) {
870 Some(JsonValue::Object(_)) => Ok(()),
871 Some(_) => Err(SessionBundleError::InvalidType {
872 path: format!("$.worker_snapshot.{field}"),
873 expected: "object".to_string(),
874 }),
875 None => Err(missing_worker_snapshot_field(field)),
876 }
877}
878
879fn snapshot_pointer_string(value: &JsonValue, path: &[&str]) -> Option<String> {
880 let mut current = value;
881 for component in path {
882 current = current.get(*component)?;
883 }
884 current
885 .as_str()
886 .filter(|value| !value.is_empty())
887 .map(str::to_string)
888}
889
890pub fn materialize_worker_snapshots(
891 bundle: &SessionBundle,
892 out_dir: &Path,
893) -> Result<Vec<MaterializedWorkerSnapshot>, SessionBundleError> {
894 if bundle.replay.worker_snapshots.is_empty() {
895 return Ok(Vec::new());
896 }
897 fs::create_dir_all(out_dir).map_err(|error| {
898 SessionBundleError::Encode(format!(
899 "failed to create worker snapshot directory {}: {error}",
900 out_dir.display()
901 ))
902 })?;
903
904 let mut materialized = Vec::new();
905 for (index, snapshot) in bundle.replay.worker_snapshots.iter().enumerate() {
906 let worker_id = if snapshot.worker_id.trim().is_empty() {
907 format!("worker_{index}")
908 } else {
909 snapshot.worker_id.clone()
910 };
911 let path = out_dir.join(worker_snapshot_file_name(&worker_id, index));
912 let value = worker_snapshot_value_for_import(&snapshot.value, &path);
913 let rendered = serde_json::to_string_pretty(&value)
914 .map(|json| format!("{json}\n"))
915 .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
916 fs::write(&path, rendered).map_err(|error| {
917 SessionBundleError::Encode(format!(
918 "failed to write worker snapshot {}: {error}",
919 path.display()
920 ))
921 })?;
922 materialized.push(MaterializedWorkerSnapshot {
923 worker_id,
924 path: path.to_string_lossy().into_owned(),
925 });
926 }
927 Ok(materialized)
928}
929
930fn apply_materialized_worker_snapshot_paths(
931 run_record: &mut JsonValue,
932 materialized: &[MaterializedWorkerSnapshot],
933) {
934 if materialized.is_empty() {
935 return;
936 }
937
938 let paths_by_worker_id = materialized
939 .iter()
940 .filter(|snapshot| !snapshot.worker_id.is_empty())
941 .map(|snapshot| (snapshot.worker_id.as_str(), snapshot.path.as_str()))
942 .collect::<BTreeMap<_, _>>();
943 if paths_by_worker_id.is_empty() {
944 return;
945 }
946
947 rewrite_worker_snapshot_paths(run_record.get_mut("child_runs"), &paths_by_worker_id);
948 rewrite_worker_snapshot_paths(
949 run_record
950 .get_mut("observability")
951 .and_then(|observability| observability.get_mut("worker_lineage")),
952 &paths_by_worker_id,
953 );
954 rewrite_checkpoint_metadata_snapshot_path(run_record, materialized);
955}
956
957fn rewrite_worker_snapshot_paths(
958 records: Option<&mut JsonValue>,
959 paths_by_worker_id: &BTreeMap<&str, &str>,
960) {
961 let Some(records) = records.and_then(JsonValue::as_array_mut) else {
962 return;
963 };
964 for record in records {
965 let Some(worker_id) = record.get("worker_id").and_then(JsonValue::as_str) else {
966 continue;
967 };
968 let Some(path) = paths_by_worker_id.get(worker_id) else {
969 continue;
970 };
971 if let JsonValue::Object(map) = record {
972 map.insert(
973 "snapshot_path".to_string(),
974 JsonValue::String((*path).to_string()),
975 );
976 }
977 }
978}
979
980fn rewrite_checkpoint_metadata_snapshot_path(
981 run_record: &mut JsonValue,
982 materialized: &[MaterializedWorkerSnapshot],
983) {
984 let Some(snapshot) = materialized.first() else {
985 return;
986 };
987 let Some(metadata) = run_record
988 .get_mut("metadata")
989 .and_then(JsonValue::as_object_mut)
990 else {
991 return;
992 };
993 if metadata.contains_key("worker_snapshot_path") {
994 metadata.insert(
995 "worker_snapshot_path".to_string(),
996 JsonValue::String(snapshot.path.clone()),
997 );
998 }
999}
1000
1001fn replay_observability_for_import(replay: &BundleReplay) -> Option<RunObservabilityRecord> {
1002 let mut observability = replay.observability.clone().unwrap_or_default();
1003 let has_observability = replay.observability.is_some();
1004 let has_verification_outcomes = !replay.verification_outcomes.is_empty();
1005 if !has_observability && !has_verification_outcomes {
1006 return None;
1007 }
1008 if observability.schema_version == 0 {
1009 observability.schema_version = 4;
1010 }
1011 if observability.verification_outcomes.is_empty() && has_verification_outcomes {
1012 observability.verification_outcomes = replay.verification_outcomes.clone();
1013 }
1014 Some(observability)
1015}
1016
1017pub fn session_bundle_from_agent_session_events(
1018 session_id: &str,
1019 events: &[AgentSessionReplayEvent],
1020) -> Result<SessionBundle, SessionBundleError> {
1021 if events.is_empty() {
1022 return Err(SessionBundleError::MissingSessionEvents {
1023 session_id: session_id.to_string(),
1024 });
1025 }
1026
1027 let stable_id = sanitize_topic_component(session_id);
1028 let started_at = rfc3339_from_epoch_ms(events[0].occurred_at_ms);
1029 let liveness = agent_session_liveness(events);
1034 let finished_at = match &liveness {
1035 AgentSessionLiveness::Closed { finished_at_ms, .. } => {
1036 Some(rfc3339_from_epoch_ms(*finished_at_ms))
1037 }
1038 AgentSessionLiveness::Suspended => None,
1039 };
1040 let status = liveness.status().to_string();
1041 let run_id = session_id.to_string();
1042 let workflow_id = "agent_session".to_string();
1043 let created_at = finished_at.clone().unwrap_or_else(|| started_at.clone());
1044 let transcript_events = transcript_events_from_agent_session(events)?;
1045 let transcript_messages = transcript_messages_from_agent_session(events);
1046 let mut transcript_metadata = BTreeMap::new();
1047 transcript_metadata.insert("session_id".to_string(), json!(session_id));
1048 transcript_metadata.insert(
1049 "source".to_string(),
1050 json!("events.sqlite observability.agent_events topic"),
1051 );
1052
1053 let replay_fixture = ReplayFixture {
1054 type_name: "replay_fixture".to_string(),
1055 id: format!("fixture_from_session_{stable_id}"),
1056 source_run_id: run_id.clone(),
1057 workflow_id: workflow_id.clone(),
1058 workflow_name: Some(format!("Agent session {session_id}")),
1059 created_at: created_at.clone(),
1060 eval_kind: Some("replay".to_string()),
1061 expected_status: status.clone(),
1062 ..ReplayFixture::default()
1063 };
1064
1065 Ok(SessionBundle {
1066 bundle_id: format!("bundle_from_session_{stable_id}"),
1067 created_at,
1068 producer: BundleProducer {
1069 name: "harn".to_string(),
1070 version: env!("CARGO_PKG_VERSION").to_string(),
1071 schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
1072 },
1073 source: BundleSource {
1074 kind: "event_log_session".to_string(),
1075 run_record_id: run_id,
1076 workflow_id,
1077 workflow_name: Some(format!("Agent session {session_id}")),
1078 task: task_from_agent_session(events)
1079 .unwrap_or_else(|| format!("Agent session {session_id}")),
1080 status,
1081 started_at,
1082 finished_at,
1083 ..BundleSource::default()
1084 },
1085 runtime: BundleRuntime {
1086 harn_version: env!("CARGO_PKG_VERSION").to_string(),
1087 ..BundleRuntime::default()
1088 },
1089 transcript: BundleTranscript {
1090 sections: vec![BundleTranscriptSection {
1091 id: "agent_events".to_string(),
1092 label: "Agent event log".to_string(),
1093 scope: "session".to_string(),
1094 location: format!(
1095 "observability.agent_events.{}",
1096 sanitize_topic_component(session_id)
1097 ),
1098 summary: None,
1099 messages: transcript_messages,
1100 events: transcript_events,
1101 assets: Vec::new(),
1102 metadata: transcript_metadata,
1103 }],
1104 },
1105 permissions: permissions_from_agent_session(events),
1106 replay: BundleReplay {
1107 replay_fixture: Some(replay_fixture),
1108 event_log_pointers: vec![BundleEventLogPointer {
1109 kind: "agent_events".to_string(),
1110 topic: Some(format!(
1111 "observability.agent_events.{}",
1112 sanitize_topic_component(session_id)
1113 )),
1114 path: None,
1115 location: "events.sqlite".to_string(),
1116 available: true,
1117 }],
1118 deterministic_events: deterministic_events_from_agent_session(events)?,
1119 ..BundleReplay::default()
1120 },
1121 metadata: BTreeMap::from([(
1122 SESSION_BUNDLE_LIVENESS_KEY.to_string(),
1123 json!(liveness.tag()),
1124 )]),
1125 ..SessionBundle::default()
1126 })
1127}
1128
1129pub fn import_run_record_from_agent_session_events(
1130 session_id: &str,
1131 events: &[AgentSessionReplayEvent],
1132) -> Result<RunRecord, SessionBundleError> {
1133 let bundle = session_bundle_from_agent_session_events(session_id, events)?;
1134 let run_record = import_run_record_value(&bundle)?;
1135 serde_json::from_value(run_record)
1136 .map_err(|error| SessionBundleError::Decode(error.to_string()))
1137}
1138
1139fn transcript_events_from_agent_session(
1140 events: &[AgentSessionReplayEvent],
1141) -> Result<Vec<JsonValue>, SessionBundleError> {
1142 events
1143 .iter()
1144 .map(|entry| {
1145 let event = serde_json::to_value(&entry.event)
1146 .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
1147 Ok(json!({
1148 "event_id": entry.event_id,
1149 "kind": entry.kind,
1150 "occurred_at_ms": entry.occurred_at_ms,
1151 "event": event,
1152 }))
1153 })
1154 .collect()
1155}
1156
1157fn transcript_messages_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<JsonValue> {
1158 events
1159 .iter()
1160 .filter_map(|entry| match &entry.event {
1161 AgentEvent::UserMessage { content, .. } => Some(json!({
1162 "role": "user",
1163 "content": content,
1164 })),
1165 AgentEvent::AgentMessageChunk { content, .. } if !content.is_empty() => Some(json!({
1166 "role": "assistant",
1167 "content": content,
1168 })),
1169 _ => None,
1170 })
1171 .collect()
1172}
1173
1174fn permissions_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<BundlePermission> {
1175 let mut permissions = Vec::new();
1176 for entry in events {
1177 if let AgentEvent::HitlRequested {
1178 request_id,
1179 kind,
1180 payload,
1181 ..
1182 } = &entry.event
1183 {
1184 permissions.push(BundlePermission {
1185 kind: "hitl_question".to_string(),
1186 source: "agent_events".to_string(),
1187 request_id: Some(request_id.clone()),
1188 agent: None,
1189 payload: json!({
1190 "kind": kind,
1191 "payload": payload,
1192 "event_id": entry.event_id,
1193 "occurred_at_ms": entry.occurred_at_ms,
1194 }),
1195 });
1196 }
1197 }
1198 permissions
1199}
1200
1201fn deterministic_events_from_agent_session(
1202 events: &[AgentSessionReplayEvent],
1203) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
1204 transcript_events_from_agent_session(events).map(|entries| {
1205 entries
1206 .into_iter()
1207 .enumerate()
1208 .map(|(index, value)| BundleJsonEntry {
1209 source: "events.sqlite.agent_events".to_string(),
1210 index,
1211 value,
1212 })
1213 .collect()
1214 })
1215}
1216
1217fn task_from_agent_session(events: &[AgentSessionReplayEvent]) -> Option<String> {
1218 events.iter().find_map(|entry| match &entry.event {
1219 AgentEvent::UserMessage { content, .. } => user_message_text(content),
1220 _ => None,
1221 })
1222}
1223
1224fn user_message_text(content: &[JsonValue]) -> Option<String> {
1225 let parts = content
1226 .iter()
1227 .filter_map(|value| {
1228 value
1229 .get("text")
1230 .and_then(JsonValue::as_str)
1231 .or_else(|| value.as_str())
1232 .map(str::to_string)
1233 })
1234 .filter(|text| !text.trim().is_empty())
1235 .collect::<Vec<_>>();
1236 if parts.is_empty() {
1237 None
1238 } else {
1239 Some(parts.join("\n"))
1240 }
1241}
1242
1243fn rfc3339_from_epoch_ms(ms: i64) -> String {
1244 DateTime::<Utc>::from_timestamp_millis(ms)
1245 .unwrap_or_else(|| DateTime::<Utc>::from_timestamp(0, 0).expect("unix epoch is valid"))
1246 .to_rfc3339_opts(SecondsFormat::Millis, true)
1247}
1248
/// Assembles a session bundle from a run record without applying any redaction here.
///
/// - `run_record_value` is stored verbatim as `replay.run_record`. Every other
///   section is derived from `run`.
/// - The envelope gets a fresh id (`new_id("bundle")`) and the current time as
///   `created_at`.
/// - Worker snapshots are read from the snapshot paths recorded on the run (via
///   `worker_snapshots_from_run`), so building a bundle touches the filesystem.
/// - Run artifacts are copied into `attachments` only when `include_attachments`
///   is true.
///
/// The redaction manifest is pre-filled with mode `sanitized`, the default-plus-
/// local-paths policy label, and no entries.
/// NOTE(review): presumably a later redaction pass records the entries — confirm
/// against the caller.
///
/// # Errors
///
/// Propagates `SessionBundleError::Encode` when a transition or checkpoint cannot
/// be serialized into `replay.deterministic_events`.
fn raw_bundle_from_run(
    run: &RunRecord,
    run_record_value: JsonValue,
    include_attachments: bool,
) -> Result<SessionBundle, SessionBundleError> {
    let mut bundle = SessionBundle {
        bundle_id: new_id("bundle"),
        created_at: now_rfc3339(),
        producer: BundleProducer {
            name: "harn".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
            schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
        },
        source: BundleSource {
            kind: "run_record".to_string(),
            run_record_id: run.id.clone(),
            workflow_id: run.workflow_id.clone(),
            workflow_name: run.workflow_name.clone(),
            task: run.task.clone(),
            status: run.status.clone(),
            started_at: run.started_at.clone(),
            finished_at: run.finished_at.clone(),
            persisted_path: run.persisted_path.clone(),
            root_run_id: run.root_run_id.clone(),
            parent_run_id: run.parent_run_id.clone(),
            child_run_count: run.child_runs.len(),
        },
        runtime: BundleRuntime {
            harn_version: env!("CARGO_PKG_VERSION").to_string(),
            // Usage is optional on a run; without it the model list stays empty.
            provider_models: run
                .usage
                .as_ref()
                .map(|usage| usage.models.clone())
                .unwrap_or_default(),
            usage: run.usage.as_ref().map(|usage| BundleUsage {
                input_tokens: usage.input_tokens,
                output_tokens: usage.output_tokens,
                call_count: usage.call_count,
                total_duration_ms: usage.total_duration_ms,
                total_cost: usage.total_cost,
                models: usage.models.clone(),
            }),
            metadata: BTreeMap::new(),
        },
        workspace: workspace_from_run(run),
        transcript: transcript_from_run(run),
        tools: BundleTools {
            schemas: tool_schema_entries(run),
            calls: run
                .tool_recordings
                .iter()
                .map(BundleToolCall::from)
                .collect(),
        },
        permissions: permissions_from_run(run),
        replay: BundleReplay {
            replay_fixture: run.replay_fixture.clone(),
            run_record: Some(run_record_value),
            observability: run.observability.clone(),
            // Falls back to deriving outcomes when the run has no observability record.
            verification_outcomes: verification_outcomes_for_run(run),
            // Reads snapshot files from disk; unreadable ones are skipped.
            worker_snapshots: worker_snapshots_from_run(run),
            event_log_pointers: event_log_pointers_from_run(run),
            transitions: run.transitions.clone(),
            checkpoints: run.checkpoints.clone(),
            trace_spans: run.trace_spans.clone(),
            deterministic_events: deterministic_events_from_run(run)?,
        },
        redaction: RedactionManifest {
            mode: "sanitized".to_string(),
            policy: "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths"
                .to_string(),
            placeholder: REDACTED_PLACEHOLDER.to_string(),
            entries: Vec::new(),
            unsafe_secret_markers_rejected: true,
        },
        attachments: if include_attachments {
            attachments_from_run(run)
        } else {
            Vec::new()
        },
        ..SessionBundle::default()
    };
    bundle.metadata.insert(
        "format_note".to_string(),
        json!("Session bundles are portable JSON envelopes; hosted share links should reference sanitized bundles rather than raw run records."),
    );
    Ok(bundle)
}
1337
1338fn verification_outcomes_for_run(run: &RunRecord) -> Vec<RunVerificationOutcomeRecord> {
1339 if let Some(observability) = run.observability.as_ref() {
1340 return observability.verification_outcomes.clone();
1341 }
1342 derive_run_observability(run, run.persisted_path.as_deref().map(Path::new))
1343 .verification_outcomes
1344}
1345
1346fn bundle_redaction_policy(base: &RedactionPolicy) -> RedactionPolicy {
1347 base.clone()
1348 .with_extra_field("persisted_path")
1349 .with_extra_field("primary")
1350 .with_extra_field("run_path")
1351 .with_extra_field("snapshot_ref")
1352 .with_extra_field("snapshot_path")
1353 .with_extra_field("source_path")
1354}
1355
1356fn workspace_from_run(run: &RunRecord) -> Option<BundleWorkspace> {
1357 let anchor = run
1358 .transcript
1359 .as_ref()
1360 .and_then(|transcript| transcript.get("metadata"))
1361 .and_then(anchor_from_transcript_metadata_json)?;
1362 Some(BundleWorkspace::from(&anchor))
1363}
1364
1365fn transcript_from_run(run: &RunRecord) -> BundleTranscript {
1366 let mut sections = Vec::new();
1367 if let Some(transcript) = &run.transcript {
1368 sections.push(transcript_section(
1369 "run",
1370 "Run transcript",
1371 "run",
1372 "$.transcript",
1373 transcript,
1374 ));
1375 }
1376 for (index, stage) in run.stages.iter().enumerate() {
1377 if let Some(transcript) = &stage.transcript {
1378 sections.push(transcript_section(
1379 &stage.id,
1380 &format!("Stage {}", stage.node_id),
1381 "stage",
1382 &format!("$.stages[{index}].transcript"),
1383 transcript,
1384 ));
1385 }
1386 }
1387 BundleTranscript { sections }
1388}
1389
1390fn transcript_section(
1391 id: &str,
1392 label: &str,
1393 scope: &str,
1394 location: &str,
1395 transcript: &JsonValue,
1396) -> BundleTranscriptSection {
1397 BundleTranscriptSection {
1398 id: id.to_string(),
1399 label: label.to_string(),
1400 scope: scope.to_string(),
1401 location: location.to_string(),
1402 summary: transcript
1403 .get("summary")
1404 .and_then(JsonValue::as_str)
1405 .map(str::to_string),
1406 messages: json_array(transcript.get("messages")),
1407 events: json_array(transcript.get("events")),
1408 assets: json_array(transcript.get("assets")),
1409 metadata: transcript
1410 .get("metadata")
1411 .and_then(JsonValue::as_object)
1412 .map(|map| {
1413 map.iter()
1414 .map(|(key, value)| (key.clone(), value.clone()))
1415 .collect()
1416 })
1417 .unwrap_or_default(),
1418 }
1419}
1420
1421fn json_array(value: Option<&JsonValue>) -> Vec<JsonValue> {
1422 value
1423 .and_then(JsonValue::as_array)
1424 .cloned()
1425 .unwrap_or_default()
1426}
1427
1428fn tool_schema_entries(run: &RunRecord) -> Vec<BundleJsonEntry> {
1429 let mut entries = Vec::new();
1430 collect_tool_schema_entries_from_transcript(&mut entries, "run.transcript", &run.transcript);
1431 for stage in &run.stages {
1432 collect_tool_schema_entries_from_transcript(
1433 &mut entries,
1434 &format!("stage.{}.transcript", stage.node_id),
1435 &stage.transcript,
1436 );
1437 if let Some(tools) = stage
1438 .metadata
1439 .get("tool_schemas")
1440 .or_else(|| stage.metadata.get("tools"))
1441 {
1442 entries.push(BundleJsonEntry {
1443 source: format!("stage.{}.metadata", stage.node_id),
1444 index: entries.len(),
1445 value: tools.clone(),
1446 });
1447 }
1448 }
1449 entries
1450}
1451
1452fn collect_tool_schema_entries_from_transcript(
1453 entries: &mut Vec<BundleJsonEntry>,
1454 source: &str,
1455 transcript: &Option<JsonValue>,
1456) {
1457 let Some(transcript) = transcript else {
1458 return;
1459 };
1460 for event in transcript
1461 .get("events")
1462 .and_then(JsonValue::as_array)
1463 .into_iter()
1464 .flatten()
1465 {
1466 let kind = event
1467 .get("type")
1468 .or_else(|| event.get("kind"))
1469 .and_then(JsonValue::as_str)
1470 .unwrap_or_default();
1471 if kind == "tool_schemas" || kind == "tool_schema" {
1472 entries.push(BundleJsonEntry {
1473 source: source.to_string(),
1474 index: entries.len(),
1475 value: event.clone(),
1476 });
1477 }
1478 }
1479}
1480
1481fn permissions_from_run(run: &RunRecord) -> Vec<BundlePermission> {
1482 let mut permissions = run
1483 .hitl_questions
1484 .iter()
1485 .map(permission_from_hitl_question)
1486 .collect::<Vec<_>>();
1487 collect_permission_events(&mut permissions, "run.transcript", &run.transcript);
1488 for stage in &run.stages {
1489 collect_permission_events(
1490 &mut permissions,
1491 &format!("stage.{}.transcript", stage.node_id),
1492 &stage.transcript,
1493 );
1494 if let Some(worker) = stage.metadata.get("worker") {
1495 if let Some(policy) = worker
1496 .get("audit")
1497 .and_then(|audit| audit.get("approval_policy"))
1498 {
1499 permissions.push(BundlePermission {
1500 kind: "approval_policy".to_string(),
1501 source: format!("stage.{}.worker.audit", stage.node_id),
1502 request_id: None,
1503 agent: worker
1504 .get("name")
1505 .and_then(JsonValue::as_str)
1506 .map(str::to_string),
1507 payload: policy.clone(),
1508 });
1509 }
1510 }
1511 }
1512 permissions
1513}
1514
1515fn permission_from_hitl_question(question: &RunHitlQuestionRecord) -> BundlePermission {
1516 BundlePermission {
1517 kind: "hitl_question".to_string(),
1518 source: "run.hitl_questions".to_string(),
1519 request_id: Some(question.request_id.clone()),
1520 agent: if question.agent.is_empty() {
1521 None
1522 } else {
1523 Some(question.agent.clone())
1524 },
1525 payload: serde_json::to_value(question).unwrap_or(JsonValue::Null),
1526 }
1527}
1528
1529fn collect_permission_events(
1530 permissions: &mut Vec<BundlePermission>,
1531 source: &str,
1532 transcript: &Option<JsonValue>,
1533) {
1534 let Some(transcript) = transcript else {
1535 return;
1536 };
1537 for event in transcript
1538 .get("events")
1539 .and_then(JsonValue::as_array)
1540 .into_iter()
1541 .flatten()
1542 {
1543 let kind = event
1544 .get("type")
1545 .or_else(|| event.get("kind"))
1546 .and_then(JsonValue::as_str)
1547 .unwrap_or_default();
1548 if kind.contains("permission") || kind.contains("approval") || kind.starts_with("hitl_") {
1549 permissions.push(BundlePermission {
1550 kind: kind.to_string(),
1551 source: source.to_string(),
1552 request_id: event
1553 .get("request_id")
1554 .or_else(|| event.get("id"))
1555 .and_then(JsonValue::as_str)
1556 .map(str::to_string),
1557 agent: event
1558 .get("agent")
1559 .and_then(JsonValue::as_str)
1560 .map(str::to_string),
1561 payload: event.clone(),
1562 });
1563 }
1564 }
1565}
1566
1567fn event_log_pointers_from_run(run: &RunRecord) -> Vec<BundleEventLogPointer> {
1568 let mut pointers = Vec::new();
1569 if let Some(observability) = &run.observability {
1570 for pointer in &observability.transcript_pointers {
1571 pointers.push(BundleEventLogPointer {
1572 kind: pointer.kind.clone(),
1573 topic: None,
1574 path: pointer.path.clone(),
1575 location: pointer.location.clone(),
1576 available: pointer.available,
1577 });
1578 }
1579 for worker in &observability.worker_lineage {
1580 if let Some(session_id) = &worker.session_id {
1581 pointers.push(BundleEventLogPointer {
1582 kind: "agent_events".to_string(),
1583 topic: Some(format!("observability.agent_events.{session_id}")),
1584 path: worker.snapshot_path.clone(),
1585 location: format!("worker.{}.session", worker.worker_id),
1586 available: worker.snapshot_path.is_some(),
1587 });
1588 }
1589 }
1590 }
1591 pointers
1592}
1593
1594fn worker_snapshots_from_run(run: &RunRecord) -> Vec<BundleWorkerSnapshot> {
1595 let mut snapshots = Vec::new();
1596 let mut seen_paths = BTreeSet::new();
1597 for child in &run.child_runs {
1598 let Some(path) = child.snapshot_path.as_deref() else {
1599 continue;
1600 };
1601 if !seen_paths.insert(path.to_string()) {
1602 continue;
1603 }
1604 if let Some(snapshot) = worker_snapshot_from_path(
1605 &child.worker_id,
1606 &child.worker_name,
1607 &child.status,
1608 Path::new(path),
1609 ) {
1610 snapshots.push(snapshot);
1611 }
1612 }
1613 if let Some(observability) = run.observability.as_ref() {
1614 for worker in &observability.worker_lineage {
1615 let Some(path) = worker.snapshot_path.as_deref() else {
1616 continue;
1617 };
1618 if !seen_paths.insert(path.to_string()) {
1619 continue;
1620 }
1621 if let Some(snapshot) = worker_snapshot_from_path(
1622 &worker.worker_id,
1623 &worker.worker_name,
1624 &worker.status,
1625 Path::new(path),
1626 ) {
1627 snapshots.push(snapshot);
1628 }
1629 }
1630 }
1631 snapshots
1632}
1633
1634fn worker_snapshot_from_path(
1635 worker_id: &str,
1636 worker_name: &str,
1637 status: &str,
1638 path: &Path,
1639) -> Option<BundleWorkerSnapshot> {
1640 let content = fs::read_to_string(path).ok()?;
1641 let value = serde_json::from_str::<JsonValue>(&content).ok()?;
1642 Some(BundleWorkerSnapshot {
1643 worker_id: if worker_id.is_empty() {
1644 value
1645 .get("id")
1646 .and_then(JsonValue::as_str)
1647 .unwrap_or_default()
1648 .to_string()
1649 } else {
1650 worker_id.to_string()
1651 },
1652 worker_name: if worker_name.is_empty() {
1653 value
1654 .get("name")
1655 .and_then(JsonValue::as_str)
1656 .unwrap_or("worker")
1657 .to_string()
1658 } else {
1659 worker_name.to_string()
1660 },
1661 status: if status.is_empty() {
1662 value
1663 .get("status")
1664 .and_then(JsonValue::as_str)
1665 .unwrap_or_default()
1666 .to_string()
1667 } else {
1668 status.to_string()
1669 },
1670 snapshot_ref: value
1671 .get("suspension")
1672 .and_then(|value| value.get("snapshot_ref"))
1673 .and_then(JsonValue::as_str)
1674 .or_else(|| value.get("snapshot_path").and_then(JsonValue::as_str))
1675 .unwrap_or_else(|| path.to_str().unwrap_or_default())
1676 .to_string(),
1677 source_path: Some(path.to_string_lossy().into_owned()),
1678 value,
1679 })
1680}
1681
1682fn worker_snapshot_file_name(worker_id: &str, index: usize) -> String {
1683 let component = sanitize_topic_component(worker_id);
1684 let component = if component.is_empty() {
1685 format!("worker_{index}")
1686 } else {
1687 component
1688 };
1689 format!("{component}.json")
1690}
1691
1692fn worker_snapshot_value_for_import(value: &JsonValue, path: &Path) -> JsonValue {
1693 let mut value = value.clone();
1694 let path = path.to_string_lossy().into_owned();
1695 if let JsonValue::Object(map) = &mut value {
1696 map.insert("snapshot_path".to_string(), JsonValue::String(path.clone()));
1697 if let Some(JsonValue::Object(suspension)) = map.get_mut("suspension") {
1698 suspension.insert("snapshot_ref".to_string(), JsonValue::String(path));
1699 }
1700 }
1701 value
1702}
1703
1704fn deterministic_events_from_run(
1705 run: &RunRecord,
1706) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
1707 let mut events = Vec::new();
1708 for (index, transition) in run.transitions.iter().enumerate() {
1709 events.push(BundleJsonEntry {
1710 source: "run.transitions".to_string(),
1711 index,
1712 value: serde_json::to_value(transition)
1713 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1714 });
1715 }
1716 for (index, checkpoint) in run.checkpoints.iter().enumerate() {
1717 events.push(BundleJsonEntry {
1718 source: "run.checkpoints".to_string(),
1719 index,
1720 value: serde_json::to_value(checkpoint)
1721 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1722 });
1723 }
1724 Ok(events)
1725}
1726
1727fn attachments_from_run(run: &RunRecord) -> Vec<BundleAttachment> {
1728 run.artifacts
1729 .iter()
1730 .map(|artifact| BundleAttachment {
1731 id: artifact.id.clone(),
1732 kind: artifact.kind.clone(),
1733 title: artifact.title.clone(),
1734 stage: artifact.stage.clone(),
1735 text: artifact.text.clone(),
1736 data: artifact.data.clone(),
1737 metadata: artifact.metadata.clone(),
1738 })
1739 .collect()
1740}
1741
1742fn redact_bundle_pointer_paths_json(
1743 value: &mut JsonValue,
1744 path: &str,
1745 entries: &mut Vec<RedactionEntry>,
1746) {
1747 match value {
1748 JsonValue::Object(map) => {
1749 let keys = map.keys().cloned().collect::<Vec<_>>();
1750 for key in keys {
1751 let child_path = json_path_child(path, &key);
1752 if key == "path" && bundle_pointer_path_should_redact(&child_path) {
1753 if !map.get(&key).is_some_and(JsonValue::is_null) {
1754 map.insert(key, JsonValue::String(REDACTED_PLACEHOLDER.to_string()));
1755 entries.push(RedactionEntry {
1756 path: child_path,
1757 class: "local_pointer_path".to_string(),
1758 action: "replaced".to_string(),
1759 replacement: Some(REDACTED_PLACEHOLDER.to_string()),
1760 });
1761 }
1762 } else if let Some(child) = map.get_mut(&key) {
1763 redact_bundle_pointer_paths_json(child, &child_path, entries);
1764 }
1765 }
1766 }
1767 JsonValue::Array(items) => {
1768 for (index, item) in items.iter_mut().enumerate() {
1769 redact_bundle_pointer_paths_json(item, &format!("{path}[{index}]"), entries);
1770 }
1771 }
1772 _ => {}
1773 }
1774}
1775
/// Returns true when a JSON path runs through an element of an
/// `event_log_pointers` or `transcript_pointers` array.
fn bundle_pointer_path_should_redact(path: &str) -> bool {
    const POINTER_ARRAY_MARKERS: [&str; 2] = [".event_log_pointers[", ".transcript_pointers["];
    POINTER_ARRAY_MARKERS
        .iter()
        .any(|marker| path.contains(*marker))
}
1779
1780fn withhold_replay_only_json(value: &mut JsonValue, path: &str, entries: &mut Vec<RedactionEntry>) {
1781 match value {
1782 JsonValue::Object(map) => {
1783 let keys = map.keys().cloned().collect::<Vec<_>>();
1784 for key in keys {
1785 let child_path = json_path_child(path, &key);
1786 if replay_only_field_is_prompt_payload(&key) {
1787 if !map.get(&key).is_some_and(JsonValue::is_null) {
1788 map.insert(key, JsonValue::String(REPLAY_ONLY_PLACEHOLDER.to_string()));
1789 entries.push(RedactionEntry {
1790 path: child_path,
1791 class: "prompt_or_tool_payload".to_string(),
1792 action: "withheld".to_string(),
1793 replacement: Some(REPLAY_ONLY_PLACEHOLDER.to_string()),
1794 });
1795 }
1796 } else if let Some(child) = map.get_mut(&key) {
1797 withhold_replay_only_json(child, &child_path, entries);
1798 }
1799 }
1800 }
1801 JsonValue::Array(items) => {
1802 for (index, item) in items.iter_mut().enumerate() {
1803 withhold_replay_only_json(item, &format!("{path}[{index}]"), entries);
1804 }
1805 }
1806 _ => {}
1807 }
1808}
1809
/// Returns true for object keys whose values carry prompt, model output, or
/// tool payload content that replay-only bundles withhold.
///
/// Matching is exact and case-sensitive.
fn replay_only_field_is_prompt_payload(key: &str) -> bool {
    const PROMPT_PAYLOAD_FIELDS: &[&str] = &[
        "args",
        "arguments",
        "blocks",
        "content",
        "data",
        "private_reasoning",
        "prompt",
        "raw_input",
        "raw_output",
        "result",
        "response_text",
        "summary",
        "system",
        "system_prompt",
        "task",
        "text",
        "thinking",
        "visible_text",
    ];
    PROMPT_PAYLOAD_FIELDS.contains(&key)
}
1833
1834fn set_json_path(value: &mut JsonValue, path: &[&str], replacement: JsonValue) {
1835 let Some((head, tail)) = path.split_first() else {
1836 *value = replacement;
1837 return;
1838 };
1839 if tail.is_empty() {
1840 if let JsonValue::Object(map) = value {
1841 map.insert((*head).to_string(), replacement);
1842 }
1843 return;
1844 }
1845 if let JsonValue::Object(map) = value {
1846 if let Some(child) = map.get_mut(*head) {
1847 set_json_path(child, tail, replacement);
1848 }
1849 }
1850}
1851
1852fn require_field(value: &JsonValue, field: &str) -> Result<(), SessionBundleError> {
1853 if value.get(field).is_some() {
1854 Ok(())
1855 } else {
1856 Err(SessionBundleError::MissingRequired(format!("$.{field}")))
1857 }
1858}
1859
1860fn require_nested_field(value: &JsonValue, path: &[&str]) -> Result<(), SessionBundleError> {
1861 let mut current = value;
1862 for segment in path {
1863 current = current
1864 .get(*segment)
1865 .ok_or_else(|| SessionBundleError::MissingRequired(json_path_from_segments(path)))?;
1866 }
1867 Ok(())
1868}
1869
1870fn json_path_from_segments(path: &[&str]) -> String {
1871 path.iter().fold("$".to_string(), |parent, segment| {
1872 json_path_child(&parent, segment)
1873 })
1874}