taktora-executor 0.1.2

Execution framework for iceoryx2-based Rust applications.
Documentation
#![allow(missing_docs)]
use iceoryx2::prelude::*;
use std::sync::Arc;
use taktora_executor::Channel;

static SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

fn unique(prefix: &str) -> String {
    let n = SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    format!("{prefix}.{}.{n}", std::process::id())
}

#[derive(Debug, Default, Clone, Copy, ZeroCopySend)]
#[repr(C)]
struct Msg(u64);

#[test]
fn publisher_send_notifies_subscriber_listener() {
    let node = NodeBuilder::new().create::<ipc::Service>().unwrap();

    let channel: Arc<Channel<Msg>> =
        Channel::open_or_create(&node, &unique("taktora.test.chan")).unwrap();

    let publisher = channel.publisher().unwrap();
    let subscriber = channel.subscriber().unwrap();

    let outcome = publisher.send_copy(Msg(42)).unwrap();
    assert!(outcome.sent);
    assert_eq!(outcome.listeners_notified, 1, "1 subscriber attached");

    // The subscriber's listener fires because Publisher::send notified.
    let listener = subscriber.listener_handle();
    let mut woke = 0_u32;
    while let Ok(Some(_)) = listener.try_wait_one() {
        woke += 1;
    }
    assert!(woke >= 1, "subscriber listener did not fire");

    let sample = subscriber.take().unwrap().expect("payload");
    assert_eq!(sample.payload().0, 42);
}

#[test]
fn opening_same_channel_twice_does_not_panic() {
    let node = NodeBuilder::new().create::<ipc::Service>().unwrap();
    let chan_name = unique("taktora.test.chan2");
    let _a: Arc<Channel<Msg>> = Channel::open_or_create(&node, &chan_name).unwrap();
    let _b: Arc<Channel<Msg>> = Channel::open_or_create(&node, &chan_name).unwrap();
    // No assertion — the call must not panic and must not deadlock.
}

#[test]
fn publisher_loan_zero_copy_round_trip() {
    use core::mem::MaybeUninit;

    let node = NodeBuilder::new().create::<ipc::Service>().unwrap();
    let channel: Arc<Channel<Msg>> =
        Channel::open_or_create(&node, &unique("taktora.test.loan")).unwrap();

    let publisher = channel.publisher().unwrap();
    let subscriber = channel.subscriber().unwrap();

    let outcome = publisher
        .loan(|slot: &mut MaybeUninit<Msg>| {
            slot.write(Msg(99));
            true
        })
        .unwrap();
    assert!(
        outcome.delivered_to_any_listener(),
        "expected delivery to at least one listener"
    );

    let sample = subscriber.take().unwrap().expect("payload");
    assert_eq!(sample.payload().0, 99);
}

#[test]
fn publisher_loan_skip_returns_false() {
    use core::mem::MaybeUninit;

    let node = NodeBuilder::new().create::<ipc::Service>().unwrap();
    let channel: Arc<Channel<Msg>> =
        Channel::open_or_create(&node, &unique("taktora.test.loan_skip")).unwrap();

    let publisher = channel.publisher().unwrap();
    let subscriber = channel.subscriber().unwrap();

    let outcome = publisher
        .loan(|_slot: &mut MaybeUninit<Msg>| {
            // Closure decides not to send. Note: not initialising the slot is
            // safe here because we return false (no `assume_init` happens).
            false
        })
        .unwrap();
    assert!(!outcome.sent);
    assert_eq!(outcome.listeners_notified, 0);

    let received = subscriber.take().unwrap();
    assert!(received.is_none(), "no message should have been sent");
}