Skip to main content

courier/transforms/
filter.rs

1use anyhow::{Result, bail};
2use async_trait::async_trait;
3use serde::Deserialize;
4use serde_json::Value;
5
6use crate::config::parse_config;
7use crate::envelope::Envelope;
8use crate::pipeline::ErrorPolicy;
9use crate::transforms::{BasicTransform, MapOne, Transform};
10
11/// Drops envelopes that do not match a predicate.
12///
13/// The predicate is a small expression language evaluated against each
14/// envelope. Supported constructs:
15///
16/// - Field paths: `payload.userId`, `meta.headers.priority`, `meta.key`
17/// - Literals: strings (`"high"`), numbers (`42`, `3.14`), booleans (`true`, `false`), `null`
18/// - Comparison: `==`, `!=`, `<`, `<=`, `>`, `>=`
19/// - Logical: `&&` (and), `||` (or), `!` (not)
20/// - Grouping: `(expr)`
21/// - Existence: `exists payload.userId`
22///
23/// Examples:
24/// - `meta.headers.priority == "high"`
25/// - `payload.userId == 1`
26/// - `payload.score > 0.5 && meta.headers.env == "prod"`
27/// - `!exists payload.optionalField || payload.optionalField == null`
28pub struct FilterTransform {
29    id: String,
30    predicate: Expr,
31}
32
33impl FilterTransform {
34    pub fn new(id: impl Into<String>, predicate: Expr) -> Self {
35        Self {
36            id: id.into(),
37            predicate,
38        }
39    }
40}
41
42#[async_trait]
43impl MapOne for FilterTransform {
44    fn id(&self) -> &str {
45        &self.id
46    }
47
48    async fn map(&self, env: Envelope) -> Result<Option<Envelope>> {
49        let keep = self.predicate.eval(&env)?;
50        Ok(if keep { Some(env) } else { None })
51    }
52}
53
54// ------------------------------------------------------------------
55// Expression AST
56// ------------------------------------------------------------------
57
58#[derive(Debug, Clone, PartialEq)]
59pub enum Expr {
60    Bool(bool),
61    Compare {
62        left: Box<Expr>,
63        op: CompareOp,
64        right: Box<Expr>,
65    },
66    And(Box<Expr>, Box<Expr>),
67    Or(Box<Expr>, Box<Expr>),
68    Not(Box<Expr>),
69    Exists(Path),
70    Path(Path),
71    Literal(Value),
72}
73
/// Comparison operator of an [`Expr::Compare`] node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompareOp {
    /// `==`
    Eq,
    /// `!=`
    Ne,
    /// `<`
    Lt,
    /// `<=`
    Le,
    /// `>`
    Gt,
    /// `>=`
    Ge,
}
83
/// A field reference such as `payload.user.id`, stored as its dot-separated segments.
///
/// The first segment names the root (`payload` or `meta`); the remaining segments index
/// into nested JSON objects when the path is evaluated.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Path {
    segments: Vec<String>,
}
89
90impl Path {
91    pub fn from_dotted(s: &str) -> Self {
92        Self {
93            segments: s.split('.').map(String::from).collect(),
94        }
95    }
96
97    pub fn eval(&self, env: &Envelope) -> Result<Value> {
98        let mut current: Value = match self.segments.first().map(String::as_str) {
99            Some("payload") => env.payload.clone(),
100            Some("meta") => serde_json::to_value(&env.meta)?,
101            Some(other) => bail!("filter: unknown root '{}' in path", other),
102            None => bail!("filter: empty path"),
103        };
104
105        for segment in &self.segments[1..] {
106            match current {
107                Value::Object(map) => {
108                    current = map.get(segment).cloned().unwrap_or(Value::Null);
109                }
110                _ => {
111                    return Ok(Value::Null);
112                }
113            }
114        }
115        Ok(current)
116    }
117
118    pub fn exists(&self, env: &Envelope) -> bool {
119        if self.segments.is_empty() {
120            return false;
121        }
122        let mut current: Value = match self.segments.first().map(String::as_str) {
123            Some("payload") => env.payload.clone(),
124            Some("meta") => serde_json::to_value(&env.meta).unwrap_or(Value::Null),
125            Some(_) | None => return false,
126        };
127
128        for segment in &self.segments[1..] {
129            match current {
130                Value::Object(map) => {
131                    current = match map.get(segment) {
132                        Some(v) => v.clone(),
133                        None => return false,
134                    };
135                }
136                _ => return false,
137            }
138        }
139        true
140    }
141}
142
143impl Expr {
144    pub fn eval(&self, env: &Envelope) -> Result<bool> {
145        match self {
146            Expr::Bool(b) => Ok(*b),
147            Expr::Compare { left, op, right } => {
148                let lv = left.value(env)?;
149                let rv = right.value(env)?;
150                Ok(compare_values(&lv, *op, &rv))
151            }
152            Expr::And(a, b) => Ok(a.eval(env)? && b.eval(env)?),
153            Expr::Or(a, b) => Ok(a.eval(env)? || b.eval(env)?),
154            Expr::Not(e) => Ok(!e.eval(env)?),
155            Expr::Exists(path) => Ok(path.exists(env)),
156            Expr::Path(path) => Ok(truthy(&path.eval(env)?)),
157            Expr::Literal(v) => Ok(truthy(v)),
158        }
159    }
160
161    fn value(&self, env: &Envelope) -> Result<Value> {
162        match self {
163            Expr::Path(path) => path.eval(env),
164            Expr::Literal(v) => Ok(v.clone()),
165            Expr::Bool(b) => Ok(Value::Bool(*b)),
166            other => bail!("filter: expected path or literal, got {:?}", other),
167        }
168    }
169}
170
171fn compare_values(left: &Value, op: CompareOp, right: &Value) -> bool {
172    match op {
173        CompareOp::Eq => left == right,
174        CompareOp::Ne => left != right,
175        CompareOp::Lt | CompareOp::Le | CompareOp::Gt | CompareOp::Ge => {
176            // Relational comparisons only match when both sides are comparable.
177            // Null (missing field) or mixed types are treated as non-matches.
178            match (as_f64(left), as_f64(right)) {
179                (Some(l), Some(r)) => match op {
180                    CompareOp::Lt => l < r,
181                    CompareOp::Le => l <= r,
182                    CompareOp::Gt => l > r,
183                    CompareOp::Ge => l >= r,
184                    _ => unreachable!(),
185                },
186                (None, None) if left.is_string() && right.is_string() => {
187                    let ls = left.as_str().unwrap();
188                    let rs = right.as_str().unwrap();
189                    match op {
190                        CompareOp::Lt => ls < rs,
191                        CompareOp::Le => ls <= rs,
192                        CompareOp::Gt => ls > rs,
193                        CompareOp::Ge => ls >= rs,
194                        _ => unreachable!(),
195                    }
196                }
197                _ => false,
198            }
199        }
200    }
201}
202
203fn as_f64(v: &Value) -> Option<f64> {
204    match v {
205        Value::Number(n) => n.as_f64(),
206        _ => None,
207    }
208}
209
210fn truthy(v: &Value) -> bool {
211    match v {
212        Value::Null => false,
213        Value::Bool(b) => *b,
214        Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
215        Value::String(s) => !s.is_empty(),
216        Value::Array(a) => !a.is_empty(),
217        Value::Object(o) => !o.is_empty(),
218    }
219}
220
221// ------------------------------------------------------------------
222// Parser
223// ------------------------------------------------------------------
224
225pub fn parse_predicate(input: &str) -> Result<Expr> {
226    let mut lexer = Lexer::new(input);
227    let expr = parse_expr(&mut lexer);
228    if let Some(err) = lexer.error {
229        return Err(err);
230    }
231    let expr = expr?;
232    if !matches!(lexer.peek(), Some(Token::Eof) | None) {
233        let rest: String = lexer.input[lexer.pos..].trim().into();
234        bail!("filter: unexpected trailing tokens: '{}'", rest);
235    }
236    Ok(expr)
237}
238
239#[derive(Debug, Clone, PartialEq)]
240enum Token {
241    Ident(String),
242    Str(String),
243    Number(serde_json::Number),
244    Bool(bool),
245    Null,
246    Eq,
247    Ne,
248    Lt,
249    Le,
250    Gt,
251    Ge,
252    And,
253    Or,
254    Not,
255    LParen,
256    RParen,
257    Dot,
258    Exists,
259    Eof,
260}
261
262struct Lexer<'a> {
263    input: &'a str,
264    pos: usize,
265    current: Option<Token>,
266    error: Option<anyhow::Error>,
267}
268
269impl<'a> Lexer<'a> {
270    fn new(input: &'a str) -> Self {
271        let mut l = Self {
272            input,
273            pos: 0,
274            current: None,
275            error: None,
276        };
277        l.advance();
278        l
279    }
280
281    fn advance(&mut self) {
282        if self.error.is_some() {
283            self.current = Some(Token::Eof);
284            return;
285        }
286        self.skip_whitespace();
287        if self.pos >= self.input.len() {
288            self.current = Some(Token::Eof);
289            return;
290        }
291        let ch = self.input[self.pos..].chars().next().unwrap();
292
293        match ch {
294            '(' => {
295                self.pos += 1;
296                self.current = Some(Token::LParen);
297            }
298            ')' => {
299                self.pos += 1;
300                self.current = Some(Token::RParen);
301            }
302            '.' => {
303                self.pos += 1;
304                self.current = Some(Token::Dot);
305            }
306            '!' => {
307                if self.peek_char() == Some('=') {
308                    self.pos += 2;
309                    self.current = Some(Token::Ne);
310                } else {
311                    self.pos += 1;
312                    self.current = Some(Token::Not);
313                }
314            }
315            '=' => {
316                if self.peek_char() == Some('=') {
317                    self.pos += 2;
318                    self.current = Some(Token::Eq);
319                } else {
320                    self.error = Some(anyhow::anyhow!(
321                        "filter: unexpected character '=' at position {} (did you mean '=='?)",
322                        self.pos
323                    ));
324                    self.current = Some(Token::Eof);
325                }
326            }
327            '<' => {
328                if self.peek_char() == Some('=') {
329                    self.pos += 2;
330                    self.current = Some(Token::Le);
331                } else {
332                    self.pos += 1;
333                    self.current = Some(Token::Lt);
334                }
335            }
336            '>' => {
337                if self.peek_char() == Some('=') {
338                    self.pos += 2;
339                    self.current = Some(Token::Ge);
340                } else {
341                    self.pos += 1;
342                    self.current = Some(Token::Gt);
343                }
344            }
345            '&' => {
346                if self.peek_char() == Some('&') {
347                    self.pos += 2;
348                    self.current = Some(Token::And);
349                } else {
350                    self.error = Some(anyhow::anyhow!(
351                        "filter: unexpected character '&' at position {} (did you mean '&&'?)",
352                        self.pos
353                    ));
354                    self.current = Some(Token::Eof);
355                }
356            }
357            '|' => {
358                if self.peek_char() == Some('|') {
359                    self.pos += 2;
360                    self.current = Some(Token::Or);
361                } else {
362                    self.error = Some(anyhow::anyhow!(
363                        "filter: unexpected character '|' at position {} (did you mean '||'?)",
364                        self.pos
365                    ));
366                    self.current = Some(Token::Eof);
367                }
368            }
369            '"' | '\'' => {
370                let quote = ch;
371                self.pos += 1;
372                let mut s = String::new();
373                let mut closed = false;
374                while self.pos < self.input.len() {
375                    let c = self.input[self.pos..].chars().next().unwrap();
376                    if c == quote {
377                        self.pos += 1;
378                        closed = true;
379                        break;
380                    }
381                    if c == '\\' {
382                        self.pos += 1;
383                        if self.pos >= self.input.len() {
384                            break;
385                        }
386                        let escaped = self.input[self.pos..].chars().next().unwrap();
387                        match escaped {
388                            'n' => s.push('\n'),
389                            'r' => s.push('\r'),
390                            't' => s.push('\t'),
391                            '\\' => s.push('\\'),
392                            '"' => s.push('"'),
393                            '\'' => s.push('\''),
394                            other => s.push(other),
395                        }
396                        self.pos += escaped.len_utf8();
397                    } else {
398                        s.push(c);
399                        self.pos += c.len_utf8();
400                    }
401                }
402                if !closed {
403                    self.error = Some(anyhow::anyhow!("filter: unterminated string literal"));
404                    self.current = Some(Token::Eof);
405                } else {
406                    self.current = Some(Token::Str(s));
407                }
408            }
409            c if c.is_ascii_digit() || c == '-' => {
410                let start = self.pos;
411                if c == '-' {
412                    self.pos += 1;
413                }
414                while self.pos < self.input.len() {
415                    let c = self.input[self.pos..].chars().next().unwrap();
416                    if c.is_ascii_digit() || c == '.' {
417                        self.pos += 1;
418                    } else {
419                        break;
420                    }
421                }
422                let num_str = &self.input[start..self.pos];
423                match num_str.parse::<serde_json::Number>() {
424                    Ok(num) => self.current = Some(Token::Number(num)),
425                    Err(_) => {
426                        self.error = Some(anyhow::anyhow!("filter: invalid number '{}'", num_str));
427                        self.current = Some(Token::Eof);
428                    }
429                }
430            }
431            c if c.is_alphabetic() || c == '_' => {
432                let start = self.pos;
433                while self.pos < self.input.len() {
434                    let c = self.input[self.pos..].chars().next().unwrap();
435                    if c.is_alphanumeric() || c == '_' {
436                        self.pos += c.len_utf8();
437                    } else {
438                        break;
439                    }
440                }
441                let ident = &self.input[start..self.pos];
442                let token = match ident {
443                    "true" => Token::Bool(true),
444                    "false" => Token::Bool(false),
445                    "null" => Token::Null,
446                    "exists" => Token::Exists,
447                    _ => Token::Ident(ident.to_string()),
448                };
449                self.current = Some(token);
450            }
451            _ => {
452                self.error = Some(anyhow::anyhow!(
453                    "filter: unexpected character '{}' at position {}",
454                    ch,
455                    self.pos
456                ));
457                self.current = Some(Token::Eof);
458            }
459        }
460    }
461
462    fn peek(&self) -> Option<&Token> {
463        self.current.as_ref()
464    }
465
466    fn next_token(&mut self) -> Option<Token> {
467        let t = self.current.take();
468        self.advance();
469        t
470    }
471
472    fn skip_whitespace(&mut self) {
473        while self.pos < self.input.len() {
474            let c = self.input[self.pos..].chars().next().unwrap();
475            if c.is_whitespace() {
476                self.pos += c.len_utf8();
477            } else {
478                break;
479            }
480        }
481    }
482
483    fn peek_char(&self) -> Option<char> {
484        self.input[self.pos + 1..].chars().next()
485    }
486}
487
488fn parse_expr(lexer: &mut Lexer) -> Result<Expr> {
489    parse_or(lexer)
490}
491
492fn parse_or(lexer: &mut Lexer) -> Result<Expr> {
493    let mut left = parse_and(lexer)?;
494    while let Some(Token::Or) = lexer.peek() {
495        lexer.next_token();
496        let right = parse_and(lexer)?;
497        left = Expr::Or(Box::new(left), Box::new(right));
498    }
499    Ok(left)
500}
501
502fn parse_and(lexer: &mut Lexer) -> Result<Expr> {
503    let mut left = parse_unary(lexer)?;
504    while let Some(Token::And) = lexer.peek() {
505        lexer.next_token();
506        let right = parse_unary(lexer)?;
507        left = Expr::And(Box::new(left), Box::new(right));
508    }
509    Ok(left)
510}
511
512fn parse_unary(lexer: &mut Lexer) -> Result<Expr> {
513    match lexer.peek() {
514        Some(Token::Not) => {
515            lexer.next_token();
516            let inner = parse_unary(lexer)?;
517            Ok(Expr::Not(Box::new(inner)))
518        }
519        Some(Token::Exists) => {
520            lexer.next_token();
521            let path = parse_path(lexer)?;
522            Ok(Expr::Exists(path))
523        }
524        _ => parse_comparison(lexer),
525    }
526}
527
528fn parse_comparison(lexer: &mut Lexer) -> Result<Expr> {
529    let left = parse_primary(lexer)?;
530    let op = match lexer.peek() {
531        Some(Token::Eq) => CompareOp::Eq,
532        Some(Token::Ne) => CompareOp::Ne,
533        Some(Token::Lt) => CompareOp::Lt,
534        Some(Token::Le) => CompareOp::Le,
535        Some(Token::Gt) => CompareOp::Gt,
536        Some(Token::Ge) => CompareOp::Ge,
537        _ => return Ok(left),
538    };
539    lexer.next_token();
540    let right = parse_primary(lexer)?;
541    Ok(Expr::Compare {
542        left: Box::new(left),
543        op,
544        right: Box::new(right),
545    })
546}
547
548fn parse_primary(lexer: &mut Lexer) -> Result<Expr> {
549    match lexer.peek().cloned() {
550        Some(Token::Bool(b)) => {
551            lexer.next_token();
552            Ok(Expr::Bool(b))
553        }
554        Some(Token::Null) => {
555            lexer.next_token();
556            Ok(Expr::Literal(Value::Null))
557        }
558        Some(Token::Str(s)) => {
559            lexer.next_token();
560            Ok(Expr::Literal(Value::String(s)))
561        }
562        Some(Token::Number(n)) => {
563            lexer.next_token();
564            Ok(Expr::Literal(Value::Number(n)))
565        }
566        Some(Token::Ident(_)) => {
567            let path = parse_path(lexer)?;
568            Ok(Expr::Path(path))
569        }
570        Some(Token::LParen) => {
571            lexer.next_token();
572            let inner = parse_expr(lexer)?;
573            match lexer.peek() {
574                Some(Token::RParen) => {
575                    lexer.next_token();
576                    Ok(inner)
577                }
578                _ => bail!("filter: expected ')'"),
579            }
580        }
581        other => bail!("filter: unexpected token {:?}", other),
582    }
583}
584
585fn parse_path(lexer: &mut Lexer) -> Result<Path> {
586    let mut segments = Vec::new();
587    while let Some(Token::Ident(name)) = lexer.peek().cloned() {
588        lexer.next_token();
589        segments.push(name);
590        match lexer.peek() {
591            Some(Token::Dot) => {
592                lexer.next_token();
593            }
594            _ => break,
595        }
596    }
597    if segments.is_empty() {
598        bail!("filter: expected identifier in path");
599    }
600    Ok(Path { segments })
601}
602
603// ------------------------------------------------------------------
604// Config + Factory
605// ------------------------------------------------------------------
606
607#[derive(Debug, Deserialize)]
608struct FilterTransformConfig {
609    predicate: String,
610}
611
612/// Registry factory for [`FilterTransform`]. Registered by
613/// `courier::registry::register_builtin` under kind `"filter"`.
614pub fn filter_transform_factory(
615    id: &str,
616    config: Value,
617    on_error: ErrorPolicy,
618) -> Result<Box<dyn Transform>> {
619    let config: FilterTransformConfig = parse_config("filter", config)?;
620    let predicate = parse_predicate(&config.predicate)?;
621    Ok(Box::new(
622        BasicTransform::new(FilterTransform::new(id, predicate)).with_error_policy(on_error),
623    ))
624}
625
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;
    use crate::Registry;
    use crate::config::{ErrorPolicyConfig, TransformSpec};
    use crate::envelope::Envelope;

    /// Envelope from source `"src"` carrying `payload`.
    fn envelope(payload: Value) -> Envelope {
        Envelope::new("src", payload)
    }

    /// Parses `predicate` (panicking on a parse error), runs it over `env` through
    /// `FilterTransform::map`, and reports whether the envelope was kept.
    async fn keeps(predicate: &str, env: Envelope) -> bool {
        let filter = FilterTransform::new("t", parse_predicate(predicate).unwrap());
        filter.map(env).await.unwrap().is_some()
    }

    /// Alternate-formatted error text for a predicate that must fail to parse.
    fn parse_error(predicate: &str) -> String {
        format!("{:#}", parse_predicate(predicate).unwrap_err())
    }

    /// `TransformSpec` for a `filter` transform with the given config and error policy.
    fn filter_spec(config: Value, on_error: Option<ErrorPolicyConfig>) -> TransformSpec {
        TransformSpec {
            kind: "filter".into(),
            config,
            on_error,
        }
    }

    #[tokio::test]
    async fn keeps_matching_envelope() {
        assert!(keeps("payload.status == \"ok\"", envelope(json!({ "status": "ok" }))).await);
    }

    #[tokio::test]
    async fn drops_non_matching_envelope() {
        assert!(!keeps("payload.status == \"ok\"", envelope(json!({ "status": "error" }))).await);
    }

    #[tokio::test]
    async fn filters_by_numeric_comparison() {
        let predicate = "payload.score >= 0.5";
        assert!(keeps(predicate, envelope(json!({ "score": 0.7 }))).await);
        assert!(!keeps(predicate, envelope(json!({ "score": 0.3 }))).await);
    }

    #[tokio::test]
    async fn filters_by_meta_header() {
        let mut env = envelope(json!({}));
        env.meta.headers.insert("priority".into(), "high".into());
        assert!(keeps("meta.headers.priority == \"high\"", env).await);
    }

    #[tokio::test]
    async fn logical_and_or() {
        let predicate = "payload.a == 1 && payload.b == 2";
        assert!(keeps(predicate, envelope(json!({ "a": 1, "b": 2 }))).await);
        assert!(!keeps(predicate, envelope(json!({ "a": 1, "b": 3 }))).await);
    }

    #[tokio::test]
    async fn logical_not() {
        let predicate = "!exists payload.skip";
        assert!(keeps(predicate, envelope(json!({}))).await);
        assert!(!keeps(predicate, envelope(json!({ "skip": true }))).await);
    }

    #[tokio::test]
    async fn null_equality() {
        assert!(keeps("payload.missing == null", envelope(json!({}))).await);
    }

    #[tokio::test]
    async fn grouping_parentheses() {
        let predicate = "(payload.a == 1 || payload.a == 2) && payload.b == 3";
        assert!(keeps(predicate, envelope(json!({ "a": 2, "b": 3 }))).await);
        assert!(!keeps(predicate, envelope(json!({ "a": 3, "b": 3 }))).await);
    }

    #[test]
    fn rejects_invalid_predicate_at_parse_time() {
        let msg = parse_error("payload.status == ");
        assert!(
            msg.contains("unexpected token") || msg.contains("EOF"),
            "{msg}"
        );
    }

    #[test]
    fn rejects_unknown_root() {
        let predicate = parse_predicate("unknown.field == 1").unwrap();
        assert!(predicate.eval(&envelope(json!({}))).is_err());
    }

    #[test]
    fn factory_resolves_through_registry() {
        let registry = Registry::with_builtins().unwrap();
        let spec = filter_spec(
            json!({ "predicate": "payload.active == true" }),
            Some(ErrorPolicyConfig::Drop),
        );
        registry.build_transform("p/t0", spec).unwrap();
    }

    #[test]
    fn factory_reports_invalid_config() {
        let registry = Registry::with_builtins().unwrap();
        let err = registry
            .build_transform("p/t0", filter_spec(json!({ "wrong_field": "x" }), None))
            .err()
            .expect("expected invalid-config error");
        let msg = format!("{err:#}");
        assert!(
            msg.contains("invalid config for component type 'filter'"),
            "{msg}",
        );
    }

    #[test]
    fn factory_rejects_malformed_predicate() {
        let registry = Registry::with_builtins().unwrap();
        let err = registry
            .build_transform(
                "p/t0",
                filter_spec(json!({ "predicate": "payload.status == " }), None),
            )
            .err()
            .expect("expected predicate parse error");
        let msg = format!("{err:#}");
        assert!(msg.contains("filter"), "{msg}");
    }

    #[test]
    fn rejects_single_equals() {
        let msg = parse_error("payload.status = \"ok\"");
        assert!(msg.contains("'='") && msg.contains("'=='"), "{msg}");
    }

    #[test]
    fn rejects_single_ampersand() {
        let msg = parse_error("payload.a & payload.b");
        assert!(msg.contains("'&'") && msg.contains("'&&'"), "{msg}");
    }

    #[test]
    fn rejects_single_pipe() {
        let msg = parse_error("payload.a | payload.b");
        assert!(msg.contains("'|'") && msg.contains("'||'"), "{msg}");
    }

    #[tokio::test]
    async fn relational_compare_with_missing_field_returns_false() {
        let predicate = "payload.score >= 0.5";
        // A missing field resolves to null, which never satisfies a relational comparison.
        assert!(!keeps(predicate, envelope(json!({}))).await);
        // A string compared with a number is a type mismatch, which is also a non-match.
        assert!(!keeps(predicate, envelope(json!({ "score": "high" }))).await);
    }

    #[test]
    fn rejects_unterminated_string_literal() {
        let msg = parse_error("payload.status == \"ok");
        assert!(msg.contains("unterminated string"), "{msg}");
    }

    #[tokio::test]
    async fn parses_escaped_utf8_string_literal() {
        assert!(keeps("payload.status == \"\\é\"", envelope(json!({ "status": "é" }))).await);
    }

    #[tokio::test]
    async fn parses_utf8_identifier_segments() {
        assert!(keeps("payload.café == 1", envelope(json!({ "café": 1 }))).await);
    }

    #[test]
    fn skips_utf8_whitespace() {
        parse_predicate("payload.status\u{00a0}==\u{00a0}\"ok\"").unwrap();
    }
}