use crate::item::{Item, ItemHeader, ItemHeaderIter};

use self::{cache::CacheImpl, item::ItemUnborrowed};

use super::{
    Debug, Deref, DerefMut, Error, GenericStorage, MAX_WORD_SIZE, NorFlash, NorFlashExt, PageState,
    PhantomData, Range, cache, calculate_page_address, calculate_page_end_address,
    calculate_page_index, calculate_page_size, item, run_with_auto_repair,
};
use embedded_storage_async::nor_flash::MultiwriteNorFlash;
/// Configuration for a flash queue: the flash range it lives in.
pub struct QueueConfig<S> {
    /// The flash range this queue operates on.
    flash_range: Range<u32>,
    _phantom: PhantomData<S>,
}

impl<S: NorFlash> QueueConfig<S> {
    /// Creates a new config for the given flash range.
    ///
    /// Panics if the range is invalid; see [`Self::try_new`] for the rules.
    #[must_use]
    pub const fn new(flash_range: Range<u32>) -> Self {
        Self::try_new(flash_range).expect("Queue config must be correct")
    }

    /// Creates a new config, returning `None` if the range is invalid:
    /// the start and end must be aligned to the erase size, the range must
    /// span at least one page, and the flash word size must be supported.
    #[must_use]
    pub const fn try_new(flash_range: Range<u32>) -> Option<Self> {
        if !flash_range.start.is_multiple_of(S::ERASE_SIZE as u32) {
            return None;
        }
        if !flash_range.end.is_multiple_of(S::ERASE_SIZE as u32) {
            return None;
        }
        if flash_range.end - flash_range.start < (S::ERASE_SIZE as u32) {
            return None;
        }

        if S::ERASE_SIZE < S::WORD_SIZE * 4 {
            return None;
        }
        if S::WORD_SIZE > MAX_WORD_SIZE {
            return None;
        }

        Some(Self {
            flash_range,
            _phantom: PhantomData,
        })
    }
}

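/// Storage for a FIFO queue of byte slices in NOR flash.
///
/// Items are appended to the youngest page and read or popped from the
/// oldest page first. A minimal usage sketch, not compiled as a doctest:
/// `MyFlash` stands in for any [`NorFlash`] implementation and the flash
/// range is an arbitrary erase-size-aligned example.
///
/// ```ignore
/// let mut storage = QueueStorage::new(
///     MyFlash::new(),
///     QueueConfig::new(0x1000..0x3000),
///     cache::NoCache::new(),
/// );
/// let mut buf = [0; 32];
///
/// storage.push(&[1, 2, 3], false).await?;
/// assert_eq!(storage.peek(&mut buf).await?.unwrap(), &[1, 2, 3]);
/// assert_eq!(storage.pop(&mut buf).await?.unwrap(), &[1, 2, 3]);
/// ```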
pub struct QueueStorage<S: NorFlash, C: CacheImpl> {
    inner: GenericStorage<S, C>,
}

impl<S: NorFlash, C: CacheImpl> QueueStorage<S, C> {
    /// Creates a new queue storage from the flash driver, config and cache.
    pub const fn new(storage: S, config: QueueConfig<S>, cache: C) -> Self {
        Self {
            inner: GenericStorage {
                flash: storage,
                flash_range: config.flash_range,
                cache,
            },
        }
    }

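    /// Pushes `data` to the back of the queue.
    ///
    /// If the queue is full, `allow_overwrite_old_data` decides the outcome:
    /// `true` erases the oldest items to make room, `false` returns
    /// [`Error::FullStorage`]. Data that can never fit (longer than
    /// [`u16::MAX`] or than a single page's data area) returns
    /// [`Error::ItemTooBig`].
    ///
    /// A minimal sketch, not compiled as a doctest; `storage` is assumed to
    /// be set up as in the [`QueueStorage`] example:
    ///
    /// ```ignore
    /// // Fails with `Error::FullStorage` once the flash range is exhausted:
    /// storage.push(b"log entry", false).await?;
    /// // Evicts the oldest items instead of failing:
    /// storage.push(b"log entry", true).await?;
    /// ```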
    pub async fn push(
        &mut self,
        data: &[u8],
        allow_overwrite_old_data: bool,
    ) -> Result<(), Error<S::Error>> {
        run_with_auto_repair!(
            function = self.push_inner(data, allow_overwrite_old_data).await,
            repair = self.try_repair().await?
        )
    }

    async fn push_inner(
        &mut self,
        data: &[u8],
        allow_overwrite_old_data: bool,
    ) -> Result<(), Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        // An item must fit its length in a u16 and fit on a single page.
        if data.len() > u16::MAX as usize
            || data.len()
                > calculate_page_size::<S>()
                    .saturating_sub(ItemHeader::data_address::<S>(0) as usize)
        {
            self.inner.cache.unmark_dirty();
            return Err(Error::ItemTooBig);
        }

        let current_page = self.find_youngest_page().await?;

        let page_data_start_address =
            calculate_page_address::<S>(self.flash_range(), current_page) + S::WORD_SIZE as u32;
        let page_data_end_address =
            calculate_page_end_address::<S>(self.flash_range(), current_page) - S::WORD_SIZE as u32;

        self.inner.partial_close_page(current_page).await?;

        let mut next_address = self
            .inner
            .find_next_free_item_spot(
                page_data_start_address,
                page_data_end_address,
                data.len() as u32,
            )
            .await?;

        if next_address.is_none() {
            // The item doesn't fit on the current page; move to the next one.
            let next_page = self.inner.next_page(current_page);
            let next_page_state = self.inner.get_page_state(next_page).await?;
            let single_page = next_page == current_page;

            match (next_page_state, single_page) {
                (PageState::Open, _) => {
                    self.inner.close_page(current_page).await?;
                    self.inner.partial_close_page(next_page).await?;
                    next_address = Some(
                        calculate_page_address::<S>(self.flash_range(), next_page)
                            + S::WORD_SIZE as u32,
                    );
                }
                (PageState::Closed, _) | (PageState::PartialOpen, true) => {
                    let next_page_data_start_address =
                        calculate_page_address::<S>(self.flash_range(), next_page)
                            + S::WORD_SIZE as u32;

                    if !allow_overwrite_old_data
                        && !self
                            .inner
                            .is_page_empty(next_page, Some(next_page_state))
                            .await?
                    {
                        self.inner.cache.unmark_dirty();
                        return Err(Error::FullStorage);
                    }

                    self.inner.open_page(next_page).await?;
                    if !single_page {
                        self.inner.close_page(current_page).await?;
                    }
                    self.inner.partial_close_page(next_page).await?;
                    next_address = Some(next_page_data_start_address);
                }
                (PageState::PartialOpen, false) => {
                    // There can only ever be one partial-open page.
                    return Err(Error::Corrupted {
                        #[cfg(feature = "_test")]
                        backtrace: std::backtrace::Backtrace::capture(),
                    });
                }
            }
        }

        Item::write_new(
            &mut self.inner.flash,
            self.inner.flash_range.clone(),
            &mut self.inner.cache,
            next_address.unwrap(),
            data,
        )
        .await?;

        self.inner.cache.unmark_dirty();
        Ok(())
    }

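    /// Returns an iterator over the items in the queue, oldest first.
    ///
    /// A sketch of walking the queue read-only, not compiled as a doctest;
    /// setup as in the [`QueueStorage`] example and `process` is a
    /// placeholder for your own handler:
    ///
    /// ```ignore
    /// let mut buf = [0; 32];
    /// let mut iter = storage.iter().await?;
    /// while let Some(entry) = iter.next(&mut buf).await? {
    ///     // `entry` derefs to the item's bytes.
    ///     process(&entry[..]);
    /// }
    /// ```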
    pub async fn iter(&mut self) -> Result<QueueIterator<'_, S, C>, Error<S::Error>> {
        QueueIterator::new(self).await
    }

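    /// Returns the oldest item without removing it, or `None` if the queue
    /// is empty. The returned slice is the filled prefix of `data_buffer`,
    /// so the buffer must be at least as large as the item.
    ///
    /// Sketch (assumptions as in the [`QueueStorage`] example):
    ///
    /// ```ignore
    /// let mut buf = [0; 32];
    /// if let Some(oldest) = storage.peek(&mut buf).await? {
    ///     // `oldest` borrows from `buf` and has exactly the item's length.
    /// }
    /// ```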
    pub async fn peek<'d>(
        &mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>> {
        let mut iterator = self.iter().await?;

        let next_value = iterator.next(data_buffer).await?;

        match next_value {
            Some(entry) => Ok(Some(entry.into_buf())),
            None => Ok(None),
        }
    }

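    /// Removes and returns the oldest item, or `None` if the queue is empty.
    ///
    /// Requires [`MultiwriteNorFlash`] because popping marks the item's data
    /// as erased in place. Sketch of draining the queue (assumptions as in
    /// the [`QueueStorage`] example):
    ///
    /// ```ignore
    /// let mut buf = [0; 32];
    /// while let Some(item) = storage.pop(&mut buf).await? {
    ///     // Each iteration consumes the oldest remaining item.
    /// }
    /// ```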
    pub async fn pop<'d>(
        &mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>>
    where
        S: MultiwriteNorFlash,
    {
        let mut iterator = self.iter().await?;

        let next_value = iterator.next(data_buffer).await?;

        match next_value {
            Some(entry) => Ok(Some(entry.pop().await?)),
            None => Ok(None),
        }
    }

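    /// Returns the largest data size that can currently be pushed without
    /// overwriting old data, or `None` if nothing fits.
    ///
    /// A hedged sketch of a size check before a push (assumptions as in the
    /// [`QueueStorage`] example):
    ///
    /// ```ignore
    /// let payload = [0xAB; 64];
    /// match storage.find_max_fit().await? {
    ///     Some(max) if payload.len() <= max as usize => {
    ///         storage.push(&payload, false).await?;
    ///     }
    ///     _ => { /* not enough contiguous space right now */ }
    /// }
    /// ```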
    pub async fn find_max_fit(&mut self) -> Result<Option<u32>, Error<S::Error>> {
        run_with_auto_repair!(
            function = self.find_max_fit_inner().await,
            repair = self.try_repair().await?
        )
    }

    async fn find_max_fit_inner(&mut self) -> Result<Option<u32>, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let current_page = self.find_youngest_page().await?;

        // Check if we have a different page we can write to next.
        let next_page = self.inner.next_page(current_page);
        match self.inner.get_page_state(next_page).await? {
            state @ PageState::Closed => {
                if self.inner.is_page_empty(next_page, Some(state)).await? {
                    self.inner.cache.unmark_dirty();
                    return Ok(Some((S::ERASE_SIZE - (2 * S::WORD_SIZE)) as u32));
                }
            }
            PageState::Open => {
                self.inner.cache.unmark_dirty();
                return Ok(Some((S::ERASE_SIZE - (2 * S::WORD_SIZE)) as u32));
            }
            PageState::PartialOpen => {
                // There can only ever be one partial-open page.
                return Err(Error::Corrupted {
                    #[cfg(feature = "_test")]
                    backtrace: std::backtrace::Backtrace::capture(),
                });
            }
        }

        // No other page is usable; check what's left on the current page.
        let page_data_start_address =
            calculate_page_address::<S>(self.flash_range(), current_page) + S::WORD_SIZE as u32;
        let page_data_end_address =
            calculate_page_end_address::<S>(self.flash_range(), current_page) - S::WORD_SIZE as u32;

        let next_item_address = match self.inner.cache.first_item_after_written(current_page) {
            Some(next_item_address) => next_item_address,
            None => {
                ItemHeaderIter::new(
                    self.inner
                        .cache
                        .first_item_after_erased(current_page)
                        .unwrap_or(page_data_start_address),
                    page_data_end_address,
                )
                .traverse(&mut self.inner.flash, |_, _| true)
                .await?
                .1
            }
        };

        self.inner.cache.unmark_dirty();
        Ok(ItemHeader::available_data_bytes::<S>(
            page_data_end_address - next_item_address,
        ))
    }

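    /// Returns the total number of data bytes that can still be stored
    /// without overwriting old data.
    ///
    /// This sums free space across pages, so a single item may not fit even
    /// when `space_left` reports enough room; use [`Self::find_max_fit`] for
    /// a single-item bound, as in this sketch (assumptions as in the
    /// [`QueueStorage`] example):
    ///
    /// ```ignore
    /// let total_free = storage.space_left().await?;
    /// let largest_item = storage.find_max_fit().await?.unwrap_or(0);
    /// assert!(largest_item <= total_free);
    /// ```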
    pub async fn space_left(&mut self) -> Result<u32, Error<S::Error>> {
        run_with_auto_repair!(
            function = self.space_left_inner().await,
            repair = self.try_repair().await?
        )
    }

    async fn space_left_inner(&mut self) -> Result<u32, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let mut total_free_space = 0;

        for page in self.inner.get_pages(0) {
            let state = self.inner.get_page_state(page).await?;
            let page_empty = self.inner.is_page_empty(page, Some(state)).await?;

            if state.is_closed() && !page_empty {
                continue;
            }

            let page_data_start_address =
                calculate_page_address::<S>(self.flash_range(), page) + S::WORD_SIZE as u32;
            let page_data_end_address =
                calculate_page_end_address::<S>(self.flash_range(), page) - S::WORD_SIZE as u32;

            if page_empty {
                total_free_space += page_data_end_address - page_data_start_address;
                continue;
            }

            // Partially filled page: find where the next item would go.
            let next_item_address = match self.inner.cache.first_item_after_written(page) {
                Some(next_item_address) => next_item_address,
                None => {
                    ItemHeaderIter::new(
                        self.inner
                            .cache
                            .first_item_after_erased(page)
                            .unwrap_or(page_data_start_address),
                        page_data_end_address,
                    )
                    .traverse(&mut self.inner.flash, |_, _| true)
                    .await?
                    .1
                }
            };

            if ItemHeader::available_data_bytes::<S>(page_data_end_address - next_item_address)
                .is_none()
            {
                // The page is full, but if everything on it has been popped
                // it will be erased and reused once the queue wraps around.
                if self
                    .inner
                    .is_page_empty(page, Some(PageState::Closed))
                    .await?
                {
                    total_free_space += page_data_end_address - page_data_start_address;
                    continue;
                }
            }

            total_free_space += page_data_end_address - next_item_address;
        }

        self.inner.cache.unmark_dirty();
        Ok(total_free_space)
    }

    /// Finds the youngest (currently active) page: the partial-open page if
    /// one exists, otherwise the first open page after the closed pages.
    async fn find_youngest_page(&mut self) -> Result<usize, Error<S::Error>> {
        let last_used_page = self
            .inner
            .find_first_page(0, PageState::PartialOpen)
            .await?;

        if let Some(last_used_page) = last_used_page {
            return Ok(last_used_page);
        }

        // No partial-open page; search for an open page after the closed ones.
        let first_closed_page = self.inner.find_first_page(0, PageState::Closed).await?;

        let first_open_page = match first_closed_page {
            Some(anchor) => {
                // Skip past the closed pages to the first open one.
                self.inner.find_first_page(anchor, PageState::Open).await?
            }
            None => {
                // No page is closed, so every page is open and page 0 will do.
                Some(0)
            }
        };

        if let Some(first_open_page) = first_open_page {
            return Ok(first_open_page);
        }

        // All pages are closed; that should never happen.
        Err(Error::Corrupted {
            #[cfg(feature = "_test")]
            backtrace: std::backtrace::Backtrace::capture(),
        })
    }

    /// Finds the oldest page: the first closed page after the youngest one,
    /// or the youngest page itself if no page is closed.
    async fn find_oldest_page(&mut self) -> Result<usize, Error<S::Error>> {
        let youngest_page = self.find_youngest_page().await?;

        let oldest_closed_page = self
            .inner
            .find_first_page(youngest_page, PageState::Closed)
            .await?;

        Ok(oldest_closed_page.unwrap_or(youngest_page))
    }

    /// Tries to repair the storage after a corruption error. The cache is
    /// invalidated first because it can no longer be trusted.
    async fn try_repair(&mut self) -> Result<(), Error<S::Error>> {
        self.inner.cache.invalidate_cache_state();

        self.inner.try_general_repair().await?;
        Ok(())
    }

    /// Finds the address where iteration starts: the first item on the
    /// oldest page.
    async fn find_start_address(&mut self) -> Result<NextAddress, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let oldest_page = self.find_oldest_page().await?;

        let current_address = match self.inner.cache.first_item_after_erased(oldest_page) {
            Some(address) => address,
            None => {
                calculate_page_address::<S>(self.inner.flash_range.clone(), oldest_page)
                    + S::WORD_SIZE as u32
            }
        };

        Ok(NextAddress::Address(current_address))
    }

    /// Erases the full flash range of this queue, removing all items.
    pub fn erase_all(&mut self) -> impl Future<Output = Result<(), Error<S::Error>>> {
        self.inner.erase_all()
    }

    /// The number of bytes of overhead each stored item adds on top of its
    /// data.
    #[must_use]
    pub const fn item_overhead_size() -> u32 {
        GenericStorage::<S, C>::item_overhead_size()
    }

    /// Consumes the storage, returning the flash driver and cache.
    pub fn destroy(self) -> (S, C) {
        self.inner.destroy()
    }

    /// Borrows the underlying flash driver.
    pub const fn flash(&mut self) -> &mut S {
        self.inner.flash()
    }

    /// The flash range this queue operates on.
    pub const fn flash_range(&self) -> Range<u32> {
        self.inner.flash_range()
    }

    #[cfg(any(test, feature = "std"))]
    pub fn print_items(&mut self) -> impl Future<Output = String> {
        self.inner.print_items()
    }
}

#[derive(PartialEq, Eq, Clone, Copy, Debug)]
enum PreviousItemStates {
    /// Every item the iterator has seen so far has been popped.
    AllPopped,
    /// Every item seen so far has been popped, except the current one.
    AllButCurrentPopped,
    /// At least one item other than the current one remains unpopped.
    Unpopped,
}

/// An iterator over the items in a [`QueueStorage`], oldest first.
pub struct QueueIterator<'s, S: NorFlash, C: CacheImpl> {
    storage: &'s mut QueueStorage<S, C>,
    next_address: NextAddress,
    previous_item_states: PreviousItemStates,
    oldest_page: usize,
}

impl<S: NorFlash, C: CacheImpl> Debug for QueueIterator<'_, S, C> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("QueueIterator")
            .field("current_address", &self.next_address)
            .finish_non_exhaustive()
    }
}

#[derive(Debug, Clone, Copy)]
enum NextAddress {
    Address(u32),
    PageAfter(usize),
}

impl<'s, S: NorFlash, C: CacheImpl> QueueIterator<'s, S, C> {
    async fn new(storage: &'s mut QueueStorage<S, C>) -> Result<Self, Error<S::Error>> {
        let start_address = run_with_auto_repair!(
            function = storage.find_start_address().await,
            repair = storage.try_repair().await?
        )?;

        let oldest_page = match start_address {
            NextAddress::Address(address) => {
                calculate_page_index::<S>(storage.inner.flash_range.clone(), address)
            }
            NextAddress::PageAfter(index) => storage.inner.next_page(index),
        };

        Ok(Self {
            storage,
            next_address: start_address,
            previous_item_states: PreviousItemStates::AllPopped,
            oldest_page,
        })
    }

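    /// Advances the iterator, reading the next item into `data_buffer`.
    ///
    /// Returns `Ok(None)` when the end of the queue is reached. The returned
    /// [`QueueIteratorEntry`] derefs to the item's bytes and can remove the
    /// item from the queue via [`QueueIteratorEntry::pop`]. A sketch of
    /// selective removal (a [`MultiwriteNorFlash`] flash is assumed, setup
    /// as in the [`QueueStorage`] example):
    ///
    /// ```ignore
    /// let mut buf = [0; 32];
    /// let mut iter = storage.iter().await?;
    /// while let Some(entry) = iter.next(&mut buf).await? {
    ///     if entry.first() == Some(&0xFF) {
    ///         entry.pop().await?; // remove only the matching items
    ///     }
    /// }
    /// ```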
    pub async fn next<'d, 'q>(
        &'q mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<QueueIteratorEntry<'s, 'd, 'q, S, C>>, Error<S::Error>> {
        if self.previous_item_states == PreviousItemStates::AllButCurrentPopped {
            // The current item was not popped, so not everything before the
            // next item is popped anymore.
            self.previous_item_states = PreviousItemStates::Unpopped;
        }

        let value = run_with_auto_repair!(
            function = self.next_inner(data_buffer).await,
            repair = self.storage.try_repair().await?
        );

        match value {
            Ok(Some((item, address))) => Ok(Some(QueueIteratorEntry {
                iter: self,
                item: item.reborrow(data_buffer).ok_or_else(|| Error::LogicBug {
                    #[cfg(feature = "_test")]
                    backtrace: std::backtrace::Backtrace::capture(),
                })?,
                address,
            })),
            Ok(None) => Ok(None),
            Err(e) => Err(e),
        }
    }

    async fn next_inner(
        &mut self,
        data_buffer: &mut [u8],
    ) -> Result<Option<(ItemUnborrowed, u32)>, Error<S::Error>> {
        if self.storage.inner.cache.is_dirty() {
            self.storage.inner.cache.invalidate_cache_state();
        }

        loop {
            // Get the current page and address based on the next address.
            let (current_page, current_address) = match self.next_address {
                NextAddress::PageAfter(previous_page) => {
                    let next_page = self.storage.inner.next_page(previous_page);
                    if self
                        .storage
                        .inner
                        .get_page_state(next_page)
                        .await?
                        .is_open()
                        || next_page == self.oldest_page
                    {
                        self.storage.inner.cache.unmark_dirty();
                        return Ok(None);
                    }

                    if self.previous_item_states == PreviousItemStates::AllPopped {
                        // Everything on the page we're leaving has been
                        // popped, so it can be opened up for reuse.
                        self.storage.inner.open_page(previous_page).await?;
                    }

                    let current_address = calculate_page_address::<S>(
                        self.storage.inner.flash_range.clone(),
                        next_page,
                    ) + S::WORD_SIZE as u32;

                    self.next_address = NextAddress::Address(current_address);

                    (next_page, current_address)
                }
                NextAddress::Address(address) => (
                    calculate_page_index::<S>(self.storage.inner.flash_range.clone(), address),
                    address,
                ),
            };

            let page_data_end_address = calculate_page_end_address::<S>(
                self.storage.inner.flash_range.clone(),
                current_page,
            ) - S::WORD_SIZE as u32;

            // Find the first item that is not erased.
            let mut it = ItemHeaderIter::new(current_address, page_data_end_address);
            if let (Some(found_item_header), found_item_address) = it
                .traverse(&mut self.storage.inner.flash, |header, _| {
                    header.crc.is_none()
                })
                .await?
            {
                let maybe_item = found_item_header
                    .read_item(
                        &mut self.storage.inner.flash,
                        data_buffer,
                        found_item_address,
                        page_data_end_address,
                    )
                    .await?;

                match maybe_item {
                    item::MaybeItem::Corrupted(header, _) => {
                        // Skip corrupted items and try the next one.
                        let next_address = header.next_item_address::<S>(found_item_address);
                        self.next_address = if next_address >= page_data_end_address {
                            NextAddress::PageAfter(current_page)
                        } else {
                            NextAddress::Address(next_address)
                        };
                    }
                    item::MaybeItem::Erased(_, _) => {
                        // The traverse above only stops at non-erased items.
                        return Err(Error::LogicBug {
                            #[cfg(feature = "_test")]
                            backtrace: std::backtrace::Backtrace::capture(),
                        });
                    }
                    item::MaybeItem::Present(item) => {
                        let next_address = item.header.next_item_address::<S>(found_item_address);
                        self.next_address = if next_address >= page_data_end_address {
                            NextAddress::PageAfter(current_page)
                        } else {
                            NextAddress::Address(next_address)
                        };

                        if self.previous_item_states == PreviousItemStates::AllPopped {
                            // This item is not popped yet, but everything before it is.
                            self.previous_item_states = PreviousItemStates::AllButCurrentPopped;
                        }

                        // Return the item we found.
                        self.storage.inner.cache.unmark_dirty();
                        return Ok(Some((item.unborrow(), found_item_address)));
                    }
                }
            } else {
                self.next_address = NextAddress::PageAfter(current_page);
            }
        }
    }
}

/// A single entry yielded by [`QueueIterator::next`]. Derefs to the item's
/// data bytes.
pub struct QueueIteratorEntry<'s, 'd, 'q, S: NorFlash, CI: CacheImpl> {
    iter: &'q mut QueueIterator<'s, S, CI>,
    address: u32,
    item: Item<'d>,
}

impl<S: NorFlash, CI: CacheImpl> Deref for QueueIteratorEntry<'_, '_, '_, S, CI> {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.item.data()
    }
}

impl<S: NorFlash, CI: CacheImpl> DerefMut for QueueIteratorEntry<'_, '_, '_, S, CI> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.item.data_mut()
    }
}

impl<'d, S: NorFlash, CI: CacheImpl> QueueIteratorEntry<'_, 'd, '_, S, CI> {
    /// Consumes the entry, returning the slice of the data buffer that holds
    /// this item's bytes.
    #[must_use]
    pub fn into_buf(self) -> &'d mut [u8] {
        self.item.data_owned()
    }

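    /// Removes this entry from the queue and returns its data. The returned
    /// slice borrows from the buffer that was passed to
    /// [`QueueIterator::next`].
    ///
    /// Requires [`MultiwriteNorFlash`]. Sketch (setup as in the
    /// [`QueueStorage`] example):
    ///
    /// ```ignore
    /// let mut buf = [0; 32];
    /// let mut iter = storage.iter().await?;
    /// if let Some(entry) = iter.next(&mut buf).await? {
    ///     let data = entry.pop().await?; // the entry is now gone from flash
    /// }
    /// ```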
    pub async fn pop(self) -> Result<&'d mut [u8], Error<S::Error>>
    where
        S: MultiwriteNorFlash,
    {
        let (header, item_data_buffer) = self.item.header_and_data_owned();

        if self.iter.previous_item_states == PreviousItemStates::AllButCurrentPopped {
            // The current item was the only unpopped one and is being popped now.
            self.iter.previous_item_states = PreviousItemStates::AllPopped;
        }

        header
            .erase_data(
                &mut self.iter.storage.inner.flash,
                self.iter.storage.inner.flash_range.clone(),
                &mut self.iter.storage.inner.cache,
                self.address,
            )
            .await?;

        self.iter.storage.inner.cache.unmark_dirty();
        Ok(item_data_buffer)
    }

    /// The flash address of this entry, for test assertions.
    #[cfg(feature = "_test")]
    pub fn address(&self) -> u32 {
        self.address
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        AlignedBuf,
        cache::NoCache,
        mock_flash::{self, FlashAverageStatsResult, FlashStatsResult, WriteCountCheck},
    };

    use super::*;
    use futures_test::test;

    type MockFlashBig = mock_flash::MockFlashBase<4, 4, 256>;
    type MockFlashTiny = mock_flash::MockFlashBase<2, 1, 32>;

    #[test]
    async fn peek_and_overwrite_old_data() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);
        const DATA_SIZE: usize = 22;

        assert_eq!(storage.space_left().await.unwrap(), 60);

        assert_eq!(storage.peek(&mut data_buffer).await.unwrap(), None);

        data_buffer[..DATA_SIZE].copy_from_slice(&[0xAA; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap();

        assert_eq!(storage.space_left().await.unwrap(), 30);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; DATA_SIZE]
        );
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xBB; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap();

        assert_eq!(storage.space_left().await.unwrap(), 0);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; DATA_SIZE]
        );

        // The storage is full, so a push without overwrite must fail...
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xCC; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap_err();
        // ...while a push with overwrite evicts the oldest data.
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xDD; DATA_SIZE]);
        storage.push(&data_buffer[..DATA_SIZE], true).await.unwrap();

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; DATA_SIZE]
        );
        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; DATA_SIZE]
        );

        assert_eq!(storage.space_left().await.unwrap(), 30);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xDD; DATA_SIZE]
        );
        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xDD; DATA_SIZE]
        );

        assert_eq!(storage.space_left().await.unwrap(), 60);

        assert_eq!(storage.peek(&mut data_buffer).await.unwrap(), None);
        assert_eq!(storage.pop(&mut data_buffer).await.unwrap(), None);
    }

    #[test]
    async fn push_pop() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        let mut data_buffer = AlignedBuf([0; 1024]);

        for i in 0..2000 {
            println!("{i}");
            let data = vec![i as u8; i % 512 + 1];

            storage.push(&data, true).await.unwrap();
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            assert_eq!(
                storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap(),
                None,
                "At {i}"
            );
        }
    }

    #[test]
    async fn iter_pop_out_of_order() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        let mut data_buffer = AlignedBuf([0; 1024]);

        let gen_data = |i: usize| vec![i as u8; i % 512 + 1];
        const COUNT: usize = 20;

        for i in 0..COUNT {
            storage.push(&gen_data(i), false).await.unwrap();
        }

        // Pop every odd-indexed item while iterating.
        let mut iterator = storage.iter().await.unwrap();
        let mut i = 0;
        while let Some(entry) = iterator.next(&mut data_buffer).await.unwrap() {
            if i % 2 == 1 {
                assert_eq!(entry.pop().await.unwrap(), gen_data(i));
            }

            i += 1;
        }
        assert_eq!(i, COUNT);

        // Only the even-indexed items should remain.
        let mut iterator = storage.iter().await.unwrap();
        let mut i = 0;
        while let Some(entry) = iterator.next(&mut data_buffer).await.unwrap() {
            assert_eq!(entry.into_buf(), gen_data(i));
            i += 2;
        }
        assert_eq!(i, COUNT);
    }

    #[test]
    async fn push_pop_tiny() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        for i in 0..2000 {
            println!("{i}");
            let data = vec![i as u8; i % 20 + 1];

            println!("PUSH");
            storage.push(&data, true).await.unwrap();
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            println!("POP");
            assert_eq!(
                storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            println!("PEEK");
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap(),
                None,
                "At {i}"
            );
            println!("DONE");
        }
    }

    #[test]
    async fn push_peek_pop_many() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        let mut push_stats = FlashStatsResult::default();
        let mut pushes = 0;
        let mut peek_stats = FlashStatsResult::default();
        let mut peeks = 0;
        let mut pop_stats = FlashStatsResult::default();
        let mut pops = 0;

        for loop_index in 0..100 {
            println!("Loop index: {loop_index}");

            for i in 0..20 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 0..5 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = [i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .deref(),
                    &data[..],
                    "At {i}"
                );
                peeks += 1;
                peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 0..5 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .pop()
                        .await
                        .unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            for i in 20..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 5..25 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .deref(),
                    &data,
                    "At {i}"
                );
                peeks += 1;
                peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 5..25 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .pop()
                        .await
                        .unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }
        }

        // Regression check on the average flash usage per operation.
        approx::assert_relative_eq!(
            push_stats.take_average(pushes),
            FlashAverageStatsResult {
                avg_erases: 0.0,
                avg_reads: 16.864,
                avg_writes: 3.1252,
                avg_bytes_read: 105.4112,
                avg_bytes_written: 60.5008
            }
        );
        approx::assert_relative_eq!(
            peek_stats.take_average(peeks),
            FlashAverageStatsResult {
                avg_erases: 0.0052,
                avg_reads: 3.8656,
                avg_writes: 0.0,
                avg_bytes_read: 70.4256,
                avg_bytes_written: 0.0
            }
        );
        approx::assert_relative_eq!(
            pop_stats.take_average(pops),
            FlashAverageStatsResult {
                avg_erases: 0.0572,
                avg_reads: 3.7772,
                avg_writes: 1.0,
                avg_bytes_read: 69.7184,
                avg_bytes_written: 8.0
            }
        );
    }

    #[test]
    async fn push_lots_then_pop_lots() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        let mut push_stats = FlashStatsResult::default();
        let mut pushes = 0;
        let mut pop_stats = FlashStatsResult::default();
        let mut pops = 0;

        for loop_index in 0..100 {
            println!("Loop index: {loop_index}");

            for i in 0..20 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 0..5 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 20..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 5..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }
        }

        // Regression check on the average flash usage per operation.
        approx::assert_relative_eq!(
            push_stats.take_average(pushes),
            FlashAverageStatsResult {
                avg_erases: 0.0,
                avg_reads: 16.864,
                avg_writes: 3.1252,
                avg_bytes_read: 105.4112,
                avg_bytes_written: 60.5008
            }
        );
        approx::assert_relative_eq!(
            pop_stats.take_average(pops),
            FlashAverageStatsResult {
                avg_erases: 0.0624,
                avg_reads: 23.5768,
                avg_writes: 1.0,
                avg_bytes_read: 180.512,
                avg_bytes_written: 8.0
            }
        );
    }

    #[test]
    async fn pop_with_empty_section() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        data_buffer[..20].copy_from_slice(&[0xAA; 20]);
        storage.push(&data_buffer[0..20], false).await.unwrap();
        data_buffer[..20].copy_from_slice(&[0xBB; 20]);
        storage.push(&data_buffer[0..20], false).await.unwrap();

        // The items sit on different pages, so the pops must cope with the
        // empty section left behind on the first page.
        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; 20]
        );

        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; 20]
        );
    }

    #[test]
    async fn search_pages() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        storage.inner.close_page(0).await.unwrap();
        storage.inner.close_page(1).await.unwrap();
        storage.inner.partial_close_page(2).await.unwrap();

        assert_eq!(storage.find_youngest_page().await.unwrap(), 2);
        assert_eq!(storage.find_oldest_page().await.unwrap(), 0);
    }

    #[test]
    async fn store_too_big_item() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        // The largest item that fits: page size minus the two page-state
        // words minus the item header.
        storage
            .push(&AlignedBuf([0; 1024 - 4 * 2 - 8]), false)
            .await
            .unwrap();

        // One byte more must be rejected.
        assert_eq!(
            storage
                .push(&AlignedBuf([0; 1024 - 4 * 2 - 8 + 1]), false)
                .await,
            Err(Error::ItemTooBig)
        );
    }

    #[test]
    async fn push_on_single_page() {
        let mut storage = QueueStorage::new(
            mock_flash::MockFlashBase::<1, 4, 256>::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x400) },
            NoCache::new(),
        );

        for _ in 0..100 {
            match storage.push(&[0, 1, 2, 3, 4], true).await {
                Ok(_) => {}
                Err(e) => {
                    println!("{}", storage.print_items().await);
                    panic!("{e}");
                }
            }
        }
    }
}