nodo_runtime/
inspector.rs

1// Copyright 2023 David Weikersdorfer
2
3use eyre::Result;
4use lz4_flex::{compress_prepend_size, decompress_size_prepended};
5use nng::{
6    options::{protocol::pubsub::Subscribe, Options},
7    Protocol, Socket,
8};
9use nodo::{
10    codelet::{NodeletId, Statistics},
11    prelude::DefaultStatus,
12};
13use serde::{Deserialize, Serialize};
14use std::{collections::HashMap, time::Instant};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct RenderedStatus {
18    pub label: String,
19    pub status: DefaultStatus,
20}
21
22#[derive(Default, Debug, Clone, Serialize, Deserialize)]
23pub struct InspectorReport(HashMap<NodeletId, InspectorCodeletReport>);
24
25impl InspectorReport {
26    pub fn push(&mut self, id: NodeletId, entry: InspectorCodeletReport) {
27        if self.0.contains_key(&id) {
28            log::error!(
29                "Duplicated codelet id: {:?} (name='{}', other='{}'). This will be a hard error in the future.",
30                id,
31                entry.name,
32                self.0[&id].name
33            );
34        }
35        self.0.insert(id, entry);
36    }
37
38    pub fn extend(&mut self, other: InspectorReport) {
39        for (id, entry) in other.0 {
40            self.push(id, entry);
41        }
42    }
43
44    pub fn contains_key(&self, nid: NodeletId) -> bool {
45        self.0.contains_key(&nid)
46    }
47
48    pub fn into_vec(self) -> Vec<(NodeletId, InspectorCodeletReport)> {
49        self.0.into_iter().collect()
50    }
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct InspectorCodeletReport {
55    pub sequence: String,
56    pub name: String,
57    pub typename: String,
58    pub status: Option<RenderedStatus>,
59    pub statistics: Statistics,
60}
61
62/// The server is running in the nodo runtime and publishes reports
63pub struct InspectorServer {
64    socket: Socket,
65}
66
67impl InspectorServer {
68    pub fn open(address: &str) -> Result<Self> {
69        log::info!("Opening Inspector PUB socket at '{}'..", address);
70
71        let socket = Socket::new(Protocol::Pub0)?;
72
73        socket.pipe_notify(move |_, ev| {
74            log::trace!("pipe_notify: {ev:?}");
75        })?;
76
77        socket.listen(address)?;
78
79        Ok(Self { socket })
80    }
81
82    pub fn send_report(&self, report: InspectorReport) -> Result<()> {
83        let buffer = bincode::serialize(&report)?;
84        let compressed = compress_prepend_size(&buffer);
85        self.socket.send(&compressed).map_err(|(_, err)| err)?;
86        Ok(())
87    }
88}
89
90/// The client is running in the report viewer and receives reports
91pub struct InspectorClient {
92    socket: Socket,
93    datarate: DatarateEstimation,
94}
95
96impl InspectorClient {
97    pub fn dial(address: &str) -> Result<Self> {
98        log::info!("Opening Inspector SUB socket at '{}'..", address);
99
100        let socket = Socket::new(Protocol::Sub0)?;
101
102        socket.pipe_notify(move |_, ev| {
103            log::trace!("pipe_notify: {ev:?}");
104        })?;
105
106        socket.dial_async(address)?;
107
108        // subscribe to all topics
109        socket.set_opt::<Subscribe>(vec![])?;
110
111        Ok(Self {
112            socket,
113            datarate: DatarateEstimation::default(),
114        })
115    }
116
117    pub fn try_recv_report(&mut self) -> Result<Option<InspectorReport>> {
118        let mut maybe_buff = None;
119        loop {
120            match self.socket.try_recv() {
121                Ok(buff) => {
122                    self.datarate.push(buff.len() as u64);
123                    maybe_buff = Some(buff);
124                }
125                Err(nng::Error::TryAgain) => break,
126                Err(err) => return Err(err)?,
127            }
128        }
129
130        if let Some(buff) = maybe_buff {
131            let uncompressed = decompress_size_prepended(&buff)?;
132            Ok(Some(bincode::deserialize(&uncompressed)?))
133        } else {
134            Ok(None)
135        }
136    }
137
138    pub fn datarate(&self) -> f64 {
139        self.datarate.datarate()
140    }
141}
142
143#[derive(Default)]
144pub struct DatarateEstimation {
145    total_bytes_received: u64,
146    datarate: f64,
147    last_step: Option<Instant>,
148    bytes_since_last_step: u64,
149}
150
151impl DatarateEstimation {
152    pub fn push(&mut self, len: u64) {
153        self.bytes_since_last_step += len;
154        self.total_bytes_received += len;
155
156        let now = Instant::now();
157        if let Some(prev) = self.last_step {
158            let dt = (now - prev).as_secs_f64();
159            if dt > 3.0 {
160                self.last_step = Some(now);
161                self.datarate =
162                    0.2 * self.datarate + 0.8 * (self.bytes_since_last_step as f64) / dt;
163                self.bytes_since_last_step = 0;
164            }
165        } else {
166            self.last_step = Some(now);
167        }
168    }
169
170    /// Datarate in bytes/s
171    pub fn datarate(&self) -> f64 {
172        self.datarate
173    }
174}