use benjamin_batchly::{BatchMutex, BatchResult};
use std::{
mem,
time::{Duration, Instant},
};
/// Five concurrent submissions under a single key.
///
/// The test expects `a` to process only itself, `b` to process the remaining
/// four items as one batch, and `c`/`d`/`e` to be reported as done no earlier
/// than `b` finished its work.
#[tokio::test]
async fn batch_all_ok() {
    const KEY: i32 = 123;
    let batcher = BatchMutex::default();

    let (a, b, c, d, e) = tokio::join!(
        handle_batch_in_100ms(&batcher, KEY, Item('a')),
        handle_batch_in_100ms(&batcher, KEY, Item('b')),
        handle_batch_in_100ms(&batcher, KEY, Item('c')),
        handle_batch_in_100ms(&batcher, KEY, Item('d')),
        handle_batch_in_100ms(&batcher, KEY, Item('e')),
    );

    let (a_outcome, a_finished) = a;
    let (b_outcome, b_finished) = b;

    // The two workers: `a` alone, then `b` with everything that queued up.
    assert_eq!(a_outcome, HandleResult::DidWork(vec![Item('a')]));
    assert_eq!(
        b_outcome,
        HandleResult::DidWork(vec![Item('b'), Item('c'), Item('d'), Item('e')])
    );
    assert!(b_finished >= a_finished);

    // The followers only observe completion, and not before `b` finished.
    for (outcome, finished) in [c, d, e] {
        assert_eq!(outcome, HandleResult::Done(()));
        assert!(finished >= b_finished);
    }
}
/// Five concurrent submissions under a single key, where the worker pulls in
/// waiting items part-way through its work.
///
/// The test expects `a` to end up processing all five items and every other
/// submitter to be reported as done.
#[tokio::test]
async fn batch_all_ok_pull_waiting_items() {
    const KEY: i32 = 123;
    let batcher = BatchMutex::default();

    let (a, b, c, d, e) = tokio::join!(
        lock_30ms_pull_waiting_items_and_handle_in_50ms(&batcher, KEY, Item('a')),
        lock_30ms_pull_waiting_items_and_handle_in_50ms(&batcher, KEY, Item('b')),
        lock_30ms_pull_waiting_items_and_handle_in_50ms(&batcher, KEY, Item('c')),
        lock_30ms_pull_waiting_items_and_handle_in_50ms(&batcher, KEY, Item('d')),
        lock_30ms_pull_waiting_items_and_handle_in_50ms(&batcher, KEY, Item('e')),
    );

    let (a_outcome, a_finished) = a;
    let (b_outcome, b_finished) = b;

    // `a` is the only worker and its batch contains everyone.
    assert_eq!(
        a_outcome,
        HandleResult::DidWork(vec![Item('a'), Item('b'), Item('c'), Item('d'), Item('e')])
    );

    assert_eq!(b_outcome, HandleResult::Done(()));
    assert!(b_finished >= a_finished);

    // Remaining followers are compared against `b`'s timestamp, as before.
    for (outcome, finished) in [c, d, e] {
        assert_eq!(outcome, HandleResult::Done(()));
        assert!(finished >= b_finished);
    }
}
/// Submissions under five distinct keys: the test expects each submitter to
/// process exactly its own item.
#[tokio::test]
async fn batch_different_keys() {
    let batcher = BatchMutex::default();

    let (a, b, c, d, e) = tokio::join!(
        handle_batch_in_100ms(&batcher, 45, Item('a')),
        handle_batch_in_100ms(&batcher, 56, Item('b')),
        handle_batch_in_100ms(&batcher, 67, Item('c')),
        handle_batch_in_100ms(&batcher, 78, Item('d')),
        handle_batch_in_100ms(&batcher, 89, Item('e')),
    );

    // Pair each outcome with the character it submitted, in order a..=e.
    let outcomes = [a.0, b.0, c.0, d.0, e.0];
    for (outcome, submitted) in outcomes.into_iter().zip('a'..='e') {
        assert_eq!(outcome, HandleResult::DidWork(vec![Item(submitted)]));
    }
}
/// One submitter (`b`) is wrapped in a 150ms timeout.
///
/// The test expects `a` to process itself normally, `b` to hit the timeout,
/// and `c`/`d`/`e` to be told their submission failed.
// NOTE(review): presumably `b` is cancelled while holding the second batch,
// which is what makes the followers fail; confirm against the library.
#[tokio::test]
async fn batch_cancelled() {
    const KEY: i32 = 123;
    let deadline = Duration::from_millis(150);
    let batcher = BatchMutex::default();

    let (a, b, c, d, e) = tokio::join!(
        handle_batch_in_100ms(&batcher, KEY, Item('a')),
        tokio::time::timeout(deadline, handle_batch_in_100ms(&batcher, KEY, Item('b'))),
        handle_batch_in_100ms(&batcher, KEY, Item('c')),
        handle_batch_in_100ms(&batcher, KEY, Item('d')),
        handle_batch_in_100ms(&batcher, KEY, Item('e')),
    );

    assert_eq!(a.0, HandleResult::DidWork(vec![Item('a')]));
    // The timeout elapsed before `b`'s helper returned.
    assert!(b.is_err());

    for (outcome, _finished) in [c, d, e] {
        assert_eq!(outcome, HandleResult::Failed);
    }
}
#[tokio::test]
async fn batch_return_value() {
let batcher: BatchMutex<_, _, Result<u32, NotEven>> = BatchMutex::default();
let batch_key = 123;
let (a, b, c, d, e) = tokio::join!(
handle_convert_to_u32_if_even(&batcher, batch_key, Item('a')),
handle_convert_to_u32_if_even(&batcher, batch_key, Item('b')),
handle_convert_to_u32_if_even(&batcher, batch_key, Item('c')),
handle_convert_to_u32_if_even(&batcher, batch_key, Item('d')),
handle_convert_to_u32_if_even(&batcher, batch_key, Item('e')),
);
assert!(matches!(a, ToU32Result::DidWork(Err(NotEven(97)))), "{a:?}");
assert!(matches!(b, ToU32Result::DidWork(Ok(98))), "{b:?}");
assert!(matches!(c, ToU32Result::Done(Err(NotEven(99)))), "{c:?}");
assert!(matches!(d, ToU32Result::Done(Ok(100))), "{d:?}");
assert!(matches!(e, ToU32Result::Done(Err(NotEven(101)))), "{e:?}");
}
/// Single-character payload that the tests submit to the batcher.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct Item(char);
/// Submits `item` under `key` and reports how the submission resolved,
/// together with a timestamp.
///
/// * If the submission resolves as done or failed, the timestamp is taken
///   right after `submit` returns.
/// * If this caller is handed a batch to process, it checks that its own item
///   is first in the batch, sleeps 100ms to simulate work, records the finish
///   time, marks the whole batch done and returns the batch's items.
async fn handle_batch_in_100ms(
    batcher: &BatchMutex<i32, Item>,
    key: i32,
    item: Item,
) -> (HandleResult, Instant) {
    // Deal with the "someone else handled it" outcomes up front.
    let mut work = match batcher.submit(key, item).await {
        BatchResult::Work(work) => work,
        BatchResult::Done(_) => return (HandleResult::Done(()), Instant::now()),
        BatchResult::Failed => return (HandleResult::Failed, Instant::now()),
    };

    assert_eq!(work.items[0], item);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Timestamp is captured before waiters are notified, so their own
    // timestamps can be compared against it.
    let finished_at = Instant::now();
    work.notify_all_done();

    let handled = mem::take(&mut work.items);
    (HandleResult::DidWork(handled), finished_at)
}
/// Submits `item` under `key` and reports how the submission resolved,
/// together with a timestamp.
///
/// When this caller is handed a batch to process, it checks that its own item
/// is first, waits 30ms, pulls any items that are waiting into the batch,
/// waits another 50ms to simulate work, records the finish time, marks the
/// whole batch done and returns the batch's items. Otherwise the timestamp is
/// taken right after `submit` returns.
async fn lock_30ms_pull_waiting_items_and_handle_in_50ms(
    batcher: &BatchMutex<i32, Item>,
    key: i32,
    item: Item,
) -> (HandleResult, Instant) {
    let mut work = match batcher.submit(key, item).await {
        BatchResult::Work(work) => work,
        BatchResult::Done(_) => return (HandleResult::Done(()), Instant::now()),
        BatchResult::Failed => return (HandleResult::Failed, Instant::now()),
    };

    assert_eq!(work.items[0], item);

    // Give other submitters time to queue up, then absorb them.
    tokio::time::sleep(Duration::from_millis(30)).await;
    work.pull_waiting_items();

    tokio::time::sleep(Duration::from_millis(50)).await;

    // Timestamp is captured before waiters are notified.
    let finished_at = Instant::now();
    work.notify_all_done();

    let handled = mem::take(&mut work.items);
    (HandleResult::DidWork(handled), finished_at)
}
/// How a test helper's submission resolved.
#[derive(Clone, Debug, Eq, PartialEq)]
enum HandleResult<T = ()> {
    /// The submission resolved as done without this caller doing the work.
    Done(T),
    /// The submission resolved as failed.
    Failed,
    /// This caller was handed a batch; holds the items that batch contained.
    DidWork(Vec<Item>),
}
/// Error value carrying a number that failed the evenness check.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct NotEven(u32);
async fn handle_convert_to_u32_if_even(
batcher: &BatchMutex<i32, Item, Result<u32, NotEven>>,
key: i32,
item: Item,
) -> ToU32Result<Result<u32, NotEven>> {
match batcher.submit(key, item).await {
BatchResult::Done(v) => ToU32Result::Done(v),
BatchResult::Failed => ToU32Result::Failed,
BatchResult::Work(mut batch) => {
let items = mem::take(&mut batch.items);
tokio::time::sleep(Duration::from_millis(100)).await;
for (idx, item) in items.iter().enumerate() {
let n = item.0 as u32;
if n % 2 == 0 {
batch.notify_done(idx, Ok(n));
} else {
batch.notify_done(idx, Err(NotEven(n)));
}
}
match batch.recv_local_notify_done() {
Some(v) => ToU32Result::DidWork(v),
None => ToU32Result::RecvLocalErr,
}
}
}
}
/// How a value-returning submission resolved for one caller.
#[derive(Clone, Debug, Eq, PartialEq)]
enum ToU32Result<T> {
    /// The submission resolved as done; holds the value delivered with it.
    Done(T),
    /// The submission resolved as failed.
    Failed,
    /// This caller processed a batch; holds the value read back for its own item.
    DidWork(T),
    /// This caller processed a batch but no local value could be read back.
    RecvLocalErr,
}