raft_engine/
util.rs

1// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
2
3use std::fmt::{self, Display, Write};
4use std::ops::{Div, Mul};
5use std::str::FromStr;
6use std::time::{Duration, Instant};
7
8use crc32fast::Hasher;
9use serde::de::{self, Unexpected, Visitor};
10use serde::{Deserialize, Deserializer, Serialize, Serializer};
11
/// The base unit that every other size constant is expressed in: one byte.
const UNIT: u64 = 1;

/// Ratio between consecutive binary (IEC) size prefixes.
const BINARY_DATA_MAGNITUDE: u64 = 1024;

/// One byte.
pub const B: u64 = UNIT;
/// One kibibyte (2^10 bytes).
pub const KIB: u64 = BINARY_DATA_MAGNITUDE * B;
/// One mebibyte (2^20 bytes).
pub const MIB: u64 = BINARY_DATA_MAGNITUDE * KIB;
/// One gibibyte (2^30 bytes).
pub const GIB: u64 = BINARY_DATA_MAGNITUDE * MIB;
/// One tebibyte (2^40 bytes).
pub const TIB: u64 = BINARY_DATA_MAGNITUDE * GIB;
/// One pebibyte (2^50 bytes).
pub const PIB: u64 = BINARY_DATA_MAGNITUDE * TIB;
21
/// A byte count that serializes to, and parses from, human-readable strings such as `"4MiB"`.
///
/// The wrapped value is the raw number of bytes.
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd)]
pub struct ReadableSize(pub u64);
24
25impl ReadableSize {
26    pub const fn kb(count: u64) -> ReadableSize {
27        ReadableSize(count * KIB)
28    }
29
30    pub const fn mb(count: u64) -> ReadableSize {
31        ReadableSize(count * MIB)
32    }
33
34    pub const fn gb(count: u64) -> ReadableSize {
35        ReadableSize(count * GIB)
36    }
37
38    pub const fn as_mb(self) -> u64 {
39        self.0 / MIB
40    }
41}
42
43impl Div<u64> for ReadableSize {
44    type Output = ReadableSize;
45
46    fn div(self, rhs: u64) -> ReadableSize {
47        ReadableSize(self.0 / rhs)
48    }
49}
50
51impl Div<ReadableSize> for ReadableSize {
52    type Output = u64;
53
54    fn div(self, rhs: ReadableSize) -> u64 {
55        self.0 / rhs.0
56    }
57}
58
59impl Mul<u64> for ReadableSize {
60    type Output = ReadableSize;
61
62    fn mul(self, rhs: u64) -> ReadableSize {
63        ReadableSize(self.0 * rhs)
64    }
65}
66
67impl Serialize for ReadableSize {
68    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
69    where
70        S: Serializer,
71    {
72        let mut buffer = String::new();
73        write!(buffer, "{self}").unwrap();
74        serializer.serialize_str(&buffer)
75    }
76}
77
78impl FromStr for ReadableSize {
79    type Err = String;
80
81    // This method parses value in binary unit.
82    fn from_str(s: &str) -> Result<ReadableSize, String> {
83        let size_str = s.trim();
84        if size_str.is_empty() {
85            return Err(format!("{s:?} is not a valid size."));
86        }
87
88        if !size_str.is_ascii() {
89            return Err(format!("ASCII string is expected, but got {s:?}"));
90        }
91
92        // size: digits and '.' as decimal separator
93        let size_len = size_str
94            .to_string()
95            .chars()
96            .take_while(|c| char::is_ascii_digit(c) || ['.', 'e', 'E', '-', '+'].contains(c))
97            .count();
98
99        // unit: alphabetic characters
100        let (size, unit) = size_str.split_at(size_len);
101
102        let unit = match unit.trim() {
103            "K" | "KB" | "KiB" => KIB,
104            "M" | "MB" | "MiB" => MIB,
105            "G" | "GB" | "GiB" => GIB,
106            "T" | "TB" | "TiB" => TIB,
107            "P" | "PB" | "PiB" => PIB,
108            "B" | "" => B,
109            _ => {
110                return Err(format!(
111                    "only B, KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, and PiB are supported: {s:?}"
112                ));
113            }
114        };
115
116        match size.parse::<f64>() {
117            Ok(n) => Ok(ReadableSize((n * unit as f64) as u64)),
118            Err(_) => Err(format!("invalid size string: {s:?}")),
119        }
120    }
121}
122
123impl Display for ReadableSize {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        let size = self.0;
126        if size == 0 {
127            write!(f, "{size}KiB")
128        } else if size % PIB == 0 {
129            write!(f, "{}PiB", size / PIB)
130        } else if size % TIB == 0 {
131            write!(f, "{}TiB", size / TIB)
132        } else if size % GIB == 0 {
133            write!(f, "{}GiB", size / GIB)
134        } else if size % MIB == 0 {
135            write!(f, "{}MiB", size / MIB)
136        } else if size % KIB == 0 {
137            write!(f, "{}KiB", size / KIB)
138        } else {
139            write!(f, "{size}B")
140        }
141    }
142}
143
144impl<'de> Deserialize<'de> for ReadableSize {
145    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
146    where
147        D: Deserializer<'de>,
148    {
149        struct SizeVisitor;
150
151        impl<'de> Visitor<'de> for SizeVisitor {
152            type Value = ReadableSize;
153
154            fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
155                formatter.write_str("valid size")
156            }
157
158            fn visit_i64<E>(self, size: i64) -> Result<ReadableSize, E>
159            where
160                E: de::Error,
161            {
162                if size >= 0 {
163                    self.visit_u64(size as u64)
164                } else {
165                    Err(E::invalid_value(Unexpected::Signed(size), &self))
166                }
167            }
168
169            fn visit_u64<E>(self, size: u64) -> Result<ReadableSize, E>
170            where
171                E: de::Error,
172            {
173                Ok(ReadableSize(size))
174            }
175
176            fn visit_str<E>(self, size_str: &str) -> Result<ReadableSize, E>
177            where
178                E: de::Error,
179            {
180                size_str.parse().map_err(E::custom)
181            }
182        }
183
184        deserializer.deserialize_any(SizeVisitor)
185    }
186}
187
/// Extension methods for [`Instant`].
pub trait InstantExt {
    /// Returns the time elapsed since `self`.
    ///
    /// Returns a zero duration instead of panicking when `self` is later than the current time.
    fn saturating_elapsed(&self) -> Duration;
}

