1use ckb_logger::debug;
4use p2p::bytes::{BufMut, Bytes, BytesMut};
5use snap::raw::{decompress_len, Decoder as SnapDecoder, Encoder as SnapEncoder};
6
7use std::io;
8
/// Size limit, in bytes, above which a framed message (flag byte plus
/// payload) is compressed before being handed out.
pub(crate) const COMPRESSION_SIZE_THRESHOLD: usize = 1024;
/// Value of the leading flag byte when the payload is stored as-is.
const UNCOMPRESS_FLAG: u8 = 0;
/// Value of the leading flag byte when the payload is compressed; only the
/// most significant bit is set.
const COMPRESS_FLAG: u8 = 1 << 7;
12const MAX_UNCOMPRESSED_LEN: usize = 1 << 23; #[derive(Clone, Debug)]
30pub(crate) struct Message {
31 inner: BytesMut,
32}
33
34impl Message {
35 pub(crate) fn from_raw(data: Bytes) -> Self {
37 let mut inner = BytesMut::with_capacity(data.len() + 1);
38 inner.put_u8(UNCOMPRESS_FLAG);
39 inner.put(data);
40 Self { inner }
41 }
42
43 pub(crate) fn from_compressed(data: BytesMut) -> Self {
45 Self { inner: data }
46 }
47
48 pub(crate) fn compress(mut self) -> Bytes {
50 if self.inner.len() > COMPRESSION_SIZE_THRESHOLD {
51 let input = self.inner.split_off(1);
52 match SnapEncoder::new().compress_vec(&input) {
53 Ok(res) => {
54 self.inner.extend_from_slice(&res);
55 self.set_compress_flag();
56 }
57 Err(e) => {
58 debug!("snappy compress error: {}", e);
59 self.inner.unsplit(input);
60 }
61 }
62 }
63 self.inner.freeze()
64 }
65
66 pub(crate) fn decompress(mut self) -> Result<Bytes, io::Error> {
68 if self.inner.is_empty() {
69 Err(io::ErrorKind::InvalidData.into())
70 } else if self.compress_flag() {
71 match decompress_len(&self.inner[1..]) {
72 Ok(decompressed_bytes_len) => {
73 if decompressed_bytes_len > MAX_UNCOMPRESSED_LEN {
74 debug!(
75 "The limit for uncompressed bytes len is exceeded. limit: {}, len: {}",
76 MAX_UNCOMPRESSED_LEN, decompressed_bytes_len
77 );
78 Err(io::ErrorKind::InvalidData.into())
79 } else {
80 let mut buf = vec![0; decompressed_bytes_len];
81 match SnapDecoder::new().decompress(&self.inner[1..], &mut buf) {
82 Ok(_) => Ok(buf.into()),
83 Err(e) => {
84 debug!("snappy decompress error: {:?}", e);
85 Err(io::ErrorKind::InvalidData.into())
86 }
87 }
88 }
89 }
90 Err(e) => {
91 debug!("snappy decompress_len error: {:?}", e);
92 Err(io::ErrorKind::InvalidData.into())
93 }
94 }
95 } else {
96 let _ = self.inner.split_to(1);
97 Ok(self.inner.freeze())
98 }
99 }
100
101 pub(crate) fn set_compress_flag(&mut self) {
102 self.inner[0] = COMPRESS_FLAG;
103 }
104
105 pub(crate) fn compress_flag(&self) -> bool {
106 (self.inner[0] & COMPRESS_FLAG) != 0
107 }
108}
109
110pub fn compress(src: Bytes) -> Bytes {
112 Message::from_raw(src).compress()
113}
114
115pub fn decompress(src: BytesMut) -> Result<Bytes, io::Error> {
117 Message::from_compressed(src).decompress()
118}