cipherstash-client 0.34.1-alpha.1

The official CipherStash SDK
Documentation
use futures::StreamExt;
use std::future::Future;

/**
 * Chunk an input slice and run an async callback on each of the chunks.
 * The futures generated by that callback are run concurrently with their results returned in a
 * vector.
 */
pub async fn map_async_chunked<
    'a,
    T: Send + Sync,
    U,
    E,
    F: Future<Output = Result<Vec<U>, E>>,
    C: Send + Sync + FnMut(&'a [T]) -> F,
>(
    input: &'a [T],
    callback: C,
    chunk_size: usize,
    concurrent_futs: usize,
) -> Result<Vec<U>, E> {
    let mut output = Vec::with_capacity(input.len());

    let mut stream = futures::stream::iter(input.chunks(chunk_size).map(callback))
        .boxed()
        .buffered(concurrent_futs);

    while let Some(result) = stream.next().await {
        output.append(&mut result?);
    }

    Ok(output)
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    #[tokio::test]
    async fn test_keeps_the_same_order() {
        let input = vec![200, 100, 50, 25, 20, 15, 10, 5, 4, 3, 2, 1];

        let output = map_async_chunked(
            &input,
            |x| async {
                tokio::time::sleep(Duration::from_millis(input[0])).await;
                Result::<_, ()>::Ok(x.to_vec())
            },
            2,
            10,
        )
        .await
        .unwrap();

        assert_eq!(input, output);
    }

    #[tokio::test]
    async fn test_works_when_chunks_dont_divide_nicely() {
        let input = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

        let output = map_async_chunked(&input, |x| async { Result::<_, ()>::Ok(x.to_vec()) }, 3, 1)
            .await
            .unwrap();

        assert_eq!(input, output);
    }
}