Skip to main content

oxirs_arq/
executor_pipeline.rs

1//! Parallel Query Execution Pipeline
2//!
3//! This module provides parallel execution primitives for SPARQL sub-queries:
4//!
5//! - [`BindingMap`] — a single row of query result bindings (variable → value)
6//! - [`PipelineStage`] — a map or filter transformation on binding rows
7//! - [`ParallelPipelineStage`] — a composable sequence of pipeline stages
8//! - [`UnionParallelExecutor`] — executes union branches in parallel using rayon
9//!
10//! # Design
11//!
//! The pipeline model lets complex query execution plans be expressed as
//! chains of transformations.  `ParallelPipelineStage::chain` runs multiple
//! independent pipelines over the same input rows and concatenates their
//! outputs.  `UnionParallelExecutor` merges the results of union branches in
//! parallel, and also implements LEFT JOIN (OPTIONAL) semantics via
//! `execute_optional`.
17
18use rayon::prelude::*;
19use std::collections::HashMap;
20
21// ---------------------------------------------------------------------------
22// BindingMap
23// ---------------------------------------------------------------------------
24
/// A single row of SPARQL query result bindings: variable name → value string.
///
/// This is a thin newtype over `HashMap<String, String>` that provides
/// convenient construction, merging and canonicalisation helpers.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BindingMap(pub HashMap<String, String>);

impl BindingMap {
    /// Create an empty binding map.
    pub fn new() -> Self {
        Self::default()
    }

    /// Create a binding map from an iterator of `(variable, value)` pairs.
    ///
    /// If a variable appears more than once, the last pair wins.
    pub fn from_pairs(
        pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        Self(
            pairs
                .into_iter()
                .map(|(k, v)| (k.into(), v.into()))
                .collect(),
        )
    }

    /// Bind `variable` to `value`, replacing any previous binding.
    pub fn bind(&mut self, variable: impl Into<String>, value: impl Into<String>) {
        self.0.insert(variable.into(), value.into());
    }

    /// Get the value bound to `variable`, if any.
    pub fn get(&self, variable: &str) -> Option<&str> {
        self.0.get(variable).map(String::as_str)
    }

    /// Number of bound variables.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Whether no variables are bound.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Merge another binding map into this one.
    ///
    /// Unlike [`BindingMap::compatible_merge`] this never fails: when both maps
    /// bind the same variable, the value from `other` overrides.
    pub fn merge(&mut self, other: &BindingMap) {
        self.0
            .extend(other.0.iter().map(|(k, v)| (k.clone(), v.clone())));
    }

    /// Merge `self` and `other` if the two rows are *compatible*.
    ///
    /// Two binding maps are compatible if they agree on every variable they
    /// share.
    ///
    /// # Returns
    /// - `None` if some shared variable is bound to different values.
    /// - Otherwise, a new map containing the union of both rows' bindings.
    ///
    /// Neither input is modified.
    pub fn compatible_merge(&self, other: &BindingMap) -> Option<BindingMap> {
        let conflicting = other
            .0
            .iter()
            .any(|(k, v)| self.0.get(k).is_some_and(|existing| existing != v));
        if conflicting {
            return None;
        }
        let mut merged = self.clone();
        for (k, v) in &other.0 {
            // Shared variables already hold the identical value (checked
            // above), so only the variables new to `self` need copying.
            if !merged.0.contains_key(k) {
                merged.0.insert(k.clone(), v.clone());
            }
        }
        Some(merged)
    }

    /// Canonical string representation, used as a deduplication key.
    ///
    /// Pairs are sorted by variable name and rendered as `var=value`, joined
    /// with `;`. Any `\`, `=` or `;` inside a variable name or value is
    /// backslash-escaped.
    ///
    /// The escaping makes the key injective: distinct maps always produce
    /// distinct keys. Without it, `{x: "1;y=2"}` and `{x: "1", y: "2"}` would
    /// collide. Maps without those characters render exactly as
    /// `a=1;b=2`.
    pub fn canonical_key(&self) -> String {
        let mut pairs: Vec<(&str, &str)> = self
            .0
            .iter()
            .map(|(k, v)| (k.as_str(), v.as_str()))
            .collect();
        // Variable names are unique map keys, so an unstable sort is deterministic.
        pairs.sort_unstable();

        let mut key = String::new();
        for (index, (var, value)) in pairs.into_iter().enumerate() {
            if index > 0 {
                key.push(';');
            }
            push_escaped_key_part(&mut key, var);
            key.push('=');
            push_escaped_key_part(&mut key, value);
        }
        key
    }
}

