snap_coin/node/
node.rs

1use futures::future::try_join_all;
2use num_bigint::BigUint;
3use std::io::Write;
4use std::{
5    fs,
6    net::SocketAddr,
7    pin::Pin,
8    str::FromStr,
9    sync::{Arc, OnceLock},
10    time::Duration,
11};
12use thiserror::Error;
13use tokio::{
14    net::TcpStream,
15    sync::RwLock,
16    task::{JoinError, JoinHandle},
17    time::sleep,
18};
19
20use crate::crypto::Hash;
21use crate::{
22    core::{
23        block::Block,
24        blockchain::{
25            Blockchain, BlockchainError, validate_block_timestamp, validate_transaction_timestamp,
26        },
27        transaction::Transaction,
28    },
29    node::{
30        mempool::MemPool,
31        message::{Command, Message},
32        peer::{Peer, PeerError},
33        server::Server,
34    },
35};
36
/// Path of the directory this node's blockchain is read from and written to.
/// It is set exactly once by `Node::new`, which panics on a second call; this enforces the
/// singleton rule that only one node can exist in one program at once.
/// `Node::log` also reads it to locate `info.log`.
static NODE_PATH: OnceLock<String> = OnceLock::new();
39
/// Errors surfaced by the node and by the background tasks it supervises.
#[derive(Error, Debug)]
pub enum NodeError {
    /// An error reported by a peer; displayed using the peer error's own message.
    #[error("{0}")]
    PeerError(#[from] PeerError),

    /// An I/O error, e.g. a failed `TcpStream::connect` while connecting to a peer.
    #[error("TCP error: {0}")]
    IOError(#[from] std::io::Error),

    /// A spawned tokio task could not be joined.
    #[error("Join error: {0}")]
    JoinError(#[from] JoinError),

    /// An error returned by the server task.
    #[error("Server error: {0}")]
    ServerError(#[from] super::server::ServerError),
}
54
/// Handles incoming connections and outbound peers
pub struct Node {
    /// Peers this node is currently connected to (seed peers plus peers discovered later).
    pub peers: Vec<Arc<RwLock<Peer>>>,
    /// The blockchain opened at the node path.
    pub blockchain: Blockchain,
    /// Transactions waiting to be included in a block.
    pub mempool: MemPool,
    /// Hash of the last block submitted through this node; starts as the all-zero hash.
    pub last_seen_block: Hash,

    // Synchronization flag; initialised to `false` (its consumers are not visible in this file)
    pub is_syncing: bool,

    /// Number of peers the background discovery task tries to reach (12 by default).
    pub target_peers: usize,

    /// Port handed to the server when the node is initialised.
    port: u32,
}
69
70impl Node {
71    /// Create a new blockchain (load / create) with default 12 nodes target
72    /// WARNING: Only one instance of this struct can exist in one program
73    pub fn new(node_path: &str, port: u32) -> Arc<RwLock<Self>> {
74        NODE_PATH
75            .set(String::from(node_path))
76            .expect("Only one node can exist at once!");
77        // Clear log file
78        if !fs::exists(node_path).expect("failed to check if blockchain dir exists") {
79            fs::create_dir(node_path).expect("Could not create blockchain directory");
80        }
81        fs::OpenOptions::new()
82            .write(true)
83            .truncate(true)
84            .create(true)
85            .open(
86                NODE_PATH
87                    .get()
88                    .expect("One blockchain instance must exist before logging")
89                    .to_owned()
90                    + "/info.log",
91            )
92            .expect("Could not open logging file!");
93        Arc::new(RwLock::new(Node {
94            peers: vec![],
95            blockchain: Blockchain::new(node_path),
96            mempool: MemPool::new(),
97            is_syncing: false,
98            target_peers: 12,
99            port,
100            last_seen_block: Hash::new_from_buf([0u8; 32]),
101        }))
102    }
103
104    /// Connect to a specified peer
105    async fn connect_peer(
106        node: Arc<RwLock<Node>>,
107        address: SocketAddr,
108    ) -> Result<(Arc<RwLock<Peer>>, JoinHandle<Result<(), PeerError>>), NodeError> {
109        let peer = Arc::new(RwLock::new(Peer::new(address)));
110        let stream = TcpStream::connect(address).await?;
111
112        let on_fail = |peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>| {
113            Box::pin(async move {
114                Peer::kill(peer.clone()).await;
115                let peer_address = peer.read().await.address;
116
117                let mut node_peers = node.write().await;
118
119                let mut new_peers = Vec::new();
120                for p in node_peers.peers.drain(..) {
121                    let p_address = p.read().await.address;
122                    if p_address != peer_address {
123                        new_peers.push(p);
124                    }
125                }
126
127                node_peers.peers = new_peers;
128            }) as Pin<Box<dyn futures::Future<Output = ()> + Send + 'static>>
129        };
130        let handle = Peer::connect(peer.clone(), node, on_fail, stream).await;
131
132        Ok((peer, handle))
133    }
134
135    /// Initialize this node, with a array of seed nodes which this node will use to connect to
136    /// Starts all handlers
137    /// WARNING: Can only be called once
138    pub async fn init(
139        node: Arc<RwLock<Node>>,
140        seed_nodes: Vec<SocketAddr>,
141    ) -> Result<JoinHandle<Result<(), NodeError>>, NodeError> {
142        let mut peer_handles = Vec::new();
143        let mut peers = Vec::new();
144
145        for addr in seed_nodes {
146            let (peer, handle) = Self::connect_peer(node.clone(), addr).await?;
147            peers.push(peer);
148            peer_handles.push(handle);
149        }
150
151        node.write().await.peers = peers;
152
153        let server_handle: JoinHandle<Result<(), super::server::ServerError>> =
154            Server.init(node.clone(), node.read().await.port).await;
155
156        let node = node.clone();
157        let auto_peer = tokio::spawn(async move {
158            loop {
159                // wait before next peer fetch
160                sleep(Duration::from_secs(30)).await;
161
162                // pull a snapshot of peer list and config outside the lock
163                let (peers_snapshot, target_peers) = {
164                    let guard = node.read().await;
165                    (guard.peers.clone(), guard.target_peers)
166                };
167
168                // do we need more peers?
169                if peers_snapshot.len() < target_peers {
170                    // pick a known peer to ask for more peers
171                    if let Some(fetch_peer) = peers_snapshot.get(0) {
172                        // request peers without holding any lock
173                        let response =
174                            Peer::request(fetch_peer.clone(), Message::new(Command::GetPeers))
175                                .await;
176
177                        // request failed?
178                        let Ok(response) = response else {
179                            Node::log(format!(
180                                "Could not request peers from {}",
181                                fetch_peer.read().await.address,
182                            ));
183                            continue;
184                        };
185
186                        match response.command {
187                            Command::SendPeers { peers } => {
188                                for peer_str in peers {
189                                    // parse address
190                                    let addr = match SocketAddr::from_str(&peer_str) {
191                                        Ok(a) => a,
192                                        Err(_) => {
193                                            Node::log(format!("Fetched peer had invalid address"));
194                                            continue;
195                                        }
196                                    };
197
198                                    // Check if already connected
199                                    let exists = {
200                                        let mut exists = false;
201                                        let guard = node.read().await;
202                                        for p in &guard.peers {
203                                            if p.read().await.address.ip() == addr.ip()
204                                                && p.read().await.address.port() == addr.port()
205                                            {
206                                                exists = true;
207                                                break;
208                                            }
209                                        }
210                                        exists
211                                    };
212
213                                    if exists {
214                                        continue;
215                                    }
216
217                                    // Connect to fetched peer
218                                    let new_peer = Node::connect_peer(node.clone(), addr).await;
219                                    let Ok((peer_arc, _)) = new_peer else {
220                                        continue;
221                                    };
222
223                                    // Add to our known peers
224                                    {
225                                        let mut guard = node.write().await;
226                                        guard.peers.push(peer_arc);
227                                    }
228
229                                    Node::log(format!(
230                                        "Connected to new peer (referred by {})",
231                                        fetch_peer.read().await.address
232                                    ));
233
234                                    // Re-check peer count
235                                    let peer_count = {
236                                        let guard = node.read().await;
237                                        guard.peers.len()
238                                    };
239
240                                    if peer_count >= target_peers {
241                                        break;
242                                    }
243                                }
244                            }
245                            _ => {}
246                        }
247                    }
248                }
249            }
250
251            #[allow(unused)]
252            Ok::<(), NodeError>(())
253        });
254
255        let all_handle = tokio::spawn(async move {
256            let auto_peer_error = match auto_peer.await {
257                Ok(Ok(_)) => Ok(()),
258                Ok(Err(e)) => Err(e),
259                Err(e) => Err(NodeError::JoinError(e)),
260            };
261            if auto_peer_error.is_err() {
262                return Err(auto_peer_error.err().unwrap());
263            }
264
265            // Run all peer connections concurrently
266            if let Err(join_err) = try_join_all(peer_handles).await {
267                return Err(NodeError::JoinError(join_err));
268            }
269
270            // Await server result
271            match server_handle.await {
272                Ok(Ok(())) => Ok(()),
273                Ok(Err(e)) => Err(NodeError::ServerError(e)),
274                Err(e) => Err(NodeError::JoinError(e)),
275            }
276        });
277
278        Ok(all_handle)
279    }
280
281    /// Send some message to all peers
282    pub async fn send_to_peers(node: Arc<RwLock<Node>>, message: Message) {
283        for i_peer in &node.read().await.peers {
284            Peer::send(Arc::clone(i_peer), message.clone()).await;
285        }
286    }
287
288    /// Submit a new block to the network
289    pub async fn submit_block(
290        node: Arc<RwLock<Node>>,
291        new_block: Block,
292    ) -> Result<(), BlockchainError> {
293        node.write().await.blockchain.add_block(new_block.clone())?;
294        validate_block_timestamp(&new_block)?;
295
296        // Remove transactions from mempool
297        node.write()
298            .await
299            .mempool
300            .spend_transactions(
301                new_block
302                    .transactions
303                    .iter()
304                    .map(|block_transaction| block_transaction.transaction_id.unwrap())
305                    .collect(),
306            )
307            .await;
308
309        Node::send_to_peers(
310            node.clone(),
311            Message::new(Command::NewBlock {
312                block: new_block.clone(),
313            }),
314        )
315        .await;
316        {
317            node.write().await.last_seen_block = new_block.hash.unwrap();
318        }
319        Ok(())
320    }
321
322    /// Submit a new transaction to the network to be mined
323    pub async fn submit_transaction(
324        node: Arc<RwLock<Node>>,
325        new_transaction: Transaction,
326    ) -> Result<(), BlockchainError> {
327        let tx_difficulty =
328            BigUint::from_bytes_be(&node.read().await.blockchain.get_transaction_difficulty());
329
330        node.read()
331            .await
332            .blockchain
333            .get_utxos()
334            .validate_transaction(&new_transaction.clone(), &tx_difficulty)?;
335
336        if !node
337            .read()
338            .await
339            .mempool
340            .validate_transaction(&new_transaction)
341            .await
342        {
343            return Err(BlockchainError::DoubleSpend);
344        }
345        validate_transaction_timestamp(&new_transaction)?;
346
347        node.write()
348            .await
349            .mempool
350            .add_transaction(new_transaction.clone())
351            .await;
352
353        Node::send_to_peers(
354            node.clone(),
355            Message::new(Command::NewTransaction {
356                transaction: new_transaction.clone(),
357            }),
358        )
359        .await;
360        Node::log(format!(
361            "Submitting new tx {}",
362            new_transaction.transaction_id.unwrap().dump_base36()
363        ));
364        Ok(())
365    }
366
367    /// Log a message to the node log
368    pub fn log(msg: String) {
369        let mut log_file = fs::OpenOptions::new()
370            .append(true)
371            .create(true)
372            .open(
373                NODE_PATH
374                    .get()
375                    .expect("One blockchain instance must exist before logging")
376                    .to_owned()
377                    + "/info.log",
378            )
379            .expect("Could not open logging file!");
380        writeln!(
381            log_file,
382            "[{}] {}",
383            chrono::Local::now().format("%Y-%m-%d %H:%M:%S"),
384            msg
385        )
386        .expect("Failed to write to logging file");
387    }
388}