use crate::proto::nodo as nodo_pb;
use eyre::Result;
use lz4_flex::{compress_prepend_size, decompress_size_prepended};
use nng::{
options::{protocol::pubsub::Subscribe, Options},
Protocol, Socket,
};
use prost::Message;
use std::time::Instant;
/// Publishing side of the inspector channel.
///
/// Encodes reports as protobuf, LZ4-compresses them (with the uncompressed
/// size prepended) and sends them over an nng `Pub0` socket.
pub struct InspectorServer {
    /// Listening `Pub0` socket the reports are sent on.
    socket: Socket,
    /// Scratch space for the encoded (uncompressed) report, reused across calls.
    buffer: Vec<u8>,
}

impl InspectorServer {
    /// Initial capacity, in bytes, of the reusable encode buffer.
    const INITIAL_BUFFER_CAPACITY: usize = 1000;

    /// Creates a `Pub0` socket and starts listening on `address`.
    ///
    /// Pipe events on the socket are logged at trace level.
    ///
    /// # Errors
    ///
    /// Fails if the socket cannot be created, the pipe notification callback
    /// cannot be registered, or listening on `address` fails.
    pub fn open(address: &str) -> Result<Self> {
        log::info!("Opening Inspector PUB socket at '{}'..", address);

        let socket = Socket::new(Protocol::Pub0)?;
        socket.pipe_notify(move |_, ev| log::trace!("pipe_notify: {ev:?}"))?;
        socket.listen(address)?;

        let buffer = Vec::with_capacity(Self::INITIAL_BUFFER_CAPACITY);
        Ok(Self { socket, buffer })
    }

    /// Encodes, compresses and sends a single report.
    ///
    /// # Errors
    ///
    /// Fails if the report cannot be encoded or the socket rejects the message.
    pub fn send_report(&mut self, report: nodo_pb::Report) -> Result<()> {
        // Reuse the scratch buffer so the encode step does not allocate each call.
        self.buffer.clear();
        report.encode(&mut self.buffer)?;

        let payload = compress_prepend_size(&self.buffer);
        match self.socket.send(&payload) {
            Ok(()) => Ok(()),
            // On failure nng hands the unsent message back; only the error is kept.
            Err((_unsent, cause)) => Err(cause.into()),
        }
    }
}
/// Subscribing side of the inspector channel.
///
/// Receives LZ4-compressed protobuf reports over an nng `Sub0` socket and
/// tracks the incoming data rate.
pub struct InspectorClient {
    /// `Sub0` socket the reports are received on.
    socket: Socket,
    /// Estimate of the incoming byte rate, fed with every received message.
    datarate: DatarateEstimation,
}

impl InspectorClient {
    /// Creates a `Sub0` socket, subscribes with an empty topic and starts an
    /// asynchronous dial to `address`.
    ///
    /// Pipe events on the socket are logged at trace level.
    ///
    /// # Errors
    ///
    /// Fails if the socket cannot be created, the pipe notification callback
    /// cannot be registered, the subscription cannot be set, or the dial
    /// cannot be started.
    pub fn dial(address: &str) -> Result<Self> {
        log::info!("Opening Inspector SUB socket at '{}'..", address);
        let socket = Socket::new(Protocol::Sub0)?;
        socket.pipe_notify(move |_, ev| {
            log::trace!("pipe_notify: {ev:?}");
        })?;
        // Subscribe *before* dialing: a SUB socket without a subscription drops
        // every incoming message, so installing the (empty, match-all) topic
        // first ensures nothing is discarded once the connection comes up.
        socket.set_opt::<Subscribe>(vec![])?;
        socket.dial_async(address)?;
        Ok(Self {
            socket,
            datarate: DatarateEstimation::default(),
        })
    }

    /// Drains all messages currently available without blocking and returns
    /// the most recent one, decompressed and decoded.
    ///
    /// Older messages received in the same call are discarded, but the size of
    /// every received message is still fed into the data rate estimate.
    /// Returns `Ok(None)` when no message was available.
    ///
    /// # Errors
    ///
    /// Fails on any receive error other than `nng::Error::TryAgain`, or if the
    /// latest message cannot be decompressed or decoded.
    pub fn try_recv_report(&mut self) -> Result<Option<nodo_pb::Report>> {
        let mut latest = None;
        loop {
            match self.socket.try_recv() {
                Ok(msg) => {
                    self.datarate.push(msg.len() as u64);
                    latest = Some(msg);
                }
                // Nothing more queued right now.
                Err(nng::Error::TryAgain) => break,
                Err(err) => return Err(err.into()),
            }
        }

        match latest {
            Some(msg) => {
                let uncompressed = decompress_size_prepended(&msg)?;
                Ok(Some(nodo_pb::Report::decode(uncompressed.as_slice())?))
            }
            None => Ok(None),
        }
    }

    /// Returns the current estimate of the incoming data rate.
    pub fn datarate(&self) -> f64 {
        self.datarate.datarate()
    }
}
/// Smoothed estimate of a byte rate, updated from the sizes passed to
/// [`DatarateEstimation::push`].
///
/// Bytes are accumulated over windows longer than one second; each closed
/// window is blended 50/50 with the previous estimate. The estimate is only
/// refreshed from within `push`, so it keeps its last value while nothing is
/// pushed.
#[derive(Default)]
pub struct DatarateEstimation {
    /// Sum of all lengths ever pushed.
    total_bytes_received: u64,
    /// Current smoothed rate in bytes per second; `0.0` until a window closes.
    datarate: f64,
    /// Start of the current measurement window; `None` before the first push.
    last_step: Option<Instant>,
    /// Bytes accumulated in the current measurement window.
    bytes_since_last_step: u64,
}

impl DatarateEstimation {
    /// Records `len` received bytes and, once more than one second has passed
    /// since the current window started, folds the window into the estimate.
    pub fn push(&mut self, len: u64) {
        self.bytes_since_last_step += len;
        self.total_bytes_received += len;

        let now = Instant::now();
        let window_start = match self.last_step {
            Some(start) => start,
            None => {
                // First sample: it only opens the measurement window.
                self.last_step = Some(now);
                return;
            }
        };

        let elapsed = now.duration_since(window_start).as_secs_f64();
        if elapsed <= 1.0 {
            // Window still too short to produce a meaningful rate.
            return;
        }

        // Close the window: read and reset the byte counter in one step.
        let window_bytes = std::mem::take(&mut self.bytes_since_last_step) as f64;
        self.datarate = 0.5 * self.datarate + 0.5 * window_bytes / elapsed;
        self.last_step = Some(now);
    }

    /// Returns the current smoothed rate in bytes per second.
    pub fn datarate(&self) -> f64 {
        self.datarate
    }
}