rmqtt_utils/
lib.rs

1//! Overall usage example:
2//!
3//! ```
4//! use rmqtt_utils::{
5//!     Bytesize, NodeAddr,
6//!     to_bytesize, to_duration,
7//!     timestamp_secs, format_timestamp_now
8//! };
9//!
10//! // Parse byte size from string
11//! let size = Bytesize::from("2G512M");
12//!
13//! // Convert duration string
14//! let duration = to_duration("1h30m15s");
15//!
16//! // Parse node address
17//! let node: NodeAddr = "123@127.0.0.1:1883".parse().unwrap();
18//!
19//! // Get formatted timestamp
20//! let now = format_timestamp_now();
21//! ```
22
23#![deny(unsafe_code)]
24
25use std::fmt;
26use std::net::SocketAddr;
27use std::ops::{Deref, DerefMut};
28use std::str::FromStr;
29use std::time::Duration;
30
31use anyhow::{anyhow, Error};
32use bytestring::ByteString;
33use chrono::LocalResult;
34use serde::{
35    de::{self, Deserializer},
36    ser::Serializer,
37    Deserialize, Serialize,
38};
39
40mod counter;
41
42pub use counter::{Counter, StatsMergeMode};
43
/// Cluster node identifier type (64-bit unsigned integer).
///
/// This is the type of the `id` field of [`NodeAddr`].
pub type NodeId = u64;

/// Network address, stored as a [`ByteString`].
///
/// This is the type of the `addr` field of [`NodeAddr`].
pub type Addr = ByteString;

/// Timestamp in seconds since the Unix epoch, as returned by [`timestamp_secs`].
pub type Timestamp = i64;

/// Timestamp in milliseconds since the Unix epoch, as returned by [`timestamp_millis`].
pub type TimestampMillis = i64;
55
/// Number of bytes represented by the `K` unit (2^10).
const BYTESIZE_K: usize = 1024;
/// Number of bytes represented by the `M` unit (2^20).
const BYTESIZE_M: usize = 1048576;
/// Number of bytes represented by the `G` unit (2^30).
const BYTESIZE_G: usize = 1073741824;

/// Human-readable byte size representation with parsing/serialization support
///
/// The wrapped value is a plain byte count. Text such as `"2G512M"` is
/// converted with [`to_bytesize`], and [`Bytesize::string`] produces the
/// unit-suffixed text form again.
///
/// # Example:
/// ```
/// use rmqtt_utils::Bytesize;
///
/// // Create from string
/// let size = Bytesize::from("2G512M");
/// assert_eq!(size.as_usize(), 2_684_354_560);
///
/// // Create from integer
/// let size = Bytesize::from(1024);
/// assert_eq!(size.string(), "1K");
/// ```
#[derive(Clone, Copy, Default)]
pub struct Bytesize(pub usize);

impl Bytesize {
    /// Convert to u32, saturating at `u32::MAX`
    ///
    /// On targets where `usize` is wider than 32 bits a plain `as` cast would
    /// silently wrap (for example exactly 4G would become 0). Values that do
    /// not fit are clamped to `u32::MAX` instead.
    ///
    /// # Example:
    /// ```
    /// let size = rmqtt_utils::Bytesize(5000);
    /// assert_eq!(size.as_u32(), 5000);
    ///
    /// let huge = rmqtt_utils::Bytesize(usize::MAX);
    /// assert_eq!(huge.as_u32(), u32::MAX);
    /// ```
    #[inline]
    pub fn as_u32(&self) -> u32 {
        // Clamp first so the cast below can never discard high bits.
        self.0.min(u32::MAX as usize) as u32
    }

    /// Convert to u64
    ///
    /// # Example:
    /// ```
    /// let size = rmqtt_utils::Bytesize(usize::MAX);
    /// assert_eq!(size.as_u64(), usize::MAX as u64);
    /// ```
    #[inline]
    pub fn as_u64(&self) -> u64 {
        self.0 as u64
    }

    /// Get underlying usize value
    ///
    /// # Example:
    /// ```
    /// let size = rmqtt_utils::Bytesize(1024);
    /// assert_eq!(size.as_usize(), 1024);
    /// ```
    #[inline]
    pub fn as_usize(&self) -> usize {
        self.0
    }

