Skip to main content

Stream

Struct Stream 

Source
pub struct Stream<S: Scope, C> { /* private fields */ }
Expand description

Abstraction of a stream of C: Container records timestamped with S::Timestamp.

Internally Stream maintains a list of data recipients who should be presented with data produced by the source of the stream.

Implementations§

Source§

impl<S: Scope, C> Stream<S, C>

Source

pub fn connect_to<P: Push<Message<S::Timestamp, C>> + 'static>( self, target: Target, pusher: P, identifier: usize, )
where C: 'static,

Connects the stream to a destination.

The destination is described both by a Target, for progress tracking information, and a P: Push where the records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.

Source

pub fn new(source: Source, output: TeeHelper<S::Timestamp, C>, scope: S) -> Self

Allocates a Stream from a supplied Source name and rendezvous point.

Source

pub fn name(&self) -> &Source

The name of the stream’s source operator.

Source

pub fn scope(&self) -> S

The scope immediately containing the stream.

Source

pub fn container<C2>(self) -> Stream<S, C2>
where Self: AsStream<S, C2>,

Allows the assertion of a container type, for the benefit of type inference.

This method can be needed when the container type of a stream is unconstrained, most commonly after creating an input, or bracking wholly generic operators.

§Examples
use timely::dataflow::operators::{ToStream, Inspect};

timely::example(|scope| {
    (0..10).to_stream(scope)
           .container::<Vec<_>>()
           .inspect(|x| println!("seen: {:?}", x));
});
Examples found in repository?
examples/capture_send.rs (line 14)
5fn main() {
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let addr = format!("127.0.0.1:{}", 8000 + worker.index());
9        let send = TcpStream::connect(addr).unwrap();
10
11        worker.dataflow::<u64,_,_>(|scope|
12            (0..10u64)
13                .to_stream(scope)
14                .container::<Vec<_>>()
15                .capture_into(EventWriter::new(send))
16        );
17    }).unwrap();
18}
More examples
Hide additional examples
examples/unordered_input.rs (line 8)
4fn main() {
5    timely::execute(Config::thread(), |worker| {
6        let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
7            let (input, stream) = scope.new_unordered_input();
8            stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
9            input
10        });
11
12        for round in 0..10 {
13            input.activate().session(&cap).give(round);
14            cap = cap.delayed(&(round + 1));
15            worker.step();
16        }
17    }).unwrap();
18}
examples/threadless.rs (line 19)
5fn main() {
6
7    // create a naked single-threaded worker.
8    let allocator = timely::communication::allocator::Thread::default();
9    let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
10
11    // create input and probe handles.
12    let mut input = InputHandle::new();
13    let probe = ProbeHandle::new();
14
15    // directly build a dataflow.
16    worker.dataflow(|scope| {
17        input
18            .to_stream(scope)
19            .container::<Vec<_>>()
20            .inspect(|x| println!("{:?}", x))
21            .probe_with(&probe);
22    });
23
24    // manage inputs.
25    for i in 0 .. 10 {
26        input.send(i);
27        input.advance_to(i);
28        while probe.less_than(input.time()) {
29            worker.step();
30        }
31    }
32}
examples/rc.rs (line 19)
10fn main() {
11    // initializes and runs a timely dataflow.
12    timely::execute_from_args(std::env::args(), |worker| {
13        // create a new input, exchange data, and inspect its output
14        let index = worker.index();
15        let mut input = InputHandle::new();
16        let probe = ProbeHandle::new();
17        worker.dataflow(|scope| {
18            scope.input_from(&mut input)
19                 .container::<Vec<_>>()
20                 //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
21                 .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
22                 .probe_with(&probe);
23        });
24
25        // introduce data and watch!
26        for round in 0..10 {
27            input.send(Test { _field: Rc::new(round) } );
28            input.advance_to(round + 1);
29            worker.step_while(|| probe.less_than(input.time()));
30        }
31    }).unwrap();
32}
examples/hello.rs (line 15)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let index = worker.index();
9        let mut input = InputHandle::new();
10        let probe = ProbeHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        worker.dataflow(|scope| {
14            scope.input_from(&mut input)
15                 .container::<Vec<_>>()
16                 .exchange(|x| *x)
17                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
18                 .probe_with(&probe);
19        });
20
21        // introduce data and watch!
22        for round in 0..10 {
23            if index == 0 {
24                input.send(round);
25            }
26            input.advance_to(round + 1);
27            while probe.less_than(input.time()) {
28                worker.step();
29            }
30        }
31    }).unwrap();
32}
examples/exchange.rs (line 16)
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9        let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10        let mut input = InputHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        let probe = worker.dataflow(|scope|
14            scope
15                .input_from(&mut input)
16                .container::<Vec<_>>()
17                .exchange(|&x| x as u64)
18                .probe()
19                .0
20        );
21
22
23        let timer = std::time::Instant::now();
24
25        for round in 0 .. rounds {
26
27            for i in 0 .. batch {
28                input.send(i);
29            }
30            input.advance_to(round);
31
32            while probe.less_than(input.time()) {
33                worker.step();
34            }
35
36        }
37
38        let volume = (rounds * batch) as f64;
39        let elapsed = timer.elapsed();
40        let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42        println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44    }).unwrap();
45}

