1#![cfg_attr(not(feature = "std"), no_std)]
117
118#[cfg(feature = "std")]
119extern crate std;
120
121#[cfg(loom)]
123use loom::cell::UnsafeCell;
124#[cfg(loom)]
125use loom::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
126
127#[cfg(not(loom))]
128use core::cell::UnsafeCell;
129#[cfg(not(loom))]
130use core::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
131
132use core::fmt;
133use core::mem::size_of;
134
/// Metadata describing one published fragment.
///
/// `repr(C)` with 16-byte alignment so the layout is stable across FFI or
/// shared-memory boundaries; the compile-time assertion below pins the size
/// at exactly 32 bytes.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[repr(C, align(16))]
pub struct FragmentMetadata {
    /// Monotonically increasing fragment sequence number.
    pub seq: u64,
    /// Application-defined signature, printed in hex by `Display`.
    pub sig: u64,
    /// Index of the dcache chunk holding the payload.
    pub chunk: u32,
    /// Payload size in bytes.
    pub size: u32,
    /// Application-defined control bits.
    pub ctl: u16,
    /// Reserved; written as zero by the producer.
    pub reserved: u16,
    /// Application-defined timestamp.
    pub ts: u32,
}

// Compile-time guard: the metadata layout must stay exactly 32 bytes.
const _: () = {
    assert!(size_of::<FragmentMetadata>() == 32);
};

impl fmt::Display for FragmentMetadata {
    /// Human-readable summary; `reserved` is intentionally omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let FragmentMetadata {
            seq,
            sig,
            chunk,
            size,
            ctl,
            ts,
            ..
        } = self;
        write!(
            f,
            "Fragment {{ seq={}, sig={:#x}, chunk={}, size={}, ctl={}, ts={} }}",
            seq, sig, chunk, size, ctl, ts
        )
    }
}
167
/// Errors surfaced by channel operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use = "this error should be handled"]
pub enum TangoError {
    /// The dcache has no capacity left.
    DcacheFull,
    /// A chunk index fell outside the dcache; carries the offending index.
    ChunkOutOfRange(u32),
    /// The producer lapped the consumer.
    Overrun,
    /// Flow control had no credits to hand out.
    NoCredits,
}

impl fmt::Display for TangoError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::ChunkOutOfRange(idx) => {
                return write!(f, "chunk index {} out of range", idx);
            }
            Self::DcacheFull => "dcache is out of capacity",
            Self::Overrun => "consumer overrun: producer lapped the consumer",
            Self::NoCredits => "no credits available for backpressure",
        };
        f.write_str(text)
    }
}

#[cfg(feature = "std")]
impl std::error::Error for TangoError {}
195
/// Outcome of a (possibly non-blocking) metadata read.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use = "this `ReadResult` may contain data that should be handled"]
pub enum ReadResult<T> {
    /// A value was read consistently.
    Ok(T),
    /// The requested sequence number has not been published yet.
    NotReady,
    /// The slot was overwritten by a newer sequence number.
    Overrun,
}

impl<T> ReadResult<T> {
    /// `true` when a value is present.
    #[inline]
    pub fn is_ok(&self) -> bool {
        matches!(self, Self::Ok(_))
    }

    /// `true` when the read came back empty-handed.
    #[inline]
    pub fn is_not_ready(&self) -> bool {
        matches!(self, Self::NotReady)
    }

    /// `true` when the reader was lapped.
    #[inline]
    pub fn is_overrun(&self) -> bool {
        matches!(self, Self::Overrun)
    }

    /// Converts to `Option`, discarding the not-ready/overrun distinction.
    #[inline]
    pub fn ok(self) -> Option<T> {
        if let ReadResult::Ok(v) = self {
            Some(v)
        } else {
            None
        }
    }
}

impl<T: fmt::Debug> fmt::Display for ReadResult<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Ok(v) => write!(f, "Ok({:?})", v),
            Self::NotReady => f.write_str("NotReady"),
            Self::Overrun => f.write_str("Overrun"),
        }
    }
}
246
/// Shared, lock-free channel counters.
///
/// Every field is an atomic so one `Metrics` instance can be referenced
/// concurrently by producers and consumers without locking.
#[derive(Debug)]
pub struct Metrics {
    // Fragments successfully published.
    published: AtomicU64,
    // Fragments successfully consumed.
    consumed: AtomicU64,
    // Times a consumer detected it was lapped.
    overruns: AtomicU64,
    // Publish attempts rejected for lack of credits.
    backpressure_events: AtomicU64,
}
262
/// A point-in-time copy of the channel counters.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[must_use = "this snapshot contains metrics data that should be used"]
pub struct MetricsSnapshot {
    /// Fragments published so far.
    pub published: u64,
    /// Fragments consumed so far.
    pub consumed: u64,
    /// Overruns observed by consumers.
    pub overruns: u64,
    /// Publishes rejected by flow control.
    pub backpressure_events: u64,
}

impl MetricsSnapshot {
    /// Published-but-not-yet-consumed count, clamped at zero if the
    /// counters were captured out of order.
    #[inline]
    pub fn lag(&self) -> u64 {
        match self.published.checked_sub(self.consumed) {
            Some(delta) => delta,
            None => 0,
        }
    }
}

