photon_ring/channel/
subscribable.rs1use super::group::SubscriberGroup;
5use super::subscriber::Subscriber;
6use crate::pod::Pod;
7use crate::ring::{Padded, SharedRing};
8use alloc::sync::Arc;
9use core::sync::atomic::{AtomicU64, Ordering};
10
11pub struct Subscribable<T: Pod> {
16 pub(super) ring: Arc<SharedRing<T>>,
17}
18
19impl<T: Pod> Clone for Subscribable<T> {
20 fn clone(&self) -> Self {
21 Subscribable {
22 ring: self.ring.clone(),
23 }
24 }
25}
26
27unsafe impl<T: Pod> Send for Subscribable<T> {}
28unsafe impl<T: Pod> Sync for Subscribable<T> {}
29
30impl<T: Pod> Subscribable<T> {
31 pub fn subscribe(&self) -> Subscriber<T> {
33 let head = self.ring.cursor.0.load(Ordering::Acquire);
34 let start = if head == u64::MAX { 0 } else { head + 1 };
35 let tracker = self.ring.register_tracker(start);
36 let slots_ptr = self.ring.slots_ptr();
37 let mask = self.ring.mask;
38 Subscriber {
39 ring: self.ring.clone(),
40 slots_ptr,
41 mask,
42 cursor: start,
43 tracker,
44 total_lagged: 0,
45 total_received: 0,
46 }
47 }
48
49 pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
60 assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
61 let head = self.ring.cursor.0.load(Ordering::Acquire);
62 let start = if head == u64::MAX { 0 } else { head + 1 };
63 let tracker = self.ring.register_tracker(start);
64 let slots_ptr = self.ring.slots_ptr();
65 let mask = self.ring.mask;
66 SubscriberGroup {
67 ring: self.ring.clone(),
68 slots_ptr,
69 mask,
70 cursor: start,
71 total_lagged: 0,
72 total_received: 0,
73 tracker,
74 }
75 }
76
77 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
80 let head = self.ring.cursor.0.load(Ordering::Acquire);
81 let cap = self.ring.capacity();
82 let start = if head == u64::MAX {
83 0
84 } else if head >= cap {
85 head - cap + 1
86 } else {
87 0
88 };
89 let tracker = self.ring.register_tracker(start);
90 let slots_ptr = self.ring.slots_ptr();
91 let mask = self.ring.mask;
92 Subscriber {
93 ring: self.ring.clone(),
94 slots_ptr,
95 mask,
96 cursor: start,
97 tracker,
98 total_lagged: 0,
99 total_received: 0,
100 }
101 }
102
103 pub fn subscribe_tracked(&self) -> Subscriber<T> {
119 let head = self.ring.cursor.0.load(Ordering::Acquire);
120 let start = if head == u64::MAX { 0 } else { head + 1 };
121 let tracker = self
124 .ring
125 .register_tracker(start)
126 .or_else(|| Some(Arc::new(Padded(AtomicU64::new(start)))));
127 let slots_ptr = self.ring.slots_ptr();
128 let mask = self.ring.mask;
129 Subscriber {
130 ring: self.ring.clone(),
131 slots_ptr,
132 mask,
133 cursor: start,
134 tracker,
135 total_lagged: 0,
136 total_received: 0,
137 }
138 }
139}