// rust_tango/lib.rs

//! A lock-free, high-performance IPC channel inspired by Firedancer's Tango.
//!
//! This crate provides a single-producer single-consumer (SPSC) channel optimized
//! for low-latency, high-throughput message passing. It uses lock-free algorithms
//! with busy-polling for minimal latency.
//!
//! # Features
//!
//! - **Zero-copy reads**: Access message payloads directly without allocation
//! - **Lock-free**: No mutexes, just atomic operations with careful memory ordering
//! - **Backpressure**: Optional credit-based flow control via [`Fctl`]
//! - **Overrun detection**: Consumers detect when they've been lapped by producers
//! - **Metrics**: Built-in observability with [`Metrics`]
//! - **`no_std` support**: Works in embedded/kernel environments (disable `std` feature)
//!
//! # Architecture
//!
//! - [`MCache`]: Ring buffer of fragment metadata with sequence-based validation
//! - [`DCache`]: Fixed-size chunk storage for payloads
//! - [`Producer`] / [`Consumer`]: Ergonomic publish and consume APIs
//! - [`Fctl`]: Credit counter for backpressure
//! - [`Fseq`]: Shared sequence counter
//!
//! # Quick Start
//!
//! ```
//! use rust_tango::{Consumer, DCache, Fseq, MCache, Producer};
//!
//! // Create the channel components
//! let mcache = MCache::<64>::new();      // 64-slot metadata ring buffer
//! let dcache = DCache::<64, 256>::new(); // 64 chunks of 256 bytes each
//! let fseq = Fseq::new(1);               // Sequence counter starting at 1
//!
//! let producer = Producer::new(&mcache, &dcache, &fseq);
//! let mut consumer = Consumer::new(&mcache, &dcache, 1);
//!
//! // Publish a message
//! producer.publish(b"hello", 0, 0, 0).unwrap();
//!
//! // Consume it (zero-copy)
//! if let Ok(Some(fragment)) = consumer.poll() {
//!     assert_eq!(fragment.payload.as_slice(), b"hello");
//! }
//! ```
//!
//! # With Flow Control
//!
//! Use [`Fctl`] to prevent the producer from overwriting unconsumed messages:
//!
//! ```
//! use rust_tango::{Consumer, DCache, Fctl, Fseq, MCache, Producer};
//!
//! let mcache = MCache::<64>::new();
//! let dcache = DCache::<64, 256>::new();
//! let fseq = Fseq::new(1);
//! let fctl = Fctl::new(64); // 64 credits = buffer capacity
//!
//! let producer = Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl);
//! let mut consumer = Consumer::with_flow_control(&mcache, &dcache, &fctl, 1);
//!
//! // publish() fails fast with a NoCredits error when the buffer is full
//! // (publish_blocking() spins instead); the consumer automatically
//! // releases credits after consuming.
//! ```
//!
//! # With Metrics
//!
//! Track throughput, lag, and errors:
//!
//! ```
//! use rust_tango::{Consumer, DCache, Fseq, MCache, Metrics, Producer};
//!
//! let mcache = MCache::<64>::new();
//! let dcache = DCache::<64, 256>::new();
//! let fseq = Fseq::new(1);
//! let metrics = Metrics::new();
//!
//! let producer = Producer::new(&mcache, &dcache, &fseq)
//!     .with_metrics(&metrics);
//! let mut consumer = Consumer::new(&mcache, &dcache, 1)
//!     .with_metrics(&metrics);
//!
//! // ... publish and consume ...
//! # producer.publish(b"test", 0, 0, 0).unwrap();
//! # consumer.poll().unwrap();
//!
//! let snapshot = metrics.snapshot();
//! println!("Lag: {} messages", snapshot.lag());
//! ```
//!
//! # Performance Characteristics
//!
//! Benchmarked on Apple M3 Pro (1000 samples, 10K messages, 64-byte payload):
//!
//! | Scenario | Tango | std | crossbeam | ringbuf |
//! |----------|-------|-----|-----------|---------|
//! | SPSC throughput | **26.6M msg/s** | 9.1M msg/s | 7.3M msg/s | 10.7M msg/s |
//! | Ping-pong latency (100 trips) | **90 µs** | 380 µs | 105 µs | 100 µs |
//! | Large payload (1KB) | **12.4 GiB/s** | - | 6.9 GiB/s | 9.4 GiB/s |
//!
//! Best suited for:
//! - Single-producer single-consumer scenarios
//! - Latency-sensitive applications
//! - High-throughput message passing
//! - When you can dedicate a core to busy-polling
//!
//! # Memory Ordering
//!
//! The lock-free protocol:
//! 1. Producer writes payload to [`DCache`]
//! 2. Producer writes metadata to [`MCache`] slot
//! 3. Producer stores sequence number with `Release` ordering
//! 4. Consumer loads sequence with `Acquire`, reads metadata, re-checks sequence
//!
//! This double-read validation detects overwrites without locks.
116#![cfg_attr(not(feature = "std"), no_std)]
117
118#[cfg(feature = "std")]
119extern crate std;
120
121// Conditional imports for loom testing vs normal operation
122#[cfg(loom)]
123use loom::cell::UnsafeCell;
124#[cfg(loom)]
125use loom::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
126
127#[cfg(not(loom))]
128use core::cell::UnsafeCell;
129#[cfg(not(loom))]
130use core::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
131
132use core::fmt;
133use core::mem::size_of;
134
/// Per-fragment descriptor stored in the [`MCache`] ring.
///
/// `#[repr(C, align(16))]` and exactly 32 bytes, verified by the
/// compile-time assertion below, so two entries pack evenly per cache line.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[repr(C, align(16))]
pub struct FragmentMetadata {
    /// Monotonic sequence number used for ordering and overwrite detection.
    pub seq: u64,
    /// Signature or tag used for identification or filtering.
    pub sig: u64,
    /// Index into the data cache for the payload bytes.
    pub chunk: u32,
    /// Payload size in bytes.
    pub size: u32,
    /// Control bits for application-specific signaling.
    pub ctl: u16,
    /// Reserved bits for future expansion.
    pub reserved: u16,
    /// Timestamp or timing metadata.
    pub ts: u32,
}

// Compile-time guarantee that the layout stays exactly 32 bytes.
const _: () = {
    assert!(size_of::<FragmentMetadata>() == 32);
};

impl fmt::Display for FragmentMetadata {
    /// One-line rendering of the user-visible fields; `reserved` is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { seq, sig, chunk, size, ctl, ts, .. } = *self;
        write!(
            f,
            "Fragment {{ seq={seq}, sig={sig:#x}, chunk={chunk}, size={size}, ctl={ctl}, ts={ts} }}"
        )
    }
}
167
/// Errors that can occur during tango operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use = "this error should be handled"]
pub enum TangoError {
    /// The DCache has no more chunks available (only without flow control).
    DcacheFull,
    /// The chunk index is out of range.
    ChunkOutOfRange(u32),
    /// Consumer was too slow and was lapped by the producer.
    Overrun,
    /// No credits available (flow control backpressure).
    NoCredits,
}

impl fmt::Display for TangoError {
    /// Human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            Self::ChunkOutOfRange(idx) => write!(f, "chunk index {idx} out of range"),
            Self::DcacheFull => f.write_str("dcache is out of capacity"),
            Self::Overrun => f.write_str("consumer overrun: producer lapped the consumer"),
            Self::NoCredits => f.write_str("no credits available for backpressure"),
        }
    }
}

#[cfg(feature = "std")]
impl std::error::Error for TangoError {}
195
/// Result of attempting to read from the MCache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use = "this `ReadResult` may contain data that should be handled"]
pub enum ReadResult<T> {
    /// Successfully read the data.
    Ok(T),
    /// The sequence number has not been published yet.
    NotReady,
    /// The consumer was too slow and the slot was overwritten.
    Overrun,
}

