snap_coin/node/
node.rs

1use num_bigint::BigUint;
2use std::io::{Read, Write};
3use std::net::IpAddr;
4use std::{
5    fs,
6    net::SocketAddr,
7    sync::{Arc, OnceLock},
8};
9use thiserror::Error;
10use tokio::net::TcpStream;
11use tokio::{
12    sync::RwLock,
13    task::{JoinError, JoinHandle},
14};
15
16use crate::crypto::Hash;
17use crate::node::auto_peer::auto_peer;
18use crate::node::server::ServerError;
19use crate::{
20    core::{
21        block::Block,
22        blockchain::{
23            Blockchain, BlockchainError, validate_block_timestamp, validate_transaction_timestamp,
24        },
25        transaction::Transaction,
26    },
27    node::{
28        mempool::MemPool,
29        message::{Command, Message},
30        peer::{Peer, PeerError},
31        server::Server,
32    },
33};
34
/// Path of the directory that this node's blockchain (and its `info.log` log file) is read from
/// and written to.
///
/// Stored in a `OnceLock`, so it can be set only once per process; `Node::new` panics if it is
/// already set. This enforces a singleton rule where only one node can exist in one program at once.
static NODE_PATH: OnceLock<String> = OnceLock::new();
37
38#[derive(Error, Debug)]
39pub enum NodeError {
40    #[error("{0}")]
41    PeerError(#[from] PeerError),
42
43    #[error("TCP error: {0}")]
44    IOError(#[from] std::io::Error),
45
46    #[error("Join error: {0}")]
47    JoinError(#[from] JoinError),
48
49    #[error("Server error: {0}")]
50    ServerError(#[from] super::server::ServerError),
51}
52
/// Handles incoming connections and outbound peers
///
/// Instances are created with `Node::new`, which hands the node out as `Arc<RwLock<Node>>`.
pub struct Node {
    /// Peers this node sends messages to (`Node::send_to_peers` iterates over this list)
    pub peers: Vec<Arc<RwLock<Peer>>>,
    /// The blockchain loaded from / stored at the node path
    pub blockchain: Blockchain,
    /// Pool of accepted transactions that are not yet part of a block
    pub mempool: MemPool,
    /// Hash of the block most recently passed to `Node::submit_block` (all zeroes until then)
    pub last_seen_block: Hash,
    /// IP addresses supplied by the caller of `Node::new`
    // NOTE(review): how these addresses are treated is decided outside this file - confirm
    pub reserved_ips: Vec<IpAddr>,
    // Synchronization flag
    /// Starts out as `false`
    pub is_syncing: bool,
    /// Number of peers the node aims for (defaults to 12)
    // NOTE(review): presumably consumed by `auto_peer` - confirm
    pub target_peers: usize,
    /// Port handed to the server when the node is initialized
    pub port: u16,
}
65
66impl Node {
67    /// Create a new blockchain (load / create) with default 12 nodes target
68    /// WARNING: Only one instance of this struct can exist in one program
69    pub fn new(node_path: &str, port: u16, reserved_ips: Vec<IpAddr>) -> Arc<RwLock<Self>> {
70        NODE_PATH
71            .set(String::from(node_path))
72            .expect("Only one node can exist at once!");
73        // Clear log file
74        if !fs::exists(node_path).expect("failed to check if blockchain dir exists") {
75            fs::create_dir(node_path).expect("Could not create blockchain directory");
76        }
77        fs::OpenOptions::new()
78            .write(true)
79            .truncate(true)
80            .create(true)
81            .open(
82                NODE_PATH
83                    .get()
84                    .expect("One blockchain instance must exist before logging")
85                    .to_owned()
86                    + "/info.log",
87            )
88            .expect("Could not open logging file!");
89        Arc::new(RwLock::new(Node {
90            peers: vec![],
91            blockchain: Blockchain::new(node_path),
92            mempool: MemPool::new(),
93            is_syncing: false,
94            target_peers: 12,
95            port,
96            reserved_ips,
97            last_seen_block: Hash::new_from_buf([0u8; 32]),
98        }))
99    }
100
101    /// Connect to a specified peer
102    pub async fn connect_peer(
103        node: Arc<RwLock<Node>>,
104        address: SocketAddr,
105    ) -> Result<(Arc<RwLock<Peer>>, JoinHandle<Result<(), PeerError>>), NodeError> {
106        let peer: Arc<RwLock<Peer>> = Arc::new(RwLock::new(Peer::new(address, false)));
107        let stream = TcpStream::connect(address).await?;
108
109        let handle = Peer::connect(peer.clone(), node, stream).await;
110
111        Ok((peer, handle))
112    }
113
114    /// Initialize this node, with a array of seed nodes which this node will use to connect to
115    /// Starts all handlers
116    /// WARNING: Can only be called once
117    pub async fn init(
118        node: Arc<RwLock<Node>>,
119        seed_nodes: Vec<SocketAddr>,
120    ) -> Result<JoinHandle<Result<(), ServerError>>, NodeError> {
121        let mut peers = Vec::new();
122
123        for addr in seed_nodes {
124            let (peer, _) = Self::connect_peer(node.clone(), addr).await?;
125            peers.push(peer);
126        }
127
128        node.write().await.peers = peers;
129
130        let server_handle: JoinHandle<Result<(), super::server::ServerError>> =
131            Server.init(node.clone(), node.read().await.port).await;
132
133        let node = node.clone();
134        auto_peer(node.clone());
135        node.read().await.mempool.start_expiry_watchdog();
136
137        Ok(server_handle)
138    }
139
140    /// Send some message to all peers
141    pub async fn send_to_peers(node: Arc<RwLock<Node>>, message: Message) {
142        for i_peer in &node.read().await.peers {
143            Peer::send(Arc::clone(i_peer), message.clone()).await;
144        }
145    }
146
147    /// Submit a new block to the network
148    pub async fn submit_block(
149        node: Arc<RwLock<Node>>,
150        new_block: Block,
151    ) -> Result<(), BlockchainError> {
152        node.write().await.last_seen_block = new_block.hash.unwrap();
153        
154        node.write().await.blockchain.add_block(new_block.clone())?;
155        validate_block_timestamp(&new_block)?;
156
157        // Remove transactions from mempool
158        node.write()
159            .await
160            .mempool
161            .spend_transactions(
162                new_block
163                    .transactions
164                    .iter()
165                    .map(|block_transaction| block_transaction.transaction_id.unwrap())
166                    .collect(),
167            )
168            .await;
169
170        Node::send_to_peers(
171            node.clone(),
172            Message::new(Command::NewBlock {
173                block: new_block.clone(),
174            }),
175        )
176        .await;
177
178        Node::log(format!(
179            "New block accepted: {}",
180            new_block.hash.unwrap().dump_base36()
181        ));
182        Ok(())
183    }
184
185    /// Submit a new transaction to the network to be mined
186    pub async fn submit_transaction(
187        node: Arc<RwLock<Node>>,
188        new_transaction: Transaction,
189    ) -> Result<(), BlockchainError> {
190        let tx_difficulty =
191            BigUint::from_bytes_be(&node.read().await.blockchain.get_transaction_difficulty());
192
193        node.read()
194            .await
195            .blockchain
196            .get_utxos()
197            .validate_transaction(&new_transaction.clone(), &tx_difficulty)?;
198
199        if !node
200            .read()
201            .await
202            .mempool
203            .validate_transaction(&new_transaction)
204            .await
205        {
206            return Err(BlockchainError::DoubleSpend);
207        }
208        validate_transaction_timestamp(&new_transaction)?;
209
210        node.write()
211            .await
212            .mempool
213            .add_transaction(new_transaction.clone())
214            .await;
215
216        Node::send_to_peers(
217            node.clone(),
218            Message::new(Command::NewTransaction {
219                transaction: new_transaction.clone(),
220            }),
221        )
222        .await;
223        Node::log(format!(
224            "New transaction accepted: {}",
225            new_transaction.transaction_id.unwrap().dump_base36()
226        ));
227        Ok(())
228    }
229
230    /// Log a message to the node log
231    pub fn log(msg: String) {
232        let mut log_file = fs::OpenOptions::new()
233            .append(true)
234            .create(true)
235            .open(
236                NODE_PATH
237                    .get()
238                    .expect("One blockchain instance must exist before logging")
239                    .to_owned()
240                    + "/info.log",
241            )
242            .expect("Could not open logging file!");
243        writeln!(
244            log_file,
245            "[{}] {}",
246            chrono::Local::now().format("%Y-%m-%d %H:%M:%S"),
247            msg
248        )
249        .expect("Failed to write to logging file");
250    }
251
252    /// Get last logged line
253    pub fn get_last_log() -> String {
254        let mut log_file = fs::OpenOptions::new()
255            .read(true)
256            .open(
257                NODE_PATH
258                    .get()
259                    .expect("One blockchain instance must exist before logging")
260                    .to_owned()
261                    + "/info.log",
262            )
263            .expect("Could not open logging file!");
264
265        let mut contents = String::new();
266        log_file
267            .read_to_string(&mut contents)
268            .expect("Failed to read logging file");
269
270        // Split by newlines and get the last line
271        contents.lines().last().unwrap_or("").to_string()
272    }
273
274    /// Get last popped line
275    pub fn pop_last_line() -> Option<String> {
276        let path = NODE_PATH
277            .get()
278            .expect("One blockchain instance must exist before logging")
279            .to_owned()
280            + "/info.log";
281
282        // Read the full log
283        let contents = fs::read_to_string(&path).ok()?;
284
285        // Split into lines
286        let mut lines: Vec<&str> = contents.lines().collect();
287
288        // Pop the last line
289        let last_line = lines.pop().map(|s| s.to_string());
290
291        // Write back the remaining lines
292        let new_contents = lines.join("\n");
293        fs::write(&path, new_contents).ok()?;
294
295        last_line
296    }
297}