1use std::hash::Hash;
2
3use timely::dataflow::Scope;
4use timely::progress::Timestamp;
5use timely::dataflow::operators::vec::Partition;
6use timely::dataflow::operators::Concatenate;
7
8use differential_dataflow::{ExchangeData, VecCollection, AsCollection};
9use differential_dataflow::difference::{Monoid, Multiply};
10use differential_dataflow::lattice::Lattice;
11use differential_dataflow::operators::arrange::TraceAgent;
12
13pub mod altneu;
14pub mod calculus;
15pub mod operators;
16
17pub trait PrefixExtender<G: Scope, R: Monoid+Multiply<Output = R>> {
24 type Prefix;
26 type Extension;
28 fn count(&mut self, prefixes: VecCollection<G, (Self::Prefix, usize, usize), R>, index: usize) -> VecCollection<G, (Self::Prefix, usize, usize), R>;
30 fn propose(&mut self, prefixes: VecCollection<G, Self::Prefix, R>) -> VecCollection<G, (Self::Prefix, Self::Extension), R>;
32 fn validate(&mut self, extensions: VecCollection<G, (Self::Prefix, Self::Extension), R>) -> VecCollection<G, (Self::Prefix, Self::Extension), R>;
34}
35
36pub trait ProposeExtensionMethod<G: Scope, P: ExchangeData+Ord, R: Monoid+Multiply<Output = R>> {
37 fn propose_using<PE: PrefixExtender<G, R, Prefix=P>>(self, extender: &mut PE) -> VecCollection<G, (P, PE::Extension), R>;
38 fn extend<E: ExchangeData+Ord>(self, extenders: &mut [&mut dyn PrefixExtender<G,R,Prefix=P,Extension=E>]) -> VecCollection<G, (P, E), R>;
39}
40
41impl<G, P, R> ProposeExtensionMethod<G, P, R> for VecCollection<G, P, R>
42where
43 G: Scope,
44 P: ExchangeData+Ord,
45 R: Monoid+Multiply<Output = R>+'static,
46{
47 fn propose_using<PE>(self, extender: &mut PE) -> VecCollection<G, (P, PE::Extension), R>
48 where
49 PE: PrefixExtender<G, R, Prefix=P>
50 {
51 extender.propose(self)
52 }
53 fn extend<E>(self, extenders: &mut [&mut dyn PrefixExtender<G,R,Prefix=P,Extension=E>]) -> VecCollection<G, (P, E), R>
54 where
55 E: ExchangeData+Ord
56 {
57
58 if extenders.len() == 1 {
59 extenders[0].propose(self)
60 }
61 else {
62 let mut counts = self.clone().map(|p| (p, 1 << 31, 0));
63 for (index,extender) in extenders.iter_mut().enumerate() {
64 counts = extender.count(counts, index);
65 }
66
67 let parts = counts.inner.partition(extenders.len() as u64, |((p, _, i),t,d)| (i as u64, (p,t,d)));
68
69 let mut results = Vec::new();
70 for (index, nominations) in parts.into_iter().enumerate() {
71 let mut extensions = extenders[index].propose(nominations.as_collection());
72 for other in (0..extenders.len()).filter(|&x| x != index) {
73 extensions = extenders[other].validate(extensions);
74 }
75
76 results.push(extensions.inner); }
78
79 self.scope().concatenate(results).as_collection()
80 }
81 }
82}
83
84pub trait ValidateExtensionMethod<G: Scope, R: Monoid+Multiply<Output = R>, P, E> {
85 fn validate_using<PE: PrefixExtender<G, R, Prefix=P, Extension=E>>(self, extender: &mut PE) -> VecCollection<G, (P, E), R>;
86}
87
88impl<G: Scope, R: Monoid+Multiply<Output = R>, P, E> ValidateExtensionMethod<G, R, P, E> for VecCollection<G, (P, E), R> {
89 fn validate_using<PE: PrefixExtender<G, R, Prefix=P, Extension=E>>(self, extender: &mut PE) -> VecCollection<G, (P, E), R> {
90 extender.validate(self)
91 }
92}
93
94use differential_dataflow::trace::implementations::{KeySpine, ValSpine};
96type TraceValHandle<K,V,T,R> = TraceAgent<ValSpine<K,V,T,R>>;
97type TraceKeyHandle<K,T,R> = TraceAgent<KeySpine<K,T,R>>;
98
99pub struct CollectionIndex<K, V, T, R>
100where
101 K: ExchangeData,
102 V: ExchangeData,
103 T: Lattice+ExchangeData+Timestamp,
104 R: Monoid+Multiply<Output = R>+ExchangeData,
105{
106 count_trace: TraceKeyHandle<K, T, isize>,
108
109 propose_trace: TraceValHandle<K, V, T, R>,
111
112 validate_trace: TraceKeyHandle<(K, V), T, R>,
114}
115
116impl<K, V, T, R> Clone for CollectionIndex<K, V, T, R>
117where
118 K: ExchangeData+Hash,
119 V: ExchangeData+Hash,
120 T: Lattice+ExchangeData+Timestamp,
121 R: Monoid+Multiply<Output = R>+ExchangeData,
122{
123 fn clone(&self) -> Self {
124 CollectionIndex {
125 count_trace: self.count_trace.clone(),
126 propose_trace: self.propose_trace.clone(),
127 validate_trace: self.validate_trace.clone(),
128 }
129 }
130}
131
132impl<K, V, T, R> CollectionIndex<K, V, T, R>
133where
134 K: ExchangeData+Hash,
135 V: ExchangeData+Hash,
136 T: Lattice+ExchangeData+Timestamp,
137 R: Monoid+Multiply<Output = R>+ExchangeData,
138{
139
140 pub fn index<G: Scope<Timestamp = T>>(collection: VecCollection<G, (K, V), R>) -> Self {
141 let arranged = collection.clone().arrange_by_self();
144 let counts = arranged
146 .clone()
147 .as_collection(|k,_v| k.clone())
148 .distinct()
149 .map(|(k, _v)| k)
150 .arrange_by_self()
151 .trace;
152 let propose = collection.arrange_by_key().trace;
153 let validate = arranged.trace;
154
155 CollectionIndex {
156 count_trace: counts,
157 propose_trace: propose,
158 validate_trace: validate,
159 }
160 }
161 pub fn extend_using<P, F: Fn(&P)->K+Clone>(&self, logic: F) -> CollectionExtender<K, V, T, R, P, F> {
162 CollectionExtender {
163 phantom: std::marker::PhantomData,
164 indices: self.clone(),
165 key_selector: logic,
166 }
167 }
168}
169
170pub struct CollectionExtender<K, V, T, R, P, F>
171where
172 K: ExchangeData,
173 V: ExchangeData,
174 T: Lattice+ExchangeData+Timestamp,
175 R: Monoid+Multiply<Output = R>+ExchangeData,
176 F: Fn(&P)->K+Clone,
177{
178 phantom: std::marker::PhantomData<P>,
179 indices: CollectionIndex<K, V, T, R>,
180 key_selector: F,
181}
182
183impl<G, K, V, R, P, F> PrefixExtender<G, R> for CollectionExtender<K, V, G::Timestamp, R, P, F>
184where
185 G: Scope<Timestamp: Lattice+ExchangeData+Hash>,
186 K: ExchangeData+Hash+Default,
187 V: ExchangeData+Hash+Default,
188 P: ExchangeData,
189 R: Monoid+Multiply<Output = R>+ExchangeData,
190 F: Fn(&P)->K+Clone+'static,
191{
192 type Prefix = P;
193 type Extension = V;
194
195 fn count(&mut self, prefixes: VecCollection<G, (P, usize, usize), R>, index: usize) -> VecCollection<G, (P, usize, usize), R> {
196 let counts = self.indices.count_trace.import(&prefixes.scope());
197 operators::count::count(prefixes, counts, self.key_selector.clone(), index)
198 }
199
200 fn propose(&mut self, prefixes: VecCollection<G, P, R>) -> VecCollection<G, (P, V), R> {
201 let propose = self.indices.propose_trace.import(&prefixes.scope());
202 operators::propose::propose(prefixes, propose, self.key_selector.clone())
203 }
204
205 fn validate(&mut self, extensions: VecCollection<G, (P, V), R>) -> VecCollection<G, (P, V), R> {
206 let validate = self.indices.validate_trace.import(&extensions.scope());
207 operators::validate::validate(extensions, validate, self.key_selector.clone())
208 }
209}