fluxion_stream/
take_while_with.rs

1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5use crate::FluxionStream;
6use fluxion_core::lock_utilities::lock_or_recover;
7use fluxion_core::{ComparableUnpinTimestamped, FluxionError, HasTimestamp, StreamItem};
8use fluxion_ordered_merge::OrderedMergeExt;
9use futures::stream::StreamExt;
10use futures::Stream;
11use std::fmt::Debug;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex};
14
/// Boxed, pinned stream of tagged [`Item`]s.
///
/// The source and filter streams are both boxed to this common type so they can
/// be stored in a single `Vec` and handed to `ordered_merge`.
type PinnedItemStream<TItem, TFilter> =
    Pin<Box<dyn Stream<Item = Item<TItem, TFilter>> + Send + Sync + 'static>>;
17
/// Extension trait providing the `take_while_with` operator for timestamped streams.
///
/// The operator gates elements of a source stream on the most recent value of a
/// separate filter stream, and stops emitting once the filter predicate rejects
/// that value.
pub trait TakeWhileExt<TItem, TFilter, S>: Stream<Item = StreamItem<TItem>> + Sized
where
    TItem: ComparableUnpinTimestamped,
    TItem::Inner: Clone + Debug + Ord + Send + Sync + Unpin + 'static,
    TFilter: ComparableUnpinTimestamped<Timestamp = TItem::Timestamp>,
    TFilter::Inner: Clone + Debug + Ord + Send + Sync + Unpin + 'static,
    S: Stream<Item = StreamItem<TFilter>> + Send + Sync + 'static,
{
    /// Takes elements from the source stream while the filter predicate returns true.
    ///
    /// This operator merges the source stream with a filter stream in timestamp
    /// order. It remembers the latest filter value and emits a source value only
    /// when the predicate accepts that latest filter value.
    ///
    /// # Behavior
    ///
    /// - Filter values update the gating state and are never emitted themselves
    /// - Source values that arrive before any filter value has been seen are dropped
    /// - A source value is emitted when the predicate accepts the latest filter value
    /// - The predicate is evaluated when a source value arrives; the first source
    ///   value for which it returns `false` is dropped and nothing is emitted
    ///   after that point
    /// - Emitted values keep their timestamped wrapper
    ///
    /// # Arguments
    ///
    /// * `filter_stream` - Stream providing filter values that control emission
    /// * `filter` - Predicate function applied to filter values. Returns `true` to continue.
    ///
    /// # Returns
    ///
    /// A `FluxionStream` of the source elements that were accepted while the
    /// filter condition held.
    ///
    /// # Errors
    ///
    /// `StreamItem::Error` items received from either input stream are forwarded
    /// unchanged for as long as the operator is still emitting.
    ///
    /// The operator does not create errors of its own: the internal state lock is
    /// acquired through `lock_or_recover`, and the implementation contains no path
    /// that turns a lock failure into a `StreamItem::Error`.
    ///
    /// See the [Error Handling Guide](../docs/ERROR-HANDLING.md) for patterns on
    /// handling forwarded errors in your application.
    ///
    /// # See Also
    ///
    /// - [`emit_when`](crate::EmitWhenExt::emit_when) - Gates emissions but doesn't terminate
    /// - [`take_latest_when`](crate::TakeLatestWhenExt::take_latest_when) - Samples on condition
    ///
    /// # Examples
    ///
    /// ```rust
    /// use fluxion_stream::{TakeWhileExt, FluxionStream};
    /// use fluxion_test_utils::Sequenced;
    /// use fluxion_core::Timestamped as TimestampedTrait;
    /// use futures::StreamExt;
    ///
    /// # async fn example() {
    /// // Create channels
    /// let (tx_data, rx_data) = tokio::sync::mpsc::unbounded_channel::<Sequenced<i32>>();
    /// let (tx_gate, rx_gate) = tokio::sync::mpsc::unbounded_channel::<Sequenced<bool>>();
    ///
    /// // Create streams
    /// let data_stream = FluxionStream::from_unbounded_receiver(rx_data);
    /// let gate_stream = FluxionStream::from_unbounded_receiver(rx_gate);
    ///
    /// // Combine streams
    /// let mut gated = data_stream.take_while_with(
    ///     gate_stream,
    ///     |gate_value| *gate_value
    /// );
    ///
    /// // Send values
    /// tx_gate.send((true, 1).into()).unwrap();
    /// tx_data.send((1, 2).into()).unwrap();
    ///
    /// // Assert
    /// assert_eq!(&gated.next().await.unwrap().unwrap().value, &1);
    /// # }
    /// ```
    ///
    /// # Use Cases
    ///
    /// - Emergency stop mechanism for data streams
    /// - Time-bounded stream processing
    /// - Conditional data forwarding with external control
    ///
    /// # Thread Safety
    ///
    /// The gating state is kept behind an internal `Arc<Mutex<_>>` that is locked
    /// via `lock_or_recover` while each merged item is processed.
    fn take_while_with(
        self,
        filter_stream: S,
        filter: impl Fn(&TFilter::Inner) -> bool + Send + Sync + 'static,
    ) -> FluxionStream<impl Stream<Item = StreamItem<TItem>> + Send>;
}
119
120impl<TItem, TFilter, S, P> TakeWhileExt<TItem, TFilter, S> for P
121where
122    P: Stream<Item = StreamItem<TItem>> + Send + Sync + Unpin + 'static,
123    TItem: ComparableUnpinTimestamped,
124    TItem::Inner: Clone + Debug + Ord + Send + Sync + Unpin + 'static,
125    TFilter: ComparableUnpinTimestamped<Timestamp = TItem::Timestamp>,
126    TFilter::Inner: Clone + Debug + Ord + Send + Sync + Unpin + 'static,
127    S: Stream<Item = StreamItem<TFilter>> + Send + Sync + 'static,
128{
129    fn take_while_with(
130        self,
131        filter_stream: S,
132        filter: impl Fn(&TFilter::Inner) -> bool + Send + Sync + 'static,
133    ) -> FluxionStream<impl Stream<Item = StreamItem<TItem>> + Send> {
134        let filter = Arc::new(filter);
135
136        // Tag each stream with its type - unwrap StreamItem first
137        let source_stream = self.map(|item| match item {
138            StreamItem::Value(value) => Item::<TItem, TFilter>::Source(value),
139            StreamItem::Error(e) => Item::<TItem, TFilter>::Error(e),
140        });
141        let filter_stream = filter_stream.map(|item| match item {
142            StreamItem::Value(value) => Item::<TItem, TFilter>::Filter(value),
143            StreamItem::Error(e) => Item::<TItem, TFilter>::Error(e),
144        });
145
146        // Box the streams to make them the same type
147        let streams: Vec<PinnedItemStream<TItem, TFilter>> =
148            vec![Box::pin(source_stream), Box::pin(filter_stream)];
149
150        // State to track the latest filter value and termination
151        let state = Arc::new(Mutex::new((None::<TFilter::Inner>, false)));
152
153        // Use ordered_merge and process items in order
154        let combined_stream = streams.ordered_merge().filter_map({
155            let state = Arc::clone(&state);
156            move |item| {
157                let state = Arc::clone(&state);
158                let filter = Arc::clone(&filter);
159
160                async move {
161                    // Restrict the mutex guard's lifetime to the smallest possible scope
162                    let mut guard = lock_or_recover(&state, "take_while_with state");
163                    let (filter_state, terminated) = &mut *guard;
164
165                    if *terminated {
166                        return None;
167                    }
168
169                    match item {
170                        Item::Error(e) => Some(StreamItem::Error(e)),
171                        Item::Filter(filter_val) => {
172                            *filter_state = Some(filter_val.clone().into_inner());
173                            None
174                        }
175                        Item::Source(source_val) => filter_state.as_ref().map_or_else(
176                            || None,
177                            |fval| {
178                                if filter(fval) {
179                                    Some(StreamItem::Value(source_val.clone()))
180                                } else {
181                                    *terminated = true;
182                                    None
183                                }
184                            },
185                        ),
186                    }
187                }
188            }
189        });
190
191        FluxionStream::new(Box::pin(combined_stream))
192    }
193}
194
/// Tagged element flowing through the merged stream inside `take_while_with`.
///
/// Items from the source and filter streams are wrapped in these variants so
/// that both streams share a single item type and can be merged.
#[derive(Clone, Debug)]
pub enum Item<TItem, TFilter> {
    /// A value that came from the source stream.
    Source(TItem),
    /// A value that came from the filter stream.
    Filter(TFilter),
    /// An error reported by either input stream.
    Error(FluxionError),
}
201
202impl<TItem, TFilter> HasTimestamp for Item<TItem, TFilter>
203where
204    TItem: HasTimestamp,
205    TFilter: HasTimestamp<Timestamp = TItem::Timestamp>,
206{
207    type Inner = TItem::Inner;
208    type Timestamp = TItem::Timestamp;
209
210    fn timestamp(&self) -> Self::Timestamp {
211        match self {
212            Self::Source(s) => s.timestamp(),
213            Self::Filter(f) => f.timestamp(),
214            Self::Error(_) => panic!("Error items cannot provide timestamps"),
215        }
216    }
217}
218
219impl<TItem, TFilter> PartialEq for Item<TItem, TFilter>
220where
221    TItem: HasTimestamp,
222    TFilter: HasTimestamp<Timestamp = TItem::Timestamp>,
223{
224    fn eq(&self, other: &Self) -> bool {
225        self.timestamp() == other.timestamp()
226    }
227}
228
229impl<TItem, TFilter> Eq for Item<TItem, TFilter>
230where
231    TItem: HasTimestamp,
232    TFilter: HasTimestamp<Timestamp = TItem::Timestamp>,
233{
234}
235
236impl<TItem, TFilter> PartialOrd for Item<TItem, TFilter>
237where
238    TItem: HasTimestamp,
239    TFilter: HasTimestamp<Timestamp = TItem::Timestamp>,
240{
241    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
242        Some(self.cmp(other))
243    }
244}
245
246impl<TItem, TFilter> Ord for Item<TItem, TFilter>
247where
248    TItem: HasTimestamp,
249    TFilter: HasTimestamp<Timestamp = TItem::Timestamp>,
250{
251    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
252        self.timestamp().cmp(&other.timestamp())
253    }
254}