Skip to main content

bark/
daemon.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use ark::rounds::RoundEvent;
6use futures::{FutureExt, StreamExt};
7use log::{info, trace, warn};
8use tokio::sync::RwLock;
9#[cfg(not(feature = "wasm-web"))]
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::Wallet;
14use crate::onchain::DaemonizableOnchainWallet;
15
16
17
18/// A handle to a running background daemon
19#[cfg(not(feature = "wasm-web"))]
20pub struct DaemonHandle {
21	shutdown: CancellationToken,
22	jh: JoinHandle<()>,
23}
24
25/// A handle to a running background daemon for WASM
26#[cfg(feature = "wasm-web")]
27pub struct DaemonHandle {
28	shutdown: CancellationToken,
29}
30
31impl DaemonHandle {
32	/// Trigger the daemon process to stop
33	pub fn stop(&self) {
34		self.shutdown.cancel();
35	}
36
37	/// Stop the daemon process and wait for it to finish
38	pub async fn stop_wait(self) -> anyhow::Result<()> {
39		self.stop();
40		#[cfg(not(feature = "wasm-web"))]
41		self.jh.await?;
42		Ok(())
43	}
44}
45
46pub(crate) fn start_daemon(
47	wallet: Wallet,
48	onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
49) -> DaemonHandle {
50	let shutdown = CancellationToken::new();
51	let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
52
53	#[cfg(not(feature = "wasm-web"))]
54	{
55		let jh = crate::utils::spawn(proc.run());
56		DaemonHandle { shutdown, jh }
57	}
58	#[cfg(feature = "wasm-web")]
59	{
60		crate::utils::spawn(proc.run());
61		DaemonHandle { shutdown }
62	}
63}
64
65/// The daemon is responsible for running the wallet and performing the
66/// necessary actions to keep the wallet in a healthy state
67struct DaemonProcess {
68	shutdown: CancellationToken,
69
70	connected: AtomicBool,
71	wallet: Wallet,
72	onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
73}
74
75impl DaemonProcess {
76	fn new(
77		shutdown: CancellationToken,
78		wallet: Wallet,
79		onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
80	) -> DaemonProcess {
81		DaemonProcess {
82			connected: AtomicBool::new(false),
83			shutdown,
84			wallet,
85			onchain,
86		}
87	}
88
89	fn sync_interval(&self) -> Duration {
90		Duration::from_secs(self.wallet.config().daemon_sync_interval_secs)
91	}
92
93	/// Recursively resubscribe to mailbox message stream by waiting and
94	/// calling [Wallet::subscribe_store_mailbox_messages] again until
95	/// the daemon is shutdown.
96	async fn run_mailbox_messages_process(&self) {
97		loop {
98			let shutdown = self.shutdown.clone();
99			if self.connected.load(Ordering::Relaxed) {
100				let r = self.wallet.subscribe_process_mailbox_messages(None, shutdown).await;
101				if let Err(e) = r {
102					warn!("An error occurred while processing mailbox messages: {e:#}");
103				}
104			}
105
106			futures::select! {
107				_ = tokio::time::sleep(self.sync_interval()).fuse() => {},
108				_ = self.shutdown.cancelled().fuse() => {
109					info!("Shutdown signal received! Shutting mailbox messages process...");
110					break;
111				},
112			}
113		}
114	}
115
116	/// Sync pending boards, register new ones if needed
117	async fn run_boards_sync(&self) {
118		if let Err(e) = self.wallet.sync_pending_boards().await {
119			warn!("An error occured while syncing pending board: {e:#}");
120		}
121	}
122
123	/// Sync pending offboards, check for confirmations
124	async fn run_offboards_sync(&self) {
125		if let Err(e) = self.wallet.sync_pending_offboards().await {
126			warn!("An error occured while syncing pending offboards: {e:#}");
127		}
128	}
129
130	/// Sync pending rounds, check for confirmations and finalize VTXOs
131	async fn run_rounds_sync(&self) {
132		if let Err(e) = self.wallet.sync_pending_rounds().await {
133			warn!("An error occured while syncing pending rounds: {e:#}");
134		}
135	}
136
137	/// Update cached fee rates from the chain source
138	async fn run_fee_rate_update(&self) {
139		if let Err(e) = self.wallet.chain().update_fee_rates(self.wallet.config().fallback_fee_rate).await {
140			warn!("An error occured while updating fee rates: {e:#}");
141		}
142	}
143
144	/// Sync onchain wallet
145	async fn run_onchain_sync(&self) {
146		if let Some(onchain) = &self.onchain {
147			let mut onchain = onchain.write().await;
148			if let Err(e) = onchain.sync(self.wallet.chain()).await {
149				warn!("An error occured while syncing onchain: {e:#}");
150			}
151		}
152	}
153
154	/// Progress any ongoing unilateral exits and sync the exit statuses
155	async fn run_exits(&self) {
156		if let Some(onchain) = &self.onchain {
157			let mut onchain = onchain.write().await;
158			if let Err(e) = self.wallet.exit_mgr().sync_no_progress(&*onchain).await {
159				warn!("An error occurred while syncing exits: {e:#}");
160			}
161
162			if let Err(e) = self.wallet.exit_mgr().progress_exits(&self.wallet, &mut *onchain, None).await {
163				warn!("An error occurred while progressing exits: {e:#}");
164			}
165		}
166	}
167
168	async fn handle_round_event(&self, event: &RoundEvent) -> anyhow::Result<()> {
169		// Do a refresh if you need to
170		match &event {
171			&RoundEvent::Attempt(attempt) => {
172				if attempt.attempt_seq == 0 {
173					match self.wallet.maybe_schedule_maintenance_refresh().await {
174						Ok(_) => {},
175						Err(err) => warn!("Failed to schedule maintenance refresh: {:?}", err),
176					}
177				};
178			},
179			_ => {},
180		};
181
182		self.wallet.progress_pending_rounds(Some(event)).await
183	}
184
185	/// Subscribe to the round event stream and process events
186	/// until it closes or the daemon shuts down.
187	async fn process_round_event_stream(&self) -> anyhow::Result<()> {
188		let mut events = self.wallet.subscribe_round_events().await?;
189		self.connected.store(true, Ordering::Relaxed);
190
191		loop {
192			futures::select! {
193				res = events.next().fuse() => {
194					match res {
195						Some(Ok(event)) => {
196							if let Err(e) = self.handle_round_event(&event).await {
197								warn!("Error processing round event: {e:#}");
198							}
199						},
200						Some(Err(e)) => {
201							return Err(e.context("error on event stream"));
202						},
203						None => {
204							return Ok(());
205						},
206					}
207				},
208				_ = self.shutdown.cancelled().fuse() => {
209					info!("Shutdown signal received! Shutting round events stream...");
210					return Ok(());
211				},
212			}
213		}
214	}
215
216	/// Keep the round events subscription alive for the
217	/// lifetime of the daemon, reconnecting as needed.
218	async fn run_round_events_process(&self) {
219		loop {
220			if self.shutdown.is_cancelled() {
221				info!("Shutdown signal received! Shutting round events process...");
222				break;
223			}
224
225			let started_at = std::time::Instant::now();
226			if let Err(e) = self.process_round_event_stream().await {
227				warn!("An error occured while processing pending rounds: {e:#}");
228			}
229
230			if started_at.elapsed() >= crate::HEALTHY_STREAM_DURATION {
231				trace!("Round events stream closed after healthy session, reconnecting");
232				continue;
233			}
234
235			self.connected.store(false, Ordering::Relaxed);
236
237			futures::select! {
238				_ = tokio::time::sleep(self.sync_interval()).fuse() => {},
239				_ = self.shutdown.cancelled().fuse() => {
240					info!("Shutdown signal received! Shutting round events process...");
241					break;
242				},
243			}
244		}
245	}
246
247	/// Periodically try to reconnect when the server is not reachable.
248	///
249	/// When connected, the round-events stream drives the `connected`
250	/// flag — this process only kicks in to re-establish the initial
251	/// connection so the streams can take over again.
252	async fn run_server_connection_check_process(&self) {
253		loop {
254			futures::select! {
255				_ = tokio::time::sleep(self.sync_interval()).fuse() => {},
256				_ = self.shutdown.cancelled().fuse() => {
257					info!("Shutdown signal received! Shutting server connection check process...");
258					break;
259				},
260			}
261
262			if self.connected.load(Ordering::Relaxed) {
263				continue;
264			}
265
266			let result = self.wallet.refresh_server().await;
267			let connected = result.is_ok();
268			if let Err(ref e) = result {
269				warn!("Ark server reconnect failed: {:#}", e);
270			} else {
271				info!("Ark server reconnected");
272			}
273			self.connected.store(connected, Ordering::Relaxed);
274		}
275	}
276
277	async fn run_sync_processes(&self) {
278		let mut sync_interval = tokio::time::interval(self.sync_interval());
279
280		loop {
281			futures::select! {
282				_ = sync_interval.tick().fuse() => {
283					if self.connected.load(Ordering::Relaxed) {
284						self.run_fee_rate_update().await;
285						self.run_boards_sync().await;
286						self.run_offboards_sync().await;
287					}
288					self.run_onchain_sync().await;
289					self.run_rounds_sync().await;
290					self.run_exits().await;
291					sync_interval.reset();
292				},
293				_ = self.shutdown.cancelled().fuse() => {
294					info!("Shutdown signal received! Shutting sync processes...");
295					break;
296				},
297			}
298		}
299	}
300
301	/// Run processes that only need to be run once on startup
302	async fn run_startup_tasks(&self) {
303		// Eagerly refresh the server connection before starting the other
304		// daemon tasks so they don't race the first connection check and
305		// skip their initial iteration with `connected = false` (which
306		// would delay mailbox subscription by `slow_interval`).
307		let result = self.wallet.refresh_server().await;
308		if let Err(ref e) = result {
309			warn!("Ark server refresh failed: {:#}", e);
310		}
311		let connected = self.wallet.inner.server.initialized();
312		self.connected.store(connected, Ordering::Relaxed);
313
314		if !self.wallet.config().daemon_manual_sync {
315			self.wallet.sync().await;
316		}
317	}
318
319	pub async fn run(self) {
320		info!("Starting daemon for wallet {}", self.wallet.fingerprint());
321
322		self.run_startup_tasks().await;
323
324		if self.wallet.config().daemon_manual_sync {
325			// In manual-sync mode only the server connection heartbeat keeps
326			// running; everything else must be triggered via the REST API.
327			info!("Daemon running in manual-sync mode; background sync disabled");
328			let _ = self.run_server_connection_check_process().await;
329		} else {
330			let _ = futures::join!(
331				self.run_server_connection_check_process(),
332				self.run_round_events_process(),
333				self.run_sync_processes(),
334				self.run_mailbox_messages_process(),
335			);
336		}
337
338		info!("Daemon gracefully stopped");
339	}
340}