Skip to main content

photon_ring/
bus.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use alloc::string::{String, ToString};
5
6use crate::channel::{self, Publisher, Subscribable, Subscriber};
7use hashbrown::HashMap;
8use spin::Mutex;
9
10/// Named-topic pub/sub bus.
11///
12/// Wraps [`channel`](crate::channel) with string-keyed topic routing.
13/// Each topic is an independent SPMC ring.
14///
15/// ```
16/// let bus = photon_ring::Photon::<u64>::new(64);
17/// let mut pub_ = bus.publisher("prices");
18/// let mut sub  = bus.subscribe("prices");
19/// pub_.publish(100);
20/// assert_eq!(sub.try_recv(), Ok(100));
21/// ```
22pub struct Photon<T: Copy + Send + 'static> {
23    topics: Mutex<HashMap<String, TopicEntry<T>>>,
24    default_capacity: usize,
25}
26
27struct TopicEntry<T: Copy + Send> {
28    subscribable: Subscribable<T>,
29    publisher: Option<Publisher<T>>,
30}
31
32impl<T: Copy + Send + 'static> Photon<T> {
33    /// Create a bus. `capacity` is the ring size for each topic (power of two).
34    pub fn new(capacity: usize) -> Self {
35        Photon {
36            topics: Mutex::new(HashMap::new()),
37            default_capacity: capacity,
38        }
39    }
40
41    /// Take the publisher for a topic. Creates the topic if it doesn't exist.
42    ///
43    /// # Panics
44    /// Panics if the publisher for this topic was already taken.
45    pub fn publisher(&self, topic: &str) -> Publisher<T> {
46        let mut topics = self.topics.lock();
47        if !topics.contains_key(topic) {
48            topics.insert(topic.to_string(), Self::make_entry(self.default_capacity));
49        }
50        let entry = topics.get_mut(topic).unwrap();
51        entry
52            .publisher
53            .take()
54            .unwrap_or_else(|| panic!("publisher already taken for topic '{}'", topic))
55    }
56
57    /// Subscribe to a topic (future messages only). Creates the topic if needed.
58    pub fn subscribe(&self, topic: &str) -> Subscriber<T> {
59        let mut topics = self.topics.lock();
60        if !topics.contains_key(topic) {
61            topics.insert(topic.to_string(), Self::make_entry(self.default_capacity));
62        }
63        let entry = topics.get_mut(topic).unwrap();
64        entry.subscribable.subscribe()
65    }
66
67    /// Get the clone-able subscriber factory for a topic.
68    pub fn subscribable(&self, topic: &str) -> Subscribable<T> {
69        let mut topics = self.topics.lock();
70        if !topics.contains_key(topic) {
71            topics.insert(topic.to_string(), Self::make_entry(self.default_capacity));
72        }
73        let entry = topics.get_mut(topic).unwrap();
74        entry.subscribable.clone()
75    }
76
77    fn make_entry(capacity: usize) -> TopicEntry<T> {
78        let (pub_, sub_) = channel::channel(capacity);
79        TopicEntry {
80            subscribable: sub_,
81            publisher: Some(pub_),
82        }
83    }
84}