Skip to main content

sequential_storage/queue/
mod.rs

1//! Implementation of the queue logic.
2
3pub mod buffered;
4
5use crate::item::{Item, ItemHeader, ItemHeaderIter};
6
7use self::{cache::CacheImpl, item::ItemUnborrowed};
8
9use super::{
10    Debug, Deref, DerefMut, Error, GenericStorage, MAX_WORD_SIZE, NorFlash, NorFlashExt, PageState,
11    PhantomData, Range, cache, calculate_page_address, calculate_page_end_address,
12    calculate_page_index, calculate_page_size, item, run_with_auto_repair,
13};
14use embedded_storage_async::nor_flash::MultiwriteNorFlash;
15
/// Configuration for a queue
pub struct QueueConfig<S> {
    // Absolute flash address range the queue may use.
    // Validated by `try_new` to be erase-size aligned and at least one page long.
    flash_range: Range<u32>,
    // Ties this config to a concrete flash type `S`, so a configuration
    // validated against one flash's constants can't be used with another.
    _phantom: PhantomData<S>,
}
21
22impl<S: NorFlash> QueueConfig<S> {
23    /// Create a new queue configuration. Will panic if the data is invalid.
24    /// If you want a fallible version, use [`Self::try_new`].
25    #[must_use]
26    pub const fn new(flash_range: Range<u32>) -> Self {
27        Self::try_new(flash_range).expect("Queue config must be correct")
28    }
29
30    /// Create a new queue configuration. Will return None if the data is invalid
31    #[must_use]
32    pub const fn try_new(flash_range: Range<u32>) -> Option<Self> {
33        if !flash_range.start.is_multiple_of(S::ERASE_SIZE as u32) {
34            return None;
35        }
36        if !flash_range.end.is_multiple_of(S::ERASE_SIZE as u32) {
37            return None;
38        }
39        // At least 1 page is used
40        if flash_range.end - flash_range.start < (S::ERASE_SIZE as u32) {
41            return None;
42        }
43
44        if S::ERASE_SIZE < S::WORD_SIZE * 4 {
45            return None;
46        }
47        if S::WORD_SIZE > MAX_WORD_SIZE {
48            return None;
49        }
50
51        Some(Self {
52            flash_range,
53            _phantom: PhantomData,
54        })
55    }
56}
57
58/// A fifo queue storage
59///
60/// Use [`Self::push`] to add data to the fifo and use [`Self::peek`] and [`Self::pop`] to get the data back.
61///
62/// ## Basic API
63///
64/// ```rust
65/// # use sequential_storage::cache::NoCache;
66/// # use sequential_storage::queue::{QueueConfig, QueueStorage};
67/// # use mock_flash::MockFlashBase;
68/// # use futures::executor::block_on;
69/// # type Flash = MockFlashBase<10, 1, 4096>;
70/// # mod mock_flash {
71/// #   include!("../mock_flash.rs");
72/// # }
73/// #
74/// # fn init_flash() -> Flash {
75/// #     Flash::new(mock_flash::WriteCountCheck::Twice, None, false)
76/// # }
77/// #
78/// # block_on(async {
79///
80/// // Initialize the flash. This can be internal or external
81/// let mut flash = init_flash();
82///
83/// let mut storage = QueueStorage::new(flash, const { QueueConfig::new(0x1000..0x3000) }, NoCache::new());
84/// // We need to give the crate a buffer to work with.
85/// // It must be big enough to serialize the biggest value of your storage type in.
86/// let mut data_buffer = [0; 128];
87///
88/// let my_data = [10, 47, 29];
89///
90/// // We can push some data to the queue
91/// storage.push(&my_data, false).await.unwrap();
92///
93/// // We can peek at the oldest data
94///
95/// assert_eq!(
96///     &storage.peek(&mut data_buffer).await.unwrap().unwrap()[..],
97///     &my_data[..]
98/// );
99///
100/// // With popping we get back the oldest data, but that data is now also removed
101///
102/// assert_eq!(
103///     &storage.pop(&mut data_buffer).await.unwrap().unwrap()[..],
104///     &my_data[..]
105/// );
106///
107/// // If we pop again, we find there's no data anymore
108///
109/// assert_eq!(
110///     storage.pop(&mut data_buffer).await,
111///     Ok(None)
112/// );
113/// # });
114/// ```
pub struct QueueStorage<S: NorFlash, C: CacheImpl> {
    // Shared storage primitive owning the flash driver, the flash range and the cache.
    inner: GenericStorage<S, C>,
}
118
impl<S: NorFlash, C: CacheImpl> QueueStorage<S, C> {
    /// Create a new (fifo) queue instance
    ///
    /// The provided cache instance must be new or must be in the exact correct state for the current flash contents.
    /// If the cache is bad, undesirable things will happen.
    /// So, it's ok to reuse the cache gotten from the [`Self::destroy`] method when the flash hasn't changed since calling destroy.
    pub const fn new(storage: S, config: QueueConfig<S>, cache: C) -> Self {
        Self {
            inner: GenericStorage {
                flash: storage,
                flash_range: config.flash_range,
                cache,
            },
        }
    }

