declarative_dataflow/plan/
hector.rs

1//! Worst-case optimal, n-way joins.
2//!
3//! This is an extended implementation of Delta-BiGJoin, by Ammar, McSherry,
4//! Salihoglu, and Joglekar ([paper](https://dl.acm.org/citation.cfm?id=3199520)).
5//!
6//! The overall structure and the CollectionExtender implementation is adapted from:
7//! https://github.com/frankmcsherry/differential-dataflow/tree/master/dogsdogsdogs
8
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::hash::Hash;
11use std::rc::Rc;
12
13use timely::dataflow::channels::pact::{Exchange, Pipeline};
14use timely::dataflow::operators::{Concatenate, Operator, Partition};
15use timely::dataflow::scopes::child::Iterative;
16use timely::dataflow::Scope;
17use timely::order::Product;
18use timely::progress::Timestamp;
19use timely::worker::AsWorker;
20use timely::PartialOrder;
21
22use timely_sort::Unsigned;
23
24use differential_dataflow::lattice::Lattice;
25use differential_dataflow::operators::arrange::Arranged;
26use differential_dataflow::operators::{Consolidate, Count};
27use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
28use differential_dataflow::{AsCollection, Collection, ExchangeData, Hashable};
29
30use crate::binding::{AsBinding, BinaryPredicate, Binding};
31use crate::binding::{BinaryPredicateBinding, ConstantBinding};
32use crate::logging::DeclarativeEvent;
33use crate::plan::{Dependencies, ImplContext, Implementable};
34use crate::timestamp::altneu::AltNeu;
35use crate::{Aid, Value, Var};
36use crate::{CollectionRelation, Implemented, ShutdownHandle, VariableMap};
37
38type Extender<'a, S, P, V> = Box<(dyn PrefixExtender<S, Prefix = P, Extension = V> + 'a)>;
39
40/// A type capable of extending a stream of prefixes. Implementors of
41/// `PrefixExtension` provide types and methods for extending a
42/// differential dataflow collection, via the three methods `count`,
43/// `propose`, and `validate`.
44trait PrefixExtender<G: Scope> {
45    /// The required type of prefix to extend.
46    type Prefix;
47    /// The type to be produced as extension.
48    type Extension;
49    /// Annotates prefixes with the number of extensions the relation would propose.
50    fn count(
51        &mut self,
52        prefixes: &Collection<G, (Self::Prefix, usize, usize)>,
53        index: usize,
54    ) -> Option<Collection<G, (Self::Prefix, usize, usize)>>;
55    /// Extends each prefix with corresponding extensions.
56    fn propose(
57        &mut self,
58        prefixes: &Collection<G, Self::Prefix>,
59    ) -> Collection<G, (Self::Prefix, Self::Extension)>;
60    /// Restricts proposed extensions by those the extender would have proposed.
61    fn validate(
62        &mut self,
63        extensions: &Collection<G, (Self::Prefix, Self::Extension)>,
64    ) -> Collection<G, (Self::Prefix, Self::Extension)>;
65}
66
67trait IntoExtender<'a, S, V>
68where
69    S: Scope,
70    S::Timestamp: Timestamp + Lattice,
71    V: ExchangeData + Hash,
72{
73    fn into_extender<P: ExchangeData + IndexNode<V>, B: AsBinding + std::fmt::Debug>(
74        &self,
75        prefix: &B,
76    ) -> Vec<Extender<'a, S, P, V>>;
77}
78
79impl<'a, S> IntoExtender<'a, S, Value> for ConstantBinding
80where
81    S: Scope,
82    S::Timestamp: Timestamp + Lattice,
83{
84    fn into_extender<P: ExchangeData + IndexNode<Value>, B: AsBinding + std::fmt::Debug>(
85        &self,
86        _prefix: &B,
87    ) -> Vec<Extender<'a, S, P, Value>> {
88        vec![Box::new(ConstantExtender {
89            phantom: std::marker::PhantomData,
90            value: self.value.clone(),
91        })]
92    }
93}
94
95impl<'a, S, V> IntoExtender<'a, S, V> for BinaryPredicateBinding
96where
97    S: Scope,
98    S::Timestamp: Timestamp + Lattice,
99    V: ExchangeData + Hash,
100{
101    fn into_extender<P: ExchangeData + IndexNode<V>, B: AsBinding + std::fmt::Debug>(
102        &self,
103        prefix: &B,
104    ) -> Vec<Extender<'a, S, P, V>> {
105        match direction(prefix, self.variables) {
106            Err(_msg) => {
107                // We won't panic here, this just means the predicate's variables
108                // aren't sufficiently bound by the prefixes yet.
109                vec![]
110            }
111            Ok(direction) => vec![Box::new(BinaryPredicateExtender {
112                phantom: std::marker::PhantomData,
113                predicate: self.predicate.clone(),
114                direction,
115            })],
116        }
117    }
118}
119
120//
121// OPERATOR
122//
123
124/// A plan stage joining two source relations on the specified
125/// variables. Throws if any of the join variables isn't bound by both
126/// sources.
127#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
128pub struct Hector {
129    /// Variables to bind.
130    pub variables: Vec<Var>,
131    /// Bindings to join.
132    pub bindings: Vec<Binding>,
133}
134
135enum Direction {
136    Forward(usize),
137    Reverse(usize),
138}
139
140fn direction<P>(prefix: &P, extender_variables: (Var, Var)) -> Result<Direction, &'static str>
141where
142    P: AsBinding + std::fmt::Debug,
143{
144    match AsBinding::binds(prefix, extender_variables.0) {
145        None => match AsBinding::binds(prefix, extender_variables.1) {
146            None => {
147                error!(
148                    "Neither extender variable {:?} bound by prefix {:?}.",
149                    extender_variables, prefix
150                );
151                Err("Neither extender variable bound by prefix.")
152            }
153            Some(offset) => Ok(Direction::Reverse(offset)),
154        },
155        Some(offset) => {
156            match AsBinding::binds(prefix, extender_variables.1) {
157                Some(_) => Err("Both extender variables already bound by prefix."),
158                None => {
159                    // Prefix binds the first extender variable, but not
160                    // the second. Can use forward index.
161                    Ok(Direction::Forward(offset))
162                }
163            }
164        }
165    }
166}
167
168/// Bindings can be in conflict with the source binding of a given
169/// delta pipeline. We need to identify them and handle them as
170/// special cases, because we always have to start from prefixes of
171/// size two.
172pub fn source_conflicts(source_index: usize, bindings: &[Binding]) -> Vec<&Binding> {
173    match bindings[source_index] {
174        Binding::Attribute(ref source) => {
175            let prefix_0 = vec![source.variables.0];
176            let prefix_1 = vec![source.variables.1];
177
178            bindings
179                .iter()
180                .enumerate()
181                .flat_map(|(index, binding)| {
182                    if index == source_index {
183                        None
184                    } else if binding.can_extend(&prefix_0, source.variables.1)
185                        || binding.can_extend(&prefix_1, source.variables.0)
186                    {
187                        Some(binding)
188                    } else {
189                        None
190                    }
191                })
192                .collect()
193        }
194        _ => panic!("Source must be an AttributeBinding."),
195    }
196}
197
198/// Orders the variables s.t. each has at least one binding from
199/// itself to a prior variable. `source_binding` indicates the binding
200/// from which we will source the prefixes in the resulting delta
201/// pipeline. Returns the chosen variable order and the corresponding
202/// binding order.
203///
204/// (adapted from github.com/frankmcsherry/dataflow-join/src/motif.rs)
205pub fn plan_order(source_index: usize, bindings: &[Binding]) -> (Vec<Var>, Vec<Binding>) {
206    let mut variables = bindings
207        .iter()
208        .flat_map(AsBinding::variables)
209        .collect::<Vec<Var>>();
210    variables.sort();
211    variables.dedup();
212
213    // Determine an order on the attributes. The order may not
214    // introduce a binding until one of its consituents is already
215    // bound by the prefix. These constraints are captured via the
216    // `AsBinding::ready_to_extend` method. The order may otherwise be
217    // arbitrary, for example selecting the most constrained attribute
218    // first. Presently, we just pick attributes arbitrarily.
219
220    let mut prefix: Vec<Var> = Vec::with_capacity(variables.len());
221    match bindings[source_index] {
222        Binding::Attribute(ref source) => {
223            prefix.push(source.variables.0);
224            prefix.push(source.variables.1);
225        }
226        _ => panic!("Source binding must be an attribute."),
227    }
228
229    let candidates_for = |bindings: &[Binding], target: Var| {
230        bindings
231            .iter()
232            .enumerate()
233            .flat_map(move |(index, other)| {
234                if index == source_index {
235                    // Ignore the source binding itself.
236                    None
237                } else if other.binds(target).is_some() {
238                    Some(other.clone())
239                } else {
240                    // Some bindings might not even talk about the target
241                    // variable.
242                    None
243                }
244            })
245            .collect::<Vec<Binding>>()
246    };
247
248    let mut ordered_bindings = Vec::new();
249    let mut candidates: Vec<Binding> = prefix
250        .iter()
251        .flat_map(|x| candidates_for(&bindings, *x))
252        .collect();
253
254    loop {
255        debug!("Candidates: {:?}", candidates);
256
257        let mut waiting_candidates = Vec::new();
258
259        candidates.sort();
260        candidates.dedup();
261
262        for candidate in candidates.drain(..) {
263            match candidate.ready_to_extend(&prefix) {
264                None => {
265                    waiting_candidates.push(candidate);
266                }
267                Some(target) => {
268                    if AsBinding::binds(&prefix, target).is_none() {
269                        prefix.push(target);
270                        for new_candidate in candidates_for(&bindings, target) {
271                            if candidate != new_candidate {
272                                waiting_candidates.push(new_candidate);
273                            }
274                        }
275                    }
276
277                    ordered_bindings.push(candidate);
278                }
279            }
280        }
281
282        if waiting_candidates.is_empty() {
283            break;
284        }
285
286        for candidate in waiting_candidates.drain(..) {
287            candidates.push(candidate);
288        }
289
290        if prefix.len() == variables.len() {
291            break;
292        }
293    }
294
295    debug!("Candidates: {:?}", candidates);
296
297    for candidate in candidates.drain(..) {
298        ordered_bindings.push(candidate);
299    }
300
301    (prefix, ordered_bindings)
302}
303
304trait IndexNode<V> {
305    fn index(&self, index: usize) -> V;
306}
307
308impl IndexNode<Value> for (Value, Value) {
309    #[inline(always)]
310    fn index(&self, index: usize) -> Value {
311        assert!(index <= 1);
312        if index == 0 {
313            self.0.clone()
314        } else {
315            self.1.clone()
316        }
317    }
318}
319
320impl IndexNode<Value> for (&Value, &Value) {
321    #[inline(always)]
322    fn index(&self, index: usize) -> Value {
323        assert!(index <= 1);
324        if index == 0 {
325            self.0.clone()
326        } else {
327            self.1.clone()
328        }
329    }
330}
331
332impl IndexNode<Value> for Vec<Value> {
333    #[inline(always)]
334    fn index(&self, index: usize) -> Value {
335        self[index].clone()
336    }
337}
338
339impl Hector {
340    // @TODO pass single binding as argument?
341    // @TODO make these static and take variables as well?
342
343    fn implement_single_binding<'b, T, I, S>(
344        &self,
345        nested: &mut Iterative<'b, S, u64>,
346        _local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
347        context: &mut I,
348    ) -> (Implemented<'b, S>, ShutdownHandle)
349    where
350        T: Timestamp + Lattice,
351        I: ImplContext<T>,
352        S: Scope<Timestamp = T>,
353    {
354        // With only a single binding given, we don't want to do
355        // anything fancy (provided the binding is sourceable).
356
357        match self.bindings.first().unwrap() {
358            Binding::Attribute(binding) => {
359                match context.forward_propose(&binding.source_attribute) {
360                    None => panic!("Unknown attribute {}", &binding.source_attribute),
361                    Some(forward_trace) => {
362                        let frontier: Vec<T> = forward_trace.advance_frontier().to_vec();
363                        let (forward, shutdown_forward) = forward_trace.import_core(
364                            &nested.parent,
365                            &format!("Propose({})", &binding.source_attribute),
366                        );
367
368                        let prefix = binding.variables();
369                        let target_variables = self.variables.clone();
370                        let tuples = forward
371                            .enter_at(nested, move |_, _, time| {
372                                let mut forwarded = time.clone();
373                                forwarded.advance_by(&frontier);
374                                Product::new(forwarded, 0)
375                            })
376                            .as_collection(move |e, v| {
377                                let tuple = (e, v);
378                                target_variables
379                                    .iter()
380                                    .flat_map(|x| {
381                                        Some(tuple.index(AsBinding::binds(&prefix, *x).unwrap()))
382                                    })
383                                    .collect()
384                            });
385
386                        let relation = CollectionRelation {
387                            variables: self.variables.clone(),
388                            tuples,
389                        };
390
391                        (
392                            Implemented::Collection(relation),
393                            ShutdownHandle::from_button(shutdown_forward),
394                        )
395                    }
396                }
397            }
398            _ => {
399                panic!("Passed a single, non-sourceable binding.");
400            }
401        }
402    }
403
404    // fn two_way<'b, T, I, S>(
405    //     nested: &mut Iterative<'b, S, u64>,
406    //     _local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
407    //     context: &mut I,
408    //     left: Binding,
409    //     right: Binding,
410    // ) -> (Implemented<'b, S>, ShutdownHandle)
411    // where
412    //     T: Timestamp + Lattice,
413    //     I: ImplContext<T>,
414    //     S: Scope<Timestamp = T>,
415    // {
416    //     let (source, right) = match left {
417    //         Binding::Attribute(source) => (source, right),
418    //         _ => match right {
419    //             Binding::Attribute(source) => (source, left),
420    //             _ => panic!("At least one binding must be sourceable for Hector::two_way."),
421    //         }
422    //     };
423
424    //     match right {
425    //         Binding::Constant(constant_binding) => {
426    //             let match_v = constant_binding.value;
427    //             let offset = source.binds(constant_binding.variable)
428    //                 .unwrap_or_else(|| panic!("Source doesn't bind constant binding {:?}", constant_binding));
429
430    //             match offset {
431    //                 0 => {
432    //                     // [?a :edge ?b] (constant ?a x) <=> [x :edge ?b]
433
434    //                     let (index, shutdown) = context.forward_propose(&source.source_attribute)
435    //                         .unwrap_or_else(|| panic!("No forward index found for attribute {}", &source.source_attribute))
436    //                         .import_core(&nested.parent, &source.source_attribute);
437
438    //                     let frontier: Vec<T> = index.trace.advance_frontier().to_vec();
439
440    //                     index
441    //                         .filter(move |e, _v| *e == match_v)
442    //                         .enter_at(&nested, move |_, _, time| {
443    //                             let mut forwarded = time.clone(); forwarded.advance_by(&frontier);
444    //                             Product::new(forwarded, Default::default())
445    //                         })
446    //                 }
447    //                 1 => {
448    //                     // [?a :edge ?b] (constant ?b x) <=> [?a :edge x]
449
450    //                     let (index, shutdown) = context.reverse_propose(&source.source_attribute)
451    //                         .unwrap_or_else(|| panic!("No reverse index found for attribute {}", &source.source_attribute))
452    //                         .import_core(&nested.parent, &source.source_attribute);
453
454    //                     let frontier: Vec<T> = index.trace.advance_frontier().to_vec();
455
456    //                     index
457    //                         .filter(move |e, _v| *e == match_v)
458    //                         .enter_at(&nested, move |_, _, time| {
459    //                             let mut forwarded = time.clone(); forwarded.advance_by(&frontier);
460    //                             Product::new(forwarded, Default::default())
461    //                         })
462    //                 }
463    //                 other => panic!("Unexpected offset {}", other),
464    //             }
465    //         }
466    //         _ => unimplemented!(),
467    //     }
468    // }
469}
470
471impl Implementable for Hector {
472    fn dependencies(&self) -> Dependencies {
473        let attributes = self
474            .bindings
475            .iter()
476            .flat_map(|binding| {
477                if let Binding::Attribute(binding) = binding {
478                    Some(binding.source_attribute.clone())
479                } else {
480                    None
481                }
482            })
483            .collect::<HashSet<Aid>>();
484
485        Dependencies {
486            names: HashSet::new(),
487            attributes,
488        }
489    }
490
491    fn into_bindings(&self) -> Vec<Binding> {
492        self.bindings.clone()
493    }
494
495    fn implement<'b, T, I, S>(
496        &self,
497        nested: &mut Iterative<'b, S, u64>,
498        _local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
499        context: &mut I,
500    ) -> (Implemented<'b, S>, ShutdownHandle)
501    where
502        T: Timestamp + Lattice,
503        I: ImplContext<T>,
504        S: Scope<Timestamp = T>,
505    {
506        if self.bindings.is_empty() {
507            panic!("No bindings passed.");
508        } else if self.variables.is_empty() {
509            panic!("No variables requested.");
510        } else if self.bindings.len() == 1 {
511            self.implement_single_binding(nested, _local_arrangements, context)
512        // } else if self.bindings.len() == 2 {
513        //     Hector::two_way(nested, _local_arrangements, context, self.bindings[0].clone(), self.bindings[1].clone())
514        } else {
515            // In order to avoid delta pipelines looking at each
516            // other's data in naughty ways, we need to run them all
517            // inside a scope with lexicographic times.
518
519            let (joined, shutdown_handle) = nested.scoped::<AltNeu<Product<T,u64>>, _, _>("AltNeu", |inner| {
520
521                let scope = inner.clone();
522
523                // We cache aggressively, to avoid importing and
524                // wrapping things more than once.
525
526                let mut shutdown_handle = ShutdownHandle::empty();
527
528                let mut forward_counts = HashMap::new();
529                let mut forward_proposes = HashMap::new();
530                let mut forward_validates = HashMap::new();
531
532                let mut reverse_counts = HashMap::new();
533                let mut reverse_proposes = HashMap::new();
534                let mut reverse_validates = HashMap::new();
535
536                // Attempt to acquire a logger for tuple counts.
537                let logger = {
538                    let register = scope.parent.log_register();
539                    register.get::<DeclarativeEvent>("declarative")
540                };
541
542                // For each AttributeBinding (only AttributeBindings
543                // actually experience change), we construct a delta query
544                // driven by changes to that binding.
545
546                let changes = self.bindings.iter().enumerate()
547                    .flat_map(|(idx, delta_binding)| match delta_binding {
548                        Binding::Attribute(delta_binding) => {
549
550                            // We need to determine an order on the attributes
551                            // that ensures that each is bound by preceeding
552                            // attributes. For now, we will take the requested order.
553
554                            // @TODO use binding order returned here?
555                            // might be problematic to ensure ordering is maintained?
556                            let (variables, _) = plan_order(idx, &self.bindings);
557
558                            let mut prefix = Vec::with_capacity(variables.len());
559
560                            debug!("Source {:?}", delta_binding);
561
562                            // We would like to avoid starting with single-variable
563                            // (or even empty) prefixes, because the dataflow-y nature
564                            // of this implementation means we will always be starting
565                            // from attributes (which correspond to two-variable prefixes).
566                            // 
567                            // But to get away with that we need to check for single-variable
568                            // bindings in conflict with the source binding.
569
570                            let propose = forward_proposes
571                                .entry(delta_binding.source_attribute.to_string())
572                                .or_insert_with(|| {
573                                    let (arranged, shutdown) = context
574                                        .forward_propose(&delta_binding.source_attribute)
575                                        .expect("forward propose trace doesn't exist")
576                                        .import_core(&scope.parent.parent, &format!("Counts({})", &delta_binding.source_attribute));
577
578                                    shutdown_handle.add_button(shutdown);
579
580                                    arranged
581                                });
582                            let frontier: Vec<T> = propose.trace.advance_frontier().to_vec();
583
584                            let mut source_conflicts = source_conflicts(idx, &self.bindings);
585
586                            let mut source = if !source_conflicts.is_empty() {
587                                // @TODO there can be more than one conflict
588                                // @TODO Not just constant bindings can cause issues here!
589                                assert_eq!(source_conflicts.len(), 1);
590
591                                let conflict = source_conflicts.pop().unwrap();
592                                // for conflict in source_conflicts.drain(..) {
593                                    match conflict {
594                                        Binding::Constant(constant_binding) => {
595                                            prefix.push(constant_binding.variable);
596
597                                            let match_v = constant_binding.value.clone();
598
599                                            // Guaranteed to intersect with offset zero at this point.
600                                            match direction(&prefix, delta_binding.variables).unwrap() {
601                                                Direction::Forward(_) => {
602                                                    prefix.push(delta_binding.variables.1);
603
604                                                    propose
605                                                        .filter(move |e, _v| *e == match_v)
606                                                        .enter_at(&scope.parent, move |_, _, time| {
607                                                            let mut forwarded = time.clone(); forwarded.advance_by(&frontier);
608                                                            Product::new(forwarded, Default::default())
609                                                        })
610                                                        .enter(&scope)
611                                                        .as_collection(|e,v| vec![e.clone(), v.clone()])
612                                                }
613                                                Direction::Reverse(_) => {
614                                                    prefix.push(delta_binding.variables.0);
615
616                                                    propose
617                                                        .filter(move |_e, v| *v == match_v)
618                                                        .enter_at(&scope.parent, move |_, _, time| {
619                                                            let mut forwarded = time.clone(); forwarded.advance_by(&frontier);
620                                                            Product::new(forwarded, Default::default())
621                                                        })
622                                                        .enter(&scope)
623                                                        .as_collection(|v,e| vec![e.clone(), v.clone()])
624                                                }
625                                            }
626                                        }
627                                        _ => panic!("Can't resolve conflicts on {:?} bindings", conflict),
628                                    // }
629                                }
630                            } else {
631                                prefix.push(delta_binding.variables.0);
632                                prefix.push(delta_binding.variables.1);
633
634                                propose
635                                    .enter_at(&scope.parent, move |_, _, time| {
636                                        let mut forwarded = time.clone();
637                                        forwarded.advance_by(&frontier);
638                                        Product::new(forwarded, Default::default())
639                                    })
640                                    .enter(&scope)
641                                    .as_collection(|e,v| vec![e.clone(), v.clone()])
642                            };
643
644                            for target in variables.iter() {
645                                match AsBinding::binds(&prefix, *target) {
646                                    Some(_) => { /* already bound */ continue },
647                                    None => {
648                                        debug!("Extending {:?} to {:?}", prefix, target);
649
650                                        let mut extenders: Vec<Extender<'_, _, Vec<Value>, _>> = vec![];
651
652                                        // Handling AntijoinBinding's requires dealing with recursion,
653                                        // because they wrap another binding. We don't actually want to wrap
654                                        // all of the below inside of a recursive function, because passing
655                                        // all these nested scopes and caches around leads to a world of lifetimes pain.
656                                        //
657                                        // Therefore we make our own little queue of bindings and process them iteratively.
658
659                                        let mut bindings: VecDeque<(usize, Binding)> = VecDeque::new();
660
661                                        for (idx, binding) in self.bindings.iter().cloned().enumerate() {
662                                            if let Binding::Not(antijoin_binding) = binding {
663                                                bindings.push_back((idx, (*antijoin_binding.binding).clone()));
664                                                bindings.push_back((idx, Binding::Not(antijoin_binding)));
665                                            } else {
666                                                bindings.push_back((idx, binding));
667                                            }
668                                        }
669
670                                        while let Some((other_idx, other)) = bindings.pop_front() {
671
672                                            // We need to distinguish between conflicting relations
673                                            // that appear before the current one in the sequence (< idx),
674                                            // and those that appear afterwards.
675
676                                            // Ignore the current delta source itself.
677                                            if other_idx == idx { continue; }
678
679                                            // Ignore any binding not talking about the target variable.
680                                            if other.binds(*target).is_none() { continue; }
681
682                                            // Ignore any binding that isn't ready to extend, either
683                                            // because it doesn't even talk about the target variable, or
684                                            // because none of its dependent variables are bound by the prefix
685                                            // yet (relevant for attributes).
686                                            if !other.can_extend(&prefix, *target) {
687                                                debug!("{:?} can't extend", other);
688                                                continue;
689                                            }
690
691                                            let is_neu = other_idx >= idx;
692
693                                            debug!("\t...using {:?}", other);
694
695                                            match other {
696                                                Binding::Not(_other) => {
697                                                    // Due to the way we enqueued the bindings above, we can now
698                                                    // rely on the internal exteneder being available as the last
699                                                    // extender on the stack.
700                                                    let internal_extender = extenders.pop().expect("No internal extender available on stack.");
701
702                                                    extenders.push(
703                                                        Box::new(AntijoinExtender {
704                                                            phantom: std::marker::PhantomData,
705                                                            extender: internal_extender,
706                                                        })
707                                                    );
708                                                }
709                                                Binding::Constant(other) => {
710                                                    extenders.append(&mut other.into_extender(&prefix));
711                                                }
712                                                Binding::BinaryPredicate(other) => {
713                                                    extenders.append(&mut other.into_extender(&prefix));
714                                                }
715                                                Binding::Attribute(other) => {
716                                                    match direction(&prefix, other.variables) {
717                                                        Err(msg) => panic!(msg),
718                                                        Ok(direction) => match direction {
719                                                            Direction::Forward(offset) => {
720                                                                let count = {
721                                                                    let count = forward_counts
722                                                                        .entry(other.source_attribute.to_string())
723                                                                        .or_insert_with(|| {
724                                                                            let (arranged, shutdown) =
725                                                                                context.forward_count(&other.source_attribute)
726                                                                                .expect("forward count doesn't exist")
727                                                                                .import_core(&scope.parent.parent, &format!("Counts({})", &delta_binding.source_attribute));
728
729                                                                            shutdown_handle.add_button(shutdown);
730
731                                                                            arranged
732                                                                        });
733
734                                                                    let frontier: Vec<T> = count.trace.advance_frontier().to_vec();
735                                                                    let neu = is_neu;
736
737                                                                    count
738                                                                        .enter_at(&scope.parent, move |_, _, t| {
739                                                                            let mut forwarded = t.clone();
740                                                                            forwarded.advance_by(&frontier);
741                                                                            Product::new(forwarded, 0)
742                                                                        })
743                                                                        .enter_at(&scope, move |_,_,t| AltNeu { time: t.clone(), neu })
744                                                                };
745;
746                                                                let propose = {
747                                                                    let propose = forward_proposes
748                                                                        .entry(other.source_attribute.to_string())
749                                                                        .or_insert_with(|| {
750                                                                            let (arranged, shutdown) =
751                                                                                context.forward_propose(&other.source_attribute)
752                                                                                .expect("forward propose doesn't exist")
753                                                                                .import_core(&scope.parent.parent, &format!("Propose({})", &delta_binding.source_attribute));
754
755                                                                            shutdown_handle.add_button(shutdown);
756
757                                                                            arranged
758                                                                        });
759
760                                                                    let frontier: Vec<T> = propose.trace.advance_frontier().to_vec();
761                                                                    let neu = is_neu;
762
763                                                                    propose
764                                                                        .enter_at(&scope.parent, move |_, _, t| {
765                                                                            let mut forwarded = t.clone();
766                                                                            forwarded.advance_by(&frontier);
767                                                                            Product::new(forwarded, 0)
768                                                                        })
769                                                                        .enter_at(&scope, move |_,_,t| AltNeu { time: t.clone(), neu })
770                                                                };
771
772                                                                let validate = {
773                                                                    let validate = forward_validates
774                                                                        .entry(other.source_attribute.to_string())
775                                                                        .or_insert_with(|| {
776                                                                            let (arranged, shutdown) =
777                                                                                context.forward_validate(&other.source_attribute)
778                                                                                .expect("forward validate doesn't exist")
779                                                                                .import_core(&scope.parent.parent, &format!("Validate({})", &delta_binding.source_attribute));
780
781                                                                            shutdown_handle.add_button(shutdown);
782
783                                                                            arranged
784                                                                        });
785
786                                                                    let frontier: Vec<T> = validate.trace.advance_frontier().to_vec();
787                                                                    let neu = is_neu;
788
789                                                                    validate
790                                                                        .enter_at(&scope.parent, move |_, _, t| {
791                                                                            let mut forwarded = t.clone();
792                                                                            forwarded.advance_by(&frontier);
793                                                                            Product::new(forwarded, 0)
794                                                                        })
795                                                                        .enter_at(&scope, move |_,_,t| AltNeu { time: t.clone(), neu })
796                                                                };
797
798                                                                extenders.push(
799                                                                    Box::new(CollectionExtender {
800                                                                        phantom: std::marker::PhantomData,
801                                                                        count,
802                                                                        propose,
803                                                                        validate,
804                                                                        key_selector: Rc::new(move |prefix: &Vec<Value>| prefix.index(offset)),
805                                                                    })
806                                                                );
807                                                            },
808                                                            Direction::Reverse(offset) => {
809                                                                let count = {
810                                                                    let count = reverse_counts
811                                                                        .entry(other.source_attribute.to_string())
812                                                                        .or_insert_with(|| {
813                                                                            let (arranged, shutdown) =
814                                                                                context.reverse_count(&other.source_attribute)
815                                                                                .expect("reverse count doesn't exist")
816                                                                                .import_core(&scope.parent.parent, &format!("_Counts({})", &delta_binding.source_attribute));
817
818                                                                            shutdown_handle.add_button(shutdown);
819
820                                                                            arranged
821                                                                        });
822
823                                                                    let frontier: Vec<T> = count.trace.advance_frontier().to_vec();
824                                                                    let neu = is_neu;
825
826                                                                    count
827                                                                        .enter_at(&scope.parent, move |_, _, t| {
828                                                                            let mut forwarded = t.clone();
829                                                                            forwarded.advance_by(&frontier);
830                                                                            Product::new(forwarded, 0)
831                                                                        })
832                                                                        .enter_at(&scope, move |_,_,t| AltNeu { time: t.clone(), neu })
833                                                                };
834;
835                                                                let propose = {
836                                                                    let propose = reverse_proposes
837                                                                        .entry(other.source_attribute.to_string())
838                                                                        .or_insert_with(|| {
839                                                                            let (arranged, shutdown) =
840                                                                                context.reverse_propose(&other.source_attribute)
841                                                                                .expect("reverse propose doesn't exist")
842                                                                                .import_core(&scope.parent.parent, &format!("_Propose({})", &delta_binding.source_attribute));
843
844                                                                            shutdown_handle.add_button(shutdown);
845
846                                                                            arranged
847                                                                        });
848
849                                                                    let frontier: Vec<T> = propose.trace.advance_frontier().to_vec();
850                                                                    let neu = is_neu;
851
852                                                                    propose
853                                                                        .enter_at(&scope.parent, move |_, _, t| {
854                                                                            let mut forwarded = t.clone();
855                                                                            forwarded.advance_by(&frontier);
856                                                                            Product::new(forwarded, 0)
857                                                                        })
858                                                                        .enter_at(&scope, move |_,_,t| AltNeu { time: t.clone(), neu })
859                                                                };
860
861                                                                let validate = {
862                                                                    let validate = reverse_validates
863                                                                        .entry(other.source_attribute.to_string())
864                                                                        .or_insert_with(|| {
865                                                                            let (arranged, shutdown) =
866                                                                                context.reverse_validate(&other.source_attribute)
867                                                                                .expect("reverse validate doesn't exist")
868                                                                                .import_core(&scope.parent.parent, &format!("_Validate({})", &delta_binding.source_attribute));
869
870                                                                            shutdown_handle.add_button(shutdown);
871
872                                                                            arranged
873                                                                        });
874
875                                                                    let frontier: Vec<T> = validate.trace.advance_frontier().to_vec();
876                                                                    let neu = is_neu;
877
878                                                                    validate
879                                                                        .enter_at(&scope.parent, move |_, _, t| {
880                                                                            let mut forwarded = t.clone();
881                                                                            forwarded.advance_by(&frontier);
882                                                                            Product::new(forwarded, 0)
883                                                                        })
884                                                                        .enter_at(&scope, move |_,_,t| AltNeu { time: t.clone(), neu })
885                                                                };
886
887                                                                extenders.push(
888                                                                    Box::new(CollectionExtender {
889                                                                        phantom: std::marker::PhantomData,
890                                                                        count,
891                                                                        propose,
892                                                                        validate,
893                                                                        key_selector: Rc::new(move |prefix: &Vec<Value>| prefix.index(offset)),
894                                                                    })
895                                                                );
896                                                            },
897                                                        }
898                                                    }
899                                                }
900                                            }
901                                        }
902
903                                        prefix.push(*target);
904
905                                        // @TODO impl ProposeExtensionMethod for Arranged
906                                        let extended = source.extend(&mut extenders[..]);
907
908                                        if logger.is_some() {
909                                            let worker_index = scope.index();
910                                            let source_attribute = delta_binding.source_attribute.to_string();
911                                            extended
912                                                // .inspect(move |x| { println!("{} extended: {:?}", source_attribute, x); })
913                                                .map(|_| ())
914                                                .consolidate()
915                                                .count()
916                                                .map(move |(_, count)| (Value::Eid(worker_index as u64), Value::Number(count as i64)))
917                                                .leave()
918                                                .leave()
919                                                .inspect(move |x| { println!("{}: {:?}", source_attribute, x); });
920                                        }
921
922                                        source = extended
923                                            .map(|(tuple,v)| {
924                                                let mut out = Vec::with_capacity(tuple.len() + 1);
925                                                out.append(&mut tuple.clone());
926                                                out.push(v);
927
928                                                out
929                                            })
930                                    }
931                                }
932                            }
933
934                            if self.variables == prefix {
935                                Some(source.inner)
936                            } else {
937                                let target_variables = self.variables.clone();
938
939                                Some(source
940                                     .map(move |tuple| {
941                                         target_variables.iter()
942                                             .flat_map(|x| Some(tuple.index(AsBinding::binds(&prefix, *x).unwrap())))
943                                             .collect()
944                                     })
945                                     .inner)
946                            }
947                        }
948                        _ => None
949                    });
950
951                (inner.concatenate(changes).as_collection().leave(), shutdown_handle)
952            });
953
954            let relation = CollectionRelation {
955                variables: self.variables.clone(),
956                tuples: joined,
957            };
958
959            (Implemented::Collection(relation), shutdown_handle)
960        }
961    }
962}
963
964//
965// GENERIC IMPLEMENTATION
966//
967
968trait ProposeExtensionMethod<'a, S: Scope, P: ExchangeData + Ord> {
969    fn extend<E: ExchangeData + Ord>(
970        &self,
971        extenders: &mut [Extender<'a, S, P, E>],
972    ) -> Collection<S, (P, E)>;
973}
974
975impl<'a, S: Scope, P: ExchangeData + Ord> ProposeExtensionMethod<'a, S, P> for Collection<S, P> {
976    fn extend<E: ExchangeData + Ord>(
977        &self,
978        extenders: &mut [Extender<'a, S, P, E>],
979    ) -> Collection<S, (P, E)> {
980        if extenders.is_empty() {
981            // @TODO don't panic
982            panic!("No extenders specified.");
983        } else if extenders.len() == 1 {
984            extenders[0].propose(&self.clone())
985        } else {
986            let mut counts = self.map(|p| (p, 1 << 31, 0));
987            for (index, extender) in extenders.iter_mut().enumerate() {
988                if let Some(new_counts) = extender.count(&counts, index) {
989                    counts = new_counts;
990                }
991            }
992
993            let parts = counts
994                .inner
995                .partition(extenders.len() as u64, |((p, _, i), t, d)| {
996                    (i as u64, (p, t, d))
997                });
998
999            let mut results = Vec::new();
1000            for (index, nominations) in parts.into_iter().enumerate() {
1001                let mut extensions = extenders[index].propose(&nominations.as_collection());
1002                for other in (0..extenders.len()).filter(|&x| x != index) {
1003                    extensions = extenders[other].validate(&extensions);
1004                }
1005
1006                results.push(extensions.inner); // save extensions
1007            }
1008
1009            self.scope().concatenate(results).as_collection()
1010        }
1011    }
1012}
1013
1014struct ConstantExtender<P, V>
1015where
1016    V: ExchangeData + Hash,
1017{
1018    phantom: std::marker::PhantomData<P>,
1019    value: V,
1020}
1021
1022impl<'a, S, V, P> PrefixExtender<S> for ConstantExtender<P, V>
1023where
1024    S: Scope,
1025    S::Timestamp: Lattice + ExchangeData,
1026    V: ExchangeData + Hash,
1027    P: ExchangeData,
1028{
1029    type Prefix = P;
1030    type Extension = V;
1031
1032    fn count(
1033        &mut self,
1034        prefixes: &Collection<S, (P, usize, usize)>,
1035        index: usize,
1036    ) -> Option<Collection<S, (P, usize, usize)>> {
1037        Some(prefixes.map(move |(prefix, old_count, old_index)| {
1038            if 1 < old_count {
1039                (prefix.clone(), 1, index)
1040            } else {
1041                (prefix.clone(), old_count, old_index)
1042            }
1043        }))
1044    }
1045
1046    fn propose(&mut self, prefixes: &Collection<S, P>) -> Collection<S, (P, V)> {
1047        let value = self.value.clone();
1048        prefixes.map(move |prefix| (prefix.clone(), value.clone()))
1049    }
1050
1051    fn validate(&mut self, extensions: &Collection<S, (P, V)>) -> Collection<S, (P, V)> {
1052        let target = self.value.clone();
1053        extensions.filter(move |(_prefix, extension)| *extension == target)
1054    }
1055}
1056
1057struct BinaryPredicateExtender<P, V>
1058where
1059    V: ExchangeData + Hash,
1060{
1061    phantom: std::marker::PhantomData<(P, V)>,
1062    predicate: BinaryPredicate,
1063    direction: Direction,
1064}
1065
1066impl<'a, S, V, P> PrefixExtender<S> for BinaryPredicateExtender<P, V>
1067where
1068    S: Scope,
1069    S::Timestamp: Lattice + ExchangeData,
1070    V: ExchangeData + Hash,
1071    P: ExchangeData + IndexNode<V>,
1072{
1073    type Prefix = P;
1074    type Extension = V;
1075
1076    fn count(
1077        &mut self,
1078        _prefixes: &Collection<S, (P, usize, usize)>,
1079        _index: usize,
1080    ) -> Option<Collection<S, (P, usize, usize)>> {
1081        None
1082    }
1083
1084    fn propose(&mut self, prefixes: &Collection<S, P>) -> Collection<S, (P, V)> {
1085        prefixes.map(|_prefix| panic!("BinaryPredicateExtender should never be asked to propose."))
1086    }
1087
1088    fn validate(&mut self, extensions: &Collection<S, (P, V)>) -> Collection<S, (P, V)> {
1089        use self::BinaryPredicate::{EQ, GT, GTE, LT, LTE, NEQ};
1090        match self.direction {
1091            Direction::Reverse(offset) => {
1092                match self.predicate {
1093                    LT => extensions
1094                        .filter(move |(prefix, extension)| *extension > prefix.index(offset)),
1095                    LTE => extensions
1096                        .filter(move |(prefix, extension)| *extension >= prefix.index(offset)),
1097                    GT => extensions
1098                        .filter(move |(prefix, extension)| *extension < prefix.index(offset)),
1099                    GTE => extensions
1100                        .filter(move |(prefix, extension)| *extension <= prefix.index(offset)),
1101                    EQ => extensions
1102                        .filter(move |(prefix, extension)| *extension == prefix.index(offset)),
1103                    NEQ => extensions
1104                        .filter(move |(prefix, extension)| *extension != prefix.index(offset)),
1105                }
1106            }
1107            Direction::Forward(offset) => {
1108                match self.predicate {
1109                    LT => extensions
1110                        .filter(move |(prefix, extension)| *extension < prefix.index(offset)),
1111                    LTE => extensions
1112                        .filter(move |(prefix, extension)| *extension <= prefix.index(offset)),
1113                    GT => extensions
1114                        .filter(move |(prefix, extension)| *extension > prefix.index(offset)),
1115                    GTE => extensions
1116                        .filter(move |(prefix, extension)| *extension >= prefix.index(offset)),
1117                    EQ => extensions
1118                        .filter(move |(prefix, extension)| *extension == prefix.index(offset)),
1119                    NEQ => extensions
1120                        .filter(move |(prefix, extension)| *extension != prefix.index(offset)),
1121                }
1122            }
1123        }
1124    }
1125}
1126
1127struct CollectionExtender<S, K, V, P, F, TrCount, TrPropose, TrValidate>
1128where
1129    S: Scope,
1130    S::Timestamp: Lattice + ExchangeData,
1131    K: ExchangeData,
1132    V: ExchangeData,
1133    F: Fn(&P) -> K,
1134    TrCount: TraceReader<Key = K, Val = (), Time = S::Timestamp, R = isize> + Clone + 'static,
1135    TrCount::Batch: BatchReader<TrCount::Key, TrCount::Val, S::Timestamp, TrCount::R> + 'static,
1136    TrCount::Cursor: Cursor<TrCount::Key, TrCount::Val, S::Timestamp, TrCount::R> + 'static,
1137    TrPropose: TraceReader<Key = K, Val = V, Time = S::Timestamp, R = isize> + Clone + 'static,
1138    TrPropose::Batch:
1139        BatchReader<TrPropose::Key, TrPropose::Val, S::Timestamp, TrPropose::R> + 'static,
1140    TrPropose::Cursor: Cursor<TrPropose::Key, TrPropose::Val, S::Timestamp, TrPropose::R> + 'static,
1141    TrValidate:
1142        TraceReader<Key = (K, V), Val = (), Time = S::Timestamp, R = isize> + Clone + 'static,
1143    TrValidate::Batch:
1144        BatchReader<TrValidate::Key, TrValidate::Val, S::Timestamp, TrValidate::R> + 'static,
1145    TrValidate::Cursor:
1146        Cursor<TrValidate::Key, TrValidate::Val, S::Timestamp, TrValidate::R> + 'static,
1147{
1148    phantom: std::marker::PhantomData<P>,
1149    count: Arranged<S, TrCount>,
1150    propose: Arranged<S, TrPropose>,
1151    validate: Arranged<S, TrValidate>,
1152    key_selector: Rc<F>,
1153}
1154
1155impl<'a, S, K, V, P, F, TrCount, TrPropose, TrValidate> PrefixExtender<S>
1156    for CollectionExtender<S, K, V, P, F, TrCount, TrPropose, TrValidate>
1157where
1158    S: Scope,
1159    S::Timestamp: Lattice + ExchangeData,
1160    K: ExchangeData + Hash,
1161    V: ExchangeData + Hash,
1162    P: ExchangeData,
1163    F: Fn(&P) -> K + 'static,
1164    TrCount: TraceReader<Key = K, Val = (), Time = S::Timestamp, R = isize> + Clone + 'static,
1165    TrCount::Batch: BatchReader<TrCount::Key, TrCount::Val, S::Timestamp, TrCount::R> + 'static,
1166    TrCount::Cursor: Cursor<TrCount::Key, TrCount::Val, S::Timestamp, TrCount::R> + 'static,
1167    TrPropose: TraceReader<Key = K, Val = V, Time = S::Timestamp, R = isize> + Clone + 'static,
1168    TrPropose::Batch:
1169        BatchReader<TrPropose::Key, TrPropose::Val, S::Timestamp, TrPropose::R> + 'static,
1170    TrPropose::Cursor: Cursor<TrPropose::Key, TrPropose::Val, S::Timestamp, TrPropose::R> + 'static,
1171    TrValidate:
1172        TraceReader<Key = (K, V), Val = (), Time = S::Timestamp, R = isize> + Clone + 'static,
1173    TrValidate::Batch:
1174        BatchReader<TrValidate::Key, TrValidate::Val, S::Timestamp, TrValidate::R> + 'static,
1175    TrValidate::Cursor:
1176        Cursor<TrValidate::Key, TrValidate::Val, S::Timestamp, TrValidate::R> + 'static,
1177{
1178    type Prefix = P;
1179    type Extension = V;
1180
1181    fn count(
1182        &mut self,
1183        prefixes: &Collection<S, (P, usize, usize)>,
1184        index: usize,
1185    ) -> Option<Collection<S, (P, usize, usize)>> {
1186        // This method takes a stream of `(prefix, time, diff)`
1187        // changes, and we want to produce the corresponding stream of
1188        // `((prefix, count), time, diff)` changes, just by looking up
1189        // `count` in `count_trace`. We are just doing a stream of
1190        // changes and a stream of look-ups, no consolidation or any
1191        // funny business like that. We *could* organize the input
1192        // differences by key and save some time, or we could skip
1193        // that.
1194
1195        let counts = &self.count;
1196        let mut counts_trace = Some(counts.trace.clone());
1197
1198        let mut stash = HashMap::new();
1199        let logic1 = self.key_selector.clone();
1200        let logic2 = self.key_selector.clone();
1201
1202        let exchange = Exchange::new(move |update: &((P, usize, usize), S::Timestamp, isize)| {
1203            logic1(&(update.0).0).hashed().as_u64()
1204        });
1205
1206        let mut buffer1 = Vec::new();
1207        let mut buffer2 = Vec::new();
1208
1209        // TODO: This should be a custom operator with no connection from the second input to the output.
1210        Some(
1211            prefixes
1212                .inner
1213                .binary_frontier(&counts.stream, exchange, Pipeline, "Count", move |_, _| {
1214                    move |input1, input2, output| {
1215                        // drain the first input, stashing requests.
1216                        input1.for_each(|capability, data| {
1217                            data.swap(&mut buffer1);
1218                            stash
1219                                .entry(capability.retain())
1220                                .or_insert_with(Vec::new)
1221                                .extend(buffer1.drain(..))
1222                        });
1223
1224                        // advance the `distinguish_since` frontier to allow all merges.
1225                        input2.for_each(|_, batches| {
1226                            batches.swap(&mut buffer2);
1227                            for batch in buffer2.drain(..) {
1228                                if let Some(ref mut trace) = counts_trace {
1229                                    trace.distinguish_since(batch.upper());
1230                                }
1231                            }
1232                        });
1233
1234                        if let Some(ref mut trace) = counts_trace {
1235                            for (capability, prefixes) in stash.iter_mut() {
1236                                // defer requests at incomplete times.
1237                                // NOTE: not all updates may be at complete times, but if this test fails then none of them are.
1238                                if !input2.frontier.less_equal(capability.time()) {
1239                                    let mut session = output.session(capability);
1240
1241                                    // sort requests for in-order cursor traversal. could consolidate?
1242                                    prefixes
1243                                        .sort_by(|x, y| logic2(&(x.0).0).cmp(&logic2(&(y.0).0)));
1244
1245                                    let (mut cursor, storage) = trace.cursor();
1246
1247                                    for &mut (
1248                                        (ref prefix, old_count, old_index),
1249                                        ref time,
1250                                        ref mut diff,
1251                                    ) in prefixes.iter_mut()
1252                                    {
1253                                        if !input2.frontier.less_equal(time) {
1254                                            let key = logic2(prefix);
1255                                            cursor.seek_key(&storage, &key);
1256                                            if cursor.get_key(&storage) == Some(&key) {
1257                                                let mut count = 0;
1258                                                cursor.map_times(&storage, |t, d| {
1259                                                    if t.less_equal(time) {
1260                                                        count += d;
1261                                                    }
1262                                                });
1263                                                // assert!(count >= 0);
1264                                                let count = count as usize;
1265                                                if count > 0 {
1266                                                    if count < old_count {
1267                                                        session.give((
1268                                                            (prefix.clone(), count, index),
1269                                                            time.clone(),
1270                                                            *diff,
1271                                                        ));
1272                                                    } else {
1273                                                        session.give((
1274                                                            (prefix.clone(), old_count, old_index),
1275                                                            time.clone(),
1276                                                            *diff,
1277                                                        ));
1278                                                    }
1279                                                }
1280                                            }
1281                                            *diff = 0;
1282                                        }
1283                                    }
1284
1285                                    prefixes.retain(|ptd| ptd.2 != 0);
1286                                }
1287                            }
1288                        }
1289
1290                        // drop fully processed capabilities.
1291                        stash.retain(|_, prefixes| !prefixes.is_empty());
1292
1293                        // advance the consolidation frontier (TODO: wierd lexicographic times!)
1294                        if let Some(trace) = counts_trace.as_mut() {
1295                            trace.advance_by(&input1.frontier().frontier());
1296                        }
1297
1298                        if input1.frontier().is_empty() && stash.is_empty() {
1299                            counts_trace = None;
1300                        }
1301                    }
1302                })
1303                .as_collection(),
1304        )
1305    }
1306
1307    fn propose(&mut self, prefixes: &Collection<S, P>) -> Collection<S, (P, V)> {
1308        let propose = &self.propose;
1309        let mut propose_trace = Some(propose.trace.clone());
1310
1311        let mut stash = HashMap::new();
1312        let logic1 = self.key_selector.clone();
1313        let logic2 = self.key_selector.clone();
1314
1315        let mut buffer1 = Vec::new();
1316        let mut buffer2 = Vec::new();
1317
1318        let exchange = Exchange::new(move |update: &(P, S::Timestamp, isize)| {
1319            logic1(&update.0).hashed().as_u64()
1320        });
1321
1322        prefixes
1323            .inner
1324            .binary_frontier(
1325                &propose.stream,
1326                exchange,
1327                Pipeline,
1328                "Propose",
1329                move |_, _| {
1330                    move |input1, input2, output| {
1331                        // drain the first input, stashing requests.
1332                        input1.for_each(|capability, data| {
1333                            data.swap(&mut buffer1);
1334                            stash
1335                                .entry(capability.retain())
1336                                .or_insert_with(Vec::new)
1337                                .extend(buffer1.drain(..))
1338                        });
1339
1340                        // advance the `distinguish_since` frontier to allow all merges.
1341                        input2.for_each(|_, batches| {
1342                            batches.swap(&mut buffer2);
1343                            for batch in buffer2.drain(..) {
1344                                if let Some(ref mut trace) = propose_trace {
1345                                    trace.distinguish_since(batch.upper());
1346                                }
1347                            }
1348                        });
1349
1350                        if let Some(ref mut trace) = propose_trace {
1351                            for (capability, prefixes) in stash.iter_mut() {
1352                                // defer requests at incomplete times.
1353                                // NOTE: not all updates may be at complete times, but if this test fails then none of them are.
1354                                if !input2.frontier.less_equal(capability.time()) {
1355                                    let mut session = output.session(capability);
1356
1357                                    // sort requests for in-order cursor traversal. could consolidate?
1358                                    prefixes.sort_by(|x, y| logic2(&x.0).cmp(&logic2(&y.0)));
1359
1360                                    let (mut cursor, storage) = trace.cursor();
1361
1362                                    for &mut (ref prefix, ref time, ref mut diff) in
1363                                        prefixes.iter_mut()
1364                                    {
1365                                        if !input2.frontier.less_equal(time) {
1366                                            let key = logic2(prefix);
1367                                            cursor.seek_key(&storage, &key);
1368                                            if cursor.get_key(&storage) == Some(&key) {
1369                                                while let Some(value) = cursor.get_val(&storage) {
1370                                                    let mut count = 0;
1371                                                    cursor.map_times(&storage, |t, d| {
1372                                                        if t.less_equal(time) {
1373                                                            count += d;
1374                                                        }
1375                                                    });
1376                                                    // assert!(count >= 0);
1377                                                    if count > 0 {
1378                                                        session.give((
1379                                                            (prefix.clone(), value.clone()),
1380                                                            time.clone(),
1381                                                            *diff,
1382                                                        ));
1383                                                    }
1384                                                    cursor.step_val(&storage);
1385                                                }
1386                                                cursor.rewind_vals(&storage);
1387                                            }
1388                                            *diff = 0;
1389                                        }
1390                                    }
1391
1392                                    prefixes.retain(|ptd| ptd.2 != 0);
1393                                }
1394                            }
1395                        }
1396
1397                        // drop fully processed capabilities.
1398                        stash.retain(|_, prefixes| !prefixes.is_empty());
1399
1400                        // advance the consolidation frontier (TODO: wierd lexicographic times!)
1401                        if let Some(trace) = propose_trace.as_mut() {
1402                            trace.advance_by(&input1.frontier().frontier());
1403                        }
1404
1405                        if input1.frontier().is_empty() && stash.is_empty() {
1406                            propose_trace = None;
1407                        }
1408                    }
1409                },
1410            )
1411            .as_collection()
1412    }
1413
1414    fn validate(&mut self, extensions: &Collection<S, (P, V)>) -> Collection<S, (P, V)> {
1415        // This method takes a stream of `(prefix, time, diff)` changes, and we want to produce the corresponding
1416        // stream of `((prefix, count), time, diff)` changes, just by looking up `count` in `count_trace`. We are
1417        // just doing a stream of changes and a stream of look-ups, no consolidation or any funny business like
1418        // that. We *could* organize the input differences by key and save some time, or we could skip that.
1419
1420        let validate = &self.validate;
1421        let mut validate_trace = Some(validate.trace.clone());
1422
1423        let mut stash = HashMap::new();
1424        let logic1 = self.key_selector.clone();
1425        let logic2 = self.key_selector.clone();
1426
1427        let mut buffer1 = Vec::new();
1428        let mut buffer2 = Vec::new();
1429
1430        let exchange = Exchange::new(move |update: &((P, V), S::Timestamp, isize)| {
1431            (logic1(&(update.0).0).clone(), ((update.0).1).clone())
1432                .hashed()
1433                .as_u64()
1434        });
1435
1436        extensions
1437            .inner
1438            .binary_frontier(
1439                &validate.stream,
1440                exchange,
1441                Pipeline,
1442                "Validate",
1443                move |_, _| {
1444                    move |input1, input2, output| {
1445                        // drain the first input, stashing requests.
1446                        input1.for_each(|capability, data| {
1447                            data.swap(&mut buffer1);
1448                            stash
1449                                .entry(capability.retain())
1450                                .or_insert_with(Vec::new)
1451                                .extend(buffer1.drain(..))
1452                        });
1453
1454                        // advance the `distinguish_since` frontier to allow all merges.
1455                        input2.for_each(|_, batches| {
1456                            batches.swap(&mut buffer2);
1457                            for batch in buffer2.drain(..) {
1458                                if let Some(ref mut trace) = validate_trace {
1459                                    trace.distinguish_since(batch.upper());
1460                                }
1461                            }
1462                        });
1463
1464                        if let Some(ref mut trace) = validate_trace {
1465                            for (capability, prefixes) in stash.iter_mut() {
1466                                // defer requests at incomplete times.
1467                                // NOTE: not all updates may be at complete times, but if this test fails then none of them are.
1468                                if !input2.frontier.less_equal(capability.time()) {
1469                                    let mut session = output.session(capability);
1470
1471                                    // sort requests for in-order cursor traversal. could consolidate?
1472                                    prefixes.sort_by(|x, y| {
1473                                        (logic2(&(x.0).0), &((x.0).1))
1474                                            .cmp(&(logic2(&(y.0).0), &((y.0).1)))
1475                                    });
1476
1477                                    let (mut cursor, storage) = trace.cursor();
1478
1479                                    for &mut (ref prefix, ref time, ref mut diff) in
1480                                        prefixes.iter_mut()
1481                                    {
1482                                        if !input2.frontier.less_equal(time) {
1483                                            let key = (logic2(&prefix.0), (prefix.1).clone());
1484                                            cursor.seek_key(&storage, &key);
1485                                            if cursor.get_key(&storage) == Some(&key) {
1486                                                let mut count = 0;
1487                                                cursor.map_times(&storage, |t, d| {
1488                                                    if t.less_equal(time) {
1489                                                        count += d;
1490                                                    }
1491                                                });
1492                                                // assert!(count >= 0);
1493                                                if count > 0 {
1494                                                    session.give((
1495                                                        prefix.clone(),
1496                                                        time.clone(),
1497                                                        *diff,
1498                                                    ));
1499                                                }
1500                                            }
1501                                            *diff = 0;
1502                                        }
1503                                    }
1504
1505                                    prefixes.retain(|ptd| ptd.2 != 0);
1506                                }
1507                            }
1508                        }
1509
1510                        // drop fully processed capabilities.
1511                        stash.retain(|_, prefixes| !prefixes.is_empty());
1512
1513                        // advance the consolidation frontier (TODO: wierd lexicographic times!)
1514                        if let Some(trace) = validate_trace.as_mut() {
1515                            trace.advance_by(&input1.frontier().frontier());
1516                        }
1517
1518                        if input1.frontier().is_empty() && stash.is_empty() {
1519                            validate_trace = None;
1520                        }
1521                    }
1522                },
1523            )
1524            .as_collection()
1525    }
1526}
1527
1528struct AntijoinExtender<'a, S, V, P>
1529where
1530    S: Scope,
1531    S::Timestamp: Lattice + ExchangeData,
1532    V: ExchangeData,
1533{
1534    phantom: std::marker::PhantomData<P>,
1535    extender: Extender<'a, S, P, V>,
1536}
1537
1538impl<'a, S, V, P> PrefixExtender<S> for AntijoinExtender<'a, S, V, P>
1539where
1540    S: Scope,
1541    S::Timestamp: Lattice + ExchangeData,
1542    V: ExchangeData + Hash,
1543    P: ExchangeData,
1544{
1545    type Prefix = P;
1546    type Extension = V;
1547
1548    fn count(
1549        &mut self,
1550        _prefixes: &Collection<S, (P, usize, usize)>,
1551        _index: usize,
1552    ) -> Option<Collection<S, (P, usize, usize)>> {
1553        None
1554    }
1555
1556    fn propose(&mut self, prefixes: &Collection<S, P>) -> Collection<S, (P, V)> {
1557        prefixes.map(|_prefix| panic!("AntijoinExtender should never be asked to propose."))
1558    }
1559
1560    fn validate(&mut self, extensions: &Collection<S, (P, V)>) -> Collection<S, (P, V)> {
1561        extensions.concat(&self.extender.validate(extensions).negate())
1562    }
1563}