Skip to main content

varpulis_runtime/engine/
trace.rs

1//! Pipeline trace/explain mode for Varpulis.
2//!
3//! When enabled, the trace collector records how each event flows through the
4//! pipeline: which streams matched, which operators passed or blocked, pattern
5//! state transitions, and emitted output events.
6//!
//! Recording is cheap when tracing is off: [`TraceCollector::record`] reduces to
//! a single branch on a runtime flag. Callers should still check
//! [`TraceCollector::is_enabled`] before building an entry, because constructing
//! a [`TraceEntry`] allocates its strings whether or not it ends up recorded.
10
/// Trace entry recorded during pipeline execution.
///
/// Entries compare by value (`PartialEq`/`Eq`), so a recorded trace can be
/// checked with `assert_eq!` or searched with `contains`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TraceEntry {
    /// Event was routed to a stream for processing.
    StreamMatched {
        /// Name of the stream the event was routed to.
        stream_name: String,
        /// Type name of the routed event.
        event_type: String,
    },
    /// An operator processed the event.
    OperatorResult {
        /// Name of the stream the operator belongs to.
        stream_name: String,
        /// Name of the operator that ran (e.g., "Filter").
        op_name: String,
        /// `true` = event(s) passed through, `false` = filtered out / empty.
        passed: bool,
        /// Optional detail (e.g., "temperature=105 > 100").
        detail: Option<String>,
    },
    /// SASE pattern state changed.
    PatternState {
        /// Name of the stream whose pattern state changed.
        stream_name: String,
        /// Active run count reported with this state change.
        active_runs: usize,
        /// Completed count reported with this state change.
        // NOTE(review): presumably the number of completed matches; confirm
        // against the code that records this variant.
        completed: usize,
    },
    /// Output event emitted from the pipeline.
    EventEmitted {
        /// Name of the stream that emitted the event.
        stream_name: String,
        /// `(name, value)` pairs of the emitted event, with values as strings.
        fields: Vec<(String, String)>,
    },
}
40
/// Collects trace entries during pipeline execution.
///
/// While `enabled` is `false`, [`record`](Self::record) leaves the collector
/// untouched. The flag is a runtime value, so callers that want to avoid the
/// cost of building a [`TraceEntry`] should check
/// [`is_enabled`](Self::is_enabled) before constructing one.
#[derive(Debug)]
pub struct TraceCollector {
    /// Whether `record` stores the entries it is given.
    enabled: bool,
    /// Entries recorded so far, in insertion order, until drained.
    entries: Vec<TraceEntry>,
}
50
51impl Default for TraceCollector {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
57impl TraceCollector {
58    /// Create a new (disabled) trace collector.
59    pub fn new() -> Self {
60        Self {
61            enabled: false,
62            entries: Vec::new(),
63        }
64    }
65
66    /// Enable or disable trace collection.
67    #[inline]
68    pub fn set_enabled(&mut self, enabled: bool) {
69        self.enabled = enabled;
70    }
71
72    /// Whether the collector is active.
73    #[inline]
74    pub fn is_enabled(&self) -> bool {
75        self.enabled
76    }
77
78    /// Record a trace entry. No-op when disabled.
79    #[inline]
80    pub fn record(&mut self, entry: TraceEntry) {
81        if self.enabled {
82            self.entries.push(entry);
83        }
84    }
85
86    /// Drain all collected entries, returning them and leaving the collector empty.
87    pub fn drain(&mut self) -> Vec<TraceEntry> {
88        std::mem::take(&mut self.entries)
89    }
90
91    /// Whether there are no collected entries.
92    pub fn is_empty(&self) -> bool {
93        self.entries.is_empty()
94    }
95
96    /// Number of collected entries.
97    pub fn len(&self) -> usize {
98        self.entries.len()
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `StreamMatched` entry from borrowed names.
    fn stream_matched(stream: &str, event: &str) -> TraceEntry {
        TraceEntry::StreamMatched {
            stream_name: stream.to_owned(),
            event_type: event.to_owned(),
        }
    }

    /// Build a collector that has already been switched on.
    fn enabled_collector() -> TraceCollector {
        let mut collector = TraceCollector::new();
        collector.set_enabled(true);
        collector
    }

    /// A freshly created collector is disabled and ignores `record`.
    #[test]
    fn test_disabled_collector_does_not_record() {
        let mut collector = TraceCollector::new();
        assert!(!collector.is_enabled());

        collector.record(stream_matched("s1", "Tick"));

        assert!(collector.is_empty());
        assert_eq!(collector.len(), 0);
    }

    /// An enabled collector keeps entries in order and hands them over on drain.
    #[test]
    fn test_enabled_collector_records_and_drains() {
        let mut collector = enabled_collector();
        assert!(collector.is_enabled());

        let recorded = vec![
            stream_matched("Alerts", "Temperature"),
            TraceEntry::OperatorResult {
                stream_name: String::from("Alerts"),
                op_name: String::from("Filter"),
                passed: true,
                detail: Some(String::from("temperature=105 > 100")),
            },
            TraceEntry::PatternState {
                stream_name: String::from("Alerts"),
                active_runs: 2,
                completed: 1,
            },
            TraceEntry::EventEmitted {
                stream_name: String::from("Alerts"),
                fields: vec![
                    (String::from("symbol"), String::from("AAPL")),
                    (String::from("price"), String::from("150.5")),
                ],
            },
        ];
        for entry in recorded {
            collector.record(entry);
        }

        assert_eq!(collector.len(), 4);
        assert!(!collector.is_empty());

        let drained = collector.drain();
        assert_eq!(drained.len(), 4);
        assert!(collector.is_empty());

        // Each slot must hold the variant that was recorded at that position.
        assert!(matches!(&drained[0], TraceEntry::StreamMatched { .. }));
        assert!(matches!(
            &drained[1],
            TraceEntry::OperatorResult { passed: true, .. }
        ));
        assert!(matches!(
            &drained[2],
            TraceEntry::PatternState { completed: 1, .. }
        ));
        assert!(matches!(&drained[3], TraceEntry::EventEmitted { .. }));
    }

    /// Draining a collector that recorded nothing yields an empty vector.
    #[test]
    fn test_drain_returns_empty_when_no_entries() {
        let mut collector = enabled_collector();
        assert!(collector.drain().is_empty());
    }

    /// Switching the collector off stops recording but keeps earlier entries.
    #[test]
    fn test_toggle_enabled() {
        let mut collector = enabled_collector();
        collector.record(stream_matched("s1", "e1"));
        assert_eq!(collector.len(), 1);

        collector.set_enabled(false);
        collector.record(stream_matched("s2", "e2"));

        // Still one entry: the record made while disabled was dropped.
        assert_eq!(collector.len(), 1);
    }
}