1use std::borrow::Cow;
10use std::collections::BTreeMap;
11use std::fmt;
12
13use chrono::{DateTime, SecondsFormat, Utc};
14use serde::{Deserialize, Serialize};
15use serde_json::{json, Value as JsonValue};
16
17use crate::agent_events::AgentEvent;
18use crate::event_log::sanitize_topic_component;
19use crate::orchestration::{
20 new_id, now_rfc3339, AgentSessionReplayEvent, ReplayFixture, RunCheckpointRecord,
21 RunHitlQuestionRecord, RunRecord, RunTraceSpanRecord, RunTransitionRecord, ToolCallRecord,
22};
23use crate::redact::{RedactionPolicy, REDACTED_PLACEHOLDER};
24use crate::workspace_anchor::{anchor_from_transcript_metadata_json, MountedRoot, WorkspaceAnchor};
25
26mod schema;
27pub use schema::{session_bundle_schema, session_bundle_schema_pretty};
28
29#[cfg(test)]
30mod tests;
31
/// Value every bundle must carry in its `_type` field; validation rejects anything else.
pub const SESSION_BUNDLE_TYPE: &str = "harn_session_bundle";
/// Schema version written by this build and the highest version validation accepts.
pub const SESSION_BUNDLE_SCHEMA_VERSION: u32 = 1;
/// Schema identifier recorded in `producer.schema_id` of bundles created here.
pub const SESSION_BUNDLE_SCHEMA_ID: &str = "https://harnlang.com/schemas/session-bundle.v1.json";
/// Placeholder text for withheld content.
// NOTE(review): presumably substituted by the replay-only withholding pass; its use
// is outside the code visible here — confirm.
pub const REPLAY_ONLY_PLACEHOLDER: &str = "[withheld]";
36
/// Scrubbing level requested when exporting a session bundle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionBundleExportMode {
    /// Export as-is: the exporter skips redaction and records the policy as `"none"`.
    Local,
    /// Run the redaction passes over the serialized bundle.
    Sanitized,
    /// Run the redaction passes and additionally the replay-only withholding pass.
    ReplayOnly,
}

