nakamoto_p2p/
stream.rs

1//! Message stream utilities.
2use std::io;
3
4use nakamoto_common::bitcoin::consensus::{encode, Decodable};
5
/// Incremental decoder for a stream of messages.
///
/// Bytes are fed in as they arrive (see [`Decoder::input`]) and complete
/// values are pulled back out one at a time (see [`Decoder::decode_next`]),
/// which makes it suitable for turning a raw byte stream into, for example,
/// network messages.
#[derive(Debug)]
pub struct Decoder {
    /// Bytes that have been received but not yet consumed by a successful decode.
    unparsed: Vec<u8>,
}
13
14impl Decoder {
15    /// Create a new stream decoder.
16    pub fn new(capacity: usize) -> Self {
17        Self {
18            unparsed: Vec::with_capacity(capacity),
19        }
20    }
21
22    /// Input bytes into the decoder.
23    pub fn input(&mut self, bytes: &[u8]) {
24        self.unparsed.extend_from_slice(bytes);
25    }
26
27    /// Decode and return the next message. Returns [`None`] if nothing was decoded.
28    pub fn decode_next<D: Decodable>(&mut self) -> Result<Option<D>, encode::Error> {
29        match encode::deserialize_partial::<D>(&self.unparsed) {
30            Ok((msg, index)) => {
31                // Drain deserialized bytes only.
32                self.unparsed.drain(..index);
33                Ok(Some(msg))
34            }
35
36            Err(encode::Error::Io(ref err)) if err.kind() == io::ErrorKind::UnexpectedEof => {
37                Ok(None)
38            }
39            Err(err) => Err(err),
40        }
41    }
42}
43
#[cfg(test)]
mod test {
    use super::*;
    use nakamoto_common::bitcoin::network::message::{NetworkMessage, RawNetworkMessage};
    use quickcheck_macros::quickcheck;

    /// Serialized fixture that the property below expects to decode as `Verack`.
    /// Bytes 4..10 are the ASCII command name "verack".
    const MSG_VERACK: [u8; 24] = [
        0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x61, 0x63, 0x6b, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x5d, 0xf6, 0xe0, 0xe2,
    ];

    /// Serialized fixture that the property below expects to decode as `Ping(100)`.
    /// Bytes 4..8 are the ASCII command name "ping"; the trailing 8 bytes start with `0x64`.
    const MSG_PING: [u8; 32] = [
        0xf9, 0xbe, 0xb4, 0xd9, 0x70, 0x69, 0x6e, 0x67, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x08, 0x00, 0x00, 0x00, 0x24, 0x67, 0xf1, 0x1d, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00,
    ];

    /// However the byte stream is split into chunks, feeding the chunks in order
    /// must yield the same two messages and leave nothing buffered.
    #[quickcheck]
    fn prop_decode_next(chunk_size: usize) {
        let mut decoder = Decoder::new(1024);

        // Fold the arbitrary input into `1..=capacity` so the chunk size is never zero.
        let chunk_size = chunk_size % decoder.unparsed.capacity() + 1;

        let stream = [&MSG_VERACK[..], &MSG_PING[..]].concat();
        let mut decoded = Vec::new();

        for piece in stream.chunks(chunk_size) {
            decoder.input(piece);

            // Pull out every message that became complete with this chunk.
            while let Some(msg) = decoder.decode_next::<RawNetworkMessage>().unwrap() {
                decoded.push(msg);
            }
        }

        // Same value as the first four bytes of both fixtures, read as little-endian.
        let magic = 0xD9B4_BEF9;
        let expected = [
            RawNetworkMessage {
                magic,
                payload: NetworkMessage::Verack,
            },
            RawNetworkMessage {
                magic,
                payload: NetworkMessage::Ping(100),
            },
        ];

        assert_eq!(decoder.unparsed.len(), 0);
        assert_eq!(decoded, expected);
    }
}