1use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
21use timely::dataflow::operators::generic::Operator;
22use timely::dataflow::operators::Capability;
23use timely::dataflow::operators::{Enter, Map};
24use timely::dataflow::{Scope, Stream, StreamCore};
25use timely::order::PartialOrder;
26use timely::progress::Antichain;
27use timely::progress::Timestamp;
28
29use crate::difference::Semigroup;
30use crate::lattice::Lattice;
31use crate::trace::implementations::merge_batcher::container::MergerChunk;
32use crate::trace::implementations::{
33 KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine,
34};
35use crate::trace::{self, BatchReader, Batcher, Builder, Cursor, Trace, TraceReader};
36use crate::{AsCollection, Data, ExchangeData, Hashable, VecCollection};
37
38use trace::wrappers::enter::{BatchEnter, TraceEnter};
39use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
40use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
41
42use super::TraceAgent;
43
44pub struct Arranged<G, Tr>
49where
50 G: Scope<Timestamp: Lattice + Ord>,
51 Tr: TraceReader + Clone,
52{
53 pub stream: Stream<G, Tr::Batch>,
59 pub trace: Tr,
61 }
64
65impl<G, Tr> Clone for Arranged<G, Tr>
66where
67 G: Scope<Timestamp = Tr::Time>,
68 Tr: TraceReader + Clone,
69{
70 fn clone(&self) -> Self {
71 Arranged {
72 stream: self.stream.clone(),
73 trace: self.trace.clone(),
74 }
75 }
76}
77
78use ::timely::dataflow::scopes::Child;
79use ::timely::progress::timestamp::Refines;
80use timely::container::PushInto;
81use timely::Container;
82
83impl<G, Tr> Arranged<G, Tr>
84where
85 G: Scope<Timestamp = Tr::Time>,
86 Tr: TraceReader + Clone,
87{
88 pub fn enter<'a, TInner>(
94 &self,
95 child: &Child<'a, G, TInner>,
96 ) -> Arranged<Child<'a, G, TInner>, TraceEnter<Tr, TInner>>
97 where
98 TInner: Refines<G::Timestamp> + Lattice + Timestamp + Clone,
99 {
100 Arranged {
101 stream: self.stream.enter(child).map(|bw| BatchEnter::make_from(bw)),
102 trace: TraceEnter::make_from(self.trace.clone()),
103 }
104 }
105
106 pub fn enter_region<'a>(
111 &self,
112 child: &Child<'a, G, G::Timestamp>,
113 ) -> Arranged<Child<'a, G, G::Timestamp>, Tr> {
114 Arranged {
115 stream: self.stream.enter(child),
116 trace: self.trace.clone(),
117 }
118 }
119
120 pub fn enter_at<'a, TInner, F, P>(
126 &self,
127 child: &Child<'a, G, TInner>,
128 logic: F,
129 prior: P,
130 ) -> Arranged<Child<'a, G, TInner>, TraceEnterAt<Tr, TInner, F, P>>
131 where
132 TInner: Refines<G::Timestamp> + Lattice + Timestamp + Clone + 'static,
133 F: FnMut(Tr::Key<'_>, Tr::Val<'_>, Tr::TimeGat<'_>) -> TInner + Clone + 'static,
134 P: FnMut(&TInner) -> Tr::Time + Clone + 'static,
135 {
136 let logic1 = logic.clone();
137 let logic2 = logic.clone();
138 Arranged {
139 trace: TraceEnterAt::make_from(self.trace.clone(), logic1, prior),
140 stream: self
141 .stream
142 .enter(child)
143 .map(move |bw| BatchEnterAt::make_from(bw, logic2.clone())),
144 }
145 }
146
147 pub fn as_collection<D: Data, L>(&self, mut logic: L) -> VecCollection<G, D, Tr::Diff>
153 where
154 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
155 {
156 self.flat_map_ref(move |key, val| Some(logic(key, val)))
157 }
158
159 pub fn flat_map_ref<I, L>(&self, logic: L) -> VecCollection<G, I::Item, Tr::Diff>
164 where
165 I: IntoIterator<Item: Data>,
166 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I + 'static,
167 {
168 Self::flat_map_batches(&self.stream, logic)
169 }
170
171 pub fn flat_map_batches<I, L>(
179 stream: &Stream<G, Tr::Batch>,
180 mut logic: L,
181 ) -> VecCollection<G, I::Item, Tr::Diff>
182 where
183 I: IntoIterator<Item: Data>,
184 L: FnMut(Tr::Key<'_>, Tr::Val<'_>) -> I + 'static,
185 {
186 stream
187 .unary(Pipeline, "AsCollection", move |_, _| {
188 move |input, output| {
189 input.for_each(|time, data| {
190 let mut session = output.session(&time);
191 for wrapper in data.iter() {
192 let batch = &wrapper;
193 let mut cursor = batch.cursor();
194 while let Some(key) = cursor.get_key(batch) {
195 while let Some(val) = cursor.get_val(batch) {
196 for datum in logic(key, val) {
197 cursor.map_times(batch, |time, diff| {
198 session.give((
199 datum.clone(),
200 Tr::owned_time(time),
201 Tr::owned_diff(diff),
202 ));
203 });
204 }
205 cursor.step_val(batch);
206 }
207 cursor.step_key(batch);
208 }
209 }
210 });
211 }
212 })
213 .as_collection()
214 }
215}
216
217use crate::difference::Multiply;
218impl<G, T1> Arranged<G, T1>
220where
221 G: Scope<Timestamp = T1::Time>,
222 T1: TraceReader + Clone + 'static,
223{
224 pub fn join_core<T2, I, L>(
226 &self,
227 other: &Arranged<G, T2>,
228 mut result: L,
229 ) -> VecCollection<G, I::Item, <T1::Diff as Multiply<T2::Diff>>::Output>
230 where
231 T2: for<'a> TraceReader<Key<'a> = T1::Key<'a>, Time = T1::Time> + Clone + 'static,
232 T1::Diff: Multiply<T2::Diff, Output: Semigroup + 'static>,
233 I: IntoIterator<Item: Data>,
234 L: FnMut(T1::Key<'_>, T1::Val<'_>, T2::Val<'_>) -> I + 'static,
235 {
236 let result = move |k: T1::Key<'_>,
237 v1: T1::Val<'_>,
238 v2: T2::Val<'_>,
239 t: &G::Timestamp,
240 r1: &T1::Diff,
241 r2: &T2::Diff| {
242 let t = t.clone();
243 let r = (r1.clone()).multiply(r2);
244 result(k, v1, v2)
245 .into_iter()
246 .map(move |d| (d, t.clone(), r.clone()))
247 };
248 self.join_core_internal_unsafe(other, result)
249 }
250 pub fn join_core_internal_unsafe<T2, I, L, D, ROut>(
252 &self,
253 other: &Arranged<G, T2>,
254 mut result: L,
255 ) -> VecCollection<G, D, ROut>
256 where
257 T2: for<'a> TraceReader<Key<'a> = T1::Key<'a>, Time = T1::Time> + Clone + 'static,
258 D: Data,
259 ROut: Semigroup + 'static,
260 I: IntoIterator<Item = (D, G::Timestamp, ROut)>,
261 L: FnMut(T1::Key<'_>, T1::Val<'_>, T2::Val<'_>, &G::Timestamp, &T1::Diff, &T2::Diff) -> I
262 + 'static,
263 {
264 use crate::operators::join::join_traces;
265 join_traces::<_, _, _, _, crate::consolidation::ConsolidatingContainerBuilder<_>>(
266 self,
267 other,
268 move |k, v1, v2, t, d1, d2, c| {
269 for datum in result(k, v1, v2, t, d1, d2) {
270 c.give(datum);
271 }
272 },
273 )
274 .as_collection()
275 }
276}
277
278use crate::difference::Abelian;
280impl<G, T1> Arranged<G, T1>
281where
282 G: Scope<Timestamp = T1::Time>,
283 T1: TraceReader + Clone + 'static,
284{
285 pub fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
287 where
288 T1: TraceReader<KeyOwn: Ord>,
289 T2: for<'a> Trace<
290 Key<'a> = T1::Key<'a>,
291 KeyOwn = T1::KeyOwn,
292 ValOwn: Data,
293 Time = T1::Time,
294 Diff: Abelian,
295 > + 'static,
296 Bu: Builder<
297 Time = G::Timestamp,
298 Output = T2::Batch,
299 Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
300 >,
301 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
302 + 'static,
303 {
304 self.reduce_core::<_, Bu, T2>(name, move |key, input, output, change| {
305 if !input.is_empty() {
306 logic(key, input, change);
307 }
308 change.extend(output.drain(..).map(|(x, mut d)| {
309 d.negate();
310 (x, d)
311 }));
312 crate::consolidation::consolidate(change);
313 })
314 }
315
316 pub fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
318 where
319 T1: TraceReader<KeyOwn: Ord>,
320 T2: for<'a> Trace<Key<'a> = T1::Key<'a>, KeyOwn = T1::KeyOwn, ValOwn: Data, Time = T1::Time>
321 + 'static,
322 Bu: Builder<
323 Time = G::Timestamp,
324 Output = T2::Batch,
325 Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
326 >,
327 L: FnMut(
328 T1::Key<'_>,
329 &[(T1::Val<'_>, T1::Diff)],
330 &mut Vec<(T2::ValOwn, T2::Diff)>,
331 &mut Vec<(T2::ValOwn, T2::Diff)>,
332 ) + 'static,
333 {
334 use crate::operators::reduce::reduce_trace;
335 reduce_trace::<_, _, Bu, _, _>(self, name, logic)
336 }
337}
338
339impl<'a, G, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
340where
341 G: Scope<Timestamp = Tr::Time>,
342 Tr: TraceReader + Clone,
343{
344 pub fn leave_region(&self) -> Arranged<G, Tr> {
349 use timely::dataflow::operators::Leave;
350 Arranged {
351 stream: self.stream.leave(),
352 trace: self.trace.clone(),
353 }
354 }
355}
356
357pub trait Arrange<G, C>
359where
360 G: Scope<Timestamp: Lattice>,
361{
362 fn arrange<Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
364 where
365 Ba: Batcher<Input = C, Time = G::Timestamp> + 'static,
366 Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
367 Tr: Trace<Time = G::Timestamp> + 'static,
368 {
369 self.arrange_named::<Ba, Bu, Tr>("Arrange")
370 }
371
372 fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
374 where
375 Ba: Batcher<Input = C, Time = G::Timestamp> + 'static,
376 Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
377 Tr: Trace<Time = G::Timestamp> + 'static;
378}
379
380impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for VecCollection<G, (K, V), R>
381where
382 G: Scope<Timestamp: Lattice>,
383 K: ExchangeData + Hashable,
384 V: ExchangeData,
385 R: ExchangeData + Semigroup,
386{
387 fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
388 where
389 Ba: Batcher<Input = Vec<((K, V), G::Timestamp, R)>, Time = G::Timestamp> + 'static,
390 Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
391 Tr: Trace<Time = G::Timestamp> + 'static,
392 {
393 let exchange =
394 Exchange::new(move |update: &((K, V), G::Timestamp, R)| (update.0).0.hashed().into());
395 arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name)
396 }
397}
398
399pub fn arrange_core<G, P, Ba, Bu, Tr>(
405 stream: &StreamCore<G, Ba::Input>,
406 pact: P,
407 name: &str,
408) -> Arranged<G, TraceAgent<Tr>>
409where
410 G: Scope<Timestamp: Lattice>,
411 P: ParallelizationContract<G::Timestamp, Ba::Input>,
412 Ba: Batcher<Time = G::Timestamp, Input: Container> + 'static,
413 Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
414 Tr: Trace<Time = G::Timestamp> + 'static,
415{
416 let mut reader: Option<TraceAgent<Tr>> = None;
432
433 let reader_ref = &mut reader;
435 let scope = stream.scope();
436
437 let stream = stream.unary_frontier(pact, name, move |_capability, info| {
438 let logger = scope
440 .logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange")
441 .map(Into::into);
442
443 let mut batcher = Ba::new(logger.clone(), info.global_id);
445
446 let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
448
449 let activator = Some(scope.activator_for(info.address.clone()));
450 let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
451 if let Some(exert_logic) = scope
453 .config()
454 .get::<trace::ExertionLogic>("differential/default_exert_logic")
455 .cloned()
456 {
457 empty_trace.set_exert_logic(exert_logic);
458 }
459
460 let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
461
462 *reader_ref = Some(reader_local);
463
464 let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
466
467 move |(input, frontier), output| {
468 input.for_each(|cap, data| {
473 capabilities.insert(cap.retain());
474 batcher.push_container(data);
475 });
476
477 assert!(PartialOrder::less_equal(
484 &prev_frontier.borrow(),
485 &frontier.frontier()
486 ));
487
488 if prev_frontier.borrow() != frontier.frontier() {
492 if capabilities
506 .elements()
507 .iter()
508 .any(|c| !frontier.less_equal(c.time()))
509 {
510 let mut upper = Antichain::new(); for (index, capability) in capabilities.elements().iter().enumerate() {
514 if !frontier.less_equal(capability.time()) {
515 upper.clear();
519 for time in frontier.frontier().iter() {
520 upper.insert(time.clone());
521 }
522 for other_capability in &capabilities.elements()[(index + 1)..] {
523 upper.insert(other_capability.time().clone());
524 }
525
526 let batch = batcher.seal::<Bu>(upper.clone());
528
529 writer.insert(batch.clone(), Some(capability.time().clone()));
530
531 output.session(&capabilities.elements()[index]).give(batch);
533 }
534 }
535
536 let mut new_capabilities = Antichain::new();
542 for time in batcher.frontier().iter() {
543 if let Some(capability) = capabilities
544 .elements()
545 .iter()
546 .find(|c| c.time().less_equal(time))
547 {
548 new_capabilities.insert(capability.delayed(time));
549 } else {
550 panic!("failed to find capability");
551 }
552 }
553
554 capabilities = new_capabilities;
555 } else {
556 let _batch = batcher.seal::<Bu>(frontier.frontier().to_owned());
558 writer.seal(frontier.frontier().to_owned());
559 }
560
561 prev_frontier.clear();
562 prev_frontier.extend(frontier.frontier().iter().cloned());
563 }
564
565 writer.exert();
566 }
567 });
568
569 Arranged {
570 stream,
571 trace: reader.unwrap(),
572 }
573}
574
575impl<G, K: ExchangeData + Hashable, R: ExchangeData + Semigroup>
576 Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for VecCollection<G, K, R>
577where
578 G: Scope<Timestamp: Lattice + Ord>,
579{
580 fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
581 where
582 Ba: Batcher<Input = Vec<((K, ()), G::Timestamp, R)>, Time = G::Timestamp> + 'static,
583 Bu: Builder<Time = G::Timestamp, Input = Ba::Output, Output = Tr::Batch>,
584 Tr: Trace<Time = G::Timestamp> + 'static,
585 {
586 let exchange =
587 Exchange::new(move |update: &((K, ()), G::Timestamp, R)| (update.0).0.hashed().into());
588 arrange_core::<_, _, Ba, Bu, _>(&self.map(|k| (k, ())).inner, exchange, name)
589 }
590}
591
592pub trait ArrangeByKey<G: Scope, K: Data + Hashable, V: Data, R: Ord + Semigroup + 'static>
598where
599 G: Scope<Timestamp: Lattice + Ord>,
600{
601 fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
607
608 fn arrange_by_key_named(
610 &self,
611 name: &str,
612 ) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>;
613}
614
615impl<G, K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData + Semigroup>
616 ArrangeByKey<G, K, V, R> for VecCollection<G, (K, V), R>
617where
618 G: Scope<Timestamp: Lattice + Ord>,
619{
620 fn arrange_by_key(&self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
621 self.arrange_by_key_named("ArrangeByKey")
622 }
623
624 fn arrange_by_key_named(
625 &self,
626 name: &str,
627 ) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
628 self.arrange_named::<ValBatcher<_, _, _, _>, ValBuilder<_, _, _, _>, _>(name)
629 }
630}
631
632pub trait ArrangeBySelf<G, K: Data + Hashable, R: Ord + Semigroup + 'static>
638where
639 G: Scope<Timestamp: Lattice + Ord>,
640{
641 fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
647
648 fn arrange_by_self_named(
650 &self,
651 name: &str,
652 ) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>;
653}
654
655impl<G, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> ArrangeBySelf<G, K, R>
656 for VecCollection<G, K, R>
657where
658 G: Scope<Timestamp: Lattice + Ord>,
659{
660 fn arrange_by_self(&self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
661 self.arrange_by_self_named("ArrangeBySelf")
662 }
663
664 fn arrange_by_self_named(
665 &self,
666 name: &str,
667 ) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
668 self.map(|k| (k, ()))
669 .arrange_named::<KeyBatcher<_, _, _>, KeyBuilder<_, _, _>, _>(name)
670 }
671}