Skip to main content

oxirs_arq/execution/
parallel_eval.rs

1//! Parallel Triple Pattern Evaluation
2//!
3//! This module implements parallel evaluation of independent triple patterns
4//! within a Basic Graph Pattern (BGP).  Independent patterns (those that do
5//! not share variables with each other) can be executed concurrently, and
6//! their results are then joined in the correct order.
7//!
8//! The dependency analysis uses a directed acyclic graph (DAG) over patterns
9//! to identify which patterns can be safely parallelised.
10
11use crate::optimizer::adaptive::TriplePatternInfo;
12use crate::optimizer::materialized_view::{BindingRow, RdfTerm};
13use anyhow::Result;
14use std::collections::{HashMap, HashSet, VecDeque};
15use std::sync::Arc;
16
17// ---------------------------------------------------------------------------
18// Public trait: TripleStore
19// ---------------------------------------------------------------------------
20
/// Trait abstracting a triple store that can evaluate a single pattern
/// against an optional set of input bindings.
///
/// Implementations must be `Send + Sync` to allow parallel evaluation: the
/// evaluator in this module shares one `&dyn TripleStore` across all patterns
/// of a stage (on Rayon worker threads when the `parallel` feature is on).
pub trait TripleStore: Send + Sync {
    /// Evaluate a triple pattern, optionally constrained by `bindings`.
    ///
    /// If `bindings` is `Some`, each row is used to ground variables before
    /// evaluating the pattern (index-nested-loop style).
    ///
    /// NOTE(review): `ParallelBgpEvaluator` always passes `Some(..)` and still
    /// joins the returned rows with those same bindings afterwards, so
    /// grounding looks like an optimisation rather than a requirement for
    /// correctness — confirm before relying on it.
    ///
    /// # Errors
    ///
    /// Implementation-defined; the evaluator in this module propagates any
    /// error and abandons evaluation of the whole BGP.
    fn evaluate_pattern(
        &self,
        pattern: &TriplePatternInfo,
        bindings: Option<&[BindingRow]>,
    ) -> Result<Vec<BindingRow>>;