    /// Push data into the queue.
    /// The data can only be taken out with the [`Self::pop`] function.
    ///
    /// Old data will not be overwritten unless `allow_overwrite_old_data` is true.
    /// If it is, then if the queue is full, the oldest data is removed to make space for the new data.
    ///
    /// *Note: If a page is already used and you push more data than the remaining capacity of the page,
    /// the entire remaining capacity will go unused because the data is stored on the next page.*
    pub async fn push(
        &mut self,
        data: &[u8],
        allow_overwrite_old_data: bool,
    ) -> Result<(), Error<S::Error>> {
        run_with_auto_repair!(
            function = self.push_inner(data, allow_overwrite_old_data).await,
            repair = self.try_repair().await?
        )
    }

    async fn push_inner(
        &mut self,
        data: &[u8],
        allow_overwrite_old_data: bool,
    ) -> Result<(), Error<S::Error>> {
        // A dirty cache means a previous operation was interrupted, so its state can't be trusted.
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        // Data must fit in a single page
        if data.len() > u16::MAX as usize
            || data.len()
                > calculate_page_size::<S>()
                    .saturating_sub(ItemHeader::data_address::<S>(0) as usize)
        {
            self.inner.cache.unmark_dirty();
            return Err(Error::ItemTooBig);
        }

        let current_page = self.find_youngest_page().await?;

        // The first and last word of every page hold the page-state markers,
        // so the usable data region is one word smaller on each side.
        let page_data_start_address =
            calculate_page_address::<S>(self.flash_range(), current_page) + S::WORD_SIZE as u32;
        let page_data_end_address =
            calculate_page_end_address::<S>(self.flash_range(), current_page) - S::WORD_SIZE as u32;

        self.inner.partial_close_page(current_page).await?;

        // Find the last item on the page so we know where we need to write

        let mut next_address = self
            .inner
            .find_next_free_item_spot(
                page_data_start_address,
                page_data_end_address,
                data.len() as u32,
            )
            .await?;

        if next_address.is_none() {
            // No capacity left on this page, move to the next page
            let next_page = self.inner.next_page(current_page);
            let next_page_state = self.inner.get_page_state(next_page).await?;
            // With a single-page queue the "next" page wraps back to the current one.
            let single_page = next_page == current_page;

            match (next_page_state, single_page) {
                (PageState::Open, _) => {
                    self.inner.close_page(current_page).await?;
                    self.inner.partial_close_page(next_page).await?;
                    next_address = Some(
                        calculate_page_address::<S>(self.flash_range(), next_page)
                            + S::WORD_SIZE as u32,
                    );
                }
                (PageState::Closed, _) | (PageState::PartialOpen, true) => {
                    let next_page_data_start_address =
                        calculate_page_address::<S>(self.flash_range(), next_page)
                            + S::WORD_SIZE as u32;

                    // Refuse to erase a page that still holds unread data
                    // unless the caller explicitly allowed overwriting.
                    if !allow_overwrite_old_data
                        && !self
                            .inner
                            .is_page_empty(next_page, Some(next_page_state))
                            .await?
                    {
                        self.inner.cache.unmark_dirty();
                        return Err(Error::FullStorage);
                    }

                    self.inner.open_page(next_page).await?;
                    if !single_page {
                        self.inner.close_page(current_page).await?;
                    }
                    self.inner.partial_close_page(next_page).await?;
                    next_address = Some(next_page_data_start_address);
                }
                (PageState::PartialOpen, false) => {
                    // This should never happen: only one page may be partial open at a time
                    return Err(Error::Corrupted {
                        #[cfg(feature = "_test")]
                        backtrace: std::backtrace::Backtrace::capture(),
                    });
                }
            }
        }

        Item::write_new(
            &mut self.inner.flash,
            self.inner.flash_range.clone(),
            &mut self.inner.cache,
            next_address.unwrap(),
            data,
        )
        .await?;

        self.inner.cache.unmark_dirty();
        Ok(())
    }

