use anyhow::anyhow;
use bcast::{RingBuffer, error::Error};
use rand::{Rng, thread_rng};
use std::mem::MaybeUninit;
/// Publishes an endless stream of random test messages into the ring buffer
/// backed by `bytes`.
///
/// Each message is 1 to 19 bytes long and consists of a single randomly chosen
/// uppercase ASCII letter repeated for the whole payload. The same letter is
/// stored (widened to `u32`) in the message's user-defined field, which lets a
/// consumer verify the payload against that field.
///
/// This function never returns: it commits one message, sleeps for one
/// millisecond, and repeats.
#[allow(dead_code)]
pub fn writer(bytes: &[u8]) {
    let writer = RingBuffer::new(bytes).into_writer();
    // Obtain the thread-local generator handle once instead of twice per message.
    let mut rng = thread_rng();
    loop {
        let symbol = rng.gen_range(b'A'..=b'Z');
        let msg_len = rng.gen_range(1..20);
        // NOTE(review): the meaning of the `true` flag is defined by `bcast`
        // and is not visible here - confirm against the crate documentation.
        let mut claim = writer.claim_with_user_defined(msg_len, true, u32::from(symbol));
        // Write the repeated symbol straight into the claimed region; no
        // intermediate heap buffer is needed for a single repeated byte.
        claim.get_buffer_mut().fill(symbol);
        claim.commit();
        std::thread::sleep(std::time::Duration::from_millis(1));
    }
}
#[allow(dead_code)]
pub fn reader(bytes: &[u8]) -> anyhow::Result<()> {
let reader = RingBuffer::new(bytes).into_reader();
loop {
#[cfg(debug_assertions)]
let mut count = 0;
if let Some(batch) = reader.read_batch() {
for msg in batch {
let msg = match msg {
Ok(msg) => msg,
Err(Error::Overrun(position)) => {
println!("overrun for {} bytes, resetting reader", position);
reader.reset();
break;
}
Err(e) => {
return Err(anyhow!(e));
}
};
debug_assert!(!msg.is_padding, "padding frames should be skipped");
let mut payload = unsafe { MaybeUninit::new([0u8; 1024]).assume_init() };
msg.read(&mut payload)?;
#[cfg(debug_assertions)]
{
count += 1;
let payload = &payload[..msg.payload_len];
assert!(payload.iter().all(|b| *b == msg.user_defined as u8));
println!("{}", String::from_utf8_lossy(payload));
}
}
}
#[cfg(debug_assertions)]
{
if count > 0 {
println!("batch_size: {}", count);
}
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}
#[allow(dead_code)]
pub fn bulk_reader(bytes: &[u8]) -> anyhow::Result<()> {
let reader = RingBuffer::new(bytes).into_reader();
let mut bulk_bytes = Vec::new();
loop {
#[cfg(debug_assertions)]
let mut count = 0;
if let Some(bulk) = reader.read_bulk() {
let bulk = match bulk {
Ok(bulk) => bulk,
Err(Error::Overrun(position)) => {
println!("overrun for {} bytes, resetting reader", position);
reader.reset();
continue;
}
Err(e) => return Err(anyhow!(e)),
};
if bulk_bytes.len() < bulk.len() {
bulk_bytes.resize(bulk.len(), 0);
}
let iter = match bulk.into_iter(&mut bulk_bytes) {
Ok(iter) => iter,
Err(Error::Overrun(position)) => {
println!("overrun for {} bytes, resetting reader", position);
reader.reset();
continue;
}
Err(e) => return Err(anyhow!(e)),
};
for msg in iter {
#[cfg(debug_assertions)]
{
count += 1;
assert!(msg.payload.iter().all(|b| *b == msg.user_defined as u8));
println!("{}", String::from_utf8_lossy(msg.payload));
}
}
}
#[cfg(debug_assertions)]
{
if count > 0 {
println!("batch_size: {}", count);
}
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
}