1#![deny(unsafe_code)]
67
68use std::fmt;
69use std::net::SocketAddr;
70use std::ops::{Deref, DerefMut};
71use std::str::FromStr;
72use std::time::Duration;
73
74use anyhow::{anyhow, Error};
75use bytestring::ByteString;
76use chrono::LocalResult;
77use serde::{
78 de::{self, Deserializer},
79 ser::Serializer,
80 Deserialize, Serialize,
81};
82
83mod counter;
84
85pub use counter::{Counter, StatsMergeMode};
86
/// Numeric node identifier (`u64`); this is the type of `NodeAddr::id`.
pub type NodeId = u64;

/// Address text backed by `ByteString`; this is the type of `NodeAddr::addr`.
pub type Addr = ByteString;

/// Signed 64-bit timestamp; `timestamp_secs` returns seconds since the Unix epoch in this type.
pub type Timestamp = i64;

/// Signed 64-bit timestamp; `timestamp_millis` returns milliseconds since the Unix epoch in this type.
pub type TimestampMillis = i64;
98
/// Number of bytes in one `K` unit (2^10).
const BYTESIZE_K: usize = 1 << 10;
/// Number of bytes in one `M` unit (2^20).
const BYTESIZE_M: usize = 1 << 20;
/// Number of bytes in one `G` unit (2^30).
const BYTESIZE_G: usize = 1 << 30;
102
103#[derive(Clone, Copy, Default)]
118pub struct Bytesize(pub usize);
119
120impl Bytesize {
121 #[inline]
129 pub fn as_u32(&self) -> u32 {
130 self.0 as u32
131 }
132
133 #[inline]
141 pub fn as_u64(&self) -> u64 {
142 self.0 as u64
143 }
144
145 #[inline]
153 pub fn as_usize(&self) -> usize {
154 self.0
155 }
156
157 #[inline]
168 pub fn string(&self) -> String {
169 let mut v = self.0;
170 let mut res = String::new();
171
172 let g = v / BYTESIZE_G;
173 if g > 0 {
174 res.push_str(&format!("{g}G"));
175 v %= BYTESIZE_G;
176 }
177
178 let m = v / BYTESIZE_M;
179 if m > 0 {
180 res.push_str(&format!("{m}M"));
181 v %= BYTESIZE_M;
182 }
183
184 let k = v / BYTESIZE_K;
185 if k > 0 {
186 res.push_str(&format!("{k}K"));
187 v %= BYTESIZE_K;
188 }
189
190 if v > 0 {
191 res.push_str(&format!("{v}B"));
192 }
193
194 res
195 }
196}
197
198impl Deref for Bytesize {
199 type Target = usize;
200 fn deref(&self) -> &Self::Target {
201 &self.0
202 }
203}
204
205impl DerefMut for Bytesize {
206 fn deref_mut(&mut self) -> &mut Self::Target {
207 &mut self.0
208 }
209}
210
211impl From<usize> for Bytesize {
212 fn from(v: usize) -> Self {
213 Bytesize(v)
214 }
215}
216
217impl TryFrom<&str> for Bytesize {
218 type Error = ParseSizeError;
219 fn try_from(v: &str) -> Result<Self, Self::Error> {
220 let value = to_bytesize(v)?;
221 Ok(Bytesize(value))
222 }
223}
224
225impl FromStr for Bytesize {
226 type Err = ParseSizeError;
227 fn from_str(s: &str) -> Result<Self, Self::Err> {
228 Ok(Bytesize(to_bytesize(s)?))
229 }
230}
231
232impl fmt::Debug for Bytesize {
233 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
234 write!(f, "{}", self.string())?;
235 Ok(())
236 }
237}
238
239impl fmt::Display for Bytesize {
240 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
241 write!(f, "{}", self.string())
242 }
243}
244
245impl Serialize for Bytesize {
246 #[inline]
247 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
248 where
249 S: Serializer,
250 {
251 serializer.serialize_str(&self.to_string())
252 }
253}
254
255impl<'de> Deserialize<'de> for Bytesize {
256 #[inline]
257 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
258 where
259 D: Deserializer<'de>,
260 {
261 let v = to_bytesize(&String::deserialize(deserializer)?).map_err(de::Error::custom)?;
262 Ok(Bytesize(v))
263 }
264}
265
266#[inline]
277pub fn to_bytesize(text: &str) -> Result<usize, ParseSizeError> {
278 let text = text.to_uppercase().replace("GB", "G").replace("MB", "M").replace("KB", "K");
279 text.split_inclusive(['G', 'M', 'K', 'B'])
280 .map(|x| {
281 let mut chars = x.chars();
282 let u = chars.nth_back(0).ok_or(ParseSizeError::InvalidFormat)?;
283 let num_str = chars.as_str();
284 let v =
285 num_str.parse::<usize>().map_err(|_| ParseSizeError::InvalidNumber(num_str.to_string()))?;
286 match u {
287 'B' => Ok(v),
288 'K' => Ok(v * BYTESIZE_K),
289 'M' => Ok(v * BYTESIZE_M),
290 'G' => Ok(v * BYTESIZE_G),
291 _ => Err(ParseSizeError::InvalidUnit(u)),
292 }
293 })
294 .sum()
295}
296
/// Reasons a size string can be rejected by `to_bytesize`.
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum ParseSizeError {
    /// A segment of the input was empty.
    InvalidFormat,
    /// The numeric part of a segment could not be parsed; carries that text.
    InvalidNumber(String),
    /// A segment ended in a character that is not a known unit; carries that character.
    InvalidUnit(char),
}

