Skip to main content

photon_ring/
bus.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use alloc::string::String;
5
6use crate::channel::{self, Publisher, Subscribable, Subscriber};
7use crate::pod::Pod;
8use hashbrown::HashMap;
9use spin::Mutex;
10
11/// Named-topic pub/sub bus.
12///
13/// Wraps [`channel`](crate::channel) with string-keyed topic routing.
14/// Each topic is an independent SPMC ring.
15///
16/// ```
17/// let bus = photon_ring::Photon::<u64>::new(64);
18/// let mut pub_ = bus.publisher("prices");
19/// let mut sub  = bus.subscribe("prices");
20/// pub_.publish(100);
21/// assert_eq!(sub.try_recv(), Ok(100));
22/// ```
23pub struct Photon<T: Pod> {
24    topics: Mutex<HashMap<String, TopicEntry<T>>>,
25    default_capacity: usize,
26}
27
28struct TopicEntry<T: Pod> {
29    subscribable: Subscribable<T>,
30    publisher: Option<Publisher<T>>,
31}
32
33impl<T: Pod> Photon<T> {
34    /// Create a bus. `capacity` is the ring size for each topic (>= 2).
35    pub fn new(capacity: usize) -> Self {
36        Photon {
37            topics: Mutex::new(HashMap::new()),
38            default_capacity: capacity,
39        }
40    }
41
42    /// Take the publisher for a topic. Creates the topic if it doesn't exist.
43    ///
44    /// # Panics
45    /// Panics if the publisher for this topic was already taken.
46    pub fn publisher(&self, topic: &str) -> Publisher<T> {
47        let mut topics = self.topics.lock();
48        let entry = topics
49            .entry_ref(topic)
50            .or_insert_with(|| Self::make_entry(self.default_capacity));
51        entry
52            .publisher
53            .take()
54            .unwrap_or_else(|| panic!("publisher already taken for topic '{}'", topic))
55    }
56
57    /// Try to take the publisher for a topic. Returns `None` if the
58    /// publisher was already taken.
59    pub fn try_publisher(&self, topic: &str) -> Option<Publisher<T>> {
60        let mut topics = self.topics.lock();
61        let entry = topics
62            .entry_ref(topic)
63            .or_insert_with(|| Self::make_entry(self.default_capacity));
64        entry.publisher.take()
65    }
66
67    /// Subscribe to a topic (future messages only). Creates the topic if needed.
68    pub fn subscribe(&self, topic: &str) -> Subscriber<T> {
69        let mut topics = self.topics.lock();
70        let entry = topics
71            .entry_ref(topic)
72            .or_insert_with(|| Self::make_entry(self.default_capacity));
73        entry.subscribable.subscribe()
74    }
75
76    /// Get the clone-able subscriber factory for a topic.
77    pub fn subscribable(&self, topic: &str) -> Subscribable<T> {
78        let mut topics = self.topics.lock();
79        let entry = topics
80            .entry_ref(topic)
81            .or_insert_with(|| Self::make_entry(self.default_capacity));
82        entry.subscribable.clone()
83    }
84
85    fn make_entry(capacity: usize) -> TopicEntry<T> {
86        let (pub_, sub_) = channel::channel(capacity);
87        TopicEntry {
88            subscribable: sub_,
89            publisher: Some(pub_),
90        }
91    }
92}