timely/dataflow/operators/core/
exchange.rs1use crate::Container;
4use crate::container::{DrainContainer, SizableContainer, PushInto};
5use crate::dataflow::channels::pact::ExchangeCore;
6use crate::dataflow::operators::generic::operator::Operator;
7use crate::dataflow::{Scope, Stream};
8
9pub trait Exchange<C: DrainContainer> {
11 fn exchange<F>(self, route: F) -> Self
28 where
29 for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static;
30}
31
32impl<G: Scope, C> Exchange<C> for Stream<G, C>
33where
34 C: Container
35 + SizableContainer
36 + DrainContainer
37 + Send
38 + crate::dataflow::channels::ContainerBytes
39 + for<'a> PushInto<C::Item<'a>>,
40{
41 fn exchange<F>(self, route: F) -> Stream<G, C>
42 where
43 for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
44 {
45 self.unary(ExchangeCore::new(route), "Exchange", |_, _| {
46 move |input, output| {
47 input.for_each_time(|time, data| {
48 output.session(&time).give_containers(data);
49 });
50 }
51 })
52 }
53}