1use bincode::error::EncodeError;
2use std::pin::Pin;
3use std::{
4 collections::{HashMap, VecDeque},
5 net::SocketAddr,
6 sync::Arc,
7 time::Duration,
8};
9use thiserror::Error;
10use tokio::{
11 net::TcpStream,
12 sync::{RwLock, oneshot},
13 task::JoinHandle,
14 time::{sleep, timeout},
15};
16
17use crate::{
18 core::{blockchain::BlockchainError, transaction::TransactionId, utxo::TransactionError},
19 node::{
20 message::{Command, Message, MessageError},
21 node::Node,
22 sync::sync_to_peer,
23 },
24};
25
26#[derive(Error, Debug)]
27pub enum PeerError {
28 #[error("{0}")]
29 MessageError(#[from] MessageError),
30
31 #[error("Disconnected")]
32 Disconnected,
33
34 #[error("Blockchain error: {0}")]
35 BlockchainError(#[from] BlockchainError),
36
37 #[error("Transaction error: {0}")]
38 TransactionError(#[from] TransactionError),
39
40 #[error("Sync peer returned an invalid response")]
41 SyncResponseInvalid,
42
43 #[error("Could not find fork point with peer")]
44 NoForkPoint,
45
46 #[error("Block has invalid difficulty")]
47 BadBlockDifficulty,
48
49 #[error("Block has invalid block hash")]
50 BadBlockHash,
51
52 #[error("Block has no block hash attached")]
53 NoBlockHash,
54
55 #[error("Encode error: {0}")]
56 EncodeError(#[from] EncodeError),
57}
58
59pub struct Peer {
61 pub address: SocketAddr,
62
63 send_queue: VecDeque<Message>,
65
66 pending: HashMap<u16, oneshot::Sender<Message>>,
68
69 shutdown: bool,
71
72 seen_transactions: VecDeque<TransactionId>,
73}
74
75impl Peer {
76 pub fn new(address: SocketAddr) -> Self {
78 Self {
79 address,
80 send_queue: VecDeque::new(),
81 pending: HashMap::new(),
82 shutdown: false,
83 seen_transactions: VecDeque::new(),
84 }
85 }
86
87 pub async fn kill(peer: Arc<RwLock<Peer>>) {
89 let mut p = peer.write().await;
90 p.shutdown = true;
91 p.send_queue.clear();
92 }
93
94 pub async fn connect<F>(
96 peer: Arc<RwLock<Peer>>,
97 node: Arc<RwLock<Node>>,
98 on_fail: F,
99 stream: TcpStream,
100 ) -> JoinHandle<Result<(), PeerError>>
101 where
102 F: Fn(
103 Arc<RwLock<Peer>>,
104 Arc<RwLock<Node>>,
105 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
106 + Send
107 + Sync
108 + 'static,
109 {
110 let (mut read_stream, mut write_stream) = stream.into_split();
111
112 tokio::spawn(async move {
114 let peer_cloned = peer.clone();
115 let node_cloned = node.clone();
116
117 let pinger = {
119 let peer = peer.clone();
120 let node = node.clone();
121 Box::pin(async move {
122 loop {
123 {
124 let p = peer.read().await;
125 if p.shutdown {
126 return Ok(());
127 }
128 }
129 sleep(Duration::from_secs(5)).await; match Peer::request(
131 peer.clone(),
133 Message::new(Command::Ping {
134 height: node.read().await.blockchain.get_height(),
135 }),
136 )
137 .await?
138 .command
139 {
140 Command::Pong { .. } => {}
141 _ => {}
142 }
143 }
144 #[allow(unreachable_code)]
145 Ok::<(), PeerError>(())
146 })
147 };
148
149 let reader = {
151 let peer = peer.clone();
152 let node = node.clone();
153 Box::pin(async move {
154 loop {
155 {
156 let p = peer.read().await;
157 if p.shutdown {
158 return Err(PeerError::Disconnected);
159 }
160 }
161 let msg = Message::from_stream(&mut read_stream).await?;
162 Peer::handle_incoming(peer.clone(), node.clone(), msg).await;
163 }
164 #[allow(unreachable_code)]
165 Ok::<(), PeerError>(())
166 })
167 };
168
169 let writer = {
171 let peer = peer.clone();
172 Box::pin(async move {
173 loop {
174 {
175 let p = peer.read().await;
176 if p.shutdown {
177 return Err(PeerError::Disconnected);
178 }
179 }
180
181 let maybe_msg = {
182 let mut p = peer.write().await;
183 p.send_queue.pop_front()
184 };
185
186 if let Some(msg) = maybe_msg {
187 msg.send(&mut write_stream).await?;
188 } else {
189 sleep(Duration::from_millis(10)).await;
190 }
191 }
192 #[allow(unreachable_code)]
193 Ok::<(), PeerError>(())
194 })
195 };
196
197 let result = tokio::select! {
199 r = reader => r,
200 r = writer => r,
201 r = pinger => r,
202 };
203
204 if let Err(e) = result {
205 Node::log(format!(
206 "Disconnected peer: {}:{}. Error: {:?}",
207 peer.read().await.address.ip(),
208 peer.read().await.address.port(),
209 e
210 ));
211 let peer_cloned = peer_cloned.clone();
212 let node_cloned = node_cloned.clone();
213 tokio::spawn(async move {
214 on_fail(peer_cloned, node_cloned).await;
215 });
216 }
217 Ok(())
218 })
219 }
220
221 async fn handle_incoming(peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>, message: Message) {
223 {
224 let mut p = peer.write().await;
225 if let Some(tx) = p.pending.remove(&message.id) {
226 let _ = tx.send(message);
227 return;
228 }
229 }
230
231 Peer::on_message(peer.clone(), node.clone(), message).await;
232 }
233
234 async fn on_message(peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>, message: Message) {
236 if let Err(err) = async {
237 match message.command {
238 Command::Connect => {
239 Peer::send(peer, message.make_response(Command::AcknowledgeConnection)).await;
240 }
241 Command::AcknowledgeConnection => {
242 Node::log(format!("Got unhandled AcknowledgeConnection"));
243 }
244 Command::Ping { height } => {
245 let local_height = node.read().await.blockchain.get_height();
246 Peer::send(
247 peer.clone(),
248 message.make_response(Command::Pong {
249 height: local_height,
250 }),
251 )
252 .await;
253
254 tokio::spawn(async move {
256 if local_height < height {
257 Node::log(format!("[SYNC] Starting sync!"));
258 {
259 let mut node = node.write().await;
260 if node.is_syncing {
261 return;
262 }
263 node.is_syncing = true;
264 }
265 match sync_to_peer(node.clone(), Arc::clone(&peer), height).await {
266 Ok(()) => {
267 Node::log(format!("[SYNC] Complete!"));
268 }
269 Err(e) => {
270 Node::log(format!("[SYNC] Failed to sync! {}", e.to_string()));
271 }
272 }
273 node.write().await.is_syncing = false;
274 }
275 });
276 }
277 Command::Pong { .. } => {
278 Node::log(format!("Got unhandled Pong"));
279 }
280 Command::GetPeers => {
281 let peers: Vec<String> = {
282 let node_read = node.read().await;
283 let mut peer_addrs = Vec::new();
284 for p in &node_read.peers {
285 let p_addr = p.read().await.address.to_string();
286 peer_addrs.push(p_addr);
287 }
288 peer_addrs
289 };
290 let response = message.make_response(Command::SendPeers { peers });
291 Peer::send(peer, response).await;
292 }
293 Command::SendPeers { .. } => {
294 Node::log(format!("Got unhandled SendPeers"));
295 }
296 Command::NewBlock { ref block } => {
297 if Some(node.read().await.last_seen_block) != block.hash {
299 Node::submit_block(node.clone(), block.clone()).await?;
300
301 Node::log(format!(
302 "New block accepted: {}",
303 block.hash.unwrap().dump_base36()
304 ));
305 }
306 }
307 Command::NewTransaction { ref transaction } => {
308 if peer
310 .read()
311 .await
312 .seen_transactions
313 .contains(&transaction.transaction_id.unwrap())
314 {
315 return Ok(());
316 }
317
318 Node::submit_transaction(node, transaction.clone()).await?;
319
320 Node::log(format!(
321 "New transaction accepted: {}",
322 transaction.transaction_id.unwrap().dump_base36()
323 ));
324 }
325 Command::GetBlock { block_hash } => {
326 Peer::send(
327 peer,
328 message.make_response(Command::GetBlockResponse {
329 block: node.read().await.blockchain.get_block_by_hash(&block_hash),
330 }),
331 )
332 .await;
333 }
334 Command::GetBlockResponse { .. } => {
335 Node::log(format!("Got unhandled SendBlock"));
336 }
337 Command::GetBlockHashes { start, end } => {
338 let mut block_hashes = Vec::new();
339 for i in start..end {
340 if let Some(block_hash) =
341 node.read().await.blockchain.get_block_hash_by_height(i)
342 {
343 block_hashes.push(*block_hash);
344 }
345 }
346 Peer::send(
347 peer,
348 message.make_response(Command::GetBlockHashesResponse { block_hashes }),
349 )
350 .await;
351 }
352 Command::GetBlockHashesResponse { .. } => {
353 Node::log(format!("Got unhandled SendBlockHashes"));
354 }
355 };
356 Ok::<(), PeerError>(())
357 }
358 .await
359 {
360 Node::log(format!("Error processing incoming message: {err}"));
361 }
362 }
363
364 pub async fn request(peer: Arc<RwLock<Peer>>, message: Message) -> Result<Message, PeerError> {
366 let id = message.id;
367
368 let (tx, rx) = oneshot::channel();
369
370 {
371 let mut p = peer.write().await;
372 p.pending.insert(id, tx);
373 p.send_queue.push_back(message);
374 }
375
376 match timeout(Duration::from_secs(10), rx).await {
377 Ok(Ok(msg)) => Ok(msg),
378 Ok(Err(_)) => Err(PeerError::Disconnected),
379 Err(_) => Err(PeerError::Disconnected),
380 }
381 }
382
383 pub async fn send(peer: Arc<RwLock<Peer>>, message: Message) {
385 let mut p = peer.write().await;
386 p.send_queue.push_back(message);
387 }
388
389 pub async fn send_to_peers(peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>, message: Message) {
391 for i_peer in &node.read().await.peers {
392 if !Arc::ptr_eq(&i_peer, &peer) {
393 Peer::send(Arc::clone(i_peer), message.clone()).await;
394 }
395 }
396 }
397}