use std::collections::VecDeque;
use std::sync::Arc;
use cdk_common::database::DynMintDatabase;
use cdk_common::mint::{MeltFinalizationData, MeltSagaState, Operation, Saga, SagaStateEnum};
use cdk_common::nut00::KnownMethod;
use cdk_common::nuts::MeltQuoteState;
use cdk_common::payment::OutgoingPaymentOptions;
use cdk_common::{
Amount, CurrencyUnit, Error, ProofsMethods, PublicKey, QuoteId, SpendingConditionVerification,
State,
};
#[cfg(feature = "prometheus")]
use cdk_prometheus::MintMetricGuard;
use tokio::sync::Mutex;
use tracing::instrument;
use self::compensation::{CompensatingAction, RemoveMeltSetup};
use self::state::{Initial, PaymentConfirmed, SettlementDecision, SetupComplete};
use crate::cdk_payment::MakePaymentResponse;
use crate::mint::melt::shared;
use crate::mint::subscription::PubSubManager;
use crate::mint::verification::Verification;
use crate::mint::MeltRequest;
use crate::{MeltQuoteResponse, Mint};
mod compensation;
mod state;
#[cfg(test)]
mod tests;
/// Type-state driver for a single melt operation.
///
/// The `S` parameter holds the data of the current step (`Initial`, `SetupComplete`,
/// `PaymentConfirmed`). Each transition method consumes the saga and returns it re-typed for
/// the next step, so steps cannot be called out of order.
pub struct MeltSaga<S> {
/// Mint handle, used for output verification, fee calculation and payment processor lookup.
mint: Arc<super::Mint>,
/// Database handle from which every transaction of the saga is started.
db: DynMintDatabase,
/// Publisher for proof-state and quote-state notifications.
pubsub: Arc<PubSubManager>,
/// Queue of rollback actions, executed front to back by `compensate_all`.
compensations: Arc<Mutex<VecDeque<Box<dyn CompensatingAction>>>>,
/// Identifier under which the saga is persisted and later updated.
operation_id: uuid::Uuid,
/// Optional metric guard; recorded as success in `finalize` and as failure when
/// compensations are run.
#[cfg(feature = "prometheus")]
metrics: Option<MintMetricGuard>,
/// Payload specific to the current state.
state_data: S,
}
impl MeltSaga<Initial> {
/// Creates a saga in the `Initial` state.
///
/// A fresh random operation id is generated and the compensation queue starts empty. With
/// the `prometheus` feature enabled a `melt_bolt11` metric guard is attached as well.
pub fn new(mint: Arc<super::Mint>, db: DynMintDatabase, pubsub: Arc<PubSubManager>) -> Self {
    #[cfg(feature = "prometheus")]
    let metrics = Some(MintMetricGuard::new("melt_bolt11"));

    // The same id is stored on the saga itself and inside the initial state payload.
    let operation_id = uuid::Uuid::new_v4();
    let state_data = Initial { operation_id };
    let compensations = Arc::new(Mutex::new(VecDeque::new()));

    Self {
        mint,
        db,
        pubsub,
        compensations,
        operation_id,
        #[cfg(feature = "prometheus")]
        metrics,
        state_data,
    }
}
#[instrument(skip_all)]
pub async fn setup_melt(
self,
melt_request: &MeltRequest<QuoteId>,
input_verification: Verification,
payment_method: cdk_common::PaymentMethod,
) -> Result<MeltSaga<SetupComplete>, Error> {
let Verification {
amount: input_amount,
} = input_verification;
let input_unit = Some(input_amount.unit().clone());
if let Some(outputs) = melt_request.outputs() {
if !outputs.is_empty() {
let output_verification = self.mint.verify_outputs(outputs)?;
if input_unit.as_ref() != Some(output_verification.amount.unit()) {
return Err(Error::UnitMismatch);
}
}
}
melt_request.verify_spending_conditions()?;
let mut tx = self.db.begin_transaction().await?;
let mut quote =
match shared::load_melt_quotes_exclusively(&mut tx, melt_request.quote()).await {
Ok(quote) => quote,
Err(err) => {
tx.rollback().await?;
return Err(err);
}
};
let fee_breakdown = self.mint.get_proofs_fee(melt_request.inputs()).await?;
let operation = Operation::new(
self.state_data.operation_id,
cdk_common::mint::OperationKind::Melt,
Amount::ZERO, input_amount.clone().into(), fee_breakdown.total, None, Some(payment_method.clone()), );
if let Err(err) = tx
.add_proofs(
melt_request.inputs().clone(),
Some(melt_request.quote_id().to_owned()),
&operation,
)
.await
{
tx.rollback().await?;
return Err(match err {
cdk_common::database::Error::Duplicate => Error::TokenPending,
cdk_common::database::Error::AttemptUpdateSpentProof => Error::TokenAlreadySpent,
err => Error::Database(err),
});
}
let input_ys = melt_request.inputs().ys()?;
let mut proofs = tx.get_proofs(&input_ys).await?;
if let Err(err) = Mint::update_proofs_state(&mut tx, &mut proofs, State::Pending).await {
tx.rollback().await?;
return Err(err);
}
let previous_state = quote.state;
if input_unit != Some(quote.unit.clone()) {
tx.rollback().await?;
return Err(Error::UnitMismatch);
}
if payment_method == cdk_common::PaymentMethod::Known(KnownMethod::Onchain) {
let Some(fee_index) = melt_request.selected_fee_index() else {
tracing::warn!(
quote_id = %quote.id,
"onchain melt rejected: request is missing selected fee_index",
);
tx.rollback().await?;
return Err(Error::InvalidPaymentRequest);
};
if let Err(err) = quote.select_onchain_fee_option(fee_index) {
let available: Vec<u32> = quote
.fee_options()
.iter()
.map(|option| option.fee_index)
.collect();
tracing::warn!(
quote_id = %quote.id,
requested_fee_index = fee_index,
available_fee_indices = ?available,
"onchain melt rejected: selected fee_index is not valid for this quote: {err}",
);
tx.rollback().await?;
return Err(err);
}
}
match previous_state {
MeltQuoteState::Unpaid | MeltQuoteState::Failed => {}
MeltQuoteState::Pending => {
tx.rollback().await?;
return Err(Error::PendingQuote);
}
MeltQuoteState::Paid => {
tx.rollback().await?;
return Err(Error::PaidQuote);
}
MeltQuoteState::Unknown => {
tx.rollback().await?;
return Err(Error::UnknownPaymentState);
}
}
match tx
.update_melt_quote_state(&mut quote, MeltQuoteState::Pending, None)
.await
{
Ok(_) => {}
Err(err) => {
tx.rollback().await?;
return Err(err.into());
}
};
let inputs_fee_breakdown = self.mint.get_proofs_fee(melt_request.inputs()).await?;
let inputs_fee = inputs_fee_breakdown.total.with_unit(quote.unit.clone());
let fee_reserve = quote.fee_reserve();
let required_total = quote
.amount()
.checked_add(&fee_reserve)?
.checked_add(&inputs_fee)?;
let amount_mismatch = input_amount < required_total.clone();
if amount_mismatch {
tracing::info!(
"Melt request unbalanced: inputs {}, amount {}, fee_reserve {}, input_fee {}, required {}",
input_amount,
quote.amount(),
fee_reserve,
inputs_fee,
required_total
);
tx.rollback().await?;
return Err(Error::TransactionUnbalanced(
input_amount.to_u64(),
quote.amount().value(),
inputs_fee.checked_add(&fee_reserve)?.value(),
));
}
tx.add_melt_request(
melt_request.quote_id(),
melt_request.inputs_amount()?.with_unit(quote.unit.clone()),
inputs_fee.clone(),
)
.await?;
tx.add_blinded_messages(
Some(melt_request.quote_id()),
melt_request.outputs().as_ref().unwrap_or(&Vec::new()),
&operation,
)
.await?;
let blinded_secrets: Vec<PublicKey> = melt_request
.outputs()
.as_ref()
.unwrap_or(&Vec::new())
.iter()
.map(|bm| bm.blinded_secret)
.collect();
let saga = Saga::new_melt(
self.operation_id,
MeltSagaState::SetupComplete,
quote.id.to_string(),
);
if let Err(err) = tx.add_saga(&saga).await {
tx.rollback().await?;
return Err(err.into());
}
tx.commit().await?;
for pk in input_ys.iter() {
self.pubsub.proof_state((*pk, State::Pending));
}
self.pubsub
.melt_quote_status("e, None, None, MeltQuoteState::Pending);
let compensations = Arc::clone(&self.compensations);
compensations
.lock()
.await
.push_front(Box::new(RemoveMeltSetup {
input_ys: input_ys.clone(),
blinded_secrets,
quote_id: quote.id.clone(),
operation_id: self.operation_id,
}));
Ok(MeltSaga {
mint: self.mint,
db: self.db,
pubsub: self.pubsub,
compensations: self.compensations,
operation_id: self.operation_id,
#[cfg(feature = "prometheus")]
metrics: self.metrics,
state_data: SetupComplete {
quote: quote.inner(),
},
})
}
}
impl MeltSaga<SetupComplete> {
#[instrument(skip_all)]
pub async fn attempt_internal_settlement(
self,
melt_request: &MeltRequest<QuoteId>,
) -> Result<(Self, SettlementDecision), Error> {
let mut tx = self.db.begin_transaction().await?;
let mut mint_quote = match tx
.get_mint_quote_by_request(&self.state_data.quote.request.to_string())
.await
{
Ok(Some(mint_quote)) if mint_quote.unit == self.state_data.quote.unit => mint_quote,
Ok(_) => {
tx.rollback().await?;
tracing::debug!("Not an internal payment or unit mismatch");
return Ok((self, SettlementDecision::RequiresExternalPayment));
}
Err(err) => {
tx.rollback().await?;
tracing::debug!("Error checking for mint quote: {}", err);
self.compensate_all().await?;
return Err(Error::Internal);
}
};
if (mint_quote.state() == cdk_common::nuts::MintQuoteState::Issued
|| mint_quote.state() == cdk_common::nuts::MintQuoteState::Paid)
&& mint_quote.payment_method == crate::mint::PaymentMethod::Known(KnownMethod::Bolt11)
{
tx.rollback().await?;
self.compensate_all().await?;
return Err(Error::RequestAlreadyPaid);
}
let inputs_amount_quote_unit = melt_request
.inputs_amount()
.map_err(|_| {
tracing::error!("Proof inputs in melt quote overflowed");
Error::AmountOverflow
})?
.with_unit(mint_quote.unit.clone());
if let Some(ref amount) = mint_quote.amount {
if amount > &inputs_amount_quote_unit {
tracing::debug!(
"Not enough inputs provided: {} needed {}",
inputs_amount_quote_unit,
amount
);
tx.rollback().await?;
self.compensate_all().await?;
return Err(Error::InsufficientFunds);
}
}
let amount = self.state_data.quote.amount();
tracing::info!(
"Mint quote {} paid {} from internal payment.",
mint_quote.id,
amount
);
tx.update_saga(
&self.operation_id,
SagaStateEnum::Melt(MeltSagaState::PaymentAttempted),
)
.await?;
mint_quote.add_payment(amount.clone(), self.state_data.quote.id.to_string(), None)?;
tx.update_mint_quote(&mut mint_quote).await?;
tx.commit().await?;
self.pubsub
.mint_quote_payment(&mint_quote, mint_quote.amount_paid());
tracing::info!(
"Melt quote {} paid Mint quote {}",
self.state_data.quote.id,
mint_quote.id
);
Ok((self, SettlementDecision::Internal { amount }))
}
/// Carries out the payment chosen by `attempt_internal_settlement`.
///
/// * Internal settlement yields a synthetic paid response immediately.
/// * External settlement calls the payment backend and branches on the reported status:
///   `Paid` continues, `Unpaid`/`Failed` run the compensations and return
///   `Error::PaymentFailed`, `Pending`/`Unknown` return `PaymentOutcome::Pending` without
///   compensating.
///
/// On success the saga is returned in the `PaymentConfirmed` state inside
/// `PaymentOutcome::Confirmed`.
#[instrument(skip_all)]
pub async fn make_payment(
    self,
    settlement: SettlementDecision,
) -> Result<PaymentOutcome, Error> {
    let payment_result = match settlement {
        SettlementDecision::RequiresExternalPayment => {
            let response = self.attempt_external_payment().await?;

            match response.status {
                MeltQuoteState::Paid => response,
                MeltQuoteState::Pending => {
                    tracing::warn!(
                        "LN payment pending, proofs remain pending for quote: {}",
                        self.state_data.quote.id
                    );
                    return Ok(PaymentOutcome::Pending {
                        #[cfg(feature = "prometheus")]
                        metrics: self.metrics,
                    });
                }
                MeltQuoteState::Unknown => {
                    tracing::warn!(
                        "Lightning payment for quote {} unknown.",
                        self.state_data.quote.id
                    );
                    return Ok(PaymentOutcome::Pending {
                        #[cfg(feature = "prometheus")]
                        metrics: self.metrics,
                    });
                }
                MeltQuoteState::Unpaid | MeltQuoteState::Failed => {
                    tracing::info!(
                        "Lightning payment for quote {} failed.",
                        self.state_data.quote.id
                    );
                    // A definite failure releases the reservation made during setup.
                    self.compensate_all().await?;
                    return Err(Error::PaymentFailed);
                }
            }
        }
        SettlementDecision::Internal { amount } => self.handle_internal_payment(amount),
    };

    let confirmed = MeltSaga {
        mint: self.mint,
        db: self.db,
        pubsub: self.pubsub,
        compensations: self.compensations,
        operation_id: self.operation_id,
        #[cfg(feature = "prometheus")]
        metrics: self.metrics,
        state_data: PaymentConfirmed {
            quote: self.state_data.quote,
            payment_result,
        },
    };

    Ok(PaymentOutcome::Confirmed(Box::new(confirmed)))
}
/// Builds the payment response for a melt that was settled internally.
///
/// The response is `Paid`, spends exactly `amount` and carries no payment proof. The lookup
/// id is the quote's `request_lookup_id` when present, otherwise a custom id derived from
/// the quote id.
fn handle_internal_payment(&self, amount: Amount<CurrencyUnit>) -> MakePaymentResponse {
    let quote = &self.state_data.quote;

    tracing::info!(
        "Payment settled internally for {} {}",
        amount,
        quote.unit
    );

    let payment_lookup_id = match &quote.request_lookup_id {
        Some(lookup_id) => lookup_id.clone(),
        None => cdk_common::payment::PaymentIdentifier::CustomId(quote.id.to_string()),
    };

    MakePaymentResponse {
        status: MeltQuoteState::Paid,
        total_spent: amount,
        payment_proof: None,
        payment_lookup_id,
    }
}
async fn attempt_external_payment(&self) -> Result<MakePaymentResponse, Error> {
let ln = self
.mint
.payment_processors
.get(&crate::types::PaymentProcessorKey::new(
self.state_data.quote.unit.clone(),
self.state_data.quote.payment_method.clone(),
))
.ok_or_else(|| {
tracing::info!(
"Could not get ln backend for {}, {}",
self.state_data.quote.unit,
self.state_data.quote.payment_method
);
Error::UnsupportedUnit
})?;
{
let mut tx = self.db.begin_transaction().await?;
tx.update_saga(
&self.operation_id,
SagaStateEnum::Melt(MeltSagaState::PaymentAttempted),
)
.await?;
tx.commit().await?;
}
self.execute_payment_and_verify(Arc::clone(ln)).await
}
async fn execute_payment_and_verify(
&self,
ln: Arc<
dyn cdk_common::payment::MintPayment<Err = cdk_common::payment::Error> + Send + Sync,
>,
) -> Result<MakePaymentResponse, Error> {
let quote = &self.state_data.quote;
let payment_options = OutgoingPaymentOptions::from_melt_quote_with_fee(quote.clone())?;
match ln.make_payment("e.unit, payment_options).await {
Ok(pay) if pay.status == MeltQuoteState::Paid => Ok(pay),
Ok(pay) => self.verify_ambiguous_payment(ln, pay).await,
Err(err) => self.handle_payment_error(ln, err).await,
}
}
/// Re-checks a payment whose first response was not `Paid`.
///
/// The backend is asked for the current state of `pay.payment_lookup_id`:
/// * verified `Paid` - the verified response is returned;
/// * initially `Pending` and verified `Unknown` - the original pending response is kept;
/// * verified `Unknown` otherwise - the verified response is returned downgraded to `Failed`;
/// * anything else - the verified response is returned unchanged.
async fn verify_ambiguous_payment(
    &self,
    ln: Arc<
        dyn cdk_common::payment::MintPayment<Err = cdk_common::payment::Error> + Send + Sync,
    >,
    pay: MakePaymentResponse,
) -> Result<MakePaymentResponse, Error> {
    let initial_status = pay.status;

    tracing::warn!(
        "Got {} status when paying melt quote {} for {} {}. Verifying with backend...",
        initial_status,
        self.state_data.quote.id,
        self.state_data.quote.amount(),
        self.state_data.quote.unit
    );

    let mut check_response = self.check_payment_state(ln, &pay.payment_lookup_id).await?;

    match (initial_status, check_response.status) {
        (_, MeltQuoteState::Paid) => {
            tracing::info!(
                "Payment initially returned {} but confirmed as Paid. Proceeding to finalize.",
                initial_status
            );
            Ok(check_response)
        }
        (MeltQuoteState::Pending, MeltQuoteState::Unknown) => {
            tracing::warn!(
                "Payment was initially Pending but verification returned Unknown. Keeping as Pending for safety."
            );
            Ok(pay)
        }
        (_, MeltQuoteState::Unknown) => {
            // Nothing indicates the payment is in flight, so treat it as failed.
            check_response.status = MeltQuoteState::Failed;
            Ok(check_response)
        }
        _ => Ok(check_response),
    }
}
async fn handle_payment_error(
&self,
ln: Arc<
dyn cdk_common::payment::MintPayment<Err = cdk_common::payment::Error> + Send + Sync,
>,
err: cdk_common::payment::Error,
) -> Result<MakePaymentResponse, Error> {
if matches!(err, crate::cdk_payment::Error::InvoiceAlreadyPaid) {
tracing::info!("Invoice already paid, verifying payment status");
} else {
tracing::error!(
"Error returned attempting to pay: {} {}",
self.state_data.quote.id,
err
);
}
let lookup_id = self
.state_data
.quote
.request_lookup_id
.as_ref()
.ok_or_else(|| {
tracing::error!(
"No payment id, cannot verify payment status for {} after error",
self.state_data.quote.id
);
Error::Internal
})?;
let mut check_response = self.check_payment_state(ln, lookup_id).await?;
tracing::info!(
"Initial payment attempt for {} errored. Follow up check status: {}",
self.state_data.quote.id,
check_response.status
);
if check_response.status == MeltQuoteState::Unknown {
check_response.status = MeltQuoteState::Failed;
}
Ok(check_response)
}
async fn check_payment_state(
&self,
ln: Arc<
dyn cdk_common::payment::MintPayment<Err = cdk_common::payment::Error> + Send + Sync,
>,
lookup_id: &cdk_common::payment::PaymentIdentifier,
) -> Result<MakePaymentResponse, Error> {
match ln.check_outgoing_payment(lookup_id).await {
Ok(response) => Ok(response),
Err(check_err) => {
tracing::error!(
"Could not check the status of payment for {}. Proofs stuck as pending",
lookup_id
);
tracing::error!("Checking payment error: {}", check_err);
Err(Error::Internal)
}
}
}
}
impl MeltSaga<PaymentConfirmed> {
    /// Completes a melt whose payment has been confirmed.
    ///
    /// The saga is first persisted as `Finalizing` together with the data needed to repeat
    /// this step (amount spent, lookup id, payment proof). Then the shared finalization
    /// routine is run and its result is used as the change in the response. On success the
    /// compensation queue is cleared, the metric (if any) is recorded as successful, and
    /// the quote is returned as a `Paid` response.
    ///
    /// # Errors
    ///
    /// `Error::UnitMismatch` when the spent amount cannot be converted to the quote unit;
    /// otherwise database errors and errors from the shared finalization routine.
    #[instrument(skip_all)]
    pub async fn finalize(mut self) -> Result<MeltQuoteResponse<QuoteId>, Error> {
        tracing::info!("TX2: Finalizing melt (mark spent + change)");

        let total_spent: Amount<CurrencyUnit> = self
            .state_data
            .payment_result
            .total_spent
            .convert_to(&self.state_data.quote.unit)
            .map_err(|e| {
                tracing::error!("Failed to convert total_spent to quote unit: {:?}", e);
                Error::UnitMismatch
            })?;

        let payment_proof = self.state_data.payment_result.payment_proof.clone();
        let payment_lookup_id = &self.state_data.payment_result.payment_lookup_id;

        // Persist everything needed to redo the finalization before attempting it.
        let finalization_data = MeltFinalizationData {
            total_spent: total_spent.clone(),
            payment_lookup_id: payment_lookup_id.clone(),
            payment_proof: payment_proof.clone(),
        };
        {
            let mut tx = self.db.begin_transaction().await?;
            tx.update_saga_with_finalization_data(
                &self.operation_id,
                SagaStateEnum::Melt(MeltSagaState::Finalizing),
                Some(&finalization_data),
            )
            .await?;
            tx.commit().await?;
        }

        let change = shared::finalize_melt_quote(
            &self.mint,
            &self.db,
            &self.pubsub,
            &self.state_data.quote,
            total_spent,
            payment_proof.clone(),
            payment_lookup_id,
            Some(self.operation_id),
        )
        .await
        .map_err(|err| {
            tracing::error!(
                "Finalize failed for paid melt quote {} - will retry on startup: {}",
                self.state_data.quote.id,
                err
            );
            err
        })?;

        // The payment is settled; the setup must no longer be undone.
        self.compensations.lock().await.clear();

        #[cfg(feature = "prometheus")]
        if let Some(metrics) = self.metrics.take() {
            metrics.record(true);
        }

        let mut paid_quote = self.state_data.quote;
        paid_quote.payment_proof = payment_proof;
        paid_quote.state = MeltQuoteState::Paid;

        Ok(paid_quote.into_response(change))
    }
}
/// Result of `MeltSaga::make_payment`.
pub enum PaymentOutcome {
/// The payment is confirmed as paid; holds the saga advanced to `PaymentConfirmed`,
/// on which `finalize` can be called.
Confirmed(Box<MeltSaga<PaymentConfirmed>>),
/// The backend reported the payment as pending or unknown; no saga is handed back.
Pending {
/// Metric guard taken over from the saga.
#[cfg(feature = "prometheus")]
metrics: Option<MintMetricGuard>,
},
}
impl<S> MeltSaga<S> {
    /// Runs every registered compensating action, front of the queue first.
    ///
    /// Returns immediately when nothing is registered. Otherwise the metric (if any) is
    /// recorded as failed and each action is executed and removed from the queue. A failing
    /// action is logged and does not stop the remaining ones, so this method itself always
    /// returns `Ok(())`.
    #[instrument(skip_all)]
    async fn compensate_all(self) -> Result<(), Error> {
        #[cfg(feature = "prometheus")]
        let metrics = self.metrics;

        let mut pending = self.compensations.lock().await;
        if pending.is_empty() {
            return Ok(());
        }

        #[cfg(feature = "prometheus")]
        if let Some(guard) = metrics {
            guard.record(false);
        }

        tracing::warn!("Running {} compensating actions", pending.len());

        for action in pending.drain(..) {
            tracing::debug!("Running compensation: {}", action.name());

            match action.execute(&self.db, &self.pubsub).await {
                Ok(_) => {}
                Err(e) => {
                    tracing::error!(
                        "Compensation {} failed: {}. Continuing...",
                        action.name(),
                        e
                    );
                }
            }
        }

        Ok(())
    }
}