Struct timely::dataflow::stream::Stream
[−]
[src]
pub struct Stream<S: Scope, D> { /* fields omitted */ }
Abstraction of a stream of D: Data records timestamped with S::Timestamp.
Internally Stream maintains a list of data recipients who should be presented with data
produced by the source of the stream.
Methods
impl<S: Scope, D> Stream<S, D>[src]
pub fn connect_to<P: Push<(S::Timestamp, Content<D>)> + 'static>(
&self,
target: Target,
pusher: P,
identifier: usize
)[src]
&self,
target: Target,
pusher: P,
identifier: usize
)
Connects the stream to a destination.
The destination is described both by a Target, for progress tracking information, and a P: Push where the
records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.
pub fn new(source: Source, output: TeeHelper<S::Timestamp, D>, scope: S) -> Self[src]
Allocates a Stream from a supplied Source name and rendezvous point.
pub fn name(&self) -> &Source[src]
The name of the stream's source operator.
pub fn scope(&self) -> S[src]
The scope immediately containing the stream.
Trait Implementations
impl<T: Timestamp, G: Scope, D: Data> Enter<G, T, D> for Stream<G, D>[src]
fn enter<'a>(&self, scope: &Child<'a, G, T>) -> Stream<Child<'a, G, T>, D>[src]
Moves the Stream argument into a child of its current Scope. Read more
impl<'a, G: Scope, D: Data, T: Timestamp> Leave<G, D> for Stream<Child<'a, G, T>, D>[src]
impl<'a, G: ScopeParent, T: Timestamp, D: Data> ConnectLoop<G, T, D> for Stream<Child<'a, G, T>, D>[src]
fn connect_loop(&self, helper: Handle<G::Timestamp, T, D>)[src]
Connect a Stream to be the input of a loop variable. Read more
impl<G: Scope, D: Data> Concat<G, D> for Stream<G, D>[src]
fn concat(&self, other: &Stream<G, D>) -> Stream<G, D>[src]
Merge the contents of two streams. Read more
impl<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2) + 'static> Partition<G, D, D2, F> for Stream<G, D>[src]
fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>>[src]
Produces parts output streams, containing records produced and assigned by route. Read more
impl<S: Scope, D: Data> Map<S, D> for Stream<S, D>[src]
fn map<D2: Data, L: Fn(D) -> D2 + 'static>(&self, logic: L) -> Stream<S, D2>[src]
Consumes each element of the stream and yields a new element. Read more
fn map_in_place<L: Fn(&mut D) + 'static>(&self, logic: L) -> Stream<S, D>[src]
Updates each element of the stream and yields the element, re-using memory where possible. Read more
fn flat_map<I: IntoIterator, L: Fn(D) -> I + 'static>(
&self,
logic: L
) -> Stream<S, I::Item> where
I::Item: Data, [src]
&self,
logic: L
) -> Stream<S, I::Item> where
I::Item: Data,
Consumes each element of the stream and yields some number of new elements. Read more
impl<G: Scope, D: Data> Inspect<G, D> for Stream<G, D>[src]
fn inspect<F: FnMut(&D) + 'static>(&self, func: F) -> Stream<G, D>[src]
Runs a supplied closure on each observed data element. Read more
fn inspect_batch<F: FnMut(&G::Timestamp, &[D]) + 'static>(
&self,
func: F
) -> Stream<G, D>[src]
&self,
func: F
) -> Stream<G, D>
Runs a supplied closure on each observed data batch (time and data slice). Read more
impl<G: Scope, D: Data> Filter<D> for Stream<G, D>[src]
fn filter<L: Fn(&D) -> bool + 'static>(&self, predicate: L) -> Stream<G, D>[src]
Returns a new instance of self containing only records satisfying predicate. Read more
impl<G: Scope, D: Data> Delay<G, D> for Stream<G, D>[src]
fn delay<F: Fn(&D, &G::Timestamp) -> G::Timestamp + 'static>(
&self,
func: F
) -> Stream<G, D>[src]
&self,
func: F
) -> Stream<G, D>
Advances the timestamp of records using a supplied function. Read more
fn delay_batch<F: Fn(&G::Timestamp) -> G::Timestamp + 'static>(
&self,
func: F
) -> Stream<G, D>[src]
&self,
func: F
) -> Stream<G, D>
Advances the timestamp of batches of records using a supplied function. Read more
impl<T: Timestamp, G: Scope<Timestamp = T>, D: ExchangeData> Exchange<T, D> for Stream<G, D>[src]
fn exchange<F: Fn(&D) -> u64 + 'static>(&self, route: F) -> Stream<G, D>[src]
Exchange records so that all records with the same route are at the same worker. Read more
fn exchange_ts<F: Fn(&T, &D) -> u64 + 'static>(&self, route: F) -> Stream<G, D>[src]
Exchange records by time so that all records whose time and data evaluate to the same route are at the same worker. Read more
impl<G: Scope, D: ExchangeData> Broadcast<D> for Stream<G, D>[src]
impl<G: Scope, D: Data> Probe<G, D> for Stream<G, D>[src]
fn probe(&self) -> Handle<G::Timestamp>[src]
Constructs a progress probe which indicates which timestamps have elapsed at the operator. Read more
fn probe_with(&self, handle: &mut Handle<G::Timestamp>) -> Stream<G, D>[src]
Inserts a progress probe in a stream. Read more
impl<S: Scope, D: Data> Capture<S::Timestamp, D> for Stream<S, D>[src]
fn capture_into<P: EventPusher<S::Timestamp, D> + 'static>(
&self,
event_pusher: P
)[src]
&self,
event_pusher: P
)
Captures a stream of timestamped data for later replay. Read more
fn capture(&self) -> Receiver<Event<T, D>>[src]
Captures a stream using Rust's MPSC channels.
impl<S: Scope, K: ExchangeData + Hash + Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)>[src]
fn state_machine<R: Data, D: Default + 'static, I: IntoIterator<Item = R>, F: Fn(&K, V, &mut D) -> (bool, I) + 'static, H: Fn(&K) -> u64 + 'static>(
&self,
fold: F,
hash: H
) -> Stream<S, R> where
S::Timestamp: Hash + Eq, [src]
&self,
fold: F,
hash: H
) -> Stream<S, R> where
S::Timestamp: Hash + Eq,
Tracks a state for each presented key, using user-supplied state transition logic. Read more
impl<S: Scope, K: ExchangeData + Hash + Eq, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)>[src]
fn aggregate<R: Data, D: Default + 'static, F: Fn(&K, V, &mut D) + 'static, G: Fn(K, D) -> R + 'static, H: Fn(&K) -> u64 + 'static>(
&self,
fold: F,
emit: G,
hash: H
) -> Stream<S, R> where
S::Timestamp: Eq, [src]
&self,
fold: F,
emit: G,
hash: H
) -> Stream<S, R> where
S::Timestamp: Eq,
Aggregates data of the form (key, val), using user-supplied logic. Read more
impl<G: Scope, D1: Data> Unary<G, D1> for Stream<G, D1>[src]
fn unary_notify<D2: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>, &mut Notificator<G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, D1>>(
&self,
pact: P,
name: &str,
init: Vec<G::Timestamp>,
logic: L
) -> Stream<G, D2>[src]
&self,
pact: P,
name: &str,
init: Vec<G::Timestamp>,
logic: L
) -> Stream<G, D2>
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes logic which can read from the input stream, write to the output stream, and request and receive notifications. The method also requires a vector of the initial notifications the operator requires (commonly none). Read more
fn unary_stream<D2: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static, P: ParallelizationContract<G::Timestamp, D1>>(
&self,
pact: P,
name: &str,
logic: L
) -> Stream<G, D2>[src]
&self,
pact: P,
name: &str,
logic: L
) -> Stream<G, D2>
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes logic which can read from the input stream and write to the output stream. Read more
impl<G: Scope, D1: Data> Binary<G, D1> for Stream<G, D1>[src]
fn binary_stream<D2: Data, D3: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1, P1::Puller>, &mut InputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static, P1: ParallelizationContract<G::Timestamp, D1>, P2: ParallelizationContract<G::Timestamp, D2>>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
logic: L
) -> Stream<G, D3>[src]
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
logic: L
) -> Stream<G, D3>
Creates a new dataflow operator that partitions each of its input stream by a parallelization strategy pact, and repeatedly invokes logic which can read from the input streams and write to the output stream. Read more
fn binary_notify<D2: Data, D3: Data, L: FnMut(&mut InputHandle<G::Timestamp, D1, P1::Puller>, &mut InputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>, &mut Notificator<G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, D1>, P2: ParallelizationContract<G::Timestamp, D2>>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
init: Vec<G::Timestamp>,
logic: L
) -> Stream<G, D3>[src]
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
init: Vec<G::Timestamp>,
logic: L
) -> Stream<G, D3>
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes logic which can read from the input streams, write to the output stream, and request and receive notifications. The method also requires a vector of the initial notifications the operator requires (commonly none). Read more
impl<G: Scope, D1: Data> Operator<G, D1> for Stream<G, D1>[src]
fn unary_frontier<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>, [src]
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeteadly invokes logic, the function returned by the function passed as constructor. logic can read from the input stream, write to the output stream, and inspect the frontier at the input. Read more
fn unary<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>, [src]
&self,
pact: P,
name: &str,
constructor: B
) -> Stream<G, D2> where
D2: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P::Puller>, &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeteadly invokes logic, the function returned by the function passed as constructor. logic can read from the input stream, and write to the output stream. Read more
fn binary_frontier<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>, [src]
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by a parallelization strategy pact, and repeteadly invokes logic, the function returned by the function passed as constructor. logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read more
fn binary<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P1::Puller>, &mut InputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>, [src]
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> Stream<G, D3> where
D2: Data,
D3: Data,
B: FnOnce(Capability<G::Timestamp>) -> L,
L: FnMut(&mut InputHandle<G::Timestamp, D1, P1::Puller>, &mut InputHandle<G::Timestamp, D2, P2::Puller>, &mut OutputHandle<G::Timestamp, D3, Tee<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<G::Timestamp, D1>,
P2: ParallelizationContract<G::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by a parallelization strategy pact, and repeteadly invokes logic, the function returned by the function passed as constructor. logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read more
fn sink<L, P>(&self, pact: P, name: &str, logic: L) where
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>, [src]
L: FnMut(&mut FrontieredInputHandle<G::Timestamp, D1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeteadly invokes the function logic which can read from the input stream and inspect the frontier at the input. Read more
impl<S: Scope, D: Data> Reclock<S, D> for Stream<S, D>[src]
fn reclock(&self, clock: &Stream<S, ()>) -> Stream<S, D>[src]
Delays records until an input is observed on the clock input. Read more
impl<G: Scope, D: Data> Accumulate<G, D> for Stream<G, D>[src]
fn accumulate<A: Data, F: Fn(&mut A, &mut Content<D>) + 'static>(
&self,
default: A,
logic: F
) -> Stream<G, A>[src]
&self,
default: A,
logic: F
) -> Stream<G, A>
Accumulates records within a timestamp. Read more
fn count(&self) -> Stream<G, usize>[src]
Counts the number of records observed at each time. Read more
impl<S: Clone + Scope, D: Clone> Clone for Stream<S, D> where
S::Timestamp: Clone, [src]
S::Timestamp: Clone,