Trait timely::dataflow::operators::generic::operator::Operator
[−]
[src]
/// Methods to construct generic streaming and blocking operators.
///
/// Summary of the required methods, as far as their signatures show:
///
/// * `unary_frontier` / `unary` take one parallelization contract `pact` for
///   the single input and return a new `Stream<G, D2>`.
/// * `binary_frontier` / `binary` additionally take a second stream `other`
///   and one contract per input (`pact1`, `pact2`), returning a `Stream<G, D3>`.
/// * `sink` takes a contract and a `logic` closure directly and returns nothing.
///
/// For the four stream-producing methods, `constructor` is an `FnOnce` that
/// receives a `Capability<G::Timestamp>` and returns the `FnMut` closure
/// (`L`) that gets the input handle(s) and the output handle.
///
/// The `_frontier` variants and `sink` hand the closure
/// `FrontieredInputHandle`s; `unary` and `binary` hand it plain
/// `InputHandle`s.
pub trait Operator<G: Scope, D1: Data> { fn unary_frontier<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2>
where
D2: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>; fn unary<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2>
where
D2: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>; fn binary_frontier<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3>
where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>; fn binary<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3>
where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P1::Puller>, &mut InputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>; fn sink<L, P>(&self, pact: P, name: &str, logic: L)
where
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>; }
Methods to construct generic streaming and blocking operators.
Required Methods
fn unary_frontier<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy `pact`, and repeatedly invokes `logic`, the function returned by the function
passed as `constructor`.
`logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
Examples
use std::collections::HashMap;
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::progress::timestamp::RootTimestamp;

fn main() {
    timely::example(|scope| {
        (0u64..10).to_stream(scope)
            .unary_frontier(Pipeline, "example", |default_cap| {
                // Capability derived from the default one for time 12; it is
                // consumed by the first invocation of the closure below.
                let mut cap = Some(default_cap.delayed(&RootTimestamp::new(12)));
                let mut notificator = FrontierNotificator::new();
                // Records buffered per timestamp until they are emitted.
                let mut stash = HashMap::new();
                move |input, output| {
                    // `take()` leaves `None` behind, so this fires only once.
                    if let Some(ref c) = cap.take() {
                        output.session(&c).give(12);
                    }
                    while let Some((time, data)) = input.next() {
                        // Buffer the batch under its timestamp and request a
                        // notification for it. Without these two steps the
                        // `for_each` below has nothing to deliver and the
                        // input records are silently dropped.
                        stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
                        notificator.notify_at(time);
                    }
                    // Emit the stashed records for each time the notificator
                    // reports, given the current input frontier.
                    notificator.for_each(&[input.frontier()], |time, _not| {
                        if let Some(mut vec) = stash.remove(time.time()) {
                            output.session(&time).give_iterator(vec.drain(..));
                        }
                    });
                }
            });
    });
}
fn unary<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy `pact`, and repeatedly invokes `logic`, the function returned by the function
passed as `constructor`.
`logic` can read from the input stream, and write to the output stream.
Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::progress::timestamp::RootTimestamp;
use timely::dataflow::Scope;

timely::example(|scope| {
    let numbers = (0u64..10).to_stream(scope);

    numbers.unary(Pipeline, "example", |default_cap| {
        // Capability derived from the default one for time 12; consumed by
        // the first invocation of the returned closure.
        let mut pending_cap = Some(default_cap.delayed(&RootTimestamp::new(12)));

        move |input, output| {
            // One-shot: `take()` leaves `None` behind for later invocations.
            if let Some(held) = pending_cap.take() {
                output.session(&held).give(100);
            }

            // Forward every received batch unchanged at its own time.
            while let Some((stamp, batch)) = input.next() {
                output.session(&stamp).give_content(batch);
            }
        }
    });
});
fn binary_frontier<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by the parallelization
strategies `pact1` and `pact2`, and repeatedly invokes `logic`, the function returned by
the function passed as `constructor`.
`logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
Examples
use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

timely::execute(timely::Configuration::Thread, |worker| {
    // Build the dataflow and keep the two input handles for feeding it below.
    let (mut in1, mut in2) = worker.dataflow(|scope| {
        let (handle_a, stream_a) = scope.new_input();
        let (handle_b, stream_b) = scope.new_input();

        stream_a
            .binary_frontier(&stream_b, Pipeline, Pipeline, "example", |mut _builder| {
                let mut notificator = FrontierNotificator::new();
                // Records from both inputs, buffered per timestamp.
                let mut stash = HashMap::new();

                move |input1, input2, output| {
                    // Stash each batch from the first input and request a
                    // notification for its time.
                    while let Some((stamp, batch)) = input1.next() {
                        stash
                            .entry(stamp.time().clone())
                            .or_insert_with(Vec::new)
                            .extend(batch.drain(..));
                        notificator.notify_at(stamp);
                    }

                    // Same treatment for the second input.
                    while let Some((stamp, batch)) = input2.next() {
                        stash
                            .entry(stamp.time().clone())
                            .or_insert_with(Vec::new)
                            .extend(batch.drain(..));
                        notificator.notify_at(stamp);
                    }

                    // Emit the stashed records for each time the notificator
                    // reports, given both input frontiers.
                    notificator.for_each(&[input1.frontier(), input2.frontier()], |ready, _not| {
                        if let Some(mut buffered) = stash.remove(ready.time()) {
                            output.session(&ready).give_iterator(buffered.drain(..));
                        }
                    });
                }
            })
            .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));

        (handle_a, handle_b)
    });

    // Send 0..=8 on both inputs, advancing each input right after its send.
    for round in 0..9 {
        in1.send(round);
        in1.advance_to(round + 1);
        in2.send(round);
        in2.advance_to(round + 1);
    }
}).unwrap();
fn binary<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P1::Puller>, &mut InputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by the parallelization
strategies `pact1` and `pact2`, and repeatedly invokes `logic`, the function returned by
the function passed as `constructor`.
`logic` can read from the input streams, and write to the output stream.
Examples
use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::progress::timestamp::RootTimestamp;
use timely::dataflow::Scope;

timely::example(|scope| {
    let stream2 = (0u64..10).to_stream(scope);
    let stream1 = (0u64..10).to_stream(scope);

    stream1
        .binary(&stream2, Pipeline, Pipeline, "example", |default_cap| {
            // Capability derived from the default one for time 12; consumed
            // by the first invocation of the returned closure.
            let mut pending_cap = Some(default_cap.delayed(&RootTimestamp::new(12)));

            move |input1, input2, output| {
                // One-shot: `take()` leaves `None` behind afterwards.
                if let Some(held) = pending_cap.take() {
                    output.session(&held).give(100);
                }

                // Forward batches from the first input, then the second,
                // each at its own time.
                while let Some((stamp, batch)) = input1.next() {
                    output.session(&stamp).give_content(batch);
                }
                while let Some((stamp, batch)) = input2.next() {
                    output.session(&stamp).give_content(batch);
                }
            }
        })
        .inspect(|x| println!("{:?}", x));
});
fn sink<L, P>(&self, pact: P, name: &str, logic: L) where
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy `pact`, and repeatedly invokes the function `logic`, which can read from the
input stream and inspect the frontier at the input.
Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::progress::timestamp::RootTimestamp;
use timely::dataflow::Scope;

timely::example(|scope| {
    let numbers = (0u64..10).to_stream(scope);

    // Terminal operator: prints every record together with its time.
    numbers.sink(Pipeline, "example", |input| {
        while let Some((stamp, batch)) = input.next() {
            batch
                .iter()
                .for_each(|datum| println!("{:?}:\t{:?}", stamp, datum));
        }
    });
});