1use super::ClientStatsEvents;
5use core::fmt;
6use std::{collections::VecDeque, time::Duration};
7
8use nym_metrics::{inc, inc_by};
9use serde::{Deserialize, Serialize};
10use si_scale::helpers::bibytes2;
11
/// Length, in milliseconds, of the sliding window used when pruning the
/// timestamped history snapshots and computed rates kept by
/// `PacketStatisticsControl`.
const RECORDING_WINDOW_MS: u64 = 2300;
17
/// Thin timestamp wrapper that selects the platform's instant type at compile
/// time: `std::time::Instant` everywhere except `wasm32`, where
/// `wasmtimer::std::Instant` is used instead.
#[derive(PartialOrd, PartialEq, Clone, Copy, Debug)]
struct Instant {
    #[cfg(not(target_arch = "wasm32"))]
    inner: std::time::Instant,

    #[cfg(target_arch = "wasm32")]
    inner: wasmtimer::std::Instant,
}

impl Instant {
    /// Captures the current time from the platform clock.
    #[cfg(not(target_arch = "wasm32"))]
    fn now() -> Self {
        let inner = std::time::Instant::now();
        Self { inner }
    }

    /// Captures the current time from the platform clock.
    #[cfg(target_arch = "wasm32")]
    fn now() -> Self {
        let inner = wasmtimer::std::Instant::now();
        Self { inner }
    }

    /// Borrows the wrapped platform instant.
    #[cfg(not(target_arch = "wasm32"))]
    fn inner(&self) -> &std::time::Instant {
        &self.inner
    }

    /// Borrows the wrapped platform instant.
    #[cfg(target_arch = "wasm32")]
    fn inner(&self) -> &wasmtimer::std::Instant {
        &self.inner
    }

    /// Returns the instant `duration` before `self`, or `None` when the
    /// wrapped type's `checked_sub` reports that it cannot be represented.
    fn checked_sub(&self, duration: Duration) -> Option<Instant> {
        let shifted = self.inner().checked_sub(duration)?;
        Some(Self { inner: shifted })
    }

