Skip to main content

varpulis_runtime/
event_file.rs

1//! Event file parser for VPL
2//!
3//! Inspired by Apama's .evt file format, this module provides:
4//! - Event file parsing with timing control
5//! - BATCH tags for grouping events with delays
6//! - Support for JSON-style event representation
7//!
8//! # Event File Format
9//!
10//! ```text
11//! # Comment line
12//! // Also a comment
13//!
14//! # Simple event (sent immediately)
15//! StockTick { symbol: "AAPL", price: 150.0, volume: 1000 }
16//!
17//! # Batch with delay (wait 100ms before sending)
18//! BATCH 100
19//! Order { id: 1, symbol: "AAPL", quantity: 100 }
20//! Order { id: 2, symbol: "GOOG", quantity: 50 }
21//!
22//! # Another batch at 200ms from start
23//! BATCH 200
24//! Payment { order_id: 1, amount: 15000.0 }
25//! ```
26
27use std::cell::RefCell;
28#[cfg(feature = "async-runtime")]
29use std::collections::HashMap;
30use std::fs;
31use std::path::Path;
32use std::sync::Arc;
33#[cfg(feature = "async-runtime")]
34use std::time::Duration;
35
36use indexmap::IndexMap;
37use rustc_hash::{FxBuildHasher, FxHashMap};
38#[cfg(feature = "async-runtime")]
39use tokio::sync::mpsc;
40#[cfg(feature = "async-runtime")]
41use tokio::time;
42#[cfg(feature = "async-runtime")]
43use tracing::{debug, info};
44use varpulis_core::Value;
45
46use crate::event::Event;
47
48thread_local! {
49    static FIELD_INTERNER: RefCell<FxHashMap<Box<str>, Arc<str>>> =
50        RefCell::new(FxHashMap::default());
51}
52
53/// Intern a field name to reuse the same `Arc<str>` across events.
54/// After the first occurrence, subsequent calls for the same name are O(1) Arc clones.
55fn intern_field_name(name: &str) -> Arc<str> {
56    FIELD_INTERNER.with(|interner| {
57        let mut map = interner.borrow_mut();
58        if let Some(arc) = map.get(name) {
59            arc.clone()
60        } else {
61            let arc: Arc<str> = name.into();
62            map.insert(name.into(), arc.clone());
63            arc
64        }
65    })
66}
67
68/// A parsed event with optional timing
69#[derive(Debug, Clone)]
70pub struct TimedEvent {
71    /// The event to send
72    pub event: Event,
73    /// Time offset from start (in milliseconds)
74    pub time_offset_ms: u64,
75}
76
77/// Parsed event file
78#[derive(Debug, Clone)]
79pub struct EventFile {
80    /// Name/path of the file
81    pub name: String,
82    /// Parsed events with timing
83    pub events: Vec<TimedEvent>,
84}
85
/// Stateless parser for `.evt` and JSONL event files.
///
/// Holds no data; all functionality lives in associated functions such as
/// `EventFileParser::parse` and `EventFileParser::parse_file`.
#[derive(Debug)]
pub struct EventFileParser;
89
90impl EventFileParser {
91    /// Parse an event file from a string (supports both .evt and JSONL formats)
92    pub fn parse(source: &str) -> Result<Vec<TimedEvent>, String> {
93        let mut events = Vec::new();
94        let mut current_batch_time: u64 = 0;
95
96        for (line_num, line) in source.lines().enumerate() {
97            let line = line.trim();
98
99            // Skip empty lines and comments
100            if line.is_empty() || line.starts_with('#') || line.starts_with("//") {
101                continue;
102            }
103
104            // Check for BATCH directive
105            if line.starts_with("BATCH") {
106                let parts: Vec<&str> = line.split_whitespace().collect();
107                if parts.len() >= 2 {
108                    current_batch_time = parts[1]
109                        .parse()
110                        .map_err(|_| format!("Invalid BATCH time at line {}", line_num + 1))?;
111                }
112                continue;
113            }
114
115            // Check for @Ns timing prefix: @0s EventType { ... }
116            let (time_offset, event_line, has_explicit_timing) = if line.starts_with('@') {
117                let (t, e) = Self::parse_timing_prefix(line)?;
118                (t, e, true)
119            } else {
120                (current_batch_time, line, current_batch_time > 0)
121            };
122
123            // Parse event - try JSONL first, then .evt format
124            let event = if event_line.starts_with('{') {
125                Self::parse_jsonl_line(event_line)
126                    .map_err(|e| format!("Error at line {}: {}", line_num + 1, e))?
127            } else {
128                Self::parse_event_line(event_line)
129                    .map_err(|e| format!("Error at line {}: {}", line_num + 1, e))?
130            };
131
132            // Apply timing offset to event timestamp so time-based windows
133            // and watermarks work correctly with .evt files.
134            // For JSONL with embedded timestamps (e.g. Sysmon @timestamp),
135            // preserve the parsed timestamp when no explicit timing directive is used.
136            let mut event = event;
137            if has_explicit_timing {
138                event.timestamp = chrono::DateTime::UNIX_EPOCH
139                    + chrono::Duration::milliseconds(time_offset as i64);
140            }
141
142            events.push(TimedEvent {
143                event,
144                time_offset_ms: time_offset,
145            });
146        }
147
148        Ok(events)
149    }
150
151    /// Parse @Ns timing prefix and return (time_ms, rest_of_line)
152    fn parse_timing_prefix(line: &str) -> Result<(u64, &str), String> {
153        // Format: @10s EventType { ... } or @100ms EventType { ... }
154        let line = line.trim_start_matches('@');
155
156        // Find first space to separate timing from event
157        let space_pos = line
158            .find(char::is_whitespace)
159            .ok_or_else(|| "Invalid timing prefix format".to_string())?;
160
161        let timing_str = &line[..space_pos];
162        let rest = line[space_pos..].trim();
163
164        // Parse timing value with unit
165        let time_ms = if timing_str.ends_with("ms") {
166            timing_str
167                .trim_end_matches("ms")
168                .parse::<u64>()
169                .map_err(|_| format!("Invalid timing value: {timing_str}"))?
170        } else if timing_str.ends_with('s') {
171            let secs = timing_str
172                .trim_end_matches('s')
173                .parse::<u64>()
174                .map_err(|_| format!("Invalid timing value: {timing_str}"))?;
175            secs.checked_mul(1000)
176                .ok_or_else(|| format!("Timing value overflow: {timing_str}"))?
177        } else if timing_str.ends_with('m') {
178            let mins = timing_str
179                .trim_end_matches('m')
180                .parse::<u64>()
181                .map_err(|_| format!("Invalid timing value: {timing_str}"))?;
182            mins.checked_mul(60_000)
183                .ok_or_else(|| format!("Timing value overflow: {timing_str}"))?
184        } else {
185            // Assume milliseconds if no unit
186            timing_str
187                .parse::<u64>()
188                .map_err(|_| format!("Invalid timing value: {timing_str}"))?
189        };
190
191        Ok((time_ms, rest))
192    }
193
194    /// Parse a single event line
195    fn parse_event_line(line: &str) -> Result<Event, String> {
196        // Format: EventType { field: value, field2: value2 }
197        // Or: EventType(value1, value2) - positional format
198
199        let line = line.trim().trim_end_matches(';');
200
201        // Find event type name
202        let (event_type, rest) = if let Some(brace_pos) = line.find('{') {
203            (&line[..brace_pos].trim(), &line[brace_pos..])
204        } else if let Some(paren_pos) = line.find('(') {
205            (&line[..paren_pos].trim(), &line[paren_pos..])
206        } else {
207            return Err(format!("Invalid event format: {line}"));
208        };
209
210        // Parse fields
211        if rest.starts_with('{') {
212            // JSON-style: { field: value, ... }
213            let content = rest.trim_start_matches('{').trim_end_matches('}').trim();
214            let fields = Self::split_fields(content);
215
216            // Pre-allocate event with known capacity, skip Utc::now() syscall
217            let mut event =
218                Event::with_capacity_at(*event_type, fields.len(), chrono::DateTime::UNIX_EPOCH);
219
220            for field_str in &fields {
221                let field_str = field_str.trim();
222                if field_str.is_empty() {
223                    continue;
224                }
225
226                let colon_pos = field_str
227                    .find(':')
228                    .ok_or_else(|| format!("Invalid field format: {field_str}"))?;
229                let field_name = field_str[..colon_pos].trim();
230                let field_value = Self::parse_value(field_str[colon_pos + 1..].trim())?;
231                event
232                    .data
233                    .insert(intern_field_name(field_name), field_value);
234            }
235
236            Ok(event)
237        } else if rest.starts_with('(') {
238            // Positional: (value1, value2, ...)
239            let content = rest.trim_start_matches('(').trim_end_matches(')').trim();
240            let fields = Self::split_fields(content);
241
242            let mut event =
243                Event::with_capacity_at(*event_type, fields.len(), chrono::DateTime::UNIX_EPOCH);
244
245            for (i, value_str) in fields.iter().enumerate() {
246                let value_str = value_str.trim();
247                if value_str.is_empty() {
248                    continue;
249                }
250
251                let field_value = Self::parse_value(value_str)?;
252                event.data.insert(format!("field_{i}").into(), field_value);
253            }
254
255            Ok(event)
256        } else {
257            Ok(Event::new_at(*event_type, chrono::DateTime::UNIX_EPOCH))
258        }
259    }
260
261    /// Split fields by comma, respecting nested structures.
262    /// Uses byte-level scanning since all delimiters are ASCII.
263    /// Returns slices into the original content string to avoid allocations.
264    fn split_fields(content: &str) -> Vec<&str> {
265        let bytes = content.as_bytes();
266        let mut fields = Vec::new();
267        let mut field_start = 0;
268        let mut depth = 0i32;
269        let mut in_string = false;
270        let mut escape_next = false;
271
272        for i in 0..bytes.len() {
273            if escape_next {
274                escape_next = false;
275                continue;
276            }
277            match bytes[i] {
278                b'\\' => {
279                    escape_next = true;
280                }
281                b'"' => {
282                    in_string = !in_string;
283                }
284                b'{' | b'[' | b'(' if !in_string => {
285                    depth += 1;
286                }
287                b'}' | b']' | b')' if !in_string => {
288                    depth -= 1;
289                }
290                b',' if !in_string && depth == 0 => {
291                    let field = content[field_start..i].trim();
292                    if !field.is_empty() {
293                        fields.push(field);
294                    }
295                    field_start = i + 1;
296                }
297                _ => {}
298            }
299        }
300
301        let last = content[field_start..].trim();
302        if !last.is_empty() {
303            fields.push(last);
304        }
305
306        fields
307    }
308
309    /// Parse a value string into a Value
310    fn parse_value(s: &str) -> Result<Value, String> {
311        Self::parse_value_bounded(s, crate::limits::MAX_JSON_DEPTH)
312    }
313
314    /// Depth-bounded value parsing to prevent stack overflow on nested arrays.
315    fn parse_value_bounded(s: &str, depth: usize) -> Result<Value, String> {
316        let s = s.trim();
317
318        // Boolean
319        if s == "true" {
320            return Ok(Value::Bool(true));
321        }
322        if s == "false" {
323            return Ok(Value::Bool(false));
324        }
325
326        // Null
327        if s == "null" || s == "nil" {
328            return Ok(Value::Null);
329        }
330
331        // String (quoted) — require at least 2 chars for open+close quotes
332        if s.len() >= 2
333            && ((s.starts_with('"') && s.ends_with('"'))
334                || (s.starts_with('\'') && s.ends_with('\'')))
335        {
336            let inner = &s[1..s.len() - 1];
337            // Fast path: no escape sequences (common case) — zero allocations
338            if !inner.contains('\\') {
339                return Ok(Value::Str(inner.into()));
340            }
341            // Slow path: single-pass escape processing
342            let mut result = String::with_capacity(inner.len());
343            let mut chars = inner.chars();
344            while let Some(ch) = chars.next() {
345                if ch == '\\' {
346                    match chars.next() {
347                        Some('n') => result.push('\n'),
348                        Some('t') => result.push('\t'),
349                        Some('"') => result.push('"'),
350                        Some('\'') => result.push('\''),
351                        Some('\\') => result.push('\\'),
352                        Some(other) => {
353                            result.push('\\');
354                            result.push(other);
355                        }
356                        None => result.push('\\'),
357                    }
358                } else {
359                    result.push(ch);
360                }
361            }
362            return Ok(Value::Str(result.into()));
363        }
364
365        // Integer
366        if let Ok(i) = s.parse::<i64>() {
367            return Ok(Value::Int(i));
368        }
369
370        // Float
371        if let Ok(f) = s.parse::<f64>() {
372            return Ok(Value::Float(f));
373        }
374
375        // Array [v1, v2, ...]
376        if s.starts_with('[') && s.ends_with(']') {
377            if depth == 0 {
378                return Err("Array nesting too deep".to_string());
379            }
380            let inner = &s[1..s.len() - 1];
381            let items: Result<Vec<Value>, String> = Self::split_fields(inner)
382                .iter()
383                .filter(|s| !s.is_empty())
384                .map(|item| Self::parse_value_bounded(item, depth - 1))
385                .collect();
386            return Ok(Value::array(items?));
387        }
388
389        // Unquoted string (identifier-like)
390        Ok(Value::Str(s.to_string().into()))
391    }
392
393    /// Parse from a file path
394    pub fn parse_file<P: AsRef<Path>>(path: P) -> Result<EventFile, String> {
395        let path = path.as_ref();
396        let content = fs::read_to_string(path)
397            .map_err(|e| format!("Failed to read file {}: {e}", path.display()))?;
398
399        let events = Self::parse(&content)?;
400
401        Ok(EventFile {
402            name: path.to_string_lossy().to_string(),
403            events,
404        })
405    }
406
407    /// Parse a single line (either .evt format or JSONL)
408    ///
409    /// In streaming mode, events get `Utc::now()` timestamps since there is
410    /// no batch-level timing context. Lines with `@Ns` timing prefixes are
411    /// parsed normally (the prefix is stripped and the event gets wall-clock time).
412    pub fn parse_line(line: &str) -> Result<Option<Event>, String> {
413        let line = line.trim();
414
415        // Skip empty lines and comments
416        if line.is_empty() || line.starts_with('#') || line.starts_with("//") {
417            return Ok(None);
418        }
419
420        // Skip BATCH directives (not meaningful in streaming mode)
421        if line.starts_with("BATCH") {
422            return Ok(None);
423        }
424
425        // Strip @Ns timing prefix if present (timing is wall-clock in streaming mode)
426        let line = if line.starts_with('@') {
427            let (_, rest) = Self::parse_timing_prefix(line)?;
428            rest.trim()
429        } else {
430            line
431        };
432
433        // Try JSONL format first: {"event_type": "X", "data": {...}} or Sysmon flat JSON
434        if line.starts_with('{') {
435            let event = Self::parse_jsonl_line(line)?;
436            // Preserve timestamp if extracted from JSON (e.g. Sysmon @timestamp);
437            // only override with wall-clock time for native JSONL without embedded timestamps.
438            return Ok(Some(event));
439        }
440
441        // Fall back to .evt format: EventType { field: value, ... }
442        let mut event = Self::parse_event_line(line)?;
443        event.timestamp = chrono::Utc::now();
444        Ok(Some(event))
445    }
446
447    /// Parse a JSONL line.
448    ///
449    /// Supports two formats:
450    /// 1. **Varpulis native**: `{"event_type": "X", "data": {...}}`
451    /// 2. **Sysmon / flat JSONL**: `{"EventID": 1, "Channel": "...", "Image": "...", ...}`
452    ///    Auto-detected when `event_type` is absent but `EventID` + `Channel` are present.
453    ///    Also supports generic flat JSONL with a `type` field as event_type.
454    fn parse_jsonl_line(line: &str) -> Result<Event, String> {
455        // Enforce payload size limit before parsing
456        if line.len() > crate::limits::MAX_EVENT_PAYLOAD_BYTES {
457            return Err(format!(
458                "JSONL line too large ({} bytes, max {})",
459                line.len(),
460                crate::limits::MAX_EVENT_PAYLOAD_BYTES
461            ));
462        }
463
464        let json: serde_json::Value =
465            serde_json::from_str(line).map_err(|e| format!("Invalid JSON: {e}"))?;
466
467        // 1. Varpulis native format: {"event_type": "X", "data": {...}}
468        if let Some(event_type) = json.get("event_type").and_then(|v| v.as_str()) {
469            let mut event = Event::new(event_type);
470            if let Some(data) = json.get("data").and_then(|v| v.as_object()) {
471                for (key, value) in data.iter().take(crate::limits::MAX_FIELDS_PER_EVENT) {
472                    event
473                        .data
474                        .insert(key.as_str().into(), Self::json_to_value(value));
475                }
476            }
477            return Ok(event);
478        }
479
480        // 2. Sysmon / Windows Event Log format: {"EventID": N, "Channel": "...", ...}
481        if let Some(event_id) = json.get("EventID").and_then(|v| v.as_i64()) {
482            let channel = json.get("Channel").and_then(|v| v.as_str()).unwrap_or("");
483            let event_type = if channel.contains("Sysmon") {
484                match event_id {
485                    1 => "SysmonProcessCreate".to_string(),
486                    2 => "SysmonFileCreateTime".to_string(),
487                    3 => "SysmonNetworkConnect".to_string(),
488                    5 => "SysmonProcessTerminate".to_string(),
489                    7 => "SysmonImageLoad".to_string(),
490                    8 => "SysmonCreateRemoteThread".to_string(),
491                    10 => "SysmonProcessAccess".to_string(),
492                    11 => "SysmonFileCreate".to_string(),
493                    12 => "SysmonRegistryAddDel".to_string(),
494                    13 => "SysmonRegistryValueSet".to_string(),
495                    15 => "SysmonFileCreateStreamHash".to_string(),
496                    17 => "SysmonPipeCreated".to_string(),
497                    18 => "SysmonPipeConnected".to_string(),
498                    22 => "SysmonDnsQuery".to_string(),
499                    23 => "SysmonFileDelete".to_string(),
500                    other => format!("Sysmon{other}"),
501                }
502            } else {
503                format!("WinEvent{event_id}")
504            };
505
506            let mut event = Event::new(&*event_type);
507
508            // Extract timestamp from Sysmon/Windows fields
509            Self::apply_json_timestamp(&json, &mut event);
510
511            // Promote all top-level fields except metadata
512            Self::promote_flat_json_fields(&json, &mut event);
513
514            return Ok(event);
515        }
516
517        // 3. Generic flat JSONL with "type" field
518        if let Some(event_type) = json.get("type").and_then(|v| v.as_str()) {
519            let mut event = Event::new(event_type);
520            Self::apply_json_timestamp(&json, &mut event);
521            Self::promote_flat_json_fields(&json, &mut event);
522            return Ok(event);
523        }
524
525        Err(
526            "Missing event_type field (expected 'event_type', 'EventID'+'Channel', or 'type')"
527                .to_string(),
528        )
529    }
530
531    /// Extract and apply a timestamp from JSON fields.
532    /// Tries `@timestamp` (RFC3339), `UtcTime` (Sysmon), `TimeCreated` in order.
533    /// On success, sets `event.timestamp`; on failure, leaves it unchanged.
534    fn apply_json_timestamp(json: &serde_json::Value, event: &mut Event) {
535        let ts_str = json
536            .get("@timestamp")
537            .or_else(|| json.get("UtcTime"))
538            .or_else(|| json.get("TimeCreated"))
539            .and_then(|v| v.as_str());
540
541        if let Some(s) = ts_str {
542            // Try RFC3339 first: "2020-10-18T07:50:05.917Z"
543            if let Ok(ts) = chrono::DateTime::parse_from_rfc3339(s) {
544                event.timestamp = ts.with_timezone(&chrono::Utc);
545                return;
546            }
547            // Try Sysmon format: "2020-10-18 07:50:05.917"
548            if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") {
549                event.timestamp = naive.and_utc();
550                return;
551            }
552            // Try without fractional seconds: "2020-10-18 07:50:05"
553            if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
554                event.timestamp = naive.and_utc();
555            }
556        }
557    }
558
559    /// Promote all top-level JSON keys as event fields, skipping metadata keys.
560    /// String values that look like pure integers (e.g. "8524") are coerced to
561    /// `Value::Int` so VPL comparisons like `DestinationPort == 445` work with
562    /// data sources that serialize numbers as strings (e.g. NXLog/MORDOR).
563    fn promote_flat_json_fields(json: &serde_json::Value, event: &mut Event) {
564        static SKIP_KEYS: &[&str] = &[
565            "EventID",
566            "Channel",
567            "@timestamp",
568            "@version",
569            "type",
570            "event_type",
571        ];
572
573        if let Some(obj) = json.as_object() {
574            for (key, value) in obj.iter().take(crate::limits::MAX_FIELDS_PER_EVENT) {
575                if !SKIP_KEYS.contains(&key.as_str()) {
576                    let v = Self::json_to_value_coerced(value);
577                    event.data.insert(intern_field_name(key), v);
578                }
579            }
580        }
581    }
582
583    /// Like `json_to_value` but coerces string values that look like pure
584    /// integers (e.g. "8524", "-1") to `Value::Int`. Hex strings like "0x1010"
585    /// and floats like "3.14" are kept as strings to preserve their format.
586    fn json_to_value_coerced(v: &serde_json::Value) -> Value {
587        if let Some(s) = v.as_str() {
588            if !s.is_empty() && s.len() <= 19 {
589                if let Ok(n) = s.parse::<i64>() {
590                    return Value::Int(n);
591                }
592            }
593            return Self::json_to_value(v);
594        }
595        Self::json_to_value(v)
596    }
597
598    /// Convert serde_json::Value to varpulis Value (depth-bounded to prevent stack overflow)
599    fn json_to_value(v: &serde_json::Value) -> Value {
600        Self::json_to_value_bounded(v, crate::limits::MAX_JSON_DEPTH)
601    }
602
603    fn json_to_value_bounded(v: &serde_json::Value, depth: usize) -> Value {
604        if depth == 0 {
605            return Value::Null;
606        }
607        match v {
608            serde_json::Value::Null => Value::Null,
609            serde_json::Value::Bool(b) => Value::Bool(*b),
610            serde_json::Value::Number(n) => {
611                if let Some(i) = n.as_i64() {
612                    Value::Int(i)
613                } else if let Some(f) = n.as_f64() {
614                    Value::Float(f)
615                } else {
616                    Value::Null
617                }
618            }
619            serde_json::Value::String(s) => {
620                if s.len() > crate::limits::MAX_STRING_VALUE_BYTES {
621                    let truncated =
622                        &s[..s.floor_char_boundary(crate::limits::MAX_STRING_VALUE_BYTES)];
623                    Value::Str(truncated.into())
624                } else {
625                    Value::Str(s.clone().into())
626                }
627            }
628            serde_json::Value::Array(arr) => {
629                let capped = arr.len().min(crate::limits::MAX_ARRAY_ELEMENTS);
630                Value::array(
631                    arr.iter()
632                        .take(capped)
633                        .map(|v| Self::json_to_value_bounded(v, depth - 1))
634                        .collect(),
635                )
636            }
637            serde_json::Value::Object(obj) => {
638                let mut map: IndexMap<std::sync::Arc<str>, Value, FxBuildHasher> =
639                    IndexMap::with_hasher(FxBuildHasher);
640                for (k, v) in obj.iter().take(crate::limits::MAX_FIELDS_PER_EVENT) {
641                    map.insert(k.as_str().into(), Self::json_to_value_bounded(v, depth - 1));
642                }
643                Value::map(map)
644            }
645        }
646    }
647}
648
/// Streaming event file reader - reads events one at a time without loading entire file.
///
/// Wraps any `std::io::BufRead` source; its `Iterator` implementation yields
/// one parsed event per non-blank, non-comment line.
pub struct StreamingEventReader<R: std::io::BufRead> {
    /// Underlying buffered source.
    reader: R,
    /// Scratch buffer reused across lines to avoid per-line allocation.
    line_buffer: String,
    /// Number of events successfully yielded so far.
    events_read: usize,
}

