pub fn apply_chunker<T, Chunk, State, F, Fut>(
chunker: F,
stream: impl Stream<Item = T> + Unpin,
initial_chunk: Option<Chunk>,
initial_state: State,
) -> impl Stream<Item = Chunk> + UnpinExpand description
Converts a Stream of type T to a Stream of type Chunk by applying the function chunker
to each item in the stream in turn.
On each call chunker can either return ChunkResult::Yield to yield a chunk to the output stream,
or ChunkResult::Continue to continue on to the next item without yielding.
The chunker is passed three arguments; the next item in the input stream (T), the current chunk (Option<Chunk>), and the current state (State).
On each call, the chunker can update the current chunk and the current state.
The chunker must return the updated chunk and state values.
Once the input stream has exhausted, if the current chunk is not None it will be automatically yielded as the final value in the output stream.
The caller must provide initial values for both the chunk and state.
ยงExample
use cobalt_async::{apply_chunker, ChunkResult};
use futures::{stream, StreamExt};
/// Takes an input stream of numerical values and returns a stream of vectors of numbers,
/// where the sum of each vector no more than ten.
/// If a value greater than ten is encountered, it is yielded as a vector with a single value.
async fn ten_chunker(
val: u64,
chunk: Option<Vec<u64>>,
state: u64, // The current running sum
) -> ChunkResult<Vec<u64>, u64> {
if state + val > 10 {
// Yield the current chunk, and start a new chunk
ChunkResult::Yield(Some(vec![val]), val, chunk.unwrap())
} else {
// Add the value to the current chunk, and update the sum
let mut chunk = chunk.unwrap_or_default();
chunk.push(val);
ChunkResult::Continue(Some(chunk), state + val)
}
}
let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 3, 13, 4, 5]);
let groups = apply_chunker(ten_chunker, stream, None, 0).collect::<Vec<_>>().await;
assert_eq!(groups, vec![vec![1, 2, 3, 4], vec![5], vec![6, 3], vec![13], vec![4, 5]]);