Skip to main content

hyperi_rustlib/worker/
stats.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/stats.rs
3// Purpose:   Atomic pipeline statistics for lock-free concurrent access
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Atomic pipeline statistics for DFE services.
10//!
11//! Every DFE pipeline tracks the same base counters: received, processed,
12//! errors, DLQ. These use [`AtomicU64`] for lock-free updates from both
13//! the parallel (rayon) and sequential phases.
14//!
15//! App-specific stats extend separately -- these are the common fields
16//! shared across all 6 DFE pipeline projects.
17
18use std::sync::atomic::{AtomicU64, Ordering};
19
20/// Common DFE pipeline statistics with atomic counters.
21///
22/// Lock-free, safe to read and write from any thread. Uses
23/// `Ordering::Relaxed` -- stats are informational, not safety-critical.
24#[derive(Debug, Default)]
25pub struct PipelineStats {
26    /// Messages received from source (Kafka, HTTP, gRPC, etc.).
27    pub received: AtomicU64,
28    /// Messages successfully processed through the pipeline.
29    pub processed: AtomicU64,
30    /// Messages that failed processing and were routed to DLQ or dropped.
31    pub errors: AtomicU64,
32    /// Messages sent to the dead letter queue (subset of errors, only if DLQ enabled).
33    pub dlq: AtomicU64,
34    /// Messages filtered out (did not pass routing rules or predicates).
35    pub filtered: AtomicU64,
36    /// Total bytes received from source.
37    pub bytes_received: AtomicU64,
38    /// Total bytes written to sink.
39    pub bytes_written: AtomicU64,
40    /// Batches flushed to sink.
41    pub batches_flushed: AtomicU64,
42}
43
44impl PipelineStats {
45    /// Create new zeroed stats.
46    #[must_use]
47    pub fn new() -> Self {
48        Self::default()
49    }
50
51    // --- Increment helpers (single message) ---
52
53    pub fn incr_received(&self) {
54        self.received.fetch_add(1, Ordering::Relaxed);
55    }
56
57    pub fn incr_processed(&self) {
58        self.processed.fetch_add(1, Ordering::Relaxed);
59    }
60
61    pub fn incr_errors(&self) {
62        self.errors.fetch_add(1, Ordering::Relaxed);
63    }
64
65    pub fn incr_dlq(&self) {
66        self.dlq.fetch_add(1, Ordering::Relaxed);
67    }
68
69    pub fn incr_filtered(&self) {
70        self.filtered.fetch_add(1, Ordering::Relaxed);
71    }
72
73    pub fn incr_batches_flushed(&self) {
74        self.batches_flushed.fetch_add(1, Ordering::Relaxed);
75    }
76
77    // --- Bulk add helpers (batch-level) ---
78
79    pub fn add_received(&self, n: u64) {
80        self.received.fetch_add(n, Ordering::Relaxed);
81    }
82
83    pub fn add_processed(&self, n: u64) {
84        self.processed.fetch_add(n, Ordering::Relaxed);
85    }
86
87    pub fn add_filtered(&self, n: u64) {
88        self.filtered.fetch_add(n, Ordering::Relaxed);
89    }
90
91    pub fn add_bytes_received(&self, n: u64) {
92        self.bytes_received.fetch_add(n, Ordering::Relaxed);
93    }
94
95    pub fn add_bytes_written(&self, n: u64) {
96        self.bytes_written.fetch_add(n, Ordering::Relaxed);
97    }
98
99    /// Take an immutable snapshot for logging, metrics, or display.
100    #[must_use]
101    pub fn snapshot(&self) -> PipelineStatsSnapshot {
102        PipelineStatsSnapshot {
103            received: self.received.load(Ordering::Relaxed),
104            processed: self.processed.load(Ordering::Relaxed),
105            errors: self.errors.load(Ordering::Relaxed),
106            dlq: self.dlq.load(Ordering::Relaxed),
107            filtered: self.filtered.load(Ordering::Relaxed),
108            bytes_received: self.bytes_received.load(Ordering::Relaxed),
109            bytes_written: self.bytes_written.load(Ordering::Relaxed),
110            batches_flushed: self.batches_flushed.load(Ordering::Relaxed),
111        }
112    }
113}
114
115/// Immutable snapshot of pipeline stats.
116///
117/// Safe to copy, pass between threads, and use in logging/display without
118/// holding any reference to the atomic counters.
119#[derive(Debug, Clone, Copy, Default)]
120pub struct PipelineStatsSnapshot {
121    pub received: u64,
122    pub processed: u64,
123    pub errors: u64,
124    pub dlq: u64,
125    pub filtered: u64,
126    pub bytes_received: u64,
127    pub bytes_written: u64,
128    pub batches_flushed: u64,
129}
130
131impl std::fmt::Display for PipelineStatsSnapshot {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        write!(
134            f,
135            "received={} processed={} errors={} dlq={} filtered={} batches={}",
136            self.received,
137            self.processed,
138            self.errors,
139            self.dlq,
140            self.filtered,
141            self.batches_flushed,
142        )
143    }
144}
145
146#[cfg(test)]
147mod tests {
148    use super::*;
149
150    #[test]
151    fn test_default_is_zero() {
152        let stats = PipelineStats::new();
153        let snap = stats.snapshot();
154        assert_eq!(snap.received, 0);
155        assert_eq!(snap.processed, 0);
156        assert_eq!(snap.errors, 0);
157        assert_eq!(snap.dlq, 0);
158    }
159
160    #[test]
161    fn test_increments() {
162        let stats = PipelineStats::new();
163        stats.incr_received();
164        stats.incr_received();
165        stats.incr_processed();
166        stats.incr_errors();
167        stats.incr_dlq();
168        stats.add_bytes_received(1024);
169
170        let snap = stats.snapshot();
171        assert_eq!(snap.received, 2);
172        assert_eq!(snap.processed, 1);
173        assert_eq!(snap.errors, 1);
174        assert_eq!(snap.dlq, 1);
175        assert_eq!(snap.bytes_received, 1024);
176    }
177
178    #[test]
179    fn test_bulk_add() {
180        let stats = PipelineStats::new();
181        stats.add_received(100);
182        stats.add_processed(95);
183        stats.add_bytes_written(4096);
184        stats.incr_batches_flushed();
185
186        let snap = stats.snapshot();
187        assert_eq!(snap.received, 100);
188        assert_eq!(snap.processed, 95);
189        assert_eq!(snap.bytes_written, 4096);
190        assert_eq!(snap.batches_flushed, 1);
191    }
192
193    #[test]
194    fn test_snapshot_is_copy() {
195        let stats = PipelineStats::new();
196        stats.add_received(42);
197        let snap = stats.snapshot();
198        let copy = snap; // Copy
199        assert_eq!(snap.received, copy.received);
200    }
201
202    #[test]
203    fn test_filtered_counter() {
204        let stats = PipelineStats::new();
205        stats.incr_filtered();
206        stats.add_filtered(9);
207        let snap = stats.snapshot();
208        assert_eq!(snap.filtered, 10);
209    }
210
211    #[test]
212    fn test_display() {
213        let stats = PipelineStats::new();
214        stats.add_received(100);
215        stats.add_processed(90);
216        let snap = stats.snapshot();
217        let display = format!("{snap}");
218        assert!(display.contains("received=100"));
219        assert!(display.contains("processed=90"));
220    }
221}