1use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use uuid::Uuid;
9
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
12#[serde(tag = "type", rename_all = "snake_case")]
13pub enum RecordedEvent {
14 ClockNow {
16 nanos: u64,
18 },
19
20 SystemTime {
22 millis: u64,
24 },
25
26 Sleep {
28 duration_ms: u64,
30 },
31
32 TimeAdvanced {
34 duration_ms: u64,
36 },
37
38 UuidGenerated {
40 uuid: String,
42 },
43
44 RandomU64 {
46 value: u64,
48 },
49
50 RandomF64 {
52 value: f64,
54 },
55
56 RandomBytes {
58 count: usize,
60 },
61
62 HttpRequest {
64 method: String,
66 url: String,
68 headers: HashMap<String, String>,
70 body_size: usize,
72 },
73
74 HttpResponse {
76 status: u16,
78 body_size: usize,
80 },
81
82 FileRead {
84 path: String,
86 success: bool,
88 bytes_read: Option<usize>,
90 },
91
92 FileWrite {
94 path: String,
96 success: bool,
98 bytes_written: usize,
100 },
101
102 EnvRead {
104 key: String,
106 value: Option<String>,
108 },
109
110 EnvSet {
112 key: String,
114 value: String,
116 },
117
118 SecretRead {
120 key: String,
122 found: bool,
124 },
125
126 NodeExecutionStart {
128 node_id: u32,
130 node_type: String,
132 },
133
134 NodeExecutionComplete {
136 node_id: u32,
138 output_port: String,
140 success: bool,
142 },
143
144 Custom {
146 name: String,
148 data: serde_json::Value,
150 },
151}
152
153impl RecordedEvent {
154 pub fn custom(name: impl Into<String>, data: serde_json::Value) -> Self {
156 Self::Custom {
157 name: name.into(),
158 data,
159 }
160 }
161
162 pub fn event_type(&self) -> &'static str {
164 match self {
165 Self::ClockNow { .. } => "clock_now",
166 Self::SystemTime { .. } => "system_time",
167 Self::Sleep { .. } => "sleep",
168 Self::TimeAdvanced { .. } => "time_advanced",
169 Self::UuidGenerated { .. } => "uuid_generated",
170 Self::RandomU64 { .. } => "random_u64",
171 Self::RandomF64 { .. } => "random_f64",
172 Self::RandomBytes { .. } => "random_bytes",
173 Self::HttpRequest { .. } => "http_request",
174 Self::HttpResponse { .. } => "http_response",
175 Self::FileRead { .. } => "file_read",
176 Self::FileWrite { .. } => "file_write",
177 Self::EnvRead { .. } => "env_read",
178 Self::EnvSet { .. } => "env_set",
179 Self::SecretRead { .. } => "secret_read",
180 Self::NodeExecutionStart { .. } => "node_execution_start",
181 Self::NodeExecutionComplete { .. } => "node_execution_complete",
182 Self::Custom { .. } => "custom",
183 }
184 }
185}
186
187pub struct EventRecorder {
208 events: RwLock<Vec<RecordedEvent>>,
209 enabled: std::sync::atomic::AtomicBool,
210}
211
212impl EventRecorder {
213 pub fn new() -> Self {
215 Self {
216 events: RwLock::new(Vec::new()),
217 enabled: std::sync::atomic::AtomicBool::new(true),
218 }
219 }
220
221 pub fn record(&self, event: RecordedEvent) {
223 if self.enabled.load(std::sync::atomic::Ordering::SeqCst) {
224 self.events.write().push(event);
225 }
226 }
227
228 pub fn events(&self) -> Vec<RecordedEvent> {
230 self.events.read().clone()
231 }
232
233 pub fn len(&self) -> usize {
235 self.events.read().len()
236 }
237
238 pub fn is_empty(&self) -> bool {
240 self.events.read().is_empty()
241 }
242
243 pub fn clear(&self) {
245 self.events.write().clear();
246 }
247
248 pub fn set_enabled(&self, enabled: bool) {
250 self.enabled
251 .store(enabled, std::sync::atomic::Ordering::SeqCst);
252 }
253
254 pub fn is_enabled(&self) -> bool {
256 self.enabled.load(std::sync::atomic::Ordering::SeqCst)
257 }
258
259 pub fn to_json(&self) -> String {
261 serde_json::to_string_pretty(&*self.events.read()).expect("Failed to serialize events")
262 }
263
264 pub fn to_json_compact(&self) -> String {
266 serde_json::to_string(&*self.events.read()).expect("Failed to serialize events")
267 }
268
269 pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
271 let events: Vec<RecordedEvent> = serde_json::from_str(json)?;
272 Ok(Self {
273 events: RwLock::new(events),
274 enabled: std::sync::atomic::AtomicBool::new(true),
275 })
276 }
277
278 pub fn events_of_type(&self, event_type: &str) -> Vec<RecordedEvent> {
280 self.events
281 .read()
282 .iter()
283 .filter(|e| e.event_type() == event_type)
284 .cloned()
285 .collect()
286 }
287
288 pub fn find<F>(&self, predicate: F) -> Vec<RecordedEvent>
290 where
291 F: Fn(&RecordedEvent) -> bool,
292 {
293 self.events
294 .read()
295 .iter()
296 .filter(|e| predicate(e))
297 .cloned()
298 .collect()
299 }
300
301 pub fn assert_recorded(&self, event_type: &str) -> bool {
303 self.events
304 .read()
305 .iter()
306 .any(|e| e.event_type() == event_type)
307 }
308
309 pub fn assert_http_request(&self, method: &str, url_pattern: &str) -> bool {
311 let re = regex::Regex::new(url_pattern).expect("Invalid URL pattern");
312 self.events.read().iter().any(|e| {
313 if let RecordedEvent::HttpRequest { method: m, url, .. } = e {
314 m.eq_ignore_ascii_case(method) && re.is_match(url)
315 } else {
316 false
317 }
318 })
319 }
320
321 pub fn http_requests(&self) -> Vec<(String, String)> {
323 self.events
324 .read()
325 .iter()
326 .filter_map(|e| {
327 if let RecordedEvent::HttpRequest { method, url, .. } = e {
328 Some((method.clone(), url.clone()))
329 } else {
330 None
331 }
332 })
333 .collect()
334 }
335
336 pub fn generated_uuids(&self) -> Vec<Uuid> {
338 self.events
339 .read()
340 .iter()
341 .filter_map(|e| {
342 if let RecordedEvent::UuidGenerated { uuid } = e {
343 Uuid::parse_str(uuid).ok()
344 } else {
345 None
346 }
347 })
348 .collect()
349 }
350}
351
352impl Default for EventRecorder {
353 fn default() -> Self {
354 Self::new()
355 }
356}
357
358#[cfg(test)]
359mod tests {
360 use super::*;
361
362 #[test]
363 fn record_and_retrieve() {
364 let recorder = EventRecorder::new();
365
366 recorder.record(RecordedEvent::ClockNow { nanos: 1000 });
367 recorder.record(RecordedEvent::UuidGenerated {
368 uuid: "test-uuid".to_string(),
369 });
370
371 let events = recorder.events();
372 assert_eq!(events.len(), 2);
373
374 assert!(matches!(events[0], RecordedEvent::ClockNow { nanos: 1000 }));
375 }
376
377 #[test]
378 fn json_serialization() {
379 let recorder = EventRecorder::new();
380 recorder.record(RecordedEvent::RandomU64 { value: 42 });
381
382 let json = recorder.to_json();
383 assert!(json.contains("random_u64"));
384 assert!(json.contains("42"));
385
386 let loaded = EventRecorder::from_json(&json).unwrap();
387 assert_eq!(loaded.len(), 1);
388 }
389
390 #[test]
391 fn filter_by_type() {
392 let recorder = EventRecorder::new();
393 recorder.record(RecordedEvent::ClockNow { nanos: 100 });
394 recorder.record(RecordedEvent::UuidGenerated {
395 uuid: "a".to_string(),
396 });
397 recorder.record(RecordedEvent::ClockNow { nanos: 200 });
398
399 let clock_events = recorder.events_of_type("clock_now");
400 assert_eq!(clock_events.len(), 2);
401 }
402
403 #[test]
404 fn disable_recording() {
405 let recorder = EventRecorder::new();
406
407 recorder.record(RecordedEvent::ClockNow { nanos: 100 });
408 assert_eq!(recorder.len(), 1);
409
410 recorder.set_enabled(false);
411 recorder.record(RecordedEvent::ClockNow { nanos: 200 });
412 assert_eq!(recorder.len(), 1); recorder.set_enabled(true);
415 recorder.record(RecordedEvent::ClockNow { nanos: 300 });
416 assert_eq!(recorder.len(), 2);
417 }
418
419 #[test]
420 fn assert_http_request() {
421 let recorder = EventRecorder::new();
422 recorder.record(RecordedEvent::HttpRequest {
423 method: "POST".to_string(),
424 url: "https://api.example.com/users".to_string(),
425 headers: HashMap::new(),
426 body_size: 0,
427 });
428
429 assert!(recorder.assert_http_request("POST", r"api\.example\.com/users"));
430 assert!(!recorder.assert_http_request("GET", r"api\.example\.com/users"));
431 assert!(!recorder.assert_http_request("POST", r"other\.com"));
432 }
433
434 #[test]
435 fn custom_events() {
436 let recorder = EventRecorder::new();
437 recorder.record(RecordedEvent::custom(
438 "my_event",
439 serde_json::json!({"key": "value"}),
440 ));
441
442 let events = recorder.events_of_type("custom");
443 assert_eq!(events.len(), 1);
444
445 if let RecordedEvent::Custom { name, data } = &events[0] {
446 assert_eq!(name, "my_event");
447 assert_eq!(data["key"], "value");
448 } else {
449 panic!("Expected Custom event");
450 }
451 }
452}