use cdk_common::database::mint::Acquired;
use cdk_common::database::{self, DynMintDatabase};
use cdk_common::mint::{self as mint_types};
use cdk_common::nuts::{BlindSignature, BlindedMessage, MeltQuoteState, Proofs, State};
use cdk_common::{Amount, CurrencyUnit, Error, PublicKey, QuoteId};
#[cfg(feature = "prometheus")]
use cdk_prometheus::METRICS;
use cdk_signatory::signatory::SignatoryKeySet;
use crate::mint::subscription::PubSubManager;
use crate::mint::MeltQuote;
use crate::Mint;
pub fn get_keyset_fee_and_amounts(
keysets: &arc_swap::ArcSwap<Vec<SignatoryKeySet>>,
outputs: &[BlindedMessage],
) -> cdk_common::amount::FeeAndAmounts {
keysets
.load()
.iter()
.filter_map(|keyset| {
if keyset.active && Some(keyset.id) == outputs.first().map(|x| x.keyset_id) {
Some((keyset.input_fee_ppk, keyset.amounts.clone()).into())
} else {
None
}
})
.next()
.unwrap_or_else(|| (0, (0..32).map(|x| 2u64.pow(x)).collect::<Vec<_>>()).into())
}
#[cfg(feature = "prometheus")]
fn amount_as_sats(amount: &Amount<CurrencyUnit>) -> Option<f64> {
amount.to_msat().ok().map(|msats| msats as f64 / 1000.0)
}
#[cfg(feature = "prometheus")]
fn record_confirmed_payment_metrics(quote: &MeltQuote, total_spent: &Amount<CurrencyUnit>) {
let payment_method = quote.payment_method.as_str();
let quote_amount = quote.amount();
METRICS.record_payment_total(payment_method);
if let Some(quote_amount_sats) = amount_as_sats("e_amount) {
METRICS.record_payment_amount(payment_method, quote_amount_sats);
}
if let Ok(quote_amount) = quote_amount.convert_to(total_spent.unit()) {
if let Ok(payment_fee) = total_spent.checked_sub("e_amount) {
if let Some(payment_fee_sats) = amount_as_sats(&payment_fee) {
METRICS.record_payment_fee(payment_method, payment_fee_sats);
}
}
}
}
pub async fn rollback_melt_quote(
db: &DynMintDatabase,
pubsub: &PubSubManager,
quote_id: &QuoteId,
input_ys: &[PublicKey],
blinded_secrets: &[PublicKey],
operation_id: &uuid::Uuid,
) -> Result<(), Error> {
if input_ys.is_empty() && blinded_secrets.is_empty() {
return Ok(());
}
tracing::info!(
"Rolling back melt quote {} ({} proofs, {} blinded messages, saga {})",
quote_id,
input_ys.len(),
blinded_secrets.len(),
operation_id
);
let mut tx = db.begin_transaction().await?;
let mut proofs_recovered = false;
if !input_ys.is_empty() {
match tx.remove_proofs(input_ys, Some(quote_id.clone())).await {
Ok(_) => {
proofs_recovered = true;
}
Err(database::Error::AttemptRemoveSpentProof) => {
tracing::warn!(
"Proofs already spent or missing during rollback for quote {}",
quote_id
);
}
Err(e) => return Err(e.into()),
}
}
if !blinded_secrets.is_empty() {
tx.delete_blinded_messages(blinded_secrets).await?;
}
let quote_option = if let Some(mut quote) = tx.get_melt_quote(quote_id).await? {
match quote.state {
MeltQuoteState::Pending => {
tx.update_melt_quote_state(&mut quote, MeltQuoteState::Unpaid, None)
.await?;
Some(quote)
}
MeltQuoteState::Unpaid | MeltQuoteState::Failed => {
None
}
MeltQuoteState::Paid => {
tx.rollback().await?;
return Err(Error::PaidQuote);
}
state => {
tracing::warn!(
"Refusing rollback for melt quote {} in unexpected state {}",
quote_id,
state
);
tx.rollback().await?;
return Err(Error::UnknownPaymentState);
}
}
} else {
None
};
tx.delete_melt_request(quote_id).await?;
if let Err(e) = tx.delete_saga(operation_id).await {
tracing::warn!(
"Failed to delete saga {} during rollback: {}",
operation_id,
e
);
}
tx.commit().await?;
if proofs_recovered {
for pk in input_ys.iter() {
pubsub.proof_state((*pk, State::Unspent));
}
}
if let Some(quote) = quote_option {
pubsub.melt_quote_status("e, None, None, MeltQuoteState::Unpaid);
}
tracing::info!(
"Successfully rolled back melt quote {} and deleted saga {}",
quote_id,
operation_id
);
Ok(())
}
pub async fn process_melt_change(
mint: &super::super::Mint,
db: &DynMintDatabase,
quote_id: &QuoteId,
inputs_amount: Amount<CurrencyUnit>,
total_spent: Amount<CurrencyUnit>,
inputs_fee: Amount<CurrencyUnit>,
change_outputs: Vec<BlindedMessage>,
) -> Result<
(
Option<Vec<BlindSignature>>,
Box<dyn database::MintTransaction<database::Error> + Send + Sync>,
),
Error,
> {
let change_target: Amount = match inputs_amount
.checked_sub(&total_spent)
.ok()
.and_then(|rem| rem.checked_sub(&inputs_fee).ok())
{
Some(amt) if amt.value() > 0 => amt.into(),
Some(_) => {
let tx = db.begin_transaction().await?;
return Ok((None, tx));
}
None => {
tracing::warn!(
"Fee was too high for quote {}. inputs_amount: {}, total_spent: {}, inputs_fee: {}",
quote_id,
inputs_amount,
total_spent,
inputs_fee
);
let tx = db.begin_transaction().await?;
return Ok((None, tx));
}
};
if change_outputs.is_empty() {
let tx = db.begin_transaction().await?;
return Ok((None, tx));
}
let fee_and_amounts = get_keyset_fee_and_amounts(&mint.keysets, &change_outputs);
let mut amounts: Vec<Amount> = change_target.split(&fee_and_amounts)?;
if change_outputs.len() < amounts.len() {
tracing::debug!(
"Providing change requires {} blinded messages, but only {} provided",
amounts.len(),
change_outputs.len()
);
amounts.sort_by(|a, b| b.cmp(a));
}
let mut blinded_messages_to_sign = vec![];
for (amount, mut blinded_message) in amounts.iter().zip(change_outputs.iter().cloned()) {
blinded_message.amount = *amount;
blinded_messages_to_sign.push(blinded_message);
}
let change_sigs = mint.blind_sign(blinded_messages_to_sign.clone()).await?;
let mut tx = db.begin_transaction().await?;
let blinded_secrets: Vec<_> = blinded_messages_to_sign
.iter()
.map(|bm| bm.blinded_secret)
.collect();
tx.add_blind_signatures(&blinded_secrets, &change_sigs, Some(quote_id.clone()))
.await?;
Ok((Some(change_sigs), tx))
}
pub async fn load_melt_quotes_exclusively(
tx: &mut Box<dyn database::MintTransaction<database::Error> + Send + Sync>,
quote_id: &QuoteId,
) -> Result<Acquired<MeltQuote>, Error> {
let locked = tx
.lock_melt_quote_and_related(quote_id)
.await
.map_err(|e| match e {
database::Error::Locked => {
tracing::warn!("Quote {quote_id} or related quotes are locked by another process");
database::Error::Duplicate
}
e => e,
})?;
let quote = locked.target.ok_or(Error::UnknownQuote)?;
if let Some(conflict) = locked.all_related.iter().find(|locked_quote| {
locked_quote.id != quote.id
&& (locked_quote.state == MeltQuoteState::Pending
|| locked_quote.state == MeltQuoteState::Paid)
}) {
tracing::warn!(
"Cannot transition quote {} to Pending: another quote with lookup_id {:?} is already {:?}",
quote.id,
quote.request_lookup_id,
conflict.state,
);
return Err(match conflict.state {
MeltQuoteState::Pending => Error::PendingQuote,
MeltQuoteState::Paid => Error::RequestAlreadyPaid,
_ => unreachable!("Only Pending/Paid states reach this branch"),
});
}
Ok(quote)
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn finalize_melt_core(
mut tx: Box<dyn database::MintTransaction<database::Error> + Send + Sync>,
pubsub: &PubSubManager,
mut quote: Acquired<MeltQuote>,
input_ys: &[PublicKey],
inputs_amount: Amount<CurrencyUnit>,
inputs_fee: Amount<CurrencyUnit>,
total_spent: Amount<CurrencyUnit>,
payment_proof: Option<String>,
payment_lookup_id: &cdk_common::payment::PaymentIdentifier,
) -> Result<(Proofs, MeltQuote), Error> {
if quote.amount() > total_spent {
tracing::error!(
"Payment amount {} is less than quote amount {} for quote {}",
total_spent,
quote.amount(),
quote.id
);
tx.rollback().await?;
return Err(Error::IncorrectQuoteAmount);
}
let net_inputs = match inputs_amount.checked_sub(&inputs_fee) {
Ok(net_inputs) => net_inputs,
Err(err) => {
tx.rollback().await?;
return Err(err.into());
}
};
let total_spent = match total_spent.convert_to(net_inputs.unit()) {
Ok(total_spent) => total_spent,
Err(err) => {
tx.rollback().await?;
return Err(err.into());
}
};
tracing::debug!(
"Melt validation for quote {}: inputs_amount={}, inputs_fee={}, net_inputs={}, total_spent={}, quote_amount={}, fee_reserve={}",
quote.id,
inputs_amount.display_with_unit(),
inputs_fee.display_with_unit(),
net_inputs.display_with_unit(),
total_spent.display_with_unit(),
quote.amount().display_with_unit(),
quote.fee_reserve().display_with_unit(),
);
debug_assert!(
net_inputs >= total_spent,
"Over paid melt quote {}: net_inputs ({}) < total_spent ({}). Payment already complete, finalizing with no change.",
quote.id,
net_inputs.display_with_unit(),
total_spent.display_with_unit(),
);
if net_inputs < total_spent {
tracing::error!(
"Over paid melt quote {}: net_inputs ({}) < total_spent ({}). Payment already complete, finalizing with no change.",
quote.id,
net_inputs.display_with_unit(),
total_spent.display_with_unit(),
);
}
if let Err(err) = tx
.update_melt_quote_state(&mut quote, MeltQuoteState::Paid, payment_proof.clone())
.await
{
tx.rollback().await?;
return Err(err.into());
}
quote.state = MeltQuoteState::Paid;
if quote.request_lookup_id.as_ref() != Some(payment_lookup_id) {
tracing::info!(
"Payment lookup id changed post payment from {:?} to {}",
"e.request_lookup_id,
payment_lookup_id
);
if let Err(err) = tx
.update_melt_quote_request_lookup_id(&mut quote, payment_lookup_id)
.await
{
tx.rollback().await?;
return Err(err.into());
}
}
let mut proofs = match tx.get_proofs(input_ys).await {
Ok(proofs) => proofs,
Err(err) => {
tx.rollback().await?;
return Err(err.into());
}
};
if let Err(err) = Mint::update_proofs_state(&mut tx, &mut proofs, State::Spent).await {
tx.rollback().await?;
return Err(err);
}
tx.commit().await?;
for pk in input_ys.iter() {
pubsub.proof_state((*pk, State::Spent));
}
Ok((proofs.to_vec(), quote.inner()))
}
#[allow(clippy::too_many_arguments)]
pub async fn finalize_melt_quote(
mint: &super::super::Mint,
db: &DynMintDatabase,
pubsub: &PubSubManager,
quote: &MeltQuote,
total_spent: Amount<CurrencyUnit>,
payment_proof: Option<String>,
payment_lookup_id: &cdk_common::payment::PaymentIdentifier,
operation_id: Option<uuid::Uuid>,
) -> Result<Option<Vec<BlindSignature>>, Error> {
tracing::info!("Finalizing melt quote {}", quote.id);
let settlement_matches = |stored_quote: &MeltQuote| {
stored_quote.request_lookup_id.as_ref() == Some(payment_lookup_id)
&& stored_quote.payment_proof == payment_proof
};
let mut tx = db.begin_transaction().await?;
let locked_quote = load_melt_quotes_exclusively(&mut tx, "e.id).await?;
let melt_request_info = match tx.get_melt_request_and_blinded_messages("e.id).await? {
Some(info) => info,
None => {
if locked_quote.state == MeltQuoteState::Paid {
let locked_quote = locked_quote.inner();
if locked_quote.request_lookup_id.as_ref() != Some(payment_lookup_id)
|| locked_quote.payment_proof != payment_proof
{
tx.rollback().await?;
return Err(Error::PaidQuote);
}
}
tracing::warn!(
"No melt request found for quote {} - may have been completed already",
quote.id
);
if let Some(op_id) = operation_id {
if let Err(e) = tx.delete_saga(&op_id).await {
tracing::warn!("Failed to delete saga {} during early return: {}", op_id, e);
}
tx.commit().await?;
} else {
tx.rollback().await?;
}
let sigs = db.get_blind_signatures_for_quote("e.id).await?;
return Ok(if sigs.is_empty() { None } else { Some(sigs) });
}
};
let input_ys = tx.get_proof_ys_by_quote_id("e.id).await?;
if input_ys.is_empty() {
tracing::warn!(
"No input proofs found for quote {} - may have been completed already",
quote.id
);
if let Some(op_id) = operation_id {
if let Err(e) = tx.delete_saga(&op_id).await {
tracing::warn!("Failed to delete saga {} during early return: {}", op_id, e);
}
tx.commit().await?;
} else {
tx.rollback().await?;
}
let sigs = db.get_blind_signatures_for_quote("e.id).await?;
return Ok(if sigs.is_empty() { None } else { Some(sigs) });
}
#[cfg(feature = "prometheus")]
let should_record_payment_metrics = locked_quote.state != MeltQuoteState::Paid;
let (proofs, quote) = if locked_quote.state == MeltQuoteState::Paid {
let locked_quote = locked_quote.inner();
if !settlement_matches(&locked_quote) {
tx.rollback().await?;
return Err(Error::PaidQuote);
}
tracing::info!(
"Melt quote {} already Paid, skipping to change/cleanup",
quote.id
);
let proofs = tx.get_proofs(&input_ys).await?.to_vec();
tx.commit().await?;
(proofs, locked_quote)
} else {
let (proofs, quote) = finalize_melt_core(
tx,
pubsub,
locked_quote,
&input_ys,
melt_request_info.inputs_amount.clone(),
melt_request_info.inputs_fee.clone(),
total_spent.clone(),
payment_proof.clone(),
payment_lookup_id,
)
.await?;
(proofs, quote)
};
let (change_sigs, mut tx) = process_melt_change(
mint,
db,
"e.id,
melt_request_info.inputs_amount.clone(),
total_spent.clone(),
melt_request_info.inputs_fee.clone(),
melt_request_info.change_outputs.clone(),
)
.await?;
let fee_breakdown = if operation_id.is_some() {
Some(match mint.get_proofs_fee(&proofs).await {
Ok(fee_breakdown) => fee_breakdown,
Err(err) => {
tx.rollback().await?;
return Err(err);
}
})
} else {
None
};
if let Err(err) = tx.delete_melt_request("e.id).await {
tx.rollback().await?;
return Err(err.into());
}
if let (Some(op_id), Some(fee_breakdown)) = (operation_id, fee_breakdown.as_ref()) {
let change_amount = change_sigs
.as_ref()
.map(|sigs| {
Amount::try_sum(sigs.iter().map(|s| s.amount))
.expect("Change amount cannot overflow")
})
.unwrap_or_default();
let mut operation = mint_types::Operation::new(
op_id,
mint_types::OperationKind::Melt,
Amount::ZERO,
melt_request_info.inputs_amount.clone().into(),
fee_breakdown.total,
None,
Some(quote.payment_method.clone()),
);
operation.add_change(change_amount);
let payment_fee = match total_spent.checked_sub("e.amount()) {
Ok(payment_fee) => payment_fee,
Err(err) => {
tx.rollback().await?;
return Err(err.into());
}
};
operation.set_payment_details(quote.amount().into(), payment_fee.into());
if let Err(err) = tx
.add_completed_operation(&operation, &fee_breakdown.per_keyset)
.await
{
tx.rollback().await?;
return Err(err.into());
}
}
if let Some(op_id) = operation_id {
if let Err(err) = tx.delete_saga(&op_id).await {
tx.rollback().await?;
return Err(err.into());
}
}
tx.commit().await?;
pubsub.melt_quote_status(
"e,
payment_proof,
change_sigs.clone(),
MeltQuoteState::Paid,
);
tracing::info!("Successfully finalized melt quote {}", quote.id);
#[cfg(feature = "prometheus")]
if should_record_payment_metrics {
record_confirmed_payment_metrics("e, &total_spent);
}
Ok(change_sigs)
}