Skip to main content

selene_graph/
json_search.rs

1//! Exact JSON search over graph node properties.
2//!
3//! This module is the JSON correctness oracle and small-corpus path. It scans
4//! the current graph snapshot for JSON-valued node properties and returns node
5//! candidates whose stored JSON matches the requested exact predicate.
6
7use std::collections::BinaryHeap;
8use std::time::Duration;
9
10use selene_core::{
11    CancellationCause, CancellationChecker, DbString, JsonPathSelector, JsonValue, JsonValueRef,
12    NodeId, Value,
13};
14
15use crate::error::{GraphError, GraphResult};
16use crate::graph::SeleneGraph;
17use crate::shared::SharedGraph;
18use crate::store::RowIndex;
19
20#[path = "json_search/parallel.rs"]
21mod parallel;
22
/// Number of visited rows the serial scans accumulate before reporting them
/// to the cancellation checker.
pub(crate) const JSON_SEARCH_CANCEL_STRIDE: usize = 1_024;
/// Row chunk size for parallel scans.
// NOTE(review): presumably consumed by the `parallel` submodule — confirm.
pub(crate) const JSON_SEARCH_PARALLEL_CHUNK_ROWS: usize = 2_048;
/// Row-count threshold associated with parallel scans; test builds use a tiny
/// value instead of the production one.
// NOTE(review): presumably read by `parallel::should_parallelize_json_scan` — confirm.
pub(crate) const JSON_SEARCH_PARALLEL_MIN_ROWS: u64 = if cfg!(test) { 8 } else { 16_384 };
/// Upper bound on the number of selectors that GQL JSON path search
/// procedures accept.
pub const JSON_PATH_SELECTOR_LIMIT: usize = 64;
31
32/// One JSON-containment node hit.
33#[derive(Clone, Copy, Debug, Eq, PartialEq)]
34pub struct JsonContainmentHit {
35    /// Matched node id.
36    pub node_id: NodeId,
37}
38
39/// One JSON path-existence node hit.
40#[derive(Clone, Copy, Debug, Eq, PartialEq)]
41pub struct JsonPathHit {
42    /// Matched node id.
43    pub node_id: NodeId,
44}
45
46/// One JSON path-containment node hit.
47#[derive(Clone, Copy, Debug, Eq, PartialEq)]
48pub struct JsonPathContainmentHit {
49    /// Matched node id.
50    pub node_id: NodeId,
51}
52
53/// One JSON path-value node hit.
54#[derive(Clone, Debug, PartialEq)]
55pub struct JsonPathValueHit {
56    /// Matched node id.
57    pub node_id: NodeId,
58    /// JSON value selected by the requested path.
59    pub value: JsonValue,
60}
61
62/// Error returned by checked JSON search APIs.
63#[derive(Debug, thiserror::Error)]
64pub enum JsonSearchError {
65    /// Graph storage or consistency failure.
66    #[error(transparent)]
67    Graph(#[from] GraphError),
68    /// Caller requested cooperative cancellation.
69    #[error("JSON search cancelled")]
70    Cancelled,
71    /// Statement deadline elapsed.
72    #[error("JSON search timed out after {elapsed:?}")]
73    Timeout {
74        /// Wall-clock duration since the deadline elapsed.
75        elapsed: Duration,
76    },
77    /// Deterministic node-scan budget was exceeded.
78    #[error("JSON search node scan budget exceeded ({scanned} > {limit})")]
79    NodeScanBudgetExceeded {
80        /// Maximum allowed scanned nodes.
81        limit: usize,
82        /// Observed scanned nodes after the batch that crossed the limit.
83        scanned: usize,
84    },
85}
86
87impl JsonSearchError {
88    pub(crate) fn into_graph_error(self) -> GraphError {
89        match self {
90            Self::Graph(error) => error,
91            Self::Cancelled | Self::Timeout { .. } | Self::NodeScanBudgetExceeded { .. } => {
92                GraphError::Inconsistent {
93                    reason: format!("disabled JSON-search checker returned {self}"),
94                }
95            }
96        }
97    }
98}
99
100impl From<CancellationCause> for JsonSearchError {
101    fn from(cause: CancellationCause) -> Self {
102        match cause {
103            CancellationCause::Cancelled => Self::Cancelled,
104            CancellationCause::Timeout { elapsed } => Self::Timeout { elapsed },
105            CancellationCause::NodeScanBudgetExceeded { limit, scanned } => {
106                Self::NodeScanBudgetExceeded { limit, scanned }
107            }
108        }
109    }
110}
111
112impl SeleneGraph {
113    /// Exhaustively find JSON-valued node properties containing `candidate`.
114    pub fn exact_json_contains_nodes(
115        &self,
116        label: &DbString,
117        property: &DbString,
118        candidate: &JsonValue,
119        k: usize,
120    ) -> GraphResult<Vec<JsonContainmentHit>> {
121        self.exact_json_contains_nodes_checked(
122            label,
123            property,
124            candidate,
125            k,
126            CancellationChecker::disabled(),
127        )
128        .map_err(JsonSearchError::into_graph_error)
129    }
130
131    /// Exhaustively find JSON-valued node properties with cancellation checks.
132    pub fn exact_json_contains_nodes_checked(
133        &self,
134        label: &DbString,
135        property: &DbString,
136        candidate: &JsonValue,
137        k: usize,
138        checker: CancellationChecker<'_>,
139    ) -> Result<Vec<JsonContainmentHit>, JsonSearchError> {
140        checker.check()?;
141        if k == 0 {
142            return Ok(Vec::new());
143        }
144        let Some(label_rows) = self.nodes_with_label(label) else {
145            return Ok(Vec::new());
146        };
147        if parallel::should_parallelize_json_scan(label_rows, k) {
148            let scan = parallel::JsonScan::new(self, label, property);
149            return parallel::contains_nodes(scan, candidate, k, label_rows, checker);
150        }
151
152        let mut top_k = JsonContainmentTopK::new(k);
153        let mut rows_since_check = 0usize;
154        for raw_row in label_rows.iter() {
155            rows_since_check += 1;
156            if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
157                checker.note_nodes_scanned(rows_since_check)?;
158                rows_since_check = 0;
159            }
160            if !self.node_store.is_alive(raw_row) {
161                continue;
162            }
163            let row = RowIndex::new(raw_row);
164            let node_id = self
165                .node_id_for_row(row)
166                .ok_or_else(|| GraphError::Inconsistent {
167                    reason: format!(
168                        "label index row {raw_row} for {} has no node id",
169                        label.as_str()
170                    ),
171                })?;
172            let properties = self
173                .node_store
174                .properties
175                .get(raw_row as usize)
176                .ok_or_else(|| GraphError::Inconsistent {
177                    reason: format!(
178                        "JSON search row {raw_row} for {} has no property row",
179                        label.as_str()
180                    ),
181                })?;
182            let Some(Value::Json(value)) = properties.get(property) else {
183                continue;
184            };
185            if value.contains(candidate) {
186                top_k.push(node_id);
187            }
188        }
189        if rows_since_check > 0 {
190            checker.note_nodes_scanned(rows_since_check)?;
191        }
192        Ok(top_k.into_hits())
193    }
194
195    /// Exhaustively find JSON-valued node properties where `path` exists.
196    pub fn exact_json_path_exists_nodes(
197        &self,
198        label: &DbString,
199        property: &DbString,
200        path: &[JsonPathSelector],
201        k: usize,
202    ) -> GraphResult<Vec<JsonPathHit>> {
203        self.exact_json_path_exists_nodes_checked(
204            label,
205            property,
206            path,
207            k,
208            CancellationChecker::disabled(),
209        )
210        .map_err(JsonSearchError::into_graph_error)
211    }
212
213    /// Exhaustively find JSON-valued node properties with path-existence checks.
214    pub fn exact_json_path_exists_nodes_checked(
215        &self,
216        label: &DbString,
217        property: &DbString,
218        path: &[JsonPathSelector],
219        k: usize,
220        checker: CancellationChecker<'_>,
221    ) -> Result<Vec<JsonPathHit>, JsonSearchError> {
222        checker.check()?;
223        if k == 0 || path.is_empty() {
224            return Ok(Vec::new());
225        }
226        let Some(label_rows) = self.nodes_with_label(label) else {
227            return Ok(Vec::new());
228        };
229        if parallel::should_parallelize_json_scan(label_rows, k) {
230            let scan = parallel::JsonScan::new(self, label, property);
231            return parallel::path_exists_nodes(scan, path, k, label_rows, checker);
232        }
233
234        let mut top_k = JsonContainmentTopK::new(k);
235        let mut rows_since_check = 0usize;
236        for raw_row in label_rows.iter() {
237            rows_since_check += 1;
238            if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
239                checker.note_nodes_scanned(rows_since_check)?;
240                rows_since_check = 0;
241            }
242            if !self.node_store.is_alive(raw_row) {
243                continue;
244            }
245            let row = RowIndex::new(raw_row);
246            let node_id = self
247                .node_id_for_row(row)
248                .ok_or_else(|| GraphError::Inconsistent {
249                    reason: format!(
250                        "label index row {raw_row} for {} has no node id",
251                        label.as_str()
252                    ),
253                })?;
254            let properties = self
255                .node_store
256                .properties
257                .get(raw_row as usize)
258                .ok_or_else(|| GraphError::Inconsistent {
259                    reason: format!(
260                        "JSON search row {raw_row} for {} has no property row",
261                        label.as_str()
262                    ),
263                })?;
264            let Some(Value::Json(value)) = properties.get(property) else {
265                continue;
266            };
267            if value.path_exists(path) {
268                top_k.push(node_id);
269            }
270        }
271        if rows_since_check > 0 {
272            checker.note_nodes_scanned(rows_since_check)?;
273        }
274        Ok(top_k.into_path_hits())
275    }
276
277    /// Exhaustively find JSON-valued node properties whose selected path
278    /// contains `candidate`.
279    pub fn exact_json_path_contains_nodes(
280        &self,
281        label: &DbString,
282        property: &DbString,
283        path: &[JsonPathSelector],
284        candidate: &JsonValue,
285        k: usize,
286    ) -> GraphResult<Vec<JsonPathContainmentHit>> {
287        self.exact_json_path_contains_nodes_checked(
288            label,
289            property,
290            path,
291            candidate,
292            k,
293            CancellationChecker::disabled(),
294        )
295        .map_err(JsonSearchError::into_graph_error)
296    }
297
298    /// Exhaustively find JSON path containment matches with cancellation checks.
299    pub fn exact_json_path_contains_nodes_checked(
300        &self,
301        label: &DbString,
302        property: &DbString,
303        path: &[JsonPathSelector],
304        candidate: &JsonValue,
305        k: usize,
306        checker: CancellationChecker<'_>,
307    ) -> Result<Vec<JsonPathContainmentHit>, JsonSearchError> {
308        checker.check()?;
309        if k == 0 || path.is_empty() {
310            return Ok(Vec::new());
311        }
312        let Some(label_rows) = self.nodes_with_label(label) else {
313            return Ok(Vec::new());
314        };
315        if parallel::should_parallelize_json_scan(label_rows, k) {
316            let scan = parallel::JsonScan::new(self, label, property);
317            return parallel::path_contains_nodes(scan, path, candidate, k, label_rows, checker);
318        }
319
320        let mut top_k = JsonContainmentTopK::new(k);
321        let mut rows_since_check = 0usize;
322        for raw_row in label_rows.iter() {
323            rows_since_check += 1;
324            if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
325                checker.note_nodes_scanned(rows_since_check)?;
326                rows_since_check = 0;
327            }
328            if !self.node_store.is_alive(raw_row) {
329                continue;
330            }
331            let row = RowIndex::new(raw_row);
332            let node_id = self
333                .node_id_for_row(row)
334                .ok_or_else(|| GraphError::Inconsistent {
335                    reason: format!(
336                        "label index row {raw_row} for {} has no node id",
337                        label.as_str()
338                    ),
339                })?;
340            let properties = self
341                .node_store
342                .properties
343                .get(raw_row as usize)
344                .ok_or_else(|| GraphError::Inconsistent {
345                    reason: format!(
346                        "JSON search row {raw_row} for {} has no property row",
347                        label.as_str()
348                    ),
349                })?;
350            let Some(Value::Json(value)) = properties.get(property) else {
351                continue;
352            };
353            if value.path_contains(path, candidate) {
354                top_k.push(node_id);
355            }
356        }
357        if rows_since_check > 0 {
358            checker.note_nodes_scanned(rows_since_check)?;
359        }
360        Ok(top_k.into_path_containment_hits())
361    }
362
363    /// Exhaustively find JSON-valued node properties where `path` selects a value.
364    pub fn exact_json_path_value_nodes(
365        &self,
366        label: &DbString,
367        property: &DbString,
368        path: &[JsonPathSelector],
369        k: usize,
370    ) -> GraphResult<Vec<JsonPathValueHit>> {
371        self.exact_json_path_value_nodes_checked(
372            label,
373            property,
374            path,
375            k,
376            CancellationChecker::disabled(),
377        )
378        .map_err(JsonSearchError::into_graph_error)
379    }
380
381    /// Exhaustively find JSON path values with cancellation checks.
382    pub fn exact_json_path_value_nodes_checked(
383        &self,
384        label: &DbString,
385        property: &DbString,
386        path: &[JsonPathSelector],
387        k: usize,
388        checker: CancellationChecker<'_>,
389    ) -> Result<Vec<JsonPathValueHit>, JsonSearchError> {
390        checker.check()?;
391        if k == 0 || path.is_empty() {
392            return Ok(Vec::new());
393        }
394        let Some(label_rows) = self.nodes_with_label(label) else {
395            return Ok(Vec::new());
396        };
397        if parallel::should_parallelize_json_scan(label_rows, k) {
398            let scan = parallel::JsonScan::new(self, label, property);
399            return parallel::path_value_nodes(scan, path, k, label_rows, checker);
400        }
401
402        let mut top_k = JsonPathValueTopK::new(k);
403        let mut rows_since_check = 0usize;
404        for raw_row in label_rows.iter() {
405            rows_since_check += 1;
406            if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
407                checker.note_nodes_scanned(rows_since_check)?;
408                rows_since_check = 0;
409            }
410            if !self.node_store.is_alive(raw_row) {
411                continue;
412            }
413            let row = RowIndex::new(raw_row);
414            let node_id = self
415                .node_id_for_row(row)
416                .ok_or_else(|| GraphError::Inconsistent {
417                    reason: format!(
418                        "label index row {raw_row} for {} has no node id",
419                        label.as_str()
420                    ),
421                })?;
422            let properties = self
423                .node_store
424                .properties
425                .get(raw_row as usize)
426                .ok_or_else(|| GraphError::Inconsistent {
427                    reason: format!(
428                        "JSON search row {raw_row} for {} has no property row",
429                        label.as_str()
430                    ),
431                })?;
432            let Some(Value::Json(value)) = properties.get(property) else {
433                continue;
434            };
435            let Some(value) = value.path_value_ref(path) else {
436                continue;
437            };
438            top_k.push(node_id, value);
439        }
440        if rows_since_check > 0 {
441            checker.note_nodes_scanned(rows_since_check)?;
442        }
443        Ok(top_k.into_hits())
444    }
445}
446
447impl SharedGraph {
448    /// Exhaustively find JSON-valued node properties in the current snapshot.
449    pub fn exact_json_contains_nodes(
450        &self,
451        label: &DbString,
452        property: &DbString,
453        candidate: &JsonValue,
454        k: usize,
455    ) -> GraphResult<Vec<JsonContainmentHit>> {
456        self.read()
457            .exact_json_contains_nodes(label, property, candidate, k)
458    }
459
460    /// Exhaustively find JSON-valued node properties with cancellation checks.
461    pub fn exact_json_contains_nodes_checked(
462        &self,
463        label: &DbString,
464        property: &DbString,
465        candidate: &JsonValue,
466        k: usize,
467        checker: CancellationChecker<'_>,
468    ) -> Result<Vec<JsonContainmentHit>, JsonSearchError> {
469        self.read()
470            .exact_json_contains_nodes_checked(label, property, candidate, k, checker)
471    }
472
473    /// Exhaustively find JSON-valued node properties where `path` exists.
474    pub fn exact_json_path_exists_nodes(
475        &self,
476        label: &DbString,
477        property: &DbString,
478        path: &[JsonPathSelector],
479        k: usize,
480    ) -> GraphResult<Vec<JsonPathHit>> {
481        self.read()
482            .exact_json_path_exists_nodes(label, property, path, k)
483    }
484
485    /// Exhaustively find JSON-valued node properties with path-existence checks.
486    pub fn exact_json_path_exists_nodes_checked(
487        &self,
488        label: &DbString,
489        property: &DbString,
490        path: &[JsonPathSelector],
491        k: usize,
492        checker: CancellationChecker<'_>,
493    ) -> Result<Vec<JsonPathHit>, JsonSearchError> {
494        self.read()
495            .exact_json_path_exists_nodes_checked(label, property, path, k, checker)
496    }
497
498    /// Exhaustively find JSON-valued node properties whose selected path
499    /// contains `candidate`.
500    pub fn exact_json_path_contains_nodes(
501        &self,
502        label: &DbString,
503        property: &DbString,
504        path: &[JsonPathSelector],
505        candidate: &JsonValue,
506        k: usize,
507    ) -> GraphResult<Vec<JsonPathContainmentHit>> {
508        self.read()
509            .exact_json_path_contains_nodes(label, property, path, candidate, k)
510    }
511
512    /// Exhaustively find JSON path containment matches with cancellation checks.
513    pub fn exact_json_path_contains_nodes_checked(
514        &self,
515        label: &DbString,
516        property: &DbString,
517        path: &[JsonPathSelector],
518        candidate: &JsonValue,
519        k: usize,
520        checker: CancellationChecker<'_>,
521    ) -> Result<Vec<JsonPathContainmentHit>, JsonSearchError> {
522        self.read()
523            .exact_json_path_contains_nodes_checked(label, property, path, candidate, k, checker)
524    }
525
526    /// Exhaustively find JSON-valued node properties where `path` selects a value.
527    pub fn exact_json_path_value_nodes(
528        &self,
529        label: &DbString,
530        property: &DbString,
531        path: &[JsonPathSelector],
532        k: usize,
533    ) -> GraphResult<Vec<JsonPathValueHit>> {
534        self.read()
535            .exact_json_path_value_nodes(label, property, path, k)
536    }
537
538    /// Exhaustively find JSON path values with cancellation checks.
539    pub fn exact_json_path_value_nodes_checked(
540        &self,
541        label: &DbString,
542        property: &DbString,
543        path: &[JsonPathSelector],
544        k: usize,
545        checker: CancellationChecker<'_>,
546    ) -> Result<Vec<JsonPathValueHit>, JsonSearchError> {
547        self.read()
548            .exact_json_path_value_nodes_checked(label, property, path, k, checker)
549    }
550}
551
552struct JsonContainmentTopK {
553    k: usize,
554    nodes: BinaryHeap<NodeId>,
555}
556
557impl JsonContainmentTopK {
558    fn new(k: usize) -> Self {
559        Self {
560            k,
561            nodes: BinaryHeap::new(),
562        }
563    }
564
565    fn push(&mut self, node_id: NodeId) {
566        if self.k == 0 {
567            return;
568        }
569        if self.nodes.len() < self.k {
570            self.nodes.push(node_id);
571            return;
572        }
573        let Some(mut max_node_id) = self.nodes.peek_mut() else {
574            return;
575        };
576        if node_id < *max_node_id {
577            *max_node_id = node_id;
578        }
579    }
580
581    fn into_hits(self) -> Vec<JsonContainmentHit> {
582        self.nodes
583            .into_sorted_vec()
584            .into_iter()
585            .map(|node_id| JsonContainmentHit { node_id })
586            .collect()
587    }
588
589    fn into_path_hits(self) -> Vec<JsonPathHit> {
590        self.nodes
591            .into_sorted_vec()
592            .into_iter()
593            .map(|node_id| JsonPathHit { node_id })
594            .collect()
595    }
596
597    fn into_path_containment_hits(self) -> Vec<JsonPathContainmentHit> {
598        self.nodes
599            .into_sorted_vec()
600            .into_iter()
601            .map(|node_id| JsonPathContainmentHit { node_id })
602            .collect()
603    }
604}
605
606struct JsonPathValueCandidate {
607    node_id: NodeId,
608    value: JsonValue,
609}
610
611impl PartialEq for JsonPathValueCandidate {
612    fn eq(&self, other: &Self) -> bool {
613        self.node_id == other.node_id
614    }
615}
616
617impl Eq for JsonPathValueCandidate {}
618
619impl PartialOrd for JsonPathValueCandidate {
620    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
621        Some(self.cmp(other))
622    }
623}
624
625impl Ord for JsonPathValueCandidate {
626    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
627        self.node_id.cmp(&other.node_id)
628    }
629}
630
631struct JsonPathValueTopK {
632    k: usize,
633    nodes: BinaryHeap<JsonPathValueCandidate>,
634}
635
636impl JsonPathValueTopK {
637    fn new(k: usize) -> Self {
638        Self {
639            k,
640            nodes: BinaryHeap::new(),
641        }
642    }
643
644    fn push(&mut self, node_id: NodeId, value: JsonValueRef<'_>) {
645        self.push_with(node_id, || value.to_owned_json_value());
646    }
647
648    fn push_owned(&mut self, node_id: NodeId, value: JsonValue) {
649        self.push_with(node_id, || value);
650    }
651
652    fn push_with(&mut self, node_id: NodeId, value: impl FnOnce() -> JsonValue) {
653        if self.k == 0 {
654            return;
655        }
656        if self.nodes.len() < self.k {
657            self.nodes.push(JsonPathValueCandidate {
658                node_id,
659                value: value(),
660            });
661            return;
662        }
663        let Some(mut max_node) = self.nodes.peek_mut() else {
664            return;
665        };
666        if node_id < max_node.node_id {
667            *max_node = JsonPathValueCandidate {
668                node_id,
669                value: value(),
670            };
671        }
672    }
673
674    fn into_hits(self) -> Vec<JsonPathValueHit> {
675        self.nodes
676            .into_sorted_vec()
677            .into_iter()
678            .map(|hit| JsonPathValueHit {
679                node_id: hit.node_id,
680                value: hit.value,
681            })
682            .collect()
683    }
684}
685
686#[cfg(test)]
687mod tests;