impl fmt::Display for MetricsSnapshot {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "published={}, consumed={}, ", self.published, self.consumed)?;
        write!(
            f,
            "lag={}, overruns={}, backpressure={}",
            self.lag(),
            self.overruns,
            self.backpressure_events
        )
    }
}
298
299impl Default for Metrics {
300 fn default() -> Self {
301 Self::new()
302 }
303}
304
305impl Metrics {
306 pub fn new() -> Self {
308 Self {
309 published: AtomicU64::new(0),
310 consumed: AtomicU64::new(0),
311 overruns: AtomicU64::new(0),
312 backpressure_events: AtomicU64::new(0),
313 }
314 }
315
316 #[inline]
318 pub fn record_publish(&self) {
319 self.published.fetch_add(1, Ordering::Relaxed);
320 }
321
322 #[inline]
324 pub fn record_consume(&self) {
325 self.consumed.fetch_add(1, Ordering::Relaxed);
326 }
327
328 #[inline]
330 pub fn record_overrun(&self) {
331 self.overruns.fetch_add(1, Ordering::Relaxed);
332 }
333
334 #[inline]
336 pub fn record_backpressure(&self) {
337 self.backpressure_events.fetch_add(1, Ordering::Relaxed);
338 }
339
340 pub fn snapshot(&self) -> MetricsSnapshot {
342 MetricsSnapshot {
344 published: self.published.load(Ordering::Acquire),
345 consumed: self.consumed.load(Ordering::Acquire),
346 overruns: self.overruns.load(Ordering::Acquire),
347 backpressure_events: self.backpressure_events.load(Ordering::Acquire),
348 }
349 }
350
351 pub fn reset(&self) {
353 self.published.store(0, Ordering::Release);
354 self.consumed.store(0, Ordering::Release);
355 self.overruns.store(0, Ordering::Release);
356 self.backpressure_events.store(0, Ordering::Release);
357 }
358}
359
/// Coarse lifecycle states for the command-and-control cell.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum CncState {
    Boot = 0,
    Run = 1,
    Halt = 2,
}

impl CncState {
    /// Decodes a raw byte; any unknown value collapses to `Halt`.
    fn from_u8(value: u8) -> Self {
        match value {
            0 => Self::Boot,
            1 => Self::Run,
            _ => Self::Halt,
        }
    }
}

impl fmt::Display for CncState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Boot => "Boot",
            Self::Run => "Run",
            Self::Halt => "Halt",
        })
    }
}

/// Shared command-and-control cell: a [`CncState`] stored as an atomic byte.
#[derive(Debug)]
pub struct Cnc {
    state: AtomicU8,
}

impl Default for Cnc {
    fn default() -> Self {
        Cnc::new()
    }
}

impl Cnc {
    /// Starts in [`CncState::Boot`].
    pub fn new() -> Self {
        Self {
            state: AtomicU8::new(CncState::Boot as u8),
        }
    }

    /// Reads the current state (acquire load).
    pub fn state(&self) -> CncState {
        CncState::from_u8(self.state.load(Ordering::Acquire))
    }

    /// Publishes a new state (release store).
    pub fn set_state(&self, state: CncState) {
        self.state.store(state as u8, Ordering::Release);
    }
}
414
/// Shared fragment sequence-number allocator.
#[derive(Debug)]
pub struct Fseq {
    // Next sequence number to hand out.
    next: AtomicU64,
}

impl Fseq {
    /// Starts handing out sequence numbers at `initial`.
    pub fn new(initial: u64) -> Self {
        Self { next: AtomicU64::new(initial) }
    }

    /// Claims and returns the next sequence number (wrapping fetch-add).
    pub fn next(&self) -> u64 {
        self.next.fetch_add(1, Ordering::AcqRel)
    }

    /// Returns the sequence number that `next` would hand out.
    pub fn current(&self) -> u64 {
        self.next.load(Ordering::Acquire)
    }
}
435
/// Credit-based flow control shared between producer and consumer.
#[derive(Debug)]
pub struct Fctl {
    // Credits currently available for publishing.
    credits: AtomicU64,
}

impl Fctl {
    /// Creates a pool holding `initial` credits.
    pub fn new(initial: u64) -> Self {
        Self { credits: AtomicU64::new(initial) }
    }

    /// Atomically takes `amount` credits.
    ///
    /// Returns `false`, leaving the pool untouched, when fewer than
    /// `amount` credits remain.
    #[must_use = "returns whether the credits were successfully acquired"]
    pub fn acquire(&self, amount: u64) -> bool {
        // `checked_sub` returns `None` exactly when current < amount, which
        // makes `fetch_update` bail out without storing.
        self.credits
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                current.checked_sub(amount)
            })
            .is_ok()
    }

    /// Returns `amount` credits to the pool.
    pub fn release(&self, amount: u64) {
        self.credits.fetch_add(amount, Ordering::AcqRel);
    }

    /// Credits currently available.
    pub fn available(&self) -> u64 {
        self.credits.load(Ordering::Acquire)
    }
}
475
/// `true` when `value` is nonzero with exactly one bit set.
const fn is_power_of_two(value: usize) -> bool {
    value.count_ones() == 1
}

/// Tag filter: a fixed bitset of `WORDS * 64` bits addressed by hashed tags.
///
/// False positives are possible (two tags may hash to the same bit); false
/// negatives are not.
#[derive(Debug)]
pub struct Tcache<const WORDS: usize> {
    bits: [AtomicU64; WORDS],
    // BIT_COUNT - 1, for masking a hash into a bit index.
    mask: u64,
}

impl<const WORDS: usize> Default for Tcache<WORDS> {
    fn default() -> Self {
        Tcache::new()
    }
}

impl<const WORDS: usize> Tcache<WORDS> {
    const BIT_COUNT: usize = WORDS * 64;
    // Evaluated in `new` so a bad WORDS fails at compile time.
    const ASSERT_POWER_OF_TWO: () = assert!(is_power_of_two(Self::BIT_COUNT));

    /// Creates an empty filter; `WORDS * 64` must be a power of two.
    pub fn new() -> Self {
        let () = Self::ASSERT_POWER_OF_TWO;
        Self {
            bits: core::array::from_fn(|_| AtomicU64::new(0)),
            mask: (Self::BIT_COUNT - 1) as u64,
        }
    }