    /// Returns the time elapsed from `earlier` to `self`, as computed by the
    /// wrapped instant's own `duration_since`.
    fn duration_since(&self, earlier: &Instant) -> Duration {
        let reference = *earlier.inner();
        self.inner.duration_since(reference)
    }
}
62
/// Running counters of packet activity, updated one event at a time by
/// `PacketStatistics::handle`.
///
/// The `*_size` fields accumulate the size carried by the corresponding
/// event (presumably in bytes — TODO confirm against the event producers).
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct PacketStatistics {
    /// Number of `RealPacketSent` events handled.
    real_packets_sent: u64,
    /// Sum of the sizes reported by `RealPacketSent` events.
    real_packets_sent_size: usize,
    /// Number of `CoverPacketSent` events handled.
    cover_packets_sent: u64,
    /// Sum of the sizes reported by `CoverPacketSent` events.
    cover_packets_sent_size: usize,

    /// Number of `RealPacketReceived` events handled.
    real_packets_received: u64,
    /// Sum of the sizes reported by `RealPacketReceived` events.
    real_packets_received_size: usize,
    /// Number of `CoverPacketReceived` events handled.
    cover_packets_received: u64,
    /// Sum of the sizes reported by `CoverPacketReceived` events.
    cover_packets_received_size: usize,

    /// Number of `AckReceived` events handled.
    total_acks_received: u64,
    /// Sum of the sizes reported by `AckReceived` events.
    total_acks_received_size: usize,
    /// Number of `RealAckReceived` events handled.
    real_acks_received: u64,
    /// Sum of the sizes reported by `RealAckReceived` events.
    real_acks_received_size: usize,
    /// Number of `CoverAckReceived` events handled.
    cover_acks_received: u64,
    /// Sum of the sizes reported by `CoverAckReceived` events.
    cover_acks_received_size: usize,

    /// Number of `RealPacketQueued` events handled.
    real_packets_queued: u64,
    /// Number of `RetransmissionQueued` events handled.
    retransmissions_queued: u64,
    /// Number of `ReplySurbRequestQueued` events handled.
    reply_surbs_queued: u64,
    /// Number of `AdditionalReplySurbRequestQueued` events handled.
    additional_reply_surbs_queued: u64,
}
92
93impl PacketStatistics {
94 fn handle(&mut self, event: PacketStatisticsEvent) {
95 match event {
96 PacketStatisticsEvent::RealPacketSent(packet_size) => {
97 self.real_packets_sent += 1;
98 self.real_packets_sent_size += packet_size;
99 inc!("real_packets_sent");
100 inc_by!("real_packets_sent_size", packet_size);
101 }
102 PacketStatisticsEvent::CoverPacketSent(packet_size) => {
103 self.cover_packets_sent += 1;
104 self.cover_packets_sent_size += packet_size;
105 inc!("cover_packets_sent");
106 inc_by!("cover_packets_sent_size", packet_size);
107 }
108 PacketStatisticsEvent::RealPacketReceived(packet_size) => {
109 self.real_packets_received += 1;
110 self.real_packets_received_size += packet_size;
111 inc!("real_packets_received");
112 inc_by!("real_packets_received_size", packet_size);
113 }
114 PacketStatisticsEvent::CoverPacketReceived(packet_size) => {
115 self.cover_packets_received += 1;
116 self.cover_packets_received_size += packet_size;
117 inc!("cover_packets_received");
118 inc_by!("cover_packets_received_size", packet_size);
119 }
120 PacketStatisticsEvent::AckReceived(packet_size) => {
121 self.total_acks_received += 1;
122 self.total_acks_received_size += packet_size;
123 inc!("total_acks_received");
124 inc_by!("total_acks_received_size", packet_size);
125 }
126 PacketStatisticsEvent::RealAckReceived(packet_size) => {
127 self.real_acks_received += 1;
128 self.real_acks_received_size += packet_size;
129 inc!("real_acks_received");
130 inc_by!("real_acks_received_size", packet_size);
131 }
132 PacketStatisticsEvent::CoverAckReceived(packet_size) => {
133 self.cover_acks_received += 1;
134 self.cover_acks_received_size += packet_size;
135 inc!("cover_acks_received");
136 inc_by!("cover_acks_received_size", packet_size);
137 }
138 PacketStatisticsEvent::RealPacketQueued => {
139 self.real_packets_queued += 1;
140 inc!("real_packets_queued");
141 }
142 PacketStatisticsEvent::RetransmissionQueued => {
143 self.retransmissions_queued += 1;
144 inc!("retransmissions_queued");
145 }
146 PacketStatisticsEvent::ReplySurbRequestQueued => {
147 self.reply_surbs_queued += 1;
148 inc!("reply_surbs_queued");
149 }
150 PacketStatisticsEvent::AdditionalReplySurbRequestQueued => {
151 self.additional_reply_surbs_queued += 1;
152 inc!("additional_reply_surbs_queued");
153 }
154 }
155 }
156
157 fn summary(&self) -> (String, String) {
158 (
159 format!(
160 "packets sent: {} (real: {}, cover: {}, retransmissions: {})",
161 self.real_packets_sent + self.cover_packets_sent,
162 self.real_packets_sent,
163 self.cover_packets_sent,
164 self.retransmissions_queued,
165 ),
166 format!(
167 "packets received: {}, (real: {}, cover: {}, acks: {}, acks for cover: {})",
168 self.real_packets_received + self.cover_packets_received,
169 self.real_packets_received,
170 self.cover_packets_received,
171 self.real_acks_received,
172 self.cover_acks_received,
173 ),
174 )
175 }
176}
177
178impl std::ops::Sub for PacketStatistics {
179 type Output = Self;
180
181 fn sub(self, rhs: Self) -> Self::Output {
182 Self {
183 real_packets_sent: self.real_packets_sent - rhs.real_packets_sent,
184 real_packets_sent_size: self.real_packets_sent_size - rhs.real_packets_sent_size,
185 cover_packets_sent: self.cover_packets_sent - rhs.cover_packets_sent,
186 cover_packets_sent_size: self.cover_packets_sent_size - rhs.cover_packets_sent_size,
187
188 real_packets_received: self.real_packets_received - rhs.real_packets_received,
189 real_packets_received_size: self.real_packets_received_size
190 - rhs.real_packets_received_size,
191 cover_packets_received: self.cover_packets_received - rhs.cover_packets_received,
192 cover_packets_received_size: self.cover_packets_received_size
193 - rhs.cover_packets_received_size,
194
195 total_acks_received: self.total_acks_received - rhs.total_acks_received,
196 total_acks_received_size: self.total_acks_received_size - rhs.total_acks_received_size,
197 real_acks_received: self.real_acks_received - rhs.real_acks_received,
198 real_acks_received_size: self.real_acks_received_size - rhs.real_acks_received_size,
199 cover_acks_received: self.cover_acks_received - rhs.cover_acks_received,
200 cover_acks_received_size: self.cover_acks_received_size - rhs.cover_acks_received_size,
201
202 real_packets_queued: self.real_packets_queued - rhs.real_packets_queued,
203 retransmissions_queued: self.retransmissions_queued - rhs.retransmissions_queued,
204 reply_surbs_queued: self.reply_surbs_queued - rhs.reply_surbs_queued,
205 additional_reply_surbs_queued: self.additional_reply_surbs_queued
206 - rhs.additional_reply_surbs_queued,
207 }
208 }
209}
210
/// Status event that carries a set of packet rates.
pub struct MixnetBandwidthStatisticsEvent {
    /// The rates reported by this event.
    pub rates: PacketRates,
}
214
215impl MixnetBandwidthStatisticsEvent {
216 pub fn new(rates: PacketRates) -> Self {
217 Self { rates }
218 }
219}
220
impl nym_task::TaskStatusEvent for MixnetBandwidthStatisticsEvent {
    /// Exposes the event as `&dyn Any`, so that code holding it behind the
    /// trait can downcast back to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
226
227impl fmt::Display for MixnetBandwidthStatisticsEvent {
228 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
229 write!(f, "{}", self.rates.summary())
230 }
231}
232
/// Floating-point counterpart of `PacketStatistics`, with one `f64` per
/// counter.
///
/// A value converted from `PacketStatistics` holds the raw counts; once it has
/// been divided by an elapsed time in seconds each field is a per-second rate.
/// The `*_size` fields are formatted with `bibytes2` when displayed.
#[derive(Debug, Clone)]
pub struct PacketRates {
    /// Real packets sent.
    pub real_packets_sent: f64,
    /// Size of real packets sent.
    pub real_packets_sent_size: f64,
    /// Cover packets sent.
    pub cover_packets_sent: f64,
    /// Size of cover packets sent.
    pub cover_packets_sent_size: f64,

