use std::cell::RefCell;
use std::collections::HashSet;
use std::fmt;
use std::ops::DerefMut;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

#[allow(unused_imports)]
use log::{debug, error, info, trace};

use bitcoin::network::message_blockdata::Inventory;
use bitcoin::{BlockHash, Network, OutPoint, Transaction, Txid};

use rocksdb::{Options, SliceTransform, DB};

mod peer;
mod store;
mod sync;

use crate::blockchain::*;
use crate::database::{BatchDatabase, BatchOperations, DatabaseUtils};
use crate::error::Error;
use crate::types::{KeychainKind, LocalUtxo, TransactionDetails};
use crate::{BlockTime, FeeRate};

use peer::*;
use store::*;
use sync::*;

pub use peer::{Mempool, Peer};

const SYNC_HEADERS_COST: f32 = 1.0;
const SYNC_FILTERS_COST: f32 = 11.6 * 1_000.0;
const PROCESS_BLOCKS_COST: f32 = 20_000.0;

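/// Blockchain backend based on BIP157/BIP158 compact block filters: filters downloaded from a
/// set of peers are matched against the wallet's script pubkeys to decide which full blocks to
/// fetch and process.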
#[derive(Debug)]
pub struct CompactFiltersBlockchain {
    peers: Vec<Arc<Peer>>,
    headers: Arc<ChainStore<Full>>,
    skip_blocks: Option<usize>,
}

impl CompactFiltersBlockchain {
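    /// Constructs a new instance from a list of peers and a path used to store headers, filters
    /// and downloaded blocks. `skip_blocks` optionally skips syncing the first `n` blocks of the
    /// chain.
    ///
    /// A minimal usage sketch, assuming the usual `bdk::blockchain::compact_filters` import
    /// paths; the peer address, storage path and skipped-block count are placeholders:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use bitcoin::Network;
    /// # use bdk::blockchain::compact_filters::{
    /// #     CompactFiltersBlockchain, CompactFiltersError, Mempool, Peer,
    /// # };
    /// // Shared mempool for all peers
    /// let mempool = Arc::new(Mempool::default());
    /// let peer = Peer::connect("example.node:18333", Arc::clone(&mempool), Network::Testnet)?;
    /// let blockchain = CompactFiltersBlockchain::new(vec![peer], "./headers", Some(500_000))?;
    /// # Ok::<(), CompactFiltersError>(())
    /// ```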
    pub fn new<P: AsRef<Path>>(
        peers: Vec<Peer>,
        storage_dir: P,
        skip_blocks: Option<usize>,
    ) -> Result<Self, CompactFiltersError> {
        if peers.is_empty() {
            return Err(CompactFiltersError::NoPeers);
        }

        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(16));

        let network = peers[0].get_network();

        let cfs = DB::list_cf(&opts, &storage_dir).unwrap_or_else(|_| vec!["default".to_string()]);
        let db = DB::open_cf(&opts, &storage_dir, &cfs)?;
        let headers = Arc::new(ChainStore::new(db, network)?);

        // Try to recover any partially-applied header snapshots left over from a previous run
        for cf_name in &cfs {
            if !cf_name.starts_with("_headers:") {
                continue;
            }

            info!("Trying to recover: {:?}", cf_name);
            headers.recover_snapshot(cf_name)?;
        }

        Ok(CompactFiltersBlockchain {
            peers: peers.into_iter().map(Arc::new).collect(),
            headers,
            skip_blocks,
        })
    }

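    /// Inspects `tx` against the database: inputs spending our UTXOs are marked as spent,
    /// outputs paying our scripts are stored as new UTXOs, and if any value moved in or out the
    /// transaction details (including the fee) are saved. `internal_max_deriv` and
    /// `external_max_deriv` track the highest derivation index seen per keychain.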
    fn process_tx<D: BatchDatabase>(
        &self,
        database: &mut D,
        tx: &Transaction,
        height: Option<u32>,
        timestamp: Option<u64>,
        internal_max_deriv: &mut Option<u32>,
        external_max_deriv: &mut Option<u32>,
    ) -> Result<(), Error> {
        let mut updates = database.begin_batch();

        let mut incoming: u64 = 0;
        let mut outgoing: u64 = 0;

        let mut inputs_sum: u64 = 0;
        let mut outputs_sum: u64 = 0;

        for (i, input) in tx.input.iter().enumerate() {
            if let Some(previous_output) = database.get_previous_output(&input.previous_output)? {
                inputs_sum += previous_output.value;

                if let Some((keychain, _)) =
                    database.get_path_from_script_pubkey(&previous_output.script_pubkey)?
                {
                    outgoing += previous_output.value;

                    debug!("{} input #{} is mine, setting utxo as spent", tx.txid(), i);
                    updates.set_utxo(&LocalUtxo {
                        outpoint: input.previous_output,
                        txout: previous_output.clone(),
                        keychain,
                        is_spent: true,
                    })?;
                }
            }
        }

        for (i, output) in tx.output.iter().enumerate() {
            outputs_sum += output.value;

            if let Some((keychain, child)) =
                database.get_path_from_script_pubkey(&output.script_pubkey)?
            {
                debug!("{} output #{} is mine, adding utxo", tx.txid(), i);
                updates.set_utxo(&LocalUtxo {
                    outpoint: OutPoint::new(tx.txid(), i as u32),
                    txout: output.clone(),
                    keychain,
                    is_spent: false,
                })?;
                incoming += output.value;

                if keychain == KeychainKind::Internal
                    && (internal_max_deriv.is_none() || child > internal_max_deriv.unwrap_or(0))
                {
                    *internal_max_deriv = Some(child);
                } else if keychain == KeychainKind::External
                    && (external_max_deriv.is_none() || child > external_max_deriv.unwrap_or(0))
                {
                    *external_max_deriv = Some(child);
                }
            }
        }

        if incoming > 0 || outgoing > 0 {
            let tx = TransactionDetails {
                txid: tx.txid(),
                transaction: Some(tx.clone()),
                received: incoming,
                sent: outgoing,
                confirmation_time: BlockTime::new(height, timestamp),
                fee: Some(inputs_sum.saturating_sub(outputs_sum)),
            };

            info!("Saving tx {}", tx.txid);
            updates.set_tx(&tx)?;
        }

        database.commit_batch(updates)?;

        Ok(())
    }
}

impl Blockchain for CompactFiltersBlockchain {
    fn get_capabilities(&self) -> HashSet<Capability> {
        vec![Capability::FullHistory].into_iter().collect()
    }

    fn broadcast(&self, tx: &Transaction) -> Result<(), Error> {
        self.peers[0].broadcast_tx(tx.clone())?;

        Ok(())
    }

    fn estimate_fee(&self, _target: usize) -> Result<FeeRate, Error> {
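        // Compact filters carry no fee information, so no meaningful estimate can be
        // produced here; fall back to the default rate instead.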
        Ok(FeeRate::default())
    }
}

impl GetHeight for CompactFiltersBlockchain {
    fn get_height(&self) -> Result<u32, Error> {
        Ok(self.headers.get_height()? as u32)
    }
}

impl GetTx for CompactFiltersBlockchain {
    fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, Error> {
        Ok(self.peers[0]
            .get_mempool()
            .get_tx(&Inventory::Transaction(*txid)))
    }
}

impl GetBlockHash for CompactFiltersBlockchain {
    fn get_block_hash(&self, height: u64) -> Result<BlockHash, Error> {
        self.headers
            .get_block_hash(height as usize)?
            .ok_or(Error::CompactFilters(
                CompactFiltersError::BlockHashNotFound,
            ))
    }
}

impl WalletSync for CompactFiltersBlockchain {
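    // Full scan: sync headers from the peers, download and match compact filters against the
    // wallet's scripts, fetch the matching full blocks, then replay those blocks and the first
    // peer's mempool through the database.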
    #[allow(clippy::mutex_atomic)]
    fn wallet_setup<D: BatchDatabase>(
        &self,
        database: &RefCell<D>,
        progress_update: Box<dyn Progress>,
    ) -> Result<(), Error> {
        let first_peer = &self.peers[0];

        let skip_blocks = self.skip_blocks.unwrap_or(0);

        let cf_sync = Arc::new(CfSync::new(Arc::clone(&self.headers), skip_blocks, 0x00)?);

        let initial_height = self.headers.get_height()?;
        let total_bundles = (first_peer.get_version().start_height as usize)
            .checked_sub(skip_blocks)
            .map(|x| x / 1000)
            .unwrap_or(0)
            + 1;
        let expected_bundles_to_sync = total_bundles.saturating_sub(cf_sync.pruned_bundles()?);

        let headers_cost = (first_peer.get_version().start_height as usize)
            .saturating_sub(initial_height) as f32
            * SYNC_HEADERS_COST;
        let filters_cost = expected_bundles_to_sync as f32 * SYNC_FILTERS_COST;

        let total_cost = headers_cost + filters_cost + PROCESS_BLOCKS_COST;

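        // Sync the header chain first; if the peer hands back a snapshot with more accumulated
        // work than our local chain, adopt it as the new tip.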
        if let Some(snapshot) = sync::sync_headers(
            Arc::clone(first_peer),
            Arc::clone(&self.headers),
            |new_height| {
                let local_headers_cost =
                    new_height.saturating_sub(initial_height) as f32 * SYNC_HEADERS_COST;
                progress_update.update(
                    local_headers_cost / total_cost * 100.0,
                    Some(format!("Synced headers to {}", new_height)),
                )
            },
        )? {
            if snapshot.work()? > self.headers.work()? {
                info!("Applying snapshot with work: {}", snapshot.work()?);
                self.headers.apply_snapshot(snapshot)?;
            }
        }

        let synced_height = self.headers.get_height()?;
        let buried_height = synced_height.saturating_sub(sync::BURIED_CONFIRMATIONS);
        info!("Synced headers to height: {}", synced_height);

        cf_sync.prepare_sync(Arc::clone(first_peer))?;

        let mut database = database.borrow_mut();
        let database = database.deref_mut();

        let all_scripts = Arc::new(
            database
                .iter_script_pubkeys(None)?
                .into_iter()
                .map(|s| s.to_bytes())
                .collect::<Vec<_>>(),
        );

        #[allow(clippy::mutex_atomic)]
        let last_synced_block = Arc::new(Mutex::new(synced_height));

        let synced_bundles = Arc::new(AtomicUsize::new(0));
        let progress_update = Arc::new(Mutex::new(progress_update));

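        // Spawn one filter-sync worker per peer: each downloads filter bundles, matches them
        // against the wallet's scripts, and flags the blocks that need to be downloaded.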
        let mut threads = Vec::with_capacity(self.peers.len());
        for peer in &self.peers {
            let cf_sync = Arc::clone(&cf_sync);
            let peer = Arc::clone(peer);
            let headers = Arc::clone(&self.headers);
            let all_scripts = Arc::clone(&all_scripts);
            let last_synced_block = Arc::clone(&last_synced_block);
            let progress_update = Arc::clone(&progress_update);
            let synced_bundles = Arc::clone(&synced_bundles);

            let thread = std::thread::spawn(move || {
                cf_sync.capture_thread_for_sync(
                    peer,
                    |block_hash, filter| {
                        if !filter
                            .match_any(block_hash, all_scripts.iter().map(|s| s.as_slice()))?
                        {
                            return Ok(false);
                        }

                        let block_height = headers.get_height_for(block_hash)?.unwrap_or(0);
                        let saved_correct_block = matches!(
                            headers.get_full_block(block_height)?,
                            Some(block) if &block.block_hash() == block_hash
                        );

                        if saved_correct_block {
                            Ok(false)
                        } else {
                            let mut last_synced_block = last_synced_block.lock().unwrap();

                            if block_height < *last_synced_block {
                                *last_synced_block = block_height;
                            }

                            Ok(true)
                        }
                    },
                    |index| {
                        let synced_bundles = synced_bundles.fetch_add(1, Ordering::SeqCst);
                        let local_filters_cost = synced_bundles as f32 * SYNC_FILTERS_COST;
                        progress_update.lock().unwrap().update(
                            (headers_cost + local_filters_cost) / total_cost * 100.0,
                            Some(format!(
                                "Synced filters {} - {}",
                                index * 1000 + 1,
                                (index + 1) * 1000
                            )),
                        )
                    },
                )
            });

            threads.push(thread);
        }

        for t in threads {
            t.join().unwrap()?;
        }

        progress_update.lock().unwrap().update(
            (headers_cost + filters_cost) / total_cost * 100.0,
            Some("Processing downloaded blocks and mempool".into()),
        )?;

        let last_synced_block = *last_synced_block.lock().unwrap();
        log::debug!(
            "Dropping transactions newer than `last_synced_block` = {}",
            last_synced_block
        );
        let mut updates = database.begin_batch();
        for details in database.iter_txs(false)? {
            match details.confirmation_time {
                Some(c) if (c.height as usize) < last_synced_block => continue,
                _ => updates.del_tx(&details.txid, false)?,
            };
        }
        database.commit_batch(updates)?;

        match first_peer.ask_for_mempool() {
            Err(CompactFiltersError::PeerBloomDisabled) => {
                log::warn!("Peer has BLOOM disabled, we can't ask for the mempool")
            }
            e => e?,
        };

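        // Replay every downloaded block and the peer's mempool through the database, tracking
        // the highest derivation index seen for each keychain along the way.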
        let mut internal_max_deriv = None;
        let mut external_max_deriv = None;

        for (height, block) in self.headers.iter_full_blocks()? {
            for tx in &block.txdata {
                self.process_tx(
                    database,
                    tx,
                    Some(height as u32),
                    None,
                    &mut internal_max_deriv,
                    &mut external_max_deriv,
                )?;
            }
        }
        for tx in first_peer.get_mempool().iter_txs().iter() {
            self.process_tx(
                database,
                tx,
                None,
                None,
                &mut internal_max_deriv,
                &mut external_max_deriv,
            )?;
        }

        let current_ext = database
            .get_last_index(KeychainKind::External)?
            .unwrap_or(0);
        let first_ext_new = external_max_deriv.map(|x| x + 1).unwrap_or(0);
        if first_ext_new > current_ext {
            info!("Setting external index to {}", first_ext_new);
            database.set_last_index(KeychainKind::External, first_ext_new)?;
        }

        let current_int = database
            .get_last_index(KeychainKind::Internal)?
            .unwrap_or(0);
        let first_int_new = internal_max_deriv.map(|x| x + 1).unwrap_or(0);
        if first_int_new > current_int {
            info!("Setting internal index to {}", first_int_new);
            database.set_last_index(KeychainKind::Internal, first_int_new)?;
        }

        info!("Dropping blocks until {}", buried_height);
        self.headers.delete_blocks_until(buried_height)?;

        progress_update
            .lock()
            .unwrap()
            .update(100.0, Some("Done".into()))?;

        Ok(())
    }
}

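/// Configuration for a single Bitcoin P2P peer, optionally reached through a SOCKS5 proxy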
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct BitcoinPeerConfig {
    /// Peer address, e.g. `127.0.0.1:8333`
    pub address: String,
    /// Optional SOCKS5 proxy to reach the peer through
    pub socks5: Option<String>,
    /// Optional SOCKS5 proxy credentials as a `(username, password)` pair
    pub socks5_credentials: Option<(String, String)>,
}

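/// Configuration for a [`CompactFiltersBlockchain`], deserializable with serde from e.g. JSON
/// or TOML.
///
/// A sketch of building a config by hand, assuming the usual
/// `bdk::blockchain::compact_filters` import path; all values are placeholders:
///
/// ```no_run
/// # use bdk::blockchain::compact_filters::{BitcoinPeerConfig, CompactFiltersBlockchainConfig};
/// let config = CompactFiltersBlockchainConfig {
///     peers: vec![BitcoinPeerConfig {
///         address: "127.0.0.1:18333".to_string(),
///         socks5: None,
///         socks5_credentials: None,
///     }],
///     network: bitcoin::Network::Testnet,
///     storage_dir: "./filters".to_string(),
///     skip_blocks: None,
/// };
/// ```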
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone, PartialEq, Eq)]
pub struct CompactFiltersBlockchainConfig {
    /// List of peers to connect to for headers, filters and blocks
    pub peers: Vec<BitcoinPeerConfig>,
    /// Network to operate on
    pub network: Network,
    /// Directory where headers, filters and partially downloaded blocks are stored
    pub storage_dir: String,
    /// Optionally skip syncing the first `skip_blocks` blocks of the chain
    pub skip_blocks: Option<usize>,
}

impl ConfigurableBlockchain for CompactFiltersBlockchain {
    type Config = CompactFiltersBlockchainConfig;

    fn from_config(config: &Self::Config) -> Result<Self, Error> {
        let mempool = Arc::new(Mempool::default());
        let peers = config
            .peers
            .iter()
            .map(|peer_conf| match &peer_conf.socks5 {
                None => Peer::connect(&peer_conf.address, Arc::clone(&mempool), config.network),
                Some(proxy) => Peer::connect_proxy(
                    peer_conf.address.as_str(),
                    proxy,
                    peer_conf
                        .socks5_credentials
                        .as_ref()
                        .map(|(a, b)| (a.as_str(), b.as_str())),
                    Arc::clone(&mempool),
                    config.network,
                ),
            })
            .collect::<Result<_, _>>()?;

        Ok(CompactFiltersBlockchain::new(
            peers,
            &config.storage_dir,
            config.skip_blocks,
        )?)
    }
}

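/// An error that can occur while syncing with compact block filters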
#[derive(Debug)]
pub enum CompactFiltersError {
    /// A peer sent an invalid or unexpected response
    InvalidResponse,
    /// The headers returned are invalid
    InvalidHeaders,
    /// The filter headers returned are invalid
    InvalidFilterHeader,
    /// The filters returned are invalid
    InvalidFilter,
    /// The peer is missing a block in the valid chain
    MissingBlock,
    /// Block hash not found at the requested height
    BlockHashNotFound,
    /// The data stored in the block filters storage is corrupted
    DataCorruption,

    /// A peer is not connected
    NotConnected,
    /// A peer took too long to reply to one of our messages
    Timeout,
    /// The peer doesn't advertise the BLOOM service flag
    PeerBloomDisabled,

    /// No peers have been specified
    NoPeers,

    /// Internal database error
    Db(rocksdb::Error),
    /// Internal I/O error
    Io(std::io::Error),
    /// Invalid BIP158 filter
    Bip158(bitcoin::bip158::Error),
    /// Invalid system time
    Time(std::time::SystemTimeError),

    /// Wrapper for [`crate::error::Error`]
    Global(Box<crate::error::Error>),
}


impl fmt::Display for CompactFiltersError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidResponse => write!(f, "A peer sent an invalid or unexpected response"),
            Self::InvalidHeaders => write!(f, "Invalid headers"),
            Self::InvalidFilterHeader => write!(f, "Invalid filter header"),
            Self::InvalidFilter => write!(f, "Invalid filter"),
            Self::MissingBlock => write!(f, "The peer is missing a block in the valid chain"),
            Self::BlockHashNotFound => write!(f, "Block hash not found"),
            Self::DataCorruption => write!(
                f,
                "The data stored in the block filters storage is corrupted"
            ),
            Self::NotConnected => write!(f, "A peer is not connected"),
            Self::Timeout => write!(f, "A peer took too long to reply to one of our messages"),
            Self::PeerBloomDisabled => write!(f, "Peer doesn't advertise the BLOOM service flag"),
            Self::NoPeers => write!(f, "No peers have been specified"),
            Self::Db(err) => write!(f, "Internal database error: {}", err),
            Self::Io(err) => write!(f, "Internal I/O error: {}", err),
            Self::Bip158(err) => write!(f, "Invalid BIP158 filter: {}", err),
            Self::Time(err) => write!(f, "Invalid system time: {}", err),
            Self::Global(err) => write!(f, "Generic error: {}", err),
        }
    }
}

impl std::error::Error for CompactFiltersError {}

impl_error!(rocksdb::Error, Db, CompactFiltersError);
impl_error!(std::io::Error, Io, CompactFiltersError);
impl_error!(bitcoin::bip158::Error, Bip158, CompactFiltersError);
impl_error!(std::time::SystemTimeError, Time, CompactFiltersError);

impl From<crate::error::Error> for CompactFiltersError {
    fn from(err: crate::error::Error) -> Self {
        CompactFiltersError::Global(Box::new(err))
    }
}