cobalt_async/
chunker.rs

1use futures::prelude::*;
2
/// The return value of chunker functions used with [`apply_chunker`] and [`try_apply_chunker`].
///
/// Both variants carry the chunk and state that will be passed to the next call of the chunker.
/// [`ChunkResult::Yield`] additionally carries a completed chunk to emit on the output stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChunkResult<Chunk, State> {
    /// Indicates that the driver ([`apply_chunker`] or [`try_apply_chunker`]) should continue to
    /// the next input stream value without yielding a chunk.
    Continue(
        /// The updated chunk value to be passed to the next iteration.
        Option<Chunk>,
        /// The updated state value to be passed to the next iteration.
        State,
    ),
    /// Indicates that the driver ([`apply_chunker`] or [`try_apply_chunker`]) should yield a chunk
    /// and then continue to the next input stream value.
    Yield(
        /// The updated chunk value to be passed to the next iteration.
        Option<Chunk>,
        /// The updated state value to be passed to the next iteration.
        State,
        /// The chunk to be yielded to the output stream.
        Chunk,
    ),
}
22
23/// Converts a [`Stream`] of type `T` to a `Stream` of type `Chunk` by applying the function `chunker`
24/// to each item in the stream in turn.
25///
26/// On each call `chunker` can either return [`ChunkResult::Yield`] to yield a chunk to the output stream,
27/// or [`ChunkResult::Continue`] to continue on to the next item without yielding.
28///
29/// The `chunker` is passed three arguments; the next item in the input stream (`T`), the current chunk (`Option<Chunk>`), and the current state (`State`).
30/// On each call, the chunker can update the current chunk and the current state.
31/// The chunker must return the updated chunk and state values.
32///
33/// Once the input stream has exhausted, if the current chunk is not `None` it will be automatically yielded as the final value in the output stream.
34///
35/// The caller must provide initial values for both the chunk and state.
36///
37/// # Example
38///
39/// ```
40/// use cobalt_async::{apply_chunker, ChunkResult};
41/// use futures::{stream, StreamExt};
42///
43/// /// Takes an input stream of numerical values and returns a stream of vectors of numbers,
44/// /// where the sum of each vector no more than ten.
45/// /// If a value greater than ten is encountered, it is yielded as a vector with a single value.
46/// async fn ten_chunker(
47///     val: u64,
48///     chunk: Option<Vec<u64>>,
49///     state: u64,  // The current running sum
50/// ) -> ChunkResult<Vec<u64>, u64> {
51///     if state + val > 10 {
52///         // Yield the current chunk, and start a new chunk
53///         ChunkResult::Yield(Some(vec![val]), val, chunk.unwrap())
54///     } else {
55///         // Add the value to the current chunk, and update the sum
56///         let mut chunk = chunk.unwrap_or_default();
57///         chunk.push(val);
58///         ChunkResult::Continue(Some(chunk), state + val)
59///     }
60/// }
61///
62/// # tokio_test::block_on(async {
63/// let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 3, 13, 4, 5]);
64/// let groups = apply_chunker(ten_chunker, stream, None, 0).collect::<Vec<_>>().await;
65/// assert_eq!(groups, vec![vec![1, 2, 3, 4], vec![5], vec![6, 3], vec![13], vec![4, 5]]);
66/// # })
67/// ```
68pub fn apply_chunker<T, Chunk, State, F, Fut>(
69    chunker: F,
70    stream: impl Stream<Item = T> + Unpin,
71    initial_chunk: Option<Chunk>,
72    initial_state: State,
73) -> impl Stream<Item = Chunk> + Unpin
74where
75    F: Fn(T, Option<Chunk>, State) -> Fut,
76    Fut: Future<Output = ChunkResult<Chunk, State>>,
77{
78    let complete = false;
79    Box::pin(stream::unfold(
80        (initial_chunk, initial_state, stream, chunker, complete),
81        |(mut current_chunk, mut current_state, mut stream, chunker, complete)| async move {
82            if complete {
83                return None;
84            }
85
86            // Iterate over the stream, and pass each item to the chunking function
87            while let Some(item) = stream.next().await {
88                match chunker(item, current_chunk, current_state).await {
89                    ChunkResult::Continue(chunk, state) => {
90                        current_chunk = chunk;
91                        current_state = state;
92                    }
93                    ChunkResult::Yield(chunk, state, complete_chunk) => {
94                        return Some((complete_chunk, (chunk, state, stream, chunker, false)));
95                    }
96                }
97            }
98
99            // writing this as current_chunk.map(...) makes the logic less obvious
100            #[allow(clippy::manual_map)]
101            // The stream ran out, so check if we have a pending chunk
102            match current_chunk {
103                Some(chunk) => {
104                    // If we do, yield the pending chunk, and set the "complete"
105                    // flag to true, so that we can yield a final None next time around the loop.
106                    Some((chunk, (None, current_state, stream, chunker, true)))
107                }
108                // Otherwise simply return None.
109                None => None,
110            }
111        },
112    ))
113}
114
115/// Converts a [`TryStream`] of type `T` to a `TryStream` of type `Chunk` by applying the function `chunker`
116/// to each item in the stream in turn.
117///
118/// On each call `chunker` can either return [`ChunkResult::Yield`] to yield a chunk to the output stream,
119/// or [`ChunkResult::Continue`] to continue on to the next item without yielding.
120///
121/// The `chunker` is passed three arguments; the next item in the input stream (`T`), the current chunk (`Option<Chunk>`), and the current state (`State`).
122/// On each call, the chunker can update the current chunk and the current state.
123/// The chunker must return the updated chunk and state values.
124///
125/// Once the input stream has exhausted, if the current chunk is not `None` it will be automatically yielded as the final value in the output stream.
126///
127/// In an `Err` is encountered in the input stream, it is passed through to the output stream while maintaining the current chunk and state.
128///
129/// The caller must provide initial values for both the chunk and state.
130///
131/// # Example
132///
133/// ```
134/// use cobalt_async::{try_apply_chunker, ChunkResult};
135/// use futures::{stream, StreamExt, TryStreamExt};
136///
137/// /// Takes an input stream of numerical values and returns a stream of vectors of numbers,
138/// /// where the sum of each vector no more than ten.
139/// /// If a value greater than ten is encountered, it is yielded as a vector with a single value.
140/// async fn ten_chunker(
141///     val: u64,
142///     chunk: Option<Vec<u64>>,
143///     state: u64,  // The current running sum
144/// ) -> ChunkResult<Vec<u64>, u64> {
145///     if state + val > 10 {
146///         // Yield the current chunk, and start a new chunk
147///         ChunkResult::Yield(Some(vec![val]), val, chunk.unwrap())
148///     } else {
149///         // Add the value to the current chunk, and update the sum
150///         let mut chunk = chunk.unwrap_or_default();
151///         chunk.push(val);
152///         ChunkResult::Continue(Some(chunk), state + val)
153///     }
154/// }
155///
156/// # tokio_test::block_on(async {
157///
158/// // A `TryStream` of values with no errors
159/// let stream = stream::iter(vec![Ok::<_,std::io::Error>(1), Ok(2), Ok(3), Ok(4), Ok(5), Ok(6)]);
160/// let groups = try_apply_chunker(ten_chunker, stream, None, 0)
161///     .try_collect::<Vec<_>>()
162///     .await
163///     .unwrap();
164/// assert_eq!(groups, vec![vec![1, 2, 3, 4], vec![5], vec![6]]);
165///
166/// // A `TryStream` of values with interleaved errors
167/// let stream = stream::iter(vec![
168///     Ok(1),
169///     Ok(2),
170///     Err("Error"),
171///     Ok(3),
172///     Ok(4),
173///     Ok(5),
174///     Err("Error"),
175///     Ok(6),
176/// ]);
177/// let groups = try_apply_chunker(ten_chunker, stream, None, 0)
178///     .into_stream()
179///     .collect::<Vec<_>>()
180///     .await;
181///
182/// // The resulting `TryStream` includes the errors
183/// assert!(groups[0].is_err());
184/// assert_eq!(*groups[1].as_ref().unwrap(), vec![1, 2, 3, 4]);
185/// assert!(groups[2].is_err());
186/// assert_eq!(*groups[3].as_ref().unwrap(), vec![5]);
187/// assert_eq!(*groups[4].as_ref().unwrap(), vec![6]);
188/// # })
189/// ```
190pub fn try_apply_chunker<T, E, Chunk, State, F, Fut>(
191    chunker: F,
192    stream: impl TryStream<Ok = T, Error = E> + Unpin,
193    initial_chunk: Option<Chunk>,
194    initial_state: State,
195) -> impl TryStream<Ok = Chunk, Error = E> + Unpin
196where
197    F: Fn(T, Option<Chunk>, State) -> Fut,
198    Fut: Future<Output = ChunkResult<Chunk, State>>,
199{
200    let complete = false;
201    Box::pin(stream::unfold(
202        (initial_chunk, initial_state, stream, chunker, complete),
203        |(mut current_chunk, mut current_state, mut stream, chunker, complete)| async move {
204            if complete {
205                return None;
206            }
207            loop {
208                match stream.try_next().await {
209                    Err(e) => {
210                        // An error in the input stream. Return an error, but maintain the current chunk
211                        return Some((
212                            Err(e),
213                            (current_chunk, current_state, stream, chunker, complete),
214                        ));
215                    }
216                    Ok(None) => {
217                        // writing this as current_chunk.map(...) makes the logic less obvious
218                        #[allow(clippy::manual_map)]
219                        // The stream ran out, so check if we have a pending chunk
220                        match current_chunk {
221                            Some(chunk) => {
222                                // If we do, yield the pending chunk, and set the "complete"
223                                // flag to true, so that we can yield a final None next time around the loop.
224                                return Some((
225                                    Ok(chunk),
226                                    (None, current_state, stream, chunker, true),
227                                ));
228                            }
229                            // Otherwise simply return None.
230                            None => return None,
231                        }
232                    }
233                    Ok(Some(item)) => match chunker(item, current_chunk, current_state).await {
234                        ChunkResult::Continue(chunk, state) => {
235                            current_chunk = chunk;
236                            current_state = state;
237                        }
238                        ChunkResult::Yield(chunk, state, complete_chunk) => {
239                            return Some((
240                                Ok(complete_chunk),
241                                (chunk, state, stream, chunker, false),
242                            ));
243                        }
244                    },
245                }
246            }
247        },
248    ))
249}
250
#[cfg(test)]
mod test_apply_chunker {

