1use std::cmp::Ordering;
9
10use crate::msgpack_scan::field::extract_field;
11use crate::msgpack_scan::index::FieldIndex;
12use crate::msgpack_scan::reader::{
13 array_header, map_header, read_null, read_str, read_value, skip_value,
14};
15use crate::scan_filter::like::sql_like_match;
16use crate::scan_filter::{FilterOp, ScanFilter};
17
18impl ScanFilter {
19 pub fn matches_binary(&self, doc: &[u8]) -> bool {
23 match self.op {
24 FilterOp::MatchAll | FilterOp::Exists | FilterOp::NotExists => return true,
25 FilterOp::Or => {
26 return self
27 .clauses
28 .iter()
29 .any(|clause| clause.iter().all(|f| f.matches_binary(doc)));
30 }
31 FilterOp::Expr => {
32 return match (self.expr.as_ref(), nodedb_types::value_from_msgpack(doc)) {
33 (Some(expr), Ok(value)) => crate::value_ops::is_truthy(&expr.eval(&value)),
34 _ => false,
35 };
36 }
37 _ => {}
38 }
39
40 let (start, end) = match extract_field(doc, 0, &self.field) {
41 Some(r) => r,
42 None => {
43 let suffix = format!(".{}", self.field);
45 match find_field_by_suffix(doc, &suffix) {
46 Some(r) => r,
47 None => return self.op == FilterOp::IsNull,
48 }
49 }
50 };
51
52 eval_op(self, doc, start, end)
53 }
54
55 pub fn matches_binary_indexed(&self, doc: &[u8], idx: &FieldIndex) -> bool {
59 match self.op {
60 FilterOp::MatchAll | FilterOp::Exists | FilterOp::NotExists => return true,
61 FilterOp::Or => {
62 return self
63 .clauses
64 .iter()
65 .any(|clause| clause.iter().all(|f| f.matches_binary_indexed(doc, idx)));
66 }
67 FilterOp::Expr => {
68 return match (self.expr.as_ref(), nodedb_types::value_from_msgpack(doc)) {
69 (Some(expr), Ok(value)) => crate::value_ops::is_truthy(&expr.eval(&value)),
70 _ => false,
71 };
72 }
73 _ => {}
74 }
75
76 let (start, end) = match idx.get(&self.field) {
77 Some(r) => r,
78 None => return self.op == FilterOp::IsNull,
79 };
80
81 eval_op(self, doc, start, end)
82 }
83}
84
85fn eval_op(filter: &ScanFilter, doc: &[u8], start: usize, _end: usize) -> bool {
87 match filter.op {
88 FilterOp::IsNull => read_null(doc, start),
89 FilterOp::IsNotNull => !read_null(doc, start),
90 FilterOp::Eq => eq_value(doc, start, &filter.value),
91 FilterOp::Ne => !eq_value(doc, start, &filter.value),
92 FilterOp::Gt => cmp_value(doc, start, &filter.value) == Ordering::Greater,
93 FilterOp::Gte => {
94 let c = cmp_value(doc, start, &filter.value);
95 c == Ordering::Greater || c == Ordering::Equal
96 }
97 FilterOp::Lt => cmp_value(doc, start, &filter.value) == Ordering::Less,
98 FilterOp::Lte => {
99 let c = cmp_value(doc, start, &filter.value);
100 c == Ordering::Less || c == Ordering::Equal
101 }
102 FilterOp::Contains => {
103 if let (Some(s), Some(pattern)) = (read_str(doc, start), filter.value.as_str()) {
104 s.contains(pattern)
105 } else {
106 false
107 }
108 }
109 FilterOp::Like => str_match(doc, start, &filter.value, false, false),
110 FilterOp::NotLike => str_match(doc, start, &filter.value, false, true),
111 FilterOp::Ilike => str_match(doc, start, &filter.value, true, false),
112 FilterOp::NotIlike => str_match(doc, start, &filter.value, true, true),
113 FilterOp::In => {
114 if let Some(mut iter) = filter.value.as_array_iter() {
115 iter.any(|v| eq_value(doc, start, v))
116 } else {
117 false
118 }
119 }
120 FilterOp::NotIn => {
121 if let Some(mut iter) = filter.value.as_array_iter() {
122 !iter.any(|v| eq_value(doc, start, v))
123 } else {
124 true
125 }
126 }
127 FilterOp::ArrayContains => array_any(doc, start, |elem_start| {
128 eq_value(doc, elem_start, &filter.value)
129 }),
130 FilterOp::ArrayContainsAll => {
131 if let Some(mut needles) = filter.value.as_array_iter() {
132 needles.all(|needle| {
133 array_any(doc, start, |elem_start| eq_value(doc, elem_start, needle))
134 })
135 } else {
136 false
137 }
138 }
139 FilterOp::ArrayOverlap => {
140 if let Some(mut needles) = filter.value.as_array_iter() {
141 needles.any(|needle| {
142 array_any(doc, start, |elem_start| eq_value(doc, elem_start, needle))
143 })
144 } else {
145 false
146 }
147 }
148 FilterOp::GtColumn
150 | FilterOp::GteColumn
151 | FilterOp::LtColumn
152 | FilterOp::LteColumn
153 | FilterOp::EqColumn
154 | FilterOp::NeColumn => {
155 let other_col = match &filter.value {
156 nodedb_types::Value::String(s) => s.as_str(),
157 _ => return false,
158 };
159 let other_range = extract_field(doc, 0, other_col).or_else(|| {
161 let suffix = format!(".{other_col}");
162 find_field_by_suffix(doc, &suffix)
163 });
164 let Some((other_start, _)) = other_range else {
165 return false;
166 };
167 let left = read_value(doc, start).unwrap_or(nodedb_types::Value::Null);
169 let right = read_value(doc, other_start).unwrap_or(nodedb_types::Value::Null);
170 match filter.op {
171 FilterOp::GtColumn => left.cmp_coerced(&right) == Ordering::Greater,
172 FilterOp::GteColumn => left.cmp_coerced(&right) != Ordering::Less,
173 FilterOp::LtColumn => left.cmp_coerced(&right) == Ordering::Less,
174 FilterOp::LteColumn => left.cmp_coerced(&right) != Ordering::Greater,
175 FilterOp::EqColumn => left.eq_coerced(&right),
176 FilterOp::NeColumn => !left.eq_coerced(&right),
177 _ => false,
178 }
179 }
180 _ => false,
181 }
182}
183
184fn find_field_by_suffix(doc: &[u8], suffix: &str) -> Option<(usize, usize)> {
186 let (count, mut pos) = map_header(doc, 0)?;
187 for _ in 0..count {
188 let key = read_str(doc, pos);
189 let key_end = skip_value(doc, pos)?;
190 let val_start = key_end;
191 let val_end = skip_value(doc, val_start)?;
192 if let Some(k) = key
193 && k.ends_with(suffix)
194 {
195 return Some((val_start, val_end));
196 }
197 pos = val_end;
198 }
199 None
200}
201
202#[inline]
207fn eq_value(buf: &[u8], offset: usize, filter_val: &nodedb_types::Value) -> bool {
208 if read_null(buf, offset) {
209 return filter_val.is_null();
210 }
211 match read_value(buf, offset) {
212 Some(field_val) => filter_val.eq_coerced(&field_val),
213 None => false,
214 }
215}
216
217#[inline]
222fn cmp_value(buf: &[u8], offset: usize, filter_val: &nodedb_types::Value) -> Ordering {
223 match read_value(buf, offset) {
224 Some(field_val) => field_val.cmp_coerced(filter_val),
225 None => Ordering::Equal,
226 }
227}
228
229#[inline]
231fn str_match(
232 buf: &[u8],
233 offset: usize,
234 pattern_val: &nodedb_types::Value,
235 icase: bool,
236 negate: bool,
237) -> bool {
238 let result = if let (Some(s), Some(pattern)) = (read_str(buf, offset), pattern_val.as_str()) {
239 sql_like_match(s, pattern, icase)
240 } else {
241 false
242 };
243 if negate { !result } else { result }
244}
245
246fn array_any(buf: &[u8], start: usize, mut pred: impl FnMut(usize) -> bool) -> bool {
248 let Some((count, mut pos)) = array_header(buf, start) else {
249 return false;
250 };
251 for _ in 0..count {
252 if pred(pos) {
253 return true;
254 }
255 let Some(next) = skip_value(buf, pos) else {
256 return false;
257 };
258 pos = next;
259 }
260 false
261}
262
263#[cfg(test)]
264mod tests {
265 use super::*;
266 use serde_json::json;
267
268 fn encode(v: &serde_json::Value) -> Vec<u8> {
269 nodedb_types::json_msgpack::json_to_msgpack(v).expect("encode")
270 }
271
272 fn filter(field: &str, op: &str, value: nodedb_types::Value) -> ScanFilter {
273 ScanFilter {
274 field: field.into(),
275 op: op.into(),
276 value,
277 clauses: vec![],
278 expr: None,
279 }
280 }
281
282 #[test]
283 fn eq_integer() {
284 let doc = encode(&json!({"age": 25}));
285 assert!(filter("age", "eq", nodedb_types::Value::Integer(25)).matches_binary(&doc));
286 assert!(!filter("age", "eq", nodedb_types::Value::Integer(30)).matches_binary(&doc));
287 }
288
289 #[test]
290 fn eq_coerces_string_to_integer() {
291 let doc = encode(&json!({"age": 25}));
292 assert!(filter("age", "eq", nodedb_types::Value::String("25".into())).matches_binary(&doc));
293 }
294
295 #[test]
296 fn gt_coerces_string_to_integer() {
297 let doc = encode(&json!({"score": "90"}));
298 assert!(filter("score", "gt", nodedb_types::Value::Integer(80)).matches_binary(&doc));
299 }
300
301 #[test]
302 fn eq_string() {
303 let doc = encode(&json!({"name": "alice"}));
304 assert!(
305 filter("name", "eq", nodedb_types::Value::String("alice".into())).matches_binary(&doc)
306 );
307 }
308
309 #[test]
310 fn eq_coercion_int_vs_string() {
311 let doc = encode(&json!({"age": 25}));
312 assert!(filter("age", "eq", nodedb_types::Value::String("25".into())).matches_binary(&doc));
313 }
314
315 #[test]
316 fn eq_coercion_string_vs_int() {
317 let doc = encode(&json!({"score": "90"}));
318 assert!(filter("score", "eq", nodedb_types::Value::Integer(90)).matches_binary(&doc));
319 }
320
321 #[test]
322 fn ne() {
323 let doc = encode(&json!({"x": 1}));
324 assert!(filter("x", "ne", nodedb_types::Value::Integer(2)).matches_binary(&doc));
325 assert!(!filter("x", "ne", nodedb_types::Value::Integer(1)).matches_binary(&doc));
326 }
327
328 #[test]
329 fn gt_lt() {
330 let doc = encode(&json!({"v": 10}));
331 assert!(filter("v", "gt", nodedb_types::Value::Integer(5)).matches_binary(&doc));
332 assert!(!filter("v", "gt", nodedb_types::Value::Integer(15)).matches_binary(&doc));
333 assert!(filter("v", "lt", nodedb_types::Value::Integer(15)).matches_binary(&doc));
334 assert!(!filter("v", "lt", nodedb_types::Value::Integer(5)).matches_binary(&doc));
335 }
336
337 #[test]
338 fn gte_lte() {
339 let doc = encode(&json!({"v": 10}));
340 assert!(filter("v", "gte", nodedb_types::Value::Integer(10)).matches_binary(&doc));
341 assert!(filter("v", "gte", nodedb_types::Value::Integer(5)).matches_binary(&doc));
342 assert!(!filter("v", "gte", nodedb_types::Value::Integer(15)).matches_binary(&doc));
343 assert!(filter("v", "lte", nodedb_types::Value::Integer(10)).matches_binary(&doc));
344 }
345
346 #[test]
347 fn is_null_not_null() {
348 let doc = encode(&json!({"a": null, "b": 1}));
349 assert!(filter("a", "is_null", nodedb_types::Value::Null).matches_binary(&doc));
350 assert!(!filter("b", "is_null", nodedb_types::Value::Null).matches_binary(&doc));
351 assert!(filter("b", "is_not_null", nodedb_types::Value::Null).matches_binary(&doc));
352 }
353
354 #[test]
355 fn missing_field_is_null() {
356 let doc = encode(&json!({"x": 1}));
357 assert!(filter("missing", "is_null", nodedb_types::Value::Null).matches_binary(&doc));
358 }
359
360 #[test]
361 fn contains_str() {
362 let doc = encode(&json!({"msg": "hello world"}));
363 assert!(
364 filter(
365 "msg",
366 "contains",
367 nodedb_types::Value::String("world".into())
368 )
369 .matches_binary(&doc)
370 );
371 }
372
373 #[test]
374 fn like_ilike() {
375 let doc = encode(&json!({"name": "Alice"}));
376 assert!(
377 filter("name", "like", nodedb_types::Value::String("Ali%".into())).matches_binary(&doc)
378 );
379 assert!(
380 !filter("name", "like", nodedb_types::Value::String("ali%".into()))
381 .matches_binary(&doc)
382 );
383 assert!(
384 filter("name", "ilike", nodedb_types::Value::String("ali%".into()))
385 .matches_binary(&doc)
386 );
387 assert!(
388 filter(
389 "name",
390 "not_like",
391 nodedb_types::Value::String("Bob%".into())
392 )
393 .matches_binary(&doc)
394 );
395 }
396
397 #[test]
398 fn in_not_in() {
399 let doc = encode(&json!({"status": "active"}));
400 let vals = nodedb_types::Value::Array(vec![
401 nodedb_types::Value::String("active".into()),
402 nodedb_types::Value::String("pending".into()),
403 ]);
404 assert!(
405 ScanFilter {
406 field: "status".into(),
407 op: "in".into(),
408 value: vals.clone(),
409 clauses: vec![],
410 expr: None
411 }
412 .matches_binary(&doc)
413 );
414
415 let doc2 = encode(&json!({"status": "deleted"}));
416 assert!(
417 ScanFilter {
418 field: "status".into(),
419 op: "not_in".into(),
420 value: vals,
421 clauses: vec![],
422 expr: None
423 }
424 .matches_binary(&doc2)
425 );
426 }
427
428 #[test]
429 fn array_contains() {
430 let doc = encode(&json!({"tags": ["rust", "db", "fast"]}));
431 assert!(
432 filter(
433 "tags",
434 "array_contains",
435 nodedb_types::Value::String("rust".into())
436 )
437 .matches_binary(&doc)
438 );
439 assert!(
440 !filter(
441 "tags",
442 "array_contains",
443 nodedb_types::Value::String("slow".into())
444 )
445 .matches_binary(&doc)
446 );
447 }
448
449 #[test]
450 fn array_contains_all() {
451 let doc = encode(&json!({"tags": ["a", "b", "c"]}));
452 let needles = nodedb_types::Value::Array(vec![
453 nodedb_types::Value::String("a".into()),
454 nodedb_types::Value::String("c".into()),
455 ]);
456 assert!(
457 ScanFilter {
458 field: "tags".into(),
459 op: "array_contains_all".into(),
460 value: needles,
461 clauses: vec![],
462 expr: None
463 }
464 .matches_binary(&doc)
465 );
466 }
467
468 #[test]
469 fn array_overlap() {
470 let doc = encode(&json!({"tags": ["x", "y"]}));
471 let needles = nodedb_types::Value::Array(vec![
472 nodedb_types::Value::String("y".into()),
473 nodedb_types::Value::String("z".into()),
474 ]);
475 assert!(
476 ScanFilter {
477 field: "tags".into(),
478 op: "array_overlap".into(),
479 value: needles,
480 clauses: vec![],
481 expr: None
482 }
483 .matches_binary(&doc)
484 );
485 }
486
487 #[test]
488 fn or_clauses() {
489 let doc = encode(&json!({"x": 5}));
490 let f = ScanFilter {
491 field: String::new(),
492 op: "or".into(),
493 value: nodedb_types::Value::Null,
494 clauses: vec![
495 vec![filter("x", "eq", nodedb_types::Value::Integer(10))],
496 vec![filter("x", "eq", nodedb_types::Value::Integer(5))],
497 ],
498 expr: None,
499 };
500 assert!(f.matches_binary(&doc));
501 }
502
503 #[test]
504 fn match_all() {
505 let doc = encode(&json!({"any": "thing"}));
506 assert!(filter("", "match_all", nodedb_types::Value::Null).matches_binary(&doc));
507 }
508
509 #[test]
510 fn float_comparison() {
511 let doc = encode(&json!({"temp": 36.6}));
512 assert!(filter("temp", "gt", nodedb_types::Value::Float(30.0)).matches_binary(&doc));
513 assert!(filter("temp", "lt", nodedb_types::Value::Float(40.0)).matches_binary(&doc));
514 }
515
516 #[test]
517 fn bool_eq() {
518 let doc = encode(&json!({"active": true}));
519 assert!(filter("active", "eq", nodedb_types::Value::Bool(true)).matches_binary(&doc));
520 assert!(!filter("active", "eq", nodedb_types::Value::Bool(false)).matches_binary(&doc));
521 }
522
523 #[test]
524 fn gt_coercion_string_field() {
525 let doc = encode(&json!({"score": "90"}));
526 assert!(filter("score", "gt", nodedb_types::Value::Integer(80)).matches_binary(&doc));
527 }
528
529 #[test]
532 fn indexed_matches_same_as_sequential() {
533 let doc = encode(&json!({"a": 1, "b": "hello", "c": true, "d": null}));
534 let idx = FieldIndex::build(&doc, 0).unwrap();
535
536 let filters = vec![
537 filter("a", "eq", nodedb_types::Value::Integer(1)),
538 filter("b", "contains", nodedb_types::Value::String("ell".into())),
539 filter("c", "eq", nodedb_types::Value::Bool(true)),
540 filter("d", "is_null", nodedb_types::Value::Null),
541 filter("missing", "is_null", nodedb_types::Value::Null),
542 ];
543
544 for f in &filters {
545 assert_eq!(
546 f.matches_binary(&doc),
547 f.matches_binary_indexed(&doc, &idx),
548 "mismatch for field={} op={:?}",
549 f.field,
550 f.op
551 );
552 }
553 }
554}