1use std::any::Any;
15use std::hash::{Hash, Hasher};
16use std::sync::Arc;
17
18use arrow::datatypes::DataType;
19use arrow_array::{Array, BooleanArray, LargeBinaryArray};
20use datafusion_common::Result;
21use datafusion_expr::{
22 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
23};
24use parking_lot::Mutex;
25
26use super::json_types;
27
/// Single-entry cache: the most recently compiled path expression together with the
/// exact source string it was compiled from.
///
/// `None` means nothing has been cached yet. Access goes through a `parking_lot::Mutex`.
type PathCache = Mutex<Option<(String, CompiledJsonPath)>>;
33
/// A single step of a compiled JSON path expression.
///
/// Steps are produced by [`CompiledJsonPath::compile`] and applied in order by
/// [`CompiledJsonPath::evaluate`].
///
/// `Eq` is derived alongside `PartialEq` because every payload (`String`, `i64`) has
/// total equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JsonPathStep {
    /// The leading `$`: the document being queried.
    Root,
    /// Object member lookup by key: `.name`, `["name"]` or `['name']`.
    Member(String),
    /// Array element lookup by position: `[n]`.
    ///
    /// The parser accepts a leading `-`, but negative positions never select anything
    /// during evaluation.
    ArrayIndex(i64),
    /// Every element of an array: `[*]`.
    ArrayWildcard,
}
48
/// A JSON path expression parsed into an ordered list of steps.
///
/// When built with [`CompiledJsonPath::compile`], the first step is always
/// [`JsonPathStep::Root`].
#[derive(Debug, Clone, PartialEq)]
pub struct CompiledJsonPath {
    /// Steps in the order they are applied during evaluation.
    pub steps: Vec<JsonPathStep>,
}
58
59impl CompiledJsonPath {
60 pub fn compile(path_str: &str) -> std::result::Result<Self, String> {
74 let path_str = path_str.trim();
75 if path_str.is_empty() {
76 return Err("empty path expression".into());
77 }
78
79 let mut steps = Vec::new();
80 let chars: Vec<char> = path_str.chars().collect();
81 let mut pos = 0;
82
83 if chars.first() != Some(&'$') {
85 return Err(format!(
86 "path must start with '$', got '{}'",
87 chars.first().unwrap_or(&' ')
88 ));
89 }
90 steps.push(JsonPathStep::Root);
91 pos += 1;
92
93 while pos < chars.len() {
94 match chars[pos] {
95 '.' => {
96 pos += 1;
97 let start = pos;
99 while pos < chars.len()
100 && chars[pos] != '.'
101 && chars[pos] != '['
102 && !chars[pos].is_whitespace()
103 {
104 pos += 1;
105 }
106 if pos == start {
107 return Err(format!("empty member name at position {start}"));
108 }
109 let name: String = chars[start..pos].iter().collect();
110 steps.push(JsonPathStep::Member(name));
111 }
112 '[' => {
113 pos += 1;
114 while pos < chars.len() && chars[pos].is_whitespace() {
116 pos += 1;
117 }
118 if pos >= chars.len() {
119 return Err("unclosed bracket".into());
120 }
121 if chars[pos] == '*' {
122 steps.push(JsonPathStep::ArrayWildcard);
123 pos += 1;
124 } else if chars[pos] == '"' || chars[pos] == '\'' {
125 let quote = chars[pos];
127 pos += 1;
128 let start = pos;
129 while pos < chars.len() && chars[pos] != quote {
130 pos += 1;
131 }
132 if pos >= chars.len() {
133 return Err("unclosed quoted member".into());
134 }
135 let name: String = chars[start..pos].iter().collect();
136 steps.push(JsonPathStep::Member(name));
137 pos += 1; } else {
139 let start = pos;
141 let mut negative = false;
142 if pos < chars.len() && chars[pos] == '-' {
143 negative = true;
144 pos += 1;
145 }
146 while pos < chars.len() && chars[pos].is_ascii_digit() {
147 pos += 1;
148 }
149 if pos == start || (negative && pos == start + 1) {
150 return Err(format!("expected array index or '*' at position {start}"));
151 }
152 let idx_str: String = chars[start..pos].iter().collect();
153 let idx: i64 = idx_str
154 .parse()
155 .map_err(|_| format!("invalid array index: '{idx_str}'"))?;
156 steps.push(JsonPathStep::ArrayIndex(idx));
157 }
158 while pos < chars.len() && chars[pos].is_whitespace() {
160 pos += 1;
161 }
162 if pos >= chars.len() || chars[pos] != ']' {
163 return Err(format!("expected ']' at position {pos}"));
164 }
165 pos += 1;
166 }
167 c if c.is_whitespace() => {
168 pos += 1;
169 }
170 c => {
171 return Err(format!("unexpected character '{c}' at position {pos}"));
172 }
173 }
174 }
175
176 Ok(Self { steps })
177 }
178
179 #[must_use]
183 pub fn evaluate<'a>(&self, jsonb: &'a [u8]) -> Vec<&'a [u8]> {
184 if jsonb.is_empty() {
185 return Vec::new();
186 }
187 let mut current = vec![jsonb];
188
189 for step in &self.steps {
190 match step {
191 JsonPathStep::Root => {
192 }
194 JsonPathStep::Member(name) => {
195 let mut next = Vec::new();
196 for data in ¤t {
197 if let Some(val) = json_types::jsonb_get_field(data, name) {
198 next.push(val);
199 }
200 }
201 current = next;
202 }
203 JsonPathStep::ArrayIndex(idx) => {
204 let mut next = Vec::new();
205 for data in ¤t {
206 if *idx >= 0 {
207 if let Ok(i) = usize::try_from(*idx) {
208 if let Some(val) = json_types::jsonb_array_get(data, i) {
209 next.push(val);
210 }
211 }
212 }
213 }
214 current = next;
215 }
216 JsonPathStep::ArrayWildcard => {
217 let mut next = Vec::new();
218 for data in ¤t {
219 if !data.is_empty() && data[0] == 0x06 {
220 if data.len() >= 5 {
222 let count = u32::from_le_bytes([data[1], data[2], data[3], data[4]])
223 as usize;
224 for i in 0..count {
225 if let Some(val) = json_types::jsonb_array_get(data, i) {
226 next.push(val);
227 }
228 }
229 }
230 }
231 }
232 current = next;
233 }
234 }
235 }
236
237 current
238 }
239
240 #[must_use]
243 pub fn exists(&self, jsonb: &[u8]) -> bool {
244 !self.evaluate(jsonb).is_empty()
245 }
246}
247
248fn compile_cached(
254 cache: &PathCache,
255 path_str: &str,
256) -> std::result::Result<CompiledJsonPath, String> {
257 let mut guard = cache.lock();
258 if let Some((cached_str, cached_path)) = guard.as_ref() {
259 if cached_str == path_str {
260 return Ok(cached_path.clone());
261 }
262 }
263 let compiled = CompiledJsonPath::compile(path_str)?;
264 *guard = Some((path_str.to_owned(), compiled.clone()));
265 Ok(compiled)
266}
267
/// DataFusion scalar UDF `jsonb_path_exists(jsonb, path)`.
///
/// Takes a `LargeBinary` value and a `Utf8` path expression and yields a Boolean telling
/// whether the path selects at least one value.
#[derive(Debug)]
pub struct JsonbPathExistsUdf {
    /// Argument types and volatility advertised through `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Most recently compiled path expression, reused across invocations.
    path_cache: PathCache,
}
274
/// `Default` is the same as [`JsonbPathExistsUdf::new`].
impl Default for JsonbPathExistsUdf {
    fn default() -> Self {
        Self::new()
    }
}
280
281impl JsonbPathExistsUdf {
282 #[must_use]
284 pub fn new() -> Self {
285 Self {
286 signature: Signature::new(
287 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
288 Volatility::Immutable,
289 ),
290 path_cache: Mutex::new(None),
291 }
292 }
293}
294
/// Every instance compares equal; neither the signature nor the path cache is inspected.
impl PartialEq for JsonbPathExistsUdf {
    fn eq(&self, _other: &Self) -> bool {
        true
    }
}

