1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use ark::rounds::RoundEvent;
6use futures::{FutureExt, StreamExt};
7use log::{info, trace, warn};
8use tokio::sync::RwLock;
9#[cfg(not(feature = "wasm-web"))]
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::Wallet;
14use crate::onchain::DaemonizableOnchainWallet;
15use crate::utils::time::sleep;
16
17
18
19#[cfg(not(feature = "wasm-web"))]
21pub struct DaemonHandle {
22 shutdown: CancellationToken,
23 jh: JoinHandle<()>,
24}
25
/// Handle to a running daemon (`wasm-web` builds).
///
/// This variant keeps no join handle, so it can only request a shutdown;
/// it cannot wait for the daemon task to finish.
#[cfg(feature = "wasm-web")]
pub struct DaemonHandle {
    /// Token that is cancelled to ask the daemon to stop.
    shutdown: CancellationToken,
}
31
32impl DaemonHandle {
33 pub fn stop(&self) {
35 self.shutdown.cancel();
36 }
37
38 pub async fn stop_wait(self) -> anyhow::Result<()> {
40 self.stop();
41 #[cfg(not(feature = "wasm-web"))]
42 self.jh.await?;
43 Ok(())
44 }
45}
46
47pub(crate) fn start_daemon(
48 wallet: Wallet,
49 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
50) -> DaemonHandle {
51 let shutdown = CancellationToken::new();
52 let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
53
54 #[cfg(not(feature = "wasm-web"))]
55 {
56 let jh = crate::utils::spawn(proc.run());
57 DaemonHandle { shutdown, jh }
58 }
59 #[cfg(feature = "wasm-web")]
60 {
61 crate::utils::spawn(proc.run());
62 DaemonHandle { shutdown }
63 }
64}
65
66struct DaemonProcess {
69 shutdown: CancellationToken,
70
71 connected: AtomicBool,
72 wallet: Wallet,
73 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
74}
75
76impl DaemonProcess {
77 fn new(
78 shutdown: CancellationToken,
79 wallet: Wallet,
80 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
81 ) -> DaemonProcess {
82 DaemonProcess {
83 connected: AtomicBool::new(false),
84 shutdown,
85 wallet,
86 onchain,
87 }
88 }
89
90 fn sync_interval(&self) -> Duration {
91 Duration::from_secs(self.wallet.config().daemon_sync_interval_secs)
92 }
93
94 async fn run_mailbox_messages_process(&self) {
101 loop {
102 let shutdown = self.shutdown.clone();
103 if self.connected.load(Ordering::Relaxed) {
104 let r = self.wallet.subscribe_process_mailbox_messages(None, shutdown).await;
105 if let Err(e) = r {
106 warn!("An error occurred while processing mailbox messages: {e:#}");
107 self.connected.store(false, Ordering::Relaxed);
108 }
109 }
110
111 futures::select! {
112 _ = sleep(self.sync_interval()).fuse() => {},
113 _ = self.shutdown.cancelled().fuse() => {
114 info!("Shutdown signal received! Shutting mailbox messages process...");
115 break;
116 },
117 }
118 }
119 }
120
121 async fn run_boards_sync(&self) {
123 if let Err(e) = self.wallet.sync_pending_boards().await {
124 warn!("An error occured while syncing pending board: {e:#}");
125 }
126 }
127
128 async fn run_offboards_sync(&self) {
130 if let Err(e) = self.wallet.sync_pending_offboards().await {
131 warn!("An error occured while syncing pending offboards: {e:#}");
132 }
133 }
134
135 async fn run_rounds_sync(&self) {
137 if let Err(e) = self.wallet.sync_pending_rounds().await {
138 warn!("An error occured while syncing pending rounds: {e:#}");
139 }
140 }
141
142 async fn run_fee_rate_update(&self) {
144 if let Err(e) = self.wallet.chain().update_fee_rates(self.wallet.config().fallback_fee_rate).await {
145 warn!("An error occured while updating fee rates: {e:#}");
146 }
147 }
148
149 async fn run_onchain_sync(&self) {
151 if let Some(onchain) = &self.onchain {
152 let mut onchain = onchain.write().await;
153 if let Err(e) = onchain.sync(self.wallet.chain()).await {
154 warn!("An error occured while syncing onchain: {e:#}");
155 }
156 }
157 }
158
159 async fn run_exits(&self) {
161 if let Some(onchain) = &self.onchain {
162 let mut onchain = onchain.write().await;
163 if let Err(e) = self.wallet.exit_mgr().progress_exits_with_bdk(&self.wallet, &mut *onchain, None).await {
164 warn!("An error occurred while progressing exits: {e:#}");
165 }
166 }
167 }
168
169 async fn handle_round_event(&self, event: &RoundEvent) -> anyhow::Result<()> {
170 match &event {
172 &RoundEvent::Attempt(attempt) => {
173 if attempt.attempt_seq == 0 {
174 if let Err(err) = self.wallet.join_round_for_maintenance_refresh(attempt).await {
175 warn!("Failed to join round for maintenance refresh: {:#}", err);
176 }
177 };
178 },
179 _ => {},
180 };
181
182 self.wallet.progress_pending_rounds(Some(event)).await
183 }
184
185 async fn process_round_event_stream(&self) -> anyhow::Result<()> {
188 let mut events = self.wallet.subscribe_round_events().await?;
189
190 loop {
191 futures::select! {
192 res = events.next().fuse() => {
193 match res {
194 Some(Ok(event)) => {
195 if let Err(e) = self.handle_round_event(&event).await {
196 warn!("Error processing round event: {e:#}");
197 }
198 },
199 Some(Err(e)) => {
200 return Err(e.context("error on event stream"));
201 },
202 None => {
203 return Ok(());
204 },
205 }
206 },
207 _ = self.shutdown.cancelled().fuse() => {
208 info!("Shutdown signal received! Shutting round events stream...");
209 return Ok(());
210 },
211 }
212 }
213 }
214
215 async fn run_round_events_process(&self) {
218 loop {
219 if self.shutdown.is_cancelled() {
220 info!("Shutdown signal received! Shutting round events process...");
221 break;
222 }
223
224 match self.process_round_event_stream().await {
225 Ok(()) => {},
226 Err(e) if crate::utils::is_h2_stream_error(&e) => {
230 trace!("Round events stream reset by server, reconnecting: {e:#}");
231 },
232 Err(e) => {
233 warn!("An error occured while processing pending rounds: {e:#}");
234 futures::select! {
235 _ = sleep(self.sync_interval()).fuse() => {},
236 _ = self.shutdown.cancelled().fuse() => {
237 info!("Shutdown signal received! Shutting round events process...");
238 break;
239 },
240 }
241 },
242 }
243 }
244 }
245
246 async fn run_server_connection_check_process(&self) {
251 loop {
252 futures::select! {
253 _ = sleep(self.sync_interval()).fuse() => {},
254 _ = self.shutdown.cancelled().fuse() => {
255 info!("Shutdown signal received! Shutting server connection check process...");
256 break;
257 },
258 }
259
260 if self.connected.load(Ordering::Relaxed) {
261 continue;
262 }
263
264 let result = self.wallet.refresh_server().await;
265 if let Err(ref e) = result {
266 warn!("Ark server reconnect failed: {:#}", e);
267 } else {
268 info!("Ark server reconnected");
269 self.connected.store(true, Ordering::Relaxed);
270 }
271 }
272 }
273
274 async fn run_sync_processes(&self) {
275 let mut sync_interval = tokio::time::interval(self.sync_interval());
276
277 loop {
278 futures::select! {
279 _ = sync_interval.tick().fuse() => {
280 if self.connected.load(Ordering::Relaxed) {
281 self.run_fee_rate_update().await;
282 self.run_boards_sync().await;
283 self.run_offboards_sync().await;
284 }
285 self.run_onchain_sync().await;
286 self.run_rounds_sync().await;
287 self.run_exits().await;
288 sync_interval.reset();
289 },
290 _ = self.shutdown.cancelled().fuse() => {
291 info!("Shutdown signal received! Shutting sync processes...");
292 break;
293 },
294 }
295 }
296 }
297
298 async fn run_startup_tasks(&self) {
300 let result = self.wallet.refresh_server().await;
305 if let Err(ref e) = result {
306 warn!("Ark server refresh failed: {:#}", e);
307 }
308 let connected = self.wallet.inner.server.initialized();
309 self.connected.store(connected, Ordering::Relaxed);
310
311 if !self.wallet.config().daemon_manual_sync {
312 self.wallet.sync().await;
313 }
314 }
315
316 pub async fn run(self) {
317 info!("Starting daemon for wallet {}", self.wallet.fingerprint());
318
319 self.run_startup_tasks().await;
320
321 if self.wallet.config().daemon_manual_sync {
322 info!("Daemon running in manual-sync mode; background sync disabled");
325 let _ = self.run_server_connection_check_process().await;
326 } else {
327 #[cfg(not(feature = "wasm-web"))]
328 {
329 let proc = Arc::new(self);
333 let p1 = Arc::clone(&proc);
334 let p2 = Arc::clone(&proc);
335 let p3 = Arc::clone(&proc);
336 let p4 = Arc::clone(&proc);
337 let _ = futures::join!(
338 supervised("server-connection", move || {
339 let p = Arc::clone(&p1);
340 async move { p.run_server_connection_check_process().await }
341 }),
342 supervised("round-events", move || {
343 let p = Arc::clone(&p2);
344 async move { p.run_round_events_process().await }
345 }),
346 supervised("sync", move || {
347 let p = Arc::clone(&p3);
348 async move { p.run_sync_processes().await }
349 }),
350 supervised("mailbox", move || {
351 let p = Arc::clone(&p4);
352 async move { p.run_mailbox_messages_process().await }
353 }),
354 );
355 }
356 #[cfg(feature = "wasm-web")]
357 {
358 let _ = futures::join!(
359 self.run_server_connection_check_process(),
360 self.run_round_events_process(),
361 self.run_sync_processes(),
362 self.run_mailbox_messages_process(),
363 );
364 }
365 }
366
367 info!("Daemon gracefully stopped");
368 }
369}
370
371#[cfg(not(feature = "wasm-web"))]
375async fn supervised<F, Fut>(name: &'static str, f: F)
376where
377 F: Fn() -> Fut,
378 Fut: std::future::Future<Output = ()> + Send + 'static,
379{
380 loop {
381 match tokio::spawn(f()).await {
382 Ok(()) => break,
383 Err(e) => {
384 warn!("Daemon task '{}' terminated unexpectedly, restarting: {e}", name);
385 tokio::time::sleep(Duration::from_secs(1)).await;
386 },
387 }
388 }
389}