1use std::fmt;
24use std::io::{self, Read};
25
26use consensus::{encode, Decodable};
27
/// Incremental decoder that pulls consensus-encoded values out of a byte stream.
///
/// Bytes are read from `stream` in chunks and accumulated until a complete
/// value can be decoded; bytes left over after a value stay buffered.
pub struct StreamReader<R>
where
    R: Read,
{
    /// Underlying source of bytes.
    pub stream: R,
    /// Scratch buffer handed to each `read` call on `stream`.
    data: Vec<u8>,
    /// Bytes already read from `stream` but not yet consumed by a decoded value.
    unparsed: Vec<u8>,
}
37
38impl<R: Read> fmt::Debug for StreamReader<R> {
39 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
40 write!(f, "StreamReader with buffer_size={} and unparsed content {:?}",
41 self.data.capacity(), self.unparsed)
42 }
43}
44
45impl<R: Read> StreamReader<R> {
46 pub fn new(stream: R, buffer_size: Option<usize>) -> StreamReader<R> {
49 StreamReader {
50 stream,
51 data: vec![0u8; buffer_size.unwrap_or(64 * 1024)],
52 unparsed: vec![]
53 }
54 }
55
56 pub fn read_next<D: Decodable>(&mut self) -> Result<D, encode::Error> {
59 loop {
60 match encode::deserialize_partial::<D>(&self.unparsed) {
61 Err(encode::Error::Io(ref err)) if err.kind () == io::ErrorKind::UnexpectedEof => {
63 let count = self.stream.read(&mut self.data)?;
64 if count > 0 {
65 self.unparsed.extend(self.data[0..count].iter());
66 }
67 else {
68 return Err(encode::Error::Io(io::Error::from(io::ErrorKind::UnexpectedEof)));
69 }
70 },
71 Err(err) => return Err(err),
72 Ok((message, index)) => {
74 self.unparsed.drain(..index);
75 return Ok(message)
76 },
77 }
78 }
79 }
80}
81
82#[cfg(test)]
83mod test {
84 use std::thread;
85 use std::time::Duration;
86 use std::io::{self, BufReader, Write};
87 use std::net::{TcpListener, TcpStream, Shutdown};
88 use std::thread::JoinHandle;
89 use network::constants::ServiceFlags;
90
91 use super::StreamReader;
92 use network::message::{NetworkMessage, RawNetworkMessage};
93
/// Raw bytes of a serialized `version` network message: a 24-byte header
/// followed by a 102-byte payload. `check_version_msg` asserts the fields
/// this fixture decodes to.
const MSG_VERSION: [u8; 126] = [
    // header: magic (reads as 0xd9b4bef9 little-endian)
    0xf9, 0xbe, 0xb4, 0xd9,
    // header: command, ASCII "version" padded with NULs to 12 bytes
    0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00,
    // header: 0x66 = 102, the number of bytes following the header
    0x66, 0x00, 0x00, 0x00,
    // header: last four bytes (presumably the payload checksum)
    0xbe, 0x61, 0xb8, 0x27,
    // payload
    0x7f, 0x11, 0x01, 0x00, 0x0d, 0x04, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0xf0, 0x0f, 0x4d, 0x5c,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
    0x5b, 0xf0, 0x8c, 0x80, 0xb4, 0xbd, 0x0d, 0x04,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0xfa, 0xa9, 0x95, 0x59, 0xcc, 0x68, 0xa1, 0xc1,
    0x10, 0x2f, 0x53, 0x61, 0x74, 0x6f, 0x73, 0x68,
    0x69, 0x3a, 0x30, 0x2e, 0x31, 0x37, 0x2e, 0x31,
    0x2f, 0x93, 0x8c, 0x08, 0x00, 0x01
];
114
/// Raw bytes of a serialized `verack` network message: a 24-byte header with
/// no payload.
const MSG_VERACK: [u8; 24] = [
    // magic (reads as 0xd9b4bef9 little-endian)
    0xf9, 0xbe, 0xb4, 0xd9,
    // command, ASCII "verack" padded with NULs to 12 bytes
    0x76, 0x65, 0x72, 0x61, 0x63, 0x6b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    // zero: nothing follows the header
    0x00, 0x00, 0x00, 0x00,
    // last four bytes (presumably the checksum of the empty payload)
    0x5d, 0xf6, 0xe0, 0xe2
];
120
/// Raw bytes of a serialized `ping` network message: a 24-byte header
/// followed by an 8-byte payload that the tests decode as nonce 100.
const MSG_PING: [u8; 32] = [
    // header: magic (reads as 0xd9b4bef9 little-endian)
    0xf9, 0xbe, 0xb4, 0xd9,
    // header: command, ASCII "ping" padded with NULs to 12 bytes
    0x70, 0x69, 0x6e, 0x67, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    // header: 8, the number of bytes following the header
    0x08, 0x00, 0x00, 0x00,
    // header: last four bytes (presumably the payload checksum)
    0x24, 0x67, 0xf1, 0x1d,
    // payload: 0x64 = 100 as a little-endian 64-bit value
    0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
];
127
/// Raw bytes of a serialized `alert` network message: a 24-byte header
/// followed by a 168-byte payload. `check_alert_msg` asserts the bytes this
/// fixture decodes to.
const MSG_ALERT: [u8; 192] = [
    // header: magic (reads as 0xd9b4bef9 little-endian)
    0xf9, 0xbe, 0xb4, 0xd9,
    // header: command, ASCII "alert" padded with NULs to 12 bytes
    0x61, 0x6c, 0x65, 0x72, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    // header: 0xa8 = 168, the number of bytes following the header
    0xa8, 0x00, 0x00, 0x00,
    // header: last four bytes (presumably the payload checksum)
    0x1b, 0xf9, 0xaa, 0xea,
    // payload
    0x60, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
    0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
    0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff,
    0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
    0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00,
    0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a,
    0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b,
    0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72,
    0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20,
    0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20,
    0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64,
    0x00, 0x46, 0x30, 0x44, 0x02, 0x20, 0x65, 0x3f,
    0xeb, 0xd6, 0x41, 0x0f, 0x47, 0x0f, 0x6b, 0xae,
    0x11, 0xca, 0xd1, 0x9c, 0x48, 0x41, 0x3b, 0xec,
    0xb1, 0xac, 0x2c, 0x17, 0xf9, 0x08, 0xfd, 0x0f,
    0xd5, 0x3b, 0xdc, 0x3a, 0xbd, 0x52, 0x02, 0x20,
    0x6d, 0x0e, 0x9c, 0x96, 0xfe, 0x88, 0xd4, 0xa0,
    0xf0, 0x1e, 0xd9, 0xde, 0xda, 0xe2, 0xb6, 0xf9,
    0xe0, 0x0d, 0xa9, 0x4c, 0xad, 0x0f, 0xec, 0xaa,
    0xe6, 0x6e, 0xcf, 0x68, 0x9b, 0xf7, 0x1b, 0x50
];
154
155 fn check_version_msg(msg: &RawNetworkMessage) {
157 assert_eq!(msg.magic, 0xd9b4bef9);
158 if let NetworkMessage::Version(ref version_msg) = msg.payload {
159 assert_eq!(version_msg.version, 70015);
160 assert_eq!(version_msg.services, ServiceFlags::NETWORK | ServiceFlags::BLOOM | ServiceFlags::WITNESS | ServiceFlags::NETWORK_LIMITED);
161 assert_eq!(version_msg.timestamp, 1548554224);
162 assert_eq!(version_msg.nonce, 13952548347456104954);
163 assert_eq!(version_msg.user_agent, "/Satoshi:0.17.1/");
164 assert_eq!(version_msg.start_height, 560275);
165 assert_eq!(version_msg.relay, true);
166 } else {
167 panic!("Wrong message type: expected VersionMessage");
168 }
169 }
170
171 fn check_alert_msg(msg: &RawNetworkMessage) {
172 assert_eq!(msg.magic, 0xd9b4bef9);
173 if let NetworkMessage::Alert(ref alert) = msg.payload {
174 assert_eq!(alert.clone(), [
175 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
176 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
177 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
178 0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff,
179 0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
180 0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00,
181 0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a,
182 0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b,
183 0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72,
184 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20,
185 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20,
186 0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64,
187 0x00,
188 ].to_vec());
189 } else {
190 panic!("Wrong message type: expected AlertMessage");
191 }
192 }
193
/// With an empty stream, a buffer holding only the first 24 bytes of the
/// alert fixture must fail to decode and stay untouched, while the full
/// fixture must decode and be consumed entirely.
#[test]
fn parse_multipartmsg_test() {
    let mut reader = StreamReader::new(io::empty(), None);

    // First 24 bytes only: decoding cannot complete and nothing is consumed.
    reader.unparsed = MSG_ALERT[..24].to_vec();
    let partial: Result<RawNetworkMessage, _> = reader.read_next();
    assert!(partial.is_err());
    assert_eq!(reader.unparsed.len(), 24);

    // Whole message buffered: decoding succeeds and drains the buffer.
    reader.unparsed = MSG_ALERT.to_vec();
    let complete = reader.read_next().unwrap();
    assert_eq!(reader.unparsed.len(), 0);

    check_alert_msg(&complete);
}
209
/// A stream holding exactly one `version` message yields that message.
#[test]
fn read_singlemsg_test() {
    let bytes = MSG_VERSION[..].to_vec();
    let mut reader = StreamReader::new(bytes.as_slice(), None);

    let message = reader.read_next().unwrap();
    check_version_msg(&message);
}
217
/// Two messages laid back to back in one stream are returned one per
/// `read_next` call, in order.
#[test]
fn read_doublemsgs_test() {
    let mut bytes = MSG_VERSION.to_vec();
    bytes.extend(&MSG_PING);
    let mut reader = StreamReader::new(bytes.as_slice(), None);

    let first = reader.read_next().unwrap();
    check_version_msg(&first);

    let second: RawNetworkMessage = reader.read_next().unwrap();
    assert_eq!(second.magic, 0xd9b4bef9);
    match second.payload {
        NetworkMessage::Ping(nonce) => assert_eq!(nonce, 100),
        _ => panic!("Wrong message type, expected PingMessage"),
    }
}
235
/// Starts a one-shot TCP server on a free localhost port and connects to it.
///
/// The server thread accepts a single connection, writes each entry of
/// `pieces` in order, pausing one second after each (presumably so the client
/// sees the pieces as separate reads), then shuts the connection down.
///
/// Returns the server thread's handle together with a buffered reader over
/// the client side of the connection. Panics on any socket error.
fn serve_tcp(pieces: Vec<Vec<u8>>) -> (JoinHandle<()>, BufReader<TcpStream>) {
    // Port 0 asks the OS for any free port; the chosen one is read back below.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let port = listener.local_addr().unwrap().port();

    let handle = thread::spawn(move || {
        // Exactly one client is served.
        let (mut ostream, _peer) = listener.accept().unwrap();

        for piece in pieces {
            // `write_all` keeps writing until the whole piece is sent; a bare
            // `write` may send only part of it.
            ostream.write_all(&piece).unwrap();
            ostream.flush().unwrap();
            thread::sleep(Duration::from_secs(1));
        }

        ostream.shutdown(Shutdown::Both).unwrap();
    });

    // The listener was bound before the thread started, so the connection is
    // queued by the OS until `accept` runs; no start-up delay is needed.
    let istream = TcpStream::connect(("127.0.0.1", port)).unwrap();

    (handle, BufReader::new(istream))
}
266
/// A `version` message delivered over TCP in two pieces (first 24 bytes,
/// then the rest) is reassembled into one message.
#[test]
fn read_multipartmsg_test() {
    let head = MSG_VERSION[..24].to_vec();
    let tail = MSG_VERSION[24..].to_vec();
    let (handle, istream) = serve_tcp(vec![head, tail]);

    let mut reader = StreamReader::new(istream, None);
    let message = reader.read_next().unwrap();
    check_version_msg(&message);

    // Wait for the server thread so a panic on its side fails the test.
    handle.join().unwrap();
}
284
/// Three messages delivered over TCP, two of them split across pieces, are
/// decoded in order: `version`, `verack`, then `alert`.
#[test]
fn read_sequencemsg_test() {
    let pieces = vec![
        // `version`, split after 23 bytes
        MSG_VERSION[..23].to_vec(),
        MSG_VERSION[23..].to_vec(),
        // `verack`, in one piece
        MSG_VERACK.to_vec(),
        // `alert`, split after 24 bytes
        MSG_ALERT[..24].to_vec(),
        MSG_ALERT[24..].to_vec(),
    ];
    let (handle, istream) = serve_tcp(pieces);
    let mut reader = StreamReader::new(istream, None);

    let version = reader.read_next().unwrap();
    check_version_msg(&version);

    let verack: RawNetworkMessage = reader.read_next().unwrap();
    assert_eq!(verack.magic, 0xd9b4bef9);
    assert_eq!(verack.payload, NetworkMessage::Verack, "Wrong message type, expected VerackMessage");

    let alert = reader.read_next().unwrap();
    check_alert_msg(&alert);

    // Wait for the server thread so a panic on its side fails the test.
    handle.join().unwrap();
}
313
/// Decodes a `Block` from an in-memory byte stream and checks its header
/// fields; a second input whose hex ends earlier must fail to decode.
#[test]
fn read_block_from_file_test() {
    use std::io;
    use consensus::serialize;
    use hashes::hex::FromHex;
    use Block;

    // Complete serialized block.
    let normal_data = Vec::from_hex("010000004ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914cd74d6e49ffff001d323b3a7b0201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0804ffff001d026e04ffffffff0100f2052a0100000043410446ef0102d1ec5240f0d061a4246c1bdef63fc3dbab7733052fbbf0ecd8f41fc26bf049ebb4f9527f374280259e7cfa99c48b0e3f39c51347a19a5819651503a5ac00000000010000000321f75f3139a013f50f315b23b0c9a2b6eac31e2bec98e5891c924664889942260000000049483045022100cb2c6b346a978ab8c61b18b5e9397755cbd17d6eb2fe0083ef32e067fa6c785a02206ce44e613f31d9a6b0517e46f3db1576e9812cc98d159bfdaf759a5014081b5c01ffffffff79cda0945903627c3da1f85fc95d0b8ee3e76ae0cfdc9a65d09744b1f8fc85430000000049483045022047957cdd957cfd0becd642f6b84d82f49b6cb4c51a91f49246908af7c3cfdf4a022100e96b46621f1bffcf5ea5982f88cef651e9354f5791602369bf5a82a6cd61a62501fffffffffe09f5fe3ffbf5ee97a54eb5e5069e9da6b4856ee86fc52938c2f979b0f38e82000000004847304402204165be9a4cbab8049e1af9723b96199bfd3e85f44c6b4c0177e3962686b26073022028f638da23fc003760861ad481ead4099312c60030d4cb57820ce4d33812a5ce01ffffffff01009d966b01000000434104ea1feff861b51fe3f5f8a3b12d0f4712db80e919548a80839fc47c6a21e66d957e9c5d8cd108c7a2d2324bad71f9904ac0ae7336507d785b17a2c115e427a32fac00000000").unwrap();
    // Input that stops before the trailing `00000000` of the block above
    // (appears otherwise identical; not verified byte by byte).
    let cutoff_data = Vec::from_hex("010000004ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914cd74d6e49ffff001d323b3a7b0201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0804ffff001d026e04ffffffff0100f2052a0100000043410446ef0102d1ec5240f0d061a4246c1bdef63fc3dbab7733052fbbf0ecd8f41fc26bf049ebb4f9527f374280259e7cfa99c48b0e3f39c51347a19a5819651503a5ac00000000010000000321f75f3139a013f50f315b23b0c9a2b6eac31e2bec98e5891c924664889942260000000049483045022100cb2c6b346a978ab8c61b18b5e9397755cbd17d6eb2fe0083ef32e067fa6c785a02206ce44e613f31d9a6b0517e46f3db1576e9812cc98d159bfdaf759a5014081b5c01ffffffff79cda0945903627c3da1f85fc95d0b8ee3e76ae0cfdc9a65d09744b1f8fc85430000000049483045022047957cdd957cfd0becd642f6b84d82f49b6cb4c51a91f49246908af7c3cfdf4a022100e96b46621f1bffcf5ea5982f88cef651e9354f5791602369bf5a82a6cd61a62501fffffffffe09f5fe3ffbf5ee97a54eb5e5069e9da6b4856ee86fc52938c2f979b0f38e82000000004847304402204165be9a4cbab8049e1af9723b96199bfd3e85f44c6b4c0177e3962686b26073022028f638da23fc003760861ad481ead4099312c60030d4cb57820ce4d33812a5ce01ffffffff01009d966b01000000434104ea1feff861b51fe3f5f8a3b12d0f4712db80e919548a80839fc47c6a21e66d957e9c5d8cd108c7a2d2324bad71f9904ac0ae7336507d785b17a2c115e427a32fac").unwrap();
    // Expected serialized header hashes.
    let prevhash = Vec::from_hex("4ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000").unwrap();
    let merkle = Vec::from_hex("bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914c").unwrap();

    let normal_block = StreamReader::new(io::BufReader::new(&normal_data[..]), None)
        .read_next::<Block>();
    let cutoff_block = StreamReader::new(io::BufReader::new(&cutoff_data[..]), None)
        .read_next::<Block>();

    assert!(normal_block.is_ok());
    assert!(cutoff_block.is_err());

    let block = normal_block.unwrap();
    {
        let header = &block.header;
        assert_eq!(header.version, 1);
        assert_eq!(serialize(&header.prev_blockhash), prevhash);
        assert_eq!(serialize(&header.merkle_root), merkle);
        assert_eq!(header.time, 1231965655);
        assert_eq!(header.bits, 486604799);
        assert_eq!(header.nonce, 2067413810);
    }

    assert!(block.check_witness_commitment());
}
347}