bark/
daemon.rs

1pub use tokio_util::sync::CancellationToken;
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7use anyhow::Context;
8use futures::StreamExt;
9use log::{info, warn};
10use tokio::sync::RwLock;
11
12use crate::Wallet;
13use crate::onchain::ExitUnilaterally;
14
// `Duration::from_secs` is a `const fn`, so these periods are plain
// compile-time values and need no lazy initialisation. They are declared as
// `&'static Duration` so that call sites written as `*FAST_INTERVAL` keep
// compiling unchanged.

/// Period of the server connection check and of the lightning sync.
static FAST_INTERVAL: &Duration = &Duration::from_secs(1);
/// Period of the arkoor and board syncs.
static MEDIUM_INTERVAL: &Duration = &Duration::from_secs(30);
/// Period of the exit sync, the maintenance refresh and the wait between
/// two round-event subscription attempts.
static SLOW_INTERVAL: &Duration = &Duration::from_secs(60);
20
21/// The daemon is responsible for running the wallet and performing the
22/// necessary actions to keep the wallet in a healthy state
23pub struct Daemon {
24	shutdown: CancellationToken,
25
26	connected: AtomicBool,
27	wallet: Arc<Wallet>,
28	onchain: Arc<RwLock<dyn ExitUnilaterally>>,
29}
30
31impl Daemon {
32	/// Create a new [Daemon]
33	///
34	/// Arguments:
35	/// - `shutdown`: A signal receive to shutdown the daemon
36	/// - `wallet`: The wallet to run the daemon on
37	/// - `onchain`: The onchain wallet to run the daemon on
38	///
39	/// Returns:
40	/// - The new [Daemon]
41	pub fn new(
42		shutdown: CancellationToken,
43		wallet: Arc<Wallet>,
44		onchain: Arc<RwLock<dyn ExitUnilaterally>>,
45	) -> anyhow::Result<Daemon> {
46		let daemon = Daemon {
47			shutdown,
48
49			connected: AtomicBool::new(false),
50			wallet,
51			onchain,
52		};
53
54		Ok(daemon)
55	}
56
57	/// Run lightning sync process
58	/// - Try to claim all pending lightning receives
59	/// - Sync pending lightning sends
60	async fn run_lightning_sync(&self) {
61		if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
62			warn!("An error occured while checking and claiming pending lightning receives: {e}");
63		}
64
65		if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
66			warn!("An error occured while syncing pending lightning sends: {e}");
67		}
68	}
69
70	/// Check for incoming arkoors
71	async fn run_arkoors_sync(&self) {
72		if let Err(e) = self.wallet.sync_oors().await {
73			warn!("An error occured while syncing oors: {e}");
74		}
75	}
76
77	/// Sync pending boards, register new ones if needed
78	async fn run_boards_sync(&self) {
79		if let Err(e) = self.wallet.sync_pending_boards().await {
80			warn!("An error occured while syncing pending board: {e}");
81		}
82	}
83
84	/// Perform library built-in maintenance refresh
85	async fn run_maintenance_refresh_process(&self) {
86		loop {
87			if let Err(e) = self.wallet.maintenance_refresh().await {
88				warn!("An error occured while performing maintenance refresh: {e}");
89			}
90
91			tokio::select! {
92				_ = tokio::time::sleep(*SLOW_INTERVAL) => {},
93				_ = self.shutdown.cancelled() => {
94					info!("Shutdown signal received! Shutting maintenance refresh process...");
95					break;
96				},
97			}
98		}
99	}
100
101	/// Progress any ongoing unilateral exits and sync the exit statuses
102	async fn run_exits(&self) {
103		let mut onchain = self.onchain.write().await;
104
105		if let Err(e) = self.wallet.sync_exits(&mut *onchain).await {
106			warn!("An error occured while syncing exits: {e}");
107		}
108
109		let mut exit_lock = self.wallet.exit.write().await;
110		if let Err(e) = exit_lock.progress_exits(&mut *onchain, None).await {
111			warn!("An error occured while progressing exits: {e}");
112		}
113		drop(exit_lock);
114	}
115
116	/// Subscribe to round event stream and process each incoming event
117	async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
118		let mut events = self.wallet.subscribe_round_events().await?;
119
120		loop {
121			tokio::select! {
122				res = events.next() => {
123					let event = res.context("events stream broke")?
124						.context("error on event stream")?;
125
126					self.wallet.progress_ongoing_rounds(Some(&event)).await?;
127				},
128				_ = self.shutdown.cancelled() => {
129					info!("Shutdown signal received! Shutting round events process...");
130					return Ok(());
131				},
132			}
133		}
134	}
135
136	/// Recursively resubscribe to round event stream by waiting and
137	/// calling [Self::inner_process_pending_rounds] again until
138	/// the daemon is shutdown.
139	async fn run_round_events_process(&self) {
140		loop {
141			if self.connected.load(Ordering::Relaxed) {
142				if let Err(e) = self.inner_process_pending_rounds().await {
143					warn!("An error occured while processing pending rounds: {e}");
144				}
145			}
146
147			tokio::select! {
148				_ = tokio::time::sleep(*SLOW_INTERVAL) => {},
149				_ = self.shutdown.cancelled() => {
150					info!("Shutdown signal received! Shutting round events process...");
151					break;
152				},
153			}
154		}
155	}
156
157	/// Run a single connection check and return whether the server is
158	/// connected or not
159	async fn run_server_connection_check(&self) -> bool {
160		self.wallet.check_connection().await
161			.inspect_err(|e| warn!("Server connection lost: {:#}", e)).is_ok()
162	}
163
164	/// Run a process that will recursively check the server connection
165	async fn run_server_connection_check_process(&self) {
166		loop {
167			tokio::select! {
168				_ = tokio::time::sleep(*FAST_INTERVAL) => {},
169				_ = self.shutdown.cancelled() => {
170					info!("Shutdown signal received! Shutting server connection check process...");
171					break;
172				},
173			}
174
175			let connected = self.run_server_connection_check().await;
176			self.connected.store(connected, Ordering::Relaxed);
177		}
178	}
179
180	async fn run_sync_processes(&self) {
181		let mut fast_interval = tokio::time::interval(*FAST_INTERVAL);
182		fast_interval.reset();
183		let mut medium_interval = tokio::time::interval(*MEDIUM_INTERVAL);
184		medium_interval.reset();
185		let mut slow_interval = tokio::time::interval(*SLOW_INTERVAL);
186		slow_interval.reset();
187
188		loop {
189			tokio::select! {
190				_ = fast_interval.tick() => {
191					if !self.connected.load(Ordering::Relaxed) {
192						continue;
193					}
194
195					self.run_lightning_sync().await;
196					fast_interval.reset();
197				},
198				_ = medium_interval.tick() => {
199					if !self.connected.load(Ordering::Relaxed) {
200						continue;
201					}
202
203					self.run_arkoors_sync().await;
204					self.run_boards_sync().await;
205					medium_interval.reset();
206				},
207				_ = slow_interval.tick() => {
208					if !self.connected.load(Ordering::Relaxed) {
209						continue;
210					}
211
212					self.run_exits().await;
213					slow_interval.reset();
214				},
215				_ = self.shutdown.cancelled() => {
216					info!("Shutdown signal received! Shutting sync processes...");
217					break;
218				},
219			}
220		}
221	}
222
223	pub async fn run(self) {
224		let connected = self.run_server_connection_check().await;
225		self.connected.store(connected, Ordering::Relaxed);
226
227		let _ = tokio::join!(
228			self.run_server_connection_check_process(),
229			self.run_round_events_process(),
230			self.run_sync_processes(),
231			self.run_maintenance_refresh_process(),
232		);
233
234		info!("Daemon gracefully stopped");
235	}
236}