pub struct Stream<'scope, T: Timestamp, C> { /* private fields */ }Expand description
Abstraction of a stream of C: Container records timestamped with T.
Internally Stream maintains a list of data recipients who should be presented with data
produced by the source of the stream.
Implementations§
Source§impl<'scope, T: Timestamp, C> Stream<'scope, T, C>
impl<'scope, T: Timestamp, C> Stream<'scope, T, C>
Sourcepub fn connect_to<P: Push<Message<T, C>> + 'static>(
self,
target: Target,
pusher: P,
identifier: usize,
)where
C: 'static,
pub fn connect_to<P: Push<Message<T, C>> + 'static>(
self,
target: Target,
pusher: P,
identifier: usize,
)where
C: 'static,
Connects the stream to a destination.
The destination is described both by a Target, for progress tracking information, and a P: Push where the
records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.
Sourcepub fn new(
source: Source,
output: TeeHelper<T, C>,
scope: Scope<'scope, T>,
) -> Self
pub fn new( source: Source, output: TeeHelper<T, C>, scope: Scope<'scope, T>, ) -> Self
Allocates a Stream from a supplied Source name and rendezvous point.
Sourcepub fn container<C2>(self) -> Stream<'scope, T, C2>where
Self: AsStream<'scope, T, C2>,
pub fn container<C2>(self) -> Stream<'scope, T, C2>where
Self: AsStream<'scope, T, C2>,
Allows the assertion of a container type, for the benefit of type inference.
This method can be needed when the container type of a stream is unconstrained, most commonly after creating an input, or bracketing wholly generic operators.
§Examples
use timely::dataflow::operators::{ToStream, Inspect};
timely::example(|scope| {
(0..10).to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("seen: {:?}", x));
});Examples found in repository?
examples/capture_send.rs (line 14)
5fn main() {
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let addr = format!("127.0.0.1:{}", 8000 + worker.index());
9 let send = TcpStream::connect(addr).unwrap();
10
11 worker.dataflow::<u64,_,_>(|scope|
12 (0..10u64)
13 .to_stream(scope)
14 .container::<Vec<_>>()
15 .capture_into(EventWriter::new(send))
16 );
17 }).unwrap();
18}More examples
examples/unordered_input.rs (line 8)
4fn main() {
5 timely::execute(Config::thread(), |worker| {
6 let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7 let (input, stream) = scope.new_unordered_input();
8 stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9 input
10 });
11
12 for round in 0..10 {
13 input.activate().session(&cap).give(round);
14 cap = cap.delayed(&(round + 1));
15 worker.step();
16 }
17 }).unwrap();
18}examples/threadless.rs (line 21)
5fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::Allocator::Thread(
9 timely::communication::allocator::Thread::default(),
10 );
11 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
12
13 // create input and probe handles.
14 let mut input = InputHandle::new();
15 let probe = ProbeHandle::new();
16
17 // directly build a dataflow.
18 worker.dataflow(|scope| {
19 input
20 .to_stream(scope)
21 .container::<Vec<_>>()
22 .inspect(|x| println!("{:?}", x))
23 .probe_with(&probe);
24 });
25
26 // manage inputs.
27 for i in 0 .. 10 {
28 input.send(i);
29 input.advance_to(i);
30 while probe.less_than(input.time()) {
31 worker.step();
32 }
33 }
34}examples/rc.rs (line 19)
10fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32}examples/hello.rs (line 15)
4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let index = worker.index();
9 let mut input = InputHandle::new();
10 let probe = ProbeHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 worker.dataflow(|scope| {
14 scope.input_from(&mut input)
15 .container::<Vec<_>>()
16 .exchange(|x| *x)
17 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18 .probe_with(&probe);
19 });
20
21 // introduce data and watch!
22 for round in 0..10 {
23 if index == 0 {
24 input.send(round);
25 }
26 input.advance_to(round + 1);
27 while probe.less_than(input.time()) {
28 worker.step();
29 }
30 }
31 }).unwrap();
32}examples/exchange.rs (line 16)
4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9 let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10 let mut input = InputHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 let probe = worker.dataflow(|scope|
14 scope
15 .input_from(&mut input)
16 .container::<Vec<_>>()
17 .exchange(|&x| x as u64)
18 .probe()
19 .0
20 );
21
22
23 let timer = std::time::Instant::now();
24
25 for round in 0 .. rounds {
26
27 for i in 0 .. batch {
28 input.send(i);
29 }
30 input.advance_to(round);
31
32 while probe.less_than(input.time()) {
33 worker.step();
34 }
35
36 }
37
38 let volume = (rounds * batch) as f64;
39 let elapsed = timer.elapsed();
40 let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42 println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44 }).unwrap();
45}Trait Implementations§
Source§impl<'scope, T: Timestamp, C: Container> BranchWhen<T> for Stream<'scope, T, C>
impl<'scope, T: Timestamp, C: Container> BranchWhen<T> for Stream<'scope, T, C>
Source§fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self)
fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self)
Takes one input stream and splits it into two output streams.
For each time, the supplied closure is called. If it returns
true,
the records for that time will be sent to the second returned stream, otherwise
they will be sent to the first. Read moreSource§impl<T: Timestamp, C: Container> Capture<T, C> for Stream<'_, T, C>
impl<T: Timestamp, C: Container> Capture<T, C> for Stream<'_, T, C>
Source§fn capture_into<P: EventPusher<T, C> + 'static>(self, event_pusher: P)
fn capture_into<P: EventPusher<T, C> + 'static>(self, event_pusher: P)
Captures a stream of timestamped data for later replay. Read more
Source§impl<'scope, T: Timestamp, C: Container> ConnectLoop<'scope, T, C> for Stream<'scope, T, C>
impl<'scope, T: Timestamp, C: Container> ConnectLoop<'scope, T, C> for Stream<'scope, T, C>
Source§fn connect_loop(self, handle: Handle<'scope, T, C>)
fn connect_loop(self, handle: Handle<'scope, T, C>)
Connect a
Stream to be the input of a loop variable. Read moreSource§impl<'outer, TOuter, TInner, C> Enter<'outer, TOuter, TInner, C> for Stream<'outer, TOuter, C>
impl<'outer, TOuter, TInner, C> Enter<'outer, TOuter, TInner, C> for Stream<'outer, TOuter, C>
Source§impl<T: Timestamp, C> Exchange<C> for Stream<'_, T, C>where
C: Container + SizableContainer + DrainContainer + Send + ContainerBytes + for<'a> PushInto<C::Item<'a>>,
impl<T: Timestamp, C> Exchange<C> for Stream<'_, T, C>where
C: Container + SizableContainer + DrainContainer + Send + ContainerBytes + for<'a> PushInto<C::Item<'a>>,
Source§impl<T: Timestamp, C: Container> Inspect<T, C> for Stream<'_, T, C>where
for<'a> &'a C: IntoIterator,
impl<T: Timestamp, C: Container> Inspect<T, C> for Stream<'_, T, C>where
for<'a> &'a C: IntoIterator,
Source§fn inspect_core<F>(self, func: F) -> Self
fn inspect_core<F>(self, func: F) -> Self
Runs a supplied closure on each observed data batch, and each frontier advancement. Read more
Source§fn inspect<F>(self, func: F) -> Self
fn inspect<F>(self, func: F) -> Self
Runs a supplied closure on each observed data element. Read more
Source§fn inspect_time<F>(self, func: F) -> Self
fn inspect_time<F>(self, func: F) -> Self
Runs a supplied closure on each observed data element and associated time. Read more
Source§impl<'scope, T: Timestamp, C: Container + DrainContainer> Map<'scope, T, C> for Stream<'scope, T, C>
impl<'scope, T: Timestamp, C: Container + DrainContainer> Map<'scope, T, C> for Stream<'scope, T, C>
Source§fn flat_map<C2, I, L>(self, logic: L) -> Stream<'scope, T, C2>where
I: IntoIterator,
C2: Container + SizableContainer + PushInto<I::Item>,
L: FnMut(C::Item<'_>) -> I + 'static,
fn flat_map<C2, I, L>(self, logic: L) -> Stream<'scope, T, C2>where
I: IntoIterator,
C2: Container + SizableContainer + PushInto<I::Item>,
L: FnMut(C::Item<'_>) -> I + 'static,
Consumes each element of the stream and yields some number of new elements. Read more
Source§fn map<C2, D2, L>(self, logic: L) -> Stream<'scope, T, C2>
fn map<C2, D2, L>(self, logic: L) -> Stream<'scope, T, C2>
Consumes each element of the stream and yields a new element. Read more
Source§fn flat_map_builder<I, L>(self, logic: L) -> FlatMapBuilder<Self, C, L, I>
fn flat_map_builder<I, L>(self, logic: L) -> FlatMapBuilder<Self, C, L, I>
Creates a
FlatMapBuilder, which allows chaining of iterator logic before finalization into a stream. Read moreSource§impl<'scope, T: Timestamp, C: Container + DrainContainer> OkErr<'scope, T, C> for Stream<'scope, T, C>
impl<'scope, T: Timestamp, C: Container + DrainContainer> OkErr<'scope, T, C> for Stream<'scope, T, C>
Source§fn ok_err<C1, D1, C2, D2, L>(
self,
logic: L,
) -> (Stream<'scope, T, C1>, Stream<'scope, T, C2>)where
C1: Container + SizableContainer + PushInto<D1>,
C2: Container + SizableContainer + PushInto<D2>,
L: FnMut(C::Item<'_>) -> Result<D1, D2> + 'static,
fn ok_err<C1, D1, C2, D2, L>(
self,
logic: L,
) -> (Stream<'scope, T, C1>, Stream<'scope, T, C2>)where
C1: Container + SizableContainer + PushInto<D1>,
C2: Container + SizableContainer + PushInto<D2>,
L: FnMut(C::Item<'_>) -> Result<D1, D2> + 'static,
Takes one input stream and splits it into two output streams.
For each record, the supplied closure is called with the data.
If it returns
Ok(x), then x will be sent
to the first returned stream; otherwise, if it returns Err(e),
then e will be sent to the second. Read moreSource§impl<'scope, T: Timestamp, C1: Container> Operator<'scope, T, C1> for Stream<'scope, T, C1>
impl<'scope, T: Timestamp, C1: Container> Operator<'scope, T, C1> for Stream<'scope, T, C1>
Source§fn unary_frontier<CB, B, L, P>(
self,
pact: P,
name: &str,
constructor: B,
) -> Stream<'scope, T, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<T>, OperatorInfo) -> L,
L: FnMut((&mut InputHandleCore<T, C1, P::Puller>, &MutableAntichain<T>), &mut OutputBuilderSession<'_, T, CB>) + 'static,
P: ParallelizationContract<T, C1>,
fn unary_frontier<CB, B, L, P>(
self,
pact: P,
name: &str,
constructor: B,
) -> Stream<'scope, T, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<T>, OperatorInfo) -> L,
L: FnMut((&mut InputHandleCore<T, C1, P::Puller>, &MutableAntichain<T>), &mut OutputBuilderSession<'_, T, CB>) + 'static,
P: ParallelizationContract<T, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input stream, write to the output stream, and inspect the frontier at the input. Read moreSource§fn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<T, C1, P::Puller>, &mut OutputBuilderSession<'_, T, CB>, &mut Notificator<'_, T>) + 'static, P: ParallelizationContract<T, C1>>(
self,
pact: P,
name: &str,
init: impl IntoIterator<Item = T>,
logic: L,
) -> Stream<'scope, T, CB::Container>
fn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<T, C1, P::Puller>, &mut OutputBuilderSession<'_, T, CB>, &mut Notificator<'_, T>) + 'static, P: ParallelizationContract<T, C1>>( self, pact: P, name: &str, init: impl IntoIterator<Item = T>, logic: L, ) -> Stream<'scope, T, CB::Container>
Creates a new dataflow operator that partitions its input stream by a parallelization strategy
pact,
and repeatedly invokes the closure supplied as logic, which can read from the input stream, write to
the output stream, and inspect the frontier at the input. Read moreSource§fn unary<CB, B, L, P>(
self,
pact: P,
name: &str,
constructor: B,
) -> Stream<'scope, T, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<T>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<T, C1, P::Puller>, &mut OutputBuilderSession<'_, T, CB>) + 'static,
P: ParallelizationContract<T, C1>,
fn unary<CB, B, L, P>(
self,
pact: P,
name: &str,
constructor: B,
) -> Stream<'scope, T, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<T>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<T, C1, P::Puller>, &mut OutputBuilderSession<'_, T, CB>) + 'static,
P: ParallelizationContract<T, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input stream, and write to the output stream. Read moreSource§fn binary_frontier<C2, CB, B, L, P1, P2>(
self,
other: Stream<'scope, T, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> Stream<'scope, T, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<T>, OperatorInfo) -> L,
L: FnMut((&mut InputHandleCore<T, C1, P1::Puller>, &MutableAntichain<T>), (&mut InputHandleCore<T, C2, P2::Puller>, &MutableAntichain<T>), &mut OutputBuilderSession<'_, T, CB>) + 'static,
P1: ParallelizationContract<T, C1>,
P2: ParallelizationContract<T, C2>,
fn binary_frontier<C2, CB, B, L, P1, P2>(
self,
other: Stream<'scope, T, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> Stream<'scope, T, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<T>, OperatorInfo) -> L,
L: FnMut((&mut InputHandleCore<T, C1, P1::Puller>, &MutableAntichain<T>), (&mut InputHandleCore<T, C2, P2::Puller>, &MutableAntichain<T>), &mut OutputBuilderSession<'_, T, CB>) + 'static,
P1: ParallelizationContract<T, C1>,
P2: ParallelizationContract<T, C2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read moreSource§fn binary_notify<C2: Container, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<T, C1, P1::Puller>, &mut InputHandleCore<T, C2, P2::Puller>, &mut OutputBuilderSession<'_, T, CB>, &mut Notificator<'_, T>) + 'static, P1: ParallelizationContract<T, C1>, P2: ParallelizationContract<T, C2>>(
self,
other: Stream<'scope, T, C2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = T>,
logic: L,
) -> Stream<'scope, T, CB::Container>
fn binary_notify<C2: Container, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<T, C1, P1::Puller>, &mut InputHandleCore<T, C2, P2::Puller>, &mut OutputBuilderSession<'_, T, CB>, &mut Notificator<'_, T>) + 'static, P1: ParallelizationContract<T, C1>, P2: ParallelizationContract<T, C2>>( self, other: Stream<'scope, T, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item = T>, logic: L, ) -> Stream<'scope, T, CB::Container>
Creates a new dataflow operator that partitions its input streams by a parallelization strategy
pact,
and repeatedly invokes the closure supplied as logic, which can read from the input streams, write to
the output stream, and inspect the frontier at the inputs. Read moreSource§fn binary<C2, CB, B, L, P1, P2>(
self,
other: Stream<'scope, T, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> Stream<'scope, T, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<T>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<T, C1, P1::Puller>, &mut InputHandleCore<T, C2, P2::Puller>, &mut OutputBuilderSession<'_, T, CB>) + 'static,
P1: ParallelizationContract<T, C1>,
P2: ParallelizationContract<T, C2>,
fn binary<C2, CB, B, L, P1, P2>(
self,
other: Stream<'scope, T, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> Stream<'scope, T, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<T>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<T, C1, P1::Puller>, &mut InputHandleCore<T, C2, P2::Puller>, &mut OutputBuilderSession<'_, T, CB>) + 'static,
P1: ParallelizationContract<T, C1>,
P2: ParallelizationContract<T, C2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input streams, and write to the output stream. Read moreSource§fn sink<L, P>(self, pact: P, name: &str, logic: L)where
L: FnMut((&mut InputHandleCore<T, C1, P::Puller>, &MutableAntichain<T>)) + 'static,
P: ParallelizationContract<T, C1>,
fn sink<L, P>(self, pact: P, name: &str, logic: L)where
L: FnMut((&mut InputHandleCore<T, C1, P::Puller>, &MutableAntichain<T>)) + 'static,
P: ParallelizationContract<T, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact, and repeatedly invokes the function logic which can read from the input stream
and inspect the frontier at the input. Read moreSource§impl<'scope, T: Timestamp, C: Container + DrainContainer> Partition<'scope, T, C> for Stream<'scope, T, C>
impl<'scope, T: Timestamp, C: Container + DrainContainer> Partition<'scope, T, C> for Stream<'scope, T, C>
Convert a stream into a stream of shared data Read more
Auto Trait Implementations§
impl<'scope, T, C> Freeze for Stream<'scope, T, C>
impl<'scope, T, C> !RefUnwindSafe for Stream<'scope, T, C>
impl<'scope, T, C> !Send for Stream<'scope, T, C>
impl<'scope, T, C> !Sync for Stream<'scope, T, C>
impl<'scope, T, C> Unpin for Stream<'scope, T, C>
impl<'scope, T, C> UnsafeUnpin for Stream<'scope, T, C>
impl<'scope, T, C> !UnwindSafe for Stream<'scope, T, C>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more