Skip to main content

nodedb_query/msgpack_scan/
filter.rs

1//! Binary filter evaluation on raw MessagePack documents.
2//!
3//! `ScanFilter::matches_binary(doc: &[u8])` evaluates a filter predicate
4//! directly on msgpack bytes without decoding to `serde_json::Value`.
5//! Uses `Value::eq_coerced`/`cmp_coerced` for type coercion — single
6//! source of truth shared with the JSON filter path.
7
8use std::cmp::Ordering;
9
10use crate::msgpack_scan::field::extract_field;
11use crate::msgpack_scan::index::FieldIndex;
12use crate::msgpack_scan::reader::{
13    array_header, map_header, read_null, read_str, read_value, skip_value,
14};
15use crate::scan_filter::like::sql_like_match;
16use crate::scan_filter::{FilterOp, ScanFilter};
17
18impl ScanFilter {
19    /// Evaluate this filter against a raw MessagePack document.
20    ///
21    /// Zero deserialization — extracts only the needed field bytes.
22    pub fn matches_binary(&self, doc: &[u8]) -> bool {
23        match self.op {
24            FilterOp::MatchAll | FilterOp::Exists | FilterOp::NotExists => return true,
25            FilterOp::Or => {
26                return self
27                    .clauses
28                    .iter()
29                    .any(|clause| clause.iter().all(|f| f.matches_binary(doc)));
30            }
31            FilterOp::Expr => {
32                return match (self.expr.as_ref(), nodedb_types::value_from_msgpack(doc)) {
33                    (Some(expr), Ok(value)) => crate::value_ops::is_truthy(&expr.eval(&value)),
34                    _ => false,
35                };
36            }
37            _ => {}
38        }
39
40        let (start, end) = match extract_field(doc, 0, &self.field) {
41            Some(r) => r,
42            None => {
43                // Qualified-name fallback: "amount" might be stored as "orders.amount".
44                let suffix = format!(".{}", self.field);
45                match find_field_by_suffix(doc, &suffix) {
46                    Some(r) => r,
47                    None => return self.op == FilterOp::IsNull,
48                }
49            }
50        };
51
52        eval_op(self, doc, start, end)
53    }
54
55    /// Evaluate using a pre-built `FieldIndex` for O(1) field lookup.
56    ///
57    /// Use when evaluating multiple predicates on the same document.
58    pub fn matches_binary_indexed(&self, doc: &[u8], idx: &FieldIndex) -> bool {
59        match self.op {
60            FilterOp::MatchAll | FilterOp::Exists | FilterOp::NotExists => return true,
61            FilterOp::Or => {
62                return self
63                    .clauses
64                    .iter()
65                    .any(|clause| clause.iter().all(|f| f.matches_binary_indexed(doc, idx)));
66            }
67            FilterOp::Expr => {
68                return match (self.expr.as_ref(), nodedb_types::value_from_msgpack(doc)) {
69                    (Some(expr), Ok(value)) => crate::value_ops::is_truthy(&expr.eval(&value)),
70                    _ => false,
71                };
72            }
73            _ => {}
74        }
75
76        let (start, end) = match idx.get(&self.field) {
77            Some(r) => r,
78            None => return self.op == FilterOp::IsNull,
79        };
80
81        eval_op(self, doc, start, end)
82    }
83}
84
85/// Shared filter op evaluation — used by both `matches_binary` and `matches_binary_indexed`.
86fn eval_op(filter: &ScanFilter, doc: &[u8], start: usize, _end: usize) -> bool {
87    match filter.op {
88        FilterOp::IsNull => read_null(doc, start),
89        FilterOp::IsNotNull => !read_null(doc, start),
90        FilterOp::Eq => eq_value(doc, start, &filter.value),
91        FilterOp::Ne => !eq_value(doc, start, &filter.value),
92        FilterOp::Gt => cmp_value(doc, start, &filter.value) == Ordering::Greater,
93        FilterOp::Gte => {
94            let c = cmp_value(doc, start, &filter.value);
95            c == Ordering::Greater || c == Ordering::Equal
96        }
97        FilterOp::Lt => cmp_value(doc, start, &filter.value) == Ordering::Less,
98        FilterOp::Lte => {
99            let c = cmp_value(doc, start, &filter.value);
100            c == Ordering::Less || c == Ordering::Equal
101        }
102        FilterOp::Contains => {
103            if let (Some(s), Some(pattern)) = (read_str(doc, start), filter.value.as_str()) {
104                s.contains(pattern)
105            } else {
106                false
107            }
108        }
109        FilterOp::Like => str_match(doc, start, &filter.value, false, false),
110        FilterOp::NotLike => str_match(doc, start, &filter.value, false, true),
111        FilterOp::Ilike => str_match(doc, start, &filter.value, true, false),
112        FilterOp::NotIlike => str_match(doc, start, &filter.value, true, true),
113        FilterOp::In => {
114            if let Some(mut iter) = filter.value.as_array_iter() {
115                iter.any(|v| eq_value(doc, start, v))
116            } else {
117                false
118            }
119        }
120        FilterOp::NotIn => {
121            if let Some(mut iter) = filter.value.as_array_iter() {
122                !iter.any(|v| eq_value(doc, start, v))
123            } else {
124                true
125            }
126        }
127        FilterOp::ArrayContains => array_any(doc, start, |elem_start| {
128            eq_value(doc, elem_start, &filter.value)
129        }),
130        FilterOp::ArrayContainsAll => {
131            if let Some(mut needles) = filter.value.as_array_iter() {
132                needles.all(|needle| {
133                    array_any(doc, start, |elem_start| eq_value(doc, elem_start, needle))
134                })
135            } else {
136                false
137            }
138        }
139        FilterOp::ArrayOverlap => {
140            if let Some(mut needles) = filter.value.as_array_iter() {
141                needles.any(|needle| {
142                    array_any(doc, start, |elem_start| eq_value(doc, elem_start, needle))
143                })
144            } else {
145                false
146            }
147        }
148        // Column-vs-column: extract right-side field from the same doc.
149        FilterOp::GtColumn
150        | FilterOp::GteColumn
151        | FilterOp::LtColumn
152        | FilterOp::LteColumn
153        | FilterOp::EqColumn
154        | FilterOp::NeColumn => {
155            let other_col = match &filter.value {
156                nodedb_types::Value::String(s) => s.as_str(),
157                _ => return false,
158            };
159            // Try exact field name, then qualified suffix match.
160            let other_range = extract_field(doc, 0, other_col).or_else(|| {
161                let suffix = format!(".{other_col}");
162                find_field_by_suffix(doc, &suffix)
163            });
164            let Some((other_start, _)) = other_range else {
165                return false;
166            };
167            // Read both sides as Value and compare.
168            let left = read_value(doc, start).unwrap_or(nodedb_types::Value::Null);
169            let right = read_value(doc, other_start).unwrap_or(nodedb_types::Value::Null);
170            match filter.op {
171                FilterOp::GtColumn => left.cmp_coerced(&right) == Ordering::Greater,
172                FilterOp::GteColumn => left.cmp_coerced(&right) != Ordering::Less,
173                FilterOp::LtColumn => left.cmp_coerced(&right) == Ordering::Less,
174                FilterOp::LteColumn => left.cmp_coerced(&right) != Ordering::Greater,
175                FilterOp::EqColumn => left.eq_coerced(&right),
176                FilterOp::NeColumn => !left.eq_coerced(&right),
177                _ => false,
178            }
179        }
180        _ => false,
181    }
182}
183
184/// Find a field in a msgpack map by suffix match (e.g., ".amount" matches "orders.amount").
185fn find_field_by_suffix(doc: &[u8], suffix: &str) -> Option<(usize, usize)> {
186    let (count, mut pos) = map_header(doc, 0)?;
187    for _ in 0..count {
188        let key = read_str(doc, pos);
189        let key_end = skip_value(doc, pos)?;
190        let val_start = key_end;
191        let val_end = skip_value(doc, val_start)?;
192        if let Some(k) = key
193            && k.ends_with(suffix)
194        {
195            return Some((val_start, val_end));
196        }
197        pos = val_end;
198    }
199    None
200}
201
202// ── Helpers ────────────────────────────────────────────────────────────
203
204/// Coerced equality: read msgpack value at offset → compare with `Value`.
205/// Uses `Value::eq_coerced` — single source of truth for type coercion.
206#[inline]
207fn eq_value(buf: &[u8], offset: usize, filter_val: &nodedb_types::Value) -> bool {
208    if read_null(buf, offset) {
209        return filter_val.is_null();
210    }
211    match read_value(buf, offset) {
212        Some(field_val) => filter_val.eq_coerced(&field_val),
213        None => false,
214    }
215}
216
217/// Coerced ordering: read msgpack value at offset → compare with `Value`.
218/// Uses `Value::cmp_coerced` — single source of truth for ordering.
219///
220/// Returns ordering of field_val relative to filter_val (field <=> filter).
221#[inline]
222fn cmp_value(buf: &[u8], offset: usize, filter_val: &nodedb_types::Value) -> Ordering {
223    match read_value(buf, offset) {
224        Some(field_val) => field_val.cmp_coerced(filter_val),
225        None => Ordering::Equal,
226    }
227}
228
229/// LIKE/ILIKE/NOT LIKE/NOT ILIKE helper.
230#[inline]
231fn str_match(
232    buf: &[u8],
233    offset: usize,
234    pattern_val: &nodedb_types::Value,
235    icase: bool,
236    negate: bool,
237) -> bool {
238    let result = if let (Some(s), Some(pattern)) = (read_str(buf, offset), pattern_val.as_str()) {
239        sql_like_match(s, pattern, icase)
240    } else {
241        false
242    };
243    if negate { !result } else { result }
244}
245
246/// Iterate msgpack array elements, return true if any satisfies predicate.
247fn array_any(buf: &[u8], start: usize, mut pred: impl FnMut(usize) -> bool) -> bool {
248    let Some((count, mut pos)) = array_header(buf, start) else {
249        return false;
250    };
251    for _ in 0..count {
252        if pred(pos) {
253            return true;
254        }
255        let Some(next) = skip_value(buf, pos) else {
256            return false;
257        };
258        pos = next;
259    }
260    false
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_types::Value;
    use serde_json::json;

    /// Serialize a JSON literal into the MessagePack bytes the filters scan.
    fn encode(v: &serde_json::Value) -> Vec<u8> {
        nodedb_types::json_msgpack::json_to_msgpack(v).expect("encode")
    }

    /// Build a single-field filter with no OR clauses and no expression.
    fn filter(field: &str, op: &str, value: Value) -> ScanFilter {
        ScanFilter {
            field: field.into(),
            op: op.into(),
            value,
            clauses: vec![],
            expr: None,
        }
    }

    /// Shorthand for a string operand.
    fn text(s: &str) -> Value {
        Value::String(s.into())
    }

    /// Shorthand for an array-of-strings operand.
    fn texts(items: &[&str]) -> Value {
        Value::Array(items.iter().map(|s| text(s)).collect())
    }

    // ── Equality and coercion ──────────────────────────────────────────

    #[test]
    fn eq_integer() {
        let doc = encode(&json!({"age": 25}));
        assert!(filter("age", "eq", Value::Integer(25)).matches_binary(&doc));
        assert!(!filter("age", "eq", Value::Integer(30)).matches_binary(&doc));
    }

    #[test]
    fn eq_coerces_string_to_integer() {
        let doc = encode(&json!({"age": 25}));
        assert!(filter("age", "eq", text("25")).matches_binary(&doc));
    }

    #[test]
    fn gt_coerces_string_to_integer() {
        let doc = encode(&json!({"score": "90"}));
        assert!(filter("score", "gt", Value::Integer(80)).matches_binary(&doc));
    }

    #[test]
    fn eq_string() {
        let doc = encode(&json!({"name": "alice"}));
        assert!(filter("name", "eq", text("alice")).matches_binary(&doc));
    }

    #[test]
    fn eq_coercion_int_vs_string() {
        let doc = encode(&json!({"age": 25}));
        assert!(filter("age", "eq", text("25")).matches_binary(&doc));
    }

    #[test]
    fn eq_coercion_string_vs_int() {
        let doc = encode(&json!({"score": "90"}));
        assert!(filter("score", "eq", Value::Integer(90)).matches_binary(&doc));
    }

    #[test]
    fn ne() {
        let doc = encode(&json!({"x": 1}));
        assert!(filter("x", "ne", Value::Integer(2)).matches_binary(&doc));
        assert!(!filter("x", "ne", Value::Integer(1)).matches_binary(&doc));
    }

    // ── Ordering ───────────────────────────────────────────────────────

    #[test]
    fn gt_lt() {
        let doc = encode(&json!({"v": 10}));
        assert!(filter("v", "gt", Value::Integer(5)).matches_binary(&doc));
        assert!(!filter("v", "gt", Value::Integer(15)).matches_binary(&doc));
        assert!(filter("v", "lt", Value::Integer(15)).matches_binary(&doc));
        assert!(!filter("v", "lt", Value::Integer(5)).matches_binary(&doc));
    }

    #[test]
    fn gte_lte() {
        let doc = encode(&json!({"v": 10}));
        assert!(filter("v", "gte", Value::Integer(10)).matches_binary(&doc));
        assert!(filter("v", "gte", Value::Integer(5)).matches_binary(&doc));
        assert!(!filter("v", "gte", Value::Integer(15)).matches_binary(&doc));
        assert!(filter("v", "lte", Value::Integer(10)).matches_binary(&doc));
    }

    // ── Null handling ──────────────────────────────────────────────────

    #[test]
    fn is_null_not_null() {
        let doc = encode(&json!({"a": null, "b": 1}));
        assert!(filter("a", "is_null", Value::Null).matches_binary(&doc));
        assert!(!filter("b", "is_null", Value::Null).matches_binary(&doc));
        assert!(filter("b", "is_not_null", Value::Null).matches_binary(&doc));
    }

    #[test]
    fn missing_field_is_null() {
        let doc = encode(&json!({"x": 1}));
        assert!(filter("missing", "is_null", Value::Null).matches_binary(&doc));
    }

    // ── String operators ───────────────────────────────────────────────

    #[test]
    fn contains_str() {
        let doc = encode(&json!({"msg": "hello world"}));
        assert!(filter("msg", "contains", text("world")).matches_binary(&doc));
    }

    #[test]
    fn like_ilike() {
        let doc = encode(&json!({"name": "Alice"}));
        assert!(filter("name", "like", text("Ali%")).matches_binary(&doc));
        assert!(!filter("name", "like", text("ali%")).matches_binary(&doc));
        assert!(filter("name", "ilike", text("ali%")).matches_binary(&doc));
        assert!(filter("name", "not_like", text("Bob%")).matches_binary(&doc));
    }

    // ── Set and array operators ────────────────────────────────────────

    #[test]
    fn in_not_in() {
        let vals = texts(&["active", "pending"]);

        let doc = encode(&json!({"status": "active"}));
        assert!(filter("status", "in", vals.clone()).matches_binary(&doc));

        let doc2 = encode(&json!({"status": "deleted"}));
        assert!(filter("status", "not_in", vals).matches_binary(&doc2));
    }

    #[test]
    fn array_contains() {
        let doc = encode(&json!({"tags": ["rust", "db", "fast"]}));
        assert!(filter("tags", "array_contains", text("rust")).matches_binary(&doc));
        assert!(!filter("tags", "array_contains", text("slow")).matches_binary(&doc));
    }

    #[test]
    fn array_contains_all() {
        let doc = encode(&json!({"tags": ["a", "b", "c"]}));
        let needles = texts(&["a", "c"]);
        assert!(filter("tags", "array_contains_all", needles).matches_binary(&doc));
    }

    #[test]
    fn array_overlap() {
        let doc = encode(&json!({"tags": ["x", "y"]}));
        let needles = texts(&["y", "z"]);
        assert!(filter("tags", "array_overlap", needles).matches_binary(&doc));
    }

    // ── Whole-document operators ───────────────────────────────────────

    #[test]
    fn or_clauses() {
        let doc = encode(&json!({"x": 5}));
        let f = ScanFilter {
            clauses: vec![
                vec![filter("x", "eq", Value::Integer(10))],
                vec![filter("x", "eq", Value::Integer(5))],
            ],
            ..filter("", "or", Value::Null)
        };
        assert!(f.matches_binary(&doc));
    }

    #[test]
    fn match_all() {
        let doc = encode(&json!({"any": "thing"}));
        assert!(filter("", "match_all", Value::Null).matches_binary(&doc));
    }

    // ── Other scalar types ─────────────────────────────────────────────

    #[test]
    fn float_comparison() {
        let doc = encode(&json!({"temp": 36.6}));
        assert!(filter("temp", "gt", Value::Float(30.0)).matches_binary(&doc));
        assert!(filter("temp", "lt", Value::Float(40.0)).matches_binary(&doc));
    }

    #[test]
    fn bool_eq() {
        let doc = encode(&json!({"active": true}));
        assert!(filter("active", "eq", Value::Bool(true)).matches_binary(&doc));
        assert!(!filter("active", "eq", Value::Bool(false)).matches_binary(&doc));
    }

    #[test]
    fn gt_coercion_string_field() {
        let doc = encode(&json!({"score": "90"}));
        assert!(filter("score", "gt", Value::Integer(80)).matches_binary(&doc));
    }

    // ── Indexed variant tests ──────────────────────────────────────────

    #[test]
    fn indexed_matches_same_as_sequential() {
        let doc = encode(&json!({"a": 1, "b": "hello", "c": true, "d": null}));
        let idx = FieldIndex::build(&doc, 0).unwrap();

        let filters = [
            filter("a", "eq", Value::Integer(1)),
            filter("b", "contains", text("ell")),
            filter("c", "eq", Value::Bool(true)),
            filter("d", "is_null", Value::Null),
            filter("missing", "is_null", Value::Null),
        ];

        // Both entry points must agree on every filter.
        for f in &filters {
            let sequential = f.matches_binary(&doc);
            let indexed = f.matches_binary_indexed(&doc, &idx);
            assert_eq!(
                sequential, indexed,
                "mismatch for field={} op={:?}",
                f.field, f.op
            );
        }
    }
}