fluxion_stream/take_while_with.rs
1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5use crate::FluxionStream;
6use fluxion_core::lock_utilities::lock_or_recover;
7use fluxion_core::{ComparableUnpinTimestamped, FluxionError, HasTimestamp, StreamItem};
8use fluxion_ordered_merge::OrderedMergeExt;
9use futures::stream::StreamExt;
10use futures::Stream;
11use std::fmt::Debug;
12use std::pin::Pin;
13use std::sync::{Arc, Mutex};
14
15type PinnedItemStream<TItem, TFilter> =
16 Pin<Box<dyn Stream<Item = Item<TItem, TFilter>> + Send + Sync + 'static>>;
17
18/// Extension trait providing the `take_while_with` operator for timestamped streams.
19///
20/// This operator conditionally emits elements from a source stream based on values
21/// from a separate filter stream. The stream terminates when the filter condition
22/// becomes false.
23pub trait TakeWhileExt<TItem, TFilter, S>: Stream<Item = StreamItem<TItem>> + Sized
24where
25 TItem: ComparableUnpinTimestamped,
26 TItem::Inner: Clone + Debug + Ord + Send + Sync + Unpin + 'static,
27 TFilter: ComparableUnpinTimestamped<Timestamp = TItem::Timestamp>,
28 TFilter::Inner: Clone + Debug + Ord + Send + Sync + Unpin + 'static,
29 S: Stream<Item = StreamItem<TFilter>> + Send + Sync + 'static,
30{
31 /// Takes elements from the source stream while the filter predicate returns true.
32 ///
33 /// This operator merges the source stream with a filter stream in temporal order.
34 /// It maintains the latest filter value and only emits source values when the
35 /// filter predicate evaluates to `true`. Once the predicate returns `false`,
36 /// the entire stream terminates.
37 ///
38 /// # Behavior
39 ///
40 /// - Source values are emitted only when latest filter passes the predicate
41 /// - Filter stream updates change the gating condition
42 /// - Stream terminates immediately when filter predicate returns `false`
43 /// - Emitted values maintain their ordered wrapper
44 ///
45 /// # Arguments
46 ///
47 /// * `filter_stream` - Stream providing filter values that control emission
48 /// * `filter` - Predicate function applied to filter values. Returns `true` to continue.
49 ///
50 /// # Returns
51 ///
52 /// A `FluxionStream` of source elements that are emitted while the filter condition
53 /// remains true. Stream terminates when condition becomes false.
54 ///
55 /// # Errors
56 ///
57 /// This operator may produce `StreamItem::Error` in the following cases:
58 ///
59 /// - **Lock Errors**: When acquiring the combined state lock fails (e.g., due to lock poisoning).
60 /// These are transient errors - the stream continues processing and may succeed on subsequent items.
61 ///
62 /// Lock errors are typically non-fatal and indicate temporary contention. The operator will continue
63 /// processing subsequent items. See the [Error Handling Guide](../docs/ERROR-HANDLING.md) for patterns
64 /// on handling these errors in your application.
65 ///
66 /// # See Also
67 ///
68 /// - [`emit_when`](crate::EmitWhenExt::emit_when) - Gates emissions but doesn't terminate
69 /// - [`take_latest_when`](crate::TakeLatestWhenExt::take_latest_when) - Samples on condition
70 ///
71 /// # Examples
72 ///
73 /// ```rust
74 /// use fluxion_stream::{TakeWhileExt, FluxionStream};
75 /// use fluxion_test_utils::Sequenced;
76 /// use fluxion_core::Timestamped as TimestampedTrait;
77 /// use futures::StreamExt;
78 ///
79 /// # async fn example() {
80 /// // Create channels
81 /// let (tx_data, rx_data) = tokio::sync::mpsc::unbounded_channel::<Sequenced<i32>>();
82 /// let (tx_gate, rx_gate) = tokio::sync::mpsc::unbounded_channel::<Sequenced<bool>>();
83 ///
84 /// // Create streams
85 /// let data_stream = FluxionStream::from_unbounded_receiver(rx_data);
86 /// let gate_stream = FluxionStream::from_unbounded_receiver(rx_gate);
87 ///
88 /// // Combine streams
89 /// let mut gated = data_stream.take_while_with(
90 /// gate_stream,
91 /// |gate_value| *gate_value == true
92 /// );
93 ///
94 /// // Send values
95 /// tx_gate.send((true, 1).into()).unwrap();
96 /// tx_data.send((1, 2).into()).unwrap();
97 ///
98 /// // Assert
99 /// assert_eq!(&gated.next().await.unwrap().unwrap().value, &1);
100 /// # }
101 /// ```
102 ///
103 /// # Use Cases
104 ///
105 /// - Emergency stop mechanism for data streams
106 /// - Time-bounded stream processing
107 /// - Conditional data forwarding with external control
108 ///
109 /// # Thread Safety
110 ///
111 /// Uses internal locks to maintain state. Lock errors are logged and cause
112 /// the stream to terminate.
113 fn take_while_with(
114 self,
115 filter_stream: S,
116 filter: impl Fn(&TFilter::Inner) -> bool + Send + Sync + 'static,
117 ) -> FluxionStream<impl Stream<Item = StreamItem<TItem>> + Send>;
118}
119
120impl<TItem, TFilter, S, P> TakeWhileExt<TItem, TFilter, S> for P
121where
122 P: Stream<Item = StreamItem<TItem>> + Send + Sync + Unpin + 'static,
123 TItem: ComparableUnpinTimestamped,
124 TItem::Inner: Clone + Debug + Ord + Send + Sync + Unpin + 'static,
125 TFilter: ComparableUnpinTimestamped<Timestamp = TItem::Timestamp>,
126 TFilter::Inner: Clone + Debug + Ord + Send + Sync + Unpin + 'static,
127 S: Stream<Item = StreamItem<TFilter>> + Send + Sync + 'static,
128{
129 fn take_while_with(
130 self,
131 filter_stream: S,
132 filter: impl Fn(&TFilter::Inner) -> bool + Send + Sync + 'static,
133 ) -> FluxionStream<impl Stream<Item = StreamItem<TItem>> + Send> {
134 let filter = Arc::new(filter);
135
136 // Tag each stream with its type - unwrap StreamItem first
137 let source_stream = self.map(|item| match item {
138 StreamItem::Value(value) => Item::<TItem, TFilter>::Source(value),
139 StreamItem::Error(e) => Item::<TItem, TFilter>::Error(e),
140 });
141 let filter_stream = filter_stream.map(|item| match item {
142 StreamItem::Value(value) => Item::<TItem, TFilter>::Filter(value),
143 StreamItem::Error(e) => Item::<TItem, TFilter>::Error(e),
144 });
145
146 // Box the streams to make them the same type
147 let streams: Vec<PinnedItemStream<TItem, TFilter>> =
148 vec![Box::pin(source_stream), Box::pin(filter_stream)];
149
150 // State to track the latest filter value and termination
151 let state = Arc::new(Mutex::new((None::<TFilter::Inner>, false)));
152
153 // Use ordered_merge and process items in order
154 let combined_stream = streams.ordered_merge().filter_map({
155 let state = Arc::clone(&state);
156 move |item| {
157 let state = Arc::clone(&state);
158 let filter = Arc::clone(&filter);
159
160 async move {
161 // Restrict the mutex guard's lifetime to the smallest possible scope
162 let mut guard = lock_or_recover(&state, "take_while_with state");
163 let (filter_state, terminated) = &mut *guard;
164
165 if *terminated {
166 return None;
167 }
168
169 match item {
170 Item::Error(e) => Some(StreamItem::Error(e)),
171 Item::Filter(filter_val) => {
172 *filter_state = Some(filter_val.clone().into_inner());
173 None
174 }
175 Item::Source(source_val) => filter_state.as_ref().map_or_else(
176 || None,
177 |fval| {
178 if filter(fval) {
179 Some(StreamItem::Value(source_val.clone()))
180 } else {
181 *terminated = true;
182 None
183 }
184 },
185 ),
186 }
187 }
188 }
189 });
190
191 FluxionStream::new(Box::pin(combined_stream))
192 }
193}
194
195#[derive(Clone, Debug)]
196pub enum Item<TItem, TFilter> {
197 Source(TItem),
198 Filter(TFilter),
199 Error(FluxionError),
200}
201
202impl<TItem, TFilter> HasTimestamp for Item<TItem, TFilter>
203where
204 TItem: HasTimestamp,
205 TFilter: HasTimestamp<Timestamp = TItem::Timestamp>,
206{
207 type Inner = TItem::Inner;
208 type Timestamp = TItem::Timestamp;
209
210 fn timestamp(&self) -> Self::Timestamp {
211 match self {
212 Self::Source(s) => s.timestamp(),
213 Self::Filter(f) => f.timestamp(),
214 Self::Error(_) => panic!("Error items cannot provide timestamps"),
215 }
216 }
217}
218
219impl<TItem, TFilter> PartialEq for Item<TItem, TFilter>
220where
221 TItem: HasTimestamp,
222 TFilter: HasTimestamp<Timestamp = TItem::Timestamp>,
223{
224 fn eq(&self, other: &Self) -> bool {
225 self.timestamp() == other.timestamp()
226 }
227}
228
229impl<TItem, TFilter> Eq for Item<TItem, TFilter>
230where
231 TItem: HasTimestamp,
232 TFilter: HasTimestamp<Timestamp = TItem::Timestamp>,
233{
234}
235
236impl<TItem, TFilter> PartialOrd for Item<TItem, TFilter>
237where
238 TItem: HasTimestamp,
239 TFilter: HasTimestamp<Timestamp = TItem::Timestamp>,
240{
241 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
242 Some(self.cmp(other))
243 }
244}
245
246impl<TItem, TFilter> Ord for Item<TItem, TFilter>
247where
248 TItem: HasTimestamp,
249 TFilter: HasTimestamp<Timestamp = TItem::Timestamp>,
250{
251 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
252 self.timestamp().cmp(&other.timestamp())
253 }
254}