Skip to main content

varpulis_runtime/
event_file.rs

//! Event file parser for VPL
//!
//! Inspired by Apama's .evt file format, this module provides:
//! - Event file parsing with timing control
//! - BATCH tags for grouping events at a time offset
//! - Support for JSON-style event representation
//!
//! # Event File Format
//!
//! ```text
//! # Comment line
//! // Also a comment
//!
//! # Simple event (sent immediately)
//! StockTick { symbol: "AAPL", price: 150.0, volume: 1000 }
//!
//! # Batch at 100ms from start (offsets are absolute, not cumulative)
//! BATCH 100
//! Order { id: 1, symbol: "AAPL", quantity: 100 }
//! Order { id: 2, symbol: "GOOG", quantity: 50 }
//!
//! # Another batch at 200ms from start
//! BATCH 200
//! Payment { order_id: 1, amount: 15000.0 }
//! ```
26
27use std::cell::RefCell;
28use std::collections::HashMap;
29use std::fs;
30use std::path::Path;
31use std::sync::Arc;
32use std::time::Duration;
33
34use indexmap::IndexMap;
35use rustc_hash::{FxBuildHasher, FxHashMap};
36use tokio::sync::mpsc;
37use tokio::time;
38use tracing::{debug, info};
39use varpulis_core::Value;
40
41use crate::event::Event;
42
43thread_local! {
44    static FIELD_INTERNER: RefCell<FxHashMap<Box<str>, Arc<str>>> =
45        RefCell::new(FxHashMap::default());
46}
47
48/// Intern a field name to reuse the same `Arc<str>` across events.
49/// After the first occurrence, subsequent calls for the same name are O(1) Arc clones.
50fn intern_field_name(name: &str) -> Arc<str> {
51    FIELD_INTERNER.with(|interner| {
52        let mut map = interner.borrow_mut();
53        if let Some(arc) = map.get(name) {
54            arc.clone()
55        } else {
56            let arc: Arc<str> = name.into();
57            map.insert(name.into(), arc.clone());
58            arc
59        }
60    })
61}
62
63/// A parsed event with optional timing
64#[derive(Debug, Clone)]
65pub struct TimedEvent {
66    /// The event to send
67    pub event: Event,
68    /// Time offset from start (in milliseconds)
69    pub time_offset_ms: u64,
70}
71
72/// Parsed event file
73#[derive(Debug, Clone)]
74pub struct EventFile {
75    /// Name/path of the file
76    pub name: String,
77    /// Parsed events with timing
78    pub events: Vec<TimedEvent>,
79}
80
/// Stateless parser for `.evt` and JSONL event files.
///
/// All functionality is exposed through associated functions; the type
/// itself carries no data.
#[derive(Debug)]
pub struct EventFileParser;
84
85impl EventFileParser {
86    /// Parse an event file from a string (supports both .evt and JSONL formats)
87    pub fn parse(source: &str) -> Result<Vec<TimedEvent>, String> {
88        let mut events = Vec::new();
89        let mut current_batch_time: u64 = 0;
90
91        for (line_num, line) in source.lines().enumerate() {
92            let line = line.trim();
93
94            // Skip empty lines and comments
95            if line.is_empty() || line.starts_with('#') || line.starts_with("//") {
96                continue;
97            }
98
99            // Check for BATCH directive
100            if line.starts_with("BATCH") {
101                let parts: Vec<&str> = line.split_whitespace().collect();
102                if parts.len() >= 2 {
103                    current_batch_time = parts[1]
104                        .parse()
105                        .map_err(|_| format!("Invalid BATCH time at line {}", line_num + 1))?;
106                }
107                continue;
108            }
109
110            // Check for @Ns timing prefix: @0s EventType { ... }
111            let (time_offset, event_line) = if line.starts_with('@') {
112                Self::parse_timing_prefix(line)?
113            } else {
114                (current_batch_time, line)
115            };
116
117            // Parse event - try JSONL first, then .evt format
118            let event = if event_line.starts_with('{') {
119                Self::parse_jsonl_line(event_line)
120                    .map_err(|e| format!("Error at line {}: {}", line_num + 1, e))?
121            } else {
122                Self::parse_event_line(event_line)
123                    .map_err(|e| format!("Error at line {}: {}", line_num + 1, e))?
124            };
125
126            // Apply timing offset to event timestamp so time-based windows
127            // and watermarks work correctly with .evt files
128            let mut event = event;
129            event.timestamp =
130                chrono::DateTime::UNIX_EPOCH + chrono::Duration::milliseconds(time_offset as i64);
131
132            events.push(TimedEvent {
133                event,
134                time_offset_ms: time_offset,
135            });
136        }
137
138        Ok(events)
139    }
140
141    /// Parse @Ns timing prefix and return (time_ms, rest_of_line)
142    fn parse_timing_prefix(line: &str) -> Result<(u64, &str), String> {
143        // Format: @10s EventType { ... } or @100ms EventType { ... }
144        let line = line.trim_start_matches('@');
145
146        // Find first space to separate timing from event
147        let space_pos = line
148            .find(char::is_whitespace)
149            .ok_or_else(|| "Invalid timing prefix format".to_string())?;
150
151        let timing_str = &line[..space_pos];
152        let rest = line[space_pos..].trim();
153
154        // Parse timing value with unit
155        let time_ms = if timing_str.ends_with("ms") {
156            timing_str
157                .trim_end_matches("ms")
158                .parse::<u64>()
159                .map_err(|_| format!("Invalid timing value: {timing_str}"))?
160        } else if timing_str.ends_with('s') {
161            let secs = timing_str
162                .trim_end_matches('s')
163                .parse::<u64>()
164                .map_err(|_| format!("Invalid timing value: {timing_str}"))?;
165            secs.checked_mul(1000)
166                .ok_or_else(|| format!("Timing value overflow: {timing_str}"))?
167        } else if timing_str.ends_with('m') {
168            let mins = timing_str
169                .trim_end_matches('m')
170                .parse::<u64>()
171                .map_err(|_| format!("Invalid timing value: {timing_str}"))?;
172            mins.checked_mul(60_000)
173                .ok_or_else(|| format!("Timing value overflow: {timing_str}"))?
174        } else {
175            // Assume milliseconds if no unit
176            timing_str
177                .parse::<u64>()
178                .map_err(|_| format!("Invalid timing value: {timing_str}"))?
179        };
180
181        Ok((time_ms, rest))
182    }
183
184    /// Parse a single event line
185    fn parse_event_line(line: &str) -> Result<Event, String> {
186        // Format: EventType { field: value, field2: value2 }
187        // Or: EventType(value1, value2) - positional format
188
189        let line = line.trim().trim_end_matches(';');
190
191        // Find event type name
192        let (event_type, rest) = if let Some(brace_pos) = line.find('{') {
193            (&line[..brace_pos].trim(), &line[brace_pos..])
194        } else if let Some(paren_pos) = line.find('(') {
195            (&line[..paren_pos].trim(), &line[paren_pos..])
196        } else {
197            return Err(format!("Invalid event format: {line}"));
198        };
199
200        // Parse fields
201        if rest.starts_with('{') {
202            // JSON-style: { field: value, ... }
203            let content = rest.trim_start_matches('{').trim_end_matches('}').trim();
204            let fields = Self::split_fields(content);
205
206            // Pre-allocate event with known capacity, skip Utc::now() syscall
207            let mut event =
208                Event::with_capacity_at(*event_type, fields.len(), chrono::DateTime::UNIX_EPOCH);
209
210            for field_str in &fields {
211                let field_str = field_str.trim();
212                if field_str.is_empty() {
213                    continue;
214                }
215
216                let colon_pos = field_str
217                    .find(':')
218                    .ok_or_else(|| format!("Invalid field format: {field_str}"))?;
219                let field_name = field_str[..colon_pos].trim();
220                let field_value = Self::parse_value(field_str[colon_pos + 1..].trim())?;
221                event
222                    .data
223                    .insert(intern_field_name(field_name), field_value);
224            }
225
226            Ok(event)
227        } else if rest.starts_with('(') {
228            // Positional: (value1, value2, ...)
229            let content = rest.trim_start_matches('(').trim_end_matches(')').trim();
230            let fields = Self::split_fields(content);
231
232            let mut event =
233                Event::with_capacity_at(*event_type, fields.len(), chrono::DateTime::UNIX_EPOCH);
234
235            for (i, value_str) in fields.iter().enumerate() {
236                let value_str = value_str.trim();
237                if value_str.is_empty() {
238                    continue;
239                }
240
241                let field_value = Self::parse_value(value_str)?;
242                event.data.insert(format!("field_{i}").into(), field_value);
243            }
244
245            Ok(event)
246        } else {
247            Ok(Event::new_at(*event_type, chrono::DateTime::UNIX_EPOCH))
248        }
249    }
250
251    /// Split fields by comma, respecting nested structures.
252    /// Uses byte-level scanning since all delimiters are ASCII.
253    /// Returns slices into the original content string to avoid allocations.
254    fn split_fields(content: &str) -> Vec<&str> {
255        let bytes = content.as_bytes();
256        let mut fields = Vec::new();
257        let mut field_start = 0;
258        let mut depth = 0i32;
259        let mut in_string = false;
260        let mut escape_next = false;
261
262        for i in 0..bytes.len() {
263            if escape_next {
264                escape_next = false;
265                continue;
266            }
267            match bytes[i] {
268                b'\\' => {
269                    escape_next = true;
270                }
271                b'"' => {
272                    in_string = !in_string;
273                }
274                b'{' | b'[' | b'(' if !in_string => {
275                    depth += 1;
276                }
277                b'}' | b']' | b')' if !in_string => {
278                    depth -= 1;
279                }
280                b',' if !in_string && depth == 0 => {
281                    let field = content[field_start..i].trim();
282                    if !field.is_empty() {
283                        fields.push(field);
284                    }
285                    field_start = i + 1;
286                }
287                _ => {}
288            }
289        }
290
291        let last = content[field_start..].trim();
292        if !last.is_empty() {
293            fields.push(last);
294        }
295
296        fields
297    }
298
299    /// Parse a value string into a Value
300    fn parse_value(s: &str) -> Result<Value, String> {
301        Self::parse_value_bounded(s, crate::limits::MAX_JSON_DEPTH)
302    }
303
304    /// Depth-bounded value parsing to prevent stack overflow on nested arrays.
305    fn parse_value_bounded(s: &str, depth: usize) -> Result<Value, String> {
306        let s = s.trim();
307
308        // Boolean
309        if s == "true" {
310            return Ok(Value::Bool(true));
311        }
312        if s == "false" {
313            return Ok(Value::Bool(false));
314        }
315
316        // Null
317        if s == "null" || s == "nil" {
318            return Ok(Value::Null);
319        }
320
321        // String (quoted) — require at least 2 chars for open+close quotes
322        if s.len() >= 2
323            && ((s.starts_with('"') && s.ends_with('"'))
324                || (s.starts_with('\'') && s.ends_with('\'')))
325        {
326            let inner = &s[1..s.len() - 1];
327            // Fast path: no escape sequences (common case) — zero allocations
328            if !inner.contains('\\') {
329                return Ok(Value::Str(inner.into()));
330            }
331            // Slow path: single-pass escape processing
332            let mut result = String::with_capacity(inner.len());
333            let mut chars = inner.chars();
334            while let Some(ch) = chars.next() {
335                if ch == '\\' {
336                    match chars.next() {
337                        Some('n') => result.push('\n'),
338                        Some('t') => result.push('\t'),
339                        Some('"') => result.push('"'),
340                        Some('\'') => result.push('\''),
341                        Some('\\') => result.push('\\'),
342                        Some(other) => {
343                            result.push('\\');
344                            result.push(other);
345                        }
346                        None => result.push('\\'),
347                    }
348                } else {
349                    result.push(ch);
350                }
351            }
352            return Ok(Value::Str(result.into()));
353        }
354
355        // Integer
356        if let Ok(i) = s.parse::<i64>() {
357            return Ok(Value::Int(i));
358        }
359
360        // Float
361        if let Ok(f) = s.parse::<f64>() {
362            return Ok(Value::Float(f));
363        }
364
365        // Array [v1, v2, ...]
366        if s.starts_with('[') && s.ends_with(']') {
367            if depth == 0 {
368                return Err("Array nesting too deep".to_string());
369            }
370            let inner = &s[1..s.len() - 1];
371            let items: Result<Vec<Value>, String> = Self::split_fields(inner)
372                .iter()
373                .filter(|s| !s.is_empty())
374                .map(|item| Self::parse_value_bounded(item, depth - 1))
375                .collect();
376            return Ok(Value::array(items?));
377        }
378
379        // Unquoted string (identifier-like)
380        Ok(Value::Str(s.to_string().into()))
381    }
382
383    /// Parse from a file path
384    pub fn parse_file<P: AsRef<Path>>(path: P) -> Result<EventFile, String> {
385        let path = path.as_ref();
386        let content = fs::read_to_string(path)
387            .map_err(|e| format!("Failed to read file {}: {e}", path.display()))?;
388
389        let events = Self::parse(&content)?;
390
391        Ok(EventFile {
392            name: path.to_string_lossy().to_string(),
393            events,
394        })
395    }
396
397    /// Parse a single line (either .evt format or JSONL)
398    ///
399    /// In streaming mode, events get `Utc::now()` timestamps since there is
400    /// no batch-level timing context. Lines with `@Ns` timing prefixes are
401    /// parsed normally (the prefix is stripped and the event gets wall-clock time).
402    pub fn parse_line(line: &str) -> Result<Option<Event>, String> {
403        let line = line.trim();
404
405        // Skip empty lines and comments
406        if line.is_empty() || line.starts_with('#') || line.starts_with("//") {
407            return Ok(None);
408        }
409
410        // Skip BATCH directives (not meaningful in streaming mode)
411        if line.starts_with("BATCH") {
412            return Ok(None);
413        }
414
415        // Strip @Ns timing prefix if present (timing is wall-clock in streaming mode)
416        let line = if line.starts_with('@') {
417            let (_, rest) = Self::parse_timing_prefix(line)?;
418            rest.trim()
419        } else {
420            line
421        };
422
423        // Try JSONL format first: {"event_type": "X", "data": {...}}
424        if line.starts_with('{') {
425            let mut event = Self::parse_jsonl_line(line)?;
426            event.timestamp = chrono::Utc::now();
427            return Ok(Some(event));
428        }
429
430        // Fall back to .evt format: EventType { field: value, ... }
431        let mut event = Self::parse_event_line(line)?;
432        event.timestamp = chrono::Utc::now();
433        Ok(Some(event))
434    }
435
436    /// Parse a JSONL line
437    fn parse_jsonl_line(line: &str) -> Result<Event, String> {
438        // Enforce payload size limit before parsing
439        if line.len() > crate::limits::MAX_EVENT_PAYLOAD_BYTES {
440            return Err(format!(
441                "JSONL line too large ({} bytes, max {})",
442                line.len(),
443                crate::limits::MAX_EVENT_PAYLOAD_BYTES
444            ));
445        }
446
447        let json: serde_json::Value =
448            serde_json::from_str(line).map_err(|e| format!("Invalid JSON: {e}"))?;
449
450        let event_type = json
451            .get("event_type")
452            .and_then(|v| v.as_str())
453            .ok_or_else(|| "Missing event_type field".to_string())?;
454
455        let mut event = Event::new(event_type);
456
457        if let Some(data) = json.get("data").and_then(|v| v.as_object()) {
458            for (key, value) in data.iter().take(crate::limits::MAX_FIELDS_PER_EVENT) {
459                event
460                    .data
461                    .insert(key.as_str().into(), Self::json_to_value(value));
462            }
463        }
464
465        Ok(event)
466    }
467
468    /// Convert serde_json::Value to varpulis Value (depth-bounded to prevent stack overflow)
469    fn json_to_value(v: &serde_json::Value) -> Value {
470        Self::json_to_value_bounded(v, crate::limits::MAX_JSON_DEPTH)
471    }
472
473    fn json_to_value_bounded(v: &serde_json::Value, depth: usize) -> Value {
474        if depth == 0 {
475            return Value::Null;
476        }
477        match v {
478            serde_json::Value::Null => Value::Null,
479            serde_json::Value::Bool(b) => Value::Bool(*b),
480            serde_json::Value::Number(n) => {
481                if let Some(i) = n.as_i64() {
482                    Value::Int(i)
483                } else if let Some(f) = n.as_f64() {
484                    Value::Float(f)
485                } else {
486                    Value::Null
487                }
488            }
489            serde_json::Value::String(s) => {
490                if s.len() > crate::limits::MAX_STRING_VALUE_BYTES {
491                    let truncated =
492                        &s[..s.floor_char_boundary(crate::limits::MAX_STRING_VALUE_BYTES)];
493                    Value::Str(truncated.into())
494                } else {
495                    Value::Str(s.clone().into())
496                }
497            }
498            serde_json::Value::Array(arr) => {
499                let capped = arr.len().min(crate::limits::MAX_ARRAY_ELEMENTS);
500                Value::array(
501                    arr.iter()
502                        .take(capped)
503                        .map(|v| Self::json_to_value_bounded(v, depth - 1))
504                        .collect(),
505                )
506            }
507            serde_json::Value::Object(obj) => {
508                let mut map: IndexMap<std::sync::Arc<str>, Value, FxBuildHasher> =
509                    IndexMap::with_hasher(FxBuildHasher);
510                for (k, v) in obj.iter().take(crate::limits::MAX_FIELDS_PER_EVENT) {
511                    map.insert(k.as_str().into(), Self::json_to_value_bounded(v, depth - 1));
512                }
513                Value::map(map)
514            }
515        }
516    }
517}
518
/// Streaming event file reader: yields events one line at a time instead of
/// loading the whole file into memory.
pub struct StreamingEventReader<R: std::io::BufRead> {
    /// Source of the raw lines.
    reader: R,
    /// Scratch buffer reused for every line read.
    line_buffer: String,
    /// Number of events successfully produced so far.
    events_read: usize,
}
525
526impl<R: std::io::BufRead> std::fmt::Debug for StreamingEventReader<R> {
527    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
528        f.debug_struct("StreamingEventReader")
529            .field("events_read", &self.events_read)
530            .finish_non_exhaustive()
531    }
532}
533
534impl<R: std::io::BufRead> StreamingEventReader<R> {
535    pub const fn new(reader: R) -> Self {
536        Self {
537            reader,
538            line_buffer: String::new(),
539            events_read: 0,
540        }
541    }
542
543    /// Get count of events read so far
544    pub const fn events_read(&self) -> usize {
545        self.events_read
546    }
547}
548
549impl StreamingEventReader<std::io::BufReader<std::fs::File>> {
550    /// Create a streaming reader from a file path with large buffer for performance
551    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> {
552        let file =
553            std::fs::File::open(path.as_ref()).map_err(|e| format!("Failed to open file: {e}"))?;
554        // Use 64KB buffer for better I/O performance
555        Ok(Self::new(std::io::BufReader::with_capacity(
556            64 * 1024,
557            file,
558        )))
559    }
560}
561
562impl<R: std::io::BufRead> Iterator for StreamingEventReader<R> {
563    type Item = Result<Event, String>;
564
565    fn next(&mut self) -> Option<Self::Item> {
566        loop {
567            self.line_buffer.clear();
568            match self.reader.read_line(&mut self.line_buffer) {
569                Ok(0) => return None, // EOF
570                Ok(_) => {
571                    // Enforce line length limit to prevent OOM from single huge lines
572                    if self.line_buffer.len() > crate::limits::MAX_LINE_LENGTH {
573                        tracing::warn!(
574                            len = self.line_buffer.len(),
575                            "Skipping oversized line ({} bytes, max {})",
576                            self.line_buffer.len(),
577                            crate::limits::MAX_LINE_LENGTH
578                        );
579                        self.line_buffer.clear();
580                        continue;
581                    }
582                    match EventFileParser::parse_line(&self.line_buffer) {
583                        Ok(Some(event)) => {
584                            self.events_read += 1;
585                            return Some(Ok(event));
586                        }
587                        Ok(None) => continue, // Skip empty/comment lines
588                        Err(e) => return Some(Err(e)),
589                    }
590                }
591                Err(e) => return Some(Err(format!("Read error: {e}"))),
592            }
593        }
594    }
595}
596
597/// Event file player - sends events to engine with timing
598#[derive(Debug)]
599pub struct EventFilePlayer {
600    events: Vec<TimedEvent>,
601    sender: mpsc::Sender<Event>,
602}
603
604impl EventFilePlayer {
605    pub const fn new(events: Vec<TimedEvent>, sender: mpsc::Sender<Event>) -> Self {
606        Self { events, sender }
607    }
608
609    pub fn from_file<P: AsRef<Path>>(path: P, sender: mpsc::Sender<Event>) -> Result<Self, String> {
610        let event_file = EventFileParser::parse_file(path)?;
611        Ok(Self::new(event_file.events, sender))
612    }
613
614    /// Play events with timing
615    pub async fn play(&self) -> Result<usize, String> {
616        let start = std::time::Instant::now();
617        let mut sent_count = 0;
618
619        // Group events by time offset
620        let mut batches: HashMap<u64, Vec<&TimedEvent>> = HashMap::new();
621        for event in &self.events {
622            batches.entry(event.time_offset_ms).or_default().push(event);
623        }
624
625        // Sort batch times
626        let mut times: Vec<u64> = batches.keys().copied().collect();
627        times.sort_unstable();
628
629        for batch_time in times {
630            // Wait until batch time
631            let elapsed = start.elapsed().as_millis() as u64;
632            if batch_time > elapsed {
633                time::sleep(Duration::from_millis(batch_time - elapsed)).await;
634            }
635
636            // Send all events in this batch
637            if let Some(events) = batches.get(&batch_time) {
638                for timed_event in events {
639                    debug!(
640                        "Sending event: {} at {}ms",
641                        timed_event.event.event_type, batch_time
642                    );
643                    self.sender
644                        .send(timed_event.event.clone())
645                        .await
646                        .map_err(|e| format!("Failed to send event: {e}"))?;
647                    sent_count += 1;
648                }
649            }
650        }
651
652        info!("Played {} events from file", sent_count);
653        Ok(sent_count)
654    }
655
656    /// Play events without timing (immediate)
657    pub async fn play_immediate(&self) -> Result<usize, String> {
658        let mut sent_count = 0;
659
660        for timed_event in &self.events {
661            debug!("Sending event: {}", timed_event.event.event_type);
662            self.sender
663                .send(timed_event.event.clone())
664                .await
665                .map_err(|e| format!("Failed to send event: {e}"))?;
666            sent_count += 1;
667        }
668
669        info!("Played {} events (immediate mode)", sent_count);
670        Ok(sent_count)
671    }
672}
673
674#[cfg(test)]
675mod tests {
676    use super::*;
677
678    #[test]
679    fn test_parse_simple_event() {
680        let source = r#"
681            StockTick { symbol: "AAPL", price: 150.5, volume: 1000 }
682        "#;
683
684        let events = EventFileParser::parse(source).unwrap();
685        assert_eq!(events.len(), 1);
686        assert_eq!(&*events[0].event.event_type, "StockTick");
687        assert_eq!(
688            events[0].event.get("symbol"),
689            Some(&Value::Str("AAPL".into()))
690        );
691        assert_eq!(events[0].event.get("price"), Some(&Value::Float(150.5)));
692        assert_eq!(events[0].event.get("volume"), Some(&Value::Int(1000)));
693    }
694
695    #[test]
696    fn test_parse_batched_events() {
697        let source = r#"
698            # First batch at 0ms
699            Order { id: 1, symbol: "AAPL" }
700
701            # Second batch at 100ms
702            BATCH 100
703            Payment { order_id: 1 }
704            Shipping { order_id: 1 }
705
706            # Third batch at 200ms
707            BATCH 200
708            Confirmation { order_id: 1 }
709        "#;
710
711        let events = EventFileParser::parse(source).unwrap();
712        assert_eq!(events.len(), 4);
713
714        assert_eq!(events[0].time_offset_ms, 0);
715        assert_eq!(&*events[0].event.event_type, "Order");
716
717        assert_eq!(events[1].time_offset_ms, 100);
718        assert_eq!(&*events[1].event.event_type, "Payment");
719
720        assert_eq!(events[2].time_offset_ms, 100);
721        assert_eq!(&*events[2].event.event_type, "Shipping");
722
723        assert_eq!(events[3].time_offset_ms, 200);
724        assert_eq!(&*events[3].event.event_type, "Confirmation");
725    }
726
727    #[test]
728    fn test_parse_positional_format() {
729        let source = r#"
730            StockPrice("AAPL", 150.5)
731        "#;
732
733        let events = EventFileParser::parse(source).unwrap();
734        assert_eq!(events.len(), 1);
735        assert_eq!(&*events[0].event.event_type, "StockPrice");
736        assert_eq!(
737            events[0].event.get("field_0"),
738            Some(&Value::Str("AAPL".into()))
739        );
740        assert_eq!(events[0].event.get("field_1"), Some(&Value::Float(150.5)));
741    }
742
743    #[test]
744    fn test_parse_array_values() {
745        let source = r#"
746            BatchOrder { ids: [1, 2, 3], symbols: ["AAPL", "GOOG"] }
747        "#;
748
749        let events = EventFileParser::parse(source).unwrap();
750        assert_eq!(events.len(), 1);
751
752        let ids = events[0].event.get("ids").unwrap();
753        if let Value::Array(arr) = ids {
754            assert_eq!(arr.len(), 3);
755        } else {
756            panic!("Expected array");
757        }
758    }
759
760    #[test]
761    fn test_parse_comments() {
762        let source = r"
763            # This is a comment
764            // This is also a comment
765            Event1 { x: 1 }
766            # Another comment
767            Event2 { y: 2 }
768        ";
769
770        let events = EventFileParser::parse(source).unwrap();
771        assert_eq!(events.len(), 2);
772    }
773
774    #[test]
775    fn test_sequence_scenario() {
776        let source = r#"
777            # Test sequence: Order -> Payment
778            
779            # Start with an order
780            Order { id: 1, symbol: "AAPL", quantity: 100 }
781            
782            # Payment arrives 50ms later
783            BATCH 50
784            Payment { order_id: 1, amount: 15000.0 }
785            
786            # Another order without payment (should timeout)
787            BATCH 100
788            Order { id: 2, symbol: "GOOG", quantity: 50 }
789            
790            # Much later, payment for order 2 (may timeout)
791            BATCH 5000
792            Payment { order_id: 2, amount: 7500.0 }
793        "#;
794
795        let events = EventFileParser::parse(source).unwrap();
796        assert_eq!(events.len(), 4);
797
798        // Verify order sequence
799        assert_eq!(&*events[0].event.event_type, "Order");
800        assert_eq!(events[0].time_offset_ms, 0);
801
802        assert_eq!(&*events[1].event.event_type, "Payment");
803        assert_eq!(events[1].time_offset_ms, 50);
804
805        assert_eq!(&*events[2].event.event_type, "Order");
806        assert_eq!(events[2].time_offset_ms, 100);
807
808        assert_eq!(&*events[3].event.event_type, "Payment");
809        assert_eq!(events[3].time_offset_ms, 5000);
810    }
811
812    // ==========================================================================
813    // Value Parsing Tests
814    // ==========================================================================
815
816    #[test]
817    fn test_parse_boolean_values() {
818        let source = r"
819            Flags { active: true, disabled: false }
820        ";
821
822        let events = EventFileParser::parse(source).unwrap();
823        assert_eq!(events[0].event.get("active"), Some(&Value::Bool(true)));
824        assert_eq!(events[0].event.get("disabled"), Some(&Value::Bool(false)));
825    }
826
827    #[test]
828    fn test_parse_null_values() {
829        let source = r"
830            Data { value: null, other: nil }
831        ";
832
833        let events = EventFileParser::parse(source).unwrap();
834        assert_eq!(events[0].event.get("value"), Some(&Value::Null));
835        assert_eq!(events[0].event.get("other"), Some(&Value::Null));
836    }
837
838    #[test]
839    fn test_parse_escape_sequences() {
840        let source = r#"
841            Message { text: "Hello\nWorld", path: "C:\\Users\\test" }
842        "#;
843
844        let events = EventFileParser::parse(source).unwrap();
845        let text = events[0].event.get("text").unwrap();
846        if let Value::Str(s) = text {
847            assert!(s.contains('\n'));
848        }
849    }
850
851    #[test]
852    fn test_parse_single_quoted_string() {
853        let source = r"
854            Event { name: 'single quoted' }
855        ";
856
857        let events = EventFileParser::parse(source).unwrap();
858        assert_eq!(
859            events[0].event.get("name"),
860            Some(&Value::Str("single quoted".into()))
861        );
862    }
863
/// Unquoted identifiers used as field values are parsed as `Value::Str`.
#[test]
fn test_parse_unquoted_identifier() {
    let parsed = EventFileParser::parse(
        r"
            Event { status: active, mode: processing }
        ",
    )
    .unwrap();
    let fields = &parsed[0].event;

    let status = fields.get("status");
    assert_eq!(status, Some(&Value::Str("active".into())));

    let mode = fields.get("mode");
    assert_eq!(mode, Some(&Value::Str("processing".into())));
}
880
/// A leading minus sign is honoured for both integer and float literals.
#[test]
fn test_parse_negative_numbers() {
    let input = r"
            Data { temp: -15, delta: -2.5 }
        ";
    let parsed = EventFileParser::parse(input).unwrap();

    let data = &parsed[0].event;
    assert_eq!(data.get("temp"), Some(&Value::Int(-15)));
    assert_eq!(data.get("delta"), Some(&Value::Float(-2.5)));
}
891
/// An array of arrays parses into a `Value::Array` whose outer length is 2.
#[test]
fn test_parse_nested_array() {
    let input = r"
            Complex { matrix: [[1, 2], [3, 4]] }
        ";
    let parsed = EventFileParser::parse(input).unwrap();

    match parsed[0].event.get("matrix").unwrap() {
        Value::Array(rows) => assert_eq!(rows.len(), 2),
        _ => panic!("Expected array"),
    }
}
906
907    // ==========================================================================
908    // Error Handling Tests
909    // ==========================================================================
910
/// A bare event name with no `{ ... }` body is rejected.
#[test]
fn test_parse_invalid_event_format() {
    let outcome = EventFileParser::parse("InvalidEvent");
    assert!(outcome.is_err());
}
917
/// A field without a `name: value` separator is rejected.
#[test]
fn test_parse_invalid_field_format() {
    let outcome = EventFileParser::parse("Event { invalid_no_colon }");
    assert!(outcome.is_err());
}
924
/// A `BATCH` tag whose argument is not numeric makes the whole parse fail.
#[test]
fn test_parse_invalid_batch_time() {
    let input = r"
            BATCH not_a_number
            Event { x: 1 }
        ";

    let outcome = EventFileParser::parse(input);
    assert!(outcome.is_err());
}
934
935    // ==========================================================================
936    // Edge Cases
937    // ==========================================================================
938
/// Parsing an empty string succeeds and produces no events.
#[test]
fn test_parse_empty_content() {
    let parsed = EventFileParser::parse("").unwrap();
    assert!(parsed.is_empty());
}
945
/// Input made up solely of `#` and `//` comment lines produces no events.
#[test]
fn test_parse_only_comments() {
    let parsed = EventFileParser::parse(
        r"
            # Comment 1
            // Comment 2
            # Comment 3
        ",
    )
    .unwrap();

    assert!(parsed.is_empty());
}
956
/// An event with an empty body parses into a single event carrying no fields.
#[test]
fn test_parse_empty_braces() {
    let parsed = EventFileParser::parse("EmptyEvent { }").unwrap();

    assert_eq!(parsed.len(), 1);
    let only = &parsed[0].event;
    assert!(only.data.is_empty());
}
964
/// A trailing `;` after the closing brace is tolerated.
#[test]
fn test_parse_semicolon_terminated() {
    let parsed = EventFileParser::parse("Event { x: 1 };").unwrap();
    assert_eq!(parsed.len(), 1);
}
971
/// Extra spaces around the name, braces, colons and commas do not affect the parsed fields.
#[test]
fn test_parse_whitespace_handling() {
    let input = "  Event  {  x  :  1  ,  y  :  2  }  ";
    let parsed = EventFileParser::parse(input).unwrap();
    assert_eq!(parsed.len(), 1);

    let fields = &parsed[0].event;
    for (key, expected) in [("x", 1), ("y", 2)] {
        assert_eq!(fields.get(key), Some(&Value::Int(expected)));
    }
}
980
981    // ==========================================================================
982    // EventFilePlayer Tests
983    // ==========================================================================
984
985    #[tokio::test]
986    async fn test_player_immediate() {
987        let events = vec![
988            TimedEvent {
989                event: Event::new("A").with_field("id", 1i64),
990                time_offset_ms: 0,
991            },
992            TimedEvent {
993                event: Event::new("B").with_field("id", 2i64),
994                time_offset_ms: 100,
995            },
996        ];
997
998        let (tx, mut rx) = mpsc::channel(10);
999        let player = EventFilePlayer::new(events, tx);
1000
1001        let count = player.play_immediate().await.unwrap();
1002        assert_eq!(count, 2);
1003
1004        let e1 = rx.recv().await.unwrap();
1005        assert_eq!(&*e1.event_type, "A");
1006
1007        let e2 = rx.recv().await.unwrap();
1008        assert_eq!(&*e2.event_type, "B");
1009    }
1010
1011    #[tokio::test]
1012    async fn test_player_with_batches() {
1013        let events = vec![
1014            TimedEvent {
1015                event: Event::new("First"),
1016                time_offset_ms: 0,
1017            },
1018            TimedEvent {
1019                event: Event::new("Second"),
1020                time_offset_ms: 0,
1021            },
1022            TimedEvent {
1023                event: Event::new("Third"),
1024                time_offset_ms: 10, // 10ms later
1025            },
1026        ];
1027
1028        let (tx, mut rx) = mpsc::channel(10);
1029        let player = EventFilePlayer::new(events, tx);
1030
1031        let count = player.play().await.unwrap();
1032        assert_eq!(count, 3);
1033
1034        // All events should have been sent
1035        assert!(rx.recv().await.is_some());
1036        assert!(rx.recv().await.is_some());
1037        assert!(rx.recv().await.is_some());
1038    }
1039
1040    #[tokio::test]
1041    async fn test_player_empty() {
1042        let events = vec![];
1043        let (tx, _rx) = mpsc::channel(10);
1044        let player = EventFilePlayer::new(events, tx);
1045
1046        let count = player.play_immediate().await.unwrap();
1047        assert_eq!(count, 0);
1048    }
1049}