use std::{
collections::HashMap,
fmt::Debug,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};
use assert_matches::assert_matches;
use batch_aint_one::{BatchError, Batcher, BatchingPolicy, Limits, Processor};
use futures::future::join_all;
use rstest::rstest;
use tokio::sync::Mutex;
mod locking;
mod variable_timing;
#[derive(Debug, Clone)]
pub struct ResourceAcquiringProcessor {
fail: bool,
acquisition_dur: Duration,
processing_dur: Duration,
resource_count: Arc<AtomicUsize>,
batches: Arc<Mutex<HashMap<String, Vec<usize>>>>,
}
impl ResourceAcquiringProcessor {
fn new(acquisition_dur: Duration, processing_dur: Duration) -> Self {
Self {
fail: false,
acquisition_dur,
processing_dur,
resource_count: Arc::new(AtomicUsize::new(0)),
batches: Arc::new(Mutex::new(HashMap::new())),
}
}
fn with_failure(mut self) -> Self {
self.fail = true;
self
}
}
impl Processor for ResourceAcquiringProcessor {
type Key = String;
type Input = String;
type Output = String;
type Error = String;
type Resources = String;
async fn acquire_resources(&self, key: String) -> Result<String, String> {
tokio::time::sleep(self.acquisition_dur).await;
let count = self.resource_count.fetch_add(1, Ordering::SeqCst);
if self.fail {
return Err("Failed to acquire resources - ".to_string()
+ &key
+ "_"
+ &count.to_string());
}
Ok(key + "_" + &count.to_string())
}
async fn process(
&self,
key: String,
inputs: impl Iterator<Item = String> + Send,
resources: String,
) -> Result<Vec<String>, String> {
tokio::time::sleep(self.processing_dur).await;
let outputs: Vec<String> = inputs
.map(|s| {
"Item ".to_string()
+ &s
+ " processed for "
+ &key
+ " with resources "
+ &resources
})
.collect();
let mut batches = self.batches.lock().await;
batches.entry(key.clone()).or_default().push(outputs.len());
Ok(outputs)
}
}
#[tokio::test]
#[rstest]
#[timeout(Duration::from_secs(5))]
async fn immediate_batches_while_acquiring() {
tokio::time::pause();
let acquisition_dur = Duration::from_millis(100);
let processing_dur = Duration::from_millis(5);
let processor = ResourceAcquiringProcessor::new(acquisition_dur, processing_dur);
let batcher = Batcher::builder()
.name("immediate_batches_while_acquiring")
.processor(processor.clone())
.limits(
Limits::builder()
.max_batch_size(10)
.max_key_concurrency(2)
.build(),
)
.batching_policy(BatchingPolicy::Immediate)
.build();
let handler = |i: i32| {
let f = batcher.add("key".to_string(), i.to_string());
async move { f.await.unwrap() }
};
let mut tasks = vec![];
for i in 1..=20 {
tasks.push(tokio_test::task::spawn(handler(i)));
}
let outputs = join_all(tasks.into_iter()).await;
assert_eq!(
outputs.first().unwrap(),
"Item 1 processed for key with resources key_0"
);
assert_eq!(
outputs.last().unwrap(),
"Item 20 processed for key with resources key_1"
);
let batches = processor.batches.lock().await;
let batch_sizes = batches.get("key").unwrap();
assert_eq!(batch_sizes.len(), 2);
assert_eq!(batch_sizes[0], 10);
assert_eq!(batch_sizes[1], 10);
}
#[tokio::test]
#[rstest]
#[timeout(Duration::from_secs(5))]
async fn size_when_acquisition_fails() {
tokio::time::pause();
let acquisition_dur = Duration::from_millis(100);
let processing_dur = Duration::from_millis(5);
let processor = ResourceAcquiringProcessor::new(acquisition_dur, processing_dur).with_failure();
let batcher = Batcher::builder()
.name("test_size_when_acquisition_fails")
.processor(processor.clone())
.limits(
Limits::builder()
.max_batch_size(10)
.max_key_concurrency(2)
.build(),
)
.batching_policy(BatchingPolicy::Size)
.build();
let handler = |i: i32| batcher.add("key".to_string(), i.to_string());
let mut tasks = vec![];
for i in 1..=20 {
tasks.push(tokio_test::task::spawn(handler(i)));
}
let outputs = join_all(tasks.into_iter()).await;
assert_matches!(
outputs.first(),
Some(Err(BatchError::ResourceAcquisitionFailed(s))) => {
assert_eq!(s, "Failed to acquire resources - key_0");
}
);
assert_matches!(
outputs.last(),
Some(Err(BatchError::ResourceAcquisitionFailed(s))) => {
assert_eq!(s, "Failed to acquire resources - key_1");
}
);
}
#[tokio::test]
#[rstest]
#[timeout(Duration::from_secs(5))]
async fn immediate_when_acquisition_fails() {
tokio::time::pause();
let acquisition_dur = Duration::from_millis(100);
let processing_dur = Duration::from_millis(5);
let processor = ResourceAcquiringProcessor::new(acquisition_dur, processing_dur).with_failure();
let batcher = Batcher::builder()
.name("immediate_when_acquisition_fails")
.processor(processor.clone())
.limits(
Limits::builder()
.max_batch_size(10)
.max_key_concurrency(2)
.build(),
)
.batching_policy(BatchingPolicy::Immediate)
.build();
let handler = |i: i32| batcher.add("key".to_string(), i.to_string());
let mut tasks = vec![];
for i in 1..=20 {
tasks.push(tokio_test::task::spawn(handler(i)));
}
let outputs = join_all(tasks.into_iter()).await;
assert_matches!(
outputs.first(),
Some(Err(BatchError::ResourceAcquisitionFailed(s))) => {
assert_eq!(s, "Failed to acquire resources - key_0");
}
);
assert_matches!(
outputs.last(),
Some(Err(BatchError::ResourceAcquisitionFailed(s))) => {
assert_eq!(s, "Failed to acquire resources - key_1");
}
);
}