#![allow(missing_docs)]
use core::time::Duration;
use iceoryx2::prelude::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use taktora_executor::{ControlFlow, Executor, ExecutorError, TriggerDeclarer, item_with_triggers};
static SEQ: AtomicU64 = AtomicU64::new(0);
fn unique(prefix: &str) -> String {
let n = SEQ.fetch_add(1, Ordering::Relaxed);
format!("{prefix}.{}.{n}", std::process::id())
}
#[test]
fn interval_trigger_fires_run_n_times() {
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let counter = Arc::new(AtomicU32::new(0));
let c = Arc::clone(&counter);
exec.add(item_with_triggers(
|d| {
d.interval(Duration::from_millis(20));
Ok(())
},
move |_| {
c.fetch_add(1, Ordering::SeqCst);
Ok(ControlFlow::Continue)
},
))
.unwrap();
exec.run_n(3).unwrap();
assert_eq!(counter.load(Ordering::SeqCst), 3);
}
#[test]
fn run_for_terminates_on_timeout() {
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let counter = Arc::new(AtomicU32::new(0));
let c = Arc::clone(&counter);
exec.add(item_with_triggers(
|d| {
d.interval(Duration::from_millis(50));
Ok(())
},
move |_| {
c.fetch_add(1, Ordering::SeqCst);
Ok(ControlFlow::Continue)
},
))
.unwrap();
exec.run_for(Duration::from_millis(120)).unwrap();
assert!(counter.load(Ordering::SeqCst) >= 1);
}
#[test]
fn stoppable_terminates_run() {
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let stop = exec.stoppable();
exec.add(item_with_triggers(
|d| {
d.interval(Duration::from_millis(20));
Ok(())
},
move |ctx| {
ctx.stop_executor();
Ok(ControlFlow::Continue)
},
))
.unwrap();
exec.run().unwrap();
let _ = stop;
}
#[derive(Debug, Default, Clone, Copy, ZeroCopySend)]
#[repr(C)]
struct Tick(u64);
#[test]
fn subscriber_trigger_dispatches_task() {
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let topic = unique("taktora.test.run.sub");
let ch = exec.channel::<Tick>(&topic).unwrap();
let publisher = ch.publisher().unwrap();
let subscriber = ch.subscriber().unwrap();
let counter = Arc::new(AtomicU32::new(0));
let c = Arc::clone(&counter);
let stop = exec.stoppable();
exec.add(item_with_triggers(
move |d| {
d.subscriber(&subscriber);
Ok(())
},
move |ctx| {
c.fetch_add(1, Ordering::SeqCst);
if c.load(Ordering::SeqCst) >= 3 {
ctx.stop_executor();
}
Ok(ControlFlow::Continue)
},
))
.unwrap();
std::thread::spawn(move || {
for i in 0..5 {
let _ = publisher.send_copy(Tick(i));
std::thread::sleep(Duration::from_millis(20));
}
});
exec.run().unwrap();
let _ = stop;
assert!(counter.load(Ordering::SeqCst) >= 3);
}
#[test]
fn threaded_pool_executes_items_correctly() {
let mut exec = Executor::builder().worker_threads(2).build().unwrap();
let counter = Arc::new(AtomicU32::new(0));
let c = Arc::clone(&counter);
exec.add(item_with_triggers(
|d| {
d.interval(Duration::from_millis(20));
Ok(())
},
move |_| {
c.fetch_add(1, Ordering::SeqCst);
Ok(ControlFlow::Continue)
},
))
.unwrap();
exec.run_n(5).unwrap();
assert_eq!(
counter.load(Ordering::SeqCst),
5,
"threaded pool should fire item exactly 5 times under run_n(5)"
);
}
#[test]
fn item_task_id_override_takes_precedence() {
use taktora_executor::{Context, ExecuteResult};
struct NamedItem;
impl taktora_executor::ExecutableItem for NamedItem {
fn declare_triggers(&mut self, d: &mut TriggerDeclarer<'_>) -> Result<(), ExecutorError> {
d.interval(Duration::from_millis(20));
Ok(())
}
fn execute(&mut self, ctx: &mut Context<'_>) -> ExecuteResult {
ctx.stop_executor();
Ok(ControlFlow::Continue)
}
fn task_id(&self) -> Option<&str> {
Some("custom-from-item")
}
}
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let id = exec.add_with_id("user-supplied-id", NamedItem).unwrap();
assert_eq!(
id.as_str(),
"custom-from-item",
"ExecutableItem::task_id() override should win over user-supplied id"
);
exec.run().unwrap();
}
#[cfg(unix)]
#[test]
#[allow(unsafe_code)]
fn run_n_survives_eintr_from_unrelated_signals() {
use std::sync::mpsc;
extern "C" fn noop(_: libc::c_int) {}
unsafe {
let mut sa: libc::sigaction = std::mem::zeroed();
sa.sa_sigaction = noop as *const () as usize;
libc::sigemptyset(&raw mut sa.sa_mask);
sa.sa_flags = 0;
assert_eq!(
libc::sigaction(libc::SIGUSR1, &raw const sa, std::ptr::null_mut()),
0,
"sigaction(SIGUSR1) failed"
);
}
let mut exec = Executor::builder().worker_threads(0).build().unwrap();
let counter = Arc::new(AtomicU32::new(0));
let c = Arc::clone(&counter);
exec.add(item_with_triggers(
|d| {
d.interval(Duration::from_millis(5));
Ok(())
},
move |_| {
c.fetch_add(1, Ordering::SeqCst);
Ok(ControlFlow::Continue)
},
))
.unwrap();
let (tid_tx, tid_rx) = mpsc::channel();
let dispatch = std::thread::spawn(move || {
let tid = unsafe { libc::pthread_self() };
#[allow(clippy::cast_possible_truncation)]
tid_tx.send(tid as usize).expect("send tid");
exec.run_n(50).map(|()| exec)
});
let tid = tid_rx.recv().expect("recv tid") as libc::pthread_t;
for _ in 0..25 {
std::thread::sleep(Duration::from_millis(10));
unsafe {
libc::pthread_kill(tid, libc::SIGUSR1);
}
}
let result = dispatch.join().expect("dispatch thread panicked");
assert!(result.is_ok(), "run_n failed: {:?}", result.err());
assert_eq!(
counter.load(Ordering::SeqCst),
50,
"EINTR must not truncate run_n: all 50 cycles must dispatch"
);
}