Skip to main content

snap_coin/node/
message.rs

1use std::array::TryFromSliceError;
2
3use bincode::{Decode, Encode};
4use rand::random;
5use thiserror::Error;
6use tokio::{
7    io::{AsyncReadExt, AsyncWriteExt},
8    net::tcp::{OwnedReadHalf, OwnedWriteHalf},
9};
10
11use crate::{
12    core::{
13        block::{Block, BlockMetadata},
14        transaction::{Transaction, TransactionId},
15    },
16    crypto::{Hash, merkle_tree::MerkleTreeProof},
17    version::VERSION,
18};
19
20/// Struct that contains every command (request, response) sent on the p2p network
21#[derive(Encode, Decode, Debug, Clone)]
22pub enum Command {
23    // Connect / keep-alive
24    Connect,
25    AcknowledgeConnection,
26    Ping {
27        height: usize,
28    },
29    Pong {
30        height: usize,
31    },
32    GetPeers,
33    SendPeers {
34        peers: Vec<String>,
35    },
36
37    // Live
38    NewBlock {
39        block: Block,
40    },
41    NewBlockResolved,
42    NewTransaction {
43        transaction: Transaction,
44    },
45    NewTransactionResolved,
46
47    // Historical
48    GetBlockMetadata {
49        block_hash: Hash,
50    },
51    GetBlockMetadataResponse {
52        block_metadata: Option<BlockMetadata>,
53    },
54    GetBlock {
55        block_hash: Hash,
56    },
57    GetBlockResponse {
58        block: Option<Block>,
59    },
60    GetBlockHashes {
61        start: usize,
62        end: usize,
63    },
64    GetBlockHashesResponse {
65        block_hashes: Vec<Hash>,
66    },
67
68    // Light node
69    GetTransactionMerkleProof {
70        block: Hash,
71        transaction_id: TransactionId,
72    },
73    GetTransactionMerkleProofResponse {
74        proof: Option<MerkleTreeProof>,
75    },
76}
77
78#[derive(Error, Debug)]
79pub enum MessageError {
80    #[error("Failed to encode command")]
81    Encoding(#[from] bincode::error::EncodeError),
82
83    #[error("Failed to decode command")]
84    Decoding(#[from] bincode::error::DecodeError),
85
86    #[error("Failed to write or read to / from stream")]
87    Stream(#[from] std::io::Error),
88
89    #[error("Received header length is not correct")]
90    HeaderLength,
91
92    #[error("Received header version or size bytes length is not correct")]
93    HeaderItemLength(#[from] TryFromSliceError),
94}
95
96pub type MessageId = u32;
97
98#[derive(Debug, Clone)]
99pub struct Message {
100    pub version: u16,
101    pub id: MessageId,
102    pub command: Command,
103}
104
105impl Message {
106    /// New
107    pub fn new(command: Command) -> Self {
108        Message {
109            version: VERSION,
110            id: random(),
111            command,
112        }
113    }
114
115    pub fn make_response(&self, command: Command) -> Self {
116        Message {
117            version: VERSION,
118            id: self.id,
119            command,
120        }
121    }
122
123    /// Serialize message into a Vec<u8> to be sent
124    /// Message is serialized into: `[10 bytes header (version 2, id 4, payload size 4)][payload]`
125    pub fn serialize(&self) -> Result<Vec<u8>, MessageError> {
126        // Serialize just the command to get its size
127        let command_bytes = bincode::encode_to_vec(&self.command, bincode::config::standard())?;
128        let size: u32 = command_bytes.len() as u32;
129
130        // Serialize the header first
131        let mut header_bytes: Vec<u8> = Vec::new();
132        header_bytes.extend_from_slice(&self.version.to_be_bytes());
133        header_bytes.extend_from_slice(&self.id.to_be_bytes());
134        header_bytes.extend_from_slice(&size.to_be_bytes());
135
136        // Combine: [header][command]
137        let mut message_bytes = Vec::new();
138        message_bytes.extend_from_slice(&header_bytes);
139        message_bytes.extend_from_slice(&command_bytes);
140
141        Ok(message_bytes)
142    }
143
144    /// Send this message to a TcpStream (its owned write half)
145    pub async fn send(&self, stream: &mut OwnedWriteHalf) -> Result<(), MessageError> {
146        let buf = self.serialize()?;
147        if let Err(e) = stream.write_all(&buf).await {
148            return Err(e.into());
149        }
150        // info!("TX: {:#?}", self.command);
151        Ok(())
152    }
153
154    /// Read a message from a TcpStream (its owned read half)
155    pub async fn from_stream(stream: &mut OwnedReadHalf) -> Result<Self, MessageError> {
156        let mut header_bytes = [0u8; 10];
157        if stream.read_exact(&mut header_bytes).await? != 10 {
158            return Err(MessageError::HeaderLength);
159        }
160
161        let (version_bytes, id_and_size) = header_bytes.split_at(2);
162        let (id_bytes, size_bytes) = id_and_size.split_at(4);
163
164        let version = u16::from_be_bytes(version_bytes.try_into()?);
165        let id = MessageId::from_be_bytes(id_bytes.try_into()?);
166        let size = u32::from_be_bytes(size_bytes.try_into()?);
167
168        let mut command_bytes = vec![0u8; size as usize];
169        stream.read_exact(&mut command_bytes).await?;
170
171        let command = bincode::decode_from_slice(&command_bytes, bincode::config::standard())?.0;
172        // info!("RX: {:#?}", command);
173        Ok(Message {
174            command,
175            id,
176            version,
177        })
178    }
179}