#![cfg_attr(not(feature = "std"), no_std)]
#[cfg(feature = "std")]
extern crate std;
#[cfg(loom)]
use loom::cell::UnsafeCell;
#[cfg(loom)]
use loom::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
#[cfg(not(loom))]
use core::cell::UnsafeCell;
#[cfg(not(loom))]
use core::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
use core::fmt;
use core::mem::size_of;
/// Fixed-layout descriptor for one published fragment.
///
/// `repr(C, align(16))` and compile-time asserted to be exactly 32 bytes so
/// it can be copied through the mcache ring as a small, stable unit.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[repr(C, align(16))]
pub struct FragmentMetadata {
    /// Sequence number assigned by the producer's `Fseq`.
    pub seq: u64,
    /// Caller-supplied signature value (rendered as hex by `Display`;
    /// semantics are application-defined).
    pub sig: u64,
    /// Index of the dcache chunk holding the payload.
    pub chunk: u32,
    /// Payload size in bytes (clamped to the chunk size when written).
    pub size: u32,
    /// Caller-supplied control bits (application-defined).
    pub ctl: u16,
    /// Padding/reserved; `Producer::publish` always writes 0 here.
    pub reserved: u16,
    /// Caller-supplied timestamp value (units are application-defined).
    pub ts: u32,
}
// Layout guard: the descriptor must stay exactly 32 bytes.
const _: () = {
    assert!(size_of::<FragmentMetadata>() == 32);
};
impl fmt::Display for FragmentMetadata {
    /// Renders a compact one-line summary; `sig` is shown in hex,
    /// `reserved` is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { seq, sig, chunk, size, ctl, ts, .. } = self;
        write!(
            f,
            "Fragment {{ seq={seq}, sig={sig:#x}, chunk={chunk}, size={size}, ctl={ctl}, ts={ts} }}"
        )
    }
}
/// Errors surfaced by producers and consumers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use = "this error should be handled"]
pub enum TangoError {
    /// The dcache is out of capacity.
    /// NOTE(review): never constructed in this file — confirm external users.
    DcacheFull,
    /// The given chunk index was outside the dcache's range.
    ChunkOutOfRange(u32),
    /// The producer lapped the consumer; unread data was overwritten.
    Overrun,
    /// Flow control had no credit available.
    NoCredits,
}
impl fmt::Display for TangoError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TangoError::DcacheFull => write!(f, "dcache is out of capacity"),
TangoError::ChunkOutOfRange(idx) => write!(f, "chunk index {} out of range", idx),
TangoError::Overrun => write!(f, "consumer overrun: producer lapped the consumer"),
TangoError::NoCredits => write!(f, "no credits available for backpressure"),
}
}
}
// `std::error::Error` is only available with the `std` feature.
// NOTE(review): newer toolchains could use `core::error::Error` instead —
// confirm the crate's MSRV before switching.
#[cfg(feature = "std")]
impl std::error::Error for TangoError {}
/// Outcome of a non-destructive read of the mcache ring.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use = "this `ReadResult` may contain data that should be handled"]
pub enum ReadResult<T> {
    /// The requested fragment was read successfully.
    Ok(T),
    /// The requested sequence has not been published yet.
    NotReady,
    /// The slot has already moved past the requested sequence.
    Overrun,
}
impl<T> ReadResult<T> {
#[inline]
pub fn is_ok(&self) -> bool {
matches!(self, ReadResult::Ok(_))
}
#[inline]
pub fn is_not_ready(&self) -> bool {
matches!(self, ReadResult::NotReady)
}
#[inline]
pub fn is_overrun(&self) -> bool {
matches!(self, ReadResult::Overrun)
}
#[inline]
pub fn ok(self) -> Option<T> {
match self {
ReadResult::Ok(v) => Some(v),
_ => None,
}
}
}
impl<T: fmt::Debug> fmt::Display for ReadResult<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ReadResult::Ok(v) => write!(f, "Ok({:?})", v),
ReadResult::NotReady => write!(f, "NotReady"),
ReadResult::Overrun => write!(f, "Overrun"),
}
}
}
/// Shared, thread-safe event counters for one channel.
#[derive(Debug)]
pub struct Metrics {
    // Fragments successfully published.
    published: AtomicU64,
    // Fragments successfully consumed.
    consumed: AtomicU64,
    // Overruns detected by consumers.
    overruns: AtomicU64,
    // Publishes rejected because no credit was available.
    backpressure_events: AtomicU64,
}
/// Plain-value copy of [`Metrics`] taken at one point in time.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[must_use = "this snapshot contains metrics data that should be used"]
pub struct MetricsSnapshot {
    /// Fragments published so far.
    pub published: u64,
    /// Fragments consumed so far.
    pub consumed: u64,
    /// Overruns detected so far.
    pub overruns: u64,
    /// Credit-exhausted publish attempts so far.
    pub backpressure_events: u64,
}
impl MetricsSnapshot {
    /// Fragments published but not yet consumed, clamped at zero (the
    /// counters are sampled independently, so `consumed` may briefly read
    /// ahead of `published`).
    #[inline]
    pub fn lag(&self) -> u64 {
        self.published.checked_sub(self.consumed).unwrap_or(0)
    }
}
impl fmt::Display for MetricsSnapshot {
    /// One-line `key=value` summary including the derived lag.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let lag = self.lag();
        write!(
            f,
            "published={}, consumed={}, lag={lag}, overruns={}, backpressure={}",
            self.published, self.consumed, self.overruns, self.backpressure_events
        )
    }
}
impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}
impl Metrics {
    /// Creates a metrics block with all counters at zero.
    pub fn new() -> Self {
        Self {
            published: AtomicU64::new(0),
            consumed: AtomicU64::new(0),
            overruns: AtomicU64::new(0),
            backpressure_events: AtomicU64::new(0),
        }
    }
    /// Copies the counters into a plain snapshot. The four counters are
    /// loaded independently, so the snapshot is not an atomic cut across
    /// all of them.
    pub fn snapshot(&self) -> MetricsSnapshot {
        let published = self.published.load(Ordering::Acquire);
        let consumed = self.consumed.load(Ordering::Acquire);
        let overruns = self.overruns.load(Ordering::Acquire);
        let backpressure_events = self.backpressure_events.load(Ordering::Acquire);
        MetricsSnapshot {
            published,
            consumed,
            overruns,
            backpressure_events,
        }
    }
    /// Zeroes every counter.
    pub fn reset(&self) {
        self.published.store(0, Ordering::Release);
        self.consumed.store(0, Ordering::Release);
        self.overruns.store(0, Ordering::Release);
        self.backpressure_events.store(0, Ordering::Release);
    }
    /// Counts one successful publish (relaxed: pure counter).
    #[inline]
    pub fn record_publish(&self) {
        self.published.fetch_add(1, Ordering::Relaxed);
    }
    /// Counts one consumed fragment.
    #[inline]
    pub fn record_consume(&self) {
        self.consumed.fetch_add(1, Ordering::Relaxed);
    }
    /// Counts one detected overrun.
    #[inline]
    pub fn record_overrun(&self) {
        self.overruns.fetch_add(1, Ordering::Relaxed);
    }
    /// Counts one publish rejected for lack of credits.
    #[inline]
    pub fn record_backpressure(&self) {
        self.backpressure_events.fetch_add(1, Ordering::Relaxed);
    }
}
/// Command-and-control lifecycle state, stored as a `u8` inside [`Cnc`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum CncState {
    /// Initial state after construction.
    Boot = 0,
    /// Channel is live.
    Run = 1,
    /// Channel is shut down; also the decoding of any unknown byte.
    Halt = 2,
}
impl CncState {
    /// Decodes a raw state byte; any unknown value collapses to `Halt`.
    fn from_u8(value: u8) -> Self {
        if value == 0 {
            CncState::Boot
        } else if value == 1 {
            CncState::Run
        } else {
            CncState::Halt
        }
    }
}
impl fmt::Display for CncState {
    /// Writes the state name exactly as spelled in the enum.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            CncState::Boot => "Boot",
            CncState::Run => "Run",
            CncState::Halt => "Halt",
        })
    }
}
/// Shared command-and-control cell holding a [`CncState`] as a raw byte.
#[derive(Debug)]
pub struct Cnc {
    // Encoded `CncState`; written with Release, read with Acquire.
    state: AtomicU8,
}
impl Default for Cnc {
fn default() -> Self {
Self::new()
}
}
impl Cnc {
    /// Creates a command-and-control cell in the `Boot` state.
    pub fn new() -> Self {
        let state = AtomicU8::new(CncState::Boot as u8);
        Self { state }
    }
    /// Loads and decodes the current state (Acquire).
    pub fn state(&self) -> CncState {
        let raw = self.state.load(Ordering::Acquire);
        CncState::from_u8(raw)
    }
    /// Publishes a new state (Release).
    pub fn set_state(&self, state: CncState) {
        self.state.store(state as u8, Ordering::Release);
    }
}
/// Monotonic sequence-number allocator shared by producers.
#[derive(Debug)]
pub struct Fseq {
    // The next sequence number that will be handed out.
    next: AtomicU64,
}
impl Fseq {
    /// Creates an allocator whose first issued value is `initial`.
    pub fn new(initial: u64) -> Self {
        let next = AtomicU64::new(initial);
        Self { next }
    }
    /// Claims and returns the next sequence number (the pre-increment
    /// value of the counter).
    pub fn next(&self) -> u64 {
        self.next.fetch_add(1, Ordering::AcqRel)
    }
    /// Peeks at the value the next `next()` call would return, without
    /// claiming it.
    pub fn current(&self) -> u64 {
        self.next.load(Ordering::Acquire)
    }
}
/// Credit-based flow-control counter: producers take credits, consumers
/// return them.
#[derive(Debug)]
pub struct Fctl {
    // Credits currently available.
    credits: AtomicU64,
}
impl Fctl {
    /// Creates a counter seeded with `initial` credits.
    pub fn new(initial: u64) -> Self {
        Self {
            credits: AtomicU64::new(initial),
        }
    }
    /// Atomically takes `amount` credits.
    ///
    /// Returns `false` (leaving the counter untouched) when fewer than
    /// `amount` credits are available.
    #[must_use = "returns whether the credits were successfully acquired"]
    pub fn acquire(&self, amount: u64) -> bool {
        let mut current = self.credits.load(Ordering::Acquire);
        loop {
            // `checked_sub` is `None` exactly when `current < amount`.
            let Some(remaining) = current.checked_sub(amount) else {
                return false;
            };
            // Weak CAS is the idiomatic choice inside a retry loop: it may
            // fail spuriously on LL/SC architectures but is cheaper, and
            // the loop absorbs spurious failures.
            match self.credits.compare_exchange_weak(
                current,
                remaining,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return true,
                Err(observed) => current = observed,
            }
        }
    }
    /// Returns `amount` credits to the pool.
    pub fn release(&self, amount: u64) {
        self.credits.fetch_add(amount, Ordering::AcqRel);
    }
    /// Credits currently available (a racy snapshot).
    pub fn available(&self) -> u64 {
        self.credits.load(Ordering::Acquire)
    }
}
/// Compile-time-usable check that `value` is a power of two (0 is not).
const fn is_power_of_two(value: usize) -> bool {
    value.is_power_of_two()
}
/// Lossy duplicate-detection filter: a bitmap of `WORDS * 64` bits that
/// tags are hashed into.
#[derive(Debug)]
pub struct Tcache<const WORDS: usize> {
    // One bit per hash bucket.
    bits: [AtomicU64; WORDS],
    // Bit-index mask, `WORDS * 64 - 1` (bit count must be a power of two).
    mask: u64,
}
impl<const WORDS: usize> Default for Tcache<WORDS> {
fn default() -> Self {
Self::new()
}
}
impl<const WORDS: usize> Tcache<WORDS> {
    const BIT_COUNT: usize = WORDS * 64;
    const ASSERT_POWER_OF_TWO: () = assert!(is_power_of_two(Self::BIT_COUNT));
    /// Creates an empty filter spanning `WORDS * 64` bits.
    pub fn new() -> Self {
        let () = Self::ASSERT_POWER_OF_TWO;
        let mask = (Self::BIT_COUNT - 1) as u64;
        Self {
            bits: core::array::from_fn(|_| AtomicU64::new(0)),
            mask,
        }
    }
    /// Records `tag` and reports whether it was previously unseen.
    ///
    /// Each tag is hashed onto a single bit, so distinct tags can collide
    /// and be reported as duplicates: a `false` result is approximate,
    /// while `true` (first sighting of that bit) is exact.
    pub fn check_and_insert(&self, tag: u64) -> bool {
        // Fibonacci-style multiplicative hash, reduced onto the bitmap.
        let bit = tag.wrapping_mul(0x9E37_79B9_7F4A_7C15) & self.mask;
        let word_idx = (bit / 64) as usize;
        let bit_mask = 1u64 << (bit % 64);
        let previous = self.bits[word_idx].fetch_or(bit_mask, Ordering::AcqRel);
        previous & bit_mask == 0
    }
    /// Total number of set bits across the filter.
    pub fn len(&self) -> usize {
        self.bits.iter().fold(0usize, |acc, word| {
            acc + word.load(Ordering::Acquire).count_ones() as usize
        })
    }
    /// `true` when no tag has ever been recorded.
    pub fn is_empty(&self) -> bool {
        !self
            .bits
            .iter()
            .any(|word| word.load(Ordering::Acquire) != 0)
    }
}
// Cache-line size assumed for padding; 64 bytes on common targets.
const CACHE_LINE_SIZE: usize = 64;
/// One mcache ring slot: a version word padded out to its own cache line,
/// followed by the metadata payload on the next line.
#[repr(C, align(64))]
struct MCacheEntry {
    // Slot version: maintained by `MCache::publish`, checked before and
    // after the metadata copy in `MCache::try_read` (seqlock-style).
    seq: AtomicU64,
    // Pads `seq` to a full cache line so it doesn't share one with `meta`.
    _pad: [u8; CACHE_LINE_SIZE - size_of::<AtomicU64>()],
    // Published metadata; only valid to read under the `seq` protocol.
    meta: UnsafeCell<FragmentMetadata>,
}
impl fmt::Debug for MCacheEntry {
    /// Shows only the slot's version word; the metadata cell is skipped
    /// because reading it requires the seqlock protocol.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let seq = self.seq.load(Ordering::Relaxed);
        f.debug_struct("MCacheEntry")
            .field("seq", &seq)
            .finish_non_exhaustive()
    }
}
// SAFETY: the `UnsafeCell<FragmentMetadata>` is only written by
// `MCache::publish` and read by `MCache::try_read`, which coordinate
// through the atomic `seq` version word. NOTE(review): soundness rests
// entirely on that seqlock discipline — verify under loom/miri.
unsafe impl Sync for MCacheEntry {}
impl MCacheEntry {
    // Empty slot: version 0 and zeroed metadata.
    fn new() -> Self {
        Self {
            seq: AtomicU64::new(0),
            _pad: [0u8; CACHE_LINE_SIZE - size_of::<AtomicU64>()],
            meta: UnsafeCell::new(FragmentMetadata::default()),
        }
    }
}
/// Metadata ring ("mcache"): `DEPTH` seqlock-protected slots indexed by
/// `seq & mask`, plus a running flag consulted by blocking waiters.
#[derive(Debug)]
pub struct MCache<const DEPTH: usize> {
    // `DEPTH - 1`; DEPTH must be a power of two.
    mask: u64,
    // The ring slots.
    entries: [MCacheEntry; DEPTH],
    // Cleared by `stop()` to wake spinning waiters.
    running: AtomicBool,
}
impl<const DEPTH: usize> Default for MCache<DEPTH> {
fn default() -> Self {
Self::new()
}
}
impl<const DEPTH: usize> MCache<DEPTH> {
    const ASSERT_POWER_OF_TWO: () = assert!(is_power_of_two(DEPTH));
    /// Creates a metadata ring of `DEPTH` entries; `DEPTH` must be a power
    /// of two.
    pub fn new() -> Self {
        let () = Self::ASSERT_POWER_OF_TWO;
        Self {
            mask: (DEPTH - 1) as u64,
            entries: core::array::from_fn(|_| MCacheEntry::new()),
            running: AtomicBool::new(true),
        }
    }
    /// Signals consumers blocked in [`Self::wait`] to give up.
    pub fn stop(&self) {
        self.running.store(false, Ordering::Release);
    }
    /// Whether the channel has not been stopped.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::Acquire)
    }
    /// Publishes `meta` into its ring slot using a seqlock-style protocol.
    ///
    /// BUG FIX: the previous version wrote the metadata first and stored
    /// `seq` only afterwards. A reader that had already observed the old
    /// `seq` could then copy half-overwritten metadata while a lapping
    /// producer was mid-write, re-read the *still unchanged* `seq`, and
    /// accept the torn read. The slot's version is now first set to the
    /// in-progress marker `meta.seq - 1` before the metadata is touched,
    /// so any concurrent reader sees the version change and rejects the
    /// copy. `meta.seq - 1` can never be mistaken for a published seq of
    /// this slot because (for DEPTH >= 2) every seq published here
    /// satisfies `seq & mask == idx`, which `meta.seq - 1` does not.
    pub fn publish(&self, meta: FragmentMetadata) {
        let idx = (meta.seq & self.mask) as usize;
        let entry = &self.entries[idx];
        // AcqRel RMW: the acquire half keeps the metadata write below from
        // being reordered before the marker becomes visible.
        entry.seq.swap(meta.seq.wrapping_sub(1), Ordering::AcqRel);
        // NOTE(review): a non-atomic write racing a reader's non-atomic
        // read is still formally UB even though the version check rejects
        // torn data; a fully rigorous version would copy through atomics.
        // Verify under loom/miri.
        unsafe {
            *entry.meta.get() = meta;
        }
        entry.seq.store(meta.seq, Ordering::Release);
    }
    /// Spins until fragment `seq` is readable, an overrun is detected, or
    /// the channel is stopped (then `NotReady`).
    pub fn wait(&self, seq: u64) -> ReadResult<FragmentMetadata> {
        while self.is_running() {
            match self.try_read(seq) {
                ReadResult::Ok(meta) => return ReadResult::Ok(meta),
                ReadResult::Overrun => return ReadResult::Overrun,
                ReadResult::NotReady => core::hint::spin_loop(),
            }
        }
        ReadResult::NotReady
    }
    /// Attempts to read fragment `seq` without blocking.
    ///
    /// `NotReady` when the slot still holds an older value (including the
    /// writer's `seq - 1` in-progress marker for this very fragment);
    /// `Overrun` when the slot has moved past `seq`. The version word is
    /// re-checked after the copy so a concurrent overwrite surfaces as
    /// `Overrun` instead of torn metadata.
    pub fn try_read(&self, seq: u64) -> ReadResult<FragmentMetadata> {
        let idx = (seq & self.mask) as usize;
        let entry = &self.entries[idx];
        let seq_before = entry.seq.load(Ordering::Acquire);
        if seq_before < seq {
            return ReadResult::NotReady;
        }
        if seq_before > seq {
            return ReadResult::Overrun;
        }
        let meta = unsafe { *entry.meta.get() };
        let seq_after = entry.seq.load(Ordering::Acquire);
        if seq_before == seq_after {
            ReadResult::Ok(meta)
        } else {
            ReadResult::Overrun
        }
    }
}
/// One cache-line-aligned payload buffer in the data ring.
#[repr(C, align(64))]
struct DcacheChunk<const CHUNK_SIZE: usize> {
    // Raw payload bytes; interior mutability so producers can write
    // through a shared reference.
    data: UnsafeCell<[u8; CHUNK_SIZE]>,
}
impl<const CHUNK_SIZE: usize> fmt::Debug for DcacheChunk<CHUNK_SIZE> {
    /// Reports only the chunk size; the byte contents are deliberately
    /// not dumped.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("DcacheChunk");
        dbg.field("size", &CHUNK_SIZE);
        dbg.finish_non_exhaustive()
    }
}
impl<const CHUNK_SIZE: usize> DcacheChunk<CHUNK_SIZE> {
    // Zero-filled chunk.
    fn new() -> Self {
        Self {
            data: UnsafeCell::new([0u8; CHUNK_SIZE]),
        }
    }
}
// SAFETY: access to `data` is coordinated by the ring protocol — producers
// write a chunk only after claiming it via `DCache::allocate`, consumers
// read it via the sequence handed through the mcache. NOTE(review): this
// invariant is not enforced by the types; confirm all call sites uphold it.
unsafe impl<const CHUNK_SIZE: usize> Sync for DcacheChunk<CHUNK_SIZE> {}
/// Zero-copy read view over the first `size` bytes of one dcache chunk.
#[derive(Debug, Clone, Copy)]
pub struct DcacheView<'a, const CHUNK_SIZE: usize> {
    // The chunk being viewed.
    chunk: &'a DcacheChunk<CHUNK_SIZE>,
    // Number of valid bytes, already clamped to `CHUNK_SIZE`.
    size: usize,
}
impl<'a, const CHUNK_SIZE: usize> DcacheView<'a, CHUNK_SIZE> {
    /// Borrows the valid prefix of the chunk as a byte slice.
    ///
    /// NOTE(review): the slice aliases the shared ring storage, so a
    /// producer that laps the ring can overwrite it while the borrow is
    /// alive — confirm callers keep views short-lived or copy out.
    #[inline]
    pub fn as_slice(&self) -> &'a [u8] {
        // SAFETY: relies on the ring discipline described above; no `&mut`
        // is created through this path.
        let bytes: &'a [u8; CHUNK_SIZE] = unsafe { &*self.chunk.data.get() };
        &bytes[..self.size]
    }
    /// Copies the payload into an owned vector.
    #[cfg(feature = "std")]
    pub fn read(&self) -> std::vec::Vec<u8> {
        self.as_slice().to_vec()
    }
    /// Runs `f` over the payload without copying it.
    pub fn with_reader<T>(&self, f: impl FnOnce(&[u8]) -> T) -> T {
        f(self.as_slice())
    }
    /// Number of valid bytes in the view.
    #[inline]
    pub fn len(&self) -> usize {
        self.size
    }
    /// `true` when the view covers zero bytes.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// Payload ring ("dcache"): `CHUNK_COUNT` fixed-size chunks allocated
