declarative_dataflow/plan/hector.rs
1//! Worst-case optimal, n-way joins.
2//!
3//! This is an extended implementation of Delta-BiGJoin, by Ammar, McSherry,
4//! Salihoglu, and Joglekar ([paper](https://dl.acm.org/citation.cfm?id=3199520)).
5//!
6//! The overall structure and the CollectionExtender implementation is adapted from:
7//! https://github.com/frankmcsherry/differential-dataflow/tree/master/dogsdogsdogs
8
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::hash::Hash;
11use std::rc::Rc;
12
13use timely::dataflow::channels::pact::{Exchange, Pipeline};
14use timely::dataflow::operators::{Concatenate, Operator, Partition};
15use timely::dataflow::scopes::child::Iterative;
16use timely::dataflow::Scope;
17use timely::order::Product;
18use timely::progress::Timestamp;
19use timely::worker::AsWorker;
20use timely::PartialOrder;
21
22use timely_sort::Unsigned;
23
24use differential_dataflow::lattice::Lattice;
25use differential_dataflow::operators::arrange::Arranged;
26use differential_dataflow::operators::{Consolidate, Count};
27use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
28use differential_dataflow::{AsCollection, Collection, ExchangeData, Hashable};
29
30use crate::binding::{AsBinding, BinaryPredicate, Binding};
31use crate::binding::{BinaryPredicateBinding, ConstantBinding};
32use crate::logging::DeclarativeEvent;
33use crate::plan::{Dependencies, ImplContext, Implementable};
34use crate::timestamp::altneu::AltNeu;
35use crate::{Aid, Value, Var};
36use crate::{CollectionRelation, Implemented, ShutdownHandle, VariableMap};
37
/// A boxed, dynamically-dispatched `PrefixExtender` producing extensions
/// of type `V` for prefixes of type `P` within scope `S`.
type Extender<'a, S, P, V> = Box<(dyn PrefixExtender<S, Prefix = P, Extension = V> + 'a)>;
39
/// A type capable of extending a stream of prefixes. Implementors of
/// `PrefixExtender` provide types and methods for extending a
/// differential dataflow collection, via the three methods `count`,
/// `propose`, and `validate`.
trait PrefixExtender<G: Scope> {
    /// The required type of prefix to extend.
    type Prefix;
    /// The type to be produced as extension.
    type Extension;
    /// Annotates prefixes with the number of extensions the relation
    /// would propose. `index` identifies this extender, so that the
    /// cheapest extender can later be selected per prefix. Returns
    /// `None` if this extender cannot count (e.g. pure predicates).
    fn count(
        &mut self,
        prefixes: &Collection<G, (Self::Prefix, usize, usize)>,
        index: usize,
    ) -> Option<Collection<G, (Self::Prefix, usize, usize)>>;
    /// Extends each prefix with corresponding extensions.
    fn propose(
        &mut self,
        prefixes: &Collection<G, Self::Prefix>,
    ) -> Collection<G, (Self::Prefix, Self::Extension)>;
    /// Restricts proposed extensions by those the extender would have proposed.
    fn validate(
        &mut self,
        extensions: &Collection<G, (Self::Prefix, Self::Extension)>,
    ) -> Collection<G, (Self::Prefix, Self::Extension)>;
}
66
/// Conversion of a binding into zero or more prefix extenders, given
/// the variables already bound by the current prefix.
trait IntoExtender<'a, S, V>
where
    S: Scope,
    S::Timestamp: Timestamp + Lattice,
    V: ExchangeData + Hash,
{
    /// Returns the extenders this binding contributes for the given
    /// prefix. May be empty, e.g. when the prefix does not yet bind
    /// the variables this binding depends on.
    fn into_extender<P: ExchangeData + IndexNode<V>, B: AsBinding + std::fmt::Debug>(
        &self,
        prefix: &B,
    ) -> Vec<Extender<'a, S, P, V>>;
}
78
79impl<'a, S> IntoExtender<'a, S, Value> for ConstantBinding
80where
81 S: Scope,
82 S::Timestamp: Timestamp + Lattice,
83{
84 fn into_extender<P: ExchangeData + IndexNode<Value>, B: AsBinding + std::fmt::Debug>(
85 &self,
86 _prefix: &B,
87 ) -> Vec<Extender<'a, S, P, Value>> {
88 vec![Box::new(ConstantExtender {
89 phantom: std::marker::PhantomData,
90 value: self.value.clone(),
91 })]
92 }
93}
94
95impl<'a, S, V> IntoExtender<'a, S, V> for BinaryPredicateBinding
96where
97 S: Scope,
98 S::Timestamp: Timestamp + Lattice,
99 V: ExchangeData + Hash,
100{
101 fn into_extender<P: ExchangeData + IndexNode<V>, B: AsBinding + std::fmt::Debug>(
102 &self,
103 prefix: &B,
104 ) -> Vec<Extender<'a, S, P, V>> {
105 match direction(prefix, self.variables) {
106 Err(_msg) => {
107 // We won't panic here, this just means the predicate's variables
108 // aren't sufficiently bound by the prefixes yet.
109 vec![]
110 }
111 Ok(direction) => vec![Box::new(BinaryPredicateExtender {
112 phantom: std::marker::PhantomData,
113 predicate: self.predicate.clone(),
114 direction,
115 })],
116 }
117 }
118}
119
120//
121// OPERATOR
122//
123
/// A plan stage joining an arbitrary number of bindings on their
/// shared variables via worst-case optimal, n-way delta joins.
/// Implementation panics if no bindings or no variables are given,
/// or if the join variables aren't sufficiently bound.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct Hector {
    /// Variables to bind.
    pub variables: Vec<Var>,
    /// Bindings to join.
    pub bindings: Vec<Binding>,
}
134
/// Indicates whether an attribute index can be used in its natural
/// (forward) orientation or must be used in reverse. The payload is
/// the offset at which the prefix binds the already-bound variable.
enum Direction {
    /// The prefix binds the first extender variable at this offset.
    Forward(usize),
    /// The prefix binds the second extender variable at this offset.
    Reverse(usize),
}
139
140fn direction<P>(prefix: &P, extender_variables: (Var, Var)) -> Result<Direction, &'static str>
141where
142 P: AsBinding + std::fmt::Debug,
143{
144 match AsBinding::binds(prefix, extender_variables.0) {
145 None => match AsBinding::binds(prefix, extender_variables.1) {
146 None => {
147 error!(
148 "Neither extender variable {:?} bound by prefix {:?}.",
149 extender_variables, prefix
150 );
151 Err("Neither extender variable bound by prefix.")
152 }
153 Some(offset) => Ok(Direction::Reverse(offset)),
154 },
155 Some(offset) => {
156 match AsBinding::binds(prefix, extender_variables.1) {
157 Some(_) => Err("Both extender variables already bound by prefix."),
158 None => {
159 // Prefix binds the first extender variable, but not
160 // the second. Can use forward index.
161 Ok(Direction::Forward(offset))
162 }
163 }
164 }
165 }
166}
167
168/// Bindings can be in conflict with the source binding of a given
169/// delta pipeline. We need to identify them and handle them as
170/// special cases, because we always have to start from prefixes of
171/// size two.
172pub fn source_conflicts(source_index: usize, bindings: &[Binding]) -> Vec<&Binding> {
173 match bindings[source_index] {
174 Binding::Attribute(ref source) => {
175 let prefix_0 = vec![source.variables.0];
176 let prefix_1 = vec![source.variables.1];
177
178 bindings
179 .iter()
180 .enumerate()
181 .flat_map(|(index, binding)| {
182 if index == source_index {
183 None
184 } else if binding.can_extend(&prefix_0, source.variables.1)
185 || binding.can_extend(&prefix_1, source.variables.0)
186 {
187 Some(binding)
188 } else {
189 None
190 }
191 })
192 .collect()
193 }
194 _ => panic!("Source must be an AttributeBinding."),
195 }
196}
197
/// Orders the variables s.t. each has at least one binding from
/// itself to a prior variable. `source_binding` indicates the binding
/// from which we will source the prefixes in the resulting delta
/// pipeline. Returns the chosen variable order and the corresponding
/// binding order.
///
/// (adapted from github.com/frankmcsherry/dataflow-join/src/motif.rs)
pub fn plan_order(source_index: usize, bindings: &[Binding]) -> (Vec<Var>, Vec<Binding>) {
    // All distinct variables mentioned by any binding.
    let mut variables = bindings
        .iter()
        .flat_map(AsBinding::variables)
        .collect::<Vec<Var>>();
    variables.sort();
    variables.dedup();

    // Determine an order on the attributes. The order may not
    // introduce a binding until one of its consituents is already
    // bound by the prefix. These constraints are captured via the
    // `AsBinding::ready_to_extend` method. The order may otherwise be
    // arbitrary, for example selecting the most constrained attribute
    // first. Presently, we just pick attributes arbitrarily.

    // The prefix always starts with the two variables of the source
    // attribute, because deltas arrive as (e, v) pairs.
    let mut prefix: Vec<Var> = Vec::with_capacity(variables.len());
    match bindings[source_index] {
        Binding::Attribute(ref source) => {
            prefix.push(source.variables.0);
            prefix.push(source.variables.1);
        }
        _ => panic!("Source binding must be an attribute."),
    }

    // All bindings (other than the source) that mention `target`, and
    // thus might become applicable once `target` is in the prefix.
    let candidates_for = |bindings: &[Binding], target: Var| {
        bindings
            .iter()
            .enumerate()
            .flat_map(move |(index, other)| {
                if index == source_index {
                    // Ignore the source binding itself.
                    None
                } else if other.binds(target).is_some() {
                    Some(other.clone())
                } else {
                    // Some bindings might not even talk about the target
                    // variable.
                    None
                }
            })
            .collect::<Vec<Binding>>()
    };

    let mut ordered_bindings = Vec::new();
    let mut candidates: Vec<Binding> = prefix
        .iter()
        .flat_map(|x| candidates_for(&bindings, *x))
        .collect();

    // Repeatedly admit candidates that are ready to extend the current
    // prefix; candidates that aren't ready yet wait for a later round.
    loop {
        debug!("Candidates: {:?}", candidates);

        let mut waiting_candidates = Vec::new();

        candidates.sort();
        candidates.dedup();

        for candidate in candidates.drain(..) {
            match candidate.ready_to_extend(&prefix) {
                None => {
                    // Not ready: none of its dependencies are bound yet.
                    waiting_candidates.push(candidate);
                }
                Some(target) => {
                    if AsBinding::binds(&prefix, target).is_none() {
                        prefix.push(target);
                        // Newly-bound variable may unlock further bindings.
                        for new_candidate in candidates_for(&bindings, target) {
                            if candidate != new_candidate {
                                waiting_candidates.push(new_candidate);
                            }
                        }
                    }

                    ordered_bindings.push(candidate);
                }
            }
        }

        if waiting_candidates.is_empty() {
            break;
        }

        for candidate in waiting_candidates.drain(..) {
            candidates.push(candidate);
        }

        // All variables bound; no further extension possible.
        if prefix.len() == variables.len() {
            break;
        }
    }

    debug!("Candidates: {:?}", candidates);

    // Any remaining candidates (e.g. purely restricting bindings) are
    // appended after all extending bindings.
    for candidate in candidates.drain(..) {
        ordered_bindings.push(candidate);
    }

    (prefix, ordered_bindings)
}
303
/// Positional access into a prefix, yielding an owned value at the
/// requested offset.
trait IndexNode<V> {
    /// Returns a clone of the value at position `index`.
    fn index(&self, index: usize) -> V;
}
307
308impl IndexNode<Value> for (Value, Value) {
309 #[inline(always)]
310 fn index(&self, index: usize) -> Value {
311 assert!(index <= 1);
312 if index == 0 {
313 self.0.clone()
314 } else {
315 self.1.clone()
316 }
317 }
318}
319
320impl IndexNode<Value> for (&Value, &Value) {
321 #[inline(always)]
322 fn index(&self, index: usize) -> Value {
323 assert!(index <= 1);
324 if index == 0 {
325 self.0.clone()
326 } else {
327 self.1.clone()
328 }
329 }
330}
331
impl IndexNode<Value> for Vec<Value> {
    /// Panics if `index` is out of bounds, like any slice access.
    #[inline(always)]
    fn index(&self, index: usize) -> Value {
        self[index].clone()
    }
}
338
impl Hector {
    // @TODO pass single binding as argument?
    // @TODO make these static and take variables as well?

