rmqtt_utils/
counter.rs

//! Thread-safe atomic counter implementation with statistical merging capabilities
//!
//! ## Core Features:
//! - **Dual-value Tracking**: Maintains both the current value and a historical maximum
//! - **Atomic Operations**: Uses `AtomicIsize` for lock-free thread safety
//! - **Multiple Merge Strategies**: Supports Sum/Max/Min merging modes
//!   (`Average` is declared but not implemented yet)
//! - **Serialization Ready**: Implements Serde traits for easy serialization
//! - **JSON Conversion**: Provides `to_json()` for monitoring integration
//!
//! ## Key Components:
//! - `Counter`: Main structure with atomic current/max values and a merge mode
//! - `StatsMergeMode`: Enum defining the `None`/`Sum`/`Average`/`Max`/`Min` merge strategies
//! - Atomic operations use `SeqCst` ordering for cross-thread consistency
//!
//! ## Example: Using `Counter`
//! ```
//! use rmqtt_utils::{Counter, StatsMergeMode};
//!
//! // `merge` is driven by the merge mode of the counter it is called on,
//! // so the receiving counter must be created with the desired mode.
//! let counter = Counter::new_with(0, 0, StatsMergeMode::Max);
//! counter.inc(); // increment by 1
//! counter.incs(5); // increment by 5
//! println!("Current: {}, Max: {}", counter.count(), counter.max());
//!
//! let other = Counter::new_with(10, 20, StatsMergeMode::Max);
//! counter.merge(&other);
//! println!("After merge: {:?}", counter.to_json());
//! ```
28use std::fmt;
29use std::sync::atomic::{AtomicIsize, Ordering};
30
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33
34type Current = AtomicIsize;
35type Max = AtomicIsize;
36
37/// A counter with current and maximum tracking, and optional merging behavior.
38#[derive(Serialize, Deserialize)]
39pub struct Counter(Current, Max, StatsMergeMode);
40
41impl Clone for Counter {
42    fn clone(&self) -> Self {
43        Counter(
44            AtomicIsize::new(self.0.load(Ordering::SeqCst)),
45            AtomicIsize::new(self.1.load(Ordering::SeqCst)),
46            self.2.clone(),
47        )
48    }
49}
50
51impl fmt::Debug for Counter {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        write!(f, r#"{{ "count":{}, "max":{} }}"#, self.count(), self.max())
54    }
55}
56
57impl Default for Counter {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl Counter {
64    /// Creates a new `Counter` with zeroed values and `StatsMergeMode::None`.
65    ///
66    /// # Example
67    /// ```
68    /// let c = rmqtt_utils::Counter::new();
69    /// ```
70    #[inline]
71    pub fn new() -> Self {
72        Counter(AtomicIsize::new(0), AtomicIsize::new(0), StatsMergeMode::None)
73    }
74
75    /// Creates a new `Counter` with specified values.
76    ///
77    /// # Example
78    /// ```
79    /// let c = rmqtt_utils::Counter::new_with(5, 10, rmqtt_utils::StatsMergeMode::Sum);
80    /// ```
81    #[inline]
82    pub fn new_with(c: isize, max: isize, m: StatsMergeMode) -> Self {
83        Counter(AtomicIsize::new(c), AtomicIsize::new(max), m)
84    }
85
86    /// Increments current count by 1 and updates max if needed.
87    ///
88    /// # Example
89    /// ```
90    /// let c = rmqtt_utils::Counter::new();
91    /// c.inc();
92    /// ```
93    #[inline]
94    pub fn inc(&self) {
95        self.incs(1);
96    }
97
98    /// Increments current count by `c` and updates max if needed.
99    ///
100    /// # Example
101    /// ```
102    /// let c = rmqtt_utils::Counter::new();
103    /// c.incs(5);
104    /// ```
105    #[inline]
106    pub fn incs(&self, c: isize) {
107        let prev = self.0.fetch_add(c, Ordering::SeqCst);
108        self.1.fetch_max(prev + c, Ordering::SeqCst);
109    }
110
111    /// Increments current count by 1 without affecting max.
112    ///
113    /// # Example
114    /// ```
115    /// let c = rmqtt_utils::Counter::new();
116    /// c.current_inc();
117    /// ```
118    #[inline]
119    pub fn current_inc(&self) {
120        self.current_incs(1);
121    }
122
123    /// Increments current count by `c` without affecting max.
124    ///
125    /// # Example
126    /// ```
127    /// let c = rmqtt_utils::Counter::new();
128    /// c.current_incs(3);
129    /// ```
130    #[inline]
131    pub fn current_incs(&self, c: isize) {
132        self.0.fetch_add(c, Ordering::SeqCst);
133    }
134
135    /// Sets the current count directly, does not affect max.
136    ///
137    /// # Example
138    /// ```
139    /// let c = rmqtt_utils::Counter::new();
140    /// c.current_set(100);
141    /// ```
142    #[inline]
143    pub fn current_set(&self, c: isize) {
144        self.0.store(c, Ordering::SeqCst);
145    }
146
147    /// Sets the current count and possibly updates the max.
148    ///
149    /// # Example
150    /// ```
151    /// let c = rmqtt_utils::Counter::new();
152    /// c.sets(20);
153    /// ```
154    #[inline]
155    pub fn sets(&self, c: isize) {
156        self.current_set(c);
157        self.1.fetch_max(c, Ordering::SeqCst);
158    }
159
160    /// Decrements current count by 1.
161    ///
162    /// # Example
163    /// ```
164    /// let c = rmqtt_utils::Counter::new();
165    /// c.dec();
166    /// ```
167    #[inline]
168    pub fn dec(&self) {
169        self.decs(1)
170    }
171
172    /// Decrements current count by `c`.
173    ///
174    /// # Example
175    /// ```
176    /// let c = rmqtt_utils::Counter::new();
177    /// c.decs(4);
178    /// ```
179    #[inline]
180    pub fn decs(&self, c: isize) {
181        self.0.fetch_sub(c, Ordering::SeqCst);
182    }
183
184    /// Sets current count to the minimum of its current value and `count`.
185    ///
186    /// # Example
187    /// ```
188    /// let c = rmqtt_utils::Counter::new();
189    /// c.count_min(5);
190    /// ```
191    #[inline]
192    pub fn count_min(&self, count: isize) {
193        self.0.fetch_min(count, Ordering::SeqCst);
194    }
195
196    /// Sets current count to the maximum of its current value and `count`.
197    ///
198    /// # Example
199    /// ```
200    /// let c = rmqtt_utils::Counter::new();
201    /// c.count_max(10);
202    /// ```
203    #[inline]
204    pub fn count_max(&self, count: isize) {
205        self.0.fetch_max(count, Ordering::SeqCst);
206    }
207
208    /// Sets max to the maximum of its current value and `max`.
209    ///
210    /// # Example
211    /// ```
212    /// let c = rmqtt_utils::Counter::new();
213    /// c.max_max(50);
214    /// ```
215    #[inline]
216    pub fn max_max(&self, max: isize) {
217        self.1.fetch_max(max, Ordering::SeqCst);
218    }
219
220    /// Sets max to the minimum of its current value and `max`.
221    ///
222    /// # Example
223    /// ```
224    /// let c = rmqtt_utils::Counter::new();
225    /// c.max_min(30);
226    /// ```
227    #[inline]
228    pub fn max_min(&self, max: isize) {
229        self.1.fetch_min(max, Ordering::SeqCst);
230    }
231
232    /// Returns the current count value.
233    ///
234    /// # Example
235    /// ```
236    /// let c = rmqtt_utils::Counter::new();
237    /// let v = c.count();
238    /// ```
239    #[inline]
240    pub fn count(&self) -> isize {
241        self.0.load(Ordering::SeqCst)
242    }
243
244    /// Returns the current max value.
245    ///
246    /// # Example
247    /// ```
248    /// let c = rmqtt_utils::Counter::new();
249    /// let m = c.max();
250    /// ```
251    #[inline]
252    pub fn max(&self) -> isize {
253        self.1.load(Ordering::SeqCst)
254    }
255
256    /// Adds values from another counter.
257    ///
258    /// # Example
259    /// ```
260    /// let a = rmqtt_utils::Counter::new_with(3, 10, rmqtt_utils::StatsMergeMode::None);
261    /// let b = rmqtt_utils::Counter::new_with(2, 5, rmqtt_utils::StatsMergeMode::None);
262    /// a.add(&b);
263    /// ```
264    #[inline]
265    pub fn add(&self, other: &Self) {
266        self.0.fetch_add(other.0.load(Ordering::SeqCst), Ordering::SeqCst);
267        self.1.fetch_add(other.1.load(Ordering::SeqCst), Ordering::SeqCst);
268    }
269
270    /// Replaces internal values with those from another counter.
271    ///
272    /// # Example
273    /// ```
274    /// let a = rmqtt_utils::Counter::new();
275    /// let b = rmqtt_utils::Counter::new_with(5, 100, rmqtt_utils::StatsMergeMode::None);
276    /// a.set(&b);
277    /// ```
278    #[inline]
279    pub fn set(&self, other: &Self) {
280        self.0.store(other.0.load(Ordering::SeqCst), Ordering::SeqCst);
281        self.1.store(other.1.load(Ordering::SeqCst), Ordering::SeqCst);
282    }
283
284    /// Merges another counter into this one using the configured merge mode.
285    ///
286    /// # Example
287    /// ```
288    /// let a = rmqtt_utils::Counter::new_with(1, 2, rmqtt_utils::StatsMergeMode::Sum);
289    /// let b = rmqtt_utils::Counter::new_with(3, 4, rmqtt_utils::StatsMergeMode::Sum);
290    /// a.merge(&b);
291    /// ```
292    #[inline]
293    pub fn merge(&self, other: &Self) {
294        stats_merge(&self.2, self, other);
295    }
296
297    /// Converts the counter to JSON format.
298    ///
299    /// # Example
300    /// ```
301    /// let c = rmqtt_utils::Counter::new();
302    /// let json = c.to_json();
303    /// ```
304    #[inline]
305    pub fn to_json(&self) -> serde_json::Value {
306        json!({
307            "count": self.count(),
308            "max": self.max()
309        })
310    }
311}
312
313#[inline]
314fn stats_merge<'a>(mode: &StatsMergeMode, c: &'a Counter, o: &Counter) -> &'a Counter {
315    match mode {
316        StatsMergeMode::None => {}
317        StatsMergeMode::Sum => {
318            c.add(o);
319        }
320        StatsMergeMode::Max => {
321            c.count_max(o.count());
322            c.max_max(o.max());
323        }
324        StatsMergeMode::Min => {
325            c.count_min(o.count());
326            c.max_min(o.max());
327        }
328        _ => {
329            log::info!("unimplemented!");
330        }
331    }
332    c
333}
334
335/// Merge behavior modes for `Counter`.
336#[derive(Clone, Debug, Serialize, Deserialize)]
337pub enum StatsMergeMode {
338    None,    // Represents no merging;
339    Sum,     // Represents summing the data;
340    Average, // Represents averaging the data;
341    Max,     // Represents taking the maximum value of the data;
342    Min,     // Represents taking the minimum value of the data;
343}