use crate::channel::{Receiver, Sender, SyncSend, channel};
use anyhow::Result;
use dashmap::DashMap;
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
hash::Hash,
};
use uuid::Uuid;
pub mod channel;
/// Requests that letterboxes queue on the post office's command channel.
///
/// Commands are not acted on when they are sent; the post office applies them
/// when it drains its command queue.
enum CommandMessage<M: Message> {
/// Publish the message under the topics it reports via `Message::topics`.
Publish(M),
/// Deliver the message to a single letterbox, identified by its address.
Direct {
/// Address of the letterbox that should receive the message.
target: Address,
/// The message to deliver.
message: M,
},
/// Request that the given address be subscribed to the given topic.
Subscribe(Address, M::Topic),
/// Request that the given address be unsubscribed from the given topic.
Unsubscribe(Address, M::Topic),
/// Request that the given address be removed from the post office.
Unregister(Address),
}
/// A value that can be routed through the post office.
///
/// Published messages are cloned once for every delivery, which is why
/// implementors must be `Clone`.
pub trait Message
where
    Self: Clone + Debug + Send + Sync + 'static,
{
    /// The topic type used to route this kind of message.
    type Topic: Topic;

    /// Returns the topics this message is published under.
    fn topics(&self) -> &'static [Self::Topic];
}
/// Marker trait for values usable as subscription topics.
///
/// Topics are used as hash-map keys, so they must be `Eq + Hash`.
pub trait Topic
where
    Self: Eq + Hash + Debug + Send + Sync + 'static,
{
}
/// Identifier of a letterbox within a post office; a thin wrapper around a `Uuid`.
///
/// It is hashable and comparable so it can key the registry and be stored in
/// subscriber sets.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct Address(pub Uuid);
/// A client endpoint that receives messages and, once registered with a
/// `Postoffice`, can publish, subscribe and send direct messages through it.
#[derive(Debug)]
pub struct Letterbox<M: Message> {
// Sending half of this letterbox's own channel; clones of it are handed to the
// post office registry and to peers that add this letterbox as a contact.
sender: Sender<M>,
// Receiving half of the same channel; every `recv*` method reads from it.
receiver: Receiver<M>,
// Address assigned by the post office; `None` until the letterbox is registered.
address: Option<Address>,
// Command channel to the post office; `None` until the letterbox is registered.
post_tx: Option<Sender<CommandMessage<M>>>,
// Alias -> sender of another letterbox, used for direct peer delivery.
contacts: HashMap<String, Sender<M>>,
}
impl<M: Message> Letterbox<M> {
    /// Creates an unregistered letterbox backed by a fresh channel.
    ///
    /// Peers can already add it as a contact and deliver to it directly, but it must be
    /// passed to [`Postoffice::register`] before any method that talks to the post office
    /// (`post`, `post_to`, `subscribe`, `unsubscribe`, `unregister`) is used.
    pub fn new() -> Self {
        let (sender, receiver) = channel();
        Self {
            sender,
            receiver,
            address: None,
            post_tx: None,
            contacts: HashMap::new(),
        }
    }

    /// Queues `message` for publication under its topics.
    ///
    /// The message is only routed once the post office processes its command queue.
    ///
    /// # Errors
    /// Returns an error if the command cannot be sent to the post office.
    ///
    /// # Panics
    /// Panics if the letterbox has not been registered.
    pub fn post(&mut self, message: M) -> Result<()> {
        let post_tx = self
            .post_tx
            .as_ref()
            .expect("Cannot post anything using an unregistered letterbox");
        post_tx.send_sync(CommandMessage::Publish(message))?;
        Ok(())
    }

    /// Queues `message` for delivery to the single letterbox registered under `address`.
    ///
    /// # Errors
    /// Returns an error if the command cannot be sent to the post office.
    ///
    /// # Panics
    /// Panics if the letterbox has not been registered.
    pub fn post_to(&mut self, address: Address, message: M) -> Result<()> {
        let post_tx = self
            .post_tx
            .as_ref()
            .expect("Cannot post anything using an unregistered letterbox");
        post_tx.send_sync(CommandMessage::Direct {
            target: address,
            message,
        })?;
        Ok(())
    }

