Trait timely::dataflow::operators::generic::operator::Operator [−][src]
Methods to construct generic streaming and blocking operators.
Required methods
fn unary_frontier<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandle<'_, G::Timestamp, D1, P::Puller>, &mut OutputHandle<'_, G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
[src]
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandle<'_, G::Timestamp, D1, P::Puller>, &mut OutputHandle<'_, G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input stream, write to the output stream, and inspect the frontier at the input.
Examples
use std::collections::HashMap; use timely::dataflow::operators::{ToStream, FrontierNotificator}; use timely::dataflow::operators::generic::Operator; use timely::dataflow::channels::pact::Pipeline; fn main() { timely::example(|scope| { (0u64..10).to_stream(scope) .unary_frontier(Pipeline, "example", |default_cap, _info| { let mut cap = Some(default_cap.delayed(&12)); let mut notificator = FrontierNotificator::new(); let mut stash = HashMap::new(); let mut vector = Vec::new(); move |input, output| { if let Some(ref c) = cap.take() { output.session(&c).give(12); } while let Some((time, data)) = input.next() { data.swap(&mut vector); stash.entry(time.time().clone()) .or_insert(Vec::new()) .extend(vector.drain(..)); } notificator.for_each(&[input.frontier()], |time, _not| { if let Some(mut vec) = stash.remove(time.time()) { output.session(&time).give_iterator(vec.drain(..)); } }); } }); }); }
fn unary_notify<D2: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<'_, G::Timestamp, D2, Tee<G::Timestamp, D2>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, D1>>(
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L
) -> Stream<G, D2>
[src]
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L
) -> Stream<G, D2>
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input stream, write to the output stream, and inspect the frontier at the input.
Examples
use std::collections::HashMap; use timely::dataflow::operators::{ToStream, FrontierNotificator}; use timely::dataflow::operators::generic::Operator; use timely::dataflow::channels::pact::Pipeline; fn main() { timely::example(|scope| { let mut vector = Vec::new(); (0u64..10) .to_stream(scope) .unary_notify(Pipeline, "example", None, move |input, output, notificator| { input.for_each(|time, data| { data.swap(&mut vector); output.session(&time).give_vec(&mut vector); notificator.notify_at(time.retain()); }); notificator.for_each(|time, _cnt, _not| { println!("notified at {:?}", time); }); }); }); }
fn unary<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<'_, G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
[src]
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<'_, G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input stream, and write to the output stream.
Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator}; use timely::dataflow::operators::generic::operator::Operator; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::Scope; timely::example(|scope| { (0u64..10).to_stream(scope) .unary(Pipeline, "example", |default_cap, _info| { let mut cap = Some(default_cap.delayed(&12)); let mut vector = Vec::new(); move |input, output| { if let Some(ref c) = cap.take() { output.session(&c).give(100); } while let Some((time, data)) = input.next() { data.swap(&mut vector); output.session(&time).give_vec(&mut vector); } } }); });
fn binary_frontier<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandle<'_, G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandle<'_, G::Timestamp, D2, P2::Puller>, &mut OutputHandle<'_, G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
[src]
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandle<'_, G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandle<'_, G::Timestamp, D2, P2::Puller>, &mut OutputHandle<'_, G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
Examples
use std::collections::HashMap; use timely::dataflow::operators::{Input, Inspect, FrontierNotificator}; use timely::dataflow::operators::generic::operator::Operator; use timely::dataflow::channels::pact::Pipeline; timely::execute(timely::Config::thread(), |worker| { let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| { let (in1_handle, in1) = scope.new_input(); let (in2_handle, in2) = scope.new_input(); in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { let mut notificator = FrontierNotificator::new(); let mut stash = HashMap::new(); let mut vector1 = Vec::new(); let mut vector2 = Vec::new(); move |input1, input2, output| { while let Some((time, data)) = input1.next() { data.swap(&mut vector1); stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector1.drain(..)); notificator.notify_at(time.retain()); } while let Some((time, data)) = input2.next() { data.swap(&mut vector2); stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector2.drain(..)); notificator.notify_at(time.retain()); } notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _not| { if let Some(mut vec) = stash.remove(time.time()) { output.session(&time).give_iterator(vec.drain(..)); } }); } }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x)); (in1_handle, in2_handle) }); for i in 1..10 { in1.send(i - 1); in1.advance_to(i); in2.send(i - 1); in2.advance_to(i); } }).unwrap();
fn binary_notify<D2: Data, D3: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1, P1::Puller>, &mut InputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<'_, G::Timestamp, D3, Tee<G::Timestamp, D3>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, D1>, P2: ParallelizationContract<G::Timestamp, D2>>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L
) -> Stream<G, D3>
[src]
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L
) -> Stream<G, D3>
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
Examples
use std::collections::HashMap; use timely::dataflow::operators::{Input, Inspect, FrontierNotificator}; use timely::dataflow::operators::generic::operator::Operator; use timely::dataflow::channels::pact::Pipeline; timely::execute(timely::Config::thread(), |worker| { let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| { let (in1_handle, in1) = scope.new_input(); let (in2_handle, in2) = scope.new_input(); let mut vector1 = Vec::new(); let mut vector2 = Vec::new(); in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| { input1.for_each(|time, data| { data.swap(&mut vector1); output.session(&time).give_vec(&mut vector1); notificator.notify_at(time.retain()); }); input2.for_each(|time, data| { data.swap(&mut vector2); output.session(&time).give_vec(&mut vector2); notificator.notify_at(time.retain()); }); notificator.for_each(|time, _cnt, _not| { println!("notified at {:?}", time); }); }); (in1_handle, in2_handle) }); for i in 1..10 { in1.send(i - 1); in1.advance_to(i); in2.send(i - 1); in2.advance_to(i); } }).unwrap();
fn binary<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P1::Puller>, &mut InputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<'_, G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
[src]
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P1::Puller>, &mut InputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<'_, G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
Examples
use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator}; use timely::dataflow::operators::generic::operator::Operator; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::Scope; timely::example(|scope| { let stream2 = (0u64..10).to_stream(scope); (0u64..10).to_stream(scope) .binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| { let mut cap = Some(default_cap.delayed(&12)); let mut vector1 = Vec::new(); let mut vector2 = Vec::new(); move |input1, input2, output| { if let Some(ref c) = cap.take() { output.session(&c).give(100); } while let Some((time, data)) = input1.next() { data.swap(&mut vector1); output.session(&time).give_vec(&mut vector1); } while let Some((time, data)) = input2.next() { data.swap(&mut vector2); output.session(&time).give_vec(&mut vector2); } } }).inspect(|x| println!("{:?}", x)); });
fn sink<L, P>(&self, pact: P, name: &str, logic: L) where
L: FnMut(&mut FrontieredInputHandle<'_, G::Timestamp, D1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
[src]
L: FnMut(&mut FrontieredInputHandle<'_, G::Timestamp, D1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy pact
, and repeatedly invokes the function logic
which can read from the input stream
and inspect the frontier at the input.
Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator}; use timely::dataflow::operators::generic::operator::Operator; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::Scope; timely::example(|scope| { (0u64..10) .to_stream(scope) .sink(Pipeline, "example", |input| { while let Some((time, data)) = input.next() { for datum in data.iter() { println!("{:?}:\t{:?}", time, datum); } } }); });
Implementors
impl<G: Scope, D1: Data> Operator<G, D1> for Stream<G, D1>
[src]
fn unary_frontier<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandle<'_, G::Timestamp, D1, P::Puller>, &mut OutputHandle<'_, G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
[src]
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandle<'_, G::Timestamp, D1, P::Puller>, &mut OutputHandle<'_, G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
fn unary_notify<D2: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<'_, G::Timestamp, D2, Tee<G::Timestamp, D2>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, D1>>(
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L
) -> Stream<G, D2>
[src]
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L
) -> Stream<G, D2>
fn unary<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<'_, G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
[src]
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<'_, G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
fn binary_frontier<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandle<'_, G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandle<'_, G::Timestamp, D2, P2::Puller>, &mut OutputHandle<'_, G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
[src]
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandle<'_, G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandle<'_, G::Timestamp, D2, P2::Puller>, &mut OutputHandle<'_, G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
fn binary_notify<D2: Data, D3: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1, P1::Puller>, &mut InputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<'_, G::Timestamp, D3, Tee<G::Timestamp, D3>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, D1>, P2: ParallelizationContract<G::Timestamp, D2>>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L
) -> Stream<G, D3>
[src]
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L
) -> Stream<G, D3>
fn binary<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P1::Puller>, &mut InputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<'_, G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
[src]
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P1::Puller>, &mut InputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<'_, G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
fn sink<L, P>(&self, pact: P, name: &str, logic: L) where
L: FnMut(&mut FrontieredInputHandle<'_, G::Timestamp, D1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
[src]
L: FnMut(&mut FrontieredInputHandle<'_, G::Timestamp, D1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,