// sequential_storage/queue/buffered.rs
1//! RAM-buffered wrapper for [`QueueStorage`].
2//!
3//! NOR flash writes are slow and erases are very slow. If a producer emits data faster than the
4//! flash interface can commit it, [`BufferedQueue`] accepts items into a fixed-size RAM ring
5//! buffer via the synchronous [`enqueue`][BufferedQueue::enqueue] call (no flash I/O) and
6//! asynchronously drains them to flash via [`drain_one`][BufferedQueue::drain_one] or
7//! [`drain_all`][BufferedQueue::drain_all].
8//!
9//! # Overflow policy
10//!
11//! When the RAM ring is full, [`enqueue`][BufferedQueue::enqueue] behaviour is controlled by
12//! [`OverflowPolicy`]: either return `Err(())` or silently evict the oldest buffered item.
13//!
14//! # Ordering
15//!
16//! FIFO ordering is preserved: flash items are always older than RAM items.
17//! [`pop`][BufferedQueue::pop] and [`peek`][BufferedQueue::peek] read from flash first,
18//! falling back to the oldest RAM item if flash is empty.
19//!
20//! # Power-fail note
21//!
22//! Items that are in the RAM ring and have not yet been drained to flash **will be lost** on
23//! power loss. Items that have been drained follow the power-fail safety guarantees of
24//! sequential-storage.
25//!
26//! # Embassy / ISR-safe use
27//!
28//! Enable the `shared-ram-ring` feature for [`SharedRamRing`], which wraps the ring in a
29//! critical-section mutex so it can be enqueued to from an interrupt handler, and signals
30//! a drain task the moment data arrives.
31
32use embedded_storage_async::nor_flash::{MultiwriteNorFlash, NorFlash};
33
34use super::QueueStorage;
35use crate::{Error, cache::CacheImpl};
36
37// ── RamRing ──────────────────────────────────────────────────────────────────
38
/// A compact, fixed-capacity ring buffer for variable-length byte items.
///
/// Each item is stored as a 2-byte little-endian length prefix followed by the item's bytes,
/// so the usable capacity for data is `N - 2` bytes per item at most.
struct RamRing<const N: usize> {
    // Backing storage; entries may wrap around the end of the array.
    buf: [u8; N],
    // Offset of the oldest item's length prefix.
    read_pos: usize,
    // Offset where the next byte will be written.
    write_pos: usize,
    /// Total bytes occupied (length prefixes + data).
    used: usize,
    // Number of items currently stored; kept so `len()`/`is_empty()` are O(1).
    item_count: usize,
}
51
52impl<const N: usize> RamRing<N> {
53    /// Create a new empty ring.
54    pub const fn new() -> Self {
55        Self {
56            buf: [0u8; N],
57            read_pos: 0,
58            write_pos: 0,
59            used: 0,
60            item_count: 0,
61        }
62    }
63
64    /// Number of items currently buffered.
65    pub fn len(&self) -> usize {
66        self.item_count
67    }
68
69    /// Returns `true` if the ring contains no items.
70    #[allow(unused)]
71    pub fn is_empty(&self) -> bool {
72        self.item_count == 0
73    }
74
75    /// Bytes currently occupied (including length prefixes).
76    pub fn bytes_used(&self) -> usize {
77        self.used
78    }
79
80    /// Byte length of the oldest buffered item, or `None` if the ring is empty.
81    pub fn oldest_len(&self) -> Option<usize> {
82        if self.item_count == 0 {
83            return None;
84        }
85        let lo = self.buf[self.read_pos] as usize;
86        let hi = self.buf[(self.read_pos + 1) % N] as usize;
87        Some(lo | (hi << 8))
88    }
89
90    /// Push an item into the ring.
91    ///
92    /// Returns `Err(())` if there is insufficient space or the item exceeds `u16::MAX` bytes.
93    #[allow(clippy::result_unit_err)]
94    pub fn push(&mut self, data: &[u8]) -> Result<(), ()> {
95        let len = data.len();
96        if len > u16::MAX as usize {
97            return Err(());
98        }
99        let total = 2 + len;
100        if self.used + total > N {
101            return Err(());
102        }
103        self.write_raw(data);
104        Ok(())
105    }
106
107    /// Push an item, discarding the oldest item(s) to make room if necessary.
108    ///
109    /// Returns `Err(())` only if the item is larger than the entire ring capacity.
110    #[allow(clippy::result_unit_err)]
111    pub fn push_overwriting(&mut self, data: &[u8]) -> Result<(), ()> {
112        let len = data.len();
113        if len > u16::MAX as usize {
114            return Err(());
115        }
116        let total = 2 + len;
117        if total > N {
118            return Err(());
119        }
120        while self.used + total > N {
121            self.discard_oldest();
122        }
123        self.write_raw(data);
124        Ok(())
125    }
126
127    /// Copy the oldest item into `buf` and return a slice of the written bytes.
128    ///
129    /// Returns `None` if the ring is empty or `buf` is smaller than [`oldest_len`][Self::oldest_len].
130    pub fn peek_into<'b>(&self, buf: &'b mut [u8]) -> Option<&'b [u8]> {
131        let len = self.oldest_len()?;
132        if buf.len() < len {
133            return None;
134        }
135        let mut pos = (self.read_pos + 2) % N;
136        for b in buf[..len].iter_mut() {
137            *b = self.buf[pos];
138            pos = (pos + 1) % N;
139        }
140        Some(&buf[..len])
141    }
142
143    /// Discard the oldest item without copying it. Does nothing if the ring is empty.
144    pub fn discard_oldest(&mut self) {
145        if let Some(len) = self.oldest_len() {
146            self.read_pos = (self.read_pos + 2 + len) % N;
147            self.used -= 2 + len;
148            self.item_count -= 1;
149        }
150    }
151
152    fn write_raw(&mut self, data: &[u8]) {
153        let len = data.len();
154        self.write_byte(len as u8);
155        self.write_byte((len >> 8) as u8);
156        for &b in data {
157            self.write_byte(b);
158        }
159        self.used += 2 + len;
160        self.item_count += 1;
161    }
162
163    fn write_byte(&mut self, b: u8) {
164        self.buf[self.write_pos] = b;
165        self.write_pos = (self.write_pos + 1) % N;
166    }
167}
168
169impl<const N: usize> Default for RamRing<N> {
170    fn default() -> Self {
171        Self::new()
172    }
173}
174
175// ── OverflowPolicy ────────────────────────────────────────────────────────────
176
/// Controls what happens when [`enqueue`][BufferedQueue::enqueue] is called on a full RAM ring.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum OverflowPolicy {
    /// Return `Err(())` — the caller decides whether to drop the item or drain first.
    ///
    /// The new item is *not* stored and the ring contents are left untouched.
    Err,
    /// Silently discard the oldest buffered item(s) to make room for the new one.
    ///
    /// The new item is always accepted as long as it physically fits in the ring
    /// (i.e. `data.len() + 2 <= RAM_BYTES`).
    DiscardOldest,
}
189
190// ── BufferedQueue ─────────────────────────────────────────────────────────────
191
/// A write-buffered queue that accepts items into a RAM ring and drains them to NOR flash.
///
/// ## Type parameters
///
/// - `S`: NOR flash driver implementing [`NorFlash`].
/// - `C`: Cache implementation from [`crate::cache`].
/// - `RAM_BYTES`: Capacity of the RAM ring buffer in bytes (includes 2-byte per-item overhead).
///
/// ## Usage pattern
///
/// ```ignore
/// // Fast path — called from a tight loop:
/// queue.enqueue(&sample, OverflowPolicy::Err)?;
///
/// // Slow path — called from a lower-priority task or on a timer:
/// queue.drain_all(&mut scratch, false).await?;
///
/// // Read path — flash items are popped first, then RAM items:
/// if let Some(data) = queue.pop(&mut buf).await? {
///     // process data
/// }
/// ```
///
/// For ISR-safe use, enable the `shared-ram-ring` feature and use [`SharedRamRing`] instead.
pub struct BufferedQueue<S: NorFlash, C: CacheImpl, const RAM_BYTES: usize> {
    // Persistent, flash-backed queue: the slow path that drains commit to.
    storage: QueueStorage<S, C>,
    // RAM staging ring holding items not yet committed to flash (lost on power-fail).
    ram: RamRing<RAM_BYTES>,
}
220
221impl<S: NorFlash, C: CacheImpl, const RAM_BYTES: usize> BufferedQueue<S, C, RAM_BYTES> {
222    /// Wrap an existing [`QueueStorage`] with a RAM ring buffer.
223    pub fn new(storage: QueueStorage<S, C>) -> Self {
224        Self {
225            storage,
226            ram: RamRing::new(),
227        }
228    }
229
230    /// Enqueue an item into the RAM ring buffer.
231    ///
232    /// This is **synchronous and never touches flash**. When the ring is full the behaviour
233    /// is determined by `policy`: return `Err(())` or evict the oldest item.
234    #[allow(clippy::result_unit_err)]
235    pub fn enqueue(&mut self, data: &[u8], policy: OverflowPolicy) -> Result<(), ()> {
236        match policy {
237            OverflowPolicy::Err => self.ram.push(data),
238            OverflowPolicy::DiscardOldest => self.ram.push_overwriting(data),
239        }
240    }
241
242    /// Drain one item from the RAM ring to flash.
243    ///
244    /// `scratch` is caller-provided temporary storage; it must be at least as large as the
245    /// oldest pending item.
246    ///
247    /// Returns `Ok(true)` if an item was committed to flash, `Ok(false)` if the ring was empty.
248    pub async fn drain_one(
249        &mut self,
250        scratch: &mut [u8],
251        allow_overwrite: bool,
252    ) -> Result<bool, Error<S::Error>> {
253        let Some(data) = self.ram.peek_into(scratch) else {
254            return Ok(false);
255        };
256        let len = data.len();
257        self.storage.push(&scratch[..len], allow_overwrite).await?;
258        self.ram.discard_oldest();
259        Ok(true)
260    }
261
262    /// Drain all RAM-buffered items to flash.
263    ///
264    /// `scratch` must be large enough for the largest pending item.
265    pub async fn drain_all(
266        &mut self,
267        scratch: &mut [u8],
268        allow_overwrite: bool,
269    ) -> Result<(), Error<S::Error>> {
270        while self.drain_one(scratch, allow_overwrite).await? {}
271        Ok(())
272    }
273
274    /// Pop the oldest item from the queue.
275    ///
276    /// Flash items are always older than RAM items, so flash is read first. If flash is
277    /// empty the oldest item is taken directly from the RAM ring without writing to flash.
278    pub async fn pop<'d>(
279        &mut self,
280        data_buffer: &'d mut [u8],
281    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>>
282    where
283        S: MultiwriteNorFlash,
284    {
285        // Reborrow so we can reuse data_buffer if flash returns None.
286        let flash_len = self.storage.pop(&mut *data_buffer).await?.map(|s| s.len());
287        if let Some(len) = flash_len {
288            return Ok(Some(&mut data_buffer[..len]));
289        }
290        let len = self.ram.peek_into(data_buffer).map(|s| s.len());
291        if let Some(len) = len {
292            self.ram.discard_oldest();
293            return Ok(Some(&mut data_buffer[..len]));
294        }
295        Ok(None)
296    }
297
298    /// Peek at the oldest item without removing it.
299    ///
300    /// Flash items are always older than RAM items, so flash is read first. If flash is
301    /// empty the oldest item is read directly from the RAM ring without writing to flash.
302    pub async fn peek<'d>(
303        &mut self,
304        data_buffer: &'d mut [u8],
305    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>>
306    where
307        S: MultiwriteNorFlash,
308    {
309        // Reborrow so we can reuse data_buffer if flash returns None.
310        let flash_len = self.storage.peek(&mut *data_buffer).await?.map(|s| s.len());
311        if let Some(len) = flash_len {
312            return Ok(Some(&mut data_buffer[..len]));
313        }
314        let len = self.ram.peek_into(data_buffer).map(|s| s.len());
315        if let Some(len) = len {
316            return Ok(Some(&mut data_buffer[..len]));
317        }
318        Ok(None)
319    }
320
321    /// Total capacity of the RAM ring in bytes (including 2-byte per-item length prefixes).
322    pub const fn ram_capacity_bytes() -> usize {
323        RAM_BYTES
324    }
325
326    /// Free bytes remaining in the RAM ring.
327    pub fn ram_free_bytes(&self) -> usize {
328        RAM_BYTES - self.ram.bytes_used()
329    }
330
331    /// Number of items currently buffered in RAM (not yet committed to flash).
332    pub fn ram_pending_count(&self) -> usize {
333        self.ram.len()
334    }
335
336    /// Bytes currently occupied in the RAM ring (including 2-byte per-item length prefixes).
337    pub fn ram_bytes_used(&self) -> usize {
338        self.ram.bytes_used()
339    }
340
341    /// Consume this [`BufferedQueue`] and return the inner [`QueueStorage`].
342    ///
343    /// **Any items still in the RAM ring are discarded.**
344    pub fn into_storage(self) -> QueueStorage<S, C> {
345        self.storage
346    }
347}
348
349// ── SharedRamRing ─────────────────────────────────────────────────────────────
350
/// An ISR-safe RAM ring buffer with an Embassy [`Signal`][embassy_sync::signal::Signal] that
/// wakes a drain task on enqueue.
///
/// Designed to be placed in a `static`:
/// ```ignore
/// static RING: SharedRamRing<256> = SharedRamRing::new();
/// ```
///
/// The [`enqueue`][SharedRamRing::enqueue] method is synchronous and interrupt-safe.
/// Drain and read methods (`drain_one`, `drain_all`, `pop`, `peek`) are `async` and intended
/// for task context; they take a `&mut QueueStorage` so flash access remains exclusive to
/// one task.
///
/// ## Typical wiring
///
/// ```ignore
/// static RING: SharedRamRing<256> = SharedRamRing::new();
///
/// // In an interrupt handler (or anywhere, no async needed):
/// RING.enqueue(&sensor_sample, OverflowPolicy::DiscardOldest);
///
/// // In the drain task:
/// #[embassy_executor::task]
/// async fn drain(mut storage: QueueStorage<Flash, NoCache>) {
///     let mut scratch = [0u8; 64];
///     loop {
///         RING.wait_and_drain_all(&mut storage, &mut scratch, false).await.unwrap();
///     }
/// }
/// ```
#[cfg(feature = "shared-ram-ring")]
pub struct SharedRamRing<const N: usize> {
    // The ring itself, behind a critical-section mutex so `enqueue` can run in ISRs.
    ring: embassy_sync::blocking_mutex::Mutex<
        embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex,
        core::cell::RefCell<RamRing<N>>,
    >,
    // Raised by `enqueue` on success; drain tasks block on it in `wait`.
    signal: embassy_sync::signal::Signal<
        embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex,
        (),
    >,
}
392
#[cfg(feature = "shared-ram-ring")]
impl<const N: usize> SharedRamRing<N> {
    /// Create a new `SharedRamRing`. Suitable for `static` initialisation.
    pub const fn new() -> Self {
        Self {
            ring: embassy_sync::blocking_mutex::Mutex::new(
                core::cell::RefCell::new(RamRing::new()),
            ),
            signal: embassy_sync::signal::Signal::new(),
        }
    }

