1use std::collections::{BTreeMap, BTreeSet};
2
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use sha2::{Digest, Sha256};
6use time::format_description::well_known::Rfc3339;
7use time::OffsetDateTime;
8
9use crate::event_log::{AnyEventLog, EventId, EventLog, LogError};
10use crate::provenance::event_record_hash_from_headers;
11use crate::redact::{current_policy, RedactionPolicy};
12
13use super::super::ArtifactRecord;
14use super::{
15 LlmUsageRecord, RunCheckpointRecord, RunChildRecord, RunHitlQuestionRecord, RunRecord,
16 RunStageRecord, RunTraceSpanRecord,
17};
18
/// Schema identifier written into [`RunView::schema`].
pub const RUN_VIEW_SCHEMA: &str = "harn.run_view.v1";
/// Schema identifier written into [`SessionView::schema`].
pub const SESSION_VIEW_SCHEMA: &str = "harn.session_view.v1";
/// Numeric schema version written into [`RunView::schema_version`].
pub const RUN_VIEW_SCHEMA_VERSION: u32 = 1;
/// Numeric schema version written into [`SessionView::schema_version`].
pub const SESSION_VIEW_SCHEMA_VERSION: u32 = 1;
/// Method name for session-view queries.
// NOTE(review): not referenced in this part of the file; presumably consumed by an RPC layer — confirm.
pub const SESSION_VIEW_QUERY_METHOD: &str = "harn.session_view.query";