    use super::*;
    use futures::stream::empty;

    /// Yields the values it receives
    async fn identity_chunker<T>(item: T, _: Option<T>, _: ()) -> ChunkResult<T, ()> {
        ChunkResult::Yield(None, (), item)
    }

    #[tokio::test]
    async fn test_identity() {
        // An empty stream yields an empty stream
        let x = apply_chunker(identity_chunker, empty::<()>(), None, ())
            .collect::<Vec<_>>()
            .await;
        assert_eq!(x, vec![]);

        // A stream with values yields the same values
        let stream = stream::iter(vec![1, 2, 3, 4]);
        let x = apply_chunker(identity_chunker, stream, None, ())
            .collect::<Vec<_>>()
            .await;
        assert_eq!(x, vec![1, 2, 3, 4]);
    }

    /// Never yields any values
    async fn null_chunker<T>(_: T, _: Option<T>, _: ()) -> ChunkResult<T, ()> {
        ChunkResult::Continue(None, ())
    }

    #[tokio::test]
    async fn test_null() {
        // An empty stream yields an empty stream
        let x = apply_chunker(null_chunker, empty::<()>(), None, ())
            .collect::<Vec<_>>()
            .await;
        assert_eq!(x, vec![]);

        // A stream with values yields an empty stream
        let stream = stream::iter(vec![1, 2, 3, 4]);
        let x = apply_chunker(null_chunker, stream, None, ())
            .collect::<Vec<_>>()
            .await;
        assert_eq!(x, vec![]);
    }

