// kyoto/node.rs

1use std::{sync::Arc, time::Duration};
2
3use bitcoin::{
4    block::Header,
5    p2p::{
6        message_filter::{CFHeaders, CFilter},
7        message_network::VersionMessage,
8        ServiceFlags,
9    },
10    Block, BlockHash, Network,
11};
12use tokio::{
13    select,
14    sync::mpsc::{self},
15};
16use tokio::{
17    sync::{
18        mpsc::{Receiver, UnboundedReceiver},
19        Mutex,
20    },
21    time::MissedTickBehavior,
22};
23
24use crate::{
25    chain::{
26        block_queue::{BlockQueue, BlockRecipient, ProcessBlockResponse},
27        chain::Chain,
28        checkpoints::HeaderCheckpoint,
29        error::{CFilterSyncError, HeaderSyncError},
30        CFHeaderChanges, FilterCheck, HeaderChainChanges, HeightMonitor,
31    },
32    db::traits::{HeaderStore, PeerStore},
33    error::{FetchBlockError, FetchHeaderError},
34    network::{peer_map::PeerMap, LastBlockMonitor, PeerId},
35    IndexedBlock, NodeState, TxBroadcast, TxBroadcastPolicy,
36};
37
38use super::{
39    channel_messages::{GetHeaderConfig, MainThreadMessage, PeerMessage, PeerThreadMessage},
40    client::Client,
41    config::NodeConfig,
42    dialog::Dialog,
43    error::NodeError,
44    messages::{ClientMessage, Event, Info, SyncUpdate, Warning},
45};
46
// Minimum protocol version accepted from peers; 70016 signals BIP 339 wtxid-based
// transaction relay support (peers below this are disconnected in `handle_version`).
pub(crate) const WTXID_VERSION: u32 = 70016;
// Fallback tick for the main event loop so it keeps advancing state even when no
// peer or client messages arrive.
const LOOP_TIMEOUT: Duration = Duration::from_millis(10);

