rmqtt_utils/
lib.rs

1//! Utilities module providing essential types and functions for common system operations
2//!
3//! ## Core Features:
4//! - **Byte Size Handling**: Human-readable byte size parsing/formatting with [`Bytesize`]
5//! - **Duration Conversion**: String-to-Duration parsing supporting multiple time units
6//! - **Timestamp Utilities**: Precise timestamp handling with millisecond resolution
7//! - **Network Addressing**: Cluster node address parsing ([`NodeAddr`]) and socket address handling
8//! - **Counter Implementation**: Thread-safe counter with merge modes ([`Counter`])
9//!
10//! ## Key Components:
11//! - `Bytesize`: Handles 2G512M-style conversions with serialization support
12//! - Time functions: `timestamp_secs()`, `format_timestamp_now()`, and datetime parsing
13//! - `NodeAddr`: Cluster node representation (ID@Address) with parser
14//! - Network address utilities with proper error handling
15//! - Custom serde helpers for duration and address types
16//!
17//! ## Usage Examples:
18//! ```rust
19//! use rmqtt_utils::{Bytesize, NodeAddr, to_bytesize, to_duration, format_timestamp_now};
20//!
21//! // Byte size parsing
22//! let size = Bytesize::try_from("2G512M").unwrap();
23//! assert_eq!(size.as_usize(), 2_684_354_560);
24//!
25//! // Duration conversion
26//! let duration = to_duration("1h30m15s");
27//! assert_eq!(duration.as_secs(), 5415);
28//!
29//! // Node address parsing
30//! let node: NodeAddr = "1@mqtt-node:1883".parse().unwrap();
31//! assert_eq!(node.id, 1);
32//!
33//! // Timestamp formatting
34//! let now = format_timestamp_now();
35//! assert!(now.contains("2025")); // Current year
36//! ```
37//!
38//! ## Safety Guarantees:
39//! - Zero `unsafe` code usage (enforced by `#![deny(unsafe_code)]`)
40//! - Comprehensive error handling for parsing operations
41//! - Platform-agnostic network address handling
42//! - Chrono-based timestamp calculations with proper timezone handling
43//!
44//! Overall usage example:
45//!
46//! ```
47//! use rmqtt_utils::{
48//!     Bytesize, NodeAddr,
49//!     to_bytesize, to_duration,
50//!     timestamp_secs, format_timestamp_now
51//! };
52//!
53//! // Parse byte size from string
54//! let size = Bytesize::try_from("2G512M");
55//!
56//! // Convert duration string
57//! let duration = to_duration("1h30m15s");
58//!
59//! // Parse node address
60//! let node: NodeAddr = "123@127.0.0.1:1883".parse().unwrap();
61//!
62//! // Get formatted timestamp
63//! let now = format_timestamp_now();
64//! ```
65
66#![deny(unsafe_code)]
67
68use std::fmt;
69use std::net::SocketAddr;
70use std::ops::{Deref, DerefMut};
71use std::str::FromStr;
72use std::time::Duration;
73
74use anyhow::{anyhow, Error};
75use bytestring::ByteString;
76use chrono::LocalResult;
77use serde::{
78    de::{self, Deserializer},
79    ser::Serializer,
80    Deserialize, Serialize,
81};
82
83mod counter;
84
85pub use counter::{Counter, StatsMergeMode};
86
87/// Cluster node identifier type (64-bit unsigned integer)
88pub type NodeId = u64;
89
90/// Network address storage using efficient ByteString
91pub type Addr = ByteString;
92
93/// Timestamp representation in seconds since Unix epoch
94pub type Timestamp = i64;
95
96/// Timestamp representation in milliseconds since Unix epoch
97pub type TimestampMillis = i64;
98
99const BYTESIZE_K: usize = 1024;
100const BYTESIZE_M: usize = 1048576;
101const BYTESIZE_G: usize = 1073741824;
102
103/// Human-readable byte size representation with parsing/serialization support
104///
105/// # Example:
106/// ```
107/// use rmqtt_utils::Bytesize;
108///
109/// // Create from string
110/// let size = Bytesize::try_from("2G512M").unwrap();
111/// assert_eq!(size.as_usize(), 2_684_354_560);
112///
113/// // Create from integer
114/// let size = Bytesize::from(1024);
115/// assert_eq!(size.string(), "1K");
116/// ```
117#[derive(Clone, Copy, Default)]
118pub struct Bytesize(pub usize);
119
120impl Bytesize {
121    /// Convert to u32 (may truncate on 32-bit platforms)
122    ///
123    /// # Example:
124    /// ```
125    /// let size = rmqtt_utils::Bytesize(5000);
126    /// assert_eq!(size.as_u32(), 5000);
127    /// ```
128    #[inline]
129    pub fn as_u32(&self) -> u32 {
130        self.0 as u32
131    }
132
133    /// Convert to u64
134    ///
135    /// # Example:
136    /// ```
137    /// let size = rmqtt_utils::Bytesize(usize::MAX);
138    /// assert_eq!(size.as_u64(), usize::MAX as u64);
139    /// ```
140    #[inline]
141    pub fn as_u64(&self) -> u64 {
142        self.0 as u64
143    }
144
145    /// Get underlying usize value
146    ///
147    /// # Example:
148    /// ```
149    /// let size = rmqtt_utils::Bytesize(1024);
150    /// assert_eq!(size.as_usize(), 1024);
151    /// ```
152    #[inline]
153    pub fn as_usize(&self) -> usize {
154        self.0
155    }
156
157    /// Format bytesize to human-readable string
158    ///
159    /// # Example:
160    /// ```
161    /// let size = rmqtt_utils::Bytesize(3145728);
162    /// assert_eq!(size.string(), "3M");
163    ///
164    /// let mixed = rmqtt_utils::Bytesize(2148532224);
165    /// assert_eq!(mixed.string(), "2G1M");
166    /// ```
167    #[inline]
168    pub fn string(&self) -> String {
169        let mut v = self.0;
170        let mut res = String::new();
171
172        let g = v / BYTESIZE_G;
173        if g > 0 {
174            res.push_str(&format!("{g}G"));
175            v %= BYTESIZE_G;
176        }
177
178        let m = v / BYTESIZE_M;
179        if m > 0 {
180            res.push_str(&format!("{m}M"));
181            v %= BYTESIZE_M;
182        }
183
184        let k = v / BYTESIZE_K;
185        if k > 0 {
186            res.push_str(&format!("{k}K"));
187            v %= BYTESIZE_K;
188        }
189
190        if v > 0 {
191            res.push_str(&format!("{v}B"));
192        }
193
194        res
195    }
196}
197
198impl Deref for Bytesize {
199    type Target = usize;
200    fn deref(&self) -> &Self::Target {
201        &self.0
202    }
203}
204
205impl DerefMut for Bytesize {
206    fn deref_mut(&mut self) -> &mut Self::Target {
207        &mut self.0
208    }
209}
210
211impl From<usize> for Bytesize {
212    fn from(v: usize) -> Self {
213        Bytesize(v)
214    }
215}
216
217impl TryFrom<&str> for Bytesize {
218    type Error = ParseSizeError;
219    fn try_from(v: &str) -> Result<Self, Self::Error> {
220        let value = to_bytesize(v)?;
221        Ok(Bytesize(value))
222    }
223}
224
225impl FromStr for Bytesize {
226    type Err = ParseSizeError;
227    fn from_str(s: &str) -> Result<Self, Self::Err> {
228        Ok(Bytesize(to_bytesize(s)?))
229    }
230}
231
232impl fmt::Debug for Bytesize {
233    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
234        write!(f, "{}", self.string())?;
235        Ok(())
236    }
237}
238
239impl fmt::Display for Bytesize {
240    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
241        write!(f, "{}", self.string())
242    }
243}
244
245impl Serialize for Bytesize {
246    #[inline]
247    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
248    where
249        S: Serializer,
250    {
251        serializer.serialize_str(&self.to_string())
252    }
253}
254
255impl<'de> Deserialize<'de> for Bytesize {
256    #[inline]
257    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
258    where
259        D: Deserializer<'de>,
260    {
261        let v = to_bytesize(&String::deserialize(deserializer)?).map_err(de::Error::custom)?;
262        Ok(Bytesize(v))
263    }
264}
265
266/// Parse human-readable byte size string to usize
267///
268/// # Example:
269/// ```
270/// let bytes = rmqtt_utils::to_bytesize("2G512K");
271/// assert_eq!(bytes, Ok(2148007936));
272///
273/// let complex = rmqtt_utils::to_bytesize("1G500M256K1024B");
274/// assert_eq!(complex, Ok(1598292992));
275/// ```
276#[inline]
277pub fn to_bytesize(text: &str) -> Result<usize, ParseSizeError> {
278    let text = text.to_uppercase().replace("GB", "G").replace("MB", "M").replace("KB", "K");
279    text.split_inclusive(['G', 'M', 'K', 'B'])
280        .map(|x| {
281            let mut chars = x.chars();
282            let u = chars.nth_back(0).ok_or(ParseSizeError::InvalidFormat)?;
283            let num_str = chars.as_str();
284            let v =
285                num_str.parse::<usize>().map_err(|_| ParseSizeError::InvalidNumber(num_str.to_string()))?;
286            match u {
287                'B' => Ok(v),
288                'K' => Ok(v * BYTESIZE_K),
289                'M' => Ok(v * BYTESIZE_M),
290                'G' => Ok(v * BYTESIZE_G),
291                _ => Err(ParseSizeError::InvalidUnit(u)),
292            }
293        })
294        .sum()
295}
296
297#[derive(Debug, Eq, PartialEq, Clone)]
298pub enum ParseSizeError {
299    InvalidFormat,
300    InvalidNumber(String),
301    InvalidUnit(char),
302}
303
304impl std::fmt::Display for ParseSizeError {
305    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
306        match self {
307            Self::InvalidFormat => write!(f, "invalid size format"),
308            Self::InvalidNumber(s) => write!(f, "invalid number: '{s}'"),
309            Self::InvalidUnit(c) => write!(f, "invalid unit: '{c}'"),
310        }
311    }
312}
313
314impl std::error::Error for ParseSizeError {}
315
316/// Deserialize Duration from human-readable string format
317#[inline]
318pub fn deserialize_duration<'de, D>(deserializer: D) -> std::result::Result<Duration, D::Error>
319where
320    D: Deserializer<'de>,
321{
322    let v = String::deserialize(deserializer)?;
323    Ok(to_duration(&v))
324}
325
326/// Deserialize optional Duration from string
327#[inline]
328pub fn deserialize_duration_option<'de, D>(deserializer: D) -> std::result::Result<Option<Duration>, D::Error>
329where
330    D: Deserializer<'de>,
331{
332    let v = String::deserialize(deserializer)?;
333    if v.is_empty() {
334        Ok(None)
335    } else {
336        Ok(Some(to_duration(&v)))
337    }
338}
339
340/// Convert human-readable duration string to Duration
341///
342/// # Supported units:
343/// - ms: milliseconds
344/// - s: seconds
345/// - m: minutes
346/// - h: hours
347/// - d: days
348/// - w: weeks
349/// - f: fortnight (2 weeks)
350///
351/// # Example:
352/// ```
353/// let duration = rmqtt_utils::to_duration("1h30m15s");
354/// assert_eq!(duration.as_secs(), 5415);
355///
356/// let complex = rmqtt_utils::to_duration("2w3d12h");
357/// assert_eq!(complex.as_secs(), 1512000);
358/// ```
359#[inline]
360pub fn to_duration(text: &str) -> Duration {
361    let text = text.to_lowercase().replace("ms", "Y");
362    let ms: u64 = text
363        .split_inclusive(['s', 'm', 'h', 'd', 'w', 'f', 'Y'])
364        .map(|x| {
365            let mut chars = x.chars();
366            let u = match chars.nth_back(0) {
367                None => return 0,
368                Some(u) => u,
369            };
370            let v = match chars.as_str().parse::<u64>() {
371                Err(_e) => return 0,
372                Ok(v) => v,
373            };
374            match u {
375                'Y' => v,
376                's' => v * 1000,
377                'm' => v * 60000,
378                'h' => v * 3600000,
379                'd' => v * 86400000,
380                'w' => v * 604800000,
381                'f' => v * 1209600000,
382                _ => 0,
383            }
384        })
385        .sum();
386    Duration::from_millis(ms)
387}
388
389/// Deserialize SocketAddr with error handling
390#[inline]
391pub fn deserialize_addr<'de, D>(deserializer: D) -> std::result::Result<SocketAddr, D::Error>
392where
393    D: Deserializer<'de>,
394{
395    let addr = String::deserialize(deserializer)?
396        .parse::<std::net::SocketAddr>()
397        .map_err(serde::de::Error::custom)?;
398    Ok(addr)
399}
400
401/// Deserialize optional SocketAddr with port handling
402#[inline]
403pub fn deserialize_addr_option<'de, D>(
404    deserializer: D,
405) -> std::result::Result<Option<std::net::SocketAddr>, D::Error>
406where
407    D: Deserializer<'de>,
408{
409    let addr = String::deserialize(deserializer).map(|mut addr| {
410        if !addr.contains(':') {
411            addr += ":0";
412        }
413        addr
414    })?;
415    let addr = addr.parse::<std::net::SocketAddr>().map_err(serde::de::Error::custom)?;
416    Ok(Some(addr))
417}
418
419/// Deserialize optional datetime from string
420#[inline]
421pub fn deserialize_datetime_option<'de, D>(deserializer: D) -> std::result::Result<Option<Duration>, D::Error>
422where
423    D: Deserializer<'de>,
424{
425    let t_str = String::deserialize(deserializer)?;
426    if t_str.is_empty() {
427        Ok(None)
428    } else {
429        let t = if let Ok(d) = timestamp_parse_from_str(&t_str, "%Y-%m-%d %H:%M:%S") {
430            Duration::from_secs(d as u64)
431        } else {
432            let d = t_str.parse::<u64>().map_err(serde::de::Error::custom)?;
433            Duration::from_secs(d)
434        };
435        Ok(Some(t))
436    }
437}
438
439/// Serialize optional datetime to string
440#[inline]
441pub fn serialize_datetime_option<S>(t: &Option<Duration>, s: S) -> std::result::Result<S::Ok, S::Error>
442where
443    S: Serializer,
444{
445    if let Some(t) = t {
446        t.as_secs().to_string().serialize(s)
447    } else {
448        "".serialize(s)
449    }
450}
451
452/// Internal datetime parsing helper
453#[inline]
454fn timestamp_parse_from_str(ts: &str, fmt: &str) -> anyhow::Result<i64> {
455    let ndt = chrono::NaiveDateTime::parse_from_str(ts, fmt)?;
456    let ndt = ndt.and_local_timezone(*chrono::Local::now().offset());
457    match ndt {
458        LocalResult::None => Err(anyhow::Error::msg("Impossible")),
459        LocalResult::Single(d) => Ok(d.timestamp()),
460        LocalResult::Ambiguous(d, _tz) => Ok(d.timestamp()),
461    }
462}
463
464/// Get current timestamp as Duration
465///
466/// # Example:
467/// ```
468/// let ts = rmqtt_utils::timestamp();
469/// assert!(ts.as_secs() > 0);
470/// ```
471#[inline]
472pub fn timestamp() -> Duration {
473    use std::time::{SystemTime, UNIX_EPOCH};
474    SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_else(|_| {
475        let now = chrono::Local::now();
476        Duration::new(now.timestamp() as u64, now.timestamp_subsec_nanos())
477    })
478}
479
480/// Get current timestamp in seconds
481///
482/// # Example:
483/// ```
484/// let ts = rmqtt_utils::timestamp_secs();
485/// assert!(ts > 0);
486/// ```
487#[inline]
488pub fn timestamp_secs() -> Timestamp {
489    use std::time::{SystemTime, UNIX_EPOCH};
490    SystemTime::now()
491        .duration_since(UNIX_EPOCH)
492        .map(|t| t.as_secs() as i64)
493        .unwrap_or_else(|_| chrono::Local::now().timestamp())
494}
495
496/// Get current timestamp in milliseconds
497///
498/// # Example:
499/// ```
500/// let ts = rmqtt_utils::timestamp_millis();
501/// assert!(ts > 0);
502/// ```
503#[inline]
504pub fn timestamp_millis() -> TimestampMillis {
505    use std::time::{SystemTime, UNIX_EPOCH};
506    SystemTime::now()
507        .duration_since(UNIX_EPOCH)
508        .map(|t| t.as_millis() as i64)
509        .unwrap_or_else(|_| chrono::Local::now().timestamp_millis())
510}
511
512/// Format timestamp (seconds) to human-readable string
513#[inline]
514pub fn format_timestamp(t: Timestamp) -> String {
515    if t <= 0 {
516        "".into()
517    } else {
518        use chrono::TimeZone;
519        if let chrono::LocalResult::Single(t) = chrono::Local.timestamp_opt(t, 0) {
520            t.format("%Y-%m-%d %H:%M:%S").to_string()
521        } else {
522            "".into()
523        }
524    }
525}
526
527/// Format current timestamp to string
528///
529/// # Example:
530/// ```
531/// let now = rmqtt_utils::format_timestamp_now();
532/// assert!(!now.is_empty());
533/// ```
534#[inline]
535pub fn format_timestamp_now() -> String {
536    format_timestamp(timestamp_secs())
537}
538
539/// Format millisecond timestamp to string
540#[inline]
541pub fn format_timestamp_millis(t: TimestampMillis) -> String {
542    if t <= 0 {
543        "".into()
544    } else {
545        use chrono::TimeZone;
546        if let chrono::LocalResult::Single(t) = chrono::Local.timestamp_millis_opt(t) {
547            t.format("%Y-%m-%d %H:%M:%S%.3f").to_string()
548        } else {
549            "".into()
550        }
551    }
552}
553
554/// Format current millisecond timestamp to string
555///
556/// # Example:
557/// ```
558/// let now = rmqtt_utils::format_timestamp_millis_now();
559/// assert!(!now.is_empty());
560/// ```
561#[inline]
562pub fn format_timestamp_millis_now() -> String {
563    format_timestamp_millis(timestamp_millis())
564}
565
566/// Cluster node address representation (ID@Address)
567///
568/// # Example:
569/// ```
570/// use rmqtt_utils::NodeAddr;
571///
572/// // Parse from string
573/// let node: NodeAddr = "123@mqtt.example.com:1883".parse().unwrap();
574/// assert_eq!(node.id, 123);
575/// assert_eq!(node.addr, "mqtt.example.com:1883");
576///
577/// // Direct construction
578/// let node = NodeAddr {
579///     id: 456,
580///     addr: rmqtt_utils::Addr::from("localhost:8883")
581/// };
582/// ```
583#[derive(Clone, Serialize)]
584pub struct NodeAddr {
585    /// Unique node identifier
586    pub id: NodeId,
587
588    /// Network address in host:port format
589    pub addr: Addr,
590}
591
592impl std::fmt::Debug for NodeAddr {
593    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
594        write!(f, "{}@{:?}", self.id, self.addr)
595    }
596}
597
598impl FromStr for NodeAddr {
599    type Err = Error;
600    fn from_str(s: &str) -> Result<Self, Self::Err> {
601        let parts: Vec<&str> = s.split('@').collect();
602        if parts.len() < 2 {
603            return Err(anyhow!(format!("NodeAddr format error, {}", s)));
604        }
605        let id = NodeId::from_str(parts[0])?;
606        let addr = Addr::from(parts[1]);
607        Ok(NodeAddr { id, addr })
608    }
609}
610
611impl<'de> de::Deserialize<'de> for NodeAddr {
612    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
613    where
614        D: de::Deserializer<'de>,
615    {
616        NodeAddr::from_str(&String::deserialize(deserializer)?).map_err(de::Error::custom)
617    }
618}