// nodo_runtime 0.18.5
//
// Runtime for NODO applications
// Documentation
use nodo::{
    channels::{ConnectionCheck, FlushResult, RxBundle, SyncResult, TxBundle},
    codelet::ScheduleBuilder,
    prelude::*,
};
use nodo_runtime::Runtime;
use std::time::Duration;

/// Receiving bundle backed by a `Vec`: each entry pairs a channel name with its
/// `MessageRx<T>`, so the number of channels does not have to be fixed in the type.
struct VecRxBundle<T>(Vec<(String, MessageRx<T>)>);

impl<T: Clone + Send + Sync> VecRxBundle<T> {
    /// Builds a bundle from the given receivers.
    ///
    /// Every receiver is named after its zero-based position in `items`
    /// (`"0"`, `"1"`, ...), and the receivers keep the order in which they are yielded.
    pub fn from_iter<I>(items: I) -> Self
    where
        I: IntoIterator<Item = MessageRx<T>>,
    {
        let mut channels = Vec::new();
        for (position, receiver) in items.into_iter().enumerate() {
            channels.push((position.to_string(), receiver));
        }
        Self(channels)
    }
}

/// `RxBundle` implementation which forwards each operation to the stored receivers.
impl<T: Send + Sync> RxBundle for VecRxBundle<T> {
    /// Number of receivers stored in the bundle.
    fn channel_count(&self) -> usize {
        self.0.len()
    }

    /// Name of the receiver at `index`. Panics if `index` is out of range.
    fn name(&self, index: usize) -> &str {
        let (label, _) = &self.0[index];
        label.as_str()
    }

    /// Value of `len()` for the receiver at `index`. Panics if `index` is out of range.
    fn inbox_message_count(&self, index: usize) -> usize {
        assert!(index < self.channel_count());
        let (_, receiver) = &self.0[index];
        receiver.len()
    }

    /// Calls `sync()` on every receiver in order and stores the outcome at the
    /// matching position of `results`.
    ///
    /// Panics if `results` has fewer slots than there are receivers.
    fn sync_all(&mut self, results: &mut [SyncResult]) {
        for (slot, (_, receiver)) in self.0.iter_mut().enumerate() {
            results[slot] = receiver.sync();
        }
    }

    /// Records `is_connected()` of every receiver in a fresh `ConnectionCheck`.
    fn check_connection(&self) -> ConnectionCheck {
        let mut check = ConnectionCheck::new(self.channel_count());
        for (slot, (_, receiver)) in self.0.iter().enumerate() {
            check.mark(slot, receiver.is_connected());
        }
        check
    }
}

/// Transmitting bundle backed by a `Vec`: each entry pairs a channel name with its
/// `MessageTx<T>`, so the number of channels does not have to be fixed in the type.
struct VecTxBundle<T>(Vec<(String, MessageTx<T>)>);

impl<T: Clone + Send + Sync> VecTxBundle<T> {
    /// Builds a bundle from the given transmitters.
    ///
    /// Every transmitter is named after its zero-based position in `items`
    /// (`"0"`, `"1"`, ...), and the transmitters keep the order in which they are yielded.
    pub fn from_iter<I>(items: I) -> Self
    where
        I: IntoIterator<Item = MessageTx<T>>,
    {
        let mut channels = Vec::new();
        for (position, transmitter) in items.into_iter().enumerate() {
            channels.push((position.to_string(), transmitter));
        }
        Self(channels)
    }
}

/// `TxBundle` implementation which forwards each operation to the stored transmitters.
impl<T: Clone + Send + Sync> TxBundle for VecTxBundle<T> {
    /// Number of transmitters stored in the bundle.
    fn channel_count(&self) -> usize {
        self.0.len()
    }

    /// Name of the transmitter at `index`. Panics if `index` is out of range.
    fn name(&self, index: usize) -> &str {
        let (label, _) = &self.0[index];
        label.as_str()
    }

    /// Value of `len()` for the transmitter at `index`. Panics if `index` is out of range.
    fn outbox_message_count(&self, index: usize) -> usize {
        assert!(index < self.channel_count());
        let (_, transmitter) = &self.0[index];
        transmitter.len()
    }

    /// Calls `flush()` on every transmitter in order and stores the outcome at the
    /// matching position of `results`.
    ///
    /// Panics if `results` has fewer slots than there are transmitters.
    fn flush_all(&mut self, results: &mut [FlushResult]) {
        for (slot, (_, transmitter)) in self.0.iter_mut().enumerate() {
            results[slot] = transmitter.flush();
        }
    }

    /// Records `is_connected()` of every transmitter in a fresh `ConnectionCheck`.
    fn check_connection(&self) -> ConnectionCheck {
        let mut check = ConnectionCheck::new(self.channel_count());
        for (slot, (_, transmitter)) in self.0.iter().enumerate() {
            check.mark(slot, transmitter.is_connected());
        }
        check
    }
}

/// Payload-free message type carried by the channels of the `John` codelet.
#[derive(Clone)]
struct Ping;

/// Stateless test codelet: on each step it empties all of its receivers and pushes
/// one `Ping` onto each of its transmitters.
struct John;

/// Configuration of a `John` codelet; both counts default to zero.
#[derive(Config, Default)]
struct JohnConfig {
    /// Number of receivers created by `John::build_bundles`.
    rx_count: usize,
    /// Number of transmitters created by `John::build_bundles`.
    tx_count: usize,
}

