1use bincode::error::EncodeError;
2use std::{
3 collections::{HashMap, VecDeque},
4 net::SocketAddr,
5 sync::Arc,
6 time::Duration,
7};
8use thiserror::Error;
9use tokio::{
10 net::TcpStream,
11 sync::{RwLock, oneshot},
12 task::JoinHandle,
13 time::{sleep, timeout},
14};
15
16use crate::{
17 core::{blockchain::BlockchainError, utxo::TransactionError},
18 node::{
19 message::{Command, Message, MessageError},
20 node::Node,
21 sync::sync_to_peer,
22 },
23};
24
25#[derive(Error, Debug)]
26pub enum PeerError {
27 #[error("{0}")]
28 MessageError(#[from] MessageError),
29
30 #[error("Disconnected")]
31 Disconnected,
32
33 #[error("Blockchain error: {0}")]
34 BlockchainError(#[from] BlockchainError),
35
36 #[error("Transaction error: {0}")]
37 TransactionError(#[from] TransactionError),
38
39 #[error("Sync peer returned an invalid response")]
40 SyncResponseInvalid,
41
42 #[error("Could not find fork point with peer")]
43 NoForkPoint,
44
45 #[error("Block has invalid difficulty")]
46 BadBlockDifficulty,
47
48 #[error("Block has invalid block hash")]
49 BadBlockHash,
50
51 #[error("Block has no block hash attached")]
52 NoBlockHash,
53
54 #[error("Encode error: {0}")]
55 EncodeError(#[from] EncodeError),
56}
57
/// Time limit (15 seconds) applied in this file to handling one incoming
/// message and to writing one outgoing message; exceeding it is treated as a
/// disconnect.
pub const TIMEOUT: Duration = Duration::from_millis(15_000);
59
60pub struct Peer {
62 pub address: SocketAddr,
63
64 pub is_client: bool,
65
66 send_queue: VecDeque<Message>,
68
69 pending: HashMap<u16, oneshot::Sender<Message>>,
71}
72
73impl Peer {
74 pub fn new(address: SocketAddr, is_client: bool) -> Self {
76 Self {
77 address,
78 is_client,
79 send_queue: VecDeque::new(),
80 pending: HashMap::new(),
81 }
82 }
83
84 async fn on_fail(peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>) {
85 let peer_address = peer.read().await.address;
86
87 let mut node_peers = node.write().await;
88
89 let mut new_peers = Vec::new();
90 for p in node_peers.peers.drain(..) {
91 let p_address = p.read().await.address;
92 if p_address != peer_address {
93 new_peers.push(p);
94 }
95 }
96
97 node_peers.peers = new_peers;
98 }
99
100 pub async fn connect(
102 peer: Arc<RwLock<Peer>>,
103 node: Arc<RwLock<Node>>,
104 stream: TcpStream,
105 ) -> JoinHandle<Result<(), PeerError>> {
106 let (mut read_stream, mut write_stream) = stream.into_split();
107
108 tokio::spawn(async move {
110 let peer_cloned = peer.clone();
111 let node_cloned = node.clone();
112
113 let pinger = {
115 let peer = peer.clone();
116 let node = node.clone();
117 Box::pin(async move {
118 loop {
119 sleep(Duration::from_secs(5)).await; let height = node.read().await.blockchain.get_height();
121 match Peer::request(
122 peer.clone(),
124 Message::new(Command::Ping { height }),
125 )
126 .await?
127 .command
128 {
129 Command::Pong { .. } => {}
130 _ => {}
131 }
132 }
133 #[allow(unreachable_code)]
134 Ok::<(), PeerError>(())
135 })
136 };
137
138 let reader = {
140 let peer = peer.clone();
141 let node = node.clone();
142 Box::pin(async move {
143 loop {
144 let msg = Message::from_stream(&mut read_stream).await?;
145 match timeout(
146 TIMEOUT,
147 Peer::handle_incoming(peer.clone(), node.clone(), msg),
148 )
149 .await
150 {
151 Ok(()) => {}
152 Err(..) => return Err(PeerError::Disconnected),
153 }
154 }
155 #[allow(unreachable_code)]
156 Ok::<(), PeerError>(())
157 })
158 };
159
160 let writer = {
162 let peer = peer.clone();
163 Box::pin(async move {
164 loop {
165 let maybe_msg = {
166 let mut p = peer.write().await;
167 p.send_queue.pop_front()
168 };
169
170 if let Some(msg) = maybe_msg {
171 match timeout(TIMEOUT, msg.send(&mut write_stream)).await {
172 Ok(e) => e?,
173 Err(..) => return Err(PeerError::Disconnected),
174 }
175 } else {
176 sleep(Duration::from_millis(10)).await;
177 }
178 }
179 #[allow(unreachable_code)]
180 Ok::<(), PeerError>(())
181 })
182 };
183
184 let result = tokio::select! {
186 r = reader => r,
187 r = writer => r,
188 r = pinger => r,
189 };
190
191 if let Err(e) = result {
192 Node::log(format!(
193 "Disconnected from peer: {}:{}. Error: {:?}",
194 peer.read().await.address.ip(),
195 peer.read().await.address.port(),
196 e
197 ));
198 let peer_cloned = peer_cloned.clone();
199 let node_cloned = node_cloned.clone();
200
201 tokio::spawn(async move {
202 Self::on_fail(peer_cloned, node_cloned).await;
203 });
204 }
205 Ok(())
206 })
207 }
208
209 async fn handle_incoming(peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>, message: Message) {
211 {
212 let mut p = peer.write().await;
213 if let Some(tx) = p.pending.remove(&message.id) {
214 let _ = tx.send(message);
215 return;
216 }
217 }
218
219 Peer::on_message(peer.clone(), node.clone(), message).await;
220 }
221
222 async fn on_message(peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>, message: Message) {
224 if let Err(err) = async {
225 match message.command {
226 Command::Connect => {
227 Peer::send(peer, message.make_response(Command::AcknowledgeConnection)).await;
228 }
229 Command::AcknowledgeConnection => {
230 Node::log(format!("Got unhandled AcknowledgeConnection"));
231 }
232 Command::Ping { height } => {
233 let local_height = node.read().await.blockchain.get_height();
234 Peer::send(
235 peer.clone(),
236 message.make_response(Command::Pong {
237 height: local_height,
238 }),
239 )
240 .await;
241
242 if local_height < height {
243 tokio::spawn(async move {
244 if node.read().await.is_syncing {
245 return;
246 }
247 node.write().await.is_syncing = true;
248 let result = sync_to_peer(node.clone(), peer.clone(), height).await;
249
250 if let Err(e) = result {
251 Node::log(format!("[SYNC] Failed: {}, disconnecting from {}", e, peer.read().await.address));
252 let node = node.clone();
253 tokio::spawn(async move {
254 Peer::on_fail(peer, node).await;
255 });
256 } else {
257 Node::log(format!("[SYNC] Completed"));
258 }
259
260 node.write().await.is_syncing = false;
261 });
262 }
263 }
264 Command::Pong { .. } => {
265 Node::log(format!("Got unhandled Pong"));
266 }
267 Command::GetPeers => {
268 let peers: Vec<String> = {
269 let node_read = node.read().await;
270 let mut peer_addrs = Vec::new();
271 for p in &node_read.peers {
272 if p.read().await.is_client {
273 continue;
274 }
275 let p_addr = p.read().await.address.to_string();
276 peer_addrs.push(p_addr);
277 }
278 peer_addrs
279 };
280 let response = message.make_response(Command::SendPeers { peers });
281 Peer::send(peer, response).await;
282 }
283 Command::SendPeers { .. } => {
284 Node::log(format!("Got unhandled SendPeers"));
285 }
286 Command::NewBlock { ref block } => {
287 if Some(node.read().await.last_seen_block) != block.hash {
289 Node::submit_block(node.clone(), block.clone()).await?;
290 }
291 }
292 Command::NewTransaction { ref transaction } => {
293 if !node
295 .read()
296 .await
297 .mempool
298 .validate_transaction(transaction)
299 .await
300 {
301 return Ok(());
302 }
303
304 Node::submit_transaction(node, transaction.clone()).await?;
305 }
306 Command::GetBlock { block_hash } => {
307 Peer::send(
308 peer,
309 message.make_response(Command::GetBlockResponse {
310 block: node.read().await.blockchain.get_block_by_hash(&block_hash),
311 }),
312 )
313 .await;
314 }
315 Command::GetBlockResponse { .. } => {
316 Node::log(format!("Got unhandled SendBlock"));
317 }
318 Command::GetBlockHashes { start, end } => {
319 let mut block_hashes = Vec::new();
320 for i in start..end {
321 if let Some(block_hash) =
322 node.read().await.blockchain.get_block_hash_by_height(i)
323 {
324 block_hashes.push(*block_hash);
325 }
326 }
327 Peer::send(
328 peer,
329 message.make_response(Command::GetBlockHashesResponse { block_hashes }),
330 )
331 .await;
332 }
333 Command::GetBlockHashesResponse { .. } => {
334 Node::log(format!("Got unhandled SendBlockHashes"));
335 }
336 };
337 Ok::<(), PeerError>(())
338 }
339 .await
340 {
341 Node::log(format!("Error processing incoming message: {err}"));
342 }
343 }
344
345 pub async fn request(peer: Arc<RwLock<Peer>>, message: Message) -> Result<Message, PeerError> {
347 let id = message.id;
348
349 let (tx, rx) = oneshot::channel();
350
351 {
352 let mut p = peer.write().await;
353 p.pending.insert(id, tx);
354 p.send_queue.push_back(message);
355 }
356
357 match timeout(Duration::from_secs(10), rx).await {
358 Ok(Ok(msg)) => Ok(msg),
359 Ok(Err(_)) => Err(PeerError::Disconnected),
360 Err(_) => Err(PeerError::Disconnected),
361 }
362 }
363
364 pub async fn send(peer: Arc<RwLock<Peer>>, message: Message) {
366 let mut p = peer.write().await;
367 p.send_queue.push_back(message);
368 }
369
370 pub async fn send_to_peers(node: Arc<RwLock<Node>>, message: Message) {
372 let peers = {
374 let guard = node.read().await;
375 guard.peers.clone()
376 };
377
378 for peer in peers {
379 Peer::send(peer, message.clone()).await;
381 }
382 }
383}