nodo_runtime 0.18.5

Runtime for NODO applications
Documentation
use argh::FromArgs;
use core::time::Duration;
use eyre::bail;
use nodo::{codelet::ScheduleBuilder, prelude::*};
use nodo_runtime::Runtime;
use nodo_std::{Sink, SourceWithError};
use std::thread;

// Command-line configuration of the demo. The `///` comments below are picked
// up by the `argh` derive and shown as help text, so explanatory notes that
// should not appear in `--help` are written as plain `//` comments.
#[derive(FromArgs)]
/// NODO demo app
pub struct Args {
    // Upper bound of the outermost loop in `main`: one schedule is built per index.
    /// number of schedules
    #[argh(option, default = "1")]
    pub schedule_count: usize,

    // Upper bound of the middle loop in `main`: sequences appended to each schedule.
    /// number of sequences per schedule
    #[argh(option, default = "1")]
    pub sequence_count: usize,

    // Upper bound of the innermost loop in `main`: each "node" is an alice/bob pair.
    /// number of nodes per sequence
    #[argh(option, default = "1")]
    pub node_count: usize,

    // Declared as an `option` (not a `switch`), so a value is presumably expected
    // on the command line, e.g. `--simulate-error true` -- confirm against argh docs.
    /// if enabled a node will error after a while
    #[argh(option, default = "false")]
    pub simulate_error: bool,

    // Same `option` form as `simulate_error`; checked by the "bob" sink in `main`.
    /// if enabled a node will panic after a while
    #[argh(option, default = "false")]
    pub simulate_panic: bool,
}

/// Entry point of the demo: builds the codelet graph requested on the command
/// line and hands it to the runtime.
///
/// For every schedule index a `ScheduleBuilder` is created whose period is
/// `10 ms * (index + 1)`. Each schedule receives `sequence_count` sequences,
/// each sequence receives `node_count` connected "alice" (source) / "bob"
/// (sink) pairs, and finally one `CustomStatusCodelet` is appended per schedule.
///
/// # Errors
///
/// Propagates the error of `enable_inspector` and of `connect`.
fn main() -> eyre::Result<()> {
    env_logger::init();

    let args: Args = argh::from_env();

    // Copy the flags out once so the worker closures below own plain booleans.
    let fail_with_error = args.simulate_error;
    let fail_with_panic = args.simulate_panic;

    let mut runtime = Runtime::new();

    runtime.enable_inspector("tcp://localhost:54399")?;

    for schedule_idx in 0..args.schedule_count {
        // Later schedules tick more slowly: 10 ms, 20 ms, 30 ms, ...
        let period = Duration::from_millis(10 * (1 + schedule_idx) as u64);
        let mut schedule = ScheduleBuilder::new()
            .with_name(format!("schedule {schedule_idx}"))
            .with_period(period);

        for sequence_idx in 0..args.sequence_count {
            let mut sequence = Sequence::new().with_name(format!("sequence {sequence_idx}"));

            for node_idx in 0..args.node_count {
                // Suffix shared by both codelets of this node, e.g. "0_1_2".
                let tag = format!("{schedule_idx}_{sequence_idx}_{node_idx}");

                // Simulated workload: 0.5 ms per unit of (schedule + sequence + node) index.
                let work_time = Duration::from_secs_f32(
                    (schedule_idx + sequence_idx + node_idx) as f32 * 0.5 / 1000.0,
                );

                // Source side: optionally starts returning an error after 100 invocations.
                let mut produced = 0;
                let mut alice = SourceWithError::new(move || {
                    produced += 1;

                    if fail_with_error && produced > 100 {
                        bail!("alice doesn't want anymore");
                    }
                    thread::sleep(work_time);

                    Ok(())
                })
                .into_instance(format!("alice {tag}"), ());

                // Sink side: optionally panics after 150 invocations.
                let mut consumed = 0;
                let mut bob = Sink::new(move |_| {
                    consumed += 1;

                    if fail_with_panic && consumed > 150 {
                        panic!("bob doesn't want anymore");
                    }

                    thread::sleep(work_time);

                    SUCCESS
                })
                .into_instance(format!("bob {tag}"), ());

                connect(&mut alice.tx, &mut bob.rx)?;

                sequence.append((alice, bob));
            }

            schedule.append(sequence);
        }

        // One status-cycling codelet per schedule, appended after all sequences.
        let status_cycler =
            CustomStatusCodelet::instantiate(format!("custom_{schedule_idx}"), ());
        schedule.append(status_cycler);

        runtime.add_codelet_schedule(schedule);
    }

    runtime.enable_terminate_on_ctrl_c();

    runtime.spin();

    Ok(())
}

#[derive(Default)]
/// Demo codelet that reports a different custom status on every step.
///
/// The wrapped `usize` is a step counter that `step` cycles through the
/// values 0..=3; `Default` starts it at 0.
struct CustomStatusCodelet(usize);

// Status values reported by `CustomStatusCodelet::step`, one per counter value.
// Plain `//` comments are used here so the `Status` derive sees exactly the
// same attributes as before.
#[derive(Status)]
enum CustomStatusCodeletStatus {
    // Marked as the default status; returned when the counter is 0.
    #[default]
    AsUsual,

    // Returned when the counter is 1.
    Foo,

    // Returned when the counter is 2.
    Bar,

    // Returned when the counter is 3.
    // NOTE(review): `#[skipped]` presumably tells the runtime to treat this
    // status as a skipped step -- confirm against the nodo `Status` derive docs.
    #[skipped]
    NoStep,
}

impl Codelet for CustomStatusCodelet {
    type Config = ();
    type Rx = ();
    type Tx = ();
    type Status = CustomStatusCodeletStatus;
    type Signals = ();

    fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
        ((), ())
    }

    fn step(
        &mut self,
        _cx: Context<Self>,
        _rx: &mut Self::Rx,
        _tx: &mut Self::Tx,
    ) -> eyre::Result<Self::Status> {
        self.0 = if self.0 == 3 { 0 } else { self.0 + 1 };
        Ok(match self.0 {
            0 => CustomStatusCodeletStatus::AsUsual,
            1 => CustomStatusCodeletStatus::Foo,
            2 => CustomStatusCodeletStatus::Bar,
            3 => CustomStatusCodeletStatus::NoStep,
            _ => unreachable!(),
        })
    }
}