pub struct Stream<S: Scope, C> { /* private fields */ }Expand description
Abstraction of a stream of C: Container records timestamped with S::Timestamp.
Internally Stream maintains a list of data recipients who should be presented with data
produced by the source of the stream.
Implementations§
Source§impl<S: Scope, C> Stream<S, C>
impl<S: Scope, C> Stream<S, C>
Sourcepub fn connect_to<P: Push<Message<S::Timestamp, C>> + 'static>(
self,
target: Target,
pusher: P,
identifier: usize,
)where
C: 'static,
pub fn connect_to<P: Push<Message<S::Timestamp, C>> + 'static>(
self,
target: Target,
pusher: P,
identifier: usize,
)where
C: 'static,
Connects the stream to a destination.
The destination is described both by a Target, for progress tracking information, and a P: Push where the
records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.
Sourcepub fn new(source: Source, output: TeeHelper<S::Timestamp, C>, scope: S) -> Self
pub fn new(source: Source, output: TeeHelper<S::Timestamp, C>, scope: S) -> Self
Allocates a Stream from a supplied Source name and rendezvous point.
Sourcepub fn container<C2>(self) -> Stream<S, C2>where
Self: AsStream<S, C2>,
pub fn container<C2>(self) -> Stream<S, C2>where
Self: AsStream<S, C2>,
Allows the assertion of a container type, for the benefit of type inference.
This method can be needed when the container type of a stream is unconstrained, most commonly after creating an input, or bracking wholly generic operators.
§Examples
use timely::dataflow::operators::{ToStream, Inspect};
timely::example(|scope| {
(0..10).to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("seen: {:?}", x));
});Examples found in repository?
5fn main() {
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let addr = format!("127.0.0.1:{}", 8000 + worker.index());
9 let send = TcpStream::connect(addr).unwrap();
10
11 worker.dataflow::<u64,_,_>(|scope|
12 (0..10u64)
13 .to_stream(scope)
14 .container::<Vec<_>>()
15 .capture_into(EventWriter::new(send))
16 );
17 }).unwrap();
18}More examples
4fn main() {
5 timely::execute(Config::thread(), |worker| {
6 let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7 let (input, stream) = scope.new_unordered_input();
8 stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9 input
10 });
11
12 for round in 0..10 {
13 input.activate().session(&cap).give(round);
14 cap = cap.delayed(&(round + 1));
15 worker.step();
16 }
17 }).unwrap();
18}5fn main() {
6
7 // create a naked single-threaded worker.
8 let allocator = timely::communication::allocator::Thread::default();
9 let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11 // create input and probe handles.
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 // directly build a dataflow.
16 worker.dataflow(|scope| {
17 input
18 .to_stream(scope)
19 .container::<Vec<_>>()
20 .inspect(|x| println!("{:?}", x))
21 .probe_with(&probe);
22 });
23
24 // manage inputs.
25 for i in 0 .. 10 {
26 input.send(i);
27 input.advance_to(i);
28 while probe.less_than(input.time()) {
29 worker.step();
30 }
31 }
32}10fn main() {
11 // initializes and runs a timely dataflow.
12 timely::execute_from_args(std::env::args(), |worker| {
13 // create a new input, exchange data, and inspect its output
14 let index = worker.index();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17 worker.dataflow(|scope| {
18 scope.input_from(&mut input)
19 .container::<Vec<_>>()
20 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22 .probe_with(&probe);
23 });
24
25 // introduce data and watch!
26 for round in 0..10 {
27 input.send(Test { _field: Rc::new(round) } );
28 input.advance_to(round + 1);
29 worker.step_while(|| probe.less_than(input.time()));
30 }
31 }).unwrap();
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let index = worker.index();
9 let mut input = InputHandle::new();
10 let probe = ProbeHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 worker.dataflow(|scope| {
14 scope.input_from(&mut input)
15 .container::<Vec<_>>()
16 .exchange(|x| *x)
17 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18 .probe_with(&probe);
19 });
20
21 // introduce data and watch!
22 for round in 0..10 {
23 if index == 0 {
24 input.send(round);
25 }
26 input.advance_to(round + 1);
27 while probe.less_than(input.time()) {
28 worker.step();
29 }
30 }
31 }).unwrap();
32}4fn main() {
5 // initializes and runs a timely dataflow.
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9 let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10 let mut input = InputHandle::new();
11
12 // create a new input, exchange data, and inspect its output
13 let probe = worker.dataflow(|scope|
14 scope
15 .input_from(&mut input)
16 .container::<Vec<_>>()
17 .exchange(|&x| x as u64)
18 .probe()
19 .0
20 );
21
22
23 let timer = std::time::Instant::now();
24
25 for round in 0 .. rounds {
26
27 for i in 0 .. batch {
28 input.send(i);
29 }
30 input.advance_to(round);
31
32 while probe.less_than(input.time()) {
33 worker.step();
34 }
35
36 }
37
38 let volume = (rounds * batch) as f64;
39 let elapsed = timer.elapsed();
40 let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42 println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44 }).unwrap();
45}Trait Implementations§
Source§impl<S: Scope, C: Container> BranchWhen<<S as ScopeParent>::Timestamp> for Stream<S, C>
impl<S: Scope, C: Container> BranchWhen<<S as ScopeParent>::Timestamp> for Stream<S, C>
Source§fn branch_when(
self,
condition: impl Fn(&S::Timestamp) -> bool + 'static,
) -> (Self, Self)
fn branch_when( self, condition: impl Fn(&S::Timestamp) -> bool + 'static, ) -> (Self, Self)
true,
the records for that will be sent to the second returned stream, otherwise
they will be sent to the first. Read moreSource§impl<G: Scope, C: Container> ConnectLoop<G, C> for Stream<G, C>
impl<G: Scope, C: Container> ConnectLoop<G, C> for Stream<G, C>
Source§fn connect_loop(self, handle: Handle<G, C>)
fn connect_loop(self, handle: Handle<G, C>)
Stream to be the input of a loop variable. Read moreSource§impl<G: Scope, T: Timestamp + Refines<G::Timestamp>, C: Container> Enter<G, T, C> for Stream<G, C>
impl<G: Scope, T: Timestamp + Refines<G::Timestamp>, C: Container> Enter<G, T, C> for Stream<G, C>
Source§impl<G: Scope, C> Exchange<C> for Stream<G, C>where
C: Container + SizableContainer + DrainContainer + Send + ContainerBytes + for<'a> PushInto<C::Item<'a>>,
impl<G: Scope, C> Exchange<C> for Stream<G, C>where
C: Container + SizableContainer + DrainContainer + Send + ContainerBytes + for<'a> PushInto<C::Item<'a>>,
Source§impl<G: Scope, C: Container> Inspect<G, C> for Stream<G, C>where
for<'a> &'a C: IntoIterator,
impl<G: Scope, C: Container> Inspect<G, C> for Stream<G, C>where
for<'a> &'a C: IntoIterator,
Source§fn inspect_core<F>(self, func: F) -> Self
fn inspect_core<F>(self, func: F) -> Self
Source§fn inspect<F>(self, func: F) -> Self
fn inspect<F>(self, func: F) -> Self
Source§fn inspect_time<F>(self, func: F) -> Self
fn inspect_time<F>(self, func: F) -> Self
Source§impl<G: Scope, C: Container, T: Timestamp + Refines<G::Timestamp>> Leave<G, C> for Stream<Child<'_, G, T>, C>
impl<G: Scope, C: Container, T: Timestamp + Refines<G::Timestamp>> Leave<G, C> for Stream<Child<'_, G, T>, C>
Source§impl<S: Scope, C: Container + DrainContainer> Map<S, C> for Stream<S, C>
impl<S: Scope, C: Container + DrainContainer> Map<S, C> for Stream<S, C>
Source§fn flat_map<C2, I, L>(self, logic: L) -> Stream<S, C2>where
I: IntoIterator,
C2: Container + SizableContainer + PushInto<I::Item>,
L: FnMut(C::Item<'_>) -> I + 'static,
fn flat_map<C2, I, L>(self, logic: L) -> Stream<S, C2>where
I: IntoIterator,
C2: Container + SizableContainer + PushInto<I::Item>,
L: FnMut(C::Item<'_>) -> I + 'static,
Source§fn map<C2, D2, L>(self, logic: L) -> Stream<S, C2>
fn map<C2, D2, L>(self, logic: L) -> Stream<S, C2>
Source§fn flat_map_builder<I, L>(self, logic: L) -> FlatMapBuilder<Self, C, L, I>
fn flat_map_builder<I, L>(self, logic: L) -> FlatMapBuilder<Self, C, L, I>
FlatMapBuilder, which allows chaining of iterator logic before finalization into a stream. Read moreSource§impl<S: Scope, C: Container + DrainContainer> OkErr<S, C> for Stream<S, C>
impl<S: Scope, C: Container + DrainContainer> OkErr<S, C> for Stream<S, C>
Source§fn ok_err<C1, D1, C2, D2, L>(self, logic: L) -> (Stream<S, C1>, Stream<S, C2>)where
C1: Container + SizableContainer + PushInto<D1>,
C2: Container + SizableContainer + PushInto<D2>,
L: FnMut(C::Item<'_>) -> Result<D1, D2> + 'static,
fn ok_err<C1, D1, C2, D2, L>(self, logic: L) -> (Stream<S, C1>, Stream<S, C2>)where
C1: Container + SizableContainer + PushInto<D1>,
C2: Container + SizableContainer + PushInto<D2>,
L: FnMut(C::Item<'_>) -> Result<D1, D2> + 'static,
Ok(x), then x will be sent
to the first returned stream; otherwise, if it returns Err(e),
then e will be sent to the second. Read moreSource§impl<G: Scope, C1: Container> Operator<G, C1> for Stream<G, C1>
impl<G: Scope, C1: Container> Operator<G, C1> for Stream<G, C1>
Source§fn unary_frontier<CB, B, L, P>(
self,
pact: P,
name: &str,
constructor: B,
) -> Stream<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>), &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn unary_frontier<CB, B, L, P>(
self,
pact: P,
name: &str,
constructor: B,
) -> Stream<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>), &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input stream, write to the output stream, and inspect the frontier at the input. Read moreSource§fn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, C1>>(
self,
pact: P,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> Stream<G, CB::Container>
fn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, C1>>( self, pact: P, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> Stream<G, CB::Container>
pact,
and repeatedly invokes the closure supplied as logic, which can read from the input stream, write to
the output stream, and inspect the frontier at the input. Read moreSource§fn unary<CB, B, L, P>(
self,
pact: P,
name: &str,
constructor: B,
) -> Stream<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn unary<CB, B, L, P>(
self,
pact: P,
name: &str,
constructor: B,
) -> Stream<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input stream, and write to the output stream. Read moreSource§fn binary_frontier<C2, CB, B, L, P1, P2>(
self,
other: Stream<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> Stream<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>), (&mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>), &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
fn binary_frontier<C2, CB, B, L, P1, P2>(
self,
other: Stream<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> Stream<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>), (&mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>), &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read moreSource§fn binary_notify<C2: Container, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>>(
self,
other: Stream<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> Stream<G, CB::Container>
fn binary_notify<C2: Container, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>>( self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> Stream<G, CB::Container>
pact,
and repeatedly invokes the closure supplied as logic, which can read from the input streams, write to
the output stream, and inspect the frontier at the inputs. Read moreSource§fn binary<C2, CB, B, L, P1, P2>(
self,
other: Stream<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> Stream<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
fn binary<C2, CB, B, L, P1, P2>(
self,
other: Stream<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> Stream<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read moreSource§fn sink<L, P>(self, pact: P, name: &str, logic: L)where
L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>)) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn sink<L, P>(self, pact: P, name: &str, logic: L)where
L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>)) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
pact, and repeatedly invokes the function logic which can read from the input stream
and inspect the frontier at the input. Read moreAuto Trait Implementations§
impl<S, C> Freeze for Stream<S, C>where
S: Freeze,
impl<S, C> !RefUnwindSafe for Stream<S, C>
impl<S, C> !Send for Stream<S, C>
impl<S, C> !Sync for Stream<S, C>
impl<S, C> Unpin for Stream<S, C>where
S: Unpin,
impl<S, C> UnsafeUnpin for Stream<S, C>where
S: UnsafeUnpin,
impl<S, C> !UnwindSafe for Stream<S, C>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more