    /// Get an iterator-like interface to iterate over the items stored in the queue.
    /// This goes from oldest to newest.
    ///
    /// The iteration happens non-destructively, or in other words it peeks at every item.
    /// The returned entry has a [`QueueIteratorEntry::pop`] function with which you can decide to pop the item
    /// after you've seen the contents.
    pub async fn iter(&mut self) -> Result<QueueIterator<'_, S, C>, Error<S::Error>> {
        // Note: Corruption repair is done in these functions already
        QueueIterator::new(self).await
    }

    /// Peek at the oldest data.
    ///
    /// If you also want to remove the data use [`Self::pop`].
    ///
    /// The data is written to the given `data_buffer` and the part that was written is returned.
    /// It is valid to only use the length of the returned slice and use the original `data_buffer`.
    /// The `data_buffer` may contain extra data on ranges after the returned slice.
    /// You should not depend on that data.
    ///
    /// If the data buffer is not big enough an error is returned.
    pub async fn peek<'d>(
        &mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>> {
        // Note: Corruption repair is done in these functions already
        let mut iterator = self.iter().await?;

        let next_value = iterator.next(data_buffer).await?;

        match next_value {
            Some(entry) => Ok(Some(entry.into_buf())),
            None => Ok(None),
        }
    }

    /// Pop the oldest data from the queue.
    ///
    /// If you don't want to remove the data use [`Self::peek`].
    ///
    /// The data is written to the given `data_buffer` and the part that was written is returned.
    /// It is valid to only use the length of the returned slice and use the original `data_buffer`.
    /// The `data_buffer` may contain extra data on ranges after the returned slice.
    /// You should not depend on that data.
    ///
    /// If the data buffer is not big enough an error is returned.
    pub async fn pop<'d>(
        &mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>>
    where
        S: MultiwriteNorFlash,
    {
        let mut iterator = self.iter().await?;

        let next_value = iterator.next(data_buffer).await?;

        match next_value {
            Some(entry) => Ok(Some(entry.pop().await?)),
            None => Ok(None),
        }
    }

    /// Find the largest size of data that can be stored.
    ///
    /// This will read through the entire flash to find the largest chunk of
    /// data that can be stored, taking alignment requirements of the item into account.
    ///
    /// If there is no space left, `None` is returned.
    pub async fn find_max_fit(&mut self) -> Result<Option<u32>, Error<S::Error>> {
        run_with_auto_repair!(
            function = self.find_max_fit_inner().await,
            repair = self.try_repair().await?
        )
    }

    async fn find_max_fit_inner(&mut self) -> Result<Option<u32>, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let current_page = self.find_youngest_page().await?;

        // Check if we have space on the next page
        let next_page = self.inner.next_page(current_page);
        match self.inner.get_page_state(next_page).await? {
            state @ PageState::Closed => {
                if self.inner.is_page_empty(next_page, Some(state)).await? {
                    self.inner.cache.unmark_dirty();
                    // A whole fresh page minus the two page-state marker words.
                    return Ok(Some((S::ERASE_SIZE - (2 * S::WORD_SIZE)) as u32));
                }
            }
            PageState::Open => {
                self.inner.cache.unmark_dirty();
                return Ok(Some((S::ERASE_SIZE - (2 * S::WORD_SIZE)) as u32));
            }
            PageState::PartialOpen => {
                // This should never happen: the youngest page is the partial open one
                return Err(Error::Corrupted {
                    #[cfg(feature = "_test")]
                    backtrace: std::backtrace::Backtrace::capture(),
                });
            }
        }

        // See how much space we can find in the current page.
        let page_data_start_address =
            calculate_page_address::<S>(self.flash_range(), current_page) + S::WORD_SIZE as u32;
        let page_data_end_address =
            calculate_page_end_address::<S>(self.flash_range(), current_page) - S::WORD_SIZE as u32;

        // Prefer the cached end-of-data position; otherwise walk the item
        // headers to find where the free area of the page starts.
        let next_item_address = match self.inner.cache.first_item_after_written(current_page) {
            Some(next_item_address) => next_item_address,
            None => {
                ItemHeaderIter::new(
                    self.inner
                        .cache
                        .first_item_after_erased(current_page)
                        .unwrap_or(page_data_start_address),
                    page_data_end_address,
                )
                .traverse(&mut self.inner.flash, |_, _| true)
                .await?
                .1
            }
        };

        self.inner.cache.unmark_dirty();
        Ok(ItemHeader::available_data_bytes::<S>(
            page_data_end_address - next_item_address,
        ))
    }

    /// Calculate how much space is left free in the queue (in bytes).
    ///
    /// The number given back is accurate, however there are lots of things that add overhead and padding.
    /// Every push is an item with its own overhead. You can check the overhead per item with [`Self::item_overhead_size`].
    ///
    /// Furthermore, every item has to fully fit in a page. So if a page has 50 bytes left and you push an item of 60 bytes,
    /// the current page is closed and the item is stored on the next page, 'wasting' the 50 you had.
    ///
    /// So unless you're tracking all this, the returned number should only be used as a rough indication.
    pub async fn space_left(&mut self) -> Result<u32, Error<S::Error>> {
        run_with_auto_repair!(
            function = self.space_left_inner().await,
            repair = self.try_repair().await?
        )
    }

    async fn space_left_inner(&mut self) -> Result<u32, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let mut total_free_space = 0;

        for page in self.inner.get_pages(0) {
            let state = self.inner.get_page_state(page).await?;
            let page_empty = self.inner.is_page_empty(page, Some(state)).await?;

            // A closed page that still holds data contributes no free space.
            if state.is_closed() && !page_empty {
                continue;
            }

            // See how much space we can find in the current page.
            let page_data_start_address =
                calculate_page_address::<S>(self.flash_range(), page) + S::WORD_SIZE as u32;
            let page_data_end_address =
                calculate_page_end_address::<S>(self.flash_range(), page) - S::WORD_SIZE as u32;

            if page_empty {
                total_free_space += page_data_end_address - page_data_start_address;
                continue;
            }

            // Partial open page
            let next_item_address = match self.inner.cache.first_item_after_written(page) {
                Some(next_item_address) => next_item_address,
                None => {
                    ItemHeaderIter::new(
                        self.inner
                            .cache
                            .first_item_after_erased(page)
                            .unwrap_or(page_data_start_address),
                        page_data_end_address,
                    )
                    .traverse(&mut self.inner.flash, |_, _| true)
                    .await?
                    .1
                }
            };

            if ItemHeader::available_data_bytes::<S>(page_data_end_address - next_item_address)
                .is_none()
            {
                // No data fits on this partial open page anymore.
                // So if all data on this is already erased, then this page might as well be counted as empty.
                // We can use [is_page_empty] and lie to it so it checks the items.
                if self
                    .inner
                    .is_page_empty(page, Some(PageState::Closed))
                    .await?
                {
                    total_free_space += page_data_end_address - page_data_start_address;
                    continue;
                }
            }

            total_free_space += page_data_end_address - next_item_address;
        }

        self.inner.cache.unmark_dirty();
        Ok(total_free_space)
    }

    // Locate the page that data is currently being written to:
    // the single partial open page if one exists, otherwise the first
    // open page after the closed region (or page 0 when fully open).
    async fn find_youngest_page(&mut self) -> Result<usize, Error<S::Error>> {
        let last_used_page = self
            .inner
            .find_first_page(0, PageState::PartialOpen)
            .await?;

        if let Some(last_used_page) = last_used_page {
            return Ok(last_used_page);
        }

        // We have no partial open page. Search for a closed page to anchor ourselves to
        let first_closed_page = self.inner.find_first_page(0, PageState::Closed).await?;

        let first_open_page = match first_closed_page {
            Some(anchor) => {
                // We have at least one closed page
                // The first one after is the page we need to use
                self.inner.find_first_page(anchor, PageState::Open).await?
            }
            None => {
                // No closed pages and no partial open pages, so all pages should be open
                // Might as well start at page 0
                Some(0)
            }
        };

        if let Some(first_open_page) = first_open_page {
            return Ok(first_open_page);
        }

        // All pages are closed... This is not correct.
        Err(Error::Corrupted {
            #[cfg(feature = "_test")]
            backtrace: std::backtrace::Backtrace::capture(),
        })
    }

    // Locate the page holding the oldest data: the first closed page
    // after the youngest page, or the youngest page itself if none is closed.
    async fn find_oldest_page(&mut self) -> Result<usize, Error<S::Error>> {
        let youngest_page = self.find_youngest_page().await?;

        // The oldest page is the first non-open page after the youngest page
        let oldest_closed_page = self
            .inner
            .find_first_page(youngest_page, PageState::Closed)
            .await?;

        Ok(oldest_closed_page.unwrap_or(youngest_page))
    }

    /// Try to repair the state of the flash to hopefully get back everything in working order.
    /// Care is taken that no data is lost, but this depends on correctly repairing the state and
    /// so is only best effort.
    ///
    /// This function might be called after a different function returned the [`Error::Corrupted`] error.
    /// There's no guarantee it will work.
    ///
    /// If this function, or the next call into this crate after it, returns [`Error::Corrupted`], then it's unlikely
    /// that the state can be recovered. To at least make everything function again at the cost of losing the data,
    /// erase the flash range.
    async fn try_repair(&mut self) -> Result<(), Error<S::Error>> {
        // The cache may reflect the corrupted state, so it must be rebuilt.
        self.inner.cache.invalidate_cache_state();

        self.inner.try_general_repair().await?;
        Ok(())
    }

    // Determine the address where iteration should begin: the first
    // item position on the oldest page (from cache when available).
    async fn find_start_address(&mut self) -> Result<NextAddress, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let oldest_page = self.find_oldest_page().await?;

        // We start at the start of the oldest page
        let current_address = match self.inner.cache.first_item_after_erased(oldest_page) {
            Some(address) => address,
            None => {
                calculate_page_address::<S>(self.inner.flash_range.clone(), oldest_page)
                    + S::WORD_SIZE as u32
            }
        };

        Ok(NextAddress::Address(current_address))
    }

    /// Resets the flash in the entire given flash range.
    ///
    /// This is just a thin helper function as it just calls the flash's erase function.
    pub fn erase_all(&mut self) -> impl Future<Output = Result<(), Error<S::Error>>> {
        self.inner.erase_all()
    }

    /// Get the minimal overhead size per stored item for the given flash type.
    ///
    /// The associated data of each item is additionally padded to a full flash word size, but that's not part of this number.\
    /// This means the full item length is `returned number + (data length).next_multiple_of(S::WORD_SIZE)`.
    #[must_use]
    pub const fn item_overhead_size() -> u32 {
        GenericStorage::<S, C>::item_overhead_size()
    }

    /// Destroy the instance to get back the flash and the cache.
    ///
    /// The cache can be passed to a new storage instance, but only for the same flash region and if nothing has changed in flash.
    pub fn destroy(self) -> (S, C) {
        self.inner.destroy()
    }

    /// Get a reference to the flash. Mutating the memory is at your own risk.
    pub const fn flash(&mut self) -> &mut S {
        self.inner.flash()
    }

    /// Get the flash range being used
    pub const fn flash_range(&self) -> Range<u32> {
        self.inner.flash_range()
    }

    #[cfg(any(test, feature = "std"))]
    /// Print all items in flash to the returned string
    ///
    /// This is meant as a debugging utility. The string format is not stable.
    pub fn print_items(&mut self) -> impl Future<Output = String> {
        self.inner.print_items()
    }
}
594
// Tracks whether the items the iterator has already walked past were popped.
// Used by the iterator to decide whether a fully-consumed page can
// proactively be opened (erased) while iterating.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
enum PreviousItemStates {
    /// Every item yielded so far has been popped.
    AllPopped,
    /// Every item except the most recently yielded one has been popped.
    AllButCurrentPopped,
    /// At least one earlier item was yielded but left unpopped.
    Unpopped,
}
601
/// An iterator-like interface for peeking into data stored in flash with the option to pop it.
pub struct QueueIterator<'s, S: NorFlash, C: CacheImpl> {
    // The storage being iterated; borrowed exclusively for the whole iteration.
    storage: &'s mut QueueStorage<S, C>,
    // Where the next call to `next` resumes searching.
    next_address: NextAddress,
    // Whether items already walked past were popped (enables proactive page opening).
    previous_item_states: PreviousItemStates,
    // The page iteration started on; reaching it again means we wrapped around.
    oldest_page: usize,
}
609
610impl<S: NorFlash, C: CacheImpl> Debug for QueueIterator<'_, S, C> {
611    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
612        f.debug_struct("QueueIterator")
613            .field("current_address", &self.next_address)
614            .finish_non_exhaustive()
615    }
616}
617
// Position marker for the queue iterator.
#[derive(Debug, Clone, Copy)]
enum NextAddress {
    /// Absolute flash address of the next item header to inspect.
    Address(u32),
    /// The page at this index is exhausted; continue on the page after it.
    PageAfter(usize),
}
623
impl<'s, S: NorFlash, C: CacheImpl> QueueIterator<'s, S, C> {
    // Build an iterator positioned at the oldest item in the queue.
    async fn new(storage: &'s mut QueueStorage<S, C>) -> Result<Self, Error<S::Error>> {
        let start_address = run_with_auto_repair!(
            function = storage.find_start_address().await,
            repair = storage.try_repair().await?
        )?;

        // Remember the page the iteration starts on so `next_inner` can
        // detect when it has wrapped all the way around the queue.
        let oldest_page = match start_address {
            NextAddress::Address(address) => {
                calculate_page_index::<S>(storage.inner.flash_range.clone(), address)
            }
            NextAddress::PageAfter(index) => storage.inner.next_page(index),
        };

        Ok(Self {
            storage,
            next_address: start_address,
            previous_item_states: PreviousItemStates::AllPopped,
            oldest_page,
        })
    }

