1use core::fmt;
26use io::{Read, BufReader};
27
28use consensus::{encode, Decodable};
29
30pub struct StreamReader<R: Read> {
32 pub stream: BufReader<R>,
34}
35
36impl<R: Read> fmt::Debug for StreamReader<R> {
37 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
38 write!(f, "StreamReader")
39 }
40}
41
42impl<R: Read> StreamReader<R> {
43 #[deprecated(since = "0.28.0", note = "wrap your stream into a buffered reader if necessary and use consensus_encode directly")]
45 pub fn new(stream: R, _buffer_size: Option<usize>) -> StreamReader<R> {
46 StreamReader {
47 stream: BufReader::new(stream),
48 }
49 }
50
51 #[deprecated(since = "0.28.0", note = "wrap your stream into a buffered reader if necessary and use consensus_encode directly")]
53 pub fn read_next<D: Decodable>(&mut self) -> Result<D, encode::Error> {
54 Decodable::consensus_decode(&mut self.stream)
55 }
56}
57
58#[allow(deprecated)]
59#[cfg(test)]
60mod test {
61 use std::thread;
62 use std::time::Duration;
63 use io::{BufReader, Write};
64 use std::net::{TcpListener, TcpStream, Shutdown};
65 use std::thread::JoinHandle;
66 use network::constants::ServiceFlags;
67
68 use super::StreamReader;
69 use network::message::{NetworkMessage, RawNetworkMessage};
70
71 const MSG_VERSION: [u8; 126] = [
74 0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x73,
75 0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00,
76 0x66, 0x00, 0x00, 0x00, 0xbe, 0x61, 0xb8, 0x27,
77 0x7f, 0x11, 0x01, 0x00, 0x0d, 0x04, 0x00, 0x00,
78 0x00, 0x00, 0x00, 0x00, 0xf0, 0x0f, 0x4d, 0x5c,
79 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
80 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
81 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
82 0x5b, 0xf0, 0x8c, 0x80, 0xb4, 0xbd, 0x0d, 0x04,
83 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
84 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
85 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
86 0xfa, 0xa9, 0x95, 0x59, 0xcc, 0x68, 0xa1, 0xc1,
87 0x10, 0x2f, 0x53, 0x61, 0x74, 0x6f, 0x73, 0x68,
88 0x69, 0x3a, 0x30, 0x2e, 0x31, 0x37, 0x2e, 0x31,
89 0x2f, 0x93, 0x8c, 0x08, 0x00, 0x01
90 ];
91
92 const MSG_VERACK: [u8; 24] = [
93 0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x61,
94 0x63, 0x6b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
95 0x00, 0x00, 0x00, 0x00, 0x5d, 0xf6, 0xe0, 0xe2
96 ];
97
98 const MSG_PING: [u8; 32] = [
99 0xf9, 0xbe, 0xb4, 0xd9, 0x70, 0x69, 0x6e, 0x67,
100 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
101 0x08, 0x00, 0x00, 0x00, 0x24, 0x67, 0xf1, 0x1d,
102 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
103 ];
104
105 const MSG_ALERT: [u8; 192] = [
106 0xf9, 0xbe, 0xb4, 0xd9, 0x61, 0x6c, 0x65, 0x72,
107 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
108 0xa8, 0x00, 0x00, 0x00, 0x1b, 0xf9, 0xaa, 0xea,
109 0x60, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
110 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
111 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
112 0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff,
113 0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
114 0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00,
115 0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a,
116 0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b,
117 0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72,
118 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20,
119 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20,
120 0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64,
121 0x00, 0x46, 0x30, 0x44, 0x02, 0x20, 0x65, 0x3f,
122 0xeb, 0xd6, 0x41, 0x0f, 0x47, 0x0f, 0x6b, 0xae,
123 0x11, 0xca, 0xd1, 0x9c, 0x48, 0x41, 0x3b, 0xec,
124 0xb1, 0xac, 0x2c, 0x17, 0xf9, 0x08, 0xfd, 0x0f,
125 0xd5, 0x3b, 0xdc, 0x3a, 0xbd, 0x52, 0x02, 0x20,
126 0x6d, 0x0e, 0x9c, 0x96, 0xfe, 0x88, 0xd4, 0xa0,
127 0xf0, 0x1e, 0xd9, 0xde, 0xda, 0xe2, 0xb6, 0xf9,
128 0xe0, 0x0d, 0xa9, 0x4c, 0xad, 0x0f, 0xec, 0xaa,
129 0xe6, 0x6e, 0xcf, 0x68, 0x9b, 0xf7, 0x1b, 0x50
130 ];
131
132 fn check_version_msg(msg: &RawNetworkMessage) {
134 assert_eq!(msg.magic, 0xd9b4bef9);
135 if let NetworkMessage::Version(ref version_msg) = msg.payload {
136 assert_eq!(version_msg.version, 70015);
137 assert_eq!(version_msg.services, ServiceFlags::NETWORK | ServiceFlags::BLOOM | ServiceFlags::WITNESS | ServiceFlags::NETWORK_LIMITED);
138 assert_eq!(version_msg.timestamp, 1548554224);
139 assert_eq!(version_msg.nonce, 13952548347456104954);
140 assert_eq!(version_msg.user_agent, "/Satoshi:0.17.1/");
141 assert_eq!(version_msg.start_height, 560275);
142 assert_eq!(version_msg.relay, true);
143 } else {
144 panic!("Wrong message type: expected VersionMessage");
145 }
146 }
147
148 fn check_alert_msg(msg: &RawNetworkMessage) {
149 assert_eq!(msg.magic, 0xd9b4bef9);
150 if let NetworkMessage::Alert(ref alert) = msg.payload {
151 assert_eq!(alert.clone(), [
152 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
153 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
154 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
155 0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff,
156 0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
157 0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00,
158 0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a,
159 0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b,
160 0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72,
161 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20,
162 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20,
163 0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64,
164 0x00,
165 ].to_vec());
166 } else {
167 panic!("Wrong message type: expected AlertMessage");
168 }
169 }
170
171 #[test]
172 fn read_singlemsg_test() {
173 let stream = MSG_VERSION[..].to_vec();
174 let stream = stream.as_slice();
175 let message = StreamReader::new(stream, None).read_next().unwrap();
176 check_version_msg(&message);
177 }
178
179 #[test]
180 fn read_doublemsgs_test() {
181 let mut stream = MSG_VERSION.to_vec();
182 stream.extend(&MSG_PING);
183 let stream = stream.as_slice();
184 let mut reader = StreamReader::new(stream, None);
185 let message = reader.read_next().unwrap();
186 check_version_msg(&message);
187
188 let msg: RawNetworkMessage = reader.read_next().unwrap();
189 assert_eq!(msg.magic, 0xd9b4bef9);
190 if let NetworkMessage::Ping(nonce) = msg.payload {
191 assert_eq!(nonce, 100);
192 } else {
193 panic!("Wrong message type, expected PingMessage");
194 }
195 }
196
197 fn serve_tcp(pieces: Vec<Vec<u8>>) -> (JoinHandle<()>, BufReader<TcpStream>) {
200 let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).unwrap();
202 let port = listener.local_addr().unwrap().port();
203 let handle = thread::spawn(move || {
206 if let Some( ostream) = listener.incoming().next() {
207 let mut ostream = ostream.unwrap();
208
209 for piece in pieces {
210 ostream.write_all(&piece[..]).unwrap();
211 ostream.flush().unwrap();
212 thread::sleep(Duration::from_secs(1));
213 }
214 ostream.shutdown(Shutdown::Both).unwrap();
215 }
216 });
217
218 thread::sleep(Duration::from_secs(1));
220 let istream = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
221 let reader = BufReader::new(istream);
222
223 (handle, reader)
224 }
225
226 #[test]
227 fn read_multipartmsg_test() {
228 let (handle, istream) = serve_tcp(vec![
230 MSG_VERSION[..24].to_vec(), MSG_VERSION[24..].to_vec()
232 ]);
233 let stream = istream;
234 let mut reader = StreamReader::new(stream, None);
235
236 let message = reader.read_next().unwrap();
238 check_version_msg(&message);
239
240 handle.join().unwrap();
242 }
243
244 #[test]
245 fn read_sequencemsg_test() {
246 let (handle, istream) = serve_tcp(vec![
248 MSG_VERSION[..23].to_vec(), MSG_VERSION[23..].to_vec(),
250 MSG_VERACK.to_vec(),
251 MSG_ALERT[..24].to_vec(), MSG_ALERT[24..].to_vec()
252 ]);
253 let stream = istream;
254 let mut reader = StreamReader::new(stream, None);
255
256 let message = reader.read_next().unwrap();
258 check_version_msg(&message);
259
260 let msg: RawNetworkMessage = reader.read_next().unwrap();
262 assert_eq!(msg.magic, 0xd9b4bef9);
263 assert_eq!(msg.payload, NetworkMessage::Verack, "Wrong message type, expected VerackMessage");
264
265 let msg = reader.read_next().unwrap();
267 check_alert_msg(&msg);
268
269 handle.join().unwrap();
271 }
272
273 #[test]
274 fn read_block_from_file_test() {
275 use io;
276 use consensus::serialize;
277 use hashes::hex::FromHex;
278 use Block;
279
280 let normal_data = Vec::from_hex("010000004ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914cd74d6e49ffff001d323b3a7b0201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0804ffff001d026e04ffffffff0100f2052a0100000043410446ef0102d1ec5240f0d061a4246c1bdef63fc3dbab7733052fbbf0ecd8f41fc26bf049ebb4f9527f374280259e7cfa99c48b0e3f39c51347a19a5819651503a5ac00000000010000000321f75f3139a013f50f315b23b0c9a2b6eac31e2bec98e5891c924664889942260000000049483045022100cb2c6b346a978ab8c61b18b5e9397755cbd17d6eb2fe0083ef32e067fa6c785a02206ce44e613f31d9a6b0517e46f3db1576e9812cc98d159bfdaf759a5014081b5c01ffffffff79cda0945903627c3da1f85fc95d0b8ee3e76ae0cfdc9a65d09744b1f8fc85430000000049483045022047957cdd957cfd0becd642f6b84d82f49b6cb4c51a91f49246908af7c3cfdf4a022100e96b46621f1bffcf5ea5982f88cef651e9354f5791602369bf5a82a6cd61a62501fffffffffe09f5fe3ffbf5ee97a54eb5e5069e9da6b4856ee86fc52938c2f979b0f38e82000000004847304402204165be9a4cbab8049e1af9723b96199bfd3e85f44c6b4c0177e3962686b26073022028f638da23fc003760861ad481ead4099312c60030d4cb57820ce4d33812a5ce01ffffffff01009d966b01000000434104ea1feff861b51fe3f5f8a3b12d0f4712db80e919548a80839fc47c6a21e66d957e9c5d8cd108c7a2d2324bad71f9904ac0ae7336507d785b17a2c115e427a32fac00000000").unwrap();
281 let cutoff_data = Vec::from_hex("010000004ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914cd74d6e49ffff001d323b3a7b0201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0804ffff001d026e04ffffffff0100f2052a0100000043410446ef0102d1ec5240f0d061a4246c1bdef63fc3dbab7733052fbbf0ecd8f41fc26bf049ebb4f9527f374280259e7cfa99c48b0e3f39c51347a19a5819651503a5ac00000000010000000321f75f3139a013f50f315b23b0c9a2b6eac31e2bec98e5891c924664889942260000000049483045022100cb2c6b346a978ab8c61b18b5e9397755cbd17d6eb2fe0083ef32e067fa6c785a02206ce44e613f31d9a6b0517e46f3db1576e9812cc98d159bfdaf759a5014081b5c01ffffffff79cda0945903627c3da1f85fc95d0b8ee3e76ae0cfdc9a65d09744b1f8fc85430000000049483045022047957cdd957cfd0becd642f6b84d82f49b6cb4c51a91f49246908af7c3cfdf4a022100e96b46621f1bffcf5ea5982f88cef651e9354f5791602369bf5a82a6cd61a62501fffffffffe09f5fe3ffbf5ee97a54eb5e5069e9da6b4856ee86fc52938c2f979b0f38e82000000004847304402204165be9a4cbab8049e1af9723b96199bfd3e85f44c6b4c0177e3962686b26073022028f638da23fc003760861ad481ead4099312c60030d4cb57820ce4d33812a5ce01ffffffff01009d966b01000000434104ea1feff861b51fe3f5f8a3b12d0f4712db80e919548a80839fc47c6a21e66d957e9c5d8cd108c7a2d2324bad71f9904ac0ae7336507d785b17a2c115e427a32fac").unwrap();
282 let prevhash = Vec::from_hex("4ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000").unwrap();
283 let merkle = Vec::from_hex("bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914c").unwrap();
284
285 let stream = io::BufReader::new(&normal_data[..]);
286 let mut reader = StreamReader::new(stream, None);
287 let normal_block = reader.read_next::<Block>();
288
289 let stream = io::BufReader::new(&cutoff_data[..]);
290 let mut reader = StreamReader::new(stream, None);
291 let cutoff_block = reader.read_next::<Block>();
292
293 assert!(normal_block.is_ok());
294 assert!(cutoff_block.is_err());
295 let block = normal_block.unwrap();
296 assert_eq!(block.header.version, 1);
297 assert_eq!(serialize(&block.header.prev_blockhash), prevhash);
298 assert_eq!(serialize(&block.header.merkle_root), merkle);
299 assert_eq!(block.header.time, 1231965655);
300 assert_eq!(block.header.bits, 486604799);
301 assert_eq!(block.header.nonce, 2067413810);
302
303 assert!(block.check_witness_commitment());
305 }
306
307 #[test]
308 fn parse_multipartmsg_test() {
309 let mut multi = MSG_ALERT.to_vec();
310 multi.extend(&MSG_ALERT[..]);
311 let mut reader = StreamReader::new(&multi[..], None);
312 let message: Result<RawNetworkMessage, _> = reader.read_next();
313 assert!(message.is_ok());
314 check_alert_msg(&message.unwrap());
315 let message: Result<RawNetworkMessage, _> = reader.read_next();
316 assert!(message.is_ok());
317 check_alert_msg(&message.unwrap());
318 }
319}