cobalt_async/chunker.rs
1use futures::prelude::*;
2
/// The return value of chunker functions used with [`apply_chunker`] and [`try_apply_chunker`].
///
/// A chunker returns one of these on every call to say whether a finished chunk should be
/// emitted on the output stream, and to hand back the chunk-in-progress and state that will be
/// passed to the next call.
///
/// `Debug`, `Clone`, `PartialEq` and `Eq` are derived (each available when both `Chunk` and
/// `State` implement the same trait) so that results can be logged and compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChunkResult<Chunk, State> {
    /// Indicates that `apply_chunker` should continue to the next input stream value without
    /// yielding a chunk.
    Continue(
        /// The updated chunk value to be passed to the next iteration.
        Option<Chunk>,
        /// The updated state value to be passed to the next iteration.
        State,
    ),
    /// Indicates that `apply_chunker` should yield a chunk and then continue to the next input
    /// stream value.
    Yield(
        /// The updated chunk value to be passed to the next iteration.
        Option<Chunk>,
        /// The updated state value to be passed to the next iteration.
        State,
        /// The chunk to be yielded to the output stream.
        Chunk,
    ),
}
22
23/// Converts a [`Stream`] of type `T` to a `Stream` of type `Chunk` by applying the function `chunker`
24/// to each item in the stream in turn.
25///
26/// On each call `chunker` can either return [`ChunkResult::Yield`] to yield a chunk to the output stream,
27/// or [`ChunkResult::Continue`] to continue on to the next item without yielding.
28///
29/// The `chunker` is passed three arguments; the next item in the input stream (`T`), the current chunk (`Option<Chunk>`), and the current state (`State`).
30/// On each call, the chunker can update the current chunk and the current state.
31/// The chunker must return the updated chunk and state values.
32///
33/// Once the input stream has exhausted, if the current chunk is not `None` it will be automatically yielded as the final value in the output stream.
34///
35/// The caller must provide initial values for both the chunk and state.
36///
37/// # Example
38///
39/// ```
40/// use cobalt_async::{apply_chunker, ChunkResult};
41/// use futures::{stream, StreamExt};
42///
43/// /// Takes an input stream of numerical values and returns a stream of vectors of numbers,
44/// /// where the sum of each vector no more than ten.
45/// /// If a value greater than ten is encountered, it is yielded as a vector with a single value.
46/// async fn ten_chunker(
47/// val: u64,
48/// chunk: Option<Vec<u64>>,
49/// state: u64, // The current running sum
50/// ) -> ChunkResult<Vec<u64>, u64> {
51/// if state + val > 10 {
52/// // Yield the current chunk, and start a new chunk
53/// ChunkResult::Yield(Some(vec![val]), val, chunk.unwrap())
54/// } else {
55/// // Add the value to the current chunk, and update the sum
56/// let mut chunk = chunk.unwrap_or_default();
57/// chunk.push(val);
58/// ChunkResult::Continue(Some(chunk), state + val)
59/// }
60/// }
61///
62/// # tokio_test::block_on(async {
63/// let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 3, 13, 4, 5]);
64/// let groups = apply_chunker(ten_chunker, stream, None, 0).collect::<Vec<_>>().await;
65/// assert_eq!(groups, vec![vec![1, 2, 3, 4], vec![5], vec![6, 3], vec![13], vec![4, 5]]);
66/// # })
67/// ```
68pub fn apply_chunker<T, Chunk, State, F, Fut>(
69 chunker: F,
70 stream: impl Stream<Item = T> + Unpin,
71 initial_chunk: Option<Chunk>,
72 initial_state: State,
73) -> impl Stream<Item = Chunk> + Unpin
74where
75 F: Fn(T, Option<Chunk>, State) -> Fut,
76 Fut: Future<Output = ChunkResult<Chunk, State>>,
77{
78 let complete = false;
79 Box::pin(stream::unfold(
80 (initial_chunk, initial_state, stream, chunker, complete),
81 |(mut current_chunk, mut current_state, mut stream, chunker, complete)| async move {
82 if complete {
83 return None;
84 }
85
86 // Iterate over the stream, and pass each item to the chunking function
87 while let Some(item) = stream.next().await {
88 match chunker(item, current_chunk, current_state).await {
89 ChunkResult::Continue(chunk, state) => {
90 current_chunk = chunk;
91 current_state = state;
92 }
93 ChunkResult::Yield(chunk, state, complete_chunk) => {
94 return Some((complete_chunk, (chunk, state, stream, chunker, false)));
95 }
96 }
97 }
98
99 // writing this as current_chunk.map(...) makes the logic less obvious
100 #[allow(clippy::manual_map)]
101 // The stream ran out, so check if we have a pending chunk
102 match current_chunk {
103 Some(chunk) => {
104 // If we do, yield the pending chunk, and set the "complete"
105 // flag to true, so that we can yield a final None next time around the loop.
106 Some((chunk, (None, current_state, stream, chunker, true)))
107 }
108 // Otherwise simply return None.
109 None => None,
110 }
111 },
112 ))
113}
114
115/// Converts a [`TryStream`] of type `T` to a `TryStream` of type `Chunk` by applying the function `chunker`
116/// to each item in the stream in turn.
117///
118/// On each call `chunker` can either return [`ChunkResult::Yield`] to yield a chunk to the output stream,
119/// or [`ChunkResult::Continue`] to continue on to the next item without yielding.
120///
121/// The `chunker` is passed three arguments; the next item in the input stream (`T`), the current chunk (`Option<Chunk>`), and the current state (`State`).
122/// On each call, the chunker can update the current chunk and the current state.
123/// The chunker must return the updated chunk and state values.
124///
125/// Once the input stream has exhausted, if the current chunk is not `None` it will be automatically yielded as the final value in the output stream.
126///
127/// In an `Err` is encountered in the input stream, it is passed through to the output stream while maintaining the current chunk and state.
128///
129/// The caller must provide initial values for both the chunk and state.
130///
131/// # Example
132///
133/// ```
134/// use cobalt_async::{try_apply_chunker, ChunkResult};
135/// use futures::{stream, StreamExt, TryStreamExt};
136///
137/// /// Takes an input stream of numerical values and returns a stream of vectors of numbers,
138/// /// where the sum of each vector no more than ten.
139/// /// If a value greater than ten is encountered, it is yielded as a vector with a single value.
140/// async fn ten_chunker(
141/// val: u64,
142/// chunk: Option<Vec<u64>>,
143/// state: u64, // The current running sum
144/// ) -> ChunkResult<Vec<u64>, u64> {
145/// if state + val > 10 {
146/// // Yield the current chunk, and start a new chunk
147/// ChunkResult::Yield(Some(vec![val]), val, chunk.unwrap())
148/// } else {
149/// // Add the value to the current chunk, and update the sum
150/// let mut chunk = chunk.unwrap_or_default();
151/// chunk.push(val);
152/// ChunkResult::Continue(Some(chunk), state + val)
153/// }
154/// }
155///
156/// # tokio_test::block_on(async {
157///
158/// // A `TryStream` of values with no errors
159/// let stream = stream::iter(vec![Ok::<_,std::io::Error>(1), Ok(2), Ok(3), Ok(4), Ok(5), Ok(6)]);
160/// let groups = try_apply_chunker(ten_chunker, stream, None, 0)
161/// .try_collect::<Vec<_>>()
162/// .await
163/// .unwrap();
164/// assert_eq!(groups, vec![vec![1, 2, 3, 4], vec![5], vec![6]]);
165///
166/// // A `TryStream` of values with interleaved errors
167/// let stream = stream::iter(vec![
168/// Ok(1),
169/// Ok(2),
170/// Err("Error"),
171/// Ok(3),
172/// Ok(4),
173/// Ok(5),
174/// Err("Error"),
175/// Ok(6),
176/// ]);
177/// let groups = try_apply_chunker(ten_chunker, stream, None, 0)
178/// .into_stream()
179/// .collect::<Vec<_>>()
180/// .await;
181///
182/// // The resulting `TryStream` includes the errors
183/// assert!(groups[0].is_err());
184/// assert_eq!(*groups[1].as_ref().unwrap(), vec![1, 2, 3, 4]);
185/// assert!(groups[2].is_err());
186/// assert_eq!(*groups[3].as_ref().unwrap(), vec![5]);
187/// assert_eq!(*groups[4].as_ref().unwrap(), vec![6]);
188/// # })
189/// ```
190pub fn try_apply_chunker<T, E, Chunk, State, F, Fut>(
191 chunker: F,
192 stream: impl TryStream<Ok = T, Error = E> + Unpin,
193 initial_chunk: Option<Chunk>,
194 initial_state: State,
195) -> impl TryStream<Ok = Chunk, Error = E> + Unpin
196where
197 F: Fn(T, Option<Chunk>, State) -> Fut,
198 Fut: Future<Output = ChunkResult<Chunk, State>>,
199{
200 let complete = false;
201 Box::pin(stream::unfold(
202 (initial_chunk, initial_state, stream, chunker, complete),
203 |(mut current_chunk, mut current_state, mut stream, chunker, complete)| async move {
204 if complete {
205 return None;
206 }
207 loop {
208 match stream.try_next().await {
209 Err(e) => {
210 // An error in the input stream. Return an error, but maintain the current chunk
211 return Some((
212 Err(e),
213 (current_chunk, current_state, stream, chunker, complete),
214 ));
215 }
216 Ok(None) => {
217 // writing this as current_chunk.map(...) makes the logic less obvious
218 #[allow(clippy::manual_map)]
219 // The stream ran out, so check if we have a pending chunk
220 match current_chunk {
221 Some(chunk) => {
222 // If we do, yield the pending chunk, and set the "complete"
223 // flag to true, so that we can yield a final None next time around the loop.
224 return Some((
225 Ok(chunk),
226 (None, current_state, stream, chunker, true),
227 ));
228 }
229 // Otherwise simply return None.
230 None => return None,
231 }
232 }
233 Ok(Some(item)) => match chunker(item, current_chunk, current_state).await {
234 ChunkResult::Continue(chunk, state) => {
235 current_chunk = chunk;
236 current_state = state;
237 }
238 ChunkResult::Yield(chunk, state, complete_chunk) => {
239 return Some((
240 Ok(complete_chunk),
241 (chunk, state, stream, chunker, false),
242 ));
243 }
244 },
245 }
246 }
247 },
248 ))
249}
250
#[cfg(test)]
mod test_apply_chunker {

