1use std::collections::BinaryHeap;
8use std::time::Duration;
9
10use selene_core::{
11 CancellationCause, CancellationChecker, DbString, JsonPathSelector, JsonValue, JsonValueRef,
12 NodeId, Value,
13};
14
15use crate::error::{GraphError, GraphResult};
16use crate::graph::SeleneGraph;
17use crate::shared::SharedGraph;
18use crate::store::RowIndex;
19
20#[path = "json_search/parallel.rs"]
21mod parallel;
22
/// Number of rows a sequential JSON scan visits before it reports the
/// accumulated count to the cancellation checker.
pub(crate) const JSON_SEARCH_CANCEL_STRIDE: usize = 1024;

/// Rows per chunk for parallel JSON scans.
// NOTE(review): not referenced in this file; presumably consumed by
// `json_search/parallel.rs` — confirm there.
pub(crate) const JSON_SEARCH_PARALLEL_CHUNK_ROWS: usize = 2048;

/// Row-count threshold associated with parallel JSON scans: `8` when compiled
/// with `cfg(test)`, `16_384` otherwise.
// NOTE(review): presumably lowered under test so small fixtures still reach the
// parallel path — confirm against `parallel::should_parallelize_json_scan`.
pub(crate) const JSON_SEARCH_PARALLEL_MIN_ROWS: u64 = if cfg!(test) { 8 } else { 16_384 };

