use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use cdk_common::melt::MeltQuoteRequest;
use cdk_common::mint::MeltPaymentRequest;
use cdk_common::nut00::KnownMethod;
use cdk_common::nut05::MeltMethodOptions;
use cdk_common::payment::{
Bolt11OutgoingPaymentOptions, Bolt12OutgoingPaymentOptions, CustomOutgoingPaymentOptions,
OutgoingPaymentOptions,
};
use cdk_common::quote_id::QuoteId;
use cdk_common::{MeltOptions, MeltQuoteBolt12Request, MeltQuoteCustomRequest};
#[cfg(feature = "prometheus")]
use cdk_prometheus::METRICS;
use lightning::offers::offer::Offer;
use tracing::instrument;
use super::{
CurrencyUnit, MeltQuote, MeltQuoteBolt11Request, MeltQuoteBolt11Response, MeltRequest, Mint,
PaymentMethod,
};
use crate::mint::verification::MAX_REQUEST_FIELD_LEN;
use crate::nuts::MeltQuoteState;
use crate::types::PaymentProcessorKey;
use crate::util::unix_time;
use crate::{ensure_cdk, Amount, Error};
pub(crate) mod melt_saga;
pub(crate) mod shared;
#[cfg(test)]
mod tests;
use melt_saga::MeltSaga;
/// Handle returned by [`Mint::melt`] for a melt whose completion runs in a
/// spawned background task.
///
/// It carries the response that can be handed back right away (built with
/// state `Pending`) together with the join handle of the task that finishes
/// the melt. Awaiting the value (via `IntoFuture`) yields the task's result.
#[derive(Debug)]
pub struct PendingMelt {
/// Response describing the quote while the melt is still in progress.
response: MeltQuoteBolt11Response<QuoteId>,
/// Join handle of the background task that produces the final response.
completion: tokio::task::JoinHandle<Result<MeltQuoteBolt11Response<QuoteId>, Error>>,
}
impl PendingMelt {
pub fn pending_response(&self) -> &MeltQuoteBolt11Response<QuoteId> {
&self.response
}
pub fn into_pending_response(self) -> MeltQuoteBolt11Response<QuoteId> {
self.response
}
async fn wait(self) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
match self.completion.await {
Ok(result) => result,
Err(err) => {
tracing::error!("Background melt task failed to join: {}", err);
Err(Error::Internal)
}
}
}
}
/// Lets a [`PendingMelt`] be `.await`ed directly to obtain the result of the
/// background task.
impl std::future::IntoFuture for PendingMelt {
    type Output = Result<MeltQuoteBolt11Response<QuoteId>, Error>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;

