Trait TakeWhileExt 

pub trait TakeWhileExt<TItem, TFilter, S>: Stream<Item = StreamItem<TItem>> + Sized
where
    TItem: ComparableUnpinTimestamped,
    TItem::Inner: Clone + Debug + Ord + Send + Sync + Unpin + 'static,
    TFilter: ComparableUnpinTimestamped<Timestamp = TItem::Timestamp>,
    TFilter::Inner: Clone + Debug + Ord + Send + Sync + Unpin + 'static,
    S: Stream<Item = StreamItem<TFilter>> + Send + Sync + 'static,
{
    // Required method
    fn take_while_with(
        self,
        filter_stream: S,
        filter: impl Fn(&TFilter::Inner) -> bool + Send + Sync + 'static,
    ) -> FluxionStream<impl Stream<Item = StreamItem<TItem>> + Send>;
}
Extension trait providing the take_while_with operator for timestamped streams.

This operator conditionally emits elements from a source stream based on values from a separate filter stream. The stream terminates when the filter condition becomes false.

Required Methods§

fn take_while_with(
    self,
    filter_stream: S,
    filter: impl Fn(&TFilter::Inner) -> bool + Send + Sync + 'static,
) -> FluxionStream<impl Stream<Item = StreamItem<TItem>> + Send>

Takes elements from the source stream while the filter predicate returns true.

This operator merges the source stream with a filter stream in temporal order. It maintains the latest filter value and only emits source values when the filter predicate evaluates to true. Once the predicate returns false, the entire stream terminates.

§Behavior
  • Source values are emitted only while the latest filter value passes the predicate
  • Each new value on the filter stream is re-evaluated and updates the gating condition
  • The stream terminates immediately when the filter predicate returns false
  • Emitted values keep their timestamped wrapper (TItem) rather than being unwrapped to the inner value
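
The rules above can be modeled with a small synchronous sketch. This is not the fluxion implementation: Event and take_while_with_model are hypothetical names, timestamps are plain u64 values, and it assumes that source values arriving before any filter value are dropped, which this page does not specify.

```rust
/// One event in the merged timeline: a source value or a filter update.
/// (Hypothetical stand-in for timestamped items; not a fluxion type.)
#[derive(Debug, Clone, PartialEq)]
enum Event<T, F> {
    Source { ts: u64, value: T },
    Filter { ts: u64, value: F },
}

impl<T, F> Event<T, F> {
    fn ts(&self) -> u64 {
        match self {
            Event::Source { ts, .. } | Event::Filter { ts, .. } => *ts,
        }
    }
}

/// Simplified model of the gating rules: merge by timestamp, track the
/// latest filter verdict, stop at the first filter value that fails.
fn take_while_with_model<T, F>(
    mut events: Vec<Event<T, F>>,
    predicate: impl Fn(&F) -> bool,
) -> Vec<T> {
    // Merge both inputs in temporal order (stable sort keeps ties in input order).
    events.sort_by_key(|e| e.ts());

    // ASSUMPTION: the gate is closed until the first filter value arrives.
    let mut gate_open = false;
    let mut out = Vec::new();

    for event in events {
        match event {
            Event::Filter { value, .. } => {
                if !predicate(&value) {
                    // Predicate returned false: the whole stream ends here.
                    break;
                }
                gate_open = true;
            }
            Event::Source { value, .. } => {
                if gate_open {
                    out.push(value);
                }
            }
        }
    }
    out
}
```

In this model, a filter value of true at timestamp 1, source values at timestamps 2 and 3, and a filter value of false at timestamp 4 yield the two source values; anything sent after timestamp 4 is never emitted.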
§Arguments
  • filter_stream - Stream providing filter values that control emission
  • filter - Predicate function applied to filter values. Returns true to continue.
§Returns

A FluxionStream of source elements, emitted while the filter condition remains true. The stream terminates when the condition becomes false.

§Errors

This operator may produce StreamItem::Error in the following cases:

  • Lock Errors: When acquiring the combined state lock fails (e.g., due to lock poisoning). These are transient errors - the stream continues processing and may succeed on subsequent items.

See the Error Handling Guide for patterns on handling these errors in your application.
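
If lock errors are transient as described above, a consumer can skip error items and keep reading. The sketch below illustrates that pattern with a local stand-in: ItemModel and drain_values are hypothetical, the Value variant name and the String error payload are assumptions, and a plain iterator replaces the async stream.

```rust
/// Hypothetical stand-in for a stream item that is either a value or an error.
/// The real StreamItem type may differ in variant names and error type.
#[derive(Debug)]
enum ItemModel<T> {
    Value(T),
    Error(String),
}

/// Collects values, counting and skipping error items instead of stopping.
fn drain_values<T>(items: impl IntoIterator<Item = ItemModel<T>>) -> (Vec<T>, usize) {
    let mut values = Vec::new();
    let mut error_count = 0;

    for item in items {
        match item {
            ItemModel::Value(v) => values.push(v),
            ItemModel::Error(msg) => {
                // Report the error and continue with the next item.
                eprintln!("skipping stream error: {msg}");
                error_count += 1;
            }
        }
    }
    (values, error_count)
}
```

Whether skipping is appropriate depends on the application; a stricter consumer could return at the first error instead.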

§Examples
use fluxion_stream::{TakeWhileExt, FluxionStream};
use fluxion_test_utils::Sequenced;
use fluxion_core::Timestamped as TimestampedTrait;
use futures::StreamExt;

// Create channels
let (tx_data, rx_data) = tokio::sync::mpsc::unbounded_channel::<Sequenced<i32>>();
let (tx_gate, rx_gate) = tokio::sync::mpsc::unbounded_channel::<Sequenced<bool>>();

// Create streams
let data_stream = FluxionStream::from_unbounded_receiver(rx_data);
let gate_stream = FluxionStream::from_unbounded_receiver(rx_gate);

// Gate the data stream on the latest value from the gate stream
let mut gated = data_stream.take_while_with(
    gate_stream,
    |gate_value| *gate_value
);

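// Open the gate first (timestamp 1), then send a data value (timestamp 2)
tx_gate.send((true, 1).into()).unwrap();
tx_data.send((1, 2).into()).unwrap();

// The data value passes through because the latest gate value is true
assert_eq!(&gated.next().await.unwrap().unwrap().value, &1);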
§Use Cases
  • Emergency stop mechanism for data streams
  • Time-bounded stream processing
  • Conditional data forwarding with external control
§Thread Safety

Uses internal locks to maintain state. Lock failures are surfaced as described under Errors.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

impl<TItem, TFilter, S, P> TakeWhileExt<TItem, TFilter, S> for P
where
    P: Stream<Item = StreamItem<TItem>> + Send + Sync + Unpin + 'static,
    TItem: ComparableUnpinTimestamped,
    TItem::Inner: Clone + Debug + Ord + Send + Sync + Unpin + 'static,
    TFilter: ComparableUnpinTimestamped<Timestamp = TItem::Timestamp>,
    TFilter::Inner: Clone + Debug + Ord + Send + Sync + Unpin + 'static,
    S: Stream<Item = StreamItem<TFilter>> + Send + Sync + 'static,