/// Limit on the number of selectors in a JSON path.
// NOTE(review): not enforced anywhere in this file; callers elsewhere are
// expected to apply it — confirm.
pub const JSON_PATH_SELECTOR_LIMIT: usize = 64;
31
32#[derive(Clone, Copy, Debug, Eq, PartialEq)]
34pub struct JsonContainmentHit {
35 pub node_id: NodeId,
37}
38
39#[derive(Clone, Copy, Debug, Eq, PartialEq)]
41pub struct JsonPathHit {
42 pub node_id: NodeId,
44}
45
46#[derive(Clone, Copy, Debug, Eq, PartialEq)]
48pub struct JsonPathContainmentHit {
49 pub node_id: NodeId,
51}
52
53#[derive(Clone, Debug, PartialEq)]
55pub struct JsonPathValueHit {
56 pub node_id: NodeId,
58 pub value: JsonValue,
60}
61
62#[derive(Debug, thiserror::Error)]
64pub enum JsonSearchError {
65 #[error(transparent)]
67 Graph(#[from] GraphError),
68 #[error("JSON search cancelled")]
70 Cancelled,
71 #[error("JSON search timed out after {elapsed:?}")]
73 Timeout {
74 elapsed: Duration,
76 },
77 #[error("JSON search node scan budget exceeded ({scanned} > {limit})")]
79 NodeScanBudgetExceeded {
80 limit: usize,
82 scanned: usize,
84 },
85}
86
87impl JsonSearchError {
88 pub(crate) fn into_graph_error(self) -> GraphError {
89 match self {
90 Self::Graph(error) => error,
91 Self::Cancelled | Self::Timeout { .. } | Self::NodeScanBudgetExceeded { .. } => {
92 GraphError::Inconsistent {
93 reason: format!("disabled JSON-search checker returned {self}"),
94 }
95 }
96 }
97 }
98}
99
100impl From<CancellationCause> for JsonSearchError {
101 fn from(cause: CancellationCause) -> Self {
102 match cause {
103 CancellationCause::Cancelled => Self::Cancelled,
104 CancellationCause::Timeout { elapsed } => Self::Timeout { elapsed },
105 CancellationCause::NodeScanBudgetExceeded { limit, scanned } => {
106 Self::NodeScanBudgetExceeded { limit, scanned }
107 }
108 }
109 }
110}
111
112impl SeleneGraph {
113 pub fn exact_json_contains_nodes(
115 &self,
116 label: &DbString,
117 property: &DbString,
118 candidate: &JsonValue,
119 k: usize,
120 ) -> GraphResult<Vec<JsonContainmentHit>> {
121 self.exact_json_contains_nodes_checked(
122 label,
123 property,
124 candidate,
125 k,
126 CancellationChecker::disabled(),
127 )
128 .map_err(JsonSearchError::into_graph_error)
129 }
130
131 pub fn exact_json_contains_nodes_checked(
133 &self,
134 label: &DbString,
135 property: &DbString,
136 candidate: &JsonValue,
137 k: usize,
138 checker: CancellationChecker<'_>,
139 ) -> Result<Vec<JsonContainmentHit>, JsonSearchError> {
140 checker.check()?;
141 if k == 0 {
142 return Ok(Vec::new());
143 }
144 let Some(label_rows) = self.nodes_with_label(label) else {
145 return Ok(Vec::new());
146 };
147 if parallel::should_parallelize_json_scan(label_rows, k) {
148 let scan = parallel::JsonScan::new(self, label, property);
149 return parallel::contains_nodes(scan, candidate, k, label_rows, checker);
150 }
151
152 let mut top_k = JsonContainmentTopK::new(k);
153 let mut rows_since_check = 0usize;
154 for raw_row in label_rows.iter() {
155 rows_since_check += 1;
156 if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
157 checker.note_nodes_scanned(rows_since_check)?;
158 rows_since_check = 0;
159 }
160 if !self.node_store.is_alive(raw_row) {
161 continue;
162 }
163 let row = RowIndex::new(raw_row);
164 let node_id = self
165 .node_id_for_row(row)
166 .ok_or_else(|| GraphError::Inconsistent {
167 reason: format!(
168 "label index row {raw_row} for {} has no node id",
169 label.as_str()
170 ),
171 })?;
172 let properties = self
173 .node_store
174 .properties
175 .get(raw_row as usize)
176 .ok_or_else(|| GraphError::Inconsistent {
177 reason: format!(
178 "JSON search row {raw_row} for {} has no property row",
179 label.as_str()
180 ),
181 })?;
182 let Some(Value::Json(value)) = properties.get(property) else {
183 continue;
184 };
185 if value.contains(candidate) {
186 top_k.push(node_id);
187 }
188 }
189 if rows_since_check > 0 {
190 checker.note_nodes_scanned(rows_since_check)?;
191 }
192 Ok(top_k.into_hits())
193 }
194
195 pub fn exact_json_path_exists_nodes(
197 &self,
198 label: &DbString,
199 property: &DbString,
200 path: &[JsonPathSelector],
201 k: usize,
202 ) -> GraphResult<Vec<JsonPathHit>> {
203 self.exact_json_path_exists_nodes_checked(
204 label,
205 property,
206 path,
207 k,
208 CancellationChecker::disabled(),
209 )
210 .map_err(JsonSearchError::into_graph_error)
211 }
212
213 pub fn exact_json_path_exists_nodes_checked(
215 &self,
216 label: &DbString,
217 property: &DbString,
218 path: &[JsonPathSelector],
219 k: usize,
220 checker: CancellationChecker<'_>,
221 ) -> Result<Vec<JsonPathHit>, JsonSearchError> {
222 checker.check()?;
223 if k == 0 || path.is_empty() {
224 return Ok(Vec::new());
225 }
226 let Some(label_rows) = self.nodes_with_label(label) else {
227 return Ok(Vec::new());
228 };
229 if parallel::should_parallelize_json_scan(label_rows, k) {
230 let scan = parallel::JsonScan::new(self, label, property);
231 return parallel::path_exists_nodes(scan, path, k, label_rows, checker);
232 }
233
234 let mut top_k = JsonContainmentTopK::new(k);
235 let mut rows_since_check = 0usize;
236 for raw_row in label_rows.iter() {
237 rows_since_check += 1;
238 if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
239 checker.note_nodes_scanned(rows_since_check)?;
240 rows_since_check = 0;
241 }
242 if !self.node_store.is_alive(raw_row) {
243 continue;
244 }
245 let row = RowIndex::new(raw_row);
246 let node_id = self
247 .node_id_for_row(row)
248 .ok_or_else(|| GraphError::Inconsistent {
249 reason: format!(
250 "label index row {raw_row} for {} has no node id",
251 label.as_str()
252 ),
253 })?;
254 let properties = self
255 .node_store
256 .properties
257 .get(raw_row as usize)
258 .ok_or_else(|| GraphError::Inconsistent {
259 reason: format!(
260 "JSON search row {raw_row} for {} has no property row",
261 label.as_str()
262 ),
263 })?;
264 let Some(Value::Json(value)) = properties.get(property) else {
265 continue;
266 };
267 if value.path_exists(path) {
268 top_k.push(node_id);
269 }
270 }
271 if rows_since_check > 0 {
272 checker.note_nodes_scanned(rows_since_check)?;
273 }
274 Ok(top_k.into_path_hits())
275 }
276
277 pub fn exact_json_path_contains_nodes(
280 &self,
281 label: &DbString,
282 property: &DbString,
283 path: &[JsonPathSelector],
284 candidate: &JsonValue,
285 k: usize,
286 ) -> GraphResult<Vec<JsonPathContainmentHit>> {
287 self.exact_json_path_contains_nodes_checked(
288 label,
289 property,
290 path,
291 candidate,
292 k,
293 CancellationChecker::disabled(),
294 )
295 .map_err(JsonSearchError::into_graph_error)
296 }
297
298 pub fn exact_json_path_contains_nodes_checked(
300 &self,
301 label: &DbString,
302 property: &DbString,
303 path: &[JsonPathSelector],
304 candidate: &JsonValue,
305 k: usize,
306 checker: CancellationChecker<'_>,
307 ) -> Result<Vec<JsonPathContainmentHit>, JsonSearchError> {
308 checker.check()?;
309 if k == 0 || path.is_empty() {
310 return Ok(Vec::new());
311 }
312 let Some(label_rows) = self.nodes_with_label(label) else {
313 return Ok(Vec::new());
314 };
315 if parallel::should_parallelize_json_scan(label_rows, k) {
316 let scan = parallel::JsonScan::new(self, label, property);
317 return parallel::path_contains_nodes(scan, path, candidate, k, label_rows, checker);
318 }
319
320 let mut top_k = JsonContainmentTopK::new(k);
321 let mut rows_since_check = 0usize;
322 for raw_row in label_rows.iter() {
323 rows_since_check += 1;
324 if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
325 checker.note_nodes_scanned(rows_since_check)?;
326 rows_since_check = 0;
327 }
328 if !self.node_store.is_alive(raw_row) {
329 continue;
330 }
331 let row = RowIndex::new(raw_row);
332 let node_id = self
333 .node_id_for_row(row)
334 .ok_or_else(|| GraphError::Inconsistent {
335 reason: format!(
336 "label index row {raw_row} for {} has no node id",
337 label.as_str()
338 ),
339 })?;
340 let properties = self
341 .node_store
342 .properties
343 .get(raw_row as usize)
344 .ok_or_else(|| GraphError::Inconsistent {
345 reason: format!(
346 "JSON search row {raw_row} for {} has no property row",
347 label.as_str()
348 ),
349 })?;
350 let Some(Value::Json(value)) = properties.get(property) else {
351 continue;
352 };
353 if value.path_contains(path, candidate) {
354 top_k.push(node_id);
355 }
356 }
357 if rows_since_check > 0 {
358 checker.note_nodes_scanned(rows_since_check)?;
359 }
360 Ok(top_k.into_path_containment_hits())
361 }
362
363 pub fn exact_json_path_value_nodes(
365 &self,
366 label: &DbString,
367 property: &DbString,
368 path: &[JsonPathSelector],
369 k: usize,
370 ) -> GraphResult<Vec<JsonPathValueHit>> {
371 self.exact_json_path_value_nodes_checked(
372 label,
373 property,
374 path,
375 k,
376 CancellationChecker::disabled(),
377 )
378 .map_err(JsonSearchError::into_graph_error)
379 }
380
381 pub fn exact_json_path_value_nodes_checked(
383 &self,
384 label: &DbString,
385 property: &DbString,
386 path: &[JsonPathSelector],
387 k: usize,
388 checker: CancellationChecker<'_>,
389 ) -> Result<Vec<JsonPathValueHit>, JsonSearchError> {
390 checker.check()?;
391 if k == 0 || path.is_empty() {
392 return Ok(Vec::new());
393 }
394 let Some(label_rows) = self.nodes_with_label(label) else {
395 return Ok(Vec::new());
396 };
397 if parallel::should_parallelize_json_scan(label_rows, k) {
398 let scan = parallel::JsonScan::new(self, label, property);
399 return parallel::path_value_nodes(scan, path, k, label_rows, checker);
400 }
401
402 let mut top_k = JsonPathValueTopK::new(k);
403 let mut rows_since_check = 0usize;
404 for raw_row in label_rows.iter() {
405 rows_since_check += 1;
406 if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
407 checker.note_nodes_scanned(rows_since_check)?;
408 rows_since_check = 0;
409 }
410 if !self.node_store.is_alive(raw_row) {
411 continue;
412 }
413 let row = RowIndex::new(raw_row);
414 let node_id = self
415 .node_id_for_row(row)
416 .ok_or_else(|| GraphError::Inconsistent {
417 reason: format!(
418 "label index row {raw_row} for {} has no node id",
419 label.as_str()
420 ),
421 })?;
422 let properties = self
423 .node_store
424 .properties
425 .get(raw_row as usize)
426 .ok_or_else(|| GraphError::Inconsistent {
427 reason: format!(
428 "JSON search row {raw_row} for {} has no property row",
429 label.as_str()
430 ),
431 })?;
432 let Some(Value::Json(value)) = properties.get(property) else {
433 continue;
434 };
435 let Some(value) = value.path_value_ref(path) else {
436 continue;
437 };
438 top_k.push(node_id, value);
439 }
440 if rows_since_check > 0 {
441 checker.note_nodes_scanned(rows_since_check)?;
442 }
443 Ok(top_k.into_hits())
444 }
445}
446
447impl SharedGraph {
448 pub fn exact_json_contains_nodes(
450 &self,
451 label: &DbString,
452 property: &DbString,
453 candidate: &JsonValue,
454 k: usize,
455 ) -> GraphResult<Vec<JsonContainmentHit>> {
456 self.read()
457 .exact_json_contains_nodes(label, property, candidate, k)
458 }
459
460 pub fn exact_json_contains_nodes_checked(
462 &self,
463 label: &DbString,
464 property: &DbString,
465 candidate: &JsonValue,
466 k: usize,
467 checker: CancellationChecker<'_>,
468 ) -> Result<Vec<JsonContainmentHit>, JsonSearchError> {
469 self.read()
470 .exact_json_contains_nodes_checked(label, property, candidate, k, checker)
471 }
472
473 pub fn exact_json_path_exists_nodes(
475 &self,
476 label: &DbString,
477 property: &DbString,
478 path: &[JsonPathSelector],
479 k: usize,
480 ) -> GraphResult<Vec<JsonPathHit>> {
481 self.read()
482 .exact_json_path_exists_nodes(label, property, path, k)
483 }
484
485 pub fn exact_json_path_exists_nodes_checked(
487 &self,
488 label: &DbString,
489 property: &DbString,
490 path: &[JsonPathSelector],
491 k: usize,
492 checker: CancellationChecker<'_>,
493 ) -> Result<Vec<JsonPathHit>, JsonSearchError> {
494 self.read()
495 .exact_json_path_exists_nodes_checked(label, property, path, k, checker)
496 }
497
498 pub fn exact_json_path_contains_nodes(
501 &self,
502 label: &DbString,
503 property: &DbString,
504 path: &[JsonPathSelector],
505 candidate: &JsonValue,
506 k: usize,
507 ) -> GraphResult<Vec<JsonPathContainmentHit>> {
508 self.read()
509 .exact_json_path_contains_nodes(label, property, path, candidate, k)
510 }
511
512 pub fn exact_json_path_contains_nodes_checked(
514 &self,
515 label: &DbString,
516 property: &DbString,
517 path: &[JsonPathSelector],
518 candidate: &JsonValue,
519 k: usize,
520 checker: CancellationChecker<'_>,
521 ) -> Result<Vec<JsonPathContainmentHit>, JsonSearchError> {
522 self.read()
523 .exact_json_path_contains_nodes_checked(label, property, path, candidate, k, checker)
524 }
525
526 pub fn exact_json_path_value_nodes(
528 &self,
529 label: &DbString,
530 property: &DbString,
531 path: &[JsonPathSelector],
532 k: usize,
533 ) -> GraphResult<Vec<JsonPathValueHit>> {
534 self.read()
535 .exact_json_path_value_nodes(label, property, path, k)
536 }
537
538 pub fn exact_json_path_value_nodes_checked(
540 &self,
541 label: &DbString,
542 property: &DbString,
543 path: &[JsonPathSelector],
544 k: usize,
545 checker: CancellationChecker<'_>,
546 ) -> Result<Vec<JsonPathValueHit>, JsonSearchError> {
547 self.read()
548 .exact_json_path_value_nodes_checked(label, property, path, k, checker)
549 }
550}
551
552struct JsonContainmentTopK {
553 k: usize,
554 nodes: BinaryHeap<NodeId>,
555}
556
557impl JsonContainmentTopK {
558 fn new(k: usize) -> Self {
559 Self {
560 k,
561 nodes: BinaryHeap::new(),
562 }
563 }
564
565 fn push(&mut self, node_id: NodeId) {
566 if self.k == 0 {
567 return;
568 }
569 if self.nodes.len() < self.k {
570 self.nodes.push(node_id);
571 return;
572 }
573 let Some(mut max_node_id) = self.nodes.peek_mut() else {
574 return;
575 };
576 if node_id < *max_node_id {
577 *max_node_id = node_id;
578 }
579 }
580
581 fn into_hits(self) -> Vec<JsonContainmentHit> {
582 self.nodes
583 .into_sorted_vec()
584 .into_iter()
585 .map(|node_id| JsonContainmentHit { node_id })
586 .collect()
587 }
588
589 fn into_path_hits(self) -> Vec<JsonPathHit> {
590 self.nodes
591 .into_sorted_vec()
592 .into_iter()
593 .map(|node_id| JsonPathHit { node_id })
594 .collect()
595 }
596
597 fn into_path_containment_hits(self) -> Vec<JsonPathContainmentHit> {
598 self.nodes
599 .into_sorted_vec()
600 .into_iter()
601 .map(|node_id| JsonPathContainmentHit { node_id })
602 .collect()
603 }
604}
605
606struct JsonPathValueCandidate {
607 node_id: NodeId,
608 value: JsonValue,
609}
610
611impl PartialEq for JsonPathValueCandidate {
612 fn eq(&self, other: &Self) -> bool {
613 self.node_id == other.node_id
614 }
615}
616
617impl Eq for JsonPathValueCandidate {}
618
619impl PartialOrd for JsonPathValueCandidate {
620 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
621 Some(self.cmp(other))
622 }
623}
624
625impl Ord for JsonPathValueCandidate {
626 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
627 self.node_id.cmp(&other.node_id)
628 }
629}
630
631struct JsonPathValueTopK {
632 k: usize,
633 nodes: BinaryHeap<JsonPathValueCandidate>,
634}
635
636impl JsonPathValueTopK {
637 fn new(k: usize) -> Self {
638 Self {
639 k,
640 nodes: BinaryHeap::new(),
641 }
642 }
643
644 fn push(&mut self, node_id: NodeId, value: JsonValueRef<'_>) {
645 self.push_with(node_id, || value.to_owned_json_value());
646 }
647
648 fn push_owned(&mut self, node_id: NodeId, value: JsonValue) {
649 self.push_with(node_id, || value);
650 }
651
652 fn push_with(&mut self, node_id: NodeId, value: impl FnOnce() -> JsonValue) {
653 if self.k == 0 {
654 return;
655 }
656 if self.nodes.len() < self.k {
657 self.nodes.push(JsonPathValueCandidate {
658 node_id,
659 value: value(),
660 });
661 return;
662 }
663 let Some(mut max_node) = self.nodes.peek_mut() else {
664 return;
665 };
666 if node_id < max_node.node_id {
667 *max_node = JsonPathValueCandidate {
668 node_id,
669 value: value(),
670 };
671 }
672 }
673
674 fn into_hits(self) -> Vec<JsonPathValueHit> {
675 self.nodes
676 .into_sorted_vec()
677 .into_iter()
678 .map(|hit| JsonPathValueHit {
679 node_id: hit.node_id,
680 value: hit.value,
681 })
682 .collect()
683 }
684}
685
686#[cfg(test)]
687mod tests;