    /// Marks `tag` as seen; returns `true` iff its bit was previously clear.
    pub fn check_and_insert(&self, tag: u64) -> bool {
        // Fibonacci hashing spreads nearby tags across the bit space.
        let bit = tag.wrapping_mul(0x9E37_79B9_7F4A_7C15) & self.mask;
        let word = &self.bits[(bit >> 6) as usize];
        let bit_mask = 1u64 << (bit & 63);
        word.fetch_or(bit_mask, Ordering::AcqRel) & bit_mask == 0
    }

    /// Number of set bits.
    pub fn len(&self) -> usize {
        let mut total = 0usize;
        for word in &self.bits {
            total += word.load(Ordering::Acquire).count_ones() as usize;
        }
        total
    }

    /// `true` when no bit is set.
    pub fn is_empty(&self) -> bool {
        self.bits.iter().all(|word| word.load(Ordering::Acquire) == 0)
    }
}
527
// One cache line, in bytes; used to pad the hot sequence atomic away from
// the metadata it guards so producer stores and reader loads don't
// false-share.
const CACHE_LINE_SIZE: usize = 64;

/// One mcache slot: a sequence tag plus the metadata it guards.
///
/// Acts as a seqlock cell: the producer writes `meta` and then stores `seq`
/// with release ordering; readers check `seq` before and after copying
/// `meta` (see `MCache::try_read`).
#[repr(C, align(64))]
struct MCacheEntry {
    // Sequence number most recently published into this slot. Starts at 0,
    // which reads as "not yet published" provided real sequence numbers
    // start at 1 (the builder's default) — TODO confirm callers never use 0.
    seq: AtomicU64,
    // Pads `seq` to a full cache line so `meta` lands on the next one.
    _pad: [u8; CACHE_LINE_SIZE - size_of::<AtomicU64>()],
    // Guarded payload; only trusted when the bracketing `seq` loads agree.
    meta: UnsafeCell<FragmentMetadata>,
}

impl fmt::Debug for MCacheEntry {
    // `meta` is deliberately not printed: reading it outside the seqlock
    // protocol would race with the producer.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("MCacheEntry")
            .field("seq", &self.seq.load(Ordering::Relaxed))
            .finish_non_exhaustive()
    }
}

// SAFETY: NOTE(review) — `meta` is an UnsafeCell the producer mutates while
// readers may concurrently copy it; the only coordination is the `seq`
// protocol in `MCache`. This is the classic seqlock trade-off; confirm the
// racy read in `try_read` is an accepted risk (ideally under loom).
unsafe impl Sync for MCacheEntry {}

impl MCacheEntry {
    // Fresh slot: seq 0 (unpublished), zeroed padding, default metadata.
    fn new() -> Self {
        Self {
            seq: AtomicU64::new(0),
            _pad: [0u8; CACHE_LINE_SIZE - size_of::<AtomicU64>()],
            meta: UnsafeCell::new(FragmentMetadata::default()),
        }
    }
}
569
/// Fragment-metadata ring shared by a single producer and its readers.
///
/// `DEPTH` must be a power of two. A sequence number maps to slot
/// `seq & (DEPTH - 1)`; each slot is a seqlock cell (`MCacheEntry`).
#[derive(Debug)]
pub struct MCache<const DEPTH: usize> {
    // DEPTH - 1, for mapping a sequence number onto a slot index.
    mask: u64,
    entries: [MCacheEntry; DEPTH],
    // Cooperative shutdown flag observed by `wait`.
    running: AtomicBool,
}

impl<const DEPTH: usize> Default for MCache<DEPTH> {
    fn default() -> Self {
        Self::new()
    }
}

impl<const DEPTH: usize> MCache<DEPTH> {
    // Evaluated in `new` to force a compile error for non-power-of-two DEPTH.
    const ASSERT_POWER_OF_TWO: () = assert!(is_power_of_two(DEPTH));

    /// Creates an empty, running mcache; every slot starts with seq 0
    /// (unpublished, assuming real sequence numbers start at 1).
    pub fn new() -> Self {
        let () = Self::ASSERT_POWER_OF_TWO;
        Self {
            mask: (DEPTH - 1) as u64,
            entries: core::array::from_fn(|_| MCacheEntry::new()),
            running: AtomicBool::new(true),
        }
    }

    /// Signals consumers spinning in `wait` to give up.
    pub fn stop(&self) {
        self.running.store(false, Ordering::Release);
    }

