1pub mod lightning;
11
12use std::time::Duration;
13
14use log::{debug, trace, warn};
15use server_rpc::StatusExt;
16
17use crate::{Wallet, WalletVtxo};
18use crate::actions::lightning::pay::LightningSend;
19use crate::lock_manager::LockGuard;
20use crate::vtxo::{VtxoState, VtxoStateKind};
21
22pub(crate) const BASE_RETRY_BACKOFF: Duration = Duration::from_secs(1);
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub enum WalletActionCheckpoint {
31 LightningSend(LightningSend),
32}
33
34impl WalletActionCheckpoint {
35 pub fn id(&self) -> WalletActionId {
36 match self {
37 WalletActionCheckpoint::LightningSend(s) => s.id(),
38 }
39 }
40
41 pub fn as_lightning_send(&self) -> Option<&LightningSend> {
42 match self {
43 WalletActionCheckpoint::LightningSend(s) => Some(s),
44 }
45 }
46
47 pub fn into_lightning_send(self) -> Option<LightningSend> {
48 match self {
49 WalletActionCheckpoint::LightningSend(s) => Some(s),
50 }
51 }
52}
53
54impl From<LightningSend> for WalletActionCheckpoint {
55 fn from(s: LightningSend) -> Self {
56 WalletActionCheckpoint::LightningSend(s)
57 }
58}
59
60pub type WalletActionId = String;
66
67pub enum Advance<A> {
72 Next(A),
75 Park {
85 state: A,
86 wake_after: Option<Duration>,
87 error: Option<AdvanceError>,
88 },
89 Done,
93 Failed(anyhow::Error),
98}
99
100#[derive(Debug, thiserror::Error)]
101pub enum AdvanceError {
102 #[error("An error occurred while communicating with the server: {0}")]
103 Server(tonic::Status),
104 #[error("An error occurred while processing the action: {0}")]
105 Other(#[from] anyhow::Error),
106}
107
108impl AdvanceError {
109 pub fn is_server_rejection(&self) -> bool {
110 match self {
111 AdvanceError::Server(err) => err.is_rejection(),
112 _ => false,
113 }
114 }
115}
116
117pub fn park_with_backoff<A: WalletAction>(state: A, attempts: u32) -> Advance<A> {
118 let delay = attempts.pow(2) * BASE_RETRY_BACKOFF;
119 debug!("action {} retrying; sleeping {:?} before re-drive", state.id(), delay);
120 Advance::Park { state, wake_after: Some(delay), error: None }
121}
122
123#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
140#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
141pub trait WalletAction: Sized + Send + Sync {
142 fn id(&self) -> WalletActionId;
143
144 async fn advance(self, wallet: &Wallet) -> Result<Advance<Self>, AdvanceError>;
145
146 async fn on_retry(self, _wallet: &Wallet, attempts: u32) -> anyhow::Result<Advance<Self>> {
147 Ok(park_with_backoff(self, attempts))
148 }
149
150 async fn on_rejection(self, _wallet: &Wallet, _error: AdvanceError)
151 -> anyhow::Result<Advance<Self>>;
152}
153
154#[derive(Debug, Clone, Copy, PartialEq, Eq)]
156pub enum DriveMode {
157 UntilParkOrDone,
159 UntilDone,
162}
163
164impl Wallet {
165 async fn get_vtxos_locked_by_action(
171 &self,
172 action_id: &WalletActionId,
173 ) -> anyhow::Result<Vec<WalletVtxo>> {
174 let all = self.inner.db.get_vtxos_by_state(&[VtxoStateKind::Locked]).await?;
175 Ok(all.into_iter().filter(|v| match &v.state {
176 VtxoState::Locked { holder: Some(crate::vtxo::VtxoLockHolder::Action { id }) } => {
177 id == action_id
178 },
179 _ => false,
180 }).collect())
181 }
182
183 pub async fn release_action_locks(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
190 let vtxos = self.get_vtxos_locked_by_action(action_id).await?;
191 if vtxos.is_empty() {
192 return Ok(());
193 }
194 debug!("releasing {} vtxo lock(s) held by action {}", vtxos.len(), action_id);
195 self.unlock_vtxos(vtxos).await
196 }
197
198 pub async fn stop_wallet_action(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
202 self.release_action_locks(action_id).await?;
203 self.inner.db.remove_wallet_action_checkpoint(action_id).await?;
204 Ok(())
205 }
206
207 pub async fn drive_action<A>(&self, action: A, mode: DriveMode) -> anyhow::Result<()>
213 where
214 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
215 {
216 let guard = match self.inner.lock_manager.try_lock(&action.id()).await {
217 Some(g) => g,
218 None => {
219 trace!("action {} is already being driven, skipping", action.id());
220 return Ok(());
221 },
222 };
223
224 self.drive_action_with_guard(action, mode, guard).await
225 }
226
227 pub(crate) async fn drive_action_with_guard<A>(
232 &self,
233 action: A,
234 mode: DriveMode,
235 _lock_guard: Box<dyn LockGuard>,
236 ) -> anyhow::Result<()>
237 where
238 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
239 {
240 self.run_action_loop(action, mode).await
241 }
242
243 async fn run_action_loop<A>(&self, mut action: A, mode: DriveMode) -> anyhow::Result<()>
244 where
245 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
246 {
247 let mut retries: u32 = 0;
250
251 loop {
252 let id = action.id();
253 let snapshot = action.clone();
257
258 let advance = match action.advance(self).await {
259 Ok(advance) => { advance },
260 Err(e) if e.is_server_rejection() => {
261 warn!("action {} got rejected by server: {:#}", id, e);
262 snapshot.on_rejection(self, e).await.inspect_err(|err| {
263 warn!("action {} on_rejection failed, leaving checkpoint for retry: {:#}", id, err);
264 })?
265 }
266 Err(e) => {
267 retries = retries.saturating_add(1);
268 log::error!("Got error {:?} from action {}, retrying", e, id);
269 snapshot.on_retry(self, retries).await.inspect_err(|err| {
270 warn!("action {} on_retry failed, leaving checkpoint for retry: {:#}", id, err);
271 })?
272 },
273 };
274
275 match advance {
276 Advance::Next(next) => {
277 retries = 0;
278 let checkpoint: WalletActionCheckpoint = next.clone().into();
279 self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
280 action = next;
281 },
282 Advance::Park { state, wake_after, error } => {
283 let checkpoint: WalletActionCheckpoint = state.clone().into();
284 self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
285 match mode {
286 DriveMode::UntilParkOrDone => {
287 return match error {
288 Some(error) => Err(error.into()),
289 None => Ok(()),
290 };
291 },
292 DriveMode::UntilDone => {
293 if let Some(delay) = wake_after {
294 debug!("action {} parked; sleeping {:?} before re-drive", id, delay);
295 tokio::time::sleep(delay).await;
296 action = state;
297 } else {
298 return Ok(());
299 }
300 },
301 }
302 },
303 Advance::Done => {
304 if let Err(e) = self.stop_wallet_action(&id).await {
305 warn!("action {} done but couldn't cancel: {:#}", id, e);
306 }
307 return Ok(());
308 },
309 Advance::Failed(e) => {
310 if let Err(e) = self.stop_wallet_action(&id).await {
311 warn!("action {} failed but couldn't cancel: {:#}", id, e);
312 }
313 return Err(e);
314 },
315 }
316 }
317 }
318}