impl<T> ReadResult<T> {
    /// Returns `true` if the result is `Ok`.
    #[inline]
    pub fn is_ok(&self) -> bool {
        matches!(self, Self::Ok(_))
    }

    /// Returns `true` if the result is `NotReady`.
    #[inline]
    pub fn is_not_ready(&self) -> bool {
        matches!(self, Self::NotReady)
    }

    /// Returns `true` if the result is `Overrun`.
    #[inline]
    pub fn is_overrun(&self) -> bool {
        matches!(self, Self::Overrun)
    }

    /// Converts to `Option<T>`, discarding the error variant.
    #[inline]
    pub fn ok(self) -> Option<T> {
        if let Self::Ok(value) = self {
            Some(value)
        } else {
            None
        }
    }
}

impl<T: fmt::Debug> fmt::Display for ReadResult<T> {
    /// Variant name, with the payload shown via `Debug` for `Ok`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Ok(value) => write!(f, "Ok({value:?})"),
            Self::NotReady => f.write_str("NotReady"),
            Self::Overrun => f.write_str("Overrun"),
        }
    }
}
246
/// Metrics for observability and monitoring.
///
/// All counters are updated with atomic increments and can be read from any
/// thread. `snapshot()` reads each counter individually, so the result is a
/// near-consistent view rather than a single atomic snapshot.
#[derive(Debug)]
pub struct Metrics {
    /// Total messages published.
    published: AtomicU64,
    /// Total messages consumed.
    consumed: AtomicU64,
    /// Total overruns detected (consumer was lapped).
    overruns: AtomicU64,
    /// Total times producer was blocked due to no credits.
    backpressure_events: AtomicU64,
}
262
/// A point-in-time snapshot of metrics.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[must_use = "this snapshot contains metrics data that should be used"]
pub struct MetricsSnapshot {
    /// Total messages published.
    pub published: u64,
    /// Total messages consumed.
    pub consumed: u64,
    /// Total overruns detected.
    pub overruns: u64,
    /// Total backpressure events.
    pub backpressure_events: u64,
}

impl MetricsSnapshot {
    /// Consumer lag: `published - consumed`, clamped at zero so a torn
    /// snapshot (consumed observed ahead of published) never underflows.
    #[inline]
    pub fn lag(&self) -> u64 {
        self.published.saturating_sub(self.consumed)
    }
}

impl fmt::Display for MetricsSnapshot {
    /// Single-line `key=value` summary including the derived lag.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let lag = self.lag();
        write!(
            f,
            "published={}, consumed={}, lag={lag}, overruns={}, backpressure={}",
            self.published, self.consumed, self.overruns, self.backpressure_events
        )
    }
}
298
impl Default for Metrics {
    /// Same as [`Metrics::new`]: all counters start at zero.
    fn default() -> Self {
        Self::new()
    }
}

impl Metrics {
    /// Create a new metrics instance with all counters at zero.
    pub fn new() -> Self {
        Self {
            published: AtomicU64::new(0),
            consumed: AtomicU64::new(0),
            overruns: AtomicU64::new(0),
            backpressure_events: AtomicU64::new(0),
        }
    }

    /// Record a published message.
    ///
    /// Relaxed ordering: the counter is independent and monotonic, so no
    /// cross-thread ordering is required for a plain increment.
    #[inline]
    pub fn record_publish(&self) {
        self.published.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a consumed message.
    #[inline]
    pub fn record_consume(&self) {
        self.consumed.fetch_add(1, Ordering::Relaxed);
    }

    /// Record an overrun event.
    #[inline]
    pub fn record_overrun(&self) {
        self.overruns.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a backpressure event (producer blocked on credits).
    #[inline]
    pub fn record_backpressure(&self) {
        self.backpressure_events.fetch_add(1, Ordering::Relaxed);
    }

    /// Get a snapshot of all metrics.
    ///
    /// Each counter is loaded separately, so under concurrent updates the
    /// four values can be mutually inconsistent by a few events.
    /// NOTE(review): the increments use `Relaxed`, so these `Acquire` loads
    /// do not make the snapshot any more consistent than `Relaxed` loads
    /// would; they are harmless but not a consistency guarantee.
    pub fn snapshot(&self) -> MetricsSnapshot {
        // Use Acquire to ensure we see all prior increments
        MetricsSnapshot {
            published: self.published.load(Ordering::Acquire),
            consumed: self.consumed.load(Ordering::Acquire),
            overruns: self.overruns.load(Ordering::Acquire),
            backpressure_events: self.backpressure_events.load(Ordering::Acquire),
        }
    }

    /// Reset all counters to zero.
    ///
    /// Not atomic across counters: a concurrent reader may observe a
    /// partially reset state.
    pub fn reset(&self) {
        self.published.store(0, Ordering::Release);
        self.consumed.store(0, Ordering::Release);
        self.overruns.store(0, Ordering::Release);
        self.backpressure_events.store(0, Ordering::Release);
    }
}
359
/// Command-and-control state for coordinating threads.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum CncState {
    Boot = 0,
    Run = 1,
    Halt = 2,
}

impl CncState {
    /// Decode a raw byte; any unknown value is treated as `Halt` (fail-safe).
    fn from_u8(value: u8) -> Self {
        match value {
            0 => Self::Boot,
            1 => Self::Run,
            _ => Self::Halt,
        }
    }
}

impl fmt::Display for CncState {
    /// Writes the bare variant name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Boot => "Boot",
            Self::Run => "Run",
            Self::Halt => "Halt",
        };
        f.write_str(name)
    }
}

/// Shared command-and-control cell holding the current [`CncState`].
#[derive(Debug)]
pub struct Cnc {
    state: AtomicU8,
}

impl Default for Cnc {
    /// Same as [`Cnc::new`]: starts in the `Boot` state.
    fn default() -> Self {
        Self::new()
    }
}

impl Cnc {
    /// Create a control cell in the `Boot` state.
    pub fn new() -> Self {
        Self {
            state: AtomicU8::new(CncState::Boot as u8),
        }
    }

    /// Load the current state; `Acquire` pairs with `set_state`'s `Release`.
    pub fn state(&self) -> CncState {
        CncState::from_u8(self.state.load(Ordering::Acquire))
    }

    /// Publish a new state with `Release` ordering.
    pub fn set_state(&self, state: CncState) {
        self.state.store(state as u8, Ordering::Release);
    }
}
414
/// Shared sequence counter.
///
/// Hands out strictly increasing `u64` sequence numbers and is safe to
/// share across threads by reference.
#[derive(Debug)]
pub struct Fseq {
    next: AtomicU64,
}

impl Fseq {
    /// Create a counter; the first call to [`Fseq::next`] returns `initial`.
    pub fn new(initial: u64) -> Self {
        let next = AtomicU64::new(initial);
        Self { next }
    }

    /// Claim the next sequence number (post-increment: returns the value
    /// held before the bump).
    pub fn next(&self) -> u64 {
        self.next.fetch_add(1, Ordering::AcqRel)
    }

    /// Observe the value the next `next()` call would return, without
    /// claiming it.
    pub fn current(&self) -> u64 {
        self.next.load(Ordering::Acquire)
    }
}
435
/// Credit-based flow-control counter shared between producer and consumer.
///
/// The producer spends credits via [`Fctl::acquire`] before allocating a
/// chunk; the consumer returns them via [`Fctl::release`] when done.
#[derive(Debug)]
pub struct Fctl {
    /// Currently available credits.
    credits: AtomicU64,
}

