nym_statistics_common/clients/
packet_statistics.rs

1// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: Apache-2.0
3
4use super::ClientStatsEvents;
5use core::fmt;
6use std::{collections::VecDeque, time::Duration};
7
8use nym_metrics::{inc, inc_by};
9use serde::{Deserialize, Serialize};
10use si_scale::helpers::bibytes2;
11
// Maximum age, in milliseconds, of the entries kept in the snapshot history and in the list of
// rate samples; older entries are dropped from the front of both queues whenever they are updated.
// When computing rates, we include snapshots that are up to this old. We set it to some odd number
// a tad larger than an integer number of snapshot intervals, so that we don't have to worry about
// threshold effects.
// Also, set it larger than the packet report interval so that we don't miss notable singular events
const RECORDING_WINDOW_MS: u64 = 2300;
17
/// Point in time backed by the platform-appropriate instant type: `std::time::Instant` on native
/// targets and `wasmtimer::std::Instant` when compiled for `wasm32`.
#[derive(PartialOrd, PartialEq, Clone, Copy, Debug)]
struct Instant {
    #[cfg(not(target_arch = "wasm32"))]
    inner: std::time::Instant,

    #[cfg(target_arch = "wasm32")]
    inner: wasmtimer::std::Instant,
}

impl Instant {
    /// Borrows the wrapped platform instant (native targets).
    #[cfg(not(target_arch = "wasm32"))]
    fn inner(&self) -> &std::time::Instant {
        &self.inner
    }

    /// Borrows the wrapped platform instant (`wasm32`).
    #[cfg(target_arch = "wasm32")]
    fn inner(&self) -> &wasmtimer::std::Instant {
        &self.inner
    }

    /// Captures the current time (native targets).
    #[cfg(not(target_arch = "wasm32"))]
    fn now() -> Self {
        let inner = std::time::Instant::now();
        Self { inner }
    }

    /// Captures the current time (`wasm32`).
    #[cfg(target_arch = "wasm32")]
    fn now() -> Self {
        let inner = wasmtimer::std::Instant::now();
        Self { inner }
    }

    /// Returns the instant `duration` before `self`, or `None` when the wrapped type reports that
    /// the subtraction is not representable.
    fn checked_sub(&self, duration: Duration) -> Option<Instant> {
        let inner = self.inner().checked_sub(duration)?;
        Some(Self { inner })
    }

