// nodo_runtime/inspector.rs
use eyre::Result;
4use lz4_flex::{compress_prepend_size, decompress_size_prepended};
5use nng::{
6 options::{protocol::pubsub::Subscribe, Options},
7 Protocol, Socket,
8};
9use nodo::{
10 codelet::{NodeletId, Statistics},
11 prelude::DefaultStatus,
12};
13use serde::{Deserialize, Serialize};
14use std::{collections::HashMap, time::Instant};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct RenderedStatus {
18 pub label: String,
19 pub status: DefaultStatus,
20}
21
22#[derive(Default, Debug, Clone, Serialize, Deserialize)]
23pub struct InspectorReport(HashMap<NodeletId, InspectorCodeletReport>);
24
25impl InspectorReport {
26 pub fn push(&mut self, id: NodeletId, entry: InspectorCodeletReport) {
27 if self.0.contains_key(&id) {
28 log::error!(
29 "Duplicated codelet id: {:?} (name='{}', other='{}'). This will be a hard error in the future.",
30 id,
31 entry.name,
32 self.0[&id].name
33 );
34 }
35 self.0.insert(id, entry);
36 }
37
38 pub fn extend(&mut self, other: InspectorReport) {
39 for (id, entry) in other.0 {
40 self.push(id, entry);
41 }
42 }
43
44 pub fn contains_key(&self, nid: NodeletId) -> bool {
45 self.0.contains_key(&nid)
46 }
47
48 pub fn into_vec(self) -> Vec<(NodeletId, InspectorCodeletReport)> {
49 self.0.into_iter().collect()
50 }
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct InspectorCodeletReport {
55 pub sequence: String,
56 pub name: String,
57 pub typename: String,
58 pub status: Option<RenderedStatus>,
59 pub statistics: Statistics,
60}
61
62pub struct InspectorServer {
64 socket: Socket,
65}
66
67impl InspectorServer {
68 pub fn open(address: &str) -> Result<Self> {
69 log::info!("Opening Inspector PUB socket at '{}'..", address);
70
71 let socket = Socket::new(Protocol::Pub0)?;
72
73 socket.pipe_notify(move |_, ev| {
74 log::trace!("pipe_notify: {ev:?}");
75 })?;
76
77 socket.listen(address)?;
78
79 Ok(Self { socket })
80 }
81
82 pub fn send_report(&self, report: InspectorReport) -> Result<()> {
83 let buffer = bincode::serialize(&report)?;
84 let compressed = compress_prepend_size(&buffer);
85 self.socket.send(&compressed).map_err(|(_, err)| err)?;
86 Ok(())
87 }
88}
89
90pub struct InspectorClient {
92 socket: Socket,
93 datarate: DatarateEstimation,
94}
95
96impl InspectorClient {
97 pub fn dial(address: &str) -> Result<Self> {
98 log::info!("Opening Inspector SUB socket at '{}'..", address);
99
100 let socket = Socket::new(Protocol::Sub0)?;
101
102 socket.pipe_notify(move |_, ev| {
103 log::trace!("pipe_notify: {ev:?}");
104 })?;
105
106 socket.dial_async(address)?;
107
108 socket.set_opt::<Subscribe>(vec![])?;
110
111 Ok(Self {
112 socket,
113 datarate: DatarateEstimation::default(),
114 })
115 }
116
117 pub fn try_recv_report(&mut self) -> Result<Option<InspectorReport>> {
118 let mut maybe_buff = None;
119 loop {
120 match self.socket.try_recv() {
121 Ok(buff) => {
122 self.datarate.push(buff.len() as u64);
123 maybe_buff = Some(buff);
124 }
125 Err(nng::Error::TryAgain) => break,
126 Err(err) => return Err(err)?,
127 }
128 }
129
130 if let Some(buff) = maybe_buff {
131 let uncompressed = decompress_size_prepended(&buff)?;
132 Ok(Some(bincode::deserialize(&uncompressed)?))
133 } else {
134 Ok(None)
135 }
136 }
137
138 pub fn datarate(&self) -> f64 {
139 self.datarate.datarate()
140 }
141}
142
/// Smoothed estimate of a data rate, fed with one byte count per received message.
#[derive(Default)]
pub struct DatarateEstimation {
    /// Sum of all byte counts ever passed to `push`.
    total_bytes_received: u64,
    /// Latest smoothed estimate in bytes per second; `0.0` until the first window closes.
    datarate: f64,
    /// Start of the current measurement window; `None` until the first `push`.
    last_step: Option<Instant>,
    /// Bytes accumulated since the current measurement window started.
    bytes_since_last_step: u64,
}

impl DatarateEstimation {
    /// Records `len` received bytes.
    ///
    /// The first call only opens a measurement window. Later calls close the window once
    /// more than three seconds have passed since it was opened: the window's average rate
    /// is blended into the estimate (weight 0.8 for the new window, 0.2 for the previous
    /// estimate) and a new window starts. The estimate is only ever updated from within
    /// `push`.
    pub fn push(&mut self, len: u64) {
        self.total_bytes_received += len;
        self.bytes_since_last_step += len;

        let now = Instant::now();

        let window_start = match self.last_step {
            Some(start) => start,
            None => {
                // Very first sample: just open the window.
                self.last_step = Some(now);
                return;
            }
        };

        let elapsed = now.duration_since(window_start).as_secs_f64();
        if elapsed <= 3.0 {
            // Window still open; keep accumulating.
            return;
        }

        let previous_share = 0.2 * self.datarate;
        let window_share = 0.8 * (self.bytes_since_last_step as f64) / elapsed;

        self.datarate = previous_share + window_share;
        self.bytes_since_last_step = 0;
        self.last_step = Some(now);
    }

    /// Returns the latest smoothed estimate in bytes per second.
    pub fn datarate(&self) -> f64 {
        self.datarate
    }
}