Skip to main content

photon_ring/channel/
subscribable.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use super::group::SubscriberGroup;
5use super::subscriber::Subscriber;
6use crate::pod::Pod;
7use crate::ring::{Padded, SharedRing};
8use alloc::sync::Arc;
9use core::sync::atomic::{AtomicU64, Ordering};
10
11/// Clone-able handle for spawning [`Subscriber`]s.
12///
13/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
14/// to create independent consumers.
15pub struct Subscribable<T: Pod> {
16    pub(super) ring: Arc<SharedRing<T>>,
17}
18
19impl<T: Pod> Clone for Subscribable<T> {
20    fn clone(&self) -> Self {
21        Subscribable {
22            ring: self.ring.clone(),
23        }
24    }
25}
26
27unsafe impl<T: Pod> Send for Subscribable<T> {}
28unsafe impl<T: Pod> Sync for Subscribable<T> {}
29
30impl<T: Pod> Subscribable<T> {
31    /// Create a subscriber that will see only **future** messages.
32    pub fn subscribe(&self) -> Subscriber<T> {
33        let head = self.ring.cursor.0.load(Ordering::Acquire);
34        let start = if head == u64::MAX { 0 } else { head + 1 };
35        let tracker = self.ring.register_tracker(start);
36        let slots_ptr = self.ring.slots_ptr();
37        let idx = self.ring.index;
38        Subscriber {
39            ring: self.ring.clone(),
40            slots_ptr,
41            capacity: idx.capacity,
42            mask: idx.mask,
43            reciprocal: idx.reciprocal,
44            is_pow2: idx.is_pow2,
45            cursor: start,
46            tracker,
47            total_lagged: 0,
48            total_received: 0,
49        }
50    }
51
52    /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
53    /// message. All `N` logical subscribers share a single ring read — the
54    /// seqlock is checked once and all cursors are advanced together.
55    ///
56    /// This is dramatically faster than `N` independent [`Subscriber`]s when
57    /// polled in a loop on the same thread.
58    ///
59    /// # Panics
60    ///
61    /// Panics if `N` is 0.
62    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
63        assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
64        let head = self.ring.cursor.0.load(Ordering::Acquire);
65        let start = if head == u64::MAX { 0 } else { head + 1 };
66        let tracker = self.ring.register_tracker(start);
67        let slots_ptr = self.ring.slots_ptr();
68        let idx = self.ring.index;
69        SubscriberGroup {
70            ring: self.ring.clone(),
71            slots_ptr,
72            capacity: idx.capacity,
73            mask: idx.mask,
74            reciprocal: idx.reciprocal,
75            is_pow2: idx.is_pow2,
76            cursor: start,
77            total_lagged: 0,
78            total_received: 0,
79            tracker,
80        }
81    }
82
83    /// Create a subscriber starting from the **oldest available** message
84    /// still in the ring (or 0 if nothing published yet).
85    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
86        let head = self.ring.cursor.0.load(Ordering::Acquire);
87        let cap = self.ring.capacity();
88        let start = if head == u64::MAX {
89            0
90        } else if head >= cap {
91            head - cap + 1
92        } else {
93            0
94        };
95        let tracker = self.ring.register_tracker(start);
96        let slots_ptr = self.ring.slots_ptr();
97        let idx = self.ring.index;
98        Subscriber {
99            ring: self.ring.clone(),
100            slots_ptr,
101            capacity: idx.capacity,
102            mask: idx.mask,
103            reciprocal: idx.reciprocal,
104            is_pow2: idx.is_pow2,
105            cursor: start,
106            tracker,
107            total_lagged: 0,
108            total_received: 0,
109        }
110    }
111
112    /// Create a subscriber with an active cursor tracker.
113    ///
114    /// Use this when the subscriber will participate in a
115    /// [`DependencyBarrier`] as an upstream consumer.
116    ///
117    /// On **bounded** channels, this behaves identically to
118    /// [`subscribe()`](Self::subscribe) — those subscribers already have
119    /// trackers.
120    ///
121    /// On **lossy** channels, [`subscribe()`](Self::subscribe) omits the
122    /// tracker (zero overhead for the common case). This method creates a
123    /// standalone tracker so that a [`DependencyBarrier`] can read the
124    /// subscriber's cursor position. The tracker is **not** registered
125    /// with the ring's backpressure system — it is purely for dependency
126    /// graph coordination.
127    pub fn subscribe_tracked(&self) -> Subscriber<T> {
128        let head = self.ring.cursor.0.load(Ordering::Acquire);
129        let start = if head == u64::MAX { 0 } else { head + 1 };
130        // On bounded channels, register_tracker returns Some (backpressure-aware).
131        // On lossy channels, it returns None — so we create a standalone tracker.
132        let tracker = self
133            .ring
134            .register_tracker(start)
135            .or_else(|| Some(Arc::new(Padded(AtomicU64::new(start)))));
136        let slots_ptr = self.ring.slots_ptr();
137        let idx = self.ring.index;
138        Subscriber {
139            ring: self.ring.clone(),
140            slots_ptr,
141            capacity: idx.capacity,
142            mask: idx.mask,
143            reciprocal: idx.reciprocal,
144            is_pow2: idx.is_pow2,
145            cursor: start,
146            tracker,
147            total_lagged: 0,
148            total_received: 0,
149        }
150    }
151}