use std::collections::HashMap;
use std::task::Poll;
use cdk_common::amount::SplitTarget;
use cdk_common::wallet::MintQuote;
use cdk_common::{Error, PaymentMethod, Proofs, SpendingConditions};
use futures::{FutureExt, Stream, StreamExt};
use tokio_util::sync::CancellationToken;
use super::payment::PaymentStream;
use super::{RecvFuture, WaitableEvent};
use crate::Wallet;
/// Stream that waits for payments on a set of mint quotes and mints proofs
/// for each quote as its payment notification arrives.
///
/// Every item is either the quote that was paid together with the proofs
/// minted for it, or the error that interrupted waiting or minting.
// NOTE(review): `Debug` is presumably skipped because the stored future
// cannot derive it — confirm before removing the `allow`.
#[allow(missing_debug_implementations)]
pub struct MultipleMintQuoteProofStream<'a> {
/// Source of payment notifications; each successful item is destructured
/// as a `(quote_id, amount)` pair by `poll_next`.
payment_stream: PaymentStream<'a>,
/// Wallet whose `mint` method is called once a payment is reported.
wallet: &'a Wallet,
/// Quotes being watched, keyed by quote id.
quotes: HashMap<String, MintQuote>,
/// Split target handed to `Wallet::mint` for every minting call.
amount_split_target: SplitTarget,
/// Optional spending conditions handed to `Wallet::mint` for every minting call.
spending_conditions: Option<SpendingConditions>,
/// Minting operation that returned `Pending` on an earlier poll; it is
/// resumed before the payment stream is polled again.
minting_future: Option<RecvFuture<'a, Result<(MintQuote, Proofs), Error>>>,
}
impl<'a> MultipleMintQuoteProofStream<'a> {
pub fn new(
wallet: &'a Wallet,
quotes: Vec<MintQuote>,
amount_split_target: SplitTarget,
spending_conditions: Option<SpendingConditions>,
) -> Self {
let filter: WaitableEvent = quotes.as_slice().into();
Self {
payment_stream: PaymentStream::new(wallet, filter.into_subscription()),
wallet,
amount_split_target,
spending_conditions,
quotes: quotes
.into_iter()
.map(|mint_quote| (mint_quote.id.clone(), mint_quote))
.collect(),
minting_future: None,
}
}
pub fn get_cancel_token(&self) -> CancellationToken {
self.payment_stream.get_cancel_token()
}
}
impl Stream for MultipleMintQuoteProofStream<'_> {
type Item = Result<(MintQuote, Proofs), Error>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(mut minting_future) = this.minting_future.take() {
return match minting_future.poll_unpin(cx) {
Poll::Pending => {
this.minting_future = Some(minting_future);
Poll::Pending
}
Poll::Ready(proofs) => Poll::Ready(Some(proofs)),
};
}
match this.payment_stream.poll_next_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => match result {
None => Poll::Ready(None),
Some(result) => {
let (quote_id, amount) = match result {
Err(err) => {
tracing::error!(
"Error while waiting for payment for {:?}",
this.quotes.keys().collect::<Vec<_>>()
);
return Poll::Ready(Some(Err(err)));
}
Ok(amount) => amount,
};
let mint_quote = if let Some(quote) = this.quotes.get("e_id) {
quote.clone()
} else {
tracing::error!("Cannot find mint_quote {} internally", quote_id);
return Poll::Ready(Some(Err(Error::UnknownQuote)));
};
let amount_split_target = this.amount_split_target.clone();
let spending_conditions = this.spending_conditions.clone();
let wallet = this.wallet;
tracing::debug!(
"Received payment ({:?}) notification for {}. Minting...",
amount,
mint_quote.id
);
let mut minting_future = Box::pin(async move {
match mint_quote.payment_method {
PaymentMethod::Known(cdk_common::nut00::KnownMethod::Bolt11) => wallet
.mint(&mint_quote.id, amount_split_target, spending_conditions)
.await
.map(|proofs| (mint_quote, proofs)),
PaymentMethod::Known(cdk_common::nut00::KnownMethod::Bolt12) => wallet
.mint(&mint_quote.id, amount_split_target, spending_conditions)
.await
.map(|proofs| (mint_quote, proofs)),
_ => Err(Error::UnsupportedPaymentMethod),
}
});
match minting_future.poll_unpin(cx) {
Poll::Pending => {
this.minting_future = Some(minting_future);
Poll::Pending
}
Poll::Ready(result) => Poll::Ready(Some(result)),
}
}
},
}
}
}
/// Stream for a single mint quote, implemented as a thin wrapper around
/// [`MultipleMintQuoteProofStream`] constructed with exactly one quote.
///
/// Unlike the wrapped stream, its items carry only the minted proofs; the
/// quote itself is dropped from each successful item.
#[allow(missing_debug_implementations)]
pub struct SingleMintQuoteProofStream<'a>(MultipleMintQuoteProofStream<'a>);
impl<'a> SingleMintQuoteProofStream<'a> {
    /// Creates a stream that waits for `quote` to be paid and then mints
    /// proofs for it with `wallet`.
    ///
    /// The quote is wrapped in a one-element vector and handed, together with
    /// `amount_split_target` and `spending_conditions`, to
    /// [`MultipleMintQuoteProofStream::new`].
    pub fn new(
        wallet: &'a Wallet,
        quote: MintQuote,
        amount_split_target: SplitTarget,
        spending_conditions: Option<SpendingConditions>,
    ) -> Self {
        let inner = MultipleMintQuoteProofStream::new(
            wallet,
            vec![quote],
            amount_split_target,
            spending_conditions,
        );
        Self(inner)
    }

    /// Returns the cancellation token of the wrapped stream's payment stream.
    pub fn get_cancel_token(&self) -> CancellationToken {
        // The wrapped stream's accessor returns the payment stream's token.
        self.0.get_cancel_token()
    }
}
impl Stream for SingleMintQuoteProofStream<'_> {
type Item = Result<Proofs, Error>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
match this.0.poll_next_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(result) => match result {
None => Poll::Ready(None),
Some(Err(err)) => Poll::Ready(Some(Err(err))),
Some(Ok((_, proofs))) => Poll::Ready(Some(Ok(proofs))),
},
}
}
}