1pub mod buffered;
4
5use crate::item::{Item, ItemHeader, ItemHeaderIter};
6
7use self::{cache::CacheImpl, item::ItemUnborrowed};
8
9use super::{
10 Debug, Deref, DerefMut, Error, GenericStorage, MAX_WORD_SIZE, NorFlash, NorFlashExt, PageState,
11 PhantomData, Range, cache, calculate_page_address, calculate_page_end_address,
12 calculate_page_index, calculate_page_size, item, run_with_auto_repair,
13};
14use embedded_storage_async::nor_flash::MultiwriteNorFlash;
15
/// Configuration for a queue storage: the flash region the queue lives in.
pub struct QueueConfig<S> {
    // The flash address range reserved for the queue; validated by `try_new`.
    flash_range: Range<u32>,
    // Ties the config to one flash type `S` without storing a flash instance.
    _phantom: PhantomData<S>,
}
21
22impl<S: NorFlash> QueueConfig<S> {
23 #[must_use]
26 pub const fn new(flash_range: Range<u32>) -> Self {
27 Self::try_new(flash_range).expect("Queue config must be correct")
28 }
29
30 #[must_use]
32 pub const fn try_new(flash_range: Range<u32>) -> Option<Self> {
33 if !flash_range.start.is_multiple_of(S::ERASE_SIZE as u32) {
34 return None;
35 }
36 if !flash_range.end.is_multiple_of(S::ERASE_SIZE as u32) {
37 return None;
38 }
39 if flash_range.end - flash_range.start < (S::ERASE_SIZE as u32) {
41 return None;
42 }
43
44 if S::ERASE_SIZE < S::WORD_SIZE * 4 {
45 return None;
46 }
47 if S::WORD_SIZE > MAX_WORD_SIZE {
48 return None;
49 }
50
51 Some(Self {
52 flash_range,
53 _phantom: PhantomData,
54 })
55 }
56}
57
/// A FIFO queue of byte items stored in NOR flash.
pub struct QueueStorage<S: NorFlash, C: CacheImpl> {
    // All low-level flash, page-state and cache handling is delegated to the generic storage.
    inner: GenericStorage<S, C>,
}
118
119impl<S: NorFlash, C: CacheImpl> QueueStorage<S, C> {
120 pub const fn new(storage: S, config: QueueConfig<S>, cache: C) -> Self {
126 Self {
127 inner: GenericStorage {
128 flash: storage,
129 flash_range: config.flash_range,
130 cache,
131 },
132 }
133 }
134
    /// Pushes `data` onto the back of the queue.
    ///
    /// When `allow_overwrite_old_data` is true and the storage is full, old data may be
    /// overwritten to make room; otherwise a full queue returns `Err(Error::FullStorage)`.
    /// Runs with automatic repair on failure (see `run_with_auto_repair!`).
    pub async fn push(
        &mut self,
        data: &[u8],
        allow_overwrite_old_data: bool,
    ) -> Result<(), Error<S::Error>> {
        run_with_auto_repair!(
            function = self.push_inner(data, allow_overwrite_old_data).await,
            repair = self.try_repair().await?
        )
    }
153
    /// Implementation of `push`; the caller wraps it with auto-repair, so this
    /// may bail out with the cache marked dirty on error.
    async fn push_inner(
        &mut self,
        data: &[u8],
        allow_overwrite_old_data: bool,
    ) -> Result<(), Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        // An item must fit in a single page's data area (after the item header)
        // and its length must fit the header's u16 length field.
        if data.len() > u16::MAX as usize
            || data.len()
                > calculate_page_size::<S>()
                    .saturating_sub(ItemHeader::data_address::<S>(0) as usize)
        {
            self.inner.cache.unmark_dirty();
            return Err(Error::ItemTooBig);
        }

        let current_page = self.find_youngest_page().await?;

        // The first and last words of a page hold the page-state markers.
        let page_data_start_address =
            calculate_page_address::<S>(self.flash_range(), current_page) + S::WORD_SIZE as u32;
        let page_data_end_address =
            calculate_page_end_address::<S>(self.flash_range(), current_page) - S::WORD_SIZE as u32;

        self.inner.partial_close_page(current_page).await?;

        // Try to find a free spot for the data on the current (youngest) page.
        let mut next_address = self
            .inner
            .find_next_free_item_spot(
                page_data_start_address,
                page_data_end_address,
                data.len() as u32,
            )
            .await?;

        if next_address.is_none() {
            // The item doesn't fit on the current page; move to the next one.
            let next_page = self.inner.next_page(current_page);
            let next_page_state = self.inner.get_page_state(next_page).await?;
            let single_page = next_page == current_page;

            match (next_page_state, single_page) {
                (PageState::Open, _) => {
                    // A fresh page is available: close the full page and claim the next.
                    self.inner.close_page(current_page).await?;
                    self.inner.partial_close_page(next_page).await?;
                    next_address = Some(
                        calculate_page_address::<S>(self.flash_range(), next_page)
                            + S::WORD_SIZE as u32,
                    );
                }
                (PageState::Closed, _) | (PageState::PartialOpen, true) => {
                    let next_page_data_start_address =
                        calculate_page_address::<S>(self.flash_range(), next_page)
                            + S::WORD_SIZE as u32;

                    // The next page still holds old data; only reuse it when the caller
                    // allows overwriting or everything on it has been popped already.
                    if !allow_overwrite_old_data
                        && !self
                            .inner
                            .is_page_empty(next_page, Some(next_page_state))
                            .await?
                    {
                        self.inner.cache.unmark_dirty();
                        return Err(Error::FullStorage);
                    }

                    self.inner.open_page(next_page).await?;
                    if !single_page {
                        self.inner.close_page(current_page).await?;
                    }
                    self.inner.partial_close_page(next_page).await?;
                    next_address = Some(next_page_data_start_address);
                }
                (PageState::PartialOpen, false) => {
                    // Only the youngest page may be partial-open; finding another one
                    // means the page markers are inconsistent.
                    return Err(Error::Corrupted {
                        #[cfg(feature = "_test")]
                        backtrace: std::backtrace::Backtrace::capture(),
                    });
                }
            }
        }

        // At this point `next_address` is guaranteed to be Some by the logic above.
        Item::write_new(
            &mut self.inner.flash,
            self.inner.flash_range.clone(),
            &mut self.inner.cache,
            next_address.unwrap(),
            data,
        )
        .await?;

        self.inner.cache.unmark_dirty();
        Ok(())
    }