// Count of peer connections the node requires; aliased for readability.
type PeerRequirement = usize;
51
/// A compact block filter node. Nodes download Bitcoin block headers, block filters, and blocks to send relevant events to a client.
#[derive(Debug)]
pub struct Node<H: HeaderStore, P: PeerStore + 'static> {
    // Current phase of the sync state machine (headers -> filter headers -> filters).
    state: NodeState,
    // Header chain plus compact filter header/filter bookkeeping.
    chain: Chain<H>,
    // Live and prospective peer connections.
    peer_map: PeerMap<P>,
    // Number of connections to maintain once the header sync is complete.
    required_peers: PeerRequirement,
    // Structured channels for sending info, warnings, and events to the client.
    dialog: Arc<Dialog>,
    // Blocks requested by the client that still need to be fetched from peers.
    block_queue: BlockQueue,
    // Commands arriving from the client half of the API.
    client_recv: UnboundedReceiver<ClientMessage>,
    // Messages arriving from individual peer threads.
    peer_recv: Receiver<PeerThreadMessage>,
}
64
65impl<H: HeaderStore, P: PeerStore> Node<H, P> {
66    pub(crate) fn new(
67        network: Network,
68        config: NodeConfig,
69        peer_store: P,
70        header_store: H,
71    ) -> (Self, Client) {
72        let NodeConfig {
73            required_peers,
74            white_list,
75            dns_resolver,
76            data_path: _,
77            header_checkpoint,
78            connection_type,
79            target_peer_size,
80            peer_timeout_config,
81        } = config;
82        // Set up a communication channel between the node and client
83        let (info_tx, info_rx) = mpsc::channel::<Info>(32);
84        let (warn_tx, warn_rx) = mpsc::unbounded_channel::<Warning>();
85        let (event_tx, event_rx) = mpsc::unbounded_channel::<Event>();
86        let (ctx, crx) = mpsc::unbounded_channel::<ClientMessage>();
87        let client = Client::new(info_rx, warn_rx, event_rx, ctx);
88        // A structured way to talk to the client
89        let dialog = Arc::new(Dialog::new(info_tx, warn_tx, event_tx));
90        // We always assume we are behind
91        let state = NodeState::Behind;
92        // Configure the peer manager
93        let (mtx, mrx) = mpsc::channel::<PeerThreadMessage>(32);
94        let height_monitor = Arc::new(Mutex::new(HeightMonitor::new()));
95        let peer_map = PeerMap::new(
96            mtx,
97            network,
98            peer_store,
99            white_list,
100            Arc::clone(&dialog),
101            connection_type,
102            target_peer_size,
103            peer_timeout_config,
104            Arc::clone(&height_monitor),
105            dns_resolver,
106        );
107        // Build the chain
108        let chain = Chain::new(
109            network,
110            header_checkpoint,
111            Arc::clone(&dialog),
112            height_monitor,
113            header_store,
114            required_peers,
115        );
116        (
117            Self {
118                state,
119                chain,
120                peer_map,
121                required_peers: required_peers.into(),
122                dialog,
123                block_queue: BlockQueue::new(),
124                client_recv: crx,
125                peer_recv: mrx,
126            },
127            client,
128        )
129    }
130
131    /// Run the node continuously. Typically run on a separate thread than the underlying application.
132    ///
133    /// # Errors
134    ///
135    /// A node will cease running if a fatal error is encountered with either the [`PeerStore`] or [`HeaderStore`].
136    pub async fn run(mut self) -> Result<(), NodeError<H::Error, P::Error>> {
137        crate::debug!("Starting node");
138        crate::debug!(format!(
139            "Configured connection requirement: {} peers",
140            self.required_peers
141        ));
142        self.fetch_headers().await?;
143        let mut last_block = LastBlockMonitor::new();
144        let mut interval = tokio::time::interval(LOOP_TIMEOUT);
145        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
146        loop {
147            // Try to advance the state of the node
148            self.advance_state(&mut last_block).await;
149            // Connect to more peers if we need them and remove old connections
150            self.dispatch().await?;
151            // If there are blocks we need in the queue, we should request them of a random peer
152            self.get_blocks().await;
153            // Either handle a message from a remote peer or from our client
154            select! {
155                peer = self.peer_recv.recv() => {
156                    match peer {
157                        Some(peer_thread) => {
158                            match peer_thread.message {
159                                PeerMessage::Version(version) => {
160                                    self.peer_map.set_services(peer_thread.nonce, version.services);
161                                    self.peer_map.set_height(peer_thread.nonce, version.start_height as u32).await;
162                                    let response = self.handle_version(peer_thread.nonce, version).await?;
163                                    self.peer_map.send_message(peer_thread.nonce, response).await;
164                                    crate::debug!(format!("[{}]: version", peer_thread.nonce));
165                                }
166                                PeerMessage::Headers(headers) => {
167                                    last_block.reset();
168                                    crate::debug!(format!("[{}]: headers", peer_thread.nonce));
169                                    match self.handle_headers(peer_thread.nonce, headers).await {
170                                        Some(response) => {
171                                            self.peer_map.send_message(peer_thread.nonce, response).await;
172                                        }
173                                        None => continue,
174                                    }
175                                }
176                                PeerMessage::FilterHeaders(cf_headers) => {
177                                    crate::debug!(format!("[{}]: filter headers", peer_thread.nonce));
178                                    match self.handle_cf_headers(peer_thread.nonce, cf_headers).await {
179                                        Some(response) => {
180                                            self.peer_map.broadcast(response).await;
181                                        }
182                                        None => continue,
183                                    }
184                                }
185                                PeerMessage::Filter(filter) => {
186                                    match self.handle_filter(peer_thread.nonce, filter).await {
187                                        Some(response) => {
188                                            self.peer_map.broadcast(response).await;
189                                        }
190                                        None => continue,
191                                    }
192                                }
193                                PeerMessage::Block(block) => match self.handle_block(peer_thread.nonce, block).await {
194                                    Some(response) => {
195                                        self.peer_map.send_message(peer_thread.nonce, response).await;
196                                    }
197                                    None => continue,
198                                },
199                                PeerMessage::NewBlocks(blocks) => {
200                                    crate::debug!(format!("[{}]: inv", peer_thread.nonce));
201                                    match self.handle_inventory_blocks(peer_thread.nonce, blocks).await {
202                                        Some(response) => {
203                                            self.peer_map.broadcast(response).await;
204                                        }
205                                        None => continue,
206                                    }
207                                }
208                                PeerMessage::FeeFilter(feerate) => {
209                                    self.peer_map.set_broadcast_min(peer_thread.nonce, feerate);
210                                }
211                            }
212                        },
213                        _ => continue,
214                    }
215                },
216                message = self.client_recv.recv() => {
217                    if let Some(message) = message {
218                        match message {
219                            ClientMessage::Shutdown => return Ok(()),
220                            ClientMessage::Broadcast(transaction) => {
221                                self.broadcast_transaction(transaction).await;
222                            },
223                            ClientMessage::Rescan => {
224                                if let Some(response) = self.rescan() {
225                                    self.peer_map.broadcast(response).await;
226                                }
227                            },
228                            ClientMessage::GetBlock(request) => {
229                                let height_opt = self.chain.header_chain.height_of_hash(request.hash);
230                                if height_opt.is_none() {
231                                    let err_reponse = request.oneshot.send(Err(FetchBlockError::UnknownHash));
232                                    if err_reponse.is_err() {
233                                        self.dialog.send_warning(Warning::ChannelDropped);
234                                    }
235                                } else {
236                                    crate::debug!(
237                                        format!("Adding block {} to queue", request.hash)
238                                    );
239                                    self.block_queue.add(request);
240                                }
241                            },
242                            ClientMessage::SetDuration(duration) => {
243                                self.peer_map.set_duration(duration);
244                            },
245                            ClientMessage::AddPeer(peer) => {
246                                self.peer_map.add_trusted_peer(peer);
247                            },
248                            ClientMessage::GetHeader(request) => {
249                                let header_opt = self.chain.fetch_header(request.height).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() }).and_then(|opt| opt.ok_or(FetchHeaderError::UnknownHeight));
250                                let send_result = request.oneshot.send(header_opt);
251                                if send_result.is_err() {
252                                    self.dialog.send_warning(Warning::ChannelDropped);
253                                };
254                            },
255                            ClientMessage::GetHeaderBatch(request) => {
256                                let range_opt = self.chain.fetch_header_range(request.range).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() });
257                                let send_result = request.oneshot.send(range_opt);
258                                if send_result.is_err() {
259                                    self.dialog.send_warning(Warning::ChannelDropped);
260                                };
261                            },
262                            ClientMessage::GetBroadcastMinFeeRate(request) => {
263                                let fee_rate = self.peer_map.broadcast_min();
264                                let send_result = request.send(fee_rate);
265                                if send_result.is_err() {
266                                    self.dialog.send_warning(Warning::ChannelDropped);
267                                };
268                            }
269                            ClientMessage::NoOp => (),
270                        }
271                    }
272                }
273                _ = interval.tick() => (),
274            }
275        }
276    }
277
278    // Connect to a new peer if we are not connected to enough
279    async fn dispatch(&mut self) -> Result<(), NodeError<H::Error, P::Error>> {
280        self.peer_map.clean().await;
281        let live = self.peer_map.live();
282        let required = self.next_required_peers();
283        // Find more peers when lower than the desired threshold.
284        if live < required {
285            self.dialog.send_warning(Warning::NeedConnections {
286                connected: live,
287                required,
288            });
289            let address = self.peer_map.next_peer().await?;
290            if self.peer_map.dispatch(address).await.is_err() {
291                self.dialog.send_warning(Warning::CouldNotConnect);
292            }
293        }
294        Ok(())
295    }
296
297    // If there are blocks in the queue, we should request them of a random peer
298    async fn get_blocks(&mut self) {
299        if let Some(block_request) = self.pop_block_queue() {
300            crate::debug!("Sending block request to random peer");
301            self.peer_map.send_random(block_request).await;
302        }
303    }
304
305    // Broadcast transactions according to the configured policy
306    async fn broadcast_transaction(&self, broadcast: TxBroadcast) {
307        let mut queue = self.peer_map.tx_queue.lock().await;
308        queue.add_to_queue(broadcast.tx);
309        drop(queue);
310        match broadcast.broadcast_policy {
311            TxBroadcastPolicy::AllPeers => {
312                crate::debug!(format!(
313                    "Sending transaction to {} connected peers",
314                    self.peer_map.live()
315                ));
316                self.peer_map
317                    .broadcast(MainThreadMessage::BroadcastPending)
318                    .await
319            }
320            TxBroadcastPolicy::RandomPeer => {
321                crate::debug!("Sending transaction to a random peer");
322                self.peer_map
323                    .send_random(MainThreadMessage::BroadcastPending)
324                    .await
325            }
326        };
327    }
328
    // Try to continue with the syncing process
    async fn advance_state(&mut self, last_block: &mut LastBlockMonitor) {
        match self.state {
            NodeState::Behind => {
                // Headers first: advance once the header chain is caught up.
                if self.chain.is_synced().await {
                    self.state = NodeState::HeadersSynced;
                }
            }
            NodeState::HeadersSynced => {
                // Then compact filter headers.
                if self.chain.is_cf_headers_synced() {
                    self.state = NodeState::FilterHeadersSynced;
                }
            }
            NodeState::FilterHeadersSynced => {
                // Then the filters themselves. Completing this step finishes the sync,
                // so emit a `FiltersSynced` event carrying the tip and recent headers.
                if self.chain.is_filters_synced() {
                    self.state = NodeState::FiltersSynced;
                    let update = SyncUpdate::new(
                        HeaderCheckpoint::new(
                            self.chain.header_chain.height(),
                            self.chain.header_chain.tip_hash(),
                        ),
                        self.chain.last_ten(),
                    );
                    self.dialog.send_event(Event::FiltersSynced(update));
                }
            }
            NodeState::FiltersSynced => {
                // Fully synced: if no block activity has been seen for too long,
                // rotate connections in case our peers have gone quiet.
                if last_block.stale() {
                    self.dialog.send_warning(Warning::PotentialStaleTip);
                    crate::debug!("Disconnecting from remote nodes to find new connections");
                    self.peer_map.broadcast(MainThreadMessage::Disconnect).await;
                    last_block.reset();
                }
            }
        }
    }
