cometbft_p2p/
traits.rs

1//! Helper traits for reading and writing Protobuf messages.
2
3use crate::{Error, MAX_MSG_LEN, Result, proto};
4use prost::Message;
5use std::io::{self, Read, Write};
6
7#[cfg(feature = "async")]
8use std::future::Future;
9
// NOTE: async trait definitions below opt to place the `M` parameter on the method (rather than
// on the trait) to make it possible to turbofish the result message, which is helpful when
// dealing with an `impl Future` output where it may be difficult to notate the return type. The
// use of RPIT means the trait can't be dyn compatible / object safe anyway.
14
/// Read the given Protobuf message from the underlying I/O object (async).
///
/// The message type `M` is a parameter of the method rather than of the trait, so callers can
/// select it at the call site with a turbofish, e.g. `conn.read_msg::<MyMsg>()`.
#[cfg(feature = "async")]
pub trait AsyncReadMsg {
    /// Read from the underlying I/O object, decrypting the data and decoding it into the given
    /// Protobuf message.
    ///
    /// Returns a future that resolves to the decoded message `M`, or to an error reported
    /// through this crate's `Result`. The returned future is required to be `Send + Sync`.
    fn read_msg<M: Message + Default>(&mut self) -> impl Future<Output = Result<M>> + Send + Sync;
}
22
/// Write the given Protobuf message to the underlying I/O object (async).
///
/// The message type `M` is a parameter of the method rather than of the trait, so a single
/// implementor can send any type that implements `Message`.
#[cfg(feature = "async")]
pub trait AsyncWriteMsg {
    /// Encode the given Protobuf as bytes, encrypt it, and write the ciphertext to the underlying
    /// I/O object.
    ///
    /// Deliberately takes ownership of the message to send to simplify writing async code.
    ///
    /// Returns a future that resolves to `()` on success, or to an error reported through this
    /// crate's `Result`. The returned future is required to be `Send + Sync`.
    fn write_msg<M: Message>(&mut self, msg: M) -> impl Future<Output = Result<()>> + Send + Sync;
}
32
33// NOTE: trait definitions below use a generic `M` parameter on the trait rather than the method to
34// support dyn compatibility / object safety.
35//
36// For example, tmkms has a `ReadMsg + WriteMsg` connection type it stores in a `Box`.
37
/// Read the given Protobuf message from the underlying I/O object.
///
/// The message type `M` is a parameter of the trait itself, so `read_msg` carries no generic
/// parameters of its own. `M` must implement both `Message` and `Default`.
pub trait ReadMsg<M: Message + Default> {
    /// Read from the underlying I/O object, decoding (and if necessary decrypting) the data
    /// to the given Protobuf message.
    ///
    /// Returns the decoded message on success, or an error reported through this crate's
    /// `Result`.
    fn read_msg(&mut self) -> Result<M>;
}
44
/// Write the given Protobuf message to the underlying I/O object.
///
/// The message type `M` is a parameter of the trait itself, so `write_msg` carries no generic
/// parameters of its own.
pub trait WriteMsg<M: Message> {
    /// Encode the given Protobuf as bytes and write them to the underlying I/O object
    /// (and encrypting if necessary).
    ///
    /// The message is only borrowed. Returns `()` on success, or an error reported through this
    /// crate's `Result`.
    fn write_msg(&mut self, msg: &M) -> Result<()>;
}
51
52impl<M: Message + Default, Io: Read> ReadMsg<M> for Io {
53    fn read_msg(&mut self) -> Result<M> {
54        /// Message prefix length to always consume. This also represents the minimum message size.
55        ///
56        /// This is picked to ensure that the entire length prefix will always fit in this size,
57        /// namely  we only support up to 1 MiB messages (`MAX_MSG_LEN`), which use max 3-byte
58        /// length prefixes.
59        const PREFIX_LEN: usize = 3;
60
61        let mut prefix = [0u8; PREFIX_LEN];
62        self.read_exact(&mut prefix)?;
63
64        let msg_len = proto::decode_length_delimiter_inclusive(&prefix)?;
65
66        // Reject messages that are too small or too large.
67        if !(PREFIX_LEN..=MAX_MSG_LEN).contains(&msg_len) {
68            return Err(Error::MessageSize { size: msg_len });
69        }
70
71        // Allocate a buffer on the heap and consume the remaining data.
72        let mut msg = vec![0u8; msg_len];
73        msg[..PREFIX_LEN].copy_from_slice(&prefix);
74        self.read_exact(&mut msg[PREFIX_LEN..])?;
75
76        Ok(M::decode_length_delimited(msg.as_slice())?)
77    }
78}
79
80impl<M: Message, Io: Write> WriteMsg<M> for Io {
81    fn write_msg(&mut self, msg: &M) -> Result<()> {
82        let bytes = msg.encode_length_delimited_to_vec();
83        Ok(self.write_all(&bytes)?)
84    }
85}
86
/// Attempt to clone an I/O object, returning an `io::Result`.
///
/// The `Sized` supertrait is needed because `try_clone` returns `Self` by value.
pub trait TryCloneIo: Sized {
    /// Try to clone the given I/O object.
    ///
    /// Returns the new object on success, or the `io::Error` that prevented the clone.
    fn try_clone(&self) -> io::Result<Self>;
}
92
93impl TryCloneIo for std::net::TcpStream {
94    fn try_clone(&self) -> io::Result<Self> {
95        self.try_clone()
96    }
97}
98
99#[cfg(unix)]
100impl TryCloneIo for std::os::unix::net::UnixStream {
101    fn try_clone(&self) -> io::Result<Self> {
102        self.try_clone()
103    }
104}
105
106// NOTE: tested indirectly via `SecretConnection`