use core::time::Duration;
use nodo::{
codelet::{CodeletInstance, ScheduleBuilder},
prelude::*,
};
use nodo_std::{Serializer, SerializerConfig, TopicJoin, TopicJoinConfig};
use serde::{Deserialize, Serialize};
mod bincode_format;
mod r#pub;
mod snappy_bincode_format;
mod sub;
pub use bincode_format::*;
pub use r#pub::*;
pub use snappy_bincode_format::*;
pub use sub::*;
/// Metadata record describing a single pub/sub message.
///
/// The struct derives `Serialize`/`Deserialize`, so the declaration order of
/// the fields determines the order in which they are encoded.
/// NOTE(review): presumably written ahead of every payload by the `pub`/`sub`
/// codelets — confirm against those modules.
#[derive(Debug, Serialize, Deserialize)]
pub struct NngPubSubHeader {
/// Marker value; presumably expected to equal [`NngPubSubHeader::MAGIC`] — TODO confirm.
pub magic: u64,
/// Sequence number of the message.
pub seq: u64,
/// Timestamp information of the message.
pub stamp: Stamp,
/// Checksum of the payload; presumably computed with [`NngPubSubHeader::CRC`] — TODO confirm.
pub payload_checksum: u32,
}
impl NngPubSubHeader {
    /// Constant associated with the `magic` header field.
    ///
    /// NOTE(review): presumably the value a valid header must carry in
    /// `magic` — confirm where headers are written and validated.
    pub const MAGIC: u64 = 0x90D0_ABCD_ABCD_90D0;

    /// CRC-32 engine configured with the `CRC_32_AUTOSAR` parameter set of
    /// the `crc` crate.
    pub const CRC: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_AUTOSAR);

    /// NOTE(review): presumably the byte length of a bincode-encoded header;
    /// it would have to be revisited whenever the header fields change —
    /// TODO confirm against the encoding configuration in use.
    pub const BINCODE_SIZE: usize = 44;
}
/// Bundles the codelets needed to publish several topics through one
/// [`NngPub`] instance.
///
/// Every topic registered via `publish` gets its own serializer codelet whose
/// output is routed through the shared `join` codelet into `nng_pub`.
pub struct Publisher {
// Prefix used when naming the codelets created by this publisher.
tag: String,
// Topic join codelet; its output is connected to the input of `nng_pub`.
join: CodeletInstance<TopicJoin<Vec<u8>>>,
// Codelet configured with the address passed to `new`.
nng_pub: CodeletInstance<NngPub>,
// Schedule builder that receives the serializer codelets created by `publish`.
schedule_builder: ScheduleBuilder,
}
impl Publisher {
pub fn new(tag: &str, address: &str) -> Self {
let mut join = TopicJoin::instantiate(format!("{tag}_join"), TopicJoinConfig::default());
let mut nng_pub = NngPub::instantiate(
format!("{tag}_nng_pub"),
NngPubConfig {
address: address.to_string(),
queue_size: 24,
enable_statistics: false,
},
);
connect(&mut join.tx, &mut nng_pub.rx).unwrap(); Self {
tag: tag.to_string(),
join,
nng_pub,
schedule_builder: nodo::codelet::ScheduleBuilder::new()
.with_name("vis")
.with_period(Duration::from_millis(10)),
}
}
pub fn schedule_builder_mut(&mut self) -> &mut ScheduleBuilder {
&mut self.schedule_builder
}
pub fn publish<T>(
&mut self,
topic: &str,
tx: &mut DoubleBufferTx<Message<T>>,
) -> eyre::Result<()>
where
T: Clone + Send + Sync + Serialize + for<'a> Deserialize<'a> + 'static,
{
let mut ser = Serializer::new(Bincode::default()).into_instance(
format!("{}_ser_{topic}", self.tag),
SerializerConfig::default(),
);
connect(tx, &mut ser.rx)?;
connect(&mut ser.tx, &mut self.join.rx.add(topic.into()))?;
self.schedule_builder.append(ser);
Ok(())
}
pub fn into_sequence(self) -> Sequence {
Sequence::new().with(self.join).with(self.nng_pub)
}
}
#[cfg(test)]
mod tests {
    use crate::{Bincode, NngPub, NngPubConfig, NngSub, NngSubConfig};
    use core::time::Duration;
    use nodo::{core::WithTopic, prelude::*};
    use nodo_runtime::Runtime;
    use nodo_std::{
        Deserializer, DeserializerConfig, Log, Pipe, PipeConfig, Serializer, SerializerConfig,
        Sink, Source,
    };
    use serde::{Deserialize, Serialize};
    use std::sync::{Arc, RwLock};