365
366    // When syncing headers we are only interested in one peer to start
367    fn next_required_peers(&self) -> PeerRequirement {
368        match self.state {
369            NodeState::Behind => 1,
370            _ => self.required_peers,
371        }
372    }
373
374    // After we receiving some chain-syncing message, we decide what chain of data needs to be
375    // requested next.
376    async fn next_stateful_message(&mut self) -> Option<MainThreadMessage> {
377        if !self.chain.is_synced().await {
378            let headers = GetHeaderConfig {
379                locators: self.chain.header_chain.locators(),
380                stop_hash: None,
381            };
382            return Some(MainThreadMessage::GetHeaders(headers));
383        } else if !self.chain.is_cf_headers_synced() {
384            return Some(MainThreadMessage::GetFilterHeaders(
385                self.chain.next_cf_header_message(),
386            ));
387        } else if !self.chain.is_filters_synced() {
388            return Some(MainThreadMessage::GetFilters(
389                self.chain.next_filter_message(),
390            ));
391        }
392        None
393    }
394
395    // We accepted a handshake with a peer but we may disconnect if they do not support CBF
396    async fn handle_version(
397        &mut self,
398        nonce: PeerId,
399        version_message: VersionMessage,
400    ) -> Result<MainThreadMessage, NodeError<H::Error, P::Error>> {
401        if version_message.version < WTXID_VERSION {
402            return Ok(MainThreadMessage::Disconnect);
403        }
404        match self.state {
405            NodeState::Behind => (),
406            _ => {
407                if !version_message.services.has(ServiceFlags::COMPACT_FILTERS)
408                    || !version_message.services.has(ServiceFlags::NETWORK)
409                {
410                    self.dialog.send_warning(Warning::NoCompactFilters);
411                    return Ok(MainThreadMessage::Disconnect);
412                }
413            }
414        }
415        self.peer_map.tried(nonce).await;
416        let needs_peers = self.peer_map.need_peers().await?;
417        // First we signal for ADDRV2 support
418        self.peer_map
419            .send_message(nonce, MainThreadMessage::GetAddrV2)
420            .await;
421        // Then for BIP 339 witness transaction broadcast
422        self.peer_map
423            .send_message(nonce, MainThreadMessage::WtxidRelay)
424            .await;
425        self.peer_map
426            .send_message(nonce, MainThreadMessage::Verack)
427            .await;
428        // Now we may request peers if required
429        if needs_peers {
430            crate::debug!("Requesting new addresses");
431            self.peer_map
432                .send_message(nonce, MainThreadMessage::GetAddr)
433                .await;
434        }
435        // Inform the user we are connected to all required peers
436        if self.peer_map.live().eq(&self.required_peers) {
437            self.dialog.send_info(Info::ConnectionsMet).await;
438        }
439        // Even if we start the node as caught up in terms of height, we need to check for reorgs. So we can send this unconditionally.
440        let next_headers = GetHeaderConfig {
441            locators: self.chain.header_chain.locators(),
442            stop_hash: None,
443        };
444        Ok(MainThreadMessage::GetHeaders(next_headers))
445    }
446
    // We always send headers to our peers, so our next message depends on our state
    async fn handle_headers(
        &mut self,
        peer_id: PeerId,
        headers: Vec<Header>,
    ) -> Option<MainThreadMessage> {
        let chain = &mut self.chain;
        match chain.sync_chain(headers).await {
            Ok(changes) => match changes {
                // The active chain grew; inform the client of the new height.
                HeaderChainChanges::Extended(height) => {
                    self.dialog.send_info(Info::NewChainHeight(height)).await;
                }
                // A reorganization occurred: drop pending requests for blocks
                // that were reorganized away.
                HeaderChainChanges::Reorg { height: _, hashes } => {
                    self.block_queue.remove(&hashes);
                    crate::debug!(format!("{} blocks reorganized", hashes.len()));
                }
                // A competing fork was recorded; the client is notified of its tip.
                HeaderChainChanges::ForkAdded { tip } => {
                    crate::debug!(format!(
                        "Candidate fork {} -> {}",
                        tip.height,
                        tip.block_hash()
                    ));
                    self.dialog.send_info(Info::NewFork { tip }).await;
                }
                // Headers we already knew about; nothing to do.
                HeaderChainChanges::Duplicate => (),
            },
            Err(e) => match e {
                // An empty message while still behind gets the peer disconnected;
                // once synced it simply defers to the next stateful request.
                HeaderSyncError::EmptyMessage => {
                    if !chain.is_synced().await {
                        return Some(MainThreadMessage::Disconnect);
                    }
                    return self.next_stateful_message().await;
                }
                // Any other sync error is treated as misbehavior: warn, ban, disconnect.
                _ => {
                    self.dialog.send_warning(Warning::UnexpectedSyncError {
                        warning: format!("Unexpected header syncing error: {e}"),
                    });
                    self.peer_map.ban(peer_id).await;
                    return Some(MainThreadMessage::Disconnect);
                }
            },
        }
        // Decide what to request next based on overall sync progress.
        self.next_stateful_message().await
    }
