pub trait Operator<G: Scope, D1: Container> {
// Required methods
fn unary_frontier<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, D2>
where D2: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>) + 'static,
P: ParallelizationContractCore<G::Timestamp, D1>;
fn unary_notify<D2: Container, L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContractCore<G::Timestamp, D1>>(
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> StreamCore<G, D2>;
fn unary<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, D2>
where D2: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>) + 'static,
P: ParallelizationContractCore<G::Timestamp, D1>;
fn binary_frontier<D2, D3, B, L, P1, P2>(
&self,
other: &StreamCore<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, D3>
where D2: Container,
D3: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandleCore<'_, G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContractCore<G::Timestamp, D1>,
P2: ParallelizationContractCore<G::Timestamp, D2>;
fn binary_notify<D2: Container, D3: Container, L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>, &mut InputHandleCore<G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContractCore<G::Timestamp, D1>, P2: ParallelizationContractCore<G::Timestamp, D2>>(
&self,
other: &StreamCore<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> StreamCore<G, D3>;
fn binary<D2, D3, B, L, P1, P2>(
&self,
other: &StreamCore<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, D3>
where D2: Container,
D3: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>, &mut InputHandleCore<G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContractCore<G::Timestamp, D1>,
P2: ParallelizationContractCore<G::Timestamp, D2>;
fn sink<L, P>(&self, pact: P, name: &str, logic: L)
where L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P::Puller>) + 'static,
P: ParallelizationContractCore<G::Timestamp, D1>;
}Expand description
Methods to construct generic streaming and blocking operators.
Required Methods§
Sourcefn unary_frontier<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, D2>where
D2: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>) + 'static,
P: ParallelizationContractCore<G::Timestamp, D1>,
fn unary_frontier<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, D2>where
D2: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>) + 'static,
P: ParallelizationContractCore<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input stream, write to the output stream, and inspect the frontier at the input.
§Examples
use std::collections::HashMap;
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Pipeline;
fn main() {
timely::example(|scope| {
(0u64..10).to_stream(scope)
.unary_frontier(Pipeline, "example", |default_cap, _info| {
let mut cap = Some(default_cap.delayed(&12));
let mut notificator = FrontierNotificator::new();
let mut stash = HashMap::new();
let mut vector = Vec::new();
move |input, output| {
if let Some(ref c) = cap.take() {
output.session(&c).give(12);
}
while let Some((time, data)) = input.next() {
data.swap(&mut vector);
stash.entry(time.time().clone())
.or_insert(Vec::new())
.extend(vector.drain(..));
}
notificator.for_each(&[input.frontier()], |time, _not| {
if let Some(mut vec) = stash.remove(time.time()) {
output.session(&time).give_iterator(vec.drain(..));
}
});
}
});
});
}Sourcefn unary_notify<D2: Container, L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContractCore<G::Timestamp, D1>>(
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> StreamCore<G, D2>
fn unary_notify<D2: Container, L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContractCore<G::Timestamp, D1>>( &self, pact: P, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> StreamCore<G, D2>
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input stream, write to the output stream, and inspect the frontier at the input.
§Examples
use std::collections::HashMap;
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Pipeline;
fn main() {
timely::example(|scope| {
let mut vector = Vec::new();
(0u64..10)
.to_stream(scope)
.unary_notify(Pipeline, "example", None, move |input, output, notificator| {
input.for_each(|time, data| {
data.swap(&mut vector);
output.session(&time).give_vec(&mut vector);
notificator.notify_at(time.retain());
});
notificator.for_each(|time, _cnt, _not| {
println!("notified at {:?}", time);
});
});
});
}Sourcefn unary<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, D2>where
D2: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>) + 'static,
P: ParallelizationContractCore<G::Timestamp, D1>,
fn unary<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, D2>where
D2: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>) + 'static,
P: ParallelizationContractCore<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input stream, and write to the output stream.
§Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::Scope;
timely::example(|scope| {
(0u64..10).to_stream(scope)
.unary(Pipeline, "example", |default_cap, _info| {
let mut cap = Some(default_cap.delayed(&12));
let mut vector = Vec::new();
move |input, output| {
if let Some(ref c) = cap.take() {
output.session(&c).give(100);
}
while let Some((time, data)) = input.next() {
data.swap(&mut vector);
output.session(&time).give_vec(&mut vector);
}
}
});
});Sourcefn binary_frontier<D2, D3, B, L, P1, P2>(
&self,
other: &StreamCore<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, D3>where
D2: Container,
D3: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandleCore<'_, G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContractCore<G::Timestamp, D1>,
P2: ParallelizationContractCore<G::Timestamp, D2>,
fn binary_frontier<D2, D3, B, L, P1, P2>(
&self,
other: &StreamCore<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, D3>where
D2: Container,
D3: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandleCore<'_, G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContractCore<G::Timestamp, D1>,
P2: ParallelizationContractCore<G::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
§Examples
use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
timely::execute(timely::Config::thread(), |worker| {
let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
let (in1_handle, in1) = scope.new_input();
let (in2_handle, in2) = scope.new_input();
in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
let mut notificator = FrontierNotificator::new();
let mut stash = HashMap::new();
let mut vector1 = Vec::new();
let mut vector2 = Vec::new();
move |input1, input2, output| {
while let Some((time, data)) = input1.next() {
data.swap(&mut vector1);
stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector1.drain(..));
notificator.notify_at(time.retain());
}
while let Some((time, data)) = input2.next() {
data.swap(&mut vector2);
stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector2.drain(..));
notificator.notify_at(time.retain());
}
notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _not| {
if let Some(mut vec) = stash.remove(time.time()) {
output.session(&time).give_iterator(vec.drain(..));
}
});
}
}).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
(in1_handle, in2_handle)
});
for i in 1..10 {
in1.send(i - 1);
in1.advance_to(i);
in2.send(i - 1);
in2.advance_to(i);
}
}).unwrap();Sourcefn binary_notify<D2: Container, D3: Container, L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>, &mut InputHandleCore<G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContractCore<G::Timestamp, D1>, P2: ParallelizationContractCore<G::Timestamp, D2>>(
&self,
other: &StreamCore<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> StreamCore<G, D3>
fn binary_notify<D2: Container, D3: Container, L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>, &mut InputHandleCore<G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContractCore<G::Timestamp, D1>, P2: ParallelizationContractCore<G::Timestamp, D2>>( &self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> StreamCore<G, D3>
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
§Examples
use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
timely::execute(timely::Config::thread(), |worker| {
let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
let (in1_handle, in1) = scope.new_input();
let (in2_handle, in2) = scope.new_input();
let mut vector1 = Vec::new();
let mut vector2 = Vec::new();
in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
input1.for_each(|time, data| {
data.swap(&mut vector1);
output.session(&time).give_vec(&mut vector1);
notificator.notify_at(time.retain());
});
input2.for_each(|time, data| {
data.swap(&mut vector2);
output.session(&time).give_vec(&mut vector2);
notificator.notify_at(time.retain());
});
notificator.for_each(|time, _cnt, _not| {
println!("notified at {:?}", time);
});
});
(in1_handle, in2_handle)
});
for i in 1..10 {
in1.send(i - 1);
in1.advance_to(i);
in2.send(i - 1);
in2.advance_to(i);
}
}).unwrap();Sourcefn binary<D2, D3, B, L, P1, P2>(
&self,
other: &StreamCore<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, D3>where
D2: Container,
D3: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>, &mut InputHandleCore<G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContractCore<G::Timestamp, D1>,
P2: ParallelizationContractCore<G::Timestamp, D2>,
fn binary<D2, D3, B, L, P1, P2>(
&self,
other: &StreamCore<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, D3>where
D2: Container,
D3: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>, &mut InputHandleCore<G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContractCore<G::Timestamp, D1>,
P2: ParallelizationContractCore<G::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
§Examples
use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::Scope;
timely::example(|scope| {
let stream2 = (0u64..10).to_stream(scope);
(0u64..10).to_stream(scope)
.binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
let mut cap = Some(default_cap.delayed(&12));
let mut vector1 = Vec::new();
let mut vector2 = Vec::new();
move |input1, input2, output| {
if let Some(ref c) = cap.take() {
output.session(&c).give(100);
}
while let Some((time, data)) = input1.next() {
data.swap(&mut vector1);
output.session(&time).give_vec(&mut vector1);
}
while let Some((time, data)) = input2.next() {
data.swap(&mut vector2);
output.session(&time).give_vec(&mut vector2);
}
}
}).inspect(|x| println!("{:?}", x));
});Sourcefn sink<L, P>(&self, pact: P, name: &str, logic: L)where
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P::Puller>) + 'static,
P: ParallelizationContractCore<G::Timestamp, D1>,
fn sink<L, P>(&self, pact: P, name: &str, logic: L)where
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P::Puller>) + 'static,
P: ParallelizationContractCore<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy pact, and repeatedly invokes the function logic which can read from the input stream
and inspect the frontier at the input.
§Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::Scope;
timely::example(|scope| {
(0u64..10)
.to_stream(scope)
.sink(Pipeline, "example", |input| {
while let Some((time, data)) = input.next() {
for datum in data.iter() {
println!("{:?}:\t{:?}", time, datum);
}
}
});
});Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.