1pub mod lightning;
11pub mod arkoor_send;
12
13use std::time::Duration;
14
15use log::{debug, trace, warn};
16use server_rpc::StatusExt;
17
18use crate::{Wallet, WalletVtxo};
19use crate::actions::arkoor_send::ArkoorSend;
20use crate::actions::lightning::pay::LightningSend;
21use crate::lock_manager::LockGuard;
22use crate::vtxo::{VtxoState, VtxoStateKind};
23
/// Base unit of the retry backoff.
///
/// [`park_with_backoff`] scales this by the square of the attempt count to
/// obtain the delay before an action is driven again.
pub(crate) const BASE_RETRY_BACKOFF: Duration = Duration::from_secs(1);
25
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
32pub enum WalletActionCheckpoint {
33 LightningSend(LightningSend),
34 ArkoorSend(ArkoorSend),
35}
36
37impl WalletActionCheckpoint {
38 pub fn id(&self) -> WalletActionId {
39 match self {
40 WalletActionCheckpoint::LightningSend(s) => s.id(),
41 WalletActionCheckpoint::ArkoorSend(s) => s.id(),
42 }
43 }
44
45 pub fn as_lightning_send(&self) -> Option<&LightningSend> {
46 match self {
47 WalletActionCheckpoint::LightningSend(s) => Some(s),
48 _ => None,
49 }
50 }
51
52 pub fn into_lightning_send(self) -> Option<LightningSend> {
53 match self {
54 WalletActionCheckpoint::LightningSend(s) => Some(s),
55 _ => None,
56 }
57 }
58
59 pub fn as_arkoor_send(&self) -> Option<&ArkoorSend> {
60 match self {
61 WalletActionCheckpoint::ArkoorSend(s) => Some(s),
62 _ => None,
63 }
64 }
65
66 pub fn into_arkoor_send(self) -> Option<ArkoorSend> {
67 match self {
68 WalletActionCheckpoint::ArkoorSend(s) => Some(s),
69 _ => None,
70 }
71 }
72}
73
74impl From<LightningSend> for WalletActionCheckpoint {
75 fn from(s: LightningSend) -> Self {
76 WalletActionCheckpoint::LightningSend(s)
77 }
78}
79
80impl From<ArkoorSend> for WalletActionCheckpoint {
81 fn from(s: ArkoorSend) -> Self {
82 WalletActionCheckpoint::ArkoorSend(s)
83 }
84}
85
/// Identifier of a wallet action.
///
/// The action driver uses it as the key for the per-action lock and for the
/// stored checkpoint.
pub type WalletActionId = String;
92
93pub enum Advance<A> {
98 Next(A),
101 Park {
111 state: A,
112 wake_after: Option<Duration>,
113 error: Option<AdvanceError>,
114 },
115 Done,
119 Failed(anyhow::Error),
124}
125
126#[derive(Debug, thiserror::Error)]
127pub enum AdvanceError {
128 #[error("An error occurred while communicating with the server: {0}")]
129 Server(tonic::Status),
130 #[error("An error occurred while processing the action: {0}")]
131 Other(#[from] anyhow::Error),
132}
133
134impl AdvanceError {
135 pub fn is_server_rejection(&self) -> bool {
136 match self {
137 AdvanceError::Server(err) => err.is_rejection(),
138 _ => false,
139 }
140 }
141}
142
143pub fn park_with_backoff<A: WalletAction>(state: A, attempts: u32) -> Advance<A> {
144 let delay = attempts.pow(2) * BASE_RETRY_BACKOFF;
145 debug!("action {} retrying; sleeping {:?} before re-drive", state.id(), delay);
146 Advance::Park { state, wake_after: Some(delay), error: None }
147}
148
/// Debug-build switch: returns `true` when the `BARK_DOUBLE_DRIVE_ACTIONS`
/// environment variable is set to any value, including an empty one.
#[cfg(debug_assertions)]
fn double_drive_actions() -> bool {
    let flag = std::env::var_os("BARK_DOUBLE_DRIVE_ACTIONS");
    matches!(flag, Some(_))
}
156
157#[cfg(debug_assertions)]
162fn assert_reentrant<A>(
163 first: &Result<Advance<A>, AdvanceError>,
164 second: &Result<Advance<A>, AdvanceError>,
165) where
166 A: Into<WalletActionCheckpoint> + Clone,
167{
168 fn describe<A: Into<WalletActionCheckpoint> + Clone>(
169 result: &Result<Advance<A>, AdvanceError>,
170 ) -> (&'static str, Option<WalletActionCheckpoint>) {
171 match result {
172 Ok(Advance::Next(state)) => ("Next", Some(state.clone().into())),
173 Ok(Advance::Park { state, .. }) => ("Park", Some(state.clone().into())),
174 Ok(Advance::Done) => ("Done", None),
175 Ok(Advance::Failed(_)) => ("Failed", None),
176 Err(_) => ("Err", None),
177 }
178 }
179
180 assert_eq!(
181 describe(first), describe(second),
182 "wallet action is not reentrant: advancing the same state twice diverged",
183 );
184}
185
186#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
191#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
192pub trait WalletAction: Sized + Send + Sync {
193 fn id(&self) -> WalletActionId;
198
199 async fn advance(self, wallet: &Wallet) -> Result<Advance<Self>, AdvanceError>;
205
206 async fn on_retry(self, _wallet: &Wallet, attempts: u32) -> anyhow::Result<Advance<Self>> {
208 Ok(park_with_backoff(self, attempts))
209 }
210
211 async fn on_rejection(self, _wallet: &Wallet, _error: AdvanceError)
217 -> anyhow::Result<Advance<Self>>;
218}
219
/// Controls how far the driver runs an action before returning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriveMode {
    /// Return as soon as the action parks, finishes or fails.
    UntilParkOrDone,
    /// Keep driving through timed parks by sleeping for the requested delay.
    /// The driver still returns when the action parks without a wake-up time.
    UntilDone,
}
229
230impl Wallet {
231 async fn get_vtxos_locked_by_action(
237 &self,
238 action_id: &WalletActionId,
239 ) -> anyhow::Result<Vec<WalletVtxo>> {
240 let all = self.inner.db.get_vtxos_by_state(&[VtxoStateKind::Locked]).await?;
241 Ok(all.into_iter().filter(|v| match &v.state {
242 VtxoState::Locked { holder: Some(crate::vtxo::VtxoLockHolder::Action { id }) } => {
243 id == action_id
244 },
245 _ => false,
246 }).collect())
247 }
248
249 pub async fn release_action_locks(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
256 let vtxos = self.get_vtxos_locked_by_action(action_id).await?;
257 if vtxos.is_empty() {
258 return Ok(());
259 }
260 debug!("releasing {} vtxo lock(s) held by action {}", vtxos.len(), action_id);
261 self.unlock_vtxos(vtxos).await
262 }
263
264 pub async fn stop_wallet_action(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
268 self.release_action_locks(action_id).await?;
269 self.inner.db.remove_wallet_action_checkpoint(action_id).await?;
270 Ok(())
271 }
272
273 pub async fn drive_action<A>(&self, action: A, mode: DriveMode) -> anyhow::Result<()>
279 where
280 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
281 {
282 let guard = match self.inner.lock_manager.try_lock(&action.id()).await {
283 Some(g) => g,
284 None => {
285 trace!("action {} is already being driven, skipping", action.id());
286 return Ok(());
287 },
288 };
289
290 self.drive_action_with_guard(action, mode, guard).await
291 }
292
293 pub(crate) async fn drive_action_with_guard<A>(
298 &self,
299 action: A,
300 mode: DriveMode,
301 _lock_guard: Box<dyn LockGuard>,
302 ) -> anyhow::Result<()>
303 where
304 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
305 {
306 self.run_action_loop(action, mode).await
307 }
308
309 async fn advance_step<A>(&self, action: A) -> Result<Advance<A>, AdvanceError>
317 where
318 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
319 {
320 #[cfg(debug_assertions)]
321 if double_drive_actions() {
322 let snapshot = action.clone();
325 let first = action.advance(self).await;
326 let second = snapshot.advance(self).await;
327 assert_reentrant(&first, &second);
328 return second;
329 }
330
331 action.advance(self).await
332 }
333
334 async fn run_action_loop<A>(&self, mut action: A, mode: DriveMode) -> anyhow::Result<()>
335 where
336 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
337 {
338 let mut retries: u32 = 0;
341
342 loop {
343 let id = action.id();
344 let snapshot = action.clone();
348
349 let advance = match self.advance_step(action).await {
350 Ok(advance) => { advance },
351 Err(e) if e.is_server_rejection() => {
352 warn!("action {} got rejected by server: {:#}", id, e);
353 snapshot.on_rejection(self, e).await.inspect_err(|err| {
354 warn!("action {} on_rejection failed, leaving checkpoint for retry: {:#}", id, err);
355 })?
356 }
357 Err(e) => {
358 retries = retries.saturating_add(1);
359 log::error!("Got error {:?} from action {}, retrying", e, id);
360 snapshot.on_retry(self, retries).await.inspect_err(|err| {
361 warn!("action {} on_retry failed, leaving checkpoint for retry: {:#}", id, err);
362 })?
363 },
364 };
365
366 match advance {
367 Advance::Next(next) => {
368 retries = 0;
369 let checkpoint: WalletActionCheckpoint = next.clone().into();
370 self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
371 action = next;
372 },
373 Advance::Park { state, wake_after, error } => {
374 let checkpoint: WalletActionCheckpoint = state.clone().into();
375 self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
376 match mode {
377 DriveMode::UntilParkOrDone => {
378 return match error {
379 Some(error) => Err(error.into()),
380 None => Ok(()),
381 };
382 },
383 DriveMode::UntilDone => {
384 if let Some(delay) = wake_after {
385 debug!("action {} parked; sleeping {:?} before re-drive", id, delay);
386 tokio::time::sleep(delay).await;
387 action = state;
388 } else {
389 return Ok(());
390 }
391 },
392 }
393 },
394 Advance::Done => {
395 if let Err(e) = self.stop_wallet_action(&id).await {
396 warn!("action {} done but couldn't cancel: {:#}", id, e);
397 }
398 return Ok(());
399 },
400 Advance::Failed(e) => {
401 if let Err(e) = self.stop_wallet_action(&id).await {
402 warn!("action {} failed but couldn't cancel: {:#}", id, e);
403 }
404 return Err(e);
405 },
406 }
407 }
408 }
409}