use std::task::Poll;
use cdk_common::PaymentRequestPayload;
use futures::{FutureExt, Stream};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use crate::error::Error;
use crate::wallet::streams::RecvFuture;
#[allow(clippy::type_complexity)]
pub struct NostrPaymentEventStream {
cancel: CancellationToken,
rx: Option<mpsc::Receiver<Result<PaymentRequestPayload, Error>>>,
init_fut: Option<RecvFuture<'static, Result<(), Error>>>,
cancel_fut: Option<RecvFuture<'static, ()>>,
rx_future: Option<
RecvFuture<
'static,
(
Option<Result<PaymentRequestPayload, Error>>,
mpsc::Receiver<Result<PaymentRequestPayload, Error>>,
),
>,
>,
}
impl NostrPaymentEventStream {
pub fn new(keys: nostr_sdk::Keys, relays: Vec<String>, pubkey: nostr_sdk::PublicKey) -> Self {
let cancel = CancellationToken::new();
let (tx, rx) = mpsc::channel::<Result<PaymentRequestPayload, Error>>(32);
let init_cancel = cancel.clone();
let init_fut = Box::pin(async move {
let client = nostr_sdk::Client::new(keys);
for r in &relays {
client
.add_read_relay(r.clone())
.await
.map_err(|e| Error::Custom(format!("Add relay {r}: {e}")))?;
}
client.connect().await;
let filter = nostr_sdk::Filter::new().pubkey(pubkey);
client
.subscribe(filter, None)
.await
.map_err(|e| Error::Custom(format!("Subscribe: {e}")))?;
let client_for_handler = client.clone();
let _bg = tokio::spawn(async move {
let tx_err = tx.clone();
let res = client
.handle_notifications(move |notification| {
let tx = tx.clone();
let client = client_for_handler.clone();
let cancel = init_cancel.clone();
async move {
if cancel.is_cancelled() {
return Ok(true);
}
if let nostr_sdk::RelayPoolNotification::Event { event, .. } =
notification
{
match client.unwrap_gift_wrap(&event).await {
Ok(unwrapped) => {
let rumor = unwrapped.rumor;
match serde_json::from_str::<PaymentRequestPayload>(
&rumor.content,
) {
Ok(payload) => {
if tx.send(Ok(payload)).await.is_err() {
return Ok(true);
}
}
Err(e) => {
let _ = tx
.send(Err(Error::Custom(format!(
"Invalid payload JSON: {e}"
))))
.await;
}
}
}
Err(e) => {
let _ = tx
.send(Err(Error::Custom(format!(
"Unwrap gift wrap failed: {e}"
))))
.await;
}
}
}
Ok(false)
}
})
.await;
if let Err(e) = res {
let _ = tx_err
.send(Err(Error::Custom(format!(
"Notification handler error: {e}"
))))
.await;
}
});
Ok(())
});
Self {
cancel,
rx: Some(rx),
init_fut: Some(init_fut),
cancel_fut: None,
rx_future: None,
}
}
pub fn cancel_token(&self) -> CancellationToken {
self.cancel.clone()
}
}
impl Stream for NostrPaymentEventStream {
type Item = Result<PaymentRequestPayload, Error>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if this.cancel_fut.is_none() {
let cancel = this.cancel.clone();
this.cancel_fut = Some(Box::pin(async move { cancel.cancelled().await }));
}
if let Some(mut fut) = this.cancel_fut.take() {
if fut.poll_unpin(cx).is_ready() {
this.rx.take();
return Poll::Ready(None);
}
this.cancel_fut = Some(fut);
}
if let Some(mut init) = this.init_fut.take() {
match init.poll_unpin(cx) {
Poll::Pending => {
this.init_fut = Some(init);
return Poll::Pending;
}
Poll::Ready(Err(e)) => {
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(Ok(())) => {
}
}
}
if this.rx.is_none() {
return Poll::Ready(None);
}
if this.rx_future.is_none() {
let mut rx = this.rx.take().expect("receiver");
this.rx_future = Some(Box::pin(async move {
let item = rx.recv().await;
(item, rx)
}));
}
let mut fut = this.rx_future.take().ok_or(Error::Internal)?;
match fut.poll_unpin(cx) {
Poll::Pending => {
this.rx_future = Some(fut);
Poll::Pending
}
Poll::Ready((item, rx)) => {
this.rx = Some(rx);
match item {
None => Poll::Ready(None),
Some(item) => Poll::Ready(Some(item)),
}
}
}
}
}