252
    /// Returns an iterator-like cursor over the entries of the queue, oldest first.
    pub async fn iter(&mut self) -> Result<QueueIterator<'_, S, C>, Error<S::Error>> {
        QueueIterator::new(self).await
    }
263
264 pub async fn peek<'d>(
275 &mut self,
276 data_buffer: &'d mut [u8],
277 ) -> Result<Option<&'d mut [u8]>, Error<S::Error>> {
278 let mut iterator = self.iter().await?;
280
281 let next_value = iterator.next(data_buffer).await?;
282
283 match next_value {
284 Some(entry) => Ok(Some(entry.into_buf())),
285 None => Ok(None),
286 }
287 }
288
289 pub async fn pop<'d>(
300 &mut self,
301 data_buffer: &'d mut [u8],
302 ) -> Result<Option<&'d mut [u8]>, Error<S::Error>>
303 where
304 S: MultiwriteNorFlash,
305 {
306 let mut iterator = self.iter().await?;
307
308 let next_value = iterator.next(data_buffer).await?;
309
310 match next_value {
311 Some(entry) => Ok(Some(entry.pop().await?)),
312 None => Ok(None),
313 }
314 }
315
    /// Returns the largest data size that can currently be pushed, or `None` when nothing fits.
    /// Runs with automatic repair on failure (see `run_with_auto_repair!`).
    pub async fn find_max_fit(&mut self) -> Result<Option<u32>, Error<S::Error>> {
        run_with_auto_repair!(
            function = self.find_max_fit_inner().await,
            repair = self.try_repair().await?
        )
    }
