use std::{
future::Future,
pin::Pin,
sync::mpsc::{RecvTimeoutError, TryRecvError},
task::{Context, Poll},
time::{Duration, Instant},
};
use sled::Event;
use thiserror::Error;
use crate::{invoices_db::InvoiceStorageError, AcceptXmrError, Invoice};
/// A handle for receiving invoice updates.
///
/// This is a thin newtype over a [`sled::Subscriber`]. Insert events coming from the
/// wrapped subscriber carry bincode-encoded bytes, which the receiving methods of this
/// type decode into [`Invoice`] values before handing them to the caller.
pub struct Subscriber(sled::Subscriber);
impl Subscriber {
pub(crate) fn new(sled_subscriber: sled::Subscriber) -> Subscriber {
Subscriber(sled_subscriber)
}
pub fn recv(&mut self) -> Result<Invoice, AcceptXmrError> {
let maybe_event = self.0.next();
match maybe_event {
Some(Event::Insert { value, .. }) => {
bincode::decode_from_slice(&value, bincode::config::standard())
.map_err(|e| AcceptXmrError::from(InvoiceStorageError::from(e)))
.map(|tup| tup.0)
}
Some(Event::Remove { .. }) => self.recv(),
None => Err(AcceptXmrError::Subscriber(SubscriberError::Recv)),
}
}
pub fn try_recv(&mut self) -> Result<Invoice, AcceptXmrError> {
match self.0.next_timeout(Duration::from_nanos(0)) {
Ok(Event::Insert { value, .. }) => {
bincode::decode_from_slice(&value, bincode::config::standard())
.map_err(|e| AcceptXmrError::from(InvoiceStorageError::from(e)))
.map(|tup| tup.0)
}
Ok(Event::Remove { .. }) => self.try_recv(),
Err(RecvTimeoutError::Timeout) => Err(AcceptXmrError::from(SubscriberError::TryRecv(
TryRecvError::Empty,
))),
Err(RecvTimeoutError::Disconnected) => Err(AcceptXmrError::from(
SubscriberError::TryRecv(TryRecvError::Disconnected),
)),
}
}
pub fn recv_timeout(&mut self, timeout: Duration) -> Result<Invoice, AcceptXmrError> {
let start = Instant::now();
loop {
let event_or_err = self.0.next_timeout(timeout - start.elapsed());
match event_or_err {
Ok(Event::Insert { value, .. }) => {
return bincode::decode_from_slice(&value, bincode::config::standard())
.map_err(|e| AcceptXmrError::from(InvoiceStorageError::from(e)))
.map(|tup| tup.0)
}
Ok(Event::Remove { .. }) => continue,
Err(e) => return Err(AcceptXmrError::Subscriber(SubscriberError::RecvTimeout(e))),
}
}
}
}
/// Non-blocking iteration over the invoice updates that are currently pending.
///
/// `next` returns `None` as soon as no update is immediately available (or the
/// underlying subscriber reports an error), so `None` does not necessarily mean that no
/// further updates will ever arrive.
impl Iterator for Subscriber {
    type Item = Result<Invoice, AcceptXmrError>;

    /// Returns the next pending invoice update, or `None` if there is none right now.
    ///
    /// Removal events are skipped so that an insert event queued behind a removal is
    /// still yielded, matching the behavior of `try_recv`. A decode failure is yielded
    /// as `Some(Err(..))`.
    fn next(&mut self) -> Option<Result<Invoice, AcceptXmrError>> {
        loop {
            // A zero timeout turns the blocking call into a poll.
            match self.0.next_timeout(Duration::from_nanos(0)) {
                Ok(Event::Insert { value, .. }) => {
                    return Some(
                        bincode::decode_from_slice(&value, bincode::config::standard())
                            .map_err(|e| AcceptXmrError::from(InvoiceStorageError::from(e)))
                            .map(|tup| tup.0),
                    );
                }
                // A removal carries no invoice; keep looking instead of ending iteration.
                Ok(Event::Remove { .. }) => continue,
                // Timeout (nothing pending) or disconnect.
                Err(_) => return None,
            }
        }
    }
}
/// Awaiting a `Subscriber` resolves to the next invoice update.
///
/// The output is `None` when the wrapped subscriber's future completes with `None`,
/// and `Some(Err(..))` when an insert event's bytes fail to decode.
impl Future for Subscriber {
    type Output = Option<Result<Invoice, AcceptXmrError>>;

    /// Polls the wrapped subscriber, skipping removal events by polling again
    /// immediately until an insert event, completion, or `Pending` is observed.
    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Invoice, AcceptXmrError>>> {
        loop {
            let event = match Pin::new(&mut self.0).poll(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Ready(Some(event)) => event,
            };
            match event {
                Event::Insert { value, .. } => {
                    let decoded: Result<Invoice, AcceptXmrError> =
                        bincode::decode_from_slice(&value, bincode::config::standard())
                            .map_err(|e| AcceptXmrError::from(InvoiceStorageError::from(e)))
                            .map(|(invoice, _bytes_read)| invoice);
                    return Poll::Ready(Some(decoded));
                }
                // Removal events carry no invoice; poll the inner subscriber again.
                Event::Remove { .. } => {}
            }
        }
    }
}
/// Errors produced while receiving invoice updates through a [`Subscriber`].
#[derive(Error, Debug)]
pub enum SubscriberError {
/// Returned by the blocking `recv` when the underlying subscriber yields no further
/// events.
#[error("subscriber cannot receive further updates, likely because the scanning thread has panicked")]
Recv,
/// Returned by `recv_timeout`; wraps the timeout or disconnect error reported by the
/// underlying subscriber.
#[error("subscriber recv timeout: {0}")]
RecvTimeout(#[from] RecvTimeoutError),
/// Returned by `try_recv`; wraps `TryRecvError::Empty` when no update is pending and
/// `TryRecvError::Disconnected` when the underlying subscriber reports a disconnect.
#[error("subscriber try recv failed: {0}")]
TryRecv(#[from] TryRecvError),
}