1use alloc::boxed::Box;
5use alloc::string::{String, ToString};
6use core::any::{Any, TypeId};
7use hashbrown::HashMap;
8use spin::Mutex;
9
10use crate::channel::{self, Publisher, Subscribable, Subscriber};
11use crate::pod::Pod;
12
13struct TopicSlot {
16 type_id: TypeId,
17 type_name: &'static str,
18 inner: Box<dyn Any + Send + Sync>,
19}
20
21struct TypedEntry<T: Pod> {
23 subscribable: Subscribable<T>,
24 publisher: Option<Publisher<T>>,
25}
26
27unsafe impl<T: Pod> Send for TypedEntry<T> {}
32unsafe impl<T: Pod> Sync for TypedEntry<T> {}
33
34pub struct TypedBus {
61 topics: Mutex<HashMap<String, TopicSlot>>,
62 default_capacity: usize,
63}
64
65impl TypedBus {
66 pub fn new(capacity: usize) -> Self {
68 TypedBus {
69 topics: Mutex::new(HashMap::new()),
70 default_capacity: capacity,
71 }
72 }
73
74 pub fn publisher<T: Pod>(&self, topic: &str) -> Publisher<T> {
81 let mut topics = self.topics.lock();
82 if !topics.contains_key(topic) {
83 topics.insert(
84 topic.to_string(),
85 Self::make_slot::<T>(self.default_capacity),
86 );
87 }
88 let slot = topics.get_mut(topic).unwrap();
89 let entry = Self::downcast_mut::<T>(slot, topic);
90 entry
91 .publisher
92 .take()
93 .unwrap_or_else(|| panic!("publisher already taken for topic '{topic}'"))
94 }
95
96 pub fn try_publisher<T: Pod>(&self, topic: &str) -> Option<Publisher<T>> {
103 let mut topics = self.topics.lock();
104 if !topics.contains_key(topic) {
105 topics.insert(
106 topic.to_string(),
107 Self::make_slot::<T>(self.default_capacity),
108 );
109 }
110 let slot = topics.get_mut(topic).unwrap();
111 let entry = Self::downcast_mut::<T>(slot, topic);
112 entry.publisher.take()
113 }
114
115 pub fn subscribe<T: Pod>(&self, topic: &str) -> Subscriber<T> {
121 let mut topics = self.topics.lock();
122 if !topics.contains_key(topic) {
123 topics.insert(
124 topic.to_string(),
125 Self::make_slot::<T>(self.default_capacity),
126 );
127 }
128 let slot = topics.get_mut(topic).unwrap();
129 let entry = Self::downcast_mut::<T>(slot, topic);
130 entry.subscribable.subscribe()
131 }
132
133 pub fn subscribable<T: Pod>(&self, topic: &str) -> Subscribable<T> {
139 let mut topics = self.topics.lock();
140 if !topics.contains_key(topic) {
141 topics.insert(
142 topic.to_string(),
143 Self::make_slot::<T>(self.default_capacity),
144 );
145 }
146 let slot = topics.get_mut(topic).unwrap();
147 let entry = Self::downcast_mut::<T>(slot, topic);
148 entry.subscribable.clone()
149 }
150
151 fn downcast_mut<'a, T: Pod>(slot: &'a mut TopicSlot, topic: &str) -> &'a mut TypedEntry<T> {
154 let requested = TypeId::of::<T>();
155 if slot.type_id != requested {
156 panic!(
157 "topic '{topic}' exists with type '{}', cannot access as '{}'",
158 slot.type_name,
159 core::any::type_name::<T>(),
160 );
161 }
162 slot.inner
163 .downcast_mut::<TypedEntry<T>>()
164 .expect("TypeId matched but downcast failed (this is a bug)")
165 }
166
167 fn make_slot<T: Pod>(capacity: usize) -> TopicSlot {
168 let (pub_, sub_) = channel::channel::<T>(capacity);
169 TopicSlot {
170 type_id: TypeId::of::<T>(),
171 type_name: core::any::type_name::<T>(),
172 inner: Box::new(TypedEntry {
173 subscribable: sub_,
174 publisher: Some(pub_),
175 }),
176 }
177 }
178}