oxirs_stream/
out_of_order.rs

1//! # Out-of-Order Event Handling Optimization
2//!
3//! This module provides advanced out-of-order event handling capabilities
4//! for stream processing, ensuring correct event ordering even when events
5//! arrive with varying delays.
6//!
7//! ## Features
8//! - Event reordering buffers with configurable capacity
9//! - Watermark-based late data handling
10//! - Multiple strategies for late event processing
11//! - Sequence number tracking and gap detection
12//! - Configurable lateness tolerances
13//! - Performance-optimized sorting algorithms
14//!
15//! ## Performance
16//! - O(log n) insertion for sorted buffer
17//! - Constant-time watermark updates
18//! - Memory-efficient event storage
19
20use anyhow::Result;
21use chrono::{DateTime, Utc};
22use serde::{Deserialize, Serialize};
23use std::collections::{BTreeMap, HashMap, VecDeque};
24use std::sync::Arc;
25use std::time::Duration;
26use tokio::sync::RwLock;
27use tracing::{debug, info};
28
29use crate::event::StreamEvent;
30
31/// Configuration for out-of-order handling
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct OutOfOrderConfig {
34    /// Maximum time to wait for late events
35    pub max_lateness: Duration,
36    /// Buffer capacity (number of events)
37    pub buffer_capacity: usize,
38    /// Strategy for handling late events
39    pub late_event_strategy: LateEventStrategy,
40    /// Enable watermark tracking
41    pub enable_watermarks: bool,
42    /// Watermark update interval
43    pub watermark_interval: Duration,
44    /// Allowed out-of-orderness
45    pub allowed_out_of_orderness: Duration,
46    /// Enable sequence number tracking
47    pub enable_sequence_tracking: bool,
48    /// Gap filling strategy
49    pub gap_filling_strategy: GapFillingStrategy,
50    /// Enable event deduplication
51    pub enable_deduplication: bool,
52    /// Deduplication window
53    pub deduplication_window: Duration,
54    /// Emit strategy
55    pub emit_strategy: EmitStrategy,
56}
57
58impl Default for OutOfOrderConfig {
59    fn default() -> Self {
60        Self {
61            max_lateness: Duration::from_secs(60),
62            buffer_capacity: 10000,
63            late_event_strategy: LateEventStrategy::SideOutput,
64            enable_watermarks: true,
65            watermark_interval: Duration::from_secs(1),
66            allowed_out_of_orderness: Duration::from_secs(5),
67            enable_sequence_tracking: true,
68            gap_filling_strategy: GapFillingStrategy::Wait,
69            enable_deduplication: true,
70            deduplication_window: Duration::from_secs(60),
71            emit_strategy: EmitStrategy::Watermark,
72        }
73    }
74}
75
/// Strategy for handling late events
///
/// An event is "late" when its event time is strictly earlier than the
/// handler's watermark at the moment it is added. Late events never enter
/// the reorder buffer; the selected variant decides what happens instead.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum LateEventStrategy {
    /// Drop late events silently
    ///
    /// Nothing is returned to the caller; the `late_events_dropped`
    /// statistic is incremented.
    Drop,
    /// Send to side output
    ///
    /// The event is appended to the handler's late-events buffer (readable
    /// via `OutOfOrderHandler::get_late_events`). The oldest entries are
    /// evicted once that buffer outgrows a limit derived from
    /// `buffer_capacity`.
    SideOutput,
    /// Reprocess with updated state
    ///
    /// The event is returned straight to the caller of `add_event`,
    /// bypassing the reorder buffer, and counted in
    /// `late_events_reprocessed`.
    Reprocess,
    /// Update aggregates without full reprocess
    ///
    /// NOTE(review): the handler in this file treats this variant exactly
    /// like `Reprocess`.
    UpdateOnly,
    /// Queue for manual review
    ///
    /// The event is appended to the late-events buffer with no size limit.
    Queue,
}
90
/// Strategy for filling gaps in sequences
///
/// NOTE(review): this setting is stored in `OutOfOrderConfig`, but no code
/// visible in this file reads it. The variant descriptions below state the
/// intended meaning only — confirm against the consumer before relying on
/// them.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum GapFillingStrategy {
    /// Wait for missing events
    Wait,
    /// Skip gaps after timeout
    SkipAfterTimeout(Duration),
    /// Interpolate missing events
    Interpolate,
    /// Emit placeholder events
    Placeholder,
    /// Ignore gaps
    Ignore,
}
105
/// Strategy for emitting events
///
/// The handler evaluates the strategy at the end of every `add_event` call
/// that buffered an event; whatever the strategy releases is returned from
/// that call in ascending event-time order.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum EmitStrategy {
    /// Emit when watermark advances
    ///
    /// Releases every buffered event whose event time (millisecond
    /// resolution) is strictly earlier than the current watermark.
    Watermark,
    /// Emit after fixed delay
    ///
    /// Releases every buffered event whose event time is earlier than
    /// `Utc::now()` minus the given delay.
    Delay(Duration),
    /// Emit when buffer is full
    ///
    /// Once the number of buffered events reaches `buffer_capacity`, a batch
    /// of the oldest events (about a tenth of the capacity) is released.
    BufferFull,
    /// Emit immediately with possible reordering
    ///
    /// Releases the single oldest buffered event on each call.
    Immediate,
    /// Emit after processing-time timeout
    ///
    /// Releases everything buffered, at most once per timeout period of
    /// wall-clock time.
    ProcessingTimeTimeout(Duration),
}
120
/// Event with ordering information
///
/// Wrapper created by `OutOfOrderHandler::add_event` around each accepted
/// `StreamEvent`; this is the type handed back to callers when events are
/// emitted, flushed or read from the late-events buffer.
#[derive(Debug, Clone)]
pub struct OrderedEvent {
    /// Original event
    pub event: StreamEvent,
    /// Event timestamp
    ///
    /// Taken from the event's metadata when the variant carries one the
    /// handler recognises; otherwise the wall-clock time at ingestion.
    pub event_time: DateTime<Utc>,
    /// Sequence number
    ///
    /// Parsed from the `"sequence"` metadata property; `None` when sequence
    /// tracking is disabled or the property is absent or unparsable.
    pub sequence: Option<u64>,
    /// Ingestion time
    ///
    /// `Utc::now()` sampled when the event was passed to `add_event`.
    pub ingestion_time: DateTime<Utc>,
    /// Is this a late event
    ///
    /// `true` when `event_time` was earlier than the watermark on arrival;
    /// always `false` when watermarks are disabled.
    pub is_late: bool,
    /// Gap before this event
    ///
    /// Number of sequence numbers skipped immediately before this event, as
    /// reported by the sequence tracker; `None` when no gap was detected.
    pub gap_before: Option<u64>,
}
137
138/// Watermark for tracking event time progress
139#[derive(Debug, Clone, Serialize, Deserialize)]
140pub struct Watermark {
141    /// Current watermark timestamp
142    pub timestamp: DateTime<Utc>,
143    /// Last update time
144    pub last_update: DateTime<Utc>,
145    /// Number of events processed
146    pub events_processed: u64,
147}
148
149impl Watermark {
150    /// Create a new watermark
151    pub fn new() -> Self {
152        Self {
153            timestamp: DateTime::from_timestamp(0, 0).unwrap(),
154            last_update: Utc::now(),
155            events_processed: 0,
156        }
157    }
158
159    /// Update watermark with new event time
160    pub fn update(&mut self, event_time: DateTime<Utc>, allowed_lateness: Duration) {
161        // Watermark = event_time - allowed_lateness
162        let lateness = chrono::Duration::from_std(allowed_lateness).unwrap_or_default();
163        let new_watermark = event_time - lateness;
164
165        if new_watermark > self.timestamp {
166            self.timestamp = new_watermark;
167            self.last_update = Utc::now();
168        }
169        self.events_processed += 1;
170    }
171
172    /// Check if event is late
173    pub fn is_late(&self, event_time: DateTime<Utc>) -> bool {
174        event_time < self.timestamp
175    }
176}
177
178impl Default for Watermark {
179    fn default() -> Self {
180        Self::new()
181    }
182}
183
/// Out-of-order event handler
///
/// Buffers incoming events keyed by event time and releases them in
/// ascending event-time order according to the configured emit strategy.
/// Every piece of mutable state sits behind its own `Arc<RwLock<..>>`, so
/// all methods take `&self`.
pub struct OutOfOrderHandler {
    /// Configuration
    config: OutOfOrderConfig,
    /// Event buffer (ordered by event time)
    ///
    /// Keyed by event time in milliseconds since the Unix epoch; events
    /// sharing a millisecond are kept in arrival order.
    buffer: Arc<RwLock<BTreeMap<i64, VecDeque<OrderedEvent>>>>,
    /// Current watermark
    watermark: Arc<RwLock<Watermark>>,
    /// Late events buffer
    ///
    /// Filled by the `SideOutput` and `Queue` late-event strategies.
    late_events: Arc<RwLock<VecDeque<OrderedEvent>>>,
    /// Deduplication set
    ///
    /// Maps event id to the ingestion time at which it was first seen.
    seen_events: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
    /// Sequence tracker
    sequence_tracker: Arc<RwLock<SequenceTracker>>,
    /// Statistics
    stats: Arc<RwLock<OutOfOrderStats>>,
    /// Next emit time
    ///
    /// Earliest wall-clock time at which the `ProcessingTimeTimeout`
    /// strategy releases the buffer again.
    next_emit_time: Arc<RwLock<DateTime<Utc>>>,
}
203
/// Sequence tracker for gap detection
///
/// Tracks a monotonically advancing "next expected" sequence number and
/// records every sequence number that was skipped on the way.
#[derive(Debug, Default)]
pub struct SequenceTracker {
    /// Expected next sequence
    expected_sequence: u64,
    /// Highest seen sequence
    highest_seen: u64,
    /// Missing sequences, always sorted ascending and free of duplicates
    /// (entries are only ever appended above every existing entry).
    missing: Vec<u64>,
    /// Gaps detected
    gaps_detected: u64,
    /// Gaps filled
    gaps_filled: u64,
}

