use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
use zenoh_result::ZResult;
use crate::api::handlers::{callback::Callback, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE};
/// Description of a bounded first-in, first-out channel.
///
/// The only state is the requested `capacity`; it is handed to `flume::bounded` when the
/// value is converted into a handler.
pub struct FifoChannel {
    /// Number of slots requested for the bounded channel.
    capacity: usize,
}

impl FifoChannel {
    /// Creates a channel description that remembers the given `capacity`.
    pub fn new(capacity: usize) -> Self {
        FifoChannel { capacity }
    }
}
impl Default for FifoChannel {
    /// Builds a channel whose capacity is the current value of
    /// `API_DATA_RECEPTION_CHANNEL_SIZE`.
    fn default() -> Self {
        let capacity = *API_DATA_RECEPTION_CHANNEL_SIZE;
        Self::new(capacity)
    }
}
/// Receiving side produced by [`FifoChannel`]: a newtype around a `flume::Receiver<T>`.
///
/// Cloning the handler clones the wrapped receiver.
#[derive(Debug, Clone)]
pub struct FifoChannelHandler<T>(flume::Receiver<T>);
impl<T: Send + 'static> IntoHandler<T> for FifoChannel {
    type Handler = FifoChannelHandler<T>;

    /// Splits this channel description into a callback and its receiving handler.
    ///
    /// A bounded flume channel with `self.capacity` slots is created. The callback sends each
    /// value it is given into that channel and reports a failed send through
    /// `tracing::error!`; the handler wraps the matching receiver.
    fn into_handler(self) -> (Callback<T>, Self::Handler) {
        let (tx, rx) = flume::bounded(self.capacity);
        // The closure owns the sending half, so the channel stays open while the callback lives.
        let forward = move |item: T| {
            if let Err(error) = tx.send(item) {
                tracing::error!(%error)
            }
        };
        (Callback::from(forward), FifoChannelHandler(rx))
    }
}
impl<T> FifoChannelHandler<T> {
pub fn try_recv(&self) -> ZResult<Option<T>> {
match self.0.try_recv() {
Ok(value) => Ok(Some(value)),
Err(flume::TryRecvError::Empty) => Ok(None),
Err(err) => Err(err.into()),
}
}
pub fn recv(&self) -> ZResult<T> {
self.0.recv().map_err(Into::into)
}
pub fn recv_deadline(&self, deadline: Instant) -> ZResult<Option<T>> {
match self.0.recv_deadline(deadline) {
Ok(value) => Ok(Some(value)),
Err(flume::RecvTimeoutError::Timeout) => Ok(None),
Err(err) => Err(err.into()),
}
}
pub fn recv_timeout(&self, duration: Duration) -> ZResult<Option<T>> {
match self.0.recv_timeout(duration) {
Ok(value) => Ok(Some(value)),
Err(flume::RecvTimeoutError::Timeout) => Ok(None),
Err(err) => Err(err.into()),
}
}
pub fn iter(&self) -> Iter<'_, T> {
Iter(self.0.iter())
}
pub fn try_iter(&self) -> TryIter<'_, T> {
TryIter(self.0.try_iter())
}
pub fn drain(&self) -> Drain<'_, T> {
Drain(self.0.drain())
}
pub fn is_disconnected(&self) -> bool {
self.0.is_disconnected()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn is_full(&self) -> bool {
self.0.is_full()
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn capacity(&self) -> Option<usize> {
self.0.capacity()
}
pub fn sender_count(&self) -> usize {
self.0.sender_count()
}
pub fn receiver_count(&self) -> usize {
self.0.receiver_count()
}
pub fn same_channel(&self, other: &Self) -> bool {
self.0.same_channel(&other.0)
}
}
impl<'a, T> IntoIterator for &'a FifoChannelHandler<T> {
type Item = T;
type IntoIter = Iter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
Iter(self.0.iter())
}
}
impl<T> IntoIterator for FifoChannelHandler<T> {
type Item = T;
type IntoIter = IntoIter<T>;
fn into_iter(self) -> Self::IntoIter {
IntoIter(self.0.into_iter())
}
}
/// Borrowing iterator over a FIFO channel handler; a newtype around `flume::Iter`.
pub struct Iter<'a, T>(flume::Iter<'a, T>);

impl<'a, T> Iterator for Iter<'a, T> {
    type Item = T;

    /// Advances the wrapped flume iterator and returns what it yields.
    fn next(&mut self) -> Option<T> {
        let Self(inner) = self;
        inner.next()
    }
}
/// Iterator returned by `try_iter`; a newtype around `flume::TryIter`.
pub struct TryIter<'a, T>(flume::TryIter<'a, T>);

impl<'a, T> Iterator for TryIter<'a, T> {
    type Item = T;

    /// Advances the wrapped flume iterator and returns what it yields.
    fn next(&mut self) -> Option<T> {
        let Self(inner) = self;
        inner.next()
    }
}
/// Iterator returned by `drain`; a newtype around `flume::Drain`.
#[derive(Debug)]
pub struct Drain<'a, T>(flume::Drain<'a, T>);

