nodo_runtime 0.18.5

Runtime for NODO applications
Documentation
// Copyright 2023 David Weikersdorfer

use crate::proto::nodo as nodo_pb;
use eyre::Result;
use lz4_flex::{compress_prepend_size, decompress_size_prepended};
use nng::{
    options::{protocol::pubsub::Subscribe, Options},
    Protocol, Socket,
};
use prost::Message;
use std::time::Instant;

/// The server is running in the nodo runtime and publishes reports
pub struct InspectorServer {
    socket: Socket,
    buffer: Vec<u8>,
}

impl InspectorServer {
    pub fn open(address: &str) -> Result<Self> {
        log::info!("Opening Inspector PUB socket at '{}'..", address);

        let socket = Socket::new(Protocol::Pub0)?;

        socket.pipe_notify(move |_, ev| {
            log::trace!("pipe_notify: {ev:?}");
        })?;

        socket.listen(address)?;

        Ok(Self {
            socket,
            buffer: Vec::with_capacity(1000),
        })
    }

    pub fn send_report(&mut self, report: nodo_pb::Report) -> Result<()> {
        self.buffer.clear();
        report.encode(&mut self.buffer)?;
        let compressed = compress_prepend_size(&self.buffer);
        self.socket.send(&compressed).map_err(|(_, err)| err)?;
        Ok(())
    }
}

/// The client is running in the report viewer and receives reports
pub struct InspectorClient {
    socket: Socket,
    datarate: DatarateEstimation,
}

impl InspectorClient {
    pub fn dial(address: &str) -> Result<Self> {
        log::info!("Opening Inspector SUB socket at '{}'..", address);

        let socket = Socket::new(Protocol::Sub0)?;

        socket.pipe_notify(move |_, ev| {
            log::trace!("pipe_notify: {ev:?}");
        })?;

        socket.dial_async(address)?;

        // subscribe to all topics
        socket.set_opt::<Subscribe>(vec![])?;

        Ok(Self {
            socket,
            datarate: DatarateEstimation::default(),
        })
    }

    pub fn try_recv_report(&mut self) -> Result<Option<nodo_pb::Report>> {
        let mut maybe_buff = None;
        loop {
            match self.socket.try_recv() {
                Ok(buff) => {
                    self.datarate.push(buff.len() as u64);
                    maybe_buff = Some(buff);
                }
                Err(nng::Error::TryAgain) => break,
                Err(err) => return Err(err)?,
            }
        }

        if let Some(buff) = maybe_buff {
            let uncompressed = decompress_size_prepended(&buff)?;
            Ok(Some(nodo_pb::Report::decode(uncompressed.as_slice())?))
        } else {
            Ok(None)
        }
    }

    pub fn datarate(&self) -> f64 {
        self.datarate.datarate()
    }
}

#[derive(Default)]
pub struct DatarateEstimation {
    total_bytes_received: u64,
    datarate: f64,
    last_step: Option<Instant>,
    bytes_since_last_step: u64,
}

impl DatarateEstimation {
    pub fn push(&mut self, len: u64) {
        self.bytes_since_last_step += len;
        self.total_bytes_received += len;

        let now = Instant::now();
        if let Some(prev) = self.last_step {
            let dt = (now - prev).as_secs_f64();
            if dt > 1.0 {
                self.last_step = Some(now);
                self.datarate =
                    0.5 * self.datarate + 0.5 * (self.bytes_since_last_step as f64) / dt;
                self.bytes_since_last_step = 0;
            }
        } else {
            self.last_step = Some(now);
        }
    }

    /// Datarate in bytes/s
    pub fn datarate(&self) -> f64 {
        self.datarate
    }
}