// Counters updated by `John::step`.
// Plain `//` comments are used because this is a macro invocation.
signals! {
    JohnSignals {
        // Incremented once per transmitter on every step, just before a `Ping` is pushed.
        pub_count: usize,
        // Increased on every step by the `len()` of each receiver before it is cleared.
        recv_count: usize,
    }
}

/// Codelet behaviour of `John`: count and discard everything received, then publish
/// one `Ping` per transmitter.
impl Codelet for John {
    type Status = DefaultStatus;
    type Config = JohnConfig;
    type Rx = VecRxBundle<Ping>;
    type Tx = VecTxBundle<Ping>;
    type Signals = JohnSignals;

    /// Creates `cfg.rx_count` receivers and `cfg.tx_count` transmitters.
    fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
        let inboxes: Self::Rx = VecRxBundle::from_iter(
            std::iter::repeat_with(|| {
                MessageRx::new(OverflowPolicy::Forget(1), RetentionPolicy::Keep)
            })
            .take(cfg.rx_count),
        );
        let outboxes: Self::Tx =
            VecTxBundle::from_iter(std::iter::repeat_with(|| MessageTx::new(1)).take(cfg.tx_count));
        (inboxes, outboxes)
    }

    /// Runs one step: drains all receivers, then pushes one `Ping` per transmitter.
    ///
    /// A failing `push` is propagated through `?`, which aborts the step immediately.
    fn step(&mut self, cx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
        // Account for everything that arrived, then throw it away.
        for (_, inbox) in &mut rx.0 {
            let pending = inbox.len();
            cx.signals.recv_count.add(pending);
            inbox.clear();
        }

        // Publish one message per outgoing channel, stamped with the monotonic system clock.
        for (_, outbox) in &mut tx.0 {
            cx.signals.pub_count.increment();
            let stamp = cx.clocks.sys_mono.now();
            outbox.push(stamp, Ping)?;
        }

        SUCCESS
    }
}

/// A quite large application which can be used for performance analysis.
///
/// Creates `NODE_COUNT` `John` codelets, wires them together with up to `EDGE_COUNT`
/// pseudo-random directed connections (draws that pick the same node twice are
/// skipped), puts all of them into a single schedule and spins the runtime.
/// The random sequence is fixed, so every run builds the same graph.
#[test]
fn test_big_app() -> eyre::Result<()> {
    const NODE_COUNT: usize = 1000;
    const EDGE_COUNT: usize = 10000;

    let mut rt = Runtime::new();

    // Multiplicative congruential generator (multiplier 48271, modulus 2^31 - 1), seed 1.
    // NOTE(review): the product is computed in `usize` and can exceed `usize::MAX` on
    // 32-bit targets.
    let mut lcg_state = 1;
    let mut next_rand = || -> usize {
        lcg_state = (48271 * lcg_state) % 2147483647;
        lcg_state
    };

    let mut configs: Vec<JohnConfig> = std::iter::repeat_with(JohnConfig::default)
        .take(NODE_COUNT)
        .collect();

    // Each link is ((sender node, sender tx slot), (receiver node, receiver rx slot)).
    // The slot is the channel count *before* the link is added, i.e. the index of the
    // channel that `build_bundles` will create for it.
    let mut links = Vec::new();
    for _ in 0..EDGE_COUNT {
        let src = next_rand() % NODE_COUNT;
        let dst = next_rand() % NODE_COUNT;
        if src != dst {
            let tx_slot = configs[src].tx_count;
            let rx_slot = configs[dst].rx_count;
            configs[src].tx_count = tx_slot + 1;
            configs[dst].rx_count = rx_slot + 1;
            links.push(((src, tx_slot), (dst, rx_slot)));
        }
    }

    let mut nodes = Vec::with_capacity(configs.len());
    for (i, node_cfg) in configs.into_iter().enumerate() {
        nodes.push(John.into_instance(format!("john_{i:03}"), node_cfg));
    }

    for ((src, tx_slot), (dst, rx_slot)) in links {
        // `src != dst` holds for every stored link, so two distinct nodes are available.
        let (sender, receiver) = vec_get_mut_2(&mut nodes, src, dst).unwrap();
        connect(&mut sender.tx.0[tx_slot].1, &mut receiver.rx.0[rx_slot].1)?;
    }

    let mut schedule = ScheduleBuilder::new()
        .with_name("test_big_app")
        .with_max_step_count(250)
        .with_period(Duration::from_millis(1));
    for node in nodes {
        schedule.append(node);
    }

    rt.add_codelet_schedule(schedule);
    rt.spin();

    Ok(())
}

/// Returns mutable references to two distinct elements of `v`.
///
/// The references are returned in argument order: the first refers to `v[i]`,
/// the second to `v[j]`.
///
/// Returns `None` when `i == j` (two mutable references to the same element
/// cannot coexist) or when either index is out of range.
fn vec_get_mut_2<T>(v: &mut [T], i: usize, j: usize) -> Option<(&mut T, &mut T)> {
    let len = v.len();
    if i == j || i >= len || j >= len {
        return None;
    }

    // Split at the larger index: the smaller index then lies in `head` and the larger
    // one is the first element of `tail`, giving two non-overlapping borrows.
    if i < j {
        let (head, tail) = v.split_at_mut(j);
        Some((&mut head[i], &mut tail[0]))
    } else {
        let (head, tail) = v.split_at_mut(i);
        Some((&mut tail[0], &mut head[j]))
    }
}