1use std::time::Duration;
11
12use log::{debug, trace, warn};
13use server_rpc::StatusExt;
14
15use crate::vtxo::{VtxoState, VtxoStateKind};
16use crate::{Wallet, WalletVtxo};
17use crate::lock_manager::LockGuard;
18
19pub(crate) const BASE_RETRY_BACKOFF: Duration = Duration::from_secs(1);
20
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27pub enum WalletActionCheckpoint {
28 Dummy { id: String },
29}
30
31impl WalletActionCheckpoint {
32 pub fn id(&self) -> WalletActionId {
33 match self {
34 WalletActionCheckpoint::Dummy { id } => id.clone(),
35 }
36 }
37}
38
39pub type WalletActionId = String;
45
46pub enum Advance<A> {
51 Next(A),
54 Park {
64 state: A,
65 wake_after: Option<Duration>,
66 error: Option<AdvanceError>,
67 },
68 Done,
72 Failed(anyhow::Error),
77}
78
79#[derive(Debug, thiserror::Error)]
80pub enum AdvanceError {
81 #[error("An error occurred while communicating with the server: {0}")]
82 Server(tonic::Status),
83 #[error("An error occurred while processing the action: {0}")]
84 Other(#[from] anyhow::Error),
85}
86
87impl AdvanceError {
88 pub fn is_server_rejection(&self) -> bool {
89 match self {
90 AdvanceError::Server(err) => err.is_rejection(),
91 _ => false,
92 }
93 }
94}
95
96pub fn lock_key<A: WalletAction>(id: &WalletActionId) -> String {
97 format!("{}.{}", A::namespace(), id)
98}
99
100pub fn park_with_backoff<A: WalletAction>(state: A, attempts: u32) -> Advance<A> {
101 let delay = attempts.pow(2) * BASE_RETRY_BACKOFF;
102 debug!("action {} retrying; sleeping {:?} before re-drive", state.id(), delay);
103 Advance::Park { state, wake_after: Some(delay), error: None }
104}
105
106#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
123#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
124pub trait WalletAction: Sized + Send + Sync {
125 fn namespace() -> &'static str;
126 fn id(&self) -> WalletActionId;
127
128 async fn advance(self, wallet: &Wallet) -> Result<Advance<Self>, AdvanceError>;
129
130 async fn on_retry(self, _wallet: &Wallet, attempts: u32) -> anyhow::Result<Advance<Self>> {
131 Ok(park_with_backoff(self, attempts))
132 }
133
134 async fn on_rejection(self, _wallet: &Wallet, _error: AdvanceError)
135 -> anyhow::Result<Advance<Self>>;
136}
137
138#[derive(Debug, Clone, Copy, PartialEq, Eq)]
140pub enum DriveMode {
141 UntilParkOrDone,
143 UntilDone,
146}
147
148impl Wallet {
149 async fn get_vtxos_locked_by_action(
155 &self,
156 action_id: &WalletActionId,
157 ) -> anyhow::Result<Vec<WalletVtxo>> {
158 let all = self.inner.db.get_vtxos_by_state(&[VtxoStateKind::Locked]).await?;
159 Ok(all.into_iter().filter(|v| match &v.state {
160 VtxoState::Locked { holder: Some(crate::vtxo::VtxoLockHolder::Action { id }) } => {
161 id == action_id
162 },
163 _ => false,
164 }).collect())
165 }
166
167 pub async fn release_action_locks(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
174 let vtxos = self.get_vtxos_locked_by_action(action_id).await?;
175 if vtxos.is_empty() {
176 return Ok(());
177 }
178 debug!("releasing {} vtxo lock(s) held by action {}", vtxos.len(), action_id);
179 self.unlock_vtxos(vtxos).await
180 }
181
182 pub async fn stop_wallet_action(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
186 self.release_action_locks(action_id).await?;
187 self.inner.db.remove_wallet_action_checkpoint(action_id).await?;
188 Ok(())
189 }
190
191 pub async fn drive_action<A>(&self, action: A, mode: DriveMode) -> anyhow::Result<()>
197 where
198 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
199 {
200 let guard = match self.inner.lock_manager.try_lock(&lock_key::<A>(&action.id())).await {
201 Some(g) => g,
202 None => {
203 trace!("action {} in namespace {} is already being driven, skipping", action.id(), A::namespace());
204 return Ok(());
205 },
206 };
207
208 self.drive_action_with_guard(action, mode, guard).await
209 }
210
211 pub(crate) async fn drive_action_with_guard<A>(
216 &self,
217 action: A,
218 mode: DriveMode,
219 _lock_guard: Box<dyn LockGuard>,
220 ) -> anyhow::Result<()>
221 where
222 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
223 {
224 self.run_action_loop(action, mode).await
225 }
226
227 async fn run_action_loop<A>(&self, mut action: A, mode: DriveMode) -> anyhow::Result<()>
228 where
229 A: WalletAction + Into<WalletActionCheckpoint> + Clone,
230 {
231 let mut retries: u32 = 0;
234
235 loop {
236 let id = action.id();
237 let snapshot = action.clone();
241
242 let advance = match action.advance(self).await {
243 Ok(advance) => { advance },
244 Err(e) if e.is_server_rejection() => {
245 warn!("action {} got rejected by server: {:#}", id, e);
246 snapshot.on_rejection(self, e).await.inspect_err(|err| {
247 warn!("action {} on_rejection failed, leaving checkpoint for retry: {:#}", id, err);
248 })?
249 }
250 Err(e) => {
251 retries = retries.saturating_add(1);
252 log::error!("Got error {:?} from action {}, retrying", e, id);
253 snapshot.on_retry(self, retries).await.inspect_err(|err| {
254 warn!("action {} on_retry failed, leaving checkpoint for retry: {:#}", id, err);
255 })?
256 },
257 };
258
259 match advance {
260 Advance::Next(next) => {
261 retries = 0;
262 let checkpoint: WalletActionCheckpoint = next.clone().into();
263 self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
264 action = next;
265 },
266 Advance::Park { state, wake_after, error } => {
267 let checkpoint: WalletActionCheckpoint = state.clone().into();
268 self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
269 match mode {
270 DriveMode::UntilParkOrDone => {
271 return match error {
272 Some(error) => Err(error.into()),
273 None => Ok(()),
274 };
275 },
276 DriveMode::UntilDone => {
277 if let Some(delay) = wake_after {
278 debug!("action {} parked; sleeping {:?} before re-drive", id, delay);
279 tokio::time::sleep(delay).await;
280 action = state;
281 } else {
282 return Ok(());
283 }
284 },
285 }
286 },
287 Advance::Done => {
288 if let Err(e) = self.stop_wallet_action(&id).await {
289 warn!("action {} done but couldn't cancel: {:#}", id, e);
290 }
291 return Ok(());
292 },
293 Advance::Failed(e) => {
294 if let Err(e) = self.stop_wallet_action(&id).await {
295 warn!("action {} failed but couldn't cancel: {:#}", id, e);
296 }
297 return Err(e);
298 },
299 }
300 }
301 }
302}