    /// Real packets received.
    pub real_packets_received: f64,
    /// Size of real packets received.
    pub real_packets_received_size: f64,
    /// Cover packets received.
    pub cover_packets_received: f64,
    /// Size of cover packets received.
    pub cover_packets_received_size: f64,

    /// All acks received.
    pub total_acks_received: f64,
    /// Size of all acks received.
    pub total_acks_received_size: f64,
    /// Real acks received.
    pub real_acks_received: f64,
    /// Size of real acks received.
    pub real_acks_received_size: f64,
    /// Cover acks received.
    pub cover_acks_received: f64,
    /// Size of cover acks received.
    pub cover_acks_received_size: f64,

    /// Real packets queued.
    pub real_packets_queued: f64,
    /// Retransmissions queued.
    pub retransmissions_queued: f64,
    /// Reply SURB requests queued.
    pub reply_surbs_queued: f64,
    /// Additional reply SURB requests queued.
    pub additional_reply_surbs_queued: f64,
}
257
258impl fmt::Display for PacketRates {
259 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260 write!(
261 f,
262 "down: {}/s, up: {}/s (cover down: {}/s, cover up: {}/s)",
263 bibytes2(self.real_packets_received_size),
264 bibytes2(self.real_packets_sent_size),
265 bibytes2(self.cover_packets_received_size),
266 bibytes2(self.cover_packets_sent_size),
267 )
268 }
269}
270
271impl From<PacketStatistics> for PacketRates {
272 fn from(stats: PacketStatistics) -> Self {
273 Self {
274 real_packets_sent: stats.real_packets_sent as f64,
275 real_packets_sent_size: stats.real_packets_sent_size as f64,
276 cover_packets_sent: stats.cover_packets_sent as f64,
277 cover_packets_sent_size: stats.cover_packets_sent_size as f64,
278
279 real_packets_received: stats.real_packets_received as f64,
280 real_packets_received_size: stats.real_packets_received_size as f64,
281 cover_packets_received: stats.cover_packets_received as f64,
282 cover_packets_received_size: stats.cover_packets_received_size as f64,
283
284 total_acks_received: stats.total_acks_received as f64,
285 total_acks_received_size: stats.total_acks_received_size as f64,
286 real_acks_received: stats.real_acks_received as f64,
287 real_acks_received_size: stats.real_acks_received_size as f64,
288 cover_acks_received: stats.cover_acks_received as f64,
289 cover_acks_received_size: stats.cover_acks_received_size as f64,
290
291 real_packets_queued: stats.real_packets_queued as f64,
292 retransmissions_queued: stats.retransmissions_queued as f64,
293 reply_surbs_queued: stats.reply_surbs_queued as f64,
294 additional_reply_surbs_queued: stats.additional_reply_surbs_queued as f64,
295 }
296 }
297}
298
299impl std::ops::Sub for PacketRates {
300 type Output = Self;
301
302 fn sub(self, rhs: Self) -> Self::Output {
303 Self {
304 real_packets_sent: self.real_packets_sent - rhs.real_packets_sent,
305 real_packets_sent_size: self.real_packets_sent_size - rhs.real_packets_sent_size,
306 cover_packets_sent: self.cover_packets_sent - rhs.cover_packets_sent,
307 cover_packets_sent_size: self.cover_packets_sent_size - rhs.cover_packets_sent_size,
308
309 real_packets_received: self.real_packets_received - rhs.real_packets_received,
310 real_packets_received_size: self.real_packets_received_size
311 - rhs.real_packets_received_size,
312 cover_packets_received: self.cover_packets_received - rhs.cover_packets_received,
313 cover_packets_received_size: self.cover_packets_received_size
314 - rhs.cover_packets_received_size,
315
316 total_acks_received: self.total_acks_received - rhs.total_acks_received,
317 total_acks_received_size: self.total_acks_received_size - rhs.total_acks_received_size,
318 real_acks_received: self.real_acks_received - rhs.real_acks_received,
319 real_acks_received_size: self.real_acks_received_size - rhs.real_acks_received_size,
320 cover_acks_received: self.cover_acks_received - rhs.cover_acks_received,
321 cover_acks_received_size: self.cover_acks_received_size - rhs.cover_acks_received_size,
322
323 real_packets_queued: self.real_packets_queued - rhs.real_packets_queued,
324 retransmissions_queued: self.retransmissions_queued - rhs.retransmissions_queued,
325 reply_surbs_queued: self.reply_surbs_queued - rhs.reply_surbs_queued,
326 additional_reply_surbs_queued: self.additional_reply_surbs_queued
327 - rhs.additional_reply_surbs_queued,
328 }
329 }
330}
331
332impl std::ops::Div<f64> for PacketRates {
333 type Output = Self;
334
335 fn div(self, rhs: f64) -> Self::Output {
336 Self {
337 real_packets_sent: self.real_packets_sent / rhs,
338 real_packets_sent_size: self.real_packets_sent_size / rhs,
339 cover_packets_sent: self.cover_packets_sent / rhs,
340 cover_packets_sent_size: self.cover_packets_sent_size / rhs,
341
342 real_packets_received: self.real_packets_received / rhs,
343 real_packets_received_size: self.real_packets_received_size / rhs,
344 cover_packets_received: self.cover_packets_received / rhs,
345 cover_packets_received_size: self.cover_packets_received_size / rhs,
346
347 total_acks_received: self.total_acks_received / rhs,
348 total_acks_received_size: self.total_acks_received_size / rhs,
349 real_acks_received: self.real_acks_received / rhs,
350 real_acks_received_size: self.real_acks_received_size / rhs,
351 cover_acks_received: self.cover_acks_received / rhs,
352 cover_acks_received_size: self.cover_acks_received_size / rhs,
353
354 real_packets_queued: self.real_packets_queued / rhs,
355 retransmissions_queued: self.retransmissions_queued / rhs,
356 reply_surbs_queued: self.reply_surbs_queued / rhs,
357 additional_reply_surbs_queued: self.additional_reply_surbs_queued / rhs,
358 }
359 }
360}
361
362impl PacketRates {
363 fn summary(&self) -> String {
364 format!(
365 "down: {}/s, up: {}/s (cover down: {}/s, cover up: {}/s)",
366 bibytes2(self.real_packets_received_size),
367 bibytes2(self.real_packets_sent_size),
368 bibytes2(self.cover_packets_received_size),
369 bibytes2(self.cover_packets_sent_size),
370 )
371 }
372
373 fn detailed_summary(&self) -> String {
374 format!(
375 "RX: {:.1} mixpkt/s, {}/s (real: {}/s, acks: {}/s), TX: {:.1} mixpkt/s, {}/s (real: {}/s)",
376 self.real_packets_received + self.cover_packets_received,
377 bibytes2(self.real_packets_received_size + self.cover_packets_received_size),
378 bibytes2(self.real_packets_received_size),
379 bibytes2(self.total_acks_received_size),
380 self.real_packets_sent + self.cover_packets_sent,
381 bibytes2(self.real_packets_sent_size + self.cover_packets_sent_size),
382 bibytes2(self.real_packets_sent_size),
383 )
384 }
385}
386
/// Events consumed by `PacketStatistics::handle` to update the counters.
///
/// The `usize` payload of the sent/received/ack variants is the size that gets
/// added to the matching `*_size` counter (presumably bytes — TODO confirm
/// against the code that emits these events).
#[derive(Debug)]
pub enum PacketStatisticsEvent {
    /// A real packet was sent; carries its size.
    RealPacketSent(usize),
    /// A cover packet was sent; carries its size.
    CoverPacketSent(usize),