    /// Return a quick cardinality estimate for a pattern (used for ordering).
    ///
    /// NOTE(review): nothing in this module calls this yet; stage order is
    /// derived purely from variable dependencies.
    fn estimate_cardinality(&self, pattern: &TriplePatternInfo) -> u64;
}
39
40// ---------------------------------------------------------------------------
41// Dependency analysis
42// ---------------------------------------------------------------------------
43
44/// Dependency graph over a set of triple patterns.
45///
46/// Pattern `i` depends on pattern `j` when `j` binds a variable that `i`
47/// needs as input.  In the context of parallel evaluation, two patterns are
48/// *independent* when neither depends on the other.
49pub struct PatternDependencyGraph {
50    patterns: Vec<TriplePatternInfo>,
51    /// `dependencies[i]` = set of pattern indices that `i` depends on
52    dependencies: Vec<HashSet<usize>>,
53    /// Topologically sorted execution stages: each stage is a set of indices
54    /// that can be evaluated in parallel once all previous stages complete.
55    execution_stages: Vec<Vec<usize>>,
56}
57
58impl PatternDependencyGraph {
59    /// Build the dependency graph for the given pattern list.
60    pub fn build(patterns: Vec<TriplePatternInfo>) -> Self {
61        let n = patterns.len();
62        let mut dependencies: Vec<HashSet<usize>> = vec![HashSet::new(); n];
63
64        // Build a map from variable name -> first pattern that binds it
65        let mut var_producer: HashMap<String, usize> = HashMap::new();
66        for (i, pattern) in patterns.iter().enumerate() {
67            for var in &pattern.bound_variables {
68                var_producer.entry(var.clone()).or_insert(i);
69            }
70        }
71
72        // Pattern `i` depends on `j` if `j` is the producer of a variable
73        // that `i` also uses, and j != i.
74        for i in 0..n {
75            for (var_name, &producer) in &var_producer {
76                if producer == i {
77                    continue;
78                }
79                if patterns[i].bound_variables.contains(var_name) {
80                    dependencies[i].insert(producer);
81                }
82            }
83        }
84
85        let execution_stages = Self::topological_stages(&dependencies, n);
86
87        Self {
88            patterns,
89            dependencies,
90            execution_stages,
91        }
92    }
93
94    /// Return groups of patterns that can be evaluated in parallel.
95    pub fn get_independent_patterns(&self) -> Vec<Vec<usize>> {
96        self.execution_stages.clone()
97    }
98
99    /// Return the topologically sorted execution stages.
100    pub fn execution_order(&self) -> &[Vec<usize>] {
101        &self.execution_stages
102    }
103
104    /// Access the underlying patterns
105    pub fn patterns(&self) -> &[TriplePatternInfo] {
106        &self.patterns
107    }
108
109    /// Check whether two pattern indices are independent
110    pub fn are_independent(&self, i: usize, j: usize) -> bool {
111        !self.dependencies[i].contains(&j) && !self.dependencies[j].contains(&i)
112    }
113
114    // -----------------------------------------------------------------------
115    // Private helpers
116    // -----------------------------------------------------------------------
117
118    /// Kahn's algorithm for topological layering
119    fn topological_stages(dependencies: &[HashSet<usize>], n: usize) -> Vec<Vec<usize>> {
120        let mut in_degree: Vec<usize> = dependencies.iter().map(|d| d.len()).collect();
121        let mut reverse: Vec<Vec<usize>> = vec![Vec::new(); n];
122        for (i, deps) in dependencies.iter().enumerate() {
123            for &dep in deps {
124                reverse[dep].push(i);
125            }
126        }
127
128        let mut stages: Vec<Vec<usize>> = Vec::new();
129        let mut queue: VecDeque<usize> = in_degree
130            .iter()
131            .enumerate()
132            .filter(|(_, &d)| d == 0)
133            .map(|(i, _)| i)
134            .collect();
135
136        while !queue.is_empty() {
137            let stage: Vec<usize> = queue.drain(..).collect();
138            for &node in &stage {
139                for &dependent in &reverse[node] {
140                    in_degree[dependent] -= 1;
141                    if in_degree[dependent] == 0 {
142                        queue.push_back(dependent);
143                    }
144                }
145            }
146            stages.push(stage);
147        }
148
149        stages
150    }
151}
152
153// ---------------------------------------------------------------------------
154// Parallel BGP evaluator
155// ---------------------------------------------------------------------------
156
/// Parallel evaluator for Basic Graph Patterns.
///
/// Patterns are grouped into stages by [`PatternDependencyGraph`]; the
/// patterns of one stage are evaluated (possibly concurrently) and their rows
/// are joined into the running solution set.
#[derive(Debug, Clone)]
pub struct ParallelBgpEvaluator {
    /// Parallelism switch: with `0` or `1` every stage is evaluated
    /// sequentially; any larger value lets wide-enough stages run on Rayon's
    /// global pool when the `parallel` feature is enabled.
    ///
    /// The value is not used to size a dedicated thread pool, so all counts
    /// above 1 currently behave the same.
    pub num_threads: usize,
    /// Minimum patterns per stage to justify parallelism overhead; stages
    /// with fewer patterns are evaluated sequentially.
    pub chunk_size: usize,
}
164
165impl Default for ParallelBgpEvaluator {
166    fn default() -> Self {
167        Self {
168            num_threads: num_cpus::get(),
169            chunk_size: 1,
170        }
171    }
172}
173
174impl ParallelBgpEvaluator {
175    /// Create a new evaluator with a specific thread count
176    pub fn new(num_threads: usize) -> Self {
177        Self {
178            num_threads,
179            chunk_size: 1,
180        }
181    }
182
183    /// Create a new evaluator with tunable chunk size
184    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
185        self.chunk_size = chunk_size.max(1);
186        self
187    }
188
189    /// Evaluate a BGP by exploiting parallelism among independent patterns.
190    pub fn evaluate(
191        &self,
192        patterns: Vec<TriplePatternInfo>,
193        store: &dyn TripleStore,
194    ) -> Result<Vec<BindingRow>> {
195        if patterns.is_empty() {
196            return Ok(Vec::new());
197        }
198
199        let graph = PatternDependencyGraph::build(patterns);
200        let stages = graph.execution_order().to_vec();
201
202        // Running binding set: starts as a single empty row (identity for join)
203        let mut current_bindings: Vec<BindingRow> = vec![BindingRow::new()];
204
205        for stage in &stages {
206            let stage_results =
207                self.evaluate_stage(stage, graph.patterns(), store, &current_bindings)?;
208
209            for (pattern_idx, pattern_rows) in stage_results {
210                let pattern = &graph.patterns()[pattern_idx];
211                // Only join on variables that already appear in current_bindings
212                let join_vars: Vec<String> = if current_bindings.is_empty() {
213                    Vec::new()
214                } else {
215                    let first_row = &current_bindings[0];
216                    pattern
217                        .bound_variables
218                        .iter()
219                        .filter(|v| first_row.contains_key(v.as_str()))
220                        .cloned()
221                        .collect()
222                };
223
224                current_bindings = self.merge_results(current_bindings, pattern_rows, &join_vars);
225            }
226        }
227
228        Ok(current_bindings)
229    }
230
231    /// Evaluate a set of pattern indices in parallel within a single stage.
232    fn evaluate_stage(
233        &self,
234        stage: &[usize],
235        patterns: &[TriplePatternInfo],
236        store: &dyn TripleStore,
237        current_bindings: &[BindingRow],
238    ) -> Result<Vec<(usize, Vec<BindingRow>)>> {
239        if stage.is_empty() {
240            return Ok(Vec::new());
241        }
242
243        if stage.len() < self.chunk_size || self.num_threads <= 1 {
244            return self.evaluate_stage_sequential(stage, patterns, store, current_bindings);
245        }
246
247        #[cfg(feature = "parallel")]
248        {
249            self.evaluate_stage_parallel(stage, patterns, store, current_bindings)
250        }
251        #[cfg(not(feature = "parallel"))]
252        {
253            self.evaluate_stage_sequential(stage, patterns, store, current_bindings)
254        }
255    }
256
257    fn evaluate_stage_sequential(
258        &self,
259        stage: &[usize],
260        patterns: &[TriplePatternInfo],
261        store: &dyn TripleStore,
262        current_bindings: &[BindingRow],
263    ) -> Result<Vec<(usize, Vec<BindingRow>)>> {
264        let mut results = Vec::with_capacity(stage.len());
265        for &idx in stage {
266            let rows = store.evaluate_pattern(&patterns[idx], Some(current_bindings))?;
267            results.push((idx, rows));
268        }
269        Ok(results)
270    }
271
272    #[cfg(feature = "parallel")]
273    fn evaluate_stage_parallel(
274        &self,
275        stage: &[usize],
276        patterns: &[TriplePatternInfo],
277        store: &dyn TripleStore,
278        current_bindings: &[BindingRow],
279    ) -> Result<Vec<(usize, Vec<BindingRow>)>> {
280        use rayon::prelude::*;
281        use std::sync::Mutex;
282
283        let error_cell: Arc<Mutex<Option<anyhow::Error>>> = Arc::new(Mutex::new(None));
284        let error_clone = Arc::clone(&error_cell);
285
286        let results: Vec<(usize, Vec<BindingRow>)> = stage
287            .par_iter()
288            .filter_map(|&idx| {
289                match store.evaluate_pattern(&patterns[idx], Some(current_bindings)) {
290                    Ok(rows) => Some((idx, rows)),
291                    Err(e) => {
292                        if let Ok(mut guard) = error_clone.lock() {
293                            if guard.is_none() {
294                                *guard = Some(e);
295                            }
296                        }
297                        None
298                    }
299                }
300            })
301            .collect();
302
303        if let Ok(mut guard) = error_cell.lock() {
304            if let Some(err) = guard.take() {
305                return Err(err);
306            }
307        }
308        Ok(results)
309    }
310
311    /// Hash join of two binding sets on shared variables.
312    pub fn merge_results(
313        &self,
314        left: Vec<BindingRow>,
315        right: Vec<BindingRow>,
316        join_vars: &[String],
317    ) -> Vec<BindingRow> {
318        if right.is_empty() {
319            return left;
320        }
321        if left.is_empty() {
322            return right;
323        }
324
325        // Cross product when no shared variables
326        if join_vars.is_empty() {
327            let mut output: Vec<BindingRow> = Vec::with_capacity(left.len() * right.len());
328            for l_row in &left {
329                for r_row in &right {
330                    let mut merged: BindingRow = l_row.clone();
331                    for (k, v) in r_row {
332                        merged.insert(k.clone(), v.clone());
333                    }
334                    output.push(merged);
335                }
336            }
337            return output;
338        }
339
340        // Build hash index over right side
341        let mut hash_index: HashMap<Vec<String>, Vec<usize>> = HashMap::new();
342        for (ridx, row) in right.iter().enumerate() {
343            let key: Vec<String> = join_vars.iter().map(|v| rdf_term_key(row.get(v))).collect();
344            hash_index.entry(key).or_default().push(ridx);
345        }
346
347        // Probe phase
348        let mut output: Vec<BindingRow> = Vec::new();
349        for l_row in &left {
350            let key: Vec<String> = join_vars
351                .iter()
352                .map(|v| rdf_term_key(l_row.get(v)))
353                .collect();
354
355            if let Some(right_indices) = hash_index.get(&key) {
356                for &ridx in right_indices {
357                    let r_row = &right[ridx];
358                    let mut merged: BindingRow = l_row.clone();
359                    for (k, v) in r_row {
360                        merged.insert(k.clone(), v.clone());
361                    }
362                    output.push(merged);
363                }
364            }
365        }
366        output
367    }
368}
369
370/// Convert an optional RdfTerm to a stable string key for hashing
371fn rdf_term_key(term: Option<&RdfTerm>) -> String {
372    match term {
373        None => String::new(),
374        Some(t) => format!("{t}"),
375    }
376}
377
378// ---------------------------------------------------------------------------
379// Mock TripleStore for testing
380// ---------------------------------------------------------------------------
381
#[cfg(test)]
pub(crate) mod test_support {
    use super::*;

