Skip to main content

bark/actions/
mod.rs

//! Wallet action infrastructure.
//!
//! A *wallet action* is a multi-step operation that moves vtxos (e.g. a
//! lightning send). Each step is small, persists its outcome to a
//! checkpoint, and is safe to re-drive after a crash.
//!
//! This module defines the generic vocabulary and the executor that drives
//! actions; per-kind machinery (state machines, transition functions) lives
//! in submodules.
9
10pub mod lightning;
11pub mod arkoor_send;
12
13use std::time::Duration;
14
15use log::{debug, trace, warn};
16use server_rpc::StatusExt;
17
18use crate::{Wallet, WalletVtxo};
19use crate::actions::arkoor_send::ArkoorSend;
20use crate::actions::lightning::pay::LightningSend;
21use crate::lock_manager::LockGuard;
22use crate::vtxo::{VtxoState, VtxoStateKind};
23
/// Unit delay that [`park_with_backoff`] scales by the squared attempt count.
pub(crate) const BASE_RETRY_BACKOFF: Duration = Duration::from_millis(1_000);
25
/// Tagged union of every kind of checkpoint the wallet persists.
///
/// Used as the serialization boundary for the
/// `bark_wallet_action_checkpoint` table; per-kind logic lives on each
/// variant's payload type.
///
/// NOTE(review): variants are (de)serialized through the serde derives, so
/// renaming or reordering them likely changes the persisted format and may
/// break rows already stored — confirm against the storage encoding first.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum WalletActionCheckpoint {
	/// Checkpoint of a [`LightningSend`] action.
	LightningSend(LightningSend),
	/// Checkpoint of an [`ArkoorSend`] action.
	ArkoorSend(ArkoorSend),
}
36
37impl WalletActionCheckpoint {
38	pub fn id(&self) -> WalletActionId {
39		match self {
40			WalletActionCheckpoint::LightningSend(s) => s.id(),
41			WalletActionCheckpoint::ArkoorSend(s) => s.id(),
42		}
43	}
44
45	pub fn as_lightning_send(&self) -> Option<&LightningSend> {
46		match self {
47			WalletActionCheckpoint::LightningSend(s) => Some(s),
48			_ => None,
49		}
50	}
51
52	pub fn into_lightning_send(self) -> Option<LightningSend> {
53		match self {
54			WalletActionCheckpoint::LightningSend(s) => Some(s),
55			_ => None,
56		}
57	}
58
59	pub fn as_arkoor_send(&self) -> Option<&ArkoorSend> {
60		match self {
61			WalletActionCheckpoint::ArkoorSend(s) => Some(s),
62			_ => None,
63		}
64	}
65
66	pub fn into_arkoor_send(self) -> Option<ArkoorSend> {
67		match self {
68			WalletActionCheckpoint::ArkoorSend(s) => Some(s),
69			_ => None,
70		}
71	}
72}
73
74impl From<LightningSend> for WalletActionCheckpoint {
75	fn from(s: LightningSend) -> Self {
76		WalletActionCheckpoint::LightningSend(s)
77	}
78}
79
80impl From<ArkoorSend> for WalletActionCheckpoint {
81	fn from(s: ArkoorSend) -> Self {
82		WalletActionCheckpoint::ArkoorSend(s)
83	}
84}
85
/// Stable identifier for a wallet action.
///
/// The id must be derivable from the action's identity (e.g. the payment
/// hash for a lightning send) so that restarting the same action picks
/// up the same checkpoint row.
///
/// The same id is also the key of the executor's per-action in-flight lock
/// and identifies the action as a vtxo lock holder
/// (`VtxoLockHolder::Action`), which is how the vtxos it reserved are found
/// and released.
pub type WalletActionId = String;
92
/// Outcome of one `WalletAction::advance` call.
///
/// The executor uses these to decide whether to persist, loop, schedule
/// a wake-up or remove the checkpoint.
pub enum Advance<A> {
	/// Transition to a new state. The executor persists the new state,
	/// resets its transient-error retry counter and calls `advance` on it.
	Next(A),
	/// Pause until something external (notification, periodic sync) or
	/// `wake_after` (when set) re-drives the action. The executor persists
	/// `state`; it then returns, except under `DriveMode::UntilDone` with
	/// `wake_after` set, where it sleeps for `wake_after` in-process and
	/// calls `advance` on `state` again.
	///
	/// `wake_after` is a hint, not a guarantee: it lives only in this
	/// process and is lost across restarts. `advance` MUST tolerate
	/// being called before the hint has elapsed.
	///
	/// `error` is the error that caused the park, if any. Under
	/// `DriveMode::UntilParkOrDone` it is returned to the caller of the drive.
	Park {
		state: A,
		wake_after: Option<Duration>,
		error: Option<AdvanceError>,
	},
	/// Terminal: the executor removes the checkpoint row and releases any
	/// vtxo locks the action still holds (best-effort; a cleanup failure is
	/// only logged). Any permanent fact the action wants to retain (e.g. an
	/// "invoice paid" record) must be written to its own table before
	/// returning `Done`.
	Done,
	/// Terminal: because of a fatal error, the executor removes the
	/// checkpoint row and releases any vtxo locks the action still holds,
	/// then returns the error to the caller of the drive.
	///
	/// Only return this when no server-side change has happened yet, or when
	/// the action has verified that the server's state is the expected one
	/// and it is therefore safe to drop the checkpoint.
	Failed(anyhow::Error),
}
125
/// Error returned by [`WalletAction::advance`].
///
/// The executor hands errors for which [`AdvanceError::is_server_rejection`]
/// is true to [`WalletAction::on_rejection`]; every other error is treated
/// as transient and leads to [`WalletAction::on_retry`].
#[derive(Debug, thiserror::Error)]
pub enum AdvanceError {
	/// A gRPC status received from the server.
	#[error("An error occurred while communicating with the server: {0}")]
	Server(tonic::Status),
	/// Any other failure; an `anyhow::Error` converts into this via `?`.
	#[error("An error occurred while processing the action: {0}")]
	Other(#[from] anyhow::Error),
}
133
134impl AdvanceError {
135	pub fn is_server_rejection(&self) -> bool {
136		match self {
137			AdvanceError::Server(err) => err.is_rejection(),
138			_ => false,
139		}
140	}
141}
142
143pub fn park_with_backoff<A: WalletAction>(state: A, attempts: u32) -> Advance<A> {
144	let delay = attempts.pow(2) * BASE_RETRY_BACKOFF;
145	debug!("action {} retrying; sleeping {:?} before re-drive", state.id(), delay);
146	Advance::Park { state, wake_after: Some(delay), error: None }
147}
148
/// Reports whether every action step should be run twice from the same state
/// to check reentrancy. Enabled by the mere presence of the
/// `BARK_DOUBLE_DRIVE_ACTIONS` env var (its value is ignored). Only compiled
/// into debug builds. See `just int-bark-sdk-action-reentrancy`.
#[cfg(debug_assertions)]
fn double_drive_actions() -> bool {
	matches!(std::env::var_os("BARK_DOUBLE_DRIVE_ACTIONS"), Some(_))
}
156
157/// Assert advancing the same state twice produced an equivalent outcome (same
158/// [`Advance`] kind, same checkpoint for non-terminal kinds); a divergence is a
159/// non-idempotency bug and panics, naming the offending step. Two errors count
160/// as equivalent: [`AdvanceError`] isn't comparable.
161#[cfg(debug_assertions)]
162fn assert_reentrant<A>(
163	first: &Result<Advance<A>, AdvanceError>,
164	second: &Result<Advance<A>, AdvanceError>,
165) where
166	A: Into<WalletActionCheckpoint> + Clone,
167{
168	fn describe<A: Into<WalletActionCheckpoint> + Clone>(
169		result: &Result<Advance<A>, AdvanceError>,
170	) -> (&'static str, Option<WalletActionCheckpoint>) {
171		match result {
172			Ok(Advance::Next(state)) => ("Next", Some(state.clone().into())),
173			Ok(Advance::Park { state, .. }) => ("Park", Some(state.clone().into())),
174			Ok(Advance::Done) => ("Done", None),
175			Ok(Advance::Failed(_)) => ("Failed", None),
176			Err(_) => ("Err", None),
177		}
178	}
179
180	assert_eq!(
181		describe(first), describe(second),
182		"wallet action is not reentrant: advancing the same state twice diverged",
183	);
184}
185
/// A wallet action that can be driven step-by-step.
///
/// Implementors define the per-kind state machine; the executor
/// (`Wallet::drive_action`) owns the loop, persistence, retry tracking and
/// wake scheduling. On wasm32 the trait is declared with
/// `async_trait(?Send)`, so its futures are not required to be `Send` there.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait WalletAction: Sized + Send + Sync {
	/// Get an identifier for this action.
	///
	/// The `id` returned MUST be stable across calls on the same logical
	/// action (different states of the same action share an id): the
	/// executor uses it as the checkpoint row key and as the key of its
	/// per-action in-flight lock.
	fn id(&self) -> WalletActionId;

