Skip to main content

lightning_transaction_sync/
electrum.rs

1// This file is Copyright its original authors, visible in version control history.
2//
3// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6// accordance with one or both of these licenses.
7
8use crate::common::{ConfirmedTx, FilterQueue, SyncState};
9use crate::error::{InternalError, TxSyncError};
10
11use electrum_client::utils::validate_merkle_proof;
12use electrum_client::Client as ElectrumClient;
13use electrum_client::ElectrumApi;
14
15use lightning::chain::WatchedOutput;
16use lightning::chain::{Confirm, Filter};
17use lightning::util::logger::Logger;
18use lightning::{log_debug, log_error, log_trace};
19
20use bitcoin::block::Header;
21use bitcoin::{BlockHash, Script, Transaction, Txid};
22
23use std::collections::HashSet;
24use std::ops::Deref;
25use std::sync::Mutex;
26use std::time::Instant;
27
28/// Synchronizes LDK with a given Electrum server.
29///
30/// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
31/// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and
32/// reconfirmation.
33///
34/// Note that registration via [`Filter`] needs to happen before any calls to
35/// [`Watch::watch_channel`] to ensure we get notified of the items to monitor.
36///
37/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
38/// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel
39/// [`Filter`]: lightning::chain::Filter
40pub struct ElectrumSyncClient<L: Deref>
41where
42	L::Target: Logger,
43{
44	sync_state: Mutex<SyncState>,
45	queue: Mutex<FilterQueue>,
46	client: ElectrumClient,
47	logger: L,
48}
49
50impl<L: Deref> ElectrumSyncClient<L>
51where
52	L::Target: Logger,
53{
54	/// Returns a new [`ElectrumSyncClient`] object.
55	pub fn new(server_url: String, logger: L) -> Result<Self, TxSyncError> {
56		let client = ElectrumClient::new(&server_url).map_err(|e| {
57			log_error!(logger, "Failed to connect to electrum server '{}': {}", server_url, e);
58			e
59		})?;
60
61		Self::from_client(client, logger)
62	}
63
64	/// Returns a new [`ElectrumSyncClient`] object using the given Electrum client.
65	///
66	/// This is not exported to bindings users as the underlying client from BDK is not exported.
67	pub fn from_client(client: ElectrumClient, logger: L) -> Result<Self, TxSyncError> {
68		let sync_state = Mutex::new(SyncState::new());
69		let queue = Mutex::new(FilterQueue::new());
70
71		Ok(Self { sync_state, queue, client, logger })
72	}
73
74	/// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
75	/// method should be called regularly to keep LDK up-to-date with current chain data.
76	///
77	/// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the
78	/// newest on-chain activity related to the items previously registered via the [`Filter`]
79	/// interface.
80	///
81	/// [`Confirm`]: lightning::chain::Confirm
82	/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
83	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
84	/// [`Filter`]: lightning::chain::Filter
85	pub fn sync<C: Deref>(&self, confirmables: Vec<C>) -> Result<(), TxSyncError>
86	where
87		C::Target: Confirm,
88	{
89		// This lock makes sure we're syncing once at a time.
90		let mut sync_state = self.sync_state.lock().unwrap();
91
92		log_trace!(self.logger, "Starting transaction sync.");
93		#[cfg(feature = "time")]
94		let start_time = Instant::now();
95		let mut num_confirmed = 0;
96		let mut num_unconfirmed = 0;
97
98		// Clear any header notifications we might have gotten to keep the queue count low.
99		while let Some(_) = self.client.block_headers_pop()? {}
100
101		let tip_notification = self.client.block_headers_subscribe()?;
102		let mut tip_header = tip_notification.header;
103		let mut tip_height = tip_notification.height as u32;
104
105		loop {
106			let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
107			let tip_is_new = Some(tip_header.block_hash()) != sync_state.last_sync_hash;
108
109			// We loop until any registered transactions have been processed at least once, or the
110			// tip hasn't been updated during the last iteration.
111			if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
112				// Nothing to do.
113				break;
114			} else {
115				// Update the known tip to the newest one.
116				if tip_is_new {
117					// First check for any unconfirmed transactions and act on it immediately.
118					match self.get_unconfirmed_transactions(&confirmables) {
119						Ok(unconfirmed_txs) => {
120							// Double-check the tip hash. If it changed, a reorg happened since
121							// we started syncing and we need to restart last-minute.
122							match self.check_update_tip(&mut tip_header, &mut tip_height) {
123								Ok(false) => {
124									num_unconfirmed += unconfirmed_txs.len();
125									sync_state.sync_unconfirmed_transactions(
126										&confirmables,
127										unconfirmed_txs,
128									);
129								},
130								Ok(true) => {
131									log_debug!(self.logger,
132										"Encountered inconsistency during transaction sync, restarting.");
133									sync_state.pending_sync = true;
134									continue;
135								},
136								Err(err) => {
137									// (Semi-)permanent failure, retry later.
138									log_error!(self.logger,
139										"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
140										num_confirmed,
141										num_unconfirmed
142									);
143									sync_state.pending_sync = true;
144									return Err(TxSyncError::from(err));
145								},
146							}
147						},
148						Err(err) => {
149							// (Semi-)permanent failure, retry later.
150							log_error!(self.logger,
151								"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
152								num_confirmed,
153								num_unconfirmed
154							);
155							sync_state.pending_sync = true;
156							return Err(TxSyncError::from(err));
157						},
158					}
159
160					// Update the best block.
161					for c in &confirmables {
162						c.best_block_updated(&tip_header, tip_height);
163					}
164
165					// Prune any sufficiently confirmed output spends
166					sync_state.prune_output_spends(tip_height);
167				}
168
169				match self.get_confirmed_transactions(&sync_state) {
170					Ok(confirmed_txs) => {
171						// Double-check the tip hash. If it changed, a reorg happened since
172						// we started syncing and we need to restart last-minute.
173						match self.check_update_tip(&mut tip_header, &mut tip_height) {
174							Ok(false) => {
175								num_confirmed += confirmed_txs.len();
176								sync_state
177									.sync_confirmed_transactions(&confirmables, confirmed_txs);
178							},
179							Ok(true) => {
180								log_debug!(self.logger,
181									"Encountered inconsistency during transaction sync, restarting.");
182								sync_state.pending_sync = true;
183								continue;
184							},
185							Err(err) => {
186								// (Semi-)permanent failure, retry later.
187								log_error!(self.logger,
188									"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
189									num_confirmed,
190									num_unconfirmed
191								);
192								sync_state.pending_sync = true;
193								return Err(TxSyncError::from(err));
194							},
195						}
196					},
197					Err(InternalError::Inconsistency) => {
198						// Immediately restart syncing when we encounter any inconsistencies.
199						log_debug!(
200							self.logger,
201							"Encountered inconsistency during transaction sync, restarting."
202						);
203						sync_state.pending_sync = true;
204						continue;
205					},
206					Err(err) => {
207						// (Semi-)permanent failure, retry later.
208						log_error!(self.logger,
209							"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
210							num_confirmed,
211							num_unconfirmed
212						);
213						sync_state.pending_sync = true;
214						return Err(TxSyncError::from(err));
215					},
216				}
217				sync_state.last_sync_hash = Some(tip_header.block_hash());
218				sync_state.pending_sync = false;
219			}
220		}
221		#[cfg(feature = "time")]
222		log_debug!(
223			self.logger,
224			"Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
225			tip_header.block_hash(),
226			start_time.elapsed().as_millis(),
227			num_confirmed,
228			num_unconfirmed
229		);
230		#[cfg(not(feature = "time"))]
231		log_debug!(
232			self.logger,
233			"Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
234			tip_header.block_hash(),
235			num_confirmed,
236			num_unconfirmed
237		);
238		Ok(())
239	}
240
241	fn check_update_tip(
242		&self, cur_tip_header: &mut Header, cur_tip_height: &mut u32,
243	) -> Result<bool, InternalError> {
244		let check_notification = self.client.block_headers_subscribe()?;
245		let check_tip_hash = check_notification.header.block_hash();
246
247		// Restart if either the tip changed or we got some divergent tip
248		// change notification since we started. In the latter case we
249		// make sure we clear the queue before continuing.
250		let mut restart_sync = check_tip_hash != cur_tip_header.block_hash();
251		while let Some(queued_notif) = self.client.block_headers_pop()? {
252			if queued_notif.header.block_hash() != check_tip_hash {
253				restart_sync = true
254			}
255		}
256
257		if restart_sync {
258			*cur_tip_header = check_notification.header;
259			*cur_tip_height = check_notification.height as u32;
260			Ok(true)
261		} else {
262			Ok(false)
263		}
264	}
265
266	fn get_confirmed_transactions(
267		&self, sync_state: &SyncState,
268	) -> Result<Vec<ConfirmedTx>, InternalError> {
269		// First, check the confirmation status of registered transactions as well as the
270		// status of dependent transactions of registered outputs.
271		let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
272		let mut watched_script_pubkeys = Vec::with_capacity(
273			sync_state.watched_transactions.len() + sync_state.watched_outputs.len(),
274		);
275		let mut watched_txs = Vec::with_capacity(sync_state.watched_transactions.len());
276
277		for txid in &sync_state.watched_transactions {
278			match self.client.transaction_get(&txid) {
279				Ok(tx) => {
280					// Bitcoin Core's Merkle tree implementation has no way to discern between
281					// internal and leaf node entries. As a consequence it is susceptible to an
282					// attacker injecting additional transactions by crafting 64-byte
283					// transactions matching an inner Merkle node's hash (see
284					// https://web.archive.org/web/20240329003521/https://bitslog.com/2018/06/09/leaf-node-weakness-in-bitcoin-merkle-tree-design/).
285					// To protect against this (highly unlikely) attack vector, we check that the
286					// transaction is at least 65 bytes in length.
287					if tx.total_size() == 64 {
288						log_error!(self.logger, "Skipping transaction {} due to retrieving potentially invalid tx data.", txid);
289						continue;
290					}
291
292					watched_txs.push((txid, tx.clone()));
293					if let Some(tx_out) = tx.output.first() {
294						// We watch an arbitrary output of the transaction of interest in order to
295						// retrieve the associated script history, before narrowing down our search
296						// through `filter`ing by `txid` below.
297						watched_script_pubkeys.push(tx_out.script_pubkey.clone());
298					} else {
299						debug_assert!(false, "Failed due to retrieving invalid tx data.");
300						log_error!(self.logger, "Failed due to retrieving invalid tx data.");
301						return Err(InternalError::Failed);
302					}
303				},
304				Err(electrum_client::Error::Protocol(_)) => {
305					// We couldn't find the tx, do nothing.
306				},
307				Err(e) => {
308					log_error!(self.logger, "Failed to look up transaction {}: {}.", txid, e);
309					return Err(InternalError::Failed);
310				},
311			}
312		}
313
314		let num_tx_lookups = watched_script_pubkeys.len();
315		debug_assert_eq!(num_tx_lookups, watched_txs.len());
316
317		for output in sync_state.watched_outputs.values() {
318			watched_script_pubkeys.push(output.script_pubkey.clone());
319		}
320
321		let num_output_spend_lookups = watched_script_pubkeys.len() - num_tx_lookups;
322		debug_assert_eq!(num_output_spend_lookups, sync_state.watched_outputs.len());
323
324		match self.client.batch_script_get_history(watched_script_pubkeys.iter().map(|s| s.deref()))
325		{
326			Ok(results) => {
327				let (tx_results, output_results) = results.split_at(num_tx_lookups);
328				debug_assert_eq!(num_output_spend_lookups, output_results.len());
329
330				for (i, script_history) in tx_results.iter().enumerate() {
331					let (txid, tx) = &watched_txs[i];
332					if confirmed_txs.iter().any(|ctx| ctx.txid == **txid) {
333						continue;
334					}
335					let mut filtered_history =
336						script_history.iter().filter(|h| h.tx_hash == **txid);
337					if let Some(history) = filtered_history.next() {
338						let prob_conf_height = history.height as u32;
339						if prob_conf_height <= 0 {
340							// Skip if it's a an unconfirmed entry.
341							continue;
342						}
343						let confirmed_tx = self.get_confirmed_tx(tx, prob_conf_height)?;
344						confirmed_txs.push(confirmed_tx);
345					}
346					if filtered_history.next().is_some() {
347						log_error!(
348							self.logger,
349							"Failed due to server returning multiple history entries for Tx {}.",
350							txid
351						);
352						return Err(InternalError::Failed);
353					}
354				}
355
356				for (watched_output, script_history) in
357					sync_state.watched_outputs.values().zip(output_results)
358				{
359					for possible_output_spend in script_history {
360						if possible_output_spend.height <= 0 {
361							// Skip if it's a an unconfirmed entry.
362							continue;
363						}
364
365						let txid = possible_output_spend.tx_hash;
366						if confirmed_txs.iter().any(|ctx| ctx.txid == txid) {
367							continue;
368						}
369
370						match self.client.transaction_get(&txid) {
371							Ok(tx) => {
372								let mut is_spend = false;
373								for txin in &tx.input {
374									let watched_outpoint =
375										watched_output.outpoint.into_bitcoin_outpoint();
376									if txin.previous_output == watched_outpoint {
377										is_spend = true;
378										break;
379									}
380								}
381
382								if !is_spend {
383									continue;
384								}
385
386								let prob_conf_height = possible_output_spend.height as u32;
387								let confirmed_tx = self.get_confirmed_tx(&tx, prob_conf_height)?;
388								confirmed_txs.push(confirmed_tx);
389							},
390							Err(e) => {
391								log_trace!(
392									self.logger,
393									"Inconsistency: Tx {} was unconfirmed during syncing: {}",
394									txid,
395									e
396								);
397								return Err(InternalError::Inconsistency);
398							},
399						}
400					}
401				}
402			},
403			Err(e) => {
404				log_error!(self.logger, "Failed to look up script histories: {}.", e);
405				return Err(InternalError::Failed);
406			},
407		}
408
409		// Sort all confirmed transactions first by block height, then by in-block
410		// position, and finally feed them to the interface in order.
411		confirmed_txs.sort_unstable_by(|tx1, tx2| {
412			tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
413		});
414
415		Ok(confirmed_txs)
416	}
417
418	fn get_unconfirmed_transactions<C: Deref>(
419		&self, confirmables: &Vec<C>,
420	) -> Result<Vec<Txid>, InternalError>
421	where
422		C::Target: Confirm,
423	{
424		// Query the interface for relevant txids and check whether the relevant blocks are still
425		// in the best chain, mark them unconfirmed otherwise
426		let relevant_txids = confirmables
427			.iter()
428			.flat_map(|c| c.get_relevant_txids())
429			.collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
430
431		let mut unconfirmed_txs = Vec::new();
432
433		for (txid, conf_height, block_hash_opt) in relevant_txids {
434			if let Some(block_hash) = block_hash_opt {
435				let block_header = self.client.block_header(conf_height as usize)?;
436				if block_header.block_hash() == block_hash {
437					// Skip if the tx is still confirmed in the block in question.
438					continue;
439				}
440
441				unconfirmed_txs.push(txid);
442			} else {
443				log_error!(self.logger,
444					"Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
445				panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
446			}
447		}
448		Ok(unconfirmed_txs)
449	}
450
451	fn get_confirmed_tx(
452		&self, tx: &Transaction, prob_conf_height: u32,
453	) -> Result<ConfirmedTx, InternalError> {
454		let txid = tx.compute_txid();
455		match self.client.transaction_get_merkle(&txid, prob_conf_height as usize) {
456			Ok(merkle_res) => {
457				debug_assert_eq!(prob_conf_height, merkle_res.block_height as u32);
458				match self.client.block_header(prob_conf_height as usize) {
459					Ok(block_header) => {
460						let pos = merkle_res.pos;
461						if !validate_merkle_proof(&txid, &block_header.merkle_root, &merkle_res) {
462							log_trace!(
463								self.logger,
464								"Inconsistency: Block {} was unconfirmed during syncing.",
465								block_header.block_hash()
466							);
467							return Err(InternalError::Inconsistency);
468						}
469						let confirmed_tx = ConfirmedTx {
470							tx: tx.clone(),
471							txid,
472							block_header,
473							block_height: prob_conf_height,
474							pos,
475						};
476						Ok(confirmed_tx)
477					},
478					Err(e) => {
479						log_error!(
480							self.logger,
481							"Failed to retrieve block header for height {}: {}.",
482							prob_conf_height,
483							e
484						);
485						Err(InternalError::Failed)
486					},
487				}
488			},
489			Err(e) => {
490				log_trace!(
491					self.logger,
492					"Inconsistency: Tx {} was unconfirmed during syncing: {}",
493					txid,
494					e
495				);
496				Err(InternalError::Inconsistency)
497			},
498		}
499	}
500
501	/// Returns a reference to the underlying Electrum client.
502	///
503	/// This is not exported to bindings users as the underlying client from BDK is not exported.
504	pub fn client(&self) -> &ElectrumClient {
505		&self.client
506	}
507}
508
509impl<L: Deref> Filter for ElectrumSyncClient<L>
510where
511	L::Target: Logger,
512{
513	fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
514		let mut locked_queue = self.queue.lock().unwrap();
515		locked_queue.transactions.insert(*txid);
516	}
517
518	fn register_output(&self, output: WatchedOutput) {
519		let mut locked_queue = self.queue.lock().unwrap();
520		locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
521	}
522}