snap_coin/node/
peer.rs

1use bincode::error::EncodeError;
2use std::{
3    collections::{HashMap, VecDeque},
4    net::SocketAddr,
5    sync::Arc,
6    time::Duration,
7};
8use thiserror::Error;
9use tokio::{
10    net::TcpStream,
11    sync::{RwLock, oneshot},
12    task::JoinHandle,
13    time::{sleep, timeout},
14};
15
16use crate::{
17    core::{blockchain::BlockchainError, utxo::TransactionError},
18    node::{
19        message::{Command, Message, MessageError},
20        node::Node,
21        sync::sync_to_peer,
22    },
23};
24
/// Errors surfaced by peer connection handling and by message processing in this module.
///
/// Several variants are never constructed in this file; they are presumably raised by the
/// sync / validation code that shares this error type (confirm against those modules).
#[derive(Error, Debug)]
pub enum PeerError {
    /// Wraps a [`MessageError`]; displayed using the inner error's own text.
    #[error("{0}")]
    MessageError(#[from] MessageError),

    /// The peer is treated as gone. In this file it is returned when handling an incoming
    /// message or writing an outgoing one exceeds [`TIMEOUT`], and when a request receives
    /// no response (timeout or dropped response channel).
    #[error("Disconnected")]
    Disconnected,

    /// Wraps a [`BlockchainError`].
    #[error("Blockchain error: {0}")]
    BlockchainError(#[from] BlockchainError),

    /// Wraps a [`TransactionError`].
    #[error("Transaction error: {0}")]
    TransactionError(#[from] TransactionError),

    /// A sync peer answered with something other than the expected response.
    /// Not constructed in this file.
    #[error("Sync peer returned an invalid response")]
    SyncResponseInvalid,

    /// No common fork point could be found with the peer. Not constructed in this file.
    #[error("Could not find fork point with peer")]
    NoForkPoint,

    /// A block carried an invalid difficulty. Not constructed in this file.
    #[error("Block has invalid difficulty")]
    BadBlockDifficulty,

    /// A block's hash was invalid. Not constructed in this file.
    #[error("Block has invalid block hash")]
    BadBlockHash,

    /// A block had no hash attached. Not constructed in this file.
    #[error("Block has no block hash attached")]
    NoBlockHash,

    /// Wraps a bincode [`EncodeError`].
    #[error("Encode error: {0}")]
    EncodeError(#[from] EncodeError),
}
57
/// Upper bound applied in [`Peer::connect`] to handling one incoming message and to writing one
/// outgoing message; exceeding it ends the connection with [`PeerError::Disconnected`].
///
/// Note: [`Peer::request`] does not use this constant; it waits with its own 10 second literal.
pub const TIMEOUT: Duration = Duration::from_secs(15);
59
/// One peer connection as tracked by the node, together with its outbound message queue and
/// the requests that are still waiting for a reply.
///
/// A `Peer` can represent either side of a connection; see `is_client`.
pub struct Peer {
    /// Socket address of the peer. `on_fail` uses it to decide which entries to drop from the
    /// node's peer list.
    pub address: SocketAddr,

    /// Peers with this flag set are left out of `GetPeers` replies.
    /// NOTE(review): presumably marks connections that are clients rather than reachable
    /// nodes — confirm against the code that constructs peers.
    pub is_client: bool,

    // Outgoing messages waiting to be written to the stream (drained front-first by the
    // writer loop in `connect`)
    send_queue: VecDeque<Message>,

    // Pending requests waiting for a response (message id -> oneshot sender); filled by
    // `request`, resolved by `handle_incoming`
    pending: HashMap<u16, oneshot::Sender<Message>>,
}
72
73impl Peer {
74    /// Create a new peer
75    pub fn new(address: SocketAddr, is_client: bool) -> Self {
76        Self {
77            address,
78            is_client,
79            send_queue: VecDeque::new(),
80            pending: HashMap::new(),
81        }
82    }
83
84    async fn on_fail(peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>) {
85        let peer_address = peer.read().await.address;
86
87        let mut node_peers = node.write().await;
88
89        let mut new_peers = Vec::new();
90        for p in node_peers.peers.drain(..) {
91            let p_address = p.read().await.address;
92            if p_address != peer_address {
93                new_peers.push(p);
94            }
95        }
96
97        node_peers.peers = new_peers;
98    }
99
100    /// Main connection handler
101    pub async fn connect(
102        peer: Arc<RwLock<Peer>>,
103        node: Arc<RwLock<Node>>,
104        stream: TcpStream,
105    ) -> JoinHandle<Result<(), PeerError>> {
106        let (mut read_stream, mut write_stream) = stream.into_split();
107
108        // Spawn peer handler task
109        tokio::spawn(async move {
110            let peer_cloned = peer.clone();
111            let node_cloned = node.clone();
112
113            // Spawn ping / pong task
114            let pinger = {
115                let peer = peer.clone();
116                let node = node.clone();
117                Box::pin(async move {
118                    loop {
119                        let height = node.read().await.blockchain.get_height();
120                        match Peer::request(
121                            // Send Ping and wait for Pong
122                            peer.clone(),
123                            Message::new(Command::Ping {
124                                height,
125                            }),
126                        )
127                        .await?
128                        .command
129                        {
130                            Command::Pong { .. } => {}
131                            _ => {}
132                        }
133                        sleep(Duration::from_secs(5)).await; // 5 second ping interval
134                    }
135                    #[allow(unreachable_code)]
136                    Ok::<(), PeerError>(())
137                })
138            };
139
140            // Spawn reader task
141            let reader = {
142                let peer = peer.clone();
143                let node = node.clone();
144                Box::pin(async move {
145                    loop {
146                        let msg = Message::from_stream(&mut read_stream).await?;
147                        match timeout(
148                            TIMEOUT,
149                            Peer::handle_incoming(peer.clone(), node.clone(), msg),
150                        )
151                        .await
152                        {
153                            Ok(()) => {}
154                            Err(..) => return Err(PeerError::Disconnected),
155                        }
156                    }
157                    #[allow(unreachable_code)]
158                    Ok::<(), PeerError>(())
159                })
160            };
161
162            // Spawn writer task
163            let writer = {
164                let peer = peer.clone();
165                Box::pin(async move {
166                    loop {
167                        let maybe_msg = {
168                            let mut p = peer.write().await;
169                            p.send_queue.pop_front()
170                        };
171
172                        if let Some(msg) = maybe_msg {
173                            match timeout(TIMEOUT, msg.send(&mut write_stream)).await {
174                                Ok(e) => e?,
175                                Err(..) => return Err(PeerError::Disconnected),
176                            }
177                        } else {
178                            sleep(Duration::from_millis(10)).await;
179                        }
180                    }
181                    #[allow(unreachable_code)]
182                    Ok::<(), PeerError>(())
183                })
184            };
185
186            // Join all tasks
187            let result = tokio::select! {
188              r = reader => r,
189              r = writer => r,
190              r = pinger => r,
191            };
192
193            if let Err(e) = result {
194                Node::log(format!(
195                    "Disconnected peer: {}:{}. Error: {:?}",
196                    peer.read().await.address.ip(),
197                    peer.read().await.address.port(),
198                    e
199                ));
200                let peer_cloned = peer_cloned.clone();
201                let node_cloned = node_cloned.clone();
202
203                tokio::spawn(async move {
204                    Self::on_fail(peer_cloned, node_cloned).await;
205                });
206            }
207            Ok(())
208        })
209    }
210
211    /// Handle incoming message
212    async fn handle_incoming(peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>, message: Message) {
213        {
214            let mut p = peer.write().await;
215            if let Some(tx) = p.pending.remove(&message.id) {
216                let _ = tx.send(message);
217                return;
218            }
219        }
220
221        Peer::on_message(peer.clone(), node.clone(), message).await;
222    }
223
224    /// Handle incoming message
225    async fn on_message(peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>, message: Message) {
226        if let Err(err) = async {
227            match message.command {
228                Command::Connect => {
229                    Peer::send(peer, message.make_response(Command::AcknowledgeConnection)).await;
230                }
231                Command::AcknowledgeConnection => {
232                    Node::log(format!("Got unhandled AcknowledgeConnection"));
233                }
234                Command::Ping { height } => {
235                    let local_height = node.read().await.blockchain.get_height();
236                    Peer::send(
237                        peer.clone(),
238                        message.make_response(Command::Pong {
239                            height: local_height,
240                        }),
241                    )
242                    .await;
243
244                    // Spawn async task that will independently sync to this node if needed
245                    tokio::spawn(async move {
246                        if local_height < height {
247                            Node::log(format!("[SYNC] Starting sync!"));
248                            {
249                                let mut node = node.write().await;
250                                if node.is_syncing {
251                                    return;
252                                }
253                                node.is_syncing = true;
254                            }
255                            match sync_to_peer(node.clone(), Arc::clone(&peer), height).await {
256                                Ok(()) => {
257                                    Node::log(format!("[SYNC] Complete!"));
258                                }
259                                Err(e) => {
260                                    Node::log(format!("[SYNC] Failed to sync! {}", e.to_string()));
261                                }
262                            }
263                            node.write().await.is_syncing = false;
264                        }
265                    });
266                }
267                Command::Pong { .. } => {
268                    Node::log(format!("Got unhandled Pong"));
269                }
270                Command::GetPeers => {
271                    let peers: Vec<String> = {
272                        let node_read = node.read().await;
273                        let mut peer_addrs = Vec::new();
274                        for p in &node_read.peers {
275                            if p.read().await.is_client {
276                                continue;
277                            }
278                            let p_addr = p.read().await.address.to_string();
279                            peer_addrs.push(p_addr);
280                        }
281                        peer_addrs
282                    };
283                    let response = message.make_response(Command::SendPeers { peers });
284                    Peer::send(peer, response).await;
285                }
286                Command::SendPeers { .. } => {
287                    Node::log(format!("Got unhandled SendPeers"));
288                }
289                Command::NewBlock { ref block } => {
290                    // Make sure block is not in the blockchain
291                    if Some(node.read().await.last_seen_block) != block.hash {
292                        Node::submit_block(node.clone(), block.clone()).await?;
293
294                        Node::log(format!(
295                            "New block accepted: {}",
296                            block.hash.unwrap().dump_base36()
297                        ));
298                    }
299                }
300                Command::NewTransaction { ref transaction } => {
301                    // Check if transaction was already seen
302                    if !node.read().await.mempool.validate_transaction(transaction).await {
303                        return Ok(());
304                    }
305
306                    Node::submit_transaction(node, transaction.clone()).await?;
307
308                    Node::log(format!(
309                        "New transaction accepted: {}",
310                        transaction.transaction_id.unwrap().dump_base36()
311                    ));
312                }
313                Command::GetBlock { block_hash } => {
314                    Peer::send(
315                        peer,
316                        message.make_response(Command::GetBlockResponse {
317                            block: node.read().await.blockchain.get_block_by_hash(&block_hash),
318                        }),
319                    )
320                    .await;
321                }
322                Command::GetBlockResponse { .. } => {
323                    Node::log(format!("Got unhandled SendBlock"));
324                }
325                Command::GetBlockHashes { start, end } => {
326                    let mut block_hashes = Vec::new();
327                    for i in start..end {
328                        if let Some(block_hash) =
329                            node.read().await.blockchain.get_block_hash_by_height(i)
330                        {
331                            block_hashes.push(*block_hash);
332                        }
333                    }
334                    Peer::send(
335                        peer,
336                        message.make_response(Command::GetBlockHashesResponse { block_hashes }),
337                    )
338                    .await;
339                }
340                Command::GetBlockHashesResponse { .. } => {
341                    Node::log(format!("Got unhandled SendBlockHashes"));
342                }
343            };
344            Ok::<(), PeerError>(())
345        }
346        .await
347        {
348            Node::log(format!("Error processing incoming message: {err}"));
349        }
350    }
351
352    /// Send a request and wait for the response
353    pub async fn request(peer: Arc<RwLock<Peer>>, message: Message) -> Result<Message, PeerError> {
354        let id = message.id;
355
356        let (tx, rx) = oneshot::channel();
357
358        {
359            let mut p = peer.write().await;
360            p.pending.insert(id, tx);
361            p.send_queue.push_back(message);
362        }
363
364        match timeout(Duration::from_secs(10), rx).await {
365            Ok(Ok(msg)) => Ok(msg),
366            Ok(Err(_)) => Err(PeerError::Disconnected),
367            Err(_) => Err(PeerError::Disconnected),
368        }
369    }
370
371    /// Send a message to this peer, without expecting a response
372    pub async fn send(peer: Arc<RwLock<Peer>>, message: Message) {
373        let mut p = peer.write().await;
374        p.send_queue.push_back(message);
375    }
376
377    /// Send this message to all peers but this one
378    pub async fn send_to_peers(node: Arc<RwLock<Node>>, message: Message) {
379        // clone the peer list while holding the lock, then drop the lock
380        let peers = {
381            let guard = node.read().await;
382            guard.peers.clone()
383        };
384
385        for peer in peers {
386            // now safe to await
387            Peer::send(peer, message.clone()).await;
388        }
389    }
390}