use nodo::{
channels::{ConnectionCheck, FlushResult, RxBundle, SyncResult, TxBundle},
codelet::ScheduleBuilder,
prelude::*,
};
use nodo_runtime::Runtime;
use std::time::Duration;
struct VecRxBundle<T>(Vec<(String, MessageRx<T>)>);
impl<T: Clone + Send + Sync> VecRxBundle<T> {
pub fn from_iter<I>(items: I) -> Self
where
I: IntoIterator<Item = MessageRx<T>>,
{
Self(
items
.into_iter()
.enumerate()
.map(|(i, rx)| (format!("{}", i), rx))
.collect(),
)
}
}
impl<T: Send + Sync> RxBundle for VecRxBundle<T> {
fn channel_count(&self) -> usize {
self.0.len()
}
fn name(&self, index: usize) -> &str {
&self.0[index].0
}
fn inbox_message_count(&self, index: usize) -> usize {
assert!(index < self.channel_count());
self.0[index].1.len()
}
fn sync_all(&mut self, results: &mut [SyncResult]) {
for i in 0..self.channel_count() {
results[i] = self.0[i].1.sync();
}
}
fn check_connection(&self) -> ConnectionCheck {
let mut cc = ConnectionCheck::new(self.channel_count());
for i in 0..self.channel_count() {
cc.mark(i, self.0[i].1.is_connected());
}
cc
}
}
struct VecTxBundle<T>(Vec<(String, MessageTx<T>)>);
impl<T: Clone + Send + Sync> VecTxBundle<T> {
pub fn from_iter<I>(items: I) -> Self
where
I: IntoIterator<Item = MessageTx<T>>,
{
Self(
items
.into_iter()
.enumerate()
.map(|(i, rx)| (format!("{}", i), rx))
.collect(),
)
}
}
impl<T: Clone + Send + Sync> TxBundle for VecTxBundle<T> {
fn channel_count(&self) -> usize {
self.0.len()
}
fn name(&self, index: usize) -> &str {
&self.0[index].0
}
fn outbox_message_count(&self, index: usize) -> usize {
assert!(index < self.channel_count());
self.0[index].1.len()
}
fn flush_all(&mut self, results: &mut [FlushResult]) {
for i in 0..self.channel_count() {
results[i] = self.0[i].1.flush();
}
}
fn check_connection(&self) -> ConnectionCheck {
let mut cc = ConnectionCheck::new(self.channel_count());
for i in 0..self.channel_count() {
cc.mark(i, self.0[i].1.is_connected());
}
cc
}
}
#[derive(Clone)]
struct Ping;
struct John;
#[derive(Config, Default)]
struct JohnConfig {
rx_count: usize,
tx_count: usize,
}
signals! {
JohnSignals {
pub_count: usize,
recv_count: usize,
}
}
impl Codelet for John {
type Status = DefaultStatus;
type Config = JohnConfig;
type Rx = VecRxBundle<Ping>;
type Tx = VecTxBundle<Ping>;
type Signals = JohnSignals;
fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
(
VecRxBundle::from_iter(
(0..cfg.rx_count)
.map(|_| MessageRx::new(OverflowPolicy::Forget(1), RetentionPolicy::Keep)),
),
VecTxBundle::from_iter((0..cfg.tx_count).map(|_| MessageTx::new(1))),
)
}
fn step(&mut self, cx: Context<Self>, rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
for (_, rx) in rx.0.iter_mut() {
cx.signals.recv_count.add(rx.len());
rx.clear();
}
for (_, tx) in tx.0.iter_mut() {
cx.signals.pub_count.increment();
tx.push(cx.clocks.sys_mono.now(), Ping)?;
}
SUCCESS
}
}
#[test]
fn test_big_app() -> eyre::Result<()> {
let mut rt = Runtime::new();
const NODE_COUNT: usize = 1000;
const EDGE_COUNT: usize = 10000;
let mut rnd_x = 1;
let mut rnd_f = || -> usize {
rnd_x = (48271 * rnd_x) % 2147483647;
rnd_x
};
let mut cfg = (0..NODE_COUNT)
.map(|_| JohnConfig::default())
.collect::<Vec<_>>();
let mut edges = Vec::new();
for _ in 0..EDGE_COUNT {
let ai = rnd_f() % NODE_COUNT;
let bi = rnd_f() % NODE_COUNT;
if ai == bi {
continue;
}
let ac = cfg[ai].tx_count;
let bc = cfg[bi].rx_count;
cfg[ai].tx_count += 1;
cfg[bi].rx_count += 1;
edges.push(((ai, ac), (bi, bc)));
}
let mut nodes = cfg
.into_iter()
.enumerate()
.map(|(i, cfg)| John.into_instance(format!("john_{i:03}"), cfg))
.collect::<Vec<_>>();
for ((ai, ac), (bi, bc)) in edges {
let (a, b) = vec_get_mut_2(&mut nodes, ai, bi).unwrap();
connect(&mut a.tx.0[ac].1, &mut b.rx.0[bc].1)?;
}
let mut schedule = ScheduleBuilder::new()
.with_name("test_big_app")
.with_max_step_count(250)
.with_period(Duration::from_millis(1));
for node in nodes {
schedule.append(node);
}
rt.add_codelet_schedule(schedule);
rt.spin();
Ok(())
}
fn vec_get_mut_2<T>(v: &mut [T], i: usize, j: usize) -> Option<(&mut T, &mut T)> {
if i < j {
let (head, tail) = v.split_at_mut(j);
Some((&mut head[i], &mut tail[0]))
} else if j < i {
let (head, tail) = v.split_at_mut(i);
Some((&mut tail[0], &mut head[j]))
} else {
None
}
}