Skip to main content

timely/dataflow/operators/core/
exchange.rs

1//! Exchange records between workers.
2
3use crate::Container;
4use crate::container::{DrainContainer, SizableContainer, PushInto};
5use crate::dataflow::channels::pact::ExchangeCore;
6use crate::dataflow::operators::generic::operator::Operator;
7use crate::dataflow::{Scope, Stream};
8
9/// Exchange records between workers.
10pub trait Exchange<C: DrainContainer> {
11    /// Exchange records between workers.
12    ///
13    /// The closure supplied should map a reference to a record to a `u64`,
14    /// whose value determines to which worker the record will be routed.
15    ///
16    /// # Examples
17    /// ```
18    /// use timely::dataflow::operators::{ToStream, Exchange, Inspect};
19    ///
20    /// timely::example(|scope| {
21    ///     (0..10).to_stream(scope)
22    ///            .container::<Vec<_>>()
23    ///            .exchange(|x| *x)
24    ///            .inspect(|x| println!("seen: {:?}", x));
25    /// });
26    /// ```
27    fn exchange<F>(self, route: F) -> Self
28    where
29        for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static;
30}
31
32impl<G: Scope, C> Exchange<C> for Stream<G, C>
33where
34    C: Container
35        + SizableContainer
36        + DrainContainer
37        + Send
38        + crate::dataflow::channels::ContainerBytes
39        + for<'a> PushInto<C::Item<'a>>,
40{
41    fn exchange<F>(self, route: F) -> Stream<G, C>
42    where
43        for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
44    {
45        self.unary(ExchangeCore::new(route), "Exchange", |_, _| {
46            move |input, output| {
47                input.for_each_time(|time, data| {
48                    output.session(&time).give_containers(data);
49                });
50            }
51        })
52    }
53}