photon_ring/channel/publisher.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use super::errors::PublishError;
5use crate::pod::Pod;
6use crate::ring::SharedRing;
7use crate::slot::Slot;
8use alloc::sync::Arc;
9use core::sync::atomic::{AtomicU64, Ordering};
10
11use super::prefetch_write_next;
12
13/// The write side of a Photon SPMC channel.
14///
15/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
16/// only one thread may publish at a time (single-producer guarantee enforced
17/// by `&mut self`).
18pub struct Publisher<T: Pod> {
19 pub(super) ring: Arc<SharedRing<T>>,
20 /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
21 /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
22 pub(super) slots_ptr: *const Slot<T>,
23 /// Cached ring mask (`capacity - 1`). Immutable after construction.
24 pub(super) mask: u64,
25 /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
26 pub(super) cursor_ptr: *const AtomicU64,
27 pub(super) seq: u64,
28 /// Cached minimum cursor from the last tracker scan. Used as a fast-path
29 /// check to avoid scanning on every `try_publish` call.
30 pub(super) cached_slowest: u64,
31 /// Cached backpressure flag. Avoids Arc deref + Option check on every
32 /// publish() for lossy channels. Immutable after construction.
33 pub(super) has_backpressure: bool,
34}
35
36unsafe impl<T: Pod> Send for Publisher<T> {}
37
38impl<T: Pod> Publisher<T> {
39 /// Write a single value to the ring without any backpressure check.
40 /// This is the raw publish path used by both `publish()` (lossy) and
41 /// `try_publish()` (after backpressure check passes).
42 #[inline]
43 fn publish_unchecked(&mut self, value: T) {
44 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
45 // Index is masked to stay within the allocated slot array.
46 let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
47 prefetch_write_next(self.slots_ptr, (self.seq + 1) & self.mask);
48 slot.write(self.seq, value);
49 // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
50 unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
51 self.seq += 1;
52 }
53
54 /// Publish by writing directly into the slot via a closure.
55 ///
56 /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
57 /// construction that can eliminate the write-side `memcpy` when the
58 /// compiler constructs the value directly in slot memory.
59 ///
60 /// This is the lossy (no backpressure) path. For bounded channels,
61 /// prefer [`publish()`](Self::publish) with a pre-built value.
62 ///
63 /// # Example
64 ///
65 /// ```
66 /// use std::mem::MaybeUninit;
67 /// let (mut p, s) = photon_ring::channel::<u64>(64);
68 /// let mut sub = s.subscribe();
69 /// p.publish_with(|slot| { slot.write(42u64); });
70 /// assert_eq!(sub.try_recv(), Ok(42));
71 /// ```
72 #[inline]
73 pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
74 // SAFETY: see publish_unchecked.
75 let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
76 prefetch_write_next(self.slots_ptr, (self.seq + 1) & self.mask);
77 slot.write_with(self.seq, f);
78 unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
79 self.seq += 1;
80 }
81
82 /// Publish a single value. Zero-allocation, O(1).
83 ///
84 /// On a bounded channel (created with [`channel_bounded()`]), this method
85 /// spin-waits until there is room in the ring, ensuring no message loss.
86 /// On a regular (lossy) channel, this publishes immediately without any
87 /// backpressure check.
88 #[inline]
89 pub fn publish(&mut self, value: T) {
90 if self.has_backpressure {
91 let mut v = value;
92 loop {
93 match self.try_publish(v) {
94 Ok(()) => return,
95 Err(PublishError::Full(returned)) => {
96 v = returned;
97 core::hint::spin_loop();
98 }
99 }
100 }
101 }
102 self.publish_unchecked(value);
103 }
104
105 /// Try to publish a single value with backpressure awareness.
106 ///
107 /// - On a regular (lossy) channel created with [`channel()`], this always
108 /// succeeds — it publishes the value and returns `Ok(())`.
109 /// - On a bounded channel created with [`channel_bounded()`], this checks
110 /// whether the slowest subscriber has fallen too far behind. If
111 /// `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
112 /// `Err(PublishError::Full(value))` without writing.
113 #[inline]
114 pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
115 if let Some(bp) = self.ring.backpressure.as_ref() {
116 let capacity = self.ring.capacity();
117 let effective = capacity - bp.watermark;
118
119 // Fast path: use cached slowest cursor.
120 if self.seq >= self.cached_slowest + effective {
121 // Slow path: rescan all trackers.
122 match self.ring.slowest_cursor() {
123 Some(slowest) => {
124 self.cached_slowest = slowest;
125 if self.seq >= slowest + effective {
126 return Err(PublishError::Full(value));
127 }
128 }
129 None => {
130 // No subscribers registered yet — ring is unbounded.
131 }
132 }
133 }
134 }
135 self.publish_unchecked(value);
136 Ok(())
137 }
138
139 /// Publish a batch of values.
140 ///
141 /// On a **lossy** channel: writes all values with a single cursor update
142 /// at the end — consumers see the entire batch appear at once, and
143 /// cache-line bouncing on the shared cursor is reduced to one store.
144 ///
145 /// On a **bounded** channel: spin-waits for room before each value,
146 /// ensuring no message loss. The cursor advances per-value (not batched),
147 /// so consumers may observe a partial batch during publication.
148 #[inline]
149 pub fn publish_batch(&mut self, values: &[T]) {
150 if values.is_empty() {
151 return;
152 }
153 if self.has_backpressure {
154 for &v in values.iter() {
155 let mut val = v;
156 loop {
157 match self.try_publish(val) {
158 Ok(()) => break,
159 Err(PublishError::Full(returned)) => {
160 val = returned;
161 core::hint::spin_loop();
162 }
163 }
164 }
165 }
166 return;
167 }
168 for (i, &v) in values.iter().enumerate() {
169 let seq = self.seq + i as u64;
170 // SAFETY: see publish_unchecked.
171 let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
172 slot.write(seq, v);
173 }
174 let last = self.seq + values.len() as u64 - 1;
175 unsafe { &*self.cursor_ptr }.store(last, Ordering::Release);
176 self.seq += values.len() as u64;
177 }
178
179 /// Number of messages published so far.
180 #[inline]
181 pub fn published(&self) -> u64 {
182 self.seq
183 }
184
185 /// Current sequence number (same as `published()`).
186 /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
187 #[inline]
188 pub fn sequence(&self) -> u64 {
189 self.seq
190 }
191
192 /// Ring capacity (power of two).
193 #[inline]
194 pub fn capacity(&self) -> u64 {
195 self.ring.capacity()
196 }
197
/// Pin the ring buffer pages in RAM so the OS cannot swap them out.
/// This lowers worst-case latency by removing page-fault stalls from the
/// hot path.
///
/// Returns `true` on success. On Linux this needs `CAP_IPC_LOCK` or a
/// large enough `RLIMIT_MEMLOCK`.
///
/// This method is only compiled on Linux with the `hugepages` feature
/// enabled.
#[cfg(all(target_os = "linux", feature = "hugepages"))]
pub fn mlock(&self) -> bool {
    let base = self.ring.slots_ptr() as *const u8;
    let byte_len = self.ring.slots_byte_len();
    // SAFETY: NOTE(review) — `base` and `byte_len` come straight from the
    // ring's own accessors and are presumed to describe its slot
    // allocation, which `self.ring` keeps alive; confirm against
    // `crate::mem::mlock_pages`' contract.
    unsafe { crate::mem::mlock_pages(base, byte_len) }
}
210
/// Touch every ring buffer page up front by writing a zero byte to each
/// 4 KiB page, so the first publish does not take a page fault.
///
/// # Safety
///
/// Call this only before any publish or subscribe operation has started.
/// It writes zero bytes into the ring's memory through raw pointers;
/// doing so while the ring is in use is undefined behavior and can
/// corrupt slot data and seqlock stamps.
#[cfg(all(target_os = "linux", feature = "hugepages"))]
pub unsafe fn prefault(&self) {
    let base = self.ring.slots_ptr() as *mut u8;
    let byte_len = self.ring.slots_byte_len();
    crate::mem::prefault_pages(base, byte_len)
}
226}