use tokio_stream::wrappers::ReceiverStream;
use zaino_proto::proto::{
compact_formats::{CompactBlock, CompactTx},
service::{Address, GetAddressUtxosReply, RawTransaction, SubtreeRoot},
};
#[derive(Debug)]
pub struct RawTransactionStream {
inner: ReceiverStream<Result<RawTransaction, tonic::Status>>,
}
impl RawTransactionStream {
pub fn new(rx: tokio::sync::mpsc::Receiver<Result<RawTransaction, tonic::Status>>) -> Self {
RawTransactionStream {
inner: ReceiverStream::new(rx),
}
}
}
impl futures::Stream for RawTransactionStream {
type Item = Result<RawTransaction, tonic::Status>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let poll = std::pin::Pin::new(&mut self.inner).poll_next(cx);
match poll {
std::task::Poll::Ready(Some(Ok(raw_tx))) => std::task::Poll::Ready(Some(Ok(raw_tx))),
std::task::Poll::Ready(Some(Err(e))) => std::task::Poll::Ready(Some(Err(e))),
std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
pub struct CompactTransactionStream {
inner: ReceiverStream<Result<CompactTx, tonic::Status>>,
}
impl CompactTransactionStream {
pub fn new(rx: tokio::sync::mpsc::Receiver<Result<CompactTx, tonic::Status>>) -> Self {
CompactTransactionStream {
inner: ReceiverStream::new(rx),
}
}
}
impl futures::Stream for CompactTransactionStream {
type Item = Result<CompactTx, tonic::Status>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let poll = std::pin::Pin::new(&mut self.inner).poll_next(cx);
match poll {
std::task::Poll::Ready(Some(Ok(raw_tx))) => std::task::Poll::Ready(Some(Ok(raw_tx))),
std::task::Poll::Ready(Some(Err(e))) => std::task::Poll::Ready(Some(Err(e))),
std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
pub struct CompactBlockStream {
inner: ReceiverStream<Result<CompactBlock, tonic::Status>>,
}
impl CompactBlockStream {
pub fn new(rx: tokio::sync::mpsc::Receiver<Result<CompactBlock, tonic::Status>>) -> Self {
CompactBlockStream {
inner: ReceiverStream::new(rx),
}
}
}
impl futures::Stream for CompactBlockStream {
type Item = Result<CompactBlock, tonic::Status>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let poll = std::pin::Pin::new(&mut self.inner).poll_next(cx);
match poll {
std::task::Poll::Ready(Some(Ok(raw_tx))) => std::task::Poll::Ready(Some(Ok(raw_tx))),
std::task::Poll::Ready(Some(Err(e))) => std::task::Poll::Ready(Some(Err(e))),
std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
pub struct UtxoReplyStream {
inner: ReceiverStream<Result<GetAddressUtxosReply, tonic::Status>>,
}
impl UtxoReplyStream {
pub fn new(
rx: tokio::sync::mpsc::Receiver<Result<GetAddressUtxosReply, tonic::Status>>,
) -> Self {
UtxoReplyStream {
inner: ReceiverStream::new(rx),
}
}
}
impl futures::Stream for UtxoReplyStream {
type Item = Result<GetAddressUtxosReply, tonic::Status>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let poll = std::pin::Pin::new(&mut self.inner).poll_next(cx);
match poll {
std::task::Poll::Ready(Some(Ok(raw_tx))) => std::task::Poll::Ready(Some(Ok(raw_tx))),
std::task::Poll::Ready(Some(Err(e))) => std::task::Poll::Ready(Some(Err(e))),
std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
pub struct SubtreeRootReplyStream {
inner: ReceiverStream<Result<SubtreeRoot, tonic::Status>>,
}
impl SubtreeRootReplyStream {
pub fn new(rx: tokio::sync::mpsc::Receiver<Result<SubtreeRoot, tonic::Status>>) -> Self {
SubtreeRootReplyStream {
inner: ReceiverStream::new(rx),
}
}
}
impl futures::Stream for SubtreeRootReplyStream {
type Item = Result<SubtreeRoot, tonic::Status>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let poll = std::pin::Pin::new(&mut self.inner).poll_next(cx);
match poll {
std::task::Poll::Ready(Some(Ok(raw_tx))) => std::task::Poll::Ready(Some(Ok(raw_tx))),
std::task::Poll::Ready(Some(Err(e))) => std::task::Poll::Ready(Some(Err(e))),
std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
pub struct AddressStream {
inner: ReceiverStream<Result<Address, tonic::Status>>,
}
impl AddressStream {
pub fn new(rx: tokio::sync::mpsc::Receiver<Result<Address, tonic::Status>>) -> Self {
AddressStream {
inner: ReceiverStream::new(rx),
}
}
}
impl futures::Stream for AddressStream {
type Item = Result<Address, tonic::Status>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let poll = std::pin::Pin::new(&mut self.inner).poll_next(cx);
match poll {
std::task::Poll::Ready(Some(Ok(address))) => std::task::Poll::Ready(Some(Ok(address))),
std::task::Poll::Ready(Some(Err(e))) => std::task::Poll::Ready(Some(Err(e))),
std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}