    /// Yields pairs of values. If there are an odd number of values, the un-paired value is dropped.
    async fn pair_chunker<T>(
        item: T,
        _: Option<(T, T)>,
        state: Option<T>,
    ) -> ChunkResult<(T, T), Option<T>> {
        match state {
            Some(first) => ChunkResult::Yield(None, None, (first, item)),
            None => ChunkResult::Continue(None, Some(item)),
        }
    }

    #[tokio::test]
    async fn test_pairs() {
        // An empty stream yields an empty stream
        let x = apply_chunker(pair_chunker, empty::<()>(), None, None)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(x, vec![]);

        // A stream with values yields pairs, but drops the last value
        let stream = stream::iter(vec![1, 2, 3, 4, 5]);
        let x = apply_chunker(pair_chunker, stream, None, None)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(x, vec![(1, 2), (3, 4)]);
    }

    /// Yields vectors of values whose sum is no more than ten, unless a single value is itself
    /// greater than ten, in which case that value is yielded in a vec of its own.
    async fn ten_chunker(
        item: u64,
        chunk: Option<Vec<u64>>,
        state: u64,
    ) -> ChunkResult<Vec<u64>, u64> {
        match chunk {
            // Adding this item would take the sum over ten: emit the chunk and start a new one
            Some(chunk) if state + item > 10 => {
                ChunkResult::Yield(Some(vec![item]), item, chunk)
            }
            // Otherwise extend the current chunk, starting one if there is none yet
            chunk => {
                let mut chunk = chunk.unwrap_or_default();
                chunk.push(item);
                ChunkResult::Continue(Some(chunk), state + item)
            }
        }
    }