    // ── Producer API (sync, ISR-safe) ─────────────────────────────────────────

    /// Enqueue an item. Safe to call from any context, including interrupt handlers.
    ///
    /// Signals the drain task after a successful enqueue so it wakes without polling.
    /// Returns `Err(())` if the ring is full and `policy` is [`OverflowPolicy::Err`].
    #[allow(clippy::result_unit_err)]
    pub fn enqueue(&self, data: &[u8], policy: OverflowPolicy) -> Result<(), ()> {
        let result = self.ring.lock(|r| match policy {
            OverflowPolicy::Err => r.borrow_mut().push(data),
            OverflowPolicy::DiscardOldest => r.borrow_mut().push_overwriting(data),
        });
        // Only wake the drain task when something was actually stored.
        if result.is_ok() {
            self.signal.signal(());
        }
        result
    }

    // ── Task-context API (async) ──────────────────────────────────────────────

    /// Wait until at least one item has been enqueued since the last `wait`.
    pub async fn wait(&self) {
        self.signal.wait().await;
    }

    /// Drain one item from the ring to flash.
    ///
    /// `scratch` must be at least [`oldest_ram_item_len`][Self::oldest_ram_item_len] bytes.
    /// Returns `Ok(true)` if an item was written, `Ok(false)` if the ring was empty.
    ///
    /// The critical section is held only for the brief ring peek/discard; the slow flash
    /// write runs outside it.
    pub async fn drain_one<S: NorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        scratch: &mut [u8],
        allow_overwrite: bool,
    ) -> Result<bool, Error<S::Error>> {
        // `peek_into` returns None both when the ring is empty AND when `scratch` is
        // smaller than the oldest item; either way we report Ok(false) here, so an
        // undersized `scratch` silently prevents items from ever draining. Size it
        // via `oldest_ram_item_len`.
        let len = self
            .ring
            .lock(|r| r.borrow().peek_into(scratch).map(|s| s.len()));
        let Some(len) = len else {
            return Ok(false);
        };
        storage.push(&scratch[..len], allow_overwrite).await?;
        // NOTE(review): between the peek above and this discard, a concurrent
        // `enqueue` with `OverflowPolicy::DiscardOldest` may have evicted the head
        // item to make room. `discard_oldest` would then drop a *different*, never-
        // flashed item while the peeked one was already committed — losing data.
        // Confirm whether producers use `DiscardOldest` concurrently with draining.
        self.ring.lock(|r| r.borrow_mut().discard_oldest());
        Ok(true)
    }

    /// Drain all ring items to flash.
    ///
    /// `scratch` must be large enough for the largest pending item.
    pub async fn drain_all<S: NorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        scratch: &mut [u8],
        allow_overwrite: bool,
    ) -> Result<(), Error<S::Error>> {
        while self.drain_one(storage, scratch, allow_overwrite).await? {}
        Ok(())
    }

    /// Wait for the signal, then drain all ring items to flash.
    ///
    /// This is the recommended drain-task body:
    /// ```ignore
    /// loop {
    ///     ring.wait_and_drain_all(&mut storage, &mut scratch, false).await.unwrap();
    /// }
    /// ```
    pub async fn wait_and_drain_all<S: NorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        scratch: &mut [u8],
        allow_overwrite: bool,
    ) -> Result<(), Error<S::Error>> {
        self.wait().await;
        self.drain_all(storage, scratch, allow_overwrite).await
    }

    /// Pop the oldest item (drains ring to flash first to preserve ordering).
    pub async fn pop<'d, S: MultiwriteNorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        data_buffer: &'d mut [u8],
        allow_overwrite: bool,
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>> {
        // Draining first keeps FIFO order: flash always holds the older items.
        // `data_buffer` doubles as the drain scratch, so it must also fit the
        // largest pending RAM item.
        if self.ram_pending_count() > 0 {
            self.drain_all(storage, data_buffer, allow_overwrite)
                .await?;
        }
        storage.pop(data_buffer).await
    }

    /// Peek at the oldest item without removing it (drains ring to flash first).
    pub async fn peek<'d, S: MultiwriteNorFlash, C: CacheImpl>(
        &self,
        storage: &mut QueueStorage<S, C>,
        data_buffer: &'d mut [u8],
        allow_overwrite: bool,
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>> {
        // Same ordering rationale as `pop`; `data_buffer` doubles as drain scratch.
        if self.ram_pending_count() > 0 {
            self.drain_all(storage, data_buffer, allow_overwrite)
                .await?;
        }
        storage.peek(data_buffer).await
    }

    // ── Introspection ─────────────────────────────────────────────────────────

    /// Total capacity of the ring in bytes (including 2-byte per-item length prefixes).
    pub const fn ram_capacity_bytes() -> usize {
        N
    }

    /// Free bytes remaining in the ring.
    pub fn ram_free_bytes(&self) -> usize {
        self.ring.lock(|r| N - r.borrow().bytes_used())
    }

    /// Number of items currently buffered in the ring.
    pub fn ram_pending_count(&self) -> usize {
        self.ring.lock(|r| r.borrow().len())
    }

    /// Byte length of the oldest item in the ring, or `None` if the ring is empty.
    pub fn oldest_ram_item_len(&self) -> Option<usize> {
        self.ring.lock(|r| r.borrow().oldest_len())
    }
}
535
#[cfg(feature = "shared-ram-ring")]
impl<const N: usize> Default for SharedRamRing<N> {
    /// Equivalent to [`SharedRamRing::new`]: an empty ring with no signal pending.
    fn default() -> Self {
        SharedRamRing::new()
    }
}
542
543// ── Unit tests ────────────────────────────────────────────────────────────────
544
#[cfg(test)]
mod tests {
    use super::*;