    /// Whether `stop` has not yet been called.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::Acquire)
    }

    /// Publishes `meta` into the slot selected by `meta.seq`.
    ///
    /// Single-producer only: the metadata write below is unsynchronized,
    /// so two concurrent publishers targeting the same slot would race.
    pub fn publish(&self, meta: FragmentMetadata) {
        let idx = (meta.seq & self.mask) as usize;
        let entry = &self.entries[idx];
        unsafe {
            // SAFETY: seqlock write side — readers only trust `meta` when
            // the surrounding `seq` loads agree, and the release store
            // below orders this write before the new sequence value.
            // NOTE(review): a reader copying concurrently is still a data
            // race under the strict memory model (classic seqlock caveat);
            // confirm this trade-off is intended.
            *entry.meta.get() = meta;
        }
        entry.seq.store(meta.seq, Ordering::Release);
    }

    /// Spins until sequence `seq` becomes readable, the slot is
    /// overwritten (`Overrun`), or the mcache is stopped (`NotReady`).
    pub fn wait(&self, seq: u64) -> ReadResult<FragmentMetadata> {
        while self.is_running() {
            match self.try_read(seq) {
                ReadResult::Ok(meta) => return ReadResult::Ok(meta),
                ReadResult::Overrun => return ReadResult::Overrun,
                ReadResult::NotReady => core::hint::spin_loop(),
            }
        }
        ReadResult::NotReady
    }

    /// Attempts to read the metadata published under sequence `seq`.
    ///
    /// `NotReady` if the slot still holds an older sequence, `Overrun` if
    /// the producer already lapped it, `Ok` only when the slot's sequence
    /// was identical before and after the copy (seqlock validation).
    pub fn try_read(&self, seq: u64) -> ReadResult<FragmentMetadata> {
        let idx = (seq & self.mask) as usize;
        let entry = &self.entries[idx];

        let seq_before = entry.seq.load(Ordering::Acquire);

        // Producer has not reached this sequence number yet.
        if seq_before < seq {
            return ReadResult::NotReady;
        }

        // Slot already reused for a newer fragment: reader fell behind.
        if seq_before > seq {
            return ReadResult::Overrun;
        }

        // SAFETY: racy seqlock read — the copy may be torn if the producer
        // is mid-overwrite; the re-check below discards such a value.
        // NOTE(review): strictly this racy non-atomic read is UB per the
        // Rust memory model, and an Acquire load after the copy does not
        // by itself prevent the copy from being reordered past it — the
        // textbook form inserts `fence(Acquire)` between copy and re-load.
        // Confirm intent / cover with loom.
        let meta = unsafe { *entry.meta.get() };

        let seq_after = entry.seq.load(Ordering::Acquire);
        if seq_before == seq_after {
            ReadResult::Ok(meta)
        } else {
            ReadResult::Overrun
        }
    }
}
679
/// A single cache-line-aligned payload buffer of `CHUNK_SIZE` bytes.
#[repr(C, align(64))]
struct DcacheChunk<const CHUNK_SIZE: usize> {
    // Interior-mutable so the producer can fill the chunk through a shared
    // reference; all cross-thread coordination is deferred to the mcache
    // sequence protocol and flow-control credits.
    data: UnsafeCell<[u8; CHUNK_SIZE]>,
}

impl<const CHUNK_SIZE: usize> fmt::Debug for DcacheChunk<CHUNK_SIZE> {
    // Contents are not printed: reading them here would bypass the
    // synchronization protocol.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DcacheChunk")
            .field("size", &CHUNK_SIZE)
            .finish_non_exhaustive()
    }
}

impl<const CHUNK_SIZE: usize> DcacheChunk<CHUNK_SIZE> {
    // Zero-filled chunk.
    fn new() -> Self {
        Self {
            data: UnsafeCell::new([0u8; CHUNK_SIZE]),
        }
    }
}

// SAFETY: NOTE(review) — the cell itself provides no synchronization;
// sharing is presumed sound because readers only trust chunk contents whose
// mcache sequence check passed, and a single producer owns writes. Confirm
// those invariants hold at every call site.
unsafe impl<const CHUNK_SIZE: usize> Sync for DcacheChunk<CHUNK_SIZE> {}
706
/// Zero-copy, read-only view of one dcache chunk, truncated to `size` bytes.
#[derive(Debug, Clone, Copy)]
pub struct DcacheView<'a, const CHUNK_SIZE: usize> {
    chunk: &'a DcacheChunk<CHUNK_SIZE>,
    // Logical payload length; always <= CHUNK_SIZE (clamped in `read_chunk`).
    size: usize,
}

impl<'a, const CHUNK_SIZE: usize> DcacheView<'a, CHUNK_SIZE> {
    /// Borrows the payload bytes directly out of the shared chunk.
    #[inline]
    pub fn as_slice(&self) -> &'a [u8] {
        // SAFETY: NOTE(review) — this dereferences the shared chunk with no
        // synchronization; if the producer reuses the chunk while the slice
        // is alive, the access is a data race. Soundness appears to rely on
        // flow control / overrun detection preventing reuse — confirm the
        // intended contract before relying on this in new code.
        let data = unsafe { &*self.chunk.data.get() };
        &data[..self.size]
    }

    /// Copies the payload into an owned `Vec<u8>`.
    #[cfg(feature = "std")]
    pub fn read(&self) -> std::vec::Vec<u8> {
        self.as_slice().to_vec()
    }

    /// Runs `f` over the borrowed payload and returns its result.
    pub fn with_reader<T>(&self, f: impl FnOnce(&[u8]) -> T) -> T {
        f(self.as_slice())
    }

    /// Payload length in bytes.
    #[inline]
    pub fn len(&self) -> usize {
        self.size
    }

    /// `true` when the payload is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.size == 0
    }
}
759
/// Payload store: `CHUNK_COUNT` fixed-size chunks handed out round-robin.
#[derive(Debug)]
pub struct DCache<const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> {
    chunks: [DcacheChunk<CHUNK_SIZE>; CHUNK_COUNT],
    // Monotonic allocation cursor; masked into a chunk index by `allocate`.
    next: AtomicU64,
    // CHUNK_COUNT - 1.
    mask: u64,
}

// NOTE(review): empty compile-time block — looks like a leftover static
// assertion; consider restoring the intended check or deleting it.
const _: () = {};

impl<const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Default
    for DCache<CHUNK_COUNT, CHUNK_SIZE>
{
    fn default() -> Self {
        Self::new()
    }
}

impl<const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> DCache<CHUNK_COUNT, CHUNK_SIZE> {
    // Evaluated in `new` so a non-power-of-two CHUNK_COUNT fails to compile.
    const ASSERT_POWER_OF_TWO: () = assert!(is_power_of_two(CHUNK_COUNT));

