use embedded_storage_async::nor_flash::{MultiwriteNorFlash, NorFlash};
use super::QueueStorage;
use crate::{Error, cache::CacheImpl};
/// Fixed-capacity FIFO of variable-length byte items held entirely in RAM.
///
/// Every item is stored as a 2-byte little-endian length header followed by its payload. Both
/// the header and the payload may wrap around the end of `buf`.
struct RamRing<const N: usize> {
    /// Backing storage for headers and payloads.
    buf: [u8; N],
    /// Index of the first header byte of the oldest item.
    read_pos: usize,
    /// Index at which the next byte will be written.
    write_pos: usize,
    /// Number of occupied bytes (headers plus payloads).
    used: usize,
    /// Number of items currently stored.
    item_count: usize,
}

impl<const N: usize> RamRing<N> {
    /// Size of the length header stored in front of every payload.
    const HEADER_LEN: usize = 2;

    /// Creates an empty ring.
    pub const fn new() -> Self {
        Self {
            buf: [0u8; N],
            read_pos: 0,
            write_pos: 0,
            used: 0,
            item_count: 0,
        }
    }

    /// Number of items currently stored.
    pub fn len(&self) -> usize {
        self.item_count
    }

    /// Returns `true` when no item is stored.
    #[allow(unused)]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Number of occupied bytes, headers included.
    pub fn bytes_used(&self) -> usize {
        self.used
    }

    /// Payload length of the oldest item, or `None` when the ring is empty.
    pub fn oldest_len(&self) -> Option<usize> {
        match self.item_count {
            0 => None,
            _ => {
                // The two header bytes may sit on either side of the wrap point.
                let header = [self.buf[self.read_pos], self.buf[(self.read_pos + 1) % N]];
                Some(usize::from(u16::from_le_bytes(header)))
            }
        }
    }

    /// Appends `data` as the newest item.
    ///
    /// Fails, leaving the ring untouched, when `data` is longer than `u16::MAX` bytes or when
    /// header plus payload do not fit into the free space.
    #[allow(clippy::result_unit_err)]
    pub fn push(&mut self, data: &[u8]) -> Result<(), ()> {
        let fits_header = data.len() <= usize::from(u16::MAX);
        if !fits_header || self.used + Self::HEADER_LEN + data.len() > N {
            return Err(());
        }
        self.write_raw(data);
        Ok(())
    }

    /// Appends `data` as the newest item, dropping oldest items until it fits.
    ///
    /// Fails, leaving the ring untouched, when `data` is longer than `u16::MAX` bytes or when
    /// header plus payload exceed the total capacity `N`.
    #[allow(clippy::result_unit_err)]
    pub fn push_overwriting(&mut self, data: &[u8]) -> Result<(), ()> {
        if data.len() > usize::from(u16::MAX) {
            return Err(());
        }
        let needed = Self::HEADER_LEN + data.len();
        if needed > N {
            return Err(());
        }
        while N < self.used + needed {
            self.discard_oldest();
        }
        self.write_raw(data);
        Ok(())
    }

    /// Copies the oldest payload into the front of `buf` without removing it.
    ///
    /// Returns the filled prefix of `buf`, or `None` when the ring is empty or `buf` is shorter
    /// than the payload.
    pub fn peek_into<'b>(&self, buf: &'b mut [u8]) -> Option<&'b [u8]> {
        let len = self.oldest_len()?;
        let dest = buf.get_mut(..len)?;
        self.copy_out((self.read_pos + Self::HEADER_LEN) % N, dest);
        Some(&*dest)
    }

    /// Removes the oldest item; does nothing when the ring is empty.
    pub fn discard_oldest(&mut self) {
        let Some(len) = self.oldest_len() else {
            return;
        };
        let span = Self::HEADER_LEN + len;
        self.read_pos = (self.read_pos + span) % N;
        self.used -= span;
        self.item_count -= 1;
    }

    /// Writes header and payload at `write_pos` and updates the bookkeeping.
    ///
    /// The caller has already verified that the item fits.
    fn write_raw(&mut self, data: &[u8]) {
        // Truncation to 16 bits is intended: callers reject anything above `u16::MAX`.
        let header = (data.len() as u16).to_le_bytes();
        self.write_bytes(&header);
        self.write_bytes(data);
        self.used += Self::HEADER_LEN + data.len();
        self.item_count += 1;
    }

    /// Copies `bytes` into the ring at `write_pos`, wrapping at most once, and advances
    /// `write_pos`.
    fn write_bytes(&mut self, bytes: &[u8]) {
        let until_end = bytes.len().min(N - self.write_pos);
        let (head, tail) = bytes.split_at(until_end);
        self.buf[self.write_pos..self.write_pos + until_end].copy_from_slice(head);
        self.buf[..tail.len()].copy_from_slice(tail);
        self.write_pos = (self.write_pos + bytes.len()) % N;
    }

    /// Fills `out` with ring bytes starting at index `start`, wrapping at most once.
    fn copy_out(&self, start: usize, out: &mut [u8]) {
        let until_end = out.len().min(N - start);
        let (head, tail) = out.split_at_mut(until_end);
        head.copy_from_slice(&self.buf[start..start + until_end]);
        tail.copy_from_slice(&self.buf[..tail.len()]);
    }
}

