1use std::{sync::Arc, time::Duration};
2
3use bitcoin::{
4 block::Header,
5 p2p::{
6 message_filter::{CFHeaders, CFilter},
7 message_network::VersionMessage,
8 ServiceFlags,
9 },
10 Block, BlockHash, Network,
11};
12use tokio::{
13 select,
14 sync::mpsc::{self},
15};
16use tokio::{
17 sync::{
18 mpsc::{Receiver, UnboundedReceiver},
19 Mutex,
20 },
21 time::MissedTickBehavior,
22};
23
24use crate::{
25 chain::{
26 block_queue::{BlockQueue, BlockRecipient, ProcessBlockResponse},
27 chain::Chain,
28 checkpoints::HeaderCheckpoint,
29 error::{CFilterSyncError, HeaderSyncError},
30 CFHeaderChanges, FilterCheck, HeaderChainChanges, HeightMonitor,
31 },
32 db::traits::{HeaderStore, PeerStore},
33 error::{FetchBlockError, FetchHeaderError},
34 network::{peer_map::PeerMap, LastBlockMonitor, PeerId},
35 IndexedBlock, NodeState, TxBroadcast, TxBroadcastPolicy,
36};
37
38use super::{
39 channel_messages::{GetHeaderConfig, MainThreadMessage, PeerMessage, PeerThreadMessage},
40 client::Client,
41 config::NodeConfig,
42 dialog::Dialog,
43 error::NodeError,
44 messages::{ClientMessage, Event, Info, SyncUpdate, Warning},
45};
46
/// Lowest peer protocol version the node accepts: `Node::handle_version` answers
/// any peer advertising a smaller `version` with a disconnect.
pub(crate) const WTXID_VERSION: u32 = 70016;
/// Period of the interval timer created in `Node::run`. A tick is one of the three
/// `select!` branches there, so it bounds how long one pass of the main loop waits
/// for a peer or client message.
const LOOP_TIMEOUT: Duration = Duration::from_millis(10);