/// round-robin.
#[derive(Debug)]
pub struct DCache<const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> {
    // The chunk storage.
    chunks: [DcacheChunk<CHUNK_SIZE>; CHUNK_COUNT],
    // Monotonic allocation ticket; masked into a chunk index.
    next: AtomicU64,
    // `CHUNK_COUNT - 1`; CHUNK_COUNT must be a power of two.
    mask: u64,
}
// Removed: an empty `const _: () = {};` block — dead leftover from an
// earlier compile-time assertion with no effect.
impl<const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Default
for DCache<CHUNK_COUNT, CHUNK_SIZE>
{
fn default() -> Self {
Self::new()
}
}
impl<const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> DCache<CHUNK_COUNT, CHUNK_SIZE> {
    const ASSERT_POWER_OF_TWO: () = assert!(is_power_of_two(CHUNK_COUNT));
    /// Creates a zero-filled ring; `CHUNK_COUNT` must be a power of two.
    pub fn new() -> Self {
        let () = Self::ASSERT_POWER_OF_TWO;
        let mask = (CHUNK_COUNT - 1) as u64;
        Self {
            chunks: core::array::from_fn(|_| DcacheChunk::new()),
            next: AtomicU64::new(0),
            mask,
        }
    }
    /// Claims the next chunk slot, wrapping around the ring; always
    /// returns an in-range index.
    pub fn allocate(&self) -> u32 {
        let ticket = self.next.fetch_add(1, Ordering::AcqRel);
        (ticket & self.mask) as u32
    }
    /// Number of chunks in the ring.
    pub fn capacity(&self) -> usize {
        CHUNK_COUNT
    }
    /// Copies `payload` into chunk `chunk`, silently truncating to
    /// `CHUNK_SIZE` bytes; returns the number of bytes actually written.
    pub fn write_chunk(&self, chunk: u32, payload: &[u8]) -> Result<usize, TangoError> {
        let Some(target) = self.chunks.get(chunk as usize) else {
            return Err(TangoError::ChunkOutOfRange(chunk));
        };
        let len = CHUNK_SIZE.min(payload.len());
        // SAFETY: the chunk was claimed through `allocate`; NOTE(review):
        // single-writer-per-slot is not enforced here — confirm callers.
        unsafe {
            (*target.data.get())[..len].copy_from_slice(&payload[..len]);
        }
        Ok(len)
    }
    /// Builds a read view over the first `size` bytes (clamped to
    /// `CHUNK_SIZE`) of chunk `chunk`.
    pub fn read_chunk(
        &self,
        chunk: u32,
        size: usize,
    ) -> Result<DcacheView<'_, CHUNK_SIZE>, TangoError> {
        match self.chunks.get(chunk as usize) {
            Some(target) => Ok(DcacheView {
                chunk: target,
                size: CHUNK_SIZE.min(size),
            }),
            None => Err(TangoError::ChunkOutOfRange(chunk)),
        }
    }
    /// Size in bytes of each chunk.
    pub fn chunk_size(&self) -> usize {
        CHUNK_SIZE
    }
}
/// Publishing handle: writes payloads into the dcache and announces them
/// through the mcache.
#[derive(Clone, Copy)]
pub struct Producer<
    'a,
    const MCACHE_DEPTH: usize,
    const CHUNK_COUNT: usize,
    const CHUNK_SIZE: usize,