    /// Get the next entry.
    ///
    /// If there are no more entries, None is returned.
    ///
    /// The `data_buffer` has to be large enough to be able to hold the largest item in flash.
    pub async fn next<'d, 'q>(
        &'q mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<QueueIteratorEntry<'s, 'd, 'q, S, C>>, Error<S::Error>> {
        // We continue from a place where the current item wasn't popped
        // That means that from now on, the next item will have unpopped items behind it
        if self.previous_item_states == PreviousItemStates::AllButCurrentPopped {
            self.previous_item_states = PreviousItemStates::Unpopped;
        }

        let value = run_with_auto_repair!(
            function = self.next_inner(data_buffer).await,
            repair = self.storage.try_repair().await?
        );

        match value {
            Ok(Some((item, address))) => Ok(Some(QueueIteratorEntry {
                iter: self,
                // Re-borrow the item data from the caller's buffer; failure here
                // means the unborrow/reborrow invariant was broken internally.
                item: item.reborrow(data_buffer).ok_or_else(|| Error::LogicBug {
                    #[cfg(feature = "_test")]
                    backtrace: std::backtrace::Backtrace::capture(),
                })?,
                address,
            })),
            Ok(None) => Ok(None),
            Err(e) => Err(e),
        }
    }

    async fn next_inner(
        &mut self,
        data_buffer: &mut [u8],
    ) -> Result<Option<(ItemUnborrowed, u32)>, Error<S::Error>> {
        if self.storage.inner.cache.is_dirty() {
            self.storage.inner.cache.invalidate_cache_state();
        }

        // Loop until we find a present item, run out of pages, or wrap around.
        loop {
            // Get the current page and address based on what was stored
            let (current_page, current_address) = match self.next_address {
                NextAddress::PageAfter(previous_page) => {
                    let next_page = self.storage.inner.next_page(previous_page);
                    // An open page marks the end of the data, and reaching the
                    // oldest page again means we've wrapped; either way we're done.
                    if self
                        .storage
                        .inner
                        .get_page_state(next_page)
                        .await?
                        .is_open()
                        || next_page == self.oldest_page
                    {
                        self.storage.inner.cache.unmark_dirty();
                        return Ok(None);
                    }

                    // We now know the previous page was left because there were no items on there anymore
                    // If we know all those items were popped, we can proactively open the previous page
                    // This is amazing for performance
                    if self.previous_item_states == PreviousItemStates::AllPopped {
                        self.storage.inner.open_page(previous_page).await?;
                    }

                    // Skip the page-state marker word at the start of the page.
                    let current_address = calculate_page_address::<S>(
                        self.storage.inner.flash_range.clone(),
                        next_page,
                    ) + S::WORD_SIZE as u32;

                    self.next_address = NextAddress::Address(current_address);

                    (next_page, current_address)
                }
                NextAddress::Address(address) => (
                    calculate_page_index::<S>(self.storage.inner.flash_range.clone(), address),
                    address,
                ),
            };

            let page_data_end_address = calculate_page_end_address::<S>(
                self.storage.inner.flash_range.clone(),
                current_page,
            ) - S::WORD_SIZE as u32;

            // Search for the first item with data
            let mut it = ItemHeaderIter::new(current_address, page_data_end_address);
            // No need to worry about cache here since that has been dealt with at the creation of this iterator
            if let (Some(found_item_header), found_item_address) = it
                .traverse(&mut self.storage.inner.flash, |header, _| {
                    header.crc.is_none()
                })
                .await?
            {
                let maybe_item = found_item_header
                    .read_item(
                        &mut self.storage.inner.flash,
                        data_buffer,
                        found_item_address,
                        page_data_end_address,
                    )
                    .await?;

                match maybe_item {
                    item::MaybeItem::Corrupted(header, _) => {
                        // Skip past the corrupted item and keep searching.
                        let next_address = header.next_item_address::<S>(found_item_address);
                        self.next_address = if next_address >= page_data_end_address {
                            NextAddress::PageAfter(current_page)
                        } else {
                            NextAddress::Address(next_address)
                        };
                    }
                    item::MaybeItem::Erased(_, _) => {
                        // Item is already erased; finding one here is a bug in this crate
                        return Err(Error::LogicBug {
                            #[cfg(feature = "_test")]
                            backtrace: std::backtrace::Backtrace::capture(),
                        });
                    }
                    item::MaybeItem::Present(item) => {
                        let next_address = item.header.next_item_address::<S>(found_item_address);
                        self.next_address = if next_address >= page_data_end_address {
                            NextAddress::PageAfter(current_page)
                        } else {
                            NextAddress::Address(next_address)
                        };

                        // Record that the current item hasn't been popped (yet)
                        if self.previous_item_states == PreviousItemStates::AllPopped {
                            self.previous_item_states = PreviousItemStates::AllButCurrentPopped;
                        }

                        // Return the item we found
                        self.storage.inner.cache.unmark_dirty();
                        return Ok(Some((item.unborrow(), found_item_address)));
                    }
                }
            } else {
                // No item with data left on this page; move on to the next one.
                self.next_address = NextAddress::PageAfter(current_page);
            }
        }
    }
}
790
/// An entry in the iteration over the queue flash
///
/// Dereferences to the item's data bytes; call [`Self::pop`] to erase the
/// item from flash so future iterations no longer see it.
pub struct QueueIteratorEntry<'s, 'd, 'q, S: NorFlash, CI: CacheImpl> {
    // Borrow of the iterator that produced this entry; `pop` uses it to reach
    // the flash, the cache and the previous-item-state bookkeeping.
    iter: &'q mut QueueIterator<'s, S, CI>,
    // Flash address of this item (passed to `erase_data` when popping).
    address: u32,
    // The parsed item, borrowing the caller-provided data buffer ('d).
    item: Item<'d>,
}
797
798impl<S: NorFlash, CI: CacheImpl> Deref for QueueIteratorEntry<'_, '_, '_, S, CI> {
799    type Target = [u8];
800
801    fn deref(&self) -> &Self::Target {
802        self.item.data()
803    }
804}
805
806impl<S: NorFlash, CI: CacheImpl> DerefMut for QueueIteratorEntry<'_, '_, '_, S, CI> {
807    fn deref_mut(&mut self) -> &mut Self::Target {
808        self.item.data_mut()
809    }
810}
811
812impl<'d, S: NorFlash, CI: CacheImpl> QueueIteratorEntry<'_, 'd, '_, S, CI> {
813    /// Get a mutable reference to the data of this entry, but consume the entry too.
814    /// This function has some relaxed lifetime constraints compared to the deref impls.
815    #[must_use]
816    pub fn into_buf(self) -> &'d mut [u8] {
817        self.item.data_owned()
818    }
819
820    /// Pop the data in flash that corresponds to this entry. This makes it so
821    /// future peeks won't find this data anymore.
822    pub async fn pop(self) -> Result<&'d mut [u8], Error<S::Error>>
823    where
824        S: MultiwriteNorFlash,
825    {
826        let (header, item_data_buffer) = self.item.header_and_data_owned();
827
828        // We're popping ourself, so if all previous but us were popped, then now all are popped again
829        if self.iter.previous_item_states == PreviousItemStates::AllButCurrentPopped {
830            self.iter.previous_item_states = PreviousItemStates::AllPopped;
831        }
832
833        header
834            .erase_data(
835                &mut self.iter.storage.inner.flash,
836                self.iter.storage.inner.flash_range.clone(),
837                &mut self.iter.storage.inner.cache,
838                self.address,
839            )
840            .await?;
841
842        self.iter.storage.inner.cache.unmark_dirty();
843        Ok(item_data_buffer)
844    }
845
846    /// Get the flash address of the item
847    #[cfg(feature = "_test")]
848    pub fn address(&self) -> u32 {
849        self.address
850    }
851}
852
#[cfg(test)]
mod tests {
    use crate::{
        AlignedBuf,
        cache::NoCache,
        mock_flash::{self, FlashAverageStatsResult, FlashStatsResult, WriteCountCheck},
    };

