use std::sync::{atomic::AtomicUsize, Arc, RwLock};
use ultra_batch::{BatchExecutor, ExecuteError, Executor};
mod db;
mod stubs;
/// Runs a single insert through a `BatchExecutor` wrapping `db::InsertUsers`.
///
/// Asserts that `execute` yields `Some(Some(id))` for the new user's ID and
/// that the user is present in the shared database afterwards.
#[tokio::test]
async fn test_execute() -> anyhow::Result<()> {
    let shared_db = Arc::new(RwLock::new(db::Database::fake()));
    let user = db::User::fake();
    let user_id = user.id;

    let insert_users = db::InsertUsers {
        db: Arc::clone(&shared_db),
    };
    let inserter = BatchExecutor::build(insert_users).finish();

    let outcome = inserter.execute(user.clone()).await?;
    assert_eq!(outcome, Some(Some(user_id)));

    // The write must be observable through the handle the test kept.
    let stored = shared_db.read().unwrap();
    assert!(stored.users.contains_key(&user_id));
    Ok(())
}
/// Runs a single insert through an executor wrapped in
/// `stubs::ExecutorReturnsEmpty`.
///
/// Asserts that `execute` yields `None`, while the user is still present in
/// the shared database afterwards.
#[tokio::test]
async fn test_execute_returning_nothing() -> anyhow::Result<()> {
    let shared_db = Arc::new(RwLock::new(db::Database::fake()));
    let user = db::User::fake();
    let user_id = user.id;

    let insert_users = db::InsertUsers {
        db: Arc::clone(&shared_db),
    };
    let inserter = BatchExecutor::build(stubs::ExecutorReturnsEmpty(insert_users)).finish();

    let outcome = inserter.execute(user.clone()).await?;
    assert_eq!(outcome, None);

    // No result came back, but the insert itself must have happened.
    let stored = shared_db.read().unwrap();
    assert!(stored.users.contains_key(&user_id));
    Ok(())
}
/// Calls `execute_many` with a one-element vector.
///
/// Asserts that the result is a single `Some(id)` for the new user and that
/// the user is present in the shared database afterwards.
#[tokio::test]
async fn test_execute_many_with_one_element() -> anyhow::Result<()> {
    let shared_db = Arc::new(RwLock::new(db::Database::fake()));
    let user = db::User::fake();
    let user_id = user.id;

    let insert_users = db::InsertUsers {
        db: Arc::clone(&shared_db),
    };
    let inserter = BatchExecutor::build(insert_users).finish();

    let outcome = inserter.execute_many(vec![user.clone()]).await?;
    assert_eq!(outcome, [Some(user_id)]);

    let stored = shared_db.read().unwrap();
    assert!(stored.users.contains_key(&user_id));
    Ok(())
}
/// Calls `execute_many` with a one-element vector on an executor wrapped in
/// `stubs::ExecutorReturnsEmpty`.
///
/// Asserts that the returned collection is empty, while the user is still
/// present in the shared database afterwards.
#[tokio::test]
async fn test_execute_many_with_one_element_returning_nothing() -> anyhow::Result<()> {
    let shared_db = Arc::new(RwLock::new(db::Database::fake()));
    let user = db::User::fake();
    let user_id = user.id;

    let insert_users = db::InsertUsers {
        db: Arc::clone(&shared_db),
    };
    let inserter = BatchExecutor::build(stubs::ExecutorReturnsEmpty(insert_users)).finish();

    let outcome = inserter.execute_many(vec![user.clone()]).await?;
    assert_eq!(outcome, []);

    // No results came back, but the insert itself must have happened.
    let stored = shared_db.read().unwrap();
    assert!(stored.users.contains_key(&user_id));
    Ok(())
}
/// Checks that `execute_many` reports one result per input, in input order.
///
/// The input is one user taken from the fake database followed by two new
/// users, each submitted twice. The assertion expects `None` for the
/// pre-existing user, `Some(id)` for the first occurrence of each new user,
/// and `None` for each repeated occurrence.
#[tokio::test]
async fn test_execute_many_ordering() -> anyhow::Result<()> {
    let database = db::Database::fake();
    let existing_user = database.users.values().next().cloned().unwrap();
    let first_new = db::User::fake();
    let second_new = db::User::fake();
    let shared_db = Arc::new(RwLock::new(database));

    let insert_users = db::InsertUsers {
        db: Arc::clone(&shared_db),
    };
    let batch_executor = BatchExecutor::build(insert_users).finish();

    let results = batch_executor
        .execute_many(vec![
            existing_user.clone(),
            first_new.clone(),
            first_new.clone(),
            second_new.clone(),
            second_new.clone(),
        ])
        .await?;

    let expected = [None, Some(first_new.id), None, Some(second_new.id), None];
    assert_eq!(results, expected);
    Ok(())
}
/// Submits 1000 fake users in a single `execute_many` call to an executor
/// built with `eager_batch_size(Some(50))`.
///
/// Asserts that 1000 non-`None` results come back and that the observed
/// inner executor was called exactly once.
#[tokio::test]
async fn test_execute_big_batch() -> anyhow::Result<()> {
    let shared_db = Arc::new(RwLock::new(db::Database::fake()));
    let users: Vec<_> = std::iter::repeat_with(db::User::fake).take(1000).collect();

    let observed = stubs::ObserveExecutor::new(db::InsertUsers {
        db: Arc::clone(&shared_db),
    });
    let batch_executor = BatchExecutor::build(observed.clone())
        .eager_batch_size(Some(50))
        .finish();

    let inserted = batch_executor
        .execute_many(users)
        .await?
        .into_iter()
        .flatten()
        .count();

    assert_eq!(inserted, 1000);
    assert_eq!(observed.total_calls(), 1);
    Ok(())
}
/// Submits 50 batches of 10 fake users, awaiting each `execute_many` call
/// before issuing the next one.
///
/// Asserts that 500 non-`None` results come back in total and that the
/// observed inner executor was called 50 times, i.e. once per awaited call.
#[tokio::test]
async fn test_execute_small_awaited_batches() -> anyhow::Result<()> {
    let shared_db = Arc::new(RwLock::new(db::Database::fake()));
    let observed = stubs::ObserveExecutor::new(db::InsertUsers {
        db: Arc::clone(&shared_db),
    });
    let batch_executor = BatchExecutor::build(observed.clone())
        .eager_batch_size(Some(50))
        .finish();

    let mut inserted = 0;
    for _round in 0..50 {
        let users: Vec<_> = std::iter::repeat_with(db::User::fake).take(10).collect();
        inserted += batch_executor
            .execute_many(users)
            .await?
            .into_iter()
            .flatten()
            .count();
    }

    assert_eq!(inserted, 500);
    assert_eq!(observed.total_calls(), 50);
    Ok(())
}
/// Submits 12 batches of 10 fake users from 12 spawned tasks that are joined
/// together, using an executor built with `eager_batch_size(Some(50))`.
///
/// Asserts that 120 non-`None` results come back in total and that the
/// observed inner executor was called exactly 3 times.
#[tokio::test]
async fn test_execute_merged_batches() -> anyhow::Result<()> {
    let shared_db = Arc::new(RwLock::new(db::Database::fake()));
    let observed = stubs::ObserveExecutor::new(db::InsertUsers {
        db: Arc::clone(&shared_db),
    });
    let batch_executor = BatchExecutor::build(observed.clone())
        .eager_batch_size(Some(50))
        .finish();

    let inserted_total = Arc::new(AtomicUsize::new(0));

    // Each invocation prepares its inputs immediately and returns a future
    // that spawns the actual `execute_many` call as a separate task.
    let submit_batch = || {
        let counter = Arc::clone(&inserted_total);
        let handle = batch_executor.clone();
        let users: Vec<_> = std::iter::repeat_with(db::User::fake).take(10).collect();
        async move {
            let results = tokio::spawn(async move { handle.execute_many(users).await.unwrap() })
                .await
                .unwrap();
            let inserted = results.into_iter().flatten().count();
            counter.fetch_add(inserted, std::sync::atomic::Ordering::SeqCst);
        }
    };

    tokio::join!(
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
    );

    assert_eq!(
        inserted_total.load(std::sync::atomic::Ordering::SeqCst),
        120
    );
    assert_eq!(observed.total_calls(), 3);
    Ok(())
}
/// Same scenario as 12 joined tasks of 10 fake users each, but the inner
/// executor is wrapped in `stubs::ExecutorReturnsEmpty`.
///
/// Asserts that no non-`None` results are counted and that the observed
/// executor was still called exactly 3 times.
#[tokio::test]
async fn test_execute_merged_batches_returning_none() -> anyhow::Result<()> {
    let shared_db = Arc::new(RwLock::new(db::Database::fake()));
    let insert_users = db::InsertUsers {
        db: Arc::clone(&shared_db),
    };
    let observed = stubs::ObserveExecutor::new(stubs::ExecutorReturnsEmpty(insert_users));
    let batch_executor = BatchExecutor::build(observed.clone())
        .eager_batch_size(Some(50))
        .finish();

    let inserted_total = Arc::new(AtomicUsize::new(0));

    // Each invocation prepares its inputs immediately and returns a future
    // that spawns the actual `execute_many` call as a separate task.
    let submit_batch = || {
        let counter = Arc::clone(&inserted_total);
        let handle = batch_executor.clone();
        let users: Vec<_> = std::iter::repeat_with(db::User::fake).take(10).collect();
        async move {
            let results = tokio::spawn(async move { handle.execute_many(users).await.unwrap() })
                .await
                .unwrap();
            let inserted = results.into_iter().flatten().count();
            counter.fetch_add(inserted, std::sync::atomic::Ordering::SeqCst);
        }
    };

    tokio::join!(
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
    );

    assert_eq!(inserted_total.load(std::sync::atomic::Ordering::SeqCst), 0);
    assert_eq!(observed.total_calls(), 3);
    Ok(())
}
/// Submits 12 batches of 10 fake users from 12 joined tasks to an executor
/// whose `execute` always fails.
///
/// Asserts that the observed executor was called exactly 3 times, that all 12
/// callers received a result, and that every one of those results is
/// `Err(ExecuteError::ExecutorError(_))`.
#[tokio::test]
async fn test_execute_merged_batches_returning_error() -> anyhow::Result<()> {
    /// Executor stub that rejects every batch with the error "uh oh".
    struct ErrorExecutor;

    impl Executor for ErrorExecutor {
        type Value = db::User;
        type Result = Option<uuid::Uuid>;
        type Error = anyhow::Error;

        async fn execute(
            &self,
            _values: Vec<Self::Value>,
        ) -> Result<Vec<Self::Result>, Self::Error> {
            Err(anyhow::anyhow!("uh oh"))
        }
    }

    let observed = stubs::ObserveExecutor::new(ErrorExecutor);
    let batch_executor = BatchExecutor::build(observed.clone())
        .eager_batch_size(Some(50))
        .finish();

    // Collects the `execute_many` outcome seen by every caller.
    let outcome_log = Arc::new(RwLock::new(Vec::new()));

    let submit_batch = || {
        let log = Arc::clone(&outcome_log);
        let handle = batch_executor.clone();
        let users: Vec<_> = std::iter::repeat_with(db::User::fake).take(10).collect();
        async move {
            let outcome = tokio::spawn(async move { handle.execute_many(users).await })
                .await
                .unwrap();
            log.write().unwrap().push(outcome);
        }
    };

    tokio::join!(
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
        submit_batch(),
    );

    let outcomes = outcome_log.read().unwrap();
    assert_eq!(observed.total_calls(), 3);
    assert_eq!(outcomes.len(), 12);
    for outcome in outcomes.iter() {
        assert!(matches!(outcome, Err(ExecuteError::ExecutorError(_))));
    }
    Ok(())
}