328
329 async fn find_max_fit_inner(&mut self) -> Result<Option<u32>, Error<S::Error>> {
330 if self.inner.cache.is_dirty() {
331 self.inner.cache.invalidate_cache_state();
332 }
333
334 let current_page = self.find_youngest_page().await?;
335
336 let next_page = self.inner.next_page(current_page);
338 match self.inner.get_page_state(next_page).await? {
339 state @ PageState::Closed => {
340 if self.inner.is_page_empty(next_page, Some(state)).await? {
341 self.inner.cache.unmark_dirty();
342 return Ok(Some((S::ERASE_SIZE - (2 * S::WORD_SIZE)) as u32));
343 }
344 }
345 PageState::Open => {
346 self.inner.cache.unmark_dirty();
347 return Ok(Some((S::ERASE_SIZE - (2 * S::WORD_SIZE)) as u32));
348 }
349 PageState::PartialOpen => {
350 return Err(Error::Corrupted {
352 #[cfg(feature = "_test")]
353 backtrace: std::backtrace::Backtrace::capture(),
354 });
355 }
356 }
357
358 let page_data_start_address =
360 calculate_page_address::<S>(self.flash_range(), current_page) + S::WORD_SIZE as u32;
361 let page_data_end_address =
362 calculate_page_end_address::<S>(self.flash_range(), current_page) - S::WORD_SIZE as u32;
363
364 let next_item_address = match self.inner.cache.first_item_after_written(current_page) {
365 Some(next_item_address) => next_item_address,
366 None => {
367 ItemHeaderIter::new(
368 self.inner
369 .cache
370 .first_item_after_erased(current_page)
371 .unwrap_or(page_data_start_address),
372 page_data_end_address,
373 )
374 .traverse(&mut self.inner.flash, |_, _| true)
375 .await?
376 .1
377 }
378 };
379
380 self.inner.cache.unmark_dirty();
381 Ok(ItemHeader::available_data_bytes::<S>(
382 page_data_end_address - next_item_address,
383 ))
384 }
385
    /// Returns the total number of data bytes that can still be stored across all pages.
    ///
    /// NOTE(review): this is a sum over pages; a single item is still bounded by the
    /// per-page maximum — use `find_max_fit` for that. Runs with automatic repair on failure.
    pub async fn space_left(&mut self) -> Result<u32, Error<S::Error>> {
        run_with_auto_repair!(
            function = self.space_left_inner().await,
            repair = self.try_repair().await?
        )
    }
401
    /// Implementation of `space_left`; the caller wraps it with auto-repair.
    async fn space_left_inner(&mut self) -> Result<u32, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let mut total_free_space = 0;

        for page in self.inner.get_pages(0) {
            let state = self.inner.get_page_state(page).await?;
            let page_empty = self.inner.is_page_empty(page, Some(state)).await?;

            // A closed page that still holds unpopped data contributes no free space.
            if state.is_closed() && !page_empty {
                continue;
            }

            // The first and last words of a page hold the page-state markers.
            let page_data_start_address =
                calculate_page_address::<S>(self.flash_range(), page) + S::WORD_SIZE as u32;
            let page_data_end_address =
                calculate_page_end_address::<S>(self.flash_range(), page) - S::WORD_SIZE as u32;

            if page_empty {
                // The whole data area of an empty page is free.
                total_free_space += page_data_end_address - page_data_start_address;
                continue;
            }

            // Find the address just past the last written item: prefer the cached value,
            // otherwise walk the item headers from the first non-erased item (or page start).
            let next_item_address = match self.inner.cache.first_item_after_written(page) {
                Some(next_item_address) => next_item_address,
                None => {
                    ItemHeaderIter::new(
                        self.inner
                            .cache
                            .first_item_after_erased(page)
                            .unwrap_or(page_data_start_address),
                        page_data_end_address,
                    )
                    .traverse(&mut self.inner.flash, |_, _| true)
                    .await?
                    .1
                }
            };

            if ItemHeader::available_data_bytes::<S>(page_data_end_address - next_item_address)
                .is_none()
            {
                // No further item fits behind the last one. If every item on the page
                // has been popped (checked with the Closed-page semantics), the page
                // can be erased and reused, so its whole data area counts as free.
                if self
                    .inner
                    .is_page_empty(page, Some(PageState::Closed))
                    .await?
                {
                    total_free_space += page_data_end_address - page_data_start_address;
                    continue;
                }
            }

            total_free_space += page_data_end_address - next_item_address;
        }

        self.inner.cache.unmark_dirty();
        Ok(total_free_space)
    }