    use super::*;
    use futures::stream::empty;

    /// Emits every item it receives as its own chunk.
    async fn identity_chunker<T>(item: T, _: Option<T>, _: ()) -> ChunkResult<T, ()> {
        ChunkResult::Yield(None, (), item)
    }

    #[tokio::test]
    async fn test_identity() {
        // No input items means no output chunks.
        let from_empty: Vec<()> = apply_chunker(identity_chunker, empty::<()>(), None, ())
            .collect()
            .await;
        assert!(from_empty.is_empty());

        // Every input value comes back out unchanged and in order.
        let input = stream::iter(vec![1, 2, 3, 4]);
        let passed_through: Vec<_> = apply_chunker(identity_chunker, input, None, ())
            .collect()
            .await;
        assert_eq!(passed_through, vec![1, 2, 3, 4]);
    }

    /// Consumes every item without ever emitting a chunk.
    async fn null_chunker<T>(_: T, _: Option<T>, _: ()) -> ChunkResult<T, ()> {
        ChunkResult::Continue(None, ())
    }

    #[tokio::test]
    async fn test_null() {
        // No input items means no output chunks.
        let from_empty: Vec<()> = apply_chunker(null_chunker, empty::<()>(), None, ())
            .collect()
            .await;
        assert!(from_empty.is_empty());

        // Input values are all swallowed, so the output is still empty.
        let input = stream::iter(vec![1, 2, 3, 4]);
        let from_values: Vec<_> = apply_chunker(null_chunker, input, None, ())
            .collect()
            .await;
        assert!(from_values.is_empty());
    }

