Skip to main content

nodedb_query/msgpack_scan/
filter.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Binary filter evaluation on raw MessagePack documents.
4//!
5//! `ScanFilter::matches_binary(doc: &[u8])` evaluates a filter predicate
6//! directly on msgpack bytes without decoding to `serde_json::Value`.
7//! Uses `Value::eq_coerced`/`cmp_coerced` for type coercion — single
8//! source of truth shared with the JSON filter path.
9
10use std::cmp::Ordering;
11
12use crate::msgpack_scan::field::extract_field;
13use crate::msgpack_scan::index::FieldIndex;
14use crate::msgpack_scan::reader::{
15    array_header, map_header, read_null, read_str, read_value, skip_value,
16};
17use crate::scan_filter::like::sql_like_match;
18use crate::scan_filter::{FilterOp, ScanFilter};
19
20impl ScanFilter {
21    /// Evaluate this filter against a raw MessagePack document.
22    ///
23    /// Zero deserialization — extracts only the needed field bytes.
24    pub fn matches_binary(&self, doc: &[u8]) -> bool {
25        match self.op {
26            FilterOp::MatchAll | FilterOp::Exists | FilterOp::NotExists => return true,
27            FilterOp::Or => {
28                return self
29                    .clauses
30                    .iter()
31                    .any(|clause| clause.iter().all(|f| f.matches_binary(doc)));
32            }
33            FilterOp::Expr => {
34                return match (self.expr.as_ref(), nodedb_types::value_from_msgpack(doc)) {
35                    (Some(expr), Ok(value)) => crate::value_ops::is_truthy(&expr.eval(&value)),
36                    _ => false,
37                };
38            }
39            _ => {}
40        }
41
42        let (start, end) = match extract_field(doc, 0, &self.field) {
43            Some(r) => r,
44            None => {
45                // Qualified-name fallback: "amount" might be stored as "orders.amount".
46                let suffix = format!(".{}", self.field);
47                match find_field_by_suffix(doc, &suffix) {
48                    Some(r) => r,
49                    None => return self.op == FilterOp::IsNull,
50                }
51            }
52        };
53
54        eval_op(self, doc, start, end)
55    }
56
57    /// Evaluate using a pre-built `FieldIndex` for O(1) field lookup.
58    ///
59    /// Use when evaluating multiple predicates on the same document.
60    pub fn matches_binary_indexed(&self, doc: &[u8], idx: &FieldIndex) -> bool {
61        match self.op {
62            FilterOp::MatchAll | FilterOp::Exists | FilterOp::NotExists => return true,
63            FilterOp::Or => {
64                return self
65                    .clauses
66                    .iter()
67                    .any(|clause| clause.iter().all(|f| f.matches_binary_indexed(doc, idx)));
68            }
69            FilterOp::Expr => {
70                return match (self.expr.as_ref(), nodedb_types::value_from_msgpack(doc)) {
71                    (Some(expr), Ok(value)) => crate::value_ops::is_truthy(&expr.eval(&value)),
72                    _ => false,
73                };
74            }
75            _ => {}
76        }
77
78        let (start, end) = match idx.get(&self.field) {
79            Some(r) => r,
80            None => return self.op == FilterOp::IsNull,
81        };
82
83        eval_op(self, doc, start, end)
84    }
85}
86
87/// Shared filter op evaluation — used by both `matches_binary` and `matches_binary_indexed`.
88fn eval_op(filter: &ScanFilter, doc: &[u8], start: usize, _end: usize) -> bool {
89    match filter.op {
90        FilterOp::IsNull => read_null(doc, start),
91        FilterOp::IsNotNull => !read_null(doc, start),
92        FilterOp::Eq => eq_value(doc, start, &filter.value),
93        FilterOp::Ne => !eq_value(doc, start, &filter.value),
94        FilterOp::Gt => cmp_value(doc, start, &filter.value) == Ordering::Greater,
95        FilterOp::Gte => {
96            let c = cmp_value(doc, start, &filter.value);
97            c == Ordering::Greater || c == Ordering::Equal
98        }
99        FilterOp::Lt => cmp_value(doc, start, &filter.value) == Ordering::Less,
100        FilterOp::Lte => {
101            let c = cmp_value(doc, start, &filter.value);
102            c == Ordering::Less || c == Ordering::Equal
103        }
104        FilterOp::Contains => {
105            if let (Some(s), Some(pattern)) = (read_str(doc, start), filter.value.as_str()) {
106                s.contains(pattern)
107            } else {
108                false
109            }
110        }
111        FilterOp::Like => str_match(doc, start, &filter.value, false, false),
112        FilterOp::NotLike => str_match(doc, start, &filter.value, false, true),
113        FilterOp::Ilike => str_match(doc, start, &filter.value, true, false),
114        FilterOp::NotIlike => str_match(doc, start, &filter.value, true, true),
115        FilterOp::In => {
116            if let Some(mut iter) = filter.value.as_array_iter() {
117                iter.any(|v| eq_value(doc, start, v))
118            } else {
119                false
120            }
121        }
122        FilterOp::NotIn => {
123            if let Some(mut iter) = filter.value.as_array_iter() {
124                !iter.any(|v| eq_value(doc, start, v))
125            } else {
126                true
127            }
128        }
129        FilterOp::ArrayContains => array_any(doc, start, |elem_start| {
130            eq_value(doc, elem_start, &filter.value)
131        }),
132        FilterOp::ArrayContainsAll => {
133            if let Some(mut needles) = filter.value.as_array_iter() {
134                needles.all(|needle| {
135                    array_any(doc, start, |elem_start| eq_value(doc, elem_start, needle))
136                })
137            } else {
138                false
139            }
140        }
141        FilterOp::ArrayOverlap => {
142            if let Some(mut needles) = filter.value.as_array_iter() {
143                needles.any(|needle| {
144                    array_any(doc, start, |elem_start| eq_value(doc, elem_start, needle))
145                })
146            } else {
147                false
148            }
149        }
150        // Column-vs-column: extract right-side field from the same doc.
151        FilterOp::GtColumn
152        | FilterOp::GteColumn
153        | FilterOp::LtColumn
154        | FilterOp::LteColumn
155        | FilterOp::EqColumn
156        | FilterOp::NeColumn => {
157            let other_col = match &filter.value {
158                nodedb_types::Value::String(s) => s.as_str(),
159                _ => return false,
160            };
161            // Try exact field name, then qualified suffix match.
162            let other_range = extract_field(doc, 0, other_col).or_else(|| {
163                let suffix = format!(".{other_col}");
164                find_field_by_suffix(doc, &suffix)
165            });
166            let Some((other_start, _)) = other_range else {
167                return false;
168            };
169            // Read both sides as Value and compare.
170            let left = read_value(doc, start).unwrap_or(nodedb_types::Value::Null);
171            let right = read_value(doc, other_start).unwrap_or(nodedb_types::Value::Null);
172            match filter.op {
173                FilterOp::GtColumn => left.cmp_coerced(&right) == Ordering::Greater,
174                FilterOp::GteColumn => left.cmp_coerced(&right) != Ordering::Less,
175                FilterOp::LtColumn => left.cmp_coerced(&right) == Ordering::Less,
176                FilterOp::LteColumn => left.cmp_coerced(&right) != Ordering::Greater,
177                FilterOp::EqColumn => left.eq_coerced(&right),
178                FilterOp::NeColumn => !left.eq_coerced(&right),
179                _ => false,
180            }
181        }
182        _ => false,
183    }
184}
185
186/// Find a field in a msgpack map by suffix match (e.g., ".amount" matches "orders.amount").
187fn find_field_by_suffix(doc: &[u8], suffix: &str) -> Option<(usize, usize)> {
188    let (count, mut pos) = map_header(doc, 0)?;
189    for _ in 0..count {
190        let key = read_str(doc, pos);
191        let key_end = skip_value(doc, pos)?;
192        let val_start = key_end;
193        let val_end = skip_value(doc, val_start)?;
194        if let Some(k) = key
195            && k.ends_with(suffix)
196        {
197            return Some((val_start, val_end));
198        }
199        pos = val_end;
200    }
201    None
202}
203
204// ── Helpers ────────────────────────────────────────────────────────────
205
206/// Coerced equality: read msgpack value at offset → compare with `Value`.
207/// Uses `Value::eq_coerced` — single source of truth for type coercion.
208#[inline]
209fn eq_value(buf: &[u8], offset: usize, filter_val: &nodedb_types::Value) -> bool {
210    if read_null(buf, offset) {
211        return filter_val.is_null();
212    }
213    match read_value(buf, offset) {
214        Some(field_val) => filter_val.eq_coerced(&field_val),
215        None => false,
216    }
217}
218
219/// Coerced ordering: read msgpack value at offset → compare with `Value`.
220/// Uses `Value::cmp_coerced` — single source of truth for ordering.
221///
222/// Returns ordering of field_val relative to filter_val (field <=> filter).
223#[inline]
224fn cmp_value(buf: &[u8], offset: usize, filter_val: &nodedb_types::Value) -> Ordering {
225    match read_value(buf, offset) {
226        Some(field_val) => field_val.cmp_coerced(filter_val),
227        None => Ordering::Equal,
228    }
229}
230
231/// LIKE/ILIKE/NOT LIKE/NOT ILIKE helper.
232#[inline]
233fn str_match(
234    buf: &[u8],
235    offset: usize,
236    pattern_val: &nodedb_types::Value,
237    icase: bool,
238    negate: bool,
239) -> bool {
240    let result = if let (Some(s), Some(pattern)) = (read_str(buf, offset), pattern_val.as_str()) {
241        sql_like_match(s, pattern, icase)
242    } else {
243        false
244    };
245    if negate { !result } else { result }
246}
247
248/// Iterate msgpack array elements, return true if any satisfies predicate.
249fn array_any(buf: &[u8], start: usize, mut pred: impl FnMut(usize) -> bool) -> bool {
250    let Some((count, mut pos)) = array_header(buf, start) else {
251        return false;
252    };
253    for _ in 0..count {
254        if pred(pos) {
255            return true;
256        }
257        let Some(next) = skip_value(buf, pos) else {
258            return false;
259        };
260        pos = next;
261    }
262    false
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_types::Value;
    use serde_json::json;

    /// Serialize a JSON literal into the MessagePack bytes the filters scan.
    fn encode(v: &serde_json::Value) -> Vec<u8> {
        nodedb_types::json_msgpack::json_to_msgpack(v).expect("encode")
    }

    /// Build a single-field predicate with no OR clauses and no expression.
    fn filter(field: &str, op: &str, value: Value) -> ScanFilter {
        ScanFilter {
            field: field.into(),
            op: op.into(),
            value,
            clauses: vec![],
            expr: None,
        }
    }

    /// String operand.
    fn text(s: &str) -> Value {
        Value::String(s.into())
    }

    /// Array-of-strings operand.
    fn text_array(items: &[&str]) -> Value {
        Value::Array(items.iter().map(|s| text(s)).collect())
    }

    /// Run a one-off predicate through the sequential path.
    fn hit(doc: &[u8], field: &str, op: &str, value: Value) -> bool {
        filter(field, op, value).matches_binary(doc)
    }

    #[test]
    fn eq_integer() {
        let doc = encode(&json!({"age": 25}));
        assert!(hit(&doc, "age", "eq", Value::Integer(25)));
        assert!(!hit(&doc, "age", "eq", Value::Integer(30)));
    }

    #[test]
    fn eq_coerces_string_to_integer() {
        let doc = encode(&json!({"age": 25}));
        assert!(hit(&doc, "age", "eq", text("25")));
    }

    #[test]
    fn gt_coerces_string_to_integer() {
        let doc = encode(&json!({"score": "90"}));
        assert!(hit(&doc, "score", "gt", Value::Integer(80)));
    }

    #[test]
    fn eq_string() {
        let doc = encode(&json!({"name": "alice"}));
        assert!(hit(&doc, "name", "eq", text("alice")));
    }

    #[test]
    fn eq_coercion_int_vs_string() {
        let doc = encode(&json!({"age": 25}));
        assert!(hit(&doc, "age", "eq", text("25")));
    }

    #[test]
    fn eq_coercion_string_vs_int() {
        let doc = encode(&json!({"score": "90"}));
        assert!(hit(&doc, "score", "eq", Value::Integer(90)));
    }

    #[test]
    fn ne() {
        let doc = encode(&json!({"x": 1}));
        assert!(hit(&doc, "x", "ne", Value::Integer(2)));
        assert!(!hit(&doc, "x", "ne", Value::Integer(1)));
    }

    #[test]
    fn gt_lt() {
        let doc = encode(&json!({"v": 10}));
        assert!(hit(&doc, "v", "gt", Value::Integer(5)));
        assert!(!hit(&doc, "v", "gt", Value::Integer(15)));
        assert!(hit(&doc, "v", "lt", Value::Integer(15)));
        assert!(!hit(&doc, "v", "lt", Value::Integer(5)));
    }

    #[test]
    fn gte_lte() {
        let doc = encode(&json!({"v": 10}));
        assert!(hit(&doc, "v", "gte", Value::Integer(10)));
        assert!(hit(&doc, "v", "gte", Value::Integer(5)));
        assert!(!hit(&doc, "v", "gte", Value::Integer(15)));
        assert!(hit(&doc, "v", "lte", Value::Integer(10)));
    }

    #[test]
    fn is_null_not_null() {
        let doc = encode(&json!({"a": null, "b": 1}));
        assert!(hit(&doc, "a", "is_null", Value::Null));
        assert!(!hit(&doc, "b", "is_null", Value::Null));
        assert!(hit(&doc, "b", "is_not_null", Value::Null));
    }

    #[test]
    fn missing_field_is_null() {
        let doc = encode(&json!({"x": 1}));
        assert!(hit(&doc, "missing", "is_null", Value::Null));
    }

    #[test]
    fn contains_str() {
        let doc = encode(&json!({"msg": "hello world"}));
        assert!(hit(&doc, "msg", "contains", text("world")));
    }

    #[test]
    fn like_ilike() {
        let doc = encode(&json!({"name": "Alice"}));
        assert!(hit(&doc, "name", "like", text("Ali%")));
        assert!(!hit(&doc, "name", "like", text("ali%")));
        assert!(hit(&doc, "name", "ilike", text("ali%")));
        assert!(hit(&doc, "name", "not_like", text("Bob%")));
    }

    #[test]
    fn in_not_in() {
        let vals = text_array(&["active", "pending"]);

        let doc = encode(&json!({"status": "active"}));
        assert!(hit(&doc, "status", "in", vals.clone()));

        let doc2 = encode(&json!({"status": "deleted"}));
        assert!(hit(&doc2, "status", "not_in", vals));
    }

    #[test]
    fn array_contains() {
        let doc = encode(&json!({"tags": ["rust", "db", "fast"]}));
        assert!(hit(&doc, "tags", "array_contains", text("rust")));
        assert!(!hit(&doc, "tags", "array_contains", text("slow")));
    }

    #[test]
    fn array_contains_all() {
        let doc = encode(&json!({"tags": ["a", "b", "c"]}));
        assert!(hit(&doc, "tags", "array_contains_all", text_array(&["a", "c"])));
    }

    #[test]
    fn array_overlap() {
        let doc = encode(&json!({"tags": ["x", "y"]}));
        assert!(hit(&doc, "tags", "array_overlap", text_array(&["y", "z"])));
    }

    #[test]
    fn or_clauses() {
        let doc = encode(&json!({"x": 5}));
        // Two single-filter groups; only the second one matches.
        let f = ScanFilter {
            clauses: vec![
                vec![filter("x", "eq", Value::Integer(10))],
                vec![filter("x", "eq", Value::Integer(5))],
            ],
            ..filter("", "or", Value::Null)
        };
        assert!(f.matches_binary(&doc));
    }

    #[test]
    fn match_all() {
        let doc = encode(&json!({"any": "thing"}));
        assert!(hit(&doc, "", "match_all", Value::Null));
    }

    #[test]
    fn float_comparison() {
        let doc = encode(&json!({"temp": 36.6}));
        assert!(hit(&doc, "temp", "gt", Value::Float(30.0)));
        assert!(hit(&doc, "temp", "lt", Value::Float(40.0)));
    }

    #[test]
    fn bool_eq() {
        let doc = encode(&json!({"active": true}));
        assert!(hit(&doc, "active", "eq", Value::Bool(true)));
        assert!(!hit(&doc, "active", "eq", Value::Bool(false)));
    }

    #[test]
    fn gt_coercion_string_field() {
        let doc = encode(&json!({"score": "90"}));
        assert!(hit(&doc, "score", "gt", Value::Integer(80)));
    }

    // ── Indexed variant tests ──────────────────────────────────────────

    #[test]
    fn indexed_matches_same_as_sequential() {
        let doc = encode(&json!({"a": 1, "b": "hello", "c": true, "d": null}));
        let idx = FieldIndex::build(&doc, 0).unwrap();

        let filters = [
            filter("a", "eq", Value::Integer(1)),
            filter("b", "contains", text("ell")),
            filter("c", "eq", Value::Bool(true)),
            filter("d", "is_null", Value::Null),
            filter("missing", "is_null", Value::Null),
        ];

        for f in filters.iter() {
            let sequential = f.matches_binary(&doc);
            let indexed = f.matches_binary_indexed(&doc, &idx);
            assert_eq!(
                sequential, indexed,
                "mismatch for field={} op={:?}",
                f.field, f.op
            );
        }
    }
}