mod bitcoind;
mod electrum;
mod esplora;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use bitcoin::{Script, Txid};
use lightning::chain::{BestBlock, Filter};
use lightning_block_sync::gossip::UtxoSource;
use crate::chain::bitcoind::BitcoindChainSource;
use crate::chain::electrum::ElectrumChainSource;
use crate::chain::esplora::EsploraChainSource;
use crate::config::{
BackgroundSyncConfig, BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig,
RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL, WALLET_SYNC_INTERVAL_MINIMUM_SECS,
};
use crate::fee_estimator::OnchainFeeEstimator;
use crate::io::utils::write_node_metrics;
use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger};
use crate::runtime::Runtime;
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::{Error, NodeMetrics};
pub(crate) enum WalletSyncStatus {
Completed,
InProgress { subscribers: tokio::sync::broadcast::Sender<Result<(), Error>> },
}
impl WalletSyncStatus {
fn register_or_subscribe_pending_sync(
&mut self,
) -> Option<tokio::sync::broadcast::Receiver<Result<(), Error>>> {
match self {
WalletSyncStatus::Completed => {
let (tx, _) = tokio::sync::broadcast::channel(1);
*self = WalletSyncStatus::InProgress { subscribers: tx };
None
},
WalletSyncStatus::InProgress { subscribers } => {
let rx = subscribers.subscribe();
Some(rx)
},
}
}
fn propagate_result_to_subscribers(&mut self, res: Result<(), Error>) {
{
match self {
WalletSyncStatus::Completed => {
return;
},
WalletSyncStatus::InProgress { subscribers } => {
if subscribers.receiver_count() > 0 {
match subscribers.send(res) {
Ok(_) => (),
Err(e) => {
debug_assert!(
false,
"Failed to send wallet sync result to subscribers: {:?}",
e
);
},
}
}
*self = WalletSyncStatus::Completed;
},
}
}
}
}
pub(crate) struct ChainSource {
kind: ChainSourceKind,
tx_broadcaster: Arc<Broadcaster>,
logger: Arc<Logger>,
}
enum ChainSourceKind {
Esplora(EsploraChainSource),
Electrum(ElectrumChainSource),
Bitcoind(BitcoindChainSource),
}
impl ChainSource {
pub(crate) fn new_esplora(
server_url: String, headers: HashMap<String, String>, sync_config: EsploraSyncConfig,
fee_estimator: Arc<OnchainFeeEstimator>, tx_broadcaster: Arc<Broadcaster>,
kv_store: Arc<DynStore>, config: Arc<Config>, logger: Arc<Logger>,
node_metrics: Arc<RwLock<NodeMetrics>>,
) -> (Self, Option<BestBlock>) {
let esplora_chain_source = EsploraChainSource::new(
server_url,
headers,
sync_config,
fee_estimator,
kv_store,
config,
Arc::clone(&logger),
node_metrics,
);
let kind = ChainSourceKind::Esplora(esplora_chain_source);
(Self { kind, tx_broadcaster, logger }, None)
}
pub(crate) fn new_electrum(
server_url: String, sync_config: ElectrumSyncConfig,
fee_estimator: Arc<OnchainFeeEstimator>, tx_broadcaster: Arc<Broadcaster>,
kv_store: Arc<DynStore>, config: Arc<Config>, logger: Arc<Logger>,
node_metrics: Arc<RwLock<NodeMetrics>>,
) -> (Self, Option<BestBlock>) {
let electrum_chain_source = ElectrumChainSource::new(
server_url,
sync_config,
fee_estimator,
kv_store,
config,
Arc::clone(&logger),
node_metrics,
);
let kind = ChainSourceKind::Electrum(electrum_chain_source);
(Self { kind, tx_broadcaster, logger }, None)
}
pub(crate) async fn new_bitcoind_rpc(
rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String,
fee_estimator: Arc<OnchainFeeEstimator>, tx_broadcaster: Arc<Broadcaster>,
kv_store: Arc<DynStore>, config: Arc<Config>, logger: Arc<Logger>,
node_metrics: Arc<RwLock<NodeMetrics>>,
) -> (Self, Option<BestBlock>) {
let bitcoind_chain_source = BitcoindChainSource::new_rpc(
rpc_host,
rpc_port,
rpc_user,
rpc_password,
fee_estimator,
kv_store,
config,
Arc::clone(&logger),
node_metrics,
);
let best_block = bitcoind_chain_source.poll_best_block().await.ok();
let kind = ChainSourceKind::Bitcoind(bitcoind_chain_source);
(Self { kind, tx_broadcaster, logger }, best_block)
}
pub(crate) async fn new_bitcoind_rest(
rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String,
fee_estimator: Arc<OnchainFeeEstimator>, tx_broadcaster: Arc<Broadcaster>,
kv_store: Arc<DynStore>, config: Arc<Config>, rest_client_config: BitcoindRestClientConfig,
logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
) -> (Self, Option<BestBlock>) {
let bitcoind_chain_source = BitcoindChainSource::new_rest(
rpc_host,
rpc_port,
rpc_user,
rpc_password,
fee_estimator,
kv_store,
config,
rest_client_config,
Arc::clone(&logger),
node_metrics,
);
let best_block = bitcoind_chain_source.poll_best_block().await.ok();
let kind = ChainSourceKind::Bitcoind(bitcoind_chain_source);
(Self { kind, tx_broadcaster, logger }, best_block)
}
pub(crate) fn start(&self, runtime: Arc<Runtime>) -> Result<(), Error> {
match &self.kind {
ChainSourceKind::Electrum(electrum_chain_source) => {
electrum_chain_source.start(runtime)?
},
_ => {
},
}
Ok(())
}
pub(crate) fn stop(&self) {
match &self.kind {
ChainSourceKind::Electrum(electrum_chain_source) => electrum_chain_source.stop(),
_ => {
},
}
}
pub(crate) fn as_utxo_source(&self) -> Option<Arc<dyn UtxoSource>> {
match &self.kind {
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
Some(bitcoind_chain_source.as_utxo_source())
},
_ => None,
}
}
pub(crate) fn is_transaction_based(&self) -> bool {
match &self.kind {
ChainSourceKind::Esplora(_) => true,
ChainSourceKind::Electrum { .. } => true,
ChainSourceKind::Bitcoind { .. } => false,
}
}
pub(crate) async fn continuously_sync_wallets(
&self, stop_sync_receiver: tokio::sync::watch::Receiver<()>, onchain_wallet: Arc<Wallet>,
channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
output_sweeper: Arc<Sweeper>,
) {
match &self.kind {
ChainSourceKind::Esplora(esplora_chain_source) => {
if let Some(background_sync_config) =
esplora_chain_source.sync_config.background_sync_config.as_ref()
{
self.start_tx_based_sync_loop(
stop_sync_receiver,
onchain_wallet,
channel_manager,
chain_monitor,
output_sweeper,
background_sync_config,
Arc::clone(&self.logger),
)
.await
} else {
log_info!(
self.logger,
"Background syncing is disabled. Manual syncing required for onchain wallet, lightning wallet, and fee rate updates.",
);
return;
}
},
ChainSourceKind::Electrum(electrum_chain_source) => {
if let Some(background_sync_config) =
electrum_chain_source.sync_config.background_sync_config.as_ref()
{
self.start_tx_based_sync_loop(
stop_sync_receiver,
onchain_wallet,
channel_manager,
chain_monitor,
output_sweeper,
background_sync_config,
Arc::clone(&self.logger),
)
.await
} else {
log_info!(
self.logger,
"Background syncing is disabled. Manual syncing required for onchain wallet, lightning wallet, and fee rate updates.",
);
return;
}
},
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
bitcoind_chain_source
.continuously_sync_wallets(
stop_sync_receiver,
onchain_wallet,
channel_manager,
chain_monitor,
output_sweeper,
)
.await
},
}
}
async fn start_tx_based_sync_loop(
&self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>,
onchain_wallet: Arc<Wallet>, channel_manager: Arc<ChannelManager>,
chain_monitor: Arc<ChainMonitor>, output_sweeper: Arc<Sweeper>,
background_sync_config: &BackgroundSyncConfig, logger: Arc<Logger>,
) {
let onchain_wallet_sync_interval_secs = background_sync_config
.onchain_wallet_sync_interval_secs
.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
let mut onchain_wallet_sync_interval =
tokio::time::interval(Duration::from_secs(onchain_wallet_sync_interval_secs));
onchain_wallet_sync_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let fee_rate_cache_update_interval_secs = background_sync_config
.fee_rate_cache_update_interval_secs
.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
let mut fee_rate_update_interval =
tokio::time::interval(Duration::from_secs(fee_rate_cache_update_interval_secs));
fee_rate_update_interval.reset();
fee_rate_update_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let lightning_wallet_sync_interval_secs = background_sync_config
.lightning_wallet_sync_interval_secs
.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
let mut lightning_wallet_sync_interval =
tokio::time::interval(Duration::from_secs(lightning_wallet_sync_interval_secs));
lightning_wallet_sync_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = stop_sync_receiver.changed() => {
log_trace!(
logger,
"Stopping background syncing on-chain wallet.",
);
return;
}
_ = onchain_wallet_sync_interval.tick() => {
let _ = self.sync_onchain_wallet(Arc::clone(&onchain_wallet)).await;
}
_ = fee_rate_update_interval.tick() => {
let _ = self.update_fee_rate_estimates().await;
}
_ = lightning_wallet_sync_interval.tick() => {
let _ = self.sync_lightning_wallet(
Arc::clone(&channel_manager),
Arc::clone(&chain_monitor),
Arc::clone(&output_sweeper),
).await;
}
}
}
}
pub(crate) async fn sync_onchain_wallet(
&self, onchain_wallet: Arc<Wallet>,
) -> Result<(), Error> {
match &self.kind {
ChainSourceKind::Esplora(esplora_chain_source) => {
esplora_chain_source.sync_onchain_wallet(onchain_wallet).await
},
ChainSourceKind::Electrum(electrum_chain_source) => {
electrum_chain_source.sync_onchain_wallet(onchain_wallet).await
},
ChainSourceKind::Bitcoind { .. } => {
unreachable!("Onchain wallet will be synced via chain polling")
},
}
}
pub(crate) async fn sync_lightning_wallet(
&self, channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
output_sweeper: Arc<Sweeper>,
) -> Result<(), Error> {
match &self.kind {
ChainSourceKind::Esplora(esplora_chain_source) => {
esplora_chain_source
.sync_lightning_wallet(channel_manager, chain_monitor, output_sweeper)
.await
},
ChainSourceKind::Electrum(electrum_chain_source) => {
electrum_chain_source
.sync_lightning_wallet(channel_manager, chain_monitor, output_sweeper)
.await
},
ChainSourceKind::Bitcoind { .. } => {
unreachable!("Lightning wallet will be synced via chain polling")
},
}
}
pub(crate) async fn poll_and_update_listeners(
&self, onchain_wallet: Arc<Wallet>, channel_manager: Arc<ChannelManager>,
chain_monitor: Arc<ChainMonitor>, output_sweeper: Arc<Sweeper>,
) -> Result<(), Error> {
match &self.kind {
ChainSourceKind::Esplora { .. } => {
unreachable!("Listeners will be synced via transction-based syncing")
},
ChainSourceKind::Electrum { .. } => {
unreachable!("Listeners will be synced via transction-based syncing")
},
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
bitcoind_chain_source
.poll_and_update_listeners(
onchain_wallet,
channel_manager,
chain_monitor,
output_sweeper,
)
.await
},
}
}
pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> {
match &self.kind {
ChainSourceKind::Esplora(esplora_chain_source) => {
esplora_chain_source.update_fee_rate_estimates().await
},
ChainSourceKind::Electrum(electrum_chain_source) => {
electrum_chain_source.update_fee_rate_estimates().await
},
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
bitcoind_chain_source.update_fee_rate_estimates().await
},
}
}
pub(crate) async fn continuously_process_broadcast_queue(
&self, mut stop_tx_bcast_receiver: tokio::sync::watch::Receiver<()>,
) {
let mut receiver = self.tx_broadcaster.get_broadcast_queue().await;
loop {
let tx_bcast_logger = Arc::clone(&self.logger);
tokio::select! {
_ = stop_tx_bcast_receiver.changed() => {
log_debug!(
tx_bcast_logger,
"Stopping broadcasting transactions.",
);
return;
}
Some(next_package) = receiver.recv() => {
match &self.kind {
ChainSourceKind::Esplora(esplora_chain_source) => {
esplora_chain_source.process_broadcast_package(next_package).await
},
ChainSourceKind::Electrum(electrum_chain_source) => {
electrum_chain_source.process_broadcast_package(next_package).await
},
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
bitcoind_chain_source.process_broadcast_package(next_package).await
},
}
}
}
}
}
}
impl Filter for ChainSource {
fn register_tx(&self, txid: &Txid, script_pubkey: &Script) {
match &self.kind {
ChainSourceKind::Esplora(esplora_chain_source) => {
esplora_chain_source.register_tx(txid, script_pubkey)
},
ChainSourceKind::Electrum(electrum_chain_source) => {
electrum_chain_source.register_tx(txid, script_pubkey)
},
ChainSourceKind::Bitcoind { .. } => (),
}
}
fn register_output(&self, output: lightning::chain::WatchedOutput) {
match &self.kind {
ChainSourceKind::Esplora(esplora_chain_source) => {
esplora_chain_source.register_output(output)
},
ChainSourceKind::Electrum(electrum_chain_source) => {
electrum_chain_source.register_output(output)
},
ChainSourceKind::Bitcoind { .. } => (),
}
}
}
fn periodically_archive_fully_resolved_monitors(
channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
kv_store: Arc<DynStore>, logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
) -> Result<(), Error> {
let mut locked_node_metrics = node_metrics.write().unwrap();
let cur_height = channel_manager.current_best_block().height;
let should_archive = locked_node_metrics
.latest_channel_monitor_archival_height
.as_ref()
.map_or(true, |h| cur_height >= h + RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL);
if should_archive {
chain_monitor.archive_fully_resolved_channel_monitors();
locked_node_metrics.latest_channel_monitor_archival_height = Some(cur_height);
write_node_metrics(&*locked_node_metrics, kv_store, logger)?;
}
Ok(())
}