Skip to main content

snap_coin/full_node/
mod.rs

1/// Listens for incoming peer connections
2pub mod p2p_server;
3
4/// Handles public node discovery, and connection. A daemon
5pub mod auto_peer;
6
7/// Stores all currently pending transactions, that are waiting to be mined
8pub mod mempool;
9
10/// Stores current node state, shared between threads
11pub mod node_state;
12
13/// IBD Logic
14pub mod ibd;
15
16/// Handles full node on message logic
17mod behavior;
18
19/// Enforces longest chain rule, syncs to a peer that has a higher height
20mod sync;
21
22use flexi_logger::{Duplicate, FileSpec, Logger};
23use futures::future::join_all;
24use log::{error, info};
25use num_bigint::BigUint;
26use std::{
27    net::SocketAddr,
28    path::PathBuf,
29    sync::{Arc, Once},
30};
31use tokio::net::TcpStream;
32
33use crate::{
34    core::{
35        block::Block,
36        blockchain::{self, Blockchain, BlockchainError},
37        transaction::{Transaction, TransactionError},
38    },
39    full_node::{
40        behavior::FullNodePeerBehavior,
41        node_state::{NodeState, SharedNodeState},
42    },
43    node::{
44        message::{Command, Message},
45        peer::{PeerError, PeerHandle, create_peer},
46    },
47};
48
49pub type SharedBlockchain = Arc<Blockchain>;
50
51static LOGGER_INIT: Once = Once::new();
52
53/// Creates a full node (SharedBlockchain and SharedNodeState), connecting to peers, accepting blocks and transactions
54pub fn create_full_node(
55    node_path: &str,
56    disable_stdout: bool,
57) -> (SharedBlockchain, SharedNodeState) {
58    let node_path = PathBuf::from(node_path);
59
60    LOGGER_INIT.call_once(|| {
61        let log_path = node_path.join("logs");
62        std::fs::create_dir_all(&log_path).expect("Failed to create log directory");
63
64        let mut logger = Logger::try_with_str("info")
65            .unwrap()
66            .log_to_file(FileSpec::default().directory(&log_path));
67
68        if !disable_stdout {
69            logger = logger.duplicate_to_stderr(Duplicate::Info);
70        }
71
72        logger.start().ok(); // Ignore errors if logger is already set
73
74        info!("Logger initialized for node at {:?}", node_path);
75    });
76
77    let node_state = NodeState::new_empty();
78    let node_state_expiry = node_state.clone();
79    node_state
80        .mempool
81        .start_expiry_watchdog(move |transaction| {
82            let _ = node_state_expiry
83                .chain_events
84                .send(node_state::ChainEvent::TransactionExpiration { transaction });
85        });
86
87    let blockchain = Blockchain::new(
88        node_path
89            .join("blockchain")
90            .to_str()
91            .expect("Failed to create node path"),
92    );
93
94    (Arc::new(blockchain), node_state)
95}
96
97/// Connect to a peer
98pub async fn connect_peer(
99    address: SocketAddr,
100    blockchain: &SharedBlockchain,
101    node_state: &SharedNodeState,
102) -> Result<PeerHandle, PeerError> {
103    let stream = TcpStream::connect(address)
104        .await
105        .map_err(|e| PeerError::Io(format!("IO error: {e}")))?;
106
107    let handle = create_peer(
108        stream,
109        FullNodePeerBehavior::new(blockchain.clone(), node_state.clone()),
110        false,
111    )?;
112    node_state
113        .connected_peers
114        .write()
115        .await
116        .insert(address, handle.clone());
117
118    Ok(handle)
119}
120
121/// Forward a message to all peers
122pub async fn to_peers(message: Message, node_state: &SharedNodeState) {
123    let peers_snapshot: Vec<_> = node_state
124        .connected_peers
125        .read()
126        .await
127        .values()
128        .cloned()
129        .collect();
130
131    // Create a list of futures for all peers
132    let futures = peers_snapshot.into_iter().map(|peer| {
133        let message = message.clone();
134        async move {
135            if let Err(err) = peer.request(message).await {
136                if let Err(e) = peer.kill(err.to_string()).await {
137                    error!("Failed to kill peer, error: {e}");
138                }
139            }
140        }
141    });
142
143    // Run all futures concurrently
144    join_all(futures).await;
145}
146
147/// Accept a new block to the local blockchain, and forward it to all peers
148pub async fn accept_block(
149    blockchain: &SharedBlockchain,
150    node_state: &SharedNodeState,
151    new_block: Block,
152) -> Result<(), BlockchainError> {
153    new_block.check_completeness()?;
154    let block_hash = new_block.meta.hash.unwrap(); // Unwrap is okay, we checked that block is complete
155
156    if node_state.last_seen_block() == block_hash {
157        return Ok(()); // We already processed this block
158    }
159    node_state.set_last_seen_block(block_hash);
160
161    // Wait for any running add block tasks to finish, hold a lock to prevent stacking
162    let _lock = node_state.processing.lock().await;
163
164    // Validation
165    blockchain::validate_block_timestamp(&new_block)?;
166    blockchain.add_block(new_block.clone(), false)?;
167
168    // Mempool, spend transactions
169    node_state
170        .mempool
171        .spend_transactions(
172            new_block
173                .transactions
174                .iter()
175                .map(|tx| tx.transaction_id.unwrap())
176                .collect(),
177        )
178        .await;
179
180    info!("New block accepted: {}", block_hash.dump_base36());
181
182    // Broadcast new block
183    let _ = node_state.chain_events.send(node_state::ChainEvent::Block {
184        block: new_block.clone(),
185    });
186
187    let node_state = node_state.clone();
188
189    // Forward to all peers (non blocking)
190    tokio::spawn(async move {
191        to_peers(
192            Message::new(Command::NewBlock { block: new_block }),
193            &node_state,
194        )
195        .await;
196    });
197    Ok(())
198}
199
200/// Accept a new block to the local blockchain, and forward it to all peers
201pub async fn accept_transaction(
202    blockchain: &SharedBlockchain,
203    node_state: &SharedNodeState,
204    new_transaction: Transaction,
205) -> Result<(), BlockchainError> {
206    new_transaction.check_completeness()?;
207    let transaction_id = new_transaction.transaction_id.unwrap(); // Unwrap is okay, we checked that tx is complete
208
209    if node_state
210        .last_seen_transactions()
211        .contains(&transaction_id)
212    {
213        return Ok(()); // We already processed this tx
214    }
215    node_state.add_last_seen_transaction(transaction_id);
216
217    if BigUint::from_bytes_be(
218        &node_state
219            .get_live_transaction_difficulty(blockchain.get_transaction_difficulty())
220            .await,
221    ) < BigUint::from_bytes_be(&*transaction_id)
222    {
223        return Err(BlockchainError::LiveTransactionDifficulty);
224    }
225
226    // Validation
227    blockchain::validate_transaction_timestamp(&new_transaction)?;
228    blockchain.get_utxos().validate_transaction(
229        &new_transaction,
230        &BigUint::from_bytes_be(&blockchain.get_transaction_difficulty()),
231        false,
232    )?;
233    if !node_state
234        .mempool
235        .validate_transaction(&new_transaction)
236        .await
237    {
238        return Err(TransactionError::DoubleSpend(transaction_id.dump_base36()).into());
239    }
240
241    node_state
242        .mempool
243        .add_transaction(new_transaction.clone())
244        .await;
245
246    // Broadcast new transaction
247    let _ = node_state
248        .chain_events
249        .send(node_state::ChainEvent::Transaction {
250            transaction: new_transaction.clone(),
251        });
252
253    let node_state = node_state.clone();
254
255    // Forward to all peers (non blocking)
256    tokio::spawn(async move {
257        to_peers(
258            Message::new(Command::NewTransaction {
259                transaction: new_transaction,
260            }),
261            &node_state,
262        )
263        .await;
264    });
265    Ok(())
266}