use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use ark::rounds::RoundEvent;
use futures::{FutureExt, StreamExt};
use log::{info, trace, warn};
use tokio::sync::RwLock;
#[cfg(not(feature = "wasm-web"))]
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::Wallet;
use crate::onchain::DaemonizableOnchainWallet;
#[cfg(not(feature = "wasm-web"))]
pub struct DaemonHandle {
shutdown: CancellationToken,
jh: JoinHandle<()>,
}
#[cfg(feature = "wasm-web")]
pub struct DaemonHandle {
shutdown: CancellationToken,
}
impl DaemonHandle {
pub fn stop(&self) {
self.shutdown.cancel();
}
pub async fn stop_wait(self) -> anyhow::Result<()> {
self.stop();
#[cfg(not(feature = "wasm-web"))]
self.jh.await?;
Ok(())
}
}
pub(crate) fn start_daemon(
wallet: Wallet,
onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
) -> DaemonHandle {
let shutdown = CancellationToken::new();
let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
#[cfg(not(feature = "wasm-web"))]
{
let jh = crate::utils::spawn(proc.run());
DaemonHandle { shutdown, jh }
}
#[cfg(feature = "wasm-web")]
{
crate::utils::spawn(proc.run());
DaemonHandle { shutdown }
}
}
struct DaemonProcess {
shutdown: CancellationToken,
connected: AtomicBool,
wallet: Wallet,
onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
}
impl DaemonProcess {
fn new(
shutdown: CancellationToken,
wallet: Wallet,
onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
) -> DaemonProcess {
DaemonProcess {
connected: AtomicBool::new(false),
shutdown,
wallet,
onchain,
}
}
fn sync_interval(&self) -> Duration {
Duration::from_secs(self.wallet.config().daemon_sync_interval_secs)
}
async fn run_mailbox_messages_process(&self) {
loop {
let shutdown = self.shutdown.clone();
if self.connected.load(Ordering::Relaxed) {
let r = self.wallet.subscribe_process_mailbox_messages(None, shutdown).await;
if let Err(e) = r {
warn!("An error occurred while processing mailbox messages: {e:#}");
}
}
futures::select! {
_ = tokio::time::sleep(self.sync_interval()).fuse() => {},
_ = self.shutdown.cancelled().fuse() => {
info!("Shutdown signal received! Shutting mailbox messages process...");
break;
},
}
}
}
async fn run_boards_sync(&self) {
if let Err(e) = self.wallet.sync_pending_boards().await {
warn!("An error occured while syncing pending board: {e:#}");
}
}
async fn run_offboards_sync(&self) {
if let Err(e) = self.wallet.sync_pending_offboards().await {
warn!("An error occured while syncing pending offboards: {e:#}");
}
}
async fn run_rounds_sync(&self) {
if let Err(e) = self.wallet.sync_pending_rounds().await {
warn!("An error occured while syncing pending rounds: {e:#}");
}
}
async fn run_fee_rate_update(&self) {
if let Err(e) = self.wallet.chain().update_fee_rates(self.wallet.config().fallback_fee_rate).await {
warn!("An error occured while updating fee rates: {e:#}");
}
}
async fn run_onchain_sync(&self) {
if let Some(onchain) = &self.onchain {
let mut onchain = onchain.write().await;
if let Err(e) = onchain.sync(self.wallet.chain()).await {
warn!("An error occured while syncing onchain: {e:#}");
}
}
}
async fn run_exits(&self) {
if let Some(onchain) = &self.onchain {
let mut onchain = onchain.write().await;
if let Err(e) = self.wallet.exit_mgr().sync_no_progress(&*onchain).await {
warn!("An error occurred while syncing exits: {e:#}");
}
if let Err(e) = self.wallet.exit_mgr().progress_exits(&self.wallet, &mut *onchain, None).await {
warn!("An error occurred while progressing exits: {e:#}");
}
}
}
async fn handle_round_event(&self, event: &RoundEvent) -> anyhow::Result<()> {
match &event {
&RoundEvent::Attempt(attempt) => {
if attempt.attempt_seq == 0 {
match self.wallet.maybe_schedule_maintenance_refresh().await {
Ok(_) => {},
Err(err) => warn!("Failed to schedule maintenance refresh: {:?}", err),
}
};
},
_ => {},
};
self.wallet.progress_pending_rounds(Some(event)).await
}
async fn process_round_event_stream(&self) -> anyhow::Result<()> {
let mut events = self.wallet.subscribe_round_events().await?;
self.connected.store(true, Ordering::Relaxed);
loop {
futures::select! {
res = events.next().fuse() => {
match res {
Some(Ok(event)) => {
if let Err(e) = self.handle_round_event(&event).await {
warn!("Error processing round event: {e:#}");
}
},
Some(Err(e)) => {
return Err(e.context("error on event stream"));
},
None => {
return Ok(());
},
}
},
_ = self.shutdown.cancelled().fuse() => {
info!("Shutdown signal received! Shutting round events stream...");
return Ok(());
},
}
}
}
async fn run_round_events_process(&self) {
loop {
if self.shutdown.is_cancelled() {
info!("Shutdown signal received! Shutting round events process...");
break;
}
let started_at = std::time::Instant::now();
if let Err(e) = self.process_round_event_stream().await {
warn!("An error occured while processing pending rounds: {e:#}");
}
if started_at.elapsed() >= crate::HEALTHY_STREAM_DURATION {
trace!("Round events stream closed after healthy session, reconnecting");
continue;
}
self.connected.store(false, Ordering::Relaxed);
futures::select! {
_ = tokio::time::sleep(self.sync_interval()).fuse() => {},
_ = self.shutdown.cancelled().fuse() => {
info!("Shutdown signal received! Shutting round events process...");
break;
},
}
}
}
async fn run_server_connection_check_process(&self) {
loop {
futures::select! {
_ = tokio::time::sleep(self.sync_interval()).fuse() => {},
_ = self.shutdown.cancelled().fuse() => {
info!("Shutdown signal received! Shutting server connection check process...");
break;
},
}
if self.connected.load(Ordering::Relaxed) {
continue;
}
let result = self.wallet.refresh_server().await;
let connected = result.is_ok();
if let Err(ref e) = result {
warn!("Ark server reconnect failed: {:#}", e);
} else {
info!("Ark server reconnected");
}
self.connected.store(connected, Ordering::Relaxed);
}
}
async fn run_sync_processes(&self) {
let mut sync_interval = tokio::time::interval(self.sync_interval());
loop {
futures::select! {
_ = sync_interval.tick().fuse() => {
if self.connected.load(Ordering::Relaxed) {
self.run_fee_rate_update().await;
self.run_boards_sync().await;
self.run_offboards_sync().await;
}
self.run_onchain_sync().await;
self.run_rounds_sync().await;
self.run_exits().await;
sync_interval.reset();
},
_ = self.shutdown.cancelled().fuse() => {
info!("Shutdown signal received! Shutting sync processes...");
break;
},
}
}
}
async fn run_startup_tasks(&self) {
let result = self.wallet.refresh_server().await;
if let Err(ref e) = result {
warn!("Ark server refresh failed: {:#}", e);
}
let connected = self.wallet.inner.server.initialized();
self.connected.store(connected, Ordering::Relaxed);
if !self.wallet.config().daemon_manual_sync {
self.wallet.sync().await;
}
}
pub async fn run(self) {
info!("Starting daemon for wallet {}", self.wallet.fingerprint());
self.run_startup_tasks().await;
if self.wallet.config().daemon_manual_sync {
info!("Daemon running in manual-sync mode; background sync disabled");
let _ = self.run_server_connection_check_process().await;
} else {
let _ = futures::join!(
self.run_server_connection_check_process(),
self.run_round_events_process(),
self.run_sync_processes(),
self.run_mailbox_messages_process(),
);
}
info!("Daemon gracefully stopped");
}
}