nanocurrency_protocol/
lib.rs

1use std::convert::TryFrom;
2use std::io;
3use std::io::prelude::*;
4use std::net;
5use std::net::SocketAddrV6;
6
7extern crate byteorder;
8use byteorder::{BigEndian, ByteOrder, LittleEndian, ReadBytesExt};
9
10extern crate ed25519_dalek;
11use ed25519_dalek::PublicKey;
12
13extern crate tokio_util;
14
15extern crate bytes;
16use bytes::{Buf, BufMut, BytesMut};
17
18extern crate nanocurrency_types;
19use nanocurrency_types::*;
20
21#[cfg(test)]
22mod tests;
23
/// Protocol version written into the `version` byte of outgoing headers.
const NET_VERSION: u8 = 18;
/// Highest protocol version advertised in outgoing headers; incoming
/// messages whose minimum version exceeds it are rejected.
const NET_VERSION_MAX: u8 = 18;
/// Lowest protocol version advertised in outgoing headers; incoming
/// messages whose maximum version is below it are rejected.
const NET_VERSION_MIN: u8 = 1;

/// Extensions bit marking that a node_id_handshake carries a query.
const NODE_ID_HANDSHAKE_QUERY_FLAG: u16 = 0x0001;
/// Extensions bit marking that a node_id_handshake carries a response.
const NODE_ID_HANDSHAKE_RESPONSE_FLAG: u16 = 0x0002;
30
31trait BufMutExt: BufMut {
32    fn put_i128_le(&mut self, n: i128) {
33        let mut buf = [0u8; 16];
34        LittleEndian::write_i128(&mut buf, n);
35        self.put_slice(&buf)
36    }
37
38    fn put_i128_be(&mut self, n: i128) {
39        let mut buf = [0u8; 16];
40        BigEndian::write_i128(&mut buf, n);
41        self.put_slice(&buf)
42    }
43
44    fn put_u128_le(&mut self, n: u128) {
45        let mut buf = [0u8; 16];
46        LittleEndian::write_u128(&mut buf, n);
47        self.put_slice(&buf)
48    }
49
50    fn put_u128_be(&mut self, n: u128) {
51        let mut buf = [0u8; 16];
52        BigEndian::write_u128(&mut buf, n);
53        self.put_slice(&buf)
54    }
55}
56
57impl BufMutExt for BytesMut {}
58
59// Note: this does not include the message type.
60// That's wrapped into the Message enum.
61#[allow(dead_code)]
62#[derive(PartialEq, Eq, Clone, Debug)]
63pub struct MessageHeader {
64    pub network: Network,
65    pub version_max: u8,
66    pub version: u8,
67    pub version_min: u8,
68    pub extensions: u16,
69}
70
71#[derive(Debug, PartialEq, Clone)]
72pub enum Message {
73    Keepalive([SocketAddrV6; 8]),
74    Publish(Block),
75    ConfirmReq(Block),
76    /// Contains an array of (hash, root)
77    ConfirmReqHashes(Vec<(BlockHash, [u8; 32])>),
78    ConfirmAck(Vote),
79    NodeIdHandshake(Option<[u8; 32]>, Option<(PublicKey, Signature)>),
80    TelemetryReq,
81    // TODO TelemetryAck
82}
83
84pub struct NanoCurrencyCodec(pub Network);
85
86impl NanoCurrencyCodec {
87    pub fn read_block<C: io::Read>(cursor: &mut C, block_ty: u8) -> io::Result<Block> {
88        let inner = match block_ty {
89            2 => {
90                // send
91                let mut previous = BlockHash::default();
92                cursor.read_exact(&mut previous.0)?;
93                let mut destination = Account::default();
94                cursor.read_exact(&mut destination.0)?;
95                let balance = cursor.read_u128::<BigEndian>()?;
96                BlockInner::Send {
97                    previous,
98                    destination,
99                    balance,
100                }
101            }
102            3 => {
103                // receieve
104                let mut previous = BlockHash::default();
105                cursor.read_exact(&mut previous.0)?;
106                let mut source = BlockHash::default();
107                cursor.read_exact(&mut source.0)?;
108                BlockInner::Receive { previous, source }
109            }
110            4 => {
111                // open
112                let mut source = BlockHash::default();
113                cursor.read_exact(&mut source.0)?;
114                let mut representative = Account::default();
115                cursor.read_exact(&mut representative.0)?;
116                let mut account = Account::default();
117                cursor.read_exact(&mut account.0)?;
118                BlockInner::Open {
119                    source,
120                    representative,
121                    account,
122                }
123            }
124            5 => {
125                // change
126                let mut previous = BlockHash::default();
127                cursor.read_exact(&mut previous.0)?;
128                let mut representative = Account::default();
129                cursor.read_exact(&mut representative.0)?;
130                BlockInner::Change {
131                    previous,
132                    representative,
133                }
134            }
135            6 => {
136                // state
137                let mut account = Account::default();
138                cursor.read_exact(&mut account.0)?;
139                let mut previous = BlockHash::default();
140                cursor.read_exact(&mut previous.0)?;
141                let mut representative = Account::default();
142                cursor.read_exact(&mut representative.0)?;
143                let balance = cursor.read_u128::<BigEndian>()?;
144                let mut link = [0u8; 32];
145                cursor.read_exact(&mut link)?;
146                BlockInner::State {
147                    account,
148                    previous,
149                    representative,
150                    balance,
151                    link,
152                }
153            }
154            _ => {
155                return Err(io::Error::new(
156                    io::ErrorKind::Other,
157                    "unrecognized block type",
158                ))
159            }
160        };
161        let mut signature = [0u8; 64];
162        cursor.read_exact(&mut signature)?;
163        let signature = Signature::from_bytes(&signature)
164            .map_err(|_| io::Error::new(io::ErrorKind::Other, "bad signature"))?;
165        let work;
166        if block_ty >= 6 {
167            // New block types have work in big endian
168            work = cursor.read_u64::<BigEndian>()?;
169        } else {
170            work = cursor.read_u64::<LittleEndian>()?;
171        }
172        let header = BlockHeader { signature, work };
173        Ok(Block { header, inner })
174    }
175
176    pub fn block_type_num(ty: BlockType) -> u8 {
177        match ty {
178            BlockType::Send => 2,
179            BlockType::Receive => 3,
180            BlockType::Open => 4,
181            BlockType::Change => 5,
182            BlockType::State => 6,
183        }
184    }
185
186    /// Does NOT include block type
187    pub fn write_block(buf: &mut BytesMut, block: Block) {
188        buf.reserve(block.size());
189        let mut work_big_endian = false;
190        match block.inner {
191            BlockInner::Send {
192                previous,
193                destination,
194                balance,
195            } => {
196                buf.put_slice(&previous.0);
197                buf.put_slice(&destination.0);
198                buf.put_u128_be(balance);
199            }
200            BlockInner::Receive { previous, source } => {
201                buf.put_slice(&previous.0);
202                buf.put_slice(&source.0);
203            }
204            BlockInner::Open {
205                source,
206                representative,
207                account,
208            } => {
209                buf.put_slice(&source.0);
210                buf.put_slice(&representative.0);
211                buf.put_slice(&account.0);
212            }
213            BlockInner::Change {
214                previous,
215                representative,
216            } => {
217                buf.put_slice(&previous.0);
218                buf.put_slice(&representative.0);
219            }
220            BlockInner::State {
221                account,
222                previous,
223                representative,
224                balance,
225                link,
226            } => {
227                buf.put_slice(&account.0);
228                buf.put_slice(&previous.0);
229                buf.put_slice(&representative.0);
230                buf.put_u128(balance);
231                buf.put_slice(&link as &[u8]);
232                work_big_endian = true;
233            }
234        };
235        buf.put_slice(&block.header.signature.to_bytes() as &[u8]);
236        if work_big_endian {
237            buf.put_u64(block.header.work);
238        } else {
239            buf.put_u64_le(block.header.work);
240        }
241    }
242
243    pub fn network_magic_byte(network: Network) -> u8 {
244        match network {
245            Network::Test => b'A',
246            Network::Beta => b'B',
247            Network::Live => b'C',
248        }
249    }
250
251    fn decode_inner<R: Read>(&self, mut cursor: R) -> io::Result<(MessageHeader, Message)> {
252        if cursor.read_u8()? != b'R' {
253            return Err(io::Error::new(io::ErrorKind::Other, "invalid magic number"));
254        }
255        let network = match cursor.read_u8()? {
256            b'A' => Network::Test,
257            b'B' => Network::Beta,
258            b'C' => Network::Live,
259            _ => {
260                return Err(io::Error::new(
261                    io::ErrorKind::Other,
262                    "invalid network indicator",
263                ));
264            }
265        };
266        if network != self.0 {
267            return Err(io::Error::new(io::ErrorKind::Other, "different network"));
268        }
269        let version_max = cursor.read_u8()?;
270        let version = cursor.read_u8()?;
271        let version_min = cursor.read_u8()?;
272        let msg_type = cursor.read_u8()?;
273        let extensions = cursor.read_u16::<LittleEndian>()?;
274        if version_min > NET_VERSION_MAX || version_max < NET_VERSION_MIN {
275            return Err(io::Error::new(
276                io::ErrorKind::Other,
277                "unsupported peer version",
278            ));
279        }
280        let header = MessageHeader {
281            network,
282            version_max,
283            version,
284            version_min,
285            extensions,
286        };
287        let message = match msg_type {
288            2 => {
289                // keepalive
290                let mut peers = [zero_v6_addr!(); 8];
291                let _ = (|| -> io::Result<()> {
292                    for peer in peers.iter_mut() {
293                        let mut ip_bytes: [u8; 16] = [0; 16];
294                        for byte in ip_bytes.iter_mut() {
295                            *byte = cursor.read_u8()?;
296                        }
297                        let port = cursor.read_u16::<LittleEndian>()?;
298                        *peer = SocketAddrV6::new(net::Ipv6Addr::from(ip_bytes), port, 0, 0);
299                    }
300                    Ok(())
301                })();
302                Message::Keepalive(peers)
303            }
304            3 => {
305                // publish
306                let ty = (header.extensions & 0x0f00) >> 8;
307                Message::Publish(Self::read_block(&mut cursor, ty as u8)?)
308            }
309            4 => {
310                // confirm_req
311                let ty = (header.extensions & 0x0f00) >> 8;
312                if ty == 1 {
313                    let count = usize::from((header.extensions & 0xf000) >> 12);
314                    let mut hashes = Vec::with_capacity(count);
315                    for _ in 0..count {
316                        let mut hash = BlockHash::default();
317                        let mut root = [0u8; 32];
318                        cursor.read_exact(&mut hash.0)?;
319                        cursor.read_exact(&mut root)?;
320                        if hash == BlockHash::default() && root == [0; 32] {
321                            return Err(io::Error::new(
322                                io::ErrorKind::Other,
323                                "zero hash and root requested in confirm_req",
324                            ));
325                        }
326                        hashes.push((hash, root));
327                    }
328                    Message::ConfirmReqHashes(hashes)
329                } else {
330                    Message::ConfirmReq(Self::read_block(&mut cursor, ty as u8)?)
331                }
332            }
333            5 => {
334                // confirm_ack
335                let ty = (header.extensions & 0x0f00) >> 8;
336                let mut account = Account::default();
337                cursor.read_exact(&mut account.0)?;
338                let mut signature = [0u8; 64];
339                cursor.read_exact(&mut signature)?;
340                let signature = Signature::from_bytes(&signature).unwrap();
341                let sequence = cursor.read_u64::<LittleEndian>()?;
342                let inner = if ty == 1 {
343                    let count = usize::from((header.extensions & 0xf000) >> 12);
344                    let mut hashes = Vec::with_capacity(count);
345                    for _ in 0..count {
346                        let mut hash = BlockHash::default();
347                        cursor.read_exact(&mut hash.0)?;
348                        hashes.push(hash);
349                    }
350                    VoteInner::Hashes(hashes)
351                } else {
352                    let block = Self::read_block(&mut cursor, ty as u8)?;
353                    VoteInner::Block(block)
354                };
355                Message::ConfirmAck(Vote {
356                    account,
357                    signature,
358                    sequence,
359                    inner,
360                })
361            }
362            10 => {
363                // node_id_handshake
364                let query = if header.extensions & NODE_ID_HANDSHAKE_QUERY_FLAG != 0 {
365                    let mut query = [0u8; 32];
366                    cursor.read_exact(&mut query)?;
367                    Some(query)
368                } else {
369                    None
370                };
371                let response = if header.extensions & NODE_ID_HANDSHAKE_RESPONSE_FLAG != 0 {
372                    let mut pubkey = [0u8; 32];
373                    cursor.read_exact(&mut pubkey)?;
374                    let pubkey = PublicKey::from_bytes(&pubkey)
375                        .map_err(|_| io::Error::new(io::ErrorKind::Other, "bad pubkey"))?;
376                    let mut signature = [0u8; 64];
377                    cursor.read_exact(&mut signature)?;
378                    let signature = Signature::from_bytes(&signature)
379                        .map_err(|_| io::Error::new(io::ErrorKind::Other, "bad signature"))?;
380                    Some((pubkey, signature))
381                } else {
382                    None
383                };
384                Message::NodeIdHandshake(query, response)
385            }
386            12 => {
387                // telemetry_req
388                Message::TelemetryReq
389            }
390            6 | 7 | 8 => {
391                return Err(io::Error::new(
392                    io::ErrorKind::Other,
393                    "bootstrap message sent over UDP",
394                ))
395            }
396            x => {
397                return Err(io::Error::new(
398                    io::ErrorKind::Other,
399                    format!("unrecognized message type {}", x),
400                ))
401            }
402        };
403        Ok((header, message))
404    }
405}
406
// Message types:
// invalid            0
// not_a_type         1
// keepalive          2
// publish            3
// confirm_req        4
// confirm_ack        5
// node_id_handshake  10
// telemetry_req      12
//
// Bootstrap message types (rejected by the decoder):
// bulk_pull    6
// bulk_push    7
// frontier_req 8
419
420impl tokio_util::codec::Decoder for NanoCurrencyCodec {
421    type Item = (MessageHeader, Message);
422    type Error = io::Error;
423
424    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
425        let mut cursor = io::Cursor::new(&buf);
426        match self.decode_inner(&mut cursor) {
427            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => Ok(None),
428            Err(err) => Err(err),
429            Ok(message) => {
430                let read = cursor.position() as usize;
431                buf.advance(read);
432                Ok(Some(message))
433            }
434        }
435    }
436}
437
438impl tokio_util::codec::Encoder<Message> for NanoCurrencyCodec {
439    type Error = io::Error;
440
441    fn encode(&mut self, msg: Message, buf: &mut BytesMut) -> io::Result<()> {
442        buf.reserve(8); // header (including extensions)
443        buf.put_slice(&[
444            b'R',
445            Self::network_magic_byte(self.0),
446            NET_VERSION_MAX,
447            NET_VERSION,
448            NET_VERSION_MIN,
449        ]);
450        match msg {
451            Message::Keepalive(peers) => {
452                buf.put_slice(&[2]);
453                buf.put_slice(&[0, 0]); // extensions
454                buf.reserve(peers.len() * (16 + 2));
455                for peer in peers.iter() {
456                    buf.put_slice(&peer.ip().octets());
457                    buf.put_u16_le(peer.port());
458                }
459            }
460            Message::Publish(block) => {
461                buf.put_slice(&[3]);
462                let type_num = Self::block_type_num(block.ty()) as u16;
463                buf.put_u16_le((type_num & 0x0f) << 8);
464                Self::write_block(buf, block);
465            }
466            Message::ConfirmReq(block) => {
467                buf.put_slice(&[4]);
468                let type_num = Self::block_type_num(block.ty()) as u16;
469                buf.put_u16_le((type_num & 0x0f) << 8);
470                Self::write_block(buf, block);
471            }
472            Message::ConfirmReqHashes(hashes) => {
473                let hashes_len = match u16::try_from(hashes.len()) {
474                    Ok(x) if x < 16 => x,
475                    _ => {
476                        return Err(io::Error::new(
477                            io::ErrorKind::Other,
478                            "attempted to send a confirm_req with more than 16 hashes and roots",
479                        ));
480                    }
481                };
482                buf.put_slice(&[4]);
483                buf.put_u16_le((1 << 8) | (hashes_len << 12));
484                buf.reserve(hashes.len() * 64);
485                for (hash, root) in hashes {
486                    buf.put_slice(&hash.0);
487                    buf.put_slice(&root);
488                }
489            }
490            Message::ConfirmAck(Vote {
491                account,
492                signature,
493                sequence,
494                inner,
495            }) => {
496                buf.put_slice(&[5]);
497                match inner {
498                    VoteInner::Block(block) => {
499                        let type_num = Self::block_type_num(block.ty()) as u16;
500                        buf.put_u16_le((type_num & 0x0f) << 8);
501                        buf.reserve(32 + 64 + 8);
502                        buf.put_slice(&account.0);
503                        buf.put_slice(&signature.to_bytes());
504                        buf.put_u64_le(sequence);
505                        Self::write_block(buf, block);
506                    }
507                    VoteInner::Hashes(hashes) => {
508                        let hashes_len = match u16::try_from(hashes.len()) {
509                            Ok(x) if x < 16 => x,
510                            _ => {
511                                return Err(io::Error::new(
512                                    io::ErrorKind::Other,
513                                    "attempted to send a vote with more than 16 hashes",
514                                ));
515                            }
516                        };
517                        buf.put_u16_le((1 << 8) | (hashes_len << 12));
518                        buf.reserve(32 + 64 + 8 + (hashes.len() * 32));
519                        buf.put_slice(&account.0);
520                        buf.put_slice(&signature.to_bytes());
521                        buf.put_u64_le(sequence);
522                        for hash in hashes {
523                            buf.put_slice(&hash.0);
524                        }
525                    }
526                }
527            }
528            Message::NodeIdHandshake(query, response) => {
529                buf.put_slice(&[10]);
530                let mut flags = 0;
531                let mut len = 0;
532                if query.is_some() {
533                    flags |= NODE_ID_HANDSHAKE_QUERY_FLAG;
534                    len += 32;
535                }
536                if response.is_some() {
537                    flags |= NODE_ID_HANDSHAKE_RESPONSE_FLAG;
538                    len += 32 + 64;
539                }
540                buf.put_u16_le(flags);
541                buf.reserve(len);
542                if let Some(query) = query {
543                    buf.put_slice(&query);
544                }
545                if let Some(response) = response {
546                    buf.put_slice(&response.0.to_bytes());
547                    buf.put_slice(&response.1.to_bytes());
548                }
549            }
550            Message::TelemetryReq => {
551                buf.put_slice(&[12]);
552                buf.put_slice(&[0, 0]);
553            }
554        }
555        Ok(())
556    }
557}