rmqtt_utils/
lib.rs

1//! Utilities module providing essential types and functions for common system operations
2//!
3//! ## Core Features:
4//! - **Byte Size Handling**: Human-readable byte size parsing/formatting with [`Bytesize`]
5//! - **Duration Conversion**: String-to-Duration parsing supporting multiple time units
6//! - **Timestamp Utilities**: Precise timestamp handling with millisecond resolution
7//! - **Network Addressing**: Cluster node address parsing ([`NodeAddr`]) and socket address handling
8//! - **Counter Implementation**: Thread-safe counter with merge modes ([`Counter`])
9//!
10//! ## Key Components:
11//! - `Bytesize`: Handles 2G512M-style conversions with serialization support
12//! - Time functions: `timestamp_secs()`, `format_timestamp_now()`, and datetime parsing
13//! - `NodeAddr`: Cluster node representation (ID@Address) with parser
14//! - Network address utilities with proper error handling
15//! - Custom serde helpers for duration and address types
16//!
17//! ## Usage Examples:
18//! ```rust
19//! use rmqtt_utils::{Bytesize, NodeAddr, to_bytesize, to_duration, format_timestamp_now};
20//!
21//! // Byte size parsing
22//! let size = Bytesize::from("2G512M");
23//! assert_eq!(size.as_usize(), 2_684_354_560);
24//!
25//! // Duration conversion
26//! let duration = to_duration("1h30m15s");
27//! assert_eq!(duration.as_secs(), 5415);
28//!
29//! // Node address parsing
30//! let node: NodeAddr = "1@mqtt-node:1883".parse().unwrap();
31//! assert_eq!(node.id, 1);
32//!
33//! // Timestamp formatting
34//! let now = format_timestamp_now();
35//! assert!(now.contains("2025")); // Current year
36//! ```
37//!
38//! ## Safety Guarantees:
39//! - Zero `unsafe` code usage (enforced by `#![deny(unsafe_code)]`)
40//! - Comprehensive error handling for parsing operations
41//! - Platform-agnostic network address handling
42//! - Chrono-based timestamp calculations with proper timezone handling
43//!
44//! Overall usage example:
45//!
46//! ```
47//! use rmqtt_utils::{
48//!     Bytesize, NodeAddr,
49//!     to_bytesize, to_duration,
50//!     timestamp_secs, format_timestamp_now
51//! };
52//!
53//! // Parse byte size from string
54//! let size = Bytesize::from("2G512M");
55//!
56//! // Convert duration string
57//! let duration = to_duration("1h30m15s");
58//!
59//! // Parse node address
60//! let node: NodeAddr = "123@127.0.0.1:1883".parse().unwrap();
61//!
62//! // Get formatted timestamp
63//! let now = format_timestamp_now();
64//! ```
65
66#![deny(unsafe_code)]
67
68use std::fmt;
69use std::net::SocketAddr;
70use std::ops::{Deref, DerefMut};
71use std::str::FromStr;
72use std::time::Duration;
73
74use anyhow::{anyhow, Error};
75use bytestring::ByteString;
76use chrono::LocalResult;
77use serde::{
78    de::{self, Deserializer},
79    ser::Serializer,
80    Deserialize, Serialize,
81};
82
83mod counter;
84
85pub use counter::{Counter, StatsMergeMode};
86
87/// Cluster node identifier type (64-bit unsigned integer)
88pub type NodeId = u64;
89
90/// Network address storage using efficient ByteString
91pub type Addr = ByteString;
92
93/// Timestamp representation in seconds since Unix epoch
94pub type Timestamp = i64;
95
96/// Timestamp representation in milliseconds since Unix epoch
97pub type TimestampMillis = i64;
98
99const BYTESIZE_K: usize = 1024;
100const BYTESIZE_M: usize = 1048576;
101const BYTESIZE_G: usize = 1073741824;
102
103/// Human-readable byte size representation with parsing/serialization support
104///
105/// # Example:
106/// ```
107/// use rmqtt_utils::Bytesize;
108///
109/// // Create from string
110/// let size = Bytesize::from("2G512M");
111/// assert_eq!(size.as_usize(), 2_684_354_560);
112///
113/// // Create from integer
114/// let size = Bytesize::from(1024);
115/// assert_eq!(size.string(), "1K");
116/// ```
117#[derive(Clone, Copy, Default)]
118pub struct Bytesize(pub usize);
119
120impl Bytesize {
121    /// Convert to u32 (may truncate on 32-bit platforms)
122    ///
123    /// # Example:
124    /// ```
125    /// let size = rmqtt_utils::Bytesize(5000);
126    /// assert_eq!(size.as_u32(), 5000);
127    /// ```
128    #[inline]
129    pub fn as_u32(&self) -> u32 {
130        self.0 as u32
131    }
132
133    /// Convert to u64
134    ///
135    /// # Example:
136    /// ```
137    /// let size = rmqtt_utils::Bytesize(usize::MAX);
138    /// assert_eq!(size.as_u64(), usize::MAX as u64);
139    /// ```
140    #[inline]
141    pub fn as_u64(&self) -> u64 {
142        self.0 as u64
143    }
144
145    /// Get underlying usize value
146    ///
147    /// # Example:
148    /// ```
149    /// let size = rmqtt_utils::Bytesize(1024);
150    /// assert_eq!(size.as_usize(), 1024);
151    /// ```
152    #[inline]
153    pub fn as_usize(&self) -> usize {
154        self.0
155    }
156
157    /// Format bytesize to human-readable string
158    ///
159    /// # Example:
160    /// ```
161    /// let size = rmqtt_utils::Bytesize(3145728);
162    /// assert_eq!(size.string(), "3M");
163    ///
164    /// let mixed = rmqtt_utils::Bytesize(2148532224);
165    /// assert_eq!(mixed.string(), "2G1M");
166    /// ```
167    #[inline]
168    pub fn string(&self) -> String {
169        let mut v = self.0;
170        let mut res = String::new();
171
172        let g = v / BYTESIZE_G;
173        if g > 0 {
174            res.push_str(&format!("{}G", g));
175            v %= BYTESIZE_G;
176        }
177
178        let m = v / BYTESIZE_M;
179        if m > 0 {
180            res.push_str(&format!("{}M", m));
181            v %= BYTESIZE_M;
182        }
183
184        let k = v / BYTESIZE_K;
185        if k > 0 {
186            res.push_str(&format!("{}K", k));
187            v %= BYTESIZE_K;
188        }
189
190        if v > 0 {
191            res.push_str(&format!("{}B", v));
192        }
193
194        res
195    }
196}
197
198impl Deref for Bytesize {
199    type Target = usize;
200    fn deref(&self) -> &Self::Target {
201        &self.0
202    }
203}
204
205impl DerefMut for Bytesize {
206    fn deref_mut(&mut self) -> &mut Self::Target {
207        &mut self.0
208    }
209}
210
211impl From<usize> for Bytesize {
212    fn from(v: usize) -> Self {
213        Bytesize(v)
214    }
215}
216
217impl From<&str> for Bytesize {
218    fn from(v: &str) -> Self {
219        Bytesize(to_bytesize(v))
220    }
221}
222
223impl fmt::Debug for Bytesize {
224    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
225        write!(f, "{}", self.string())?;
226        Ok(())
227    }
228}
229
230impl Serialize for Bytesize {
231    #[inline]
232    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
233    where
234        S: Serializer,
235    {
236        serializer.serialize_str(&self.to_string())
237    }
238}
239
240impl<'de> Deserialize<'de> for Bytesize {
241    #[inline]
242    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
243    where
244        D: Deserializer<'de>,
245    {
246        let v = to_bytesize(&String::deserialize(deserializer)?);
247        Ok(Bytesize(v))
248    }
249}
250
251/// Parse human-readable byte size string to usize
252///
253/// # Example:
254/// ```
255/// let bytes = rmqtt_utils::to_bytesize("2G512K");
256/// assert_eq!(bytes, 2148007936);
257///
258/// let complex = rmqtt_utils::to_bytesize("1G500M256K1024B");
259/// assert_eq!(complex, 1598292992);
260/// ```
261#[inline]
262pub fn to_bytesize(text: &str) -> usize {
263    let text = text.to_uppercase().replace("GB", "G").replace("MB", "M").replace("KB", "K");
264    text.split_inclusive(['G', 'M', 'K', 'B'])
265        .map(|x| {
266            let mut chars = x.chars();
267            let u = match chars.nth_back(0) {
268                None => return 0,
269                Some(u) => u,
270            };
271            let v = match chars.as_str().parse::<usize>() {
272                Err(_e) => return 0,
273                Ok(v) => v,
274            };
275            match u {
276                'B' => v,
277                'K' => v * BYTESIZE_K,
278                'M' => v * BYTESIZE_M,
279                'G' => v * BYTESIZE_G,
280                _ => 0,
281            }
282        })
283        .sum()
284}
285
286/// Deserialize Duration from human-readable string format
287#[inline]
288pub fn deserialize_duration<'de, D>(deserializer: D) -> std::result::Result<Duration, D::Error>
289where
290    D: Deserializer<'de>,
291{
292    let v = String::deserialize(deserializer)?;
293    Ok(to_duration(&v))
294}
295
296/// Deserialize optional Duration from string
297#[inline]
298pub fn deserialize_duration_option<'de, D>(deserializer: D) -> std::result::Result<Option<Duration>, D::Error>
299where
300    D: Deserializer<'de>,
301{
302    let v = String::deserialize(deserializer)?;
303    if v.is_empty() {
304        Ok(None)
305    } else {
306        Ok(Some(to_duration(&v)))
307    }
308}
309
310/// Convert human-readable duration string to Duration
311///
312/// # Supported units:
313/// - Y: milliseconds (e.g. "100Y" = 100ms)
314/// - s: seconds
315/// - m: minutes
316/// - h: hours
317/// - d: days
318/// - w: weeks
319/// - f: fortnight (2 weeks)
320///
321/// # Example:
322/// ```
323/// let duration = rmqtt_utils::to_duration("1h30m15s");
324/// assert_eq!(duration.as_secs(), 5415);
325///
326/// let complex = rmqtt_utils::to_duration("2w3d12h");
327/// assert_eq!(complex.as_secs(), 1512000);
328/// ```
329#[inline]
330pub fn to_duration(text: &str) -> Duration {
331    let text = text.to_lowercase().replace("ms", "Y");
332    let ms: u64 = text
333        .split_inclusive(['s', 'm', 'h', 'd', 'w', 'f', 'Y'])
334        .map(|x| {
335            let mut chars = x.chars();
336            let u = match chars.nth_back(0) {
337                None => return 0,
338                Some(u) => u,
339            };
340            let v = match chars.as_str().parse::<u64>() {
341                Err(_e) => return 0,
342                Ok(v) => v,
343            };
344            match u {
345                'Y' => v,
346                's' => v * 1000,
347                'm' => v * 60000,
348                'h' => v * 3600000,
349                'd' => v * 86400000,
350                'w' => v * 604800000,
351                'f' => v * 1209600000,
352                _ => 0,
353            }
354        })
355        .sum();
356    Duration::from_millis(ms)
357}
358
359/// Deserialize SocketAddr with error handling
360#[inline]
361pub fn deserialize_addr<'de, D>(deserializer: D) -> std::result::Result<SocketAddr, D::Error>
362where
363    D: Deserializer<'de>,
364{
365    let addr = String::deserialize(deserializer)?
366        .parse::<std::net::SocketAddr>()
367        .map_err(serde::de::Error::custom)?;
368    Ok(addr)
369}
370
371/// Deserialize optional SocketAddr with port handling
372#[inline]
373pub fn deserialize_addr_option<'de, D>(
374    deserializer: D,
375) -> std::result::Result<Option<std::net::SocketAddr>, D::Error>
376where
377    D: Deserializer<'de>,
378{
379    let addr = String::deserialize(deserializer).map(|mut addr| {
380        if !addr.contains(':') {
381            addr += ":0";
382        }
383        addr
384    })?;
385    let addr = addr.parse::<std::net::SocketAddr>().map_err(serde::de::Error::custom)?;
386    Ok(Some(addr))
387}
388
389/// Deserialize optional datetime from string
390#[inline]
391pub fn deserialize_datetime_option<'de, D>(deserializer: D) -> std::result::Result<Option<Duration>, D::Error>
392where
393    D: Deserializer<'de>,
394{
395    let t_str = String::deserialize(deserializer)?;
396    if t_str.is_empty() {
397        Ok(None)
398    } else {
399        let t = if let Ok(d) = timestamp_parse_from_str(&t_str, "%Y-%m-%d %H:%M:%S") {
400            Duration::from_secs(d as u64)
401        } else {
402            let d = t_str.parse::<u64>().map_err(serde::de::Error::custom)?;
403            Duration::from_secs(d)
404        };
405        Ok(Some(t))
406    }
407}
408
409/// Serialize optional datetime to string
410#[inline]
411pub fn serialize_datetime_option<S>(t: &Option<Duration>, s: S) -> std::result::Result<S::Ok, S::Error>
412where
413    S: Serializer,
414{
415    if let Some(t) = t {
416        t.as_secs().to_string().serialize(s)
417    } else {
418        "".serialize(s)
419    }
420}
421
422/// Internal datetime parsing helper
423#[inline]
424fn timestamp_parse_from_str(ts: &str, fmt: &str) -> anyhow::Result<i64> {
425    let ndt = chrono::NaiveDateTime::parse_from_str(ts, fmt)?;
426    let ndt = ndt.and_local_timezone(*chrono::Local::now().offset());
427    match ndt {
428        LocalResult::None => Err(anyhow::Error::msg("Impossible")),
429        LocalResult::Single(d) => Ok(d.timestamp()),
430        LocalResult::Ambiguous(d, _tz) => Ok(d.timestamp()),
431    }
432}
433
434/// Get current timestamp as Duration
435///
436/// # Example:
437/// ```
438/// let ts = rmqtt_utils::timestamp();
439/// assert!(ts.as_secs() > 0);
440/// ```
441#[inline]
442pub fn timestamp() -> Duration {
443    use std::time::{SystemTime, UNIX_EPOCH};
444    SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_else(|_| {
445        let now = chrono::Local::now();
446        Duration::new(now.timestamp() as u64, now.timestamp_subsec_nanos())
447    })
448}
449
450/// Get current timestamp in seconds
451///
452/// # Example:
453/// ```
454/// let ts = rmqtt_utils::timestamp_secs();
455/// assert!(ts > 0);
456/// ```
457#[inline]
458pub fn timestamp_secs() -> Timestamp {
459    use std::time::{SystemTime, UNIX_EPOCH};
460    SystemTime::now()
461        .duration_since(UNIX_EPOCH)
462        .map(|t| t.as_secs() as i64)
463        .unwrap_or_else(|_| chrono::Local::now().timestamp())
464}
465
466/// Get current timestamp in milliseconds
467///
468/// # Example:
469/// ```
470/// let ts = rmqtt_utils::timestamp_millis();
471/// assert!(ts > 0);
472/// ```
473#[inline]
474pub fn timestamp_millis() -> TimestampMillis {
475    use std::time::{SystemTime, UNIX_EPOCH};
476    SystemTime::now()
477        .duration_since(UNIX_EPOCH)
478        .map(|t| t.as_millis() as i64)
479        .unwrap_or_else(|_| chrono::Local::now().timestamp_millis())
480}
481
482/// Format timestamp (seconds) to human-readable string
483#[inline]
484pub fn format_timestamp(t: Timestamp) -> String {
485    if t <= 0 {
486        "".into()
487    } else {
488        use chrono::TimeZone;
489        if let chrono::LocalResult::Single(t) = chrono::Local.timestamp_opt(t, 0) {
490            t.format("%Y-%m-%d %H:%M:%S").to_string()
491        } else {
492            "".into()
493        }
494    }
495}
496
497/// Format current timestamp to string
498///
499/// # Example:
500/// ```
501/// let now = rmqtt_utils::format_timestamp_now();
502/// assert!(!now.is_empty());
503/// ```
504#[inline]
505pub fn format_timestamp_now() -> String {
506    format_timestamp(timestamp_secs())
507}
508
509/// Format millisecond timestamp to string
510#[inline]
511pub fn format_timestamp_millis(t: TimestampMillis) -> String {
512    if t <= 0 {
513        "".into()
514    } else {
515        use chrono::TimeZone;
516        if let chrono::LocalResult::Single(t) = chrono::Local.timestamp_millis_opt(t) {
517            t.format("%Y-%m-%d %H:%M:%S%.3f").to_string()
518        } else {
519            "".into()
520        }
521    }
522}
523
524/// Format current millisecond timestamp to string
525///
526/// # Example:
527/// ```
528/// let now = rmqtt_utils::format_timestamp_millis_now();
529/// assert!(!now.is_empty());
530/// ```
531#[inline]
532pub fn format_timestamp_millis_now() -> String {
533    format_timestamp_millis(timestamp_millis())
534}
535
536/// Cluster node address representation (ID@Address)
537///
538/// # Example:
539/// ```
540/// use rmqtt_utils::NodeAddr;
541///
542/// // Parse from string
543/// let node: NodeAddr = "123@mqtt.example.com:1883".parse().unwrap();
544/// assert_eq!(node.id, 123);
545/// assert_eq!(node.addr, "mqtt.example.com:1883");
546///
547/// // Direct construction
548/// let node = NodeAddr {
549///     id: 456,
550///     addr: rmqtt_utils::Addr::from("localhost:8883")
551/// };
552/// ```
553#[derive(Clone, Serialize)]
554pub struct NodeAddr {
555    /// Unique node identifier
556    pub id: NodeId,
557
558    /// Network address in host:port format
559    pub addr: Addr,
560}
561
562impl std::fmt::Debug for NodeAddr {
563    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
564        write!(f, "{}@{:?}", self.id, self.addr)
565    }
566}
567
568impl FromStr for NodeAddr {
569    type Err = Error;
570    fn from_str(s: &str) -> Result<Self, Self::Err> {
571        let parts: Vec<&str> = s.split('@').collect();
572        if parts.len() < 2 {
573            return Err(anyhow!(format!("NodeAddr format error, {}", s)));
574        }
575        let id = NodeId::from_str(parts[0])?;
576        let addr = Addr::from(parts[1]);
577        Ok(NodeAddr { id, addr })
578    }
579}
580
581impl<'de> de::Deserialize<'de> for NodeAddr {
582    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
583    where
584        D: de::Deserializer<'de>,
585    {
586        NodeAddr::from_str(&String::deserialize(deserializer)?).map_err(de::Error::custom)
587    }
588}