1use std::collections::BinaryHeap;
8use std::time::Duration;
9
10use selene_core::{
11 CancellationCause, CancellationChecker, DbString, JsonPathSelector, JsonValue, JsonValueRef,
12 NodeId, Value,
13};
14
15use crate::error::{GraphError, GraphResult};
16use crate::graph::SeleneGraph;
17use crate::shared::SharedGraph;
18use crate::store::RowIndex;
19
20#[path = "json_search/parallel.rs"]
21mod parallel;
22
/// Number of rows a sequential JSON scan visits between two cancellation checks.
pub(crate) const JSON_SEARCH_CANCEL_STRIDE: usize = 1024;

/// Row-chunk size for JSON scans.
///
/// NOTE(review): not referenced in this file; presumably consumed by the `parallel`
/// submodule — confirm there.
pub(crate) const JSON_SEARCH_PARALLEL_CHUNK_ROWS: usize = 2048;

/// Row-count threshold associated with parallel JSON scans.
///
/// Test builds use a tiny value. NOTE(review): presumably so small fixtures still reach
/// the parallel path — the constant is only consumed outside this file, confirm there.
pub(crate) const JSON_SEARCH_PARALLEL_MIN_ROWS: u64 = if cfg!(test) { 8 } else { 16_384 };

/// Limit on JSON path selectors.
///
/// NOTE(review): not enforced anywhere in this file; presumably the maximum number of
/// selectors accepted in one path — confirm against the callers that validate paths.
pub const JSON_PATH_SELECTOR_LIMIT: usize = 64;
31
32#[derive(Clone, Copy, Debug, Eq, PartialEq)]
34pub struct JsonContainmentHit {
35 pub node_id: NodeId,
37}
38
39#[derive(Clone, Copy, Debug, Eq, PartialEq)]
41pub struct JsonPathHit {
42 pub node_id: NodeId,
44}
45
46#[derive(Clone, Copy, Debug, Eq, PartialEq)]
48pub struct JsonPathContainmentHit {
49 pub node_id: NodeId,
51}
52
53#[derive(Clone, Debug, PartialEq)]
55pub struct JsonPathValueHit {
56 pub node_id: NodeId,
58 pub value: JsonValue,
60}
61
62#[derive(Debug, thiserror::Error)]
64pub enum JsonSearchError {
65 #[error(transparent)]
67 Graph(#[from] GraphError),
68 #[error("JSON search cancelled")]
70 Cancelled,
71 #[error("JSON search timed out after {elapsed:?}")]
73 Timeout {
74 elapsed: Duration,
76 },
77}
78
79impl JsonSearchError {
80 pub(crate) fn into_graph_error(self) -> GraphError {
81 match self {
82 Self::Graph(error) => error,
83 Self::Cancelled | Self::Timeout { .. } => GraphError::Inconsistent {
84 reason: format!("disabled JSON-search checker returned {self}"),
85 },
86 }
87 }
88}
89
90impl From<CancellationCause> for JsonSearchError {
91 fn from(cause: CancellationCause) -> Self {
92 match cause {
93 CancellationCause::Cancelled => Self::Cancelled,
94 CancellationCause::Timeout { elapsed } => Self::Timeout { elapsed },
95 }
96 }
97}
98
99impl SeleneGraph {
100 pub fn exact_json_contains_nodes(
102 &self,
103 label: &DbString,
104 property: &DbString,
105 candidate: &JsonValue,
106 k: usize,
107 ) -> GraphResult<Vec<JsonContainmentHit>> {
108 self.exact_json_contains_nodes_checked(
109 label,
110 property,
111 candidate,
112 k,
113 CancellationChecker::disabled(),
114 )
115 .map_err(JsonSearchError::into_graph_error)
116 }
117
118 pub fn exact_json_contains_nodes_checked(
120 &self,
121 label: &DbString,
122 property: &DbString,
123 candidate: &JsonValue,
124 k: usize,
125 checker: CancellationChecker<'_>,
126 ) -> Result<Vec<JsonContainmentHit>, JsonSearchError> {
127 checker.check()?;
128 if k == 0 {
129 return Ok(Vec::new());
130 }
131 let Some(label_rows) = self.nodes_with_label(label) else {
132 return Ok(Vec::new());
133 };
134 if parallel::should_parallelize_json_scan(label_rows, k) {
135 let scan = parallel::JsonScan::new(self, label, property);
136 return parallel::contains_nodes(scan, candidate, k, label_rows, checker);
137 }
138
139 let mut top_k = JsonContainmentTopK::new(k);
140 let mut rows_since_check = 0usize;
141 for raw_row in label_rows.iter() {
142 rows_since_check += 1;
143 if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
144 checker.check()?;
145 rows_since_check = 0;
146 }
147 if !self.node_store.is_alive(raw_row) {
148 continue;
149 }
150 let row = RowIndex::new(raw_row);
151 let node_id = self
152 .node_id_for_row(row)
153 .ok_or_else(|| GraphError::Inconsistent {
154 reason: format!(
155 "label index row {raw_row} for {} has no node id",
156 label.as_str()
157 ),
158 })?;
159 let properties = self
160 .node_store
161 .properties
162 .get(raw_row as usize)
163 .ok_or_else(|| GraphError::Inconsistent {
164 reason: format!(
165 "JSON search row {raw_row} for {} has no property row",
166 label.as_str()
167 ),
168 })?;
169 let Some(Value::Json(value)) = properties.get(property) else {
170 continue;
171 };
172 if value.contains(candidate) {
173 top_k.push(node_id);
174 }
175 }
176 Ok(top_k.into_hits())
177 }
178
179 pub fn exact_json_path_exists_nodes(
181 &self,
182 label: &DbString,
183 property: &DbString,
184 path: &[JsonPathSelector],
185 k: usize,
186 ) -> GraphResult<Vec<JsonPathHit>> {
187 self.exact_json_path_exists_nodes_checked(
188 label,
189 property,
190 path,
191 k,
192 CancellationChecker::disabled(),
193 )
194 .map_err(JsonSearchError::into_graph_error)
195 }
196
197 pub fn exact_json_path_exists_nodes_checked(
199 &self,
200 label: &DbString,
201 property: &DbString,
202 path: &[JsonPathSelector],
203 k: usize,
204 checker: CancellationChecker<'_>,
205 ) -> Result<Vec<JsonPathHit>, JsonSearchError> {
206 checker.check()?;
207 if k == 0 || path.is_empty() {
208 return Ok(Vec::new());
209 }
210 let Some(label_rows) = self.nodes_with_label(label) else {
211 return Ok(Vec::new());
212 };
213 if parallel::should_parallelize_json_scan(label_rows, k) {
214 let scan = parallel::JsonScan::new(self, label, property);
215 return parallel::path_exists_nodes(scan, path, k, label_rows, checker);
216 }
217
218 let mut top_k = JsonContainmentTopK::new(k);
219 let mut rows_since_check = 0usize;
220 for raw_row in label_rows.iter() {
221 rows_since_check += 1;
222 if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
223 checker.check()?;
224 rows_since_check = 0;
225 }
226 if !self.node_store.is_alive(raw_row) {
227 continue;
228 }
229 let row = RowIndex::new(raw_row);
230 let node_id = self
231 .node_id_for_row(row)
232 .ok_or_else(|| GraphError::Inconsistent {
233 reason: format!(
234 "label index row {raw_row} for {} has no node id",
235 label.as_str()
236 ),
237 })?;
238 let properties = self
239 .node_store
240 .properties
241 .get(raw_row as usize)
242 .ok_or_else(|| GraphError::Inconsistent {
243 reason: format!(
244 "JSON search row {raw_row} for {} has no property row",
245 label.as_str()
246 ),
247 })?;
248 let Some(Value::Json(value)) = properties.get(property) else {
249 continue;
250 };
251 if value.path_exists(path) {
252 top_k.push(node_id);
253 }
254 }
255 Ok(top_k.into_path_hits())
256 }
257
258 pub fn exact_json_path_contains_nodes(
261 &self,
262 label: &DbString,
263 property: &DbString,
264 path: &[JsonPathSelector],
265 candidate: &JsonValue,
266 k: usize,
267 ) -> GraphResult<Vec<JsonPathContainmentHit>> {
268 self.exact_json_path_contains_nodes_checked(
269 label,
270 property,
271 path,
272 candidate,
273 k,
274 CancellationChecker::disabled(),
275 )
276 .map_err(JsonSearchError::into_graph_error)
277 }
278
279 pub fn exact_json_path_contains_nodes_checked(
281 &self,
282 label: &DbString,
283 property: &DbString,
284 path: &[JsonPathSelector],
285 candidate: &JsonValue,
286 k: usize,
287 checker: CancellationChecker<'_>,
288 ) -> Result<Vec<JsonPathContainmentHit>, JsonSearchError> {
289 checker.check()?;
290 if k == 0 || path.is_empty() {
291 return Ok(Vec::new());
292 }
293 let Some(label_rows) = self.nodes_with_label(label) else {
294 return Ok(Vec::new());
295 };
296 if parallel::should_parallelize_json_scan(label_rows, k) {
297 let scan = parallel::JsonScan::new(self, label, property);
298 return parallel::path_contains_nodes(scan, path, candidate, k, label_rows, checker);
299 }
300
301 let mut top_k = JsonContainmentTopK::new(k);
302 let mut rows_since_check = 0usize;
303 for raw_row in label_rows.iter() {
304 rows_since_check += 1;
305 if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
306 checker.check()?;
307 rows_since_check = 0;
308 }
309 if !self.node_store.is_alive(raw_row) {
310 continue;
311 }
312 let row = RowIndex::new(raw_row);
313 let node_id = self
314 .node_id_for_row(row)
315 .ok_or_else(|| GraphError::Inconsistent {
316 reason: format!(
317 "label index row {raw_row} for {} has no node id",
318 label.as_str()
319 ),
320 })?;
321 let properties = self
322 .node_store
323 .properties
324 .get(raw_row as usize)
325 .ok_or_else(|| GraphError::Inconsistent {
326 reason: format!(
327 "JSON search row {raw_row} for {} has no property row",
328 label.as_str()
329 ),
330 })?;
331 let Some(Value::Json(value)) = properties.get(property) else {
332 continue;
333 };
334 if value.path_contains(path, candidate) {
335 top_k.push(node_id);
336 }
337 }
338 Ok(top_k.into_path_containment_hits())
339 }
340
341 pub fn exact_json_path_value_nodes(
343 &self,
344 label: &DbString,
345 property: &DbString,
346 path: &[JsonPathSelector],
347 k: usize,
348 ) -> GraphResult<Vec<JsonPathValueHit>> {
349 self.exact_json_path_value_nodes_checked(
350 label,
351 property,
352 path,
353 k,
354 CancellationChecker::disabled(),
355 )
356 .map_err(JsonSearchError::into_graph_error)
357 }
358
359 pub fn exact_json_path_value_nodes_checked(
361 &self,
362 label: &DbString,
363 property: &DbString,
364 path: &[JsonPathSelector],
365 k: usize,
366 checker: CancellationChecker<'_>,
367 ) -> Result<Vec<JsonPathValueHit>, JsonSearchError> {
368 checker.check()?;
369 if k == 0 || path.is_empty() {
370 return Ok(Vec::new());
371 }
372 let Some(label_rows) = self.nodes_with_label(label) else {
373 return Ok(Vec::new());
374 };
375 if parallel::should_parallelize_json_scan(label_rows, k) {
376 let scan = parallel::JsonScan::new(self, label, property);
377 return parallel::path_value_nodes(scan, path, k, label_rows, checker);
378 }
379
380 let mut top_k = JsonPathValueTopK::new(k);
381 let mut rows_since_check = 0usize;
382 for raw_row in label_rows.iter() {
383 rows_since_check += 1;
384 if rows_since_check >= JSON_SEARCH_CANCEL_STRIDE {
385 checker.check()?;
386 rows_since_check = 0;
387 }
388 if !self.node_store.is_alive(raw_row) {
389 continue;
390 }
391 let row = RowIndex::new(raw_row);
392 let node_id = self
393 .node_id_for_row(row)
394 .ok_or_else(|| GraphError::Inconsistent {
395 reason: format!(
396 "label index row {raw_row} for {} has no node id",
397 label.as_str()
398 ),
399 })?;
400 let properties = self
401 .node_store
402 .properties
403 .get(raw_row as usize)
404 .ok_or_else(|| GraphError::Inconsistent {
405 reason: format!(
406 "JSON search row {raw_row} for {} has no property row",
407 label.as_str()
408 ),
409 })?;
410 let Some(Value::Json(value)) = properties.get(property) else {
411 continue;
412 };
413 let Some(value) = value.path_value_ref(path) else {
414 continue;
415 };
416 top_k.push(node_id, value);
417 }
418 Ok(top_k.into_hits())
419 }
420}
421
422impl SharedGraph {
423 pub fn exact_json_contains_nodes(
425 &self,
426 label: &DbString,
427 property: &DbString,
428 candidate: &JsonValue,
429 k: usize,
430 ) -> GraphResult<Vec<JsonContainmentHit>> {
431 self.read()
432 .exact_json_contains_nodes(label, property, candidate, k)
433 }
434
435 pub fn exact_json_contains_nodes_checked(
437 &self,
438 label: &DbString,
439 property: &DbString,
440 candidate: &JsonValue,
441 k: usize,
442 checker: CancellationChecker<'_>,
443 ) -> Result<Vec<JsonContainmentHit>, JsonSearchError> {
444 self.read()
445 .exact_json_contains_nodes_checked(label, property, candidate, k, checker)
446 }
447
448 pub fn exact_json_path_exists_nodes(
450 &self,
451 label: &DbString,
452 property: &DbString,
453 path: &[JsonPathSelector],
454 k: usize,
455 ) -> GraphResult<Vec<JsonPathHit>> {
456 self.read()
457 .exact_json_path_exists_nodes(label, property, path, k)
458 }
459
460 pub fn exact_json_path_exists_nodes_checked(
462 &self,
463 label: &DbString,
464 property: &DbString,
465 path: &[JsonPathSelector],
466 k: usize,
467 checker: CancellationChecker<'_>,
468 ) -> Result<Vec<JsonPathHit>, JsonSearchError> {
469 self.read()
470 .exact_json_path_exists_nodes_checked(label, property, path, k, checker)
471 }
472
473 pub fn exact_json_path_contains_nodes(
476 &self,
477 label: &DbString,
478 property: &DbString,
479 path: &[JsonPathSelector],
480 candidate: &JsonValue,
481 k: usize,
482 ) -> GraphResult<Vec<JsonPathContainmentHit>> {
483 self.read()
484 .exact_json_path_contains_nodes(label, property, path, candidate, k)
485 }
486
487 pub fn exact_json_path_contains_nodes_checked(
489 &self,
490 label: &DbString,
491 property: &DbString,
492 path: &[JsonPathSelector],
493 candidate: &JsonValue,
494 k: usize,
495 checker: CancellationChecker<'_>,
496 ) -> Result<Vec<JsonPathContainmentHit>, JsonSearchError> {
497 self.read()
498 .exact_json_path_contains_nodes_checked(label, property, path, candidate, k, checker)
499 }
500
501 pub fn exact_json_path_value_nodes(
503 &self,
504 label: &DbString,
505 property: &DbString,
506 path: &[JsonPathSelector],
507 k: usize,
508 ) -> GraphResult<Vec<JsonPathValueHit>> {
509 self.read()
510 .exact_json_path_value_nodes(label, property, path, k)
511 }
512
513 pub fn exact_json_path_value_nodes_checked(
515 &self,
516 label: &DbString,
517 property: &DbString,
518 path: &[JsonPathSelector],
519 k: usize,
520 checker: CancellationChecker<'_>,
521 ) -> Result<Vec<JsonPathValueHit>, JsonSearchError> {
522 self.read()
523 .exact_json_path_value_nodes_checked(label, property, path, k, checker)
524 }
525}
526
527struct JsonContainmentTopK {
528 k: usize,
529 nodes: BinaryHeap<NodeId>,
530}
531
532impl JsonContainmentTopK {
533 fn new(k: usize) -> Self {
534 Self {
535 k,
536 nodes: BinaryHeap::new(),
537 }
538 }
539
540 fn push(&mut self, node_id: NodeId) {
541 if self.k == 0 {
542 return;
543 }
544 if self.nodes.len() < self.k {
545 self.nodes.push(node_id);
546 return;
547 }
548 let Some(mut max_node_id) = self.nodes.peek_mut() else {
549 return;
550 };
551 if node_id < *max_node_id {
552 *max_node_id = node_id;
553 }
554 }
555
556 fn into_hits(self) -> Vec<JsonContainmentHit> {
557 self.nodes
558 .into_sorted_vec()
559 .into_iter()
560 .map(|node_id| JsonContainmentHit { node_id })
561 .collect()
562 }
563
564 fn into_path_hits(self) -> Vec<JsonPathHit> {
565 self.nodes
566 .into_sorted_vec()
567 .into_iter()
568 .map(|node_id| JsonPathHit { node_id })
569 .collect()
570 }
571
572 fn into_path_containment_hits(self) -> Vec<JsonPathContainmentHit> {
573 self.nodes
574 .into_sorted_vec()
575 .into_iter()
576 .map(|node_id| JsonPathContainmentHit { node_id })
577 .collect()
578 }
579}
580
581struct JsonPathValueCandidate {
582 node_id: NodeId,
583 value: JsonValue,
584}
585
586impl PartialEq for JsonPathValueCandidate {
587 fn eq(&self, other: &Self) -> bool {
588 self.node_id == other.node_id
589 }
590}
591
592impl Eq for JsonPathValueCandidate {}
593
594impl PartialOrd for JsonPathValueCandidate {
595 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
596 Some(self.cmp(other))
597 }
598}
599
600impl Ord for JsonPathValueCandidate {
601 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
602 self.node_id.cmp(&other.node_id)
603 }
604}
605
606struct JsonPathValueTopK {
607 k: usize,
608 nodes: BinaryHeap<JsonPathValueCandidate>,
609}
610
611impl JsonPathValueTopK {
612 fn new(k: usize) -> Self {
613 Self {
614 k,
615 nodes: BinaryHeap::new(),
616 }
617 }
618
619 fn push(&mut self, node_id: NodeId, value: JsonValueRef<'_>) {
620 self.push_with(node_id, || value.to_owned_json_value());
621 }
622
623 fn push_owned(&mut self, node_id: NodeId, value: JsonValue) {
624 self.push_with(node_id, || value);
625 }
626
627 fn push_with(&mut self, node_id: NodeId, value: impl FnOnce() -> JsonValue) {
628 if self.k == 0 {
629 return;
630 }
631 if self.nodes.len() < self.k {
632 self.nodes.push(JsonPathValueCandidate {
633 node_id,
634 value: value(),
635 });
636 return;
637 }
638 let Some(mut max_node) = self.nodes.peek_mut() else {
639 return;
640 };
641 if node_id < max_node.node_id {
642 *max_node = JsonPathValueCandidate {
643 node_id,
644 value: value(),
645 };
646 }
647 }
648
649 fn into_hits(self) -> Vec<JsonPathValueHit> {
650 self.nodes
651 .into_sorted_vec()
652 .into_iter()
653 .map(|hit| JsonPathValueHit {
654 node_id: hit.node_id,
655 value: hit.value,
656 })
657 .collect()
658 }
659}
660
661#[cfg(test)]
662mod tests;