snap_coin/node/
message.rs1use std::array::TryFromSliceError;
2
3use bincode::{Decode, Encode};
4use rand::random;
5use thiserror::Error;
6use tokio::{
7 io::{AsyncReadExt, AsyncWriteExt},
8 net::tcp::{OwnedReadHalf, OwnedWriteHalf},
9};
10
11use crate::{
12 core::{block::Block, transaction::Transaction}, crypto::Hash, version::VERSION
13};
14
15#[derive(Encode, Decode, Debug, Clone)]
17pub enum Command {
18 Connect,
20 AcknowledgeConnection,
21 Ping { height: usize },
22 Pong { height: usize },
23 GetPeers,
24 SendPeers { peers: Vec<String> },
25
26 NewBlock { block: Block },
28 NewTransaction { transaction: Transaction },
29
30 GetBlock { block_hash: Hash },
32 GetBlockResponse { block: Option<Block> },
33 GetBlockHashes { start: usize, end: usize },
34 GetBlockHashesResponse { block_hashes: Vec<Hash> },
35}
36
37#[derive(Error, Debug)]
38pub enum MessageError {
39 #[error("Failed to encode command")]
40 Encoding(#[from] bincode::error::EncodeError),
41
42 #[error("Failed to decode command")]
43 Decoding(#[from] bincode::error::DecodeError),
44
45 #[error("Failed to write or read to / from stream")]
46 Stream(#[from] std::io::Error),
47
48 #[error("Received header length is not correct")]
49 HeaderLength,
50
51 #[error("Received header version or size bytes length is not correct")]
52 HeaderItemLength(#[from] TryFromSliceError),
53}
54
55#[derive(Debug, Clone)]
56pub struct Message {
57 pub version: u16,
58 pub id: u16,
59 pub command: Command,
60}
61
62impl Message {
63 pub fn new(command: Command) -> Self {
65 Message {
66 version: VERSION,
67 id: random(),
68 command,
69 }
70 }
71
72 pub fn make_response(&self, command: Command) -> Self {
73 Message {
74 version: VERSION,
75 id: self.id,
76 command,
77 }
78 }
79
80 pub fn serialize(&self) -> Result<Vec<u8>, MessageError> {
83 let command_bytes = bincode::encode_to_vec(&self.command, bincode::config::standard())?;
85 let size: u32 = command_bytes.len() as u32;
86
87 let mut header_bytes: Vec<u8> = Vec::new();
89 header_bytes.extend_from_slice(&self.version.to_be_bytes());
90 header_bytes.extend_from_slice(&self.id.to_be_bytes());
91 header_bytes.extend_from_slice(&size.to_be_bytes());
92
93 let mut message_bytes = Vec::new();
95 message_bytes.extend_from_slice(&header_bytes);
96 message_bytes.extend_from_slice(&command_bytes);
97
98 Ok(message_bytes)
99 }
100
101 pub async fn send(&self, stream: &mut OwnedWriteHalf) -> Result<(), MessageError> {
103 let buf = self.serialize()?;
104 if let Err(e) = stream.write_all(&buf).await {
105 return Err(e.into());
106 }
107 Ok(())
108 }
109
110 pub async fn from_stream(stream: &mut OwnedReadHalf) -> Result<Self, MessageError> {
112 let mut header_bytes = [0u8; 8];
113 if stream.read_exact(&mut header_bytes).await? != 8 {
114 return Err(MessageError::HeaderLength);
115 }
116
117 let (version_bytes, id_and_size) = header_bytes.split_at(2);
118 let (id_bytes, size_bytes) = id_and_size.split_at(2);
119
120 let version = u16::from_be_bytes(version_bytes.try_into()?);
121 let id = u16::from_be_bytes(id_bytes.try_into()?);
122 let size = u32::from_be_bytes(size_bytes.try_into()?);
123
124 let mut command_bytes = vec![0u8; size as usize];
125 stream.read_exact(&mut command_bytes).await?;
126
127 let command = bincode::decode_from_slice(&command_bytes, bincode::config::standard())?.0;
128 Ok(Message {
129 command,
130 id,
131 version,
132 })
133 }
134}