1use std::future::Future;
9use std::time::Duration;
10
11use futures::stream::{self, BoxStream, StreamExt};
12
13use crate::flow::Flow;
14use crate::overflow::OverflowStrategy;
15
16pub struct Source<T> {
17 pub(crate) inner: BoxStream<'static, T>,
18}
19
20impl<T: Send + 'static> Source<T> {
21 #[allow(clippy::should_implement_trait)]
24 pub fn from_iter<I: IntoIterator<Item = T> + Send + 'static>(iter: I) -> Self
25 where
26 I::IntoIter: Send + 'static,
27 {
28 Source { inner: stream::iter(iter).boxed() }
29 }
30
31 pub fn single(value: T) -> Self {
32 Source { inner: stream::once(async move { value }).boxed() }
33 }
34
35 pub fn empty() -> Self {
36 Source { inner: stream::empty().boxed() }
37 }
38
39 pub fn repeat(value: T) -> Self
40 where
41 T: Clone,
42 {
43 Source { inner: stream::repeat(value).boxed() }
44 }
45
46 pub fn cycle<I: IntoIterator<Item = T> + Clone + Send + 'static>(iter: I) -> Self
47 where
48 I::IntoIter: Send + 'static,
49 T: Clone,
50 {
51 Source {
52 inner: stream::unfold(iter.into_iter(), |mut it| async move { it.next().map(|v| (v, it)) })
53 .boxed(),
54 }
55 }
56
57 pub fn from_future<F>(fut: F) -> Self
58 where
59 F: Future<Output = T> + Send + 'static,
60 {
61 Source { inner: stream::once(fut).boxed() }
62 }
63
64 pub fn unfold<S, F, Fut>(init: S, f: F) -> Self
65 where
66 S: Send + 'static,
67 F: FnMut(S) -> Fut + Send + 'static,
68 Fut: Future<Output = Option<(T, S)>> + Send + 'static,
69 {
70 Source { inner: stream::unfold(init, f).boxed() }
71 }
72
73 pub fn tick(initial_delay: Duration, interval: Duration, value: T) -> Self
74 where
75 T: Clone,
76 {
77 let stream = stream::unfold(true, move |first| {
78 let d = if first { initial_delay } else { interval };
79 let v = value.clone();
80 async move {
81 tokio::time::sleep(d).await;
82 Some((v, false))
83 }
84 });
85 Source { inner: stream.boxed() }
86 }
87
88 pub fn failed<E>(error: E) -> Source<Result<T, E>>
89 where
90 E: Send + 'static,
91 {
92 Source { inner: stream::once(async move { Err(error) }).boxed() }
93 }
94
95 pub fn from_receiver(rx: tokio::sync::mpsc::UnboundedReceiver<T>) -> Self {
96 Source { inner: stream::unfold(rx, |mut rx| async move { rx.recv().await.map(|v| (v, rx)) }).boxed() }
97 }
98
99 pub fn map<U, F>(self, f: F) -> Source<U>
102 where
103 F: FnMut(T) -> U + Send + 'static,
104 U: Send + 'static,
105 {
106 Source { inner: self.inner.map(f).boxed() }
107 }
108
109 pub fn map_async<U, F, Fut>(self, parallelism: usize, f: F) -> Source<U>
111 where
112 F: FnMut(T) -> Fut + Send + 'static,
113 Fut: Future<Output = U> + Send + 'static,
114 U: Send + 'static,
115 {
116 let p = parallelism.max(1);
117 Source { inner: self.inner.map(f).buffered(p).boxed() }
118 }
119
120 pub fn map_async_unordered<U, F, Fut>(self, parallelism: usize, f: F) -> Source<U>
121 where
122 F: FnMut(T) -> Fut + Send + 'static,
123 Fut: Future<Output = U> + Send + 'static,
124 U: Send + 'static,
125 {
126 let p = parallelism.max(1);
127 Source { inner: self.inner.map(f).buffer_unordered(p).boxed() }
128 }
129
130 pub fn async_boundary(self, buffer: usize) -> Source<T> {
141 let buffer = buffer.max(1);
142 let (tx, rx) = tokio::sync::mpsc::channel::<T>(buffer);
143 let mut inner = self.inner;
144 tokio::spawn(async move {
145 while let Some(item) = inner.next().await {
146 if tx.send(item).await.is_err() {
147 return;
148 }
149 }
150 });
151 let stream =
152 futures::stream::unfold(rx, |mut rx| async move { rx.recv().await.map(|item| (item, rx)) });
153 Source { inner: stream.boxed() }
154 }
155
156 pub fn filter<F>(self, mut f: F) -> Source<T>
157 where
158 F: FnMut(&T) -> bool + Send + 'static,
159 {
160 Source { inner: self.inner.filter(move |v| futures::future::ready(f(v))).boxed() }
161 }
162
163 pub fn filter_map<U, F>(self, mut f: F) -> Source<U>
164 where
165 F: FnMut(T) -> Option<U> + Send + 'static,
166 U: Send + 'static,
167 {
168 Source { inner: self.inner.filter_map(move |v| futures::future::ready(f(v))).boxed() }
169 }
170
171 pub fn take(self, n: usize) -> Source<T> {
172 Source { inner: self.inner.take(n).boxed() }
173 }
174
175 pub fn take_while<F>(self, mut f: F) -> Source<T>
176 where
177 F: FnMut(&T) -> bool + Send + 'static,
178 {
179 Source { inner: self.inner.take_while(move |v| futures::future::ready(f(v))).boxed() }
180 }
181
182 pub fn skip(self, n: usize) -> Source<T> {
183 Source { inner: self.inner.skip(n).boxed() }
184 }
185
186 pub fn skip_while<F>(self, mut f: F) -> Source<T>
187 where
188 F: FnMut(&T) -> bool + Send + 'static,
189 {
190 Source { inner: self.inner.skip_while(move |v| futures::future::ready(f(v))).boxed() }
191 }
192
193 pub fn scan<Acc, F>(self, init: Acc, mut f: F) -> Source<Acc>
194 where
195 Acc: Clone + Send + 'static,
196 F: FnMut(&Acc, T) -> Acc + Send + 'static,
197 {
198 Source {
199 inner: self
200 .inner
201 .scan(init, move |state, item| {
202 *state = f(state, item);
203 futures::future::ready(Some(state.clone()))
204 })
205 .boxed(),
206 }
207 }
208
209 pub fn grouped(self, n: usize) -> Source<Vec<T>> {
211 Source { inner: self.inner.chunks(n.max(1)).boxed() }
212 }
213
214 pub fn intersperse(self, sep: T) -> Source<T>
215 where
216 T: Clone,
217 {
218 let state = InterspersState { started: false, pending: None, sep, done: false };
219 Source {
220 inner: stream::unfold((self.inner, state), |(mut s, mut st)| async move {
221 if st.done {
222 return None;
223 }
224 if let Some(p) = st.pending.take() {
225 return Some((p, (s, st)));
226 }
227 let next = s.next().await;
228 match next {
229 None => None,
230 Some(v) => {
231 if !st.started {
232 st.started = true;
233 Some((v, (s, st)))
234 } else {
235 st.pending = Some(v);
236 let sep = st.sep.clone();
237 Some((sep, (s, st)))
238 }
239 }
240 }
241 })
242 .boxed(),
243 }
244 }
245
246 pub fn concat(self, other: Source<T>) -> Source<T> {
247 Source { inner: self.inner.chain(other.inner).boxed() }
248 }
249
250 pub fn prepend(self, other: Source<T>) -> Source<T> {
251 Source { inner: other.inner.chain(self.inner).boxed() }
252 }
253
254 pub fn delay(self, d: Duration) -> Source<T> {
256 Source {
257 inner: self
258 .inner
259 .then(move |v| async move {
260 tokio::time::sleep(d).await;
261 v
262 })
263 .boxed(),
264 }
265 }
266
267 pub fn initial_delay(self, d: Duration) -> Source<T> {
269 let inner = self.inner;
270 Source {
271 inner: stream::once(async move {
272 tokio::time::sleep(d).await;
273 inner
274 })
275 .flatten()
276 .boxed(),
277 }
278 }
279
280 pub fn throttle(self, interval: Duration) -> Source<T> {
282 Source {
283 inner: self
284 .inner
285 .then(move |v| async move {
286 tokio::time::sleep(interval).await;
287 v
288 })
289 .boxed(),
290 }
291 }
292
293 pub fn buffer(self, size: usize, strategy: OverflowStrategy) -> Source<T> {
294 crate::overflow::apply(self, size, strategy)
295 }
296
297 pub fn wire_tap<F>(self, mut f: F) -> Source<T>
299 where
300 F: FnMut(&T) + Send + 'static,
301 {
302 Source { inner: self.inner.inspect(move |v| f(v)).boxed() }
303 }
304
305 pub fn via<U>(self, flow: Flow<T, U>) -> Source<U>
306 where
307 U: Send + 'static,
308 {
309 Source { inner: (flow.transform)(self.inner) }
310 }
311
312 pub(crate) fn into_boxed(self) -> BoxStream<'static, T> {
313 self.inner
314 }
315}
316
317struct InterspersState<T> {
318 started: bool,
319 pending: Option<T>,
320 sep: T,
321 done: bool,
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327 use crate::sink::Sink;
328
329 #[tokio::test]
330 async fn map_filter_take() {
331 let out: Vec<i32> =
332 Sink::collect(Source::from_iter(0..100).map(|x| x * 3).filter(|x| x % 2 == 0).take(5)).await;
333 assert_eq!(out, vec![0, 6, 12, 18, 24]);
334 }
335
336 #[tokio::test]
337 async fn scan_emits_running_state() {
338 let out: Vec<i32> =
339 Sink::collect(Source::from_iter(vec![1, 2, 3, 4]).scan(0, |acc, x| acc + x)).await;
340 assert_eq!(out, vec![1, 3, 6, 10]);
341 }
342
343 #[tokio::test]
344 async fn grouped_packs_chunks() {
345 let out: Vec<Vec<i32>> = Sink::collect(Source::from_iter(1..=7).grouped(3)).await;
346 assert_eq!(out, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]]);
347 }
348
349 #[tokio::test]
350 async fn intersperse_inserts_separator() {
351 let out: Vec<i32> = Sink::collect(Source::from_iter(vec![1, 2, 3]).intersperse(0)).await;
352 assert_eq!(out, vec![1, 0, 2, 0, 3]);
353 }
354
355 #[tokio::test]
356 async fn map_async_preserves_order() {
357 let out: Vec<i32> = Sink::collect(Source::from_iter(1..=4).map_async(4, |x| async move {
358 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
359 x * x
360 }))
361 .await;
362 assert_eq!(out, vec![1, 4, 9, 16]);
363 }
364
365 #[tokio::test]
366 async fn concat_and_prepend_join_sources() {
367 let a = Source::from_iter(vec![1, 2]);
368 let b = Source::from_iter(vec![3, 4]);
369 assert_eq!(Sink::collect(a.concat(b)).await, vec![1, 2, 3, 4]);
370
371 let a = Source::from_iter(vec![1, 2]);
372 let b = Source::from_iter(vec![3, 4]);
373 assert_eq!(Sink::collect(a.prepend(b)).await, vec![3, 4, 1, 2]);
374 }
375
376 #[tokio::test]
377 async fn wire_tap_observes_without_consuming() {
378 let seen = std::sync::Arc::new(std::sync::Mutex::new(Vec::<i32>::new()));
379 let seen_c = seen.clone();
380 let out = Sink::collect(
381 Source::from_iter(vec![1, 2, 3]).wire_tap(move |v| seen_c.lock().unwrap().push(*v)),
382 )
383 .await;
384 assert_eq!(out, vec![1, 2, 3]);
385 assert_eq!(seen.lock().unwrap().clone(), vec![1, 2, 3]);
386 }
387}