Skip to main content

harn_vm/
session_timeline.rs

1//! Session timeline projection for client-facing observability.
2//!
3//! This module does not own persistence. It projects the existing run record
4//! spans and event-log topics into one stable, redacted shape that clients can
5//! query or subscribe to without learning Harn's storage internals.
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11use futures::stream::{self, BoxStream};
12use futures::StreamExt;
13use serde::{Deserialize, Serialize};
14
15use crate::event_log::{AnyEventLog, EventId, EventLog, LogError, LogEvent, Topic};
16use crate::orchestration::{load_run_record, RunRecord, RunTraceSpanRecord};
17use crate::redact::{current_policy, RedactionPolicy};
18
19pub const SESSION_TIMELINE_SCHEMA_VERSION: u32 = 1;
20pub const SESSION_TIMELINE_QUERY_METHOD: &str = "harn.session_timeline.query";
21pub const SESSION_TIMELINE_SUBSCRIBE_METHOD: &str = "harn.session_timeline.subscribe";
22pub const SESSION_TIMELINE_UNSUBSCRIBE_METHOD: &str = "harn.session_timeline.unsubscribe";
23pub const SESSION_TIMELINE_UPDATE_METHOD: &str = "harn.session_timeline.update";
24
25const DEFAULT_QUERY_LIMIT: usize = 1024;
26const READ_BATCH_SIZE: usize = 256;
27
28#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
29#[serde(default, rename_all = "camelCase")]
30pub struct SessionTimelineQuery {
31    #[serde(alias = "session_id")]
32    pub session_id: Option<String>,
33    #[serde(alias = "run_id")]
34    pub run_id: Option<String>,
35    #[serde(alias = "run_path")]
36    pub run_path: Option<String>,
37    #[serde(alias = "project_id")]
38    pub project_id: Option<String>,
39    #[serde(alias = "from_cursor")]
40    pub from_cursor: SessionTimelineCursor,
41    pub limit: Option<usize>,
42}
43
44impl SessionTimelineQuery {
45    pub fn for_session(session_id: impl Into<String>) -> Self {
46        Self {
47            session_id: Some(session_id.into()),
48            ..Self::default()
49        }
50    }
51
52    fn limit(&self) -> usize {
53        self.limit.unwrap_or(DEFAULT_QUERY_LIMIT).max(1)
54    }
55
56    fn topics(&self) -> Vec<Topic> {
57        let mut topics = Vec::new();
58        if let Some(session_id) = self.session_id.as_deref() {
59            topics.push(agent_events_topic(session_id));
60        }
61        topics.push(static_topic(crate::channels::CHANNEL_TRANSCRIPT_TOPIC));
62        topics.push(static_topic(crate::channels::CHANNEL_AUDIT_TOPIC));
63        topics
64    }
65}
66
67#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
68#[serde(default)]
69pub struct SessionTimelineCursor {
70    pub topics: BTreeMap<String, EventId>,
71}
72
73impl SessionTimelineCursor {
74    pub fn event_id_for(&self, topic: &Topic) -> Option<EventId> {
75        self.topics.get(topic.as_str()).copied()
76    }
77
78    fn bump(&mut self, topic: &str, event_id: EventId) {
79        self.topics
80            .entry(topic.to_string())
81            .and_modify(|cursor| *cursor = (*cursor).max(event_id))
82            .or_insert(event_id);
83    }
84}
85
86#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
87#[serde(rename_all = "camelCase")]
88pub struct SessionTimelineSnapshot {
89    pub schema_version: u32,
90    pub query: SessionTimelineQuery,
91    pub cursor: SessionTimelineCursor,
92    pub nodes: Vec<SessionTimelineNode>,
93}
94
95#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
96#[serde(rename_all = "camelCase")]
97pub struct SessionTimelineUpdate {
98    pub schema_version: u32,
99    pub cursor: SessionTimelineCursor,
100    pub node: SessionTimelineNode,
101}
102
103#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
104#[serde(rename_all = "camelCase")]
105pub struct SessionTimelineNode {
106    pub id: String,
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub parent_id: Option<String>,
109    #[serde(default, skip_serializing_if = "Vec::is_empty")]
110    pub children: Vec<String>,
111    pub category: String,
112    pub kind: String,
113    pub name: String,
114    pub status: String,
115    #[serde(skip_serializing_if = "Option::is_none")]
116    pub trace_id: Option<String>,
117    #[serde(skip_serializing_if = "Option::is_none")]
118    pub span_id: Option<String>,
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub occurred_at_ms: Option<i64>,
121    #[serde(skip_serializing_if = "Option::is_none")]
122    pub start_ms: Option<u64>,
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub duration_ms: Option<u64>,
125    #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
126    pub attributes: serde_json::Value,
127    #[serde(default, skip_serializing_if = "Vec::is_empty")]
128    pub references: Vec<SessionTimelineReference>,
129    #[serde(default, skip_serializing_if = "Vec::is_empty")]
130    pub links: Vec<SessionTimelineLink>,
131    pub order: u64,
132}
133
134#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
135#[serde(rename_all = "camelCase")]
136pub struct SessionTimelineReference {
137    pub kind: String,
138    #[serde(skip_serializing_if = "Option::is_none")]
139    pub id: Option<String>,
140    #[serde(skip_serializing_if = "Option::is_none")]
141    pub topic: Option<String>,
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub event_id: Option<EventId>,
144}
145
146#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
147#[serde(rename_all = "camelCase")]
148pub struct SessionTimelineLink {
149    pub kind: String,
150    #[serde(skip_serializing_if = "Option::is_none")]
151    pub target_id: Option<String>,
152    #[serde(skip_serializing_if = "Option::is_none")]
153    pub trace_id: Option<String>,
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub span_id: Option<String>,
156    #[serde(skip_serializing_if = "Option::is_none")]
157    pub event_id: Option<String>,
158}
159
160#[derive(Debug)]
161pub enum SessionTimelineError {
162    EventLog(LogError),
163    RunRecord(String),
164}
165
166impl std::fmt::Display for SessionTimelineError {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        match self {
169            Self::EventLog(error) => error.fmt(f),
170            Self::RunRecord(message) => f.write_str(message),
171        }
172    }
173}
174
175impl std::error::Error for SessionTimelineError {}
176
177impl From<LogError> for SessionTimelineError {
178    fn from(error: LogError) -> Self {
179        Self::EventLog(error)
180    }
181}
182
183#[derive(Clone)]
184struct TimelineDraft {
185    sort_ms: i128,
186    node: SessionTimelineNode,
187}
188
189pub fn agent_events_topic(session_id: &str) -> Topic {
190    Topic::new(format!(
191        "observability.agent_events.{}",
192        crate::event_log::sanitize_topic_component(session_id)
193    ))
194    .expect("sanitized session id should produce a valid topic")
195}
196
197pub fn timeline_from_run_record(
198    run: &RunRecord,
199    query: SessionTimelineQuery,
200) -> SessionTimelineSnapshot {
201    let policy = current_policy();
202    let mut builder = TimelineBuilder::new(query.clone());
203    if run_matches_query(run, &query) {
204        builder.add_run_spans(run, &policy);
205    }
206    builder.finish()
207}
208
209pub async fn query_session_timeline(
210    log: Option<&AnyEventLog>,
211    run: Option<&RunRecord>,
212    query: SessionTimelineQuery,
213) -> Result<SessionTimelineSnapshot, SessionTimelineError> {
214    let policy = current_policy();
215    let mut builder = TimelineBuilder::new(query.clone());
216    if let Some(run) = run.filter(|run| run_matches_query(run, &query)) {
217        builder.add_run_spans(run, &policy);
218    } else if run.is_none() {
219        if let Some(run) = load_run_for_timeline(&query)? {
220            if run_matches_query(&run, &query) {
221                builder.add_run_spans(&run, &policy);
222            }
223        }
224    }
225    if let Some(log) = log {
226        builder.add_event_log(log, &policy).await?;
227    }
228    Ok(builder.finish())
229}
230
231pub async fn subscribe_session_timeline(
232    log: Arc<AnyEventLog>,
233    query: SessionTimelineQuery,
234) -> Result<
235    BoxStream<'static, Result<SessionTimelineUpdate, SessionTimelineError>>,
236    SessionTimelineError,
237> {
238    let policy = current_policy();
239    let mut streams = Vec::new();
240    for topic in query.topics() {
241        let topic_name = topic.as_str().to_string();
242        let from_cursor = query.from_cursor.event_id_for(&topic);
243        let events = log.clone().subscribe(&topic, from_cursor).await?;
244        let query = query.clone();
245        let policy = policy.clone();
246        streams.push(Box::pin(events.filter_map(move |item| {
247            let topic_name = topic_name.clone();
248            let query = query.clone();
249            let policy = policy.clone();
250            async move {
251                match item {
252                    Ok((event_id, event)) => {
253                        event_update(&query, &policy, &topic_name, event_id, event).map(Ok)
254                    }
255                    Err(error) => Some(Err(SessionTimelineError::EventLog(error))),
256                }
257            }
258        }))
259            as BoxStream<
260                'static,
261                Result<SessionTimelineUpdate, SessionTimelineError>,
262            >);
263    }
264    Ok(Box::pin(stream::select_all(streams)))
265}
266
267struct TimelineBuilder {
268    query: SessionTimelineQuery,
269    cursor: SessionTimelineCursor,
270    nodes: Vec<TimelineDraft>,
271}
272
273impl TimelineBuilder {
274    fn new(query: SessionTimelineQuery) -> Self {
275        Self {
276            cursor: query.from_cursor.clone(),
277            query,
278            nodes: Vec::new(),
279        }
280    }
281
282    fn add_run_spans(&mut self, run: &RunRecord, policy: &RedactionPolicy) {
283        for span in &run.trace_spans {
284            if !span_matches_query(span, &self.query) {
285                continue;
286            }
287            let node = span_node(span, policy);
288            self.nodes.push(TimelineDraft {
289                sort_ms: i128::from(span.start_ms),
290                node,
291            });
292        }
293    }
294
295    async fn add_event_log(
296        &mut self,
297        log: &AnyEventLog,
298        policy: &RedactionPolicy,
299    ) -> Result<(), SessionTimelineError> {
300        for topic in self.query.topics() {
301            let topic_name = topic.as_str().to_string();
302            let mut from = self.query.from_cursor.event_id_for(&topic);
303            loop {
304                let batch = log.read_range(&topic, from, READ_BATCH_SIZE).await?;
305                let batch_len = batch.len();
306                for (event_id, event) in batch {
307                    from = Some(event_id);
308                    self.cursor.bump(&topic_name, event_id);
309                    if let Some(node) =
310                        event_node(&self.query, policy, &topic_name, event_id, event)
311                    {
312                        let sort_ms = node
313                            .occurred_at_ms
314                            .map(i128::from)
315                            .or_else(|| node.start_ms.map(i128::from))
316                            .unwrap_or(i128::from(event_id));
317                        self.nodes.push(TimelineDraft { sort_ms, node });
318                    }
319                }
320                if batch_len < READ_BATCH_SIZE || self.nodes.len() >= self.query.limit() {
321                    break;
322                }
323            }
324        }
325        Ok(())
326    }
327
328    fn finish(mut self) -> SessionTimelineSnapshot {
329        self.nodes.sort_by(|left, right| {
330            left.sort_ms
331                .cmp(&right.sort_ms)
332                .then_with(|| left.node.id.cmp(&right.node.id))
333        });
334        self.nodes.truncate(self.query.limit());
335
336        let visible_ids: BTreeSet<String> = self
337            .nodes
338            .iter()
339            .map(|draft| draft.node.id.clone())
340            .collect();
341        let mut children_by_parent: BTreeMap<String, Vec<String>> = BTreeMap::new();
342        for draft in &self.nodes {
343            let Some(parent_id) = draft.node.parent_id.as_ref() else {
344                continue;
345            };
346            if visible_ids.contains(parent_id) {
347                children_by_parent
348                    .entry(parent_id.clone())
349                    .or_default()
350                    .push(draft.node.id.clone());
351            }
352        }
353
354        let nodes = self
355            .nodes
356            .into_iter()
357            .enumerate()
358            .map(|(index, mut draft)| {
359                draft.node.order = index as u64;
360                draft.node.children = children_by_parent
361                    .remove(&draft.node.id)
362                    .unwrap_or_default();
363                draft.node
364            })
365            .collect();
366
367        SessionTimelineSnapshot {
368            schema_version: SESSION_TIMELINE_SCHEMA_VERSION,
369            query: self.query,
370            cursor: self.cursor,
371            nodes,
372        }
373    }
374}
375
376fn event_update(
377    query: &SessionTimelineQuery,
378    policy: &RedactionPolicy,
379    topic: &str,
380    event_id: EventId,
381    event: LogEvent,
382) -> Option<SessionTimelineUpdate> {
383    let mut node = event_node(query, policy, topic, event_id, event)?;
384    node.order = 0;
385    let mut cursor = SessionTimelineCursor::default();
386    cursor.bump(topic, event_id);
387    Some(SessionTimelineUpdate {
388        schema_version: SESSION_TIMELINE_SCHEMA_VERSION,
389        cursor,
390        node,
391    })
392}
393
394fn event_node(
395    query: &SessionTimelineQuery,
396    policy: &RedactionPolicy,
397    topic: &str,
398    event_id: EventId,
399    mut event: LogEvent,
400) -> Option<SessionTimelineNode> {
401    event.redact_in_place(policy);
402    if topic.starts_with("observability.agent_events.") {
403        return agent_event_node(query, topic, event_id, event);
404    }
405    if topic == crate::channels::CHANNEL_TRANSCRIPT_TOPIC {
406        return channel_lifecycle_node(query, topic, event_id, event);
407    }
408    if topic == crate::channels::CHANNEL_AUDIT_TOPIC {
409        return channel_audit_node(query, topic, event_id, event);
410    }
411    None
412}
413
414fn span_node(span: &RunTraceSpanRecord, policy: &RedactionPolicy) -> SessionTimelineNode {
415    let mut attributes = serde_json::json!(span.metadata);
416    policy.redact_json_in_place(&mut attributes);
417    let status = attributes
418        .get("status")
419        .and_then(serde_json::Value::as_str)
420        .unwrap_or("completed")
421        .to_string();
422    SessionTimelineNode {
423        id: span_node_id(&span.trace_id, span.span_id),
424        parent_id: span
425            .parent_id
426            .map(|parent| span_node_id(&span.trace_id, parent)),
427        children: Vec::new(),
428        category: "span".to_string(),
429        kind: span.kind.clone(),
430        name: span.name.clone(),
431        status,
432        trace_id: Some(span.trace_id.clone()),
433        span_id: Some(span.span_id.to_string()),
434        occurred_at_ms: None,
435        start_ms: Some(span.start_ms),
436        duration_ms: Some(span.duration_ms),
437        attributes,
438        references: vec![SessionTimelineReference {
439            kind: "run_trace_span".to_string(),
440            id: Some(span.span_id.to_string()),
441            topic: None,
442            event_id: None,
443        }],
444        links: span
445            .links
446            .iter()
447            .map(|link| SessionTimelineLink {
448                kind: link
449                    .attributes
450                    .get("harn.link.kind")
451                    .cloned()
452                    .unwrap_or_else(|| "span_link".to_string()),
453                target_id: Some(format!("span:{}:{}", link.trace_id, link.span_id)),
454                trace_id: Some(link.trace_id.clone()),
455                span_id: Some(link.span_id.clone()),
456                event_id: None,
457            })
458            .collect(),
459        order: 0,
460    }
461}
462
463fn agent_event_node(
464    query: &SessionTimelineQuery,
465    topic: &str,
466    event_id: EventId,
467    event: LogEvent,
468) -> Option<SessionTimelineNode> {
469    if !event_matches_query(
470        query,
471        &event.payload,
472        Some(&event.headers),
473        &["session_id"],
474        &[],
475    ) {
476        return None;
477    }
478    let event_value = event.payload.get("event").unwrap_or(&event.payload);
479    let event_type = event_value
480        .get("type")
481        .and_then(serde_json::Value::as_str)
482        .unwrap_or(event.kind.as_str());
483    let status = event_status(event_value).unwrap_or("observed").to_string();
484    Some(SessionTimelineNode {
485        id: format!("event:{topic}:{event_id}"),
486        parent_id: None,
487        children: Vec::new(),
488        category: "agent_event".to_string(),
489        kind: event.kind.clone(),
490        name: event_type.to_string(),
491        status,
492        trace_id: None,
493        span_id: None,
494        occurred_at_ms: Some(event.occurred_at_ms),
495        start_ms: None,
496        duration_ms: duration_ms(event_value),
497        attributes: event.payload,
498        references: vec![event_ref(topic, event_id)],
499        links: Vec::new(),
500        order: 0,
501    })
502}
503
504fn channel_lifecycle_node(
505    query: &SessionTimelineQuery,
506    topic: &str,
507    event_id: EventId,
508    event: LogEvent,
509) -> Option<SessionTimelineNode> {
510    if !event_matches_query(
511        query,
512        &event.payload,
513        Some(&event.headers),
514        &["session_id", "matched_in_session_id"],
515        &["pipeline_id"],
516    ) {
517        return None;
518    }
519    let channel_event_id = string_field(&event.payload, "event_id");
520    let trigger_id = string_field(&event.payload, "trigger_id");
521    let is_match = event.kind == crate::channels::CHANNEL_MATCH_TRANSCRIPT_KIND;
522    let id = if is_match {
523        format!(
524            "channel:{}:match:{}",
525            channel_event_id.as_deref().unwrap_or("unknown"),
526            trigger_id.as_deref().unwrap_or("unknown")
527        )
528    } else {
529        format!(
530            "channel:{}:emit",
531            channel_event_id.as_deref().unwrap_or("unknown")
532        )
533    };
534    let mut links: Vec<SessionTimelineLink> = if is_match {
535        channel_event_id
536            .as_ref()
537            .map(|event_id| SessionTimelineLink {
538                kind: "channel_emit".to_string(),
539                target_id: Some(format!("channel:{event_id}:emit")),
540                trace_id: None,
541                span_id: None,
542                event_id: Some(event_id.clone()),
543            })
544            .into_iter()
545            .collect()
546    } else {
547        Vec::new()
548    };
549    if is_match {
550        links.extend(channel_batch_links(&event.payload));
551    }
552    Some(SessionTimelineNode {
553        id,
554        parent_id: None,
555        children: Vec::new(),
556        category: "channel".to_string(),
557        kind: event.kind.clone(),
558        name: string_field(&event.payload, "name_resolved")
559            .or_else(|| string_field(&event.payload, "name"))
560            .unwrap_or_else(|| event.kind.clone()),
561        status: if event
562            .payload
563            .get("duplicate")
564            .and_then(serde_json::Value::as_bool)
565            .unwrap_or(false)
566        {
567            "duplicate".to_string()
568        } else {
569            "observed".to_string()
570        },
571        trace_id: None,
572        span_id: string_field(&event.payload, "span_id"),
573        occurred_at_ms: Some(event.occurred_at_ms),
574        start_ms: None,
575        duration_ms: None,
576        attributes: event.payload,
577        references: vec![event_ref(topic, event_id)],
578        links,
579        order: 0,
580    })
581}
582
583fn channel_audit_node(
584    query: &SessionTimelineQuery,
585    topic: &str,
586    event_id: EventId,
587    event: LogEvent,
588) -> Option<SessionTimelineNode> {
589    if !event_matches_query(
590        query,
591        &event.payload,
592        Some(&event.headers),
593        &["session_id", "matched_in_session_id"],
594        &["pipeline_id", "run_id"],
595    ) {
596        return None;
597    }
598    let channel_event_id = string_field(&event.payload, "event_id");
599    let trigger_id = string_field(&event.payload, "trigger_id");
600    let is_match = event.kind == crate::channels::CHANNEL_MATCH_RECEIPT_KIND;
601    let id = if is_match {
602        format!(
603            "channel_receipt:{}:match:{}",
604            channel_event_id.as_deref().unwrap_or("unknown"),
605            trigger_id.as_deref().unwrap_or("unknown")
606        )
607    } else {
608        format!(
609            "channel_receipt:{}:emit",
610            channel_event_id.as_deref().unwrap_or("unknown")
611        )
612    };
613    let mut links: Vec<SessionTimelineLink> = if is_match {
614        channel_event_id
615            .as_ref()
616            .map(|event_id| SessionTimelineLink {
617                kind: "channel_emit".to_string(),
618                target_id: Some(format!("channel_receipt:{event_id}:emit")),
619                trace_id: None,
620                span_id: None,
621                event_id: Some(event_id.clone()),
622            })
623            .into_iter()
624            .collect()
625    } else {
626        Vec::new()
627    };
628    if is_match {
629        links.extend(channel_batch_links(&event.payload));
630    }
631    Some(SessionTimelineNode {
632        id,
633        parent_id: None,
634        children: Vec::new(),
635        category: "channel_audit".to_string(),
636        kind: event.kind.clone(),
637        name: string_field(&event.payload, "name_resolved").unwrap_or_else(|| event.kind.clone()),
638        status: event
639            .payload
640            .get("handler_result")
641            .and_then(|value| value.get("status"))
642            .and_then(serde_json::Value::as_str)
643            .or_else(|| {
644                event.payload.get("inserted").and_then(|inserted| {
645                    if inserted.as_bool() == Some(false) {
646                        Some("duplicate")
647                    } else {
648                        None
649                    }
650                })
651            })
652            .unwrap_or("recorded")
653            .to_string(),
654        trace_id: None,
655        span_id: string_field(&event.payload, "span_id"),
656        occurred_at_ms: Some(event.occurred_at_ms),
657        start_ms: None,
658        duration_ms: None,
659        attributes: event.payload,
660        references: vec![event_ref(topic, event_id)],
661        links,
662        order: 0,
663    })
664}
665
666fn event_matches_query(
667    query: &SessionTimelineQuery,
668    payload: &serde_json::Value,
669    headers: Option<&BTreeMap<String, String>>,
670    session_keys: &[&str],
671    run_keys: &[&str],
672) -> bool {
673    field_query_matches(query.session_id.as_deref(), payload, headers, session_keys)
674        && field_query_matches(query.run_id.as_deref(), payload, headers, run_keys)
675        && field_query_matches(
676            query.project_id.as_deref(),
677            payload,
678            headers,
679            &["project_id", "projectId", "workspace_id", "workspaceId"],
680        )
681}
682
683fn field_query_matches(
684    expected: Option<&str>,
685    payload: &serde_json::Value,
686    headers: Option<&BTreeMap<String, String>>,
687    keys: &[&str],
688) -> bool {
689    let Some(expected) = expected else {
690        return true;
691    };
692    if expected.is_empty() {
693        return true;
694    }
695    if keys.is_empty() {
696        return true;
697    }
698    keys.iter().any(|key| {
699        payload
700            .get(*key)
701            .and_then(serde_json::Value::as_str)
702            .is_some_and(|value| value == expected)
703            || payload
704                .get("event")
705                .and_then(|event| event.get(*key))
706                .and_then(serde_json::Value::as_str)
707                .is_some_and(|value| value == expected)
708            || headers
709                .and_then(|headers| headers.get(*key))
710                .is_some_and(|value| value == expected)
711    })
712}
713
714fn span_matches_query(span: &RunTraceSpanRecord, query: &SessionTimelineQuery) -> bool {
715    if let Some(session_id) = query.session_id.as_deref() {
716        let has_session_attr = span.metadata.contains_key("session_id")
717            || span.metadata.contains_key("agent_session_id");
718        if has_session_attr
719            && !metadata_matches(
720                &span.metadata,
721                &["session_id", "agent_session_id"],
722                session_id,
723            )
724        {
725            return false;
726        }
727    }
728    true
729}
730
731fn run_matches_query(run: &RunRecord, query: &SessionTimelineQuery) -> bool {
732    if let Some(run_id) = query.run_id.as_deref() {
733        if run.id != run_id {
734            return false;
735        }
736    }
737    if let Some(project_id) = query.project_id.as_deref() {
738        if !metadata_matches(&run.metadata, &["project_id", "projectId"], project_id) {
739            return false;
740        }
741    }
742    true
743}
744
745fn metadata_matches(
746    metadata: &BTreeMap<String, serde_json::Value>,
747    keys: &[&str],
748    expected: &str,
749) -> bool {
750    keys.iter().any(|key| {
751        metadata
752            .get(*key)
753            .and_then(serde_json::Value::as_str)
754            .is_some_and(|value| value == expected)
755    })
756}
757
758fn event_status(value: &serde_json::Value) -> Option<&str> {
759    value
760        .get("status")
761        .and_then(serde_json::Value::as_str)
762        .or_else(|| value.get("verdict").and_then(serde_json::Value::as_str))
763}
764
765fn duration_ms(value: &serde_json::Value) -> Option<u64> {
766    value
767        .get("duration_ms")
768        .or_else(|| value.get("judge_duration_ms"))
769        .and_then(serde_json::Value::as_u64)
770}
771
772fn string_field(value: &serde_json::Value, key: &str) -> Option<String> {
773    let value = value.get(key)?;
774    if let Some(text) = value.as_str() {
775        if !text.is_empty() {
776            return Some(text.to_string());
777        }
778        return None;
779    }
780    value.as_u64().map(|number| number.to_string())
781}
782
783fn channel_batch_links(payload: &serde_json::Value) -> Vec<SessionTimelineLink> {
784    payload
785        .get("batch")
786        .and_then(|batch| batch.get("constituent_event_ids"))
787        .and_then(serde_json::Value::as_array)
788        .into_iter()
789        .flatten()
790        .filter_map(|value| value.as_str())
791        .map(|event_id| SessionTimelineLink {
792            kind: "channel_batch_member".to_string(),
793            target_id: None,
794            trace_id: None,
795            span_id: None,
796            event_id: Some(event_id.to_string()),
797        })
798        .collect()
799}
800
801fn span_node_id(trace_id: &str, span_id: u64) -> String {
802    format!("span:{trace_id}:{span_id}")
803}
804
805fn event_ref(topic: &str, event_id: EventId) -> SessionTimelineReference {
806    SessionTimelineReference {
807        kind: "event_log".to_string(),
808        id: None,
809        topic: Some(topic.to_string()),
810        event_id: Some(event_id),
811    }
812}
813
814fn static_topic(topic: &str) -> Topic {
815    Topic::new(topic).expect("static session timeline topic should be valid")
816}
817
818fn load_run_for_timeline(
819    query: &SessionTimelineQuery,
820) -> Result<Option<RunRecord>, SessionTimelineError> {
821    if let Some(path) = query
822        .run_path
823        .as_deref()
824        .map(str::trim)
825        .filter(|path| !path.is_empty())
826    {
827        return load_run_record_for_timeline(Path::new(path), true);
828    }
829
830    let Some(run_id) = query
831        .run_id
832        .as_deref()
833        .map(str::trim)
834        .filter(|run_id| !run_id.is_empty())
835    else {
836        return Ok(None);
837    };
838    let path = default_run_record_path(run_id)?;
839    load_run_record_for_timeline(&path, false)
840}
841
842fn load_run_record_for_timeline(
843    path: &Path,
844    explicit: bool,
845) -> Result<Option<RunRecord>, SessionTimelineError> {
846    if !path.exists() {
847        if explicit {
848            return Err(SessionTimelineError::RunRecord(format!(
849                "session timeline run record not found: {}",
850                path.display()
851            )));
852        }
853        return Ok(None);
854    }
855    load_run_record(path).map(Some).map_err(|error| {
856        SessionTimelineError::RunRecord(format!(
857            "failed to load session timeline run record {}: {error}",
858            path.display()
859        ))
860    })
861}
862
863fn default_run_record_path(run_id: &str) -> Result<PathBuf, SessionTimelineError> {
864    if run_id == "." || run_id == ".." || run_id.contains('/') || run_id.contains('\\') {
865        return Err(SessionTimelineError::RunRecord(format!(
866            "session timeline runId is not a valid default run-record filename: {run_id}"
867        )));
868    }
869    let base = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
870    Ok(crate::runtime_paths::run_root(&base).join(format!("{run_id}.json")))
871}
872
873#[cfg(test)]
874#[path = "session_timeline_tests.rs"]
875mod session_timeline_tests;