Trait StatefulStreamExt

Source
pub trait StatefulStreamExt<T>:
    Stream<Item = T>
    + Send
    + Sync
    + Sized
    + Unpin
    + 'static
where Self: 'static, T: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
{
    // Provided methods (13)
    fn stateful_map_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(T, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
        R: Send + Sync + 'static,
        Self: Sized { ... }

    fn stateful_filter_rs2<F>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
    where
        F: FnMut(&T, StateAccess) -> Pin<Box<dyn Future<Output = Result<bool, StateError>> + Send>> + Send + Sync + 'static,
        Self: Sized { ... }

    fn stateful_fold_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        initial: R,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(R, T, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
        R: Send + Sync + Clone + 'static,
        Self: Sized { ... }

    fn stateful_reduce_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        initial: R,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(R, T, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
        R: Send + Sync + Clone + 'static,
        Self: Sized { ... }

    fn stateful_group_by_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(String, Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
        R: Send + Sync + 'static,
        Self: Sized { ... }

    fn stateful_group_by_advanced_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        max_group_size: Option<usize>,
        group_timeout: Option<Duration>,
        emit_on_key_change: bool,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(String, Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
        R: Send + Sync + 'static,
        Self: Sized { ... }

    fn stateful_deduplicate_rs2<F>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        ttl: Duration,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
    where
        F: FnMut(T) -> T + Send + Sync + 'static,
        Self: Sized { ... }

    fn stateful_throttle_rs2<F>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        rate_limit: u32,
        window_duration: Duration,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
    where
        F: FnMut(T) -> T + Send + Sync + 'static,
        Self: Sized { ... }

    fn stateful_session_rs2<F>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        session_timeout: Duration,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
    where
        F: FnMut(T, bool) -> T + Send + Sync + 'static,
        Self: Sized { ... }

    fn stateful_pattern_rs2<F>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        pattern_size: usize,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<Option<String>, StateError>> + Send>>
    where
        F: FnMut(Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<Option<String>, StateError>> + Send>> + Send + Sync + 'static,
        Self: Sized { ... }

    fn stateful_join_rs2<U, F, R>(
        self,
        other: Pin<Box<dyn Stream<Item = U> + Send>>,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        other_key_extractor: impl KeyExtractor<U> + Send + Sync + 'static,
        window_duration: Duration,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(T, U, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
        U: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
        R: Send + Sync + 'static,
        Self: Sized { ... }

    fn stateful_window_rs2<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        window_size: usize,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
        R: Send + Sync + 'static,
        Self: Sized { ... }

    fn stateful_window_rs2_advanced<F, R>(
        self,
        config: StateConfig,
        key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
        window_size: usize,
        slide_size: Option<usize>,
        emit_partial: bool,
        f: F,
    ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
    where
        F: FnMut(Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
        R: Send + Sync + 'static,
        Self: Sized { ... }
}
Expand description

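Extension trait that adds stateful operations to streams. A blanket implementation provides it for every `Stream<Item = T>` that is `Send + Sync + Unpin + 'static`, where `T` is `Send + Sync + Clone + Serialize + Deserialize + 'static`.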

Provided Methods§

Source

fn stateful_map_rs2<F, R>( self, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, f: F, ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(T, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static, R: Send + Sync + 'static, Self: Sized,

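Apply a stateful map operation. The closure `f` receives each item by value together with a `StateAccess` handle and returns a boxed future resolving to `Result<R, StateError>`; the returned stream yields `Result<R, StateError>` items.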

Source

fn stateful_filter_rs2<F>( self, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, f: F, ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
where F: FnMut(&T, StateAccess) -> Pin<Box<dyn Future<Output = Result<bool, StateError>> + Send>> + Send + Sync + 'static, Self: Sized,

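Apply a stateful filter operation. The predicate `f` receives a reference to each item together with a `StateAccess` handle and returns a boxed future resolving to `Result<bool, StateError>`; the returned stream yields `Result<T, StateError>` items.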

Source

fn stateful_fold_rs2<F, R>( self, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, initial: R, f: F, ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(R, T, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static, R: Send + Sync + Clone + 'static, Self: Sized,

Apply a stateful fold operation

Source

fn stateful_reduce_rs2<F, R>( self, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, initial: R, f: F, ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(R, T, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static, R: Send + Sync + Clone + 'static, Self: Sized,

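Apply a stateful reduce operation. Its signature is identical to that of `stateful_fold_rs2`, including the explicit `initial` accumulator value.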

Source

fn stateful_group_by_rs2<F, R>( self, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, f: F, ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(String, Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static, R: Send + Sync + 'static, Self: Sized,

Apply a stateful group by operation

Source

fn stateful_group_by_advanced_rs2<F, R>( self, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, max_group_size: Option<usize>, group_timeout: Option<Duration>, emit_on_key_change: bool, f: F, ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(String, Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static, R: Send + Sync + 'static, Self: Sized,

Apply a stateful group by operation with advanced configuration (`max_group_size`, `group_timeout`, and `emit_on_key_change`)

Source

fn stateful_deduplicate_rs2<F>( self, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, ttl: Duration, f: F, ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
where F: FnMut(T) -> T + Send + Sync + 'static, Self: Sized,

Apply a stateful deduplication operation

Source

fn stateful_throttle_rs2<F>( self, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, rate_limit: u32, window_duration: Duration, f: F, ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
where F: FnMut(T) -> T + Send + Sync + 'static, Self: Sized,

Apply a stateful throttle operation

Source

fn stateful_session_rs2<F>( self, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, session_timeout: Duration, f: F, ) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
where F: FnMut(T, bool) -> T + Send + Sync + 'static, Self: Sized,

Apply a stateful session operation

Source

fn stateful_pattern_rs2<F>( self, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, pattern_size: usize, f: F, ) -> Pin<Box<dyn Stream<Item = Result<Option<String>, StateError>> + Send>>
where F: FnMut(Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<Option<String>, StateError>> + Send>> + Send + Sync + 'static, Self: Sized,

Apply a stateful pattern operation

Source

fn stateful_join_rs2<U, F, R>( self, other: Pin<Box<dyn Stream<Item = U> + Send>>, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, other_key_extractor: impl KeyExtractor<U> + Send + Sync + 'static, window_duration: Duration, f: F, ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(T, U, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static, U: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static, R: Send + Sync + 'static, Self: Sized,

Join two streams based on keys with time-based windows (true streaming join)

Source

fn stateful_window_rs2<F, R>( self, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, window_size: usize, f: F, ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static, R: Send + Sync + 'static, Self: Sized,

Apply a stateful window operation (tumbling window, no partial emission)

Source

fn stateful_window_rs2_advanced<F, R>( self, config: StateConfig, key_extractor: impl KeyExtractor<T> + Send + Sync + 'static, window_size: usize, slide_size: Option<usize>, emit_partial: bool, f: F, ) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static, R: Send + Sync + 'static, Self: Sized,

Apply a stateful window operation with sliding window support, configured through the additional `slide_size` and `emit_partial` parameters

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<T, S> StatefulStreamExt<T> for S
where S: Stream<Item = T> + Send + Sync + Unpin + 'static, T: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,