    /// Returns every message that can be received right now, without waiting.
    ///
    /// Stops at the first failed `try_recv`, so the result is empty when nothing is queued.
    pub fn recv_all(&mut self) -> Vec<M> {
        std::iter::from_fn(|| self.receiver.try_recv().ok()).collect()
    }

    /// Waits for the next message.
    ///
    /// With the `tokio` feature the channel's result is returned as is; with only
    /// `async-std` a receive error is mapped to `None`.
    ///
    /// Note: when neither feature is enabled the body contains no return value, so one of
    /// the two features has to be active for this method to type-check.
    pub async fn recv(&mut self) -> Option<M> {
        #[cfg(feature = "tokio")]
        return self.receiver.recv().await;
        #[cfg(all(not(feature = "tokio"), feature = "async-std"))]
        return self.receiver.recv().await.ok();
    }

    /// Returns the next message if one can be received immediately, otherwise `None`.
    pub fn recv_now(&mut self) -> Option<M> {
        self.receiver.try_recv().ok()
    }

    /// Returns up to `limit` messages that can be received right now, without waiting.
    ///
    /// Fewer than `limit` messages are returned when the queue runs dry first; a `limit`
    /// of zero consumes nothing.
    pub fn recv_many(&mut self, limit: usize) -> Vec<M> {
        // Grow on demand instead of reserving `limit` slots up front: `limit` is only an
        // upper bound, and a huge value (e.g. `usize::MAX` meaning "everything") must not
        // cause a capacity-overflow panic or a pointless giant allocation.
        std::iter::from_fn(|| self.receiver.try_recv().ok())
            .take(limit)
            .collect()
    }

    /// Queues a request to subscribe this letterbox to `topic`.
    ///
    /// # Errors
    /// Returns an error if the command cannot be sent to the post office.
    ///
    /// # Panics
    /// Panics if the letterbox has not been registered.
    pub fn subscribe(&mut self, topic: M::Topic) -> Result<()> {
        let (post_tx, address) = self
            .post_tx
            .as_ref()
            .zip(self.address.clone())
            .expect("Cannot subscribe using an unregistered letterbox");
        post_tx.send_sync(CommandMessage::Subscribe(address, topic))?;
        Ok(())
    }

    /// Queues a request to unsubscribe this letterbox from `topic`.
    ///
    /// # Errors
    /// Returns an error if the command cannot be sent to the post office.
    ///
    /// # Panics
    /// Panics if the letterbox has not been registered.
    pub fn unsubscribe(&mut self, topic: M::Topic) -> Result<()> {
        let (post_tx, address) = self
            .post_tx
            .as_ref()
            .zip(self.address.clone())
            .expect("Cannot unsubscribe using an unregistered letterbox");
        post_tx.send_sync(CommandMessage::Unsubscribe(address, topic))?;
        Ok(())
    }

    /// Queues a request to remove this letterbox from the post office.
    ///
    /// Only the request is sent: the letterbox itself keeps its address and command
    /// channel, since this method takes `&self`.
    ///
    /// # Errors
    /// Returns an error if the command cannot be sent to the post office.
    ///
    /// # Panics
    /// Panics if the letterbox has not been registered.
    pub fn unregister(&self) -> Result<()> {
        // Same registration check as `subscribe`/`unsubscribe`, so a missing address is
        // reported with the descriptive message rather than a bare `unwrap` panic.
        let (post_tx, address) = self
            .post_tx
            .as_ref()
            .zip(self.address.clone())
            .expect("Cannot unregister a letterbox if it hasn't been registered yet");
        post_tx.send_sync(CommandMessage::Unregister(address))?;
        Ok(())
    }

