sapio_bitcoin/network/
stream_reader.rs

1// Rust Bitcoin Library
2// Written in 2014 by
3//     Andrew Poelstra <apoelstra@wpsoftware.net>
4//
5// To the extent possible under law, the author(s) have dedicated all
6// copyright and related and neighboring rights to this software to
7// the public domain worldwide. This software is distributed without
8// any warranty.
9//
10// You should have received a copy of the CC0 Public Domain Dedication
11// along with this software.
12// If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
13//
14
//! Stream reader.
//!
//! Deprecated.
//!
//! This module defines the `StreamReader` struct and its implementation, which is used
//! for parsing an incoming stream into separate `Decodable`s. It handles assembling
//! messages from multiple packets as well as partial or multiple messages in the stream
//! (as can happen when reading from a TCP socket).
//!
24
25use core::fmt;
26use io::{Read, BufReader};
27
28use consensus::{encode, Decodable};
29
/// Reader that owns an input stream wrapped in a [`BufReader`].
///
/// The wrapped reader is exposed through the public `stream` field.
pub struct StreamReader<R>
where
    R: Read,
{
    /// Buffered handle over the input stream that values are read from
    pub stream: BufReader<R>,
}
35
36impl<R: Read> fmt::Debug for StreamReader<R> {
37    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
38        write!(f, "StreamReader")
39    }
40}
41
42impl<R: Read> StreamReader<R> {
43    /// Constructs new stream reader for a given input stream `stream`
44    #[deprecated(since = "0.28.0", note = "wrap your stream into a buffered reader if necessary and use consensus_encode directly")]
45    pub fn new(stream: R, _buffer_size: Option<usize>) -> StreamReader<R> {
46        StreamReader {
47            stream: BufReader::new(stream),
48        }
49    }
50
51    /// Reads stream and parses next message from its current input
52    #[deprecated(since = "0.28.0", note = "wrap your stream into a buffered reader if necessary and use consensus_encode directly")]
53    pub fn read_next<D: Decodable>(&mut self) -> Result<D, encode::Error> {
54        Decodable::consensus_decode(&mut self.stream)
55    }
56}
57
// The whole module exercises the deprecated `StreamReader` API on purpose.
#[allow(deprecated)]
#[cfg(test)]
mod test {
    use std::thread;
    use std::time::Duration;
    use io::{BufReader, Write};
    use std::net::{TcpListener, TcpStream, Shutdown};
    use std::thread::JoinHandle;
    use network::constants::ServiceFlags;

    use super::StreamReader;
    use network::message::{NetworkMessage, RawNetworkMessage};

    // Magic value that every sample message below starts with (bytes f9 be b4 d9 on the wire).
    const MAGIC: u32 = 0xd9b4bef9;

    // Byte arrays for sample messages - dumps are taken from a live
    // Bitcoin Core node v0.17.1 with Wireshark
    const MSG_VERSION: [u8; 126] = [
        0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x73,
        0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x66, 0x00, 0x00, 0x00, 0xbe, 0x61, 0xb8, 0x27,
        0x7f, 0x11, 0x01, 0x00, 0x0d, 0x04, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0xf0, 0x0f, 0x4d, 0x5c,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
        0x5b, 0xf0, 0x8c, 0x80, 0xb4, 0xbd, 0x0d, 0x04,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0xfa, 0xa9, 0x95, 0x59, 0xcc, 0x68, 0xa1, 0xc1,
        0x10, 0x2f, 0x53, 0x61, 0x74, 0x6f, 0x73, 0x68,
        0x69, 0x3a, 0x30, 0x2e, 0x31, 0x37, 0x2e, 0x31,
        0x2f, 0x93, 0x8c, 0x08, 0x00, 0x01
    ];

    const MSG_VERACK: [u8; 24] = [
        0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x61,
        0x63, 0x6b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x5d, 0xf6, 0xe0, 0xe2
    ];