impl SessionBundleExportMode {
    /// Returns the stable snake_case label for this mode (written to `redaction.mode`).
    pub fn as_str(self) -> &'static str {
        match self {
            SessionBundleExportMode::ReplayOnly => "replay_only",
            SessionBundleExportMode::Sanitized => "sanitized",
            SessionBundleExportMode::Local => "local",
        }
    }
}
53
/// Options accepted by `export_run_record_bundle`.
#[derive(Clone, Debug)]
pub struct SessionBundleExportOptions {
    /// Scrubbing level applied to the exported bundle.
    pub mode: SessionBundleExportMode,
    /// When `false`, the exported bundle's `attachments` list is left empty.
    pub include_attachments: bool,
    /// Base redaction policy; the exporter extends it with local-path field names
    /// before use. Not consulted in `Local` mode.
    pub redaction_policy: RedactionPolicy,
}
60
61impl Default for SessionBundleExportOptions {
62 fn default() -> Self {
63 Self {
64 mode: SessionBundleExportMode::Sanitized,
65 include_attachments: false,
66 redaction_policy: RedactionPolicy::default(),
67 }
68 }
69}
70
/// Options accepted by the `validate_session_bundle_*` functions.
#[derive(Clone, Debug, Default)]
pub struct SessionBundleValidationOptions {
    /// When `true`, validation skips the scan for unredacted secret markers.
    pub allow_unsafe_secret_markers: bool,
    /// Policy handed to the secret-marker scan when it runs.
    pub redaction_policy: RedactionPolicy,
}
76
/// Top-level session bundle envelope.
///
/// `#[serde(default)]` means any field absent from the input is filled from
/// [`SessionBundle::default`] during deserialization.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct SessionBundle {
    /// Type discriminator; serialized as `_type`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Bundle schema version.
    pub schema_version: u32,
    /// Identifier of this bundle.
    pub bundle_id: String,
    /// Creation timestamp as a string (RFC 3339 when produced by this module).
    pub created_at: String,
    /// Which tool and version produced the bundle.
    pub producer: BundleProducer,
    /// Identity of the run or session the bundle was derived from.
    pub source: BundleSource,
    /// Runtime version, models, and usage totals.
    pub runtime: BundleRuntime,
    /// Workspace identity, when one could be recovered from transcript metadata.
    pub workspace: Option<BundleWorkspace>,
    /// Transcript sections (run-level and per-stage).
    pub transcript: BundleTranscript,
    /// Tool schemas and recorded tool calls.
    pub tools: BundleTools,
    /// Human-in-the-loop questions, approval policies, and permission events.
    pub permissions: Vec<BundlePermission>,
    /// Data needed to replay or re-import the run.
    pub replay: BundleReplay,
    /// Record of the redaction applied during export.
    pub redaction: RedactionManifest,
    /// Optional attachments; empty unless requested at export time.
    pub attachments: Vec<BundleAttachment>,
    /// Free-form bundle metadata.
    pub metadata: BTreeMap<String, JsonValue>,
}
97
98impl Default for SessionBundle {
99 fn default() -> Self {
100 Self {
101 type_name: SESSION_BUNDLE_TYPE.to_string(),
102 schema_version: SESSION_BUNDLE_SCHEMA_VERSION,
103 bundle_id: String::new(),
104 created_at: String::new(),
105 producer: BundleProducer::default(),
106 source: BundleSource::default(),
107 runtime: BundleRuntime::default(),
108 workspace: None,
109 transcript: BundleTranscript::default(),
110 tools: BundleTools::default(),
111 permissions: Vec::new(),
112 replay: BundleReplay::default(),
113 redaction: RedactionManifest::default(),
114 attachments: Vec::new(),
115 metadata: BTreeMap::new(),
116 }
117 }
118}
119
/// Identifies the software that wrote a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleProducer {
    /// Producer name (`"harn"` for bundles created by this module).
    pub name: String,
    /// Producer version (the crate version for bundles created by this module).
    pub version: String,
    /// Identifier of the schema the bundle claims to follow.
    pub schema_id: String,
}
127
/// Describes where a bundle's content came from.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleSource {
    /// Origin kind; this module writes `"run_record"` or `"event_log_session"`.
    pub kind: String,
    /// Id of the originating run record (the session id for event-log bundles).
    pub run_record_id: String,
    /// Workflow identifier.
    pub workflow_id: String,
    /// Optional human-readable workflow name.
    pub workflow_name: Option<String>,
    /// Task text of the run.
    pub task: String,
    /// Run status string.
    pub status: String,
    /// Start timestamp as a string.
    pub started_at: String,
    /// Finish timestamp, if the run or session finished.
    pub finished_at: Option<String>,
    /// Path the run record was persisted at, if any.
    pub persisted_path: Option<String>,
    /// Id of the root run, if recorded.
    pub root_run_id: Option<String>,
    /// Id of the parent run, if recorded.
    pub parent_run_id: Option<String>,
    /// Number of child runs on the originating run record.
    pub child_run_count: usize,
}
144
/// Runtime information captured alongside the bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleRuntime {
    /// Version of the crate that produced the bundle.
    pub harn_version: String,
    /// Model names; copied from the run's usage record when exporting a run.
    pub provider_models: Vec<String>,
    /// Usage totals, when the run recorded any.
    pub usage: Option<BundleUsage>,
    /// Free-form runtime metadata.
    pub metadata: BTreeMap<String, JsonValue>,
}
153
/// Usage totals copied field-for-field from a run's usage record.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct BundleUsage {
    /// Total input tokens.
    pub input_tokens: i64,
    /// Total output tokens.
    pub output_tokens: i64,
    /// Number of calls counted.
    pub call_count: i64,
    /// Total duration in milliseconds.
    pub total_duration_ms: i64,
    /// Total cost. NOTE(review): currency/unit is not stated in this file — confirm.
    pub total_cost: f64,
    /// Model names involved.
    pub models: Vec<String>,
}
164
/// Workspace identity attached to a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleWorkspace {
    /// Primary workspace root path, rendered as a (lossy) string.
    pub primary: Option<String>,
    /// Additional mounted roots; defaults to empty when absent.
    #[serde(default)]
    pub additional_roots: Vec<BundleMountedRoot>,
    /// Timestamp string at which the workspace was anchored.
    pub anchored_at: Option<String>,
    /// Policy label; `"safe_identity_only"` when converted from a `WorkspaceAnchor`.
    pub policy: String,
}
180
/// String-only rendering of an additional workspace root.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleMountedRoot {
    /// Root path rendered as a (lossy) string.
    pub path: String,
    /// Mount mode label.
    pub mount_mode: String,
    /// Timestamp string recorded when the root was mounted.
    pub mounted_at: String,
}
188
189impl From<&MountedRoot> for BundleMountedRoot {
190 fn from(root: &MountedRoot) -> Self {
191 Self {
192 path: root.path.to_string_lossy().into_owned(),
193 mount_mode: root.mount_mode.as_str().to_string(),
194 mounted_at: root.mounted_at.clone(),
195 }
196 }
197}
198
199impl From<&WorkspaceAnchor> for BundleWorkspace {
200 fn from(anchor: &WorkspaceAnchor) -> Self {
201 Self {
202 primary: Some(anchor.primary.to_string_lossy().into_owned()),
203 additional_roots: anchor.additional_roots.iter().map(Into::into).collect(),
204 anchored_at: Some(anchor.anchored_at.clone()),
205 policy: "safe_identity_only".to_string(),
206 }
207 }
208}
209
/// Transcript content of a bundle, split into sections.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscript {
    /// Sections in export order (run transcript first, then stage transcripts).
    pub sections: Vec<BundleTranscriptSection>,
}
215
/// One transcript (run-level, stage-level, or session event log) inside a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTranscriptSection {
    /// Section identifier (e.g. `"run"`, a stage id, or `"agent_events"`).
    pub id: String,
    /// Human-readable label.
    pub label: String,
    /// Scope label; this module writes `"run"`, `"stage"`, or `"session"`.
    pub scope: String,
    /// Where the transcript came from (a JSON path into the run record or an event topic).
    pub location: String,
    /// Transcript summary, when the source transcript had a string `summary`.
    pub summary: Option<String>,
    /// Transcript messages as raw JSON.
    pub messages: Vec<JsonValue>,
    /// Transcript events as raw JSON.
    pub events: Vec<JsonValue>,
    /// Transcript assets as raw JSON.
    pub assets: Vec<JsonValue>,
    /// Transcript metadata entries.
    pub metadata: BTreeMap<String, JsonValue>,
}
229
/// Tool information carried by a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleTools {
    /// Tool schema payloads gathered from transcripts and stage metadata.
    pub schemas: Vec<BundleJsonEntry>,
    /// Recorded tool calls.
    pub calls: Vec<BundleToolCall>,
}
236
/// A raw JSON payload tagged with where it was found.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleJsonEntry {
    /// Label of the place the payload was collected from.
    pub source: String,
    /// Position of this entry within the list it was collected into.
    pub index: usize,
    /// The payload itself.
    pub value: JsonValue,
}
244
/// Bundle form of a recorded tool call; mirrors the fields copied from `ToolCallRecord`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleToolCall {
    /// Name of the tool that was called.
    pub tool_name: String,
    /// Identifier of the tool use.
    pub tool_use_id: String,
    /// Hash of the call arguments.
    pub args_hash: String,
    /// Recorded result text.
    pub result: String,
    /// Whether the call was rejected.
    pub is_rejected: bool,
    /// Call duration in milliseconds.
    pub duration_ms: u64,
    /// Iteration number the call belongs to.
    pub iteration: usize,
    /// Timestamp string of the call.
    pub timestamp: String,
}
257
258impl From<&ToolCallRecord> for BundleToolCall {
259 fn from(record: &ToolCallRecord) -> Self {
260 Self {
261 tool_name: record.tool_name.clone(),
262 tool_use_id: record.tool_use_id.clone(),
263 args_hash: record.args_hash.clone(),
264 result: record.result.clone(),
265 is_rejected: record.is_rejected,
266 duration_ms: record.duration_ms,
267 iteration: record.iteration,
268 timestamp: record.timestamp.clone(),
269 }
270 }
271}
272
/// A permission-related record: a human-in-the-loop question, an approval policy,
/// or a permission/approval transcript event.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundlePermission {
    /// Record kind, e.g. `"hitl_question"`, `"approval_policy"`, or a transcript event kind.
    pub kind: String,
    /// Label of the place the record was collected from.
    pub source: String,
    /// Request identifier, when the record had one.
    pub request_id: Option<String>,
    /// Agent name, when the record had one.
    pub agent: Option<String>,
    /// Raw JSON payload of the record.
    pub payload: JsonValue,
}
282
/// Replay and re-import data carried by a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleReplay {
    /// Replay fixture; used to synthesize a run record when `run_record` is absent.
    pub replay_fixture: Option<ReplayFixture>,
    /// Full serialized run record; preferred by `import_run_record_value` when present.
    pub run_record: Option<JsonValue>,
    /// Pointers to event logs related to the run.
    pub event_log_pointers: Vec<BundleEventLogPointer>,
    /// Recorded run transitions.
    pub transitions: Vec<RunTransitionRecord>,
    /// Recorded run checkpoints.
    pub checkpoints: Vec<RunCheckpointRecord>,
    /// Recorded trace spans.
    pub trace_spans: Vec<RunTraceSpanRecord>,
    /// Raw events retained for deterministic replay.
    pub deterministic_events: Vec<BundleJsonEntry>,
}
294
/// Reference to an event log that backs (part of) a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleEventLogPointer {
    /// Pointer kind (e.g. `"agent_events"`).
    pub kind: String,
    /// Event topic, when the log is topic-addressed.
    pub topic: Option<String>,
    /// Filesystem path, when one is recorded.
    pub path: Option<String>,
    /// Location label (e.g. `"events.sqlite"`).
    pub location: String,
    /// Whether the referenced log was available when the bundle was produced.
    pub available: bool,
}
304
/// Summary of the redaction applied while exporting a bundle.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RedactionManifest {
    /// Export mode label (see `SessionBundleExportMode::as_str`).
    pub mode: String,
    /// Name of the policy applied; `"none"` for local exports.
    pub policy: String,
    /// Placeholder string recorded for redacted values.
    pub placeholder: String,
    /// One entry per change made by the redaction passes.
    pub entries: Vec<RedactionEntry>,
    /// `true` for every export mode other than local.
    pub unsafe_secret_markers_rejected: bool,
}
314
/// One recorded redaction change.
// NOTE(review): entries are produced by redaction helpers outside the code visible
// here; the field notes below are inferred from the field names — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RedactionEntry {
    /// Presumably the JSON path of the affected value.
    pub path: String,
    /// Presumably the classification of what was matched.
    pub class: String,
    /// Presumably the action taken on the value.
    pub action: String,
    /// Replacement text, when one was substituted.
    pub replacement: Option<String>,
}
323
/// An attachment carried by a bundle; only populated when attachments are requested
/// at export time.
// NOTE(review): attachments are built by `attachments_from_run`, which is outside the
// code visible here; field meanings below follow the field names — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BundleAttachment {
    /// Attachment identifier.
    pub id: String,
    /// Attachment kind label.
    pub kind: String,
    /// Optional title.
    pub title: Option<String>,
    /// Optional stage the attachment is associated with.
    pub stage: Option<String>,
    /// Optional textual content.
    pub text: Option<String>,
    /// Optional structured content.
    pub data: Option<JsonValue>,
    /// Free-form attachment metadata.
    pub metadata: BTreeMap<String, JsonValue>,
}
335
/// Failures raised while exporting, validating, or importing session bundles.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionBundleError {
    /// JSON could not be decoded into the expected type; carries the decoder message.
    Decode(String),
    /// A value could not be encoded to JSON; carries the encoder message.
    Encode(String),
    /// A required field is absent; carries the field path.
    MissingRequired(String),
    /// `schema_version` is zero or newer than this build supports.
    UnsupportedSchemaVersion { found: u64, supported: u32 },
    /// A field has the wrong type or value; `expected` describes what was required.
    InvalidType { path: String, expected: String },
    /// An unredacted secret marker was found at `path`.
    UnsafeSecretMarker { path: String, excerpt: String },
    /// The bundle has neither a run record nor a replay fixture to import from.
    MissingRunRecord,
    /// No events were supplied for the given session.
    MissingSessionEvents { session_id: String },
}