    /// Creates a zero-filled dcache.
    pub fn new() -> Self {
        let () = Self::ASSERT_POWER_OF_TWO;
        Self {
            chunks: core::array::from_fn(|_| DcacheChunk::new()),
            next: AtomicU64::new(0),
            mask: (CHUNK_COUNT - 1) as u64,
        }
    }

    /// Claims the next chunk index, wrapping round-robin.
    ///
    /// There is no liveness tracking: after `CHUNK_COUNT` allocations the
    /// cursor wraps and older chunks are silently reused. Overwrite
    /// protection comes only from mcache overrun detection and/or
    /// flow-control credits.
    pub fn allocate(&self) -> u32 {
        let seq = self.next.fetch_add(1, Ordering::AcqRel);
        (seq & self.mask) as u32
    }

    /// Number of chunks in the cache.
    pub fn capacity(&self) -> usize {
        CHUNK_COUNT
    }

    /// Copies `payload` into `chunk`, returning the bytes written.
    ///
    /// Payloads longer than `CHUNK_SIZE` are silently truncated — callers
    /// should compare the returned size against `payload.len()` if that
    /// matters.
    pub fn write_chunk(&self, chunk: u32, payload: &[u8]) -> Result<usize, TangoError> {
        let idx = chunk as usize;
        let Some(target) = self.chunks.get(idx) else {
            return Err(TangoError::ChunkOutOfRange(chunk));
        };
        let size = payload.len().min(CHUNK_SIZE);
        unsafe {
            // SAFETY: NOTE(review) — unsynchronized write through the
            // UnsafeCell; sound only while exactly one producer owns this
            // chunk and no reader still trusts a view into it. Confirm the
            // single-producer invariant at all call sites.
            let data = &mut *target.data.get();
            data[..size].copy_from_slice(&payload[..size]);
        }
        Ok(size)
    }

    /// Builds a read-only view over the first `size` bytes of `chunk`
    /// (clamped to `CHUNK_SIZE`). Does not validate that the chunk holds
    /// live data — that is the mcache's job.
    pub fn read_chunk(
        &self,
        chunk: u32,
        size: usize,
    ) -> Result<DcacheView<'_, CHUNK_SIZE>, TangoError> {
        let idx = chunk as usize;
        let Some(target) = self.chunks.get(idx) else {
            return Err(TangoError::ChunkOutOfRange(chunk));
        };
        Ok(DcacheView {
            chunk: target,
            size: size.min(CHUNK_SIZE),
        })
    }

    /// Size of each chunk in bytes.
    pub fn chunk_size(&self) -> usize {
        CHUNK_SIZE
    }
}
850
/// Publishing handle over shared channel state.
///
/// `Copy`: a producer is just a bundle of shared references, so copies are
/// cheap — but every copy still contends on the same underlying atomics.
#[derive(Clone, Copy)]
pub struct Producer<
    'a,
    const MCACHE_DEPTH: usize,
    const CHUNK_COUNT: usize,
    const CHUNK_SIZE: usize,
> {
    mcache: &'a MCache<MCACHE_DEPTH>,
    dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
    // Shared sequence-number allocator.
    fseq: &'a Fseq,
    // Optional credit-based backpressure.
    fctl: Option<&'a Fctl>,
    // Optional counters.
    metrics: Option<&'a Metrics>,
}
864
865impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> fmt::Debug
866 for Producer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
867{
868 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
869 f.debug_struct("Producer")
870 .field("has_flow_control", &self.fctl.is_some())
871 .field("has_metrics", &self.metrics.is_some())
872 .finish()
873 }
874}
875
876impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize>
877 Producer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
878{
879 pub fn new(
884 mcache: &'a MCache<MCACHE_DEPTH>,
885 dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
886 fseq: &'a Fseq,
887 ) -> Self {
888 Self {
889 mcache,
890 dcache,
891 fseq,
892 fctl: None,
893 metrics: None,
894 }
895 }
896
897 pub fn with_flow_control(
902 mcache: &'a MCache<MCACHE_DEPTH>,
903 dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
904 fseq: &'a Fseq,
905 fctl: &'a Fctl,
906 ) -> Self {
907 Self {
908 mcache,
909 dcache,
910 fseq,
911 fctl: Some(fctl),
912 metrics: None,
913 }
914 }
915
916 pub fn with_metrics(mut self, metrics: &'a Metrics) -> Self {
920 self.metrics = Some(metrics);
921 self
922 }
923
924 #[must_use = "publishing may fail; check the result"]
929 pub fn publish(
930 &self,
931 payload: &[u8],
932 sig: u64,
933 ctl: u16,
934 ts: u32,
935 ) -> Result<FragmentMetadata, TangoError> {
936 if let Some(fctl) = self.fctl {
938 if !fctl.acquire(1) {
939 if let Some(metrics) = self.metrics {
940 metrics.record_backpressure();
941 }
942 return Err(TangoError::NoCredits);
943 }
944 }
945
946 let seq = self.fseq.next();
947 let chunk = self.dcache.allocate();
948 let size = self.dcache.write_chunk(chunk, payload)? as u32;
949 let meta = FragmentMetadata {
950 seq,
951 sig,
952 chunk,
953 size,
954 ctl,
955 reserved: 0,
956 ts,
957 };
958 self.mcache.publish(meta);
959
960 if let Some(metrics) = self.metrics {
961 metrics.record_publish();
962 }
963
964 Ok(meta)
965 }
966
967 #[must_use = "publishing may fail; check the result"]
971 pub fn publish_blocking(
972 &self,
973 payload: &[u8],
974 sig: u64,
975 ctl: u16,
976 ts: u32,
977 ) -> Result<FragmentMetadata, TangoError> {
978 loop {
979 match self.publish(payload, sig, ctl, ts) {
980 Ok(meta) => return Ok(meta),
981 Err(TangoError::NoCredits) => {
982 if !self.mcache.is_running() {
983 return Err(TangoError::NoCredits);
984 }
985 core::hint::spin_loop();
986 }
987 Err(e) => return Err(e),
988 }
989 }
990 }
991
992 pub fn publish_batch(&self, payloads: &[&[u8]], sig: u64, ctl: u16) -> usize {
1000 let mut published = 0;
1001 for (i, payload) in payloads.iter().enumerate() {
1002 match self.publish(payload, sig, ctl, i as u32) {
1003 Ok(_) => published += 1,
1004 Err(_) => break,
1005 }
1006 }
1007 published
1008 }
1009}
1010
/// A consumed fragment: its metadata plus a zero-copy view of the payload.
#[derive(Debug)]
pub struct Fragment<'a, const CHUNK_SIZE: usize> {
    pub meta: FragmentMetadata,
    // Borrowed view into the dcache chunk; see `DcacheView::as_slice` for
    // the caveat about the producer reusing the chunk.
    pub payload: DcacheView<'a, CHUNK_SIZE>,
}
1016
/// Consuming handle over shared channel state.
///
/// Holds a cursor (`next_seq`) and therefore is not `Copy`; each consumer
/// tracks its own position independently.
pub struct Consumer<
    'a,
    const MCACHE_DEPTH: usize,
    const CHUNK_COUNT: usize,
    const CHUNK_SIZE: usize,