    /// Time elapsed between `earlier` and `self`, as computed by the wrapped instant type.
    fn duration_since(&self, earlier: &Instant) -> Duration {
        let reference = *earlier.inner();
        self.inner.duration_since(reference)
    }
}
62
/// Cumulative packet counters, updated one event at a time by `PacketStatistics::handle`.
///
/// Each `*_size` field accumulates the `packet_size` value carried by the corresponding event
/// (presumably bytes — confirm against the code emitting the events).
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct PacketStatistics {
    // Sent
    real_packets_sent: u64,
    real_packets_sent_size: usize,
    cover_packets_sent: u64,
    cover_packets_sent_size: usize,

    // Received
    real_packets_received: u64,
    real_packets_received_size: usize,
    cover_packets_received: u64,
    cover_packets_received_size: usize,

    // Acks
    // `total_*` is fed by its own event (`AckReceived`); it is not derived from the real and
    // cover ack counters below.
    total_acks_received: u64,
    total_acks_received_size: usize,
    real_acks_received: u64,
    real_acks_received_size: usize,
    cover_acks_received: u64,
    cover_acks_received_size: usize,

    // Types of packets queued
    // These are plain counters; the queue events carry no size.
    // TODO: track the type sent instead
    real_packets_queued: u64,
    retransmissions_queued: u64,
    reply_surbs_queued: u64,
    additional_reply_surbs_queued: u64,
}
92
93impl PacketStatistics {
94    fn handle(&mut self, event: PacketStatisticsEvent) {
95        match event {
96            PacketStatisticsEvent::RealPacketSent(packet_size) => {
97                self.real_packets_sent += 1;
98                self.real_packets_sent_size += packet_size;
99                inc!("real_packets_sent");
100                inc_by!("real_packets_sent_size", packet_size);
101            }
102            PacketStatisticsEvent::CoverPacketSent(packet_size) => {
103                self.cover_packets_sent += 1;
104                self.cover_packets_sent_size += packet_size;
105                inc!("cover_packets_sent");
106                inc_by!("cover_packets_sent_size", packet_size);
107            }
108            PacketStatisticsEvent::RealPacketReceived(packet_size) => {
109                self.real_packets_received += 1;
110                self.real_packets_received_size += packet_size;
111                inc!("real_packets_received");
112                inc_by!("real_packets_received_size", packet_size);
113            }
114            PacketStatisticsEvent::CoverPacketReceived(packet_size) => {
115                self.cover_packets_received += 1;
116                self.cover_packets_received_size += packet_size;
117                inc!("cover_packets_received");
118                inc_by!("cover_packets_received_size", packet_size);
119            }
120            PacketStatisticsEvent::AckReceived(packet_size) => {
121                self.total_acks_received += 1;
122                self.total_acks_received_size += packet_size;
123                inc!("total_acks_received");
124                inc_by!("total_acks_received_size", packet_size);
125            }
126            PacketStatisticsEvent::RealAckReceived(packet_size) => {
127                self.real_acks_received += 1;
128                self.real_acks_received_size += packet_size;
129                inc!("real_acks_received");
130                inc_by!("real_acks_received_size", packet_size);
131            }
132            PacketStatisticsEvent::CoverAckReceived(packet_size) => {
133                self.cover_acks_received += 1;
134                self.cover_acks_received_size += packet_size;
135                inc!("cover_acks_received");
136                inc_by!("cover_acks_received_size", packet_size);
137            }
138            PacketStatisticsEvent::RealPacketQueued => {
139                self.real_packets_queued += 1;
140                inc!("real_packets_queued");
141            }
142            PacketStatisticsEvent::RetransmissionQueued => {
143                self.retransmissions_queued += 1;
144                inc!("retransmissions_queued");
145            }
146            PacketStatisticsEvent::ReplySurbRequestQueued => {
147                self.reply_surbs_queued += 1;
148                inc!("reply_surbs_queued");
149            }
150            PacketStatisticsEvent::AdditionalReplySurbRequestQueued => {
151                self.additional_reply_surbs_queued += 1;
152                inc!("additional_reply_surbs_queued");
153            }
154        }
155    }
156
157    fn summary(&self) -> (String, String) {
158        (
159            format!(
160                "packets sent: {} (real: {}, cover: {}, retransmissions: {})",
161                self.real_packets_sent + self.cover_packets_sent,
162                self.real_packets_sent,
163                self.cover_packets_sent,
164                self.retransmissions_queued,
165            ),
166            format!(
167                "packets received: {}, (real: {}, cover: {}, acks: {}, acks for cover: {})",
168                self.real_packets_received + self.cover_packets_received,
169                self.real_packets_received,
170                self.cover_packets_received,
171                self.real_acks_received,
172                self.cover_acks_received,
173            ),
174        )
175    }
176}
177
178impl std::ops::Sub for PacketStatistics {
179    type Output = Self;
180
181    fn sub(self, rhs: Self) -> Self::Output {
182        Self {
183            real_packets_sent: self.real_packets_sent - rhs.real_packets_sent,
184            real_packets_sent_size: self.real_packets_sent_size - rhs.real_packets_sent_size,
185            cover_packets_sent: self.cover_packets_sent - rhs.cover_packets_sent,
186            cover_packets_sent_size: self.cover_packets_sent_size - rhs.cover_packets_sent_size,
187
188            real_packets_received: self.real_packets_received - rhs.real_packets_received,
189            real_packets_received_size: self.real_packets_received_size
190                - rhs.real_packets_received_size,
191            cover_packets_received: self.cover_packets_received - rhs.cover_packets_received,
192            cover_packets_received_size: self.cover_packets_received_size
193                - rhs.cover_packets_received_size,
194
195            total_acks_received: self.total_acks_received - rhs.total_acks_received,
196            total_acks_received_size: self.total_acks_received_size - rhs.total_acks_received_size,
197            real_acks_received: self.real_acks_received - rhs.real_acks_received,
198            real_acks_received_size: self.real_acks_received_size - rhs.real_acks_received_size,
199            cover_acks_received: self.cover_acks_received - rhs.cover_acks_received,
200            cover_acks_received_size: self.cover_acks_received_size - rhs.cover_acks_received_size,
201
202            real_packets_queued: self.real_packets_queued - rhs.real_packets_queued,
203            retransmissions_queued: self.retransmissions_queued - rhs.retransmissions_queued,
204            reply_surbs_queued: self.reply_surbs_queued - rhs.reply_surbs_queued,
205            additional_reply_surbs_queued: self.additional_reply_surbs_queued
206                - rhs.additional_reply_surbs_queued,
207        }
208    }
209}
210
211pub struct MixnetBandwidthStatisticsEvent {
212    pub rates: PacketRates,
213}
214
215impl MixnetBandwidthStatisticsEvent {
216    pub fn new(rates: PacketRates) -> Self {
217        Self { rates }
218    }
219}
220
221impl nym_task::TaskStatusEvent for MixnetBandwidthStatisticsEvent {
222    fn as_any(&self) -> &dyn std::any::Any {
223        self
224    }
225}
226
227impl fmt::Display for MixnetBandwidthStatisticsEvent {
228    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
229        write!(f, "{}", self.rates.summary())
230    }
231}
232
/// Floating-point counterpart of `PacketStatistics`, with the same set of fields.
///
/// Converting from `PacketStatistics` copies the counts as `f64`; dividing the result by an
/// elapsed time in seconds (as `PacketStatisticsControl::compute_rates` does) turns every field
/// into a per-second rate.
#[derive(Debug, Clone)]
pub struct PacketRates {
    // Sent
    pub real_packets_sent: f64,
    pub real_packets_sent_size: f64,
    pub cover_packets_sent: f64,
    pub cover_packets_sent_size: f64,

