Skip to main content

streaming_crypto/core_api/telemetry/
counters.rs

1// ### `src/telemetry/counters.rs`
2
3//! telemetry/counters.rs
4//! Mutable counters used during streaming pipelines.
5//!
6//! Summary: Collects frame counts and byte counts during encrypt/decrypt.
7//! Converted into immutable TelemetrySnapshot at pipeline end.
8use bincode::{Encode, Decode};
9use std::{fmt, ops::AddAssign};
10
11/// Deterministic counters collected during stream processing
12#[derive(Default, Clone, Debug, Encode, Decode, PartialEq)]
13pub struct TelemetryCounters {
14    /// Consider One frame for each segment, the SegmentHeader
15    pub frames_header: u64,
16    pub frames_data: u64,
17    pub frames_digest: u64,
18    pub frames_terminator: u64,
19    pub bytes_plaintext: u64,
20    pub bytes_compressed: u64,
21    pub bytes_ciphertext: u64,
22    pub bytes_overhead: u64,   
23}
24
25impl TelemetryCounters {
26    pub fn from_ref(counters: &TelemetryCounters) -> Self {
27        counters.clone()
28    }
29
30    /// Record the stream header as overhead.
31    pub fn add_header(&mut self, header_len: usize) {
32        self.frames_header += 1;           // optional: count headers if we track them
33        self.bytes_overhead += header_len as u64;
34    }
35
36    /// Mark a digest frame processed.
37    /// - `frame_overhead_len`: total encoded length of the digest frame
38    pub fn add_digest(&mut self, frame_overhead_len: usize) {
39        self.frames_digest += 1;
40        self.bytes_overhead += frame_overhead_len as u64;
41    }
42
43    /// Mark a terminator frame processed.
44    /// - `frame_overhead_len`: total encoded length of the terminator frame
45    pub fn add_terminator(&mut self, frame_overhead_len: usize) {
46        self.frames_terminator += 1;
47        self.bytes_overhead += frame_overhead_len as u64;
48    }
49
50    // This avoids:
51    // * locks inside workers
52    // * atomics
53    // * false sharing
54    pub fn merge(&mut self, other: &TelemetryCounters) {
55        self.frames_header += other.frames_header;
56        self.frames_data += other.frames_data;
57        self.frames_terminator += other.frames_terminator;
58        self.frames_digest += other.frames_digest;
59
60        self.bytes_plaintext += other.bytes_plaintext;
61        self.bytes_compressed += other.bytes_compressed;
62        self.bytes_ciphertext += other.bytes_ciphertext;
63        self.bytes_overhead += other.bytes_overhead;
64    }
65}
66
67
68impl AddAssign for TelemetryCounters {
69    fn add_assign(&mut self, rhs: Self) {
70        self.frames_header      += rhs.frames_header;
71        self.frames_data        += rhs.frames_data;
72        self.frames_terminator  += rhs.frames_terminator;
73        self.frames_digest      += rhs.frames_digest;
74
75        self.bytes_plaintext    += rhs.bytes_plaintext;
76        self.bytes_compressed   += rhs.bytes_compressed;
77        self.bytes_ciphertext   += rhs.bytes_ciphertext;
78        self.bytes_overhead     += rhs.bytes_overhead;
79    }
80}
81
82impl fmt::Display for TelemetryCounters {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        writeln!(f, "=== Telemetry Counters Summary ===")?;
85        writeln!(f, "  frames_header: {}", self.frames_header)?;
86        writeln!(f, "  frames_data: {}", self.frames_data)?;
87        writeln!(f, "  frames_digest: {}", self.frames_digest)?;
88        writeln!(f, "  frames_terminator: {}", self.frames_terminator)?;
89        writeln!(f, "  bytes_plaintext: {}", self.bytes_plaintext)?;
90        writeln!(f, "  bytes_compressed: {}", self.bytes_compressed)?;
91        writeln!(f, "  bytes_ciphertext: {}", self.bytes_ciphertext)?;
92        writeln!(f, "  bytes_overhead: {}", self.bytes_overhead)
93    }
94}