/// Limit passed to `redact_bounded` / `bounded_join` for full text fields (task, visible text).
// NOTE(review): whether the unit is bytes or characters depends on `redact_bounded` — confirm.
const TEXT_LIMIT: usize = 16 * 1024;
/// Limit passed to the redaction helpers for short previews (prompts, summaries, errors).
const PREVIEW_LIMIT: usize = 1200;
27
/// Identifies the software that produced a view.
///
/// Missing fields fall back to the [`Default`] impl when deserializing
/// (`#[serde(default)]`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ViewProducer {
    /// Producer name.
    pub name: String,
    /// Producer version string.
    pub version: String,
}
34
35impl Default for ViewProducer {
36 fn default() -> Self {
37 Self {
38 name: "harn".to_string(),
39 version: env!("CARGO_PKG_VERSION").to_string(),
40 }
41 }
42}
43
/// Identity and provenance of a projection (a run or session view).
///
/// The builders in this file create it with an empty `projection_id` and no
/// `projection_hash`, then hand the view to `finalize_run_projection` /
/// `finalize_session_projection`.
// NOTE(review): the finalize helpers are presumably what fill in id and hash — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ProjectionInfo {
    /// Identifier of this projection.
    pub projection_id: String,
    /// Hash of the projection, when computed.
    pub projection_hash: Option<String>,
    /// Record hash of the latest event-log entry the view was built against, if any.
    pub prefix_hash: Option<String>,
    /// Id of the latest event-log entry the view was built against, if any.
    pub last_event_id: Option<EventId>,
}
52
/// Redacted, bounded summary of a single `RunRecord`.
///
/// Built by [`build_run_view_with_options`] and its convenience wrappers.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunView {
    /// Schema identifier; the builder writes [`RUN_VIEW_SCHEMA`].
    pub schema: String,
    /// Schema version; the builder writes [`RUN_VIEW_SCHEMA_VERSION`].
    pub schema_version: u32,
    /// Software that produced this view.
    pub producer: ViewProducer,
    /// Identity, lineage, status and timing of the run.
    pub run: RunViewRun,
    /// Projection identity and event-log position.
    pub projection: ProjectionInfo,
    /// Redacted visible text of all stages, joined and bounded by `TEXT_LIMIT`.
    pub visible_text: Option<String>,
    /// Transcript summary taken from the run, or aggregated from its stages.
    pub transcript: TranscriptSummary,
    /// Usage recorded on the run, or summed from the stages when absent.
    pub usage: RunViewUsage,
    /// Per provider/model totals.
    pub providers: Vec<RunViewProvider>,
    /// One entry per stage of the run.
    pub stages: Vec<RunViewStage>,
    /// One entry per artifact of the run.
    pub artifacts: Vec<RunViewArtifact>,
    /// One entry per checkpoint of the run.
    pub checkpoints: Vec<RunViewCheckpoint>,
    /// Pending nodes, approvals and auth requests.
    pub pending: RunViewPendingState,
    /// Failure summary, when a stage or the run has a failed status.
    pub failure: Option<RunViewFailure>,
    /// Counts and miscellaneous record metadata.
    pub metadata: RunViewMetadata,
}
72
/// Identity, lineage, status and timing of a run inside a [`RunView`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunViewRun {
    /// Id of the run record.
    pub run_id: String,
    /// Session id inferred from run, child, stage or span metadata.
    pub session_id: Option<String>,
    /// Id of the parent run, copied from the record.
    pub parent_run_id: Option<String>,
    /// Id of the root run, copied from the record.
    pub root_run_id: Option<String>,
    /// Parent session id inferred from run, child or stage metadata.
    pub parent_session_id: Option<String>,
    /// Summaries of the child runs listed on the record.
    pub child_runs: Vec<RunViewChild>,
    /// Path supplied by the caller, else the record's persisted path.
    pub run_path: Option<String>,
    /// Status string copied from the record.
    pub status: String,
    /// Workflow id copied from the record.
    pub workflow_id: String,
    /// Workflow name copied from the record.
    pub workflow_name: Option<String>,
    /// Task text, redacted and bounded by `TEXT_LIMIT`.
    pub task: String,
    /// Start timestamp string copied from the record.
    pub started_at: String,
    /// Finish timestamp string copied from the record.
    pub finished_at: Option<String>,
    /// Duration in milliseconds: timestamps first, then trace spans, then usage.
    pub duration_ms: Option<u64>,
}
91
/// Summary of one child run, built from a `RunChildRecord`.
///
/// All fields are copied from the record except `task`, which is redacted
/// and bounded by `TEXT_LIMIT`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunViewChild {
    pub worker_id: String,
    pub worker_name: String,
    pub run_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub run_path: Option<String>,
    pub status: String,
    /// Redacted, bounded task text.
    pub task: String,
}
104
/// LLM usage totals for a stage, run, or session.
///
/// Mirrors the fields of `LlmUsageRecord` (see the `From` impl) and can be
/// accumulated with `add_usage`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunViewUsage {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    pub total_cost: f64,
    /// Model names; `add_usage` keeps this list free of duplicates and empty names.
    pub models: Vec<String>,
}
115
116impl From<&LlmUsageRecord> for RunViewUsage {
117 fn from(value: &LlmUsageRecord) -> Self {
118 Self {
119 input_tokens: value.input_tokens,
120 output_tokens: value.output_tokens,
121 total_duration_ms: value.total_duration_ms,
122 call_count: value.call_count,
123 total_cost: value.total_cost,
124 models: value.models.clone(),
125 }
126 }
127}
128
129impl RunViewUsage {
130 fn add_usage(&mut self, usage: &RunViewUsage) {
131 self.input_tokens += usage.input_tokens;
132 self.output_tokens += usage.output_tokens;
133 self.total_duration_ms += usage.total_duration_ms;
134 self.call_count += usage.call_count;
135 self.total_cost += usage.total_cost;
136 for model in &usage.models {
137 if !model.is_empty() && !self.models.contains(model) {
138 self.models.push(model.clone());
139 }
140 }
141 }
142}
143
/// Usage totals for one (provider, model) pair, produced by `provider_summary`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunViewProvider {
    /// Provider name, or `"unknown"` when the span metadata has none.
    pub provider: String,
    /// Model name, or `"unknown"` when the span metadata has none.
    pub model: String,
    pub call_count: i64,
    pub input_tokens: i64,
    pub output_tokens: i64,
    /// Cost from span metadata `cost_usd`, else computed from token counts.
    pub cost_usd: f64,
}
154
/// Summary of one stage, built from a `RunStageRecord` by `build_stage_view`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunViewStage {
    pub id: String,
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Duration in milliseconds: positive usage duration first, else the timestamp delta.
    pub duration_ms: Option<u64>,
    /// Stage visible text, redacted and bounded by `TEXT_LIMIT`.
    pub visible_text: Option<String>,
    /// Usage recorded on the stage, or zeroed defaults.
    pub usage: RunViewUsage,
    /// From stage metadata `provider`, else `model_policy.provider`.
    pub provider: Option<String>,
    /// From stage metadata `model`, else `model_policy.model`.
    pub model: Option<String>,
    /// Sorted, de-duplicated non-empty ids of produced and attached artifacts.
    pub artifact_refs: Vec<String>,
    /// Number of attempts recorded on the stage.
    pub attempt_count: usize,
    /// Redacted error preview from metadata `error` or the latest attempt error.
    pub error: Option<String>,
}
175
/// Summary of one artifact, built from an `ArtifactRecord` by `build_artifact_view`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunViewArtifact {
    pub id: String,
    pub kind: String,
    pub title: Option<String>,
    pub source: Option<String>,
    pub stage: Option<String>,
    pub estimated_tokens: Option<usize>,
    pub lineage: Vec<String>,
    /// Redacted preview of the artifact text (bounded by `PREVIEW_LIMIT`), or of
    /// its JSON data when there is no text.
    pub preview: Option<String>,
}
188
/// Summary of one checkpoint, built from a `RunCheckpointRecord`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunViewCheckpoint {
    pub id: String,
    pub reason: String,
    /// Number of entries in the checkpoint's `ready_nodes`.
    pub ready_count: usize,
    /// Number of entries in the checkpoint's `completed_nodes`.
    pub completed_count: usize,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
}
199
/// Work that is still outstanding for a run or session.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunViewPendingState {
    /// Pending node ids copied from the run record.
    pub nodes: Vec<String>,
    /// One entry per human-in-the-loop question on the run record.
    pub approvals: Vec<RunViewApproval>,
    /// Auth requests found in run and stage metadata.
    pub auth: Vec<RunViewAuth>,
}
207
/// Summary of one human-in-the-loop question, built from a `RunHitlQuestionRecord`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunViewApproval {
    pub request_id: String,
    /// Prompt text, redacted and bounded by `PREVIEW_LIMIT`.
    pub prompt: String,
    pub agent: String,
    pub trace_id: Option<String>,
    pub asked_at: String,
}
217
/// One pending auth request extracted from run or stage metadata
/// (keys `pending_auth`, `auth_required`, `mcp_auth_required`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunViewAuth {
    pub provider: Option<String>,
    /// Taken from `server`, else `server_name`.
    pub server: Option<String>,
    pub scope: Option<String>,
    /// Id of the stage whose metadata carried the request, or the entry's own `stage_id`.
    pub stage_id: Option<String>,
    /// Redacted message, or a redacted JSON preview of the entry when it has no `message`.
    pub message: Option<String>,
}
227
/// Summary of why a run (or the first failing run of a session) failed.
///
/// Produced by `failure_summary`: describes the latest failed stage when there
/// is one, otherwise only the run-level status.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunViewFailure {
    pub stage_id: Option<String>,
    pub node_id: Option<String>,
    pub status: String,
    pub outcome: String,
    pub message: Option<String>,
}
237
/// Size and short summary of a transcript, without its content.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TranscriptSummary {
    /// True when a run-level or at least one stage-level transcript exists.
    pub present: bool,
    /// Count from the transcript's `messages` field (summed across stages).
    pub message_count: usize,
    /// Count from the transcript's `events` field (summed across stages).
    pub event_count: usize,
    /// Redacted summary text, bounded by `PREVIEW_LIMIT`.
    pub summary: Option<String>,
    /// The transcript's `source` string, or `"stages"` when aggregated from stages.
    pub source: Option<String>,
}
247
/// Counts and miscellaneous facts about the underlying run record.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunViewMetadata {
    /// The record's `type_name`.
    pub record_type: String,
    pub stage_count: usize,
    pub transition_count: usize,
    pub artifact_count: usize,
    pub checkpoint_count: usize,
    pub child_run_count: usize,
    /// True when the record carries observability data.
    pub observability_present: bool,
    /// Number of planner rounds in the observability data, or 0 without it.
    pub planner_round_count: usize,
    pub tool_recording_count: usize,
    /// Id of the record's replay fixture, if any.
    pub replay_fixture_id: Option<String>,
    /// Execution details cloned from the record.
    pub execution: Option<super::RunExecutionRecord>,
}
263
/// Aggregated view over the runs of one session.
///
/// Built by [`build_session_view_from_run_views`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct SessionView {
    /// Schema identifier; the builder writes [`SESSION_VIEW_SCHEMA`].
    pub schema: String,
    /// Schema version; the builder writes [`SESSION_VIEW_SCHEMA_VERSION`].
    pub schema_version: u32,
    /// Software that produced this view.
    pub producer: ViewProducer,
    /// Identity, status and timing of the session.
    pub session: SessionViewSession,
    /// Projection identity and event-log position.
    pub projection: ProjectionInfo,
    /// Full views of the session's runs, in the order supplied.
    pub runs: Vec<RunView>,
    /// One compact entry per run, in the same order as `runs`.
    pub history: Vec<SessionViewHistoryItem>,
    /// Usage summed over all runs.
    pub usage: RunViewUsage,
    /// Pending state merged from all runs and passed through `dedupe_pending`.
    pub pending: RunViewPendingState,
    /// Failure of the first run (in supplied order) that has one.
    pub failure: Option<RunViewFailure>,
    /// Record and event counts.
    pub metadata: SessionViewMetadata,
}
279
/// Identity, status and timing of a session inside a [`SessionView`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct SessionViewSession {
    /// Caller-supplied id, else the first run's session id.
    pub session_id: Option<String>,
    /// Caller-supplied id, else the first run's parent session id.
    pub parent_session_id: Option<String>,
    /// Caller-supplied root session id.
    pub root_session_id: Option<String>,
    /// Caller-supplied status, else the result of `aggregate_session_status`.
    pub status: String,
    /// Number of runs in the view.
    pub run_count: usize,
    pub started_at: Option<String>,
    pub updated_at: Option<String>,
    /// Caller-supplied id, else the largest `last_event_id` among the runs.
    pub last_event_id: Option<EventId>,
    /// Caller-supplied hash, else the prefix hash of the last run that has one.
    pub chain_root_hash: Option<String>,
}
293
/// Compact per-run entry in [`SessionView::history`], copied from the run's view.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct SessionViewHistoryItem {
    pub run_id: String,
    pub run_path: Option<String>,
    pub session_id: Option<String>,
    pub status: String,
    /// The run's start timestamp passed through `non_empty_string`.
    pub started_at: Option<String>,
    pub finished_at: Option<String>,
    pub last_event_id: Option<EventId>,
    pub visible_text: Option<String>,
}
306
/// Counts describing what a [`SessionView`] was built from.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct SessionViewMetadata {
    /// Number of run views in the session view.
    pub record_count: usize,
    /// Event count supplied by the caller through `SessionViewOptions`.
    pub event_count: usize,
    /// Whether the caller reported an event log as available.
    pub has_event_log: bool,
}
314
/// Caller-supplied inputs for [`build_run_view_with_options`].
#[derive(Clone, Debug, Default)]
pub struct RunViewOptions {
    /// Producer identity written into the view.
    pub producer: ViewProducer,
    /// Overrides the record's persisted path when set.
    pub run_path: Option<String>,
    /// Written into the view's projection info.
    pub last_event_id: Option<EventId>,
    /// Written into the view's projection info.
    pub prefix_hash: Option<String>,
}
322
/// Caller-supplied inputs for [`build_session_view_from_run_views`].
///
/// Optional fields that are left `None` are derived from the run views where
/// the builder has a fallback (session ids, status, timestamps, event id,
/// chain root hash).
#[derive(Clone, Debug, Default)]
pub struct SessionViewOptions {
    pub producer: ViewProducer,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub root_session_id: Option<String>,
    pub status: Option<String>,
    /// Seed for the earliest start time; run start times are folded in with `min_opt_string`.
    pub started_at: Option<String>,
    /// Seed for the latest update time; run times are folded in with `max_opt_string`.
    pub updated_at: Option<String>,
    pub last_event_id: Option<EventId>,
    pub chain_root_hash: Option<String>,
    /// Copied into the view's metadata unchanged.
    pub event_count: usize,
    /// Copied into the view's metadata unchanged.
    pub has_event_log: bool,
}
337
/// Errors returned by the async view builders in this file.
#[derive(Debug)]
pub enum RunViewError {
    /// Reading from the event log failed.
    EventLog(LogError),
}
342
343impl std::fmt::Display for RunViewError {
344 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345 match self {
346 Self::EventLog(error) => error.fmt(f),
347 }
348 }
349}
350
// Uses the trait's default methods only; no `source()` is exposed for the wrapped error.
impl std::error::Error for RunViewError {}
352
353impl From<LogError> for RunViewError {
354 fn from(error: LogError) -> Self {
355 Self::EventLog(error)
356 }
357}
358
359pub fn build_run_view(run: &RunRecord) -> RunView {
360 build_run_view_with_options(run, RunViewOptions::default())
361}
362
363pub fn build_run_view_with_path(run: &RunRecord, run_path: Option<impl Into<String>>) -> RunView {
364 build_run_view_with_options(
365 run,
366 RunViewOptions {
367 run_path: run_path.map(Into::into),
368 ..RunViewOptions::default()
369 },
370 )
371}
372
373pub async fn build_run_view_with_event_log(
374 run: &RunRecord,
375 run_path: Option<impl Into<String>>,
376 log: Option<&AnyEventLog>,
377) -> Result<RunView, RunViewError> {
378 let mut options = RunViewOptions {
379 run_path: run_path.map(Into::into),
380 ..RunViewOptions::default()
381 };
382 if let Some(log) = log {
383 if let Some(session_id) = infer_run_session_id(run) {
384 let (last_event_id, prefix_hash) = read_session_tip(log, &session_id).await?;
385 options.last_event_id = last_event_id;
386 options.prefix_hash = prefix_hash;
387 }
388 }
389 Ok(build_run_view_with_options(run, options))
390}
391
392pub fn build_run_view_with_options(run: &RunRecord, options: RunViewOptions) -> RunView {
393 let policy = current_policy();
394 let session_id = infer_run_session_id(run);
395 let parent_session_id = infer_parent_session_id(run);
396 let stages = run
397 .stages
398 .iter()
399 .map(|stage| build_stage_view(stage, &policy))
400 .collect::<Vec<_>>();
401 let visible_text = bounded_join(
402 run.stages
403 .iter()
404 .filter_map(|stage| stage.visible_text.as_deref())
405 .map(|text| redact_bounded(text, &policy, TEXT_LIMIT)),
406 TEXT_LIMIT,
407 );
408 let usage = run
409 .usage
410 .as_ref()
411 .map(RunViewUsage::from)
412 .unwrap_or_else(|| usage_from_stages(&stages));
413 let mut view = RunView {
414 schema: RUN_VIEW_SCHEMA.to_string(),
415 schema_version: RUN_VIEW_SCHEMA_VERSION,
416 producer: options.producer.clone(),
417 run: RunViewRun {
418 run_id: run.id.clone(),
419 session_id,
420 parent_run_id: run.parent_run_id.clone(),
421 root_run_id: run.root_run_id.clone(),
422 parent_session_id,
423 child_runs: run
424 .child_runs
425 .iter()
426 .map(|child| build_child_view(child, &policy))
427 .collect(),
428 run_path: options
429 .run_path
430 .clone()
431 .or_else(|| run.persisted_path.clone()),
432 status: run.status.clone(),
433 workflow_id: run.workflow_id.clone(),
434 workflow_name: run.workflow_name.clone(),
435 task: redact_bounded(&run.task, &policy, TEXT_LIMIT),
436 started_at: run.started_at.clone(),
437 finished_at: run.finished_at.clone(),
438 duration_ms: run_duration_ms(run),
439 },
440 projection: ProjectionInfo {
441 projection_id: String::new(),
442 projection_hash: None,
443 prefix_hash: options.prefix_hash,
444 last_event_id: options.last_event_id,
445 },
446 visible_text,
447 transcript: transcript_summary_for_run(run, &policy),
448 usage,
449 providers: provider_summary(run),
450 stages,
451 artifacts: run
452 .artifacts
453 .iter()
454 .map(|artifact| build_artifact_view(artifact, &policy))
455 .collect(),
456 checkpoints: run.checkpoints.iter().map(build_checkpoint_view).collect(),
457 pending: RunViewPendingState {
458 nodes: run.pending_nodes.clone(),
459 approvals: run
460 .hitl_questions
461 .iter()
462 .map(|question| build_approval_view(question, &policy))
463 .collect(),
464 auth: pending_auth(run, &policy),
465 },
466 failure: failure_summary(run, &policy),
467 metadata: RunViewMetadata {
468 record_type: run.type_name.clone(),
469 stage_count: run.stages.len(),
470 transition_count: run.transitions.len(),
471 artifact_count: run.artifacts.len(),
472 checkpoint_count: run.checkpoints.len(),
473 child_run_count: run.child_runs.len(),
474 observability_present: run.observability.is_some(),
475 planner_round_count: run
476 .observability
477 .as_ref()
478 .map(|observability| observability.planner_rounds.len())
479 .unwrap_or_default(),
480 tool_recording_count: run.tool_recordings.len(),
481 replay_fixture_id: run
482 .replay_fixture
483 .as_ref()
484 .map(|fixture| fixture.id.clone()),
485 execution: run.execution.clone(),
486 },
487 };
488 finalize_run_projection(&mut view);
489 view
490}
491
492pub fn build_session_view_from_run_views(
493 runs: Vec<RunView>,
494 options: SessionViewOptions,
495) -> SessionView {
496 let session_id = options
497 .session_id
498 .clone()
499 .or_else(|| runs.iter().find_map(|run| run.run.session_id.clone()));
500 let mut usage = RunViewUsage::default();
501 let mut pending = RunViewPendingState::default();
502 let mut failure = None;
503 let mut started_at = options.started_at.clone();
504 let mut updated_at = options.updated_at.clone();
505 let history = runs
506 .iter()
507 .map(|run| {
508 usage.add_usage(&run.usage);
509 pending.nodes.extend(run.pending.nodes.clone());
510 pending.approvals.extend(run.pending.approvals.clone());
511 pending.auth.extend(run.pending.auth.clone());
512 if failure.is_none() {
513 failure = run.failure.clone();
514 }
515 if !run.run.started_at.is_empty() {
516 started_at = min_opt_string(started_at.take(), Some(run.run.started_at.clone()));
517 updated_at = max_opt_string(updated_at.take(), Some(run.run.started_at.clone()));
518 }
519 updated_at = max_opt_string(updated_at.take(), run.run.finished_at.clone());
520 SessionViewHistoryItem {
521 run_id: run.run.run_id.clone(),
522 run_path: run.run.run_path.clone(),
523 session_id: run.run.session_id.clone(),
524 status: run.run.status.clone(),
525 started_at: non_empty_string(&run.run.started_at),
526 finished_at: run.run.finished_at.clone(),
527 last_event_id: run.projection.last_event_id,
528 visible_text: run.visible_text.clone(),
529 }
530 })
531 .collect::<Vec<_>>();
532 let status = options
533 .status
534 .clone()
535 .unwrap_or_else(|| aggregate_session_status(&runs));
536 let last_event_id = options.last_event_id.or_else(|| {
537 runs.iter()
538 .filter_map(|run| run.projection.last_event_id)
539 .max()
540 });
541 let chain_root_hash = options.chain_root_hash.clone().or_else(|| {
542 runs.iter()
543 .rev()
544 .find_map(|run| run.projection.prefix_hash.clone())
545 });
546 let mut view = SessionView {
547 schema: SESSION_VIEW_SCHEMA.to_string(),
548 schema_version: SESSION_VIEW_SCHEMA_VERSION,
549 producer: options.producer.clone(),
550 session: SessionViewSession {
551 session_id,
552 parent_session_id: options.parent_session_id.clone().or_else(|| {
553 runs.iter()
554 .find_map(|run| run.run.parent_session_id.clone())
555 }),
556 root_session_id: options.root_session_id.clone(),
557 status,
558 run_count: runs.len(),
559 started_at,
560 updated_at,
561 last_event_id,
562 chain_root_hash,
563 },
564 projection: ProjectionInfo {
565 projection_id: String::new(),
566 projection_hash: None,
567 prefix_hash: None,
568 last_event_id,
569 },
570 runs,
571 history,
572 usage,
573 pending: dedupe_pending(pending),
574 failure,
575 metadata: SessionViewMetadata {
576 record_count: 0,
577 event_count: options.event_count,
578 has_event_log: options.has_event_log,
579 },
580 };
581 view.metadata.record_count = view.runs.len();
582 view.projection.prefix_hash = view.session.chain_root_hash.clone();
583 finalize_session_projection(&mut view);
584 view
585}
586
587pub async fn build_session_view_from_run_records(
588 runs: Vec<(&RunRecord, Option<String>)>,
589 session_id: Option<String>,
590 log: Option<&AnyEventLog>,
591) -> Result<SessionView, RunViewError> {
592 let mut views = Vec::new();
593 for (run, path) in runs {
594 views.push(build_run_view_with_event_log(run, path, log).await?);
595 }
596 let mut options = SessionViewOptions {
597 session_id,
598 has_event_log: log.is_some(),
599 ..SessionViewOptions::default()
600 };
601 if let (Some(log), Some(session_id)) = (log, options.session_id.as_deref()) {
602 let (last_event_id, chain_root_hash) = read_session_tip(log, session_id).await?;
603 options.last_event_id = last_event_id;
604 options.chain_root_hash = chain_root_hash;
605 }
606 Ok(build_session_view_from_run_views(views, options))
607}
608
609pub async fn build_empty_session_view(
610 session_id: Option<String>,
611 log: Option<&AnyEventLog>,
612) -> Result<SessionView, RunViewError> {
613 let mut options = SessionViewOptions {
614 session_id: session_id.clone(),
615 has_event_log: log.is_some(),
616 ..SessionViewOptions::default()
617 };
618 if let (Some(log), Some(session_id)) = (log, session_id.as_deref()) {
619 let (last_event_id, chain_root_hash) = read_session_tip(log, session_id).await?;
620 options.last_event_id = last_event_id;
621 options.chain_root_hash = chain_root_hash;
622 }
623 Ok(build_session_view_from_run_views(Vec::new(), options))
624}
625
626async fn read_session_tip(
627 log: &AnyEventLog,
628 session_id: &str,
629) -> Result<(Option<EventId>, Option<String>), LogError> {
630 let topic = crate::session_timeline::agent_events_topic(session_id);
631 let Some(latest) = log.latest(&topic).await? else {
632 return Ok((None, None));
633 };
634 let from = latest.checked_sub(1);
635 let events = log.read_range(&topic, from, 1).await?;
636 let prefix_hash = events
637 .into_iter()
638 .find(|(event_id, _)| *event_id == latest)
639 .and_then(|(event_id, event)| {
640 event_record_hash_from_headers(topic.as_str(), event_id, &event).ok()
641 });
642 Ok((Some(latest), prefix_hash))
643}
644
645fn build_child_view(child: &RunChildRecord, policy: &RedactionPolicy) -> RunViewChild {
646 RunViewChild {
647 worker_id: child.worker_id.clone(),
648 worker_name: child.worker_name.clone(),
649 run_id: child.run_id.clone(),
650 session_id: child.session_id.clone(),
651 parent_session_id: child.parent_session_id.clone(),
652 run_path: child.run_path.clone(),
653 status: child.status.clone(),
654 task: redact_bounded(&child.task, policy, TEXT_LIMIT),
655 }
656}
657
658fn build_stage_view(stage: &RunStageRecord, policy: &RedactionPolicy) -> RunViewStage {
659 let usage = stage
660 .usage
661 .as_ref()
662 .map(RunViewUsage::from)
663 .unwrap_or_default();
664 let artifact_refs = stage
665 .produced_artifact_ids
666 .iter()
667 .chain(stage.artifacts.iter().map(|artifact| &artifact.id))
668 .filter(|id| !id.is_empty())
669 .cloned()
670 .collect::<BTreeSet<_>>()
671 .into_iter()
672 .collect();
673 RunViewStage {
674 id: stage.id.clone(),
675 node_id: stage.node_id.clone(),
676 kind: stage.kind.clone(),
677 status: stage.status.clone(),
678 outcome: stage.outcome.clone(),
679 branch: stage.branch.clone(),
680 started_at: stage.started_at.clone(),
681 finished_at: stage.finished_at.clone(),
682 duration_ms: stage_duration_ms(stage),
683 visible_text: stage
684 .visible_text
685 .as_deref()
686 .map(|text| redact_bounded(text, policy, TEXT_LIMIT)),
687 usage,
688 provider: metadata_string_any(&stage.metadata, &["provider"])
689 .or_else(|| metadata_path_string(&stage.metadata, &["model_policy", "provider"])),
690 model: metadata_string_any(&stage.metadata, &["model"])
691 .or_else(|| metadata_path_string(&stage.metadata, &["model_policy", "model"])),
692 artifact_refs,
693 attempt_count: stage.attempts.len(),
694 error: stage_error(stage, policy),
695 }
696}
697
698fn build_artifact_view(artifact: &ArtifactRecord, policy: &RedactionPolicy) -> RunViewArtifact {
699 RunViewArtifact {
700 id: artifact.id.clone(),
701 kind: artifact.kind.clone(),
702 title: artifact.title.clone(),
703 source: artifact.source.clone(),
704 stage: artifact.stage.clone(),
705 estimated_tokens: artifact.estimated_tokens,
706 lineage: artifact.lineage.clone(),
707 preview: artifact
708 .text
709 .as_deref()
710 .map(|text| redact_bounded(text, policy, PREVIEW_LIMIT))
711 .or_else(|| {
712 artifact
713 .data
714 .as_ref()
715 .map(|data| redact_json_preview(data, policy))
716 }),
717 }
718}
719
720fn build_checkpoint_view(checkpoint: &RunCheckpointRecord) -> RunViewCheckpoint {
721 RunViewCheckpoint {
722 id: checkpoint.id.clone(),
723 reason: checkpoint.reason.clone(),
724 ready_count: checkpoint.ready_nodes.len(),
725 completed_count: checkpoint.completed_nodes.len(),
726 last_stage_id: checkpoint.last_stage_id.clone(),
727 persisted_at: checkpoint.persisted_at.clone(),
728 }
729}
730
731fn build_approval_view(
732 question: &RunHitlQuestionRecord,
733 policy: &RedactionPolicy,
734) -> RunViewApproval {
735 RunViewApproval {
736 request_id: question.request_id.clone(),
737 prompt: redact_bounded(&question.prompt, policy, PREVIEW_LIMIT),
738 agent: question.agent.clone(),
739 trace_id: question.trace_id.clone(),
740 asked_at: question.asked_at.clone(),
741 }
742}
743
744fn provider_summary(run: &RunRecord) -> Vec<RunViewProvider> {
745 let mut providers = BTreeMap::<(String, String), RunViewProvider>::new();
746 for span in run
747 .trace_spans
748 .iter()
749 .filter(|span| span.kind == "llm_call")
750 {
751 let provider = span
752 .metadata
753 .get("provider")
754 .and_then(Value::as_str)
755 .unwrap_or("unknown")
756 .to_string();
757 let model = span
758 .metadata
759 .get("model")
760 .and_then(Value::as_str)
761 .unwrap_or("unknown")
762 .to_string();
763 let input_tokens = metadata_i64(&span.metadata, "input_tokens");
764 let output_tokens = metadata_i64(&span.metadata, "output_tokens");
765 let cost_usd = span
766 .metadata
767 .get("cost_usd")
768 .and_then(Value::as_f64)
769 .unwrap_or_else(|| {
770 crate::llm::calculate_cost_for_provider(
771 &provider,
772 &model,
773 input_tokens,
774 output_tokens,
775 )
776 });
777 let entry = providers
778 .entry((provider.clone(), model.clone()))
779 .or_insert_with(|| RunViewProvider {
780 provider,
781 model,
782 ..RunViewProvider::default()
783 });
784 entry.call_count += 1;
785 entry.input_tokens += input_tokens;
786 entry.output_tokens += output_tokens;
787 entry.cost_usd += cost_usd;
788 }
789 if providers.is_empty() {
790 if let Some(usage) = &run.usage {
791 for model in &usage.models {
792 if model.is_empty() {
793 continue;
794 }
795 providers.insert(
796 ("unknown".to_string(), model.clone()),
797 RunViewProvider {
798 provider: "unknown".to_string(),
799 model: model.clone(),
800 call_count: usage.call_count,
801 input_tokens: usage.input_tokens,
802 output_tokens: usage.output_tokens,
803 cost_usd: usage.total_cost,
804 },
805 );
806 }
807 }
808 }
809 providers.into_values().collect()
810}
811
812fn transcript_summary_for_run(run: &RunRecord, policy: &RedactionPolicy) -> TranscriptSummary {
813 if let Some(transcript) = run.transcript.as_ref() {
814 return transcript_summary(Some(transcript), policy);
815 }
816 transcript_summary_from_stages(&run.stages, policy)
817}
818
819fn transcript_summary_from_stages(
820 stages: &[RunStageRecord],
821 policy: &RedactionPolicy,
822) -> TranscriptSummary {
823 let mut out = TranscriptSummary::default();
824 let mut summaries = Vec::new();
825 for stage in stages {
826 let Some(transcript) = stage.transcript.as_ref() else {
827 continue;
828 };
829 out.present = true;
830 out.message_count += count_array_field(transcript, "messages");
831 out.event_count += count_array_field(transcript, "events");
832 if let Some(summary) = transcript_summary(Some(transcript), policy).summary {
833 let label = non_empty_string(&stage.node_id)
834 .or_else(|| non_empty_string(&stage.id))
835 .unwrap_or_else(|| "stage".to_string());
836 summaries.push(format!("{label}: {summary}"));
837 }
838 }
839 if out.present {
840 out.summary = bounded_join(summaries, PREVIEW_LIMIT);
841 out.source = Some("stages".to_string());
842 }
843 out
844}
845
846fn transcript_summary(value: Option<&Value>, policy: &RedactionPolicy) -> TranscriptSummary {
847 let Some(value) = value else {
848 return TranscriptSummary::default();
849 };
850 TranscriptSummary {
851 present: true,
852 message_count: count_array_field(value, "messages"),
853 event_count: count_array_field(value, "events"),
854 summary: value
855 .get("summary")
856 .and_then(Value::as_str)
857 .map(|text| redact_bounded(text, policy, PREVIEW_LIMIT))
858 .or_else(|| {
859 value
860 .get("summary")
861 .map(|value| redact_json_preview(value, policy))
862 }),
863 source: value
864 .get("source")
865 .and_then(Value::as_str)
866 .map(str::to_string),
867 }
868}
869
870fn pending_auth(run: &RunRecord, policy: &RedactionPolicy) -> Vec<RunViewAuth> {
871 let mut auth = Vec::new();
872 collect_auth_from_metadata(None, &run.metadata, &mut auth, policy);
873 for stage in &run.stages {
874 collect_auth_from_metadata(Some(&stage.id), &stage.metadata, &mut auth, policy);
875 }
876 auth
877}
878
879fn collect_auth_from_metadata(
880 stage_id: Option<&str>,
881 metadata: &BTreeMap<String, Value>,
882 out: &mut Vec<RunViewAuth>,
883 policy: &RedactionPolicy,
884) {
885 for key in ["pending_auth", "auth_required", "mcp_auth_required"] {
886 let Some(value) = metadata.get(key) else {
887 continue;
888 };
889 match value {
890 Value::Array(items) => {
891 for item in items {
892 out.push(auth_from_value(stage_id, item, policy));
893 }
894 }
895 Value::Object(_) => out.push(auth_from_value(stage_id, value, policy)),
896 Value::Bool(true) => out.push(RunViewAuth {
897 stage_id: stage_id.map(str::to_string),
898 ..RunViewAuth::default()
899 }),
900 Value::String(message) => out.push(RunViewAuth {
901 stage_id: stage_id.map(str::to_string),
902 message: Some(redact_bounded(message, policy, PREVIEW_LIMIT)),
903 ..RunViewAuth::default()
904 }),
905 _ => {}
906 }
907 }
908}
909
910fn auth_from_value(stage_id: Option<&str>, value: &Value, policy: &RedactionPolicy) -> RunViewAuth {
911 let object = value.as_object();
912 let field = |name: &str| {
913 object
914 .and_then(|object| object.get(name))
915 .and_then(Value::as_str)
916 .map(|text| redact_bounded(text, policy, PREVIEW_LIMIT))
917 };
918 RunViewAuth {
919 provider: field("provider"),
920 server: field("server").or_else(|| field("server_name")),
921 scope: field("scope"),
922 stage_id: stage_id.map(str::to_string).or_else(|| field("stage_id")),
923 message: field("message").or_else(|| Some(redact_json_preview(value, policy))),
924 }
925}
926
927fn failure_summary(run: &RunRecord, policy: &RedactionPolicy) -> Option<RunViewFailure> {
928 run.stages
929 .iter()
930 .rev()
931 .find(|stage| failed_status(&stage.status) || failed_status(&stage.outcome))
932 .map(|stage| RunViewFailure {
933 stage_id: Some(stage.id.clone()),
934 node_id: Some(stage.node_id.clone()),
935 status: stage.status.clone(),
936 outcome: stage.outcome.clone(),
937 message: stage_error(stage, policy)
938 .or_else(|| Some(format!("{} failed with {}", stage.node_id, stage.outcome))),
939 })
940 .or_else(|| {
941 failed_status(&run.status).then(|| RunViewFailure {
942 status: run.status.clone(),
943 outcome: run.status.clone(),
944 ..RunViewFailure::default()
945 })
946 })
947}
948
949fn stage_error(stage: &RunStageRecord, policy: &RedactionPolicy) -> Option<String> {
950 stage
951 .metadata
952 .get("error")
953 .map(|value| redact_json_preview(value, policy))
954 .or_else(|| {
955 stage
956 .attempts
957 .iter()
958 .rev()
959 .find_map(|attempt| attempt.error.as_deref())
960 .map(|error| redact_bounded(error, policy, PREVIEW_LIMIT))
961 })
962}
963
/// Returns true when `value` is exactly one of the status strings treated as
/// a failure (case-sensitive).
fn failed_status(value: &str) -> bool {
    const FAILED: [&str; 7] = [
        "failed",
        "error",
        "errored",
        "cancelled",
        "canceled",
        "timeout",
        "timed_out",
    ];
    FAILED.contains(&value)
}
970
971fn usage_from_stages(stages: &[RunViewStage]) -> RunViewUsage {
972 let mut usage = RunViewUsage::default();
973 for stage in stages {
974 usage.add_usage(&stage.usage);
975 }
976 usage
977}
978
979fn run_duration_ms(run: &RunRecord) -> Option<u64> {
980 let from_usage = run
981 .usage
982 .as_ref()
983 .and_then(|usage| u64::try_from(usage.total_duration_ms).ok())
984 .filter(|duration| *duration > 0);
985 let from_spans = run
986 .trace_spans
987 .iter()
988 .map(trace_span_end_ms)
989 .max()
990 .filter(|duration| *duration > 0);
991 let from_timestamps = run
992 .finished_at
993 .as_deref()
994 .and_then(|finished| timestamp_delta_ms(&run.started_at, finished));
995 from_timestamps.or(from_spans).or(from_usage)
996}
997
998fn stage_duration_ms(stage: &RunStageRecord) -> Option<u64> {
999 stage
1000 .usage
1001 .as_ref()
1002 .and_then(|usage| u64::try_from(usage.total_duration_ms).ok())
1003 .filter(|duration| *duration > 0)
1004 .or_else(|| {
1005 stage
1006 .finished_at
1007 .as_deref()
1008 .and_then(|finished| timestamp_delta_ms(&stage.started_at, finished))
1009 })
1010}
1011
1012fn timestamp_delta_ms(started_at: &str, finished_at: &str) -> Option<u64> {
1013 let start = parse_timestamp_ms(started_at)?;
1014 let end = parse_timestamp_ms(finished_at)?;
1015 u64::try_from(end.saturating_sub(start)).ok()
1016}
1017
1018fn parse_timestamp_ms(value: &str) -> Option<i128> {
1019 if value.trim().is_empty() {
1020 return None;
1021 }
1022 if let Ok(seconds) = value.parse::<i128>() {
1023 return Some(seconds.saturating_mul(1000));
1024 }
1025 let parsed = OffsetDateTime::parse(value, &Rfc3339).ok()?;
1026 Some(
1027 i128::from(parsed.unix_timestamp()).saturating_mul(1000) + i128::from(parsed.millisecond()),
1028 )
1029}
1030
1031fn trace_span_end_ms(span: &RunTraceSpanRecord) -> u64 {
1032 span.start_ms.saturating_add(span.duration_ms)
1033}
1034
1035fn infer_run_session_id(run: &RunRecord) -> Option<String> {
1036 metadata_string_any(&run.metadata, &["session_id", "agent_session_id"])
1037 .or_else(|| metadata_path_string(&run.metadata, &["model_policy", "session_id"]))
1038 .or_else(|| metadata_path_string(&run.metadata, &["audit", "session_id"]))
1039 .or_else(|| {
1040 run.child_runs
1041 .iter()
1042 .find_map(|child| child.session_id.clone())
1043 })
1044 .or_else(|| {
1045 run.stages.iter().find_map(|stage| {
1046 metadata_string_any(&stage.metadata, &["session_id", "agent_session_id"])
1047 .or_else(|| {
1048 metadata_path_string(&stage.metadata, &["model_policy", "session_id"])
1049 })
1050 .or_else(|| metadata_path_string(&stage.metadata, &["audit", "session_id"]))
1051 .or_else(|| {
1052 metadata_path_string(&stage.metadata, &["worker", "audit", "session_id"])
1053 })
1054 })
1055 })
1056 .or_else(|| {
1057 run.trace_spans.iter().find_map(|span| {
1058 metadata_string_any(&span.metadata, &["session_id", "agent_session_id"])
1059 })
1060 })
1061}
1062
1063fn infer_parent_session_id(run: &RunRecord) -> Option<String> {
1064 metadata_string_any(&run.metadata, &["parent_session_id"])
1065 .or_else(|| metadata_path_string(&run.metadata, &["audit", "parent_session_id"]))
1066 .or_else(|| {
1067 run.child_runs
1068 .iter()
1069 .find_map(|child| child.parent_session_id.clone())
1070 })
1071 .or_else(|| {
1072 run.stages.iter().find_map(|stage| {
1073 metadata_string_any(&stage.metadata, &["parent_session_id"])
1074 .or_else(|| {
1075 metadata_path_string(&stage.metadata, &["audit", "parent_session_id"])
1076 })
1077 .or_else(|| {
1078 metadata_path_string(
1079 &stage.metadata,
1080 &["worker", "audit", "parent_session_id"],
1081 )
1082 })
1083 })
1084 })
1085}
1086
1087fn metadata_string_any(metadata: &BTreeMap<String, Value>, keys: &[&str]) -> Option<String> {
1088 keys.iter()
1089 .find_map(|key| metadata.get(*key).and_then(Value::as_str))
1090 .filter(|value| !value.is_empty())
1091 .map(str::to_string)
1092}
1093
1094fn metadata_path_string(metadata: &BTreeMap<String, Value>, path: &[&str]) -> Option<String> {
1095 let mut value = metadata.get(*path.first()?)?;
1096 for key in &path[1..] {
1097 value = value.get(*key)?;
1098 }
1099 value
1100 .as_str()
1101 .filter(|value| !value.is_empty())
1102 .map(str::to_string)
1103}
1104
1105fn metadata_i64(metadata: &BTreeMap<String, Value>, key: &str) -> i64 {
1106 metadata
1107 .get(key)
1108 .and_then(Value::as_i64)
1109 .or_else(|| {
1110 metadata
1111 .get(key)
1112 .and_then(Value::as_u64)
1113 .and_then(|value| i64::try_from(value).ok())
1114 })
1115 .unwrap_or_default()
1116}
1117
1118fn count_array_field(value: &Value, field: &str) -> usize {
1119 value
1120 .get(field)
1121 .and_then(Value::as_array)
1122 .map(Vec::len)
1123 .unwrap_or_default()
1124}
1125
1126fn redact_json_preview(value: &Value, policy: &RedactionPolicy) -> String {
1127 let mut value = value.clone();
1128 policy.redact_json_in_place(&mut value);
1129 bounded_text(
1130 &serde_json::to_string(&value).unwrap_or_default(),
1131 PREVIEW_LIMIT,
1132 )
1133}
1134
1135fn redact_bounded(text: &str, policy: &RedactionPolicy, limit: usize) -> String {
1136 let redacted = policy.redact_string(text);
1137 bounded_text(redacted.as_ref(), limit)
1138}
1139
/// Truncates `text` to at most `limit` bytes of original content, marking truncation
/// with a trailing `...`.
///
/// Text that already fits is returned unchanged. Otherwise the cut is placed at the last
/// UTF-8 character boundary at or before `limit`, so a multi-byte character is never
/// split. Note the returned string can be up to three bytes longer than `limit` because of
/// the appended marker.
fn bounded_text(text: &str, limit: usize) -> String {
    if text.len() <= limit {
        return text.to_owned();
    }

    // Here `limit < text.len()`. Walk back to the nearest character boundary; index 0 is
    // always a boundary, so the loop terminates without underflow.
    let mut cut = limit;
    while !text.is_char_boundary(cut) {
        cut -= 1;
    }

    let mut truncated = String::with_capacity(cut + 3);
    truncated.push_str(&text[..cut]);
    truncated.push_str("...");
    truncated
}
1152
1153fn bounded_join(values: impl IntoIterator<Item = String>, limit: usize) -> Option<String> {
1154 let mut out = String::new();
1155 for value in values {
1156 if value.is_empty() {
1157 continue;
1158 }
1159 if !out.is_empty() {
1160 out.push_str("\n\n");
1161 }
1162 out.push_str(&value);
1163 if out.len() > limit {
1164 return Some(bounded_text(&out, limit));
1165 }
1166 }
1167 non_empty_string(&out)
1168}
1169
/// Converts `value` to an owned `String`, mapping the empty string to `None`.
fn non_empty_string(value: &str) -> Option<String> {
    if value.is_empty() {
        None
    } else {
        Some(value.to_owned())
    }
}
1173
/// Smaller of two optional strings by `String` ordering; a missing side is ignored, and
/// `None` is returned only when both are missing.
fn min_opt_string(left: Option<String>, right: Option<String>) -> Option<String> {
    // Each `Option` iterates over zero or one item, so the chain holds only present values.
    left.into_iter().chain(right).min()
}
1182
/// Larger of two optional strings by `String` ordering; a missing side is ignored, and
/// `None` is returned only when both are missing.
fn max_opt_string(left: Option<String>, right: Option<String>) -> Option<String> {
    // Each `Option` iterates over zero or one item, so the chain holds only present values.
    left.into_iter().chain(right).max()
}
1191
1192fn aggregate_session_status(runs: &[RunView]) -> String {
1193 if runs.is_empty() {
1194 return "unknown".to_string();
1195 }
1196 if runs
1197 .iter()
1198 .any(|run| failed_status(&run.run.status) || run.failure.is_some())
1199 {
1200 return "failed".to_string();
1201 }
1202 if runs.iter().all(|run| {
1203 matches!(
1204 run.run.status.as_str(),
1205 "completed" | "succeeded" | "success" | "ok"
1206 )
1207 }) {
1208 return "completed".to_string();
1209 }
1210 "active".to_string()
1211}
1212
1213fn dedupe_pending(mut pending: RunViewPendingState) -> RunViewPendingState {
1214 let mut nodes = BTreeSet::new();
1215 pending.nodes.retain(|node| nodes.insert(node.clone()));
1216 let mut approvals = BTreeSet::new();
1217 pending
1218 .approvals
1219 .retain(|approval| approvals.insert(approval.request_id.clone()));
1220 let mut auth_seen = BTreeSet::new();
1221 pending.auth.retain(|item| {
1222 auth_seen.insert((
1223 item.provider.clone(),
1224 item.server.clone(),
1225 item.scope.clone(),
1226 item.stage_id.clone(),
1227 ))
1228 });
1229 pending
1230}
1231
1232fn finalize_run_projection(view: &mut RunView) {
1233 if let Some(hash) = projection_hash(RUN_VIEW_SCHEMA, view) {
1234 view.projection.projection_id =
1235 format!("run_view:{}:{}", view.run.run_id, hash_suffix(&hash));
1236 view.projection.projection_hash = Some(hash);
1237 } else {
1238 view.projection.projection_id = format!("run_view:{}", view.run.run_id);
1239 }
1240}
1241
1242fn finalize_session_projection(view: &mut SessionView) {
1243 let id = view
1244 .session
1245 .session_id
1246 .clone()
1247 .unwrap_or_else(|| "unknown".to_string());
1248 if let Some(hash) = projection_hash(SESSION_VIEW_SCHEMA, view) {
1249 view.projection.projection_id = format!("session_view:{id}:{}", hash_suffix(&hash));
1250 view.projection.projection_hash = Some(hash);
1251 } else {
1252 view.projection.projection_id = format!("session_view:{id}");
1253 }
1254}
1255
1256fn projection_hash<T: Serialize>(schema: &str, value: &T) -> Option<String> {
1257 let mut value = serde_json::to_value(value).ok()?;
1258 if let Some(projection) = value
1259 .as_object_mut()
1260 .and_then(|object| object.get_mut("projection"))
1261 .and_then(Value::as_object_mut)
1262 {
1263 projection.remove("projection_id");
1264 projection.remove("projection_hash");
1265 }
1266 let bytes = serde_json::to_vec(&value).ok()?;
1267 let mut hasher = Sha256::new();
1268 hasher.update(schema.as_bytes());
1269 hasher.update([0]);
1270 hasher.update(bytes);
1271 Some(format!("sha256:{}", hex::encode(hasher.finalize())))
1272}
1273
/// Short form of a hash for use in ids: the first 12 characters after an optional
/// `sha256:` prefix (or the whole remainder when it is shorter).
fn hash_suffix(hash: &str) -> String {
    let digest = hash.strip_prefix("sha256:").unwrap_or(hash);
    // Byte offset where the 13th character starts, if there is one.
    match digest.char_indices().nth(12) {
        Some((end, _)) => digest[..end].to_owned(),
        None => digest.to_owned(),
    }
}
1281
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Key-shaped token planted in free-text fields by the redaction test.
    const PLANTED_KEY: &str = "AKIAABCDEFGHIJKLMNOP";

    /// Builds an owned metadata map from borrowed key/value pairs.
    fn metadata_of(entries: &[(&str, Value)]) -> BTreeMap<String, Value> {
        entries
            .iter()
            .map(|(key, value)| ((*key).to_owned(), value.clone()))
            .collect()
    }

    /// The single completed LLM stage used by `sample_run`.
    fn sample_stage() -> RunStageRecord {
        RunStageRecord {
            id: "stage_1".to_string(),
            node_id: "plan".to_string(),
            kind: "llm".to_string(),
            status: "completed".to_string(),
            outcome: "ok".to_string(),
            started_at: "2026-01-01T00:00:00Z".to_string(),
            finished_at: Some("2026-01-01T00:00:01Z".to_string()),
            visible_text: Some("done".to_string()),
            usage: Some(LlmUsageRecord {
                input_tokens: 10,
                output_tokens: 5,
                total_duration_ms: 1000,
                call_count: 1,
                total_cost: 0.01,
                models: vec!["model-a".to_string()],
            }),
            metadata: metadata_of(&[
                ("session_id", json!("session_1")),
                ("provider", json!("test")),
                ("model", json!("model-a")),
            ]),
            ..RunStageRecord::default()
        }
    }

    /// The single `llm_call` trace span used by `sample_run`.
    fn sample_span() -> RunTraceSpanRecord {
        RunTraceSpanRecord {
            kind: "llm_call".to_string(),
            metadata: metadata_of(&[
                ("provider", json!("test")),
                ("model", json!("model-a")),
                ("input_tokens", json!(10)),
                ("output_tokens", json!(5)),
                ("cost_usd", json!(0.01)),
            ]),
            ..RunTraceSpanRecord::default()
        }
    }

    /// A completed two-second run with one stage, one trace span and an inline transcript.
    fn sample_run() -> RunRecord {
        RunRecord {
            type_name: "run_record".to_string(),
            id: "run_1".to_string(),
            workflow_id: "wf".to_string(),
            workflow_name: Some("Workflow".to_string()),
            task: "do work".to_string(),
            status: "completed".to_string(),
            started_at: "2026-01-01T00:00:00Z".to_string(),
            finished_at: Some("2026-01-01T00:00:02Z".to_string()),
            stages: vec![sample_stage()],
            trace_spans: vec![sample_span()],
            transcript: Some(json!({
                "source": "inline",
                "summary": "short",
                "messages": [{"role": "assistant"}],
                "events": [{"kind": "output"}]
            })),
            ..RunRecord::default()
        }
    }

    /// The projected run view exposes the expected identity, timing, usage and projection
    /// fields for the sample run.
    #[test]
    fn build_run_view_projects_stable_public_fields() {
        let record = sample_run();
        let view = build_run_view_with_path(&record, Some("runs/run_1.json"));

        assert_eq!(view.schema, RUN_VIEW_SCHEMA);
        assert_eq!(view.schema_version, RUN_VIEW_SCHEMA_VERSION);
        assert_eq!(view.run.run_id, "run_1");
        assert_eq!(view.run.session_id.as_deref(), Some("session_1"));
        assert_eq!(view.run.run_path.as_deref(), Some("runs/run_1.json"));
        assert_eq!(view.run.duration_ms, Some(2000));
        assert_eq!(view.visible_text.as_deref(), Some("done"));
        assert_eq!(view.transcript.message_count, 1);
        assert_eq!(view.usage.input_tokens, 10);
        assert_eq!(view.providers.len(), 1);
        assert!(view.projection.projection_id.starts_with("run_view:run_1:"));
        assert!(view.projection.projection_hash.is_some());
    }

    /// A record with only an id and a failed status still projects, with no session id,
    /// no transcript, and a failure entry carrying the status.
    #[test]
    fn build_run_view_tolerates_sparse_legacy_records() {
        let sparse = RunRecord {
            type_name: "run_record".to_string(),
            id: "legacy".to_string(),
            status: "failed".to_string(),
            ..RunRecord::default()
        };

        let view = build_run_view(&sparse);

        assert_eq!(view.run.run_id, "legacy");
        assert_eq!(view.run.session_id, None);
        assert!(!view.transcript.present);
        let failure_status = view.failure.as_ref().map(|failure| failure.status.as_str());
        assert_eq!(failure_status, Some("failed"));
    }

    /// A session view built from one run view carries the supplied options through and
    /// aggregates history and usage.
    #[test]
    fn build_session_view_aggregates_runs() {
        let options = SessionViewOptions {
            session_id: Some("session_1".to_string()),
            last_event_id: Some(7),
            chain_root_hash: Some("sha256:abc".to_string()),
            ..SessionViewOptions::default()
        };
        let run_views = vec![build_run_view(&sample_run())];

        let view = build_session_view_from_run_views(run_views, options);

        assert_eq!(view.schema, SESSION_VIEW_SCHEMA);
        assert_eq!(view.session.session_id.as_deref(), Some("session_1"));
        assert_eq!(view.session.last_event_id, Some(7));
        assert_eq!(view.session.chain_root_hash.as_deref(), Some("sha256:abc"));
        assert_eq!(view.history.len(), 1);
        assert_eq!(view.usage.call_count, 1);
        assert!(view
            .projection
            .projection_id
            .starts_with("session_view:session_1:"));
    }

    /// When the run has no transcript of its own, the summary is assembled from the
    /// stage transcripts.
    #[test]
    fn build_run_view_summarizes_stage_only_transcripts() {
        let mut record = sample_run();
        record.transcript = None;
        record.stages[0].transcript = Some(json!({
            "summary": "stage transcript only",
            "messages": [{"role": "assistant"}, {"role": "tool"}],
            "events": [{"kind": "tool_result"}]
        }));

        let view = build_run_view(&record);
        let transcript = &view.transcript;

        assert!(transcript.present);
        assert_eq!(transcript.source.as_deref(), Some("stages"));
        assert_eq!(transcript.message_count, 2);
        assert_eq!(transcript.event_count, 1);
        assert_eq!(
            transcript.summary.as_deref(),
            Some("plan: stage transcript only")
        );
    }

    /// The planted key must not survive into child-run tasks or pending approval prompts.
    #[test]
    fn build_run_view_redacts_child_tasks_and_approvals() {
        let mut record = sample_run();
        record.child_runs.push(RunChildRecord {
            worker_id: "worker_1".to_string(),
            worker_name: "worker".to_string(),
            task: format!("inspect {}", PLANTED_KEY),
            ..RunChildRecord::default()
        });
        record.hitl_questions.push(RunHitlQuestionRecord {
            request_id: "approval_1".to_string(),
            prompt: format!("approve {}", PLANTED_KEY),
            ..RunHitlQuestionRecord::default()
        });

        let view = build_run_view(&record);

        assert!(!view.run.child_runs[0].task.contains(PLANTED_KEY));
        assert!(!view.pending.approvals[0].prompt.contains(PLANTED_KEY));
    }
}