#![warn(missing_docs)]
#![cfg_attr(feature = "cargo-clippy", allow(clippy::style))]
mod waker;
use core::{fmt, task};
use core::pin::Pin;
use core::future::Future;
use core::hash::Hash;
use core::mem::ManuallyDrop;
use std::sync::mpsc;
use std::sync::Arc;
use std::collections::{HashMap, hash_map};
/// Reason a [`Sender::try_send`] call failed, as carried in [`SendError::kind`].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum SendErrorKind {
/// The destination is closed. The `mpsc::Sender` implementation in this file
/// reports it whenever the underlying `send` returns an error.
Closed
}
impl SendErrorKind {
pub const fn is_closed(&self) -> bool {
match self {
SendErrorKind::Closed => true,
}
}
}
/// Error returned by [`Sender::try_send`]; it hands the undelivered value back to the caller.
pub struct SendError<T> {
/// Why the send failed.
pub kind: SendErrorKind,
/// The value that was passed to `try_send` and was not delivered.
pub message: T
}
impl<T> fmt::Debug for SendError<T> {
#[inline(always)]
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.kind, fmt)
}
}
impl<T> fmt::Display for SendError<T> {
#[inline(always)]
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.kind, fmt)
}
}
/// Marks `SendError` as a standard error type. Every `Error` method keeps its
/// default implementation, so no underlying `source` is reported.
impl<T> std::error::Error for SendError<T> {}
/// Destination that values of type `T` can be pushed into.
///
/// The registry stores one implementor per key and forwards keyed messages to it.
pub trait Sender<T>
where
    T: Send,
{
    /// Attempts to hand `value` over to the destination.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] carrying the failure kind together with the
    /// undelivered `value`.
    fn try_send(&self, value: T) -> Result<(), SendError<T>>;
}
impl<T: Send> Sender<T> for mpsc::Sender<T> {
    /// Forwards `value` through the standard library's unbounded channel.
    ///
    /// # Errors
    ///
    /// Any failure of the underlying `send` is reported as
    /// [`SendErrorKind::Closed`], with the rejected value returned in `message`.
    #[inline]
    fn try_send(&self, value: T) -> Result<(), SendError<T>> {
        // Spelled as a path call so the inherent `mpsc::Sender::send` is unambiguous.
        if let Err(mpsc::SendError(message)) = mpsc::Sender::send(self, value) {
            return Err(SendError {
                kind: SendErrorKind::Closed,
                message,
            });
        }
        Ok(())
    }
}
/// Command travelling from a `Channel` handle to the `Registry`.
enum Message<K, T, S>
where
    K: PartialEq + Eq,
    T: Send,
    S: Sender<T>,
{
    /// Store the sender under the key, replacing whatever was stored there before.
    Subscribe(K, S),
    /// Forget the sender stored under the key, if any.
    Unsubscribe(K),
    /// Forward the payload to the sender stored under the key.
    Msg(K, T),
}
/// State shared (through an `Arc`) between every `Channel` handle and the `Registry`.
struct State {
// Registered by `Registry::process` when its queue is empty; triggered by
// `Channel::send` and by `Channel`'s `Drop` impl.
waker: waker::AtomicWaker,
}
impl State {
    /// Builds the shared state around a freshly constructed `AtomicWaker`.
    fn new() -> Self {
        let waker = waker::AtomicWaker::new();
        Self { waker }
    }
}
/// Signals that the two halves are no longer connected: `Channel` methods return it
/// when the underlying send fails, and the registry finishes with it once its
/// receiver reports that it is disconnected.
pub struct Cancelled;
impl fmt::Debug for Cancelled {
    /// Delegates to the `Display` impl, so both formats print the same text.
    #[inline(always)]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Display>::fmt(self, f)
    }
}
impl fmt::Display for Cancelled {
    /// Writes the fixed text `Cancelled` straight to the formatter.
    #[inline(always)]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const TEXT: &str = "Cancelled";
        f.write_str(TEXT)
    }
}
/// Marks `Cancelled` as a standard error type. Every `Error` method keeps its
/// default implementation, so no underlying `source` is reported.
impl std::error::Error for Cancelled {}
/// Receiving half: owns the key-to-sender table and applies the commands queued by
/// `Channel` handles.
///
/// It does nothing until driven, either by the blocking `run` method or by polling
/// it as a `Future`.
#[must_use = "You must run Registry task"]
pub struct Registry<K, T, S>
where
    K: PartialEq + Eq,
    T: Send,
    S: Sender<T>,
{
    // Shared with the channels; holds the waker they trigger after queueing work.
    state: Arc<State>,
    // Current subscriptions: at most one sender per key.
    registry: HashMap<K, S>,
    // Incoming commands from every `Channel` clone.
    recv: mpsc::Receiver<Message<K, T, S>>,
}
impl<K: PartialEq + Eq + Hash, T: Send, S: Sender<T>> Registry<K, T, S> {
    /// Creates a registry with no subscriptions that reads its commands from `recv`.
    #[inline(always)]
    fn new(state: Arc<State>, recv: mpsc::Receiver<Message<K, T, S>>) -> Self {
        Self {
            state,
            registry: HashMap::new(),
            recv,
        }
    }