    /// Remembers `letterbox` under `alias` so messages can be delivered to it directly.
    ///
    /// A contact previously stored under the same alias is replaced.
    pub fn add_contact(&mut self, letterbox: &Letterbox<M>, alias: String) {
        self.contacts.insert(alias, letterbox.sender.clone());
    }

    /// Sends `message` straight to the contact stored under `alias`, bypassing the post
    /// office.
    ///
    /// # Errors
    /// Returns an error if no contact is stored under `alias` or if sending fails.
    pub fn deliver_to(&self, alias: &str, message: M) -> Result<()> {
        let sender = self
            .contacts
            .get(alias)
            .ok_or_else(|| anyhow::anyhow!("Contact not found for alias: {}", alias))?;
        sender.send_sync(message)?;
        Ok(())
    }
}
impl<M: Message> Default for Letterbox<M> {
fn default() -> Self {
Self::new()
}
}
/// Central router: keeps track of registered letterboxes and topic subscriptions
/// and applies the commands that letterboxes queue on its command channel.
#[derive(Debug)]
pub struct Postoffice<M: Message> {
// Topic -> addresses subscribed to that topic.
subscriptions: DashMap<M::Topic, HashSet<Address>>,
// Address -> sender used to deliver messages to the letterbox with that address.
registry: DashMap<Address, Sender<M>>,
// Sending half of the command channel; cloned into each letterbox on registration.
post_tx: Sender<CommandMessage<M>>,
// Receiving half of the command channel; drained by `tick` / `tick_async`.
post_rx: Receiver<CommandMessage<M>>,
}
impl<M: Message> Postoffice<M> {
    /// Creates a post office with no registered letterboxes and no subscriptions.
    pub fn new() -> Self {
        let (post_tx, post_rx) = channel();
        Self {
            subscriptions: DashMap::new(),
            registry: DashMap::new(),
            post_tx,
            post_rx,
        }
    }

    /// Registers `mailbox` under a freshly generated address and gives it a clone of the
    /// command channel.
    ///
    /// Registering a letterbox that already has an address assigns it a new one; the
    /// registry entry for the previous address is not removed.
    pub fn register(&self, mailbox: &mut Letterbox<M>) {
        let address = Address(Uuid::new_v4());
        self.registry
            .insert(address.clone(), mailbox.sender.clone());
        mailbox.address = Some(address);
        mailbox.post_tx = Some(self.post_tx.clone());
    }

    /// Applies every command that is queued right now, without waiting for new ones.
    pub fn tick(&mut self) {
        while let Ok(command) = self.post_rx.try_recv() {
            self.handle_command(command);
        }
    }

    /// Applies commands as they arrive, awaiting the next one after each.
    ///
    /// The loop only ends when the channel reports that nothing more can be received.
    /// NOTE(review): the post office keeps its own sender (`post_tx`), so the channel
    /// presumably never closes while it is alive and this future runs until dropped —
    /// confirm against the channel implementation.
    ///
    /// When neither the `tokio` nor the `async-std` feature is enabled the body is empty
    /// and the future completes immediately.
    pub async fn tick_async(&mut self) {
        #[cfg(feature = "tokio")]
        {
            while let Some(command) = self.post_rx.recv().await {
                self.handle_command(command);
            }
        }
        #[cfg(all(feature = "async-std", not(feature = "tokio")))]
        {
            while let Ok(command) = self.post_rx.recv().await {
                self.handle_command(command);
            }
        }
    }

