1use futures::future::try_join_all;
2use num_bigint::BigUint;
3use std::io::Write;
4use std::{
5 fs,
6 net::SocketAddr,
7 pin::Pin,
8 str::FromStr,
9 sync::{Arc, OnceLock},
10 time::Duration,
11};
12use thiserror::Error;
13use tokio::{
14 net::TcpStream,
15 sync::RwLock,
16 task::{JoinError, JoinHandle},
17 time::sleep,
18};
19
20use crate::crypto::Hash;
21use crate::{
22 core::{
23 block::Block,
24 blockchain::{
25 Blockchain, BlockchainError, validate_block_timestamp, validate_transaction_timestamp,
26 },
27 transaction::Transaction,
28 },
29 node::{
30 mempool::MemPool,
31 message::{Command, Message},
32 peer::{Peer, PeerError},
33 server::Server,
34 },
35};
36
37static NODE_PATH: OnceLock<String> = OnceLock::new();
39
40#[derive(Error, Debug)]
41pub enum NodeError {
42 #[error("{0}")]
43 PeerError(#[from] PeerError),
44
45 #[error("TCP error: {0}")]
46 IOError(#[from] std::io::Error),
47
48 #[error("Join error: {0}")]
49 JoinError(#[from] JoinError),
50
51 #[error("Server error: {0}")]
52 ServerError(#[from] super::server::ServerError),
53}
54
55pub struct Node {
57 pub peers: Vec<Arc<RwLock<Peer>>>,
58 pub blockchain: Blockchain,
59 pub mempool: MemPool,
60 pub last_seen_block: Hash,
61
62 pub is_syncing: bool,
64
65 pub target_peers: usize,
66
67 port: u32,
68}
69
70impl Node {
71 pub fn new(node_path: &str, port: u32) -> Arc<RwLock<Self>> {
74 NODE_PATH
75 .set(String::from(node_path))
76 .expect("Only one node can exist at once!");
77 if !fs::exists(node_path).expect("failed to check if blockchain dir exists") {
79 fs::create_dir(node_path).expect("Could not create blockchain directory");
80 }
81 fs::OpenOptions::new()
82 .write(true)
83 .truncate(true)
84 .create(true)
85 .open(
86 NODE_PATH
87 .get()
88 .expect("One blockchain instance must exist before logging")
89 .to_owned()
90 + "/info.log",
91 )
92 .expect("Could not open logging file!");
93 Arc::new(RwLock::new(Node {
94 peers: vec![],
95 blockchain: Blockchain::new(node_path),
96 mempool: MemPool::new(),
97 is_syncing: false,
98 target_peers: 12,
99 port,
100 last_seen_block: Hash::new_from_buf([0u8; 32]),
101 }))
102 }
103
104 async fn connect_peer(
106 node: Arc<RwLock<Node>>,
107 address: SocketAddr,
108 ) -> Result<(Arc<RwLock<Peer>>, JoinHandle<Result<(), PeerError>>), NodeError> {
109 let peer = Arc::new(RwLock::new(Peer::new(address)));
110 let stream = TcpStream::connect(address).await?;
111
112 let on_fail = |peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>| {
113 Box::pin(async move {
114 Peer::kill(peer.clone()).await;
115 let peer_address = peer.read().await.address;
116
117 let mut node_peers = node.write().await;
118
119 let mut new_peers = Vec::new();
120 for p in node_peers.peers.drain(..) {
121 let p_address = p.read().await.address;
122 if p_address != peer_address {
123 new_peers.push(p);
124 }
125 }
126
127 node_peers.peers = new_peers;
128 }) as Pin<Box<dyn futures::Future<Output = ()> + Send + 'static>>
129 };
130 let handle = Peer::connect(peer.clone(), node, on_fail, stream).await;
131
132 Ok((peer, handle))
133 }
134
135 pub async fn init(
139 node: Arc<RwLock<Node>>,
140 seed_nodes: Vec<SocketAddr>,
141 ) -> Result<JoinHandle<Result<(), NodeError>>, NodeError> {
142 let mut peer_handles = Vec::new();
143 let mut peers = Vec::new();
144
145 for addr in seed_nodes {
146 let (peer, handle) = Self::connect_peer(node.clone(), addr).await?;
147 peers.push(peer);
148 peer_handles.push(handle);
149 }
150
151 node.write().await.peers = peers;
152
153 let server_handle: JoinHandle<Result<(), super::server::ServerError>> =
154 Server.init(node.clone(), node.read().await.port).await;
155
156 let node = node.clone();
157 let auto_peer = tokio::spawn(async move {
158 loop {
159 sleep(Duration::from_secs(30)).await;
161
162 let (peers_snapshot, target_peers) = {
164 let guard = node.read().await;
165 (guard.peers.clone(), guard.target_peers)
166 };
167
168 if peers_snapshot.len() < target_peers {
170 if let Some(fetch_peer) = peers_snapshot.get(0) {
172 let response =
174 Peer::request(fetch_peer.clone(), Message::new(Command::GetPeers))
175 .await;
176
177 let Ok(response) = response else {
179 Node::log(format!(
180 "Could not request peers from {}",
181 fetch_peer.read().await.address,
182 ));
183 continue;
184 };
185
186 match response.command {
187 Command::SendPeers { peers } => {
188 for peer_str in peers {
189 let addr = match SocketAddr::from_str(&peer_str) {
191 Ok(a) => a,
192 Err(_) => {
193 Node::log(format!("Fetched peer had invalid address"));
194 continue;
195 }
196 };
197
198 let exists = {
200 let mut exists = false;
201 let guard = node.read().await;
202 for p in &guard.peers {
203 if p.read().await.address.ip() == addr.ip()
204 && p.read().await.address.port() == addr.port()
205 {
206 exists = true;
207 break;
208 }
209 }
210 exists
211 };
212
213 if exists {
214 continue;
215 }
216
217 let new_peer = Node::connect_peer(node.clone(), addr).await;
219 let Ok((peer_arc, _)) = new_peer else {
220 continue;
221 };
222
223 {
225 let mut guard = node.write().await;
226 guard.peers.push(peer_arc);
227 }
228
229 Node::log(format!(
230 "Connected to new peer (referred by {})",
231 fetch_peer.read().await.address
232 ));
233
234 let peer_count = {
236 let guard = node.read().await;
237 guard.peers.len()
238 };
239
240 if peer_count >= target_peers {
241 break;
242 }
243 }
244 }
245 _ => {}
246 }
247 }
248 }
249 }
250
251 #[allow(unused)]
252 Ok::<(), NodeError>(())
253 });
254
255 let all_handle = tokio::spawn(async move {
256 let auto_peer_error = match auto_peer.await {
257 Ok(Ok(_)) => Ok(()),
258 Ok(Err(e)) => Err(e),
259 Err(e) => Err(NodeError::JoinError(e)),
260 };
261 if auto_peer_error.is_err() {
262 return Err(auto_peer_error.err().unwrap());
263 }
264
265 if let Err(join_err) = try_join_all(peer_handles).await {
267 return Err(NodeError::JoinError(join_err));
268 }
269
270 match server_handle.await {
272 Ok(Ok(())) => Ok(()),
273 Ok(Err(e)) => Err(NodeError::ServerError(e)),
274 Err(e) => Err(NodeError::JoinError(e)),
275 }
276 });
277
278 Ok(all_handle)
279 }
280
281 pub async fn send_to_peers(node: Arc<RwLock<Node>>, message: Message) {
283 for i_peer in &node.read().await.peers {
284 Peer::send(Arc::clone(i_peer), message.clone()).await;
285 }
286 }
287
288 pub async fn submit_block(
290 node: Arc<RwLock<Node>>,
291 new_block: Block,
292 ) -> Result<(), BlockchainError> {
293 node.write().await.blockchain.add_block(new_block.clone())?;
294 validate_block_timestamp(&new_block)?;
295
296 node.write()
298 .await
299 .mempool
300 .spend_transactions(
301 new_block
302 .transactions
303 .iter()
304 .map(|block_transaction| block_transaction.transaction_id.unwrap())
305 .collect(),
306 )
307 .await;
308
309 Node::send_to_peers(
310 node.clone(),
311 Message::new(Command::NewBlock {
312 block: new_block.clone(),
313 }),
314 )
315 .await;
316 {
317 node.write().await.last_seen_block = new_block.hash.unwrap();
318 }
319 Ok(())
320 }
321
322 pub async fn submit_transaction(
324 node: Arc<RwLock<Node>>,
325 new_transaction: Transaction,
326 ) -> Result<(), BlockchainError> {
327 let tx_difficulty =
328 BigUint::from_bytes_be(&node.read().await.blockchain.get_transaction_difficulty());
329
330 node.read()
331 .await
332 .blockchain
333 .get_utxos()
334 .validate_transaction(&new_transaction.clone(), &tx_difficulty)?;
335
336 if !node
337 .read()
338 .await
339 .mempool
340 .validate_transaction(&new_transaction)
341 .await
342 {
343 return Err(BlockchainError::DoubleSpend);
344 }
345 validate_transaction_timestamp(&new_transaction)?;
346
347 node.write()
348 .await
349 .mempool
350 .add_transaction(new_transaction.clone())
351 .await;
352
353 Node::send_to_peers(
354 node.clone(),
355 Message::new(Command::NewTransaction {
356 transaction: new_transaction.clone(),
357 }),
358 )
359 .await;
360 Node::log(format!(
361 "Submitting new tx {}",
362 new_transaction.transaction_id.unwrap().dump_base36()
363 ));
364 Ok(())
365 }
366
367 pub fn log(msg: String) {
369 let mut log_file = fs::OpenOptions::new()
370 .append(true)
371 .create(true)
372 .open(
373 NODE_PATH
374 .get()
375 .expect("One blockchain instance must exist before logging")
376 .to_owned()
377 + "/info.log",
378 )
379 .expect("Could not open logging file!");
380 writeln!(
381 log_file,
382 "[{}] {}",
383 chrono::Local::now().format("%Y-%m-%d %H:%M:%S"),
384 msg
385 )
386 .expect("Failed to write to logging file");
387 }
388}