	/// Called to advance the action state by one step.
	///
	/// MUST be re-entrant: it may be called more than once for the same logical
	/// step (after a crash, after an early wake, after a notification arrives).
	/// All side effects it triggers must therefore be idempotent.
	///
	/// An error for which [`AdvanceError::is_server_rejection`] is true leads
	/// to [`WalletAction::on_rejection`]; any other error leads to
	/// [`WalletAction::on_retry`]. Both hooks receive a copy of the state as it
	/// was before this call.
	async fn advance(self, wallet: &Wallet) -> Result<Advance<Self>, AdvanceError>;

	/// Called when `advance` failed with an error that is not a server
	/// rejection, to decide how to retry.
	///
	/// `attempts` counts such errors in the current drive since the last
	/// [`Advance::Next`], starting at 1. The default parks the action with
	/// [`park_with_backoff`]. Returning `Err` aborts the drive and leaves the
	/// last persisted checkpoint in place.
	async fn on_retry(self, _wallet: &Wallet, attempts: u32) -> anyhow::Result<Advance<Self>> {
		Ok(park_with_backoff(self, attempts))
	}

	/// Called when the server rejected one of our requests, i.e. `advance`
	/// returned an error for which [`AdvanceError::is_server_rejection`] is
	/// true; that error is passed in.
	///
	/// MUST be re-entrant for the same reason as [WalletAction::advance]:
	/// it may run partially, crash, and be re-driven against the state the action
	/// subsequently lands in. Returning `Err` aborts the drive and leaves the
	/// last persisted checkpoint in place.
	async fn on_rejection(self, _wallet: &Wallet, _error: AdvanceError)
		-> anyhow::Result<Advance<Self>>;
}
219
/// How aggressively the executor should drive an action.
///
/// In both modes an [`Advance::Done`] outcome ends the drive successfully and
/// an [`Advance::Failed`] outcome ends it with that error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriveMode {
	/// Drive until the action parks or completes, then return. A park that
	/// carries an `error` makes the drive return that error.
	UntilParkOrDone,
	/// Drive past parks, sleeping between iterations, aiming for
	/// [`Advance::Done`]. Only parks with a `wake_after` hint are driven past
	/// (after sleeping in-process for the hinted duration). A park without a
	/// hint still ends the drive and leaves the action checkpointed for a
	/// later re-drive.
	UntilDone,
}
229
230impl Wallet {
231	/// List the VTXOs currently locked by a specific wallet action.
232	///
233	/// Used by the executor to free reservations when an action fails
234	/// terminally without having transitioned its vtxos through the
235	/// normal Spent/Spendable channels.
236	async fn get_vtxos_locked_by_action(
237		&self,
238		action_id: &WalletActionId,
239	) -> anyhow::Result<Vec<WalletVtxo>> {
240		let all = self.inner.db.get_vtxos_by_state(&[VtxoStateKind::Locked]).await?;
241		Ok(all.into_iter().filter(|v| match &v.state {
242			VtxoState::Locked { holder: Some(crate::vtxo::VtxoLockHolder::Action { id }) } => {
243				id == action_id
244			},
245			_ => false,
246		}).collect())
247	}
248
249	/// Release every vtxo currently locked by the given action,
250	/// returning each one to [`crate::vtxo::VtxoState::Spendable`].
251	///
252	/// Cheap when nothing is held (no-op). Used as the cleanup hook by
253	/// the executor on `Advance::Done` and by manual cancellation via
254	/// [`Self::cancel_wallet_action`].
255	pub async fn release_action_locks(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
256		let vtxos = self.get_vtxos_locked_by_action(action_id).await?;
257		if vtxos.is_empty() {
258			return Ok(());
259		}
260		debug!("releasing {} vtxo lock(s) held by action {}", vtxos.len(), action_id);
261		self.unlock_vtxos(vtxos).await
262	}
263
264	/// Finish a wallet action: release its vtxo locks and remove the
265	/// checkpoint row. Intended for manual cleanup of stuck actions;
266	/// the normal terminal path is `Advance::Done` from `advance`.
267	pub async fn stop_wallet_action(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
268		self.release_action_locks(action_id).await?;
269		self.inner.db.remove_wallet_action_checkpoint(action_id).await?;
270		Ok(())
271	}
272
273	/// Drive a wallet action to its next park or terminal state.
274	///
275	/// Holds a per-action-id in-flight guard so concurrent drives of
276	/// the same action (e.g. the periodic sync racing a user call)
277	/// don't step on each other.
278	pub async fn drive_action<A>(&self, action: A, mode: DriveMode) -> anyhow::Result<()>
279	where
280		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
281	{
282		let guard = match self.inner.lock_manager.try_lock(&action.id()).await {
283			Some(g) => g,
284			None => {
285				trace!("action {} is already being driven, skipping", action.id());
286				return Ok(());
287			},
288		};
289
290		self.drive_action_with_guard(action, mode, guard).await
291	}
292
293	/// Drive an action assuming the caller already holds its per-id
294	/// lock. `lock_guard` MUST be the guard returned by
295	/// `lock_manager.try_lock(&lock_key::<A>(&action.id()))`; it is
296	/// held for RAII and dropped when this function returns.
297	pub(crate) async fn drive_action_with_guard<A>(
298		&self,
299		action: A,
300		mode: DriveMode,
301		_lock_guard: Box<dyn LockGuard>,
302	) -> anyhow::Result<()>
303	where
304		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
305	{
306		self.run_action_loop(action, mode).await
307	}
308
309	/// Run one `advance` step.
310	///
311	/// In debug builds with `BARK_DOUBLE_DRIVE_ACTIONS` set (see
312	/// [`double_drive_actions`]) the step runs twice from the same state and
313	/// [`assert_reentrant`] checks both reach an equivalent checkpoint,
314	/// exercising `advance`'s idempotency contract. Keep the second run; its
315	/// side effects are the ones the persisted state references.
316	async fn advance_step<A>(&self, action: A) -> Result<Advance<A>, AdvanceError>
317	where
318		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
319	{
320		#[cfg(debug_assertions)]
321		if double_drive_actions() {
322			// Two deep `advance` runs back to back can overflow the default
323			// thread stack in debug; the runner bumps `RUST_MIN_STACK`.
324			let snapshot = action.clone();
325			let first = action.advance(self).await;
326			let second = snapshot.advance(self).await;
327			assert_reentrant(&first, &second);
328			return second;
329		}
330
331		action.advance(self).await
332	}
333
334	async fn run_action_loop<A>(&self, mut action: A, mode: DriveMode) -> anyhow::Result<()>
335	where
336		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
337	{
338		// In-memory counter for transient errors. Lives only for this
339		// drive_action call so the backoff curve resets between drives.
340		let mut retries: u32 = 0;
341
342		loop {
343			let id = action.id();
344			// Snapshot for the error path: advance consumes self, and
345			// on_rejection also takes self by value, so we need a
346			// copy around if budget exhausts.
347			let snapshot = action.clone();
348
349			let advance = match self.advance_step(action).await {
350				Ok(advance) => { advance },
351				Err(e) if e.is_server_rejection() => {
352					warn!("action {} got rejected by server: {:#}", id, e);
353					snapshot.on_rejection(self, e).await.inspect_err(|err| {
354						warn!("action {} on_rejection failed, leaving checkpoint for retry: {:#}", id, err);
355					})?
356				}
357				Err(e) => {
358					retries = retries.saturating_add(1);
359					log::error!("Got error {:?} from action {}, retrying", e, id);
360					snapshot.on_retry(self, retries).await.inspect_err(|err| {
361						warn!("action {} on_retry failed, leaving checkpoint for retry: {:#}", id, err);
362					})?
363				},
364			};
365
366			match advance {
367				Advance::Next(next) => {
368					retries = 0;
369					let checkpoint: WalletActionCheckpoint = next.clone().into();
370					self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
371					action = next;
372				},
373				Advance::Park { state, wake_after, error } => {
374					let checkpoint: WalletActionCheckpoint = state.clone().into();
375					self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
376					match mode {
377						DriveMode::UntilParkOrDone => {
378							return match error {
379								Some(error) => Err(error.into()),
380								None => Ok(()),
381							};
382						},
383						DriveMode::UntilDone => {
384							if let Some(delay) = wake_after {
385								debug!("action {} parked; sleeping {:?} before re-drive", id, delay);
386								tokio::time::sleep(delay).await;
387								action = state;
388							} else {
389								return Ok(());
390							}
391						},
392					}
393				},
394				Advance::Done => {
395					if let Err(e) = self.stop_wallet_action(&id).await {
396						warn!("action {} done but couldn't cancel: {:#}", id, e);
397					}
398					return Ok(());
399				},
400				Advance::Failed(e) => {
401					if let Err(e) = self.stop_wallet_action(&id).await {
402						warn!("action {} failed but couldn't cancel: {:#}", id, e);
403					}
404					return Err(e);
405				},
406			}
407		}
408	}
409}