Skip to main content

bark/
daemon.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use ark::rounds::RoundEvent;
6use futures::{FutureExt, StreamExt};
7use log::{info, trace, warn};
8use tokio::sync::RwLock;
9#[cfg(not(feature = "wasm-web"))]
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::Wallet;
14use crate::onchain::DaemonizableOnchainWallet;
15use crate::utils::time::sleep;
16
17
18
19/// A handle to a running background daemon
20#[cfg(not(feature = "wasm-web"))]
21pub struct DaemonHandle {
22	shutdown: CancellationToken,
23	jh: JoinHandle<()>,
24}
25
26/// A handle to a running background daemon for WASM
27#[cfg(feature = "wasm-web")]
28pub struct DaemonHandle {
29	shutdown: CancellationToken,
30}
31
32impl DaemonHandle {
33	/// Trigger the daemon process to stop
34	pub fn stop(&self) {
35		self.shutdown.cancel();
36	}
37
38	/// Stop the daemon process and wait for it to finish
39	pub async fn stop_wait(self) -> anyhow::Result<()> {
40		self.stop();
41		#[cfg(not(feature = "wasm-web"))]
42		self.jh.await?;
43		Ok(())
44	}
45}
46
47pub(crate) fn start_daemon(
48	wallet: Wallet,
49	onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
50) -> DaemonHandle {
51	let shutdown = CancellationToken::new();
52	let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
53
54	#[cfg(not(feature = "wasm-web"))]
55	{
56		let jh = crate::utils::spawn(proc.run());
57		DaemonHandle { shutdown, jh }
58	}
59	#[cfg(feature = "wasm-web")]
60	{
61		crate::utils::spawn(proc.run());
62		DaemonHandle { shutdown }
63	}
64}
65
66/// The daemon is responsible for running the wallet and performing the
67/// necessary actions to keep the wallet in a healthy state
68struct DaemonProcess {
69	shutdown: CancellationToken,
70
71	connected: AtomicBool,
72	wallet: Wallet,
73	onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
74}
75
76impl DaemonProcess {
77	fn new(
78		shutdown: CancellationToken,
79		wallet: Wallet,
80		onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
81	) -> DaemonProcess {
82		DaemonProcess {
83			connected: AtomicBool::new(false),
84			shutdown,
85			wallet,
86			onchain,
87		}
88	}
89
90	fn sync_interval(&self) -> Duration {
91		Duration::from_secs(self.wallet.config().daemon_sync_interval_secs)
92	}
93
94	/// Recursively resubscribe to mailbox message stream by waiting and
95	/// calling [Wallet::subscribe_store_mailbox_messages] again until
96	/// the daemon is shutdown.
97	///
98	/// The mailbox stream is always-on and sets `connected` to `false`
99	/// when it breaks, so other processes can back off.
100	async fn run_mailbox_messages_process(&self) {
101		loop {
102			let shutdown = self.shutdown.clone();
103			if self.connected.load(Ordering::Relaxed) {
104				let r = self.wallet.subscribe_process_mailbox_messages(None, shutdown).await;
105				if let Err(e) = r {
106					warn!("An error occurred while processing mailbox messages: {e:#}");
107					self.connected.store(false, Ordering::Relaxed);
108				}
109			}
110
111			futures::select! {
112				_ = sleep(self.sync_interval()).fuse() => {},
113				_ = self.shutdown.cancelled().fuse() => {
114					info!("Shutdown signal received! Shutting mailbox messages process...");
115					break;
116				},
117			}
118		}
119	}
120
121	/// Sync pending boards, register new ones if needed
122	async fn run_boards_sync(&self) {
123		if let Err(e) = self.wallet.sync_pending_boards().await {
124			warn!("An error occured while syncing pending board: {e:#}");
125		}
126	}
127
128	/// Sync pending offboards, check for confirmations
129	async fn run_offboards_sync(&self) {
130		if let Err(e) = self.wallet.sync_pending_offboards().await {
131			warn!("An error occured while syncing pending offboards: {e:#}");
132		}
133	}
134
135	/// Sync pending rounds, check for confirmations and finalize VTXOs
136	async fn run_rounds_sync(&self) {
137		if let Err(e) = self.wallet.sync_pending_rounds().await {
138			warn!("An error occured while syncing pending rounds: {e:#}");
139		}
140	}
141
142	/// Update cached fee rates from the chain source
143	async fn run_fee_rate_update(&self) {
144		if let Err(e) = self.wallet.chain().update_fee_rates(self.wallet.config().fallback_fee_rate).await {
145			warn!("An error occured while updating fee rates: {e:#}");
146		}
147	}
148
149	/// Sync onchain wallet
150	async fn run_onchain_sync(&self) {
151		if let Some(onchain) = &self.onchain {
152			let mut onchain = onchain.write().await;
153			if let Err(e) = onchain.sync(self.wallet.chain()).await {
154				warn!("An error occured while syncing onchain: {e:#}");
155			}
156		}
157	}
158
159	/// Progress any ongoing unilateral exits and sync the exit statuses
160	async fn run_exits(&self) {
161		if let Some(onchain) = &self.onchain {
162			let mut onchain = onchain.write().await;
163			if let Err(e) = self.wallet.exit_mgr().progress_exits_with_bdk(&self.wallet, &mut *onchain, None).await {
164				warn!("An error occurred while progressing exits: {e:#}");
165			}
166		}
167	}
168
169	async fn handle_round_event(&self, event: &RoundEvent) -> anyhow::Result<()> {
170		// Do a refresh if you need to
171		match &event {
172			&RoundEvent::Attempt(attempt) => {
173				if attempt.attempt_seq == 0 {
174					match self.wallet.maybe_schedule_maintenance_refresh().await {
175						Ok(_) => {},
176						Err(err) => warn!("Failed to schedule maintenance refresh: {:?}", err),
177					}
178				};
179			},
180			_ => {},
181		};
182
183		self.wallet.progress_pending_rounds(Some(event)).await
184	}
185
186	/// Subscribe to the round event stream and process events
187	/// until it closes or the daemon shuts down.
188	async fn process_round_event_stream(&self) -> anyhow::Result<()> {
189		let mut events = self.wallet.subscribe_round_events().await?;
190
191		loop {
192			futures::select! {
193				res = events.next().fuse() => {
194					match res {
195						Some(Ok(event)) => {
196							if let Err(e) = self.handle_round_event(&event).await {
197								warn!("Error processing round event: {e:#}");
198							}
199						},
200						Some(Err(e)) => {
201							return Err(e.context("error on event stream"));
202						},
203						None => {
204							return Ok(());
205						},
206					}
207				},
208				_ = self.shutdown.cancelled().fuse() => {
209					info!("Shutdown signal received! Shutting round events stream...");
210					return Ok(());
211				},
212			}
213		}
214	}
215
216	/// Keep the round events subscription alive for the
217	/// lifetime of the daemon, reconnecting as needed.
218	async fn run_round_events_process(&self) {
219		loop {
220			if self.shutdown.is_cancelled() {
221				info!("Shutdown signal received! Shutting round events process...");
222				break;
223			}
224
225			let started_at = std::time::Instant::now();
226			if let Err(e) = self.process_round_event_stream().await {
227				warn!("An error occured while processing pending rounds: {e:#}");
228			}
229
230			if started_at.elapsed() >= crate::HEALTHY_STREAM_DURATION {
231				trace!("Round events stream closed after healthy session, reconnecting");
232				continue;
233			}
234
235			futures::select! {
236				_ = sleep(self.sync_interval()).fuse() => {},
237				_ = self.shutdown.cancelled().fuse() => {
238					info!("Shutdown signal received! Shutting round events process...");
239					break;
240				},
241			}
242		}
243	}
244
245	/// Periodically try to reconnect when the server is not reachable.
246	///
247	/// Sets `connected` to `true` on success so the round-events
248	/// and mailbox streams start subscribing again.
249	async fn run_server_connection_check_process(&self) {
250		loop {
251			futures::select! {
252				_ = sleep(self.sync_interval()).fuse() => {},
253				_ = self.shutdown.cancelled().fuse() => {
254					info!("Shutdown signal received! Shutting server connection check process...");
255					break;
256				},
257			}
258
259			if self.connected.load(Ordering::Relaxed) {
260				continue;
261			}
262
263			let result = self.wallet.refresh_server().await;
264			if let Err(ref e) = result {
265				warn!("Ark server reconnect failed: {:#}", e);
266			} else {
267				info!("Ark server reconnected");
268				self.connected.store(true, Ordering::Relaxed);
269			}
270		}
271	}
272
273	async fn run_sync_processes(&self) {
274		let mut sync_interval = tokio::time::interval(self.sync_interval());
275
276		loop {
277			futures::select! {
278				_ = sync_interval.tick().fuse() => {
279					if self.connected.load(Ordering::Relaxed) {
280						self.run_fee_rate_update().await;
281						self.run_boards_sync().await;
282						self.run_offboards_sync().await;
283					}
284					self.run_onchain_sync().await;
285					self.run_rounds_sync().await;
286					self.run_exits().await;
287					sync_interval.reset();
288				},
289				_ = self.shutdown.cancelled().fuse() => {
290					info!("Shutdown signal received! Shutting sync processes...");
291					break;
292				},
293			}
294		}
295	}
296
297	/// Run processes that only need to be run once on startup
298	async fn run_startup_tasks(&self) {
299		// Eagerly refresh the server connection before starting the other
300		// daemon tasks so they don't race the first connection check and
301		// skip their initial iteration with `connected = false` (which
302		// would delay mailbox subscription by `slow_interval`).
303		let result = self.wallet.refresh_server().await;
304		if let Err(ref e) = result {
305			warn!("Ark server refresh failed: {:#}", e);
306		}
307		let connected = self.wallet.inner.server.initialized();
308		self.connected.store(connected, Ordering::Relaxed);
309
310		if !self.wallet.config().daemon_manual_sync {
311			self.wallet.sync().await;
312		}
313	}
314
315	pub async fn run(self) {
316		info!("Starting daemon for wallet {}", self.wallet.fingerprint());
317
318		self.run_startup_tasks().await;
319
320		if self.wallet.config().daemon_manual_sync {
321			// In manual-sync mode only the server connection heartbeat keeps
322			// running; everything else must be triggered via the REST API.
323			info!("Daemon running in manual-sync mode; background sync disabled");
324			let _ = self.run_server_connection_check_process().await;
325		} else {
326			let _ = futures::join!(
327				self.run_server_connection_check_process(),
328				self.run_round_events_process(),
329				self.run_sync_processes(),
330				self.run_mailbox_messages_process(),
331			);
332		}
333
334		info!("Daemon gracefully stopped");
335	}
336}