1use crate::optimizer::adaptive::TriplePatternInfo;
12use crate::optimizer::materialized_view::{BindingRow, RdfTerm};
13use anyhow::Result;
14use std::collections::{HashMap, HashSet, VecDeque};
15use std::sync::Arc;
16
/// Storage backend that the BGP evaluator queries for triple-pattern matches.
///
/// Implementations must be `Send + Sync` because the evaluator shares a single
/// `&dyn TripleStore` between the patterns of a stage, which may be evaluated
/// on several threads at once.
pub trait TripleStore: Send + Sync {
    /// Evaluates one triple pattern and returns its solution rows.
    ///
    /// `bindings`, when present, holds the rows the caller has accumulated so
    /// far. How (or whether) an implementation uses them to restrict the
    /// lookup is up to the implementation.
    ///
    /// # Errors
    ///
    /// Returns whatever error the backend reports for the lookup.
    fn evaluate_pattern(
        &self,
        pattern: &TriplePatternInfo,
        bindings: Option<&[BindingRow]>,
    ) -> Result<Vec<BindingRow>>;

    /// Returns the backend's estimate of how many rows `pattern` matches.
    fn estimate_cardinality(&self, pattern: &TriplePatternInfo) -> u64;
}
39
/// Dependency graph over the triple patterns of one basic graph pattern.
///
/// Patterns are identified by their index in the vector passed to
/// `PatternDependencyGraph::build`.
pub struct PatternDependencyGraph {
    /// The patterns, in the order they were supplied.
    patterns: Vec<TriplePatternInfo>,
    /// `dependencies[i]` holds the indices of the patterns that pattern `i`
    /// depends on.
    dependencies: Vec<HashSet<usize>>,
    /// Pattern indices grouped into stages; every pattern appears after all of
    /// its dependencies.
    execution_stages: Vec<Vec<usize>>,
}
57
58impl PatternDependencyGraph {
59 pub fn build(patterns: Vec<TriplePatternInfo>) -> Self {
61 let n = patterns.len();
62 let mut dependencies: Vec<HashSet<usize>> = vec![HashSet::new(); n];
63
64 let mut var_producer: HashMap<String, usize> = HashMap::new();
66 for (i, pattern) in patterns.iter().enumerate() {
67 for var in &pattern.bound_variables {
68 var_producer.entry(var.clone()).or_insert(i);
69 }
70 }
71
72 for i in 0..n {
75 for (var_name, &producer) in &var_producer {
76 if producer == i {
77 continue;
78 }
79 if patterns[i].bound_variables.contains(var_name) {
80 dependencies[i].insert(producer);
81 }
82 }
83 }
84
85 let execution_stages = Self::topological_stages(&dependencies, n);
86
87 Self {
88 patterns,
89 dependencies,
90 execution_stages,
91 }
92 }
93
94 pub fn get_independent_patterns(&self) -> Vec<Vec<usize>> {
96 self.execution_stages.clone()
97 }
98
99 pub fn execution_order(&self) -> &[Vec<usize>] {
101 &self.execution_stages
102 }
103
104 pub fn patterns(&self) -> &[TriplePatternInfo] {
106 &self.patterns
107 }
108
109 pub fn are_independent(&self, i: usize, j: usize) -> bool {
111 !self.dependencies[i].contains(&j) && !self.dependencies[j].contains(&i)
112 }
113
114 fn topological_stages(dependencies: &[HashSet<usize>], n: usize) -> Vec<Vec<usize>> {
120 let mut in_degree: Vec<usize> = dependencies.iter().map(|d| d.len()).collect();
121 let mut reverse: Vec<Vec<usize>> = vec![Vec::new(); n];
122 for (i, deps) in dependencies.iter().enumerate() {
123 for &dep in deps {
124 reverse[dep].push(i);
125 }
126 }
127
128 let mut stages: Vec<Vec<usize>> = Vec::new();
129 let mut queue: VecDeque<usize> = in_degree
130 .iter()
131 .enumerate()
132 .filter(|(_, &d)| d == 0)
133 .map(|(i, _)| i)
134 .collect();
135
136 while !queue.is_empty() {
137 let stage: Vec<usize> = queue.drain(..).collect();
138 for &node in &stage {
139 for &dependent in &reverse[node] {
140 in_degree[dependent] -= 1;
141 if in_degree[dependent] == 0 {
142 queue.push_back(dependent);
143 }
144 }
145 }
146 stages.push(stage);
147 }
148
149 stages
150 }
151}
152
/// Configuration for evaluating the stages of a basic graph pattern.
///
/// Both fields are plain numbers, so the type derives `Debug` and `Clone` to
/// make it easy to log and to hand copies to other components.
#[derive(Debug, Clone)]
pub struct ParallelBgpEvaluator {
    /// Thread budget. A value of 0 or 1 forces sequential evaluation.
    pub num_threads: usize,
    /// Minimum number of patterns a stage must contain before the parallel
    /// path is considered.
    pub chunk_size: usize,
}
164
165impl Default for ParallelBgpEvaluator {
166 fn default() -> Self {
167 Self {
168 num_threads: num_cpus::get(),
169 chunk_size: 1,
170 }
171 }
172}
173
174impl ParallelBgpEvaluator {
175 pub fn new(num_threads: usize) -> Self {
177 Self {
178 num_threads,
179 chunk_size: 1,
180 }
181 }
182
183 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
185 self.chunk_size = chunk_size.max(1);
186 self
187 }
188
189 pub fn evaluate(
191 &self,
192 patterns: Vec<TriplePatternInfo>,
193 store: &dyn TripleStore,
194 ) -> Result<Vec<BindingRow>> {
195 if patterns.is_empty() {
196 return Ok(Vec::new());
197 }
198
199 let graph = PatternDependencyGraph::build(patterns);
200 let stages = graph.execution_order().to_vec();
201
202 let mut current_bindings: Vec<BindingRow> = vec![BindingRow::new()];
204
205 for stage in &stages {
206 let stage_results =
207 self.evaluate_stage(stage, graph.patterns(), store, ¤t_bindings)?;
208
209 for (pattern_idx, pattern_rows) in stage_results {
210 let pattern = &graph.patterns()[pattern_idx];
211 let join_vars: Vec<String> = if current_bindings.is_empty() {
213 Vec::new()
214 } else {
215 let first_row = ¤t_bindings[0];
216 pattern
217 .bound_variables
218 .iter()
219 .filter(|v| first_row.contains_key(v.as_str()))
220 .cloned()
221 .collect()
222 };
223
224 current_bindings = self.merge_results(current_bindings, pattern_rows, &join_vars);
225 }
226 }
227
228 Ok(current_bindings)
229 }
230
231 fn evaluate_stage(
233 &self,
234 stage: &[usize],
235 patterns: &[TriplePatternInfo],
236 store: &dyn TripleStore,
237 current_bindings: &[BindingRow],
238 ) -> Result<Vec<(usize, Vec<BindingRow>)>> {
239 if stage.is_empty() {
240 return Ok(Vec::new());
241 }
242
243 if stage.len() < self.chunk_size || self.num_threads <= 1 {
244 return self.evaluate_stage_sequential(stage, patterns, store, current_bindings);
245 }
246
247 #[cfg(feature = "parallel")]
248 {
249 self.evaluate_stage_parallel(stage, patterns, store, current_bindings)
250 }
251 #[cfg(not(feature = "parallel"))]
252 {
253 self.evaluate_stage_sequential(stage, patterns, store, current_bindings)
254 }
255 }
256
257 fn evaluate_stage_sequential(
258 &self,
259 stage: &[usize],
260 patterns: &[TriplePatternInfo],
261 store: &dyn TripleStore,
262 current_bindings: &[BindingRow],
263 ) -> Result<Vec<(usize, Vec<BindingRow>)>> {
264 let mut results = Vec::with_capacity(stage.len());
265 for &idx in stage {
266 let rows = store.evaluate_pattern(&patterns[idx], Some(current_bindings))?;
267 results.push((idx, rows));
268 }
269 Ok(results)
270 }
271
272 #[cfg(feature = "parallel")]
273 fn evaluate_stage_parallel(
274 &self,
275 stage: &[usize],
276 patterns: &[TriplePatternInfo],
277 store: &dyn TripleStore,
278 current_bindings: &[BindingRow],
279 ) -> Result<Vec<(usize, Vec<BindingRow>)>> {
280 use rayon::prelude::*;
281 use std::sync::Mutex;
282
283 let error_cell: Arc<Mutex<Option<anyhow::Error>>> = Arc::new(Mutex::new(None));
284 let error_clone = Arc::clone(&error_cell);
285
286 let results: Vec<(usize, Vec<BindingRow>)> = stage
287 .par_iter()
288 .filter_map(|&idx| {
289 match store.evaluate_pattern(&patterns[idx], Some(current_bindings)) {
290 Ok(rows) => Some((idx, rows)),
291 Err(e) => {
292 if let Ok(mut guard) = error_clone.lock() {
293 if guard.is_none() {
294 *guard = Some(e);
295 }
296 }
297 None
298 }
299 }
300 })
301 .collect();
302
303 if let Ok(mut guard) = error_cell.lock() {
304 if let Some(err) = guard.take() {
305 return Err(err);
306 }
307 }
308 Ok(results)
309 }
310
311 pub fn merge_results(
313 &self,
314 left: Vec<BindingRow>,
315 right: Vec<BindingRow>,
316 join_vars: &[String],
317 ) -> Vec<BindingRow> {
318 if right.is_empty() {
319 return left;
320 }
321 if left.is_empty() {
322 return right;
323 }
324
325 if join_vars.is_empty() {
327 let mut output: Vec<BindingRow> = Vec::with_capacity(left.len() * right.len());
328 for l_row in &left {
329 for r_row in &right {
330 let mut merged: BindingRow = l_row.clone();
331 for (k, v) in r_row {
332 merged.insert(k.clone(), v.clone());
333 }
334 output.push(merged);
335 }
336 }
337 return output;
338 }
339
340 let mut hash_index: HashMap<Vec<String>, Vec<usize>> = HashMap::new();
342 for (ridx, row) in right.iter().enumerate() {
343 let key: Vec<String> = join_vars.iter().map(|v| rdf_term_key(row.get(v))).collect();
344 hash_index.entry(key).or_default().push(ridx);
345 }
346
347 let mut output: Vec<BindingRow> = Vec::new();
349 for l_row in &left {
350 let key: Vec<String> = join_vars
351 .iter()
352 .map(|v| rdf_term_key(l_row.get(v)))
353 .collect();
354
355 if let Some(right_indices) = hash_index.get(&key) {
356 for &ridx in right_indices {
357 let r_row = &right[ridx];
358 let mut merged: BindingRow = l_row.clone();
359 for (k, v) in r_row {
360 merged.insert(k.clone(), v.clone());
361 }
362 output.push(merged);
363 }
364 }
365 }
366 output
367 }
368}
369
370fn rdf_term_key(term: Option<&RdfTerm>) -> String {
372 match term {
373 None => String::new(),
374 Some(t) => format!("{t}"),
375 }
376}
377
/// Fixtures shared by the test modules of this file (and, being
/// `pub(crate)`, by tests elsewhere in the crate).
#[cfg(test)]
pub(crate) mod test_support {
    use super::*;

