pub trait StatefulStreamExt<T>:
Stream<Item = T>
+ Send
+ Sync
+ Sized
+ Unpin
+ 'static {
// Provided methods (13)
fn stateful_map_rs2<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(T, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
R: Send + Sync + 'static,
Self: Sized { ... }
fn stateful_filter_rs2<F>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
where F: FnMut(&T, StateAccess) -> Pin<Box<dyn Future<Output = Result<bool, StateError>> + Send>> + Send + Sync + 'static,
Self: Sized { ... }
fn stateful_fold_rs2<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
initial: R,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(R, T, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
R: Send + Sync + Clone + 'static,
Self: Sized { ... }
fn stateful_reduce_rs2<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
initial: R,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(R, T, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
R: Send + Sync + Clone + 'static,
Self: Sized { ... }
fn stateful_group_by_rs2<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(String, Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
R: Send + Sync + 'static,
Self: Sized { ... }
fn stateful_group_by_advanced_rs2<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
max_group_size: Option<usize>,
group_timeout: Option<Duration>,
emit_on_key_change: bool,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(String, Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
R: Send + Sync + 'static,
Self: Sized { ... }
fn stateful_deduplicate_rs2<F>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
ttl: Duration,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
where F: FnMut(T) -> T + Send + Sync + 'static,
Self: Sized { ... }
fn stateful_throttle_rs2<F>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
rate_limit: u32,
window_duration: Duration,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
where F: FnMut(T) -> T + Send + Sync + 'static,
Self: Sized { ... }
fn stateful_session_rs2<F>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
session_timeout: Duration,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
where F: FnMut(T, bool) -> T + Send + Sync + 'static,
Self: Sized { ... }
fn stateful_pattern_rs2<F>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
pattern_size: usize,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<Option<String>, StateError>> + Send>>
where F: FnMut(Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<Option<String>, StateError>> + Send>> + Send + Sync + 'static,
Self: Sized { ... }
fn stateful_join_rs2<U, F, R>(
self,
other: Pin<Box<dyn Stream<Item = U> + Send>>,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
other_key_extractor: impl KeyExtractor<U> + Send + Sync + 'static,
window_duration: Duration,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(T, U, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
U: Send + Sync + Clone + Serialize + for<'de> Deserialize<'de> + 'static,
R: Send + Sync + 'static,
Self: Sized { ... }
fn stateful_window_rs2<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
window_size: usize,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
R: Send + Sync + 'static,
Self: Sized { ... }
fn stateful_window_rs2_advanced<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
window_size: usize,
slide_size: Option<usize>,
emit_partial: bool,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
where F: FnMut(Vec<T>, StateAccess) -> Pin<Box<dyn Future<Output = Result<R, StateError>> + Send>> + Send + Sync + 'static,
R: Send + Sync + 'static,
Self: Sized { ... }
}
Extension trait for adding stateful operations to streams.
Provided Methods
fn stateful_map_rs2<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
Apply a stateful map operation.
fn stateful_filter_rs2<F>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
Apply a stateful filter operation.
fn stateful_fold_rs2<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
initial: R,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
Apply a stateful fold operation.
fn stateful_reduce_rs2<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
initial: R,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
Apply a stateful reduce operation.
fn stateful_group_by_rs2<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
Apply a stateful group by operation.
fn stateful_group_by_advanced_rs2<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
max_group_size: Option<usize>,
group_timeout: Option<Duration>,
emit_on_key_change: bool,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
Apply a stateful group by operation with advanced configuration.
fn stateful_deduplicate_rs2<F>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
ttl: Duration,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
Apply a stateful deduplication operation.
fn stateful_throttle_rs2<F>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
rate_limit: u32,
window_duration: Duration,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
Apply a stateful throttle operation.
fn stateful_session_rs2<F>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
session_timeout: Duration,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<T, StateError>> + Send>>
Apply a stateful session operation.
fn stateful_pattern_rs2<F>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
pattern_size: usize,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<Option<String>, StateError>> + Send>>
Apply a stateful pattern operation.
fn stateful_join_rs2<U, F, R>(
self,
other: Pin<Box<dyn Stream<Item = U> + Send>>,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
other_key_extractor: impl KeyExtractor<U> + Send + Sync + 'static,
window_duration: Duration,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
Join two streams based on keys with time-based windows (true streaming join).
fn stateful_window_rs2<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
window_size: usize,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
Apply a stateful window operation (tumbling window, no partial emission).
fn stateful_window_rs2_advanced<F, R>(
self,
config: StateConfig,
key_extractor: impl KeyExtractor<T> + Send + Sync + 'static,
window_size: usize,
slide_size: Option<usize>,
emit_partial: bool,
f: F,
) -> Pin<Box<dyn Stream<Item = Result<R, StateError>> + Send>>
Apply a stateful window operation with sliding window support.
Dyn Compatibility
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.