1use num_bigint::BigUint;
2use std::io::{Read, Write};
3use std::net::IpAddr;
4use std::{
5 fs,
6 net::SocketAddr,
7 sync::{Arc, OnceLock},
8};
9use thiserror::Error;
10use tokio::net::TcpStream;
11use tokio::{
12 sync::RwLock,
13 task::{JoinError, JoinHandle},
14};
15
16use crate::crypto::Hash;
17use crate::node::auto_peer::auto_peer;
18use crate::node::server::ServerError;
19use crate::{
20 core::{
21 block::Block,
22 blockchain::{
23 Blockchain, BlockchainError, validate_block_timestamp, validate_transaction_timestamp,
24 },
25 transaction::Transaction,
26 },
27 node::{
28 mempool::MemPool,
29 message::{Command, Message},
30 peer::{Peer, PeerError},
31 server::Server,
32 },
33};
34
35static NODE_PATH: OnceLock<String> = OnceLock::new();
37
38#[derive(Error, Debug)]
39pub enum NodeError {
40 #[error("{0}")]
41 PeerError(#[from] PeerError),
42
43 #[error("TCP error: {0}")]
44 IOError(#[from] std::io::Error),
45
46 #[error("Join error: {0}")]
47 JoinError(#[from] JoinError),
48
49 #[error("Server error: {0}")]
50 ServerError(#[from] super::server::ServerError),
51}
52
53pub struct Node {
55 pub peers: Vec<Arc<RwLock<Peer>>>,
56 pub blockchain: Blockchain,
57 pub mempool: MemPool,
58 pub last_seen_block: Hash,
59 pub reserved_ips: Vec<IpAddr>,
60 pub is_syncing: bool,
62 pub target_peers: usize,
63 pub port: u16,
64}
65
66impl Node {
67 pub fn new(node_path: &str, port: u16, reserved_ips: Vec<IpAddr>) -> Arc<RwLock<Self>> {
70 NODE_PATH
71 .set(String::from(node_path))
72 .expect("Only one node can exist at once!");
73 if !fs::exists(node_path).expect("failed to check if blockchain dir exists") {
75 fs::create_dir(node_path).expect("Could not create blockchain directory");
76 }
77 fs::OpenOptions::new()
78 .write(true)
79 .truncate(true)
80 .create(true)
81 .open(
82 NODE_PATH
83 .get()
84 .expect("One blockchain instance must exist before logging")
85 .to_owned()
86 + "/info.log",
87 )
88 .expect("Could not open logging file!");
89 Arc::new(RwLock::new(Node {
90 peers: vec![],
91 blockchain: Blockchain::new(node_path),
92 mempool: MemPool::new(),
93 is_syncing: false,
94 target_peers: 12,
95 port,
96 reserved_ips,
97 last_seen_block: Hash::new_from_buf([0u8; 32]),
98 }))
99 }
100
101 pub async fn connect_peer(
103 node: Arc<RwLock<Node>>,
104 address: SocketAddr,
105 ) -> Result<(Arc<RwLock<Peer>>, JoinHandle<Result<(), PeerError>>), NodeError> {
106 let peer: Arc<RwLock<Peer>> = Arc::new(RwLock::new(Peer::new(address, false)));
107 let stream = TcpStream::connect(address).await?;
108
109 let handle = Peer::connect(peer.clone(), node, stream).await;
110
111 Ok((peer, handle))
112 }
113
114 pub async fn init(
118 node: Arc<RwLock<Node>>,
119 seed_nodes: Vec<SocketAddr>,
120 ) -> Result<JoinHandle<Result<(), ServerError>>, NodeError> {
121 let mut peers = Vec::new();
122
123 for addr in seed_nodes {
124 let (peer, _) = Self::connect_peer(node.clone(), addr).await?;
125 peers.push(peer);
126 }
127
128 node.write().await.peers = peers;
129
130 let server_handle: JoinHandle<Result<(), super::server::ServerError>> =
131 Server.init(node.clone(), node.read().await.port).await;
132
133 let node = node.clone();
134 auto_peer(node.clone());
135 node.read().await.mempool.start_expiry_watchdog();
136
137 Ok(server_handle)
138 }
139
140 pub async fn send_to_peers(node: Arc<RwLock<Node>>, message: Message) {
142 for i_peer in &node.read().await.peers {
143 Peer::send(Arc::clone(i_peer), message.clone()).await;
144 }
145 }
146
147 pub async fn submit_block(
149 node: Arc<RwLock<Node>>,
150 new_block: Block,
151 ) -> Result<(), BlockchainError> {
152 node.write().await.last_seen_block = new_block.hash.unwrap();
153
154 node.write().await.blockchain.add_block(new_block.clone())?;
155 validate_block_timestamp(&new_block)?;
156
157 node.write()
159 .await
160 .mempool
161 .spend_transactions(
162 new_block
163 .transactions
164 .iter()
165 .map(|block_transaction| block_transaction.transaction_id.unwrap())
166 .collect(),
167 )
168 .await;
169
170 Node::send_to_peers(
171 node.clone(),
172 Message::new(Command::NewBlock {
173 block: new_block.clone(),
174 }),
175 )
176 .await;
177
178 Node::log(format!(
179 "New block accepted: {}",
180 new_block.hash.unwrap().dump_base36()
181 ));
182 Ok(())
183 }
184
185 pub async fn submit_transaction(
187 node: Arc<RwLock<Node>>,
188 new_transaction: Transaction,
189 ) -> Result<(), BlockchainError> {
190 let tx_difficulty =
191 BigUint::from_bytes_be(&node.read().await.blockchain.get_transaction_difficulty());
192
193 node.read()
194 .await
195 .blockchain
196 .get_utxos()
197 .validate_transaction(&new_transaction.clone(), &tx_difficulty)?;
198
199 if !node
200 .read()
201 .await
202 .mempool
203 .validate_transaction(&new_transaction)
204 .await
205 {
206 return Err(BlockchainError::DoubleSpend);
207 }
208 validate_transaction_timestamp(&new_transaction)?;
209
210 node.write()
211 .await
212 .mempool
213 .add_transaction(new_transaction.clone())
214 .await;
215
216 Node::send_to_peers(
217 node.clone(),
218 Message::new(Command::NewTransaction {
219 transaction: new_transaction.clone(),
220 }),
221 )
222 .await;
223 Node::log(format!(
224 "New transaction accepted: {}",
225 new_transaction.transaction_id.unwrap().dump_base36()
226 ));
227 Ok(())
228 }
229
230 pub fn log(msg: String) {
232 let mut log_file = fs::OpenOptions::new()
233 .append(true)
234 .create(true)
235 .open(
236 NODE_PATH
237 .get()
238 .expect("One blockchain instance must exist before logging")
239 .to_owned()
240 + "/info.log",
241 )
242 .expect("Could not open logging file!");
243 writeln!(
244 log_file,
245 "[{}] {}",
246 chrono::Local::now().format("%Y-%m-%d %H:%M:%S"),
247 msg
248 )
249 .expect("Failed to write to logging file");
250 }
251
252 pub fn get_last_log() -> String {
254 let mut log_file = fs::OpenOptions::new()
255 .read(true)
256 .open(
257 NODE_PATH
258 .get()
259 .expect("One blockchain instance must exist before logging")
260 .to_owned()
261 + "/info.log",
262 )
263 .expect("Could not open logging file!");
264
265 let mut contents = String::new();
266 log_file
267 .read_to_string(&mut contents)
268 .expect("Failed to read logging file");
269
270 contents.lines().last().unwrap_or("").to_string()
272 }
273
274 pub fn pop_last_line() -> Option<String> {
276 let path = NODE_PATH
277 .get()
278 .expect("One blockchain instance must exist before logging")
279 .to_owned()
280 + "/info.log";
281
282 let contents = fs::read_to_string(&path).ok()?;
284
285 let mut lines: Vec<&str> = contents.lines().collect();
287
288 let last_line = lines.pop().map(|s| s.to_string());
290
291 let new_contents = lines.join("\n");
293 fs::write(&path, new_contents).ok()?;
294
295 last_line
296 }
297}