impl Fctl {
    /// Create a pool with `initial` credits (typically the chunk count).
    pub fn new(initial: u64) -> Self {
        Self {
            credits: AtomicU64::new(initial),
        }
    }

    /// Atomically take `amount` credits.
    ///
    /// Returns `false` (taking nothing) if fewer than `amount` credits are
    /// available. Lock-free; replaces the previous hand-rolled CAS loop with
    /// the equivalent `fetch_update` retry loop from the standard library.
    #[must_use = "returns whether the credits were successfully acquired"]
    pub fn acquire(&self, amount: u64) -> bool {
        // `checked_sub` is `None` exactly when credits < amount, which makes
        // `fetch_update` leave the counter untouched and return `Err`.
        self.credits
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |credits| {
                credits.checked_sub(amount)
            })
            .is_ok()
    }

    /// Return `amount` credits to the pool.
    pub fn release(&self, amount: u64) {
        self.credits.fetch_add(amount, Ordering::AcqRel);
    }

    /// Current number of available credits (racy by nature; advisory only).
    pub fn available(&self) -> u64 {
        self.credits.load(Ordering::Acquire)
    }
}
475
/// `true` iff `value` is a power of two (zero is not).
///
/// Kept as a named `const fn` so the compile-time capacity assertions read
/// clearly; delegates to the standard library instead of re-deriving the
/// `value & (value - 1)` bit trick.
const fn is_power_of_two(value: usize) -> bool {
    value.is_power_of_two()
}

/// Lock-free tag cache using a hashed bitset.
///
/// `check_and_insert` hashes a tag to one of `WORDS * 64` bits. Distinct
/// tags can collide on the same bit, so this is a probabilistic
/// "seen before" filter, not an exact set.
#[derive(Debug)]
pub struct Tcache<const WORDS: usize> {
    /// The bitset, one atomic word per 64 bits.
    bits: [AtomicU64; WORDS],
    /// `BIT_COUNT - 1`, used to reduce the hash to a bit index.
    mask: u64,
}

impl<const WORDS: usize> Default for Tcache<WORDS> {
    /// Same as [`Tcache::new`]: an empty bitset.
    fn default() -> Self {
        Self::new()
    }
}

impl<const WORDS: usize> Tcache<WORDS> {
    /// Total number of bits in the set.
    const BIT_COUNT: usize = WORDS * 64;
    /// Compile-time check that the bit count is a power of two so `mask`
    /// acts as a correct modulo.
    const ASSERT_POWER_OF_TWO: () = assert!(is_power_of_two(Self::BIT_COUNT));

    /// Create a tag cache backed by a power-of-two number of bits.
    pub fn new() -> Self {
        // Referencing the const forces the assertion at compile time.
        let () = Self::ASSERT_POWER_OF_TWO;
        Self {
            bits: core::array::from_fn(|_| AtomicU64::new(0)),
            mask: (Self::BIT_COUNT - 1) as u64,
        }
    }

    /// Atomically test-and-set the bit for `tag`.
    ///
    /// Returns `true` if the bit was newly set (tag not seen before, modulo
    /// hash collisions); `false` if it was already set.
    pub fn check_and_insert(&self, tag: u64) -> bool {
        // Fibonacci hashing: the golden-ratio multiplier spreads nearby
        // tags across the bit range before masking.
        let bit = tag.wrapping_mul(0x9E37_79B9_7F4A_7C15) & self.mask;
        let word_idx = (bit / 64) as usize;
        let bit_mask = 1u64 << (bit % 64);
        let prev = self.bits[word_idx].fetch_or(bit_mask, Ordering::AcqRel);
        (prev & bit_mask) == 0
    }

    /// Number of bits currently set (≤ number of distinct tags inserted).
    pub fn len(&self) -> usize {
        self.bits
            .iter()
            .map(|word| word.load(Ordering::Acquire).count_ones() as usize)
            .sum()
    }

    /// `true` if no bit is set.
    pub fn is_empty(&self) -> bool {
        self.bits
            .iter()
            .all(|word| word.load(Ordering::Acquire) == 0)
    }
}
527
/// Cache line size for padding to prevent false sharing.
const CACHE_LINE_SIZE: usize = 64;

/// A single entry in the MCache ring buffer.
///
/// Layout (`#[repr(C, align(64))]`) is carefully designed to prevent false
/// sharing:
/// - `seq` plus `_pad` fills the first cache line (the hot word both sides poll)
/// - `meta` begins at byte 64, i.e. on the next cache line
#[repr(C, align(64))]
struct MCacheEntry {
    /// Sequence number - atomically updated by producer, read by consumer.
    seq: AtomicU64,
    /// Padding so `meta` starts on its own cache line
    /// (CACHE_LINE_SIZE - size_of::<AtomicU64>() = 56 bytes).
    _pad: [u8; CACHE_LINE_SIZE - size_of::<AtomicU64>()],
    /// Fragment metadata - written by producer before the `seq` store.
    meta: UnsafeCell<FragmentMetadata>,
}

impl fmt::Debug for MCacheEntry {
    /// `meta` is intentionally omitted: reading it outside the sequence
    /// protocol would race with the producer.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("MCacheEntry")
            .field("seq", &self.seq.load(Ordering::Relaxed))
            .finish_non_exhaustive()
    }
}

// SAFETY: MCacheEntry is Sync because:
// - `seq` is atomic and provides synchronization
// - `meta` is only written before `seq` is updated (Release) and read after
//   `seq` is loaded (Acquire), establishing a happens-before relationship.
//   A read that races a lapping overwrite is detected and discarded by the
//   double-read of `seq` in `MCache::try_read`.
unsafe impl Sync for MCacheEntry {}

impl MCacheEntry {
    /// Entry with sequence 0 (meaning "never published") and zeroed metadata.
    fn new() -> Self {
        Self {
            seq: AtomicU64::new(0),
            _pad: [0u8; CACHE_LINE_SIZE - size_of::<AtomicU64>()],
            meta: UnsafeCell::new(FragmentMetadata::default()),
        }
    }
}
569
/// Ring buffer of fragment metadata, indexed by sequence number.
///
/// A fragment with sequence `s` occupies slot `s & (DEPTH - 1)`, so the
/// producer reuses (overwrites) each slot every `DEPTH` publishes; a
/// consumer that falls more than `DEPTH` behind observes an overrun.
#[derive(Debug)]
pub struct MCache<const DEPTH: usize> {
    /// `DEPTH - 1`, used to map sequence numbers to slot indices.
    mask: u64,
    /// The metadata slots.
    entries: [MCacheEntry; DEPTH],
    /// Cooperative shutdown flag; cleared by `stop()`.
    running: AtomicBool,
}

impl<const DEPTH: usize> Default for MCache<DEPTH> {
    /// Same as [`MCache::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<const DEPTH: usize> MCache<DEPTH> {
    /// Compile-time proof that DEPTH is a power of two, required for the
    /// `& mask` indexing to be a correct modulo.
    const ASSERT_POWER_OF_TWO: () = assert!(is_power_of_two(DEPTH));

    /// Create a ring buffer with a power-of-two number of slots.
    ///
    /// All slots start with sequence 0, i.e. "never published".
    pub fn new() -> Self {
        // Referencing the associated const forces the assertion to be
        // evaluated at compile time for this DEPTH.
        let () = Self::ASSERT_POWER_OF_TWO;
        Self {
            mask: (DEPTH - 1) as u64,
            entries: core::array::from_fn(|_| MCacheEntry::new()),
            running: AtomicBool::new(true),
        }
    }

    /// Request shutdown: spinners polling `is_running` (e.g. `wait`) will
    /// stop after observing this.
    pub fn stop(&self) {
        self.running.store(false, Ordering::Release);
    }

