Skip to main content

bark/actions/
mod.rs

1//! Wallet action infrastructure.
2//!
3//! A *wallet action* is a multi-step operation that moves vtxos (e.g. a
4//! lightning send). Each step is small, persists its outcome to a
5//! checkpoint, and is safe to re-drive after a crash.
6//!
7//! This module defines the generic vocabulary; per-kind machinery (state
8//! machines, transition functions) lives in submodules.
9
10pub mod lightning;
11
12use std::time::Duration;
13
14use log::{debug, trace, warn};
15use server_rpc::StatusExt;
16
17use crate::{Wallet, WalletVtxo};
18use crate::actions::lightning::pay::LightningSend;
19use crate::lock_manager::LockGuard;
20use crate::vtxo::{VtxoState, VtxoStateKind};
21
/// Base unit of the quadratic retry backoff used by [`park_with_backoff`]:
/// the `n`-th consecutive retry waits `n² × BASE_RETRY_BACKOFF`.
pub(crate) const BASE_RETRY_BACKOFF: Duration = Duration::from_millis(1_000);
23
24/// Tagged union of every kind of checkpoint the wallet persists.
25///
26/// Used as the serialization boundary for the
27/// `bark_wallet_action_checkpoint` table; per-kind logic lives on each
28/// variant's payload type.
29#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub enum WalletActionCheckpoint {
31	LightningSend(LightningSend),
32}
33
34impl WalletActionCheckpoint {
35	pub fn id(&self) -> WalletActionId {
36		match self {
37			WalletActionCheckpoint::LightningSend(s) => s.id(),
38		}
39	}
40
41	pub fn as_lightning_send(&self) -> Option<&LightningSend> {
42		match self {
43			WalletActionCheckpoint::LightningSend(s) => Some(s),
44		}
45	}
46
47	pub fn into_lightning_send(self) -> Option<LightningSend> {
48		match self {
49			WalletActionCheckpoint::LightningSend(s) => Some(s),
50		}
51	}
52}
53
54impl From<LightningSend> for WalletActionCheckpoint {
55	fn from(s: LightningSend) -> Self {
56		WalletActionCheckpoint::LightningSend(s)
57	}
58}
59
/// Stable identifier of a wallet action.
///
/// It keys the action's checkpoint row, so it must be derived from what
/// makes the action unique (for a lightning send, its payment hash).
/// Restarting the same action then resolves to the same checkpoint row
/// instead of creating a new one.
pub type WalletActionId = String;
66
67/// Outcome of one `WalletAction::advance` call.
68///
69/// The executor uses these to decide whether to persist, loop, schedule
70/// a wake-up or remove the checkpoint.
71pub enum Advance<A> {
72	/// Transition to a new state. Executor persists `state` and calls
73	/// `advance` on it.
74	Next(A),
75	/// Pause until something external (notification, periodic sync) or
76	/// `wake_after` (when set) re-drives the action. Executor persists
77	/// `state` and returns.
78	///
79	/// `wake_after` is a hint, not a guarantee: it lives only in this
80	/// process and is lost across restarts. `advance` MUST tolerate
81	/// being called before the hint has elapsed.
82	///
83	/// `error` is the error that caused the park, if any.
84	Park {
85		state: A,
86		wake_after: Option<Duration>,
87		error: Option<AdvanceError>,
88	},
89	/// Terminal: executor removes the checkpoint row. Any permanent fact
90	/// the action wants to retain (e.g. an "invoice paid" record) must
91	/// be written to its own table before returning `Done`.
92	Done,
93	/// Terminal: executor removes the checkpoint row because of a fatal error.
94	/// This advance should only be returned when no server change occured yet
95	/// or when process has checked server status is expected one and it is
96	/// safe to remove checkpoint
97	Failed(anyhow::Error),
98}
99
100#[derive(Debug, thiserror::Error)]
101pub enum AdvanceError {
102	#[error("An error occurred while communicating with the server: {0}")]
103	Server(tonic::Status),
104	#[error("An error occurred while processing the action: {0}")]
105	Other(#[from] anyhow::Error),
106}
107
108impl AdvanceError {
109	pub fn is_server_rejection(&self) -> bool {
110		match self {
111			AdvanceError::Server(err) => err.is_rejection(),
112			_ => false,
113		}
114	}
115}
116
117pub fn park_with_backoff<A: WalletAction>(state: A, attempts: u32) -> Advance<A> {
118	let delay = attempts.pow(2) * BASE_RETRY_BACKOFF;
119	debug!("action {} retrying; sleeping {:?} before re-drive", state.id(), delay);
120	Advance::Park { state, wake_after: Some(delay), error: None }
121}
122
123/// A wallet action that can be driven step-by-step.
124///
125/// Implementors define the per-kind state machine; the executor owns the
126/// loop, persistence, retry tracking and wake scheduling.
127#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
128#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
129pub trait WalletAction: Sized + Send + Sync {
130	/// Get an identifier for this action
131	///
132	/// The `id` returned MUST be stable across calls on the same logical
133	/// action (different states of the same action share an id).
134	fn id(&self) -> WalletActionId;
135
136	/// Called to advance the action state
137	///
138	/// MUST be re-entrant: it may be called more than once for the same logical
139	/// step (after a crash, after an early wake, after a notification arrives).
140	/// All side effects it triggers must therefore be idempotent.
141	async fn advance(self, wallet: &Wallet) -> Result<Advance<Self>, AdvanceError>;
142
143	/// Called when the action should be retried
144	async fn on_retry(self, _wallet: &Wallet, attempts: u32) -> anyhow::Result<Advance<Self>> {
145		Ok(park_with_backoff(self, attempts))
146	}
147
148	/// Called when the server rejected one of our requests
149	///
150	/// MUST be re-entrant for the same reason as [WalletAction::advance]:
151	/// it may run partially, crash, and be re-driven against the state the action
152	/// subsequently lands in.
153	async fn on_rejection(self, _wallet: &Wallet, _error: AdvanceError)
154		-> anyhow::Result<Advance<Self>>;
155}
156
/// How aggressively the executor should drive an action.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriveMode {
	/// Drive until the action parks or reaches a terminal state, then return.
	UntilParkOrDone,
	/// Keep driving through parks that carry a `wake_after` hint, sleeping
	/// for the hinted delay between iterations, until the action reaches a
	/// terminal state ([`Advance::Done`] or [`Advance::Failed`]).
	///
	/// A park *without* a `wake_after` hint cannot be waited out in-process,
	/// so the drive stops there as well.
	UntilDone,
}
166
167impl Wallet {
168	/// List the VTXOs currently locked by a specific wallet action.
169	///
170	/// Used by the executor to free reservations when an action fails
171	/// terminally without having transitioned its vtxos through the
172	/// normal Spent/Spendable channels.
173	async fn get_vtxos_locked_by_action(
174		&self,
175		action_id: &WalletActionId,
176	) -> anyhow::Result<Vec<WalletVtxo>> {
177		let all = self.inner.db.get_vtxos_by_state(&[VtxoStateKind::Locked]).await?;
178		Ok(all.into_iter().filter(|v| match &v.state {
179			VtxoState::Locked { holder: Some(crate::vtxo::VtxoLockHolder::Action { id }) } => {
180				id == action_id
181			},
182			_ => false,
183		}).collect())
184	}
185
186	/// Release every vtxo currently locked by the given action,
187	/// returning each one to [`crate::vtxo::VtxoState::Spendable`].
188	///
189	/// Cheap when nothing is held (no-op). Used as the cleanup hook by
190	/// the executor on `Advance::Done` and by manual cancellation via
191	/// [`Self::cancel_wallet_action`].
192	pub async fn release_action_locks(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
193		let vtxos = self.get_vtxos_locked_by_action(action_id).await?;
194		if vtxos.is_empty() {
195			return Ok(());
196		}
197		debug!("releasing {} vtxo lock(s) held by action {}", vtxos.len(), action_id);
198		self.unlock_vtxos(vtxos).await
199	}
200
201	/// Finish a wallet action: release its vtxo locks and remove the
202	/// checkpoint row. Intended for manual cleanup of stuck actions;
203	/// the normal terminal path is `Advance::Done` from `advance`.
204	pub async fn stop_wallet_action(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
205		self.release_action_locks(action_id).await?;
206		self.inner.db.remove_wallet_action_checkpoint(action_id).await?;
207		Ok(())
208	}
209
210	/// Drive a wallet action to its next park or terminal state.
211	///
212	/// Holds a per-action-id in-flight guard so concurrent drives of
213	/// the same action (e.g. the periodic sync racing a user call)
214	/// don't step on each other.
215	pub async fn drive_action<A>(&self, action: A, mode: DriveMode) -> anyhow::Result<()>
216	where
217		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
218	{
219		let guard = match self.inner.lock_manager.try_lock(&action.id()).await {
220			Some(g) => g,
221			None => {
222				trace!("action {} is already being driven, skipping", action.id());
223				return Ok(());
224			},
225		};
226
227		self.drive_action_with_guard(action, mode, guard).await
228	}
229
230	/// Drive an action assuming the caller already holds its per-id
231	/// lock. `lock_guard` MUST be the guard returned by
232	/// `lock_manager.try_lock(&lock_key::<A>(&action.id()))`; it is
233	/// held for RAII and dropped when this function returns.
234	pub(crate) async fn drive_action_with_guard<A>(
235		&self,
236		action: A,
237		mode: DriveMode,
238		_lock_guard: Box<dyn LockGuard>,
239	) -> anyhow::Result<()>
240	where
241		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
242	{
243		self.run_action_loop(action, mode).await
244	}
245
246	async fn run_action_loop<A>(&self, mut action: A, mode: DriveMode) -> anyhow::Result<()>
247	where
248		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
249	{
250		// In-memory counter for transient errors. Lives only for this
251		// drive_action call so the backoff curve resets between drives.
252		let mut retries: u32 = 0;
253
254		loop {
255			let id = action.id();
256			// Snapshot for the error path: advance consumes self, and
257			// on_rejection also takes self by value, so we need a
258			// copy around if budget exhausts.
259			let snapshot = action.clone();
260
261			let advance = match action.advance(self).await {
262				Ok(advance) => { advance },
263				Err(e) if e.is_server_rejection() => {
264					warn!("action {} got rejected by server: {:#}", id, e);
265					snapshot.on_rejection(self, e).await.inspect_err(|err| {
266						warn!("action {} on_rejection failed, leaving checkpoint for retry: {:#}", id, err);
267					})?
268				}
269				Err(e) => {
270					retries = retries.saturating_add(1);
271					log::error!("Got error {:?} from action {}, retrying", e, id);
272					snapshot.on_retry(self, retries).await.inspect_err(|err| {
273						warn!("action {} on_retry failed, leaving checkpoint for retry: {:#}", id, err);
274					})?
275				},
276			};
277
278			match advance {
279				Advance::Next(next) => {
280					retries = 0;
281					let checkpoint: WalletActionCheckpoint = next.clone().into();
282					self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
283					action = next;
284				},
285				Advance::Park { state, wake_after, error } => {
286					let checkpoint: WalletActionCheckpoint = state.clone().into();
287					self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
288					match mode {
289						DriveMode::UntilParkOrDone => {
290							return match error {
291								Some(error) => Err(error.into()),
292								None => Ok(()),
293							};
294						},
295						DriveMode::UntilDone => {
296							if let Some(delay) = wake_after {
297								debug!("action {} parked; sleeping {:?} before re-drive", id, delay);
298								tokio::time::sleep(delay).await;
299								action = state;
300							} else {
301								return Ok(());
302							}
303						},
304					}
305				},
306				Advance::Done => {
307					if let Err(e) = self.stop_wallet_action(&id).await {
308						warn!("action {} done but couldn't cancel: {:#}", id, e);
309					}
310					return Ok(());
311				},
312				Advance::Failed(e) => {
313					if let Err(e) = self.stop_wallet_action(&id).await {
314						warn!("action {} failed but couldn't cancel: {:#}", id, e);
315					}
316					return Err(e);
317				},
318			}
319		}
320	}
321}