#![cfg_attr(not(test), no_std)]
use array_macro::array;
use core::marker::PhantomData;
use core::sync::atomic::{AtomicU32, Ordering};
use core::fmt::Debug;
use spms_ring::{ReadToken, SpmsRing};
/// Numeric identifier of a publisher; stored in [`Advertisement::advertiser_id`] and used by
/// [`Broker`] to select that publisher's queue.
pub type PublisherId = u32;
/// Identifier that [`Broker::subscribe`] accepts without checking that a publisher is registered.
///
/// NOTE(review): this has the same numeric value (0) as the id handed out by the first successful
/// [`Broker::advertise`] call, so such a subscription reads the queue in slot 0 — confirm that
/// this overlap is intended.
pub const ANY_PUBLISHER: PublisherId = 0;
/// Number of publisher slots (and queues) each [`Broker`] holds; [`Broker::advertise`] returns
/// `None` once this many publishers are registered.
pub const MAX_PUBLISHERS_PER_TOPIC: u32 = 4;
/// Compile-time description of a topic.
///
/// Implementors are used as type-level markers: [`Broker`], [`Advertisement`] and
/// [`Subscription`] are all parameterised by a type implementing this trait.
pub trait TopicMeta {
    /// Name of the topic.
    const TOPIC: &'static str;

    /// Type of the messages exchanged on the topic.
    type MsgType;
}
/// Handle identifying one publisher of topic `M`.
///
/// `Copy` and `Clone` are derived. Hashing, equality and ordering are implemented by hand and
/// look only at [`advertiser_id`](Advertisement::advertiser_id): the derived versions would
/// additionally require `M: Hash`, `M: PartialEq`, … even though no value of `M` is stored,
/// which makes the handle unusable as a key for marker types that do not implement them.
#[derive(Copy, Clone)]
pub struct Advertisement<M>
where
    M: TopicMeta + Copy,
{
    /// Id of the publisher this handle refers to.
    pub advertiser_id: PublisherId,
    /// Zero-sized marker tying the handle to topic `M`.
    meta: PhantomData<*const M>,
}

impl<M> core::hash::Hash for Advertisement<M>
where
    M: TopicMeta + Copy,
{
    /// Feeds only the publisher id into the hasher (the marker contributes nothing).
    fn hash<H: core::hash::Hasher>(&self, state: &mut H) {
        core::hash::Hash::hash(&self.advertiser_id, state);
    }
}

impl<M> PartialEq for Advertisement<M>
where
    M: TopicMeta + Copy,
{
    /// Two handles are equal when they carry the same publisher id.
    fn eq(&self, other: &Self) -> bool {
        self.advertiser_id == other.advertiser_id
    }
}

impl<M> Eq for Advertisement<M> where M: TopicMeta + Copy {}

impl<M> PartialOrd for Advertisement<M>
where
    M: TopicMeta + Copy,
{
    /// Orders handles by publisher id.
    // The return type is spelled out in full because this file already imports the atomic
    // `Ordering` under that name.
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        self.advertiser_id.partial_cmp(&other.advertiser_id)
    }
}
impl<M> Advertisement<M>
where
M: TopicMeta + Copy,
{
fn new(instance: PublisherId) -> Self {
Self {
advertiser_id: instance,
meta: PhantomData,
}
}
}
/// Per-subscriber state for topic `M`, created by [`Broker::subscribe`] and passed to
/// [`Broker::poll`].
pub struct Subscription<M: TopicMeta + Copy> {
    /// Identifies the publisher whose queue this subscription reads from.
    pub(crate) advert: Advertisement<M>,
    /// Token handed to the queue on every read; presumably tracks this subscriber's read
    /// position — confirm against `spms_ring`.
    pub(crate) read_token: ReadToken,
}
/// Size type parameter given to every per-publisher `SpmsRing` in [`Broker`]; presumably the
/// ring capacity (16 entries) — confirm against `spms_ring`.
type DefaultQueueSize = generic_array::typenum::U16;
/// Broker for a single topic `M`: owns one message queue per publisher slot and counts how many
/// slots have been handed out.
pub struct Broker<M: TopicMeta + Default + Copy> {
    /// Number of publishers registered so far; ids below this value are considered valid.
    advertiser_count: AtomicU32,
    /// One queue per publisher slot, indexed by publisher id.
    topic_queues: [SpmsRing<M::MsgType, DefaultQueueSize>; MAX_PUBLISHERS_PER_TOPIC as usize],
}
impl<M> Broker<M>
where
    M: TopicMeta + Default + Copy,
    <M as TopicMeta>::MsgType: Default + Copy + Debug,
{
    /// Creates a broker with no registered publishers and one default-constructed queue per
    /// publisher slot.
    pub fn new() -> Self {
        Self {
            advertiser_count: AtomicU32::new(0),
            topic_queues: array![SpmsRing::default(); MAX_PUBLISHERS_PER_TOPIC as usize],
        }
    }

    /// Creates a subscription to the publisher with id `instance`.
    ///
    /// Returns `None` when `instance` is not [`ANY_PUBLISHER`] and no publisher with that id has
    /// been registered. [`ANY_PUBLISHER`] is accepted unconditionally; the resulting
    /// subscription carries that value as its publisher id.
    pub fn subscribe(&mut self, instance: PublisherId) -> Option<Subscription<M>> {
        if instance != ANY_PUBLISHER && instance >= self.advertiser_count.load(Ordering::SeqCst) {
            return None;
        }
        Some(Subscription {
            advert: Advertisement::new(instance),
            read_token: Default::default(),
        })
    }

    /// Registers a new publisher and returns its handle.
    ///
    /// Ids are handed out sequentially starting at 0. Returns `None`, leaving the count
    /// untouched, once [`MAX_PUBLISHERS_PER_TOPIC`] publishers are registered.
    pub fn advertise(&mut self) -> Option<Advertisement<M>> {
        // `&mut self` guarantees exclusive access, so the counter can be updated in place: no
        // atomic read-modify-write is needed and the count never exceeds the limit, not even
        // transiently.
        let registered = self.advertiser_count.get_mut();
        if *registered < MAX_PUBLISHERS_PER_TOPIC {
            let publisher_id = *registered;
            *registered += 1;
            Some(Advertisement::new(publisher_id))
        } else {
            None
        }
    }

    /// Publishes `msg` on the queue belonging to `advert`.
    ///
    /// The message is silently dropped when `advert` does not refer to a registered publisher.
    pub fn publish(&mut self, advert: &Advertisement<M>, msg: &M::MsgType) {
        if advert.advertiser_id < self.advertiser_count.load(Ordering::Relaxed) {
            self.topic_queues[advert.advertiser_id as usize].publish(msg);
        }
    }

    /// Returns `true` when a publisher with id `instance` has been registered.
    pub fn pub_registered(&self, instance: PublisherId) -> bool {
        instance < self.advertiser_count.load(Ordering::Relaxed)
    }

    /// Returns the number of publishers registered so far.
    pub fn group_count(&self) -> u32 {
        self.advertiser_count.load(Ordering::Relaxed)
    }

    /// Reads the next message for `sub` from its publisher's queue.
    ///
    /// Forwards the result of the queue's `read_next`. Returns `Err(nb::Error::Other(()))`
    /// when the subscription's publisher id is not (yet) registered.
    pub fn poll(&self, sub: &mut Subscription<M>) -> nb::Result<M::MsgType, ()> {
        let publisher_id = sub.advert.advertiser_id;
        if publisher_id < self.advertiser_count.load(Ordering::Relaxed) {
            self.topic_queues[publisher_id as usize].read_next(&mut sub.read_token)
        } else {
            Err(nb::Error::Other(()))
        }
    }
}