impl<R: std::io::BufRead> std::fmt::Debug for StreamingEventReader<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamingEventReader")
            .field("events_read", &self.events_read)
            .finish_non_exhaustive()
    }
}

impl<R: std::io::BufRead> StreamingEventReader<R> {
    /// Wrap `reader` with an empty line buffer and a zero event count.
    pub const fn new(reader: R) -> Self {
        Self {
            reader,
            line_buffer: String::new(),
            events_read: 0,
        }
    }

    /// Get count of events read so far
    pub const fn events_read(&self) -> usize {
        self.events_read
    }
}

impl StreamingEventReader<std::io::BufReader<std::fs::File>> {
    /// Open `path` for streaming with a 64 KiB read buffer for throughput.
    ///
    /// # Errors
    ///
    /// Returns `"Failed to open file <path>: <io error>"` when the file cannot
    /// be opened, naming the path like `EventFileParser::parse_file` does.
    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> {
        let path = path.as_ref();
        let file = std::fs::File::open(path)
            .map_err(|e| format!("Failed to open file {}: {e}", path.display()))?;
        // Use 64KB buffer for better I/O performance
        Ok(Self::new(std::io::BufReader::with_capacity(64 * 1024, file)))
    }
}
691
692impl<R: std::io::BufRead> Iterator for StreamingEventReader<R> {
693    type Item = Result<Event, String>;
694
695    fn next(&mut self) -> Option<Self::Item> {
696        loop {
697            self.line_buffer.clear();
698            match self.reader.read_line(&mut self.line_buffer) {
699                Ok(0) => return None, // EOF
700                Ok(_) => {
701                    // Enforce line length limit to prevent OOM from single huge lines
702                    if self.line_buffer.len() > crate::limits::MAX_LINE_LENGTH {
703                        tracing::warn!(
704                            len = self.line_buffer.len(),
705                            "Skipping oversized line ({} bytes, max {})",
706                            self.line_buffer.len(),
707                            crate::limits::MAX_LINE_LENGTH
708                        );
709                        self.line_buffer.clear();
710                        continue;
711                    }
712                    match EventFileParser::parse_line(&self.line_buffer) {
713                        Ok(Some(event)) => {
714                            self.events_read += 1;
715                            return Some(Ok(event));
716                        }
717                        Ok(None) => continue, // Skip empty/comment lines
718                        Err(e) => return Some(Err(e)),
719                    }
720                }
721                Err(e) => return Some(Err(format!("Read error: {e}"))),
722            }
723        }
724    }
725}
726
/// Event file player - sends events to engine with timing (async-runtime only).
#[cfg(feature = "async-runtime")]
#[derive(Debug)]
pub struct EventFilePlayer {
    /// Events to replay, in file order.
    events: Vec<TimedEvent>,
    /// Channel the replayed events are pushed into.
    sender: mpsc::Sender<Event>,
}

