Skip to main content

bark/
daemon.rs

1
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use anyhow::Context;
7use futures::StreamExt;
8use log::{info, warn};
9use tokio::sync::RwLock;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::Wallet;
14use crate::onchain::DaemonizableOnchainWallet;
15
/// Shortest polling period: paces the server connection check and the
/// lightning sync
const FAST_INTERVAL: Duration = Duration::from_secs(1);
/// Intermediate polling period: paces the arkoor and board syncs
const MEDIUM_INTERVAL: Duration = Duration::from_secs(30);
/// Longest polling period: paces the onchain sync, the exit progression,
/// the maintenance refresh and the round event resubscription
const SLOW_INTERVAL: Duration = Duration::from_secs(60);
19
20/// A handle to a running background daemon
21pub struct DaemonHandle {
22	shutdown: CancellationToken,
23	jh: JoinHandle<()>,
24}
25
26impl DaemonHandle {
27	/// Trigger the daemon process to stop
28	pub fn stop(&self) {
29		self.shutdown.cancel();
30	}
31
32	/// Stop the daemon process and wait for it to finish
33	pub async fn stop_wait(self) -> anyhow::Result<()> {
34		self.stop();
35		self.jh.await?;
36		Ok(())
37	}
38}
39
40pub(crate) fn start_daemon(
41	wallet: Arc<Wallet>,
42	onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
43) -> DaemonHandle {
44	let shutdown = CancellationToken::new();
45	let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
46
47	let jh = tokio::spawn(proc.run());
48
49	DaemonHandle { shutdown, jh }
50}
51
52/// The daemon is responsible for running the wallet and performing the
53/// necessary actions to keep the wallet in a healthy state
54struct DaemonProcess {
55	shutdown: CancellationToken,
56
57	connected: AtomicBool,
58	wallet: Arc<Wallet>,
59	onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
60}
61
62impl DaemonProcess {
63	fn new(
64		shutdown: CancellationToken,
65		wallet: Arc<Wallet>,
66		onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
67	) -> DaemonProcess {
68		DaemonProcess {
69			connected: AtomicBool::new(false),
70			shutdown,
71			wallet,
72			onchain,
73		}
74	}
75
76	/// Run lightning sync process
77	/// - Try to claim all pending lightning receives
78	/// - Sync pending lightning sends
79	async fn run_lightning_sync(&self) {
80		if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
81			warn!("An error occured while checking and claiming pending lightning receives: {e}");
82		}
83
84		if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
85			warn!("An error occured while syncing pending lightning sends: {e}");
86		}
87	}
88
89	/// Check for incoming arkoors
90	async fn run_arkoors_sync(&self) {
91		if let Err(e) = self.wallet.sync_oors().await {
92			warn!("An error occured while syncing oors: {e}");
93		}
94	}
95
96	/// Sync pending boards, register new ones if needed
97	async fn run_boards_sync(&self) {
98		if let Err(e) = self.wallet.sync_pending_boards().await {
99			warn!("An error occured while syncing pending board: {e}");
100		}
101	}
102
103	/// Sync onchain wallet
104	async fn run_onchain_sync(&self) {
105		let mut onchain = self.onchain.write().await;
106		if let Err(e) = onchain.sync(&self.wallet.chain).await {
107			warn!("An error occured while syncing onchain: {e}");
108		}
109	}
110
111	/// Perform library built-in maintenance refresh
112	async fn run_maintenance_refresh_process(&self) {
113		loop {
114			if let Err(e) = self.wallet.maintenance_refresh().await {
115				warn!("An error occured while performing maintenance refresh: {e}");
116			}
117
118			tokio::select! {
119				_ = tokio::time::sleep(SLOW_INTERVAL) => {},
120
121				_ = self.shutdown.cancelled() => {
122					info!("Shutdown signal received! Shutting maintenance refresh process...");
123					break;
124				},
125			}
126		}
127	}
128
129	/// Progress any ongoing unilateral exits and sync the exit statuses
130	async fn run_exits(&self) {
131		let mut onchain = self.onchain.write().await;
132
133		let mut exit_lock = self.wallet.exit.write().await;
134		if let Err(e) = exit_lock.sync_no_progress(&*onchain).await {
135			warn!("An error occurred while syncing exits: {e}");
136		}
137
138		if let Err(e) = exit_lock.progress_exits(&self.wallet, &mut *onchain, None).await {
139			warn!("An error occurred while progressing exits: {e}");
140		}
141	}
142
143	/// Subscribe to round event stream and process each incoming event
144	async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
145		let mut events = self.wallet.subscribe_round_events().await?;
146
147		loop {
148			tokio::select! {
149				res = events.next() => {
150					let event = res.context("events stream broke")?
151						.context("error on event stream")?;
152
153					self.wallet.progress_pending_rounds(Some(&event)).await?;
154				},
155				_ = self.shutdown.cancelled() => {
156					info!("Shutdown signal received! Shutting inner round events process...");
157					return Ok(());
158				},
159			}
160		}
161	}
162
163	/// Recursively resubscribe to round event stream by waiting and
164	/// calling [Self::inner_process_pending_rounds] again until
165	/// the daemon is shutdown.
166	async fn run_round_events_process(&self) {
167		loop {
168			if self.connected.load(Ordering::Relaxed) {
169				if let Err(e) = self.inner_process_pending_rounds().await {
170					warn!("An error occured while processing pending rounds: {e}");
171				}
172			}
173
174			tokio::select! {
175				_ = tokio::time::sleep(SLOW_INTERVAL) => {},
176				_ = self.shutdown.cancelled() => {
177					info!("Shutdown signal received! Shutting round events process...");
178					break;
179				},
180			}
181		}
182	}
183
184	/// Run a process that will recursively check the server connection
185	async fn run_server_connection_check_process(&self) {
186		loop {
187			tokio::select! {
188				_ = tokio::time::sleep(FAST_INTERVAL) => {},
189				_ = self.shutdown.cancelled() => {
190					info!("Shutdown signal received! Shutting server connection check process...");
191					break;
192				},
193			}
194
195			let connected = self.wallet.refresh_server().await.is_ok();
196			self.connected.store(connected, Ordering::Relaxed);
197		}
198	}
199
200	async fn run_sync_processes(&self) {
201		let mut fast_interval = tokio::time::interval(FAST_INTERVAL);
202		fast_interval.reset();
203		let mut medium_interval = tokio::time::interval(MEDIUM_INTERVAL);
204		medium_interval.reset();
205		let mut slow_interval = tokio::time::interval(SLOW_INTERVAL);
206		slow_interval.reset();
207
208		loop {
209			tokio::select! {
210				_ = fast_interval.tick() => {
211					if !self.connected.load(Ordering::Relaxed) {
212						continue;
213					}
214
215					self.run_lightning_sync().await;
216					fast_interval.reset();
217				},
218				_ = medium_interval.tick() => {
219					if !self.connected.load(Ordering::Relaxed) {
220						continue;
221					}
222
223					self.run_arkoors_sync().await;
224					self.run_boards_sync().await;
225					medium_interval.reset();
226				},
227				_ = slow_interval.tick() => {
228					if !self.connected.load(Ordering::Relaxed) {
229						continue;
230					}
231
232					self.run_onchain_sync().await;
233					self.run_exits().await;
234					slow_interval.reset();
235				},
236				_ = self.shutdown.cancelled() => {
237					info!("Shutdown signal received! Shutting sync processes...");
238					break;
239				},
240			}
241		}
242	}
243
244	pub async fn run(self) {
245		let connected = self.wallet.server.read().is_some();
246		self.connected.store(connected, Ordering::Relaxed);
247
248		let _ = tokio::join!(
249			self.run_server_connection_check_process(),
250			self.run_round_events_process(),
251			self.run_sync_processes(),
252			self.run_maintenance_refresh_process(),
253		);
254
255		info!("Daemon gracefully stopped");
256	}
257}