Skip to main content

bark/
daemon.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use ark::rounds::RoundEvent;
6use futures::{FutureExt, StreamExt};
7use log::{info, trace, warn};
8use tokio::sync::RwLock;
9#[cfg(not(feature = "wasm-web"))]
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::Wallet;
14use crate::onchain::DaemonizableOnchainWallet;
15use crate::utils::time::sleep;
16
17
18
19/// A handle to a running background daemon
20#[cfg(not(feature = "wasm-web"))]
21pub struct DaemonHandle {
22	shutdown: CancellationToken,
23	jh: JoinHandle<()>,
24}
25
/// Handle to a daemon running in the background (WASM web builds).
///
/// This variant carries no join handle, only the shutdown token.
#[cfg(feature = "wasm-web")]
pub struct DaemonHandle {
	/// Token that is cancelled to ask the daemon to shut down.
	shutdown: CancellationToken,
}
31
32impl DaemonHandle {
33	/// Trigger the daemon process to stop
34	pub fn stop(&self) {
35		self.shutdown.cancel();
36	}
37
38	/// Stop the daemon process and wait for it to finish
39	pub async fn stop_wait(self) -> anyhow::Result<()> {
40		self.stop();
41		#[cfg(not(feature = "wasm-web"))]
42		self.jh.await?;
43		Ok(())
44	}
45}
46
47pub(crate) fn start_daemon(
48	wallet: Wallet,
49	onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
50) -> DaemonHandle {
51	let shutdown = CancellationToken::new();
52	let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
53
54	#[cfg(not(feature = "wasm-web"))]
55	{
56		let jh = crate::utils::spawn(proc.run());
57		DaemonHandle { shutdown, jh }
58	}
59	#[cfg(feature = "wasm-web")]
60	{
61		crate::utils::spawn(proc.run());
62		DaemonHandle { shutdown }
63	}
64}
65
66/// The daemon is responsible for running the wallet and performing the
67/// necessary actions to keep the wallet in a healthy state
68struct DaemonProcess {
69	shutdown: CancellationToken,
70
71	connected: AtomicBool,
72	wallet: Wallet,
73	onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
74}
75
76impl DaemonProcess {
77	fn new(
78		shutdown: CancellationToken,
79		wallet: Wallet,
80		onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
81	) -> DaemonProcess {
82		DaemonProcess {
83			connected: AtomicBool::new(false),
84			shutdown,
85			wallet,
86			onchain,
87		}
88	}
89
90	fn sync_interval(&self) -> Duration {
91		Duration::from_secs(self.wallet.config().daemon_sync_interval_secs)
92	}
93
94	/// Recursively resubscribe to mailbox message stream by waiting and
95	/// calling [Wallet::subscribe_store_mailbox_messages] again until
96	/// the daemon is shutdown.
97	///
98	/// The mailbox stream is always-on and sets `connected` to `false`
99	/// when it breaks, so other processes can back off.
100	async fn run_mailbox_messages_process(&self) {
101		loop {
102			let shutdown = self.shutdown.clone();
103			if self.connected.load(Ordering::Relaxed) {
104				let r = self.wallet.subscribe_process_mailbox_messages(None, shutdown).await;
105				if let Err(e) = r {
106					warn!("An error occurred while processing mailbox messages: {e:#}");
107					self.connected.store(false, Ordering::Relaxed);
108				}
109			}
110
111			futures::select! {
112				_ = sleep(self.sync_interval()).fuse() => {},
113				_ = self.shutdown.cancelled().fuse() => {
114					info!("Shutdown signal received! Shutting mailbox messages process...");
115					break;
116				},
117			}
118		}
119	}
120
121	/// Sync pending boards, register new ones if needed
122	async fn run_boards_sync(&self) {
123		if let Err(e) = self.wallet.sync_pending_boards().await {
124			warn!("An error occured while syncing pending board: {e:#}");
125		}
126	}
127
128	/// Sync pending offboards, check for confirmations
129	async fn run_offboards_sync(&self) {
130		if let Err(e) = self.wallet.sync_pending_offboards().await {
131			warn!("An error occured while syncing pending offboards: {e:#}");
132		}
133	}
134
135	/// Sync pending rounds, check for confirmations and finalize VTXOs
136	async fn run_rounds_sync(&self) {
137		if let Err(e) = self.wallet.sync_pending_rounds().await {
138			warn!("An error occured while syncing pending rounds: {e:#}");
139		}
140	}
141
142	/// Update cached fee rates from the chain source
143	async fn run_fee_rate_update(&self) {
144		if let Err(e) = self.wallet.chain().update_fee_rates(self.wallet.config().fallback_fee_rate).await {
145			warn!("An error occured while updating fee rates: {e:#}");
146		}
147	}
148
149	/// Sync onchain wallet
150	async fn run_onchain_sync(&self) {
151		if let Some(onchain) = &self.onchain {
152			let mut onchain = onchain.write().await;
153			if let Err(e) = onchain.sync(self.wallet.chain()).await {
154				warn!("An error occured while syncing onchain: {e:#}");
155			}
156		}
157	}
158
159	/// Progress any ongoing unilateral exits and sync the exit statuses
160	async fn run_exits(&self) {
161		if let Some(onchain) = &self.onchain {
162			let mut onchain = onchain.write().await;
163			if let Err(e) = self.wallet.exit_mgr().progress_exits_with_bdk(&self.wallet, &mut *onchain, None).await {
164				warn!("An error occurred while progressing exits: {e:#}");
165			}
166		}
167	}
168
169	async fn handle_round_event(&self, event: &RoundEvent) -> anyhow::Result<()> {
170		// Do a refresh if you need to
171		match &event {
172			&RoundEvent::Attempt(attempt) => {
173				if attempt.attempt_seq == 0 {
174					if let Err(err) = self.wallet.join_round_for_maintenance_refresh(attempt).await {
175						warn!("Failed to join round for maintenance refresh: {:#}", err);
176					}
177				};
178			},
179			_ => {},
180		};
181
182		self.wallet.progress_pending_rounds(Some(event)).await
183	}
184
185	/// Subscribe to the round event stream and process events
186	/// until it closes or the daemon shuts down.
187	async fn process_round_event_stream(&self) -> anyhow::Result<()> {
188		let mut events = self.wallet.subscribe_round_events().await?;
189
190		loop {
191			futures::select! {
192				res = events.next().fuse() => {
193					match res {
194						Some(Ok(event)) => {
195							if let Err(e) = self.handle_round_event(&event).await {
196								warn!("Error processing round event: {e:#}");
197							}
198						},
199						Some(Err(e)) => {
200							return Err(e.context("error on event stream"));
201						},
202						None => {
203							return Ok(());
204						},
205					}
206				},
207				_ = self.shutdown.cancelled().fuse() => {
208					info!("Shutdown signal received! Shutting round events stream...");
209					return Ok(());
210				},
211			}
212		}
213	}
214
215	/// Keep the round events subscription alive for the
216	/// lifetime of the daemon, reconnecting as needed.
217	async fn run_round_events_process(&self) {
218		loop {
219			if self.shutdown.is_cancelled() {
220				info!("Shutdown signal received! Shutting round events process...");
221				break;
222			}
223
224			match self.process_round_event_stream().await {
225				Ok(()) => {},
226				// A tonic h2 stream reset is almost always a
227				// proxy- or server-side idle timeout rather than
228				// a real failure; resubscribe quietly.
229				Err(e) if crate::utils::is_h2_stream_error(&e) => {
230					trace!("Round events stream reset by server, reconnecting: {e:#}");
231				},
232				Err(e) => {
233					warn!("An error occured while processing pending rounds: {e:#}");
234					futures::select! {
235						_ = sleep(self.sync_interval()).fuse() => {},
236						_ = self.shutdown.cancelled().fuse() => {
237							info!("Shutdown signal received! Shutting round events process...");
238							break;
239						},
240					}
241				},
242			}
243		}
244	}
245
246	/// Periodically try to reconnect when the server is not reachable.
247	///
248	/// Sets `connected` to `true` on success so the round-events
249	/// and mailbox streams start subscribing again.
250	async fn run_server_connection_check_process(&self) {
251		loop {
252			futures::select! {
253				_ = sleep(self.sync_interval()).fuse() => {},
254				_ = self.shutdown.cancelled().fuse() => {
255					info!("Shutdown signal received! Shutting server connection check process...");
256					break;
257				},
258			}
259
260			if self.connected.load(Ordering::Relaxed) {
261				continue;
262			}
263
264			let result = self.wallet.refresh_server().await;
265			if let Err(ref e) = result {
266				warn!("Ark server reconnect failed: {:#}", e);
267			} else {
268				info!("Ark server reconnected");
269				self.connected.store(true, Ordering::Relaxed);
270			}
271		}
272	}
273
274	async fn run_sync_processes(&self) {
275		let mut sync_interval = tokio::time::interval(self.sync_interval());
276
277		loop {
278			futures::select! {
279				_ = sync_interval.tick().fuse() => {
280					if self.connected.load(Ordering::Relaxed) {
281						self.run_fee_rate_update().await;
282						self.run_boards_sync().await;
283						self.run_offboards_sync().await;
284					}
285					self.run_onchain_sync().await;
286					self.run_rounds_sync().await;
287					self.run_exits().await;
288					sync_interval.reset();
289				},
290				_ = self.shutdown.cancelled().fuse() => {
291					info!("Shutdown signal received! Shutting sync processes...");
292					break;
293				},
294			}
295		}
296	}
297
298	/// Run processes that only need to be run once on startup
299	async fn run_startup_tasks(&self) {
300		// Eagerly refresh the server connection before starting the other
301		// daemon tasks so they don't race the first connection check and
302		// skip their initial iteration with `connected = false` (which
303		// would delay mailbox subscription by `slow_interval`).
304		let result = self.wallet.refresh_server().await;
305		if let Err(ref e) = result {
306			warn!("Ark server refresh failed: {:#}", e);
307		}
308		let connected = self.wallet.inner.server.initialized();
309		self.connected.store(connected, Ordering::Relaxed);
310
311		if !self.wallet.config().daemon_manual_sync {
312			self.wallet.sync().await;
313		}
314	}
315
316	pub async fn run(self) {
317		info!("Starting daemon for wallet {}", self.wallet.fingerprint());
318
319		self.run_startup_tasks().await;
320
321		if self.wallet.config().daemon_manual_sync {
322			// In manual-sync mode only the server connection heartbeat keeps
323			// running; everything else must be triggered via the REST API.
324			info!("Daemon running in manual-sync mode; background sync disabled");
325			let _ = self.run_server_connection_check_process().await;
326		} else {
327			#[cfg(not(feature = "wasm-web"))]
328			{
329				// Each loop runs in its own tokio task so that a panic in one
330				// (e.g. from a crafted round proposal) cannot silently kill the
331				// others — in particular exit monitoring / CPFP fee-bumping.
332				let proc = Arc::new(self);
333				let p1 = Arc::clone(&proc);
334				let p2 = Arc::clone(&proc);
335				let p3 = Arc::clone(&proc);
336				let p4 = Arc::clone(&proc);
337				let _ = futures::join!(
338					supervised("server-connection", move || {
339						let p = Arc::clone(&p1);
340						async move { p.run_server_connection_check_process().await }
341					}),
342					supervised("round-events", move || {
343						let p = Arc::clone(&p2);
344						async move { p.run_round_events_process().await }
345					}),
346					supervised("sync", move || {
347						let p = Arc::clone(&p3);
348						async move { p.run_sync_processes().await }
349					}),
350					supervised("mailbox", move || {
351						let p = Arc::clone(&p4);
352						async move { p.run_mailbox_messages_process().await }
353					}),
354				);
355			}
356			#[cfg(feature = "wasm-web")]
357			{
358				let _ = futures::join!(
359					self.run_server_connection_check_process(),
360					self.run_round_events_process(),
361					self.run_sync_processes(),
362					self.run_mailbox_messages_process(),
363				);
364			}
365		}
366
367		info!("Daemon gracefully stopped");
368	}
369}
370
371/// Run `f` in its own [`tokio::spawn`] task, restarting it if it panics.
372///
373/// A clean return (shutdown signal) breaks the loop immediately.
374#[cfg(not(feature = "wasm-web"))]
375async fn supervised<F, Fut>(name: &'static str, f: F)
376where
377	F: Fn() -> Fut,
378	Fut: std::future::Future<Output = ()> + Send + 'static,
379{
380	loop {
381		match tokio::spawn(f()).await {
382			Ok(()) => break,
383			Err(e) => {
384				warn!("Daemon task '{}' terminated unexpectedly, restarting: {e}", name);
385				tokio::time::sleep(Duration::from_secs(1)).await;
386			},
387		}
388	}
389}