rns-embedded-runtime 0.5.1

Runtime support layer for embedded Reticulum transports and alloc-backed targets.
Documentation
#[cfg(test)]
mod tests {
    use alloc::vec;

    use super::{
        BleNodeBackendConfig, BroadcastOptions, EmbeddedNode, NodeBackendConfig, NodeConfig,
        NodeError, NodeEventKind, NodeLogLevel, NodeRunState, NodeTransportMode, PollResult,
        SendOptions,
    };
    use crate::{CaptureDefaults, RuntimeConfig};
    use rns_embedded_core::packet::decode_frame;
    use rns_embedded_core::transport::LinkState;

    #[cfg(feature = "std")]
    use std::{thread, time::Duration};

    /// Builds the baseline `NodeConfig` used by the tests in this module.
    ///
    /// The runtime section selects `NodeTransportMode::BleOnly`, a 1000 ms
    /// announce interval, `max_outbound_queue` of 8 and `max_events` of 16; the
    /// backend is the default BLE backend configuration.
    fn config() -> NodeConfig {
        // Fixed byte patterns keep the identity and address recognisable.
        let runtime = RuntimeConfig {
            store_identity: [0x21; 32],
            lxmf_address: [0x42; 16],
            node_mode: NodeTransportMode::BleOnly,
            announce_interval_ms: 1_000,
            max_outbound_queue: 8,
            max_events: 16,
            capture_defaults: CaptureDefaults::default(),
        };
        let backend = NodeBackendConfig::Ble(BleNodeBackendConfig::default());

        NodeConfig { runtime, backend }
    }

    /// Starts a node, brings the link up, sends one message and checks that two
    /// outbound wire frames can be taken and decoded: first an LXMF message
    /// frame, then an announce frame.
    ///
    /// Also checks the send receipt (epoch 1, one target), the status after the
    /// send (`Running`, epoch 1) and that the first signal seen by the
    /// subscription is `NodeRestarted { epoch: 1 }`.
    #[test]
    fn node_starts_sends_and_exposes_ble_wire() {
        let node = EmbeddedNode::new();
        let events = node.subscribe_events().expect("subscribe");
        // Before `start` the node reports itself as stopped.
        assert_eq!(node.get_status().run_state, NodeRunState::Stopped);

        node.start(config()).expect("start");
        node.set_link_state(LinkState::Up).expect("link up");

        let destination = [0x99; 16];
        let sent = node.send(destination, b"hello", SendOptions).expect("send");
        assert_eq!(sent.epoch, 1);
        assert_eq!(sent.target_count, 1);

        // Presumably lets the runtime process the queued message: `std` builds
        // wait 60 ms, other builds drive a single manual tick instead.
        #[cfg(feature = "std")]
        thread::sleep(Duration::from_millis(60));
        #[cfg(not(feature = "std"))]
        node.tick(0).expect("tick");

        // Both frames are taken before either one is decoded.
        let message_wire = node.take_outbound_wire().expect("take outbound").expect("frame");
        let announce_wire = node.take_outbound_wire().expect("take outbound").expect("frame");
        let message_frame = decode_frame(&message_wire).expect("decode first");
        let announce_frame = decode_frame(&announce_wire).expect("decode second");
        assert_eq!(message_frame.kind, crate::FRAME_KIND_LXMF_MESSAGE);
        assert_eq!(announce_frame.kind, crate::FRAME_KIND_ANNOUNCE);

        let running = node.get_status();
        assert_eq!(running.run_state, NodeRunState::Running);
        assert_eq!(running.epoch, 1);

        let signal = events.next(0).expect("poll");
        assert_eq!(signal, PollResult::NodeRestarted { epoch: 1 });
    }

    /// Checks that `restart` moves the reported epoch from 1 to 2, that the
    /// subscription sees `NodeRestarted { epoch: 1 }`, `NodeStopped` and
    /// `NodeRestarted { epoch: 2 }` in that order, and that calling `stop` twice
    /// succeeds and leaves the node `Stopped` at epoch 2.
    #[test]
    fn restart_increments_epoch_and_stop_is_idempotent() {
        let node = EmbeddedNode::new();
        let events = node.subscribe_events().expect("subscribe");

        node.start(config()).expect("start");
        assert_eq!(node.get_status().epoch, 1);

        node.restart(config()).expect("restart");
        assert_eq!(node.get_status().epoch, 2);

        // One poll per expected signal, in the order they were emitted.
        for expected in [
            PollResult::NodeRestarted { epoch: 1 },
            PollResult::NodeStopped,
            PollResult::NodeRestarted { epoch: 2 },
        ] {
            assert_eq!(events.next(0).expect("signal"), expected);
        }

        // The second `stop` must succeed as well.
        node.stop().expect("stop");
        node.stop().expect("stop twice");

        let stopped = node.get_status();
        assert_eq!(stopped.run_state, NodeRunState::Stopped);
        assert_eq!(stopped.epoch, 2);
    }

