1use std::collections::{BTreeMap, BTreeSet};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11use futures::stream::{self, BoxStream};
12use futures::StreamExt;
13use serde::{Deserialize, Serialize};
14
15use crate::event_log::{AnyEventLog, EventId, EventLog, LogError, LogEvent, Topic};
16use crate::orchestration::{load_run_record, RunRecord, RunTraceSpanRecord};
17use crate::redact::{current_policy, RedactionPolicy};
18
/// Version written into the `schema_version` field of snapshots and updates.
pub const SESSION_TIMELINE_SCHEMA_VERSION: u32 = 1;
/// Method-name string for timeline queries.
// NOTE(review): presumably consumed by an RPC dispatch layer; not referenced in this file's visible code.
pub const SESSION_TIMELINE_QUERY_METHOD: &str = "harn.session_timeline.query";
/// Method-name string for starting a timeline subscription.
pub const SESSION_TIMELINE_SUBSCRIBE_METHOD: &str = "harn.session_timeline.subscribe";
/// Method-name string for cancelling a timeline subscription.
pub const SESSION_TIMELINE_UNSUBSCRIBE_METHOD: &str = "harn.session_timeline.unsubscribe";
/// Method-name string for timeline update notifications.
pub const SESSION_TIMELINE_UPDATE_METHOD: &str = "harn.session_timeline.update";

