1use embedded_storage_async::nor_flash::{MultiwriteNorFlash, NorFlash};
33
34use super::QueueStorage;
35use crate::{Error, cache::CacheImpl};
36
/// Fixed-capacity byte ring buffer holding length-prefixed items.
///
/// Every item is stored as a 2-byte little-endian payload length followed by
/// the payload itself; data wraps around the end of the backing array.
struct RamRing<const N: usize> {
    /// Backing storage for headers and payloads.
    buf: [u8; N],
    /// Index of the oldest item's length header.
    read_pos: usize,
    /// Index where the next byte will be written.
    write_pos: usize,
    /// Bytes currently occupied (headers + payloads).
    used: usize,
    /// Number of items currently stored.
    item_count: usize,
}

impl<const N: usize> RamRing<N> {
    /// Size of the length header stored before each payload.
    const HEADER: usize = 2;

    /// Creates an empty ring.
    pub const fn new() -> Self {
        Self {
            buf: [0u8; N],
            read_pos: 0,
            write_pos: 0,
            used: 0,
            item_count: 0,
        }
    }

    /// Number of items currently queued.
    pub fn len(&self) -> usize {
        self.item_count
    }

    /// Returns `true` when no items are queued.
    #[allow(unused)]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Bytes occupied, including the 2-byte header per item.
    pub fn bytes_used(&self) -> usize {
        self.used
    }

    /// Payload length of the oldest item, or `None` when the ring is empty.
    pub fn oldest_len(&self) -> Option<usize> {
        if self.item_count == 0 {
            None
        } else {
            // Little-endian u16 header, possibly wrapping at the array end.
            let lo = usize::from(self.buf[self.read_pos]);
            let hi = usize::from(self.buf[(self.read_pos + 1) % N]);
            Some((hi << 8) | lo)
        }
    }

    /// Appends `data` as a new item.
    ///
    /// Fails when the payload exceeds `u16::MAX` bytes or when header +
    /// payload does not fit in the remaining free space.
    #[allow(clippy::result_unit_err)]
    pub fn push(&mut self, data: &[u8]) -> Result<(), ()> {
        if data.len() > u16::MAX as usize {
            return Err(());
        }
        if self.used + Self::HEADER + data.len() > N {
            return Err(());
        }
        self.write_raw(data);
        Ok(())
    }

    /// Appends `data`, evicting oldest items as needed to make room.
    ///
    /// Fails when the payload exceeds `u16::MAX` bytes or when header +
    /// payload could never fit, even in an empty ring.
    #[allow(clippy::result_unit_err)]
    pub fn push_overwriting(&mut self, data: &[u8]) -> Result<(), ()> {
        if data.len() > u16::MAX as usize {
            return Err(());
        }
        let total = Self::HEADER + data.len();
        if total > N {
            return Err(());
        }
        while self.used + total > N {
            self.discard_oldest();
        }
        self.write_raw(data);
        Ok(())
    }

    /// Copies the oldest item's payload into `buf` and returns the filled
    /// prefix. Returns `None` when empty or when `buf` is too small.
    pub fn peek_into<'b>(&self, buf: &'b mut [u8]) -> Option<&'b [u8]> {
        let len = self.oldest_len()?;
        let out = buf.get_mut(..len)?;
        // Payload starts right after the 2-byte header; indices wrap mod N.
        let start = self.read_pos + Self::HEADER;
        for (i, slot) in out.iter_mut().enumerate() {
            *slot = self.buf[(start + i) % N];
        }
        Some(&*out)
    }

    /// Removes the oldest item, if any.
    pub fn discard_oldest(&mut self) {
        let Some(len) = self.oldest_len() else {
            return;
        };
        let total = Self::HEADER + len;
        self.read_pos = (self.read_pos + total) % N;
        self.used -= total;
        self.item_count -= 1;
    }

    /// Writes the length header followed by the payload and updates the
    /// counters. Callers must have verified there is enough free space.
    fn write_raw(&mut self, data: &[u8]) {
        // Callers guarantee data.len() <= u16::MAX.
        let [lo, hi] = (data.len() as u16).to_le_bytes();
        self.write_byte(lo);
        self.write_byte(hi);
        for &b in data {
            self.write_byte(b);
        }
        self.used += Self::HEADER + data.len();
        self.item_count += 1;
    }

    /// Stores one byte at the write cursor and advances it with wrap-around.
    fn write_byte(&mut self, b: u8) {
        self.buf[self.write_pos] = b;
        self.write_pos = (self.write_pos + 1) % N;
    }
}

