use crate::{Core, Fwd, Waker};
use std::sync::{Arc, Mutex};
/// Handle for sending messages of type `M` into a shared, mutex-protected
/// buffer.
///
/// All clones of a `Channel` share the same underlying buffer through an
/// `Arc`, so a message sent via any clone ends up in the same queue.
pub struct Channel<M: Send> {
// Shared state: the pending-message queue and the waker used to signal it.
arc: Arc<Mutex<ChannelBuf<M>>>,
}
/// State shared between every `Channel` handle and the waker handler.
struct ChannelBuf<M: Send> {
// Messages pushed by `send` that the waker handler has not yet taken.
queue: Vec<M>,
// `Some` while the channel is open; `None` is what `is_closed` and `send`
// treat as "closed" (it is cleared by `close`).
waker: Option<Waker>, }
impl<M: Send> Channel<M> {
    /// Creates a new channel whose messages are delivered to `fwd`.
    ///
    /// A waker is obtained from `core` with a handler that takes the whole
    /// pending queue under the lock, releases the lock, and then passes each
    /// message to `fwd` in the order it was sent. If the channel has been
    /// closed by the time the handler runs, the taken messages are dropped
    /// instead of being forwarded.
    ///
    /// Returns the channel handle together with a [`ChannelGuard`]; dropping
    /// the guard closes the channel.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn new(core: &mut Core, fwd: Fwd<M>) -> (Self, ChannelGuard) {
        let arc = Arc::new(Mutex::new(ChannelBuf {
            queue: Vec::new(),
            waker: None,
        }));
        let arc1 = arc.clone();
        let waker = core.waker(move |_, _| {
            let mut guard = arc1.lock().expect("Stakker channel lock poisoned");
            let vec = std::mem::take(&mut guard.queue);
            let is_open = guard.waker.is_some();
            // Release the lock before forwarding so senders are not blocked
            // while the messages are being handed to `fwd`.
            drop(guard);
            if is_open {
                for msg in vec {
                    fwd.fwd(msg);
                }
            }
        });
        // Same poison message as every other lock site in this file.
        arc.lock().expect("Stakker channel lock poisoned").waker = Some(waker);
        let this = Self { arc };
        let guard = ChannelGuard(Box::new(this.clone()));
        (this, guard)
    }

    /// Queues `msg` for delivery.
    ///
    /// Returns `true` if the message was queued, or `false` if the channel
    /// is closed, in which case `msg` is dropped.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn send(&self, msg: M) -> bool {
        let mut guard = self.arc.lock().expect("Stakker channel lock poisoned");
        if let Some(ref waker) = guard.waker {
            // Only the message that makes the queue non-empty triggers a
            // wake; later messages join the batch that is already pending.
            // The lock is still held here, so the handler cannot take the
            // queue before the push below.
            // NOTE(review): assumes `wake` only signals the handler rather
            // than running it inline -- confirm against `Waker`.
            if guard.queue.is_empty() {
                waker.wake();
            }
            guard.queue.push(msg);
            true
        } else {
            false
        }
    }

    /// Returns `true` once the channel has been closed.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn is_closed(&self) -> bool {
        let guard = self.arc.lock().expect("Stakker channel lock poisoned");
        guard.waker.is_none()
    }
}
impl<M: Send> Clone for Channel<M> {
fn clone(&self) -> Self {
Self {
arc: self.arc.clone(),
}
}
}
/// Type-erased "close" operation.
///
/// Lets `ChannelGuard` hold a boxed channel without being generic over the
/// channel's message type.
trait Closable {
/// Closes the underlying object.
fn close(&self);
}
impl<M: 'static + Send> Closable for Channel<M> {
    /// Marks the channel closed and discards anything still queued.
    ///
    /// After this, `send` returns `false` and `is_closed` returns `true`.
    /// Panics if the internal mutex is poisoned.
    fn close(&self) {
        let mut state = self.arc.lock().expect("Stakker channel lock poisoned");
        // Clearing the waker is what marks the channel as closed.
        drop(state.waker.take());
        // Replace the queue so pending messages and their storage are released.
        state.queue = Vec::new();
    }
}
/// Guard returned alongside a `Channel` by `Channel::new`.
///
/// Dropping the guard calls `close` on the boxed channel, after which `send`
/// on any clone of that channel returns `false`. Keep the guard alive for as
/// long as the channel should stay open.
pub struct ChannelGuard(Box<dyn Closable>);
impl Drop for ChannelGuard {
fn drop(&mut self) {
self.0.close();
}
}