/// Node limit used when a query does not specify `limit`.
const DEFAULT_QUERY_LIMIT: usize = 1024;
/// Number of events requested from the event log per `read_range` call.
const READ_BATCH_SIZE: usize = 256;
27
/// Filter and resume parameters for a session timeline request.
///
/// Fields are (de)serialized with camelCase names; the snake_case spellings
/// are also accepted on input through the `alias` attributes, and any missing
/// field takes its `Default` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default, rename_all = "camelCase")]
pub struct SessionTimelineQuery {
    /// Session filter; when set it also adds the per-session agent-events
    /// topic to the topics that are read.
    #[serde(alias = "session_id")]
    pub session_id: Option<String>,
    /// Run filter; also used to locate the default run-record file when no
    /// `run_path` is given.
    #[serde(alias = "run_id")]
    pub run_id: Option<String>,
    /// Explicit run-record path; takes precedence over `run_id` when loading.
    #[serde(alias = "run_path")]
    pub run_path: Option<String>,
    /// Project filter matched against run metadata and event payloads/headers.
    #[serde(alias = "project_id")]
    pub project_id: Option<String>,
    /// Per-topic event ids passed to the event log as the starting position.
    #[serde(alias = "from_cursor")]
    pub from_cursor: SessionTimelineCursor,
    /// Maximum number of nodes in a snapshot; absent means
    /// `DEFAULT_QUERY_LIMIT`, and the effective value is never below 1.
    pub limit: Option<usize>,
}
43
44impl SessionTimelineQuery {
45 pub fn for_session(session_id: impl Into<String>) -> Self {
46 Self {
47 session_id: Some(session_id.into()),
48 ..Self::default()
49 }
50 }
51
52 fn limit(&self) -> usize {
53 self.limit.unwrap_or(DEFAULT_QUERY_LIMIT).max(1)
54 }
55
56 fn topics(&self) -> Vec<Topic> {
57 let mut topics = Vec::new();
58 if let Some(session_id) = self.session_id.as_deref() {
59 topics.push(agent_events_topic(session_id));
60 }
61 topics.push(static_topic(crate::channels::CHANNEL_TRANSCRIPT_TOPIC));
62 topics.push(static_topic(crate::channels::CHANNEL_AUDIT_TOPIC));
63 topics
64 }
65}
66
/// Per-topic read positions for a timeline.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct SessionTimelineCursor {
    /// Maps a topic name to the highest event id recorded for that topic.
    pub topics: BTreeMap<String, EventId>,
}
72
73impl SessionTimelineCursor {
74 pub fn event_id_for(&self, topic: &Topic) -> Option<EventId> {
75 self.topics.get(topic.as_str()).copied()
76 }
77
78 fn bump(&mut self, topic: &str, event_id: EventId) {
79 self.topics
80 .entry(topic.to_string())
81 .and_modify(|cursor| *cursor = (*cursor).max(event_id))
82 .or_insert(event_id);
83 }
84}
85
/// Result of a timeline query: the ordered nodes plus the cursor reached
/// while reading the event log.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct SessionTimelineSnapshot {
    /// Always `SESSION_TIMELINE_SCHEMA_VERSION` when built by this module.
    pub schema_version: u32,
    /// The query this snapshot answers.
    pub query: SessionTimelineQuery,
    /// Starts as the query's `from_cursor` and is bumped for every event read.
    pub cursor: SessionTimelineCursor,
    /// Nodes sorted by timestamp, then id, truncated to the query limit.
    pub nodes: Vec<SessionTimelineNode>,
}
94
/// A single node produced by a live subscription.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct SessionTimelineUpdate {
    /// Always `SESSION_TIMELINE_SCHEMA_VERSION` when built by this module.
    pub schema_version: u32,
    /// Contains only the topic and event id of the event behind `node`.
    pub cursor: SessionTimelineCursor,
    /// The node built from the event; its `order` is 0 in updates.
    pub node: SessionTimelineNode,
}
102
/// One entry in a session timeline, built either from a run trace span or
/// from an event-log event.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct SessionTimelineNode {
    /// Node identifier; this module builds ids prefixed with `span:`,
    /// `event:`, `channel:` or `channel_receipt:`.
    pub id: String,
    /// Id of the parent node; set for spans that have a parent span, `None`
    /// for event-derived nodes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// Ids of nodes in the same snapshot whose `parent_id` is this node;
    /// filled in when the snapshot is finished.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub children: Vec<String>,
    /// Source category: `span`, `agent_event`, `channel` or `channel_audit`.
    pub category: String,
    /// Span kind or event kind, copied from the source record.
    pub kind: String,
    /// Display name taken from the span or event.
    pub name: String,
    /// Status string derived from the source (e.g. `completed`, `observed`).
    pub status: String,
    /// Trace id; set for span nodes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    /// Span id, rendered as a string.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_id: Option<String>,
    /// The event's `occurred_at_ms`; `None` for span nodes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub occurred_at_ms: Option<i64>,
    /// The span's `start_ms`; `None` for event-derived nodes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub start_ms: Option<u64>,
    /// Duration in milliseconds when the source provides one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub duration_ms: Option<u64>,
    /// Span metadata or event payload after redaction.
    #[serde(default, skip_serializing_if = "serde_json::Value::is_null")]
    pub attributes: serde_json::Value,
    /// Pointers back to the records this node was built from.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub references: Vec<SessionTimelineReference>,
    /// Relations to other spans or channel events.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub links: Vec<SessionTimelineLink>,
    /// Position within a sorted snapshot; 0 in streamed updates.
    pub order: u64,
}
133
/// Pointer from a node back to the record it was derived from.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct SessionTimelineReference {
    /// Reference type; this module emits `run_trace_span` and `event_log`.
    pub kind: String,
    /// Source identifier; the span id for `run_trace_span` references.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    /// Event-log topic for `event_log` references.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub topic: Option<String>,
    /// Event-log event id for `event_log` references.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_id: Option<EventId>,
}
145
/// Relation from one node to another span or channel event.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct SessionTimelineLink {
    /// Link type, e.g. `span_link`, `channel_emit` or `channel_batch_member`.
    pub kind: String,
    /// Id of the node the link points at, when one can be derived.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target_id: Option<String>,
    /// Trace id of the linked span.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    /// Span id of the linked span.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub span_id: Option<String>,
    /// Channel event id taken from the event payload (a string, unlike the
    /// event-log `EventId` used in references).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_id: Option<String>,
}
159
/// Failures that can occur while building or streaming a session timeline.
#[derive(Debug)]
pub enum SessionTimelineError {
    /// The event log reported an error while reading or subscribing.
    EventLog(LogError),
    /// A run record could not be located, validated or loaded; carries the
    /// human-readable message.
    RunRecord(String),
}
165
166impl std::fmt::Display for SessionTimelineError {
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 match self {
169 Self::EventLog(error) => error.fmt(f),
170 Self::RunRecord(message) => f.write_str(message),
171 }
172 }
173}
174
// Marker impl: relies on the default `Error` methods, so `source()` returns `None`.
impl std::error::Error for SessionTimelineError {}
176
177impl From<LogError> for SessionTimelineError {
178 fn from(error: LogError) -> Self {
179 Self::EventLog(error)
180 }
181}
182
/// A node paired with the key used to order it in a snapshot.
#[derive(Clone)]
struct TimelineDraft {
    /// Sort key: the span `start_ms`, or for events `occurred_at_ms`, falling
    /// back to `start_ms` and then to the event id.
    sort_ms: i128,
    /// The node that will be emitted.
    node: SessionTimelineNode,
}
188
189pub fn agent_events_topic(session_id: &str) -> Topic {
190 Topic::new(format!(
191 "observability.agent_events.{}",
192 crate::event_log::sanitize_topic_component(session_id)
193 ))
194 .expect("sanitized session id should produce a valid topic")
195}
196
197pub fn timeline_from_run_record(
198 run: &RunRecord,
199 query: SessionTimelineQuery,
200) -> SessionTimelineSnapshot {
201 let policy = current_policy();
202 let mut builder = TimelineBuilder::new(query.clone());
203 if run_matches_query(run, &query) {
204 builder.add_run_spans(run, &policy);
205 }
206 builder.finish()
207}
208
209pub async fn query_session_timeline(
210 log: Option<&AnyEventLog>,
211 run: Option<&RunRecord>,
212 query: SessionTimelineQuery,
213) -> Result<SessionTimelineSnapshot, SessionTimelineError> {
214 let policy = current_policy();
215 let mut builder = TimelineBuilder::new(query.clone());
216 if let Some(run) = run.filter(|run| run_matches_query(run, &query)) {
217 builder.add_run_spans(run, &policy);
218 } else if run.is_none() {
219 if let Some(run) = load_run_for_timeline(&query)? {
220 if run_matches_query(&run, &query) {
221 builder.add_run_spans(&run, &policy);
222 }
223 }
224 }
225 if let Some(log) = log {
226 builder.add_event_log(log, &policy).await?;
227 }
228 Ok(builder.finish())
229}
230
231pub async fn subscribe_session_timeline(
232 log: Arc<AnyEventLog>,
233 query: SessionTimelineQuery,
234) -> Result<
235 BoxStream<'static, Result<SessionTimelineUpdate, SessionTimelineError>>,
236 SessionTimelineError,
237> {
238 let policy = current_policy();
239 let mut streams = Vec::new();
240 for topic in query.topics() {
241 let topic_name = topic.as_str().to_string();
242 let from_cursor = query.from_cursor.event_id_for(&topic);
243 let events = log.clone().subscribe(&topic, from_cursor).await?;
244 let query = query.clone();
245 let policy = policy.clone();
246 streams.push(Box::pin(events.filter_map(move |item| {
247 let topic_name = topic_name.clone();
248 let query = query.clone();
249 let policy = policy.clone();
250 async move {
251 match item {
252 Ok((event_id, event)) => {
253 event_update(&query, &policy, &topic_name, event_id, event).map(Ok)
254 }
255 Err(error) => Some(Err(SessionTimelineError::EventLog(error))),
256 }
257 }
258 }))
259 as BoxStream<
260 'static,
261 Result<SessionTimelineUpdate, SessionTimelineError>,
262 >);
263 }
264 Ok(Box::pin(stream::select_all(streams)))
265}
266
/// Accumulates draft nodes from run spans and the event log before they are
/// sorted and emitted as a snapshot.
struct TimelineBuilder {
    /// The query being answered; drives filtering, topics and the limit.
    query: SessionTimelineQuery,
    /// Starts as the query's `from_cursor`; bumped for every event read.
    cursor: SessionTimelineCursor,
    /// Unsorted drafts collected so far.
    nodes: Vec<TimelineDraft>,
}
272
273impl TimelineBuilder {
274 fn new(query: SessionTimelineQuery) -> Self {
275 Self {
276 cursor: query.from_cursor.clone(),
277 query,
278 nodes: Vec::new(),
279 }
280 }
281
282 fn add_run_spans(&mut self, run: &RunRecord, policy: &RedactionPolicy) {
283 for span in &run.trace_spans {
284 if !span_matches_query(span, &self.query) {
285 continue;
286 }
287 let node = span_node(span, policy);
288 self.nodes.push(TimelineDraft {
289 sort_ms: i128::from(span.start_ms),
290 node,
291 });
292 }
293 }
294
295 async fn add_event_log(
296 &mut self,
297 log: &AnyEventLog,
298 policy: &RedactionPolicy,
299 ) -> Result<(), SessionTimelineError> {
300 for topic in self.query.topics() {
301 let topic_name = topic.as_str().to_string();
302 let mut from = self.query.from_cursor.event_id_for(&topic);
303 loop {
304 let batch = log.read_range(&topic, from, READ_BATCH_SIZE).await?;
305 let batch_len = batch.len();
306 for (event_id, event) in batch {
307 from = Some(event_id);
308 self.cursor.bump(&topic_name, event_id);
309 if let Some(node) =
310 event_node(&self.query, policy, &topic_name, event_id, event)
311 {
312 let sort_ms = node
313 .occurred_at_ms
314 .map(i128::from)
315 .or_else(|| node.start_ms.map(i128::from))
316 .unwrap_or(i128::from(event_id));
317 self.nodes.push(TimelineDraft { sort_ms, node });
318 }
319 }
320 if batch_len < READ_BATCH_SIZE || self.nodes.len() >= self.query.limit() {
321 break;
322 }
323 }
324 }
325 Ok(())
326 }
327
328 fn finish(mut self) -> SessionTimelineSnapshot {
329 self.nodes.sort_by(|left, right| {
330 left.sort_ms
331 .cmp(&right.sort_ms)
332 .then_with(|| left.node.id.cmp(&right.node.id))
333 });
334 self.nodes.truncate(self.query.limit());
335
336 let visible_ids: BTreeSet<String> = self
337 .nodes
338 .iter()
339 .map(|draft| draft.node.id.clone())
340 .collect();
341 let mut children_by_parent: BTreeMap<String, Vec<String>> = BTreeMap::new();
342 for draft in &self.nodes {
343 let Some(parent_id) = draft.node.parent_id.as_ref() else {
344 continue;
345 };
346 if visible_ids.contains(parent_id) {
347 children_by_parent
348 .entry(parent_id.clone())
349 .or_default()
350 .push(draft.node.id.clone());
351 }
352 }
353
354 let nodes = self
355 .nodes
356 .into_iter()
357 .enumerate()
358 .map(|(index, mut draft)| {
359 draft.node.order = index as u64;
360 draft.node.children = children_by_parent
361 .remove(&draft.node.id)
362 .unwrap_or_default();
363 draft.node
364 })
365 .collect();
366
367 SessionTimelineSnapshot {
368 schema_version: SESSION_TIMELINE_SCHEMA_VERSION,
369 query: self.query,
370 cursor: self.cursor,
371 nodes,
372 }
373 }
374}
375
376fn event_update(
377 query: &SessionTimelineQuery,
378 policy: &RedactionPolicy,
379 topic: &str,
380 event_id: EventId,
381 event: LogEvent,
382) -> Option<SessionTimelineUpdate> {
383 let mut node = event_node(query, policy, topic, event_id, event)?;
384 node.order = 0;
385 let mut cursor = SessionTimelineCursor::default();
386 cursor.bump(topic, event_id);
387 Some(SessionTimelineUpdate {
388 schema_version: SESSION_TIMELINE_SCHEMA_VERSION,
389 cursor,
390 node,
391 })
392}
393
394fn event_node(
395 query: &SessionTimelineQuery,
396 policy: &RedactionPolicy,
397 topic: &str,
398 event_id: EventId,
399 mut event: LogEvent,
400) -> Option<SessionTimelineNode> {
401 event.redact_in_place(policy);
402 if topic.starts_with("observability.agent_events.") {
403 return agent_event_node(query, topic, event_id, event);
404 }
405 if topic == crate::channels::CHANNEL_TRANSCRIPT_TOPIC {
406 return channel_lifecycle_node(query, topic, event_id, event);
407 }
408 if topic == crate::channels::CHANNEL_AUDIT_TOPIC {
409 return channel_audit_node(query, topic, event_id, event);
410 }
411 None
412}
413
414fn span_node(span: &RunTraceSpanRecord, policy: &RedactionPolicy) -> SessionTimelineNode {
415 let mut attributes = serde_json::json!(span.metadata);
416 policy.redact_json_in_place(&mut attributes);
417 let status = attributes
418 .get("status")
419 .and_then(serde_json::Value::as_str)
420 .unwrap_or("completed")
421 .to_string();
422 SessionTimelineNode {
423 id: span_node_id(&span.trace_id, span.span_id),
424 parent_id: span
425 .parent_id
426 .map(|parent| span_node_id(&span.trace_id, parent)),
427 children: Vec::new(),
428 category: "span".to_string(),
429 kind: span.kind.clone(),
430 name: span.name.clone(),
431 status,
432 trace_id: Some(span.trace_id.clone()),
433 span_id: Some(span.span_id.to_string()),
434 occurred_at_ms: None,
435 start_ms: Some(span.start_ms),
436 duration_ms: Some(span.duration_ms),
437 attributes,
438 references: vec![SessionTimelineReference {
439 kind: "run_trace_span".to_string(),
440 id: Some(span.span_id.to_string()),
441 topic: None,
442 event_id: None,
443 }],
444 links: span
445 .links
446 .iter()
447 .map(|link| SessionTimelineLink {
448 kind: link
449 .attributes
450 .get("harn.link.kind")
451 .cloned()
452 .unwrap_or_else(|| "span_link".to_string()),
453 target_id: Some(format!("span:{}:{}", link.trace_id, link.span_id)),
454 trace_id: Some(link.trace_id.clone()),
455 span_id: Some(link.span_id.clone()),
456 event_id: None,
457 })
458 .collect(),
459 order: 0,
460 }
461}
462
463fn agent_event_node(
464 query: &SessionTimelineQuery,
465 topic: &str,
466 event_id: EventId,
467 event: LogEvent,
468) -> Option<SessionTimelineNode> {
469 if !event_matches_query(
470 query,
471 &event.payload,
472 Some(&event.headers),
473 &["session_id"],
474 &[],
475 ) {
476 return None;
477 }
478 let event_value = event.payload.get("event").unwrap_or(&event.payload);
479 let event_type = event_value
480 .get("type")
481 .and_then(serde_json::Value::as_str)
482 .unwrap_or(event.kind.as_str());
483 let status = event_status(event_value).unwrap_or("observed").to_string();
484 Some(SessionTimelineNode {
485 id: format!("event:{topic}:{event_id}"),
486 parent_id: None,
487 children: Vec::new(),
488 category: "agent_event".to_string(),
489 kind: event.kind.clone(),
490 name: event_type.to_string(),
491 status,
492 trace_id: None,
493 span_id: None,
494 occurred_at_ms: Some(event.occurred_at_ms),
495 start_ms: None,
496 duration_ms: duration_ms(event_value),
497 attributes: event.payload,
498 references: vec![event_ref(topic, event_id)],
499 links: Vec::new(),
500 order: 0,
501 })
502}
503
504fn channel_lifecycle_node(
505 query: &SessionTimelineQuery,
506 topic: &str,
507 event_id: EventId,
508 event: LogEvent,
509) -> Option<SessionTimelineNode> {
510 if !event_matches_query(
511 query,
512 &event.payload,
513 Some(&event.headers),
514 &["session_id", "matched_in_session_id"],
515 &["pipeline_id"],
516 ) {
517 return None;
518 }
519 let channel_event_id = string_field(&event.payload, "event_id");
520 let trigger_id = string_field(&event.payload, "trigger_id");
521 let is_match = event.kind == crate::channels::CHANNEL_MATCH_TRANSCRIPT_KIND;
522 let id = if is_match {
523 format!(
524 "channel:{}:match:{}",
525 channel_event_id.as_deref().unwrap_or("unknown"),
526 trigger_id.as_deref().unwrap_or("unknown")
527 )
528 } else {
529 format!(
530 "channel:{}:emit",
531 channel_event_id.as_deref().unwrap_or("unknown")
532 )
533 };
534 let mut links: Vec<SessionTimelineLink> = if is_match {
535 channel_event_id
536 .as_ref()
537 .map(|event_id| SessionTimelineLink {
538 kind: "channel_emit".to_string(),
539 target_id: Some(format!("channel:{event_id}:emit")),
540 trace_id: None,
541 span_id: None,
542 event_id: Some(event_id.clone()),
543 })
544 .into_iter()
545 .collect()
546 } else {
547 Vec::new()
548 };
549 if is_match {
550 links.extend(channel_batch_links(&event.payload));
551 }
552 Some(SessionTimelineNode {
553 id,
554 parent_id: None,
555 children: Vec::new(),
556 category: "channel".to_string(),
557 kind: event.kind.clone(),
558 name: string_field(&event.payload, "name_resolved")
559 .or_else(|| string_field(&event.payload, "name"))
560 .unwrap_or_else(|| event.kind.clone()),
561 status: if event
562 .payload
563 .get("duplicate")
564 .and_then(serde_json::Value::as_bool)
565 .unwrap_or(false)
566 {
567 "duplicate".to_string()
568 } else {
569 "observed".to_string()
570 },
571 trace_id: None,
572 span_id: string_field(&event.payload, "span_id"),
573 occurred_at_ms: Some(event.occurred_at_ms),
574 start_ms: None,
575 duration_ms: None,
576 attributes: event.payload,
577 references: vec![event_ref(topic, event_id)],
578 links,
579 order: 0,
580 })
581}
582
583fn channel_audit_node(
584 query: &SessionTimelineQuery,
585 topic: &str,
586 event_id: EventId,
587 event: LogEvent,
588) -> Option<SessionTimelineNode> {
589 if !event_matches_query(
590 query,
591 &event.payload,
592 Some(&event.headers),
593 &["session_id", "matched_in_session_id"],
594 &["pipeline_id", "run_id"],
595 ) {
596 return None;
597 }
598 let channel_event_id = string_field(&event.payload, "event_id");
599 let trigger_id = string_field(&event.payload, "trigger_id");
600 let is_match = event.kind == crate::channels::CHANNEL_MATCH_RECEIPT_KIND;
601 let id = if is_match {
602 format!(
603 "channel_receipt:{}:match:{}",
604 channel_event_id.as_deref().unwrap_or("unknown"),
605 trigger_id.as_deref().unwrap_or("unknown")
606 )
607 } else {
608 format!(
609 "channel_receipt:{}:emit",
610 channel_event_id.as_deref().unwrap_or("unknown")
611 )
612 };
613 let mut links: Vec<SessionTimelineLink> = if is_match {
614 channel_event_id
615 .as_ref()
616 .map(|event_id| SessionTimelineLink {
617 kind: "channel_emit".to_string(),
618 target_id: Some(format!("channel_receipt:{event_id}:emit")),
619 trace_id: None,
620 span_id: None,
621 event_id: Some(event_id.clone()),
622 })
623 .into_iter()
624 .collect()
625 } else {
626 Vec::new()
627 };
628 if is_match {
629 links.extend(channel_batch_links(&event.payload));
630 }
631 Some(SessionTimelineNode {
632 id,
633 parent_id: None,
634 children: Vec::new(),
635 category: "channel_audit".to_string(),
636 kind: event.kind.clone(),
637 name: string_field(&event.payload, "name_resolved").unwrap_or_else(|| event.kind.clone()),
638 status: event
639 .payload
640 .get("handler_result")
641 .and_then(|value| value.get("status"))
642 .and_then(serde_json::Value::as_str)
643 .or_else(|| {
644 event.payload.get("inserted").and_then(|inserted| {
645 if inserted.as_bool() == Some(false) {
646 Some("duplicate")
647 } else {
648 None
649 }
650 })
651 })
652 .unwrap_or("recorded")
653 .to_string(),
654 trace_id: None,
655 span_id: string_field(&event.payload, "span_id"),
656 occurred_at_ms: Some(event.occurred_at_ms),
657 start_ms: None,
658 duration_ms: None,
659 attributes: event.payload,
660 references: vec![event_ref(topic, event_id)],
661 links,
662 order: 0,
663 })
664}
665
666fn event_matches_query(
667 query: &SessionTimelineQuery,
668 payload: &serde_json::Value,
669 headers: Option<&BTreeMap<String, String>>,
670 session_keys: &[&str],
671 run_keys: &[&str],
672) -> bool {
673 field_query_matches(query.session_id.as_deref(), payload, headers, session_keys)
674 && field_query_matches(query.run_id.as_deref(), payload, headers, run_keys)
675 && field_query_matches(
676 query.project_id.as_deref(),
677 payload,
678 headers,
679 &["project_id", "projectId", "workspace_id", "workspaceId"],
680 )
681}
682
683fn field_query_matches(
684 expected: Option<&str>,
685 payload: &serde_json::Value,
686 headers: Option<&BTreeMap<String, String>>,
687 keys: &[&str],
688) -> bool {
689 let Some(expected) = expected else {
690 return true;
691 };
692 if expected.is_empty() {
693 return true;
694 }
695 if keys.is_empty() {
696 return true;
697 }
698 keys.iter().any(|key| {
699 payload
700 .get(*key)
701 .and_then(serde_json::Value::as_str)
702 .is_some_and(|value| value == expected)
703 || payload
704 .get("event")
705 .and_then(|event| event.get(*key))
706 .and_then(serde_json::Value::as_str)
707 .is_some_and(|value| value == expected)
708 || headers
709 .and_then(|headers| headers.get(*key))
710 .is_some_and(|value| value == expected)
711 })
712}
713
714fn span_matches_query(span: &RunTraceSpanRecord, query: &SessionTimelineQuery) -> bool {
715 if let Some(session_id) = query.session_id.as_deref() {
716 let has_session_attr = span.metadata.contains_key("session_id")
717 || span.metadata.contains_key("agent_session_id");
718 if has_session_attr
719 && !metadata_matches(
720 &span.metadata,
721 &["session_id", "agent_session_id"],
722 session_id,
723 )
724 {
725 return false;
726 }
727 }
728 true
729}
730
731fn run_matches_query(run: &RunRecord, query: &SessionTimelineQuery) -> bool {
732 if let Some(run_id) = query.run_id.as_deref() {
733 if run.id != run_id {
734 return false;
735 }
736 }
737 if let Some(project_id) = query.project_id.as_deref() {
738 if !metadata_matches(&run.metadata, &["project_id", "projectId"], project_id) {
739 return false;
740 }
741 }
742 true
743}
744
745fn metadata_matches(
746 metadata: &BTreeMap<String, serde_json::Value>,
747 keys: &[&str],
748 expected: &str,
749) -> bool {
750 keys.iter().any(|key| {
751 metadata
752 .get(*key)
753 .and_then(serde_json::Value::as_str)
754 .is_some_and(|value| value == expected)
755 })
756}
757
758fn event_status(value: &serde_json::Value) -> Option<&str> {
759 value
760 .get("status")
761 .and_then(serde_json::Value::as_str)
762 .or_else(|| value.get("verdict").and_then(serde_json::Value::as_str))
763}
764
765fn duration_ms(value: &serde_json::Value) -> Option<u64> {
766 value
767 .get("duration_ms")
768 .or_else(|| value.get("judge_duration_ms"))
769 .and_then(serde_json::Value::as_u64)
770}
771
772fn string_field(value: &serde_json::Value, key: &str) -> Option<String> {
773 let value = value.get(key)?;
774 if let Some(text) = value.as_str() {
775 if !text.is_empty() {
776 return Some(text.to_string());
777 }
778 return None;
779 }
780 value.as_u64().map(|number| number.to_string())
781}
782
783fn channel_batch_links(payload: &serde_json::Value) -> Vec<SessionTimelineLink> {
784 payload
785 .get("batch")
786 .and_then(|batch| batch.get("constituent_event_ids"))
787 .and_then(serde_json::Value::as_array)
788 .into_iter()
789 .flatten()
790 .filter_map(|value| value.as_str())
791 .map(|event_id| SessionTimelineLink {
792 kind: "channel_batch_member".to_string(),
793 target_id: None,
794 trace_id: None,
795 span_id: None,
796 event_id: Some(event_id.to_string()),
797 })
798 .collect()
799}
800
/// Formats the node id for a span as `span:<trace_id>:<span_id>`.
fn span_node_id(trace_id: &str, span_id: u64) -> String {
    let span_id = span_id.to_string();
    ["span", trace_id, span_id.as_str()].join(":")
}
804
805fn event_ref(topic: &str, event_id: EventId) -> SessionTimelineReference {
806 SessionTimelineReference {
807 kind: "event_log".to_string(),
808 id: None,
809 topic: Some(topic.to_string()),
810 event_id: Some(event_id),
811 }
812}
813
814fn static_topic(topic: &str) -> Topic {
815 Topic::new(topic).expect("static session timeline topic should be valid")
816}
817
818fn load_run_for_timeline(
819 query: &SessionTimelineQuery,
820) -> Result<Option<RunRecord>, SessionTimelineError> {
821 if let Some(path) = query
822 .run_path
823 .as_deref()
824 .map(str::trim)
825 .filter(|path| !path.is_empty())
826 {
827 return load_run_record_for_timeline(Path::new(path), true);
828 }
829
830 let Some(run_id) = query
831 .run_id
832 .as_deref()
833 .map(str::trim)
834 .filter(|run_id| !run_id.is_empty())
835 else {
836 return Ok(None);
837 };
838 let path = default_run_record_path(run_id)?;
839 load_run_record_for_timeline(&path, false)
840}
841
842fn load_run_record_for_timeline(
843 path: &Path,
844 explicit: bool,
845) -> Result<Option<RunRecord>, SessionTimelineError> {
846 if !path.exists() {
847 if explicit {
848 return Err(SessionTimelineError::RunRecord(format!(
849 "session timeline run record not found: {}",
850 path.display()
851 )));
852 }
853 return Ok(None);
854 }
855 load_run_record(path).map(Some).map_err(|error| {
856 SessionTimelineError::RunRecord(format!(
857 "failed to load session timeline run record {}: {error}",
858 path.display()
859 ))
860 })
861}
862
863fn default_run_record_path(run_id: &str) -> Result<PathBuf, SessionTimelineError> {
864 if run_id == "." || run_id == ".." || run_id.contains('/') || run_id.contains('\\') {
865 return Err(SessionTimelineError::RunRecord(format!(
866 "session timeline runId is not a valid default run-record filename: {run_id}"
867 )));
868 }
869 let base = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
870 Ok(crate::runtime_paths::run_root(&base).join(format!("{run_id}.json")))
871}
872
// Unit tests live in a sibling file and are compiled only for test builds.
#[cfg(test)]
#[path = "session_timeline_tests.rs"]
mod session_timeline_tests;