timely/dataflow/operators/
exchange.rs

1//! Exchange records between workers.
2
3use crate::ExchangeData;
4use crate::container::PushPartitioned;
5use crate::dataflow::channels::pact::ExchangeCore;
6use crate::dataflow::operators::generic::operator::Operator;
7use crate::dataflow::{Scope, StreamCore};
8
9/// Exchange records between workers.
10pub trait Exchange<D> {
11    /// Exchange records between workers.
12    ///
13    /// The closure supplied should map a reference to a record to a `u64`,
14    /// whose value determines to which worker the record will be routed.
15    ///
16    /// # Examples
17    /// ```
18    /// use timely::dataflow::operators::{ToStream, Exchange, Inspect};
19    ///
20    /// timely::example(|scope| {
21    ///     (0..10).to_stream(scope)
22    ///            .exchange(|x| *x)
23    ///            .inspect(|x| println!("seen: {:?}", x));
24    /// });
25    /// ```
26    fn exchange(&self, route: impl FnMut(&D) -> u64 + 'static) -> Self;
27}
28
29impl<G: Scope, C> Exchange<C::Item> for StreamCore<G, C>
30where
31    C: PushPartitioned + ExchangeData,
32    C::Item: ExchangeData,
33{
34    fn exchange(&self, route: impl FnMut(&C::Item) -> u64 + 'static) -> StreamCore<G, C> {
35        let mut container = Default::default();
36        self.unary(ExchangeCore::new(route), "Exchange", |_, _| {
37            move |input, output| {
38                input.for_each(|time, data| {
39                    data.swap(&mut container);
40                    output.session(&time).give_container(&mut container);
41                });
42            }
43        })
44    }
45}