    const MSG_PING: [u8; 32] = [
        0xf9, 0xbe, 0xb4, 0xd9, 0x70, 0x69, 0x6e, 0x67,
        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x08, 0x00, 0x00, 0x00, 0x24, 0x67, 0xf1, 0x1d,
        0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    ];

    const MSG_ALERT: [u8; 192] = [
        0xf9, 0xbe, 0xb4, 0xd9, 0x61, 0x6c, 0x65, 0x72,
        0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0xa8, 0x00, 0x00, 0x00, 0x1b, 0xf9, 0xaa, 0xea,
        0x60, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
        0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
        0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff,
        0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
        0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00,
        0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a,
        0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b,
        0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72,
        0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20,
        0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20,
        0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64,
        0x00, 0x46, 0x30, 0x44, 0x02, 0x20, 0x65, 0x3f,
        0xeb, 0xd6, 0x41, 0x0f, 0x47, 0x0f, 0x6b, 0xae,
        0x11, 0xca, 0xd1, 0x9c, 0x48, 0x41, 0x3b, 0xec,
        0xb1, 0xac, 0x2c, 0x17, 0xf9, 0x08, 0xfd, 0x0f,
        0xd5, 0x3b, 0xdc, 0x3a, 0xbd, 0x52, 0x02, 0x20,
        0x6d, 0x0e, 0x9c, 0x96, 0xfe, 0x88, 0xd4, 0xa0,
        0xf0, 0x1e, 0xd9, 0xde, 0xda, 0xe2, 0xb6, 0xf9,
        0xe0, 0x0d, 0xa9, 0x4c, 0xad, 0x0f, 0xec, 0xaa,
        0xe6, 0x6e, 0xcf, 0x68, 0x9b, 0xf7, 0x1b, 0x50
    ];

    // Helper that checks the parsed form of `MSG_VERSION`
    fn check_version_msg(msg: &RawNetworkMessage) {
        assert_eq!(msg.magic, MAGIC);
        match msg.payload {
            NetworkMessage::Version(ref version_msg) => {
                assert_eq!(version_msg.version, 70015);
                assert_eq!(version_msg.services, ServiceFlags::NETWORK | ServiceFlags::BLOOM | ServiceFlags::WITNESS | ServiceFlags::NETWORK_LIMITED);
                assert_eq!(version_msg.timestamp, 1548554224);
                assert_eq!(version_msg.nonce, 13952548347456104954);
                assert_eq!(version_msg.user_agent, "/Satoshi:0.17.1/");
                assert_eq!(version_msg.start_height, 560275);
                assert!(version_msg.relay);
            }
            _ => panic!("Wrong message type: expected VersionMessage"),
        }
    }

    // Helper that checks the parsed form of `MSG_ALERT`
    fn check_alert_msg(msg: &RawNetworkMessage) {
        // Compared as a slice so neither side has to be cloned or allocated.
        let expected: &[u8] = &[
            0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
            0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
            0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff,
            0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
            0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00,
            0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a,
            0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b,
            0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72,
            0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20,
            0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20,
            0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64,
            0x00,
        ];

        assert_eq!(msg.magic, MAGIC);
        match msg.payload {
            NetworkMessage::Alert(ref alert) => assert_eq!(&alert[..], expected),
            _ => panic!("Wrong message type: expected AlertMessage"),
        }
    }

    #[test]
    fn read_singlemsg_test() {
        let bytes = MSG_VERSION.to_vec();
        let message = StreamReader::new(&bytes[..], None).read_next().unwrap();
        check_version_msg(&message);
    }

    #[test]
    fn read_doublemsgs_test() {
        // Two complete messages back to back in one in-memory stream
        let mut bytes = MSG_VERSION.to_vec();
        bytes.extend_from_slice(&MSG_PING);
        let mut reader = StreamReader::new(&bytes[..], None);

        let message = reader.read_next().unwrap();
        check_version_msg(&message);

        let msg: RawNetworkMessage = reader.read_next().unwrap();
        assert_eq!(msg.magic, MAGIC);
        match msg.payload {
            NetworkMessage::Ping(nonce) => assert_eq!(nonce, 100),
            _ => panic!("Wrong message type, expected PingMessage"),
        }
    }