impl InstantExt for Instant {
    #[inline]
    fn saturating_elapsed(&self) -> Duration {
        let now = Instant::now();
        // `checked_duration_since` yields `None` when `self` is in the future; clamp that to zero.
        now.checked_duration_since(*self).unwrap_or_default()
    }
}
198
199#[inline]
200pub fn crc32(data: &[u8]) -> u32 {
201    let mut hasher = Hasher::new();
202    hasher.update(data);
203    hasher.finalize()
204}
205
/// Scrambles the bits of `i` with the splitmix64 output mixing function.
///
/// The state increment of full splitmix64 is not applied. The mapping is invertible; see
/// `unhash_u64`.
///
/// Credit: [splitmix64 algorithm](https://xorshift.di.unimi.it/splitmix64.c)
#[inline]
pub fn hash_u64(i: u64) -> u64 {
    // One xor-shift followed by an odd multiplication; both steps are bijective on u64.
    let mix = |x: u64, shift: u32, factor: u64| (x ^ (x >> shift)).wrapping_mul(factor);
    let x = mix(i, 30, 0xbf58476d1ce4e5b9);
    let x = mix(x, 27, 0x94d049bb133111eb);
    x ^ (x >> 31)
}
213
/// Inverts `hash_u64`: `unhash_u64(hash_u64(x)) == x` for every `x`.
///
/// The multiplications use the modular inverses of `hash_u64`'s multipliers, and the
/// multi-term xor-shifts undo its single xor-shifts.
// Allowed because it is presumably only exercised by tests; confirm before removing.
#[allow(dead_code)]
#[inline]
pub fn unhash_u64(i: u64) -> u64 {
    let x = (i ^ (i >> 31) ^ (i >> 62)).wrapping_mul(0x319642b2d24d8ec3);
    let x = (x ^ (x >> 27) ^ (x >> 54)).wrapping_mul(0x96de1b173f119089);
    x ^ (x >> 30) ^ (x >> 60)
}
221
222pub mod lz4 {
223    use crate::{Error, Result};
224    use std::{i32, ptr};
225
226    pub const DEFAULT_LZ4_COMPRESSION_LEVEL: usize = 1;
227
228    /// Compress content in `buf[skip..]`, and append output to `buf`.
229    pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize, level: usize) -> Result<()> {
230        let buf_len = buf.len();
231        let content_len = buf_len - skip;
232        if content_len > 0 {
233            if content_len > i32::MAX as usize {
234                return Err(Error::InvalidArgument(format!(
235                    "Content too long {content_len}"
236                )));
237            }
238            unsafe {
239                let bound = lz4_sys::LZ4_compressBound(content_len as i32);
240                debug_assert!(bound > 0);
241
242                // Layout: { decoded_len | content }
243                buf.reserve(buf_len + 4 + bound as usize);
244                let buf_ptr = buf.as_mut_ptr();
245
246                let le_len = content_len.to_le_bytes();
247                ptr::copy_nonoverlapping(le_len.as_ptr(), buf_ptr.add(buf_len), 4);
248
249                let compressed = lz4_sys::LZ4_compress_fast(
250                    buf_ptr.add(skip) as _,
251                    buf_ptr.add(buf_len + 4) as _,
252                    content_len as i32,
253                    bound,
254                    level as i32,
255                );
256                if compressed == 0 {
257                    return Err(Error::Other(box_err!("Compression failed")));
258                }
259                buf.set_len(buf_len + 4 + compressed as usize);
260            }
261        }
262        Ok(())
263    }
264
265    pub fn decompress_block(src: &[u8]) -> Result<Vec<u8>> {
266        if src.len() > 4 {
267            unsafe {
268                let len = u32::from_le(ptr::read_unaligned(src.as_ptr() as *const u32));
269                let mut dst = Vec::with_capacity(len as usize);
270                let l = lz4_sys::LZ4_decompress_safe(
271                    src.as_ptr().add(4) as _,
272                    dst.as_mut_ptr() as _,
273                    src.len() as i32 - 4,
274                    dst.capacity() as i32,
275                );
276                if l == len as i32 {
277                    dst.set_len(l as usize);
278                    Ok(dst)
279                } else if l < 0 {
280                    Err(Error::Other(box_err!("Decompression failed {l}")))
281                } else {
282                    Err(Error::Corruption(format!(
283                        "Decompressed content length mismatch {l} != {len}"
284                    )))
285                }
286            }
287        } else if !src.is_empty() {
288            Err(Error::Corruption(format!(
289                "Content to compress too short {}",
290                src.len()
291            )))
292        } else {
293            Ok(Vec::new())
294        }
295    }
296
297    #[cfg(test)]
298    mod tests {
299        #[test]
300        fn test_basic() {
301            let vecs: Vec<Vec<u8>> = vec![b"".to_vec(), b"123".to_vec(), b"12345678910".to_vec()];
302            for mut vec in vecs.into_iter() {
303                let uncompressed_len = vec.len();
304                super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL)
305                    .unwrap();
306                let res = super::decompress_block(&vec[uncompressed_len..]).unwrap();
307                assert_eq!(res, vec[..uncompressed_len].to_owned());
308            }
309        }
310    }
311}
312
/// Something that can produce values of type `Target` on demand.
///
/// Implementors must be `Send + Sync`, so a single factory can be shared across threads.
pub trait Factory<Target>: Send + Sync {
    /// Returns a new `Target`; what "new" means is defined by the implementor.
    fn new_target(&self) -> Target;
}
316
/// Rounds `offset` up to the nearest multiple of `alignment`.
///
/// An already aligned `offset` is returned unchanged.
///
/// # Panics
///
/// Panics if `alignment` is zero. In debug builds, also panics if the aligned result does not
/// fit in a `usize`.
///
/// # Example:
///
/// ```ignore
/// assert_eq!(round_up(18, 4), 20);
/// assert_eq!(round_up(64, 16), 64);
/// ```
#[inline]
pub fn round_up(offset: usize, alignment: usize) -> usize {
    // Work from the remainder instead of `(offset + alignment - 1) / alignment * alignment`.
    // The intermediate sum in that form overflows for offsets near `usize::MAX`, even when
    // the aligned result fits.
    match offset % alignment {
        0 => offset,
        rem => offset - rem + alignment,
    }
}
329
/// Rounds `offset` down to the nearest multiple of `alignment`.
///
/// An already aligned `offset` is returned unchanged.
///
/// # Panics
///
/// Panics if `alignment` is zero.
///
/// # Example:
///
/// ```ignore
/// assert_eq!(round_down(18, 4), 16);
/// assert_eq!(round_down(64, 16), 64);
/// ```
// Allowed because this may currently be used only by tests; verify before removing.
#[allow(dead_code)]
#[inline]
pub fn round_down(offset: usize, alignment: usize) -> usize {
    let excess = offset % alignment;
    offset - excess
}
343
#[cfg(test)]
mod tests {
    use super::*;

