snap_coin/full_node/
mod.rs1pub mod p2p_server;
3
4pub mod auto_peer;
6
7pub mod mempool;
9
10pub mod node_state;
12
13mod behavior;
15
16mod sync;
18
19use flexi_logger::{Duplicate, FileSpec, Logger};
20use futures::future::join_all;
21use log::{error, info};
22use num_bigint::BigUint;
23use std::{
24 net::SocketAddr,
25 path::PathBuf,
26 sync::{Arc, Once},
27};
28use tokio::net::TcpStream;
29
30use crate::{
31 core::{
32 block::Block,
33 blockchain::{self, Blockchain, BlockchainError},
34 transaction::{Transaction, TransactionError},
35 },
36 full_node::{
37 behavior::FullNodePeerBehavior,
38 node_state::{NodeState, SharedNodeState},
39 },
40 node::{
41 message::{Command, Message},
42 peer::{PeerError, PeerHandle, create_peer},
43 },
44};
45
46pub type SharedBlockchain = Arc<Blockchain>;
47
48static LOGGER_INIT: Once = Once::new();
49
50pub fn create_full_node(
52 node_path: &str,
53 disable_stdout: bool,
54) -> (SharedBlockchain, SharedNodeState) {
55 let node_path = PathBuf::from(node_path);
56
57 LOGGER_INIT.call_once(|| {
58 let log_path = node_path.join("logs");
59 std::fs::create_dir_all(&log_path).expect("Failed to create log directory");
60
61 let mut logger = Logger::try_with_str("info")
62 .unwrap()
63 .log_to_file(FileSpec::default().directory(&log_path));
64
65 if !disable_stdout {
66 logger = logger.duplicate_to_stderr(Duplicate::Info);
67 }
68
69 logger.start().ok(); info!("Logger initialized for node at {:?}", node_path);
72 });
73
74 let node_state = NodeState::new_empty();
75 let node_state_expiry = node_state.clone();
76 node_state
77 .mempool
78 .start_expiry_watchdog(move |transaction| {
79 let _ = node_state_expiry
80 .chain_events
81 .send(node_state::ChainEvent::TransactionExpiration { transaction });
82 });
83
84 let blockchain = Blockchain::new(
85 node_path
86 .join("blockchain")
87 .to_str()
88 .expect("Failed to create node path"),
89 );
90
91 (Arc::new(blockchain), node_state)
92}
93
94pub async fn connect_peer(
96 address: SocketAddr,
97 blockchain: &SharedBlockchain,
98 node_state: &SharedNodeState,
99) -> Result<PeerHandle, PeerError> {
100 let stream = TcpStream::connect(address)
101 .await
102 .map_err(|e| PeerError::Io(format!("IO error: {e}")))?;
103
104 let handle = create_peer(
105 stream,
106 FullNodePeerBehavior::new(blockchain.clone(), node_state.clone()),
107 false,
108 )?;
109 node_state
110 .connected_peers
111 .write()
112 .await
113 .insert(address, handle.clone());
114
115 Ok(handle)
116}
117
118pub async fn to_peers(message: Message, node_state: &SharedNodeState) {
120 let peers_snapshot: Vec<_> = node_state
121 .connected_peers
122 .read()
123 .await
124 .values()
125 .cloned()
126 .collect();
127
128 let futures = peers_snapshot.into_iter().map(|peer| {
130 let message = message.clone();
131 async move {
132 if let Err(err) = peer.request(message).await {
133 if let Err(e) = peer.kill(err.to_string()).await {
134 error!("Failed to kill peer, error: {e}");
135 }
136 }
137 }
138 });
139
140 join_all(futures).await;
142}
143
144pub async fn accept_block(
146 blockchain: &SharedBlockchain,
147 node_state: &SharedNodeState,
148 new_block: Block,
149) -> Result<(), BlockchainError> {
150 new_block.check_completeness()?;
151 let block_hash = new_block.meta.hash.unwrap(); if node_state.last_seen_block() == block_hash {
154 return Ok(()); }
156 node_state.set_last_seen_block(block_hash);
157
158 let _lock = node_state.adding_block.lock().await;
160
161 blockchain::validate_block_timestamp(&new_block)?;
163 blockchain.add_block(new_block.clone())?;
164
165 node_state
167 .mempool
168 .spend_transactions(
169 new_block
170 .transactions
171 .iter()
172 .map(|tx| tx.transaction_id.unwrap())
173 .collect(),
174 )
175 .await;
176
177 info!("New block accepted: {}", block_hash.dump_base36());
178
179 let _ = node_state.chain_events.send(node_state::ChainEvent::Block {
181 block: new_block.clone(),
182 });
183
184 let node_state = node_state.clone();
185
186 tokio::spawn(async move {
188 to_peers(
189 Message::new(Command::NewBlock { block: new_block }),
190 &node_state,
191 )
192 .await;
193 });
194 Ok(())
195}
196
197pub async fn accept_transaction(
199 blockchain: &SharedBlockchain,
200 node_state: &SharedNodeState,
201 new_transaction: Transaction,
202) -> Result<(), BlockchainError> {
203 new_transaction.check_completeness()?;
204 let transaction_id = new_transaction.transaction_id.unwrap(); if BigUint::from_bytes_be(
207 &node_state
208 .get_live_transaction_difficulty(blockchain.get_transaction_difficulty())
209 .await,
210 ) < BigUint::from_bytes_be(&*transaction_id)
211 {
212 return Err(BlockchainError::LiveTransactionDifficulty);
213 }
214
215 if node_state
216 .last_seen_transactions()
217 .contains(&transaction_id)
218 {
219 return Ok(()); }
221 node_state.add_last_seen_transaction(transaction_id);
222
223 blockchain::validate_transaction_timestamp(&new_transaction)?;
225 blockchain.get_utxos().validate_transaction(
226 &new_transaction,
227 &BigUint::from_bytes_be(&blockchain.get_transaction_difficulty()),
228 )?;
229 if !node_state
230 .mempool
231 .validate_transaction(&new_transaction)
232 .await
233 {
234 return Err(TransactionError::DoubleSpend(transaction_id.dump_base36()).into());
235 }
236
237 node_state
238 .mempool
239 .add_transaction(new_transaction.clone())
240 .await;
241
242 let _ = node_state
244 .chain_events
245 .send(node_state::ChainEvent::Transaction {
246 transaction: new_transaction.clone(),
247 });
248
249 let node_state = node_state.clone();
250
251 tokio::spawn(async move {
253 to_peers(
254 Message::new(Command::NewTransaction {
255 transaction: new_transaction,
256 }),
257 &node_state,
258 )
259 .await;
260 });
261 Ok(())
262}