Skip to main content

varpulis_runtime/
sequence.rs

1//! Sequence tracking for temporal event correlations
2//!
3//! Implements the runtime support for the `->` (followed-by) operator.
4
5use std::time::{Duration, Instant};
6
7use rustc_hash::FxHashMap;
8
9use crate::event::Event;
10
11/// Type alias for sequence filter functions
12pub type SequenceFilter = Box<dyn Fn(&Event, &SequenceContext) -> bool + Send + Sync>;
13
14/// A step in a sequence pattern
15pub struct SequenceStep {
16    /// Event type to match
17    pub event_type: String,
18    /// Filter function to apply
19    pub filter: Option<SequenceFilter>,
20    /// Alias for captured event
21    pub alias: Option<String>,
22    /// Timeout for this step
23    pub timeout: Option<Duration>,
24    /// Match all events (true) or just one (false)
25    pub match_all: bool,
26}
27
28impl std::fmt::Debug for SequenceStep {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        f.debug_struct("SequenceStep")
31            .field("event_type", &self.event_type)
32            .field("has_filter", &self.filter.is_some())
33            .field("alias", &self.alias)
34            .field("timeout", &self.timeout)
35            .field("match_all", &self.match_all)
36            .finish_non_exhaustive()
37    }
38}
39
40/// Context for evaluating sequence filters
41#[derive(Debug, Clone, Default)]
42pub struct SequenceContext {
43    /// Captured events by alias
44    pub captured: FxHashMap<String, Event>,
45    /// Previous event in sequence (accessible as $)
46    pub previous: Option<Event>,
47}
48
49/// PERF: Static empty context for read-only operations (avoids allocation)
50pub static EMPTY_CONTEXT: std::sync::LazyLock<SequenceContext> =
51    std::sync::LazyLock::new(SequenceContext::new);
52
53impl SequenceContext {
54    pub fn new() -> Self {
55        Self::default()
56    }
57
58    /// Returns a reference to a shared empty context for read-only operations.
59    /// PERF: Avoids allocating a new HashMap for each operation that doesn't need captured events.
60    #[inline]
61    pub fn empty() -> &'static Self {
62        &EMPTY_CONTEXT
63    }
64
65    pub fn with_captured(mut self, alias: String, event: Event) -> Self {
66        self.previous = Some(event.clone());
67        self.captured.insert(alias, event);
68        self
69    }
70
71    pub fn get(&self, alias: &str) -> Option<&Event> {
72        if alias == "$" {
73            self.previous.as_ref()
74        } else {
75            self.captured.get(alias)
76        }
77    }
78}
79
80/// An active correlation being tracked
81#[derive(Debug)]
82pub struct ActiveCorrelation {
83    /// Current step index in the sequence
84    pub current_step: usize,
85    /// Context with captured events
86    pub context: SequenceContext,
87    /// When this correlation was started
88    pub started_at: Instant,
89    /// Timeout for current step (if any)
90    pub step_timeout: Option<Instant>,
91}
92
93impl Default for ActiveCorrelation {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99impl ActiveCorrelation {
100    pub fn new() -> Self {
101        Self {
102            current_step: 0,
103            context: SequenceContext::new(),
104            started_at: Instant::now(),
105            step_timeout: None,
106        }
107    }
108
109    pub fn advance(&mut self, event: Event, alias: Option<&str>) {
110        if let Some(alias) = alias {
111            self.context =
112                std::mem::take(&mut self.context).with_captured(alias.to_string(), event);
113        } else {
114            self.context.previous = Some(event);
115        }
116        self.current_step += 1;
117        self.step_timeout = None;
118    }
119
120    pub fn is_timed_out(&self) -> bool {
121        if let Some(deadline) = self.step_timeout {
122            Instant::now() > deadline
123        } else {
124            false
125        }
126    }
127
128    pub fn set_timeout(&mut self, duration: Duration) {
129        self.step_timeout = Some(Instant::now() + duration);
130    }
131}
132
133/// A negation condition that invalidates a sequence
134pub struct NegationCondition {
135    /// Event type that triggers negation
136    pub event_type: String,
137    /// Filter function to apply
138    pub filter: Option<SequenceFilter>,
139}
140
141impl std::fmt::Debug for NegationCondition {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        f.debug_struct("NegationCondition")
144            .field("event_type", &self.event_type)
145            .field("has_filter", &self.filter.is_some())
146            .finish_non_exhaustive()
147    }
148}
149
150/// Tracks sequences for a stream
151pub struct SequenceTracker {
152    /// Steps in the sequence pattern
153    steps: Vec<SequenceStep>,
154    /// Active correlations being tracked
155    active: Vec<ActiveCorrelation>,
156    /// Maximum concurrent correlations
157    max_active: usize,
158    /// Match all for first step
159    match_all_first: bool,
160    /// Negation conditions that invalidate sequences
161    negations: Vec<NegationCondition>,
162}
163
164impl std::fmt::Debug for SequenceTracker {
165    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166        f.debug_struct("SequenceTracker")
167            .field("steps", &self.steps)
168            .field("active_count", &self.active.len())
169            .field("max_active", &self.max_active)
170            .field("match_all_first", &self.match_all_first)
171            .field("negations", &self.negations)
172            .finish_non_exhaustive()
173    }
174}
175
176impl SequenceTracker {
177    pub const fn new(steps: Vec<SequenceStep>, match_all_first: bool) -> Self {
178        Self {
179            steps,
180            active: Vec::new(),
181            max_active: 10000,
182            match_all_first,
183            negations: Vec::new(),
184        }
185    }
186
187    pub const fn with_max_active(mut self, max: usize) -> Self {
188        self.max_active = max;
189        self
190    }
191
192    pub fn with_negation(mut self, negation: NegationCondition) -> Self {
193        self.negations.push(negation);
194        self
195    }
196
197    pub fn add_negation(&mut self, negation: NegationCondition) {
198        self.negations.push(negation);
199    }
200
201    /// Process an incoming event, returns completed correlations
202    pub fn process(&mut self, event: &Event) -> Vec<SequenceContext> {
203        let mut completed = Vec::new();
204
205        // First, check if this event triggers any negation condition
206        // If so, invalidate matching active correlations
207        self.check_negations(event);
208
209        // Track how many correlations existed before starting new ones
210        let existing_count = self.active.len();
211
212        // Check if this event starts a new correlation
213        if self.should_start_new(event) {
214            self.start_correlation(event);
215        }
216
217        // Then, advance existing correlations (NOT newly started ones)
218        // This prevents the same event from matching multiple steps
219        let mut i = 0;
220        while i < existing_count.min(self.active.len()) {
221            // Remove timed out correlations
222            if self.active[i].is_timed_out() {
223                self.active.remove(i);
224                continue;
225            }
226
227            let step_idx = self.active[i].current_step;
228
229            // Check if we're waiting for more steps
230            if step_idx >= self.steps.len() {
231                i += 1;
232                continue;
233            }
234
235            // Check if event matches this step (need to avoid borrow issues)
236            let matches = {
237                let step = &self.steps[step_idx];
238                let context = &self.active[i].context;
239                self.event_matches_step(event, step, context)
240            };
241
242            if matches {
243                let step = &self.steps[step_idx];
244                let alias = step.alias.clone();
245                let step_match_all = step.match_all;
246
247                self.active[i].advance(event.clone(), alias.as_deref());
248
249                // Set timeout for next step if specified
250                let current_step = self.active[i].current_step;
251                if let Some(next_step) = self.steps.get(current_step) {
252                    if let Some(t) = next_step.timeout {
253                        self.active[i].set_timeout(t);
254                    }
255                }
256
257                // Check if sequence is complete
258                if self.active[i].current_step >= self.steps.len() {
259                    if step_match_all {
260                        // match_all: emit result but keep correlation active for more matches
261                        completed.push(self.active[i].context.clone());
262                        // Reset to previous step to match more events of this type
263                        self.active[i].current_step = step_idx;
264                    } else {
265                        // Normal: remove completed correlation
266                        completed.push(self.active.remove(i).context);
267                        continue;
268                    }
269                }
270            }
271
272            i += 1;
273        }
274
275        completed
276    }
277
278    fn should_start_new(&self, event: &Event) -> bool {
279        if self.steps.is_empty() {
280            return false;
281        }
282
283        // Check if we're at capacity
284        if self.active.len() >= self.max_active {
285            return false;
286        }
287
288        // Check if event matches the first step
289        let first_step = &self.steps[0];
290        if *event.event_type != first_step.event_type {
291            return false;
292        }
293
294        // Apply filter if any
295        if let Some(ref filter) = first_step.filter {
296            let ctx = SequenceContext::new();
297            if !filter(event, &ctx) {
298                return false;
299            }
300        }
301
302        // If not match_all, only start if no active correlations
303        if !self.match_all_first && !self.active.is_empty() {
304            return false;
305        }
306
307        true
308    }
309
310    fn start_correlation(&mut self, event: &Event) {
311        let mut correlation = ActiveCorrelation::new();
312        let first_step = &self.steps[0];
313
314        correlation.advance(event.clone(), first_step.alias.as_deref());
315
316        // Set timeout for second step if specified
317        if let Some(second_step) = self.steps.get(1) {
318            if let Some(t) = second_step.timeout {
319                correlation.set_timeout(t);
320            }
321        }
322
323        self.active.push(correlation);
324    }
325
326    fn event_matches_step(
327        &self,
328        event: &Event,
329        step: &SequenceStep,
330        context: &SequenceContext,
331    ) -> bool {
332        // Check event type
333        if *event.event_type != step.event_type {
334            return false;
335        }
336
337        // Apply filter if any
338        if let Some(ref filter) = step.filter {
339            if !filter(event, context) {
340                return false;
341            }
342        }
343
344        true
345    }
346
347    /// Check if event triggers any negation condition and invalidate matching correlations
348    fn check_negations(&mut self, event: &Event) {
349        if self.negations.is_empty() {
350            return;
351        }
352
353        // Check each negation condition
354        for negation in &self.negations {
355            if *event.event_type != negation.event_type {
356                continue;
357            }
358
359            // Remove correlations where the negation filter matches
360            self.active.retain(|correlation| {
361                if let Some(ref filter) = negation.filter {
362                    // If filter returns true, the negation matches -> remove correlation
363                    !filter(event, &correlation.context)
364                } else {
365                    // No filter means any event of this type negates -> remove correlation
366                    false
367                }
368            });
369        }
370    }
371
372    /// Get count of active correlations
373    pub const fn active_count(&self) -> usize {
374        self.active.len()
375    }
376
377    /// Clean up timed out correlations
378    pub fn cleanup_timeouts(&mut self) -> usize {
379        let before = self.active.len();
380        self.active.retain(|c| !c.is_timed_out());
381        before - self.active.len()
382    }
383}
384
385#[cfg(test)]
386mod tests {
387    use varpulis_core::Value;
388
389    use super::*;
390
391    fn make_event(event_type: &str, fields: Vec<(&str, Value)>) -> Event {
392        let mut event = Event::new(event_type);
393        for (k, v) in fields {
394            event.data.insert(k.into(), v);
395        }
396        event
397    }
398
399    #[test]
400    fn test_simple_sequence_a_then_b() {
401        let steps = vec![
402            SequenceStep {
403                event_type: "A".to_string(),
404                filter: None,
405                alias: Some("a".to_string()),
406                timeout: None,
407                match_all: false,
408            },
409            SequenceStep {
410                event_type: "B".to_string(),
411                filter: None,
412                alias: Some("b".to_string()),
413                timeout: None,
414                match_all: false,
415            },
416        ];
417
418        let mut tracker = SequenceTracker::new(steps, false);
419
420        // Send A - should start correlation
421        let a = make_event("A", vec![("id", Value::Int(1))]);
422        let completed = tracker.process(&a);
423        assert!(completed.is_empty());
424        assert_eq!(tracker.active_count(), 1);
425
426        // Send B - should complete sequence
427        let b = make_event("B", vec![("id", Value::Int(2))]);
428        let completed = tracker.process(&b);
429        assert_eq!(completed.len(), 1);
430        assert_eq!(tracker.active_count(), 0);
431
432        // Check captured events
433        let ctx = &completed[0];
434        assert!(ctx.get("a").is_some());
435        assert!(ctx.get("b").is_some());
436    }
437
438    #[test]
439    fn test_sequence_with_filter() {
440        let steps = vec![
441            SequenceStep {
442                event_type: "Order".to_string(),
443                filter: None,
444                alias: Some("order".to_string()),
445                timeout: None,
446                match_all: false,
447            },
448            SequenceStep {
449                event_type: "Payment".to_string(),
450                filter: Some(Box::new(|event, ctx| {
451                    // Payment.order_id == order.id
452                    let order = ctx.get("order").unwrap();
453                    event.get_int("order_id") == order.get_int("id")
454                })),
455                alias: Some("payment".to_string()),
456                timeout: None,
457                match_all: false,
458            },
459        ];
460
461        let mut tracker = SequenceTracker::new(steps, false);
462
463        // Order 1
464        let order1 = make_event("Order", vec![("id", Value::Int(1))]);
465        tracker.process(&order1);
466        assert_eq!(tracker.active_count(), 1);
467
468        // Payment for wrong order - should not complete
469        let wrong_payment = make_event("Payment", vec![("order_id", Value::Int(999))]);
470        let completed = tracker.process(&wrong_payment);
471        assert!(completed.is_empty());
472        assert_eq!(tracker.active_count(), 1);
473
474        // Payment for correct order - should complete
475        let correct_payment = make_event("Payment", vec![("order_id", Value::Int(1))]);
476        let completed = tracker.process(&correct_payment);
477        assert_eq!(completed.len(), 1);
478    }
479
480    #[test]
481    fn test_sequence_match_all() {
482        let steps = vec![
483            SequenceStep {
484                event_type: "News".to_string(),
485                filter: None,
486                alias: Some("news".to_string()),
487                timeout: None,
488                match_all: true,
489            },
490            SequenceStep {
491                event_type: "Tick".to_string(),
492                filter: None,
493                alias: Some("tick".to_string()),
494                timeout: None,
495                match_all: false,
496            },
497        ];
498
499        let mut tracker = SequenceTracker::new(steps, true);
500
501        // Send multiple News events
502        let news1 = make_event("News", vec![("id", Value::Int(1))]);
503        let news2 = make_event("News", vec![("id", Value::Int(2))]);
504        tracker.process(&news1);
505        tracker.process(&news2);
506        assert_eq!(tracker.active_count(), 2);
507
508        // Send Tick - should complete BOTH correlations
509        let tick = make_event("Tick", vec![("price", Value::Float(100.0))]);
510        let completed = tracker.process(&tick);
511        assert_eq!(completed.len(), 2);
512        assert_eq!(tracker.active_count(), 0);
513    }
514
515    #[test]
516    fn test_sequence_three_steps() {
517        let steps = vec![
518            SequenceStep {
519                event_type: "A".to_string(),
520                filter: None,
521                alias: Some("a".to_string()),
522                timeout: None,
523                match_all: false,
524            },
525            SequenceStep {
526                event_type: "B".to_string(),
527                filter: None,
528                alias: Some("b".to_string()),
529                timeout: None,
530                match_all: false,
531            },
532            SequenceStep {
533                event_type: "C".to_string(),
534                filter: None,
535                alias: Some("c".to_string()),
536                timeout: None,
537                match_all: false,
538            },
539        ];
540
541        let mut tracker = SequenceTracker::new(steps, false);
542
543        tracker.process(&make_event("A", vec![]));
544        assert_eq!(tracker.active_count(), 1);
545
546        tracker.process(&make_event("B", vec![]));
547        assert_eq!(tracker.active_count(), 1);
548
549        let completed = tracker.process(&make_event("C", vec![]));
550        assert_eq!(completed.len(), 1);
551        assert_eq!(tracker.active_count(), 0);
552
553        // All three should be captured
554        let ctx = &completed[0];
555        assert!(ctx.get("a").is_some());
556        assert!(ctx.get("b").is_some());
557        assert!(ctx.get("c").is_some());
558    }
559
560    #[test]
561    fn test_sequence_dollar_reference() {
562        let steps = vec![
563            SequenceStep {
564                event_type: "Start".to_string(),
565                filter: None,
566                alias: None, // No alias, but should be accessible as $
567                timeout: None,
568                match_all: false,
569            },
570            SequenceStep {
571                event_type: "End".to_string(),
572                filter: Some(Box::new(|event, ctx| {
573                    // End.id == $.id (previous event)
574                    let prev = ctx.get("$").unwrap();
575                    event.get_int("id") == prev.get_int("id")
576                })),
577                alias: None,
578                timeout: None,
579                match_all: false,
580            },
581        ];
582
583        let mut tracker = SequenceTracker::new(steps, false);
584
585        tracker.process(&make_event("Start", vec![("id", Value::Int(42))]));
586
587        // Wrong ID
588        let completed = tracker.process(&make_event("End", vec![("id", Value::Int(99))]));
589        assert!(completed.is_empty());
590
591        // Correct ID
592        let completed = tracker.process(&make_event("End", vec![("id", Value::Int(42))]));
593        assert_eq!(completed.len(), 1);
594    }
595
596    #[test]
597    fn test_sequence_max_active() {
598        let steps = vec![
599            SequenceStep {
600                event_type: "A".to_string(),
601                filter: None,
602                alias: None,
603                timeout: None,
604                match_all: true,
605            },
606            SequenceStep {
607                event_type: "B".to_string(),
608                filter: None,
609                alias: None,
610                timeout: None,
611                match_all: false,
612            },
613        ];
614
615        let mut tracker = SequenceTracker::new(steps, true).with_max_active(3);
616
617        // Start 5 correlations, but only 3 should be active
618        for i in 0..5 {
619            tracker.process(&make_event("A", vec![("id", Value::Int(i))]));
620        }
621        assert_eq!(tracker.active_count(), 3);
622    }
623
624    #[test]
625    fn test_sequence_no_match_wrong_type() {
626        let steps = vec![
627            SequenceStep {
628                event_type: "A".to_string(),
629                filter: None,
630                alias: None,
631                timeout: None,
632                match_all: false,
633            },
634            SequenceStep {
635                event_type: "B".to_string(),
636                filter: None,
637                alias: None,
638                timeout: None,
639                match_all: false,
640            },
641        ];
642
643        let mut tracker = SequenceTracker::new(steps, false);
644
645        // Send X - should not start
646        tracker.process(&make_event("X", vec![]));
647        assert_eq!(tracker.active_count(), 0);
648
649        // Send A - should start
650        tracker.process(&make_event("A", vec![]));
651        assert_eq!(tracker.active_count(), 1);
652
653        // Send X - should not advance
654        tracker.process(&make_event("X", vec![]));
655        assert_eq!(tracker.active_count(), 1);
656    }
657
658    // ==========================================================================
659    // Negation Tests
660    // ==========================================================================
661
662    #[test]
663    fn test_sequence_with_negation() {
664        let steps = vec![
665            SequenceStep {
666                event_type: "Order".to_string(),
667                filter: None,
668                alias: Some("order".to_string()),
669                timeout: None,
670                match_all: false,
671            },
672            SequenceStep {
673                event_type: "Shipped".to_string(),
674                filter: None,
675                alias: Some("shipped".to_string()),
676                timeout: None,
677                match_all: false,
678            },
679        ];
680
681        // Negation: Cancel event invalidates the sequence
682        let mut tracker = SequenceTracker::new(steps, false).with_negation(NegationCondition {
683            event_type: "Cancel".to_string(),
684            filter: None,
685        });
686
687        // Start order
688        tracker.process(&make_event("Order", vec![("id", Value::Int(1))]));
689        assert_eq!(tracker.active_count(), 1);
690
691        // Cancel order - should invalidate
692        tracker.process(&make_event("Cancel", vec![]));
693        assert_eq!(tracker.active_count(), 0);
694
695        // Shipped - no active correlation
696        let completed = tracker.process(&make_event("Shipped", vec![]));
697        assert!(completed.is_empty());
698    }
699
700    #[test]
701    fn test_sequence_negation_with_filter() {
702        let steps = vec![
703            SequenceStep {
704                event_type: "Order".to_string(),
705                filter: None,
706                alias: Some("order".to_string()),
707                timeout: None,
708                match_all: false,
709            },
710            SequenceStep {
711                event_type: "Shipped".to_string(),
712                filter: None,
713                alias: None,
714                timeout: None,
715                match_all: false,
716            },
717        ];
718
719        // Negation only if Cancel.order_id matches order.id
720        let negation = NegationCondition {
721            event_type: "Cancel".to_string(),
722            filter: Some(Box::new(|event, ctx| {
723                if let Some(order) = ctx.get("order") {
724                    event.get_int("order_id") == order.get_int("id")
725                } else {
726                    false
727                }
728            })),
729        };
730
731        let mut tracker = SequenceTracker::new(steps, false).with_negation(negation);
732
733        // Start order with id=1
734        tracker.process(&make_event("Order", vec![("id", Value::Int(1))]));
735        assert_eq!(tracker.active_count(), 1);
736
737        // Cancel for different order - should NOT invalidate
738        tracker.process(&make_event("Cancel", vec![("order_id", Value::Int(999))]));
739        assert_eq!(tracker.active_count(), 1);
740
741        // Cancel for same order - should invalidate
742        tracker.process(&make_event("Cancel", vec![("order_id", Value::Int(1))]));
743        assert_eq!(tracker.active_count(), 0);
744    }
745
746    #[test]
747    fn test_sequence_add_negation() {
748        let steps = vec![
749            SequenceStep {
750                event_type: "A".to_string(),
751                filter: None,
752                alias: None,
753                timeout: None,
754                match_all: false,
755            },
756            SequenceStep {
757                event_type: "B".to_string(),
758                filter: None,
759                alias: None,
760                timeout: None,
761                match_all: false,
762            },
763        ];
764
765        let mut tracker = SequenceTracker::new(steps, false);
766        tracker.add_negation(NegationCondition {
767            event_type: "X".to_string(),
768            filter: None,
769        });
770
771        tracker.process(&make_event("A", vec![]));
772        assert_eq!(tracker.active_count(), 1);
773
774        tracker.process(&make_event("X", vec![]));
775        assert_eq!(tracker.active_count(), 0);
776    }
777
778    // ==========================================================================
779    // Timeout Tests
780    // ==========================================================================
781
782    #[test]
783    fn test_sequence_cleanup_timeouts() {
784        let steps = vec![
785            SequenceStep {
786                event_type: "A".to_string(),
787                filter: None,
788                alias: None,
789                timeout: None,
790                match_all: true,
791            },
792            SequenceStep {
793                event_type: "B".to_string(),
794                filter: None,
795                alias: None,
796                timeout: Some(Duration::from_millis(1)), // Very short timeout
797                match_all: false,
798            },
799        ];
800
801        let mut tracker = SequenceTracker::new(steps, true);
802
803        // Start correlation
804        tracker.process(&make_event("A", vec![]));
805        assert_eq!(tracker.active_count(), 1);
806
807        // Wait for timeout
808        std::thread::sleep(Duration::from_millis(10));
809
810        // Cleanup should remove timed out correlations
811        let cleaned = tracker.cleanup_timeouts();
812        assert_eq!(cleaned, 1);
813        assert_eq!(tracker.active_count(), 0);
814    }
815
816    #[test]
817    fn test_sequence_timeout_during_process() {
818        let steps = vec![
819            SequenceStep {
820                event_type: "A".to_string(),
821                filter: None,
822                alias: None,
823                timeout: None,
824                match_all: true,
825            },
826            SequenceStep {
827                event_type: "B".to_string(),
828                filter: None,
829                alias: None,
830                timeout: Some(Duration::from_millis(1)),
831                match_all: false,
832            },
833        ];
834
835        let mut tracker = SequenceTracker::new(steps, true);
836
837        tracker.process(&make_event("A", vec![]));
838        assert_eq!(tracker.active_count(), 1);
839
840        // Wait for timeout
841        std::thread::sleep(Duration::from_millis(10));
842
843        // Processing any event should also clean up timeouts
844        tracker.process(&make_event("B", vec![]));
845        // The timed out correlation is removed during process
846        assert_eq!(tracker.active_count(), 0);
847    }
848
849    // ==========================================================================
850    // Context Tests
851    // ==========================================================================
852
853    #[test]
854    fn test_sequence_context_creation() {
855        let ctx = SequenceContext::new();
856        assert!(ctx.previous.is_none());
857        assert!(ctx.captured.is_empty());
858    }
859
860    #[test]
861    fn test_sequence_context_with_captured() {
862        let event = make_event("TestEvent", vec![("value", Value::Int(42))]);
863        let ctx = SequenceContext::new().with_captured("test".to_string(), event);
864
865        assert!(ctx.get("test").is_some());
866        assert!(ctx.get("$").is_some()); // Previous should also be set
867        assert!(ctx.get("unknown").is_none());
868    }
869
870    // ==========================================================================
871    // Active Correlation Tests
872    // ==========================================================================
873
874    #[test]
875    fn test_active_correlation_default() {
876        let corr = ActiveCorrelation::default();
877        assert_eq!(corr.current_step, 0);
878        assert!(!corr.is_timed_out());
879    }
880
881    #[test]
882    fn test_active_correlation_advance() {
883        let mut corr = ActiveCorrelation::new();
884        let event = make_event("A", vec![]);
885
886        corr.advance(event, Some("a"));
887        assert_eq!(corr.current_step, 1);
888        assert!(corr.context.get("a").is_some());
889    }
890
891    #[test]
892    fn test_active_correlation_set_timeout() {
893        let mut corr = ActiveCorrelation::new();
894        assert!(!corr.is_timed_out());
895
896        corr.set_timeout(Duration::from_millis(1));
897        std::thread::sleep(Duration::from_millis(10));
898
899        assert!(corr.is_timed_out());
900    }
901
902    // ==========================================================================
903    // Empty Steps Tests
904    // ==========================================================================
905
906    #[test]
907    fn test_sequence_empty_steps() {
908        let mut tracker = SequenceTracker::new(vec![], false);
909
910        let completed = tracker.process(&make_event("A", vec![]));
911        assert!(completed.is_empty());
912        assert_eq!(tracker.active_count(), 0);
913    }
914
915    // ==========================================================================
916    // Filter Rejection Test
917    // ==========================================================================
918
919    #[test]
920    fn test_sequence_first_step_filter_rejection() {
921        let steps = vec![
922            SequenceStep {
923                event_type: "A".to_string(),
924                filter: Some(Box::new(|event, _ctx| {
925                    event.get_int("value").unwrap_or(0) > 10
926                })),
927                alias: None,
928                timeout: None,
929                match_all: false,
930            },
931            SequenceStep {
932                event_type: "B".to_string(),
933                filter: None,
934                alias: None,
935                timeout: None,
936                match_all: false,
937            },
938        ];
939
940        let mut tracker = SequenceTracker::new(steps, false);
941
942        // Event with value <= 10 should not start correlation
943        tracker.process(&make_event("A", vec![("value", Value::Int(5))]));
944        assert_eq!(tracker.active_count(), 0);
945
946        // Event with value > 10 should start
947        tracker.process(&make_event("A", vec![("value", Value::Int(15))]));
948        assert_eq!(tracker.active_count(), 1);
949    }
950}