Struct timely::dataflow::stream::Stream
[−]
[src]
pub struct Stream<S: Scope, D> { /* fields omitted */ }
Abstraction of a stream of D: Data
records timestamped with S::Timestamp
.
Internally Stream
maintains a list of data recipients who should be presented with data
produced by the source of the stream.
Methods
impl<S: Scope, D> Stream<S, D>
[src]
fn connect_to<P: Push<(S::Timestamp, Content<D>)> + 'static>(
&self,
target: Target,
pusher: P,
identifier: usize
)
&self,
target: Target,
pusher: P,
identifier: usize
)
Connects the stream to a destination.
The destination is described both by a Target
, for progress tracking information, and a P: Push
where the
records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.
fn new(source: Source, output: TeeHelper<S::Timestamp, D>, scope: S) -> Self
Allocates a Stream
from a supplied Source
name and rendezvous point.
fn name(&self) -> &Source
The name of the stream's source operator.
fn scope(&self) -> S
The scope immediately containing the stream.
Trait Implementations
impl<T: Timestamp, G: Scope, D: Data> Enter<G, T, D> for Stream<G, D>
[src]
fn enter<'a>(&self, scope: &Child<'a, G, T>) -> Stream<Child<'a, G, T>, D>
Moves the Stream
argument into a child of its current Scope
. Read more
impl<'a, G: Scope, D: Data, T: Timestamp> Leave<G, D> for Stream<Child<'a, G, T>, D>
[src]
impl<G: Scope, D1: Data> Unary<G, D1> for Stream<G, D1>
[src]
fn unary_notify<D2: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>, &mut Notificator<G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, D1>>(
&self,
pact: P,
name: &str,
init: Vec<G::Timestamp>,
logic: L
) -> Stream<G, D2>
&self,
pact: P,
name: &str,
init: Vec<G::Timestamp>,
logic: L
) -> Stream<G, D2>
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact
, and repeatedly invokes logic
which can read from the input stream, write to the output stream, and request and receive notifications. The method also requires a vector of the initial notifications the operator requires (commonly none). Read more
fn unary_stream<D2: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static, P: ParallelizationContract<G::Timestamp, D1>>(
&self,
pact: P,
name: &str,
logic: L
) -> Stream<G, D2>
&self,
pact: P,
name: &str,
logic: L
) -> Stream<G, D2>
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact
, and repeatedly invokes logic
which can read from the input stream and write to the output stream. Read more
impl<'a, G: ScopeParent, T: Timestamp, D: Data> ConnectLoop<G, T, D> for Stream<Child<'a, G, T>, D>
[src]
fn connect_loop(&self, helper: Handle<G::Timestamp, T, D>)
Connect a Stream
to be the input of a loop variable. Read more
impl<G: Scope, D: Data> Concat<G, D> for Stream<G, D>
[src]
impl<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2) + 'static> Partition<G, D, D2, F> for Stream<G, D>
[src]
fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>>
Produces parts
output streams, containing records produced and assigned by route
. Read more
impl<S: Scope, D: Data> Map<S, D> for Stream<S, D>
[src]
fn map<D2: Data, L: Fn(D) -> D2 + 'static>(&self, logic: L) -> Stream<S, D2>
Consumes each element of the stream and yields a new element. Read more
fn map_in_place<L: Fn(&mut D) + 'static>(&self, logic: L) -> Stream<S, D>
Updates each element of the stream and yields the element, re-using memory where possible. Read more
fn flat_map<I: IntoIterator, L: Fn(D) -> I + 'static>(
&self,
logic: L
) -> Stream<S, I::Item> where
I::Item: Data,
&self,
logic: L
) -> Stream<S, I::Item> where
I::Item: Data,
Consumes each element of the stream and yields some number of new elements. Read more
impl<G: Scope, D: Data> Inspect<G, D> for Stream<G, D>
[src]
fn inspect<F: FnMut(&D) + 'static>(&self, func: F) -> Stream<G, D>
Runs a supplied closure on each observed data element. Read more
fn inspect_batch<F: FnMut(&G::Timestamp, &[D]) + 'static>(
&self,
func: F
) -> Stream<G, D>
&self,
func: F
) -> Stream<G, D>
Runs a supplied closure on each observed data batch (time and data slice). Read more
impl<G: Scope, D: Data> Filter<D> for Stream<G, D>
[src]
fn filter<L: Fn(&D) -> bool + 'static>(&self, predicate: L) -> Stream<G, D>
Returns a new instance of self
containing only records satisfying predicate
. Read more
impl<G: Scope, D1: Data> Binary<G, D1> for Stream<G, D1>
[src]
fn binary_stream<D2: Data, D3: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1>, &mut InputHandle<G::Timestamp, D2>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static, P1: ParallelizationContract<G::Timestamp, D1>, P2: ParallelizationContract<G::Timestamp, D2>>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
logic: L
) -> Stream<G, D3>
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
logic: L
) -> Stream<G, D3>
Creates a new dataflow operator that partitions each of its input stream by a parallelization strategy pact
, and repeatedly invokes logic
which can read from the input streams and write to the output stream. Read more
fn binary_notify<D2: Data, D3: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1>, &mut InputHandle<G::Timestamp, D2>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>, &mut Notificator<G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, D1>, P2: ParallelizationContract<G::Timestamp, D2>>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
notify: Vec<G::Timestamp>,
logic: L
) -> Stream<G, D3>
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
notify: Vec<G::Timestamp>,
logic: L
) -> Stream<G, D3>
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact
, and repeatedly invokes logic
which can read from the input streams, write to the output stream, and request and receive notifications. The method also requires a vector of the initial notifications the operator requires (commonly none). Read more
impl<G: Scope, D: Data> Delay<G, D> for Stream<G, D> where
G::Timestamp: Hash,
[src]
G::Timestamp: Hash,
fn delay<F: Fn(&D, &G::Timestamp) -> G::Timestamp + 'static>(
&self,
func: F
) -> Stream<G, D>
&self,
func: F
) -> Stream<G, D>
Advances the timestamp of records using a supplied function. Read more
fn delay_batch<F: Fn(&G::Timestamp) -> G::Timestamp + 'static>(
&self,
func: F
) -> Stream<G, D>
&self,
func: F
) -> Stream<G, D>
Advances the timestamp of batches of records using a supplied function. Read more
impl<T: Timestamp, G: Scope<Timestamp = T>, D: ExchangeData> Exchange<T, D> for Stream<G, D>
[src]
fn exchange<F: Fn(&D) -> u64 + 'static>(&self, route: F) -> Stream<G, D>
Exchange records so that all records with the same route
are at the same worker. Read more
fn exchange_ts<F: Fn(&T, &D) -> u64 + 'static>(&self, route: F) -> Stream<G, D>
Exchange records by time so that all records whose time and data evaluate to the same route
are at the same worker. Read more
impl<G: Scope, D: ExchangeData> Broadcast<D> for Stream<G, D>
[src]
impl<G: Scope, D: Data> Probe<G, D> for Stream<G, D>
[src]
fn probe(&self) -> Handle<G::Timestamp>
Constructs a progress probe which indicates which timestamps have elapsed at the operator. Read more
fn probe_with(&self, handle: &mut Handle<G::Timestamp>) -> Stream<G, D>
Inserts a progress probe in a stream. Read more
impl<S: Scope, D: Data> Capture<S::Timestamp, D> for Stream<S, D>
[src]
fn capture_into<P: EventPusher<S::Timestamp, D> + 'static>(&self, pusher: P)
Captures a stream of timestamped data for later replay. Read more
fn capture(&self) -> Receiver<Event<T, D>>
Captures a stream using Rust's MPSC channels.
impl<G: Scope, D1: Data> Operator<G, D1> for Stream<G, D1>
[src]
fn unary_frontier<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact
, and repeteadly invokes logic
, the function returned by the function passed as constructor
. logic
can read from the input stream, write to the output stream, and inspect the frontier at the input. Read more
fn unary<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact
, and repeteadly invokes logic
, the function returned by the function passed as constructor
. logic
can read from the input stream, and write to the output stream. Read more
fn binary<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1>, &mut InputHandle<G::Timestamp, D2>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1>, &mut InputHandle<G::Timestamp, D2>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by a parallelization strategy pact
, and repeteadly invokes logic
, the function returned by the function passed as constructor
. logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read more
fn binary_frontier<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1>, &mut FrontieredInputHandle<G::Timestamp, D2>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1>, &mut FrontieredInputHandle<G::Timestamp, D2>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by a parallelization strategy pact
, and repeteadly invokes logic
, the function returned by the function passed as constructor
. logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read more
impl<S: Scope, K: ExchangeData + Hash + Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)>
[src]
fn state_machine<R: Data, D: Default + 'static, I: IntoIterator<Item = R>, F: Fn(&K, V, &mut D) -> (bool, I) + 'static, H: Fn(&K) -> u64 + 'static>(
&self,
fold: F,
hash: H
) -> Stream<S, R> where
S::Timestamp: Hash + Eq,
&self,
fold: F,
hash: H
) -> Stream<S, R> where
S::Timestamp: Hash + Eq,
Tracks a state for each presented key, using user-supplied state transition logic. Read more
impl<S: Scope, K: ExchangeData + Hash + Eq, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)>
[src]
fn aggregate<R: Data, D: Default + 'static, F: Fn(&K, V, &mut D) + 'static, G: Fn(K, D) -> R + 'static, H: Fn(&K) -> u64 + 'static>(
&self,
fold: F,
emit: G,
hash: H
) -> Stream<S, R> where
S::Timestamp: Hash + Eq,
&self,
fold: F,
emit: G,
hash: H
) -> Stream<S, R> where
S::Timestamp: Hash + Eq,
Aggregates data of the form (key, val)
, using user-supplied logic. Read more
impl<S: Scope, D: Data> Reclock<S, D> for Stream<S, D>
[src]
fn reclock(&self, clock: &Stream<S, ()>) -> Stream<S, D>
Delays records until an input is observed on the clock
input. Read more
impl<G: Scope, D: Data> Accumulate<G, D> for Stream<G, D> where
G::Timestamp: Hash,
[src]
G::Timestamp: Hash,
fn accumulate<A: Data, F: Fn(&mut A, &mut Content<D>) + 'static>(
&self,
default: A,
logic: F
) -> Stream<G, A>
&self,
default: A,
logic: F
) -> Stream<G, A>
Accumulates records within a timestamp. Read more
fn count(&self) -> Stream<G, usize>
Counts the number of records observed at each time. Read more
impl<S: Clone + Scope, D: Clone> Clone for Stream<S, D> where
S::Timestamp: Clone,
[src]
S::Timestamp: Clone,
fn clone(&self) -> Stream<S, D>
Returns a copy of the value. Read more
fn clone_from(&mut self, source: &Self)
1.0.0
Performs copy-assignment from source
. Read more