// Equality is always `true`, so it is trivially reflexive, symmetric and transitive.
impl Eq for JsonbPathExistsUdf {}
302
/// Hashes only the fixed function name, so all instances hash identically, which is
/// consistent with the always-equal `PartialEq` implementation.
impl Hash for JsonbPathExistsUdf {
    fn hash<H: Hasher>(&self, state: &mut H) {
        "jsonb_path_exists".hash(state);
    }
}
308
309impl ScalarUDFImpl for JsonbPathExistsUdf {
310 fn as_any(&self) -> &dyn Any {
311 self
312 }
313
314 fn name(&self) -> &'static str {
315 "jsonb_path_exists"
316 }
317
318 fn signature(&self) -> &Signature {
319 &self.signature
320 }
321
322 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
323 Ok(DataType::Boolean)
324 }
325
326 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
327 let first = &args.args[0];
328 let second = &args.args[1];
329
330 match (first, second) {
331 (
332 ColumnarValue::Array(bin_arr),
333 ColumnarValue::Scalar(datafusion_common::ScalarValue::Utf8(Some(path_str))),
334 ) => {
335 let compiled = compile_cached(&self.path_cache, path_str)
336 .map_err(datafusion_common::DataFusionError::Execution)?;
337 let binary = bin_arr
338 .as_any()
339 .downcast_ref::<LargeBinaryArray>()
340 .ok_or_else(|| {
341 datafusion_common::DataFusionError::Execution(
342 "expected LargeBinary array".into(),
343 )
344 })?;
345 let results: BooleanArray = (0..binary.len())
346 .map(|i| {
347 if binary.is_null(i) {
348 None
349 } else {
350 Some(compiled.exists(binary.value(i)))
351 }
352 })
353 .collect();
354 Ok(ColumnarValue::Array(Arc::new(results)))
355 }
356 (
357 ColumnarValue::Scalar(datafusion_common::ScalarValue::LargeBinary(Some(bytes))),
358 ColumnarValue::Scalar(datafusion_common::ScalarValue::Utf8(Some(path_str))),
359 ) => {
360 let compiled = compile_cached(&self.path_cache, path_str)
361 .map_err(datafusion_common::DataFusionError::Execution)?;
362 let result = compiled.exists(bytes);
363 Ok(ColumnarValue::Scalar(
364 datafusion_common::ScalarValue::Boolean(Some(result)),
365 ))
366 }
367 _ => Ok(ColumnarValue::Scalar(
368 datafusion_common::ScalarValue::Boolean(None),
369 )),
370 }
371 }
372}
373
/// DataFusion scalar UDF `jsonb_path_match(jsonb, path)`.
///
/// Takes a `LargeBinary` value and a `Utf8` path expression and yields a Boolean taken
/// from the single value the path selects; it is NULL when the path does not select
/// exactly one value or that value is not recognised as a boolean.
#[derive(Debug)]
pub struct JsonbPathMatchUdf {
    /// Argument types and volatility advertised through `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Most recently compiled path expression, reused across invocations.
    path_cache: PathCache,
}
386
/// `Default` is the same as [`JsonbPathMatchUdf::new`].
impl Default for JsonbPathMatchUdf {
    fn default() -> Self {
        Self::new()
    }
}
392
393impl JsonbPathMatchUdf {
394 #[must_use]
396 pub fn new() -> Self {
397 Self {
398 signature: Signature::new(
399 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
400 Volatility::Immutable,
401 ),
402 path_cache: Mutex::new(None),
403 }
404 }
405}
406
/// Every instance compares equal; neither the signature nor the path cache is inspected.
impl PartialEq for JsonbPathMatchUdf {
    fn eq(&self, _other: &Self) -> bool {
        true
    }
}

