use std::{
future::Future,
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
};
use futures::{stream::Peekable, Stream, StreamExt};
use crate::types::RpcOut;
#[derive(Debug)]
pub(crate) struct Sender {
priority_cap: usize,
len: Arc<AtomicUsize>,
pub(crate) priority_sender: async_channel::Sender<RpcOut>,
pub(crate) non_priority_sender: async_channel::Sender<RpcOut>,
priority_receiver: async_channel::Receiver<RpcOut>,
non_priority_receiver: async_channel::Receiver<RpcOut>,
}
impl Sender {
pub(crate) fn new(cap: usize) -> Sender {
let (priority_sender, priority_receiver) = async_channel::unbounded();
let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2);
let len = Arc::new(AtomicUsize::new(0));
Sender {
priority_cap: cap / 2,
len,
priority_sender,
non_priority_sender,
priority_receiver,
non_priority_receiver,
}
}
pub(crate) fn new_receiver(&self) -> Receiver {
Receiver {
priority_queue_len: self.len.clone(),
priority: Box::pin(self.priority_receiver.clone().peekable()),
non_priority: Box::pin(self.non_priority_receiver.clone().peekable()),
}
}
#[allow(clippy::result_large_err)]
pub(crate) fn send_message(&self, rpc: RpcOut) -> Result<(), RpcOut> {
if let RpcOut::Publish { .. } = rpc {
let len = self.len.load(Ordering::Relaxed);
if len >= self.priority_cap {
return Err(rpc);
}
self.len.store(len + 1, Ordering::Relaxed);
}
let sender = match rpc {
RpcOut::Publish { .. }
| RpcOut::Graft(_)
| RpcOut::Prune(_)
| RpcOut::Subscribe(_)
| RpcOut::Unsubscribe(_) => &self.priority_sender,
RpcOut::Forward { .. } | RpcOut::IHave(_) | RpcOut::IWant(_) | RpcOut::IDontWant(_) => {
&self.non_priority_sender
}
};
sender.try_send(rpc).map_err(|err| err.into_inner())
}
#[cfg(feature = "metrics")]
pub(crate) fn priority_queue_len(&self) -> usize {
self.len.load(Ordering::Relaxed)
}
#[cfg(feature = "metrics")]
pub(crate) fn non_priority_queue_len(&self) -> usize {
self.non_priority_sender.len()
}
}
#[derive(Debug)]
pub struct Receiver {
pub(crate) priority_queue_len: Arc<AtomicUsize>,
pub(crate) priority: Pin<Box<Peekable<async_channel::Receiver<RpcOut>>>>,
pub(crate) non_priority: Pin<Box<Peekable<async_channel::Receiver<RpcOut>>>>,
}
impl Receiver {
pub(crate) fn poll_stale(&mut self, cx: &mut Context<'_>) -> Poll<Option<RpcOut>> {
let priority = match self.priority.as_mut().poll_peek_mut(cx) {
Poll::Ready(Some(RpcOut::Publish {
message: _,
ref mut timeout,
})) => {
if Pin::new(timeout).poll(cx).is_ready() {
let dropped = futures::ready!(self.priority.poll_next_unpin(cx))
.expect("There should be a message");
return Poll::Ready(Some(dropped));
}
Poll::Ready(None)
}
poll => poll,
};
let non_priority = match self.non_priority.as_mut().poll_peek_mut(cx) {
Poll::Ready(Some(RpcOut::Forward {
message: _,
ref mut timeout,
})) => {
if Pin::new(timeout).poll(cx).is_ready() {
let dropped = futures::ready!(self.non_priority.poll_next_unpin(cx))
.expect("There should be a message");
return Poll::Ready(Some(dropped));
}
Poll::Ready(None)
}
poll => poll,
};
match (priority, non_priority) {
(Poll::Ready(None), Poll::Ready(None)) => Poll::Ready(None),
_ => Poll::Pending,
}
}
pub(crate) fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> bool {
matches!(
(
self.priority.as_mut().poll_peek(cx),
self.non_priority.as_mut().poll_peek(cx),
),
(Poll::Ready(None), Poll::Ready(None))
)
}
}
impl Stream for Receiver {
type Item = RpcOut;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if let Poll::Ready(rpc) = Pin::new(&mut self.priority).poll_next(cx) {
if let Some(RpcOut::Publish { .. }) = rpc {
self.priority_queue_len.fetch_sub(1, Ordering::Relaxed);
}
return Poll::Ready(rpc);
}
Pin::new(&mut self.non_priority).poll_next(cx)
}
}