Skip to main content

photon_ring/
bus.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use alloc::string::{String, ToString};
5
6use crate::channel::{self, Publisher, Subscribable, Subscriber};
7use crate::pod::Pod;
8use hashbrown::HashMap;
9use spin::Mutex;
10
11/// Named-topic pub/sub bus.
12///
13/// Wraps [`channel`](crate::channel) with string-keyed topic routing.
14/// Each topic is an independent SPMC ring.
15///
16/// ```
17/// let bus = photon_ring::Photon::<u64>::new(64);
18/// let mut pub_ = bus.publisher("prices");
19/// let mut sub  = bus.subscribe("prices");
20/// pub_.publish(100);
21/// assert_eq!(sub.try_recv(), Ok(100));
22/// ```
23pub struct Photon<T: Pod> {
24    topics: Mutex<HashMap<String, TopicEntry<T>>>,
25    default_capacity: usize,
26}
27
28struct TopicEntry<T: Pod> {
29    subscribable: Subscribable<T>,
30    publisher: Option<Publisher<T>>,
31}
32
33impl<T: Pod> Photon<T> {
34    /// Create a bus. `capacity` is the ring size for each topic (power of two).
35    pub fn new(capacity: usize) -> Self {
36        Photon {
37            topics: Mutex::new(HashMap::new()),
38            default_capacity: capacity,
39        }
40    }
41
42    /// Take the publisher for a topic. Creates the topic if it doesn't exist.
43    ///
44    /// # Panics
45    /// Panics if the publisher for this topic was already taken.
46    pub fn publisher(&self, topic: &str) -> Publisher<T> {
47        let mut topics = self.topics.lock();
48        if !topics.contains_key(topic) {
49            topics.insert(topic.to_string(), Self::make_entry(self.default_capacity));
50        }
51        let entry = topics.get_mut(topic).unwrap();
52        entry
53            .publisher
54            .take()
55            .unwrap_or_else(|| panic!("publisher already taken for topic '{}'", topic))
56    }
57
58    /// Try to take the publisher for a topic. Returns `None` if the
59    /// publisher was already taken.
60    pub fn try_publisher(&self, topic: &str) -> Option<Publisher<T>> {
61        let mut topics = self.topics.lock();
62        if !topics.contains_key(topic) {
63            topics.insert(topic.to_string(), Self::make_entry(self.default_capacity));
64        }
65        let entry = topics.get_mut(topic).unwrap();
66        entry.publisher.take()
67    }
68
69    /// Subscribe to a topic (future messages only). Creates the topic if needed.
70    pub fn subscribe(&self, topic: &str) -> Subscriber<T> {
71        let mut topics = self.topics.lock();
72        if !topics.contains_key(topic) {
73            topics.insert(topic.to_string(), Self::make_entry(self.default_capacity));
74        }
75        let entry = topics.get_mut(topic).unwrap();
76        entry.subscribable.subscribe()
77    }
78
79    /// Get the clone-able subscriber factory for a topic.
80    pub fn subscribable(&self, topic: &str) -> Subscribable<T> {
81        let mut topics = self.topics.lock();
82        if !topics.contains_key(topic) {
83            topics.insert(topic.to_string(), Self::make_entry(self.default_capacity));
84        }
85        let entry = topics.get_mut(topic).unwrap();
86        entry.subscribable.clone()
87    }
88
89    fn make_entry(capacity: usize) -> TopicEntry<T> {
90        let (pub_, sub_) = channel::channel(capacity);
91        TopicEntry {
92            subscribable: sub_,
93            publisher: Some(pub_),
94        }
95    }
96}