1use std::sync::atomic::{AtomicU64, Ordering};
7
8use super::CachePadded;
9
10#[derive(Debug)]
15pub struct DatapathStats {
16 pub tx_packets: CachePadded<AtomicU64>,
18 pub tx_bytes: CachePadded<AtomicU64>,
20 pub tx_dropped: CachePadded<AtomicU64>,
22 pub tx_errors: CachePadded<AtomicU64>,
24
25 pub rx_packets: CachePadded<AtomicU64>,
27 pub rx_bytes: CachePadded<AtomicU64>,
29 pub rx_dropped: CachePadded<AtomicU64>,
31 pub rx_errors: CachePadded<AtomicU64>,
33
34 pub nat_translations: CachePadded<AtomicU64>,
36 pub nat_fast_path_hits: CachePadded<AtomicU64>,
38 pub nat_slow_path_lookups: CachePadded<AtomicU64>,
40 pub nat_connections_created: CachePadded<AtomicU64>,
42 pub nat_connections_expired: CachePadded<AtomicU64>,
44
45 pub poll_iterations: CachePadded<AtomicU64>,
47 pub poll_work_done: CachePadded<AtomicU64>,
49 pub poll_busy_spins: CachePadded<AtomicU64>,
51
52 pub batch_histogram: [CachePadded<AtomicU64>; 8],
55}
56
57impl Default for DatapathStats {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl DatapathStats {
64 #[must_use]
66 pub const fn new() -> Self {
67 Self {
68 tx_packets: CachePadded::new(AtomicU64::new(0)),
69 tx_bytes: CachePadded::new(AtomicU64::new(0)),
70 tx_dropped: CachePadded::new(AtomicU64::new(0)),
71 tx_errors: CachePadded::new(AtomicU64::new(0)),
72
73 rx_packets: CachePadded::new(AtomicU64::new(0)),
74 rx_bytes: CachePadded::new(AtomicU64::new(0)),
75 rx_dropped: CachePadded::new(AtomicU64::new(0)),
76 rx_errors: CachePadded::new(AtomicU64::new(0)),
77
78 nat_translations: CachePadded::new(AtomicU64::new(0)),
79 nat_fast_path_hits: CachePadded::new(AtomicU64::new(0)),
80 nat_slow_path_lookups: CachePadded::new(AtomicU64::new(0)),
81 nat_connections_created: CachePadded::new(AtomicU64::new(0)),
82 nat_connections_expired: CachePadded::new(AtomicU64::new(0)),
83
84 poll_iterations: CachePadded::new(AtomicU64::new(0)),
85 poll_work_done: CachePadded::new(AtomicU64::new(0)),
86 poll_busy_spins: CachePadded::new(AtomicU64::new(0)),
87
88 batch_histogram: [
89 CachePadded::new(AtomicU64::new(0)),
90 CachePadded::new(AtomicU64::new(0)),
91 CachePadded::new(AtomicU64::new(0)),
92 CachePadded::new(AtomicU64::new(0)),
93 CachePadded::new(AtomicU64::new(0)),
94 CachePadded::new(AtomicU64::new(0)),
95 CachePadded::new(AtomicU64::new(0)),
96 CachePadded::new(AtomicU64::new(0)),
97 ],
98 }
99 }
100
101 #[inline]
103 pub fn record_tx(&self, packets: u64, bytes: u64) {
104 self.tx_packets.0.fetch_add(packets, Ordering::Relaxed);
105 self.tx_bytes.0.fetch_add(bytes, Ordering::Relaxed);
106 }
107
108 #[inline]
110 pub fn record_tx_drop(&self) {
111 self.tx_dropped.0.fetch_add(1, Ordering::Relaxed);
112 }
113
114 #[inline]
116 pub fn record_tx_error(&self) {
117 self.tx_errors.0.fetch_add(1, Ordering::Relaxed);
118 }
119
120 #[inline]
122 pub fn record_rx(&self, packets: u64, bytes: u64) {
123 self.rx_packets.0.fetch_add(packets, Ordering::Relaxed);
124 self.rx_bytes.0.fetch_add(bytes, Ordering::Relaxed);
125 }
126
127 #[inline]
129 pub fn record_rx_drop(&self) {
130 self.rx_dropped.0.fetch_add(1, Ordering::Relaxed);
131 }
132
133 #[inline]
135 pub fn record_rx_error(&self) {
136 self.rx_errors.0.fetch_add(1, Ordering::Relaxed);
137 }
138
139 #[inline]
141 pub fn record_nat_translation(&self, fast_path: bool) {
142 self.nat_translations.0.fetch_add(1, Ordering::Relaxed);
143 if fast_path {
144 self.nat_fast_path_hits.0.fetch_add(1, Ordering::Relaxed);
145 } else {
146 self.nat_slow_path_lookups.0.fetch_add(1, Ordering::Relaxed);
147 }
148 }
149
150 #[inline]
152 pub fn record_nat_connection_created(&self) {
153 self.nat_connections_created
154 .0
155 .fetch_add(1, Ordering::Relaxed);
156 }
157
158 #[inline]
160 pub fn record_nat_connection_expired(&self) {
161 self.nat_connections_expired
162 .0
163 .fetch_add(1, Ordering::Relaxed);
164 }
165
166 #[inline]
168 pub fn record_poll(&self, work_done: bool) {
169 self.poll_iterations.0.fetch_add(1, Ordering::Relaxed);
170 if work_done {
171 self.poll_work_done.0.fetch_add(1, Ordering::Relaxed);
172 } else {
173 self.poll_busy_spins.0.fetch_add(1, Ordering::Relaxed);
174 }
175 }
176
177 #[inline]
179 pub fn record_batch_size(&self, size: usize) {
180 let bucket = if size == 0 {
182 0
183 } else {
184 (usize::BITS - (size - 1).leading_zeros()) as usize
185 };
186 let bucket = bucket.min(self.batch_histogram.len() - 1);
187 self.batch_histogram[bucket]
188 .0
189 .fetch_add(1, Ordering::Relaxed);
190 }
191
192 #[must_use]
194 pub fn snapshot(&self) -> StatsSnapshot {
195 StatsSnapshot {
196 tx_packets: self.tx_packets.0.load(Ordering::Relaxed),
197 tx_bytes: self.tx_bytes.0.load(Ordering::Relaxed),
198 tx_dropped: self.tx_dropped.0.load(Ordering::Relaxed),
199 tx_errors: self.tx_errors.0.load(Ordering::Relaxed),
200
201 rx_packets: self.rx_packets.0.load(Ordering::Relaxed),
202 rx_bytes: self.rx_bytes.0.load(Ordering::Relaxed),
203 rx_dropped: self.rx_dropped.0.load(Ordering::Relaxed),
204 rx_errors: self.rx_errors.0.load(Ordering::Relaxed),
205
206 nat_translations: self.nat_translations.0.load(Ordering::Relaxed),
207 nat_fast_path_hits: self.nat_fast_path_hits.0.load(Ordering::Relaxed),
208 nat_slow_path_lookups: self.nat_slow_path_lookups.0.load(Ordering::Relaxed),
209 nat_connections_created: self.nat_connections_created.0.load(Ordering::Relaxed),
210 nat_connections_expired: self.nat_connections_expired.0.load(Ordering::Relaxed),
211
212 poll_iterations: self.poll_iterations.0.load(Ordering::Relaxed),
213 poll_work_done: self.poll_work_done.0.load(Ordering::Relaxed),
214 poll_busy_spins: self.poll_busy_spins.0.load(Ordering::Relaxed),
215 }
216 }
217
218 pub fn reset(&self) {
220 self.tx_packets.0.store(0, Ordering::Relaxed);
221 self.tx_bytes.0.store(0, Ordering::Relaxed);
222 self.tx_dropped.0.store(0, Ordering::Relaxed);
223 self.tx_errors.0.store(0, Ordering::Relaxed);
224
225 self.rx_packets.0.store(0, Ordering::Relaxed);
226 self.rx_bytes.0.store(0, Ordering::Relaxed);
227 self.rx_dropped.0.store(0, Ordering::Relaxed);
228 self.rx_errors.0.store(0, Ordering::Relaxed);
229
230 self.nat_translations.0.store(0, Ordering::Relaxed);
231 self.nat_fast_path_hits.0.store(0, Ordering::Relaxed);
232 self.nat_slow_path_lookups.0.store(0, Ordering::Relaxed);
233 self.nat_connections_created.0.store(0, Ordering::Relaxed);
234 self.nat_connections_expired.0.store(0, Ordering::Relaxed);
235
236 self.poll_iterations.0.store(0, Ordering::Relaxed);
237 self.poll_work_done.0.store(0, Ordering::Relaxed);
238 self.poll_busy_spins.0.store(0, Ordering::Relaxed);
239
240 for bucket in &self.batch_histogram {
241 bucket.0.store(0, Ordering::Relaxed);
242 }
243 }
244}
245
246#[derive(Debug, Clone, Copy, Default)]
248pub struct StatsSnapshot {
249 pub tx_packets: u64,
251 pub tx_bytes: u64,
253 pub tx_dropped: u64,
255 pub tx_errors: u64,
257
258 pub rx_packets: u64,
260 pub rx_bytes: u64,
262 pub rx_dropped: u64,
264 pub rx_errors: u64,
266
267 pub nat_translations: u64,
269 pub nat_fast_path_hits: u64,
271 pub nat_slow_path_lookups: u64,
273 pub nat_connections_created: u64,
275 pub nat_connections_expired: u64,
277
278 pub poll_iterations: u64,
280 pub poll_work_done: u64,
282 pub poll_busy_spins: u64,
284}
285
286impl StatsSnapshot {
287 #[inline]
289 #[must_use]
290 pub const fn total_packets(&self) -> u64 {
291 self.tx_packets + self.rx_packets
292 }
293
294 #[inline]
296 #[must_use]
297 pub const fn total_bytes(&self) -> u64 {
298 self.tx_bytes + self.rx_bytes
299 }
300
301 #[inline]
303 #[must_use]
304 pub fn nat_hit_rate(&self) -> f64 {
305 if self.nat_translations == 0 {
306 0.0
307 } else {
308 self.nat_fast_path_hits as f64 / self.nat_translations as f64
309 }
310 }
311
312 #[inline]
314 #[must_use]
315 pub fn poll_efficiency(&self) -> f64 {
316 if self.poll_iterations == 0 {
317 0.0
318 } else {
319 self.poll_work_done as f64 / self.poll_iterations as f64
320 }
321 }
322
323 #[must_use]
325 pub fn delta(&self, prev: &Self) -> Self {
326 Self {
327 tx_packets: self.tx_packets.saturating_sub(prev.tx_packets),
328 tx_bytes: self.tx_bytes.saturating_sub(prev.tx_bytes),
329 tx_dropped: self.tx_dropped.saturating_sub(prev.tx_dropped),
330 tx_errors: self.tx_errors.saturating_sub(prev.tx_errors),
331
332 rx_packets: self.rx_packets.saturating_sub(prev.rx_packets),
333 rx_bytes: self.rx_bytes.saturating_sub(prev.rx_bytes),
334 rx_dropped: self.rx_dropped.saturating_sub(prev.rx_dropped),
335 rx_errors: self.rx_errors.saturating_sub(prev.rx_errors),
336
337 nat_translations: self.nat_translations.saturating_sub(prev.nat_translations),
338 nat_fast_path_hits: self
339 .nat_fast_path_hits
340 .saturating_sub(prev.nat_fast_path_hits),
341 nat_slow_path_lookups: self
342 .nat_slow_path_lookups
343 .saturating_sub(prev.nat_slow_path_lookups),
344 nat_connections_created: self
345 .nat_connections_created
346 .saturating_sub(prev.nat_connections_created),
347 nat_connections_expired: self
348 .nat_connections_expired
349 .saturating_sub(prev.nat_connections_expired),
350
351 poll_iterations: self.poll_iterations.saturating_sub(prev.poll_iterations),
352 poll_work_done: self.poll_work_done.saturating_sub(prev.poll_work_done),
353 poll_busy_spins: self.poll_busy_spins.saturating_sub(prev.poll_busy_spins),
354 }
355 }
356}
357
358impl std::fmt::Display for StatsSnapshot {
359 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360 writeln!(f, "Datapath Statistics:")?;
361 writeln!(
362 f,
363 " TX: {} packets, {} bytes, {} dropped, {} errors",
364 self.tx_packets, self.tx_bytes, self.tx_dropped, self.tx_errors
365 )?;
366 writeln!(
367 f,
368 " RX: {} packets, {} bytes, {} dropped, {} errors",
369 self.rx_packets, self.rx_bytes, self.rx_dropped, self.rx_errors
370 )?;
371 writeln!(
372 f,
373 " NAT: {} translations ({:.1}% fast path)",
374 self.nat_translations,
375 self.nat_hit_rate() * 100.0
376 )?;
377 writeln!(
378 f,
379 " Poll: {} iterations ({:.1}% efficient)",
380 self.poll_iterations,
381 self.poll_efficiency() * 100.0
382 )?;
383 Ok(())
384 }
385}
386
387#[cfg(test)]
388mod tests {
389 use super::*;
390
391 #[test]
392 fn test_stats_basic() {
393 let stats = DatapathStats::new();
394
395 stats.record_tx(10, 1000);
396 stats.record_rx(5, 500);
397 stats.record_tx_drop();
398 stats.record_rx_error();
399
400 let snap = stats.snapshot();
401 assert_eq!(snap.tx_packets, 10);
402 assert_eq!(snap.tx_bytes, 1000);
403 assert_eq!(snap.rx_packets, 5);
404 assert_eq!(snap.rx_bytes, 500);
405 assert_eq!(snap.tx_dropped, 1);
406 assert_eq!(snap.rx_errors, 1);
407 }
408
409 #[test]
410 fn test_stats_nat() {
411 let stats = DatapathStats::new();
412
413 stats.record_nat_translation(true);
414 stats.record_nat_translation(true);
415 stats.record_nat_translation(false);
416
417 let snap = stats.snapshot();
418 assert_eq!(snap.nat_translations, 3);
419 assert_eq!(snap.nat_fast_path_hits, 2);
420 assert_eq!(snap.nat_slow_path_lookups, 1);
421 assert!((snap.nat_hit_rate() - 0.666).abs() < 0.01);
422 }
423
424 #[test]
425 fn test_stats_reset() {
426 let stats = DatapathStats::new();
427
428 stats.record_tx(100, 10000);
429 stats.reset();
430
431 let snap = stats.snapshot();
432 assert_eq!(snap.tx_packets, 0);
433 assert_eq!(snap.tx_bytes, 0);
434 }
435
436 #[test]
437 fn test_snapshot_delta() {
438 let stats = DatapathStats::new();
439
440 stats.record_tx(10, 1000);
441 let snap1 = stats.snapshot();
442
443 stats.record_tx(5, 500);
444 let snap2 = stats.snapshot();
445
446 let delta = snap2.delta(&snap1);
447 assert_eq!(delta.tx_packets, 5);
448 assert_eq!(delta.tx_bytes, 500);
449 }
450
451 #[test]
452 fn test_batch_histogram() {
453 let stats = DatapathStats::new();
454
455 stats.record_batch_size(1);
456 stats.record_batch_size(2);
457 stats.record_batch_size(4);
458 stats.record_batch_size(8);
459 stats.record_batch_size(64);
460
461 assert_eq!(stats.batch_histogram[0].0.load(Ordering::Relaxed), 1); assert_eq!(stats.batch_histogram[1].0.load(Ordering::Relaxed), 1); assert_eq!(stats.batch_histogram[2].0.load(Ordering::Relaxed), 1); assert_eq!(stats.batch_histogram[3].0.load(Ordering::Relaxed), 1); assert_eq!(stats.batch_histogram[6].0.load(Ordering::Relaxed), 1); }
467}