1use timely::dataflow::operators::{Enter, Map};
21use timely::order::PartialOrder;
22use timely::dataflow::{Scope, Stream, StreamCore};
23use timely::dataflow::operators::generic::Operator;
24use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange};
25use timely::progress::Timestamp;
26use timely::progress::Antichain;
27use timely::dataflow::operators::Capability;
28
29use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
30use crate::difference::Semigroup;
31use crate::lattice::Lattice;
32use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
33use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine};
34
35use trace::wrappers::enter::{TraceEnter, BatchEnter,};
36use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
37use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
38use trace::wrappers::filter::{TraceFilter, BatchFilter};
39
40use super::TraceAgent;
41
42pub struct Arranged<G, Tr>
47where
48 G: Scope<Timestamp: Lattice+Ord>,
49 Tr: TraceReader+Clone,
50{
51 pub stream: Stream<G, Tr::Batch>,
57 pub trace: Tr,
59 }
62
63impl<G, Tr> Clone for Arranged<G, Tr>
64where
65 G: Scope<Timestamp=Tr::Time>,
66 Tr: TraceReader + Clone,
67{
68 fn clone(&self) -> Self {
69 Arranged {
70 stream: self.stream.clone(),
71 trace: self.trace.clone(),
72 }
73 }
74}
75
76use ::timely::dataflow::scopes::Child;
77use ::timely::progress::timestamp::Refines;
78use timely::Container;
79use timely::container::PushInto;
80
81impl<G, Tr> Arranged<G, Tr>
82where
83 G: Scope<Timestamp=Tr::Time>,
84 Tr: TraceReader + Clone,
85{
86 pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>)
92 -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
93 where
94 TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone,
95 {
96 Arranged {
97 stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
98 trace: TraceEnter::make_from(self.trace.clone()),
99 }
100 }
101
102 pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>)
107 -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
108 Arranged {
109 stream: self.stream.enter(child),
110 trace: self.trace.clone(),
111 }
112 }
113
114 pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P)
120 -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
121 where
122 TInner: Refines<G::Timestamp>+Lattice+Timestamp+Clone+'static,
123 F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>)->TInner+Clone+'static,
124 P: FnMut(&TInner)->Tr::Time+Clone+'static,
125 {
126 let logic1 = logic.clone();
127 let logic2 = logic.clone();
128 Arranged {
129 trace: TraceEnterAt::make_from(self.trace.clone(), logic1, prior),
130 stream: self.stream.enter(child).map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
131 }
132 }
133
134 pub fn filter<F>(&self, logic: F)
161 -> Arranged<G, TraceFilter<Tr, F>>
162 where
163 F: FnMut(Tr::Key<'_>, Tr::Val<'_>)->bool+Clone+'static,
164 {
165 let logic1 = logic.clone();
166 let logic2 = logic.clone();
167 Arranged {
168 trace: TraceFilter::make_from(self.trace.clone(), logic1),
169 stream: self.stream.map(move |bw| BatchFilter::make_from(bw, logic2.clone())),
170 }
171 }
172 pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::Diff>
178 where
179 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D+'static,
180 {
181 self.flat_map_ref(move |key, val| Some(logic(key,val)))
182 }
183
184 pub fn flat_map_ref<I, L>(&self, logic: L) -> Collection<G, I::Item, Tr::Diff>
189 where
190 I: IntoIterator<Item: Data>,
191 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
192 {
193 Self::flat_map_batches(&self.stream, logic)
194 }
195
196 pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::Diff>
204 where
205 I: IntoIterator<Item: Data>,
206 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I+'static,
207 {
208 stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
209 input.for_each(|time, data| {
210 let mut session = output.session(&time);
211 for wrapper in data.iter() {
212 let batch = &wrapper;
213 let mut cursor = batch.cursor();
214 while let Some(key) = cursor.get_key(batch) {
215 while let Some(val) = cursor.get_val(batch) {
216 for datum in logic(key, val) {
217 cursor.map_times(batch, |time, diff| {
218 session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
219 });
220 }
221 cursor.step_val(batch);
222 }
223 cursor.step_key(batch);
224 }
225 }
226 });
227 })
228 .as_collection()
229 }
230}
231
232
233use crate::difference::Multiply;
234impl<G, T1> Arranged<G, T1>
236where
237 G: Scope<Timestamp=T1::Time>,
238 T1: TraceReader + Clone + 'static,
239{
240 pub fn join_core<T2,I,L>(&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,I::Item,<T1::Diff as Multiply<T2::Diff>>::Output>
242 where
243 T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=T1::Time>+Clone+'static,
244 T1::Diff: Multiply<T2::Diff, Output: Semigroup+'static>,
245 I: IntoIterator<Item: Data>,
246 L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>)->I+'static
247 {
248 let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: T2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &T2::Diff| {
249 let t = t.clone();
250 let r = (r1.clone()).multiply(r2);
251 result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
252 };
253 self.join_core_internal_unsafe(other, result)
254 }
255 pub fn join_core_internal_unsafe<T2,I,L,D,ROut> (&self, other: &Arranged<G,T2>, mut result: L) -> Collection<G,D,ROut>
257 where
258 T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=T1::Time>+Clone+'static,
259 D: Data,
260 ROut: Semigroup+'static,
261 I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
262 L: FnMut(T1::Key<'_>, T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static,
263 {
264 use crate::operators::join::join_traces;
265 join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
266 self,
267 other,
268 move |k, v1, v2, t, d1, d2, c| {
269 for datum in result(k, v1, v2, t, d1, d2) {
270 c.give(datum);
271 }
272 }
273 )
274 .as_collection()
275 }
276}
277
278use crate::difference::Abelian;
280impl<G, T1> Arranged<G, T1>
281where
282 G: Scope<Timestamp = T1::Time>,
283 T1: TraceReader + Clone + 'static,
284{
285 pub fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
287 where
288 T1: TraceReader<KeyOwn: Ord>,
289 T2: for<'a> Trace<
290 Key<'a>= T1::Key<'a>,
291 KeyOwn=T1::KeyOwn,
292 ValOwn: Data,
293 Time=T1::Time,
294 Diff: Abelian,
295 >+'static,
296 Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
297 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
298 {
299 self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
300 if !input.is_empty() {
301 logic(key, input, change);
302 }
303 change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
304 crate::consolidation::consolidate(change);
305 })
306 }
307
308 pub fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
310 where
311 T1: TraceReader<KeyOwn: Ord>,
312 T2: for<'a> Trace<
313 Key<'a>=T1::Key<'a>,
314 KeyOwn=T1::KeyOwn,
315 ValOwn: Data,
316 Time=T1::Time,
317 >+'static,
318 Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
319 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
320 {
321 use crate::operators::reduce::reduce_trace;
322 reduce_trace::<_,_,Bu,_,_>(self, name, logic)
323 }
324}
325
326
327impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
328where
329 G: Scope<Timestamp=Tr::Time>,
330 Tr: TraceReader + Clone,
331{
332 pub fn leave_region(&self) -> Arranged<G, Tr> {
337 use timely::dataflow::operators::Leave;
338 Arranged {
339 stream: self.stream.leave(),
340 trace: self.trace.clone(),
341 }
342 }
343}
344
345pub trait Arrange<G, C>
347where
348 G: Scope<Timestamp: Lattice>,
349{
350 fn arrange<Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
352 where
353 Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
354 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
355 Tr: Trace<Time=G::Timestamp> + 'static,
356 {
357 self.arrange_named::<Ba, Bu, Tr>("Arrange")
358 }
359
360 fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
362 where
363 Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
364 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
365 Tr: Trace<Time=G::Timestamp> + 'static,
366 ;
367}
368
369impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
370where
371 G: Scope<Timestamp: Lattice>,
372 K: ExchangeData + Hashable,
373 V: ExchangeData,
374 R: ExchangeData + Semigroup,
375{
376 fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
377 where
378 Ba: Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp> + 'static,
379 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
380 Tr: Trace<Time=G::Timestamp> + 'static,
381 {
382 let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
383 arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name)
384 }
385}
386
/// Arranges a stream of update containers into a shared trace.
///
/// Input is routed by `pact`, accumulated in a `Ba` batcher and, as the input
/// frontier advances, sealed into batches built by `Bu`. Each sealed batch is
/// inserted into the trace and also sent on the returned stream. The returned
/// `Arranged` pairs that batch stream with a `TraceAgent` reader for the trace.
///
/// # Panics
///
/// Panics if the input frontier regresses, or if no held capability can be
/// downgraded to a time in the batcher's frontier.
pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope<Timestamp: Lattice>,
    P: ParallelizationContract<G::Timestamp, Ba::Input>,
    Ba: Batcher<Time=G::Timestamp,Input: Container + Clone + 'static> + 'static,
    Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
    Tr: Trace<Time=G::Timestamp>+'static,
{
    // The operator constructor below fills this slot with the trace reader. It is
    // unwrapped at the end, which assumes `unary_frontier` runs the constructor
    // before it returns.
    let mut reader: Option<TraceAgent<Tr>> = None;

    let reader_ref = &mut reader;
    let scope = stream.scope();

    let stream = stream.unary_frontier(pact, name, move |_capability, info| {

        let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

        // Accumulates incoming updates until they are sealed into batches.
        let mut batcher = Ba::new(logger.clone(), info.global_id);

        // Capabilities retained from the input. After each round of sealing they are
        // downgraded to the times in `batcher.frontier()`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

        let activator = Some(scope.activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
        // Apply a scope-wide default exertion logic, if one is configured.
        if let Some(exert_logic) = scope.config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
            empty_trace.set_exert_logic(exert_logic);
        }

        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);

        *reader_ref = Some(reader_local);

        // The input frontier as observed at the end of the previous frontier change.
        let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

        move |input, output| {

            // Retain each input capability and hand its data to the batcher.
            input.for_each(|cap, data| {
                capabilities.insert(cap.retain());
                batcher.push_container(data);
            });

            // The input frontier must never move backwards.
            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));

            // Only do work when the input frontier has changed.
            if prev_frontier.borrow() != input.frontier().frontier() {
                // Some retained capability is no longer in advance of the frontier, so
                // updates at or beyond it may now be complete and can be sealed.
                if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {

                    let mut upper = Antichain::new();
                    // Seal one batch per capability that the frontier has passed.
                    for (index, capability) in capabilities.elements().iter().enumerate() {

                        if !input.frontier().less_equal(capability.time()) {

                            // The batch upper is the union of the input frontier and the
                            // times of the capabilities after this one. Updates at those
                            // later times are left for their own batches.
                            upper.clear();
                            for time in input.frontier().frontier().iter() {
                                upper.insert(time.clone());
                            }
                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
                                upper.insert(other_capability.time().clone());
                            }

                            // Extract the updates not in advance of `upper`.
                            let batch = batcher.seal::<Bu>(upper.clone());

                            writer.insert(batch.clone(), Some(capability.time().clone()));

                            // Send the same batch to downstream consumers under this capability.
                            output.session(&capabilities.elements()[index]).give(batch);
                        }
                    }

                    // Downgrade the retained capabilities to the batcher's remaining frontier.
                    // Every remaining time must be covered by some held capability.
                    let mut new_capabilities = Antichain::new();
                    for time in batcher.frontier().iter() {
                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                            new_capabilities.insert(capability.delayed(time));
                        }
                        else {
                            panic!("failed to find capability");
                        }
                    }

                    capabilities = new_capabilities;
                }
                else {
                    // No capability was passed by the frontier, so nothing is sent
                    // downstream. The batcher is still sealed (result discarded;
                    // presumably empty) and the trace's upper advances to the new frontier.
                    let _batch = batcher.seal::<Bu>(input.frontier().frontier().to_owned());
                    writer.seal(input.frontier().frontier().to_owned());
                }

                prev_frontier.clear();
                prev_frontier.extend(input.frontier().frontier().iter().cloned());
            }

            // Called on every activation, whether or not the frontier changed.
            writer.exert();
        }
    });

    Arranged { stream, trace: reader.unwrap() }
}
545
546impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for Collection<G, K, R>
547where
548 G: Scope<Timestamp: Lattice+Ord>,
549{
550 fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
551 where
552 Ba: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
553 Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
554 Tr: Trace<Time=G::Timestamp> + 'static,
555 {
556 let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
557 arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name)
558 }
559}
560
561pub trait ArrangeByKey<G: Scope, K: Data+Hashable, V: Data, R: Ord+Semigroup+'static>
567where
568 G: Scope<Timestamp: Lattice+Ord>,
569{
570 fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
576
577 fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
579}
580
581impl<G, K: ExchangeData+Hashable, V: ExchangeData, R: ExchangeData+Semigroup> ArrangeByKey<G, K, V, R> for Collection<G, (K,V), R>
582where
583 G: Scope<Timestamp: Lattice+Ord>,
584{
585 fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
586 self.arrange_by_key_named("ArrangeByKey")
587 }
588
589 fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
590 self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
591 }
592}
593
594pub trait ArrangeBySelf<G, K: Data+Hashable, R: Ord+Semigroup+'static>
600where
601 G: Scope<Timestamp: Lattice+Ord>,
602{
603 fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
609
610 fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
612}
613
614
615impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ArrangeBySelf<G, K, R> for Collection<G, K, R>
616where
617 G: Scope<Timestamp: Lattice+Ord>,
618{
619 fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
620 self.arrange_by_self_named("ArrangeBySelf")
621 }
622
623 fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
624 self.map(|k| (k, ()))
625 .arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
626 }
627}