    fn into_future(self) -> Self::IntoFuture {
        let completion = self.wait();
        Box::pin(completion)
    }
}
impl Mint {
#[instrument(skip_all)]
async fn check_melt_request_acceptable(
&self,
amount: Amount<CurrencyUnit>,
method: PaymentMethod,
request: String,
options: Option<MeltOptions>,
) -> Result<(), Error> {
let unit = amount.unit().clone();
let mint_info = self.mint_info().await?;
let nut05 = mint_info.nuts.nut05;
ensure_cdk!(!nut05.disabled, Error::MeltingDisabled);
let settings = nut05
.get_settings(&unit, &method)
.ok_or(Error::UnsupportedUnit)?;
match options {
Some(MeltOptions::Mpp { mpp: _ }) => {
let nut15 = mint_info.nuts.nut15;
if (self.localstore.get_mint_quote_by_request(&request).await?).is_some() {
return Err(Error::InternalMultiPartMeltQuote);
}
if !nut15
.methods
.into_iter()
.any(|m| m.method == method && m.unit == unit)
{
return Err(Error::MppUnitMethodNotSupported(unit, method));
}
}
Some(MeltOptions::Amountless { amountless: _ })
if method.is_bolt11()
&& !matches!(
settings.options,
Some(MeltMethodOptions::Bolt11 { amountless: true })
) =>
{
return Err(Error::AmountlessInvoiceNotSupported(unit, method));
}
_ => {}
};
let amount_value = amount.value();
let is_above_max = matches!(settings.max_amount, Some(max) if amount_value > max.into());
let is_below_min = matches!(settings.min_amount, Some(min) if amount_value < min.into());
match is_above_max || is_below_min {
true => {
tracing::error!(
"Melt amount out of range: {} is not within {} and {}",
amount,
settings.min_amount.unwrap_or_default(),
settings.max_amount.unwrap_or_default(),
);
Err(Error::AmountOutofLimitRange(
settings.min_amount.unwrap_or_default(),
settings.max_amount.unwrap_or_default(),
amount.into(),
))
}
false => Ok(()),
}
}
/// Creates a melt quote, dispatching on the payment method carried by the
/// request (bolt11, bolt12 or a custom method).
#[instrument(skip_all)]
pub async fn get_melt_quote(
    &self,
    melt_quote_request: MeltQuoteRequest,
) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
    let response = match &melt_quote_request {
        MeltQuoteRequest::Bolt11(invoice_request) => {
            self.get_melt_bolt11_quote_impl(invoice_request).await
        }
        MeltQuoteRequest::Bolt12(offer_request) => {
            self.get_melt_bolt12_quote_impl(offer_request).await
        }
        MeltQuoteRequest::Custom(custom_request) => {
            self.get_melt_custom_quote_impl(custom_request).await
        }
    };

    response
}
/// Creates and stores a melt quote for a bolt11 invoice.
///
/// This wrapper only handles metrics: the in-flight gauge is incremented once
/// on entry and decremented once on exit, and the outcome is recorded for
/// every path. Previously the gauge was only decremented when fetching the
/// payment quote failed, so successful requests and all other failures left
/// it permanently incremented.
///
/// # Errors
///
/// Returns whatever error quote creation produced (unsupported unit, payment
/// backend failure, unit mismatch, rejected melt request, storage failure).
#[instrument(skip_all)]
async fn get_melt_bolt11_quote_impl(
    &self,
    melt_request: &MeltQuoteBolt11Request,
) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
    #[cfg(feature = "prometheus")]
    METRICS.inc_in_flight_requests("get_melt_bolt11_quote");

    let result = self.create_melt_bolt11_quote(melt_request).await;

    #[cfg(feature = "prometheus")]
    {
        METRICS.dec_in_flight_requests("get_melt_bolt11_quote");
        METRICS.record_mint_operation("get_melt_bolt11_quote", result.is_ok());
        if result.is_err() {
            METRICS.record_error();
        }
    }

    result
}

