1use std::future::Future;
9use std::time::Duration;
10
11use futures::stream::{BoxStream, StreamExt};
12
13pub struct Flow<In, Out> {
14 pub(crate) transform: Box<dyn FnOnce(BoxStream<'static, In>) -> BoxStream<'static, Out> + Send + 'static>,
15}
16
17impl<T: Send + 'static> Flow<T, T> {
18 pub fn identity() -> Self {
19 Flow { transform: Box::new(|s| s) }
20 }
21}
22
23impl<In: Send + 'static, Out: Send + 'static> Flow<In, Out> {
24 pub fn from_fn<F>(f: F) -> Self
26 where
27 F: FnMut(In) -> Out + Send + 'static,
28 {
29 Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.map(f).boxed()) }
30 }
31
32 pub fn map_async<F, Fut>(parallelism: usize, f: F) -> Self
35 where
36 F: FnMut(In) -> Fut + Send + 'static,
37 Fut: Future<Output = Out> + Send + 'static,
38 {
39 let p = parallelism.max(1);
40 Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.map(f).buffered(p).boxed()) }
41 }
42
43 pub fn via<Out2: Send + 'static>(self, next: Flow<Out, Out2>) -> Flow<In, Out2> {
45 Flow {
46 transform: Box::new(move |s: BoxStream<'static, In>| {
47 let mid = (self.transform)(s);
48 (next.transform)(mid)
49 }),
50 }
51 }
52
53 pub fn then<Out2, F>(self, g: F) -> Flow<In, Out2>
55 where
56 Out2: Send + 'static,
57 F: FnMut(Out) -> Out2 + Send + 'static,
58 {
59 Flow {
60 transform: Box::new(move |s: BoxStream<'static, In>| {
61 let out = (self.transform)(s);
62 out.map(g).boxed()
63 }),
64 }
65 }
66}
67
68impl<In: Send + 'static> Flow<In, In> {
69 pub fn filter<F>(mut f: F) -> Self
70 where
71 F: FnMut(&In) -> bool + Send + 'static,
72 {
73 Flow {
74 transform: Box::new(move |s: BoxStream<'static, In>| {
75 s.filter(move |v| futures::future::ready(f(v))).boxed()
76 }),
77 }
78 }
79
80 pub fn take(n: usize) -> Self {
81 Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.take(n).boxed()) }
82 }
83
84 pub fn skip(n: usize) -> Self {
85 Flow { transform: Box::new(move |s: BoxStream<'static, In>| s.skip(n).boxed()) }
86 }
87
88 pub fn throttle(interval: Duration) -> Self {
90 Flow {
91 transform: Box::new(move |s: BoxStream<'static, In>| {
92 s.then(move |v| async move {
93 tokio::time::sleep(interval).await;
94 v
95 })
96 .boxed()
97 }),
98 }
99 }
100}
101
102impl<In: Send + 'static, Out: Send + 'static> Flow<In, Out> {
103 pub fn flat_map_concat<F, S, U>(mut f: F) -> Flow<In, U>
105 where
106 F: FnMut(In) -> S + Send + 'static,
107 S: IntoIterator<Item = U> + Send + 'static,
108 S::IntoIter: Send + 'static,
109 U: Send + 'static,
110 In: 'static,
112 {
113 Flow {
114 transform: Box::new(move |s: BoxStream<'static, In>| {
115 s.flat_map(move |x| futures::stream::iter(f(x))).boxed()
116 }),
117 }
118 }
119}