rmqtt_utils/counter.rs
1//! Thread-safe atomic counter implementation with statistical merging capabilities
2//!
3//! ## Core Features:
4//! - **Dual-value Tracking**: Maintains both current value and historical maximum
5//! - **Atomic Operations**: Uses `AtomicIsize` for lock-free thread safety
6//! - **Multiple Merge Strategies**: Supports Sum/Max/Min merging modes
7//! - **Serialization Ready**: Implements Serde traits for easy serialization
8//! - **JSON Conversion**: Provides `to_json()` for monitoring integration
9//!
10//! ## Key Components:
11//! - `Counter`: Main structure with atomic current/max values and merge mode
//! - `StatsMergeMode`: Enum defining the None/Sum/Average/Max/Min merge strategies (Average is declared but not yet implemented)
13//! - Atomic operations with SeqCst ordering for cross-thread consistency
14//!
//! ## Example: Using `Counter`
//! ```
//! use rmqtt_utils::{Counter, StatsMergeMode};
//!
//! let counter = Counter::new();
//! counter.inc(); // increment by 1
//! counter.incs(5); // increment by 5
//! println!("Current: {}, Max: {}", counter.count(), counter.max());
//!
//! // `merge` applies the merge mode of the counter it is called on;
//! // `counter` was created with `StatsMergeMode::None`, so this merge leaves it unchanged.
//! let other = Counter::new_with(10, 20, StatsMergeMode::Max);
//! counter.merge(&other);
//! println!("After merge: {:?}", counter.to_json());
//! ```
28use std::fmt;
29use std::sync::atomic::{AtomicIsize, Ordering};
30
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33
/// Atomic storage used for a counter's current value.
type Current = AtomicIsize;
/// Atomic storage used for a counter's tracked maximum value.
type Max = AtomicIsize;