    /// In-memory [`TripleStore`] for tests: canned rows keyed by pattern id.
    pub struct MockTripleStore {
        /// Rows returned for a pattern whose `id` is a key of this map.
        pub results: HashMap<String, Vec<BindingRow>>,
        /// Rows returned for any pattern id not present in `results`.
        pub default_result: Vec<BindingRow>,
    }

    impl MockTripleStore {
        /// A store with no canned results and an empty default.
        pub fn new() -> Self {
            let results = HashMap::new();
            let default_result = Vec::new();
            Self {
                results,
                default_result,
            }
        }

        /// Builder-style helper registering `rows` as the answer for `pattern_id`.
        pub fn with_result(mut self, pattern_id: &str, rows: Vec<BindingRow>) -> Self {
            self.results.insert(String::from(pattern_id), rows);
            self
        }
    }

    impl TripleStore for MockTripleStore {
        /// Ignores `bindings`; returns the canned rows for `pattern.id`, or
        /// `default_result` when none are registered.
        fn evaluate_pattern(
            &self,
            pattern: &TriplePatternInfo,
            _bindings: Option<&[BindingRow]>,
        ) -> Result<Vec<BindingRow>> {
            let rows = match self.results.get(&pattern.id) {
                Some(canned) => canned.clone(),
                None => self.default_result.clone(),
            };
            Ok(rows)
        }

