1pub use tokio_util::sync::CancellationToken;
2
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::time::Duration;
6
7use anyhow::Context;
8use futures::StreamExt;
9use log::{info, warn};
10use tokio::sync::RwLock;
11
12use crate::Wallet;
13use crate::onchain::ExitUnilaterally;
14
15lazy_static::lazy_static! {
16 static ref FAST_INTERVAL: Duration = Duration::from_secs(1);
17 static ref MEDIUM_INTERVAL: Duration = Duration::from_secs(30);
18 static ref SLOW_INTERVAL: Duration = Duration::from_secs(60);
19}
20
21pub struct Daemon {
24 shutdown: CancellationToken,
25
26 connected: AtomicBool,
27 wallet: Arc<Wallet>,
28 onchain: Arc<RwLock<dyn ExitUnilaterally>>,
29}
30
31impl Daemon {
32 pub fn new(
42 shutdown: CancellationToken,
43 wallet: Arc<Wallet>,
44 onchain: Arc<RwLock<dyn ExitUnilaterally>>,
45 ) -> anyhow::Result<Daemon> {
46 let daemon = Daemon {
47 shutdown,
48
49 connected: AtomicBool::new(false),
50 wallet,
51 onchain,
52 };
53
54 Ok(daemon)
55 }
56
57 async fn run_lightning_sync(&self) {
61 if let Err(e) = self.wallet.try_claim_all_lightning_receives(false).await {
62 warn!("An error occured while checking and claiming pending lightning receives: {e}");
63 }
64
65 if let Err(e) = self.wallet.sync_pending_lightning_send_vtxos().await {
66 warn!("An error occured while syncing pending lightning sends: {e}");
67 }
68 }
69
70 async fn run_arkoors_sync(&self) {
72 if let Err(e) = self.wallet.sync_oors().await {
73 warn!("An error occured while syncing oors: {e}");
74 }
75 }
76
77 async fn run_boards_sync(&self) {
79 if let Err(e) = self.wallet.sync_pending_boards().await {
80 warn!("An error occured while syncing pending board: {e}");
81 }
82 }
83
84 async fn run_maintenance_refresh_process(&self) {
86 loop {
87 if let Err(e) = self.wallet.maintenance_refresh().await {
88 warn!("An error occured while performing maintenance refresh: {e}");
89 }
90
91 tokio::select! {
92 _ = tokio::time::sleep(*SLOW_INTERVAL) => {},
93 _ = self.shutdown.cancelled() => {
94 info!("Shutdown signal received! Shutting maintenance refresh process...");
95 break;
96 },
97 }
98 }
99 }
100
101 async fn run_exits(&self) {
103 let mut onchain = self.onchain.write().await;
104
105 if let Err(e) = self.wallet.sync_exits(&mut *onchain).await {
106 warn!("An error occured while syncing exits: {e}");
107 }
108
109 let mut exit_lock = self.wallet.exit.write().await;
110 if let Err(e) = exit_lock.progress_exits(&mut *onchain, None).await {
111 warn!("An error occured while progressing exits: {e}");
112 }
113 drop(exit_lock);
114 }
115
116 async fn inner_process_pending_rounds(&self) -> anyhow::Result<()> {
118 let mut events = self.wallet.subscribe_round_events().await?;
119
120 loop {
121 tokio::select! {
122 res = events.next() => {
123 let event = res.context("events stream broke")?
124 .context("error on event stream")?;
125
126 self.wallet.progress_ongoing_rounds(Some(&event)).await?;
127 },
128 _ = self.shutdown.cancelled() => {
129 info!("Shutdown signal received! Shutting round events process...");
130 return Ok(());
131 },
132 }
133 }
134 }
135
136 async fn run_round_events_process(&self) {
140 loop {
141 if self.connected.load(Ordering::Relaxed) {
142 if let Err(e) = self.inner_process_pending_rounds().await {
143 warn!("An error occured while processing pending rounds: {e}");
144 }
145 }
146
147 tokio::select! {
148 _ = tokio::time::sleep(*SLOW_INTERVAL) => {},
149 _ = self.shutdown.cancelled() => {
150 info!("Shutdown signal received! Shutting round events process...");
151 break;
152 },
153 }
154 }
155 }
156
157 async fn run_server_connection_check(&self) -> bool {
160 self.wallet.check_connection().await
161 .inspect_err(|e| warn!("Server connection lost: {:#}", e)).is_ok()
162 }
163
164 async fn run_server_connection_check_process(&self) {
166 loop {
167 tokio::select! {
168 _ = tokio::time::sleep(*FAST_INTERVAL) => {},
169 _ = self.shutdown.cancelled() => {
170 info!("Shutdown signal received! Shutting server connection check process...");
171 break;
172 },
173 }
174
175 let connected = self.run_server_connection_check().await;
176 self.connected.store(connected, Ordering::Relaxed);
177 }
178 }
179
180 async fn run_sync_processes(&self) {
181 let mut fast_interval = tokio::time::interval(*FAST_INTERVAL);
182 fast_interval.reset();
183 let mut medium_interval = tokio::time::interval(*MEDIUM_INTERVAL);
184 medium_interval.reset();
185 let mut slow_interval = tokio::time::interval(*SLOW_INTERVAL);
186 slow_interval.reset();
187
188 loop {
189 tokio::select! {
190 _ = fast_interval.tick() => {
191 if !self.connected.load(Ordering::Relaxed) {
192 continue;
193 }
194
195 self.run_lightning_sync().await;
196 fast_interval.reset();
197 },
198 _ = medium_interval.tick() => {
199 if !self.connected.load(Ordering::Relaxed) {
200 continue;
201 }
202
203 self.run_arkoors_sync().await;
204 self.run_boards_sync().await;
205 medium_interval.reset();
206 },
207 _ = slow_interval.tick() => {
208 if !self.connected.load(Ordering::Relaxed) {
209 continue;
210 }
211
212 self.run_exits().await;
213 slow_interval.reset();
214 },
215 _ = self.shutdown.cancelled() => {
216 info!("Shutdown signal received! Shutting sync processes...");
217 break;
218 },
219 }
220 }
221 }
222
223 pub async fn run(self) {
224 let connected = self.run_server_connection_check().await;
225 self.connected.store(connected, Ordering::Relaxed);
226
227 let _ = tokio::join!(
228 self.run_server_connection_check_process(),
229 self.run_round_events_process(),
230 self.run_sync_processes(),
231 self.run_maintenance_refresh_process(),
232 );
233
234 info!("Daemon gracefully stopped");
235 }
236}