    // Received
    pub real_packets_received: f64,
    pub real_packets_received_size: f64,
    pub cover_packets_received: f64,
    pub cover_packets_received_size: f64,

    // Acks
    pub total_acks_received: f64,
    pub total_acks_received_size: f64,
    pub real_acks_received: f64,
    pub real_acks_received_size: f64,
    pub cover_acks_received: f64,
    pub cover_acks_received_size: f64,

    // Types of packets queued
    pub real_packets_queued: f64,
    pub retransmissions_queued: f64,
    pub reply_surbs_queued: f64,
    pub additional_reply_surbs_queued: f64,
}
257
258impl fmt::Display for PacketRates {
259    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260        write!(
261            f,
262            "down: {}/s, up: {}/s (cover down: {}/s, cover up: {}/s)",
263            bibytes2(self.real_packets_received_size),
264            bibytes2(self.real_packets_sent_size),
265            bibytes2(self.cover_packets_received_size),
266            bibytes2(self.cover_packets_sent_size),
267        )
268    }
269}
270
271impl From<PacketStatistics> for PacketRates {
272    fn from(stats: PacketStatistics) -> Self {
273        Self {
274            real_packets_sent: stats.real_packets_sent as f64,
275            real_packets_sent_size: stats.real_packets_sent_size as f64,
276            cover_packets_sent: stats.cover_packets_sent as f64,
277            cover_packets_sent_size: stats.cover_packets_sent_size as f64,
278
279            real_packets_received: stats.real_packets_received as f64,
280            real_packets_received_size: stats.real_packets_received_size as f64,
281            cover_packets_received: stats.cover_packets_received as f64,
282            cover_packets_received_size: stats.cover_packets_received_size as f64,
283
284            total_acks_received: stats.total_acks_received as f64,
285            total_acks_received_size: stats.total_acks_received_size as f64,
286            real_acks_received: stats.real_acks_received as f64,
287            real_acks_received_size: stats.real_acks_received_size as f64,
288            cover_acks_received: stats.cover_acks_received as f64,
289            cover_acks_received_size: stats.cover_acks_received_size as f64,
290
291            real_packets_queued: stats.real_packets_queued as f64,
292            retransmissions_queued: stats.retransmissions_queued as f64,
293            reply_surbs_queued: stats.reply_surbs_queued as f64,
294            additional_reply_surbs_queued: stats.additional_reply_surbs_queued as f64,
295        }
296    }
297}
298
299impl std::ops::Sub for PacketRates {
300    type Output = Self;
301
302    fn sub(self, rhs: Self) -> Self::Output {
303        Self {
304            real_packets_sent: self.real_packets_sent - rhs.real_packets_sent,
305            real_packets_sent_size: self.real_packets_sent_size - rhs.real_packets_sent_size,
306            cover_packets_sent: self.cover_packets_sent - rhs.cover_packets_sent,
307            cover_packets_sent_size: self.cover_packets_sent_size - rhs.cover_packets_sent_size,
308
309            real_packets_received: self.real_packets_received - rhs.real_packets_received,
310            real_packets_received_size: self.real_packets_received_size
311                - rhs.real_packets_received_size,
312            cover_packets_received: self.cover_packets_received - rhs.cover_packets_received,
313            cover_packets_received_size: self.cover_packets_received_size
314                - rhs.cover_packets_received_size,
315
316            total_acks_received: self.total_acks_received - rhs.total_acks_received,
317            total_acks_received_size: self.total_acks_received_size - rhs.total_acks_received_size,
318            real_acks_received: self.real_acks_received - rhs.real_acks_received,
319            real_acks_received_size: self.real_acks_received_size - rhs.real_acks_received_size,
320            cover_acks_received: self.cover_acks_received - rhs.cover_acks_received,
321            cover_acks_received_size: self.cover_acks_received_size - rhs.cover_acks_received_size,
322
323            real_packets_queued: self.real_packets_queued - rhs.real_packets_queued,
324            retransmissions_queued: self.retransmissions_queued - rhs.retransmissions_queued,
325            reply_surbs_queued: self.reply_surbs_queued - rhs.reply_surbs_queued,
326            additional_reply_surbs_queued: self.additional_reply_surbs_queued
327                - rhs.additional_reply_surbs_queued,
328        }
329    }
330}
331
332impl std::ops::Div<f64> for PacketRates {
333    type Output = Self;
334
335    fn div(self, rhs: f64) -> Self::Output {
336        Self {
337            real_packets_sent: self.real_packets_sent / rhs,
338            real_packets_sent_size: self.real_packets_sent_size / rhs,
339            cover_packets_sent: self.cover_packets_sent / rhs,
340            cover_packets_sent_size: self.cover_packets_sent_size / rhs,
341
342            real_packets_received: self.real_packets_received / rhs,
343            real_packets_received_size: self.real_packets_received_size / rhs,
344            cover_packets_received: self.cover_packets_received / rhs,
345            cover_packets_received_size: self.cover_packets_received_size / rhs,
346
347            total_acks_received: self.total_acks_received / rhs,
348            total_acks_received_size: self.total_acks_received_size / rhs,
349            real_acks_received: self.real_acks_received / rhs,
350            real_acks_received_size: self.real_acks_received_size / rhs,
351            cover_acks_received: self.cover_acks_received / rhs,
352            cover_acks_received_size: self.cover_acks_received_size / rhs,
353
354            real_packets_queued: self.real_packets_queued / rhs,
355            retransmissions_queued: self.retransmissions_queued / rhs,
356            reply_surbs_queued: self.reply_surbs_queued / rhs,
357            additional_reply_surbs_queued: self.additional_reply_surbs_queued / rhs,
358        }
359    }
360}
361
362impl PacketRates {
363    fn summary(&self) -> String {
364        format!(
365            "down: {}/s, up: {}/s (cover down: {}/s, cover up: {}/s)",
366            bibytes2(self.real_packets_received_size),
367            bibytes2(self.real_packets_sent_size),
368            bibytes2(self.cover_packets_received_size),
369            bibytes2(self.cover_packets_sent_size),
370        )
371    }
372
373    fn detailed_summary(&self) -> String {
374        format!(
375            "RX: {:.1} mixpkt/s, {}/s (real: {}/s, acks: {}/s), TX: {:.1} mixpkt/s, {}/s (real: {}/s)",
376            self.real_packets_received + self.cover_packets_received,
377            bibytes2(self.real_packets_received_size + self.cover_packets_received_size),
378            bibytes2(self.real_packets_received_size),
379            bibytes2(self.total_acks_received_size),
380            self.real_packets_sent + self.cover_packets_sent,
381            bibytes2(self.real_packets_sent_size + self.cover_packets_sent_size),
382            bibytes2(self.real_packets_sent_size),
383        )
384    }
385}
386
/// Event Space used for counting the Packet types used in a connection.
///
/// Variants with a `usize` payload carry the packet size that is added to the matching `*_size`
/// counter; variants without a payload only increment a counter.
#[derive(Debug)]
pub enum PacketStatisticsEvent {
    /// The real packets sent. Recall that acks are sent by the gateway, so it's not included here.
    RealPacketSent(usize),
    /// The cover packets sent
    CoverPacketSent(usize),