impl<const N: usize> Default for RamRing<N> {
    /// Equivalent to [`RamRing::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Selects what an enqueue does when the RAM ring has too little free space for the new item.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum OverflowPolicy {
    /// Reject the new item and leave the buffered items untouched.
    Err,
    /// Drop the oldest buffered items until the new item fits.
    DiscardOldest,
}
pub struct BufferedQueue<S: NorFlash, C: CacheImpl, const RAM_BYTES: usize> {
storage: QueueStorage<S, C>,
ram: RamRing<RAM_BYTES>,
}
impl<S: NorFlash, C: CacheImpl, const RAM_BYTES: usize> BufferedQueue<S, C, RAM_BYTES> {
pub fn new(storage: QueueStorage<S, C>) -> Self {
Self {
storage,
ram: RamRing::new(),
}
}
#[allow(clippy::result_unit_err)]
pub fn enqueue(&mut self, data: &[u8], policy: OverflowPolicy) -> Result<(), ()> {
match policy {
OverflowPolicy::Err => self.ram.push(data),
OverflowPolicy::DiscardOldest => self.ram.push_overwriting(data),
}
}
pub async fn drain_one(
&mut self,
scratch: &mut [u8],
allow_overwrite: bool,
) -> Result<bool, Error<S::Error>> {
let Some(data) = self.ram.peek_into(scratch) else {
return Ok(false);
};
let len = data.len();
self.storage.push(&scratch[..len], allow_overwrite).await?;
self.ram.discard_oldest();
Ok(true)
}
pub async fn drain_all(
&mut self,
scratch: &mut [u8],
allow_overwrite: bool,
) -> Result<(), Error<S::Error>> {
while self.drain_one(scratch, allow_overwrite).await? {}
Ok(())
}
pub async fn pop<'d>(
&mut self,
data_buffer: &'d mut [u8],
) -> Result<Option<&'d mut [u8]>, Error<S::Error>>
where
S: MultiwriteNorFlash,
{
let flash_len = self.storage.pop(&mut *data_buffer).await?.map(|s| s.len());
if let Some(len) = flash_len {
return Ok(Some(&mut data_buffer[..len]));
}
let len = self.ram.peek_into(data_buffer).map(|s| s.len());
if let Some(len) = len {
self.ram.discard_oldest();
return Ok(Some(&mut data_buffer[..len]));
}
Ok(None)
}
pub async fn peek<'d>(
&mut self,
data_buffer: &'d mut [u8],
) -> Result<Option<&'d mut [u8]>, Error<S::Error>>
where
S: MultiwriteNorFlash,
{
let flash_len = self.storage.peek(&mut *data_buffer).await?.map(|s| s.len());
if let Some(len) = flash_len {
return Ok(Some(&mut data_buffer[..len]));
}
let len = self.ram.peek_into(data_buffer).map(|s| s.len());
if let Some(len) = len {
return Ok(Some(&mut data_buffer[..len]));
}
Ok(None)
}
pub const fn ram_capacity_bytes() -> usize {
RAM_BYTES
}
pub fn ram_free_bytes(&self) -> usize {
RAM_BYTES - self.ram.bytes_used()
}
pub fn ram_pending_count(&self) -> usize {
self.ram.len()
}
pub fn ram_bytes_used(&self) -> usize {
self.ram.bytes_used()
}
pub fn into_storage(self) -> QueueStorage<S, C> {
self.storage
}
}
/// State guarded by the mutex of [`SharedRamRing`].
#[cfg(feature = "shared-ram-ring")]
struct SharedRingState<const N: usize> {
    /// The buffered items.
    ring: RamRing<N>,
    /// Bumped (wrapping) whenever the oldest item leaves the ring, by eviction or by a drain.
    ///
    /// A drain compares the value seen while peeking with the value after its storage write to
    /// find out whether the item it copied is still the oldest one.
    head_epoch: u32,
}

