1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use ark::rounds::RoundEvent;
6use futures::{FutureExt, StreamExt};
7use log::{info, trace, warn};
8use tokio::sync::RwLock;
9#[cfg(not(feature = "wasm-web"))]
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::Wallet;
14use crate::onchain::DaemonizableOnchainWallet;
15use crate::utils::time::sleep;
16
17
18
19#[cfg(not(feature = "wasm-web"))]
21pub struct DaemonHandle {
22 shutdown: CancellationToken,
23 jh: JoinHandle<()>,
24}
25
26#[cfg(feature = "wasm-web")]
28pub struct DaemonHandle {
29 shutdown: CancellationToken,
30}
31
32impl DaemonHandle {
33 pub fn stop(&self) {
35 self.shutdown.cancel();
36 }
37
38 pub async fn stop_wait(self) -> anyhow::Result<()> {
40 self.stop();
41 #[cfg(not(feature = "wasm-web"))]
42 self.jh.await?;
43 Ok(())
44 }
45}
46
47pub(crate) fn start_daemon(
48 wallet: Wallet,
49 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
50) -> DaemonHandle {
51 let shutdown = CancellationToken::new();
52 let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
53
54 #[cfg(not(feature = "wasm-web"))]
55 {
56 let jh = crate::utils::spawn(proc.run());
57 DaemonHandle { shutdown, jh }
58 }
59 #[cfg(feature = "wasm-web")]
60 {
61 crate::utils::spawn(proc.run());
62 DaemonHandle { shutdown }
63 }
64}
65
66struct DaemonProcess {
69 shutdown: CancellationToken,
70
71 connected: AtomicBool,
72 wallet: Wallet,
73 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
74}
75
76impl DaemonProcess {
77 fn new(
78 shutdown: CancellationToken,
79 wallet: Wallet,
80 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
81 ) -> DaemonProcess {
82 DaemonProcess {
83 connected: AtomicBool::new(false),
84 shutdown,
85 wallet,
86 onchain,
87 }
88 }
89
90 fn sync_interval(&self) -> Duration {
91 Duration::from_secs(self.wallet.config().daemon_sync_interval_secs)
92 }
93
94 async fn run_mailbox_messages_process(&self) {
101 loop {
102 let shutdown = self.shutdown.clone();
103 if self.connected.load(Ordering::Relaxed) {
104 let r = self.wallet.subscribe_process_mailbox_messages(None, shutdown).await;
105 if let Err(e) = r {
106 warn!("An error occurred while processing mailbox messages: {e:#}");
107 self.connected.store(false, Ordering::Relaxed);
108 }
109 }
110
111 futures::select! {
112 _ = sleep(self.sync_interval()).fuse() => {},
113 _ = self.shutdown.cancelled().fuse() => {
114 info!("Shutdown signal received! Shutting mailbox messages process...");
115 break;
116 },
117 }
118 }
119 }
120
121 async fn run_boards_sync(&self) {
123 if let Err(e) = self.wallet.sync_pending_boards().await {
124 warn!("An error occured while syncing pending board: {e:#}");
125 }
126 }
127
128 async fn run_offboards_sync(&self) {
130 if let Err(e) = self.wallet.sync_pending_offboards().await {
131 warn!("An error occured while syncing pending offboards: {e:#}");
132 }
133 }
134
135 async fn run_rounds_sync(&self) {
137 if let Err(e) = self.wallet.sync_pending_rounds().await {
138 warn!("An error occured while syncing pending rounds: {e:#}");
139 }
140 }
141
142 async fn run_fee_rate_update(&self) {
144 if let Err(e) = self.wallet.chain().update_fee_rates(self.wallet.config().fallback_fee_rate).await {
145 warn!("An error occured while updating fee rates: {e:#}");
146 }
147 }
148
149 async fn run_onchain_sync(&self) {
151 if let Some(onchain) = &self.onchain {
152 let mut onchain = onchain.write().await;
153 if let Err(e) = onchain.sync(self.wallet.chain()).await {
154 warn!("An error occured while syncing onchain: {e:#}");
155 }
156 }
157 }
158
159 async fn run_exits(&self) {
161 if let Some(onchain) = &self.onchain {
162 let mut onchain = onchain.write().await;
163 if let Err(e) = self.wallet.exit_mgr().progress_exits_with_bdk(&self.wallet, &mut *onchain, None).await {
164 warn!("An error occurred while progressing exits: {e:#}");
165 }
166 }
167 }
168
169 async fn handle_round_event(&self, event: &RoundEvent) -> anyhow::Result<()> {
170 match &event {
172 &RoundEvent::Attempt(attempt) => {
173 if attempt.attempt_seq == 0 {
174 match self.wallet.maybe_schedule_maintenance_refresh().await {
175 Ok(_) => {},
176 Err(err) => warn!("Failed to schedule maintenance refresh: {:?}", err),
177 }
178 };
179 },
180 _ => {},
181 };
182
183 self.wallet.progress_pending_rounds(Some(event)).await
184 }
185
186 async fn process_round_event_stream(&self) -> anyhow::Result<()> {
189 let mut events = self.wallet.subscribe_round_events().await?;
190
191 loop {
192 futures::select! {
193 res = events.next().fuse() => {
194 match res {
195 Some(Ok(event)) => {
196 if let Err(e) = self.handle_round_event(&event).await {
197 warn!("Error processing round event: {e:#}");
198 }
199 },
200 Some(Err(e)) => {
201 return Err(e.context("error on event stream"));
202 },
203 None => {
204 return Ok(());
205 },
206 }
207 },
208 _ = self.shutdown.cancelled().fuse() => {
209 info!("Shutdown signal received! Shutting round events stream...");
210 return Ok(());
211 },
212 }
213 }
214 }
215
216 async fn run_round_events_process(&self) {
219 loop {
220 if self.shutdown.is_cancelled() {
221 info!("Shutdown signal received! Shutting round events process...");
222 break;
223 }
224
225 let started_at = std::time::Instant::now();
226 if let Err(e) = self.process_round_event_stream().await {
227 warn!("An error occured while processing pending rounds: {e:#}");
228 }
229
230 if started_at.elapsed() >= crate::HEALTHY_STREAM_DURATION {
231 trace!("Round events stream closed after healthy session, reconnecting");
232 continue;
233 }
234
235 futures::select! {
236 _ = sleep(self.sync_interval()).fuse() => {},
237 _ = self.shutdown.cancelled().fuse() => {
238 info!("Shutdown signal received! Shutting round events process...");
239 break;
240 },
241 }
242 }
243 }
244
245 async fn run_server_connection_check_process(&self) {
250 loop {
251 futures::select! {
252 _ = sleep(self.sync_interval()).fuse() => {},
253 _ = self.shutdown.cancelled().fuse() => {
254 info!("Shutdown signal received! Shutting server connection check process...");
255 break;
256 },
257 }
258
259 if self.connected.load(Ordering::Relaxed) {
260 continue;
261 }
262
263 let result = self.wallet.refresh_server().await;
264 if let Err(ref e) = result {
265 warn!("Ark server reconnect failed: {:#}", e);
266 } else {
267 info!("Ark server reconnected");
268 self.connected.store(true, Ordering::Relaxed);
269 }
270 }
271 }
272
273 async fn run_sync_processes(&self) {
274 let mut sync_interval = tokio::time::interval(self.sync_interval());
275
276 loop {
277 futures::select! {
278 _ = sync_interval.tick().fuse() => {
279 if self.connected.load(Ordering::Relaxed) {
280 self.run_fee_rate_update().await;
281 self.run_boards_sync().await;
282 self.run_offboards_sync().await;
283 }
284 self.run_onchain_sync().await;
285 self.run_rounds_sync().await;
286 self.run_exits().await;
287 sync_interval.reset();
288 },
289 _ = self.shutdown.cancelled().fuse() => {
290 info!("Shutdown signal received! Shutting sync processes...");
291 break;
292 },
293 }
294 }
295 }
296
297 async fn run_startup_tasks(&self) {
299 let result = self.wallet.refresh_server().await;
304 if let Err(ref e) = result {
305 warn!("Ark server refresh failed: {:#}", e);
306 }
307 let connected = self.wallet.inner.server.initialized();
308 self.connected.store(connected, Ordering::Relaxed);
309
310 if !self.wallet.config().daemon_manual_sync {
311 self.wallet.sync().await;
312 }
313 }
314
315 pub async fn run(self) {
316 info!("Starting daemon for wallet {}", self.wallet.fingerprint());
317
318 self.run_startup_tasks().await;
319
320 if self.wallet.config().daemon_manual_sync {
321 info!("Daemon running in manual-sync mode; background sync disabled");
324 let _ = self.run_server_connection_check_process().await;
325 } else {
326 let _ = futures::join!(
327 self.run_server_connection_check_process(),
328 self.run_round_events_process(),
329 self.run_sync_processes(),
330 self.run_mailbox_messages_process(),
331 );
332 }
333
334 info!("Daemon gracefully stopped");
335 }
336}