    /// Real packets received
    RealPacketReceived(usize),
    /// Cover packets received
    CoverPacketReceived(usize),

    /// Ack of any type received. This is mostly used as a consistency check, and should be the sum
    /// of real and cover acks received.
    AckReceived(usize),
    /// Out of the total acks received, this is the subset of those that were real
    RealAckReceived(usize),
    /// Out of the total acks received, this is the subset of those that were for cover traffic
    CoverAckReceived(usize),

    /// Queued packet type: a real packet. Counted in `real_packets_queued`.
    RealPacketQueued,
    /// Queued packet type: a retransmission. Counted in `retransmissions_queued`.
    RetransmissionQueued,
    /// Queued packet type: a reply SURB request. Counted in `reply_surbs_queued`.
    ReplySurbRequestQueued,
    /// Queued packet type: an additional reply SURB request. Counted in
    /// `additional_reply_surbs_queued`.
    AdditionalReplySurbRequestQueued,
}
417
418impl From<PacketStatisticsEvent> for ClientStatsEvents {
419    fn from(event: PacketStatisticsEvent) -> ClientStatsEvents {
420        ClientStatsEvents::PacketStatistics(event)
421    }
422}
423
/// Statistics tracking for Packet based I/O
///
/// Holds the cumulative counters plus two time-stamped queues (counter snapshots and derived
/// rates). Both queues are trimmed to the recording window each time `snapshot` runs.
#[derive(Default)]
pub struct PacketStatisticsControl {
    // Keep track of packet statistics over time
    stats: PacketStatistics,

