photon_ring/channel.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::ring::SharedRing;
5use alloc::sync::Arc;
6use core::sync::atomic::Ordering;
7
8// ---------------------------------------------------------------------------
9// Errors
10// ---------------------------------------------------------------------------
11
/// Error returned by [`Subscriber::try_recv`].
///
/// Both variants are plain data, so the type is `Copy` and can be compared
/// with `==` in tests and match guards.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    Lagged {
        /// Number of messages that were lost before this subscriber could
        /// read them.
        skipped: u64,
    },
}

// `core::fmt` (not `std::fmt`) keeps this usable from a `no_std` build.
impl core::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::Empty => f.write_str("no new messages available"),
            Self::Lagged { skipped } => write!(
                f,
                "subscriber lagged behind the ring; {} messages were lost",
                skipped
            ),
        }
    }
}
20
21// ---------------------------------------------------------------------------
22// Publisher (single-producer write side)
23// ---------------------------------------------------------------------------
24
25/// The write side of a Photon SPMC channel.
26///
27/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
28/// only one thread may publish at a time (single-producer guarantee enforced
29/// by `&mut self`).
30pub struct Publisher<T: Copy> {
31 ring: Arc<SharedRing<T>>,
32 seq: u64,
33}
34
35unsafe impl<T: Copy + Send> Send for Publisher<T> {}
36
37impl<T: Copy> Publisher<T> {
38 /// Publish a single value. Zero-allocation, O(1).
39 #[inline]
40 pub fn publish(&mut self, value: T) {
41 self.ring.slot(self.seq).write(self.seq, value);
42 self.ring.cursor.0.store(self.seq, Ordering::Release);
43 self.seq += 1;
44 }
45
46 /// Publish a batch of values with a single cursor update.
47 ///
48 /// Each slot is written atomically (seqlock), but the cursor advances only
49 /// once at the end — consumers see the entire batch appear at once, and
50 /// cache-line bouncing on the shared cursor is reduced to one store.
51 #[inline]
52 pub fn publish_batch(&mut self, values: &[T]) {
53 if values.is_empty() {
54 return;
55 }
56 for (i, &v) in values.iter().enumerate() {
57 let seq = self.seq + i as u64;
58 self.ring.slot(seq).write(seq, v);
59 }
60 let last = self.seq + values.len() as u64 - 1;
61 self.ring.cursor.0.store(last, Ordering::Release);
62 self.seq += values.len() as u64;
63 }
64
65 /// Number of messages published so far.
66 #[inline]
67 pub fn published(&self) -> u64 {
68 self.seq
69 }
70
71 /// Ring capacity (power of two).
72 #[inline]
73 pub fn capacity(&self) -> u64 {
74 self.ring.capacity()
75 }
76}
77
78// ---------------------------------------------------------------------------
79// Subscribable (factory for subscribers)
80// ---------------------------------------------------------------------------
81
82/// Clone-able handle for spawning [`Subscriber`]s.
83///
84/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
85/// to create independent consumers.
86pub struct Subscribable<T: Copy> {
87 ring: Arc<SharedRing<T>>,
88}
89
90impl<T: Copy> Clone for Subscribable<T> {
91 fn clone(&self) -> Self {
92 Subscribable {
93 ring: self.ring.clone(),
94 }
95 }
96}
97
98unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
99unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
100
101impl<T: Copy> Subscribable<T> {
102 /// Create a subscriber that will see only **future** messages.
103 pub fn subscribe(&self) -> Subscriber<T> {
104 let head = self.ring.cursor.0.load(Ordering::Acquire);
105 let start = if head == u64::MAX { 0 } else { head + 1 };
106 Subscriber {
107 ring: self.ring.clone(),
108 cursor: start,
109 }
110 }
111
112 /// Create a subscriber starting from the **oldest available** message
113 /// still in the ring (or 0 if nothing published yet).
114 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
115 let head = self.ring.cursor.0.load(Ordering::Acquire);
116 let cap = self.ring.capacity();
117 let start = if head == u64::MAX {
118 0
119 } else if head >= cap {
120 head - cap + 1
121 } else {
122 0
123 };
124 Subscriber {
125 ring: self.ring.clone(),
126 cursor: start,
127 }
128 }
129}
130
131// ---------------------------------------------------------------------------
132// Subscriber (consumer read side)
133// ---------------------------------------------------------------------------
134
135/// The read side of a Photon SPMC channel.
136///
137/// Each subscriber has its own cursor — no contention between consumers.
138pub struct Subscriber<T: Copy> {
139 ring: Arc<SharedRing<T>>,
140 cursor: u64,
141}
142
143unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
144
145impl<T: Copy> Subscriber<T> {
146 /// Try to receive the next message without blocking.
147 #[inline]
148 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
149 let head = self.ring.cursor.0.load(Ordering::Acquire);
150
151 if head == u64::MAX || self.cursor > head {
152 return Err(TryRecvError::Empty);
153 }
154
155 // Fast-path lag check (avoids touching the slot cache line)
156 let cap = self.ring.capacity();
157 if head >= cap {
158 let oldest = head - cap + 1;
159 if self.cursor < oldest {
160 let skipped = oldest - self.cursor;
161 self.cursor = oldest;
162 return Err(TryRecvError::Lagged { skipped });
163 }
164 }
165
166 self.read_slot()
167 }
168
169 /// Spin until the next message is available and return it.
170 #[inline]
171 pub fn recv(&mut self) -> T {
172 loop {
173 match self.try_recv() {
174 Ok(val) => return val,
175 Err(TryRecvError::Empty) => core::hint::spin_loop(),
176 Err(TryRecvError::Lagged { .. }) => {
177 // Cursor already advanced past the gap — retry immediately.
178 }
179 }
180 }
181 }
182
183 /// Skip to the **latest** published message (discards intermediate ones).
184 ///
185 /// Returns `None` only if nothing has been published yet. Under heavy
186 /// producer load, retries internally if the target slot is mid-write.
187 pub fn latest(&mut self) -> Option<T> {
188 loop {
189 let head = self.ring.cursor.0.load(Ordering::Acquire);
190 if head == u64::MAX {
191 return None;
192 }
193 self.cursor = head;
194 match self.read_slot() {
195 Ok(v) => return Some(v),
196 Err(TryRecvError::Empty) => return None,
197 Err(TryRecvError::Lagged { .. }) => {
198 // Producer lapped us between cursor read and slot read.
199 // Retry with updated head.
200 }
201 }
202 }
203 }
204
205 /// How many messages are available to read (capped at ring capacity).
206 #[inline]
207 pub fn pending(&self) -> u64 {
208 let head = self.ring.cursor.0.load(Ordering::Acquire);
209 if head == u64::MAX || self.cursor > head {
210 0
211 } else {
212 let raw = head - self.cursor + 1;
213 raw.min(self.ring.capacity())
214 }
215 }
216
217 /// Internal seqlock read with retry.
218 #[inline]
219 fn read_slot(&mut self) -> Result<T, TryRecvError> {
220 let slot = self.ring.slot(self.cursor);
221
222 loop {
223 match slot.try_read(self.cursor) {
224 Ok(Some(value)) => {
225 self.cursor += 1;
226 return Ok(value);
227 }
228 Ok(None) => {
229 // Torn read or write-in-progress — spin and retry.
230 core::hint::spin_loop();
231 }
232 Err(actual_stamp) => {
233 let expected_stamp = self.cursor * 2 + 2;
234 if actual_stamp < expected_stamp {
235 return Err(TryRecvError::Empty);
236 }
237 // Slot was overwritten — recompute oldest from head cursor
238 // (authoritative source) rather than inferring from the
239 // slot stamp, which only tells us about this one slot.
240 let head = self.ring.cursor.0.load(Ordering::Acquire);
241 let cap = self.ring.capacity();
242 if head >= cap {
243 let oldest = head - cap + 1;
244 if self.cursor < oldest {
245 let skipped = oldest - self.cursor;
246 self.cursor = oldest;
247 return Err(TryRecvError::Lagged { skipped });
248 }
249 }
250 // Head hasn't caught up yet (rare timing race) — spin
251 core::hint::spin_loop();
252 }
253 }
254 }
255 }
256}
257
258// ---------------------------------------------------------------------------
259// Constructor
260// ---------------------------------------------------------------------------
261
262/// Create a Photon SPMC channel.
263///
264/// `capacity` must be a power of two (≥ 2). Returns the single-producer
265/// write end and a clone-able factory for creating consumers.
266///
267/// # Example
268/// ```
269/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
270/// let mut sub = subs.subscribe();
271/// pub_.publish(42);
272/// assert_eq!(sub.try_recv(), Ok(42));
273/// ```
274pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
275 let ring = Arc::new(SharedRing::new(capacity));
276 (
277 Publisher {
278 ring: ring.clone(),
279 seq: 0,
280 },
281 Subscribable { ring },
282 )
283}