1use std::cell::RefCell;
28#[cfg(feature = "async-runtime")]
29use std::collections::HashMap;
30use std::fs;
31use std::path::Path;
32use std::sync::Arc;
33#[cfg(feature = "async-runtime")]
34use std::time::Duration;
35
36use indexmap::IndexMap;
37use rustc_hash::{FxBuildHasher, FxHashMap};
38#[cfg(feature = "async-runtime")]
39use tokio::sync::mpsc;
40#[cfg(feature = "async-runtime")]
41use tokio::time;
42#[cfg(feature = "async-runtime")]
43use tracing::{debug, info};
44use varpulis_core::Value;
45
46use crate::event::Event;
47
48thread_local! {
49 static FIELD_INTERNER: RefCell<FxHashMap<Box<str>, Arc<str>>> =
50 RefCell::new(FxHashMap::default());
51}
52
53fn intern_field_name(name: &str) -> Arc<str> {
56 FIELD_INTERNER.with(|interner| {
57 let mut map = interner.borrow_mut();
58 if let Some(arc) = map.get(name) {
59 arc.clone()
60 } else {
61 let arc: Arc<str> = name.into();
62 map.insert(name.into(), arc.clone());
63 arc
64 }
65 })
66}
67
/// An event paired with the time offset at which it should be replayed.
#[derive(Debug, Clone)]
pub struct TimedEvent {
    /// The parsed event payload.
    pub event: Event,
    /// Replay offset in milliseconds. When produced by `EventFileParser::parse`
    /// this is the value of the `@` timing prefix on the line, or otherwise the
    /// most recent `BATCH` directive (0 if none has been seen).
    pub time_offset_ms: u64,
}
76
/// The result of parsing a whole event file from disk.
#[derive(Debug, Clone)]
pub struct EventFile {
    /// Name of the file; `EventFileParser::parse_file` stores the (lossily
    /// converted) path it was given here.
    pub name: String,
    /// Events in the order they appear in the file.
    pub events: Vec<TimedEvent>,
}
85
/// Stateless namespace for the event-file parsing functions; all functionality
/// is exposed through associated functions, so no instance is needed.
#[derive(Debug)]
pub struct EventFileParser;
89
90impl EventFileParser {
91 pub fn parse(source: &str) -> Result<Vec<TimedEvent>, String> {
93 let mut events = Vec::new();
94 let mut current_batch_time: u64 = 0;
95
96 for (line_num, line) in source.lines().enumerate() {
97 let line = line.trim();
98
99 if line.is_empty() || line.starts_with('#') || line.starts_with("//") {
101 continue;
102 }
103
104 if line.starts_with("BATCH") {
106 let parts: Vec<&str> = line.split_whitespace().collect();
107 if parts.len() >= 2 {
108 current_batch_time = parts[1]
109 .parse()
110 .map_err(|_| format!("Invalid BATCH time at line {}", line_num + 1))?;
111 }
112 continue;
113 }
114
115 let (time_offset, event_line, has_explicit_timing) = if line.starts_with('@') {
117 let (t, e) = Self::parse_timing_prefix(line)?;
118 (t, e, true)
119 } else {
120 (current_batch_time, line, current_batch_time > 0)
121 };
122
123 let event = if event_line.starts_with('{') {
125 Self::parse_jsonl_line(event_line)
126 .map_err(|e| format!("Error at line {}: {}", line_num + 1, e))?
127 } else {
128 Self::parse_event_line(event_line)
129 .map_err(|e| format!("Error at line {}: {}", line_num + 1, e))?
130 };
131
132 let mut event = event;
137 if has_explicit_timing {
138 event.timestamp = chrono::DateTime::UNIX_EPOCH
139 + chrono::Duration::milliseconds(time_offset as i64);
140 }
141
142 events.push(TimedEvent {
143 event,
144 time_offset_ms: time_offset,
145 });
146 }
147
148 Ok(events)
149 }
150
151 fn parse_timing_prefix(line: &str) -> Result<(u64, &str), String> {
153 let line = line.trim_start_matches('@');
155
156 let space_pos = line
158 .find(char::is_whitespace)
159 .ok_or_else(|| "Invalid timing prefix format".to_string())?;
160
161 let timing_str = &line[..space_pos];
162 let rest = line[space_pos..].trim();
163
164 let time_ms = if timing_str.ends_with("ms") {
166 timing_str
167 .trim_end_matches("ms")
168 .parse::<u64>()
169 .map_err(|_| format!("Invalid timing value: {timing_str}"))?
170 } else if timing_str.ends_with('s') {
171 let secs = timing_str
172 .trim_end_matches('s')
173 .parse::<u64>()
174 .map_err(|_| format!("Invalid timing value: {timing_str}"))?;
175 secs.checked_mul(1000)
176 .ok_or_else(|| format!("Timing value overflow: {timing_str}"))?
177 } else if timing_str.ends_with('m') {
178 let mins = timing_str
179 .trim_end_matches('m')
180 .parse::<u64>()
181 .map_err(|_| format!("Invalid timing value: {timing_str}"))?;
182 mins.checked_mul(60_000)
183 .ok_or_else(|| format!("Timing value overflow: {timing_str}"))?
184 } else {
185 timing_str
187 .parse::<u64>()
188 .map_err(|_| format!("Invalid timing value: {timing_str}"))?
189 };
190
191 Ok((time_ms, rest))
192 }
193
194 fn parse_event_line(line: &str) -> Result<Event, String> {
196 let line = line.trim().trim_end_matches(';');
200
201 let (event_type, rest) = if let Some(brace_pos) = line.find('{') {
203 (&line[..brace_pos].trim(), &line[brace_pos..])
204 } else if let Some(paren_pos) = line.find('(') {
205 (&line[..paren_pos].trim(), &line[paren_pos..])
206 } else {
207 return Err(format!("Invalid event format: {line}"));
208 };
209
210 if rest.starts_with('{') {
212 let content = rest.trim_start_matches('{').trim_end_matches('}').trim();
214 let fields = Self::split_fields(content);
215
216 let mut event =
218 Event::with_capacity_at(*event_type, fields.len(), chrono::DateTime::UNIX_EPOCH);
219
220 for field_str in &fields {
221 let field_str = field_str.trim();
222 if field_str.is_empty() {
223 continue;
224 }
225
226 let colon_pos = field_str
227 .find(':')
228 .ok_or_else(|| format!("Invalid field format: {field_str}"))?;
229 let field_name = field_str[..colon_pos].trim();
230 let field_value = Self::parse_value(field_str[colon_pos + 1..].trim())?;
231 event
232 .data
233 .insert(intern_field_name(field_name), field_value);
234 }
235
236 Ok(event)
237 } else if rest.starts_with('(') {
238 let content = rest.trim_start_matches('(').trim_end_matches(')').trim();
240 let fields = Self::split_fields(content);
241
242 let mut event =
243 Event::with_capacity_at(*event_type, fields.len(), chrono::DateTime::UNIX_EPOCH);
244
245 for (i, value_str) in fields.iter().enumerate() {
246 let value_str = value_str.trim();
247 if value_str.is_empty() {
248 continue;
249 }
250
251 let field_value = Self::parse_value(value_str)?;
252 event.data.insert(format!("field_{i}").into(), field_value);
253 }
254
255 Ok(event)
256 } else {
257 Ok(Event::new_at(*event_type, chrono::DateTime::UNIX_EPOCH))
258 }
259 }
260
261 fn split_fields(content: &str) -> Vec<&str> {
265 let bytes = content.as_bytes();
266 let mut fields = Vec::new();
267 let mut field_start = 0;
268 let mut depth = 0i32;
269 let mut in_string = false;
270 let mut escape_next = false;
271
272 for i in 0..bytes.len() {
273 if escape_next {
274 escape_next = false;
275 continue;
276 }
277 match bytes[i] {
278 b'\\' => {
279 escape_next = true;
280 }
281 b'"' => {
282 in_string = !in_string;
283 }
284 b'{' | b'[' | b'(' if !in_string => {
285 depth += 1;
286 }
287 b'}' | b']' | b')' if !in_string => {
288 depth -= 1;
289 }
290 b',' if !in_string && depth == 0 => {
291 let field = content[field_start..i].trim();
292 if !field.is_empty() {
293 fields.push(field);
294 }
295 field_start = i + 1;
296 }
297 _ => {}
298 }
299 }
300
301 let last = content[field_start..].trim();
302 if !last.is_empty() {
303 fields.push(last);
304 }
305
306 fields
307 }
308
309 fn parse_value(s: &str) -> Result<Value, String> {
311 Self::parse_value_bounded(s, crate::limits::MAX_JSON_DEPTH)
312 }
313
314 fn parse_value_bounded(s: &str, depth: usize) -> Result<Value, String> {
316 let s = s.trim();
317
318 if s == "true" {
320 return Ok(Value::Bool(true));
321 }
322 if s == "false" {
323 return Ok(Value::Bool(false));
324 }
325
326 if s == "null" || s == "nil" {
328 return Ok(Value::Null);
329 }
330
331 if s.len() >= 2
333 && ((s.starts_with('"') && s.ends_with('"'))
334 || (s.starts_with('\'') && s.ends_with('\'')))
335 {
336 let inner = &s[1..s.len() - 1];
337 if !inner.contains('\\') {
339 return Ok(Value::Str(inner.into()));
340 }
341 let mut result = String::with_capacity(inner.len());
343 let mut chars = inner.chars();
344 while let Some(ch) = chars.next() {
345 if ch == '\\' {
346 match chars.next() {
347 Some('n') => result.push('\n'),
348 Some('t') => result.push('\t'),
349 Some('"') => result.push('"'),
350 Some('\'') => result.push('\''),
351 Some('\\') => result.push('\\'),
352 Some(other) => {
353 result.push('\\');
354 result.push(other);
355 }
356 None => result.push('\\'),
357 }
358 } else {
359 result.push(ch);
360 }
361 }
362 return Ok(Value::Str(result.into()));
363 }
364
365 if let Ok(i) = s.parse::<i64>() {
367 return Ok(Value::Int(i));
368 }
369
370 if let Ok(f) = s.parse::<f64>() {
372 return Ok(Value::Float(f));
373 }
374
375 if s.starts_with('[') && s.ends_with(']') {
377 if depth == 0 {
378 return Err("Array nesting too deep".to_string());
379 }
380 let inner = &s[1..s.len() - 1];
381 let items: Result<Vec<Value>, String> = Self::split_fields(inner)
382 .iter()
383 .filter(|s| !s.is_empty())
384 .map(|item| Self::parse_value_bounded(item, depth - 1))
385 .collect();
386 return Ok(Value::array(items?));
387 }
388
389 Ok(Value::Str(s.to_string().into()))
391 }
392
393 pub fn parse_file<P: AsRef<Path>>(path: P) -> Result<EventFile, String> {
395 let path = path.as_ref();
396 let content = fs::read_to_string(path)
397 .map_err(|e| format!("Failed to read file {}: {e}", path.display()))?;
398
399 let events = Self::parse(&content)?;
400
401 Ok(EventFile {
402 name: path.to_string_lossy().to_string(),
403 events,
404 })
405 }
406
407 pub fn parse_line(line: &str) -> Result<Option<Event>, String> {
413 let line = line.trim();
414
415 if line.is_empty() || line.starts_with('#') || line.starts_with("//") {
417 return Ok(None);
418 }
419
420 if line.starts_with("BATCH") {
422 return Ok(None);
423 }
424
425 let line = if line.starts_with('@') {
427 let (_, rest) = Self::parse_timing_prefix(line)?;
428 rest.trim()
429 } else {
430 line
431 };
432
433 if line.starts_with('{') {
435 let event = Self::parse_jsonl_line(line)?;
436 return Ok(Some(event));
439 }
440
441 let mut event = Self::parse_event_line(line)?;
443 event.timestamp = chrono::Utc::now();
444 Ok(Some(event))
445 }
446
447 fn parse_jsonl_line(line: &str) -> Result<Event, String> {
455 if line.len() > crate::limits::MAX_EVENT_PAYLOAD_BYTES {
457 return Err(format!(
458 "JSONL line too large ({} bytes, max {})",
459 line.len(),
460 crate::limits::MAX_EVENT_PAYLOAD_BYTES
461 ));
462 }
463
464 let json: serde_json::Value =
465 serde_json::from_str(line).map_err(|e| format!("Invalid JSON: {e}"))?;
466
467 if let Some(event_type) = json.get("event_type").and_then(|v| v.as_str()) {
469 let mut event = Event::new(event_type);
470 if let Some(data) = json.get("data").and_then(|v| v.as_object()) {
471 for (key, value) in data.iter().take(crate::limits::MAX_FIELDS_PER_EVENT) {
472 event
473 .data
474 .insert(key.as_str().into(), Self::json_to_value(value));
475 }
476 }
477 return Ok(event);
478 }
479
480 if let Some(event_id) = json.get("EventID").and_then(|v| v.as_i64()) {
482 let channel = json.get("Channel").and_then(|v| v.as_str()).unwrap_or("");
483 let event_type = if channel.contains("Sysmon") {
484 match event_id {
485 1 => "SysmonProcessCreate".to_string(),
486 2 => "SysmonFileCreateTime".to_string(),
487 3 => "SysmonNetworkConnect".to_string(),
488 5 => "SysmonProcessTerminate".to_string(),
489 7 => "SysmonImageLoad".to_string(),
490 8 => "SysmonCreateRemoteThread".to_string(),
491 10 => "SysmonProcessAccess".to_string(),
492 11 => "SysmonFileCreate".to_string(),
493 12 => "SysmonRegistryAddDel".to_string(),
494 13 => "SysmonRegistryValueSet".to_string(),
495 15 => "SysmonFileCreateStreamHash".to_string(),
496 17 => "SysmonPipeCreated".to_string(),
497 18 => "SysmonPipeConnected".to_string(),
498 22 => "SysmonDnsQuery".to_string(),
499 23 => "SysmonFileDelete".to_string(),
500 other => format!("Sysmon{other}"),
501 }
502 } else {
503 format!("WinEvent{event_id}")
504 };
505
506 let mut event = Event::new(&*event_type);
507
508 Self::apply_json_timestamp(&json, &mut event);
510
511 Self::promote_flat_json_fields(&json, &mut event);
513
514 return Ok(event);
515 }
516
517 if let Some(event_type) = json.get("type").and_then(|v| v.as_str()) {
519 let mut event = Event::new(event_type);
520 Self::apply_json_timestamp(&json, &mut event);
521 Self::promote_flat_json_fields(&json, &mut event);
522 return Ok(event);
523 }
524
525 Err(
526 "Missing event_type field (expected 'event_type', 'EventID'+'Channel', or 'type')"
527 .to_string(),
528 )
529 }
530
531 fn apply_json_timestamp(json: &serde_json::Value, event: &mut Event) {
535 let ts_str = json
536 .get("@timestamp")
537 .or_else(|| json.get("UtcTime"))
538 .or_else(|| json.get("TimeCreated"))
539 .and_then(|v| v.as_str());
540
541 if let Some(s) = ts_str {
542 if let Ok(ts) = chrono::DateTime::parse_from_rfc3339(s) {
544 event.timestamp = ts.with_timezone(&chrono::Utc);
545 return;
546 }
547 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
549 event.timestamp = naive.and_utc();
550 return;
551 }
552 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
554 event.timestamp = naive.and_utc();
555 }
556 }
557 }
558
559 fn promote_flat_json_fields(json: &serde_json::Value, event: &mut Event) {
564 static SKIP_KEYS: &[&str] = &[
565 "EventID",
566 "Channel",
567 "@timestamp",
568 "@version",
569 "type",
570 "event_type",
571 ];
572
573 if let Some(obj) = json.as_object() {
574 for (key, value) in obj.iter().take(crate::limits::MAX_FIELDS_PER_EVENT) {
575 if !SKIP_KEYS.contains(&key.as_str()) {
576 let v = Self::json_to_value_coerced(value);
577 event.data.insert(intern_field_name(key), v);
578 }
579 }
580 }
581 }
582
583 fn json_to_value_coerced(v: &serde_json::Value) -> Value {
587 if let Some(s) = v.as_str() {
588 if !s.is_empty() && s.len() <= 19 {
589 if let Ok(n) = s.parse::<i64>() {
590 return Value::Int(n);
591 }
592 }
593 return Self::json_to_value(v);
594 }
595 Self::json_to_value(v)
596 }
597
598 fn json_to_value(v: &serde_json::Value) -> Value {
600 Self::json_to_value_bounded(v, crate::limits::MAX_JSON_DEPTH)
601 }
602
603 fn json_to_value_bounded(v: &serde_json::Value, depth: usize) -> Value {
604 if depth == 0 {
605 return Value::Null;
606 }
607 match v {
608 serde_json::Value::Null => Value::Null,
609 serde_json::Value::Bool(b) => Value::Bool(*b),
610 serde_json::Value::Number(n) => {
611 if let Some(i) = n.as_i64() {
612 Value::Int(i)
613 } else if let Some(f) = n.as_f64() {
614 Value::Float(f)
615 } else {
616 Value::Null
617 }
618 }
619 serde_json::Value::String(s) => {
620 if s.len() > crate::limits::MAX_STRING_VALUE_BYTES {
621 let truncated =
622 &s[..s.floor_char_boundary(crate::limits::MAX_STRING_VALUE_BYTES)];
623 Value::Str(truncated.into())
624 } else {
625 Value::Str(s.clone().into())
626 }
627 }
628 serde_json::Value::Array(arr) => {
629 let capped = arr.len().min(crate::limits::MAX_ARRAY_ELEMENTS);
630 Value::array(
631 arr.iter()
632 .take(capped)
633 .map(|v| Self::json_to_value_bounded(v, depth - 1))
634 .collect(),
635 )
636 }
637 serde_json::Value::Object(obj) => {
638 let mut map: IndexMap<std::sync::Arc<str>, Value, FxBuildHasher> =
639 IndexMap::with_hasher(FxBuildHasher);
640 for (k, v) in obj.iter().take(crate::limits::MAX_FIELDS_PER_EVENT) {
641 map.insert(k.as_str().into(), Self::json_to_value_bounded(v, depth - 1));
642 }
643 Value::map(map)
644 }
645 }
646 }
647}
648
/// Reads events one line at a time from any buffered reader.
///
/// The reader keeps a single line buffer that is reused between lines and a
/// running count of the events it has produced.
pub struct StreamingEventReader<R: std::io::BufRead> {
    /// Underlying line source.
    reader: R,
    /// Scratch buffer holding the line currently being processed.
    line_buffer: String,
    /// Number of events successfully produced so far.
    events_read: usize,
}