    // We keep snapshots of the statistics over time so we can compute rates, and also keeping the
    // full history allows for some more fancy averaging if we want to do that.
    // Oldest entry is at the front, newest at the back.
    history: VecDeque<(Instant, PacketStatistics)>,

    // Keep previous rates so that we can detect notable events
    // Oldest entry is at the front, newest at the back.
    rates: VecDeque<(Instant, PacketRates)>,
}
437
438impl PacketStatisticsControl {
439    pub(crate) fn handle_event(&mut self, event: PacketStatisticsEvent) {
440        self.stats.handle(event)
441    }
442
443    pub(crate) fn snapshot(&mut self) {
444        self.update_history();
445        self.update_rates();
446    }
447
448    pub(crate) fn report(&self) -> PacketStatistics {
449        self.stats.clone()
450    }
451
452    pub(crate) fn local_report(&mut self) {
453        let _rates = self.report_rates();
454        self.check_for_notable_events();
455        self.report_counters();
456
457        // leave the code commented in case somebody wanted to restore this logic with a different channel
458        // // Report our current bandwidth used to e.g a GUI client
459        // if let Some(rates) = rates {
460        //     task_client.send_status_msg(Box::new(MixnetBandwidthStatisticsEvent::new(rates)));
461        // }
462    }
463
464    // Add the current stats to the history, and remove old ones.
465    fn update_history(&mut self) {
466        // Update latest
467        self.history.push_back((Instant::now(), self.stats.clone()));
468
469        // Filter out old ones
470        let now = Instant::now();
471        let recording_window = Instant::now()
472            .checked_sub(Duration::from_millis(RECORDING_WINDOW_MS))
473            .unwrap_or(now);
474
475        while self
476            .history
477            .front()
478            .is_some_and(|&(t, _)| t < recording_window)
479        {
480            self.history.pop_front();
481        }
482    }
483
484    fn compute_rates(&self) -> Option<PacketRates> {
485        // NOTE: consider changing this to compute rates over the history instead of using current
486        // stats. Currently it should not make much of a difference since we call this just after
487        // updating the history, but it seems like it could be more internally consistent to do it
488        // that way.
489
490        // Do basic averaging over the entire history, which just uses the first and last
491        if let Some((start, start_stats)) = self.history.front() {
492            let duration_secs = Instant::now().duration_since(start).as_secs_f64();
493            let delta = self.stats.clone() - start_stats.clone();
494            let rates = PacketRates::from(delta) / duration_secs;
495            Some(rates)
496        } else {
497            None
498        }
499    }
500
501    fn update_rates(&mut self) {
502        // Update latest
503        if let Some(rates) = self.compute_rates() {
504            self.rates.push_back((Instant::now(), rates));
505        }
506
507        // Filter out old ones
508        let now = Instant::now();
509        let recording_window = now
510            .checked_sub(Duration::from_millis(RECORDING_WINDOW_MS))
511            .unwrap_or(now);
512        while self
513            .rates
514            .front()
515            .is_some_and(|&(t, _)| t < recording_window)
516        {
517            self.rates.pop_front();
518        }
519    }
520
521    fn report_rates(&self) -> Option<PacketRates> {
522        if let Some((_, rates)) = self.rates.back() {
523            log::debug!("{}", rates.summary());
524            log::debug!("{}", rates.detailed_summary());
525            return Some(rates.clone());
526        }
527        None
528    }
529
530    fn report_counters(&self) {
531        log::trace!("packet statistics: {:?}", &self.stats);
532        let (summary_sent, summary_recv) = self.stats.summary();
533        log::debug!("{summary_sent}");
534        log::debug!("{summary_recv}");
535    }
536
537    fn check_for_notable_events(&self) {
538        let Some((_, latest_rates)) = self.rates.back() else {
539            return;
540        };
541
542        // If we get a burst of retransmissions
543        // TODO: consider making this the number of retransmissions since last report instead.
544        if latest_rates.retransmissions_queued > 0.0 {
545            log::debug!(
546                "retransmissions: {:.2} pkt/s",
547                latest_rates.retransmissions_queued
548            );
549
550            // Check what the number of retransmissions was during the recording window
551            if let Some((_, start_stats)) = self.history.front() {
552                let delta = self.stats.clone() - start_stats.clone();
553                log::debug!(
554                    "mix packet retransmissions/real mix packets: {}/{}",
555                    delta.retransmissions_queued,
556                    delta.real_packets_queued,
557                );
558            } else {
559                log::warn!("Unable to check retransmissions during recording window");
560            }
561        }
562
563        // IDEA: if there is a burst of acks, that could indicate tokio task starvation.
564    }
565}