1
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use anyhow::Context;
7use futures::{FutureExt, StreamExt};
8use log::{info, warn};
9use tokio::sync::RwLock;
10#[cfg(not(feature = "wasm-web"))]
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13
14use crate::Wallet;
15use crate::onchain::DaemonizableOnchainWallet;
16
17
18#[cfg(not(feature = "wasm-web"))]
20pub struct DaemonHandle {
21 shutdown: CancellationToken,
22 jh: JoinHandle<()>,
23}
24
25#[cfg(feature = "wasm-web")]
27pub struct DaemonHandle {
28 shutdown: CancellationToken,
29}
30
31impl DaemonHandle {
32 pub fn stop(&self) {
34 self.shutdown.cancel();
35 }
36
37 pub async fn stop_wait(self) -> anyhow::Result<()> {
39 self.stop();
40 #[cfg(not(feature = "wasm-web"))]
41 self.jh.await?;
42 Ok(())
43 }
44}
45
46pub(crate) fn start_daemon(
47 wallet: Arc<Wallet>,
48 onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
49) -> DaemonHandle {
50 let shutdown = CancellationToken::new();
51 let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
52
53 #[cfg(not(feature = "wasm-web"))]
54 {
55 let jh = crate::utils::spawn(proc.run());
56 DaemonHandle { shutdown, jh }
57 }
58 #[cfg(feature = "wasm-web")]
59 {
60 crate::utils::spawn(proc.run());
61 DaemonHandle { shutdown }
62 }
63}
64
65struct DaemonProcess {
68 shutdown: CancellationToken,
69
70 connected: AtomicBool,
71 wallet: Arc<Wallet>,
72 onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
73}
74
75impl DaemonProcess {
76 fn new(
77 shutdown: CancellationToken,
78 wallet: Arc<Wallet>,
79 onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
80 ) -> DaemonProcess {
81 DaemonProcess {
82 connected: AtomicBool::new(false),
83 shutdown,
84 wallet,
85 onchain,
86 }
87 }
88
89 fn fast_interval(&self) -> Duration {
90 Duration::from_secs(self.wallet.config().daemon_fast_sync_interval_secs)
91 }
92
93 fn slow_interval(&self) -> Duration {
94 Duration::from_secs(self.wallet.config().daemon_slow_sync_interval_secs)
95 }
96
97 async fn run_lightning_sync(&self) {
101 if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
102 warn!("An error occured while checking and claiming pending lightning receives: {e:#}");
103 }
104
105 if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
106 warn!("An error occured while syncing pending lightning sends: {e:#}");
107 }
108 }
109
110 async fn run_mailbox_messages_process(&self) {
114 loop {
115 if self.connected.load(Ordering::Relaxed) {
116 if let Err(e) = self.wallet.subscribe_process_mailbox_messages(None).await {
117 warn!("An error occured while processing mailbox messages: {e:#}");
118 }
119 }
120
121 futures::select! {
122 _ = tokio::time::sleep(self.slow_interval()).fuse() => {},
123 _ = self.shutdown.cancelled().fuse() => {
124 info!("Shutdown signal received! Shutting mailbox messages process...");
125 break;
126 },
127 }
128 }
129 }
130
131 async fn run_boards_sync(&self) {
133 if let Err(e) = self.wallet.sync_pending_boards().await {
134 warn!("An error occured while syncing pending board: {e:#}");
135 }
136 }
137
138 async fn run_offboards_sync(&self) {
140 if let Err(e) = self.wallet.sync_pending_offboards().await {
141 warn!("An error occured while syncing pending offboards: {e:#}");
142 }
143 }
144
145 async fn run_fee_rate_update(&self) {
147 if let Err(e) = self.wallet.chain.update_fee_rates(self.wallet.config.fallback_fee_rate).await {
148 warn!("An error occured while updating fee rates: {e:#}");
149 }
150 }
151
152 async fn run_onchain_sync(&self) {
154 let mut onchain = self.onchain.write().await;
155 if let Err(e) = onchain.sync(&self.wallet.chain).await {
156 warn!("An error occured while syncing onchain: {e:#}");
157 }
158 }
159
160 async fn run_maintenance_refresh_process(&self) {
162 if let Err(e) = self.wallet.maintenance_refresh().await {
163 warn!("An error occured while performing maintenance refresh: {e:#}");
164 }
165 }
166
167 async fn run_exits(&self) {
169 let mut onchain = self.onchain.write().await;
170
171 let mut exit_lock = self.wallet.exit.write().await;
172 if let Err(e) = exit_lock.sync_no_progress(&*onchain).await {
173 warn!("An error occurred while syncing exits: {e:#}");
174 }
175
176 if let Err(e) = exit_lock.progress_exits(&self.wallet, &mut *onchain, None).await {
177 warn!("An error occurred while progressing exits: {e:#}");
178 }
179 }
180
181 async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
183 let mut events = self.wallet.subscribe_round_events().await?;
184
185 loop {
186 futures::select! {
187 res = events.next().fuse() => {
188 let event = res.context("events stream broke")?
189 .context("error on event stream")?;
190
191 self.wallet.progress_pending_rounds(Some(&event)).await?;
192 },
193 _ = self.shutdown.cancelled().fuse() => {
194 info!("Shutdown signal received! Shutting inner round events process...");
195 return Ok(());
196 },
197 }
198 }
199 }
200
201 async fn run_round_events_process(&self) {
205 loop {
206 if self.connected.load(Ordering::Relaxed) {
207 if let Err(e) = self.inner_process_pending_rounds().await {
208 warn!("An error occured while processing pending rounds: {e:#}");
209 }
210 }
211
212 futures::select! {
213 _ = tokio::time::sleep(self.slow_interval()).fuse() => {},
214 _ = self.shutdown.cancelled().fuse() => {
215 info!("Shutdown signal received! Shutting round events process...");
216 break;
217 },
218 }
219 }
220 }
221
222 async fn run_server_connection_check_process(&self) {
224 loop {
225 futures::select! {
226 _ = tokio::time::sleep(self.fast_interval()).fuse() => {},
227 _ = self.shutdown.cancelled().fuse() => {
228 info!("Shutdown signal received! Shutting server connection check process...");
229 break;
230 },
231 }
232
233 let connected = self.wallet.refresh_server().await.is_ok();
234 self.connected.store(connected, Ordering::Relaxed);
235 }
236 }
237
238 async fn run_sync_processes(&self) {
239 let mut fast_interval = tokio::time::interval(self.fast_interval());
240 fast_interval.reset();
241 let mut slow_interval = tokio::time::interval(self.slow_interval());
242 slow_interval.reset();
243
244 loop {
245 futures::select! {
246 _ = fast_interval.tick().fuse() => {
247 if !self.connected.load(Ordering::Relaxed) {
248 continue;
249 }
250
251 self.run_lightning_sync().await;
252 fast_interval.reset();
253 },
254 _ = slow_interval.tick().fuse() => {
255 if !self.connected.load(Ordering::Relaxed) {
256 continue;
257 }
258
259 self.run_fee_rate_update().await;
260 self.run_boards_sync().await;
261 self.run_offboards_sync().await;
262 self.run_maintenance_refresh_process().await;
263 self.run_onchain_sync().await;
264 self.run_exits().await;
265 slow_interval.reset();
266 },
267 _ = self.shutdown.cancelled().fuse() => {
268 info!("Shutdown signal received! Shutting sync processes...");
269 break;
270 },
271 }
272 }
273 }
274
275 pub async fn run(self) {
276 let connected = self.wallet.server.read().is_some();
277 self.connected.store(connected, Ordering::Relaxed);
278
279 let _ = futures::join!(
280 self.run_server_connection_check_process(),
281 self.run_round_events_process(),
282 self.run_sync_processes(),
283 self.run_mailbox_messages_process(),
284 );
285
286 info!("Daemon gracefully stopped");
287 }
288}