snap_coin/full_node/
mod.rs1pub mod p2p_server;
3
4pub mod auto_peer;
6
7pub mod mempool;
9
10pub mod node_state;
12
13pub mod ibd;
15
16mod behavior;
18
19mod sync;
21
22use flexi_logger::{Duplicate, FileSpec, Logger};
23use futures::future::join_all;
24use log::{error, info};
25use num_bigint::BigUint;
26use std::{
27 net::SocketAddr,
28 path::PathBuf,
29 sync::{Arc, Once},
30};
31use tokio::net::TcpStream;
32
33use crate::{
34 core::{
35 block::Block,
36 blockchain::{self, Blockchain, BlockchainError},
37 transaction::{Transaction, TransactionError},
38 },
39 full_node::{
40 behavior::FullNodePeerBehavior,
41 node_state::{NodeState, SharedNodeState},
42 },
43 node::{
44 message::{Command, Message},
45 peer::{PeerError, PeerHandle, create_peer},
46 },
47};
48
49pub type SharedBlockchain = Arc<Blockchain>;
50
51static LOGGER_INIT: Once = Once::new();
52
53pub fn create_full_node(
55 node_path: &str,
56 disable_stdout: bool,
57) -> (SharedBlockchain, SharedNodeState) {
58 let node_path = PathBuf::from(node_path);
59
60 LOGGER_INIT.call_once(|| {
61 let log_path = node_path.join("logs");
62 std::fs::create_dir_all(&log_path).expect("Failed to create log directory");
63
64 let mut logger = Logger::try_with_str("info")
65 .unwrap()
66 .log_to_file(FileSpec::default().directory(&log_path));
67
68 if !disable_stdout {
69 logger = logger.duplicate_to_stderr(Duplicate::Info);
70 }
71
72 logger.start().ok(); info!("Logger initialized for node at {:?}", node_path);
75 });
76
77 let node_state = NodeState::new_empty();
78 let node_state_expiry = node_state.clone();
79 node_state
80 .mempool
81 .start_expiry_watchdog(move |transaction| {
82 let _ = node_state_expiry
83 .chain_events
84 .send(node_state::ChainEvent::TransactionExpiration { transaction });
85 });
86
87 let blockchain = Blockchain::new(
88 node_path
89 .join("blockchain")
90 .to_str()
91 .expect("Failed to create node path"),
92 );
93
94 (Arc::new(blockchain), node_state)
95}
96
97pub async fn connect_peer(
99 address: SocketAddr,
100 blockchain: &SharedBlockchain,
101 node_state: &SharedNodeState,
102) -> Result<PeerHandle, PeerError> {
103 let stream = TcpStream::connect(address)
104 .await
105 .map_err(|e| PeerError::Io(format!("IO error: {e}")))?;
106
107 let handle = create_peer(
108 stream,
109 FullNodePeerBehavior::new(blockchain.clone(), node_state.clone()),
110 false,
111 )?;
112 node_state
113 .connected_peers
114 .write()
115 .await
116 .insert(address, handle.clone());
117
118 Ok(handle)
119}
120
121pub async fn to_peers(message: Message, node_state: &SharedNodeState) {
123 let peers_snapshot: Vec<_> = node_state
124 .connected_peers
125 .read()
126 .await
127 .values()
128 .cloned()
129 .collect();
130
131 let futures = peers_snapshot.into_iter().map(|peer| {
133 let message = message.clone();
134 async move {
135 if let Err(err) = peer.request(message).await {
136 if let Err(e) = peer.kill(err.to_string()).await {
137 error!("Failed to kill peer, error: {e}");
138 }
139 }
140 }
141 });
142
143 join_all(futures).await;
145}
146
147pub async fn accept_block(
149 blockchain: &SharedBlockchain,
150 node_state: &SharedNodeState,
151 new_block: Block,
152) -> Result<(), BlockchainError> {
153 new_block.check_completeness()?;
154 let block_hash = new_block.meta.hash.unwrap(); if node_state.last_seen_block() == block_hash {
157 return Ok(()); }
159 node_state.set_last_seen_block(block_hash);
160
161 let _lock = node_state.processing.lock().await;
163
164 blockchain::validate_block_timestamp(&new_block)?;
166 blockchain.add_block(new_block.clone(), false)?;
167
168 node_state
170 .mempool
171 .spend_transactions(
172 new_block
173 .transactions
174 .iter()
175 .map(|tx| tx.transaction_id.unwrap())
176 .collect(),
177 )
178 .await;
179
180 info!("New block accepted: {}", block_hash.dump_base36());
181
182 let _ = node_state.chain_events.send(node_state::ChainEvent::Block {
184 block: new_block.clone(),
185 });
186
187 let node_state = node_state.clone();
188
189 tokio::spawn(async move {
191 to_peers(
192 Message::new(Command::NewBlock { block: new_block }),
193 &node_state,
194 )
195 .await;
196 });
197 Ok(())
198}
199
200pub async fn accept_transaction(
202 blockchain: &SharedBlockchain,
203 node_state: &SharedNodeState,
204 new_transaction: Transaction,
205) -> Result<(), BlockchainError> {
206 new_transaction.check_completeness()?;
207 let transaction_id = new_transaction.transaction_id.unwrap(); if node_state
210 .last_seen_transactions()
211 .contains(&transaction_id)
212 {
213 return Ok(()); }
215 node_state.add_last_seen_transaction(transaction_id);
216
217 if BigUint::from_bytes_be(
218 &node_state
219 .get_live_transaction_difficulty(blockchain.get_transaction_difficulty())
220 .await,
221 ) < BigUint::from_bytes_be(&*transaction_id)
222 {
223 return Err(BlockchainError::LiveTransactionDifficulty);
224 }
225
226 blockchain::validate_transaction_timestamp(&new_transaction)?;
228 blockchain.get_utxos().validate_transaction(
229 &new_transaction,
230 &BigUint::from_bytes_be(&blockchain.get_transaction_difficulty()),
231 false,
232 )?;
233 if !node_state
234 .mempool
235 .validate_transaction(&new_transaction)
236 .await
237 {
238 return Err(TransactionError::DoubleSpend(transaction_id.dump_base36()).into());
239 }
240
241 node_state
242 .mempool
243 .add_transaction(new_transaction.clone())
244 .await;
245
246 let _ = node_state
248 .chain_events
249 .send(node_state::ChainEvent::Transaction {
250 transaction: new_transaction.clone(),
251 });
252
253 let node_state = node_state.clone();
254
255 tokio::spawn(async move {
257 to_peers(
258 Message::new(Command::NewTransaction {
259 transaction: new_transaction,
260 }),
261 &node_state,
262 )
263 .await;
264 });
265 Ok(())
266}