impl std::fmt::Display for ParseSizeError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            ParseSizeError::InvalidFormat => f.write_str("invalid size format"),
            ParseSizeError::InvalidNumber(number) => write!(f, "invalid number: '{number}'"),
            ParseSizeError::InvalidUnit(unit) => write!(f, "invalid unit: '{unit}'"),
        }
    }
}

impl std::error::Error for ParseSizeError {}
315
316#[inline]
318pub fn deserialize_duration<'de, D>(deserializer: D) -> std::result::Result<Duration, D::Error>
319where
320 D: Deserializer<'de>,
321{
322 let v = String::deserialize(deserializer)?;
323 Ok(to_duration(&v))
324}
325
326#[inline]
328pub fn deserialize_duration_option<'de, D>(deserializer: D) -> std::result::Result<Option<Duration>, D::Error>
329where
330 D: Deserializer<'de>,
331{
332 let v = String::deserialize(deserializer)?;
333 if v.is_empty() {
334 Ok(None)
335 } else {
336 Ok(Some(to_duration(&v)))
337 }
338}
339
/// Converts a duration string such as `"1h30m"` or `"500ms"` into a [`Duration`].
///
/// The input is lower-cased and split after every unit; each segment is an
/// unsigned integer followed by one unit, and the segments are added together.
/// Supported units: `ms` (milliseconds), `s` (seconds), `m` (minutes),
/// `h` (hours), `d` (days), `w` (weeks, 7 days) and `f` (14 days).
///
/// Parsing is lenient: a segment with an unparsable number or an unknown unit
/// contributes zero instead of producing an error. Values too large for a
/// `u64` millisecond count saturate at `u64::MAX` milliseconds rather than
/// overflowing.
#[inline]
pub fn to_duration(text: &str) -> Duration {
    // "ms" is rewritten to the sentinel 'Y' so it is not split as 'm' followed by 's';
    // the sentinel cannot clash with user input because the text was lower-cased first.
    let text = text.to_lowercase().replace("ms", "Y");
    let total_ms = text
        .split_inclusive(['s', 'm', 'h', 'd', 'w', 'f', 'Y'])
        .map(|segment| {
            let mut chars = segment.chars();
            let unit = match chars.next_back() {
                Some(unit) => unit,
                None => return 0,
            };
            let count = match chars.as_str().parse::<u64>() {
                Ok(count) => count,
                Err(_) => return 0,
            };
            let millis_per_unit: u64 = match unit {
                'Y' => 1,
                's' => 1_000,
                'm' => 60_000,
                'h' => 3_600_000,
                'd' => 86_400_000,
                'w' => 604_800_000,
                'f' => 1_209_600_000,
                _ => return 0,
            };
            count.saturating_mul(millis_per_unit)
        })
        .fold(0u64, u64::saturating_add);
    Duration::from_millis(total_ms)
}
388
389#[inline]
391pub fn deserialize_addr<'de, D>(deserializer: D) -> std::result::Result<SocketAddr, D::Error>
392where
393 D: Deserializer<'de>,
394{
395 let addr = String::deserialize(deserializer)?
396 .parse::<std::net::SocketAddr>()
397 .map_err(serde::de::Error::custom)?;
398 Ok(addr)
399}
400
401#[inline]
403pub fn deserialize_addr_option<'de, D>(
404 deserializer: D,
405) -> std::result::Result<Option<std::net::SocketAddr>, D::Error>
406where
407 D: Deserializer<'de>,
408{
409 let addr = String::deserialize(deserializer).map(|mut addr| {
410 if !addr.contains(':') {
411 addr += ":0";
412 }
413 addr
414 })?;
415 let addr = addr.parse::<std::net::SocketAddr>().map_err(serde::de::Error::custom)?;
416 Ok(Some(addr))
417}
418
419#[inline]
421pub fn deserialize_datetime_option<'de, D>(deserializer: D) -> std::result::Result<Option<Duration>, D::Error>
422where
423 D: Deserializer<'de>,
424{
425 let t_str = String::deserialize(deserializer)?;
426 if t_str.is_empty() {
427 Ok(None)
428 } else {
429 let t = if let Ok(d) = timestamp_parse_from_str(&t_str, "%Y-%m-%d %H:%M:%S") {
430 Duration::from_secs(d as u64)
431 } else {
432 let d = t_str.parse::<u64>().map_err(serde::de::Error::custom)?;
433 Duration::from_secs(d)
434 };
435 Ok(Some(t))
436 }
437}
438
439#[inline]
441pub fn serialize_datetime_option<S>(t: &Option<Duration>, s: S) -> std::result::Result<S::Ok, S::Error>
442where
443 S: Serializer,
444{
445 if let Some(t) = t {
446 t.as_secs().to_string().serialize(s)
447 } else {
448 "".serialize(s)
449 }
450}
451
452#[inline]
454fn timestamp_parse_from_str(ts: &str, fmt: &str) -> anyhow::Result<i64> {
455 let ndt = chrono::NaiveDateTime::parse_from_str(ts, fmt)?;
456 let ndt = ndt.and_local_timezone(*chrono::Local::now().offset());
457 match ndt {
458 LocalResult::None => Err(anyhow::Error::msg("Impossible")),
459 LocalResult::Single(d) => Ok(d.timestamp()),
460 LocalResult::Ambiguous(d, _tz) => Ok(d.timestamp()),
461 }
462}
463
464#[inline]
472pub fn timestamp() -> Duration {
473 use std::time::{SystemTime, UNIX_EPOCH};
474 SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_else(|_| {
475 let now = chrono::Local::now();
476 Duration::new(now.timestamp() as u64, now.timestamp_subsec_nanos())
477 })
478}
479
480#[inline]
488pub fn timestamp_secs() -> Timestamp {
489 use std::time::{SystemTime, UNIX_EPOCH};
490 SystemTime::now()
491 .duration_since(UNIX_EPOCH)
492 .map(|t| t.as_secs() as i64)
493 .unwrap_or_else(|_| chrono::Local::now().timestamp())
494}
495
496#[inline]
504pub fn timestamp_millis() -> TimestampMillis {
505 use std::time::{SystemTime, UNIX_EPOCH};
506 SystemTime::now()
507 .duration_since(UNIX_EPOCH)
508 .map(|t| t.as_millis() as i64)
509 .unwrap_or_else(|_| chrono::Local::now().timestamp_millis())
510}
511
512#[inline]
514pub fn format_timestamp(t: Timestamp) -> String {
515 if t <= 0 {
516 "".into()
517 } else {
518 use chrono::TimeZone;
519 if let chrono::LocalResult::Single(t) = chrono::Local.timestamp_opt(t, 0) {
520 t.format("%Y-%m-%d %H:%M:%S").to_string()
521 } else {
522 "".into()
523 }
524 }
525}
526
527#[inline]
535pub fn format_timestamp_now() -> String {
536 format_timestamp(timestamp_secs())
537}
538
539#[inline]
541pub fn format_timestamp_millis(t: TimestampMillis) -> String {
542 if t <= 0 {
543 "".into()
544 } else {
545 use chrono::TimeZone;
546 if let chrono::LocalResult::Single(t) = chrono::Local.timestamp_millis_opt(t) {
547 t.format("%Y-%m-%d %H:%M:%S%.3f").to_string()
548 } else {
549 "".into()
550 }
551 }
552}
553
554#[inline]
562pub fn format_timestamp_millis_now() -> String {
563 format_timestamp_millis(timestamp_millis())
564}
565
/// Node identifier paired with the node's address.
///
/// The `FromStr` and `Deserialize` impls in this file read the textual form
/// `id@addr`, and `Debug` prints `id@addr` with the address in its own `Debug`
/// form. `Serialize` is derived, so serialization goes through serde's
/// generated struct impl rather than through that `id@addr` string.
#[derive(Clone, Serialize)]
pub struct NodeAddr {
    /// Node identifier.
    pub id: NodeId,

    /// Node address.
    pub addr: Addr,
}
591
592impl std::fmt::Debug for NodeAddr {
593 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
594 write!(f, "{}@{:?}", self.id, self.addr)
595 }
596}
597
598impl FromStr for NodeAddr {
599 type Err = Error;
600 fn from_str(s: &str) -> Result<Self, Self::Err> {
601 let parts: Vec<&str> = s.split('@').collect();
602 if parts.len() < 2 {
603 return Err(anyhow!(format!("NodeAddr format error, {}", s)));
604 }
605 let id = NodeId::from_str(parts[0])?;
606 let addr = Addr::from(parts[1]);
607 Ok(NodeAddr { id, addr })
608 }
609}
610
611impl<'de> de::Deserialize<'de> for NodeAddr {
612 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
613 where
614 D: de::Deserializer<'de>,
615 {
616 NodeAddr::from_str(&String::deserialize(deserializer)?).map_err(de::Error::custom)
617 }
618}