use crate::error::{TxSyncError, InternalError};
use crate::common::{SyncState, FilterQueue, ConfirmedTx};
use lightning::util::logger::Logger;
use lightning::{log_error, log_debug, log_trace};
use lightning::chain::WatchedOutput;
use lightning::chain::{Confirm, Filter};
use bitcoin::{BlockHash, Script, Txid};
use esplora_client::Builder;
#[cfg(feature = "async-interface")]
use esplora_client::r#async::AsyncClient;
#[cfg(not(feature = "async-interface"))]
use esplora_client::blocking::BlockingClient;
use std::time::Instant;
use std::collections::HashSet;
use core::ops::Deref;
pub struct EsploraSyncClient<L: Deref>
where
L::Target: Logger,
{
sync_state: MutexType<SyncState>,
queue: std::sync::Mutex<FilterQueue>,
client: EsploraClientType,
logger: L,
}
impl<L: Deref> EsploraSyncClient<L>
where
L::Target: Logger,
{
pub fn new(server_url: String, logger: L) -> Self {
let builder = Builder::new(&server_url);
#[cfg(not(feature = "async-interface"))]
let client = builder.build_blocking().unwrap();
#[cfg(feature = "async-interface")]
let client = builder.build_async().unwrap();
EsploraSyncClient::from_client(client, logger)
}
pub fn from_client(client: EsploraClientType, logger: L) -> Self {
let sync_state = MutexType::new(SyncState::new());
let queue = std::sync::Mutex::new(FilterQueue::new());
Self {
sync_state,
queue,
client,
logger,
}
}
#[maybe_async]
pub fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync + Send)>) -> Result<(), TxSyncError> {
#[cfg(not(feature = "async-interface"))]
let mut sync_state = self.sync_state.lock().unwrap();
#[cfg(feature = "async-interface")]
let mut sync_state = self.sync_state.lock().await;
log_trace!(self.logger, "Starting transaction sync.");
let start_time = Instant::now();
let mut num_confirmed = 0;
let mut num_unconfirmed = 0;
let mut tip_hash = maybe_await!(self.client.get_tip_hash())?;
loop {
let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
let tip_is_new = Some(tip_hash) != sync_state.last_sync_hash;
if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
break;
} else {
if tip_is_new {
match maybe_await!(self.get_unconfirmed_transactions(&confirmables)) {
Ok(unconfirmed_txs) => {
match maybe_await!(self.client.get_tip_hash()) {
Ok(check_tip_hash) => {
if check_tip_hash != tip_hash {
tip_hash = check_tip_hash;
log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
sync_state.pending_sync = true;
continue;
}
num_unconfirmed += unconfirmed_txs.len();
sync_state.sync_unconfirmed_transactions(
&confirmables,
unconfirmed_txs
);
}
Err(err) => {
log_error!(self.logger,
"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
num_confirmed,
num_unconfirmed
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
}
},
Err(err) => {
log_error!(self.logger,
"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
num_confirmed,
num_unconfirmed
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
}
match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) {
Ok(()) => {}
Err(InternalError::Inconsistency) => {
log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
sync_state.pending_sync = true;
continue;
}
Err(err) => {
log_error!(self.logger,
"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
num_confirmed,
num_unconfirmed
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
}
}
match maybe_await!(self.get_confirmed_transactions(&sync_state)) {
Ok(confirmed_txs) => {
match maybe_await!(self.client.get_tip_hash()) {
Ok(check_tip_hash) => {
if check_tip_hash != tip_hash {
tip_hash = check_tip_hash;
log_debug!(self.logger,
"Encountered inconsistency during transaction sync, restarting.");
sync_state.pending_sync = true;
continue;
}
num_confirmed += confirmed_txs.len();
sync_state.sync_confirmed_transactions(
&confirmables,
confirmed_txs
);
}
Err(err) => {
log_error!(self.logger,
"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
num_confirmed,
num_unconfirmed
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
}
}
Err(InternalError::Inconsistency) => {
log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
sync_state.pending_sync = true;
continue;
}
Err(err) => {
log_error!(self.logger,
"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
num_confirmed,
num_unconfirmed
);
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
}
sync_state.last_sync_hash = Some(tip_hash);
sync_state.pending_sync = false;
}
}
log_debug!(self.logger, "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
tip_hash, start_time.elapsed().as_millis(), num_confirmed, num_unconfirmed);
Ok(())
}
#[maybe_async]
fn sync_best_block_updated(
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash,
) -> Result<(), InternalError> {
let tip_header = maybe_await!(self.client.get_header_by_hash(tip_hash))?;
let tip_status = maybe_await!(self.client.get_block_status(&tip_hash))?;
if tip_status.in_best_chain {
if let Some(tip_height) = tip_status.height {
for c in confirmables {
c.best_block_updated(&tip_header, tip_height);
}
}
} else {
return Err(InternalError::Inconsistency);
}
Ok(())
}
#[maybe_async]
fn get_confirmed_transactions(
&self, sync_state: &SyncState,
) -> Result<Vec<ConfirmedTx>, InternalError> {
let mut confirmed_txs = Vec::new();
for txid in &sync_state.watched_transactions {
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? {
confirmed_txs.push(confirmed_tx);
}
}
for (_, output) in &sync_state.watched_outputs {
if let Some(output_status) = maybe_await!(self.client
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
{
if let Some(spending_txid) = output_status.txid {
if let Some(spending_tx_status) = output_status.status {
if let Some(confirmed_tx) = maybe_await!(self
.get_confirmed_tx(
&spending_txid,
spending_tx_status.block_hash,
spending_tx_status.block_height,
))?
{
confirmed_txs.push(confirmed_tx);
}
}
}
}
}
confirmed_txs.sort_unstable_by(|tx1, tx2| {
tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
});
Ok(confirmed_txs)
}
#[maybe_async]
fn get_confirmed_tx(
&self, txid: &Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
) -> Result<Option<ConfirmedTx>, InternalError> {
if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
let block_header = merkle_block.header;
let block_hash = block_header.block_hash();
if let Some(expected_block_hash) = expected_block_hash {
if expected_block_hash != block_hash {
log_trace!(self.logger, "Inconsistency: Tx {} expected in block {}, but is confirmed in {}", txid, expected_block_hash, block_hash);
return Err(InternalError::Inconsistency);
}
}
let mut matches = Vec::new();
let mut indexes = Vec::new();
let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid {
log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
return Err(InternalError::Failed);
}
let pos = *indexes.first().unwrap() as usize;
if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
if let Some(block_height) = known_block_height {
return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
}
let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
if let Some(block_height) = block_status.height {
return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
} else {
log_trace!(self.logger, "Inconsistency: Tx {} was unconfirmed during syncing.", txid);
return Err(InternalError::Inconsistency);
}
}
}
Ok(None)
}
#[maybe_async]
fn get_unconfirmed_transactions(
&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>,
) -> Result<Vec<Txid>, InternalError> {
let relevant_txids = confirmables
.iter()
.flat_map(|c| c.get_relevant_txids())
.collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
let mut unconfirmed_txs = Vec::new();
for (txid, _conf_height, block_hash_opt) in relevant_txids {
if let Some(block_hash) = block_hash_opt {
let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
if block_status.in_best_chain {
continue;
}
unconfirmed_txs.push(txid);
} else {
log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
}
}
Ok(unconfirmed_txs)
}
pub fn client(&self) -> &EsploraClientType {
&self.client
}
}
#[cfg(feature = "async-interface")]
type MutexType<I> = futures::lock::Mutex<I>;
#[cfg(not(feature = "async-interface"))]
type MutexType<I> = std::sync::Mutex<I>;
#[cfg(feature = "async-interface")]
type EsploraClientType = AsyncClient;
#[cfg(not(feature = "async-interface"))]
type EsploraClientType = BlockingClient;
impl<L: Deref> Filter for EsploraSyncClient<L>
where
L::Target: Logger,
{
fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
let mut locked_queue = self.queue.lock().unwrap();
locked_queue.transactions.insert(*txid);
}
fn register_output(&self, output: WatchedOutput) {
let mut locked_queue = self.queue.lock().unwrap();
locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
}
}