    /// Drives the registry on the calling thread until the command channel is
    /// disconnected.
    ///
    /// The thread is parked whenever the queue is empty and is expected to be
    /// unparked through the thread-based waker built here. Returns [`Cancelled`]
    /// once the receiver reports that it is disconnected.
    pub fn run(&mut self) -> Cancelled {
        let waker = waker::thread::waker(std::thread::current());
        loop {
            match self.process(&waker) {
                task::Poll::Ready(error) => break error,
                // The loop re-checks the queue after every wake-up, so spurious
                // returns from `park` are harmless.
                task::Poll::Pending => std::thread::park(),
            }
        }
    }

    /// Drains every queued command, applying each to the subscription table.
    ///
    /// Returns `Poll::Ready(Cancelled)` when the channel is disconnected and
    /// `Poll::Pending` when the queue is empty. `Pending` is returned only after
    /// `waker` has been registered *and* the queue has been observed empty once
    /// more.
    fn process(&mut self, waker: &task::Waker) -> task::Poll<Cancelled> {
        // `true` while the most recent action was registering `waker`, i.e. the
        // next `Empty` is the confirming re-check rather than the first sighting.
        let mut registered = false;
        loop {
            let message = match self.recv.try_recv() {
                Ok(message) => message,
                Err(mpsc::TryRecvError::Disconnected) => return task::Poll::Ready(Cancelled),
                Err(mpsc::TryRecvError::Empty) if registered => return task::Poll::Pending,
                Err(mpsc::TryRecvError::Empty) => {
                    // A channel may send (or drop) and call `wake` between the
                    // `try_recv` above and this registration; that wake-up would
                    // hit a waker that is not there yet. Register first, then
                    // look at the queue again before reporting `Pending`.
                    // NOTE(review): relies on `AtomicWaker` ordering `wake` against
                    // `register_ref` the usual way -- confirm in the `waker` module.
                    self.state.waker.register_ref(waker);
                    registered = true;
                    continue;
                }
            };
            registered = false;

            match message {
                Message::Subscribe(key, channel) => {
                    // Replaces (and drops) any sender previously stored for `key`.
                    self.registry.insert(key, channel);
                }
                Message::Unsubscribe(key) => {
                    self.registry.remove(&key);
                }
                Message::Msg(key, payload) => match self.registry.entry(key) {
                    hash_map::Entry::Occupied(entry) => match entry.get().try_send(payload) {
                        Ok(()) => {}
                        Err(error) => match error.kind {
                            // The subscriber is gone: drop its entry together with
                            // the undelivered payload.
                            SendErrorKind::Closed => {
                                entry.remove();
                            }
                        },
                    },
                    // Nobody is subscribed under this key; the payload is discarded.
                    hash_map::Entry::Vacant(_) => {}
                },
            }
        }
    }
}
impl<K: PartialEq + Eq + Hash + Unpin, T: Send, S: Sender<T> + Unpin> Future for Registry<K, T, S> {
    type Output = Cancelled;

    /// Processes all queued commands, using the task's waker for the next wake-up.
    ///
    /// Completes with [`Cancelled`] once `process` reports that the command
    /// channel is disconnected.
    #[inline(always)]
    fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let this = self.get_mut();
        this.process(ctx.waker())
    }
}
/// Sending half: a cloneable handle that queues commands for the `Registry`.
pub struct Channel<K, T, S>
where
    K: PartialEq + Eq + Hash,
    T: Send,
    S: Sender<T>,
{
    // Shared with the registry; holds the waker triggered after each queued command.
    state: Arc<State>,
    // Wrapped in `ManuallyDrop` so the `Drop` impl can release the sender itself
    // before it wakes the registry.
    channel: ManuallyDrop<mpsc::Sender<Message<K, T, S>>>,
}
impl<K: PartialEq + Eq + Hash, T: Send, S: Sender<T>> Channel<K, T, S> {
fn send(&self, msg: Message<K, T, S>) -> Result<(), Cancelled> {
match self.channel.send(msg) {
Ok(()) => {
self.state.waker.wake();
Ok(())
},
Err(_) => Err(Cancelled)
}
}
#[inline(always)]
pub fn subscribe(&self, key: K, channel: S) -> Result<(), Cancelled> {
self.send(Message::Subscribe(key, channel))
}
#[inline(always)]
pub fn unsubscribe(&self, key: K) -> Result<(), Cancelled> {
self.send(Message::Unsubscribe(key))
}
#[inline(always)]
pub fn send_to(&self, key: K, msg: T) -> Result<(), Cancelled> {
self.send(Message::Msg(key, msg))
}
}
impl<K: PartialEq + Eq + Hash, T: Send, S: Sender<T>> Clone for Channel<K, T, S> {
#[inline(always)]
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
channel: self.channel.clone(),
}
}
}
impl<K: PartialEq + Eq + Hash, T: Send, S: Sender<T>> Drop for Channel<K, T, S> {
#[inline(always)]
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.channel)
}
if Arc::strong_count(&self.state) <= 2 {
self.state.waker.wake();
}
}
}
/// Creates a connected pair: a [`Channel`] handle for issuing commands and the
/// [`Registry`] that executes them.
///
/// Both halves share one `State` and the two ends of a fresh `mpsc` channel. The
/// registry does nothing until it is run or polled.
pub fn registry<K: PartialEq + Eq + Hash, T: Send, S: Sender<T>>() -> (Channel<K, T, S>, Registry<K, T, S>) {
    let state = Arc::new(State::new());
    let (sender, recv) = mpsc::channel();
    let registry = Registry::new(Arc::clone(&state), recv);
    let handle = Channel {
        state,
        channel: ManuallyDrop::new(sender),
    };
    (handle, registry)
}