use std::collections::VecDeque;
use std::sync::Arc;
use cdk_common::database::DynMintDatabase;
use cdk_common::mint::{Operation, Saga, SwapSagaState};
use cdk_common::nuts::BlindedMessage;
use cdk_common::{database, Error, Proofs, ProofsMethods, PublicKey, QuoteId, State};
use tracing::instrument;
use self::compensation::{CompensatingAction, RemoveSwapSetup};
use self::state::{Initial, SetupComplete, Signed};
use crate::mint::subscription::PubSubManager;
use crate::Mint;
pub mod compensation;
mod state;
#[cfg(test)]
mod tests;
pub struct SwapSaga<'a, S> {
mint: &'a super::Mint,
db: DynMintDatabase,
pubsub: Arc<PubSubManager>,
compensations: VecDeque<Box<dyn CompensatingAction>>,
operation_id: uuid::Uuid,
state_data: S,
}
impl<'a> SwapSaga<'a, Initial> {
pub fn new(mint: &'a super::Mint, db: DynMintDatabase, pubsub: Arc<PubSubManager>) -> Self {
let operation_id = uuid::Uuid::new_v4();
Self {
mint,
db,
pubsub,
compensations: VecDeque::new(),
operation_id,
state_data: Initial { operation_id },
}
}
#[instrument(skip_all)]
pub async fn setup_swap(
mut self,
input_proofs: &Proofs,
blinded_messages: &[BlindedMessage],
quote_id: Option<QuoteId>,
input_verification: crate::mint::Verification,
) -> Result<SwapSaga<'a, SetupComplete>, Error> {
let output_verification = self.mint.verify_outputs(blinded_messages).map_err(|err| {
tracing::debug!("Output verification failed: {:?}", err);
err
})?;
self.mint
.verify_transaction_balanced(
input_verification.clone(),
output_verification.clone(),
input_proofs,
)
.await?;
let total_redeemed = input_verification.amount;
let total_issued = output_verification.amount;
let fee_breakdown = self.mint.get_proofs_fee(input_proofs).await?;
let operation = Operation::new(
self.state_data.operation_id,
cdk_common::mint::OperationKind::Swap,
total_issued.clone().into(),
total_redeemed.clone().into(),
fee_breakdown.total,
None, None, );
let mut tx = self.db.begin_transaction().await?;
let mut new_proofs = match tx
.add_proofs(input_proofs.clone(), quote_id.clone(), &operation)
.await
{
Ok(proofs) => proofs,
Err(err) => {
tx.rollback().await?;
return Err(match err {
database::Error::Duplicate => Error::TokenPending,
database::Error::AttemptUpdateSpentProof => Error::TokenAlreadySpent,
_ => Error::Database(err),
});
}
};
let ys = match input_proofs.ys() {
Ok(ys) => ys,
Err(err) => return Err(Error::NUT00(err)),
};
if let Err(err) = Mint::update_proofs_state(&mut tx, &mut new_proofs, State::Pending).await
{
tx.rollback().await?;
return Err(err);
}
if let Err(err) = tx
.add_blinded_messages(quote_id.as_ref(), blinded_messages, &operation)
.await
{
tx.rollback().await?;
return Err(match err {
database::Error::Duplicate => Error::DuplicateOutputs,
_ => Error::Database(err),
});
}
let blinded_messages_vec = blinded_messages.to_vec();
let blinded_secrets: Vec<PublicKey> = blinded_messages_vec
.iter()
.map(|bm| bm.blinded_secret)
.collect();
let saga = Saga::new_swap(self.operation_id, SwapSagaState::SetupComplete);
if let Err(err) = tx.add_saga(&saga).await {
tx.rollback().await?;
return Err(err.into());
}
tx.commit().await?;
for pk in &ys {
self.pubsub.proof_state((*pk, State::Pending));
}
self.compensations.push_front(Box::new(RemoveSwapSetup {
blinded_secrets: blinded_secrets.clone(),
input_ys: ys.clone(),
operation_id: self.operation_id,
}));
Ok(SwapSaga {
mint: self.mint,
db: self.db,
pubsub: self.pubsub,
compensations: self.compensations,
operation_id: self.operation_id,
state_data: SetupComplete {
blinded_messages: blinded_messages_vec,
ys,
operation,
fee_breakdown,
},
})
}
}
impl<'a> SwapSaga<'a, SetupComplete> {
#[instrument(skip_all)]
pub async fn sign_outputs(self) -> Result<SwapSaga<'a, Signed>, Error> {
match self
.mint
.blind_sign(self.state_data.blinded_messages.clone())
.await
{
Ok(signatures) => {
Ok(SwapSaga {
mint: self.mint,
db: self.db,
pubsub: self.pubsub,
compensations: self.compensations,
operation_id: self.operation_id,
state_data: Signed {
blinded_messages: self.state_data.blinded_messages,
ys: self.state_data.ys,
signatures,
operation: self.state_data.operation,
fee_breakdown: self.state_data.fee_breakdown,
},
})
}
Err(err) => {
self.compensate_all().await?;
Err(err)
}
}
}
}
impl SwapSaga<'_, Signed> {
#[instrument(skip_all)]
pub async fn finalize(mut self) -> Result<cdk_common::nuts::SwapResponse, Error> {
let blinded_secrets: Vec<PublicKey> = self
.state_data
.blinded_messages
.iter()
.map(|bm| bm.blinded_secret)
.collect();
let mut tx = self.db.begin_transaction().await?;
#[cfg(test)]
{
if crate::test_helpers::mint::should_fail_for("ADD_SIGNATURES") {
tx.rollback().await?;
self.compensate_all().await?;
return Err(Error::Database(database::Error::Database(
"Test failure: ADD_SIGNATURES".into(),
)));
}
}
if let Err(err) = tx
.add_blind_signatures(&blinded_secrets, &self.state_data.signatures, None)
.await
{
tx.rollback().await?;
self.compensate_all().await?;
return Err(err.into());
}
#[cfg(test)]
{
if crate::test_helpers::mint::should_fail_for("UPDATE_PROOFS") {
tx.rollback().await?;
self.compensate_all().await?;
return Err(Error::Database(database::Error::Database(
"Test failure: UPDATE_PROOFS".into(),
)));
}
}
let mut proofs = match tx.get_proofs(&self.state_data.ys).await {
Ok(proofs) => proofs,
Err(err) => {
tx.rollback().await?;
self.compensate_all().await?;
return Err(err.into());
}
};
if let Err(err) = Mint::update_proofs_state(&mut tx, &mut proofs, State::Spent).await {
tx.rollback().await?;
self.compensate_all().await?;
return Err(err);
}
if let Err(err) = tx
.add_completed_operation(
&self.state_data.operation,
&self.state_data.fee_breakdown.per_keyset,
)
.await
{
tx.rollback().await?;
self.compensate_all().await?;
return Err(err.into());
}
if let Err(e) = tx.delete_saga(&self.operation_id).await {
tracing::warn!(
"Failed to delete saga in finalize (will be cleaned up on recovery): {}",
e
);
}
tx.commit().await?;
for pk in &self.state_data.ys {
self.pubsub.proof_state((*pk, State::Spent));
}
self.compensations.clear();
Ok(cdk_common::nuts::SwapResponse::new(
self.state_data.signatures,
))
}
}
impl<S> SwapSaga<'_, S> {
#[instrument(skip_all)]
async fn compensate_all(self) -> Result<(), Error> {
let mut compensations = self.compensations;
if compensations.is_empty() {
return Ok(());
}
#[cfg(feature = "prometheus")]
{
use cdk_prometheus::METRICS;
self.mint.record_swap_failure("process_swap_request");
METRICS.dec_in_flight_requests("process_swap_request");
}
tracing::warn!("Running {} compensating actions", compensations.len());
while let Some(compensation) = compensations.pop_front() {
tracing::debug!("Running compensation: {}", compensation.name());
if let Err(e) = compensation.execute(&self.db, &self.pubsub).await {
tracing::error!(
"Compensation {} failed: {}. Continuing...",
compensation.name(),
e
);
}
}
Ok(())
}
}