Skip to main content

bark/actions/
mod.rs

1//! Wallet action infrastructure.
2//!
3//! A *wallet action* is a multi-step operation that moves vtxos (e.g. a
4//! lightning send). Each step is small, persists its outcome to a
5//! checkpoint, and is safe to re-drive after a crash.
6//!
7//! This module defines the generic vocabulary; per-kind machinery (state
8//! machines, transition functions) lives in submodules.
9
10pub mod lightning;
11
12use std::time::Duration;
13
14use log::{debug, trace, warn};
15use server_rpc::StatusExt;
16
17use crate::{Wallet, WalletVtxo};
18use crate::actions::lightning::pay::LightningSend;
19use crate::lock_manager::LockGuard;
20use crate::vtxo::{VtxoState, VtxoStateKind};
21
/// Base unit of the retry backoff: [`park_with_backoff`] scales this by the
/// square of the attempt count to obtain the wake-up hint.
pub(crate) const BASE_RETRY_BACKOFF: Duration = Duration::from_secs(1);
23
24/// Tagged union of every kind of checkpoint the wallet persists.
25///
26/// Used as the serialization boundary for the
27/// `bark_wallet_action_checkpoint` table; per-kind logic lives on each
28/// variant's payload type.
29#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub enum WalletActionCheckpoint {
31	LightningSend(LightningSend),
32}
33
34impl WalletActionCheckpoint {
35	pub fn id(&self) -> WalletActionId {
36		match self {
37			WalletActionCheckpoint::LightningSend(s) => s.id(),
38		}
39	}
40
41	pub fn as_lightning_send(&self) -> Option<&LightningSend> {
42		match self {
43			WalletActionCheckpoint::LightningSend(s) => Some(s),
44		}
45	}
46
47	pub fn into_lightning_send(self) -> Option<LightningSend> {
48		match self {
49			WalletActionCheckpoint::LightningSend(s) => Some(s),
50		}
51	}
52}
53
54impl From<LightningSend> for WalletActionCheckpoint {
55	fn from(s: LightningSend) -> Self {
56		WalletActionCheckpoint::LightningSend(s)
57	}
58}
59
/// Stable identifier of a wallet action.
///
/// The id has to be derivable from what the action *is* (for a lightning
/// send, e.g., the payment hash), so that starting the same action again
/// finds the checkpoint row written by the previous attempt.
pub type WalletActionId = String;
66
67/// Outcome of one `WalletAction::advance` call.
68///
69/// The executor uses these to decide whether to persist, loop, schedule
70/// a wake-up or remove the checkpoint.
71pub enum Advance<A> {
72	/// Transition to a new state. Executor persists `state` and calls
73	/// `advance` on it.
74	Next(A),
75	/// Pause until something external (notification, periodic sync) or
76	/// `wake_after` (when set) re-drives the action. Executor persists
77	/// `state` and returns.
78	///
79	/// `wake_after` is a hint, not a guarantee: it lives only in this
80	/// process and is lost across restarts. `advance` MUST tolerate
81	/// being called before the hint has elapsed.
82	///
83	/// `error` is the error that caused the park, if any.
84	Park {
85		state: A,
86		wake_after: Option<Duration>,
87		error: Option<AdvanceError>,
88	},
89	/// Terminal: executor removes the checkpoint row. Any permanent fact
90	/// the action wants to retain (e.g. an "invoice paid" record) must
91	/// be written to its own table before returning `Done`.
92	Done,
93	/// Terminal: executor removes the checkpoint row because of a fatal error.
94	/// This advance should only be returned when no server change occured yet
95	/// or when process has checked server status is expected one and it is
96	/// safe to remove checkpoint
97	Failed(anyhow::Error),
98}
99
100#[derive(Debug, thiserror::Error)]
101pub enum AdvanceError {
102	#[error("An error occurred while communicating with the server: {0}")]
103	Server(tonic::Status),
104	#[error("An error occurred while processing the action: {0}")]
105	Other(#[from] anyhow::Error),
106}
107
108impl AdvanceError {
109	pub fn is_server_rejection(&self) -> bool {
110		match self {
111			AdvanceError::Server(err) => err.is_rejection(),
112			_ => false,
113		}
114	}
115}
116
117pub fn park_with_backoff<A: WalletAction>(state: A, attempts: u32) -> Advance<A> {
118	let delay = attempts.pow(2) * BASE_RETRY_BACKOFF;
119	debug!("action {} retrying; sleeping {:?} before re-drive", state.id(), delay);
120	Advance::Park { state, wake_after: Some(delay), error: None }
121}
122
123/// A wallet action that can be driven step-by-step.
124///
125/// Implementors define the per-kind state machine; the executor owns the
126/// loop, persistence, retry tracking and wake scheduling.
127///
128/// # Invariants
129///
130/// - `advance` MUST be re-entrant: it may be called more than once for
131///   the same logical step (after a crash, after an early wake, after a
132///   notification arrives). All side effects it triggers must therefore
133///   be idempotent.
134/// - The `id` returned MUST be stable across calls on the same logical
135///   action (different states of the same action share an id).
136/// - `on_rejection` MUST be re-entrant for the same reason as
137///   `advance`: it may run partially, crash, and be re-driven against
138///   the state the action subsequently lands in.
139#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
140#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
141pub trait WalletAction: Sized + Send + Sync {
142	fn id(&self) -> WalletActionId;
143
144	async fn advance(self, wallet: &Wallet) -> Result<Advance<Self>, AdvanceError>;
145
146	async fn on_retry(self, _wallet: &Wallet, attempts: u32) -> anyhow::Result<Advance<Self>> {
147		Ok(park_with_backoff(self, attempts))
148	}
149
150	async fn on_rejection(self, _wallet: &Wallet, _error: AdvanceError)
151		-> anyhow::Result<Advance<Self>>;
152}
153
/// Controls how far a single drive call pushes an action.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriveMode {
	/// Stop and return as soon as the action parks or completes.
	UntilParkOrDone,
	/// Keep going across parks, sleeping between iterations, until the
	/// action returns [`Advance::Done`].
	UntilDone,
}
163
164impl Wallet {
165	/// List the VTXOs currently locked by a specific wallet action.
166	///
167	/// Used by the executor to free reservations when an action fails
168	/// terminally without having transitioned its vtxos through the
169	/// normal Spent/Spendable channels.
170	async fn get_vtxos_locked_by_action(
171		&self,
172		action_id: &WalletActionId,
173	) -> anyhow::Result<Vec<WalletVtxo>> {
174		let all = self.inner.db.get_vtxos_by_state(&[VtxoStateKind::Locked]).await?;
175		Ok(all.into_iter().filter(|v| match &v.state {
176			VtxoState::Locked { holder: Some(crate::vtxo::VtxoLockHolder::Action { id }) } => {
177				id == action_id
178			},
179			_ => false,
180		}).collect())
181	}
182
183	/// Release every vtxo currently locked by the given action,
184	/// returning each one to [`crate::vtxo::VtxoState::Spendable`].
185	///
186	/// Cheap when nothing is held (no-op). Used as the cleanup hook by
187	/// the executor on `Advance::Done` and by manual cancellation via
188	/// [`Self::cancel_wallet_action`].
189	pub async fn release_action_locks(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
190		let vtxos = self.get_vtxos_locked_by_action(action_id).await?;
191		if vtxos.is_empty() {
192			return Ok(());
193		}
194		debug!("releasing {} vtxo lock(s) held by action {}", vtxos.len(), action_id);
195		self.unlock_vtxos(vtxos).await
196	}
197
198	/// Finish a wallet action: release its vtxo locks and remove the
199	/// checkpoint row. Intended for manual cleanup of stuck actions;
200	/// the normal terminal path is `Advance::Done` from `advance`.
201	pub async fn stop_wallet_action(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
202		self.release_action_locks(action_id).await?;
203		self.inner.db.remove_wallet_action_checkpoint(action_id).await?;
204		Ok(())
205	}
206
207	/// Drive a wallet action to its next park or terminal state.
208	///
209	/// Holds a per-action-id in-flight guard so concurrent drives of
210	/// the same action (e.g. the periodic sync racing a user call)
211	/// don't step on each other.
212	pub async fn drive_action<A>(&self, action: A, mode: DriveMode) -> anyhow::Result<()>
213	where
214		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
215	{
216		let guard = match self.inner.lock_manager.try_lock(&action.id()).await {
217			Some(g) => g,
218			None => {
219				trace!("action {} is already being driven, skipping", action.id());
220				return Ok(());
221			},
222		};
223
224		self.drive_action_with_guard(action, mode, guard).await
225	}
226
227	/// Drive an action assuming the caller already holds its per-id
228	/// lock. `lock_guard` MUST be the guard returned by
229	/// `lock_manager.try_lock(&lock_key::<A>(&action.id()))`; it is
230	/// held for RAII and dropped when this function returns.
231	pub(crate) async fn drive_action_with_guard<A>(
232		&self,
233		action: A,
234		mode: DriveMode,
235		_lock_guard: Box<dyn LockGuard>,
236	) -> anyhow::Result<()>
237	where
238		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
239	{
240		self.run_action_loop(action, mode).await
241	}
242
243	async fn run_action_loop<A>(&self, mut action: A, mode: DriveMode) -> anyhow::Result<()>
244	where
245		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
246	{
247		// In-memory counter for transient errors. Lives only for this
248		// drive_action call so the backoff curve resets between drives.
249		let mut retries: u32 = 0;
250
251		loop {
252			let id = action.id();
253			// Snapshot for the error path: advance consumes self, and
254			// on_rejection also takes self by value, so we need a
255			// copy around if budget exhausts.
256			let snapshot = action.clone();
257
258			let advance = match action.advance(self).await {
259				Ok(advance) => { advance },
260				Err(e) if e.is_server_rejection() => {
261					warn!("action {} got rejected by server: {:#}", id, e);
262					snapshot.on_rejection(self, e).await.inspect_err(|err| {
263						warn!("action {} on_rejection failed, leaving checkpoint for retry: {:#}", id, err);
264					})?
265				}
266				Err(e) => {
267					retries = retries.saturating_add(1);
268					log::error!("Got error {:?} from action {}, retrying", e, id);
269					snapshot.on_retry(self, retries).await.inspect_err(|err| {
270						warn!("action {} on_retry failed, leaving checkpoint for retry: {:#}", id, err);
271					})?
272				},
273			};
274
275			match advance {
276				Advance::Next(next) => {
277					retries = 0;
278					let checkpoint: WalletActionCheckpoint = next.clone().into();
279					self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
280					action = next;
281				},
282				Advance::Park { state, wake_after, error } => {
283					let checkpoint: WalletActionCheckpoint = state.clone().into();
284					self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
285					match mode {
286						DriveMode::UntilParkOrDone => {
287							return match error {
288								Some(error) => Err(error.into()),
289								None => Ok(()),
290							};
291						},
292						DriveMode::UntilDone => {
293							if let Some(delay) = wake_after {
294								debug!("action {} parked; sleeping {:?} before re-drive", id, delay);
295								tokio::time::sleep(delay).await;
296								action = state;
297							} else {
298								return Ok(());
299							}
300						},
301					}
302				},
303				Advance::Done => {
304					if let Err(e) = self.stop_wallet_action(&id).await {
305						warn!("action {} done but couldn't cancel: {:#}", id, e);
306					}
307					return Ok(());
308				},
309				Advance::Failed(e) => {
310					if let Err(e) = self.stop_wallet_action(&id).await {
311						warn!("action {} failed but couldn't cancel: {:#}", id, e);
312					}
313					return Err(e);
314				},
315			}
316		}
317	}
318}