    // `RamRing` is private to this module, so its unit tests live here.

    #[test]
    fn push_peek_discard() {
        let mut ring: RamRing<64> = RamRing::new();
        assert!(ring.is_empty());

        ring.push(b"hello").unwrap();
        ring.push(b"world").unwrap();
        assert_eq!(ring.len(), 2);
        assert_eq!(ring.oldest_len(), Some(5));

        let mut buf = [0u8; 32];
        assert_eq!(ring.peek_into(&mut buf), Some(b"hello".as_ref()));
        assert_eq!(ring.len(), 2); // peek does not consume

        ring.discard_oldest();
        assert_eq!(ring.len(), 1);
        assert_eq!(ring.peek_into(&mut buf), Some(b"world".as_ref()));

        ring.discard_oldest();
        assert!(ring.is_empty());
        assert_eq!(ring.peek_into(&mut buf), None);
    }

    #[test]
    fn wrap_around() {
        // Use a tight buffer: 2 items of 3 bytes each = 2*(2+3)=10 bytes
        let mut ring: RamRing<10> = RamRing::new();
        ring.push(b"abc").unwrap();
        ring.push(b"def").unwrap();
        // Full — no room for another
        assert!(ring.push(b"x").is_err());

        let mut buf = [0u8; 8];
        ring.discard_oldest();
        ring.push(b"ghi").unwrap(); // wraps around
        assert_eq!(ring.peek_into(&mut buf), Some(b"def".as_ref()));
        ring.discard_oldest();
        assert_eq!(ring.peek_into(&mut buf), Some(b"ghi".as_ref()));
    }

