1use std::cell::RefCell;
28use std::collections::HashMap;
29use std::fs;
30use std::path::Path;
31use std::sync::Arc;
32use std::time::Duration;
33
34use indexmap::IndexMap;
35use rustc_hash::{FxBuildHasher, FxHashMap};
36use tokio::sync::mpsc;
37use tokio::time;
38use tracing::{debug, info};
39use varpulis_core::Value;
40
41use crate::event::Event;
42
thread_local! {
    // Per-thread table of field names already seen, mapping each name to a shared `Arc<str>`
    // so repeated field names reuse one allocation (see `intern_field_name`).
    static FIELD_INTERNER: RefCell<FxHashMap<Box<str>, Arc<str>>> =
        RefCell::new(FxHashMap::default());
}
47
48fn intern_field_name(name: &str) -> Arc<str> {
51 FIELD_INTERNER.with(|interner| {
52 let mut map = interner.borrow_mut();
53 if let Some(arc) = map.get(name) {
54 arc.clone()
55 } else {
56 let arc: Arc<str> = name.into();
57 map.insert(name.into(), arc.clone());
58 arc
59 }
60 })
61}
62
/// An event paired with the time offset at which it should be replayed.
#[derive(Debug, Clone)]
pub struct TimedEvent {
    /// The parsed event.
    pub event: Event,
    /// Offset in milliseconds, taken from a `BATCH` directive or an `@` timing prefix.
    pub time_offset_ms: u64,
}
71
/// The parsed contents of an event file.
#[derive(Debug, Clone)]
pub struct EventFile {
    /// Name of the file; `EventFileParser::parse_file` stores the (lossily converted) path here.
    pub name: String,
    /// Events in the order they appear in the file.
    pub events: Vec<TimedEvent>,
}
80
/// Stateless parser for event files; all functionality is exposed as associated functions.
#[derive(Debug)]
pub struct EventFileParser;
84
85impl EventFileParser {
86 pub fn parse(source: &str) -> Result<Vec<TimedEvent>, String> {
88 let mut events = Vec::new();
89 let mut current_batch_time: u64 = 0;
90
91 for (line_num, line) in source.lines().enumerate() {
92 let line = line.trim();
93
94 if line.is_empty() || line.starts_with('#') || line.starts_with("//") {
96 continue;
97 }
98
99 if line.starts_with("BATCH") {
101 let parts: Vec<&str> = line.split_whitespace().collect();
102 if parts.len() >= 2 {
103 current_batch_time = parts[1]
104 .parse()
105 .map_err(|_| format!("Invalid BATCH time at line {}", line_num + 1))?;
106 }
107 continue;
108 }
109
110 let (time_offset, event_line) = if line.starts_with('@') {
112 Self::parse_timing_prefix(line)?
113 } else {
114 (current_batch_time, line)
115 };
116
117 let event = if event_line.starts_with('{') {
119 Self::parse_jsonl_line(event_line)
120 .map_err(|e| format!("Error at line {}: {}", line_num + 1, e))?
121 } else {
122 Self::parse_event_line(event_line)
123 .map_err(|e| format!("Error at line {}: {}", line_num + 1, e))?
124 };
125
126 let mut event = event;
129 event.timestamp =
130 chrono::DateTime::UNIX_EPOCH + chrono::Duration::milliseconds(time_offset as i64);
131
132 events.push(TimedEvent {
133 event,
134 time_offset_ms: time_offset,
135 });
136 }
137
138 Ok(events)
139 }
140
141 fn parse_timing_prefix(line: &str) -> Result<(u64, &str), String> {
143 let line = line.trim_start_matches('@');
145
146 let space_pos = line
148 .find(char::is_whitespace)
149 .ok_or_else(|| "Invalid timing prefix format".to_string())?;
150
151 let timing_str = &line[..space_pos];
152 let rest = line[space_pos..].trim();
153
154 let time_ms = if timing_str.ends_with("ms") {
156 timing_str
157 .trim_end_matches("ms")
158 .parse::<u64>()
159 .map_err(|_| format!("Invalid timing value: {timing_str}"))?
160 } else if timing_str.ends_with('s') {
161 let secs = timing_str
162 .trim_end_matches('s')
163 .parse::<u64>()
164 .map_err(|_| format!("Invalid timing value: {timing_str}"))?;
165 secs.checked_mul(1000)
166 .ok_or_else(|| format!("Timing value overflow: {timing_str}"))?
167 } else if timing_str.ends_with('m') {
168 let mins = timing_str
169 .trim_end_matches('m')
170 .parse::<u64>()
171 .map_err(|_| format!("Invalid timing value: {timing_str}"))?;
172 mins.checked_mul(60_000)
173 .ok_or_else(|| format!("Timing value overflow: {timing_str}"))?
174 } else {
175 timing_str
177 .parse::<u64>()
178 .map_err(|_| format!("Invalid timing value: {timing_str}"))?
179 };
180
181 Ok((time_ms, rest))
182 }
183
184 fn parse_event_line(line: &str) -> Result<Event, String> {
186 let line = line.trim().trim_end_matches(';');
190
191 let (event_type, rest) = if let Some(brace_pos) = line.find('{') {
193 (&line[..brace_pos].trim(), &line[brace_pos..])
194 } else if let Some(paren_pos) = line.find('(') {
195 (&line[..paren_pos].trim(), &line[paren_pos..])
196 } else {
197 return Err(format!("Invalid event format: {line}"));
198 };
199
200 if rest.starts_with('{') {
202 let content = rest.trim_start_matches('{').trim_end_matches('}').trim();
204 let fields = Self::split_fields(content);
205
206 let mut event =
208 Event::with_capacity_at(*event_type, fields.len(), chrono::DateTime::UNIX_EPOCH);
209
210 for field_str in &fields {
211 let field_str = field_str.trim();
212 if field_str.is_empty() {
213 continue;
214 }
215
216 let colon_pos = field_str
217 .find(':')
218 .ok_or_else(|| format!("Invalid field format: {field_str}"))?;
219 let field_name = field_str[..colon_pos].trim();
220 let field_value = Self::parse_value(field_str[colon_pos + 1..].trim())?;
221 event
222 .data
223 .insert(intern_field_name(field_name), field_value);
224 }
225
226 Ok(event)
227 } else if rest.starts_with('(') {
228 let content = rest.trim_start_matches('(').trim_end_matches(')').trim();
230 let fields = Self::split_fields(content);
231
232 let mut event =
233 Event::with_capacity_at(*event_type, fields.len(), chrono::DateTime::UNIX_EPOCH);
234
235 for (i, value_str) in fields.iter().enumerate() {
236 let value_str = value_str.trim();
237 if value_str.is_empty() {
238 continue;
239 }
240
241 let field_value = Self::parse_value(value_str)?;
242 event.data.insert(format!("field_{i}").into(), field_value);
243 }
244
245 Ok(event)
246 } else {
247 Ok(Event::new_at(*event_type, chrono::DateTime::UNIX_EPOCH))
248 }
249 }
250
251 fn split_fields(content: &str) -> Vec<&str> {
255 let bytes = content.as_bytes();
256 let mut fields = Vec::new();
257 let mut field_start = 0;
258 let mut depth = 0i32;
259 let mut in_string = false;
260 let mut escape_next = false;
261
262 for i in 0..bytes.len() {
263 if escape_next {
264 escape_next = false;
265 continue;
266 }
267 match bytes[i] {
268 b'\\' => {
269 escape_next = true;
270 }
271 b'"' => {
272 in_string = !in_string;
273 }
274 b'{' | b'[' | b'(' if !in_string => {
275 depth += 1;
276 }
277 b'}' | b']' | b')' if !in_string => {
278 depth -= 1;
279 }
280 b',' if !in_string && depth == 0 => {
281 let field = content[field_start..i].trim();
282 if !field.is_empty() {
283 fields.push(field);
284 }
285 field_start = i + 1;
286 }
287 _ => {}
288 }
289 }
290
291 let last = content[field_start..].trim();
292 if !last.is_empty() {
293 fields.push(last);
294 }
295
296 fields
297 }
298
    /// Parses a textual value, allowing arrays to nest up to `crate::limits::MAX_JSON_DEPTH`.
    fn parse_value(s: &str) -> Result<Value, String> {
        Self::parse_value_bounded(s, crate::limits::MAX_JSON_DEPTH)
    }
303
304 fn parse_value_bounded(s: &str, depth: usize) -> Result<Value, String> {
306 let s = s.trim();
307
308 if s == "true" {
310 return Ok(Value::Bool(true));
311 }
312 if s == "false" {
313 return Ok(Value::Bool(false));
314 }
315
316 if s == "null" || s == "nil" {
318 return Ok(Value::Null);
319 }
320
321 if s.len() >= 2
323 && ((s.starts_with('"') && s.ends_with('"'))
324 || (s.starts_with('\'') && s.ends_with('\'')))
325 {
326 let inner = &s[1..s.len() - 1];
327 if !inner.contains('\\') {
329 return Ok(Value::Str(inner.into()));
330 }
331 let mut result = String::with_capacity(inner.len());
333 let mut chars = inner.chars();
334 while let Some(ch) = chars.next() {
335 if ch == '\\' {
336 match chars.next() {
337 Some('n') => result.push('\n'),
338 Some('t') => result.push('\t'),
339 Some('"') => result.push('"'),
340 Some('\'') => result.push('\''),
341 Some('\\') => result.push('\\'),
342 Some(other) => {
343 result.push('\\');
344 result.push(other);
345 }
346 None => result.push('\\'),
347 }
348 } else {
349 result.push(ch);
350 }
351 }
352 return Ok(Value::Str(result.into()));
353 }
354
355 if let Ok(i) = s.parse::<i64>() {
357 return Ok(Value::Int(i));
358 }
359
360 if let Ok(f) = s.parse::<f64>() {
362 return Ok(Value::Float(f));
363 }
364
365 if s.starts_with('[') && s.ends_with(']') {
367 if depth == 0 {
368 return Err("Array nesting too deep".to_string());
369 }
370 let inner = &s[1..s.len() - 1];
371 let items: Result<Vec<Value>, String> = Self::split_fields(inner)
372 .iter()
373 .filter(|s| !s.is_empty())
374 .map(|item| Self::parse_value_bounded(item, depth - 1))
375 .collect();
376 return Ok(Value::array(items?));
377 }
378
379 Ok(Value::Str(s.to_string().into()))
381 }
382
383 pub fn parse_file<P: AsRef<Path>>(path: P) -> Result<EventFile, String> {
385 let path = path.as_ref();
386 let content = fs::read_to_string(path)
387 .map_err(|e| format!("Failed to read file {}: {e}", path.display()))?;
388
389 let events = Self::parse(&content)?;
390
391 Ok(EventFile {
392 name: path.to_string_lossy().to_string(),
393 events,
394 })
395 }
396
397 pub fn parse_line(line: &str) -> Result<Option<Event>, String> {
403 let line = line.trim();
404
405 if line.is_empty() || line.starts_with('#') || line.starts_with("//") {
407 return Ok(None);
408 }
409
410 if line.starts_with("BATCH") {
412 return Ok(None);
413 }
414
415 let line = if line.starts_with('@') {
417 let (_, rest) = Self::parse_timing_prefix(line)?;
418 rest.trim()
419 } else {
420 line
421 };
422
423 if line.starts_with('{') {
425 let mut event = Self::parse_jsonl_line(line)?;
426 event.timestamp = chrono::Utc::now();
427 return Ok(Some(event));
428 }
429
430 let mut event = Self::parse_event_line(line)?;
432 event.timestamp = chrono::Utc::now();
433 Ok(Some(event))
434 }
435
436 fn parse_jsonl_line(line: &str) -> Result<Event, String> {
438 if line.len() > crate::limits::MAX_EVENT_PAYLOAD_BYTES {
440 return Err(format!(
441 "JSONL line too large ({} bytes, max {})",
442 line.len(),
443 crate::limits::MAX_EVENT_PAYLOAD_BYTES
444 ));
445 }
446
447 let json: serde_json::Value =
448 serde_json::from_str(line).map_err(|e| format!("Invalid JSON: {e}"))?;
449
450 let event_type = json
451 .get("event_type")
452 .and_then(|v| v.as_str())
453 .ok_or_else(|| "Missing event_type field".to_string())?;
454
455 let mut event = Event::new(event_type);
456
457 if let Some(data) = json.get("data").and_then(|v| v.as_object()) {
458 for (key, value) in data.iter().take(crate::limits::MAX_FIELDS_PER_EVENT) {
459 event
460 .data
461 .insert(key.as_str().into(), Self::json_to_value(value));
462 }
463 }
464
465 Ok(event)
466 }
467
    /// Converts a JSON value into a `Value`, descending at most
    /// `crate::limits::MAX_JSON_DEPTH` levels.
    fn json_to_value(v: &serde_json::Value) -> Value {
        Self::json_to_value_bounded(v, crate::limits::MAX_JSON_DEPTH)
    }
472
473 fn json_to_value_bounded(v: &serde_json::Value, depth: usize) -> Value {
474 if depth == 0 {
475 return Value::Null;
476 }
477 match v {
478 serde_json::Value::Null => Value::Null,
479 serde_json::Value::Bool(b) => Value::Bool(*b),
480 serde_json::Value::Number(n) => {
481 if let Some(i) = n.as_i64() {
482 Value::Int(i)
483 } else if let Some(f) = n.as_f64() {
484 Value::Float(f)
485 } else {
486 Value::Null
487 }
488 }
489 serde_json::Value::String(s) => {
490 if s.len() > crate::limits::MAX_STRING_VALUE_BYTES {
491 let truncated =
492 &s[..s.floor_char_boundary(crate::limits::MAX_STRING_VALUE_BYTES)];
493 Value::Str(truncated.into())
494 } else {
495 Value::Str(s.clone().into())
496 }
497 }
498 serde_json::Value::Array(arr) => {
499 let capped = arr.len().min(crate::limits::MAX_ARRAY_ELEMENTS);
500 Value::array(
501 arr.iter()
502 .take(capped)
503 .map(|v| Self::json_to_value_bounded(v, depth - 1))
504 .collect(),
505 )
506 }
507 serde_json::Value::Object(obj) => {
508 let mut map: IndexMap<std::sync::Arc<str>, Value, FxBuildHasher> =
509 IndexMap::with_hasher(FxBuildHasher);
510 for (k, v) in obj.iter().take(crate::limits::MAX_FIELDS_PER_EVENT) {
511 map.insert(k.as_str().into(), Self::json_to_value_bounded(v, depth - 1));
512 }
513 Value::map(map)
514 }
515 }
516 }
517}
518
/// Iterator that reads events line by line from any buffered reader.
pub struct StreamingEventReader<R: std::io::BufRead> {
    // Underlying source of lines.
    reader: R,
    // Scratch buffer cleared and refilled for every line read.
    line_buffer: String,
    // Number of events successfully yielded so far.
    events_read: usize,
}
525
// Manual `Debug` impl: only `events_read` is printed; the reader and the line buffer are
// omitted and the output is marked non-exhaustive.
impl<R: std::io::BufRead> std::fmt::Debug for StreamingEventReader<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamingEventReader")
            .field("events_read", &self.events_read)
            .finish_non_exhaustive()
    }
}
533
impl<R: std::io::BufRead> StreamingEventReader<R> {
    /// Wraps `reader` with an empty line buffer and an event count of zero.
    pub const fn new(reader: R) -> Self {
        Self {
            reader,
            line_buffer: String::new(),
            events_read: 0,
        }
    }

