Skip to main content

atomr_streams/
source.rs

1//! Source — the origin of elements in a stream graph.
2//!
3//! Implemented as a thin wrapper around a boxed [`futures::Stream`]; each
4//! combinator returns a new `Source` whose inner stream lazily applies the
5//! transformation. Matches the linear operator surface of `Dsl/Source.cs`
6//! and `Dsl/SourceOperations.cs`.
7
8use std::future::Future;
9use std::time::Duration;
10
11use futures::stream::{self, BoxStream, StreamExt};
12
13use crate::flow::Flow;
14use crate::overflow::OverflowStrategy;
15
16pub struct Source<T> {
17    pub(crate) inner: BoxStream<'static, T>,
18}
19
20impl<T: Send + 'static> Source<T> {
21    // --- factories ---------------------------------
22
23    #[allow(clippy::should_implement_trait)]
24    pub fn from_iter<I: IntoIterator<Item = T> + Send + 'static>(iter: I) -> Self
25    where
26        I::IntoIter: Send + 'static,
27    {
28        Source { inner: stream::iter(iter).boxed() }
29    }
30
31    pub fn single(value: T) -> Self {
32        Source { inner: stream::once(async move { value }).boxed() }
33    }
34
35    pub fn empty() -> Self {
36        Source { inner: stream::empty().boxed() }
37    }
38
39    pub fn repeat(value: T) -> Self
40    where
41        T: Clone,
42    {
43        Source { inner: stream::repeat(value).boxed() }
44    }
45
46    pub fn cycle<I: IntoIterator<Item = T> + Clone + Send + 'static>(iter: I) -> Self
47    where
48        I::IntoIter: Send + 'static,
49        T: Clone,
50    {
51        Source {
52            inner: stream::unfold(iter.into_iter(), |mut it| async move { it.next().map(|v| (v, it)) })
53                .boxed(),
54        }
55    }
56
57    pub fn from_future<F>(fut: F) -> Self
58    where
59        F: Future<Output = T> + Send + 'static,
60    {
61        Source { inner: stream::once(fut).boxed() }
62    }
63
64    pub fn unfold<S, F, Fut>(init: S, f: F) -> Self
65    where
66        S: Send + 'static,
67        F: FnMut(S) -> Fut + Send + 'static,
68        Fut: Future<Output = Option<(T, S)>> + Send + 'static,
69    {
70        Source { inner: stream::unfold(init, f).boxed() }
71    }
72
73    pub fn tick(initial_delay: Duration, interval: Duration, value: T) -> Self
74    where
75        T: Clone,
76    {
77        let stream = stream::unfold(true, move |first| {
78            let d = if first { initial_delay } else { interval };
79            let v = value.clone();
80            async move {
81                tokio::time::sleep(d).await;
82                Some((v, false))
83            }
84        });
85        Source { inner: stream.boxed() }
86    }
87
88    pub fn failed<E>(error: E) -> Source<Result<T, E>>
89    where
90        E: Send + 'static,
91    {
92        Source { inner: stream::once(async move { Err(error) }).boxed() }
93    }
94
95    pub fn from_receiver(rx: tokio::sync::mpsc::UnboundedReceiver<T>) -> Self {
96        Source { inner: stream::unfold(rx, |mut rx| async move { rx.recv().await.map(|v| (v, rx)) }).boxed() }
97    }
98
99    // --- linear transforms -----------------------------------------------------
100
101    pub fn map<U, F>(self, f: F) -> Source<U>
102    where
103        F: FnMut(T) -> U + Send + 'static,
104        U: Send + 'static,
105    {
106        Source { inner: self.inner.map(f).boxed() }
107    }
108
109    /// (ordered, bounded parallelism).
110    pub fn map_async<U, F, Fut>(self, parallelism: usize, f: F) -> Source<U>
111    where
112        F: FnMut(T) -> Fut + Send + 'static,
113        Fut: Future<Output = U> + Send + 'static,
114        U: Send + 'static,
115    {
116        let p = parallelism.max(1);
117        Source { inner: self.inner.map(f).buffered(p).boxed() }
118    }
119
120    pub fn map_async_unordered<U, F, Fut>(self, parallelism: usize, f: F) -> Source<U>
121    where
122        F: FnMut(T) -> Fut + Send + 'static,
123        Fut: Future<Output = U> + Send + 'static,
124        U: Send + 'static,
125    {
126        let p = parallelism.max(1);
127        Source { inner: self.inner.map(f).buffer_unordered(p).boxed() }
128    }
129
130    /// `async_boundary(buffer)` — explicit async stage that decouples
131    /// the upstream and downstream pipelines onto separate Tokio
132    /// tasks via a bounded mpsc channel of capacity `buffer`.
133    /// the `.async` call. Phase 12.3 of
134    /// `docs/full-port-plan.md`.
135    ///
136    /// Useful when an upstream stage is CPU-heavy and you want
137    /// downstream consumption to overlap with production. Slow
138    /// downstream applies natural back-pressure once the buffer
139    /// fills.
140    pub fn async_boundary(self, buffer: usize) -> Source<T> {
141        let buffer = buffer.max(1);
142        let (tx, rx) = tokio::sync::mpsc::channel::<T>(buffer);
143        let mut inner = self.inner;
144        tokio::spawn(async move {
145            while let Some(item) = inner.next().await {
146                if tx.send(item).await.is_err() {
147                    return;
148                }
149            }
150        });
151        let stream =
152            futures::stream::unfold(rx, |mut rx| async move { rx.recv().await.map(|item| (item, rx)) });
153        Source { inner: stream.boxed() }
154    }
155
156    pub fn filter<F>(self, mut f: F) -> Source<T>
157    where
158        F: FnMut(&T) -> bool + Send + 'static,
159    {
160        Source { inner: self.inner.filter(move |v| futures::future::ready(f(v))).boxed() }
161    }
162
163    pub fn filter_map<U, F>(self, mut f: F) -> Source<U>
164    where
165        F: FnMut(T) -> Option<U> + Send + 'static,
166        U: Send + 'static,
167    {
168        Source { inner: self.inner.filter_map(move |v| futures::future::ready(f(v))).boxed() }
169    }
170
171    pub fn take(self, n: usize) -> Source<T> {
172        Source { inner: self.inner.take(n).boxed() }
173    }
174
175    pub fn take_while<F>(self, mut f: F) -> Source<T>
176    where
177        F: FnMut(&T) -> bool + Send + 'static,
178    {
179        Source { inner: self.inner.take_while(move |v| futures::future::ready(f(v))).boxed() }
180    }
181
182    pub fn skip(self, n: usize) -> Source<T> {
183        Source { inner: self.inner.skip(n).boxed() }
184    }
185
186    pub fn skip_while<F>(self, mut f: F) -> Source<T>
187    where
188        F: FnMut(&T) -> bool + Send + 'static,
189    {
190        Source { inner: self.inner.skip_while(move |v| futures::future::ready(f(v))).boxed() }
191    }
192
193    pub fn scan<Acc, F>(self, init: Acc, mut f: F) -> Source<Acc>
194    where
195        Acc: Clone + Send + 'static,
196        F: FnMut(&Acc, T) -> Acc + Send + 'static,
197    {
198        Source {
199            inner: self
200                .inner
201                .scan(init, move |state, item| {
202                    *state = f(state, item);
203                    futures::future::ready(Some(state.clone()))
204                })
205                .boxed(),
206        }
207    }
208
209    /// Emit vectors of up to n items.
210    pub fn grouped(self, n: usize) -> Source<Vec<T>> {
211        Source { inner: self.inner.chunks(n.max(1)).boxed() }
212    }
213
214    pub fn intersperse(self, sep: T) -> Source<T>
215    where
216        T: Clone,
217    {
218        let state = InterspersState { started: false, pending: None, sep, done: false };
219        Source {
220            inner: stream::unfold((self.inner, state), |(mut s, mut st)| async move {
221                if st.done {
222                    return None;
223                }
224                if let Some(p) = st.pending.take() {
225                    return Some((p, (s, st)));
226                }
227                let next = s.next().await;
228                match next {
229                    None => None,
230                    Some(v) => {
231                        if !st.started {
232                            st.started = true;
233                            Some((v, (s, st)))
234                        } else {
235                            st.pending = Some(v);
236                            let sep = st.sep.clone();
237                            Some((sep, (s, st)))
238                        }
239                    }
240                }
241            })
242            .boxed(),
243        }
244    }
245
246    pub fn concat(self, other: Source<T>) -> Source<T> {
247        Source { inner: self.inner.chain(other.inner).boxed() }
248    }
249
250    pub fn prepend(self, other: Source<T>) -> Source<T> {
251        Source { inner: other.inner.chain(self.inner).boxed() }
252    }
253
254    /// Shift every element by `d`.
255    pub fn delay(self, d: Duration) -> Source<T> {
256        Source {
257            inner: self
258                .inner
259                .then(move |v| async move {
260                    tokio::time::sleep(d).await;
261                    v
262                })
263                .boxed(),
264        }
265    }
266
267    /// Wait `d` before emitting the first element.
268    pub fn initial_delay(self, d: Duration) -> Source<T> {
269        let inner = self.inner;
270        Source {
271            inner: stream::once(async move {
272                tokio::time::sleep(d).await;
273                inner
274            })
275            .flatten()
276            .boxed(),
277        }
278    }
279
280    /// Limit element rate (one per `interval`).
281    pub fn throttle(self, interval: Duration) -> Source<T> {
282        Source {
283            inner: self
284                .inner
285                .then(move |v| async move {
286                    tokio::time::sleep(interval).await;
287                    v
288                })
289                .boxed(),
290        }
291    }
292
293    pub fn buffer(self, size: usize, strategy: OverflowStrategy) -> Source<T> {
294        crate::overflow::apply(self, size, strategy)
295    }
296
297    /// Observes each element without affecting the stream.
298    pub fn wire_tap<F>(self, mut f: F) -> Source<T>
299    where
300        F: FnMut(&T) + Send + 'static,
301    {
302        Source { inner: self.inner.inspect(move |v| f(v)).boxed() }
303    }
304
305    pub fn via<U>(self, flow: Flow<T, U>) -> Source<U>
306    where
307        U: Send + 'static,
308    {
309        Source { inner: (flow.transform)(self.inner) }
310    }
311
312    pub(crate) fn into_boxed(self) -> BoxStream<'static, T> {
313        self.inner
314    }
315}
316
/// Book-keeping carried between steps of `Source::intersperse`.
struct InterspersState<T> {
    /// Separator value; a clone is emitted between adjacent elements.
    sep: T,
    /// Element pulled from upstream that is waiting to be emitted right
    /// after its preceding separator.
    pending: Option<T>,
    /// `true` once the first upstream element has been emitted.
    started: bool,
    /// Checked at the top of every step; when `true` the stream ends.
    /// `Source::intersperse` initialises it to `false`.
    done: bool,
}
323
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sink::Sink;

    /// `map`, `filter` and `take` compose in call order.
    #[tokio::test]
    async fn map_filter_take() {
        let pipeline = Source::from_iter(0..100_i32).map(|x| x * 3).filter(|x| x % 2 == 0).take(5);
        let collected: Vec<i32> = Sink::collect(pipeline).await;
        assert_eq!(collected, vec![0, 6, 12, 18, 24]);
    }

    /// `scan` emits each updated accumulator but not the seed value.
    #[tokio::test]
    async fn scan_emits_running_state() {
        let input: Vec<i32> = vec![1, 2, 3, 4];
        let running = Source::from_iter(input).scan(0_i32, |total, x| total + x);
        let sums: Vec<i32> = Sink::collect(running).await;
        assert_eq!(sums, vec![1, 3, 6, 10]);
    }

    /// `grouped` emits full chunks followed by a shorter trailing chunk.
    #[tokio::test]
    async fn grouped_packs_chunks() {
        let chunked = Source::from_iter(1..=7_i32).grouped(3);
        let groups: Vec<Vec<i32>> = Sink::collect(chunked).await;
        assert_eq!(groups, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]]);
    }

    /// `intersperse` puts the separator between elements only.
    #[tokio::test]
    async fn intersperse_inserts_separator() {
        let input: Vec<i32> = vec![1, 2, 3];
        let woven: Vec<i32> = Sink::collect(Source::from_iter(input).intersperse(0)).await;
        assert_eq!(woven, vec![1, 0, 2, 0, 3]);
    }

    /// `map_async` yields results in upstream order even when run in parallel.
    #[tokio::test]
    async fn map_async_preserves_order() {
        let squared = Source::from_iter(1..=4_i32).map_async(4, |x| async move {
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
            x * x
        });
        let results: Vec<i32> = Sink::collect(squared).await;
        assert_eq!(results, vec![1, 4, 9, 16]);
    }

    /// `concat` appends the other source; `prepend` puts it in front.
    #[tokio::test]
    async fn concat_and_prepend_join_sources() {
        let left = Source::from_iter(vec![1, 2]);
        let right = Source::from_iter(vec![3, 4]);
        let appended = Sink::collect(left.concat(right)).await;
        assert_eq!(appended, vec![1, 2, 3, 4]);

        let left = Source::from_iter(vec![1, 2]);
        let right = Source::from_iter(vec![3, 4]);
        let prepended = Sink::collect(left.prepend(right)).await;
        assert_eq!(prepended, vec![3, 4, 1, 2]);
    }

    /// `wire_tap` sees every element and leaves the stream untouched.
    #[tokio::test]
    async fn wire_tap_observes_without_consuming() {
        let observed = std::sync::Arc::new(std::sync::Mutex::new(Vec::<i32>::new()));
        let recorder = observed.clone();
        let tapped =
            Source::from_iter(vec![1, 2, 3]).wire_tap(move |v| recorder.lock().unwrap().push(*v));
        let passed_through = Sink::collect(tapped).await;
        assert_eq!(passed_through, vec![1, 2, 3]);
        assert_eq!(observed.lock().unwrap().clone(), vec![1, 2, 3]);
    }
}