    #[test]
    fn push_overwriting_evicts_oldest() {
        // 10-byte ring fits exactly 2 items of 3 bytes (2*(2+3)=10)
        let mut ring: RamRing<10> = RamRing::new();
        ring.push(b"aaa").unwrap();
        ring.push(b"bbb").unwrap();

        // Overwriting push evicts "aaa" to make room for "ccc"
        ring.push_overwriting(b"ccc").unwrap();
        assert_eq!(ring.len(), 2);

        let mut buf = [0u8; 8];
        assert_eq!(ring.peek_into(&mut buf), Some(b"bbb".as_ref()));
        ring.discard_oldest();
        assert_eq!(ring.peek_into(&mut buf), Some(b"ccc".as_ref()));
    }

    // ── Integration tests (require mock flash via _test feature) ─────────────

    #[cfg(feature = "_test")]
    mod integration {
        use super::*;
        use crate::cache::NoCache;
        use crate::mock_flash::MockFlashBase;
        use crate::queue::{QueueConfig, QueueStorage};
        use futures::executor::block_on;

        // 4 pages × 64 words × 4 bytes/word = 1 KiB flash
        type MockFlash = MockFlashBase<4, 4, 64>;

        // Fresh mock-flash-backed storage for each test (no shared state).
        fn make_storage() -> QueueStorage<MockFlash, NoCache> {
            let flash = MockFlash::new(crate::mock_flash::WriteCountCheck::Twice, None, true);
            let config = QueueConfig::new(MockFlash::FULL_FLASH_RANGE);
            QueueStorage::new(flash, config, NoCache::new())
        }

