use core::convert::{Infallible, TryFrom, TryInto};
use core::ptr::read_volatile;
use core::ptr::write_volatile;
use core::sync::atomic;
use core::sync::atomic::Ordering;
/// A byte ring buffer laid out over a caller-provided raw memory region
/// (e.g. memory shared between two endpoints).
///
/// Region layout: two `u32` cursors (write position first, read position
/// second) followed by the data area. One byte of the data area is always
/// left unused so that a full queue (`space() == 0`) can be distinguished
/// from an empty one (`size() == 0`).
#[derive(Debug)]
pub struct ByteQueue {
    // Byte offset (modulo `capacity`) where the next write lands.
    write_pos_ptr: *mut u32,
    // Byte offset where the next read starts.
    read_pos_ptr: *mut u32,
    // Start of the data area, immediately after the two cursors.
    data_ptr: *mut u8,
    // Length in bytes of the data area (header excluded).
    capacity: usize,
}
// SAFETY: the raw pointers refer to a region the constructors require to
// stay valid for the handle's lifetime, so moving the handle to another
// thread is sound. NOTE(review): the design (reserved byte, separate
// writer/reader cursors) suggests one producer and one consumer at a time
// is assumed — confirm with callers before sharing more widely.
unsafe impl Send for ByteQueue {}
impl ByteQueue {
    /// Creates a queue over `mem`, resetting both shared cursors to zero.
    ///
    /// Exactly one endpoint should call `create`; the peer endpoint joins
    /// the already-initialised region via [`Self::attach`].
    ///
    /// # Safety
    ///
    /// `mem` must be valid for reads and writes of `mem_len` bytes for as
    /// long as any handle over this region lives, must be suitably aligned
    /// for `u32` (the header holds two `u32` cursors), and must not be
    /// accessed except through `ByteQueue` handles.
    pub unsafe fn create(mem: *mut u8, mem_len: usize) -> Self {
        let mut slf = Self::attach(mem, mem_len);
        slf.set_write_pos(0);
        slf.set_read_pos(0);
        slf
    }

    /// Attaches to a region whose header a peer has already initialised;
    /// the shared cursors are left untouched.
    ///
    /// # Safety
    ///
    /// Same requirements as [`Self::create`]; additionally the header must
    /// already contain valid cursor values.
    pub unsafe fn attach(mem: *mut u8, mem_len: usize) -> Self {
        // The data area starts right after the two u32 cursors.
        let header_len = 2 * core::mem::size_of::<u32>();
        // A region that cannot hold the header plus at least one data byte
        // would make `capacity` zero or underflow, after which every
        // `% capacity` below would panic or index out of bounds.
        assert!(mem_len > header_len, "memory region too small for ByteQueue");
        ByteQueue {
            write_pos_ptr: mem as *mut u32,
            read_pos_ptr: (mem as *mut u32).add(1),
            data_ptr: mem.add(header_len),
            capacity: mem_len - header_len,
        }
    }

    /// Reads the shared write cursor with a volatile load so updates made
    /// by the peer are observed.
    fn get_write_pos(&self) -> usize {
        // SAFETY: `write_pos_ptr` points at the first header word of the
        // region the constructor contract guarantees valid.
        unsafe { read_volatile(self.write_pos_ptr) as usize }
    }

    /// Reads the shared read cursor (volatile, see `get_write_pos`).
    fn get_read_pos(&self) -> usize {
        // SAFETY: `read_pos_ptr` points at the second header word.
        unsafe { read_volatile(self.read_pos_ptr) as usize }
    }

    /// Publishes a new write cursor with a volatile store.
    fn set_write_pos(&mut self, wpos: usize) {
        // SAFETY: see `get_write_pos`.
        unsafe {
            write_volatile(
                self.write_pos_ptr,
                wpos.try_into().expect("cannot convert usize into u32"),
            )
        }
    }

    /// Publishes a new read cursor with a volatile store.
    fn set_read_pos(&mut self, rpos: usize) {
        // SAFETY: see `get_read_pos`.
        unsafe {
            write_volatile(
                self.read_pos_ptr,
                rpos.try_into().expect("cannot convert usize into u32"),
            )
        }
    }