// Equality is always `true`, so it is trivially reflexive, symmetric and transitive.
impl Eq for JsonbPathMatchUdf {}
414
/// Hashes only the fixed function name, so all instances hash identically, which is
/// consistent with the always-equal `PartialEq` implementation.
impl Hash for JsonbPathMatchUdf {
    fn hash<H: Hasher>(&self, state: &mut H) {
        "jsonb_path_match".hash(state);
    }
}
420
/// Interprets an encoded value as a boolean by looking at its first byte only.
///
/// Returns `Some(false)` for a leading `0x01`, `Some(true)` for a leading `0x02`, and
/// `None` for empty input or any other leading byte.
// NOTE(review): 0x01 / 0x02 are presumably the JSONB `false` / `true` type tags —
// confirm against `json_types`.
fn jsonb_is_true(data: &[u8]) -> Option<bool> {
    match data.first().copied() {
        Some(0x01) => Some(false),
        Some(0x02) => Some(true),
        _ => None,
    }
}
432
433impl ScalarUDFImpl for JsonbPathMatchUdf {
434 fn as_any(&self) -> &dyn Any {
435 self
436 }
437
438 fn name(&self) -> &'static str {
439 "jsonb_path_match"
440 }
441
442 fn signature(&self) -> &Signature {
443 &self.signature
444 }
445
446 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
447 Ok(DataType::Boolean)
448 }
449
450 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
451 let first = &args.args[0];
452 let second = &args.args[1];
453
454 match (first, second) {
455 (
456 ColumnarValue::Array(bin_arr),
457 ColumnarValue::Scalar(datafusion_common::ScalarValue::Utf8(Some(path_str))),
458 ) => {
459 let compiled = compile_cached(&self.path_cache, path_str)
460 .map_err(datafusion_common::DataFusionError::Execution)?;
461 let binary = bin_arr
462 .as_any()
463 .downcast_ref::<LargeBinaryArray>()
464 .ok_or_else(|| {
465 datafusion_common::DataFusionError::Execution(
466 "expected LargeBinary array".into(),
467 )
468 })?;
469 let results: BooleanArray = (0..binary.len())
470 .map(|i| {
471 if binary.is_null(i) {
472 None
473 } else {
474 let matched = compiled.evaluate(binary.value(i));
475 if matched.len() == 1 {
476 jsonb_is_true(matched[0])
477 } else {
478 None
479 }
480 }
481 })
482 .collect();
483 Ok(ColumnarValue::Array(Arc::new(results)))
484 }
485 (
486 ColumnarValue::Scalar(datafusion_common::ScalarValue::LargeBinary(Some(bytes))),
487 ColumnarValue::Scalar(datafusion_common::ScalarValue::Utf8(Some(path_str))),
488 ) => {
489 let compiled = compile_cached(&self.path_cache, path_str)
490 .map_err(datafusion_common::DataFusionError::Execution)?;
491 let matched = compiled.evaluate(bytes);
492 let result = if matched.len() == 1 {
493 jsonb_is_true(matched[0])
494 } else {
495 None
496 };
497 Ok(ColumnarValue::Scalar(
498 datafusion_common::ScalarValue::Boolean(result),
499 ))
500 }
501 _ => Ok(ColumnarValue::Scalar(
502 datafusion_common::ScalarValue::Boolean(None),
503 )),
504 }
505 }
506}
507
// Unit tests for path compilation, path evaluation and basic UDF metadata.
#[cfg(test)]
mod tests {
    use super::*;