        // Default test queue: 256-byte RAM ring over a fresh mock storage.
        fn make_queue() -> BufferedQueue<MockFlash, NoCache, 256> {
            BufferedQueue::new(make_storage())
        }

        #[test]
        fn enqueue_drain_pop() {
            block_on(async {
                let mut queue = make_queue();
                let mut scratch = [0u8; 64];
                let mut out = [0u8; 64];

                // Enqueue two items into RAM — no flash I/O yet.
                queue.enqueue(b"hello", OverflowPolicy::Err).unwrap();
                queue.enqueue(b"world", OverflowPolicy::Err).unwrap();
                assert_eq!(queue.ram_pending_count(), 2);

                // Drain RAM → flash.
                queue.drain_all(&mut scratch, false).await.unwrap();
                assert_eq!(queue.ram_pending_count(), 0);

                // Pop from flash in FIFO order.
                let data = queue.pop(&mut out).await.unwrap().unwrap();
                assert_eq!(data, b"hello");

                let data = queue.pop(&mut out).await.unwrap().unwrap();
                assert_eq!(data, b"world");

                assert!(queue.pop(&mut out).await.unwrap().is_none());
            });
        }

        #[test]
        fn pop_reads_flash_before_ram() {
            block_on(async {
                // Push "first" directly to flash, then wrap in a BufferedQueue with "second" in RAM.
                // The mock flash requires write buffers aligned to BYTES_PER_WORD = 4 bytes.
                let mut storage = make_storage();
                let mut aligned = [0u8; 8];
                aligned[..5].copy_from_slice(b"first");
                storage.push(&aligned[..5], false).await.unwrap();

                let mut queue: BufferedQueue<MockFlash, NoCache, 256> = BufferedQueue::new(storage);
                let mut out = [0u8; 64];

                // Buffer "second" in RAM only.
                queue.enqueue(b"second", OverflowPolicy::Err).unwrap();

                // pop returns flash items (older) before RAM items.
                let data = queue.pop(&mut out).await.unwrap().unwrap();
                assert_eq!(data, b"first");

                let data = queue.pop(&mut out).await.unwrap().unwrap();
                assert_eq!(data, b"second");

                assert!(queue.pop(&mut out).await.unwrap().is_none());
            });
        }

