snap_coin/node/
message.rs1use std::array::TryFromSliceError;
2
3use bincode::{Decode, Encode};
4use rand::random;
5use thiserror::Error;
6use tokio::{
7 io::{AsyncReadExt, AsyncWriteExt},
8 net::tcp::{OwnedReadHalf, OwnedWriteHalf},
9};
10
11use crate::{
12 core::{
13 block::{Block, BlockMetadata},
14 transaction::{Transaction, TransactionId},
15 },
16 crypto::{Hash, merkle_tree::MerkleTreeProof},
17 version::VERSION,
18};
19
20#[derive(Encode, Decode, Debug, Clone)]
22pub enum Command {
23 Connect,
25 AcknowledgeConnection,
26 Ping {
27 height: usize,
28 },
29 Pong {
30 height: usize,
31 },
32 GetPeers,
33 SendPeers {
34 peers: Vec<String>,
35 },
36
37 NewBlock {
39 block: Block,
40 },
41 NewBlockResolved,
42 NewTransaction {
43 transaction: Transaction,
44 },
45 NewTransactionResolved,
46
47 GetBlockMetadata {
49 block_hash: Hash,
50 },
51 GetBlockMetadataResponse {
52 block_metadata: Option<BlockMetadata>,
53 },
54 GetBlock {
55 block_hash: Hash,
56 },
57 GetBlockResponse {
58 block: Option<Block>,
59 },
60 GetBlockHashes {
61 start: usize,
62 end: usize,
63 },
64 GetBlockHashesResponse {
65 block_hashes: Vec<Hash>,
66 },
67
68 GetTransactionMerkleProof {
70 block: Hash,
71 transaction_id: TransactionId,
72 },
73 GetTransactionMerkleProofResponse {
74 proof: Option<MerkleTreeProof>,
75 },
76}
77
78#[derive(Error, Debug)]
79pub enum MessageError {
80 #[error("Failed to encode command")]
81 Encoding(#[from] bincode::error::EncodeError),
82
83 #[error("Failed to decode command")]
84 Decoding(#[from] bincode::error::DecodeError),
85
86 #[error("Failed to write or read to / from stream")]
87 Stream(#[from] std::io::Error),
88
89 #[error("Received header length is not correct")]
90 HeaderLength,
91
92 #[error("Received header version or size bytes length is not correct")]
93 HeaderItemLength(#[from] TryFromSliceError),
94}
95
96pub type MessageId = u32;
97
98#[derive(Debug, Clone)]
99pub struct Message {
100 pub version: u16,
101 pub id: MessageId,
102 pub command: Command,
103}
104
105impl Message {
106 pub fn new(command: Command) -> Self {
108 Message {
109 version: VERSION,
110 id: random(),
111 command,
112 }
113 }
114
115 pub fn make_response(&self, command: Command) -> Self {
116 Message {
117 version: VERSION,
118 id: self.id,
119 command,
120 }
121 }
122
123 pub fn serialize(&self) -> Result<Vec<u8>, MessageError> {
126 let command_bytes = bincode::encode_to_vec(&self.command, bincode::config::standard())?;
128 let size: u32 = command_bytes.len() as u32;
129
130 let mut header_bytes: Vec<u8> = Vec::new();
132 header_bytes.extend_from_slice(&self.version.to_be_bytes());
133 header_bytes.extend_from_slice(&self.id.to_be_bytes());
134 header_bytes.extend_from_slice(&size.to_be_bytes());
135
136 let mut message_bytes = Vec::new();
138 message_bytes.extend_from_slice(&header_bytes);
139 message_bytes.extend_from_slice(&command_bytes);
140
141 Ok(message_bytes)
142 }
143
144 pub async fn send(&self, stream: &mut OwnedWriteHalf) -> Result<(), MessageError> {
146 let buf = self.serialize()?;
147 if let Err(e) = stream.write_all(&buf).await {
148 return Err(e.into());
149 }
150 Ok(())
152 }
153
154 pub async fn from_stream(stream: &mut OwnedReadHalf) -> Result<Self, MessageError> {
156 let mut header_bytes = [0u8; 10];
157 if stream.read_exact(&mut header_bytes).await? != 10 {
158 return Err(MessageError::HeaderLength);
159 }
160
161 let (version_bytes, id_and_size) = header_bytes.split_at(2);
162 let (id_bytes, size_bytes) = id_and_size.split_at(4);
163
164 let version = u16::from_be_bytes(version_bytes.try_into()?);
165 let id = MessageId::from_be_bytes(id_bytes.try_into()?);
166 let size = u32::from_be_bytes(size_bytes.try_into()?);
167
168 let mut command_bytes = vec![0u8; size as usize];
169 stream.read_exact(&mut command_bytes).await?;
170
171 let command = bincode::decode_from_slice(&command_bytes, bincode::config::standard())?.0;
172 Ok(Message {
174 command,
175 id,
176 version,
177 })
178 }
179}