use std::time::Duration;
use log::{debug, trace, warn};
use server_rpc::StatusExt;
use crate::vtxo::{VtxoState, VtxoStateKind};
use crate::{Wallet, WalletVtxo};
use crate::lock_manager::LockGuard;
/// Base unit of the retry backoff: `park_with_backoff` multiplies it by the
/// square of the attempt count to obtain the wake-up delay.
pub(crate) const BASE_RETRY_BACKOFF: Duration = Duration::from_secs(1);
/// Serializable snapshot of a wallet action's state.
///
/// The action driver loop converts the current action state into one of these
/// and stores it through the database, keyed by the action id, whenever the
/// action advances to a next state or parks.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum WalletActionCheckpoint {
/// Checkpoint carrying only the id of the action it belongs to.
/// NOTE(review): the name suggests a placeholder variant — confirm.
Dummy { id: String },
}
impl WalletActionCheckpoint {
pub fn id(&self) -> WalletActionId {
match self {
WalletActionCheckpoint::Dummy { id } => id.clone(),
}
}
}
/// Identifier of a wallet action: a plain `String`, used as the checkpoint key
/// in the database and as the suffix of the lock key built by `lock_key`.
pub type WalletActionId = String;
/// Outcome of a single step of a wallet action, telling the driver loop what
/// to do next. `A` is the action's state type.
pub enum Advance<A> {
/// Continue right away with the given next state. The driver stores it as a
/// checkpoint and resets its retry counter before looping.
Next(A),
/// Pause the action. The driver stores `state` as a checkpoint and then either
/// returns or sleeps, depending on the drive mode and `wake_after`.
Park {
/// State the action resumes from.
state: A,
/// Delay after which the action may be driven again. With `None` the driver
/// returns instead of sleeping.
wake_after: Option<Duration>,
/// Error associated with this park, if any. `DriveMode::UntilParkOrDone`
/// returns it to the caller.
error: Option<AdvanceError>,
},
/// The action completed. The driver releases the action's VTXO locks, removes
/// its checkpoint (failures there are only logged) and returns `Ok(())`.
Done,
/// The action failed for good. The driver performs the same cleanup as for
/// `Done` and then returns the contained error.
Failed(anyhow::Error),
}
/// Error returned by `WalletAction::advance`.
///
/// The driver loop uses `is_server_rejection` to decide whether the action's
/// `on_rejection` or `on_retry` hook handles it.
#[derive(Debug, thiserror::Error)]
pub enum AdvanceError {
/// Communication with the server failed; carries the `tonic::Status`.
/// No `#[from]` here, so this variant must be constructed explicitly.
#[error("An error occurred while communicating with the server: {0}")]
Server(tonic::Status),
/// Any other failure; `#[from]` lets `?` convert an `anyhow::Error` into it.
#[error("An error occurred while processing the action: {0}")]
Other(#[from] anyhow::Error),
}
impl AdvanceError {
pub fn is_server_rejection(&self) -> bool {
match self {
AdvanceError::Server(err) => err.is_rejection(),
_ => false,
}
}
}
/// Builds the lock-manager key for an action: the action type's namespace and
/// the action id, joined by a dot.
pub fn lock_key<A: WalletAction>(id: &WalletActionId) -> String {
    let namespace = A::namespace();
    format!("{namespace}.{id}")
}
/// Parks `state` with a quadratic backoff delay of `attempts² × BASE_RETRY_BACKOFF`.
///
/// The returned `Advance::Park` carries the delay in `wake_after` and no error.
/// With `attempts == 0` the delay is zero.
///
/// The arithmetic saturates instead of overflowing: squaring a large attempt
/// count would otherwise panic in debug builds and wrap around to a tiny delay
/// in release builds.
pub fn park_with_backoff<A: WalletAction>(state: A, attempts: u32) -> Advance<A> {
    let factor = attempts.saturating_pow(2);
    let delay = BASE_RETRY_BACKOFF.saturating_mul(factor);
    debug!("action {} retrying; sleeping {:?} before re-drive", state.id(), delay);
    Advance::Park { state, wake_after: Some(delay), error: None }
}
/// A resumable, multi-step wallet operation driven by `Wallet::drive_action`.
///
/// Each value of the implementing type is one state of the action. The driver
/// repeatedly calls `advance` and reacts to the returned `Advance`.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait WalletAction: Sized + Send + Sync {
/// Namespace of this action type; used as the prefix of the lock key.
fn namespace() -> &'static str;
/// Id of this action; used for locking, checkpoint storage and logging.
fn id(&self) -> WalletActionId;
/// Performs one step, consuming the current state and returning what the
/// driver should do next, or an error to be routed to `on_rejection` /
/// `on_retry`.
async fn advance(self, wallet: &Wallet) -> Result<Advance<Self>, AdvanceError>;
/// Called by the driver with a copy of the pre-step state when `advance`
/// failed with an error that is not a server rejection. `attempts` is the
/// driver's retry counter, which it resets after an `Advance::Next`.
/// The default implementation parks the action via `park_with_backoff`.
async fn on_retry(self, _wallet: &Wallet, attempts: u32) -> anyhow::Result<Advance<Self>> {
Ok(park_with_backoff(self, attempts))
}
/// Called by the driver with a copy of the pre-step state when `advance`
/// failed with a server rejection; receives that error and decides how the
/// action proceeds.
async fn on_rejection(self, _wallet: &Wallet, _error: AdvanceError)
-> anyhow::Result<Advance<Self>>;
}
/// Controls how far the driver loop runs an action before returning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriveMode {
/// Return as soon as the action parks, completes or fails.
UntilParkOrDone,
/// When the action parks with a wake-up delay, sleep for that delay and keep
/// driving. A park without a delay still ends the drive.
UntilDone,
}
impl Wallet {
async fn get_vtxos_locked_by_action(
&self,
action_id: &WalletActionId,
) -> anyhow::Result<Vec<WalletVtxo>> {
let all = self.inner.db.get_vtxos_by_state(&[VtxoStateKind::Locked]).await?;
Ok(all.into_iter().filter(|v| match &v.state {
VtxoState::Locked { holder: Some(crate::vtxo::VtxoLockHolder::Action { id }) } => {
id == action_id
},
_ => false,
}).collect())
}
/// Unlocks every VTXO currently locked by the action `action_id`.
///
/// Does nothing (and logs nothing) when the action holds no locks.
///
/// # Errors
///
/// Propagates errors from looking up the locked VTXOs or from unlocking them.
pub async fn release_action_locks(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
    let held = self.get_vtxos_locked_by_action(action_id).await?;
    match held.len() {
        0 => Ok(()),
        count => {
            debug!("releasing {} vtxo lock(s) held by action {}", count, action_id);
            self.unlock_vtxos(held).await
        },
    }
}
/// Stops tracking the action `action_id`: releases the VTXO locks it holds,
/// then removes its checkpoint from the database.
///
/// # Errors
///
/// Returns the first error encountered. If releasing the locks fails, the
/// checkpoint is left in place.
pub async fn stop_wallet_action(&self, action_id: &WalletActionId) -> anyhow::Result<()> {
    self.release_action_locks(action_id).await?;

    let db = &self.inner.db;
    db.remove_wallet_action_checkpoint(action_id).await?;
    Ok(())
}
/// Drives `action` according to `mode`, unless it is already being driven.
///
/// Tries to take the lock keyed by the action's namespace and id. If the lock
/// is not available the call is a no-op that returns `Ok(())`. Otherwise the
/// action is driven with the acquired guard.
///
/// # Errors
///
/// Propagates any error from driving the action.
pub async fn drive_action<A>(&self, action: A, mode: DriveMode) -> anyhow::Result<()>
where
    A: WalletAction + Into<WalletActionCheckpoint> + Clone,
{
    let key = lock_key::<A>(&action.id());
    let Some(guard) = self.inner.lock_manager.try_lock(&key).await else {
        trace!(
            "action {} in namespace {} is already being driven, skipping",
            action.id(),
            A::namespace()
        );
        return Ok(());
    };

    self.drive_action_with_guard(action, mode, guard).await
}
/// Drives `action` according to `mode` while owning an already-acquired lock
/// guard.
///
/// The guard is not inspected; it is only kept alive until the driver loop has
/// finished, and then dropped.
///
/// # Errors
///
/// Propagates any error from the driver loop.
pub(crate) async fn drive_action_with_guard<A>(
    &self,
    action: A,
    mode: DriveMode,
    _lock_guard: Box<dyn LockGuard>,
) -> anyhow::Result<()>
where
    A: WalletAction + Into<WalletActionCheckpoint> + Clone,
{
    let outcome = self.run_action_loop(action, mode).await;
    // The guard's only job is to outlive the loop above.
    drop(_lock_guard);
    outcome
}
async fn run_action_loop<A>(&self, mut action: A, mode: DriveMode) -> anyhow::Result<()>
where
A: WalletAction + Into<WalletActionCheckpoint> + Clone,
{
let mut retries: u32 = 0;
loop {
let id = action.id();
let snapshot = action.clone();
let advance = match action.advance(self).await {
Ok(advance) => { advance },
Err(e) if e.is_server_rejection() => {
warn!("action {} got rejected by server: {:#}", id, e);
snapshot.on_rejection(self, e).await.inspect_err(|err| {
warn!("action {} on_rejection failed, leaving checkpoint for retry: {:#}", id, err);
})?
}
Err(e) => {
retries = retries.saturating_add(1);
log::error!("Got error {:?} from action {}, retrying", e, id);
snapshot.on_retry(self, retries).await.inspect_err(|err| {
warn!("action {} on_retry failed, leaving checkpoint for retry: {:#}", id, err);
})?
},
};
match advance {
Advance::Next(next) => {
retries = 0;
let checkpoint: WalletActionCheckpoint = next.clone().into();
self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
action = next;
},
Advance::Park { state, wake_after, error } => {
let checkpoint: WalletActionCheckpoint = state.clone().into();
self.inner.db.upsert_wallet_action_checkpoint(&id, &checkpoint).await?;
match mode {
DriveMode::UntilParkOrDone => {
return match error {
Some(error) => Err(error.into()),
None => Ok(()),
};
},
DriveMode::UntilDone => {
if let Some(delay) = wake_after {
debug!("action {} parked; sleeping {:?} before re-drive", id, delay);
tokio::time::sleep(delay).await;
action = state;
} else {
return Ok(());
}
},
}
},
Advance::Done => {
if let Err(e) = self.stop_wallet_action(&id).await {
warn!("action {} done but couldn't cancel: {:#}", id, e);
}
return Ok(());
},
Advance::Failed(e) => {
if let Err(e) = self.stop_wallet_action(&id).await {
warn!("action {} failed but couldn't cancel: {:#}", id, e);
}
return Err(e);
},
}
}
}
}