Skip to main content

bark/
daemon.rs

1
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use anyhow::Context;
7use futures::{FutureExt, StreamExt};
8use log::{info, warn};
9use tokio::sync::RwLock;
10#[cfg(not(feature = "wasm-web"))]
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13
14use crate::Wallet;
15use crate::onchain::DaemonizableOnchainWallet;
16
17
18/// A handle to a running background daemon
19#[cfg(not(feature = "wasm-web"))]
20pub struct DaemonHandle {
21	shutdown: CancellationToken,
22	jh: JoinHandle<()>,
23}
24
25/// A handle to a running background daemon for WASM
26#[cfg(feature = "wasm-web")]
27pub struct DaemonHandle {
28	shutdown: CancellationToken,
29}
30
31impl DaemonHandle {
32	/// Trigger the daemon process to stop
33	pub fn stop(&self) {
34		self.shutdown.cancel();
35	}
36
37	/// Stop the daemon process and wait for it to finish
38	pub async fn stop_wait(self) -> anyhow::Result<()> {
39		self.stop();
40		#[cfg(not(feature = "wasm-web"))]
41		self.jh.await?;
42		Ok(())
43	}
44}
45
46pub(crate) fn start_daemon(
47	wallet: Arc<Wallet>,
48	onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
49) -> DaemonHandle {
50	let shutdown = CancellationToken::new();
51	let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
52
53	#[cfg(not(feature = "wasm-web"))]
54	{
55		let jh = crate::utils::spawn(proc.run());
56		DaemonHandle { shutdown, jh }
57	}
58	#[cfg(feature = "wasm-web")]
59	{
60		crate::utils::spawn(proc.run());
61		DaemonHandle { shutdown }
62	}
63}
64
65/// The daemon is responsible for running the wallet and performing the
66/// necessary actions to keep the wallet in a healthy state
67struct DaemonProcess {
68	shutdown: CancellationToken,
69
70	connected: AtomicBool,
71	wallet: Arc<Wallet>,
72	onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
73}
74
75impl DaemonProcess {
76	fn new(
77		shutdown: CancellationToken,
78		wallet: Arc<Wallet>,
79		onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
80	) -> DaemonProcess {
81		DaemonProcess {
82			connected: AtomicBool::new(false),
83			shutdown,
84			wallet,
85			onchain,
86		}
87	}
88
89	fn fast_interval(&self) -> Duration {
90		Duration::from_secs(self.wallet.config().daemon_fast_sync_interval_secs)
91	}
92
93	fn slow_interval(&self) -> Duration {
94		Duration::from_secs(self.wallet.config().daemon_slow_sync_interval_secs)
95	}
96
97	/// Run lightning sync process
98	/// - Try to claim all pending lightning receives
99	/// - Sync pending lightning sends
100	async fn run_lightning_sync(&self) {
101		if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
102			warn!("An error occured while checking and claiming pending lightning receives: {e:#}");
103		}
104
105		if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
106			warn!("An error occured while syncing pending lightning sends: {e:#}");
107		}
108	}
109
110	/// Recursively resubscribe to mailbox message stream by waiting and
111	/// calling [Wallet::subscribe_store_mailbox_messages] again until
112	/// the daemon is shutdown.
113	async fn run_mailbox_messages_process(&self) {
114		loop {
115			if self.connected.load(Ordering::Relaxed) {
116				if let Err(e) = self.wallet.subscribe_process_mailbox_messages(None).await {
117					warn!("An error occured while processing mailbox messages: {e:#}");
118				}
119			}
120
121			futures::select! {
122				_ = tokio::time::sleep(self.slow_interval()).fuse() => {},
123				_ = self.shutdown.cancelled().fuse() => {
124					info!("Shutdown signal received! Shutting mailbox messages process...");
125					break;
126				},
127			}
128		}
129	}
130
131	/// Sync pending boards, register new ones if needed
132	async fn run_boards_sync(&self) {
133		if let Err(e) = self.wallet.sync_pending_boards().await {
134			warn!("An error occured while syncing pending board: {e:#}");
135		}
136	}
137
138	/// Sync pending offboards, check for confirmations
139	async fn run_offboards_sync(&self) {
140		if let Err(e) = self.wallet.sync_pending_offboards().await {
141			warn!("An error occured while syncing pending offboards: {e:#}");
142		}
143	}
144
145	/// Update cached fee rates from the chain source
146	async fn run_fee_rate_update(&self) {
147		if let Err(e) = self.wallet.chain.update_fee_rates(self.wallet.config.fallback_fee_rate).await {
148			warn!("An error occured while updating fee rates: {e:#}");
149		}
150	}
151
152	/// Sync onchain wallet
153	async fn run_onchain_sync(&self) {
154		let mut onchain = self.onchain.write().await;
155		if let Err(e) = onchain.sync(&self.wallet.chain).await {
156			warn!("An error occured while syncing onchain: {e:#}");
157		}
158	}
159
160	/// Perform library built-in maintenance refresh
161	async fn run_maintenance_refresh_process(&self) {
162		if let Err(e) = self.wallet.maintenance_refresh().await {
163			warn!("An error occured while performing maintenance refresh: {e:#}");
164		}
165	}
166
167	/// Progress any ongoing unilateral exits and sync the exit statuses
168	async fn run_exits(&self) {
169		let mut onchain = self.onchain.write().await;
170
171		let mut exit_lock = self.wallet.exit.write().await;
172		if let Err(e) = exit_lock.sync_no_progress(&*onchain).await {
173			warn!("An error occurred while syncing exits: {e:#}");
174		}
175
176		if let Err(e) = exit_lock.progress_exits(&self.wallet, &mut *onchain, None).await {
177			warn!("An error occurred while progressing exits: {e:#}");
178		}
179	}
180
181	/// Subscribe to round event stream and process each incoming event
182	async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
183		let mut events = self.wallet.subscribe_round_events().await?;
184
185		loop {
186			futures::select! {
187				res = events.next().fuse() => {
188					let event = res.context("events stream broke")?
189						.context("error on event stream")?;
190
191					self.wallet.progress_pending_rounds(Some(&event)).await?;
192				},
193				_ = self.shutdown.cancelled().fuse() => {
194					info!("Shutdown signal received! Shutting inner round events process...");
195					return Ok(());
196				},
197			}
198		}
199	}
200
201	/// Recursively resubscribe to round event stream by waiting and
202	/// calling [Self::inner_process_pending_rounds] again until
203	/// the daemon is shutdown.
204	async fn run_round_events_process(&self) {
205		loop {
206			if self.connected.load(Ordering::Relaxed) {
207				if let Err(e) = self.inner_process_pending_rounds().await {
208					warn!("An error occured while processing pending rounds: {e:#}");
209				}
210			}
211
212			futures::select! {
213				_ = tokio::time::sleep(self.slow_interval()).fuse() => {},
214				_ = self.shutdown.cancelled().fuse() => {
215					info!("Shutdown signal received! Shutting round events process...");
216					break;
217				},
218			}
219		}
220	}
221
222	/// Run a process that will recursively check the server connection
223	async fn run_server_connection_check_process(&self) {
224		loop {
225			futures::select! {
226				_ = tokio::time::sleep(self.fast_interval()).fuse() => {},
227				_ = self.shutdown.cancelled().fuse() => {
228					info!("Shutdown signal received! Shutting server connection check process...");
229					break;
230				},
231			}
232
233			let connected = self.wallet.refresh_server().await.is_ok();
234			self.connected.store(connected, Ordering::Relaxed);
235		}
236	}
237
238	async fn run_sync_processes(&self) {
239		let mut fast_interval = tokio::time::interval(self.fast_interval());
240		fast_interval.reset();
241		let mut slow_interval = tokio::time::interval(self.slow_interval());
242		slow_interval.reset();
243
244		loop {
245			futures::select! {
246				_ = fast_interval.tick().fuse() => {
247					if !self.connected.load(Ordering::Relaxed) {
248						continue;
249					}
250
251					self.run_lightning_sync().await;
252					fast_interval.reset();
253				},
254				_ = slow_interval.tick().fuse() => {
255					if !self.connected.load(Ordering::Relaxed) {
256						continue;
257					}
258
259					self.run_fee_rate_update().await;
260					self.run_boards_sync().await;
261					self.run_offboards_sync().await;
262					self.run_maintenance_refresh_process().await;
263					self.run_onchain_sync().await;
264					self.run_exits().await;
265					slow_interval.reset();
266				},
267				_ = self.shutdown.cancelled().fuse() => {
268					info!("Shutdown signal received! Shutting sync processes...");
269					break;
270				},
271			}
272		}
273	}
274
275	pub async fn run(self) {
276		let connected = self.wallet.server.read().is_some();
277		self.connected.store(connected, Ordering::Relaxed);
278
279		let _ = futures::join!(
280			self.run_server_connection_check_process(),
281			self.run_round_events_process(),
282			self.run_sync_processes(),
283			self.run_mailbox_messages_process(),
284		);
285
286		info!("Daemon gracefully stopped");
287	}
288}