    /// Checks that a broadcast with default options is rejected with
    /// `NodeError::InvalidConfig`, that a broadcast to two destinations reports
    /// two targets and two pending outbound entries, and that the log level set
    /// via `set_log_level` is reflected in the status.
    #[test]
    fn broadcast_requires_destinations_and_tracks_log_level() {
        let node = EmbeddedNode::new();
        node.start(config()).expect("start");
        node.set_log_level(NodeLogLevel::Debug).expect("log level");

        // Default options (presumably an empty destination list) must fail.
        let rejected = node.broadcast(b"hello", BroadcastOptions::default());
        assert_eq!(rejected.expect_err("empty broadcast"), NodeError::InvalidConfig);

        let destinations = vec![[0x11; 16], [0x22; 16]];
        let accepted =
            node.broadcast(b"hello", BroadcastOptions { destinations }).expect("broadcast");
        assert_eq!(accepted.target_count, 2);

        // Each check reads a fresh status snapshot.
        assert_eq!(node.get_status().log_level, NodeLogLevel::Debug);
        assert_eq!(node.get_status().pending_outbound, 2);
    }

    /// Exercises queue-pressure handling in two scenarios.
    ///
    /// 1. With `max_outbound_queue` lowered to 1, the first `send` succeeds and
    ///    the second is rejected with `NodeError::QueuePressure`, leaving
    ///    `pending_outbound` at 1.
    /// 2. With the baseline config (queue limit 8), a broadcast to 9
    ///    destinations is rejected with `QueuePressure` and `pending_outbound`
    ///    stays at 0.
    #[test]
    fn queue_pressure_is_stable_and_broadcast_is_all_or_nothing() {
        let mut tight = config();
        tight.runtime.max_outbound_queue = 1;

        let tight_node = EmbeddedNode::new();
        tight_node.start(tight).expect("start");

        let accepted = tight_node.send([0xAA; 16], b"one", SendOptions).expect("first send");
        assert_eq!(accepted.target_count, 1);
        assert_eq!(tight_node.get_status().pending_outbound, 1);

        // The queue is full now; the rejected send must not change its length.
        let overflow = tight_node.send([0xBB; 16], b"two", SendOptions);
        assert_eq!(overflow.expect_err("queue pressure"), NodeError::QueuePressure);
        assert_eq!(tight_node.get_status().pending_outbound, 1);

        tight_node.stop().expect("stop");

        // Second scenario: a separate node running the baseline config.
        let roomy_node = EmbeddedNode::new();
        roomy_node.start(config()).expect("start");

        // One destination more than the baseline queue limit of 8.
        let destinations = vec![[0x11; 16]; 9];
        let rejected = roomy_node.broadcast(b"hello", BroadcastOptions { destinations });
        assert_eq!(rejected.expect_err("broadcast queue pressure"), NodeError::QueuePressure);
        assert_eq!(roomy_node.get_status().pending_outbound, 0);
    }

    /// Checks that a subscription created before `start` first yields
    /// `NodeRestarted { epoch: 1 }`, then a `StatusChanged` event, and that after
    /// `close` a further poll returns `PollResult::Closed`.
    #[test]
    fn subscriptions_observe_runtime_events_and_close() {
        let node = EmbeddedNode::new();
        let events = node.subscribe_events().expect("subscribe");
        node.start(config()).expect("start");
        node.set_link_state(LinkState::Up).expect("link up");

        // `std` builds wait 60 ms; other builds drive one manual tick instead.
        #[cfg(feature = "std")]
        thread::sleep(Duration::from_millis(60));
        #[cfg(not(feature = "std"))]
        node.tick(0).expect("tick");

        let restarted = events.next(0).expect("restart");
        assert!(matches!(restarted, PollResult::NodeRestarted { epoch: 1 }));

        // Only the event kind is checked; the remaining fields are ignored.
        let status_event = events.next(0).expect("event");
        assert!(matches!(
            status_event,
            PollResult::Event(crate::node::NodeEvent {
                kind: NodeEventKind::StatusChanged { .. },
                ..
            })
        ));

        events.close().expect("close");
        let after_close = events.next(0).expect("closed");
        assert_eq!(after_close, PollResult::Closed);
    }