> {
    // Metadata ring announcements go here.
    mcache: &'a MCache<MCACHE_DEPTH>,
    // Payload bytes go here.
    dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
    // Source of sequence numbers.
    fseq: &'a Fseq,
    // Optional credit counter; one credit taken per publish.
    fctl: Option<&'a Fctl>,
    // Optional event counters.
    metrics: Option<&'a Metrics>,
}
impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> fmt::Debug
    for Producer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
{
    /// Omits the ring references; reports only which options are wired up.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("Producer");
        dbg.field("has_flow_control", &self.fctl.is_some());
        dbg.field("has_metrics", &self.metrics.is_some());
        dbg.finish()
    }
}
impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize>
    Producer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
{
    /// Creates a producer without flow control or metrics.
    pub fn new(
        mcache: &'a MCache<MCACHE_DEPTH>,
        dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
        fseq: &'a Fseq,
    ) -> Self {
        Self {
            mcache,
            dcache,
            fseq,
            fctl: None,
            metrics: None,
        }
    }
    /// Creates a producer that takes one credit from `fctl` per publish.
    pub fn with_flow_control(
        mcache: &'a MCache<MCACHE_DEPTH>,
        dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
        fseq: &'a Fseq,
        fctl: &'a Fctl,
    ) -> Self {
        Self {
            mcache,
            dcache,
            fseq,
            fctl: Some(fctl),
            metrics: None,
        }
    }
    /// Attaches a metrics recorder (builder style).
    pub fn with_metrics(mut self, metrics: &'a Metrics) -> Self {
        self.metrics = Some(metrics);
        self
    }
    /// Publishes one payload: takes a credit (if flow-controlled), claims
    /// a sequence number and a chunk, copies the payload (truncated to
    /// `CHUNK_SIZE`), then announces the metadata to consumers.
    ///
    /// # Errors
    /// `TangoError::NoCredits` when flow control has no credit available.
    #[must_use = "publishing may fail; check the result"]
    pub fn publish(
        &self,
        payload: &[u8],
        sig: u64,
        ctl: u16,
        ts: u32,
    ) -> Result<FragmentMetadata, TangoError> {
        if let Some(fctl) = self.fctl {
            if !fctl.acquire(1) {
                if let Some(metrics) = self.metrics {
                    metrics.record_backpressure();
                }
                return Err(TangoError::NoCredits);
            }
        }
        let seq = self.fseq.next();
        let chunk = self.dcache.allocate();
        // BUG FIX: the previous version propagated a `write_chunk` error
        // with `?` while keeping the credit acquired above, permanently
        // leaking it. `allocate` masks into range so this path should be
        // unreachable, but if it ever fires the credit is now returned.
        let size = match self.dcache.write_chunk(chunk, payload) {
            Ok(written) => written as u32,
            Err(e) => {
                if let Some(fctl) = self.fctl {
                    fctl.release(1);
                }
                return Err(e);
            }
        };
        let meta = FragmentMetadata {
            seq,
            sig,
            chunk,
            size,
            ctl,
            reserved: 0,
            ts,
        };
        self.mcache.publish(meta);
        if let Some(metrics) = self.metrics {
            metrics.record_publish();
        }
        Ok(meta)
    }
    /// Like [`Self::publish`], but spins while credits are exhausted.
    /// Gives up with `NoCredits` once the mcache has been stopped.
    #[must_use = "publishing may fail; check the result"]
    pub fn publish_blocking(
        &self,
        payload: &[u8],
        sig: u64,
        ctl: u16,
        ts: u32,
    ) -> Result<FragmentMetadata, TangoError> {
        loop {
            match self.publish(payload, sig, ctl, ts) {
                Ok(meta) => return Ok(meta),
                Err(TangoError::NoCredits) => {
                    if !self.mcache.is_running() {
                        return Err(TangoError::NoCredits);
                    }
                    core::hint::spin_loop();
                }
                Err(e) => return Err(e),
            }
        }
    }
    /// Publishes payloads in order until one fails; returns how many
    /// succeeded. Each payload's index is used as its timestamp.
    pub fn publish_batch(&self, payloads: &[&[u8]], sig: u64, ctl: u16) -> usize {
        let mut published = 0;
        for (i, payload) in payloads.iter().enumerate() {
            match self.publish(payload, sig, ctl, i as u32) {
                Ok(_) => published += 1,
                Err(_) => break,
            }
        }
        published
    }
}
/// One consumed fragment: its descriptor plus a zero-copy payload view.
#[derive(Debug)]
pub struct Fragment<'a, const CHUNK_SIZE: usize> {
    /// The fragment descriptor read from the mcache.
    pub meta: FragmentMetadata,
    /// View over the payload bytes in the dcache.
    pub payload: DcacheView<'a, CHUNK_SIZE>,
}
/// Consuming handle: follows the mcache sequence stream and resolves
/// payloads out of the dcache.
pub struct Consumer<
    'a,
    const MCACHE_DEPTH: usize,
    const CHUNK_COUNT: usize,
    const CHUNK_SIZE: usize,