    // ── Path compilation ──────────────────────────────────────────────

    // A bare `$` compiles to just the root step.
    #[test]
    fn test_compile_root_only() {
        let path = CompiledJsonPath::compile("$").unwrap();
        assert_eq!(path.steps, vec![JsonPathStep::Root]);
    }

    // `.name` becomes a single member step after the root.
    #[test]
    fn test_compile_member_access() {
        let path = CompiledJsonPath::compile("$.name").unwrap();
        assert_eq!(
            path.steps,
            vec![JsonPathStep::Root, JsonPathStep::Member("name".into()),]
        );
    }

    // Chained dots produce one member step per segment, in order.
    #[test]
    fn test_compile_nested_members() {
        let path = CompiledJsonPath::compile("$.user.address.city").unwrap();
        assert_eq!(
            path.steps,
            vec![
                JsonPathStep::Root,
                JsonPathStep::Member("user".into()),
                JsonPathStep::Member("address".into()),
                JsonPathStep::Member("city".into()),
            ]
        );
    }

    // `[0]` following a member becomes an array-index step.
    #[test]
    fn test_compile_array_index() {
        let path = CompiledJsonPath::compile("$.items[0]").unwrap();
        assert_eq!(
            path.steps,
            vec![
                JsonPathStep::Root,
                JsonPathStep::Member("items".into()),
                JsonPathStep::ArrayIndex(0),
            ]
        );
    }

    // `[*]` becomes a wildcard step and may be followed by further steps.
    #[test]
    fn test_compile_wildcard() {
        let path = CompiledJsonPath::compile("$.items[*].price").unwrap();
        assert_eq!(
            path.steps,
            vec![
                JsonPathStep::Root,
                JsonPathStep::Member("items".into()),
                JsonPathStep::ArrayWildcard,
                JsonPathStep::Member("price".into()),
            ]
        );
    }

    // A double-quoted bracket member may contain whitespace.
    #[test]
    fn test_compile_quoted_member() {
        let path = CompiledJsonPath::compile("$[\"spaced key\"]").unwrap();
        assert_eq!(
            path.steps,
            vec![
                JsonPathStep::Root,
                JsonPathStep::Member("spaced key".into()),
            ]
        );
    }

