Trait timely::dataflow::operators::binary::Binary
[−]
[src]
pub trait Binary<G: Scope, D1: Data> { fn binary_stream<D2: Data, D3: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1>, &mut InputHandle<G::Timestamp, D2>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static, P1: ParallelizationContract<G::Timestamp, D1>, P2: ParallelizationContract<G::Timestamp, D2>>(
&self,
_: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
logic: L
) -> Stream<G, D3>; fn binary_notify<D2: Data, D3: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1>, &mut InputHandle<G::Timestamp, D2>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>, &mut Notificator<G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, D1>, P2: ParallelizationContract<G::Timestamp, D2>>(
&self,
_: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
notify: Vec<G::Timestamp>,
logic: L
) -> Stream<G, D3>; }
Methods to construct generic streaming and blocking binary operators.
Required Methods
fn binary_stream<D2: Data, D3: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1>, &mut InputHandle<G::Timestamp, D2>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static, P1: ParallelizationContract<G::Timestamp, D1>, P2: ParallelizationContract<G::Timestamp, D2>>(
&self,
_: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
logic: L
) -> Stream<G, D3>
Creates a new dataflow operator that partitions each of its input streams by a parallelization
strategy (`pact1` for this stream, `pact2` for the other stream), and repeatedly invokes `logic`,
which can read from the input streams and write to the output stream.
Examples
use timely::dataflow::operators::{ToStream, Binary}; use timely::dataflow::channels::pact::Pipeline; timely::example(|scope| { let stream1 = (0..10).to_stream(scope); let stream2 = (0..10).to_stream(scope); stream1.binary_stream(&stream2, Pipeline, Pipeline, "example", |input1, input2, output| { input1.for_each(|time, data| { output.session(&time).give_content(data); }); input2.for_each(|time, data| { output.session(&time).give_content(data); }); }); });
fn binary_notify<D2: Data, D3: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1>, &mut InputHandle<G::Timestamp, D2>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>, &mut Notificator<G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, D1>, P2: ParallelizationContract<G::Timestamp, D2>>(
&self,
_: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
notify: Vec<G::Timestamp>,
logic: L
) -> Stream<G, D3>
Creates a new dataflow operator that partitions each of its input streams by a parallelization
strategy (`pact1` for this stream, `pact2` for the other stream), and repeatedly invokes `logic`,
which can read from the input streams, write to the output stream, and request and receive
notifications. The method also takes `notify`, a vector of the initial notifications the operator
requires (commonly empty).
Examples
use timely::dataflow::operators::{ToStream, Binary}; use timely::dataflow::channels::pact::Pipeline; timely::example(|scope| { let stream1 = (0..10).to_stream(scope); let stream2 = (0..10).to_stream(scope); stream1.binary_notify(&stream2, Pipeline, Pipeline, "example", Vec::new(), |input1, input2, output, notificator| { input1.for_each(|time, data| { output.session(&time).give_content(data); notificator.notify_at(time); }); input2.for_each(|time, data| { output.session(&time).give_content(data); notificator.notify_at(time); }); notificator.for_each(|time,_count,_notificator| { println!("done with time: {:?}", time.time()); }); }); });