    /// `true` until `stop()` has been called.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::Acquire)
    }

    /// Publish a fragment's metadata into the ring.
    ///
    /// Must only be called by the single producer; the safety argument
    /// below depends on exclusive write access.
    pub fn publish(&self, meta: FragmentMetadata) {
        let idx = (meta.seq & self.mask) as usize;
        let entry = &self.entries[idx];
        // SAFETY: This write is safe because:
        // 1. We have exclusive write access as the single producer
        // 2. The subsequent Release store on `seq` ensures this write is visible
        //    to any consumer that loads `seq` with Acquire ordering
        // 3. The mask ensures idx is always within bounds
        unsafe {
            *entry.meta.get() = meta;
        }
        // Release: pairs with the Acquire load in `try_read`, ordering the
        // metadata write above before the new sequence becomes observable.
        entry.seq.store(meta.seq, Ordering::Release);
    }

    /// Busy-wait for a specific sequence number to appear.
    ///
    /// Returns:
    /// - `ReadResult::Ok(meta)` if the sequence was successfully read
    /// - `ReadResult::Overrun` if the consumer was lapped by the producer
    /// - `ReadResult::NotReady` if the mcache was stopped before the sequence appeared
    pub fn wait(&self, seq: u64) -> ReadResult<FragmentMetadata> {
        while self.is_running() {
            match self.try_read(seq) {
                ReadResult::Ok(meta) => return ReadResult::Ok(meta),
                ReadResult::Overrun => return ReadResult::Overrun,
                // Not published yet: hint the CPU we are spinning, retry.
                ReadResult::NotReady => core::hint::spin_loop(),
            }
        }
        ReadResult::NotReady
    }

    /// Attempt a lock-free read of the metadata at a specific sequence.
    ///
    /// Returns:
    /// - `ReadResult::Ok(meta)` if the sequence was successfully read
    /// - `ReadResult::NotReady` if the sequence has not been published yet
    /// - `ReadResult::Overrun` if the consumer was lapped by the producer
    pub fn try_read(&self, seq: u64) -> ReadResult<FragmentMetadata> {
        let idx = (seq & self.mask) as usize;
        let entry = &self.entries[idx];

        let seq_before = entry.seq.load(Ordering::Acquire);

        // Not published yet
        if seq_before < seq {
            return ReadResult::NotReady;
        }

        // Slot has been overwritten - consumer was too slow
        if seq_before > seq {
            return ReadResult::Overrun;
        }

        // seq_before == seq: attempt to read
        // SAFETY: This read is safe because:
        // 1. The Acquire load above synchronizes with the Release store in publish(),
        //    establishing a happens-before relationship that ensures the metadata
        //    write is complete and visible
        // 2. We verify seq hasn't changed after reading (double-read validation)
        //    to detect concurrent overwrites
        // 3. FragmentMetadata is Copy, so we get a snapshot that won't be affected
        //    by subsequent writes
        // NOTE(review): under the strict Rust memory model, a producer lapping
        // us while this non-atomic read is in flight is a data race even
        // though the torn value is discarded below. The loom cfg at the top
        // of the file suggests this is exercised under loom -- confirm, and
        // consider miri as well.
        let meta = unsafe { *entry.meta.get() };

        // Double-check the sequence hasn't changed during our read
        let seq_after = entry.seq.load(Ordering::Acquire);
        if seq_before == seq_after {
            ReadResult::Ok(meta)
        } else {
            // Producer overwrote while we were reading
            ReadResult::Overrun
        }
    }
}
679
/// A single chunk in the DCache.
///
/// Cache-line aligned (`align(64)`) so adjacent chunks never share a line.
#[repr(C, align(64))]
struct DcacheChunk<const CHUNK_SIZE: usize> {
    /// Raw payload bytes; interior mutability because the producer writes
    /// through a shared reference.
    data: UnsafeCell<[u8; CHUNK_SIZE]>,
}

impl<const CHUNK_SIZE: usize> fmt::Debug for DcacheChunk<CHUNK_SIZE> {
    /// The bytes are intentionally omitted: reading `data` outside the
    /// sequence protocol would race with the producer.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DcacheChunk")
            .field("size", &CHUNK_SIZE)
            .finish_non_exhaustive()
    }
}

impl<const CHUNK_SIZE: usize> DcacheChunk<CHUNK_SIZE> {
    /// Zero-initialized chunk.
    fn new() -> Self {
        Self {
            data: UnsafeCell::new([0u8; CHUNK_SIZE]),
        }
    }
}

// SAFETY: DcacheChunk is Sync because access is synchronized through the
// MCache sequence protocol - writes happen before sequence update (Release),
// reads happen after sequence load (Acquire).
unsafe impl<const CHUNK_SIZE: usize> Sync for DcacheChunk<CHUNK_SIZE> {}
706
/// Zero-copy, read-only view of one payload.
///
/// `Copy` because it is just a reference plus a length; the `'a` borrow
/// keeps the backing chunk storage alive while the view exists.
#[derive(Debug, Clone, Copy)]
pub struct DcacheView<'a, const CHUNK_SIZE: usize> {
    /// Chunk holding the payload bytes.
    chunk: &'a DcacheChunk<CHUNK_SIZE>,
    /// Number of valid bytes; clamped to `CHUNK_SIZE` at construction.
    size: usize,
}

impl<'a, const CHUNK_SIZE: usize> DcacheView<'a, CHUNK_SIZE> {
    /// Zero-copy access to the payload bytes.
    #[inline]
    pub fn as_slice(&self) -> &'a [u8] {
        // SAFETY: This read is safe because:
        // 1. DcacheView can only be obtained through Consumer::poll() or
        //    Consumer::wait(), which validate via the MCache sequence protocol
        // 2. The Acquire load in MCache::try_read synchronizes with the Release
        //    store in MCache::publish, ensuring the payload write in
        //    DCache::write_chunk is fully visible before we can read it
        // 3. The lifetime 'a is tied to the DCache, ensuring the data remains
        //    valid for the duration of the borrow
        // 4. self.size is validated to be <= CHUNK_SIZE when the view is created
        // NOTE(review): point 1 is not airtight -- `DCache::read_chunk` is
        // public and also hands out views, so safe code could read a chunk
        // the producer is concurrently overwriting. Consider making
        // `read_chunk` crate-private or `unsafe`.
        let data = unsafe { &*self.chunk.data.get() };
        &data[..self.size]
    }

    /// Copy the payload into a new Vec.
    ///
    /// Prefer `as_slice()` for zero-copy access when possible.
    ///
    /// Only available with the `std` feature.
    #[cfg(feature = "std")]
    pub fn read(&self) -> std::vec::Vec<u8> {
        self.as_slice().to_vec()
    }

    /// Access the payload through a closure.
    ///
    /// Prefer `as_slice()` for direct zero-copy access.
    pub fn with_reader<T>(&self, f: impl FnOnce(&[u8]) -> T) -> T {
        f(self.as_slice())
    }

    /// Returns the size of the payload in bytes.
    #[inline]
    pub fn len(&self) -> usize {
        self.size
    }

    /// Returns true if the payload is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.size == 0
    }
}
759
/// Fixed-size payload storage: `CHUNK_COUNT` chunks of `CHUNK_SIZE` bytes.
///
/// Chunks are handed out ring-buffer style by `allocate()`, so without flow
/// control a chunk can be reused (overwritten) while a consumer still holds
/// a view of it.
#[derive(Debug)]
pub struct DCache<const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> {
    /// The payload chunks.
    chunks: [DcacheChunk<CHUNK_SIZE>; CHUNK_COUNT],
    /// Monotonic allocation counter; masked to produce chunk indices.
    next: AtomicU64,
    /// `CHUNK_COUNT - 1`, for ring-buffer masking.
    mask: u64,
}

