Skip to main content

harn_vm/orchestration/records/
view.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use sha2::{Digest, Sha256};
6use time::format_description::well_known::Rfc3339;
7use time::OffsetDateTime;
8
9use crate::event_log::{AnyEventLog, EventId, EventLog, LogError};
10use crate::provenance::event_record_hash_from_headers;
11use crate::redact::{current_policy, RedactionPolicy};
12
13use super::super::ArtifactRecord;
14use super::{
15    LlmUsageRecord, RunCheckpointRecord, RunChildRecord, RunHitlQuestionRecord, RunRecord,
16    RunStageRecord, RunTraceSpanRecord,
17};
18
19pub const RUN_VIEW_SCHEMA: &str = "harn.run_view.v1";
20pub const SESSION_VIEW_SCHEMA: &str = "harn.session_view.v1";
21pub const RUN_VIEW_SCHEMA_VERSION: u32 = 1;
22pub const SESSION_VIEW_SCHEMA_VERSION: u32 = 1;
23pub const SESSION_VIEW_QUERY_METHOD: &str = "harn.session_view.query";
24
25const TEXT_LIMIT: usize = 16 * 1024;
26const PREVIEW_LIMIT: usize = 1200;
27
28#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
29#[serde(default)]
30pub struct ViewProducer {
31    pub name: String,
32    pub version: String,
33}
34
35impl Default for ViewProducer {
36    fn default() -> Self {
37        Self {
38            name: "harn".to_string(),
39            version: env!("CARGO_PKG_VERSION").to_string(),
40        }
41    }
42}
43
44#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
45#[serde(default)]
46pub struct ProjectionInfo {
47    pub projection_id: String,
48    pub projection_hash: Option<String>,
49    pub prefix_hash: Option<String>,
50    pub last_event_id: Option<EventId>,
51}
52
53#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
54#[serde(default)]
55pub struct RunView {
56    pub schema: String,
57    pub schema_version: u32,
58    pub producer: ViewProducer,
59    pub run: RunViewRun,
60    pub projection: ProjectionInfo,
61    pub visible_text: Option<String>,
62    pub transcript: TranscriptSummary,
63    pub usage: RunViewUsage,
64    pub providers: Vec<RunViewProvider>,
65    pub stages: Vec<RunViewStage>,
66    pub artifacts: Vec<RunViewArtifact>,
67    pub checkpoints: Vec<RunViewCheckpoint>,
68    pub pending: RunViewPendingState,
69    pub failure: Option<RunViewFailure>,
70    pub metadata: RunViewMetadata,
71}
72
73#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
74#[serde(default)]
75pub struct RunViewRun {
76    pub run_id: String,
77    pub session_id: Option<String>,
78    pub parent_run_id: Option<String>,
79    pub root_run_id: Option<String>,
80    pub parent_session_id: Option<String>,
81    pub child_runs: Vec<RunViewChild>,
82    pub run_path: Option<String>,
83    pub status: String,
84    pub workflow_id: String,
85    pub workflow_name: Option<String>,
86    pub task: String,
87    pub started_at: String,
88    pub finished_at: Option<String>,
89    pub duration_ms: Option<u64>,
90}
91
92#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
93#[serde(default)]
94pub struct RunViewChild {
95    pub worker_id: String,
96    pub worker_name: String,
97    pub run_id: Option<String>,
98    pub session_id: Option<String>,
99    pub parent_session_id: Option<String>,
100    pub run_path: Option<String>,
101    pub status: String,
102    pub task: String,
103}
104
105#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
106#[serde(default)]
107pub struct RunViewUsage {
108    pub input_tokens: i64,
109    pub output_tokens: i64,
110    pub total_duration_ms: i64,
111    pub call_count: i64,
112    pub total_cost: f64,
113    pub models: Vec<String>,
114}
115
116impl From<&LlmUsageRecord> for RunViewUsage {
117    fn from(value: &LlmUsageRecord) -> Self {
118        Self {
119            input_tokens: value.input_tokens,
120            output_tokens: value.output_tokens,
121            total_duration_ms: value.total_duration_ms,
122            call_count: value.call_count,
123            total_cost: value.total_cost,
124            models: value.models.clone(),
125        }
126    }
127}
128
129impl RunViewUsage {
130    fn add_usage(&mut self, usage: &RunViewUsage) {
131        self.input_tokens += usage.input_tokens;
132        self.output_tokens += usage.output_tokens;
133        self.total_duration_ms += usage.total_duration_ms;
134        self.call_count += usage.call_count;
135        self.total_cost += usage.total_cost;
136        for model in &usage.models {
137            if !model.is_empty() && !self.models.contains(model) {
138                self.models.push(model.clone());
139            }
140        }
141    }
142}
143
144#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
145#[serde(default)]
146pub struct RunViewProvider {
147    pub provider: String,
148    pub model: String,
149    pub call_count: i64,
150    pub input_tokens: i64,
151    pub output_tokens: i64,
152    pub cost_usd: f64,
153}
154
155#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
156#[serde(default)]
157pub struct RunViewStage {
158    pub id: String,
159    pub node_id: String,
160    pub kind: String,
161    pub status: String,
162    pub outcome: String,
163    pub branch: Option<String>,
164    pub started_at: String,
165    pub finished_at: Option<String>,
166    pub duration_ms: Option<u64>,
167    pub visible_text: Option<String>,
168    pub usage: RunViewUsage,
169    pub provider: Option<String>,
170    pub model: Option<String>,
171    pub artifact_refs: Vec<String>,
172    pub attempt_count: usize,
173    pub error: Option<String>,
174}
175
176#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
177#[serde(default)]
178pub struct RunViewArtifact {
179    pub id: String,
180    pub kind: String,
181    pub title: Option<String>,
182    pub source: Option<String>,
183    pub stage: Option<String>,
184    pub estimated_tokens: Option<usize>,
185    pub lineage: Vec<String>,
186    pub preview: Option<String>,
187}
188
189#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
190#[serde(default)]
191pub struct RunViewCheckpoint {
192    pub id: String,
193    pub reason: String,
194    pub ready_count: usize,
195    pub completed_count: usize,
196    pub last_stage_id: Option<String>,
197    pub persisted_at: String,
198}
199
200#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
201#[serde(default)]
202pub struct RunViewPendingState {
203    pub nodes: Vec<String>,
204    pub approvals: Vec<RunViewApproval>,
205    pub auth: Vec<RunViewAuth>,
206}
207
208#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
209#[serde(default)]
210pub struct RunViewApproval {
211    pub request_id: String,
212    pub prompt: String,
213    pub agent: String,
214    pub trace_id: Option<String>,
215    pub asked_at: String,
216}
217
218#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
219#[serde(default)]
220pub struct RunViewAuth {
221    pub provider: Option<String>,
222    pub server: Option<String>,
223    pub scope: Option<String>,
224    pub stage_id: Option<String>,
225    pub message: Option<String>,
226}
227
228#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
229#[serde(default)]
230pub struct RunViewFailure {
231    pub stage_id: Option<String>,
232    pub node_id: Option<String>,
233    pub status: String,
234    pub outcome: String,
235    pub message: Option<String>,
236}
237
238#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
239#[serde(default)]
240pub struct TranscriptSummary {
241    pub present: bool,
242    pub message_count: usize,
243    pub event_count: usize,
244    pub summary: Option<String>,
245    pub source: Option<String>,
246}
247
248#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
249#[serde(default)]
250pub struct RunViewMetadata {
251    pub record_type: String,
252    pub stage_count: usize,
253    pub transition_count: usize,
254    pub artifact_count: usize,
255    pub checkpoint_count: usize,
256    pub child_run_count: usize,
257    pub observability_present: bool,
258    pub planner_round_count: usize,
259    pub tool_recording_count: usize,
260    pub replay_fixture_id: Option<String>,
261    pub execution: Option<super::RunExecutionRecord>,
262}
263
264#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
265#[serde(default)]
266pub struct SessionView {
267    pub schema: String,
268    pub schema_version: u32,
269    pub producer: ViewProducer,
270    pub session: SessionViewSession,
271    pub projection: ProjectionInfo,
272    pub runs: Vec<RunView>,
273    pub history: Vec<SessionViewHistoryItem>,
274    pub usage: RunViewUsage,
275    pub pending: RunViewPendingState,
276    pub failure: Option<RunViewFailure>,
277    pub metadata: SessionViewMetadata,
278}
279
280#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
281#[serde(default)]
282pub struct SessionViewSession {
283    pub session_id: Option<String>,
284    pub parent_session_id: Option<String>,
285    pub root_session_id: Option<String>,
286    pub status: String,
287    pub run_count: usize,
288    pub started_at: Option<String>,
289    pub updated_at: Option<String>,
290    pub last_event_id: Option<EventId>,
291    pub chain_root_hash: Option<String>,
292}
293
294#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
295#[serde(default)]
296pub struct SessionViewHistoryItem {
297    pub run_id: String,
298    pub run_path: Option<String>,
299    pub session_id: Option<String>,
300    pub status: String,
301    pub started_at: Option<String>,
302    pub finished_at: Option<String>,
303    pub last_event_id: Option<EventId>,
304    pub visible_text: Option<String>,
305}
306
307#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
308#[serde(default)]
309pub struct SessionViewMetadata {
310    pub record_count: usize,
311    pub event_count: usize,
312    pub has_event_log: bool,
313}
314
315#[derive(Clone, Debug, Default)]
316pub struct RunViewOptions {
317    pub producer: ViewProducer,
318    pub run_path: Option<String>,
319    pub last_event_id: Option<EventId>,
320    pub prefix_hash: Option<String>,
321}
322
323#[derive(Clone, Debug, Default)]
324pub struct SessionViewOptions {
325    pub producer: ViewProducer,
326    pub session_id: Option<String>,
327    pub parent_session_id: Option<String>,
328    pub root_session_id: Option<String>,
329    pub status: Option<String>,
330    pub started_at: Option<String>,
331    pub updated_at: Option<String>,
332    pub last_event_id: Option<EventId>,
333    pub chain_root_hash: Option<String>,
334    pub event_count: usize,
335    pub has_event_log: bool,
336}
337
338#[derive(Debug)]
339pub enum RunViewError {
340    EventLog(LogError),
341}
342
343impl std::fmt::Display for RunViewError {
344    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345        match self {
346            Self::EventLog(error) => error.fmt(f),
347        }
348    }
349}
350
351impl std::error::Error for RunViewError {}
352
353impl From<LogError> for RunViewError {
354    fn from(error: LogError) -> Self {
355        Self::EventLog(error)
356    }
357}
358
359pub fn build_run_view(run: &RunRecord) -> RunView {
360    build_run_view_with_options(run, RunViewOptions::default())
361}
362
363pub fn build_run_view_with_path(run: &RunRecord, run_path: Option<impl Into<String>>) -> RunView {
364    build_run_view_with_options(
365        run,
366        RunViewOptions {
367            run_path: run_path.map(Into::into),
368            ..RunViewOptions::default()
369        },
370    )
371}
372
373pub async fn build_run_view_with_event_log(
374    run: &RunRecord,
375    run_path: Option<impl Into<String>>,
376    log: Option<&AnyEventLog>,
377) -> Result<RunView, RunViewError> {
378    let mut options = RunViewOptions {
379        run_path: run_path.map(Into::into),
380        ..RunViewOptions::default()
381    };
382    if let Some(log) = log {
383        if let Some(session_id) = infer_run_session_id(run) {
384            let (last_event_id, prefix_hash) = read_session_tip(log, &session_id).await?;
385            options.last_event_id = last_event_id;
386            options.prefix_hash = prefix_hash;
387        }
388    }
389    Ok(build_run_view_with_options(run, options))
390}
391
392pub fn build_run_view_with_options(run: &RunRecord, options: RunViewOptions) -> RunView {
393    let policy = current_policy();
394    let session_id = infer_run_session_id(run);
395    let parent_session_id = infer_parent_session_id(run);
396    let stages = run
397        .stages
398        .iter()
399        .map(|stage| build_stage_view(stage, &policy))
400        .collect::<Vec<_>>();
401    let visible_text = bounded_join(
402        run.stages
403            .iter()
404            .filter_map(|stage| stage.visible_text.as_deref())
405            .map(|text| redact_bounded(text, &policy, TEXT_LIMIT)),
406        TEXT_LIMIT,
407    );
408    let usage = run
409        .usage
410        .as_ref()
411        .map(RunViewUsage::from)
412        .unwrap_or_else(|| usage_from_stages(&stages));
413    let mut view = RunView {
414        schema: RUN_VIEW_SCHEMA.to_string(),
415        schema_version: RUN_VIEW_SCHEMA_VERSION,
416        producer: options.producer.clone(),
417        run: RunViewRun {
418            run_id: run.id.clone(),
419            session_id,
420            parent_run_id: run.parent_run_id.clone(),
421            root_run_id: run.root_run_id.clone(),
422            parent_session_id,
423            child_runs: run
424                .child_runs
425                .iter()
426                .map(|child| build_child_view(child, &policy))
427                .collect(),
428            run_path: options
429                .run_path
430                .clone()
431                .or_else(|| run.persisted_path.clone()),
432            status: run.status.clone(),
433            workflow_id: run.workflow_id.clone(),
434            workflow_name: run.workflow_name.clone(),
435            task: redact_bounded(&run.task, &policy, TEXT_LIMIT),
436            started_at: run.started_at.clone(),
437            finished_at: run.finished_at.clone(),
438            duration_ms: run_duration_ms(run),
439        },
440        projection: ProjectionInfo {
441            projection_id: String::new(),
442            projection_hash: None,
443            prefix_hash: options.prefix_hash,
444            last_event_id: options.last_event_id,
445        },
446        visible_text,
447        transcript: transcript_summary_for_run(run, &policy),
448        usage,
449        providers: provider_summary(run),
450        stages,
451        artifacts: run
452            .artifacts
453            .iter()
454            .map(|artifact| build_artifact_view(artifact, &policy))
455            .collect(),
456        checkpoints: run.checkpoints.iter().map(build_checkpoint_view).collect(),
457        pending: RunViewPendingState {
458            nodes: run.pending_nodes.clone(),
459            approvals: run
460                .hitl_questions
461                .iter()
462                .map(|question| build_approval_view(question, &policy))
463                .collect(),
464            auth: pending_auth(run, &policy),
465        },
466        failure: failure_summary(run, &policy),
467        metadata: RunViewMetadata {
468            record_type: run.type_name.clone(),
469            stage_count: run.stages.len(),
470            transition_count: run.transitions.len(),
471            artifact_count: run.artifacts.len(),
472            checkpoint_count: run.checkpoints.len(),
473            child_run_count: run.child_runs.len(),
474            observability_present: run.observability.is_some(),
475            planner_round_count: run
476                .observability
477                .as_ref()
478                .map(|observability| observability.planner_rounds.len())
479                .unwrap_or_default(),
480            tool_recording_count: run.tool_recordings.len(),
481            replay_fixture_id: run
482                .replay_fixture
483                .as_ref()
484                .map(|fixture| fixture.id.clone()),
485            execution: run.execution.clone(),
486        },
487    };
488    finalize_run_projection(&mut view);
489    view
490}
491
492pub fn build_session_view_from_run_views(
493    runs: Vec<RunView>,
494    options: SessionViewOptions,
495) -> SessionView {
496    let session_id = options
497        .session_id
498        .clone()
499        .or_else(|| runs.iter().find_map(|run| run.run.session_id.clone()));
500    let mut usage = RunViewUsage::default();
501    let mut pending = RunViewPendingState::default();
502    let mut failure = None;
503    let mut started_at = options.started_at.clone();
504    let mut updated_at = options.updated_at.clone();
505    let history = runs
506        .iter()
507        .map(|run| {
508            usage.add_usage(&run.usage);
509            pending.nodes.extend(run.pending.nodes.clone());
510            pending.approvals.extend(run.pending.approvals.clone());
511            pending.auth.extend(run.pending.auth.clone());
512            if failure.is_none() {
513                failure = run.failure.clone();
514            }
515            if !run.run.started_at.is_empty() {
516                started_at = min_opt_string(started_at.take(), Some(run.run.started_at.clone()));
517                updated_at = max_opt_string(updated_at.take(), Some(run.run.started_at.clone()));
518            }
519            updated_at = max_opt_string(updated_at.take(), run.run.finished_at.clone());
520            SessionViewHistoryItem {
521                run_id: run.run.run_id.clone(),
522                run_path: run.run.run_path.clone(),
523                session_id: run.run.session_id.clone(),
524                status: run.run.status.clone(),
525                started_at: non_empty_string(&run.run.started_at),
526                finished_at: run.run.finished_at.clone(),
527                last_event_id: run.projection.last_event_id,
528                visible_text: run.visible_text.clone(),
529            }
530        })
531        .collect::<Vec<_>>();
532    let status = options
533        .status
534        .clone()
535        .unwrap_or_else(|| aggregate_session_status(&runs));
536    let last_event_id = options.last_event_id.or_else(|| {
537        runs.iter()
538            .filter_map(|run| run.projection.last_event_id)
539            .max()
540    });
541    let chain_root_hash = options.chain_root_hash.clone().or_else(|| {
542        runs.iter()
543            .rev()
544            .find_map(|run| run.projection.prefix_hash.clone())
545    });
546    let mut view = SessionView {
547        schema: SESSION_VIEW_SCHEMA.to_string(),
548        schema_version: SESSION_VIEW_SCHEMA_VERSION,
549        producer: options.producer.clone(),
550        session: SessionViewSession {
551            session_id,
552            parent_session_id: options.parent_session_id.clone().or_else(|| {
553                runs.iter()
554                    .find_map(|run| run.run.parent_session_id.clone())
555            }),
556            root_session_id: options.root_session_id.clone(),
557            status,
558            run_count: runs.len(),
559            started_at,
560            updated_at,
561            last_event_id,
562            chain_root_hash,
563        },
564        projection: ProjectionInfo {
565            projection_id: String::new(),
566            projection_hash: None,
567            prefix_hash: None,
568            last_event_id,
569        },
570        runs,
571        history,
572        usage,
573        pending: dedupe_pending(pending),
574        failure,
575        metadata: SessionViewMetadata {
576            record_count: 0,
577            event_count: options.event_count,
578            has_event_log: options.has_event_log,
579        },
580    };
581    view.metadata.record_count = view.runs.len();
582    view.projection.prefix_hash = view.session.chain_root_hash.clone();
583    finalize_session_projection(&mut view);
584    view
585}
586
587pub async fn build_session_view_from_run_records(
588    runs: Vec<(&RunRecord, Option<String>)>,
589    session_id: Option<String>,
590    log: Option<&AnyEventLog>,
591) -> Result<SessionView, RunViewError> {
592    let mut views = Vec::new();
593    for (run, path) in runs {
594        views.push(build_run_view_with_event_log(run, path, log).await?);
595    }
596    let mut options = SessionViewOptions {
597        session_id,
598        has_event_log: log.is_some(),
599        ..SessionViewOptions::default()
600    };
601    if let (Some(log), Some(session_id)) = (log, options.session_id.as_deref()) {
602        let (last_event_id, chain_root_hash) = read_session_tip(log, session_id).await?;
603        options.last_event_id = last_event_id;
604        options.chain_root_hash = chain_root_hash;
605    }
606    Ok(build_session_view_from_run_views(views, options))
607}
608
609pub async fn build_empty_session_view(
610    session_id: Option<String>,
611    log: Option<&AnyEventLog>,
612) -> Result<SessionView, RunViewError> {
613    let mut options = SessionViewOptions {
614        session_id: session_id.clone(),
615        has_event_log: log.is_some(),
616        ..SessionViewOptions::default()
617    };
618    if let (Some(log), Some(session_id)) = (log, session_id.as_deref()) {
619        let (last_event_id, chain_root_hash) = read_session_tip(log, session_id).await?;
620        options.last_event_id = last_event_id;
621        options.chain_root_hash = chain_root_hash;
622    }
623    Ok(build_session_view_from_run_views(Vec::new(), options))
624}
625
626async fn read_session_tip(
627    log: &AnyEventLog,
628    session_id: &str,
629) -> Result<(Option<EventId>, Option<String>), LogError> {
630    let topic = crate::session_timeline::agent_events_topic(session_id);
631    let Some(latest) = log.latest(&topic).await? else {
632        return Ok((None, None));
633    };
634    let from = latest.checked_sub(1);
635    let events = log.read_range(&topic, from, 1).await?;
636    let prefix_hash = events
637        .into_iter()
638        .find(|(event_id, _)| *event_id == latest)
639        .and_then(|(event_id, event)| {
640            event_record_hash_from_headers(topic.as_str(), event_id, &event).ok()
641        });
642    Ok((Some(latest), prefix_hash))
643}
644
645fn build_child_view(child: &RunChildRecord, policy: &RedactionPolicy) -> RunViewChild {
646    RunViewChild {
647        worker_id: child.worker_id.clone(),
648        worker_name: child.worker_name.clone(),
649        run_id: child.run_id.clone(),
650        session_id: child.session_id.clone(),
651        parent_session_id: child.parent_session_id.clone(),
652        run_path: child.run_path.clone(),
653        status: child.status.clone(),
654        task: redact_bounded(&child.task, policy, TEXT_LIMIT),
655    }
656}
657
658fn build_stage_view(stage: &RunStageRecord, policy: &RedactionPolicy) -> RunViewStage {
659    let usage = stage
660        .usage
661        .as_ref()
662        .map(RunViewUsage::from)
663        .unwrap_or_default();
664    let artifact_refs = stage
665        .produced_artifact_ids
666        .iter()
667        .chain(stage.artifacts.iter().map(|artifact| &artifact.id))
668        .filter(|id| !id.is_empty())
669        .cloned()
670        .collect::<BTreeSet<_>>()
671        .into_iter()
672        .collect();
673    RunViewStage {
674        id: stage.id.clone(),
675        node_id: stage.node_id.clone(),
676        kind: stage.kind.clone(),
677        status: stage.status.clone(),
678        outcome: stage.outcome.clone(),
679        branch: stage.branch.clone(),
680        started_at: stage.started_at.clone(),
681        finished_at: stage.finished_at.clone(),
682        duration_ms: stage_duration_ms(stage),
683        visible_text: stage
684            .visible_text
685            .as_deref()
686            .map(|text| redact_bounded(text, policy, TEXT_LIMIT)),
687        usage,
688        provider: metadata_string_any(&stage.metadata, &["provider"])
689            .or_else(|| metadata_path_string(&stage.metadata, &["model_policy", "provider"])),
690        model: metadata_string_any(&stage.metadata, &["model"])
691            .or_else(|| metadata_path_string(&stage.metadata, &["model_policy", "model"])),
692        artifact_refs,
693        attempt_count: stage.attempts.len(),
694        error: stage_error(stage, policy),
695    }
696}
697
698fn build_artifact_view(artifact: &ArtifactRecord, policy: &RedactionPolicy) -> RunViewArtifact {
699    RunViewArtifact {
700        id: artifact.id.clone(),
701        kind: artifact.kind.clone(),
702        title: artifact.title.clone(),
703        source: artifact.source.clone(),
704        stage: artifact.stage.clone(),
705        estimated_tokens: artifact.estimated_tokens,
706        lineage: artifact.lineage.clone(),
707        preview: artifact
708            .text
709            .as_deref()
710            .map(|text| redact_bounded(text, policy, PREVIEW_LIMIT))
711            .or_else(|| {
712                artifact
713                    .data
714                    .as_ref()
715                    .map(|data| redact_json_preview(data, policy))
716            }),
717    }
718}
719
720fn build_checkpoint_view(checkpoint: &RunCheckpointRecord) -> RunViewCheckpoint {
721    RunViewCheckpoint {
722        id: checkpoint.id.clone(),
723        reason: checkpoint.reason.clone(),
724        ready_count: checkpoint.ready_nodes.len(),
725        completed_count: checkpoint.completed_nodes.len(),
726        last_stage_id: checkpoint.last_stage_id.clone(),
727        persisted_at: checkpoint.persisted_at.clone(),
728    }
729}
730
731fn build_approval_view(
732    question: &RunHitlQuestionRecord,
733    policy: &RedactionPolicy,
734) -> RunViewApproval {
735    RunViewApproval {
736        request_id: question.request_id.clone(),
737        prompt: redact_bounded(&question.prompt, policy, PREVIEW_LIMIT),
738        agent: question.agent.clone(),
739        trace_id: question.trace_id.clone(),
740        asked_at: question.asked_at.clone(),
741    }
742}
743
744fn provider_summary(run: &RunRecord) -> Vec<RunViewProvider> {
745    let mut providers = BTreeMap::<(String, String), RunViewProvider>::new();
746    for span in run
747        .trace_spans
748        .iter()
749        .filter(|span| span.kind == "llm_call")
750    {
751        let provider = span
752            .metadata
753            .get("provider")
754            .and_then(Value::as_str)
755            .unwrap_or("unknown")
756            .to_string();
757        let model = span
758            .metadata
759            .get("model")
760            .and_then(Value::as_str)
761            .unwrap_or("unknown")
762            .to_string();
763        let input_tokens = metadata_i64(&span.metadata, "input_tokens");
764        let output_tokens = metadata_i64(&span.metadata, "output_tokens");
765        let cost_usd = span
766            .metadata
767            .get("cost_usd")
768            .and_then(Value::as_f64)
769            .unwrap_or_else(|| {
770                crate::llm::calculate_cost_for_provider(
771                    &provider,
772                    &model,
773                    input_tokens,
774                    output_tokens,
775                )
776            });
777        let entry = providers
778            .entry((provider.clone(), model.clone()))
779            .or_insert_with(|| RunViewProvider {
780                provider,
781                model,
782                ..RunViewProvider::default()
783            });
784        entry.call_count += 1;
785        entry.input_tokens += input_tokens;
786        entry.output_tokens += output_tokens;
787        entry.cost_usd += cost_usd;
788    }
789    if providers.is_empty() {
790        if let Some(usage) = &run.usage {
791            for model in &usage.models {
792                if model.is_empty() {
793                    continue;
794                }
795                providers.insert(
796                    ("unknown".to_string(), model.clone()),
797                    RunViewProvider {
798                        provider: "unknown".to_string(),
799                        model: model.clone(),
800                        call_count: usage.call_count,
801                        input_tokens: usage.input_tokens,
802                        output_tokens: usage.output_tokens,
803                        cost_usd: usage.total_cost,
804                    },
805                );
806            }
807        }
808    }
809    providers.into_values().collect()
810}
811
812fn transcript_summary_for_run(run: &RunRecord, policy: &RedactionPolicy) -> TranscriptSummary {
813    if let Some(transcript) = run.transcript.as_ref() {
814        return transcript_summary(Some(transcript), policy);
815    }
816    transcript_summary_from_stages(&run.stages, policy)
817}
818
819fn transcript_summary_from_stages(
820    stages: &[RunStageRecord],
821    policy: &RedactionPolicy,
822) -> TranscriptSummary {
823    let mut out = TranscriptSummary::default();
824    let mut summaries = Vec::new();
825    for stage in stages {
826        let Some(transcript) = stage.transcript.as_ref() else {
827            continue;
828        };
829        out.present = true;
830        out.message_count += count_array_field(transcript, "messages");
831        out.event_count += count_array_field(transcript, "events");
832        if let Some(summary) = transcript_summary(Some(transcript), policy).summary {
833            let label = non_empty_string(&stage.node_id)
834                .or_else(|| non_empty_string(&stage.id))
835                .unwrap_or_else(|| "stage".to_string());
836            summaries.push(format!("{label}: {summary}"));
837        }
838    }
839    if out.present {
840        out.summary = bounded_join(summaries, PREVIEW_LIMIT);
841        out.source = Some("stages".to_string());
842    }
843    out
844}
845
846fn transcript_summary(value: Option<&Value>, policy: &RedactionPolicy) -> TranscriptSummary {
847    let Some(value) = value else {
848        return TranscriptSummary::default();
849    };
850    TranscriptSummary {
851        present: true,
852        message_count: count_array_field(value, "messages"),
853        event_count: count_array_field(value, "events"),
854        summary: value
855            .get("summary")
856            .and_then(Value::as_str)
857            .map(|text| redact_bounded(text, policy, PREVIEW_LIMIT))
858            .or_else(|| {
859                value
860                    .get("summary")
861                    .map(|value| redact_json_preview(value, policy))
862            }),
863        source: value
864            .get("source")
865            .and_then(Value::as_str)
866            .map(str::to_string),
867    }
868}
869
870fn pending_auth(run: &RunRecord, policy: &RedactionPolicy) -> Vec<RunViewAuth> {
871    let mut auth = Vec::new();
872    collect_auth_from_metadata(None, &run.metadata, &mut auth, policy);
873    for stage in &run.stages {
874        collect_auth_from_metadata(Some(&stage.id), &stage.metadata, &mut auth, policy);
875    }
876    auth
877}
878
879fn collect_auth_from_metadata(
880    stage_id: Option<&str>,
881    metadata: &BTreeMap<String, Value>,
882    out: &mut Vec<RunViewAuth>,
883    policy: &RedactionPolicy,
884) {
885    for key in ["pending_auth", "auth_required", "mcp_auth_required"] {
886        let Some(value) = metadata.get(key) else {
887            continue;
888        };
889        match value {
890            Value::Array(items) => {
891                for item in items {
892                    out.push(auth_from_value(stage_id, item, policy));
893                }
894            }
895            Value::Object(_) => out.push(auth_from_value(stage_id, value, policy)),
896            Value::Bool(true) => out.push(RunViewAuth {
897                stage_id: stage_id.map(str::to_string),
898                ..RunViewAuth::default()
899            }),
900            Value::String(message) => out.push(RunViewAuth {
901                stage_id: stage_id.map(str::to_string),
902                message: Some(redact_bounded(message, policy, PREVIEW_LIMIT)),
903                ..RunViewAuth::default()
904            }),
905            _ => {}
906        }
907    }
908}
909
910fn auth_from_value(stage_id: Option<&str>, value: &Value, policy: &RedactionPolicy) -> RunViewAuth {
911    let object = value.as_object();
912    let field = |name: &str| {
913        object
914            .and_then(|object| object.get(name))
915            .and_then(Value::as_str)
916            .map(|text| redact_bounded(text, policy, PREVIEW_LIMIT))
917    };
918    RunViewAuth {
919        provider: field("provider"),
920        server: field("server").or_else(|| field("server_name")),
921        scope: field("scope"),
922        stage_id: stage_id.map(str::to_string).or_else(|| field("stage_id")),
923        message: field("message").or_else(|| Some(redact_json_preview(value, policy))),
924    }
925}
926
927fn failure_summary(run: &RunRecord, policy: &RedactionPolicy) -> Option<RunViewFailure> {
928    run.stages
929        .iter()
930        .rev()
931        .find(|stage| failed_status(&stage.status) || failed_status(&stage.outcome))
932        .map(|stage| RunViewFailure {
933            stage_id: Some(stage.id.clone()),
934            node_id: Some(stage.node_id.clone()),
935            status: stage.status.clone(),
936            outcome: stage.outcome.clone(),
937            message: stage_error(stage, policy)
938                .or_else(|| Some(format!("{} failed with {}", stage.node_id, stage.outcome))),
939        })
940        .or_else(|| {
941            failed_status(&run.status).then(|| RunViewFailure {
942                status: run.status.clone(),
943                outcome: run.status.clone(),
944                ..RunViewFailure::default()
945            })
946        })
947}
948
949fn stage_error(stage: &RunStageRecord, policy: &RedactionPolicy) -> Option<String> {
950    stage
951        .metadata
952        .get("error")
953        .map(|value| redact_json_preview(value, policy))
954        .or_else(|| {
955            stage
956                .attempts
957                .iter()
958                .rev()
959                .find_map(|attempt| attempt.error.as_deref())
960                .map(|error| redact_bounded(error, policy, PREVIEW_LIMIT))
961        })
962}
963
964fn failed_status(value: &str) -> bool {
965    matches!(
966        value,
967        "failed" | "error" | "errored" | "cancelled" | "canceled" | "timeout" | "timed_out"
968    )
969}
970
971fn usage_from_stages(stages: &[RunViewStage]) -> RunViewUsage {
972    let mut usage = RunViewUsage::default();
973    for stage in stages {
974        usage.add_usage(&stage.usage);
975    }
976    usage
977}
978
979fn run_duration_ms(run: &RunRecord) -> Option<u64> {
980    let from_usage = run
981        .usage
982        .as_ref()
983        .and_then(|usage| u64::try_from(usage.total_duration_ms).ok())
984        .filter(|duration| *duration > 0);
985    let from_spans = run
986        .trace_spans
987        .iter()
988        .map(trace_span_end_ms)
989        .max()
990        .filter(|duration| *duration > 0);
991    let from_timestamps = run
992        .finished_at
993        .as_deref()
994        .and_then(|finished| timestamp_delta_ms(&run.started_at, finished));
995    from_timestamps.or(from_spans).or(from_usage)
996}
997
998fn stage_duration_ms(stage: &RunStageRecord) -> Option<u64> {
999    stage
1000        .usage
1001        .as_ref()
1002        .and_then(|usage| u64::try_from(usage.total_duration_ms).ok())
1003        .filter(|duration| *duration > 0)
1004        .or_else(|| {
1005            stage
1006                .finished_at
1007                .as_deref()
1008                .and_then(|finished| timestamp_delta_ms(&stage.started_at, finished))
1009        })
1010}
1011
1012fn timestamp_delta_ms(started_at: &str, finished_at: &str) -> Option<u64> {
1013    let start = parse_timestamp_ms(started_at)?;
1014    let end = parse_timestamp_ms(finished_at)?;
1015    u64::try_from(end.saturating_sub(start)).ok()
1016}
1017
1018fn parse_timestamp_ms(value: &str) -> Option<i128> {
1019    if value.trim().is_empty() {
1020        return None;
1021    }
1022    if let Ok(seconds) = value.parse::<i128>() {
1023        return Some(seconds.saturating_mul(1000));
1024    }
1025    let parsed = OffsetDateTime::parse(value, &Rfc3339).ok()?;
1026    Some(
1027        i128::from(parsed.unix_timestamp()).saturating_mul(1000) + i128::from(parsed.millisecond()),
1028    )
1029}
1030
1031fn trace_span_end_ms(span: &RunTraceSpanRecord) -> u64 {
1032    span.start_ms.saturating_add(span.duration_ms)
1033}
1034
1035fn infer_run_session_id(run: &RunRecord) -> Option<String> {
1036    metadata_string_any(&run.metadata, &["session_id", "agent_session_id"])
1037        .or_else(|| metadata_path_string(&run.metadata, &["model_policy", "session_id"]))
1038        .or_else(|| metadata_path_string(&run.metadata, &["audit", "session_id"]))
1039        .or_else(|| {
1040            run.child_runs
1041                .iter()
1042                .find_map(|child| child.session_id.clone())
1043        })
1044        .or_else(|| {
1045            run.stages.iter().find_map(|stage| {
1046                metadata_string_any(&stage.metadata, &["session_id", "agent_session_id"])
1047                    .or_else(|| {
1048                        metadata_path_string(&stage.metadata, &["model_policy", "session_id"])
1049                    })
1050                    .or_else(|| metadata_path_string(&stage.metadata, &["audit", "session_id"]))
1051                    .or_else(|| {
1052                        metadata_path_string(&stage.metadata, &["worker", "audit", "session_id"])
1053                    })
1054            })
1055        })
1056        .or_else(|| {
1057            run.trace_spans.iter().find_map(|span| {
1058                metadata_string_any(&span.metadata, &["session_id", "agent_session_id"])
1059            })
1060        })
1061}
1062
1063fn infer_parent_session_id(run: &RunRecord) -> Option<String> {
1064    metadata_string_any(&run.metadata, &["parent_session_id"])
1065        .or_else(|| metadata_path_string(&run.metadata, &["audit", "parent_session_id"]))
1066        .or_else(|| {
1067            run.child_runs
1068                .iter()
1069                .find_map(|child| child.parent_session_id.clone())
1070        })
1071        .or_else(|| {
1072            run.stages.iter().find_map(|stage| {
1073                metadata_string_any(&stage.metadata, &["parent_session_id"])
1074                    .or_else(|| {
1075                        metadata_path_string(&stage.metadata, &["audit", "parent_session_id"])
1076                    })
1077                    .or_else(|| {
1078                        metadata_path_string(
1079                            &stage.metadata,
1080                            &["worker", "audit", "parent_session_id"],
1081                        )
1082                    })
1083            })
1084        })
1085}
1086
1087fn metadata_string_any(metadata: &BTreeMap<String, Value>, keys: &[&str]) -> Option<String> {
1088    keys.iter()
1089        .find_map(|key| metadata.get(*key).and_then(Value::as_str))
1090        .filter(|value| !value.is_empty())
1091        .map(str::to_string)
1092}
1093
1094fn metadata_path_string(metadata: &BTreeMap<String, Value>, path: &[&str]) -> Option<String> {
1095    let mut value = metadata.get(*path.first()?)?;
1096    for key in &path[1..] {
1097        value = value.get(*key)?;
1098    }
1099    value
1100        .as_str()
1101        .filter(|value| !value.is_empty())
1102        .map(str::to_string)
1103}
1104
1105fn metadata_i64(metadata: &BTreeMap<String, Value>, key: &str) -> i64 {
1106    metadata
1107        .get(key)
1108        .and_then(Value::as_i64)
1109        .or_else(|| {
1110            metadata
1111                .get(key)
1112                .and_then(Value::as_u64)
1113                .and_then(|value| i64::try_from(value).ok())
1114        })
1115        .unwrap_or_default()
1116}
1117
1118fn count_array_field(value: &Value, field: &str) -> usize {
1119    value
1120        .get(field)
1121        .and_then(Value::as_array)
1122        .map(Vec::len)
1123        .unwrap_or_default()
1124}
1125
1126fn redact_json_preview(value: &Value, policy: &RedactionPolicy) -> String {
1127    let mut value = value.clone();
1128    policy.redact_json_in_place(&mut value);
1129    bounded_text(
1130        &serde_json::to_string(&value).unwrap_or_default(),
1131        PREVIEW_LIMIT,
1132    )
1133}
1134
1135fn redact_bounded(text: &str, policy: &RedactionPolicy, limit: usize) -> String {
1136    let redacted = policy.redact_string(text);
1137    bounded_text(redacted.as_ref(), limit)
1138}
1139
1140fn bounded_text(text: &str, limit: usize) -> String {
1141    if text.len() <= limit {
1142        return text.to_string();
1143    }
1144    let boundary = text
1145        .char_indices()
1146        .map(|(index, _)| index)
1147        .take_while(|index| *index <= limit)
1148        .last()
1149        .unwrap_or(0);
1150    format!("{}...", &text[..boundary])
1151}
1152
1153fn bounded_join(values: impl IntoIterator<Item = String>, limit: usize) -> Option<String> {
1154    let mut out = String::new();
1155    for value in values {
1156        if value.is_empty() {
1157            continue;
1158        }
1159        if !out.is_empty() {
1160            out.push_str("\n\n");
1161        }
1162        out.push_str(&value);
1163        if out.len() > limit {
1164            return Some(bounded_text(&out, limit));
1165        }
1166    }
1167    non_empty_string(&out)
1168}
1169
1170fn non_empty_string(value: &str) -> Option<String> {
1171    (!value.is_empty()).then(|| value.to_string())
1172}
1173
1174fn min_opt_string(left: Option<String>, right: Option<String>) -> Option<String> {
1175    match (left, right) {
1176        (Some(left), Some(right)) => Some(left.min(right)),
1177        (Some(left), None) => Some(left),
1178        (None, Some(right)) => Some(right),
1179        (None, None) => None,
1180    }
1181}
1182
1183fn max_opt_string(left: Option<String>, right: Option<String>) -> Option<String> {
1184    match (left, right) {
1185        (Some(left), Some(right)) => Some(left.max(right)),
1186        (Some(left), None) => Some(left),
1187        (None, Some(right)) => Some(right),
1188        (None, None) => None,
1189    }
1190}
1191
1192fn aggregate_session_status(runs: &[RunView]) -> String {
1193    if runs.is_empty() {
1194        return "unknown".to_string();
1195    }
1196    if runs
1197        .iter()
1198        .any(|run| failed_status(&run.run.status) || run.failure.is_some())
1199    {
1200        return "failed".to_string();
1201    }
1202    if runs.iter().all(|run| {
1203        matches!(
1204            run.run.status.as_str(),
1205            "completed" | "succeeded" | "success" | "ok"
1206        )
1207    }) {
1208        return "completed".to_string();
1209    }
1210    "active".to_string()
1211}
1212
1213fn dedupe_pending(mut pending: RunViewPendingState) -> RunViewPendingState {
1214    let mut nodes = BTreeSet::new();
1215    pending.nodes.retain(|node| nodes.insert(node.clone()));
1216    let mut approvals = BTreeSet::new();
1217    pending
1218        .approvals
1219        .retain(|approval| approvals.insert(approval.request_id.clone()));
1220    let mut auth_seen = BTreeSet::new();
1221    pending.auth.retain(|item| {
1222        auth_seen.insert((
1223            item.provider.clone(),
1224            item.server.clone(),
1225            item.scope.clone(),
1226            item.stage_id.clone(),
1227        ))
1228    });
1229    pending
1230}
1231
1232fn finalize_run_projection(view: &mut RunView) {
1233    if let Some(hash) = projection_hash(RUN_VIEW_SCHEMA, view) {
1234        view.projection.projection_id =
1235            format!("run_view:{}:{}", view.run.run_id, hash_suffix(&hash));
1236        view.projection.projection_hash = Some(hash);
1237    } else {
1238        view.projection.projection_id = format!("run_view:{}", view.run.run_id);
1239    }
1240}
1241
1242fn finalize_session_projection(view: &mut SessionView) {
1243    let id = view
1244        .session
1245        .session_id
1246        .clone()
1247        .unwrap_or_else(|| "unknown".to_string());
1248    if let Some(hash) = projection_hash(SESSION_VIEW_SCHEMA, view) {
1249        view.projection.projection_id = format!("session_view:{id}:{}", hash_suffix(&hash));
1250        view.projection.projection_hash = Some(hash);
1251    } else {
1252        view.projection.projection_id = format!("session_view:{id}");
1253    }
1254}
1255
1256fn projection_hash<T: Serialize>(schema: &str, value: &T) -> Option<String> {
1257    let mut value = serde_json::to_value(value).ok()?;
1258    if let Some(projection) = value
1259        .as_object_mut()
1260        .and_then(|object| object.get_mut("projection"))
1261        .and_then(Value::as_object_mut)
1262    {
1263        projection.remove("projection_id");
1264        projection.remove("projection_hash");
1265    }
1266    let bytes = serde_json::to_vec(&value).ok()?;
1267    let mut hasher = Sha256::new();
1268    hasher.update(schema.as_bytes());
1269    hasher.update([0]);
1270    hasher.update(bytes);
1271    Some(format!("sha256:{}", hex::encode(hasher.finalize())))
1272}
1273
1274fn hash_suffix(hash: &str) -> String {
1275    hash.strip_prefix("sha256:")
1276        .unwrap_or(hash)
1277        .chars()
1278        .take(12)
1279        .collect()
1280}
1281
1282#[cfg(test)]
1283mod tests {
1284    use serde_json::json;
1285
1286    use super::*;
1287
1288    fn sample_run() -> RunRecord {
1289        RunRecord {
1290            type_name: "run_record".to_string(),
1291            id: "run_1".to_string(),
1292            workflow_id: "wf".to_string(),
1293            workflow_name: Some("Workflow".to_string()),
1294            task: "do work".to_string(),
1295            status: "completed".to_string(),
1296            started_at: "2026-01-01T00:00:00Z".to_string(),
1297            finished_at: Some("2026-01-01T00:00:02Z".to_string()),
1298            stages: vec![RunStageRecord {
1299                id: "stage_1".to_string(),
1300                node_id: "plan".to_string(),
1301                kind: "llm".to_string(),
1302                status: "completed".to_string(),
1303                outcome: "ok".to_string(),
1304                started_at: "2026-01-01T00:00:00Z".to_string(),
1305                finished_at: Some("2026-01-01T00:00:01Z".to_string()),
1306                visible_text: Some("done".to_string()),
1307                usage: Some(LlmUsageRecord {
1308                    input_tokens: 10,
1309                    output_tokens: 5,
1310                    total_duration_ms: 1000,
1311                    call_count: 1,
1312                    total_cost: 0.01,
1313                    models: vec!["model-a".to_string()],
1314                }),
1315                metadata: BTreeMap::from([
1316                    ("session_id".to_string(), json!("session_1")),
1317                    ("provider".to_string(), json!("test")),
1318                    ("model".to_string(), json!("model-a")),
1319                ]),
1320                ..RunStageRecord::default()
1321            }],
1322            trace_spans: vec![RunTraceSpanRecord {
1323                kind: "llm_call".to_string(),
1324                metadata: BTreeMap::from([
1325                    ("provider".to_string(), json!("test")),
1326                    ("model".to_string(), json!("model-a")),
1327                    ("input_tokens".to_string(), json!(10)),
1328                    ("output_tokens".to_string(), json!(5)),
1329                    ("cost_usd".to_string(), json!(0.01)),
1330                ]),
1331                ..RunTraceSpanRecord::default()
1332            }],
1333            transcript: Some(json!({
1334                "source": "inline",
1335                "summary": "short",
1336                "messages": [{"role": "assistant"}],
1337                "events": [{"kind": "output"}]
1338            })),
1339            ..RunRecord::default()
1340        }
1341    }
1342
1343    #[test]
1344    fn build_run_view_projects_stable_public_fields() {
1345        let view = build_run_view_with_path(&sample_run(), Some("runs/run_1.json"));
1346        assert_eq!(view.schema, RUN_VIEW_SCHEMA);
1347        assert_eq!(view.schema_version, RUN_VIEW_SCHEMA_VERSION);
1348        assert_eq!(view.run.run_id, "run_1");
1349        assert_eq!(view.run.session_id.as_deref(), Some("session_1"));
1350        assert_eq!(view.run.run_path.as_deref(), Some("runs/run_1.json"));
1351        assert_eq!(view.run.duration_ms, Some(2000));
1352        assert_eq!(view.visible_text.as_deref(), Some("done"));
1353        assert_eq!(view.transcript.message_count, 1);
1354        assert_eq!(view.usage.input_tokens, 10);
1355        assert_eq!(view.providers.len(), 1);
1356        assert!(view.projection.projection_id.starts_with("run_view:run_1:"));
1357        assert!(view.projection.projection_hash.is_some());
1358    }
1359
1360    #[test]
1361    fn build_run_view_tolerates_sparse_legacy_records() {
1362        let run = RunRecord {
1363            type_name: "run_record".to_string(),
1364            id: "legacy".to_string(),
1365            status: "failed".to_string(),
1366            ..RunRecord::default()
1367        };
1368        let view = build_run_view(&run);
1369        assert_eq!(view.run.run_id, "legacy");
1370        assert_eq!(view.run.session_id, None);
1371        assert!(!view.transcript.present);
1372        assert_eq!(
1373            view.failure.as_ref().map(|failure| failure.status.as_str()),
1374            Some("failed")
1375        );
1376    }
1377
1378    #[test]
1379    fn build_session_view_aggregates_runs() {
1380        let run = build_run_view(&sample_run());
1381        let view = build_session_view_from_run_views(
1382            vec![run],
1383            SessionViewOptions {
1384                session_id: Some("session_1".to_string()),
1385                last_event_id: Some(7),
1386                chain_root_hash: Some("sha256:abc".to_string()),
1387                ..SessionViewOptions::default()
1388            },
1389        );
1390        assert_eq!(view.schema, SESSION_VIEW_SCHEMA);
1391        assert_eq!(view.session.session_id.as_deref(), Some("session_1"));
1392        assert_eq!(view.session.last_event_id, Some(7));
1393        assert_eq!(view.session.chain_root_hash.as_deref(), Some("sha256:abc"));
1394        assert_eq!(view.history.len(), 1);
1395        assert_eq!(view.usage.call_count, 1);
1396        assert!(view
1397            .projection
1398            .projection_id
1399            .starts_with("session_view:session_1:"));
1400    }
1401
1402    #[test]
1403    fn build_run_view_summarizes_stage_only_transcripts() {
1404        let mut run = sample_run();
1405        run.transcript = None;
1406        run.stages[0].transcript = Some(json!({
1407            "summary": "stage transcript only",
1408            "messages": [{"role": "assistant"}, {"role": "tool"}],
1409            "events": [{"kind": "tool_result"}]
1410        }));
1411
1412        let view = build_run_view(&run);
1413        assert!(view.transcript.present);
1414        assert_eq!(view.transcript.source.as_deref(), Some("stages"));
1415        assert_eq!(view.transcript.message_count, 2);
1416        assert_eq!(view.transcript.event_count, 1);
1417        assert_eq!(
1418            view.transcript.summary.as_deref(),
1419            Some("plan: stage transcript only")
1420        );
1421    }
1422
1423    #[test]
1424    fn build_run_view_redacts_child_tasks_and_approvals() {
1425        let mut run = sample_run();
1426        run.child_runs.push(RunChildRecord {
1427            worker_id: "worker_1".to_string(),
1428            worker_name: "worker".to_string(),
1429            task: "inspect AKIAABCDEFGHIJKLMNOP".to_string(),
1430            ..RunChildRecord::default()
1431        });
1432        run.hitl_questions.push(RunHitlQuestionRecord {
1433            request_id: "approval_1".to_string(),
1434            prompt: "approve AKIAABCDEFGHIJKLMNOP".to_string(),
1435            ..RunHitlQuestionRecord::default()
1436        });
1437
1438        let view = build_run_view(&run);
1439        assert!(!view.run.child_runs[0].task.contains("AKIAABCDEFGHIJKLMNOP"));
1440        assert!(!view.pending.approvals[0]
1441            .prompt
1442            .contains("AKIAABCDEFGHIJKLMNOP"));
1443    }
1444}