// varpulis_runtime/engine/trace.rs
/// One recorded step of trace information.
///
/// Every variant carries the `stream_name` it was recorded for. Entries
/// compare by value (`PartialEq`/`Eq`), so they can be checked with
/// `assert_eq!` or searched for in a drained list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TraceEntry {
    /// A stream was matched against an event type.
    StreamMatched {
        /// Name of the stream this entry belongs to.
        stream_name: String,
        /// Name of the event type involved in the match.
        event_type: String,
    },
    /// Outcome of a single operator.
    OperatorResult {
        /// Name of the stream this entry belongs to.
        stream_name: String,
        /// Name of the operator that produced this result (e.g. `"Filter"`).
        op_name: String,
        /// Pass/fail flag for the operator
        /// (presumably `true` when the event got through — confirm with the recording site).
        passed: bool,
        /// Optional free-form explanation of the result.
        detail: Option<String>,
    },
    /// Snapshot of pattern-matching counters.
    PatternState {
        /// Name of the stream this entry belongs to.
        stream_name: String,
        /// Count of active runs (presumably in-flight partial matches — confirm).
        active_runs: usize,
        /// Count of completed runs (presumably finished matches — confirm).
        completed: usize,
    },
    /// An event was emitted.
    EventEmitted {
        /// Name of the stream this entry belongs to.
        stream_name: String,
        /// String pairs describing the emitted event
        /// (presumably field name and rendered value — confirm).
        fields: Vec<(String, String)>,
    },
}
40
41#[derive(Debug)]
46pub struct TraceCollector {
47 enabled: bool,
48 entries: Vec<TraceEntry>,
49}
50
51impl Default for TraceCollector {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57impl TraceCollector {
58 pub fn new() -> Self {
60 Self {
61 enabled: false,
62 entries: Vec::new(),
63 }
64 }
65
66 #[inline]
68 pub fn set_enabled(&mut self, enabled: bool) {
69 self.enabled = enabled;
70 }
71
72 #[inline]
74 pub fn is_enabled(&self) -> bool {
75 self.enabled
76 }
77
78 #[inline]
80 pub fn record(&mut self, entry: TraceEntry) {
81 if self.enabled {
82 self.entries.push(entry);
83 }
84 }
85
86 pub fn drain(&mut self) -> Vec<TraceEntry> {
88 std::mem::take(&mut self.entries)
89 }
90
91 pub fn is_empty(&self) -> bool {
93 self.entries.is_empty()
94 }
95
96 pub fn len(&self) -> usize {
98 self.entries.len()
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `StreamMatched` entry from borrowed names.
    fn stream_matched(stream: &str, event: &str) -> TraceEntry {
        TraceEntry::StreamMatched {
            stream_name: String::from(stream),
            event_type: String::from(event),
        }
    }

    /// A fresh collector is disabled, so recording must store nothing.
    #[test]
    fn test_disabled_collector_does_not_record() {
        let mut collector = TraceCollector::new();
        assert!(!collector.is_enabled());

        collector.record(stream_matched("s1", "Tick"));

        assert!(collector.is_empty());
        assert_eq!(collector.len(), 0);
    }

    /// An enabled collector keeps entries in order and hands them over on drain.
    #[test]
    fn test_enabled_collector_records_and_drains() {
        let mut collector = TraceCollector::new();
        collector.set_enabled(true);
        assert!(collector.is_enabled());

        let to_record = vec![
            stream_matched("Alerts", "Temperature"),
            TraceEntry::OperatorResult {
                stream_name: String::from("Alerts"),
                op_name: String::from("Filter"),
                passed: true,
                detail: Some(String::from("temperature=105 > 100")),
            },
            TraceEntry::PatternState {
                stream_name: String::from("Alerts"),
                active_runs: 2,
                completed: 1,
            },
            TraceEntry::EventEmitted {
                stream_name: String::from("Alerts"),
                fields: vec![
                    (String::from("symbol"), String::from("AAPL")),
                    (String::from("price"), String::from("150.5")),
                ],
            },
        ];
        for entry in to_record {
            collector.record(entry);
        }

        assert_eq!(collector.len(), 4);
        assert!(!collector.is_empty());

        let drained = collector.drain();
        assert_eq!(drained.len(), 4);
        // Draining must leave the collector empty.
        assert!(collector.is_empty());

        // Entries come back in the order they were recorded.
        assert!(matches!(&drained[0], TraceEntry::StreamMatched { .. }));
        assert!(matches!(
            &drained[1],
            TraceEntry::OperatorResult { passed: true, .. }
        ));
        assert!(matches!(
            &drained[2],
            TraceEntry::PatternState { completed: 1, .. }
        ));
        assert!(matches!(&drained[3], TraceEntry::EventEmitted { .. }));
    }

    /// Draining an enabled collector with nothing recorded yields an empty list.
    #[test]
    fn test_drain_returns_empty_when_no_entries() {
        let mut collector = TraceCollector::new();
        collector.set_enabled(true);

        let drained = collector.drain();

        assert!(drained.is_empty());
    }

    /// Disabling stops new entries from being stored but keeps the old ones.
    #[test]
    fn test_toggle_enabled() {
        let mut collector = TraceCollector::new();

        collector.set_enabled(true);
        collector.record(stream_matched("s1", "e1"));
        assert_eq!(collector.len(), 1);

        collector.set_enabled(false);
        collector.record(stream_matched("s2", "e2"));
        assert_eq!(collector.len(), 1);
    }
}