    /// Applies a single command to the registry and subscription table.
    ///
    /// Delivery is best-effort: a failed send to a letterbox is ignored.
    fn handle_command(&mut self, cmd_message: CommandMessage<M>) {
        match cmd_message {
            CommandMessage::Publish(message) => {
                // One pass per topic: a letterbox subscribed to several of the message's
                // topics receives one copy for each matching topic.
                for topic in message.topics() {
                    if let Some(subscribers) = self.subscriptions.get(topic) {
                        for address in subscribers.iter() {
                            if let Some(sender) = self.registry.get(address) {
                                let _ = sender.send_sync(message.clone());
                            }
                        }
                    }
                }
            }
            CommandMessage::Direct { target, message } => {
                // Unknown targets are silently skipped.
                if let Some(sender) = self.registry.get(&target) {
                    let _ = sender.send_sync(message);
                }
            }
            CommandMessage::Subscribe(address, topic) => {
                // Only registered addresses may subscribe. A letterbox keeps its address
                // and command channel after `unregister`, so without this check a late
                // `subscribe` would leave an undeliverable address in the table forever.
                if self.registry.contains_key(&address) {
                    self.subscriptions
                        .entry(topic)
                        .or_default()
                        .value_mut()
                        .insert(address);
                }
            }
            CommandMessage::Unsubscribe(address, topic) => {
                if let Some(mut subscribers) = self.subscriptions.get_mut(&topic) {
                    subscribers.value_mut().remove(&address);
                }
                // The guard from `get_mut` is released by now, so the map can be touched
                // again. Drop the topic once nobody is subscribed, matching the pruning
                // done for `Unregister`.
                self.subscriptions
                    .remove_if(&topic, |_, subscribers| subscribers.is_empty());
            }
            CommandMessage::Unregister(address) => {
                self.registry.remove(&address);
                // Remove the address from every topic and drop topics left without
                // subscribers.
                self.subscriptions.retain(|_, subscribers| {
                    subscribers.remove(&address);
                    !subscribers.is_empty()
                });
            }
        }
    }
}
impl<M: Message> Default for Postoffice<M> {
fn default() -> Self {
Self::new()
}
}
// Property-based tests that drive the post office and letterboxes through the
// synchronous API (`tick`, `recv_all`, `recv_many`, `recv_now`).
#[cfg(test)]
mod proptests {
use super::*;
use proptest::prelude::*;
// Topic type used by the test messages.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TestTopic {
Alpha,
Beta,
Gamma,
}
impl Topic for TestTopic {}
// Fixed topic lists a test message can carry; tests pick one by index.
static TOPIC_COMBOS: &[&[TestTopic]] = &[
&[TestTopic::Alpha],
&[TestTopic::Beta],
&[TestTopic::Alpha, TestTopic::Beta],
&[TestTopic::Alpha, TestTopic::Gamma],
&[TestTopic::Alpha, TestTopic::Beta, TestTopic::Gamma],
];
// Message carrying one of the topic lists above and an arbitrary payload.
#[derive(Debug, Clone, PartialEq)]
pub struct TestMessage {
pub topics: &'static [TestTopic],
pub payload: u64,
}
impl Message for TestMessage {
type Topic = TestTopic;
fn topics(&self) -> &'static [Self::Topic] {
self.topics
}
}
proptest! {
// A subscriber receives one copy of a published message for every topic that
// is both carried by the message and subscribed to.
#[test]
fn test_hub_pubsub_intersection(
combo_idx in 0..TOPIC_COMBOS.len(),
payload in any::<u64>(),
sub_alpha in any::<bool>(),
sub_beta in any::<bool>(),
sub_gamma in any::<bool>(),
) {
let mut hub = Postoffice::new();
let mut publisher = Letterbox::new();
let mut subscriber = Letterbox::new();
hub.register(&mut publisher);
hub.register(&mut subscriber);
let selected_topics = TOPIC_COMBOS[combo_idx];
if sub_alpha { subscriber.subscribe(TestTopic::Alpha).unwrap(); }
if sub_beta { subscriber.subscribe(TestTopic::Beta).unwrap(); }
if sub_gamma { subscriber.subscribe(TestTopic::Gamma).unwrap(); }
// Subscriptions are queued commands; apply them before publishing.
hub.tick();
let msg = TestMessage { topics: selected_topics, payload };
publisher.post(msg.clone()).unwrap();
hub.tick();
// Count the message's topics that the subscriber is subscribed to.
let mut expected_copies = 0;
for topic in selected_topics {
match topic {
TestTopic::Alpha if sub_alpha => expected_copies += 1,
TestTopic::Beta if sub_beta => expected_copies += 1,
TestTopic::Gamma if sub_gamma => expected_copies += 1,
_ => {}
}
}
let received = subscriber.recv_all();
prop_assert_eq!(received.len(), expected_copies);
for fetched_msg in received {
prop_assert_eq!(fetched_msg, msg.clone());
}
}
// A direct message routed through the hub and a peer-to-peer delivery via a
// contact alias both reach the target, in sending order, and nothing is echoed
// back to the sender.
#[test]
fn test_targeted_and_peer_routing(
payload_hub in any::<u64>(),
payload_p2p in any::<u64>(),
alias in "[a-zA-Z0-9_]{1,15}"
) {
let mut hub = Postoffice::new();
let mut node_a = Letterbox::new();
let mut node_b = Letterbox::new();
hub.register(&mut node_a);
hub.register(&mut node_b);
let target_addr = node_b.address.clone().unwrap();
let hub_msg = TestMessage { topics: TOPIC_COMBOS[0], payload: payload_hub };
node_a.post_to(target_addr, hub_msg.clone()).unwrap();
// Route the direct message before the peer delivery so it arrives first.
hub.tick();
node_a.add_contact(&node_b, alias.clone());
let p2p_msg = TestMessage { topics: TOPIC_COMBOS[0], payload: payload_p2p };
// Peer delivery goes straight to node_b's channel; no tick is needed.
node_a.deliver_to(&alias, p2p_msg.clone()).unwrap();
let received = node_b.recv_all();
prop_assert_eq!(received.len(), 2);
prop_assert_eq!(received[0].clone(), hub_msg);
prop_assert_eq!(received[1].clone(), p2p_msg);
prop_assert!(node_a.recv_all().is_empty());
}
// After unsubscribing or unregistering, a letterbox no longer receives
// messages published on the topic it had subscribed to.
#[test]
fn test_lifecycle_eviction(payload in any::<u64>()) {
let mut hub = Postoffice::new();
let mut publisher = Letterbox::new();
let mut sub_unsub = Letterbox::new();
let mut sub_unreg = Letterbox::new();
hub.register(&mut publisher);
hub.register(&mut sub_unsub);
hub.register(&mut sub_unreg);
sub_unsub.subscribe(TestTopic::Alpha).unwrap();
sub_unreg.subscribe(TestTopic::Alpha).unwrap();
hub.tick();
sub_unsub.unsubscribe(TestTopic::Alpha).unwrap();
sub_unreg.unregister().unwrap();
hub.tick();
// TOPIC_COMBOS[0] is the Alpha-only topic list.
let msg = TestMessage { topics: TOPIC_COMBOS[0], payload };
publisher.post(msg).unwrap();
hub.tick();
prop_assert!(sub_unsub.recv_all().is_empty());
prop_assert!(sub_unreg.recv_all().is_empty());
}
// `recv_many`, `recv_now` and `recv_all` each consume exactly the messages they
// return, so together they account for every published message.
#[test]
fn test_mailbox_consumption_boundaries(
payloads in prop::collection::vec(any::<u64>(), 5..20),
limit in 1..4usize
) {
let mut hub = Postoffice::new();
let mut publisher = Letterbox::new();
let mut subscriber = Letterbox::new();
hub.register(&mut publisher);
hub.register(&mut subscriber);
subscriber.subscribe(TestTopic::Beta).unwrap();
hub.tick();
let total_messages = payloads.len();
// TOPIC_COMBOS[1] is the Beta-only topic list, so each post yields one copy.
for p in payloads {
publisher.post(TestMessage { topics: TOPIC_COMBOS[1], payload: p }).unwrap();
}
hub.tick();
// At least 5 messages are queued and limit is at most 3, so the batch is full.
let batch = subscriber.recv_many(limit);
prop_assert_eq!(batch.len(), limit);
let single = subscriber.recv_now();
prop_assert!(single.is_some());
let remaining = subscriber.recv_all();
prop_assert_eq!(remaining.len(), total_messages - limit - 1);
}
}
}