    /// Implements the degenerate case of a single attribute binding:
    /// imports the attribute's forward propose trace, brings it into
    /// the nested scope, and projects its (e, v) pairs onto the
    /// requested variable order.
    ///
    /// Panics if the single binding is not an `AttributeBinding`, or
    /// if its attribute is unknown to the context.
    fn implement_single_binding<'b, T, I, S>(
        &self,
        nested: &mut Iterative<'b, S, u64>,
        _local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
        context: &mut I,
    ) -> (Implemented<'b, S>, ShutdownHandle)
    where
        T: Timestamp + Lattice,
        I: ImplContext<T>,
        S: Scope<Timestamp = T>,
    {
        // With only a single binding given, we don't want to do
        // anything fancy (provided the binding is sourceable).

        match self.bindings.first().unwrap() {
            Binding::Attribute(binding) => {
                match context.forward_propose(&binding.source_attribute) {
                    None => panic!("Unknown attribute {}", &binding.source_attribute),
                    Some(forward_trace) => {
                        // Remember how far the trace has advanced, so that
                        // imported times can be advanced accordingly below.
                        let frontier: Vec<T> = forward_trace.advance_frontier().to_vec();
                        let (forward, shutdown_forward) = forward_trace.import_core(
                            &nested.parent,
                            &format!("Propose({})", &binding.source_attribute),
                        );

                        let prefix = binding.variables();
                        let target_variables = self.variables.clone();
                        let tuples = forward
                            .enter_at(nested, move |_, _, time| {
                                let mut forwarded = time.clone();
                                forwarded.advance_by(&frontier);
                                Product::new(forwarded, 0)
                            })
                            .as_collection(move |e, v| {
                                // Project the (e, v) pair onto the requested
                                // variable order.
                                let tuple = (e, v);
                                target_variables
                                    .iter()
                                    .flat_map(|x| {
                                        Some(tuple.index(AsBinding::binds(&prefix, *x).unwrap()))
                                    })
                                    .collect()
                            });

                        let relation = CollectionRelation {
                            variables: self.variables.clone(),
                            tuples,
                        };

                        (
                            Implemented::Collection(relation),
                            ShutdownHandle::from_button(shutdown_forward),
                        )
                    }
                }
            }
            _ => {
                panic!("Passed a single, non-sourceable binding.");
            }
        }
    }

    // fn two_way<'b, T, I, S>(
    //     nested: &mut Iterative<'b, S, u64>,
    //     _local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
    //     context: &mut I,
    //     left: Binding,
    //     right: Binding,
    // ) -> (Implemented<'b, S>, ShutdownHandle)
    // where
    //     T: Timestamp + Lattice,
    //     I: ImplContext<T>,
    //     S: Scope<Timestamp = T>,
    // {
    //     let (source, right) = match left {
    //         Binding::Attribute(source) => (source, right),
    //         _ => match right {
    //             Binding::Attribute(source) => (source, left),
    //             _ => panic!("At least one binding must be sourceable for Hector::two_way."),
    //         }
    //     };

    //     match right {
    //         Binding::Constant(constant_binding) => {
    //             let match_v = constant_binding.value;
    //             let offset = source.binds(constant_binding.variable)
    //                 .unwrap_or_else(|| panic!("Source doesn't bind constant binding {:?}", constant_binding));

    //             match offset {
    //                 0 => {
    //                     // [?a :edge ?b] (constant ?a x) <=> [x :edge ?b]

    //                     let (index, shutdown) = context.forward_propose(&source.source_attribute)
    //                         .unwrap_or_else(|| panic!("No forward index found for attribute {}", &source.source_attribute))
    //                         .import_core(&nested.parent, &source.source_attribute);

    //                     let frontier: Vec<T> = index.trace.advance_frontier().to_vec();

    //                     index
    //                         .filter(move |e, _v| *e == match_v)
    //                         .enter_at(&nested, move |_, _, time| {
    //                             let mut forwarded = time.clone(); forwarded.advance_by(&frontier);
    //                             Product::new(forwarded, Default::default())
    //                         })
    //                 }
    //                 1 => {
    //                     // [?a :edge ?b] (constant ?b x) <=> [?a :edge x]

    //                     let (index, shutdown) = context.reverse_propose(&source.source_attribute)
    //                         .unwrap_or_else(|| panic!("No reverse index found for attribute {}", &source.source_attribute))
    //                         .import_core(&nested.parent, &source.source_attribute);

    //                     let frontier: Vec<T> = index.trace.advance_frontier().to_vec();

    //                     index
    //                         .filter(move |e, _v| *e == match_v)
    //                         .enter_at(&nested, move |_, _, time| {
    //                             let mut forwarded = time.clone(); forwarded.advance_by(&frontier);
    //                             Product::new(forwarded, Default::default())
    //                         })
    //                 }
    //                 other => panic!("Unexpected offset {}", other),
    //             }
    //         }
    //         _ => unimplemented!(),
    //     }
    // }
}
470
471impl Implementable for Hector {
472 fn dependencies(&self) -> Dependencies {
473 let attributes = self
474 .bindings
475 .iter()
476 .flat_map(|binding| {
477 if let Binding::Attribute(binding) = binding {
478 Some(binding.source_attribute.clone())
479 } else {
480 None
481 }
482 })
483 .collect::<HashSet<Aid>>();
484
485 Dependencies {
486 names: HashSet::new(),
487 attributes,
488 }
489 }
490
491 fn into_bindings(&self) -> Vec<Binding> {
492 self.bindings.clone()
493 }
494
495 fn implement<'b, T, I, S>(
496 &self,
497 nested: &mut Iterative<'b, S, u64>,
498 _local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
499 context: &mut I,
500 ) -> (Implemented<'b, S>, ShutdownHandle)
501 where
502 T: Timestamp + Lattice,
503 I: ImplContext<T>,
504 S: Scope<Timestamp = T>,
505 {
506 if self.bindings.is_empty() {
507 panic!("No bindings passed.");
508 } else if self.variables.is_empty() {
509 panic!("No variables requested.");
510 } else if self.bindings.len() == 1 {
511 self.implement_single_binding(nested, _local_arrangements, context)
512 // } else if self.bindings.len() == 2 {
513 // Hector::two_way(nested, _local_arrangements, context, self.bindings[0].clone(), self.bindings[1].clone())
514 } else {
515 // In order to avoid delta pipelines looking at each
516 // other's data in naughty ways, we need to run them all
517 // inside a scope with lexicographic times.
518
519 let (joined, shutdown_handle) = nested.scoped::<AltNeu<Product<T,u64>>, _, _>("AltNeu", |inner| {
520
521 let scope = inner.clone();
522
523 // We cache aggressively, to avoid importing and
524 // wrapping things more than once.
525
526 let mut shutdown_handle = ShutdownHandle::empty();
527
528 let mut forward_counts = HashMap::new();
529 let mut forward_proposes = HashMap::new();
530 let mut forward_validates = HashMap::new();
531
532 let mut reverse_counts = HashMap::new();
533 let mut reverse_proposes = HashMap::new();
534 let mut reverse_validates = HashMap::new();
535
536 // Attempt to acquire a logger for tuple counts.
537 let logger = {
538 let register = scope.parent.log_register();
539 register.get::<DeclarativeEvent>("declarative")
540 };
541
542 // For each AttributeBinding (only AttributeBindings
543 // actually experience change), we construct a delta query
544 // driven by changes to that binding.
545
546 let changes = self.bindings.iter().enumerate()
547 .flat_map(|(idx, delta_binding)| match delta_binding {
548 Binding::Attribute(delta_binding) => {
549
550 // We need to determine an order on the attributes
551 // that ensures that each is bound by preceeding
552 // attributes. For now, we will take the requested order.
553
554 // @TODO use binding order returned here?
555 // might be problematic to ensure ordering is maintained?
556 let (variables, _) = plan_order(idx, &self.bindings);
557
558 let mut prefix = Vec::with_capacity(variables.len());
559
560 debug!("Source {:?}", delta_binding);
561
562 // We would like to avoid starting with single-variable
563 // (or even empty) prefixes, because the dataflow-y nature
564 // of this implementation means we will always be starting
565 // from attributes (which correspond to two-variable prefixes).
566 //
567 // But to get away with that we need to check for single-variable
568 // bindings in conflict with the source binding.
569
570 let propose = forward_proposes
571 .entry(delta_binding.source_attribute.to_string())
572 .or_insert_with(|| {
573 let (arranged, shutdown) = context
574 .forward_propose(&delta_binding.source_attribute)
575 .expect("forward propose trace doesn't exist")
576 .import_core(&scope.parent.parent, &format!("Counts({})", &delta_binding.source_attribute));
577
578 shutdown_handle.add_button(shutdown);
579
580 arranged
581 });
582 let frontier: Vec<T> = propose.trace.advance_frontier().to_vec();
583
584 let mut source_conflicts = source_conflicts(idx, &self.bindings);
585
586 let mut source = if !source_conflicts.is_empty() {
587 // @TODO there can be more than one conflict
588 // @TODO Not just constant bindings can cause issues here!
589 assert_eq!(source_conflicts.len(), 1);
590
591 let conflict = source_conflicts.pop().unwrap();
592 // for conflict in source_conflicts.drain(..) {
593 match conflict {
594 Binding::Constant(constant_binding) => {
595 prefix.push(constant_binding.variable);
596
597 let match_v = constant_binding.value.clone();
598
599 // Guaranteed to intersect with offset zero at this point.
600 match direction(&prefix, delta_binding.variables).unwrap() {
601 Direction::Forward(_) => {
602 prefix.push(delta_binding.variables.1);
603
604 propose
605 .filter(move |e, _v| *e == match_v)
606 .enter_at(&scope.parent, move |_, _, time| {
607 let mut forwarded = time.clone(); forwarded.advance_by(&frontier);
608 Product::new(forwarded, Default::default())
609 })
610 .enter(&scope)
611 .as_collection(|e,v| vec![e.clone(), v.clone()])
612 }
613 Direction::Reverse(_) => {
614 prefix.push(delta_binding.variables.0);
615
616 propose
617 .filter(move |_e, v| *v == match_v)
618 .enter_at(&scope.parent, move |_, _, time| {
619 let mut forwarded = time.clone(); forwarded.advance_by(&frontier);
620 Product::new(forwarded, Default::default())
621 })
622 .enter(&scope)
623 .as_collection(|v,e| vec![e.clone(), v.clone()])
624 }
625 }
626 }
627 _ => panic!("Can't resolve conflicts on {:?} bindings", conflict),
628 // }
629 }
630 } else {
631 prefix.push(delta_binding.variables.0);
632 prefix.push(delta_binding.variables.1);
633
634 propose
635 .enter_at(&scope.parent, move |_, _, time| {
636 let mut forwarded = time.clone();
637 forwarded.advance_by(&frontier);
638 Product::new(forwarded, Default::default())
639 })
640 .enter(&scope)
641 .as_collection(|e,v| vec![e.clone(), v.clone()])
642 };
643
644 for target in variables.iter() {
645 match AsBinding::binds(&prefix, *target) {
646 Some(_) => { /* already bound */ continue },
647 None => {
648 debug!("Extending {:?} to {:?}", prefix, target);
649
650 let mut extenders: Vec<Extender<'_, _, Vec<Value>, _>> = vec![];
651
652 // Handling AntijoinBinding's requires dealing with recursion,
653 // because they wrap another binding. We don't actually want to wrap
654 // all of the below inside of a recursive function, because passing
655 // all these nested scopes and caches around leads to a world of lifetimes pain.
656 //
657 // Therefore we make our own little queue of bindings and process them iteratively.
658
659 let mut bindings: VecDeque<(usize, Binding)> = VecDeque::new();
660
661 for (idx, binding) in self.bindings.iter().cloned().enumerate() {
662 if let Binding::Not(antijoin_binding) = binding {
663 bindings.push_back((idx, (*antijoin_binding.binding).clone()));
664 bindings.push_back((idx, Binding::Not(antijoin_binding)));
665 } else {
666 bindings.push_back((idx, binding));
667 }
668 }
669
670 while let Some((other_idx, other)) = bindings.pop_front() {
671
672 // We need to distinguish between conflicting relations
673 // that appear before the current one in the sequence (< idx),
674 // and those that appear afterwards.
675
676 // Ignore the current delta source itself.
677 if other_idx == idx { continue; }
678
679 // Ignore any binding not talking about the target variable.
680 if other.binds(*target).is_none() { continue; }
681
682 // Ignore any binding that isn't ready to extend, either
683 // because it doesn't even talk about the target variable, or
684 // because none of its dependent variables are bound by the prefix
685 // yet (relevant for attributes).
686 if !other.can_extend(&prefix, *target) {
687 debug!("{:?} can't extend", other);
688 continue;
689 }
690
691 let is_neu = other_idx >= idx;
692
693 debug!("\t...using {:?}", other);
694
695 match other {
696 Binding::Not(_other) => {
697 // Due to the way we enqueued the bindings above, we can now
698 // rely on the internal exteneder being available as the last
699 // extender on the stack.
700 let internal_extender = extenders.pop().expect("No internal extender available on stack.");
701
702 extenders.push(
703 Box::new(AntijoinExtender {
704 phantom: std::marker::PhantomData,
705 extender: internal_extender,
706 })
707 );
708 }
709 Binding::Constant(other) => {
710 extenders.append(&mut other.into_extender(&prefix));
711 }
712 Binding::BinaryPredicate(other) => {
713 extenders.append(&mut other.into_extender(&prefix));
714 }
715 Binding::Attribute(other) => {
716 match direction(&prefix, other.variables) {
717 Err(msg) => panic!(msg),
718 Ok(direction) => match direction {
719 Direction::Forward(offset) => {
720 let count = {
721 let count = forward_counts
722 .entry(other.source_attribute.to_string())
723 .or_insert_with(|| {
724 let (arranged, shutdown) =
725 context.forward_count(&other.source_attribute)
726 .expect("forward count doesn't exist")
727 .import_core(&scope.parent.parent, &format!("Counts({})", &delta_binding.source_attribute));
728
729 shutdown_handle.add_button(shutdown);
730
731 arranged
732 });
733
734 let frontier: Vec<T> = count.trace.advance_frontier().to_vec();
735 let neu = is_neu;
736
737 count
738 .enter_at(&scope.parent, move |_, _, t| {
739 let mut forwarded = t.clone();
740 forwarded.advance_by(&frontier);
741 Product::new(forwarded, 0)
742 })
743 .enter_at(&scope, move |_,_,t| AltNeu { time: t.clone(), neu })
744 };
745;
746 let propose = {
747 let propose = forward_proposes
748 .entry(other.source_attribute.to_string())
749 .or_insert_with(|| {
750 let (arranged, shutdown) =
751 context.forward_propose(&other.source_attribute)
752 .expect("forward propose doesn't exist")
753 .import_core(&scope.parent.parent, &format!("Propose({})", &delta_binding.source_attribute));
754
755 shutdown_handle.add_button(shutdown);
756
757 arranged
758 });
759
760 let frontier: Vec<T> = propose.trace.advance_frontier().to_vec();
761 let neu = is_neu;
762
763 propose
764 .enter_at(&scope.parent, move |_, _, t| {
765 let mut forwarded = t.clone();
766 forwarded.advance_by(&frontier);
767 Product::new(forwarded, 0)
768 })
769 .enter_at(&scope, move |_,_,t| AltNeu { time: t.clone(), neu })
770 };
771
772 let validate = {
773 let validate = forward_validates
774 .entry(other.source_attribute.to_string())
775 .or_insert_with(|| {
776 let (arranged, shutdown) =
777 context.forward_validate(&other.source_attribute)
778 .expect("forward validate doesn't exist")
779 .import_core(&scope.parent.parent, &format!("Validate({})", &delta_binding.source_attribute));
780
781 shutdown_handle.add_button(shutdown);
782
783 arranged
784 });
785
786 let frontier: Vec<T> = validate.trace.advance_frontier().to_vec();
787 let neu = is_neu;
788
789 validate
790 .enter_at(&scope.parent, move |_, _, t| {
791 let mut forwarded = t.clone();
792 forwarded.advance_by(&frontier);
793 Product::new(forwarded, 0)
794 })
795 .enter_at(&scope, move |_,_,t| AltNeu { time: t.clone(), neu })
796 };
797
798 extenders.push(
799 Box::new(CollectionExtender {
800 phantom: std::marker::PhantomData,
801 count,
802 propose,
803 validate,
804 key_selector: Rc::new(move |prefix: &Vec<Value>| prefix.index(offset)),
805 })
806 );
807 },
808 Direction::Reverse(offset) => {
809 let count = {
810 let count = reverse_counts
811 .entry(other.source_attribute.to_string())
812 .or_insert_with(|| {
813 let (arranged, shutdown) =
814 context.reverse_count(&other.source_attribute)
815 .expect("reverse count doesn't exist")
816 .import_core(&scope.parent.parent, &format!("_Counts({})", &delta_binding.source_attribute));
817
818 shutdown_handle.add_button(shutdown);
819
820 arranged
821 });
822
823 let frontier: Vec<T> = count.trace.advance_frontier().to_vec();
824 let neu = is_neu;
825
826 count
827 .enter_at(&scope.parent, move |_, _, t| {
828 let mut forwarded = t.clone();
829 forwarded.advance_by(&frontier);
830 Product::new(forwarded, 0)
831 })
832 .enter_at(&scope, move |_,_,t| AltNeu { time: t.clone(), neu })
833 };
834;
835 let propose = {
836 let propose = reverse_proposes
837 .entry(other.source_attribute.to_string())
838 .or_insert_with(|| {
839 let (arranged, shutdown) =
840 context.reverse_propose(&other.source_attribute)
841 .expect("reverse propose doesn't exist")
842 .import_core(&scope.parent.parent, &format!("_Propose({})", &delta_binding.source_attribute));
843
844 shutdown_handle.add_button(shutdown);
845
846 arranged
847 });
848
849 let frontier: Vec<T> = propose.trace.advance_frontier().to_vec();
850 let neu = is_neu;
851
852 propose
853 .enter_at(&scope.parent, move |_, _, t| {
854 let mut forwarded = t.clone();
855 forwarded.advance_by(&frontier);
856 Product::new(forwarded, 0)
857 })
858 .enter_at(&scope, move |_,_,t| AltNeu { time: t.clone(), neu })
859 };
860
861 let validate = {
862 let validate = reverse_validates
863 .entry(other.source_attribute.to_string())
864 .or_insert_with(|| {
865 let (arranged, shutdown) =
866 context.reverse_validate(&other.source_attribute)
867 .expect("reverse validate doesn't exist")
868 .import_core(&scope.parent.parent, &format!("_Validate({})", &delta_binding.source_attribute));
869
870 shutdown_handle.add_button(shutdown);
871
872 arranged
873 });
874
875 let frontier: Vec<T> = validate.trace.advance_frontier().to_vec();
876 let neu = is_neu;
877
878 validate
879 .enter_at(&scope.parent, move |_, _, t| {
880 let mut forwarded = t.clone();
881 forwarded.advance_by(&frontier);
882 Product::new(forwarded, 0)
883 })
884 .enter_at(&scope, move |_,_,t| AltNeu { time: t.clone(), neu })
885 };
886
887 extenders.push(
888 Box::new(CollectionExtender {
889 phantom: std::marker::PhantomData,
890 count,
891 propose,
892 validate,
893 key_selector: Rc::new(move |prefix: &Vec<Value>| prefix.index(offset)),
894 })
895 );
896 },
897 }
898 }
899 }
900 }
901 }
902
903 prefix.push(*target);
904
905 // @TODO impl ProposeExtensionMethod for Arranged
906 let extended = source.extend(&mut extenders[..]);
907
908 if logger.is_some() {
909 let worker_index = scope.index();
910 let source_attribute = delta_binding.source_attribute.to_string();
911 extended
912 // .inspect(move |x| { println!("{} extended: {:?}", source_attribute, x); })
913 .map(|_| ())
914 .consolidate()
915 .count()
916 .map(move |(_, count)| (Value::Eid(worker_index as u64), Value::Number(count as i64)))
917 .leave()
918 .leave()
919 .inspect(move |x| { println!("{}: {:?}", source_attribute, x); });
920 }
921
922 source = extended
923 .map(|(tuple,v)| {
924 let mut out = Vec::with_capacity(tuple.len() + 1);
925 out.append(&mut tuple.clone());
926 out.push(v);
927
928 out
929 })
930 }
931 }
932 }
933
934 if self.variables == prefix {
935 Some(source.inner)
936 } else {
937 let target_variables = self.variables.clone();
938
939 Some(source
940 .map(move |tuple| {
941 target_variables.iter()
942 .flat_map(|x| Some(tuple.index(AsBinding::binds(&prefix, *x).unwrap())))
943 .collect()
944 })
945 .inner)
946 }
947 }
948 _ => None
949 });
950
951 (inner.concatenate(changes).as_collection().leave(), shutdown_handle)
952 });
953
954 let relation = CollectionRelation {
955 variables: self.variables.clone(),
956 tuples: joined,
957 };
958
959 (Implemented::Collection(relation), shutdown_handle)
960 }
961 }
962}
963
964//
965// GENERIC IMPLEMENTATION
966//
967
/// Extension method for collections of prefixes, implementing the
/// worst-case optimal join step against a set of extenders.
trait ProposeExtensionMethod<'a, S: Scope, P: ExchangeData + Ord> {
    /// Extends each prefix with proposals from the cheapest extender,
    /// validated by all remaining extenders.
    fn extend<E: ExchangeData + Ord>(
        &self,
        extenders: &mut [Extender<'a, S, P, E>],
    ) -> Collection<S, (P, E)>;
}
974
impl<'a, S: Scope, P: ExchangeData + Ord> ProposeExtensionMethod<'a, S, P> for Collection<S, P> {
    fn extend<E: ExchangeData + Ord>(
        &self,
        extenders: &mut [Extender<'a, S, P, E>],
    ) -> Collection<S, (P, E)> {
        if extenders.is_empty() {
            // @TODO don't panic
            panic!("No extenders specified.");
        } else if extenders.len() == 1 {
            // A single extender: nothing to count or validate against.
            extenders[0].propose(&self.clone())
        } else {
            // Annotate each prefix with (count, extender index), starting
            // from a large sentinel count (1 << 31) so any real count wins.
            let mut counts = self.map(|p| (p, 1 << 31, 0));
            for (index, extender) in extenders.iter_mut().enumerate() {
                if let Some(new_counts) = extender.count(&counts, index) {
                    counts = new_counts;
                }
            }

            // Route each prefix to the stream of the extender that
            // claimed the fewest proposals for it.
            let parts = counts
                .inner
                .partition(extenders.len() as u64, |((p, _, i), t, d)| {
                    (i as u64, (p, t, d))
                });

            // Each winning extender proposes; every other extender
            // validates (intersects) the proposals.
            let mut results = Vec::new();
            for (index, nominations) in parts.into_iter().enumerate() {
                let mut extensions = extenders[index].propose(&nominations.as_collection());
                for other in (0..extenders.len()).filter(|&x| x != index) {
                    extensions = extenders[other].validate(&extensions);
                }

                results.push(extensions.inner); // save extensions
            }

            self.scope().concatenate(results).as_collection()
        }
    }
}
1013
/// An extender proposing a single, constant value for every prefix.
struct ConstantExtender<P, V>
where
    V: ExchangeData + Hash,
{
    /// Ties the extender to a prefix type without storing one.
    phantom: std::marker::PhantomData<P>,
    /// The constant value proposed for (and validated against) every prefix.
    value: V,
}
1021
1022impl<'a, S, V, P> PrefixExtender<S> for ConstantExtender<P, V>
1023where
1024 S: Scope,
1025 S::Timestamp: Lattice + ExchangeData,
1026 V: ExchangeData + Hash,
1027 P: ExchangeData,
1028{
1029 type Prefix = P;
1030 type Extension = V;
1031
1032 fn count(
1033 &mut self,
1034 prefixes: &Collection<S, (P, usize, usize)>,
1035 index: usize,
1036 ) -> Option<Collection<S, (P, usize, usize)>> {
1037 Some(prefixes.map(move |(prefix, old_count, old_index)| {
1038 if 1 < old_count {
1039 (prefix.clone(), 1, index)
1040 } else {
1041 (prefix.clone(), old_count, old_index)
1042 }
1043 }))
1044 }
1045
1046 fn propose(&mut self, prefixes: &Collection<S, P>) -> Collection<S, (P, V)> {
1047 let value = self.value.clone();
1048 prefixes.map(move |prefix| (prefix.clone(), value.clone()))
1049 }
1050
1051 fn validate(&mut self, extensions: &Collection<S, (P, V)>) -> Collection<S, (P, V)> {
1052 let target = self.value.clone();
1053 extensions.filter(move |(_prefix, extension)| *extension == target)
1054 }
1055}
1056
/// An extender enforcing a binary predicate (e.g. <, <=, !=) between
/// an already-bound prefix variable and the proposed extension. It
/// never proposes or counts; it only validates.
struct BinaryPredicateExtender<P, V>
where
    V: ExchangeData + Hash,
{
    /// Ties the extender to prefix/extension types without storing them.
    phantom: std::marker::PhantomData<(P, V)>,
    /// The predicate to enforce.
    predicate: BinaryPredicate,
    /// Which predicate operand the prefix binds, and at what offset;
    /// `Reverse` flips the comparison accordingly.
    direction: Direction,
}
1065
1066impl<'a, S, V, P> PrefixExtender<S> for BinaryPredicateExtender<P, V>
1067where
1068 S: Scope,
1069 S::Timestamp: Lattice + ExchangeData,
1070 V: ExchangeData + Hash,
1071 P: ExchangeData + IndexNode<V>,
1072{
1073 type Prefix = P;
1074 type Extension = V;
1075
1076 fn count(
1077 &mut self,
1078 _prefixes: &Collection<S, (P, usize, usize)>,
1079 _index: usize,
1080 ) -> Option<Collection<S, (P, usize, usize)>> {
1081 None
1082 }
1083
1084 fn propose(&mut self, prefixes: &Collection<S, P>) -> Collection<S, (P, V)> {
1085 prefixes.map(|_prefix| panic!("BinaryPredicateExtender should never be asked to propose."))
1086 }
1087
1088 fn validate(&mut self, extensions: &Collection<S, (P, V)>) -> Collection<S, (P, V)> {
1089 use self::BinaryPredicate::{EQ, GT, GTE, LT, LTE, NEQ};
1090 match self.direction {
1091 Direction::Reverse(offset) => {
1092 match self.predicate {
1093 LT => extensions
1094 .filter(move |(prefix, extension)| *extension > prefix.index(offset)),
1095 LTE => extensions
1096 .filter(move |(prefix, extension)| *extension >= prefix.index(offset)),
1097 GT => extensions
1098 .filter(move |(prefix, extension)| *extension < prefix.index(offset)),
1099 GTE => extensions
1100 .filter(move |(prefix, extension)| *extension <= prefix.index(offset)),
1101 EQ => extensions
1102 .filter(move |(prefix, extension)| *extension == prefix.index(offset)),
1103 NEQ => extensions
1104 .filter(move |(prefix, extension)| *extension != prefix.index(offset)),
1105 }
1106 }
1107 Direction::Forward(offset) => {
1108 match self.predicate {
1109 LT => extensions
1110 .filter(move |(prefix, extension)| *extension < prefix.index(offset)),
1111 LTE => extensions
1112 .filter(move |(prefix, extension)| *extension <= prefix.index(offset)),
1113 GT => extensions
1114 .filter(move |(prefix, extension)| *extension > prefix.index(offset)),
1115 GTE => extensions
1116 .filter(move |(prefix, extension)| *extension >= prefix.index(offset)),
1117 EQ => extensions
1118 .filter(move |(prefix, extension)| *extension == prefix.index(offset)),
1119 NEQ => extensions
1120 .filter(move |(prefix, extension)| *extension != prefix.index(offset)),
1121 }
1122 }
1123 }
1124 }
1125}
1126
/// A `PrefixExtender` backed by three arrangements of the same
/// relation: one counting distinct extensions per key (`count`), one
/// mapping keys to the extensions themselves (`propose`), and one
/// holding `(key, extension)` pairs for membership tests (`validate`).
struct CollectionExtender<S, K, V, P, F, TrCount, TrPropose, TrValidate>
where
    S: Scope,
    S::Timestamp: Lattice + ExchangeData,
    K: ExchangeData,
    V: ExchangeData,
    F: Fn(&P) -> K,
    TrCount: TraceReader<Key = K, Val = (), Time = S::Timestamp, R = isize> + Clone + 'static,
    TrCount::Batch: BatchReader<TrCount::Key, TrCount::Val, S::Timestamp, TrCount::R> + 'static,
    TrCount::Cursor: Cursor<TrCount::Key, TrCount::Val, S::Timestamp, TrCount::R> + 'static,
    TrPropose: TraceReader<Key = K, Val = V, Time = S::Timestamp, R = isize> + Clone + 'static,
    TrPropose::Batch:
        BatchReader<TrPropose::Key, TrPropose::Val, S::Timestamp, TrPropose::R> + 'static,
    TrPropose::Cursor: Cursor<TrPropose::Key, TrPropose::Val, S::Timestamp, TrPropose::R> + 'static,
    TrValidate:
        TraceReader<Key = (K, V), Val = (), Time = S::Timestamp, R = isize> + Clone + 'static,
    TrValidate::Batch:
        BatchReader<TrValidate::Key, TrValidate::Val, S::Timestamp, TrValidate::R> + 'static,
    TrValidate::Cursor:
        Cursor<TrValidate::Key, TrValidate::Val, S::Timestamp, TrValidate::R> + 'static,
{
    // Marks the prefix type `P`.
    phantom: std::marker::PhantomData<P>,
    // Arrangement keyed by `K` with unit values; multiplicities give
    // the number of extensions per key.
    count: Arranged<S, TrCount>,
    // Arrangement mapping each key to its candidate extensions.
    propose: Arranged<S, TrPropose>,
    // Arrangement of `(K, V)` pairs used to confirm proposed extensions.
    validate: Arranged<S, TrValidate>,
    // Extracts the join key from a prefix; shared across the operators
    // built by the trait impl.
    key_selector: Rc<F>,
}
1154
// Implementation note: each of `count`, `propose`, and `validate`
// builds a two-input `binary_frontier` operator that joins a stream of
// requests against a shared arrangement. Requests are stashed under
// their capability until the arrangement's frontier has passed their
// time, then answered by a cursor walk over the trace. The trace
// handle is dropped (set to `None`) once the request input is closed
// and the stash drained, releasing the operator's hold on compaction.
impl<'a, S, K, V, P, F, TrCount, TrPropose, TrValidate> PrefixExtender<S>
    for CollectionExtender<S, K, V, P, F, TrCount, TrPropose, TrValidate>
where
    S: Scope,
    S::Timestamp: Lattice + ExchangeData,
    K: ExchangeData + Hash,
    V: ExchangeData + Hash,
    P: ExchangeData,
    F: Fn(&P) -> K + 'static,
    TrCount: TraceReader<Key = K, Val = (), Time = S::Timestamp, R = isize> + Clone + 'static,
    TrCount::Batch: BatchReader<TrCount::Key, TrCount::Val, S::Timestamp, TrCount::R> + 'static,
    TrCount::Cursor: Cursor<TrCount::Key, TrCount::Val, S::Timestamp, TrCount::R> + 'static,
    TrPropose: TraceReader<Key = K, Val = V, Time = S::Timestamp, R = isize> + Clone + 'static,
    TrPropose::Batch:
        BatchReader<TrPropose::Key, TrPropose::Val, S::Timestamp, TrPropose::R> + 'static,
    TrPropose::Cursor: Cursor<TrPropose::Key, TrPropose::Val, S::Timestamp, TrPropose::R> + 'static,
    TrValidate:
        TraceReader<Key = (K, V), Val = (), Time = S::Timestamp, R = isize> + Clone + 'static,
    TrValidate::Batch:
        BatchReader<TrValidate::Key, TrValidate::Val, S::Timestamp, TrValidate::R> + 'static,
    TrValidate::Cursor:
        Cursor<TrValidate::Key, TrValidate::Val, S::Timestamp, TrValidate::R> + 'static,
{
    type Prefix = P;
    type Extension = V;

    fn count(
        &mut self,
        prefixes: &Collection<S, (P, usize, usize)>,
        index: usize,
    ) -> Option<Collection<S, (P, usize, usize)>> {
        // This method takes a stream of `(prefix, time, diff)`
        // changes, and we want to produce the corresponding stream of
        // `((prefix, count), time, diff)` changes, just by looking up
        // `count` in `count_trace`. We are just doing a stream of
        // changes and a stream of look-ups, no consolidation or any
        // funny business like that. We *could* organize the input
        // differences by key and save some time, or we could skip
        // that.

        let counts = &self.count;
        let mut counts_trace = Some(counts.trace.clone());

        // Requests whose time is not yet complete, held under the
        // capability that will eventually emit their responses.
        let mut stash = HashMap::new();
        let logic1 = self.key_selector.clone();
        let logic2 = self.key_selector.clone();

        // Route each request to the worker holding its key's shard of the trace.
        let exchange = Exchange::new(move |update: &((P, usize, usize), S::Timestamp, isize)| {
            logic1(&(update.0).0).hashed().as_u64()
        });

        let mut buffer1 = Vec::new();
        let mut buffer2 = Vec::new();

        // TODO: This should be a custom operator with no connection from the second input to the output.
        Some(
            prefixes
                .inner
                .binary_frontier(&counts.stream, exchange, Pipeline, "Count", move |_, _| {
                    move |input1, input2, output| {
                        // drain the first input, stashing requests.
                        input1.for_each(|capability, data| {
                            data.swap(&mut buffer1);
                            stash
                                .entry(capability.retain())
                                .or_insert_with(Vec::new)
                                .extend(buffer1.drain(..))
                        });

                        // advance the `distinguish_since` frontier to allow all merges.
                        input2.for_each(|_, batches| {
                            batches.swap(&mut buffer2);
                            for batch in buffer2.drain(..) {
                                if let Some(ref mut trace) = counts_trace {
                                    trace.distinguish_since(batch.upper());
                                }
                            }
                        });

                        if let Some(ref mut trace) = counts_trace {
                            for (capability, prefixes) in stash.iter_mut() {
                                // defer requests at incomplete times.
                                // NOTE: not all updates may be at complete times, but if this test fails then none of them are.
                                if !input2.frontier.less_equal(capability.time()) {
                                    let mut session = output.session(capability);

                                    // sort requests for in-order cursor traversal. could consolidate?
                                    prefixes
                                        .sort_by(|x, y| logic2(&(x.0).0).cmp(&logic2(&(y.0).0)));

                                    let (mut cursor, storage) = trace.cursor();

                                    for &mut (
                                        (ref prefix, old_count, old_index),
                                        ref time,
                                        ref mut diff,
                                    ) in prefixes.iter_mut()
                                    {
                                        if !input2.frontier.less_equal(time) {
                                            let key = logic2(prefix);
                                            cursor.seek_key(&storage, &key);
                                            if cursor.get_key(&storage) == Some(&key) {
                                                // Accumulate the key's multiplicity at `time`.
                                                let mut count = 0;
                                                cursor.map_times(&storage, |t, d| {
                                                    if t.less_equal(time) {
                                                        count += d;
                                                    }
                                                });
                                                // assert!(count >= 0);
                                                let count = count as usize;
                                                if count > 0 {
                                                    // Nominate whichever extender has the
                                                    // smaller count: this one or the incumbent.
                                                    if count < old_count {
                                                        session.give((
                                                            (prefix.clone(), count, index),
                                                            time.clone(),
                                                            *diff,
                                                        ));
                                                    } else {
                                                        session.give((
                                                            (prefix.clone(), old_count, old_index),
                                                            time.clone(),
                                                            *diff,
                                                        ));
                                                    }
                                                }
                                            }
                                            // Zeroed diffs mark processed requests; pruned below.
                                            *diff = 0;
                                        }
                                    }

                                    prefixes.retain(|ptd| ptd.2 != 0);
                                }
                            }
                        }

                        // drop fully processed capabilities.
                        stash.retain(|_, prefixes| !prefixes.is_empty());

                        // advance the consolidation frontier (TODO: weird lexicographic times!)
                        if let Some(trace) = counts_trace.as_mut() {
                            trace.advance_by(&input1.frontier().frontier());
                        }

                        // Release the trace handle once no further requests can arrive.
                        if input1.frontier().is_empty() && stash.is_empty() {
                            counts_trace = None;
                        }
                    }
                })
                .as_collection(),
        )
    }

    fn propose(&mut self, prefixes: &Collection<S, P>) -> Collection<S, (P, V)> {
        // For each `(prefix, time, diff)` request, emit one
        // `((prefix, extension), time, diff)` per extension present
        // (with positive count) under `key_selector(prefix)` in the
        // propose trace at that time.

        let propose = &self.propose;
        let mut propose_trace = Some(propose.trace.clone());

        // Requests whose time is not yet complete, held under their capability.
        let mut stash = HashMap::new();
        let logic1 = self.key_selector.clone();
        let logic2 = self.key_selector.clone();

        let mut buffer1 = Vec::new();
        let mut buffer2 = Vec::new();

        // Route each request to the worker holding its key's shard of the trace.
        let exchange = Exchange::new(move |update: &(P, S::Timestamp, isize)| {
            logic1(&update.0).hashed().as_u64()
        });

        prefixes
            .inner
            .binary_frontier(
                &propose.stream,
                exchange,
                Pipeline,
                "Propose",
                move |_, _| {
                    move |input1, input2, output| {
                        // drain the first input, stashing requests.
                        input1.for_each(|capability, data| {
                            data.swap(&mut buffer1);
                            stash
                                .entry(capability.retain())
                                .or_insert_with(Vec::new)
                                .extend(buffer1.drain(..))
                        });

                        // advance the `distinguish_since` frontier to allow all merges.
                        input2.for_each(|_, batches| {
                            batches.swap(&mut buffer2);
                            for batch in buffer2.drain(..) {
                                if let Some(ref mut trace) = propose_trace {
                                    trace.distinguish_since(batch.upper());
                                }
                            }
                        });

                        if let Some(ref mut trace) = propose_trace {
                            for (capability, prefixes) in stash.iter_mut() {
                                // defer requests at incomplete times.
                                // NOTE: not all updates may be at complete times, but if this test fails then none of them are.
                                if !input2.frontier.less_equal(capability.time()) {
                                    let mut session = output.session(capability);

                                    // sort requests for in-order cursor traversal. could consolidate?
                                    prefixes.sort_by(|x, y| logic2(&x.0).cmp(&logic2(&y.0)));

                                    let (mut cursor, storage) = trace.cursor();

                                    for &mut (ref prefix, ref time, ref mut diff) in
                                        prefixes.iter_mut()
                                    {
                                        if !input2.frontier.less_equal(time) {
                                            let key = logic2(prefix);
                                            cursor.seek_key(&storage, &key);
                                            if cursor.get_key(&storage) == Some(&key) {
                                                // Emit every value under this key whose
                                                // accumulated count at `time` is positive.
                                                while let Some(value) = cursor.get_val(&storage) {
                                                    let mut count = 0;
                                                    cursor.map_times(&storage, |t, d| {
                                                        if t.less_equal(time) {
                                                            count += d;
                                                        }
                                                    });
                                                    // assert!(count >= 0);
                                                    if count > 0 {
                                                        session.give((
                                                            (prefix.clone(), value.clone()),
                                                            time.clone(),
                                                            *diff,
                                                        ));
                                                    }
                                                    cursor.step_val(&storage);
                                                }
                                                cursor.rewind_vals(&storage);
                                            }
                                            // Zeroed diffs mark processed requests; pruned below.
                                            *diff = 0;
                                        }
                                    }

                                    prefixes.retain(|ptd| ptd.2 != 0);
                                }
                            }
                        }

                        // drop fully processed capabilities.
                        stash.retain(|_, prefixes| !prefixes.is_empty());

                        // advance the consolidation frontier (TODO: weird lexicographic times!)
                        if let Some(trace) = propose_trace.as_mut() {
                            trace.advance_by(&input1.frontier().frontier());
                        }

                        // Release the trace handle once no further requests can arrive.
                        if input1.frontier().is_empty() && stash.is_empty() {
                            propose_trace = None;
                        }
                    }
                },
            )
            .as_collection()
    }

    fn validate(&mut self, extensions: &Collection<S, (P, V)>) -> Collection<S, (P, V)> {
        // This method takes a stream of `((prefix, extension), time, diff)`
        // changes and retains exactly those whose `(key, extension)` pair
        // is present (with positive accumulated count) in `validate_trace`
        // at that time, where `key = key_selector(prefix)`.

        let validate = &self.validate;
        let mut validate_trace = Some(validate.trace.clone());

        // Requests whose time is not yet complete, held under their capability.
        let mut stash = HashMap::new();
        let logic1 = self.key_selector.clone();
        let logic2 = self.key_selector.clone();

        let mut buffer1 = Vec::new();
        let mut buffer2 = Vec::new();

        // Route each request to the worker holding its (key, extension) shard.
        let exchange = Exchange::new(move |update: &((P, V), S::Timestamp, isize)| {
            (logic1(&(update.0).0).clone(), ((update.0).1).clone())
                .hashed()
                .as_u64()
        });

        extensions
            .inner
            .binary_frontier(
                &validate.stream,
                exchange,
                Pipeline,
                "Validate",
                move |_, _| {
                    move |input1, input2, output| {
                        // drain the first input, stashing requests.
                        input1.for_each(|capability, data| {
                            data.swap(&mut buffer1);
                            stash
                                .entry(capability.retain())
                                .or_insert_with(Vec::new)
                                .extend(buffer1.drain(..))
                        });

                        // advance the `distinguish_since` frontier to allow all merges.
                        input2.for_each(|_, batches| {
                            batches.swap(&mut buffer2);
                            for batch in buffer2.drain(..) {
                                if let Some(ref mut trace) = validate_trace {
                                    trace.distinguish_since(batch.upper());
                                }
                            }
                        });

                        if let Some(ref mut trace) = validate_trace {
                            for (capability, prefixes) in stash.iter_mut() {
                                // defer requests at incomplete times.
                                // NOTE: not all updates may be at complete times, but if this test fails then none of them are.
                                if !input2.frontier.less_equal(capability.time()) {
                                    let mut session = output.session(capability);

                                    // sort requests for in-order cursor traversal. could consolidate?
                                    prefixes.sort_by(|x, y| {
                                        (logic2(&(x.0).0), &((x.0).1))
                                            .cmp(&(logic2(&(y.0).0), &((y.0).1)))
                                    });

                                    let (mut cursor, storage) = trace.cursor();

                                    for &mut (ref prefix, ref time, ref mut diff) in
                                        prefixes.iter_mut()
                                    {
                                        if !input2.frontier.less_equal(time) {
                                            let key = (logic2(&prefix.0), (prefix.1).clone());
                                            cursor.seek_key(&storage, &key);
                                            if cursor.get_key(&storage) == Some(&key) {
                                                // Accumulate the pair's multiplicity at `time`;
                                                // positive means the extension is confirmed.
                                                let mut count = 0;
                                                cursor.map_times(&storage, |t, d| {
                                                    if t.less_equal(time) {
                                                        count += d;
                                                    }
                                                });
                                                // assert!(count >= 0);
                                                if count > 0 {
                                                    session.give((
                                                        prefix.clone(),
                                                        time.clone(),
                                                        *diff,
                                                    ));
                                                }
                                            }
                                            // Zeroed diffs mark processed requests; pruned below.
                                            *diff = 0;
                                        }
                                    }

                                    prefixes.retain(|ptd| ptd.2 != 0);
                                }
                            }
                        }

                        // drop fully processed capabilities.
                        stash.retain(|_, prefixes| !prefixes.is_empty());

                        // advance the consolidation frontier (TODO: weird lexicographic times!)
                        if let Some(trace) = validate_trace.as_mut() {
                            trace.advance_by(&input1.frontier().frontier());
                        }

                        // Release the trace handle once no further requests can arrive.
                        if input1.frontier().is_empty() && stash.is_empty() {
                            validate_trace = None;
                        }
                    }
                },
            )
            .as_collection()
    }
}
1527
/// A `PrefixExtender` that wraps another extender and inverts its
/// validation: extensions the wrapped extender accepts are removed
/// from the result (see the trait impl's `validate`).
struct AntijoinExtender<'a, S, V, P>
where
    S: Scope,
    S::Timestamp: Lattice + ExchangeData,
    V: ExchangeData,
{
    // Marks the prefix type `P`.
    phantom: std::marker::PhantomData<P>,
    // The wrapped extender whose validated extensions are negated.
    extender: Extender<'a, S, P, V>,
}
1537
1538impl<'a, S, V, P> PrefixExtender<S> for AntijoinExtender<'a, S, V, P>
1539where
1540 S: Scope,
1541 S::Timestamp: Lattice + ExchangeData,
1542 V: ExchangeData + Hash,
1543 P: ExchangeData,
1544{
1545 type Prefix = P;
1546 type Extension = V;
1547
1548 fn count(
1549 &mut self,
1550 _prefixes: &Collection<S, (P, usize, usize)>,
1551 _index: usize,
1552 ) -> Option<Collection<S, (P, usize, usize)>> {
1553 None
1554 }
1555
1556 fn propose(&mut self, prefixes: &Collection<S, P>) -> Collection<S, (P, V)> {
1557 prefixes.map(|_prefix| panic!("AntijoinExtender should never be asked to propose."))
1558 }
1559
1560 fn validate(&mut self, extensions: &Collection<S, (P, V)>) -> Collection<S, (P, V)> {
1561 extensions.concat(&self.extender.validate(extensions).negate())
1562 }
1563}