use crate::common::{ConfirmedTx, SyncState, FilterQueue};
use crate::error::{TxSyncError, InternalError};
use electrum_client::Client as ElectrumClient;
use electrum_client::ElectrumApi;
use electrum_client::GetMerkleRes;
use lightning::util::logger::Logger;
use lightning::{log_error, log_debug, log_trace};
use lightning::chain::WatchedOutput;
use lightning::chain::{Confirm, Filter};
use bitcoin::{BlockHash, Script, Transaction, Txid};
use bitcoin::block::Header;
use bitcoin::hash_types::TxMerkleNode;
use bitcoin::hashes::Hash;
use bitcoin::hashes::sha256d::Hash as Sha256d;
use std::ops::Deref;
use std::sync::Mutex;
use std::collections::HashSet;
use std::time::Instant;
/// Synchronizes LDK `Confirm` listeners with chain data fetched from an Electrum server.
///
/// Implements [`Filter`] so LDK can register transactions and outputs of interest;
/// `sync` then polls the server and feeds confirmation/unconfirmation events to the
/// registered listeners.
pub struct ElectrumSyncClient<L: Deref>
where
	L::Target: Logger,
{
	// Watched transactions/outputs plus the tip hash of the last completed sync
	// round; locked for the duration of each `sync` call.
	sync_state: Mutex<SyncState>,
	// `Filter` registrations made since the last round; merged into `sync_state`
	// at the start of each round via `process_queues`.
	queue: Mutex<FilterQueue>,
	// Blocking Electrum RPC client used for all server queries.
	client: ElectrumClient,
	logger: L,
}
impl<L: Deref> ElectrumSyncClient<L>
where
L::Target: Logger,
{
/// Creates a new sync client connecting to the Electrum server at `server_url`.
///
/// Returns an error (after logging it) if the connection cannot be established.
pub fn new(server_url: String, logger: L) -> Result<Self, TxSyncError> {
	match ElectrumClient::new(&server_url) {
		Ok(client) => Self::from_client(client, logger),
		Err(e) => {
			log_error!(logger, "Failed to connect to electrum server '{}': {}", server_url, e);
			Err(e.into())
		}
	}
}
/// Wraps an already-connected Electrum client in a sync client with fresh,
/// empty sync state and registration queues.
pub fn from_client(client: ElectrumClient, logger: L) -> Result<Self, TxSyncError> {
	Ok(Self {
		sync_state: Mutex::new(SyncState::new()),
		queue: Mutex::new(FilterQueue::new()),
		client,
		logger,
	})
}
/// Synchronizes the given `confirmables` with the chain as seen by the Electrum server.
///
/// Repeats sync rounds until one completes without the server tip moving mid-round.
/// Each round: merges queued `Filter` registrations, and — if the tip changed since
/// the last completed round — reports reorged-out transactions as unconfirmed and
/// announces the new best block, then reports newly confirmed transactions and
/// output spends.
///
/// On failure, `pending_sync` is set so the next call retries a full round.
pub fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync + Send)>) -> Result<(), TxSyncError> {
	// Hold the sync-state lock for the entire call so concurrent `sync` calls serialize.
	let mut sync_state = self.sync_state.lock().unwrap();
	log_trace!(self.logger, "Starting transaction sync.");
	#[cfg(feature = "time")]
	let start_time = Instant::now();
	let mut num_confirmed = 0;
	let mut num_unconfirmed = 0;
	// Drain stale queued header notifications, then (re)subscribe to learn the
	// current tip.
	while let Some(_) = self.client.block_headers_pop()? {}
	let tip_notification = self.client.block_headers_subscribe()?;
	let mut tip_header = tip_notification.header;
	let mut tip_height = tip_notification.height as u32;
	loop {
		// Merge any `Filter` registrations queued since the previous round.
		let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
		let tip_is_new = Some(tip_header.block_hash()) != sync_state.last_sync_hash;
		// Done: no retry pending, nothing newly registered, tip unchanged.
		if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
			break;
		} else {
			if tip_is_new {
				// First handle unconfirmations (reorged-out transactions) so
				// listeners see removals before the new best block and any
				// re-confirmations.
				match self.get_unconfirmed_transactions(&confirmables) {
					Ok(unconfirmed_txs) => {
						// `Ok(true)` means the tip moved while we queried, so the
						// data may be inconsistent — restart the round.
						match self.check_update_tip(&mut tip_header, &mut tip_height) {
							Ok(false) => {
								num_unconfirmed += unconfirmed_txs.len();
								sync_state.sync_unconfirmed_transactions(
									&confirmables,
									unconfirmed_txs
								);
							}
							Ok(true) => {
								log_debug!(self.logger,
									"Encountered inconsistency during transaction sync, restarting.");
								sync_state.pending_sync = true;
								continue;
							}
							Err(err) => {
								log_error!(self.logger,
									"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
									num_confirmed,
									num_unconfirmed
								);
								sync_state.pending_sync = true;
								return Err(TxSyncError::from(err));
							}
						}
					},
					Err(err) => {
						log_error!(self.logger,
							"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
							num_confirmed,
							num_unconfirmed
						);
						sync_state.pending_sync = true;
						return Err(TxSyncError::from(err));
					}
				}
				// Announce the new best block only after unconfirmations were applied.
				for c in &confirmables {
					c.best_block_updated(&tip_header, tip_height);
				}
			}
			match self.get_confirmed_transactions(&sync_state) {
				Ok(confirmed_txs) => {
					// Same tip re-check as above before committing the results.
					match self.check_update_tip(&mut tip_header, &mut tip_height) {
						Ok(false) => {
							num_confirmed += confirmed_txs.len();
							sync_state.sync_confirmed_transactions(
								&confirmables,
								confirmed_txs
							);
						}
						Ok(true) => {
							log_debug!(self.logger,
								"Encountered inconsistency during transaction sync, restarting.");
							sync_state.pending_sync = true;
							continue;
						}
						Err(err) => {
							log_error!(self.logger,
								"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
								num_confirmed,
								num_unconfirmed
							);
							sync_state.pending_sync = true;
							return Err(TxSyncError::from(err));
						}
					}
				}
				// An inconsistency seen while gathering confirmations is
				// recoverable: restart the round rather than aborting.
				Err(InternalError::Inconsistency) => {
					log_debug!(self.logger,
						"Encountered inconsistency during transaction sync, restarting.");
					sync_state.pending_sync = true;
					continue;
				}
				Err(err) => {
					log_error!(self.logger,
						"Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
						num_confirmed,
						num_unconfirmed
					);
					sync_state.pending_sync = true;
					return Err(TxSyncError::from(err));
				}
			}
			// Round completed cleanly: record the tip and clear the retry flag.
			sync_state.last_sync_hash = Some(tip_header.block_hash());
			sync_state.pending_sync = false;
		}
	}
	#[cfg(feature = "time")]
	log_debug!(self.logger,
		"Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
		tip_header.block_hash(), start_time.elapsed().as_millis(), num_confirmed,
		num_unconfirmed);
	#[cfg(not(feature = "time"))]
	log_debug!(self.logger,
		"Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
		tip_header.block_hash(), num_confirmed, num_unconfirmed);
	Ok(())
}
/// Re-checks the server tip against `cur_tip_header`.
///
/// Returns `Ok(true)` — after updating `cur_tip_header`/`cur_tip_height` to the
/// latest notification — if the tip moved (either the fresh subscription result
/// differs, or any queued notification disagrees with it), signaling the caller
/// to restart the sync round. Returns `Ok(false)` if the tip is unchanged.
fn check_update_tip(&self, cur_tip_header: &mut Header, cur_tip_height: &mut u32)
	-> Result<bool, InternalError>
{
	let latest_notification = self.client.block_headers_subscribe()?;
	let latest_tip_hash = latest_notification.header.block_hash();

	// The tip is considered moved if the fresh lookup disagrees with what we
	// had, or if any queued notification disagrees with the fresh lookup.
	let mut tip_moved = latest_tip_hash != cur_tip_header.block_hash();
	while let Some(queued_notif) = self.client.block_headers_pop()? {
		tip_moved |= queued_notif.header.block_hash() != latest_tip_hash;
	}

	if !tip_moved {
		return Ok(false);
	}
	*cur_tip_header = latest_notification.header;
	*cur_tip_height = latest_notification.height as u32;
	Ok(true)
}
/// Queries the Electrum server for confirmations of all watched transactions
/// and for confirmed spends of all watched outputs.
///
/// Returns the discovered `ConfirmedTx`s sorted by block height, then by
/// intra-block position, so callers can apply them in order.
///
/// Errors with `InternalError::Inconsistency` when the chain state moved under
/// us mid-query (the caller restarts the round) and `InternalError::Failed` on
/// hard lookup failures.
fn get_confirmed_transactions(
	&self, sync_state: &SyncState,
) -> Result<Vec<ConfirmedTx>, InternalError> {
	let mut confirmed_txs = Vec::new();

	// Electrum indexes by script, so all confirmation lookups go through
	// `script_get_history`. For watched transactions we key on the script of
	// their first output.
	let mut watched_script_pubkeys = Vec::with_capacity(
		sync_state.watched_transactions.len() + sync_state.watched_outputs.len());
	let mut watched_txs = Vec::with_capacity(sync_state.watched_transactions.len());
	for txid in &sync_state.watched_transactions {
		match self.client.transaction_get(&txid) {
			Ok(tx) => {
				watched_txs.push((txid, tx.clone()));
				if let Some(tx_out) = tx.output.first() {
					watched_script_pubkeys.push(tx_out.script_pubkey.clone());
				} else {
					debug_assert!(false, "Failed due to retrieving invalid tx data.");
					log_error!(self.logger, "Failed due to retrieving invalid tx data.");
					return Err(InternalError::Failed);
				}
			}
			Err(electrum_client::Error::Protocol(_)) => {
				// The server doesn't know this transaction (yet); it simply
				// isn't confirmed, so skip it for this round.
			}
			Err(e) => {
				log_error!(self.logger, "Failed to look up transaction {}: {}.", txid, e);
				return Err(InternalError::Failed);
			}
		}
	}
	let num_tx_lookups = watched_script_pubkeys.len();
	debug_assert_eq!(num_tx_lookups, watched_txs.len());

	for output in sync_state.watched_outputs.values() {
		watched_script_pubkeys.push(output.script_pubkey.clone());
	}
	let num_output_spend_lookups = watched_script_pubkeys.len() - num_tx_lookups;
	debug_assert_eq!(num_output_spend_lookups, sync_state.watched_outputs.len());

	match self.client.batch_script_get_history(watched_script_pubkeys.iter().map(|s| s.deref()))
	{
		Ok(results) => {
			// The first `num_tx_lookups` results belong to the watched
			// transactions, the remainder to the watched outputs.
			let (tx_results, output_results) = results.split_at(num_tx_lookups);
			debug_assert_eq!(num_output_spend_lookups, output_results.len());
			for (i, script_history) in tx_results.iter().enumerate() {
				let (txid, tx) = &watched_txs[i];
				let mut filtered_history = script_history.iter().filter(|h| h.tx_hash == **txid);
				if let Some(history) = filtered_history.next()
				{
					// Electrum reports unconfirmed entries with a height of 0
					// (or -1 when in-mempool parents exist). Skip those —
					// mirroring the watched-output path below — instead of
					// feeding a bogus height into the merkle lookup, which
					// would yield a spurious `Inconsistency` restart while the
					// transaction sits in the mempool.
					if history.height > 0 {
						let prob_conf_height = history.height as u32;
						let confirmed_tx = self.get_confirmed_tx(tx, prob_conf_height)?;
						confirmed_txs.push(confirmed_tx);
					}
				}
				debug_assert!(filtered_history.next().is_none());
			}
			for (watched_output, script_history) in sync_state.watched_outputs.values()
				.zip(output_results)
			{
				for possible_output_spend in script_history {
					// Skip unconfirmed entries (height 0 / -1).
					if possible_output_spend.height <= 0 {
						continue;
					}
					let txid = possible_output_spend.tx_hash;
					match self.client.transaction_get(&txid) {
						Ok(tx) => {
							// The script matched, but verify the candidate
							// actually spends our watched outpoint.
							let mut is_spend = false;
							for txin in &tx.input {
								let watched_outpoint = watched_output.outpoint
									.into_bitcoin_outpoint();
								if txin.previous_output == watched_outpoint {
									is_spend = true;
									break;
								}
							}
							if !is_spend {
								continue;
							}
							let prob_conf_height = possible_output_spend.height as u32;
							let confirmed_tx = self.get_confirmed_tx(&tx, prob_conf_height)?;
							confirmed_txs.push(confirmed_tx);
						}
						Err(e) => {
							log_trace!(self.logger,
								"Inconsistency: Tx {} was unconfirmed during syncing: {}",
								txid, e);
							return Err(InternalError::Inconsistency);
						}
					}
				}
			}
		}
		Err(e) => {
			log_error!(self.logger, "Failed to look up script histories: {}.", e);
			return Err(InternalError::Failed);
		}
	}
	// Deliver events in chain order: by height, then by position in the block.
	confirmed_txs.sort_unstable_by(|tx1, tx2| {
		tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
	});
	Ok(confirmed_txs)
}
/// Collects the txids of previously-confirmed relevant transactions whose
/// confirmation block is no longer in the best chain (i.e., were reorged out).
///
/// Panics if any listener reports a confirmation without a block hash, which
/// only happens for channels created with pre-0.0.113 LDK.
fn get_unconfirmed_transactions(
	&self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>,
) -> Result<Vec<Txid>, InternalError> {
	// Deduplicate relevant txids across all listeners.
	let mut relevant_txids = HashSet::<(Txid, u32, Option<BlockHash>)>::new();
	for confirmable in confirmables.iter() {
		relevant_txids.extend(confirmable.get_relevant_txids());
	}

	let mut unconfirmed_txs = Vec::new();
	for (txid, conf_height, block_hash_opt) in relevant_txids {
		let block_hash = match block_hash_opt {
			Some(hash) => hash,
			None => {
				log_error!(self.logger,
					"Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
				panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
			}
		};
		// If the header at the recorded height no longer matches, the
		// confirmation block was reorged out.
		let current_header = self.client.block_header(conf_height as usize)?;
		if current_header.block_hash() != block_hash {
			unconfirmed_txs.push(txid);
		}
	}
	Ok(unconfirmed_txs)
}
/// Builds a `ConfirmedTx` for `tx`, verifying via a merkle proof that it is
/// included in the block at `prob_conf_height`.
///
/// Returns `InternalError::Inconsistency` when the server cannot produce a
/// merkle proof or the proof does not match the block's merkle root (i.e., the
/// chain moved under us), and `InternalError::Failed` when the block header
/// itself cannot be fetched.
fn get_confirmed_tx(&self, tx: &Transaction, prob_conf_height: u32)
	-> Result<ConfirmedTx, InternalError>
{
	let txid = tx.txid();

	let merkle_res = match self.client.transaction_get_merkle(&txid, prob_conf_height as usize) {
		Ok(res) => res,
		Err(e) => {
			log_trace!(self.logger,
				"Inconsistency: Tx {} was unconfirmed during syncing: {}",
				txid, e);
			return Err(InternalError::Inconsistency);
		}
	};
	debug_assert_eq!(prob_conf_height, merkle_res.block_height as u32);

	let block_header = match self.client.block_header(prob_conf_height as usize) {
		Ok(header) => header,
		Err(e) => {
			log_error!(self.logger,
				"Failed to retrieve block header for height {}: {}.",
				prob_conf_height, e);
			return Err(InternalError::Failed);
		}
	};

	// Grab the position before the proof consumes `merkle_res`.
	let pos = merkle_res.pos;
	if !self.validate_merkle_proof(&txid, &block_header.merkle_root, merkle_res)? {
		log_trace!(self.logger,
			"Inconsistency: Block {} was unconfirmed during syncing.",
			block_header.block_hash());
		return Err(InternalError::Inconsistency);
	}

	Ok(ConfirmedTx {
		tx: tx.clone(),
		block_header,
		block_height: prob_conf_height,
		pos,
	})
}
/// Returns a reference to the underlying Electrum client, e.g. for issuing
/// additional queries over the same connection.
pub fn client(&self) -> &ElectrumClient {
	&self.client
}
/// Verifies the Electrum-provided merkle proof, i.e., checks that hashing the
/// txid up the branch given in `merkle_res` reproduces `merkle_root`.
///
/// Returns `Ok(true)` on a valid proof; never actually errors, the `Result`
/// exists for interface uniformity.
fn validate_merkle_proof(&self, txid: &Txid, merkle_root: &TxMerkleNode,
	merkle_res: GetMerkleRes) -> Result<bool, InternalError>
{
	let mut node_index = merkle_res.pos;
	let mut running_hash = txid.to_raw_hash();
	for mut sibling_bytes in merkle_res.merkle {
		// Electrum serves sibling hashes byte-reversed relative to the
		// internal representation.
		sibling_bytes.reverse();
		let sibling = Sha256d::from_slice(&sibling_bytes).unwrap();
		// An even index means our node is the left child at this level.
		let concatenated = if node_index & 1 == 0 {
			[&running_hash[..], &sibling[..]].concat()
		} else {
			[&sibling[..], &running_hash[..]].concat()
		};
		running_hash = Sha256d::hash(&concatenated);
		node_index >>= 1;
	}
	Ok(running_hash == merkle_root.to_raw_hash())
}
}
impl<L: Deref> Filter for ElectrumSyncClient<L>
where
	L::Target: Logger,
{
	// Queue a transaction to watch; picked up at the start of the next sync round.
	fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
		self.queue.lock().unwrap().transactions.insert(*txid);
	}

	// Queue an output to watch for spends; picked up at the start of the next
	// sync round.
	fn register_output(&self, output: WatchedOutput) {
		let outpoint = output.outpoint.into_bitcoin_outpoint();
		self.queue.lock().unwrap().outputs.insert(outpoint, output);
	}
}