snap_coin/node/
node.rs

1use num_bigint::BigUint;
2use std::io::{Read, Write};
3use std::{
4    fs,
5    net::SocketAddr,
6    sync::{Arc, OnceLock},
7};
8use thiserror::Error;
9use tokio::net::TcpStream;
10use tokio::{
11    sync::RwLock,
12    task::{JoinError, JoinHandle},
13};
14
15use crate::crypto::Hash;
16use crate::node::auto_peer::auto_peer;
17use crate::node::server::ServerError;
18use crate::{
19    core::{
20        block::Block,
21        blockchain::{
22            Blockchain, BlockchainError, validate_block_timestamp, validate_transaction_timestamp,
23        },
24        transaction::Transaction,
25    },
26    node::{
27        mempool::MemPool,
28        message::{Command, Message},
29        peer::{Peer, PeerError},
30        server::Server,
31    },
32};
33
/// Directory that this node's blockchain data and `info.log` are read from and written to.
/// It can be set only once (`Node::new` panics on a second attempt), which enforces a singleton
/// rule: only one node can exist in one program at a time
static NODE_PATH: OnceLock<String> = OnceLock::new();
36
37#[derive(Error, Debug)]
38pub enum NodeError {
39    #[error("{0}")]
40    PeerError(#[from] PeerError),
41
42    #[error("TCP error: {0}")]
43    IOError(#[from] std::io::Error),
44
45    #[error("Join error: {0}")]
46    JoinError(#[from] JoinError),
47
48    #[error("Server error: {0}")]
49    ServerError(#[from] super::server::ServerError),
50}
51
/// Handles incoming connections and outbound peers
///
/// Instances are created by `Node::new`, which hands the node out as `Arc<RwLock<Node>>`.
pub struct Node {
    /// Handles to the peers of this node; `Node::init` fills this with the seed node connections
    pub peers: Vec<Arc<RwLock<Peer>>>,
    /// Blockchain state, created from the node path in `Node::new`
    pub blockchain: Blockchain,
    /// Pool of transactions added by `submit_transaction` and spent by `submit_block`
    pub mempool: MemPool,
    /// Hash of the last block accepted by `submit_block`; all zero bytes on a fresh node
    pub last_seen_block: Hash,

    /// Synchronization flag, `false` on a fresh node
    pub is_syncing: bool,

    /// Number of peers the node aims for (12 by default)
    // NOTE(review): presumably consumed by the auto-peering task - confirm against `auto_peer`
    pub target_peers: usize,

    /// Port handed to the server when `Node::init` starts it
    pub port: u16,
}
66
67impl Node {
68    /// Create a new blockchain (load / create) with default 12 nodes target
69    /// WARNING: Only one instance of this struct can exist in one program
70    pub fn new(node_path: &str, port: u16) -> Arc<RwLock<Self>> {
71        NODE_PATH
72            .set(String::from(node_path))
73            .expect("Only one node can exist at once!");
74        // Clear log file
75        if !fs::exists(node_path).expect("failed to check if blockchain dir exists") {
76            fs::create_dir(node_path).expect("Could not create blockchain directory");
77        }
78        fs::OpenOptions::new()
79            .write(true)
80            .truncate(true)
81            .create(true)
82            .open(
83                NODE_PATH
84                    .get()
85                    .expect("One blockchain instance must exist before logging")
86                    .to_owned()
87                    + "/info.log",
88            )
89            .expect("Could not open logging file!");
90        Arc::new(RwLock::new(Node {
91            peers: vec![],
92            blockchain: Blockchain::new(node_path),
93            mempool: MemPool::new(),
94            is_syncing: false,
95            target_peers: 12,
96            port,
97            last_seen_block: Hash::new_from_buf([0u8; 32]),
98        }))
99    }
100
101    /// Connect to a specified peer
102    pub async fn connect_peer(
103        node: Arc<RwLock<Node>>,
104        address: SocketAddr,
105    ) -> Result<(Arc<RwLock<Peer>>, JoinHandle<Result<(), PeerError>>), NodeError> {
106        let peer: Arc<RwLock<Peer>> = Arc::new(RwLock::new(Peer::new(address, false)));
107        let stream = TcpStream::connect(address).await?;
108
109        let handle = Peer::connect(peer.clone(), node, stream).await;
110
111        Ok((peer, handle))
112    }
113
114    /// Initialize this node, with a array of seed nodes which this node will use to connect to
115    /// Starts all handlers
116    /// WARNING: Can only be called once
117    pub async fn init(
118        node: Arc<RwLock<Node>>,
119        seed_nodes: Vec<SocketAddr>,
120    ) -> Result<JoinHandle<Result<(), ServerError>>, NodeError> {
121        let mut peers = Vec::new();
122
123        for addr in seed_nodes {
124            let (peer, _) = Self::connect_peer(node.clone(), addr).await?;
125            peers.push(peer);
126        }
127
128        node.write().await.peers = peers;
129
130        let server_handle: JoinHandle<Result<(), super::server::ServerError>> =
131            Server.init(node.clone(), node.read().await.port).await;
132
133        let node = node.clone();
134        auto_peer(node.clone());
135
136        Ok(server_handle)
137    }
138
139    /// Send some message to all peers
140    pub async fn send_to_peers(node: Arc<RwLock<Node>>, message: Message) {
141        for i_peer in &node.read().await.peers {
142            Peer::send(Arc::clone(i_peer), message.clone()).await;
143        }
144    }
145
146    /// Submit a new block to the network
147    pub async fn submit_block(
148        node: Arc<RwLock<Node>>,
149        new_block: Block,
150    ) -> Result<(), BlockchainError> {
151        node.write().await.blockchain.add_block(new_block.clone())?;
152        validate_block_timestamp(&new_block)?;
153
154        // Remove transactions from mempool
155        node.write()
156            .await
157            .mempool
158            .spend_transactions(
159                new_block
160                    .transactions
161                    .iter()
162                    .map(|block_transaction| block_transaction.transaction_id.unwrap())
163                    .collect(),
164            )
165            .await;
166
167        Node::send_to_peers(
168            node.clone(),
169            Message::new(Command::NewBlock {
170                block: new_block.clone(),
171            }),
172        )
173        .await;
174        {
175            node.write().await.last_seen_block = new_block.hash.unwrap();
176        }
177
178        Node::log(format!(
179            "New block accepted: {}",
180            new_block.hash.unwrap().dump_base36()
181        ));
182        Ok(())
183    }
184
185    /// Submit a new transaction to the network to be mined
186    pub async fn submit_transaction(
187        node: Arc<RwLock<Node>>,
188        new_transaction: Transaction,
189    ) -> Result<(), BlockchainError> {
190        let tx_difficulty =
191            BigUint::from_bytes_be(&node.read().await.blockchain.get_transaction_difficulty());
192
193        node.read()
194            .await
195            .blockchain
196            .get_utxos()
197            .validate_transaction(&new_transaction.clone(), &tx_difficulty)?;
198
199        if !node
200            .read()
201            .await
202            .mempool
203            .validate_transaction(&new_transaction)
204            .await
205        {
206            return Err(BlockchainError::DoubleSpend);
207        }
208        validate_transaction_timestamp(&new_transaction)?;
209
210        node.write()
211            .await
212            .mempool
213            .add_transaction(new_transaction.clone())
214            .await;
215
216        Node::send_to_peers(
217            node.clone(),
218            Message::new(Command::NewTransaction {
219                transaction: new_transaction.clone(),
220            }),
221        )
222        .await;
223        Node::log(format!(
224            "New transaction accepted: {}",
225            new_transaction.transaction_id.unwrap().dump_base36()
226        ));
227        Ok(())
228    }
229
230    /// Log a message to the node log
231    pub fn log(msg: String) {
232        let mut log_file = fs::OpenOptions::new()
233            .append(true)
234            .create(true)
235            .open(
236                NODE_PATH
237                    .get()
238                    .expect("One blockchain instance must exist before logging")
239                    .to_owned()
240                    + "/info.log",
241            )
242            .expect("Could not open logging file!");
243        writeln!(
244            log_file,
245            "[{}] {}",
246            chrono::Local::now().format("%Y-%m-%d %H:%M:%S"),
247            msg
248        )
249        .expect("Failed to write to logging file");
250    }
251
252    /// Get last logged line
253    pub fn get_last_log() -> String {
254        let mut log_file = fs::OpenOptions::new()
255            .read(true)
256            .open(
257                NODE_PATH
258                    .get()
259                    .expect("One blockchain instance must exist before logging")
260                    .to_owned()
261                    + "/info.log",
262            )
263            .expect("Could not open logging file!");
264
265        let mut contents = String::new();
266        log_file
267            .read_to_string(&mut contents)
268            .expect("Failed to read logging file");
269
270        // Split by newlines and get the last line
271        contents.lines().last().unwrap_or("").to_string()
272    }
273
274    /// Get last popped line
275    pub fn pop_last_line() -> Option<String> {
276        let path = NODE_PATH
277            .get()
278            .expect("One blockchain instance must exist before logging")
279            .to_owned()
280            + "/info.log";
281
282        // Read the full log
283        let contents = fs::read_to_string(&path).ok()?;
284
285        // Split into lines
286        let mut lines: Vec<&str> = contents.lines().collect();
287
288        // Pop the last line
289        let last_line = lines.pop().map(|s| s.to_string());
290
291        // Write back the remaining lines
292        let new_contents = lines.join("\n");
293        fs::write(&path, new_contents).ok()?;
294
295        last_line
296    }
297}