declarative_dataflow/plan/
mod.rs

1//! Types and traits for implementing query plans.
2
3use std::collections::HashSet;
4use std::ops::Deref;
5use std::sync::atomic::{self, AtomicUsize};
6
7use timely::dataflow::scopes::child::Iterative;
8use timely::dataflow::Scope;
9use timely::order::Product;
10use timely::progress::Timestamp;
11
12use differential_dataflow::lattice::Lattice;
13use differential_dataflow::trace::TraceReader;
14
15use crate::binding::{AsBinding, AttributeBinding, Binding};
16use crate::Rule;
17use crate::{Aid, Eid, Value, Var};
18use crate::{
19    CollectionRelation, Implemented, Relation, RelationHandle, ShutdownHandle, VariableMap,
20};
21use crate::{TraceKeyHandle, TraceValHandle};
22
23#[cfg(feature = "set-semantics")]
24pub mod aggregate;
25#[cfg(not(feature = "set-semantics"))]
26pub mod aggregate_neu;
27pub mod antijoin;
28pub mod filter;
29#[cfg(feature = "graphql")]
30pub mod graphql;
31#[cfg(feature = "graphql")]
32pub mod graphql_v2;
33pub mod hector;
34pub mod join;
35pub mod project;
36pub mod pull;
37pub mod pull_v2;
38pub mod transform;
39pub mod union;
40
41#[cfg(feature = "set-semantics")]
42pub use self::aggregate::{Aggregate, AggregationFn};
43#[cfg(not(feature = "set-semantics"))]
44pub use self::aggregate_neu::{Aggregate, AggregationFn};
45pub use self::antijoin::Antijoin;
46pub use self::filter::{Filter, Predicate};
47#[cfg(feature = "graphql")]
48pub use self::graphql::GraphQl;
49pub use self::hector::Hector;
50pub use self::join::Join;
51pub use self::project::Project;
52pub use self::pull::{Pull, PullAll, PullLevel};
53pub use self::transform::{Function, Transform};
54pub use self::union::Union;
55
// Backing counter for `next_id`: starts at zero and is incremented by one on
// every call.
static ID: AtomicUsize = AtomicUsize::new(0);
// Backing counter for `gensym`: starts at `usize::MAX` and is decremented by
// one on every call. NOTE(review): presumably counting down from the top keeps
// generated symbols away from user-chosen variable ids — confirm.
static SYM: AtomicUsize = AtomicUsize::new(std::usize::MAX);
58
59/// @FIXME
60pub fn next_id() -> Eid {
61    ID.fetch_add(1, atomic::Ordering::SeqCst) as Eid
62}
63
64/// @FIXME
65pub fn gensym() -> Var {
66    SYM.fetch_sub(1, atomic::Ordering::SeqCst) as Var
67}
68
69/// A thing that can provide global state required during the
70/// implementation of plans.
71pub trait ImplContext<T>
72where
73    T: Timestamp + Lattice,
74{
75    /// Returns the definition for the rule of the given name.
76    fn rule(&self, name: &str) -> Option<&Rule>;
77
78    /// Returns a mutable reference to a (non-base) relation, if one
79    /// is registered under the given name.
80    fn global_arrangement(&mut self, name: &str) -> Option<&mut RelationHandle<T>>;
81
82    /// Checks whether an attribute of that name exists.
83    fn has_attribute(&self, name: &str) -> bool;
84
85    /// Retrieves the forward count trace for the specified aid.
86    fn forward_count(&mut self, name: &str) -> Option<&mut TraceKeyHandle<Value, T, isize>>;
87
88    /// Retrieves the forward propose trace for the specified aid.
89    fn forward_propose(
90        &mut self,
91        name: &str,
92    ) -> Option<&mut TraceValHandle<Value, Value, T, isize>>;
93
94    /// Retrieves the forward validate trace for the specified aid.
95    fn forward_validate(
96        &mut self,
97        name: &str,
98    ) -> Option<&mut TraceKeyHandle<(Value, Value), T, isize>>;
99
100    /// Retrieves the reverse count trace for the specified aid.
101    fn reverse_count(&mut self, name: &str) -> Option<&mut TraceKeyHandle<Value, T, isize>>;
102
103    /// Retrieves the reverse propose trace for the specified aid.
104    fn reverse_propose(
105        &mut self,
106        name: &str,
107    ) -> Option<&mut TraceValHandle<Value, Value, T, isize>>;
108
109    /// Retrieves the reverse validate trace for the specified aid.
110    fn reverse_validate(
111        &mut self,
112        name: &str,
113    ) -> Option<&mut TraceKeyHandle<(Value, Value), T, isize>>;
114
115    /// Returns the current opinion as to whether this rule is
116    /// underconstrained. Underconstrained rules cannot be safely
117    /// materialized and re-used on their own (i.e. without more
118    /// specific constraints).
119    fn is_underconstrained(&self, name: &str) -> bool;
120}
121
122/// Description of everything a plan needs prior to synthesis.
123pub struct Dependencies {
124    /// NameExpr's used by this plan.
125    pub names: HashSet<String>,
126    /// Attributes queries in Match* expressions.
127    pub attributes: HashSet<Aid>,
128}
129
130impl Dependencies {
131    /// A description representing a dependency on nothing.
132    pub fn none() -> Dependencies {
133        Dependencies {
134            names: HashSet::new(),
135            attributes: HashSet::new(),
136        }
137    }
138
139    /// A description representing a dependency on a single name.
140    pub fn name(name: &str) -> Dependencies {
141        let mut names = HashSet::new();
142        names.insert(name.to_string());
143
144        Dependencies {
145            names,
146            attributes: HashSet::new(),
147        }
148    }
149
150    /// A description representing a dependency on a single attribute.
151    pub fn attribute(aid: &str) -> Dependencies {
152        let mut attributes = HashSet::new();
153        attributes.insert(aid.to_string());
154
155        Dependencies {
156            names: HashSet::new(),
157            attributes,
158        }
159    }
160
161    /// Merges two dependency descriptions into one, representing
162    /// their union.
163    pub fn merge(left: Dependencies, right: Dependencies) -> Dependencies {
164        Dependencies {
165            names: left.names.union(&right.names).cloned().collect(),
166            attributes: left.attributes.union(&right.attributes).cloned().collect(),
167        }
168    }
169}
170
171/// A type that can be implemented as a simple relation.
172pub trait Implementable {
173    /// Returns names of any other implementable things that need to
174    /// be available before implementing this one. Attributes are not
175    /// mentioned explicitley as dependencies.
176    fn dependencies(&self) -> Dependencies;
177
178    /// Transforms an implementable into an equivalent set of bindings
179    /// that can be unified by Hector.
180    fn into_bindings(&self) -> Vec<Binding> {
181        panic!("This plan can't be implemented via Hector.");
182    }
183
184    /// @TODO
185    fn datafy(&self) -> Vec<(Eid, Aid, Value)> {
186        Vec::new()
187    }
188
189    /// Implements the type as a simple relation.
190    fn implement<'b, T, I, S>(
191        &self,
192        nested: &mut Iterative<'b, S, u64>,
193        local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
194        context: &mut I,
195    ) -> (Implemented<'b, S>, ShutdownHandle)
196    where
197        T: Timestamp + Lattice,
198        I: ImplContext<T>,
199        S: Scope<Timestamp = T>;
200}
201
/// Possible query plan types.
///
/// Most variants wrap a dedicated plan stage defined in one of the
/// submodules; the `Match*` and `NameExpr` variants are leaf plans that
/// are handled directly by the `Implementable` implementation for `Plan`.
// Variant order is significant: it determines the derived `Ord`/`PartialOrd`
// ordering, so new variants should not be inserted in the middle casually.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum Plan {
    /// Projection
    Project(Project<Plan>),
    /// Aggregation
    Aggregate(Aggregate<Plan>),
    /// Union
    Union(Union<Plan>),
    /// Equijoin
    Join(Join<Plan, Plan>),
    /// WCO (worst-case optimal join, handled by Hector)
    Hector(Hector),
    /// Antijoin
    Antijoin(Antijoin<Plan, Plan>),
    /// Negation of the tuples produced by the wrapped plan
    Negate(Box<Plan>),
    /// Filters bindings by one of the built-in predicates
    Filter(Filter<Plan>),
    /// Transforms a binding by a function expression
    Transform(Transform<Plan>),
    /// Data pattern of the form [?e a ?v]; binds both the entity and
    /// the value variable
    MatchA(Var, Aid, Var),
    /// Data pattern of the form [e a ?v]; binds only the value variable
    MatchEA(Eid, Aid, Var),
    /// Data pattern of the form [?e a v]; binds only the entity variable
    MatchAV(Var, Aid, Value),
    /// Sources data from another relation, identified by name, binding
    /// the given variables.
    NameExpr(Vec<Var>, String),
    /// Pull expression
    Pull(Pull<Plan>),
    /// Single-level pull expression
    PullLevel(PullLevel<Plan>),
    /// Pull-all expression. NOTE(review): the previous comment here was
    /// a copy of the `PullLevel` one; confirm the intended wording.
    PullAll(PullAll),
    /// GraphQl pull expression
    #[cfg(feature = "graphql")]
    GraphQl(GraphQl),
}
241
242impl Plan {
243    /// Returns the variables bound by this plan.
244    pub fn variables(&self) -> Vec<Var> {
245        match *self {
246            Plan::Project(ref projection) => projection.variables.clone(),
247            Plan::Aggregate(ref aggregate) => aggregate.variables.clone(),
248            Plan::Union(ref union) => union.variables.clone(),
249            Plan::Join(ref join) => join.variables.clone(),
250            Plan::Hector(ref hector) => hector.variables.clone(),
251            Plan::Antijoin(ref antijoin) => antijoin.variables.clone(),
252            Plan::Negate(ref plan) => plan.variables(),
253            Plan::Filter(ref filter) => filter.variables.clone(),
254            Plan::Transform(ref transform) => transform.variables.clone(),
255            Plan::MatchA(e, _, v) => vec![e, v],
256            Plan::MatchEA(_, _, v) => vec![v],
257            Plan::MatchAV(e, _, _) => vec![e],
258            Plan::NameExpr(ref variables, ref _name) => variables.clone(),
259            Plan::Pull(ref pull) => pull.variables.clone(),
260            Plan::PullLevel(ref path) => path.variables.clone(),
261            Plan::PullAll(ref path) => path.variables.clone(),
262            #[cfg(feature = "graphql")]
263            Plan::GraphQl(_) => unimplemented!(),
264        }
265    }
266}
267
268impl Implementable for Plan {
269    fn dependencies(&self) -> Dependencies {
270        // @TODO provide a general fold for plans
271        match *self {
272            Plan::Project(ref projection) => projection.dependencies(),
273            Plan::Aggregate(ref aggregate) => aggregate.dependencies(),
274            Plan::Union(ref union) => union.dependencies(),
275            Plan::Join(ref join) => join.dependencies(),
276            Plan::Hector(ref hector) => hector.dependencies(),
277            Plan::Antijoin(ref antijoin) => antijoin.dependencies(),
278            Plan::Negate(ref plan) => plan.dependencies(),
279            Plan::Filter(ref filter) => filter.dependencies(),
280            Plan::Transform(ref transform) => transform.dependencies(),
281            Plan::MatchA(_, ref a, _) => Dependencies::attribute(a),
282            Plan::MatchEA(_, ref a, _) => Dependencies::attribute(a),
283            Plan::MatchAV(_, ref a, _) => Dependencies::attribute(a),
284            Plan::NameExpr(_, ref name) => Dependencies::name(name),
285            Plan::Pull(ref pull) => pull.dependencies(),
286            Plan::PullLevel(ref path) => path.dependencies(),
287            Plan::PullAll(ref path) => path.dependencies(),
288            #[cfg(feature = "graphql")]
289            Plan::GraphQl(ref q) => q.dependencies(),
290        }
291    }
292
293    fn into_bindings(&self) -> Vec<Binding> {
294        // @TODO provide a general fold for plans
295        match *self {
296            Plan::Project(ref projection) => projection.into_bindings(),
297            Plan::Aggregate(ref aggregate) => aggregate.into_bindings(),
298            Plan::Union(ref union) => union.into_bindings(),
299            Plan::Join(ref join) => join.into_bindings(),
300            Plan::Hector(ref hector) => hector.into_bindings(),
301            Plan::Antijoin(ref antijoin) => antijoin.into_bindings(),
302            Plan::Negate(ref plan) => plan.into_bindings(),
303            Plan::Filter(ref filter) => filter.into_bindings(),
304            Plan::Transform(ref transform) => transform.into_bindings(),
305            Plan::MatchA(e, ref a, v) => vec![Binding::attribute(e, a, v)],
306            Plan::MatchEA(match_e, ref a, v) => {
307                let e = gensym();
308                vec![
309                    Binding::attribute(e, a, v),
310                    Binding::constant(e, Value::Eid(match_e)),
311                ]
312            }
313            Plan::MatchAV(e, ref a, ref match_v) => {
314                let v = gensym();
315                vec![
316                    Binding::attribute(e, a, v),
317                    Binding::constant(v, match_v.clone()),
318                ]
319            }
320            Plan::NameExpr(_, ref _name) => unimplemented!(), // @TODO hmm...
321            Plan::Pull(ref pull) => pull.into_bindings(),
322            Plan::PullLevel(ref path) => path.into_bindings(),
323            Plan::PullAll(ref path) => path.into_bindings(),
324            #[cfg(feature = "graphql")]
325            Plan::GraphQl(ref q) => q.into_bindings(),
326        }
327    }
328
329    fn datafy(&self) -> Vec<(Eid, Aid, Value)> {
330        // @TODO provide a general fold for plans
331        match *self {
332            Plan::Project(ref projection) => projection.datafy(),
333            Plan::Aggregate(ref aggregate) => aggregate.datafy(),
334            Plan::Union(ref union) => union.datafy(),
335            Plan::Join(ref join) => join.datafy(),
336            Plan::Hector(ref hector) => hector.datafy(),
337            Plan::Antijoin(ref antijoin) => antijoin.datafy(),
338            Plan::Negate(ref plan) => plan.datafy(),
339            Plan::Filter(ref filter) => filter.datafy(),
340            Plan::Transform(ref transform) => transform.datafy(),
341            Plan::MatchA(_e, ref a, _v) => vec![(
342                next_id(),
343                "df.pattern/a".to_string(),
344                Value::Aid(a.to_string()),
345            )],
346            Plan::MatchEA(e, ref a, _) => vec![
347                (next_id(), "df.pattern/e".to_string(), Value::Eid(e)),
348                (
349                    next_id(),
350                    "df.pattern/a".to_string(),
351                    Value::Aid(a.to_string()),
352                ),
353            ],
354            Plan::MatchAV(_, ref a, ref v) => vec![
355                (
356                    next_id(),
357                    "df.pattern/a".to_string(),
358                    Value::Aid(a.to_string()),
359                ),
360                (next_id(), "df.pattern/v".to_string(), v.clone()),
361            ],
362            Plan::NameExpr(_, ref _name) => Vec::new(),
363            Plan::Pull(ref pull) => pull.datafy(),
364            Plan::PullLevel(ref path) => path.datafy(),
365            Plan::PullAll(ref path) => path.datafy(),
366            #[cfg(feature = "graphql")]
367            Plan::GraphQl(ref q) => q.datafy(),
368        }
369    }
370
371    fn implement<'b, T, I, S>(
372        &self,
373        nested: &mut Iterative<'b, S, u64>,
374        local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
375        context: &mut I,
376    ) -> (Implemented<'b, S>, ShutdownHandle)
377    where
378        T: Timestamp + Lattice,
379        I: ImplContext<T>,
380        S: Scope<Timestamp = T>,
381    {
382        match *self {
383            Plan::Project(ref projection) => {
384                projection.implement(nested, local_arrangements, context)
385            }
386            Plan::Aggregate(ref aggregate) => {
387                aggregate.implement(nested, local_arrangements, context)
388            }
389            Plan::Union(ref union) => union.implement(nested, local_arrangements, context),
390            Plan::Join(ref join) => join.implement(nested, local_arrangements, context),
391            Plan::Hector(ref hector) => hector.implement(nested, local_arrangements, context),
392            Plan::Antijoin(ref antijoin) => antijoin.implement(nested, local_arrangements, context),
393            Plan::Negate(ref plan) => {
394                let (relation, mut shutdown_handle) =
395                    plan.implement(nested, local_arrangements, context);
396                let variables = relation.variables();
397
398                let tuples = {
399                    let (projected, shutdown) = relation.projected(nested, context, &variables);
400                    shutdown_handle.merge_with(shutdown);
401
402                    projected.negate()
403                };
404
405                (
406                    Implemented::Collection(CollectionRelation { variables, tuples }),
407                    shutdown_handle,
408                )
409            }
410            Plan::Filter(ref filter) => filter.implement(nested, local_arrangements, context),
411            Plan::Transform(ref transform) => {
412                transform.implement(nested, local_arrangements, context)
413            }
414            Plan::MatchA(e, ref a, v) => {
415                let binding = AttributeBinding {
416                    variables: (e, v),
417                    source_attribute: a.to_string(),
418                };
419
420                (Implemented::Attribute(binding), ShutdownHandle::empty())
421            }
422            Plan::MatchEA(match_e, ref a, sym1) => {
423                let (tuples, shutdown_propose) = match context.forward_propose(a) {
424                    None => panic!("attribute {:?} does not exist", a),
425                    Some(propose_trace) => {
426                        let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
427                        let (propose, shutdown_propose) =
428                            propose_trace.import_core(&nested.parent, a);
429
430                        let tuples = propose
431                            .enter_at(nested, move |_, _, time| {
432                                let mut forwarded = time.clone();
433                                forwarded.advance_by(&frontier);
434                                Product::new(forwarded, 0)
435                            })
436                            .filter(move |e, _v| *e == Value::Eid(match_e))
437                            .as_collection(|_e, v| vec![v.clone()]);
438
439                        (tuples, shutdown_propose)
440                    }
441                };
442
443                let relation = CollectionRelation {
444                    variables: vec![sym1],
445                    tuples,
446                };
447
448                (
449                    Implemented::Collection(relation),
450                    ShutdownHandle::from_button(shutdown_propose),
451                )
452            }
453            Plan::MatchAV(sym1, ref a, ref match_v) => {
454                let (tuples, shutdown_propose) = match context.forward_propose(a) {
455                    None => panic!("attribute {:?} does not exist", a),
456                    Some(propose_trace) => {
457                        let match_v = match_v.clone();
458                        let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
459                        let (propose, shutdown_propose) =
460                            propose_trace.import_core(&nested.parent, a);
461
462                        let tuples = propose
463                            .enter_at(nested, move |_, _, time| {
464                                let mut forwarded = time.clone();
465                                forwarded.advance_by(&frontier);
466                                Product::new(forwarded, 0)
467                            })
468                            .filter(move |_e, v| *v == match_v)
469                            .as_collection(|e, _v| vec![e.clone()]);
470
471                        (tuples, shutdown_propose)
472                    }
473                };
474
475                let relation = CollectionRelation {
476                    variables: vec![sym1],
477                    tuples,
478                };
479
480                (
481                    Implemented::Collection(relation),
482                    ShutdownHandle::from_button(shutdown_propose),
483                )
484            }
485            Plan::NameExpr(ref syms, ref name) => {
486                if context.is_underconstrained(name) {
487                    match local_arrangements.get(name) {
488                        None => panic!("{:?} not in relation map", name),
489                        Some(named) => {
490                            let relation = CollectionRelation {
491                                variables: syms.clone(),
492                                tuples: named.deref().clone(), // @TODO re-use variable directly?
493                            };
494
495                            (Implemented::Collection(relation), ShutdownHandle::empty())
496                        }
497                    }
498                } else {
499                    // If a rule is not underconstrained, we can
500                    // safely re-use it. @TODO it's debatable whether
501                    // we should then immediately assume that it is
502                    // available as a global arrangement, but we'll do
503                    // so for now.
504
505                    match context.global_arrangement(name) {
506                        None => panic!("{:?} not in query map", name),
507                        Some(named) => {
508                            let frontier: Vec<T> = named.advance_frontier().to_vec();
509                            let (arranged, shutdown_button) =
510                                named.import_core(&nested.parent, name);
511
512                            let relation = CollectionRelation {
513                                variables: syms.clone(),
514                                tuples: arranged
515                                    .enter_at(nested, move |_, _, time| {
516                                        let mut forwarded = time.clone();
517                                        forwarded.advance_by(&frontier);
518                                        Product::new(forwarded, 0)
519                                    })
520                                    // @TODO this destroys all the arrangement re-use
521                                    .as_collection(|tuple, _| tuple.clone()),
522                            };
523
524                            (
525                                Implemented::Collection(relation),
526                                ShutdownHandle::from_button(shutdown_button),
527                            )
528                        }
529                    }
530                }
531            }
532            Plan::Pull(ref pull) => pull.implement(nested, local_arrangements, context),
533            Plan::PullLevel(ref path) => path.implement(nested, local_arrangements, context),
534            Plan::PullAll(ref path) => path.implement(nested, local_arrangements, context),
535            #[cfg(feature = "graphql")]
536            Plan::GraphQl(ref query) => query.implement(nested, local_arrangements, context),
537        }
538    }
539}