1use rayon::prelude::*;
19use std::collections::HashMap;
20
/// A single solution row: a mapping from variable names to their bound values.
///
/// The wrapped map is public, so callers may also manipulate it directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BindingMap(pub HashMap<String, String>);

impl BindingMap {
    /// Creates a binding map with no bindings.
    pub fn new() -> Self {
        Self(HashMap::new())
    }

    /// Builds a binding map from `(variable, value)` pairs.
    pub fn from_pairs(
        pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        Self(
            pairs
                .into_iter()
                .map(|(variable, value)| (variable.into(), value.into()))
                .collect(),
        )
    }

    /// Binds `variable` to `value`, replacing any existing binding for it.
    pub fn bind(&mut self, variable: impl Into<String>, value: impl Into<String>) {
        self.0.insert(variable.into(), value.into());
    }

    /// Returns the value bound to `variable`, or `None` if it is unbound.
    pub fn get(&self, variable: &str) -> Option<&str> {
        self.0.get(variable).map(String::as_str)
    }

    /// Returns the number of bound variables.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Returns `true` when no variable is bound.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Copies every binding of `other` into `self`.
    ///
    /// On a shared variable the value from `other` wins.
    pub fn merge(&mut self, other: &BindingMap) {
        self.0
            .extend(other.0.iter().map(|(k, v)| (k.clone(), v.clone())));
    }

    /// Merges `other` into a copy of `self` if the two maps agree on every
    /// variable they share.
    ///
    /// Returns `None` as soon as a shared variable is bound to different
    /// values; neither input is modified.
    pub fn compatible_merge(&self, other: &BindingMap) -> Option<BindingMap> {
        // Check for conflicts first so an incompatible pair never pays for a clone.
        let conflicting = other
            .0
            .iter()
            .any(|(k, v)| self.0.get(k).is_some_and(|existing| existing != v));
        if conflicting {
            return None;
        }

        // No conflicts: overriding shared keys is a no-op, so `merge` is safe here.
        let mut merged = self.clone();
        merged.merge(other);
        Some(merged)
    }

