Skip to main content

atomr_streams/
flow.rs

1//! Flow — a linear transformation from `In` to `Out`.
2//!
3//! A `Flow<A, B>` is a boxed closure that turns a `Stream<A>` into a
4//! `Stream<B>`. Composition is by function chaining, which mirrors the
5//! semantics of `Dsl/FlowOperations.cs` for the linear subset of
6//! operators we provide.
7
8use std::future::Future;
9use std::time::Duration;
10
11use futures::stream::{BoxStream, StreamExt};
12
13pub struct Flow<In, Out> {
14    pub(crate) transform: Box<dyn FnOnce(BoxStream<'static, In>) -> BoxStream<'static, Out> + Send + 'static>,
15}
16
17impl<T: Send + 'static> Flow<T, T> {
18    pub fn identity() -> Self {
19        Flow { transform: Box::new(|s| s) }
20    }
21}
22
23impl<In: Send + 'static, Out: Send + 'static> Flow<In, Out> {
24    /// Pure synchronous mapping./ `Select`.
25    pub fn from_fn<F>(f: F) -> Self
26    where
27        F: FnMut(In) -> Out + Send + 'static,
28    {
29        Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.map(f).boxed()) }
30    }
31
32    /// Asynchronous mapping with ordered bounded parallelism.
33    pub fn map_async<F, Fut>(parallelism: usize, f: F) -> Self
34    where
35        F: FnMut(In) -> Fut + Send + 'static,
36        Fut: Future<Output = Out> + Send + 'static,
37    {
38        let p = parallelism.max(1);
39        Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.map(f).buffered(p).boxed()) }
40    }
41
42    /// Chain another flow after this one.
43    pub fn via<Out2: Send + 'static>(self, next: Flow<Out, Out2>) -> Flow<In, Out2> {
44        Flow {
45            transform: Box::new(move |s: BoxStream<'static, In>| {
46                let mid = (self.transform)(s);
47                (next.transform)(mid)
48            }),
49        }
50    }
51
52    /// Compose with a post-processing closure./ `Select`.
53    pub fn then<Out2, F>(self, g: F) -> Flow<In, Out2>
54    where
55        Out2: Send + 'static,
56        F: FnMut(Out) -> Out2 + Send + 'static,
57    {
58        Flow {
59            transform: Box::new(move |s: BoxStream<'static, In>| {
60                let out = (self.transform)(s);
61                out.map(g).boxed()
62            }),
63        }
64    }
65}
66
67impl<In: Send + 'static> Flow<In, In> {
68    pub fn filter<F>(mut f: F) -> Self
69    where
70        F: FnMut(&In) -> bool + Send + 'static,
71    {
72        Flow {
73            transform: Box::new(move |s: BoxStream<'static, In>| {
74                s.filter(move |v| futures::future::ready(f(v))).boxed()
75            }),
76        }
77    }
78
79    pub fn take(n: usize) -> Self {
80        Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.take(n).boxed()) }
81    }
82
83    pub fn skip(n: usize) -> Self {
84        Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.skip(n).boxed()) }
85    }
86
87    pub fn throttle(interval: Duration) -> Self {
88        Flow {
89            transform: Box::new(move |s: BoxStream<'static, In>| {
90                s.then(move |v| async move {
91                    tokio::time::sleep(interval).await;
92                    v
93                })
94                .boxed()
95            }),
96        }
97    }
98}
99
100impl<In: Send + 'static, Out: Send + 'static> Flow<In, Out> {
101    /// / `flatMapConcat`.
102    pub fn flat_map_concat<F, S, U>(mut f: F) -> Flow<In, U>
103    where
104        F: FnMut(In) -> S + Send + 'static,
105        S: IntoIterator<Item = U> + Send + 'static,
106        S::IntoIter: Send + 'static,
107        U: Send + 'static,
108        // Keep Out type linked for inference; unused here.
109        In: 'static,
110    {
111        Flow {
112            transform: Box::new(move |s: BoxStream<'static, In>| {
113                s.flat_map(move |x| futures::stream::iter(f(x))).boxed()
114            }),
115        }
116    }
117}