const _: () = {
    // CHUNK_COUNT must be a power of two for ring buffer masking.
    // The actual check is the compile-time ASSERT_POWER_OF_TWO constant
    // forced in new() (same pattern as MCache); this block is intentionally
    // empty and kept only as a signpost.
};

impl<const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Default
    for DCache<CHUNK_COUNT, CHUNK_SIZE>
{
    /// Same as [`DCache::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> DCache<CHUNK_COUNT, CHUNK_SIZE> {
    /// Compile-time check that CHUNK_COUNT is a power of two.
    const ASSERT_POWER_OF_TWO: () = assert!(is_power_of_two(CHUNK_COUNT));

    /// Create a fixed-size cache of payload chunks.
    ///
    /// CHUNK_COUNT must be a power of two.
    pub fn new() -> Self {
        // Referencing the const forces the assertion at compile time.
        let () = Self::ASSERT_POWER_OF_TWO;
        Self {
            chunks: core::array::from_fn(|_| DcacheChunk::new()),
            next: AtomicU64::new(0),
            mask: (CHUNK_COUNT - 1) as u64,
        }
    }

    /// Allocate a chunk index for a new payload (ring buffer style).
    ///
    /// This wraps around, so callers must use flow control (Fctl) to ensure
    /// chunks are not overwritten before consumers are done with them.
    pub fn allocate(&self) -> u32 {
        let seq = self.next.fetch_add(1, Ordering::AcqRel);
        // Masking guarantees the returned index is always in range.
        (seq & self.mask) as u32
    }

    /// Returns the number of chunks in the cache.
    pub fn capacity(&self) -> usize {
        CHUNK_COUNT
    }

    /// Write payload bytes into a chunk, truncating to the chunk size.
    ///
    /// Returns the number of bytes actually stored
    /// (`min(payload.len(), CHUNK_SIZE)`), or `ChunkOutOfRange` if `chunk`
    /// is not a valid index.
    pub fn write_chunk(&self, chunk: u32, payload: &[u8]) -> Result<usize, TangoError> {
        let idx = chunk as usize;
        let Some(target) = self.chunks.get(idx) else {
            return Err(TangoError::ChunkOutOfRange(chunk));
        };
        let size = payload.len().min(CHUNK_SIZE);
        // SAFETY: This write is safe because:
        // 1. We have exclusive write access as the single producer
        // 2. The chunk index is bounds-checked via .get() above
        // 3. The subsequent MCache::publish() with Release ordering ensures
        //    this write is visible to consumers before they can read it
        // 4. Consumers only read after validating the sequence number
        unsafe {
            let data = &mut *target.data.get();
            data[..size].copy_from_slice(&payload[..size]);
        }
        Ok(size)
    }

    /// Read a view of a chunk with the provided size limit.
    ///
    /// `size` is clamped to `CHUNK_SIZE`.
    ///
    /// NOTE(review): this is a public, safe entry point that bypasses the
    /// MCache sequence validation; a view obtained here can race with a
    /// concurrent `write_chunk`. Consider restricting its visibility.
    pub fn read_chunk(
        &self,
        chunk: u32,
        size: usize,
    ) -> Result<DcacheView<'_, CHUNK_SIZE>, TangoError> {
        let idx = chunk as usize;
        let Some(target) = self.chunks.get(idx) else {
            return Err(TangoError::ChunkOutOfRange(chunk));
        };
        Ok(DcacheView {
            chunk: target,
            size: size.min(CHUNK_SIZE),
        })
    }

    /// Return the configured chunk size in bytes.
    pub fn chunk_size(&self) -> usize {
        CHUNK_SIZE
    }
}
850
/// Publishing half of the channel.
///
/// Holds only shared references and is `Copy`; note the channel protocol
/// itself is single-producer (see the SAFETY notes in `MCache::publish`),
/// so only one copy should publish at any given time.
#[derive(Clone, Copy)]
pub struct Producer<
    'a,
    const MCACHE_DEPTH: usize,
    const CHUNK_COUNT: usize,
    const CHUNK_SIZE: usize,
> {
    /// Metadata ring to publish into.
    mcache: &'a MCache<MCACHE_DEPTH>,
    /// Payload chunk storage.
    dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
    /// Source of monotonically increasing sequence numbers.
    fseq: &'a Fseq,
    /// Optional credit pool; `None` disables backpressure.
    fctl: Option<&'a Fctl>,
    /// Optional metrics sink.
    metrics: Option<&'a Metrics>,
}

impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> fmt::Debug
    for Producer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
{
    /// Hand-written to summarize the configuration instead of dumping the
    /// referenced caches.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Producer")
            .field("has_flow_control", &self.fctl.is_some())
            .field("has_metrics", &self.metrics.is_some())
            .finish()
    }
}
875
876impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize>
877    Producer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
878{
879    /// Create a producer for a shared mcache/dcache pair.
880    ///
881    /// Without flow control, this producer will freely overwrite chunks.
882    /// Use `with_flow_control` for backpressure.
883    pub fn new(
884        mcache: &'a MCache<MCACHE_DEPTH>,
885        dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
886        fseq: &'a Fseq,
887    ) -> Self {
888        Self {
889            mcache,
890            dcache,
891            fseq,
892            fctl: None,
893            metrics: None,
894        }
895    }
896
897    /// Create a producer with credit-based flow control.
898    ///
899    /// The producer will acquire a credit before allocating a chunk.
900    /// Initialize `fctl` with `CHUNK_COUNT` credits.
901    pub fn with_flow_control(
902        mcache: &'a MCache<MCACHE_DEPTH>,
903        dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
904        fseq: &'a Fseq,
905        fctl: &'a Fctl,
906    ) -> Self {
907        Self {
908            mcache,
909            dcache,
910            fseq,
911            fctl: Some(fctl),
912            metrics: None,
913        }
914    }
915
916    /// Attach metrics tracking to this producer.
917    ///
918    /// Returns a new producer with the same configuration plus metrics.
919    pub fn with_metrics(mut self, metrics: &'a Metrics) -> Self {
920        self.metrics = Some(metrics);
921        self
922    }
923
    /// Publish a payload fragment and its metadata.
    ///
    /// The payload is silently truncated to the dcache chunk size; the
    /// returned metadata's `size` field reflects the bytes actually stored.
    ///
    /// If flow control is enabled, returns `TangoError::NoCredits` when
    /// no credits are available (consumer hasn't caught up).
    #[must_use = "publishing may fail; check the result"]
    pub fn publish(
        &self,
        payload: &[u8],
        sig: u64,
        ctl: u16,
        ts: u32,
    ) -> Result<FragmentMetadata, TangoError> {
        // Acquire credit if flow control is enabled
        if let Some(fctl) = self.fctl {
            if !fctl.acquire(1) {
                if let Some(metrics) = self.metrics {
                    metrics.record_backpressure();
                }
                return Err(TangoError::NoCredits);
            }
        }

        // Order matters: claim the sequence, claim a chunk, write the
        // payload, then publish metadata last so a consumer can never
        // observe a sequence whose chunk has not been written.
        let seq = self.fseq.next();
        let chunk = self.dcache.allocate();
        // NOTE(review): if write_chunk failed here, the acquired credit and
        // sequence number would leak; today it cannot fail because
        // allocate() masks the index into range -- revisit if allocate()
        // ever changes.
        let size = self.dcache.write_chunk(chunk, payload)? as u32;
        let meta = FragmentMetadata {
            seq,
            sig,
            chunk,
            size,
            ctl,
            reserved: 0,
            ts,
        };
        self.mcache.publish(meta);

        if let Some(metrics) = self.metrics {
            metrics.record_publish();
        }

        Ok(meta)
    }
966
967    /// Try to publish, spinning until credits are available or mcache stops.
968    ///
969    /// Only useful when flow control is enabled.
970    #[must_use = "publishing may fail; check the result"]
971    pub fn publish_blocking(
972        &self,
973        payload: &[u8],
974        sig: u64,
975        ctl: u16,
976        ts: u32,
977    ) -> Result<FragmentMetadata, TangoError> {
978        loop {
979            match self.publish(payload, sig, ctl, ts) {
980                Ok(meta) => return Ok(meta),
981                Err(TangoError::NoCredits) => {
982                    if !self.mcache.is_running() {
983                        return Err(TangoError::NoCredits);
984                    }
985                    core::hint::spin_loop();
986                }
987                Err(e) => return Err(e),
988            }
989        }
990    }
991
992    /// Publish multiple payloads in a batch.
993    ///
994    /// Returns the number of successfully published messages.
995    /// Stops on the first error (e.g., `NoCredits`).
996    ///
997    /// This can be more efficient than calling `publish()` in a loop
998    /// as it reduces function call overhead.
999    pub fn publish_batch(&self, payloads: &[&[u8]], sig: u64, ctl: u16) -> usize {
1000        let mut published = 0;
1001        for (i, payload) in payloads.iter().enumerate() {
1002            match self.publish(payload, sig, ctl, i as u32) {
1003                Ok(_) => published += 1,
1004                Err(_) => break,
1005            }
1006        }
1007        published
1008    }
1009}
1010
/// A single consumed message: its metadata plus a zero-copy view of the payload.
#[derive(Debug)]
pub struct Fragment<'a, const CHUNK_SIZE: usize> {
    /// Metadata as read from the [`MCache`] slot (seq, sig, chunk, size, ctl, ts).
    pub meta: FragmentMetadata,
    /// Borrowed, zero-copy view into the [`DCache`] chunk holding the payload bytes.
    pub payload: DcacheView<'a, CHUNK_SIZE>,
}
1016
/// Reads fragments in sequence order from an [`MCache`] / [`DCache`] pair.
///
/// Tracks the next expected sequence number internally; `poll`/`wait`
/// advance it by one for each successfully consumed fragment.
pub struct Consumer<
    'a,
    const MCACHE_DEPTH: usize,
    const CHUNK_COUNT: usize,
    const CHUNK_SIZE: usize,