    // The empty string is rejected.
    #[test]
    fn test_compile_empty_path_error() {
        assert!(CompiledJsonPath::compile("").is_err());
    }

    // A path that does not start with `$` is rejected.
    #[test]
    fn test_compile_no_root_error() {
        assert!(CompiledJsonPath::compile("name").is_err());
    }

    // ── Path evaluation ───────────────────────────────────────────────

    // `$` selects the document itself.
    #[test]
    fn test_evaluate_root() {
        let json: serde_json::Value = serde_json::from_str(r#"{"a": 1}"#).unwrap();
        let jsonb = json_types::encode_jsonb(&json);
        let path = CompiledJsonPath::compile("$").unwrap();
        let results = path.evaluate(&jsonb);
        assert_eq!(results.len(), 1);
    }

    // A present member is selected and renders as its text value.
    #[test]
    fn test_evaluate_member() {
        let json: serde_json::Value = serde_json::from_str(r#"{"name": "Alice"}"#).unwrap();
        let jsonb = json_types::encode_jsonb(&json);
        let path = CompiledJsonPath::compile("$.name").unwrap();
        let results = path.evaluate(&jsonb);
        assert_eq!(results.len(), 1);
        assert_eq!(
            json_types::jsonb_to_text(results[0]),
            Some("Alice".to_string())
        );
    }

    // A missing member selects nothing.
    #[test]
    fn test_evaluate_missing_member() {
        let json: serde_json::Value = serde_json::from_str(r#"{"name": "Alice"}"#).unwrap();
        let jsonb = json_types::encode_jsonb(&json);
        let path = CompiledJsonPath::compile("$.age").unwrap();
        let results = path.evaluate(&jsonb);
        assert!(results.is_empty());
    }

    // `[1]` selects the second element (indices are zero-based).
    #[test]
    fn test_evaluate_array_index() {
        let json: serde_json::Value = serde_json::from_str(r#"{"items": [10, 20, 30]}"#).unwrap();
        let jsonb = json_types::encode_jsonb(&json);
        let path = CompiledJsonPath::compile("$.items[1]").unwrap();
        let results = path.evaluate(&jsonb);
        assert_eq!(results.len(), 1);
        assert_eq!(
            json_types::jsonb_to_text(results[0]),
            Some("20".to_string())
        );
    }

    // The wildcard fans out over every array element before the next step applies.
    #[test]
    fn test_evaluate_wildcard() {
        let json: serde_json::Value =
            serde_json::from_str(r#"{"items": [{"price": 10}, {"price": 20}]}"#).unwrap();
        let jsonb = json_types::encode_jsonb(&json);
        let path = CompiledJsonPath::compile("$.items[*].price").unwrap();
        let results = path.evaluate(&jsonb);
        assert_eq!(results.len(), 2);
    }

    // `exists` is true when the full path resolves.
    #[test]
    fn test_exists_true() {
        let json: serde_json::Value = serde_json::from_str(r#"{"users": [{"age": 30}]}"#).unwrap();
        let jsonb = json_types::encode_jsonb(&json);
        let path = CompiledJsonPath::compile("$.users[0].age").unwrap();
        assert!(path.exists(&jsonb));
    }

    // `exists` is false when the final member is absent.
    #[test]
    fn test_exists_false() {
        let json: serde_json::Value = serde_json::from_str(r#"{"users": [{"age": 30}]}"#).unwrap();
        let jsonb = json_types::encode_jsonb(&json);
        let path = CompiledJsonPath::compile("$.users[0].email").unwrap();
        assert!(!path.exists(&jsonb));
    }

    // ── UDF metadata ──────────────────────────────────────────────────

    // Checks only the registered name and return type; `invoke_with_args` is not
    // exercised here.
    #[test]
    fn test_path_exists_udf_scalar() {
        let udf = JsonbPathExistsUdf::new();
        assert_eq!(udf.name(), "jsonb_path_exists");
        assert_eq!(udf.return_type(&[]).unwrap(), DataType::Boolean);
    }

    // Checks only the registered name and return type; `invoke_with_args` is not
    // exercised here.
    #[test]
    fn test_path_match_udf_scalar() {
        let udf = JsonbPathMatchUdf::new();
        assert_eq!(udf.name(), "jsonb_path_match");
        assert_eq!(udf.return_type(&[]).unwrap(), DataType::Boolean);
    }
}