    /// Polls a subscription on a node that was never started, passing
    /// `u64::MAX` as the timeout argument, and expects `PollResult::Timeout`.
    ///
    /// NOTE(review): going by the test name, this guards against overflow in the
    /// timeout computation; confirm against the `next` implementation.
    #[cfg(feature = "std")]
    #[test]
    fn subscription_timeout_overflow_returns_timeout() {
        let node = EmbeddedNode::new();
        let events = node.subscribe_events().expect("subscribe");

        let outcome = events.next(u64::MAX).expect("overflow timeout");
        assert_eq!(outcome, PollResult::Timeout);
    }

    /// Checks that, in a `std` build, calling `tick` on a started node fails
    /// with `NodeError::ModeConflict`.
    #[cfg(feature = "std")]
    #[test]
    fn managed_mode_rejects_manual_tick() {
        let node = EmbeddedNode::new();
        node.start(config()).expect("start");

        let rejection = node.tick(0).expect_err("mode conflict");
        assert_eq!(rejection, NodeError::ModeConflict);
    }

    /// Checks that a thread calling `next` with a 5000 timeout on a cloned
    /// subscription returns `PollResult::NodeStopped` once the node is stopped.
    #[cfg(feature = "std")]
    #[test]
    fn blocked_next_wakes_on_stop() {
        let node = EmbeddedNode::new();
        let events = node.subscribe_events().expect("subscribe");
        node.start(config()).expect("start");
        thread::sleep(Duration::from_millis(60));

        let startup = events.next(0).expect("restart");
        assert!(matches!(startup, PollResult::NodeRestarted { .. }));
        // Discard whatever else is pending until a poll reports `Timeout`.
        loop {
            if matches!(events.next(0).expect("drain"), PollResult::Timeout) {
                break;
            }
        }

        let blocked = events.clone();
        let waiter = thread::spawn(move || blocked.next(5_000).expect("waiter"));
        // Presumably gives the waiter time to enter its wait before stopping.
        thread::sleep(Duration::from_millis(50));
        node.stop().expect("stop");

        let woke_with = waiter.join().expect("join");
        assert_eq!(woke_with, PollResult::NodeStopped);
    }

    /// Checks that a thread calling `next` with a 5000 timeout on a cloned
    /// subscription returns `PollResult::NodeStopped` when the node is
    /// restarted, and that the original handle then yields
    /// `NodeRestarted { epoch: 2 }`.
    #[cfg(feature = "std")]
    #[test]
    fn blocked_next_wakes_on_restart() {
        let node = EmbeddedNode::new();
        let events = node.subscribe_events().expect("subscribe");
        node.start(config()).expect("start");
        thread::sleep(Duration::from_millis(60));

        let startup = events.next(0).expect("restart");
        assert!(matches!(startup, PollResult::NodeRestarted { .. }));
        // Discard whatever else is pending until a poll reports `Timeout`.
        loop {
            if matches!(events.next(0).expect("drain"), PollResult::Timeout) {
                break;
            }
        }

        let blocked = events.clone();
        let waiter = thread::spawn(move || blocked.next(5_000).expect("waiter"));
        // Presumably gives the waiter time to enter its wait before restarting.
        thread::sleep(Duration::from_millis(50));
        node.restart(config()).expect("restart");

        // The waiter receives the stop signal; the restart signal stays queued.
        let woke_with = waiter.join().expect("join");
        assert_eq!(woke_with, PollResult::NodeStopped);

        let follow_up = events.next(0).expect("restart signal");
        assert!(matches!(follow_up, PollResult::NodeRestarted { epoch: 2 }));
    }

    /// Checks that a thread calling `next` with a 5000 timeout on a cloned
    /// subscription returns `PollResult::Closed` once the original handle is
    /// closed.
    #[cfg(feature = "std")]
    #[test]
    fn closing_subscription_wakes_blocked_waiter() {
        let node = EmbeddedNode::new();
        let events = node.subscribe_events().expect("subscribe");
        node.start(config()).expect("start");
        thread::sleep(Duration::from_millis(60));

        let startup = events.next(0).expect("restart");
        assert!(matches!(startup, PollResult::NodeRestarted { .. }));
        // Discard whatever else is pending until a poll reports `Timeout`.
        loop {
            if matches!(events.next(0).expect("drain"), PollResult::Timeout) {
                break;
            }
        }

        let blocked = events.clone();
        let waiter = thread::spawn(move || blocked.next(5_000).expect("waiter"));
        // Presumably gives the waiter time to enter its wait before closing.
        thread::sleep(Duration::from_millis(50));
        events.close().expect("close");

        let woke_with = waiter.join().expect("join");
        assert_eq!(woke_with, PollResult::Closed);
    }
}