timely/dataflow/operators/
enterleave.rs

1//! Extension traits to move a `Stream` between an outer `Scope` and inner `Scope`.
2//!
3//! Each `Stream` indicates its containing `Scope` as part of its type signature. To create a new
4//! stream with the same contents in another scope, one must explicit use the methods `enter` and
5//! `leave`, to clearly indicate the transition to the timely dataflow progress tracking logic.
6//!
7//! # Examples
8//! ```
9//! use timely::dataflow::scopes::Scope;
10//! use timely::dataflow::operators::{Enter, Leave, ToStream, Inspect};
11//!
12//! timely::example(|outer| {
13//!     let stream = (0..9).to_stream(outer);
14//!     let output = outer.region(|inner| {
15//!         stream.enter(inner)
16//!               .inspect(|x| println!("in nested scope: {:?}", x))
17//!               .leave()
18//!     });
19//! });
20//! ```
21
22use std::marker::PhantomData;
23
24use crate::logging::{TimelyLogger, MessagesEvent};
25use crate::progress::Timestamp;
26use crate::progress::timestamp::Refines;
27use crate::progress::{Source, Target};
28use crate::order::Product;
29use crate::{Container, Data};
30use crate::communication::Push;
31use crate::dataflow::channels::pushers::{CounterCore, TeeCore};
32use crate::dataflow::channels::{BundleCore, Message};
33
34use crate::worker::AsWorker;
35use crate::dataflow::{StreamCore, Scope, Stream};
36use crate::dataflow::scopes::{Child, ScopeParent};
37use crate::dataflow::operators::delay::Delay;
38
39/// Extension trait to move a `Stream` into a child of its current `Scope`.
40pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> {
41    /// Moves the `Stream` argument into a child of its current `Scope`.
42    ///
43    /// # Examples
44    /// ```
45    /// use timely::dataflow::scopes::Scope;
46    /// use timely::dataflow::operators::{Enter, Leave, ToStream};
47    ///
48    /// timely::example(|outer| {
49    ///     let stream = (0..9).to_stream(outer);
50    ///     let output = outer.region(|inner| {
51    ///         stream.enter(inner).leave()
52    ///     });
53    /// });
54    /// ```
55    fn enter<'a>(&self, _: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C>;
56}
57
58use crate::dataflow::scopes::child::Iterative;
59
60/// Extension trait to move a `Stream` into a child of its current `Scope` setting the timestamp for each element.
61pub trait EnterAt<G: Scope, T: Timestamp, D: Data> {
62    /// Moves the `Stream` argument into a child of its current `Scope` setting the timestamp for each element by `initial`.
63    ///
64    /// # Examples
65    /// ```
66    /// use timely::dataflow::scopes::Scope;
67    /// use timely::dataflow::operators::{EnterAt, Leave, ToStream};
68    ///
69    /// timely::example(|outer| {
70    ///     let stream = (0..9u64).to_stream(outer);
71    ///     let output = outer.iterative(|inner| {
72    ///         stream.enter_at(inner, |x| *x).leave()
73    ///     });
74    /// });
75    /// ```
76    fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, initial: F) -> Stream<Iterative<'a, G, T>, D> ;
77}
78
79impl<G: Scope, T: Timestamp, D: Data, E: Enter<G, Product<<G as ScopeParent>::Timestamp, T>, Vec<D>>> EnterAt<G, T, D> for E {
80    fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, mut initial: F) ->
81        Stream<Iterative<'a, G, T>, D> {
82            self.enter(scope).delay(move |datum, time| Product::new(time.clone().to_outer(), initial(datum)))
83    }
84}
85
86impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Data+Container> Enter<G, T, C> for StreamCore<G, C> {
87    fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C> {
88
89        use crate::scheduling::Scheduler;
90
91        let (targets, registrar) = TeeCore::<T, C>::new();
92        let ingress = IngressNub {
93            targets: CounterCore::new(targets),
94            phantom: PhantomData,
95            activator: scope.activator_for(&scope.addr()),
96            active: false,
97        };
98        let produced = ingress.targets.produced().clone();
99        let input = scope.subgraph.borrow_mut().new_input(produced);
100        let channel_id = scope.clone().new_identifier();
101
102        if let Some(logger) = scope.logging() {
103            let pusher = LogPusher::new(ingress, channel_id, scope.index(), logger);
104            self.connect_to(input, pusher, channel_id);
105        } else {
106            self.connect_to(input, ingress, channel_id);
107        }
108
109        StreamCore::new(
110            Source::new(0, input.port),
111            registrar,
112            scope.clone(),
113        )
114    }
115}
116
117/// Extension trait to move a `Stream` to the parent of its current `Scope`.
118pub trait Leave<G: Scope, D: Container> {
119    /// Moves a `Stream` to the parent of its current `Scope`.
120    ///
121    /// # Examples
122    /// ```
123    /// use timely::dataflow::scopes::Scope;
124    /// use timely::dataflow::operators::{Enter, Leave, ToStream};
125    ///
126    /// timely::example(|outer| {
127    ///     let stream = (0..9).to_stream(outer);
128    ///     let output = outer.region(|inner| {
129    ///         stream.enter(inner).leave()
130    ///     });
131    /// });
132    /// ```
133    fn leave(&self) -> StreamCore<G, D>;
134}
135
136impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, D> for StreamCore<Child<'a, G, T>, D> {
137    fn leave(&self) -> StreamCore<G, D> {
138
139        let scope = self.scope();
140
141        let output = scope.subgraph.borrow_mut().new_output();
142        let target = Target::new(0, output.port);
143        let (targets, registrar) = TeeCore::<G::Timestamp, D>::new();
144        let egress = EgressNub { targets, phantom: PhantomData };
145        let channel_id = scope.clone().new_identifier();
146
147        if let Some(logger) = scope.logging() {
148            let pusher = LogPusher::new(egress, channel_id, scope.index(), logger);
149            self.connect_to(target, pusher, channel_id);
150        } else {
151            self.connect_to(target, egress, channel_id);
152        }
153
154        StreamCore::new(
155            output,
156            registrar,
157            scope.parent,
158        )
159    }
160}
161
162
163struct IngressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Container> {
164    targets: CounterCore<TInner, TData, TeeCore<TInner, TData>>,
165    phantom: ::std::marker::PhantomData<TOuter>,
166    activator: crate::scheduling::Activator,
167    active: bool,
168}
169
170impl<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Container> Push<BundleCore<TOuter, TData>> for IngressNub<TOuter, TInner, TData> {
171    fn push(&mut self, element: &mut Option<BundleCore<TOuter, TData>>) {
172        if let Some(message) = element {
173            let outer_message = message.as_mut();
174            let data = ::std::mem::take(&mut outer_message.data);
175            let mut inner_message = Some(BundleCore::from_typed(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0)));
176            self.targets.push(&mut inner_message);
177            if let Some(inner_message) = inner_message {
178                if let Some(inner_message) = inner_message.if_typed() {
179                    outer_message.data = inner_message.data;
180                }
181            }
182            self.active = true;
183        }
184        else {
185            if self.active {
186                self.activator.activate();
187                self.active = false;
188            }
189            self.targets.done();
190        }
191    }
192}
193
194
195struct EgressNub<TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Data> {
196    targets: TeeCore<TOuter, TData>,
197    phantom: PhantomData<TInner>,
198}
199
200impl<TOuter, TInner, TData: Container> Push<BundleCore<TInner, TData>> for EgressNub<TOuter, TInner, TData>
201where TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, TData: Data {
202    fn push(&mut self, message: &mut Option<BundleCore<TInner, TData>>) {
203        if let Some(message) = message {
204            let inner_message = message.as_mut();
205            let data = ::std::mem::take(&mut inner_message.data);
206            let mut outer_message = Some(BundleCore::from_typed(Message::new(inner_message.time.clone().to_outer(), data, 0, 0)));
207            self.targets.push(&mut outer_message);
208            if let Some(outer_message) = outer_message {
209                if let Some(outer_message) = outer_message.if_typed() {
210                    inner_message.data = outer_message.data;
211                }
212            }
213        }
214        else { self.targets.done(); }
215    }
216}
217
218/// A pusher that logs messages passing through it.
219///
220/// This type performs the same function as the `LogPusher` and `LogPuller` types in
221/// `timely::dataflow::channels::pact`. We need a special implementation for `enter`/`leave`
222/// channels because those don't have a puller connected. Thus, this pusher needs to log both the
223/// send and the receive `MessageEvent`.
224struct LogPusher<P> {
225    pusher: P,
226    channel: usize,
227    counter: usize,
228    index: usize,
229    logger: TimelyLogger,
230}
231
232impl<P> LogPusher<P> {
233    fn new(pusher: P, channel: usize, index: usize, logger: TimelyLogger) -> Self {
234        Self {
235            pusher,
236            channel,
237            counter: 0,
238            index,
239            logger,
240        }
241    }
242}
243
244impl<T, D, P> Push<BundleCore<T, D>> for LogPusher<P>
245where
246    D: Container,
247    P: Push<BundleCore<T, D>>,
248{
249    fn push(&mut self, element: &mut Option<BundleCore<T, D>>) {
250        if let Some(bundle) = element {
251            let send_event = MessagesEvent {
252                is_send: true,
253                channel: self.channel,
254                source: self.index,
255                target: self.index,
256                seq_no: self.counter,
257                length: bundle.data.len(),
258            };
259            let recv_event = MessagesEvent {
260                is_send: false,
261                ..send_event
262            };
263
264            self.logger.log(send_event);
265            self.logger.log(recv_event);
266            self.counter += 1;
267        }
268
269        self.pusher.push(element);
270    }
271}
272
273#[cfg(test)]
274mod test {
275    /// Test that nested scopes with pass-through edges (no operators) correctly communicate progress.
276    ///
277    /// This is for issue: https://github.com/TimelyDataflow/timely-dataflow/issues/377
278    #[test]
279    fn test_nested() {
280
281        use crate::dataflow::{InputHandle, ProbeHandle};
282        use crate::dataflow::operators::{Input, Inspect, Probe};
283
284        use crate::dataflow::Scope;
285        use crate::dataflow::operators::{Enter, Leave};
286
287        // initializes and runs a timely dataflow.
288        crate::execute(crate::Config::process(3), |worker| {
289
290            let index = worker.index();
291            let mut input = InputHandle::new();
292            let mut probe = ProbeHandle::new();
293
294            // create a new input, exchange data, and inspect its output
295            worker.dataflow(|scope| {
296                let data = scope.input_from(&mut input);
297
298                scope.region(|inner| {
299
300                    let data = data.enter(inner);
301                    inner.region(|inner2| data.enter(inner2).leave()).leave()
302                })
303                    .inspect(move |x| println!("worker {}:\thello {}", index, x))
304                    .probe_with(&mut probe);
305            });
306
307            // introduce data and watch!
308            input.advance_to(0);
309            for round in 0..10 {
310                if index == 0 {
311                    input.send(round);
312                }
313                input.advance_to(round + 1);
314                while probe.less_than(input.time()) {
315                    worker.step_or_park(None);
316                }
317            }
318        }).unwrap();
319    }
320
321}