sequential_storage/
queue.rs

//! Implementation of the queue logic.

use crate::item::{Item, ItemHeader, ItemHeaderIter};

use self::{cache::CacheImpl, item::ItemUnborrowed};

use super::{
    Debug, Deref, DerefMut, Error, GenericStorage, MAX_WORD_SIZE, NorFlash, NorFlashExt, PageState,
    PhantomData, Range, cache, calculate_page_address, calculate_page_end_address,
    calculate_page_index, calculate_page_size, item, run_with_auto_repair,
};
use embedded_storage_async::nor_flash::MultiwriteNorFlash;

/// Configuration for a queue
pub struct QueueConfig<S> {
    flash_range: Range<u32>,
    _phantom: PhantomData<S>,
}

impl<S: NorFlash> QueueConfig<S> {
    /// Create a new queue configuration. Will panic if the configuration is invalid.
    /// If you want a fallible version, use [`Self::try_new`].
    #[must_use]
    pub const fn new(flash_range: Range<u32>) -> Self {
        Self::try_new(flash_range).expect("Queue config must be correct")
    }

    /// Create a new queue configuration. Will return `None` if the configuration is invalid.
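    ///
    /// A quick sketch of the validation rules, reusing the mock flash from the
    /// [`QueueStorage`] example (4096-byte erase size); the exact addresses are
    /// illustrative:
    ///
    /// ```rust
    /// # use sequential_storage::queue::QueueConfig;
    /// # use mock_flash::MockFlashBase;
    /// # type Flash = MockFlashBase<10, 1, 4096>;
    /// # mod mock_flash {
    /// #   include!("mock_flash.rs");
    /// # }
    /// // Aligned to the erase size and at least one page long: accepted.
    /// assert!(QueueConfig::<Flash>::try_new(0x1000..0x3000).is_some());
    /// // Start address not aligned to the erase size: rejected.
    /// assert!(QueueConfig::<Flash>::try_new(0x1001..0x3000).is_none());
    /// // Smaller than a single page: rejected.
    /// assert!(QueueConfig::<Flash>::try_new(0x1000..0x1800).is_none());
    /// ```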
    #[must_use]
    pub const fn try_new(flash_range: Range<u32>) -> Option<Self> {
        if !flash_range.start.is_multiple_of(S::ERASE_SIZE as u32) {
            return None;
        }
        if !flash_range.end.is_multiple_of(S::ERASE_SIZE as u32) {
            return None;
        }
        // At least 1 page is used
        if flash_range.end - flash_range.start < (S::ERASE_SIZE as u32) {
            return None;
        }

        if S::ERASE_SIZE < S::WORD_SIZE * 4 {
            return None;
        }
        if S::WORD_SIZE > MAX_WORD_SIZE {
            return None;
        }

        Some(Self {
            flash_range,
            _phantom: PhantomData,
        })
    }
}

/// A FIFO queue storage
///
/// Use [`Self::push`] to add data to the queue and use [`Self::peek`] and [`Self::pop`] to get the data back.
///
/// ## Basic API
///
/// ```rust
/// # use sequential_storage::cache::NoCache;
/// # use sequential_storage::queue::{QueueConfig, QueueStorage};
/// # use mock_flash::MockFlashBase;
/// # use futures::executor::block_on;
/// # type Flash = MockFlashBase<10, 1, 4096>;
/// # mod mock_flash {
/// #   include!("mock_flash.rs");
/// # }
/// #
/// # fn init_flash() -> Flash {
/// #     Flash::new(mock_flash::WriteCountCheck::Twice, None, false)
/// # }
/// #
/// # block_on(async {
///
/// // Initialize the flash. This can be internal or external
/// let mut flash = init_flash();
///
/// let mut storage = QueueStorage::new(flash, const { QueueConfig::new(0x1000..0x3000) }, NoCache::new());
/// // We need to give the crate a buffer to work with.
/// // It must be big enough to serialize the biggest value of your storage type into.
/// let mut data_buffer = [0; 128];
///
/// let my_data = [10, 47, 29];
///
/// // We can push some data to the queue
/// storage.push(&my_data, false).await.unwrap();
///
/// // We can peek at the oldest data
///
/// assert_eq!(
///     &storage.peek(&mut data_buffer).await.unwrap().unwrap()[..],
///     &my_data[..]
/// );
///
/// // With popping we get back the oldest data, but that data is now also removed
///
/// assert_eq!(
///     &storage.pop(&mut data_buffer).await.unwrap().unwrap()[..],
///     &my_data[..]
/// );
///
/// // If we pop again, we find there's no data anymore
///
/// assert_eq!(
///     storage.pop(&mut data_buffer).await,
///     Ok(None)
/// );
/// # });
/// ```
pub struct QueueStorage<S: NorFlash, C: CacheImpl> {
    inner: GenericStorage<S, C>,
}

impl<S: NorFlash, C: CacheImpl> QueueStorage<S, C> {
    /// Create a new (FIFO) queue instance
    ///
    /// The provided cache instance must be new or must be in the exact correct state for the current flash contents.
    /// If the cache is bad, undesirable things will happen.
    /// So, it's ok to reuse the cache returned by the [`Self::destroy`] method when the flash hasn't changed since calling destroy.
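    ///
    /// A minimal sketch of that round trip, reusing the mock-flash setup from the
    /// [`QueueStorage`] example (the addresses are illustrative):
    ///
    /// ```rust
    /// # use sequential_storage::cache::NoCache;
    /// # use sequential_storage::queue::{QueueConfig, QueueStorage};
    /// # use mock_flash::MockFlashBase;
    /// # use futures::executor::block_on;
    /// # type Flash = MockFlashBase<10, 1, 4096>;
    /// # mod mock_flash {
    /// #   include!("mock_flash.rs");
    /// # }
    /// # block_on(async {
    /// # let flash = Flash::new(mock_flash::WriteCountCheck::Twice, None, false);
    /// let mut storage = QueueStorage::new(flash, const { QueueConfig::new(0x1000..0x3000) }, NoCache::new());
    /// storage.push(&[1, 2, 3], false).await.unwrap();
    ///
    /// // Take the flash and cache back...
    /// let (flash, cache) = storage.destroy();
    ///
    /// // ...and rebuild over the same range. The cache is still in the correct
    /// // state because the flash wasn't touched in between.
    /// let mut storage = QueueStorage::new(flash, const { QueueConfig::new(0x1000..0x3000) }, cache);
    /// let mut data_buffer = [0; 32];
    /// assert!(storage.peek(&mut data_buffer).await.unwrap().is_some());
    /// # });
    /// ```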
    pub const fn new(storage: S, config: QueueConfig<S>, cache: C) -> Self {
        Self {
            inner: GenericStorage {
                flash: storage,
                flash_range: config.flash_range,
                cache,
            },
        }
    }

    /// Push data into the queue.
    /// The data can only be taken out with the [`Self::pop`] function.
    ///
    /// Old data will not be overwritten unless `allow_overwrite_old_data` is true.
    /// If it is and the queue is full, the oldest data is removed to make space for the new data.
    ///
    /// *Note: If a page is already used and you push more data than the remaining capacity of the page,
    /// the entire remaining capacity will go unused because the data is stored on the next page.*
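    ///
    /// A small sketch of the overwrite behavior, reusing the mock-flash setup from
    /// the [`QueueStorage`] example (item sizes are illustrative):
    ///
    /// ```rust
    /// # use sequential_storage::cache::NoCache;
    /// # use sequential_storage::queue::{QueueConfig, QueueStorage};
    /// # use mock_flash::MockFlashBase;
    /// # use futures::executor::block_on;
    /// # type Flash = MockFlashBase<10, 1, 4096>;
    /// # mod mock_flash {
    /// #   include!("mock_flash.rs");
    /// # }
    /// # block_on(async {
    /// # let flash = Flash::new(mock_flash::WriteCountCheck::Twice, None, false);
    /// let mut storage = QueueStorage::new(flash, const { QueueConfig::new(0x1000..0x3000) }, NoCache::new());
    ///
    /// // Without overwriting, pushes start failing with `Error::FullStorage`
    /// // once the queue is full.
    /// while storage.push(&[0xAB; 16], false).await.is_ok() {}
    ///
    /// // With `allow_overwrite_old_data`, the oldest page is sacrificed to make room.
    /// storage.push(&[0xCD; 16], true).await.unwrap();
    /// # });
    /// ```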
    pub async fn push(
        &mut self,
        data: &[u8],
        allow_overwrite_old_data: bool,
    ) -> Result<(), Error<S::Error>> {
        run_with_auto_repair!(
            function = self.push_inner(data, allow_overwrite_old_data).await,
            repair = self.try_repair().await?
        )
    }

    async fn push_inner(
        &mut self,
        data: &[u8],
        allow_overwrite_old_data: bool,
    ) -> Result<(), Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        // Data must fit in a single page
        if data.len() > u16::MAX as usize
            || data.len()
                > calculate_page_size::<S>()
                    .saturating_sub(ItemHeader::data_address::<S>(0) as usize)
        {
            self.inner.cache.unmark_dirty();
            return Err(Error::ItemTooBig);
        }

        let current_page = self.find_youngest_page().await?;

        let page_data_start_address =
            calculate_page_address::<S>(self.flash_range(), current_page) + S::WORD_SIZE as u32;
        let page_data_end_address =
            calculate_page_end_address::<S>(self.flash_range(), current_page) - S::WORD_SIZE as u32;

        self.inner.partial_close_page(current_page).await?;

        // Find the last item on the page so we know where we need to write
        let mut next_address = self
            .inner
            .find_next_free_item_spot(
                page_data_start_address,
                page_data_end_address,
                data.len() as u32,
            )
            .await?;

        if next_address.is_none() {
            // No capacity left on this page, move to the next page
            let next_page = self.inner.next_page(current_page);
            let next_page_state = self.inner.get_page_state(next_page).await?;
            let single_page = next_page == current_page;

            match (next_page_state, single_page) {
                (PageState::Open, _) => {
                    self.inner.close_page(current_page).await?;
                    self.inner.partial_close_page(next_page).await?;
                    next_address = Some(
                        calculate_page_address::<S>(self.flash_range(), next_page)
                            + S::WORD_SIZE as u32,
                    );
                }
                (PageState::Closed, _) | (PageState::PartialOpen, true) => {
                    let next_page_data_start_address =
                        calculate_page_address::<S>(self.flash_range(), next_page)
                            + S::WORD_SIZE as u32;

                    if !allow_overwrite_old_data
                        && !self
                            .inner
                            .is_page_empty(next_page, Some(next_page_state))
                            .await?
                    {
                        self.inner.cache.unmark_dirty();
                        return Err(Error::FullStorage);
                    }

                    self.inner.open_page(next_page).await?;
                    if !single_page {
                        self.inner.close_page(current_page).await?;
                    }
                    self.inner.partial_close_page(next_page).await?;
                    next_address = Some(next_page_data_start_address);
                }
                (PageState::PartialOpen, false) => {
                    // This should never happen
                    return Err(Error::Corrupted {
                        #[cfg(feature = "_test")]
                        backtrace: std::backtrace::Backtrace::capture(),
                    });
                }
            }
        }

        Item::write_new(
            &mut self.inner.flash,
            self.inner.flash_range.clone(),
            &mut self.inner.cache,
            next_address.unwrap(),
            data,
        )
        .await?;

        self.inner.cache.unmark_dirty();
        Ok(())
    }

    /// Get an iterator-like interface to iterate over the items stored in the queue.
    /// This goes from oldest to newest.
    ///
    /// The iteration happens non-destructively; in other words, it peeks at every item.
    /// The returned entry has a [`QueueIteratorEntry::pop`] function with which you can decide to pop the item
    /// after you've seen its contents.
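    ///
    /// A small sketch of selectively popping while iterating, reusing the
    /// mock-flash setup from the [`QueueStorage`] example (the data values are
    /// illustrative):
    ///
    /// ```rust
    /// # use sequential_storage::cache::NoCache;
    /// # use sequential_storage::queue::{QueueConfig, QueueStorage};
    /// # use mock_flash::MockFlashBase;
    /// # use futures::executor::block_on;
    /// # type Flash = MockFlashBase<10, 1, 4096>;
    /// # mod mock_flash {
    /// #   include!("mock_flash.rs");
    /// # }
    /// # block_on(async {
    /// # let flash = Flash::new(mock_flash::WriteCountCheck::Twice, None, false);
    /// let mut storage = QueueStorage::new(flash, const { QueueConfig::new(0x1000..0x3000) }, NoCache::new());
    /// let mut data_buffer = [0; 128];
    ///
    /// storage.push(&[1, 1, 1], false).await.unwrap();
    /// storage.push(&[2, 2, 2], false).await.unwrap();
    ///
    /// let mut iterator = storage.iter().await.unwrap();
    /// while let Some(entry) = iterator.next(&mut data_buffer).await.unwrap() {
    ///     // Entries deref to the stored bytes; pop only what we want removed.
    ///     if entry[0] == 1 {
    ///         entry.pop().await.unwrap();
    ///     }
    /// }
    /// # });
    /// ```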
    pub async fn iter(&mut self) -> Result<QueueIterator<'_, S, C>, Error<S::Error>> {
        // Note: Corruption repair is done in these functions already
        QueueIterator::new(self).await
    }

    /// Peek at the oldest data.
    ///
    /// If you also want to remove the data, use [`Self::pop`].
    ///
    /// The data is written to the given `data_buffer` and the part that was written is returned.
    /// It is valid to use only the length of the returned slice and read the data from the original `data_buffer`.
    /// The `data_buffer` may contain extra data in the ranges after the returned slice.
    /// You should not depend on that data.
    ///
    /// If the data buffer is not big enough, an error is returned.
    pub async fn peek<'d>(
        &mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>> {
        // Note: Corruption repair is done in these functions already
        let mut iterator = self.iter().await?;

        let next_value = iterator.next(data_buffer).await?;

        match next_value {
            Some(entry) => Ok(Some(entry.into_buf())),
            None => Ok(None),
        }
    }

    /// Pop the oldest data from the queue.
    ///
    /// If you don't want to remove the data, use [`Self::peek`].
    ///
    /// The data is written to the given `data_buffer` and the part that was written is returned.
    /// It is valid to use only the length of the returned slice and read the data from the original `data_buffer`.
    /// The `data_buffer` may contain extra data in the ranges after the returned slice.
    /// You should not depend on that data.
    ///
    /// If the data buffer is not big enough, an error is returned.
    pub async fn pop<'d>(
        &mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>>
    where
        S: MultiwriteNorFlash,
    {
        let mut iterator = self.iter().await?;

        let next_value = iterator.next(data_buffer).await?;

        match next_value {
            Some(entry) => Ok(Some(entry.pop().await?)),
            None => Ok(None),
        }
    }

    /// Find the largest size of data that can be stored.
    ///
    /// This will read through the entire flash to find the largest chunk of
    /// data that can be stored, taking the alignment requirements of the item into account.
    ///
    /// If there is no space left, `None` is returned.
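    ///
    /// A minimal sketch of checking the fit before pushing, reusing the mock-flash
    /// setup from the [`QueueStorage`] example (the payload size is illustrative):
    ///
    /// ```rust
    /// # use sequential_storage::cache::NoCache;
    /// # use sequential_storage::queue::{QueueConfig, QueueStorage};
    /// # use mock_flash::MockFlashBase;
    /// # use futures::executor::block_on;
    /// # type Flash = MockFlashBase<10, 1, 4096>;
    /// # mod mock_flash {
    /// #   include!("mock_flash.rs");
    /// # }
    /// # block_on(async {
    /// # let flash = Flash::new(mock_flash::WriteCountCheck::Twice, None, false);
    /// let mut storage = QueueStorage::new(flash, const { QueueConfig::new(0x1000..0x3000) }, NoCache::new());
    ///
    /// let payload = [0xAA; 100];
    /// match storage.find_max_fit().await.unwrap() {
    ///     Some(max) if max as usize >= payload.len() => {
    ///         storage.push(&payload, false).await.unwrap();
    ///     }
    ///     _ => { /* Not enough contiguous space; pop something first. */ }
    /// }
    /// # });
    /// ```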
    pub async fn find_max_fit(&mut self) -> Result<Option<u32>, Error<S::Error>> {
        run_with_auto_repair!(
            function = self.find_max_fit_inner().await,
            repair = self.try_repair().await?
        )
    }

    async fn find_max_fit_inner(&mut self) -> Result<Option<u32>, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let current_page = self.find_youngest_page().await?;

        // Check if we have space on the next page
        let next_page = self.inner.next_page(current_page);
        match self.inner.get_page_state(next_page).await? {
            state @ PageState::Closed => {
                if self.inner.is_page_empty(next_page, Some(state)).await? {
                    self.inner.cache.unmark_dirty();
                    return Ok(Some((S::ERASE_SIZE - (2 * S::WORD_SIZE)) as u32));
                }
            }
            PageState::Open => {
                self.inner.cache.unmark_dirty();
                return Ok(Some((S::ERASE_SIZE - (2 * S::WORD_SIZE)) as u32));
            }
            PageState::PartialOpen => {
                // This should never happen
                return Err(Error::Corrupted {
                    #[cfg(feature = "_test")]
                    backtrace: std::backtrace::Backtrace::capture(),
                });
            }
        }

        // See how much space we can find in the current page.
        let page_data_start_address =
            calculate_page_address::<S>(self.flash_range(), current_page) + S::WORD_SIZE as u32;
        let page_data_end_address =
            calculate_page_end_address::<S>(self.flash_range(), current_page) - S::WORD_SIZE as u32;

        let next_item_address = match self.inner.cache.first_item_after_written(current_page) {
            Some(next_item_address) => next_item_address,
            None => {
                ItemHeaderIter::new(
                    self.inner
                        .cache
                        .first_item_after_erased(current_page)
                        .unwrap_or(page_data_start_address),
                    page_data_end_address,
                )
                .traverse(&mut self.inner.flash, |_, _| true)
                .await?
                .1
            }
        };

        self.inner.cache.unmark_dirty();
        Ok(ItemHeader::available_data_bytes::<S>(
            page_data_end_address - next_item_address,
        ))
    }

    /// Calculate how much space is left free in the queue (in bytes).
    ///
    /// The number given back is accurate; however, there are lots of things that add overhead and padding.
    /// Every push is an item with its own overhead. You can check the overhead per item with [`Self::item_overhead_size`].
    ///
    /// Furthermore, every item has to fully fit in a page. So if a page has 50 bytes left and you push an item of 60 bytes,
    /// the current page is closed and the item is stored on the next page, 'wasting' the 50 bytes you had.
    ///
    /// So unless you're tracking all this, the returned number should only be used as a rough indication.
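    ///
    /// A small sketch of that caveat, reusing the mock-flash setup from the
    /// [`QueueStorage`] example:
    ///
    /// ```rust
    /// # use sequential_storage::cache::NoCache;
    /// # use sequential_storage::queue::{QueueConfig, QueueStorage};
    /// # use mock_flash::MockFlashBase;
    /// # use futures::executor::block_on;
    /// # type Flash = MockFlashBase<10, 1, 4096>;
    /// # mod mock_flash {
    /// #   include!("mock_flash.rs");
    /// # }
    /// # block_on(async {
    /// # let flash = Flash::new(mock_flash::WriteCountCheck::Twice, None, false);
    /// let mut storage = QueueStorage::new(flash, const { QueueConfig::new(0x1000..0x3000) }, NoCache::new());
    ///
    /// let before = storage.space_left().await.unwrap();
    /// storage.push(&[0; 10], false).await.unwrap();
    /// let after = storage.space_left().await.unwrap();
    ///
    /// // The push consumed the 10 data bytes *plus* the item overhead and padding.
    /// assert!(before - after >= 10);
    /// # });
    /// ```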
    pub async fn space_left(&mut self) -> Result<u32, Error<S::Error>> {
        run_with_auto_repair!(
            function = self.space_left_inner().await,
            repair = self.try_repair().await?
        )
    }

    async fn space_left_inner(&mut self) -> Result<u32, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let mut total_free_space = 0;

        for page in self.inner.get_pages(0) {
            let state = self.inner.get_page_state(page).await?;
            let page_empty = self.inner.is_page_empty(page, Some(state)).await?;

            if state.is_closed() && !page_empty {
                continue;
            }

            // See how much space we can find in the current page.
            let page_data_start_address =
                calculate_page_address::<S>(self.flash_range(), page) + S::WORD_SIZE as u32;
            let page_data_end_address =
                calculate_page_end_address::<S>(self.flash_range(), page) - S::WORD_SIZE as u32;

            if page_empty {
                total_free_space += page_data_end_address - page_data_start_address;
                continue;
            }

            // Partially open page
            let next_item_address = match self.inner.cache.first_item_after_written(page) {
                Some(next_item_address) => next_item_address,
                None => {
                    ItemHeaderIter::new(
                        self.inner
                            .cache
                            .first_item_after_erased(page)
                            .unwrap_or(page_data_start_address),
                        page_data_end_address,
                    )
                    .traverse(&mut self.inner.flash, |_, _| true)
                    .await?
                    .1
                }
            };

            if ItemHeader::available_data_bytes::<S>(page_data_end_address - next_item_address)
                .is_none()
            {
                // No data fits on this partially open page anymore.
                // So if all data on this page is already erased, then this page might as well be counted as empty.
                // We can use [is_page_empty] and lie to it so it checks the items.
                if self
                    .inner
                    .is_page_empty(page, Some(PageState::Closed))
                    .await?
                {
                    total_free_space += page_data_end_address - page_data_start_address;
                    continue;
                }
            }

            total_free_space += page_data_end_address - next_item_address;
        }

        self.inner.cache.unmark_dirty();
        Ok(total_free_space)
    }

    async fn find_youngest_page(&mut self) -> Result<usize, Error<S::Error>> {
        let last_used_page = self
            .inner
            .find_first_page(0, PageState::PartialOpen)
            .await?;

        if let Some(last_used_page) = last_used_page {
            return Ok(last_used_page);
        }

        // We have no partially open page. Search for a closed page to anchor ourselves to
        let first_closed_page = self.inner.find_first_page(0, PageState::Closed).await?;

        let first_open_page = match first_closed_page {
            Some(anchor) => {
                // We have at least one closed page.
                // The first open page after it is the page we need to use.
                self.inner.find_first_page(anchor, PageState::Open).await?
            }
            None => {
                // No closed pages and no partially open pages, so all pages should be open.
                // Might as well start at page 0
                Some(0)
            }
        };

        if let Some(first_open_page) = first_open_page {
            return Ok(first_open_page);
        }

        // All pages are closed... This is not correct.
        Err(Error::Corrupted {
            #[cfg(feature = "_test")]
            backtrace: std::backtrace::Backtrace::capture(),
        })
    }

    async fn find_oldest_page(&mut self) -> Result<usize, Error<S::Error>> {
        let youngest_page = self.find_youngest_page().await?;

        // The oldest page is the first non-open page after the youngest page
        let oldest_closed_page = self
            .inner
            .find_first_page(youngest_page, PageState::Closed)
            .await?;

        Ok(oldest_closed_page.unwrap_or(youngest_page))
    }

    /// Try to repair the state of the flash to hopefully get everything back in working order.
    /// Care is taken that no data is lost, but this depends on correctly repairing the state and
    /// so is only best effort.
    ///
    /// This function might be called after a different function returned the [`Error::Corrupted`] error.
    /// There's no guarantee it will work.
    ///
    /// If this function, or the next call into this crate after it, returns [`Error::Corrupted`], then it's unlikely
    /// that the state can be recovered. To at least make everything function again at the cost of losing the data,
    /// erase the flash range.
    async fn try_repair(&mut self) -> Result<(), Error<S::Error>> {
        self.inner.cache.invalidate_cache_state();

        self.inner.try_general_repair().await?;
        Ok(())
    }

    async fn find_start_address(&mut self) -> Result<NextAddress, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let oldest_page = self.find_oldest_page().await?;

        // We start at the start of the oldest page
        let current_address = match self.inner.cache.first_item_after_erased(oldest_page) {
            Some(address) => address,
            None => {
                calculate_page_address::<S>(self.inner.flash_range.clone(), oldest_page)
                    + S::WORD_SIZE as u32
            }
        };

        Ok(NextAddress::Address(current_address))
    }

    /// Resets the flash in the entire given flash range.
    ///
    /// This is a thin helper that simply calls the flash's erase function.
    pub fn erase_all(&mut self) -> impl Future<Output = Result<(), Error<S::Error>>> {
        self.inner.erase_all()
    }

    /// Get the minimal overhead size per stored item for the given flash type.
    ///
    /// The associated data of each item is additionally padded to a full flash word size, but that's not part of this number.\
    /// This means the full item length is `returned number + (data length).next_multiple_of(S::WORD_SIZE)`.
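    ///
    /// A sketch of that formula, reusing the mock flash from the [`QueueStorage`]
    /// example (it has a 1-byte word size, so the padding term is a no-op here):
    ///
    /// ```rust
    /// # use sequential_storage::cache::NoCache;
    /// # use sequential_storage::queue::QueueStorage;
    /// # use mock_flash::MockFlashBase;
    /// # type Flash = MockFlashBase<10, 1, 4096>;
    /// # mod mock_flash {
    /// #   include!("mock_flash.rs");
    /// # }
    /// let data_len: u32 = 13;
    /// // The literal 1 stands in for this mock flash's word size.
    /// let full_item_len =
    ///     QueueStorage::<Flash, NoCache>::item_overhead_size() + data_len.next_multiple_of(1);
    /// assert!(full_item_len > data_len);
    /// ```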
    #[must_use]
    pub const fn item_overhead_size() -> u32 {
        GenericStorage::<S, C>::item_overhead_size()
    }

    /// Destroy the instance to get back the flash and the cache.
    ///
    /// The cache can be passed to a new storage instance, but only for the same flash region and if nothing has changed in flash.
    pub fn destroy(self) -> (S, C) {
        self.inner.destroy()
    }

    /// Get a mutable reference to the flash. Mutating the memory is at your own risk.
    pub const fn flash(&mut self) -> &mut S {
        self.inner.flash()
    }

    /// Get the flash range being used
    pub const fn flash_range(&self) -> Range<u32> {
        self.inner.flash_range()
    }

    #[cfg(any(test, feature = "std"))]
    /// Print all items in flash to the returned string
    ///
    /// This is meant as a debugging utility. The string format is not stable.
    pub fn print_items(&mut self) -> impl Future<Output = String> {
        self.inner.print_items()
    }
}

#[derive(PartialEq, Eq, Clone, Copy, Debug)]
enum PreviousItemStates {
    AllPopped,
    AllButCurrentPopped,
    Unpopped,
}

/// An iterator-like interface for peeking into data stored in flash with the option to pop it.
pub struct QueueIterator<'s, S: NorFlash, C: CacheImpl> {
    storage: &'s mut QueueStorage<S, C>,
    next_address: NextAddress,
    previous_item_states: PreviousItemStates,
    oldest_page: usize,
}

impl<S: NorFlash, C: CacheImpl> Debug for QueueIterator<'_, S, C> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("QueueIterator")
            .field("current_address", &self.next_address)
            .finish_non_exhaustive()
    }
}

#[derive(Debug, Clone, Copy)]
enum NextAddress {
    Address(u32),
    PageAfter(usize),
}

impl<'s, S: NorFlash, C: CacheImpl> QueueIterator<'s, S, C> {
    async fn new(storage: &'s mut QueueStorage<S, C>) -> Result<Self, Error<S::Error>> {
        let start_address = run_with_auto_repair!(
            function = storage.find_start_address().await,
            repair = storage.try_repair().await?
        )?;

        let oldest_page = match start_address {
            NextAddress::Address(address) => {
                calculate_page_index::<S>(storage.inner.flash_range.clone(), address)
            }
            NextAddress::PageAfter(index) => storage.inner.next_page(index),
        };

        Ok(Self {
            storage,
            next_address: start_address,
            previous_item_states: PreviousItemStates::AllPopped,
            oldest_page,
        })
    }

    /// Get the next entry.
    ///
    /// If there are no more entries, `None` is returned.
    ///
    /// The `data_buffer` has to be large enough to be able to hold the largest item in flash.
    pub async fn next<'d, 'q>(
        &'q mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<QueueIteratorEntry<'s, 'd, 'q, S, C>>, Error<S::Error>> {
        // We continue from a place where the current item wasn't popped
        // That means that from now on, the next item will have unpopped items behind it
        if self.previous_item_states == PreviousItemStates::AllButCurrentPopped {
            self.previous_item_states = PreviousItemStates::Unpopped;
        }

        let value = run_with_auto_repair!(
            function = self.next_inner(data_buffer).await,
            repair = self.storage.try_repair().await?
        );

        match value {
            Ok(Some((item, address))) => Ok(Some(QueueIteratorEntry {
                iter: self,
                item: item.reborrow(data_buffer).ok_or_else(|| Error::LogicBug {
                    #[cfg(feature = "_test")]
                    backtrace: std::backtrace::Backtrace::capture(),
                })?,
                address,
            })),
            Ok(None) => Ok(None),
            Err(e) => Err(e),
        }
    }

    async fn next_inner(
        &mut self,
        data_buffer: &mut [u8],
    ) -> Result<Option<(ItemUnborrowed, u32)>, Error<S::Error>> {
        if self.storage.inner.cache.is_dirty() {
            self.storage.inner.cache.invalidate_cache_state();
        }

        loop {
            // Get the current page and address based on what was stored
            let (current_page, current_address) = match self.next_address {
                NextAddress::PageAfter(previous_page) => {
                    let next_page = self.storage.inner.next_page(previous_page);
                    if self
                        .storage
                        .inner
                        .get_page_state(next_page)
                        .await?
                        .is_open()
                        || next_page == self.oldest_page
                    {
                        self.storage.inner.cache.unmark_dirty();
                        return Ok(None);
                    }

                    // We now know we left the previous page because it had no items on it anymore.
                    // If we know all those items were popped, we can proactively open the previous page.
                    // This is amazing for performance
                    if self.previous_item_states == PreviousItemStates::AllPopped {
                        self.storage.inner.open_page(previous_page).await?;
                    }

                    let current_address = calculate_page_address::<S>(
                        self.storage.inner.flash_range.clone(),
                        next_page,
                    ) + S::WORD_SIZE as u32;

                    self.next_address = NextAddress::Address(current_address);

                    (next_page, current_address)
                }
                NextAddress::Address(address) => (
                    calculate_page_index::<S>(self.storage.inner.flash_range.clone(), address),
                    address,
                ),
            };

            let page_data_end_address = calculate_page_end_address::<S>(
                self.storage.inner.flash_range.clone(),
                current_page,
            ) - S::WORD_SIZE as u32;

            // Search for the first item with data
            let mut it = ItemHeaderIter::new(current_address, page_data_end_address);
            // No need to worry about cache here since that has been dealt with at the creation of this iterator
            if let (Some(found_item_header), found_item_address) = it
                .traverse(&mut self.storage.inner.flash, |header, _| {
                    header.crc.is_none()
                })
                .await?
            {
                let maybe_item = found_item_header
                    .read_item(
                        &mut self.storage.inner.flash,
                        data_buffer,
                        found_item_address,
                        page_data_end_address,
                    )
                    .await?;

                match maybe_item {
                    item::MaybeItem::Corrupted(header, _) => {
                        let next_address = header.next_item_address::<S>(found_item_address);
                        self.next_address = if next_address >= page_data_end_address {
                            NextAddress::PageAfter(current_page)
                        } else {
                            NextAddress::Address(next_address)
                        };
                    }
                    item::MaybeItem::Erased(_, _) => {
                        // Item is already erased
                        return Err(Error::LogicBug {
                            #[cfg(feature = "_test")]
                            backtrace: std::backtrace::Backtrace::capture(),
                        });
                    }
                    item::MaybeItem::Present(item) => {
                        let next_address = item.header.next_item_address::<S>(found_item_address);
                        self.next_address = if next_address >= page_data_end_address {
                            NextAddress::PageAfter(current_page)
                        } else {
                            NextAddress::Address(next_address)
                        };

                        // Record that the current item hasn't been popped (yet)
                        if self.previous_item_states == PreviousItemStates::AllPopped {
                            self.previous_item_states = PreviousItemStates::AllButCurrentPopped;
                        }

                        // Return the item we found
                        self.storage.inner.cache.unmark_dirty();
                        return Ok(Some((item.unborrow(), found_item_address)));
                    }
                }
            } else {
                self.next_address = NextAddress::PageAfter(current_page);
            }
        }
    }
}

/// An entry in the iteration over the queue flash
pub struct QueueIteratorEntry<'s, 'd, 'q, S: NorFlash, CI: CacheImpl> {
    iter: &'q mut QueueIterator<'s, S, CI>,
    address: u32,
    item: Item<'d>,
}

impl<S: NorFlash, CI: CacheImpl> Deref for QueueIteratorEntry<'_, '_, '_, S, CI> {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.item.data()
    }
}

impl<S: NorFlash, CI: CacheImpl> DerefMut for QueueIteratorEntry<'_, '_, '_, S, CI> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.item.data_mut()
    }
}

impl<'d, S: NorFlash, CI: CacheImpl> QueueIteratorEntry<'_, 'd, '_, S, CI> {
    /// Get a mutable reference to the data of this entry, but consume the entry too.
    /// This function has some relaxed lifetime constraints compared to the deref impls.
    #[must_use]
    pub fn into_buf(self) -> &'d mut [u8] {
        self.item.data_owned()
    }

    /// Pop the data in flash that corresponds to this entry. This makes it so
    /// future peeks won't find this data anymore.
    pub async fn pop(self) -> Result<&'d mut [u8], Error<S::Error>>
    where
        S: MultiwriteNorFlash,
    {
        let (header, item_data_buffer) = self.item.header_and_data_owned();

        // We're popping this entry, so if all items before it were popped, then all items are popped again
        if self.iter.previous_item_states == PreviousItemStates::AllButCurrentPopped {
            self.iter.previous_item_states = PreviousItemStates::AllPopped;
        }

        header
            .erase_data(
                &mut self.iter.storage.inner.flash,
                self.iter.storage.inner.flash_range.clone(),
                &mut self.iter.storage.inner.cache,
                self.address,
            )
            .await?;

        self.iter.storage.inner.cache.unmark_dirty();
        Ok(item_data_buffer)
    }

    /// Get the flash address of the item
    #[cfg(feature = "_test")]
    pub fn address(&self) -> u32 {
        self.address
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        AlignedBuf,
        cache::NoCache,
        mock_flash::{self, FlashAverageStatsResult, FlashStatsResult, WriteCountCheck},
    };

    use super::*;
    use futures_test::test;

    type MockFlashBig = mock_flash::MockFlashBase<4, 4, 256>;
    type MockFlashTiny = mock_flash::MockFlashBase<2, 1, 32>;

    #[test]
    async fn peek_and_overwrite_old_data() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);
        const DATA_SIZE: usize = 22;

        assert_eq!(storage.space_left().await.unwrap(), 60);

        assert_eq!(storage.peek(&mut data_buffer).await.unwrap(), None);

        data_buffer[..DATA_SIZE].copy_from_slice(&[0xAA; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap();

        assert_eq!(storage.space_left().await.unwrap(), 30);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; DATA_SIZE]
        );
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xBB; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap();

        assert_eq!(storage.space_left().await.unwrap(), 0);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; DATA_SIZE]
        );

        // Flash is full, this should fail
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xCC; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap_err();
        // Now we allow overwrite, so it should work
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xDD; DATA_SIZE]);
        storage.push(&data_buffer[..DATA_SIZE], true).await.unwrap();

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; DATA_SIZE]
        );
        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; DATA_SIZE]
        );

        assert_eq!(storage.space_left().await.unwrap(), 30);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xDD; DATA_SIZE]
        );
        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xDD; DATA_SIZE]
        );

        assert_eq!(storage.space_left().await.unwrap(), 60);

        assert_eq!(storage.peek(&mut data_buffer).await.unwrap(), None);
        assert_eq!(storage.pop(&mut data_buffer).await.unwrap(), None);
    }

    #[test]
    async fn push_pop() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        let mut data_buffer = AlignedBuf([0; 1024]);

        for i in 0..2000 {
            println!("{i}");
            let data = vec![i as u8; i % 512 + 1];

            storage.push(&data, true).await.unwrap();
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            assert_eq!(
                storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap(),
                None,
                "At {i}"
            );
        }
    }

    #[test]
    async fn push_pop_tiny() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        for i in 0..2000 {
            println!("{i}");
            let data = vec![i as u8; i % 20 + 1];

            println!("PUSH");
            storage.push(&data, true).await.unwrap();
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            println!("POP");
            assert_eq!(
                storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            println!("PEEK");
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap(),
                None,
                "At {i}"
            );
            println!("DONE");
        }
    }

    #[test]
    /// Same as [push_lots_then_pop_lots], except with added peeking and using the iterator style
    async fn push_peek_pop_many() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        let mut push_stats = FlashStatsResult::default();
        let mut pushes = 0;
        let mut peek_stats = FlashStatsResult::default();
        let mut peeks = 0;
        let mut pop_stats = FlashStatsResult::default();
        let mut pops = 0;

        for loop_index in 0..100 {
            println!("Loop index: {loop_index}");

            for i in 0..20 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 0..5 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = [i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .deref(),
                    &data[..],
                    "At {i}"
                );
                peeks += 1;
                peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 0..5 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .pop()
                        .await
                        .unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            for i in 20..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 5..25 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .deref(),
                    &data,
                    "At {i}"
                );
                peeks += 1;
                peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 5..25 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .pop()
                        .await
                        .unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }
        }

        // Assert the performance. These numbers can be changed if acceptable.
        approx::assert_relative_eq!(
            push_stats.take_average(pushes),
            FlashAverageStatsResult {
                avg_erases: 0.0,
                avg_reads: 16.864,
                avg_writes: 3.1252,
                avg_bytes_read: 105.4112,
                avg_bytes_written: 60.5008
            }
        );
        approx::assert_relative_eq!(
            peek_stats.take_average(peeks),
            FlashAverageStatsResult {
                avg_erases: 0.0052,
                avg_reads: 3.8656,
                avg_writes: 0.0,
                avg_bytes_read: 70.4256,
                avg_bytes_written: 0.0
            }
        );
        approx::assert_relative_eq!(
            pop_stats.take_average(pops),
            FlashAverageStatsResult {
                avg_erases: 0.0572,
                avg_reads: 3.7772,
                avg_writes: 1.0,
                avg_bytes_read: 69.7184,
                avg_bytes_written: 8.0
            }
        );
    }

    #[test]
    async fn push_lots_then_pop_lots() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        let mut push_stats = FlashStatsResult::default();
        let mut pushes = 0;
        let mut pop_stats = FlashStatsResult::default();
        let mut pops = 0;

        for loop_index in 0..100 {
            println!("Loop index: {loop_index}");

            for i in 0..20 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 0..5 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 20..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 5..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }
        }

        // Assert the performance. These numbers can be changed if acceptable.
        approx::assert_relative_eq!(
            push_stats.take_average(pushes),
            FlashAverageStatsResult {
                avg_erases: 0.0,
                avg_reads: 16.864,
                avg_writes: 3.1252,
                avg_bytes_read: 105.4112,
                avg_bytes_written: 60.5008
            }
        );
        approx::assert_relative_eq!(
            pop_stats.take_average(pops),
            FlashAverageStatsResult {
                avg_erases: 0.0624,
                avg_reads: 23.5768,
                avg_writes: 1.0,
                avg_bytes_read: 180.512,
                avg_bytes_written: 8.0
            }
        );
    }

    #[test]
    async fn pop_with_empty_section() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        data_buffer[..20].copy_from_slice(&[0xAA; 20]);
        storage.push(&data_buffer[0..20], false).await.unwrap();
        data_buffer[..20].copy_from_slice(&[0xBB; 20]);
        storage.push(&data_buffer[0..20], false).await.unwrap();

        // There's now an unused gap at the end of the first page

        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; 20]
        );

        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; 20]
        );
    }

    #[test]
    async fn search_pages() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        storage.inner.close_page(0).await.unwrap();
        storage.inner.close_page(1).await.unwrap();
        storage.inner.partial_close_page(2).await.unwrap();

        assert_eq!(storage.find_youngest_page().await.unwrap(), 2);
        assert_eq!(storage.find_oldest_page().await.unwrap(), 0);
    }

    #[test]
    async fn store_too_big_item() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        storage
            .push(&AlignedBuf([0; 1024 - 4 * 2 - 8]), false)
            .await
            .unwrap();

        assert_eq!(
            storage
                .push(&AlignedBuf([0; 1024 - 4 * 2 - 8 + 1]), false)
                .await,
            Err(Error::ItemTooBig)
        );
    }

    #[test]
    async fn push_on_single_page() {
        let mut storage = QueueStorage::new(
            mock_flash::MockFlashBase::<1, 4, 256>::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x400) },
            NoCache::new(),
        );

        for _ in 0..100 {
            match storage.push(&[0, 1, 2, 3, 4], true).await {
                Ok(_) => {}
                Err(e) => {
                    println!("{}", storage.print_items().await);
                    panic!("{e}");
                }
            }
        }
    }
}