    /// Number of bytes that can currently be written without blocking.
    ///
    /// One slot is always kept free (hence the `- 1`) so a full queue
    /// (`space() == 0`) is distinguishable from an empty one
    /// (`size() == 0`).
    pub fn space(&self) -> usize {
        (self.capacity + self.get_read_pos() - self.get_write_pos() - 1) % self.capacity
    }

    /// Maximum number of bytes the queue can ever hold (one byte of the
    /// data area is reserved, see [`Self::space`]).
    pub fn capacity(&self) -> usize {
        self.capacity - 1
    }

    /// Number of bytes currently buffered and available for reading.
    pub fn size(&self) -> usize {
        (self.capacity + self.get_write_pos() - self.get_read_pos()) % self.capacity
    }

    /// Writes as much of `data` as currently fits; returns the number of
    /// bytes actually written (possibly zero).
    pub fn write_at_most(&mut self, data: &[u8]) -> usize {
        let len = data.len().min(self.space());
        // Make sure the peer's cursor update is visible before reusing the
        // slots it freed.
        atomic::fence(Ordering::Acquire);
        let wpos = self.get_write_pos();
        for (i, byte) in data.iter().enumerate().take(len) {
            let offset = (wpos + i) % self.capacity;
            // SAFETY: `offset < capacity`, so the store stays inside the
            // data area guaranteed valid by the constructor contract.
            unsafe {
                let dptr = self.data_ptr.add(offset);
                write_volatile(dptr, *byte);
            }
        }
        // Order all payload stores before publishing the new cursor.
        atomic::fence(Ordering::Release);
        let wpos = (wpos + len) % self.capacity;
        self.set_write_pos(wpos);
        len
    }

    /// Writes all of `data`, or fails with `WouldBlock` without writing
    /// anything if the queue lacks the room.
    pub fn write_or_fail(&mut self, data: &[u8]) -> nb::Result<(), Infallible> {
        if self.space() < data.len() {
            return Err(nb::Error::WouldBlock);
        }
        self.write_at_most(data);
        Ok(())
    }

    /// Writes all of `data`, spinning until the peer frees enough space.
    pub fn write_blocking(&mut self, data: &[u8]) {
        while self.space() < data.len() {
            // Busy-wait hint: lowers power and helps SMT siblings while we
            // poll the shared cursor; behaviour is otherwise unchanged.
            core::hint::spin_loop();
        }
        self.write_at_most(data);
    }

    /// Discards up to `len` readable bytes; returns how many were skipped.
    pub fn skip_at_most(&mut self, len: usize) -> usize {
        let len = len.min(self.size());
        self.set_read_pos((self.get_read_pos() + len) % self.capacity);
        len
    }

    /// Discards exactly `len` bytes, or fails with `WouldBlock` without
    /// discarding anything if fewer are buffered.
    pub fn skip_or_fail(&mut self, len: usize) -> nb::Result<(), Infallible> {
        if self.size() < len {
            return Err(nb::Error::WouldBlock);
        }
        self.skip_at_most(len);
        Ok(())
    }

    /// Discards exactly `len` bytes, spinning until they arrive.
    pub fn skip_blocking(&mut self, len: usize) {
        while self.size() < len {
            core::hint::spin_loop();
        }
        self.skip_at_most(len);
    }

    /// Copies up to `len` bytes into `buf` without consuming them;
    /// returns the number of bytes copied.
    pub fn peek_at_most(&self, buf: &mut [u8], len: usize) -> usize {
        let len = len.min(buf.len()).min(self.size());
        // Make sure the peer's payload stores are visible before reading.
        atomic::fence(Ordering::Acquire);
        let rpos = self.get_read_pos();
        for (i, byte) in buf.iter_mut().enumerate().take(len) {
            let offset = (rpos + i) % self.capacity;
            // SAFETY: `offset < capacity` keeps the load inside the data
            // area guaranteed valid by the constructor contract.
            unsafe {
                let dptr = self.data_ptr.add(offset);
                *byte = read_volatile(dptr);
            }
        }
        atomic::fence(Ordering::Release);
        len
    }

    /// Fills `buf` without consuming, or fails with `WouldBlock` (leaving
    /// `buf` untouched) if not enough bytes are buffered.
    pub fn peek_or_fail(&self, buf: &mut [u8]) -> nb::Result<(), Infallible> {
        if self.size() < buf.len() {
            return Err(nb::Error::WouldBlock);
        }
        self.peek_at_most(buf, buf.len());
        Ok(())
    }

