
laminar_core/streaming/channel.rs

//! Streaming channels with automatic SPSC → MPSC upgrade. The SPSC path is
//! lock-free; MPSC mode serializes producers with a lightweight spin lock.
//!
//! This module provides channels optimized for streaming data flow with:
//! - SPSC mode for single-producer scenarios (fastest)
//! - Automatic upgrade to MPSC when `Producer::clone()` is called
//! - Configurable backpressure strategies
//! - Zero-allocation batch operations
//!
//! ## Key Design
//!
//! The channel type is NEVER user-specified. It starts as SPSC and
//! automatically upgrades to MPSC when the producer is cloned.
//!
//! ```rust,ignore
//! let (producer, consumer) = streaming::channel::<u64>(1024);
//! assert!(!producer.is_mpsc());
//!
//! let producer2 = producer.clone();  // Triggers MPSC upgrade
//! assert!(producer.is_mpsc());
//! ```

use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

use crate::tpc::CachePadded;

use super::config::{BackpressureStrategy, ChannelConfig, ChannelStats, WaitStrategy};
use super::error::{RecvError, StreamingError, TryPushError};
use super::ring_buffer::RingBuffer;

/// Channel mode indicator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ChannelMode {
    /// Single-producer single-consumer (fastest).
    Spsc = 0,
    /// Multi-producer single-consumer.
    Mpsc = 1,
}

impl From<u8> for ChannelMode {
    fn from(v: u8) -> Self {
        match v {
            0 => Self::Spsc,
            _ => Self::Mpsc,
        }
    }
}

/// Shared state for a channel.
struct ChannelInner<T> {
    /// The underlying ring buffer.
    buffer: RingBuffer<T>,

    /// Channel mode (SPSC or MPSC).
    mode: AtomicU8,

    /// Number of active producers.
    producer_count: AtomicUsize,

    /// Whether the channel is closed (all producers dropped, or the consumer dropped).
    closed: AtomicBool,

    /// Configuration.
    config: ChannelConfig,

    /// Statistics (if tracking enabled).
    stats: ChannelStatsInner,

    /// Lock for MPSC mode serialization.
    /// In MPSC mode, producers acquire this lock to push.
    /// Value: 0 = unlocked, 1 = locked
    mpsc_lock: AtomicU8,
}

/// Internal statistics counters.
///
/// `items_pushed` and `items_popped` are cache-padded to prevent false sharing
/// between producer and consumer threads.
struct ChannelStatsInner {
    /// Items pushed by producer (cache-padded to avoid false sharing).
    items_pushed: CachePadded<AtomicU64>,
    /// Items popped by consumer (cache-padded to avoid false sharing).
    items_popped: CachePadded<AtomicU64>,
    /// Producer-side counters (grouped together).
    push_blocked: AtomicU64,
    items_dropped: AtomicU64,
    /// Consumer-side counter.
    pop_empty: AtomicU64,
}

impl ChannelStatsInner {
    fn new() -> Self {
        Self {
            items_pushed: CachePadded::new(AtomicU64::new(0)),
            items_popped: CachePadded::new(AtomicU64::new(0)),
            push_blocked: AtomicU64::new(0),
            items_dropped: AtomicU64::new(0),
            pop_empty: AtomicU64::new(0),
        }
    }

    fn snapshot(&self) -> ChannelStats {
        ChannelStats {
            items_pushed: self.items_pushed.load(Ordering::Relaxed),
            items_popped: self.items_popped.load(Ordering::Relaxed),
            push_blocked: self.push_blocked.load(Ordering::Relaxed),
            items_dropped: self.items_dropped.load(Ordering::Relaxed),
            pop_empty: self.pop_empty.load(Ordering::Relaxed),
        }
    }
}

impl<T> ChannelInner<T> {
    fn new(config: ChannelConfig) -> Self {
        Self {
            buffer: RingBuffer::new(config.effective_buffer_size()),
            mode: AtomicU8::new(ChannelMode::Spsc as u8),
            producer_count: AtomicUsize::new(1),
            closed: AtomicBool::new(false),
            config,
            stats: ChannelStatsInner::new(),
            mpsc_lock: AtomicU8::new(0),
        }
    }

    /// Acquire the MPSC lock (spin with exponential backoff).
    ///
    /// Uses exponential backoff to reduce contention under load:
    /// - First 4 attempts: `spin_loop`
    /// - Next 4 attempts: yield
    /// - After that: short sleep
    #[inline]
    fn acquire_mpsc_lock(&self) {
        let mut attempts = 0u32;

        while self
            .mpsc_lock
            .compare_exchange_weak(0, 1, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            attempts = attempts.saturating_add(1);

            if attempts <= 4 {
                // Fast path: spin
                std::hint::spin_loop();
            } else if attempts <= 8 {
                // Medium contention: yield to OS
                thread::yield_now();
            } else {
                // High contention: brief sleep with exponential backoff
                // Cap at 100us to avoid excessive latency
                let sleep_us = (1 << (attempts - 8).min(6)).min(100);
                thread::sleep(Duration::from_micros(sleep_us));
            }
        }
    }

    /// Release the MPSC lock.
    #[inline]
    fn release_mpsc_lock(&self) {
        self.mpsc_lock.store(0, Ordering::Release);
    }

    #[inline]
    fn mode(&self) -> ChannelMode {
        ChannelMode::from(self.mode.load(Ordering::Acquire))
    }

    #[inline]
    fn is_mpsc(&self) -> bool {
        self.mode() == ChannelMode::Mpsc
    }

    #[inline]
    fn upgrade_to_mpsc(&self) {
        self.mode.store(ChannelMode::Mpsc as u8, Ordering::Release);
    }

    #[inline]
    fn track_push(&self) {
        if self.config.track_stats {
            self.stats.items_pushed.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[inline]
    fn track_push_blocked(&self) {
        if self.config.track_stats {
            self.stats.push_blocked.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[inline]
    fn track_dropped(&self) {
        if self.config.track_stats {
            self.stats.items_dropped.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[inline]
    fn track_pop(&self) {
        if self.config.track_stats {
            self.stats.items_popped.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[inline]
    fn track_pop_empty(&self) {
        if self.config.track_stats {
            self.stats.pop_empty.fetch_add(1, Ordering::Relaxed);
        }
    }
}

/// Producer handle for sending items into a channel.
///
/// Cloning a producer triggers automatic SPSC → MPSC upgrade.
pub struct Producer<T> {
    inner: Arc<ChannelInner<T>>,
}

impl<T> Producer<T> {
    /// Pushes an item to the channel, blocking if necessary.
    ///
    /// Behavior depends on the backpressure strategy:
    /// - `Block`: waits until space is available
    /// - `DropOldest`: never blocks; the current implementation drops the
    ///   *incoming* item when the buffer is full and records it in the stats
    ///   (see `push_drop_oldest`)
    /// - `Reject`: returns an error immediately if the buffer is full
    ///
    /// # Errors
    ///
    /// Returns `StreamingError::ChannelClosed` if the consumer has been dropped.
    /// Returns `StreamingError::ChannelFull` if the strategy is `Reject` and the buffer is full.
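    ///
    /// # Example
    ///
    /// A minimal sketch using the `Reject` strategy (buffer size and values are
    /// illustrative; mirrors `test_backpressure_reject` below):
    ///
    /// ```rust,ignore
    /// let config = ChannelConfig::builder()
    ///     .buffer_size(4)
    ///     .backpressure(BackpressureStrategy::Reject)
    ///     .build();
    /// let (producer, consumer) = channel_with_config::<i32>(config);
    ///
    /// producer.push(1).unwrap();
    /// producer.push(2).unwrap();
    /// producer.push(3).unwrap();
    ///
    /// // With `Reject`, a full buffer returns `ChannelFull` instead of blocking.
    /// assert!(matches!(producer.push(4), Err(StreamingError::ChannelFull)));
    /// ```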
    pub fn push(&self, item: T) -> Result<(), StreamingError> {
        if self.inner.closed.load(Ordering::Acquire) {
            return Err(StreamingError::ChannelClosed);
        }

        match self.inner.config.backpressure {
            BackpressureStrategy::Block => self.push_blocking(item),
            BackpressureStrategy::DropOldest => self.push_drop_oldest(item),
            BackpressureStrategy::Reject => self.push_reject(item),
        }
    }

    /// Tries to push an item without blocking.
    ///
    /// Returns `Ok(())` if successful, or `Err(TryPushError)` containing
    /// the item if the push failed.
    ///
    /// # Errors
    ///
    /// Returns `TryPushError` if the channel is full or closed.
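    ///
    /// # Example
    ///
    /// A sketch showing how the rejected item can be recovered (the error
    /// handling shown here is illustrative):
    ///
    /// ```rust,ignore
    /// let (producer, _consumer) = channel::<String>(8);
    ///
    /// match producer.try_push("hello".to_string()) {
    ///     Ok(()) => {}
    ///     Err(e) if e.is_closed() => {
    ///         // The consumer is gone; stop producing.
    ///     }
    ///     Err(e) => {
    ///         // The buffer was full: the item is handed back, not lost.
    ///         let _item = e.into_inner();
    ///         // retry later, buffer it elsewhere, etc.
    ///     }
    /// }
    /// ```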
    pub fn try_push(&self, item: T) -> Result<(), TryPushError<T>> {
        if self.inner.closed.load(Ordering::Acquire) {
            return Err(TryPushError::closed(item));
        }

        if self.inner.is_mpsc() {
            self.try_push_mpsc(item)
        } else {
            self.try_push_spsc(item)
        }
    }

    /// Pushes multiple items, returning the number successfully pushed.
    ///
    /// Stops at the first failure (full buffer or closed channel).
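    ///
    /// A short sketch (the iterator and sizes are illustrative):
    ///
    /// ```rust,ignore
    /// let (producer, consumer) = channel::<i32>(16);
    ///
    /// // Only the items that fit in the buffer are pushed; the return value
    /// // says how many made it in.
    /// let pushed = producer.push_batch(0..100);
    /// assert!(pushed < 100);
    /// ```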
    pub fn push_batch(&self, items: impl IntoIterator<Item = T>) -> usize {
        let mut count = 0;
        for item in items {
            if self.try_push(item).is_err() {
                break;
            }
            count += 1;
        }
        count
    }

    /// Returns true if the channel is in MPSC mode.
    #[inline]
    #[must_use]
    pub fn is_mpsc(&self) -> bool {
        self.inner.is_mpsc()
    }

    /// Returns the channel mode.
    #[inline]
    #[must_use]
    pub fn mode(&self) -> ChannelMode {
        self.inner.mode()
    }

    /// Returns true if the channel is closed.
    #[inline]
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.inner.closed.load(Ordering::Acquire)
    }

    /// Returns the number of items currently in the buffer.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        self.inner.buffer.len()
    }

    /// Returns true if the buffer is empty.
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.inner.buffer.is_empty()
    }

    /// Returns the buffer capacity.
    #[inline]
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.inner.buffer.capacity()
    }

    /// Returns statistics for this channel.
    #[must_use]
    pub fn stats(&self) -> ChannelStats {
        self.inner.stats.snapshot()
    }

    fn push_blocking(&self, mut item: T) -> Result<(), StreamingError> {
        loop {
            match self.try_push(item) {
                Ok(()) => return Ok(()),
                Err(e) if e.is_closed() => return Err(StreamingError::ChannelClosed),
                Err(e) => {
                    self.inner.track_push_blocked();
                    item = e.into_inner();
                    self.wait_for_space();
                }
            }
        }
    }

    fn push_drop_oldest(&self, item: T) -> Result<(), StreamingError> {
        // Try normal push first
        match self.try_push(item) {
            Ok(()) => Ok(()),
            Err(e) if e.is_closed() => Err(StreamingError::ChannelClosed),
            Err(e) => {
                // Buffer is full - we can't actually drop oldest from producer side
                // because that would require coordinating with consumer.
                // Instead, we drop the new item and record it.
                self.inner.track_dropped();
                // In a real implementation, we'd use a different buffer type
                // that supports overwriting. For now, just drop the new item.
                drop(e.into_inner());
                Ok(())
            }
        }
    }

    fn push_reject(&self, item: T) -> Result<(), StreamingError> {
        match self.try_push(item) {
            Ok(()) => Ok(()),
            Err(e) if e.is_closed() => Err(StreamingError::ChannelClosed),
            Err(_) => Err(StreamingError::ChannelFull),
        }
    }

    #[inline]
    fn try_push_spsc(&self, item: T) -> Result<(), TryPushError<T>> {
        match self.inner.buffer.push(item) {
            Ok(()) => {
                self.inner.track_push();
                Ok(())
            }
            Err(item) => Err(TryPushError::full(item)),
        }
    }

    fn try_push_mpsc(&self, item: T) -> Result<(), TryPushError<T>> {
        // In MPSC mode, we serialize producers with a spin-lock.
        // This ensures the ring buffer's SPSC push is safe.
        self.inner.acquire_mpsc_lock();

        let result = match self.inner.buffer.push(item) {
            Ok(()) => {
                self.inner.track_push();
                Ok(())
            }
            Err(item) => Err(TryPushError::full(item)),
        };

        self.inner.release_mpsc_lock();
        result
    }

    fn wait_for_space(&self) {
        match self.inner.config.wait_strategy {
            WaitStrategy::Spin => {
                // Stop waiting if the consumer drops; the next try_push reports the closure.
                while self.inner.buffer.is_full() && !self.is_closed() {
                    std::hint::spin_loop();
                }
            }
            WaitStrategy::SpinYield => {
                let mut spins = 0;
                while self.inner.buffer.is_full() && !self.is_closed() {
                    if spins < 100 {
                        std::hint::spin_loop();
                        spins += 1;
                    } else {
                        thread::yield_now();
                        spins = 0;
                    }
                }
            }
            WaitStrategy::Park => {
                // Use 100us timeout to balance latency vs syscall overhead
                while self.inner.buffer.is_full() && !self.is_closed() {
                    thread::park_timeout(Duration::from_micros(100));
                }
            }
        }
    }
}

impl<T> Clone for Producer<T> {
    fn clone(&self) -> Self {
        // Upgrade to MPSC mode on clone
        self.inner.upgrade_to_mpsc();

        // Increment producer count
        self.inner.producer_count.fetch_add(1, Ordering::AcqRel);

        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

impl<T> Drop for Producer<T> {
    fn drop(&mut self) {
        let prev = self.inner.producer_count.fetch_sub(1, Ordering::AcqRel);
        if prev == 1 {
            // Last producer dropped - close the channel
            self.inner.closed.store(true, Ordering::Release);
        }
    }
}

impl<T: std::fmt::Debug> std::fmt::Debug for Producer<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Producer")
            .field("mode", &self.mode())
            .field("len", &self.len())
            .field("capacity", &self.capacity())
            .field("is_closed", &self.is_closed())
            .finish()
    }
}

/// Consumer handle for receiving items from a channel.
pub struct Consumer<T> {
    inner: Arc<ChannelInner<T>>,
}

impl<T> Consumer<T> {
    /// Polls for the next item without blocking.
    ///
    /// Returns `Some(item)` if available, `None` if the buffer is empty.
    #[inline]
    #[must_use]
    pub fn poll(&self) -> Option<T> {
        if self.inner.is_mpsc() {
            self.poll_mpsc()
        } else {
            self.poll_spsc()
        }
    }

    /// Receives the next item, blocking until one is available.
    ///
    /// # Errors
    ///
    /// Returns `RecvError::Disconnected` if all producers have dropped
    /// and the buffer is empty.
    pub fn recv(&self) -> Result<T, RecvError> {
        loop {
            if let Some(item) = self.poll() {
                return Ok(item);
            }

            if self.is_disconnected() {
                return Err(RecvError::Disconnected);
            }

            self.wait_for_item();
        }
    }

    /// Receives the next item with a timeout.
    ///
    /// # Errors
    ///
    /// Returns `RecvError::Timeout` if no item becomes available within the timeout.
    /// Returns `RecvError::Disconnected` if all producers have dropped.
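    ///
    /// # Example
    ///
    /// A sketch of a bounded-wait consumer loop (`handle` and the 10 ms timeout
    /// are illustrative placeholders):
    ///
    /// ```rust,ignore
    /// loop {
    ///     match consumer.recv_timeout(Duration::from_millis(10)) {
    ///         Ok(item) => handle(item),
    ///         Err(RecvError::Timeout) => continue,   // no data yet; do other work
    ///         Err(RecvError::Disconnected) => break, // all producers dropped
    ///     }
    /// }
    /// ```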
    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvError> {
        let deadline = Instant::now() + timeout;

        loop {
            if let Some(item) = self.poll() {
                return Ok(item);
            }

            if self.is_disconnected() {
                return Err(RecvError::Disconnected);
            }

            if Instant::now() >= deadline {
                return Err(RecvError::Timeout);
            }

            self.wait_for_item_timeout(deadline);
        }
    }

    /// Pops multiple items from the buffer.
    ///
    /// Returns a vector of up to `max_count` items.
    ///
    /// # Performance Warning
    ///
    /// **This method allocates a `Vec` on every call.** Do not use on hot paths
    /// where allocation overhead matters. For zero-allocation consumption, use
    /// [`pop_each`](Self::pop_each) or [`pop_batch_into`](Self::pop_batch_into).
    #[cold]
    #[must_use]
    pub fn pop_batch(&self, max_count: usize) -> Vec<T> {
        let mut items = Vec::with_capacity(max_count.min(self.len()));
        for _ in 0..max_count {
            if let Some(item) = self.poll() {
                items.push(item);
            } else {
                break;
            }
        }
        items
    }

    /// Pops multiple items into a pre-allocated vector (zero-allocation).
    ///
    /// Appends up to `max_count` items to the provided vector.
    /// Returns the number of items added.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let mut buffer = Vec::with_capacity(100);
    /// loop {
    ///     buffer.clear();
    ///     let count = consumer.pop_batch_into(&mut buffer, 100);
    ///     if count == 0 { break; }
    ///     for item in &buffer {
    ///         process(item);
    ///     }
    /// }
    /// ```
    #[inline]
    pub fn pop_batch_into(&self, buffer: &mut Vec<T>, max_count: usize) -> usize {
        let mut count = 0;
        for _ in 0..max_count {
            if let Some(item) = self.poll() {
                buffer.push(item);
                count += 1;
            } else {
                break;
            }
        }
        count
    }

    /// Pops items and calls a callback for each (zero-allocation).
    ///
    /// Returns the number of items processed.
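    ///
    /// # Example
    ///
    /// A sketch that drains up to 64 items per call; the callback's return value
    /// is forwarded to the ring buffer's `pop_each` (the tests below return `true`
    /// to keep consuming):
    ///
    /// ```rust,ignore
    /// let mut sum = 0;
    /// let processed = consumer.pop_each(64, |item| {
    ///     sum += item;
    ///     true
    /// });
    /// ```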
    #[inline]
    pub fn pop_each<F>(&self, max_count: usize, f: F) -> usize
    where
        F: FnMut(T) -> bool,
    {
        self.inner.buffer.pop_each(max_count, f)
    }

    /// Returns true if all producers have dropped and the buffer is empty.
    #[inline]
    #[must_use]
    pub fn is_disconnected(&self) -> bool {
        self.inner.closed.load(Ordering::Acquire) && self.inner.buffer.is_empty()
    }

    /// Returns the number of items in the buffer.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        self.inner.buffer.len()
    }

    /// Returns true if the buffer is empty.
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.inner.buffer.is_empty()
    }

    /// Returns the buffer capacity.
    #[inline]
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.inner.buffer.capacity()
    }

    /// Returns the channel mode.
    #[inline]
    #[must_use]
    pub fn mode(&self) -> ChannelMode {
        self.inner.mode()
    }

    /// Returns statistics for this channel.
    #[must_use]
    pub fn stats(&self) -> ChannelStats {
        self.inner.stats.snapshot()
    }

    #[inline]
    fn poll_spsc(&self) -> Option<T> {
        let item = self.inner.buffer.pop();
        if item.is_some() {
            self.inner.track_pop();
        } else {
            self.inner.track_pop_empty();
        }
        item
    }

    fn poll_mpsc(&self) -> Option<T> {
        // In MPSC mode, producers are serialized by `mpsc_lock`, so items land in
        // the buffer in push order; the consumer pops exactly as in SPSC mode.
        let item = self.inner.buffer.pop();
        if item.is_some() {
            self.inner.track_pop();
        } else {
            self.inner.track_pop_empty();
        }
        item
    }

    fn wait_for_item(&self) {
        match self.inner.config.wait_strategy {
            WaitStrategy::Spin => {
                while self.inner.buffer.is_empty() && !self.is_disconnected() {
                    std::hint::spin_loop();
                }
            }
            WaitStrategy::SpinYield => {
                let mut spins = 0;
                while self.inner.buffer.is_empty() && !self.is_disconnected() {
                    if spins < 100 {
                        std::hint::spin_loop();
                        spins += 1;
                    } else {
                        thread::yield_now();
                        spins = 0;
                    }
                }
            }
            WaitStrategy::Park => {
                while self.inner.buffer.is_empty() && !self.is_disconnected() {
                    thread::park_timeout(Duration::from_micros(10));
                }
            }
        }
    }

    fn wait_for_item_timeout(&self, deadline: Instant) {
        match self.inner.config.wait_strategy {
            WaitStrategy::Spin | WaitStrategy::SpinYield => {
                // Just return - the recv_timeout loop will handle deadline
            }
            WaitStrategy::Park => {
                let remaining = deadline.saturating_duration_since(Instant::now());
                if !remaining.is_zero() {
                    thread::park_timeout(remaining.min(Duration::from_micros(100)));
                }
            }
        }
    }
}

impl<T> Drop for Consumer<T> {
    fn drop(&mut self) {
        // Mark channel as closed when consumer drops
        // This prevents producers from blocking forever
        self.inner.closed.store(true, Ordering::Release);
    }
}

impl<T: std::fmt::Debug> std::fmt::Debug for Consumer<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Consumer")
            .field("mode", &self.mode())
            .field("len", &self.len())
            .field("capacity", &self.capacity())
            .field("is_disconnected", &self.is_disconnected())
            .finish()
    }
}

/// Iterator adapter for Consumer.
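///
/// Each `next()` call blocks in [`Consumer::recv`] and yields `None` once all
/// producers have dropped and the buffer is drained.
///
/// A short sketch (mirrors `test_consumer_iterator` below):
///
/// ```rust,ignore
/// let (producer, mut consumer) = channel::<i32>(16);
/// producer.push_batch(vec![1, 2, 3]);
/// drop(producer); // close the channel so the iterator can finish
///
/// let items: Vec<i32> = consumer.by_ref().collect();
/// assert_eq!(items, vec![1, 2, 3]);
/// ```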
impl<T> Iterator for Consumer<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        self.recv().ok()
    }
}

/// Creates a new channel with the specified buffer size.
///
/// Returns a `(Producer, Consumer)` pair. The channel starts in SPSC mode
/// and automatically upgrades to MPSC when the producer is cloned.
#[must_use]
pub fn channel<T>(buffer_size: usize) -> (Producer<T>, Consumer<T>) {
    channel_with_config(ChannelConfig::with_buffer_size(buffer_size))
}

/// Creates a new channel with custom configuration.
#[must_use]
pub fn channel_with_config<T>(config: ChannelConfig) -> (Producer<T>, Consumer<T>) {
    let inner = Arc::new(ChannelInner::new(config));

    let producer = Producer {
        inner: Arc::clone(&inner),
    };

    let consumer = Consumer { inner };

    (producer, consumer)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_channel() {
        let (producer, consumer) = channel::<i32>(16);

        assert!(producer.try_push(1).is_ok());
        assert!(producer.try_push(2).is_ok());
        assert!(producer.try_push(3).is_ok());

        assert_eq!(consumer.poll(), Some(1));
        assert_eq!(consumer.poll(), Some(2));
        assert_eq!(consumer.poll(), Some(3));
        assert_eq!(consumer.poll(), None);
    }

    #[test]
    fn test_channel_starts_spsc() {
        let (producer, consumer) = channel::<i32>(16);

        assert!(!producer.is_mpsc());
        assert_eq!(producer.mode(), ChannelMode::Spsc);
        assert_eq!(consumer.mode(), ChannelMode::Spsc);
    }

    #[test]
    fn test_clone_upgrades_to_mpsc() {
        let (producer, consumer) = channel::<i32>(16);

        assert!(!producer.is_mpsc());

        let producer2 = producer.clone();

        assert!(producer.is_mpsc());
        assert!(producer2.is_mpsc());
        assert_eq!(consumer.mode(), ChannelMode::Mpsc);
    }

    #[test]
    fn test_spsc_push_pop() {
        let (producer, consumer) = channel::<i32>(8);

        for i in 0..7 {
            assert!(producer.try_push(i).is_ok());
        }

        // Buffer should be full (capacity 8, but one slot reserved)
        assert!(producer.try_push(100).is_err());

        for i in 0..7 {
            assert_eq!(consumer.poll(), Some(i));
        }

        assert_eq!(consumer.poll(), None);
    }

    #[test]
    fn test_mpsc_push_pop() {
        let (producer, consumer) = channel::<i32>(16);

        let producer2 = producer.clone();
        assert!(producer.is_mpsc());

        producer.try_push(1).unwrap();
        producer2.try_push(2).unwrap();
        producer.try_push(3).unwrap();

        // Items should come out in push order
        let mut items = Vec::new();
        while let Some(item) = consumer.poll() {
            items.push(item);
        }

        assert_eq!(items.len(), 3);
        assert!(items.contains(&1));
        assert!(items.contains(&2));
        assert!(items.contains(&3));
    }

    #[test]
    fn test_push_batch() {
        let (producer, consumer) = channel::<i32>(16);

        let count = producer.push_batch(vec![1, 2, 3, 4, 5]);
        assert_eq!(count, 5);

        let items = consumer.pop_batch(10);
        assert_eq!(items, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_pop_each() {
        let (producer, consumer) = channel::<i32>(16);

        producer.push_batch(vec![1, 2, 3, 4, 5]);

        let mut sum = 0;
        let count = consumer.pop_each(10, |item| {
            sum += item;
            true
        });

        assert_eq!(count, 5);
        assert_eq!(sum, 15);
    }

    #[test]
    fn test_recv_timeout() {
        let (producer, consumer) = channel::<i32>(16);

        // Should timeout on empty channel
        let result = consumer.recv_timeout(Duration::from_millis(10));
        assert!(matches!(result, Err(RecvError::Timeout)));

        // Should succeed when item is available
        producer.try_push(42).unwrap();
        let result = consumer.recv_timeout(Duration::from_secs(1));
        assert_eq!(result, Ok(42));
    }

    #[test]
    fn test_disconnected_on_producer_drop() {
        let (producer, consumer) = channel::<i32>(16);

        producer.try_push(1).unwrap();
        drop(producer);

        // Should still get the buffered item
        assert_eq!(consumer.poll(), Some(1));

        // Now should be disconnected
        assert!(consumer.is_disconnected());
        assert!(matches!(consumer.recv(), Err(RecvError::Disconnected)));
    }

    #[test]
    fn test_closed_on_consumer_drop() {
        let (producer, consumer) = channel::<i32>(16);

        drop(consumer);

        // Push should fail with closed error
        assert!(producer.is_closed());
        assert!(matches!(
            producer.push(1),
            Err(StreamingError::ChannelClosed)
        ));
    }

    #[test]
    fn test_backpressure_reject() {
        let config = ChannelConfig::builder()
            .buffer_size(4)
            .backpressure(BackpressureStrategy::Reject)
            .build();

        let (producer, consumer) = channel_with_config::<i32>(config);

        // Fill the buffer (capacity 4 = 3 usable slots)
        assert!(producer.push(1).is_ok());
        assert!(producer.push(2).is_ok());
        assert!(producer.push(3).is_ok());

        // Should reject immediately
        assert!(matches!(producer.push(4), Err(StreamingError::ChannelFull)));

        // Make room
        let _ = consumer.poll();

        // Now should succeed
        assert!(producer.push(4).is_ok());
    }

    #[test]
    fn test_backpressure_drop_oldest() {
        let config = ChannelConfig::builder()
            .buffer_size(4)
            .backpressure(BackpressureStrategy::DropOldest)
            .track_stats(true)
            .build();

        let (producer, _consumer) = channel_with_config::<i32>(config);

        // Fill the buffer
        producer.push(1).unwrap();
        producer.push(2).unwrap();
        producer.push(3).unwrap();

        // This should succeed (drops the new item in our simple impl)
        let result = producer.push(4);
        assert!(result.is_ok());

        // Check stats show a drop
        let stats = producer.stats();
        assert!(stats.items_dropped > 0);
    }

    #[test]
    fn test_stats_tracking() {
        let config = ChannelConfig::builder()
            .buffer_size(16)
            .track_stats(true)
            .build();

        let (producer, consumer) = channel_with_config::<i32>(config);

        producer.push_batch(vec![1, 2, 3, 4, 5]);
        let _ = consumer.pop_batch(3);
        // 2 items remain, poll them
        let _ = consumer.poll();
        let _ = consumer.poll();
        // Buffer is now empty, these polls should track empty
        let _ = consumer.poll();
        let _ = consumer.poll();

        let stats = producer.stats();
        assert_eq!(stats.items_pushed, 5);
        assert_eq!(stats.items_popped, 5); // All 5 items popped (3 + 2)
        assert!(stats.pop_empty >= 2); // At least 2 empty polls
    }

    #[test]
    fn test_concurrent_spsc() {
        const ITEMS: i32 = 10_000;
        let (producer, consumer) = channel::<i32>(1024);

        let producer_handle = thread::spawn(move || {
            for i in 0..ITEMS {
                while producer.try_push(i).is_err() {
                    thread::yield_now();
                }
            }
        });

        let consumer_handle = thread::spawn(move || {
            let mut received = Vec::with_capacity(ITEMS as usize);
            while received.len() < ITEMS as usize {
                if let Some(item) = consumer.poll() {
                    received.push(item);
                } else {
                    thread::yield_now();
                }
            }
            received
        });

        producer_handle.join().unwrap();
        let received = consumer_handle.join().unwrap();

        assert_eq!(received.len(), ITEMS as usize);
        for (i, &item) in received.iter().enumerate() {
            assert_eq!(item, i32::try_from(i).unwrap());
        }
    }

    #[test]
    fn test_concurrent_mpsc() {
        const ITEMS_PER_PRODUCER: i32 = 1000;
        const NUM_PRODUCERS: usize = 4;

        let (producer, consumer) = channel::<i32>(1024);

        let mut handles = Vec::new();

        for id in 0..NUM_PRODUCERS {
            let p = producer.clone();
            handles.push(thread::spawn(move || {
                for i in 0..ITEMS_PER_PRODUCER {
                    let value = i32::try_from(id).unwrap() * ITEMS_PER_PRODUCER + i;
                    while p.try_push(value).is_err() {
                        thread::yield_now();
                    }
                }
            }));
        }

        drop(producer); // Drop original producer

        let consumer_handle = thread::spawn(move || {
            let mut received = Vec::new();
            let expected = NUM_PRODUCERS * ITEMS_PER_PRODUCER as usize;
            while received.len() < expected {
                if let Some(item) = consumer.poll() {
                    received.push(item);
                } else if consumer.is_disconnected() {
                    break;
                } else {
                    thread::yield_now();
                }
            }
            received
        });

        for h in handles {
            h.join().unwrap();
        }

        let received = consumer_handle.join().unwrap();
        assert_eq!(received.len(), NUM_PRODUCERS * ITEMS_PER_PRODUCER as usize);
    }

    #[test]
    fn test_len_and_capacity() {
        let (producer, consumer) = channel::<i32>(16);

        assert_eq!(producer.capacity(), 16);
        assert_eq!(consumer.capacity(), 16);
        assert!(producer.is_empty());
        assert!(consumer.is_empty());

        producer.push_batch(vec![1, 2, 3]);

        assert_eq!(producer.len(), 3);
        assert_eq!(consumer.len(), 3);
        assert!(!producer.is_empty());
    }

    #[test]
    fn test_consumer_iterator() {
        let (producer, mut consumer) = channel::<i32>(16);

        producer.push_batch(vec![1, 2, 3]);
        drop(producer);

        let items: Vec<i32> = consumer.by_ref().collect();
        assert_eq!(items, vec![1, 2, 3]);
    }

    #[test]
    fn test_debug_formatting() {
        let (producer, consumer) = channel::<i32>(16);

        let producer_debug = format!("{producer:?}");
        assert!(producer_debug.contains("Producer"));
        assert!(producer_debug.contains("Spsc"));

        let consumer_debug = format!("{consumer:?}");
        assert!(consumer_debug.contains("Consumer"));
    }
}