/// A count of peers; compared against `PeerMap::live` when deciding whether more
/// connections are needed.
type PeerRequirement = usize;
51
/// A node that requests block headers, compact filter headers, compact filters and
/// blocks from remote peers, and answers requests arriving from its paired `Client`.
///
/// `H` is the header store handed to the chain and `P` is the peer store handed to
/// the peer map.
#[derive(Debug)]
pub struct Node<H: HeaderStore, P: PeerStore + 'static> {
    /// Current position in the sync state machine; moved forward by `advance_state`
    /// and reset by `handle_inventory_blocks` and `rescan`.
    state: NodeState,
    /// Header chain and compact filter sync state, built on top of the header store.
    chain: Chain<H>,
    /// Peer connections; used to send, broadcast, ban and open new connections.
    peer_map: PeerMap<P>,
    /// Connection target used once the node is no longer in `NodeState::Behind`
    /// (see `next_required_peers`).
    required_peers: PeerRequirement,
    /// Shared handle used to send info, warnings and events towards the client.
    dialog: Arc<Dialog>,
    /// Block requests waiting to be sent to, or answered by, a peer.
    block_queue: BlockQueue,
    /// Receiving end of the channel carrying `ClientMessage`s from the `Client`.
    client_recv: UnboundedReceiver<ClientMessage>,
    /// Receiving end of the channel whose sender was handed to the peer map; each
    /// item carries the sending peer's nonce and its message.
    peer_recv: Receiver<PeerThreadMessage>,
}
64
65impl<H: HeaderStore, P: PeerStore> Node<H, P> {
66 pub(crate) fn new(
67 network: Network,
68 config: NodeConfig,
69 peer_store: P,
70 header_store: H,
71 ) -> (Self, Client) {
72 let NodeConfig {
73 required_peers,
74 white_list,
75 dns_resolver,
76 data_path: _,
77 header_checkpoint,
78 connection_type,
79 target_peer_size,
80 peer_timeout_config,
81 } = config;
82 let (info_tx, info_rx) = mpsc::channel::<Info>(32);
84 let (warn_tx, warn_rx) = mpsc::unbounded_channel::<Warning>();
85 let (event_tx, event_rx) = mpsc::unbounded_channel::<Event>();
86 let (ctx, crx) = mpsc::unbounded_channel::<ClientMessage>();
87 let client = Client::new(info_rx, warn_rx, event_rx, ctx);
88 let dialog = Arc::new(Dialog::new(info_tx, warn_tx, event_tx));
90 let state = NodeState::Behind;
92 let (mtx, mrx) = mpsc::channel::<PeerThreadMessage>(32);
94 let height_monitor = Arc::new(Mutex::new(HeightMonitor::new()));
95 let peer_map = PeerMap::new(
96 mtx,
97 network,
98 peer_store,
99 white_list,
100 Arc::clone(&dialog),
101 connection_type,
102 target_peer_size,
103 peer_timeout_config,
104 Arc::clone(&height_monitor),
105 dns_resolver,
106 );
107 let chain = Chain::new(
109 network,
110 header_checkpoint,
111 Arc::clone(&dialog),
112 height_monitor,
113 header_store,
114 required_peers,
115 );
116 (
117 Self {
118 state,
119 chain,
120 peer_map,
121 required_peers: required_peers.into(),
122 dialog,
123 block_queue: BlockQueue::new(),
124 client_recv: crx,
125 peer_recv: mrx,
126 },
127 client,
128 )
129 }
130
131 pub async fn run(mut self) -> Result<(), NodeError<H::Error, P::Error>> {
137 crate::debug!("Starting node");
138 crate::debug!(format!(
139 "Configured connection requirement: {} peers",
140 self.required_peers
141 ));
142 self.fetch_headers().await?;
143 let mut last_block = LastBlockMonitor::new();
144 let mut interval = tokio::time::interval(LOOP_TIMEOUT);
145 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
146 loop {
147 self.advance_state(&mut last_block).await;
149 self.dispatch().await?;
151 self.get_blocks().await;
153 select! {
155 peer = self.peer_recv.recv() => {
156 match peer {
157 Some(peer_thread) => {
158 match peer_thread.message {
159 PeerMessage::Version(version) => {
160 self.peer_map.set_services(peer_thread.nonce, version.services);
161 self.peer_map.set_height(peer_thread.nonce, version.start_height as u32).await;
162 let response = self.handle_version(peer_thread.nonce, version).await?;
163 self.peer_map.send_message(peer_thread.nonce, response).await;
164 crate::debug!(format!("[{}]: version", peer_thread.nonce));
165 }
166 PeerMessage::Headers(headers) => {
167 last_block.reset();
168 crate::debug!(format!("[{}]: headers", peer_thread.nonce));
169 match self.handle_headers(peer_thread.nonce, headers).await {
170 Some(response) => {
171 self.peer_map.send_message(peer_thread.nonce, response).await;
172 }
173 None => continue,
174 }
175 }
176 PeerMessage::FilterHeaders(cf_headers) => {
177 crate::debug!(format!("[{}]: filter headers", peer_thread.nonce));
178 match self.handle_cf_headers(peer_thread.nonce, cf_headers).await {
179 Some(response) => {
180 self.peer_map.broadcast(response).await;
181 }
182 None => continue,
183 }
184 }
185 PeerMessage::Filter(filter) => {
186 match self.handle_filter(peer_thread.nonce, filter).await {
187 Some(response) => {
188 self.peer_map.broadcast(response).await;
189 }
190 None => continue,
191 }
192 }
193 PeerMessage::Block(block) => match self.handle_block(peer_thread.nonce, block).await {
194 Some(response) => {
195 self.peer_map.send_message(peer_thread.nonce, response).await;
196 }
197 None => continue,
198 },
199 PeerMessage::NewBlocks(blocks) => {
200 crate::debug!(format!("[{}]: inv", peer_thread.nonce));
201 match self.handle_inventory_blocks(peer_thread.nonce, blocks).await {
202 Some(response) => {
203 self.peer_map.broadcast(response).await;
204 }
205 None => continue,
206 }
207 }
208 PeerMessage::FeeFilter(feerate) => {
209 self.peer_map.set_broadcast_min(peer_thread.nonce, feerate);
210 }
211 }
212 },
213 _ => continue,
214 }
215 },
216 message = self.client_recv.recv() => {
217 if let Some(message) = message {
218 match message {
219 ClientMessage::Shutdown => return Ok(()),
220 ClientMessage::Broadcast(transaction) => {
221 self.broadcast_transaction(transaction).await;
222 },
223 ClientMessage::Rescan => {
224 if let Some(response) = self.rescan() {
225 self.peer_map.broadcast(response).await;
226 }
227 },
228 ClientMessage::GetBlock(request) => {
229 let height_opt = self.chain.header_chain.height_of_hash(request.hash);
230 if height_opt.is_none() {
231 let err_reponse = request.oneshot.send(Err(FetchBlockError::UnknownHash));
232 if err_reponse.is_err() {
233 self.dialog.send_warning(Warning::ChannelDropped);
234 }
235 } else {
236 crate::debug!(
237 format!("Adding block {} to queue", request.hash)
238 );
239 self.block_queue.add(request);
240 }
241 },
242 ClientMessage::SetDuration(duration) => {
243 self.peer_map.set_duration(duration);
244 },
245 ClientMessage::AddPeer(peer) => {
246 self.peer_map.add_trusted_peer(peer);
247 },
248 ClientMessage::GetHeader(request) => {
249 let header_opt = self.chain.fetch_header(request.height).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() }).and_then(|opt| opt.ok_or(FetchHeaderError::UnknownHeight));
250 let send_result = request.oneshot.send(header_opt);
251 if send_result.is_err() {
252 self.dialog.send_warning(Warning::ChannelDropped);
253 };
254 },
255 ClientMessage::GetHeaderBatch(request) => {
256 let range_opt = self.chain.fetch_header_range(request.range).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() });
257 let send_result = request.oneshot.send(range_opt);
258 if send_result.is_err() {
259 self.dialog.send_warning(Warning::ChannelDropped);
260 };
261 },
262 ClientMessage::GetBroadcastMinFeeRate(request) => {
263 let fee_rate = self.peer_map.broadcast_min();
264 let send_result = request.send(fee_rate);
265 if send_result.is_err() {
266 self.dialog.send_warning(Warning::ChannelDropped);
267 };
268 }
269 ClientMessage::NoOp => (),
270 }
271 }
272 }
273 _ = interval.tick() => (),
274 }
275 }
276 }
277
278 async fn dispatch(&mut self) -> Result<(), NodeError<H::Error, P::Error>> {
280 self.peer_map.clean().await;
281 let live = self.peer_map.live();
282 let required = self.next_required_peers();
283 if live < required {
285 self.dialog.send_warning(Warning::NeedConnections {
286 connected: live,
287 required,
288 });
289 let address = self.peer_map.next_peer().await?;
290 if self.peer_map.dispatch(address).await.is_err() {
291 self.dialog.send_warning(Warning::CouldNotConnect);
292 }
293 }
294 Ok(())
295 }
296
297 async fn get_blocks(&mut self) {
299 if let Some(block_request) = self.pop_block_queue() {
300 crate::debug!("Sending block request to random peer");
301 self.peer_map.send_random(block_request).await;
302 }
303 }
304
305 async fn broadcast_transaction(&self, broadcast: TxBroadcast) {
307 let mut queue = self.peer_map.tx_queue.lock().await;
308 queue.add_to_queue(broadcast.tx);
309 drop(queue);
310 match broadcast.broadcast_policy {
311 TxBroadcastPolicy::AllPeers => {
312 crate::debug!(format!(
313 "Sending transaction to {} connected peers",
314 self.peer_map.live()
315 ));
316 self.peer_map
317 .broadcast(MainThreadMessage::BroadcastPending)
318 .await
319 }
320 TxBroadcastPolicy::RandomPeer => {
321 crate::debug!("Sending transaction to a random peer");
322 self.peer_map
323 .send_random(MainThreadMessage::BroadcastPending)
324 .await
325 }
326 };
327 }
328
329 async fn advance_state(&mut self, last_block: &mut LastBlockMonitor) {
331 match self.state {
332 NodeState::Behind => {
333 if self.chain.is_synced().await {
334 self.state = NodeState::HeadersSynced;
335 }
336 }
337 NodeState::HeadersSynced => {
338 if self.chain.is_cf_headers_synced() {
339 self.state = NodeState::FilterHeadersSynced;
340 }
341 }
342 NodeState::FilterHeadersSynced => {
343 if self.chain.is_filters_synced() {
344 self.state = NodeState::FiltersSynced;
345 let update = SyncUpdate::new(
346 HeaderCheckpoint::new(
347 self.chain.header_chain.height(),
348 self.chain.header_chain.tip_hash(),
349 ),
350 self.chain.last_ten(),
351 );
352 self.dialog.send_event(Event::FiltersSynced(update));
353 }
354 }
355 NodeState::FiltersSynced => {
356 if last_block.stale() {
357 self.dialog.send_warning(Warning::PotentialStaleTip);
358 crate::debug!("Disconnecting from remote nodes to find new connections");
359 self.peer_map.broadcast(MainThreadMessage::Disconnect).await;
360 last_block.reset();
361 }
362 }
363 }
364 }
365
366 fn next_required_peers(&self) -> PeerRequirement {
368 match self.state {
369 NodeState::Behind => 1,
370 _ => self.required_peers,
371 }
372 }
373
374 async fn next_stateful_message(&mut self) -> Option<MainThreadMessage> {
377 if !self.chain.is_synced().await {
378 let headers = GetHeaderConfig {
379 locators: self.chain.header_chain.locators(),
380 stop_hash: None,
381 };
382 return Some(MainThreadMessage::GetHeaders(headers));
383 } else if !self.chain.is_cf_headers_synced() {
384 return Some(MainThreadMessage::GetFilterHeaders(
385 self.chain.next_cf_header_message(),
386 ));
387 } else if !self.chain.is_filters_synced() {
388 return Some(MainThreadMessage::GetFilters(
389 self.chain.next_filter_message(),
390 ));
391 }
392 None
393 }
394
395 async fn handle_version(
397 &mut self,
398 nonce: PeerId,
399 version_message: VersionMessage,
400 ) -> Result<MainThreadMessage, NodeError<H::Error, P::Error>> {
401 if version_message.version < WTXID_VERSION {
402 return Ok(MainThreadMessage::Disconnect);
403 }
404 match self.state {
405 NodeState::Behind => (),
406 _ => {
407 if !version_message.services.has(ServiceFlags::COMPACT_FILTERS)
408 || !version_message.services.has(ServiceFlags::NETWORK)
409 {
410 self.dialog.send_warning(Warning::NoCompactFilters);
411 return Ok(MainThreadMessage::Disconnect);
412 }
413 }
414 }
415 self.peer_map.tried(nonce).await;
416 let needs_peers = self.peer_map.need_peers().await?;
417 self.peer_map
419 .send_message(nonce, MainThreadMessage::GetAddrV2)
420 .await;
421 self.peer_map
423 .send_message(nonce, MainThreadMessage::WtxidRelay)
424 .await;
425 self.peer_map
426 .send_message(nonce, MainThreadMessage::Verack)
427 .await;
428 if needs_peers {
430 crate::debug!("Requesting new addresses");
431 self.peer_map
432 .send_message(nonce, MainThreadMessage::GetAddr)
433 .await;
434 }
435 if self.peer_map.live().eq(&self.required_peers) {
437 self.dialog.send_info(Info::ConnectionsMet).await;
438 }
439 let next_headers = GetHeaderConfig {
441 locators: self.chain.header_chain.locators(),
442 stop_hash: None,
443 };
444 Ok(MainThreadMessage::GetHeaders(next_headers))
445 }
446
447 async fn handle_headers(
449 &mut self,
450 peer_id: PeerId,
451 headers: Vec<Header>,
452 ) -> Option<MainThreadMessage> {
453 let chain = &mut self.chain;
454 match chain.sync_chain(headers).await {
455 Ok(changes) => match changes {
456 HeaderChainChanges::Extended(height) => {
457 self.dialog.send_info(Info::NewChainHeight(height)).await;
458 }
459 HeaderChainChanges::Reorg { height: _, hashes } => {
460 self.block_queue.remove(&hashes);
461 crate::debug!(format!("{} blocks reorganized", hashes.len()));
462 }
463 HeaderChainChanges::ForkAdded { tip } => {
464 crate::debug!(format!(
465 "Candidate fork {} -> {}",
466 tip.height,
467 tip.block_hash()
468 ));
469 self.dialog.send_info(Info::NewFork { tip }).await;
470 }
471 HeaderChainChanges::Duplicate => (),
472 },
473 Err(e) => match e {
474 HeaderSyncError::EmptyMessage => {
475 if !chain.is_synced().await {
476 return Some(MainThreadMessage::Disconnect);
477 }
478 return self.next_stateful_message().await;
479 }
480 _ => {
481 self.dialog.send_warning(Warning::UnexpectedSyncError {
482 warning: format!("Unexpected header syncing error: {e}"),
483 });
484 self.peer_map.ban(peer_id).await;
485 return Some(MainThreadMessage::Disconnect);
486 }
487 },
488 }
489 self.next_stateful_message().await
490 }
491
492 async fn handle_cf_headers(
494 &mut self,
495 peer_id: PeerId,
496 cf_headers: CFHeaders,
497 ) -> Option<MainThreadMessage> {
498 self.chain.send_chain_update().await;
499 match self.chain.sync_cf_headers(peer_id, cf_headers) {
500 Ok(potential_message) => match potential_message {
501 CFHeaderChanges::AddedToQueue => None,
502 CFHeaderChanges::Extended => self.next_stateful_message().await,
503 CFHeaderChanges::Conflict => {
504 self.dialog.send_warning(Warning::UnexpectedSyncError {
505 warning: "Found a conflict while peers are sending filter headers".into(),
506 });
507 Some(MainThreadMessage::Disconnect)
508 }
509 },
510 Err(e) => {
511 self.dialog.send_warning(Warning::UnexpectedSyncError {
512 warning: format!("Compact filter header syncing encountered an error: {e}"),
513 });
514 self.peer_map.ban(peer_id).await;
515 Some(MainThreadMessage::Disconnect)
516 }
517 }
518 }
519
520 async fn handle_filter(
522 &mut self,
523 peer_id: PeerId,
524 filter: CFilter,
525 ) -> Option<MainThreadMessage> {
526 match self.chain.sync_filter(filter) {
527 Ok(potential_message) => {
528 let FilterCheck { was_last_in_batch } = potential_message;
529 if was_last_in_batch {
530 self.chain.send_chain_update().await;
531 if !self.chain.is_filters_synced() {
532 let next_filters = self.chain.next_filter_message();
533 return Some(MainThreadMessage::GetFilters(next_filters));
534 }
535 }
536 None
537 }
538 Err(e) => {
539 self.dialog.send_warning(Warning::UnexpectedSyncError {
540 warning: format!("Compact filter syncing encountered an error: {e}"),
541 });
542 match e {
543 CFilterSyncError::Filter(_) => Some(MainThreadMessage::Disconnect),
544 _ => {
545 self.peer_map.ban(peer_id).await;
546 Some(MainThreadMessage::Disconnect)
547 }
548 }
549 }
550 }
551 }
552
553 async fn handle_block(&mut self, peer_id: PeerId, block: Block) -> Option<MainThreadMessage> {
555 let block_hash = block.block_hash();
556 let height = match self.chain.header_chain.height_of_hash(block_hash) {
557 Some(height) => height,
558 None => {
559 self.dialog.send_warning(Warning::UnexpectedSyncError {
560 warning: "A block received does not have a known hash".into(),
561 });
562 self.peer_map.ban(peer_id).await;
563 return Some(MainThreadMessage::Disconnect);
564 }
565 };
566 if !block.check_merkle_root() {
567 self.dialog.send_warning(Warning::UnexpectedSyncError {
568 warning: "A block received does not have a valid merkle root".into(),
569 });
570 self.peer_map.ban(peer_id).await;
571 return Some(MainThreadMessage::Disconnect);
572 }
573 let process_block_response = self.block_queue.process_block(&block_hash);
574 match process_block_response {
575 ProcessBlockResponse::Accepted { block_recipient } => {
576 self.dialog
577 .send_info(Info::BlockReceived(block.block_hash()))
578 .await;
579 match block_recipient {
580 BlockRecipient::Client(sender) => {
581 let send_err = sender.send(Ok(IndexedBlock::new(height, block))).is_err();
582 if send_err {
583 self.dialog.send_warning(Warning::ChannelDropped);
584 };
585 }
586 BlockRecipient::Event => {
587 self.dialog
588 .send_event(Event::Block(IndexedBlock::new(height, block)));
589 }
590 }
591 }
592 ProcessBlockResponse::LateResponse => {
593 crate::debug!(format!(
594 "Peer {} responded late to a request for hash {}",
595 peer_id, block_hash
596 ));
597 }
598 ProcessBlockResponse::UnknownHash => {
599 crate::debug!(format!(
600 "Peer {} responded with an irrelevant block",
601 peer_id
602 ));
603 }
604 }
605 None
606 }
607
608 fn pop_block_queue(&mut self) -> Option<MainThreadMessage> {
610 if matches!(
611 self.state,
612 NodeState::FilterHeadersSynced | NodeState::FiltersSynced
613 ) {
614 let next_block_hash = self.block_queue.pop();
615 return next_block_hash.map(MainThreadMessage::GetBlock);
616 }
617 None
618 }
619
620 async fn handle_inventory_blocks(
622 &mut self,
623 nonce: PeerId,
624 blocks: Vec<BlockHash>,
625 ) -> Option<MainThreadMessage> {
626 for block in blocks.iter() {
627 if !self.chain.header_chain.contains(*block) {
628 self.peer_map.increment_height(nonce).await;
629 crate::debug!(format!("New block: {}", block));
630 }
631 }
632 match self.state {
633 NodeState::Behind => None,
634 _ => {
635 if blocks
636 .into_iter()
637 .any(|block| !self.chain.header_chain.contains(block))
638 {
639 self.state = NodeState::Behind;
640 let next_headers = GetHeaderConfig {
641 locators: self.chain.header_chain.locators(),
642 stop_hash: None,
643 };
644 self.chain.clear_compact_filter_queue();
645 Some(MainThreadMessage::GetHeaders(next_headers))
646 } else {
647 None
648 }
649 }
650 }
651 }
652
653 fn rescan(&mut self) -> Option<MainThreadMessage> {
655 match self.state {
656 NodeState::Behind => None,
657 NodeState::HeadersSynced => None,
658 _ => {
659 self.chain.clear_filters();
660 self.state = NodeState::FilterHeadersSynced;
661 Some(MainThreadMessage::GetFilters(
662 self.chain.next_filter_message(),
663 ))
664 }
665 }
666 }
667
668 async fn fetch_headers(&mut self) -> Result<(), NodeError<H::Error, P::Error>> {
670 crate::debug!("Attempting to load headers from the database");
671 self.chain
672 .load_headers()
673 .await
674 .map_err(NodeError::HeaderDatabase)
675 }
676}