1use std::cmp::Ordering;
9
10use crate::msgpack_scan::field::extract_field;
11use crate::msgpack_scan::index::FieldIndex;
12use crate::msgpack_scan::reader::{
13 array_header, map_header, read_null, read_str, read_value, skip_value,
14};
15use crate::scan_filter::like::sql_like_match;
16use crate::scan_filter::{FilterOp, ScanFilter};
17
18impl ScanFilter {
19 pub fn matches_binary(&self, doc: &[u8]) -> bool {
23 match self.op {
24 FilterOp::MatchAll | FilterOp::Exists | FilterOp::NotExists => return true,
25 FilterOp::Or => {
26 return self
27 .clauses
28 .iter()
29 .any(|clause| clause.iter().all(|f| f.matches_binary(doc)));
30 }
31 _ => {}
32 }
33
34 let (start, end) = match extract_field(doc, 0, &self.field) {
35 Some(r) => r,
36 None => {
37 let suffix = format!(".{}", self.field);
39 match find_field_by_suffix(doc, &suffix) {
40 Some(r) => r,
41 None => return self.op == FilterOp::IsNull,
42 }
43 }
44 };
45
46 eval_op(self, doc, start, end)
47 }
48
49 pub fn matches_binary_indexed(&self, doc: &[u8], idx: &FieldIndex) -> bool {
53 match self.op {
54 FilterOp::MatchAll | FilterOp::Exists | FilterOp::NotExists => return true,
55 FilterOp::Or => {
56 return self
57 .clauses
58 .iter()
59 .any(|clause| clause.iter().all(|f| f.matches_binary_indexed(doc, idx)));
60 }
61 _ => {}
62 }
63
64 let (start, end) = match idx.get(&self.field) {
65 Some(r) => r,
66 None => return self.op == FilterOp::IsNull,
67 };
68
69 eval_op(self, doc, start, end)
70 }
71}
72
73fn eval_op(filter: &ScanFilter, doc: &[u8], start: usize, _end: usize) -> bool {
75 match filter.op {
76 FilterOp::IsNull => read_null(doc, start),
77 FilterOp::IsNotNull => !read_null(doc, start),
78 FilterOp::Eq => eq_value(doc, start, &filter.value),
79 FilterOp::Ne => !eq_value(doc, start, &filter.value),
80 FilterOp::Gt => cmp_value(doc, start, &filter.value) == Ordering::Greater,
81 FilterOp::Gte => {
82 let c = cmp_value(doc, start, &filter.value);
83 c == Ordering::Greater || c == Ordering::Equal
84 }
85 FilterOp::Lt => cmp_value(doc, start, &filter.value) == Ordering::Less,
86 FilterOp::Lte => {
87 let c = cmp_value(doc, start, &filter.value);
88 c == Ordering::Less || c == Ordering::Equal
89 }
90 FilterOp::Contains => {
91 if let (Some(s), Some(pattern)) = (read_str(doc, start), filter.value.as_str()) {
92 s.contains(pattern)
93 } else {
94 false
95 }
96 }
97 FilterOp::Like => str_match(doc, start, &filter.value, false, false),
98 FilterOp::NotLike => str_match(doc, start, &filter.value, false, true),
99 FilterOp::Ilike => str_match(doc, start, &filter.value, true, false),
100 FilterOp::NotIlike => str_match(doc, start, &filter.value, true, true),
101 FilterOp::In => {
102 if let Some(mut iter) = filter.value.as_array_iter() {
103 iter.any(|v| eq_value(doc, start, v))
104 } else {
105 false
106 }
107 }
108 FilterOp::NotIn => {
109 if let Some(mut iter) = filter.value.as_array_iter() {
110 !iter.any(|v| eq_value(doc, start, v))
111 } else {
112 true
113 }
114 }
115 FilterOp::ArrayContains => array_any(doc, start, |elem_start| {
116 eq_value(doc, elem_start, &filter.value)
117 }),
118 FilterOp::ArrayContainsAll => {
119 if let Some(mut needles) = filter.value.as_array_iter() {
120 needles.all(|needle| {
121 array_any(doc, start, |elem_start| eq_value(doc, elem_start, needle))
122 })
123 } else {
124 false
125 }
126 }
127 FilterOp::ArrayOverlap => {
128 if let Some(mut needles) = filter.value.as_array_iter() {
129 needles.any(|needle| {
130 array_any(doc, start, |elem_start| eq_value(doc, elem_start, needle))
131 })
132 } else {
133 false
134 }
135 }
136 FilterOp::GtColumn
138 | FilterOp::GteColumn
139 | FilterOp::LtColumn
140 | FilterOp::LteColumn
141 | FilterOp::EqColumn
142 | FilterOp::NeColumn => {
143 let other_col = match &filter.value {
144 nodedb_types::Value::String(s) => s.as_str(),
145 _ => return false,
146 };
147 let other_range = extract_field(doc, 0, other_col).or_else(|| {
149 let suffix = format!(".{other_col}");
150 find_field_by_suffix(doc, &suffix)
151 });
152 let Some((other_start, _)) = other_range else {
153 return false;
154 };
155 let left = read_value(doc, start).unwrap_or(nodedb_types::Value::Null);
157 let right = read_value(doc, other_start).unwrap_or(nodedb_types::Value::Null);
158 match filter.op {
159 FilterOp::GtColumn => left.cmp_coerced(&right) == Ordering::Greater,
160 FilterOp::GteColumn => left.cmp_coerced(&right) != Ordering::Less,
161 FilterOp::LtColumn => left.cmp_coerced(&right) == Ordering::Less,
162 FilterOp::LteColumn => left.cmp_coerced(&right) != Ordering::Greater,
163 FilterOp::EqColumn => left.eq_coerced(&right),
164 FilterOp::NeColumn => !left.eq_coerced(&right),
165 _ => false,
166 }
167 }
168 _ => false,
169 }
170}
171
172fn find_field_by_suffix(doc: &[u8], suffix: &str) -> Option<(usize, usize)> {
174 let (count, mut pos) = map_header(doc, 0)?;
175 for _ in 0..count {
176 let key = read_str(doc, pos);
177 let key_end = skip_value(doc, pos)?;
178 let val_start = key_end;
179 let val_end = skip_value(doc, val_start)?;
180 if let Some(k) = key
181 && k.ends_with(suffix)
182 {
183 return Some((val_start, val_end));
184 }
185 pos = val_end;
186 }
187 None
188}
189
190#[inline]
195fn eq_value(buf: &[u8], offset: usize, filter_val: &nodedb_types::Value) -> bool {
196 if read_null(buf, offset) {
197 return filter_val.is_null();
198 }
199 match read_value(buf, offset) {
200 Some(field_val) => filter_val.eq_coerced(&field_val),
201 None => false,
202 }
203}
204
205#[inline]
210fn cmp_value(buf: &[u8], offset: usize, filter_val: &nodedb_types::Value) -> Ordering {
211 match read_value(buf, offset) {
212 Some(field_val) => field_val.cmp_coerced(filter_val),
213 None => Ordering::Equal,
214 }
215}
216
217#[inline]
219fn str_match(
220 buf: &[u8],
221 offset: usize,
222 pattern_val: &nodedb_types::Value,
223 icase: bool,
224 negate: bool,
225) -> bool {
226 let result = if let (Some(s), Some(pattern)) = (read_str(buf, offset), pattern_val.as_str()) {
227 sql_like_match(s, pattern, icase)
228 } else {
229 false
230 };
231 if negate { !result } else { result }
232}
233
234fn array_any(buf: &[u8], start: usize, mut pred: impl FnMut(usize) -> bool) -> bool {
236 let Some((count, mut pos)) = array_header(buf, start) else {
237 return false;
238 };
239 for _ in 0..count {
240 if pred(pos) {
241 return true;
242 }
243 let Some(next) = skip_value(buf, pos) else {
244 return false;
245 };
246 pos = next;
247 }
248 false
249}
250
251#[cfg(test)]
252mod tests {
253 use super::*;
254 use serde_json::json;
255
256 fn encode(v: &serde_json::Value) -> Vec<u8> {
257 nodedb_types::json_msgpack::json_to_msgpack(v).expect("encode")
258 }
259
260 fn filter(field: &str, op: &str, value: nodedb_types::Value) -> ScanFilter {
261 ScanFilter {
262 field: field.into(),
263 op: op.into(),
264 value,
265 clauses: vec![],
266 }
267 }
268
269 #[test]
270 fn eq_integer() {
271 let doc = encode(&json!({"age": 25}));
272 assert!(filter("age", "eq", nodedb_types::Value::Integer(25)).matches_binary(&doc));
273 assert!(!filter("age", "eq", nodedb_types::Value::Integer(30)).matches_binary(&doc));
274 }
275
276 #[test]
277 fn eq_string() {
278 let doc = encode(&json!({"name": "alice"}));
279 assert!(
280 filter("name", "eq", nodedb_types::Value::String("alice".into())).matches_binary(&doc)
281 );
282 }
283
284 #[test]
285 fn eq_coercion_int_vs_string() {
286 let doc = encode(&json!({"age": 25}));
287 assert!(filter("age", "eq", nodedb_types::Value::String("25".into())).matches_binary(&doc));
288 }
289
290 #[test]
291 fn eq_coercion_string_vs_int() {
292 let doc = encode(&json!({"score": "90"}));
293 assert!(filter("score", "eq", nodedb_types::Value::Integer(90)).matches_binary(&doc));
294 }
295
296 #[test]
297 fn ne() {
298 let doc = encode(&json!({"x": 1}));
299 assert!(filter("x", "ne", nodedb_types::Value::Integer(2)).matches_binary(&doc));
300 assert!(!filter("x", "ne", nodedb_types::Value::Integer(1)).matches_binary(&doc));
301 }
302
303 #[test]
304 fn gt_lt() {
305 let doc = encode(&json!({"v": 10}));
306 assert!(filter("v", "gt", nodedb_types::Value::Integer(5)).matches_binary(&doc));
307 assert!(!filter("v", "gt", nodedb_types::Value::Integer(15)).matches_binary(&doc));
308 assert!(filter("v", "lt", nodedb_types::Value::Integer(15)).matches_binary(&doc));
309 assert!(!filter("v", "lt", nodedb_types::Value::Integer(5)).matches_binary(&doc));
310 }
311
312 #[test]
313 fn gte_lte() {
314 let doc = encode(&json!({"v": 10}));
315 assert!(filter("v", "gte", nodedb_types::Value::Integer(10)).matches_binary(&doc));
316 assert!(filter("v", "gte", nodedb_types::Value::Integer(5)).matches_binary(&doc));
317 assert!(!filter("v", "gte", nodedb_types::Value::Integer(15)).matches_binary(&doc));
318 assert!(filter("v", "lte", nodedb_types::Value::Integer(10)).matches_binary(&doc));
319 }
320
321 #[test]
322 fn is_null_not_null() {
323 let doc = encode(&json!({"a": null, "b": 1}));
324 assert!(filter("a", "is_null", nodedb_types::Value::Null).matches_binary(&doc));
325 assert!(!filter("b", "is_null", nodedb_types::Value::Null).matches_binary(&doc));
326 assert!(filter("b", "is_not_null", nodedb_types::Value::Null).matches_binary(&doc));
327 }
328
329 #[test]
330 fn missing_field_is_null() {
331 let doc = encode(&json!({"x": 1}));
332 assert!(filter("missing", "is_null", nodedb_types::Value::Null).matches_binary(&doc));
333 }
334
335 #[test]
336 fn contains_str() {
337 let doc = encode(&json!({"msg": "hello world"}));
338 assert!(
339 filter(
340 "msg",
341 "contains",
342 nodedb_types::Value::String("world".into())
343 )
344 .matches_binary(&doc)
345 );
346 }
347
348 #[test]
349 fn like_ilike() {
350 let doc = encode(&json!({"name": "Alice"}));
351 assert!(
352 filter("name", "like", nodedb_types::Value::String("Ali%".into())).matches_binary(&doc)
353 );
354 assert!(
355 !filter("name", "like", nodedb_types::Value::String("ali%".into()))
356 .matches_binary(&doc)
357 );
358 assert!(
359 filter("name", "ilike", nodedb_types::Value::String("ali%".into()))
360 .matches_binary(&doc)
361 );
362 assert!(
363 filter(
364 "name",
365 "not_like",
366 nodedb_types::Value::String("Bob%".into())
367 )
368 .matches_binary(&doc)
369 );
370 }
371
372 #[test]
373 fn in_not_in() {
374 let doc = encode(&json!({"status": "active"}));
375 let vals = nodedb_types::Value::Array(vec![
376 nodedb_types::Value::String("active".into()),
377 nodedb_types::Value::String("pending".into()),
378 ]);
379 assert!(
380 ScanFilter {
381 field: "status".into(),
382 op: "in".into(),
383 value: vals.clone(),
384 clauses: vec![]
385 }
386 .matches_binary(&doc)
387 );
388
389 let doc2 = encode(&json!({"status": "deleted"}));
390 assert!(
391 ScanFilter {
392 field: "status".into(),
393 op: "not_in".into(),
394 value: vals,
395 clauses: vec![]
396 }
397 .matches_binary(&doc2)
398 );
399 }
400
401 #[test]
402 fn array_contains() {
403 let doc = encode(&json!({"tags": ["rust", "db", "fast"]}));
404 assert!(
405 filter(
406 "tags",
407 "array_contains",
408 nodedb_types::Value::String("rust".into())
409 )
410 .matches_binary(&doc)
411 );
412 assert!(
413 !filter(
414 "tags",
415 "array_contains",
416 nodedb_types::Value::String("slow".into())
417 )
418 .matches_binary(&doc)
419 );
420 }
421
422 #[test]
423 fn array_contains_all() {
424 let doc = encode(&json!({"tags": ["a", "b", "c"]}));
425 let needles = nodedb_types::Value::Array(vec![
426 nodedb_types::Value::String("a".into()),
427 nodedb_types::Value::String("c".into()),
428 ]);
429 assert!(
430 ScanFilter {
431 field: "tags".into(),
432 op: "array_contains_all".into(),
433 value: needles,
434 clauses: vec![]
435 }
436 .matches_binary(&doc)
437 );
438 }
439
440 #[test]
441 fn array_overlap() {
442 let doc = encode(&json!({"tags": ["x", "y"]}));
443 let needles = nodedb_types::Value::Array(vec![
444 nodedb_types::Value::String("y".into()),
445 nodedb_types::Value::String("z".into()),
446 ]);
447 assert!(
448 ScanFilter {
449 field: "tags".into(),
450 op: "array_overlap".into(),
451 value: needles,
452 clauses: vec![]
453 }
454 .matches_binary(&doc)
455 );
456 }
457
458 #[test]
459 fn or_clauses() {
460 let doc = encode(&json!({"x": 5}));
461 let f = ScanFilter {
462 field: String::new(),
463 op: "or".into(),
464 value: nodedb_types::Value::Null,
465 clauses: vec![
466 vec![filter("x", "eq", nodedb_types::Value::Integer(10))],
467 vec![filter("x", "eq", nodedb_types::Value::Integer(5))],
468 ],
469 };
470 assert!(f.matches_binary(&doc));
471 }
472
473 #[test]
474 fn match_all() {
475 let doc = encode(&json!({"any": "thing"}));
476 assert!(filter("", "match_all", nodedb_types::Value::Null).matches_binary(&doc));
477 }
478
479 #[test]
480 fn float_comparison() {
481 let doc = encode(&json!({"temp": 36.6}));
482 assert!(filter("temp", "gt", nodedb_types::Value::Float(30.0)).matches_binary(&doc));
483 assert!(filter("temp", "lt", nodedb_types::Value::Float(40.0)).matches_binary(&doc));
484 }
485
486 #[test]
487 fn bool_eq() {
488 let doc = encode(&json!({"active": true}));
489 assert!(filter("active", "eq", nodedb_types::Value::Bool(true)).matches_binary(&doc));
490 assert!(!filter("active", "eq", nodedb_types::Value::Bool(false)).matches_binary(&doc));
491 }
492
493 #[test]
494 fn gt_coercion_string_field() {
495 let doc = encode(&json!({"score": "90"}));
496 assert!(filter("score", "gt", nodedb_types::Value::Integer(80)).matches_binary(&doc));
497 }
498
499 #[test]
502 fn indexed_matches_same_as_sequential() {
503 let doc = encode(&json!({"a": 1, "b": "hello", "c": true, "d": null}));
504 let idx = FieldIndex::build(&doc, 0).unwrap();
505
506 let filters = vec![
507 filter("a", "eq", nodedb_types::Value::Integer(1)),
508 filter("b", "contains", nodedb_types::Value::String("ell".into())),
509 filter("c", "eq", nodedb_types::Value::Bool(true)),
510 filter("d", "is_null", nodedb_types::Value::Null),
511 filter("missing", "is_null", nodedb_types::Value::Null),
512 ];
513
514 for f in &filters {
515 assert_eq!(
516 f.matches_binary(&doc),
517 f.matches_binary_indexed(&doc, &idx),
518 "mismatch for field={} op={:?}",
519 f.field,
520 f.op
521 );
522 }
523 }
524}