467
468 async fn find_youngest_page(&mut self) -> Result<usize, Error<S::Error>> {
469 let last_used_page = self
470 .inner
471 .find_first_page(0, PageState::PartialOpen)
472 .await?;
473
474 if let Some(last_used_page) = last_used_page {
475 return Ok(last_used_page);
476 }
477
478 let first_closed_page = self.inner.find_first_page(0, PageState::Closed).await?;
480
481 let first_open_page = match first_closed_page {
482 Some(anchor) => {
483 self.inner.find_first_page(anchor, PageState::Open).await?
486 }
487 None => {
488 Some(0)
491 }
492 };
493
494 if let Some(first_open_page) = first_open_page {
495 return Ok(first_open_page);
496 }
497
498 Err(Error::Corrupted {
500 #[cfg(feature = "_test")]
501 backtrace: std::backtrace::Backtrace::capture(),
502 })
503 }
504
505 async fn find_oldest_page(&mut self) -> Result<usize, Error<S::Error>> {
506 let youngest_page = self.find_youngest_page().await?;
507
508 let oldest_closed_page = self
510 .inner
511 .find_first_page(youngest_page, PageState::Closed)
512 .await?;
513
514 Ok(oldest_closed_page.unwrap_or(youngest_page))
515 }
516
517 async fn try_repair(&mut self) -> Result<(), Error<S::Error>> {
528 self.inner.cache.invalidate_cache_state();
529
530 self.inner.try_general_repair().await?;
531 Ok(())
532 }
533
534 async fn find_start_address(&mut self) -> Result<NextAddress, Error<S::Error>> {
535 if self.inner.cache.is_dirty() {
536 self.inner.cache.invalidate_cache_state();
537 }
538
539 let oldest_page = self.find_oldest_page().await?;
540
541 let current_address = match self.inner.cache.first_item_after_erased(oldest_page) {
543 Some(address) => address,
544 None => {
545 calculate_page_address::<S>(self.inner.flash_range.clone(), oldest_page)
546 + S::WORD_SIZE as u32
547 }
548 };
549
550 Ok(NextAddress::Address(current_address))
551 }
552
    /// Erases the whole configured flash range, emptying the queue.
    pub fn erase_all(&mut self) -> impl Future<Output = Result<(), Error<S::Error>>> {
        self.inner.erase_all()
    }

    /// Returns the flash overhead size per stored item, in bytes.
    #[must_use]
    pub const fn item_overhead_size() -> u32 {
        GenericStorage::<S, C>::item_overhead_size()
    }

    /// Consumes the storage, returning the underlying flash driver and cache.
    pub fn destroy(self) -> (S, C) {
        self.inner.destroy()
    }

    /// Gives mutable access to the underlying flash driver.
    pub const fn flash(&mut self) -> &mut S {
        self.inner.flash()
    }

    /// Returns the flash address range this queue operates on.
    pub const fn flash_range(&self) -> Range<u32> {
        self.inner.flash_range()
    }

    /// Renders a textual overview of all stored items (test/std builds only).
    #[cfg(any(test, feature = "std"))]
    pub fn print_items(&mut self) -> impl Future<Output = String> {
        self.inner.print_items()
    }