> {
    // Metadata ring being followed.
    mcache: &'a MCache<MCACHE_DEPTH>,
    // Payload storage.
    dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
    // Optional credit counter; one credit returned per consumed fragment.
    fctl: Option<&'a Fctl>,
    // Optional event counters.
    metrics: Option<&'a Metrics>,
    // Next sequence number to read (cursor).
    next_seq: u64,
}
impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> fmt::Debug
    for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
{
    /// Shows the read cursor and which options are wired up; ring
    /// references are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("Consumer");
        dbg.field("next_seq", &self.next_seq);
        dbg.field("has_flow_control", &self.fctl.is_some());
        dbg.field("has_metrics", &self.metrics.is_some());
        dbg.finish()
    }
}
impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize>
Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
{
pub fn new(
mcache: &'a MCache<MCACHE_DEPTH>,
dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
initial_seq: u64,
) -> Self {
Self {
mcache,
dcache,
fctl: None,
metrics: None,
next_seq: initial_seq,
}
}
pub fn with_flow_control(
mcache: &'a MCache<MCACHE_DEPTH>,
dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>,
fctl: &'a Fctl,
initial_seq: u64,
) -> Self {
Self {
mcache,
dcache,
fctl: Some(fctl),
metrics: None,
next_seq: initial_seq,
}
}
pub fn with_metrics(mut self, metrics: &'a Metrics) -> Self {
self.metrics = Some(metrics);
self
}
#[must_use = "polling may return data or an error; check the result"]
pub fn poll(&mut self) -> Result<Option<Fragment<'a, CHUNK_SIZE>>, TangoError> {
let seq = self.next_seq;
match self.mcache.try_read(seq) {
ReadResult::Ok(meta) => {
let payload = self.dcache.read_chunk(meta.chunk, meta.size as usize)?;
self.next_seq = seq + 1;
if let Some(fctl) = self.fctl {
fctl.release(1);
}
if let Some(metrics) = self.metrics {
metrics.record_consume();
}
Ok(Some(Fragment { meta, payload }))
}
ReadResult::NotReady => Ok(None),
ReadResult::Overrun => {
if let Some(metrics) = self.metrics {
metrics.record_overrun();
}
Err(TangoError::Overrun)
}
}
}
#[must_use = "waiting may return data or an error; check the result"]
pub fn wait(&mut self) -> Result<Option<Fragment<'a, CHUNK_SIZE>>, TangoError> {
let seq = self.next_seq;
match self.mcache.wait(seq) {
ReadResult::Ok(meta) => {
let payload = self.dcache.read_chunk(meta.chunk, meta.size as usize)?;
self.next_seq = seq + 1;
if let Some(fctl) = self.fctl {
fctl.release(1);
}
if let Some(metrics) = self.metrics {
metrics.record_consume();
}
Ok(Some(Fragment { meta, payload }))
}
ReadResult::NotReady => Ok(None),
ReadResult::Overrun => {
if let Some(metrics) = self.metrics {
metrics.record_overrun();
}
Err(TangoError::Overrun)
}
}
}
pub fn next_seq(&self) -> u64 {
self.next_seq
}
pub fn release_credits(&self, count: u64) {
if let Some(fctl) = self.fctl {
fctl.release(count);
}
}
#[cfg(feature = "std")]
pub fn poll_batch(
&mut self,
max_count: usize,
) -> Result<std::vec::Vec<Fragment<'a, CHUNK_SIZE>>, TangoError> {
let mut fragments = std::vec::Vec::with_capacity(max_count);
for _ in 0..max_count {
match self.poll() {
Ok(Some(fragment)) => fragments.push(fragment),
Ok(None) => break,
Err(e) => {
if fragments.is_empty() {
return Err(e);
}
break;
}
}
}
Ok(fragments)
}
}
impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> IntoIterator
    for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
{
    type Item = Result<Fragment<'a, CHUNK_SIZE>, TangoError>;
    type IntoIter = ConsumerIter<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>;
    /// Wraps the consumer in a blocking fragment iterator.
    fn into_iter(self) -> Self::IntoIter {
        let consumer = self;
        ConsumerIter { consumer }
    }
}
/// Blocking iterator adapter over a [`Consumer`]; see its `Iterator` impl.
pub struct ConsumerIter<
    'a,
    const MCACHE_DEPTH: usize,
    const CHUNK_COUNT: usize,
    const CHUNK_SIZE: usize,
