snap_coin/node/
node.rs

1use futures::future::try_join_all;
2use num_bigint::BigUint;
3use std::io::Write;
4use std::{
5    fs,
6    net::SocketAddr,
7    pin::Pin,
8    str::FromStr,
9    sync::{Arc, OnceLock},
10    time::Duration,
11};
12use thiserror::Error;
13use tokio::{
14    net::TcpStream,
15    sync::RwLock,
16    task::{JoinError, JoinHandle},
17    time::sleep,
18};
19
20use crate::crypto::Hash;
21use crate::{
22    core::{
23        block::Block,
24        blockchain::{
25            Blockchain, BlockchainError, validate_block_timestamp, validate_transaction_timestamp,
26        },
27        transaction::Transaction,
28    },
29    node::{
30        mempool::MemPool,
31        message::{Command, Message},
32        peer::{Peer, PeerError},
33        server::Server,
34    },
35};
36
/// Directory this node's blockchain is read from and written to; the node log
/// (`info.log`) is kept in the same directory.
///
/// Stored in a `OnceLock`, so it can be set only once per process. This enforces a
/// singleton rule where only one node can exist in one program at once.
static NODE_PATH: OnceLock<String> = OnceLock::new();
39
/// Errors surfaced by [`Node`] operations.
#[derive(Error, Debug)]
pub enum NodeError {
    /// An error reported by a peer; displayed with the peer error's own message.
    #[error("{0}")]
    PeerError(#[from] PeerError),

    /// An I/O error. In this file it is produced when an outbound TCP connection fails.
    #[error("TCP error: {0}")]
    IOError(#[from] std::io::Error),

    /// A spawned tokio task could not be joined.
    #[error("Join error: {0}")]
    JoinError(#[from] JoinError),

    /// An error returned by the server task.
    #[error("Server error: {0}")]
    ServerError(#[from] super::server::ServerError),
}
54
/// Handles incoming connections and outbound peers
pub struct Node {
    /// Peers this node currently tracks (seed peers are stored here by `init`,
    /// failed peers are removed by the failure callback set up in `connect_peer`).
    pub peers: Vec<Arc<RwLock<Peer>>>,
    /// Chain state, created from the node path passed to `new`.
    pub blockchain: Blockchain,
    /// Pending transactions: filled by `submit_transaction`, pruned by `submit_block`.
    pub mempool: MemPool,
    /// Hash of the last block submitted through `submit_block`; all zero bytes on a
    /// freshly created node.
    // NOTE(review): presumably also updated by peer message handlers — not visible here.
    pub last_seen_block: Hash,

    // Synchronization flag (starts as `false`; nothing in this file reads or sets it
    // after construction — presumably used by the peer/server code, TODO confirm)
    pub is_syncing: bool,

    /// Peer count below which the background task spawned by `init` asks known
    /// peers for more peers. Defaults to 12.
    pub target_peers: usize,

    /// Port handed to the server when `init` starts it.
    // NOTE(review): TCP ports fit in `u16`; this is `u32`, so out-of-range values are
    // representable — confirm how the server handles them.
    port: u32,
}
69
70impl Node {
71    /// Create a new blockchain (load / create) with default 12 nodes target
72    /// WARNING: Only one instance of this struct can exist in one program
73    pub fn new(node_path: &str, port: u32) -> Arc<RwLock<Self>> {
74        NODE_PATH
75            .set(String::from(node_path))
76            .expect("Only one node can exist at once!");
77        // Clear log file
78        if !fs::exists(node_path).expect("failed to check if blockchain dir exists") {
79            fs::create_dir(node_path).expect("Could not create blockchain directory");
80        }
81        fs::OpenOptions::new()
82            .write(true)
83            .truncate(true)
84            .create(true)
85            .open(
86                NODE_PATH
87                    .get()
88                    .expect("One blockchain instance must exist before logging")
89                    .to_owned()
90                    + "/info.log",
91            )
92            .expect("Could not open logging file!");
93        Arc::new(RwLock::new(Node {
94            peers: vec![],
95            blockchain: Blockchain::new(node_path),
96            mempool: MemPool::new(),
97            is_syncing: false,
98            target_peers: 12,
99            port,
100            last_seen_block: Hash::new_from_buf([0u8; 32]),
101        }))
102    }
103
104    /// Connect to a specified peer
105    async fn connect_peer(
106        node: Arc<RwLock<Node>>,
107        address: SocketAddr,
108    ) -> Result<(Arc<RwLock<Peer>>, JoinHandle<Result<(), PeerError>>), NodeError> {
109        let peer = Arc::new(RwLock::new(Peer::new(address)));
110        let stream = TcpStream::connect(address).await?;
111
112        let on_fail = |peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>| {
113            Box::pin(async move {
114                Peer::kill(peer.clone()).await;
115                let peer_address = peer.read().await.address;
116
117                let mut node_peers = node.write().await;
118
119                let mut new_peers = Vec::new();
120                for p in node_peers.peers.drain(..) {
121                    let p_address = p.read().await.address;
122                    if p_address != peer_address {
123                        new_peers.push(p);
124                    }
125                }
126
127                node_peers.peers = new_peers;
128            }) as Pin<Box<dyn futures::Future<Output = ()> + Send + 'static>>
129        };
130        let handle = Peer::connect(peer.clone(), node, on_fail, stream).await;
131
132        Ok((peer, handle))
133    }
134
135    /// Initialize this node, with a array of seed nodes which this node will use to connect to
136    /// Starts all handlers
137    /// WARNING: Can only be called once
138    pub async fn init(
139        node: Arc<RwLock<Node>>,
140        seed_nodes: Vec<SocketAddr>,
141    ) -> Result<JoinHandle<Result<(), NodeError>>, NodeError> {
142        let mut peer_handles = Vec::new();
143        let mut peers = Vec::new();
144
145        for addr in seed_nodes {
146            let (peer, handle) = Self::connect_peer(node.clone(), addr).await?;
147            peers.push(peer);
148            peer_handles.push(handle);
149        }
150
151        node.write().await.peers = peers;
152
153        let server_handle: JoinHandle<Result<(), super::server::ServerError>> =
154            Server.init(node.clone(), node.read().await.port).await;
155
156        let node = node.clone();
157        let auto_peer = tokio::spawn(async move {
158            loop {
159                // wait before next peer fetch
160                sleep(Duration::from_secs(30)).await;
161
162                // pull a snapshot of peer list and config outside the lock
163                let (peers_snapshot, target_peers) = {
164                    let guard = node.read().await;
165                    (guard.peers.clone(), guard.target_peers)
166                };
167
168                // do we need more peers?
169                if peers_snapshot.len() < target_peers {
170                    // pick a known peer to ask for more peers
171                    if let Some(fetch_peer) = peers_snapshot.get(0) {
172                        // request peers without holding any lock
173                        let response =
174                            Peer::request(fetch_peer.clone(), Message::new(Command::GetPeers))
175                                .await;
176
177                        // request failed?
178                        let Ok(response) = response else {
179                            Node::log(format!(
180                                "Could not request peers from {}",
181                                fetch_peer.read().await.address,
182                            ));
183                            continue;
184                        };
185
186                        match response.command {
187                            Command::SendPeers { peers } => {
188                                for peer_str in peers {
189                                    // parse address
190                                    let addr = match SocketAddr::from_str(&peer_str) {
191                                        Ok(a) => a,
192                                        Err(_) => {
193                                            Node::log(format!("Fetched peer had invalid address"));
194                                            continue;
195                                        }
196                                    };
197
198                                    // Check if already connected
199                                    let exists = {
200                                        let mut exists = false;
201                                        let guard = node.read().await;
202                                        for p in &guard.peers {
203                                            if p.read().await.address.ip() == addr.ip()
204                                                && p.read().await.address.port() == addr.port()
205                                            {
206                                                exists = true;
207                                                break;
208                                            }
209                                        }
210                                        exists
211                                    };
212
213                                    if exists {
214                                        continue;
215                                    }
216
217                                    // Connect to fetched peer
218                                    let new_peer = Node::connect_peer(node.clone(), addr).await;
219                                    match new_peer {
220                                        Ok(..) => {}
221                                        Err(..) => {
222                                            continue;
223                                        }
224                                    }
225
226                                    Node::log(format!(
227                                        "Connected to new peer (referred by {})",
228                                        fetch_peer.read().await.address
229                                    ));
230
231                                    // Re-check peer count
232                                    let peer_count = {
233                                        let guard = node.read().await;
234                                        guard.peers.len()
235                                    };
236
237                                    if peer_count >= target_peers {
238                                        break;
239                                    }
240                                }
241                            }
242                            _ => {}
243                        }
244                    }
245                }
246            }
247
248            #[allow(unused)]
249            Ok::<(), NodeError>(())
250        });
251
252        let all_handle = tokio::spawn(async move {
253            let auto_peer_error = match auto_peer.await {
254                Ok(Ok(_)) => Ok(()),
255                Ok(Err(e)) => Err(e),
256                Err(e) => Err(NodeError::JoinError(e)),
257            };
258            if auto_peer_error.is_err() {
259                return Err(auto_peer_error.err().unwrap());
260            }
261
262            // Run all peer connections concurrently
263            if let Err(join_err) = try_join_all(peer_handles).await {
264                return Err(NodeError::JoinError(join_err));
265            }
266
267            // Await server result
268            match server_handle.await {
269                Ok(Ok(())) => Ok(()),
270                Ok(Err(e)) => Err(NodeError::ServerError(e)),
271                Err(e) => Err(NodeError::JoinError(e)),
272            }
273        });
274
275        Ok(all_handle)
276    }
277
278    /// Send some message to all peers
279    pub async fn send_to_peers(node: Arc<RwLock<Node>>, message: Message) {
280        for i_peer in &node.read().await.peers {
281            Peer::send(Arc::clone(i_peer), message.clone()).await;
282        }
283    }
284
285    /// Submit a new block to the network
286    pub async fn submit_block(
287        node: Arc<RwLock<Node>>,
288        new_block: Block,
289    ) -> Result<(), BlockchainError> {
290        node.write().await.blockchain.add_block(new_block.clone())?;
291        validate_block_timestamp(&new_block)?;
292
293        // Remove transactions from mempool
294        node.write()
295            .await
296            .mempool
297            .spend_transactions(
298                new_block
299                    .transactions
300                    .iter()
301                    .map(|block_transaction| block_transaction.transaction_id.unwrap())
302                    .collect(),
303            )
304            .await;
305
306        Node::send_to_peers(
307            node.clone(),
308            Message::new(Command::NewBlock {
309                block: new_block.clone(),
310            }),
311        )
312        .await;
313        {
314            node.write().await.last_seen_block = new_block.hash.unwrap();
315        }
316        Ok(())
317    }
318
319    /// Submit a new transaction to the network to be mined
320    pub async fn submit_transaction(
321        node: Arc<RwLock<Node>>,
322        new_transaction: Transaction,
323    ) -> Result<(), BlockchainError> {
324        let tx_difficulty =
325            BigUint::from_bytes_be(&node.read().await.blockchain.get_transaction_difficulty());
326
327        node.read()
328            .await
329            .blockchain
330            .get_utxos()
331            .validate_transaction(&new_transaction.clone(), &tx_difficulty)?;
332
333        if !node
334            .read()
335            .await
336            .mempool
337            .validate_transaction(&new_transaction)
338            .await
339        {
340            return Err(BlockchainError::DoubleSpend);
341        }
342        validate_transaction_timestamp(&new_transaction)?;
343
344        node.write()
345            .await
346            .mempool
347            .add_transaction(new_transaction.clone())
348            .await;
349
350        Node::send_to_peers(
351            node.clone(),
352            Message::new(Command::NewTransaction {
353                transaction: new_transaction.clone(),
354            }),
355        )
356        .await;
357        Node::log(format!(
358            "Submitting new tx {}",
359            new_transaction.transaction_id.unwrap().dump_base36()
360        ));
361        Ok(())
362    }
363
364    /// Log a message to the node log
365    pub fn log(msg: String) {
366        let mut log_file = fs::OpenOptions::new()
367            .append(true)
368            .create(true)
369            .open(
370                NODE_PATH
371                    .get()
372                    .expect("One blockchain instance must exist before logging")
373                    .to_owned()
374                    + "/info.log",
375            )
376            .expect("Could not open logging file!");
377        writeln!(
378            log_file,
379            "[{}] {}",
380            chrono::Local::now().format("%Y-%m-%d %H:%M:%S"),
381            msg
382        )
383        .expect("Failed to write to logging file");
384    }
385}