    /// Returns how many events this reader has yielded successfully so far.
    pub const fn events_read(&self) -> usize {
        self.events_read
    }
}
548
549impl StreamingEventReader<std::io::BufReader<std::fs::File>> {
550 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> {
552 let file =
553 std::fs::File::open(path.as_ref()).map_err(|e| format!("Failed to open file: {e}"))?;
554 Ok(Self::new(std::io::BufReader::with_capacity(
556 64 * 1024,
557 file,
558 )))
559 }
560}
561
562impl<R: std::io::BufRead> Iterator for StreamingEventReader<R> {
563 type Item = Result<Event, String>;
564
565 fn next(&mut self) -> Option<Self::Item> {
566 loop {
567 self.line_buffer.clear();
568 match self.reader.read_line(&mut self.line_buffer) {
569 Ok(0) => return None, Ok(_) => {
571 if self.line_buffer.len() > crate::limits::MAX_LINE_LENGTH {
573 tracing::warn!(
574 len = self.line_buffer.len(),
575 "Skipping oversized line ({} bytes, max {})",
576 self.line_buffer.len(),
577 crate::limits::MAX_LINE_LENGTH
578 );
579 self.line_buffer.clear();
580 continue;
581 }
582 match EventFileParser::parse_line(&self.line_buffer) {
583 Ok(Some(event)) => {
584 self.events_read += 1;
585 return Some(Ok(event));
586 }
587 Ok(None) => continue, Err(e) => return Some(Err(e)),
589 }
590 }
591 Err(e) => return Some(Err(format!("Read error: {e}"))),
592 }
593 }
594 }
595}
596
/// Replays a list of timed events into a Tokio mpsc channel.
#[derive(Debug)]
pub struct EventFilePlayer {
    // Events to replay, each with its time offset.
    events: Vec<TimedEvent>,
    // Channel the events are sent to.
    sender: mpsc::Sender<Event>,
}
603
604impl EventFilePlayer {
605 pub const fn new(events: Vec<TimedEvent>, sender: mpsc::Sender<Event>) -> Self {
606 Self { events, sender }
607 }
608
609 pub fn from_file<P: AsRef<Path>>(path: P, sender: mpsc::Sender<Event>) -> Result<Self, String> {
610 let event_file = EventFileParser::parse_file(path)?;
611 Ok(Self::new(event_file.events, sender))
612 }
613
614 pub async fn play(&self) -> Result<usize, String> {
616 let start = std::time::Instant::now();
617 let mut sent_count = 0;
618
619 let mut batches: HashMap<u64, Vec<&TimedEvent>> = HashMap::new();
621 for event in &self.events {
622 batches.entry(event.time_offset_ms).or_default().push(event);
623 }
624
625 let mut times: Vec<u64> = batches.keys().copied().collect();
627 times.sort_unstable();
628
629 for batch_time in times {
630 let elapsed = start.elapsed().as_millis() as u64;
632 if batch_time > elapsed {
633 time::sleep(Duration::from_millis(batch_time - elapsed)).await;
634 }
635
636 if let Some(events) = batches.get(&batch_time) {
638 for timed_event in events {
639 debug!(
640 "Sending event: {} at {}ms",
641 timed_event.event.event_type, batch_time
642 );
643 self.sender
644 .send(timed_event.event.clone())
645 .await
646 .map_err(|e| format!("Failed to send event: {e}"))?;
647 sent_count += 1;
648 }
649 }
650 }
651
652 info!("Played {} events from file", sent_count);
653 Ok(sent_count)
654 }
655
656 pub async fn play_immediate(&self) -> Result<usize, String> {
658 let mut sent_count = 0;
659
660 for timed_event in &self.events {
661 debug!("Sending event: {}", timed_event.event.event_type);
662 self.sender
663 .send(timed_event.event.clone())
664 .await
665 .map_err(|e| format!("Failed to send event: {e}"))?;
666 sent_count += 1;
667 }
668
669 info!("Played {} events (immediate mode)", sent_count);
670 Ok(sent_count)
671 }
672}
673
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Value and event parsing -------------------------------------------------------

    /// Named fields of each scalar kind are parsed into the matching `Value` variant.
    #[test]
    fn test_parse_simple_event() {
        let source = r#"
            StockTick { symbol: "AAPL", price: 150.5, volume: 1000 }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(&*events[0].event.event_type, "StockTick");
        assert_eq!(
            events[0].event.get("symbol"),
            Some(&Value::Str("AAPL".into()))
        );
        assert_eq!(events[0].event.get("price"), Some(&Value::Float(150.5)));
        assert_eq!(events[0].event.get("volume"), Some(&Value::Int(1000)));
    }

    /// `BATCH` directives set the offset of every event that follows them.
    #[test]
    fn test_parse_batched_events() {
        let source = r#"
            # First batch at 0ms
            Order { id: 1, symbol: "AAPL" }

            # Second batch at 100ms
            BATCH 100
            Payment { order_id: 1 }
            Shipping { order_id: 1 }

            # Third batch at 200ms
            BATCH 200
            Confirmation { order_id: 1 }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 4);

        assert_eq!(events[0].time_offset_ms, 0);
        assert_eq!(&*events[0].event.event_type, "Order");

        assert_eq!(events[1].time_offset_ms, 100);
        assert_eq!(&*events[1].event.event_type, "Payment");

        assert_eq!(events[2].time_offset_ms, 100);
        assert_eq!(&*events[2].event.event_type, "Shipping");

        assert_eq!(events[3].time_offset_ms, 200);
        assert_eq!(&*events[3].event.event_type, "Confirmation");
    }

    /// Positional arguments are stored as `field_0`, `field_1`, ...
    #[test]
    fn test_parse_positional_format() {
        let source = r#"
            StockPrice("AAPL", 150.5)
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(&*events[0].event.event_type, "StockPrice");
        assert_eq!(
            events[0].event.get("field_0"),
            Some(&Value::Str("AAPL".into()))
        );
        assert_eq!(events[0].event.get("field_1"), Some(&Value::Float(150.5)));
    }

    /// Both array fields are parsed, not only the first one.
    #[test]
    fn test_parse_array_values() {
        let source = r#"
            BatchOrder { ids: [1, 2, 3], symbols: ["AAPL", "GOOG"] }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);

        let Some(Value::Array(ids)) = events[0].event.get("ids") else {
            panic!("Expected array for `ids`");
        };
        assert_eq!(ids.len(), 3);

        let Some(Value::Array(symbols)) = events[0].event.get("symbols") else {
            panic!("Expected array for `symbols`");
        };
        assert_eq!(symbols.len(), 2);
    }

    /// `#` and `//` comment lines are skipped.
    #[test]
    fn test_parse_comments() {
        let source = r"
            # This is a comment
            // This is also a comment
            Event1 { x: 1 }
            # Another comment
            Event2 { y: 2 }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 2);
    }

    /// A realistic scenario mixing comments, batches and several event types.
    #[test]
    fn test_sequence_scenario() {
        let source = r#"
            # Test sequence: Order -> Payment

            # Start with an order
            Order { id: 1, symbol: "AAPL", quantity: 100 }

            # Payment arrives 50ms later
            BATCH 50
            Payment { order_id: 1, amount: 15000.0 }

            # Another order without payment (should timeout)
            BATCH 100
            Order { id: 2, symbol: "GOOG", quantity: 50 }

            # Much later, payment for order 2 (may timeout)
            BATCH 5000
            Payment { order_id: 2, amount: 7500.0 }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 4);

        assert_eq!(&*events[0].event.event_type, "Order");
        assert_eq!(events[0].time_offset_ms, 0);

        assert_eq!(&*events[1].event.event_type, "Payment");
        assert_eq!(events[1].time_offset_ms, 50);

        assert_eq!(&*events[2].event.event_type, "Order");
        assert_eq!(events[2].time_offset_ms, 100);

        assert_eq!(&*events[3].event.event_type, "Payment");
        assert_eq!(events[3].time_offset_ms, 5000);
    }

    // ---- Individual value kinds --------------------------------------------------------

    #[test]
    fn test_parse_boolean_values() {
        let source = r"
            Flags { active: true, disabled: false }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events[0].event.get("active"), Some(&Value::Bool(true)));
        assert_eq!(events[0].event.get("disabled"), Some(&Value::Bool(false)));
    }

    #[test]
    fn test_parse_null_values() {
        let source = r"
            Data { value: null, other: nil }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events[0].event.get("value"), Some(&Value::Null));
        assert_eq!(events[0].event.get("other"), Some(&Value::Null));
    }

    /// Escapes are decoded; both fields are checked exactly so that a wrong variant or a
    /// wrongly decoded backslash fails the test instead of passing silently.
    #[test]
    fn test_parse_escape_sequences() {
        let source = r#"
            Message { text: "Hello\nWorld", path: "C:\\Users\\test" }
        "#;

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(
            events[0].event.get("text"),
            Some(&Value::Str("Hello\nWorld".into()))
        );
        assert_eq!(
            events[0].event.get("path"),
            Some(&Value::Str(r"C:\Users\test".into()))
        );
    }

    #[test]
    fn test_parse_single_quoted_string() {
        let source = r"
            Event { name: 'single quoted' }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(
            events[0].event.get("name"),
            Some(&Value::Str("single quoted".into()))
        );
    }

    /// Unquoted tokens that are not numbers or keywords are kept as strings.
    #[test]
    fn test_parse_unquoted_identifier() {
        let source = r"
            Event { status: active, mode: processing }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(
            events[0].event.get("status"),
            Some(&Value::Str("active".into()))
        );
        assert_eq!(
            events[0].event.get("mode"),
            Some(&Value::Str("processing".into()))
        );
    }

    #[test]
    fn test_parse_negative_numbers() {
        let source = r"
            Data { temp: -15, delta: -2.5 }
        ";

        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events[0].event.get("temp"), Some(&Value::Int(-15)));
        assert_eq!(events[0].event.get("delta"), Some(&Value::Float(-2.5)));
    }

    #[test]
    fn test_parse_nested_array() {
        let source = r"
            Complex { matrix: [[1, 2], [3, 4]] }
        ";

        let events = EventFileParser::parse(source).unwrap();
        let Some(Value::Array(rows)) = events[0].event.get("matrix") else {
            panic!("Expected array");
        };
        assert_eq!(rows.len(), 2);
    }

    // ---- Error cases -------------------------------------------------------------------

    #[test]
    fn test_parse_invalid_event_format() {
        let source = "InvalidEvent";
        let result = EventFileParser::parse(source);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_invalid_field_format() {
        let source = "Event { invalid_no_colon }";
        let result = EventFileParser::parse(source);
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_invalid_batch_time() {
        let source = r"
            BATCH not_a_number
            Event { x: 1 }
        ";
        let result = EventFileParser::parse(source);
        assert!(result.is_err());
    }

    // ---- Edge cases --------------------------------------------------------------------

    #[test]
    fn test_parse_empty_content() {
        let source = "";
        let events = EventFileParser::parse(source).unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn test_parse_only_comments() {
        let source = r"
            # Comment 1
            // Comment 2
            # Comment 3
        ";
        let events = EventFileParser::parse(source).unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn test_parse_empty_braces() {
        let source = "EmptyEvent { }";
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert!(events[0].event.data.is_empty());
    }

    /// A trailing `;` is dropped and must not leak into the last field's value.
    #[test]
    fn test_parse_semicolon_terminated() {
        let source = "Event { x: 1 };";
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event.get("x"), Some(&Value::Int(1)));
    }

    #[test]
    fn test_parse_whitespace_handling() {
        let source = "   Event   {   x  :  1  ,  y  :  2   }   ";
        let events = EventFileParser::parse(source).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event.get("x"), Some(&Value::Int(1)));
        assert_eq!(events[0].event.get("y"), Some(&Value::Int(2)));
    }

    // ---- Player ------------------------------------------------------------------------

    #[tokio::test]
    async fn test_player_immediate() {
        let events = vec![
            TimedEvent {
                event: Event::new("A").with_field("id", 1i64),
                time_offset_ms: 0,
            },
            TimedEvent {
                event: Event::new("B").with_field("id", 2i64),
                time_offset_ms: 100,
            },
        ];

        let (tx, mut rx) = mpsc::channel(10);
        let player = EventFilePlayer::new(events, tx);

        let count = player.play_immediate().await.unwrap();
        assert_eq!(count, 2);

        let e1 = rx.recv().await.unwrap();
        assert_eq!(&*e1.event_type, "A");

        let e2 = rx.recv().await.unwrap();
        assert_eq!(&*e2.event_type, "B");
    }

    /// Timed playback delivers every event, in offset order and in file order within a batch.
    #[tokio::test]
    async fn test_player_with_batches() {
        let events = vec![
            TimedEvent {
                event: Event::new("First"),
                time_offset_ms: 0,
            },
            TimedEvent {
                event: Event::new("Second"),
                time_offset_ms: 0,
            },
            TimedEvent {
                event: Event::new("Third"),
                time_offset_ms: 10,
            },
        ];

        let (tx, mut rx) = mpsc::channel(10);
        let player = EventFilePlayer::new(events, tx);

        let count = player.play().await.unwrap();
        assert_eq!(count, 3);

        for expected in ["First", "Second", "Third"] {
            let received = rx.recv().await.expect("event should have been sent");
            assert_eq!(&*received.event_type, expected);
        }
    }

    #[tokio::test]
    async fn test_player_empty() {
        let events = vec![];
        let (tx, _rx) = mpsc::channel(10);
        let player = EventFilePlayer::new(events, tx);

        let count = player.play_immediate().await.unwrap();
        assert_eq!(count, 0);
    }
}