impl<R: std::io::BufRead> std::fmt::Debug for StreamingEventReader<R> {
    /// Shows only the event counter; the reader and the line buffer are
    /// omitted (the output is marked non-exhaustive).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("StreamingEventReader");
        out.field("events_read", &self.events_read);
        out.finish_non_exhaustive()
    }
}

impl<R: std::io::BufRead> StreamingEventReader<R> {
    /// Wraps `reader`, starting with an empty line buffer and a zero count.
    pub const fn new(reader: R) -> Self {
        Self {
            events_read: 0,
            line_buffer: String::new(),
            reader,
        }
    }

    /// Number of events successfully produced so far.
    pub const fn events_read(&self) -> usize {
        self.events_read
    }
}
678
679impl StreamingEventReader<std::io::BufReader<std::fs::File>> {
680 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> {
682 let file =
683 std::fs::File::open(path.as_ref()).map_err(|e| format!("Failed to open file: {e}"))?;
684 Ok(Self::new(std::io::BufReader::with_capacity(
686 64 * 1024,
687 file,
688 )))
689 }
690}
691
692impl<R: std::io::BufRead> Iterator for StreamingEventReader<R> {
693 type Item = Result<Event, String>;
694
695 fn next(&mut self) -> Option<Self::Item> {
696 loop {
697 self.line_buffer.clear();
698 match self.reader.read_line(&mut self.line_buffer) {
699 Ok(0) => return None, Ok(_) => {
701 if self.line_buffer.len() > crate::limits::MAX_LINE_LENGTH {
703 tracing::warn!(
704 len = self.line_buffer.len(),
705 "Skipping oversized line ({} bytes, max {})",
706 self.line_buffer.len(),
707 crate::limits::MAX_LINE_LENGTH
708 );
709 self.line_buffer.clear();
710 continue;
711 }
712 match EventFileParser::parse_line(&self.line_buffer) {
713 Ok(Some(event)) => {
714 self.events_read += 1;
715 return Some(Ok(event));
716 }
717 Ok(None) => continue, Err(e) => return Some(Err(e)),
719 }
720 }
721 Err(e) => return Some(Err(format!("Read error: {e}"))),
722 }
723 }
724 }
725}
726
/// Replays a list of timed events into a Tokio channel.
#[cfg(feature = "async-runtime")]
#[derive(Debug)]
pub struct EventFilePlayer {
    /// Events to replay, in file order.
    events: Vec<TimedEvent>,
    /// Channel the events are sent to.
    sender: mpsc::Sender<Event>,
}

