lightning_transaction_sync/
esplora.rs

1// This file is Copyright its original authors, visible in version control history.
2//
3// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6// accordance with one or both of these licenses.
7
8use crate::common::{ConfirmedTx, FilterQueue, SyncState};
9use crate::error::{InternalError, TxSyncError};
10
11use lightning::chain::WatchedOutput;
12use lightning::chain::{Confirm, Filter};
13use lightning::util::logger::Logger;
14use lightning::{log_debug, log_error, log_trace};
15
16use lightning_macros::{maybe_async, maybe_await};
17
18use bitcoin::{BlockHash, Script, Txid};
19
20#[cfg(not(feature = "async-interface"))]
21use esplora_client::blocking::BlockingClient;
22#[cfg(feature = "async-interface")]
23use esplora_client::r#async::AsyncClient;
24use esplora_client::Builder;
25
26use core::ops::Deref;
27use std::collections::HashSet;
28
29/// Synchronizes LDK with a given [`Esplora`] server.
30///
31/// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
32/// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and
33/// reconfirmation.
34///
35/// Note that registration via [`Filter`] needs to happen before any calls to
36/// [`Watch::watch_channel`] to ensure we get notified of the items to monitor.
37///
38/// This uses and exposes either a blocking or async client variant dependent on whether the
39/// `esplora-blocking` or the `esplora-async` feature is enabled.
40///
41/// [`Esplora`]: https://github.com/Blockstream/electrs
42/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
43/// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel
44/// [`Filter`]: lightning::chain::Filter
45pub struct EsploraSyncClient<L: Deref>
46where
47	L::Target: Logger,
48{
49	sync_state: MutexType<SyncState>,
50	queue: std::sync::Mutex<FilterQueue>,
51	client: EsploraClientType,
52	logger: L,
53}
54
55impl<L: Deref> EsploraSyncClient<L>
56where
57	L::Target: Logger,
58{
59	/// Returns a new [`EsploraSyncClient`] object.
60	pub fn new(server_url: String, logger: L) -> Self {
61		let builder = Builder::new(&server_url);
62		#[cfg(not(feature = "async-interface"))]
63		let client = builder.build_blocking();
64		#[cfg(feature = "async-interface")]
65		let client = builder.build_async().unwrap();
66
67		EsploraSyncClient::from_client(client, logger)
68	}
69
70	/// Returns a new [`EsploraSyncClient`] object using the given Esplora client.
71	///
72	/// This is not exported to bindings users as the underlying client from BDK is not exported.
73	pub fn from_client(client: EsploraClientType, logger: L) -> Self {
74		let sync_state = MutexType::new(SyncState::new());
75		let queue = std::sync::Mutex::new(FilterQueue::new());
76		Self { sync_state, queue, client, logger }
77	}
78
79	/// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
80	/// method should be called regularly to keep LDK up-to-date with current chain data.
81	///
82	/// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the
83	/// newest on-chain activity related to the items previously registered via the [`Filter`]
84	/// interface.
85	///
86	/// [`Confirm`]: lightning::chain::Confirm
87	/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
88	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
89	/// [`Filter`]: lightning::chain::Filter
90	#[maybe_async]
91	pub fn sync<C: Deref>(&self, confirmables: Vec<C>) -> Result<(), TxSyncError>
92	where
93		C::Target: Confirm,
94	{
95		// This lock makes sure we're syncing once at a time.
96		#[cfg(not(feature = "async-interface"))]
97		let mut sync_state = self.sync_state.lock().unwrap();
98		#[cfg(feature = "async-interface")]
99		let mut sync_state = self.sync_state.lock().await;
100
101		log_trace!(self.logger, "Starting transaction sync.");
102		#[cfg(feature = "time")]
103		let start_time = std::time::Instant::now();
104		let mut num_confirmed = 0;
105		let mut num_unconfirmed = 0;
106
107		let mut tip_hash = maybe_await!(self.client.get_tip_hash())?;
108
109		loop {
110			let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
111			let tip_is_new = Some(tip_hash) != sync_state.last_sync_hash;
112
113			// We loop until any registered transactions have been processed at least once, or the
114			// tip hasn't been updated during the last iteration.
115			if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
116				// Nothing to do.
117				break;
118			} else {
119				// Update the known tip to the newest one.
120				if tip_is_new {
121					// First check for any unconfirmed transactions and act on it immediately.
122					match maybe_await!(self.get_unconfirmed_transactions(&confirmables)) {
123						Ok(unconfirmed_txs) => {
124							// Double-check the tip hash. If it changed, a reorg happened since
125							// we started syncing and we need to restart last-minute.
126							match maybe_await!(self.client.get_tip_hash()) {
127								Ok(check_tip_hash) => {
128									if check_tip_hash != tip_hash {
129										tip_hash = check_tip_hash;
130
131										log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
132										sync_state.pending_sync = true;
133										continue;
134									}
135									num_unconfirmed += unconfirmed_txs.len();
136									sync_state.sync_unconfirmed_transactions(
137										&confirmables,
138										unconfirmed_txs,
139									);
140								},
141								Err(err) => {
142									// (Semi-)permanent failure, retry later.
143									log_error!(self.logger,
144										"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
145										num_confirmed,
146										num_unconfirmed
147										);
148									sync_state.pending_sync = true;
149									return Err(TxSyncError::from(err));
150								},
151							}
152						},
153						Err(err) => {
154							// (Semi-)permanent failure, retry later.
155							log_error!(self.logger,
156								"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
157								num_confirmed,
158								num_unconfirmed
159							);
160							sync_state.pending_sync = true;
161							return Err(TxSyncError::from(err));
162						},
163					}
164
165					match maybe_await!(self.sync_best_block_updated(
166						&confirmables,
167						&mut sync_state,
168						&tip_hash
169					)) {
170						Ok(()) => {},
171						Err(InternalError::Inconsistency) => {
172							// Immediately restart syncing when we encounter any inconsistencies.
173							log_debug!(
174								self.logger,
175								"Encountered inconsistency during transaction sync, restarting."
176							);
177							sync_state.pending_sync = true;
178							continue;
179						},
180						Err(err) => {
181							// (Semi-)permanent failure, retry later.
182							log_error!(self.logger,
183								"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
184								num_confirmed,
185								num_unconfirmed
186							);
187							sync_state.pending_sync = true;
188							return Err(TxSyncError::from(err));
189						},
190					}
191				}
192
193				match maybe_await!(self.get_confirmed_transactions(&sync_state)) {
194					Ok(confirmed_txs) => {
195						// Double-check the tip hash. If it changed, a reorg happened since
196						// we started syncing and we need to restart last-minute.
197						match maybe_await!(self.client.get_tip_hash()) {
198							Ok(check_tip_hash) => {
199								if check_tip_hash != tip_hash {
200									tip_hash = check_tip_hash;
201
202									log_debug!(self.logger,
203										"Encountered inconsistency during transaction sync, restarting.");
204									sync_state.pending_sync = true;
205									continue;
206								}
207								num_confirmed += confirmed_txs.len();
208								sync_state
209									.sync_confirmed_transactions(&confirmables, confirmed_txs);
210							},
211							Err(err) => {
212								// (Semi-)permanent failure, retry later.
213								log_error!(self.logger,
214									"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
215									num_confirmed,
216									num_unconfirmed
217								);
218								sync_state.pending_sync = true;
219								return Err(TxSyncError::from(err));
220							},
221						}
222					},
223					Err(InternalError::Inconsistency) => {
224						// Immediately restart syncing when we encounter any inconsistencies.
225						log_debug!(
226							self.logger,
227							"Encountered inconsistency during transaction sync, restarting."
228						);
229						sync_state.pending_sync = true;
230						continue;
231					},
232					Err(err) => {
233						// (Semi-)permanent failure, retry later.
234						log_error!(self.logger,
235							"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
236							num_confirmed,
237							num_unconfirmed
238						);
239						sync_state.pending_sync = true;
240						return Err(TxSyncError::from(err));
241					},
242				}
243				sync_state.last_sync_hash = Some(tip_hash);
244				sync_state.pending_sync = false;
245			}
246		}
247		#[cfg(feature = "time")]
248		log_debug!(
249			self.logger,
250			"Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
251			tip_hash,
252			start_time.elapsed().as_millis(),
253			num_confirmed,
254			num_unconfirmed
255		);
256		#[cfg(not(feature = "time"))]
257		log_debug!(
258			self.logger,
259			"Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
260			tip_hash,
261			num_confirmed,
262			num_unconfirmed
263		);
264		Ok(())
265	}
266
267	#[maybe_async]
268	fn sync_best_block_updated<C: Deref>(
269		&self, confirmables: &Vec<C>, sync_state: &mut SyncState, tip_hash: &BlockHash,
270	) -> Result<(), InternalError>
271	where
272		C::Target: Confirm,
273	{
274		// Inform the interface of the new block.
275		let tip_header = maybe_await!(self.client.get_header_by_hash(tip_hash))?;
276		let tip_status = maybe_await!(self.client.get_block_status(&tip_hash))?;
277		if tip_status.in_best_chain {
278			if let Some(tip_height) = tip_status.height {
279				for c in confirmables {
280					c.best_block_updated(&tip_header, tip_height);
281				}
282
283				// Prune any sufficiently confirmed output spends
284				sync_state.prune_output_spends(tip_height);
285			}
286		} else {
287			return Err(InternalError::Inconsistency);
288		}
289		Ok(())
290	}
291
292	#[maybe_async]
293	fn get_confirmed_transactions(
294		&self, sync_state: &SyncState,
295	) -> Result<Vec<ConfirmedTx>, InternalError> {
296		// First, check the confirmation status of registered transactions as well as the
297		// status of dependent transactions of registered outputs.
298
299		let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
300
301		for txid in &sync_state.watched_transactions {
302			if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) {
303				continue;
304			}
305			if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? {
306				confirmed_txs.push(confirmed_tx);
307			}
308		}
309
310		for (_, output) in &sync_state.watched_outputs {
311			if let Some(output_status) = maybe_await!(self
312				.client
313				.get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
314			{
315				if let Some(spending_txid) = output_status.txid {
316					if let Some(spending_tx_status) = output_status.status {
317						if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) {
318							if spending_tx_status.confirmed {
319								// Skip inserting duplicate ConfirmedTx entry
320								continue;
321							} else {
322								log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
323								return Err(InternalError::Inconsistency);
324							}
325						}
326
327						if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(
328							spending_txid,
329							spending_tx_status.block_hash,
330							spending_tx_status.block_height,
331						))? {
332							confirmed_txs.push(confirmed_tx);
333						}
334					}
335				}
336			}
337		}
338
339		// Sort all confirmed transactions first by block height, then by in-block
340		// position, and finally feed them to the interface in order.
341		confirmed_txs.sort_unstable_by(|tx1, tx2| {
342			tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
343		});
344
345		Ok(confirmed_txs)
346	}
347
348	#[maybe_async]
349	fn get_confirmed_tx(
350		&self, txid: Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
351	) -> Result<Option<ConfirmedTx>, InternalError> {
352		if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
353			let block_header = merkle_block.header;
354			let block_hash = block_header.block_hash();
355			if let Some(expected_block_hash) = expected_block_hash {
356				if expected_block_hash != block_hash {
357					log_trace!(
358						self.logger,
359						"Inconsistency: Tx {} expected in block {}, but is confirmed in {}",
360						txid,
361						expected_block_hash,
362						block_hash
363					);
364					return Err(InternalError::Inconsistency);
365				}
366			}
367
368			let mut matches = Vec::new();
369			let mut indexes = Vec::new();
370			let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
371			if indexes.len() != 1 || matches.len() != 1 || matches[0] != txid {
372				log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
373				return Err(InternalError::Failed);
374			}
375
376			// unwrap() safety: len() > 0 is checked above
377			let pos = *indexes.first().unwrap() as usize;
378			if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
379				if tx.compute_txid() != txid {
380					log_error!(self.logger, "Retrieved transaction for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
381					return Err(InternalError::Failed);
382				}
383
384				// Bitcoin Core's Merkle tree implementation has no way to discern between
385				// internal and leaf node entries. As a consequence it is susceptible to an
386				// attacker injecting additional transactions by crafting 64-byte
387				// transactions matching an inner Merkle node's hash (see
388				// https://web.archive.org/web/20240329003521/https://bitslog.com/2018/06/09/leaf-node-weakness-in-bitcoin-merkle-tree-design/).
389				// To protect against this (highly unlikely) attack vector, we check that the
390				// transaction is at least 65 bytes in length.
391				if tx.total_size() == 64 {
392					log_error!(
393						self.logger,
394						"Skipping transaction {} due to retrieving potentially invalid tx data.",
395						txid
396					);
397					return Ok(None);
398				}
399
400				if let Some(block_height) = known_block_height {
401					// We can take a shortcut here if a previous call already gave us the height.
402					return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
403				}
404
405				let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
406				if let Some(block_height) = block_status.height {
407					return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
408				} else {
409					// If any previously-confirmed block suddenly is no longer confirmed, we found
410					// an inconsistency and should start over.
411					log_trace!(
412						self.logger,
413						"Inconsistency: Tx {} was unconfirmed during syncing.",
414						txid
415					);
416					return Err(InternalError::Inconsistency);
417				}
418			}
419		}
420		Ok(None)
421	}
422
423	#[maybe_async]
424	fn get_unconfirmed_transactions<C: Deref>(
425		&self, confirmables: &Vec<C>,
426	) -> Result<Vec<Txid>, InternalError>
427	where
428		C::Target: Confirm,
429	{
430		// Query the interface for relevant txids and check whether the relevant blocks are still
431		// in the best chain, mark them unconfirmed otherwise
432		let relevant_txids = confirmables
433			.iter()
434			.flat_map(|c| c.get_relevant_txids())
435			.collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
436
437		let mut unconfirmed_txs = Vec::new();
438
439		for (txid, _conf_height, block_hash_opt) in relevant_txids {
440			if let Some(block_hash) = block_hash_opt {
441				let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
442				if block_status.in_best_chain {
443					// Skip if the block in question is still confirmed.
444					continue;
445				}
446
447				unconfirmed_txs.push(txid);
448			} else {
449				log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
450				panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
451			}
452		}
453		Ok(unconfirmed_txs)
454	}
455
456	/// Returns a reference to the underlying esplora client.
457	///
458	/// This is not exported to bindings users as the underlying client from BDK is not exported.
459	pub fn client(&self) -> &EsploraClientType {
460		&self.client
461	}
462}
463
464#[cfg(feature = "async-interface")]
465type MutexType<I> = futures::lock::Mutex<I>;
466#[cfg(not(feature = "async-interface"))]
467type MutexType<I> = std::sync::Mutex<I>;
468
469// The underlying client type.
470#[cfg(feature = "async-interface")]
471type EsploraClientType = AsyncClient;
472#[cfg(not(feature = "async-interface"))]
473type EsploraClientType = BlockingClient;
474
475impl<L: Deref> Filter for EsploraSyncClient<L>
476where
477	L::Target: Logger,
478{
479	fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
480		let mut locked_queue = self.queue.lock().unwrap();
481		locked_queue.transactions.insert(*txid);
482	}
483
484	fn register_output(&self, output: WatchedOutput) {
485		let mut locked_queue = self.queue.lock().unwrap();
486		locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
487	}
488}