Skip to main content

bitcoin/network/
stream_reader.rs

1// Rust Bitcoin Library
2// Written in 2014 by
3//     Andrew Poelstra <apoelstra@wpsoftware.net>
4//
5// To the extent possible under law, the author(s) have dedicated all
6// copyright and related and neighboring rights to this software to
7// the public domain worldwide. This software is distributed without
8// any warranty.
9//
10// You should have received a copy of the CC0 Public Domain Dedication
11// along with this software.
12// If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13//
14
15//! Stream reader
16//!
17//! This module defines `StreamReader` struct and its implementation which is used
18//! for parsing incoming stream into separate `RawNetworkMessage`s, handling assembling
19//! messages from multiple packets or dealing with partial or multiple messages in the stream
20//! (like can happen with reading from TCP socket)
21//!
22
23use std::fmt;
24use std::io::{self, Read};
25
26use consensus::{encode, Decodable};
27
28/// Struct used to configure stream reader function
29pub struct StreamReader<R: Read> {
30    /// Stream to read from
31    pub stream: R,
32    /// I/O buffer
33    data: Vec<u8>,
34    /// Buffer containing unparsed message part
35    unparsed: Vec<u8>
36}
37
38impl<R: Read> fmt::Debug for StreamReader<R> {
39    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
40        write!(f, "StreamReader with buffer_size={} and unparsed content {:?}",
41               self.data.capacity(), self.unparsed)
42    }
43}
44
45impl<R: Read> StreamReader<R> {
46    /// Constructs new stream reader for a given input stream `stream` with
47    /// optional parameter `buffer_size` determining reading buffer size
48    pub fn new(stream: R, buffer_size: Option<usize>) -> StreamReader<R> {
49        StreamReader {
50            stream,
51            data: vec![0u8; buffer_size.unwrap_or(64 * 1024)],
52            unparsed: vec![]
53        }
54    }
55
56    /// Reads stream and parses next message from its current input,
57    /// also taking into account previously unparsed partial message (if there was such).
58    pub fn read_next<D: Decodable>(&mut self) -> Result<D, encode::Error> {
59        loop {
60            match encode::deserialize_partial::<D>(&self.unparsed) {
61                // In this case we just have an incomplete data, so we need to read more
62                Err(encode::Error::Io(ref err)) if err.kind () == io::ErrorKind::UnexpectedEof => {
63                    let count = self.stream.read(&mut self.data)?;
64                    if count > 0 {
65                        self.unparsed.extend(self.data[0..count].iter());
66                    }
67                    else {
68                        return Err(encode::Error::Io(io::Error::from(io::ErrorKind::UnexpectedEof)));
69                    }
70                },
71                Err(err) => return Err(err),
72                // We have successfully read from the buffer
73                Ok((message, index)) => {
74                    self.unparsed.drain(..index);
75                    return Ok(message)
76                },
77            }
78        }
79    }
80}
81
82#[cfg(test)]
83mod test {
84    use std::thread;
85    use std::time::Duration;
86    use std::io::{self, BufReader, Write};
87    use std::net::{TcpListener, TcpStream, Shutdown};
88    use std::thread::JoinHandle;
89    use network::constants::ServiceFlags;
90
91    use super::StreamReader;
92    use network::message::{NetworkMessage, RawNetworkMessage};
93
94    // First, let's define some byte arrays for sample messages - dumps are taken from live
95    // Bitcoin Core node v0.17.1 with Wireshark
96    const MSG_VERSION: [u8; 126] = [
97        0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x73,
98        0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00,
99        0x66, 0x00, 0x00, 0x00, 0xbe, 0x61, 0xb8, 0x27,
100        0x7f, 0x11, 0x01, 0x00, 0x0d, 0x04, 0x00, 0x00,
101        0x00, 0x00, 0x00, 0x00, 0xf0, 0x0f, 0x4d, 0x5c,
102        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
103        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
104        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
105        0x5b, 0xf0, 0x8c, 0x80, 0xb4, 0xbd, 0x0d, 0x04,
106        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
107        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
108        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
109        0xfa, 0xa9, 0x95, 0x59, 0xcc, 0x68, 0xa1, 0xc1,
110        0x10, 0x2f, 0x53, 0x61, 0x74, 0x6f, 0x73, 0x68,
111        0x69, 0x3a, 0x30, 0x2e, 0x31, 0x37, 0x2e, 0x31,
112        0x2f, 0x93, 0x8c, 0x08, 0x00, 0x01
113    ];
114
115    const MSG_VERACK: [u8; 24] = [
116        0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x61,
117        0x63, 0x6b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
118        0x00, 0x00, 0x00, 0x00, 0x5d, 0xf6, 0xe0, 0xe2
119    ];
120
121    const MSG_PING: [u8; 32] = [
122        0xf9, 0xbe, 0xb4, 0xd9, 0x70, 0x69, 0x6e, 0x67,
123        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
124        0x08, 0x00, 0x00, 0x00, 0x24, 0x67, 0xf1, 0x1d,
125        0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
126    ];
127
128    const MSG_ALERT: [u8; 192] = [
129        0xf9, 0xbe, 0xb4, 0xd9, 0x61, 0x6c, 0x65, 0x72,
130        0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
131        0xa8, 0x00, 0x00, 0x00, 0x1b, 0xf9, 0xaa, 0xea,
132        0x60, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
133        0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
134        0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
135        0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff,
136        0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
137        0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00,
138        0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a,
139        0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b,
140        0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72,
141        0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20,
142        0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20,
143        0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64,
144        0x00, 0x46, 0x30, 0x44, 0x02, 0x20, 0x65, 0x3f,
145        0xeb, 0xd6, 0x41, 0x0f, 0x47, 0x0f, 0x6b, 0xae,
146        0x11, 0xca, 0xd1, 0x9c, 0x48, 0x41, 0x3b, 0xec,
147        0xb1, 0xac, 0x2c, 0x17, 0xf9, 0x08, 0xfd, 0x0f,
148        0xd5, 0x3b, 0xdc, 0x3a, 0xbd, 0x52, 0x02, 0x20,
149        0x6d, 0x0e, 0x9c, 0x96, 0xfe, 0x88, 0xd4, 0xa0,
150        0xf0, 0x1e, 0xd9, 0xde, 0xda, 0xe2, 0xb6, 0xf9,
151        0xe0, 0x0d, 0xa9, 0x4c, 0xad, 0x0f, 0xec, 0xaa,
152        0xe6, 0x6e, 0xcf, 0x68, 0x9b, 0xf7, 0x1b, 0x50
153    ];
154
155    // Helper functions that checks parsed versions of the messages from the byte arrays above
156    fn check_version_msg(msg: &RawNetworkMessage) {
157        assert_eq!(msg.magic, 0xd9b4bef9);
158        if let NetworkMessage::Version(ref version_msg) = msg.payload {
159            assert_eq!(version_msg.version, 70015);
160            assert_eq!(version_msg.services, ServiceFlags::NETWORK | ServiceFlags::BLOOM | ServiceFlags::WITNESS | ServiceFlags::NETWORK_LIMITED);
161            assert_eq!(version_msg.timestamp, 1548554224);
162            assert_eq!(version_msg.nonce, 13952548347456104954);
163            assert_eq!(version_msg.user_agent, "/Satoshi:0.17.1/");
164            assert_eq!(version_msg.start_height, 560275);
165            assert_eq!(version_msg.relay, true);
166        } else {
167            panic!("Wrong message type: expected VersionMessage");
168        }
169    }
170
171    fn check_alert_msg(msg: &RawNetworkMessage) {
172        assert_eq!(msg.magic, 0xd9b4bef9);
173        if let NetworkMessage::Alert(ref alert) = msg.payload {
174            assert_eq!(alert.clone(), [
175                0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
176                0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
177                0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
178                0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff,
179                0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
180                0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00,
181                0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a,
182                0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b,
183                0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72,
184                0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20,
185                0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20,
186                0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64,
187                0x00,
188            ].to_vec());
189        } else {
190            panic!("Wrong message type: expected AlertMessage");
191        }
192    }
193
194    #[test]
195    fn parse_multipartmsg_test() {
196        let stream = io::empty();
197        let mut reader = StreamReader::new(stream, None);
198        reader.unparsed = MSG_ALERT[..24].to_vec();
199        let message: Result<RawNetworkMessage, _> = reader.read_next();
200        assert!(message.is_err());
201        assert_eq!(reader.unparsed.len(), 24);
202
203        reader.unparsed = MSG_ALERT.to_vec();
204        let message = reader.read_next().unwrap();
205        assert_eq!(reader.unparsed.len(), 0);
206
207        check_alert_msg(&message);
208    }
209
210    #[test]
211    fn read_singlemsg_test() {
212        let stream = MSG_VERSION[..].to_vec();
213        let stream = stream.as_slice();
214        let message = StreamReader::new(stream, None).read_next().unwrap();
215        check_version_msg(&message);
216    }
217
218    #[test]
219    fn read_doublemsgs_test() {
220        let mut stream = MSG_VERSION.to_vec();
221        stream.extend(&MSG_PING);
222        let stream = stream.as_slice();
223        let mut reader = StreamReader::new(stream, None);
224        let message = reader.read_next().unwrap();
225        check_version_msg(&message);
226
227        let msg: RawNetworkMessage = reader.read_next().unwrap();
228        assert_eq!(msg.magic, 0xd9b4bef9);
229        if let NetworkMessage::Ping(nonce) = msg.payload {
230            assert_eq!(nonce, 100);
231        } else {
232            panic!("Wrong message type, expected PingMessage");
233        }
234    }
235
236    // Helper function that set ups emulation of client-server TCP connection for
237    // testing message transfer via TCP packets
238    fn serve_tcp(pieces: Vec<Vec<u8>>) -> (JoinHandle<()>, BufReader<TcpStream>) {
239        // 1. Creating server part (emulating Bitcoin Core node)
240        let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).unwrap();
241        let port = listener.local_addr().unwrap().port();
242        // 2. Spawning thread that will be writing our messages to the TCP Stream at the server side
243        // in async mode
244        let handle = thread::spawn(move || {
245            for ostream in listener.incoming() {
246                let mut ostream = ostream.unwrap();
247
248                for piece in pieces {
249                    ostream.write(&piece[..]).unwrap();
250                    ostream.flush().unwrap();
251                    thread::sleep(Duration::from_secs(1));
252                }
253
254                ostream.shutdown(Shutdown::Both).unwrap();
255                break;
256            }
257        });
258
259        // 3. Creating client side of the TCP socket connection
260        thread::sleep(Duration::from_secs(1));
261        let istream = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
262        let reader = BufReader::new(istream);
263
264        return (handle, reader)
265    }
266
267    #[test]
268    fn read_multipartmsg_test() {
269        // Setting up TCP connection emulation
270        let (handle, istream) = serve_tcp(vec![
271            // single message split in two parts to emulate real network conditions
272            MSG_VERSION[..24].to_vec(), MSG_VERSION[24..].to_vec()
273        ]);
274        let stream = istream;
275        let mut reader = StreamReader::new(stream, None);
276
277        // Reading and checking the whole message back
278        let message = reader.read_next().unwrap();
279        check_version_msg(&message);
280
281        // Waiting TCP server thread to terminate
282        handle.join().unwrap();
283    }
284
285    #[test]
286    fn read_sequencemsg_test() {
287        // Setting up TCP connection emulation
288        let (handle, istream) = serve_tcp(vec![
289            // Real-world Bitcoin core communication case for /Satoshi:0.17.1/
290            MSG_VERSION[..23].to_vec(), MSG_VERSION[23..].to_vec(),
291            MSG_VERACK.to_vec(),
292            MSG_ALERT[..24].to_vec(), MSG_ALERT[24..].to_vec()
293        ]);
294        let stream = istream;
295        let mut reader = StreamReader::new(stream, None);
296
297        // Reading and checking the first message (Version)
298        let message = reader.read_next().unwrap();
299        check_version_msg(&message);
300
301        // Reading and checking the second message (Verack)
302        let msg: RawNetworkMessage = reader.read_next().unwrap();
303        assert_eq!(msg.magic, 0xd9b4bef9);
304        assert_eq!(msg.payload, NetworkMessage::Verack, "Wrong message type, expected VerackMessage");
305
306        // Reading and checking the third message (Alert)
307        let msg = reader.read_next().unwrap();
308        check_alert_msg(&msg);
309
310        // Waiting TCP server thread to terminate
311        handle.join().unwrap();
312    }
313
314    #[test]
315    fn read_block_from_file_test() {
316        use std::io;
317        use consensus::serialize;
318        use hashes::hex::FromHex;
319        use Block;
320
321        let normal_data = Vec::from_hex("010000004ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914cd74d6e49ffff001d323b3a7b0201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0804ffff001d026e04ffffffff0100f2052a0100000043410446ef0102d1ec5240f0d061a4246c1bdef63fc3dbab7733052fbbf0ecd8f41fc26bf049ebb4f9527f374280259e7cfa99c48b0e3f39c51347a19a5819651503a5ac00000000010000000321f75f3139a013f50f315b23b0c9a2b6eac31e2bec98e5891c924664889942260000000049483045022100cb2c6b346a978ab8c61b18b5e9397755cbd17d6eb2fe0083ef32e067fa6c785a02206ce44e613f31d9a6b0517e46f3db1576e9812cc98d159bfdaf759a5014081b5c01ffffffff79cda0945903627c3da1f85fc95d0b8ee3e76ae0cfdc9a65d09744b1f8fc85430000000049483045022047957cdd957cfd0becd642f6b84d82f49b6cb4c51a91f49246908af7c3cfdf4a022100e96b46621f1bffcf5ea5982f88cef651e9354f5791602369bf5a82a6cd61a62501fffffffffe09f5fe3ffbf5ee97a54eb5e5069e9da6b4856ee86fc52938c2f979b0f38e82000000004847304402204165be9a4cbab8049e1af9723b96199bfd3e85f44c6b4c0177e3962686b26073022028f638da23fc003760861ad481ead4099312c60030d4cb57820ce4d33812a5ce01ffffffff01009d966b01000000434104ea1feff861b51fe3f5f8a3b12d0f4712db80e919548a80839fc47c6a21e66d957e9c5d8cd108c7a2d2324bad71f9904ac0ae7336507d785b17a2c115e427a32fac00000000").unwrap();
322        let cutoff_data = Vec::from_hex("010000004ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914cd74d6e49ffff001d323b3a7b0201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0804ffff001d026e04ffffffff0100f2052a0100000043410446ef0102d1ec5240f0d061a4246c1bdef63fc3dbab7733052fbbf0ecd8f41fc26bf049ebb4f9527f374280259e7cfa99c48b0e3f39c51347a19a5819651503a5ac00000000010000000321f75f3139a013f50f315b23b0c9a2b6eac31e2bec98e5891c924664889942260000000049483045022100cb2c6b346a978ab8c61b18b5e9397755cbd17d6eb2fe0083ef32e067fa6c785a02206ce44e613f31d9a6b0517e46f3db1576e9812cc98d159bfdaf759a5014081b5c01ffffffff79cda0945903627c3da1f85fc95d0b8ee3e76ae0cfdc9a65d09744b1f8fc85430000000049483045022047957cdd957cfd0becd642f6b84d82f49b6cb4c51a91f49246908af7c3cfdf4a022100e96b46621f1bffcf5ea5982f88cef651e9354f5791602369bf5a82a6cd61a62501fffffffffe09f5fe3ffbf5ee97a54eb5e5069e9da6b4856ee86fc52938c2f979b0f38e82000000004847304402204165be9a4cbab8049e1af9723b96199bfd3e85f44c6b4c0177e3962686b26073022028f638da23fc003760861ad481ead4099312c60030d4cb57820ce4d33812a5ce01ffffffff01009d966b01000000434104ea1feff861b51fe3f5f8a3b12d0f4712db80e919548a80839fc47c6a21e66d957e9c5d8cd108c7a2d2324bad71f9904ac0ae7336507d785b17a2c115e427a32fac").unwrap();
323        let prevhash = Vec::from_hex("4ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000").unwrap();
324        let merkle = Vec::from_hex("bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914c").unwrap();
325
326        let stream = io::BufReader::new(&normal_data[..]);
327        let mut reader = StreamReader::new(stream, None);
328        let normal_block = reader.read_next::<Block>();
329
330        let stream = io::BufReader::new(&cutoff_data[..]);
331        let mut reader = StreamReader::new(stream, None);
332        let cutoff_block = reader.read_next::<Block>();
333
334        assert!(normal_block.is_ok());
335        assert!(cutoff_block.is_err());
336        let block = normal_block.unwrap();
337        assert_eq!(block.header.version, 1);
338        assert_eq!(serialize(&block.header.prev_blockhash), prevhash);
339        assert_eq!(serialize(&block.header.merkle_root), merkle);
340        assert_eq!(block.header.time, 1231965655);
341        assert_eq!(block.header.bits, 486604799);
342        assert_eq!(block.header.nonce, 2067413810);
343
344        // should be also ok for a non-witness block as commitment is optional in that case
345        assert!(block.check_witness_commitment());
346    }
347}