Skip to main content

photon_ring/
bus.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use alloc::string::{String, ToString};
5
6use crate::channel::{self, Publisher, Subscribable, Subscriber};
7use hashbrown::HashMap;
8use spin::Mutex;
9
10/// Named-topic pub/sub bus.
11///
12/// Wraps [`channel`](crate::channel) with string-keyed topic routing.
13/// Each topic is an independent SPMC ring.
14///
15/// ```
16/// let bus = photon_ring::Photon::<u64>::new(64);
17/// let mut pub_ = bus.publisher("prices");
18/// let mut sub  = bus.subscribe("prices");
19/// pub_.publish(100);
20/// assert_eq!(sub.try_recv(), Ok(100));
21/// ```
22pub struct Photon<T: Copy + Send + 'static> {
23    topics: Mutex<HashMap<String, TopicEntry<T>>>,
24    default_capacity: usize,
25}
26
27struct TopicEntry<T: Copy + Send> {
28    subscribable: Subscribable<T>,
29    publisher: Option<Publisher<T>>,
30}
31
32impl<T: Copy + Send + 'static> Photon<T> {
33    /// Create a bus. `capacity` is the ring size for each topic (power of two).
34    pub fn new(capacity: usize) -> Self {
35        Photon {
36            topics: Mutex::new(HashMap::new()),
37            default_capacity: capacity,
38        }
39    }
40
41    /// Take the publisher for a topic. Creates the topic if it doesn't exist.
42    ///
43    /// # Panics
44    /// Panics if the publisher for this topic was already taken.
45    pub fn publisher(&self, topic: &str) -> Publisher<T> {
46        let mut topics = self.topics.lock();
47        let entry = topics
48            .entry(topic.to_string())
49            .or_insert_with(|| Self::make_entry(self.default_capacity));
50        entry
51            .publisher
52            .take()
53            .unwrap_or_else(|| panic!("publisher already taken for topic '{}'", topic))
54    }
55
56    /// Subscribe to a topic (future messages only). Creates the topic if needed.
57    pub fn subscribe(&self, topic: &str) -> Subscriber<T> {
58        let mut topics = self.topics.lock();
59        let entry = topics
60            .entry(topic.to_string())
61            .or_insert_with(|| Self::make_entry(self.default_capacity));
62        entry.subscribable.subscribe()
63    }
64
65    /// Get the clone-able subscriber factory for a topic.
66    pub fn subscribable(&self, topic: &str) -> Subscribable<T> {
67        let mut topics = self.topics.lock();
68        let entry = topics
69            .entry(topic.to_string())
70            .or_insert_with(|| Self::make_entry(self.default_capacity));
71        entry.subscribable.clone()
72    }
73
74    fn make_entry(capacity: usize) -> TopicEntry<T> {
75        let (pub_, sub_) = channel::channel(capacity);
76        TopicEntry {
77            subscribable: sub_,
78            publisher: Some(pub_),
79        }
80    }
81}