use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use cdk_common::amount::SplitTarget;
use cdk_common::MintQuoteState;
use futures::Stream;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::error::Error;
use crate::nuts::{Proofs, SpendingConditions};
use crate::wallet::types::MintQuote;
use crate::wallet::wallet_repository::WalletRepository;
use crate::wallet::Wallet;
/// Stream of proofs minted from paid NpubCash quotes, backed by a [`WalletRepository`].
///
/// Constructing the stream spawns a background task that polls for quotes and
/// pushes the outcome of every mint attempt into an internal channel; polling the
/// stream drains that channel. Dropping the stream cancels the background task.
// NOTE(review): `Debug` is not implemented and the lint is silenced instead —
// TODO confirm whether `#[derive(Debug)]` is feasible for these field types.
#[allow(missing_debug_implementations)]
#[must_use = "dropping the stream immediately cancels its background polling task"]
pub struct NpubCashProofStream {
    /// Receiving half of the channel fed by the background polling task.
    rx: mpsc::Receiver<Result<(MintQuote, Proofs), Error>>,
    /// Token cancelled on drop so the background task stops polling.
    cancel: CancellationToken,
}
impl NpubCashProofStream {
    /// Creates the stream and spawns the background task that feeds it.
    ///
    /// Every `poll_interval` the task calls `sync_npubcash_quotes` on `wallet`. For
    /// each returned quote in the `Paid` state it looks up the wallet matching the
    /// quote's mint URL and unit, mints the quote with `split_target` and
    /// `spending_conditions`, and sends the outcome to the stream.
    ///
    /// * A failed sync is logged as a warning and retried on the next tick.
    /// * A failed wallet lookup or mint is delivered to the consumer as an `Err` item.
    /// * The task stops when the stream is dropped.
    ///
    /// Results are buffered in a bounded channel with room for 32 items.
    ///
    /// # Panics
    ///
    /// Uses `tokio::spawn`, so it must be called from within a Tokio runtime.
    /// A zero `poll_interval` makes `tokio::time::interval` panic inside the
    /// spawned task.
    pub fn new(
        wallet: WalletRepository,
        poll_interval: Duration,
        split_target: SplitTarget,
        spending_conditions: Option<SpendingConditions>,
    ) -> Self {
        let (tx, rx) = mpsc::channel(32);
        let cancel = CancellationToken::new();
        let cancel_clone = cancel.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(poll_interval);
            // A sync-and-mint cycle can outlast `poll_interval`; the default `Burst`
            // behaviour would then fire the missed ticks back-to-back. Re-space
            // them instead so the remote service is not hammered.
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

            loop {
                tokio::select! {
                    _ = cancel_clone.cancelled() => {
                        break;
                    }
                    _ = interval.tick() => {
                        match wallet.sync_npubcash_quotes().await {
                            Ok(quotes) => {
                                for quote in quotes {
                                    // Only quotes that have been paid can be minted.
                                    if !matches!(quote.state, MintQuoteState::Paid) {
                                        continue;
                                    }

                                    tracing::info!("Minting NpubCash quote {}...", quote.id);

                                    // The async block gives `?` a scope to short-circuit
                                    // into, so lookup and mint failures both become the
                                    // item sent to the consumer.
                                    let result = async {
                                        let wallet_instance = wallet
                                            .get_wallet(&quote.mint_url, &quote.unit)
                                            .await?;
                                        let proofs = wallet_instance
                                            .mint(
                                                &quote.id,
                                                split_target.clone(),
                                                spending_conditions.clone(),
                                            )
                                            .await?;
                                        Ok((quote, proofs))
                                    }
                                    .await;

                                    // A send error means the receiver is gone, i.e. the
                                    // stream was dropped: nothing left to do.
                                    if tx.send(result).await.is_err() {
                                        return;
                                    }
                                }
                            }
                            Err(e) => {
                                tracing::warn!("Error syncing NpubCash quotes: {}", e);
                            }
                        }
                    }
                }
            }
        });

        Self { rx, cancel }
    }
}
impl Stream for NpubCashProofStream {
    type Item = Result<(MintQuote, Proofs), Error>;

    /// Delegates to the internal channel: yields the next mint result produced
    /// by the background task, if one is ready.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.rx.poll_recv(cx)
    }
}
impl Drop for NpubCashProofStream {
    /// Cancels the token watched by the background polling task so it can stop.
    fn drop(&mut self) {
        let Self { cancel, .. } = self;
        cancel.cancel();
    }
}
/// Stream of proofs minted from paid NpubCash quotes, backed by a single [`Wallet`].
///
/// Constructing the stream spawns a background task that polls for quotes and
/// pushes the outcome of every mint attempt into an internal channel; polling the
/// stream drains that channel. Dropping the stream cancels the background task.
// NOTE(review): `Debug` is not implemented and the lint is silenced instead —
// TODO confirm whether `#[derive(Debug)]` is feasible for these field types.
#[allow(missing_debug_implementations)]
#[must_use = "dropping the stream immediately cancels its background polling task"]
pub struct WalletNpubCashProofStream {
    /// Receiving half of the channel fed by the background polling task.
    rx: mpsc::Receiver<Result<(MintQuote, Proofs), Error>>,
    /// Token cancelled on drop so the background task stops polling.
    cancel: CancellationToken,
}
impl WalletNpubCashProofStream {
pub fn new(
wallet: Wallet,
poll_interval: Duration,
split_target: SplitTarget,
spending_conditions: Option<SpendingConditions>,
) -> Self {
let (tx, rx) = mpsc::channel(32);
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(poll_interval);
loop {
tokio::select! {
_ = cancel_clone.cancelled() => {
break;
}
_ = interval.tick() => {
match wallet.sync_npubcash_quotes().await {
Ok(quotes) => {
for quote in quotes {
if matches!(quote.state, MintQuoteState::Paid) {
let quote_id = quote.id.clone();
let mint_url = quote.mint_url.clone();
if mint_url != wallet.mint_url {
tracing::debug!("Skipping quote {} for different mint {}", quote_id, mint_url);
continue;
}
tracing::info!("Minting NpubCash quote {}...", quote_id);
let result = async {
let proofs = wallet
.mint(
"e_id,
split_target.clone(),
spending_conditions.clone(),
)
.await?;
Ok((quote.clone(), proofs))
}.await;
if tx.send(result).await.is_err() {
return; }
}
}
}
Err(e) => {
tracing::warn!("Error syncing NpubCash quotes: {}", e);
}
}
}
}
}
});
Self { rx, cancel }
}
}
impl Stream for WalletNpubCashProofStream {
    type Item = Result<(MintQuote, Proofs), Error>;

    /// Delegates to the internal channel: yields the next mint result produced
    /// by the background task, if one is ready.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.rx.poll_recv(cx)
    }
}
impl Drop for WalletNpubCashProofStream {
    /// Cancels the token watched by the background polling task so it can stop.
    fn drop(&mut self) {
        let Self { cancel, .. } = self;
        cancel.cancel();
    }
}