/// A counter with current and maximum tracking, and optional merging behavior.
///
/// Tuple fields, in order:
/// - `0`: the current value (read by `count()`),
/// - `1`: the tracked maximum (read by `max()`),
/// - `2`: the [`StatsMergeMode`] that `merge()` applies.
///
/// Both numeric fields are atomics, which is why every method takes `&self`.
#[derive(Serialize, Deserialize)]
pub struct Counter(Current, Max, StatsMergeMode);
40
41impl Clone for Counter {
42 fn clone(&self) -> Self {
43 Counter(
44 AtomicIsize::new(self.0.load(Ordering::SeqCst)),
45 AtomicIsize::new(self.1.load(Ordering::SeqCst)),
46 self.2.clone(),
47 )
48 }
49}
50
51impl fmt::Debug for Counter {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 write!(f, r#"{{ "count":{}, "max":{} }}"#, self.count(), self.max())
54 }
55}
56
57impl Default for Counter {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl Counter {
64 /// Creates a new `Counter` with zeroed values and `StatsMergeMode::None`.
65 ///
66 /// # Example
67 /// ```
68 /// let c = rmqtt_utils::Counter::new();
69 /// ```
70 #[inline]
71 pub fn new() -> Self {
72 Counter(AtomicIsize::new(0), AtomicIsize::new(0), StatsMergeMode::None)
73 }
74
75 /// Creates a new `Counter` with specified values.
76 ///
77 /// # Example
78 /// ```
79 /// let c = rmqtt_utils::Counter::new_with(5, 10, rmqtt_utils::StatsMergeMode::Sum);
80 /// ```
81 #[inline]
82 pub fn new_with(c: isize, max: isize, m: StatsMergeMode) -> Self {
83 Counter(AtomicIsize::new(c), AtomicIsize::new(max), m)
84 }
85
86 /// Increments current count by 1 and updates max if needed.
87 ///
88 /// # Example
89 /// ```
90 /// let c = rmqtt_utils::Counter::new();
91 /// c.inc();
92 /// ```
93 #[inline]
94 pub fn inc(&self) {
95 self.incs(1);
96 }
97
98 /// Increments current count by `c` and updates max if needed.
99 ///
100 /// # Example
101 /// ```
102 /// let c = rmqtt_utils::Counter::new();
103 /// c.incs(5);
104 /// ```
105 #[inline]
106 pub fn incs(&self, c: isize) {
107 let prev = self.0.fetch_add(c, Ordering::SeqCst);
108 self.1.fetch_max(prev + c, Ordering::SeqCst);
109 }
110
111 /// Increments current count by 1 without affecting max.
112 ///
113 /// # Example
114 /// ```
115 /// let c = rmqtt_utils::Counter::new();
116 /// c.current_inc();
117 /// ```
118 #[inline]
119 pub fn current_inc(&self) {
120 self.current_incs(1);
121 }
122
123 /// Increments current count by `c` without affecting max.
124 ///
125 /// # Example
126 /// ```
127 /// let c = rmqtt_utils::Counter::new();
128 /// c.current_incs(3);
129 /// ```
130 #[inline]
131 pub fn current_incs(&self, c: isize) {
132 self.0.fetch_add(c, Ordering::SeqCst);
133 }
134
135 /// Sets the current count directly, does not affect max.
136 ///
137 /// # Example
138 /// ```
139 /// let c = rmqtt_utils::Counter::new();
140 /// c.current_set(100);
141 /// ```
142 #[inline]
143 pub fn current_set(&self, c: isize) {
144 self.0.store(c, Ordering::SeqCst);
145 }
146
147 /// Sets the current count and possibly updates the max.
148 ///
149 /// # Example
150 /// ```
151 /// let c = rmqtt_utils::Counter::new();
152 /// c.sets(20);
153 /// ```
154 #[inline]
155 pub fn sets(&self, c: isize) {
156 self.current_set(c);
157 self.1.fetch_max(c, Ordering::SeqCst);
158 }
159
160 /// Decrements current count by 1.
161 ///
162 /// # Example
163 /// ```
164 /// let c = rmqtt_utils::Counter::new();
165 /// c.dec();
166 /// ```
167 #[inline]
168 pub fn dec(&self) {
169 self.decs(1)
170 }
171
172 /// Decrements current count by `c`.
173 ///
174 /// # Example
175 /// ```
176 /// let c = rmqtt_utils::Counter::new();
177 /// c.decs(4);
178 /// ```
179 #[inline]
180 pub fn decs(&self, c: isize) {
181 self.0.fetch_sub(c, Ordering::SeqCst);
182 }
183
184 /// Sets current count to the minimum of its current value and `count`.
185 ///
186 /// # Example
187 /// ```
188 /// let c = rmqtt_utils::Counter::new();
189 /// c.count_min(5);
190 /// ```
191 #[inline]
192 pub fn count_min(&self, count: isize) {
193 self.0.fetch_min(count, Ordering::SeqCst);
194 }
195
196 /// Sets current count to the maximum of its current value and `count`.
197 ///
198 /// # Example
199 /// ```
200 /// let c = rmqtt_utils::Counter::new();
201 /// c.count_max(10);
202 /// ```
203 #[inline]
204 pub fn count_max(&self, count: isize) {
205 self.0.fetch_max(count, Ordering::SeqCst);
206 }
207
208 /// Sets max to the maximum of its current value and `max`.
209 ///
210 /// # Example
211 /// ```
212 /// let c = rmqtt_utils::Counter::new();
213 /// c.max_max(50);
214 /// ```
215 #[inline]
216 pub fn max_max(&self, max: isize) {
217 self.1.fetch_max(max, Ordering::SeqCst);
218 }
219
220 /// Sets max to the minimum of its current value and `max`.
221 ///
222 /// # Example
223 /// ```
224 /// let c = rmqtt_utils::Counter::new();
225 /// c.max_min(30);
226 /// ```
227 #[inline]
228 pub fn max_min(&self, max: isize) {
229 self.1.fetch_min(max, Ordering::SeqCst);
230 }
231
232 /// Returns the current count value.
233 ///
234 /// # Example
235 /// ```
236 /// let c = rmqtt_utils::Counter::new();
237 /// let v = c.count();
238 /// ```
239 #[inline]
240 pub fn count(&self) -> isize {
241 self.0.load(Ordering::SeqCst)
242 }
243
244 /// Returns the current max value.
245 ///
246 /// # Example
247 /// ```
248 /// let c = rmqtt_utils::Counter::new();
249 /// let m = c.max();
250 /// ```
251 #[inline]
252 pub fn max(&self) -> isize {
253 self.1.load(Ordering::SeqCst)
254 }
255
256 /// Adds values from another counter.
257 ///
258 /// # Example
259 /// ```
260 /// let a = rmqtt_utils::Counter::new_with(3, 10, rmqtt_utils::StatsMergeMode::None);
261 /// let b = rmqtt_utils::Counter::new_with(2, 5, rmqtt_utils::StatsMergeMode::None);
262 /// a.add(&b);
263 /// ```
264 #[inline]
265 pub fn add(&self, other: &Self) {
266 self.0.fetch_add(other.0.load(Ordering::SeqCst), Ordering::SeqCst);
267 self.1.fetch_add(other.1.load(Ordering::SeqCst), Ordering::SeqCst);
268 }
269
270 /// Replaces internal values with those from another counter.
271 ///
272 /// # Example
273 /// ```
274 /// let a = rmqtt_utils::Counter::new();
275 /// let b = rmqtt_utils::Counter::new_with(5, 100, rmqtt_utils::StatsMergeMode::None);
276 /// a.set(&b);
277 /// ```
278 #[inline]
279 pub fn set(&self, other: &Self) {
280 self.0.store(other.0.load(Ordering::SeqCst), Ordering::SeqCst);
281 self.1.store(other.1.load(Ordering::SeqCst), Ordering::SeqCst);
282 }
283
284 /// Merges another counter into this one using the configured merge mode.
285 ///
286 /// # Example
287 /// ```
288 /// let a = rmqtt_utils::Counter::new_with(1, 2, rmqtt_utils::StatsMergeMode::Sum);
289 /// let b = rmqtt_utils::Counter::new_with(3, 4, rmqtt_utils::StatsMergeMode::Sum);
290 /// a.merge(&b);
291 /// ```
292 #[inline]
293 pub fn merge(&self, other: &Self) {
294 stats_merge(&self.2, self, other);
295 }
296
297 /// Converts the counter to JSON format.
298 ///
299 /// # Example
300 /// ```
301 /// let c = rmqtt_utils::Counter::new();
302 /// let json = c.to_json();
303 /// ```
304 #[inline]
305 pub fn to_json(&self) -> serde_json::Value {
306 json!({
307 "count": self.count(),
308 "max": self.max()
309 })
310 }
311}
312
313#[inline]
314fn stats_merge<'a>(mode: &StatsMergeMode, c: &'a Counter, o: &Counter) -> &'a Counter {
315 match mode {
316 StatsMergeMode::None => {}
317 StatsMergeMode::Sum => {
318 c.add(o);
319 }
320 StatsMergeMode::Max => {
321 c.count_max(o.count());
322 c.max_max(o.max());
323 }
324 StatsMergeMode::Min => {
325 c.count_min(o.count());
326 c.max_min(o.max());
327 }
328 _ => {
329 log::info!("unimplemented!");
330 }
331 }
332 c
333}
334
335/// Merge behavior modes for `Counter`.
336#[derive(Clone, Debug, Serialize, Deserialize)]
337pub enum StatsMergeMode {
338 None, // Represents no merging;
339 Sum, // Represents summing the data;
340 Average, // Represents averaging the data;
341 Max, // Represents taking the maximum value of the data;
342 Min, // Represents taking the minimum value of the data;
343}