    /// Format bytesize to human-readable string
    ///
    /// The value is decomposed into `G`, `M`, `K` and `B` components, largest
    /// unit first; components that are zero are omitted. A size of zero is
    /// rendered as `"0B"` so the result is never empty and can always be fed
    /// back through [`to_bytesize`].
    ///
    /// # Example:
    /// ```
    /// let size = rmqtt_utils::Bytesize(3145728);
    /// assert_eq!(size.string(), "3M");
    ///
    /// let mixed = rmqtt_utils::Bytesize(2148532224);
    /// assert_eq!(mixed.string(), "2G1M");
    ///
    /// let zero = rmqtt_utils::Bytesize(0);
    /// assert_eq!(zero.string(), "0B");
    /// ```
    #[inline]
    pub fn string(&self) -> String {
        if self.0 == 0 {
            return String::from("0B");
        }

        let mut remaining = self.0;
        let mut text = String::new();
        // Units ordered from largest to smallest; each step peels off the
        // whole multiples of that unit and keeps the remainder.
        for &(unit, suffix) in &[(BYTESIZE_G, 'G'), (BYTESIZE_M, 'M'), (BYTESIZE_K, 'K'), (1, 'B')] {
            let count = remaining / unit;
            if count > 0 {
                text.push_str(&count.to_string());
                text.push(suffix);
                remaining %= unit;
            }
        }
        text
    }
}
154
155impl Deref for Bytesize {
156    type Target = usize;
157    fn deref(&self) -> &Self::Target {
158        &self.0
159    }
160}
161
162impl DerefMut for Bytesize {
163    fn deref_mut(&mut self) -> &mut Self::Target {
164        &mut self.0
165    }
166}
167
168impl From<usize> for Bytesize {
169    fn from(v: usize) -> Self {
170        Bytesize(v)
171    }
172}
173
174impl From<&str> for Bytesize {
175    fn from(v: &str) -> Self {
176        Bytesize(to_bytesize(v))
177    }
178}
179
180impl fmt::Debug for Bytesize {
181    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
182        write!(f, "{}", self.string())?;
183        Ok(())
184    }
185}
186
187impl Serialize for Bytesize {
188    #[inline]
189    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
190    where
191        S: Serializer,
192    {
193        serializer.serialize_str(&self.to_string())
194    }
195}
196
197impl<'de> Deserialize<'de> for Bytesize {
198    #[inline]
199    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
200    where
201        D: Deserializer<'de>,
202    {
203        let v = to_bytesize(&String::deserialize(deserializer)?);
204        Ok(Bytesize(v))
205    }
206}
207
208/// Parse human-readable byte size string to usize
209///
210/// # Example:
211/// ```
212/// let bytes = rmqtt_utils::to_bytesize("2G512K");
213/// assert_eq!(bytes, 2148007936);
214///
215/// let complex = rmqtt_utils::to_bytesize("1G500M256K1024B");
216/// assert_eq!(complex, 1598292992);
217/// ```
218#[inline]
219pub fn to_bytesize(text: &str) -> usize {
220    let text = text.to_uppercase().replace("GB", "G").replace("MB", "M").replace("KB", "K");
221    text.split_inclusive(['G', 'M', 'K', 'B'])
222        .map(|x| {
223            let mut chars = x.chars();
224            let u = match chars.nth_back(0) {
225                None => return 0,
226                Some(u) => u,
227            };
228            let v = match chars.as_str().parse::<usize>() {
229                Err(_e) => return 0,
230                Ok(v) => v,
231            };
232            match u {
233                'B' => v,
234                'K' => v * BYTESIZE_K,
235                'M' => v * BYTESIZE_M,
236                'G' => v * BYTESIZE_G,
237                _ => 0,
238            }
239        })
240        .sum()
241}
242
243/// Deserialize Duration from human-readable string format
244#[inline]
245pub fn deserialize_duration<'de, D>(deserializer: D) -> std::result::Result<Duration, D::Error>
246where
247    D: Deserializer<'de>,
248{
249    let v = String::deserialize(deserializer)?;
250    Ok(to_duration(&v))
251}
252
253/// Deserialize optional Duration from string
254#[inline]
255pub fn deserialize_duration_option<'de, D>(deserializer: D) -> std::result::Result<Option<Duration>, D::Error>
256where
257    D: Deserializer<'de>,
258{
259    let v = String::deserialize(deserializer)?;
260    if v.is_empty() {
261        Ok(None)
262    } else {
263        Ok(Some(to_duration(&v)))
264    }
265}
266
/// Convert human-readable duration string to Duration
///
/// The input is case-insensitive and is read as a sequence of
/// `<number><unit>` components whose values are added together. Components
/// that cannot be parsed (including a number without a unit) contribute
/// nothing. Arithmetic saturates at `u64::MAX` milliseconds instead of
/// overflowing.
///
/// # Supported units:
/// - ms: milliseconds (e.g. "100ms")
/// - s: seconds
/// - m: minutes
/// - h: hours
/// - d: days
/// - w: weeks
/// - f: fortnight (2 weeks)
///
/// # Example:
/// ```
/// let duration = rmqtt_utils::to_duration("1h30m15s");
/// assert_eq!(duration.as_secs(), 5415);
///
/// let complex = rmqtt_utils::to_duration("2w3d12h");
/// assert_eq!(complex.as_secs(), 1512000);
///
/// let millis = rmqtt_utils::to_duration("250ms");
/// assert_eq!(millis.as_millis(), 250);
/// ```
#[inline]
pub fn to_duration(text: &str) -> Duration {
    // "ms" is rewritten to the internal single-character marker 'Y' so that
    // it is not split into the separate units 'm' and 's'. The marker is
    // upper-case, so it can never collide with the lower-cased user input.
    let text = text.to_lowercase().replace("ms", "Y");
    let ms = text
        .split_inclusive(['s', 'm', 'h', 'd', 'w', 'f', 'Y'])
        .map(|chunk| {
            let mut chars = chunk.chars();
            let unit = match chars.next_back() {
                None => return 0,
                Some(unit) => unit,
            };
            let value = match chars.as_str().parse::<u64>() {
                Err(_) => return 0,
                Ok(value) => value,
            };
            let millis_per_unit: u64 = match unit {
                'Y' => 1,
                's' => 1_000,
                'm' => 60_000,
                'h' => 3_600_000,
                'd' => 86_400_000,
                'w' => 604_800_000,
                'f' => 1_209_600_000,
                _ => return 0,
            };
            value.saturating_mul(millis_per_unit)
        })
        .fold(0u64, u64::saturating_add);
    Duration::from_millis(ms)
}
315
316/// Deserialize SocketAddr with error handling
317#[inline]
318pub fn deserialize_addr<'de, D>(deserializer: D) -> std::result::Result<SocketAddr, D::Error>
319where
320    D: Deserializer<'de>,
321{
322    let addr = String::deserialize(deserializer)?
323        .parse::<std::net::SocketAddr>()
324        .map_err(serde::de::Error::custom)?;
325    Ok(addr)
326}
327
328/// Deserialize optional SocketAddr with port handling
329#[inline]
330pub fn deserialize_addr_option<'de, D>(
331    deserializer: D,
332) -> std::result::Result<Option<std::net::SocketAddr>, D::Error>
333where
334    D: Deserializer<'de>,
335{
336    let addr = String::deserialize(deserializer).map(|mut addr| {
337        if !addr.contains(':') {
338            addr += ":0";
339        }
340        addr
341    })?;
342    let addr = addr.parse::<std::net::SocketAddr>().map_err(serde::de::Error::custom)?;
343    Ok(Some(addr))
344}
345
346/// Deserialize optional datetime from string
347#[inline]
348pub fn deserialize_datetime_option<'de, D>(deserializer: D) -> std::result::Result<Option<Duration>, D::Error>
349where
350    D: Deserializer<'de>,
351{
352    let t_str = String::deserialize(deserializer)?;
353    if t_str.is_empty() {
354        Ok(None)
355    } else {
356        let t = if let Ok(d) = timestamp_parse_from_str(&t_str, "%Y-%m-%d %H:%M:%S") {
357            Duration::from_secs(d as u64)
358        } else {
359            let d = t_str.parse::<u64>().map_err(serde::de::Error::custom)?;
360            Duration::from_secs(d)
361        };
362        Ok(Some(t))
363    }
364}
365
366/// Serialize optional datetime to string
367#[inline]
368pub fn serialize_datetime_option<S>(t: &Option<Duration>, s: S) -> std::result::Result<S::Ok, S::Error>
369where
370    S: Serializer,
371{
372    if let Some(t) = t {
373        t.as_secs().to_string().serialize(s)
374    } else {
375        "".serialize(s)
376    }
377}
378
379/// Internal datetime parsing helper
380#[inline]
381fn timestamp_parse_from_str(ts: &str, fmt: &str) -> anyhow::Result<i64> {
382    let ndt = chrono::NaiveDateTime::parse_from_str(ts, fmt)?;
383    let ndt = ndt.and_local_timezone(*chrono::Local::now().offset());
384    match ndt {
385        LocalResult::None => Err(anyhow::Error::msg("Impossible")),
386        LocalResult::Single(d) => Ok(d.timestamp()),
387        LocalResult::Ambiguous(d, _tz) => Ok(d.timestamp()),
388    }
389}
390
391/// Get current timestamp as Duration
392///
393/// # Example:
394/// ```
395/// let ts = rmqtt_utils::timestamp();
396/// assert!(ts.as_secs() > 0);
397/// ```
398#[inline]
399pub fn timestamp() -> Duration {
400    use std::time::{SystemTime, UNIX_EPOCH};
401    SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_else(|_| {
402        let now = chrono::Local::now();
403        Duration::new(now.timestamp() as u64, now.timestamp_subsec_nanos())
404    })
405}
406
407/// Get current timestamp in seconds
408///
409/// # Example:
410/// ```
411/// let ts = rmqtt_utils::timestamp_secs();
412/// assert!(ts > 0);
413/// ```
414#[inline]
415pub fn timestamp_secs() -> Timestamp {
416    use std::time::{SystemTime, UNIX_EPOCH};
417    SystemTime::now()
418        .duration_since(UNIX_EPOCH)
419        .map(|t| t.as_secs() as i64)
420        .unwrap_or_else(|_| chrono::Local::now().timestamp())
421}
422
423/// Get current timestamp in milliseconds
424///
425/// # Example:
426/// ```
427/// let ts = rmqtt_utils::timestamp_millis();
428/// assert!(ts > 0);
429/// ```
430#[inline]
431pub fn timestamp_millis() -> TimestampMillis {
432    use std::time::{SystemTime, UNIX_EPOCH};
433    SystemTime::now()
434        .duration_since(UNIX_EPOCH)
435        .map(|t| t.as_millis() as i64)
436        .unwrap_or_else(|_| chrono::Local::now().timestamp_millis())
437}
438
439/// Format timestamp (seconds) to human-readable string
440#[inline]
441pub fn format_timestamp(t: Timestamp) -> String {
442    if t <= 0 {
443        "".into()
444    } else {
445        use chrono::TimeZone;
446        if let chrono::LocalResult::Single(t) = chrono::Local.timestamp_opt(t, 0) {
447            t.format("%Y-%m-%d %H:%M:%S").to_string()
448        } else {
449            "".into()
450        }
451    }
452}
453
454/// Format current timestamp to string
455///
456/// # Example:
457/// ```
458/// let now = rmqtt_utils::format_timestamp_now();
459/// assert!(!now.is_empty());
460/// ```
461#[inline]
462pub fn format_timestamp_now() -> String {
463    format_timestamp(timestamp_secs())
464}
465
466/// Format millisecond timestamp to string
467#[inline]
468pub fn format_timestamp_millis(t: TimestampMillis) -> String {
469    if t <= 0 {
470        "".into()
471    } else {
472        use chrono::TimeZone;
473        if let chrono::LocalResult::Single(t) = chrono::Local.timestamp_millis_opt(t) {
474            t.format("%Y-%m-%d %H:%M:%S%.3f").to_string()
475        } else {
476            "".into()
477        }
478    }
479}
480
481/// Format current millisecond timestamp to string
482///
483/// # Example:
484/// ```
485/// let now = rmqtt_utils::format_timestamp_millis_now();
486/// assert!(!now.is_empty());
487/// ```
488#[inline]
489pub fn format_timestamp_millis_now() -> String {
490    format_timestamp_millis(timestamp_millis())
491}
492
/// Cluster node address representation (ID@Address)
///
/// The textual form is `id@addr`; it is what the `FromStr` and `Deserialize`
/// implementations in this file read. `Serialize` is derived, so a value is
/// written out as a struct with `id` and `addr` fields rather than as that
/// string.
/// NOTE(review): the serialized and deserialized shapes therefore differ —
/// confirm this asymmetry is intended.
///
/// # Example:
/// ```
/// use rmqtt_utils::NodeAddr;
///
/// // Parse from string
/// let node: NodeAddr = "123@mqtt.example.com:1883".parse().unwrap();
/// assert_eq!(node.id, 123);
/// assert_eq!(node.addr, "mqtt.example.com:1883");
///
/// // Direct construction
/// let node = NodeAddr {
///     id: 456,
///     addr: rmqtt_utils::Addr::from("localhost:8883")
/// };
/// ```
#[derive(Clone, Serialize)]
pub struct NodeAddr {
    /// Unique node identifier (the part before `@` in the textual form)
    pub id: NodeId,

    /// Network address, conventionally in host:port format (the part after
    /// `@` in the textual form; it is stored as given, without validation)
    pub addr: Addr,
}
518
519impl std::fmt::Debug for NodeAddr {
520    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
521        write!(f, "{}@{:?}", self.id, self.addr)
522    }
523}
524
525impl FromStr for NodeAddr {
526    type Err = Error;
527    fn from_str(s: &str) -> Result<Self, Self::Err> {
528        let parts: Vec<&str> = s.split('@').collect();
529        if parts.len() < 2 {
530            return Err(anyhow!(format!("NodeAddr format error, {}", s)));
531        }
532        let id = NodeId::from_str(parts[0])?;
533        let addr = Addr::from(parts[1]);
534        Ok(NodeAddr { id, addr })
535    }
536}
537
538impl<'de> de::Deserialize<'de> for NodeAddr {
539    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
540    where
541        D: de::Deserializer<'de>,
542    {
543        NodeAddr::from_str(&String::deserialize(deserializer)?).map_err(de::Error::custom)
544    }
545}