impl<const N: usize> Default for RamRing<N> {
    fn default() -> Self {
        Self::new()
    }
}
174
/// What to do when a new item does not fit in the RAM buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum OverflowPolicy {
    /// Reject the new item and return an error; existing items are kept.
    Err,
    /// Evict the oldest buffered item(s) until the new item fits.
    DiscardOldest,
}
189
/// A flash-backed queue fronted by a `RAM_BYTES`-sized in-memory buffer.
///
/// Items are first staged in RAM via `enqueue` (non-async, cheap) and later
/// persisted to flash with `drain_one`/`drain_all`.
pub struct BufferedQueue<S: NorFlash, C: CacheImpl, const RAM_BYTES: usize> {
    /// Persistent flash-backed queue.
    storage: QueueStorage<S, C>,
    /// In-RAM staging ring for not-yet-persisted items.
    ram: RamRing<RAM_BYTES>,
}
220
221impl<S: NorFlash, C: CacheImpl, const RAM_BYTES: usize> BufferedQueue<S, C, RAM_BYTES> {
222 pub fn new(storage: QueueStorage<S, C>) -> Self {
224 Self {
225 storage,
226 ram: RamRing::new(),
227 }
228 }
229
230 #[allow(clippy::result_unit_err)]
235 pub fn enqueue(&mut self, data: &[u8], policy: OverflowPolicy) -> Result<(), ()> {
236 match policy {
237 OverflowPolicy::Err => self.ram.push(data),
238 OverflowPolicy::DiscardOldest => self.ram.push_overwriting(data),
239 }
240 }
241
242 pub async fn drain_one(
249 &mut self,
250 scratch: &mut [u8],
251 allow_overwrite: bool,
252 ) -> Result<bool, Error<S::Error>> {
253 let Some(data) = self.ram.peek_into(scratch) else {
254 return Ok(false);
255 };
256 let len = data.len();
257 self.storage.push(&scratch[..len], allow_overwrite).await?;
258 self.ram.discard_oldest();
259 Ok(true)
260 }
261
262 pub async fn drain_all(
266 &mut self,
267 scratch: &mut [u8],
268 allow_overwrite: bool,
269 ) -> Result<(), Error<S::Error>> {
270 while self.drain_one(scratch, allow_overwrite).await? {}
271 Ok(())
272 }
273
274 pub async fn pop<'d>(
279 &mut self,
280 data_buffer: &'d mut [u8],
281 ) -> Result<Option<&'d mut [u8]>, Error<S::Error>>
282 where
283 S: MultiwriteNorFlash,
284 {
285 let flash_len = self.storage.pop(&mut *data_buffer).await?.map(|s| s.len());
287 if let Some(len) = flash_len {
288 return Ok(Some(&mut data_buffer[..len]));
289 }
290 let len = self.ram.peek_into(data_buffer).map(|s| s.len());
291 if let Some(len) = len {
292 self.ram.discard_oldest();
293 return Ok(Some(&mut data_buffer[..len]));
294 }
295 Ok(None)
296 }
297
298 pub async fn peek<'d>(
303 &mut self,
304 data_buffer: &'d mut [u8],
305 ) -> Result<Option<&'d mut [u8]>, Error<S::Error>>
306 where
307 S: MultiwriteNorFlash,
308 {
309 let flash_len = self.storage.peek(&mut *data_buffer).await?.map(|s| s.len());
311 if let Some(len) = flash_len {
312 return Ok(Some(&mut data_buffer[..len]));
313 }
314 let len = self.ram.peek_into(data_buffer).map(|s| s.len());
315 if let Some(len) = len {
316 return Ok(Some(&mut data_buffer[..len]));
317 }
318 Ok(None)
319 }
320
321 pub const fn ram_capacity_bytes() -> usize {
323 RAM_BYTES
324 }
325
326 pub fn ram_free_bytes(&self) -> usize {
328 RAM_BYTES - self.ram.bytes_used()
329 }
330
331 pub fn ram_pending_count(&self) -> usize {
333 self.ram.len()
334 }
335
336 pub fn ram_bytes_used(&self) -> usize {
338 self.ram.bytes_used()
339 }
340
341 pub fn into_storage(self) -> QueueStorage<S, C> {
345 self.storage
346 }
347}
348
/// A [`RamRing`] shareable between producers and a drainer.
///
/// The ring is guarded by a critical-section mutex, so `enqueue` can be
/// called from multiple contexts; a latched signal wakes a waiting drainer
/// whenever new data arrives.
#[cfg(feature = "shared-ram-ring")]
pub struct SharedRamRing<const N: usize> {
    // Ring behind a critical-section mutex + RefCell for interior mutability.
    ring: embassy_sync::blocking_mutex::Mutex<
        embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex,
        core::cell::RefCell<RamRing<N>>,
    >,
    // Latched wake-up signal raised on every successful enqueue.
    signal: embassy_sync::signal::Signal<
        embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex,
        (),
    >,
}
392
#[cfg(feature = "shared-ram-ring")]
impl<const N: usize> SharedRamRing<N> {
    /// Creates an empty shared ring.
    pub const fn new() -> Self {
        Self {
            ring: embassy_sync::blocking_mutex::Mutex::new(
                core::cell::RefCell::new(RamRing::new()),
            ),
            signal: embassy_sync::signal::Signal::new(),
        }
    }