593}
594
/// Tracks whether the items an iterator has passed so far were all popped.
/// Used to decide when a fully-consumed page may be reopened while iterating.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
enum PreviousItemStates {
    // Every item seen so far has been popped.
    AllPopped,
    // Every item before the current one was popped; the current one not (yet).
    AllButCurrentPopped,
    // At least one earlier item remains unpopped.
    Unpopped,
}
601
/// A cursor over the items of a queue storage, yielding entries oldest first.
pub struct QueueIterator<'s, S: NorFlash, C: CacheImpl> {
    storage: &'s mut QueueStorage<S, C>,
    // Where to continue searching for the next item.
    next_address: NextAddress,
    // Pop-state of the items passed so far; gates page reopening during iteration.
    previous_item_states: PreviousItemStates,
    // The page iteration started on; reaching it again means a full wrap-around.
    oldest_page: usize,
}
609
610impl<S: NorFlash, C: CacheImpl> Debug for QueueIterator<'_, S, C> {
611 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
612 f.debug_struct("QueueIterator")
613 .field("current_address", &self.next_address)
614 .finish_non_exhaustive()
615 }
616}
617
/// Cursor position for a queue iterator.
#[derive(Debug, Clone, Copy)]
enum NextAddress {
    // Continue at this exact flash address.
    Address(u32),
    // Continue at the start of the page following the given page index.
    PageAfter(usize),
}
623
624impl<'s, S: NorFlash, C: CacheImpl> QueueIterator<'s, S, C> {
    /// Creates an iterator positioned at the oldest item in `storage`.
    async fn new(storage: &'s mut QueueStorage<S, C>) -> Result<Self, Error<S::Error>> {
        let start_address = run_with_auto_repair!(
            function = storage.find_start_address().await,
            repair = storage.try_repair().await?
        )?;

        // Remember the page we start on so a full wrap-around can be detected later.
        let oldest_page = match start_address {
            NextAddress::Address(address) => {
                calculate_page_index::<S>(storage.inner.flash_range.clone(), address)
            }
            NextAddress::PageAfter(index) => storage.inner.next_page(index),
        };

        Ok(Self {
            storage,
            next_address: start_address,
            previous_item_states: PreviousItemStates::AllPopped,
            oldest_page,
        })
    }
645
    /// Advances to the next item, reading its data into `data_buffer`.
    ///
    /// Returns `Ok(None)` when the end of the queue is reached. The returned entry
    /// borrows both the iterator and the buffer and can be used to pop the item.
    pub async fn next<'d, 'q>(
        &'q mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<QueueIteratorEntry<'s, 'd, 'q, S, C>>, Error<S::Error>> {
        // The previously returned entry was not popped, so not everything
        // before the upcoming item is popped anymore.
        if self.previous_item_states == PreviousItemStates::AllButCurrentPopped {
            self.previous_item_states = PreviousItemStates::Unpopped;
        }

        let value = run_with_auto_repair!(
            function = self.next_inner(data_buffer).await,
            repair = self.storage.try_repair().await?
        );

        match value {
            Ok(Some((item, address))) => Ok(Some(QueueIteratorEntry {
                iter: self,
                // Re-attach the data buffer that `next_inner` had to detach;
                // failure here indicates an internal inconsistency, not flash corruption.
                item: item.reborrow(data_buffer).ok_or_else(|| Error::LogicBug {
                    #[cfg(feature = "_test")]
                    backtrace: std::backtrace::Backtrace::capture(),
                })?,
                address,
            })),
            Ok(None) => Ok(None),
            Err(e) => Err(e),
        }
    }
679
    /// Finds the next valid item, skipping corrupted ones and crossing page boundaries.
    ///
    /// Returns the item (with its data buffer detached) and the flash address of its
    /// header, or `None` when the iteration is done.
    async fn next_inner(
        &mut self,
        data_buffer: &mut [u8],
    ) -> Result<Option<(ItemUnborrowed, u32)>, Error<S::Error>> {
        if self.storage.inner.cache.is_dirty() {
            self.storage.inner.cache.invalidate_cache_state();
        }

        loop {
            // Resolve the cursor to a concrete (page, address) pair.
            let (current_page, current_address) = match self.next_address {
                NextAddress::PageAfter(previous_page) => {
                    let next_page = self.storage.inner.next_page(previous_page);
                    // Stop when the next page holds no data (open) or we wrapped
                    // around to the page the iteration started on.
                    if self
                        .storage
                        .inner
                        .get_page_state(next_page)
                        .await?
                        .is_open()
                        || next_page == self.oldest_page
                    {
                        self.storage.inner.cache.unmark_dirty();
                        return Ok(None);
                    }

                    // Everything on the page we just finished was popped, so it can
                    // be reopened for reuse.
                    if self.previous_item_states == PreviousItemStates::AllPopped {
                        self.storage.inner.open_page(previous_page).await?;
                    }

                    // Skip the page-state marker word at the start of the page.
                    let current_address = calculate_page_address::<S>(
                        self.storage.inner.flash_range.clone(),
                        next_page,
                    ) + S::WORD_SIZE as u32;

                    self.next_address = NextAddress::Address(current_address);

                    (next_page, current_address)
                }
                NextAddress::Address(address) => (
                    calculate_page_index::<S>(self.storage.inner.flash_range.clone(), address),
                    address,
                ),
            };

            let page_data_end_address = calculate_page_end_address::<S>(
                self.storage.inner.flash_range.clone(),
                current_page,
            ) - S::WORD_SIZE as u32;

            // Walk the item headers, skipping past erased items (those whose crc is gone).
            let mut it = ItemHeaderIter::new(current_address, page_data_end_address);
            if let (Some(found_item_header), found_item_address) = it
                .traverse(&mut self.storage.inner.flash, |header, _| {
                    header.crc.is_none()
                })
                .await?
            {
                let maybe_item = found_item_header
                    .read_item(
                        &mut self.storage.inner.flash,
                        data_buffer,
                        found_item_address,
                        page_data_end_address,
                    )
                    .await?;

                match maybe_item {
                    item::MaybeItem::Corrupted(header, _) => {
                        // Skip over the corrupted item and keep searching.
                        let next_address = header.next_item_address::<S>(found_item_address);
                        self.next_address = if next_address >= page_data_end_address {
                            NextAddress::PageAfter(current_page)
                        } else {
                            NextAddress::Address(next_address)
                        };
                    }
                    item::MaybeItem::Erased(_, _) => {
                        // The traversal above does not stop on erased headers,
                        // so reading one back here is an internal inconsistency.
                        return Err(Error::LogicBug {
                            #[cfg(feature = "_test")]
                            backtrace: std::backtrace::Backtrace::capture(),
                        });
                    }
                    item::MaybeItem::Present(item) => {
                        // Point the cursor past this item for the next call.
                        let next_address = item.header.next_item_address::<S>(found_item_address);
                        self.next_address = if next_address >= page_data_end_address {
                            NextAddress::PageAfter(current_page)
                        } else {
                            NextAddress::Address(next_address)
                        };

                        // The found item becomes the 'current' one; everything before
                        // it is (still) popped.
                        if self.previous_item_states == PreviousItemStates::AllPopped {
                            self.previous_item_states = PreviousItemStates::AllButCurrentPopped;
                        }

                        self.storage.inner.cache.unmark_dirty();
                        return Ok(Some((item.unborrow(), found_item_address)));
                    }
                }
            } else {
                // No more items on this page; continue on the following page.
                self.next_address = NextAddress::PageAfter(current_page);
            }
        }
    }
789}
790
/// A single item yielded by a queue iterator.
///
/// Dereferences to the item's data bytes and can be popped to erase the item.
pub struct QueueIteratorEntry<'s, 'd, 'q, S: NorFlash, CI: CacheImpl> {
    iter: &'q mut QueueIterator<'s, S, CI>,
    // Flash address of this item's header.
    address: u32,
    item: Item<'d>,
}
797
impl<S: NorFlash, CI: CacheImpl> Deref for QueueIteratorEntry<'_, '_, '_, S, CI> {
    type Target = [u8];

    // An entry dereferences to the item's data bytes.
    fn deref(&self) -> &Self::Target {
        self.item.data()
    }
}
805
impl<S: NorFlash, CI: CacheImpl> DerefMut for QueueIteratorEntry<'_, '_, '_, S, CI> {
    // Mutable access to the item's data bytes (the caller-provided buffer).
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.item.data_mut()
    }
}
811
impl<'d, S: NorFlash, CI: CacheImpl> QueueIteratorEntry<'_, 'd, '_, S, CI> {
    /// Consumes the entry, returning the slice of the data buffer that holds the item's data.
    #[must_use]
    pub fn into_buf(self) -> &'d mut [u8] {
        self.item.data_owned()
    }

    /// Pops this entry: marks its data as erased in flash and returns the data slice.
    ///
    /// Erasing the marker rewrites already-written flash words, hence the
    /// `MultiwriteNorFlash` bound.
    pub async fn pop(self) -> Result<&'d mut [u8], Error<S::Error>>
    where
        S: MultiwriteNorFlash,
    {
        let (header, item_data_buffer) = self.item.header_and_data_owned();

        // With the current item popped as well, everything up to here is popped again.
        if self.iter.previous_item_states == PreviousItemStates::AllButCurrentPopped {
            self.iter.previous_item_states = PreviousItemStates::AllPopped;
        }

        header
            .erase_data(
                &mut self.iter.storage.inner.flash,
                self.iter.storage.inner.flash_range.clone(),
                &mut self.iter.storage.inner.cache,
                self.address,
            )
            .await?;

        self.iter.storage.inner.cache.unmark_dirty();
        Ok(item_data_buffer)
    }

    /// Returns the flash address of this entry (test builds only).
    #[cfg(feature = "_test")]
    pub fn address(&self) -> u32 {
        self.address
    }
}
852
853#[cfg(test)]
854mod tests {
855 use crate::{
856 AlignedBuf,
857 cache::NoCache,
858 mock_flash::{self, FlashAverageStatsResult, FlashStatsResult, WriteCountCheck},
859 };
860
861 use super::*;
862 use futures_test::test;
863
864 type MockFlashBig = mock_flash::MockFlashBase<4, 4, 256>;
865 type MockFlashTiny = mock_flash::MockFlashBase<2, 1, 32>;
866
    // End-to-end test on a tiny two-page flash: peek does not consume, a full
    // queue rejects pushes unless overwriting is allowed, and space_left tracks
    // pushes and pops.
    #[test]
    async fn peek_and_overwrite_old_data() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);
        const DATA_SIZE: usize = 22;

        assert_eq!(storage.space_left().await.unwrap(), 60);

        assert_eq!(storage.peek(&mut data_buffer).await.unwrap(), None);

        data_buffer[..DATA_SIZE].copy_from_slice(&[0xAA; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap();

        assert_eq!(storage.space_left().await.unwrap(), 30);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; DATA_SIZE]
        );
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xBB; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap();

        assert_eq!(storage.space_left().await.unwrap(), 0);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; DATA_SIZE]
        );

        // Storage is now full: a push without overwrite must fail...
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xCC; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap_err();
        // ...while a push that may overwrite old data succeeds (dropping 0xAA).
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xDD; DATA_SIZE]);
        storage.push(&data_buffer[..DATA_SIZE], true).await.unwrap();

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; DATA_SIZE]
        );
        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; DATA_SIZE]
        );

        assert_eq!(storage.space_left().await.unwrap(), 30);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xDD; DATA_SIZE]
        );
        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xDD; DATA_SIZE]
        );

        assert_eq!(storage.space_left().await.unwrap(), 60);

        assert_eq!(storage.peek(&mut data_buffer).await.unwrap(), None);
        assert_eq!(storage.pop(&mut data_buffer).await.unwrap(), None);
    }
