use rayon::prelude::*;
use std::collections::HashMap;
/// A single result row: a mapping from variable names to bound string values.
///
/// The wrapped `HashMap` is public; this type does not interpret the values it
/// stores.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BindingMap(pub HashMap<String, String>);

impl BindingMap {
    /// Creates an empty binding map.
    pub fn new() -> Self {
        Self(HashMap::new())
    }

    /// Builds a map from `(variable, value)` pairs.
    ///
    /// If a variable appears more than once, the last pair wins (standard
    /// `HashMap` collection semantics).
    pub fn from_pairs(
        pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        Self(
            pairs
                .into_iter()
                .map(|(k, v)| (k.into(), v.into()))
                .collect(),
        )
    }

    /// Binds `variable` to `value`, replacing any previous binding.
    pub fn bind(&mut self, variable: impl Into<String>, value: impl Into<String>) {
        self.0.insert(variable.into(), value.into());
    }

    /// Returns the value bound to `variable`, or `None` if it is unbound.
    pub fn get(&self, variable: &str) -> Option<&str> {
        self.0.get(variable).map(|s| s.as_str())
    }

    /// Number of bound variables.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// `true` when no variable is bound.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Copies every binding of `other` into `self`.
    ///
    /// On a shared variable, `other`'s value overwrites the existing one. Use
    /// [`compatible_merge`](Self::compatible_merge) to reject conflicts instead.
    pub fn merge(&mut self, other: &BindingMap) {
        self.0
            .extend(other.0.iter().map(|(k, v)| (k.clone(), v.clone())));
    }

    /// Returns the union of `self` and `other` if they agree on every shared
    /// variable, otherwise `None`. Neither input is modified.
    pub fn compatible_merge(&self, other: &BindingMap) -> Option<BindingMap> {
        let conflicts = other
            .0
            .iter()
            .any(|(k, v)| self.0.get(k).is_some_and(|existing| existing != v));
        if conflicts {
            return None;
        }
        let mut merged = self.clone();
        merged.merge(other);
        Some(merged)
    }

    /// Returns a deterministic string that identifies this map's contents.
    ///
    /// Format: `var=value` pairs sorted by variable name and joined with `;`.
    /// A backslash is inserted before `\` and `;` in names and values, and
    /// before `=` in names. This makes the key injective: two maps share a key
    /// only if they are equal. Maps whose names and values contain none of
    /// those characters produce the plain `a=1;b=2` form.
    pub fn canonical_key(&self) -> String {
        let mut pairs: Vec<(&str, &str)> = self
            .0
            .iter()
            .map(|(k, v)| (k.as_str(), v.as_str()))
            .collect();
        // HashMap keys are unique, so this orders purely by variable name.
        pairs.sort_unstable();
        let mut key = String::new();
        for (index, (var, value)) in pairs.into_iter().enumerate() {
            if index > 0 {
                key.push(';');
            }
            Self::push_escaped(&mut key, var, true);
            key.push('=');
            // Values may keep a raw `=`: the first unescaped `=` in a pair is
            // always the separator, because names escape theirs.
            Self::push_escaped(&mut key, value, false);
        }
        key
    }

    /// Appends `text` to `out`, backslash-escaping `\` and `;`, and also `=`
    /// when `escape_eq` is set.
    fn push_escaped(out: &mut String, text: &str, escape_eq: bool) {
        for ch in text.chars() {
            if matches!(ch, '\\' | ';') || (escape_eq && ch == '=') {
                out.push('\\');
            }
            out.push(ch);
        }
    }
}

impl Default for BindingMap {
    /// Same as [`BindingMap::new`]: an empty map.
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Display for BindingMap {
    /// Renders the map as its [`canonical_key`](BindingMap::canonical_key)
    /// wrapped in braces, e.g. `{a=1;b=2}`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{{{}}}", self.canonical_key())
    }
}
/// One step of a row pipeline: either a transformation or a predicate.
pub enum PipelineStage {
    /// Transforms a row; returning `None` drops the row.
    Map(Box<dyn Fn(BindingMap) -> Option<BindingMap> + Send + Sync>),
    /// Keeps the row unchanged when the predicate returns `true`, drops it otherwise.
    Filter(Box<dyn Fn(&BindingMap) -> bool + Send + Sync>),
}