/// RAM ring that can be filled through a shared reference while another context drains it
/// into a [`QueueStorage`].
///
/// The ring sits behind a critical-section mutex that is held only for short, non-awaiting
/// sections. A signal is raised on every successful enqueue so a drain task can sleep in
/// [`wait`](Self::wait).
#[cfg(feature = "shared-ram-ring")]
pub struct SharedRamRing<const N: usize> {
    ring: embassy_sync::blocking_mutex::Mutex<
        embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex,
        core::cell::RefCell<SharedRingState<N>>,
    >,
    signal: embassy_sync::signal::Signal<
        embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex,
        (),
    >,
}

#[cfg(feature = "shared-ram-ring")]
impl<const N: usize> SharedRamRing<N> {
    /// Creates an empty ring; usable in `static` initializers.
    pub const fn new() -> Self {
        Self {
            ring: embassy_sync::blocking_mutex::Mutex::new(core::cell::RefCell::new(
                SharedRingState {
                    ring: RamRing::new(),
                    head_epoch: 0,
                },
            )),
            signal: embassy_sync::signal::Signal::new(),
        }
    }

    /// Stores `data` in the ring and, on success, raises the signal awaited by
    /// [`wait`](Self::wait).
    ///
    /// # Errors
    ///
    /// Returns `Err(())` when `data` is longer than `u16::MAX` bytes, when it can never fit
    /// into `N` bytes (with [`OverflowPolicy::DiscardOldest`]), or when there is currently not
    /// enough free space (with [`OverflowPolicy::Err`]).
    #[allow(clippy::result_unit_err)]
    pub fn enqueue(&self, data: &[u8], policy: OverflowPolicy) -> Result<(), ()> {
        let result = self.ring.lock(|cell| {
            let mut state = cell.borrow_mut();
            match policy {
                OverflowPolicy::Err => state.ring.push(data),
                OverflowPolicy::DiscardOldest => {
                    let before = state.ring.len();
                    let pushed = state.ring.push_overwriting(data);
                    // A successful push without eviction grows the count by one; anything less
                    // means the head item changed under a possibly running drain.
                    if pushed.is_ok() && state.ring.len() <= before {
                        state.head_epoch = state.head_epoch.wrapping_add(1);
                    }
                    pushed
                }
            }
        });
        if result.is_ok() {
            self.signal.signal(());
        }
        result
    }

    /// Waits until the signal raised by a successful [`enqueue`](Self::enqueue) is observed.
    pub async fn wait(&self) {
        self.signal.wait().await;
    }

