//! Integration tests for `batch-aint-one` v0.12.0 — "I got 99 problems, but
//! a batch ain't one."
//!
//! See the crate documentation for details of the batching policies under test.
use std::{
    collections::HashMap,
    fmt::Debug,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::Duration,
};

use assert_matches::assert_matches;
use batch_aint_one::{BatchError, Batcher, BatchingPolicy, Limits, Processor};
use futures::future::join_all;
use rstest::rstest;
use tokio::sync::Mutex;

mod locking;
mod variable_timing;

/// A test [`Processor`] that simulates acquiring an external resource
/// (taking `acquisition_dur`) before processing a batch (taking
/// `processing_dur`).
///
/// It counts resource acquisitions so outputs can be matched to a specific
/// acquisition, and records the size of every processed batch per key so
/// tests can assert how items were grouped. Both records are behind `Arc`,
/// so clones of the processor share them.
#[derive(Debug, Clone)]
pub struct ResourceAcquiringProcessor {
    // When true, `acquire_resources` always returns an error.
    fail: bool,

    // Simulated time taken to acquire resources for a batch.
    acquisition_dur: Duration,
    // Simulated time taken to process a batch once resources are held.
    processing_dur: Duration,

    // Monotonic count of resource-acquisition attempts (shared across clones).
    resource_count: Arc<AtomicUsize>,
    // Per-key record of the size of each processed batch (shared across clones).
    batches: Arc<Mutex<HashMap<String, Vec<usize>>>>,
}

impl ResourceAcquiringProcessor {
    /// Builds a processor that pretends to take `acquisition_dur` to acquire
    /// resources and `processing_dur` to process each batch. Acquisition
    /// succeeds by default; see [`Self::with_failure`].
    fn new(acquisition_dur: Duration, processing_dur: Duration) -> Self {
        let resource_count = Arc::new(AtomicUsize::new(0));
        let batches = Arc::new(Mutex::new(HashMap::new()));
        Self {
            fail: false,
            acquisition_dur,
            processing_dur,
            resource_count,
            batches,
        }
    }

    /// Returns this processor reconfigured so that every resource-acquisition
    /// attempt fails.
    fn with_failure(self) -> Self {
        Self { fail: true, ..self }
    }
}

impl Processor for ResourceAcquiringProcessor {
    type Key = String;
    type Input = String;
    type Output = String;
    type Error = String;
    type Resources = String;

    /// Simulates acquiring a resource for `key`: sleeps for the configured
    /// acquisition duration, bumps the shared acquisition counter, and yields
    /// a resource label of the form `{key}_{count}` — wrapped in an error
    /// message instead when failure mode is enabled.
    async fn acquire_resources(&self, key: String) -> Result<String, String> {
        tokio::time::sleep(self.acquisition_dur).await;
        let count = self.resource_count.fetch_add(1, Ordering::SeqCst);
        let label = format!("{key}_{count}");
        if self.fail {
            Err(format!("Failed to acquire resources - {label}"))
        } else {
            Ok(label)
        }
    }

    /// Simulates processing a batch: sleeps for the configured processing
    /// duration, maps every input to a descriptive output string, and records
    /// the batch size under `key` for later inspection by the tests.
    async fn process(
        &self,
        key: String,
        inputs: impl Iterator<Item = String> + Send,
        resources: String,
    ) -> Result<Vec<String>, String> {
        tokio::time::sleep(self.processing_dur).await;

        let mut outputs = Vec::new();
        for input in inputs {
            outputs.push(format!(
                "Item {input} processed for {key} with resources {resources}"
            ));
        }

        self.batches
            .lock()
            .await
            .entry(key)
            .or_default()
            .push(outputs.len());

        Ok(outputs)
    }
}

