1#![deny(unsafe_code)]
67
68use std::fmt;
69use std::net::SocketAddr;
70use std::ops::{Deref, DerefMut};
71use std::str::FromStr;
72use std::time::Duration;
73
74use anyhow::{anyhow, Error};
75use bytestring::ByteString;
76use chrono::LocalResult;
77use serde::{
78 de::{self, Deserializer},
79 ser::Serializer,
80 Deserialize, Serialize,
81};
82
// Private submodule that defines the counter types re-exported below.
mod counter;

// Re-exported so callers can name `Counter` and `StatsMergeMode` from this module
// without reaching into the private `counter` submodule.
pub use counter::{Counter, StatsMergeMode};
86
/// Identifier of a node, stored as an unsigned 64-bit integer.
pub type NodeId = u64;

/// Address text, stored as a [`ByteString`].
pub type Addr = ByteString;

/// Signed 64-bit timestamp; [`timestamp_secs`] returns this type as whole
/// seconds since the Unix epoch.
pub type Timestamp = i64;

/// Signed 64-bit timestamp; [`timestamp_millis`] returns this type as
/// milliseconds since the Unix epoch.
pub type TimestampMillis = i64;
98
/// Number of bytes in one `K` unit (2^10).
const BYTESIZE_K: usize = 1 << 10;
/// Number of bytes in one `M` unit (2^20).
const BYTESIZE_M: usize = BYTESIZE_K * BYTESIZE_K;
/// Number of bytes in one `G` unit (2^30).
const BYTESIZE_G: usize = BYTESIZE_K * BYTESIZE_M;
102
/// A byte count, wrapped as a newtype over `usize`.
///
/// [`Bytesize::string`] renders the value in a compact unit form such as
/// `2G512M`, and [`to_bytesize`] parses that form back into a byte count.
/// The default value is zero bytes.
#[derive(Clone, Copy, Default)]
pub struct Bytesize(pub usize);
119
120impl Bytesize {
121 #[inline]
129 pub fn as_u32(&self) -> u32 {
130 self.0 as u32
131 }
132
133 #[inline]
141 pub fn as_u64(&self) -> u64 {
142 self.0 as u64
143 }
144
145 #[inline]
153 pub fn as_usize(&self) -> usize {
154 self.0
155 }
156
157 #[inline]
168 pub fn string(&self) -> String {
169 let mut v = self.0;
170 let mut res = String::new();
171
172 let g = v / BYTESIZE_G;
173 if g > 0 {
174 res.push_str(&format!("{}G", g));
175 v %= BYTESIZE_G;
176 }
177
178 let m = v / BYTESIZE_M;
179 if m > 0 {
180 res.push_str(&format!("{}M", m));
181 v %= BYTESIZE_M;
182 }
183
184 let k = v / BYTESIZE_K;
185 if k > 0 {
186 res.push_str(&format!("{}K", k));
187 v %= BYTESIZE_K;
188 }
189
190 if v > 0 {
191 res.push_str(&format!("{}B", v));
192 }
193
194 res
195 }
196}
197
198impl Deref for Bytesize {
199 type Target = usize;
200 fn deref(&self) -> &Self::Target {
201 &self.0
202 }
203}
204
205impl DerefMut for Bytesize {
206 fn deref_mut(&mut self) -> &mut Self::Target {
207 &mut self.0
208 }
209}
210
211impl From<usize> for Bytesize {
212 fn from(v: usize) -> Self {
213 Bytesize(v)
214 }
215}
216
217impl From<&str> for Bytesize {
218 fn from(v: &str) -> Self {
219 Bytesize(to_bytesize(v))
220 }
221}
222
223impl fmt::Debug for Bytesize {
224 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
225 write!(f, "{}", self.string())?;
226 Ok(())
227 }
228}
229
230impl Serialize for Bytesize {
231 #[inline]
232 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
233 where
234 S: Serializer,
235 {
236 serializer.serialize_str(&self.to_string())
237 }
238}
239
240impl<'de> Deserialize<'de> for Bytesize {
241 #[inline]
242 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
243 where
244 D: Deserializer<'de>,
245 {
246 let v = to_bytesize(&String::deserialize(deserializer)?);
247 Ok(Bytesize(v))
248 }
249}
250
251#[inline]
262pub fn to_bytesize(text: &str) -> usize {
263 let text = text.to_uppercase().replace("GB", "G").replace("MB", "M").replace("KB", "K");
264 text.split_inclusive(['G', 'M', 'K', 'B'])
265 .map(|x| {
266 let mut chars = x.chars();
267 let u = match chars.nth_back(0) {
268 None => return 0,
269 Some(u) => u,
270 };
271 let v = match chars.as_str().parse::<usize>() {
272 Err(_e) => return 0,
273 Ok(v) => v,
274 };
275 match u {
276 'B' => v,
277 'K' => v * BYTESIZE_K,
278 'M' => v * BYTESIZE_M,
279 'G' => v * BYTESIZE_G,
280 _ => 0,
281 }
282 })
283 .sum()
284}
285
286#[inline]
288pub fn deserialize_duration<'de, D>(deserializer: D) -> std::result::Result<Duration, D::Error>
289where
290 D: Deserializer<'de>,
291{
292 let v = String::deserialize(deserializer)?;
293 Ok(to_duration(&v))
294}
295
296#[inline]
298pub fn deserialize_duration_option<'de, D>(deserializer: D) -> std::result::Result<Option<Duration>, D::Error>
299where
300 D: Deserializer<'de>,
301{
302 let v = String::deserialize(deserializer)?;
303 if v.is_empty() {
304 Ok(None)
305 } else {
306 Ok(Some(to_duration(&v)))
307 }
308}
309
/// Parses duration text such as `"1h30m15s"` or `"100ms"` into a [`Duration`].
///
/// The text is lower-cased, split after every unit and the components are
/// added up in milliseconds. Supported units:
///
/// | unit | meaning      | milliseconds    |
/// |------|--------------|-----------------|
/// | `ms` | milliseconds | 1               |
/// | `s`  | seconds      | 1 000           |
/// | `m`  | minutes      | 60 000          |
/// | `h`  | hours        | 3 600 000       |
/// | `d`  | days         | 86 400 000      |
/// | `w`  | weeks        | 604 800 000     |
/// | `f`  | fortnights   | 1 209 600 000   |
///
/// Whitespace around a number is ignored (`"1h 30m"`). A component with no
/// unit or with an unparseable number contributes `0`. Multiplication and
/// summation saturate at `u64::MAX` milliseconds instead of overflowing.
/// The function never fails; empty or unparseable text yields a zero duration.
#[inline]
pub fn to_duration(text: &str) -> Duration {
    // Unit marker -> milliseconds per unit.
    const UNITS: [(char, u64); 7] = [
        ('Y', 1),
        ('s', 1_000),
        ('m', 60_000),
        ('h', 3_600_000),
        ('d', 86_400_000),
        ('w', 604_800_000),
        ('f', 1_209_600_000),
    ];

    // "ms" is rewritten to the single marker 'Y' so it is not split as 'm' + 's'.
    // The text was lower-cased first, so a 'Y' can only come from this rewrite.
    let text = text.to_lowercase().replace("ms", "Y");
    let ms = text
        .split_inclusive(['s', 'm', 'h', 'd', 'w', 'f', 'Y'])
        .map(|segment| {
            UNITS
                .iter()
                .find_map(|&(suffix, scale)| segment.strip_suffix(suffix).map(|d| (d, scale)))
                .and_then(|(digits, scale)| {
                    digits.trim().parse::<u64>().ok().map(|n| n.saturating_mul(scale))
                })
                .unwrap_or(0)
        })
        .fold(0u64, u64::saturating_add);
    Duration::from_millis(ms)
}
358
359#[inline]
361pub fn deserialize_addr<'de, D>(deserializer: D) -> std::result::Result<SocketAddr, D::Error>
362where
363 D: Deserializer<'de>,
364{
365 let addr = String::deserialize(deserializer)?
366 .parse::<std::net::SocketAddr>()
367 .map_err(serde::de::Error::custom)?;
368 Ok(addr)
369}
370
371#[inline]
373pub fn deserialize_addr_option<'de, D>(
374 deserializer: D,
375) -> std::result::Result<Option<std::net::SocketAddr>, D::Error>
376where
377 D: Deserializer<'de>,
378{
379 let addr = String::deserialize(deserializer).map(|mut addr| {
380 if !addr.contains(':') {
381 addr += ":0";
382 }
383 addr
384 })?;
385 let addr = addr.parse::<std::net::SocketAddr>().map_err(serde::de::Error::custom)?;
386 Ok(Some(addr))
387}
388
389#[inline]
391pub fn deserialize_datetime_option<'de, D>(deserializer: D) -> std::result::Result<Option<Duration>, D::Error>
392where
393 D: Deserializer<'de>,
394{
395 let t_str = String::deserialize(deserializer)?;
396 if t_str.is_empty() {
397 Ok(None)
398 } else {
399 let t = if let Ok(d) = timestamp_parse_from_str(&t_str, "%Y-%m-%d %H:%M:%S") {
400 Duration::from_secs(d as u64)
401 } else {
402 let d = t_str.parse::<u64>().map_err(serde::de::Error::custom)?;
403 Duration::from_secs(d)
404 };
405 Ok(Some(t))
406 }
407}
408
409#[inline]
411pub fn serialize_datetime_option<S>(t: &Option<Duration>, s: S) -> std::result::Result<S::Ok, S::Error>
412where
413 S: Serializer,
414{
415 if let Some(t) = t {
416 t.as_secs().to_string().serialize(s)
417 } else {
418 "".serialize(s)
419 }
420}
421
422#[inline]
424fn timestamp_parse_from_str(ts: &str, fmt: &str) -> anyhow::Result<i64> {
425 let ndt = chrono::NaiveDateTime::parse_from_str(ts, fmt)?;
426 let ndt = ndt.and_local_timezone(*chrono::Local::now().offset());
427 match ndt {
428 LocalResult::None => Err(anyhow::Error::msg("Impossible")),
429 LocalResult::Single(d) => Ok(d.timestamp()),
430 LocalResult::Ambiguous(d, _tz) => Ok(d.timestamp()),
431 }
432}
433
434#[inline]
442pub fn timestamp() -> Duration {
443 use std::time::{SystemTime, UNIX_EPOCH};
444 SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_else(|_| {
445 let now = chrono::Local::now();
446 Duration::new(now.timestamp() as u64, now.timestamp_subsec_nanos())
447 })
448}
449
450#[inline]
458pub fn timestamp_secs() -> Timestamp {
459 use std::time::{SystemTime, UNIX_EPOCH};
460 SystemTime::now()
461 .duration_since(UNIX_EPOCH)
462 .map(|t| t.as_secs() as i64)
463 .unwrap_or_else(|_| chrono::Local::now().timestamp())
464}
465
466#[inline]
474pub fn timestamp_millis() -> TimestampMillis {
475 use std::time::{SystemTime, UNIX_EPOCH};
476 SystemTime::now()
477 .duration_since(UNIX_EPOCH)
478 .map(|t| t.as_millis() as i64)
479 .unwrap_or_else(|_| chrono::Local::now().timestamp_millis())
480}
481
482#[inline]
484pub fn format_timestamp(t: Timestamp) -> String {
485 if t <= 0 {
486 "".into()
487 } else {
488 use chrono::TimeZone;
489 if let chrono::LocalResult::Single(t) = chrono::Local.timestamp_opt(t, 0) {
490 t.format("%Y-%m-%d %H:%M:%S").to_string()
491 } else {
492 "".into()
493 }
494 }
495}
496
497#[inline]
505pub fn format_timestamp_now() -> String {
506 format_timestamp(timestamp_secs())
507}
508
509#[inline]
511pub fn format_timestamp_millis(t: TimestampMillis) -> String {
512 if t <= 0 {
513 "".into()
514 } else {
515 use chrono::TimeZone;
516 if let chrono::LocalResult::Single(t) = chrono::Local.timestamp_millis_opt(t) {
517 t.format("%Y-%m-%d %H:%M:%S%.3f").to_string()
518 } else {
519 "".into()
520 }
521 }
522}
523
524#[inline]
532pub fn format_timestamp_millis_now() -> String {
533 format_timestamp_millis(timestamp_millis())
534}
535
/// A node identifier paired with an address.
///
/// The `FromStr` impl in this file parses the text form `id@addr`.
///
/// NOTE(review): `Serialize` is derived (a struct with `id` and `addr`
/// fields), while the hand-written `Deserialize` impl in this file reads a
/// single `id@addr` string — confirm this asymmetry is intended.
#[derive(Clone, Serialize)]
pub struct NodeAddr {
    /// Numeric identifier of the node.
    pub id: NodeId,

    /// Address associated with the node, kept as text.
    pub addr: Addr,
}
561
562impl std::fmt::Debug for NodeAddr {
563 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
564 write!(f, "{}@{:?}", self.id, self.addr)
565 }
566}
567
568impl FromStr for NodeAddr {
569 type Err = Error;
570 fn from_str(s: &str) -> Result<Self, Self::Err> {
571 let parts: Vec<&str> = s.split('@').collect();
572 if parts.len() < 2 {
573 return Err(anyhow!(format!("NodeAddr format error, {}", s)));
574 }
575 let id = NodeId::from_str(parts[0])?;
576 let addr = Addr::from(parts[1]);
577 Ok(NodeAddr { id, addr })
578 }
579}
580
581impl<'de> de::Deserialize<'de> for NodeAddr {
582 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
583 where
584 D: de::Deserializer<'de>,
585 {
586 NodeAddr::from_str(&String::deserialize(deserializer)?).map_err(de::Error::custom)
587 }
588}