1use std::cmp::Ordering;
11
12use crate::msgpack_scan::field::extract_field;
13use crate::msgpack_scan::index::FieldIndex;
14use crate::msgpack_scan::reader::{
15 array_header, map_header, read_null, read_str, read_value, skip_value,
16};
17use crate::scan_filter::like::sql_like_match;
18use crate::scan_filter::{FilterOp, ScanFilter};
19
20impl ScanFilter {
21 pub fn matches_binary(&self, doc: &[u8]) -> bool {
25 match self.op {
26 FilterOp::MatchAll | FilterOp::Exists | FilterOp::NotExists => return true,
27 FilterOp::Or => {
28 return self
29 .clauses
30 .iter()
31 .any(|clause| clause.iter().all(|f| f.matches_binary(doc)));
32 }
33 FilterOp::Expr => {
34 return match (self.expr.as_ref(), nodedb_types::value_from_msgpack(doc)) {
35 (Some(expr), Ok(value)) => crate::value_ops::is_truthy(&expr.eval(&value)),
36 _ => false,
37 };
38 }
39 _ => {}
40 }
41
42 let (start, end) = match extract_field(doc, 0, &self.field) {
43 Some(r) => r,
44 None => {
45 let suffix = format!(".{}", self.field);
47 match find_field_by_suffix(doc, &suffix) {
48 Some(r) => r,
49 None => return self.op == FilterOp::IsNull,
50 }
51 }
52 };
53
54 eval_op(self, doc, start, end)
55 }
56
57 pub fn matches_binary_indexed(&self, doc: &[u8], idx: &FieldIndex) -> bool {
61 match self.op {
62 FilterOp::MatchAll | FilterOp::Exists | FilterOp::NotExists => return true,
63 FilterOp::Or => {
64 return self
65 .clauses
66 .iter()
67 .any(|clause| clause.iter().all(|f| f.matches_binary_indexed(doc, idx)));
68 }
69 FilterOp::Expr => {
70 return match (self.expr.as_ref(), nodedb_types::value_from_msgpack(doc)) {
71 (Some(expr), Ok(value)) => crate::value_ops::is_truthy(&expr.eval(&value)),
72 _ => false,
73 };
74 }
75 _ => {}
76 }
77
78 let (start, end) = match idx.get(&self.field) {
79 Some(r) => r,
80 None => return self.op == FilterOp::IsNull,
81 };
82
83 eval_op(self, doc, start, end)
84 }
85}
86
87fn eval_op(filter: &ScanFilter, doc: &[u8], start: usize, _end: usize) -> bool {
89 match filter.op {
90 FilterOp::IsNull => read_null(doc, start),
91 FilterOp::IsNotNull => !read_null(doc, start),
92 FilterOp::Eq => eq_value(doc, start, &filter.value),
93 FilterOp::Ne => !eq_value(doc, start, &filter.value),
94 FilterOp::Gt => cmp_value(doc, start, &filter.value) == Ordering::Greater,
95 FilterOp::Gte => {
96 let c = cmp_value(doc, start, &filter.value);
97 c == Ordering::Greater || c == Ordering::Equal
98 }
99 FilterOp::Lt => cmp_value(doc, start, &filter.value) == Ordering::Less,
100 FilterOp::Lte => {
101 let c = cmp_value(doc, start, &filter.value);
102 c == Ordering::Less || c == Ordering::Equal
103 }
104 FilterOp::Contains => {
105 if let (Some(s), Some(pattern)) = (read_str(doc, start), filter.value.as_str()) {
106 s.contains(pattern)
107 } else {
108 false
109 }
110 }
111 FilterOp::Like => str_match(doc, start, &filter.value, false, false),
112 FilterOp::NotLike => str_match(doc, start, &filter.value, false, true),
113 FilterOp::Ilike => str_match(doc, start, &filter.value, true, false),
114 FilterOp::NotIlike => str_match(doc, start, &filter.value, true, true),
115 FilterOp::In => {
116 if let Some(mut iter) = filter.value.as_array_iter() {
117 iter.any(|v| eq_value(doc, start, v))
118 } else {
119 false
120 }
121 }
122 FilterOp::NotIn => {
123 if let Some(mut iter) = filter.value.as_array_iter() {
124 !iter.any(|v| eq_value(doc, start, v))
125 } else {
126 true
127 }
128 }
129 FilterOp::ArrayContains => array_any(doc, start, |elem_start| {
130 eq_value(doc, elem_start, &filter.value)
131 }),
132 FilterOp::ArrayContainsAll => {
133 if let Some(mut needles) = filter.value.as_array_iter() {
134 needles.all(|needle| {
135 array_any(doc, start, |elem_start| eq_value(doc, elem_start, needle))
136 })
137 } else {
138 false
139 }
140 }
141 FilterOp::ArrayOverlap => {
142 if let Some(mut needles) = filter.value.as_array_iter() {
143 needles.any(|needle| {
144 array_any(doc, start, |elem_start| eq_value(doc, elem_start, needle))
145 })
146 } else {
147 false
148 }
149 }
150 FilterOp::GtColumn
152 | FilterOp::GteColumn
153 | FilterOp::LtColumn
154 | FilterOp::LteColumn
155 | FilterOp::EqColumn
156 | FilterOp::NeColumn => {
157 let other_col = match &filter.value {
158 nodedb_types::Value::String(s) => s.as_str(),
159 _ => return false,
160 };
161 let other_range = extract_field(doc, 0, other_col).or_else(|| {
163 let suffix = format!(".{other_col}");
164 find_field_by_suffix(doc, &suffix)
165 });
166 let Some((other_start, _)) = other_range else {
167 return false;
168 };
169 let left = read_value(doc, start).unwrap_or(nodedb_types::Value::Null);
171 let right = read_value(doc, other_start).unwrap_or(nodedb_types::Value::Null);
172 match filter.op {
173 FilterOp::GtColumn => left.cmp_coerced(&right) == Ordering::Greater,
174 FilterOp::GteColumn => left.cmp_coerced(&right) != Ordering::Less,
175 FilterOp::LtColumn => left.cmp_coerced(&right) == Ordering::Less,
176 FilterOp::LteColumn => left.cmp_coerced(&right) != Ordering::Greater,
177 FilterOp::EqColumn => left.eq_coerced(&right),
178 FilterOp::NeColumn => !left.eq_coerced(&right),
179 _ => false,
180 }
181 }
182 _ => false,
183 }
184}
185
186fn find_field_by_suffix(doc: &[u8], suffix: &str) -> Option<(usize, usize)> {
188 let (count, mut pos) = map_header(doc, 0)?;
189 for _ in 0..count {
190 let key = read_str(doc, pos);
191 let key_end = skip_value(doc, pos)?;
192 let val_start = key_end;
193 let val_end = skip_value(doc, val_start)?;
194 if let Some(k) = key
195 && k.ends_with(suffix)
196 {
197 return Some((val_start, val_end));
198 }
199 pos = val_end;
200 }
201 None
202}
203
204#[inline]
209fn eq_value(buf: &[u8], offset: usize, filter_val: &nodedb_types::Value) -> bool {
210 if read_null(buf, offset) {
211 return filter_val.is_null();
212 }
213 match read_value(buf, offset) {
214 Some(field_val) => filter_val.eq_coerced(&field_val),
215 None => false,
216 }
217}
218
219#[inline]
224fn cmp_value(buf: &[u8], offset: usize, filter_val: &nodedb_types::Value) -> Ordering {
225 match read_value(buf, offset) {
226 Some(field_val) => field_val.cmp_coerced(filter_val),
227 None => Ordering::Equal,
228 }
229}
230
231#[inline]
233fn str_match(
234 buf: &[u8],
235 offset: usize,
236 pattern_val: &nodedb_types::Value,
237 icase: bool,
238 negate: bool,
239) -> bool {
240 let result = if let (Some(s), Some(pattern)) = (read_str(buf, offset), pattern_val.as_str()) {
241 sql_like_match(s, pattern, icase)
242 } else {
243 false
244 };
245 if negate { !result } else { result }
246}
247
248fn array_any(buf: &[u8], start: usize, mut pred: impl FnMut(usize) -> bool) -> bool {
250 let Some((count, mut pos)) = array_header(buf, start) else {
251 return false;
252 };
253 for _ in 0..count {
254 if pred(pos) {
255 return true;
256 }
257 let Some(next) = skip_value(buf, pos) else {
258 return false;
259 };
260 pos = next;
261 }
262 false
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_types::Value;
    use serde_json::json;

    /// Encodes a JSON value as the MessagePack bytes the filters operate on.
    fn encode(v: &serde_json::Value) -> Vec<u8> {
        nodedb_types::json_msgpack::json_to_msgpack(v).expect("encode")
    }

    /// Builds a single-field filter with no clauses and no expression.
    fn filter(field: &str, op: &str, value: Value) -> ScanFilter {
        ScanFilter {
            field: field.into(),
            op: op.into(),
            value,
            clauses: vec![],
            expr: None,
        }
    }

    #[test]
    fn eq_integer() {
        let doc = encode(&json!({"age": 25}));
        assert!(filter("age", "eq", Value::Integer(25)).matches_binary(&doc));
        assert!(!filter("age", "eq", Value::Integer(30)).matches_binary(&doc));
    }

    #[test]
    fn eq_string() {
        let doc = encode(&json!({"name": "alice"}));
        assert!(filter("name", "eq", Value::String("alice".into())).matches_binary(&doc));
    }

    #[test]
    fn eq_coercion_int_vs_string() {
        let doc = encode(&json!({"age": 25}));
        assert!(filter("age", "eq", Value::String("25".into())).matches_binary(&doc));
    }

    #[test]
    fn eq_coercion_string_vs_int() {
        let doc = encode(&json!({"score": "90"}));
        assert!(filter("score", "eq", Value::Integer(90)).matches_binary(&doc));
    }

    #[test]
    fn ne() {
        let doc = encode(&json!({"x": 1}));
        assert!(filter("x", "ne", Value::Integer(2)).matches_binary(&doc));
        assert!(!filter("x", "ne", Value::Integer(1)).matches_binary(&doc));
    }

    #[test]
    fn gt_lt() {
        let doc = encode(&json!({"v": 10}));
        assert!(filter("v", "gt", Value::Integer(5)).matches_binary(&doc));
        assert!(!filter("v", "gt", Value::Integer(15)).matches_binary(&doc));
        assert!(filter("v", "lt", Value::Integer(15)).matches_binary(&doc));
        assert!(!filter("v", "lt", Value::Integer(5)).matches_binary(&doc));
    }

    #[test]
    fn gte_lte() {
        let doc = encode(&json!({"v": 10}));
        assert!(filter("v", "gte", Value::Integer(10)).matches_binary(&doc));
        assert!(filter("v", "gte", Value::Integer(5)).matches_binary(&doc));
        assert!(!filter("v", "gte", Value::Integer(15)).matches_binary(&doc));
        assert!(filter("v", "lte", Value::Integer(10)).matches_binary(&doc));
        // Negative case for `lte`, mirroring the one for `gte` above.
        assert!(!filter("v", "lte", Value::Integer(5)).matches_binary(&doc));
    }

    #[test]
    fn gt_coercion_string_field() {
        let doc = encode(&json!({"score": "90"}));
        assert!(filter("score", "gt", Value::Integer(80)).matches_binary(&doc));
    }

    #[test]
    fn is_null_not_null() {
        let doc = encode(&json!({"a": null, "b": 1}));
        assert!(filter("a", "is_null", Value::Null).matches_binary(&doc));
        assert!(!filter("b", "is_null", Value::Null).matches_binary(&doc));
        assert!(filter("b", "is_not_null", Value::Null).matches_binary(&doc));
        assert!(!filter("a", "is_not_null", Value::Null).matches_binary(&doc));
    }

    #[test]
    fn missing_field_is_null() {
        let doc = encode(&json!({"x": 1}));
        assert!(filter("missing", "is_null", Value::Null).matches_binary(&doc));
        // A missing field satisfies `is_null` only.
        assert!(!filter("missing", "is_not_null", Value::Null).matches_binary(&doc));
    }

    #[test]
    fn contains_str() {
        let doc = encode(&json!({"msg": "hello world"}));
        assert!(filter("msg", "contains", Value::String("world".into())).matches_binary(&doc));
    }

    #[test]
    fn like_ilike() {
        let doc = encode(&json!({"name": "Alice"}));
        assert!(filter("name", "like", Value::String("Ali%".into())).matches_binary(&doc));
        // `like` is case-sensitive, `ilike` is not.
        assert!(!filter("name", "like", Value::String("ali%".into())).matches_binary(&doc));
        assert!(filter("name", "ilike", Value::String("ali%".into())).matches_binary(&doc));
        assert!(filter("name", "not_like", Value::String("Bob%".into())).matches_binary(&doc));
    }

    #[test]
    fn in_not_in() {
        let vals = Value::Array(vec![
            Value::String("active".into()),
            Value::String("pending".into()),
        ]);

        let doc = encode(&json!({"status": "active"}));
        assert!(filter("status", "in", vals.clone()).matches_binary(&doc));

        let doc2 = encode(&json!({"status": "deleted"}));
        assert!(filter("status", "not_in", vals).matches_binary(&doc2));
    }

    #[test]
    fn array_contains() {
        let doc = encode(&json!({"tags": ["rust", "db", "fast"]}));
        assert!(
            filter("tags", "array_contains", Value::String("rust".into())).matches_binary(&doc)
        );
        assert!(
            !filter("tags", "array_contains", Value::String("slow".into())).matches_binary(&doc)
        );
    }

    #[test]
    fn array_contains_all() {
        let doc = encode(&json!({"tags": ["a", "b", "c"]}));
        let needles = Value::Array(vec![Value::String("a".into()), Value::String("c".into())]);
        assert!(filter("tags", "array_contains_all", needles).matches_binary(&doc));
    }

    #[test]
    fn array_overlap() {
        let doc = encode(&json!({"tags": ["x", "y"]}));
        let needles = Value::Array(vec![Value::String("y".into()), Value::String("z".into())]);
        assert!(filter("tags", "array_overlap", needles).matches_binary(&doc));
    }

    #[test]
    fn or_clauses() {
        let doc = encode(&json!({"x": 5}));
        let f = ScanFilter {
            field: String::new(),
            op: "or".into(),
            value: Value::Null,
            clauses: vec![
                vec![filter("x", "eq", Value::Integer(10))],
                vec![filter("x", "eq", Value::Integer(5))],
            ],
            expr: None,
        };
        assert!(f.matches_binary(&doc));
    }

    #[test]
    fn match_all() {
        let doc = encode(&json!({"any": "thing"}));
        assert!(filter("", "match_all", Value::Null).matches_binary(&doc));
    }

    #[test]
    fn float_comparison() {
        let doc = encode(&json!({"temp": 36.6}));
        assert!(filter("temp", "gt", Value::Float(30.0)).matches_binary(&doc));
        assert!(filter("temp", "lt", Value::Float(40.0)).matches_binary(&doc));
    }

    #[test]
    fn bool_eq() {
        let doc = encode(&json!({"active": true}));
        assert!(filter("active", "eq", Value::Bool(true)).matches_binary(&doc));
        assert!(!filter("active", "eq", Value::Bool(false)).matches_binary(&doc));
    }

    /// The indexed and the sequential evaluation paths must agree.
    #[test]
    fn indexed_matches_same_as_sequential() {
        let doc = encode(&json!({"a": 1, "b": "hello", "c": true, "d": null}));
        let idx = FieldIndex::build(&doc, 0).unwrap();

        let filters = vec![
            filter("a", "eq", Value::Integer(1)),
            filter("b", "contains", Value::String("ell".into())),
            filter("c", "eq", Value::Bool(true)),
            filter("d", "is_null", Value::Null),
            filter("missing", "is_null", Value::Null),
        ];

        for f in &filters {
            assert_eq!(
                f.matches_binary(&doc),
                f.matches_binary_indexed(&doc, &idx),
                "mismatch for field={} op={:?}",
                f.field,
                f.op
            );
        }
    }
}