491
492    // Compact filter headers may result in a number of outcomes, including the need to audit filters.
493    async fn handle_cf_headers(
494        &mut self,
495        peer_id: PeerId,
496        cf_headers: CFHeaders,
497    ) -> Option<MainThreadMessage> {
498        self.chain.send_chain_update().await;
499        match self.chain.sync_cf_headers(peer_id, cf_headers) {
500            Ok(potential_message) => match potential_message {
501                CFHeaderChanges::AddedToQueue => None,
502                CFHeaderChanges::Extended => self.next_stateful_message().await,
503                CFHeaderChanges::Conflict => {
504                    self.dialog.send_warning(Warning::UnexpectedSyncError {
505                        warning: "Found a conflict while peers are sending filter headers".into(),
506                    });
507                    Some(MainThreadMessage::Disconnect)
508                }
509            },
510            Err(e) => {
511                self.dialog.send_warning(Warning::UnexpectedSyncError {
512                    warning: format!("Compact filter header syncing encountered an error: {e}"),
513                });
514                self.peer_map.ban(peer_id).await;
515                Some(MainThreadMessage::Disconnect)
516            }
517        }
518    }
519
520    // Handle a new compact block filter
521    async fn handle_filter(
522        &mut self,
523        peer_id: PeerId,
524        filter: CFilter,
525    ) -> Option<MainThreadMessage> {
526        match self.chain.sync_filter(filter) {
527            Ok(potential_message) => {
528                let FilterCheck { was_last_in_batch } = potential_message;
529                if was_last_in_batch {
530                    self.chain.send_chain_update().await;
531                    if !self.chain.is_filters_synced() {
532                        let next_filters = self.chain.next_filter_message();
533                        return Some(MainThreadMessage::GetFilters(next_filters));
534                    }
535                }
536                None
537            }
538            Err(e) => {
539                self.dialog.send_warning(Warning::UnexpectedSyncError {
540                    warning: format!("Compact filter syncing encountered an error: {e}"),
541                });
542                match e {
543                    CFilterSyncError::Filter(_) => Some(MainThreadMessage::Disconnect),
544                    _ => {
545                        self.peer_map.ban(peer_id).await;
546                        Some(MainThreadMessage::Disconnect)
547                    }
548                }
549            }
550        }
551    }
552
553    // Scan a block for transactions.
554    async fn handle_block(&mut self, peer_id: PeerId, block: Block) -> Option<MainThreadMessage> {
555        let block_hash = block.block_hash();
556        let height = match self.chain.header_chain.height_of_hash(block_hash) {
557            Some(height) => height,
558            None => {
559                self.dialog.send_warning(Warning::UnexpectedSyncError {
560                    warning: "A block received does not have a known hash".into(),
561                });
562                self.peer_map.ban(peer_id).await;
563                return Some(MainThreadMessage::Disconnect);
564            }
565        };
566        if !block.check_merkle_root() {
567            self.dialog.send_warning(Warning::UnexpectedSyncError {
568                warning: "A block received does not have a valid merkle root".into(),
569            });
570            self.peer_map.ban(peer_id).await;
571            return Some(MainThreadMessage::Disconnect);
572        }
573        let process_block_response = self.block_queue.process_block(&block_hash);
574        match process_block_response {
575            ProcessBlockResponse::Accepted { block_recipient } => {
576                self.dialog
577                    .send_info(Info::BlockReceived(block.block_hash()))
578                    .await;
579                match block_recipient {
580                    BlockRecipient::Client(sender) => {
581                        let send_err = sender.send(Ok(IndexedBlock::new(height, block))).is_err();
582                        if send_err {
583                            self.dialog.send_warning(Warning::ChannelDropped);
584                        };
585                    }
586                    BlockRecipient::Event => {
587                        self.dialog
588                            .send_event(Event::Block(IndexedBlock::new(height, block)));
589                    }
590                }
591            }
592            ProcessBlockResponse::LateResponse => {
593                crate::debug!(format!(
594                    "Peer {} responded late to a request for hash {}",
595                    peer_id, block_hash
596                ));
597            }
598            ProcessBlockResponse::UnknownHash => {
599                crate::debug!(format!(
600                    "Peer {} responded with an irrelevant block",
601                    peer_id
602                ));
603            }
604        }
605        None
606    }
607
608    // The block queue holds all the block hashes we may be interested in
609    fn pop_block_queue(&mut self) -> Option<MainThreadMessage> {
610        if matches!(
611            self.state,
612            NodeState::FilterHeadersSynced | NodeState::FiltersSynced
613        ) {
614            let next_block_hash = self.block_queue.pop();
615            return next_block_hash.map(MainThreadMessage::GetBlock);
616        }
617        None
618    }
619
620    // If new inventory came in, we need to download the headers and update the node state
621    async fn handle_inventory_blocks(
622        &mut self,
623        nonce: PeerId,
624        blocks: Vec<BlockHash>,
625    ) -> Option<MainThreadMessage> {
626        for block in blocks.iter() {
627            if !self.chain.header_chain.contains(*block) {
628                self.peer_map.increment_height(nonce).await;
629                crate::debug!(format!("New block: {}", block));
630            }
631        }
632        match self.state {
633            NodeState::Behind => None,
634            _ => {
635                if blocks
636                    .into_iter()
637                    .any(|block| !self.chain.header_chain.contains(block))
638                {
639                    self.state = NodeState::Behind;
640                    let next_headers = GetHeaderConfig {
641                        locators: self.chain.header_chain.locators(),
642                        stop_hash: None,
643                    };
644                    self.chain.clear_compact_filter_queue();
645                    Some(MainThreadMessage::GetHeaders(next_headers))
646                } else {
647                    None
648                }
649            }
650        }
651    }
652
653    // Clear the filter hash cache and redownload the filters.
654    fn rescan(&mut self) -> Option<MainThreadMessage> {
655        match self.state {
656            NodeState::Behind => None,
657            NodeState::HeadersSynced => None,
658            _ => {
659                self.chain.clear_filters();
660                self.state = NodeState::FilterHeadersSynced;
661                Some(MainThreadMessage::GetFilters(
662                    self.chain.next_filter_message(),
663                ))
664            }
665        }
666    }
667
668    // When the application starts, fetch any headers we know about from the database.
669    async fn fetch_headers(&mut self) -> Result<(), NodeError<H::Error, P::Error>> {
670        crate::debug!("Attempting to load headers from the database");
671        self.chain
672            .load_headers()
673            .await
674            .map_err(NodeError::HeaderDatabase)
675    }
676}