1use std::collections::HashSet;
4use std::ops::Deref;
5use std::sync::atomic::{self, AtomicUsize};
6
7use timely::dataflow::scopes::child::Iterative;
8use timely::dataflow::Scope;
9use timely::order::Product;
10use timely::progress::Timestamp;
11
12use differential_dataflow::lattice::Lattice;
13use differential_dataflow::trace::TraceReader;
14
15use crate::binding::{AsBinding, AttributeBinding, Binding};
16use crate::Rule;
17use crate::{Aid, Eid, Value, Var};
18use crate::{
19 CollectionRelation, Implemented, Relation, RelationHandle, ShutdownHandle, VariableMap,
20};
21use crate::{TraceKeyHandle, TraceValHandle};
22
23#[cfg(feature = "set-semantics")]
24pub mod aggregate;
25#[cfg(not(feature = "set-semantics"))]
26pub mod aggregate_neu;
27pub mod antijoin;
28pub mod filter;
29#[cfg(feature = "graphql")]
30pub mod graphql;
31#[cfg(feature = "graphql")]
32pub mod graphql_v2;
33pub mod hector;
34pub mod join;
35pub mod project;
36pub mod pull;
37pub mod pull_v2;
38pub mod transform;
39pub mod union;
40
41#[cfg(feature = "set-semantics")]
42pub use self::aggregate::{Aggregate, AggregationFn};
43#[cfg(not(feature = "set-semantics"))]
44pub use self::aggregate_neu::{Aggregate, AggregationFn};
45pub use self::antijoin::Antijoin;
46pub use self::filter::{Filter, Predicate};
47#[cfg(feature = "graphql")]
48pub use self::graphql::GraphQl;
49pub use self::hector::Hector;
50pub use self::join::Join;
51pub use self::project::Project;
52pub use self::pull::{Pull, PullAll, PullLevel};
53pub use self::transform::{Function, Transform};
54pub use self::union::Union;
55
56static ID: AtomicUsize = AtomicUsize::new(0);
57static SYM: AtomicUsize = AtomicUsize::new(std::usize::MAX);
58
59pub fn next_id() -> Eid {
61 ID.fetch_add(1, atomic::Ordering::SeqCst) as Eid
62}
63
64pub fn gensym() -> Var {
66 SYM.fetch_sub(1, atomic::Ordering::SeqCst) as Var
67}
68
69pub trait ImplContext<T>
72where
73 T: Timestamp + Lattice,
74{
75 fn rule(&self, name: &str) -> Option<&Rule>;
77
78 fn global_arrangement(&mut self, name: &str) -> Option<&mut RelationHandle<T>>;
81
82 fn has_attribute(&self, name: &str) -> bool;
84
85 fn forward_count(&mut self, name: &str) -> Option<&mut TraceKeyHandle<Value, T, isize>>;
87
88 fn forward_propose(
90 &mut self,
91 name: &str,
92 ) -> Option<&mut TraceValHandle<Value, Value, T, isize>>;
93
94 fn forward_validate(
96 &mut self,
97 name: &str,
98 ) -> Option<&mut TraceKeyHandle<(Value, Value), T, isize>>;
99
100 fn reverse_count(&mut self, name: &str) -> Option<&mut TraceKeyHandle<Value, T, isize>>;
102
103 fn reverse_propose(
105 &mut self,
106 name: &str,
107 ) -> Option<&mut TraceValHandle<Value, Value, T, isize>>;
108
109 fn reverse_validate(
111 &mut self,
112 name: &str,
113 ) -> Option<&mut TraceKeyHandle<(Value, Value), T, isize>>;
114
115 fn is_underconstrained(&self, name: &str) -> bool;
120}
121
122pub struct Dependencies {
124 pub names: HashSet<String>,
126 pub attributes: HashSet<Aid>,
128}
129
130impl Dependencies {
131 pub fn none() -> Dependencies {
133 Dependencies {
134 names: HashSet::new(),
135 attributes: HashSet::new(),
136 }
137 }
138
139 pub fn name(name: &str) -> Dependencies {
141 let mut names = HashSet::new();
142 names.insert(name.to_string());
143
144 Dependencies {
145 names,
146 attributes: HashSet::new(),
147 }
148 }
149
150 pub fn attribute(aid: &str) -> Dependencies {
152 let mut attributes = HashSet::new();
153 attributes.insert(aid.to_string());
154
155 Dependencies {
156 names: HashSet::new(),
157 attributes,
158 }
159 }
160
161 pub fn merge(left: Dependencies, right: Dependencies) -> Dependencies {
164 Dependencies {
165 names: left.names.union(&right.names).cloned().collect(),
166 attributes: left.attributes.union(&right.attributes).cloned().collect(),
167 }
168 }
169}
170
171pub trait Implementable {
173 fn dependencies(&self) -> Dependencies;
177
178 fn into_bindings(&self) -> Vec<Binding> {
181 panic!("This plan can't be implemented via Hector.");
182 }
183
184 fn datafy(&self) -> Vec<(Eid, Aid, Value)> {
186 Vec::new()
187 }
188
189 fn implement<'b, T, I, S>(
191 &self,
192 nested: &mut Iterative<'b, S, u64>,
193 local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
194 context: &mut I,
195 ) -> (Implemented<'b, S>, ShutdownHandle)
196 where
197 T: Timestamp + Lattice,
198 I: ImplContext<T>,
199 S: Scope<Timestamp = T>;
200}
201
202#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
204pub enum Plan {
205 Project(Project<Plan>),
207 Aggregate(Aggregate<Plan>),
209 Union(Union<Plan>),
211 Join(Join<Plan, Plan>),
213 Hector(Hector),
215 Antijoin(Antijoin<Plan, Plan>),
217 Negate(Box<Plan>),
219 Filter(Filter<Plan>),
221 Transform(Transform<Plan>),
223 MatchA(Var, Aid, Var),
225 MatchEA(Eid, Aid, Var),
227 MatchAV(Var, Aid, Value),
229 NameExpr(Vec<Var>, String),
231 Pull(Pull<Plan>),
233 PullLevel(PullLevel<Plan>),
235 PullAll(PullAll),
237 #[cfg(feature = "graphql")]
239 GraphQl(GraphQl),
240}
241
242impl Plan {
243 pub fn variables(&self) -> Vec<Var> {
245 match *self {
246 Plan::Project(ref projection) => projection.variables.clone(),
247 Plan::Aggregate(ref aggregate) => aggregate.variables.clone(),
248 Plan::Union(ref union) => union.variables.clone(),
249 Plan::Join(ref join) => join.variables.clone(),
250 Plan::Hector(ref hector) => hector.variables.clone(),
251 Plan::Antijoin(ref antijoin) => antijoin.variables.clone(),
252 Plan::Negate(ref plan) => plan.variables(),
253 Plan::Filter(ref filter) => filter.variables.clone(),
254 Plan::Transform(ref transform) => transform.variables.clone(),
255 Plan::MatchA(e, _, v) => vec![e, v],
256 Plan::MatchEA(_, _, v) => vec![v],
257 Plan::MatchAV(e, _, _) => vec![e],
258 Plan::NameExpr(ref variables, ref _name) => variables.clone(),
259 Plan::Pull(ref pull) => pull.variables.clone(),
260 Plan::PullLevel(ref path) => path.variables.clone(),
261 Plan::PullAll(ref path) => path.variables.clone(),
262 #[cfg(feature = "graphql")]
263 Plan::GraphQl(_) => unimplemented!(),
264 }
265 }
266}
267
268impl Implementable for Plan {
269 fn dependencies(&self) -> Dependencies {
270 match *self {
272 Plan::Project(ref projection) => projection.dependencies(),
273 Plan::Aggregate(ref aggregate) => aggregate.dependencies(),
274 Plan::Union(ref union) => union.dependencies(),
275 Plan::Join(ref join) => join.dependencies(),
276 Plan::Hector(ref hector) => hector.dependencies(),
277 Plan::Antijoin(ref antijoin) => antijoin.dependencies(),
278 Plan::Negate(ref plan) => plan.dependencies(),
279 Plan::Filter(ref filter) => filter.dependencies(),
280 Plan::Transform(ref transform) => transform.dependencies(),
281 Plan::MatchA(_, ref a, _) => Dependencies::attribute(a),
282 Plan::MatchEA(_, ref a, _) => Dependencies::attribute(a),
283 Plan::MatchAV(_, ref a, _) => Dependencies::attribute(a),
284 Plan::NameExpr(_, ref name) => Dependencies::name(name),
285 Plan::Pull(ref pull) => pull.dependencies(),
286 Plan::PullLevel(ref path) => path.dependencies(),
287 Plan::PullAll(ref path) => path.dependencies(),
288 #[cfg(feature = "graphql")]
289 Plan::GraphQl(ref q) => q.dependencies(),
290 }
291 }
292
293 fn into_bindings(&self) -> Vec<Binding> {
294 match *self {
296 Plan::Project(ref projection) => projection.into_bindings(),
297 Plan::Aggregate(ref aggregate) => aggregate.into_bindings(),
298 Plan::Union(ref union) => union.into_bindings(),
299 Plan::Join(ref join) => join.into_bindings(),
300 Plan::Hector(ref hector) => hector.into_bindings(),
301 Plan::Antijoin(ref antijoin) => antijoin.into_bindings(),
302 Plan::Negate(ref plan) => plan.into_bindings(),
303 Plan::Filter(ref filter) => filter.into_bindings(),
304 Plan::Transform(ref transform) => transform.into_bindings(),
305 Plan::MatchA(e, ref a, v) => vec![Binding::attribute(e, a, v)],
306 Plan::MatchEA(match_e, ref a, v) => {
307 let e = gensym();
308 vec![
309 Binding::attribute(e, a, v),
310 Binding::constant(e, Value::Eid(match_e)),
311 ]
312 }
313 Plan::MatchAV(e, ref a, ref match_v) => {
314 let v = gensym();
315 vec![
316 Binding::attribute(e, a, v),
317 Binding::constant(v, match_v.clone()),
318 ]
319 }
320 Plan::NameExpr(_, ref _name) => unimplemented!(), Plan::Pull(ref pull) => pull.into_bindings(),
322 Plan::PullLevel(ref path) => path.into_bindings(),
323 Plan::PullAll(ref path) => path.into_bindings(),
324 #[cfg(feature = "graphql")]
325 Plan::GraphQl(ref q) => q.into_bindings(),
326 }
327 }
328
329 fn datafy(&self) -> Vec<(Eid, Aid, Value)> {
330 match *self {
332 Plan::Project(ref projection) => projection.datafy(),
333 Plan::Aggregate(ref aggregate) => aggregate.datafy(),
334 Plan::Union(ref union) => union.datafy(),
335 Plan::Join(ref join) => join.datafy(),
336 Plan::Hector(ref hector) => hector.datafy(),
337 Plan::Antijoin(ref antijoin) => antijoin.datafy(),
338 Plan::Negate(ref plan) => plan.datafy(),
339 Plan::Filter(ref filter) => filter.datafy(),
340 Plan::Transform(ref transform) => transform.datafy(),
341 Plan::MatchA(_e, ref a, _v) => vec![(
342 next_id(),
343 "df.pattern/a".to_string(),
344 Value::Aid(a.to_string()),
345 )],
346 Plan::MatchEA(e, ref a, _) => vec![
347 (next_id(), "df.pattern/e".to_string(), Value::Eid(e)),
348 (
349 next_id(),
350 "df.pattern/a".to_string(),
351 Value::Aid(a.to_string()),
352 ),
353 ],
354 Plan::MatchAV(_, ref a, ref v) => vec![
355 (
356 next_id(),
357 "df.pattern/a".to_string(),
358 Value::Aid(a.to_string()),
359 ),
360 (next_id(), "df.pattern/v".to_string(), v.clone()),
361 ],
362 Plan::NameExpr(_, ref _name) => Vec::new(),
363 Plan::Pull(ref pull) => pull.datafy(),
364 Plan::PullLevel(ref path) => path.datafy(),
365 Plan::PullAll(ref path) => path.datafy(),
366 #[cfg(feature = "graphql")]
367 Plan::GraphQl(ref q) => q.datafy(),
368 }
369 }
370
371 fn implement<'b, T, I, S>(
372 &self,
373 nested: &mut Iterative<'b, S, u64>,
374 local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
375 context: &mut I,
376 ) -> (Implemented<'b, S>, ShutdownHandle)
377 where
378 T: Timestamp + Lattice,
379 I: ImplContext<T>,
380 S: Scope<Timestamp = T>,
381 {
382 match *self {
383 Plan::Project(ref projection) => {
384 projection.implement(nested, local_arrangements, context)
385 }
386 Plan::Aggregate(ref aggregate) => {
387 aggregate.implement(nested, local_arrangements, context)
388 }
389 Plan::Union(ref union) => union.implement(nested, local_arrangements, context),
390 Plan::Join(ref join) => join.implement(nested, local_arrangements, context),
391 Plan::Hector(ref hector) => hector.implement(nested, local_arrangements, context),
392 Plan::Antijoin(ref antijoin) => antijoin.implement(nested, local_arrangements, context),
393 Plan::Negate(ref plan) => {
394 let (relation, mut shutdown_handle) =
395 plan.implement(nested, local_arrangements, context);
396 let variables = relation.variables();
397
398 let tuples = {
399 let (projected, shutdown) = relation.projected(nested, context, &variables);
400 shutdown_handle.merge_with(shutdown);
401
402 projected.negate()
403 };
404
405 (
406 Implemented::Collection(CollectionRelation { variables, tuples }),
407 shutdown_handle,
408 )
409 }
410 Plan::Filter(ref filter) => filter.implement(nested, local_arrangements, context),
411 Plan::Transform(ref transform) => {
412 transform.implement(nested, local_arrangements, context)
413 }
414 Plan::MatchA(e, ref a, v) => {
415 let binding = AttributeBinding {
416 variables: (e, v),
417 source_attribute: a.to_string(),
418 };
419
420 (Implemented::Attribute(binding), ShutdownHandle::empty())
421 }
422 Plan::MatchEA(match_e, ref a, sym1) => {
423 let (tuples, shutdown_propose) = match context.forward_propose(a) {
424 None => panic!("attribute {:?} does not exist", a),
425 Some(propose_trace) => {
426 let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
427 let (propose, shutdown_propose) =
428 propose_trace.import_core(&nested.parent, a);
429
430 let tuples = propose
431 .enter_at(nested, move |_, _, time| {
432 let mut forwarded = time.clone();
433 forwarded.advance_by(&frontier);
434 Product::new(forwarded, 0)
435 })
436 .filter(move |e, _v| *e == Value::Eid(match_e))
437 .as_collection(|_e, v| vec![v.clone()]);
438
439 (tuples, shutdown_propose)
440 }
441 };
442
443 let relation = CollectionRelation {
444 variables: vec![sym1],
445 tuples,
446 };
447
448 (
449 Implemented::Collection(relation),
450 ShutdownHandle::from_button(shutdown_propose),
451 )
452 }
453 Plan::MatchAV(sym1, ref a, ref match_v) => {
454 let (tuples, shutdown_propose) = match context.forward_propose(a) {
455 None => panic!("attribute {:?} does not exist", a),
456 Some(propose_trace) => {
457 let match_v = match_v.clone();
458 let frontier: Vec<T> = propose_trace.advance_frontier().to_vec();
459 let (propose, shutdown_propose) =
460 propose_trace.import_core(&nested.parent, a);
461
462 let tuples = propose
463 .enter_at(nested, move |_, _, time| {
464 let mut forwarded = time.clone();
465 forwarded.advance_by(&frontier);
466 Product::new(forwarded, 0)
467 })
468 .filter(move |_e, v| *v == match_v)
469 .as_collection(|e, _v| vec![e.clone()]);
470
471 (tuples, shutdown_propose)
472 }
473 };
474
475 let relation = CollectionRelation {
476 variables: vec![sym1],
477 tuples,
478 };
479
480 (
481 Implemented::Collection(relation),
482 ShutdownHandle::from_button(shutdown_propose),
483 )
484 }
485 Plan::NameExpr(ref syms, ref name) => {
486 if context.is_underconstrained(name) {
487 match local_arrangements.get(name) {
488 None => panic!("{:?} not in relation map", name),
489 Some(named) => {
490 let relation = CollectionRelation {
491 variables: syms.clone(),
492 tuples: named.deref().clone(), };
494
495 (Implemented::Collection(relation), ShutdownHandle::empty())
496 }
497 }
498 } else {
499 match context.global_arrangement(name) {
506 None => panic!("{:?} not in query map", name),
507 Some(named) => {
508 let frontier: Vec<T> = named.advance_frontier().to_vec();
509 let (arranged, shutdown_button) =
510 named.import_core(&nested.parent, name);
511
512 let relation = CollectionRelation {
513 variables: syms.clone(),
514 tuples: arranged
515 .enter_at(nested, move |_, _, time| {
516 let mut forwarded = time.clone();
517 forwarded.advance_by(&frontier);
518 Product::new(forwarded, 0)
519 })
520 .as_collection(|tuple, _| tuple.clone()),
522 };
523
524 (
525 Implemented::Collection(relation),
526 ShutdownHandle::from_button(shutdown_button),
527 )
528 }
529 }
530 }
531 }
532 Plan::Pull(ref pull) => pull.implement(nested, local_arrangements, context),
533 Plan::PullLevel(ref path) => path.implement(nested, local_arrangements, context),
534 Plan::PullAll(ref path) => path.implement(nested, local_arrangements, context),
535 #[cfg(feature = "graphql")]
536 Plan::GraphQl(ref query) => query.implement(nested, local_arrangements, context),
537 }
538 }
539}