    /// Emits consecutive items as pairs. The first item of a pair is held in the state, so with
    /// an odd number of items the last, un-paired one is dropped.
    async fn pair_chunker<T>(
        item: T,
        _: Option<(T, T)>,
        state: Option<T>,
    ) -> ChunkResult<(T, T), Option<T>> {
        if let Some(first) = state {
            ChunkResult::Yield(None, None, (first, item))
        } else {
            ChunkResult::Continue(None, Some(item))
        }
    }

    #[tokio::test]
    async fn test_pairs() {
        // No input items means no output pairs.
        let from_empty: Vec<((), ())> = apply_chunker(pair_chunker, empty::<()>(), None, None)
            .collect()
            .await;
        assert!(from_empty.is_empty());

        // Five values give two pairs; the fifth value has no partner and is dropped.
        let input = stream::iter(vec![1, 2, 3, 4, 5]);
        let pairs: Vec<_> = apply_chunker(pair_chunker, input, None, None)
            .collect()
            .await;
        assert_eq!(pairs, vec![(1, 2), (3, 4)]);
    }

    /// Groups values into vectors whose sum is at most ten, using the state as the running sum.
    /// A value that would push the sum past ten closes the current vector and starts a new one,
    /// so a value greater than ten ends up in a vector of its own.
    async fn ten_chunker(
        item: u64,
        chunk: Option<Vec<u64>>,
        state: u64,
    ) -> ChunkResult<Vec<u64>, u64> {
        let running_sum = state + item;
        if running_sum <= 10 {
            let mut grown = chunk.unwrap_or_default();
            grown.push(item);
            ChunkResult::Continue(Some(grown), running_sum)
        } else {
            ChunkResult::Yield(Some(vec![item]), item, chunk.unwrap())
        }
    }