    /// Moves the oldest item into `storage`, using `scratch` as the copy buffer.
    ///
    /// The mutex is not held while `storage.push` is awaited. If the copied item is evicted by
    /// a concurrent `enqueue` in the meantime, it is still written to `storage`, but no other
    /// item is removed from the ring in its place.
    ///
    /// `allow_overwrite` is forwarded unchanged to `QueueStorage::push`.
    ///
    /// Returns `Ok(true)` when an item was written and `Ok(false)` when the ring is empty.
    ///
    /// # Errors
    ///
    /// Returns `Error::BufferTooSmall` when `scratch` is shorter than the oldest item (the item
    /// stays in the ring), and propagates any error from `QueueStorage::push`.
    pub async fn drain_one<S: NorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        scratch: &mut [u8],
        allow_overwrite: bool,
    ) -> Result<bool, Error<S::Error>> {
        // `Ok(None)`: ring empty; `Err(needed)`: `scratch` is too short for the oldest item.
        let peeked: Result<Option<(usize, u32)>, usize> = self.ring.lock(|cell| {
            let state = cell.borrow();
            let Some(needed) = state.ring.oldest_len() else {
                return Ok(None);
            };
            match state.ring.peek_into(scratch) {
                Some(item) => Ok(Some((item.len(), state.head_epoch))),
                None => Err(needed),
            }
        });
        let (len, epoch) = match peeked {
            Ok(Some(found)) => found,
            Ok(None) => return Ok(false),
            Err(needed) => return Err(Error::BufferTooSmall(needed)),
        };

        storage.push(&scratch[..len], allow_overwrite).await?;

        self.ring.lock(|cell| {
            let mut state = cell.borrow_mut();
            // Discard only if the head is still the item that was just written.
            if state.head_epoch == epoch {
                state.ring.discard_oldest();
                state.head_epoch = state.head_epoch.wrapping_add(1);
            }
        });
        Ok(true)
    }

    /// Calls [`drain_one`](Self::drain_one) until the ring is observed empty.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first error reported by `drain_one`.
    pub async fn drain_all<S: NorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        scratch: &mut [u8],
        allow_overwrite: bool,
    ) -> Result<(), Error<S::Error>> {
        while self.drain_one(storage, scratch, allow_overwrite).await? {}
        Ok(())
    }

    /// Awaits [`wait`](Self::wait) and then runs [`drain_all`](Self::drain_all).
    pub async fn wait_and_drain_all<S: NorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        scratch: &mut [u8],
        allow_overwrite: bool,
    ) -> Result<(), Error<S::Error>> {
        self.wait().await;
        self.drain_all(storage, scratch, allow_overwrite).await
    }

    /// Removes and returns the oldest item of `storage`, after first draining any pending RAM
    /// items into it.
    ///
    /// `data_buffer` doubles as the scratch buffer for the drain.
    ///
    /// # Errors
    ///
    /// Propagates errors from the drain and from `QueueStorage::pop`.
    pub async fn pop<'d, S: MultiwriteNorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        data_buffer: &'d mut [u8],
        allow_overwrite: bool,
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>> {
        if self.ram_pending_count() > 0 {
            self.drain_all(storage, data_buffer, allow_overwrite)
                .await?;
        }
        storage.pop(data_buffer).await
    }

    /// Returns the oldest item of `storage` without removing it, after first draining any
    /// pending RAM items into it.
    ///
    /// `data_buffer` doubles as the scratch buffer for the drain.
    ///
    /// # Errors
    ///
    /// Propagates errors from the drain and from `QueueStorage::peek`.
    pub async fn peek<'d, S: MultiwriteNorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        data_buffer: &'d mut [u8],
        allow_overwrite: bool,
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>> {
        if self.ram_pending_count() > 0 {
            self.drain_all(storage, data_buffer, allow_overwrite)
                .await?;
        }
        storage.peek(data_buffer).await
    }

    /// Total size of the ring in bytes, including per-item headers.
    pub const fn ram_capacity_bytes() -> usize {
        N
    }

    /// Bytes of the ring that are currently unoccupied.
    pub fn ram_free_bytes(&self) -> usize {
        self.ring.lock(|cell| N - cell.borrow().ring.bytes_used())
    }

    /// Number of items waiting in the ring.
    pub fn ram_pending_count(&self) -> usize {
        self.ring.lock(|cell| cell.borrow().ring.len())
    }

    /// Payload length of the oldest item, or `None` when the ring is empty.
    pub fn oldest_ram_item_len(&self) -> Option<usize> {
        self.ring.lock(|cell| cell.borrow().ring.oldest_len())
    }
}

#[cfg(feature = "shared-ram-ring")]
impl<const N: usize> Default for SharedRamRing<N> {
    /// Equivalent to [`SharedRamRing::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that the oldest item of `ring` is `expected`, without removing it.
    fn assert_oldest<const N: usize>(ring: &RamRing<N>, expected: &[u8]) {
        let mut buf = [0u8; 32];
        assert_eq!(ring.peek_into(&mut buf), Some(expected));
    }

    /// Items come back in insertion order; peeking never removes anything.
    #[test]
    fn push_peek_discard() {
        let mut ring: RamRing<64> = RamRing::new();
        assert!(ring.is_empty());

        ring.push(b"hello").unwrap();
        ring.push(b"world").unwrap();
        assert_eq!(ring.len(), 2);
        assert_eq!(ring.oldest_len(), Some(5));

        assert_oldest(&ring, b"hello");
        assert_eq!(ring.len(), 2);

        ring.discard_oldest();
        assert_eq!(ring.len(), 1);
        assert_oldest(&ring, b"world");

        ring.discard_oldest();
        assert!(ring.is_empty());
        let mut buf = [0u8; 32];
        assert_eq!(ring.peek_into(&mut buf), None);
    }

    /// A full ring rejects pushes; space freed at the front is reused across the wrap point.
    #[test]
    fn wrap_around() {
        let mut ring: RamRing<10> = RamRing::new();
        ring.push(b"abc").unwrap();
        ring.push(b"def").unwrap();
        assert!(ring.push(b"x").is_err());

        ring.discard_oldest();
        ring.push(b"ghi").unwrap();
        assert_oldest(&ring, b"def");

        ring.discard_oldest();
        assert_oldest(&ring, b"ghi");
    }

    /// `push_overwriting` drops the oldest item to make room for the new one.
    #[test]
    fn push_overwriting_evicts_oldest() {
        let mut ring: RamRing<10> = RamRing::new();
        ring.push(b"aaa").unwrap();
        ring.push(b"bbb").unwrap();
        ring.push_overwriting(b"ccc").unwrap();
        assert_eq!(ring.len(), 2);

        assert_oldest(&ring, b"bbb");
        ring.discard_oldest();
        assert_oldest(&ring, b"ccc");
    }