/// Fetches a payment quote from the bolt11 backend, validates it and stores
/// the resulting melt quote. Metrics are handled by the caller.
async fn create_melt_bolt11_quote(
    &self,
    melt_request: &MeltQuoteBolt11Request,
) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
    let MeltQuoteBolt11Request {
        request,
        unit,
        options,
        ..
    } = melt_request;

    let ln = self
        .payment_processors
        .get(&PaymentProcessorKey::new(
            unit.clone(),
            PaymentMethod::Known(KnownMethod::Bolt11),
        ))
        .ok_or_else(|| {
            tracing::info!("Could not get ln backend for {}, bolt11 ", unit);
            Error::UnsupportedUnit
        })?;

    let bolt11 = Bolt11OutgoingPaymentOptions {
        bolt11: request.clone(),
        max_fee_amount: None,
        timeout_secs: None,
        melt_options: *options,
    };

    let payment_quote = ln
        .get_payment_quote(unit, OutgoingPaymentOptions::Bolt11(Box::new(bolt11)))
        .await
        .map_err(|err| {
            tracing::error!(
                "Could not get payment quote for melt quote, {} bolt11, {}",
                unit,
                err
            );
            err
        })?;

    // The backend must quote in the unit the client asked for.
    if payment_quote.unit() != unit {
        return Err(Error::UnitMismatch);
    }

    self.check_melt_request_acceptable(
        payment_quote.amount.clone(),
        PaymentMethod::Known(KnownMethod::Bolt11),
        request.to_string(),
        *options,
    )
    .await?;

    let quote_amount = payment_quote.amount;
    let quote_fee = payment_quote.fee;
    let melt_ttl = self.quote_ttl().await?.melt_ttl;

    let quote = MeltQuote::new(
        None,
        MeltPaymentRequest::Bolt11 {
            bolt11: request.clone(),
        },
        unit.clone(),
        quote_amount.clone(),
        quote_fee,
        unix_time() + melt_ttl,
        payment_quote.request_lookup_id.clone(),
        *options,
        PaymentMethod::Known(KnownMethod::Bolt11),
    );

    tracing::debug!(
        "New {} melt quote {} for {} {} with request id {:?}",
        quote.payment_method,
        quote.id,
        quote_amount,
        unit,
        payment_quote.request_lookup_id
    );

    let mut tx = self.localstore.begin_transaction().await?;
    tx.add_melt_quote(quote.clone()).await?;
    tx.commit().await?;

    Ok(quote.into())
}
#[instrument(skip_all)]
async fn get_melt_bolt12_quote_impl(
&self,
melt_request: &MeltQuoteBolt12Request,
) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
let MeltQuoteBolt12Request {
request,
unit,
options,
} = melt_request;
let ln = self
.payment_processors
.get(&PaymentProcessorKey::new(
unit.clone(),
PaymentMethod::Known(KnownMethod::Bolt12),
))
.ok_or_else(|| {
tracing::info!("Could not get ln backend for {}, bolt12 ", unit);
Error::UnsupportedUnit
})?;
let offer = Offer::from_str(&melt_request.request).map_err(|_| Error::Bolt12parse)?;
let outgoing_payment_options = Bolt12OutgoingPaymentOptions {
offer: offer.clone(),
max_fee_amount: None,
timeout_secs: None,
melt_options: *options,
};
let payment_quote = ln
.get_payment_quote(
&melt_request.unit,
OutgoingPaymentOptions::Bolt12(Box::new(outgoing_payment_options)),
)
.await
.map_err(|err| {
tracing::error!(
"Could not get payment quote for mint quote, {} bolt12, {}",
unit,
err
);
err
})?;
if payment_quote.unit() != unit {
return Err(Error::UnitMismatch);
}
self.check_melt_request_acceptable(
payment_quote.amount.clone(),
PaymentMethod::Known(KnownMethod::Bolt12),
request.clone(),
*options,
)
.await?;
let quote_amount = payment_quote.amount;
let quote_fee = payment_quote.fee;
let payment_request = MeltPaymentRequest::Bolt12 {
offer: Box::new(offer),
};
let quote = MeltQuote::new(
None,
payment_request,
unit.clone(),
quote_amount.clone(),
quote_fee,
unix_time() + self.quote_ttl().await?.melt_ttl,
payment_quote.request_lookup_id.clone(),
*options,
PaymentMethod::Known(KnownMethod::Bolt12),
);
tracing::debug!(
"New {} melt quote {} for {} {} with request id {:?}",
quote.payment_method,
quote.id,
quote_amount,
unit,
payment_quote.request_lookup_id
);
let mut tx = self.localstore.begin_transaction().await?;
tx.add_melt_quote(quote.clone()).await?;
tx.commit().await?;
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("get_melt_bolt11_quote");
METRICS.record_mint_operation("get_melt_bolt11_quote", true);
}
Ok(quote.into())
}
/// Creates and stores a melt quote for a custom payment method.
///
/// This wrapper only handles metrics: the in-flight gauge is incremented once
/// on entry and decremented once on exit, and the outcome is recorded for
/// every path. Previously only the payment-quote failure and the success path
/// decremented the gauge; the oversized-`extra`, missing-processor,
/// unit-mismatch, validation and storage failures leaked it.
///
/// # Errors
///
/// Returns whatever error quote creation produced (oversized `extra`,
/// unsupported unit/method, unit mismatch, rejected melt request, storage
/// failure).
#[instrument(skip_all)]
async fn get_melt_custom_quote_impl(
    &self,
    melt_request: &MeltQuoteCustomRequest,
) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
    #[cfg(feature = "prometheus")]
    METRICS.inc_in_flight_requests("get_melt_custom_quote");

    let result = self.create_melt_custom_quote(melt_request).await;

    #[cfg(feature = "prometheus")]
    {
        METRICS.dec_in_flight_requests("get_melt_custom_quote");
        METRICS.record_mint_operation("get_melt_custom_quote", result.is_ok());
        if result.is_err() {
            METRICS.record_error();
        }
    }

    result
}

