taktora-executor 0.1.2

Execution framework for iceoryx2-based Rust applications.
Documentation
#![allow(missing_docs)]

use core::time::Duration;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use taktora_executor::{ControlFlow, Executor, item, item_with_triggers};

/// Checks that a chain of three items runs head, then mid, then tail.
///
/// Each item appends its own tag (1, 2, 3) to a shared, mutex-protected log;
/// after a single `run_n(1)` the log must read exactly `[1, 2, 3]`.
#[test]
fn chain_runs_items_in_order() {
    // Interval passed to `d.interval` in the head item's first closure.
    const TICK: Duration = Duration::from_millis(10);
    type BoxedItem = Box<dyn taktora_executor::ExecutableItem>;

    let mut executor = Executor::builder().worker_threads(2).build().unwrap();
    let order: Arc<std::sync::Mutex<Vec<u32>>> = Arc::default();

    // One handle on the shared log per item; each closure takes ownership of its own.
    let for_head = Arc::clone(&order);
    let for_mid = Arc::clone(&order);
    let for_tail = Arc::clone(&order);

    let head = item_with_triggers(
        |d| {
            d.interval(TICK);
            Ok(())
        },
        move |_| {
            for_head.lock().unwrap().push(1);
            Ok(ControlFlow::Continue)
        },
    );
    let mid = item(move |_| {
        for_mid.lock().unwrap().push(2);
        Ok(ControlFlow::Continue)
    });
    let tail = item(move |_| {
        for_tail.lock().unwrap().push(3);
        Ok(ControlFlow::Continue)
    });

    let links: Vec<BoxedItem> = vec![Box::new(head), Box::new(mid), Box::new(tail)];
    executor.add_chain(links).unwrap();

    executor.run_n(1).unwrap();

    // Snapshot the log so the lock is released before comparing.
    let recorded: Vec<u32> = order.lock().unwrap().clone();
    assert_eq!(recorded, [1, 2, 3]);
}

/// Checks that a head item returning `ControlFlow::StopChain` keeps the
/// following item from running.
///
/// Both items bump the same atomic counter; after a single `run_n(1)` the
/// counter must be 1, i.e. only the head ran.
#[test]
fn stop_chain_aborts_remaining_items() {
    // Interval passed to `d.interval` in the head item's first closure.
    const TICK: Duration = Duration::from_millis(10);
    type BoxedItem = Box<dyn taktora_executor::ExecutableItem>;

    let mut executor = Executor::builder().worker_threads(0).build().unwrap();
    let runs = Arc::new(AtomicU32::new(0));

    let head_runs = Arc::clone(&runs);
    let tail_runs = Arc::clone(&runs);

    let head = item_with_triggers(
        |d| {
            d.interval(TICK);
            Ok(())
        },
        move |_| {
            head_runs.fetch_add(1, Ordering::SeqCst);
            // Ask the executor to stop the chain after this item.
            Ok(ControlFlow::StopChain)
        },
    );
    let tail = item(move |_| {
        tail_runs.fetch_add(1, Ordering::SeqCst);
        Ok(ControlFlow::Continue)
    });

    let links: Vec<BoxedItem> = vec![Box::new(head), Box::new(tail)];
    executor.add_chain(links).unwrap();

    executor.run_n(1).unwrap();

    let executed = runs.load(Ordering::SeqCst);
    assert_eq!(executed, 1, "tail must not run after StopChain");
}

/// Checks that an error returned by the middle item of a chain is reported by
/// `run_n` and that the tail item does not run.
///
/// The middle item fails with an I/O error carrying the text `mid-err`; the
/// test expects that text in the error's display output and a tail counter
/// that is still zero.
#[test]
fn err_in_middle_propagates_and_stops() {
    // Interval passed to `d.interval` in the head item's first closure.
    const TICK: Duration = Duration::from_millis(10);
    // Message carried by the failing middle item and searched for afterwards.
    const MID_ERR: &str = "mid-err";
    type BoxedItem = Box<dyn taktora_executor::ExecutableItem>;

    let mut executor = Executor::builder().worker_threads(0).build().unwrap();

    // Counts tail invocations; must stay at zero.
    let tail_hits = Arc::new(AtomicU32::new(0));
    let tail_handle = Arc::clone(&tail_hits);

    let head = item_with_triggers(
        |d| {
            d.interval(TICK);
            Ok(())
        },
        |_| Ok(ControlFlow::Continue),
    );
    let mid = item(|_| Err(Box::new(std::io::Error::other(MID_ERR))));
    let tail = item(move |_| {
        tail_handle.fetch_add(1, Ordering::SeqCst);
        Ok(ControlFlow::Continue)
    });

    let links: Vec<BoxedItem> = vec![Box::new(head), Box::new(mid), Box::new(tail)];
    executor.add_chain(links).unwrap();

    let failure = executor.run_n(1).expect_err("expected chain error");
    let rendered = failure.to_string();
    assert!(rendered.contains(MID_ERR));
    assert_eq!(tail_hits.load(Ordering::SeqCst), 0);
}