> {
    // The wrapped consumer; its cursor advances as the iterator yields.
    consumer: Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>,
}
impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> fmt::Debug
    for ConsumerIter<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
{
    /// Delegates to the wrapped consumer's `Debug` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ConsumerIter")
            .field("consumer", &self.consumer)
            .finish()
    }
}
impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Iterator
    for ConsumerIter<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
{
    type Item = Result<Fragment<'a, CHUNK_SIZE>, TangoError>;
    /// Blocks for the next fragment; the stream ends (`None`) when `wait`
    /// reports the mcache was stopped. Overruns are yielded as `Err`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.consumer.wait() {
            Ok(maybe_fragment) => maybe_fragment.map(Ok),
            Err(e) => Some(Err(e)),
        }
    }
}
/// Builder that assembles all channel components with consistent options.
#[derive(Debug)]
pub struct ChannelBuilder<
    const MCACHE_DEPTH: usize,
    const CHUNK_COUNT: usize,
    const CHUNK_SIZE: usize,
> {
    // First sequence number producers will claim (default 1).
    initial_seq: u64,
    // Whether to create an `Fctl` seeded with `CHUNK_COUNT` credits.
    flow_control: bool,
    // Whether to create a `Metrics` block.
    metrics: bool,
}
impl<const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Default
for ChannelBuilder<MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
{
fn default() -> Self {
Self::new()
}
}
impl<const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize>
    ChannelBuilder<MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>
{
    /// Starts a builder with defaults: first sequence 1, no flow control,
    /// no metrics.
    pub fn new() -> Self {
        Self {
            initial_seq: 1,
            flow_control: false,
            metrics: false,
        }
    }
    /// Overrides the first sequence number the channel will issue.
    pub fn initial_seq(mut self, seq: u64) -> Self {
        self.initial_seq = seq;
        self
    }
    /// Enables credit-based flow control, seeded with `CHUNK_COUNT`
    /// credits.
    pub fn with_flow_control(mut self) -> Self {
        self.flow_control = true;
        self
    }
    /// Enables metrics collection.
    pub fn with_metrics(mut self) -> Self {
        self.metrics = true;
        self
    }
    /// Materializes the channel parts; the optional parts are `None`
    /// unless enabled on the builder.
    #[must_use = "this returns the channel components that should be used"]
    pub fn build(
        self,
    ) -> (
        MCache<MCACHE_DEPTH>,
        DCache<CHUNK_COUNT, CHUNK_SIZE>,
        Fseq,
        Option<Fctl>,
        Option<Metrics>,
    ) {
        let fctl = self.flow_control.then(|| Fctl::new(CHUNK_COUNT as u64));
        let metrics = self.metrics.then(Metrics::new);
        (
            MCache::new(),
            DCache::new(),
            Fseq::new(self.initial_seq),
            fctl,
            metrics,
        )
    }
}
// Integration-style unit tests; multi-thread cases use `thread::scope` so
// the ring structures can be borrowed from the stack. Disabled under loom
// (loom runs use its own model-checked harness).
#[cfg(all(test, feature = "std", not(loom)))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;
    // Small geometries keep tests fast while still exercising wrap-around.
    const MCACHE_DEPTH: usize = 8;
    const CHUNK_COUNT: usize = 8;
    const CHUNK_SIZE: usize = 64;
    // Single-threaded happy path: one publish, one poll.
    #[test]
    fn publish_and_consume() {
        let mcache = MCache::<MCACHE_DEPTH>::new();
        let dcache = DCache::<CHUNK_COUNT, CHUNK_SIZE>::new();
        let fseq = Fseq::new(1);
        let producer = Producer::new(&mcache, &dcache, &fseq);
        let mut consumer = Consumer::new(&mcache, &dcache, 1);
        let meta = producer.publish(b"hello", 42, 7, 1234).expect("publish");
        assert_eq!(meta.seq, 1);
        let fragment = consumer.poll().expect("poll").expect("fragment");
        assert_eq!(fragment.meta.sig, 42);
        assert_eq!(fragment.payload.read(), b"hello");
    }
    // Credits run out after CHUNK_COUNT publishes; consuming one frees one.
    #[test]
    fn publish_and_consume_with_flow_control() {
        let mcache = MCache::<MCACHE_DEPTH>::new();
        let dcache = DCache::<CHUNK_COUNT, CHUNK_SIZE>::new();
        let fseq = Fseq::new(1);
        let fctl = Fctl::new(CHUNK_COUNT as u64);
        let producer = Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl);
        let mut consumer = Consumer::with_flow_control(&mcache, &dcache, &fctl, 1);
        for i in 0..CHUNK_COUNT {
            producer
                .publish(b"test", i as u64, 0, 0)
                .expect("publish should succeed");
        }
        assert!(matches!(
            producer.publish(b"fail", 0, 0, 0),
            Err(TangoError::NoCredits)
        ));
        let _ = consumer.poll().expect("poll").expect("fragment");
        producer
            .publish(b"success", 0, 0, 0)
            .expect("publish should succeed after credit release");
    }
    // Without flow control, publishing 8 into a depth-4 mcache laps the
    // consumer, which must observe Overrun rather than stale data.
    #[test]
    fn detect_overrun() {
        let mcache = MCache::<4>::new();
        let dcache = DCache::<8, 64>::new();
        let fseq = Fseq::new(1);
        let producer = Producer::new(&mcache, &dcache, &fseq);
        let mut consumer = Consumer::new(&mcache, &dcache, 1);
        for i in 0..8u64 {
            producer.publish(b"msg", i, 0, 0).expect("publish");
        }
        assert!(matches!(consumer.poll(), Err(TangoError::Overrun)));
    }
    // Reading ahead of any publish reports NotReady.
    #[test]
    fn read_result_not_ready() {
        let mcache = MCache::<8>::new();
        assert!(matches!(mcache.try_read(1), ReadResult::NotReady));
    }
    // One producer thread, one consumer thread, three messages.
    #[test]
    fn publish_and_consume_across_threads() {
        let mcache = MCache::<64>::new();
        let dcache = DCache::<64, 64>::new();
        let fseq = Fseq::new(1);
        let producer = Producer::new(&mcache, &dcache, &fseq);
        let consumer = Consumer::new(&mcache, &dcache, 1);
        let received = AtomicUsize::new(0);
        thread::scope(|scope| {
            scope.spawn(|| {
                let mut consumer = consumer;
                while received.load(Ordering::Acquire) < 3 {
                    match consumer.poll() {
                        Ok(Some(fragment)) => {
                            let payload = fragment.payload.read();
                            println!("received: {:?}", String::from_utf8_lossy(&payload));
                            assert!(payload.starts_with(b"msg-"));
                            received.fetch_add(1, Ordering::AcqRel);
                        }
                        Ok(None) => thread::yield_now(),
                        Err(e) => panic!("unexpected error: {}", e),
                    }
                }
            });
            scope.spawn(|| {
                for idx in 0..3u8 {
                    // Builds "msg-0", "msg-1", "msg-2".
                    let payload = [b'm', b's', b'g', b'-', b'0' + idx];
                    producer
                        .publish(&payload, 0xAA, 0, idx as u32)
                        .expect("publish");
                }
            });
        });
        assert_eq!(received.load(Ordering::Acquire), 3);
    }
    // Blocking producer + polling consumer under flow control; 100 messages
    // must all arrive.
    #[test]
    fn flow_control_across_threads() {
        let mcache = MCache::<64>::new();
        let dcache = DCache::<64, 64>::new();
        let fseq = Fseq::new(1);
        let fctl = Fctl::new(64);
        let producer = Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl);
        let consumer = Consumer::with_flow_control(&mcache, &dcache, &fctl, 1);
        let received = AtomicUsize::new(0);
        thread::scope(|scope| {
            scope.spawn(|| {
                let mut consumer = consumer;
                while received.load(Ordering::Acquire) < 100 {
                    match consumer.poll() {
                        Ok(Some(_)) => {
                            received.fetch_add(1, Ordering::AcqRel);
                        }
                        Ok(None) => thread::yield_now(),
                        Err(e) => panic!("unexpected error: {}", e),
                    }
                }
            });
            scope.spawn(|| {
                for i in 0..100u32 {
                    producer
                        .publish_blocking(b"test", i as u64, 0, i)
                        .expect("publish");
                }
            });
        });
        assert_eq!(received.load(Ordering::Acquire), 100);
    }
    // Publish/consume counters and derived lag.
    #[test]
    fn metrics_tracking() {
        let mcache = MCache::<8>::new();
        let dcache = DCache::<16, 64>::new();
        let fseq = Fseq::new(1);
        let fctl = Fctl::new(8);
        let metrics = Metrics::new();
        let producer =
            Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl).with_metrics(&metrics);
        let mut consumer =
            Consumer::with_flow_control(&mcache, &dcache, &fctl, 1).with_metrics(&metrics);
        for i in 0..5 {
            producer.publish(b"test", i, 0, 0).expect("publish");
        }
        for _ in 0..3 {
            consumer.poll().expect("poll").expect("fragment");
        }
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.published, 5);
        assert_eq!(snapshot.consumed, 3);
        assert_eq!(snapshot.lag(), 2);
        assert_eq!(snapshot.overruns, 0);
        assert_eq!(snapshot.backpressure_events, 0);
    }
    // A credit-exhausted publish is counted as a backpressure event.
    #[test]
    fn metrics_backpressure_tracking() {
        let mcache = MCache::<8>::new();
        let dcache = DCache::<8, 64>::new();
        let fseq = Fseq::new(1);
        let fctl = Fctl::new(2); let metrics = Metrics::new();
        let producer =
            Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl).with_metrics(&metrics);
        producer.publish(b"1", 1, 0, 0).expect("first");
        producer.publish(b"2", 2, 0, 0).expect("second");
        assert!(producer.publish(b"3", 3, 0, 0).is_err());
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.published, 2);
        assert_eq!(snapshot.backpressure_events, 1);
    }
    // `as_slice` exposes the payload without copying.
    #[test]
    fn zero_copy_read() {
        let mcache = MCache::<8>::new();
        let dcache = DCache::<8, 64>::new();
        let fseq = Fseq::new(1);
        let producer = Producer::new(&mcache, &dcache, &fseq);
        let mut consumer = Consumer::new(&mcache, &dcache, 1);
        producer.publish(b"hello world", 42, 0, 0).expect("publish");
        let fragment = consumer.poll().expect("poll").expect("fragment");
        let slice = fragment.payload.as_slice();
        assert_eq!(slice, b"hello world");
        assert_eq!(fragment.payload.len(), 11);
        assert!(!fragment.payload.is_empty());
    }
}