use ultra_batch::{BatchFetcher, Cache, Fetcher};
/// Returns the `n`-th Fibonacci number (`fib(0) == 0`, `fib(1) == 1`).
///
/// Computed by plain double recursion. Additions wrap on overflow instead of
/// panicking.
fn fib(n: usize) -> usize {
    // Base cases: the first two Fibonacci numbers equal their own index.
    if n < 2 {
        return n;
    }

    let previous = fib(n - 1);
    let before_previous = fib(n - 2);
    previous.wrapping_add(before_previous)
}
/// Value type produced by `SlowFetcher`: a heap-allocated list of flags.
#[derive(Clone, Debug)]
struct BigValue {
    /// The flags making up this value.
    items: Vec<bool>,
}
/// Stateless `Fetcher` implementation used by the concurrency tests in this
/// file; it maps `usize` keys to `BigValue`s.
struct SlowFetcher;
impl Fetcher for SlowFetcher {
    type Key = usize;
    type Value = BigValue;
    type Error = anyhow::Error;

    /// Fetches a `BigValue` for every key in `keys` and stores it in `values`.
    ///
    /// The call first sleeps for 15 ms, then spawns one
    /// `tokio::task::spawn_blocking` job per key. Each job builds a vector of
    /// `fib(key % 25) + 1` flags that alternate starting with `true`.
    ///
    /// # Errors
    ///
    /// Returns an error if awaiting any of the spawned jobs fails.
    async fn fetch(
        &self,
        keys: &[usize],
        values: &mut Cache<'_, usize, BigValue>,
    ) -> anyhow::Result<()> {
        tokio::time::sleep(tokio::time::Duration::from_millis(15)).await;

        // Spawn every job up front so they can run concurrently; results are
        // only awaited once all of them have been started.
        let mut handles = Vec::with_capacity(keys.len());
        for &key in keys {
            let handle = tokio::task::spawn_blocking(move || {
                let len = fib(key % 25) + 1;
                let items: Vec<bool> = (0..len).map(|index| index % 2 == 0).collect();
                (key, BigValue { items })
            });
            handles.push(handle);
        }

        // Collect results in key order and record each one in the cache.
        for handle in handles {
            let (key, value) = handle.await?;
            values.insert(key, value);
        }

        Ok(())
    }
}
/// Drives 2000 concurrent loads through a single `BatchFetcher` wrapping
/// `SlowFetcher`.
///
/// Task `n` loads key `n / 3`, so the keys range over `0..=666` and most keys
/// are requested by three different tasks.
///
/// # Errors
///
/// Returns an error if any spawned task cannot be joined, if any load fails,
/// or if a loaded value has no items (reported as `"length was 0"`).
async fn concurrency_task() -> anyhow::Result<()> {
    let batcher = BatchFetcher::build(SlowFetcher).finish();

    // Start every load before awaiting any of them.
    let mut handles = Vec::with_capacity(2000);
    for n in 0..2000 {
        let key = n / 3;
        let task_batcher = batcher.clone();
        handles.push(tokio::spawn(async move {
            let loaded = task_batcher.load(key).await?;
            if loaded.items.is_empty() {
                Err(anyhow::anyhow!("length was 0"))
            } else {
                Ok(())
            }
        }));
    }

    // The first `?` surfaces join failures, the second the task's own result.
    for handle in handles {
        handle.await??;
    }

    Ok(())
}
/// Runs the concurrency scenario on a current-thread tokio runtime with only
/// the time driver enabled.
#[test]
fn test_concurrency_basic_scheduler() -> anyhow::Result<()> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_time();

    let runtime = builder.build()?;
    runtime.block_on(concurrency_task())
}
/// Runs the concurrency scenario on a multi-thread tokio runtime limited to a
/// single worker thread.
#[test]
fn test_concurrency_one_thread() -> anyhow::Result<()> {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.enable_all();
    builder.worker_threads(1);

    let runtime = builder.build()?;
    runtime.block_on(concurrency_task())
}
/// Runs the concurrency scenario on a multi-thread tokio runtime with eight
/// worker threads.
#[test]
fn test_concurrency_eight_threads() -> anyhow::Result<()> {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.enable_all();
    builder.worker_threads(8);

    let runtime = builder.build()?;
    runtime.block_on(concurrency_task())
}