// hyperi_rustlib/worker/stats.rs
use std::sync::atomic::{AtomicU64, Ordering};
19
/// Set of independent [`AtomicU64`] counters describing pipeline activity.
///
/// `Default` yields every counter at zero. The fields are public atomics, so
/// they can be updated through a shared reference.
///
/// NOTE(review): the per-field descriptions below are inferred from the field
/// names; confirm the exact unit ("item" = message, record, ...) against the
/// call sites that feed them.
#[derive(Default, Debug)]
pub struct PipelineStats {
    /// Items taken in.
    pub received: AtomicU64,
    /// Items that finished processing.
    pub processed: AtomicU64,
    /// Items that failed.
    pub errors: AtomicU64,
    /// Items sent to the dead-letter queue (presumably what `dlq` abbreviates).
    pub dlq: AtomicU64,
    /// Items dropped by filtering.
    pub filtered: AtomicU64,
    /// Bytes taken in.
    pub bytes_received: AtomicU64,
    /// Bytes written out.
    pub bytes_written: AtomicU64,
    /// Batches flushed.
    pub batches_flushed: AtomicU64,
}
43
44impl PipelineStats {
45 #[must_use]
47 pub fn new() -> Self {
48 Self::default()
49 }
50
51 pub fn incr_received(&self) {
54 self.received.fetch_add(1, Ordering::Relaxed);
55 }
56
57 pub fn incr_processed(&self) {
58 self.processed.fetch_add(1, Ordering::Relaxed);
59 }
60
61 pub fn incr_errors(&self) {
62 self.errors.fetch_add(1, Ordering::Relaxed);
63 }
64
65 pub fn incr_dlq(&self) {
66 self.dlq.fetch_add(1, Ordering::Relaxed);
67 }
68
69 pub fn incr_filtered(&self) {
70 self.filtered.fetch_add(1, Ordering::Relaxed);
71 }
72
73 pub fn incr_batches_flushed(&self) {
74 self.batches_flushed.fetch_add(1, Ordering::Relaxed);
75 }
76
77 pub fn add_received(&self, n: u64) {
80 self.received.fetch_add(n, Ordering::Relaxed);
81 }
82
83 pub fn add_processed(&self, n: u64) {
84 self.processed.fetch_add(n, Ordering::Relaxed);
85 }
86
87 pub fn add_filtered(&self, n: u64) {
88 self.filtered.fetch_add(n, Ordering::Relaxed);
89 }
90
91 pub fn add_bytes_received(&self, n: u64) {
92 self.bytes_received.fetch_add(n, Ordering::Relaxed);
93 }
94
95 pub fn add_bytes_written(&self, n: u64) {
96 self.bytes_written.fetch_add(n, Ordering::Relaxed);
97 }
98
99 #[must_use]
101 pub fn snapshot(&self) -> PipelineStatsSnapshot {
102 PipelineStatsSnapshot {
103 received: self.received.load(Ordering::Relaxed),
104 processed: self.processed.load(Ordering::Relaxed),
105 errors: self.errors.load(Ordering::Relaxed),
106 dlq: self.dlq.load(Ordering::Relaxed),
107 filtered: self.filtered.load(Ordering::Relaxed),
108 bytes_received: self.bytes_received.load(Ordering::Relaxed),
109 bytes_written: self.bytes_written.load(Ordering::Relaxed),
110 batches_flushed: self.batches_flushed.load(Ordering::Relaxed),
111 }
112 }
113}
114
/// Plain-value copy of a set of pipeline counters.
///
/// All fields are `u64`, so the type is `Copy` and can be compared with `==`
/// (useful in tests and for detecting "nothing changed" between two reads).
/// `Default` is the all-zero snapshot.
///
/// NOTE(review): the per-field descriptions below are inferred from the field
/// names; confirm the exact unit against the code that produces snapshots.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PipelineStatsSnapshot {
    /// Items taken in.
    pub received: u64,
    /// Items that finished processing.
    pub processed: u64,
    /// Items that failed.
    pub errors: u64,
    /// Items sent to the dead-letter queue (presumably what `dlq` abbreviates).
    pub dlq: u64,
    /// Items dropped by filtering.
    pub filtered: u64,
    /// Bytes taken in.
    pub bytes_received: u64,
    /// Bytes written out.
    pub bytes_written: u64,
    /// Batches flushed.
    pub batches_flushed: u64,
}
130
131impl std::fmt::Display for PipelineStatsSnapshot {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 write!(
134 f,
135 "received={} processed={} errors={} dlq={} filtered={} batches={}",
136 self.received,
137 self.processed,
138 self.errors,
139 self.dlq,
140 self.filtered,
141 self.batches_flushed,
142 )
143 }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built stats block reports zero for every one of its counters.
    #[test]
    fn test_default_is_zero() {
        let snap = PipelineStats::new().snapshot();
        assert_eq!(snap.received, 0);
        assert_eq!(snap.processed, 0);
        assert_eq!(snap.errors, 0);
        assert_eq!(snap.dlq, 0);
        assert_eq!(snap.filtered, 0);
        assert_eq!(snap.bytes_received, 0);
        assert_eq!(snap.bytes_written, 0);
        assert_eq!(snap.batches_flushed, 0);
    }

    /// Single-step increments land on the intended counter and nowhere else.
    #[test]
    fn test_increments() {
        let stats = PipelineStats::new();
        stats.incr_received();
        stats.incr_received();
        stats.incr_processed();
        stats.incr_errors();
        stats.incr_dlq();
        stats.add_bytes_received(1024);

        let snap = stats.snapshot();
        assert_eq!(snap.received, 2);
        assert_eq!(snap.processed, 1);
        assert_eq!(snap.errors, 1);
        assert_eq!(snap.dlq, 1);
        assert_eq!(snap.bytes_received, 1024);

        // Counters that were never touched must still be zero.
        assert_eq!(snap.filtered, 0);
        assert_eq!(snap.bytes_written, 0);
        assert_eq!(snap.batches_flushed, 0);
    }

    /// Bulk adds accumulate the given amount.
    #[test]
    fn test_bulk_add() {
        let stats = PipelineStats::new();
        stats.add_received(100);
        stats.add_processed(95);
        stats.add_bytes_written(4096);
        stats.incr_batches_flushed();

        let snap = stats.snapshot();
        assert_eq!(snap.received, 100);
        assert_eq!(snap.processed, 95);
        assert_eq!(snap.bytes_written, 4096);
        assert_eq!(snap.batches_flushed, 1);
    }

    /// The snapshot is `Copy`: the original stays usable after being copied.
    #[test]
    fn test_snapshot_is_copy() {
        let stats = PipelineStats::new();
        stats.add_received(42);
        let snap = stats.snapshot();
        let copy = snap;
        assert_eq!(snap.received, copy.received);
        assert_eq!(copy.received, 42);
    }

    /// `incr_filtered` and `add_filtered` feed the same counter.
    #[test]
    fn test_filtered_counter() {
        let stats = PipelineStats::new();
        stats.incr_filtered();
        stats.add_filtered(9);
        assert_eq!(stats.snapshot().filtered, 10);
    }

    /// `Display` output is pinned exactly, so accidental format drift is caught.
    #[test]
    fn test_display() {
        let stats = PipelineStats::new();
        stats.add_received(100);
        stats.add_processed(90);
        let display = stats.snapshot().to_string();
        assert!(display.contains("received=100"));
        assert!(display.contains("processed=90"));
        assert_eq!(
            display,
            "received=100 processed=90 errors=0 dlq=0 filtered=0 batches=0"
        );
    }
}