/// Given we acquire resources before processing
/// When we use an Immediate batching strategy
/// Then items should continue to be added to the batch while resources are being acquired
#[tokio::test]
#[rstest]
#[timeout(Duration::from_secs(5))]
async fn immediate_batches_while_acquiring() {
    // Pause tokio's clock so the simulated sleeps resolve instantly and
    // deterministically.
    tokio::time::pause();

    // Acquisition is much slower than processing, giving a second batch time
    // to fill while the first batch's resources are still being acquired.
    let acquisition_dur = Duration::from_millis(100);
    let processing_dur = Duration::from_millis(5);

    let processor = ResourceAcquiringProcessor::new(acquisition_dur, processing_dur);

    let batcher = Batcher::builder()
        .name("immediate_batches_while_acquiring")
        .processor(processor.clone())
        .limits(
            Limits::builder()
                .max_batch_size(10)
                .max_key_concurrency(2)
                .build(),
        )
        .batching_policy(BatchingPolicy::Immediate)
        .build();

    // `add` is called outside the async block, when the handler runs —
    // presumably registering the item with the batcher immediately, so only
    // the wait for the result is deferred to the spawned task. TODO(review):
    // confirm `Batcher::add` submits eagerly rather than on first poll.
    let handler = |i: i32| {
        let f = batcher.add("key".to_string(), i.to_string());
        async move { f.await.unwrap() }
    };

    let mut tasks = vec![];
    for i in 1..=20 {
        tasks.push(tokio_test::task::spawn(handler(i)));
    }

    let outputs = join_all(tasks.into_iter()).await;

    // The resource suffix identifies which acquisition served each batch:
    // items 1-10 used the first acquisition (key_0), items 11-20 the second
    // (key_1).
    assert_eq!(
        outputs.first().unwrap(),
        "Item 1 processed for key with resources key_0"
    );
    assert_eq!(
        outputs.last().unwrap(),
        "Item 20 processed for key with resources key_1"
    );

    // 20 items with max_batch_size 10 should yield exactly two full batches.
    let batches = processor.batches.lock().await;
    let batch_sizes = batches.get("key").unwrap();
    assert_eq!(batch_sizes.len(), 2);
    assert_eq!(batch_sizes[0], 10);
    assert_eq!(batch_sizes[1], 10);
}

/// Given we acquire resources before processing
/// And resource acquisition fails consistently
/// When we use a Size batching strategy
/// Then all items should fail with a resource acquisition error
#[tokio::test]
#[rstest]
#[timeout(Duration::from_secs(5))]
async fn size_when_acquisition_fails() {
    // Pause tokio's clock so the simulated sleeps resolve instantly and
    // deterministically.
    tokio::time::pause();

    let acquisition_dur = Duration::from_millis(100);
    let processing_dur = Duration::from_millis(5);

    // `with_failure` makes every resource-acquisition attempt return an error.
    let processor = ResourceAcquiringProcessor::new(acquisition_dur, processing_dur).with_failure();

    let batcher = Batcher::builder()
        // Fix: use the test function's name, matching the convention of the
        // sibling tests (the stray "test_" prefix was inconsistent).
        .name("size_when_acquisition_fails")
        .processor(processor.clone())
        .limits(
            Limits::builder()
                .max_batch_size(10)
                .max_key_concurrency(2)
                .build(),
        )
        .batching_policy(BatchingPolicy::Size)
        .build();

    let handler = |i: i32| batcher.add("key".to_string(), i.to_string());

    let mut tasks = vec![];
    for i in 1..=20 {
        tasks.push(tokio_test::task::spawn(handler(i)));
    }

    let outputs = join_all(tasks.into_iter()).await;

    // The acquisition counter increments even on failure, so the two batches
    // of 10 report the labels key_0 and key_1 respectively.
    assert_matches!(
        outputs.first(),
        Some(Err(BatchError::ResourceAcquisitionFailed(s))) => {
            assert_eq!(s, "Failed to acquire resources - key_0");
        }
    );
    assert_matches!(
        outputs.last(),
        Some(Err(BatchError::ResourceAcquisitionFailed(s))) => {
            assert_eq!(s, "Failed to acquire resources - key_1");
        }
    );
}

/// Given we acquire resources before processing
/// And resource acquisition fails consistently
/// When we use an Immediate batching strategy
/// Then all items should fail with a resource acquisition error
#[tokio::test]
#[rstest]
#[timeout(Duration::from_secs(5))]
async fn immediate_when_acquisition_fails() {
    // Pause tokio's clock so the simulated sleeps resolve instantly and
    // deterministically.
    tokio::time::pause();

    let acquisition_dur = Duration::from_millis(100);
    let processing_dur = Duration::from_millis(5);

    // `with_failure` makes every resource-acquisition attempt return an error.
    let processor = ResourceAcquiringProcessor::new(acquisition_dur, processing_dur).with_failure();

    let batcher = Batcher::builder()
        .name("immediate_when_acquisition_fails")
        .processor(processor.clone())
        .limits(
            Limits::builder()
                .max_batch_size(10)
                .max_key_concurrency(2)
                .build(),
        )
        .batching_policy(BatchingPolicy::Immediate)
        .build();

    let handler = |i: i32| batcher.add("key".to_string(), i.to_string());

    let mut tasks = vec![];
    for i in 1..=20 {
        tasks.push(tokio_test::task::spawn(handler(i)));
    }

    let outputs = join_all(tasks.into_iter()).await;

    // The acquisition counter increments even on failure, so the two batches
    // of 10 report the labels key_0 and key_1 respectively.
    assert_matches!(
        outputs.first(),
        Some(Err(BatchError::ResourceAcquisitionFailed(s))) => {
            assert_eq!(s, "Failed to acquire resources - key_0");
        }
    );
    assert_matches!(
        outputs.last(),
        Some(Err(BatchError::ResourceAcquisitionFailed(s))) => {
            assert_eq!(s, "Failed to acquire resources - key_1");
        }
    );
}