use std::pin::Pin;
use std::task::Poll;
use futures::stream::Stream;
use futures::TryStreamExt;
use tokio::sync::broadcast;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use ark::lightning::{AsPaymentHash, Invoice, Offer, PaymentHash};
use crate::Wallet;
use crate::movement::{Movement, PaymentMethod};
use crate::subsystem::Subsystem;
/// A notification delivered to subscribers of the wallet's notification channel.
#[derive(Debug, Clone)]
pub enum WalletNotification {
    /// A movement was created.
    MovementCreated { movement: Movement },
    /// A movement was updated.
    MovementUpdated { movement: Movement },
    /// Yielded by [`NotificationStream`] when the underlying broadcast
    /// receiver reports that it lagged behind the sender. The number of
    /// skipped notifications is not carried.
    ChannelLagging,
}
impl WalletNotification {
    /// Returns the movement carried by this notification, if any.
    ///
    /// Both movement variants yield `Some`; [`Self::ChannelLagging`] carries
    /// no movement and yields `None`.
    pub fn movement(&self) -> Option<&Movement> {
        match self {
            Self::MovementCreated { movement } | Self::MovementUpdated { movement } => {
                Some(movement)
            }
            Self::ChannelLagging => None,
        }
    }

    /// Returns the lightning invoice of the carried movement.
    ///
    /// `None` when the notification has no movement or the movement reports
    /// no invoice.
    pub fn lightning_invoice(&self) -> Option<&Invoice> {
        self.movement()?.lightning_invoice()
    }

    /// Returns the lightning offer of the carried movement.
    ///
    /// `None` when the notification has no movement or the movement reports
    /// no offer.
    pub fn lightning_offer(&self) -> Option<&Offer> {
        self.movement()?.lightning_offer()
    }

    /// Returns the lightning payment hash of the carried movement.
    ///
    /// `None` when the notification has no movement or the movement reports
    /// no payment hash.
    pub fn lightning_payment_hash(&self) -> Option<PaymentHash> {
        self.movement()?.lightning_payment_hash()
    }
}
/// A stream of [`WalletNotification`]s backed by a broadcast receiver.
pub struct NotificationStream {
    /// The broadcast receiver, wrapped so it can be polled as a stream.
    rx: BroadcastStream<WalletNotification>,
}
impl NotificationStream {
    /// Wraps a broadcast receiver into a [`NotificationStream`].
    pub(crate) fn new(rx: broadcast::Receiver<WalletNotification>) -> Self {
        let rx = BroadcastStream::new(rx);
        Self { rx }
    }

    /// Converts this stream into a stream of the movements carried by its
    /// notifications.
    ///
    /// Created and updated movements are both yielded, without distinction.
    /// [`WalletNotification::ChannelLagging`] notifications are dropped.
    pub fn movements(self) -> impl Stream<Item = Movement> + Unpin + Send {
        self.filter_map(|notification| match notification {
            WalletNotification::MovementCreated { movement }
            | WalletNotification::MovementUpdated { movement } => Some(movement),
            WalletNotification::ChannelLagging => None,
        })
    }

    /// Converts this stream into a stream of arkoor movements received on
    /// `address`.
    ///
    /// A movement is kept when its subsystem is [`Subsystem::ARKOOR`] and at
    /// least one of its `received_on` entries has a
    /// [`PaymentMethod::Ark`] destination equal to `address`.
    pub fn filter_arkoor_address_movements(
        self,
        address: ark::Address,
    ) -> impl Stream<Item = Movement> + Unpin + Send {
        self.movements().filter(move |movement| {
            // Check the subsystem first so the destinations of unrelated
            // movements are never inspected.
            let is_arkoor = movement.subsystem.is_subsystem(Subsystem::ARKOOR);
            is_arkoor
                && movement.received_on.iter().any(|recv| {
                    matches!(recv.destination, PaymentMethod::Ark(ref a) if *a == address)
                })
        })
    }