    use super::*;
    use futures_test::test;

    // Mock flash geometries used throughout these tests.
    // NOTE(review): assumed parameter order is <PAGES, WORD_SIZE, WORDS_PER_PAGE>,
    // which matches the flash ranges used below (4 * 4 * 256 = 0x1000 bytes,
    // 2 * 1 * 32 = 0x40 bytes) — confirm against `mock_flash::MockFlashBase`.
    type MockFlashBig = mock_flash::MockFlashBase<4, 4, 256>;
    type MockFlashTiny = mock_flash::MockFlashBase<2, 1, 32>;

    // Fill a two-page flash completely, then verify that a push without
    // overwrite fails while a push with overwrite succeeds by erasing the
    // oldest page of data.
    #[test]
    async fn peek_and_overwrite_old_data() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);
        const DATA_SIZE: usize = 22;

        // Empty queue: both pages fully available.
        assert_eq!(storage.space_left().await.unwrap(), 60);

        assert_eq!(storage.peek(&mut data_buffer).await.unwrap(), None);

        data_buffer[..DATA_SIZE].copy_from_slice(&[0xAA; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap();

        // One 22-byte item consumes 30 bytes of queue space, as evidenced by
        // the space_left sequence 60 -> 30 -> 0 in this test.
        assert_eq!(storage.space_left().await.unwrap(), 30);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; DATA_SIZE]
        );
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xBB; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap();

        assert_eq!(storage.space_left().await.unwrap(), 0);

        // Peek still returns the oldest item (FIFO order).
        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; DATA_SIZE]
        );

        // Flash is full, this should fail
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xCC; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap_err();
        // Now we allow overwrite, so it should work
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xDD; DATA_SIZE]);
        storage.push(&data_buffer[..DATA_SIZE], true).await.unwrap();

        // The overwrite erased the oldest page, so the 0xAA item is gone and
        // 0xBB is now the oldest item in the queue.
        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; DATA_SIZE]
        );
        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; DATA_SIZE]
        );

        assert_eq!(storage.space_left().await.unwrap(), 30);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xDD; DATA_SIZE]
        );
        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xDD; DATA_SIZE]
        );

        assert_eq!(storage.space_left().await.unwrap(), 60);

        // Queue is empty again.
        assert_eq!(storage.peek(&mut data_buffer).await.unwrap(), None);
        assert_eq!(storage.pop(&mut data_buffer).await.unwrap(), None);
    }

    // Push, peek and pop one item at a time, 2000 times, with item sizes
    // cycling from 1 to 512 bytes so the queue wraps around the flash many
    // times.
    #[test]
    async fn push_pop() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        let mut data_buffer = AlignedBuf([0; 1024]);

        for i in 0..2000 {
            println!("{i}");
            let data = vec![i as u8; i % 512 + 1];

            storage.push(&data, true).await.unwrap();
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            assert_eq!(
                storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            // After the pop the queue must be empty again.
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap(),
                None,
                "At {i}"
            );
        }
    }

    // Push 20 items, then use the iterator to pop only the odd-indexed ones,
    // and finally verify that exactly the even-indexed items remain, in order.
    #[test]
    async fn iter_pop_out_of_order() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        let mut data_buffer = AlignedBuf([0; 1024]);

        let gen_data = |i: usize| vec![i as u8; i % 512 + 1];
        const COUNT: usize = 20;

        for i in 0..COUNT {
            storage.push(&gen_data(i), false).await.unwrap();
        }

        let mut iterator = storage.iter().await.unwrap();
        let mut i = 0;
        while let Some(entry) = iterator.next(&mut data_buffer).await.unwrap() {
            if i % 2 == 1 {
                // Pop only the odd-indexed items.
                assert_eq!(entry.pop().await.unwrap(), gen_data(i));
            }

            i += 1;
        }
        assert_eq!(i, COUNT);

        let mut iterator = storage.iter().await.unwrap();
        let mut i = 0;
        while let Some(entry) = iterator.next(&mut data_buffer).await.unwrap() {
            // Only the even-indexed items should be left; `i` steps by 2.
            assert_eq!(entry.into_buf(), gen_data(i));
            i += 2;
        }
        assert_eq!(i, COUNT);
    }

    // Same push/peek/pop cycle as `push_pop`, but on the tiny two-page flash
    // with small items, exercising frequent page wrap-around and erasure.
    #[test]
    async fn push_pop_tiny() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        for i in 0..2000 {
            println!("{i}");
            let data = vec![i as u8; i % 20 + 1];

            println!("PUSH");
            storage.push(&data, true).await.unwrap();
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            println!("POP");
            assert_eq!(
                storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            println!("PEEK");
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap(),
                None,
                "At {i}"
            );
            println!("DONE");
        }
    }

    #[test]
    /// Same as [push_lots_then_pop_lots], except with added peeking and using the iterator style
    async fn push_peek_pop_many() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        // Accumulators for flash-operation statistics, averaged at the end to
        // guard against performance regressions.
        let mut push_stats = FlashStatsResult::default();
        let mut pushes = 0;
        let mut peek_stats = FlashStatsResult::default();
        let mut peeks = 0;
        let mut pop_stats = FlashStatsResult::default();
        let mut pops = 0;

        for loop_index in 0..100 {
            println!("Loop index: {loop_index}");

            // Push 20 items, then peek/pop 5, push 5 more, then peek/pop the
            // remaining 20 — keeping the queue partially full across loops.
            for i in 0..20 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            // Creating the iterator also costs flash reads; attribute them to
            // the peek statistics.
            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 0..5 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = [i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .deref(),
                    &data[..],
                    "At {i}"
                );
                peeks += 1;
                peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 0..5 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .pop()
                        .await
                        .unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            for i in 20..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 5..25 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .deref(),
                    &data,
                    "At {i}"
                );
                peeks += 1;
                peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 5..25 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .pop()
                        .await
                        .unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }
        }

        // Assert the performance. These numbers can be changed if acceptable.
        approx::assert_relative_eq!(
            push_stats.take_average(pushes),
            FlashAverageStatsResult {
                avg_erases: 0.0,
                avg_reads: 16.864,
                avg_writes: 3.1252,
                avg_bytes_read: 105.4112,
                avg_bytes_written: 60.5008
            }
        );
        approx::assert_relative_eq!(
            peek_stats.take_average(peeks),
            FlashAverageStatsResult {
                avg_erases: 0.0052,
                avg_reads: 3.8656,
                avg_writes: 0.0,
                avg_bytes_read: 70.4256,
                avg_bytes_written: 0.0
            }
        );
        approx::assert_relative_eq!(
            pop_stats.take_average(pops),
            FlashAverageStatsResult {
                avg_erases: 0.0572,
                avg_reads: 3.7772,
                avg_writes: 1.0,
                avg_bytes_read: 69.7184,
                avg_bytes_written: 8.0
            }
        );
    }

    // Steady-state workload: repeatedly push batches and pop them back in
    // FIFO order via `pop`, asserting averaged flash statistics at the end to
    // catch performance regressions.
    #[test]
    async fn push_lots_then_pop_lots() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        let mut push_stats = FlashStatsResult::default();
        let mut pushes = 0;
        let mut pop_stats = FlashStatsResult::default();
        let mut pops = 0;

        for loop_index in 0..100 {
            println!("Loop index: {loop_index}");

            for i in 0..20 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 0..5 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 20..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 5..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }
        }

        // Assert the performance. These numbers can be changed if acceptable.
        approx::assert_relative_eq!(
            push_stats.take_average(pushes),
            FlashAverageStatsResult {
                avg_erases: 0.0,
                avg_reads: 16.864,
                avg_writes: 3.1252,
                avg_bytes_read: 105.4112,
                avg_bytes_written: 60.5008
            }
        );
        approx::assert_relative_eq!(
            pop_stats.take_average(pops),
            FlashAverageStatsResult {
                avg_erases: 0.0624,
                avg_reads: 23.5768,
                avg_writes: 1.0,
                avg_bytes_read: 180.512,
                avg_bytes_written: 8.0
            }
        );
    }

    // Two items that don't fill the first page exactly: popping must correctly
    // step over the unused gap at the end of the page to find the second item.
    #[test]
    async fn pop_with_empty_section() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        data_buffer[..20].copy_from_slice(&[0xAA; 20]);
        storage.push(&data_buffer[0..20], false).await.unwrap();
        data_buffer[..20].copy_from_slice(&[0xBB; 20]);
        storage.push(&data_buffer[0..20], false).await.unwrap();

        // There's now an unused gap at the end of the first page

        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; 20]
        );

        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; 20]
        );
    }

    // With pages 0 and 1 closed and page 2 partially closed, the youngest
    // page (most recently written) must be 2 and the oldest must be 0.
    #[test]
    async fn search_pages() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        storage.inner.close_page(0).await.unwrap();
        storage.inner.close_page(1).await.unwrap();
        storage.inner.partial_close_page(2).await.unwrap();

        assert_eq!(storage.find_youngest_page().await.unwrap(), 2);
        assert_eq!(storage.find_oldest_page().await.unwrap(), 0);
    }

    // The largest item that fits in a 1024-byte page is 1024 - 4 * 2 - 8
    // bytes; one byte more must be rejected with `Error::ItemTooBig`.
    // NOTE(review): presumably 4 * 2 is two 4-byte page-state markers and 8 is
    // the item header — confirm against the item/page layout.
    #[test]
    async fn store_too_big_item() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        storage
            .push(&AlignedBuf([0; 1024 - 4 * 2 - 8]), false)
            .await
            .unwrap();

        assert_eq!(
            storage
                .push(&AlignedBuf([0; 1024 - 4 * 2 - 8 + 1]), false,)
                .await,
            Err(Error::ItemTooBig)
        );
    }

    // Repeated pushes with overwrite enabled on a single-page queue must keep
    // succeeding (the page gets erased and reused); dump the items on failure
    // for debugging.
    #[test]
    async fn push_on_single_page() {
        let mut storage = QueueStorage::new(
            mock_flash::MockFlashBase::<1, 4, 256>::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x400) },
            NoCache::new(),
        );

        for _ in 0..100 {
            match storage.push(&[0, 1, 2, 3, 4], true).await {
                Ok(_) => {}
                Err(e) => {
                    println!("{}", storage.print_items().await);
                    panic!("{e}");
                }
            }
        }
    }
}