photon_ring/typed_bus.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use alloc::boxed::Box;
5use alloc::string::String;
6use core::any::{Any, TypeId};
7use hashbrown::HashMap;
8use spin::Mutex;
9
10use crate::channel::{self, Publisher, Subscribable, Subscriber};
11use crate::pod::Pod;
12
/// Type-erased storage for a single topic.
///
/// The concrete, generic entry is hidden behind `dyn Any` so that topics with
/// different message types can share one map. The `TypeId` of the message
/// type is stored next to it for the mismatch check, and the type's name is
/// stored so a mismatch can be reported in readable form.
struct TopicSlot {
    /// `TypeId` of the message type the topic was created with.
    type_id: TypeId,
    /// Human-readable name of that same message type; diagnostics only.
    type_name: &'static str,
    /// The boxed, type-erased entry; recovered with `downcast_mut`.
    inner: Box<dyn Any + Send + Sync>,
}
20
21/// The concrete (generic) payload stored inside a [`TopicSlot`].
22struct TypedEntry<T: Pod> {
23 subscribable: Subscribable<T>,
24 publisher: Option<Publisher<T>>,
25}
26
27// Safety: `TypedEntry<T>` is `Send + Sync` when `T: Pod`
28// because `Subscribable<T>` is `Send + Sync` and `Publisher<T>` is `Send`.
29// The `Option` wrapper and the fact that the publisher is only accessed
30// under the outer `Mutex` make this safe.
31unsafe impl<T: Pod> Send for TypedEntry<T> {}
32unsafe impl<T: Pod> Sync for TypedEntry<T> {}
33
34/// A topic bus that supports different message types per topic.
35///
36/// Unlike [`Photon<T>`](crate::Photon) which requires a single message type
37/// across all topics, `TypedBus` allows each topic to have its own
38/// `T: Pod`.
39///
40/// # Example
41///
42/// ```
43/// use photon_ring::TypedBus;
44///
45/// let bus = TypedBus::new(1024);
46///
47/// // Different types per topic
48/// let mut price_pub = bus.publisher::<f64>("prices");
49/// let mut vol_pub = bus.publisher::<u32>("volumes");
50///
51/// let mut price_sub = bus.subscribe::<f64>("prices");
52/// let mut vol_sub = bus.subscribe::<u32>("volumes");
53///
54/// price_pub.publish(42.5);
55/// vol_pub.publish(1000);
56///
57/// assert_eq!(price_sub.try_recv(), Ok(42.5));
58/// assert_eq!(vol_sub.try_recv(), Ok(1000));
59/// ```
60pub struct TypedBus {
61 topics: Mutex<HashMap<String, TopicSlot>>,
62 default_capacity: usize,
63}
64
65impl TypedBus {
66 /// Create a bus. `capacity` is the ring size for each topic (>= 2).
67 pub fn new(capacity: usize) -> Self {
68 TypedBus {
69 topics: Mutex::new(HashMap::new()),
70 default_capacity: capacity,
71 }
72 }
73
74 /// Take the publisher for a topic. Creates the topic if it doesn't exist.
75 ///
76 /// # Panics
77 ///
78 /// - Panics if the topic already exists with a different type `T`.
79 /// - Panics if the publisher for this topic was already taken.
80 pub fn publisher<T: Pod>(&self, topic: &str) -> Publisher<T> {
81 let mut topics = self.topics.lock();
82 let slot = topics
83 .entry_ref(topic)
84 .or_insert_with(|| Self::make_slot::<T>(self.default_capacity));
85 let entry = Self::downcast_mut::<T>(slot, topic);
86 entry
87 .publisher
88 .take()
89 .unwrap_or_else(|| panic!("publisher already taken for topic '{topic}'"))
90 }
91
92 /// Try to take the publisher for a topic. Returns `None` if the
93 /// publisher was already taken.
94 ///
95 /// # Panics
96 ///
97 /// Panics if the topic already exists with a different type `T`.
98 pub fn try_publisher<T: Pod>(&self, topic: &str) -> Option<Publisher<T>> {
99 let mut topics = self.topics.lock();
100 let slot = topics
101 .entry_ref(topic)
102 .or_insert_with(|| Self::make_slot::<T>(self.default_capacity));
103 let entry = Self::downcast_mut::<T>(slot, topic);
104 entry.publisher.take()
105 }
106
107 /// Subscribe to a topic (future messages only). Creates the topic if needed.
108 ///
109 /// # Panics
110 ///
111 /// Panics if the topic already exists with a different type `T`.
112 pub fn subscribe<T: Pod>(&self, topic: &str) -> Subscriber<T> {
113 let mut topics = self.topics.lock();
114 let slot = topics
115 .entry_ref(topic)
116 .or_insert_with(|| Self::make_slot::<T>(self.default_capacity));
117 let entry = Self::downcast_mut::<T>(slot, topic);
118 entry.subscribable.subscribe()
119 }
120
121 /// Get the clone-able subscriber factory for a topic.
122 ///
123 /// # Panics
124 ///
125 /// Panics if the topic already exists with a different type `T`.
126 pub fn subscribable<T: Pod>(&self, topic: &str) -> Subscribable<T> {
127 let mut topics = self.topics.lock();
128 let slot = topics
129 .entry_ref(topic)
130 .or_insert_with(|| Self::make_slot::<T>(self.default_capacity));
131 let entry = Self::downcast_mut::<T>(slot, topic);
132 entry.subscribable.clone()
133 }
134
135 /// Downcast the erased `TopicSlot` to `TypedEntry<T>`, panicking with a
136 /// clear message on type mismatch.
137 fn downcast_mut<'a, T: Pod>(slot: &'a mut TopicSlot, topic: &str) -> &'a mut TypedEntry<T> {
138 let requested = TypeId::of::<T>();
139 if slot.type_id != requested {
140 panic!(
141 "topic '{topic}' exists with type '{}', cannot access as '{}'",
142 slot.type_name,
143 core::any::type_name::<T>(),
144 );
145 }
146 slot.inner
147 .downcast_mut::<TypedEntry<T>>()
148 .expect("TypeId matched but downcast failed (this is a bug)")
149 }
150
151 fn make_slot<T: Pod>(capacity: usize) -> TopicSlot {
152 let (pub_, sub_) = channel::channel::<T>(capacity);
153 TopicSlot {
154 type_id: TypeId::of::<T>(),
155 type_name: core::any::type_name::<T>(),
156 inner: Box::new(TypedEntry {
157 subscribable: sub_,
158 publisher: Some(pub_),
159 }),
160 }
161 }
162}