1use anyhow::Result;
21use chrono::{DateTime, Utc};
22use serde::{Deserialize, Serialize};
23use std::collections::{BTreeMap, HashMap, VecDeque};
24use std::sync::Arc;
25use std::time::Duration;
26use tokio::sync::RwLock;
27use tracing::{debug, info};
28
29use crate::event::StreamEvent;
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct OutOfOrderConfig {
34 pub max_lateness: Duration,
36 pub buffer_capacity: usize,
38 pub late_event_strategy: LateEventStrategy,
40 pub enable_watermarks: bool,
42 pub watermark_interval: Duration,
44 pub allowed_out_of_orderness: Duration,
46 pub enable_sequence_tracking: bool,
48 pub gap_filling_strategy: GapFillingStrategy,
50 pub enable_deduplication: bool,
52 pub deduplication_window: Duration,
54 pub emit_strategy: EmitStrategy,
56}
57
58impl Default for OutOfOrderConfig {
59 fn default() -> Self {
60 Self {
61 max_lateness: Duration::from_secs(60),
62 buffer_capacity: 10000,
63 late_event_strategy: LateEventStrategy::SideOutput,
64 enable_watermarks: true,
65 watermark_interval: Duration::from_secs(1),
66 allowed_out_of_orderness: Duration::from_secs(5),
67 enable_sequence_tracking: true,
68 gap_filling_strategy: GapFillingStrategy::Wait,
69 enable_deduplication: true,
70 deduplication_window: Duration::from_secs(60),
71 emit_strategy: EmitStrategy::Watermark,
72 }
73 }
74}
75
76#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
78pub enum LateEventStrategy {
79 Drop,
81 SideOutput,
83 Reprocess,
85 UpdateOnly,
87 Queue,
89}
90
91#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
93pub enum GapFillingStrategy {
94 Wait,
96 SkipAfterTimeout(Duration),
98 Interpolate,
100 Placeholder,
102 Ignore,
104}
105
106#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
108pub enum EmitStrategy {
109 Watermark,
111 Delay(Duration),
113 BufferFull,
115 Immediate,
117 ProcessingTimeTimeout(Duration),
119}
120
121#[derive(Debug, Clone)]
123pub struct OrderedEvent {
124 pub event: StreamEvent,
126 pub event_time: DateTime<Utc>,
128 pub sequence: Option<u64>,
130 pub ingestion_time: DateTime<Utc>,
132 pub is_late: bool,
134 pub gap_before: Option<u64>,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
140pub struct Watermark {
141 pub timestamp: DateTime<Utc>,
143 pub last_update: DateTime<Utc>,
145 pub events_processed: u64,
147}
148
149impl Watermark {
150 pub fn new() -> Self {
152 Self {
153 timestamp: DateTime::from_timestamp(0, 0).expect("epoch timestamp should be valid"),
154 last_update: Utc::now(),
155 events_processed: 0,
156 }
157 }
158
159 pub fn update(&mut self, event_time: DateTime<Utc>, allowed_lateness: Duration) {
161 let lateness = chrono::Duration::from_std(allowed_lateness).unwrap_or_default();
163 let new_watermark = event_time - lateness;
164
165 if new_watermark > self.timestamp {
166 self.timestamp = new_watermark;
167 self.last_update = Utc::now();
168 }
169 self.events_processed += 1;
170 }
171
172 pub fn is_late(&self, event_time: DateTime<Utc>) -> bool {
174 event_time < self.timestamp
175 }
176}
177
178impl Default for Watermark {
179 fn default() -> Self {
180 Self::new()
181 }
182}
183
184pub struct OutOfOrderHandler {
186 config: OutOfOrderConfig,
188 buffer: Arc<RwLock<BTreeMap<i64, VecDeque<OrderedEvent>>>>,
190 watermark: Arc<RwLock<Watermark>>,
192 late_events: Arc<RwLock<VecDeque<OrderedEvent>>>,
194 seen_events: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
196 sequence_tracker: Arc<RwLock<SequenceTracker>>,
198 stats: Arc<RwLock<OutOfOrderStats>>,
200 next_emit_time: Arc<RwLock<DateTime<Utc>>>,
202}
203
204#[derive(Debug, Default)]
206pub struct SequenceTracker {
207 expected_sequence: u64,
209 highest_seen: u64,
211 missing: Vec<u64>,
213 gaps_detected: u64,
215 gaps_filled: u64,
217}
218
219impl SequenceTracker {
220 pub fn track(&mut self, sequence: u64) -> Option<u64> {
222 let gap = if sequence > self.expected_sequence {
223 let gap_size = sequence - self.expected_sequence;
224 for seq in self.expected_sequence..sequence {
225 self.missing.push(seq);
226 }
227 self.gaps_detected += 1;
228 Some(gap_size)
229 } else {
230 if let Some(pos) = self.missing.iter().position(|&s| s == sequence) {
232 self.missing.remove(pos);
233 self.gaps_filled += 1;
234 }
235 None
236 };
237
238 if sequence >= self.expected_sequence {
239 self.expected_sequence = sequence + 1;
240 }
241 if sequence > self.highest_seen {
242 self.highest_seen = sequence;
243 }
244
245 gap
246 }
247
248 pub fn get_missing(&self) -> &[u64] {
250 &self.missing
251 }
252}
253
254#[derive(Debug, Clone, Default, Serialize, Deserialize)]
256pub struct OutOfOrderStats {
257 pub total_events: u64,
259 pub ordered_events: u64,
261 pub late_events: u64,
263 pub late_events_dropped: u64,
265 pub late_events_reprocessed: u64,
267 pub duplicates_detected: u64,
269 pub gaps_detected: u64,
271 pub gaps_filled: u64,
273 pub buffer_size: usize,
275 pub max_buffer_size: usize,
277 pub avg_lateness_ms: f64,
279 pub max_lateness_ms: i64,
281 pub current_watermark: DateTime<Utc>,
283 pub events_per_second: f64,
285}
286
287impl OutOfOrderHandler {
288 pub fn new(config: OutOfOrderConfig) -> Self {
290 Self {
291 config,
292 buffer: Arc::new(RwLock::new(BTreeMap::new())),
293 watermark: Arc::new(RwLock::new(Watermark::new())),
294 late_events: Arc::new(RwLock::new(VecDeque::new())),
295 seen_events: Arc::new(RwLock::new(HashMap::new())),
296 sequence_tracker: Arc::new(RwLock::new(SequenceTracker::default())),
297 stats: Arc::new(RwLock::new(OutOfOrderStats::default())),
298 next_emit_time: Arc::new(RwLock::new(Utc::now())),
299 }
300 }
301
302 pub async fn add_event(&self, event: StreamEvent) -> Result<Vec<OrderedEvent>> {
304 let event_time = self.get_event_time(&event);
305 let event_id = self.get_event_id(&event);
306 let ingestion_time = Utc::now();
307
308 if self.config.enable_deduplication {
310 let mut seen = self.seen_events.write().await;
311 if let Some(_first_seen) = seen.get(&event_id) {
312 let mut stats = self.stats.write().await;
313 stats.duplicates_detected += 1;
314 debug!("Duplicate event detected: {}", event_id);
315 return Ok(Vec::new());
316 }
317 seen.insert(event_id.clone(), ingestion_time);
318
319 let cutoff = ingestion_time
321 - chrono::Duration::from_std(self.config.deduplication_window).unwrap_or_default();
322 seen.retain(|_, time| *time > cutoff);
323 }
324
325 let is_late = if self.config.enable_watermarks {
327 let mut watermark = self.watermark.write().await;
328 let late = watermark.is_late(event_time);
329 watermark.update(event_time, self.config.allowed_out_of_orderness);
330 late
331 } else {
332 false
333 };
334
335 let (sequence, gap_before) = if self.config.enable_sequence_tracking {
337 if let Some(seq) = self.get_sequence(&event) {
338 let mut tracker = self.sequence_tracker.write().await;
339 let gap = tracker.track(seq);
340 (Some(seq), gap)
341 } else {
342 (None, None)
343 }
344 } else {
345 (None, None)
346 };
347
348 let ordered_event = OrderedEvent {
350 event,
351 event_time,
352 sequence,
353 ingestion_time,
354 is_late,
355 gap_before,
356 };
357
358 {
360 let mut stats = self.stats.write().await;
361 stats.total_events += 1;
362
363 let lateness_ms = (ingestion_time - event_time).num_milliseconds();
364 stats.avg_lateness_ms = (stats.avg_lateness_ms * (stats.total_events - 1) as f64
365 + lateness_ms as f64)
366 / stats.total_events as f64;
367 stats.max_lateness_ms = stats.max_lateness_ms.max(lateness_ms);
368
369 if is_late {
370 stats.late_events += 1;
371 }
372 if gap_before.is_some() {
373 stats.gaps_detected += 1;
374 }
375 }
376
377 if is_late {
379 return self.handle_late_event(ordered_event).await;
380 }
381
382 {
384 let mut buffer = self.buffer.write().await;
385 let timestamp_key = event_time.timestamp_millis();
386 buffer
387 .entry(timestamp_key)
388 .or_insert_with(VecDeque::new)
389 .push_back(ordered_event);
390
391 let mut stats = self.stats.write().await;
392 let total_size: usize = buffer.values().map(|v| v.len()).sum();
393 stats.buffer_size = total_size;
394 stats.max_buffer_size = stats.max_buffer_size.max(total_size);
395 }
396
397 self.emit_ready_events().await
399 }
400
401 async fn handle_late_event(&self, event: OrderedEvent) -> Result<Vec<OrderedEvent>> {
403 match &self.config.late_event_strategy {
404 LateEventStrategy::Drop => {
405 let mut stats = self.stats.write().await;
406 stats.late_events_dropped += 1;
407 debug!("Dropped late event: {:?}", event.event_time);
408 Ok(Vec::new())
409 }
410 LateEventStrategy::SideOutput => {
411 let mut late_events = self.late_events.write().await;
412 late_events.push_back(event);
413
414 while late_events.len() > self.config.buffer_capacity / 10 {
416 late_events.pop_front();
417 }
418
419 Ok(Vec::new())
420 }
421 LateEventStrategy::Reprocess => {
422 let mut stats = self.stats.write().await;
423 stats.late_events_reprocessed += 1;
424 Ok(vec![event])
425 }
426 LateEventStrategy::UpdateOnly => {
427 let mut stats = self.stats.write().await;
428 stats.late_events_reprocessed += 1;
429 Ok(vec![event])
430 }
431 LateEventStrategy::Queue => {
432 let mut late_events = self.late_events.write().await;
433 late_events.push_back(event);
434 Ok(Vec::new())
435 }
436 }
437 }
438
439 async fn emit_ready_events(&self) -> Result<Vec<OrderedEvent>> {
441 match &self.config.emit_strategy {
442 EmitStrategy::Watermark => self.emit_before_watermark().await,
443 EmitStrategy::Delay(delay) => self.emit_after_delay(*delay).await,
444 EmitStrategy::BufferFull => self.emit_if_buffer_full().await,
445 EmitStrategy::Immediate => self.emit_oldest().await,
446 EmitStrategy::ProcessingTimeTimeout(timeout) => self.emit_after_timeout(*timeout).await,
447 }
448 }
449
450 async fn emit_before_watermark(&self) -> Result<Vec<OrderedEvent>> {
452 let watermark = self.watermark.read().await.timestamp;
453 let watermark_key = watermark.timestamp_millis();
454
455 let mut buffer = self.buffer.write().await;
456 let mut to_emit = Vec::new();
457
458 let keys_to_remove: Vec<i64> = buffer.range(..watermark_key).map(|(k, _)| *k).collect();
460
461 for key in keys_to_remove {
462 if let Some(events) = buffer.remove(&key) {
463 to_emit.extend(events);
464 }
465 }
466
467 let mut stats = self.stats.write().await;
469 stats.ordered_events += to_emit.len() as u64;
470 stats.buffer_size = buffer.values().map(|v| v.len()).sum();
471 stats.current_watermark = watermark;
472
473 Ok(to_emit)
474 }
475
476 async fn emit_after_delay(&self, delay: Duration) -> Result<Vec<OrderedEvent>> {
478 let cutoff = Utc::now() - chrono::Duration::from_std(delay).unwrap_or_default();
479 let cutoff_key = cutoff.timestamp_millis();
480
481 let mut buffer = self.buffer.write().await;
482 let mut to_emit = Vec::new();
483
484 let keys_to_remove: Vec<i64> = buffer.range(..cutoff_key).map(|(k, _)| *k).collect();
485
486 for key in keys_to_remove {
487 if let Some(events) = buffer.remove(&key) {
488 to_emit.extend(events);
489 }
490 }
491
492 let mut stats = self.stats.write().await;
493 stats.ordered_events += to_emit.len() as u64;
494 stats.buffer_size = buffer.values().map(|v| v.len()).sum();
495
496 Ok(to_emit)
497 }
498
499 async fn emit_if_buffer_full(&self) -> Result<Vec<OrderedEvent>> {
501 let buffer = self.buffer.read().await;
502 let size: usize = buffer.values().map(|v| v.len()).sum();
503
504 if size >= self.config.buffer_capacity {
505 drop(buffer);
506 let to_emit_count = self.config.buffer_capacity / 10;
508 self.emit_n_oldest(to_emit_count).await
509 } else {
510 Ok(Vec::new())
511 }
512 }
513
514 async fn emit_oldest(&self) -> Result<Vec<OrderedEvent>> {
516 self.emit_n_oldest(1).await
517 }
518
519 async fn emit_n_oldest(&self, n: usize) -> Result<Vec<OrderedEvent>> {
521 let mut buffer = self.buffer.write().await;
522 let mut to_emit = Vec::new();
523 let mut remaining = n;
524
525 while remaining > 0 {
526 if let Some(first_key) = buffer.keys().next().copied() {
527 if let Some(events) = buffer.get_mut(&first_key) {
528 while remaining > 0 && !events.is_empty() {
529 if let Some(event) = events.pop_front() {
530 to_emit.push(event);
531 remaining -= 1;
532 }
533 }
534 if events.is_empty() {
535 buffer.remove(&first_key);
536 }
537 }
538 } else {
539 break;
540 }
541 }
542
543 let mut stats = self.stats.write().await;
544 stats.ordered_events += to_emit.len() as u64;
545 stats.buffer_size = buffer.values().map(|v| v.len()).sum();
546
547 Ok(to_emit)
548 }
549
550 async fn emit_after_timeout(&self, timeout: Duration) -> Result<Vec<OrderedEvent>> {
552 let now = Utc::now();
553 let mut next_emit = self.next_emit_time.write().await;
554
555 if now >= *next_emit {
556 *next_emit = now + chrono::Duration::from_std(timeout).unwrap_or_default();
557 drop(next_emit);
558
559 let mut buffer = self.buffer.write().await;
561 let mut to_emit = Vec::new();
562
563 for (_, events) in buffer.iter_mut() {
564 to_emit.extend(events.drain(..));
565 }
566 buffer.clear();
567
568 let mut stats = self.stats.write().await;
569 stats.ordered_events += to_emit.len() as u64;
570 stats.buffer_size = 0;
571
572 Ok(to_emit)
573 } else {
574 Ok(Vec::new())
575 }
576 }
577
578 fn get_event_time(&self, event: &StreamEvent) -> DateTime<Utc> {
580 match event {
581 StreamEvent::TripleAdded { metadata, .. }
582 | StreamEvent::TripleRemoved { metadata, .. }
583 | StreamEvent::GraphCreated { metadata, .. }
584 | StreamEvent::GraphDeleted { metadata, .. }
585 | StreamEvent::TransactionBegin { metadata, .. }
586 | StreamEvent::TransactionCommit { metadata, .. }
587 | StreamEvent::TransactionAbort { metadata, .. }
588 | StreamEvent::Heartbeat { metadata, .. }
589 | StreamEvent::SparqlUpdate { metadata, .. }
590 | StreamEvent::SchemaChanged { metadata, .. } => metadata.timestamp,
591 _ => Utc::now(),
592 }
593 }
594
595 fn get_event_id(&self, event: &StreamEvent) -> String {
597 match event {
598 StreamEvent::TripleAdded { metadata, .. }
599 | StreamEvent::TripleRemoved { metadata, .. }
600 | StreamEvent::GraphCreated { metadata, .. }
601 | StreamEvent::GraphDeleted { metadata, .. }
602 | StreamEvent::TransactionBegin { metadata, .. }
603 | StreamEvent::TransactionCommit { metadata, .. }
604 | StreamEvent::TransactionAbort { metadata, .. }
605 | StreamEvent::Heartbeat { metadata, .. }
606 | StreamEvent::SparqlUpdate { metadata, .. }
607 | StreamEvent::SchemaChanged { metadata, .. } => metadata.event_id.clone(),
608 _ => uuid::Uuid::new_v4().to_string(),
609 }
610 }
611
612 fn get_sequence(&self, event: &StreamEvent) -> Option<u64> {
614 match event {
615 StreamEvent::TripleAdded { metadata, .. }
616 | StreamEvent::TripleRemoved { metadata, .. }
617 | StreamEvent::Heartbeat { metadata, .. } => metadata
618 .properties
619 .get("sequence")
620 .and_then(|s| s.parse().ok()),
621 _ => None,
622 }
623 }
624
625 pub async fn flush(&self) -> Result<Vec<OrderedEvent>> {
627 let mut buffer = self.buffer.write().await;
628 let mut to_emit = Vec::new();
629
630 for (_, events) in buffer.iter_mut() {
631 to_emit.extend(events.drain(..));
632 }
633 buffer.clear();
634
635 let mut stats = self.stats.write().await;
636 stats.ordered_events += to_emit.len() as u64;
637 stats.buffer_size = 0;
638
639 info!("Flushed {} events from out-of-order buffer", to_emit.len());
640
641 Ok(to_emit)
642 }
643
644 pub async fn get_late_events(&self) -> Vec<OrderedEvent> {
646 let late_events = self.late_events.read().await;
647 late_events.iter().cloned().collect()
648 }
649
650 pub async fn clear_late_events(&self) {
652 let mut late_events = self.late_events.write().await;
653 late_events.clear();
654 }
655
656 pub async fn get_watermark(&self) -> Watermark {
658 self.watermark.read().await.clone()
659 }
660
661 pub async fn get_stats(&self) -> OutOfOrderStats {
663 self.stats.read().await.clone()
664 }
665
666 pub async fn get_missing_sequences(&self) -> Vec<u64> {
668 let tracker = self.sequence_tracker.read().await;
669 tracker.get_missing().to_vec()
670 }
671
672 pub async fn reset(&self) {
674 self.buffer.write().await.clear();
675 self.late_events.write().await.clear();
676 self.seen_events.write().await.clear();
677 *self.watermark.write().await = Watermark::new();
678 *self.sequence_tracker.write().await = SequenceTracker::default();
679 *self.stats.write().await = OutOfOrderStats::default();
680
681 info!("Out-of-order handler reset");
682 }
683}
684
685pub struct OutOfOrderHandlerBuilder {
687 config: OutOfOrderConfig,
688}
689
690impl OutOfOrderHandlerBuilder {
691 pub fn new() -> Self {
693 Self {
694 config: OutOfOrderConfig::default(),
695 }
696 }
697
698 pub fn max_lateness(mut self, duration: Duration) -> Self {
700 self.config.max_lateness = duration;
701 self
702 }
703
704 pub fn buffer_capacity(mut self, capacity: usize) -> Self {
706 self.config.buffer_capacity = capacity;
707 self
708 }
709
710 pub fn late_event_strategy(mut self, strategy: LateEventStrategy) -> Self {
712 self.config.late_event_strategy = strategy;
713 self
714 }
715
716 pub fn allowed_out_of_orderness(mut self, duration: Duration) -> Self {
718 self.config.allowed_out_of_orderness = duration;
719 self
720 }
721
722 pub fn emit_strategy(mut self, strategy: EmitStrategy) -> Self {
724 self.config.emit_strategy = strategy;
725 self
726 }
727
728 pub fn with_deduplication(mut self, window: Duration) -> Self {
730 self.config.enable_deduplication = true;
731 self.config.deduplication_window = window;
732 self
733 }
734
735 pub fn with_sequence_tracking(mut self) -> Self {
737 self.config.enable_sequence_tracking = true;
738 self
739 }
740
741 pub fn build(self) -> OutOfOrderHandler {
743 OutOfOrderHandler::new(self.config)
744 }
745}
746
747impl Default for OutOfOrderHandlerBuilder {
748 fn default() -> Self {
749 Self::new()
750 }
751}
752
753#[cfg(test)]
754mod tests {
755 use super::*;
756 use crate::event::EventMetadata;
757 use std::collections::HashMap;
758 use uuid::Uuid;
759
760 fn create_test_event(timestamp: DateTime<Utc>, sequence: Option<u64>) -> StreamEvent {
761 let mut properties = HashMap::new();
762 if let Some(seq) = sequence {
763 properties.insert("sequence".to_string(), seq.to_string());
764 }
765
766 StreamEvent::TripleAdded {
767 subject: "test:subject".to_string(),
768 predicate: "test:predicate".to_string(),
769 object: "test:object".to_string(),
770 graph: None,
771 metadata: EventMetadata {
772 event_id: Uuid::new_v4().to_string(),
773 timestamp,
774 source: "test".to_string(),
775 user: None,
776 context: None,
777 caused_by: None,
778 version: "1.0".to_string(),
779 properties,
780 checksum: None,
781 },
782 }
783 }
784
785 #[tokio::test]
786 async fn test_handler_creation() {
787 let handler = OutOfOrderHandler::new(OutOfOrderConfig::default());
788 let stats = handler.get_stats().await;
789 assert_eq!(stats.total_events, 0);
790 }
791
792 #[tokio::test]
793 async fn test_add_event() {
794 let handler = OutOfOrderHandlerBuilder::new()
795 .emit_strategy(EmitStrategy::Immediate)
796 .build();
797
798 let event = create_test_event(Utc::now(), Some(1));
799 let emitted = handler.add_event(event).await.unwrap();
800
801 assert_eq!(emitted.len(), 1);
802 let stats = handler.get_stats().await;
803 assert_eq!(stats.total_events, 1);
804 }
805
806 #[tokio::test]
807 async fn test_watermark_update() {
808 let handler = OutOfOrderHandler::new(OutOfOrderConfig::default());
809
810 let now = Utc::now();
811 let event = create_test_event(now, None);
812 handler.add_event(event).await.unwrap();
813
814 let watermark = handler.get_watermark().await;
815 assert!(watermark.events_processed > 0);
816 }
817
818 #[tokio::test]
819 async fn test_late_event_detection() {
820 let config = OutOfOrderConfig {
821 late_event_strategy: LateEventStrategy::SideOutput,
822 allowed_out_of_orderness: Duration::from_secs(1),
823 ..Default::default()
824 };
825 let handler = OutOfOrderHandler::new(config);
826
827 let now = Utc::now();
829 let event = create_test_event(now, None);
830 handler.add_event(event).await.unwrap();
831
832 let old_time = now - chrono::Duration::seconds(10);
834 let late_event = create_test_event(old_time, None);
835 handler.add_event(late_event).await.unwrap();
836
837 let stats = handler.get_stats().await;
838 assert!(stats.late_events > 0);
839 }
840
841 #[tokio::test]
842 async fn test_deduplication() {
843 let config = OutOfOrderConfig {
844 enable_deduplication: true,
845 deduplication_window: Duration::from_secs(60),
846 ..Default::default()
847 };
848 let handler = OutOfOrderHandler::new(config);
849
850 let event = create_test_event(Utc::now(), None);
851 let event_clone = event.clone();
852
853 handler.add_event(event).await.unwrap();
854 handler.add_event(event_clone).await.unwrap();
855
856 let stats = handler.get_stats().await;
857 assert_eq!(stats.duplicates_detected, 1);
858 }
859
860 #[tokio::test]
861 async fn test_sequence_tracking() {
862 let handler = OutOfOrderHandlerBuilder::new()
863 .with_sequence_tracking()
864 .emit_strategy(EmitStrategy::Immediate)
865 .build();
866
867 let now = Utc::now();
869 handler
870 .add_event(create_test_event(now, Some(1)))
871 .await
872 .unwrap();
873 handler
874 .add_event(create_test_event(now, Some(5)))
875 .await
876 .unwrap(); let missing = handler.get_missing_sequences().await;
879 assert!(!missing.is_empty());
880 assert!(missing.contains(&2));
881 assert!(missing.contains(&3));
882 assert!(missing.contains(&4));
883 }
884
885 #[tokio::test]
886 async fn test_flush() {
887 let handler = OutOfOrderHandlerBuilder::new()
888 .emit_strategy(EmitStrategy::Watermark)
889 .build();
890
891 let now = Utc::now();
892 handler
893 .add_event(create_test_event(now, Some(1)))
894 .await
895 .unwrap();
896 handler
897 .add_event(create_test_event(now, Some(2)))
898 .await
899 .unwrap();
900
901 let flushed = handler.flush().await.unwrap();
902 assert!(flushed.len() >= 2);
903
904 let stats = handler.get_stats().await;
905 assert_eq!(stats.buffer_size, 0);
906 }
907
908 #[tokio::test]
909 async fn test_emit_strategy_delay() {
910 let handler = OutOfOrderHandlerBuilder::new()
911 .emit_strategy(EmitStrategy::Delay(Duration::from_millis(100)))
912 .build();
913
914 let now = Utc::now();
915 handler
916 .add_event(create_test_event(now, Some(1)))
917 .await
918 .unwrap();
919
920 tokio::time::sleep(Duration::from_millis(150)).await;
922 let emitted = handler.emit_ready_events().await.unwrap();
923 assert!(!emitted.is_empty());
924 }
925
926 #[tokio::test]
927 async fn test_buffer_full_emit() {
928 let handler = OutOfOrderHandlerBuilder::new()
929 .buffer_capacity(10)
930 .emit_strategy(EmitStrategy::BufferFull)
931 .build();
932
933 let now = Utc::now();
934 for i in 0..15 {
935 let time = now + chrono::Duration::milliseconds(i);
936 handler
937 .add_event(create_test_event(time, Some(i as u64)))
938 .await
939 .unwrap();
940 }
941
942 let stats = handler.get_stats().await;
943 assert!(stats.ordered_events > 0);
944 }
945
946 #[tokio::test]
947 async fn test_late_event_strategies() {
948 let handler = OutOfOrderHandlerBuilder::new()
950 .late_event_strategy(LateEventStrategy::Drop)
951 .allowed_out_of_orderness(Duration::from_secs(1))
952 .emit_strategy(EmitStrategy::Immediate)
953 .build();
954
955 let now = Utc::now();
956 handler
957 .add_event(create_test_event(now, None))
958 .await
959 .unwrap();
960
961 let old = now - chrono::Duration::seconds(100);
962 let result = handler
963 .add_event(create_test_event(old, None))
964 .await
965 .unwrap();
966 assert!(result.is_empty());
967
968 let stats = handler.get_stats().await;
969 assert_eq!(stats.late_events_dropped, 1);
970 }
971
972 #[tokio::test]
973 async fn test_reset() {
974 let handler = OutOfOrderHandler::new(OutOfOrderConfig::default());
975
976 let event = create_test_event(Utc::now(), Some(1));
977 handler.add_event(event).await.unwrap();
978
979 handler.reset().await;
980
981 let stats = handler.get_stats().await;
982 assert_eq!(stats.total_events, 0);
983 assert_eq!(stats.buffer_size, 0);
984 }
985
986 #[tokio::test]
987 async fn test_ordered_emission() {
988 let handler = OutOfOrderHandlerBuilder::new()
989 .emit_strategy(EmitStrategy::Delay(Duration::from_millis(50)))
990 .build();
991
992 let base = Utc::now();
994 handler
995 .add_event(create_test_event(
996 base + chrono::Duration::milliseconds(30),
997 Some(3),
998 ))
999 .await
1000 .unwrap();
1001 handler
1002 .add_event(create_test_event(
1003 base + chrono::Duration::milliseconds(10),
1004 Some(1),
1005 ))
1006 .await
1007 .unwrap();
1008 handler
1009 .add_event(create_test_event(
1010 base + chrono::Duration::milliseconds(20),
1011 Some(2),
1012 ))
1013 .await
1014 .unwrap();
1015
1016 tokio::time::sleep(Duration::from_millis(100)).await;
1018 let emitted = handler.emit_ready_events().await.unwrap();
1019
1020 assert_eq!(emitted.len(), 3);
1022 for i in 0..emitted.len() - 1 {
1023 assert!(emitted[i].event_time <= emitted[i + 1].event_time);
1024 }
1025 }
1026}