    #[tokio::test]
    async fn test_ten_chunker() {
        // An empty stream yields an empty stream
        let x = apply_chunker(ten_chunker, empty::<u64>(), None, 0)
            .collect::<Vec<Vec<u64>>>()
            .await;
        let y: Vec<Vec<_>> = vec![];
        assert_eq!(x, y);

        // A stream with values yields groups which add up to no more than ten,
        // and the pending group is flushed at the end of the stream
        let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 3, 13, 4, 5]);
        let x = apply_chunker(ten_chunker, stream, None, 0)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(
            x,
            vec![vec![1, 2, 3, 4], vec![5], vec![6, 3], vec![13], vec![4, 5]]
        );

        // A value greater than ten at the start of the stream gets a group of its own
        let stream = stream::iter(vec![13, 1, 2]);
        let x = apply_chunker(ten_chunker, stream, None, 0)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(x, vec![vec![13], vec![1, 2]]);
    }

    #[tokio::test]
    async fn test_input_not_polled_after_exhaustion() {
        // An input stream that fails the test if it is polled again after returning `None`
        let mut items = vec![1u64, 2, 3].into_iter();
        let mut finished = false;
        let stream = stream::poll_fn(move |_| {
            assert!(!finished, "input stream polled after it was exhausted");
            let next = items.next();
            finished = next.is_none();
            std::task::Poll::Ready(next)
        });
        let x = apply_chunker(ten_chunker, stream, None, 0)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(x, vec![vec![1, 2, 3]]);
    }
}
363
#[cfg(test)]
mod test_try_apply_chunker {

    use super::*;
    use anyhow::Result;
    use futures::stream::empty;

    /// Yields the values it receives
    async fn identity_chunker<T>(item: T, _: Option<T>, _: ()) -> ChunkResult<T, ()> {
        ChunkResult::Yield(None, (), item)
    }

    #[tokio::test]
    async fn test_identity() -> Result<()> {
        // An empty stream yields an empty stream
        let x = try_apply_chunker(identity_chunker, empty::<Result<()>>(), None, ())
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![]);