impl fmt::Display for SessionBundleError {
    /// Writes the human-readable message for each variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SessionBundleError::MissingRunRecord => {
                f.write_str("session bundle does not include an importable run record")
            }
            SessionBundleError::Decode(reason) => {
                write!(f, "failed to decode session bundle: {}", reason)
            }
            SessionBundleError::Encode(reason) => {
                write!(f, "failed to encode session bundle: {}", reason)
            }
            SessionBundleError::MissingRequired(field_path) => {
                write!(f, "session bundle is missing required field {}", field_path)
            }
            SessionBundleError::UnsupportedSchemaVersion { found, supported } => write!(
                f,
                "unsupported session bundle schema_version {}; this build supports <= {}",
                found, supported
            ),
            SessionBundleError::InvalidType { path, expected } => {
                write!(f, "session bundle field {} must be {}", path, expected)
            }
            SessionBundleError::UnsafeSecretMarker { path, excerpt } => write!(
                f,
                "session bundle contains an unsafe unredacted secret marker at {}: {}",
                path, excerpt
            ),
            SessionBundleError::MissingSessionEvents { session_id } => write!(
                f,
                "event log does not contain replayable events for session_id {:?}",
                session_id
            ),
        }
    }
}

impl std::error::Error for SessionBundleError {}
377
378pub fn export_run_record_bundle(
379 run: &RunRecord,
380 options: &SessionBundleExportOptions,
381) -> Result<SessionBundle, SessionBundleError> {
382 let run_record_value =
383 serde_json::to_value(run).map_err(|error| SessionBundleError::Encode(error.to_string()))?;
384 let mut bundle = raw_bundle_from_run(run, run_record_value, options.include_attachments)?;
385 let mut bundle_value = serde_json::to_value(&bundle)
386 .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
387
388 let mut manifest = RedactionManifest {
389 mode: options.mode.as_str().to_string(),
390 policy: if matches!(options.mode, SessionBundleExportMode::Local) {
391 "none".to_string()
392 } else {
393 "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths".to_string()
394 },
395 placeholder: REDACTED_PLACEHOLDER.to_string(),
396 entries: Vec::new(),
397 unsafe_secret_markers_rejected: !matches!(options.mode, SessionBundleExportMode::Local),
398 };
399
400 if !matches!(options.mode, SessionBundleExportMode::Local) {
401 let redaction_policy = bundle_redaction_policy(&options.redaction_policy);
402 redact_json_with_manifest(
403 &mut bundle_value,
404 "$",
405 &redaction_policy,
406 &mut manifest.entries,
407 );
408 redact_bundle_pointer_paths_json(&mut bundle_value, "$", &mut manifest.entries);
409 }
410 if matches!(options.mode, SessionBundleExportMode::ReplayOnly) {
411 withhold_replay_only_json(&mut bundle_value, "$", &mut manifest.entries);
412 }
413 set_json_path(
414 &mut bundle_value,
415 &["redaction"],
416 serde_json::to_value(&manifest)
417 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
418 );
419 bundle = serde_json::from_value(bundle_value)
420 .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
421 Ok(bundle)
422}
423
424pub fn validate_session_bundle_value(
425 value: &JsonValue,
426 options: &SessionBundleValidationOptions,
427) -> Result<SessionBundle, SessionBundleError> {
428 require_field(value, "_type")?;
429 require_field(value, "schema_version")?;
430 require_field(value, "bundle_id")?;
431 require_field(value, "created_at")?;
432 require_field(value, "producer")?;
433 require_field(value, "source")?;
434 require_field(value, "runtime")?;
435 require_field(value, "transcript")?;
436 require_field(value, "tools")?;
437 require_field(value, "permissions")?;
438 require_field(value, "replay")?;
439 require_field(value, "redaction")?;
440 require_field(value, "attachments")?;
441 require_nested_field(value, &["producer", "name"])?;
442 require_nested_field(value, &["producer", "version"])?;
443 require_nested_field(value, &["producer", "schema_id"])?;
444 require_nested_field(value, &["source", "kind"])?;
445 require_nested_field(value, &["source", "run_record_id"])?;
446 require_nested_field(value, &["source", "workflow_id"])?;
447 require_nested_field(value, &["source", "task"])?;
448 require_nested_field(value, &["source", "status"])?;
449 require_nested_field(value, &["runtime", "harn_version"])?;
450 require_nested_field(value, &["runtime", "provider_models"])?;
451 require_nested_field(value, &["transcript", "sections"])?;
452 require_nested_field(value, &["tools", "schemas"])?;
453 require_nested_field(value, &["tools", "calls"])?;
454 require_nested_field(value, &["replay", "event_log_pointers"])?;
455 require_nested_field(value, &["replay", "transitions"])?;
456 require_nested_field(value, &["replay", "checkpoints"])?;
457 require_nested_field(value, &["replay", "trace_spans"])?;
458 require_nested_field(value, &["replay", "deterministic_events"])?;
459 require_nested_field(value, &["redaction", "mode"])?;
460 require_nested_field(value, &["redaction", "policy"])?;
461 require_nested_field(value, &["redaction", "placeholder"])?;
462 require_nested_field(value, &["redaction", "entries"])?;
463 require_nested_field(value, &["redaction", "unsafe_secret_markers_rejected"])?;
464
465 let type_name = value
466 .get("_type")
467 .and_then(JsonValue::as_str)
468 .ok_or_else(|| SessionBundleError::InvalidType {
469 path: "$._type".to_string(),
470 expected: "string".to_string(),
471 })?;
472 if type_name != SESSION_BUNDLE_TYPE {
473 return Err(SessionBundleError::InvalidType {
474 path: "$._type".to_string(),
475 expected: format!("\"{SESSION_BUNDLE_TYPE}\""),
476 });
477 }
478
479 let version = value
480 .get("schema_version")
481 .and_then(JsonValue::as_u64)
482 .ok_or_else(|| SessionBundleError::InvalidType {
483 path: "$.schema_version".to_string(),
484 expected: "positive integer".to_string(),
485 })?;
486 if version == 0 || version > u64::from(SESSION_BUNDLE_SCHEMA_VERSION) {
487 return Err(SessionBundleError::UnsupportedSchemaVersion {
488 found: version,
489 supported: SESSION_BUNDLE_SCHEMA_VERSION,
490 });
491 }
492
493 if !options.allow_unsafe_secret_markers {
494 reject_unredacted_secret_markers(value, "$", &options.redaction_policy)?;
495 }
496
497 serde_json::from_value::<SessionBundle>(value.clone())
498 .map_err(|error| SessionBundleError::Decode(error.to_string()))
499}
500
501pub fn validate_session_bundle_str(
502 content: &str,
503 options: &SessionBundleValidationOptions,
504) -> Result<SessionBundle, SessionBundleError> {
505 let value: JsonValue = serde_json::from_str(content)
506 .map_err(|error| SessionBundleError::Decode(error.to_string()))?;
507 validate_session_bundle_value(&value, options)
508}
509
510pub fn import_run_record_value(bundle: &SessionBundle) -> Result<JsonValue, SessionBundleError> {
511 if let Some(run_record) = bundle.replay.run_record.clone() {
512 return Ok(run_record);
513 }
514 if let Some(fixture) = &bundle.replay.replay_fixture {
515 let transcript = bundle.transcript.sections.first().map(|section| {
516 json!({
517 "_type": "transcript",
518 "messages": section.messages.clone(),
519 "events": section.events.clone(),
520 "assets": section.assets.clone(),
521 "summary": section.summary.clone(),
522 "metadata": section.metadata.clone(),
523 })
524 });
525 let hitl_questions = bundle
526 .permissions
527 .iter()
528 .filter(|permission| permission.kind == "hitl_question")
529 .map(|permission| permission.payload.clone())
530 .collect::<Vec<_>>();
531 return Ok(json!({
532 "_type": "run_record",
533 "id": bundle.source.run_record_id.clone(),
534 "workflow_id": bundle.source.workflow_id.clone(),
535 "workflow_name": bundle.source.workflow_name.clone(),
536 "task": bundle.source.task.clone(),
537 "status": bundle.source.status.clone(),
538 "started_at": bundle.source.started_at.clone(),
539 "finished_at": bundle.source.finished_at.clone(),
540 "stages": [],
541 "transitions": bundle.replay.transitions.clone(),
542 "checkpoints": bundle.replay.checkpoints.clone(),
543 "pending_nodes": [],
544 "completed_nodes": [],
545 "child_runs": [],
546 "artifacts": [],
547 "handoffs": [],
548 "policy": {},
549 "transcript": transcript,
550 "usage": bundle.runtime.usage.clone(),
551 "replay_fixture": fixture,
552 "trace_spans": bundle.replay.trace_spans.clone(),
553 "tool_recordings": bundle.tools.calls.clone(),
554 "hitl_questions": hitl_questions,
555 "persona_runtime": [],
556 "metadata": {
557 "imported_from_session_bundle": bundle.bundle_id.clone(),
558 "session_bundle_schema_version": bundle.schema_version,
559 }
560 }));
561 }
562 Err(SessionBundleError::MissingRunRecord)
563}
564
565pub fn session_bundle_from_agent_session_events(
566 session_id: &str,
567 events: &[AgentSessionReplayEvent],
568) -> Result<SessionBundle, SessionBundleError> {
569 if events.is_empty() {
570 return Err(SessionBundleError::MissingSessionEvents {
571 session_id: session_id.to_string(),
572 });
573 }
574
575 let stable_id = sanitize_topic_component(session_id);
576 let started_at = rfc3339_from_epoch_ms(events[0].occurred_at_ms);
577 let finished_at = events.iter().rev().find_map(|entry| match &entry.event {
578 AgentEvent::SessionClosed { .. } => Some(rfc3339_from_epoch_ms(entry.occurred_at_ms)),
579 _ => None,
580 });
581 let status = events
582 .iter()
583 .rev()
584 .find_map(|entry| match &entry.event {
585 AgentEvent::SessionClosed { status, .. } if !status.is_empty() => Some(status.clone()),
586 _ => None,
587 })
588 .unwrap_or_else(|| "completed".to_string());
589 let run_id = session_id.to_string();
590 let workflow_id = "agent_session".to_string();
591 let created_at = finished_at.clone().unwrap_or_else(|| started_at.clone());
592 let transcript_events = transcript_events_from_agent_session(events)?;
593 let transcript_messages = transcript_messages_from_agent_session(events);
594 let mut transcript_metadata = BTreeMap::new();
595 transcript_metadata.insert("session_id".to_string(), json!(session_id));
596 transcript_metadata.insert(
597 "source".to_string(),
598 json!("events.sqlite observability.agent_events topic"),
599 );
600
601 let replay_fixture = ReplayFixture {
602 type_name: "replay_fixture".to_string(),
603 id: format!("fixture_from_session_{stable_id}"),
604 source_run_id: run_id.clone(),
605 workflow_id: workflow_id.clone(),
606 workflow_name: Some(format!("Agent session {session_id}")),
607 created_at: created_at.clone(),
608 eval_kind: Some("replay".to_string()),
609 expected_status: status.clone(),
610 ..ReplayFixture::default()
611 };
612
613 Ok(SessionBundle {
614 bundle_id: format!("bundle_from_session_{stable_id}"),
615 created_at,
616 producer: BundleProducer {
617 name: "harn".to_string(),
618 version: env!("CARGO_PKG_VERSION").to_string(),
619 schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
620 },
621 source: BundleSource {
622 kind: "event_log_session".to_string(),
623 run_record_id: run_id,
624 workflow_id,
625 workflow_name: Some(format!("Agent session {session_id}")),
626 task: task_from_agent_session(events)
627 .unwrap_or_else(|| format!("Agent session {session_id}")),
628 status,
629 started_at,
630 finished_at,
631 ..BundleSource::default()
632 },
633 runtime: BundleRuntime {
634 harn_version: env!("CARGO_PKG_VERSION").to_string(),
635 ..BundleRuntime::default()
636 },
637 transcript: BundleTranscript {
638 sections: vec![BundleTranscriptSection {
639 id: "agent_events".to_string(),
640 label: "Agent event log".to_string(),
641 scope: "session".to_string(),
642 location: format!(
643 "observability.agent_events.{}",
644 sanitize_topic_component(session_id)
645 ),
646 summary: None,
647 messages: transcript_messages,
648 events: transcript_events,
649 assets: Vec::new(),
650 metadata: transcript_metadata,
651 }],
652 },
653 permissions: permissions_from_agent_session(events),
654 replay: BundleReplay {
655 replay_fixture: Some(replay_fixture),
656 event_log_pointers: vec![BundleEventLogPointer {
657 kind: "agent_events".to_string(),
658 topic: Some(format!(
659 "observability.agent_events.{}",
660 sanitize_topic_component(session_id)
661 )),
662 path: None,
663 location: "events.sqlite".to_string(),
664 available: true,
665 }],
666 deterministic_events: deterministic_events_from_agent_session(events)?,
667 ..BundleReplay::default()
668 },
669 ..SessionBundle::default()
670 })
671}
672
673pub fn import_run_record_from_agent_session_events(
674 session_id: &str,
675 events: &[AgentSessionReplayEvent],
676) -> Result<RunRecord, SessionBundleError> {
677 let bundle = session_bundle_from_agent_session_events(session_id, events)?;
678 let run_record = import_run_record_value(&bundle)?;
679 serde_json::from_value(run_record)
680 .map_err(|error| SessionBundleError::Decode(error.to_string()))
681}
682
683fn transcript_events_from_agent_session(
684 events: &[AgentSessionReplayEvent],
685) -> Result<Vec<JsonValue>, SessionBundleError> {
686 events
687 .iter()
688 .map(|entry| {
689 let event = serde_json::to_value(&entry.event)
690 .map_err(|error| SessionBundleError::Encode(error.to_string()))?;
691 Ok(json!({
692 "event_id": entry.event_id,
693 "kind": entry.kind,
694 "occurred_at_ms": entry.occurred_at_ms,
695 "event": event,
696 }))
697 })
698 .collect()
699}
700
701fn transcript_messages_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<JsonValue> {
702 events
703 .iter()
704 .filter_map(|entry| match &entry.event {
705 AgentEvent::UserMessage { content, .. } => Some(json!({
706 "role": "user",
707 "content": content,
708 })),
709 AgentEvent::AgentMessageChunk { content, .. } if !content.is_empty() => Some(json!({
710 "role": "assistant",
711 "content": content,
712 })),
713 _ => None,
714 })
715 .collect()
716}
717
718fn permissions_from_agent_session(events: &[AgentSessionReplayEvent]) -> Vec<BundlePermission> {
719 let mut permissions = Vec::new();
720 for entry in events {
721 if let AgentEvent::HitlRequested {
722 request_id,
723 kind,
724 payload,
725 ..
726 } = &entry.event
727 {
728 permissions.push(BundlePermission {
729 kind: "hitl_question".to_string(),
730 source: "agent_events".to_string(),
731 request_id: Some(request_id.clone()),
732 agent: None,
733 payload: json!({
734 "kind": kind,
735 "payload": payload,
736 "event_id": entry.event_id,
737 "occurred_at_ms": entry.occurred_at_ms,
738 }),
739 });
740 }
741 }
742 permissions
743}
744
745fn deterministic_events_from_agent_session(
746 events: &[AgentSessionReplayEvent],
747) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
748 transcript_events_from_agent_session(events).map(|entries| {
749 entries
750 .into_iter()
751 .enumerate()
752 .map(|(index, value)| BundleJsonEntry {
753 source: "events.sqlite.agent_events".to_string(),
754 index,
755 value,
756 })
757 .collect()
758 })
759}
760
761fn task_from_agent_session(events: &[AgentSessionReplayEvent]) -> Option<String> {
762 events.iter().find_map(|entry| match &entry.event {
763 AgentEvent::UserMessage { content, .. } => user_message_text(content),
764 _ => None,
765 })
766}
767
768fn user_message_text(content: &[JsonValue]) -> Option<String> {
769 let parts = content
770 .iter()
771 .filter_map(|value| {
772 value
773 .get("text")
774 .and_then(JsonValue::as_str)
775 .or_else(|| value.as_str())
776 .map(str::to_string)
777 })
778 .filter(|text| !text.trim().is_empty())
779 .collect::<Vec<_>>();
780 if parts.is_empty() {
781 None
782 } else {
783 Some(parts.join("\n"))
784 }
785}
786
787fn rfc3339_from_epoch_ms(ms: i64) -> String {
788 DateTime::<Utc>::from_timestamp_millis(ms)
789 .unwrap_or_else(|| DateTime::<Utc>::from_timestamp(0, 0).expect("unix epoch is valid"))
790 .to_rfc3339_opts(SecondsFormat::Millis, true)
791}
792
793fn raw_bundle_from_run(
794 run: &RunRecord,
795 run_record_value: JsonValue,
796 include_attachments: bool,
797) -> Result<SessionBundle, SessionBundleError> {
798 let mut bundle = SessionBundle {
799 bundle_id: new_id("bundle"),
800 created_at: now_rfc3339(),
801 producer: BundleProducer {
802 name: "harn".to_string(),
803 version: env!("CARGO_PKG_VERSION").to_string(),
804 schema_id: SESSION_BUNDLE_SCHEMA_ID.to_string(),
805 },
806 source: BundleSource {
807 kind: "run_record".to_string(),
808 run_record_id: run.id.clone(),
809 workflow_id: run.workflow_id.clone(),
810 workflow_name: run.workflow_name.clone(),
811 task: run.task.clone(),
812 status: run.status.clone(),
813 started_at: run.started_at.clone(),
814 finished_at: run.finished_at.clone(),
815 persisted_path: run.persisted_path.clone(),
816 root_run_id: run.root_run_id.clone(),
817 parent_run_id: run.parent_run_id.clone(),
818 child_run_count: run.child_runs.len(),
819 },
820 runtime: BundleRuntime {
821 harn_version: env!("CARGO_PKG_VERSION").to_string(),
822 provider_models: run
823 .usage
824 .as_ref()
825 .map(|usage| usage.models.clone())
826 .unwrap_or_default(),
827 usage: run.usage.as_ref().map(|usage| BundleUsage {
828 input_tokens: usage.input_tokens,
829 output_tokens: usage.output_tokens,
830 call_count: usage.call_count,
831 total_duration_ms: usage.total_duration_ms,
832 total_cost: usage.total_cost,
833 models: usage.models.clone(),
834 }),
835 metadata: BTreeMap::new(),
836 },
837 workspace: workspace_from_run(run),
838 transcript: transcript_from_run(run),
839 tools: BundleTools {
840 schemas: tool_schema_entries(run),
841 calls: run
842 .tool_recordings
843 .iter()
844 .map(BundleToolCall::from)
845 .collect(),
846 },
847 permissions: permissions_from_run(run),
848 replay: BundleReplay {
849 replay_fixture: run.replay_fixture.clone(),
850 run_record: Some(run_record_value),
851 event_log_pointers: event_log_pointers_from_run(run),
852 transitions: run.transitions.clone(),
853 checkpoints: run.checkpoints.clone(),
854 trace_spans: run.trace_spans.clone(),
855 deterministic_events: deterministic_events_from_run(run)?,
856 },
857 redaction: RedactionManifest {
858 mode: "sanitized".to_string(),
859 policy: "harn_vm::redact::RedactionPolicy::default+session_bundle_local_paths"
860 .to_string(),
861 placeholder: REDACTED_PLACEHOLDER.to_string(),
862 entries: Vec::new(),
863 unsafe_secret_markers_rejected: true,
864 },
865 attachments: if include_attachments {
866 attachments_from_run(run)
867 } else {
868 Vec::new()
869 },
870 ..SessionBundle::default()
871 };
872 bundle.metadata.insert(
873 "format_note".to_string(),
874 json!("Session bundles are portable JSON envelopes; hosted share links should reference sanitized bundles rather than raw run records."),
875 );
876 Ok(bundle)
877}
878
879fn bundle_redaction_policy(base: &RedactionPolicy) -> RedactionPolicy {
880 base.clone()
881 .with_extra_field("persisted_path")
882 .with_extra_field("primary")
883 .with_extra_field("run_path")
884 .with_extra_field("snapshot_path")
885}
886
887fn workspace_from_run(run: &RunRecord) -> Option<BundleWorkspace> {
888 let anchor = run
889 .transcript
890 .as_ref()
891 .and_then(|transcript| transcript.get("metadata"))
892 .and_then(anchor_from_transcript_metadata_json)?;
893 Some(BundleWorkspace::from(&anchor))
894}
895
896fn transcript_from_run(run: &RunRecord) -> BundleTranscript {
897 let mut sections = Vec::new();
898 if let Some(transcript) = &run.transcript {
899 sections.push(transcript_section(
900 "run",
901 "Run transcript",
902 "run",
903 "$.transcript",
904 transcript,
905 ));
906 }
907 for (index, stage) in run.stages.iter().enumerate() {
908 if let Some(transcript) = &stage.transcript {
909 sections.push(transcript_section(
910 &stage.id,
911 &format!("Stage {}", stage.node_id),
912 "stage",
913 &format!("$.stages[{index}].transcript"),
914 transcript,
915 ));
916 }
917 }
918 BundleTranscript { sections }
919}
920
921fn transcript_section(
922 id: &str,
923 label: &str,
924 scope: &str,
925 location: &str,
926 transcript: &JsonValue,
927) -> BundleTranscriptSection {
928 BundleTranscriptSection {
929 id: id.to_string(),
930 label: label.to_string(),
931 scope: scope.to_string(),
932 location: location.to_string(),
933 summary: transcript
934 .get("summary")
935 .and_then(JsonValue::as_str)
936 .map(str::to_string),
937 messages: json_array(transcript.get("messages")),
938 events: json_array(transcript.get("events")),
939 assets: json_array(transcript.get("assets")),
940 metadata: transcript
941 .get("metadata")
942 .and_then(JsonValue::as_object)
943 .map(|map| {
944 map.iter()
945 .map(|(key, value)| (key.clone(), value.clone()))
946 .collect()
947 })
948 .unwrap_or_default(),
949 }
950}
951
952fn json_array(value: Option<&JsonValue>) -> Vec<JsonValue> {
953 value
954 .and_then(JsonValue::as_array)
955 .cloned()
956 .unwrap_or_default()
957}
958
959fn tool_schema_entries(run: &RunRecord) -> Vec<BundleJsonEntry> {
960 let mut entries = Vec::new();
961 collect_tool_schema_entries_from_transcript(&mut entries, "run.transcript", &run.transcript);
962 for stage in &run.stages {
963 collect_tool_schema_entries_from_transcript(
964 &mut entries,
965 &format!("stage.{}.transcript", stage.node_id),
966 &stage.transcript,
967 );
968 if let Some(tools) = stage
969 .metadata
970 .get("tool_schemas")
971 .or_else(|| stage.metadata.get("tools"))
972 {
973 entries.push(BundleJsonEntry {
974 source: format!("stage.{}.metadata", stage.node_id),
975 index: entries.len(),
976 value: tools.clone(),
977 });
978 }
979 }
980 entries
981}
982
983fn collect_tool_schema_entries_from_transcript(
984 entries: &mut Vec<BundleJsonEntry>,
985 source: &str,
986 transcript: &Option<JsonValue>,
987) {
988 let Some(transcript) = transcript else {
989 return;
990 };
991 for event in transcript
992 .get("events")
993 .and_then(JsonValue::as_array)
994 .into_iter()
995 .flatten()
996 {
997 let kind = event
998 .get("type")
999 .or_else(|| event.get("kind"))
1000 .and_then(JsonValue::as_str)
1001 .unwrap_or_default();
1002 if kind == "tool_schemas" || kind == "tool_schema" {
1003 entries.push(BundleJsonEntry {
1004 source: source.to_string(),
1005 index: entries.len(),
1006 value: event.clone(),
1007 });
1008 }
1009 }
1010}
1011
1012fn permissions_from_run(run: &RunRecord) -> Vec<BundlePermission> {
1013 let mut permissions = run
1014 .hitl_questions
1015 .iter()
1016 .map(permission_from_hitl_question)
1017 .collect::<Vec<_>>();
1018 collect_permission_events(&mut permissions, "run.transcript", &run.transcript);
1019 for stage in &run.stages {
1020 collect_permission_events(
1021 &mut permissions,
1022 &format!("stage.{}.transcript", stage.node_id),
1023 &stage.transcript,
1024 );
1025 if let Some(worker) = stage.metadata.get("worker") {
1026 if let Some(policy) = worker
1027 .get("audit")
1028 .and_then(|audit| audit.get("approval_policy"))
1029 {
1030 permissions.push(BundlePermission {
1031 kind: "approval_policy".to_string(),
1032 source: format!("stage.{}.worker.audit", stage.node_id),
1033 request_id: None,
1034 agent: worker
1035 .get("name")
1036 .and_then(JsonValue::as_str)
1037 .map(str::to_string),
1038 payload: policy.clone(),
1039 });
1040 }
1041 }
1042 }
1043 permissions
1044}
1045
1046fn permission_from_hitl_question(question: &RunHitlQuestionRecord) -> BundlePermission {
1047 BundlePermission {
1048 kind: "hitl_question".to_string(),
1049 source: "run.hitl_questions".to_string(),
1050 request_id: Some(question.request_id.clone()),
1051 agent: if question.agent.is_empty() {
1052 None
1053 } else {
1054 Some(question.agent.clone())
1055 },
1056 payload: serde_json::to_value(question).unwrap_or(JsonValue::Null),
1057 }
1058}
1059
1060fn collect_permission_events(
1061 permissions: &mut Vec<BundlePermission>,
1062 source: &str,
1063 transcript: &Option<JsonValue>,
1064) {
1065 let Some(transcript) = transcript else {
1066 return;
1067 };
1068 for event in transcript
1069 .get("events")
1070 .and_then(JsonValue::as_array)
1071 .into_iter()
1072 .flatten()
1073 {
1074 let kind = event
1075 .get("type")
1076 .or_else(|| event.get("kind"))
1077 .and_then(JsonValue::as_str)
1078 .unwrap_or_default();
1079 if kind.contains("permission") || kind.contains("approval") || kind.starts_with("hitl_") {
1080 permissions.push(BundlePermission {
1081 kind: kind.to_string(),
1082 source: source.to_string(),
1083 request_id: event
1084 .get("request_id")
1085 .or_else(|| event.get("id"))
1086 .and_then(JsonValue::as_str)
1087 .map(str::to_string),
1088 agent: event
1089 .get("agent")
1090 .and_then(JsonValue::as_str)
1091 .map(str::to_string),
1092 payload: event.clone(),
1093 });
1094 }
1095 }
1096}
1097
1098fn event_log_pointers_from_run(run: &RunRecord) -> Vec<BundleEventLogPointer> {
1099 let mut pointers = Vec::new();
1100 if let Some(observability) = &run.observability {
1101 for pointer in &observability.transcript_pointers {
1102 pointers.push(BundleEventLogPointer {
1103 kind: pointer.kind.clone(),
1104 topic: None,
1105 path: pointer.path.clone(),
1106 location: pointer.location.clone(),
1107 available: pointer.available,
1108 });
1109 }
1110 for worker in &observability.worker_lineage {
1111 if let Some(session_id) = &worker.session_id {
1112 pointers.push(BundleEventLogPointer {
1113 kind: "agent_events".to_string(),
1114 topic: Some(format!("observability.agent_events.{session_id}")),
1115 path: worker.snapshot_path.clone(),
1116 location: format!("worker.{}.session", worker.worker_id),
1117 available: worker.snapshot_path.is_some(),
1118 });
1119 }
1120 }
1121 }
1122 pointers
1123}
1124
1125fn deterministic_events_from_run(
1126 run: &RunRecord,
1127) -> Result<Vec<BundleJsonEntry>, SessionBundleError> {
1128 let mut events = Vec::new();
1129 for (index, transition) in run.transitions.iter().enumerate() {
1130 events.push(BundleJsonEntry {
1131 source: "run.transitions".to_string(),
1132 index,
1133 value: serde_json::to_value(transition)
1134 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1135 });
1136 }
1137 for (index, checkpoint) in run.checkpoints.iter().enumerate() {
1138 events.push(BundleJsonEntry {
1139 source: "run.checkpoints".to_string(),
1140 index,
1141 value: serde_json::to_value(checkpoint)
1142 .map_err(|error| SessionBundleError::Encode(error.to_string()))?,
1143 });
1144 }
1145 Ok(events)
1146}
1147
1148fn attachments_from_run(run: &RunRecord) -> Vec<BundleAttachment> {
1149 run.artifacts
1150 .iter()
1151 .map(|artifact| BundleAttachment {
1152 id: artifact.id.clone(),
1153 kind: artifact.kind.clone(),
1154 title: artifact.title.clone(),
1155 stage: artifact.stage.clone(),
1156 text: artifact.text.clone(),
1157 data: artifact.data.clone(),
1158 metadata: artifact.metadata.clone(),
1159 })
1160 .collect()
1161}
1162
1163fn redact_json_with_manifest(
1164 value: &mut JsonValue,
1165 path: &str,
1166 policy: &RedactionPolicy,
1167 entries: &mut Vec<RedactionEntry>,
1168) {
1169 match value {
1170 JsonValue::Object(map) => {
1171 let keys = map.keys().cloned().collect::<Vec<_>>();
1172 for key in keys {
1173 let child_path = json_path_child(path, &key);
1174 if policy.field_is_sensitive(&key) {
1175 map.insert(key, JsonValue::String(REDACTED_PLACEHOLDER.to_string()));
1176 entries.push(RedactionEntry {
1177 path: child_path,
1178 class: "sensitive_field".to_string(),
1179 action: "replaced".to_string(),
1180 replacement: Some(REDACTED_PLACEHOLDER.to_string()),
1181 });
1182 } else if let Some(child) = map.get_mut(&key) {
1183 redact_json_with_manifest(child, &child_path, policy, entries);
1184 }
1185 }
1186 }
1187 JsonValue::Array(items) => {
1188 for (index, item) in items.iter_mut().enumerate() {
1189 redact_json_with_manifest(item, &format!("{path}[{index}]"), policy, entries);
1190 }
1191 }
1192 JsonValue::String(text) => {
1193 let redacted = policy.redact_string(text);
1194 if let Cow::Owned(replacement) = redacted {
1195 let manifest_replacement = replacement.clone();
1200 *text = replacement;
1201 entries.push(RedactionEntry {
1202 path: path.to_string(),
1203 class: "secret_pattern_or_url".to_string(),
1204 action: "replaced".to_string(),
1205 replacement: Some(manifest_replacement),
1206 });
1207 }
1208 }
1209 _ => {}
1210 }
1211}
1212
1213fn redact_bundle_pointer_paths_json(
1214 value: &mut JsonValue,
1215 path: &str,
1216 entries: &mut Vec<RedactionEntry>,
1217) {
1218 match value {
1219 JsonValue::Object(map) => {
1220 let keys = map.keys().cloned().collect::<Vec<_>>();
1221 for key in keys {
1222 let child_path = json_path_child(path, &key);
1223 if key == "path" && bundle_pointer_path_should_redact(&child_path) {
1224 if !map.get(&key).is_some_and(JsonValue::is_null) {
1225 map.insert(key, JsonValue::String(REDACTED_PLACEHOLDER.to_string()));
1226 entries.push(RedactionEntry {
1227 path: child_path,
1228 class: "local_pointer_path".to_string(),
1229 action: "replaced".to_string(),
1230 replacement: Some(REDACTED_PLACEHOLDER.to_string()),
1231 });
1232 }
1233 } else if let Some(child) = map.get_mut(&key) {
1234 redact_bundle_pointer_paths_json(child, &child_path, entries);
1235 }
1236 }
1237 }
1238 JsonValue::Array(items) => {
1239 for (index, item) in items.iter_mut().enumerate() {
1240 redact_bundle_pointer_paths_json(item, &format!("{path}[{index}]"), entries);
1241 }
1242 }
1243 _ => {}
1244 }
1245}
1246
/// Reports whether a JSON path lies inside an indexed `event_log_pointers` or
/// `transcript_pointers` member.
///
/// The check is textual: the path must contain `.event_log_pointers[` or
/// `.transcript_pointers[`.
fn bundle_pointer_path_should_redact(path: &str) -> bool {
    const POINTER_LIST_MARKERS: [&str; 2] = [".event_log_pointers[", ".transcript_pointers["];
    POINTER_LIST_MARKERS
        .iter()
        .any(|marker| path.contains(marker))
}
1250
1251fn withhold_replay_only_json(value: &mut JsonValue, path: &str, entries: &mut Vec<RedactionEntry>) {
1252 match value {
1253 JsonValue::Object(map) => {
1254 let keys = map.keys().cloned().collect::<Vec<_>>();
1255 for key in keys {
1256 let child_path = json_path_child(path, &key);
1257 if replay_only_field_is_prompt_payload(&key) {
1258 if !map.get(&key).is_some_and(JsonValue::is_null) {
1259 map.insert(key, JsonValue::String(REPLAY_ONLY_PLACEHOLDER.to_string()));
1260 entries.push(RedactionEntry {
1261 path: child_path,
1262 class: "prompt_or_tool_payload".to_string(),
1263 action: "withheld".to_string(),
1264 replacement: Some(REPLAY_ONLY_PLACEHOLDER.to_string()),
1265 });
1266 }
1267 } else if let Some(child) = map.get_mut(&key) {
1268 withhold_replay_only_json(child, &child_path, entries);
1269 }
1270 }
1271 }
1272 JsonValue::Array(items) => {
1273 for (index, item) in items.iter_mut().enumerate() {
1274 withhold_replay_only_json(item, &format!("{path}[{index}]"), entries);
1275 }
1276 }
1277 _ => {}
1278 }
1279}
1280
/// Reports whether `key` is one of the field names withheld in replay-only
/// bundles.
///
/// The comparison is exact and case-sensitive.
fn replay_only_field_is_prompt_payload(key: &str) -> bool {
    const WITHHELD_KEYS: &[&str] = &[
        "args",
        "arguments",
        "blocks",
        "content",
        "data",
        "private_reasoning",
        "prompt",
        "raw_input",
        "raw_output",
        "result",
        "response_text",
        "summary",
        "system",
        "system_prompt",
        "task",
        "text",
        "thinking",
        "visible_text",
    ];
    WITHHELD_KEYS.iter().any(|candidate| *candidate == key)
}
1304
1305fn set_json_path(value: &mut JsonValue, path: &[&str], replacement: JsonValue) {
1306 let Some((head, tail)) = path.split_first() else {
1307 *value = replacement;
1308 return;
1309 };
1310 if tail.is_empty() {
1311 if let JsonValue::Object(map) = value {
1312 map.insert((*head).to_string(), replacement);
1313 }
1314 return;
1315 }
1316 if let JsonValue::Object(map) = value {
1317 if let Some(child) = map.get_mut(*head) {
1318 set_json_path(child, tail, replacement);
1319 }
1320 }
1321}
1322
1323fn require_field(value: &JsonValue, field: &str) -> Result<(), SessionBundleError> {
1324 if value.get(field).is_some() {
1325 Ok(())
1326 } else {
1327 Err(SessionBundleError::MissingRequired(format!("$.{field}")))
1328 }
1329}
1330
1331fn require_nested_field(value: &JsonValue, path: &[&str]) -> Result<(), SessionBundleError> {
1332 let mut current = value;
1333 for segment in path {
1334 current = current
1335 .get(*segment)
1336 .ok_or_else(|| SessionBundleError::MissingRequired(json_path_from_segments(path)))?;
1337 }
1338 Ok(())
1339}
1340
1341fn json_path_from_segments(path: &[&str]) -> String {
1342 path.iter().fold("$".to_string(), |parent, segment| {
1343 json_path_child(&parent, segment)
1344 })
1345}
1346
1347fn reject_unredacted_secret_markers(
1348 value: &JsonValue,
1349 path: &str,
1350 policy: &RedactionPolicy,
1351) -> Result<(), SessionBundleError> {
1352 match value {
1353 JsonValue::Object(map) => {
1354 for (key, child) in map {
1355 reject_unredacted_secret_markers(child, &json_path_child(path, key), policy)?;
1356 }
1357 }
1358 JsonValue::Array(items) => {
1359 for (index, item) in items.iter().enumerate() {
1360 reject_unredacted_secret_markers(item, &format!("{path}[{index}]"), policy)?;
1361 }
1362 }
1363 JsonValue::String(text) => {
1364 if matches!(policy.redact_string(text), Cow::Owned(_)) {
1365 return Err(SessionBundleError::UnsafeSecretMarker {
1366 path: path.to_string(),
1367 excerpt: secret_excerpt(text),
1368 });
1369 }
1370 }
1371 _ => {}
1372 }
1373 Ok(())
1374}
1375
/// Returns at most the first 80 characters of `text`.
///
/// Text longer than 80 characters is cut after the 80th character and suffixed
/// with `...`; shorter text is returned unchanged. Lengths are counted in
/// `char`s, so the cut never lands inside a multi-byte character.
fn secret_excerpt(text: &str) -> String {
    // The byte offset of the 81st character, if there is one, is where to cut.
    match text.char_indices().nth(80) {
        Some((cut, _)) => format!("{}...", &text[..cut]),
        None => text.to_owned(),
    }
}
1384
1385fn json_path_child(parent: &str, key: &str) -> String {
1386 if key
1387 .chars()
1388 .all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
1389 {
1390 format!("{parent}.{key}")
1391 } else {
1392 format!(
1393 "{parent}[{}]",
1394 serde_json::to_string(key).unwrap_or_default()
1395 )
1396 }
1397}