impl<M> Default for Broker<M>
where
    M: TopicMeta + Default + Copy,
    <M as TopicMeta>::MsgType: Default + Copy + Debug,
{
    /// Equivalent to [`Broker::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::{Broker, TopicMeta, ANY_PUBLISHER};

    /// Message payload shared by both test topics.
    #[derive(Default, Copy, Clone, Debug)]
    struct Point {
        x: u32,
        y: u32,
    }

    /// Marker type for the "rover" topic.
    #[derive(Default, Copy, Clone, Debug)]
    struct RoverLocation {}

    impl TopicMeta for RoverLocation {
        type MsgType = Point;
        const TOPIC: &'static str = "rover";
    }

    /// Marker type for the "home" topic.
    #[derive(Default, Copy, Clone, Debug)]
    struct HomeLocation {}

    impl TopicMeta for HomeLocation {
        type MsgType = Point;
        const TOPIC: &'static str = "home";
    }

    /// A single publisher and a single subscriber exchange messages in order.
    #[test]
    fn setup_pubsub() {
        let mut broker = Broker::<HomeLocation>::new();
        let advert = broker.advertise().unwrap();
        assert!(broker.pub_registered(0));
        let mut subscription = broker.subscribe(ANY_PUBLISHER).unwrap();

        for step in 0..5 {
            broker.publish(&advert, &Point { x: step, y: step });

            let polled = broker.poll(&mut subscription);
            assert!(polled.is_ok());
            let received = polled.unwrap();
            assert_eq!(received.x, step);
            assert_eq!(received.y, step);
        }
    }

    /// Brokers for different topics stay independent even when the message type is shared.
    #[test]
    fn two_queues_same_inner_type_diff_topics() {
        let mut home_broker = Broker::<HomeLocation>::new();
        let home_advert = home_broker.advertise().unwrap();
        let mut rover_broker = Broker::<RoverLocation>::new();
        let mut rover_subscription = rover_broker.subscribe(ANY_PUBLISHER).unwrap();

        for step in 0..5 {
            home_broker.publish(&home_advert, &Point { x: step, y: step });

            let polled = rover_broker.poll(&mut rover_subscription);
            assert!(polled.is_err());
        }
    }

    /// A subscriber bound to the second publisher only sees that publisher's messages.
    #[test]
    fn two_queues_same_topic() {
        let mut broker = Broker::<HomeLocation>::new();
        let first_advert = broker.advertise().unwrap();
        let second_advert = broker.advertise().unwrap();
        assert!(broker.pub_registered(0));
        assert!(broker.pub_registered(1));
        let mut second_subscription = broker.subscribe(second_advert.advertiser_id).unwrap();

        for step in 0..5 {
            let msg = Point { x: step, y: step };

            broker.publish(&first_advert, &msg);
            let after_first = broker.poll(&mut second_subscription);
            assert!(after_first.is_err());

            broker.publish(&second_advert, &msg);
            let after_second = broker.poll(&mut second_subscription);
            assert!(after_second.is_ok());
        }
    }
}