#![crate_name = "ldk_node"]
#![cfg_attr(not(feature = "uniffi"), deny(missing_docs))]
#![deny(rustdoc::broken_intra_doc_links)]
#![deny(rustdoc::private_intra_doc_links)]
#![allow(bare_trait_objects)]
#![allow(ellipsis_inclusive_range_patterns)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
mod balance;
mod builder;
mod config;
mod connection;
mod error;
mod event;
mod fee_estimator;
mod gossip;
pub mod graph;
mod hex_utils;
pub mod io;
mod liquidity;
mod logger;
mod message_handler;
pub mod payment;
mod peer_store;
mod sweep;
mod tx_broadcaster;
mod types;
#[cfg(feature = "uniffi")]
mod uniffi_types;
mod wallet;
pub use bip39;
pub use bitcoin;
pub use lightning;
pub use lightning_invoice;
pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance};
pub use config::{default_config, AnchorChannelsConfig, Config};
pub use error::Error as NodeError;
use error::Error;
pub use event::Event;
pub use types::ChannelConfig;
pub use io::utils::generate_entropy_mnemonic;
#[cfg(feature = "uniffi")]
use uniffi_types::*;
#[cfg(feature = "uniffi")]
pub use builder::ArcedNodeBuilder as Builder;
pub use builder::BuildError;
#[cfg(not(feature = "uniffi"))]
pub use builder::NodeBuilder as Builder;
use config::{
default_user_config, LDK_WALLET_SYNC_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL,
PEER_RECONNECTION_INTERVAL, RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL, RGS_SYNC_INTERVAL,
WALLET_SYNC_INTERVAL_MINIMUM_SECS,
};
use connection::ConnectionManager;
use event::{EventHandler, EventQueue};
use gossip::GossipSource;
use graph::NetworkGraph;
use liquidity::LiquiditySource;
use payment::store::PaymentStore;
use payment::{Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment};
use peer_store::{PeerInfo, PeerStore};
use types::{
Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, FeeEstimator,
Graph, KeysManager, PeerManager, Router, Scorer, Sweeper, Wallet,
};
pub use types::{ChannelDetails, PeerDetails, UserChannelId};
use logger::{log_error, log_info, log_trace, FilesystemLogger, Logger};
use lightning::chain::{BestBlock, Confirm};
use lightning::events::bump_transaction::Wallet as LdkWallet;
use lightning::ln::channelmanager::{ChannelShutdownState, PaymentId};
use lightning::ln::msgs::SocketAddress;
pub use lightning::util::logger::Level as LogLevel;
use lightning_background_processor::process_events_async;
use lightning_transaction_sync::EsploraSyncClient;
use bitcoin::secp256k1::PublicKey;
use rand::Rng;
use std::default::Default;
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[cfg(feature = "uniffi")]
uniffi::include_scaffolding!("ldk_node");
/// The main interface object of the library, tying together the LDK and BDK-based
/// on-chain/Lightning functionality and the background tasks driving it.
///
/// Instances are assembled via the builder types re-exported as `Builder`; all fields are
/// private runtime state.
pub struct Node {
	// Tokio runtime driving all background tasks; `Some` while the node is running
	// (see `start`/`stop`).
	runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
	// Watch channel used to signal all background tasks to shut down.
	stop_sender: tokio::sync::watch::Sender<()>,
	// Signals that the background event-processing loop has fully stopped; `stop` waits on it.
	event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
	config: Arc<Config>,
	// The BDK-backed on-chain wallet.
	wallet: Arc<Wallet>,
	// Esplora-backed transaction sync client used to sync the Lightning state.
	tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
	tx_broadcaster: Arc<Broadcaster>,
	fee_estimator: Arc<FeeEstimator>,
	// Queue of user-facing events, consumed via `next_event`/`wait_next_event`.
	event_queue: Arc<EventQueue<Arc<FilesystemLogger>>>,
	channel_manager: Arc<ChannelManager>,
	chain_monitor: Arc<ChainMonitor>,
	// Sweeps spendable outputs resulting from channel closes.
	output_sweeper: Arc<Sweeper>,
	peer_manager: Arc<PeerManager>,
	connection_manager: Arc<ConnectionManager<Arc<FilesystemLogger>>>,
	keys_manager: Arc<KeysManager>,
	network_graph: Arc<Graph>,
	// Gossip data source (P2P or RGS; see `GossipSource::is_rgs`).
	gossip_source: Arc<GossipSource>,
	// Optional LSP liquidity integration; events are handled in a background task.
	liquidity_source: Option<Arc<LiquiditySource<Arc<FilesystemLogger>>>>,
	// Persistence backend for all stores.
	kv_store: Arc<DynStore>,
	logger: Arc<FilesystemLogger>,
	// Held to keep the router alive; not accessed directly here (hence the underscore).
	_router: Arc<Router>,
	scorer: Arc<Mutex<Scorer>>,
	// Persisted peers we attempt to reconnect to in the background.
	peer_store: Arc<PeerStore<Arc<FilesystemLogger>>>,
	payment_store: Arc<PaymentStore<Arc<FilesystemLogger>>>,
	// Whether the TCP listener is currently accepting inbound connections.
	is_listening: Arc<AtomicBool>,
	// Unix-epoch timestamps (secs) of the last successful runs of the respective tasks.
	latest_wallet_sync_timestamp: Arc<RwLock<Option<u64>>>,
	latest_onchain_wallet_sync_timestamp: Arc<RwLock<Option<u64>>>,
	latest_fee_rate_cache_update_timestamp: Arc<RwLock<Option<u64>>>,
	latest_rgs_snapshot_timestamp: Arc<RwLock<Option<u64>>>,
	latest_node_announcement_broadcast_timestamp: Arc<RwLock<Option<u64>>>,
	// Block height at which we last archived fully-resolved channel monitors.
	latest_channel_monitor_archival_height: Arc<RwLock<Option<u32>>>,
}
impl Node {
/// Starts the necessary background tasks, such as chain/wallet syncing, fee rate cache
/// updates, gossip syncing, listening for and reconnecting to peers, node announcement
/// broadcasting, and event handling.
///
/// Returns an error if the node is already running.
pub fn start(&self) -> Result<(), Error> {
	// Hold the runtime write lock for the whole startup so concurrent `start`/`stop` calls
	// are serialized; a populated slot means we're already running.
	let mut runtime_lock = self.runtime.write().unwrap();
	if runtime_lock.is_some() {
		return Err(Error::AlreadyRunning);
	}
	log_info!(
		self.logger,
		"Starting up LDK Node with node ID {} on network: {}",
		self.node_id(),
		self.config.network
	);
	// Runtime all background tasks below are spawned on. It is only stored in `self.runtime`
	// (and hence visible to `stop`) once startup succeeded.
	let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
	// Block on an initial fee rate cache update so we have usable estimates before
	// proceeding; startup fails if this errors out.
	let fee_estimator = Arc::clone(&self.fee_estimator);
	let sync_logger = Arc::clone(&self.logger);
	let sync_fee_rate_update_timestamp =
		Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
	let runtime_ref = &runtime;
	tokio::task::block_in_place(move || {
		runtime_ref.block_on(async move {
			let now = Instant::now();
			match fee_estimator.update_fee_estimates().await {
				Ok(()) => {
					log_info!(
						sync_logger,
						"Initial fee rate cache update finished in {}ms.",
						now.elapsed().as_millis()
					);
					let unix_time_secs_opt =
						SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
					*sync_fee_rate_update_timestamp.write().unwrap() = unix_time_secs_opt;
					Ok(())
				},
				Err(e) => {
					log_error!(sync_logger, "Initial fee rate cache update failed: {}", e,);
					Err(e)
				},
			}
		})
	})?;
	// Background on-chain wallet sync, running on its own dedicated thread with a
	// single-threaded runtime.
	let wallet = Arc::clone(&self.wallet);
	let sync_logger = Arc::clone(&self.logger);
	let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp);
	let mut stop_sync = self.stop_sender.subscribe();
	// Enforce the lower bound on the configured sync interval.
	let onchain_wallet_sync_interval_secs = self
		.config
		.onchain_wallet_sync_interval_secs
		.max(config::WALLET_SYNC_INTERVAL_MINIMUM_SECS);
	std::thread::spawn(move || {
		tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
			async move {
				let mut onchain_wallet_sync_interval = tokio::time::interval(
					Duration::from_secs(onchain_wallet_sync_interval_secs),
				);
				onchain_wallet_sync_interval
					.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
				loop {
					tokio::select! {
						_ = stop_sync.changed() => {
							log_trace!(
								sync_logger,
								"Stopping background syncing on-chain wallet.",
							);
							return;
						}
						_ = onchain_wallet_sync_interval.tick() => {
							let now = Instant::now();
							match wallet.sync().await {
								Ok(()) => {
									log_trace!(
										sync_logger,
										"Background sync of on-chain wallet finished in {}ms.",
										now.elapsed().as_millis()
									);
									let unix_time_secs_opt =
										SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
									*sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
								}
								Err(err) => {
									log_error!(
										sync_logger,
										"Background sync of on-chain wallet failed: {}",
										err
									)
								}
							}
						}
					}
				}
			},
		);
	});
	// Periodic fee rate cache updater.
	let mut stop_fee_updates = self.stop_sender.subscribe();
	let fee_update_logger = Arc::clone(&self.logger);
	let fee_update_timestamp = Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
	let fee_estimator = Arc::clone(&self.fee_estimator);
	let fee_rate_cache_update_interval_secs =
		self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
	runtime.spawn(async move {
		let mut fee_rate_update_interval =
			tokio::time::interval(Duration::from_secs(fee_rate_cache_update_interval_secs));
		// We just updated the cache above, so skip the interval's immediate first tick.
		fee_rate_update_interval.reset();
		fee_rate_update_interval
			.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
		loop {
			tokio::select! {
				_ = stop_fee_updates.changed() => {
					log_trace!(
						fee_update_logger,
						"Stopping background updates of fee rate cache.",
					);
					return;
				}
				_ = fee_rate_update_interval.tick() => {
					let now = Instant::now();
					match fee_estimator.update_fee_estimates().await {
						Ok(()) => {
							log_trace!(
								fee_update_logger,
								"Background update of fee rate cache finished in {}ms.",
								now.elapsed().as_millis()
							);
							let unix_time_secs_opt =
								SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
							*fee_update_timestamp.write().unwrap() = unix_time_secs_opt;
						}
						Err(err) => {
							log_error!(
								fee_update_logger,
								"Background update of fee rate cache failed: {}",
								err
							)
						}
					}
				}
			}
		}
	});
	// Background sync of the Lightning state (channel manager, chain monitor, sweeper) via
	// the transaction sync client, bounded by a per-attempt timeout.
	let tx_sync = Arc::clone(&self.tx_sync);
	let sync_cman = Arc::clone(&self.channel_manager);
	let archive_cman = Arc::clone(&self.channel_manager);
	let sync_cmon = Arc::clone(&self.chain_monitor);
	let archive_cmon = Arc::clone(&self.chain_monitor);
	let sync_sweeper = Arc::clone(&self.output_sweeper);
	let sync_logger = Arc::clone(&self.logger);
	let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp);
	let sync_monitor_archival_height = Arc::clone(&self.latest_channel_monitor_archival_height);
	let mut stop_sync = self.stop_sender.subscribe();
	let wallet_sync_interval_secs =
		self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
	runtime.spawn(async move {
		let mut wallet_sync_interval =
			tokio::time::interval(Duration::from_secs(wallet_sync_interval_secs));
		wallet_sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
		loop {
			tokio::select! {
				_ = stop_sync.changed() => {
					log_trace!(
						sync_logger,
						"Stopping background syncing Lightning wallet.",
					);
					return;
				}
				_ = wallet_sync_interval.tick() => {
					let confirmables = vec![
						&*sync_cman as &(dyn Confirm + Sync + Send),
						&*sync_cmon as &(dyn Confirm + Sync + Send),
						&*sync_sweeper as &(dyn Confirm + Sync + Send),
					];
					let now = Instant::now();
					let timeout_fut = tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), tx_sync.sync(confirmables));
					match timeout_fut.await {
						Ok(res) => match res {
							Ok(()) => {
								log_trace!(
									sync_logger,
									"Background sync of Lightning wallet finished in {}ms.",
									now.elapsed().as_millis()
								);
								let unix_time_secs_opt =
									SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
								*sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
								// Occasionally clean up monitors of fully-resolved channels.
								periodically_archive_fully_resolved_monitors(
									Arc::clone(&archive_cman),
									Arc::clone(&archive_cmon),
									Arc::clone(&sync_monitor_archival_height)
								);
							}
							Err(e) => {
								log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e)
							}
						}
						Err(e) => {
							log_error!(sync_logger, "Background sync of Lightning wallet timed out: {}", e)
						}
					}
				}
			}
		}
	});
	// If gossip comes from a Rapid Gossip Sync server, periodically pull new snapshots and
	// persist the latest sync timestamp.
	if self.gossip_source.is_rgs() {
		let gossip_source = Arc::clone(&self.gossip_source);
		let gossip_sync_store = Arc::clone(&self.kv_store);
		let gossip_sync_logger = Arc::clone(&self.logger);
		let gossip_rgs_sync_timestamp = Arc::clone(&self.latest_rgs_snapshot_timestamp);
		let mut stop_gossip_sync = self.stop_sender.subscribe();
		runtime.spawn(async move {
			let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
			loop {
				tokio::select! {
					_ = stop_gossip_sync.changed() => {
						log_trace!(
							gossip_sync_logger,
							"Stopping background syncing RGS gossip data.",
						);
						return;
					}
					_ = interval.tick() => {
						let gossip_sync_logger = Arc::clone(&gossip_sync_logger);
						let now = Instant::now();
						match gossip_source.update_rgs_snapshot().await {
							Ok(updated_timestamp) => {
								log_trace!(
									gossip_sync_logger,
									"Background sync of RGS gossip data finished in {}ms.",
									now.elapsed().as_millis()
								);
								io::utils::write_latest_rgs_sync_timestamp(
									updated_timestamp,
									Arc::clone(&gossip_sync_store),
									Arc::clone(&gossip_sync_logger),
								)
								.unwrap_or_else(|e| {
									log_error!(gossip_sync_logger, "Persistence failed: {}", e);
									panic!("Persistence failed");
								});
								*gossip_rgs_sync_timestamp.write().unwrap() = Some(updated_timestamp as u64);
							}
							Err(e) => log_error!(
								gossip_sync_logger,
								"Background sync of RGS gossip data failed: {}",
								e
							),
						}
					}
				}
			}
		});
	}
	// If configured, resolve the listening addresses and accept inbound peer connections.
	if let Some(listening_addresses) = &self.config.listening_addresses {
		let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
		let mut stop_listen = self.stop_sender.subscribe();
		let listening_logger = Arc::clone(&self.logger);
		let listening_indicator = Arc::clone(&self.is_listening);
		let mut bind_addrs = Vec::with_capacity(listening_addresses.len());
		for listening_addr in listening_addresses {
			let resolved_address = listening_addr.to_socket_addrs().map_err(|e| {
				log_error!(
					self.logger,
					"Unable to resolve listening address: {:?}. Error details: {}",
					listening_addr,
					e,
				);
				Error::InvalidSocketAddress
			})?;
			bind_addrs.extend(resolved_address);
		}
		runtime.spawn(async move {
			// Inner scope ensures the listener is dropped before we clear the indicator.
			{
				let listener =
					tokio::net::TcpListener::bind(&*bind_addrs).await
						.unwrap_or_else(|e| {
							log_error!(listening_logger, "Failed to bind to listen addresses/ports - is something else already listening on it?: {}", e);
							panic!(
								"Failed to bind to listen address/port - is something else already listening on it?",
							);
						});
				listening_indicator.store(true, Ordering::Release);
				loop {
					let peer_mgr = Arc::clone(&peer_manager_connection_handler);
					tokio::select! {
						_ = stop_listen.changed() => {
							log_trace!(
								listening_logger,
								"Stopping listening to inbound connections.",
							);
							break;
						}
						res = listener.accept() => {
							// NOTE(review): an `accept()` error would panic this task via
							// `unwrap` — confirm this is acceptable.
							let tcp_stream = res.unwrap().0;
							tokio::spawn(async move {
								lightning_net_tokio::setup_inbound(
									Arc::clone(&peer_mgr),
									tcp_stream.into_std().unwrap(),
								)
								.await;
							});
						}
					}
				}
			}
			listening_indicator.store(false, Ordering::Release);
		});
	}
	// Periodically reconnect to persisted peers we're not currently connected to.
	let connect_cm = Arc::clone(&self.connection_manager);
	let connect_pm = Arc::clone(&self.peer_manager);
	let connect_logger = Arc::clone(&self.logger);
	let connect_peer_store = Arc::clone(&self.peer_store);
	let mut stop_connect = self.stop_sender.subscribe();
	runtime.spawn(async move {
		let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
		interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
		loop {
			tokio::select! {
				_ = stop_connect.changed() => {
					log_trace!(
						connect_logger,
						"Stopping reconnecting known peers.",
					);
					return;
				}
				_ = interval.tick() => {
					let pm_peers = connect_pm
						.list_peers()
						.iter()
						.map(|peer| peer.counterparty_node_id)
						.collect::<Vec<_>>();
					for peer_info in connect_peer_store.list_peers().iter().filter(|info| !pm_peers.contains(&info.node_id)) {
						let res = connect_cm.do_connect_peer(
							peer_info.node_id,
							peer_info.address.clone(),
						).await;
						match res {
							Ok(_) => {
								log_info!(connect_logger, "Successfully reconnected to peer {}", peer_info.node_id);
							},
							Err(e) => {
								log_error!(connect_logger, "Failed to reconnect to peer {}: {}", peer_info.node_id, e);
							}
						}
					}
				}
			}
		}
	});
	// Periodically broadcast node announcements, but only when due (per the persisted last
	// broadcast time), we have a ready public channel, connected peers, and addresses.
	let bcast_cm = Arc::clone(&self.channel_manager);
	let bcast_pm = Arc::clone(&self.peer_manager);
	let bcast_config = Arc::clone(&self.config);
	let bcast_store = Arc::clone(&self.kv_store);
	let bcast_logger = Arc::clone(&self.logger);
	let bcast_ann_timestamp = Arc::clone(&self.latest_node_announcement_broadcast_timestamp);
	let mut stop_bcast = self.stop_sender.subscribe();
	runtime.spawn(async move {
		// Use a shorter check interval under test to keep tests fast.
		#[cfg(not(test))]
		let mut interval = tokio::time::interval(Duration::from_secs(30));
		#[cfg(test)]
		let mut interval = tokio::time::interval(Duration::from_secs(5));
		loop {
			tokio::select! {
				_ = stop_bcast.changed() => {
					log_trace!(
						bcast_logger,
						"Stopping broadcasting node announcements.",
					);
					return;
				}
				_ = interval.tick() => {
					// Skip if the last broadcast was less than NODE_ANN_BCAST_INTERVAL ago.
					let skip_broadcast = match io::utils::read_latest_node_ann_bcast_timestamp(Arc::clone(&bcast_store), Arc::clone(&bcast_logger)) {
						Ok(latest_bcast_time_secs) => {
							let next_bcast_unix_time = SystemTime::UNIX_EPOCH + Duration::from_secs(latest_bcast_time_secs) + NODE_ANN_BCAST_INTERVAL;
							next_bcast_unix_time.elapsed().is_err()
						}
						Err(_) => {
							// No persisted timestamp: broadcast now.
							false
						}
					};
					if skip_broadcast {
						continue;
					}
					if !bcast_cm.list_channels().iter().any(|chan| chan.is_public && chan.is_channel_ready) {
						continue;
					}
					if bcast_pm.list_peers().is_empty() {
						continue;
					}
					let addresses = bcast_config.listening_addresses.clone().unwrap_or(Vec::new());
					if addresses.is_empty() {
						continue;
					}
					bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);
					let unix_time_secs_opt =
						SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
					*bcast_ann_timestamp.write().unwrap() = unix_time_secs_opt;
					if let Some(unix_time_secs) = unix_time_secs_opt {
						io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger))
							.unwrap_or_else(|e| {
								log_error!(bcast_logger, "Persistence failed: {}", e);
								panic!("Persistence failed");
							});
					}
				}
			}
		}
	});
	// Drain the transaction broadcast queue once a second.
	let mut stop_tx_bcast = self.stop_sender.subscribe();
	let tx_bcaster = Arc::clone(&self.tx_broadcaster);
	let tx_bcast_logger = Arc::clone(&self.logger);
	runtime.spawn(async move {
		let mut interval = tokio::time::interval(Duration::from_secs(1));
		interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
		loop {
			tokio::select! {
				_ = stop_tx_bcast.changed() => {
					log_trace!(
						tx_bcast_logger,
						"Stopping broadcasting transactions.",
					);
					return;
				}
				_ = interval.tick() => {
					tx_bcaster.process_queue().await;
				}
			}
		}
	});
	// Wire up the event handler and run LDK's background processor, which drives event
	// handling, persistence, and gossip/scorer updates until the stop signal fires.
	let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new(
		Arc::clone(&self.tx_broadcaster),
		Arc::new(LdkWallet::new(Arc::clone(&self.wallet), Arc::clone(&self.logger))),
		Arc::clone(&self.keys_manager),
		Arc::clone(&self.logger),
	));
	let event_handler = Arc::new(EventHandler::new(
		Arc::clone(&self.event_queue),
		Arc::clone(&self.wallet),
		bump_tx_event_handler,
		Arc::clone(&self.channel_manager),
		Arc::clone(&self.connection_manager),
		Arc::clone(&self.output_sweeper),
		Arc::clone(&self.network_graph),
		Arc::clone(&self.payment_store),
		Arc::clone(&self.peer_store),
		Arc::clone(&self.runtime),
		Arc::clone(&self.logger),
		Arc::clone(&self.config),
	));
	let background_persister = Arc::clone(&self.kv_store);
	let background_event_handler = Arc::clone(&event_handler);
	let background_chain_mon = Arc::clone(&self.chain_monitor);
	let background_chan_man = Arc::clone(&self.channel_manager);
	let background_gossip_sync = self.gossip_source.as_gossip_sync();
	let background_peer_man = Arc::clone(&self.peer_manager);
	let background_logger = Arc::clone(&self.logger);
	let background_error_logger = Arc::clone(&self.logger);
	let background_scorer = Arc::clone(&self.scorer);
	let stop_bp = self.stop_sender.subscribe();
	let sleeper_logger = Arc::clone(&self.logger);
	// Custom sleeper for the background processor: resolves `true` (i.e., "exit") when the
	// stop signal fires, `false` after the requested sleep duration otherwise.
	let sleeper = move |d| {
		let mut stop = stop_bp.clone();
		let sleeper_logger = Arc::clone(&sleeper_logger);
		Box::pin(async move {
			tokio::select! {
				_ = stop.changed() => {
					log_trace!(
						sleeper_logger,
						"Stopping processing events.",
					);
					true
				}
				_ = tokio::time::sleep(d) => {
					false
				}
			}
		})
	};
	let background_stop_logger = Arc::clone(&self.logger);
	let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
	runtime.spawn(async move {
		process_events_async(
			background_persister,
			|e| background_event_handler.handle_event(e),
			background_chain_mon,
			background_chan_man,
			background_gossip_sync,
			background_peer_man,
			background_logger,
			Some(background_scorer),
			sleeper,
			true,
			|| Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap()),
		)
		.await
		.unwrap_or_else(|e| {
			log_error!(background_error_logger, "Failed to process events: {}", e);
			panic!("Failed to process events");
		});
		log_trace!(background_stop_logger, "Events processing stopped.",);
		// Let `stop` know that event handling has fully wound down.
		match event_handling_stopped_sender.send(()) {
			Ok(_) => (),
			Err(e) => {
				log_error!(
					background_stop_logger,
					"Failed to send 'events handling stopped' signal. This should never happen: {}",
					e
				);
				debug_assert!(false);
			},
		}
	});
	// If an LSP liquidity source is configured, handle its events in the background.
	if let Some(liquidity_source) = self.liquidity_source.as_ref() {
		let mut stop_liquidity_handler = self.stop_sender.subscribe();
		let liquidity_handler = Arc::clone(&liquidity_source);
		let liquidity_logger = Arc::clone(&self.logger);
		runtime.spawn(async move {
			loop {
				tokio::select! {
					_ = stop_liquidity_handler.changed() => {
						log_trace!(
							liquidity_logger,
							"Stopping processing liquidity events.",
						);
						return;
					}
					_ = liquidity_handler.handle_next_event() => {}
				}
			}
		});
	}
	// Publish the runtime, marking the node as running.
	*runtime_lock = Some(runtime);
	log_info!(self.logger, "Startup complete.");
	Ok(())
}
/// Disconnects all peers, signals all background tasks to stop, and shuts down the runtime.
///
/// Returns an error if the node is not running.
pub fn stop(&self) -> Result<(), Error> {
	// Taking the runtime out of the slot marks the node as no longer running.
	let runtime = self.runtime.write().unwrap().take().ok_or(Error::NotRunning)?;
	log_info!(self.logger, "Shutting down LDK Node with node ID {}...", self.node_id());
	// Signal all background tasks spawned in `start` to shut down.
	match self.stop_sender.send(()) {
		Ok(_) => (),
		Err(e) => {
			log_error!(
				self.logger,
				"Failed to send shutdown signal. This should never happen: {}",
				e
			);
			debug_assert!(false);
		},
	}
	self.peer_manager.disconnect_all_peers();
	// Wait (with a generous timeout) for the event-handling task to confirm it stopped, so
	// we don't tear down the runtime while events are still being processed.
	let event_handling_stopped_logger = Arc::clone(&self.logger);
	let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
	let timeout_res = runtime.block_on(async {
		tokio::time::timeout(
			Duration::from_secs(100),
			event_handling_stopped_receiver.changed(),
		)
		.await
	});
	match timeout_res {
		Ok(stop_res) => match stop_res {
			Ok(()) => {},
			Err(e) => {
				// The sender was dropped, which shouldn't be possible while we hold `self`.
				log_error!(
					event_handling_stopped_logger,
					"Stopping event handling failed. This should never happen: {}",
					e
				);
				panic!("Stopping event handling failed. This should never happen.");
			},
		},
		Err(e) => {
			// Timed out waiting; proceed with shutdown anyway.
			log_error!(
				event_handling_stopped_logger,
				"Stopping event handling timed out: {}",
				e
			);
		},
	}
	// Only available with tokio's unstable metrics API.
	#[cfg(tokio_unstable)]
	{
		log_trace!(
			self.logger,
			"Active runtime tasks left prior to shutdown: {}",
			runtime.metrics().active_tasks_count()
		);
	}
	// Give remaining tasks up to 10 seconds to finish before dropping the runtime.
	runtime.shutdown_timeout(Duration::from_secs(10));
	log_info!(self.logger, "Shutdown complete.");
	Ok(())
}
/// Returns a snapshot of the node's current status.
pub fn status(&self) -> NodeStatus {
	// Read each status field directly into the returned struct; field initializers run in
	// source order, matching the original lock-acquisition order.
	NodeStatus {
		is_running: self.runtime.read().unwrap().is_some(),
		is_listening: self.is_listening.load(Ordering::Acquire),
		current_best_block: self.channel_manager.current_best_block().into(),
		latest_wallet_sync_timestamp: *self.latest_wallet_sync_timestamp.read().unwrap(),
		latest_onchain_wallet_sync_timestamp: *self
			.latest_onchain_wallet_sync_timestamp
			.read()
			.unwrap(),
		latest_fee_rate_cache_update_timestamp: *self
			.latest_fee_rate_cache_update_timestamp
			.read()
			.unwrap(),
		latest_rgs_snapshot_timestamp: *self.latest_rgs_snapshot_timestamp.read().unwrap(),
		latest_node_announcement_broadcast_timestamp: *self
			.latest_node_announcement_broadcast_timestamp
			.read()
			.unwrap(),
	}
}
/// Returns a cloned copy of the node's configuration.
pub fn config(&self) -> Config {
	// Deref the `Arc` and hand the caller an owned clone of the inner `Config`.
	(*self.config).clone()
}
/// Returns the next event in the event queue, if currently available.
///
/// Will return `Some(..)` if an event is available and `None` otherwise.
pub fn next_event(&self) -> Option<Event> {
	self.event_queue.next_event()
}
/// Asynchronously polls the event queue, resolving once the next event is available.
pub async fn next_event_async(&self) -> Event {
	self.event_queue.next_event_async().await
}
/// Blocks the current thread until the next event is available, then returns it.
pub fn wait_next_event(&self) -> Event {
	self.event_queue.wait_next_event()
}
/// Confirms that the last retrieved event was handled, advancing the event queue.
///
/// Panics if the updated queue state can't be persisted.
pub fn event_handled(&self) {
	if let Err(e) = self.event_queue.event_handled() {
		log_error!(
			self.logger,
			"Couldn't mark event handled due to persistence failure: {}",
			e
		);
		panic!("Couldn't mark event handled due to persistence failure");
	}
}
/// Returns our own node id, i.e., the channel manager's public key.
pub fn node_id(&self) -> PublicKey {
	self.channel_manager.get_our_node_id()
}
/// Returns the addresses the node is configured to listen on, if any.
pub fn listening_addresses(&self) -> Option<Vec<SocketAddress>> {
	self.config.listening_addresses.clone()
}
/// Returns a payment handler for BOLT-11 invoice payments.
#[cfg(not(feature = "uniffi"))]
pub fn bolt11_payment(&self) -> Bolt11Payment {
	// Hand the handler shared references to all state it needs.
	Bolt11Payment::new(
		self.runtime.clone(),
		self.channel_manager.clone(),
		self.connection_manager.clone(),
		self.keys_manager.clone(),
		self.liquidity_source.clone(),
		self.payment_store.clone(),
		self.peer_store.clone(),
		self.config.clone(),
		self.logger.clone(),
	)
}
/// Returns a payment handler for BOLT-11 invoice payments (`Arc`-wrapped for bindings).
#[cfg(feature = "uniffi")]
pub fn bolt11_payment(&self) -> Arc<Bolt11Payment> {
	let handler = Bolt11Payment::new(
		self.runtime.clone(),
		self.channel_manager.clone(),
		self.connection_manager.clone(),
		self.keys_manager.clone(),
		self.liquidity_source.clone(),
		self.payment_store.clone(),
		self.peer_store.clone(),
		self.config.clone(),
		self.logger.clone(),
	);
	Arc::new(handler)
}
/// Returns a payment handler for BOLT-12 offer payments.
///
/// NOTE(review): unlike the other non-`uniffi` payment accessors this returns an
/// `Arc`-wrapped handler — confirm whether this asymmetry is intended.
#[cfg(not(feature = "uniffi"))]
pub fn bolt12_payment(&self) -> Arc<Bolt12Payment> {
	let handler = Bolt12Payment::new(
		self.runtime.clone(),
		self.channel_manager.clone(),
		self.payment_store.clone(),
		self.logger.clone(),
	);
	Arc::new(handler)
}
/// Returns a payment handler for BOLT-12 offer payments (`Arc`-wrapped for bindings).
#[cfg(feature = "uniffi")]
pub fn bolt12_payment(&self) -> Arc<Bolt12Payment> {
	let handler = Bolt12Payment::new(
		self.runtime.clone(),
		self.channel_manager.clone(),
		self.payment_store.clone(),
		self.logger.clone(),
	);
	Arc::new(handler)
}
/// Returns a payment handler for spontaneous (keysend) payments.
#[cfg(not(feature = "uniffi"))]
pub fn spontaneous_payment(&self) -> SpontaneousPayment {
	SpontaneousPayment::new(
		self.runtime.clone(),
		self.channel_manager.clone(),
		self.keys_manager.clone(),
		self.payment_store.clone(),
		self.config.clone(),
		self.logger.clone(),
	)
}
/// Returns a payment handler for spontaneous (keysend) payments (`Arc`-wrapped for bindings).
#[cfg(feature = "uniffi")]
pub fn spontaneous_payment(&self) -> Arc<SpontaneousPayment> {
	let handler = SpontaneousPayment::new(
		self.runtime.clone(),
		self.channel_manager.clone(),
		self.keys_manager.clone(),
		self.payment_store.clone(),
		self.config.clone(),
		self.logger.clone(),
	);
	Arc::new(handler)
}
/// Returns a payment handler for on-chain payments from the node's wallet.
#[cfg(not(feature = "uniffi"))]
pub fn onchain_payment(&self) -> OnchainPayment {
	OnchainPayment::new(
		self.runtime.clone(),
		self.wallet.clone(),
		self.channel_manager.clone(),
		self.config.clone(),
		self.logger.clone(),
	)
}
/// Returns a payment handler for on-chain payments (`Arc`-wrapped for bindings).
#[cfg(feature = "uniffi")]
pub fn onchain_payment(&self) -> Arc<OnchainPayment> {
	let handler = OnchainPayment::new(
		self.runtime.clone(),
		self.wallet.clone(),
		self.channel_manager.clone(),
		self.config.clone(),
		self.logger.clone(),
	);
	Arc::new(handler)
}
/// Retrieves the node's channels, converted into this crate's `ChannelDetails` type.
pub fn list_channels(&self) -> Vec<ChannelDetails> {
	let mut channels = Vec::new();
	for chan in self.channel_manager.list_channels() {
		channels.push(chan.into());
	}
	channels
}
/// Connects to the given peer. If `persist` is set, the peer is added to the peer store so
/// the background task will attempt reconnection going forward.
///
/// Returns an error if the node isn't running or the connection attempt fails.
pub fn connect(
	&self, node_id: PublicKey, address: SocketAddress, persist: bool,
) -> Result<(), Error> {
	// Connecting requires the node's runtime; keep the read guard alive for the call.
	let rt_lock = self.runtime.read().unwrap();
	if rt_lock.is_none() {
		return Err(Error::NotRunning);
	}
	let runtime = rt_lock.as_ref().unwrap();
	let peer_info = PeerInfo { node_id, address };
	let con_node_id = peer_info.node_id;
	let con_addr = peer_info.address.clone();
	let con_cm = Arc::clone(&self.connection_manager);
	// Block on the connection attempt on the node's own runtime.
	tokio::task::block_in_place(move || {
		runtime.block_on(async move {
			con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
		})
	})?;
	log_info!(self.logger, "Connected to peer {}@{}. ", peer_info.node_id, peer_info.address);
	if persist {
		self.peer_store.add_peer(peer_info)?;
	}
	Ok(())
}
/// Disconnects the peer with the given node id and removes it from the peer store.
///
/// Returns an error if the node isn't running.
pub fn disconnect(&self, counterparty_node_id: PublicKey) -> Result<(), Error> {
	// Disconnecting requires a running node; keep the read guard alive for the call.
	let rt_lock = self.runtime.read().unwrap();
	if rt_lock.is_none() {
		return Err(Error::NotRunning);
	}
	log_info!(self.logger, "Disconnecting peer {}..", counterparty_node_id);
	// Removal from the persisted peer store is best-effort; failures are only logged.
	if let Err(e) = self.peer_store.remove_peer(&counterparty_node_id) {
		log_error!(self.logger, "Failed to remove peer {}: {}", counterparty_node_id, e)
	}
	self.peer_manager.disconnect_by_node_id(counterparty_node_id);
	Ok(())
}
/// Connects to the given peer if necessary and initiates the opening of a channel with the
/// provided parameters.
///
/// Fails with an insufficient-funds error if the spendable on-chain balance can't cover the
/// channel amount (plus, for anchor channels with non-trusted peers, the per-channel
/// reserve), and with a channel-creation error if LDK rejects the request.
pub fn connect_open_channel(
	&self, node_id: PublicKey, address: SocketAddress, channel_amount_sats: u64,
	push_to_counterparty_msat: Option<u64>, channel_config: Option<Arc<ChannelConfig>>,
	announce_channel: bool,
) -> Result<UserChannelId, Error> {
	let rt_lock = self.runtime.read().unwrap();
	if rt_lock.is_none() {
		return Err(Error::NotRunning);
	}
	let runtime = rt_lock.as_ref().unwrap();
	let peer_info = PeerInfo { node_id, address };
	let con_node_id = peer_info.node_id;
	let con_addr = peer_info.address.clone();
	let con_cm = Arc::clone(&self.connection_manager);
	// First (pre-connection) funds check: what's spendable after the reserve we keep for
	// existing anchor channels must at least cover the channel amount.
	let cur_anchor_reserve_sats =
		total_anchor_channels_reserve_sats(&self.channel_manager, &self.config);
	let spendable_amount_sats =
		self.wallet.get_spendable_amount_sats(cur_anchor_reserve_sats).unwrap_or(0);
	if spendable_amount_sats < channel_amount_sats {
		log_error!(self.logger,
			"Unable to create channel due to insufficient funds. Available: {}sats, Required: {}sats",
			spendable_amount_sats, channel_amount_sats
		);
		return Err(Error::InsufficientFunds);
	}
	// Establish the peer connection on the node's runtime before negotiating the channel.
	tokio::task::block_in_place(move || {
		runtime.block_on(async move {
			con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
		})
	})?;
	let init_features = self
		.peer_manager
		.peer_by_node_id(&node_id)
		.ok_or(Error::ConnectionFailed)?
		.init_features;
	// Second funds check, now also accounting for the per-channel anchor reserve if the
	// peer requires anchors and isn't configured as trusted (no reserve).
	let required_funds_sats = channel_amount_sats
		+ self.config.anchor_channels_config.as_ref().map_or(0, |c| {
			if init_features.requires_anchors_zero_fee_htlc_tx()
				&& !c.trusted_peers_no_reserve.contains(&node_id)
			{
				c.per_channel_reserve_sats
			} else {
				0
			}
		});
	if spendable_amount_sats < required_funds_sats {
		log_error!(self.logger,
			"Unable to create channel due to insufficient funds. Available: {}sats, Required: {}sats",
			spendable_amount_sats, required_funds_sats
		);
		return Err(Error::InsufficientFunds);
	}
	let mut user_config = default_user_config(&self.config);
	user_config.channel_handshake_config.announced_channel = announce_channel;
	user_config.channel_config = (*(channel_config.unwrap_or_default())).clone().into();
	// For unannounced channels we allow the full channel capacity to be in-flight.
	if !announce_channel {
		user_config
			.channel_handshake_config
			.max_inbound_htlc_value_in_flight_percent_of_channel = 100;
	}
	let push_msat = push_to_counterparty_msat.unwrap_or(0);
	// Random local identifier used to track the channel before a funding outpoint exists.
	let user_channel_id: u128 = rand::thread_rng().gen::<u128>();
	match self.channel_manager.create_channel(
		peer_info.node_id,
		channel_amount_sats,
		push_msat,
		user_channel_id,
		None,
		Some(user_config),
	) {
		Ok(_) => {
			log_info!(
				self.logger,
				"Initiated channel creation with peer {}. ",
				peer_info.node_id
			);
			// Persist the peer so we'll attempt to reconnect going forward.
			self.peer_store.add_peer(peer_info)?;
			Ok(UserChannelId(user_channel_id))
		},
		Err(e) => {
			log_error!(self.logger, "Failed to initiate channel creation: {:?}", e);
			Err(Error::ChannelCreationFailed)
		},
	}
}
/// Manually syncs the on-chain wallet, updates the fee rate cache, and syncs the Lightning
/// state (channel manager, chain monitor, sweeper), updating the respective status
/// timestamps on success.
///
/// **Note:** this blocks until all three steps complete (the Lightning sync is bounded by a
/// timeout) and fails on the first error encountered.
pub fn sync_wallets(&self) -> Result<(), Error> {
	let rt_lock = self.runtime.read().unwrap();
	if rt_lock.is_none() {
		return Err(Error::NotRunning);
	}
	let wallet = Arc::clone(&self.wallet);
	let tx_sync = Arc::clone(&self.tx_sync);
	let sync_cman = Arc::clone(&self.channel_manager);
	let archive_cman = Arc::clone(&self.channel_manager);
	let sync_cmon = Arc::clone(&self.chain_monitor);
	let archive_cmon = Arc::clone(&self.chain_monitor);
	let fee_estimator = Arc::clone(&self.fee_estimator);
	let sync_sweeper = Arc::clone(&self.output_sweeper);
	let sync_logger = Arc::clone(&self.logger);
	// Everything that needs to be told about confirmed/unconfirmed transactions.
	let confirmables = vec![
		&*sync_cman as &(dyn Confirm + Sync + Send),
		&*sync_cmon as &(dyn Confirm + Sync + Send),
		&*sync_sweeper as &(dyn Confirm + Sync + Send),
	];
	let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp);
	let sync_fee_rate_update_timestamp =
		Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
	let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp);
	let sync_monitor_archival_height = Arc::clone(&self.latest_channel_monitor_archival_height);
	// Run the whole sequence to completion on a fresh runtime.
	tokio::task::block_in_place(move || {
		tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(
			async move {
				// Step 1: sync the on-chain (BDK) wallet.
				let now = Instant::now();
				match wallet.sync().await {
					Ok(()) => {
						log_info!(
							sync_logger,
							"Sync of on-chain wallet finished in {}ms.",
							now.elapsed().as_millis()
						);
						let unix_time_secs_opt = SystemTime::now()
							.duration_since(UNIX_EPOCH)
							.ok()
							.map(|d| d.as_secs());
						*sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
					},
					Err(e) => {
						log_error!(sync_logger, "Sync of on-chain wallet failed: {}", e);
						return Err(e);
					},
				};
				// Step 2: refresh the fee rate cache.
				let now = Instant::now();
				match fee_estimator.update_fee_estimates().await {
					Ok(()) => {
						log_info!(
							sync_logger,
							"Fee rate cache update finished in {}ms.",
							now.elapsed().as_millis()
						);
						let unix_time_secs_opt = SystemTime::now()
							.duration_since(UNIX_EPOCH)
							.ok()
							.map(|d| d.as_secs());
						*sync_fee_rate_update_timestamp.write().unwrap() = unix_time_secs_opt;
					},
					Err(e) => {
						log_error!(sync_logger, "Fee rate cache update failed: {}", e,);
						return Err(e);
					},
				}
				// Step 3: sync the Lightning state, bounded by a timeout.
				let now = Instant::now();
				let tx_sync_timeout_fut = tokio::time::timeout(
					Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS),
					tx_sync.sync(confirmables),
				);
				match tx_sync_timeout_fut.await {
					Ok(res) => match res {
						Ok(()) => {
							log_info!(
								sync_logger,
								"Sync of Lightning wallet finished in {}ms.",
								now.elapsed().as_millis()
							);
							let unix_time_secs_opt = SystemTime::now()
								.duration_since(UNIX_EPOCH)
								.ok()
								.map(|d| d.as_secs());
							*sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
							// Occasionally archive monitors of fully-resolved channels.
							periodically_archive_fully_resolved_monitors(
								archive_cman,
								archive_cmon,
								sync_monitor_archival_height,
							);
							Ok(())
						},
						Err(e) => {
							log_error!(sync_logger, "Sync of Lightning wallet failed: {}", e);
							Err(e.into())
						},
					},
					Err(e) => {
						log_error!(sync_logger, "Sync of Lightning wallet timed out: {}", e);
						Err(Error::TxSyncTimeout)
					},
				}
			},
		)
	})
}
/// Closes the channel with the given counterparty cooperatively (i.e., not force-closing).
pub fn close_channel(
	&self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey,
) -> Result<(), Error> {
	self.close_channel_internal(user_channel_id, counterparty_node_id, false)
}
/// Force-closes the channel with the given counterparty.
pub fn force_close_channel(
	&self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey,
) -> Result<(), Error> {
	self.close_channel_internal(user_channel_id, counterparty_node_id, true)
}
/// Shared implementation for cooperative and force closes.
///
/// Looks up the channel by its user-facing id among the channels open with the given
/// counterparty. If no matching channel exists, the call is a no-op returning `Ok(())`.
/// After successfully initiating the close of the peer's last remaining channel, the
/// peer is dropped from the peer store.
fn close_channel_internal(
	&self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, force: bool,
) -> Result<(), Error> {
	let open_channels =
		self.channel_manager.list_channels_with_counterparty(&counterparty_node_id);

	// No matching open channel: nothing to do.
	let channel_details =
		match open_channels.iter().find(|c| c.user_channel_id == user_channel_id.0) {
			Some(details) => details,
			None => return Ok(()),
		};

	if !force {
		// Cooperative close.
		self.channel_manager
			.close_channel(&channel_details.channel_id, &counterparty_node_id)
			.map_err(|e| {
				log_error!(self.logger, "Failed to close channel: {:?}", e);
				Error::ChannelClosingFailed
			})?;
	} else {
		// For peers configured as trusted without a reserve we refrain from
		// broadcasting our latest commitment transaction; otherwise we broadcast it.
		let trusted_no_reserve = self
			.config
			.anchor_channels_config
			.as_ref()
			.map_or(false, |acc| acc.trusted_peers_no_reserve.contains(&counterparty_node_id));
		if trusted_no_reserve {
			self.channel_manager
				.force_close_without_broadcasting_txn(
					&channel_details.channel_id,
					&counterparty_node_id,
				)
				.map_err(|e| {
					log_error!(
						self.logger,
						"Failed to force-close channel to trusted peer: {:?}",
						e
					);
					Error::ChannelClosingFailed
				})?;
		} else {
			self.channel_manager
				.force_close_broadcasting_latest_txn(
					&channel_details.channel_id,
					&counterparty_node_id,
				)
				.map_err(|e| {
					log_error!(self.logger, "Failed to force-close channel: {:?}", e);
					Error::ChannelClosingFailed
				})?;
		}
	}

	// If this was the peer's only channel, forget the peer so we stop tracking it.
	if open_channels.len() == 1 {
		self.peer_store.remove_peer(&counterparty_node_id)?;
	}
	Ok(())
}
pub fn update_channel_config(
&self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey,
channel_config: Arc<ChannelConfig>,
) -> Result<(), Error> {
let open_channels =
self.channel_manager.list_channels_with_counterparty(&counterparty_node_id);
if let Some(channel_details) =
open_channels.iter().find(|c| c.user_channel_id == user_channel_id.0)
{
self.channel_manager
.update_channel_config(
&counterparty_node_id,
&[channel_details.channel_id],
&(*channel_config).clone().into(),
)
.map_err(|_| Error::ChannelConfigUpdateFailed)
} else {
Err(Error::ChannelConfigUpdateFailed)
}
}
/// Retrieves the details of the payment with the given id from the payment store, if
/// it exists.
pub fn payment(&self, payment_id: &PaymentId) -> Option<PaymentDetails> {
self.payment_store.get(payment_id)
}
/// Removes the payment with the given id from the payment store.
///
/// Returns an error if the payment store fails to remove the entry.
pub fn remove_payment(&self, payment_id: &PaymentId) -> Result<(), Error> {
	// `payment_id` is already a reference, so pass it through directly rather than
	// re-borrowing it (previously `&payment_id`, which only worked via deref
	// coercion and is flagged by clippy's `needless_borrow`). Matches the call
	// style of `payment()` above.
	self.payment_store.remove(payment_id)
}
/// Returns a detailed overview of our current on-chain and Lightning balances.
pub fn list_balances(&self) -> BalanceDetails {
	// On-chain balances, accounting for any reserve kept for anchor channels.
	let cur_anchor_reserve_sats =
		total_anchor_channels_reserve_sats(&self.channel_manager, &self.config);
	let (total_onchain_balance_sats, spendable_onchain_balance_sats) =
		self.wallet.get_balances(cur_anchor_reserve_sats).unwrap_or((0, 0));

	// The effective reserve can never exceed what we actually hold on-chain.
	let total_anchor_channels_reserve_sats =
		std::cmp::min(cur_anchor_reserve_sats, total_onchain_balance_sats);

	// Aggregate the claimable balances of all monitored channels. Monitors that can't
	// be retrieved are skipped.
	let mut total_lightning_balance_sats = 0;
	let mut lightning_balances = Vec::new();
	for (funding_txo, channel_id) in self.chain_monitor.list_monitors() {
		if let Ok(monitor) = self.chain_monitor.get_monitor(funding_txo) {
			let counterparty_node_id = monitor.get_counterparty_node_id().unwrap();
			for ldk_balance in monitor.get_claimable_balances() {
				total_lightning_balance_sats += ldk_balance.claimable_amount_satoshis();
				lightning_balances.push(LightningBalance::from_ldk_balance(
					channel_id,
					counterparty_node_id,
					ldk_balance,
				));
			}
		}
	}

	// Outputs from channel closures that the sweeper is still tracking.
	let pending_balances_from_channel_closures = self
		.output_sweeper
		.tracked_spendable_outputs()
		.into_iter()
		.map(PendingSweepBalance::from_tracked_spendable_output)
		.collect();

	BalanceDetails {
		total_onchain_balance_sats,
		spendable_onchain_balance_sats,
		total_anchor_channels_reserve_sats,
		total_lightning_balance_sats,
		lightning_balances,
		pending_balances_from_channel_closures,
	}
}
/// Retrieves all payments from the payment store that match the given predicate.
pub fn list_payments_with_filter<F: FnMut(&&PaymentDetails) -> bool>(
&self, f: F,
) -> Vec<PaymentDetails> {
self.payment_store.list_filter(f)
}
/// Retrieves all payments from the payment store.
pub fn list_payments(&self) -> Vec<PaymentDetails> {
self.payment_store.list_filter(|_| true)
}
/// Retrieves a list of known peers.
///
/// Includes all currently-connected peers (marked `is_connected`) as well as peers
/// persisted in the peer store that are currently disconnected. Connected peers for
/// which no address is known at all are omitted.
pub fn list_peers(&self) -> Vec<PeerDetails> {
	let mut peer_details = Vec::new();

	// First, all currently-connected peers.
	let connected = self.peer_manager.list_peers();
	let num_connected = connected.len();
	for peer in connected {
		let node_id = peer.counterparty_node_id;
		let stored_peer = self.peer_store.get_peer(&node_id);
		let stored_addr_opt = stored_peer.as_ref().map(|p| p.address.clone());
		// Prefer the address of the live connection, fall back to the stored address,
		// and skip the peer entirely if we know neither.
		let address = match (peer.socket_address, stored_addr_opt) {
			(Some(con_addr), _) => con_addr,
			(None, Some(stored_addr)) => stored_addr,
			(None, None) => continue,
		};
		peer_details.push(PeerDetails {
			node_id,
			address,
			is_persisted: stored_peer.is_some(),
			is_connected: true,
		});
	}

	// Then, all persisted peers not already listed as connected. Only the entries
	// pushed above (the first `num_connected`) need to be checked for duplicates.
	for p in self.peer_store.list_peers() {
		if peer_details.iter().take(num_connected).any(|d| d.node_id == p.node_id) {
			continue;
		}
		peer_details.push(PeerDetails {
			node_id: p.node_id,
			address: p.address,
			is_persisted: true,
			is_connected: false,
		});
	}

	peer_details
}
/// Returns a handler allowing to query the network graph.
#[cfg(not(feature = "uniffi"))]
pub fn network_graph(&self) -> NetworkGraph {
NetworkGraph::new(Arc::clone(&self.network_graph))
}
/// Returns a handler allowing to query the network graph.
///
/// The `uniffi` variant wraps the handler in an `Arc`, as required by the bindings.
#[cfg(feature = "uniffi")]
pub fn network_graph(&self) -> Arc<NetworkGraph> {
Arc::new(NetworkGraph::new(Arc::clone(&self.network_graph)))
}
/// Creates a digital signature of the given message with the node's keys, delegating
/// to the keys manager.
pub fn sign_message(&self, msg: &[u8]) -> Result<String, Error> {
self.keys_manager.sign_message(msg)
}
/// Verifies that the given `sig` is a valid signature of `msg` under the given public
/// key, delegating to the keys manager.
pub fn verify_signature(&self, msg: &[u8], sig: &str, pkey: &PublicKey) -> bool {
self.keys_manager.verify_signature(msg, sig, pkey)
}
}
impl Drop for Node {
fn drop(&mut self) {
// Best-effort shutdown: the result is deliberately ignored, e.g., if the node
// was never started or was already stopped.
let _ = self.stop();
}
}
/// Represents the status of the node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NodeStatus {
/// Indicates whether the node is running.
pub is_running: bool,
/// Indicates whether the node is listening for incoming connections.
pub is_listening: bool,
/// The best block to which our Lightning wallet is currently synced.
pub current_best_block: BestBlock,
/// The timestamp, in seconds since the UNIX epoch, when we last successfully synced
/// our Lightning wallet to the chain tip, or `None` if it never succeeded.
pub latest_wallet_sync_timestamp: Option<u64>,
/// The timestamp, in seconds since the UNIX epoch, when we last successfully synced
/// our on-chain wallet to the chain tip, or `None` if it never succeeded.
pub latest_onchain_wallet_sync_timestamp: Option<u64>,
/// The timestamp, in seconds since the UNIX epoch, when we last successfully updated
/// our fee rate cache, or `None` if it never succeeded.
pub latest_fee_rate_cache_update_timestamp: Option<u64>,
/// The timestamp, in seconds since the UNIX epoch, of the last RGS snapshot we
/// applied, or `None` if none was applied yet.
pub latest_rgs_snapshot_timestamp: Option<u64>,
/// The timestamp, in seconds since the UNIX epoch, when we last broadcast a node
/// announcement, or `None` if we never did.
pub latest_node_announcement_broadcast_timestamp: Option<u64>,
}
/// Computes the total on-chain amount, in satoshis, to keep in reserve for fee-bumping
/// anchor channels.
///
/// Returns `0` if no anchor-channels configuration is set. Otherwise, counts all
/// channels that (a) belong to a peer not listed as trusted-without-reserve, (b) have
/// not fully completed shutdown, and (c) use anchor outputs, and multiplies that count
/// by the configured per-channel reserve.
pub(crate) fn total_anchor_channels_reserve_sats(
	channel_manager: &ChannelManager, config: &Config,
) -> u64 {
	// Without an anchor-channels configuration we maintain no reserve at all.
	let anchor_channels_config = match config.anchor_channels_config.as_ref() {
		Some(acc) => acc,
		None => return 0,
	};

	let num_reserve_requiring_channels = channel_manager
		.list_channels()
		.into_iter()
		.filter(|c| {
			!anchor_channels_config.trusted_peers_no_reserve.contains(&c.counterparty.node_id)
				&& c.channel_shutdown_state
					.map_or(true, |s| s != ChannelShutdownState::ShutdownComplete)
				&& c.channel_type
					.as_ref()
					.map_or(false, |t| t.requires_anchors_zero_fee_htlc_tx())
		})
		.count() as u64;

	num_reserve_requiring_channels * anchor_channels_config.per_channel_reserve_sats
}
/// Triggers archival of fully-resolved channel monitors, rate-limited by block height.
///
/// Archival runs if it never ran before, or if at least
/// `RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL` blocks passed since the last run. The
/// height of the last run is tracked via the shared `RwLock`.
fn periodically_archive_fully_resolved_monitors(
	channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
	latest_channel_monitor_archival_height: Arc<RwLock<Option<u32>>>,
) {
	// Hold the write lock for the whole check-and-update so concurrent callers can't
	// both decide to archive.
	let mut latest_height_lock = latest_channel_monitor_archival_height.write().unwrap();
	let cur_height = channel_manager.current_best_block().height;

	let archival_due = match latest_height_lock.as_ref() {
		None => true,
		Some(h) => cur_height >= h + RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL,
	};

	if archival_due {
		chain_monitor.archive_fully_resolved_channel_monitors();
		*latest_height_lock = Some(cur_height);
	}
}