        // A stream with values yields the same values
        let stream = stream::iter(vec![Ok::<_, anyhow::Error>(1), Ok(2), Ok(3), Ok(4)]);
        let x = try_apply_chunker(identity_chunker, stream, None, ())
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![1, 2, 3, 4]);

        // A stream with values and errors yields the same values and errors
        let stream = stream::iter(vec![
            Ok(1),
            Err(anyhow::anyhow!("ERR")),
            Ok(2),
            Ok(3),
            Ok(4),
            Err(anyhow::anyhow!("ERR")),
        ]);
        let x = try_apply_chunker(identity_chunker, stream, None, ())
            .into_stream()
            .collect::<Vec<_>>()
            .await;
        assert_eq!(x.len(), 6);
        assert_eq!(*x[0].as_ref().unwrap(), 1);
        assert!(x[1].is_err());
        assert_eq!(*x[2].as_ref().unwrap(), 2);
        assert_eq!(*x[3].as_ref().unwrap(), 3);
        assert_eq!(*x[4].as_ref().unwrap(), 4);
        assert!(x[5].is_err());

        Ok(())
    }

    /// Never yields any values
    async fn null_chunker<T>(_: T, _: Option<T>, _: ()) -> ChunkResult<T, ()> {
        ChunkResult::Continue(None, ())
    }

    #[tokio::test]
    async fn test_null() -> Result<()> {
        // An empty stream yields an empty stream
        let x = try_apply_chunker(null_chunker, empty::<Result<()>>(), None, ())
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![]);

        // A stream with values yields an empty stream
        let stream = stream::iter(vec![Ok::<_, anyhow::Error>(1), Ok(2), Ok(3), Ok(4)]);
        let x = try_apply_chunker(null_chunker, stream, None, ())
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![]);

        // A stream with values and errors yields just the errors
        let stream = stream::iter(vec![
            Ok(1),
            Err(anyhow::anyhow!("ERR")),
            Ok(2),
            Ok(3),
            Ok(4),
            Err(anyhow::anyhow!("ERR")),
        ]);
        let x = try_apply_chunker(null_chunker, stream, None, ())
            .into_stream()
            .collect::<Vec<_>>()
            .await;
        assert_eq!(x.len(), 2);
        assert!(x[0].is_err());
        assert!(x[1].is_err());

        Ok(())
    }

    /// Yields pairs of values. If there are an odd number of values, the un-paired value is dropped.
    async fn pair_chunker<T>(
        item: T,
        _: Option<(T, T)>,
        state: Option<T>,
    ) -> ChunkResult<(T, T), Option<T>> {
        match state {
            Some(first) => ChunkResult::Yield(None, None, (first, item)),
            None => ChunkResult::Continue(None, Some(item)),
        }
    }

    #[tokio::test]
    async fn test_pairs() -> Result<()> {
        // An empty stream yields an empty stream
        let x = try_apply_chunker(pair_chunker, empty::<Result<()>>(), None, None)
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![]);

        // A stream with values yields pairs, but drops the last value
        let stream = stream::iter(vec![Ok::<_, anyhow::Error>(1), Ok(2), Ok(3), Ok(4), Ok(5)]);
        let x = try_apply_chunker(pair_chunker, stream, None, None)
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![(1, 2), (3, 4)]);

        // A stream with values and errors yields pairs, and interleaves the errors
        let stream = stream::iter(vec![
            Ok(1),
            Err(anyhow::anyhow!("ERR")),
            Ok(2),
            Ok(3),
            Ok(4),
            Err(anyhow::anyhow!("ERR")),
        ]);
        let x = try_apply_chunker(pair_chunker, stream, None, None)
            .into_stream()
            .collect::<Vec<_>>()
            .await;
        assert_eq!(x.len(), 4);
        assert!(x[0].is_err());
        assert_eq!(*x[1].as_ref().unwrap(), (1, 2));
        assert_eq!(*x[2].as_ref().unwrap(), (3, 4));
        assert!(x[3].is_err());

        Ok(())
    }

    /// Yields vectors of values whose sum is no more than ten, unless a single value is itself
    /// greater than ten, in which case that value is yielded in a vec of its own.
    async fn ten_chunker(
        item: u64,
        chunk: Option<Vec<u64>>,
        state: u64,
    ) -> ChunkResult<Vec<u64>, u64> {
        match chunk {
            // Adding this item would take the sum over ten: emit the chunk and start a new one
            Some(chunk) if state + item > 10 => {
                ChunkResult::Yield(Some(vec![item]), item, chunk)
            }
            // Otherwise extend the current chunk, starting one if there is none yet
            chunk => {
                let mut chunk = chunk.unwrap_or_default();
                chunk.push(item);
                ChunkResult::Continue(Some(chunk), state + item)
            }
        }
    }

    #[tokio::test]
    async fn test_ten_chunker() -> Result<()> {
        // An empty stream yields an empty stream
        let x = try_apply_chunker(ten_chunker, empty::<Result<u64>>(), None, 0)
            .try_collect::<Vec<Vec<u64>>>()
            .await?;
        let y: Vec<Vec<_>> = vec![];
        assert_eq!(x, y);

        // A stream with values yields groups which add up to no more than ten
        let stream = stream::iter(vec![
            Ok::<_, anyhow::Error>(1),
            Ok(2),
            Ok(3),
            Ok(4),
            Ok(5),
            Ok(6),
            Ok(3),
            Ok(13),
            Ok(4),
            Ok(5),
        ]);
        let x = try_apply_chunker(ten_chunker, stream, None, 0)
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(
            x,
            vec![vec![1, 2, 3, 4], vec![5], vec![6, 3], vec![13], vec![4, 5]]
        );

        // A value greater than ten at the start of the stream gets a group of its own
        let stream = stream::iter(vec![Ok::<_, anyhow::Error>(13), Ok(1), Ok(2)]);
        let x = try_apply_chunker(ten_chunker, stream, None, 0)
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![vec![13], vec![1, 2]]);

        // A stream with values and errors yields groups which add up to no more than ten,
        // with interleaved errors
        let stream = stream::iter(vec![
            Ok::<_, anyhow::Error>(1),
            Ok(2),
            Err(anyhow::anyhow!("ERR")),
            Ok(3),
            Ok(4),
            Ok(5),
            Ok(6),
            Err(anyhow::anyhow!("ERR")),
            Ok(3),
            Ok(13),
            Ok(4),
            Ok(5),
        ]);
        let x = try_apply_chunker(ten_chunker, stream, None, 0)
            .into_stream()
            .collect::<Vec<_>>()
            .await;
        assert_eq!(x.len(), 7);
        assert!(x[0].is_err());
        assert_eq!(*x[1].as_ref().unwrap(), vec![1, 2, 3, 4]);
        assert_eq!(*x[2].as_ref().unwrap(), vec![5]);
        assert!(x[3].is_err());
        assert_eq!(*x[4].as_ref().unwrap(), vec![6, 3]);
        assert_eq!(*x[5].as_ref().unwrap(), vec![13]);
        assert_eq!(*x[6].as_ref().unwrap(), vec![4, 5]);

        Ok(())
    }

    #[tokio::test]
    async fn test_input_not_polled_after_exhaustion() -> Result<()> {
        // An input stream that fails the test if it is polled again after returning `None`
        let mut items = vec![Ok::<u64, anyhow::Error>(1), Ok(2), Ok(3)].into_iter();
        let mut finished = false;
        let stream = stream::poll_fn(move |_| {
            assert!(!finished, "input stream polled after it was exhausted");
            let next = items.next();
            finished = next.is_none();
            std::task::Poll::Ready(next)
        });
        let x = try_apply_chunker(ten_chunker, stream, None, 0)
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![vec![1, 2, 3]]);

        Ok(())
    }
}