1use crate::common::{ConfirmedTx, FilterQueue, SyncState};
9use crate::error::{InternalError, TxSyncError};
10
11use electrum_client::utils::validate_merkle_proof;
12use electrum_client::Client as ElectrumClient;
13use electrum_client::ElectrumApi;
14
15use lightning::chain::WatchedOutput;
16use lightning::chain::{Confirm, Filter};
17use lightning::util::logger::Logger;
18use lightning::{log_debug, log_error, log_trace};
19
20use bitcoin::block::Header;
21use bitcoin::{BlockHash, Script, Transaction, Txid};
22
23use std::collections::HashSet;
24use std::ops::Deref;
25use std::sync::Mutex;
26use std::time::Instant;
27
/// Client that keeps registered transactions and outputs in sync with an Electrum server.
///
/// Items to watch are handed in through the [`Filter`] implementation on this type and are
/// picked up by [`ElectrumSyncClient::sync`], which reports chain activity to the given
/// [`Confirm`] implementations.
pub struct ElectrumSyncClient<L: Deref>
where
    L::Target: Logger,
{
    /// Sync bookkeeping; `sync` holds this lock for the full duration of a run.
    sync_state: Mutex<SyncState>,
    /// Transactions and outputs registered via [`Filter`] that `sync` still has to process.
    queue: Mutex<FilterQueue>,
    /// Connection used for every request sent to the Electrum server.
    client: ElectrumClient,
    /// Destination for trace, debug and error log messages.
    logger: L,
}
49
50impl<L: Deref> ElectrumSyncClient<L>
51where
52 L::Target: Logger,
53{
54 pub fn new(server_url: String, logger: L) -> Result<Self, TxSyncError> {
56 let client = ElectrumClient::new(&server_url).map_err(|e| {
57 log_error!(logger, "Failed to connect to electrum server '{}': {}", server_url, e);
58 e
59 })?;
60
61 Self::from_client(client, logger)
62 }
63
64 pub fn from_client(client: ElectrumClient, logger: L) -> Result<Self, TxSyncError> {
68 let sync_state = Mutex::new(SyncState::new());
69 let queue = Mutex::new(FilterQueue::new());
70
71 Ok(Self { sync_state, queue, client, logger })
72 }
73
74 pub fn sync<C: Deref>(&self, confirmables: Vec<C>) -> Result<(), TxSyncError>
86 where
87 C::Target: Confirm,
88 {
89 let mut sync_state = self.sync_state.lock().unwrap();
91
92 log_trace!(self.logger, "Starting transaction sync.");
93 #[cfg(feature = "time")]
94 let start_time = Instant::now();
95 let mut num_confirmed = 0;
96 let mut num_unconfirmed = 0;
97
98 while let Some(_) = self.client.block_headers_pop()? {}
100
101 let tip_notification = self.client.block_headers_subscribe()?;
102 let mut tip_header = tip_notification.header;
103 let mut tip_height = tip_notification.height as u32;
104
105 loop {
106 let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
107 let tip_is_new = Some(tip_header.block_hash()) != sync_state.last_sync_hash;
108
109 if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
112 break;
114 } else {
115 if tip_is_new {
117 match self.get_unconfirmed_transactions(&confirmables) {
119 Ok(unconfirmed_txs) => {
120 match self.check_update_tip(&mut tip_header, &mut tip_height) {
123 Ok(false) => {
124 num_unconfirmed += unconfirmed_txs.len();
125 sync_state.sync_unconfirmed_transactions(
126 &confirmables,
127 unconfirmed_txs,
128 );
129 },
130 Ok(true) => {
131 log_debug!(self.logger,
132 "Encountered inconsistency during transaction sync, restarting.");
133 sync_state.pending_sync = true;
134 continue;
135 },
136 Err(err) => {
137 log_error!(self.logger,
139 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
140 num_confirmed,
141 num_unconfirmed
142 );
143 sync_state.pending_sync = true;
144 return Err(TxSyncError::from(err));
145 },
146 }
147 },
148 Err(err) => {
149 log_error!(self.logger,
151 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
152 num_confirmed,
153 num_unconfirmed
154 );
155 sync_state.pending_sync = true;
156 return Err(TxSyncError::from(err));
157 },
158 }
159
160 for c in &confirmables {
162 c.best_block_updated(&tip_header, tip_height);
163 }
164
165 sync_state.prune_output_spends(tip_height);
167 }
168
169 match self.get_confirmed_transactions(&sync_state) {
170 Ok(confirmed_txs) => {
171 match self.check_update_tip(&mut tip_header, &mut tip_height) {
174 Ok(false) => {
175 num_confirmed += confirmed_txs.len();
176 sync_state
177 .sync_confirmed_transactions(&confirmables, confirmed_txs);
178 },
179 Ok(true) => {
180 log_debug!(self.logger,
181 "Encountered inconsistency during transaction sync, restarting.");
182 sync_state.pending_sync = true;
183 continue;
184 },
185 Err(err) => {
186 log_error!(self.logger,
188 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
189 num_confirmed,
190 num_unconfirmed
191 );
192 sync_state.pending_sync = true;
193 return Err(TxSyncError::from(err));
194 },
195 }
196 },
197 Err(InternalError::Inconsistency) => {
198 log_debug!(
200 self.logger,
201 "Encountered inconsistency during transaction sync, restarting."
202 );
203 sync_state.pending_sync = true;
204 continue;
205 },
206 Err(err) => {
207 log_error!(self.logger,
209 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
210 num_confirmed,
211 num_unconfirmed
212 );
213 sync_state.pending_sync = true;
214 return Err(TxSyncError::from(err));
215 },
216 }
217 sync_state.last_sync_hash = Some(tip_header.block_hash());
218 sync_state.pending_sync = false;
219 }
220 }
221 #[cfg(feature = "time")]
222 log_debug!(
223 self.logger,
224 "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
225 tip_header.block_hash(),
226 start_time.elapsed().as_millis(),
227 num_confirmed,
228 num_unconfirmed
229 );
230 #[cfg(not(feature = "time"))]
231 log_debug!(
232 self.logger,
233 "Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
234 tip_header.block_hash(),
235 num_confirmed,
236 num_unconfirmed
237 );
238 Ok(())
239 }
240
241 fn check_update_tip(
242 &self, cur_tip_header: &mut Header, cur_tip_height: &mut u32,
243 ) -> Result<bool, InternalError> {
244 let check_notification = self.client.block_headers_subscribe()?;
245 let check_tip_hash = check_notification.header.block_hash();
246
247 let mut restart_sync = check_tip_hash != cur_tip_header.block_hash();
251 while let Some(queued_notif) = self.client.block_headers_pop()? {
252 if queued_notif.header.block_hash() != check_tip_hash {
253 restart_sync = true
254 }
255 }
256
257 if restart_sync {
258 *cur_tip_header = check_notification.header;
259 *cur_tip_height = check_notification.height as u32;
260 Ok(true)
261 } else {
262 Ok(false)
263 }
264 }
265
266 fn get_confirmed_transactions(
267 &self, sync_state: &SyncState,
268 ) -> Result<Vec<ConfirmedTx>, InternalError> {
269 let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
272 let mut watched_script_pubkeys = Vec::with_capacity(
273 sync_state.watched_transactions.len() + sync_state.watched_outputs.len(),
274 );
275 let mut watched_txs = Vec::with_capacity(sync_state.watched_transactions.len());
276
277 for txid in &sync_state.watched_transactions {
278 match self.client.transaction_get(&txid) {
279 Ok(tx) => {
280 if tx.total_size() == 64 {
288 log_error!(self.logger, "Skipping transaction {} due to retrieving potentially invalid tx data.", txid);
289 continue;
290 }
291
292 watched_txs.push((txid, tx.clone()));
293 if let Some(tx_out) = tx.output.first() {
294 watched_script_pubkeys.push(tx_out.script_pubkey.clone());
298 } else {
299 debug_assert!(false, "Failed due to retrieving invalid tx data.");
300 log_error!(self.logger, "Failed due to retrieving invalid tx data.");
301 return Err(InternalError::Failed);
302 }
303 },
304 Err(electrum_client::Error::Protocol(_)) => {
305 },
307 Err(e) => {
308 log_error!(self.logger, "Failed to look up transaction {}: {}.", txid, e);
309 return Err(InternalError::Failed);
310 },
311 }
312 }
313
314 let num_tx_lookups = watched_script_pubkeys.len();
315 debug_assert_eq!(num_tx_lookups, watched_txs.len());
316
317 for output in sync_state.watched_outputs.values() {
318 watched_script_pubkeys.push(output.script_pubkey.clone());
319 }
320
321 let num_output_spend_lookups = watched_script_pubkeys.len() - num_tx_lookups;
322 debug_assert_eq!(num_output_spend_lookups, sync_state.watched_outputs.len());
323
324 match self.client.batch_script_get_history(watched_script_pubkeys.iter().map(|s| s.deref()))
325 {
326 Ok(results) => {
327 let (tx_results, output_results) = results.split_at(num_tx_lookups);
328 debug_assert_eq!(num_output_spend_lookups, output_results.len());
329
330 for (i, script_history) in tx_results.iter().enumerate() {
331 let (txid, tx) = &watched_txs[i];
332 if confirmed_txs.iter().any(|ctx| ctx.txid == **txid) {
333 continue;
334 }
335 let mut filtered_history =
336 script_history.iter().filter(|h| h.tx_hash == **txid);
337 if let Some(history) = filtered_history.next() {
338 let prob_conf_height = history.height as u32;
339 if prob_conf_height <= 0 {
340 continue;
342 }
343 let confirmed_tx = self.get_confirmed_tx(tx, prob_conf_height)?;
344 confirmed_txs.push(confirmed_tx);
345 }
346 if filtered_history.next().is_some() {
347 log_error!(
348 self.logger,
349 "Failed due to server returning multiple history entries for Tx {}.",
350 txid
351 );
352 return Err(InternalError::Failed);
353 }
354 }
355
356 for (watched_output, script_history) in
357 sync_state.watched_outputs.values().zip(output_results)
358 {
359 for possible_output_spend in script_history {
360 if possible_output_spend.height <= 0 {
361 continue;
363 }
364
365 let txid = possible_output_spend.tx_hash;
366 if confirmed_txs.iter().any(|ctx| ctx.txid == txid) {
367 continue;
368 }
369
370 match self.client.transaction_get(&txid) {
371 Ok(tx) => {
372 let mut is_spend = false;
373 for txin in &tx.input {
374 let watched_outpoint =
375 watched_output.outpoint.into_bitcoin_outpoint();
376 if txin.previous_output == watched_outpoint {
377 is_spend = true;
378 break;
379 }
380 }
381
382 if !is_spend {
383 continue;
384 }
385
386 let prob_conf_height = possible_output_spend.height as u32;
387 let confirmed_tx = self.get_confirmed_tx(&tx, prob_conf_height)?;
388 confirmed_txs.push(confirmed_tx);
389 },
390 Err(e) => {
391 log_trace!(
392 self.logger,
393 "Inconsistency: Tx {} was unconfirmed during syncing: {}",
394 txid,
395 e
396 );
397 return Err(InternalError::Inconsistency);
398 },
399 }
400 }
401 }
402 },
403 Err(e) => {
404 log_error!(self.logger, "Failed to look up script histories: {}.", e);
405 return Err(InternalError::Failed);
406 },
407 }
408
409 confirmed_txs.sort_unstable_by(|tx1, tx2| {
412 tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
413 });
414
415 Ok(confirmed_txs)
416 }
417
418 fn get_unconfirmed_transactions<C: Deref>(
419 &self, confirmables: &Vec<C>,
420 ) -> Result<Vec<Txid>, InternalError>
421 where
422 C::Target: Confirm,
423 {
424 let relevant_txids = confirmables
427 .iter()
428 .flat_map(|c| c.get_relevant_txids())
429 .collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
430
431 let mut unconfirmed_txs = Vec::new();
432
433 for (txid, conf_height, block_hash_opt) in relevant_txids {
434 if let Some(block_hash) = block_hash_opt {
435 let block_header = self.client.block_header(conf_height as usize)?;
436 if block_header.block_hash() == block_hash {
437 continue;
439 }
440
441 unconfirmed_txs.push(txid);
442 } else {
443 log_error!(self.logger,
444 "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
445 panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
446 }
447 }
448 Ok(unconfirmed_txs)
449 }
450
451 fn get_confirmed_tx(
452 &self, tx: &Transaction, prob_conf_height: u32,
453 ) -> Result<ConfirmedTx, InternalError> {
454 let txid = tx.compute_txid();
455 match self.client.transaction_get_merkle(&txid, prob_conf_height as usize) {
456 Ok(merkle_res) => {
457 debug_assert_eq!(prob_conf_height, merkle_res.block_height as u32);
458 match self.client.block_header(prob_conf_height as usize) {
459 Ok(block_header) => {
460 let pos = merkle_res.pos;
461 if !validate_merkle_proof(&txid, &block_header.merkle_root, &merkle_res) {
462 log_trace!(
463 self.logger,
464 "Inconsistency: Block {} was unconfirmed during syncing.",
465 block_header.block_hash()
466 );
467 return Err(InternalError::Inconsistency);
468 }
469 let confirmed_tx = ConfirmedTx {
470 tx: tx.clone(),
471 txid,
472 block_header,
473 block_height: prob_conf_height,
474 pos,
475 };
476 Ok(confirmed_tx)
477 },
478 Err(e) => {
479 log_error!(
480 self.logger,
481 "Failed to retrieve block header for height {}: {}.",
482 prob_conf_height,
483 e
484 );
485 Err(InternalError::Failed)
486 },
487 }
488 },
489 Err(e) => {
490 log_trace!(
491 self.logger,
492 "Inconsistency: Tx {} was unconfirmed during syncing: {}",
493 txid,
494 e
495 );
496 Err(InternalError::Inconsistency)
497 },
498 }
499 }
500
/// Returns a reference to the Electrum client this sync client sends its requests through.
pub fn client(&self) -> &ElectrumClient {
    &self.client
}
507}
508
509impl<L: Deref> Filter for ElectrumSyncClient<L>
510where
511 L::Target: Logger,
512{
513 fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
514 let mut locked_queue = self.queue.lock().unwrap();
515 locked_queue.transactions.insert(*txid);
516 }
517
518 fn register_output(&self, output: WatchedOutput) {
519 let mut locked_queue = self.queue.lock().unwrap();
520 locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
521 }
522}