#[cfg(feature = "async-runtime")]
impl EventFilePlayer {
    /// Build a player over already-parsed events.
    pub const fn new(events: Vec<TimedEvent>, sender: mpsc::Sender<Event>) -> Self {
        Self { events, sender }
    }

    /// Parse the file at `path` with `EventFileParser::parse_file` and wrap
    /// its events in a player.
    ///
    /// # Errors
    ///
    /// Propagates any read or parse error from the parser.
    pub fn from_file<P: AsRef<Path>>(path: P, sender: mpsc::Sender<Event>) -> Result<Self, String> {
        EventFileParser::parse_file(path).map(|parsed| Self::new(parsed.events, sender))
    }

    /// Replay events honouring their offsets.
    ///
    /// Events sharing an offset form one batch, sent in file order. Batches go
    /// out in ascending offset order; before each batch the player sleeps until
    /// that offset has elapsed since the call started. Returns how many events
    /// were sent.
    ///
    /// # Errors
    ///
    /// Stops at the first failed send (e.g. the receiver was dropped).
    pub async fn play(&self) -> Result<usize, String> {
        let start = std::time::Instant::now();

        let mut batches: HashMap<u64, Vec<&TimedEvent>> = HashMap::new();
        self.events
            .iter()
            .for_each(|timed| batches.entry(timed.time_offset_ms).or_default().push(timed));

        let mut offsets: Vec<u64> = batches.keys().copied().collect();
        offsets.sort_unstable();

        let mut sent_count = 0;
        for offset in offsets {
            let elapsed_ms = start.elapsed().as_millis() as u64;
            if let Some(wait_ms) = offset.checked_sub(elapsed_ms).filter(|&ms| ms > 0) {
                time::sleep(Duration::from_millis(wait_ms)).await;
            }

            let Some(batch) = batches.get(&offset) else {
                continue;
            };
            for timed in batch {
                debug!("Sending event: {} at {}ms", timed.event.event_type, offset);
                self.sender
                    .send(timed.event.clone())
                    .await
                    .map_err(|e| format!("Failed to send event: {e}"))?;
                sent_count += 1;
            }
        }

        info!("Played {} events from file", sent_count);
        Ok(sent_count)
    }

