// photon_ring/channel/publisher.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use super::errors::PublishError;
5use crate::pod::Pod;
6use crate::ring::SharedRing;
7use crate::slot::Slot;
8use alloc::sync::Arc;
9use core::sync::atomic::{AtomicU64, Ordering};
10
11use super::prefetch_write_next;
12
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self` on all publishing methods).
pub struct Publisher<T: Pod> {
    pub(super) ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    pub(super) slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    pub(super) mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    pub(super) cursor_ptr: *const AtomicU64,
    /// Next sequence number to write; also the total published count.
    pub(super) seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call.
    pub(super) cached_slowest: u64,
    /// Cached backpressure flag. Avoids Arc deref + Option check on every
    /// publish() for lossy channels. Immutable after construction.
    pub(super) has_backpressure: bool,
}
35
// SAFETY: the raw pointers (`slots_ptr`, `cursor_ptr`) suppress the auto
// `Send` impl, but they target allocations owned by `ring` (an Arc), so they
// remain valid wherever the Publisher is moved. All publishing requires
// `&mut self`, so only one thread can publish at a time; `Sync` is
// deliberately NOT implemented.
unsafe impl<T: Pod> Send for Publisher<T> {}
37
impl<T: Pod> Publisher<T> {
    /// Write a single value to the ring without any backpressure check.
    /// This is the raw publish path used by both `publish()` (lossy) and
    /// `try_publish()` (after backpressure check passes).
    #[inline]
    fn publish_unchecked(&mut self, value: T) {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        // Index is masked to stay within the allocated slot array.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        // Hint the prefetcher at the slot the NEXT publish will touch.
        prefetch_write_next(self.slots_ptr, (self.seq + 1) & self.mask);
        slot.write(self.seq, value);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        // Release ordering publishes the slot write above to subscribers that
        // Acquire-load the cursor.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }
53
    /// Publish by writing directly into the slot via a closure.
    ///
    /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
    /// construction that can eliminate the write-side `memcpy` when the
    /// compiler constructs the value directly in slot memory.
    ///
    /// This is the lossy (no backpressure) path. For bounded channels,
    /// prefer [`publish()`](Self::publish) with a pre-built value.
    ///
    /// # Example
    ///
    /// ```
    /// let (mut p, s) = photon_ring::channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: see publish_unchecked — slots_ptr is Arc-owned, index masked.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        prefetch_write_next(self.slots_ptr, (self.seq + 1) & self.mask);
        slot.write_with(self.seq, f);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }
81
82    /// Publish a single value. Zero-allocation, O(1).
83    ///
84    /// On a bounded channel (created with [`channel_bounded()`]), this method
85    /// spin-waits until there is room in the ring, ensuring no message loss.
86    /// On a regular (lossy) channel, this publishes immediately without any
87    /// backpressure check.
88    #[inline]
89    pub fn publish(&mut self, value: T) {
90        if self.has_backpressure {
91            let mut v = value;
92            loop {
93                match self.try_publish(v) {
94                    Ok(()) => return,
95                    Err(PublishError::Full(returned)) => {
96                        v = returned;
97                        core::hint::spin_loop();
98                    }
99                }
100            }
101        }
102        self.publish_unchecked(value);
103    }
104
105    /// Try to publish a single value with backpressure awareness.
106    ///
107    /// - On a regular (lossy) channel created with [`channel()`], this always
108    ///   succeeds — it publishes the value and returns `Ok(())`.
109    /// - On a bounded channel created with [`channel_bounded()`], this checks
110    ///   whether the slowest subscriber has fallen too far behind. If
111    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
112    ///   `Err(PublishError::Full(value))` without writing.
113    #[inline]
114    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
115        if let Some(bp) = self.ring.backpressure.as_ref() {
116            let capacity = self.ring.capacity();
117            let effective = capacity - bp.watermark;
118
119            // Fast path: use cached slowest cursor.
120            if self.seq >= self.cached_slowest + effective {
121                // Slow path: rescan all trackers.
122                match self.ring.slowest_cursor() {
123                    Some(slowest) => {
124                        self.cached_slowest = slowest;
125                        if self.seq >= slowest + effective {
126                            return Err(PublishError::Full(value));
127                        }
128                    }
129                    None => {
130                        // No subscribers registered yet — ring is unbounded.
131                    }
132                }
133            }
134        }
135        self.publish_unchecked(value);
136        Ok(())
137    }
138
139    /// Publish a batch of values.
140    ///
141    /// On a **lossy** channel: writes all values with a single cursor update
142    /// at the end — consumers see the entire batch appear at once, and
143    /// cache-line bouncing on the shared cursor is reduced to one store.
144    ///
145    /// On a **bounded** channel: spin-waits for room before each value,
146    /// ensuring no message loss. The cursor advances per-value (not batched),
147    /// so consumers may observe a partial batch during publication.
148    #[inline]
149    pub fn publish_batch(&mut self, values: &[T]) {
150        if values.is_empty() {
151            return;
152        }
153        if self.has_backpressure {
154            for &v in values.iter() {
155                let mut val = v;
156                loop {
157                    match self.try_publish(val) {
158                        Ok(()) => break,
159                        Err(PublishError::Full(returned)) => {
160                            val = returned;
161                            core::hint::spin_loop();
162                        }
163                    }
164                }
165            }
166            return;
167        }
168        for (i, &v) in values.iter().enumerate() {
169            let seq = self.seq + i as u64;
170            // SAFETY: see publish_unchecked.
171            let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
172            slot.write(seq, v);
173        }
174        let last = self.seq + values.len() as u64 - 1;
175        unsafe { &*self.cursor_ptr }.store(last, Ordering::Release);
176        self.seq += values.len() as u64;
177    }
178
    /// Number of messages published so far.
    /// Equal to the next sequence number to be written.
    #[inline]
    pub fn published(&self) -> u64 {
        self.seq
    }
184
    /// Current sequence number (same as `published()`).
    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.seq
    }
191
    /// Ring capacity (power of two, matches `mask + 1`).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }
197
    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
    /// them to disk. Reduces worst-case latency by eliminating page-fault
    /// stalls on the hot path.
    ///
    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub fn mlock(&self) -> bool {
        let ptr = self.ring.slots_ptr() as *const u8;
        let len = self.ring.slots_byte_len();
        // SAFETY: ptr/len describe the live slot allocation owned by
        // self.ring, so the range is valid mapped memory for the call.
        unsafe { crate::mem::mlock_pages(ptr, len) }
    }
210
    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
    /// page. Ensures the first publish does not trigger a page fault.
    ///
    /// # Safety
    ///
    /// Must be called before any publish/subscribe operations begin.
    /// Calling this while the ring is in active use is undefined behavior
    /// because it writes zero bytes to live ring memory via raw pointers,
    /// which can corrupt slot data and seqlock stamps.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub unsafe fn prefault(&self) {
        let ptr = self.ring.slots_ptr() as *mut u8;
        let len = self.ring.slots_byte_len();
        // Caller guarantees exclusivity (see # Safety above), so these writes
        // cannot race with concurrent slot reads or writes.
        crate::mem::prefault_pages(ptr, len)
    }
}