1use futures::future::try_join_all;
2use num_bigint::BigUint;
3use std::io::Write;
4use std::{
5 fs,
6 net::SocketAddr,
7 pin::Pin,
8 str::FromStr,
9 sync::{Arc, OnceLock},
10 time::Duration,
11};
12use thiserror::Error;
13use tokio::{
14 net::TcpStream,
15 sync::RwLock,
16 task::{JoinError, JoinHandle},
17 time::sleep,
18};
19
20use crate::crypto::Hash;
21use crate::{
22 core::{
23 block::Block,
24 blockchain::{
25 Blockchain, BlockchainError, validate_block_timestamp, validate_transaction_timestamp,
26 },
27 transaction::Transaction,
28 },
29 node::{
30 mempool::MemPool,
31 message::{Command, Message},
32 peer::{Peer, PeerError},
33 server::Server,
34 },
35};
36
37static NODE_PATH: OnceLock<String> = OnceLock::new();
39
40#[derive(Error, Debug)]
41pub enum NodeError {
42 #[error("{0}")]
43 PeerError(#[from] PeerError),
44
45 #[error("TCP error: {0}")]
46 IOError(#[from] std::io::Error),
47
48 #[error("Join error: {0}")]
49 JoinError(#[from] JoinError),
50
51 #[error("Server error: {0}")]
52 ServerError(#[from] super::server::ServerError),
53}
54
55pub struct Node {
57 pub peers: Vec<Arc<RwLock<Peer>>>,
58 pub blockchain: Blockchain,
59 pub mempool: MemPool,
60 pub last_seen_block: Hash,
61
62 pub is_syncing: bool,
64
65 pub target_peers: usize,
66
67 port: u32,
68}
69
70impl Node {
71 pub fn new(node_path: &str, port: u32) -> Arc<RwLock<Self>> {
74 NODE_PATH
75 .set(String::from(node_path))
76 .expect("Only one node can exist at once!");
77 if !fs::exists(node_path).expect("failed to check if blockchain dir exists") {
79 fs::create_dir(node_path).expect("Could not create blockchain directory");
80 }
81 fs::OpenOptions::new()
82 .write(true)
83 .truncate(true)
84 .create(true)
85 .open(
86 NODE_PATH
87 .get()
88 .expect("One blockchain instance must exist before logging")
89 .to_owned()
90 + "/info.log",
91 )
92 .expect("Could not open logging file!");
93 Arc::new(RwLock::new(Node {
94 peers: vec![],
95 blockchain: Blockchain::new(node_path),
96 mempool: MemPool::new(),
97 is_syncing: false,
98 target_peers: 12,
99 port,
100 last_seen_block: Hash::new_from_buf([0u8; 32]),
101 }))
102 }
103
104 async fn connect_peer(
106 node: Arc<RwLock<Node>>,
107 address: SocketAddr,
108 ) -> Result<(Arc<RwLock<Peer>>, JoinHandle<Result<(), PeerError>>), NodeError> {
109 let peer = Arc::new(RwLock::new(Peer::new(address)));
110 let stream = TcpStream::connect(address).await?;
111
112 let on_fail = |peer: Arc<RwLock<Peer>>, node: Arc<RwLock<Node>>| {
113 Box::pin(async move {
114 Peer::kill(peer.clone()).await;
115 let peer_address = peer.read().await.address;
116
117 let mut node_peers = node.write().await;
118
119 let mut new_peers = Vec::new();
120 for p in node_peers.peers.drain(..) {
121 let p_address = p.read().await.address;
122 if p_address != peer_address {
123 new_peers.push(p);
124 }
125 }
126
127 node_peers.peers = new_peers;
128 }) as Pin<Box<dyn futures::Future<Output = ()> + Send + 'static>>
129 };
130 let handle = Peer::connect(peer.clone(), node, on_fail, stream).await;
131
132 Ok((peer, handle))
133 }
134
135 pub async fn init(
139 node: Arc<RwLock<Node>>,
140 seed_nodes: Vec<SocketAddr>,
141 ) -> Result<JoinHandle<Result<(), NodeError>>, NodeError> {
142 let mut peer_handles = Vec::new();
143 let mut peers = Vec::new();
144
145 for addr in seed_nodes {
146 let (peer, handle) = Self::connect_peer(node.clone(), addr).await?;
147 peers.push(peer);
148 peer_handles.push(handle);
149 }
150
151 node.write().await.peers = peers;
152
153 let server_handle: JoinHandle<Result<(), super::server::ServerError>> =
154 Server.init(node.clone(), node.read().await.port).await;
155
156 let node = node.clone();
157 let auto_peer = tokio::spawn(async move {
158 loop {
159 sleep(Duration::from_secs(30)).await;
161
162 let (peers_snapshot, target_peers) = {
164 let guard = node.read().await;
165 (guard.peers.clone(), guard.target_peers)
166 };
167
168 if peers_snapshot.len() < target_peers {
170 if let Some(fetch_peer) = peers_snapshot.get(0) {
172 let response =
174 Peer::request(fetch_peer.clone(), Message::new(Command::GetPeers))
175 .await;
176
177 let Ok(response) = response else {
179 Node::log(format!(
180 "Could not request peers from {}",
181 fetch_peer.read().await.address,
182 ));
183 continue;
184 };
185
186 match response.command {
187 Command::SendPeers { peers } => {
188 for peer_str in peers {
189 let addr = match SocketAddr::from_str(&peer_str) {
191 Ok(a) => a,
192 Err(_) => {
193 Node::log(format!("Fetched peer had invalid address"));
194 continue;
195 }
196 };
197
198 let exists = {
200 let mut exists = false;
201 let guard = node.read().await;
202 for p in &guard.peers {
203 if p.read().await.address.ip() == addr.ip()
204 && p.read().await.address.port() == addr.port()
205 {
206 exists = true;
207 break;
208 }
209 }
210 exists
211 };
212
213 if exists {
214 continue;
215 }
216
217 let new_peer = Node::connect_peer(node.clone(), addr).await;
219 match new_peer {
220 Ok(..) => {}
221 Err(..) => {
222 continue;
223 }
224 }
225
226 Node::log(format!(
227 "Connected to new peer (referred by {})",
228 fetch_peer.read().await.address
229 ));
230
231 let peer_count = {
233 let guard = node.read().await;
234 guard.peers.len()
235 };
236
237 if peer_count >= target_peers {
238 break;
239 }
240 }
241 }
242 _ => {}
243 }
244 }
245 }
246 }
247
248 #[allow(unused)]
249 Ok::<(), NodeError>(())
250 });
251
252 let all_handle = tokio::spawn(async move {
253 let auto_peer_error = match auto_peer.await {
254 Ok(Ok(_)) => Ok(()),
255 Ok(Err(e)) => Err(e),
256 Err(e) => Err(NodeError::JoinError(e)),
257 };
258 if auto_peer_error.is_err() {
259 return Err(auto_peer_error.err().unwrap());
260 }
261
262 if let Err(join_err) = try_join_all(peer_handles).await {
264 return Err(NodeError::JoinError(join_err));
265 }
266
267 match server_handle.await {
269 Ok(Ok(())) => Ok(()),
270 Ok(Err(e)) => Err(NodeError::ServerError(e)),
271 Err(e) => Err(NodeError::JoinError(e)),
272 }
273 });
274
275 Ok(all_handle)
276 }
277
278 pub async fn send_to_peers(node: Arc<RwLock<Node>>, message: Message) {
280 for i_peer in &node.read().await.peers {
281 Peer::send(Arc::clone(i_peer), message.clone()).await;
282 }
283 }
284
285 pub async fn submit_block(
287 node: Arc<RwLock<Node>>,
288 new_block: Block,
289 ) -> Result<(), BlockchainError> {
290 node.write().await.blockchain.add_block(new_block.clone())?;
291 validate_block_timestamp(&new_block)?;
292
293 node.write()
295 .await
296 .mempool
297 .spend_transactions(
298 new_block
299 .transactions
300 .iter()
301 .map(|block_transaction| block_transaction.transaction_id.unwrap())
302 .collect(),
303 )
304 .await;
305
306 Node::send_to_peers(
307 node.clone(),
308 Message::new(Command::NewBlock {
309 block: new_block.clone(),
310 }),
311 )
312 .await;
313 {
314 node.write().await.last_seen_block = new_block.hash.unwrap();
315 }
316 Ok(())
317 }
318
319 pub async fn submit_transaction(
321 node: Arc<RwLock<Node>>,
322 new_transaction: Transaction,
323 ) -> Result<(), BlockchainError> {
324 let tx_difficulty =
325 BigUint::from_bytes_be(&node.read().await.blockchain.get_transaction_difficulty());
326
327 node.read()
328 .await
329 .blockchain
330 .get_utxos()
331 .validate_transaction(&new_transaction.clone(), &tx_difficulty)?;
332
333 if !node
334 .read()
335 .await
336 .mempool
337 .validate_transaction(&new_transaction)
338 .await
339 {
340 return Err(BlockchainError::DoubleSpend);
341 }
342 validate_transaction_timestamp(&new_transaction)?;
343
344 node.write()
345 .await
346 .mempool
347 .add_transaction(new_transaction.clone())
348 .await;
349
350 Node::send_to_peers(
351 node.clone(),
352 Message::new(Command::NewTransaction {
353 transaction: new_transaction.clone(),
354 }),
355 )
356 .await;
357 Node::log(format!(
358 "Submitting new tx {}",
359 new_transaction.transaction_id.unwrap().dump_base36()
360 ));
361 Ok(())
362 }
363
364 pub fn log(msg: String) {
366 let mut log_file = fs::OpenOptions::new()
367 .append(true)
368 .create(true)
369 .open(
370 NODE_PATH
371 .get()
372 .expect("One blockchain instance must exist before logging")
373 .to_owned()
374 + "/info.log",
375 )
376 .expect("Could not open logging file!");
377 writeln!(
378 log_file,
379 "[{}] {}",
380 chrono::Local::now().format("%Y-%m-%d %H:%M:%S"),
381 msg
382 )
383 .expect("Failed to write to logging file");
384 }
385}