snap_coin/node/
message.rs

1use std::array::TryFromSliceError;
2
3use bincode::{Decode, Encode};
4use rand::random;
5use thiserror::Error;
6use tokio::{
7    io::{AsyncReadExt, AsyncWriteExt},
8    net::tcp::{OwnedReadHalf, OwnedWriteHalf},
9};
10
11use crate::{
12    core::{block::Block, transaction::Transaction}, crypto::Hash, version::VERSION
13};
14
15/// Struct that contains every command (request, response) sent on the p2p network
16#[derive(Encode, Decode, Debug, Clone)]
17pub enum Command {
18    // Connect / keep-alive
19    Connect,
20    AcknowledgeConnection,
21    Ping { height: usize },
22    Pong { height: usize },
23    GetPeers,
24    SendPeers { peers: Vec<String> },
25
26    // Live
27    NewBlock { block: Block },
28    NewTransaction { transaction: Transaction },
29
30    // Historical
31    GetBlock { block_hash: Hash },
32    GetBlockResponse { block: Option<Block> },
33    GetBlockHashes { start: usize, end: usize },
34    GetBlockHashesResponse { block_hashes: Vec<Hash> },
35}
36
37#[derive(Error, Debug)]
38pub enum MessageError {
39    #[error("Failed to encode command")]
40    Encoding(#[from] bincode::error::EncodeError),
41
42    #[error("Failed to decode command")]
43    Decoding(#[from] bincode::error::DecodeError),
44
45    #[error("Failed to write or read to / from stream")]
46    Stream(#[from] std::io::Error),
47
48    #[error("Received header length is not correct")]
49    HeaderLength,
50
51    #[error("Received header version or size bytes length is not correct")]
52    HeaderItemLength(#[from] TryFromSliceError),
53}
54
55#[derive(Debug, Clone)]
56pub struct Message {
57    pub version: u16,
58    pub id: u16,
59    pub command: Command,
60}
61
62impl Message {
63    /// New 
64    pub fn new(command: Command) -> Self {
65        Message {
66            version: VERSION,
67            id: random(),
68            command,
69        }
70    }
71
72    pub fn make_response(&self, command: Command) -> Self {
73        Message {
74            version: VERSION,
75            id: self.id,
76            command,
77        }
78    }
79
80    /// Serialize message into a Vec<u8> to be sent
81    /// Message is serialized into: [8 bytes header (version 2, id 2, payload size 4)][payload]
82    pub fn serialize(&self) -> Result<Vec<u8>, MessageError> {
83        // Serialize just the command to get its size
84        let command_bytes = bincode::encode_to_vec(&self.command, bincode::config::standard())?;
85        let size: u32 = command_bytes.len() as u32;
86
87        // Serialize the header first
88        let mut header_bytes: Vec<u8> = Vec::new();
89        header_bytes.extend_from_slice(&self.version.to_be_bytes());
90        header_bytes.extend_from_slice(&self.id.to_be_bytes());
91        header_bytes.extend_from_slice(&size.to_be_bytes());
92
93        // Combine: [header][command]
94        let mut message_bytes = Vec::new();
95        message_bytes.extend_from_slice(&header_bytes);
96        message_bytes.extend_from_slice(&command_bytes);
97
98        Ok(message_bytes)
99    }
100
101    /// Send this message to a TcpStream (its owned write half)
102    pub async fn send(&self, stream: &mut OwnedWriteHalf) -> Result<(), MessageError> {
103        let buf = self.serialize()?;
104        if let Err(e) = stream.write_all(&buf).await {
105            return Err(e.into());
106        }
107        Ok(())
108    }
109
110    /// Read a message from a TcpStream (its owned read half)
111    pub async fn from_stream(stream: &mut OwnedReadHalf) -> Result<Self, MessageError> {
112        let mut header_bytes = [0u8; 8];
113        if stream.read_exact(&mut header_bytes).await? != 8 {
114            return Err(MessageError::HeaderLength);
115        }
116
117        let (version_bytes, id_and_size) = header_bytes.split_at(2);
118        let (id_bytes, size_bytes) = id_and_size.split_at(2);
119
120        let version = u16::from_be_bytes(version_bytes.try_into()?);
121        let id = u16::from_be_bytes(id_bytes.try_into()?);
122        let size = u32::from_be_bytes(size_bytes.try_into()?);
123
124        let mut command_bytes = vec![0u8; size as usize];
125        stream.read_exact(&mut command_bytes).await?;
126
127        let command = bincode::decode_from_slice(&command_bytes, bincode::config::standard())?.0;
128        Ok(Message {
129            command,
130            id,
131            version,
132        })
133    }
134}