> {
    mcache: &'a MCache<MCACHE_DEPTH>,
    dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
    // Optional credit pool replenished as fragments are consumed.
    fctl: Option<&'a Fctl>,
    // Optional counters.
    metrics: Option<&'a Metrics>,
    // Next sequence number this consumer will try to read.
    next_seq: u64,
}
1029
1030impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> fmt::Debug
1031 for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1032{
1033 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1034 f.debug_struct("Consumer")
1035 .field("next_seq", &self.next_seq)
1036 .field("has_flow_control", &self.fctl.is_some())
1037 .field("has_metrics", &self.metrics.is_some())
1038 .finish()
1039 }
1040}
1041
1042impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize>
1043 Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1044{
1045 pub fn new(
1047 mcache: &'a MCache<MCACHE_DEPTH>,
1048 dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
1049 initial_seq: u64,
1050 ) -> Self {
1051 Self {
1052 mcache,
1053 dcache,
1054 fctl: None,
1055 metrics: None,
1056 next_seq: initial_seq,
1057 }
1058 }
1059
1060 pub fn with_flow_control(
1065 mcache: &'a MCache<MCACHE_DEPTH>,
1066 dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
1067 fctl: &'a Fctl,
1068 initial_seq: u64,
1069 ) -> Self {
1070 Self {
1071 mcache,
1072 dcache,
1073 fctl: Some(fctl),
1074 metrics: None,
1075 next_seq: initial_seq,
1076 }
1077 }
1078
1079 pub fn with_metrics(mut self, metrics: &'a Metrics) -> Self {
1083 self.metrics = Some(metrics);
1084 self
1085 }
1086
1087 #[must_use = "polling may return data or an error; check the result"]
1094 pub fn poll(&mut self) -> Result<Option<Fragment<'a, CHUNK_SIZE>>, TangoError> {
1095 let seq = self.next_seq;
1096 match self.mcache.try_read(seq) {
1097 ReadResult::Ok(meta) => {
1098 let payload = self.dcache.read_chunk(meta.chunk, meta.size as usize)?;
1099 self.next_seq = seq + 1;
1100
1101 if let Some(fctl) = self.fctl {
1103 fctl.release(1);
1104 }
1105
1106 if let Some(metrics) = self.metrics {
1107 metrics.record_consume();
1108 }
1109
1110 Ok(Some(Fragment { meta, payload }))
1111 }
1112 ReadResult::NotReady => Ok(None),
1113 ReadResult::Overrun => {
1114 if let Some(metrics) = self.metrics {
1115 metrics.record_overrun();
1116 }
1117 Err(TangoError::Overrun)
1118 }
1119 }
1120 }
1121
1122 #[must_use = "waiting may return data or an error; check the result"]
1129 pub fn wait(&mut self) -> Result<Option<Fragment<'a, CHUNK_SIZE>>, TangoError> {
1130 let seq = self.next_seq;
1131 match self.mcache.wait(seq) {
1132 ReadResult::Ok(meta) => {
1133 let payload = self.dcache.read_chunk(meta.chunk, meta.size as usize)?;
1134 self.next_seq = seq + 1;
1135
1136 if let Some(fctl) = self.fctl {
1138 fctl.release(1);
1139 }
1140
1141 if let Some(metrics) = self.metrics {
1142 metrics.record_consume();
1143 }
1144
1145 Ok(Some(Fragment { meta, payload }))
1146 }
1147 ReadResult::NotReady => Ok(None),
1148 ReadResult::Overrun => {
1149 if let Some(metrics) = self.metrics {
1150 metrics.record_overrun();
1151 }
1152 Err(TangoError::Overrun)
1153 }
1154 }
1155 }
1156
1157 pub fn next_seq(&self) -> u64 {
1159 self.next_seq
1160 }
1161
1162 pub fn release_credits(&self, count: u64) {
1167 if let Some(fctl) = self.fctl {
1168 fctl.release(count);
1169 }
1170 }
1171
1172 #[cfg(feature = "std")]
1182 pub fn poll_batch(
1183 &mut self,
1184 max_count: usize,
1185 ) -> Result<std::vec::Vec<Fragment<'a, CHUNK_SIZE>>, TangoError> {
1186 let mut fragments = std::vec::Vec::with_capacity(max_count);
1187 for _ in 0..max_count {
1188 match self.poll() {
1189 Ok(Some(fragment)) => fragments.push(fragment),
1190 Ok(None) => break,
1191 Err(e) => {
1192 if fragments.is_empty() {
1193 return Err(e);
1194 }
1195 break;
1196 }
1197 }
1198 }
1199 Ok(fragments)
1200 }
1201}
1202
1203impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> IntoIterator
1204 for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1205{
1206 type Item = Result<Fragment<'a, CHUNK_SIZE>, TangoError>;
1207 type IntoIter = ConsumerIter<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>;
1208
1209 fn into_iter(self) -> Self::IntoIter {
1227 ConsumerIter { consumer: self }
1228 }
1229}
1230
/// Blocking iterator over a channel, produced by `Consumer::into_iter`.
///
/// Each `next` call blocks in `Consumer::wait`; iteration ends (`None`)
/// when the mcache is stopped.
pub struct ConsumerIter<
    'a,
    const MCACHE_DEPTH: usize,
    const CHUNK_COUNT: usize,
    const CHUNK_SIZE: usize,
