Skip to main content

photon_ring/channel/
subscribable.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use super::group::SubscriberGroup;
5use super::subscriber::Subscriber;
6use crate::pod::Pod;
7use crate::ring::{Padded, SharedRing};
8use alloc::sync::Arc;
9use core::sync::atomic::{AtomicU64, Ordering};
10
11/// Clone-able handle for spawning [`Subscriber`]s.
12///
13/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
14/// to create independent consumers.
15pub struct Subscribable<T: Pod> {
16    pub(super) ring: Arc<SharedRing<T>>,
17}
18
19impl<T: Pod> Clone for Subscribable<T> {
20    fn clone(&self) -> Self {
21        Subscribable {
22            ring: self.ring.clone(),
23        }
24    }
25}
26
27unsafe impl<T: Pod> Send for Subscribable<T> {}
28unsafe impl<T: Pod> Sync for Subscribable<T> {}
29
30impl<T: Pod> Subscribable<T> {
31    /// Create a subscriber that will see only **future** messages.
32    pub fn subscribe(&self) -> Subscriber<T> {
33        let head = self.ring.cursor.0.load(Ordering::Acquire);
34        let start = if head == u64::MAX { 0 } else { head + 1 };
35        let tracker = self.ring.register_tracker(start);
36        let slots_ptr = self.ring.slots_ptr();
37        let mask = self.ring.mask;
38        Subscriber {
39            ring: self.ring.clone(),
40            slots_ptr,
41            mask,
42            cursor: start,
43            tracker,
44            total_lagged: 0,
45            total_received: 0,
46        }
47    }
48
49    /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
50    /// message. All `N` logical subscribers share a single ring read — the
51    /// seqlock is checked once and all cursors are advanced together.
52    ///
53    /// This is dramatically faster than `N` independent [`Subscriber`]s when
54    /// polled in a loop on the same thread.
55    ///
56    /// # Panics
57    ///
58    /// Panics if `N` is 0.
59    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
60        assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
61        let head = self.ring.cursor.0.load(Ordering::Acquire);
62        let start = if head == u64::MAX { 0 } else { head + 1 };
63        let tracker = self.ring.register_tracker(start);
64        let slots_ptr = self.ring.slots_ptr();
65        let mask = self.ring.mask;
66        SubscriberGroup {
67            ring: self.ring.clone(),
68            slots_ptr,
69            mask,
70            cursor: start,
71            total_lagged: 0,
72            total_received: 0,
73            tracker,
74        }
75    }
76
77    /// Create a subscriber starting from the **oldest available** message
78    /// still in the ring (or 0 if nothing published yet).
79    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
80        let head = self.ring.cursor.0.load(Ordering::Acquire);
81        let cap = self.ring.capacity();
82        let start = if head == u64::MAX {
83            0
84        } else if head >= cap {
85            head - cap + 1
86        } else {
87            0
88        };
89        let tracker = self.ring.register_tracker(start);
90        let slots_ptr = self.ring.slots_ptr();
91        let mask = self.ring.mask;
92        Subscriber {
93            ring: self.ring.clone(),
94            slots_ptr,
95            mask,
96            cursor: start,
97            tracker,
98            total_lagged: 0,
99            total_received: 0,
100        }
101    }
102
103    /// Create a subscriber with an active cursor tracker.
104    ///
105    /// Use this when the subscriber will participate in a
106    /// [`DependencyBarrier`] as an upstream consumer.
107    ///
108    /// On **bounded** channels, this behaves identically to
109    /// [`subscribe()`](Self::subscribe) — those subscribers already have
110    /// trackers.
111    ///
112    /// On **lossy** channels, [`subscribe()`](Self::subscribe) omits the
113    /// tracker (zero overhead for the common case). This method creates a
114    /// standalone tracker so that a [`DependencyBarrier`] can read the
115    /// subscriber's cursor position. The tracker is **not** registered
116    /// with the ring's backpressure system — it is purely for dependency
117    /// graph coordination.
118    pub fn subscribe_tracked(&self) -> Subscriber<T> {
119        let head = self.ring.cursor.0.load(Ordering::Acquire);
120        let start = if head == u64::MAX { 0 } else { head + 1 };
121        // On bounded channels, register_tracker returns Some (backpressure-aware).
122        // On lossy channels, it returns None — so we create a standalone tracker.
123        let tracker = self
124            .ring
125            .register_tracker(start)
126            .or_else(|| Some(Arc::new(Padded(AtomicU64::new(start)))));
127        let slots_ptr = self.ring.slots_ptr();
128        let mask = self.ring.mask;
129        Subscriber {
130            ring: self.ring.clone(),
131            slots_ptr,
132            mask,
133            cursor: start,
134            tracker,
135            total_lagged: 0,
136            total_received: 0,
137        }
138    }
139}