#![allow(missing_docs)]
#![allow(clippy::doc_markdown, clippy::cast_possible_wrap)]
use core::time::Duration;
use taktora_bounded_alloc::CountingAllocator;
use taktora_executor::{ControlFlow, Executor, item, item_with_triggers};
#[global_allocator]
static ALLOC: CountingAllocator = CountingAllocator::new();
fn count_allocs<R>(f: impl FnOnce() -> R) -> (usize, R) {
ALLOC.reset();
ALLOC.set_tracking(true);
let r = f();
ALLOC.set_tracking(false);
(ALLOC.alloc_count(), r)
}
fn trivial_chain() -> Vec<Box<dyn taktora_executor::ExecutableItem>> {
let head = item_with_triggers(
|d| {
d.interval(Duration::from_millis(1));
Ok(())
},
|_| Ok(ControlFlow::Continue),
);
let mid = item(|_| Ok(ControlFlow::Continue));
let tail = item(|_| Ok(ControlFlow::Continue));
vec![Box::new(head), Box::new(mid), Box::new(tail)]
}
const ITERS_BIG: usize = 100;
const ITERS_SMALL: usize = 10;
fn per_iter_allocs(exec: &mut Executor) -> i64 {
exec.run_n(ITERS_SMALL).unwrap();
let (a_small, ()) = count_allocs(|| exec.run_n(ITERS_SMALL).unwrap());
let (a_big, ()) = count_allocs(|| exec.run_n(ITERS_BIG).unwrap());
let diff = a_big as i64 - a_small as i64;
let iters = (ITERS_BIG - ITERS_SMALL) as i64;
(diff + iters - 1) / iters
}
#[test]
fn dispatch_is_zero_allocation() {
{
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
exec.add_chain(trivial_chain()).unwrap();
let per_iter = per_iter_allocs(&mut exec);
assert_eq!(
per_iter, 0,
"REQ_0060 violated: ~{per_iter} steady-state allocations per iteration (single-threaded chain)"
);
}
{
let mut exec = Executor::builder().worker_threads(2).build().unwrap();
exec.add_chain(trivial_chain()).unwrap();
let per_iter = per_iter_allocs(&mut exec);
assert_eq!(
per_iter, 0,
"REQ_0060 violated: ~{per_iter} steady-state allocations per iteration (2 worker threads, chain)"
);
}
{
let mut exec = Executor::builder().worker_threads(2).build().unwrap();
let mut g = exec.add_graph();
let r = g.vertex(item_with_triggers(
|d| {
d.interval(Duration::from_millis(1));
Ok(())
},
|_| Ok(ControlFlow::Continue),
));
let l = g.vertex(item(|_| Ok(ControlFlow::Continue)));
let rt = g.vertex(item(|_| Ok(ControlFlow::Continue)));
let m = g.vertex(item(|_| Ok(ControlFlow::Continue)));
g.edge(r, l).edge(r, rt).edge(l, m).edge(rt, m).root(r);
g.build().unwrap();
let per_iter = per_iter_allocs(&mut exec);
assert_eq!(
per_iter, 0,
"REQ_0060 violated: ~{per_iter} steady-state allocations per iteration (graph diamond, 2 workers)"
);
}
{
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let it = item_with_triggers(
|d| {
d.interval(Duration::from_millis(1));
Ok(())
},
|_| Ok(ControlFlow::Continue),
);
exec.add(it).unwrap();
let per_iter = per_iter_allocs(&mut exec);
assert_eq!(
per_iter, 0,
"REQ_0060 violated: ~{per_iter} steady-state allocations per iteration (single-threaded, Single task)"
);
}
{
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let head = item_with_triggers(
|d| {
d.interval(Duration::from_millis(1));
Ok(())
},
|_| {
let v: Vec<u8> = vec![1, 2, 3];
core::hint::black_box(&v);
Ok(ControlFlow::Continue)
},
);
exec.add(head).unwrap();
exec.run_n(1).unwrap();
let (allocs, ()) = count_allocs(|| exec.run_n(10).unwrap());
assert!(
allocs >= 10,
"harness regression: counting allocator did not catch deliberate vec! allocations (saw {allocs})"
);
}
}