use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Semaphore;
/// Shared state of a closable channel built on a `flume` sender/receiver pair.
///
/// Every handle owns both ends of the underlying channel plus reference-counted
/// close state, so all clones of one `Inner` observe the same queue and the same
/// closed flag.
pub struct Inner<T> {
    /// Sending half of the underlying `flume` channel.
    tx: flume::Sender<T>,
    /// Receiving half of the underlying `flume` channel.
    rx: flume::Receiver<T>,
    /// Set to `true` by `close`; read by `is_closed`.
    closed: Arc<AtomicBool>,
    /// Semaphore created with zero permits and closed by `close`; `recv_async`
    /// awaits `acquire()` on it so that it wakes up when the channel is closed.
    close_signal: Arc<Semaphore>,
}
impl<T> Inner<T> {
    /// How long the blocking `recv` waits on the queue before re-checking the
    /// closed flag.
    const CLOSE_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);

    /// Creates an open channel that buffers at most `cap` items.
    ///
    /// `cap == 0` needs no special handling: `flume::bounded(0)` is a rendezvous
    /// channel in which a send completes only when a receiver takes the value.
    pub fn new(cap: usize) -> Self {
        let (tx, rx) = flume::bounded(cap);
        Self {
            tx,
            rx,
            closed: Arc::new(AtomicBool::new(false)),
            close_signal: Arc::new(Semaphore::new(0)),
        }
    }

    /// Returns `true` once `close` has been called on this handle or any clone.
    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::SeqCst)
    }

    /// Marks the channel closed and wakes receivers parked in `recv_async`.
    ///
    /// The flag is stored before the semaphore is closed, so a receiver woken
    /// through the semaphore already sees `is_closed() == true`.
    pub fn close(&self) {
        self.closed.store(true, Ordering::SeqCst);
        self.close_signal.close();
    }

    /// Sends `v`, blocking the current thread while the channel is full.
    ///
    /// Returns `Err(v)`, handing the value back, if the channel is already
    /// closed on entry or the underlying channel is disconnected.
    ///
    /// Note: the closed flag is only checked on entry; a sender that is already
    /// blocked inside this call is not woken by `close`.
    pub fn send(&self, v: T) -> Result<(), T> {
        if self.is_closed() {
            return Err(v);
        }
        self.tx.send(v).map_err(|e| e.into_inner())
    }

    /// Asynchronous counterpart of `send`: waits for space without blocking the
    /// thread.
    ///
    /// Returns `Err(v)` if the channel is already closed on entry or the
    /// underlying channel is disconnected. As with `send`, the closed flag is
    /// only checked on entry.
    pub async fn send_async(&self, v: T) -> Result<(), T> {
        if self.is_closed() {
            return Err(v);
        }
        self.tx.send_async(v).await.map_err(|e| e.into_inner())
    }

    /// Attempts to send `v` without waiting.
    ///
    /// Returns `Err(v)` if the channel is closed, full, or disconnected; the
    /// three cases are not distinguished.
    pub fn try_send(&self, v: T) -> Result<(), T> {
        if self.is_closed() {
            return Err(v);
        }
        match self.tx.try_send(v) {
            Ok(()) => Ok(()),
            Err(flume::TrySendError::Full(v) | flume::TrySendError::Disconnected(v)) => Err(v),
        }
    }

    /// Receives the next item, blocking the current thread until one arrives
    /// or the channel is closed.
    ///
    /// Items still buffered when the channel is closed are delivered first;
    /// `None` is returned only once the channel is closed (or disconnected) and
    /// the queue is empty. While waiting, the closed flag is polled every
    /// `CLOSE_POLL_INTERVAL`.
    pub fn recv(&self) -> Option<T> {
        loop {
            match self.rx.try_recv() {
                Ok(v) => return Some(v),
                Err(flume::TryRecvError::Disconnected) => return None,
                Err(flume::TryRecvError::Empty) => {}
            }
            if self.is_closed() {
                // A send may have completed between the empty `try_recv` above
                // and `close()`. Look once more so that item is delivered
                // instead of being reported as end-of-stream.
                return self.rx.try_recv().ok();
            }
            match self.rx.recv_timeout(Self::CLOSE_POLL_INTERVAL) {
                Ok(v) => return Some(v),
                // Timed out: loop around to re-check the queue and the flag.
                Err(flume::RecvTimeoutError::Timeout) => continue,
                Err(flume::RecvTimeoutError::Disconnected) => return None,
            }
        }
    }

    /// Asynchronous counterpart of `recv`.
    ///
    /// Waits for either an item or the close signal. Items still buffered when
    /// the channel is closed are delivered first; `None` is returned only once
    /// the channel is closed (or disconnected) and the queue is empty.
    pub async fn recv_async(&self) -> Option<T> {
        loop {
            match self.rx.try_recv() {
                Ok(v) => return Some(v),
                Err(flume::TryRecvError::Disconnected) => return None,
                Err(flume::TryRecvError::Empty) => {}
            }
            if self.is_closed() {
                // Same final look as in `recv`: do not drop an item whose send
                // completed just before `close()`.
                return self.rx.try_recv().ok();
            }
            tokio::select! {
                // Poll the queue first so a ready item wins over the close signal.
                biased;
                res = self.rx.recv_async() => return res.ok(),
                // The semaphore never receives permits, so `acquire` only
                // resolves once `close` has closed it. Loop around to drain
                // whatever is still buffered.
                _ = self.close_signal.acquire() => continue,
            }
        }
    }

    /// Takes an item if one is immediately available.
    ///
    /// Returns `None` when the queue is empty or disconnected. The closed flag
    /// is not consulted, so buffered items can still be taken after `close`.
    pub fn try_recv(&self) -> Option<T> {
        self.rx.try_recv().ok()
    }

    /// Number of items currently queued in the underlying channel.
    pub fn len(&self) -> usize {
        self.tx.len()
    }

    /// Capacity of the underlying channel, or `0` if it reports none.
    pub fn cap(&self) -> usize {
        self.tx.capacity().unwrap_or(0)
    }
}
impl<T> Clone for Inner<T> {
fn clone(&self) -> Self {
Inner {
tx: self.tx.clone(),
rx: self.rx.clone(),
closed: self.closed.clone(),
close_signal: self.close_signal.clone(),
}
}
}
/// Name of the channel backend this module is built on (the `flume` crate).
pub const ENGINE_NAME: &str = "flume";