timely/dataflow/operators/delay.rs
1//! Operators acting on timestamps to logically delay records
2
3use std::collections::HashMap;
4
5use crate::Data;
6use crate::order::{PartialOrder, TotalOrder};
7use crate::dataflow::channels::pact::Pipeline;
8use crate::dataflow::{Stream, Scope};
9use crate::dataflow::operators::generic::operator::Operator;
10
11/// Methods to advance the timestamps of records or batches of records.
12pub trait Delay<G: Scope, D: Data> {
13
14 /// Advances the timestamp of records using a supplied function.
15 ///
16 /// The function *must* advance the timestamp; the operator will test that the
17 /// new timestamp is greater or equal to the old timestamp, and will assert if
18 /// it is not.
19 ///
20 /// # Examples
21 ///
22 /// The following example takes the sequence `0..10` at time `0`
23 /// and delays each element `i` to time `i`.
24 ///
25 /// ```
26 /// use timely::dataflow::operators::{ToStream, Delay, Operator};
27 /// use timely::dataflow::channels::pact::Pipeline;
28 ///
29 /// timely::example(|scope| {
30 /// (0..10).to_stream(scope)
31 /// .delay(|data, time| *data)
32 /// .sink(Pipeline, "example", |input| {
33 /// input.for_each(|time, data| {
34 /// println!("data at time: {:?}", time);
35 /// });
36 /// });
37 /// });
38 /// ```
39 fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self;
40
41 /// Advances the timestamp of records using a supplied function.
42 ///
43 /// This method is a specialization of `delay` for when the timestamp is totally
44 /// ordered. In this case, we can use a priority queue rather than an unsorted
45 /// list to manage the potentially available timestamps.
46 ///
47 /// # Examples
48 ///
49 /// The following example takes the sequence `0..10` at time `0`
50 /// and delays each element `i` to time `i`.
51 ///
52 /// ```
53 /// use timely::dataflow::operators::{ToStream, Delay, Operator};
54 /// use timely::dataflow::channels::pact::Pipeline;
55 ///
56 /// timely::example(|scope| {
57 /// (0..10).to_stream(scope)
58 /// .delay(|data, time| *data)
59 /// .sink(Pipeline, "example", |input| {
60 /// input.for_each(|time, data| {
61 /// println!("data at time: {:?}", time);
62 /// });
63 /// });
64 /// });
65 /// ```
66 fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self
67 where G::Timestamp: TotalOrder;
68
69 /// Advances the timestamp of batches of records using a supplied function.
70 ///
71 /// The operator will test that the new timestamp is greater or equal to the
72 /// old timestamp, and will assert if it is not. The batch version does not
73 /// consult the data, and may only view the timestamp itself.
74 ///
75 /// # Examples
76 ///
77 /// The following example takes the sequence `0..10` at time `0`
78 /// and delays each batch (there is just one) to time `1`.
79 ///
80 /// ```
81 /// use timely::dataflow::operators::{ToStream, Delay, Operator};
82 /// use timely::dataflow::channels::pact::Pipeline;
83 ///
84 /// timely::example(|scope| {
85 /// (0..10).to_stream(scope)
86 /// .delay_batch(|time| time + 1)
87 /// .sink(Pipeline, "example", |input| {
88 /// input.for_each(|time, data| {
89 /// println!("data at time: {:?}", time);
90 /// });
91 /// });
92 /// });
93 /// ```
94 fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self;
95}
96
97impl<G: Scope, D: Data> Delay<G, D> for Stream<G, D> {
98 fn delay<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, mut func: L) -> Self {
99 let mut elements = HashMap::new();
100 let mut vector = Vec::new();
101 self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
102 input.for_each(|time, data| {
103 data.swap(&mut vector);
104 for datum in vector.drain(..) {
105 let new_time = func(&datum, &time);
106 assert!(time.time().less_equal(&new_time));
107 elements.entry(new_time.clone())
108 .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() })
109 .push(datum);
110 }
111 });
112
113 // for each available notification, send corresponding set
114 notificator.for_each(|time,_,_| {
115 if let Some(mut data) = elements.remove(&time) {
116 output.session(&time).give_iterator(data.drain(..));
117 }
118 });
119 })
120 }
121
122 fn delay_total<L: FnMut(&D, &G::Timestamp)->G::Timestamp+'static>(&self, func: L) -> Self
123 where G::Timestamp: TotalOrder
124 {
125 self.delay(func)
126 }
127
128 fn delay_batch<L: FnMut(&G::Timestamp)->G::Timestamp+'static>(&self, mut func: L) -> Self {
129 let mut elements = HashMap::new();
130 self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| {
131 input.for_each(|time, data| {
132 let new_time = func(&time);
133 assert!(time.time().less_equal(&new_time));
134 elements.entry(new_time.clone())
135 .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() })
136 .push(data.replace(Vec::new()));
137 });
138
139 // for each available notification, send corresponding set
140 notificator.for_each(|time,_,_| {
141 if let Some(mut datas) = elements.remove(&time) {
142 for mut data in datas.drain(..) {
143 output.session(&time).give_vec(&mut data);
144 }
145 }
146 });
147 })
148 }
149}