1use anyhow::{Result, bail};
2use async_trait::async_trait;
3use serde::Deserialize;
4use serde_json::Value;
5
6use crate::config::parse_config;
7use crate::envelope::Envelope;
8use crate::pipeline::ErrorPolicy;
9use crate::transforms::{BasicTransform, MapOne, Transform};
10
/// Transform that keeps or drops envelopes according to a boolean predicate.
pub struct FilterTransform {
    /// Identifier reported through `MapOne::id`.
    id: String,
    /// Expression evaluated against every envelope passed to `map`.
    predicate: Expr,
}
32
33impl FilterTransform {
34 pub fn new(id: impl Into<String>, predicate: Expr) -> Self {
35 Self {
36 id: id.into(),
37 predicate,
38 }
39 }
40}
41
42#[async_trait]
43impl MapOne for FilterTransform {
44 fn id(&self) -> &str {
45 &self.id
46 }
47
48 async fn map(&self, env: Envelope) -> Result<Option<Envelope>> {
49 let keep = self.predicate.eval(&env)?;
50 Ok(if keep { Some(env) } else { None })
51 }
52}
53
/// Abstract syntax tree of a filter predicate.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    /// Boolean constant (`true` / `false`).
    Bool(bool),
    /// Binary comparison between two operand expressions.
    Compare {
        /// Left-hand operand.
        left: Box<Expr>,
        /// Comparison operator.
        op: CompareOp,
        /// Right-hand operand.
        right: Box<Expr>,
    },
    /// Logical conjunction; the right side is only evaluated when the left is true.
    And(Box<Expr>, Box<Expr>),
    /// Logical disjunction; the right side is only evaluated when the left is false.
    Or(Box<Expr>, Box<Expr>),
    /// Logical negation.
    Not(Box<Expr>),
    /// True when the path resolves to a present key in the envelope.
    Exists(Path),
    /// Value looked up in the envelope; tested for truthiness when used as a condition.
    Path(Path),
    /// Literal JSON value; tested for truthiness when used as a condition.
    Literal(Value),
}
73
/// Comparison operators supported inside a predicate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompareOp {
    /// `==`
    Eq,
    /// `!=`
    Ne,
    /// `<`
    Lt,
    /// `<=`
    Le,
    /// `>`
    Gt,
    /// `>=`
    Ge,
}
83
/// Dotted lookup path into an envelope, e.g. `payload.status`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Path {
    /// Path components in order; the first one selects the root (`payload` or `meta`).
    segments: Vec<String>,
}
89
90impl Path {
91 pub fn from_dotted(s: &str) -> Self {
92 Self {
93 segments: s.split('.').map(String::from).collect(),
94 }
95 }
96
97 pub fn eval(&self, env: &Envelope) -> Result<Value> {
98 let mut current: Value = match self.segments.first().map(String::as_str) {
99 Some("payload") => env.payload.clone(),
100 Some("meta") => serde_json::to_value(&env.meta)?,
101 Some(other) => bail!("filter: unknown root '{}' in path", other),
102 None => bail!("filter: empty path"),
103 };
104
105 for segment in &self.segments[1..] {
106 match current {
107 Value::Object(map) => {
108 current = map.get(segment).cloned().unwrap_or(Value::Null);
109 }
110 _ => {
111 return Ok(Value::Null);
112 }
113 }
114 }
115 Ok(current)
116 }
117
118 pub fn exists(&self, env: &Envelope) -> bool {
119 if self.segments.is_empty() {
120 return false;
121 }
122 let mut current: Value = match self.segments.first().map(String::as_str) {
123 Some("payload") => env.payload.clone(),
124 Some("meta") => serde_json::to_value(&env.meta).unwrap_or(Value::Null),
125 Some(_) | None => return false,
126 };
127
128 for segment in &self.segments[1..] {
129 match current {
130 Value::Object(map) => {
131 current = match map.get(segment) {
132 Some(v) => v.clone(),
133 None => return false,
134 };
135 }
136 _ => return false,
137 }
138 }
139 true
140 }
141}
142
143impl Expr {
144 pub fn eval(&self, env: &Envelope) -> Result<bool> {
145 match self {
146 Expr::Bool(b) => Ok(*b),
147 Expr::Compare { left, op, right } => {
148 let lv = left.value(env)?;
149 let rv = right.value(env)?;
150 Ok(compare_values(&lv, *op, &rv))
151 }
152 Expr::And(a, b) => Ok(a.eval(env)? && b.eval(env)?),
153 Expr::Or(a, b) => Ok(a.eval(env)? || b.eval(env)?),
154 Expr::Not(e) => Ok(!e.eval(env)?),
155 Expr::Exists(path) => Ok(path.exists(env)),
156 Expr::Path(path) => Ok(truthy(&path.eval(env)?)),
157 Expr::Literal(v) => Ok(truthy(v)),
158 }
159 }
160
161 fn value(&self, env: &Envelope) -> Result<Value> {
162 match self {
163 Expr::Path(path) => path.eval(env),
164 Expr::Literal(v) => Ok(v.clone()),
165 Expr::Bool(b) => Ok(Value::Bool(*b)),
166 other => bail!("filter: expected path or literal, got {:?}", other),
167 }
168 }
169}
170
171fn compare_values(left: &Value, op: CompareOp, right: &Value) -> bool {
172 match op {
173 CompareOp::Eq => left == right,
174 CompareOp::Ne => left != right,
175 CompareOp::Lt | CompareOp::Le | CompareOp::Gt | CompareOp::Ge => {
176 match (as_f64(left), as_f64(right)) {
179 (Some(l), Some(r)) => match op {
180 CompareOp::Lt => l < r,
181 CompareOp::Le => l <= r,
182 CompareOp::Gt => l > r,
183 CompareOp::Ge => l >= r,
184 _ => unreachable!(),
185 },
186 (None, None) if left.is_string() && right.is_string() => {
187 let ls = left.as_str().unwrap();
188 let rs = right.as_str().unwrap();
189 match op {
190 CompareOp::Lt => ls < rs,
191 CompareOp::Le => ls <= rs,
192 CompareOp::Gt => ls > rs,
193 CompareOp::Ge => ls >= rs,
194 _ => unreachable!(),
195 }
196 }
197 _ => false,
198 }
199 }
200 }
201}
202
203fn as_f64(v: &Value) -> Option<f64> {
204 match v {
205 Value::Number(n) => n.as_f64(),
206 _ => None,
207 }
208}
209
210fn truthy(v: &Value) -> bool {
211 match v {
212 Value::Null => false,
213 Value::Bool(b) => *b,
214 Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
215 Value::String(s) => !s.is_empty(),
216 Value::Array(a) => !a.is_empty(),
217 Value::Object(o) => !o.is_empty(),
218 }
219}
220
221pub fn parse_predicate(input: &str) -> Result<Expr> {
226 let mut lexer = Lexer::new(input);
227 let expr = parse_expr(&mut lexer);
228 if let Some(err) = lexer.error {
229 return Err(err);
230 }
231 let expr = expr?;
232 if !matches!(lexer.peek(), Some(Token::Eof) | None) {
233 let rest: String = lexer.input[lexer.pos..].trim().into();
234 bail!("filter: unexpected trailing tokens: '{}'", rest);
235 }
236 Ok(expr)
237}
238
/// Lexical tokens of the predicate language.
#[derive(Debug, Clone, PartialEq)]
enum Token {
    /// Identifier (a path segment).
    Ident(String),
    /// Quoted string literal with escapes already resolved.
    Str(String),
    /// Numeric literal.
    Number(serde_json::Number),
    /// `true` or `false`.
    Bool(bool),
    /// `null`
    Null,
    /// `==`
    Eq,
    /// `!=`
    Ne,
    /// `<`
    Lt,
    /// `<=`
    Le,
    /// `>`
    Gt,
    /// `>=`
    Ge,
    /// `&&`
    And,
    /// `||`
    Or,
    /// `!`
    Not,
    /// `(`
    LParen,
    /// `)`
    RParen,
    /// `.`
    Dot,
    /// The `exists` keyword.
    Exists,
    /// End of input; also produced after a lexical error.
    Eof,
}
261
/// Tokenizer with one token of lookahead over predicate source text.
struct Lexer<'a> {
    /// Full source text being tokenized.
    input: &'a str,
    /// Byte offset of the first character not yet consumed (i.e. just past `current`).
    pos: usize,
    /// Lookahead token; filled by `advance`.
    current: Option<Token>,
    /// First lexical error encountered, if any; once set, only `Eof` is produced.
    error: Option<anyhow::Error>,
}
268
269impl<'a> Lexer<'a> {
270 fn new(input: &'a str) -> Self {
271 let mut l = Self {
272 input,
273 pos: 0,
274 current: None,
275 error: None,
276 };
277 l.advance();
278 l
279 }
280
281 fn advance(&mut self) {
282 if self.error.is_some() {
283 self.current = Some(Token::Eof);
284 return;
285 }
286 self.skip_whitespace();
287 if self.pos >= self.input.len() {
288 self.current = Some(Token::Eof);
289 return;
290 }
291 let ch = self.input[self.pos..].chars().next().unwrap();
292
293 match ch {
294 '(' => {
295 self.pos += 1;
296 self.current = Some(Token::LParen);
297 }
298 ')' => {
299 self.pos += 1;
300 self.current = Some(Token::RParen);
301 }
302 '.' => {
303 self.pos += 1;
304 self.current = Some(Token::Dot);
305 }
306 '!' => {
307 if self.peek_char() == Some('=') {
308 self.pos += 2;
309 self.current = Some(Token::Ne);
310 } else {
311 self.pos += 1;
312 self.current = Some(Token::Not);
313 }
314 }
315 '=' => {
316 if self.peek_char() == Some('=') {
317 self.pos += 2;
318 self.current = Some(Token::Eq);
319 } else {
320 self.error = Some(anyhow::anyhow!(
321 "filter: unexpected character '=' at position {} (did you mean '=='?)",
322 self.pos
323 ));
324 self.current = Some(Token::Eof);
325 }
326 }
327 '<' => {
328 if self.peek_char() == Some('=') {
329 self.pos += 2;
330 self.current = Some(Token::Le);
331 } else {
332 self.pos += 1;
333 self.current = Some(Token::Lt);
334 }
335 }
336 '>' => {
337 if self.peek_char() == Some('=') {
338 self.pos += 2;
339 self.current = Some(Token::Ge);
340 } else {
341 self.pos += 1;
342 self.current = Some(Token::Gt);
343 }
344 }
345 '&' => {
346 if self.peek_char() == Some('&') {
347 self.pos += 2;
348 self.current = Some(Token::And);
349 } else {
350 self.error = Some(anyhow::anyhow!(
351 "filter: unexpected character '&' at position {} (did you mean '&&'?)",
352 self.pos
353 ));
354 self.current = Some(Token::Eof);
355 }
356 }
357 '|' => {
358 if self.peek_char() == Some('|') {
359 self.pos += 2;
360 self.current = Some(Token::Or);
361 } else {
362 self.error = Some(anyhow::anyhow!(
363 "filter: unexpected character '|' at position {} (did you mean '||'?)",
364 self.pos
365 ));
366 self.current = Some(Token::Eof);
367 }
368 }
369 '"' | '\'' => {
370 let quote = ch;
371 self.pos += 1;
372 let mut s = String::new();
373 let mut closed = false;
374 while self.pos < self.input.len() {
375 let c = self.input[self.pos..].chars().next().unwrap();
376 if c == quote {
377 self.pos += 1;
378 closed = true;
379 break;
380 }
381 if c == '\\' {
382 self.pos += 1;
383 if self.pos >= self.input.len() {
384 break;
385 }
386 let escaped = self.input[self.pos..].chars().next().unwrap();
387 match escaped {
388 'n' => s.push('\n'),
389 'r' => s.push('\r'),
390 't' => s.push('\t'),
391 '\\' => s.push('\\'),
392 '"' => s.push('"'),
393 '\'' => s.push('\''),
394 other => s.push(other),
395 }
396 self.pos += escaped.len_utf8();
397 } else {
398 s.push(c);
399 self.pos += c.len_utf8();
400 }
401 }
402 if !closed {
403 self.error = Some(anyhow::anyhow!("filter: unterminated string literal"));
404 self.current = Some(Token::Eof);
405 } else {
406 self.current = Some(Token::Str(s));
407 }
408 }
409 c if c.is_ascii_digit() || c == '-' => {
410 let start = self.pos;
411 if c == '-' {
412 self.pos += 1;
413 }
414 while self.pos < self.input.len() {
415 let c = self.input[self.pos..].chars().next().unwrap();
416 if c.is_ascii_digit() || c == '.' {
417 self.pos += 1;
418 } else {
419 break;
420 }
421 }
422 let num_str = &self.input[start..self.pos];
423 match num_str.parse::<serde_json::Number>() {
424 Ok(num) => self.current = Some(Token::Number(num)),
425 Err(_) => {
426 self.error = Some(anyhow::anyhow!("filter: invalid number '{}'", num_str));
427 self.current = Some(Token::Eof);
428 }
429 }
430 }
431 c if c.is_alphabetic() || c == '_' => {
432 let start = self.pos;
433 while self.pos < self.input.len() {
434 let c = self.input[self.pos..].chars().next().unwrap();
435 if c.is_alphanumeric() || c == '_' {
436 self.pos += c.len_utf8();
437 } else {
438 break;
439 }
440 }
441 let ident = &self.input[start..self.pos];
442 let token = match ident {
443 "true" => Token::Bool(true),
444 "false" => Token::Bool(false),
445 "null" => Token::Null,
446 "exists" => Token::Exists,
447 _ => Token::Ident(ident.to_string()),
448 };
449 self.current = Some(token);
450 }
451 _ => {
452 self.error = Some(anyhow::anyhow!(
453 "filter: unexpected character '{}' at position {}",
454 ch,
455 self.pos
456 ));
457 self.current = Some(Token::Eof);
458 }
459 }
460 }
461
462 fn peek(&self) -> Option<&Token> {
463 self.current.as_ref()
464 }
465
466 fn next_token(&mut self) -> Option<Token> {
467 let t = self.current.take();
468 self.advance();
469 t
470 }
471
472 fn skip_whitespace(&mut self) {
473 while self.pos < self.input.len() {
474 let c = self.input[self.pos..].chars().next().unwrap();
475 if c.is_whitespace() {
476 self.pos += c.len_utf8();
477 } else {
478 break;
479 }
480 }
481 }
482
483 fn peek_char(&self) -> Option<char> {
484 self.input[self.pos + 1..].chars().next()
485 }
486}
487
488fn parse_expr(lexer: &mut Lexer) -> Result<Expr> {
489 parse_or(lexer)
490}
491
492fn parse_or(lexer: &mut Lexer) -> Result<Expr> {
493 let mut left = parse_and(lexer)?;
494 while let Some(Token::Or) = lexer.peek() {
495 lexer.next_token();
496 let right = parse_and(lexer)?;
497 left = Expr::Or(Box::new(left), Box::new(right));
498 }
499 Ok(left)
500}
501
502fn parse_and(lexer: &mut Lexer) -> Result<Expr> {
503 let mut left = parse_unary(lexer)?;
504 while let Some(Token::And) = lexer.peek() {
505 lexer.next_token();
506 let right = parse_unary(lexer)?;
507 left = Expr::And(Box::new(left), Box::new(right));
508 }
509 Ok(left)
510}
511
512fn parse_unary(lexer: &mut Lexer) -> Result<Expr> {
513 match lexer.peek() {
514 Some(Token::Not) => {
515 lexer.next_token();
516 let inner = parse_unary(lexer)?;
517 Ok(Expr::Not(Box::new(inner)))
518 }
519 Some(Token::Exists) => {
520 lexer.next_token();
521 let path = parse_path(lexer)?;
522 Ok(Expr::Exists(path))
523 }
524 _ => parse_comparison(lexer),
525 }
526}
527
528fn parse_comparison(lexer: &mut Lexer) -> Result<Expr> {
529 let left = parse_primary(lexer)?;
530 let op = match lexer.peek() {
531 Some(Token::Eq) => CompareOp::Eq,
532 Some(Token::Ne) => CompareOp::Ne,
533 Some(Token::Lt) => CompareOp::Lt,
534 Some(Token::Le) => CompareOp::Le,
535 Some(Token::Gt) => CompareOp::Gt,
536 Some(Token::Ge) => CompareOp::Ge,
537 _ => return Ok(left),
538 };
539 lexer.next_token();
540 let right = parse_primary(lexer)?;
541 Ok(Expr::Compare {
542 left: Box::new(left),
543 op,
544 right: Box::new(right),
545 })
546}
547
548fn parse_primary(lexer: &mut Lexer) -> Result<Expr> {
549 match lexer.peek().cloned() {
550 Some(Token::Bool(b)) => {
551 lexer.next_token();
552 Ok(Expr::Bool(b))
553 }
554 Some(Token::Null) => {
555 lexer.next_token();
556 Ok(Expr::Literal(Value::Null))
557 }
558 Some(Token::Str(s)) => {
559 lexer.next_token();
560 Ok(Expr::Literal(Value::String(s)))
561 }
562 Some(Token::Number(n)) => {
563 lexer.next_token();
564 Ok(Expr::Literal(Value::Number(n)))
565 }
566 Some(Token::Ident(_)) => {
567 let path = parse_path(lexer)?;
568 Ok(Expr::Path(path))
569 }
570 Some(Token::LParen) => {
571 lexer.next_token();
572 let inner = parse_expr(lexer)?;
573 match lexer.peek() {
574 Some(Token::RParen) => {
575 lexer.next_token();
576 Ok(inner)
577 }
578 _ => bail!("filter: expected ')'"),
579 }
580 }
581 other => bail!("filter: unexpected token {:?}", other),
582 }
583}
584
585fn parse_path(lexer: &mut Lexer) -> Result<Path> {
586 let mut segments = Vec::new();
587 while let Some(Token::Ident(name)) = lexer.peek().cloned() {
588 lexer.next_token();
589 segments.push(name);
590 match lexer.peek() {
591 Some(Token::Dot) => {
592 lexer.next_token();
593 }
594 _ => break,
595 }
596 }
597 if segments.is_empty() {
598 bail!("filter: expected identifier in path");
599 }
600 Ok(Path { segments })
601}
602
/// Configuration accepted by the `filter` transform.
#[derive(Debug, Deserialize)]
struct FilterTransformConfig {
    /// Predicate source text; parsed with `parse_predicate` when the transform is built.
    predicate: String,
}
611
612pub fn filter_transform_factory(
615 id: &str,
616 config: Value,
617 on_error: ErrorPolicy,
618) -> Result<Box<dyn Transform>> {
619 let config: FilterTransformConfig = parse_config("filter", config)?;
620 let predicate = parse_predicate(&config.predicate)?;
621 Ok(Box::new(
622 BasicTransform::new(FilterTransform::new(id, predicate)).with_error_policy(on_error),
623 ))
624}
625
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;
    use crate::Registry;
    use crate::config::{ErrorPolicyConfig, TransformSpec};
    use crate::envelope::Envelope;

    /// Builds a filter with id `"t"` from predicate source, panicking if it does not parse.
    fn filter(source: &str) -> FilterTransform {
        FilterTransform::new("t", parse_predicate(source).unwrap())
    }

    /// Sends an envelope carrying `payload` through `filter` and reports whether it was kept.
    async fn passes(filter: &FilterTransform, payload: serde_json::Value) -> bool {
        let env = Envelope::new("src", payload);
        filter.map(env).await.unwrap().is_some()
    }

    /// Parses `source`, expecting failure, and returns the rendered error chain.
    fn parse_failure(source: &str) -> String {
        let err = parse_predicate(source).unwrap_err();
        format!("{err:#}")
    }

    /// Builds a `filter` transform spec with the given config and error policy.
    fn filter_spec(config: serde_json::Value, on_error: Option<ErrorPolicyConfig>) -> TransformSpec {
        TransformSpec {
            kind: "filter".into(),
            config,
            on_error,
        }
    }

    #[tokio::test]
    async fn keeps_matching_envelope() {
        let t = filter("payload.status == \"ok\"");
        assert!(passes(&t, json!({ "status": "ok" })).await);
    }

    #[tokio::test]
    async fn drops_non_matching_envelope() {
        let t = filter("payload.status == \"ok\"");
        assert!(!passes(&t, json!({ "status": "error" })).await);
    }

    #[tokio::test]
    async fn filters_by_numeric_comparison() {
        let t = filter("payload.score >= 0.5");
        assert!(passes(&t, json!({ "score": 0.7 })).await);
        assert!(!passes(&t, json!({ "score": 0.3 })).await);
    }

    #[tokio::test]
    async fn filters_by_meta_header() {
        let t = filter("meta.headers.priority == \"high\"");
        let mut env = Envelope::new("src", json!({}));
        env.meta.headers.insert("priority".into(), "high".into());
        assert!(t.map(env).await.unwrap().is_some());
    }

    #[tokio::test]
    async fn logical_and_or() {
        let t = filter("payload.a == 1 && payload.b == 2");
        assert!(passes(&t, json!({ "a": 1, "b": 2 })).await);
        assert!(!passes(&t, json!({ "a": 1, "b": 3 })).await);
    }

    #[tokio::test]
    async fn logical_not() {
        let t = filter("!exists payload.skip");
        assert!(passes(&t, json!({})).await);
        assert!(!passes(&t, json!({ "skip": true })).await);
    }

    #[tokio::test]
    async fn null_equality() {
        let t = filter("payload.missing == null");
        assert!(passes(&t, json!({})).await);
    }

    #[tokio::test]
    async fn grouping_parentheses() {
        let t = filter("(payload.a == 1 || payload.a == 2) && payload.b == 3");
        assert!(passes(&t, json!({ "a": 2, "b": 3 })).await);
        assert!(!passes(&t, json!({ "a": 3, "b": 3 })).await);
    }

    #[test]
    fn rejects_invalid_predicate_at_parse_time() {
        let msg = parse_failure("payload.status == ");
        assert!(
            msg.contains("unexpected token") || msg.contains("EOF"),
            "{msg}"
        );
    }

    #[test]
    fn rejects_unknown_root() {
        let predicate = parse_predicate("unknown.field == 1").unwrap();
        let env = Envelope::new("src", json!({}));
        assert!(predicate.eval(&env).is_err());
    }

    #[test]
    fn factory_resolves_through_registry() {
        let registry = Registry::with_builtins().unwrap();
        let spec = filter_spec(
            json!({ "predicate": "payload.active == true" }),
            Some(ErrorPolicyConfig::Drop),
        );
        registry.build_transform("p/t0", spec).unwrap();
    }

    #[test]
    fn factory_reports_invalid_config() {
        let registry = Registry::with_builtins().unwrap();
        let spec = filter_spec(json!({ "wrong_field": "x" }), None);
        let err = registry
            .build_transform("p/t0", spec)
            .err()
            .expect("expected invalid-config error");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid config for component type 'filter'"),
            "{msg}",
        );
    }

    #[test]
    fn factory_rejects_malformed_predicate() {
        let registry = Registry::with_builtins().unwrap();
        let spec = filter_spec(json!({ "predicate": "payload.status == " }), None);
        let err = registry
            .build_transform("p/t0", spec)
            .err()
            .expect("expected predicate parse error");
        let msg = format!("{err:#}");
        assert!(msg.contains("filter"), "{msg}");
    }

    #[test]
    fn rejects_single_equals() {
        let msg = parse_failure("payload.status = \"ok\"");
        assert!(msg.contains("'='") && msg.contains("'=='"), "{msg}");
    }

    #[test]
    fn rejects_single_ampersand() {
        let msg = parse_failure("payload.a & payload.b");
        assert!(msg.contains("'&'") && msg.contains("'&&'"), "{msg}");
    }

    #[test]
    fn rejects_single_pipe() {
        let msg = parse_failure("payload.a | payload.b");
        assert!(msg.contains("'|'") && msg.contains("'||'"), "{msg}");
    }

    #[tokio::test]
    async fn relational_compare_with_missing_field_returns_false() {
        let t = filter("payload.score >= 0.5");
        // A missing field resolves to null, which is not orderable.
        assert!(!passes(&t, json!({})).await);
        // A string is not orderable against a number either.
        assert!(!passes(&t, json!({ "score": "high" })).await);
    }

    #[test]
    fn rejects_unterminated_string_literal() {
        let msg = parse_failure("payload.status == \"ok");
        assert!(msg.contains("unterminated string"), "{msg}");
    }

    #[tokio::test]
    async fn parses_escaped_utf8_string_literal() {
        let t = filter("payload.status == \"\\é\"");
        assert!(passes(&t, json!({ "status": "é" })).await);
    }

    #[tokio::test]
    async fn parses_utf8_identifier_segments() {
        let t = filter("payload.café == 1");
        assert!(passes(&t, json!({ "café": 1 })).await);
    }

    #[test]
    fn skips_utf8_whitespace() {
        parse_predicate("payload.status\u{00a0}==\u{00a0}\"ok\"").unwrap();
    }
}