    /// Send every event back-to-back in file order, ignoring offsets.
    ///
    /// # Errors
    ///
    /// Stops at the first failed send.
    pub async fn play_immediate(&self) -> Result<usize, String> {
        for timed in &self.events {
            debug!("Sending event: {}", timed.event.event_type);
            self.sender
                .send(timed.event.clone())
                .await
                .map_err(|e| format!("Failed to send event: {e}"))?;
        }

        // Every event was sent if we got here without an early `?` return.
        let sent_count = self.events.len();
        info!("Played {} events (immediate mode)", sent_count);
        Ok(sent_count)
    }
}
805
#[cfg(test)]
mod tests {
    // Needed for `.year()`, `.hour()` etc. on parsed timestamps in the
    // Sysmon / JSONL tests below.
    use chrono::{Datelike, Timelike};

    use super::*;

    #[test]
    fn test_parse_simple_event() {
        let source = r#"
            StockTick { symbol: "AAPL", price: 150.5, volume: 1000 }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(&*events[0].event.event_type, "StockTick");
        assert_eq!(
            events[0].event.get("symbol"),
            Some(&Value::Str("AAPL".into()))
        );
        assert_eq!(events[0].event.get("price"), Some(&Value::Float(150.5)));
        assert_eq!(events[0].event.get("volume"), Some(&Value::Int(1000)));
    }

