declarative_dataflow/plan/
union.rs

1//! Union expression plan.
2
3use timely::dataflow::scopes::child::Iterative;
4use timely::dataflow::Scope;
5use timely::progress::Timestamp;
6
7use differential_dataflow::lattice::Lattice;
8use differential_dataflow::operators::Threshold;
9
10use crate::binding::Binding;
11use crate::plan::{Dependencies, ImplContext, Implementable};
12use crate::{CollectionRelation, Implemented, Relation, ShutdownHandle, Var, VariableMap};
13
14/// A plan stage taking the union over its sources. Frontends are
15/// responsible to ensure that the sources are union-compatible
16/// (i.e. bind all of the same variables in the same order).
17#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
18pub struct Union<P: Implementable> {
19    /// TODO
20    pub variables: Vec<Var>,
21    /// Plan for the data source.
22    pub plans: Vec<P>,
23}
24
25impl<P: Implementable> Implementable for Union<P> {
26    fn dependencies(&self) -> Dependencies {
27        let mut dependencies = Dependencies::none();
28
29        for plan in self.plans.iter() {
30            dependencies = Dependencies::merge(dependencies, plan.dependencies());
31        }
32
33        dependencies
34    }
35
36    fn into_bindings(&self) -> Vec<Binding> {
37        self.plans
38            .iter()
39            .flat_map(Implementable::into_bindings)
40            .collect()
41    }
42
43    fn implement<'b, T, I, S>(
44        &self,
45        nested: &mut Iterative<'b, S, u64>,
46        local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
47        context: &mut I,
48    ) -> (Implemented<'b, S>, ShutdownHandle)
49    where
50        T: Timestamp + Lattice,
51        I: ImplContext<T>,
52        S: Scope<Timestamp = T>,
53    {
54        use differential_dataflow::AsCollection;
55        use timely::dataflow::operators::Concatenate;
56
57        let mut scope = nested.clone();
58        let mut shutdown_handle = ShutdownHandle::empty();
59
60        let streams = self.plans.iter().map(|plan| {
61            let relation = {
62                let (relation, shutdown) = plan.implement(&mut scope, local_arrangements, context);
63                shutdown_handle.merge_with(shutdown);
64                relation
65            };
66
67            let projected = {
68                let (projected, shutdown) =
69                    relation.projected(&mut scope, context, &self.variables);
70                shutdown_handle.merge_with(shutdown);
71                projected
72            };
73
74            projected.inner
75        });
76
77        let concat = nested.concatenate(streams).as_collection();
78
79        let concatenated = CollectionRelation {
80            variables: self.variables.to_vec(),
81            tuples: concat.distinct(),
82        };
83
84        (Implemented::Collection(concatenated), shutdown_handle)
85    }
86}