Skip to main content

photon_ring/
typed_bus.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use alloc::boxed::Box;
5use alloc::string::{String, ToString};
6use core::any::{Any, TypeId};
7use hashbrown::HashMap;
8use spin::Mutex;
9
10use crate::channel::{self, Publisher, Subscribable, Subscriber};
11use crate::pod::Pod;
12
13/// Wrapper that erases the concrete `T` behind `dyn Any` while preserving
14/// the `TypeId` and a human-readable type name for diagnostics.
15struct TopicSlot {
16    type_id: TypeId,
17    type_name: &'static str,
18    inner: Box<dyn Any + Send + Sync>,
19}
20
21/// The concrete (generic) payload stored inside a [`TopicSlot`].
22struct TypedEntry<T: Pod> {
23    subscribable: Subscribable<T>,
24    publisher: Option<Publisher<T>>,
25}
26
27// Safety: `TypedEntry<T>` is `Send + Sync` when `T: Pod`
28// because `Subscribable<T>` is `Send + Sync` and `Publisher<T>` is `Send`.
29// The `Option` wrapper and the fact that the publisher is only accessed
30// under the outer `Mutex` make this safe.
31unsafe impl<T: Pod> Send for TypedEntry<T> {}
32unsafe impl<T: Pod> Sync for TypedEntry<T> {}
33
34/// A topic bus that supports different message types per topic.
35///
36/// Unlike [`Photon<T>`](crate::Photon) which requires a single message type
37/// across all topics, `TypedBus` allows each topic to have its own
38/// `T: Pod`.
39///
40/// # Example
41///
42/// ```
43/// use photon_ring::TypedBus;
44///
45/// let bus = TypedBus::new(1024);
46///
47/// // Different types per topic
48/// let mut price_pub = bus.publisher::<f64>("prices");
49/// let mut vol_pub = bus.publisher::<u32>("volumes");
50///
51/// let mut price_sub = bus.subscribe::<f64>("prices");
52/// let mut vol_sub = bus.subscribe::<u32>("volumes");
53///
54/// price_pub.publish(42.5);
55/// vol_pub.publish(1000);
56///
57/// assert_eq!(price_sub.try_recv(), Ok(42.5));
58/// assert_eq!(vol_sub.try_recv(), Ok(1000));
59/// ```
60pub struct TypedBus {
61    topics: Mutex<HashMap<String, TopicSlot>>,
62    default_capacity: usize,
63}
64
65impl TypedBus {
66    /// Create a bus. `capacity` is the ring size for each topic (power of two).
67    pub fn new(capacity: usize) -> Self {
68        TypedBus {
69            topics: Mutex::new(HashMap::new()),
70            default_capacity: capacity,
71        }
72    }
73
74    /// Take the publisher for a topic. Creates the topic if it doesn't exist.
75    ///
76    /// # Panics
77    ///
78    /// - Panics if the topic already exists with a different type `T`.
79    /// - Panics if the publisher for this topic was already taken.
80    pub fn publisher<T: Pod>(&self, topic: &str) -> Publisher<T> {
81        let mut topics = self.topics.lock();
82        if !topics.contains_key(topic) {
83            topics.insert(
84                topic.to_string(),
85                Self::make_slot::<T>(self.default_capacity),
86            );
87        }
88        let slot = topics.get_mut(topic).unwrap();
89        let entry = Self::downcast_mut::<T>(slot, topic);
90        entry
91            .publisher
92            .take()
93            .unwrap_or_else(|| panic!("publisher already taken for topic '{topic}'"))
94    }
95
96    /// Try to take the publisher for a topic. Returns `None` if the
97    /// publisher was already taken.
98    ///
99    /// # Panics
100    ///
101    /// Panics if the topic already exists with a different type `T`.
102    pub fn try_publisher<T: Pod>(&self, topic: &str) -> Option<Publisher<T>> {
103        let mut topics = self.topics.lock();
104        if !topics.contains_key(topic) {
105            topics.insert(
106                topic.to_string(),
107                Self::make_slot::<T>(self.default_capacity),
108            );
109        }
110        let slot = topics.get_mut(topic).unwrap();
111        let entry = Self::downcast_mut::<T>(slot, topic);
112        entry.publisher.take()
113    }
114
115    /// Subscribe to a topic (future messages only). Creates the topic if needed.
116    ///
117    /// # Panics
118    ///
119    /// Panics if the topic already exists with a different type `T`.
120    pub fn subscribe<T: Pod>(&self, topic: &str) -> Subscriber<T> {
121        let mut topics = self.topics.lock();
122        if !topics.contains_key(topic) {
123            topics.insert(
124                topic.to_string(),
125                Self::make_slot::<T>(self.default_capacity),
126            );
127        }
128        let slot = topics.get_mut(topic).unwrap();
129        let entry = Self::downcast_mut::<T>(slot, topic);
130        entry.subscribable.subscribe()
131    }
132
133    /// Get the clone-able subscriber factory for a topic.
134    ///
135    /// # Panics
136    ///
137    /// Panics if the topic already exists with a different type `T`.
138    pub fn subscribable<T: Pod>(&self, topic: &str) -> Subscribable<T> {
139        let mut topics = self.topics.lock();
140        if !topics.contains_key(topic) {
141            topics.insert(
142                topic.to_string(),
143                Self::make_slot::<T>(self.default_capacity),
144            );
145        }
146        let slot = topics.get_mut(topic).unwrap();
147        let entry = Self::downcast_mut::<T>(slot, topic);
148        entry.subscribable.clone()
149    }
150
151    /// Downcast the erased `TopicSlot` to `TypedEntry<T>`, panicking with a
152    /// clear message on type mismatch.
153    fn downcast_mut<'a, T: Pod>(slot: &'a mut TopicSlot, topic: &str) -> &'a mut TypedEntry<T> {
154        let requested = TypeId::of::<T>();
155        if slot.type_id != requested {
156            panic!(
157                "topic '{topic}' exists with type '{}', cannot access as '{}'",
158                slot.type_name,
159                core::any::type_name::<T>(),
160            );
161        }
162        slot.inner
163            .downcast_mut::<TypedEntry<T>>()
164            .expect("TypeId matched but downcast failed (this is a bug)")
165    }
166
167    fn make_slot<T: Pod>(capacity: usize) -> TopicSlot {
168        let (pub_, sub_) = channel::channel::<T>(capacity);
169        TopicSlot {
170            type_id: TypeId::of::<T>(),
171            type_name: core::any::type_name::<T>(),
172            inner: Box::new(TypedEntry {
173                subscribable: sub_,
174                publisher: Some(pub_),
175            }),
176        }
177    }
178}