1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use ark::rounds::RoundEvent;
6use futures::{FutureExt, StreamExt};
7use log::{info, trace, warn};
8use tokio::sync::RwLock;
9#[cfg(not(feature = "wasm-web"))]
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::Wallet;
14use crate::onchain::DaemonizableOnchainWallet;
15use crate::utils::time::sleep;
16
17
18
19#[cfg(not(feature = "wasm-web"))]
21pub struct DaemonHandle {
22 shutdown: CancellationToken,
23 jh: JoinHandle<()>,
24}
25
26#[cfg(feature = "wasm-web")]
28pub struct DaemonHandle {
29 shutdown: CancellationToken,
30}
31
32impl DaemonHandle {
33 pub fn stop(&self) {
35 self.shutdown.cancel();
36 }
37
38 pub async fn stop_wait(self) -> anyhow::Result<()> {
40 self.stop();
41 #[cfg(not(feature = "wasm-web"))]
42 self.jh.await?;
43 Ok(())
44 }
45}
46
47pub(crate) fn start_daemon(
48 wallet: Wallet,
49 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
50) -> DaemonHandle {
51 let shutdown = CancellationToken::new();
52 let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
53
54 #[cfg(not(feature = "wasm-web"))]
55 {
56 let jh = crate::utils::spawn(proc.run());
57 DaemonHandle { shutdown, jh }
58 }
59 #[cfg(feature = "wasm-web")]
60 {
61 crate::utils::spawn(proc.run());
62 DaemonHandle { shutdown }
63 }
64}
65
66struct DaemonProcess {
69 shutdown: CancellationToken,
70
71 connected: AtomicBool,
72 wallet: Wallet,
73 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
74}
75
76impl DaemonProcess {
77 fn new(
78 shutdown: CancellationToken,
79 wallet: Wallet,
80 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
81 ) -> DaemonProcess {
82 DaemonProcess {
83 connected: AtomicBool::new(false),
84 shutdown,
85 wallet,
86 onchain,
87 }
88 }
89
90 fn sync_interval(&self) -> Duration {
91 Duration::from_secs(self.wallet.config().daemon_sync_interval_secs)
92 }
93
94 async fn run_mailbox_messages_process(&self) {
101 loop {
102 let shutdown = self.shutdown.clone();
103 if self.connected.load(Ordering::Relaxed) {
104 let r = self.wallet.subscribe_process_mailbox_messages(None, shutdown).await;
105 if let Err(e) = r {
106 warn!("An error occurred while processing mailbox messages: {e:#}");
107 self.connected.store(false, Ordering::Relaxed);
108 }
109 }
110
111 futures::select! {
112 _ = sleep(self.sync_interval()).fuse() => {},
113 _ = self.shutdown.cancelled().fuse() => {
114 info!("Shutdown signal received! Shutting mailbox messages process...");
115 break;
116 },
117 }
118 }
119 }
120
121 async fn run_boards_sync(&self) {
123 if let Err(e) = self.wallet.sync_pending_boards().await {
124 warn!("An error occured while syncing pending board: {e:#}");
125 }
126 }
127
128 async fn run_offboards_sync(&self) {
130 if let Err(e) = self.wallet.sync_pending_offboards().await {
131 warn!("An error occured while syncing pending offboards: {e:#}");
132 }
133 }
134
135 async fn run_rounds_sync(&self) {
137 if let Err(e) = self.wallet.sync_pending_rounds().await {
138 warn!("An error occured while syncing pending rounds: {e:#}");
139 }
140 }
141
142 async fn run_fee_rate_update(&self) {
144 if let Err(e) = self.wallet.chain().update_fee_rates(self.wallet.config().fallback_fee_rate).await {
145 warn!("An error occured while updating fee rates: {e:#}");
146 }
147 }
148
149 async fn run_onchain_sync(&self) {
151 if let Some(onchain) = &self.onchain {
152 let mut onchain = onchain.write().await;
153 if let Err(e) = onchain.sync(self.wallet.chain()).await {
154 warn!("An error occured while syncing onchain: {e:#}");
155 }
156 }
157 }
158
159 async fn run_exits(&self) {
161 if let Some(onchain) = &self.onchain {
162 let mut onchain = onchain.write().await;
163 if let Err(e) = self.wallet.exit_mgr().progress_exits_with_bdk(&self.wallet, &mut *onchain, None).await {
164 warn!("An error occurred while progressing exits: {e:#}");
165 }
166 }
167 }
168
169 async fn handle_round_event(&self, event: &RoundEvent) -> anyhow::Result<()> {
170 match &event {
172 &RoundEvent::Attempt(attempt) => {
173 if attempt.attempt_seq == 0 {
174 match self.wallet.maybe_schedule_maintenance_refresh().await {
175 Ok(_) => {},
176 Err(err) => warn!("Failed to schedule maintenance refresh: {:?}", err),
177 }
178 };
179 },
180 _ => {},
181 };
182
183 self.wallet.progress_pending_rounds(Some(event)).await
184 }
185
186 async fn process_round_event_stream(&self) -> anyhow::Result<()> {
189 let mut events = self.wallet.subscribe_round_events().await?;
190
191 loop {
192 futures::select! {
193 res = events.next().fuse() => {
194 match res {
195 Some(Ok(event)) => {
196 if let Err(e) = self.handle_round_event(&event).await {
197 warn!("Error processing round event: {e:#}");
198 }
199 },
200 Some(Err(e)) => {
201 return Err(e.context("error on event stream"));
202 },
203 None => {
204 return Ok(());
205 },
206 }
207 },
208 _ = self.shutdown.cancelled().fuse() => {
209 info!("Shutdown signal received! Shutting round events stream...");
210 return Ok(());
211 },
212 }
213 }
214 }
215
216 async fn run_round_events_process(&self) {
219 loop {
220 if self.shutdown.is_cancelled() {
221 info!("Shutdown signal received! Shutting round events process...");
222 break;
223 }
224
225 match self.process_round_event_stream().await {
226 Ok(()) => {},
227 Err(e) if crate::utils::is_h2_stream_error(&e) => {
231 trace!("Round events stream reset by server, reconnecting: {e:#}");
232 },
233 Err(e) => {
234 warn!("An error occured while processing pending rounds: {e:#}");
235 futures::select! {
236 _ = sleep(self.sync_interval()).fuse() => {},
237 _ = self.shutdown.cancelled().fuse() => {
238 info!("Shutdown signal received! Shutting round events process...");
239 break;
240 },
241 }
242 },
243 }
244 }
245 }
246
247 async fn run_server_connection_check_process(&self) {
252 loop {
253 futures::select! {
254 _ = sleep(self.sync_interval()).fuse() => {},
255 _ = self.shutdown.cancelled().fuse() => {
256 info!("Shutdown signal received! Shutting server connection check process...");
257 break;
258 },
259 }
260
261 if self.connected.load(Ordering::Relaxed) {
262 continue;
263 }
264
265 let result = self.wallet.refresh_server().await;
266 if let Err(ref e) = result {
267 warn!("Ark server reconnect failed: {:#}", e);
268 } else {
269 info!("Ark server reconnected");
270 self.connected.store(true, Ordering::Relaxed);
271 }
272 }
273 }
274
275 async fn run_sync_processes(&self) {
276 let mut sync_interval = tokio::time::interval(self.sync_interval());
277
278 loop {
279 futures::select! {
280 _ = sync_interval.tick().fuse() => {
281 if self.connected.load(Ordering::Relaxed) {
282 self.run_fee_rate_update().await;
283 self.run_boards_sync().await;
284 self.run_offboards_sync().await;
285 }
286 self.run_onchain_sync().await;
287 self.run_rounds_sync().await;
288 self.run_exits().await;
289 sync_interval.reset();
290 },
291 _ = self.shutdown.cancelled().fuse() => {
292 info!("Shutdown signal received! Shutting sync processes...");
293 break;
294 },
295 }
296 }
297 }
298
299 async fn run_startup_tasks(&self) {
301 let result = self.wallet.refresh_server().await;
306 if let Err(ref e) = result {
307 warn!("Ark server refresh failed: {:#}", e);
308 }
309 let connected = self.wallet.inner.server.initialized();
310 self.connected.store(connected, Ordering::Relaxed);
311
312 if !self.wallet.config().daemon_manual_sync {
313 self.wallet.sync().await;
314 }
315 }
316
317 pub async fn run(self) {
318 info!("Starting daemon for wallet {}", self.wallet.fingerprint());
319
320 self.run_startup_tasks().await;
321
322 if self.wallet.config().daemon_manual_sync {
323 info!("Daemon running in manual-sync mode; background sync disabled");
326 let _ = self.run_server_connection_check_process().await;
327 } else {
328 let _ = futures::join!(
329 self.run_server_connection_check_process(),
330 self.run_round_events_process(),
331 self.run_sync_processes(),
332 self.run_mailbox_messages_process(),
333 );
334 }
335
336 info!("Daemon gracefully stopped");
337 }
338}