/// Append `text` to `out`, backslash-escaping the characters that
/// [`BindingMap::canonical_key`] uses as syntax (`\`, `=`, `;`).
fn push_escaped_key_part(out: &mut String, text: &str) {
    for ch in text.chars() {
        if matches!(ch, '\\' | '=' | ';') {
            out.push('\\');
        }
        out.push(ch);
    }
}

impl std::fmt::Display for BindingMap {
    /// Render the map as `{<canonical key>}`, e.g. `{s=a;p=b}`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{{{}}}", self.canonical_key())
    }
}
125
126// ---------------------------------------------------------------------------
127// PipelineStage
128// ---------------------------------------------------------------------------
129
130/// A single transformation stage in a query execution pipeline.
131pub enum PipelineStage {
132    /// Map each binding row to an optional new row (None = remove from stream).
133    Map(Box<dyn Fn(BindingMap) -> Option<BindingMap> + Send + Sync>),
134    /// Filter binding rows: only rows where the predicate returns `true` pass.
135    Filter(Box<dyn Fn(&BindingMap) -> bool + Send + Sync>),
136}
137
138impl PipelineStage {
139    /// Apply this stage to a single binding map.  Returns `None` if the row
140    /// is filtered out.
141    pub fn apply(&self, row: BindingMap) -> Option<BindingMap> {
142        match self {
143            PipelineStage::Map(f) => f(row),
144            PipelineStage::Filter(pred) => {
145                if pred(&row) {
146                    Some(row)
147                } else {
148                    None
149                }
150            }
151        }
152    }
153}
154
155// ---------------------------------------------------------------------------
156// ParallelPipelineStage
157// ---------------------------------------------------------------------------
158
159/// A composable, sequential pipeline of [`PipelineStage`] transformations.
160///
161/// Stages are applied left-to-right.  Use `map_stage` and `filter_stage`
162/// as constructors, then `add_map` / `add_filter` to extend the pipeline.
163/// Use `process` to run the pipeline on a batch of inputs.
164///
165/// Multiple pipelines can be composed with `chain`.
166pub struct ParallelPipelineStage {
167    stages: Vec<PipelineStage>,
168}
169
170impl ParallelPipelineStage {
171    /// Create a pipeline starting with a single map stage.
172    pub fn map_stage(f: impl Fn(BindingMap) -> Option<BindingMap> + Send + Sync + 'static) -> Self {
173        Self {
174            stages: vec![PipelineStage::Map(Box::new(f))],
175        }
176    }
177
178    /// Create a pipeline starting with a single filter stage.
179    pub fn filter_stage(pred: impl Fn(&BindingMap) -> bool + Send + Sync + 'static) -> Self {
180        Self {
181            stages: vec![PipelineStage::Filter(Box::new(pred))],
182        }
183    }
184
185    /// Append a map stage to this pipeline.
186    pub fn add_map(
187        mut self,
188        f: impl Fn(BindingMap) -> Option<BindingMap> + Send + Sync + 'static,
189    ) -> Self {
190        self.stages.push(PipelineStage::Map(Box::new(f)));
191        self
192    }
193
194    /// Append a filter stage to this pipeline.
195    pub fn add_filter(
196        mut self,
197        pred: impl Fn(&BindingMap) -> bool + Send + Sync + 'static,
198    ) -> Self {
199        self.stages.push(PipelineStage::Filter(Box::new(pred)));
200        self
201    }
202
203    /// Apply all stages sequentially to every input row and collect the results.
204    pub fn process(&self, inputs: Vec<BindingMap>) -> Vec<BindingMap> {
205        inputs
206            .into_iter()
207            .filter_map(|mut row| {
208                for stage in &self.stages {
209                    row = stage.apply(row)?;
210                }
211                Some(row)
212            })
213            .collect()
214    }
215
216    /// Run multiple independent pipelines over the same `inputs` and collect
217    /// all outputs (concatenation, not deduplication).
218    pub fn chain(stages: Vec<Self>, inputs: Vec<BindingMap>) -> Vec<BindingMap> {
219        stages
220            .into_iter()
221            .flat_map(|pipeline| pipeline.process(inputs.clone()))
222            .collect()
223    }
224
225    /// Return the number of stages in this pipeline.
226    pub fn stage_count(&self) -> usize {
227        self.stages.len()
228    }
229}
230
231// ---------------------------------------------------------------------------
232// UnionParallelExecutor
233// ---------------------------------------------------------------------------
234
235/// Executes independent SPARQL union branches in parallel and merges results.
236///
237/// Uses rayon for data-parallelism across branches.  Deduplication uses the
238/// canonical string key of each `BindingMap`.
239pub struct UnionParallelExecutor;
240
241impl UnionParallelExecutor {
242    /// Execute multiple pre-computed branches (each a `Vec<BindingMap>`) in
243    /// parallel and merge the results.
244    ///
245    /// Duplicates are removed using the canonical string representation of each
246    /// `BindingMap` as the deduplication key.
247    pub fn execute_branches(branches: Vec<Vec<BindingMap>>) -> Vec<BindingMap> {
248        if branches.is_empty() {
249            return vec![];
250        }
251        if branches.len() == 1 {
252            return branches.into_iter().next().unwrap_or_default();
253        }
254
255        // Merge all branches in parallel.
256        let merged: Vec<BindingMap> = branches
257            .into_par_iter()
258            .flat_map(|branch| branch.into_par_iter())
259            .collect();
260
261        Self::dedup(merged)
262    }
263
264    /// Compute the LEFT OUTER JOIN (OPTIONAL) of `main` rows with `optional` rows.
265    ///
266    /// For each row in `main`:
267    /// - If there is at least one compatible row in `optional`, emit all compatible
268    ///   merged rows.
269    /// - Otherwise, emit the `main` row unchanged (preserving LEFT JOIN semantics).
270    pub fn execute_optional(main: Vec<BindingMap>, optional: Vec<BindingMap>) -> Vec<BindingMap> {
271        if optional.is_empty() {
272            return main;
273        }
274
275        main.into_par_iter()
276            .flat_map(|main_row| {
277                let compatible: Vec<BindingMap> = optional
278                    .iter()
279                    .filter_map(|opt_row| main_row.compatible_merge(opt_row))
280                    .collect();
281                if compatible.is_empty() {
282                    vec![main_row]
283                } else {
284                    compatible
285                }
286            })
287            .collect()
288    }
289
290    /// Remove duplicate binding maps by canonical key.
291    pub fn dedup(rows: Vec<BindingMap>) -> Vec<BindingMap> {
292        let mut seen = std::collections::HashSet::new();
293        rows.into_iter()
294            .filter(|row| seen.insert(row.canonical_key()))
295            .collect()
296    }
297}
298
299// ---------------------------------------------------------------------------
300// Tests
301// ---------------------------------------------------------------------------
302
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a binding row from literal `(variable, value)` pairs.
    fn row(pairs: &[(&str, &str)]) -> BindingMap {
        BindingMap::from_pairs(pairs.iter().copied())
    }

    // ---- BindingMap ------------------------------------------------------

    #[test]
    fn test_binding_map_new_empty() {
        let empty = BindingMap::new();
        assert_eq!(empty.len(), 0);
        assert!(empty.is_empty());
    }

    #[test]
    fn test_binding_map_from_pairs() {
        let bindings = row(&[("s", "http://ex.org/s"), ("p", "http://ex.org/p")]);
        assert_eq!(bindings.len(), 2);
        assert_eq!(bindings.get("s"), Some("http://ex.org/s"));
        assert_eq!(bindings.get("p"), Some("http://ex.org/p"));
    }

    #[test]
    fn test_binding_map_bind() {
        let mut bindings = BindingMap::new();
        bindings.bind("x", "value");
        assert_eq!(bindings.get("x"), Some("value"));
    }

    #[test]
    fn test_binding_map_get_missing() {
        assert_eq!(BindingMap::new().get("missing"), None);
    }

    #[test]
    fn test_binding_map_merge_override() {
        let mut target = row(&[("a", "1"), ("b", "2")]);
        target.merge(&row(&[("b", "override"), ("c", "3")]));
        assert_eq!(target.get("b"), Some("override"));
        assert_eq!(target.get("c"), Some("3"));
    }

    #[test]
    fn test_binding_map_compatible_merge_compatible() {
        let left = row(&[("s", "http://ex.org/s"), ("p", "http://ex.org/p")]);
        let right = row(&[("s", "http://ex.org/s"), ("o", "http://ex.org/o")]);
        let merged = left
            .compatible_merge(&right)
            .expect("rows agree on ?s, so they must merge");
        assert_eq!(merged.get("o"), Some("http://ex.org/o"));
    }

    #[test]
    fn test_binding_map_compatible_merge_incompatible() {
        let left = row(&[("s", "http://ex.org/s1")]);
        let right = row(&[("s", "http://ex.org/s2")]);
        assert!(left.compatible_merge(&right).is_none());
    }

    #[test]
    fn test_binding_map_canonical_key_stable() {
        let key = row(&[("z", "3"), ("a", "1"), ("m", "2")]).canonical_key();
        assert!(key.starts_with("a=1"), "pairs should be sorted by variable");
    }

    #[test]
    fn test_binding_map_display() {
        let rendered = format!("{}", row(&[("x", "1")]));
        assert!(rendered.contains("x=1"));
    }

    #[test]
    fn test_binding_map_default() {
        assert!(BindingMap::default().is_empty());
    }

    // ---- PipelineStage ---------------------------------------------------

    #[test]
    fn test_pipeline_stage_map_passes() {
        let stage = PipelineStage::Map(Box::new(|mut current: BindingMap| {
            current.bind("extra", "value");
            Some(current)
        }));
        let output = stage
            .apply(row(&[("x", "1")]))
            .expect("map stage should keep the row");
        assert_eq!(output.get("extra"), Some("value"));
    }

    #[test]
    fn test_pipeline_stage_map_removes() {
        let stage = PipelineStage::Map(Box::new(|_| None));
        assert!(stage.apply(row(&[("x", "1")])).is_none());
    }

    #[test]
    fn test_pipeline_stage_filter_passes() {
        let stage = PipelineStage::Filter(Box::new(|_| true));
        assert!(stage.apply(row(&[("x", "1")])).is_some());
    }

    #[test]
    fn test_pipeline_stage_filter_removes() {
        let stage = PipelineStage::Filter(Box::new(|_| false));
        assert!(stage.apply(row(&[("x", "1")])).is_none());
    }

    // ---- ParallelPipelineStage -------------------------------------------

    #[test]
    fn test_pipeline_map_stage_constructor() {
        let pipeline = ParallelPipelineStage::map_stage(|mut current| {
            current.bind("new", "val");
            Some(current)
        });
        assert_eq!(pipeline.stage_count(), 1);
    }

    #[test]
    fn test_pipeline_filter_stage_constructor() {
        assert_eq!(ParallelPipelineStage::filter_stage(|_| true).stage_count(), 1);
    }

    #[test]
    fn test_pipeline_add_map() {
        let pipeline = ParallelPipelineStage::filter_stage(|_| true).add_map(Some);
        assert_eq!(pipeline.stage_count(), 2);
    }

    #[test]
    fn test_pipeline_add_filter() {
        let pipeline = ParallelPipelineStage::map_stage(Some).add_filter(|_| true);
        assert_eq!(pipeline.stage_count(), 2);
    }

    #[test]
    fn test_pipeline_process_map_all() {
        let tagger = ParallelPipelineStage::map_stage(|mut current| {
            current.bind("added", "yes");
            Some(current)
        });
        let output = tagger.process(vec![row(&[("x", "1")]), row(&[("x", "2")])]);
        assert_eq!(output.len(), 2);
        assert!(output.iter().all(|r| r.get("added") == Some("yes")));
    }

    #[test]
    fn test_pipeline_process_filter_some() {
        let only_ones = ParallelPipelineStage::filter_stage(|r| r.get("x") == Some("1"));
        let batch = vec![row(&[("x", "1")]), row(&[("x", "2")]), row(&[("x", "1")])];
        assert_eq!(only_ones.process(batch).len(), 2);
    }

    #[test]
    fn test_pipeline_process_filter_all_out() {
        let reject_all = ParallelPipelineStage::filter_stage(|_| false);
        let batch = vec![row(&[("x", "1")]), row(&[("x", "2")])];
        assert!(reject_all.process(batch).is_empty());
    }

    #[test]
    fn test_pipeline_process_empty_input() {
        let identity = ParallelPipelineStage::map_stage(Some);
        assert!(identity.process(Vec::new()).is_empty());
    }

    #[test]
    fn test_pipeline_chain_two_pipelines() {
        let ones = ParallelPipelineStage::filter_stage(|r| r.get("x") == Some("1"));
        let twos = ParallelPipelineStage::filter_stage(|r| r.get("x") == Some("2"));
        let batch = vec![row(&[("x", "1")]), row(&[("x", "2")]), row(&[("x", "3")])];
        let output = ParallelPipelineStage::chain(vec![ones, twos], batch);
        // Each pipeline lets exactly one row through.
        assert_eq!(output.len(), 2);
    }

    #[test]
    fn test_pipeline_chain_empty_stages() {
        let output = ParallelPipelineStage::chain(Vec::new(), vec![row(&[("x", "1")])]);
        assert!(output.is_empty());
    }

    #[test]
    fn test_pipeline_multi_stage_composition() {
        // Stage 1 derives ?label from ?x; stage 2 keeps rows whose label was derived.
        let pipeline = ParallelPipelineStage::map_stage(|mut current| {
            let x = current.get("x").unwrap_or("").to_string();
            current.bind("label", format!("item_{x}"));
            Some(current)
        })
        .add_filter(|r| r.get("label").is_some_and(|label| label.starts_with("item_")));

        let output = pipeline.process(vec![row(&[("x", "a")]), row(&[("x", "b")])]);
        assert_eq!(output.len(), 2);
        assert_eq!(output[0].get("label"), Some("item_a"));
    }

    // ---- UnionParallelExecutor -------------------------------------------

    #[test]
    fn test_union_executor_empty() {
        assert!(UnionParallelExecutor::execute_branches(Vec::new()).is_empty());
    }

    #[test]
    fn test_union_executor_single_branch() {
        let only = vec![row(&[("x", "1")]), row(&[("x", "2")])];
        assert_eq!(UnionParallelExecutor::execute_branches(vec![only]).len(), 2);
    }

    #[test]
    fn test_union_executor_multiple_branches_dedup() {
        let first = vec![row(&[("x", "1")]), row(&[("x", "2")])];
        // "x=2" also appears in `first`.
        let second = vec![row(&[("x", "2")]), row(&[("x", "3")])];
        let output = UnionParallelExecutor::execute_branches(vec![first, second]);
        assert_eq!(output.len(), 3, "duplicate should be removed");
    }

    #[test]
    fn test_union_executor_multiple_branches_no_overlap() {
        let branches = vec![
            vec![row(&[("x", "1")])],
            vec![row(&[("x", "2")])],
            vec![row(&[("x", "3")])],
        ];
        assert_eq!(UnionParallelExecutor::execute_branches(branches).len(), 3);
    }

    #[test]
    fn test_optional_executor_empty_optional() {
        let main = vec![row(&[("s", "s1")]), row(&[("s", "s2")])];
        // With nothing optional to join, every main row passes through.
        let output = UnionParallelExecutor::execute_optional(main.clone(), Vec::new());
        assert_eq!(output.len(), 2);
    }

    #[test]
    fn test_optional_executor_compatible_rows() {
        // The optional row extends ?s=s1 with ?o.
        let main = vec![row(&[("s", "s1")])];
        let optional = vec![row(&[("s", "s1"), ("o", "o1")])];
        let output = UnionParallelExecutor::execute_optional(main, optional);
        assert_eq!(output.len(), 1);
        assert_eq!(output[0].get("o"), Some("o1"));
    }

    #[test]
    fn test_optional_executor_no_compatible_rows() {
        // Both sides bind ?s, but to different values.
        let main = vec![row(&[("s", "s1")])];
        let optional = vec![row(&[("s", "s2"), ("o", "o1")])];
        let output = UnionParallelExecutor::execute_optional(main, optional);
        // The unmatched main row is kept as-is.
        assert_eq!(output.len(), 1);
        assert_eq!(output[0].get("s"), Some("s1"));
        assert_eq!(output[0].get("o"), None);
    }

    #[test]
    fn test_optional_executor_multiple_compatible() {
        // Two optional rows match ?s=s1 with different ?o values.
        let main = vec![row(&[("s", "s1")])];
        let optional = vec![
            row(&[("s", "s1"), ("o", "o1")]),
            row(&[("s", "s1"), ("o", "o2")]),
        ];
        assert_eq!(UnionParallelExecutor::execute_optional(main, optional).len(), 2);
    }

    #[test]
    fn test_dedup_removes_duplicates() {
        let rows = vec![row(&[("x", "1")]), row(&[("x", "1")]), row(&[("x", "2")])];
        assert_eq!(UnionParallelExecutor::dedup(rows).len(), 2);
    }

    #[test]
    fn test_dedup_empty() {
        assert!(UnionParallelExecutor::dedup(Vec::new()).is_empty());
    }
}