        /// Number of canned rows for `pattern.id`, or 0 if none are registered.
        fn estimate_cardinality(&self, pattern: &TriplePatternInfo) -> u64 {
            self.results
                .get(&pattern.id)
                .map_or(0, |rows| rows.len() as u64)
        }
    }

    /// Build an IRI term from a string slice.
    pub fn iri_term(value: &str) -> RdfTerm {
        RdfTerm::Iri(String::from(value))
    }

    /// Build a binding row from `(variable, term)` pairs; later pairs win on
    /// duplicate variable names.
    pub fn make_row(pairs: &[(&str, RdfTerm)]) -> BindingRow {
        pairs
            .iter()
            .map(|(name, term)| (String::from(*name), term.clone()))
            .collect()
    }
}
438
439// ---------------------------------------------------------------------------
440// Tests
441// ---------------------------------------------------------------------------
442
#[cfg(test)]
mod tests {
    use super::test_support::*;
    use super::*;
    use crate::optimizer::adaptive::{PatternTerm, TriplePatternInfo};
    use crate::optimizer::materialized_view::RdfTerm;

    /// Build a pattern whose subject is the first variable and object the last.
    fn simple_pattern(id: &str, vars: Vec<String>, cardinality: u64) -> TriplePatternInfo {
        TriplePatternInfo {
            id: id.to_string(),
            subject: PatternTerm::Variable(vars.first().cloned().unwrap_or_default()),
            predicate: PatternTerm::Iri(format!("http://example.org/p_{id}")),
            object: PatternTerm::Variable(vars.last().cloned().unwrap_or_default()),
            estimated_cardinality: cardinality,
            bound_variables: vars,
            original_pattern: None,
        }
    }