> {
    // The wrapped consumer; owns the read cursor.
    consumer: Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>,
}

impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> fmt::Debug
    for ConsumerIter<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ConsumerIter")
            .field("consumer", &self.consumer)
            .finish()
    }
}
1253
1254impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Iterator
1255 for ConsumerIter<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1256{
1257 type Item = Result<Fragment<'a, CHUNK_SIZE>, TangoError>;
1258
1259 fn next(&mut self) -> Option<Self::Item> {
1260 match self.consumer.wait() {
1261 Ok(Some(fragment)) => Some(Ok(fragment)),
1262 Ok(None) => None, Err(e) => Some(Err(e)),
1264 }
1265 }
1266}
1267
1268#[derive(Debug)]
1287pub struct ChannelBuilder<
1288 const MCACHE_DEPTH: usize,
1289 const CHUNK_COUNT: usize,
1290 const CHUNK_SIZE: usize,
1291> {
1292 initial_seq: u64,
1293 flow_control: bool,
1294 metrics: bool,
1295}
1296
1297impl<const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Default
1298 for ChannelBuilder<MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1299{
1300 fn default() -> Self {
1301 Self::new()
1302 }
1303}
1304
1305impl<const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize>
1306 ChannelBuilder<MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
1307{
1308 pub fn new() -> Self {
1310 Self {
1311 initial_seq: 1,
1312 flow_control: false,
1313 metrics: false,
1314 }
1315 }
1316
1317 pub fn initial_seq(mut self, seq: u64) -> Self {
1319 self.initial_seq = seq;
1320 self
1321 }
1322
1323 pub fn with_flow_control(mut self) -> Self {
1325 self.flow_control = true;
1326 self
1327 }
1328
1329 pub fn with_metrics(mut self) -> Self {
1331 self.metrics = true;
1332 self
1333 }
1334
1335 #[must_use = "this returns the channel components that should be used"]
1344 pub fn build(
1345 self,
1346 ) -> (
1347 MCache<MCACHE_DEPTH>,
1348 DCache<CHUNK_COUNT, CHUNK_SIZE>,
1349 Fseq,
1350 Option<Fctl>,
1351 Option<Metrics>,
1352 ) {
1353 let mcache = MCache::new();
1354 let dcache = DCache::new();
1355 let fseq = Fseq::new(self.initial_seq);
1356 let fctl = if self.flow_control {
1357 Some(Fctl::new(CHUNK_COUNT as u64))
1358 } else {
1359 None
1360 };
1361 let metrics = if self.metrics {
1362 Some(Metrics::new())
1363 } else {
1364 None
1365 };
1366 (mcache, dcache, fseq, fctl, metrics)
1367 }
1368}
1369
#[cfg(all(test, feature = "std", not(loom)))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    // Small power-of-two geometry shared by most tests.
    const MCACHE_DEPTH: usize = 8;
    const CHUNK_COUNT: usize = 8;
    const CHUNK_SIZE: usize = 64;

    // Single fragment round-trip through the channel.
    #[test]
    fn publish_and_consume() {
        let mcache = MCache::<MCACHE_DEPTH>::new();
        let dcache = DCache::<CHUNK_COUNT, CHUNK_SIZE>::new();
        let fseq = Fseq::new(1);
        let producer = Producer::new(&mcache, &dcache, &fseq);
        let mut consumer = Consumer::new(&mcache, &dcache, 1);

        let meta = producer.publish(b"hello", 42, 7, 1234).expect("publish");
        assert_eq!(meta.seq, 1);

        let fragment = consumer.poll().expect("poll").expect("fragment");
        assert_eq!(fragment.meta.sig, 42);
        assert_eq!(fragment.payload.read(), b"hello");
    }

    // Exhausting credits must fail fast; consuming must restore one credit.
    #[test]
    fn publish_and_consume_with_flow_control() {
        let mcache = MCache::<MCACHE_DEPTH>::new();
        let dcache = DCache::<CHUNK_COUNT, CHUNK_SIZE>::new();
        let fseq = Fseq::new(1);
        let fctl = Fctl::new(CHUNK_COUNT as u64);

        let producer = Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl);
        let mut consumer = Consumer::with_flow_control(&mcache, &dcache, &fctl, 1);

        for i in 0..CHUNK_COUNT {
            producer
                .publish(b"test", i as u64, 0, 0)
                .expect("publish should succeed");
        }

        assert!(matches!(
            producer.publish(b"fail", 0, 0, 0),
            Err(TangoError::NoCredits)
        ));

        let _ = consumer.poll().expect("poll").expect("fragment");

        producer
            .publish(b"success", 0, 0, 0)
            .expect("publish should succeed after credit release");
    }

    // Publishing two laps of a depth-4 mcache must trip the overrun check.
    #[test]
    fn detect_overrun() {
        let mcache = MCache::<4>::new();
        let dcache = DCache::<8, 64>::new();
        let fseq = Fseq::new(1);

        let producer = Producer::new(&mcache, &dcache, &fseq);
        let mut consumer = Consumer::new(&mcache, &dcache, 1);

        for i in 0..8u64 {
            producer.publish(b"msg", i, 0, 0).expect("publish");
        }

        assert!(matches!(consumer.poll(), Err(TangoError::Overrun)));
    }

    // An empty mcache reports NotReady for the first real sequence number.
    #[test]
    fn read_result_not_ready() {
        let mcache = MCache::<8>::new();

        assert!(matches!(mcache.try_read(1), ReadResult::NotReady));
    }

    // One producer thread, one consumer thread, three fragments.
    #[test]
    fn publish_and_consume_across_threads() {
        let mcache = MCache::<64>::new();
        let dcache = DCache::<64, 64>::new();
        let fseq = Fseq::new(1);
        let producer = Producer::new(&mcache, &dcache, &fseq);
        let consumer = Consumer::new(&mcache, &dcache, 1);
        let received = AtomicUsize::new(0);

        thread::scope(|scope| {
            scope.spawn(|| {
                let mut consumer = consumer;
                while received.load(Ordering::Acquire) < 3 {
                    match consumer.poll() {
                        Ok(Some(fragment)) => {
                            let payload = fragment.payload.read();
                            println!("received: {:?}", String::from_utf8_lossy(&payload));
                            assert!(payload.starts_with(b"msg-"));
                            received.fetch_add(1, Ordering::AcqRel);
                        }
                        Ok(None) => thread::yield_now(),
                        Err(e) => panic!("unexpected error: {}", e),
                    }
                }
            });

            scope.spawn(|| {
                for idx in 0..3u8 {
                    let payload = [b'm', b's', b'g', b'-', b'0' + idx];
                    producer
                        .publish(&payload, 0xAA, 0, idx as u32)
                        .expect("publish");
                }
            });
        });

        assert_eq!(received.load(Ordering::Acquire), 3);
    }

    // Blocking publishes plus a concurrent consumer must move 100 fragments
    // without losing a credit.
    #[test]
    fn flow_control_across_threads() {
        let mcache = MCache::<64>::new();
        let dcache = DCache::<64, 64>::new();
        let fseq = Fseq::new(1);
        let fctl = Fctl::new(64);

        let producer = Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl);
        let consumer = Consumer::with_flow_control(&mcache, &dcache, &fctl, 1);
        let received = AtomicUsize::new(0);

        thread::scope(|scope| {
            scope.spawn(|| {
                let mut consumer = consumer;
                while received.load(Ordering::Acquire) < 100 {
                    match consumer.poll() {
                        Ok(Some(_)) => {
                            received.fetch_add(1, Ordering::AcqRel);
                        }
                        Ok(None) => thread::yield_now(),
                        Err(e) => panic!("unexpected error: {}", e),
                    }
                }
            });

            scope.spawn(|| {
                for i in 0..100u32 {
                    producer
                        .publish_blocking(b"test", i as u64, 0, i)
                        .expect("publish");
                }
            });
        });

        assert_eq!(received.load(Ordering::Acquire), 100);
    }

    // Counters must reflect publishes, consumes, and the derived lag.
    #[test]
    fn metrics_tracking() {
        let mcache = MCache::<8>::new();
        let dcache = DCache::<16, 64>::new();
        let fseq = Fseq::new(1);
        let fctl = Fctl::new(8);
        let metrics = Metrics::new();

        let producer =
            Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl).with_metrics(&metrics);
        let mut consumer =
            Consumer::with_flow_control(&mcache, &dcache, &fctl, 1).with_metrics(&metrics);

        for i in 0..5 {
            producer.publish(b"test", i, 0, 0).expect("publish");
        }

        for _ in 0..3 {
            consumer.poll().expect("poll").expect("fragment");
        }

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.published, 5);
        assert_eq!(snapshot.consumed, 3);
        assert_eq!(snapshot.lag(), 2);
        assert_eq!(snapshot.overruns, 0);
        assert_eq!(snapshot.backpressure_events, 0);
    }

    // A rejected publish must register exactly one backpressure event.
    #[test]
    fn metrics_backpressure_tracking() {
        let mcache = MCache::<8>::new();
        let dcache = DCache::<8, 64>::new();
        let fseq = Fseq::new(1);
        let fctl = Fctl::new(2);
        let metrics = Metrics::new();

        let producer =
            Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl).with_metrics(&metrics);

        producer.publish(b"1", 1, 0, 0).expect("first");
        producer.publish(b"2", 2, 0, 0).expect("second");

        assert!(producer.publish(b"3", 3, 0, 0).is_err());

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.published, 2);
        assert_eq!(snapshot.backpressure_events, 1);
    }

    // Payload bytes must be readable in place, without copying.
    #[test]
    fn zero_copy_read() {
        let mcache = MCache::<8>::new();
        let dcache = DCache::<8, 64>::new();
        let fseq = Fseq::new(1);
        let producer = Producer::new(&mcache, &dcache, &fseq);
        let mut consumer = Consumer::new(&mcache, &dcache, 1);

        producer.publish(b"hello world", 42, 0, 0).expect("publish");

        let fragment = consumer.poll().expect("poll").expect("fragment");

        let slice = fragment.payload.as_slice();
        assert_eq!(slice, b"hello world");
        assert_eq!(fragment.payload.len(), 11);
        assert!(!fragment.payload.is_empty());
    }
}