    // Constructors, `as_mb`, and both `Div` impls of `ReadableSize`.
    #[test]
    fn test_readable_size() {
        let s = ReadableSize::kb(2);
        assert_eq!(s.0, 2048);
        assert_eq!(s.as_mb(), 0);
        let s = ReadableSize::mb(2);
        assert_eq!(s.0, 2 * 1024 * 1024);
        assert_eq!(s.as_mb(), 2);
        let s = ReadableSize::gb(2);
        assert_eq!(s.0, 2 * 1024 * 1024 * 1024);
        assert_eq!(s.as_mb(), 2048);

        assert_eq!((ReadableSize::mb(2) / 2).0, MIB);
        assert_eq!((ReadableSize::mb(1) / 2).0, 512 * KIB);
        assert_eq!(ReadableSize::mb(2) / ReadableSize::kb(1), 2048);
    }

    // Serialize/deserialize round-trips through TOML, plus parsing of accepted and
    // rejected size strings.
    #[test]
    fn test_parse_readable_size() {
        #[derive(Serialize, Deserialize)]
        struct SizeHolder {
            s: ReadableSize,
        }

        // (byte count, expected serialized string): serialize, compare, then parse back.
        let legal_cases = vec![
            (0, "0KiB"),
            (2 * KIB, "2KiB"),
            (4 * MIB, "4MiB"),
            (5 * GIB, "5GiB"),
            (7 * TIB, "7TiB"),
            (11 * PIB, "11PiB"),
        ];
        for (size, exp) in legal_cases {
            let c = SizeHolder {
                s: ReadableSize(size),
            };
            let res_str = toml::to_string(&c).unwrap();
            let exp_str = format!("s = {exp:?}\n");
            assert_eq!(res_str, exp_str);
            let res_size: SizeHolder = toml::from_str(&exp_str).unwrap();
            assert_eq!(res_size.s.0, size);
        }

        // A size that is not a multiple of 1 KiB serializes in plain bytes and round-trips.
        let c = SizeHolder {
            s: ReadableSize(512),
        };
        let res_str = toml::to_string(&c).unwrap();
        assert_eq!(res_str, "s = \"512B\"\n");
        let res_size: SizeHolder = toml::from_str(&res_str).unwrap();
        assert_eq!(res_size.s.0, c.s.0);

        // Parse-only cases: fractional values, short/long/IEC unit spellings, surrounding
        // whitespace, and scientific notation. All units are binary (powers of 1024).
        let decode_cases = vec![
            (" 0.5 PB", PIB / 2),
            ("0.5 TB", TIB / 2),
            ("0.5GB ", GIB / 2),
            ("0.5MB", MIB / 2),
            ("0.5KB", KIB / 2),
            ("0.5P", PIB / 2),
            ("0.5T", TIB / 2),
            ("0.5G", GIB / 2),
            ("0.5M", MIB / 2),
            ("0.5K", KIB / 2),
            ("23", 23),
            ("1", 1),
            ("1024B", KIB),
            // units with binary prefixes
            (" 0.5 PiB", PIB / 2),
            ("1PiB", PIB),
            ("0.5 TiB", TIB / 2),
            ("2 TiB", TIB * 2),
            ("0.5GiB ", GIB / 2),
            ("787GiB ", GIB * 787),
            ("0.5MiB", MIB / 2),
            ("3MiB", MIB * 3),
            ("0.5KiB", KIB / 2),
            ("1 KiB", KIB),
            // scientific notation
            ("0.5e6 B", B * 500000),
            ("0.5E6 B", B * 500000),
            ("1e6B", B * 1000000),
            ("8E6B", B * 8000000),
            ("8e7", B * 80000000),
            ("1e-1MB", MIB / 10),
            ("1e+1MB", MIB * 10),
            ("0e+10MB", 0),
        ];
        for (src, exp) in decode_cases {
            let src = format!("s = {src:?}");
            let res: SizeHolder = toml::from_str(&src).unwrap();
            assert_eq!(res.s.0, exp);
        }

        // Rejected inputs: lowercase unit spellings, a unit with no number, and digits or
        // underscores mixed into the unit suffix.
        let illegal_cases = vec![
            "0.5kb", "0.5kB", "0.5Kb", "0.5k", "0.5g", "b", "gb", "1b", "B", "1K24B", " 5_KB",
            "4B7", "5M_",
        ];
        for src in illegal_cases {
            let src_str = format!("s = {src:?}");
            assert!(toml::from_str::<SizeHolder>(&src_str).is_err(), "{}", src);
        }
    }

    // `unhash_u64` must invert `hash_u64`.
    #[test]
    fn test_unhash() {
        assert_eq!(unhash_u64(hash_u64(777)), 777);
    }

    // Aligned, unaligned, and smaller-than-alignment offsets for both rounding helpers.
    #[test]
    fn test_rounding() {
        // round_up
        assert_eq!(round_up(18, 4), 20);
        assert_eq!(round_up(64, 16), 64);
        assert_eq!(round_up(79, 4096), 4096);
        // round_down
        assert_eq!(round_down(18, 4), 16);
        assert_eq!(round_down(64, 16), 64);
        assert_eq!(round_down(79, 4096), 0);
    }
}