Trait Implementations§

Source§

impl<S: Scope, C> AsStream<S, C> for Stream<S, C>

Source§

fn as_stream(self) -> Self

Translate self to a Stream.
Source§

impl<S: Scope, C: Container> BranchWhen<<S as ScopeParent>::Timestamp> for Stream<S, C>

Source§

fn branch_when( self, condition: impl Fn(&S::Timestamp) -> bool + 'static, ) -> (Self, Self)

Takes one input stream and splits it into two output streams. For each time, the supplied closure is called. If it returns true, the records for that will be sent to the second returned stream, otherwise they will be sent to the first. Read more
Source§

impl<S: Scope, C: Container> Capture<<S as ScopeParent>::Timestamp, C> for Stream<S, C>

Source§

fn capture_into<P: EventPusher<S::Timestamp, C> + 'static>( self, event_pusher: P, )

Captures a stream of timestamped data for later replay. Read more
Source§

fn capture(self) -> Receiver<Event<T, C>>

Captures a stream using Rust’s MPSC channels.
Source§

impl<S: Scope, C: Clone + 'static> Clone for Stream<S, C>

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<G: Scope, C: Container> Concat<G, C> for Stream<G, C>

Source§

fn concat(self, other: Stream<G, C>) -> Stream<G, C>

Merge the contents of two streams. Read more
Source§

impl<G: Scope, C: Container> ConnectLoop<G, C> for Stream<G, C>

Source§

fn connect_loop(self, handle: Handle<G, C>)

Connect a Stream to be the input of a loop variable. Read more
Source§

impl<S, C> Debug for Stream<S, C>
where S: Scope,

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<G: Scope, T: Timestamp + Refines<G::Timestamp>, C: Container> Enter<G, T, C> for Stream<G, C>

Source§

fn enter<'a>(self, scope: &Child<'a, G, T>) -> Stream<Child<'a, G, T>, C>

Moves the Stream argument into a child of its current Scope. Read more
Source§

impl<G: Scope, C> Exchange<C> for Stream<G, C>

Source§

fn exchange<F>(self, route: F) -> Stream<G, C>
where for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,

Exchange records between workers. Read more
Source§

impl<G: Scope, C> Filter<C> for Stream<G, C>
where for<'a> C: PushInto<C::Item<'a>> + Container + SizableContainer + DrainContainer,

Source§

fn filter<P: FnMut(&C::Item<'_>) -> bool + 'static>( self, predicate: P, ) -> Stream<G, C>

Returns a new instance of self containing only records satisfying predicate. Read more
Source§

impl<G: Scope, C: Container> Inspect<G, C> for Stream<G, C>
where for<'a> &'a C: IntoIterator,

Source§

fn inspect_core<F>(self, func: F) -> Self
where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static,

Runs a supplied closure on each observed data batch, and each frontier advancement. Read more
Source§

fn inspect<F>(self, func: F) -> Self
where F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static,

Runs a supplied closure on each observed data element. Read more
Source§

fn inspect_time<F>(self, func: F) -> Self
where F: for<'a> FnMut(&G::Timestamp, <&'a C as IntoIterator>::Item) + 'static,

Runs a supplied closure on each observed data element and associated time. Read more
Source§

