use nodo::{
channels::{MessageRx, MessageTx},
codelet::ScheduleBuilder,
prelude::*,
};
use nodo_runtime::Runtime;
use std::time::Duration;
#[derive(Clone)]
pub struct Ping(String);
const NUM_MESSAGES: usize = 85;
#[derive(Default)]
struct Alice {
num_sent: usize,
}
#[derive(TxBundleDerive)]
struct AliceTx {
ping: MessageTx<Ping>,
}
impl Codelet for Alice {
type Status = DefaultStatus;
type Config = ();
type Rx = ();
type Tx = AliceTx;
type Signals = ();
fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
(
(),
AliceTx {
ping: MessageTx::new(1),
},
)
}
fn step(&mut self, cx: Context<Self>, _: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
tx.ping.push(
cx.clocks.sys_mono.now(),
Ping(format!("hello_{}", self.num_sent)),
)?;
self.num_sent += 1;
SUCCESS
}
fn stop(&mut self, _: Context<Self>, _: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
assert_eq!(self.num_sent, NUM_MESSAGES);
SUCCESS
}
}
#[derive(Default)]
struct Bob {
num_recv: usize,
}
#[derive(RxBundleDerive)]
struct BobRx {
ping: MessageRx<Ping>,
}
impl Codelet for Bob {
type Status = DefaultStatus;
type Config = ();
type Rx = BobRx;
type Tx = ();
type Signals = ();
fn build_bundles(_: &Self::Config) -> (Self::Rx, Self::Tx) {
(
BobRx {
ping: MessageRx::new(OverflowPolicy::Reject(1), RetentionPolicy::Drop),
},
(),
)
}
fn step(&mut self, _: Context<Self>, rx: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
let ping = rx.ping.pop()?;
assert_eq!(ping.value.0, format!("hello_{}", self.num_recv));
self.num_recv += 1;
SUCCESS
}
fn stop(&mut self, _: Context<Self>, _: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
assert_eq!(self.num_recv, NUM_MESSAGES);
SUCCESS
}
}
use std::sync::Once;
static INIT: Once = Once::new();
fn init_reporting() {
INIT.call_once(|| {
env_logger::init();
});
}
fn test_schedule(schedule: ScheduleBuilder) {
let mut rt = Runtime::new();
rt.add_codelet_schedule(schedule);
rt.spin();
}
#[test]
fn alice_bob_codelets() {
init_reporting();
let mut alice = Alice::instantiate("alice", ());
let mut bob = Bob::instantiate("bob", ());
connect(&mut alice.tx.ping, &mut bob.rx.ping).unwrap();
test_schedule(
ScheduleBuilder::new()
.with_period(Duration::from_millis(2))
.with_max_step_count(NUM_MESSAGES)
.with(alice)
.with(bob),
);
}
#[test]
fn alice_double_bob_codelets() {
init_reporting();
let mut alice = Alice::instantiate("alice", ());
let mut bob_1 = Bob::instantiate("bob 1", ());
let mut bob_2 = Bob::instantiate("bob 2", ());
connect(&mut alice.tx.ping, &mut bob_1.rx.ping).unwrap();
connect(&mut alice.tx.ping, &mut bob_2.rx.ping).unwrap();
test_schedule(
ScheduleBuilder::new()
.with_period(Duration::from_millis(2))
.with_max_step_count(NUM_MESSAGES)
.with(alice)
.with(bob_1)
.with(bob_2),
);
}
#[test]
fn alice_many_bobs_codelets() {
init_reporting();
let mut alice = Alice::instantiate("alice", ());
let mut bobs = (0..50)
.map(|i| Bob::instantiate(format!("bob {i}"), ()))
.collect::<Vec<_>>();
for bob in bobs.iter_mut() {
connect(&mut alice.tx.ping, &mut bob.rx.ping).unwrap();
}
let mut schedule = ScheduleBuilder::new()
.with_period(Duration::from_millis(2))
.with_max_step_count(NUM_MESSAGES)
.with(alice);
for bob in bobs.into_iter() {
schedule.append(bob);
}
test_schedule(schedule);
}