Skip to main content

photon_ring/
typed_bus.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use alloc::boxed::Box;
5use alloc::string::{String, ToString};
6use core::any::{Any, TypeId};
7use hashbrown::HashMap;
8use spin::Mutex;
9
10use crate::channel::{self, Publisher, Subscribable, Subscriber};
11
12/// Wrapper that erases the concrete `T` behind `dyn Any` while preserving
13/// the `TypeId` and a human-readable type name for diagnostics.
14struct TopicSlot {
15    type_id: TypeId,
16    type_name: &'static str,
17    inner: Box<dyn Any + Send + Sync>,
18}
19
20/// The concrete (generic) payload stored inside a [`TopicSlot`].
21struct TypedEntry<T: Copy + Send + 'static> {
22    subscribable: Subscribable<T>,
23    publisher: Option<Publisher<T>>,
24}
25
26// Safety: `TypedEntry<T>` is `Send + Sync` when `T: Copy + Send + 'static`
27// because `Subscribable<T>` is `Send + Sync` and `Publisher<T>` is `Send`.
28// The `Option` wrapper and the fact that the publisher is only accessed
29// under the outer `Mutex` make this safe.
30unsafe impl<T: Copy + Send + 'static> Send for TypedEntry<T> {}
31unsafe impl<T: Copy + Send + 'static> Sync for TypedEntry<T> {}
32
33/// A topic bus that supports different message types per topic.
34///
35/// Unlike [`Photon<T>`](crate::Photon) which requires a single message type
36/// across all topics, `TypedBus` allows each topic to have its own
37/// `T: Copy + Send + 'static`.
38///
39/// # Example
40///
41/// ```
42/// use photon_ring::TypedBus;
43///
44/// let bus = TypedBus::new(1024);
45///
46/// // Different types per topic
47/// let mut price_pub = bus.publisher::<f64>("prices");
48/// let mut vol_pub = bus.publisher::<u32>("volumes");
49///
50/// let mut price_sub = bus.subscribe::<f64>("prices");
51/// let mut vol_sub = bus.subscribe::<u32>("volumes");
52///
53/// price_pub.publish(42.5);
54/// vol_pub.publish(1000);
55///
56/// assert_eq!(price_sub.try_recv(), Ok(42.5));
57/// assert_eq!(vol_sub.try_recv(), Ok(1000));
58/// ```
59pub struct TypedBus {
60    topics: Mutex<HashMap<String, TopicSlot>>,
61    default_capacity: usize,
62}
63
64impl TypedBus {
65    /// Create a bus. `capacity` is the ring size for each topic (power of two).
66    pub fn new(capacity: usize) -> Self {
67        TypedBus {
68            topics: Mutex::new(HashMap::new()),
69            default_capacity: capacity,
70        }
71    }
72
73    /// Take the publisher for a topic. Creates the topic if it doesn't exist.
74    ///
75    /// # Panics
76    ///
77    /// - Panics if the topic already exists with a different type `T`.
78    /// - Panics if the publisher for this topic was already taken.
79    pub fn publisher<T: Copy + Send + 'static>(&self, topic: &str) -> Publisher<T> {
80        let mut topics = self.topics.lock();
81        let slot = topics
82            .entry(topic.to_string())
83            .or_insert_with(|| Self::make_slot::<T>(self.default_capacity));
84
85        let entry = Self::downcast_mut::<T>(slot, topic);
86        entry
87            .publisher
88            .take()
89            .unwrap_or_else(|| panic!("publisher already taken for topic '{topic}'"))
90    }
91
92    /// Subscribe to a topic (future messages only). Creates the topic if needed.
93    ///
94    /// # Panics
95    ///
96    /// Panics if the topic already exists with a different type `T`.
97    pub fn subscribe<T: Copy + Send + 'static>(&self, topic: &str) -> Subscriber<T> {
98        let mut topics = self.topics.lock();
99        let slot = topics
100            .entry(topic.to_string())
101            .or_insert_with(|| Self::make_slot::<T>(self.default_capacity));
102
103        let entry = Self::downcast_mut::<T>(slot, topic);
104        entry.subscribable.subscribe()
105    }
106
107    /// Get the clone-able subscriber factory for a topic.
108    ///
109    /// # Panics
110    ///
111    /// Panics if the topic already exists with a different type `T`.
112    pub fn subscribable<T: Copy + Send + 'static>(&self, topic: &str) -> Subscribable<T> {
113        let mut topics = self.topics.lock();
114        let slot = topics
115            .entry(topic.to_string())
116            .or_insert_with(|| Self::make_slot::<T>(self.default_capacity));
117
118        let entry = Self::downcast_mut::<T>(slot, topic);
119        entry.subscribable.clone()
120    }
121
122    /// Downcast the erased `TopicSlot` to `TypedEntry<T>`, panicking with a
123    /// clear message on type mismatch.
124    fn downcast_mut<'a, T: Copy + Send + 'static>(
125        slot: &'a mut TopicSlot,
126        topic: &str,
127    ) -> &'a mut TypedEntry<T> {
128        let requested = TypeId::of::<T>();
129        if slot.type_id != requested {
130            panic!(
131                "topic '{topic}' exists with type '{}', cannot access as '{}'",
132                slot.type_name,
133                core::any::type_name::<T>(),
134            );
135        }
136        slot.inner
137            .downcast_mut::<TypedEntry<T>>()
138            .expect("TypeId matched but downcast failed (this is a bug)")
139    }
140
141    fn make_slot<T: Copy + Send + 'static>(capacity: usize) -> TopicSlot {
142        let (pub_, sub_) = channel::channel::<T>(capacity);
143        TopicSlot {
144            type_id: TypeId::of::<T>(),
145            type_name: core::any::type_name::<T>(),
146            inner: Box::new(TypedEntry {
147                subscribable: sub_,
148                publisher: Some(pub_),
149            }),
150        }
151    }
152}