1use std::convert::TryFrom;
2use std::io;
3use std::io::prelude::*;
4use std::net;
5use std::net::SocketAddrV6;
6
7extern crate byteorder;
8use byteorder::{BigEndian, ByteOrder, LittleEndian, ReadBytesExt};
9
10extern crate ed25519_dalek;
11use ed25519_dalek::PublicKey;
12
13extern crate tokio_util;
14
15extern crate bytes;
16use bytes::{Buf, BufMut, BytesMut};
17
18extern crate nanocurrency_types;
19use nanocurrency_types::*;
20
21#[cfg(test)]
22mod tests;
23
/// Protocol version written into the `version` byte of every outgoing header.
const NET_VERSION: u8 = 18;
/// Highest protocol version advertised in outgoing headers; incoming messages whose
/// minimum version exceeds it are rejected.
const NET_VERSION_MAX: u8 = 18;
/// Lowest protocol version advertised in outgoing headers; incoming messages whose
/// maximum version is below it are rejected.
const NET_VERSION_MIN: u8 = 1;

/// Extension bit 0 of a node-id handshake: a 32-byte query payload is present.
const NODE_ID_HANDSHAKE_QUERY_FLAG: u16 = 0x0001;
/// Extension bit 1 of a node-id handshake: a public key + signature response is present.
const NODE_ID_HANDSHAKE_RESPONSE_FLAG: u16 = 0x0002;
30
31trait BufMutExt: BufMut {
32 fn put_i128_le(&mut self, n: i128) {
33 let mut buf = [0u8; 16];
34 LittleEndian::write_i128(&mut buf, n);
35 self.put_slice(&buf)
36 }
37
38 fn put_i128_be(&mut self, n: i128) {
39 let mut buf = [0u8; 16];
40 BigEndian::write_i128(&mut buf, n);
41 self.put_slice(&buf)
42 }
43
44 fn put_u128_le(&mut self, n: u128) {
45 let mut buf = [0u8; 16];
46 LittleEndian::write_u128(&mut buf, n);
47 self.put_slice(&buf)
48 }
49
50 fn put_u128_be(&mut self, n: u128) {
51 let mut buf = [0u8; 16];
52 BigEndian::write_u128(&mut buf, n);
53 self.put_slice(&buf)
54 }
55}
56
57impl BufMutExt for BytesMut {}
58
59#[allow(dead_code)]
62#[derive(PartialEq, Eq, Clone, Debug)]
63pub struct MessageHeader {
64 pub network: Network,
65 pub version_max: u8,
66 pub version: u8,
67 pub version_min: u8,
68 pub extensions: u16,
69}
70
71#[derive(Debug, PartialEq, Clone)]
72pub enum Message {
73 Keepalive([SocketAddrV6; 8]),
74 Publish(Block),
75 ConfirmReq(Block),
76 ConfirmReqHashes(Vec<(BlockHash, [u8; 32])>),
78 ConfirmAck(Vote),
79 NodeIdHandshake(Option<[u8; 32]>, Option<(PublicKey, Signature)>),
80 TelemetryReq,
81 }
83
84pub struct NanoCurrencyCodec(pub Network);
85
86impl NanoCurrencyCodec {
87 pub fn read_block<C: io::Read>(cursor: &mut C, block_ty: u8) -> io::Result<Block> {
88 let inner = match block_ty {
89 2 => {
90 let mut previous = BlockHash::default();
92 cursor.read_exact(&mut previous.0)?;
93 let mut destination = Account::default();
94 cursor.read_exact(&mut destination.0)?;
95 let balance = cursor.read_u128::<BigEndian>()?;
96 BlockInner::Send {
97 previous,
98 destination,
99 balance,
100 }
101 }
102 3 => {
103 let mut previous = BlockHash::default();
105 cursor.read_exact(&mut previous.0)?;
106 let mut source = BlockHash::default();
107 cursor.read_exact(&mut source.0)?;
108 BlockInner::Receive { previous, source }
109 }
110 4 => {
111 let mut source = BlockHash::default();
113 cursor.read_exact(&mut source.0)?;
114 let mut representative = Account::default();
115 cursor.read_exact(&mut representative.0)?;
116 let mut account = Account::default();
117 cursor.read_exact(&mut account.0)?;
118 BlockInner::Open {
119 source,
120 representative,
121 account,
122 }
123 }
124 5 => {
125 let mut previous = BlockHash::default();
127 cursor.read_exact(&mut previous.0)?;
128 let mut representative = Account::default();
129 cursor.read_exact(&mut representative.0)?;
130 BlockInner::Change {
131 previous,
132 representative,
133 }
134 }
135 6 => {
136 let mut account = Account::default();
138 cursor.read_exact(&mut account.0)?;
139 let mut previous = BlockHash::default();
140 cursor.read_exact(&mut previous.0)?;
141 let mut representative = Account::default();
142 cursor.read_exact(&mut representative.0)?;
143 let balance = cursor.read_u128::<BigEndian>()?;
144 let mut link = [0u8; 32];
145 cursor.read_exact(&mut link)?;
146 BlockInner::State {
147 account,
148 previous,
149 representative,
150 balance,
151 link,
152 }
153 }
154 _ => {
155 return Err(io::Error::new(
156 io::ErrorKind::Other,
157 "unrecognized block type",
158 ))
159 }
160 };
161 let mut signature = [0u8; 64];
162 cursor.read_exact(&mut signature)?;
163 let signature = Signature::from_bytes(&signature)
164 .map_err(|_| io::Error::new(io::ErrorKind::Other, "bad signature"))?;
165 let work;
166 if block_ty >= 6 {
167 work = cursor.read_u64::<BigEndian>()?;
169 } else {
170 work = cursor.read_u64::<LittleEndian>()?;
171 }
172 let header = BlockHeader { signature, work };
173 Ok(Block { header, inner })
174 }
175
176 pub fn block_type_num(ty: BlockType) -> u8 {
177 match ty {
178 BlockType::Send => 2,
179 BlockType::Receive => 3,
180 BlockType::Open => 4,
181 BlockType::Change => 5,
182 BlockType::State => 6,
183 }
184 }
185
186 pub fn write_block(buf: &mut BytesMut, block: Block) {
188 buf.reserve(block.size());
189 let mut work_big_endian = false;
190 match block.inner {
191 BlockInner::Send {
192 previous,
193 destination,
194 balance,
195 } => {
196 buf.put_slice(&previous.0);
197 buf.put_slice(&destination.0);
198 buf.put_u128_be(balance);
199 }
200 BlockInner::Receive { previous, source } => {
201 buf.put_slice(&previous.0);
202 buf.put_slice(&source.0);
203 }
204 BlockInner::Open {
205 source,
206 representative,
207 account,
208 } => {
209 buf.put_slice(&source.0);
210 buf.put_slice(&representative.0);
211 buf.put_slice(&account.0);
212 }
213 BlockInner::Change {
214 previous,
215 representative,
216 } => {
217 buf.put_slice(&previous.0);
218 buf.put_slice(&representative.0);
219 }
220 BlockInner::State {
221 account,
222 previous,
223 representative,
224 balance,
225 link,
226 } => {
227 buf.put_slice(&account.0);
228 buf.put_slice(&previous.0);
229 buf.put_slice(&representative.0);
230 buf.put_u128(balance);
231 buf.put_slice(&link as &[u8]);
232 work_big_endian = true;
233 }
234 };
235 buf.put_slice(&block.header.signature.to_bytes() as &[u8]);
236 if work_big_endian {
237 buf.put_u64(block.header.work);
238 } else {
239 buf.put_u64_le(block.header.work);
240 }
241 }
242
243 pub fn network_magic_byte(network: Network) -> u8 {
244 match network {
245 Network::Test => b'A',
246 Network::Beta => b'B',
247 Network::Live => b'C',
248 }
249 }
250
251 fn decode_inner<R: Read>(&self, mut cursor: R) -> io::Result<(MessageHeader, Message)> {
252 if cursor.read_u8()? != b'R' {
253 return Err(io::Error::new(io::ErrorKind::Other, "invalid magic number"));
254 }
255 let network = match cursor.read_u8()? {
256 b'A' => Network::Test,
257 b'B' => Network::Beta,
258 b'C' => Network::Live,
259 _ => {
260 return Err(io::Error::new(
261 io::ErrorKind::Other,
262 "invalid network indicator",
263 ));
264 }
265 };
266 if network != self.0 {
267 return Err(io::Error::new(io::ErrorKind::Other, "different network"));
268 }
269 let version_max = cursor.read_u8()?;
270 let version = cursor.read_u8()?;
271 let version_min = cursor.read_u8()?;
272 let msg_type = cursor.read_u8()?;
273 let extensions = cursor.read_u16::<LittleEndian>()?;
274 if version_min > NET_VERSION_MAX || version_max < NET_VERSION_MIN {
275 return Err(io::Error::new(
276 io::ErrorKind::Other,
277 "unsupported peer version",
278 ));
279 }
280 let header = MessageHeader {
281 network,
282 version_max,
283 version,
284 version_min,
285 extensions,
286 };
287 let message = match msg_type {
288 2 => {
289 let mut peers = [zero_v6_addr!(); 8];
291 let _ = (|| -> io::Result<()> {
292 for peer in peers.iter_mut() {
293 let mut ip_bytes: [u8; 16] = [0; 16];
294 for byte in ip_bytes.iter_mut() {
295 *byte = cursor.read_u8()?;
296 }
297 let port = cursor.read_u16::<LittleEndian>()?;
298 *peer = SocketAddrV6::new(net::Ipv6Addr::from(ip_bytes), port, 0, 0);
299 }
300 Ok(())
301 })();
302 Message::Keepalive(peers)
303 }
304 3 => {
305 let ty = (header.extensions & 0x0f00) >> 8;
307 Message::Publish(Self::read_block(&mut cursor, ty as u8)?)
308 }
309 4 => {
310 let ty = (header.extensions & 0x0f00) >> 8;
312 if ty == 1 {
313 let count = usize::from((header.extensions & 0xf000) >> 12);
314 let mut hashes = Vec::with_capacity(count);
315 for _ in 0..count {
316 let mut hash = BlockHash::default();
317 let mut root = [0u8; 32];
318 cursor.read_exact(&mut hash.0)?;
319 cursor.read_exact(&mut root)?;
320 if hash == BlockHash::default() && root == [0; 32] {
321 return Err(io::Error::new(
322 io::ErrorKind::Other,
323 "zero hash and root requested in confirm_req",
324 ));
325 }
326 hashes.push((hash, root));
327 }
328 Message::ConfirmReqHashes(hashes)
329 } else {
330 Message::ConfirmReq(Self::read_block(&mut cursor, ty as u8)?)
331 }
332 }
333 5 => {
334 let ty = (header.extensions & 0x0f00) >> 8;
336 let mut account = Account::default();
337 cursor.read_exact(&mut account.0)?;
338 let mut signature = [0u8; 64];
339 cursor.read_exact(&mut signature)?;
340 let signature = Signature::from_bytes(&signature).unwrap();
341 let sequence = cursor.read_u64::<LittleEndian>()?;
342 let inner = if ty == 1 {
343 let count = usize::from((header.extensions & 0xf000) >> 12);
344 let mut hashes = Vec::with_capacity(count);
345 for _ in 0..count {
346 let mut hash = BlockHash::default();
347 cursor.read_exact(&mut hash.0)?;
348 hashes.push(hash);
349 }
350 VoteInner::Hashes(hashes)
351 } else {
352 let block = Self::read_block(&mut cursor, ty as u8)?;
353 VoteInner::Block(block)
354 };
355 Message::ConfirmAck(Vote {
356 account,
357 signature,
358 sequence,
359 inner,
360 })
361 }
362 10 => {
363 let query = if header.extensions & NODE_ID_HANDSHAKE_QUERY_FLAG != 0 {
365 let mut query = [0u8; 32];
366 cursor.read_exact(&mut query)?;
367 Some(query)
368 } else {
369 None
370 };
371 let response = if header.extensions & NODE_ID_HANDSHAKE_RESPONSE_FLAG != 0 {
372 let mut pubkey = [0u8; 32];
373 cursor.read_exact(&mut pubkey)?;
374 let pubkey = PublicKey::from_bytes(&pubkey)
375 .map_err(|_| io::Error::new(io::ErrorKind::Other, "bad pubkey"))?;
376 let mut signature = [0u8; 64];
377 cursor.read_exact(&mut signature)?;
378 let signature = Signature::from_bytes(&signature)
379 .map_err(|_| io::Error::new(io::ErrorKind::Other, "bad signature"))?;
380 Some((pubkey, signature))
381 } else {
382 None
383 };
384 Message::NodeIdHandshake(query, response)
385 }
386 12 => {
387 Message::TelemetryReq
389 }
390 6 | 7 | 8 => {
391 return Err(io::Error::new(
392 io::ErrorKind::Other,
393 "bootstrap message sent over UDP",
394 ))
395 }
396 x => {
397 return Err(io::Error::new(
398 io::ErrorKind::Other,
399 format!("unrecognized message type {}", x),
400 ))
401 }
402 };
403 Ok((header, message))
404 }
405}
406
407impl tokio_util::codec::Decoder for NanoCurrencyCodec {
421 type Item = (MessageHeader, Message);
422 type Error = io::Error;
423
424 fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
425 let mut cursor = io::Cursor::new(&buf);
426 match self.decode_inner(&mut cursor) {
427 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => Ok(None),
428 Err(err) => Err(err),
429 Ok(message) => {
430 let read = cursor.position() as usize;
431 buf.advance(read);
432 Ok(Some(message))
433 }
434 }
435 }
436}
437
438impl tokio_util::codec::Encoder<Message> for NanoCurrencyCodec {
439 type Error = io::Error;
440
441 fn encode(&mut self, msg: Message, buf: &mut BytesMut) -> io::Result<()> {
442 buf.reserve(8); buf.put_slice(&[
444 b'R',
445 Self::network_magic_byte(self.0),
446 NET_VERSION_MAX,
447 NET_VERSION,
448 NET_VERSION_MIN,
449 ]);
450 match msg {
451 Message::Keepalive(peers) => {
452 buf.put_slice(&[2]);
453 buf.put_slice(&[0, 0]); buf.reserve(peers.len() * (16 + 2));
455 for peer in peers.iter() {
456 buf.put_slice(&peer.ip().octets());
457 buf.put_u16_le(peer.port());
458 }
459 }
460 Message::Publish(block) => {
461 buf.put_slice(&[3]);
462 let type_num = Self::block_type_num(block.ty()) as u16;
463 buf.put_u16_le((type_num & 0x0f) << 8);
464 Self::write_block(buf, block);
465 }
466 Message::ConfirmReq(block) => {
467 buf.put_slice(&[4]);
468 let type_num = Self::block_type_num(block.ty()) as u16;
469 buf.put_u16_le((type_num & 0x0f) << 8);
470 Self::write_block(buf, block);
471 }
472 Message::ConfirmReqHashes(hashes) => {
473 let hashes_len = match u16::try_from(hashes.len()) {
474 Ok(x) if x < 16 => x,
475 _ => {
476 return Err(io::Error::new(
477 io::ErrorKind::Other,
478 "attempted to send a confirm_req with more than 16 hashes and roots",
479 ));
480 }
481 };
482 buf.put_slice(&[4]);
483 buf.put_u16_le((1 << 8) | (hashes_len << 12));
484 buf.reserve(hashes.len() * 64);
485 for (hash, root) in hashes {
486 buf.put_slice(&hash.0);
487 buf.put_slice(&root);
488 }
489 }
490 Message::ConfirmAck(Vote {
491 account,
492 signature,
493 sequence,
494 inner,
495 }) => {
496 buf.put_slice(&[5]);
497 match inner {
498 VoteInner::Block(block) => {
499 let type_num = Self::block_type_num(block.ty()) as u16;
500 buf.put_u16_le((type_num & 0x0f) << 8);
501 buf.reserve(32 + 64 + 8);
502 buf.put_slice(&account.0);
503 buf.put_slice(&signature.to_bytes());
504 buf.put_u64_le(sequence);
505 Self::write_block(buf, block);
506 }
507 VoteInner::Hashes(hashes) => {
508 let hashes_len = match u16::try_from(hashes.len()) {
509 Ok(x) if x < 16 => x,
510 _ => {
511 return Err(io::Error::new(
512 io::ErrorKind::Other,
513 "attempted to send a vote with more than 16 hashes",
514 ));
515 }
516 };
517 buf.put_u16_le((1 << 8) | (hashes_len << 12));
518 buf.reserve(32 + 64 + 8 + (hashes.len() * 32));
519 buf.put_slice(&account.0);
520 buf.put_slice(&signature.to_bytes());
521 buf.put_u64_le(sequence);
522 for hash in hashes {
523 buf.put_slice(&hash.0);
524 }
525 }
526 }
527 }
528 Message::NodeIdHandshake(query, response) => {
529 buf.put_slice(&[10]);
530 let mut flags = 0;
531 let mut len = 0;
532 if query.is_some() {
533 flags |= NODE_ID_HANDSHAKE_QUERY_FLAG;
534 len += 32;
535 }
536 if response.is_some() {
537 flags |= NODE_ID_HANDSHAKE_RESPONSE_FLAG;
538 len += 32 + 64;
539 }
540 buf.put_u16_le(flags);
541 buf.reserve(len);
542 if let Some(query) = query {
543 buf.put_slice(&query);
544 }
545 if let Some(response) = response {
546 buf.put_slice(&response.0.to_bytes());
547 buf.put_slice(&response.1.to_bytes());
548 }
549 }
550 Message::TelemetryReq => {
551 buf.put_slice(&[12]);
552 buf.put_slice(&[0, 0]);
553 }
554 }
555 Ok(())
556 }
557}