use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use anyhow::Context;
use futures::{FutureExt, StreamExt};
use log::{info, warn};
use tokio::sync::RwLock;
#[cfg(not(feature = "wasm-web"))]
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::Wallet;
use crate::onchain::DaemonizableOnchainWallet;
/// Handle to a daemon started by `start_daemon` (build without the
/// `wasm-web` feature).
///
/// Holds the cancellation token used to request shutdown and the join handle
/// of the spawned daemon task, so callers can also wait for it to finish.
#[cfg(not(feature = "wasm-web"))]
pub struct DaemonHandle {
/// Token cancelled by `stop` to ask the daemon loops to exit.
shutdown: CancellationToken,
/// Join handle of the spawned daemon task; awaited by `stop_wait`.
jh: JoinHandle<()>,
}
/// Handle to a daemon started by `start_daemon` (build with the `wasm-web`
/// feature).
///
/// This variant stores no join handle, so it can request shutdown but has
/// nothing to await for completion.
#[cfg(feature = "wasm-web")]
pub struct DaemonHandle {
/// Token cancelled by `stop` to ask the daemon loops to exit.
shutdown: CancellationToken,
}
impl DaemonHandle {
pub fn stop(&self) {
self.shutdown.cancel();
}
pub async fn stop_wait(self) -> anyhow::Result<()> {
self.stop();
#[cfg(not(feature = "wasm-web"))]
self.jh.await?;
Ok(())
}
}
/// Spawns the background daemon for `wallet` and returns a handle to stop it.
///
/// A fresh cancellation token is created and shared between the returned
/// handle and the spawned [`DaemonProcess`]. `onchain` is handed to the
/// process as-is; when it is `None` the onchain-dependent steps are skipped
/// by the process.
///
/// Without the `wasm-web` feature the handle also keeps the value returned by
/// `crate::utils::spawn` so the task can be awaited; with `wasm-web` the task
/// is spawned and only the token is kept.
pub(crate) fn start_daemon(
    wallet: Arc<Wallet>,
    onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
) -> DaemonHandle {
    let token = CancellationToken::new();
    let daemon = DaemonProcess::new(token.clone(), wallet, onchain);

    #[cfg(not(feature = "wasm-web"))]
    {
        // Fields are evaluated in written order: spawn first, then move the token.
        DaemonHandle {
            jh: crate::utils::spawn(daemon.run()),
            shutdown: token,
        }
    }
    #[cfg(feature = "wasm-web")]
    {
        crate::utils::spawn(daemon.run());
        DaemonHandle { shutdown: token }
    }
}
/// State of the background daemon: the set of periodic and event-driven
/// loops that `run` drives concurrently until shutdown is requested.
struct DaemonProcess {
/// Token whose cancellation makes every daemon loop exit.
shutdown: CancellationToken,
/// Last known server connectivity. Initialised in `run` from whether the
/// wallet currently has a server, then updated by the connection check loop;
/// the other loops skip their work while this is `false`.
connected: AtomicBool,
/// Wallet on which all sync, refresh and round operations are invoked.
wallet: Arc<Wallet>,
/// Optional onchain wallet; onchain sync and exit progression only run when
/// this is `Some`.
onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
}
impl DaemonProcess {
    /// Creates a daemon process bound to `shutdown`, operating on `wallet`
    /// and, if provided, on the `onchain` wallet.
    ///
    /// The process starts flagged as disconnected; `run` sets the real
    /// initial value before any loop starts.
    fn new(
        shutdown: CancellationToken,
        wallet: Arc<Wallet>,
        onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
    ) -> DaemonProcess {
        DaemonProcess {
            connected: AtomicBool::new(false),
            shutdown,
            wallet,
            onchain,
        }
    }

    /// Converts a configured interval in seconds into a [`Duration`],
    /// clamping it to at least one second.
    ///
    /// `tokio::time::interval` panics when given a zero period, and a
    /// zero-length sleep would make the sleep-based loops retry without any
    /// pause, so a configured `0` is treated as `1`.
    fn interval_from_secs(secs: u64) -> Duration {
        Duration::from_secs(secs.max(1))
    }

    /// Period of the fast loop, taken from `daemon_fast_sync_interval_secs`
    /// in the wallet config (never less than one second).
    fn fast_interval(&self) -> Duration {
        Self::interval_from_secs(self.wallet.config().daemon_fast_sync_interval_secs)
    }

