timely/dataflow/operators/
exchange.rs1use crate::ExchangeData;
4use crate::container::PushPartitioned;
5use crate::dataflow::channels::pact::ExchangeCore;
6use crate::dataflow::operators::generic::operator::Operator;
7use crate::dataflow::{Scope, StreamCore};
8
9pub trait Exchange<D> {
11 fn exchange(&self, route: impl FnMut(&D) -> u64 + 'static) -> Self;
27}
28
29impl<G: Scope, C> Exchange<C::Item> for StreamCore<G, C>
30where
31 C: PushPartitioned + ExchangeData,
32 C::Item: ExchangeData,
33{
34 fn exchange(&self, route: impl FnMut(&C::Item) -> u64 + 'static) -> StreamCore<G, C> {
35 let mut container = Default::default();
36 self.unary(ExchangeCore::new(route), "Exchange", |_, _| {
37 move |input, output| {
38 input.for_each(|time, data| {
39 data.swap(&mut container);
40 output.session(&time).give_container(&mut container);
41 });
42 }
43 })
44 }
45}