Skip to main content

atomr_streams/
source.rs

1//! Source — the origin of elements in a stream graph. akka.net: `Dsl/Source.cs`.
2//!
3//! Implemented as a thin wrapper around a boxed [`futures::Stream`]; each
4//! combinator returns a new `Source` whose inner stream lazily applies the
5//! transformation. Matches the linear operator surface of `Dsl/Source.cs`
6//! and `Dsl/SourceOperations.cs`.
7
8use std::future::Future;
9use std::time::Duration;
10
11use futures::stream::{self, BoxStream, StreamExt};
12
13use crate::flow::Flow;
14use crate::overflow::OverflowStrategy;
15
16pub struct Source<T> {
17    pub(crate) inner: BoxStream<'static, T>,
18}
19
20impl<T: Send + 'static> Source<T> {
21    // --- factories (akka.net: `Dsl/Source.cs`) ---------------------------------
22
23    #[allow(clippy::should_implement_trait)]
24    pub fn from_iter<I: IntoIterator<Item = T> + Send + 'static>(iter: I) -> Self
25    where
26        I::IntoIter: Send + 'static,
27    {
28        Source { inner: stream::iter(iter).boxed() }
29    }
30
31    pub fn single(value: T) -> Self {
32        Source { inner: stream::once(async move { value }).boxed() }
33    }
34
35    pub fn empty() -> Self {
36        Source { inner: stream::empty().boxed() }
37    }
38
39    pub fn repeat(value: T) -> Self
40    where
41        T: Clone,
42    {
43        Source { inner: stream::repeat(value).boxed() }
44    }
45
46    pub fn cycle<I: IntoIterator<Item = T> + Clone + Send + 'static>(iter: I) -> Self
47    where
48        I::IntoIter: Send + 'static,
49        T: Clone,
50    {
51        Source {
52            inner: stream::unfold(iter.into_iter(), |mut it| async move { it.next().map(|v| (v, it)) })
53                .boxed(),
54        }
55    }
56
57    pub fn from_future<F>(fut: F) -> Self
58    where
59        F: Future<Output = T> + Send + 'static,
60    {
61        Source { inner: stream::once(fut).boxed() }
62    }
63
64    pub fn unfold<S, F, Fut>(init: S, f: F) -> Self
65    where
66        S: Send + 'static,
67        F: FnMut(S) -> Fut + Send + 'static,
68        Fut: Future<Output = Option<(T, S)>> + Send + 'static,
69    {
70        Source { inner: stream::unfold(init, f).boxed() }
71    }
72
73    pub fn tick(initial_delay: Duration, interval: Duration, value: T) -> Self
74    where
75        T: Clone,
76    {
77        let stream = stream::unfold(true, move |first| {
78            let d = if first { initial_delay } else { interval };
79            let v = value.clone();
80            async move {
81                tokio::time::sleep(d).await;
82                Some((v, false))
83            }
84        });
85        Source { inner: stream.boxed() }
86    }
87
88    pub fn failed<E>(error: E) -> Source<Result<T, E>>
89    where
90        E: Send + 'static,
91    {
92        Source { inner: stream::once(async move { Err(error) }).boxed() }
93    }
94
95    pub fn from_receiver(rx: tokio::sync::mpsc::UnboundedReceiver<T>) -> Self {
96        Source { inner: stream::unfold(rx, |mut rx| async move { rx.recv().await.map(|v| (v, rx)) }).boxed() }
97    }
98
99    // --- linear transforms -----------------------------------------------------
100
101    pub fn map<U, F>(self, f: F) -> Source<U>
102    where
103        F: FnMut(T) -> U + Send + 'static,
104        U: Send + 'static,
105    {
106        Source { inner: self.inner.map(f).boxed() }
107    }
108
109    /// akka.net: `SelectAsync` (ordered, bounded parallelism).
110    pub fn map_async<U, F, Fut>(self, parallelism: usize, f: F) -> Source<U>
111    where
112        F: FnMut(T) -> Fut + Send + 'static,
113        Fut: Future<Output = U> + Send + 'static,
114        U: Send + 'static,
115    {
116        let p = parallelism.max(1);
117        Source { inner: self.inner.map(f).buffered(p).boxed() }
118    }
119
120    /// akka.net: `SelectAsyncUnordered`.
121    pub fn map_async_unordered<U, F, Fut>(self, parallelism: usize, f: F) -> Source<U>
122    where
123        F: FnMut(T) -> Fut + Send + 'static,
124        Fut: Future<Output = U> + Send + 'static,
125        U: Send + 'static,
126    {
127        let p = parallelism.max(1);
128        Source { inner: self.inner.map(f).buffer_unordered(p).boxed() }
129    }
130
131    /// `async_boundary(buffer)` — explicit async stage that decouples
132    /// the upstream and downstream pipelines onto separate Tokio
133    /// tasks via a bounded mpsc channel of capacity `buffer`.
134    /// Akka.NET / Akka Streams: the `.async` call. Phase 12.3 of
135    /// `docs/full-port-plan.md`.
136    ///
137    /// Useful when an upstream stage is CPU-heavy and you want
138    /// downstream consumption to overlap with production. Slow
139    /// downstream applies natural back-pressure once the buffer
140    /// fills.
141    pub fn async_boundary(self, buffer: usize) -> Source<T> {
142        let buffer = buffer.max(1);
143        let (tx, rx) = tokio::sync::mpsc::channel::<T>(buffer);
144        let mut inner = self.inner;
145        tokio::spawn(async move {
146            while let Some(item) = inner.next().await {
147                if tx.send(item).await.is_err() {
148                    return;
149                }
150            }
151        });
152        let stream =
153            futures::stream::unfold(rx, |mut rx| async move { rx.recv().await.map(|item| (item, rx)) });
154        Source { inner: stream.boxed() }
155    }
156
157    pub fn filter<F>(self, mut f: F) -> Source<T>
158    where
159        F: FnMut(&T) -> bool + Send + 'static,
160    {
161        Source { inner: self.inner.filter(move |v| futures::future::ready(f(v))).boxed() }
162    }
163
164    pub fn filter_map<U, F>(self, mut f: F) -> Source<U>
165    where
166        F: FnMut(T) -> Option<U> + Send + 'static,
167        U: Send + 'static,
168    {
169        Source { inner: self.inner.filter_map(move |v| futures::future::ready(f(v))).boxed() }
170    }
171
172    pub fn take(self, n: usize) -> Source<T> {
173        Source { inner: self.inner.take(n).boxed() }
174    }
175
176    pub fn take_while<F>(self, mut f: F) -> Source<T>
177    where
178        F: FnMut(&T) -> bool + Send + 'static,
179    {
180        Source { inner: self.inner.take_while(move |v| futures::future::ready(f(v))).boxed() }
181    }
182
183    pub fn skip(self, n: usize) -> Source<T> {
184        Source { inner: self.inner.skip(n).boxed() }
185    }
186
187    pub fn skip_while<F>(self, mut f: F) -> Source<T>
188    where
189        F: FnMut(&T) -> bool + Send + 'static,
190    {
191        Source { inner: self.inner.skip_while(move |v| futures::future::ready(f(v))).boxed() }
192    }
193
194    pub fn scan<Acc, F>(self, init: Acc, mut f: F) -> Source<Acc>
195    where
196        Acc: Clone + Send + 'static,
197        F: FnMut(&Acc, T) -> Acc + Send + 'static,
198    {
199        Source {
200            inner: self
201                .inner
202                .scan(init, move |state, item| {
203                    *state = f(state, item);
204                    futures::future::ready(Some(state.clone()))
205                })
206                .boxed(),
207        }
208    }
209
210    /// akka.net: `Grouped(n)` — emit vectors of up to n items.
211    pub fn grouped(self, n: usize) -> Source<Vec<T>> {
212        Source { inner: self.inner.chunks(n.max(1)).boxed() }
213    }
214
215    pub fn intersperse(self, sep: T) -> Source<T>
216    where
217        T: Clone,
218    {
219        let state = InterspersState { started: false, pending: None, sep, done: false };
220        Source {
221            inner: stream::unfold((self.inner, state), |(mut s, mut st)| async move {
222                if st.done {
223                    return None;
224                }
225                if let Some(p) = st.pending.take() {
226                    return Some((p, (s, st)));
227                }
228                let next = s.next().await;
229                match next {
230                    None => None,
231                    Some(v) => {
232                        if !st.started {
233                            st.started = true;
234                            Some((v, (s, st)))
235                        } else {
236                            st.pending = Some(v);
237                            let sep = st.sep.clone();
238                            Some((sep, (s, st)))
239                        }
240                    }
241                }
242            })
243            .boxed(),
244        }
245    }
246
247    pub fn concat(self, other: Source<T>) -> Source<T> {
248        Source { inner: self.inner.chain(other.inner).boxed() }
249    }
250
251    pub fn prepend(self, other: Source<T>) -> Source<T> {
252        Source { inner: other.inner.chain(self.inner).boxed() }
253    }
254
255    /// akka.net: `Delay` — shift every element by `d`.
256    pub fn delay(self, d: Duration) -> Source<T> {
257        Source {
258            inner: self
259                .inner
260                .then(move |v| async move {
261                    tokio::time::sleep(d).await;
262                    v
263                })
264                .boxed(),
265        }
266    }
267
268    /// akka.net: `InitialDelay` — wait `d` before emitting the first element.
269    pub fn initial_delay(self, d: Duration) -> Source<T> {
270        let inner = self.inner;
271        Source {
272            inner: stream::once(async move {
273                tokio::time::sleep(d).await;
274                inner
275            })
276            .flatten()
277            .boxed(),
278        }
279    }
280
281    /// akka.net: `Throttle` — limit element rate (one per `interval`).
282    pub fn throttle(self, interval: Duration) -> Source<T> {
283        Source {
284            inner: self
285                .inner
286                .then(move |v| async move {
287                    tokio::time::sleep(interval).await;
288                    v
289                })
290                .boxed(),
291        }
292    }
293
294    /// akka.net: `Buffer(size, OverflowStrategy)`.
295    pub fn buffer(self, size: usize, strategy: OverflowStrategy) -> Source<T> {
296        crate::overflow::apply(self, size, strategy)
297    }
298
299    /// akka.net: `WireTap` — observes each element without affecting the stream.
300    pub fn wire_tap<F>(self, mut f: F) -> Source<T>
301    where
302        F: FnMut(&T) + Send + 'static,
303    {
304        Source { inner: self.inner.inspect(move |v| f(v)).boxed() }
305    }
306
307    pub fn via<U>(self, flow: Flow<T, U>) -> Source<U>
308    where
309        U: Send + 'static,
310    {
311        Source { inner: (flow.transform)(self.inner) }
312    }
313
314    pub(crate) fn into_boxed(self) -> BoxStream<'static, T> {
315        self.inner
316    }
317}
318
319struct InterspersState<T> {
320    started: bool,
321    pending: Option<T>,
322    sep: T,
323    done: bool,
324}
325
326#[cfg(test)]
327mod tests {
328    use super::*;
329    use crate::sink::Sink;
330
331    #[tokio::test]
332    async fn map_filter_take() {
333        let out: Vec<i32> =
334            Sink::collect(Source::from_iter(0..100).map(|x| x * 3).filter(|x| x % 2 == 0).take(5)).await;
335        assert_eq!(out, vec![0, 6, 12, 18, 24]);
336    }
337
338    #[tokio::test]
339    async fn scan_emits_running_state() {
340        let out: Vec<i32> =
341            Sink::collect(Source::from_iter(vec![1, 2, 3, 4]).scan(0, |acc, x| acc + x)).await;
342        assert_eq!(out, vec![1, 3, 6, 10]);
343    }
344
345    #[tokio::test]
346    async fn grouped_packs_chunks() {
347        let out: Vec<Vec<i32>> = Sink::collect(Source::from_iter(1..=7).grouped(3)).await;
348        assert_eq!(out, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]]);
349    }
350
351    #[tokio::test]
352    async fn intersperse_inserts_separator() {
353        let out: Vec<i32> = Sink::collect(Source::from_iter(vec![1, 2, 3]).intersperse(0)).await;
354        assert_eq!(out, vec![1, 0, 2, 0, 3]);
355    }
356
357    #[tokio::test]
358    async fn map_async_preserves_order() {
359        let out: Vec<i32> = Sink::collect(Source::from_iter(1..=4).map_async(4, |x| async move {
360            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
361            x * x
362        }))
363        .await;
364        assert_eq!(out, vec![1, 4, 9, 16]);
365    }
366
367    #[tokio::test]
368    async fn concat_and_prepend_join_sources() {
369        let a = Source::from_iter(vec![1, 2]);
370        let b = Source::from_iter(vec![3, 4]);
371        assert_eq!(Sink::collect(a.concat(b)).await, vec![1, 2, 3, 4]);
372
373        let a = Source::from_iter(vec![1, 2]);
374        let b = Source::from_iter(vec![3, 4]);
375        assert_eq!(Sink::collect(a.prepend(b)).await, vec![3, 4, 1, 2]);
376    }
377
378    #[tokio::test]
379    async fn wire_tap_observes_without_consuming() {
380        let seen = std::sync::Arc::new(std::sync::Mutex::new(Vec::<i32>::new()));
381        let seen_c = seen.clone();
382        let out = Sink::collect(
383            Source::from_iter(vec![1, 2, 3]).wire_tap(move |v| seen_c.lock().unwrap().push(*v)),
384        )
385        .await;
386        assert_eq!(out, vec![1, 2, 3]);
387        assert_eq!(seen.lock().unwrap().clone(), vec![1, 2, 3]);
388    }
389}