> {
    /// Metadata ring buffer the producer publishes into.
    mcache: &'a MCache<MCACHE_DEPTH>,
    /// Chunk storage holding the payload bytes.
    dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
    /// Optional credit counter; one credit is released per consumed fragment.
    fctl: Option<&'a Fctl>,
    /// Optional observability counters.
    metrics: Option<&'a Metrics>,
    /// Sequence number of the next fragment this consumer expects.
    next_seq: u64,
}
1029
1030impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> fmt::Debug
1031    for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1032{
1033    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1034        f.debug_struct("Consumer")
1035            .field("next_seq", &self.next_seq)
1036            .field("has_flow_control", &self.fctl.is_some())
1037            .field("has_metrics", &self.metrics.is_some())
1038            .finish()
1039    }
1040}
1041
1042impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize>
1043    Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1044{
1045    /// Create a consumer starting at the given sequence number.
1046    pub fn new(
1047        mcache: &'a MCache<MCACHE_DEPTH>,
1048        dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
1049        initial_seq: u64,
1050    ) -> Self {
1051        Self {
1052            mcache,
1053            dcache,
1054            fctl: None,
1055            metrics: None,
1056            next_seq: initial_seq,
1057        }
1058    }
1059
1060    /// Create a consumer with credit-based flow control.
1061    ///
1062    /// The consumer will release a credit after consuming each fragment.
1063    /// Use the same `fctl` instance as the producer.
1064    pub fn with_flow_control(
1065        mcache: &'a MCache<MCACHE_DEPTH>,
1066        dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
1067        fctl: &'a Fctl,
1068        initial_seq: u64,
1069    ) -> Self {
1070        Self {
1071            mcache,
1072            dcache,
1073            fctl: Some(fctl),
1074            metrics: None,
1075            next_seq: initial_seq,
1076        }
1077    }
1078
1079    /// Attach metrics tracking to this consumer.
1080    ///
1081    /// Returns a new consumer with the same configuration plus metrics.
1082    pub fn with_metrics(mut self, metrics: &'a Metrics) -> Self {
1083        self.metrics = Some(metrics);
1084        self
1085    }
1086
1087    /// Poll for the next fragment without blocking.
1088    ///
1089    /// Returns:
1090    /// - `Ok(Some(fragment))` if a fragment was available
1091    /// - `Ok(None)` if the sequence is not ready yet
1092    /// - `Err(TangoError::Overrun)` if the consumer was lapped
1093    #[must_use = "polling may return data or an error; check the result"]
1094    pub fn poll(&mut self) -> Result<Option<Fragment<'a, CHUNK_SIZE>>, TangoError> {
1095        let seq = self.next_seq;
1096        match self.mcache.try_read(seq) {
1097            ReadResult::Ok(meta) => {
1098                let payload = self.dcache.read_chunk(meta.chunk, meta.size as usize)?;
1099                self.next_seq = seq + 1;
1100
1101                // Release credit after consuming
1102                if let Some(fctl) = self.fctl {
1103                    fctl.release(1);
1104                }
1105
1106                if let Some(metrics) = self.metrics {
1107                    metrics.record_consume();
1108                }
1109
1110                Ok(Some(Fragment { meta, payload }))
1111            }
1112            ReadResult::NotReady => Ok(None),
1113            ReadResult::Overrun => {
1114                if let Some(metrics) = self.metrics {
1115                    metrics.record_overrun();
1116                }
1117                Err(TangoError::Overrun)
1118            }
1119        }
1120    }
1121
1122    /// Busy-wait for the next fragment.
1123    ///
1124    /// Returns:
1125    /// - `Ok(Some(fragment))` if a fragment was received
1126    /// - `Ok(None)` if the mcache was stopped
1127    /// - `Err(TangoError::Overrun)` if the consumer was lapped
1128    #[must_use = "waiting may return data or an error; check the result"]
1129    pub fn wait(&mut self) -> Result<Option<Fragment<'a, CHUNK_SIZE>>, TangoError> {
1130        let seq = self.next_seq;
1131        match self.mcache.wait(seq) {
1132            ReadResult::Ok(meta) => {
1133                let payload = self.dcache.read_chunk(meta.chunk, meta.size as usize)?;
1134                self.next_seq = seq + 1;
1135
1136                // Release credit after consuming
1137                if let Some(fctl) = self.fctl {
1138                    fctl.release(1);
1139                }
1140
1141                if let Some(metrics) = self.metrics {
1142                    metrics.record_consume();
1143                }
1144
1145                Ok(Some(Fragment { meta, payload }))
1146            }
1147            ReadResult::NotReady => Ok(None),
1148            ReadResult::Overrun => {
1149                if let Some(metrics) = self.metrics {
1150                    metrics.record_overrun();
1151                }
1152                Err(TangoError::Overrun)
1153            }
1154        }
1155    }
1156
1157    /// Return the next sequence number the consumer expects.
1158    pub fn next_seq(&self) -> u64 {
1159        self.next_seq
1160    }
1161
1162    /// Manually release credits (useful for batch processing).
1163    ///
1164    /// Call this after you're done processing a batch of fragments
1165    /// if you want to delay credit release for better throughput.
1166    pub fn release_credits(&self, count: u64) {
1167        if let Some(fctl) = self.fctl {
1168            fctl.release(count);
1169        }
1170    }
1171
1172    /// Poll for multiple fragments at once, up to `max_count`.
1173    ///
1174    /// Returns a vector of fragments (up to `max_count`) that were available.
1175    /// Stops on the first `NotReady` or error.
1176    ///
1177    /// This is more efficient than calling `poll()` in a loop when you expect
1178    /// multiple messages to be available.
1179    ///
1180    /// Only available with the `std` feature.
1181    #[cfg(feature = "std")]
1182    pub fn poll_batch(
1183        &mut self,
1184        max_count: usize,
1185    ) -> Result<std::vec::Vec<Fragment<'a, CHUNK_SIZE>>, TangoError> {
1186        let mut fragments = std::vec::Vec::with_capacity(max_count);
1187        for _ in 0..max_count {
1188            match self.poll() {
1189                Ok(Some(fragment)) => fragments.push(fragment),
1190                Ok(None) => break,
1191                Err(e) => {
1192                    if fragments.is_empty() {
1193                        return Err(e);
1194                    }
1195                    break;
1196                }
1197            }
1198        }
1199        Ok(fragments)
1200    }
1201}
1202
1203impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> IntoIterator
1204    for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1205{
1206    type Item = Result<Fragment<'a, CHUNK_SIZE>, TangoError>;
1207    type IntoIter = ConsumerIter<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>;
1208
1209    /// Convert this consumer into an iterator that busy-waits for messages.
1210    ///
1211    /// The iterator yields `Result<Fragment, TangoError>` and will:
1212    /// - Yield `Ok(fragment)` for each successfully consumed message
1213    /// - Yield `Err(TangoError::Overrun)` if the consumer was lapped
1214    /// - Return `None` when the MCache is stopped
1215    ///
1216    /// # Example
1217    ///
1218    /// ```ignore
1219    /// for result in consumer {
1220    ///     match result {
1221    ///         Ok(fragment) => println!("Got: {:?}", fragment.payload.as_slice()),
1222    ///         Err(e) => eprintln!("Error: {}", e),
1223    ///     }
1224    /// }
1225    /// ```
1226    fn into_iter(self) -> Self::IntoIter {
1227        ConsumerIter { consumer: self }
1228    }
1229}
1230
/// An iterator over fragments from a [`Consumer`].
///
/// Created by [`Consumer::into_iter`]. This iterator busy-waits for messages
/// and yields `Result<Fragment, TangoError>`.
pub struct ConsumerIter<
    'a,
    const MCACHE_DEPTH: usize,
    const CHUNK_COUNT: usize,
    const CHUNK_SIZE: usize,
