1use std::time::{Duration, Instant};
6
7use rustc_hash::FxHashMap;
8
9use crate::event::Event;
10
11pub type SequenceFilter = Box<dyn Fn(&Event, &SequenceContext) -> bool + Send + Sync>;
13
14pub struct SequenceStep {
16 pub event_type: String,
18 pub filter: Option<SequenceFilter>,
20 pub alias: Option<String>,
22 pub timeout: Option<Duration>,
24 pub match_all: bool,
26}
27
28impl std::fmt::Debug for SequenceStep {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 f.debug_struct("SequenceStep")
31 .field("event_type", &self.event_type)
32 .field("has_filter", &self.filter.is_some())
33 .field("alias", &self.alias)
34 .field("timeout", &self.timeout)
35 .field("match_all", &self.match_all)
36 .finish_non_exhaustive()
37 }
38}
39
40#[derive(Debug, Clone, Default)]
42pub struct SequenceContext {
43 pub captured: FxHashMap<String, Event>,
45 pub previous: Option<Event>,
47}
48
49pub static EMPTY_CONTEXT: std::sync::LazyLock<SequenceContext> =
51 std::sync::LazyLock::new(SequenceContext::new);
52
53impl SequenceContext {
54 pub fn new() -> Self {
55 Self::default()
56 }
57
58 #[inline]
61 pub fn empty() -> &'static Self {
62 &EMPTY_CONTEXT
63 }
64
65 pub fn with_captured(mut self, alias: String, event: Event) -> Self {
66 self.previous = Some(event.clone());
67 self.captured.insert(alias, event);
68 self
69 }
70
71 pub fn get(&self, alias: &str) -> Option<&Event> {
72 if alias == "$" {
73 self.previous.as_ref()
74 } else {
75 self.captured.get(alias)
76 }
77 }
78}
79
80#[derive(Debug)]
82pub struct ActiveCorrelation {
83 pub current_step: usize,
85 pub context: SequenceContext,
87 pub started_at: Instant,
89 pub step_timeout: Option<Instant>,
91}
92
93impl Default for ActiveCorrelation {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
99impl ActiveCorrelation {
100 pub fn new() -> Self {
101 Self {
102 current_step: 0,
103 context: SequenceContext::new(),
104 started_at: Instant::now(),
105 step_timeout: None,
106 }
107 }
108
109 pub fn advance(&mut self, event: Event, alias: Option<&str>) {
110 if let Some(alias) = alias {
111 self.context =
112 std::mem::take(&mut self.context).with_captured(alias.to_string(), event);
113 } else {
114 self.context.previous = Some(event);
115 }
116 self.current_step += 1;
117 self.step_timeout = None;
118 }
119
120 pub fn is_timed_out(&self) -> bool {
121 if let Some(deadline) = self.step_timeout {
122 Instant::now() > deadline
123 } else {
124 false
125 }
126 }
127
128 pub fn set_timeout(&mut self, duration: Duration) {
129 self.step_timeout = Some(Instant::now() + duration);
130 }
131}
132
133pub struct NegationCondition {
135 pub event_type: String,
137 pub filter: Option<SequenceFilter>,
139}
140
141impl std::fmt::Debug for NegationCondition {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 f.debug_struct("NegationCondition")
144 .field("event_type", &self.event_type)
145 .field("has_filter", &self.filter.is_some())
146 .finish_non_exhaustive()
147 }
148}
149
150pub struct SequenceTracker {
152 steps: Vec<SequenceStep>,
154 active: Vec<ActiveCorrelation>,
156 max_active: usize,
158 match_all_first: bool,
160 negations: Vec<NegationCondition>,
162}
163
164impl std::fmt::Debug for SequenceTracker {
165 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166 f.debug_struct("SequenceTracker")
167 .field("steps", &self.steps)
168 .field("active_count", &self.active.len())
169 .field("max_active", &self.max_active)
170 .field("match_all_first", &self.match_all_first)
171 .field("negations", &self.negations)
172 .finish_non_exhaustive()
173 }
174}
175
176impl SequenceTracker {
177 pub const fn new(steps: Vec<SequenceStep>, match_all_first: bool) -> Self {
178 Self {
179 steps,
180 active: Vec::new(),
181 max_active: 10000,
182 match_all_first,
183 negations: Vec::new(),
184 }
185 }
186
187 pub const fn with_max_active(mut self, max: usize) -> Self {
188 self.max_active = max;
189 self
190 }
191
192 pub fn with_negation(mut self, negation: NegationCondition) -> Self {
193 self.negations.push(negation);
194 self
195 }
196
197 pub fn add_negation(&mut self, negation: NegationCondition) {
198 self.negations.push(negation);
199 }
200
201 pub fn process(&mut self, event: &Event) -> Vec<SequenceContext> {
203 let mut completed = Vec::new();
204
205 self.check_negations(event);
208
209 let existing_count = self.active.len();
211
212 if self.should_start_new(event) {
214 self.start_correlation(event);
215 }
216
217 let mut i = 0;
220 while i < existing_count.min(self.active.len()) {
221 if self.active[i].is_timed_out() {
223 self.active.remove(i);
224 continue;
225 }
226
227 let step_idx = self.active[i].current_step;
228
229 if step_idx >= self.steps.len() {
231 i += 1;
232 continue;
233 }
234
235 let matches = {
237 let step = &self.steps[step_idx];
238 let context = &self.active[i].context;
239 self.event_matches_step(event, step, context)
240 };
241
242 if matches {
243 let step = &self.steps[step_idx];
244 let alias = step.alias.clone();
245 let step_match_all = step.match_all;
246
247 self.active[i].advance(event.clone(), alias.as_deref());
248
249 let current_step = self.active[i].current_step;
251 if let Some(next_step) = self.steps.get(current_step) {
252 if let Some(t) = next_step.timeout {
253 self.active[i].set_timeout(t);
254 }
255 }
256
257 if self.active[i].current_step >= self.steps.len() {
259 if step_match_all {
260 completed.push(self.active[i].context.clone());
262 self.active[i].current_step = step_idx;
264 } else {
265 completed.push(self.active.remove(i).context);
267 continue;
268 }
269 }
270 }
271
272 i += 1;
273 }
274
275 completed
276 }
277
278 fn should_start_new(&self, event: &Event) -> bool {
279 if self.steps.is_empty() {
280 return false;
281 }
282
283 if self.active.len() >= self.max_active {
285 return false;
286 }
287
288 let first_step = &self.steps[0];
290 if *event.event_type != first_step.event_type {
291 return false;
292 }
293
294 if let Some(ref filter) = first_step.filter {
296 let ctx = SequenceContext::new();
297 if !filter(event, &ctx) {
298 return false;
299 }
300 }
301
302 if !self.match_all_first && !self.active.is_empty() {
304 return false;
305 }
306
307 true
308 }
309
310 fn start_correlation(&mut self, event: &Event) {
311 let mut correlation = ActiveCorrelation::new();
312 let first_step = &self.steps[0];
313
314 correlation.advance(event.clone(), first_step.alias.as_deref());
315
316 if let Some(second_step) = self.steps.get(1) {
318 if let Some(t) = second_step.timeout {
319 correlation.set_timeout(t);
320 }
321 }
322
323 self.active.push(correlation);
324 }
325
326 fn event_matches_step(
327 &self,
328 event: &Event,
329 step: &SequenceStep,
330 context: &SequenceContext,
331 ) -> bool {
332 if *event.event_type != step.event_type {
334 return false;
335 }
336
337 if let Some(ref filter) = step.filter {
339 if !filter(event, context) {
340 return false;
341 }
342 }
343
344 true
345 }
346
347 fn check_negations(&mut self, event: &Event) {
349 if self.negations.is_empty() {
350 return;
351 }
352
353 for negation in &self.negations {
355 if *event.event_type != negation.event_type {
356 continue;
357 }
358
359 self.active.retain(|correlation| {
361 if let Some(ref filter) = negation.filter {
362 !filter(event, &correlation.context)
364 } else {
365 false
367 }
368 });
369 }
370 }
371
372 pub const fn active_count(&self) -> usize {
374 self.active.len()
375 }
376
377 pub fn cleanup_timeouts(&mut self) -> usize {
379 let before = self.active.len();
380 self.active.retain(|c| !c.is_timed_out());
381 before - self.active.len()
382 }
383}
384
385#[cfg(test)]
386mod tests {
387 use varpulis_core::Value;
388
389 use super::*;
390
391 fn make_event(event_type: &str, fields: Vec<(&str, Value)>) -> Event {
392 let mut event = Event::new(event_type);
393 for (k, v) in fields {
394 event.data.insert(k.into(), v);
395 }
396 event
397 }
398
399 #[test]
400 fn test_simple_sequence_a_then_b() {
401 let steps = vec![
402 SequenceStep {
403 event_type: "A".to_string(),
404 filter: None,
405 alias: Some("a".to_string()),
406 timeout: None,
407 match_all: false,
408 },
409 SequenceStep {
410 event_type: "B".to_string(),
411 filter: None,
412 alias: Some("b".to_string()),
413 timeout: None,
414 match_all: false,
415 },
416 ];
417
418 let mut tracker = SequenceTracker::new(steps, false);
419
420 let a = make_event("A", vec![("id", Value::Int(1))]);
422 let completed = tracker.process(&a);
423 assert!(completed.is_empty());
424 assert_eq!(tracker.active_count(), 1);
425
426 let b = make_event("B", vec![("id", Value::Int(2))]);
428 let completed = tracker.process(&b);
429 assert_eq!(completed.len(), 1);
430 assert_eq!(tracker.active_count(), 0);
431
432 let ctx = &completed[0];
434 assert!(ctx.get("a").is_some());
435 assert!(ctx.get("b").is_some());
436 }
437
438 #[test]
439 fn test_sequence_with_filter() {
440 let steps = vec![
441 SequenceStep {
442 event_type: "Order".to_string(),
443 filter: None,
444 alias: Some("order".to_string()),
445 timeout: None,
446 match_all: false,
447 },
448 SequenceStep {
449 event_type: "Payment".to_string(),
450 filter: Some(Box::new(|event, ctx| {
451 let order = ctx.get("order").unwrap();
453 event.get_int("order_id") == order.get_int("id")
454 })),
455 alias: Some("payment".to_string()),
456 timeout: None,
457 match_all: false,
458 },
459 ];
460
461 let mut tracker = SequenceTracker::new(steps, false);
462
463 let order1 = make_event("Order", vec![("id", Value::Int(1))]);
465 tracker.process(&order1);
466 assert_eq!(tracker.active_count(), 1);
467
468 let wrong_payment = make_event("Payment", vec![("order_id", Value::Int(999))]);
470 let completed = tracker.process(&wrong_payment);
471 assert!(completed.is_empty());
472 assert_eq!(tracker.active_count(), 1);
473
474 let correct_payment = make_event("Payment", vec![("order_id", Value::Int(1))]);
476 let completed = tracker.process(&correct_payment);
477 assert_eq!(completed.len(), 1);
478 }
479
480 #[test]
481 fn test_sequence_match_all() {
482 let steps = vec![
483 SequenceStep {
484 event_type: "News".to_string(),
485 filter: None,
486 alias: Some("news".to_string()),
487 timeout: None,
488 match_all: true,
489 },
490 SequenceStep {
491 event_type: "Tick".to_string(),
492 filter: None,
493 alias: Some("tick".to_string()),
494 timeout: None,
495 match_all: false,
496 },
497 ];
498
499 let mut tracker = SequenceTracker::new(steps, true);
500
501 let news1 = make_event("News", vec![("id", Value::Int(1))]);
503 let news2 = make_event("News", vec![("id", Value::Int(2))]);
504 tracker.process(&news1);
505 tracker.process(&news2);
506 assert_eq!(tracker.active_count(), 2);
507
508 let tick = make_event("Tick", vec![("price", Value::Float(100.0))]);
510 let completed = tracker.process(&tick);
511 assert_eq!(completed.len(), 2);
512 assert_eq!(tracker.active_count(), 0);
513 }
514
515 #[test]
516 fn test_sequence_three_steps() {
517 let steps = vec![
518 SequenceStep {
519 event_type: "A".to_string(),
520 filter: None,
521 alias: Some("a".to_string()),
522 timeout: None,
523 match_all: false,
524 },
525 SequenceStep {
526 event_type: "B".to_string(),
527 filter: None,
528 alias: Some("b".to_string()),
529 timeout: None,
530 match_all: false,
531 },
532 SequenceStep {
533 event_type: "C".to_string(),
534 filter: None,
535 alias: Some("c".to_string()),
536 timeout: None,
537 match_all: false,
538 },
539 ];
540
541 let mut tracker = SequenceTracker::new(steps, false);
542
543 tracker.process(&make_event("A", vec![]));
544 assert_eq!(tracker.active_count(), 1);
545
546 tracker.process(&make_event("B", vec![]));
547 assert_eq!(tracker.active_count(), 1);
548
549 let completed = tracker.process(&make_event("C", vec![]));
550 assert_eq!(completed.len(), 1);
551 assert_eq!(tracker.active_count(), 0);
552
553 let ctx = &completed[0];
555 assert!(ctx.get("a").is_some());
556 assert!(ctx.get("b").is_some());
557 assert!(ctx.get("c").is_some());
558 }
559
560 #[test]
561 fn test_sequence_dollar_reference() {
562 let steps = vec![
563 SequenceStep {
564 event_type: "Start".to_string(),
565 filter: None,
566 alias: None, timeout: None,
568 match_all: false,
569 },
570 SequenceStep {
571 event_type: "End".to_string(),
572 filter: Some(Box::new(|event, ctx| {
573 let prev = ctx.get("$").unwrap();
575 event.get_int("id") == prev.get_int("id")
576 })),
577 alias: None,
578 timeout: None,
579 match_all: false,
580 },
581 ];
582
583 let mut tracker = SequenceTracker::new(steps, false);
584
585 tracker.process(&make_event("Start", vec![("id", Value::Int(42))]));
586
587 let completed = tracker.process(&make_event("End", vec![("id", Value::Int(99))]));
589 assert!(completed.is_empty());
590
591 let completed = tracker.process(&make_event("End", vec![("id", Value::Int(42))]));
593 assert_eq!(completed.len(), 1);
594 }
595
596 #[test]
597 fn test_sequence_max_active() {
598 let steps = vec![
599 SequenceStep {
600 event_type: "A".to_string(),
601 filter: None,
602 alias: None,
603 timeout: None,
604 match_all: true,
605 },
606 SequenceStep {
607 event_type: "B".to_string(),
608 filter: None,
609 alias: None,
610 timeout: None,
611 match_all: false,
612 },
613 ];
614
615 let mut tracker = SequenceTracker::new(steps, true).with_max_active(3);
616
617 for i in 0..5 {
619 tracker.process(&make_event("A", vec![("id", Value::Int(i))]));
620 }
621 assert_eq!(tracker.active_count(), 3);
622 }
623
624 #[test]
625 fn test_sequence_no_match_wrong_type() {
626 let steps = vec![
627 SequenceStep {
628 event_type: "A".to_string(),
629 filter: None,
630 alias: None,
631 timeout: None,
632 match_all: false,
633 },
634 SequenceStep {
635 event_type: "B".to_string(),
636 filter: None,
637 alias: None,
638 timeout: None,
639 match_all: false,
640 },
641 ];
642
643 let mut tracker = SequenceTracker::new(steps, false);
644
645 tracker.process(&make_event("X", vec![]));
647 assert_eq!(tracker.active_count(), 0);
648
649 tracker.process(&make_event("A", vec![]));
651 assert_eq!(tracker.active_count(), 1);
652
653 tracker.process(&make_event("X", vec![]));
655 assert_eq!(tracker.active_count(), 1);
656 }
657
658 #[test]
663 fn test_sequence_with_negation() {
664 let steps = vec![
665 SequenceStep {
666 event_type: "Order".to_string(),
667 filter: None,
668 alias: Some("order".to_string()),
669 timeout: None,
670 match_all: false,
671 },
672 SequenceStep {
673 event_type: "Shipped".to_string(),
674 filter: None,
675 alias: Some("shipped".to_string()),
676 timeout: None,
677 match_all: false,
678 },
679 ];
680
681 let mut tracker = SequenceTracker::new(steps, false).with_negation(NegationCondition {
683 event_type: "Cancel".to_string(),
684 filter: None,
685 });
686
687 tracker.process(&make_event("Order", vec![("id", Value::Int(1))]));
689 assert_eq!(tracker.active_count(), 1);
690
691 tracker.process(&make_event("Cancel", vec![]));
693 assert_eq!(tracker.active_count(), 0);
694
695 let completed = tracker.process(&make_event("Shipped", vec![]));
697 assert!(completed.is_empty());
698 }
699
700 #[test]
701 fn test_sequence_negation_with_filter() {
702 let steps = vec![
703 SequenceStep {
704 event_type: "Order".to_string(),
705 filter: None,
706 alias: Some("order".to_string()),
707 timeout: None,
708 match_all: false,
709 },
710 SequenceStep {
711 event_type: "Shipped".to_string(),
712 filter: None,
713 alias: None,
714 timeout: None,
715 match_all: false,
716 },
717 ];
718
719 let negation = NegationCondition {
721 event_type: "Cancel".to_string(),
722 filter: Some(Box::new(|event, ctx| {
723 if let Some(order) = ctx.get("order") {
724 event.get_int("order_id") == order.get_int("id")
725 } else {
726 false
727 }
728 })),
729 };
730
731 let mut tracker = SequenceTracker::new(steps, false).with_negation(negation);
732
733 tracker.process(&make_event("Order", vec![("id", Value::Int(1))]));
735 assert_eq!(tracker.active_count(), 1);
736
737 tracker.process(&make_event("Cancel", vec![("order_id", Value::Int(999))]));
739 assert_eq!(tracker.active_count(), 1);
740
741 tracker.process(&make_event("Cancel", vec![("order_id", Value::Int(1))]));
743 assert_eq!(tracker.active_count(), 0);
744 }
745
746 #[test]
747 fn test_sequence_add_negation() {
748 let steps = vec![
749 SequenceStep {
750 event_type: "A".to_string(),
751 filter: None,
752 alias: None,
753 timeout: None,
754 match_all: false,
755 },
756 SequenceStep {
757 event_type: "B".to_string(),
758 filter: None,
759 alias: None,
760 timeout: None,
761 match_all: false,
762 },
763 ];
764
765 let mut tracker = SequenceTracker::new(steps, false);
766 tracker.add_negation(NegationCondition {
767 event_type: "X".to_string(),
768 filter: None,
769 });
770
771 tracker.process(&make_event("A", vec![]));
772 assert_eq!(tracker.active_count(), 1);
773
774 tracker.process(&make_event("X", vec![]));
775 assert_eq!(tracker.active_count(), 0);
776 }
777
778 #[test]
783 fn test_sequence_cleanup_timeouts() {
784 let steps = vec![
785 SequenceStep {
786 event_type: "A".to_string(),
787 filter: None,
788 alias: None,
789 timeout: None,
790 match_all: true,
791 },
792 SequenceStep {
793 event_type: "B".to_string(),
794 filter: None,
795 alias: None,
796 timeout: Some(Duration::from_millis(1)), match_all: false,
798 },
799 ];
800
801 let mut tracker = SequenceTracker::new(steps, true);
802
803 tracker.process(&make_event("A", vec![]));
805 assert_eq!(tracker.active_count(), 1);
806
807 std::thread::sleep(Duration::from_millis(10));
809
810 let cleaned = tracker.cleanup_timeouts();
812 assert_eq!(cleaned, 1);
813 assert_eq!(tracker.active_count(), 0);
814 }
815
816 #[test]
817 fn test_sequence_timeout_during_process() {
818 let steps = vec![
819 SequenceStep {
820 event_type: "A".to_string(),
821 filter: None,
822 alias: None,
823 timeout: None,
824 match_all: true,
825 },
826 SequenceStep {
827 event_type: "B".to_string(),
828 filter: None,
829 alias: None,
830 timeout: Some(Duration::from_millis(1)),
831 match_all: false,
832 },
833 ];
834
835 let mut tracker = SequenceTracker::new(steps, true);
836
837 tracker.process(&make_event("A", vec![]));
838 assert_eq!(tracker.active_count(), 1);
839
840 std::thread::sleep(Duration::from_millis(10));
842
843 tracker.process(&make_event("B", vec![]));
845 assert_eq!(tracker.active_count(), 0);
847 }
848
849 #[test]
854 fn test_sequence_context_creation() {
855 let ctx = SequenceContext::new();
856 assert!(ctx.previous.is_none());
857 assert!(ctx.captured.is_empty());
858 }
859
860 #[test]
861 fn test_sequence_context_with_captured() {
862 let event = make_event("TestEvent", vec![("value", Value::Int(42))]);
863 let ctx = SequenceContext::new().with_captured("test".to_string(), event);
864
865 assert!(ctx.get("test").is_some());
866 assert!(ctx.get("$").is_some()); assert!(ctx.get("unknown").is_none());
868 }
869
870 #[test]
875 fn test_active_correlation_default() {
876 let corr = ActiveCorrelation::default();
877 assert_eq!(corr.current_step, 0);
878 assert!(!corr.is_timed_out());
879 }
880
881 #[test]
882 fn test_active_correlation_advance() {
883 let mut corr = ActiveCorrelation::new();
884 let event = make_event("A", vec![]);
885
886 corr.advance(event, Some("a"));
887 assert_eq!(corr.current_step, 1);
888 assert!(corr.context.get("a").is_some());
889 }
890
891 #[test]
892 fn test_active_correlation_set_timeout() {
893 let mut corr = ActiveCorrelation::new();
894 assert!(!corr.is_timed_out());
895
896 corr.set_timeout(Duration::from_millis(1));
897 std::thread::sleep(Duration::from_millis(10));
898
899 assert!(corr.is_timed_out());
900 }
901
902 #[test]
907 fn test_sequence_empty_steps() {
908 let mut tracker = SequenceTracker::new(vec![], false);
909
910 let completed = tracker.process(&make_event("A", vec![]));
911 assert!(completed.is_empty());
912 assert_eq!(tracker.active_count(), 0);
913 }
914
915 #[test]
920 fn test_sequence_first_step_filter_rejection() {
921 let steps = vec![
922 SequenceStep {
923 event_type: "A".to_string(),
924 filter: Some(Box::new(|event, _ctx| {
925 event.get_int("value").unwrap_or(0) > 10
926 })),
927 alias: None,
928 timeout: None,
929 match_all: false,
930 },
931 SequenceStep {
932 event_type: "B".to_string(),
933 filter: None,
934 alias: None,
935 timeout: None,
936 match_all: false,
937 },
938 ];
939
940 let mut tracker = SequenceTracker::new(steps, false);
941
942 tracker.process(&make_event("A", vec![("value", Value::Int(5))]));
944 assert_eq!(tracker.active_count(), 0);
945
946 tracker.process(&make_event("A", vec![("value", Value::Int(15))]));
948 assert_eq!(tracker.active_count(), 1);
949 }
950}