// nodo_runtime 0.18.5
//
// Runtime for NODO applications
// Documentation
// Copyright 2023 David Weikersdorfer

use nodo::{
    channels::{MessageRx, MessageTx},
    codelet::ScheduleBuilder,
    prelude::*,
};
use nodo_runtime::Runtime;
use std::time::Duration;

/// Message payload exchanged between the test codelets: a newtype around the
/// greeting text (`"hello_<n>"`) that `Alice` publishes and `Bob` checks.
#[derive(Clone)]
pub struct Ping(String);

/// Number of messages every codelet must have sent (or received) by the time
/// it is stopped; also used as the maximum step count of each test schedule.
const NUM_MESSAGES: usize = 85;

/// Sending side of the test: publishes one `Ping` per step.
#[derive(Default)]
struct Alice {
    /// How many pings have been pushed so far; also used as the suffix of the
    /// next greeting.
    num_sent: usize,
}

/// Transmit bundle of `Alice`: a single outgoing `Ping` channel.
#[derive(TxBundleDerive)]
struct AliceTx {
    ping: MessageTx<Ping>,
}

/// `Alice` is a pure producer: she has no configuration, no receive bundle and
/// no signals, and emits exactly one numbered `Ping` per step.
impl Codelet for Alice {
    type Status = DefaultStatus;
    type Config = ();
    type Rx = ();
    type Tx = AliceTx;
    type Signals = ();

    /// Creates the (empty) receive bundle and the transmit bundle.
    fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
        // NOTE(review): the `1` is presumably the transmit queue capacity — confirm
        // against the `MessageTx::new` documentation.
        let outgoing = AliceTx {
            ping: MessageTx::new(1),
        };
        ((), outgoing)
    }

    /// Pushes `hello_<num_sent>` stamped with the current monotonic system
    /// clock, then bumps the counter. A failed push is propagated via `?`, in
    /// which case the counter is left untouched.
    fn step(&mut self, cx: Context<Self>, _: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
        let stamp = cx.clocks.sys_mono.now();
        let greeting = Ping(format!("hello_{}", self.num_sent));
        tx.ping.push(stamp, greeting)?;

        self.num_sent += 1;
        SUCCESS
    }

    /// Verifies on shutdown that exactly `NUM_MESSAGES` pings were sent.
    fn stop(&mut self, _: Context<Self>, _: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
        assert_eq!(self.num_sent, NUM_MESSAGES);
        SUCCESS
    }
}

/// Receiving side of the test: consumes one `Ping` per step and checks it.
#[derive(Default)]
struct Bob {
    /// How many pings have been received so far; also the suffix expected in
    /// the next greeting.
    num_recv: usize,
}

/// Receive bundle of `Bob`: a single incoming `Ping` channel.
#[derive(RxBundleDerive)]
struct BobRx {
    ping: MessageRx<Ping>,
}

/// `Bob` is a pure consumer: he has no configuration, no transmit bundle and
/// no signals, and expects the pings to arrive in order, one per step.
impl Codelet for Bob {
    type Status = DefaultStatus;
    type Config = ();
    type Rx = BobRx;
    type Tx = ();
    type Signals = ();

    /// Creates the receive bundle and the (empty) transmit bundle.
    fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
        // NOTE(review): `Reject(1)` / `Drop` presumably mean "queue of one, refuse
        // overflow, discard leftovers" — confirm against the nodo channel docs.
        let incoming = BobRx {
            ping: MessageRx::new(OverflowPolicy::Reject(1), RetentionPolicy::Drop),
        };
        (incoming, ())
    }

    /// Pops one message (propagating a failed pop via `?`) and asserts that
    /// its text is `hello_<num_recv>` before bumping the counter.
    fn step(&mut self, _: Context<Self>, rx: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
        let received = rx.ping.pop()?;
        let expected = format!("hello_{}", self.num_recv);
        assert_eq!(received.value.0, expected);

        self.num_recv += 1;
        SUCCESS
    }

    /// Verifies on shutdown that exactly `NUM_MESSAGES` pings were received.
    fn stop(&mut self, _: Context<Self>, _: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
        assert_eq!(self.num_recv, NUM_MESSAGES);
        SUCCESS
    }
}

use std::sync::Once;

/// Guard ensuring the logger is installed at most once per test binary, even
/// though every test calls `init_reporting`.
static INIT: Once = Once::new();

/// Installs `env_logger` as the logger, at most once per process.
///
/// Safe to call from every test: all calls after the first are no-ops thanks
/// to the `INIT` guard.
fn init_reporting() {
    INIT.call_once(env_logger::init);
}

/// Runs `schedule` on a freshly created [`Runtime`].
///
/// The schedule is handed to the runtime, which is then spun; this function
/// returns once `spin` returns.
fn test_schedule(schedule: ScheduleBuilder) {
    let mut runtime = Runtime::new();

    runtime.add_codelet_schedule(schedule);
    runtime.spin();
}

/// One sender wired to one receiver: all `NUM_MESSAGES` pings must arrive in
/// order (checked by the codelets' own assertions).
#[test]
fn alice_bob_codelets() {
    init_reporting();

    let mut sender = Alice::instantiate("alice", ());
    let mut receiver = Bob::instantiate("bob", ());

    connect(&mut sender.tx.ping, &mut receiver.rx.ping).unwrap();

    // Sender is scheduled ahead of the receiver within the same schedule.
    let schedule = ScheduleBuilder::new()
        .with_period(Duration::from_millis(2))
        .with_max_step_count(NUM_MESSAGES)
        .with(sender)
        .with(receiver);

    test_schedule(schedule);
}

/// One sender fanned out to two receivers: each receiver must see every ping
/// in order (checked by the codelets' own assertions).
#[test]
fn alice_double_bob_codelets() {
    init_reporting();

    let mut sender = Alice::instantiate("alice", ());
    let mut first = Bob::instantiate("bob 1", ());
    let mut second = Bob::instantiate("bob 2", ());

    // Wire both receivers to the same transmit channel, in declaration order.
    for receiver in [&mut first, &mut second] {
        connect(&mut sender.tx.ping, &mut receiver.rx.ping).unwrap();
    }

    let schedule = ScheduleBuilder::new()
        .with_period(Duration::from_millis(2))
        .with_max_step_count(NUM_MESSAGES)
        .with(sender)
        .with(first)
        .with(second);

    test_schedule(schedule);
}

/// One sender fanned out to fifty receivers: each receiver must see every
/// ping in order (checked by the codelets' own assertions).
#[test]
fn alice_many_bobs_codelets() {
    init_reporting();

    let mut sender = Alice::instantiate("alice", ());

    // Create all receivers first, then wire them up in a second pass.
    let mut receivers: Vec<_> = (0..50)
        .map(|i| Bob::instantiate(format!("bob {i}"), ()))
        .collect();

    receivers.iter_mut().for_each(|receiver| {
        connect(&mut sender.tx.ping, &mut receiver.rx.ping).unwrap();
    });

    let mut schedule = ScheduleBuilder::new()
        .with_period(Duration::from_millis(2))
        .with_max_step_count(NUM_MESSAGES)
        .with(sender);

    // Receivers are appended after the sender, in creation order.
    for receiver in receivers {
        schedule.append(receiver);
    }

    test_schedule(schedule);
}