impl SequenceTracker {
    /// Track a sequence number
    ///
    /// Returns `Some(n)` when `sequence` lies `n` positions ahead of the
    /// expected one, i.e. `n` sequence numbers were skipped; those numbers
    /// are appended to the missing list. Returns `None` when `sequence` is
    /// the expected one, a duplicate, or a late arrival; a late arrival that
    /// was recorded as missing is removed from the list and counted as a
    /// filled gap.
    ///
    /// Note that a jump of `n` materialises `n` entries in the missing
    /// list, so memory use is proportional to the size of the gap.
    pub fn track(&mut self, sequence: u64) -> Option<u64> {
        let gap = if sequence > self.expected_sequence {
            self.missing.extend(self.expected_sequence..sequence);
            self.gaps_detected += 1;
            Some(sequence - self.expected_sequence)
        } else {
            // `missing` is sorted, so a binary search finds a late arrival
            // without scanning the whole list.
            if let Ok(pos) = self.missing.binary_search(&sequence) {
                self.missing.remove(pos);
                self.gaps_filled += 1;
            }
            None
        };

        if sequence >= self.expected_sequence {
            // Saturate rather than overflow when `u64::MAX` is observed.
            self.expected_sequence = sequence.saturating_add(1);
        }
        self.highest_seen = self.highest_seen.max(sequence);

        gap
    }