> {
    /// The wrapped consumer; `next()` delegates to its `wait()`.
    consumer: Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>,
}
1243
1244impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> fmt::Debug
1245    for ConsumerIter<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1246{
1247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1248        f.debug_struct("ConsumerIter")
1249            .field("consumer", &self.consumer)
1250            .finish()
1251    }
1252}
1253
1254impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Iterator
1255    for ConsumerIter<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1256{
1257    type Item = Result<Fragment<'a, CHUNK_SIZE>, TangoError>;
1258
1259    fn next(&mut self) -> Option<Self::Item> {
1260        match self.consumer.wait() {
1261            Ok(Some(fragment)) => Some(Ok(fragment)),
1262            Ok(None) => None, // MCache stopped
1263            Err(e) => Some(Err(e)),
1264        }
1265    }
1266}
1267
1268/// Builder for creating tango channels with ergonomic configuration.
1269///
1270/// # Example
1271///
1272/// ```
1273/// use rust_tango::ChannelBuilder;
1274///
1275/// // Create channel components with the builder
1276/// let (mcache, dcache, fseq, fctl, metrics) = ChannelBuilder::<64, 64, 256>::new()
1277///     .with_flow_control()
1278///     .with_metrics()
1279///     .build();
1280///
1281/// // Create producer and consumer from the components
1282/// use rust_tango::{Producer, Consumer};
1283/// let producer = Producer::with_flow_control(&mcache, &dcache, &fseq, fctl.as_ref().unwrap());
1284/// let mut consumer = Consumer::with_flow_control(&mcache, &dcache, fctl.as_ref().unwrap(), 1);
1285/// ```
#[derive(Debug)]
pub struct ChannelBuilder<
    const MCACHE_DEPTH: usize,
    const CHUNK_COUNT: usize,
    const CHUNK_SIZE: usize,