    /// Sends `Foo` messages through serializer -> `NngPub` -> `NngSub` ->
    /// deserializer on a local TCP address and stops the runtime once
    /// `MESSAGE_COUNT` messages have been received.
    #[test]
    fn test_pub_sub() {
        // `init()` panics when a logger is already installed (e.g. by another
        // test in the same binary), so a failed initialization is ignored.
        let _ = env_logger::try_init();

        #[derive(Debug, Clone, Serialize, Deserialize)]
        struct Foo {
            number: u32,
        }

        const ADDRESS: &str = "tcp://127.0.0.1:7789";
        const MESSAGE_COUNT: usize = 25;

        let mut rt = Runtime::new();

        // Source of messages with strictly increasing sequence numbers,
        // starting at 1.
        let mut tx_counter = 0;
        let mut issue = Source::new(move || {
            tx_counter += 1;
            Message {
                seq: tx_counter,
                stamp: Stamp {
                    acqtime: Duration::from_millis(1000 + tx_counter).into(),
                    pubtime: Duration::from_millis(tx_counter).into(),
                },
                value: Foo {
                    number: tx_counter as u32,
                },
            }
        })
        .into_instance("issue", ());

        let mut ser = Serializer::new(Bincode::default())
            .into_instance("ser", SerializerConfig { queue_size: 1 });

        // Tags every serialized payload with the topic "test".
        let mut add_topic = Pipe::new(|msg: Message<Vec<u8>>| {
            msg.map(|value| WithTopic {
                topic: "test".into(),
                value,
            })
        })
        .into_instance("add_topic", PipeConfig::Dynamic);

        let mut alice = NngPub::instantiate(
            "alice",
            NngPubConfig {
                address: ADDRESS.to_string(),
                queue_size: 10,
                enable_statistics: false,
            },
        );

        let mut bob = NngSub::instantiate(
            "bob",
            NngSubConfig {
                address: ADDRESS.to_string(),
                queue_size: 10,
            },
        );

        // Strips the topic again on the receiving side. The instance gets its
        // own name instead of reusing "add_topic".
        let mut rmv_topic =
            Pipe::new(|msg: Message<WithTopic<Vec<u8>>>| msg.map(|WithTopic { value, .. }| value))
                .into_instance("rmv_topic", PipeConfig::Dynamic);

        let mut de = Deserializer::<Foo, _>::new(Bincode::default())
            .into_instance("de", DeserializerConfig { queue_size: 1 });

        let mut log = Log::instantiate("log", ());

        // Counts received messages and requests a stop once enough arrived.
        let rx_counter = Arc::new(RwLock::new(0));
        let mut check = {
            let rx_counter = rx_counter.clone();
            let ctrl = rt.tx_control();
            Sink::new(move |foo: Message<Foo>| {
                // Each received number must exceed the count of messages
                // received so far.
                assert!(foo.value.number as usize > *rx_counter.read().unwrap());
                *rx_counter.write().unwrap() += 1;
                if *rx_counter.read().unwrap() == MESSAGE_COUNT {
                    ctrl.send(RuntimeControl::RequestStop)?;
                }
                SUCCESS
            })
            .into_instance("check", ())
        };

        // Sending side.
        connect(&mut issue.tx, &mut ser.rx).unwrap();
        connect(&mut ser.tx, &mut add_topic.rx).unwrap();
        connect(&mut add_topic.tx, &mut alice.rx).unwrap();

        // Receiving side; the deserializer feeds both the log and the check.
        connect(&mut bob.tx, &mut rmv_topic.rx).unwrap();
        connect(&mut rmv_topic.tx, &mut de.rx).unwrap();
        connect(&mut de.tx, &mut log.rx).unwrap();
        connect(&mut de.tx, &mut check.rx).unwrap();

        rt.add_codelet_schedule(
            nodo::codelet::ScheduleBuilder::new()
                .with_period(Duration::from_millis(1))
                .with(issue)
                .with(ser)
                .with(add_topic)
                .with(alice)
                .with(bob)
                .with(rmv_topic)
                .with(de)
                .with(log)
                .with(check)
                .into(),
        );

        rt.spin();

        assert_eq!(*rx_counter.read().unwrap(), MESSAGE_COUNT);
    }
}