use std::cell::RefCell;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::time::Duration;
/// Common interface for handles that buffer events arriving on an internal channel.
///
/// The implementors in this file move whatever is pending on their `mpsc`
/// receiver into handle-local state in `drain`, reset that state in `clear`,
/// and report whether any such state is present in `has_messages`.
pub trait ChannelDrain {
/// Collects everything that is currently pending on the underlying source.
fn drain(&self);
/// Discards the state accumulated by earlier `drain` calls.
fn clear(&self);
/// Returns `true` when drained state is present and has not been cleared.
fn has_messages(&self) -> bool;
}
/// Sending half of an `mpsc` channel paired with a shared wake flag.
///
/// `send` raises the flag after every message that was queued successfully.
pub struct WakingSender<T> {
/// Underlying `mpsc` sender that carries the messages.
tx: mpsc::Sender<T>,
/// Flag shared through an `Arc`; stored as `true` after a successful send.
wake: Arc<AtomicBool>,
}
impl<T> Clone for WakingSender<T> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
wake: Arc::clone(&self.wake),
}
}
}
impl<T> WakingSender<T> {
    /// Sends `value` on the underlying channel and, if that succeeds, raises the wake flag.
    ///
    /// # Errors
    ///
    /// Returns the `mpsc::SendError` produced by the underlying sender (which hands
    /// `value` back to the caller). The wake flag is left untouched in that case.
    pub fn send(&self, value: T) -> Result<(), mpsc::SendError<T>> {
        self.tx.send(value)?;
        // Only reached once the message has been queued.
        self.wake.store(true, Ordering::Release);
        Ok(())
    }
}
/// Reference-counted handle to a channel and the buffer of messages drained from it.
///
/// All state lives in a shared `ChannelInner` behind an `Rc`.
pub struct ChannelHandle<T> {
/// Shared channel state.
inner: Rc<ChannelInner<T>>,
}
/// State shared by every clone of a `ChannelHandle`.
struct ChannelInner<T> {
/// Sender kept so that `ChannelHandle::tx` can hand out clones of it.
tx: mpsc::Sender<T>,
/// Wake flag passed on to every `WakingSender` created by `ChannelHandle::tx`.
wake: Arc<AtomicBool>,
/// Receiving end of the channel; read by `drain`.
rx: RefCell<mpsc::Receiver<T>>,
/// Messages moved out of `rx` by `drain` and kept until `clear`.
messages: RefCell<Vec<T>>,
}
impl<T> Clone for ChannelHandle<T> {
fn clone(&self) -> Self {
Self {
inner: Rc::clone(&self.inner),
}
}
}
impl<T: 'static> ChannelHandle<T> {
    /// Builds a handle around a new `mpsc` channel with an empty message buffer.
    ///
    /// `wake` is the flag shared with every `WakingSender` returned by [`tx`](Self::tx).
    pub fn new(wake: Arc<AtomicBool>) -> Self {
        let (sender, receiver) = mpsc::channel();
        let inner = ChannelInner {
            tx: sender,
            wake,
            rx: RefCell::new(receiver),
            messages: RefCell::new(Vec::new()),
        };
        Self {
            inner: Rc::new(inner),
        }
    }

    /// Returns a `WakingSender` that feeds this handle's channel and shares its wake flag.
    pub fn tx(&self) -> WakingSender<T> {
        let ChannelInner { tx, wake, .. } = &*self.inner;
        WakingSender {
            tx: tx.clone(),
            wake: wake.clone(),
        }
    }

    /// Returns a copy of every buffered message, in buffer order.
    pub fn get(&self) -> Vec<T>
    where
        T: Clone,
    {
        let buffered = self.inner.messages.borrow();
        buffered.to_vec()
    }

    /// Number of messages currently held in the buffer.
    pub fn len(&self) -> usize {
        let buffered = self.inner.messages.borrow();
        buffered.len()
    }

    /// Returns `true` when the buffer holds no messages.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl<T: 'static> ChannelDrain for ChannelHandle<T> {
    /// Moves every message currently pending on the channel to the end of the buffer.
    ///
    /// Never blocks: collection stops as soon as the receiver reports that nothing
    /// more is available (empty or disconnected).
    fn drain(&self) {
        let receiver = self.inner.rx.borrow();
        let mut buffered = self.inner.messages.borrow_mut();
        buffered.extend(receiver.try_iter());
    }

    /// Empties the message buffer; messages still pending on the channel are untouched.
    fn clear(&self) {
        let mut buffered = self.inner.messages.borrow_mut();
        buffered.clear();
    }

    /// Returns `true` when the buffer holds at least one message.
    fn has_messages(&self) -> bool {
        let buffered = self.inner.messages.borrow();
        !buffered.is_empty()
    }
}
/// Two-directional port: an inbound `ChannelHandle<In>` plus an outbound `mpsc` channel of `Out`.
pub struct PortHandle<In, Out> {
/// Inbound side of the port.
pub rx: ChannelHandle<In>,
/// Sender for outbound values; clones are handed out by `tx`.
outbound_tx: mpsc::Sender<Out>,
/// Outbound receiver, shared between clones and stored in an `Option` so that
/// `take_outbound_rx` can move it out, leaving `None` behind.
outbound_rx: Rc<RefCell<Option<mpsc::Receiver<Out>>>>,
}
impl<In, Out> Clone for PortHandle<In, Out> {
fn clone(&self) -> Self {
Self {
rx: self.rx.clone(),
outbound_tx: self.outbound_tx.clone(),
outbound_rx: Rc::clone(&self.outbound_rx),
}
}
}
impl<In: 'static, Out: 'static> PortHandle<In, Out> {
pub fn new(wake: Arc<AtomicBool>) -> Self {
let (outbound_tx, outbound_rx) = mpsc::channel();
Self {
rx: ChannelHandle::new(wake),
outbound_tx,
outbound_rx: Rc::new(RefCell::new(Some(outbound_rx))),
}
}
pub fn tx(&self) -> mpsc::Sender<Out> {
self.outbound_tx.clone()
}
pub fn take_outbound_rx(&self) -> Option<mpsc::Receiver<Out>> {
self.outbound_rx.borrow_mut().take()
}
}
/// Reference-counted handle to a repeating timer.
///
/// All state lives in a shared `IntervalInner` behind an `Rc`.
pub(crate) struct IntervalHandle {
/// Shared timer state.
inner: Rc<IntervalInner>,
}
/// State shared by every clone of an `IntervalHandle`.
struct IntervalInner {
/// Receives one `()` per tick from the thread spawned in `IntervalHandle::new`.
rx: RefCell<mpsc::Receiver<()>>,
/// Callback invoked by `drain` when at least one tick was pending.
callback: RefCell<Option<Rc<dyn Fn()>>>,
/// Set by `drain` when a tick was seen; reset by `clear`.
ticked: RefCell<bool>,
}
impl Clone for IntervalHandle {
fn clone(&self) -> Self {
Self {
inner: Rc::clone(&self.inner),
}
}
}
impl IntervalHandle {
    /// Starts a repeating timer and returns a handle to it.
    ///
    /// A background thread sleeps for `duration`, records a tick, raises `wake`
    /// (with `Ordering::Release`) and repeats. The thread stops at the first tick
    /// after the receiving side — owned by the returned handle and its clones —
    /// has been dropped.
    ///
    /// `callback` is only stored here; it is invoked from `drain` on the thread that
    /// owns the handle, never from the timer thread.
    ///
    /// At most one tick is ever queued: `drain` collapses any number of pending
    /// ticks into a single notification, so queuing more would only let memory grow
    /// without bound while the owner is not draining.
    pub(crate) fn new(duration: Duration, callback: Rc<dyn Fn()>, wake: Arc<AtomicBool>) -> Self {
        let (tx, rx) = mpsc::sync_channel(1);
        std::thread::spawn(move || loop {
            std::thread::sleep(duration);
            match tx.try_send(()) {
                // `Full` means an undrained tick is already waiting; the flag is still
                // raised so that every tick wakes the consumer.
                Ok(()) | Err(mpsc::TrySendError::Full(())) => {
                    wake.store(true, Ordering::Release);
                }
                // Every handle is gone, so nobody is left to notify.
                Err(mpsc::TrySendError::Disconnected(())) => break,
            }
        });
        Self {
            inner: Rc::new(IntervalInner {
                rx: RefCell::new(rx),
                callback: RefCell::new(Some(callback)),
                ticked: RefCell::new(false),
            }),
        }
    }
}
impl ChannelDrain for IntervalHandle {
    /// Consumes every pending tick and, if there was at least one, marks the handle
    /// as ticked and invokes the callback exactly once.
    ///
    /// No `RefCell` guard is held while the callback runs, so the callback may
    /// touch this handle again (for example call `clear`, `drain`, or replace the
    /// stored callback) without triggering a borrow panic.
    fn drain(&self) {
        // The receiver guard lives only for this statement.
        let saw_tick = self.inner.rx.borrow().try_iter().count() > 0;
        if !saw_tick {
            return;
        }
        *self.inner.ticked.borrow_mut() = true;

        // Clone the `Rc` out so the guard on `callback` is released before user code
        // runs; the clone also keeps the closure alive if it clears its own slot.
        let callback = self.inner.callback.borrow().clone();
        if let Some(cb) = callback {
            cb();
        }
    }

    /// Resets the ticked marker set by `drain`.
    fn clear(&self) {
        *self.inner.ticked.borrow_mut() = false;
    }

    /// Returns `true` if `drain` saw a tick since the last `clear`.
    fn has_messages(&self) -> bool {
        *self.inner.ticked.borrow()
    }
}