use anyhow::Result;
use papaya::{HashMap, HashSet};
use std::{fmt::Debug, hash::Hash};
use tokio::sync::mpsc::{self, Receiver, Sender};
enum CommandMessage<A: Address, M: Message> {
Publish(M),
Subscribe(A, M::Topic),
Unsubscribe(A, M::Topic),
Unregister(A),
}
pub trait Message: Clone + Debug + Send + Sync + 'static {
type Topic: Topic;
fn topics(&self) -> &'static [Self::Topic];
}
pub trait Topic: Eq + Hash + Debug + Send + Sync + 'static {}
pub trait Address: Clone + Debug + Hash + Eq + PartialEq + Send + Sync + 'static {}
impl<T> Address for T where T: Clone + Debug + Hash + Eq + PartialEq + Send + Sync + 'static {}
#[derive(Debug)]
pub struct Letterbox<A: Address, M: Message> {
sender: mpsc::Sender<M>,
receiver: mpsc::Receiver<M>,
address: Option<A>,
post_tx: Option<Sender<CommandMessage<A, M>>>,
contacts: HashMap<A, mpsc::Sender<M>>,
}
impl<A: Address, M: Message> Letterbox<A, M> {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel(32);
Self {
sender,
receiver,
address: None,
post_tx: None,
contacts: HashMap::new(),
}
}
pub fn post(&mut self, message: M) -> Result<()> {
self.post_tx
.as_ref()
.expect("Cannot post anything using an unregistered letterbox")
.try_send(CommandMessage::Publish(message))?;
Ok(())
}
pub fn recv_all(&mut self) -> Vec<M> {
let mut messages = Vec::new();
while let Ok(message) = self.receiver.try_recv() {
messages.push(message);
}
messages
}
pub async fn recv(&mut self) -> Option<M> {
return self.receiver.recv().await;
}
pub fn recv_now(&mut self) -> Option<M> {
self.receiver.try_recv().ok()
}
pub fn recv_many(&mut self, limit: usize) -> Vec<M> {
let mut out = Vec::with_capacity(limit);
for _ in 0..limit {
match self.receiver.try_recv() {
Ok(msg) => out.push(msg),
Err(_) => break,
}
}
out
}
pub fn subscribe(&mut self, topic: M::Topic) -> Result<()> {
let (post_tx, address) = self
.post_tx
.as_ref()
.zip(self.address.clone())
.expect("Cannot subscribe using an unregistered letterbox");
post_tx.try_send(CommandMessage::Subscribe(address, topic))?;
Ok(())
}
pub fn unsubscribe(&mut self, topic: M::Topic) -> Result<()> {
let (post_tx, address) = self
.post_tx
.as_ref()
.zip(self.address.clone())
.expect("Cannot unsubscribe using an unregistered letterbox");
post_tx.try_send(CommandMessage::Unsubscribe(address, topic))?;
Ok(())
}
pub fn unregister(&self) -> Result<()> {
self.post_tx
.as_ref()
.expect("Cannot unregister a letterbox if it hasn't been registered yet")
.try_send(CommandMessage::Unregister(self.address.clone().unwrap()))?;
Ok(())
}
pub fn add_contact(&mut self, letterbox: &Letterbox<A, M>, address: A) {
self.contacts
.pin()
.insert(address, letterbox.sender.clone());
}
pub fn post_to(&self, address: A, message: M) -> Result<()> {
let contacts = self.contacts.pin();
let sender = contacts
.get(&address)
.ok_or_else(|| anyhow::anyhow!("Contact not found for alias: {:?}", address))?;
sender.try_send(message)?;
Ok(())
}
}
impl<A: Address, M: Message> Default for Letterbox<A, M> {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct Postoffice<A: Address, M: Message> {
subscriptions: HashMap<M::Topic, HashSet<A>>,
registry: HashMap<A, Sender<M>>,
post_tx: Sender<CommandMessage<A, M>>,
post_rx: Receiver<CommandMessage<A, M>>,
}
impl<A: Address, M: Message> Postoffice<A, M> {
pub fn new() -> Self {
let (post_tx, post_rx) = mpsc::channel(128);
Self {
subscriptions: HashMap::new(),
registry: HashMap::new(),
post_tx,
post_rx,
}
}
pub fn register(&self, mailbox: &mut Letterbox<A, M>, address: &A) {
self.registry
.pin()
.insert(address.clone(), mailbox.sender.clone());
mailbox.address = Some(address.clone());
mailbox.post_tx = Some(self.post_tx.clone());
}
pub fn tick(&mut self) {
while let Ok(cmd_message) = self.post_rx.try_recv() {
self.handle_command(cmd_message);
}
}
pub async fn tick_async(&mut self) {
while let Some(cmd_message) = self.post_rx.recv().await {
self.handle_command(cmd_message);
}
}
fn handle_command(&mut self, cmd_message: CommandMessage<A, M>) {
match cmd_message {
CommandMessage::Publish(message) => {
for topic in message.topics() {
if let Some(addresses) = self.subscriptions.pin().get(topic) {
for address in addresses.pin().iter() {
if let Some(sender) = self.registry.pin().get(address) {
let _ = sender.try_send(message.clone());
}
}
}
}
}
CommandMessage::Subscribe(address, topic) => {
let subscriptions = self.subscriptions.pin();
let subscribers = subscriptions.get_or_insert_with(topic, HashSet::new);
subscribers.pin().insert(address);
}
CommandMessage::Unsubscribe(address, d) => {
if let Some(subscribers) = self.subscriptions.pin().get(&d) {
subscribers.pin().remove(&address);
}
}
CommandMessage::Unregister(address) => {
self.registry.pin().remove(&address);
self.subscriptions.pin().retain(|_, set| {
set.pin().remove(&address);
!set.is_empty()
});
}
}
}
}
impl<A: Address, M: Message> Default for Postoffice<A, M> {
fn default() -> Self {
Self::new()
}
}