1use std::future::Future;
9use std::time::Duration;
10
11use futures::stream::{self, BoxStream, StreamExt};
12
13use crate::flow::Flow;
14use crate::overflow::OverflowStrategy;
15
16pub struct Source<T> {
17 pub(crate) inner: BoxStream<'static, T>,
18}
19
20impl<T: Send + 'static> Source<T> {
21 #[allow(clippy::should_implement_trait)]
24 pub fn from_iter<I: IntoIterator<Item = T> + Send + 'static>(iter: I) -> Self
25 where
26 I::IntoIter: Send + 'static,
27 {
28 Source { inner: stream::iter(iter).boxed() }
29 }
30
31 pub fn single(value: T) -> Self {
32 Source { inner: stream::once(async move { value }).boxed() }
33 }
34
35 pub fn empty() -> Self {
36 Source { inner: stream::empty().boxed() }
37 }
38
39 pub fn repeat(value: T) -> Self
40 where
41 T: Clone,
42 {
43 Source { inner: stream::repeat(value).boxed() }
44 }
45
46 pub fn cycle<I: IntoIterator<Item = T> + Clone + Send + 'static>(iter: I) -> Self
47 where
48 I::IntoIter: Send + 'static,
49 T: Clone,
50 {
51 Source {
52 inner: stream::unfold(iter.into_iter(), |mut it| async move { it.next().map(|v| (v, it)) })
53 .boxed(),
54 }
55 }
56
57 pub fn from_future<F>(fut: F) -> Self
58 where
59 F: Future<Output = T> + Send + 'static,
60 {
61 Source { inner: stream::once(fut).boxed() }
62 }
63
64 pub fn unfold<S, F, Fut>(init: S, f: F) -> Self
65 where
66 S: Send + 'static,
67 F: FnMut(S) -> Fut + Send + 'static,
68 Fut: Future<Output = Option<(T, S)>> + Send + 'static,
69 {
70 Source { inner: stream::unfold(init, f).boxed() }
71 }
72
73 pub fn tick(initial_delay: Duration, interval: Duration, value: T) -> Self
74 where
75 T: Clone,
76 {
77 let stream = stream::unfold(true, move |first| {
78 let d = if first { initial_delay } else { interval };
79 let v = value.clone();
80 async move {
81 tokio::time::sleep(d).await;
82 Some((v, false))
83 }
84 });
85 Source { inner: stream.boxed() }
86 }
87
88 pub fn failed<E>(error: E) -> Source<Result<T, E>>
89 where
90 E: Send + 'static,
91 {
92 Source { inner: stream::once(async move { Err(error) }).boxed() }
93 }
94
95 pub fn from_receiver(rx: tokio::sync::mpsc::UnboundedReceiver<T>) -> Self {
96 Source { inner: stream::unfold(rx, |mut rx| async move { rx.recv().await.map(|v| (v, rx)) }).boxed() }
97 }
98
99 pub fn map<U, F>(self, f: F) -> Source<U>
102 where
103 F: FnMut(T) -> U + Send + 'static,
104 U: Send + 'static,
105 {
106 Source { inner: self.inner.map(f).boxed() }
107 }
108
109 pub fn map_async<U, F, Fut>(self, parallelism: usize, f: F) -> Source<U>
111 where
112 F: FnMut(T) -> Fut + Send + 'static,
113 Fut: Future<Output = U> + Send + 'static,
114 U: Send + 'static,
115 {
116 let p = parallelism.max(1);
117 Source { inner: self.inner.map(f).buffered(p).boxed() }
118 }
119
120 pub fn map_async_unordered<U, F, Fut>(self, parallelism: usize, f: F) -> Source<U>
122 where
123 F: FnMut(T) -> Fut + Send + 'static,
124 Fut: Future<Output = U> + Send + 'static,
125 U: Send + 'static,
126 {
127 let p = parallelism.max(1);
128 Source { inner: self.inner.map(f).buffer_unordered(p).boxed() }
129 }
130
131 pub fn async_boundary(self, buffer: usize) -> Source<T> {
142 let buffer = buffer.max(1);
143 let (tx, rx) = tokio::sync::mpsc::channel::<T>(buffer);
144 let mut inner = self.inner;
145 tokio::spawn(async move {
146 while let Some(item) = inner.next().await {
147 if tx.send(item).await.is_err() {
148 return;
149 }
150 }
151 });
152 let stream =
153 futures::stream::unfold(rx, |mut rx| async move { rx.recv().await.map(|item| (item, rx)) });
154 Source { inner: stream.boxed() }
155 }
156
157 pub fn filter<F>(self, mut f: F) -> Source<T>
158 where
159 F: FnMut(&T) -> bool + Send + 'static,
160 {
161 Source { inner: self.inner.filter(move |v| futures::future::ready(f(v))).boxed() }
162 }
163
164 pub fn filter_map<U, F>(self, mut f: F) -> Source<U>
165 where
166 F: FnMut(T) -> Option<U> + Send + 'static,
167 U: Send + 'static,
168 {
169 Source { inner: self.inner.filter_map(move |v| futures::future::ready(f(v))).boxed() }
170 }
171
172 pub fn take(self, n: usize) -> Source<T> {
173 Source { inner: self.inner.take(n).boxed() }
174 }
175
176 pub fn take_while<F>(self, mut f: F) -> Source<T>
177 where
178 F: FnMut(&T) -> bool + Send + 'static,
179 {
180 Source { inner: self.inner.take_while(move |v| futures::future::ready(f(v))).boxed() }
181 }
182
183 pub fn skip(self, n: usize) -> Source<T> {
184 Source { inner: self.inner.skip(n).boxed() }
185 }
186
187 pub fn skip_while<F>(self, mut f: F) -> Source<T>
188 where
189 F: FnMut(&T) -> bool + Send + 'static,
190 {
191 Source { inner: self.inner.skip_while(move |v| futures::future::ready(f(v))).boxed() }
192 }
193
194 pub fn scan<Acc, F>(self, init: Acc, mut f: F) -> Source<Acc>
195 where
196 Acc: Clone + Send + 'static,
197 F: FnMut(&Acc, T) -> Acc + Send + 'static,
198 {
199 Source {
200 inner: self
201 .inner
202 .scan(init, move |state, item| {
203 *state = f(state, item);
204 futures::future::ready(Some(state.clone()))
205 })
206 .boxed(),
207 }
208 }
209
210 pub fn grouped(self, n: usize) -> Source<Vec<T>> {
212 Source { inner: self.inner.chunks(n.max(1)).boxed() }
213 }
214
215 pub fn intersperse(self, sep: T) -> Source<T>
216 where
217 T: Clone,
218 {
219 let state = InterspersState { started: false, pending: None, sep, done: false };
220 Source {
221 inner: stream::unfold((self.inner, state), |(mut s, mut st)| async move {
222 if st.done {
223 return None;
224 }
225 if let Some(p) = st.pending.take() {
226 return Some((p, (s, st)));
227 }
228 let next = s.next().await;
229 match next {
230 None => None,
231 Some(v) => {
232 if !st.started {
233 st.started = true;
234 Some((v, (s, st)))
235 } else {
236 st.pending = Some(v);
237 let sep = st.sep.clone();
238 Some((sep, (s, st)))
239 }
240 }
241 }
242 })
243 .boxed(),
244 }
245 }
246
247 pub fn concat(self, other: Source<T>) -> Source<T> {
248 Source { inner: self.inner.chain(other.inner).boxed() }
249 }
250
251 pub fn prepend(self, other: Source<T>) -> Source<T> {
252 Source { inner: other.inner.chain(self.inner).boxed() }
253 }
254
255 pub fn delay(self, d: Duration) -> Source<T> {
257 Source {
258 inner: self
259 .inner
260 .then(move |v| async move {
261 tokio::time::sleep(d).await;
262 v
263 })
264 .boxed(),
265 }
266 }
267
268 pub fn initial_delay(self, d: Duration) -> Source<T> {
270 let inner = self.inner;
271 Source {
272 inner: stream::once(async move {
273 tokio::time::sleep(d).await;
274 inner
275 })
276 .flatten()
277 .boxed(),
278 }
279 }
280
281 pub fn throttle(self, interval: Duration) -> Source<T> {
283 Source {
284 inner: self
285 .inner
286 .then(move |v| async move {
287 tokio::time::sleep(interval).await;
288 v
289 })
290 .boxed(),
291 }
292 }
293
294 pub fn buffer(self, size: usize, strategy: OverflowStrategy) -> Source<T> {
296 crate::overflow::apply(self, size, strategy)
297 }
298
299 pub fn wire_tap<F>(self, mut f: F) -> Source<T>
301 where
302 F: FnMut(&T) + Send + 'static,
303 {
304 Source { inner: self.inner.inspect(move |v| f(v)).boxed() }
305 }
306
307 pub fn via<U>(self, flow: Flow<T, U>) -> Source<U>
308 where
309 U: Send + 'static,
310 {
311 Source { inner: (flow.transform)(self.inner) }
312 }
313
314 pub(crate) fn into_boxed(self) -> BoxStream<'static, T> {
315 self.inner
316 }
317}
318
319struct InterspersState<T> {
320 started: bool,
321 pending: Option<T>,
322 sep: T,
323 done: bool,
324}
325
326#[cfg(test)]
327mod tests {
328 use super::*;
329 use crate::sink::Sink;
330
331 #[tokio::test]
332 async fn map_filter_take() {
333 let out: Vec<i32> =
334 Sink::collect(Source::from_iter(0..100).map(|x| x * 3).filter(|x| x % 2 == 0).take(5)).await;
335 assert_eq!(out, vec![0, 6, 12, 18, 24]);
336 }
337
338 #[tokio::test]
339 async fn scan_emits_running_state() {
340 let out: Vec<i32> =
341 Sink::collect(Source::from_iter(vec![1, 2, 3, 4]).scan(0, |acc, x| acc + x)).await;
342 assert_eq!(out, vec![1, 3, 6, 10]);
343 }
344
345 #[tokio::test]
346 async fn grouped_packs_chunks() {
347 let out: Vec<Vec<i32>> = Sink::collect(Source::from_iter(1..=7).grouped(3)).await;
348 assert_eq!(out, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]]);
349 }
350
351 #[tokio::test]
352 async fn intersperse_inserts_separator() {
353 let out: Vec<i32> = Sink::collect(Source::from_iter(vec![1, 2, 3]).intersperse(0)).await;
354 assert_eq!(out, vec![1, 0, 2, 0, 3]);
355 }
356
357 #[tokio::test]
358 async fn map_async_preserves_order() {
359 let out: Vec<i32> = Sink::collect(Source::from_iter(1..=4).map_async(4, |x| async move {
360 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
361 x * x
362 }))
363 .await;
364 assert_eq!(out, vec![1, 4, 9, 16]);
365 }
366
367 #[tokio::test]
368 async fn concat_and_prepend_join_sources() {
369 let a = Source::from_iter(vec![1, 2]);
370 let b = Source::from_iter(vec![3, 4]);
371 assert_eq!(Sink::collect(a.concat(b)).await, vec![1, 2, 3, 4]);
372
373 let a = Source::from_iter(vec![1, 2]);
374 let b = Source::from_iter(vec![3, 4]);
375 assert_eq!(Sink::collect(a.prepend(b)).await, vec![3, 4, 1, 2]);
376 }
377
378 #[tokio::test]
379 async fn wire_tap_observes_without_consuming() {
380 let seen = std::sync::Arc::new(std::sync::Mutex::new(Vec::<i32>::new()));
381 let seen_c = seen.clone();
382 let out = Sink::collect(
383 Source::from_iter(vec![1, 2, 3]).wire_tap(move |v| seen_c.lock().unwrap().push(*v)),
384 )
385 .await;
386 assert_eq!(out, vec![1, 2, 3]);
387 assert_eq!(seen.lock().unwrap().clone(), vec![1, 2, 3]);
388 }
389}