    #[test]
    fn test_dependency_graph_independent_patterns() {
        let p1 = simple_pattern("p1", vec!["a".to_string(), "b".to_string()], 10);
        let p2 = simple_pattern("p2", vec!["c".to_string(), "d".to_string()], 20);
        let graph = PatternDependencyGraph::build(vec![p1, p2]);

        assert!(
            graph.are_independent(0, 1),
            "Patterns with no shared vars should be independent"
        );
        let stages = graph.get_independent_patterns();
        assert_eq!(stages.len(), 1, "Independent patterns fit into one stage");
        assert_eq!(stages[0].len(), 2);
    }

    #[test]
    fn test_dependency_graph_dependent_patterns() {
        let p1 = simple_pattern("p1", vec!["s".to_string(), "type".to_string()], 10);
        let p2 = simple_pattern("p2", vec!["s".to_string(), "name".to_string()], 100);
        let graph = PatternDependencyGraph::build(vec![p1, p2]);

        // `?s` is first bound by p1, so p2 must run in a later stage.
        assert!(
            !graph.are_independent(0, 1),
            "Patterns sharing a variable should not be independent"
        );
        let stages = graph.get_independent_patterns();
        let total: usize = stages.iter().map(|s| s.len()).sum();
        assert_eq!(total, 2, "All patterns should appear across stages");
        assert_eq!(stages, vec![vec![0usize], vec![1]]);
    }