    #[cfg(feature = "_test")]
    mod integration {
        use super::*;
        use crate::cache::NoCache;
        use crate::mock_flash::MockFlashBase;
        use crate::queue::{QueueConfig, QueueStorage};
        use futures::executor::block_on;

        type MockFlash = MockFlashBase<4, 4, 64>;

        /// Builds an uncached queue storage spanning the whole mock flash.
        fn make_storage() -> QueueStorage<MockFlash, NoCache> {
            let flash = MockFlash::new(crate::mock_flash::WriteCountCheck::Twice, None, true);
            let config = QueueConfig::new(MockFlash::FULL_FLASH_RANGE);
            QueueStorage::new(flash, config, NoCache::new())
        }

        /// Builds a buffered queue with a 256-byte RAM ring.
        fn make_queue() -> BufferedQueue<MockFlash, NoCache, 256> {
            BufferedQueue::new(make_storage())
        }

        /// Builds a buffered queue whose 16-byte RAM ring holds at most two 4-byte items.
        fn make_small_queue() -> BufferedQueue<MockFlash, NoCache, 16> {
            BufferedQueue::new(make_storage())
        }

        /// Items enqueued in RAM survive a drain and pop in order.
        #[test]
        fn enqueue_drain_pop() {
            block_on(async {
                let mut queue = make_queue();
                let mut scratch = [0u8; 64];
                let mut out = [0u8; 64];

                queue.enqueue(b"hello", OverflowPolicy::Err).unwrap();
                queue.enqueue(b"world", OverflowPolicy::Err).unwrap();
                assert_eq!(queue.ram_pending_count(), 2);

                queue.drain_all(&mut scratch, false).await.unwrap();
                assert_eq!(queue.ram_pending_count(), 0);

                for expected in [&b"hello"[..], &b"world"[..]] {
                    let data = queue.pop(&mut out).await.unwrap().unwrap();
                    assert_eq!(data, expected);
                }
                assert!(queue.pop(&mut out).await.unwrap().is_none());
            });
        }

        /// Items already in storage are popped before items still waiting in RAM.
        #[test]
        fn pop_reads_flash_before_ram() {
            block_on(async {
                let mut storage = make_storage();
                let mut aligned = [0u8; 8];
                aligned[..5].copy_from_slice(b"first");
                storage.push(&aligned[..5], false).await.unwrap();

                let mut queue: BufferedQueue<MockFlash, NoCache, 256> =
                    BufferedQueue::new(storage);
                let mut out = [0u8; 64];
                queue.enqueue(b"second", OverflowPolicy::Err).unwrap();

                for expected in [&b"first"[..], &b"second"[..]] {
                    let data = queue.pop(&mut out).await.unwrap().unwrap();
                    assert_eq!(data, expected);
                }
                assert!(queue.pop(&mut out).await.unwrap().is_none());
            });
        }

        /// With `OverflowPolicy::Err` a full RAM ring rejects further items.
        #[test]
        fn overflow_policy_err() {
            let mut queue = make_small_queue();
            queue.enqueue(b"aaaa", OverflowPolicy::Err).unwrap();
            queue.enqueue(b"bbbb", OverflowPolicy::Err).unwrap();
            assert!(queue.enqueue(b"cccc", OverflowPolicy::Err).is_err());
        }

        /// With `OverflowPolicy::DiscardOldest` the oldest RAM item makes room for the new one.
        #[test]
        fn overflow_policy_discard_oldest() {
            let mut queue = make_small_queue();
            queue.enqueue(b"aaaa", OverflowPolicy::Err).unwrap();
            queue.enqueue(b"bbbb", OverflowPolicy::Err).unwrap();
            queue
                .enqueue(b"cccc", OverflowPolicy::DiscardOldest)
                .unwrap();
            assert_eq!(queue.ram_pending_count(), 2);

            block_on(async {
                let mut out = [0u8; 64];
                for expected in [&b"bbbb"[..], &b"cccc"[..]] {
                    let data = queue.pop(&mut out).await.unwrap().unwrap();
                    assert_eq!(data, expected);
                }
            });
        }

        /// Capacity accessors report an untouched ring for a fresh queue.
        #[test]
        fn capacity_helpers() {
            let queue = make_queue();
            assert_eq!(
                BufferedQueue::<MockFlash, NoCache, 256>::ram_capacity_bytes(),
                256
            );
            assert_eq!(queue.ram_free_bytes(), 256);
            assert_eq!(queue.ram_bytes_used(), 0);
        }
    }
}