photon_ring/channel/
subscribable.rs1use super::group::SubscriberGroup;
5use super::subscriber::Subscriber;
6use crate::pod::Pod;
7use crate::ring::{Padded, SharedRing};
8use alloc::sync::Arc;
9use core::sync::atomic::{AtomicU64, Ordering};
10
11pub struct Subscribable<T: Pod> {
16 pub(super) ring: Arc<SharedRing<T>>,
17}
18
19impl<T: Pod> Clone for Subscribable<T> {
20 fn clone(&self) -> Self {
21 Subscribable {
22 ring: self.ring.clone(),
23 }
24 }
25}
26
27unsafe impl<T: Pod> Send for Subscribable<T> {}
28unsafe impl<T: Pod> Sync for Subscribable<T> {}
29
30impl<T: Pod> Subscribable<T> {
31 pub fn subscribe(&self) -> Subscriber<T> {
33 let head = self.ring.cursor.0.load(Ordering::Acquire);
34 let start = if head == u64::MAX { 0 } else { head + 1 };
35 let tracker = self.ring.register_tracker(start);
36 let slots_ptr = self.ring.slots_ptr();
37 let idx = self.ring.index;
38 Subscriber {
39 ring: self.ring.clone(),
40 slots_ptr,
41 capacity: idx.capacity,
42 mask: idx.mask,
43 reciprocal: idx.reciprocal,
44 is_pow2: idx.is_pow2,
45 cursor: start,
46 tracker,
47 total_lagged: 0,
48 total_received: 0,
49 }
50 }
51
52 pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
63 assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
64 let head = self.ring.cursor.0.load(Ordering::Acquire);
65 let start = if head == u64::MAX { 0 } else { head + 1 };
66 let tracker = self.ring.register_tracker(start);
67 let slots_ptr = self.ring.slots_ptr();
68 let idx = self.ring.index;
69 SubscriberGroup {
70 ring: self.ring.clone(),
71 slots_ptr,
72 capacity: idx.capacity,
73 mask: idx.mask,
74 reciprocal: idx.reciprocal,
75 is_pow2: idx.is_pow2,
76 cursor: start,
77 total_lagged: 0,
78 total_received: 0,
79 tracker,
80 }
81 }
82
83 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
86 let head = self.ring.cursor.0.load(Ordering::Acquire);
87 let cap = self.ring.capacity();
88 let start = if head == u64::MAX {
89 0
90 } else if head >= cap {
91 head - cap + 1
92 } else {
93 0
94 };
95 let tracker = self.ring.register_tracker(start);
96 let slots_ptr = self.ring.slots_ptr();
97 let idx = self.ring.index;
98 Subscriber {
99 ring: self.ring.clone(),
100 slots_ptr,
101 capacity: idx.capacity,
102 mask: idx.mask,
103 reciprocal: idx.reciprocal,
104 is_pow2: idx.is_pow2,
105 cursor: start,
106 tracker,
107 total_lagged: 0,
108 total_received: 0,
109 }
110 }
111
112 pub fn subscribe_tracked(&self) -> Subscriber<T> {
128 let head = self.ring.cursor.0.load(Ordering::Acquire);
129 let start = if head == u64::MAX { 0 } else { head + 1 };
130 let tracker = self
133 .ring
134 .register_tracker(start)
135 .or_else(|| Some(Arc::new(Padded(AtomicU64::new(start)))));
136 let slots_ptr = self.ring.slots_ptr();
137 let idx = self.ring.index;
138 Subscriber {
139 ring: self.ring.clone(),
140 slots_ptr,
141 capacity: idx.capacity,
142 mask: idx.mask,
143 reciprocal: idx.reciprocal,
144 is_pow2: idx.is_pow2,
145 cursor: start,
146 tracker,
147 total_lagged: 0,
148 total_received: 0,
149 }
150 }
151}