bdk/blockchain/compact_filters/mod.rs

// Bitcoin Dev Kit
// Written in 2020 by Alekos Filini <alekos.filini@gmail.com>
//
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.

//! Compact Filters
//!
//! This module contains a multithreaded implementation of a [`Blockchain`] backend that
//! uses BIP157 (aka "Neutrino") to populate the wallet's [database](crate::database::Database)
//! by downloading compact filters from the P2P network.
//!
//! Since there are currently very few peers "in the wild" that advertise the required service
//! flag, this implementation requires that one or more known peers are provided by the user.
//! No DNS or other kind of peer discovery is performed internally.
//!
//! Moreover, this module doesn't currently support detecting and resolving conflicts between
//! messages received from different peers. Thus, it's recommended to connect to a single peer
//! at a time, optionally opening multiple connections to it if it's desirable to use multiple
//! threads to sync in parallel.
//!
//! This is an **EXPERIMENTAL** feature; API and other major changes are expected.
//!
//! ## Example
//!
//! ```no_run
//! # use std::sync::Arc;
//! # use bitcoin::*;
//! # use bdk::*;
//! # use bdk::blockchain::compact_filters::*;
//! let num_threads = 4;
//!
//! let mempool = Arc::new(Mempool::default());
//! let peers = (0..num_threads)
//!     .map(|_| {
//!         Peer::connect(
//!             "btcd-mainnet.lightning.computer:8333",
//!             Arc::clone(&mempool),
//!             Network::Bitcoin,
//!         )
//!     })
//!     .collect::<Result<_, _>>()?;
//! let blockchain = CompactFiltersBlockchain::new(peers, "./wallet-filters", Some(500_000))?;
//! # Ok::<(), CompactFiltersError>(())
//! ```

use std::cell::RefCell;
use std::collections::HashSet;
use std::fmt;
use std::ops::DerefMut;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

#[allow(unused_imports)]
use log::{debug, error, info, trace};

use bitcoin::network::message_blockdata::Inventory;
use bitcoin::{BlockHash, Network, OutPoint, Transaction, Txid};

use rocksdb::{Options, SliceTransform, DB};

mod peer;
mod store;
mod sync;

use crate::blockchain::*;
use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
use crate::error::Error;
use crate::types::{KeychainKind, LocalUtxo, TransactionDetails};
use crate::{BlockTime, FeeRate};

use peer::*;
use store::*;
use sync::*;

pub use peer::{Mempool, Peer};

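// Rough relative cost weights used by `wallet_setup` below for progress reporting: the cost of
// syncing a single header, the cost of syncing one bundle of 1000 compact filters, and a flat
// cost for processing the downloaded blocks and the mempool.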
const SYNC_HEADERS_COST: f32 = 1.0;
const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
const PROCESS_BLOCKS_COST: f32 = 20_000.0;

/// Structure implementing the required blockchain traits
///
/// ## Example
/// See the [`blockchain::compact_filters`](crate::blockchain::compact_filters) module for a usage example.
#[derive(Debug)]
pub struct CompactFiltersBlockchain {
    peers: Vec<Arc<Peer>>,
    headers: Arc<ChainStore<Full>>,
    skip_blocks: Option<usize>,
}

impl CompactFiltersBlockchain {
    /// Construct a new instance given a list of peers, a path to store the headers and block
    /// filters downloaded during the sync, and optionally a number of blocks to skip starting
    /// from the genesis block while scanning for the wallet's outputs.
    ///
    /// For each [`Peer`] specified, a new thread will be spawned to download and verify the
    /// filters in parallel. It's currently recommended to only connect to a single peer to avoid
    /// inconsistencies in the data returned, optionally with multiple connections in parallel to
    /// speed up the sync process.
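    ///
    /// ## Example
    ///
    /// A minimal sketch of constructing the backend with a single peer; the peer address and
    /// storage path below are placeholders:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use bitcoin::Network;
    /// # use bdk::blockchain::compact_filters::*;
    /// let mempool = Arc::new(Mempool::default());
    /// let peer = Peer::connect(
    ///     "btcd-mainnet.lightning.computer:8333",
    ///     Arc::clone(&mempool),
    ///     Network::Bitcoin,
    /// )?;
    /// let blockchain = CompactFiltersBlockchain::new(vec![peer], "./wallet-filters", None)?;
    /// # Ok::<(), CompactFiltersError>(())
    /// ```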
    pub fn new<P: AsRef<Path>>(
        peers: Vec<Peer>,
        storage_dir: P,
        skip_blocks: Option<usize>,
    ) -> Result<Self, CompactFiltersError> {
        if peers.is_empty() {
            return Err(CompactFiltersError::NoPeers);
        }

        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(16));

        let network = peers[0].get_network();

        let cfs = DB::list_cf(&opts, &storage_dir).unwrap_or_else(|_| vec!["default".to_string()]);
        let db = DB::open_cf(&opts, &storage_dir, &cfs)?;
        let headers = Arc::new(ChainStore::new(db, network)?);

        // try to recover partial snapshots
        for cf_name in &cfs {
            if !cf_name.starts_with("_headers:") {
                continue;
            }

            info!("Trying to recover: {:?}", cf_name);
            headers.recover_snapshot(cf_name)?;
        }

        Ok(CompactFiltersBlockchain {
            peers: peers.into_iter().map(Arc::new).collect(),
            headers,
            skip_blocks,
        })
    }

    /// Process a transaction by looking for inputs that spend from a UTXO in the database or
    /// outputs that send funds to a known script_pubkey.
    fn process_tx<D: BatchDatabase>(
        &self,
        database: &mut D,
        tx: &Transaction,
        height: Option<u32>,
        timestamp: Option<u64>,
        internal_max_deriv: &mut Option<u32>,
        external_max_deriv: &mut Option<u32>,
    ) -> Result<(), Error> {
        let mut updates = database.begin_batch();

        let mut incoming: u64 = 0;
        let mut outgoing: u64 = 0;

        let mut inputs_sum: u64 = 0;
        let mut outputs_sum: u64 = 0;

        // look for our own inputs
        for (i, input) in tx.input.iter().enumerate() {
            if let Some(previous_output) = database.get_previous_output(&input.previous_output)? {
                inputs_sum += previous_output.value;

                // this output is ours, we have a path to derive it
                if let Some((keychain, _)) =
                    database.get_path_from_script_pubkey(&previous_output.script_pubkey)?
                {
                    outgoing += previous_output.value;

                    debug!("{} input #{} is mine, setting utxo as spent", tx.txid(), i);
                    updates.set_utxo(&LocalUtxo {
                        outpoint: input.previous_output,
                        txout: previous_output.clone(),
                        keychain,
                        is_spent: true,
                    })?;
                }
            }
        }

        for (i, output) in tx.output.iter().enumerate() {
            // to compute the fees later
            outputs_sum += output.value;

            // this output is ours, we have a path to derive it
            if let Some((keychain, child)) =
                database.get_path_from_script_pubkey(&output.script_pubkey)?
            {
                debug!("{} output #{} is mine, adding utxo", tx.txid(), i);
                updates.set_utxo(&LocalUtxo {
                    outpoint: OutPoint::new(tx.txid(), i as u32),
                    txout: output.clone(),
                    keychain,
                    is_spent: false,
                })?;
                incoming += output.value;

                if keychain == KeychainKind::Internal
                    && (internal_max_deriv.is_none() || child > internal_max_deriv.unwrap_or(0))
                {
                    *internal_max_deriv = Some(child);
                } else if keychain == KeychainKind::External
                    && (external_max_deriv.is_none() || child > external_max_deriv.unwrap_or(0))
                {
                    *external_max_deriv = Some(child);
                }
            }
        }

        if incoming > 0 || outgoing > 0 {
            let tx = TransactionDetails {
                txid: tx.txid(),
                transaction: Some(tx.clone()),
                received: incoming,
                sent: outgoing,
                confirmation_time: BlockTime::new(height, timestamp),
                fee: Some(inputs_sum.saturating_sub(outputs_sum)),
            };

            info!("Saving tx {}", tx.txid);
            updates.set_tx(&tx)?;
        }

        database.commit_batch(updates)?;

        Ok(())
    }
}

impl Blockchain for CompactFiltersBlockchain {
    fn get_capabilities(&self) -> HashSet<Capability> {
        vec![Capability::FullHistory].into_iter().collect()
    }

    fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
        self.peers[0].broadcast_tx(tx.clone())?;

        Ok(())
    }

    fn estimate_fee(&self, _target: usize) -> Result<FeeRate, Error> {
        // TODO
        Ok(FeeRate::default())
    }
}

impl GetHeight for CompactFiltersBlockchain {
    fn get_height(&self) -> Result<u32, Error> {
        Ok(self.headers.get_height()? as u32)
    }
}

impl GetTx for CompactFiltersBlockchain {
    fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
        Ok(self.peers[0]
            .get_mempool()
            .get_tx(&Inventory::Transaction(*txid)))
    }
}

impl GetBlockHash for CompactFiltersBlockchain {
    fn get_block_hash(&self, height: u64) -> Result<BlockHash, Error> {
        self.headers
            .get_block_hash(height as usize)?
            .ok_or(Error::CompactFilters(
                CompactFiltersError::BlockHashNotFound,
            ))
    }
}

impl WalletSync for CompactFiltersBlockchain {
    #[allow(clippy::mutex_atomic)] // Mutex is easier to understand than a CAS loop.
    fn wallet_setup<D: BatchDatabase>(
        &self,
        database: &RefCell<D>,
        progress_update: Box<dyn Progress>,
    ) -> Result<(), Error> {
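        // High-level flow: sync block headers, then sync compact filters from all peers in
        // parallel, download the blocks whose filters match the wallet's scripts, drop the
        // transactions newer than the last fully synced block, re-process the downloaded
        // blocks and the peer's mempool, and finally bump the last derivation indices.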
        let first_peer = &self.peers[0];

        let skip_blocks = self.skip_blocks.unwrap_or(0);

        let cf_sync = Arc::new(CfSync::new(Arc::clone(&self.headers), skip_blocks, 0x00)?);

        let initial_height = self.headers.get_height()?;
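        // Filters are synced in bundles of 1000 blocks: estimate how many bundles still need to
        // be downloaded so the progress updates below can be weighted accordingly.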
        let total_bundles = (first_peer.get_version().start_height as usize)
            .checked_sub(skip_blocks)
            .map(|x| x / 1000)
            .unwrap_or(0)
            + 1;
        let expected_bundles_to_sync = total_bundles.saturating_sub(cf_sync.pruned_bundles()?);

        let headers_cost = (first_peer.get_version().start_height as usize)
            .saturating_sub(initial_height) as f32
            * SYNC_HEADERS_COST;
        let filters_cost = expected_bundles_to_sync as f32 * SYNC_FILTERS_COST;

        let total_cost = headers_cost + filters_cost + PROCESS_BLOCKS_COST;

        if let Some(snapshot) = sync::sync_headers(
            Arc::clone(first_peer),
            Arc::clone(&self.headers),
            |new_height| {
                let local_headers_cost =
                    new_height.saturating_sub(initial_height) as f32 * SYNC_HEADERS_COST;
                progress_update.update(
                    local_headers_cost / total_cost * 100.0,
                    Some(format!("Synced headers to {}", new_height)),
                )
            },
        )? {
            if snapshot.work()? > self.headers.work()? {
                info!("Applying snapshot with work: {}", snapshot.work()?);
                self.headers.apply_snapshot(snapshot)?;
            }
        }

        let synced_height = self.headers.get_height()?;
        let buried_height = synced_height.saturating_sub(sync::BURIED_CONFIRMATIONS);
        info!("Synced headers to height: {}", synced_height);

        cf_sync.prepare_sync(Arc::clone(first_peer))?;

        let mut database = database.borrow_mut();
        let database = database.deref_mut();

        let all_scripts = Arc::new(
            database
                .iter_script_pubkeys(None)?
                .into_iter()
                .map(|s| s.to_bytes())
                .collect::<Vec<_>>(),
        );

        #[allow(clippy::mutex_atomic)]
        let last_synced_block = Arc::new(Mutex::new(synced_height));

        let synced_bundles = Arc::new(AtomicUsize::new(0));
        let progress_update = Arc::new(Mutex::new(progress_update));

        let mut threads = Vec::with_capacity(self.peers.len());
        for peer in &self.peers {
            let cf_sync = Arc::clone(&cf_sync);
            let peer = Arc::clone(peer);
            let headers = Arc::clone(&self.headers);
            let all_scripts = Arc::clone(&all_scripts);
            let last_synced_block = Arc::clone(&last_synced_block);
            let progress_update = Arc::clone(&progress_update);
            let synced_bundles = Arc::clone(&synced_bundles);

            let thread = std::thread::spawn(move || {
                cf_sync.capture_thread_for_sync(
                    peer,
                    |block_hash, filter| {
                        if !filter
                            .match_any(block_hash, all_scripts.iter().map(|s| s.as_slice()))?
                        {
                            return Ok(false);
                        }

                        let block_height = headers.get_height_for(block_hash)?.unwrap_or(0);
                        let saved_correct_block = matches!(headers.get_full_block(block_height)?, Some(block) if &block.block_hash() == block_hash);

                        if saved_correct_block {
                            Ok(false)
                        } else {
                            let mut last_synced_block = last_synced_block.lock().unwrap();

                            // If we download a block older than `last_synced_block`, we update it so that
                            // we know to delete and re-process all txs starting from that height
                            if block_height < *last_synced_block {
                                *last_synced_block = block_height;
                            }

                            Ok(true)
                        }
                    },
                    |index| {
                        let synced_bundles = synced_bundles.fetch_add(1, Ordering::SeqCst);
                        let local_filters_cost = synced_bundles as f32 * SYNC_FILTERS_COST;
                        progress_update.lock().unwrap().update(
                            (headers_cost + local_filters_cost) / total_cost * 100.0,
                            Some(format!(
                                "Synced filters {} - {}",
                                index * 1000 + 1,
                                (index + 1) * 1000
                            )),
                        )
                    },
                )
            });

            threads.push(thread);
        }

        for t in threads {
            t.join().unwrap()?;
        }

        progress_update.lock().unwrap().update(
            (headers_cost + filters_cost) / total_cost * 100.0,
            Some("Processing downloaded blocks and mempool".into()),
        )?;

        // delete all txs newer than last_synced_block
        let last_synced_block = *last_synced_block.lock().unwrap();
        log::debug!(
            "Dropping transactions newer than `last_synced_block` = {}",
            last_synced_block
        );
        let mut updates = database.begin_batch();
        for details in database.iter_txs(false)? {
            match details.confirmation_time {
                Some(c) if (c.height as usize) < last_synced_block => continue,
                _ => updates.del_tx(&details.txid, false)?,
            };
        }
        database.commit_batch(updates)?;

        match first_peer.ask_for_mempool() {
            Err(CompactFiltersError::PeerBloomDisabled) => {
                log::warn!("Peer has BLOOM disabled, we can't ask for the mempool")
            }
            e => e?,
        };

        let mut internal_max_deriv = None;
        let mut external_max_deriv = None;

        for (height, block) in self.headers.iter_full_blocks()? {
            for tx in &block.txdata {
                self.process_tx(
                    database,
                    tx,
                    Some(height as u32),
                    None,
                    &mut internal_max_deriv,
                    &mut external_max_deriv,
                )?;
            }
        }
        for tx in first_peer.get_mempool().iter_txs().iter() {
            self.process_tx(
                database,
                tx,
                None,
                None,
                &mut internal_max_deriv,
                &mut external_max_deriv,
            )?;
        }

        let current_ext = database
            .get_last_index(KeychainKind::External)?
            .unwrap_or(0);
        let first_ext_new = external_max_deriv.map(|x| x + 1).unwrap_or(0);
        if first_ext_new > current_ext {
            info!("Setting external index to {}", first_ext_new);
            database.set_last_index(KeychainKind::External, first_ext_new)?;
        }

        let current_int = database
            .get_last_index(KeychainKind::Internal)?
            .unwrap_or(0);
        let first_int_new = internal_max_deriv.map(|x| x + 1).unwrap_or(0);
        if first_int_new > current_int {
            info!("Setting internal index to {}", first_int_new);
            database.set_last_index(KeychainKind::Internal, first_int_new)?;
        }

        info!("Dropping blocks until {}", buried_height);
        self.headers.delete_blocks_until(buried_height)?;

        progress_update
            .lock()
            .unwrap()
            .update(100.0, Some("Done".into()))?;

        Ok(())
    }
}

/// Data to connect to a Bitcoin P2P peer
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct BitcoinPeerConfig {
    /// Peer address such as 127.0.0.1:18333
    pub address: String,
    /// Optional socks5 proxy
    pub socks5: Option<String>,
    /// Optional socks5 proxy credentials
    pub socks5_credentials: Option<(String, String)>,
}

/// Configuration for a [`CompactFiltersBlockchain`]
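///
/// ## Example
///
/// A minimal sketch of building the config and constructing the backend through
/// [`ConfigurableBlockchain::from_config`]; the peer address and storage directory below are
/// placeholders:
///
/// ```no_run
/// # use bdk::blockchain::ConfigurableBlockchain;
/// # use bdk::blockchain::compact_filters::*;
/// let config = CompactFiltersBlockchainConfig {
///     peers: vec![BitcoinPeerConfig {
///         address: "127.0.0.1:18333".to_string(),
///         socks5: None,
///         socks5_credentials: None,
///     }],
///     network: bitcoin::Network::Testnet,
///     storage_dir: "./wallet-filters".to_string(),
///     skip_blocks: None,
/// };
/// let blockchain = CompactFiltersBlockchain::from_config(&config)?;
/// # Ok::<(), bdk::Error>(())
/// ```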
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct CompactFiltersBlockchainConfig {
    /// List of peers to try to connect to in order to ask for headers and filters
    pub peers: Vec<BitcoinPeerConfig>,
    /// Network used
    pub network: Network,
    /// Storage dir to save partially downloaded headers and full blocks. Should be a separate
    /// directory per descriptor. Consider using
    /// [`crate::wallet::wallet_name_from_descriptor`] for this.
    pub storage_dir: String,
    /// Optionally skip initial `skip_blocks` blocks (default: 0)
    pub skip_blocks: Option<usize>,
}

impl ConfigurableBlockchain for CompactFiltersBlockchain {
    type Config = CompactFiltersBlockchainConfig;

    fn from_config(config: &Self::Config) -> Result<Self, Error> {
        let mempool = Arc::new(Mempool::default());
        let peers = config
            .peers
            .iter()
            .map(|peer_conf| match &peer_conf.socks5 {
                None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network),
                Some(proxy) => Peer::connect_proxy(
                    peer_conf.address.as_str(),
                    proxy,
                    peer_conf
                        .socks5_credentials
                        .as_ref()
                        .map(|(a, b)| (a.as_str(), b.as_str())),
                    Arc::clone(&mempool),
                    config.network,
                ),
            })
            .collect::<Result<_, _>>()?;

        Ok(CompactFiltersBlockchain::new(
            peers,
            &config.storage_dir,
            config.skip_blocks,
        )?)
    }
}

/// An error that can occur during sync with a [`CompactFiltersBlockchain`]
#[derive(Debug)]
pub enum CompactFiltersError {
    /// A peer sent an invalid or unexpected response
    InvalidResponse,
    /// The headers returned are invalid
    InvalidHeaders,
    /// The compact filter headers returned are invalid
    InvalidFilterHeader,
    /// The compact filter returned is invalid
    InvalidFilter,
    /// The peer is missing a block in the valid chain
    MissingBlock,
    /// Block hash at specified height not found
    BlockHashNotFound,
    /// The data stored in the block filters storage are corrupted
    DataCorruption,

    /// A peer is not connected
    NotConnected,
    /// A peer took too long to reply to one of our messages
    Timeout,
    /// The peer doesn't advertise the [`BLOOM`](bitcoin::network::constants::ServiceFlags::BLOOM) service flag
    PeerBloomDisabled,

    /// No peers have been specified
    NoPeers,

    /// Internal database error
    Db(rocksdb::Error),
    /// Internal I/O error
    Io(std::io::Error),
    /// Invalid BIP158 filter
    Bip158(bitcoin::bip158::Error),
    /// Internal system time error
    Time(std::time::SystemTimeError),

    /// Wrapper for [`crate::error::Error`]
    Global(Box<crate::error::Error>),
}

impl fmt::Display for CompactFiltersError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidResponse => write!(f, "A peer sent an invalid or unexpected response"),
            Self::InvalidHeaders => write!(f, "Invalid headers"),
            Self::InvalidFilterHeader => write!(f, "Invalid filter header"),
            Self::InvalidFilter => write!(f, "Invalid filters"),
            Self::MissingBlock => write!(f, "The peer is missing a block in the valid chain"),
            Self::BlockHashNotFound => write!(f, "Block hash not found"),
            Self::DataCorruption => write!(
                f,
                "The data stored in the block filters storage are corrupted"
            ),
            Self::NotConnected => write!(f, "A peer is not connected"),
            Self::Timeout => write!(f, "A peer took too long to reply to one of our messages"),
            Self::PeerBloomDisabled => write!(f, "Peer doesn't advertise the BLOOM service flag"),
            Self::NoPeers => write!(f, "No peers have been specified"),
            Self::Db(err) => write!(f, "Internal database error: {}", err),
            Self::Io(err) => write!(f, "Internal I/O error: {}", err),
            Self::Bip158(err) => write!(f, "Invalid BIP158 filter: {}", err),
            Self::Time(err) => write!(f, "Invalid system time: {}", err),
            Self::Global(err) => write!(f, "Generic error: {}", err),
        }
    }
}

impl std::error::Error for CompactFiltersError {}

impl_error!(rocksdb::Error, Db, CompactFiltersError);
impl_error!(std::io::Error, Io, CompactFiltersError);
impl_error!(bitcoin::bip158::Error, Bip158, CompactFiltersError);
impl_error!(std::time::SystemTimeError, Time, CompactFiltersError);

impl From<crate::error::Error> for CompactFiltersError {
    fn from(err: crate::error::Error) -> Self {
        CompactFiltersError::Global(Box::new(err))
    }
}