lightning_transaction_sync/
electrum.rs1use crate::common::{ConfirmedTx, FilterQueue, SyncState};
9use crate::error::{InternalError, TxSyncError};
10
11use electrum_client::utils::validate_merkle_proof;
12use electrum_client::Client as ElectrumClient;
13use electrum_client::ElectrumApi;
14
15use lightning::chain::WatchedOutput;
16use lightning::chain::{Confirm, Filter};
17use lightning::util::logger::Logger;
18use lightning::{log_debug, log_error, log_trace};
19
20use bitcoin::block::Header;
21use bitcoin::{BlockHash, Script, Transaction, Txid};
22
23use std::collections::HashSet;
24use std::ops::Deref;
25use std::sync::Mutex;
26use std::time::Instant;
27
28pub struct ElectrumSyncClient<L: Deref>
41where
42 L::Target: Logger,
43{
44 sync_state: Mutex<SyncState>,
45 queue: Mutex<FilterQueue>,
46 client: ElectrumClient,
47 logger: L,
48}
49
50impl<L: Deref> ElectrumSyncClient<L>
51where
52 L::Target: Logger,
53{
54 pub fn new(server_url: String, logger: L) -> Result<Self, TxSyncError> {
56 let client = ElectrumClient::new(&server_url).map_err(|e| {
57 log_error!(logger, "Failed to connect to electrum server '{}': {}", server_url, e);
58 e
59 })?;
60
61 Self::from_client(client, logger)
62 }
63
64 pub fn from_client(client: ElectrumClient, logger: L) -> Result<Self, TxSyncError> {
68 let sync_state = Mutex::new(SyncState::new());
69 let queue = Mutex::new(FilterQueue::new());
70
71 Ok(Self { sync_state, queue, client, logger })
72 }
73
74 pub fn sync<C: Deref>(&self, confirmables: Vec<C>) -> Result<(), TxSyncError>
86 where
87 C::Target: Confirm,
88 {
89 let mut sync_state = self.sync_state.lock().unwrap();
91
92 log_trace!(self.logger, "Starting transaction sync.");
93 #[cfg(feature = "time")]
94 let start_time = Instant::now();
95 let mut num_confirmed = 0;
96 let mut num_unconfirmed = 0;
97
98 while let Some(_) = self.client.block_headers_pop()? {}
100
101 let tip_notification = self.client.block_headers_subscribe()?;
102 let mut tip_header = tip_notification.header;
103 let mut tip_height = tip_notification.height as u32;
104
105 loop {
106 let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
107 let tip_is_new = Some(tip_header.block_hash()) != sync_state.last_sync_hash;
108
109 if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
112 break;
114 } else {
115 if tip_is_new {
117 match self.get_unconfirmed_transactions(&confirmables) {
119 Ok(unconfirmed_txs) => {
120 match self.check_update_tip(&mut tip_header, &mut tip_height) {
123 Ok(false) => {
124 num_unconfirmed += unconfirmed_txs.len();
125 sync_state.sync_unconfirmed_transactions(
126 &confirmables,
127 unconfirmed_txs,
128 );
129 },
130 Ok(true) => {
131 log_debug!(self.logger,
132 "Encountered inconsistency during transaction sync, restarting.");
133 sync_state.pending_sync = true;
134 continue;
135 },
136 Err(err) => {
137 log_error!(self.logger,
139 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
140 num_confirmed,
141 num_unconfirmed
142 );
143 sync_state.pending_sync = true;
144 return Err(TxSyncError::from(err));
145 },
146 }
147 },
148 Err(err) => {
149 log_error!(self.logger,
151 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
152 num_confirmed,
153 num_unconfirmed
154 );
155 sync_state.pending_sync = true;
156 return Err(TxSyncError::from(err));
157 },
158 }
159
160 for c in &confirmables {
162 c.best_block_updated(&tip_header, tip_height);
163 }
164
165 sync_state.prune_output_spends(tip_height);
167 }
168
169 match self.get_confirmed_transactions(&sync_state) {
170 Ok(confirmed_txs) => {
171 match self.check_update_tip(&mut tip_header, &mut tip_height) {
174 Ok(false) => {
175 num_confirmed += confirmed_txs.len();
176 sync_state
177 .sync_confirmed_transactions(&confirmables, confirmed_txs);
178 },
179 Ok(true) => {
180 log_debug!(self.logger,
181 "Encountered inconsistency during transaction sync, restarting.");
182 sync_state.pending_sync = true;
183 continue;
184 },
185 Err(err) => {
186 log_error!(self.logger,
188 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
189 num_confirmed,
190 num_unconfirmed
191 );
192 sync_state.pending_sync = true;
193 return Err(TxSyncError::from(err));
194 },
195 }
196 },
197 Err(InternalError::Inconsistency) => {
198 log_debug!(
200 self.logger,
201 "Encountered inconsistency during transaction sync, restarting."
202 );
203 sync_state.pending_sync = true;
204 continue;
205 },
206 Err(err) => {
207 log_error!(self.logger,
209 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
210 num_confirmed,
211 num_unconfirmed
212 );
213 sync_state.pending_sync = true;
214 return Err(TxSyncError::from(err));
215 },
216 }
217 sync_state.last_sync_hash = Some(tip_header.block_hash());
218 sync_state.pending_sync = false;
219 }
220 }
221 #[cfg(feature = "time")]
222 log_debug!(
223 self.logger,
224 "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
225 tip_header.block_hash(),
226 start_time.elapsed().as_millis(),
227 num_confirmed,
228 num_unconfirmed
229 );
230 #[cfg(not(feature = "time"))]
231 log_debug!(
232 self.logger,
233 "Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
234 tip_header.block_hash(),
235 num_confirmed,
236 num_unconfirmed
237 );
238 Ok(())
239 }
240
241 fn check_update_tip(
242 &self, cur_tip_header: &mut Header, cur_tip_height: &mut u32,
243 ) -> Result<bool, InternalError> {
244 let check_notification = self.client.block_headers_subscribe()?;
245 let check_tip_hash = check_notification.header.block_hash();
246
247 let mut restart_sync = check_tip_hash != cur_tip_header.block_hash();
251 while let Some(queued_notif) = self.client.block_headers_pop()? {
252 if queued_notif.header.block_hash() != check_tip_hash {
253 restart_sync = true
254 }
255 }
256
257 if restart_sync {
258 *cur_tip_header = check_notification.header;
259 *cur_tip_height = check_notification.height as u32;
260 Ok(true)
261 } else {
262 Ok(false)
263 }
264 }
265
266 fn get_confirmed_transactions(
267 &self, sync_state: &SyncState,
268 ) -> Result<Vec<ConfirmedTx>, InternalError> {
269 let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
272 let mut watched_script_pubkeys = Vec::with_capacity(
273 sync_state.watched_transactions.len() + sync_state.watched_outputs.len(),
274 );
275 let mut watched_txs = Vec::with_capacity(sync_state.watched_transactions.len());
276
277 for txid in &sync_state.watched_transactions {
278 match self.client.transaction_get(&txid) {
279 Ok(tx) => {
280 if tx.total_size() == 64 {
288 log_error!(self.logger, "Skipping transaction {} due to retrieving potentially invalid tx data.", txid);
289 continue;
290 }
291
292 watched_txs.push((txid, tx.clone()));
293 if let Some(tx_out) = tx.output.first() {
294 watched_script_pubkeys.push(tx_out.script_pubkey.clone());
298 } else {
299 debug_assert!(false, "Failed due to retrieving invalid tx data.");
300 log_error!(self.logger, "Failed due to retrieving invalid tx data.");
301 return Err(InternalError::Failed);
302 }
303 },
304 Err(electrum_client::Error::Protocol(_)) => {
305 },
307 Err(e) => {
308 log_error!(self.logger, "Failed to look up transaction {}: {}.", txid, e);
309 return Err(InternalError::Failed);
310 },
311 }
312 }
313
314 let num_tx_lookups = watched_script_pubkeys.len();
315 debug_assert_eq!(num_tx_lookups, watched_txs.len());
316
317 for output in sync_state.watched_outputs.values() {
318 watched_script_pubkeys.push(output.script_pubkey.clone());
319 }
320
321 let num_output_spend_lookups = watched_script_pubkeys.len() - num_tx_lookups;
322 debug_assert_eq!(num_output_spend_lookups, sync_state.watched_outputs.len());
323
324 match self.client.batch_script_get_history(watched_script_pubkeys.iter().map(|s| s.deref()))
325 {
326 Ok(results) => {
327 let (tx_results, output_results) = results.split_at(num_tx_lookups);
328 debug_assert_eq!(num_output_spend_lookups, output_results.len());
329
330 for (i, script_history) in tx_results.iter().enumerate() {
331 let (txid, tx) = &watched_txs[i];
332 if confirmed_txs.iter().any(|ctx| ctx.txid == **txid) {
333 continue;
334 }
335 let mut filtered_history =
336 script_history.iter().filter(|h| h.tx_hash == **txid);
337 if let Some(history) = filtered_history.next() {
338 let prob_conf_height = history.height as u32;
339 let confirmed_tx = self.get_confirmed_tx(tx, prob_conf_height)?;
340 confirmed_txs.push(confirmed_tx);
341 }
342 debug_assert!(filtered_history.next().is_none());
343 }
344
345 for (watched_output, script_history) in
346 sync_state.watched_outputs.values().zip(output_results)
347 {
348 for possible_output_spend in script_history {
349 if possible_output_spend.height <= 0 {
350 continue;
351 }
352
353 let txid = possible_output_spend.tx_hash;
354 if confirmed_txs.iter().any(|ctx| ctx.txid == txid) {
355 continue;
356 }
357
358 match self.client.transaction_get(&txid) {
359 Ok(tx) => {
360 let mut is_spend = false;
361 for txin in &tx.input {
362 let watched_outpoint =
363 watched_output.outpoint.into_bitcoin_outpoint();
364 if txin.previous_output == watched_outpoint {
365 is_spend = true;
366 break;
367 }
368 }
369
370 if !is_spend {
371 continue;
372 }
373
374 let prob_conf_height = possible_output_spend.height as u32;
375 let confirmed_tx = self.get_confirmed_tx(&tx, prob_conf_height)?;
376 confirmed_txs.push(confirmed_tx);
377 },
378 Err(e) => {
379 log_trace!(
380 self.logger,
381 "Inconsistency: Tx {} was unconfirmed during syncing: {}",
382 txid,
383 e
384 );
385 return Err(InternalError::Inconsistency);
386 },
387 }
388 }
389 }
390 },
391 Err(e) => {
392 log_error!(self.logger, "Failed to look up script histories: {}.", e);
393 return Err(InternalError::Failed);
394 },
395 }
396
397 confirmed_txs.sort_unstable_by(|tx1, tx2| {
400 tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
401 });
402
403 Ok(confirmed_txs)
404 }
405
406 fn get_unconfirmed_transactions<C: Deref>(
407 &self, confirmables: &Vec<C>,
408 ) -> Result<Vec<Txid>, InternalError>
409 where
410 C::Target: Confirm,
411 {
412 let relevant_txids = confirmables
415 .iter()
416 .flat_map(|c| c.get_relevant_txids())
417 .collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
418
419 let mut unconfirmed_txs = Vec::new();
420
421 for (txid, conf_height, block_hash_opt) in relevant_txids {
422 if let Some(block_hash) = block_hash_opt {
423 let block_header = self.client.block_header(conf_height as usize)?;
424 if block_header.block_hash() == block_hash {
425 continue;
427 }
428
429 unconfirmed_txs.push(txid);
430 } else {
431 log_error!(self.logger,
432 "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
433 panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
434 }
435 }
436 Ok(unconfirmed_txs)
437 }
438
439 fn get_confirmed_tx(
440 &self, tx: &Transaction, prob_conf_height: u32,
441 ) -> Result<ConfirmedTx, InternalError> {
442 let txid = tx.compute_txid();
443 match self.client.transaction_get_merkle(&txid, prob_conf_height as usize) {
444 Ok(merkle_res) => {
445 debug_assert_eq!(prob_conf_height, merkle_res.block_height as u32);
446 match self.client.block_header(prob_conf_height as usize) {
447 Ok(block_header) => {
448 let pos = merkle_res.pos;
449 if !validate_merkle_proof(&txid, &block_header.merkle_root, &merkle_res) {
450 log_trace!(
451 self.logger,
452 "Inconsistency: Block {} was unconfirmed during syncing.",
453 block_header.block_hash()
454 );
455 return Err(InternalError::Inconsistency);
456 }
457 let confirmed_tx = ConfirmedTx {
458 tx: tx.clone(),
459 txid,
460 block_header,
461 block_height: prob_conf_height,
462 pos,
463 };
464 Ok(confirmed_tx)
465 },
466 Err(e) => {
467 log_error!(
468 self.logger,
469 "Failed to retrieve block header for height {}: {}.",
470 prob_conf_height,
471 e
472 );
473 Err(InternalError::Failed)
474 },
475 }
476 },
477 Err(e) => {
478 log_trace!(
479 self.logger,
480 "Inconsistency: Tx {} was unconfirmed during syncing: {}",
481 txid,
482 e
483 );
484 Err(InternalError::Inconsistency)
485 },
486 }
487 }
488
489 pub fn client(&self) -> &ElectrumClient {
493 &self.client
494 }
495}
496
497impl<L: Deref> Filter for ElectrumSyncClient<L>
498where
499 L::Target: Logger,
500{
501 fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
502 let mut locked_queue = self.queue.lock().unwrap();
503 locked_queue.transactions.insert(*txid);
504 }
505
506 fn register_output(&self, output: WatchedOutput) {
507 let mut locked_queue = self.queue.lock().unwrap();
508 locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
509 }
510}