Skip to main content

bark/actions/
mod.rs

1//! Wallet action infrastructure.
2//!
3//! A *wallet action* is a multi-step operation that moves vtxos (e.g. a
4//! lightning send). Each step is small, persists its outcome to a
5//! checkpoint, and is safe to re-drive after a crash.
6//!
7//! This module defines the generic vocabulary; per-kind machinery (state
8//! machines, transition functions) lives in submodules.
9
10use std::time::Duration;
11
12use log::{debug, trace, warn};
13use server_rpc::StatusExt;
14
15use crate::vtxo::{VtxoState, VtxoStateKind};
16use crate::{Wallet, WalletVtxo};
17use crate::lock_manager::LockGuard;
18
/// Base unit for retry backoff delays.
///
/// [`park_with_backoff`] multiplies this by the square of the attempt
/// count to obtain the `wake_after` hint it attaches to the park.
pub(crate) const BASE_RETRY_BACKOFF: Duration = Duration::from_secs(1);
20
/// Tagged union of every kind of checkpoint the wallet persists.
///
/// Used as the serialization boundary for the
/// `bark_wallet_action_checkpoint` table; per-kind logic lives on each
/// variant's payload type.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum WalletActionCheckpoint {
	/// Checkpoint that carries nothing but the action id.
	// NOTE(review): presumably a placeholder until real action kinds are
	// added — confirm.
	Dummy { id: String },
}
30
31impl WalletActionCheckpoint {
32	pub fn id(&self) -> WalletActionId {
33		match self {
34			WalletActionCheckpoint::Dummy { id } => id.clone(),
35		}
36	}
37}
38
/// Stable identifier for a wallet action.
///
/// The id must be derivable from the action's identity (e.g. the payment
/// hash for a lightning send) so that restarting the same action picks
/// up the same checkpoint row.
///
/// This is a plain alias for `String`, so the compiler does not
/// distinguish it from any other string.
pub type WalletActionId = String;
45
/// Outcome of one `WalletAction::advance` call.
///
/// The executor uses these to decide whether to persist, loop, schedule
/// a wake-up or remove the checkpoint.
pub enum Advance<A> {
	/// Transition to a new state. The executor persists the new state
	/// and calls `advance` on it.
	Next(A),
	/// Pause until something external (notification, periodic sync) or
	/// `wake_after` (when set) re-drives the action. Executor persists
	/// `state` and returns.
	///
	/// `wake_after` is a hint, not a guarantee: it lives only in this
	/// process and is lost across restarts. `advance` MUST tolerate
	/// being called before the hint has elapsed.
	///
	/// `error` is the error that caused the park, if any.
	Park {
		/// State to persist while the action is parked.
		state: A,
		/// Optional delay after which the action should be re-driven.
		wake_after: Option<Duration>,
		/// Error that caused the park, if any.
		error: Option<AdvanceError>,
	},
	/// Terminal: executor removes the checkpoint row. Any permanent fact
	/// the action wants to retain (e.g. an "invoice paid" record) must
	/// be written to its own table before returning `Done`.
	Done,
	/// Terminal: executor removes the checkpoint row because of a fatal
	/// error. This variant should only be returned when no server-side
	/// change has occurred yet, or when the action has verified that the
	/// server is in the expected state and it is safe to remove the
	/// checkpoint.
	Failed(anyhow::Error),
}
78
/// Error returned by [`WalletAction::advance`].
///
/// The executor checks [`AdvanceError::is_server_rejection`] to decide
/// whether to call `on_rejection` or `on_retry` on the action.
#[derive(Debug, thiserror::Error)]
pub enum AdvanceError {
	/// A `tonic::Status` returned while talking to the server.
	#[error("An error occurred while communicating with the server: {0}")]
	Server(tonic::Status),
	/// Any other failure; convertible from `anyhow::Error` (e.g. via `?`).
	#[error("An error occurred while processing the action: {0}")]
	Other(#[from] anyhow::Error),
}
86
87impl AdvanceError {
88	pub fn is_server_rejection(&self) -> bool {
89		match self {
90			AdvanceError::Server(err) => err.is_rejection(),
91			_ => false,
92		}
93	}
94}
95
96pub fn lock_key<A: WalletAction>(id: &WalletActionId) -> String {
97	format!("{}.{}", A::namespace(), id)
98}
99
100pub fn park_with_backoff<A: WalletAction>(state: A, attempts: u32) -> Advance<A> {
101	let delay = attempts.pow(2) * BASE_RETRY_BACKOFF;
102	debug!("action {} retrying; sleeping {:?} before re-drive", state.id(), delay);
103	Advance::Park { state, wake_after: Some(delay), error: None }
104}
105
/// A wallet action that can be driven step-by-step.
///
/// Implementors define the per-kind state machine; the executor owns the
/// loop, persistence, retry tracking and wake scheduling.
///
/// # Invariants
///
/// - `advance` MUST be re-entrant: it may be called more than once for
///   the same logical step (after a crash, after an early wake, after a
///   notification arrives). All side effects it triggers must therefore
///   be idempotent.
/// - The `id` returned MUST be stable across calls on the same logical
///   action (different states of the same action share an id).
/// - `on_rejection` MUST be re-entrant for the same reason as
///   `advance`: it may run partially, crash, and be re-driven against
///   the state the action subsequently lands in.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait WalletAction: Sized + Send + Sync {
	/// Namespace of this action kind; `lock_key` prefixes the action id
	/// with it to build the per-action lock key.
	fn namespace() -> &'static str;
	/// Identifier of the logical action this state belongs to.
	fn id(&self) -> WalletActionId;

