use std::{
sync::{Arc, Weak},
time::{Duration, Instant},
};
use zenoh_collections::RingBuffer;
use zenoh_result::ZResult;
use crate::api::{
handlers::{callback::Callback, IntoHandler},
session::API_DATA_RECEPTION_CHANNEL_SIZE,
};
/// Handler configuration for a channel backed by a fixed-capacity ring buffer.
///
/// Converting it with `IntoHandler::into_handler` yields a callback that
/// pushes values into the ring and a [`RingChannelHandler`] that pulls them out.
pub struct RingChannel {
/// Capacity handed to `RingBuffer::new` when the handler is built.
capacity: usize,
}
impl RingChannel {
pub fn new(capacity: usize) -> Self {
Self { capacity }
}
}
impl Default for RingChannel {
    /// Builds a `RingChannel` whose capacity is the value of
    /// `API_DATA_RECEPTION_CHANNEL_SIZE`.
    fn default() -> Self {
        let capacity = *API_DATA_RECEPTION_CHANNEL_SIZE;
        Self::new(capacity)
    }
}
/// State shared between the producing callback and the [`RingChannelHandler`].
struct RingChannelInner<T> {
/// Mutex-protected ring buffer holding the values that have not been pulled yet.
ring: std::sync::Mutex<RingBuffer<T>>,
/// Receiving end of the unit-valued notification channel; the producing
/// callback sends `()` on it after pushing a value into `ring`.
not_empty: flume::Receiver<()>,
}
/// Receiving side of a [`RingChannel`].
///
/// Only a weak reference to the shared state is kept here; every receive
/// method fails with "The ringbuffer has been deleted." once that state has
/// been dropped.
pub struct RingChannelHandler<T> {
/// Weak handle to the shared ring and its notification receiver.
ring: Weak<RingChannelInner<T>>,
}
impl<T> RingChannelHandler<T> {
pub fn recv(&self) -> ZResult<T> {
let Some(channel) = self.ring.upgrade() else {
bail!("The ringbuffer has been deleted.");
};
loop {
if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() {
return Ok(t);
}
channel.not_empty.recv().map_err(|e| zerror!("{}", e))?;
}
}
pub fn recv_deadline(&self, deadline: Instant) -> ZResult<Option<T>> {
let Some(channel) = self.ring.upgrade() else {
bail!("The ringbuffer has been deleted.");
};
loop {
if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() {
return Ok(Some(t));
}
match channel.not_empty.recv_deadline(deadline) {
Ok(()) => {}
Err(flume::RecvTimeoutError::Timeout) => return Ok(None),
Err(err) => bail!("{}", err),
}
}
}
pub fn recv_timeout(&self, timeout: Duration) -> ZResult<Option<T>> {
let Some(channel) = self.ring.upgrade() else {
bail!("The ringbuffer has been deleted.");
};
loop {
if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() {
return Ok(Some(t));
}
match channel.not_empty.recv_timeout(timeout) {
Ok(()) => {}
Err(flume::RecvTimeoutError::Timeout) => return Ok(None),
Err(err) => bail!("{}", err),
}
}
}
pub async fn recv_async(&self) -> ZResult<T> {
let Some(channel) = self.ring.upgrade() else {
bail!("The ringbuffer has been deleted.");
};
loop {
if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() {
return Ok(t);
}
channel
.not_empty
.recv_async()
.await
.map_err(|e| zerror!("{}", e))?;
}
}
pub fn try_recv(&self) -> ZResult<Option<T>> {
let Some(channel) = self.ring.upgrade() else {
bail!("The ringbuffer has been deleted.");
};
let mut guard = channel.ring.lock().map_err(|e| zerror!("{}", e))?;
Ok(guard.pull())
}
}
impl<T: Send + 'static> IntoHandler<T> for RingChannel {
type Handler = RingChannelHandler<T>;
fn into_handler(self) -> (Callback<T>, Self::Handler) {
let (sender, receiver) = flume::bounded(1);
let inner = Arc::new(RingChannelInner {
ring: std::sync::Mutex::new(RingBuffer::new(self.capacity)),
not_empty: receiver,
});
let receiver = RingChannelHandler {
ring: Arc::downgrade(&inner),
};
(
Callback::from(move |t| match inner.ring.lock() {
Ok(mut g) => {
g.push_force(t);
drop(g);
let _ = sender.try_send(());
}
Err(e) => tracing::error!("{}", e),
}),
receiver,
)
}
}