lightning_transaction_sync/
electrum.rs

1// This file is Copyright its original authors, visible in version control history.
2//
3// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6// accordance with one or both of these licenses.
7
8use crate::common::{ConfirmedTx, FilterQueue, SyncState};
9use crate::error::{InternalError, TxSyncError};
10
11use electrum_client::utils::validate_merkle_proof;
12use electrum_client::Client as ElectrumClient;
13use electrum_client::ElectrumApi;
14
15use lightning::chain::WatchedOutput;
16use lightning::chain::{Confirm, Filter};
17use lightning::util::logger::Logger;
18use lightning::{log_debug, log_error, log_trace};
19
20use bitcoin::block::Header;
21use bitcoin::{BlockHash, Script, Transaction, Txid};
22
23use std::collections::HashSet;
24use std::ops::Deref;
25use std::sync::Mutex;
26use std::time::Instant;
27
28/// Synchronizes LDK with a given Electrum server.
29///
30/// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
31/// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and
32/// reconfirmation.
33///
34/// Note that registration via [`Filter`] needs to happen before any calls to
35/// [`Watch::watch_channel`] to ensure we get notified of the items to monitor.
36///
37/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
38/// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel
39/// [`Filter`]: lightning::chain::Filter
40pub struct ElectrumSyncClient<L: Deref>
41where
42	L::Target: Logger,
43{
44	sync_state: Mutex<SyncState>,
45	queue: Mutex<FilterQueue>,
46	client: ElectrumClient,
47	logger: L,
48}
49
50impl<L: Deref> ElectrumSyncClient<L>
51where
52	L::Target: Logger,
53{
54	/// Returns a new [`ElectrumSyncClient`] object.
55	pub fn new(server_url: String, logger: L) -> Result<Self, TxSyncError> {
56		let client = ElectrumClient::new(&server_url).map_err(|e| {
57			log_error!(logger, "Failed to connect to electrum server '{}': {}", server_url, e);
58			e
59		})?;
60
61		Self::from_client(client, logger)
62	}
63
64	/// Returns a new [`ElectrumSyncClient`] object using the given Electrum client.
65	///
66	/// This is not exported to bindings users as the underlying client from BDK is not exported.
67	pub fn from_client(client: ElectrumClient, logger: L) -> Result<Self, TxSyncError> {
68		let sync_state = Mutex::new(SyncState::new());
69		let queue = Mutex::new(FilterQueue::new());
70
71		Ok(Self { sync_state, queue, client, logger })
72	}
73
74	/// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
75	/// method should be called regularly to keep LDK up-to-date with current chain data.
76	///
77	/// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the
78	/// newest on-chain activity related to the items previously registered via the [`Filter`]
79	/// interface.
80	///
81	/// [`Confirm`]: lightning::chain::Confirm
82	/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
83	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
84	/// [`Filter`]: lightning::chain::Filter
85	pub fn sync<C: Deref>(&self, confirmables: Vec<C>) -> Result<(), TxSyncError>
86	where
87		C::Target: Confirm,
88	{
89		// This lock makes sure we're syncing once at a time.
90		let mut sync_state = self.sync_state.lock().unwrap();
91
92		log_trace!(self.logger, "Starting transaction sync.");
93		#[cfg(feature = "time")]
94		let start_time = Instant::now();
95		let mut num_confirmed = 0;
96		let mut num_unconfirmed = 0;
97
98		// Clear any header notifications we might have gotten to keep the queue count low.
99		while let Some(_) = self.client.block_headers_pop()? {}
100
101		let tip_notification = self.client.block_headers_subscribe()?;
102		let mut tip_header = tip_notification.header;
103		let mut tip_height = tip_notification.height as u32;
104
105		loop {
106			let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
107			let tip_is_new = Some(tip_header.block_hash()) != sync_state.last_sync_hash;
108
109			// We loop until any registered transactions have been processed at least once, or the
110			// tip hasn't been updated during the last iteration.
111			if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
112				// Nothing to do.
113				break;
114			} else {
115				// Update the known tip to the newest one.
116				if tip_is_new {
117					// First check for any unconfirmed transactions and act on it immediately.
118					match self.get_unconfirmed_transactions(&confirmables) {
119						Ok(unconfirmed_txs) => {
120							// Double-check the tip hash. If it changed, a reorg happened since
121							// we started syncing and we need to restart last-minute.
122							match self.check_update_tip(&mut tip_header, &mut tip_height) {
123								Ok(false) => {
124									num_unconfirmed += unconfirmed_txs.len();
125									sync_state.sync_unconfirmed_transactions(
126										&confirmables,
127										unconfirmed_txs,
128									);
129								},
130								Ok(true) => {
131									log_debug!(self.logger,
132										"Encountered inconsistency during transaction sync, restarting.");
133									sync_state.pending_sync = true;
134									continue;
135								},
136								Err(err) => {
137									// (Semi-)permanent failure, retry later.
138									log_error!(self.logger,
139										"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
140										num_confirmed,
141										num_unconfirmed
142									);
143									sync_state.pending_sync = true;
144									return Err(TxSyncError::from(err));
145								},
146							}
147						},
148						Err(err) => {
149							// (Semi-)permanent failure, retry later.
150							log_error!(self.logger,
151								"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
152								num_confirmed,
153								num_unconfirmed
154							);
155							sync_state.pending_sync = true;
156							return Err(TxSyncError::from(err));
157						},
158					}
159
160					// Update the best block.
161					for c in &confirmables {
162						c.best_block_updated(&tip_header, tip_height);
163					}
164
165					// Prune any sufficiently confirmed output spends
166					sync_state.prune_output_spends(tip_height);
167				}
168
169				match self.get_confirmed_transactions(&sync_state) {
170					Ok(confirmed_txs) => {
171						// Double-check the tip hash. If it changed, a reorg happened since
172						// we started syncing and we need to restart last-minute.
173						match self.check_update_tip(&mut tip_header, &mut tip_height) {
174							Ok(false) => {
175								num_confirmed += confirmed_txs.len();
176								sync_state
177									.sync_confirmed_transactions(&confirmables, confirmed_txs);
178							},
179							Ok(true) => {
180								log_debug!(self.logger,
181									"Encountered inconsistency during transaction sync, restarting.");
182								sync_state.pending_sync = true;
183								continue;
184							},
185							Err(err) => {
186								// (Semi-)permanent failure, retry later.
187								log_error!(self.logger,
188									"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
189									num_confirmed,
190									num_unconfirmed
191								);
192								sync_state.pending_sync = true;
193								return Err(TxSyncError::from(err));
194							},
195						}
196					},
197					Err(InternalError::Inconsistency) => {
198						// Immediately restart syncing when we encounter any inconsistencies.
199						log_debug!(
200							self.logger,
201							"Encountered inconsistency during transaction sync, restarting."
202						);
203						sync_state.pending_sync = true;
204						continue;
205					},
206					Err(err) => {
207						// (Semi-)permanent failure, retry later.
208						log_error!(self.logger,
209							"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
210							num_confirmed,
211							num_unconfirmed
212						);
213						sync_state.pending_sync = true;
214						return Err(TxSyncError::from(err));
215					},
216				}
217				sync_state.last_sync_hash = Some(tip_header.block_hash());
218				sync_state.pending_sync = false;
219			}
220		}
221		#[cfg(feature = "time")]
222		log_debug!(
223			self.logger,
224			"Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
225			tip_header.block_hash(),
226			start_time.elapsed().as_millis(),
227			num_confirmed,
228			num_unconfirmed
229		);
230		#[cfg(not(feature = "time"))]
231		log_debug!(
232			self.logger,
233			"Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
234			tip_header.block_hash(),
235			num_confirmed,
236			num_unconfirmed
237		);
238		Ok(())
239	}
240
241	fn check_update_tip(
242		&self, cur_tip_header: &mut Header, cur_tip_height: &mut u32,
243	) -> Result<bool, InternalError> {
244		let check_notification = self.client.block_headers_subscribe()?;
245		let check_tip_hash = check_notification.header.block_hash();
246
247		// Restart if either the tip changed or we got some divergent tip
248		// change notification since we started. In the latter case we
249		// make sure we clear the queue before continuing.
250		let mut restart_sync = check_tip_hash != cur_tip_header.block_hash();
251		while let Some(queued_notif) = self.client.block_headers_pop()? {
252			if queued_notif.header.block_hash() != check_tip_hash {
253				restart_sync = true
254			}
255		}
256
257		if restart_sync {
258			*cur_tip_header = check_notification.header;
259			*cur_tip_height = check_notification.height as u32;
260			Ok(true)
261		} else {
262			Ok(false)
263		}
264	}
265
266	fn get_confirmed_transactions(
267		&self, sync_state: &SyncState,
268	) -> Result<Vec<ConfirmedTx>, InternalError> {
269		// First, check the confirmation status of registered transactions as well as the
270		// status of dependent transactions of registered outputs.
271		let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
272		let mut watched_script_pubkeys = Vec::with_capacity(
273			sync_state.watched_transactions.len() + sync_state.watched_outputs.len(),
274		);
275		let mut watched_txs = Vec::with_capacity(sync_state.watched_transactions.len());
276
277		for txid in &sync_state.watched_transactions {
278			match self.client.transaction_get(&txid) {
279				Ok(tx) => {
280					// Bitcoin Core's Merkle tree implementation has no way to discern between
281					// internal and leaf node entries. As a consequence it is susceptible to an
282					// attacker injecting additional transactions by crafting 64-byte
283					// transactions matching an inner Merkle node's hash (see
284					// https://web.archive.org/web/20240329003521/https://bitslog.com/2018/06/09/leaf-node-weakness-in-bitcoin-merkle-tree-design/).
285					// To protect against this (highly unlikely) attack vector, we check that the
286					// transaction is at least 65 bytes in length.
287					if tx.total_size() == 64 {
288						log_error!(self.logger, "Skipping transaction {} due to retrieving potentially invalid tx data.", txid);
289						continue;
290					}
291
292					watched_txs.push((txid, tx.clone()));
293					if let Some(tx_out) = tx.output.first() {
294						// We watch an arbitrary output of the transaction of interest in order to
295						// retrieve the associated script history, before narrowing down our search
296						// through `filter`ing by `txid` below.
297						watched_script_pubkeys.push(tx_out.script_pubkey.clone());
298					} else {
299						debug_assert!(false, "Failed due to retrieving invalid tx data.");
300						log_error!(self.logger, "Failed due to retrieving invalid tx data.");
301						return Err(InternalError::Failed);
302					}
303				},
304				Err(electrum_client::Error::Protocol(_)) => {
305					// We couldn't find the tx, do nothing.
306				},
307				Err(e) => {
308					log_error!(self.logger, "Failed to look up transaction {}: {}.", txid, e);
309					return Err(InternalError::Failed);
310				},
311			}
312		}
313
314		let num_tx_lookups = watched_script_pubkeys.len();
315		debug_assert_eq!(num_tx_lookups, watched_txs.len());
316
317		for output in sync_state.watched_outputs.values() {
318			watched_script_pubkeys.push(output.script_pubkey.clone());
319		}
320
321		let num_output_spend_lookups = watched_script_pubkeys.len() - num_tx_lookups;
322		debug_assert_eq!(num_output_spend_lookups, sync_state.watched_outputs.len());
323
324		match self.client.batch_script_get_history(watched_script_pubkeys.iter().map(|s| s.deref()))
325		{
326			Ok(results) => {
327				let (tx_results, output_results) = results.split_at(num_tx_lookups);
328				debug_assert_eq!(num_output_spend_lookups, output_results.len());
329
330				for (i, script_history) in tx_results.iter().enumerate() {
331					let (txid, tx) = &watched_txs[i];
332					if confirmed_txs.iter().any(|ctx| ctx.txid == **txid) {
333						continue;
334					}
335					let mut filtered_history =
336						script_history.iter().filter(|h| h.tx_hash == **txid);
337					if let Some(history) = filtered_history.next() {
338						let prob_conf_height = history.height as u32;
339						let confirmed_tx = self.get_confirmed_tx(tx, prob_conf_height)?;
340						confirmed_txs.push(confirmed_tx);
341					}
342					debug_assert!(filtered_history.next().is_none());
343				}
344
345				for (watched_output, script_history) in
346					sync_state.watched_outputs.values().zip(output_results)
347				{
348					for possible_output_spend in script_history {
349						if possible_output_spend.height <= 0 {
350							continue;
351						}
352
353						let txid = possible_output_spend.tx_hash;
354						if confirmed_txs.iter().any(|ctx| ctx.txid == txid) {
355							continue;
356						}
357
358						match self.client.transaction_get(&txid) {
359							Ok(tx) => {
360								let mut is_spend = false;
361								for txin in &tx.input {
362									let watched_outpoint =
363										watched_output.outpoint.into_bitcoin_outpoint();
364									if txin.previous_output == watched_outpoint {
365										is_spend = true;
366										break;
367									}
368								}
369
370								if !is_spend {
371									continue;
372								}
373
374								let prob_conf_height = possible_output_spend.height as u32;
375								let confirmed_tx = self.get_confirmed_tx(&tx, prob_conf_height)?;
376								confirmed_txs.push(confirmed_tx);
377							},
378							Err(e) => {
379								log_trace!(
380									self.logger,
381									"Inconsistency: Tx {} was unconfirmed during syncing: {}",
382									txid,
383									e
384								);
385								return Err(InternalError::Inconsistency);
386							},
387						}
388					}
389				}
390			},
391			Err(e) => {
392				log_error!(self.logger, "Failed to look up script histories: {}.", e);
393				return Err(InternalError::Failed);
394			},
395		}
396
397		// Sort all confirmed transactions first by block height, then by in-block
398		// position, and finally feed them to the interface in order.
399		confirmed_txs.sort_unstable_by(|tx1, tx2| {
400			tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
401		});
402
403		Ok(confirmed_txs)
404	}
405
406	fn get_unconfirmed_transactions<C: Deref>(
407		&self, confirmables: &Vec<C>,
408	) -> Result<Vec<Txid>, InternalError>
409	where
410		C::Target: Confirm,
411	{
412		// Query the interface for relevant txids and check whether the relevant blocks are still
413		// in the best chain, mark them unconfirmed otherwise
414		let relevant_txids = confirmables
415			.iter()
416			.flat_map(|c| c.get_relevant_txids())
417			.collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
418
419		let mut unconfirmed_txs = Vec::new();
420
421		for (txid, conf_height, block_hash_opt) in relevant_txids {
422			if let Some(block_hash) = block_hash_opt {
423				let block_header = self.client.block_header(conf_height as usize)?;
424				if block_header.block_hash() == block_hash {
425					// Skip if the tx is still confirmed in the block in question.
426					continue;
427				}
428
429				unconfirmed_txs.push(txid);
430			} else {
431				log_error!(self.logger,
432					"Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
433				panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
434			}
435		}
436		Ok(unconfirmed_txs)
437	}
438
439	fn get_confirmed_tx(
440		&self, tx: &Transaction, prob_conf_height: u32,
441	) -> Result<ConfirmedTx, InternalError> {
442		let txid = tx.compute_txid();
443		match self.client.transaction_get_merkle(&txid, prob_conf_height as usize) {
444			Ok(merkle_res) => {
445				debug_assert_eq!(prob_conf_height, merkle_res.block_height as u32);
446				match self.client.block_header(prob_conf_height as usize) {
447					Ok(block_header) => {
448						let pos = merkle_res.pos;
449						if !validate_merkle_proof(&txid, &block_header.merkle_root, &merkle_res) {
450							log_trace!(
451								self.logger,
452								"Inconsistency: Block {} was unconfirmed during syncing.",
453								block_header.block_hash()
454							);
455							return Err(InternalError::Inconsistency);
456						}
457						let confirmed_tx = ConfirmedTx {
458							tx: tx.clone(),
459							txid,
460							block_header,
461							block_height: prob_conf_height,
462							pos,
463						};
464						Ok(confirmed_tx)
465					},
466					Err(e) => {
467						log_error!(
468							self.logger,
469							"Failed to retrieve block header for height {}: {}.",
470							prob_conf_height,
471							e
472						);
473						Err(InternalError::Failed)
474					},
475				}
476			},
477			Err(e) => {
478				log_trace!(
479					self.logger,
480					"Inconsistency: Tx {} was unconfirmed during syncing: {}",
481					txid,
482					e
483				);
484				Err(InternalError::Inconsistency)
485			},
486		}
487	}
488
489	/// Returns a reference to the underlying Electrum client.
490	///
491	/// This is not exported to bindings users as the underlying client from BDK is not exported.
492	pub fn client(&self) -> &ElectrumClient {
493		&self.client
494	}
495}
496
497impl<L: Deref> Filter for ElectrumSyncClient<L>
498where
499	L::Target: Logger,
500{
501	fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
502		let mut locked_queue = self.queue.lock().unwrap();
503		locked_queue.transactions.insert(*txid);
504	}
505
506	fn register_output(&self, output: WatchedOutput) {
507		let mut locked_queue = self.queue.lock().unwrap();
508		locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
509	}
510}