    /// Get missing sequences
    ///
    /// The slice is sorted in ascending order.
    pub fn get_missing(&self) -> &[u64] {
        &self.missing
    }
}
253
/// Statistics for out-of-order handling
///
/// A snapshot is obtained through `OutOfOrderHandler::get_stats`; the
/// handler resets all counters to their defaults in `reset`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct OutOfOrderStats {
    /// Total events processed
    ///
    /// Counts every event that passed deduplication, late or not.
    pub total_events: u64,
    /// Events emitted in order
    ///
    /// Counts events released from the reorder buffer, including flushes.
    pub ordered_events: u64,
    /// Late events detected
    pub late_events: u64,
    /// Late events dropped
    pub late_events_dropped: u64,
    /// Late events reprocessed
    pub late_events_reprocessed: u64,
    /// Duplicates detected
    pub duplicates_detected: u64,
    /// Gaps detected
    pub gaps_detected: u64,
    /// Gaps filled
    pub gaps_filled: u64,
    /// Current buffer size
    ///
    /// Number of events held in the reorder buffer after the most recent
    /// buffer mutation.
    pub buffer_size: usize,
    /// Max buffer size reached
    pub max_buffer_size: usize,
    /// Average lateness
    ///
    /// Running mean, in milliseconds, of `ingestion_time - event_time` over
    /// all counted events.
    pub avg_lateness_ms: f64,
    /// Max lateness
    ///
    /// Largest `ingestion_time - event_time` in milliseconds; starts at zero,
    /// so negative values (event times in the future) are never recorded.
    pub max_lateness_ms: i64,
    /// Current watermark
    ///
    /// Refreshed only when events are emitted under the `Watermark` emit
    /// strategy.
    pub current_watermark: DateTime<Utc>,
    /// Events per second
    ///
    /// NOTE(review): no code visible in this file updates this field.
    pub events_per_second: f64,
}
286
287impl OutOfOrderHandler {
288    /// Create a new out-of-order handler
289    pub fn new(config: OutOfOrderConfig) -> Self {
290        Self {
291            config,
292            buffer: Arc::new(RwLock::new(BTreeMap::new())),
293            watermark: Arc::new(RwLock::new(Watermark::new())),
294            late_events: Arc::new(RwLock::new(VecDeque::new())),
295            seen_events: Arc::new(RwLock::new(HashMap::new())),
296            sequence_tracker: Arc::new(RwLock::new(SequenceTracker::default())),
297            stats: Arc::new(RwLock::new(OutOfOrderStats::default())),
298            next_emit_time: Arc::new(RwLock::new(Utc::now())),
299        }
300    }
301
302    /// Add an event to the handler
303    pub async fn add_event(&self, event: StreamEvent) -> Result<Vec<OrderedEvent>> {
304        let event_time = self.get_event_time(&event);
305        let event_id = self.get_event_id(&event);
306        let ingestion_time = Utc::now();
307
308        // Check for duplicates
309        if self.config.enable_deduplication {
310            let mut seen = self.seen_events.write().await;
311            if let Some(_first_seen) = seen.get(&event_id) {
312                let mut stats = self.stats.write().await;
313                stats.duplicates_detected += 1;
314                debug!("Duplicate event detected: {}", event_id);
315                return Ok(Vec::new());
316            }
317            seen.insert(event_id.clone(), ingestion_time);
318
319            // Clean old entries
320            let cutoff = ingestion_time
321                - chrono::Duration::from_std(self.config.deduplication_window).unwrap_or_default();
322            seen.retain(|_, time| *time > cutoff);
323        }
324
325        // Update watermark
326        let is_late = if self.config.enable_watermarks {
327            let mut watermark = self.watermark.write().await;
328            let late = watermark.is_late(event_time);
329            watermark.update(event_time, self.config.allowed_out_of_orderness);
330            late
331        } else {
332            false
333        };
334
335        // Track sequence
336        let (sequence, gap_before) = if self.config.enable_sequence_tracking {
337            if let Some(seq) = self.get_sequence(&event) {
338                let mut tracker = self.sequence_tracker.write().await;
339                let gap = tracker.track(seq);
340                (Some(seq), gap)
341            } else {
342                (None, None)
343            }
344        } else {
345            (None, None)
346        };
347
348        // Create ordered event
349        let ordered_event = OrderedEvent {
350            event,
351            event_time,
352            sequence,
353            ingestion_time,
354            is_late,
355            gap_before,
356        };
357
358        // Update statistics
359        {
360            let mut stats = self.stats.write().await;
361            stats.total_events += 1;
362
363            let lateness_ms = (ingestion_time - event_time).num_milliseconds();
364            stats.avg_lateness_ms = (stats.avg_lateness_ms * (stats.total_events - 1) as f64
365                + lateness_ms as f64)
366                / stats.total_events as f64;
367            stats.max_lateness_ms = stats.max_lateness_ms.max(lateness_ms);
368
369            if is_late {
370                stats.late_events += 1;
371            }
372            if gap_before.is_some() {
373                stats.gaps_detected += 1;
374            }
375        }
376
377        // Handle late event
378        if is_late {
379            return self.handle_late_event(ordered_event).await;
380        }
381
382        // Add to buffer
383        {
384            let mut buffer = self.buffer.write().await;
385            let timestamp_key = event_time.timestamp_millis();
386            buffer
387                .entry(timestamp_key)
388                .or_insert_with(VecDeque::new)
389                .push_back(ordered_event);
390
391            let mut stats = self.stats.write().await;
392            let total_size: usize = buffer.values().map(|v| v.len()).sum();
393            stats.buffer_size = total_size;
394            stats.max_buffer_size = stats.max_buffer_size.max(total_size);
395        }
396
397        // Emit events if ready
398        self.emit_ready_events().await
399    }
400
401    /// Handle a late event based on configured strategy
402    async fn handle_late_event(&self, event: OrderedEvent) -> Result<Vec<OrderedEvent>> {
403        match &self.config.late_event_strategy {
404            LateEventStrategy::Drop => {
405                let mut stats = self.stats.write().await;
406                stats.late_events_dropped += 1;
407                debug!("Dropped late event: {:?}", event.event_time);
408                Ok(Vec::new())
409            }
410            LateEventStrategy::SideOutput => {
411                let mut late_events = self.late_events.write().await;
412                late_events.push_back(event);
413
414                // Trim if too many
415                while late_events.len() > self.config.buffer_capacity / 10 {
416                    late_events.pop_front();
417                }
418
419                Ok(Vec::new())
420            }
421            LateEventStrategy::Reprocess => {
422                let mut stats = self.stats.write().await;
423                stats.late_events_reprocessed += 1;
424                Ok(vec![event])
425            }
426            LateEventStrategy::UpdateOnly => {
427                let mut stats = self.stats.write().await;
428                stats.late_events_reprocessed += 1;
429                Ok(vec![event])
430            }
431            LateEventStrategy::Queue => {
432                let mut late_events = self.late_events.write().await;
433                late_events.push_back(event);
434                Ok(Vec::new())
435            }
436        }
437    }
438
439    /// Emit events that are ready based on emit strategy
440    async fn emit_ready_events(&self) -> Result<Vec<OrderedEvent>> {
441        match &self.config.emit_strategy {
442            EmitStrategy::Watermark => self.emit_before_watermark().await,
443            EmitStrategy::Delay(delay) => self.emit_after_delay(*delay).await,
444            EmitStrategy::BufferFull => self.emit_if_buffer_full().await,
445            EmitStrategy::Immediate => self.emit_oldest().await,
446            EmitStrategy::ProcessingTimeTimeout(timeout) => self.emit_after_timeout(*timeout).await,
447        }
448    }
449
450    /// Emit events before current watermark
451    async fn emit_before_watermark(&self) -> Result<Vec<OrderedEvent>> {
452        let watermark = self.watermark.read().await.timestamp;
453        let watermark_key = watermark.timestamp_millis();
454
455        let mut buffer = self.buffer.write().await;
456        let mut to_emit = Vec::new();
457
458        // Collect all events before watermark
459        let keys_to_remove: Vec<i64> = buffer.range(..watermark_key).map(|(k, _)| *k).collect();
460
461        for key in keys_to_remove {
462            if let Some(events) = buffer.remove(&key) {
463                to_emit.extend(events);
464            }
465        }
466
467        // Update stats
468        let mut stats = self.stats.write().await;
469        stats.ordered_events += to_emit.len() as u64;
470        stats.buffer_size = buffer.values().map(|v| v.len()).sum();
471        stats.current_watermark = watermark;
472
473        Ok(to_emit)
474    }
475
476    /// Emit events after a fixed delay
477    async fn emit_after_delay(&self, delay: Duration) -> Result<Vec<OrderedEvent>> {
478        let cutoff = Utc::now() - chrono::Duration::from_std(delay).unwrap_or_default();
479        let cutoff_key = cutoff.timestamp_millis();
480
481        let mut buffer = self.buffer.write().await;
482        let mut to_emit = Vec::new();
483
484        let keys_to_remove: Vec<i64> = buffer.range(..cutoff_key).map(|(k, _)| *k).collect();
485
486        for key in keys_to_remove {
487            if let Some(events) = buffer.remove(&key) {
488                to_emit.extend(events);
489            }
490        }
491
492        let mut stats = self.stats.write().await;
493        stats.ordered_events += to_emit.len() as u64;
494        stats.buffer_size = buffer.values().map(|v| v.len()).sum();
495
496        Ok(to_emit)
497    }
498
499    /// Emit if buffer is full
500    async fn emit_if_buffer_full(&self) -> Result<Vec<OrderedEvent>> {
501        let buffer = self.buffer.read().await;
502        let size: usize = buffer.values().map(|v| v.len()).sum();
503
504        if size >= self.config.buffer_capacity {
505            drop(buffer);
506            // Emit oldest 10%
507            let to_emit_count = self.config.buffer_capacity / 10;
508            self.emit_n_oldest(to_emit_count).await
509        } else {
510            Ok(Vec::new())
511        }
512    }
513
514    /// Emit oldest event
515    async fn emit_oldest(&self) -> Result<Vec<OrderedEvent>> {
516        self.emit_n_oldest(1).await
517    }
518
519    /// Emit N oldest events
520    async fn emit_n_oldest(&self, n: usize) -> Result<Vec<OrderedEvent>> {
521        let mut buffer = self.buffer.write().await;
522        let mut to_emit = Vec::new();
523        let mut remaining = n;
524
525        while remaining > 0 {
526            if let Some(first_key) = buffer.keys().next().copied() {
527                if let Some(events) = buffer.get_mut(&first_key) {
528                    while remaining > 0 && !events.is_empty() {
529                        if let Some(event) = events.pop_front() {
530                            to_emit.push(event);
531                            remaining -= 1;
532                        }
533                    }
534                    if events.is_empty() {
535                        buffer.remove(&first_key);
536                    }
537                }
538            } else {
539                break;
540            }
541        }
542
543        let mut stats = self.stats.write().await;
544        stats.ordered_events += to_emit.len() as u64;
545        stats.buffer_size = buffer.values().map(|v| v.len()).sum();
546
547        Ok(to_emit)
548    }
549
550    /// Emit events after processing time timeout
551    async fn emit_after_timeout(&self, timeout: Duration) -> Result<Vec<OrderedEvent>> {
552        let now = Utc::now();
553        let mut next_emit = self.next_emit_time.write().await;
554
555        if now >= *next_emit {
556            *next_emit = now + chrono::Duration::from_std(timeout).unwrap_or_default();
557            drop(next_emit);
558
559            // Emit all buffered events
560            let mut buffer = self.buffer.write().await;
561            let mut to_emit = Vec::new();
562
563            for (_, events) in buffer.iter_mut() {
564                to_emit.extend(events.drain(..));
565            }
566            buffer.clear();
567
568            let mut stats = self.stats.write().await;
569            stats.ordered_events += to_emit.len() as u64;
570            stats.buffer_size = 0;
571
572            Ok(to_emit)
573        } else {
574            Ok(Vec::new())
575        }
576    }
577
578    /// Get event time from StreamEvent
579    fn get_event_time(&self, event: &StreamEvent) -> DateTime<Utc> {
580        match event {
581            StreamEvent::TripleAdded { metadata, .. }
582            | StreamEvent::TripleRemoved { metadata, .. }
583            | StreamEvent::GraphCreated { metadata, .. }
584            | StreamEvent::GraphDeleted { metadata, .. }
585            | StreamEvent::TransactionBegin { metadata, .. }
586            | StreamEvent::TransactionCommit { metadata, .. }
587            | StreamEvent::TransactionAbort { metadata, .. }
588            | StreamEvent::Heartbeat { metadata, .. }
589            | StreamEvent::SparqlUpdate { metadata, .. }
590            | StreamEvent::SchemaChanged { metadata, .. } => metadata.timestamp,
591            _ => Utc::now(),
592        }
593    }
594
595    /// Get event ID from StreamEvent
596    fn get_event_id(&self, event: &StreamEvent) -> String {
597        match event {
598            StreamEvent::TripleAdded { metadata, .. }
599            | StreamEvent::TripleRemoved { metadata, .. }
600            | StreamEvent::GraphCreated { metadata, .. }
601            | StreamEvent::GraphDeleted { metadata, .. }
602            | StreamEvent::TransactionBegin { metadata, .. }
603            | StreamEvent::TransactionCommit { metadata, .. }
604            | StreamEvent::TransactionAbort { metadata, .. }
605            | StreamEvent::Heartbeat { metadata, .. }
606            | StreamEvent::SparqlUpdate { metadata, .. }
607            | StreamEvent::SchemaChanged { metadata, .. } => metadata.event_id.clone(),
608            _ => uuid::Uuid::new_v4().to_string(),
609        }
610    }
611
612    /// Get sequence number from StreamEvent
613    fn get_sequence(&self, event: &StreamEvent) -> Option<u64> {
614        match event {
615            StreamEvent::TripleAdded { metadata, .. }
616            | StreamEvent::TripleRemoved { metadata, .. }
617            | StreamEvent::Heartbeat { metadata, .. } => metadata
618                .properties
619                .get("sequence")
620                .and_then(|s| s.parse().ok()),
621            _ => None,
622        }
623    }
624
625    /// Flush all buffered events
626    pub async fn flush(&self) -> Result<Vec<OrderedEvent>> {
627        let mut buffer = self.buffer.write().await;
628        let mut to_emit = Vec::new();
629
630        for (_, events) in buffer.iter_mut() {
631            to_emit.extend(events.drain(..));
632        }
633        buffer.clear();
634
635        let mut stats = self.stats.write().await;
636        stats.ordered_events += to_emit.len() as u64;
637        stats.buffer_size = 0;
638
639        info!("Flushed {} events from out-of-order buffer", to_emit.len());
640
641        Ok(to_emit)
642    }
643
644    /// Get late events
645    pub async fn get_late_events(&self) -> Vec<OrderedEvent> {
646        let late_events = self.late_events.read().await;
647        late_events.iter().cloned().collect()
648    }
649
650    /// Clear late events
651    pub async fn clear_late_events(&self) {
652        let mut late_events = self.late_events.write().await;
653        late_events.clear();
654    }
655
656    /// Get current watermark
657    pub async fn get_watermark(&self) -> Watermark {
658        self.watermark.read().await.clone()
659    }
660
661    /// Get statistics
662    pub async fn get_stats(&self) -> OutOfOrderStats {
663        self.stats.read().await.clone()
664    }
665
666    /// Get missing sequences
667    pub async fn get_missing_sequences(&self) -> Vec<u64> {
668        let tracker = self.sequence_tracker.read().await;
669        tracker.get_missing().to_vec()
670    }
671
672    /// Reset handler state
673    pub async fn reset(&self) {
674        self.buffer.write().await.clear();
675        self.late_events.write().await.clear();
676        self.seen_events.write().await.clear();
677        *self.watermark.write().await = Watermark::new();
678        *self.sequence_tracker.write().await = SequenceTracker::default();
679        *self.stats.write().await = OutOfOrderStats::default();
680
681        info!("Out-of-order handler reset");
682    }
683}
684
685/// Builder for out-of-order handler
686pub struct OutOfOrderHandlerBuilder {
687    config: OutOfOrderConfig,
688}
689
690impl OutOfOrderHandlerBuilder {
691    /// Create a new builder
692    pub fn new() -> Self {
693        Self {
694            config: OutOfOrderConfig::default(),
695        }
696    }
697
698    /// Set maximum lateness
699    pub fn max_lateness(mut self, duration: Duration) -> Self {
700        self.config.max_lateness = duration;
701        self
702    }
703
704    /// Set buffer capacity
705    pub fn buffer_capacity(mut self, capacity: usize) -> Self {
706        self.config.buffer_capacity = capacity;
707        self
708    }
709
710    /// Set late event strategy
711    pub fn late_event_strategy(mut self, strategy: LateEventStrategy) -> Self {
712        self.config.late_event_strategy = strategy;
713        self
714    }
715
716    /// Set allowed out-of-orderness
717    pub fn allowed_out_of_orderness(mut self, duration: Duration) -> Self {
718        self.config.allowed_out_of_orderness = duration;
719        self
720    }
721
722    /// Set emit strategy
723    pub fn emit_strategy(mut self, strategy: EmitStrategy) -> Self {
724        self.config.emit_strategy = strategy;
725        self
726    }
727
728    /// Enable deduplication
729    pub fn with_deduplication(mut self, window: Duration) -> Self {
730        self.config.enable_deduplication = true;
731        self.config.deduplication_window = window;
732        self
733    }
734
735    /// Enable sequence tracking
736    pub fn with_sequence_tracking(mut self) -> Self {
737        self.config.enable_sequence_tracking = true;
738        self
739    }
740
741    /// Build the handler
742    pub fn build(self) -> OutOfOrderHandler {
743        OutOfOrderHandler::new(self.config)
744    }
745}
746
747impl Default for OutOfOrderHandlerBuilder {
748    fn default() -> Self {
749        Self::new()
750    }
751}
752
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;
    use std::collections::HashMap;
    use uuid::Uuid;

    /// Builds a `TripleAdded` event with a fresh random id, the given event
    /// time and, when provided, a `"sequence"` metadata property.
    fn create_test_event(timestamp: DateTime<Utc>, sequence: Option<u64>) -> StreamEvent {
        let properties: HashMap<String, String> = sequence
            .into_iter()
            .map(|seq| ("sequence".to_string(), seq.to_string()))
            .collect();

        let metadata = EventMetadata {
            event_id: Uuid::new_v4().to_string(),
            timestamp,
            source: "test".to_string(),
            user: None,
            context: None,
            caused_by: None,
            version: "1.0".to_string(),
            properties,
            checksum: None,
        };

        StreamEvent::TripleAdded {
            subject: "test:subject".to_string(),
            predicate: "test:predicate".to_string(),
            object: "test:object".to_string(),
            graph: None,
            metadata,
        }
    }

    /// A fresh handler reports zero processed events.
    #[tokio::test]
    async fn test_handler_creation() {
        let handler = OutOfOrderHandler::new(OutOfOrderConfig::default());

        assert_eq!(handler.get_stats().await.total_events, 0);
    }

    /// With the `Immediate` strategy a single event comes straight back.
    #[tokio::test]
    async fn test_add_event() {
        let handler = OutOfOrderHandlerBuilder::new()
            .emit_strategy(EmitStrategy::Immediate)
            .build();

        let emitted = handler
            .add_event(create_test_event(Utc::now(), Some(1)))
            .await
            .unwrap();
        assert_eq!(emitted.len(), 1);

        let stats = handler.get_stats().await;
        assert_eq!(stats.total_events, 1);
    }

    /// Adding an event is visible in the watermark's event counter.
    #[tokio::test]
    async fn test_watermark_update() {
        let handler = OutOfOrderHandler::new(OutOfOrderConfig::default());

        handler
            .add_event(create_test_event(Utc::now(), None))
            .await
            .unwrap();

        let watermark = handler.get_watermark().await;
        assert!(watermark.events_processed > 0);
    }

    /// An event ten seconds behind a one-second tolerance is counted as late.
    #[tokio::test]
    async fn test_late_event_detection() {
        let handler = OutOfOrderHandler::new(OutOfOrderConfig {
            late_event_strategy: LateEventStrategy::SideOutput,
            allowed_out_of_orderness: Duration::from_secs(1),
            ..Default::default()
        });

        // A current event moves the watermark close to "now".
        let now = Utc::now();
        handler
            .add_event(create_test_event(now, None))
            .await
            .unwrap();

        // An event ten seconds older falls behind that watermark.
        let ten_seconds_ago = now - chrono::Duration::seconds(10);
        handler
            .add_event(create_test_event(ten_seconds_ago, None))
            .await
            .unwrap();

        let stats = handler.get_stats().await;
        assert!(stats.late_events > 0);
    }

    /// Submitting the same event twice registers exactly one duplicate.
    #[tokio::test]
    async fn test_deduplication() {
        let handler = OutOfOrderHandler::new(OutOfOrderConfig {
            enable_deduplication: true,
            deduplication_window: Duration::from_secs(60),
            ..Default::default()
        });

        let original = create_test_event(Utc::now(), None);
        let duplicate = original.clone();

        handler.add_event(original).await.unwrap();
        handler.add_event(duplicate).await.unwrap();

        let stats = handler.get_stats().await;
        assert_eq!(stats.duplicates_detected, 1);
    }

    /// Jumping from sequence 1 to 5 records 2, 3 and 4 as missing.
    #[tokio::test]
    async fn test_sequence_tracking() {
        let handler = OutOfOrderHandlerBuilder::new()
            .with_sequence_tracking()
            .emit_strategy(EmitStrategy::Immediate)
            .build();

        let now = Utc::now();
        for seq in &[1u64, 5] {
            handler
                .add_event(create_test_event(now, Some(*seq)))
                .await
                .unwrap();
        }

        let missing = handler.get_missing_sequences().await;
        assert!(!missing.is_empty());
        for expected in &[2u64, 3, 4] {
            assert!(missing.contains(expected));
        }
    }

    /// `flush` returns everything buffered and leaves the buffer empty.
    #[tokio::test]
    async fn test_flush() {
        let handler = OutOfOrderHandlerBuilder::new()
            .emit_strategy(EmitStrategy::Watermark)
            .build();

        let now = Utc::now();
        for seq in &[1u64, 2] {
            handler
                .add_event(create_test_event(now, Some(*seq)))
                .await
                .unwrap();
        }

        let flushed = handler.flush().await.unwrap();
        assert!(flushed.len() >= 2);

        let stats = handler.get_stats().await;
        assert_eq!(stats.buffer_size, 0);
    }

    /// Under `Delay`, an event becomes emittable once the delay has elapsed.
    #[tokio::test]
    async fn test_emit_strategy_delay() {
        let handler = OutOfOrderHandlerBuilder::new()
            .emit_strategy(EmitStrategy::Delay(Duration::from_millis(100)))
            .build();

        handler
            .add_event(create_test_event(Utc::now(), Some(1)))
            .await
            .unwrap();

        // Wait past the delay before asking for ready events.
        tokio::time::sleep(Duration::from_millis(150)).await;

        let emitted = handler.emit_ready_events().await.unwrap();
        assert!(!emitted.is_empty());
    }

    /// Overfilling a ten-event buffer under `BufferFull` forces emissions.
    #[tokio::test]
    async fn test_buffer_full_emit() {
        let handler = OutOfOrderHandlerBuilder::new()
            .buffer_capacity(10)
            .emit_strategy(EmitStrategy::BufferFull)
            .build();

        let start = Utc::now();
        for offset_ms in 0..15 {
            let event_time = start + chrono::Duration::milliseconds(offset_ms);
            handler
                .add_event(create_test_event(event_time, Some(offset_ms as u64)))
                .await
                .unwrap();
        }

        let stats = handler.get_stats().await;
        assert!(stats.ordered_events > 0);
    }

    /// The `Drop` strategy returns nothing and counts the dropped event.
    #[tokio::test]
    async fn test_late_event_strategies() {
        let handler = OutOfOrderHandlerBuilder::new()
            .late_event_strategy(LateEventStrategy::Drop)
            .allowed_out_of_orderness(Duration::from_secs(1))
            .emit_strategy(EmitStrategy::Immediate)
            .build();

        let now = Utc::now();
        handler
            .add_event(create_test_event(now, None))
            .await
            .unwrap();

        let stale = now - chrono::Duration::seconds(100);
        let returned = handler
            .add_event(create_test_event(stale, None))
            .await
            .unwrap();
        assert!(returned.is_empty());

        let stats = handler.get_stats().await;
        assert_eq!(stats.late_events_dropped, 1);
    }

    /// `reset` zeroes the statistics.
    #[tokio::test]
    async fn test_reset() {
        let handler = OutOfOrderHandler::new(OutOfOrderConfig::default());

        handler
            .add_event(create_test_event(Utc::now(), Some(1)))
            .await
            .unwrap();

        handler.reset().await;

        let stats = handler.get_stats().await;
        assert_eq!(stats.total_events, 0);
        assert_eq!(stats.buffer_size, 0);
    }

    /// Events added out of order are emitted sorted by event time.
    #[tokio::test]
    async fn test_ordered_emission() {
        let handler = OutOfOrderHandlerBuilder::new()
            .emit_strategy(EmitStrategy::Delay(Duration::from_millis(50)))
            .build();

        // (offset from base in ms, sequence) — deliberately shuffled.
        let base = Utc::now();
        for (offset_ms, seq) in &[(30i64, 3u64), (10, 1), (20, 2)] {
            let event_time = base + chrono::Duration::milliseconds(*offset_ms);
            handler
                .add_event(create_test_event(event_time, Some(*seq)))
                .await
                .unwrap();
        }

        // Let the delay pass, then collect whatever is ready.
        tokio::time::sleep(Duration::from_millis(100)).await;
        let emitted = handler.emit_ready_events().await.unwrap();

        assert_eq!(emitted.len(), 3);
        for pair in emitted.windows(2) {
            assert!(pair[0].event_time <= pair[1].event_time);
        }
    }
}