// snap_coin/node/peer.rs

1use bincode::error::EncodeError;
2use std::pin::Pin;
3use std::{
4    collections::{HashMap, VecDeque},
5    net::SocketAddr,
6    sync::Arc,
7    time::Duration,
8};
9use thiserror::Error;
10use tokio::{
11    net::TcpStream,
12    sync::{RwLock, oneshot},
13    task::JoinHandle,
14    time::{sleep, timeout},
15};
16
17use crate::{
18    core::{blockchain::BlockchainError, transaction::TransactionId, utxo::TransactionError},
19    node::{
20        message::{Command, Message, MessageError},
21        node::Node,
22        sync::sync_to_peer,
23    },
24};
25
26#[derive(Error, Debug)]
27pub enum PeerError {
28    #[error("{0}")]
29    MessageError(#[from] MessageError),
30
31    #[error("Disconnected")]
32    Disconnected,
33
34    #[error("Blockchain error: {0}")]
35    BlockchainError(#[from] BlockchainError),
36
37    #[error("Transaction error: {0}")]
38    TransactionError(#[from] TransactionError),
39
40    #[error("Sync peer returned an invalid response")]
41    SyncResponseInvalid,
42
43    #[error("Could not find fork point with peer")]
44    NoForkPoint,
45
46    #[error("Block has invalid difficulty")]
47    BadBlockDifficulty,
48
49    #[error("Block has invalid block hash")]
50    BadBlockHash,
51
52    #[error("Block has no block hash attached")]
53    NoBlockHash,
54
55    #[error("Encode error: {0}")]
56    EncodeError(#[from] EncodeError),
57}
58
59/// A struct representing one peer (peer connection. Can be both a client peer or a connected peer)
60pub struct Peer {
61    pub address: SocketAddr,
62
63    // Outgoing messages waiting to be written to stream
64    send_queue: VecDeque<Message>,
65
66    // Pending requests waiting for a response (id -> oneshot sender)
67    pending: HashMap<u16, oneshot::Sender<Message>>,
68
69    // Shutdown flag
70    shutdown: bool,
71
72    seen_transactions: VecDeque<TransactionId>,
73}
74
75impl Peer {
76    /// Create a new peer
77    pub fn new(address: SocketAddr) -> Self {
78        Self {
79            address,
80            send_queue: VecDeque::new(),
81            pending: HashMap::new(),
82            shutdown: false,
83            seen_transactions: VecDeque::new(),
84        }
85    }
86
87    /// Immediately terminates the peer connection
88    pub async fn kill(peer: Arc<RwLock<Peer>>) {
89        let mut p = peer.write().await;
90        p.shutdown = true;
91        p.send_queue.clear();
92    }
93
94    /// Main connection handler
95    pub async fn connect<F>(
96        peer: Arc<RwLock<Peer>>,
97        node: Arc<RwLock<Node>>,
98        on_fail: F,
99        stream: TcpStream,
100    ) -> JoinHandle<Result<(), PeerError>>
101    where
102        F: Fn(
103                Arc<RwLock<Peer>>,
104                Arc<RwLock<Node>>,
105            ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
106            + Send
107            + Sync
108            + 'static,
109    {
110        let (mut read_stream, mut write_stream) = stream.into_split();
111
112        // Spawn peer handler task
113        tokio::spawn(async move {
114            let peer_cloned = peer.clone();
115            let node_cloned = node.clone();
116
117            // Spawn ping / pong task
118            let pinger = {
119                let peer = peer.clone();
120                let node = node.clone();
121                Box::pin(async move {
122                    loop {
123                        {
124                            let p = peer.read().await;
125                            if p.shutdown {
126                                return Ok(());
127                            }
128                        }
129                        sleep(Duration::from_secs(5)).await; // 5 second ping interval
130                        match Peer::request(
131                            // Send Ping and wait for Pong
132                            peer.clone(),
133                            Message::new(Command::Ping {
134                                height: node.read().await.blockchain.get_height(),
135                            }),
136                        )
137                        .await?
138                        .command
139                        {
140                            Command::Pong { .. } => {}
141                            _ => {}
142                        }
143                    }
144                    #[allow(unreachable_code)]
145                    Ok::<(), PeerError>(())
146                })
147            };
148
149            // Spawn reader task
150            let reader = {
151                let peer = peer.clone();
152                let node = node.clone();
153                Box::pin(async move {
154                    loop {
155                        {
156                            let p = peer.read().await;
157                            if p.shutdown {
158                                return Err(PeerError::Disconnected);
159                            }
160                        }
161                        let msg = Message::from_stream(&mut read_stream).await?;
162                        Peer::handle_incoming(peer.clone(), node.clone(), msg).await;
163                    }
164                    #[allow(unreachable_code)]
165                    Ok::<(), PeerError>(())
166                })
167            };
168
169            // Spawn writer task
170            let writer = {
171                let peer = peer.clone();
172                Box::pin(async move {
173                    loop {
174                        {
175                            let p = peer.read().await;
176                            if p.shutdown {
177                                return Err(PeerError::Disconnected);
178                            }
179                        }
180
181                        let maybe_msg = {
182                            let mut p = peer.write().await;
183                            p.send_queue.pop_front()
184                        };
185
186                        if let Some(msg) = maybe_msg {
187                            msg.send(&mut write_stream).await?;
188                        } else {
189                            sleep(Duration::from_millis(10)).await;
190                        }
191                    }
192                    #[allow(unreachable_code)]
193                    Ok::<(), PeerError>(())
194                })
195            };
196
197            // Join all tasks
198            let result = tokio::select! {
199              r = reader => r,
200              r = writer => r,
201              r = pinger => r,
202            };
203
204            if let Err(e) = result {
205                Node::log(format!(
206                    "Disconnected peer: {}:{}. Error: {:?}",
207                    peer.read().await.address.ip(),
208                    peer.read().await.address.port(),
209                    e
210                ));
211                let peer_cloned = peer_cloned.clone();
212                let node_cloned = node_cloned.clone();
213                tokio::spawn(async move {
214                    on_fail(peer_cloned, node_cloned).await;
215                });
216            }
217            Ok(())
218        })
219    }
220
221    /// Handle incoming message
222    async fn handle_incoming(peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>, message: Message) {
223        {
224            let mut p = peer.write().await;
225            if let Some(tx) = p.pending.remove(&message.id) {
226                let _ = tx.send(message);
227                return;
228            }
229        }
230
231        Peer::on_message(peer.clone(), node.clone(), message).await;
232    }
233
234    /// Handle incoming message
235    async fn on_message(peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>, message: Message) {
236        if let Err(err) = async {
237            match message.command {
238                Command::Connect => {
239                    Peer::send(peer, message.make_response(Command::AcknowledgeConnection)).await;
240                }
241                Command::AcknowledgeConnection => {
242                    Node::log(format!("Got unhandled AcknowledgeConnection"));
243                }
244                Command::Ping { height } => {
245                    let local_height = node.read().await.blockchain.get_height();
246                    Peer::send(
247                        peer.clone(),
248                        message.make_response(Command::Pong {
249                            height: local_height,
250                        }),
251                    )
252                    .await;
253
254                    // Spawn async task that will independently sync to this node if needed
255                    tokio::spawn(async move {
256                        if local_height < height {
257                            Node::log(format!("[SYNC] Starting sync!"));
258                            {
259                                let mut node = node.write().await;
260                                if node.is_syncing {
261                                    return;
262                                }
263                                node.is_syncing = true;
264                            }
265                            match sync_to_peer(node.clone(), Arc::clone(&peer), height).await {
266                                Ok(()) => {
267                                    Node::log(format!("[SYNC] Complete!"));
268                                }
269                                Err(e) => {
270                                    Node::log(format!("[SYNC] Failed to sync! {}", e.to_string()));
271                                }
272                            }
273                            node.write().await.is_syncing = false;
274                        }
275                    });
276                }
277                Command::Pong { .. } => {
278                    Node::log(format!("Got unhandled Pong"));
279                }
280                Command::GetPeers => {
281                    let peers: Vec<String> = {
282                        let node_read = node.read().await;
283                        let mut peer_addrs = Vec::new();
284                        for p in &node_read.peers {
285                            let p_addr = p.read().await.address.to_string();
286                            peer_addrs.push(p_addr);
287                        }
288                        peer_addrs
289                    };
290                    let response = message.make_response(Command::SendPeers { peers });
291                    Peer::send(peer, response).await;
292                }
293                Command::SendPeers { .. } => {
294                    Node::log(format!("Got unhandled SendPeers"));
295                }
296                Command::NewBlock { ref block } => {
297                    // Make sure block is not in the blockchain
298                    if Some(node.read().await.last_seen_block) != block.hash {
299                        Node::submit_block(node.clone(), block.clone()).await?;
300
301                        Node::log(format!(
302                            "New block accepted: {}",
303                            block.hash.unwrap().dump_base36()
304                        ));
305                    }
306                }
307                Command::NewTransaction { ref transaction } => {
308                    // Check if transaction was already seen
309                    if peer
310                        .read()
311                        .await
312                        .seen_transactions
313                        .contains(&transaction.transaction_id.unwrap())
314                    {
315                        return Ok(());
316                    }
317
318                    Node::submit_transaction(node, transaction.clone()).await?;
319
320                    Node::log(format!(
321                        "New transaction accepted: {}",
322                        transaction.transaction_id.unwrap().dump_base36()
323                    ));
324                }
325                Command::GetBlock { block_hash } => {
326                    Peer::send(
327                        peer,
328                        message.make_response(Command::GetBlockResponse {
329                            block: node.read().await.blockchain.get_block_by_hash(&block_hash),
330                        }),
331                    )
332                    .await;
333                }
334                Command::GetBlockResponse { .. } => {
335                    Node::log(format!("Got unhandled SendBlock"));
336                }
337                Command::GetBlockHashes { start, end } => {
338                    let mut block_hashes = Vec::new();
339                    for i in start..end {
340                        if let Some(block_hash) =
341                            node.read().await.blockchain.get_block_hash_by_height(i)
342                        {
343                            block_hashes.push(*block_hash);
344                        }
345                    }
346                    Peer::send(
347                        peer,
348                        message.make_response(Command::GetBlockHashesResponse { block_hashes }),
349                    )
350                    .await;
351                }
352                Command::GetBlockHashesResponse { .. } => {
353                    Node::log(format!("Got unhandled SendBlockHashes"));
354                }
355            };
356            Ok::<(), PeerError>(())
357        }
358        .await
359        {
360            Node::log(format!("Error processing incoming message: {err}"));
361        }
362    }
363
364    /// Send a request and wait for the response
365    pub async fn request(peer: Arc<RwLock<Peer>>, message: Message) -> Result<Message, PeerError> {
366        let id = message.id;
367
368        let (tx, rx) = oneshot::channel();
369
370        {
371            let mut p = peer.write().await;
372            p.pending.insert(id, tx);
373            p.send_queue.push_back(message);
374        }
375
376        match timeout(Duration::from_secs(10), rx).await {
377            Ok(Ok(msg)) => Ok(msg),
378            Ok(Err(_)) => Err(PeerError::Disconnected),
379            Err(_) => Err(PeerError::Disconnected),
380        }
381    }
382
383    /// Send a message to this peer, without expecting a response
384    pub async fn send(peer: Arc<RwLock<Peer>>, message: Message) {
385        let mut p = peer.write().await;
386        p.send_queue.push_back(message);
387    }
388
389    /// Send this message to all peers but this one
390    pub async fn send_to_peers(peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>, message: Message) {
391        for i_peer in &node.read().await.peers {
392            if !Arc::ptr_eq(&i_peer, &peer) {
393                Peer::send(Arc::clone(i_peer), message.clone()).await;
394            }
395        }
396    }
397}