/// Validates the request, fetches a payment quote from the processor
/// registered for the custom method and stores the resulting melt quote.
/// Metrics are handled by the caller.
async fn create_melt_custom_quote(
    &self,
    melt_request: &MeltQuoteCustomRequest,
) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
    let MeltQuoteCustomRequest {
        request,
        unit,
        method,
        extra,
    } = melt_request;

    // Serialize `extra` once; the same string is used for the size check and
    // forwarded to the payment processor.
    let extra_json = (!extra.is_null()).then(|| extra.to_string());
    if let Some(serialized) = &extra_json {
        if serialized.len() > MAX_REQUEST_FIELD_LEN {
            return Err(Error::RequestFieldTooLarge {
                field: "extra".to_string(),
                actual: serialized.len(),
                max: MAX_REQUEST_FIELD_LEN,
            });
        }
    }

    let payment_method = PaymentMethod::from(method.as_str());

    let ln = self
        .payment_processors
        .get(&PaymentProcessorKey::new(
            unit.clone(),
            payment_method.clone(),
        ))
        .ok_or_else(|| {
            tracing::info!("Could not get payment processor for {}, {} ", unit, method);
            Error::UnsupportedUnit
        })?;

    let custom_options =
        OutgoingPaymentOptions::Custom(Box::new(CustomOutgoingPaymentOptions {
            method: method.to_string(),
            request: request.clone(),
            max_fee_amount: None,
            timeout_secs: None,
            melt_options: None,
            extra_json,
        }));

    let payment_quote = ln
        .get_payment_quote(unit, custom_options)
        .await
        .map_err(|err| {
            tracing::error!(
                "Could not get payment quote for melt quote, {} {}, {}",
                unit,
                method,
                err
            );
            // NOTE(review): the backend error is logged but reported to the
            // caller as `UnsupportedUnit`, unlike the bolt11/bolt12 paths
            // which propagate it. Kept as-is because callers may match on
            // this variant — confirm whether propagation is preferable.
            Error::UnsupportedUnit
        })?;

    // The processor must quote in the unit the client asked for.
    if payment_quote.unit() != unit {
        return Err(Error::UnitMismatch);
    }

    // Custom methods carry no melt options.
    self.check_melt_request_acceptable(
        payment_quote.amount.clone(),
        payment_method.clone(),
        request.clone(),
        None,
    )
    .await?;

    let melt_ttl = self.quote_ttl().await?.melt_ttl;
    let quote_amount = payment_quote.amount;
    let quote_fee = payment_quote.fee;

    let quote = MeltQuote::new(
        None,
        MeltPaymentRequest::Custom {
            method: method.to_string(),
            request: request.clone(),
        },
        unit.clone(),
        quote_amount.clone(),
        quote_fee,
        unix_time() + melt_ttl,
        payment_quote.request_lookup_id.clone(),
        None,
        payment_method,
    );

    tracing::debug!(
        "New {} melt quote {} for {} {} with request id {:?}",
        method,
        quote.id,
        quote_amount,
        unit,
        payment_quote.request_lookup_id
    );

    let mut tx = self.localstore.begin_transaction().await?;
    tx.add_melt_quote(quote.clone()).await?;
    tx.commit().await?;

    Ok(quote.into())
}
#[instrument(skip(self))]
pub async fn check_melt_quote(
&self,
quote_id: &QuoteId,
) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
#[cfg(feature = "prometheus")]
METRICS.inc_in_flight_requests("check_melt_quote");
let mut quote = match self.localstore.get_melt_quote(quote_id).await {
Ok(Some(quote)) => quote,
Ok(None) => {
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("check_melt_quote");
METRICS.record_mint_operation("check_melt_quote", false);
METRICS.record_error();
}
return Err(Error::UnknownQuote);
}
Err(err) => {
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("check_melt_quote");
METRICS.record_mint_operation("check_melt_quote", false);
METRICS.record_error();
}
return Err(err.into());
}
};
self.handle_pending_melt_quote(&mut quote).await?;
let blind_signatures = match self
.localstore
.get_blind_signatures_for_quote(quote_id)
.await
{
Ok(signatures) => signatures,
Err(err) => {
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("check_melt_quote");
METRICS.record_mint_operation("check_melt_quote", false);
METRICS.record_error();
}
return Err(err.into());
}
};
let change = (!blind_signatures.is_empty()).then_some(blind_signatures);
let response = MeltQuoteBolt11Response {
quote: quote.id.clone(),
state: quote.state,
expiry: quote.expiry,
amount: quote.amount().into(),
fee_reserve: quote.fee_reserve().into(),
payment_preimage: quote.payment_preimage,
change,
request: Some(quote.request.to_string()),
unit: Some(quote.unit.clone()),
};
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("check_melt_quote");
METRICS.record_mint_operation("check_melt_quote", true);
}
Ok(response)
}
/// Returns every melt quote held by the local store.
#[instrument(skip_all)]
pub async fn melt_quotes(&self) -> Result<Vec<MeltQuote>, Error> {
    Ok(self.localstore.get_melt_quotes().await?)
}
#[instrument(skip_all)]
pub async fn melt(&self, melt_request: &MeltRequest<QuoteId>) -> Result<PendingMelt, Error> {
if let Some(outputs) = melt_request.outputs() {
let outputs_count = outputs.len();
if outputs_count > self.max_outputs {
tracing::warn!(
"Melt request exceeds max outputs limit: {} > {}",
outputs_count,
self.max_outputs
);
return Err(Error::MaxOutputsExceeded {
actual: outputs_count,
max: self.max_outputs,
});
}
}
let verification = self.verify_inputs(melt_request.inputs()).await?;
let quote_id = melt_request.quote().clone();
let quote = self
.localstore
.get_melt_quote("e_id)
.await?
.ok_or(Error::UnknownQuote)?;
let init_saga = MeltSaga::new(
std::sync::Arc::new(self.clone()),
self.localstore.clone(),
std::sync::Arc::clone(&self.pubsub_manager),
);
let setup_saga = init_saga
.setup_melt(melt_request, verification, quote.payment_method.clone())
.await?;
let melt_request_owned = melt_request.clone();
let quote_id_for_log = quote_id.clone();
let completion = tokio::spawn(async move {
tracing::debug!(
"Starting background melt completion for quote: {}",
quote_id_for_log
);
let result = match setup_saga
.attempt_internal_settlement(&melt_request_owned)
.await
{
Ok((setup_saga, settlement)) => {
match setup_saga.make_payment(settlement).await {
Ok(payment_saga) => {
payment_saga.finalize().await
}
Err(err) => Err(err),
}
}
Err(err) => Err(err),
};
match &result {
Ok(_) => {
tracing::info!(
"Background melt completed successfully for quote: {}",
quote_id_for_log
);
}
Err(e) => {
tracing::error!(
"Background melt completion failed for quote {}: {}",
quote_id_for_log,
e
);
}
}
result
});
Ok(PendingMelt {
response: MeltQuoteBolt11Response {
quote: quote_id,
amount: quote.amount().into(),
fee_reserve: quote.fee_reserve().into(),
state: MeltQuoteState::Pending,
expiry: quote.expiry,
payment_preimage: None,
change: None,
request: Some(quote.request.to_string()),
unit: Some(quote.unit),
},
completion,
})
}
}