Skip to main content

atomr_streams/
flow.rs

1//! Flow — a linear transformation from `In` to `Out`. akka.net: `Dsl/Flow.cs`.
2//!
3//! A `Flow<A, B>` is a boxed closure that turns a `Stream<A>` into a
4//! `Stream<B>`. Composition is by function chaining, which mirrors the
5//! semantics of `Dsl/FlowOperations.cs` for the linear subset of
6//! operators we provide.
7
8use std::future::Future;
9use std::time::Duration;
10
11use futures::stream::{BoxStream, StreamExt};
12
13pub struct Flow<In, Out> {
14    pub(crate) transform: Box<dyn FnOnce(BoxStream<'static, In>) -> BoxStream<'static, Out> + Send + 'static>,
15}
16
17impl<T: Send + 'static> Flow<T, T> {
18    pub fn identity() -> Self {
19        Flow { transform: Box::new(|s| s) }
20    }
21}
22
23impl<In: Send + 'static, Out: Send + 'static> Flow<In, Out> {
24    /// Pure synchronous mapping. akka.net: `Flow.FromFunction` / `Select`.
25    pub fn from_fn<F>(f: F) -> Self
26    where
27        F: FnMut(In) -> Out + Send + 'static,
28    {
29        Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.map(f).boxed()) }
30    }
31
32    /// Asynchronous mapping with ordered bounded parallelism.
33    /// akka.net: `SelectAsync(parallelism, mapper)`.
34    pub fn map_async<F, Fut>(parallelism: usize, f: F) -> Self
35    where
36        F: FnMut(In) -> Fut + Send + 'static,
37        Fut: Future<Output = Out> + Send + 'static,
38    {
39        let p = parallelism.max(1);
40        Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.map(f).buffered(p).boxed()) }
41    }
42
43    /// Chain another flow after this one. akka.net: `Flow.Via`.
44    pub fn via<Out2: Send + 'static>(self, next: Flow<Out, Out2>) -> Flow<In, Out2> {
45        Flow {
46            transform: Box::new(move |s: BoxStream<'static, In>| {
47                let mid = (self.transform)(s);
48                (next.transform)(mid)
49            }),
50        }
51    }
52
53    /// Compose with a post-processing closure. akka.net: `Then` / `Select`.
54    pub fn then<Out2, F>(self, g: F) -> Flow<In, Out2>
55    where
56        Out2: Send + 'static,
57        F: FnMut(Out) -> Out2 + Send + 'static,
58    {
59        Flow {
60            transform: Box::new(move |s: BoxStream<'static, In>| {
61                let out = (self.transform)(s);
62                out.map(g).boxed()
63            }),
64        }
65    }
66}
67
68impl<In: Send + 'static> Flow<In, In> {
69    pub fn filter<F>(mut f: F) -> Self
70    where
71        F: FnMut(&In) -> bool + Send + 'static,
72    {
73        Flow {
74            transform: Box::new(move |s: BoxStream<'static, In>| {
75                s.filter(move |v| futures::future::ready(f(v))).boxed()
76            }),
77        }
78    }
79
80    pub fn take(n: usize) -> Self {
81        Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.take(n).boxed()) }
82    }
83
84    pub fn skip(n: usize) -> Self {
85        Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.skip(n).boxed()) }
86    }
87
88    /// akka.net: `Throttle`.
89    pub fn throttle(interval: Duration) -> Self {
90        Flow {
91            transform: Box::new(move |s: BoxStream<'static, In>| {
92                s.then(move |v| async move {
93                    tokio::time::sleep(interval).await;
94                    v
95                })
96                .boxed()
97            }),
98        }
99    }
100}
101
102impl<In: Send + 'static, Out: Send + 'static> Flow<In, Out> {
103    /// akka.net: `SelectMany` / `flatMapConcat`.
104    pub fn flat_map_concat<F, S, U>(mut f: F) -> Flow<In, U>
105    where
106        F: FnMut(In) -> S + Send + 'static,
107        S: IntoIterator<Item = U> + Send + 'static,
108        S::IntoIter: Send + 'static,
109        U: Send + 'static,
110        // Keep Out type linked for inference; unused here.
111        In: 'static,
112    {
113        Flow {
114            transform: Box::new(move |s: BoxStream<'static, In>| {
115                s.flat_map(move |x| futures::stream::iter(f(x))).boxed()
116            }),
117        }
118    }
119}