1pub mod lightning;
11
12use std::time::Duration;
13
14use log::{debug, trace, warn};
15use server_rpc::StatusExt;
16
17use crate::{Wallet, WalletVtxo};
18use crate::actions::lightning::pay::LightningSend;
19use crate::lock_manager::LockGuard;
20use crate::vtxo::{VtxoState, VtxoStateKind};
21
22pub(crate) const BASE_RETRY_BACKOFF: Duration = Duration::from_secs(1);
23
/// Serializable snapshot of an in-progress wallet action.
///
/// The action driver converts an action's state into this type and stores it
/// in the wallet database under the action's id every time the action advances
/// or parks. The entry is removed again when the action is stopped.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum WalletActionCheckpoint {
    /// Checkpoint of a [`LightningSend`] action.
    LightningSend(LightningSend),
}
33
34impl WalletActionCheckpoint {
35 pub fn id(&self) -> WalletActionId {
36 match self {
37 WalletActionCheckpoint::LightningSend(s) => s.id(),
38 }
39 }
40
41 pub fn as_lightning_send(&self) -> Option<&LightningSend> {
42 match self {
43 WalletActionCheckpoint::LightningSend(s) => Some(s),
44 }
45 }
46
47 pub fn into_lightning_send(self) -> Option<LightningSend> {
48 match self {
49 WalletActionCheckpoint::LightningSend(s) => Some(s),
50 }
51 }
52}
53
54impl From<LightningSend> for WalletActionCheckpoint {
55 fn from(s: LightningSend) -> Self {
56 WalletActionCheckpoint::LightningSend(s)
57 }
58}
59
/// Identifier of a wallet action.
///
/// Within this module it is used as the key of the action's checkpoint in the
/// database, as the key of the per-action drive lock, and to find the VTXOs
/// whose lock is held by the action.
pub type WalletActionId = String;
66
/// Result of a single step of a [`WalletAction`], telling the driver what to
/// do next.
pub enum Advance<A> {
    /// The action moved to a new state. The driver persists a checkpoint of
    /// that state, resets its retry counter and advances again immediately.
    Next(A),
    /// The action cannot make progress right now. The driver persists a
    /// checkpoint of `state` and then either returns or sleeps, depending on
    /// the [`DriveMode`].
    Park {
        /// State to persist and to resume from.
        state: A,
        /// In [`DriveMode::UntilDone`], how long the driver sleeps before
        /// driving `state` again; `None` makes the driver return instead.
        wake_after: Option<Duration>,
        /// In [`DriveMode::UntilParkOrDone`], returned to the caller as the
        /// error of the drive call when set. It is not returned in
        /// [`DriveMode::UntilDone`].
        error: Option<AdvanceError>,
    },
    /// The action finished. The driver releases the action's VTXO locks,
    /// removes its checkpoint and returns `Ok(())`.
    Done,
    /// The action failed for good. The driver performs the same cleanup as
    /// for [`Advance::Done`] and returns the contained error.
    Failed(anyhow::Error),
}
99
/// Error returned by [`WalletAction::advance`].
///
/// The driver distinguishes server rejections (see
/// [`AdvanceError::is_server_rejection`]) from every other error to decide
/// whether to call `on_rejection` or `on_retry`.
#[derive(Debug, thiserror::Error)]
pub enum AdvanceError {
    /// A gRPC status returned while talking to the server.
    #[error("An error occurred while communicating with the server: {0}")]
    Server(tonic::Status),
    /// Any other error; `anyhow::Error` converts into this variant via `From`.
    #[error("An error occurred while processing the action: {0}")]
    Other(#[from] anyhow::Error),
}
107
108impl AdvanceError {
109 pub fn is_server_rejection(&self) -> bool {
110 match self {
111 AdvanceError::Server(err) => err.is_rejection(),
112 _ => false,
113 }
114 }
115}
116
117pub fn park_with_backoff<A: WalletAction>(state: A, attempts: u32) -> Advance<A> {
118 let delay = attempts.pow(2) * BASE_RETRY_BACKOFF;
119 debug!("action {} retrying; sleeping {:?} before re-drive", state.id(), delay);
120 Advance::Park { state, wake_after: Some(delay), error: None }
121}
122
/// A resumable, multi-step wallet operation that the wallet drives to
/// completion.
///
/// Each step consumes the current state and reports what should happen next
/// through [`Advance`].
// On wasm32 the generated futures are not required to be `Send`.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait WalletAction: Sized + Send + Sync {
    /// Identifier of this action. The driver uses it for the action's
    /// checkpoint and for the per-action drive lock.
    fn id(&self) -> WalletActionId;

    /// Performs one step of the action, consuming the current state.
    ///
    /// On `Err`, the driver calls [`WalletAction::on_rejection`] if the error
    /// is a server rejection and [`WalletAction::on_retry`] otherwise, in both
    /// cases on a clone of the state taken before this call.
    async fn advance(self, wallet: &Wallet) -> Result<Advance<Self>, AdvanceError>;

    /// Called when `advance` failed with an error that is not a server
    /// rejection.
    ///
    /// `attempts` is the number of such failures since the driver last saw
    /// [`Advance::Next`] (or since the drive started). The default
    /// implementation parks the action via [`park_with_backoff`].
    async fn on_retry(self, _wallet: &Wallet, attempts: u32) -> anyhow::Result<Advance<Self>> {
        Ok(park_with_backoff(self, attempts))
    }