    /// In-memory [`TripleStore`] that serves canned rows keyed by pattern id.
    #[derive(Default)]
    pub struct MockTripleStore {
        /// Rows to return for a pattern, keyed by `TriplePatternInfo::id`.
        pub results: HashMap<String, Vec<BindingRow>>,
        /// Rows returned for any pattern id without an entry in `results`.
        pub default_result: Vec<BindingRow>,
    }

    impl MockTripleStore {
        /// Creates a store with no canned rows and an empty default result.
        pub fn new() -> Self {
            Self::default()
        }

        /// Registers `rows` as the result for `pattern_id`, replacing any
        /// rows registered earlier for the same id.
        pub fn with_result(mut self, pattern_id: &str, rows: Vec<BindingRow>) -> Self {
            self.results.insert(pattern_id.to_string(), rows);
            self
        }
    }

    impl TripleStore for MockTripleStore {
        /// Returns the canned rows for the pattern's id, or the default
        /// result. The incoming bindings are ignored and the call never fails.
        fn evaluate_pattern(
            &self,
            pattern: &TriplePatternInfo,
            _bindings: Option<&[BindingRow]>,
        ) -> Result<Vec<BindingRow>> {
            let rows = match self.results.get(&pattern.id) {
                Some(canned) => canned.clone(),
                None => self.default_result.clone(),
            };
            Ok(rows)
        }

