1
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use anyhow::Context;
7use futures::{FutureExt, StreamExt};
8use log::{info, warn};
9use tokio::sync::RwLock;
10#[cfg(not(feature = "wasm-web"))]
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13
14use crate::Wallet;
15use crate::onchain::DaemonizableOnchainWallet;
16
17
18#[cfg(not(feature = "wasm-web"))]
20pub struct DaemonHandle {
21 shutdown: CancellationToken,
22 jh: JoinHandle<()>,
23}
24
25#[cfg(feature = "wasm-web")]
27pub struct DaemonHandle {
28 shutdown: CancellationToken,
29}
30
31impl DaemonHandle {
32 pub fn stop(&self) {
34 self.shutdown.cancel();
35 }
36
37 pub async fn stop_wait(self) -> anyhow::Result<()> {
39 self.stop();
40 #[cfg(not(feature = "wasm-web"))]
41 self.jh.await?;
42 Ok(())
43 }
44}
45
46pub(crate) fn start_daemon(
47 wallet: Arc<Wallet>,
48 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
49) -> DaemonHandle {
50 let shutdown = CancellationToken::new();
51 let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
52
53 #[cfg(not(feature = "wasm-web"))]
54 {
55 let jh = crate::utils::spawn(proc.run());
56 DaemonHandle { shutdown, jh }
57 }
58 #[cfg(feature = "wasm-web")]
59 {
60 crate::utils::spawn(proc.run());
61 DaemonHandle { shutdown }
62 }
63}
64
65struct DaemonProcess {
68 shutdown: CancellationToken,
69
70 connected: AtomicBool,
71 wallet: Arc<Wallet>,
72 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
73}
74
75impl DaemonProcess {
76 fn new(
77 shutdown: CancellationToken,
78 wallet: Arc<Wallet>,
79 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
80 ) -> DaemonProcess {
81 DaemonProcess {
82 connected: AtomicBool::new(false),
83 shutdown,
84 wallet,
85 onchain,
86 }
87 }
88
89 fn fast_interval(&self) -> Duration {
90 Duration::from_secs(self.wallet.config().daemon_fast_sync_interval_secs)
91 }
92
93 fn slow_interval(&self) -> Duration {
94 Duration::from_secs(self.wallet.config().daemon_slow_sync_interval_secs)
95 }
96
97 async fn run_lightning_sync(&self) {
101 if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
102 warn!("An error occured while checking and claiming pending lightning receives: {e:#}");
103 }
104
105 if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
106 warn!("An error occured while syncing pending lightning sends: {e:#}");
107 }
108 }
109
110 async fn run_mailbox_messages_process(&self) {
114 loop {
115 let shutdown = self.shutdown.clone();
116 if self.connected.load(Ordering::Relaxed) {
117 let r = self.wallet.subscribe_process_mailbox_messages(None, shutdown).await;
118 if let Err(e) = r {
119 warn!("An error occurred while processing mailbox messages: {e:#}");
120 }
121 }
122
123 futures::select! {
124 _ = tokio::time::sleep(self.slow_interval()).fuse() => {},
125 _ = self.shutdown.cancelled().fuse() => {
126 info!("Shutdown signal received! Shutting mailbox messages process...");
127 break;
128 },
129 }
130 }
131 }
132
133 async fn run_boards_sync(&self) {
135 if let Err(e) = self.wallet.sync_pending_boards().await {
136 warn!("An error occured while syncing pending board: {e:#}");
137 }
138 }
139
140 async fn run_offboards_sync(&self) {
142 if let Err(e) = self.wallet.sync_pending_offboards().await {
143 warn!("An error occured while syncing pending offboards: {e:#}");
144 }
145 }
146
147 async fn run_fee_rate_update(&self) {
149 if let Err(e) = self.wallet.chain.update_fee_rates(self.wallet.config.fallback_fee_rate).await {
150 warn!("An error occured while updating fee rates: {e:#}");
151 }
152 }
153
154 async fn run_onchain_sync(&self) {
156 if let Some(onchain) = &self.onchain {
157 let mut onchain = onchain.write().await;
158 if let Err(e) = onchain.sync(&self.wallet.chain).await {
159 warn!("An error occured while syncing onchain: {e:#}");
160 }
161 }
162 }
163
164 async fn run_maintenance_refresh_process(&self) {
166 if let Err(e) = self.wallet.maintenance_refresh().await {
167 warn!("An error occured while performing maintenance refresh: {e:#}");
168 }
169 }
170
171 async fn run_exits(&self) {
173 if let Some(onchain) = &self.onchain {
174 let mut onchain = onchain.write().await;
175 let mut exit_lock = self.wallet.exit.write().await;
176 if let Err(e) = exit_lock.sync_no_progress(&*onchain).await {
177 warn!("An error occurred while syncing exits: {e:#}");
178 }
179
180 if let Err(e) = exit_lock.progress_exits(&self.wallet, &mut *onchain, None).await {
181 warn!("An error occurred while progressing exits: {e:#}");
182 }
183 }
184 }
185
186 async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
188 let mut events = self.wallet.subscribe_round_events().await?;
189
190 loop {
191 futures::select! {
192 res = events.next().fuse() => {
193 let event = res.context("events stream broke")?
194 .context("error on event stream")?;
195
196 self.wallet.progress_pending_rounds(Some(&event)).await?;
197 },
198 _ = self.shutdown.cancelled().fuse() => {
199 info!("Shutdown signal received! Shutting inner round events process...");
200 return Ok(());
201 },
202 }
203 }
204 }
205
206 async fn run_round_events_process(&self) {
210 loop {
211 if self.connected.load(Ordering::Relaxed) {
212 if let Err(e) = self.inner_process_pending_rounds().await {
213 warn!("An error occured while processing pending rounds: {e:#}");
214 }
215 }
216
217 futures::select! {
218 _ = tokio::time::sleep(self.slow_interval()).fuse() => {},
219 _ = self.shutdown.cancelled().fuse() => {
220 info!("Shutdown signal received! Shutting round events process...");
221 break;
222 },
223 }
224 }
225 }
226
227 async fn run_server_connection_check_process(&self) {
229 loop {
230 futures::select! {
231 _ = tokio::time::sleep(self.fast_interval()).fuse() => {},
232 _ = self.shutdown.cancelled().fuse() => {
233 info!("Shutdown signal received! Shutting server connection check process...");
234 break;
235 },
236 }
237
238 let connected = self.wallet.refresh_server().await.is_ok();
239 self.connected.store(connected, Ordering::Relaxed);
240 }
241 }
242
243 async fn run_sync_processes(&self) {
244 let mut fast_interval = tokio::time::interval(self.fast_interval());
245 fast_interval.reset();
246 let mut slow_interval = tokio::time::interval(self.slow_interval());
247 slow_interval.reset();
248
249 loop {
250 futures::select! {
251 _ = fast_interval.tick().fuse() => {
252 if !self.connected.load(Ordering::Relaxed) {
253 continue;
254 }
255
256 self.run_lightning_sync().await;
257 fast_interval.reset();
258 },
259 _ = slow_interval.tick().fuse() => {
260 if !self.connected.load(Ordering::Relaxed) {
261 continue;
262 }
263
264 self.run_fee_rate_update().await;
265 self.run_boards_sync().await;
266 self.run_offboards_sync().await;
267 self.run_maintenance_refresh_process().await;
268 self.run_onchain_sync().await;
269 self.run_exits().await;
270 slow_interval.reset();
271 },
272 _ = self.shutdown.cancelled().fuse() => {
273 info!("Shutdown signal received! Shutting sync processes...");
274 break;
275 },
276 }
277 }
278 }
279
280 pub async fn run(self) {
281 let connected = self.wallet.server.read().is_some();
282 self.connected.store(connected, Ordering::Relaxed);
283
284 let _ = futures::join!(
285 self.run_server_connection_check_process(),
286 self.run_round_events_process(),
287 self.run_sync_processes(),
288 self.run_mailbox_messages_process(),
289 );
290
291 info!("Daemon gracefully stopped");
292 }
293}