use alloc::boxed::Box;
use alloc::string::String;
use core::any::{Any, TypeId};
use hashbrown::HashMap;
use spin::Mutex;
use crate::channel::{self, Publisher, Subscribable, Subscriber};
use crate::pod::Pod;
/// Type-erased storage for one topic on the bus.
///
/// The payload type is erased behind `dyn Any` so that topics carrying
/// different message types can share a single map; `type_id` is what lets the
/// bus verify a typed request before downcasting `inner`.
struct TopicSlot {
// `TypeId` of the payload type `T` the topic was created with.
type_id: TypeId,
// Name of that payload type; only used to build the type-mismatch panic message.
type_name: &'static str,
// Boxed `TypedEntry<T>` for the payload type recorded in `type_id`.
inner: Box<dyn Any + Send + Sync>,
}
/// Concrete per-topic state for payload type `T`, stored type-erased inside
/// `TopicSlot::inner`.
struct TypedEntry<T: Pod> {
// Subscribable half returned by `channel::channel`; cloned or subscribed to
// on every subscriber-side request.
subscribable: Subscribable<T>,
// Publisher half returned by `channel::channel`. Starts as `Some` and is
// moved out by the first publisher request, leaving `None` afterwards.
publisher: Option<Publisher<T>>,
}
// These impls are required so a `TypedEntry<T>` can be boxed as
// `dyn Any + Send + Sync` in `TopicSlot::inner`.
//
// SAFETY: NOTE(review): no justification is visible in this file. Soundness
// presumably relies on `Subscribable<T>` and `Publisher<T>` (and `T: Pod`)
// being safe to move and share across threads — confirm against the channel
// implementation. If those types are already `Send + Sync`, these impls are
// redundant and should be removed so the compiler checks the property instead.
unsafe impl<T: Pod> Send for TypedEntry<T> {}
unsafe impl<T: Pod> Sync for TypedEntry<T> {}
/// A registry of named topics, each bound to a single payload type.
///
/// Topics are created lazily on first request. Once a topic exists, every
/// later request for it must use the same payload type.
pub struct TypedBus {
// Topic name -> type-erased slot, guarded by a spin lock.
topics: Mutex<HashMap<String, TopicSlot>>,
// Capacity passed to `channel::channel` whenever a new topic is created.
default_capacity: usize,
}
impl TypedBus {
    /// Creates an empty bus.
    ///
    /// `capacity` is passed to `channel::channel` for every topic this bus
    /// creates. No topic exists until one is first requested.
    pub fn new(capacity: usize) -> Self {
        TypedBus {
            topics: Mutex::new(HashMap::new()),
            default_capacity: capacity,
        }
    }

    /// Takes the publisher for `topic`, creating the topic with payload type
    /// `T` if it does not exist yet.
    ///
    /// The publisher is handed out at most once per topic; use
    /// [`TypedBus::try_publisher`] for a non-panicking check of that condition.
    ///
    /// # Panics
    ///
    /// * If `topic` already exists with a payload type other than `T`.
    /// * If the publisher for `topic` has already been taken.
    pub fn publisher<T: Pod>(&self, topic: &str) -> Publisher<T> {
        // Delegating keeps the lookup logic in one place and means the
        // "already taken" panic is raised after the topics lock is released.
        self.try_publisher::<T>(topic)
            .unwrap_or_else(|| panic!("publisher already taken for topic '{topic}'"))
    }

    /// Takes the publisher for `topic` if it is still available, creating the
    /// topic with payload type `T` if it does not exist yet.
    ///
    /// Returns `None` when the publisher has already been taken.
    ///
    /// # Panics
    ///
    /// If `topic` already exists with a payload type other than `T`. Only the
    /// "already taken" case is reported through the return value.
    pub fn try_publisher<T: Pod>(&self, topic: &str) -> Option<Publisher<T>> {
        self.with_entry(topic, |entry: &mut TypedEntry<T>| entry.publisher.take())
    }

    /// Returns a new subscriber for `topic`, creating the topic with payload
    /// type `T` if it does not exist yet.
    ///
    /// # Panics
    ///
    /// If `topic` already exists with a payload type other than `T`.
    pub fn subscribe<T: Pod>(&self, topic: &str) -> Subscriber<T> {
        self.with_entry(topic, |entry: &mut TypedEntry<T>| {
            entry.subscribable.subscribe()
        })
    }

    /// Returns a clone of the `Subscribable` handle for `topic`, creating the
    /// topic with payload type `T` if it does not exist yet.
    ///
    /// # Panics
    ///
    /// If `topic` already exists with a payload type other than `T`.
    pub fn subscribable<T: Pod>(&self, topic: &str) -> Subscribable<T> {
        self.with_entry(topic, |entry: &mut TypedEntry<T>| {
            entry.subscribable.clone()
        })
    }

    /// Runs `f` on the typed entry for `topic`, creating the topic with
    /// payload type `T` first if it is absent.
    ///
    /// The topics lock is held for the whole call, so `f` must not call back
    /// into this bus.
    ///
    /// # Panics
    ///
    /// If `topic` already exists with a payload type other than `T`.
    fn with_entry<T, R, F>(&self, topic: &str, f: F) -> R
    where
        T: Pod,
        F: FnOnce(&mut TypedEntry<T>) -> R,
    {
        let mut topics = self.topics.lock();
        // `entry_ref` only allocates the owned `String` key when the topic
        // actually has to be inserted.
        let slot = topics
            .entry_ref(topic)
            .or_insert_with(|| Self::make_slot::<T>(self.default_capacity));
        f(Self::downcast_mut::<T>(slot, topic))
    }

    /// Recovers the concrete `TypedEntry<T>` stored in `slot`.
    ///
    /// # Panics
    ///
    /// If the slot was created for a payload type other than `T`; the message
    /// names both the stored and the requested type.
    fn downcast_mut<'a, T: Pod>(slot: &'a mut TopicSlot, topic: &str) -> &'a mut TypedEntry<T> {
        let requested = TypeId::of::<T>();
        if slot.type_id != requested {
            panic!(
                "topic '{topic}' exists with type '{}', cannot access as '{}'",
                slot.type_name,
                core::any::type_name::<T>(),
            );
        }
        // The explicit check above exists to produce a readable message; once
        // it passes, the boxed value is a `TypedEntry<T>` by construction in
        // `make_slot`.
        slot.inner
            .downcast_mut::<TypedEntry<T>>()
            .expect("TypeId matched but downcast failed (this is a bug)")
    }

    /// Builds a fresh slot for payload type `T`, backed by a new channel
    /// created with the given `capacity`.
    fn make_slot<T: Pod>(capacity: usize) -> TopicSlot {
        let (pub_, sub_) = channel::channel::<T>(capacity);
        TopicSlot {
            type_id: TypeId::of::<T>(),
            type_name: core::any::type_name::<T>(),
            inner: Box::new(TypedEntry {
                subscribable: sub_,
                publisher: Some(pub_),
            }),
        }
    }
}