#[cfg(feature = "async-runtime")]
impl EventFilePlayer {
    /// Creates a player for `events` that sends into `sender`.
    pub const fn new(events: Vec<TimedEvent>, sender: mpsc::Sender<Event>) -> Self {
        Self { events, sender }
    }

    /// Parses the event file at `path` and creates a player for its events.
    ///
    /// # Errors
    ///
    /// Returns the error of `EventFileParser::parse_file`.
    pub fn from_file<P: AsRef<Path>>(path: P, sender: mpsc::Sender<Event>) -> Result<Self, String> {
        EventFileParser::parse_file(path).map(|file| Self::new(file.events, sender))
    }

    /// Replays the events honoring their time offsets and returns how many
    /// were sent.
    ///
    /// Events are grouped by `time_offset_ms`; groups are played in ascending
    /// offset order, and events inside a group keep their original order.
    /// Before each group the player sleeps until the group's offset, measured
    /// from the moment `play` started; a group whose offset has already passed
    /// is sent immediately.
    ///
    /// # Errors
    ///
    /// Returns `"Failed to send event: .."` as soon as a send fails.
    pub async fn play(&self) -> Result<usize, String> {
        let started = std::time::Instant::now();

        // Group by offset while preserving per-group insertion order.
        let mut by_offset: HashMap<u64, Vec<&TimedEvent>> = HashMap::new();
        for timed in &self.events {
            by_offset.entry(timed.time_offset_ms).or_default().push(timed);
        }

        // Offsets are unique map keys, so an unstable sort is deterministic.
        let mut schedule: Vec<(u64, Vec<&TimedEvent>)> = by_offset.into_iter().collect();
        schedule.sort_unstable_by_key(|(offset, _)| *offset);

        let mut sent_count = 0;
        for (batch_time, batch) in schedule {
            let elapsed = started.elapsed().as_millis() as u64;
            if batch_time > elapsed {
                time::sleep(Duration::from_millis(batch_time - elapsed)).await;
            }

            for timed_event in batch {
                debug!(
                    "Sending event: {} at {}ms",
                    timed_event.event.event_type, batch_time
                );
                if let Err(e) = self.sender.send(timed_event.event.clone()).await {
                    return Err(format!("Failed to send event: {e}"));
                }
                sent_count += 1;
            }
        }

        info!("Played {} events from file", sent_count);
        Ok(sent_count)
    }

