1#![deny(unsafe_code)]
24
25use std::fmt;
26use std::net::SocketAddr;
27use std::ops::{Deref, DerefMut};
28use std::str::FromStr;
29use std::time::Duration;
30
31use anyhow::{anyhow, Error};
32use bytestring::ByteString;
33use chrono::LocalResult;
34use serde::{
35 de::{self, Deserializer},
36 ser::Serializer,
37 Deserialize, Serialize,
38};
39
40mod counter;
41
42pub use counter::{Counter, StatsMergeMode};
43
/// Numeric identifier of a node (see [`NodeAddr::id`]).
pub type NodeId = u64;

/// Textual address of a node, stored as a `ByteString` (see [`NodeAddr::addr`]).
pub type Addr = ByteString;

/// Timestamp expressed in whole seconds, as produced by [`timestamp_secs`].
pub type Timestamp = i64;

/// Timestamp expressed in milliseconds, as produced by [`timestamp_millis`].
pub type TimestampMillis = i64;
55
/// Bytes in one "K" unit (1024).
const BYTESIZE_K: usize = 1024;
/// Bytes in one "M" unit (1024 * 1024).
const BYTESIZE_M: usize = 1048576;
/// Bytes in one "G" unit (1024 * 1024 * 1024).
const BYTESIZE_G: usize = 1073741824;
59
/// A byte count wrapped in a newtype so it can be parsed from and rendered as
/// unit-suffixed text such as `2G512M`.
///
/// The wrapped `usize` is public and is the size in bytes; `Default` yields a
/// size of zero.
#[derive(Clone, Copy, Default)]
pub struct Bytesize(pub usize);
76
77impl Bytesize {
78 #[inline]
86 pub fn as_u32(&self) -> u32 {
87 self.0 as u32
88 }
89
90 #[inline]
98 pub fn as_u64(&self) -> u64 {
99 self.0 as u64
100 }
101
102 #[inline]
110 pub fn as_usize(&self) -> usize {
111 self.0
112 }
113
114 #[inline]
125 pub fn string(&self) -> String {
126 let mut v = self.0;
127 let mut res = String::new();
128
129 let g = v / BYTESIZE_G;
130 if g > 0 {
131 res.push_str(&format!("{}G", g));
132 v %= BYTESIZE_G;
133 }
134
135 let m = v / BYTESIZE_M;
136 if m > 0 {
137 res.push_str(&format!("{}M", m));
138 v %= BYTESIZE_M;
139 }
140
141 let k = v / BYTESIZE_K;
142 if k > 0 {
143 res.push_str(&format!("{}K", k));
144 v %= BYTESIZE_K;
145 }
146
147 if v > 0 {
148 res.push_str(&format!("{}B", v));
149 }
150
151 res
152 }
153}
154
155impl Deref for Bytesize {
156 type Target = usize;
157 fn deref(&self) -> &Self::Target {
158 &self.0
159 }
160}
161
162impl DerefMut for Bytesize {
163 fn deref_mut(&mut self) -> &mut Self::Target {
164 &mut self.0
165 }
166}
167
168impl From<usize> for Bytesize {
169 fn from(v: usize) -> Self {
170 Bytesize(v)
171 }
172}
173
174impl From<&str> for Bytesize {
175 fn from(v: &str) -> Self {
176 Bytesize(to_bytesize(v))
177 }
178}
179
180impl fmt::Debug for Bytesize {
181 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
182 write!(f, "{}", self.string())?;
183 Ok(())
184 }
185}
186
187impl Serialize for Bytesize {
188 #[inline]
189 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
190 where
191 S: Serializer,
192 {
193 serializer.serialize_str(&self.to_string())
194 }
195}
196
197impl<'de> Deserialize<'de> for Bytesize {
198 #[inline]
199 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
200 where
201 D: Deserializer<'de>,
202 {
203 let v = to_bytesize(&String::deserialize(deserializer)?);
204 Ok(Bytesize(v))
205 }
206}
207
208#[inline]
219pub fn to_bytesize(text: &str) -> usize {
220 let text = text.to_uppercase().replace("GB", "G").replace("MB", "M").replace("KB", "K");
221 text.split_inclusive(['G', 'M', 'K', 'B'])
222 .map(|x| {
223 let mut chars = x.chars();
224 let u = match chars.nth_back(0) {
225 None => return 0,
226 Some(u) => u,
227 };
228 let v = match chars.as_str().parse::<usize>() {
229 Err(_e) => return 0,
230 Ok(v) => v,
231 };
232 match u {
233 'B' => v,
234 'K' => v * BYTESIZE_K,
235 'M' => v * BYTESIZE_M,
236 'G' => v * BYTESIZE_G,
237 _ => 0,
238 }
239 })
240 .sum()
241}
242
243#[inline]
245pub fn deserialize_duration<'de, D>(deserializer: D) -> std::result::Result<Duration, D::Error>
246where
247 D: Deserializer<'de>,
248{
249 let v = String::deserialize(deserializer)?;
250 Ok(to_duration(&v))
251}
252
253#[inline]
255pub fn deserialize_duration_option<'de, D>(deserializer: D) -> std::result::Result<Option<Duration>, D::Error>
256where
257 D: Deserializer<'de>,
258{
259 let v = String::deserialize(deserializer)?;
260 if v.is_empty() {
261 Ok(None)
262 } else {
263 Ok(Some(to_duration(&v)))
264 }
265}
266
/// Parses unit-suffixed duration text such as `1h30m` into a [`Duration`].
///
/// The text is matched case-insensitively and may chain several
/// `<number><unit>` components, which are added together. Recognised units:
///
/// | unit | milliseconds  |
/// |------|---------------|
/// | `ms` | 1             |
/// | `s`  | 1 000         |
/// | `m`  | 60 000        |
/// | `h`  | 3 600 000     |
/// | `d`  | 86 400 000    |
/// | `w`  | 604 800 000   |
/// | `f`  | 1 209 600 000 |
///
/// Components with an unknown unit, no unit, or a numeric part that does not
/// parse as `u64` contribute `0`. Multiplication and summation saturate at
/// `u64::MAX` milliseconds instead of overflowing.
#[inline]
pub fn to_duration(text: &str) -> Duration {
    // "ms" is two characters; swap it for the single-character sentinel 'Y',
    // which cannot occur in the lowercased input, so that every unit is one
    // character and "ms" is not mistaken for "m" followed by "s".
    let text = text.to_lowercase().replace("ms", "Y");
    let total_ms = text
        .split_inclusive(['s', 'm', 'h', 'd', 'w', 'f', 'Y'])
        .map(|segment| {
            let mut chars = segment.chars();
            let factor: u64 = match chars.next_back() {
                Some('Y') => 1,
                Some('s') => 1_000,
                Some('m') => 60_000,
                Some('h') => 3_600_000,
                Some('d') => 86_400_000,
                Some('w') => 604_800_000,
                Some('f') => 1_209_600_000,
                _ => return 0,
            };
            // What is left of the iterator is the numeric part of the segment.
            chars.as_str().parse::<u64>().map_or(0, |count| count.saturating_mul(factor))
        })
        .fold(0u64, u64::saturating_add);
    Duration::from_millis(total_ms)
}
315
316#[inline]
318pub fn deserialize_addr<'de, D>(deserializer: D) -> std::result::Result<SocketAddr, D::Error>
319where
320 D: Deserializer<'de>,
321{
322 let addr = String::deserialize(deserializer)?
323 .parse::<std::net::SocketAddr>()
324 .map_err(serde::de::Error::custom)?;
325 Ok(addr)
326}
327
328#[inline]
330pub fn deserialize_addr_option<'de, D>(
331 deserializer: D,
332) -> std::result::Result<Option<std::net::SocketAddr>, D::Error>
333where
334 D: Deserializer<'de>,
335{
336 let addr = String::deserialize(deserializer).map(|mut addr| {
337 if !addr.contains(':') {
338 addr += ":0";
339 }
340 addr
341 })?;
342 let addr = addr.parse::<std::net::SocketAddr>().map_err(serde::de::Error::custom)?;
343 Ok(Some(addr))
344}
345
346#[inline]
348pub fn deserialize_datetime_option<'de, D>(deserializer: D) -> std::result::Result<Option<Duration>, D::Error>
349where
350 D: Deserializer<'de>,
351{
352 let t_str = String::deserialize(deserializer)?;
353 if t_str.is_empty() {
354 Ok(None)
355 } else {
356 let t = if let Ok(d) = timestamp_parse_from_str(&t_str, "%Y-%m-%d %H:%M:%S") {
357 Duration::from_secs(d as u64)
358 } else {
359 let d = t_str.parse::<u64>().map_err(serde::de::Error::custom)?;
360 Duration::from_secs(d)
361 };
362 Ok(Some(t))
363 }
364}
365
366#[inline]
368pub fn serialize_datetime_option<S>(t: &Option<Duration>, s: S) -> std::result::Result<S::Ok, S::Error>
369where
370 S: Serializer,
371{
372 if let Some(t) = t {
373 t.as_secs().to_string().serialize(s)
374 } else {
375 "".serialize(s)
376 }
377}
378
379#[inline]
381fn timestamp_parse_from_str(ts: &str, fmt: &str) -> anyhow::Result<i64> {
382 let ndt = chrono::NaiveDateTime::parse_from_str(ts, fmt)?;
383 let ndt = ndt.and_local_timezone(*chrono::Local::now().offset());
384 match ndt {
385 LocalResult::None => Err(anyhow::Error::msg("Impossible")),
386 LocalResult::Single(d) => Ok(d.timestamp()),
387 LocalResult::Ambiguous(d, _tz) => Ok(d.timestamp()),
388 }
389}
390
/// Returns the time elapsed since the Unix epoch according to the system
/// clock.
///
/// `duration_since` fails only when the clock reads earlier than the epoch.
/// Such a moment is a negative offset that `Duration` cannot represent, and
/// casting a negative timestamp with `as u64` would produce a wrapped value
/// far in the future, so a zero duration is returned in that case.
#[inline]
pub fn timestamp() -> Duration {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default()
}
406
407#[inline]
415pub fn timestamp_secs() -> Timestamp {
416 use std::time::{SystemTime, UNIX_EPOCH};
417 SystemTime::now()
418 .duration_since(UNIX_EPOCH)
419 .map(|t| t.as_secs() as i64)
420 .unwrap_or_else(|_| chrono::Local::now().timestamp())
421}
422
423#[inline]
431pub fn timestamp_millis() -> TimestampMillis {
432 use std::time::{SystemTime, UNIX_EPOCH};
433 SystemTime::now()
434 .duration_since(UNIX_EPOCH)
435 .map(|t| t.as_millis() as i64)
436 .unwrap_or_else(|_| chrono::Local::now().timestamp_millis())
437}
438
439#[inline]
441pub fn format_timestamp(t: Timestamp) -> String {
442 if t <= 0 {
443 "".into()
444 } else {
445 use chrono::TimeZone;
446 if let chrono::LocalResult::Single(t) = chrono::Local.timestamp_opt(t, 0) {
447 t.format("%Y-%m-%d %H:%M:%S").to_string()
448 } else {
449 "".into()
450 }
451 }
452}
453
454#[inline]
462pub fn format_timestamp_now() -> String {
463 format_timestamp(timestamp_secs())
464}
465
466#[inline]
468pub fn format_timestamp_millis(t: TimestampMillis) -> String {
469 if t <= 0 {
470 "".into()
471 } else {
472 use chrono::TimeZone;
473 if let chrono::LocalResult::Single(t) = chrono::Local.timestamp_millis_opt(t) {
474 t.format("%Y-%m-%d %H:%M:%S%.3f").to_string()
475 } else {
476 "".into()
477 }
478 }
479}
480
481#[inline]
489pub fn format_timestamp_millis_now() -> String {
490 format_timestamp_millis(timestamp_millis())
491}
492
/// A node identifier paired with the node's address.
///
/// The textual form is `<id>@<addr>`: that is what the `FromStr` and
/// `Deserialize` impls accept and what `Debug` prints (with the address in its
/// own `Debug` form). `Serialize` is derived, so serialization writes a
/// structure with `id` and `addr` fields rather than that single string.
#[derive(Clone, Serialize)]
pub struct NodeAddr {
    /// Numeric identifier of the node.
    pub id: NodeId,

    /// Address of the node, stored as given (it is not parsed or validated
    /// by `FromStr`).
    pub addr: Addr,
}
518
519impl std::fmt::Debug for NodeAddr {
520 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
521 write!(f, "{}@{:?}", self.id, self.addr)
522 }
523}
524
525impl FromStr for NodeAddr {
526 type Err = Error;
527 fn from_str(s: &str) -> Result<Self, Self::Err> {
528 let parts: Vec<&str> = s.split('@').collect();
529 if parts.len() < 2 {
530 return Err(anyhow!(format!("NodeAddr format error, {}", s)));
531 }
532 let id = NodeId::from_str(parts[0])?;
533 let addr = Addr::from(parts[1]);
534 Ok(NodeAddr { id, addr })
535 }
536}
537
538impl<'de> de::Deserialize<'de> for NodeAddr {
539 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
540 where
541 D: de::Deserializer<'de>,
542 {
543 NodeAddr::from_str(&String::deserialize(deserializer)?).map_err(de::Error::custom)
544 }
545}