Skip to main content

nodedb_query/msgpack_scan/
filter.rs

1//! Binary filter evaluation on raw MessagePack documents.
2//!
3//! `ScanFilter::matches_binary(doc: &[u8])` evaluates a filter predicate
4//! directly on msgpack bytes without decoding to `serde_json::Value`.
5//! Uses `Value::eq_coerced`/`cmp_coerced` for type coercion — single
6//! source of truth shared with the JSON filter path.
7
8use std::cmp::Ordering;
9
10use crate::msgpack_scan::field::extract_field;
11use crate::msgpack_scan::index::FieldIndex;
12use crate::msgpack_scan::reader::{
13    array_header, map_header, read_null, read_str, read_value, skip_value,
14};
15use crate::scan_filter::like::sql_like_match;
16use crate::scan_filter::{FilterOp, ScanFilter};
17
18impl ScanFilter {
19    /// Evaluate this filter against a raw MessagePack document.
20    ///
21    /// Zero deserialization — extracts only the needed field bytes.
22    pub fn matches_binary(&self, doc: &[u8]) -> bool {
23        match self.op {
24            FilterOp::MatchAll | FilterOp::Exists | FilterOp::NotExists => return true,
25            FilterOp::Or => {
26                return self
27                    .clauses
28                    .iter()
29                    .any(|clause| clause.iter().all(|f| f.matches_binary(doc)));
30            }
31            _ => {}
32        }
33
34        let (start, end) = match extract_field(doc, 0, &self.field) {
35            Some(r) => r,
36            None => {
37                // Qualified-name fallback: "amount" might be stored as "orders.amount".
38                let suffix = format!(".{}", self.field);
39                match find_field_by_suffix(doc, &suffix) {
40                    Some(r) => r,
41                    None => return self.op == FilterOp::IsNull,
42                }
43            }
44        };
45
46        eval_op(self, doc, start, end)
47    }
48
49    /// Evaluate using a pre-built `FieldIndex` for O(1) field lookup.
50    ///
51    /// Use when evaluating multiple predicates on the same document.
52    pub fn matches_binary_indexed(&self, doc: &[u8], idx: &FieldIndex) -> bool {
53        match self.op {
54            FilterOp::MatchAll | FilterOp::Exists | FilterOp::NotExists => return true,
55            FilterOp::Or => {
56                return self
57                    .clauses
58                    .iter()
59                    .any(|clause| clause.iter().all(|f| f.matches_binary_indexed(doc, idx)));
60            }
61            _ => {}
62        }
63
64        let (start, end) = match idx.get(&self.field) {
65            Some(r) => r,
66            None => return self.op == FilterOp::IsNull,
67        };
68
69        eval_op(self, doc, start, end)
70    }
71}
72
73/// Shared filter op evaluation — used by both `matches_binary` and `matches_binary_indexed`.
74fn eval_op(filter: &ScanFilter, doc: &[u8], start: usize, _end: usize) -> bool {
75    match filter.op {
76        FilterOp::IsNull => read_null(doc, start),
77        FilterOp::IsNotNull => !read_null(doc, start),
78        FilterOp::Eq => eq_value(doc, start, &filter.value),
79        FilterOp::Ne => !eq_value(doc, start, &filter.value),
80        FilterOp::Gt => cmp_value(doc, start, &filter.value) == Ordering::Greater,
81        FilterOp::Gte => {
82            let c = cmp_value(doc, start, &filter.value);
83            c == Ordering::Greater || c == Ordering::Equal
84        }
85        FilterOp::Lt => cmp_value(doc, start, &filter.value) == Ordering::Less,
86        FilterOp::Lte => {
87            let c = cmp_value(doc, start, &filter.value);
88            c == Ordering::Less || c == Ordering::Equal
89        }
90        FilterOp::Contains => {
91            if let (Some(s), Some(pattern)) = (read_str(doc, start), filter.value.as_str()) {
92                s.contains(pattern)
93            } else {
94                false
95            }
96        }
97        FilterOp::Like => str_match(doc, start, &filter.value, false, false),
98        FilterOp::NotLike => str_match(doc, start, &filter.value, false, true),
99        FilterOp::Ilike => str_match(doc, start, &filter.value, true, false),
100        FilterOp::NotIlike => str_match(doc, start, &filter.value, true, true),
101        FilterOp::In => {
102            if let Some(mut iter) = filter.value.as_array_iter() {
103                iter.any(|v| eq_value(doc, start, v))
104            } else {
105                false
106            }
107        }
108        FilterOp::NotIn => {
109            if let Some(mut iter) = filter.value.as_array_iter() {
110                !iter.any(|v| eq_value(doc, start, v))
111            } else {
112                true
113            }
114        }
115        FilterOp::ArrayContains => array_any(doc, start, |elem_start| {
116            eq_value(doc, elem_start, &filter.value)
117        }),
118        FilterOp::ArrayContainsAll => {
119            if let Some(mut needles) = filter.value.as_array_iter() {
120                needles.all(|needle| {
121                    array_any(doc, start, |elem_start| eq_value(doc, elem_start, needle))
122                })
123            } else {
124                false
125            }
126        }
127        FilterOp::ArrayOverlap => {
128            if let Some(mut needles) = filter.value.as_array_iter() {
129                needles.any(|needle| {
130                    array_any(doc, start, |elem_start| eq_value(doc, elem_start, needle))
131                })
132            } else {
133                false
134            }
135        }
136        // Column-vs-column: extract right-side field from the same doc.
137        FilterOp::GtColumn
138        | FilterOp::GteColumn
139        | FilterOp::LtColumn
140        | FilterOp::LteColumn
141        | FilterOp::EqColumn
142        | FilterOp::NeColumn => {
143            let other_col = match &filter.value {
144                nodedb_types::Value::String(s) => s.as_str(),
145                _ => return false,
146            };
147            // Try exact field name, then qualified suffix match.
148            let other_range = extract_field(doc, 0, other_col).or_else(|| {
149                let suffix = format!(".{other_col}");
150                find_field_by_suffix(doc, &suffix)
151            });
152            let Some((other_start, _)) = other_range else {
153                return false;
154            };
155            // Read both sides as Value and compare.
156            let left = read_value(doc, start).unwrap_or(nodedb_types::Value::Null);
157            let right = read_value(doc, other_start).unwrap_or(nodedb_types::Value::Null);
158            match filter.op {
159                FilterOp::GtColumn => left.cmp_coerced(&right) == Ordering::Greater,
160                FilterOp::GteColumn => left.cmp_coerced(&right) != Ordering::Less,
161                FilterOp::LtColumn => left.cmp_coerced(&right) == Ordering::Less,
162                FilterOp::LteColumn => left.cmp_coerced(&right) != Ordering::Greater,
163                FilterOp::EqColumn => left.eq_coerced(&right),
164                FilterOp::NeColumn => !left.eq_coerced(&right),
165                _ => false,
166            }
167        }
168        _ => false,
169    }
170}
171
172/// Find a field in a msgpack map by suffix match (e.g., ".amount" matches "orders.amount").
173fn find_field_by_suffix(doc: &[u8], suffix: &str) -> Option<(usize, usize)> {
174    let (count, mut pos) = map_header(doc, 0)?;
175    for _ in 0..count {
176        let key = read_str(doc, pos);
177        let key_end = skip_value(doc, pos)?;
178        let val_start = key_end;
179        let val_end = skip_value(doc, val_start)?;
180        if let Some(k) = key
181            && k.ends_with(suffix)
182        {
183            return Some((val_start, val_end));
184        }
185        pos = val_end;
186    }
187    None
188}
189
190// ── Helpers ────────────────────────────────────────────────────────────
191
192/// Coerced equality: read msgpack value at offset → compare with `Value`.
193/// Uses `Value::eq_coerced` — single source of truth for type coercion.
194#[inline]
195fn eq_value(buf: &[u8], offset: usize, filter_val: &nodedb_types::Value) -> bool {
196    if read_null(buf, offset) {
197        return filter_val.is_null();
198    }
199    match read_value(buf, offset) {
200        Some(field_val) => filter_val.eq_coerced(&field_val),
201        None => false,
202    }
203}
204
205/// Coerced ordering: read msgpack value at offset → compare with `Value`.
206/// Uses `Value::cmp_coerced` — single source of truth for ordering.
207///
208/// Returns ordering of field_val relative to filter_val (field <=> filter).
209#[inline]
210fn cmp_value(buf: &[u8], offset: usize, filter_val: &nodedb_types::Value) -> Ordering {
211    match read_value(buf, offset) {
212        Some(field_val) => field_val.cmp_coerced(filter_val),
213        None => Ordering::Equal,
214    }
215}
216
217/// LIKE/ILIKE/NOT LIKE/NOT ILIKE helper.
218#[inline]
219fn str_match(
220    buf: &[u8],
221    offset: usize,
222    pattern_val: &nodedb_types::Value,
223    icase: bool,
224    negate: bool,
225) -> bool {
226    let result = if let (Some(s), Some(pattern)) = (read_str(buf, offset), pattern_val.as_str()) {
227        sql_like_match(s, pattern, icase)
228    } else {
229        false
230    };
231    if negate { !result } else { result }
232}
233
234/// Iterate msgpack array elements, return true if any satisfies predicate.
235fn array_any(buf: &[u8], start: usize, mut pred: impl FnMut(usize) -> bool) -> bool {
236    let Some((count, mut pos)) = array_header(buf, start) else {
237        return false;
238    };
239    for _ in 0..count {
240        if pred(pos) {
241            return true;
242        }
243        let Some(next) = skip_value(buf, pos) else {
244            return false;
245        };
246        pos = next;
247    }
248    false
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_types::Value;
    use serde_json::json;

    /// Serialize a JSON literal into the MessagePack bytes the filters scan.
    fn encode(v: &serde_json::Value) -> Vec<u8> {
        nodedb_types::json_msgpack::json_to_msgpack(v).expect("encode")
    }

    /// Build a leaf filter (no OR clauses) from an operator name.
    fn filter(field: &str, op: &str, value: Value) -> ScanFilter {
        ScanFilter {
            field: field.into(),
            op: op.into(),
            value,
            clauses: vec![],
        }
    }

    /// String operand.
    fn text(s: &'static str) -> Value {
        Value::String(s.into())
    }

    /// Array-of-strings operand.
    fn texts(items: &[&'static str]) -> Value {
        Value::Array(items.iter().copied().map(text).collect())
    }

    // ── Equality and coercion ──────────────────────────────────────────

    #[test]
    fn eq_integer() {
        let doc = encode(&json!({"age": 25}));
        let hit = filter("age", "eq", Value::Integer(25));
        let miss = filter("age", "eq", Value::Integer(30));
        assert!(hit.matches_binary(&doc));
        assert!(!miss.matches_binary(&doc));
    }

    #[test]
    fn eq_string() {
        let doc = encode(&json!({"name": "alice"}));
        let f = filter("name", "eq", text("alice"));
        assert!(f.matches_binary(&doc));
    }

    #[test]
    fn eq_coercion_int_vs_string() {
        let doc = encode(&json!({"age": 25}));
        let f = filter("age", "eq", text("25"));
        assert!(f.matches_binary(&doc));
    }

    #[test]
    fn eq_coercion_string_vs_int() {
        let doc = encode(&json!({"score": "90"}));
        let f = filter("score", "eq", Value::Integer(90));
        assert!(f.matches_binary(&doc));
    }

    #[test]
    fn ne() {
        let doc = encode(&json!({"x": 1}));
        let differs = filter("x", "ne", Value::Integer(2));
        let same = filter("x", "ne", Value::Integer(1));
        assert!(differs.matches_binary(&doc));
        assert!(!same.matches_binary(&doc));
    }

    // ── Ordering ───────────────────────────────────────────────────────

    #[test]
    fn gt_lt() {
        let doc = encode(&json!({"v": 10}));
        let check = |op: &str, bound: i64| filter("v", op, Value::Integer(bound)).matches_binary(&doc);
        assert!(check("gt", 5));
        assert!(!check("gt", 15));
        assert!(check("lt", 15));
        assert!(!check("lt", 5));
    }

    #[test]
    fn gte_lte() {
        let doc = encode(&json!({"v": 10}));
        let check = |op: &str, bound: i64| filter("v", op, Value::Integer(bound)).matches_binary(&doc);
        assert!(check("gte", 10));
        assert!(check("gte", 5));
        assert!(!check("gte", 15));
        assert!(check("lte", 10));
    }

    // ── Null handling ──────────────────────────────────────────────────

    #[test]
    fn is_null_not_null() {
        let doc = encode(&json!({"a": null, "b": 1}));
        let check = |field: &str, op: &str| filter(field, op, Value::Null).matches_binary(&doc);
        assert!(check("a", "is_null"));
        assert!(!check("b", "is_null"));
        assert!(check("b", "is_not_null"));
    }

    #[test]
    fn missing_field_is_null() {
        let doc = encode(&json!({"x": 1}));
        let f = filter("missing", "is_null", Value::Null);
        assert!(f.matches_binary(&doc));
    }

    // ── String predicates ──────────────────────────────────────────────

    #[test]
    fn contains_str() {
        let doc = encode(&json!({"msg": "hello world"}));
        let f = filter("msg", "contains", text("world"));
        assert!(f.matches_binary(&doc));
    }

    #[test]
    fn like_ilike() {
        let doc = encode(&json!({"name": "Alice"}));
        let check = |op: &str, pattern: &'static str| {
            filter("name", op, text(pattern)).matches_binary(&doc)
        };
        assert!(check("like", "Ali%"));
        assert!(!check("like", "ali%"));
        assert!(check("ilike", "ali%"));
        assert!(check("not_like", "Bob%"));
    }

    // ── Set membership ─────────────────────────────────────────────────

    #[test]
    fn in_not_in() {
        let vals = texts(&["active", "pending"]);

        let doc = encode(&json!({"status": "active"}));
        assert!(filter("status", "in", vals.clone()).matches_binary(&doc));

        let doc2 = encode(&json!({"status": "deleted"}));
        assert!(filter("status", "not_in", vals).matches_binary(&doc2));
    }

    // ── Array-valued fields ────────────────────────────────────────────

    #[test]
    fn array_contains() {
        let doc = encode(&json!({"tags": ["rust", "db", "fast"]}));
        let present = filter("tags", "array_contains", text("rust"));
        let absent = filter("tags", "array_contains", text("slow"));
        assert!(present.matches_binary(&doc));
        assert!(!absent.matches_binary(&doc));
    }

    #[test]
    fn array_contains_all() {
        let doc = encode(&json!({"tags": ["a", "b", "c"]}));
        let f = filter("tags", "array_contains_all", texts(&["a", "c"]));
        assert!(f.matches_binary(&doc));
    }

    #[test]
    fn array_overlap() {
        let doc = encode(&json!({"tags": ["x", "y"]}));
        let f = filter("tags", "array_overlap", texts(&["y", "z"]));
        assert!(f.matches_binary(&doc));
    }

    // ── Compound and trivial filters ───────────────────────────────────

    #[test]
    fn or_clauses() {
        let doc = encode(&json!({"x": 5}));
        let f = ScanFilter {
            field: String::new(),
            op: "or".into(),
            value: Value::Null,
            clauses: vec![
                vec![filter("x", "eq", Value::Integer(10))],
                vec![filter("x", "eq", Value::Integer(5))],
            ],
        };
        assert!(f.matches_binary(&doc));
    }

    #[test]
    fn match_all() {
        let doc = encode(&json!({"any": "thing"}));
        let f = filter("", "match_all", Value::Null);
        assert!(f.matches_binary(&doc));
    }

    // ── Other scalar types ─────────────────────────────────────────────

    #[test]
    fn float_comparison() {
        let doc = encode(&json!({"temp": 36.6}));
        let above = filter("temp", "gt", Value::Float(30.0));
        let below = filter("temp", "lt", Value::Float(40.0));
        assert!(above.matches_binary(&doc));
        assert!(below.matches_binary(&doc));
    }

    #[test]
    fn bool_eq() {
        let doc = encode(&json!({"active": true}));
        let hit = filter("active", "eq", Value::Bool(true));
        let miss = filter("active", "eq", Value::Bool(false));
        assert!(hit.matches_binary(&doc));
        assert!(!miss.matches_binary(&doc));
    }

    #[test]
    fn gt_coercion_string_field() {
        let doc = encode(&json!({"score": "90"}));
        let f = filter("score", "gt", Value::Integer(80));
        assert!(f.matches_binary(&doc));
    }

    // ── Indexed variant tests ──────────────────────────────────────────

    #[test]
    fn indexed_matches_same_as_sequential() {
        let doc = encode(&json!({"a": 1, "b": "hello", "c": true, "d": null}));
        let idx = FieldIndex::build(&doc, 0).unwrap();

        let filters = [
            filter("a", "eq", Value::Integer(1)),
            filter("b", "contains", text("ell")),
            filter("c", "eq", Value::Bool(true)),
            filter("d", "is_null", Value::Null),
            filter("missing", "is_null", Value::Null),
        ];

        // Both entry points must agree on every filter.
        for f in filters.iter() {
            let sequential = f.matches_binary(&doc);
            let indexed = f.matches_binary_indexed(&doc, &idx);
            assert_eq!(
                sequential, indexed,
                "mismatch for field={} op={:?}",
                f.field, f.op
            );
        }
    }
}