    /// Called when `advance` failed with an error for which
    /// [`AdvanceError::is_server_rejection`] is true.
    ///
    /// If this returns `Err`, the driver stops and propagates the error
    /// without touching the stored checkpoint.
    async fn on_rejection(self, _wallet: &Wallet, _error: AdvanceError)
        -> anyhow::Result<Advance<Self>>;
}
156
/// Controls how far the driver runs an action in a single drive call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriveMode {
    /// Return as soon as the action parks, completes or fails. A park that
    /// carries an error makes the drive call return that error.
    UntilParkOrDone,
    /// When the action parks with a wake-up delay, sleep for that delay and
    /// drive it again. A park without a delay still returns `Ok(())`.
    UntilDone,
}
166
167impl Wallet {
168 async fn get_vtxos_locked_by_action(
174 &self,
175 action_id: &WalletActionId,
176 ) -> anyhow::Result<Vec<WalletVtxo>> {
177 let all = self.inner.db.get_vtxos_by_state(&[VtxoStateKind::Locked]).await?;
178 Ok(all.into_iter().filter(|v| match &v.state {
179 VtxoState::Locked { holder: Some(crate::vtxo::VtxoLockHolder::Action { id }) } => {
180 id == action_id
181 },
182 _ => false,
183 }).collect())
184 }
185
186 pub async fn release_action_locks(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
193 let vtxos = self.get_vtxos_locked_by_action(action_id).await?;
194 if vtxos.is_empty() {
195 return Ok(());
196 }
197 debug!("releasing {} vtxo lock(s) held by action {}", vtxos.len(), action_id);
198 self.unlock_vtxos(vtxos).await
199 }
200
    /// Stops tracking the action `action_id`.
    ///
    /// Releases the VTXO locks held by the action, then removes its
    /// checkpoint from the database.
    ///
    /// # Errors
    ///
    /// Returns the first error from either step. If releasing the locks
    /// fails, the checkpoint is left in place.
    pub async fn stop_wallet_action(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
        self.release_action_locks(action_id).await?;
        self.inner.db.remove_wallet_action_checkpoint(action_id).await?;
        Ok(())
    }
209
210 pub async fn drive_action<A>(&self, action: A, mode: DriveMode) -> anyhow::Result<()>
216 where
217 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
218 {
219 let guard = match self.inner.lock_manager.try_lock(&action.id()).await {
220 Some(g) => g,
221 None => {
222 trace!("action {} is already being driven, skipping", action.id());
223 return Ok(());
224 },
225 };
226
227 self.drive_action_with_guard(action, mode, guard).await
228 }
229
230 pub(crate) async fn drive_action_with_guard<A>(
235 &self,
236 action: A,
237 mode: DriveMode,
238 _lock_guard: Box<dyn LockGuard>,
239 ) -> anyhow::Result<()>
240 where
241 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
242 {
243 self.run_action_loop(action, mode).await
244 }
245
246 async fn run_action_loop<A>(&self, mut action: A, mode: DriveMode) -> anyhow::Result<()>
247 where
248 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
249 {
250 let mut retries: u32 = 0;
253
254 loop {
255 let id = action.id();
256 let snapshot = action.clone();
260
261 let advance = match action.advance(self).await {
262 Ok(advance) => { advance },
263 Err(e) if e.is_server_rejection() => {
264 warn!("action {} got rejected by server: {:#}", id, e);
265 snapshot.on_rejection(self, e).await.inspect_err(|err| {
266 warn!("action {} on_rejection failed, leaving checkpoint for retry: {:#}", id, err);
267 })?
268 }
269 Err(e) => {
270 retries = retries.saturating_add(1);
271 log::error!("Got error {:?} from action {}, retrying", e, id);
272 snapshot.on_retry(self, retries).await.inspect_err(|err| {
273 warn!("action {} on_retry failed, leaving checkpoint for retry: {:#}", id, err);
274 })?
275 },
276 };
277
278 match advance {
279 Advance::Next(next) => {
280 retries = 0;
281 let checkpoint: WalletActionCheckpoint = next.clone().into();
282 self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
283 action = next;
284 },
285 Advance::Park { state, wake_after, error } => {
286 let checkpoint: WalletActionCheckpoint = state.clone().into();
287 self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
288 match mode {
289 DriveMode::UntilParkOrDone => {
290 return match error {
291 Some(error) => Err(error.into()),
292 None => Ok(()),
293 };
294 },
295 DriveMode::UntilDone => {
296 if let Some(delay) = wake_after {
297 debug!("action {} parked; sleeping {:?} before re-drive", id, delay);
298 tokio::time::sleep(delay).await;
299 action = state;
300 } else {
301 return Ok(());
302 }
303 },
304 }
305 },
306 Advance::Done => {
307 if let Err(e) = self.stop_wallet_action(&id).await {
308 warn!("action {} done but couldn't cancel: {:#}", id, e);
309 }
310 return Ok(());
311 },
312 Advance::Failed(e) => {
313 if let Err(e) = self.stop_wallet_action(&id).await {
314 warn!("action {} failed but couldn't cancel: {:#}", id, e);
315 }
316 return Err(e);
317 },
318 }
319 }
320 }
321}