impl PipelineStage {
    /// Runs this stage on `row`, yielding `Some(row)` to keep it or `None` to drop it.
    pub fn apply(&self, row: BindingMap) -> Option<BindingMap> {
        match self {
            Self::Map(transform) => transform(row),
            Self::Filter(keep) => keep(&row).then_some(row),
        }
    }
}
/// An ordered list of [`PipelineStage`]s applied to every input row.
///
/// For each row, stages run in insertion order; the first stage that yields
/// `None` drops the row. Rows are independent, so [`process`](Self::process)
/// spreads them across the rayon thread pool. This is why every stage closure
/// must be `Send + Sync`.
pub struct ParallelPipelineStage {
    stages: Vec<PipelineStage>,
}

impl ParallelPipelineStage {
    /// Creates a pipeline whose only stage is the map function `f`.
    pub fn map_stage(f: impl Fn(BindingMap) -> Option<BindingMap> + Send + Sync + 'static) -> Self {
        Self {
            stages: vec![PipelineStage::Map(Box::new(f))],
        }
    }

    /// Creates a pipeline whose only stage is the filter predicate `pred`.
    pub fn filter_stage(pred: impl Fn(&BindingMap) -> bool + Send + Sync + 'static) -> Self {
        Self {
            stages: vec![PipelineStage::Filter(Box::new(pred))],
        }
    }

    /// Appends a map stage and returns the extended pipeline (builder style).
    pub fn add_map(
        mut self,
        f: impl Fn(BindingMap) -> Option<BindingMap> + Send + Sync + 'static,
    ) -> Self {
        self.stages.push(PipelineStage::Map(Box::new(f)));
        self
    }

    /// Appends a filter stage and returns the extended pipeline (builder style).
    pub fn add_filter(
        mut self,
        pred: impl Fn(&BindingMap) -> bool + Send + Sync + 'static,
    ) -> Self {
        self.stages.push(PipelineStage::Filter(Box::new(pred)));
        self
    }

    /// Runs every row through all stages in parallel and returns the survivors.
    ///
    /// The output keeps the relative order of `inputs`, because rayon's
    /// `collect` into a `Vec` preserves sequential order.
    pub fn process(&self, inputs: Vec<BindingMap>) -> Vec<BindingMap> {
        inputs
            .into_par_iter()
            .filter_map(|row| self.run_stages(row))
            .collect()
    }

    /// Applies all stages to one row, stopping at the first stage that drops it.
    fn run_stages(&self, row: BindingMap) -> Option<BindingMap> {
        self.stages
            .iter()
            .try_fold(row, |current, stage| stage.apply(current))
    }

    /// Runs each pipeline independently over the same `inputs` and concatenates
    /// their outputs in pipeline order.
    ///
    /// This is a union of pipeline results, not sequential composition: pipeline
    /// *k* sees the original `inputs`, not the output of pipeline *k - 1*. An
    /// empty `stages` list yields an empty result.
    pub fn chain(mut stages: Vec<Self>, inputs: Vec<BindingMap>) -> Vec<BindingMap> {
        let Some(last) = stages.pop() else {
            return Vec::new();
        };
        let mut results: Vec<BindingMap> = stages
            .iter()
            .flat_map(|pipeline| pipeline.process(inputs.clone()))
            .collect();
        // The last pipeline can consume `inputs` directly, saving one full clone.
        results.extend(last.process(inputs));
        results
    }

    /// Number of stages in this pipeline.
    pub fn stage_count(&self) -> usize {
        self.stages.len()
    }
}
/// Stateless helpers that combine row sequences, using rayon for the bulk work.
pub struct UnionParallelExecutor;

impl UnionParallelExecutor {
    /// Concatenates all `branches` and removes duplicate rows.
    ///
    /// Rows keep their branch order (branch 0 first, then branch 1, …), and the
    /// first occurrence of each distinct row is kept. Deduplication is applied
    /// however many branches are supplied, so one branch behaves like several.
    pub fn execute_branches(branches: Vec<Vec<BindingMap>>) -> Vec<BindingMap> {
        match branches.len() {
            0 => Vec::new(),
            // Nothing to merge: skip rayon's scheduling overhead, but still dedup
            // so the result does not depend on how the branches were split.
            1 => Self::dedup(branches.into_iter().next().unwrap_or_default()),
            _ => {
                // rayon's `collect` into a `Vec` preserves sequential order, so
                // `dedup` sees rows in branch order.
                let merged: Vec<BindingMap> = branches
                    .into_par_iter()
                    .flat_map(|branch| branch.into_par_iter())
                    .collect();
                Self::dedup(merged)
            }
        }
    }

