use crate::NngPubSubHeader;
use eyre::eyre;
use log::{error, info, trace};
use nng::{
options::{protocol::pubsub::Subscribe, Options},
Protocol, Socket,
};
use nodo::{
core::{Topic, WithTopic},
prelude::*,
};
/// Codelet which receives messages from an NNG SUB socket and forwards the decoded payloads,
/// tagged with their topic, on its TX channel.
pub struct NngSub {
/// The SUB socket; `None` until `start` has succeeded and again after `stop`.
socket: Option<Socket>,
/// Number of messages which were successfully parsed and pushed to TX so far.
message_count: usize,
}
/// Configuration for [`NngSub`].
#[derive(Config)]
pub struct NngSubConfig {
/// Address the SUB socket dials when the codelet starts.
pub address: String,
/// Size passed to the TX channel and the maximum number of messages forwarded per step.
/// Must be at least 1.
pub queue_size: usize,
}
impl Default for NngSub {
    /// Creates a subscriber with no open socket and a message counter of zero.
    fn default() -> Self {
        let socket = None;
        let message_count = 0;
        Self {
            socket,
            message_count,
        }
    }
}
impl Codelet for NngSub {
type Status = DefaultStatus;
type Config = NngSubConfig;
type Rx = ();
type Tx = DoubleBufferTx<Message<WithTopic<Vec<u8>>>>;
type Signals = ();
fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
assert!(cfg.queue_size > 0, "queue_size must be at least 1");
((), DoubleBufferTx::new(cfg.queue_size))
}
fn start(&mut self, cx: Context<Self>, _: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
info!("Opening SUB socket at '{}'..", cx.config.address);
let socket = Socket::new(Protocol::Sub0)?;
socket.pipe_notify(move |_, ev| {
trace!("nng::socket::pipe_notify: {ev:?}");
})?;
let res = socket.dial_async(&cx.config.address);
socket.set_opt::<Subscribe>(vec![])?;
if let Err(err) = res {
error!(" {err:?}");
res?;
}
self.socket = Some(socket);
SUCCESS
}
fn stop(&mut self, _cx: Context<Self>, _: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
let socket = self.socket.take().unwrap();
socket.close();
SUCCESS
}
fn step(&mut self, cx: Context<Self>, _rx: &mut Self::Rx, tx: &mut Self::Tx) -> Outcome {
let socket = self.socket.as_mut().unwrap();
let mut received_count = 0;
loop {
if received_count == cx.config.queue_size {
log::warn!(
"codelet can not keep up with reading data from socket. considering increasing queue size"
);
break;
}
match socket.try_recv() {
Ok(buff) => match Self::parse(buff) {
Ok(msg) => {
tx.push(msg)?;
self.message_count += 1;
received_count += 1;
}
Err(err) => {
log::error!("{err:?}");
}
},
Err(nng::Error::TryAgain) => {
break;
}
Err(err) => Err(err)?,
}
}
if received_count > 0 {
SUCCESS
} else {
SKIPPED
}
}
}
impl NngSub {
    /// Decodes a raw NNG message into a stamped, topic-tagged payload.
    ///
    /// The message is read as: a NUL-terminated UTF-8 topic, followed by
    /// `NngPubSubHeader::BINCODE_SIZE` bytes of bincode-encoded header, followed by the payload.
    ///
    /// # Errors
    /// Returns an error if the topic is not NUL-terminated or not valid UTF-8, if the message is
    /// too short to contain the header, if the header cannot be deserialized, if the header magic
    /// does not match, or if the payload checksum differs from the one in the header.
    fn parse(msg: nng::Message) -> eyre::Result<Message<WithTopic<Vec<u8>>>> {
        let data = msg.as_slice();
        let (cstr, data) = parse_cstr(data)?;
        let topic: Topic = cstr.into();

        // The data comes from the network: a truncated message must produce an error, not an
        // out-of-bounds panic when slicing out the header.
        if data.len() < NngPubSubHeader::BINCODE_SIZE {
            return Err(eyre!(
                "message too short for header: expected at least {} bytes, got {}",
                NngPubSubHeader::BINCODE_SIZE,
                data.len()
            ));
        }
        let (header_bytes, payload) = data.split_at(NngPubSubHeader::BINCODE_SIZE);

        let header: NngPubSubHeader = bincode::deserialize(header_bytes)?;
        if header.magic != NngPubSubHeader::MAGIC {
            return Err(eyre!("invalid header magic"));
        }

        // Verify the checksum on the borrowed bytes so rejected messages are never copied.
        let checksum = NngPubSubHeader::CRC.checksum(payload);
        if header.payload_checksum != checksum {
            return Err(eyre!(
                "payload failed checksum test: expected={}, actual={}",
                header.payload_checksum,
                checksum
            ));
        }

        Ok(Message {
            seq: header.seq,
            stamp: header.stamp,
            value: WithTopic {
                topic,
                value: payload.to_vec(),
            },
        })
    }
}
/// Splits `utf8_src` at its first NUL byte.
///
/// Returns the bytes before the NUL decoded as UTF-8 together with the bytes after the NUL.
///
/// # Errors
/// Returns an error if `utf8_src` contains no NUL byte or if the bytes before it are not valid
/// UTF-8.
fn parse_cstr(utf8_src: &[u8]) -> eyre::Result<(&str, &[u8])> {
    let nul_at = match utf8_src.iter().position(|&byte| byte == b'\0') {
        Some(index) => index,
        None => return Err(eyre!("null terminator not found")),
    };
    // `tail` starts with the NUL byte itself, so it always holds at least one element.
    let (head, tail) = utf8_src.split_at(nul_at);
    let text = ::std::str::from_utf8(head)?;
    Ok((text, &tail[1..]))
}