    /// Pushes `data` into the ring under a critical section and, on success,
    /// raises the signal so a waiting drainer wakes up.
    ///
    /// Returns `Err(())` when the item cannot be stored (see [`RamRing::push`]
    /// and [`RamRing::push_overwriting`] for the policy-specific conditions).
    #[allow(clippy::result_unit_err)]
    pub fn enqueue(&self, data: &[u8], policy: OverflowPolicy) -> Result<(), ()> {
        let result = self.ring.lock(|r| match policy {
            OverflowPolicy::Err => r.borrow_mut().push(data),
            OverflowPolicy::DiscardOldest => r.borrow_mut().push_overwriting(data),
        });
        if result.is_ok() {
            self.signal.signal(());
        }
        result
    }

    /// Waits until an [`enqueue`](Self::enqueue) signals new data.
    ///
    /// NOTE(review): the signal appears latched (a signal raised before this
    /// call should complete it immediately) — confirm against the
    /// embassy-sync `Signal` semantics in use.
    pub async fn wait(&self) {
        self.signal.wait().await;
    }

    /// Moves at most one item from the RAM ring into `storage`.
    ///
    /// Returns `Ok(true)` when an item was transferred, `Ok(false)` when the
    /// ring is empty or the oldest item does not fit in `scratch`.
    ///
    /// NOTE(review): the lock is released between the peek and the final
    /// `discard_oldest`. If another context evicts the peeked item via
    /// `OverflowPolicy::DiscardOldest` during the flash write, the discard
    /// here removes a *different*, undrained item — confirm whether producers
    /// may run concurrently with draining under that policy.
    pub async fn drain_one<S: NorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        scratch: &mut [u8],
        allow_overwrite: bool,
    ) -> Result<bool, Error<S::Error>> {
        // Copy the payload out under the lock; the flash write must not
        // happen inside the critical section.
        let len = self
            .ring
            .lock(|r| r.borrow().peek_into(scratch).map(|s| s.len()));
        let Some(len) = len else {
            return Ok(false);
        };
        storage.push(&scratch[..len], allow_overwrite).await?;
        // Only drop the RAM copy once the flash write has succeeded.
        self.ring.lock(|r| r.borrow_mut().discard_oldest());
        Ok(true)
    }

    /// Moves every pending item into `storage`, oldest first.
    pub async fn drain_all<S: NorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        scratch: &mut [u8],
        allow_overwrite: bool,
    ) -> Result<(), Error<S::Error>> {
        while self.drain_one(storage, scratch, allow_overwrite).await? {}
        Ok(())
    }

    /// Blocks until new data is signalled, then drains everything pending.
    pub async fn wait_and_drain_all<S: NorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        scratch: &mut [u8],
        allow_overwrite: bool,
    ) -> Result<(), Error<S::Error>> {
        self.wait().await;
        self.drain_all(storage, scratch, allow_overwrite).await
    }

    /// Drains any pending RAM items into `storage` (preserving FIFO order),
    /// then pops the oldest item from flash into `data_buffer`.
    pub async fn pop<'d, S: MultiwriteNorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        data_buffer: &'d mut [u8],
        allow_overwrite: bool,
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>> {
        if self.ram_pending_count() > 0 {
            // data_buffer doubles as the drain scratch buffer here.
            self.drain_all(storage, data_buffer, allow_overwrite)
                .await?;
        }
        storage.pop(data_buffer).await
    }

    /// Like [`Self::pop`] but leaves the item in flash.
    pub async fn peek<'d, S: MultiwriteNorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        data_buffer: &'d mut [u8],
        allow_overwrite: bool,
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>> {
        if self.ram_pending_count() > 0 {
            self.drain_all(storage, data_buffer, allow_overwrite)
                .await?;
        }
        storage.peek(data_buffer).await
    }

    /// Total RAM ring capacity in bytes.
    pub const fn ram_capacity_bytes() -> usize {
        N
    }

    /// Free RAM bytes (each item also consumes a 2-byte header).
    pub fn ram_free_bytes(&self) -> usize {
        self.ring.lock(|r| N - r.borrow().bytes_used())
    }

    /// Number of items waiting in RAM.
    pub fn ram_pending_count(&self) -> usize {
        self.ring.lock(|r| r.borrow().len())
    }

    /// Payload length of the oldest RAM item, or `None` when empty.
    pub fn oldest_ram_item_len(&self) -> Option<usize> {
        self.ring.lock(|r| r.borrow().oldest_len())
    }
}
535
#[cfg(feature = "shared-ram-ring")]
impl<const N: usize> Default for SharedRamRing<N> {
    /// Equivalent to [`SharedRamRing::new`].
    fn default() -> Self {
        Self::new()
    }
}
542
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn push_peek_discard() {
        let mut ring: RamRing<64> = RamRing::new();
        assert!(ring.is_empty());

        ring.push(b"hello").unwrap();
        ring.push(b"world").unwrap();
        assert_eq!(ring.len(), 2);
        assert_eq!(ring.oldest_len(), Some(5));

        let mut buf = [0u8; 32];
        assert_eq!(ring.peek_into(&mut buf), Some(b"hello".as_ref()));
        // Peeking must not consume the item.
        assert_eq!(ring.len(), 2);

        ring.discard_oldest();
        assert_eq!(ring.len(), 1);
        assert_eq!(ring.peek_into(&mut buf), Some(b"world".as_ref()));

        ring.discard_oldest();
        assert!(ring.is_empty());
        assert_eq!(ring.peek_into(&mut buf), None);
    }

    #[test]
    fn wrap_around() {
        // Capacity 10 fits exactly two 3-byte items (2-byte header each).
        let mut ring: RamRing<10> = RamRing::new();
        ring.push(b"abc").unwrap();
        ring.push(b"def").unwrap();
        assert!(ring.push(b"x").is_err());

        let mut buf = [0u8; 8];
        ring.discard_oldest();
        // This push wraps around the end of the backing array.
        ring.push(b"ghi").unwrap();
        assert_eq!(ring.peek_into(&mut buf), Some(b"def".as_ref()));
        ring.discard_oldest();
        assert_eq!(ring.peek_into(&mut buf), Some(b"ghi".as_ref()));
    }

    #[test]
    fn push_overwriting_evicts_oldest() {
        let mut ring: RamRing<10> = RamRing::new();
        ring.push(b"aaa").unwrap();
        ring.push(b"bbb").unwrap();

        // Ring is full; "aaa" gets evicted to make room.
        ring.push_overwriting(b"ccc").unwrap();
        assert_eq!(ring.len(), 2);

        let mut buf = [0u8; 8];
        assert_eq!(ring.peek_into(&mut buf), Some(b"bbb".as_ref()));
        ring.discard_oldest();
        assert_eq!(ring.peek_into(&mut buf), Some(b"ccc".as_ref()));
    }

    /// Integration tests exercising the queue against the mock flash.
    #[cfg(feature = "_test")]
    mod integration {
        use super::*;
        use crate::cache::NoCache;
        use crate::mock_flash::MockFlashBase;
        use crate::queue::{QueueConfig, QueueStorage};
        use futures::executor::block_on;

        type MockFlash = MockFlashBase<4, 4, 64>;

        fn make_storage() -> QueueStorage<MockFlash, NoCache> {
            let flash = MockFlash::new(crate::mock_flash::WriteCountCheck::Twice, None, true);
            let config = QueueConfig::new(MockFlash::FULL_FLASH_RANGE);
            QueueStorage::new(flash, config, NoCache::new())
        }

        fn make_queue() -> BufferedQueue<MockFlash, NoCache, 256> {
            BufferedQueue::new(make_storage())
        }

        #[test]
        fn enqueue_drain_pop() {
            block_on(async {
                let mut queue = make_queue();
                let mut scratch = [0u8; 64];
                let mut out = [0u8; 64];

                queue.enqueue(b"hello", OverflowPolicy::Err).unwrap();
                queue.enqueue(b"world", OverflowPolicy::Err).unwrap();
                assert_eq!(queue.ram_pending_count(), 2);

                queue.drain_all(&mut scratch, false).await.unwrap();
                assert_eq!(queue.ram_pending_count(), 0);

                let data = queue.pop(&mut out).await.unwrap().unwrap();
                assert_eq!(data, b"hello");

                let data = queue.pop(&mut out).await.unwrap().unwrap();
                assert_eq!(data, b"world");

                assert!(queue.pop(&mut out).await.unwrap().is_none());
            });
        }

        #[test]
        fn pop_reads_flash_before_ram() {
            block_on(async {
                // Seed flash directly so an item predates the RAM buffer.
                let mut storage = make_storage();
                let mut aligned = [0u8; 8];
                aligned[..5].copy_from_slice(b"first");
                storage.push(&aligned[..5], false).await.unwrap();

                let mut queue: BufferedQueue<MockFlash, NoCache, 256> =
                    BufferedQueue::new(storage);
                let mut out = [0u8; 64];

                queue.enqueue(b"second", OverflowPolicy::Err).unwrap();

                // Flash item comes out first, then the RAM-buffered one.
                let data = queue.pop(&mut out).await.unwrap().unwrap();
                assert_eq!(data, b"first");

                let data = queue.pop(&mut out).await.unwrap().unwrap();
                assert_eq!(data, b"second");

                assert!(queue.pop(&mut out).await.unwrap().is_none());
            });
        }

        #[test]
        fn overflow_policy_err() {
            let flash = MockFlash::new(crate::mock_flash::WriteCountCheck::Twice, None, true);
            let config = QueueConfig::new(MockFlash::FULL_FLASH_RANGE);
            let storage = QueueStorage::new(flash, config, NoCache::new());
            let mut queue: BufferedQueue<MockFlash, NoCache, 16> = BufferedQueue::new(storage);

            // 16 bytes hold two 4-byte items (plus headers); a third is rejected.
            queue.enqueue(b"aaaa", OverflowPolicy::Err).unwrap();
            queue.enqueue(b"bbbb", OverflowPolicy::Err).unwrap();
            assert!(queue.enqueue(b"cccc", OverflowPolicy::Err).is_err());
        }

        #[test]
        fn overflow_policy_discard_oldest() {
            let flash = MockFlash::new(crate::mock_flash::WriteCountCheck::Twice, None, true);
            let config = QueueConfig::new(MockFlash::FULL_FLASH_RANGE);
            let storage = QueueStorage::new(flash, config, NoCache::new());
            let mut queue: BufferedQueue<MockFlash, NoCache, 16> = BufferedQueue::new(storage);

            queue.enqueue(b"aaaa", OverflowPolicy::Err).unwrap();
            queue.enqueue(b"bbbb", OverflowPolicy::Err).unwrap();
            queue
                .enqueue(b"cccc", OverflowPolicy::DiscardOldest)
                .unwrap();

            // "aaaa" was evicted to make room.
            assert_eq!(queue.ram_pending_count(), 2);

            block_on(async {
                let mut out = [0u8; 64];
                let data = queue.pop(&mut out).await.unwrap().unwrap();
                assert_eq!(data, b"bbbb");
                let data = queue.pop(&mut out).await.unwrap().unwrap();
                assert_eq!(data, b"cccc");
            });
        }

        #[test]
        fn capacity_helpers() {
            let queue = make_queue();
            assert_eq!(
                BufferedQueue::<MockFlash, NoCache, 256>::ram_capacity_bytes(),
                256
            );
            assert_eq!(queue.ram_free_bytes(), 256);
            assert_eq!(queue.ram_bytes_used(), 0);
        }
    }
}