    /// Sends every event in file order without any delay and returns how many
    /// were sent.
    ///
    /// # Errors
    ///
    /// Returns `"Failed to send event: .."` as soon as a send fails.
    pub async fn play_immediate(&self) -> Result<usize, String> {
        for timed_event in &self.events {
            debug!("Sending event: {}", timed_event.event.event_type);
            if let Err(e) = self.sender.send(timed_event.event.clone()).await {
                return Err(format!("Failed to send event: {e}"));
            }
        }

        // Every send succeeded, so the count is the number of events.
        let sent_count = self.events.len();
        info!("Played {} events (immediate mode)", sent_count);
        Ok(sent_count)
    }
}
805
#[cfg(test)]
mod tests {
    use chrono::{Datelike, Timelike};

    use super::*;

    // ---- Native event syntax -------------------------------------------

    #[test]
    fn test_parse_simple_event() {
        let source = r#"
            StockTick { symbol: "AAPL", price: 150.5, volume: 1000 }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(&*events[0].event.event_type, "StockTick");
        assert_eq!(
            events[0].event.get("symbol"),
            Some(&Value::Str("AAPL".into()))
        );
        assert_eq!(events[0].event.get("price"), Some(&Value::Float(150.5)));
        assert_eq!(events[0].event.get("volume"), Some(&Value::Int(1000)));
    }

