Skip to main content

snap_coin/full_node/
mod.rs

1/// Listens for incoming peer connections
2pub mod p2p_server;
3
4/// Handles public node discovery and connection; runs as a daemon
5pub mod auto_peer;
6
7/// Stores all currently pending transactions, that are waiting to be mined
8pub mod mempool;
9
10/// Stores current node state, shared between threads
11pub mod node_state;
12
13/// Handles the full node's on-message logic
14mod behavior;
15
16/// Enforces longest chain rule, syncs to a peer that has a higher height
17mod sync;
18
19use flexi_logger::{Duplicate, FileSpec, Logger};
20use futures::future::join_all;
21use log::{error, info};
22use num_bigint::BigUint;
23use std::{
24    net::SocketAddr,
25    path::PathBuf,
26    sync::{Arc, Once},
27};
28use tokio::net::TcpStream;
29
30use crate::{
31    core::{
32        block::Block,
33        blockchain::{self, Blockchain, BlockchainError},
34        transaction::{Transaction, TransactionError},
35    },
36    full_node::{
37        behavior::FullNodePeerBehavior,
38        node_state::{NodeState, SharedNodeState},
39    },
40    node::{
41        message::{Command, Message},
42        peer::{PeerError, PeerHandle, create_peer},
43    },
44};
45
46pub type SharedBlockchain = Arc<Blockchain>;
47
48static LOGGER_INIT: Once = Once::new();
49
50/// Creates a full node (SharedBlockchain and SharedNodeState), connecting to peers, accepting blocks and transactions
51pub fn create_full_node(
52    node_path: &str,
53    disable_stdout: bool,
54) -> (SharedBlockchain, SharedNodeState) {
55    let node_path = PathBuf::from(node_path);
56
57    LOGGER_INIT.call_once(|| {
58        let log_path = node_path.join("logs");
59        std::fs::create_dir_all(&log_path).expect("Failed to create log directory");
60
61        let mut logger = Logger::try_with_str("info")
62            .unwrap()
63            .log_to_file(FileSpec::default().directory(&log_path));
64
65        if !disable_stdout {
66            logger = logger.duplicate_to_stderr(Duplicate::Info);
67        }
68
69        logger.start().ok(); // Ignore errors if logger is already set
70
71        info!("Logger initialized for node at {:?}", node_path);
72    });
73
74    let node_state = NodeState::new_empty();
75    let node_state_expiry = node_state.clone();
76    node_state
77        .mempool
78        .start_expiry_watchdog(move |transaction| {
79            let _ = node_state_expiry
80                .chain_events
81                .send(node_state::ChainEvent::TransactionExpiration { transaction });
82        });
83
84    let blockchain = Blockchain::new(
85        node_path
86            .join("blockchain")
87            .to_str()
88            .expect("Failed to create node path"),
89    );
90
91    (Arc::new(blockchain), node_state)
92}
93
94/// Connect to a peer
95pub async fn connect_peer(
96    address: SocketAddr,
97    blockchain: &SharedBlockchain,
98    node_state: &SharedNodeState,
99) -> Result<PeerHandle, PeerError> {
100    let stream = TcpStream::connect(address)
101        .await
102        .map_err(|e| PeerError::Io(format!("IO error: {e}")))?;
103
104    let handle = create_peer(
105        stream,
106        FullNodePeerBehavior::new(blockchain.clone(), node_state.clone()),
107        false,
108    )?;
109    node_state
110        .connected_peers
111        .write()
112        .await
113        .insert(address, handle.clone());
114
115    Ok(handle)
116}
117
118/// Forward a message to all peers
119pub async fn to_peers(message: Message, node_state: &SharedNodeState) {
120    let peers_snapshot: Vec<_> = node_state
121        .connected_peers
122        .read()
123        .await
124        .values()
125        .cloned()
126        .collect();
127
128    // Create a list of futures for all peers
129    let futures = peers_snapshot.into_iter().map(|peer| {
130        let message = message.clone();
131        async move {
132            if let Err(err) = peer.request(message).await {
133                if let Err(e) = peer.kill(err.to_string()).await {
134                    error!("Failed to kill peer, error: {e}");
135                }
136            }
137        }
138    });
139
140    // Run all futures concurrently
141    join_all(futures).await;
142}
143
144/// Accept a new block to the local blockchain, and forward it to all peers
145pub async fn accept_block(
146    blockchain: &SharedBlockchain,
147    node_state: &SharedNodeState,
148    new_block: Block,
149) -> Result<(), BlockchainError> {
150    new_block.check_completeness()?;
151    let block_hash = new_block.meta.hash.unwrap(); // Unwrap is okay, we checked that block is complete
152
153    if node_state.last_seen_block() == block_hash {
154        return Ok(()); // We already processed this block
155    }
156    node_state.set_last_seen_block(block_hash);
157
158    // Wait for any running add block tasks to finish, hold a lock to prevent stacking
159    let _lock = node_state.adding_block.lock().await;
160
161    // Validation
162    blockchain::validate_block_timestamp(&new_block)?;
163    blockchain.add_block(new_block.clone())?;
164
165    // Mempool, spend transactions
166    node_state
167        .mempool
168        .spend_transactions(
169            new_block
170                .transactions
171                .iter()
172                .map(|tx| tx.transaction_id.unwrap())
173                .collect(),
174        )
175        .await;
176
177    info!("New block accepted: {}", block_hash.dump_base36());
178
179    // Broadcast new block
180    let _ = node_state.chain_events.send(node_state::ChainEvent::Block {
181        block: new_block.clone(),
182    });
183
184    let node_state = node_state.clone();
185
186    // Forward to all peers (non blocking)
187    tokio::spawn(async move {
188        to_peers(
189            Message::new(Command::NewBlock { block: new_block }),
190            &node_state,
191        )
192        .await;
193    });
194    Ok(())
195}
196
197/// Accept a new block to the local blockchain, and forward it to all peers
198pub async fn accept_transaction(
199    blockchain: &SharedBlockchain,
200    node_state: &SharedNodeState,
201    new_transaction: Transaction,
202) -> Result<(), BlockchainError> {
203    new_transaction.check_completeness()?;
204    let transaction_id = new_transaction.transaction_id.unwrap(); // Unwrap is okay, we checked that tx is complete
205
206    if BigUint::from_bytes_be(
207        &node_state
208            .get_live_transaction_difficulty(blockchain.get_transaction_difficulty())
209            .await,
210    ) < BigUint::from_bytes_be(&*transaction_id)
211    {
212        return Err(BlockchainError::LiveTransactionDifficulty);
213    }
214
215    if node_state
216        .last_seen_transactions()
217        .contains(&transaction_id)
218    {
219        return Ok(()); // We already processed this tx
220    }
221    node_state.add_last_seen_transaction(transaction_id);
222
223    // Validation
224    blockchain::validate_transaction_timestamp(&new_transaction)?;
225    blockchain.get_utxos().validate_transaction(
226        &new_transaction,
227        &BigUint::from_bytes_be(&blockchain.get_transaction_difficulty()),
228    )?;
229    if !node_state
230        .mempool
231        .validate_transaction(&new_transaction)
232        .await
233    {
234        return Err(TransactionError::DoubleSpend(transaction_id.dump_base36()).into());
235    }
236
237    node_state
238        .mempool
239        .add_transaction(new_transaction.clone())
240        .await;
241
242    // Broadcast new transaction
243    let _ = node_state
244        .chain_events
245        .send(node_state::ChainEvent::Transaction {
246            transaction: new_transaction.clone(),
247        });
248
249    let node_state = node_state.clone();
250
251    // Forward to all peers (non blocking)
252    tokio::spawn(async move {
253        to_peers(
254            Message::new(Command::NewTransaction {
255                transaction: new_transaction,
256            }),
257            &node_state,
258        )
259        .await;
260    });
261    Ok(())
262}