941
    // Pushes and immediately pops items of varying sizes many times,
    // cycling through all pages of the big mock flash.
    #[test]
    async fn push_pop() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        let mut data_buffer = AlignedBuf([0; 1024]);

        for i in 0..2000 {
            println!("{i}");
            let data = vec![i as u8; i % 512 + 1];

            storage.push(&data, true).await.unwrap();
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            assert_eq!(
                storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap(),
                None,
                "At {i}"
            );
        }
    }
974
    // Pops only every second entry via the iterator, then verifies that the
    // remaining (even-indexed) entries survive a second iteration.
    #[test]
    async fn iter_pop_out_of_order() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        let mut data_buffer = AlignedBuf([0; 1024]);

        let gen_data = |i: usize| vec![i as u8; i % 512 + 1];
        const COUNT: usize = 20;

        for i in 0..COUNT {
            storage.push(&gen_data(i), false).await.unwrap();
        }

        let mut iterator = storage.iter().await.unwrap();
        let mut i = 0;
        while let Some(entry) = iterator.next(&mut data_buffer).await.unwrap() {
            if i % 2 == 1 {
                assert_eq!(entry.pop().await.unwrap(), gen_data(i));
            }

            i += 1;
        }
        assert_eq!(i, COUNT);

        let mut iterator = storage.iter().await.unwrap();
        let mut i = 0;
        while let Some(entry) = iterator.next(&mut data_buffer).await.unwrap() {
            assert_eq!(entry.into_buf(), gen_data(i));
            i += 2;
        }
        assert_eq!(i, COUNT);
    }