    // Helper that sets up an emulated client-server TCP connection for testing
    // message transfer via TCP packets.
    //
    // Returns the handle of the server thread (which writes `pieces` one by one)
    // and a buffered reader over the client end of the connection.
    fn serve_tcp(pieces: Vec<Vec<u8>>) -> (JoinHandle<()>, BufReader<TcpStream>) {
        // 1. Creating the server part (emulating a Bitcoin Core node) on an OS-assigned port
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();

        // 2. Spawning a thread that writes our messages to the server side of the TCP stream
        let handle = thread::spawn(move || {
            let (mut ostream, _peer) = listener.accept().unwrap();

            for piece in pieces {
                ostream.write_all(&piece).unwrap();
                ostream.flush().unwrap();
                // Pause after each write so that pieces are sent at distinct times
                thread::sleep(Duration::from_secs(1));
            }
            ostream.shutdown(Shutdown::Both).unwrap();
        });

        // 3. Creating the client side of the TCP socket connection.
        // No delay is needed here: the listener was bound (and is listening) before
        // the server thread was spawned, so the connection cannot race the accept.
        let istream = TcpStream::connect(addr).unwrap();

        (handle, BufReader::new(istream))
    }

    #[test]
    fn read_multipartmsg_test() {
        // Setting up TCP connection emulation
        let (handle, istream) = serve_tcp(vec![
            // single message split in two parts to emulate real network conditions
            MSG_VERSION[..24].to_vec(), MSG_VERSION[24..].to_vec()
        ]);
        let mut reader = StreamReader::new(istream, None);

        // Reading and checking the whole message back
        let message = reader.read_next().unwrap();
        check_version_msg(&message);

        // Waiting for the TCP server thread to terminate
        handle.join().unwrap();
    }

    #[test]
    fn read_sequencemsg_test() {
        // Setting up TCP connection emulation
        let (handle, istream) = serve_tcp(vec![
            // Real-world Bitcoin core communication case for /Satoshi:0.17.1/
            MSG_VERSION[..23].to_vec(), MSG_VERSION[23..].to_vec(),
            MSG_VERACK.to_vec(),
            MSG_ALERT[..24].to_vec(), MSG_ALERT[24..].to_vec()
        ]);
        let mut reader = StreamReader::new(istream, None);

        // Reading and checking the first message (Version)
        let message = reader.read_next().unwrap();
        check_version_msg(&message);

        // Reading and checking the second message (Verack)
        let msg: RawNetworkMessage = reader.read_next().unwrap();
        assert_eq!(msg.magic, MAGIC);
        assert_eq!(msg.payload, NetworkMessage::Verack, "Wrong message type, expected VerackMessage");

        // Reading and checking the third message (Alert)
        let msg = reader.read_next().unwrap();
        check_alert_msg(&msg);

        // Waiting for the TCP server thread to terminate
        handle.join().unwrap();
    }

