use crate::NngPubSubHeader;
use log::{error, info, trace};
use nng::{Protocol, Socket};
use nodo::{
core::{Topic, WithTopic},
prelude::*,
};
use std::{collections::HashMap, time::Instant};
pub struct NngPub {
socket: Option<Socket>,
statistics: Option<Statistics>,
}
#[derive(Config)]
pub struct NngPubConfig {
pub address: String,
pub queue_size: usize,
pub enable_statistics: bool,
}
#[derive(Default)]
pub struct Statistics {
items: HashMap<String, TopicStatistics>,
last_sec: Option<Instant>,
}
impl Statistics {
pub fn add(&mut self, topic: &str, size: usize) {
if let Some(item) = self.items.get_mut(topic) {
item.add(size);
} else {
let mut item = TopicStatistics::default();
item.add(size);
self.items.insert(topic.into(), item);
}
}
pub fn step(&mut self) {
let now = Instant::now();
if self.last_sec.is_none() {
self.last_sec = Some(now);
}
let delta = (now - self.last_sec.unwrap()).as_secs_f64();
if delta >= 1.0 {
for (_, item) in self.items.iter_mut() {
item.reset_sec();
}
self.last_sec = Some(now);
self.print_report();
}
}
pub fn print_report(&self) {
println!("NngPub statistics:");
for (topic, item) in self.items.iter() {
println!(
" [{topic}] {} Hz | {:.1} kB/s",
item.last_sec_count,
item.last_sec_size as f64 / 1024.
);
}
}
}
#[derive(Default)]
pub struct TopicStatistics {
total_size: usize,
total_count: usize,
current_sec_size: usize,
current_sec_count: usize,
last_sec_size: usize,
last_sec_count: usize,
}
impl TopicStatistics {
pub fn add(&mut self, size: usize) {
self.total_size += size;
self.total_count += 1;
self.current_sec_size += size;
self.current_sec_count += 1;
}
pub fn reset_sec(&mut self) {
self.last_sec_size = self.current_sec_size;
self.last_sec_count = self.current_sec_count;
self.current_sec_size = 0;
self.current_sec_count = 0;
}
}
impl Default for NngPub {
fn default() -> Self {
Self {
socket: None,
statistics: None,
}
}
}
impl Codelet for NngPub {
type Status = DefaultStatus;
type Config = NngPubConfig;
type Rx = DoubleBufferRx<Message<WithTopic<Vec<u8>>>>;
type Tx = ();
type Signals = ();
fn build_bundles(cfg: &Self::Config) -> (Self::Rx, Self::Tx) {
(
DoubleBufferRx::new(
OverflowPolicy::Forget(cfg.queue_size),
RetentionPolicy::Drop,
),
(),
)
}
fn start(&mut self, cx: Context<Self>, _: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
info!("Opening PUB socket at '{}'..", cx.config.address);
let socket = Socket::new(Protocol::Pub0)?;
socket.pipe_notify(move |_, ev| {
trace!("nng::socket::pipe_notify: {ev:?}");
})?;
let res = socket.listen(&cx.config.address);
if let Err(err) = res {
error!(" {err:?}");
res?;
}
self.socket = Some(socket);
if cx.config.enable_statistics {
self.statistics = Some(Statistics::default());
}
SUCCESS
}
fn stop(&mut self, _: Context<Self>, _: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
let socket = self.socket.take().unwrap();
socket.close();
SUCCESS
}
fn step(&mut self, _: Context<Self>, rx: &mut Self::Rx, _: &mut Self::Tx) -> Outcome {
let socket = self.socket.as_mut().unwrap();
let mut count = 0;
while let Some(message) = rx.try_pop() {
let topic_buffer = serialize_topic(&message.value.topic);
let header = NngPubSubHeader {
magic: NngPubSubHeader::MAGIC,
seq: message.seq.try_into()?,
stamp: message.stamp,
payload_checksum: NngPubSubHeader::CRC.checksum(&message.value.value),
};
let header_buffer = bincode::serialize(&header)?;
let outmsg_size = topic_buffer.len() + header_buffer.len() + message.value.value.len();
let mut outmsg = nng::Message::with_capacity(outmsg_size);
outmsg.push_back(&topic_buffer);
outmsg.push_back(&header_buffer);
outmsg.push_back(&message.value.value);
socket.send(outmsg).map_err(|(_, err)| err)?;
count += 1;
if let Some(stats) = self.statistics.as_mut() {
let topic_str: String = (&message.value.topic).into();
stats.add(&topic_str, outmsg_size);
}
}
if let Some(stats) = self.statistics.as_mut() {
stats.step();
}
if count > 0 {
SUCCESS
} else {
SKIPPED
}
}
}
fn serialize_topic(topic: &Topic) -> Vec<u8> {
let mut out = match topic {
Topic::Text(text) => text.as_bytes().to_vec(),
Topic::Id(id) => id.to_string().as_bytes().to_vec(),
};
assert!(out.iter().all(|&b| b != 0));
out.push(0);
out
}