    /// Period of the slow loop, taken from `daemon_slow_sync_interval_secs`
    /// in the wallet config (never less than one second).
    fn slow_interval(&self) -> Duration {
        Self::interval_from_secs(self.wallet.config().daemon_slow_sync_interval_secs)
    }

    /// Tries to claim all pending lightning receives, then syncs the pending
    /// lightning send VTXOs.
    ///
    /// Each failure is logged as a warning and does not prevent the next
    /// step from running.
    async fn run_lightning_sync(&self) {
        if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
            warn!("An error occured while checking and claiming pending lightning receives: {e:#}");
        }
        if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
            warn!("An error occured while syncing pending lightning sends: {e:#}");
        }
    }

    /// Loop that processes mailbox messages until shutdown.
    ///
    /// While connected, it awaits the wallet's mailbox subscription, passing
    /// it a clone of the shutdown token, and logs any error it returns. After
    /// the subscription returns, or when disconnected, it waits one slow
    /// interval before trying again and exits as soon as shutdown is
    /// signalled.
    async fn run_mailbox_messages_process(&self) {
        loop {
            let shutdown = self.shutdown.clone();
            if self.connected.load(Ordering::Relaxed) {
                let r = self.wallet.subscribe_process_mailbox_messages(None, shutdown).await;
                if let Err(e) = r {
                    warn!("An error occurred while processing mailbox messages: {e:#}");
                }
            }
            futures::select! {
                _ = tokio::time::sleep(self.slow_interval()).fuse() => {},
                _ = self.shutdown.cancelled().fuse() => {
                    info!("Shutdown signal received! Shutting mailbox messages process...");
                    break;
                },
            }
        }
    }

    /// Syncs pending boards, logging a warning on failure.
    async fn run_boards_sync(&self) {
        if let Err(e) = self.wallet.sync_pending_boards().await {
            warn!("An error occured while syncing pending board: {e:#}");
        }
    }

    /// Syncs pending offboards, logging a warning on failure.
    async fn run_offboards_sync(&self) {
        if let Err(e) = self.wallet.sync_pending_offboards().await {
            warn!("An error occured while syncing pending offboards: {e:#}");
        }
    }

    /// Updates fee rates on the wallet's chain source, passing the configured
    /// fallback fee rate, and logs a warning on failure.
    async fn run_fee_rate_update(&self) {
        if let Err(e) = self.wallet.chain.update_fee_rates(self.wallet.config.fallback_fee_rate).await {
            warn!("An error occured while updating fee rates: {e:#}");
        }
    }

    /// Syncs the onchain wallet against the wallet's chain source.
    ///
    /// Does nothing when no onchain wallet was provided. Takes the onchain
    /// write lock for the duration of the sync.
    async fn run_onchain_sync(&self) {
        if let Some(onchain) = &self.onchain {
            let mut onchain = onchain.write().await;
            if let Err(e) = onchain.sync(&self.wallet.chain).await {
                warn!("An error occured while syncing onchain: {e:#}");
            }
        }
    }

    /// Runs the wallet's maintenance refresh, logging a warning on failure.
    async fn run_maintenance_refresh_process(&self) {
        if let Err(e) = self.wallet.maintenance_refresh().await {
            warn!("An error occured while performing maintenance refresh: {e:#}");
        }
    }

    /// Syncs exits and then progresses them.
    ///
    /// Does nothing when no onchain wallet was provided. The onchain write
    /// lock is taken first and the exit write lock second; both are held
    /// across the two calls. A failed sync is logged and progression is still
    /// attempted.
    async fn run_exits(&self) {
        if let Some(onchain) = &self.onchain {
            let mut onchain = onchain.write().await;
            let mut exit_lock = self.wallet.exit.write().await;
            if let Err(e) = exit_lock.sync_no_progress(&*onchain).await {
                warn!("An error occurred while syncing exits: {e:#}");
            }
            if let Err(e) = exit_lock.progress_exits(&self.wallet, &mut *onchain, None).await {
                warn!("An error occurred while progressing exits: {e:#}");
            }
        }
    }

    /// Subscribes to round events and forwards each one to
    /// `progress_pending_rounds` until shutdown.
    ///
    /// # Errors
    ///
    /// Returns an error if subscribing fails, if the stream ends, if the
    /// stream yields an error item, or if progressing the pending rounds
    /// fails. Returns `Ok(())` only when shutdown is signalled.
    async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
        let mut events = self.wallet.subscribe_round_events().await?;
        loop {
            futures::select! {
                res = events.next().fuse() => {
                    // `None` means the stream ended; an `Err` item is a stream-level failure.
                    let event = res.context("events stream broke")?
                        .context("error on event stream")?;
                    self.wallet.progress_pending_rounds(Some(&event)).await?;
                },
                _ = self.shutdown.cancelled().fuse() => {
                    info!("Shutdown signal received! Shutting inner round events process...");
                    return Ok(());
                },
            }
        }
    }

    /// Loop that keeps the round event processing alive until shutdown.
    ///
    /// While connected, it runs `inner_process_pending_rounds` and logs the
    /// error if it fails. It then waits one slow interval before
    /// resubscribing and exits as soon as shutdown is signalled.
    async fn run_round_events_process(&self) {
        loop {
            if self.connected.load(Ordering::Relaxed) {
                if let Err(e) = self.inner_process_pending_rounds().await {
                    warn!("An error occured while processing pending rounds: {e:#}");
                }
            }
            futures::select! {
                _ = tokio::time::sleep(self.slow_interval()).fuse() => {},
                _ = self.shutdown.cancelled().fuse() => {
                    info!("Shutdown signal received! Shutting round events process...");
                    break;
                },
            }
        }
    }

    /// Loop that refreshes the server connection once per fast interval and
    /// records the outcome in `connected`.
    ///
    /// A change in connectivity is logged (with the error when the connection
    /// is lost); repeated identical outcomes are not, to avoid flooding the
    /// log at the fast interval.
    async fn run_server_connection_check_process(&self) {
        loop {
            futures::select! {
                _ = tokio::time::sleep(self.fast_interval()).fuse() => {},
                _ = self.shutdown.cancelled().fuse() => {
                    info!("Shutdown signal received! Shutting server connection check process...");
                    break;
                },
            }
            let result = self.wallet.refresh_server().await;
            let connected = result.is_ok();
            // `swap` stores the new state and returns the previous one in a single step.
            let was_connected = self.connected.swap(connected, Ordering::Relaxed);
            match result {
                Err(e) if was_connected => warn!("Lost connection to server: {e:#}"),
                Ok(_) if !was_connected => info!("Connection to server established"),
                _ => {},
            }
        }
    }

    /// Loop that runs the periodic sync work until shutdown.
    ///
    /// On each fast tick it runs the lightning sync. On each slow tick it
    /// runs, in order: fee rate update, boards sync, offboards sync,
    /// maintenance refresh, onchain sync and exits. Ticks that fire while
    /// disconnected are skipped.
    async fn run_sync_processes(&self) {
        // A tokio interval's first tick completes immediately; resetting makes
        // the first tick fire one full period from now instead.
        let mut fast_interval = tokio::time::interval(self.fast_interval());
        fast_interval.reset();
        let mut slow_interval = tokio::time::interval(self.slow_interval());
        slow_interval.reset();
        loop {
            futures::select! {
                _ = fast_interval.tick().fuse() => {
                    if !self.connected.load(Ordering::Relaxed) {
                        continue;
                    }
                    self.run_lightning_sync().await;
                    // Restart the period after the work so it is measured from completion.
                    fast_interval.reset();
                },
                _ = slow_interval.tick().fuse() => {
                    if !self.connected.load(Ordering::Relaxed) {
                        continue;
                    }
                    self.run_fee_rate_update().await;
                    self.run_boards_sync().await;
                    self.run_offboards_sync().await;
                    self.run_maintenance_refresh_process().await;
                    self.run_onchain_sync().await;
                    self.run_exits().await;
                    // Restart the period after the work so it is measured from completion.
                    slow_interval.reset();
                },
                _ = self.shutdown.cancelled().fuse() => {
                    info!("Shutdown signal received! Shutting sync processes...");
                    break;
                },
            }
        }
    }

    /// Runs the daemon until shutdown is signalled.
    ///
    /// Sets the initial connectivity flag from whether the wallet currently
    /// has a server, then drives the connection check, round events, sync and
    /// mailbox loops concurrently and returns once all four have exited.
    pub async fn run(self) {
        let connected = self.wallet.server.read().is_some();
        self.connected.store(connected, Ordering::Relaxed);
        let _ = futures::join!(
            self.run_server_connection_check_process(),
            self.run_round_events_process(),
            self.run_sync_processes(),
            self.run_mailbox_messages_process(),
        );
        info!("Daemon gracefully stopped");
    }
}