    #[test]
    fn read_block_from_file_test() {
        use io;
        use consensus::serialize;
        use hashes::hex::FromHex;
        use Block;

        let normal_data = Vec::from_hex("010000004ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914cd74d6e49ffff001d323b3a7b0201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0804ffff001d026e04ffffffff0100f2052a0100000043410446ef0102d1ec5240f0d061a4246c1bdef63fc3dbab7733052fbbf0ecd8f41fc26bf049ebb4f9527f374280259e7cfa99c48b0e3f39c51347a19a5819651503a5ac00000000010000000321f75f3139a013f50f315b23b0c9a2b6eac31e2bec98e5891c924664889942260000000049483045022100cb2c6b346a978ab8c61b18b5e9397755cbd17d6eb2fe0083ef32e067fa6c785a02206ce44e613f31d9a6b0517e46f3db1576e9812cc98d159bfdaf759a5014081b5c01ffffffff79cda0945903627c3da1f85fc95d0b8ee3e76ae0cfdc9a65d09744b1f8fc85430000000049483045022047957cdd957cfd0becd642f6b84d82f49b6cb4c51a91f49246908af7c3cfdf4a022100e96b46621f1bffcf5ea5982f88cef651e9354f5791602369bf5a82a6cd61a62501fffffffffe09f5fe3ffbf5ee97a54eb5e5069e9da6b4856ee86fc52938c2f979b0f38e82000000004847304402204165be9a4cbab8049e1af9723b96199bfd3e85f44c6b4c0177e3962686b26073022028f638da23fc003760861ad481ead4099312c60030d4cb57820ce4d33812a5ce01ffffffff01009d966b01000000434104ea1feff861b51fe3f5f8a3b12d0f4712db80e919548a80839fc47c6a21e66d957e9c5d8cd108c7a2d2324bad71f9904ac0ae7336507d785b17a2c115e427a32fac00000000").unwrap();
        let cutoff_data = Vec::from_hex("010000004ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914cd74d6e49ffff001d323b3a7b0201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0804ffff001d026e04ffffffff0100f2052a0100000043410446ef0102d1ec5240f0d061a4246c1bdef63fc3dbab7733052fbbf0ecd8f41fc26bf049ebb4f9527f374280259e7cfa99c48b0e3f39c51347a19a5819651503a5ac00000000010000000321f75f3139a013f50f315b23b0c9a2b6eac31e2bec98e5891c924664889942260000000049483045022100cb2c6b346a978ab8c61b18b5e9397755cbd17d6eb2fe0083ef32e067fa6c785a02206ce44e613f31d9a6b0517e46f3db1576e9812cc98d159bfdaf759a5014081b5c01ffffffff79cda0945903627c3da1f85fc95d0b8ee3e76ae0cfdc9a65d09744b1f8fc85430000000049483045022047957cdd957cfd0becd642f6b84d82f49b6cb4c51a91f49246908af7c3cfdf4a022100e96b46621f1bffcf5ea5982f88cef651e9354f5791602369bf5a82a6cd61a62501fffffffffe09f5fe3ffbf5ee97a54eb5e5069e9da6b4856ee86fc52938c2f979b0f38e82000000004847304402204165be9a4cbab8049e1af9723b96199bfd3e85f44c6b4c0177e3962686b26073022028f638da23fc003760861ad481ead4099312c60030d4cb57820ce4d33812a5ce01ffffffff01009d966b01000000434104ea1feff861b51fe3f5f8a3b12d0f4712db80e919548a80839fc47c6a21e66d957e9c5d8cd108c7a2d2324bad71f9904ac0ae7336507d785b17a2c115e427a32fac").unwrap();
        let prevhash = Vec::from_hex("4ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000").unwrap();
        let merkle = Vec::from_hex("bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914c").unwrap();

        let mut reader = StreamReader::new(io::BufReader::new(&normal_data[..]), None);
        let normal_block = reader.read_next::<Block>();

        let mut reader = StreamReader::new(io::BufReader::new(&cutoff_data[..]), None);
        let cutoff_block = reader.read_next::<Block>();

        // The truncated input must be rejected, the complete one must decode
        assert!(cutoff_block.is_err());
        let block = normal_block.expect("complete block data must decode");
        assert_eq!(block.header.version, 1);
        assert_eq!(serialize(&block.header.prev_blockhash), prevhash);
        assert_eq!(serialize(&block.header.merkle_root), merkle);
        assert_eq!(block.header.time, 1231965655);
        assert_eq!(block.header.bits, 486604799);
        assert_eq!(block.header.nonce, 2067413810);

        // should be also ok for a non-witness block as commitment is optional in that case
        assert!(block.check_witness_commitment());
    }

    #[test]
    fn parse_multipartmsg_test() {
        // The same alert message twice in a row; both copies must parse
        let mut multi = MSG_ALERT.to_vec();
        multi.extend_from_slice(&MSG_ALERT);
        let mut reader = StreamReader::new(&multi[..], None);

        for _ in 0..2 {
            let message: RawNetworkMessage = reader.read_next().unwrap();
            check_alert_msg(&message);
        }
    }
}