Skip to main content

bark/
daemon.rs

1
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use anyhow::Context;
7use futures::{FutureExt, StreamExt};
8use log::{info, warn};
9use tokio::sync::RwLock;
10#[cfg(not(feature = "wasm-web"))]
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13
14use crate::Wallet;
15use crate::onchain::DaemonizableOnchainWallet;
16
17
18/// A handle to a running background daemon
19#[cfg(not(feature = "wasm-web"))]
20pub struct DaemonHandle {
21	shutdown: CancellationToken,
22	jh: JoinHandle<()>,
23}
24
25/// A handle to a running background daemon for WASM
26#[cfg(feature = "wasm-web")]
27pub struct DaemonHandle {
28	shutdown: CancellationToken,
29}
30
31impl DaemonHandle {
32	/// Trigger the daemon process to stop
33	pub fn stop(&self) {
34		self.shutdown.cancel();
35	}
36
37	/// Stop the daemon process and wait for it to finish
38	pub async fn stop_wait(self) -> anyhow::Result<()> {
39		self.stop();
40		#[cfg(not(feature = "wasm-web"))]
41		self.jh.await?;
42		Ok(())
43	}
44}
45
46pub(crate) fn start_daemon(
47	wallet: Arc<Wallet>,
48	onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
49) -> DaemonHandle {
50	let shutdown = CancellationToken::new();
51	let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
52
53	#[cfg(not(feature = "wasm-web"))]
54	{
55		let jh = crate::utils::spawn(proc.run());
56		DaemonHandle { shutdown, jh }
57	}
58	#[cfg(feature = "wasm-web")]
59	{
60		crate::utils::spawn(proc.run());
61		DaemonHandle { shutdown }
62	}
63}
64
65/// The daemon is responsible for running the wallet and performing the
66/// necessary actions to keep the wallet in a healthy state
67struct DaemonProcess {
68	shutdown: CancellationToken,
69
70	connected: AtomicBool,
71	wallet: Arc<Wallet>,
72	onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
73}
74
75impl DaemonProcess {
76	fn new(
77		shutdown: CancellationToken,
78		wallet: Arc<Wallet>,
79		onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
80	) -> DaemonProcess {
81		DaemonProcess {
82			connected: AtomicBool::new(false),
83			shutdown,
84			wallet,
85			onchain,
86		}
87	}
88
89	fn fast_interval(&self) -> Duration {
90		Duration::from_secs(self.wallet.config().daemon_fast_sync_interval_secs)
91	}
92
93	fn slow_interval(&self) -> Duration {
94		Duration::from_secs(self.wallet.config().daemon_slow_sync_interval_secs)
95	}
96
97	/// Run lightning sync process
98	/// - Try to claim all pending lightning receives
99	/// - Sync pending lightning sends
100	async fn run_lightning_sync(&self) {
101		if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
102			warn!("An error occured while checking and claiming pending lightning receives: {e:#}");
103		}
104
105		if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
106			warn!("An error occured while syncing pending lightning sends: {e:#}");
107		}
108	}
109
110	/// Recursively resubscribe to mailbox message stream by waiting and
111	/// calling [Wallet::subscribe_store_mailbox_messages] again until
112	/// the daemon is shutdown.
113	async fn run_mailbox_messages_process(&self) {
114		loop {
115			let shutdown = self.shutdown.clone();
116			if self.connected.load(Ordering::Relaxed) {
117				let r = self.wallet.subscribe_process_mailbox_messages(None, shutdown).await;
118				if let Err(e) = r {
119					warn!("An error occurred while processing mailbox messages: {e:#}");
120				}
121			}
122
123			futures::select! {
124				_ = tokio::time::sleep(self.slow_interval()).fuse() => {},
125				_ = self.shutdown.cancelled().fuse() => {
126					info!("Shutdown signal received! Shutting mailbox messages process...");
127					break;
128				},
129			}
130		}
131	}
132
133	/// Sync pending boards, register new ones if needed
134	async fn run_boards_sync(&self) {
135		if let Err(e) = self.wallet.sync_pending_boards().await {
136			warn!("An error occured while syncing pending board: {e:#}");
137		}
138	}
139
140	/// Sync pending offboards, check for confirmations
141	async fn run_offboards_sync(&self) {
142		if let Err(e) = self.wallet.sync_pending_offboards().await {
143			warn!("An error occured while syncing pending offboards: {e:#}");
144		}
145	}
146
147	/// Update cached fee rates from the chain source
148	async fn run_fee_rate_update(&self) {
149		if let Err(e) = self.wallet.chain.update_fee_rates(self.wallet.config.fallback_fee_rate).await {
150			warn!("An error occured while updating fee rates: {e:#}");
151		}
152	}
153
154	/// Sync onchain wallet
155	async fn run_onchain_sync(&self) {
156		if let Some(onchain) = &self.onchain {
157			let mut onchain = onchain.write().await;
158			if let Err(e) = onchain.sync(&self.wallet.chain).await {
159				warn!("An error occured while syncing onchain: {e:#}");
160			}
161		}
162	}
163
164	/// Perform library built-in maintenance refresh
165	async fn run_maintenance_refresh_process(&self) {
166		if let Err(e) = self.wallet.maintenance_refresh().await {
167			warn!("An error occured while performing maintenance refresh: {e:#}");
168		}
169	}
170
171	/// Progress any ongoing unilateral exits and sync the exit statuses
172	async fn run_exits(&self) {
173		if let Some(onchain) = &self.onchain {
174			let mut onchain = onchain.write().await;
175			let mut exit_lock = self.wallet.exit.write().await;
176			if let Err(e) = exit_lock.sync_no_progress(&*onchain).await {
177				warn!("An error occurred while syncing exits: {e:#}");
178			}
179
180			if let Err(e) = exit_lock.progress_exits(&self.wallet, &mut *onchain, None).await {
181				warn!("An error occurred while progressing exits: {e:#}");
182			}
183		}
184	}
185
186	/// Subscribe to round event stream and process each incoming event
187	async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
188		let mut events = self.wallet.subscribe_round_events().await?;
189
190		loop {
191			futures::select! {
192				res = events.next().fuse() => {
193					let event = res.context("events stream broke")?
194						.context("error on event stream")?;
195
196					self.wallet.progress_pending_rounds(Some(&event)).await?;
197				},
198				_ = self.shutdown.cancelled().fuse() => {
199					info!("Shutdown signal received! Shutting inner round events process...");
200					return Ok(());
201				},
202			}
203		}
204	}
205
206	/// Recursively resubscribe to round event stream by waiting and
207	/// calling [Self::inner_process_pending_rounds] again until
208	/// the daemon is shutdown.
209	async fn run_round_events_process(&self) {
210		loop {
211			if self.connected.load(Ordering::Relaxed) {
212				if let Err(e) = self.inner_process_pending_rounds().await {
213					warn!("An error occured while processing pending rounds: {e:#}");
214				}
215			}
216
217			futures::select! {
218				_ = tokio::time::sleep(self.slow_interval()).fuse() => {},
219				_ = self.shutdown.cancelled().fuse() => {
220					info!("Shutdown signal received! Shutting round events process...");
221					break;
222				},
223			}
224		}
225	}
226
227	/// Run a process that will recursively check the server connection
228	async fn run_server_connection_check_process(&self) {
229		loop {
230			futures::select! {
231				_ = tokio::time::sleep(self.fast_interval()).fuse() => {},
232				_ = self.shutdown.cancelled().fuse() => {
233					info!("Shutdown signal received! Shutting server connection check process...");
234					break;
235				},
236			}
237
238			let connected = self.wallet.refresh_server().await.is_ok();
239			self.connected.store(connected, Ordering::Relaxed);
240		}
241	}
242
243	async fn run_sync_processes(&self) {
244		let mut fast_interval = tokio::time::interval(self.fast_interval());
245		fast_interval.reset();
246		let mut slow_interval = tokio::time::interval(self.slow_interval());
247		slow_interval.reset();
248
249		loop {
250			futures::select! {
251				_ = fast_interval.tick().fuse() => {
252					if !self.connected.load(Ordering::Relaxed) {
253						continue;
254					}
255
256					self.run_lightning_sync().await;
257					fast_interval.reset();
258				},
259				_ = slow_interval.tick().fuse() => {
260					if !self.connected.load(Ordering::Relaxed) {
261						continue;
262					}
263
264					self.run_fee_rate_update().await;
265					self.run_boards_sync().await;
266					self.run_offboards_sync().await;
267					self.run_maintenance_refresh_process().await;
268					self.run_onchain_sync().await;
269					self.run_exits().await;
270					slow_interval.reset();
271				},
272				_ = self.shutdown.cancelled().fuse() => {
273					info!("Shutdown signal received! Shutting sync processes...");
274					break;
275				},
276			}
277		}
278	}
279
280	pub async fn run(self) {
281		let connected = self.wallet.server.read().is_some();
282		self.connected.store(connected, Ordering::Relaxed);
283
284		let _ = futures::join!(
285			self.run_server_connection_check_process(),
286			self.run_round_events_process(),
287			self.run_sync_processes(),
288			self.run_mailbox_messages_process(),
289		);
290
291		info!("Daemon gracefully stopped");
292	}
293}