// bcast 0.0.29
//
// Low-latency broadcast (SPMC) buffer designed to work with shared memory.
// Documentation
use anyhow::anyhow;
use bcast::{RingBuffer, error::Error};
use rand::{Rng, thread_rng};
use std::mem::MaybeUninit;

/// Generate a random message every 1 millisecond.
///
/// Each message consists of a single random uppercase ASCII letter (`b'A'..=b'Z'`) repeated
/// between 1 and 19 times. The same letter is also passed as the user defined value of the
/// claim, which lets a consumer validate the payload against it.
///
/// This function loops forever and never returns.
#[allow(dead_code)]
pub fn writer(bytes: &[u8]) {
    let writer = RingBuffer::new(bytes).into_writer();
    // `thread_rng()` only hands out a handle to the thread-local generator, so fetch it once.
    let mut rng = thread_rng();
    loop {
        let symbol = rng.gen_range(b'A'..=b'Z');
        let msg_len = rng.gen_range(1..20);
        let mut claim = writer.claim_with_user_defined(msg_len, true, u32::from(symbol));
        // Write the repeated symbol straight into the claimed buffer instead of building a
        // temporary `Vec` per message. The buffer is expected to be exactly `msg_len` bytes
        // long; the previous `copy_from_slice` call would have panicked otherwise.
        claim.get_buffer_mut().fill(symbol);
        claim.commit();
        std::thread::sleep(std::time::Duration::from_millis(1));
    }
}

/// Consume messages produced by the writer, one batch at a time.
///
/// Every message is copied into a local 1024 byte scratch buffer. In debug builds the payload
/// is additionally validated (every byte must equal the user defined field set by the writer),
/// printed, and the loop sleeps for 10 milliseconds between batches so that messages accumulate
/// and are processed in a batch.
///
/// # Errors
///
/// An [`Error::Overrun`] is not fatal: it is logged, the reader is reset and the rest of the
/// current batch is abandoned. Any other error yielded by the batch, or a failure to read a
/// message into the scratch buffer, is returned to the caller. The function never returns `Ok`.
#[allow(dead_code)]
pub fn reader(bytes: &[u8]) -> anyhow::Result<()> {
    let reader = RingBuffer::new(bytes).into_reader();
    // Scratch buffer reused for every message. A plain zeroed array is all that is needed here;
    // no `unsafe` / `MaybeUninit` is required. Only `payload[..msg.payload_len]` is ever
    // inspected, so stale bytes left over from a previous message are harmless.
    let mut payload = [0u8; 1024];
    loop {
        #[cfg(debug_assertions)]
        let mut count = 0;
        if let Some(batch) = reader.read_batch() {
            for msg in batch {
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(Error::Overrun(position)) => {
                        println!("overrun for {} bytes, resetting reader", position);
                        reader.reset();
                        // give up on the remainder of this batch and start over
                        break;
                    }
                    Err(e) => return Err(anyhow!(e)),
                };
                debug_assert!(!msg.is_padding, "padding frames should be skipped");
                msg.read(&mut payload)?;
                #[cfg(debug_assertions)]
                {
                    count += 1;
                    let payload = &payload[..msg.payload_len];
                    // the writer fills the payload with the byte it stored as user defined value
                    assert!(payload.iter().all(|b| *b == msg.user_defined as u8));
                    println!("{}", String::from_utf8_lossy(payload));
                }
            }
        }
        #[cfg(debug_assertions)]
        {
            if count > 0 {
                println!("batch_size: {}", count);
            }
            // adding delay here to simulate impact of batching
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
    }
}

/// Consume messages produced by the writer using the bulk API. It performs the same payload
/// validation as `reader`, but copies the full bulk window first and then iterates over it
/// off-ring.
#[allow(dead_code)]
pub fn bulk_reader(bytes: &[u8]) -> anyhow::Result<()> {
    let reader = RingBuffer::new(bytes).into_reader();
    let mut bulk_bytes = Vec::new();

    loop {
        #[cfg(debug_assertions)]
        let mut count = 0;

        if let Some(bulk) = reader.read_bulk() {
            let bulk = match bulk {
                Ok(bulk) => bulk,
                Err(Error::Overrun(position)) => {
                    println!("overrun for {} bytes, resetting reader", position);
                    reader.reset();
                    continue;
                }
                Err(e) => return Err(anyhow!(e)),
            };

            if bulk_bytes.len() < bulk.len() {
                bulk_bytes.resize(bulk.len(), 0);
            }

            let iter = match bulk.into_iter(&mut bulk_bytes) {
                Ok(iter) => iter,
                Err(Error::Overrun(position)) => {
                    println!("overrun for {} bytes, resetting reader", position);
                    reader.reset();
                    continue;
                }
                Err(e) => return Err(anyhow!(e)),
            };

            for msg in iter {
                #[cfg(debug_assertions)]
                {
                    count += 1;
                    assert!(msg.payload.iter().all(|b| *b == msg.user_defined as u8));
                    println!("{}", String::from_utf8_lossy(msg.payload));
                }
            }
        }

        #[cfg(debug_assertions)]
        {
            if count > 0 {
                println!("batch_size: {}", count);
            }
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
    }
}