Skip to main content

avalanche_types/message/
app_request.rs

1use std::io::{self, Error, ErrorKind};
2
3use crate::{ids, message, proto::pb::p2p};
4use prost::Message as ProstMessage;
5
6#[derive(Debug, PartialEq, Clone)]
7pub struct Message {
8    pub msg: p2p::AppRequest,
9    pub gzip_compress: bool,
10}
11
12impl Default for Message {
13    fn default() -> Self {
14        Message {
15            msg: p2p::AppRequest {
16                chain_id: prost::bytes::Bytes::new(),
17                request_id: 0,
18                deadline: 0,
19                app_bytes: prost::bytes::Bytes::new(),
20            },
21            gzip_compress: false,
22        }
23    }
24}
25
26impl Message {
27    #[must_use]
28    pub fn chain_id(mut self, chain_id: ids::Id) -> Self {
29        self.msg.chain_id = prost::bytes::Bytes::from(chain_id.to_vec());
30        self
31    }
32
33    #[must_use]
34    pub fn request_id(mut self, request_id: u32) -> Self {
35        self.msg.request_id = request_id;
36        self
37    }
38
39    #[must_use]
40    pub fn deadline(mut self, deadline: u64) -> Self {
41        self.msg.deadline = deadline;
42        self
43    }
44
45    #[must_use]
46    pub fn app_bytes(mut self, app_bytes: Vec<u8>) -> Self {
47        self.msg.app_bytes = prost::bytes::Bytes::from(app_bytes);
48        self
49    }
50
51    #[must_use]
52    pub fn gzip_compress(mut self, gzip_compress: bool) -> Self {
53        self.gzip_compress = gzip_compress;
54        self
55    }
56
57    pub fn serialize(&self) -> io::Result<Vec<u8>> {
58        let msg = p2p::Message {
59            message: Some(p2p::message::Message::AppRequest(self.msg.clone())),
60        };
61        let encoded = ProstMessage::encode_to_vec(&msg);
62        if !self.gzip_compress {
63            return Ok(encoded);
64        }
65
66        let uncompressed_len = encoded.len();
67        let compressed = message::compress::pack_gzip(&encoded)?;
68        let msg = p2p::Message {
69            message: Some(p2p::message::Message::CompressedGzip(
70                prost::bytes::Bytes::from(compressed),
71            )),
72        };
73
74        let compressed_len = msg.encoded_len();
75        if uncompressed_len > compressed_len {
76            log::debug!(
77                "app_request compression saved {} bytes",
78                uncompressed_len - compressed_len
79            );
80        } else {
81            log::debug!(
82                "app_request compression added {} byte(s)",
83                compressed_len - uncompressed_len
84            );
85        }
86
87        Ok(ProstMessage::encode_to_vec(&msg))
88    }
89
90    pub fn deserialize(d: impl AsRef<[u8]>) -> io::Result<Self> {
91        let buf = bytes::Bytes::from(d.as_ref().to_vec());
92        let p2p_msg: p2p::Message = ProstMessage::decode(buf).map_err(|e| {
93            Error::new(
94                ErrorKind::InvalidData,
95                format!("failed prost::Message::decode '{}'", e),
96            )
97        })?;
98
99        match p2p_msg.message.unwrap() {
100            // was not compressed
101            p2p::message::Message::AppRequest(msg) => Ok(Message {
102                msg,
103                gzip_compress: false,
104            }),
105
106            // was compressed, so need decompress first
107            p2p::message::Message::CompressedGzip(msg) => {
108                let decompressed = message::compress::unpack_gzip(msg.as_ref())?;
109                let decompressed_msg: p2p::Message =
110                    ProstMessage::decode(prost::bytes::Bytes::from(decompressed)).map_err(|e| {
111                        Error::new(
112                            ErrorKind::InvalidData,
113                            format!("failed prost::Message::decode '{}'", e),
114                        )
115                    })?;
116                match decompressed_msg.message.unwrap() {
117                    p2p::message::Message::AppRequest(msg) => Ok(Message {
118                        msg,
119                        gzip_compress: false,
120                    }),
121                    _ => Err(Error::new(
122                        ErrorKind::InvalidInput,
123                        "unknown message type after decompress",
124                    )),
125                }
126            }
127
128            // unknown message enum
129            _ => Err(Error::new(ErrorKind::InvalidInput, "unknown message type")),
130        }
131    }
132}
133
/// RUST_LOG=debug cargo test --package avalanche-types --lib -- message::app_request::test_message --exact --show-output
#[test]
fn test_message() {
    // The result of logger initialization is deliberately ignored.
    let _ = env_logger::builder()
        .filter_level(log::LevelFilter::Debug)
        .is_test(true)
        .try_init();

    // Build a request with random identifiers and a fixed 100-byte payload.
    let chain_id = ids::Id::from_slice(&random_manager::secure_bytes(32).unwrap());
    let plain = Message::default()
        .chain_id(chain_id)
        .request_id(random_manager::u32())
        .deadline(random_manager::u64())
        .app_bytes(vec![0u8; 100]);

    // Round trip without compression must reproduce the message exactly.
    let plain_bytes = plain.serialize().unwrap();
    let plain_decoded = Message::deserialize(plain_bytes).unwrap();
    assert_eq!(plain, plain_decoded);

    // Toggling the compression flag makes the two messages compare unequal.
    let gzipped = plain.clone().gzip_compress(true);
    assert_ne!(plain, gzipped);

    // Deserializing resets `gzip_compress` to false, so the compressed round
    // trip compares equal to the uncompressed original.
    let gzipped_bytes = gzipped.serialize().unwrap();
    let gzipped_decoded = Message::deserialize(gzipped_bytes).unwrap();
    assert_eq!(plain, gzipped_decoded);
}