impl<'a, T> Iterator for Drain<'a, T> {
    type Item = T;

    /// Advances the wrapped flume iterator and returns what it yields.
    fn next(&mut self) -> Option<T> {
        let Self(inner) = self;
        inner.next()
    }
}

impl<'a, T> ExactSizeIterator for Drain<'a, T> {
    /// Reports the length given by the wrapped flume iterator.
    fn len(&self) -> usize {
        let Self(inner) = self;
        inner.len()
    }
}
/// Owning iterator over a FIFO channel handler; a newtype around `flume::IntoIter`.
pub struct IntoIter<T>(flume::IntoIter<T>);

impl<T> Iterator for IntoIter<T> {
    type Item = T;

    /// Advances the wrapped flume iterator and returns what it yields.
    fn next(&mut self) -> Option<T> {
        let Self(inner) = self;
        inner.next()
    }
}
impl<T> FifoChannelHandler<T> {
pub fn recv_async(&self) -> RecvFut<'_, T> {
RecvFut(self.0.recv_async())
}
pub fn into_recv_async<'a>(self) -> RecvFut<'a, T> {
RecvFut(self.0.into_recv_async())
}
}
#[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
pub struct RecvFut<'a, T>(flume::r#async::RecvFut<'a, T>);
impl<T> Future for RecvFut<'_, T> {
type Output = ZResult<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Future::poll(Pin::new(&mut self.0), cx).map_err(Into::into)
}
}
impl<'a, T> futures::future::FusedFuture for RecvFut<'a, T> {
    /// Reports the termination state of the wrapped flume future.
    fn is_terminated(&self) -> bool {
        let Self(inner) = self;
        futures::future::FusedFuture::is_terminated(inner)
    }
}
impl<T> FifoChannelHandler<T> {
pub fn stream(&self) -> RecvStream<'_, T> {
RecvStream(self.0.stream())
}
pub fn into_stream<'a>(self) -> RecvStream<'a, T> {
RecvStream(self.0.into_stream())
}
}
/// Stream wrapping a `flume::r#async::RecvStream`.
#[derive(Clone)]
pub struct RecvStream<'a, T>(flume::r#async::RecvStream<'a, T>);

impl<'a, T> RecvStream<'a, T> {
    /// Reports the wrapped stream's `is_disconnected()`.
    pub fn is_disconnected(&self) -> bool {
        let Self(inner) = self;
        inner.is_disconnected()
    }

    /// Reports the wrapped stream's `is_empty()`.
    pub fn is_empty(&self) -> bool {
        let Self(inner) = self;
        inner.is_empty()
    }

    /// Reports the wrapped stream's `is_full()`.
    pub fn is_full(&self) -> bool {
        let Self(inner) = self;
        inner.is_full()
    }

    /// Reports the wrapped stream's `len()`.
    pub fn len(&self) -> usize {
        let Self(inner) = self;
        inner.len()
    }

    /// Reports the wrapped stream's `capacity()`.
    pub fn capacity(&self) -> Option<usize> {
        let Self(inner) = self;
        inner.capacity()
    }

    /// Compares the wrapped streams of `self` and `other` with `same_channel`.
    pub fn same_channel(&self, other: &Self) -> bool {
        let Self(mine) = self;
        let Self(theirs) = other;
        mine.same_channel(theirs)
    }
}
impl<'a, T> futures::stream::Stream for RecvStream<'a, T> {
    type Item = T;

    /// Polls the wrapped flume stream and returns its result unchanged.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let inner = Pin::new(&mut self.0);
        futures::stream::Stream::poll_next(inner, cx)
    }
}
impl<'a, T> futures::stream::FusedStream for RecvStream<'a, T> {
    /// Reports the termination state of the wrapped flume stream.
    fn is_terminated(&self) -> bool {
        let Self(inner) = self;
        futures::stream::FusedStream::is_terminated(inner)
    }
}
/// Lets a `std::sync::mpsc` sync-channel pair be used as a handler: the callback sends into
/// the `SyncSender`, and the `Receiver` is returned as the handler.
///
/// The trait bounds are kept exactly as before so the set of accepted types is unchanged.
impl<T: Clone + Send + Sync + 'static> IntoHandler<T>
    for (std::sync::mpsc::SyncSender<T>, std::sync::mpsc::Receiver<T>)
{
    type Handler = std::sync::mpsc::Receiver<T>;

    /// Splits the pair into a callback that sends each value through the sender, and the
    /// receiver itself.
    ///
    /// A failed send is reported through `tracing::error!` and otherwise ignored.
    fn into_handler(self) -> (Callback<T>, Self::Handler) {
        let (sender, receiver) = self;
        (
            Callback::from(move |t: T| {
                // `t` is owned by the closure, so it is moved into the channel directly.
                // Cloning it first only duplicated a value that was dropped right after.
                if let Err(error) = sender.send(t) {
                    tracing::error!(%error)
                }
            }),
            receiver,
        )
    }
}