    /// Left-outer-joins `main` with `optional`.
    ///
    /// Each main row is merged with every optional row it is compatible with
    /// (see [`BindingMap::compatible_merge`]). If none is compatible, the main
    /// row is kept unchanged. An empty `optional` returns `main` as-is. This is a
    /// nested-loop join: O(|main| * |optional|) compatibility checks, with main
    /// rows processed in parallel.
    pub fn execute_optional(main: Vec<BindingMap>, optional: Vec<BindingMap>) -> Vec<BindingMap> {
        if optional.is_empty() {
            return main;
        }
        main.into_par_iter()
            .flat_map(|main_row| {
                let compatible: Vec<BindingMap> = optional
                    .iter()
                    .filter_map(|opt_row| main_row.compatible_merge(opt_row))
                    .collect();
                if compatible.is_empty() {
                    vec![main_row]
                } else {
                    compatible
                }
            })
            .collect()
    }

    /// Removes duplicate rows, keeping the first occurrence of each.
    ///
    /// Two rows are duplicates exactly when they bind the same variables to the
    /// same values. The comparison key is the row's sorted `(variable, value)`
    /// pairs, so values containing separator characters cannot collide.
    pub fn dedup(rows: Vec<BindingMap>) -> Vec<BindingMap> {
        // Decide which rows survive while `rows` is only borrowed. That lets the
        // set hold `&str` pairs instead of freshly allocated strings.
        let keep: Vec<bool> = {
            let mut seen: std::collections::HashSet<Vec<(&str, &str)>> =
                std::collections::HashSet::with_capacity(rows.len());
            rows.iter()
                .map(|row| seen.insert(Self::sorted_pairs(row)))
                .collect()
        };
        rows.into_iter()
            .zip(keep)
            .filter_map(|(row, keep_row)| keep_row.then_some(row))
            .collect()
    }

