1use std::future::Future;
9use std::time::Duration;
10
11use futures::stream::{BoxStream, StreamExt};
12
13pub struct Flow<In, Out> {
14 pub(crate) transform: Box<dyn FnOnce(BoxStream<'static, In>) -> BoxStream<'static, Out> + Send + 'static>,
15}
16
17impl<T: Send + 'static> Flow<T, T> {
18 pub fn identity() -> Self {
19 Flow { transform: Box::new(|s| s) }
20 }
21}
22
23impl<In: Send + 'static, Out: Send + 'static> Flow<In, Out> {
24 pub fn from_fn<F>(f: F) -> Self
26 where
27 F: FnMut(In) -> Out + Send + 'static,
28 {
29 Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.map(f).boxed()) }
30 }
31
32 pub fn map_async<F, Fut>(parallelism: usize, f: F) -> Self
34 where
35 F: FnMut(In) -> Fut + Send + 'static,
36 Fut: Future<Output = Out> + Send + 'static,
37 {
38 let p = parallelism.max(1);
39 Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.map(f).buffered(p).boxed()) }
40 }
41
42 pub fn via<Out2: Send + 'static>(self, next: Flow<Out, Out2>) -> Flow<In, Out2> {
44 Flow {
45 transform: Box::new(move |s: BoxStream<'static, In>| {
46 let mid = (self.transform)(s);
47 (next.transform)(mid)
48 }),
49 }
50 }
51
52 pub fn then<Out2, F>(self, g: F) -> Flow<In, Out2>
54 where
55 Out2: Send + 'static,
56 F: FnMut(Out) -> Out2 + Send + 'static,
57 {
58 Flow {
59 transform: Box::new(move |s: BoxStream<'static, In>| {
60 let out = (self.transform)(s);
61 out.map(g).boxed()
62 }),
63 }
64 }
65}
66
67impl<In: Send + 'static> Flow<In, In> {
68 pub fn filter<F>(mut f: F) -> Self
69 where
70 F: FnMut(&In) -> bool + Send + 'static,
71 {
72 Flow {
73 transform: Box::new(move |s: BoxStream<'static, In>| {
74 s.filter(move |v| futures::future::ready(f(v))).boxed()
75 }),
76 }
77 }
78
79 pub fn take(n: usize) -> Self {
80 Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.take(n).boxed()) }
81 }
82
83 pub fn skip(n: usize) -> Self {
84 Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.skip(n).boxed()) }
85 }
86
87 pub fn throttle(interval: Duration) -> Self {
88 Flow {
89 transform: Box::new(move |s: BoxStream<'static, In>| {
90 s.then(move |v| async move {
91 tokio::time::sleep(interval).await;
92 v
93 })
94 .boxed()
95 }),
96 }
97 }
98}
99
100impl<In: Send + 'static, Out: Send + 'static> Flow<In, Out> {
101 pub fn flat_map_concat<F, S, U>(mut f: F) -> Flow<In, U>
103 where
104 F: FnMut(In) -> S + Send + 'static,
105 S: IntoIterator<Item = U> + Send + 'static,
106 S::IntoIter: Send + 'static,
107 U: Send + 'static,
108 In: 'static,
110 {
111 Flow {
112 transform: Box::new(move |s: BoxStream<'static, In>| {
113 s.flat_map(move |x| futures::stream::iter(f(x))).boxed()
114 }),
115 }
116 }
117}