    #[test]
    fn test_parse_batched_events() {
        let source = r#"
            # First batch at 0ms
            Order { id: 1, symbol: "AAPL" }

            # Second batch at 100ms
            BATCH 100
            Payment { order_id: 1 }
            Shipping { order_id: 1 }

            # Third batch at 200ms
            BATCH 200
            Confirmation { order_id: 1 }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 4);

        assert_eq!(events[0].time_offset_ms, 0);
        assert_eq!(&*events[0].event.event_type, "Order");

        assert_eq!(events[1].time_offset_ms, 100);
        assert_eq!(&*events[1].event.event_type, "Payment");

        assert_eq!(events[2].time_offset_ms, 100);
        assert_eq!(&*events[2].event.event_type, "Shipping");

        assert_eq!(events[3].time_offset_ms, 200);
        assert_eq!(&*events[3].event.event_type, "Confirmation");
    }

    #[test]
    fn test_parse_positional_format() {
        let source = r#"
            StockPrice("AAPL", 150.5)
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(&*events[0].event.event_type, "StockPrice");
        assert_eq!(
            events[0].event.get("field_0"),
            Some(&Value::Str("AAPL".into()))
        );
        assert_eq!(events[0].event.get("field_1"), Some(&Value::Float(150.5)));
    }

    #[test]
    fn test_parse_array_values() {
        let source = r#"
            BatchOrder { ids: [1, 2, 3], symbols: ["AAPL", "GOOG"] }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);

        let ids = events[0].event.get("ids").unwrap();
        if let Value::Array(arr) = ids {
            assert_eq!(arr.len(), 3);
        } else {
            panic!("Expected array");
        }
    }

    #[test]
    fn test_parse_comments() {
        let source = r"
            # This is a comment
            // This is also a comment
            Event1 { x: 1 }
            # Another comment
            Event2 { y: 2 }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 2);
    }

    #[test]
    fn test_sequence_scenario() {
        let source = r#"
            # Test sequence: Order -> Payment

            # Start with an order
            Order { id: 1, symbol: "AAPL", quantity: 100 }

            # Payment arrives 50ms later
            BATCH 50
            Payment { order_id: 1, amount: 15000.0 }

            # Another order without payment (should timeout)
            BATCH 100
            Order { id: 2, symbol: "GOOG", quantity: 50 }

            # Much later, payment for order 2 (may timeout)
            BATCH 5000
            Payment { order_id: 2, amount: 7500.0 }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 4);

        assert_eq!(&*events[0].event.event_type, "Order");
        assert_eq!(events[0].time_offset_ms, 0);

        assert_eq!(&*events[1].event.event_type, "Payment");
        assert_eq!(events[1].time_offset_ms, 50);

        assert_eq!(&*events[2].event.event_type, "Order");
        assert_eq!(events[2].time_offset_ms, 100);

        assert_eq!(&*events[3].event.event_type, "Payment");
        assert_eq!(events[3].time_offset_ms, 5000);
    }

    // ---- Value parsing --------------------------------------------------

    #[test]
    fn test_parse_boolean_values() {
        let source = r"
            Flags { active: true, disabled: false }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events[0].event.get("active"), Some(&Value::Bool(true)));
        assert_eq!(events[0].event.get("disabled"), Some(&Value::Bool(false)));
    }

    #[test]
    fn test_parse_null_values() {
        let source = r"
            Data { value: null, other: nil }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events[0].event.get("value"), Some(&Value::Null));
        assert_eq!(events[0].event.get("other"), Some(&Value::Null));
    }

    #[test]
    fn test_parse_escape_sequences() {
        let source = r#"
            Message { text: "Hello\nWorld", path: "C:\\Users\\test" }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        // Assert the exact unescaped values so the test cannot pass vacuously
        // when a field is missing or has the wrong type.
        assert_eq!(
            events[0].event.get("text"),
            Some(&Value::Str("Hello\nWorld".into()))
        );
        assert_eq!(
            events[0].event.get("path"),
            Some(&Value::Str("C:\\Users\\test".into()))
        );
    }

    #[test]
    fn test_parse_single_quoted_string() {
        let source = r"
            Event { name: 'single quoted' }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(
            events[0].event.get("name"),
            Some(&Value::Str("single quoted".into()))
        );
    }

    #[test]
    fn test_parse_unquoted_identifier() {
        let source = r"
            Event { status: active, mode: processing }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(
            events[0].event.get("status"),
            Some(&Value::Str("active".into()))
        );
        assert_eq!(
            events[0].event.get("mode"),
            Some(&Value::Str("processing".into()))
        );
    }

    #[test]
    fn test_parse_negative_numbers() {
        let source = r"
            Data { temp: -15, delta: -2.5 }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events[0].event.get("temp"), Some(&Value::Int(-15)));
        assert_eq!(events[0].event.get("delta"), Some(&Value::Float(-2.5)));
    }

    #[test]
    fn test_parse_nested_array() {
        let source = r"
            Complex { matrix: [[1, 2], [3, 4]] }
        ";

        let events = EventFileParser::parse(source).unwrap();
        let matrix = events[0].event.get("matrix").unwrap();
        if let Value::Array(arr) = matrix {
            assert_eq!(arr.len(), 2);
        } else {
            panic!("Expected array");
        }
    }

    // ---- Error cases ----------------------------------------------------

    #[test]
    fn test_parse_invalid_event_format() {
        let source = "InvalidEvent";
        let result = EventFileParser::parse(source);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_invalid_field_format() {
        let source = "Event { invalid_no_colon }";
        let result = EventFileParser::parse(source);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_invalid_batch_time() {
        let source = r"
            BATCH not_a_number
            Event { x: 1 }
        ";
        let result = EventFileParser::parse(source);
        assert!(result.is_err());
    }

    // ---- Edge cases -----------------------------------------------------

    #[test]
    fn test_parse_empty_content() {
        let source = "";
        let events = EventFileParser::parse(source).unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn test_parse_only_comments() {
        let source = r"
            # Comment 1
            // Comment 2
            # Comment 3
        ";
        let events = EventFileParser::parse(source).unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn test_parse_empty_braces() {
        let source = "EmptyEvent { }";
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert!(events[0].event.data.is_empty());
    }

    #[test]
    fn test_parse_semicolon_terminated() {
        let source = "Event { x: 1 };";
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn test_parse_whitespace_handling() {
        let source = "   Event   {   x  :  1  ,  y  :  2   }   ";
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event.get("x"), Some(&Value::Int(1)));
        assert_eq!(events[0].event.get("y"), Some(&Value::Int(2)));
    }

    // ---- EventFilePlayer ------------------------------------------------
    // `EventFilePlayer` and the `mpsc` import only exist with the
    // `async-runtime` feature, so these tests are gated on it to keep the test
    // build working when the feature is disabled.

    #[cfg(feature = "async-runtime")]
    #[tokio::test]
    async fn test_player_immediate() {
        let events = vec![
            TimedEvent {
                event: Event::new("A").with_field("id", 1i64),
                time_offset_ms: 0,
            },
            TimedEvent {
                event: Event::new("B").with_field("id", 2i64),
                time_offset_ms: 100,
            },
        ];

        let (tx, mut rx) = mpsc::channel(10);
        let player = EventFilePlayer::new(events, tx);

        let count = player.play_immediate().await.unwrap();
        assert_eq!(count, 2);

        let e1 = rx.recv().await.unwrap();
        assert_eq!(&*e1.event_type, "A");

        let e2 = rx.recv().await.unwrap();
        assert_eq!(&*e2.event_type, "B");
    }

    #[cfg(feature = "async-runtime")]
    #[tokio::test]
    async fn test_player_with_batches() {
        let events = vec![
            TimedEvent {
                event: Event::new("First"),
                time_offset_ms: 0,
            },
            TimedEvent {
                event: Event::new("Second"),
                time_offset_ms: 0,
            },
            TimedEvent {
                event: Event::new("Third"),
                time_offset_ms: 10,
            },
        ];

        let (tx, mut rx) = mpsc::channel(10);
        let player = EventFilePlayer::new(events, tx);

        let count = player.play().await.unwrap();
        assert_eq!(count, 3);

        assert!(rx.recv().await.is_some());
        assert!(rx.recv().await.is_some());
        assert!(rx.recv().await.is_some());
    }

    #[cfg(feature = "async-runtime")]
    #[tokio::test]
    async fn test_player_empty() {
        let events = vec![];
        let (tx, _rx) = mpsc::channel(10);
        let player = EventFilePlayer::new(events, tx);

        let count = player.play_immediate().await.unwrap();
        assert_eq!(count, 0);
    }

    // ---- JSONL: Sysmon / Windows event logs / flat objects ----------------

    #[test]
    fn test_parse_sysmon_process_create() {
        let line = r#"{"EventID": 1, "Channel": "Microsoft-Windows-Sysmon/Operational", "Image": "C:\\Windows\\System32\\cmd.exe", "CommandLine": "cmd.exe /c whoami", "ParentImage": "C:\\Windows\\explorer.exe", "User": "CORP\\admin", "Hostname": "WS01", "ProcessId": 1234}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "SysmonProcessCreate");
        assert_eq!(
            event.get_str("Image"),
            Some("C:\\Windows\\System32\\cmd.exe")
        );
        assert_eq!(event.get_str("CommandLine"), Some("cmd.exe /c whoami"));
        assert_eq!(
            event.get_str("ParentImage"),
            Some("C:\\Windows\\explorer.exe")
        );
        assert_eq!(event.get_str("User"), Some("CORP\\admin"));
        assert_eq!(event.get_str("Hostname"), Some("WS01"));
        assert_eq!(event.get_int("ProcessId"), Some(1234));
        // Envelope keys must not be promoted into the event data.
        assert!(event.data.get("EventID").is_none());
        assert!(event.data.get("Channel").is_none());
    }

    #[test]
    fn test_parse_sysmon_network_connect() {
        let line = r#"{"EventID": 3, "Channel": "Microsoft-Windows-Sysmon/Operational", "Image": "C:\\Windows\\System32\\svchost.exe", "SourceIp": "10.0.0.5", "DestinationIp": "192.168.1.100", "DestinationPort": 445, "Protocol": "tcp", "Hostname": "WS01"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "SysmonNetworkConnect");
        assert_eq!(event.get_str("SourceIp"), Some("10.0.0.5"));
        assert_eq!(event.get_str("DestinationIp"), Some("192.168.1.100"));
        assert_eq!(event.get_int("DestinationPort"), Some(445));
        assert_eq!(event.get_str("Protocol"), Some("tcp"));
    }

    #[test]
    fn test_parse_sysmon_process_access() {
        let line = r#"{"EventID": 10, "Channel": "Microsoft-Windows-Sysmon/Operational", "SourceImage": "C:\\tools\\mimikatz.exe", "TargetImage": "C:\\Windows\\System32\\lsass.exe", "GrantedAccess": "0x1010"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "SysmonProcessAccess");
        assert_eq!(
            event.get_str("TargetImage"),
            Some("C:\\Windows\\System32\\lsass.exe")
        );
        assert_eq!(event.get_str("GrantedAccess"), Some("0x1010"));
    }

    #[test]
    fn test_parse_sysmon_file_create() {
        let line = r#"{"EventID": 11, "Channel": "Microsoft-Windows-Sysmon/Operational", "Image": "C:\\Windows\\System32\\cmd.exe", "TargetFilename": "C:\\temp\\data.zip"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "SysmonFileCreate");
        assert_eq!(event.get_str("TargetFilename"), Some("C:\\temp\\data.zip"));
    }

    #[test]
    fn test_parse_sysmon_registry_value_set() {
        let line = r#"{"EventID": 13, "Channel": "Microsoft-Windows-Sysmon/Operational", "Image": "C:\\Windows\\regedit.exe", "TargetObject": "HKLM\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Run\\backdoor", "Details": "C:\\malware\\payload.exe"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "SysmonRegistryValueSet");
        assert!(event.get_str("TargetObject").unwrap().contains("\\Run\\"));
    }

    #[test]
    fn test_parse_sysmon_unknown_event_id() {
        let line = r#"{"EventID": 99, "Channel": "Microsoft-Windows-Sysmon/Operational", "SomeField": "value"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "Sysmon99");
        assert_eq!(event.get_str("SomeField"), Some("value"));
    }

    #[test]
    fn test_parse_winevent_non_sysmon() {
        let line = r#"{"EventID": 4624, "Channel": "Security", "LogonType": 3, "TargetUserName": "admin"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "WinEvent4624");
        assert_eq!(event.get_int("LogonType"), Some(3));
        assert_eq!(event.get_str("TargetUserName"), Some("admin"));
    }

    #[test]
    fn test_parse_sysmon_timestamp_rfc3339() {
        let line = r#"{"EventID": 1, "Channel": "Microsoft-Windows-Sysmon/Operational", "@timestamp": "2020-10-18T07:50:05.917Z", "Image": "cmd.exe"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.timestamp.year(), 2020);
        assert_eq!(event.timestamp.month(), 10);
        assert_eq!(event.timestamp.day(), 18);
        assert_eq!(event.timestamp.hour(), 7);
        assert_eq!(event.timestamp.minute(), 50);
    }

    #[test]
    fn test_parse_sysmon_timestamp_utctime() {
        let line = r#"{"EventID": 1, "Channel": "Microsoft-Windows-Sysmon/Operational", "UtcTime": "2020-10-18 07:50:05.917", "Image": "cmd.exe"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.timestamp.year(), 2020);
        assert_eq!(event.timestamp.month(), 10);
        assert_eq!(event.timestamp.day(), 18);
    }

    #[test]
    fn test_parse_flat_jsonl_with_type_field() {
        let line = r#"{"type": "Login", "user": "admin", "ip": "10.0.0.1"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "Login");
        assert_eq!(event.get_str("user"), Some("admin"));
        assert_eq!(event.get_str("ip"), Some("10.0.0.1"));
        // The discriminator key itself is not promoted.
        assert!(event.data.get("type").is_none());
    }

    #[test]
    fn test_parse_jsonl_missing_all_type_fields() {
        let line = r#"{"foo": "bar"}"#;
        let result = EventFileParser::parse_jsonl_line(line);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("Missing event_type field"));
    }

    #[test]
    fn test_sysmon_timestamp_preserved_in_parse() {
        let source = r#"{"EventID": 1, "Channel": "Microsoft-Windows-Sysmon/Operational", "@timestamp": "2020-10-18T07:50:05.000Z", "Image": "cmd.exe"}
{"EventID": 1, "Channel": "Microsoft-Windows-Sysmon/Operational", "@timestamp": "2020-10-18T07:55:10.000Z", "Image": "powershell.exe"}"#;
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 2);
        // Without explicit timing the timestamps from the JSON are kept.
        assert_eq!(events[0].event.timestamp.year(), 2020);
        assert_eq!(events[1].event.timestamp.year(), 2020);
        let diff = events[1].event.timestamp - events[0].event.timestamp;
        assert_eq!(diff.num_seconds(), 305);
    }

    #[test]
    fn test_sysmon_timestamp_overridden_by_batch() {
        let source = "BATCH 1000\n{\"EventID\": 1, \"Channel\": \"Microsoft-Windows-Sysmon/Operational\", \"@timestamp\": \"2020-10-18T07:50:05.000Z\", \"Image\": \"cmd.exe\"}";
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        // A non-zero batch offset replaces the JSON timestamp with epoch + offset.
        assert_eq!(events[0].event.timestamp.year(), 1970);
    }

    #[test]
    fn test_sysmon_string_to_int_coercion() {
        let line = r#"{"EventID": 3, "Channel": "Microsoft-Windows-Sysmon/Operational", "DestinationPort": "445", "ProcessId": "8524", "SourceIp": "10.0.0.5", "GrantedAccess": "0x1010", "Hostname": "WS01"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        // Numeric strings become integers ...
        assert_eq!(event.get_int("DestinationPort"), Some(445));
        assert_eq!(event.get_int("ProcessId"), Some(8524));
        // ... while hex and dotted strings stay strings.
        assert_eq!(event.get_str("GrantedAccess"), Some("0x1010"));
        assert_eq!(event.get_str("SourceIp"), Some("10.0.0.5"));
    }

    #[test]
    fn test_native_jsonl_still_works() {
        let source = r#"{"event_type": "StockTick", "data": {"symbol": "AAPL", "price": 150.5}}"#;
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event.event_type.as_ref(), "StockTick");
    }
}