    /// A real packet was received; carries its size.
    RealPacketReceived(usize),
    /// A cover packet was received; carries its size.
    CoverPacketReceived(usize),

    /// An ack was received; counted towards the total-acks counters.
    AckReceived(usize),
    /// An ack for a real packet was received; carries its size.
    RealAckReceived(usize),
    /// An ack for a cover packet was received; carries its size.
    CoverAckReceived(usize),

    /// A real packet was queued.
    RealPacketQueued,
    /// A retransmission was queued.
    RetransmissionQueued,
    /// A reply SURB request was queued.
    ReplySurbRequestQueued,
    /// An additional reply SURB request was queued.
    AdditionalReplySurbRequestQueued,
}
417
418impl From<PacketStatisticsEvent> for ClientStatsEvents {
419 fn from(event: PacketStatisticsEvent) -> ClientStatsEvents {
420 ClientStatsEvents::PacketStatistics(event)
421 }
422}
423
/// Owns the running packet counters together with a short, time-bounded
/// record of past snapshots and the rates derived from them.
#[derive(Default)]
pub struct PacketStatisticsControl {
    /// Running counters, updated by every handled event.
    stats: PacketStatistics,

    /// Timestamped copies of `stats`, oldest first; entries older than the
    /// recording window are dropped from the front.
    history: VecDeque<(Instant, PacketStatistics)>,

