Skip to main content

nexus_logbuf/channel/
spsc.rs

1//! Single-producer single-consumer channel.
2//!
3//! Wraps [`queue::spsc`](crate::queue::spsc) with backoff and parking.
4//!
5//! # Philosophy
6//!
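//! **Senders never park.** [`Sender::send`] retries with a spin/yield backoff
//! until space frees up or the receiver disconnects; [`Sender::try_send`] makes
//! a single attempt and fails with [`TrySendError::Full`] instead of waiting.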
9//!
10//! **Receivers can block.** They use `park_timeout` to wait for messages
11//! without burning CPU. The timeout ensures they periodically check for
12//! disconnection.
13//!
14//! # Example
15//!
16//! ```
17//! use nexus_logbuf::channel::spsc;
18//! use std::thread;
19//!
20//! let (mut tx, mut rx) = spsc::channel(4096);
21//!
22//! thread::spawn(move || {
23//!     let payload = b"hello";
24//!     let mut claim = tx.send(payload.len()).unwrap();
25//!     claim.copy_from_slice(payload);
26//!     claim.commit();
27//!     tx.notify();
28//! });
29//!
30//! let record = rx.recv(None).unwrap();
31//! assert_eq!(&*record, b"hello");
32//! ```
33
34use std::sync::Arc;
35use std::sync::atomic::{AtomicBool, Ordering};
36use std::time::Duration;
37
38use crossbeam_utils::Backoff;
39
40use crate::queue::spsc as queue;
41
/// Default park timeout for receivers.
///
/// Used by [`Receiver::recv`] as the park duration when the caller passes
/// `None`, so a blocked receiver wakes periodically to re-check the queue and
/// the sender-disconnected flag.
const DEFAULT_PARK_TIMEOUT: Duration = Duration::from_millis(100);
46
47/// Creates a bounded SPSC channel.
48///
49/// Capacity is rounded up to the next power of two.
50///
51/// # Panics
52///
53/// Panics if `capacity` is less than 16 bytes.
54pub fn channel(capacity: usize) -> (Sender, Receiver) {
55    let (producer, consumer) = queue::new(capacity);
56
57    let shared = Arc::new(ChannelShared {
58        receiver_waiting: AtomicBool::new(false),
59        sender_disconnected: AtomicBool::new(false),
60        receiver_disconnected: AtomicBool::new(false),
61    });
62
63    let parker = crossbeam_utils::sync::Parker::new();
64    let unparker = parker.unparker().clone();
65
66    (
67        Sender {
68            inner: producer,
69            receiver_unparker: unparker,
70            shared: Arc::clone(&shared),
71        },
72        Receiver {
73            inner: consumer,
74            parker,
75            shared,
76        },
77    )
78}
79
/// Shared state between sender and receiver.
///
/// Each flag has exactly one writer; the opposite half only reads it.
struct ChannelShared {
    /// True if receiver is parked and waiting.
    ///
    /// Set by the receiver just before it parks and cleared right after it
    /// wakes; read by [`Sender::notify`] to decide whether to unpark.
    receiver_waiting: AtomicBool,
    /// True if sender has been dropped. Set in `Sender`'s `Drop` impl.
    sender_disconnected: AtomicBool,
    /// True if receiver has been dropped. Set in `Receiver`'s `Drop` impl.
    receiver_disconnected: AtomicBool,
}
89
90// ============================================================================
91// Sender
92// ============================================================================
93
/// Sending half of the SPSC channel.
///
/// **Never parks.** [`send`](Self::send) retries with spin/yield backoff until
/// space is available or the receiver is dropped; [`try_send`](Self::try_send)
/// makes a single attempt and reports [`TrySendError::Full`] instead of
/// waiting.
pub struct Sender {
    /// Producer half of the underlying queue.
    inner: queue::Producer,
    /// Handle used to wake the receiver's parker.
    receiver_unparker: crossbeam_utils::sync::Unparker,
    /// Flags shared with the [`Receiver`].
    shared: Arc<ChannelShared>,
}
103
/// Error produced by [`Sender::send`] once the receiving half is gone.
///
/// This is the only runtime failure `send` can report. A `len` of zero is a
/// precondition violation and panics instead (see
/// [`queue::Producer::try_claim`] for details).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChannelClosed;

impl std::fmt::Display for ChannelClosed {
    /// Writes the fixed human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "channel disconnected")
    }
}

impl std::error::Error for ChannelClosed {}
119
/// Failure modes of [`Sender::try_send`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrySendError {
    /// No room for the requested record right now.
    Full,
    /// The receiving half has been dropped.
    Disconnected,
}

impl std::fmt::Display for TrySendError {
    /// Writes the fixed human-readable description of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Full => "channel full",
            Self::Disconnected => "channel disconnected",
        };
        f.write_str(description)
    }
}

impl std::error::Error for TrySendError {}
139
140impl Sender {
141    /// Claims space for a record, spinning until space is available.
142    ///
143    /// **Never makes syscalls.** Spins and yields until the buffer has space
144    /// or the receiver disconnects.
145    ///
146    /// After receiving a [`WriteClaim`](queue::WriteClaim), write your payload
147    /// and call [`commit()`](queue::WriteClaim::commit) to publish. Then call
148    /// [`notify()`](Self::notify) to wake a parked receiver.
149    ///
150    /// # Errors
151    ///
152    /// Returns [`ChannelClosed`] if the receiver was dropped.
153    ///
154    /// # Panics
155    ///
156    /// Panics if `len == 0` (see [`queue::Producer::try_claim`]).
157    #[inline]
158    pub fn send(&mut self, len: usize) -> Result<queue::WriteClaim<'_>, ChannelClosed> {
159        // Precondition check before any state inspection — `len == 0` is a
160        // contract violation regardless of channel state, and the doc
161        // contract is honest only if it panics unconditionally.
162        assert!(len > 0, "payload length must be non-zero");
163        if self.shared.receiver_disconnected.load(Ordering::Relaxed) {
164            return Err(ChannelClosed);
165        }
166
167        let backoff = Backoff::new();
168
169        loop {
170            // SAFETY: We only return the claim when we get one, at which point
171            // the loop terminates. The borrow checker can't prove this, but there
172            // is never a second mutable borrow while the first is alive.
173            // This is a known borrow checker limitation that Polonius handles.
174            unsafe {
175                let inner_ptr: *mut queue::Producer = &raw mut self.inner;
176                if let Ok(claim) = (*inner_ptr).try_claim(len) {
177                    return Ok(std::mem::transmute::<
178                        queue::WriteClaim<'_>,
179                        queue::WriteClaim<'_>,
180                    >(claim));
181                }
182                // BufferFull — wait for receiver to drain.
183                backoff.snooze();
184                if self.shared.receiver_disconnected.load(Ordering::Relaxed) {
185                    return Err(ChannelClosed);
186                }
187                // Reset backoff after it completes to keep spinning
188                if backoff.is_completed() {
189                    backoff.reset();
190                }
191            }
192        }
193    }
194
195    /// Attempts to claim space for a record without any waiting.
196    ///
197    /// # Errors
198    ///
199    /// - [`TrySendError::Full`] if buffer is full
200    /// - [`TrySendError::Disconnected`] if receiver was dropped
201    ///
202    /// # Panics
203    ///
204    /// Panics if `len == 0` (see [`queue::Producer::try_claim`]).
205    #[inline]
206    pub fn try_send(&mut self, len: usize) -> Result<queue::WriteClaim<'_>, TrySendError> {
207        // Precondition check before any state inspection — see `send` for why.
208        assert!(len > 0, "payload length must be non-zero");
209        if self.shared.receiver_disconnected.load(Ordering::Relaxed) {
210            return Err(TrySendError::Disconnected);
211        }
212
213        match self.inner.try_claim(len) {
214            Ok(claim) => Ok(claim),
215            Err(crate::BufferFull) => Err(TrySendError::Full),
216        }
217    }
218
219    /// Notifies the receiver that data is available.
220    ///
221    /// Call this after committing a write to wake a parked receiver.
222    /// Cheap no-op if receiver isn't parked.
223    #[inline]
224    pub fn notify(&self) {
225        if self.shared.receiver_waiting.load(Ordering::Relaxed) {
226            self.receiver_unparker.unpark();
227        }
228    }
229
230    /// Returns the capacity of the underlying buffer.
231    #[inline]
232    pub fn capacity(&self) -> usize {
233        self.inner.capacity()
234    }
235
236    /// Returns `true` if the receiver has been dropped.
237    #[inline]
238    pub fn is_disconnected(&self) -> bool {
239        self.shared.receiver_disconnected.load(Ordering::Relaxed)
240    }
241}
242
243impl Drop for Sender {
244    fn drop(&mut self) {
245        self.shared
246            .sender_disconnected
247            .store(true, Ordering::Relaxed);
248        // Wake receiver so it can observe disconnection
249        self.receiver_unparker.unpark();
250    }
251}
252
253impl std::fmt::Debug for Sender {
254    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255        f.debug_struct("Sender")
256            .field("capacity", &self.capacity())
257            .finish_non_exhaustive()
258    }
259}
260
261// ============================================================================
262// Receiver
263// ============================================================================
264
/// Receiving half of the SPSC channel.
///
/// **Can block with syscalls.** Uses `park_timeout` to wait for messages
/// without burning CPU.
pub struct Receiver {
    /// Consumer half of the underlying queue.
    inner: queue::Consumer,
    /// Parker this receiver sleeps on; the sender holds the matching unparker.
    parker: crossbeam_utils::sync::Parker,
    /// Flags shared with the [`Sender`].
    shared: Arc<ChannelShared>,
}
274
/// Failure modes of [`Receiver::recv`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
    /// No message arrived before the timeout ran out.
    ///
    /// Only produced when the caller supplied a timeout.
    Timeout,
    /// The sending half has been dropped and the buffer is empty.
    Disconnected,
}

impl std::fmt::Display for RecvError {
    /// Writes the fixed human-readable description of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Timeout => "receive timed out",
            Self::Disconnected => "channel disconnected",
        };
        f.write_str(description)
    }
}

impl std::error::Error for RecvError {}
296
297impl Receiver {
298    /// Blocks until a message is available or the optional timeout elapses.
299    ///
300    /// - `None` — block forever (or until disconnected)
301    /// - `Some(Duration::ZERO)` — single try, no spinning
302    /// - `Some(duration)` — block up to `duration`
303    ///
304    /// Uses backoff (spin → yield) then parks.
305    ///
306    /// # Errors
307    ///
308    /// - [`RecvError::Timeout`] if timeout elapsed (only when `Some`)
309    /// - [`RecvError::Disconnected`] if sender was dropped and buffer is empty
310    #[inline]
311    pub fn recv(&mut self, timeout: Option<Duration>) -> Result<queue::ReadClaim<'_>, RecvError> {
312        // Fast path for zero timeout - single try, no spinning
313        if timeout == Some(Duration::ZERO) {
314            // SAFETY: We only return the claim when we get one, at which point
315            // the function returns. The borrow checker can't prove this, but there
316            // is never a second mutable borrow while the first is alive.
317            // This is a known borrow checker limitation that Polonius handles.
318            unsafe {
319                let inner_ptr: *mut queue::Consumer = &raw mut self.inner;
320                if let Some(claim) = (*inner_ptr).try_claim() {
321                    return Ok(std::mem::transmute::<
322                        queue::ReadClaim<'_>,
323                        queue::ReadClaim<'_>,
324                    >(claim));
325                }
326            }
327            if self.shared.sender_disconnected.load(Ordering::Relaxed) {
328                return Err(RecvError::Disconnected);
329            }
330            return Err(RecvError::Timeout);
331        }
332
333        let park_timeout = timeout.unwrap_or(DEFAULT_PARK_TIMEOUT);
334        let backoff = Backoff::new();
335
336        loop {
337            // SAFETY: We only return the claim when we get one, at which point
338            // the loop terminates. The borrow checker can't prove this, but there
339            // is never a second mutable borrow while the first is alive.
340            // This is a known borrow checker limitation that Polonius handles.
341            unsafe {
342                let inner_ptr: *mut queue::Consumer = &raw mut self.inner;
343                if let Some(claim) = (*inner_ptr).try_claim() {
344                    return Ok(std::mem::transmute::<
345                        queue::ReadClaim<'_>,
346                        queue::ReadClaim<'_>,
347                    >(claim));
348                }
349            }
350
351            if self.shared.sender_disconnected.load(Ordering::Relaxed) {
352                return Err(RecvError::Disconnected);
353            }
354
355            // Backoff phase: spin/yield without syscalls
356            if !backoff.is_completed() {
357                backoff.snooze();
358                continue;
359            }
360
361            // Park phase
362            self.shared.receiver_waiting.store(true, Ordering::Relaxed);
363            self.parker.park_timeout(park_timeout);
364            self.shared.receiver_waiting.store(false, Ordering::Relaxed);
365
366            // For Some(timeout), only park once then return Timeout
367            // For None, loop back and try again
368            if timeout.is_some() {
369                // Final try after park
370                // SAFETY: Same as above - borrow checker limitation workaround.
371                unsafe {
372                    let inner_ptr: *mut queue::Consumer = &raw mut self.inner;
373                    if let Some(claim) = (*inner_ptr).try_claim() {
374                        return Ok(std::mem::transmute::<
375                            queue::ReadClaim<'_>,
376                            queue::ReadClaim<'_>,
377                        >(claim));
378                    }
379                }
380
381                if self.shared.sender_disconnected.load(Ordering::Relaxed) {
382                    return Err(RecvError::Disconnected);
383                }
384
385                return Err(RecvError::Timeout);
386            }
387
388            // None case: reset backoff and loop
389            backoff.reset();
390        }
391    }
392
393    /// Attempts to receive a message without blocking.
394    ///
395    /// Returns `None` if no message is available.
396    #[inline]
397    pub fn try_recv(&mut self) -> Option<queue::ReadClaim<'_>> {
398        self.inner.try_claim()
399    }
400
401    /// Returns the capacity of the underlying buffer.
402    #[inline]
403    pub fn capacity(&self) -> usize {
404        self.inner.capacity()
405    }
406
407    /// Returns `true` if the sender has been dropped.
408    #[inline]
409    pub fn is_disconnected(&self) -> bool {
410        self.shared.sender_disconnected.load(Ordering::Relaxed)
411    }
412}
413
414impl Drop for Receiver {
415    fn drop(&mut self) {
416        self.shared
417            .receiver_disconnected
418            .store(true, Ordering::Relaxed);
419    }
420}
421
422impl std::fmt::Debug for Receiver {
423    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
424        f.debug_struct("Receiver")
425            .field("capacity", &self.capacity())
426            .finish_non_exhaustive()
427    }
428}
429
430// ============================================================================
431// Tests
432// ============================================================================
433
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Claims, fills, commits and announces one record via the blocking
    /// `send` path.
    fn publish(tx: &mut Sender, bytes: &[u8]) {
        let mut claim = tx.send(bytes.len()).unwrap();
        claim.copy_from_slice(bytes);
        claim.commit();
        tx.notify();
    }

    /// A record sent on one half is received intact on the other.
    #[test]
    fn basic_send_recv() {
        let (mut tx, mut rx) = channel(1024);
        let payload = b"hello world";

        publish(&mut tx, payload);

        let record = rx.recv(None).unwrap();
        assert_eq!(&*record, payload);
    }

    /// The non-blocking pair reports empty before and after a single record.
    #[test]
    fn try_send_try_recv() {
        let (mut tx, mut rx) = channel(1024);
        let payload = b"test";

        assert!(rx.try_recv().is_none());

        let mut claim = tx.try_send(payload.len()).unwrap();
        claim.copy_from_slice(payload);
        claim.commit();

        {
            let record = rx.try_recv().unwrap();
            assert_eq!(&*record, payload);
        } // the read claim must be gone before polling again

        assert!(rx.try_recv().is_none());
    }

    /// 1000 records cross a thread boundary in order.
    #[test]
    fn cross_thread() {
        let (mut tx, mut rx) = channel(4096);

        let producer = thread::spawn(move || {
            for seq in 0..1000u64 {
                publish(&mut tx, &seq.to_le_bytes());
            }
        });

        let consumer = thread::spawn(move || {
            for expected in 0..1000u64 {
                let record = rx.recv(None).unwrap();
                let got = u64::from_le_bytes((*record).try_into().unwrap());
                assert_eq!(got, expected);
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();
    }

    /// Dropping the sender makes a blocking receive report disconnection.
    #[test]
    fn disconnection_sender_dropped() {
        let (tx, mut rx) = channel(1024);

        drop(tx);

        let Err(RecvError::Disconnected) = rx.recv(None) else {
            panic!("expected Disconnected")
        };
    }

    /// Dropping the receiver makes a send report a closed channel.
    #[test]
    fn disconnection_receiver_dropped() {
        let (mut tx, rx) = channel(1024);

        drop(rx);

        let Err(ChannelClosed) = tx.send(8) else {
            panic!("expected ChannelClosed")
        };
    }

    /// An empty channel with a live sender times out after roughly the
    /// requested duration.
    #[test]
    fn recv_timeout_works() {
        let (_tx, mut rx) = channel(1024);

        let started = std::time::Instant::now();
        let outcome = rx.recv(Some(Duration::from_millis(50)));
        let waited = started.elapsed();

        assert!(matches!(outcome, Err(RecvError::Timeout)));
        assert!(waited >= Duration::from_millis(40)); // Some tolerance
        assert!(waited < Duration::from_millis(200));
    }

    /// A timed receive returns the record when one is already buffered.
    #[test]
    fn recv_timeout_with_data() {
        let (mut tx, mut rx) = channel(1024);
        let payload = b"data";

        publish(&mut tx, payload);

        let outcome = rx.recv(Some(Duration::from_secs(1)));
        assert!(outcome.is_ok());
        assert_eq!(&*outcome.unwrap(), payload);
    }

    /// Filling a small buffer eventually yields `Full`, never another error.
    #[test]
    fn try_send_returns_full() {
        let (mut tx, _rx) = channel(64);

        // Keep committing 8-byte records until the buffer refuses one.
        let mut accepted = 0;
        loop {
            match tx.try_send(8) {
                Err(TrySendError::Full) => break,
                Err(e) => panic!("unexpected error: {:?}", e),
                Ok(mut claim) => {
                    claim.copy_from_slice(b"12345678");
                    claim.commit();
                    accepted += 1;
                }
            }
        }

        assert!(accepted > 0);
    }

    /// `send(0)` violates the length precondition and panics.
    #[test]
    #[should_panic(expected = "payload length must be non-zero")]
    fn send_zero_panics() {
        let (mut tx, _rx) = channel(1024);
        let _ = tx.send(0);
    }

    /// `try_send(0)` violates the length precondition and panics.
    #[test]
    #[should_panic(expected = "payload length must be non-zero")]
    fn try_send_zero_panics() {
        let (mut tx, _rx) = channel(1024);
        let _ = tx.try_send(0);
    }
}