Skip to main content

selene_graph/
json_search.rs

1//! Exact JSON search over graph node properties.
2//!
3//! This module is the JSON correctness oracle and small-corpus path. It scans
4//! the current graph snapshot for JSON-valued node properties and returns node
5//! candidates whose stored JSON matches the requested exact predicate.
6
7use std::collections::BinaryHeap;
8use std::time::Duration;
9
10use selene_core::{
11    CancellationCause, CancellationChecker, DbString, JsonPathSelector, JsonValue, JsonValueRef,
12    NodeId, Value,
13};
14
15use crate::error::{GraphError, GraphResult};
16use crate::graph::SeleneGraph;
17use crate::shared::SharedGraph;
18use crate::store::RowIndex;
19
20#[path = "json_search/parallel.rs"]
21mod parallel;
22
23pub(crate) const JSON_SEARCH_CANCEL_STRIDE: usize = 1024;
24pub(crate) const JSON_SEARCH_PARALLEL_CHUNK_ROWS: usize = 2048;
25#[cfg(not(test))]
26pub(crate) const JSON_SEARCH_PARALLEL_MIN_ROWS: u64 = 16_384;
27#[cfg(test)]
28pub(crate) const JSON_SEARCH_PARALLEL_MIN_ROWS: u64 = 8;
29/// Maximum selector count accepted by GQL JSON path search procedures.
30pub const JSON_PATH_SELECTOR_LIMIT: usize = 64;
31
32/// One JSON-containment node hit.
33#[derive(Clone, Copy, Debug, Eq, PartialEq)]
34pub struct JsonContainmentHit {
35    /// Matched node id.
36    pub node_id: NodeId,
37}
38
39/// One JSON path-existence node hit.
40#[derive(Clone, Copy, Debug, Eq, PartialEq)]
41pub struct JsonPathHit {
42    /// Matched node id.
43    pub node_id: NodeId,
44}
45
46/// One JSON path-containment node hit.
47#[derive(Clone, Copy, Debug, Eq, PartialEq)]
48pub struct JsonPathContainmentHit {
49    /// Matched node id.
50    pub node_id: NodeId,
51}
52
53/// One JSON path-value node hit.
54#[derive(Clone, Debug, PartialEq)]
55pub struct JsonPathValueHit {
56    /// Matched node id.
57    pub node_id: NodeId,
58    /// JSON value selected by the requested path.
59    pub value: JsonValue,
60}
61
62/// Error returned by checked JSON search APIs.
63#[derive(Debug, thiserror::Error)]
64pub enum JsonSearchError {
65    /// Graph storage or consistency failure.
66    #[error(transparent)]
67    Graph(#[from] GraphError),
68    /// Caller requested cooperative cancellation.
69    #[error("JSON search cancelled")]
70    Cancelled,
71    /// Statement deadline elapsed.
72    #[error("JSON search timed out after {elapsed:?}")]
73    Timeout {
74        /// Wall-clock duration since the deadline elapsed.
75        elapsed: Duration,
76    },
77}
78
79impl JsonSearchError {
80    pub(crate) fn into_graph_error(self) -> GraphError {
81        match self {
82            Self::Graph(error) => error,
83            Self::Cancelled | Self::Timeout { .. } => GraphError::Inconsistent {
84                reason: format!("disabled JSON-search checker returned {self}"),
85            },
86        }
87    }
88}
89
90impl From<CancellationCause> for JsonSearchError {
91    fn from(cause: CancellationCause) -> Self {
92        match cause {
93            CancellationCause::Cancelled => Self::Cancelled,
94            CancellationCause::Timeout { elapsed } => Self::Timeout { elapsed },
95        }
96    }
97}
98
99impl SeleneGraph {
100    /// Exhaustively find JSON-valued node properties containing `candidate`.
101    pub fn exact_json_contains_nodes(
102        &self,
103        label: &DbString,
104        property: &DbString,
105        candidate: &JsonValue,
106        k: usize,
107    ) -> GraphResult<Vec<JsonContainmentHit>> {
108        self.exact_json_contains_nodes_checked(
109            label,
110            property,
111            candidate,
112            k,
113            CancellationChecker::disabled(),
114        )
115        .map_err(JsonSearchError::into_graph_error)
116    }
117
118    /// Exhaustively find JSON-valued node properties with cancellation checks.
119    pub fn exact_json_contains_nodes_checked(
120        &self,
121        label: &DbString,
122        property: &DbString,
123        candidate: &JsonValue,
124        k: usize,
125        checker: CancellationChecker<'_>,
126    ) -> Result<Vec<JsonContainmentHit>, JsonSearchError> {
127        checker.check()?;
128        if k == 0 {
129            return Ok(Vec::new());
130        }
131        let Some(label_rows) = self.nodes_with_label(label) else {
132            return Ok(Vec::new());
133        };
134        if parallel::should_parallelize_json_scan(label_rows, k) {
135            let scan = parallel::JsonScan::new(self, label, property);
136            return parallel::contains_nodes(scan, candidate, k, label_rows, checker);
137        }
138
139        let mut top_k = JsonContainmentTopK::new(k);
140        let mut rows_since_check = 0usize;
141        for raw_row in label_rows.iter() {
142            rows_since_check += 1;
143            if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
144                checker.check()?;
145                rows_since_check = 0;
146            }
147            if !self.node_store.is_alive(raw_row) {
148                continue;
149            }
150            let row = RowIndex::new(raw_row);
151            let node_id = self
152                .node_id_for_row(row)
153                .ok_or_else(|| GraphError::Inconsistent {
154                    reason: format!(
155                        "label index row {raw_row} for {} has no node id",
156                        label.as_str()
157                    ),
158                })?;
159            let properties = self
160                .node_store
161                .properties
162                .get(raw_row as usize)
163                .ok_or_else(|| GraphError::Inconsistent {
164                    reason: format!(
165                        "JSON search row {raw_row} for {} has no property row",
166                        label.as_str()
167                    ),
168                })?;
169            let Some(Value::Json(value)) = properties.get(property) else {
170                continue;
171            };
172            if value.contains(candidate) {
173                top_k.push(node_id);
174            }
175        }
176        Ok(top_k.into_hits())
177    }
178
179    /// Exhaustively find JSON-valued node properties where `path` exists.
180    pub fn exact_json_path_exists_nodes(
181        &self,
182        label: &DbString,
183        property: &DbString,
184        path: &[JsonPathSelector],
185        k: usize,
186    ) -> GraphResult<Vec<JsonPathHit>> {
187        self.exact_json_path_exists_nodes_checked(
188            label,
189            property,
190            path,
191            k,
192            CancellationChecker::disabled(),
193        )
194        .map_err(JsonSearchError::into_graph_error)
195    }
196
197    /// Exhaustively find JSON-valued node properties with path-existence checks.
198    pub fn exact_json_path_exists_nodes_checked(
199        &self,
200        label: &DbString,
201        property: &DbString,
202        path: &[JsonPathSelector],
203        k: usize,
204        checker: CancellationChecker<'_>,
205    ) -> Result<Vec<JsonPathHit>, JsonSearchError> {
206        checker.check()?;
207        if k == 0 || path.is_empty() {
208            return Ok(Vec::new());
209        }
210        let Some(label_rows) = self.nodes_with_label(label) else {
211            return Ok(Vec::new());
212        };
213        if parallel::should_parallelize_json_scan(label_rows, k) {
214            let scan = parallel::JsonScan::new(self, label, property);
215            return parallel::path_exists_nodes(scan, path, k, label_rows, checker);
216        }
217
218        let mut top_k = JsonContainmentTopK::new(k);
219        let mut rows_since_check = 0usize;
220        for raw_row in label_rows.iter() {
221            rows_since_check += 1;
222            if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
223                checker.check()?;
224                rows_since_check = 0;
225            }
226            if !self.node_store.is_alive(raw_row) {
227                continue;
228            }
229            let row = RowIndex::new(raw_row);
230            let node_id = self
231                .node_id_for_row(row)
232                .ok_or_else(|| GraphError::Inconsistent {
233                    reason: format!(
234                        "label index row {raw_row} for {} has no node id",
235                        label.as_str()
236                    ),
237                })?;
238            let properties = self
239                .node_store
240                .properties
241                .get(raw_row as usize)
242                .ok_or_else(|| GraphError::Inconsistent {
243                    reason: format!(
244                        "JSON search row {raw_row} for {} has no property row",
245                        label.as_str()
246                    ),
247                })?;
248            let Some(Value::Json(value)) = properties.get(property) else {
249                continue;
250            };
251            if value.path_exists(path) {
252                top_k.push(node_id);
253            }
254        }
255        Ok(top_k.into_path_hits())
256    }
257
258    /// Exhaustively find JSON-valued node properties whose selected path
259    /// contains `candidate`.
260    pub fn exact_json_path_contains_nodes(
261        &self,
262        label: &DbString,
263        property: &DbString,
264        path: &[JsonPathSelector],
265        candidate: &JsonValue,
266        k: usize,
267    ) -> GraphResult<Vec<JsonPathContainmentHit>> {
268        self.exact_json_path_contains_nodes_checked(
269            label,
270            property,
271            path,
272            candidate,
273            k,
274            CancellationChecker::disabled(),
275        )
276        .map_err(JsonSearchError::into_graph_error)
277    }
278
279    /// Exhaustively find JSON path containment matches with cancellation checks.
280    pub fn exact_json_path_contains_nodes_checked(
281        &self,
282        label: &DbString,
283        property: &DbString,
284        path: &[JsonPathSelector],
285        candidate: &JsonValue,
286        k: usize,
287        checker: CancellationChecker<'_>,
288    ) -> Result<Vec<JsonPathContainmentHit>, JsonSearchError> {
289        checker.check()?;
290        if k == 0 || path.is_empty() {
291            return Ok(Vec::new());
292        }
293        let Some(label_rows) = self.nodes_with_label(label) else {
294            return Ok(Vec::new());
295        };
296        if parallel::should_parallelize_json_scan(label_rows, k) {
297            let scan = parallel::JsonScan::new(self, label, property);
298            return parallel::path_contains_nodes(scan, path, candidate, k, label_rows, checker);
299        }
300
301        let mut top_k = JsonContainmentTopK::new(k);
302        let mut rows_since_check = 0usize;
303        for raw_row in label_rows.iter() {
304            rows_since_check += 1;
305            if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
306                checker.check()?;
307                rows_since_check = 0;
308            }
309            if !self.node_store.is_alive(raw_row) {
310                continue;
311            }
312            let row = RowIndex::new(raw_row);
313            let node_id = self
314                .node_id_for_row(row)
315                .ok_or_else(|| GraphError::Inconsistent {
316                    reason: format!(
317                        "label index row {raw_row} for {} has no node id",
318                        label.as_str()
319                    ),
320                })?;
321            let properties = self
322                .node_store
323                .properties
324                .get(raw_row as usize)
325                .ok_or_else(|| GraphError::Inconsistent {
326                    reason: format!(
327                        "JSON search row {raw_row} for {} has no property row",
328                        label.as_str()
329                    ),
330                })?;
331            let Some(Value::Json(value)) = properties.get(property) else {
332                continue;
333            };
334            if value.path_contains(path, candidate) {
335                top_k.push(node_id);
336            }
337        }
338        Ok(top_k.into_path_containment_hits())
339    }
340
341    /// Exhaustively find JSON-valued node properties where `path` selects a value.
342    pub fn exact_json_path_value_nodes(
343        &self,
344        label: &DbString,
345        property: &DbString,
346        path: &[JsonPathSelector],
347        k: usize,
348    ) -> GraphResult<Vec<JsonPathValueHit>> {
349        self.exact_json_path_value_nodes_checked(
350            label,
351            property,
352            path,
353            k,
354            CancellationChecker::disabled(),
355        )
356        .map_err(JsonSearchError::into_graph_error)
357    }
358
359    /// Exhaustively find JSON path values with cancellation checks.
360    pub fn exact_json_path_value_nodes_checked(
361        &self,
362        label: &DbString,
363        property: &DbString,
364        path: &[JsonPathSelector],
365        k: usize,
366        checker: CancellationChecker<'_>,
367    ) -> Result<Vec<JsonPathValueHit>, JsonSearchError> {
368        checker.check()?;
369        if k == 0 || path.is_empty() {
370            return Ok(Vec::new());
371        }
372        let Some(label_rows) = self.nodes_with_label(label) else {
373            return Ok(Vec::new());
374        };
375        if parallel::should_parallelize_json_scan(label_rows, k) {
376            let scan = parallel::JsonScan::new(self, label, property);
377            return parallel::path_value_nodes(scan, path, k, label_rows, checker);
378        }
379
380        let mut top_k = JsonPathValueTopK::new(k);
381        let mut rows_since_check = 0usize;
382        for raw_row in label_rows.iter() {
383            rows_since_check += 1;
384            if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
385                checker.check()?;
386                rows_since_check = 0;
387            }
388            if !self.node_store.is_alive(raw_row) {
389                continue;
390            }
391            let row = RowIndex::new(raw_row);
392            let node_id = self
393                .node_id_for_row(row)
394                .ok_or_else(|| GraphError::Inconsistent {
395                    reason: format!(
396                        "label index row {raw_row} for {} has no node id",
397                        label.as_str()
398                    ),
399                })?;
400            let properties = self
401                .node_store
402                .properties
403                .get(raw_row as usize)
404                .ok_or_else(|| GraphError::Inconsistent {
405                    reason: format!(
406                        "JSON search row {raw_row} for {} has no property row",
407                        label.as_str()
408                    ),
409                })?;
410            let Some(Value::Json(value)) = properties.get(property) else {
411                continue;
412            };
413            let Some(value) = value.path_value_ref(path) else {
414                continue;
415            };
416            top_k.push(node_id, value);
417        }
418        Ok(top_k.into_hits())
419    }
420}
421
422impl SharedGraph {
423    /// Exhaustively find JSON-valued node properties in the current snapshot.
424    pub fn exact_json_contains_nodes(
425        &self,
426        label: &DbString,
427        property: &DbString,
428        candidate: &JsonValue,
429        k: usize,
430    ) -> GraphResult<Vec<JsonContainmentHit>> {
431        self.read()
432            .exact_json_contains_nodes(label, property, candidate, k)
433    }
434
435    /// Exhaustively find JSON-valued node properties with cancellation checks.
436    pub fn exact_json_contains_nodes_checked(
437        &self,
438        label: &DbString,
439        property: &DbString,
440        candidate: &JsonValue,
441        k: usize,
442        checker: CancellationChecker<'_>,
443    ) -> Result<Vec<JsonContainmentHit>, JsonSearchError> {
444        self.read()
445            .exact_json_contains_nodes_checked(label, property, candidate, k, checker)
446    }
447
448    /// Exhaustively find JSON-valued node properties where `path` exists.
449    pub fn exact_json_path_exists_nodes(
450        &self,
451        label: &DbString,
452        property: &DbString,
453        path: &[JsonPathSelector],
454        k: usize,
455    ) -> GraphResult<Vec<JsonPathHit>> {
456        self.read()
457            .exact_json_path_exists_nodes(label, property, path, k)
458    }
459
460    /// Exhaustively find JSON-valued node properties with path-existence checks.
461    pub fn exact_json_path_exists_nodes_checked(
462        &self,
463        label: &DbString,
464        property: &DbString,
465        path: &[JsonPathSelector],
466        k: usize,
467        checker: CancellationChecker<'_>,
468    ) -> Result<Vec<JsonPathHit>, JsonSearchError> {
469        self.read()
470            .exact_json_path_exists_nodes_checked(label, property, path, k, checker)
471    }
472
473    /// Exhaustively find JSON-valued node properties whose selected path
474    /// contains `candidate`.
475    pub fn exact_json_path_contains_nodes(
476        &self,
477        label: &DbString,
478        property: &DbString,
479        path: &[JsonPathSelector],
480        candidate: &JsonValue,
481        k: usize,
482    ) -> GraphResult<Vec<JsonPathContainmentHit>> {
483        self.read()
484            .exact_json_path_contains_nodes(label, property, path, candidate, k)
485    }
486
487    /// Exhaustively find JSON path containment matches with cancellation checks.
488    pub fn exact_json_path_contains_nodes_checked(
489        &self,
490        label: &DbString,
491        property: &DbString,
492        path: &[JsonPathSelector],
493        candidate: &JsonValue,
494        k: usize,
495        checker: CancellationChecker<'_>,
496    ) -> Result<Vec<JsonPathContainmentHit>, JsonSearchError> {
497        self.read()
498            .exact_json_path_contains_nodes_checked(label, property, path, candidate, k, checker)
499    }
500
501    /// Exhaustively find JSON-valued node properties where `path` selects a value.
502    pub fn exact_json_path_value_nodes(
503        &self,
504        label: &DbString,
505        property: &DbString,
506        path: &[JsonPathSelector],
507        k: usize,
508    ) -> GraphResult<Vec<JsonPathValueHit>> {
509        self.read()
510            .exact_json_path_value_nodes(label, property, path, k)
511    }
512
513    /// Exhaustively find JSON path values with cancellation checks.
514    pub fn exact_json_path_value_nodes_checked(
515        &self,
516        label: &DbString,
517        property: &DbString,
518        path: &[JsonPathSelector],
519        k: usize,
520        checker: CancellationChecker<'_>,
521    ) -> Result<Vec<JsonPathValueHit>, JsonSearchError> {
522        self.read()
523            .exact_json_path_value_nodes_checked(label, property, path, k, checker)
524    }
525}
526
527struct JsonContainmentTopK {
528    k: usize,
529    nodes: BinaryHeap<NodeId>,
530}
531
532impl JsonContainmentTopK {
533    fn new(k: usize) -> Self {
534        Self {
535            k,
536            nodes: BinaryHeap::new(),
537        }
538    }
539
540    fn push(&mut self, node_id: NodeId) {
541        if self.k == 0 {
542            return;
543        }
544        if self.nodes.len() < self.k {
545            self.nodes.push(node_id);
546            return;
547        }
548        let Some(mut max_node_id) = self.nodes.peek_mut() else {
549            return;
550        };
551        if node_id < *max_node_id {
552            *max_node_id = node_id;
553        }
554    }
555
556    fn into_hits(self) -> Vec<JsonContainmentHit> {
557        self.nodes
558            .into_sorted_vec()
559            .into_iter()
560            .map(|node_id| JsonContainmentHit { node_id })
561            .collect()
562    }
563
564    fn into_path_hits(self) -> Vec<JsonPathHit> {
565        self.nodes
566            .into_sorted_vec()
567            .into_iter()
568            .map(|node_id| JsonPathHit { node_id })
569            .collect()
570    }
571
572    fn into_path_containment_hits(self) -> Vec<JsonPathContainmentHit> {
573        self.nodes
574            .into_sorted_vec()
575            .into_iter()
576            .map(|node_id| JsonPathContainmentHit { node_id })
577            .collect()
578    }
579}
580
581struct JsonPathValueCandidate {
582    node_id: NodeId,
583    value: JsonValue,
584}
585
586impl PartialEq for JsonPathValueCandidate {
587    fn eq(&self, other: &Self) -> bool {
588        self.node_id == other.node_id
589    }
590}
591
592impl Eq for JsonPathValueCandidate {}
593
594impl PartialOrd for JsonPathValueCandidate {
595    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
596        Some(self.cmp(other))
597    }
598}
599
600impl Ord for JsonPathValueCandidate {
601    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
602        self.node_id.cmp(&other.node_id)
603    }
604}
605
606struct JsonPathValueTopK {
607    k: usize,
608    nodes: BinaryHeap<JsonPathValueCandidate>,
609}
610
611impl JsonPathValueTopK {
612    fn new(k: usize) -> Self {
613        Self {
614            k,
615            nodes: BinaryHeap::new(),
616        }
617    }
618
619    fn push(&mut self, node_id: NodeId, value: JsonValueRef<'_>) {
620        self.push_with(node_id, || value.to_owned_json_value());
621    }
622
623    fn push_owned(&mut self, node_id: NodeId, value: JsonValue) {
624        self.push_with(node_id, || value);
625    }
626
627    fn push_with(&mut self, node_id: NodeId, value: impl FnOnce() -> JsonValue) {
628        if self.k == 0 {
629            return;
630        }
631        if self.nodes.len() < self.k {
632            self.nodes.push(JsonPathValueCandidate {
633                node_id,
634                value: value(),
635            });
636            return;
637        }
638        let Some(mut max_node) = self.nodes.peek_mut() else {
639            return;
640        };
641        if node_id < max_node.node_id {
642            *max_node = JsonPathValueCandidate {
643                node_id,
644                value: value(),
645            };
646        }
647    }
648
649    fn into_hits(self) -> Vec<JsonPathValueHit> {
650        self.nodes
651            .into_sorted_vec()
652            .into_iter()
653            .map(|hit| JsonPathValueHit {
654                node_id: hit.node_id,
655                value: hit.value,
656            })
657            .collect()
658    }
659}
660
661#[cfg(test)]
662mod tests;