Skip to main content

lash_trace/
lashlang_graph.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Mutex;
3
4use serde::{Deserialize, Serialize};
5
6use crate::{
7    TraceEvent, TraceLabelMetadata, TraceLashlangExecutionEvent, TraceLashlangExecutionIdentity,
8    TraceLashlangMap, TraceLashlangStatus, TraceRecord, TraceRuntimeScope, TraceRuntimeSubject,
9    TraceSink, TraceSinkError,
10};
11
/// Trace-derived Lashlang execution graph snapshot for hosts and debugging tools.
///
/// Snapshots are cloned out of [`TraceLashlangGraphStore`]. `nodes` and `edges` are
/// listed in ascending id order because the store keeps them in `BTreeMap`s keyed by id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TraceLashlangGraph {
    /// Key the store files this graph under, taken from the execution identity's `graph_key()`.
    pub graph_key: String,
    /// Runtime scope copied from the execution identity.
    pub scope: TraceRuntimeScope,
    /// Runtime subject copied from the execution identity.
    pub subject: TraceRuntimeSubject,
    /// Module reference copied from the execution identity.
    pub module_ref: String,
    /// Entry kind copied from the execution identity.
    pub entry_kind: String,
    /// Optional entry reference copied from the execution identity; left out of the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub entry_ref: Option<String>,
    /// Entry name copied from the execution identity.
    pub entry_name: String,
    /// Execution status: `Running` until an `ExecutionFinished` event supplies another value.
    pub status: TraceLashlangStatus,
    /// Nodes seeded from the execution map plus any node first seen through a node event.
    pub nodes: Vec<TraceLashlangGraphNode>,
    /// Edges seeded from the execution map.
    pub edges: Vec<TraceLashlangGraphEdge>,
    /// Links to child executions, in the order their `ChildStarted` events were reduced.
    pub children: Vec<TraceLashlangGraphChildLink>,
}
28
/// Observed Lashlang graph node state.
///
/// Serialized in `snake_case` (for example `"unobserved"`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TraceLashlangNodeStatus {
    /// Default state: the node is known but no event has updated it yet.
    #[default]
    Unobserved,
    /// The most recent event for the node was `NodeStarted`.
    Running,
    /// The node was completed by `NodeCompleted`, or marked completed while reducing
    /// a `BranchSelected` event.
    Completed,
    /// The most recent event for the node was `NodeFailed`.
    Failed,
}
39
/// Observed branch-edge selection state.
///
/// Serialized in `snake_case` (for example `"selected"`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TraceLashlangEdgeSelection {
    /// Default state: no `BranchSelected` event has touched this edge.
    #[default]
    Unknown,
    /// The edge was named by a `BranchSelected` event.
    Selected,
    /// A sibling `then`/`else` edge leaving the same node was selected instead.
    Rejected,
}
49
/// Trace-derived Lashlang graph node.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TraceLashlangGraphNode {
    /// Node id; also the key the node is stored under inside its graph.
    pub id: String,
    /// Node kind string as reported by the execution map or the node event.
    pub kind: String,
    /// Display label as reported by the execution map or the node event.
    pub label: String,
    /// Optional label metadata from the execution map; nodes created from a node
    /// event alone have `None`. Left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label_metadata: Option<TraceLabelMetadata>,
    /// Latest observed lifecycle state.
    pub status: TraceLashlangNodeStatus,
    /// Record timestamp of the first start observation; never overwritten once set.
    pub first_timestamp: Option<String>,
    /// Record timestamp of the most recent event that touched this node.
    pub last_timestamp: Option<String>,
    /// Milliseconds from `first_timestamp` to the latest completion or failure, clamped
    /// to zero; `None` when no start was observed or a timestamp is not valid RFC 3339.
    pub duration_ms: Option<i64>,
    /// Error text from the most recent `NodeFailed` event; later events do not clear it.
    pub latest_error: Option<String>,
    /// Occurrence counter carried by the most recent event for this node.
    pub occurrence: Option<u64>,
}
65
66impl TraceLashlangGraphNode {
67    fn unobserved(
68        id: impl Into<String>,
69        kind: impl Into<String>,
70        label: impl Into<String>,
71        label_metadata: Option<TraceLabelMetadata>,
72    ) -> Self {
73        Self {
74            id: id.into(),
75            kind: kind.into(),
76            label: label.into(),
77            label_metadata,
78            status: TraceLashlangNodeStatus::Unobserved,
79            first_timestamp: None,
80            last_timestamp: None,
81            duration_ms: None,
82            latest_error: None,
83            occurrence: None,
84        }
85    }
86}
87
/// Trace-derived Lashlang graph edge.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TraceLashlangGraphEdge {
    /// Edge id; also the key the edge is stored under inside its graph.
    pub id: String,
    /// Id of the node the edge leaves.
    pub from: String,
    /// Id of the node the edge enters.
    pub to: String,
    /// Edge label; the labels `"then"` and `"else"` take part in branch rejection.
    pub label: String,
    /// Whether a `BranchSelected` event selected or rejected this edge.
    pub selection: TraceLashlangEdgeSelection,
}
97
/// Link from an observed parent Lashlang node to a child execution graph.
///
/// One link is recorded per distinct (`parent_node_id`, `child_graph_key`) pair.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TraceLashlangGraphChildLink {
    /// Graph key of the execution that started the child.
    pub parent_graph_key: String,
    /// Id of the parent node named by the `ChildStarted` event.
    pub parent_node_id: String,
    /// Graph key of the child execution.
    pub child_graph_key: String,
    /// Child module reference, when the event supplied one; omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub child_module_ref: Option<String>,
    /// Child entry reference, when the event supplied one; omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub child_entry_ref: Option<String>,
    /// Child entry name, when the event supplied one; omitted from the
    /// serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub child_entry_name: Option<String>,
}
111
/// In-memory store that reduces Lashlang execution trace records into graph snapshots.
///
/// Records arrive through the store's `TraceSink` implementation; snapshots are read
/// back with `graph` and `graphs`.
#[derive(Default)]
pub struct TraceLashlangGraphStore {
    /// All reduced state, behind a mutex because the sink and read methods take `&self`.
    inner: Mutex<TraceLashlangGraphState>,
}
117
/// Mutable state guarded by the store's mutex.
#[derive(Default)]
struct TraceLashlangGraphState {
    /// Event keys that have already been reduced; a repeated key is dropped on append.
    seen_event_keys: BTreeSet<String>,
    /// One accumulator per observed execution, keyed by graph key.
    graphs: BTreeMap<String, TraceLashlangGraphAccumulator>,
}
123
/// Mutable, map-backed form of one execution graph.
///
/// Mirrors [`TraceLashlangGraph`] field for field, except that nodes and edges are
/// keyed by id so events can update them in place.
#[derive(Clone, Debug)]
struct TraceLashlangGraphAccumulator {
    /// Key this accumulator is stored under.
    graph_key: String,
    /// Runtime scope copied from the execution identity.
    scope: TraceRuntimeScope,
    /// Runtime subject copied from the execution identity.
    subject: TraceRuntimeSubject,
    /// Module reference copied from the execution identity.
    module_ref: String,
    /// Entry kind copied from the execution identity.
    entry_kind: String,
    /// Optional entry reference copied from the execution identity.
    entry_ref: Option<String>,
    /// Entry name copied from the execution identity.
    entry_name: String,
    /// Current execution status.
    status: TraceLashlangStatus,
    /// Nodes keyed by node id.
    nodes: BTreeMap<String, TraceLashlangGraphNode>,
    /// Edges keyed by edge id.
    edges: BTreeMap<String, TraceLashlangGraphEdge>,
    /// Child links in the order they were first recorded.
    children: Vec<TraceLashlangGraphChildLink>,
}
138
139impl TraceLashlangGraphStore {
140    /// Returns a snapshot for one observed Lashlang graph key.
141    pub fn graph(&self, graph_key: &str) -> Option<TraceLashlangGraph> {
142        self.inner
143            .lock()
144            .ok()?
145            .graphs
146            .get(graph_key)
147            .map(TraceLashlangGraphAccumulator::to_graph)
148    }
149
150    /// Returns snapshots for all observed executions in stable graph-key order.
151    pub fn graphs(&self) -> Vec<TraceLashlangGraph> {
152        self.inner
153            .lock()
154            .map(|state| {
155                state
156                    .graphs
157                    .values()
158                    .map(TraceLashlangGraphAccumulator::to_graph)
159                    .collect()
160            })
161            .unwrap_or_default()
162    }
163
164    /// Clears all reduced graph projections and replay de-duplication keys.
165    pub fn clear(&self) {
166        if let Ok(mut state) = self.inner.lock() {
167            *state = TraceLashlangGraphState::default();
168        }
169    }
170}
171
172impl TraceSink for TraceLashlangGraphStore {
173    fn append(&self, record: &TraceRecord) -> Result<(), TraceSinkError> {
174        let TraceEvent::LashlangExecution { event } = &record.event else {
175            return Ok(());
176        };
177        let event_key = lashlang_execution_event_key(event);
178        let mut state = self
179            .inner
180            .lock()
181            .map_err(|_| TraceSinkError::LockPoisoned)?;
182        if !state.seen_event_keys.insert(event_key.to_string()) {
183            return Ok(());
184        }
185        reduce_lashlang_execution_event(&mut state, event, &record.timestamp);
186        Ok(())
187    }
188}
189
190impl TraceLashlangGraphAccumulator {
191    fn new(identity: &TraceLashlangExecutionIdentity) -> Self {
192        Self {
193            graph_key: identity.graph_key(),
194            scope: identity.scope.clone(),
195            subject: identity.subject.clone(),
196            module_ref: identity.module_ref.clone(),
197            entry_kind: identity.entry_kind.clone(),
198            entry_ref: identity.entry_ref.clone(),
199            entry_name: identity.entry_name.clone(),
200            status: TraceLashlangStatus::Running,
201            nodes: BTreeMap::new(),
202            edges: BTreeMap::new(),
203            children: Vec::new(),
204        }
205    }
206
207    fn to_graph(&self) -> TraceLashlangGraph {
208        TraceLashlangGraph {
209            graph_key: self.graph_key.clone(),
210            scope: self.scope.clone(),
211            subject: self.subject.clone(),
212            module_ref: self.module_ref.clone(),
213            entry_kind: self.entry_kind.clone(),
214            entry_ref: self.entry_ref.clone(),
215            entry_name: self.entry_name.clone(),
216            status: self.status,
217            nodes: self.nodes.values().cloned().collect(),
218            edges: self.edges.values().cloned().collect(),
219            children: self.children.clone(),
220        }
221    }
222}
223
224fn reduce_lashlang_execution_event(
225    state: &mut TraceLashlangGraphState,
226    event: &TraceLashlangExecutionEvent,
227    timestamp: &str,
228) {
229    match event {
230        TraceLashlangExecutionEvent::ExecutionStarted {
231            identity,
232            execution_map,
233            ..
234        } => seed_lashlang_graph(graph_mut(state, identity), execution_map),
235        TraceLashlangExecutionEvent::ExecutionFinished {
236            identity, status, ..
237        } => {
238            graph_mut(state, identity).status = *status;
239        }
240        TraceLashlangExecutionEvent::NodeStarted {
241            identity,
242            node_id,
243            node_kind,
244            label,
245            occurrence,
246            ..
247        } => {
248            let node = node_mut(
249                state,
250                TraceLashlangNodeIdentity {
251                    identity,
252                    node_id,
253                    node_kind,
254                    label,
255                },
256            );
257            if node.first_timestamp.is_none() {
258                node.first_timestamp = Some(timestamp.to_string());
259            }
260            node.last_timestamp = Some(timestamp.to_string());
261            node.status = TraceLashlangNodeStatus::Running;
262            node.occurrence = Some(*occurrence);
263        }
264        TraceLashlangExecutionEvent::NodeCompleted {
265            identity,
266            node_id,
267            node_kind,
268            label,
269            occurrence,
270            ..
271        } => {
272            let node = node_mut(
273                state,
274                TraceLashlangNodeIdentity {
275                    identity,
276                    node_id,
277                    node_kind,
278                    label,
279                },
280            );
281            node.last_timestamp = Some(timestamp.to_string());
282            node.duration_ms = duration_ms(node.first_timestamp.as_deref(), Some(timestamp));
283            node.status = TraceLashlangNodeStatus::Completed;
284            node.occurrence = Some(*occurrence);
285        }
286        TraceLashlangExecutionEvent::NodeFailed {
287            identity,
288            node_id,
289            node_kind,
290            label,
291            occurrence,
292            error,
293            ..
294        } => {
295            let node = node_mut(
296                state,
297                TraceLashlangNodeIdentity {
298                    identity,
299                    node_id,
300                    node_kind,
301                    label,
302                },
303            );
304            node.last_timestamp = Some(timestamp.to_string());
305            node.duration_ms = duration_ms(node.first_timestamp.as_deref(), Some(timestamp));
306            node.status = TraceLashlangNodeStatus::Failed;
307            node.latest_error = Some(error.clone());
308            node.occurrence = Some(*occurrence);
309        }
310        TraceLashlangExecutionEvent::BranchSelected {
311            identity,
312            node_id,
313            occurrence,
314            edge_id,
315            ..
316        } => {
317            let graph = graph_mut(state, identity);
318            if let Some(node) = graph.nodes.get_mut(node_id) {
319                node.status = TraceLashlangNodeStatus::Completed;
320                node.last_timestamp = Some(timestamp.to_string());
321                node.occurrence = Some(*occurrence);
322            }
323            let selected_edge = graph
324                .edges
325                .get(edge_id)
326                .map(|edge| (edge.from.clone(), edge.to.clone()));
327            if let Some(edge) = graph.edges.get_mut(edge_id) {
328                edge.selection = TraceLashlangEdgeSelection::Selected;
329            }
330            if let Some((selected_from, selected_to)) = selected_edge {
331                if let Some(selected_node) = graph.nodes.get_mut(&selected_to)
332                    && selected_node.kind == "branch_arm"
333                {
334                    if selected_node.first_timestamp.is_none() {
335                        selected_node.first_timestamp = Some(timestamp.to_string());
336                    }
337                    selected_node.last_timestamp = Some(timestamp.to_string());
338                    selected_node.duration_ms =
339                        duration_ms(selected_node.first_timestamp.as_deref(), Some(timestamp));
340                    selected_node.status = TraceLashlangNodeStatus::Completed;
341                    selected_node.occurrence = Some(*occurrence);
342                }
343                for edge in graph.edges.values_mut() {
344                    if edge.from == selected_from
345                        && matches!(edge.label.as_str(), "then" | "else")
346                        && edge.id != *edge_id
347                    {
348                        edge.selection = TraceLashlangEdgeSelection::Rejected;
349                    }
350                }
351            }
352        }
353        TraceLashlangExecutionEvent::ChildStarted {
354            identity,
355            parent_node_id,
356            child,
357            ..
358        } => {
359            let graph = graph_mut(state, identity);
360            let child_graph_key = child.graph_key();
361            if !graph.children.iter().any(|link| {
362                link.parent_node_id == *parent_node_id && link.child_graph_key == child_graph_key
363            }) {
364                graph.children.push(TraceLashlangGraphChildLink {
365                    parent_graph_key: identity.graph_key(),
366                    parent_node_id: parent_node_id.clone(),
367                    child_graph_key,
368                    child_module_ref: child.module_ref.clone(),
369                    child_entry_ref: child.entry_ref.clone(),
370                    child_entry_name: child.entry_name.clone(),
371                });
372            }
373        }
374    }
375}
376
377fn seed_lashlang_graph(
378    graph: &mut TraceLashlangGraphAccumulator,
379    execution_map: &TraceLashlangMap,
380) {
381    graph.status = TraceLashlangStatus::Running;
382    for node in &execution_map.nodes {
383        graph.nodes.entry(node.id.clone()).or_insert_with(|| {
384            TraceLashlangGraphNode::unobserved(
385                node.id.clone(),
386                node.kind.clone(),
387                node.label.clone(),
388                node.label_metadata.clone(),
389            )
390        });
391    }
392    for edge in &execution_map.edges {
393        graph
394            .edges
395            .entry(edge.id.clone())
396            .or_insert_with(|| TraceLashlangGraphEdge {
397                id: edge.id.clone(),
398                from: edge.from.clone(),
399                to: edge.to.clone(),
400                label: edge.label.clone(),
401                selection: TraceLashlangEdgeSelection::Unknown,
402            });
403    }
404}
405
/// Borrowed description of the node a node event refers to.
///
/// Bundles what `node_mut` needs to look the node up, or to create it when it was
/// never seeded.
#[derive(Clone, Copy)]
struct TraceLashlangNodeIdentity<'event> {
    /// Identity of the execution whose graph owns the node.
    identity: &'event TraceLashlangExecutionIdentity,
    /// Id of the node inside that graph.
    node_id: &'event str,
    /// Node kind, used only when the node has to be created.
    node_kind: &'event str,
    /// Node label, used only when the node has to be created.
    label: &'event str,
}
413
414fn graph_mut<'a>(
415    state: &'a mut TraceLashlangGraphState,
416    identity: &TraceLashlangExecutionIdentity,
417) -> &'a mut TraceLashlangGraphAccumulator {
418    let graph_key = identity.graph_key();
419    state
420        .graphs
421        .entry(graph_key)
422        .or_insert_with(|| TraceLashlangGraphAccumulator::new(identity))
423}
424
425fn node_mut<'a>(
426    state: &'a mut TraceLashlangGraphState,
427    identity: TraceLashlangNodeIdentity<'_>,
428) -> &'a mut TraceLashlangGraphNode {
429    graph_mut(state, identity.identity)
430        .nodes
431        .entry(identity.node_id.to_string())
432        .or_insert_with(|| {
433            TraceLashlangGraphNode::unobserved(
434                identity.node_id,
435                identity.node_kind,
436                identity.label,
437                None,
438            )
439        })
440}
441
442fn lashlang_execution_event_key(event: &TraceLashlangExecutionEvent) -> &str {
443    match event {
444        TraceLashlangExecutionEvent::ExecutionStarted { event_key, .. }
445        | TraceLashlangExecutionEvent::ExecutionFinished { event_key, .. }
446        | TraceLashlangExecutionEvent::NodeStarted { event_key, .. }
447        | TraceLashlangExecutionEvent::NodeCompleted { event_key, .. }
448        | TraceLashlangExecutionEvent::NodeFailed { event_key, .. }
449        | TraceLashlangExecutionEvent::BranchSelected { event_key, .. }
450        | TraceLashlangExecutionEvent::ChildStarted { event_key, .. } => event_key,
451    }
452}
453
454fn duration_ms(first: Option<&str>, last: Option<&str>) -> Option<i64> {
455    let first = chrono::DateTime::parse_from_rfc3339(first?).ok()?;
456    let last = chrono::DateTime::parse_from_rfc3339(last?).ok()?;
457    Some((last - first).num_milliseconds().max(0))
458}
459
/// Unit tests driving the store through its `TraceSink::append` entry point.
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};

    use super::*;
    use crate::{
        TraceBranchSelection, TraceContext, TraceLabelMetadata, TraceLashlangChildExecution,
        TraceLashlangMapEdge, TraceLashlangMapNode,
    };

    /// Execution identity shared by every test; the tests look its graph up under
    /// the key `effect:session-1:turn-1:exec-1`.
    fn identity() -> TraceLashlangExecutionIdentity {
        TraceLashlangExecutionIdentity {
            scope: TraceRuntimeScope {
                session_id: "session-1".to_string(),
                turn_id: Some("turn-1".to_string()),
                turn_index: Some(0),
                protocol_iteration: Some(0),
            },
            subject: TraceRuntimeSubject::Effect {
                effect_id: "exec-1".to_string(),
                kind: "exec_code".to_string(),
            },
            module_ref: "module-1".to_string(),
            entry_kind: "main".to_string(),
            entry_ref: None,
            entry_name: "main".to_string(),
        }
    }

    /// Appends `event` to `store` in a record stamped `ms` milliseconds after the
    /// Unix epoch, panicking if the append fails.
    fn append_at(store: &TraceLashlangGraphStore, event: TraceLashlangExecutionEvent, ms: i64) {
        store
            .append(&TraceRecord::new_with_timestamp(
                TraceContext::default().for_session("session-1"),
                TraceEvent::LashlangExecution { event },
                Utc.timestamp_millis_opt(ms).single().expect("timestamp"),
            ))
            .expect("append lashlang execution event");
    }

    /// `ExecutionStarted` event whose map holds one branch node, its `then` and
    /// `else` arms, and the two edges connecting them.
    fn started_event(event_key: &str) -> TraceLashlangExecutionEvent {
        TraceLashlangExecutionEvent::ExecutionStarted {
            event_key: event_key.to_string(),
            identity: identity(),
            execution_map: TraceLashlangMap {
                module_ref: "module-1".to_string(),
                entry_kind: "main".to_string(),
                entry_ref: None,
                entry_name: "main".to_string(),
                nodes: vec![
                    TraceLashlangMapNode {
                        id: "branch".to_string(),
                        kind: "branch".to_string(),
                        label: "if ready".to_string(),
                        label_metadata: None,
                    },
                    TraceLashlangMapNode {
                        id: "then".to_string(),
                        kind: "branch_arm".to_string(),
                        label: "then".to_string(),
                        label_metadata: None,
                    },
                    TraceLashlangMapNode {
                        id: "else".to_string(),
                        kind: "branch_arm".to_string(),
                        label: "else".to_string(),
                        label_metadata: None,
                    },
                ],
                edges: vec![
                    TraceLashlangMapEdge {
                        id: "then-edge".to_string(),
                        from: "branch".to_string(),
                        to: "then".to_string(),
                        label: "then".to_string(),
                    },
                    TraceLashlangMapEdge {
                        id: "else-edge".to_string(),
                        from: "branch".to_string(),
                        to: "else".to_string(),
                        label: "else".to_string(),
                    },
                ],
            },
        }
    }

    /// `NodeStarted` event for the `branch` node.
    fn node_started(event_key: &str, occurrence: u64) -> TraceLashlangExecutionEvent {
        TraceLashlangExecutionEvent::NodeStarted {
            event_key: event_key.to_string(),
            identity: identity(),
            node_id: "branch".to_string(),
            node_kind: "branch".to_string(),
            label: "if ready".to_string(),
            occurrence,
        }
    }

    /// `NodeCompleted` event for the `branch` node.
    fn node_completed(event_key: &str, occurrence: u64) -> TraceLashlangExecutionEvent {
        TraceLashlangExecutionEvent::NodeCompleted {
            event_key: event_key.to_string(),
            identity: identity(),
            node_id: "branch".to_string(),
            node_kind: "branch".to_string(),
            label: "if ready".to_string(),
            occurrence,
        }
    }

    /// Seeding alone leaves the graph running, nodes unobserved and edges unknown.
    #[test]
    fn graph_store_seeds_static_map_on_execution_start() {
        let store = TraceLashlangGraphStore::default();

        append_at(&store, started_event("start"), 1_000);

        let graph = store
            .graph("effect:session-1:turn-1:exec-1")
            .expect("graph");
        assert_eq!(graph.status, TraceLashlangStatus::Running);
        assert_eq!(graph.nodes[0].status, TraceLashlangNodeStatus::Unobserved);
        assert_eq!(
            graph.edges[0].selection,
            TraceLashlangEdgeSelection::Unknown
        );
    }

    /// Label metadata from the execution map is carried through to the snapshot.
    #[test]
    fn graph_store_preserves_static_label_metadata() {
        let store = TraceLashlangGraphStore::default();
        let mut event = started_event("start");
        if let TraceLashlangExecutionEvent::ExecutionStarted { execution_map, .. } = &mut event {
            // Index 0 is the `branch` node, which also sorts first in the snapshot.
            execution_map.nodes[0].label_metadata = Some(TraceLabelMetadata {
                title: "Choose path".to_string(),
                description: Some("Branch detail".to_string()),
            });
        }

        append_at(&store, event, 1_000);

        let graph = store
            .graph("effect:session-1:turn-1:exec-1")
            .expect("graph");
        assert_eq!(
            graph.nodes[0].label_metadata,
            Some(TraceLabelMetadata {
                title: "Choose path".to_string(),
                description: Some("Branch detail".to_string()),
            })
        );
    }

    /// A second event reusing an event key is dropped, so the node stays running.
    #[test]
    fn graph_store_ignores_duplicate_event_keys() {
        let store = TraceLashlangGraphStore::default();

        append_at(&store, node_started("same-key", 1), 1_000);
        append_at(&store, node_completed("same-key", 1), 1_250);

        let graph = store
            .graph("effect:session-1:turn-1:exec-1")
            .expect("graph");
        assert_eq!(graph.nodes[0].status, TraceLashlangNodeStatus::Running);
    }

    /// Completion records the elapsed time since the first start and the latest
    /// occurrence number.
    #[test]
    fn graph_store_updates_completed_node_duration() {
        let store = TraceLashlangGraphStore::default();

        append_at(&store, node_started("start-node", 1), 1_000);
        append_at(&store, node_completed("complete-node", 2), 1_750);

        let graph = store
            .graph("effect:session-1:turn-1:exec-1")
            .expect("graph");
        let node = &graph.nodes[0];
        assert_eq!(node.status, TraceLashlangNodeStatus::Completed);
        assert_eq!(node.duration_ms, Some(750));
        assert_eq!(node.occurrence, Some(2));
    }

    /// Selecting the `then` edge marks it selected and the sibling `else` edge rejected.
    #[test]
    fn graph_store_marks_selected_and_rejected_branch_edges() {
        let store = TraceLashlangGraphStore::default();

        append_at(&store, started_event("start"), 1_000);
        append_at(
            &store,
            TraceLashlangExecutionEvent::BranchSelected {
                event_key: "branch".to_string(),
                identity: identity(),
                node_id: "branch".to_string(),
                occurrence: 1,
                edge_id: "then-edge".to_string(),
                selected: TraceBranchSelection::Then,
            },
            1_100,
        );

        let graph = store
            .graph("effect:session-1:turn-1:exec-1")
            .expect("graph");
        assert_eq!(
            graph
                .edges
                .iter()
                .find(|edge| edge.id == "then-edge")
                .map(|edge| edge.selection),
            Some(TraceLashlangEdgeSelection::Selected)
        );
        assert_eq!(
            graph
                .edges
                .iter()
                .find(|edge| edge.id == "else-edge")
                .map(|edge| edge.selection),
            Some(TraceLashlangEdgeSelection::Rejected)
        );
    }

    /// A `ChildStarted` event is enough to create the parent graph and record the link.
    #[test]
    fn graph_store_records_child_links() {
        let store = TraceLashlangGraphStore::default();

        append_at(
            &store,
            TraceLashlangExecutionEvent::ChildStarted {
                event_key: "child".to_string(),
                identity: identity(),
                parent_node_id: "spawn".to_string(),
                occurrence: 1,
                child: TraceLashlangChildExecution {
                    scope: TraceRuntimeScope::new("session-1"),
                    subject: TraceRuntimeSubject::Process {
                        process_id: "process:child".to_string(),
                    },
                    module_ref: Some("module-1".to_string()),
                    entry_ref: Some("process:0".to_string()),
                    entry_name: Some("child".to_string()),
                },
            },
            1_000,
        );

        let graph = store
            .graph("effect:session-1:turn-1:exec-1")
            .expect("graph");
        assert_eq!(graph.children[0].parent_node_id, "spawn");
        assert_eq!(graph.children[0].child_graph_key, "process:process:child");
        assert_eq!(graph.children[0].child_entry_name.as_deref(), Some("child"));
    }
}