1011
    // Same push/pop cycle as `push_pop`, but on the tiny two-page flash where
    // page wrap-around happens much more frequently.
    #[test]
    async fn push_pop_tiny() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        for i in 0..2000 {
            println!("{i}");
            let data = vec![i as u8; i % 20 + 1];

            println!("PUSH");
            storage.push(&data, true).await.unwrap();
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            println!("POP");
            assert_eq!(
                storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            println!("PEEK");
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap(),
                None,
                "At {i}"
            );
            println!("DONE");
        }
    }
1047
    // Long-running push/peek/pop workload that also checks the average flash
    // usage statistics against known-good values (regression guard for the
    // flash access pattern).
    #[test]
    async fn push_peek_pop_many() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        let mut push_stats = FlashStatsResult::default();
        let mut pushes = 0;
        let mut peek_stats = FlashStatsResult::default();
        let mut peeks = 0;
        let mut pop_stats = FlashStatsResult::default();
        let mut pops = 0;

        for loop_index in 0..100 {
            println!("Loop index: {loop_index}");

            for i in 0..20 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 0..5 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = [i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .deref(),
                    &data[..],
                    "At {i}"
                );
                peeks += 1;
                peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 0..5 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .pop()
                        .await
                        .unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            for i in 20..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 5..25 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .deref(),
                    &data,
                    "At {i}"
                );
                peeks += 1;
                peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 5..25 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .pop()
                        .await
                        .unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }
        }

        // Known-good averages; a change here means the flash access pattern changed.
        approx::assert_relative_eq!(
            push_stats.take_average(pushes),
            FlashAverageStatsResult {
                avg_erases: 0.0,
                avg_reads: 16.864,
                avg_writes: 3.1252,
                avg_bytes_read: 105.4112,
                avg_bytes_written: 60.5008
            }
        );
        approx::assert_relative_eq!(
            peek_stats.take_average(peeks),
            FlashAverageStatsResult {
                avg_erases: 0.0052,
                avg_reads: 3.8656,
                avg_writes: 0.0,
                avg_bytes_read: 70.4256,
                avg_bytes_written: 0.0
            }
        );
        approx::assert_relative_eq!(
            pop_stats.take_average(pops),
            FlashAverageStatsResult {
                avg_erases: 0.0572,
                avg_reads: 3.7772,
                avg_writes: 1.0,
                avg_bytes_read: 69.7184,
                avg_bytes_written: 8.0
            }
        );
    }
