use std::collections::VecDeque;
use std::sync::Arc;
use cdk_common::database::mint::MeltRequestInfo;
use cdk_common::database::DynMintDatabase;
use cdk_common::mint::{MeltSagaState, Operation, Saga, SagaStateEnum};
use cdk_common::nut00::KnownMethod;
use cdk_common::nuts::MeltQuoteState;
use cdk_common::payment::OutgoingPaymentOptions;
use cdk_common::{
Amount, CurrencyUnit, Error, ProofsMethods, PublicKey, QuoteId, SpendingConditionVerification,
State,
};
#[cfg(feature = "prometheus")]
use cdk_prometheus::METRICS;
use tokio::sync::Mutex;
use tracing::instrument;
use self::compensation::{CompensatingAction, RemoveMeltSetup};
use self::state::{Initial, PaymentConfirmed, SettlementDecision, SetupComplete};
use crate::cdk_payment::MakePaymentResponse;
use crate::mint::melt::shared;
use crate::mint::subscription::PubSubManager;
use crate::mint::verification::Verification;
use crate::mint::{MeltQuoteBolt11Response, MeltRequest};
use crate::Mint;
mod compensation;
mod state;
#[cfg(test)]
mod tests;
pub struct MeltSaga<S> {
mint: Arc<super::Mint>,
db: DynMintDatabase,
pubsub: Arc<PubSubManager>,
compensations: Arc<Mutex<VecDeque<Box<dyn CompensatingAction>>>>,
operation_id: uuid::Uuid,
#[cfg(feature = "prometheus")]
metrics_incremented: bool,
state_data: S,
}
impl MeltSaga<Initial> {
pub fn new(mint: Arc<super::Mint>, db: DynMintDatabase, pubsub: Arc<PubSubManager>) -> Self {
#[cfg(feature = "prometheus")]
METRICS.inc_in_flight_requests("melt_bolt11");
let operation_id = uuid::Uuid::new_v4();
Self {
mint,
db,
pubsub,
compensations: Arc::new(Mutex::new(VecDeque::new())),
operation_id,
#[cfg(feature = "prometheus")]
metrics_incremented: true,
state_data: Initial { operation_id },
}
}
#[instrument(skip_all)]
pub async fn setup_melt(
self,
melt_request: &MeltRequest<QuoteId>,
input_verification: Verification,
payment_method: cdk_common::PaymentMethod,
) -> Result<MeltSaga<SetupComplete>, Error> {
let Verification {
amount: input_amount,
} = input_verification;
let input_unit = Some(input_amount.unit().clone());
if let Some(outputs) = melt_request.outputs() {
if !outputs.is_empty() {
let output_verification = self.mint.verify_outputs(outputs)?;
if input_unit.as_ref() != Some(output_verification.amount.unit()) {
return Err(Error::UnitMismatch);
}
}
}
melt_request.verify_spending_conditions()?;
let mut tx = self.db.begin_transaction().await?;
let mut quote =
match shared::load_melt_quotes_exclusively(&mut tx, melt_request.quote()).await {
Ok(quote) => quote,
Err(err) => {
tx.rollback().await?;
return Err(err);
}
};
let fee_breakdown = self.mint.get_proofs_fee(melt_request.inputs()).await?;
let operation = Operation::new(
self.state_data.operation_id,
cdk_common::mint::OperationKind::Melt,
Amount::ZERO, input_amount.clone().into(), fee_breakdown.total, None, Some(payment_method), );
if let Err(err) = tx
.add_proofs(
melt_request.inputs().clone(),
Some(melt_request.quote_id().to_owned()),
&operation,
)
.await
{
tx.rollback().await?;
return Err(match err {
cdk_common::database::Error::Duplicate => Error::TokenPending,
cdk_common::database::Error::AttemptUpdateSpentProof => Error::TokenAlreadySpent,
err => Error::Database(err),
});
}
let input_ys = melt_request.inputs().ys()?;
let mut proofs = tx.get_proofs(&input_ys).await?;
if let Err(err) = Mint::update_proofs_state(&mut tx, &mut proofs, State::Pending).await {
tx.rollback().await?;
return Err(err);
}
let previous_state = quote.state;
if input_unit != Some(quote.unit.clone()) {
tx.rollback().await?;
return Err(Error::UnitMismatch);
}
match previous_state {
MeltQuoteState::Unpaid | MeltQuoteState::Failed => {}
MeltQuoteState::Pending => {
tx.rollback().await?;
return Err(Error::PendingQuote);
}
MeltQuoteState::Paid => {
tx.rollback().await?;
return Err(Error::PaidQuote);
}
MeltQuoteState::Unknown => {
tx.rollback().await?;
return Err(Error::UnknownPaymentState);
}
}
match tx
.update_melt_quote_state(&mut quote, MeltQuoteState::Pending, None)
.await
{
Ok(_) => {}
Err(err) => {
tx.rollback().await?;
return Err(err.into());
}
};
let inputs_fee_breakdown = self.mint.get_proofs_fee(melt_request.inputs()).await?;
let inputs_fee = inputs_fee_breakdown.total.with_unit(quote.unit.clone());
let fee_reserve = quote.fee_reserve();
let required_total = quote
.amount()
.checked_add(&fee_reserve)?
.checked_add(&inputs_fee)?;
if input_amount < required_total.clone() {
tracing::info!(
"Melt request unbalanced: inputs {}, amount {}, fee_reserve {}, input_fee {}, required {}",
input_amount,
quote.amount(),
fee_reserve,
inputs_fee,
required_total
);
tx.rollback().await?;
return Err(Error::TransactionUnbalanced(
input_amount.to_u64(),
quote.amount().value(),
inputs_fee.checked_add(&fee_reserve)?.value(),
));
}
tx.add_melt_request(
melt_request.quote_id(),
melt_request.inputs_amount()?.with_unit(quote.unit.clone()),
inputs_fee.clone(),
)
.await?;
tx.add_blinded_messages(
Some(melt_request.quote_id()),
melt_request.outputs().as_ref().unwrap_or(&Vec::new()),
&operation,
)
.await?;
let blinded_secrets: Vec<PublicKey> = melt_request
.outputs()
.as_ref()
.unwrap_or(&Vec::new())
.iter()
.map(|bm| bm.blinded_secret)
.collect();
let saga = Saga::new_melt(
self.operation_id,
MeltSagaState::SetupComplete,
quote.id.to_string(),
);
if let Err(err) = tx.add_saga(&saga).await {
tx.rollback().await?;
return Err(err.into());
}
tx.commit().await?;
for pk in input_ys.iter() {
self.pubsub.proof_state((*pk, State::Pending));
}
self.pubsub
.melt_quote_status("e, None, None, MeltQuoteState::Pending);
let blinded_messages_vec = melt_request.outputs().clone().unwrap_or_default();
let compensations = Arc::clone(&self.compensations);
compensations
.lock()
.await
.push_front(Box::new(RemoveMeltSetup {
input_ys: input_ys.clone(),
blinded_secrets,
quote_id: quote.id.clone(),
operation_id: self.operation_id,
}));
Ok(MeltSaga {
mint: self.mint,
db: self.db,
pubsub: self.pubsub,
compensations: self.compensations,
operation_id: self.operation_id,
#[cfg(feature = "prometheus")]
metrics_incremented: self.metrics_incremented,
state_data: SetupComplete {
quote: quote.inner(),
input_ys,
blinded_messages: blinded_messages_vec,
operation,
fee_breakdown,
},
})
}
}
impl MeltSaga<SetupComplete> {
#[instrument(skip_all)]
pub async fn attempt_internal_settlement(
self,
melt_request: &MeltRequest<QuoteId>,
) -> Result<(Self, SettlementDecision), Error> {
let mut tx = self.db.begin_transaction().await?;
let mut mint_quote = match tx
.get_mint_quote_by_request(&self.state_data.quote.request.to_string())
.await
{
Ok(Some(mint_quote)) if mint_quote.unit == self.state_data.quote.unit => mint_quote,
Ok(_) => {
tx.rollback().await?;
tracing::debug!("Not an internal payment or unit mismatch");
return Ok((self, SettlementDecision::RequiresExternalPayment));
}
Err(err) => {
tx.rollback().await?;
tracing::debug!("Error checking for mint quote: {}", err);
self.compensate_all().await?;
return Err(Error::Internal);
}
};
if (mint_quote.state() == cdk_common::nuts::MintQuoteState::Issued
|| mint_quote.state() == cdk_common::nuts::MintQuoteState::Paid)
&& mint_quote.payment_method == crate::mint::PaymentMethod::Known(KnownMethod::Bolt11)
{
tx.rollback().await?;
self.compensate_all().await?;
return Err(Error::RequestAlreadyPaid);
}
let inputs_amount_quote_unit = melt_request
.inputs_amount()
.map_err(|_| {
tracing::error!("Proof inputs in melt quote overflowed");
Error::AmountOverflow
})?
.with_unit(mint_quote.unit.clone());
if let Some(ref amount) = mint_quote.amount {
if amount > &inputs_amount_quote_unit {
tracing::debug!(
"Not enough inputs provided: {} needed {}",
inputs_amount_quote_unit,
amount
);
tx.rollback().await?;
self.compensate_all().await?;
return Err(Error::InsufficientFunds);
}
}
let amount = self.state_data.quote.amount();
tracing::info!(
"Mint quote {} paid {} from internal payment.",
mint_quote.id,
amount
);
tx.update_saga(
&self.operation_id,
SagaStateEnum::Melt(MeltSagaState::PaymentAttempted),
)
.await?;
mint_quote.add_payment(amount.clone(), self.state_data.quote.id.to_string(), None)?;
tx.update_mint_quote(&mut mint_quote).await?;
tx.commit().await?;
self.pubsub
.mint_quote_payment(&mint_quote, mint_quote.amount_paid());
tracing::info!(
"Melt quote {} paid Mint quote {}",
self.state_data.quote.id,
mint_quote.id
);
Ok((self, SettlementDecision::Internal { amount }))
}
#[instrument(skip_all)]
pub async fn make_payment(
self,
settlement: SettlementDecision,
) -> Result<MeltSaga<PaymentConfirmed>, Error> {
let payment_result = match settlement {
SettlementDecision::Internal { amount } => self.handle_internal_payment(amount),
SettlementDecision::RequiresExternalPayment => {
let response = self.attempt_external_payment().await?;
match response.status {
MeltQuoteState::Paid => response,
MeltQuoteState::Unpaid | MeltQuoteState::Failed => {
tracing::info!(
"Lightning payment for quote {} failed.",
self.state_data.quote.id
);
self.compensate_all().await?;
return Err(Error::PaymentFailed);
}
MeltQuoteState::Unknown => {
tracing::warn!(
"Lightning payment for quote {} unknown.",
self.state_data.quote.id
);
return Err(Error::PendingQuote);
}
MeltQuoteState::Pending => {
tracing::warn!(
"LN payment pending, proofs remain pending for quote: {}",
self.state_data.quote.id
);
return Err(Error::PendingQuote);
}
}
}
};
Ok(MeltSaga {
mint: self.mint,
db: self.db,
pubsub: self.pubsub,
compensations: self.compensations,
operation_id: self.operation_id,
#[cfg(feature = "prometheus")]
metrics_incremented: self.metrics_incremented,
state_data: PaymentConfirmed {
quote: self.state_data.quote,
input_ys: self.state_data.input_ys,
blinded_messages: self.state_data.blinded_messages,
payment_result,
operation: self.state_data.operation,
fee_breakdown: self.state_data.fee_breakdown,
},
})
}
fn handle_internal_payment(&self, amount: Amount<CurrencyUnit>) -> MakePaymentResponse {
tracing::info!(
"Payment settled internally for {} {}",
amount,
self.state_data.quote.unit
);
MakePaymentResponse {
status: MeltQuoteState::Paid,
total_spent: amount,
payment_proof: None,
payment_lookup_id: self
.state_data
.quote
.request_lookup_id
.clone()
.unwrap_or_else(|| {
cdk_common::payment::PaymentIdentifier::CustomId(
self.state_data.quote.id.to_string(),
)
}),
}
}
async fn attempt_external_payment(&self) -> Result<MakePaymentResponse, Error> {
let ln = self
.mint
.payment_processors
.get(&crate::types::PaymentProcessorKey::new(
self.state_data.quote.unit.clone(),
self.state_data.quote.payment_method.clone(),
))
.ok_or_else(|| {
tracing::info!(
"Could not get ln backend for {}, {}",
self.state_data.quote.unit,
self.state_data.quote.payment_method
);
Error::UnsupportedUnit
})?;
{
let mut tx = self.db.begin_transaction().await?;
tx.update_saga(
&self.operation_id,
SagaStateEnum::Melt(MeltSagaState::PaymentAttempted),
)
.await?;
tx.commit().await?;
}
self.execute_payment_and_verify(Arc::clone(ln)).await
}
async fn execute_payment_and_verify(
&self,
ln: Arc<
dyn cdk_common::payment::MintPayment<Err = cdk_common::payment::Error> + Send + Sync,
>,
) -> Result<MakePaymentResponse, Error> {
let quote = &self.state_data.quote;
let payment_options = OutgoingPaymentOptions::from_melt_quote_with_fee(quote.clone())?;
match ln.make_payment("e.unit, payment_options).await {
Ok(pay) if pay.status == MeltQuoteState::Paid => Ok(pay),
Ok(pay) => self.verify_ambiguous_payment(ln, pay).await,
Err(err) => self.handle_payment_error(ln, err).await,
}
}
async fn verify_ambiguous_payment(
&self,
ln: Arc<
dyn cdk_common::payment::MintPayment<Err = cdk_common::payment::Error> + Send + Sync,
>,
pay: MakePaymentResponse,
) -> Result<MakePaymentResponse, Error> {
tracing::warn!(
"Got {} status when paying melt quote {} for {} {}. Verifying with backend...",
pay.status,
self.state_data.quote.id,
self.state_data.quote.amount(),
self.state_data.quote.unit
);
let mut check_response = self.check_payment_state(ln, &pay.payment_lookup_id).await?;
if check_response.status == MeltQuoteState::Paid {
tracing::info!(
"Payment initially returned {} but confirmed as Paid. Proceeding to finalize.",
pay.status
);
return Ok(check_response);
}
if pay.status == MeltQuoteState::Pending && check_response.status == MeltQuoteState::Unknown
{
tracing::warn!(
"Payment was initially Pending but verification returned Unknown. Keeping as Pending for safety."
);
return Ok(pay);
}
if check_response.status == MeltQuoteState::Unknown {
check_response.status = MeltQuoteState::Failed;
}
Ok(check_response)
}
async fn handle_payment_error(
&self,
ln: Arc<
dyn cdk_common::payment::MintPayment<Err = cdk_common::payment::Error> + Send + Sync,
>,
err: cdk_common::payment::Error,
) -> Result<MakePaymentResponse, Error> {
if matches!(err, crate::cdk_payment::Error::InvoiceAlreadyPaid) {
tracing::info!("Invoice already paid, verifying payment status");
} else {
tracing::error!(
"Error returned attempting to pay: {} {}",
self.state_data.quote.id,
err
);
}
let lookup_id = self
.state_data
.quote
.request_lookup_id
.as_ref()
.ok_or_else(|| {
tracing::error!(
"No payment id, cannot verify payment status for {} after error",
self.state_data.quote.id
);
Error::Internal
})?;
let mut check_response = self.check_payment_state(ln, lookup_id).await?;
tracing::info!(
"Initial payment attempt for {} errored. Follow up check status: {}",
self.state_data.quote.id,
check_response.status
);
if check_response.status == MeltQuoteState::Unknown {
check_response.status = MeltQuoteState::Failed;
}
Ok(check_response)
}
async fn check_payment_state(
&self,
ln: Arc<
dyn cdk_common::payment::MintPayment<Err = cdk_common::payment::Error> + Send + Sync,
>,
lookup_id: &cdk_common::payment::PaymentIdentifier,
) -> Result<MakePaymentResponse, Error> {
match ln.check_outgoing_payment(lookup_id).await {
Ok(response) => Ok(response),
Err(check_err) => {
tracing::error!(
"Could not check the status of payment for {}. Proofs stuck as pending",
lookup_id
);
tracing::error!("Checking payment error: {}", check_err);
Err(Error::Internal)
}
}
}
}
impl MeltSaga<PaymentConfirmed> {
#[instrument(skip_all)]
pub async fn finalize(self) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
tracing::info!("TX2: Finalizing melt (mark spent + change)");
let total_spent: Amount<CurrencyUnit> = self
.state_data
.payment_result
.total_spent
.convert_to(&self.state_data.quote.unit)
.map_err(|e| {
tracing::error!("Failed to convert total_spent to quote unit: {:?}", e);
Error::UnitMismatch
})?;
let payment_preimage = self.state_data.payment_result.payment_proof.clone();
let payment_lookup_id = &self.state_data.payment_result.payment_lookup_id;
let mut tx = self.db.begin_transaction().await?;
let mut quote =
shared::load_melt_quotes_exclusively(&mut tx, &self.state_data.quote.id).await?;
let MeltRequestInfo {
inputs_amount,
inputs_fee,
change_outputs,
} = tx
.get_melt_request_and_blinded_messages(&self.state_data.quote.id)
.await?
.ok_or(Error::UnknownQuote)?;
if let Err(err) = super::shared::finalize_melt_core(
&mut tx,
&self.pubsub,
&mut quote,
&self.state_data.input_ys,
inputs_amount.clone(),
inputs_fee.clone(),
total_spent.clone(),
payment_preimage.clone(),
payment_lookup_id,
)
.await
{
tracing::error!(
"Finalize failed for paid melt quote {} - will retry on startup: {}",
self.state_data.quote.id,
err
);
tx.rollback().await?;
return Err(err);
}
let needs_change = inputs_amount > total_spent;
let (change, mut tx) = if !needs_change {
tracing::debug!("No change required for melt {}", self.state_data.quote.id);
(None, tx)
} else {
tx.update_saga(
&self.operation_id,
cdk_common::mint::SagaStateEnum::Melt(cdk_common::mint::MeltSagaState::Finalizing),
)
.await?;
tx.commit().await?;
super::shared::process_melt_change(
&self.mint,
&self.db,
&self.state_data.quote.id,
inputs_amount.clone(),
total_spent.clone(),
inputs_fee,
change_outputs,
)
.await?
};
tx.delete_melt_request(&self.state_data.quote.id).await?;
if let Err(e) = tx.delete_saga(&self.operation_id).await {
tracing::warn!("Failed to delete saga in finalize: {}", e);
}
let mut operation = self.state_data.operation;
let change_amount = change
.as_ref()
.map(|c| Amount::try_sum(c.iter().map(|a| a.amount)).expect("Change cannot overflow"))
.unwrap_or_default();
operation.add_change(change_amount);
let payment_fee = total_spent.checked_sub(&self.state_data.quote.amount())?;
operation.set_payment_details(self.state_data.quote.amount().into(), payment_fee.into());
tx.add_completed_operation(&operation, &self.state_data.fee_breakdown.per_keyset)
.await?;
tx.commit().await?;
self.pubsub.melt_quote_status(
"e,
payment_preimage.clone(),
change.clone(),
MeltQuoteState::Paid,
);
tracing::debug!(
"Melt for quote {} completed total spent {}, total inputs: {}, change given: {}",
self.state_data.quote.id,
total_spent,
inputs_amount,
change_amount
);
self.compensations.lock().await.clear();
#[cfg(feature = "prometheus")]
if self.metrics_incremented {
METRICS.dec_in_flight_requests("melt_bolt11");
METRICS.record_mint_operation("melt_bolt11", true);
}
let response = MeltQuoteBolt11Response {
amount: self.state_data.quote.amount().into(),
payment_preimage,
change,
quote: self.state_data.quote.id.clone(),
fee_reserve: self.state_data.quote.fee_reserve().into(),
state: MeltQuoteState::Paid,
expiry: self.state_data.quote.expiry,
request: Some(self.state_data.quote.request.to_string()),
unit: Some(self.state_data.quote.unit.clone()),
};
Ok(response)
}
}
impl<S> MeltSaga<S> {
#[instrument(skip_all)]
async fn compensate_all(self) -> Result<(), Error> {
let mut compensations = self.compensations.lock().await;
if compensations.is_empty() {
return Ok(());
}
#[cfg(feature = "prometheus")]
if self.metrics_incremented {
METRICS.dec_in_flight_requests("melt_bolt11");
METRICS.record_mint_operation("melt_bolt11", false);
METRICS.record_error();
}
tracing::warn!("Running {} compensating actions", compensations.len());
while let Some(compensation) = compensations.pop_front() {
tracing::debug!("Running compensation: {}", compensation.name());
if let Err(e) = compensation.execute(&self.db, &self.pubsub).await {
tracing::error!(
"Compensation {} failed: {}. Continuing...",
compensation.name(),
e
);
}
}
Ok(())
}
}