1use alloc::boxed::Box;
5use alloc::string::{String, ToString};
6use core::any::{Any, TypeId};
7use hashbrown::HashMap;
8use spin::Mutex;
9
10use crate::channel::{self, Publisher, Subscribable, Subscriber};
11
/// Type-erased storage for a single topic.
///
/// The concrete payload behind `inner` is a `TypedEntry<T>`; `type_id` and
/// `type_name` record which `T` the slot was created with so that a later
/// access with a different type can be detected and reported.
struct TopicSlot {
    /// `TypeId` of the element type the topic was created with.
    type_id: TypeId,
    /// Human-readable name of that element type, used in panic messages.
    type_name: &'static str,
    /// The boxed, type-erased entry for this topic.
    inner: Box<dyn Any + Send + Sync>,
}
19
20struct TypedEntry<T: Copy + Send + 'static> {
22 subscribable: Subscribable<T>,
23 publisher: Option<Publisher<T>>,
24}
25
26unsafe impl<T: Copy + Send + 'static> Send for TypedEntry<T> {}
31unsafe impl<T: Copy + Send + 'static> Sync for TypedEntry<T> {}
32
33pub struct TypedBus {
60 topics: Mutex<HashMap<String, TopicSlot>>,
61 default_capacity: usize,
62}
63
64impl TypedBus {
65 pub fn new(capacity: usize) -> Self {
67 TypedBus {
68 topics: Mutex::new(HashMap::new()),
69 default_capacity: capacity,
70 }
71 }
72
73 pub fn publisher<T: Copy + Send + 'static>(&self, topic: &str) -> Publisher<T> {
80 let mut topics = self.topics.lock();
81 if !topics.contains_key(topic) {
82 topics.insert(
83 topic.to_string(),
84 Self::make_slot::<T>(self.default_capacity),
85 );
86 }
87 let slot = topics.get_mut(topic).unwrap();
88 let entry = Self::downcast_mut::<T>(slot, topic);
89 entry
90 .publisher
91 .take()
92 .unwrap_or_else(|| panic!("publisher already taken for topic '{topic}'"))
93 }
94
95 pub fn subscribe<T: Copy + Send + 'static>(&self, topic: &str) -> Subscriber<T> {
101 let mut topics = self.topics.lock();
102 if !topics.contains_key(topic) {
103 topics.insert(
104 topic.to_string(),
105 Self::make_slot::<T>(self.default_capacity),
106 );
107 }
108 let slot = topics.get_mut(topic).unwrap();
109 let entry = Self::downcast_mut::<T>(slot, topic);
110 entry.subscribable.subscribe()
111 }
112
113 pub fn subscribable<T: Copy + Send + 'static>(&self, topic: &str) -> Subscribable<T> {
119 let mut topics = self.topics.lock();
120 if !topics.contains_key(topic) {
121 topics.insert(
122 topic.to_string(),
123 Self::make_slot::<T>(self.default_capacity),
124 );
125 }
126 let slot = topics.get_mut(topic).unwrap();
127 let entry = Self::downcast_mut::<T>(slot, topic);
128 entry.subscribable.clone()
129 }
130
131 fn downcast_mut<'a, T: Copy + Send + 'static>(
134 slot: &'a mut TopicSlot,
135 topic: &str,
136 ) -> &'a mut TypedEntry<T> {
137 let requested = TypeId::of::<T>();
138 if slot.type_id != requested {
139 panic!(
140 "topic '{topic}' exists with type '{}', cannot access as '{}'",
141 slot.type_name,
142 core::any::type_name::<T>(),
143 );
144 }
145 slot.inner
146 .downcast_mut::<TypedEntry<T>>()
147 .expect("TypeId matched but downcast failed (this is a bug)")
148 }
149
150 fn make_slot<T: Copy + Send + 'static>(capacity: usize) -> TopicSlot {
151 let (pub_, sub_) = channel::channel::<T>(capacity);
152 TopicSlot {
153 type_id: TypeId::of::<T>(),
154 type_name: core::any::type_name::<T>(),
155 inner: Box::new(TypedEntry {
156 subscribable: sub_,
157 publisher: Some(pub_),
158 }),
159 }
160 }
161}