1
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use anyhow::Context;
7use futures::StreamExt;
8use log::{info, warn};
9use tokio::sync::RwLock;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use crate::Wallet;
14use crate::onchain::DaemonizableOnchainWallet;
15
/// Period of the server connection check and of the lightning sync tick.
const FAST_INTERVAL: Duration = Duration::from_secs(1);
/// Period of the arkoor and pending-board sync tick.
const MEDIUM_INTERVAL: Duration = Duration::from_secs(30);
/// Period of the onchain sync / exit progression tick. Also used as the pause
/// between maintenance refresh runs and between attempts to process round
/// events.
const SLOW_INTERVAL: Duration = Duration::from_secs(60);
19
/// Handle to the background daemon task spawned by `start_daemon`.
///
/// It can be used to request a shutdown and, optionally, to wait for the
/// task to finish.
pub struct DaemonHandle {
    /// Cancellation token; a clone of it is handed to the daemon process,
    /// which watches it to know when to stop.
    shutdown: CancellationToken,
    /// Join handle of the spawned tokio task running the daemon.
    jh: JoinHandle<()>,
}
25
26impl DaemonHandle {
27 pub fn stop(&self) {
29 self.shutdown.cancel();
30 }
31
32 pub async fn stop_wait(self) -> anyhow::Result<()> {
34 self.stop();
35 self.jh.await?;
36 Ok(())
37 }
38}
39
40pub(crate) fn start_daemon(
41 wallet: Arc<Wallet>,
42 onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
43) -> DaemonHandle {
44 let shutdown = CancellationToken::new();
45 let proc = DaemonProcess::new(shutdown.clone(), wallet, onchain);
46
47 let jh = tokio::spawn(proc.run());
48
49 DaemonHandle { shutdown, jh }
50}
51
/// State shared by all background loops of the daemon.
struct DaemonProcess {
    /// Token watched by every loop; once cancelled, the loops exit.
    shutdown: CancellationToken,

