1use std::collections::HashMap;
7
8#[derive(Debug, Clone, PartialEq)]
14pub struct StreamEvent {
15 pub key: String,
17 pub value: String,
19 pub topic: String,
21 pub timestamp_ms: u64,
23 pub headers: HashMap<String, String>,
25}
26
27impl StreamEvent {
28 pub fn new(key: &str, value: &str, topic: &str, timestamp_ms: u64) -> Self {
30 Self {
31 key: key.to_string(),
32 value: value.to_string(),
33 topic: topic.to_string(),
34 timestamp_ms,
35 headers: HashMap::new(),
36 }
37 }
38
39 pub fn with_header(
41 mut self,
42 header_key: impl Into<String>,
43 header_val: impl Into<String>,
44 ) -> Self {
45 self.headers.insert(header_key.into(), header_val.into());
46 self
47 }
48}
49
50#[derive(Debug, Clone)]
56pub enum FilterPredicate {
57 KeyEquals(String),
59 ValueContains(String),
61 TopicIs(String),
63 TimestampAfter(u64),
65 TimestampBefore(u64),
67 HeaderMatches(String, String),
69 And(Box<FilterPredicate>, Box<FilterPredicate>),
71 Or(Box<FilterPredicate>, Box<FilterPredicate>),
73 Not(Box<FilterPredicate>),
75}
76
77impl FilterPredicate {
78 pub fn matches(&self, event: &StreamEvent) -> bool {
80 match self {
81 FilterPredicate::KeyEquals(k) => &event.key == k,
82 FilterPredicate::ValueContains(sub) => event.value.contains(sub.as_str()),
83 FilterPredicate::TopicIs(t) => &event.topic == t,
84 FilterPredicate::TimestampAfter(ts) => event.timestamp_ms >= *ts,
85 FilterPredicate::TimestampBefore(ts) => event.timestamp_ms <= *ts,
86 FilterPredicate::HeaderMatches(hk, hv) => event
87 .headers
88 .get(hk.as_str())
89 .map(|v| v == hv)
90 .unwrap_or(false),
91 FilterPredicate::And(left, right) => left.matches(event) && right.matches(event),
92 FilterPredicate::Or(left, right) => left.matches(event) || right.matches(event),
93 FilterPredicate::Not(inner) => !inner.matches(event),
94 }
95 }
96
97 pub fn and(self, other: FilterPredicate) -> FilterPredicate {
99 FilterPredicate::And(Box::new(self), Box::new(other))
100 }
101
102 pub fn or(self, other: FilterPredicate) -> FilterPredicate {
104 FilterPredicate::Or(Box::new(self), Box::new(other))
105 }
106}
107
108impl std::ops::Not for FilterPredicate {
109 type Output = FilterPredicate;
110
111 fn not(self) -> FilterPredicate {
113 FilterPredicate::Not(Box::new(self))
114 }
115}
116
117#[derive(Debug, Clone, Default)]
125pub struct EventFilter;
126
127impl EventFilter {
128 pub fn new() -> Self {
130 Self
131 }
132
133 pub fn filter<'a>(
135 &self,
136 events: &'a [StreamEvent],
137 predicate: &FilterPredicate,
138 ) -> Vec<&'a StreamEvent> {
139 events.iter().filter(|e| predicate.matches(e)).collect()
140 }
141
142 pub fn count(&self, events: &[StreamEvent], predicate: &FilterPredicate) -> usize {
144 events.iter().filter(|e| predicate.matches(e)).count()
145 }
146
147 pub fn any(&self, events: &[StreamEvent], predicate: &FilterPredicate) -> bool {
149 events.iter().any(|e| predicate.matches(e))
150 }
151
152 pub fn all(&self, events: &[StreamEvent], predicate: &FilterPredicate) -> bool {
156 events.iter().all(|e| predicate.matches(e))
157 }
158}
159
160#[cfg(test)]
165mod tests {
166 use super::*;
167 use std::ops::Not;
168
169 fn make_event(key: &str, value: &str, topic: &str, ts: u64) -> StreamEvent {
172 StreamEvent::new(key, value, topic, ts)
173 }
174
175 fn make_event_with_header(
176 key: &str,
177 value: &str,
178 topic: &str,
179 ts: u64,
180 hk: &str,
181 hv: &str,
182 ) -> StreamEvent {
183 StreamEvent::new(key, value, topic, ts).with_header(hk, hv)
184 }
185
186 fn sample_events() -> Vec<StreamEvent> {
187 vec![
188 make_event("k1", "hello world", "topic-a", 1000),
189 make_event("k2", "foo bar", "topic-b", 2000),
190 make_event("k1", "hello again", "topic-a", 3000),
191 make_event("k3", "testing", "topic-c", 4000),
192 make_event_with_header("k4", "header test", "topic-d", 5000, "x-type", "rdf"),
193 ]
194 }
195
196 #[test]
199 fn test_stream_event_new_fields() {
200 let e = StreamEvent::new("mykey", "myvalue", "mytopic", 9999);
201 assert_eq!(e.key, "mykey");
202 assert_eq!(e.value, "myvalue");
203 assert_eq!(e.topic, "mytopic");
204 assert_eq!(e.timestamp_ms, 9999);
205 assert!(e.headers.is_empty());
206 }
207
208 #[test]
209 fn test_stream_event_with_header() {
210 let e = StreamEvent::new("k", "v", "t", 1).with_header("content-type", "json");
211 assert_eq!(
212 e.headers.get("content-type").map(|s| s.as_str()),
213 Some("json")
214 );
215 }
216
217 #[test]
218 fn test_stream_event_multiple_headers() {
219 let e = StreamEvent::new("k", "v", "t", 1)
220 .with_header("a", "1")
221 .with_header("b", "2");
222 assert_eq!(e.headers.len(), 2);
223 }
224
225 #[test]
226 fn test_stream_event_equality() {
227 let e1 = make_event("k", "v", "t", 10);
228 let e2 = make_event("k", "v", "t", 10);
229 assert_eq!(e1, e2);
230 }
231
232 #[test]
235 fn test_key_equals_match() {
236 let e = make_event("my-key", "val", "topic", 0);
237 assert!(FilterPredicate::KeyEquals("my-key".to_string()).matches(&e));
238 }
239
240 #[test]
241 fn test_key_equals_no_match() {
242 let e = make_event("other-key", "val", "topic", 0);
243 assert!(!FilterPredicate::KeyEquals("my-key".to_string()).matches(&e));
244 }
245
246 #[test]
247 fn test_key_equals_case_sensitive() {
248 let e = make_event("Key", "val", "topic", 0);
249 assert!(!FilterPredicate::KeyEquals("key".to_string()).matches(&e));
250 }
251
252 #[test]
255 fn test_value_contains_match() {
256 let e = make_event("k", "hello world", "t", 0);
257 assert!(FilterPredicate::ValueContains("hello".to_string()).matches(&e));
258 }
259
260 #[test]
261 fn test_value_contains_no_match() {
262 let e = make_event("k", "hello world", "t", 0);
263 assert!(!FilterPredicate::ValueContains("xyz".to_string()).matches(&e));
264 }
265
266 #[test]
267 fn test_value_contains_exact_match() {
268 let e = make_event("k", "exact", "t", 0);
269 assert!(FilterPredicate::ValueContains("exact".to_string()).matches(&e));
270 }
271
272 #[test]
273 fn test_value_contains_empty_substring() {
274 let e = make_event("k", "anything", "t", 0);
276 assert!(FilterPredicate::ValueContains(String::new()).matches(&e));
277 }
278
279 #[test]
282 fn test_topic_is_match() {
283 let e = make_event("k", "v", "events.rdf", 0);
284 assert!(FilterPredicate::TopicIs("events.rdf".to_string()).matches(&e));
285 }
286
287 #[test]
288 fn test_topic_is_no_match() {
289 let e = make_event("k", "v", "events.rdf", 0);
290 assert!(!FilterPredicate::TopicIs("other".to_string()).matches(&e));
291 }
292
293 #[test]
296 fn test_timestamp_after_match() {
297 let e = make_event("k", "v", "t", 5000);
298 assert!(FilterPredicate::TimestampAfter(4999).matches(&e));
299 }
300
301 #[test]
302 fn test_timestamp_after_equal_is_match() {
303 let e = make_event("k", "v", "t", 5000);
304 assert!(FilterPredicate::TimestampAfter(5000).matches(&e));
305 }
306
307 #[test]
308 fn test_timestamp_after_no_match() {
309 let e = make_event("k", "v", "t", 3000);
310 assert!(!FilterPredicate::TimestampAfter(5000).matches(&e));
311 }
312
313 #[test]
316 fn test_timestamp_before_match() {
317 let e = make_event("k", "v", "t", 1000);
318 assert!(FilterPredicate::TimestampBefore(2000).matches(&e));
319 }
320
321 #[test]
322 fn test_timestamp_before_equal_is_match() {
323 let e = make_event("k", "v", "t", 2000);
324 assert!(FilterPredicate::TimestampBefore(2000).matches(&e));
325 }
326
327 #[test]
328 fn test_timestamp_before_no_match() {
329 let e = make_event("k", "v", "t", 9000);
330 assert!(!FilterPredicate::TimestampBefore(2000).matches(&e));
331 }
332
333 #[test]
336 fn test_header_matches_match() {
337 let e = make_event_with_header("k", "v", "t", 0, "x-source", "sensor-1");
338 assert!(
339 FilterPredicate::HeaderMatches("x-source".to_string(), "sensor-1".to_string())
340 .matches(&e)
341 );
342 }
343
344 #[test]
345 fn test_header_matches_wrong_value() {
346 let e = make_event_with_header("k", "v", "t", 0, "x-source", "sensor-1");
347 assert!(
348 !FilterPredicate::HeaderMatches("x-source".to_string(), "sensor-2".to_string())
349 .matches(&e)
350 );
351 }
352
353 #[test]
354 fn test_header_matches_missing_key() {
355 let e = make_event("k", "v", "t", 0);
356 assert!(
357 !FilterPredicate::HeaderMatches("x-source".to_string(), "anything".to_string())
358 .matches(&e)
359 );
360 }
361
362 #[test]
365 fn test_and_both_true() {
366 let e = make_event("k1", "hello world", "topic-a", 1500);
367 let pred = FilterPredicate::KeyEquals("k1".to_string())
368 .and(FilterPredicate::ValueContains("hello".to_string()));
369 assert!(pred.matches(&e));
370 }
371
372 #[test]
373 fn test_and_first_false() {
374 let e = make_event("k2", "hello world", "topic-a", 1500);
375 let pred = FilterPredicate::KeyEquals("k1".to_string())
376 .and(FilterPredicate::ValueContains("hello".to_string()));
377 assert!(!pred.matches(&e));
378 }
379
380 #[test]
381 fn test_and_second_false() {
382 let e = make_event("k1", "goodbye world", "topic-a", 1500);
383 let pred = FilterPredicate::KeyEquals("k1".to_string())
384 .and(FilterPredicate::ValueContains("hello".to_string()));
385 assert!(!pred.matches(&e));
386 }
387
388 #[test]
389 fn test_and_both_false() {
390 let e = make_event("k2", "goodbye world", "topic-a", 1500);
391 let pred = FilterPredicate::KeyEquals("k1".to_string())
392 .and(FilterPredicate::ValueContains("hello".to_string()));
393 assert!(!pred.matches(&e));
394 }
395
396 #[test]
399 fn test_or_first_true() {
400 let e = make_event("k1", "no match", "topic-a", 0);
401 let pred = FilterPredicate::KeyEquals("k1".to_string())
402 .or(FilterPredicate::ValueContains("hello".to_string()));
403 assert!(pred.matches(&e));
404 }
405
406 #[test]
407 fn test_or_second_true() {
408 let e = make_event("k2", "hello there", "topic-a", 0);
409 let pred = FilterPredicate::KeyEquals("k1".to_string())
410 .or(FilterPredicate::ValueContains("hello".to_string()));
411 assert!(pred.matches(&e));
412 }
413
414 #[test]
415 fn test_or_both_false() {
416 let e = make_event("k2", "nothing", "topic-a", 0);
417 let pred = FilterPredicate::KeyEquals("k1".to_string())
418 .or(FilterPredicate::ValueContains("hello".to_string()));
419 assert!(!pred.matches(&e));
420 }
421
422 #[test]
425 fn test_not_negates_match() {
426 let e = make_event("k1", "v", "t", 0);
427 let pred = FilterPredicate::KeyEquals("k1".to_string()).not();
428 assert!(!pred.matches(&e));
429 }
430
431 #[test]
432 fn test_not_negates_non_match() {
433 let e = make_event("k2", "v", "t", 0);
434 let pred = FilterPredicate::KeyEquals("k1".to_string()).not();
435 assert!(pred.matches(&e));
436 }
437
438 #[test]
441 fn test_nested_and_or() {
442 let e1 = make_event("k1", "v", "t", 2500);
444 let e2 = make_event("k2", "v", "t", 1500);
445 let e3 = make_event("k3", "v", "t", 3000);
446
447 let pred = FilterPredicate::KeyEquals("k1".to_string())
448 .or(FilterPredicate::KeyEquals("k2".to_string()))
449 .and(FilterPredicate::TimestampAfter(2000));
450
451 assert!(pred.matches(&e1)); assert!(!pred.matches(&e2)); assert!(!pred.matches(&e3)); }
455
456 #[test]
457 fn test_nested_not_and() {
458 let e_pass = make_event("k", "v", "topic-b", 2000);
460 let e_fail_topic = make_event("k", "v", "topic-a", 2000);
461 let e_fail_ts = make_event("k", "v", "topic-b", 5000);
462
463 let pred = FilterPredicate::TopicIs("topic-a".to_string())
464 .not()
465 .and(FilterPredicate::TimestampBefore(3000));
466
467 assert!(pred.matches(&e_pass));
468 assert!(!pred.matches(&e_fail_topic));
469 assert!(!pred.matches(&e_fail_ts));
470 }
471
472 #[test]
473 fn test_triple_nested_predicate() {
474 let e1 = make_event("k1", "hello there", "t", 0);
476 let e2 = make_event_with_header("k1", "data", "t", 0, "x-type", "rdf");
477 let e3 = make_event("k1", "data", "t", 0); let e4 = make_event("k2", "hello there", "t", 0); let pred = FilterPredicate::KeyEquals("k1".to_string()).and(
481 FilterPredicate::ValueContains("hello".to_string()).or(FilterPredicate::HeaderMatches(
482 "x-type".to_string(),
483 "rdf".to_string(),
484 )),
485 );
486
487 assert!(pred.matches(&e1));
488 assert!(pred.matches(&e2));
489 assert!(!pred.matches(&e3));
490 assert!(!pred.matches(&e4));
491 }
492
493 #[test]
496 fn test_filter_returns_matching_subset() {
497 let events = sample_events();
498 let ef = EventFilter::new();
499 let pred = FilterPredicate::TopicIs("topic-a".to_string());
500 let result = ef.filter(&events, &pred);
501 assert_eq!(result.len(), 2);
502 assert!(result.iter().all(|e| e.topic == "topic-a"));
503 }
504
505 #[test]
506 fn test_filter_empty_input() {
507 let ef = EventFilter::new();
508 let pred = FilterPredicate::KeyEquals("anything".to_string());
509 let result = ef.filter(&[], &pred);
510 assert!(result.is_empty());
511 }
512
513 #[test]
514 fn test_filter_no_matches() {
515 let events = sample_events();
516 let ef = EventFilter::new();
517 let pred = FilterPredicate::KeyEquals("nonexistent".to_string());
518 let result = ef.filter(&events, &pred);
519 assert!(result.is_empty());
520 }
521
522 #[test]
523 fn test_filter_all_match() {
524 let events = sample_events();
525 let ef = EventFilter::new();
526 let pred = FilterPredicate::TimestampAfter(0);
527 let result = ef.filter(&events, &pred);
528 assert_eq!(result.len(), events.len());
529 }
530
531 #[test]
532 fn test_filter_returns_references_to_original() {
533 let events = sample_events();
534 let ef = EventFilter::new();
535 let pred = FilterPredicate::KeyEquals("k1".to_string());
536 let result = ef.filter(&events, &pred);
537 assert_eq!(result.len(), 2);
538 assert_eq!(result[0].key, "k1");
540 assert_eq!(result[1].key, "k1");
541 }
542
543 #[test]
546 fn test_count_matching_events() {
547 let events = sample_events();
548 let ef = EventFilter::new();
549 let pred = FilterPredicate::KeyEquals("k1".to_string());
550 assert_eq!(ef.count(&events, &pred), 2);
551 }
552
553 #[test]
554 fn test_count_empty_input() {
555 let ef = EventFilter::new();
556 let pred = FilterPredicate::KeyEquals("k".to_string());
557 assert_eq!(ef.count(&[], &pred), 0);
558 }
559
560 #[test]
561 fn test_count_zero_matches() {
562 let events = sample_events();
563 let ef = EventFilter::new();
564 let pred = FilterPredicate::KeyEquals("ghost".to_string());
565 assert_eq!(ef.count(&events, &pred), 0);
566 }
567
568 #[test]
569 fn test_count_all_match() {
570 let events = vec![
571 make_event("k", "v", "t", 100),
572 make_event("k", "v", "t", 200),
573 make_event("k", "v", "t", 300),
574 ];
575 let ef = EventFilter::new();
576 let pred = FilterPredicate::TopicIs("t".to_string());
577 assert_eq!(ef.count(&events, &pred), 3);
578 }
579
580 #[test]
583 fn test_any_true_when_one_matches() {
584 let events = sample_events();
585 let ef = EventFilter::new();
586 let pred = FilterPredicate::TopicIs("topic-c".to_string());
587 assert!(ef.any(&events, &pred));
588 }
589
590 #[test]
591 fn test_any_false_when_none_match() {
592 let events = sample_events();
593 let ef = EventFilter::new();
594 let pred = FilterPredicate::TopicIs("topic-z".to_string());
595 assert!(!ef.any(&events, &pred));
596 }
597
598 #[test]
599 fn test_any_empty_returns_false() {
600 let ef = EventFilter::new();
601 let pred = FilterPredicate::KeyEquals("k".to_string());
602 assert!(!ef.any(&[], &pred));
603 }
604
605 #[test]
608 fn test_all_true_when_all_match() {
609 let events = vec![
610 make_event("k", "hello", "t", 100),
611 make_event("k", "hello there", "t", 200),
612 ];
613 let ef = EventFilter::new();
614 let pred = FilterPredicate::ValueContains("hello".to_string());
615 assert!(ef.all(&events, &pred));
616 }
617
618 #[test]
619 fn test_all_false_when_one_fails() {
620 let events = vec![
621 make_event("k", "hello", "t", 100),
622 make_event("k", "goodbye", "t", 200),
623 ];
624 let ef = EventFilter::new();
625 let pred = FilterPredicate::ValueContains("hello".to_string());
626 assert!(!ef.all(&events, &pred));
627 }
628
629 #[test]
630 fn test_all_empty_returns_true() {
631 let ef = EventFilter::new();
632 let pred = FilterPredicate::KeyEquals("impossible".to_string());
633 assert!(ef.all(&[], &pred));
634 }
635
636 #[test]
639 fn test_timestamp_window_filter() {
640 let events = sample_events(); let ef = EventFilter::new();
642 let pred =
643 FilterPredicate::TimestampAfter(2000).and(FilterPredicate::TimestampBefore(4000));
644 let result = ef.filter(&events, &pred);
645 assert_eq!(result.len(), 3); for e in &result {
647 assert!(e.timestamp_ms >= 2000 && e.timestamp_ms <= 4000);
648 }
649 }
650
651 #[test]
654 fn test_event_filter_default() {
655 let ef = EventFilter;
656 let events = vec![make_event("k", "v", "t", 0)];
657 let pred = FilterPredicate::KeyEquals("k".to_string());
658 assert_eq!(ef.count(&events, &pred), 1);
659 }
660
661 #[test]
664 fn test_header_matches_with_multiple_headers() {
665 let e = make_event_with_header("k", "v", "t", 0, "h1", "v1")
666 .with_header("h2", "v2")
667 .with_header("h3", "v3");
668 assert!(FilterPredicate::HeaderMatches("h2".to_string(), "v2".to_string()).matches(&e));
669 assert!(!FilterPredicate::HeaderMatches("h2".to_string(), "wrong".to_string()).matches(&e));
670 }
671
672 #[test]
675 fn test_topic_and_key_combined() {
676 let events = sample_events();
677 let ef = EventFilter::new();
678 let pred = FilterPredicate::TopicIs("topic-a".to_string())
679 .and(FilterPredicate::KeyEquals("k1".to_string()));
680 let result = ef.filter(&events, &pred);
681 assert_eq!(result.len(), 2);
682 }
683}