1201
    // Batch workload (push many, then pop many) with flash usage statistics
    // checked against known-good values (regression guard).
    #[test]
    async fn push_lots_then_pop_lots() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        let mut push_stats = FlashStatsResult::default();
        let mut pushes = 0;
        let mut pop_stats = FlashStatsResult::default();
        let mut pops = 0;

        for loop_index in 0..100 {
            println!("Loop index: {loop_index}");

            for i in 0..20 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 0..5 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 20..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 5..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }
        }

        // Known-good averages; a change here means the flash access pattern changed.
        approx::assert_relative_eq!(
            push_stats.take_average(pushes),
            FlashAverageStatsResult {
                avg_erases: 0.0,
                avg_reads: 16.864,
                avg_writes: 3.1252,
                avg_bytes_read: 105.4112,
                avg_bytes_written: 60.5008
            }
        );
        approx::assert_relative_eq!(
            pop_stats.take_average(pops),
            FlashAverageStatsResult {
                avg_erases: 0.0624,
                avg_reads: 23.5768,
                avg_writes: 1.0,
                avg_bytes_read: 180.512,
                avg_bytes_written: 8.0
            }
        );
    }
1282
    // Popping must preserve FIFO order even when part of the flash holds no items.
    #[test]
    async fn pop_with_empty_section() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        data_buffer[..20].copy_from_slice(&[0xAA; 20]);
        storage.push(&data_buffer[0..20], false).await.unwrap();
        data_buffer[..20].copy_from_slice(&[0xBB; 20]);
        storage.push(&data_buffer[0..20], false).await.unwrap();

        // Oldest data (0xAA) comes out first.
        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; 20]
        );

        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; 20]
        );
    }
1309
    // Verifies youngest/oldest page detection from manually set page states:
    // pages 0 and 1 closed, page 2 partial-open.
    #[test]
    async fn search_pages() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        storage.inner.close_page(0).await.unwrap();
        storage.inner.close_page(1).await.unwrap();
        storage.inner.partial_close_page(2).await.unwrap();

        assert_eq!(storage.find_youngest_page().await.unwrap(), 2);
        assert_eq!(storage.find_oldest_page().await.unwrap(), 0);
    }
1325
    // An item that exactly fills a page's data area fits; one byte more is rejected.
    #[test]
    async fn store_too_big_item() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        storage
            .push(&AlignedBuf([0; 1024 - 4 * 2 - 8]), false)
            .await
            .unwrap();

        assert_eq!(
            storage
                .push(&AlignedBuf([0; 1024 - 4 * 2 - 8 + 1]), false,)
                .await,
            Err(Error::ItemTooBig)
        );
    }
1346
    // A queue configured with a single page must keep accepting pushes by
    // wrapping onto itself (overwriting old data).
    #[test]
    async fn push_on_single_page() {
        let mut storage = QueueStorage::new(
            mock_flash::MockFlashBase::<1, 4, 256>::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x400) },
            NoCache::new(),
        );

        for _ in 0..100 {
            match storage.push(&[0, 1, 2, 3, 4], true).await {
                Ok(_) => {}
                Err(e) => {
                    println!("{}", storage.print_items().await);
                    panic!("{e}");
                }
            }
        }
    }
1365}