lightning_transaction_sync/
esplora.rs1use crate::common::{ConfirmedTx, FilterQueue, SyncState};
9use crate::error::{InternalError, TxSyncError};
10
11use lightning::chain::WatchedOutput;
12use lightning::chain::{Confirm, Filter};
13use lightning::util::logger::Logger;
14use lightning::{log_debug, log_error, log_trace};
15
16use lightning_macros::{maybe_async, maybe_await};
17
18use bitcoin::{BlockHash, Script, Txid};
19
20#[cfg(not(feature = "async-interface"))]
21use esplora_client::blocking::BlockingClient;
22#[cfg(feature = "async-interface")]
23use esplora_client::r#async::AsyncClient;
24use esplora_client::Builder;
25
26use core::ops::Deref;
27use std::collections::HashSet;
28
29pub struct EsploraSyncClient<L: Deref>
46where
47 L::Target: Logger,
48{
49 sync_state: MutexType<SyncState>,
50 queue: std::sync::Mutex<FilterQueue>,
51 client: EsploraClientType,
52 logger: L,
53}
54
55impl<L: Deref> EsploraSyncClient<L>
56where
57 L::Target: Logger,
58{
59 pub fn new(server_url: String, logger: L) -> Self {
61 let builder = Builder::new(&server_url);
62 #[cfg(not(feature = "async-interface"))]
63 let client = builder.build_blocking();
64 #[cfg(feature = "async-interface")]
65 let client = builder.build_async().unwrap();
66
67 EsploraSyncClient::from_client(client, logger)
68 }
69
70 pub fn from_client(client: EsploraClientType, logger: L) -> Self {
74 let sync_state = MutexType::new(SyncState::new());
75 let queue = std::sync::Mutex::new(FilterQueue::new());
76 Self { sync_state, queue, client, logger }
77 }
78
79 #[maybe_async]
91 pub fn sync<C: Deref>(&self, confirmables: Vec<C>) -> Result<(), TxSyncError>
92 where
93 C::Target: Confirm,
94 {
95 #[cfg(not(feature = "async-interface"))]
97 let mut sync_state = self.sync_state.lock().unwrap();
98 #[cfg(feature = "async-interface")]
99 let mut sync_state = self.sync_state.lock().await;
100
101 log_trace!(self.logger, "Starting transaction sync.");
102 #[cfg(feature = "time")]
103 let start_time = std::time::Instant::now();
104 let mut num_confirmed = 0;
105 let mut num_unconfirmed = 0;
106
107 let mut tip_hash = maybe_await!(self.client.get_tip_hash())?;
108
109 loop {
110 let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
111 let tip_is_new = Some(tip_hash) != sync_state.last_sync_hash;
112
113 if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
116 break;
118 } else {
119 if tip_is_new {
121 match maybe_await!(self.get_unconfirmed_transactions(&confirmables)) {
123 Ok(unconfirmed_txs) => {
124 match maybe_await!(self.client.get_tip_hash()) {
127 Ok(check_tip_hash) => {
128 if check_tip_hash != tip_hash {
129 tip_hash = check_tip_hash;
130
131 log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
132 sync_state.pending_sync = true;
133 continue;
134 }
135 num_unconfirmed += unconfirmed_txs.len();
136 sync_state.sync_unconfirmed_transactions(
137 &confirmables,
138 unconfirmed_txs,
139 );
140 },
141 Err(err) => {
142 log_error!(self.logger,
144 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
145 num_confirmed,
146 num_unconfirmed
147 );
148 sync_state.pending_sync = true;
149 return Err(TxSyncError::from(err));
150 },
151 }
152 },
153 Err(err) => {
154 log_error!(self.logger,
156 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
157 num_confirmed,
158 num_unconfirmed
159 );
160 sync_state.pending_sync = true;
161 return Err(TxSyncError::from(err));
162 },
163 }
164
165 match maybe_await!(self.sync_best_block_updated(
166 &confirmables,
167 &mut sync_state,
168 &tip_hash
169 )) {
170 Ok(()) => {},
171 Err(InternalError::Inconsistency) => {
172 log_debug!(
174 self.logger,
175 "Encountered inconsistency during transaction sync, restarting."
176 );
177 sync_state.pending_sync = true;
178 continue;
179 },
180 Err(err) => {
181 log_error!(self.logger,
183 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
184 num_confirmed,
185 num_unconfirmed
186 );
187 sync_state.pending_sync = true;
188 return Err(TxSyncError::from(err));
189 },
190 }
191 }
192
193 match maybe_await!(self.get_confirmed_transactions(&sync_state)) {
194 Ok(confirmed_txs) => {
195 match maybe_await!(self.client.get_tip_hash()) {
198 Ok(check_tip_hash) => {
199 if check_tip_hash != tip_hash {
200 tip_hash = check_tip_hash;
201
202 log_debug!(self.logger,
203 "Encountered inconsistency during transaction sync, restarting.");
204 sync_state.pending_sync = true;
205 continue;
206 }
207 num_confirmed += confirmed_txs.len();
208 sync_state
209 .sync_confirmed_transactions(&confirmables, confirmed_txs);
210 },
211 Err(err) => {
212 log_error!(self.logger,
214 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
215 num_confirmed,
216 num_unconfirmed
217 );
218 sync_state.pending_sync = true;
219 return Err(TxSyncError::from(err));
220 },
221 }
222 },
223 Err(InternalError::Inconsistency) => {
224 log_debug!(
226 self.logger,
227 "Encountered inconsistency during transaction sync, restarting."
228 );
229 sync_state.pending_sync = true;
230 continue;
231 },
232 Err(err) => {
233 log_error!(self.logger,
235 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
236 num_confirmed,
237 num_unconfirmed
238 );
239 sync_state.pending_sync = true;
240 return Err(TxSyncError::from(err));
241 },
242 }
243 sync_state.last_sync_hash = Some(tip_hash);
244 sync_state.pending_sync = false;
245 }
246 }
247 #[cfg(feature = "time")]
248 log_debug!(
249 self.logger,
250 "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
251 tip_hash,
252 start_time.elapsed().as_millis(),
253 num_confirmed,
254 num_unconfirmed
255 );
256 #[cfg(not(feature = "time"))]
257 log_debug!(
258 self.logger,
259 "Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
260 tip_hash,
261 num_confirmed,
262 num_unconfirmed
263 );
264 Ok(())
265 }
266
267 #[maybe_async]
268 fn sync_best_block_updated<C: Deref>(
269 &self, confirmables: &Vec<C>, sync_state: &mut SyncState, tip_hash: &BlockHash,
270 ) -> Result<(), InternalError>
271 where
272 C::Target: Confirm,
273 {
274 let tip_header = maybe_await!(self.client.get_header_by_hash(tip_hash))?;
276 let tip_status = maybe_await!(self.client.get_block_status(&tip_hash))?;
277 if tip_status.in_best_chain {
278 if let Some(tip_height) = tip_status.height {
279 for c in confirmables {
280 c.best_block_updated(&tip_header, tip_height);
281 }
282
283 sync_state.prune_output_spends(tip_height);
285 }
286 } else {
287 return Err(InternalError::Inconsistency);
288 }
289 Ok(())
290 }
291
292 #[maybe_async]
293 fn get_confirmed_transactions(
294 &self, sync_state: &SyncState,
295 ) -> Result<Vec<ConfirmedTx>, InternalError> {
296 let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
300
301 for txid in &sync_state.watched_transactions {
302 if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) {
303 continue;
304 }
305 if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? {
306 confirmed_txs.push(confirmed_tx);
307 }
308 }
309
310 for (_, output) in &sync_state.watched_outputs {
311 if let Some(output_status) = maybe_await!(self
312 .client
313 .get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
314 {
315 if let Some(spending_txid) = output_status.txid {
316 if let Some(spending_tx_status) = output_status.status {
317 if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) {
318 if spending_tx_status.confirmed {
319 continue;
321 } else {
322 log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
323 return Err(InternalError::Inconsistency);
324 }
325 }
326
327 if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(
328 spending_txid,
329 spending_tx_status.block_hash,
330 spending_tx_status.block_height,
331 ))? {
332 confirmed_txs.push(confirmed_tx);
333 }
334 }
335 }
336 }
337 }
338
339 confirmed_txs.sort_unstable_by(|tx1, tx2| {
342 tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
343 });
344
345 Ok(confirmed_txs)
346 }
347
348 #[maybe_async]
349 fn get_confirmed_tx(
350 &self, txid: Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
351 ) -> Result<Option<ConfirmedTx>, InternalError> {
352 if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
353 let block_header = merkle_block.header;
354 let block_hash = block_header.block_hash();
355 if let Some(expected_block_hash) = expected_block_hash {
356 if expected_block_hash != block_hash {
357 log_trace!(
358 self.logger,
359 "Inconsistency: Tx {} expected in block {}, but is confirmed in {}",
360 txid,
361 expected_block_hash,
362 block_hash
363 );
364 return Err(InternalError::Inconsistency);
365 }
366 }
367
368 let mut matches = Vec::new();
369 let mut indexes = Vec::new();
370 let computed_merkle_root =
371 merkle_block.txn.extract_matches(&mut matches, &mut indexes).ok();
372 if computed_merkle_root != Some(block_header.merkle_root)
373 || indexes.len() != 1
374 || matches.len() != 1
375 || matches[0] != txid
376 {
377 log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
378 return Err(InternalError::Failed);
379 }
380
381 let pos = *indexes.first().unwrap() as usize;
383 if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
384 if tx.compute_txid() != txid {
385 log_error!(self.logger, "Retrieved transaction for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
386 return Err(InternalError::Failed);
387 }
388
389 if tx.total_size() == 64 {
397 log_error!(
398 self.logger,
399 "Skipping transaction {} due to retrieving potentially invalid tx data.",
400 txid
401 );
402 return Ok(None);
403 }
404
405 if let Some(block_height) = known_block_height {
406 return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
408 }
409
410 let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
411 if let Some(block_height) = block_status.height {
412 return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
413 } else {
414 log_trace!(
417 self.logger,
418 "Inconsistency: Tx {} was unconfirmed during syncing.",
419 txid
420 );
421 return Err(InternalError::Inconsistency);
422 }
423 }
424 }
425 Ok(None)
426 }
427
428 #[maybe_async]
429 fn get_unconfirmed_transactions<C: Deref>(
430 &self, confirmables: &Vec<C>,
431 ) -> Result<Vec<Txid>, InternalError>
432 where
433 C::Target: Confirm,
434 {
435 let relevant_txids = confirmables
438 .iter()
439 .flat_map(|c| c.get_relevant_txids())
440 .collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
441
442 let mut unconfirmed_txs = Vec::new();
443
444 for (txid, _conf_height, block_hash_opt) in relevant_txids {
445 if let Some(block_hash) = block_hash_opt {
446 let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
447 if block_status.in_best_chain {
448 continue;
450 }
451
452 unconfirmed_txs.push(txid);
453 } else {
454 log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
455 panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
456 }
457 }
458 Ok(unconfirmed_txs)
459 }
460
461 pub fn client(&self) -> &EsploraClientType {
465 &self.client
466 }
467}
468
/// Mutex guarding the sync state: `futures::lock::Mutex` when the `async-interface`
/// feature is enabled.
#[cfg(feature = "async-interface")]
type MutexType<I> = futures::lock::Mutex<I>;
/// Mutex guarding the sync state: `std::sync::Mutex` when the `async-interface`
/// feature is disabled.
#[cfg(not(feature = "async-interface"))]
type MutexType<I> = std::sync::Mutex<I>;
473
474#[cfg(feature = "async-interface")]
476type EsploraClientType = AsyncClient;
477#[cfg(not(feature = "async-interface"))]
478type EsploraClientType = BlockingClient;
479
480impl<L: Deref> Filter for EsploraSyncClient<L>
481where
482 L::Target: Logger,
483{
484 fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
485 let mut locked_queue = self.queue.lock().unwrap();
486 locked_queue.transactions.insert(*txid);
487 }
488
489 fn register_output(&self, output: WatchedOutput) {
490 let mut locked_queue = self.queue.lock().unwrap();
491 locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
492 }
493}