photon_ring/typed_bus.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use alloc::boxed::Box;
5use alloc::string::{String, ToString};
6use core::any::{Any, TypeId};
7use hashbrown::HashMap;
8use spin::Mutex;
9
10use crate::channel::{self, Publisher, Subscribable, Subscriber};
11
/// Type-erased storage for one topic.
///
/// The concrete payload lives behind `dyn Any`; the `TypeId` and the type's
/// name are kept next to it so a mismatching access can be detected and
/// reported in a readable way.
struct TopicSlot {
    /// `TypeId` of the message type the topic was created with.
    type_id: TypeId,
    /// Human-readable name of that message type, used in diagnostics.
    type_name: &'static str,
    /// The boxed, type-erased payload for this topic.
    inner: Box<dyn Any + Send + Sync>,
}
19
20/// The concrete (generic) payload stored inside a [`TopicSlot`].
21struct TypedEntry<T: Copy + Send + 'static> {
22 subscribable: Subscribable<T>,
23 publisher: Option<Publisher<T>>,
24}
25
26// Safety: `TypedEntry<T>` is `Send + Sync` when `T: Copy + Send + 'static`
27// because `Subscribable<T>` is `Send + Sync` and `Publisher<T>` is `Send`.
28// The `Option` wrapper and the fact that the publisher is only accessed
29// under the outer `Mutex` make this safe.
30unsafe impl<T: Copy + Send + 'static> Send for TypedEntry<T> {}
31unsafe impl<T: Copy + Send + 'static> Sync for TypedEntry<T> {}
32
33/// A topic bus that supports different message types per topic.
34///
35/// Unlike [`Photon<T>`](crate::Photon) which requires a single message type
36/// across all topics, `TypedBus` allows each topic to have its own
37/// `T: Copy + Send + 'static`.
38///
39/// # Example
40///
41/// ```
42/// use photon_ring::TypedBus;
43///
44/// let bus = TypedBus::new(1024);
45///
46/// // Different types per topic
47/// let mut price_pub = bus.publisher::<f64>("prices");
48/// let mut vol_pub = bus.publisher::<u32>("volumes");
49///
50/// let mut price_sub = bus.subscribe::<f64>("prices");
51/// let mut vol_sub = bus.subscribe::<u32>("volumes");
52///
53/// price_pub.publish(42.5);
54/// vol_pub.publish(1000);
55///
56/// assert_eq!(price_sub.try_recv(), Ok(42.5));
57/// assert_eq!(vol_sub.try_recv(), Ok(1000));
58/// ```
59pub struct TypedBus {
60 topics: Mutex<HashMap<String, TopicSlot>>,
61 default_capacity: usize,
62}
63
64impl TypedBus {
65 /// Create a bus. `capacity` is the ring size for each topic (power of two).
66 pub fn new(capacity: usize) -> Self {
67 TypedBus {
68 topics: Mutex::new(HashMap::new()),
69 default_capacity: capacity,
70 }
71 }
72
73 /// Take the publisher for a topic. Creates the topic if it doesn't exist.
74 ///
75 /// # Panics
76 ///
77 /// - Panics if the topic already exists with a different type `T`.
78 /// - Panics if the publisher for this topic was already taken.
79 pub fn publisher<T: Copy + Send + 'static>(&self, topic: &str) -> Publisher<T> {
80 let mut topics = self.topics.lock();
81 let slot = topics
82 .entry(topic.to_string())
83 .or_insert_with(|| Self::make_slot::<T>(self.default_capacity));
84
85 let entry = Self::downcast_mut::<T>(slot, topic);
86 entry
87 .publisher
88 .take()
89 .unwrap_or_else(|| panic!("publisher already taken for topic '{topic}'"))
90 }
91
92 /// Subscribe to a topic (future messages only). Creates the topic if needed.
93 ///
94 /// # Panics
95 ///
96 /// Panics if the topic already exists with a different type `T`.
97 pub fn subscribe<T: Copy + Send + 'static>(&self, topic: &str) -> Subscriber<T> {
98 let mut topics = self.topics.lock();
99 let slot = topics
100 .entry(topic.to_string())
101 .or_insert_with(|| Self::make_slot::<T>(self.default_capacity));
102
103 let entry = Self::downcast_mut::<T>(slot, topic);
104 entry.subscribable.subscribe()
105 }
106
107 /// Get the clone-able subscriber factory for a topic.
108 ///
109 /// # Panics
110 ///
111 /// Panics if the topic already exists with a different type `T`.
112 pub fn subscribable<T: Copy + Send + 'static>(&self, topic: &str) -> Subscribable<T> {
113 let mut topics = self.topics.lock();
114 let slot = topics
115 .entry(topic.to_string())
116 .or_insert_with(|| Self::make_slot::<T>(self.default_capacity));
117
118 let entry = Self::downcast_mut::<T>(slot, topic);
119 entry.subscribable.clone()
120 }
121
122 /// Downcast the erased `TopicSlot` to `TypedEntry<T>`, panicking with a
123 /// clear message on type mismatch.
124 fn downcast_mut<'a, T: Copy + Send + 'static>(
125 slot: &'a mut TopicSlot,
126 topic: &str,
127 ) -> &'a mut TypedEntry<T> {
128 let requested = TypeId::of::<T>();
129 if slot.type_id != requested {
130 panic!(
131 "topic '{topic}' exists with type '{}', cannot access as '{}'",
132 slot.type_name,
133 core::any::type_name::<T>(),
134 );
135 }
136 slot.inner
137 .downcast_mut::<TypedEntry<T>>()
138 .expect("TypeId matched but downcast failed (this is a bug)")
139 }
140
141 fn make_slot<T: Copy + Send + 'static>(capacity: usize) -> TopicSlot {
142 let (pub_, sub_) = channel::channel::<T>(capacity);
143 TopicSlot {
144 type_id: TypeId::of::<T>(),
145 type_name: core::any::type_name::<T>(),
146 inner: Box::new(TypedEntry {
147 subscribable: sub_,
148 publisher: Some(pub_),
149 }),
150 }
151 }
152}