        #[test]
        fn overflow_policy_err() {
            // 16-byte ring: each item costs 2 (prefix) + data.len() bytes.
            // 3 items of 4 bytes = 3*6 = 18 bytes — won't all fit.
            let flash = MockFlash::new(crate::mock_flash::WriteCountCheck::Twice, None, true);
            let config = QueueConfig::new(MockFlash::FULL_FLASH_RANGE);
            let storage = QueueStorage::new(flash, config, NoCache::new());
            let mut queue: BufferedQueue<MockFlash, NoCache, 16> = BufferedQueue::new(storage);

            queue.enqueue(b"aaaa", OverflowPolicy::Err).unwrap(); // 6 bytes used
            queue.enqueue(b"bbbb", OverflowPolicy::Err).unwrap(); // 12 bytes used
            assert!(queue.enqueue(b"cccc", OverflowPolicy::Err).is_err()); // 18 > 16
        }

        #[test]
        fn overflow_policy_discard_oldest() {
            let flash = MockFlash::new(crate::mock_flash::WriteCountCheck::Twice, None, true);
            let config = QueueConfig::new(MockFlash::FULL_FLASH_RANGE);
            let storage = QueueStorage::new(flash, config, NoCache::new());
            let mut queue: BufferedQueue<MockFlash, NoCache, 16> = BufferedQueue::new(storage);

            queue.enqueue(b"aaaa", OverflowPolicy::Err).unwrap();
            queue.enqueue(b"bbbb", OverflowPolicy::Err).unwrap();
            // "aaaa" is evicted to make room for "cccc"
            queue
                .enqueue(b"cccc", OverflowPolicy::DiscardOldest)
                .unwrap();

            assert_eq!(queue.ram_pending_count(), 2);

            // Drain to flash and pop to verify FIFO order with "aaaa" evicted.
            block_on(async {
                let mut out = [0u8; 64];
                let data = queue.pop(&mut out).await.unwrap().unwrap();
                assert_eq!(data, b"bbbb");
                let data = queue.pop(&mut out).await.unwrap().unwrap();
                assert_eq!(data, b"cccc");
            });
        }

        #[test]
        fn capacity_helpers() {
            let queue = make_queue();
            assert_eq!(
                BufferedQueue::<MockFlash, NoCache, 256>::ram_capacity_bytes(),
                256
            );
            assert_eq!(queue.ram_free_bytes(), 256);
            assert_eq!(queue.ram_bytes_used(), 0);
        }
    }
}