tendermint_abci/
codec.rs

1//! Encoding/decoding mechanisms for ABCI requests and responses.
2//!
3//! Implements the [Tendermint Socket Protocol][tsp].
4//!
5//! [tsp]: https://github.com/tendermint/tendermint/blob/v0.34.x/spec/abci/client-server.md#tsp
6
7use std::{
8    io::{Read, Write},
9    marker::PhantomData,
10};
11
12use bytes::{Buf, BufMut, BytesMut};
13use prost::Message;
14use tendermint_proto::v0_38::abci::{Request, Response};
15
16use crate::error::Error;
17
18/// The maximum number of bytes we expect in a varint. We use this to check if
19/// we're encountering a decoding error for a varint.
20pub const MAX_VARINT_LENGTH: usize = 16;
21
22/// The server receives incoming requests, and sends outgoing responses.
23pub type ServerCodec<S> = Codec<S, Request, Response>;
24
25#[cfg(feature = "client")]
26/// The client sends outgoing requests, and receives incoming responses.
27pub type ClientCodec<S> = Codec<S, Response, Request>;
28
29/// Allows for iteration over `S` to produce instances of `I`, as well as
30/// sending instances of `O`.
31pub struct Codec<S, I, O> {
32    stream: S,
33    // Long-running read buffer
34    read_buf: BytesMut,
35    // Fixed-length read window
36    read_window: Vec<u8>,
37    write_buf: BytesMut,
38    _incoming: PhantomData<I>,
39    _outgoing: PhantomData<O>,
40}
41
42impl<S, I, O> Codec<S, I, O>
43where
44    S: Read + Write,
45    I: Message + Default,
46    O: Message,
47{
48    /// Constructor.
49    pub fn new(stream: S, read_buf_size: usize) -> Self {
50        Self {
51            stream,
52            read_buf: BytesMut::new(),
53            read_window: vec![0_u8; read_buf_size],
54            write_buf: BytesMut::new(),
55            _incoming: Default::default(),
56            _outgoing: Default::default(),
57        }
58    }
59}
60
61// Iterating over a codec produces instances of `Result<I>`.
62impl<S, I, O> Iterator for Codec<S, I, O>
63where
64    S: Read,
65    I: Message + Default,
66{
67    type Item = Result<I, Error>;
68
69    fn next(&mut self) -> Option<Self::Item> {
70        loop {
71            // Try to decode an incoming message from our buffer first
72            match decode_length_delimited::<I>(&mut self.read_buf) {
73                Ok(Some(incoming)) => return Some(Ok(incoming)),
74                Err(e) => return Some(Err(e)),
75                _ => (), // not enough data to decode a message, let's continue.
76            }
77
78            // If we don't have enough data to decode a message, try to read
79            // more
80            let bytes_read = match self.stream.read(self.read_window.as_mut()) {
81                Ok(br) => br,
82                Err(e) => return Some(Err(Error::io(e))),
83            };
84            if bytes_read == 0 {
85                // The underlying stream terminated
86                return None;
87            }
88            self.read_buf
89                .extend_from_slice(&self.read_window[..bytes_read]);
90        }
91    }
92}
93
94impl<S, I, O> Codec<S, I, O>
95where
96    S: Write,
97    O: Message,
98{
99    /// Send a message using this codec.
100    pub fn send(&mut self, message: O) -> Result<(), Error> {
101        encode_length_delimited(message, &mut self.write_buf)?;
102        while !self.write_buf.is_empty() {
103            let bytes_written = self
104                .stream
105                .write(self.write_buf.as_ref())
106                .map_err(Error::io)?;
107
108            if bytes_written == 0 {
109                return Err(Error::io(std::io::Error::new(
110                    std::io::ErrorKind::WriteZero,
111                    "failed to write to underlying stream",
112                )));
113            }
114            self.write_buf.advance(bytes_written);
115        }
116
117        self.stream.flush().map_err(Error::io)?;
118
119        Ok(())
120    }
121}
122
123/// Encode the given message with a length prefix.
124pub fn encode_length_delimited<M, B>(message: M, mut dst: &mut B) -> Result<(), Error>
125where
126    M: Message,
127    B: BufMut,
128{
129    let mut buf = BytesMut::new();
130    message.encode(&mut buf).map_err(Error::encode)?;
131
132    let buf = buf.freeze();
133    prost::encoding::encode_varint(buf.len() as u64, &mut dst);
134    dst.put(buf);
135    Ok(())
136}
137
138/// Attempt to decode a message of type `M` from the given source buffer.
139pub fn decode_length_delimited<M>(src: &mut BytesMut) -> Result<Option<M>, Error>
140where
141    M: Message + Default,
142{
143    let src_len = src.len();
144    let mut tmp = src.clone().freeze();
145    let encoded_len = match prost::encoding::decode_varint(&mut tmp) {
146        Ok(len) => len,
147        // We've potentially only received a partial length delimiter
148        Err(_) if src_len <= MAX_VARINT_LENGTH => return Ok(None),
149        Err(e) => return Err(Error::decode(e)),
150    };
151    let remaining = tmp.remaining() as u64;
152    if remaining < encoded_len {
153        // We don't have enough data yet to decode the entire message
154        Ok(None)
155    } else {
156        let delim_len = src_len - tmp.remaining();
157        // We only advance the source buffer once we're sure we have enough
158        // data to try to decode the result.
159        src.advance(delim_len + (encoded_len as usize));
160
161        let mut result_bytes = BytesMut::from(tmp.split_to(encoded_len as usize).as_ref());
162        let res = M::decode(&mut result_bytes).map_err(Error::decode)?;
163
164        Ok(Some(res))
165    }
166}