nodo_runtime 0.18.5

Runtime for NODO applications
Documentation
// Copyright 2025 David Weikersdorfer

use nodo::{codelet::ScheduleBuilder, monitors::MonitorStatus, prelude::*};
use nodo_runtime::Runtime;
use std::time::Duration;

struct Alice;

signals! {
    AliceSignals {
        flag: bool,
    }
}

impl Codelet for Alice {
    type Status = DefaultStatus;
    type Config = ();
    type Rx = ();
    type Tx = MessageTx<()>;
    type Signals = AliceSignals;

    fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
        ((), MessageTx::new(1))
    }

    fn step(&mut self, cx: Context<Self>, _rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
        cx.signals.flag.set(true);
        tx.push(cx.clocks.sys_mono.now(), ())?;
        SUCCESS
    }
}

struct Bob;

signals! {
    BobSignals {
        count: usize,
    }
}

impl Codelet for Bob {
    type Status = DefaultStatus;
    type Config = ();
    type Rx = MessageRx<()>;
    type Tx = ();
    type Signals = BobSignals;

    fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
        (
            MessageRx::new(OverflowPolicy::Forget(1), RetentionPolicy::Keep),
            (),
        )
    }

    fn step(&mut self, cx: Context<Self>, rx: &mut Self::Rx, _tx: &mut Self::Tx) -> Outcome {
        for _ in rx.drain(..) {
            cx.signals.count.increment();
        }
        SUCCESS
    }
}

#[test]
fn test_signals() -> eyre::Result<()> {
    const STEP_COUNT: usize = 10;

    let mut rt = Runtime::new();

    let monitor = rt.setup_monitors(AppMonitorDef::new())?;

    let mut alice = Alice.into_instance("alice", ());
    let mut bob = Bob.into_instance("bob", ());
    connect(&mut alice.tx, &mut bob.rx)?;

    let schedule = ScheduleBuilder::new()
        .with_name("test_auto_signals")
        .with_max_step_count(STEP_COUNT)
        .with_period(Duration::from_millis(1))
        .with(alice)
        .with(bob);

    rt.add_codelet_schedule(schedule);
    rt.spin();

    assert_eq!(
        monitor.get("alice", &GaugeKey::TxTotal("out".into()))?,
        Some(GaugeValue::Usize(STEP_COUNT))
    );

    assert_eq!(
        monitor.get("alice", &GaugeKey::SignalValue("flag".into()))?,
        Some(GaugeValue::Bool(true))
    );

    assert_eq!(
        monitor.get("bob", &GaugeKey::SignalValue("count".into()))?,
        Some(GaugeValue::Usize(STEP_COUNT))
    );

    assert_eq!(
        monitor.get_signal("alice", "flag")?.unwrap().value,
        SignalValue::Bool(true)
    );

    assert_eq!(
        monitor.get_signal("bob", "count")?.unwrap().value,
        SignalValue::Usize(STEP_COUNT)
    );

    Ok(())
}

#[test]
fn test_signals_fail() -> eyre::Result<()> {
    const STEP_COUNT: usize = 10;

    let mut rt = Runtime::new();

    let mut monitor_def = AppMonitorDef::new();
    monitor_def.push(
        "bob",
        GaugeKey::SignalValue("count".into()),
        "",
        monitors::Equals::new(STEP_COUNT + 1),
    )?;

    let monitor = rt.setup_monitors(monitor_def)?;

    let mut alice = Alice.into_instance("alice", ());
    let mut bob = Bob.into_instance("bob", ());
    connect(&mut alice.tx, &mut bob.rx)?;

    let schedule = ScheduleBuilder::new()
        .with_name("test_auto_signals")
        .with_max_step_count(STEP_COUNT)
        .with_period(Duration::from_millis(1))
        .with(alice)
        .with(bob)
        .into();

    rt.add_codelet_schedule(schedule);
    rt.spin();

    assert_eq!(monitor.status()?, MonitorStatus::Critical);

    assert_eq!(
        monitor.get_failed_observables()?,
        vec![("bob".into(), GaugeKey::SignalValue("count".into()))]
    );

    Ok(())
}