1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Mutex;
3
4use serde::{Deserialize, Serialize};
5
6use crate::{
7 TraceEvent, TraceLabelMetadata, TraceLashlangExecutionEvent, TraceLashlangExecutionIdentity,
8 TraceLashlangMap, TraceLashlangStatus, TraceRecord, TraceRuntimeScope, TraceRuntimeSubject,
9 TraceSink, TraceSinkError,
10};
11
12#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
14pub struct TraceLashlangGraph {
15 pub graph_key: String,
16 pub scope: TraceRuntimeScope,
17 pub subject: TraceRuntimeSubject,
18 pub module_ref: String,
19 pub entry_kind: String,
20 #[serde(default, skip_serializing_if = "Option::is_none")]
21 pub entry_ref: Option<String>,
22 pub entry_name: String,
23 pub status: TraceLashlangStatus,
24 pub nodes: Vec<TraceLashlangGraphNode>,
25 pub edges: Vec<TraceLashlangGraphEdge>,
26 pub children: Vec<TraceLashlangGraphChildLink>,
27}
28
29#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "snake_case")]
32pub enum TraceLashlangNodeStatus {
33 #[default]
34 Unobserved,
35 Running,
36 Completed,
37 Failed,
38}
39
40#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(rename_all = "snake_case")]
43pub enum TraceLashlangEdgeSelection {
44 #[default]
45 Unknown,
46 Selected,
47 Rejected,
48}
49
50#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
52pub struct TraceLashlangGraphNode {
53 pub id: String,
54 pub kind: String,
55 pub label: String,
56 #[serde(default, skip_serializing_if = "Option::is_none")]
57 pub label_metadata: Option<TraceLabelMetadata>,
58 pub status: TraceLashlangNodeStatus,
59 pub first_timestamp: Option<String>,
60 pub last_timestamp: Option<String>,
61 pub duration_ms: Option<i64>,
62 pub latest_error: Option<String>,
63 pub occurrence: Option<u64>,
64}
65
66impl TraceLashlangGraphNode {
67 fn unobserved(
68 id: impl Into<String>,
69 kind: impl Into<String>,
70 label: impl Into<String>,
71 label_metadata: Option<TraceLabelMetadata>,
72 ) -> Self {
73 Self {
74 id: id.into(),
75 kind: kind.into(),
76 label: label.into(),
77 label_metadata,
78 status: TraceLashlangNodeStatus::Unobserved,
79 first_timestamp: None,
80 last_timestamp: None,
81 duration_ms: None,
82 latest_error: None,
83 occurrence: None,
84 }
85 }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
90pub struct TraceLashlangGraphEdge {
91 pub id: String,
92 pub from: String,
93 pub to: String,
94 pub label: String,
95 pub selection: TraceLashlangEdgeSelection,
96}
97
98#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
100pub struct TraceLashlangGraphChildLink {
101 pub parent_graph_key: String,
102 pub parent_node_id: String,
103 pub child_graph_key: String,
104 #[serde(default, skip_serializing_if = "Option::is_none")]
105 pub child_module_ref: Option<String>,
106 #[serde(default, skip_serializing_if = "Option::is_none")]
107 pub child_entry_ref: Option<String>,
108 #[serde(default, skip_serializing_if = "Option::is_none")]
109 pub child_entry_name: Option<String>,
110}
111
112#[derive(Default)]
114pub struct TraceLashlangGraphStore {
115 inner: Mutex<TraceLashlangGraphState>,
116}
117
118#[derive(Default)]
119struct TraceLashlangGraphState {
120 seen_event_keys: BTreeSet<String>,
121 graphs: BTreeMap<String, TraceLashlangGraphAccumulator>,
122}
123
124#[derive(Clone, Debug)]
125struct TraceLashlangGraphAccumulator {
126 graph_key: String,
127 scope: TraceRuntimeScope,
128 subject: TraceRuntimeSubject,
129 module_ref: String,
130 entry_kind: String,
131 entry_ref: Option<String>,
132 entry_name: String,
133 status: TraceLashlangStatus,
134 nodes: BTreeMap<String, TraceLashlangGraphNode>,
135 edges: BTreeMap<String, TraceLashlangGraphEdge>,
136 children: Vec<TraceLashlangGraphChildLink>,
137}
138
139impl TraceLashlangGraphStore {
140 pub fn graph(&self, graph_key: &str) -> Option<TraceLashlangGraph> {
142 self.inner
143 .lock()
144 .ok()?
145 .graphs
146 .get(graph_key)
147 .map(TraceLashlangGraphAccumulator::to_graph)
148 }
149
150 pub fn graphs(&self) -> Vec<TraceLashlangGraph> {
152 self.inner
153 .lock()
154 .map(|state| {
155 state
156 .graphs
157 .values()
158 .map(TraceLashlangGraphAccumulator::to_graph)
159 .collect()
160 })
161 .unwrap_or_default()
162 }
163
164 pub fn clear(&self) {
166 if let Ok(mut state) = self.inner.lock() {
167 *state = TraceLashlangGraphState::default();
168 }
169 }
170}
171
172impl TraceSink for TraceLashlangGraphStore {
173 fn append(&self, record: &TraceRecord) -> Result<(), TraceSinkError> {
174 let TraceEvent::LashlangExecution { event } = &record.event else {
175 return Ok(());
176 };
177 let event_key = lashlang_execution_event_key(event);
178 let mut state = self
179 .inner
180 .lock()
181 .map_err(|_| TraceSinkError::LockPoisoned)?;
182 if !state.seen_event_keys.insert(event_key.to_string()) {
183 return Ok(());
184 }
185 reduce_lashlang_execution_event(&mut state, event, &record.timestamp);
186 Ok(())
187 }
188}
189
190impl TraceLashlangGraphAccumulator {
191 fn new(identity: &TraceLashlangExecutionIdentity) -> Self {
192 Self {
193 graph_key: identity.graph_key(),
194 scope: identity.scope.clone(),
195 subject: identity.subject.clone(),
196 module_ref: identity.module_ref.clone(),
197 entry_kind: identity.entry_kind.clone(),
198 entry_ref: identity.entry_ref.clone(),
199 entry_name: identity.entry_name.clone(),
200 status: TraceLashlangStatus::Running,
201 nodes: BTreeMap::new(),
202 edges: BTreeMap::new(),
203 children: Vec::new(),
204 }
205 }
206
207 fn to_graph(&self) -> TraceLashlangGraph {
208 TraceLashlangGraph {
209 graph_key: self.graph_key.clone(),
210 scope: self.scope.clone(),
211 subject: self.subject.clone(),
212 module_ref: self.module_ref.clone(),
213 entry_kind: self.entry_kind.clone(),
214 entry_ref: self.entry_ref.clone(),
215 entry_name: self.entry_name.clone(),
216 status: self.status,
217 nodes: self.nodes.values().cloned().collect(),
218 edges: self.edges.values().cloned().collect(),
219 children: self.children.clone(),
220 }
221 }
222}
223
224fn reduce_lashlang_execution_event(
225 state: &mut TraceLashlangGraphState,
226 event: &TraceLashlangExecutionEvent,
227 timestamp: &str,
228) {
229 match event {
230 TraceLashlangExecutionEvent::ExecutionStarted {
231 identity,
232 execution_map,
233 ..
234 } => seed_lashlang_graph(graph_mut(state, identity), execution_map),
235 TraceLashlangExecutionEvent::ExecutionFinished {
236 identity, status, ..
237 } => {
238 graph_mut(state, identity).status = *status;
239 }
240 TraceLashlangExecutionEvent::NodeStarted {
241 identity,
242 node_id,
243 node_kind,
244 label,
245 occurrence,
246 ..
247 } => {
248 let node = node_mut(
249 state,
250 TraceLashlangNodeIdentity {
251 identity,
252 node_id,
253 node_kind,
254 label,
255 },
256 );
257 if node.first_timestamp.is_none() {
258 node.first_timestamp = Some(timestamp.to_string());
259 }
260 node.last_timestamp = Some(timestamp.to_string());
261 node.status = TraceLashlangNodeStatus::Running;
262 node.occurrence = Some(*occurrence);
263 }
264 TraceLashlangExecutionEvent::NodeCompleted {
265 identity,
266 node_id,
267 node_kind,
268 label,
269 occurrence,
270 ..
271 } => {
272 let node = node_mut(
273 state,
274 TraceLashlangNodeIdentity {
275 identity,
276 node_id,
277 node_kind,
278 label,
279 },
280 );
281 node.last_timestamp = Some(timestamp.to_string());
282 node.duration_ms = duration_ms(node.first_timestamp.as_deref(), Some(timestamp));
283 node.status = TraceLashlangNodeStatus::Completed;
284 node.occurrence = Some(*occurrence);
285 }
286 TraceLashlangExecutionEvent::NodeFailed {
287 identity,
288 node_id,
289 node_kind,
290 label,
291 occurrence,
292 error,
293 ..
294 } => {
295 let node = node_mut(
296 state,
297 TraceLashlangNodeIdentity {
298 identity,
299 node_id,
300 node_kind,
301 label,
302 },
303 );
304 node.last_timestamp = Some(timestamp.to_string());
305 node.duration_ms = duration_ms(node.first_timestamp.as_deref(), Some(timestamp));
306 node.status = TraceLashlangNodeStatus::Failed;
307 node.latest_error = Some(error.clone());
308 node.occurrence = Some(*occurrence);
309 }
310 TraceLashlangExecutionEvent::BranchSelected {
311 identity,
312 node_id,
313 occurrence,
314 edge_id,
315 ..
316 } => {
317 let graph = graph_mut(state, identity);
318 if let Some(node) = graph.nodes.get_mut(node_id) {
319 node.status = TraceLashlangNodeStatus::Completed;
320 node.last_timestamp = Some(timestamp.to_string());
321 node.occurrence = Some(*occurrence);
322 }
323 let selected_edge = graph
324 .edges
325 .get(edge_id)
326 .map(|edge| (edge.from.clone(), edge.to.clone()));
327 if let Some(edge) = graph.edges.get_mut(edge_id) {
328 edge.selection = TraceLashlangEdgeSelection::Selected;
329 }
330 if let Some((selected_from, selected_to)) = selected_edge {
331 if let Some(selected_node) = graph.nodes.get_mut(&selected_to)
332 && selected_node.kind == "branch_arm"
333 {
334 if selected_node.first_timestamp.is_none() {
335 selected_node.first_timestamp = Some(timestamp.to_string());
336 }
337 selected_node.last_timestamp = Some(timestamp.to_string());
338 selected_node.duration_ms =
339 duration_ms(selected_node.first_timestamp.as_deref(), Some(timestamp));
340 selected_node.status = TraceLashlangNodeStatus::Completed;
341 selected_node.occurrence = Some(*occurrence);
342 }
343 for edge in graph.edges.values_mut() {
344 if edge.from == selected_from
345 && matches!(edge.label.as_str(), "then" | "else")
346 && edge.id != *edge_id
347 {
348 edge.selection = TraceLashlangEdgeSelection::Rejected;
349 }
350 }
351 }
352 }
353 TraceLashlangExecutionEvent::ChildStarted {
354 identity,
355 parent_node_id,
356 child,
357 ..
358 } => {
359 let graph = graph_mut(state, identity);
360 let child_graph_key = child.graph_key();
361 if !graph.children.iter().any(|link| {
362 link.parent_node_id == *parent_node_id && link.child_graph_key == child_graph_key
363 }) {
364 graph.children.push(TraceLashlangGraphChildLink {
365 parent_graph_key: identity.graph_key(),
366 parent_node_id: parent_node_id.clone(),
367 child_graph_key,
368 child_module_ref: child.module_ref.clone(),
369 child_entry_ref: child.entry_ref.clone(),
370 child_entry_name: child.entry_name.clone(),
371 });
372 }
373 }
374 }
375}
376
377fn seed_lashlang_graph(
378 graph: &mut TraceLashlangGraphAccumulator,
379 execution_map: &TraceLashlangMap,
380) {
381 graph.status = TraceLashlangStatus::Running;
382 for node in &execution_map.nodes {
383 graph.nodes.entry(node.id.clone()).or_insert_with(|| {
384 TraceLashlangGraphNode::unobserved(
385 node.id.clone(),
386 node.kind.clone(),
387 node.label.clone(),
388 node.label_metadata.clone(),
389 )
390 });
391 }
392 for edge in &execution_map.edges {
393 graph
394 .edges
395 .entry(edge.id.clone())
396 .or_insert_with(|| TraceLashlangGraphEdge {
397 id: edge.id.clone(),
398 from: edge.from.clone(),
399 to: edge.to.clone(),
400 label: edge.label.clone(),
401 selection: TraceLashlangEdgeSelection::Unknown,
402 });
403 }
404}
405
406#[derive(Clone, Copy)]
407struct TraceLashlangNodeIdentity<'event> {
408 identity: &'event TraceLashlangExecutionIdentity,
409 node_id: &'event str,
410 node_kind: &'event str,
411 label: &'event str,
412}
413
414fn graph_mut<'a>(
415 state: &'a mut TraceLashlangGraphState,
416 identity: &TraceLashlangExecutionIdentity,
417) -> &'a mut TraceLashlangGraphAccumulator {
418 let graph_key = identity.graph_key();
419 state
420 .graphs
421 .entry(graph_key)
422 .or_insert_with(|| TraceLashlangGraphAccumulator::new(identity))
423}
424
425fn node_mut<'a>(
426 state: &'a mut TraceLashlangGraphState,
427 identity: TraceLashlangNodeIdentity<'_>,
428) -> &'a mut TraceLashlangGraphNode {
429 graph_mut(state, identity.identity)
430 .nodes
431 .entry(identity.node_id.to_string())
432 .or_insert_with(|| {
433 TraceLashlangGraphNode::unobserved(
434 identity.node_id,
435 identity.node_kind,
436 identity.label,
437 None,
438 )
439 })
440}
441
442fn lashlang_execution_event_key(event: &TraceLashlangExecutionEvent) -> &str {
443 match event {
444 TraceLashlangExecutionEvent::ExecutionStarted { event_key, .. }
445 | TraceLashlangExecutionEvent::ExecutionFinished { event_key, .. }
446 | TraceLashlangExecutionEvent::NodeStarted { event_key, .. }
447 | TraceLashlangExecutionEvent::NodeCompleted { event_key, .. }
448 | TraceLashlangExecutionEvent::NodeFailed { event_key, .. }
449 | TraceLashlangExecutionEvent::BranchSelected { event_key, .. }
450 | TraceLashlangExecutionEvent::ChildStarted { event_key, .. } => event_key,
451 }
452}
453
454fn duration_ms(first: Option<&str>, last: Option<&str>) -> Option<i64> {
455 let first = chrono::DateTime::parse_from_rfc3339(first?).ok()?;
456 let last = chrono::DateTime::parse_from_rfc3339(last?).ok()?;
457 Some((last - first).num_milliseconds().max(0))
458}
459
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};

    use super::*;
    use crate::{
        TraceBranchSelection, TraceContext, TraceLabelMetadata, TraceLashlangChildExecution,
        TraceLashlangMapEdge, TraceLashlangMapNode,
    };

    /// Graph key under which events built from `identity()` are looked up.
    const GRAPH_KEY: &str = "effect:session-1:turn-1:exec-1";

    /// Execution identity shared by every event in these tests.
    fn identity() -> TraceLashlangExecutionIdentity {
        let scope = TraceRuntimeScope {
            session_id: "session-1".to_string(),
            turn_id: Some("turn-1".to_string()),
            turn_index: Some(0),
            protocol_iteration: Some(0),
        };
        let subject = TraceRuntimeSubject::Effect {
            effect_id: "exec-1".to_string(),
            kind: "exec_code".to_string(),
        };
        TraceLashlangExecutionIdentity {
            scope,
            subject,
            module_ref: "module-1".to_string(),
            entry_kind: "main".to_string(),
            entry_ref: None,
            entry_name: "main".to_string(),
        }
    }

    /// Appends `event` to `store` in a record stamped `ms` milliseconds after the epoch.
    fn append_at(store: &TraceLashlangGraphStore, event: TraceLashlangExecutionEvent, ms: i64) {
        let record = TraceRecord::new_with_timestamp(
            TraceContext::default().for_session("session-1"),
            TraceEvent::LashlangExecution { event },
            Utc.timestamp_millis_opt(ms).single().expect("timestamp"),
        );
        store
            .append(&record)
            .expect("append lashlang execution event");
    }

    /// Snapshot of the graph the test events are reduced into.
    fn snapshot(store: &TraceLashlangGraphStore) -> TraceLashlangGraph {
        store.graph(GRAPH_KEY).expect("graph")
    }

    /// Static map node without label metadata.
    fn map_node(id: &str, kind: &str, label: &str) -> TraceLashlangMapNode {
        TraceLashlangMapNode {
            id: id.to_string(),
            kind: kind.to_string(),
            label: label.to_string(),
            label_metadata: None,
        }
    }

    /// Static map edge leaving the `branch` node.
    fn map_edge(id: &str, to: &str, label: &str) -> TraceLashlangMapEdge {
        TraceLashlangMapEdge {
            id: id.to_string(),
            from: "branch".to_string(),
            to: to.to_string(),
            label: label.to_string(),
        }
    }

    /// Selection state of the edge called `edge_id`, if the graph has one.
    fn selection_of(
        graph: &TraceLashlangGraph,
        edge_id: &str,
    ) -> Option<TraceLashlangEdgeSelection> {
        graph
            .edges
            .iter()
            .find(|edge| edge.id == edge_id)
            .map(|edge| edge.selection)
    }

    /// `ExecutionStarted` event describing a branch with a `then` and an `else` arm.
    fn started_event(event_key: &str) -> TraceLashlangExecutionEvent {
        let execution_map = TraceLashlangMap {
            module_ref: "module-1".to_string(),
            entry_kind: "main".to_string(),
            entry_ref: None,
            entry_name: "main".to_string(),
            nodes: vec![
                map_node("branch", "branch", "if ready"),
                map_node("then", "branch_arm", "then"),
                map_node("else", "branch_arm", "else"),
            ],
            edges: vec![
                map_edge("then-edge", "then", "then"),
                map_edge("else-edge", "else", "else"),
            ],
        };
        TraceLashlangExecutionEvent::ExecutionStarted {
            event_key: event_key.to_string(),
            identity: identity(),
            execution_map,
        }
    }

    /// `NodeStarted` event for the `branch` node.
    fn node_started(event_key: &str, occurrence: u64) -> TraceLashlangExecutionEvent {
        TraceLashlangExecutionEvent::NodeStarted {
            event_key: event_key.to_string(),
            identity: identity(),
            node_id: "branch".to_string(),
            node_kind: "branch".to_string(),
            label: "if ready".to_string(),
            occurrence,
        }
    }

    /// `NodeCompleted` event for the `branch` node.
    fn node_completed(event_key: &str, occurrence: u64) -> TraceLashlangExecutionEvent {
        TraceLashlangExecutionEvent::NodeCompleted {
            event_key: event_key.to_string(),
            identity: identity(),
            node_id: "branch".to_string(),
            node_kind: "branch".to_string(),
            label: "if ready".to_string(),
            occurrence,
        }
    }

    #[test]
    fn graph_store_seeds_static_map_on_execution_start() {
        let store = TraceLashlangGraphStore::default();

        append_at(&store, started_event("start"), 1_000);

        let graph = snapshot(&store);
        assert_eq!(graph.status, TraceLashlangStatus::Running);
        assert_eq!(graph.nodes[0].status, TraceLashlangNodeStatus::Unobserved);
        assert_eq!(
            graph.edges[0].selection,
            TraceLashlangEdgeSelection::Unknown
        );
    }

    #[test]
    fn graph_store_preserves_static_label_metadata() {
        let store = TraceLashlangGraphStore::default();
        let metadata = TraceLabelMetadata {
            title: "Choose path".to_string(),
            description: Some("Branch detail".to_string()),
        };
        let mut event = started_event("start");
        if let TraceLashlangExecutionEvent::ExecutionStarted { execution_map, .. } = &mut event {
            execution_map.nodes[0].label_metadata = Some(metadata.clone());
        }

        append_at(&store, event, 1_000);

        let graph = snapshot(&store);
        assert_eq!(graph.nodes[0].label_metadata, Some(metadata));
    }

    #[test]
    fn graph_store_ignores_duplicate_event_keys() {
        let store = TraceLashlangGraphStore::default();

        // The second event reuses the key, so it must not be reduced.
        append_at(&store, node_started("same-key", 1), 1_000);
        append_at(&store, node_completed("same-key", 1), 1_250);

        let graph = snapshot(&store);
        assert_eq!(graph.nodes[0].status, TraceLashlangNodeStatus::Running);
    }

    #[test]
    fn graph_store_updates_completed_node_duration() {
        let store = TraceLashlangGraphStore::default();

        append_at(&store, node_started("start-node", 1), 1_000);
        append_at(&store, node_completed("complete-node", 2), 1_750);

        let graph = snapshot(&store);
        let node = &graph.nodes[0];
        assert_eq!(node.status, TraceLashlangNodeStatus::Completed);
        assert_eq!(node.duration_ms, Some(750));
        assert_eq!(node.occurrence, Some(2));
    }

    #[test]
    fn graph_store_marks_selected_and_rejected_branch_edges() {
        let store = TraceLashlangGraphStore::default();
        let branch_selected = TraceLashlangExecutionEvent::BranchSelected {
            event_key: "branch".to_string(),
            identity: identity(),
            node_id: "branch".to_string(),
            occurrence: 1,
            edge_id: "then-edge".to_string(),
            selected: TraceBranchSelection::Then,
        };

        append_at(&store, started_event("start"), 1_000);
        append_at(&store, branch_selected, 1_100);

        let graph = snapshot(&store);
        assert_eq!(
            selection_of(&graph, "then-edge"),
            Some(TraceLashlangEdgeSelection::Selected)
        );
        assert_eq!(
            selection_of(&graph, "else-edge"),
            Some(TraceLashlangEdgeSelection::Rejected)
        );
    }

    #[test]
    fn graph_store_records_child_links() {
        let store = TraceLashlangGraphStore::default();
        let child = TraceLashlangChildExecution {
            scope: TraceRuntimeScope::new("session-1"),
            subject: TraceRuntimeSubject::Process {
                process_id: "process:child".to_string(),
            },
            module_ref: Some("module-1".to_string()),
            entry_ref: Some("process:0".to_string()),
            entry_name: Some("child".to_string()),
        };
        let child_started = TraceLashlangExecutionEvent::ChildStarted {
            event_key: "child".to_string(),
            identity: identity(),
            parent_node_id: "spawn".to_string(),
            occurrence: 1,
            child,
        };

        append_at(&store, child_started, 1_000);

        let graph = snapshot(&store);
        let link = &graph.children[0];
        assert_eq!(link.parent_node_id, "spawn");
        assert_eq!(link.child_graph_key, "process:process:child");
        assert_eq!(link.child_entry_name.as_deref(), Some("child"));
    }
}