    /// Returns a deterministic string that identifies this map's contents.
    ///
    /// Bindings are sorted and rendered as `key=value` joined by `;`. The
    /// characters `\`, `=` and `;` occurring inside a key or value are
    /// backslash-escaped so that two different maps can never produce the
    /// same key (for example `{"a": "1;b=2"}` versus `{"a": "1", "b": "2"}`).
    /// Keys and values without those characters are rendered verbatim.
    pub fn canonical_key(&self) -> String {
        // Appends `text` to `out`, escaping the characters used as separators.
        fn push_escaped(out: &mut String, text: &str) {
            for ch in text.chars() {
                if matches!(ch, '\\' | '=' | ';') {
                    out.push('\\');
                }
                out.push(ch);
            }
        }

        let mut pairs: Vec<(&str, &str)> = self
            .0
            .iter()
            .map(|(k, v)| (k.as_str(), v.as_str()))
            .collect();
        // HashMap iteration order is unspecified; sorting makes the key stable.
        pairs.sort_unstable();

        let mut key = String::new();
        for (index, (k, v)) in pairs.into_iter().enumerate() {
            if index > 0 {
                key.push(';');
            }
            push_escaped(&mut key, k);
            key.push('=');
            push_escaped(&mut key, v);
        }
        key
    }
}
113
114impl Default for BindingMap {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120impl std::fmt::Display for BindingMap {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 write!(f, "{{{}}}", self.canonical_key())
123 }
124}
125
126pub enum PipelineStage {
132 Map(Box<dyn Fn(BindingMap) -> Option<BindingMap> + Send + Sync>),
134 Filter(Box<dyn Fn(&BindingMap) -> bool + Send + Sync>),
136}
137
138impl PipelineStage {
139 pub fn apply(&self, row: BindingMap) -> Option<BindingMap> {
142 match self {
143 PipelineStage::Map(f) => f(row),
144 PipelineStage::Filter(pred) => {
145 if pred(&row) {
146 Some(row)
147 } else {
148 None
149 }
150 }
151 }
152 }
153}
154
155pub struct ParallelPipelineStage {
167 stages: Vec<PipelineStage>,
168}
169
170impl ParallelPipelineStage {
171 pub fn map_stage(f: impl Fn(BindingMap) -> Option<BindingMap> + Send + Sync + 'static) -> Self {
173 Self {
174 stages: vec![PipelineStage::Map(Box::new(f))],
175 }
176 }
177
178 pub fn filter_stage(pred: impl Fn(&BindingMap) -> bool + Send + Sync + 'static) -> Self {
180 Self {
181 stages: vec![PipelineStage::Filter(Box::new(pred))],
182 }
183 }
184
185 pub fn add_map(
187 mut self,
188 f: impl Fn(BindingMap) -> Option<BindingMap> + Send + Sync + 'static,
189 ) -> Self {
190 self.stages.push(PipelineStage::Map(Box::new(f)));
191 self
192 }
193
194 pub fn add_filter(
196 mut self,
197 pred: impl Fn(&BindingMap) -> bool + Send + Sync + 'static,
198 ) -> Self {
199 self.stages.push(PipelineStage::Filter(Box::new(pred)));
200 self
201 }
202
203 pub fn process(&self, inputs: Vec<BindingMap>) -> Vec<BindingMap> {
205 inputs
206 .into_iter()
207 .filter_map(|mut row| {
208 for stage in &self.stages {
209 row = stage.apply(row)?;
210 }
211 Some(row)
212 })
213 .collect()
214 }
215
216 pub fn chain(stages: Vec<Self>, inputs: Vec<BindingMap>) -> Vec<BindingMap> {
219 stages
220 .into_iter()
221 .flat_map(|pipeline| pipeline.process(inputs.clone()))
222 .collect()
223 }
224
225 pub fn stage_count(&self) -> usize {
227 self.stages.len()
228 }
229}
230
231pub struct UnionParallelExecutor;
240
241impl UnionParallelExecutor {
242 pub fn execute_branches(branches: Vec<Vec<BindingMap>>) -> Vec<BindingMap> {
248 if branches.is_empty() {
249 return vec![];
250 }
251 if branches.len() == 1 {
252 return branches.into_iter().next().unwrap_or_default();
253 }
254
255 let merged: Vec<BindingMap> = branches
257 .into_par_iter()
258 .flat_map(|branch| branch.into_par_iter())
259 .collect();
260
261 Self::dedup(merged)
262 }
263
264 pub fn execute_optional(main: Vec<BindingMap>, optional: Vec<BindingMap>) -> Vec<BindingMap> {
271 if optional.is_empty() {
272 return main;
273 }
274
275 main.into_par_iter()
276 .flat_map(|main_row| {
277 let compatible: Vec<BindingMap> = optional
278 .iter()
279 .filter_map(|opt_row| main_row.compatible_merge(opt_row))
280 .collect();
281 if compatible.is_empty() {
282 vec![main_row]
283 } else {
284 compatible
285 }
286 })
287 .collect()
288 }
289
290 pub fn dedup(rows: Vec<BindingMap>) -> Vec<BindingMap> {
292 let mut seen = std::collections::HashSet::new();
293 rows.into_iter()
294 .filter(|row| seen.insert(row.canonical_key()))
295 .collect()
296 }
297}
298
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `BindingMap` from string-slice pairs.
    fn bm(pairs: &[(&str, &str)]) -> BindingMap {
        BindingMap::from_pairs(pairs.iter().copied())
    }

    // ---- BindingMap ----

    #[test]
    fn test_binding_map_new_empty() {
        let m = BindingMap::new();
        assert!(m.is_empty());
        assert_eq!(m.len(), 0);
    }

    #[test]
    fn test_binding_map_from_pairs() {
        let m = bm(&[("s", "http://ex.org/s"), ("p", "http://ex.org/p")]);
        assert_eq!(m.get("s"), Some("http://ex.org/s"));
        assert_eq!(m.get("p"), Some("http://ex.org/p"));
        assert_eq!(m.len(), 2);
    }

    #[test]
    fn test_binding_map_bind() {
        let mut m = BindingMap::new();
        m.bind("x", "value");
        assert_eq!(m.get("x"), Some("value"));
        assert_eq!(m.len(), 1);
    }

    #[test]
    fn test_binding_map_get_missing() {
        let m = BindingMap::new();
        assert_eq!(m.get("missing"), None);
    }

    #[test]
    fn test_binding_map_merge_override() {
        let mut m1 = bm(&[("a", "1"), ("b", "2")]);
        let m2 = bm(&[("b", "override"), ("c", "3")]);
        m1.merge(&m2);
        assert_eq!(m1.get("a"), Some("1"));
        assert_eq!(m1.get("b"), Some("override"));
        assert_eq!(m1.get("c"), Some("3"));
        assert_eq!(m1.len(), 3);
    }

    #[test]
    fn test_binding_map_compatible_merge_compatible() {
        let m1 = bm(&[("s", "http://ex.org/s"), ("p", "http://ex.org/p")]);
        let m2 = bm(&[("s", "http://ex.org/s"), ("o", "http://ex.org/o")]);
        let merged = m1
            .compatible_merge(&m2)
            .expect("maps agree on the shared variable");
        assert_eq!(merged.get("s"), Some("http://ex.org/s"));
        assert_eq!(merged.get("p"), Some("http://ex.org/p"));
        assert_eq!(merged.get("o"), Some("http://ex.org/o"));
        assert_eq!(merged.len(), 3);
    }

    #[test]
    fn test_binding_map_compatible_merge_incompatible() {
        let m1 = bm(&[("s", "http://ex.org/s1")]);
        let m2 = bm(&[("s", "http://ex.org/s2")]);
        assert!(m1.compatible_merge(&m2).is_none());
    }

    #[test]
    fn test_binding_map_canonical_key_stable() {
        let m = bm(&[("z", "3"), ("a", "1"), ("m", "2")]);
        // Pairs are sorted, so the key does not depend on insertion order.
        assert_eq!(m.canonical_key(), "a=1;m=2;z=3");
    }

    #[test]
    fn test_binding_map_display() {
        let m = bm(&[("x", "1")]);
        assert_eq!(format!("{m}"), "{x=1}");
    }

    #[test]
    fn test_binding_map_default() {
        let m = BindingMap::default();
        assert!(m.is_empty());
    }

    // ---- PipelineStage ----

    #[test]
    fn test_pipeline_stage_map_passes() {
        let stage = PipelineStage::Map(Box::new(|mut bm| {
            bm.bind("extra", "value");
            Some(bm)
        }));
        let out = stage
            .apply(bm(&[("x", "1")]))
            .expect("map stage keeps the row");
        assert_eq!(out.get("extra"), Some("value"));
        assert_eq!(out.get("x"), Some("1"));
    }

    #[test]
    fn test_pipeline_stage_map_removes() {
        let stage = PipelineStage::Map(Box::new(|_| None));
        assert!(stage.apply(bm(&[("x", "1")])).is_none());
    }

    #[test]
    fn test_pipeline_stage_filter_passes() {
        let stage = PipelineStage::Filter(Box::new(|_| true));
        let row = bm(&[("x", "1")]);
        assert_eq!(stage.apply(row.clone()), Some(row));
    }

    #[test]
    fn test_pipeline_stage_filter_removes() {
        let stage = PipelineStage::Filter(Box::new(|_| false));
        assert!(stage.apply(bm(&[("x", "1")])).is_none());
    }

    // ---- ParallelPipelineStage ----

    #[test]
    fn test_pipeline_map_stage_constructor() {
        let pipeline = ParallelPipelineStage::map_stage(|mut b| {
            b.bind("new", "val");
            Some(b)
        });
        assert_eq!(pipeline.stage_count(), 1);
    }

    #[test]
    fn test_pipeline_filter_stage_constructor() {
        let pipeline = ParallelPipelineStage::filter_stage(|_| true);
        assert_eq!(pipeline.stage_count(), 1);
    }

    #[test]
    fn test_pipeline_add_map() {
        let pipeline = ParallelPipelineStage::filter_stage(|_| true).add_map(Some);
        assert_eq!(pipeline.stage_count(), 2);
    }

    #[test]
    fn test_pipeline_add_filter() {
        let pipeline = ParallelPipelineStage::map_stage(Some).add_filter(|_| true);
        assert_eq!(pipeline.stage_count(), 2);
    }

    #[test]
    fn test_pipeline_process_map_all() {
        let pipeline = ParallelPipelineStage::map_stage(|mut b| {
            b.bind("added", "yes");
            Some(b)
        });
        let result = pipeline.process(vec![bm(&[("x", "1")]), bm(&[("x", "2")])]);
        assert_eq!(result.len(), 2);
        assert!(result.iter().all(|r| r.get("added") == Some("yes")));
    }

    #[test]
    fn test_pipeline_process_filter_some() {
        let pipeline = ParallelPipelineStage::filter_stage(|b| b.get("x") == Some("1"));
        let inputs = vec![bm(&[("x", "1")]), bm(&[("x", "2")]), bm(&[("x", "1")])];
        let result = pipeline.process(inputs);
        assert_eq!(result.len(), 2);
        assert!(result.iter().all(|r| r.get("x") == Some("1")));
    }

    #[test]
    fn test_pipeline_process_filter_all_out() {
        let pipeline = ParallelPipelineStage::filter_stage(|_| false);
        let result = pipeline.process(vec![bm(&[("x", "1")]), bm(&[("x", "2")])]);
        assert!(result.is_empty());
    }

    #[test]
    fn test_pipeline_process_empty_input() {
        let pipeline = ParallelPipelineStage::map_stage(Some);
        assert!(pipeline.process(vec![]).is_empty());
    }

    #[test]
    fn test_pipeline_chain_two_pipelines() {
        let p1 = ParallelPipelineStage::filter_stage(|b| b.get("x") == Some("1"));
        let p2 = ParallelPipelineStage::filter_stage(|b| b.get("x") == Some("2"));
        let inputs = vec![bm(&[("x", "1")]), bm(&[("x", "2")]), bm(&[("x", "3")])];
        let result = ParallelPipelineStage::chain(vec![p1, p2], inputs);
        // Each pipeline sees the full input; outputs are concatenated in pipeline order.
        assert_eq!(result, vec![bm(&[("x", "1")]), bm(&[("x", "2")])]);
    }

    #[test]
    fn test_pipeline_chain_empty_stages() {
        let result = ParallelPipelineStage::chain(vec![], vec![bm(&[("x", "1")])]);
        assert!(result.is_empty());
    }

    #[test]
    fn test_pipeline_multi_stage_composition() {
        let pipeline = ParallelPipelineStage::map_stage(|mut b| {
            let x = b.get("x").unwrap_or("").to_string();
            b.bind("label", format!("item_{x}"));
            Some(b)
        })
        .add_filter(|b| b.get("label").is_some_and(|l| l.starts_with("item_")));

        let result = pipeline.process(vec![bm(&[("x", "a")]), bm(&[("x", "b")])]);
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].get("label"), Some("item_a"));
        assert_eq!(result[1].get("label"), Some("item_b"));
    }

    // ---- UnionParallelExecutor ----

    #[test]
    fn test_union_executor_empty() {
        let result = UnionParallelExecutor::execute_branches(vec![]);
        assert!(result.is_empty());
    }

    #[test]
    fn test_union_executor_single_branch() {
        let branch = vec![bm(&[("x", "1")]), bm(&[("x", "2")])];
        let result = UnionParallelExecutor::execute_branches(vec![branch]);
        assert_eq!(result.len(), 2);
    }

    #[test]
    fn test_union_executor_multiple_branches_dedup() {
        let b1 = vec![bm(&[("x", "1")]), bm(&[("x", "2")])];
        let b2 = vec![bm(&[("x", "2")]), bm(&[("x", "3")])];
        let result = UnionParallelExecutor::execute_branches(vec![b1, b2]);
        assert_eq!(result.len(), 3, "duplicate should be removed");
        for value in ["1", "2", "3"] {
            assert!(result.contains(&bm(&[("x", value)])));
        }
    }

    #[test]
    fn test_union_executor_multiple_branches_no_overlap() {
        let b1 = vec![bm(&[("x", "1")])];
        let b2 = vec![bm(&[("x", "2")])];
        let b3 = vec![bm(&[("x", "3")])];
        let result = UnionParallelExecutor::execute_branches(vec![b1, b2, b3]);
        assert_eq!(result.len(), 3);
    }

    #[test]
    fn test_optional_executor_empty_optional() {
        let main = vec![bm(&[("s", "s1")]), bm(&[("s", "s2")])];
        let result = UnionParallelExecutor::execute_optional(main.clone(), vec![]);
        // With nothing to join against, the main rows come back untouched.
        assert_eq!(result, main);
    }

    #[test]
    fn test_optional_executor_compatible_rows() {
        let main = vec![bm(&[("s", "s1")])];
        let optional = vec![bm(&[("s", "s1"), ("o", "o1")])];
        let result = UnionParallelExecutor::execute_optional(main, optional);
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].get("s"), Some("s1"));
        assert_eq!(result[0].get("o"), Some("o1"));
    }

    #[test]
    fn test_optional_executor_no_compatible_rows() {
        let main = vec![bm(&[("s", "s1")])];
        let optional = vec![bm(&[("s", "s2"), ("o", "o1")])];
        let result = UnionParallelExecutor::execute_optional(main, optional);
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].get("s"), Some("s1"));
        assert_eq!(result[0].get("o"), None);
    }

    #[test]
    fn test_optional_executor_multiple_compatible() {
        let main = vec![bm(&[("s", "s1")])];
        let optional = vec![
            bm(&[("s", "s1"), ("o", "o1")]),
            bm(&[("s", "s1"), ("o", "o2")]),
        ];
        let result = UnionParallelExecutor::execute_optional(main, optional);
        assert_eq!(result.len(), 2);
        assert!(result.contains(&bm(&[("s", "s1"), ("o", "o1")])));
        assert!(result.contains(&bm(&[("s", "s1"), ("o", "o2")])));
    }

    #[test]
    fn test_dedup_removes_duplicates() {
        let rows = vec![bm(&[("x", "1")]), bm(&[("x", "1")]), bm(&[("x", "2")])];
        let deduped = UnionParallelExecutor::dedup(rows);
        // First occurrences survive, in their original order.
        assert_eq!(deduped, vec![bm(&[("x", "1")]), bm(&[("x", "2")])]);
    }

    #[test]
    fn test_dedup_empty() {
        let deduped = UnionParallelExecutor::dedup(vec![]);
        assert!(deduped.is_empty());
    }
}
615}