1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use ark::rounds::RoundEvent;
6use futures::{FutureExt, StreamExt};
7use log::{info, trace, warn};
8use tokio::sync::RwLock;
9#[cfg(not(feature = "wasm-web"))]
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::Wallet;
14use crate::onchain::DaemonizableOnchainWallet;
15
16
17
18#[cfg(not(feature = "wasm-web"))]
20pub struct DaemonHandle {
21 shutdown: CancellationToken,
22 jh: JoinHandle<()>,
23}
24
/// Handle to a running wallet daemon (`wasm-web` builds).
///
/// This variant only carries the cancellation token; it stores no join handle.
#[cfg(feature = "wasm-web")]
pub struct DaemonHandle {
    /// Cancelled to ask the daemon to shut down.
    shutdown: CancellationToken,
}
30
31impl DaemonHandle {
32 pub fn stop(&self) {
34 self.shutdown.cancel();
35 }
36
37 pub async fn stop_wait(self) -> anyhow::Result<()> {
39 self.stop();
40 #[cfg(not(feature = "wasm-web"))]
41 self.jh.await?;
42 Ok(())
43 }
44}
45
46pub(crate) fn start_daemon(
47 wallet: Wallet,
48 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
49) -> DaemonHandle {
50 let shutdown = CancellationToken::new();
51 let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
52
53 #[cfg(not(feature = "wasm-web"))]
54 {
55 let jh = crate::utils::spawn(proc.run());
56 DaemonHandle { shutdown, jh }
57 }
58 #[cfg(feature = "wasm-web")]
59 {
60 crate::utils::spawn(proc.run());
61 DaemonHandle { shutdown }
62 }
63}
64
65struct DaemonProcess {
68 shutdown: CancellationToken,
69
70 connected: AtomicBool,
71 wallet: Wallet,
72 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
73}
74
75impl DaemonProcess {
76 fn new(
77 shutdown: CancellationToken,
78 wallet: Wallet,
79 onchain: Option<Arc<RwLock<dyn DaemonizableOnchainWallet>>>,
80 ) -> DaemonProcess {
81 DaemonProcess {
82 connected: AtomicBool::new(false),
83 shutdown,
84 wallet,
85 onchain,
86 }
87 }
88
89 fn sync_interval(&self) -> Duration {
90 Duration::from_secs(self.wallet.config().daemon_sync_interval_secs)
91 }
92
93 async fn run_mailbox_messages_process(&self) {
97 loop {
98 let shutdown = self.shutdown.clone();
99 if self.connected.load(Ordering::Relaxed) {
100 let r = self.wallet.subscribe_process_mailbox_messages(None, shutdown).await;
101 if let Err(e) = r {
102 warn!("An error occurred while processing mailbox messages: {e:#}");
103 }
104 }
105
106 futures::select! {
107 _ = tokio::time::sleep(self.sync_interval()).fuse() => {},
108 _ = self.shutdown.cancelled().fuse() => {
109 info!("Shutdown signal received! Shutting mailbox messages process...");
110 break;
111 },
112 }
113 }
114 }
115
116 async fn run_boards_sync(&self) {
118 if let Err(e) = self.wallet.sync_pending_boards().await {
119 warn!("An error occured while syncing pending board: {e:#}");
120 }
121 }
122
123 async fn run_offboards_sync(&self) {
125 if let Err(e) = self.wallet.sync_pending_offboards().await {
126 warn!("An error occured while syncing pending offboards: {e:#}");
127 }
128 }
129
130 async fn run_rounds_sync(&self) {
132 if let Err(e) = self.wallet.sync_pending_rounds().await {
133 warn!("An error occured while syncing pending rounds: {e:#}");
134 }
135 }
136
137 async fn run_fee_rate_update(&self) {
139 if let Err(e) = self.wallet.chain().update_fee_rates(self.wallet.config().fallback_fee_rate).await {
140 warn!("An error occured while updating fee rates: {e:#}");
141 }
142 }
143
144 async fn run_onchain_sync(&self) {
146 if let Some(onchain) = &self.onchain {
147 let mut onchain = onchain.write().await;
148 if let Err(e) = onchain.sync(self.wallet.chain()).await {
149 warn!("An error occured while syncing onchain: {e:#}");
150 }
151 }
152 }
153
154 async fn run_exits(&self) {
156 if let Some(onchain) = &self.onchain {
157 let mut onchain = onchain.write().await;
158 if let Err(e) = self.wallet.exit_mgr().sync_no_progress(&*onchain).await {
159 warn!("An error occurred while syncing exits: {e:#}");
160 }
161
162 if let Err(e) = self.wallet.exit_mgr().progress_exits(&self.wallet, &mut *onchain, None).await {
163 warn!("An error occurred while progressing exits: {e:#}");
164 }
165 }
166 }
167
168 async fn handle_round_event(&self, event: &RoundEvent) -> anyhow::Result<()> {
169 match &event {
171 &RoundEvent::Attempt(attempt) => {
172 if attempt.attempt_seq == 0 {
173 match self.wallet.maybe_schedule_maintenance_refresh().await {
174 Ok(_) => {},
175 Err(err) => warn!("Failed to schedule maintenance refresh: {:?}", err),
176 }
177 };
178 },
179 _ => {},
180 };
181
182 self.wallet.progress_pending_rounds(Some(event)).await
183 }
184
185 async fn process_round_event_stream(&self) -> anyhow::Result<()> {
188 let mut events = self.wallet.subscribe_round_events().await?;
189 self.connected.store(true, Ordering::Relaxed);
190
191 loop {
192 futures::select! {
193 res = events.next().fuse() => {
194 match res {
195 Some(Ok(event)) => {
196 if let Err(e) = self.handle_round_event(&event).await {
197 warn!("Error processing round event: {e:#}");
198 }
199 },
200 Some(Err(e)) => {
201 return Err(e.context("error on event stream"));
202 },
203 None => {
204 return Ok(());
205 },
206 }
207 },
208 _ = self.shutdown.cancelled().fuse() => {
209 info!("Shutdown signal received! Shutting round events stream...");
210 return Ok(());
211 },
212 }
213 }
214 }
215
216 async fn run_round_events_process(&self) {
219 loop {
220 if self.shutdown.is_cancelled() {
221 info!("Shutdown signal received! Shutting round events process...");
222 break;
223 }
224
225 let started_at = std::time::Instant::now();
226 if let Err(e) = self.process_round_event_stream().await {
227 warn!("An error occured while processing pending rounds: {e:#}");
228 }
229
230 if started_at.elapsed() >= crate::HEALTHY_STREAM_DURATION {
231 trace!("Round events stream closed after healthy session, reconnecting");
232 continue;
233 }
234
235 self.connected.store(false, Ordering::Relaxed);
236
237 futures::select! {
238 _ = tokio::time::sleep(self.sync_interval()).fuse() => {},
239 _ = self.shutdown.cancelled().fuse() => {
240 info!("Shutdown signal received! Shutting round events process...");
241 break;
242 },
243 }
244 }
245 }
246
247 async fn run_server_connection_check_process(&self) {
253 loop {
254 futures::select! {
255 _ = tokio::time::sleep(self.sync_interval()).fuse() => {},
256 _ = self.shutdown.cancelled().fuse() => {
257 info!("Shutdown signal received! Shutting server connection check process...");
258 break;
259 },
260 }
261
262 if self.connected.load(Ordering::Relaxed) {
263 continue;
264 }
265
266 let result = self.wallet.refresh_server().await;
267 let connected = result.is_ok();
268 if let Err(ref e) = result {
269 warn!("Ark server reconnect failed: {:#}", e);
270 } else {
271 info!("Ark server reconnected");
272 }
273 self.connected.store(connected, Ordering::Relaxed);
274 }
275 }
276
277 async fn run_sync_processes(&self) {
278 let mut sync_interval = tokio::time::interval(self.sync_interval());
279
280 loop {
281 futures::select! {
282 _ = sync_interval.tick().fuse() => {
283 if self.connected.load(Ordering::Relaxed) {
284 self.run_fee_rate_update().await;
285 self.run_boards_sync().await;
286 self.run_offboards_sync().await;
287 }
288 self.run_onchain_sync().await;
289 self.run_rounds_sync().await;
290 self.run_exits().await;
291 sync_interval.reset();
292 },
293 _ = self.shutdown.cancelled().fuse() => {
294 info!("Shutdown signal received! Shutting sync processes...");
295 break;
296 },
297 }
298 }
299 }
300
301 async fn run_startup_tasks(&self) {
303 let result = self.wallet.refresh_server().await;
308 if let Err(ref e) = result {
309 warn!("Ark server refresh failed: {:#}", e);
310 }
311 let connected = self.wallet.inner.server.initialized();
312 self.connected.store(connected, Ordering::Relaxed);
313
314 if !self.wallet.config().daemon_manual_sync {
315 self.wallet.sync().await;
316 }
317 }
318
319 pub async fn run(self) {
320 info!("Starting daemon for wallet {}", self.wallet.fingerprint());
321
322 self.run_startup_tasks().await;
323
324 if self.wallet.config().daemon_manual_sync {
325 info!("Daemon running in manual-sync mode; background sync disabled");
328 let _ = self.run_server_connection_check_process().await;
329 } else {
330 let _ = futures::join!(
331 self.run_server_connection_check_process(),
332 self.run_round_events_process(),
333 self.run_sync_processes(),
334 self.run_mailbox_messages_process(),
335 );
336 }
337
338 info!("Daemon gracefully stopped");
339 }
340}