    #[tokio::test]
    async fn test_ten_chunker() {
        // No input items means no output groups.
        let from_empty: Vec<Vec<u64>> = apply_chunker(ten_chunker, empty::<u64>(), None, 0)
            .collect()
            .await;
        assert!(from_empty.is_empty());

        // Values are grouped so that no group sums to more than ten; the trailing group is
        // flushed when the input ends.
        let input = stream::iter(vec![1, 2, 3, 4, 5, 6, 3, 13, 4, 5]);
        let groups: Vec<_> = apply_chunker(ten_chunker, input, None, 0)
            .collect()
            .await;
        let expected = vec![vec![1, 2, 3, 4], vec![5], vec![6, 3], vec![13], vec![4, 5]];
        assert_eq!(groups, expected);
    }
}
363
#[cfg(test)]
mod test_try_apply_chunker {

    use super::*;
    use anyhow::Result;
    use futures::stream::empty;

    /// Yields the values it receives
    async fn identity_chunker<T>(item: T, _: Option<T>, _: ()) -> ChunkResult<T, ()> {
        ChunkResult::Yield(None, (), item)
    }

    #[tokio::test]
    async fn test_identity() -> Result<()> {
        // An empty stream yields an empty stream
        let x = try_apply_chunker(identity_chunker, empty::<Result<()>>(), None, ())
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![]);