    /// Last known server connectivity. Written by `run` and by the server
    /// connection check loop; read by the sync and round-event loops to
    /// decide whether to do any work.
    connected: AtomicBool,
    /// The wallet on which all sync and maintenance calls are made.
    wallet: Arc<Wallet>,
    /// The onchain wallet, write-locked while syncing onchain and while
    /// processing exits.
    onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
}
61
62impl DaemonProcess {
63 fn new(
64 shutdown: CancellationToken,
65 wallet: Arc<Wallet>,
66 onchain: Arc<RwLock<dyn DaemonizableOnchainWallet>>,
67 ) -> DaemonProcess {
68 DaemonProcess {
69 connected: AtomicBool::new(false),
70 shutdown,
71 wallet,
72 onchain,
73 }
74 }
75
76 async fn run_lightning_sync(&self) {
80 if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
81 warn!("An error occured while checking and claiming pending lightning receives: {e}");
82 }
83
84 if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
85 warn!("An error occured while syncing pending lightning sends: {e}");
86 }
87 }
88
89 async fn run_arkoors_sync(&self) {
91 if let Err(e) = self.wallet.sync_oors().await {
92 warn!("An error occured while syncing oors: {e}");
93 }
94 }
95
96 async fn run_boards_sync(&self) {
98 if let Err(e) = self.wallet.sync_pending_boards().await {
99 warn!("An error occured while syncing pending board: {e}");
100 }
101 }
102
103 async fn run_onchain_sync(&self) {
105 let mut onchain = self.onchain.write().await;
106 if let Err(e) = onchain.sync(&self.wallet.chain).await {
107 warn!("An error occured while syncing onchain: {e}");
108 }
109 }
110
111 async fn run_maintenance_refresh_process(&self) {
113 loop {
114 if let Err(e) = self.wallet.maintenance_refresh().await {
115 warn!("An error occured while performing maintenance refresh: {e}");
116 }
117
118 tokio::select! {
119 _ = tokio::time::sleep(SLOW_INTERVAL) => {},
120
121 _ = self.shutdown.cancelled() => {
122 info!("Shutdown signal received! Shutting maintenance refresh process...");
123 break;
124 },
125 }
126 }
127 }
128
129 async fn run_exits(&self) {
131 let mut onchain = self.onchain.write().await;
132
133 let mut exit_lock = self.wallet.exit.write().await;
134 if let Err(e) = exit_lock.sync_no_progress(&*onchain).await {
135 warn!("An error occurred while syncing exits: {e}");
136 }
137
138 if let Err(e) = exit_lock.progress_exits(&self.wallet, &mut *onchain, None).await {
139 warn!("An error occurred while progressing exits: {e}");
140 }
141 }
142
143 async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
145 let mut events = self.wallet.subscribe_round_events().await?;
146
147 loop {
148 tokio::select! {
149 res = events.next() => {
150 let event = res.context("events stream broke")?
151 .context("error on event stream")?;
152
153 self.wallet.progress_pending_rounds(Some(&event)).await?;
154 },
155 _ = self.shutdown.cancelled() => {
156 info!("Shutdown signal received! Shutting inner round events process...");
157 return Ok(());
158 },
159 }
160 }
161 }
162
163 async fn run_round_events_process(&self) {
167 loop {
168 if self.connected.load(Ordering::Relaxed) {
169 if let Err(e) = self.inner_process_pending_rounds().await {
170 warn!("An error occured while processing pending rounds: {e}");
171 }
172 }
173
174 tokio::select! {
175 _ = tokio::time::sleep(SLOW_INTERVAL) => {},
176 _ = self.shutdown.cancelled() => {
177 info!("Shutdown signal received! Shutting round events process...");
178 break;
179 },
180 }
181 }
182 }
183
184 async fn run_server_connection_check_process(&self) {
186 loop {
187 tokio::select! {
188 _ = tokio::time::sleep(FAST_INTERVAL) => {},
189 _ = self.shutdown.cancelled() => {
190 info!("Shutdown signal received! Shutting server connection check process...");
191 break;
192 },
193 }
194
195 let connected = self.wallet.refresh_server().await.is_ok();
196 self.connected.store(connected, Ordering::Relaxed);
197 }
198 }
199
200 async fn run_sync_processes(&self) {
201 let mut fast_interval = tokio::time::interval(FAST_INTERVAL);
202 fast_interval.reset();
203 let mut medium_interval = tokio::time::interval(MEDIUM_INTERVAL);
204 medium_interval.reset();
205 let mut slow_interval = tokio::time::interval(SLOW_INTERVAL);
206 slow_interval.reset();
207
208 loop {
209 tokio::select! {
210 _ = fast_interval.tick() => {
211 if !self.connected.load(Ordering::Relaxed) {
212 continue;
213 }
214
215 self.run_lightning_sync().await;
216 fast_interval.reset();
217 },
218 _ = medium_interval.tick() => {
219 if !self.connected.load(Ordering::Relaxed) {
220 continue;
221 }
222
223 self.run_arkoors_sync().await;
224 self.run_boards_sync().await;
225 medium_interval.reset();
226 },
227 _ = slow_interval.tick() => {
228 if !self.connected.load(Ordering::Relaxed) {
229 continue;
230 }
231
232 self.run_onchain_sync().await;
233 self.run_exits().await;
234 slow_interval.reset();
235 },
236 _ = self.shutdown.cancelled() => {
237 info!("Shutdown signal received! Shutting sync processes...");
238 break;
239 },
240 }
241 }
242 }
243
244 pub async fn run(self) {
245 let connected = self.wallet.server.read().is_some();
246 self.connected.store(connected, Ordering::Relaxed);
247
248 let _ = tokio::join!(
249 self.run_server_connection_check_process(),
250 self.run_round_events_process(),
251 self.run_sync_processes(),
252 self.run_maintenance_refresh_process(),
253 );
254
255 info!("Daemon gracefully stopped");
256 }
257}