    /// Returns the row's `(variable, value)` pairs sorted by variable name.
    fn sorted_pairs(row: &BindingMap) -> Vec<(&str, &str)> {
        let mut pairs: Vec<(&str, &str)> = row
            .0
            .iter()
            .map(|(k, v)| (k.as_str(), v.as_str()))
            .collect();
        pairs.sort_unstable();
        pairs
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a `BindingMap` from string-literal pairs.
    fn bm(pairs: &[(&str, &str)]) -> BindingMap {
        BindingMap::from_pairs(pairs.iter().map(|&(k, v)| (k, v)))
    }

    /// Value bound to `var` in each row, in row order (for order-sensitive asserts).
    fn values<'a>(rows: &'a [BindingMap], var: &str) -> Vec<Option<&'a str>> {
        rows.iter().map(|r| r.get(var)).collect()
    }

    #[test]
    fn test_binding_map_new_empty() {
        let m = BindingMap::new();
        assert!(m.is_empty());
        assert_eq!(m.len(), 0);
    }

    #[test]
    fn test_binding_map_from_pairs() {
        let m = bm(&[("s", "http://ex.org/s"), ("p", "http://ex.org/p")]);
        assert_eq!(m.get("s"), Some("http://ex.org/s"));
        assert_eq!(m.get("p"), Some("http://ex.org/p"));
        assert_eq!(m.len(), 2);
    }

    #[test]
    fn test_binding_map_bind() {
        let mut m = BindingMap::new();
        m.bind("x", "value");
        assert_eq!(m.get("x"), Some("value"));
    }

    #[test]
    fn test_binding_map_get_missing() {
        let m = BindingMap::new();
        assert_eq!(m.get("missing"), None);
    }

    #[test]
    fn test_binding_map_merge_override() {
        let mut m1 = bm(&[("a", "1"), ("b", "2")]);
        let m2 = bm(&[("b", "override"), ("c", "3")]);
        m1.merge(&m2);
        assert_eq!(m1.get("a"), Some("1"));
        assert_eq!(m1.get("b"), Some("override"));
        assert_eq!(m1.get("c"), Some("3"));
    }

    #[test]
    fn test_binding_map_compatible_merge_compatible() {
        let m1 = bm(&[("s", "http://ex.org/s"), ("p", "http://ex.org/p")]);
        let m2 = bm(&[("s", "http://ex.org/s"), ("o", "http://ex.org/o")]);
        let merged = m1
            .compatible_merge(&m2)
            .expect("rows agree on `s`, so they must merge");
        assert_eq!(merged.len(), 3);
        assert_eq!(merged.get("o"), Some("http://ex.org/o"));
    }

    #[test]
    fn test_binding_map_compatible_merge_incompatible() {
        let m1 = bm(&[("s", "http://ex.org/s1")]);
        let m2 = bm(&[("s", "http://ex.org/s2")]);
        assert!(m1.compatible_merge(&m2).is_none());
    }

    #[test]
    fn test_binding_map_canonical_key_stable() {
        let m = bm(&[("z", "3"), ("a", "1"), ("m", "2")]);
        let reordered = bm(&[("m", "2"), ("z", "3"), ("a", "1")]);
        assert_eq!(m.canonical_key(), "a=1;m=2;z=3");
        assert_eq!(m.canonical_key(), reordered.canonical_key());
    }

    #[test]
    fn test_binding_map_display() {
        let m = bm(&[("x", "1")]);
        assert_eq!(format!("{m}"), "{x=1}");
    }

    #[test]
    fn test_binding_map_default() {
        let m = BindingMap::default();
        assert!(m.is_empty());
    }

    #[test]
    fn test_pipeline_stage_map_passes() {
        let stage = PipelineStage::Map(Box::new(|mut bm| {
            bm.bind("extra", "value");
            Some(bm)
        }));
        let row = bm(&[("x", "1")]);
        let result = stage.apply(row).expect("map stage returns Some");
        assert_eq!(result.get("extra"), Some("value"));
        assert_eq!(result.get("x"), Some("1"));
    }

    #[test]
    fn test_pipeline_stage_map_removes() {
        let stage = PipelineStage::Map(Box::new(|_| None));
        let row = bm(&[("x", "1")]);
        assert!(stage.apply(row).is_none());
    }

    #[test]
    fn test_pipeline_stage_filter_passes() {
        let stage = PipelineStage::Filter(Box::new(|_| true));
        let row = bm(&[("x", "1")]);
        assert_eq!(stage.apply(row.clone()), Some(row));
    }

    #[test]
    fn test_pipeline_stage_filter_removes() {
        let stage = PipelineStage::Filter(Box::new(|_| false));
        let row = bm(&[("x", "1")]);
        assert!(stage.apply(row).is_none());
    }

    #[test]
    fn test_pipeline_map_stage_constructor() {
        let pipeline = ParallelPipelineStage::map_stage(|mut b| {
            b.bind("new", "val");
            Some(b)
        });
        assert_eq!(pipeline.stage_count(), 1);
    }

    #[test]
    fn test_pipeline_filter_stage_constructor() {
        let pipeline = ParallelPipelineStage::filter_stage(|_| true);
        assert_eq!(pipeline.stage_count(), 1);
    }

    #[test]
    fn test_pipeline_add_map() {
        let pipeline = ParallelPipelineStage::filter_stage(|_| true).add_map(Some);
        assert_eq!(pipeline.stage_count(), 2);
    }

    #[test]
    fn test_pipeline_add_filter() {
        let pipeline = ParallelPipelineStage::map_stage(Some).add_filter(|_| true);
        assert_eq!(pipeline.stage_count(), 2);
    }

    #[test]
    fn test_pipeline_process_map_all() {
        let pipeline = ParallelPipelineStage::map_stage(|mut b| {
            b.bind("added", "yes");
            Some(b)
        });
        let inputs = vec![bm(&[("x", "1")]), bm(&[("x", "2")])];
        let result = pipeline.process(inputs);
        assert_eq!(values(&result, "x"), vec![Some("1"), Some("2")]);
        assert!(result.iter().all(|r| r.get("added") == Some("yes")));
    }

    #[test]
    fn test_pipeline_process_filter_some() {
        let pipeline = ParallelPipelineStage::filter_stage(|b| b.get("x") == Some("1"));
        let inputs = vec![bm(&[("x", "1")]), bm(&[("x", "2")]), bm(&[("x", "1")])];
        let result = pipeline.process(inputs);
        assert_eq!(values(&result, "x"), vec![Some("1"), Some("1")]);
    }

    #[test]
    fn test_pipeline_process_filter_all_out() {
        let pipeline = ParallelPipelineStage::filter_stage(|_| false);
        let inputs = vec![bm(&[("x", "1")]), bm(&[("x", "2")])];
        let result = pipeline.process(inputs);
        assert!(result.is_empty());
    }

    #[test]
    fn test_pipeline_process_empty_input() {
        let pipeline = ParallelPipelineStage::map_stage(Some);
        let result = pipeline.process(vec![]);
        assert!(result.is_empty());
    }

    #[test]
    fn test_pipeline_chain_two_pipelines() {
        let p1 = ParallelPipelineStage::filter_stage(|b| b.get("x") == Some("1"));
        let p2 = ParallelPipelineStage::filter_stage(|b| b.get("x") == Some("2"));
        let inputs = vec![bm(&[("x", "1")]), bm(&[("x", "2")]), bm(&[("x", "3")])];
        let result = ParallelPipelineStage::chain(vec![p1, p2], inputs);
        // Each pipeline sees the original inputs; outputs are concatenated in
        // pipeline order.
        assert_eq!(values(&result, "x"), vec![Some("1"), Some("2")]);
    }

    #[test]
    fn test_pipeline_chain_empty_stages() {
        let result = ParallelPipelineStage::chain(vec![], vec![bm(&[("x", "1")])]);
        assert!(result.is_empty());
    }

    #[test]
    fn test_pipeline_multi_stage_composition() {
        let pipeline = ParallelPipelineStage::map_stage(|mut b| {
            let x = b.get("x").unwrap_or("").to_string();
            b.bind("label", format!("item_{x}"));
            Some(b)
        })
        .add_filter(|b| b.get("label").is_some_and(|l| l.starts_with("item_")));
        let inputs = vec![bm(&[("x", "a")]), bm(&[("x", "b")])];
        let result = pipeline.process(inputs);
        assert_eq!(values(&result, "label"), vec![Some("item_a"), Some("item_b")]);
    }

    #[test]
    fn test_union_executor_empty() {
        let result = UnionParallelExecutor::execute_branches(vec![]);
        assert!(result.is_empty());
    }

    #[test]
    fn test_union_executor_single_branch() {
        let branch = vec![bm(&[("x", "1")]), bm(&[("x", "2")])];
        let result = UnionParallelExecutor::execute_branches(vec![branch]);
        assert_eq!(values(&result, "x"), vec![Some("1"), Some("2")]);
    }

    #[test]
    fn test_union_executor_multiple_branches_dedup() {
        let b1 = vec![bm(&[("x", "1")]), bm(&[("x", "2")])];
        let b2 = vec![bm(&[("x", "2")]), bm(&[("x", "3")])];
        let result = UnionParallelExecutor::execute_branches(vec![b1, b2]);
        assert_eq!(
            values(&result, "x"),
            vec![Some("1"), Some("2"), Some("3")],
            "duplicate should be removed"
        );
    }

    #[test]
    fn test_union_executor_multiple_branches_no_overlap() {
        let b1 = vec![bm(&[("x", "1")])];
        let b2 = vec![bm(&[("x", "2")])];
        let b3 = vec![bm(&[("x", "3")])];
        let result = UnionParallelExecutor::execute_branches(vec![b1, b2, b3]);
        assert_eq!(values(&result, "x"), vec![Some("1"), Some("2"), Some("3")]);
    }

    #[test]
    fn test_optional_executor_empty_optional() {
        let main = vec![bm(&[("s", "s1")]), bm(&[("s", "s2")])];
        let result = UnionParallelExecutor::execute_optional(main.clone(), vec![]);
        assert_eq!(result, main);
    }

    #[test]
    fn test_optional_executor_compatible_rows() {
        let main = vec![bm(&[("s", "s1")])];
        let optional = vec![bm(&[("s", "s1"), ("o", "o1")])];
        let result = UnionParallelExecutor::execute_optional(main, optional);
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].get("o"), Some("o1"));
    }

    #[test]
    fn test_optional_executor_no_compatible_rows() {
        let main = vec![bm(&[("s", "s1")])];
        let optional = vec![bm(&[("s", "s2"), ("o", "o1")])];
        let result = UnionParallelExecutor::execute_optional(main, optional);
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].get("s"), Some("s1"));
        assert_eq!(result[0].get("o"), None);
    }

    #[test]
    fn test_optional_executor_multiple_compatible() {
        let main = vec![bm(&[("s", "s1")])];
        let optional = vec![
            bm(&[("s", "s1"), ("o", "o1")]),
            bm(&[("s", "s1"), ("o", "o2")]),
        ];
        let result = UnionParallelExecutor::execute_optional(main, optional);
        assert_eq!(values(&result, "o"), vec![Some("o1"), Some("o2")]);
    }

    #[test]
    fn test_dedup_removes_duplicates() {
        let rows = vec![bm(&[("x", "1")]), bm(&[("x", "1")]), bm(&[("x", "2")])];
        let deduped = UnionParallelExecutor::dedup(rows);
        assert_eq!(values(&deduped, "x"), vec![Some("1"), Some("2")]);
    }

    #[test]
    fn test_dedup_empty() {
        let deduped = UnionParallelExecutor::dedup(vec![]);
        assert!(deduped.is_empty());
    }
}