    #[test]
    fn test_parallel_evaluator_empty_patterns() {
        let evaluator = ParallelBgpEvaluator::new(2);
        let store = MockTripleStore::new();
        let result = evaluator.evaluate(vec![], &store).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_parallel_evaluator_single_pattern() {
        let pattern = simple_pattern("pat1", vec!["s".to_string()], 2);
        let rows = vec![
            make_row(&[("s", iri_term("http://example.org/a"))]),
            make_row(&[("s", iri_term("http://example.org/b"))]),
        ];
        let store = MockTripleStore::new().with_result("pat1", rows);
        let evaluator = ParallelBgpEvaluator::new(1);
        let result = evaluator.evaluate(vec![pattern], &store).unwrap();
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn test_parallel_evaluator_two_patterns_with_join() {
        let p1 = simple_pattern("p1", vec!["s".to_string(), "type".to_string()], 2);
        let p2 = simple_pattern("p2", vec!["s".to_string(), "name".to_string()], 2);

        let p1_rows = vec![
            make_row(&[
                ("s", iri_term("http://example.org/alice")),
                ("type", iri_term("http://example.org/Person")),
            ]),
            make_row(&[
                ("s", iri_term("http://example.org/bob")),
                ("type", iri_term("http://example.org/Person")),
            ]),
        ];
        let p2_rows = vec![
            make_row(&[
                ("s", iri_term("http://example.org/alice")),
                ("name", RdfTerm::plain_literal("Alice")),
            ]),
            make_row(&[
                ("s", iri_term("http://example.org/bob")),
                ("name", RdfTerm::plain_literal("Bob")),
            ]),
        ];

        let store = MockTripleStore::new()
            .with_result("p1", p1_rows)
            .with_result("p2", p2_rows);

        let evaluator = ParallelBgpEvaluator::new(2);
        let result = evaluator.evaluate(vec![p1, p2], &store).unwrap();

        assert_eq!(
            result.len(),
            2,
            "Should produce 2 joined rows (one per person)"
        );
        for row in &result {
            assert!(row.contains_key("s"));
            assert!(row.contains_key("name"));
        }

        // The join must pair each subject with its own name.
        let alice = result
            .iter()
            .find(|row| row.get("s") == Some(&iri_term("http://example.org/alice")))
            .expect("joined row for alice");
        assert_eq!(alice.get("name"), Some(&RdfTerm::plain_literal("Alice")));
    }

    #[test]
    fn test_merge_results_no_join_vars_cross_product() {
        let evaluator = ParallelBgpEvaluator::new(1);
        let left = vec![
            make_row(&[("a", iri_term("http://example.org/1"))]),
            make_row(&[("a", iri_term("http://example.org/2"))]),
        ];
        let right = vec![make_row(&[("b", iri_term("http://example.org/x"))])];

        let merged = evaluator.merge_results(left, right, &[]);
        assert_eq!(merged.len(), 2, "Cross product of 2x1 = 2 rows");
    }

    #[test]
    fn test_merge_results_with_join_var() {
        let evaluator = ParallelBgpEvaluator::new(1);
        let left = vec![
            make_row(&[
                ("s", iri_term("http://a")),
                ("type", iri_term("http://Person")),
            ]),
            make_row(&[
                ("s", iri_term("http://b")),
                ("type", iri_term("http://Person")),
            ]),
        ];
        let right = vec![make_row(&[
            ("s", iri_term("http://a")),
            ("name", RdfTerm::plain_literal("Alice")),
        ])];

        let merged = evaluator.merge_results(left, right, &["s".to_string()]);
        assert_eq!(merged.len(), 1);
        assert_eq!(
            merged[0].get("name"),
            Some(&RdfTerm::plain_literal("Alice"))
        );
    }

    #[test]
    fn test_merge_results_empty_left_returns_right() {
        let evaluator = ParallelBgpEvaluator::new(1);
        let right = vec![make_row(&[("s", iri_term("http://a"))])];
        let merged = evaluator.merge_results(vec![], right, &[]);
        assert_eq!(merged.len(), 1);
    }

    #[test]
    fn test_merge_results_empty_right_returns_left() {
        let evaluator = ParallelBgpEvaluator::new(1);
        let left = vec![make_row(&[("s", iri_term("http://a"))])];
        let merged = evaluator.merge_results(left, vec![], &[]);
        assert_eq!(merged.len(), 1);
    }

    #[test]
    fn test_dependency_graph_three_chain() {
        let p1 = simple_pattern("p1", vec!["x".to_string()], 5);
        let p2 = simple_pattern("p2", vec!["x".to_string(), "y".to_string()], 50);
        let p3 = simple_pattern("p3", vec!["y".to_string(), "z".to_string()], 500);

        let graph = PatternDependencyGraph::build(vec![p1, p2, p3]);
        let stages = graph.execution_order();
        let total: usize = stages.iter().map(|s| s.len()).sum();
        assert_eq!(total, 3);
        assert!(!stages.is_empty());
        // x links p1 -> p2 and y links p2 -> p3, so each pattern gets its own stage.
        assert_eq!(stages.to_vec(), vec![vec![0usize], vec![1], vec![2]]);
    }

    #[test]
    fn test_evaluator_default_thread_count() {
        let evaluator = ParallelBgpEvaluator::default();
        assert!(evaluator.num_threads >= 1);
    }
}
627
#[cfg(test)]
mod extended_tests {
    use super::test_support::*;
    use super::*;
    use crate::optimizer::adaptive::{PatternTerm, TriplePatternInfo};
    use crate::optimizer::materialized_view::RdfTerm;

    /// Build a pattern whose subject is the first variable and object the last.
    fn pat(id: &str, vars: Vec<String>, cardinality: u64) -> TriplePatternInfo {
        TriplePatternInfo {
            id: id.to_string(),
            subject: PatternTerm::Variable(vars.first().cloned().unwrap_or_default()),
            predicate: PatternTerm::Iri(format!("http://example.org/p_{id}")),
            object: PatternTerm::Variable(vars.last().cloned().unwrap_or_default()),
            estimated_cardinality: cardinality,
            bound_variables: vars,
            original_pattern: None,
        }
    }

    // --- PatternDependencyGraph extended tests ---

    #[test]
    fn test_dependency_graph_single_pattern() {
        let p1 = pat("solo", vec!["x".to_string()], 10);
        let graph = PatternDependencyGraph::build(vec![p1]);

        let stages = graph.get_independent_patterns();
        assert_eq!(
            stages.len(),
            1,
            "Single pattern should produce a single stage"
        );
        assert_eq!(stages[0], vec![0], "Stage 0 should contain pattern 0");
    }

    #[test]
    fn test_dependency_graph_no_patterns() {
        let graph = PatternDependencyGraph::build(vec![]);
        assert!(graph.get_independent_patterns().is_empty());
    }

    #[test]
    fn test_dependency_graph_are_independent_different_vars() {
        let p1 = pat("p1", vec!["a".to_string(), "b".to_string()], 10);
        let p2 = pat("p2", vec!["c".to_string(), "d".to_string()], 10);
        let graph = PatternDependencyGraph::build(vec![p1, p2]);

        assert!(
            graph.are_independent(0, 1),
            "Patterns with disjoint variables should be independent"
        );
    }

    #[test]
    fn test_dependency_graph_are_not_independent_shared_var() {
        let p1 = pat("p1", vec!["s".to_string(), "o1".to_string()], 10);
        let p2 = pat("p2", vec!["s".to_string(), "o2".to_string()], 10);
        let graph = PatternDependencyGraph::build(vec![p1, p2]);

        // `?s` is first bound by p1, so p2 depends on p1 and the two patterns
        // land in consecutive stages.
        assert!(
            !graph.are_independent(0, 1),
            "Shared variable `s` should create a dependency"
        );
        assert_eq!(
            graph.get_independent_patterns(),
            vec![vec![0usize], vec![1]]
        );
        let patterns = graph.patterns();
        assert_eq!(patterns.len(), 2, "Graph should contain 2 patterns");
    }

    #[test]
    fn test_dependency_graph_execution_order_returns_all_patterns() {
        let p1 = pat("p1", vec!["a".to_string()], 10);
        let p2 = pat("p2", vec!["b".to_string()], 20);
        let p3 = pat("p3", vec!["c".to_string()], 30);
        let graph = PatternDependencyGraph::build(vec![p1, p2, p3]);

        let total_in_stages: usize = graph.execution_order().iter().map(|s| s.len()).sum();
        assert_eq!(
            total_in_stages, 3,
            "All patterns should appear in execution stages"
        );
    }

    #[test]
    fn test_dependency_graph_patterns_accessor() {
        let p1 = pat("x", vec!["a".to_string()], 5);
        let p2 = pat("y", vec!["b".to_string()], 15);
        let graph = PatternDependencyGraph::build(vec![p1, p2]);

        let patterns = graph.patterns();
        assert_eq!(patterns.len(), 2);
        assert_eq!(patterns[0].estimated_cardinality, 5);
        assert_eq!(patterns[1].estimated_cardinality, 15);
    }

    // --- merge_results extended tests ---

    #[test]
    fn test_merge_results_multi_var_join() {
        let evaluator = ParallelBgpEvaluator::new(1);

        let mut row_l = BindingRow::new();
        row_l.insert("x".to_string(), RdfTerm::iri("http://a"));
        row_l.insert("y".to_string(), RdfTerm::iri("http://b"));

        let mut row_r = BindingRow::new();
        row_r.insert("x".to_string(), RdfTerm::iri("http://a"));
        row_r.insert("y".to_string(), RdfTerm::iri("http://b"));
        row_r.insert("z".to_string(), RdfTerm::iri("http://c"));

        let result = evaluator.merge_results(
            vec![row_l],
            vec![row_r],
            &["x".to_string(), "y".to_string()],
        );

        assert_eq!(
            result.len(),
            1,
            "Matching multi-var join should produce one row"
        );
        assert!(
            result[0].contains_key("z"),
            "Joined row should contain z from right side"
        );
    }

    #[test]
    fn test_merge_results_no_matching_join_vars() {
        let evaluator = ParallelBgpEvaluator::new(1);

        let mut row_l = BindingRow::new();
        row_l.insert("x".to_string(), RdfTerm::iri("http://a"));

        let mut row_r = BindingRow::new();
        row_r.insert("x".to_string(), RdfTerm::iri("http://DIFFERENT"));

        let result = evaluator.merge_results(vec![row_l], vec![row_r], &["x".to_string()]);

        assert_eq!(
            result.len(),
            0,
            "Non-matching join should produce empty result"
        );
    }

    #[test]
    fn test_merge_results_multiple_right_matches() {
        let evaluator = ParallelBgpEvaluator::new(1);

        let mut row_l = BindingRow::new();
        row_l.insert("x".to_string(), RdfTerm::iri("http://shared"));

        let right: Vec<BindingRow> = (0..3)
            .map(|i| {
                let mut row = BindingRow::new();
                row.insert("x".to_string(), RdfTerm::iri("http://shared"));
                row.insert("y".to_string(), RdfTerm::iri(format!("http://val{i}")));
                row
            })
            .collect();

        let result = evaluator.merge_results(vec![row_l], right, &["x".to_string()]);
        assert_eq!(
            result.len(),
            3,
            "Should produce one row for each matching right-side row"
        );
    }

    // --- ParallelBgpEvaluator configuration tests ---

    #[test]
    fn test_evaluator_chunk_size_minimum_is_one() {
        let evaluator = ParallelBgpEvaluator::new(4).with_chunk_size(0);
        assert_eq!(evaluator.chunk_size, 1, "Chunk size should be at least 1");
    }

    #[test]
    fn test_evaluator_chunk_size_set_correctly() {
        let evaluator = ParallelBgpEvaluator::new(4).with_chunk_size(8);
        assert_eq!(evaluator.chunk_size, 8);
    }

    #[test]
    fn test_evaluator_default_uses_cpu_count() {
        let evaluator = ParallelBgpEvaluator::default();
        assert!(evaluator.num_threads >= 1, "Should use at least 1 thread");
    }

    // --- Evaluate with MockTripleStore ---

    #[test]
    fn test_evaluate_no_results_from_store() {
        // MockTripleStore::new() returns no rows for unknown pattern ids.
        // Whatever the evaluator returns for a pattern with no matches, it
        // must not invent any variable bindings, so every returned row (if
        // any) has to be an empty map.
        let store = MockTripleStore::new();
        let evaluator = ParallelBgpEvaluator::new(1);

        let pattern = pat("no_results", vec!["x".to_string(), "y".to_string()], 100);
        let result = evaluator.evaluate(vec![pattern], &store).unwrap();
        let has_bindings = result.iter().any(|row| !row.is_empty());
        assert!(
            !has_bindings,
            "Empty store should produce no variable bindings"
        );
    }

    #[test]
    fn test_evaluate_two_independent_patterns_cross_product() {
        let mut store = MockTripleStore::new();

        let p1 = pat("pat_a", vec!["a".to_string()], 2);
        let p2 = pat("pat_b", vec!["b".to_string()], 3);

        store.results.insert(
            "pat_a".to_string(),
            vec![
                make_row(&[("a", iri_term("http://a1"))]),
                make_row(&[("a", iri_term("http://a2"))]),
            ],
        );
        store.results.insert(
            "pat_b".to_string(),
            vec![
                make_row(&[("b", iri_term("http://b1"))]),
                make_row(&[("b", iri_term("http://b2"))]),
                make_row(&[("b", iri_term("http://b3"))]),
            ],
        );

        let evaluator = ParallelBgpEvaluator::new(1);
        let result = evaluator.evaluate(vec![p1, p2], &store).unwrap();

        // 2 * 3 = 6 cross-product rows (independent patterns)
        assert_eq!(
            result.len(),
            6,
            "Independent patterns produce cross product"
        );
    }
}