    /// Converts this stream into a stream of the movements whose lightning
    /// payment hash equals that of `payment`.
    ///
    /// Movements that report no lightning payment hash are dropped.
    pub fn filter_lightning_payment_movements(
        self,
        payment: impl AsPaymentHash,
    ) -> impl Stream<Item = Movement> + Unpin + Send {
        // Wrap once up front so each comparison is a plain `Option` equality.
        let wanted = Some(payment.as_payment_hash());
        self.movements()
            .filter(move |movement| movement.lightning_payment_hash() == wanted)
    }

    /// Consumes the wrapper and returns the underlying broadcast stream.
    ///
    /// Unlike [`NotificationStream`], the raw stream yields `Result` items,
    /// so lagging surfaces as an error rather than as
    /// [`WalletNotification::ChannelLagging`].
    pub fn into_raw_stream(self) -> BroadcastStream<WalletNotification> {
        let Self { rx } = self;
        rx
    }
}
impl Stream for NotificationStream {
    type Item = WalletNotification;

    /// Polls the underlying broadcast stream for the next notification.
    ///
    /// Pending and end-of-stream are passed through unchanged. A lag error
    /// from the receiver is turned into a
    /// [`WalletNotification::ChannelLagging`] item, so the stream keeps going
    /// instead of yielding an error; the skipped count is discarded.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.rx.try_poll_next_unpin(cx).map(|next| {
            next.map(|received| match received {
                Ok(notification) => notification,
                Err(BroadcastStreamRecvError::Lagged(_)) => WalletNotification::ChannelLagging,
            })
        })
    }
}
/// Crate-internal sending side of the wallet notification channel.
///
/// Holds the broadcast sender from which [`NotificationStream`]s are
/// subscribed and through which notifications are dispatched.
#[derive(Clone)]
pub(crate) struct NotificationDispatch {
    /// Broadcast sender shared by all subscribers.
    tx: broadcast::Sender<WalletNotification>,
}
impl NotificationDispatch {
    /// Creates a dispatcher backed by a fresh broadcast channel.
    pub fn new() -> Self {
        /// Capacity the broadcast channel is created with.
        const CHANNEL_CAPACITY: usize = 64;

        // Only the sender is kept; receivers are created on demand through
        // `subscribe`, so the initial one is simply dropped.
        let (tx, _initial_rx) = broadcast::channel(CHANNEL_CAPACITY);
        Self { tx }
    }

    /// Creates a new [`NotificationStream`] subscribed to this dispatcher.
    pub fn subscribe(&self) -> NotificationStream {
        let receiver = self.tx.subscribe();
        NotificationStream::new(receiver)
    }

    /// Sends `n` to all current subscribers.
    ///
    /// Delivery is best-effort: the result of the send is deliberately
    /// ignored, so dispatching never fails from the caller's point of view.
    fn dispatch(&self, n: WalletNotification) {
        // NOTE(review): per tokio's docs a broadcast send only fails when no
        // receiver is alive, which is expected whenever nobody is subscribed.
        let _ = self.tx.send(n);
    }

    /// Dispatches a [`WalletNotification::MovementCreated`] for `movement`.
    pub fn dispatch_movement_created(&self, movement: Movement) {
        let notification = WalletNotification::MovementCreated { movement };
        self.dispatch(notification);
    }

    /// Dispatches a [`WalletNotification::MovementUpdated`] for `movement`.
    pub fn dispatch_movement_updated(&self, movement: Movement) {
        let notification = WalletNotification::MovementUpdated { movement };
        self.dispatch(notification);
    }
}
impl Wallet {
    /// Subscribes to this wallet's notifications.
    ///
    /// Returns a new [`NotificationStream`] obtained from the wallet's
    /// notification dispatcher.
    pub fn subscribe_notifications(&self) -> NotificationStream {
        let dispatcher = &self.notifications;
        dispatcher.subscribe()
    }
}