    /// Fills `buf` without consuming, spinning until enough bytes arrive.
    pub fn peek_blocking(&self, buf: &mut [u8]) {
        while self.size() < buf.len() {
            core::hint::spin_loop();
        }
        self.peek_at_most(buf, buf.len());
    }

    /// Copies up to `buf.len()` bytes into `buf` and consumes them;
    /// returns the number of bytes consumed.
    pub fn consume_at_most(&mut self, buf: &mut [u8]) -> usize {
        let len = self.peek_at_most(buf, buf.len());
        self.skip_at_most(len)
    }

    /// Fills `buf` and consumes the bytes, or fails with `WouldBlock`
    /// without consuming anything.
    pub fn consume_or_fail(&mut self, buf: &mut [u8]) -> nb::Result<(), Infallible> {
        self.peek_or_fail(buf)?;
        self.skip_at_most(buf.len());
        Ok(())
    }

    /// Fills `buf` and consumes the bytes, spinning until enough arrive.
    pub fn consume_blocking(&mut self, buf: &mut [u8]) {
        self.peek_blocking(buf);
        self.skip_at_most(buf.len());
    }
}
impl core::fmt::Write for ByteQueue {
    /// Pushes the UTF-8 bytes of `s` into the queue, blocking until the
    /// peer has made enough room; formatting itself never fails.
    fn write_str(&mut self, s: &str) -> Result<(), core::fmt::Error> {
        let bytes = s.as_bytes();
        self.write_blocking(bytes);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::ByteQueue;

    /// Bytes per `u32`; used to size the `u32`-aligned backing store.
    const LEN_U32_TO_U8_SCALER: usize = core::mem::size_of::<u32>();

    /// Builds a (writer, reader) pair over `backing`: the writer
    /// initialises the shared header, the reader merely attaches.
    fn queue_pair(backing: &mut [u32]) -> (ByteQueue, ByteQueue) {
        let ptr = backing.as_mut_ptr() as *mut u8;
        let len = backing.len() * LEN_U32_TO_U8_SCALER;
        let writer = unsafe { ByteQueue::create(ptr, len) };
        let reader = unsafe { ByteQueue::attach(ptr, len) };
        (writer, reader)
    }

    #[test]
    fn test_peek() {
        let mut backing = [123u32; 17];
        let (mut writer, mut reader) = queue_pair(&mut backing);
        let tx = [1, 2, 3, 4];
        writer.write_or_fail(&tx).unwrap();
        let mut rx = [0u8; 4];
        // Peeking must not consume: size stays constant however often and
        // however much we peek.
        reader.peek_or_fail(&mut rx).unwrap();
        assert_eq!(&tx, &rx);
        assert!(reader.size() == tx.len());
        for i in 0..1234 {
            reader.peek_at_most(&mut rx, i);
            assert_eq!(&tx[..i.min(tx.len())], &rx[..i.min(rx.len())]);
            assert!(reader.size() == tx.len());
        }
        // Consuming, by contrast, drains the queue.
        reader.consume_or_fail(&mut rx).unwrap();
        assert_eq!(&tx, &rx);
        assert!(reader.size() == 0);
    }

    #[test]
    fn test_skip() {
        let mut backing = [123u32; 55];
        let (mut writer, mut reader) = queue_pair(&mut backing);
        let payload = [0xffu8; 10];
        // Writes of 0, 1, ..., 10 bytes total 0 + 1 + ... + 10 = 55 bytes.
        let sum_to_ten = 55;
        for i in 0..=10 {
            writer.write_at_most(&payload[..i]);
        }
        // Skip in the same increasing chunk sizes and check the residue.
        let mut skipped = 0;
        for i in 0..=10 {
            reader.skip_or_fail(i).unwrap();
            skipped += i;
            assert_eq!(reader.size(), sum_to_ten - skipped);
        }
    }

    #[test]
    fn write_read() {
        let mut backing = [123u32; 17];
        let (mut writer, mut reader) = queue_pair(&mut backing);
        let tx = [1, 2, 3, 4];
        writer.write_blocking(&tx);
        let mut rx = [0u8; 4];
        reader.consume_blocking(&mut rx);
        assert_eq!(&tx, &rx);
    }
}