snap_coin/node/
node.rs

1use num_bigint::BigUint;
2use std::io::{Read, Write};
3use std::net::IpAddr;
4use std::{
5    fs,
6    net::SocketAddr,
7    sync::{Arc, OnceLock},
8};
9use thiserror::Error;
10use tokio::net::TcpStream;
11use tokio::{
12    sync::RwLock,
13    task::{JoinError, JoinHandle},
14};
15
16use crate::crypto::Hash;
17use crate::node::auto_peer::auto_peer;
18use crate::node::server::ServerError;
19use crate::{
20    core::{
21        block::Block,
22        blockchain::{
23            Blockchain, BlockchainError, validate_block_timestamp, validate_transaction_timestamp,
24        },
25        transaction::Transaction,
26    },
27    node::{
28        mempool::MemPool,
29        message::{Command, Message},
30        peer::{Peer, PeerError},
31        server::Server,
32    },
33};
34
35/// Path at which this blockchain is being read and written from and too. This enforces a singleton rule where only one node can exist in one program at once
36static NODE_PATH: OnceLock<String> = OnceLock::new();
37
38#[derive(Error, Debug)]
39pub enum NodeError {
40    #[error("{0}")]
41    PeerError(#[from] PeerError),
42
43    #[error("TCP error: {0}")]
44    IOError(#[from] std::io::Error),
45
46    #[error("Join error: {0}")]
47    JoinError(#[from] JoinError),
48
49    #[error("Server error: {0}")]
50    ServerError(#[from] super::server::ServerError),
51}
52
53/// Handles incoming connections and outbound peers
54pub struct Node {
55    pub peers: Vec<Arc<RwLock<Peer>>>,
56    pub blockchain: Blockchain,
57    pub mempool: MemPool,
58    pub last_seen_block: Hash,
59    pub reserved_ips: Vec<IpAddr>,
60    // Synchronization flag
61    pub is_syncing: bool,
62    pub target_peers: usize,
63    pub port: u16,
64}
65
66impl Node {
67    /// Create a new blockchain (load / create) with default 12 nodes target
68    /// WARNING: Only one instance of this struct can exist in one program
69    pub fn new(node_path: &str, port: u16, reserved_ips: Vec<IpAddr>) -> Arc<RwLock<Self>> {
70        NODE_PATH
71            .set(String::from(node_path))
72            .expect("Only one node can exist at once!");
73        // Clear log file
74        if !fs::exists(node_path).expect("failed to check if blockchain dir exists") {
75            fs::create_dir(node_path).expect("Could not create blockchain directory");
76        }
77        fs::OpenOptions::new()
78            .write(true)
79            .truncate(true)
80            .create(true)
81            .open(
82                NODE_PATH
83                    .get()
84                    .expect("One blockchain instance must exist before logging")
85                    .to_owned()
86                    + "/info.log",
87            )
88            .expect("Could not open logging file!");
89        Arc::new(RwLock::new(Node {
90            peers: vec![],
91            blockchain: Blockchain::new(node_path),
92            mempool: MemPool::new(),
93            is_syncing: false,
94            target_peers: 12,
95            port,
96            reserved_ips,
97            last_seen_block: Hash::new_from_buf([0u8; 32]),
98        }))
99    }
100
101    /// Connect to a specified peer
102    pub async fn connect_peer(
103        node: Arc<RwLock<Node>>,
104        address: SocketAddr,
105    ) -> Result<(Arc<RwLock<Peer>>, JoinHandle<Result<(), PeerError>>), NodeError> {
106        let peer: Arc<RwLock<Peer>> = Arc::new(RwLock::new(Peer::new(address, false)));
107        let stream = TcpStream::connect(address).await?;
108
109        let handle = Peer::connect(peer.clone(), node, stream).await;
110
111        Ok((peer, handle))
112    }
113
114    /// Initialize this node, with a array of seed nodes which this node will use to connect to
115    /// Starts all handlers
116    /// WARNING: Can only be called once
117    pub async fn init(
118        node: Arc<RwLock<Node>>,
119        seed_nodes: Vec<SocketAddr>,
120    ) -> Result<JoinHandle<Result<(), ServerError>>, NodeError> {
121        let mut peers = Vec::new();
122
123        for addr in seed_nodes {
124            let (peer, _) = Self::connect_peer(node.clone(), addr).await?;
125            peers.push(peer);
126        }
127
128        node.write().await.peers = peers;
129
130        let server_handle: JoinHandle<Result<(), super::server::ServerError>> =
131            Server.init(node.clone(), node.read().await.port).await;
132
133        let node = node.clone();
134        auto_peer(node.clone());
135
136        Ok(server_handle)
137    }
138
139    /// Send some message to all peers
140    pub async fn send_to_peers(node: Arc<RwLock<Node>>, message: Message) {
141        for i_peer in &node.read().await.peers {
142            Peer::send(Arc::clone(i_peer), message.clone()).await;
143        }
144    }
145
146    /// Submit a new block to the network
147    pub async fn submit_block(
148        node: Arc<RwLock<Node>>,
149        new_block: Block,
150    ) -> Result<(), BlockchainError> {
151        node.write().await.last_seen_block = new_block.hash.unwrap();
152        
153        node.write().await.blockchain.add_block(new_block.clone())?;
154        validate_block_timestamp(&new_block)?;
155
156        // Remove transactions from mempool
157        node.write()
158            .await
159            .mempool
160            .spend_transactions(
161                new_block
162                    .transactions
163                    .iter()
164                    .map(|block_transaction| block_transaction.transaction_id.unwrap())
165                    .collect(),
166            )
167            .await;
168
169        Node::send_to_peers(
170            node.clone(),
171            Message::new(Command::NewBlock {
172                block: new_block.clone(),
173            }),
174        )
175        .await;
176
177        Node::log(format!(
178            "New block accepted: {}",
179            new_block.hash.unwrap().dump_base36()
180        ));
181        Ok(())
182    }
183
184    /// Submit a new transaction to the network to be mined
185    pub async fn submit_transaction(
186        node: Arc<RwLock<Node>>,
187        new_transaction: Transaction,
188    ) -> Result<(), BlockchainError> {
189        let tx_difficulty =
190            BigUint::from_bytes_be(&node.read().await.blockchain.get_transaction_difficulty());
191
192        node.read()
193            .await
194            .blockchain
195            .get_utxos()
196            .validate_transaction(&new_transaction.clone(), &tx_difficulty)?;
197
198        if !node
199            .read()
200            .await
201            .mempool
202            .validate_transaction(&new_transaction)
203            .await
204        {
205            return Err(BlockchainError::DoubleSpend);
206        }
207        validate_transaction_timestamp(&new_transaction)?;
208
209        node.write()
210            .await
211            .mempool
212            .add_transaction(new_transaction.clone())
213            .await;
214
215        Node::send_to_peers(
216            node.clone(),
217            Message::new(Command::NewTransaction {
218                transaction: new_transaction.clone(),
219            }),
220        )
221        .await;
222        Node::log(format!(
223            "New transaction accepted: {}",
224            new_transaction.transaction_id.unwrap().dump_base36()
225        ));
226        Ok(())
227    }
228
229    /// Log a message to the node log
230    pub fn log(msg: String) {
231        let mut log_file = fs::OpenOptions::new()
232            .append(true)
233            .create(true)
234            .open(
235                NODE_PATH
236                    .get()
237                    .expect("One blockchain instance must exist before logging")
238                    .to_owned()
239                    + "/info.log",
240            )
241            .expect("Could not open logging file!");
242        writeln!(
243            log_file,
244            "[{}] {}",
245            chrono::Local::now().format("%Y-%m-%d %H:%M:%S"),
246            msg
247        )
248        .expect("Failed to write to logging file");
249    }
250
251    /// Get last logged line
252    pub fn get_last_log() -> String {
253        let mut log_file = fs::OpenOptions::new()
254            .read(true)
255            .open(
256                NODE_PATH
257                    .get()
258                    .expect("One blockchain instance must exist before logging")
259                    .to_owned()
260                    + "/info.log",
261            )
262            .expect("Could not open logging file!");
263
264        let mut contents = String::new();
265        log_file
266            .read_to_string(&mut contents)
267            .expect("Failed to read logging file");
268
269        // Split by newlines and get the last line
270        contents.lines().last().unwrap_or("").to_string()
271    }
272
273    /// Get last popped line
274    pub fn pop_last_line() -> Option<String> {
275        let path = NODE_PATH
276            .get()
277            .expect("One blockchain instance must exist before logging")
278            .to_owned()
279            + "/info.log";
280
281        // Read the full log
282        let contents = fs::read_to_string(&path).ok()?;
283
284        // Split into lines
285        let mut lines: Vec<&str> = contents.lines().collect();
286
287        // Pop the last line
288        let last_line = lines.pop().map(|s| s.to_string());
289
290        // Write back the remaining lines
291        let new_contents = lines.join("\n");
292        fs::write(&path, new_contents).ok()?;
293
294        last_line
295    }
296}