    /// Timestamped rates computed from `history`, oldest first; pruned with
    /// the same recording window.
    rates: VecDeque<(Instant, PacketRates)>,
}
437
438impl PacketStatisticsControl {
439 pub(crate) fn handle_event(&mut self, event: PacketStatisticsEvent) {
440 self.stats.handle(event)
441 }
442
443 pub(crate) fn snapshot(&mut self) {
444 self.update_history();
445 self.update_rates();
446 }
447
448 pub(crate) fn report(&self) -> PacketStatistics {
449 self.stats.clone()
450 }
451
452 pub(crate) fn local_report(&mut self) {
453 let _rates = self.report_rates();
454 self.check_for_notable_events();
455 self.report_counters();
456
457 }
463
464 fn update_history(&mut self) {
466 self.history.push_back((Instant::now(), self.stats.clone()));
468
469 let now = Instant::now();
471 let recording_window = Instant::now()
472 .checked_sub(Duration::from_millis(RECORDING_WINDOW_MS))
473 .unwrap_or(now);
474
475 while self
476 .history
477 .front()
478 .is_some_and(|&(t, _)| t < recording_window)
479 {
480 self.history.pop_front();
481 }
482 }
483
484 fn compute_rates(&self) -> Option<PacketRates> {
485 if let Some((start, start_stats)) = self.history.front() {
492 let duration_secs = Instant::now().duration_since(start).as_secs_f64();
493 let delta = self.stats.clone() - start_stats.clone();
494 let rates = PacketRates::from(delta) / duration_secs;
495 Some(rates)
496 } else {
497 None
498 }
499 }
500
501 fn update_rates(&mut self) {
502 if let Some(rates) = self.compute_rates() {
504 self.rates.push_back((Instant::now(), rates));
505 }
506
507 let now = Instant::now();
509 let recording_window = now
510 .checked_sub(Duration::from_millis(RECORDING_WINDOW_MS))
511 .unwrap_or(now);
512 while self
513 .rates
514 .front()
515 .is_some_and(|&(t, _)| t < recording_window)
516 {
517 self.rates.pop_front();
518 }
519 }
520
521 fn report_rates(&self) -> Option<PacketRates> {
522 if let Some((_, rates)) = self.rates.back() {
523 log::debug!("{}", rates.summary());
524 log::debug!("{}", rates.detailed_summary());
525 return Some(rates.clone());
526 }
527 None
528 }
529
530 fn report_counters(&self) {
531 log::trace!("packet statistics: {:?}", &self.stats);
532 let (summary_sent, summary_recv) = self.stats.summary();
533 log::debug!("{summary_sent}");
534 log::debug!("{summary_recv}");
535 }
536
537 fn check_for_notable_events(&self) {
538 let Some((_, latest_rates)) = self.rates.back() else {
539 return;
540 };
541
542 if latest_rates.retransmissions_queued > 0.0 {
545 log::debug!(
546 "retransmissions: {:.2} pkt/s",
547 latest_rates.retransmissions_queued
548 );
549
550 if let Some((_, start_stats)) = self.history.front() {
552 let delta = self.stats.clone() - start_stats.clone();
553 log::debug!(
554 "mix packet retransmissions/real mix packets: {}/{}",
555 delta.retransmissions_queued,
556 delta.real_packets_queued,
557 );
558 } else {
559 log::warn!("Unable to check retransmissions during recording window");
560 }
561 }
562
563 }
565}