Skip to main content

streaming_crypto/core_api/telemetry/
counters.rs

1// ### `src/telemetry/counters.rs`
2
//! telemetry/counters.rs
//! Mutable counters used during streaming pipelines.
//!
//! Summary: Collects frame counts and byte counts during encrypt/decrypt.
//! The counters are converted into an immutable `TelemetrySnapshot` at pipeline end.
//!
//! FIXME: Use serde instead of bincode.
9use bincode::{Encode, Decode};
10use std::{fmt, ops::AddAssign};
11
12/// Deterministic counters collected during stream processing
13#[derive(Default, Clone, Debug, Encode, Decode, PartialEq)]
14pub struct TelemetryCounters {
15    /// Consider One frame for each segment, the SegmentHeader
16    pub frames_header: u64,
17    pub frames_data: u64,
18    pub frames_digest: u64,
19    pub frames_terminator: u64,
20    pub bytes_plaintext: u64,
21    pub bytes_compressed: u64,
22    pub bytes_ciphertext: u64,
23    pub bytes_overhead: u64,   
24}
25
26impl TelemetryCounters {
27    pub fn from_ref(counters: &TelemetryCounters) -> Self {
28        counters.clone()
29    }
30
31    /// Record the stream header as overhead.
32    pub fn add_header(&mut self, header_len: usize) {
33        self.frames_header += 1;           // optional: count headers if we track them
34        self.bytes_overhead += header_len as u64;
35    }
36
37    /// Mark a digest frame processed.
38    /// - `frame_overhead_len`: total encoded length of the digest frame
39    pub fn add_digest(&mut self, frame_overhead_len: usize) {
40        self.frames_digest += 1;
41        self.bytes_overhead += frame_overhead_len as u64;
42    }
43
44    /// Mark a terminator frame processed.
45    /// - `frame_overhead_len`: total encoded length of the terminator frame
46    pub fn add_terminator(&mut self, frame_overhead_len: usize) {
47        self.frames_terminator += 1;
48        self.bytes_overhead += frame_overhead_len as u64;
49    }
50
51    // This avoids:
52    // * locks inside workers
53    // * atomics
54    // * false sharing
55    pub fn merge(&mut self, other: &TelemetryCounters) {
56        self.frames_header += other.frames_header;
57        self.frames_data += other.frames_data;
58        self.frames_terminator += other.frames_terminator;
59        self.frames_digest += other.frames_digest;
60
61        self.bytes_plaintext += other.bytes_plaintext;
62        self.bytes_compressed += other.bytes_compressed;
63        self.bytes_ciphertext += other.bytes_ciphertext;
64        self.bytes_overhead += other.bytes_overhead;
65    }
66}
67
68
69impl AddAssign for TelemetryCounters {
70    fn add_assign(&mut self, rhs: Self) {
71        self.frames_header      += rhs.frames_header;
72        self.frames_data        += rhs.frames_data;
73        self.frames_terminator  += rhs.frames_terminator;
74        self.frames_digest      += rhs.frames_digest;
75
76        self.bytes_plaintext    += rhs.bytes_plaintext;
77        self.bytes_compressed   += rhs.bytes_compressed;
78        self.bytes_ciphertext   += rhs.bytes_ciphertext;
79        self.bytes_overhead     += rhs.bytes_overhead;
80    }
81}
82
83impl fmt::Display for TelemetryCounters {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        writeln!(f, "=== Telemetry Counters Summary ===")?;
86        writeln!(f, "  frames_header: {}", self.frames_header)?;
87        writeln!(f, "  frames_data: {}", self.frames_data)?;
88        writeln!(f, "  frames_digest: {}", self.frames_digest)?;
89        writeln!(f, "  frames_terminator: {}", self.frames_terminator)?;
90        writeln!(f, "  bytes_plaintext: {}", self.bytes_plaintext)?;
91        writeln!(f, "  bytes_compressed: {}", self.bytes_compressed)?;
92        writeln!(f, "  bytes_ciphertext: {}", self.bytes_ciphertext)?;
93        writeln!(f, "  bytes_overhead: {}", self.bytes_overhead)
94    }
95}