declarative_dataflow/plan/
union.rs1use timely::dataflow::scopes::child::Iterative;
4use timely::dataflow::Scope;
5use timely::progress::Timestamp;
6
7use differential_dataflow::lattice::Lattice;
8use differential_dataflow::operators::Threshold;
9
10use crate::binding::Binding;
11use crate::plan::{Dependencies, ImplContext, Implementable};
12use crate::{CollectionRelation, Implemented, Relation, ShutdownHandle, Var, VariableMap};
13
14#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
18pub struct Union<P: Implementable> {
19 pub variables: Vec<Var>,
21 pub plans: Vec<P>,
23}
24
25impl<P: Implementable> Implementable for Union<P> {
26 fn dependencies(&self) -> Dependencies {
27 let mut dependencies = Dependencies::none();
28
29 for plan in self.plans.iter() {
30 dependencies = Dependencies::merge(dependencies, plan.dependencies());
31 }
32
33 dependencies
34 }
35
36 fn into_bindings(&self) -> Vec<Binding> {
37 self.plans
38 .iter()
39 .flat_map(Implementable::into_bindings)
40 .collect()
41 }
42
43 fn implement<'b, T, I, S>(
44 &self,
45 nested: &mut Iterative<'b, S, u64>,
46 local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
47 context: &mut I,
48 ) -> (Implemented<'b, S>, ShutdownHandle)
49 where
50 T: Timestamp + Lattice,
51 I: ImplContext<T>,
52 S: Scope<Timestamp = T>,
53 {
54 use differential_dataflow::AsCollection;
55 use timely::dataflow::operators::Concatenate;
56
57 let mut scope = nested.clone();
58 let mut shutdown_handle = ShutdownHandle::empty();
59
60 let streams = self.plans.iter().map(|plan| {
61 let relation = {
62 let (relation, shutdown) = plan.implement(&mut scope, local_arrangements, context);
63 shutdown_handle.merge_with(shutdown);
64 relation
65 };
66
67 let projected = {
68 let (projected, shutdown) =
69 relation.projected(&mut scope, context, &self.variables);
70 shutdown_handle.merge_with(shutdown);
71 projected
72 };
73
74 projected.inner
75 });
76
77 let concat = nested.concatenate(streams).as_collection();
78
79 let concatenated = CollectionRelation {
80 variables: self.variables.to_vec(),
81 tuples: concat.distinct(),
82 };
83
84 (Implemented::Collection(concatenated), shutdown_handle)
85 }
86}