        /// Returns the number of canned rows for the pattern's id, or 0 when
        /// none are registered.
        fn estimate_cardinality(&self, pattern: &TriplePatternInfo) -> u64 {
            self.results
                .get(&pattern.id)
                .map_or(0, |rows| rows.len() as u64)
        }
    }

    /// Builds an IRI term from `value`.
    pub fn iri_term(value: &str) -> RdfTerm {
        RdfTerm::Iri(value.to_string())
    }

    /// Builds a binding row from `(variable, term)` pairs.
    pub fn make_row(pairs: &[(&str, RdfTerm)]) -> BindingRow {
        pairs
            .iter()
            .map(|(var, term)| (var.to_string(), term.clone()))
            .collect()
    }
}
438
#[cfg(test)]
mod tests {
    use super::test_support::*;
    use super::*;
    use crate::optimizer::adaptive::{PatternTerm, TriplePatternInfo};
    use crate::optimizer::materialized_view::RdfTerm;

    /// Turns string literals into the owned variable list a pattern needs.
    fn var_names(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Builds a pattern whose subject is the first variable and whose object
    /// is the last one.
    fn simple_pattern(id: &str, vars: Vec<String>, cardinality: u64) -> TriplePatternInfo {
        TriplePatternInfo {
            id: id.to_string(),
            subject: PatternTerm::Variable(vars.first().cloned().unwrap_or_default()),
            predicate: PatternTerm::Iri(format!("http://example.org/p_{id}")),
            object: PatternTerm::Variable(vars.last().cloned().unwrap_or_default()),
            estimated_cardinality: cardinality,
            bound_variables: vars,
            original_pattern: None,
        }
    }

    #[test]
    fn test_dependency_graph_independent_patterns() {
        let p1 = simple_pattern("p1", var_names(&["a", "b"]), 10);
        let p2 = simple_pattern("p2", var_names(&["c", "d"]), 20);
        let graph = PatternDependencyGraph::build(vec![p1, p2]);

        assert!(
            graph.are_independent(0, 1),
            "Patterns with no shared vars should be independent"
        );
        let stages = graph.get_independent_patterns();
        assert_eq!(stages.len(), 1, "Independent patterns fit into one stage");
        assert_eq!(stages[0].len(), 2);
    }

    #[test]
    fn test_dependency_graph_dependent_patterns() {
        let p1 = simple_pattern("p1", var_names(&["s", "type"]), 10);
        let p2 = simple_pattern("p2", var_names(&["s", "name"]), 100);
        let graph = PatternDependencyGraph::build(vec![p1, p2]);

        let stages = graph.get_independent_patterns();
        let total: usize = stages.iter().map(|s| s.len()).sum();
        assert_eq!(total, 2, "All patterns should appear across stages");

        // p1 is the first pattern to mention ?s, so p2 must wait for it.
        assert!(
            !graph.are_independent(0, 1),
            "Patterns sharing ?s should not be independent"
        );
        assert_eq!(
            stages,
            vec![vec![0], vec![1]],
            "The dependent pattern should run in a later stage"
        );
    }

    #[test]
    fn test_parallel_evaluator_empty_patterns() {
        let evaluator = ParallelBgpEvaluator::new(2);
        let store = MockTripleStore::new();
        let result = evaluator.evaluate(vec![], &store).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_parallel_evaluator_single_pattern() {
        let pattern = simple_pattern("pat1", var_names(&["s"]), 2);
        let rows = vec![
            make_row(&[("s", iri_term("http://example.org/a"))]),
            make_row(&[("s", iri_term("http://example.org/b"))]),
        ];
        let store = MockTripleStore::new().with_result("pat1", rows);
        let evaluator = ParallelBgpEvaluator::new(1);
        let result = evaluator.evaluate(vec![pattern], &store).unwrap();
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn test_parallel_evaluator_two_patterns_with_join() {
        let p1 = simple_pattern("p1", var_names(&["s", "type"]), 2);
        let p2 = simple_pattern("p2", var_names(&["s", "name"]), 2);

        let person = iri_term("http://example.org/Person");
        let p1_rows = vec![
            make_row(&[
                ("s", iri_term("http://example.org/alice")),
                ("type", person.clone()),
            ]),
            make_row(&[
                ("s", iri_term("http://example.org/bob")),
                ("type", person),
            ]),
        ];
        let p2_rows = vec![
            make_row(&[
                ("s", iri_term("http://example.org/alice")),
                ("name", RdfTerm::plain_literal("Alice")),
            ]),
            make_row(&[
                ("s", iri_term("http://example.org/bob")),
                ("name", RdfTerm::plain_literal("Bob")),
            ]),
        ];

        let store = MockTripleStore::new()
            .with_result("p1", p1_rows)
            .with_result("p2", p2_rows);

        let evaluator = ParallelBgpEvaluator::new(2);
        let result = evaluator.evaluate(vec![p1, p2], &store).unwrap();

        assert_eq!(
            result.len(),
            2,
            "Should produce 2 joined rows (one per person)"
        );
        for row in &result {
            assert!(row.contains_key("s"));
            assert!(row.contains_key("name"));
            assert!(
                row.contains_key("type"),
                "Bindings from the first pattern should survive the join"
            );
        }
    }

    #[test]
    fn test_merge_results_no_join_vars_cross_product() {
        let evaluator = ParallelBgpEvaluator::new(1);
        let left = vec![
            make_row(&[("a", iri_term("http://example.org/1"))]),
            make_row(&[("a", iri_term("http://example.org/2"))]),
        ];
        let right = vec![make_row(&[("b", iri_term("http://example.org/x"))])];

        let merged = evaluator.merge_results(left, right, &[]);
        assert_eq!(merged.len(), 2, "Cross product of 2x1 = 2 rows");
        for row in &merged {
            assert!(row.contains_key("a"), "Left binding should be kept");
            assert!(row.contains_key("b"), "Right binding should be added");
        }
    }

    #[test]
    fn test_merge_results_with_join_var() {
        let evaluator = ParallelBgpEvaluator::new(1);
        let left = vec![
            make_row(&[
                ("s", iri_term("http://a")),
                ("type", iri_term("http://Person")),
            ]),
            make_row(&[
                ("s", iri_term("http://b")),
                ("type", iri_term("http://Person")),
            ]),
        ];
        let right = vec![make_row(&[
            ("s", iri_term("http://a")),
            ("name", RdfTerm::plain_literal("Alice")),
        ])];

        let merged = evaluator.merge_results(left, right, &["s".to_string()]);
        assert_eq!(merged.len(), 1);
        assert_eq!(
            merged[0].get("name"),
            Some(&RdfTerm::plain_literal("Alice"))
        );
    }

    #[test]
    fn test_merge_results_empty_left_returns_right() {
        let evaluator = ParallelBgpEvaluator::new(1);
        let right = vec![make_row(&[("s", iri_term("http://a"))])];
        let merged = evaluator.merge_results(vec![], right, &[]);
        assert_eq!(merged.len(), 1);
    }

    #[test]
    fn test_merge_results_empty_right_returns_left() {
        let evaluator = ParallelBgpEvaluator::new(1);
        let left = vec![make_row(&[("s", iri_term("http://a"))])];
        let merged = evaluator.merge_results(left, vec![], &[]);
        assert_eq!(merged.len(), 1);
    }

    #[test]
    fn test_dependency_graph_three_chain() {
        let p1 = simple_pattern("p1", var_names(&["x"]), 5);
        let p2 = simple_pattern("p2", var_names(&["x", "y"]), 50);
        let p3 = simple_pattern("p3", var_names(&["y", "z"]), 500);

        let graph = PatternDependencyGraph::build(vec![p1, p2, p3]);
        let stages = graph.execution_order();
        let total: usize = stages.iter().map(|s| s.len()).sum();
        assert_eq!(total, 3);
        assert!(!stages.is_empty());

        // x links p1 -> p2 and y links p2 -> p3, so each pattern needs its
        // own stage.
        assert_eq!(stages.to_vec(), vec![vec![0], vec![1], vec![2]]);
    }

    #[test]
    fn test_evaluator_default_thread_count() {
        let evaluator = ParallelBgpEvaluator::default();
        assert!(evaluator.num_threads >= 1);
    }
}
627
#[cfg(test)]
mod extended_tests {
    use super::test_support::*;
    use super::*;
    use crate::optimizer::adaptive::{PatternTerm, TriplePatternInfo};
    use crate::optimizer::materialized_view::RdfTerm;

    /// Turns string literals into the owned variable list a pattern needs.
    fn var_names(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Builds a pattern whose subject is the first variable and whose object
    /// is the last one.
    fn pat(id: &str, vars: Vec<String>, cardinality: u64) -> TriplePatternInfo {
        TriplePatternInfo {
            id: id.to_string(),
            subject: PatternTerm::Variable(vars.first().cloned().unwrap_or_default()),
            predicate: PatternTerm::Iri(format!("http://example.org/p_{id}")),
            object: PatternTerm::Variable(vars.last().cloned().unwrap_or_default()),
            estimated_cardinality: cardinality,
            bound_variables: vars,
            original_pattern: None,
        }
    }

    /// Builds a row that binds each `(variable, iri)` pair.
    fn iri_row(pairs: &[(&str, &str)]) -> BindingRow {
        let mut row = BindingRow::new();
        for (var, iri) in pairs {
            row.insert(var.to_string(), RdfTerm::iri(*iri));
        }
        row
    }

    // ---- PatternDependencyGraph ----

    #[test]
    fn test_dependency_graph_single_pattern() {
        let p1 = pat("solo", var_names(&["x"]), 10);
        let graph = PatternDependencyGraph::build(vec![p1]);

        let stages = graph.get_independent_patterns();
        assert_eq!(
            stages.len(),
            1,
            "Single pattern should produce a single stage"
        );
        assert_eq!(stages[0], vec![0], "Stage 0 should contain pattern 0");
    }

    #[test]
    fn test_dependency_graph_no_patterns() {
        let graph = PatternDependencyGraph::build(vec![]);
        assert!(graph.get_independent_patterns().is_empty());
    }

    #[test]
    fn test_dependency_graph_are_independent_different_vars() {
        let p1 = pat("p1", var_names(&["a", "b"]), 10);
        let p2 = pat("p2", var_names(&["c", "d"]), 10);
        let graph = PatternDependencyGraph::build(vec![p1, p2]);

        assert!(
            graph.are_independent(0, 1),
            "Patterns with disjoint variables should be independent"
        );
    }

    #[test]
    fn test_dependency_graph_are_not_independent_shared_var() {
        let p1 = pat("p1", var_names(&["s", "o1"]), 10);
        let p2 = pat("p2", var_names(&["s", "o2"]), 10);
        let graph = PatternDependencyGraph::build(vec![p1, p2]);

        let patterns = graph.patterns();
        assert_eq!(patterns.len(), 2, "Graph should contain 2 patterns");

        // p1 is the first pattern to mention ?s, so p2 depends on it.
        assert!(
            !graph.are_independent(0, 1),
            "Patterns sharing ?s should not be independent"
        );
        assert_eq!(
            graph.get_independent_patterns(),
            vec![vec![0], vec![1]],
            "The dependent pattern should run in a later stage"
        );
    }

    #[test]
    fn test_dependency_graph_execution_order_returns_all_patterns() {
        let p1 = pat("p1", var_names(&["a"]), 10);
        let p2 = pat("p2", var_names(&["b"]), 20);
        let p3 = pat("p3", var_names(&["c"]), 30);
        let graph = PatternDependencyGraph::build(vec![p1, p2, p3]);

        let total_in_stages: usize = graph.execution_order().iter().map(|s| s.len()).sum();
        assert_eq!(
            total_in_stages, 3,
            "All patterns should appear in execution stages"
        );
    }

    #[test]
    fn test_dependency_graph_patterns_accessor() {
        let p1 = pat("x", var_names(&["a"]), 5);
        let p2 = pat("y", var_names(&["b"]), 15);
        let graph = PatternDependencyGraph::build(vec![p1, p2]);

        let patterns = graph.patterns();
        assert_eq!(patterns.len(), 2);
        assert_eq!(patterns[0].estimated_cardinality, 5);
        assert_eq!(patterns[1].estimated_cardinality, 15);
    }

    // ---- merge_results ----

    #[test]
    fn test_merge_results_multi_var_join() {
        let evaluator = ParallelBgpEvaluator::new(1);

        let row_l = iri_row(&[("x", "http://a"), ("y", "http://b")]);
        let row_r = iri_row(&[("x", "http://a"), ("y", "http://b"), ("z", "http://c")]);

        let result = evaluator.merge_results(
            vec![row_l],
            vec![row_r],
            &["x".to_string(), "y".to_string()],
        );

        assert_eq!(
            result.len(),
            1,
            "Matching multi-var join should produce one row"
        );
        assert!(
            result[0].contains_key("z"),
            "Joined row should contain z from right side"
        );
    }

    #[test]
    fn test_merge_results_no_matching_join_vars() {
        let evaluator = ParallelBgpEvaluator::new(1);

        let row_l = iri_row(&[("x", "http://a")]);
        let row_r = iri_row(&[("x", "http://DIFFERENT")]);

        let result = evaluator.merge_results(vec![row_l], vec![row_r], &["x".to_string()]);

        assert_eq!(
            result.len(),
            0,
            "Non-matching join should produce empty result"
        );
    }

    #[test]
    fn test_merge_results_multiple_right_matches() {
        let evaluator = ParallelBgpEvaluator::new(1);

        let row_l = iri_row(&[("x", "http://shared")]);

        let right: Vec<BindingRow> = (0..3)
            .map(|i| {
                let mut row = BindingRow::new();
                row.insert("x".to_string(), RdfTerm::iri("http://shared"));
                row.insert("y".to_string(), RdfTerm::iri(format!("http://val{i}")));
                row
            })
            .collect();

        let result = evaluator.merge_results(vec![row_l], right, &["x".to_string()]);
        assert_eq!(
            result.len(),
            3,
            "Should produce one row for each matching right-side row"
        );
    }

    // ---- ParallelBgpEvaluator configuration ----

    #[test]
    fn test_evaluator_chunk_size_minimum_is_one() {
        let evaluator = ParallelBgpEvaluator::new(4).with_chunk_size(0);
        assert_eq!(evaluator.chunk_size, 1, "Chunk size should be at least 1");
    }

    #[test]
    fn test_evaluator_chunk_size_set_correctly() {
        let evaluator = ParallelBgpEvaluator::new(4).with_chunk_size(8);
        assert_eq!(evaluator.chunk_size, 8);
    }

    #[test]
    fn test_evaluator_default_uses_cpu_count() {
        let evaluator = ParallelBgpEvaluator::default();
        assert!(evaluator.num_threads >= 1, "Should use at least 1 thread");
    }

    // ---- evaluate ----

    #[test]
    fn test_evaluate_no_results_from_store() {
        let store = MockTripleStore::new();
        let evaluator = ParallelBgpEvaluator::new(1);

        let pattern = pat("no_results", var_names(&["x", "y"]), 100);
        let result = evaluator.evaluate(vec![pattern], &store).unwrap();

        // The evaluator may return its seed row, but no row may carry a
        // binding when the store matched nothing.
        let has_bindings = result.iter().any(|row| !row.is_empty());
        assert!(
            !has_bindings,
            "Empty store should produce no variable bindings"
        );
    }

    #[test]
    fn test_evaluate_two_independent_patterns_cross_product() {
        let p1 = pat("pat_a", var_names(&["a"]), 2);
        let p2 = pat("pat_b", var_names(&["b"]), 3);

        let store = MockTripleStore::new()
            .with_result(
                "pat_a",
                vec![
                    make_row(&[("a", iri_term("http://a1"))]),
                    make_row(&[("a", iri_term("http://a2"))]),
                ],
            )
            .with_result(
                "pat_b",
                vec![
                    make_row(&[("b", iri_term("http://b1"))]),
                    make_row(&[("b", iri_term("http://b2"))]),
                    make_row(&[("b", iri_term("http://b3"))]),
                ],
            );

        let evaluator = ParallelBgpEvaluator::new(1);
        let result = evaluator.evaluate(vec![p1, p2], &store).unwrap();

        assert_eq!(
            result.len(),
            6,
            "Independent patterns produce cross product"
        );
        for row in &result {
            assert!(row.contains_key("a"), "Each row should bind ?a");
            assert!(row.contains_key("b"), "Each row should bind ?b");
        }
    }
}