    #[test]
    fn test_parse_batched_events() {
        let source = r#"
            # First batch at 0ms
            Order { id: 1, symbol: "AAPL" }

            # Second batch at 100ms
            BATCH 100
            Payment { order_id: 1 }
            Shipping { order_id: 1 }

            # Third batch at 200ms
            BATCH 200
            Confirmation { order_id: 1 }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 4);

        assert_eq!(events[0].time_offset_ms, 0);
        assert_eq!(&*events[0].event.event_type, "Order");

        assert_eq!(events[1].time_offset_ms, 100);
        assert_eq!(&*events[1].event.event_type, "Payment");

        assert_eq!(events[2].time_offset_ms, 100);
        assert_eq!(&*events[2].event.event_type, "Shipping");

        assert_eq!(events[3].time_offset_ms, 200);
        assert_eq!(&*events[3].event.event_type, "Confirmation");
    }

    #[test]
    fn test_parse_positional_format() {
        let source = r#"
            StockPrice("AAPL", 150.5)
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(&*events[0].event.event_type, "StockPrice");
        assert_eq!(
            events[0].event.get("field_0"),
            Some(&Value::Str("AAPL".into()))
        );
        assert_eq!(events[0].event.get("field_1"), Some(&Value::Float(150.5)));
    }

    #[test]
    fn test_parse_array_values() {
        let source = r#"
            BatchOrder { ids: [1, 2, 3], symbols: ["AAPL", "GOOG"] }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);

        let ids = events[0].event.get("ids").unwrap();
        if let Value::Array(arr) = ids {
            assert_eq!(arr.len(), 3);
        } else {
            panic!("Expected array");
        }
    }

    #[test]
    fn test_parse_comments() {
        let source = r"
            # This is a comment
            // This is also a comment
            Event1 { x: 1 }
            # Another comment
            Event2 { y: 2 }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 2);
    }

    #[test]
    fn test_sequence_scenario() {
        let source = r#"
            # Test sequence: Order -> Payment
            
            # Start with an order
            Order { id: 1, symbol: "AAPL", quantity: 100 }
            
            # Payment arrives 50ms later
            BATCH 50
            Payment { order_id: 1, amount: 15000.0 }
            
            # Another order without payment (should timeout)
            BATCH 100
            Order { id: 2, symbol: "GOOG", quantity: 50 }
            
            # Much later, payment for order 2 (may timeout)
            BATCH 5000
            Payment { order_id: 2, amount: 7500.0 }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 4);

        // Verify order sequence
        assert_eq!(&*events[0].event.event_type, "Order");
        assert_eq!(events[0].time_offset_ms, 0);

        assert_eq!(&*events[1].event.event_type, "Payment");
        assert_eq!(events[1].time_offset_ms, 50);

        assert_eq!(&*events[2].event.event_type, "Order");
        assert_eq!(events[2].time_offset_ms, 100);

        assert_eq!(&*events[3].event.event_type, "Payment");
        assert_eq!(events[3].time_offset_ms, 5000);
    }

    // ==========================================================================
    // Value Parsing Tests
    // ==========================================================================

    #[test]
    fn test_parse_boolean_values() {
        let source = r"
            Flags { active: true, disabled: false }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events[0].event.get("active"), Some(&Value::Bool(true)));
        assert_eq!(events[0].event.get("disabled"), Some(&Value::Bool(false)));
    }

    #[test]
    fn test_parse_null_values() {
        let source = r"
            Data { value: null, other: nil }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events[0].event.get("value"), Some(&Value::Null));
        assert_eq!(events[0].event.get("other"), Some(&Value::Null));
    }

    #[test]
    fn test_parse_escape_sequences() {
        let source = r#"
            Message { text: "Hello\nWorld", path: "C:\\Users\\test" }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        let text = events[0].event.get("text").unwrap();
        // Fail loudly if the value is not a string; otherwise this test
        // would silently pass without checking the escape handling.
        if let Value::Str(s) = text {
            assert!(s.contains('\n'));
        } else {
            panic!("Expected string");
        }
    }

    #[test]
    fn test_parse_single_quoted_string() {
        let source = r"
            Event { name: 'single quoted' }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(
            events[0].event.get("name"),
            Some(&Value::Str("single quoted".into()))
        );
    }

    #[test]
    fn test_parse_unquoted_identifier() {
        let source = r"
            Event { status: active, mode: processing }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(
            events[0].event.get("status"),
            Some(&Value::Str("active".into()))
        );
        assert_eq!(
            events[0].event.get("mode"),
            Some(&Value::Str("processing".into()))
        );
    }

    #[test]
    fn test_parse_negative_numbers() {
        let source = r"
            Data { temp: -15, delta: -2.5 }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events[0].event.get("temp"), Some(&Value::Int(-15)));
        assert_eq!(events[0].event.get("delta"), Some(&Value::Float(-2.5)));
    }

    #[test]
    fn test_parse_nested_array() {
        let source = r"
            Complex { matrix: [[1, 2], [3, 4]] }
        ";

        let events = EventFileParser::parse(source).unwrap();
        let matrix = events[0].event.get("matrix").unwrap();
        if let Value::Array(arr) = matrix {
            assert_eq!(arr.len(), 2);
        } else {
            panic!("Expected array");
        }
    }

    // ==========================================================================
    // Error Handling Tests
    // ==========================================================================

    #[test]
    fn test_parse_invalid_event_format() {
        let source = "InvalidEvent";
        let result = EventFileParser::parse(source);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_invalid_field_format() {
        let source = "Event { invalid_no_colon }";
        let result = EventFileParser::parse(source);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_invalid_batch_time() {
        let source = r"
            BATCH not_a_number
            Event { x: 1 }
        ";
        let result = EventFileParser::parse(source);
        assert!(result.is_err());
    }

    // ==========================================================================
    // Edge Cases
    // ==========================================================================

    #[test]
    fn test_parse_empty_content() {
        let source = "";
        let events = EventFileParser::parse(source).unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn test_parse_only_comments() {
        let source = r"
            # Comment 1
            // Comment 2
            # Comment 3
        ";
        let events = EventFileParser::parse(source).unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn test_parse_empty_braces() {
        let source = "EmptyEvent { }";
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert!(events[0].event.data.is_empty());
    }

    #[test]
    fn test_parse_semicolon_terminated() {
        let source = "Event { x: 1 };";
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn test_parse_whitespace_handling() {
        let source = "  Event  {  x  :  1  ,  y  :  2  }  ";
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event.get("x"), Some(&Value::Int(1)));
        assert_eq!(events[0].event.get("y"), Some(&Value::Int(2)));
    }

    // ==========================================================================
    // EventFilePlayer Tests
    //
    // `mpsc` and the tokio/tracing imports the player uses are only brought
    // in under the `async-runtime` feature, so these tests are gated the same
    // way to keep `cargo test` compiling without that feature.
    // ==========================================================================

    #[cfg(feature = "async-runtime")]
    #[tokio::test]
    async fn test_player_immediate() {
        let events = vec![
            TimedEvent {
                event: Event::new("A").with_field("id", 1i64),
                time_offset_ms: 0,
            },
            TimedEvent {
                event: Event::new("B").with_field("id", 2i64),
                time_offset_ms: 100,
            },
        ];

        let (tx, mut rx) = mpsc::channel(10);
        let player = EventFilePlayer::new(events, tx);

        let count = player.play_immediate().await.unwrap();
        assert_eq!(count, 2);

        let e1 = rx.recv().await.unwrap();
        assert_eq!(&*e1.event_type, "A");

        let e2 = rx.recv().await.unwrap();
        assert_eq!(&*e2.event_type, "B");
    }

    #[cfg(feature = "async-runtime")]
    #[tokio::test]
    async fn test_player_with_batches() {
        let events = vec![
            TimedEvent {
                event: Event::new("First"),
                time_offset_ms: 0,
            },
            TimedEvent {
                event: Event::new("Second"),
                time_offset_ms: 0,
            },
            TimedEvent {
                event: Event::new("Third"),
                time_offset_ms: 10, // 10ms later
            },
        ];

        let (tx, mut rx) = mpsc::channel(10);
        let player = EventFilePlayer::new(events, tx);

        let count = player.play().await.unwrap();
        assert_eq!(count, 3);

        // All events should have been sent, in batch order and preserving
        // file order within a batch.
        let e1 = rx.recv().await.unwrap();
        assert_eq!(&*e1.event_type, "First");

        let e2 = rx.recv().await.unwrap();
        assert_eq!(&*e2.event_type, "Second");

        let e3 = rx.recv().await.unwrap();
        assert_eq!(&*e3.event_type, "Third");
    }

    #[cfg(feature = "async-runtime")]
    #[tokio::test]
    async fn test_player_empty() {
        let events = vec![];
        let (tx, _rx) = mpsc::channel(10);
        let player = EventFilePlayer::new(events, tx);

        let count = player.play_immediate().await.unwrap();
        assert_eq!(count, 0);
    }

    // =========================================================================
    // Sysmon / flat JSONL parsing tests
    // =========================================================================

    #[test]
    fn test_parse_sysmon_process_create() {
        let line = r#"{"EventID": 1, "Channel": "Microsoft-Windows-Sysmon/Operational", "Image": "C:\\Windows\\System32\\cmd.exe", "CommandLine": "cmd.exe /c whoami", "ParentImage": "C:\\Windows\\explorer.exe", "User": "CORP\\admin", "Hostname": "WS01", "ProcessId": 1234}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "SysmonProcessCreate");
        assert_eq!(
            event.get_str("Image"),
            Some("C:\\Windows\\System32\\cmd.exe")
        );
        assert_eq!(event.get_str("CommandLine"), Some("cmd.exe /c whoami"));
        assert_eq!(
            event.get_str("ParentImage"),
            Some("C:\\Windows\\explorer.exe")
        );
        assert_eq!(event.get_str("User"), Some("CORP\\admin"));
        assert_eq!(event.get_str("Hostname"), Some("WS01"));
        assert_eq!(event.get_int("ProcessId"), Some(1234));
        // Metadata keys should NOT be promoted
        assert!(event.data.get("EventID").is_none());
        assert!(event.data.get("Channel").is_none());
    }

    #[test]
    fn test_parse_sysmon_network_connect() {
        let line = r#"{"EventID": 3, "Channel": "Microsoft-Windows-Sysmon/Operational", "Image": "C:\\Windows\\System32\\svchost.exe", "SourceIp": "10.0.0.5", "DestinationIp": "192.168.1.100", "DestinationPort": 445, "Protocol": "tcp", "Hostname": "WS01"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "SysmonNetworkConnect");
        assert_eq!(event.get_str("SourceIp"), Some("10.0.0.5"));
        assert_eq!(event.get_str("DestinationIp"), Some("192.168.1.100"));
        assert_eq!(event.get_int("DestinationPort"), Some(445));
        assert_eq!(event.get_str("Protocol"), Some("tcp"));
    }

    #[test]
    fn test_parse_sysmon_process_access() {
        let line = r#"{"EventID": 10, "Channel": "Microsoft-Windows-Sysmon/Operational", "SourceImage": "C:\\tools\\mimikatz.exe", "TargetImage": "C:\\Windows\\System32\\lsass.exe", "GrantedAccess": "0x1010"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "SysmonProcessAccess");
        assert_eq!(
            event.get_str("TargetImage"),
            Some("C:\\Windows\\System32\\lsass.exe")
        );
        assert_eq!(event.get_str("GrantedAccess"), Some("0x1010"));
    }

    #[test]
    fn test_parse_sysmon_file_create() {
        let line = r#"{"EventID": 11, "Channel": "Microsoft-Windows-Sysmon/Operational", "Image": "C:\\Windows\\System32\\cmd.exe", "TargetFilename": "C:\\temp\\data.zip"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "SysmonFileCreate");
        assert_eq!(event.get_str("TargetFilename"), Some("C:\\temp\\data.zip"));
    }

    #[test]
    fn test_parse_sysmon_registry_value_set() {
        let line = r#"{"EventID": 13, "Channel": "Microsoft-Windows-Sysmon/Operational", "Image": "C:\\Windows\\regedit.exe", "TargetObject": "HKLM\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Run\\backdoor", "Details": "C:\\malware\\payload.exe"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "SysmonRegistryValueSet");
        assert!(event.get_str("TargetObject").unwrap().contains("\\Run\\"));
    }

    #[test]
    fn test_parse_sysmon_unknown_event_id() {
        let line = r#"{"EventID": 99, "Channel": "Microsoft-Windows-Sysmon/Operational", "SomeField": "value"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "Sysmon99");
        assert_eq!(event.get_str("SomeField"), Some("value"));
    }

    #[test]
    fn test_parse_winevent_non_sysmon() {
        let line = r#"{"EventID": 4624, "Channel": "Security", "LogonType": 3, "TargetUserName": "admin"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "WinEvent4624");
        assert_eq!(event.get_int("LogonType"), Some(3));
        assert_eq!(event.get_str("TargetUserName"), Some("admin"));
    }

    #[test]
    fn test_parse_sysmon_timestamp_rfc3339() {
        let line = r#"{"EventID": 1, "Channel": "Microsoft-Windows-Sysmon/Operational", "@timestamp": "2020-10-18T07:50:05.917Z", "Image": "cmd.exe"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.timestamp.year(), 2020);
        assert_eq!(event.timestamp.month(), 10);
        assert_eq!(event.timestamp.day(), 18);
        assert_eq!(event.timestamp.hour(), 7);
        assert_eq!(event.timestamp.minute(), 50);
    }

    #[test]
    fn test_parse_sysmon_timestamp_utctime() {
        let line = r#"{"EventID": 1, "Channel": "Microsoft-Windows-Sysmon/Operational", "UtcTime": "2020-10-18 07:50:05.917", "Image": "cmd.exe"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.timestamp.year(), 2020);
        assert_eq!(event.timestamp.month(), 10);
        assert_eq!(event.timestamp.day(), 18);
    }

    #[test]
    fn test_parse_flat_jsonl_with_type_field() {
        let line = r#"{"type": "Login", "user": "admin", "ip": "10.0.0.1"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        assert_eq!(event.event_type.as_ref(), "Login");
        assert_eq!(event.get_str("user"), Some("admin"));
        assert_eq!(event.get_str("ip"), Some("10.0.0.1"));
        // "type" should be skipped
        assert!(event.data.get("type").is_none());
    }

    #[test]
    fn test_parse_jsonl_missing_all_type_fields() {
        let line = r#"{"foo": "bar"}"#;
        let result = EventFileParser::parse_jsonl_line(line);
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("Missing event_type field"));
    }

    #[test]
    fn test_sysmon_timestamp_preserved_in_parse() {
        // When JSONL has embedded timestamps and no BATCH/@ prefix,
        // the parsed timestamp should be preserved (not overridden with UNIX_EPOCH)
        let source = r#"{"EventID": 1, "Channel": "Microsoft-Windows-Sysmon/Operational", "@timestamp": "2020-10-18T07:50:05.000Z", "Image": "cmd.exe"}
{"EventID": 1, "Channel": "Microsoft-Windows-Sysmon/Operational", "@timestamp": "2020-10-18T07:55:10.000Z", "Image": "powershell.exe"}"#;
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 2);
        // Timestamps should be from the JSON, not UNIX_EPOCH
        assert_eq!(events[0].event.timestamp.year(), 2020);
        assert_eq!(events[1].event.timestamp.year(), 2020);
        // Second event is 5 minutes 5 seconds later (07:50:05 -> 07:55:10)
        let diff = events[1].event.timestamp - events[0].event.timestamp;
        assert_eq!(diff.num_seconds(), 305);
    }

    #[test]
    fn test_sysmon_timestamp_overridden_by_batch() {
        // When BATCH directive is used, it should override the parsed timestamp
        let source = "BATCH 1000\n{\"EventID\": 1, \"Channel\": \"Microsoft-Windows-Sysmon/Operational\", \"@timestamp\": \"2020-10-18T07:50:05.000Z\", \"Image\": \"cmd.exe\"}";
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        // Should be UNIX_EPOCH + 1000ms, not the parsed timestamp
        assert_eq!(events[0].event.timestamp.year(), 1970);
    }

    #[test]
    fn test_sysmon_string_to_int_coercion() {
        // MORDOR/NXLog serializes numeric fields as strings: "8524", "445"
        // The parser should coerce these to Value::Int for VPL comparisons
        let line = r#"{"EventID": 3, "Channel": "Microsoft-Windows-Sysmon/Operational", "DestinationPort": "445", "ProcessId": "8524", "SourceIp": "10.0.0.5", "GrantedAccess": "0x1010", "Hostname": "WS01"}"#;
        let event = EventFileParser::parse_jsonl_line(line).unwrap();
        // Pure numeric strings → Int
        assert_eq!(event.get_int("DestinationPort"), Some(445));
        assert_eq!(event.get_int("ProcessId"), Some(8524));
        // Hex string stays as Str (not pure decimal)
        assert_eq!(event.get_str("GrantedAccess"), Some("0x1010"));
        // IP address stays as Str
        assert_eq!(event.get_str("SourceIp"), Some("10.0.0.5"));
    }

    #[test]
    fn test_native_jsonl_still_works() {
        // Existing Varpulis JSONL format should continue working
        let source = r#"{"event_type": "StockTick", "data": {"symbol": "AAPL", "price": 150.5}}"#;
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event.event_type.as_ref(), "StockTick");
    }
}