snarkos_node_router_messages/helpers/
codec.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::Message;
17use snarkvm::prelude::{FromBytes, Network, ToBytes};
18
19use ::bytes::{Buf, BufMut, BytesMut};
20use core::marker::PhantomData;
21use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
22
/// Upper bound, in bytes, on the frame length accepted by the handshake codec.
const MAXIMUM_HANDSHAKE_MESSAGE_SIZE: usize = 1 << 20; // 1 MiB
25
/// Upper bound, in bytes, on the frame length accepted by the default codec.
pub(crate) const MAXIMUM_MESSAGE_SIZE: usize = 128 << 20; // 128 MiB
28
29/// The codec used to decode and encode network `Message`s.
30pub struct MessageCodec<N: Network> {
31    codec: LengthDelimitedCodec,
32    _phantom: PhantomData<N>,
33}
34
35impl<N: Network> MessageCodec<N> {
36    pub fn handshake() -> Self {
37        let mut codec = Self::default();
38        codec.codec.set_max_frame_length(MAXIMUM_HANDSHAKE_MESSAGE_SIZE);
39        codec
40    }
41}
42
43impl<N: Network> Default for MessageCodec<N> {
44    fn default() -> Self {
45        Self {
46            codec: LengthDelimitedCodec::builder().max_frame_length(MAXIMUM_MESSAGE_SIZE).little_endian().new_codec(),
47            _phantom: Default::default(),
48        }
49    }
50}
51
52impl<N: Network> Encoder<Message<N>> for MessageCodec<N> {
53    type Error = std::io::Error;
54
55    fn encode(&mut self, message: Message<N>, dst: &mut BytesMut) -> Result<(), Self::Error> {
56        // Serialize the payload directly into dst.
57        message
58            .write_le(&mut dst.writer())
59            // This error should never happen, the conversion is for greater compatibility.
60            .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "serialization error"))?;
61
62        let serialized_message = dst.split_to(dst.len()).freeze();
63
64        self.codec.encode(serialized_message, dst)
65    }
66}
67
68impl<N: Network> Decoder for MessageCodec<N> {
69    type Error = std::io::Error;
70    type Item = Message<N>;
71
72    fn decode(&mut self, source: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
73        // Decode a frame containing bytes belonging to a message.
74        let bytes = match self.codec.decode(source)? {
75            Some(bytes) => bytes,
76            None => return Ok(None),
77        };
78
79        Self::Item::check_size(&bytes)?;
80
81        // Convert the bytes to a message, or fail if it is not valid.
82        let reader = bytes.reader();
83        match Message::read_le(reader) {
84            Ok(message) => Ok(Some(message)),
85            Err(error) => {
86                warn!("Failed to deserialize a message - {}", error);
87                Err(std::io::ErrorKind::InvalidData.into())
88            }
89        }
90    }
91}
92
#[cfg(test)]
mod tests {
    use super::*;

    use crate::{
        UnconfirmedTransaction,
        unconfirmed_transaction::prop_tests::{any_large_unconfirmed_transaction, any_unconfirmed_transaction},
    };

    use proptest::prelude::ProptestConfig;
    use test_strategy::proptest;

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    /// Encodes `tx` with a default codec (asserting that encoding succeeds) and
    /// returns the outcome of decoding the resulting bytes with the same codec.
    fn encode_then_decode(
        tx: UnconfirmedTransaction<CurrentNetwork>,
    ) -> Result<Option<Message<CurrentNetwork>>, std::io::Error> {
        let mut buffer = BytesMut::new();
        let mut codec = MessageCodec::<CurrentNetwork>::default();
        assert!(codec.encode(Message::UnconfirmedTransaction(tx), &mut buffer).is_ok());
        codec.decode(&mut buffer)
    }

    /// A regular unconfirmed transaction must survive the encode/decode cycle.
    #[proptest]
    fn unconfirmed_transaction(#[strategy(any_unconfirmed_transaction())] tx: UnconfirmedTransaction<CurrentNetwork>) {
        assert!(encode_then_decode(tx).is_ok());
    }

    /// An overly large unconfirmed transaction encodes, but decoding it must fail with `InvalidData`.
    #[proptest(ProptestConfig { cases : 10, ..ProptestConfig::default() })]
    fn overly_large_unconfirmed_transaction(
        #[strategy(any_large_unconfirmed_transaction())] tx: UnconfirmedTransaction<CurrentNetwork>,
    ) {
        let outcome = encode_then_decode(tx);
        assert!(matches!(outcome, Err(err) if err.kind() == std::io::ErrorKind::InvalidData));
    }
}