fn inspect_batch(self, func: impl FnMut(&G::Timestamp, &C) + 'static) -> Self

Runs a supplied closure on each observed data batch (time and data slice). Read more
Source§

impl<G: Scope, C: Container> InspectCore<G, C> for Stream<G, C>

Source§

fn inspect_container<F>(self, func: F) -> Self
where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static,

Runs a supplied closure on each observed container, and each frontier advancement. Read more
Source§

impl<G: Scope, C: Container, T: Timestamp + Refines<G::Timestamp>> Leave<G, C> for Stream<Child<'_, G, T>, C>

Source§

fn leave(self) -> Stream<G, C>

Moves a Stream to the parent of its current Scope. Read more
Source§

impl<S: Scope, C: Container + DrainContainer> Map<S, C> for Stream<S, C>

Source§

fn flat_map<C2, I, L>(self, logic: L) -> Stream<S, C2>
where I: IntoIterator, C2: Container + SizableContainer + PushInto<I::Item>, L: FnMut(C::Item<'_>) -> I + 'static,

Consumes each element of the stream and yields some number of new elements. Read more
Source§

fn map<C2, D2, L>(self, logic: L) -> Stream<S, C2>
where C2: Container + SizableContainer + PushInto<D2>, L: FnMut(C::Item<'_>) -> D2 + 'static,

Consumes each element of the stream and yields a new element. Read more
Source§

fn flat_map_builder<I, L>(self, logic: L) -> FlatMapBuilder<Self, C, L, I>
where L: for<'a> Fn(C::Item<'a>) -> I,

Creates a FlatMapBuilder, which allows chaining of iterator logic before finalization into a stream. Read more
Source§

impl<S: Scope, C: Container + DrainContainer> OkErr<S, C> for Stream<S, C>

Source§

fn ok_err<C1, D1, C2, D2, L>(self, logic: L) -> (Stream<S, C1>, Stream<S, C2>)
where C1: Container + SizableContainer + PushInto<D1>, C2: Container + SizableContainer + PushInto<D2>, L: FnMut(C::Item<'_>) -> Result<D1, D2> + 'static,

Takes one input stream and splits it into two output streams. For each record, the supplied closure is called with the data. If it returns Ok(x), then x will be sent to the first returned stream; otherwise, if it returns Err(e), then e will be sent to the second. Read more
Source§

impl<G: Scope, C1: Container> Operator<G, C1> for Stream<G, C1>

Source§

fn unary_frontier<CB, B, L, P>( self, pact: P, name: &str, constructor: B, ) -> Stream<G, CB::Container>

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor. logic can read from the input stream, write to the output stream, and inspect the frontier at the input. Read more
Source§

fn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, C1>>( self, pact: P, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> Stream<G, CB::Container>

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes the closure supplied as logic, which can read from the input stream, write to the output stream, and inspect the frontier at the input. Read more
Source§

fn unary<CB, B, L, P>( self, pact: P, name: &str, constructor: B, ) -> Stream<G, CB::Container>
where CB: ContainerBuilder, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static, P: ParallelizationContract<G::Timestamp, C1>,

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor. logic can read from the input stream, and write to the output stream. Read more
Source§

fn binary_frontier<C2, CB, B, L, P1, P2>( self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B, ) -> Stream<G, CB::Container>

Creates a new dataflow operator that partitions its input streams by a parallelization strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor. logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read more
Source§

fn binary_notify<C2: Container, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>>( self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> Stream<G, CB::Container>

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes the closure supplied as logic, which can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read more
Source§

fn binary<C2, CB, B, L, P1, P2>( self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B, ) -> Stream<G, CB::Container>

Creates a new dataflow operator that partitions its input streams by a parallelization strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor. logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read more
Source§

fn sink<L, P>(self, pact: P, name: &str, logic: L)
where L: FnMut((&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>)) + 'static, P: ParallelizationContract<G::Timestamp, C1>,

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes the function logic which can read from the input stream and inspect the frontier at the input. Read more
Source§

impl<G: Scope, C: Container + DrainContainer> Partition<G, C> for Stream<G, C>

Source§

fn partition<CB, D2, F>( self, parts: u64, route: F, ) -> Vec<Stream<G, CB::Container>>
where CB: ContainerBuilder + PushInto<D2>, F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,

Produces parts output streams, containing records produced and assigned by route. Read more
Source§

impl<G: Scope, C: Container> Probe<G, C> for Stream<G, C>

Source§

fn probe(self) -> (Handle<G::Timestamp>, Self)

Constructs a progress probe which indicates which timestamps have elapsed at the operator. Read more
Source§

fn probe_with(self, handle: &Handle<G::Timestamp>) -> Stream<G, C>

Inserts a progress probe in a stream. Read more
Source§

impl<S: Scope, C: Container> Reclock<S> for Stream<S, C>

Source§

fn reclock<TC: Container>(self, clock: Stream<S, TC>) -> Stream<S, C>

Delays records until an input is observed on the clock input. Read more
Source§

impl<S: Scope, C: Container> SharedStream<S, C> for Stream<S, C>

Source§

fn shared(self) -> Stream<S, Rc<C>>

Convert a stream into a stream of shared data Read more

Auto Trait Implementations§

§

impl<S, C> Freeze for Stream<S, C>
where S: Freeze,

§

impl<S, C> !RefUnwindSafe for Stream<S, C>

§

impl<S, C> !Send for Stream<S, C>

§

impl<S, C> !Sync for Stream<S, C>

§

impl<S, C> Unpin for Stream<S, C>
where S: Unpin,

§

impl<S, C> UnsafeUnpin for Stream<S, C>
where S: UnsafeUnpin,

§

impl<S, C> !UnwindSafe for Stream<S, C>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.