ckb_network/
compress.rs

1//!ckb network compress module
2
3use ckb_logger::debug;
4use p2p::bytes::{BufMut, Bytes, BytesMut};
5use snap::raw::{decompress_len, Decoder as SnapDecoder, Encoder as SnapEncoder};
6
7use std::io;
8
9pub(crate) const COMPRESSION_SIZE_THRESHOLD: usize = 1024;
10const UNCOMPRESS_FLAG: u8 = 0b0000_0000;
11const COMPRESS_FLAG: u8 = 0b1000_0000;
12const MAX_UNCOMPRESSED_LEN: usize = 1 << 23; // 8MB
13
14/// Compressed decompression structure
15///
16/// If you want to support multiple compression formats in the future,
17/// you can simply think that 0b1000 is in snappy format and 0b0000 is in uncompressed format.
18///
19/// # Message in Bytes:
20///
21/// +---------------------------------------------------------------+
22/// | Bytes | Type | Function                                       |
23/// |-------+------+------------------------------------------------|
24/// |   0   |  u1  | Compress: true 1, false 0                      |
25/// |       |  u7  | Reserved                                       |
26/// +-------+------+------------------------------------------------+
27/// |  1~   |      | Payload (Serialized Data with Compress)        |
28/// +-------+------+------------------------------------------------+
29#[derive(Clone, Debug)]
30pub(crate) struct Message {
31    inner: BytesMut,
32}
33
34impl Message {
35    /// create from uncompressed raw data
36    pub(crate) fn from_raw(data: Bytes) -> Self {
37        let mut inner = BytesMut::with_capacity(data.len() + 1);
38        inner.put_u8(UNCOMPRESS_FLAG);
39        inner.put(data);
40        Self { inner }
41    }
42
43    /// create from compressed data
44    pub(crate) fn from_compressed(data: BytesMut) -> Self {
45        Self { inner: data }
46    }
47
48    /// Compress message
49    pub(crate) fn compress(mut self) -> Bytes {
50        if self.inner.len() > COMPRESSION_SIZE_THRESHOLD {
51            let input = self.inner.split_off(1);
52            match SnapEncoder::new().compress_vec(&input) {
53                Ok(res) => {
54                    self.inner.extend_from_slice(&res);
55                    self.set_compress_flag();
56                }
57                Err(e) => {
58                    debug!("snappy compress error: {}", e);
59                    self.inner.unsplit(input);
60                }
61            }
62        }
63        self.inner.freeze()
64    }
65
66    /// Decompress message
67    pub(crate) fn decompress(mut self) -> Result<Bytes, io::Error> {
68        if self.inner.is_empty() {
69            Err(io::ErrorKind::InvalidData.into())
70        } else if self.compress_flag() {
71            match decompress_len(&self.inner[1..]) {
72                Ok(decompressed_bytes_len) => {
73                    if decompressed_bytes_len > MAX_UNCOMPRESSED_LEN {
74                        debug!(
75                            "The limit for uncompressed bytes len is exceeded. limit: {}, len: {}",
76                            MAX_UNCOMPRESSED_LEN, decompressed_bytes_len
77                        );
78                        Err(io::ErrorKind::InvalidData.into())
79                    } else {
80                        let mut buf = vec![0; decompressed_bytes_len];
81                        match SnapDecoder::new().decompress(&self.inner[1..], &mut buf) {
82                            Ok(_) => Ok(buf.into()),
83                            Err(e) => {
84                                debug!("snappy decompress error: {:?}", e);
85                                Err(io::ErrorKind::InvalidData.into())
86                            }
87                        }
88                    }
89                }
90                Err(e) => {
91                    debug!("snappy decompress_len error: {:?}", e);
92                    Err(io::ErrorKind::InvalidData.into())
93                }
94            }
95        } else {
96            let _ = self.inner.split_to(1);
97            Ok(self.inner.freeze())
98        }
99    }
100
101    pub(crate) fn set_compress_flag(&mut self) {
102        self.inner[0] = COMPRESS_FLAG;
103    }
104
105    pub(crate) fn compress_flag(&self) -> bool {
106        (self.inner[0] & COMPRESS_FLAG) != 0
107    }
108}
109
110/// Compress data
111pub fn compress(src: Bytes) -> Bytes {
112    Message::from_raw(src).compress()
113}
114
115/// Decompress data
116pub fn decompress(src: BytesMut) -> Result<Bytes, io::Error> {
117    Message::from_compressed(src).decompress()
118}