use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::Semaphore;
pub struct Inner<T> {
tx: flume::Sender<T>,
rx: flume::Receiver<T>,
closed: Arc<AtomicBool>,
close_signal: Arc<Semaphore>,
close_rx: flume::Receiver<()>,
close_tx_guard: Arc<Mutex<Option<flume::Sender<()>>>>,
}
impl<T> Inner<T> {
pub fn new(cap: usize) -> Self {
let (tx, rx) = if cap == 0 {
flume::bounded(0)
} else {
flume::bounded(cap)
};
let (close_tx, close_rx) = flume::bounded::<()>(1);
Inner {
tx,
rx,
closed: Arc::new(AtomicBool::new(false)),
close_signal: Arc::new(Semaphore::new(0)),
close_rx,
close_tx_guard: Arc::new(Mutex::new(Some(close_tx))),
}
}
pub fn is_closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
}
pub fn close(&self) -> bool {
let was_open = !self.closed.swap(true, Ordering::SeqCst);
if was_open {
self.close_signal.close();
let _ = self.close_tx_guard.lock().unwrap().take();
}
was_open
}
pub fn send(&self, v: T) -> Result<(), T> {
if self.is_closed() {
return Err(v);
}
match self.tx.send(v) {
Ok(()) => Ok(()),
Err(e) => Err(e.into_inner()),
}
}
pub async fn send_async(&self, v: T) -> Result<(), T> {
if self.is_closed() {
return Err(v);
}
match self.tx.send_async(v).await {
Ok(()) => Ok(()),
Err(e) => Err(e.into_inner()),
}
}
pub fn try_send(&self, v: T) -> Result<(), T> {
if self.is_closed() {
return Err(v);
}
match self.tx.try_send(v) {
Ok(()) => Ok(()),
Err(flume::TrySendError::Full(v)) => Err(v),
Err(flume::TrySendError::Disconnected(v)) => Err(v),
}
}
pub fn recv(&self) -> Option<T> {
loop {
match self.rx.try_recv() {
Ok(v) => return Some(v),
Err(flume::TryRecvError::Disconnected) => return None,
Err(flume::TryRecvError::Empty) => {
if self.is_closed() { return None; }
}
}
let mut got: Option<T> = None;
flume::Selector::new()
.recv(&self.rx, |res| {
if let Ok(v) = res { got = Some(v); }
})
.recv(&self.close_rx, |_| {
})
.wait();
if got.is_some() { return got; }
}
}
pub async fn recv_async(&self) -> Option<T> {
loop {
match self.rx.try_recv() {
Ok(v) => return Some(v),
Err(flume::TryRecvError::Disconnected) => return None,
Err(flume::TryRecvError::Empty) => {
if self.is_closed() { return None; }
tokio::select! {
biased;
res = self.rx.recv_async() => match res {
Ok(v) => return Some(v),
Err(_) => return None,
},
_ = self.close_signal.acquire() => {
continue;
}
}
}
}
}
}
pub fn try_recv(&self) -> Option<T> {
self.rx.try_recv().ok()
}
pub fn len(&self) -> usize { self.tx.len() }
pub fn cap(&self) -> usize { self.tx.capacity().unwrap_or(0) }
#[doc(hidden)]
pub fn __flume_rx(&self) -> &flume::Receiver<T> { &self.rx }
#[doc(hidden)]
pub fn __flume_close_rx(&self) -> &flume::Receiver<()> { &self.close_rx }
#[doc(hidden)]
pub fn __flume_tx(&self) -> &flume::Sender<T> { &self.tx }
}
impl<T> Clone for Inner<T> {
fn clone(&self) -> Self {
Inner {
tx: self.tx.clone(),
rx: self.rx.clone(),
closed: self.closed.clone(),
close_signal: self.close_signal.clone(),
close_rx: self.close_rx.clone(),
close_tx_guard: self.close_tx_guard.clone(),
}
}
}
pub const ENGINE_NAME: &str = "flume";