pub struct Stream<S, D>where
S: Scope,{ /* private fields */ }
Expand description
Abstraction of a stream of D: Data
records timestamped with S::Timestamp
.
Internally Stream
maintains a list of data recipients who should be presented with data
produced by the source of the stream.
Implementations§
Source§impl<S, D> Stream<S, D>where
S: Scope,
impl<S, D> Stream<S, D>where
S: Scope,
Sourcepub fn connect_to<P>(&self, target: Target, pusher: P, identifier: usize)
pub fn connect_to<P>(&self, target: Target, pusher: P, identifier: usize)
Connects the stream to a destination.
The destination is described both by a Target
, for progress tracking information, and a P: Push
where the
records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.
Trait Implementations§
Source§impl<G, D> Accumulate<G, D> for Stream<G, D>
impl<G, D> Accumulate<G, D> for Stream<G, D>
Source§impl<S, K, V> Aggregate<S, K, V> for Stream<S, (K, V)>
impl<S, K, V> Aggregate<S, K, V> for Stream<S, (K, V)>
Source§impl<S, D> Branch<S, D> for Stream<S, D>
impl<S, D> Branch<S, D> for Stream<S, D>
Source§fn branch(
&self,
condition: impl Fn(&<S as ScopeParent>::Timestamp, &D) -> bool + 'static,
) -> (Stream<S, D>, Stream<S, D>)
fn branch( &self, condition: impl Fn(&<S as ScopeParent>::Timestamp, &D) -> bool + 'static, ) -> (Stream<S, D>, Stream<S, D>)
Takes one input stream and splits it into two output streams.
For each record, the supplied closure is called with a reference to
the data and its time. If it returns true, the record will be sent
to the second returned stream, otherwise it will be sent to the first. Read more
Source§impl<S, D> BranchWhen<S, D> for Stream<S, D>
impl<S, D> BranchWhen<S, D> for Stream<S, D>
Source§fn branch_when(
&self,
condition: impl Fn(&<S as ScopeParent>::Timestamp) -> bool + 'static,
) -> (Stream<S, D>, Stream<S, D>)
fn branch_when( &self, condition: impl Fn(&<S as ScopeParent>::Timestamp) -> bool + 'static, ) -> (Stream<S, D>, Stream<S, D>)
Takes one input stream and splits it into two output streams.
For each time, the supplied closure is called. If it returns true,
the records for that will be sent to the second returned stream, otherwise
they will be sent to the first. Read more
Source§impl<S, D> Capture<<S as ScopeParent>::Timestamp, D> for Stream<S, D>
impl<S, D> Capture<<S as ScopeParent>::Timestamp, D> for Stream<S, D>
Source§fn capture_into<P>(&self, event_pusher: P)
fn capture_into<P>(&self, event_pusher: P)
Captures a stream of timestamped data for later replay. Read more
Source§impl<G, D> Concatenate<G, D> for Stream<G, D>
impl<G, D> Concatenate<G, D> for Stream<G, D>
Source§fn concatenate<I>(&self, sources: I) -> Stream<G, D>where
I: IntoIterator<Item = Stream<G, D>>,
fn concatenate<I>(&self, sources: I) -> Stream<G, D>where
I: IntoIterator<Item = Stream<G, D>>,
Merge the contents of multiple streams. Read more
Source§impl<G, D> ConnectLoop<G, D> for Stream<G, D>
impl<G, D> ConnectLoop<G, D> for Stream<G, D>
Source§fn connect_loop(&self, helper: Handle<G, D>)
fn connect_loop(&self, helper: Handle<G, D>)
Connect a
Stream
to be the input of a loop variable. Read moreSource§impl<G, D> Delay<G, D> for Stream<G, D>
impl<G, D> Delay<G, D> for Stream<G, D>
Source§fn delay<L>(&self, func: L) -> Stream<G, D>
fn delay<L>(&self, func: L) -> Stream<G, D>
Advances the timestamp of records using a supplied function. Read more
Source§fn delay_total<L>(&self, func: L) -> Stream<G, D>where
L: FnMut(&D, &<G as ScopeParent>::Timestamp) -> <G as ScopeParent>::Timestamp + 'static,
<G as ScopeParent>::Timestamp: TotalOrder,
fn delay_total<L>(&self, func: L) -> Stream<G, D>where
L: FnMut(&D, &<G as ScopeParent>::Timestamp) -> <G as ScopeParent>::Timestamp + 'static,
<G as ScopeParent>::Timestamp: TotalOrder,
Advances the timestamp of records using a supplied function. Read more
Source§fn delay_batch<L>(&self, func: L) -> Stream<G, D>
fn delay_batch<L>(&self, func: L) -> Stream<G, D>
Advances the timestamp of batches of records using a supplied function. Read more
Source§impl<G, D> Exchange<<G as ScopeParent>::Timestamp, D> for Stream<G, D>where
G: Scope,
D: ExchangeData,
impl<G, D> Exchange<<G as ScopeParent>::Timestamp, D> for Stream<G, D>where
G: Scope,
D: ExchangeData,
Source§impl<G, D> Inspect<G, D> for Stream<G, D>
impl<G, D> Inspect<G, D> for Stream<G, D>
Source§fn inspect_batch(
&self,
func: impl FnMut(&<G as ScopeParent>::Timestamp, &[D]) + 'static,
) -> Stream<G, D>
fn inspect_batch( &self, func: impl FnMut(&<G as ScopeParent>::Timestamp, &[D]) + 'static, ) -> Stream<G, D>
Runs a supplied closure on each observed data batch (time and data slice). Read more
Source§fn inspect(&self, func: impl FnMut(&D) + 'static) -> Stream<G, D>
fn inspect(&self, func: impl FnMut(&D) + 'static) -> Stream<G, D>
Runs a supplied closure on each observed data element. Read more
Source§fn inspect_time(
&self,
func: impl FnMut(&<G as ScopeParent>::Timestamp, &D) + 'static,
) -> Stream<G, D>
fn inspect_time( &self, func: impl FnMut(&<G as ScopeParent>::Timestamp, &D) + 'static, ) -> Stream<G, D>
Runs a supplied closure on each observed data element and associated time. Read more
Source§impl<S, D> Map<S, D> for Stream<S, D>
impl<S, D> Map<S, D> for Stream<S, D>
Source§fn map<D2, L>(&self, logic: L) -> Stream<S, D2>
fn map<D2, L>(&self, logic: L) -> Stream<S, D2>
Consumes each element of the stream and yields a new element. Read more
Source§fn map_in_place<L>(&self, logic: L) -> Stream<S, D>
fn map_in_place<L>(&self, logic: L) -> Stream<S, D>
Updates each element of the stream and yields the element, re-using memory where possible. Read more
Source§impl<S, D> OkErr<S, D> for Stream<S, D>
impl<S, D> OkErr<S, D> for Stream<S, D>
Source§fn ok_err<D1, D2, L>(&self, logic: L) -> (Stream<S, D1>, Stream<S, D2>)
fn ok_err<D1, D2, L>(&self, logic: L) -> (Stream<S, D1>, Stream<S, D2>)
Takes one input stream and splits it into two output streams.
For each record, the supplied closure is called with the data.
If it returns
Ok(x)
, then x
will be sent
to the first returned stream; otherwise, if it returns Err(e)
,
then e
will be sent to the second. Read moreSource§impl<G, D1> Operator<G, D1> for Stream<G, D1>
impl<G, D1> Operator<G, D1> for Stream<G, D1>
Source§fn unary_frontier<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> Stream<G, D2>where
D2: Data,
B: FnOnce(Capability<<G as ScopeParent>::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandle<'_, <G as ScopeParent>::Timestamp, D1, <P as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>, &mut OutputHandle<'_, <G as ScopeParent>::Timestamp, D2, Tee<<G as ScopeParent>::Timestamp, D2>>) + 'static,
P: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
fn unary_frontier<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> Stream<G, D2>where
D2: Data,
B: FnOnce(Capability<<G as ScopeParent>::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandle<'_, <G as ScopeParent>::Timestamp, D1, <P as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>, &mut OutputHandle<'_, <G as ScopeParent>::Timestamp, D2, Tee<<G as ScopeParent>::Timestamp, D2>>) + 'static,
P: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input stream, write to the output stream, and inspect the frontier at the input. Read moreSource§fn unary_notify<D2, L, P>(
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = <G as ScopeParent>::Timestamp>,
logic: L,
) -> Stream<G, D2>where
D2: Data,
L: FnMut(&mut InputHandle<<G as ScopeParent>::Timestamp, D1, <P as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>, &mut OutputHandle<'_, <G as ScopeParent>::Timestamp, D2, Tee<<G as ScopeParent>::Timestamp, D2>>, &mut Notificator<'_, <G as ScopeParent>::Timestamp>) + 'static,
P: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
fn unary_notify<D2, L, P>(
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = <G as ScopeParent>::Timestamp>,
logic: L,
) -> Stream<G, D2>where
D2: Data,
L: FnMut(&mut InputHandle<<G as ScopeParent>::Timestamp, D1, <P as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>, &mut OutputHandle<'_, <G as ScopeParent>::Timestamp, D2, Tee<<G as ScopeParent>::Timestamp, D2>>, &mut Notificator<'_, <G as ScopeParent>::Timestamp>) + 'static,
P: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input stream, write to the output stream, and inspect the frontier at the input. Read moreSource§fn unary<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> Stream<G, D2>where
D2: Data,
B: FnOnce(Capability<<G as ScopeParent>::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandle<<G as ScopeParent>::Timestamp, D1, <P as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>, &mut OutputHandle<'_, <G as ScopeParent>::Timestamp, D2, Tee<<G as ScopeParent>::Timestamp, D2>>) + 'static,
P: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
fn unary<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> Stream<G, D2>where
D2: Data,
B: FnOnce(Capability<<G as ScopeParent>::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandle<<G as ScopeParent>::Timestamp, D1, <P as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>, &mut OutputHandle<'_, <G as ScopeParent>::Timestamp, D2, Tee<<G as ScopeParent>::Timestamp, D2>>) + 'static,
P: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input stream, and write to the output stream. Read moreSource§fn binary_frontier<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> Stream<G, D3>where
D2: Data,
D3: Data,
B: FnOnce(Capability<<G as ScopeParent>::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandle<'_, <G as ScopeParent>::Timestamp, D1, <P1 as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>, &mut FrontieredInputHandle<'_, <G as ScopeParent>::Timestamp, D2, <P2 as ParallelizationContract<<G as ScopeParent>::Timestamp, D2>>::Puller>, &mut OutputHandle<'_, <G as ScopeParent>::Timestamp, D3, Tee<<G as ScopeParent>::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
P2: ParallelizationContract<<G as ScopeParent>::Timestamp, D2>,
fn binary_frontier<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> Stream<G, D3>where
D2: Data,
D3: Data,
B: FnOnce(Capability<<G as ScopeParent>::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandle<'_, <G as ScopeParent>::Timestamp, D1, <P1 as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>, &mut FrontieredInputHandle<'_, <G as ScopeParent>::Timestamp, D2, <P2 as ParallelizationContract<<G as ScopeParent>::Timestamp, D2>>::Puller>, &mut OutputHandle<'_, <G as ScopeParent>::Timestamp, D3, Tee<<G as ScopeParent>::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
P2: ParallelizationContract<<G as ScopeParent>::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy
pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read moreSource§fn binary_notify<D2, D3, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = <G as ScopeParent>::Timestamp>,
logic: L,
) -> Stream<G, D3>where
D2: Data,
D3: Data,
L: FnMut(&mut InputHandle<<G as ScopeParent>::Timestamp, D1, <P1 as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>, &mut InputHandle<<G as ScopeParent>::Timestamp, D2, <P2 as ParallelizationContract<<G as ScopeParent>::Timestamp, D2>>::Puller>, &mut OutputHandle<'_, <G as ScopeParent>::Timestamp, D3, Tee<<G as ScopeParent>::Timestamp, D3>>, &mut Notificator<'_, <G as ScopeParent>::Timestamp>) + 'static,
P1: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
P2: ParallelizationContract<<G as ScopeParent>::Timestamp, D2>,
fn binary_notify<D2, D3, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = <G as ScopeParent>::Timestamp>,
logic: L,
) -> Stream<G, D3>where
D2: Data,
D3: Data,
L: FnMut(&mut InputHandle<<G as ScopeParent>::Timestamp, D1, <P1 as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>, &mut InputHandle<<G as ScopeParent>::Timestamp, D2, <P2 as ParallelizationContract<<G as ScopeParent>::Timestamp, D2>>::Puller>, &mut OutputHandle<'_, <G as ScopeParent>::Timestamp, D3, Tee<<G as ScopeParent>::Timestamp, D3>>, &mut Notificator<'_, <G as ScopeParent>::Timestamp>) + 'static,
P1: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
P2: ParallelizationContract<<G as ScopeParent>::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy
pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read moreSource§fn binary<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> Stream<G, D3>where
D2: Data,
D3: Data,
B: FnOnce(Capability<<G as ScopeParent>::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandle<<G as ScopeParent>::Timestamp, D1, <P1 as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>, &mut InputHandle<<G as ScopeParent>::Timestamp, D2, <P2 as ParallelizationContract<<G as ScopeParent>::Timestamp, D2>>::Puller>, &mut OutputHandle<'_, <G as ScopeParent>::Timestamp, D3, Tee<<G as ScopeParent>::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
P2: ParallelizationContract<<G as ScopeParent>::Timestamp, D2>,
fn binary<D2, D3, B, L, P1, P2>(
&self,
other: &Stream<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> Stream<G, D3>where
D2: Data,
D3: Data,
B: FnOnce(Capability<<G as ScopeParent>::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandle<<G as ScopeParent>::Timestamp, D1, <P1 as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>, &mut InputHandle<<G as ScopeParent>::Timestamp, D2, <P2 as ParallelizationContract<<G as ScopeParent>::Timestamp, D2>>::Puller>, &mut OutputHandle<'_, <G as ScopeParent>::Timestamp, D3, Tee<<G as ScopeParent>::Timestamp, D3>>) + 'static,
P1: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
P2: ParallelizationContract<<G as ScopeParent>::Timestamp, D2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy
pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read moreSource§fn sink<L, P>(&self, pact: P, name: &str, logic: L)where
L: FnMut(&mut FrontieredInputHandle<'_, <G as ScopeParent>::Timestamp, D1, <P as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>) + 'static,
P: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
fn sink<L, P>(&self, pact: P, name: &str, logic: L)where
L: FnMut(&mut FrontieredInputHandle<'_, <G as ScopeParent>::Timestamp, D1, <P as ParallelizationContract<<G as ScopeParent>::Timestamp, D1>>::Puller>) + 'static,
P: ParallelizationContract<<G as ScopeParent>::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact
, and repeatedly invokes the function logic
which can read from the input stream
and inspect the frontier at the input. Read moreSource§impl<G, D> Probe<G, D> for Stream<G, D>
impl<G, D> Probe<G, D> for Stream<G, D>
Source§fn probe(&self) -> Handle<<G as ScopeParent>::Timestamp>
fn probe(&self) -> Handle<<G as ScopeParent>::Timestamp>
Constructs a progress probe which indicates which timestamps have elapsed at the operator. Read more
Source§fn probe_with(
&self,
handle: &mut Handle<<G as ScopeParent>::Timestamp>,
) -> Stream<G, D>
fn probe_with( &self, handle: &mut Handle<<G as ScopeParent>::Timestamp>, ) -> Stream<G, D>
Inserts a progress probe in a stream. Read more
Source§impl<S, T, E> ResultStream<S, T, E> for Stream<S, Result<T, E>>
impl<S, T, E> ResultStream<S, T, E> for Stream<S, Result<T, E>>
Source§impl<S, K, V> StateMachine<S, K, V> for Stream<S, (K, V)>
impl<S, K, V> StateMachine<S, K, V> for Stream<S, (K, V)>
Source§fn state_machine<R, D, I, F, H>(&self, fold: F, hash: H) -> Stream<S, R>
fn state_machine<R, D, I, F, H>(&self, fold: F, hash: H) -> Stream<S, R>
Tracks a state for each presented key, using user-supplied state transition logic. Read more
Auto Trait Implementations§
impl<S, D> Freeze for Stream<S, D>where
S: Freeze,
impl<S, D> !RefUnwindSafe for Stream<S, D>
impl<S, D> !Send for Stream<S, D>
impl<S, D> !Sync for Stream<S, D>
impl<S, D> Unpin for Stream<S, D>where
S: Unpin,
impl<S, D> !UnwindSafe for Stream<S, D>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<G, T, D, E> EnterAt<G, T, D> for E
impl<G, T, D, E> EnterAt<G, T, D> for E
Source§fn enter_at<'a, F>(
&self,
scope: &Child<'a, G, Product<<G as ScopeParent>::Timestamp, T>>,
initial: F,
) -> Stream<Child<'a, G, Product<<G as ScopeParent>::Timestamp, T>>, D>
fn enter_at<'a, F>( &self, scope: &Child<'a, G, Product<<G as ScopeParent>::Timestamp, T>>, initial: F, ) -> Stream<Child<'a, G, Product<<G as ScopeParent>::Timestamp, T>>, D>
Moves the
Stream
argument into a child of its current Scope
setting the timestamp for each element by initial
. Read more