        // A stream with values yields the same values
        let stream = stream::iter(vec![Ok::<_, anyhow::Error>(1), Ok(2), Ok(3), Ok(4)]);
        let x = try_apply_chunker(identity_chunker, stream, None, ())
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![1, 2, 3, 4]);

        // A stream with values and errors yields the same values and errors
        let stream = stream::iter(vec![
            Ok(1),
            Err(anyhow::anyhow!("ERR")),
            Ok(2),
            Ok(3),
            Ok(4),
            Err(anyhow::anyhow!("ERR")),
        ]);
        let x = try_apply_chunker(identity_chunker, stream, None, ())
            .into_stream()
            .collect::<Vec<_>>()
            .await;
        // Pin the total count first, so that extra or missing items cannot slip past the
        // per-index checks below.
        assert_eq!(x.len(), 6);
        assert_eq!(*x[0].as_ref().unwrap(), 1);
        assert!(x[1].is_err());
        assert_eq!(*x[2].as_ref().unwrap(), 2);
        assert_eq!(*x[3].as_ref().unwrap(), 3);
        assert_eq!(*x[4].as_ref().unwrap(), 4);
        assert!(x[5].is_err());

        Ok(())
    }

    /// Never yields any values
    async fn null_chunker<T>(_: T, _: Option<T>, _: ()) -> ChunkResult<T, ()> {
        ChunkResult::Continue(None, ())
    }

    #[tokio::test]
    async fn test_null() -> Result<()> {
        // An empty stream yields an empty stream
        let x = try_apply_chunker(null_chunker, empty::<Result<()>>(), None, ())
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![]);

        // A stream with values yields an empty stream
        let stream = stream::iter(vec![Ok::<_, anyhow::Error>(1), Ok(2), Ok(3), Ok(4)]);
        let x = try_apply_chunker(null_chunker, stream, None, ())
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![]);

        // A stream with values and errors yields just the errors
        let stream = stream::iter(vec![
            Ok(1),
            Err(anyhow::anyhow!("ERR")),
            Ok(2),
            Ok(3),
            Ok(4),
            Err(anyhow::anyhow!("ERR")),
        ]);
        let x = try_apply_chunker(null_chunker, stream, None, ())
            .into_stream()
            .collect::<Vec<_>>()
            .await;
        // Exactly the two input errors must come through, and nothing else.
        assert_eq!(x.len(), 2);
        assert!(x[0].is_err());
        assert!(x[1].is_err());

        Ok(())
    }

    /// Yields pairs of values. If there are an odd number of values, the un-paired value is dropped.
    async fn pair_chunker<T>(
        item: T,
        _: Option<(T, T)>,
        state: Option<T>,
    ) -> ChunkResult<(T, T), Option<T>> {
        match state {
            Some(first) => ChunkResult::Yield(None, None, (first, item)),
            None => ChunkResult::Continue(None, Some(item)),
        }
    }

    #[tokio::test]
    async fn test_pairs() -> Result<()> {
        // An empty stream yields an empty stream
        let x = try_apply_chunker(pair_chunker, empty::<Result<()>>(), None, None)
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![]);

        // A stream with values yields pairs, but drops the last value
        let stream = stream::iter(vec![Ok::<_, anyhow::Error>(1), Ok(2), Ok(3), Ok(4), Ok(5)]);
        let x = try_apply_chunker(pair_chunker, stream, None, None)
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(x, vec![(1, 2), (3, 4)]);

        // A stream with values and errors yields pairs, and interleaves the errors.
        // The first error arrives while `1` is still waiting in the state for its partner, so
        // the error is emitted before the pair `(1, 2)`.
        let stream = stream::iter(vec![
            Ok(1),
            Err(anyhow::anyhow!("ERR")),
            Ok(2),
            Ok(3),
            Ok(4),
            Err(anyhow::anyhow!("ERR")),
        ]);
        let x = try_apply_chunker(pair_chunker, stream, None, None)
            .into_stream()
            .collect::<Vec<_>>()
            .await;
        // Two errors and two pairs, and nothing else.
        assert_eq!(x.len(), 4);
        assert!(x[0].is_err());
        assert_eq!(*x[1].as_ref().unwrap(), (1, 2));
        assert_eq!(*x[2].as_ref().unwrap(), (3, 4));
        assert!(x[3].is_err());

        Ok(())
    }

    /// Yields vectors of values whose sum is at most ten. A value that would push the running
    /// sum (held in the state) past ten closes the current vector and starts a new one, so a
    /// value greater than ten ends up in a vec with just that value.
    async fn ten_chunker(
        item: u64,
        chunk: Option<Vec<u64>>,
        state: u64,
    ) -> ChunkResult<Vec<u64>, u64> {
        if state + item > 10 {
            ChunkResult::Yield(Some(vec![item]), item, chunk.unwrap())
        } else {
            let mut chunk = chunk.unwrap_or_default();
            chunk.push(item);
            ChunkResult::Continue(Some(chunk), state + item)
        }
    }

    #[tokio::test]
    async fn test_ten_chunker() -> Result<()> {
        // An empty stream yields an empty stream
        let x = try_apply_chunker(ten_chunker, empty::<Result<u64>>(), None, 0)
            .try_collect::<Vec<Vec<u64>>>()
            .await?;
        let y: Vec<Vec<_>> = vec![];
        assert_eq!(x, y);

        // A stream with values yields groups which add up to at most ten
        let stream = stream::iter(vec![
            Ok::<_, anyhow::Error>(1),
            Ok(2),
            Ok(3),
            Ok(4),
            Ok(5),
            Ok(6),
            Ok(3),
            Ok(13),
            Ok(4),
            Ok(5),
        ]);
        let x = try_apply_chunker(ten_chunker, stream, None, 0)
            .try_collect::<Vec<_>>()
            .await?;
        assert_eq!(
            x,
            vec![vec![1, 2, 3, 4], vec![5], vec![6, 3], vec![13], vec![4, 5],]
        );

        // A stream with values and errors yields groups which add up to at most ten, with
        // interleaved errors. Errors are emitted as soon as they are read, so they can appear
        // before the group that was being built when they arrived.
        let stream = stream::iter(vec![
            Ok::<_, anyhow::Error>(1),
            Ok(2),
            Err(anyhow::anyhow!("ERR")),
            Ok(3),
            Ok(4),
            Ok(5),
            Ok(6),
            Err(anyhow::anyhow!("ERR")),
            Ok(3),
            Ok(13),
            Ok(4),
            Ok(5),
        ]);
        let x = try_apply_chunker(ten_chunker, stream, None, 0)
            .into_stream()
            .collect::<Vec<_>>()
            .await;
        // Two errors and five groups (including the trailing group flushed at end of input).
        assert_eq!(x.len(), 7);
        assert!(x[0].is_err());
        assert_eq!(*x[1].as_ref().unwrap(), vec![1, 2, 3, 4]);
        assert_eq!(*x[2].as_ref().unwrap(), vec![5]);
        assert!(x[3].is_err());
        assert_eq!(*x[4].as_ref().unwrap(), vec![6, 3]);
        assert_eq!(*x[5].as_ref().unwrap(), vec![13]);
        assert_eq!(*x[6].as_ref().unwrap(), vec![4, 5]);

        Ok(())
    }
}