	/// Performs one step of the action, consuming the current state.
	///
	/// On `Err`, the executor calls `on_rejection` when the error is a
	/// server rejection and `on_retry` otherwise, in both cases on a
	/// clone of the state taken before this call.
	async fn advance(self, wallet: &Wallet) -> Result<Advance<Self>, AdvanceError>;

	/// Decides what to do after `advance` failed with an error that is
	/// not a server rejection.
	///
	/// `attempts` is the executor's in-memory failure counter for the
	/// current drive; it is incremented before this call and reset when
	/// the action makes an [`Advance::Next`] transition. The default
	/// implementation parks the action via [`park_with_backoff`].
	///
	/// Returning `Err` aborts the drive and leaves the checkpoint as is.
	async fn on_retry(self, _wallet: &Wallet, attempts: u32) -> anyhow::Result<Advance<Self>> {
		Ok(park_with_backoff(self, attempts))
	}

	/// Decides what to do after `advance` failed with a server rejection;
	/// `_error` is that rejection.
	///
	/// Returning `Err` aborts the drive and leaves the checkpoint as is.
	async fn on_rejection(self, _wallet: &Wallet, _error: AdvanceError)
		-> anyhow::Result<Advance<Self>>;
}
137
/// How aggressively the executor should drive an action.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriveMode {
	/// Drive until the action parks or reaches a terminal outcome
	/// ([`Advance::Done`] or [`Advance::Failed`]), then return.
	UntilParkOrDone,
	/// Drive past parks that carry a `wake_after` hint, sleeping for the
	/// hinted delay between iterations, until the action reaches a
	/// terminal outcome ([`Advance::Done`] or [`Advance::Failed`]).
	///
	/// A park without a `wake_after` hint still ends the drive, because
	/// the executor has nothing to wait on.
	UntilDone,
}
147
148impl Wallet {
149	/// List the VTXOs currently locked by a specific wallet action.
150	///
151	/// Used by the executor to free reservations when an action fails
152	/// terminally without having transitioned its vtxos through the
153	/// normal Spent/Spendable channels.
154	async fn get_vtxos_locked_by_action(
155		&self,
156		action_id: &WalletActionId,
157	) -> anyhow::Result<Vec<WalletVtxo>> {
158		let all = self.inner.db.get_vtxos_by_state(&[VtxoStateKind::Locked]).await?;
159		Ok(all.into_iter().filter(|v| match &v.state {
160			VtxoState::Locked { holder: Some(crate::vtxo::VtxoLockHolder::Action { id }) } => {
161				id == action_id
162			},
163			_ => false,
164		}).collect())
165	}
166
167	/// Release every vtxo currently locked by the given action,
168	/// returning each one to [`crate::vtxo::VtxoState::Spendable`].
169	///
170	/// Cheap when nothing is held (no-op). Used as the cleanup hook by
171	/// the executor on `Advance::Done` and by manual cancellation via
172	/// [`Self::cancel_wallet_action`].
173	pub async fn release_action_locks(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
174		let vtxos = self.get_vtxos_locked_by_action(action_id).await?;
175		if vtxos.is_empty() {
176			return Ok(());
177		}
178		debug!("releasing {} vtxo lock(s) held by action {}", vtxos.len(), action_id);
179		self.unlock_vtxos(vtxos).await
180	}
181
182	/// Finish a wallet action: release its vtxo locks and remove the
183	/// checkpoint row. Intended for manual cleanup of stuck actions;
184	/// the normal terminal path is `Advance::Done` from `advance`.
185	pub async fn stop_wallet_action(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
186		self.release_action_locks(action_id).await?;
187		self.inner.db.remove_wallet_action_checkpoint(action_id).await?;
188		Ok(())
189	}
190
191	/// Drive a wallet action to its next park or terminal state.
192	///
193	/// Holds a per-action-id in-flight guard so concurrent drives of
194	/// the same action (e.g. the periodic sync racing a user call)
195	/// don't step on each other.
196	pub async fn drive_action<A>(&self, action: A, mode: DriveMode) -> anyhow::Result<()>
197	where
198		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
199	{
200		let guard = match self.inner.lock_manager.try_lock(&lock_key::<A>(&action.id())).await {
201			Some(g) => g,
202			None => {
203				trace!("action {} in namespace {} is already being driven, skipping", action.id(), A::namespace());
204				return Ok(());
205			},
206		};
207
208		self.drive_action_with_guard(action, mode, guard).await
209	}
210
211	/// Drive an action assuming the caller already holds its per-id
212	/// lock. `lock_guard` MUST be the guard returned by
213	/// `lock_manager.try_lock(&lock_key::<A>(&action.id()))`; it is
214	/// held for RAII and dropped when this function returns.
215	pub(crate) async fn drive_action_with_guard<A>(
216		&self,
217		action: A,
218		mode: DriveMode,
219		_lock_guard: Box<dyn LockGuard>,
220	) -> anyhow::Result<()>
221	where
222		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
223	{
224		self.run_action_loop(action, mode).await
225	}
226
227	async fn run_action_loop<A>(&self, mut action: A, mode: DriveMode) -> anyhow::Result<()>
228	where
229		A: WalletAction + Into<WalletActionCheckpoint> + Clone,
230	{
231		// In-memory counter for transient errors. Lives only for this
232		// drive_action call so the backoff curve resets between drives.
233		let mut retries: u32 = 0;
234
235		loop {
236			let id = action.id();
237			// Snapshot for the error path: advance consumes self, and
238			// on_rejection also takes self by value, so we need a
239			// copy around if budget exhausts.
240			let snapshot = action.clone();
241
242			let advance = match action.advance(self).await {
243				Ok(advance) => { advance },
244				Err(e) if e.is_server_rejection() => {
245					warn!("action {} got rejected by server: {:#}", id, e);
246					snapshot.on_rejection(self, e).await.inspect_err(|err| {
247						warn!("action {} on_rejection failed, leaving checkpoint for retry: {:#}", id, err);
248					})?
249				}
250				Err(e) => {
251					retries = retries.saturating_add(1);
252					log::error!("Got error {:?} from action {}, retrying", e, id);
253					snapshot.on_retry(self, retries).await.inspect_err(|err| {
254						warn!("action {} on_retry failed, leaving checkpoint for retry: {:#}", id, err);
255					})?
256				},
257			};
258
259			match advance {
260				Advance::Next(next) => {
261					retries = 0;
262					let checkpoint: WalletActionCheckpoint = next.clone().into();
263					self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
264					action = next;
265				},
266				Advance::Park { state, wake_after, error } => {
267					let checkpoint: WalletActionCheckpoint = state.clone().into();
268					self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
269					match mode {
270						DriveMode::UntilParkOrDone => {
271							return match error {
272								Some(error) => Err(error.into()),
273								None => Ok(()),
274							};
275						},
276						DriveMode::UntilDone => {
277							if let Some(delay) = wake_after {
278								debug!("action {} parked; sleeping {:?} before re-drive", id, delay);
279								tokio::time::sleep(delay).await;
280								action = state;
281							} else {
282								return Ok(());
283							}
284						},
285					}
286				},
287				Advance::Done => {
288					if let Err(e) = self.stop_wallet_action(&id).await {
289						warn!("action {} done but couldn't cancel: {:#}", id, e);
290					}
291					return Ok(());
292				},
293				Advance::Failed(e) => {
294					if let Err(e) = self.stop_wallet_action(&id).await {
295						warn!("action {} failed but couldn't cancel: {:#}", id, e);
296					}
297					return Err(e);
298				},
299			}
300		}
301	}
302}