> {
    /// Starting value for the `Fseq` built by `build` (default: 1).
    initial_seq: u64,
    /// Whether `build` creates an `Fctl` with `CHUNK_COUNT` credits.
    flow_control: bool,
    /// Whether `build` creates a `Metrics` instance.
    metrics: bool,
}
1296
1297impl<const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Default
1298    for ChannelBuilder<MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1299{
1300    fn default() -> Self {
1301        Self::new()
1302    }
1303}
1304
1305impl<const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize>
1306    ChannelBuilder<MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1307{
1308    /// Create a new channel builder with default settings.
1309    pub fn new() -> Self {
1310        Self {
1311            initial_seq: 1,
1312            flow_control: false,
1313            metrics: false,
1314        }
1315    }
1316
1317    /// Set the initial sequence number (default: 1).
1318    pub fn initial_seq(mut self, seq: u64) -> Self {
1319        self.initial_seq = seq;
1320        self
1321    }
1322
1323    /// Enable credit-based flow control.
1324    pub fn with_flow_control(mut self) -> Self {
1325        self.flow_control = true;
1326        self
1327    }
1328
1329    /// Enable metrics tracking.
1330    pub fn with_metrics(mut self) -> Self {
1331        self.metrics = true;
1332        self
1333    }
1334
1335    /// Build the channel components.
1336    ///
1337    /// Returns a tuple of:
1338    /// - `MCache` - the metadata ring buffer
1339    /// - `DCache` - the data chunk storage
1340    /// - `Fseq` - the sequence counter
1341    /// - `Option<Fctl>` - flow control (if enabled)
1342    /// - `Option<Metrics>` - metrics (if enabled)
1343    #[must_use = "this returns the channel components that should be used"]
1344    pub fn build(
1345        self,
1346    ) -> (
1347        MCache<MCACHE_DEPTH>,
1348        DCache<CHUNK_COUNT, CHUNK_SIZE>,
1349        Fseq,
1350        Option<Fctl>,
1351        Option<Metrics>,
1352    ) {
1353        let mcache = MCache::new();
1354        let dcache = DCache::new();
1355        let fseq = Fseq::new(self.initial_seq);
1356        let fctl = if self.flow_control {
1357            Some(Fctl::new(CHUNK_COUNT as u64))
1358        } else {
1359            None
1360        };
1361        let metrics = if self.metrics {
1362            Some(Metrics::new())
1363        } else {
1364            None
1365        };
1366        (mcache, dcache, fseq, fctl, metrics)
1367    }
1368}
1369
/// Unit tests. Compiled only for std test builds and skipped under loom
/// (which presumably has its own model-checked tests — confirm elsewhere
/// in the crate).
#[cfg(all(test, feature = "std", not(loom)))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    // Small geometry keeps the tests fast while still exercising wraparound.
    const MCACHE_DEPTH: usize = 8;
    const CHUNK_COUNT: usize = 8;
    const CHUNK_SIZE: usize = 64;

    // Basic round-trip: one publish, one poll, metadata and payload intact.
    #[test]
    fn publish_and_consume() {
        let mcache = MCache::<MCACHE_DEPTH>::new();
        let dcache = DCache::<CHUNK_COUNT, CHUNK_SIZE>::new();
        let fseq = Fseq::new(1);
        let producer = Producer::new(&mcache, &dcache, &fseq);
        let mut consumer = Consumer::new(&mcache, &dcache, 1);

        let meta = producer.publish(b"hello", 42, 7, 1234).expect("publish");
        assert_eq!(meta.seq, 1);

        let fragment = consumer.poll().expect("poll").expect("fragment");
        assert_eq!(fragment.meta.sig, 42);
        assert_eq!(fragment.payload.read(), b"hello");
    }

    // Credits cap in-flight messages at CHUNK_COUNT; consuming frees one.
    #[test]
    fn publish_and_consume_with_flow_control() {
        let mcache = MCache::<MCACHE_DEPTH>::new();
        let dcache = DCache::<CHUNK_COUNT, CHUNK_SIZE>::new();
        let fseq = Fseq::new(1);
        let fctl = Fctl::new(CHUNK_COUNT as u64);

        let producer = Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl);
        let mut consumer = Consumer::with_flow_control(&mcache, &dcache, &fctl, 1);

        // Should be able to publish up to CHUNK_COUNT messages
        for i in 0..CHUNK_COUNT {
            producer
                .publish(b"test", i as u64, 0, 0)
                .expect("publish should succeed");
        }

        // Next publish should fail - no credits
        assert!(matches!(
            producer.publish(b"fail", 0, 0, 0),
            Err(TangoError::NoCredits)
        ));

        // Consume one message - releases a credit
        let _ = consumer.poll().expect("poll").expect("fragment");

        // Now we can publish again
        producer
            .publish(b"success", 0, 0, 0)
            .expect("publish should succeed after credit release");
    }

    // Without flow control, a fast producer laps a slow consumer and the
    // consumer must observe Overrun rather than read corrupt data.
    #[test]
    fn detect_overrun() {
        let mcache = MCache::<4>::new();
        let dcache = DCache::<8, 64>::new();
        let fseq = Fseq::new(1);

        let producer = Producer::new(&mcache, &dcache, &fseq);
        let mut consumer = Consumer::new(&mcache, &dcache, 1);

        // Publish more messages than mcache depth, causing overwrite
        for i in 0..8u64 {
            producer.publish(b"msg", i, 0, 0).expect("publish");
        }

        // Consumer at seq=1 should detect overrun (slot was overwritten)
        assert!(matches!(consumer.poll(), Err(TangoError::Overrun)));
    }

    // Reading ahead of any published sequence reports NotReady, not an error.
    #[test]
    fn read_result_not_ready() {
        let mcache = MCache::<8>::new();

        // Try to read seq=1 before anything is published
        assert!(matches!(mcache.try_read(1), ReadResult::NotReady));
    }

    // Cross-thread SPSC smoke test: scoped threads let us borrow the
    // stack-allocated channel components without Arc.
    #[test]
    fn publish_and_consume_across_threads() {
        let mcache = MCache::<64>::new();
        let dcache = DCache::<64, 64>::new();
        let fseq = Fseq::new(1);
        let producer = Producer::new(&mcache, &dcache, &fseq);
        let consumer = Consumer::new(&mcache, &dcache, 1);
        let received = AtomicUsize::new(0);

        thread::scope(|scope| {
            scope.spawn(|| {
                let mut consumer = consumer;
                while received.load(Ordering::Acquire) < 3 {
                    match consumer.poll() {
                        Ok(Some(fragment)) => {
                            let payload = fragment.payload.read();
                            println!("received: {:?}", String::from_utf8_lossy(&payload));
                            assert!(payload.starts_with(b"msg-"));
                            received.fetch_add(1, Ordering::AcqRel);
                        }
                        Ok(None) => thread::yield_now(),
                        Err(e) => panic!("unexpected error: {}", e),
                    }
                }
            });

            scope.spawn(|| {
                for idx in 0..3u8 {
                    let payload = [b'm', b's', b'g', b'-', b'0' + idx];
                    producer
                        .publish(&payload, 0xAA, 0, idx as u32)
                        .expect("publish");
                }
            });
        });

        assert_eq!(received.load(Ordering::Acquire), 3);
    }

    // End-to-end backpressure: blocking publishes must all land even when
    // the consumer is slower than the producer.
    #[test]
    fn flow_control_across_threads() {
        let mcache = MCache::<64>::new();
        let dcache = DCache::<64, 64>::new();
        let fseq = Fseq::new(1);
        let fctl = Fctl::new(64);

        let producer = Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl);
        let consumer = Consumer::with_flow_control(&mcache, &dcache, &fctl, 1);
        let received = AtomicUsize::new(0);

        thread::scope(|scope| {
            scope.spawn(|| {
                let mut consumer = consumer;
                while received.load(Ordering::Acquire) < 100 {
                    match consumer.poll() {
                        Ok(Some(_)) => {
                            received.fetch_add(1, Ordering::AcqRel);
                        }
                        Ok(None) => thread::yield_now(),
                        Err(e) => panic!("unexpected error: {}", e),
                    }
                }
            });

            scope.spawn(|| {
                for i in 0..100u32 {
                    // Use blocking publish since consumer might be slow
                    producer
                        .publish_blocking(b"test", i as u64, 0, i)
                        .expect("publish");
                }
            });
        });

        assert_eq!(received.load(Ordering::Acquire), 100);
    }

    // A shared Metrics instance tallies both sides; lag = published - consumed.
    #[test]
    fn metrics_tracking() {
        let mcache = MCache::<8>::new();
        let dcache = DCache::<16, 64>::new();
        let fseq = Fseq::new(1);
        let fctl = Fctl::new(8);
        let metrics = Metrics::new();

        let producer =
            Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl).with_metrics(&metrics);
        let mut consumer =
            Consumer::with_flow_control(&mcache, &dcache, &fctl, 1).with_metrics(&metrics);

        // Publish 5 messages
        for i in 0..5 {
            producer.publish(b"test", i, 0, 0).expect("publish");
        }

        // Consume 3 messages
        for _ in 0..3 {
            consumer.poll().expect("poll").expect("fragment");
        }

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.published, 5);
        assert_eq!(snapshot.consumed, 3);
        assert_eq!(snapshot.lag(), 2);
        assert_eq!(snapshot.overruns, 0);
        assert_eq!(snapshot.backpressure_events, 0);
    }

    // A publish rejected for lack of credits must increment the
    // backpressure counter, not the publish counter.
    #[test]
    fn metrics_backpressure_tracking() {
        let mcache = MCache::<8>::new();
        let dcache = DCache::<8, 64>::new();
        let fseq = Fseq::new(1);
        let fctl = Fctl::new(2); // Only 2 credits
        let metrics = Metrics::new();

        let producer =
            Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl).with_metrics(&metrics);

        // Publish 2 messages (uses all credits)
        producer.publish(b"1", 1, 0, 0).expect("first");
        producer.publish(b"2", 2, 0, 0).expect("second");

        // Third publish should fail and record backpressure
        assert!(producer.publish(b"3", 3, 0, 0).is_err());

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.published, 2);
        assert_eq!(snapshot.backpressure_events, 1);
    }

    // The borrowed-slice accessors expose the payload without copying.
    #[test]
    fn zero_copy_read() {
        let mcache = MCache::<8>::new();
        let dcache = DCache::<8, 64>::new();
        let fseq = Fseq::new(1);
        let producer = Producer::new(&mcache, &dcache, &fseq);
        let mut consumer = Consumer::new(&mcache, &dcache, 1);

        producer.publish(b"hello world", 42, 0, 0).expect("publish");

        let fragment = consumer.poll().expect("poll").expect("fragment");

        // Zero-copy access
        let slice = fragment.payload.as_slice();
        assert_eq!(slice, b"hello world");
        assert_eq!(fragment.payload.len(), 11);
        assert!(!fragment.payload.is_empty());
    }
}
1603}