1use crate::item::{Item, ItemHeader, ItemHeaderIter};
4
5use self::{cache::CacheImpl, item::ItemUnborrowed};
6
7use super::{
8 Debug, Deref, DerefMut, Error, GenericStorage, MAX_WORD_SIZE, NorFlash, NorFlashExt, PageState,
9 PhantomData, Range, cache, calculate_page_address, calculate_page_end_address,
10 calculate_page_index, calculate_page_size, item, run_with_auto_repair,
11};
12use embedded_storage_async::nor_flash::MultiwriteNorFlash;
13
/// Validated configuration for a queue: the flash address range the queue may
/// use. Construct via [`QueueConfig::new`] or [`QueueConfig::try_new`], which
/// check the range against the flash type's erase/word geometry.
pub struct QueueConfig<S> {
    // Byte range inside the flash device reserved for the queue.
    flash_range: Range<u32>,
    // Ties the validated config to the flash type `S` it was checked against.
    _phantom: PhantomData<S>,
}
19
20impl<S: NorFlash> QueueConfig<S> {
21 #[must_use]
24 pub const fn new(flash_range: Range<u32>) -> Self {
25 Self::try_new(flash_range).expect("Queue config must be correct")
26 }
27
28 #[must_use]
30 pub const fn try_new(flash_range: Range<u32>) -> Option<Self> {
31 if !flash_range.start.is_multiple_of(S::ERASE_SIZE as u32) {
32 return None;
33 }
34 if !flash_range.end.is_multiple_of(S::ERASE_SIZE as u32) {
35 return None;
36 }
37 if flash_range.end - flash_range.start < (S::ERASE_SIZE as u32) {
39 return None;
40 }
41
42 if S::ERASE_SIZE < S::WORD_SIZE * 4 {
43 return None;
44 }
45 if S::WORD_SIZE > MAX_WORD_SIZE {
46 return None;
47 }
48
49 Some(Self {
50 flash_range,
51 _phantom: PhantomData,
52 })
53 }
54}
55
/// A FIFO queue of byte items stored in NOR flash.
pub struct QueueStorage<S: NorFlash, C: CacheImpl> {
    // Shared page/item machinery: the flash driver, its reserved range and the cache.
    inner: GenericStorage<S, C>,
}
116
impl<S: NorFlash, C: CacheImpl> QueueStorage<S, C> {
    /// Create the queue storage from a flash driver, a validated config and a cache.
    pub const fn new(storage: S, config: QueueConfig<S>, cache: C) -> Self {
        Self {
            inner: GenericStorage {
                flash: storage,
                flash_range: config.flash_range,
                cache,
            },
        }
    }

    /// Push `data` onto the back of the queue.
    ///
    /// When `allow_overwrite_old_data` is `true`, a full queue erases the next
    /// (oldest) page to make room; when `false`, [`Error::FullStorage`] is
    /// returned instead. Wrapped in `run_with_auto_repair!`, so a corruption
    /// error triggers [`Self::try_repair`] (see the macro for retry semantics).
    pub async fn push(
        &mut self,
        data: &[u8],
        allow_overwrite_old_data: bool,
    ) -> Result<(), Error<S::Error>> {
        run_with_auto_repair!(
            function = self.push_inner(data, allow_overwrite_old_data).await,
            repair = self.try_repair().await?
        )
    }

    async fn push_inner(
        &mut self,
        data: &[u8],
        allow_overwrite_old_data: bool,
    ) -> Result<(), Error<S::Error>> {
        // A dirty cache means an earlier operation was interrupted; the cached
        // state can no longer be trusted.
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        // Reject items whose length does not fit the header's length field or
        // that cannot fit in a single page's data region.
        if data.len() > u16::MAX as usize
            || data.len()
                > calculate_page_size::<S>()
                    .saturating_sub(ItemHeader::data_address::<S>(0) as usize)
        {
            self.inner.cache.unmark_dirty();
            return Err(Error::ItemTooBig);
        }

        let current_page = self.find_youngest_page().await?;

        // The first and last word of every page hold page-state markers, so the
        // usable data region is shrunk by one word on each side.
        let page_data_start_address =
            calculate_page_address::<S>(self.flash_range(), current_page) + S::WORD_SIZE as u32;
        let page_data_end_address =
            calculate_page_end_address::<S>(self.flash_range(), current_page) - S::WORD_SIZE as u32;

        self.inner.partial_close_page(current_page).await?;

        // Look for the first free spot on the current page that can hold `data`.
        let mut next_address = self
            .inner
            .find_next_free_item_spot(
                page_data_start_address,
                page_data_end_address,
                data.len() as u32,
            )
            .await?;

        // No room on the current page: advance to the next page.
        if next_address.is_none() {
            let next_page = self.inner.next_page(current_page);
            let next_page_state = self.inner.get_page_state(next_page).await?;
            let single_page = next_page == current_page;

            match (next_page_state, single_page) {
                (PageState::Open, _) => {
                    // The next page is ready for use: close the filled page and
                    // start writing at the next page's data start.
                    self.inner.close_page(current_page).await?;
                    self.inner.partial_close_page(next_page).await?;
                    next_address = Some(
                        calculate_page_address::<S>(self.flash_range(), next_page)
                            + S::WORD_SIZE as u32,
                    );
                }
                (PageState::Closed, _) | (PageState::PartialOpen, true) => {
                    let next_page_data_start_address =
                        calculate_page_address::<S>(self.flash_range(), next_page)
                            + S::WORD_SIZE as u32;

                    // The next page still holds (old) data; only reclaim it
                    // when the caller allows overwriting.
                    if !allow_overwrite_old_data
                        && !self
                            .inner
                            .is_page_empty(next_page, Some(next_page_state))
                            .await?
                    {
                        self.inner.cache.unmark_dirty();
                        return Err(Error::FullStorage);
                    }

                    self.inner.open_page(next_page).await?;
                    if !single_page {
                        self.inner.close_page(current_page).await?;
                    }
                    self.inner.partial_close_page(next_page).await?;
                    next_address = Some(next_page_data_start_address);
                }
                (PageState::PartialOpen, false) => {
                    // A second partially-open page can never occur in a healthy
                    // queue.
                    return Err(Error::Corrupted {
                        #[cfg(feature = "_test")]
                        backtrace: std::backtrace::Backtrace::capture(),
                    });
                }
            }
        }

        Item::write_new(
            &mut self.inner.flash,
            self.inner.flash_range.clone(),
            &mut self.inner.cache,
            next_address.unwrap(),
            data,
        )
        .await?;

        self.inner.cache.unmark_dirty();
        Ok(())
    }

    /// Iterate over the queued items, oldest first.
    pub async fn iter(&mut self) -> Result<QueueIterator<'_, S, C>, Error<S::Error>> {
        QueueIterator::new(self).await
    }

    /// Read the oldest item without removing it.
    ///
    /// Returns the filled prefix of `data_buffer`, or `None` when the queue is
    /// empty.
    pub async fn peek<'d>(
        &mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>> {
        let mut iterator = self.iter().await?;

        let next_value = iterator.next(data_buffer).await?;

        match next_value {
            Some(entry) => Ok(Some(entry.into_buf())),
            None => Ok(None),
        }
    }

    /// Remove and return the oldest item, or `None` when the queue is empty.
    ///
    /// Requires [`MultiwriteNorFlash`] because popping marks the stored item as
    /// erased in place.
    pub async fn pop<'d>(
        &mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<&'d mut [u8]>, Error<S::Error>>
    where
        S: MultiwriteNorFlash,
    {
        let mut iterator = self.iter().await?;

        let next_value = iterator.next(data_buffer).await?;

        match next_value {
            Some(entry) => Ok(Some(entry.pop().await?)),
            None => Ok(None),
        }
    }

    /// Largest data length the next `push` can accept right now, or `None`
    /// when no further item fits. Wrapped in automatic repair like
    /// [`Self::push`].
    pub async fn find_max_fit(&mut self) -> Result<Option<u32>, Error<S::Error>> {
        run_with_auto_repair!(
            function = self.find_max_fit_inner().await,
            repair = self.try_repair().await?
        )
    }

    async fn find_max_fit_inner(&mut self) -> Result<Option<u32>, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let current_page = self.find_youngest_page().await?;

        // If the next page is usable (open, or closed but empty), a whole page
        // minus its two state-marker words is available.
        let next_page = self.inner.next_page(current_page);
        match self.inner.get_page_state(next_page).await? {
            state @ PageState::Closed => {
                if self.inner.is_page_empty(next_page, Some(state)).await? {
                    self.inner.cache.unmark_dirty();
                    return Ok(Some((S::ERASE_SIZE - (2 * S::WORD_SIZE)) as u32));
                }
            }
            PageState::Open => {
                self.inner.cache.unmark_dirty();
                return Ok(Some((S::ERASE_SIZE - (2 * S::WORD_SIZE)) as u32));
            }
            PageState::PartialOpen => {
                // Only the youngest page may be partially open.
                return Err(Error::Corrupted {
                    #[cfg(feature = "_test")]
                    backtrace: std::backtrace::Backtrace::capture(),
                });
            }
        }

        // Otherwise measure the remaining space on the youngest page.
        let page_data_start_address =
            calculate_page_address::<S>(self.flash_range(), current_page) + S::WORD_SIZE as u32;
        let page_data_end_address =
            calculate_page_end_address::<S>(self.flash_range(), current_page) - S::WORD_SIZE as u32;

        // Take the end of the written area from the cache if known; otherwise
        // walk the item headers to find it.
        let next_item_address = match self.inner.cache.first_item_after_written(current_page) {
            Some(next_item_address) => next_item_address,
            None => {
                ItemHeaderIter::new(
                    self.inner
                        .cache
                        .first_item_after_erased(current_page)
                        .unwrap_or(page_data_start_address),
                    page_data_end_address,
                )
                .traverse(&mut self.inner.flash, |_, _| true)
                .await?
                .1
            }
        };

        self.inner.cache.unmark_dirty();
        Ok(ItemHeader::available_data_bytes::<S>(
            page_data_end_address - next_item_address,
        ))
    }

    /// Total free space in bytes across all usable pages. Note that each item
    /// also needs header overhead, so not all of this is usable payload.
    /// Wrapped in automatic repair like [`Self::push`].
    pub async fn space_left(&mut self) -> Result<u32, Error<S::Error>> {
        run_with_auto_repair!(
            function = self.space_left_inner().await,
            repair = self.try_repair().await?
        )
    }

    async fn space_left_inner(&mut self) -> Result<u32, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let mut total_free_space = 0;

        for page in self.inner.get_pages(0) {
            let state = self.inner.get_page_state(page).await?;
            let page_empty = self.inner.is_page_empty(page, Some(state)).await?;

            // Closed pages with live data contribute nothing.
            if state.is_closed() && !page_empty {
                continue;
            }

            let page_data_start_address =
                calculate_page_address::<S>(self.flash_range(), page) + S::WORD_SIZE as u32;
            let page_data_end_address =
                calculate_page_end_address::<S>(self.flash_range(), page) - S::WORD_SIZE as u32;

            // An empty page contributes its whole data region.
            if page_empty {
                total_free_space += page_data_end_address - page_data_start_address;
                continue;
            }

            // End of the written area, from the cache or by walking headers.
            let next_item_address = match self.inner.cache.first_item_after_written(page) {
                Some(next_item_address) => next_item_address,
                None => {
                    ItemHeaderIter::new(
                        self.inner
                            .cache
                            .first_item_after_erased(page)
                            .unwrap_or(page_data_start_address),
                        page_data_end_address,
                    )
                    .traverse(&mut self.inner.flash, |_, _| true)
                    .await?
                    .1
                }
            };

            // The tail is too small for even a header. If the page turns out to
            // be fully erased, count the whole data region as free instead.
            if ItemHeader::available_data_bytes::<S>(page_data_end_address - next_item_address)
                .is_none()
            {
                if self
                    .inner
                    .is_page_empty(page, Some(PageState::Closed))
                    .await?
                {
                    total_free_space += page_data_end_address - page_data_start_address;
                    continue;
                }
            }

            total_free_space += page_data_end_address - next_item_address;
        }

        self.inner.cache.unmark_dirty();
        Ok(total_free_space)
    }

    /// Page currently being written: the partially-open page if one exists,
    /// else the first open page found starting at the first closed page, else
    /// page 0 when nothing is closed. Anything else is corruption.
    async fn find_youngest_page(&mut self) -> Result<usize, Error<S::Error>> {
        let last_used_page = self
            .inner
            .find_first_page(0, PageState::PartialOpen)
            .await?;

        if let Some(last_used_page) = last_used_page {
            return Ok(last_used_page);
        }

        let first_closed_page = self.inner.find_first_page(0, PageState::Closed).await?;

        let first_open_page = match first_closed_page {
            Some(anchor) => {
                // Search for an open page starting just behind the closed run.
                self.inner.find_first_page(anchor, PageState::Open).await?
            }
            None => {
                // No closed pages at all: a fresh/empty queue starts at page 0.
                Some(0)
            }
        };

        if let Some(first_open_page) = first_open_page {
            return Ok(first_open_page);
        }

        // No partially-open page and no open page: the state markers are broken.
        Err(Error::Corrupted {
            #[cfg(feature = "_test")]
            backtrace: std::backtrace::Backtrace::capture(),
        })
    }

    /// Oldest page with data: the first closed page at/after the youngest page
    /// (search wraps), or the youngest page itself when none are closed.
    async fn find_oldest_page(&mut self) -> Result<usize, Error<S::Error>> {
        let youngest_page = self.find_youngest_page().await?;

        let oldest_closed_page = self
            .inner
            .find_first_page(youngest_page, PageState::Closed)
            .await?;

        Ok(oldest_closed_page.unwrap_or(youngest_page))
    }

    /// Drop all cached state and run the generic repair routine. Called by the
    /// auto-repair wrappers after a corruption error.
    async fn try_repair(&mut self) -> Result<(), Error<S::Error>> {
        self.inner.cache.invalidate_cache_state();

        self.inner.try_general_repair().await?;
        Ok(())
    }

    /// Address where iteration starts: the first non-erased item on the oldest
    /// page (taken from the cache if known), else that page's data start.
    async fn find_start_address(&mut self) -> Result<NextAddress, Error<S::Error>> {
        if self.inner.cache.is_dirty() {
            self.inner.cache.invalidate_cache_state();
        }

        let oldest_page = self.find_oldest_page().await?;

        let current_address = match self.inner.cache.first_item_after_erased(oldest_page) {
            Some(address) => address,
            None => {
                calculate_page_address::<S>(self.inner.flash_range.clone(), oldest_page)
                    + S::WORD_SIZE as u32
            }
        };

        Ok(NextAddress::Address(current_address))
    }

    /// Erase the entire queue region.
    pub fn erase_all(&mut self) -> impl Future<Output = Result<(), Error<S::Error>>> {
        self.inner.erase_all()
    }

    /// Flash bytes consumed per stored item in addition to its data.
    #[must_use]
    pub const fn item_overhead_size() -> u32 {
        GenericStorage::<S, C>::item_overhead_size()
    }

    /// Consume the queue and hand back the flash driver and the cache.
    pub fn destroy(self) -> (S, C) {
        self.inner.destroy()
    }

    /// Mutable access to the underlying flash driver.
    pub const fn flash(&mut self) -> &mut S {
        self.inner.flash()
    }

    /// The flash address range this queue operates on.
    pub const fn flash_range(&self) -> Range<u32> {
        self.inner.flash_range()
    }

    /// Render the stored items as a human-readable string (debug aid).
    #[cfg(any(test, feature = "std"))]
    pub fn print_items(&mut self) -> impl Future<Output = String> {
        self.inner.print_items()
    }
}
592
/// Tracks whether the items a [`QueueIterator`] has yielded so far were all
/// popped; used to decide if a fully-consumed page may be reopened (erased)
/// while iterating.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
enum PreviousItemStates {
    /// Every item returned so far has been popped.
    AllPopped,
    /// Every item except the most recently returned one has been popped.
    AllButCurrentPopped,
    /// At least one previously returned item was not popped.
    Unpopped,
}
599
/// A cursor over the items of a [`QueueStorage`], oldest first.
pub struct QueueIterator<'s, S: NorFlash, C: CacheImpl> {
    storage: &'s mut QueueStorage<S, C>,
    // Where `next` resumes scanning.
    next_address: NextAddress,
    // Pop bookkeeping; see [`PreviousItemStates`].
    previous_item_states: PreviousItemStates,
    // Page the iteration started on; wrapping back to it ends the iteration.
    oldest_page: usize,
}
607
608impl<S: NorFlash, C: CacheImpl> Debug for QueueIterator<'_, S, C> {
609 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
610 f.debug_struct("QueueIterator")
611 .field("current_address", &self.next_address)
612 .finish_non_exhaustive()
613 }
614}
615
/// Where a [`QueueIterator`] continues reading.
#[derive(Debug, Clone, Copy)]
enum NextAddress {
    /// Continue at this absolute flash address.
    Address(u32),
    /// Continue at the start of the page following this page index.
    PageAfter(usize),
}
621
impl<'s, S: NorFlash, C: CacheImpl> QueueIterator<'s, S, C> {
    async fn new(storage: &'s mut QueueStorage<S, C>) -> Result<Self, Error<S::Error>> {
        // Finding the start address may hit corruption; repair and retry like
        // the other entry points.
        let start_address = run_with_auto_repair!(
            function = storage.find_start_address().await,
            repair = storage.try_repair().await?
        )?;

        // Remember the starting page so a full wrap-around terminates the
        // iteration instead of looping forever.
        let oldest_page = match start_address {
            NextAddress::Address(address) => {
                calculate_page_index::<S>(storage.inner.flash_range.clone(), address)
            }
            NextAddress::PageAfter(index) => storage.inner.next_page(index),
        };

        Ok(Self {
            storage,
            next_address: start_address,
            previous_item_states: PreviousItemStates::AllPopped,
            oldest_page,
        })
    }

    /// Advance to the next item, reading its data into `data_buffer`.
    ///
    /// Returns `None` when the end of the queue is reached. The returned entry
    /// borrows both this iterator and the buffer.
    pub async fn next<'d, 'q>(
        &'q mut self,
        data_buffer: &'d mut [u8],
    ) -> Result<Option<QueueIteratorEntry<'s, 'd, 'q, S, C>>, Error<S::Error>> {
        // The previously returned entry was dropped without being popped, so
        // from now on not every seen item is popped.
        if self.previous_item_states == PreviousItemStates::AllButCurrentPopped {
            self.previous_item_states = PreviousItemStates::Unpopped;
        }

        let value = run_with_auto_repair!(
            function = self.next_inner(data_buffer).await,
            repair = self.storage.try_repair().await?
        );

        match value {
            Ok(Some((item, address))) => Ok(Some(QueueIteratorEntry {
                iter: self,
                // Re-attach the caller's buffer to the unborrowed item; failure
                // here is an internal invariant violation.
                item: item.reborrow(data_buffer).ok_or_else(|| Error::LogicBug {
                    #[cfg(feature = "_test")]
                    backtrace: std::backtrace::Backtrace::capture(),
                })?,
                address,
            })),
            Ok(None) => Ok(None),
            Err(e) => Err(e),
        }
    }

    async fn next_inner(
        &mut self,
        data_buffer: &mut [u8],
    ) -> Result<Option<(ItemUnborrowed, u32)>, Error<S::Error>> {
        if self.storage.inner.cache.is_dirty() {
            self.storage.inner.cache.invalidate_cache_state();
        }

        // Walk pages/items until a present item or the end of the queue.
        loop {
            let (current_page, current_address) = match self.next_address {
                NextAddress::PageAfter(previous_page) => {
                    let next_page = self.storage.inner.next_page(previous_page);
                    // An open page, or wrapping back to the starting page,
                    // means there is nothing further to yield.
                    if self
                        .storage
                        .inner
                        .get_page_state(next_page)
                        .await?
                        .is_open()
                        || next_page == self.oldest_page
                    {
                        self.storage.inner.cache.unmark_dirty();
                        return Ok(None);
                    }

                    // If everything seen so far was popped, the page we just
                    // finished holds no live data and can be reopened.
                    if self.previous_item_states == PreviousItemStates::AllPopped {
                        self.storage.inner.open_page(previous_page).await?;
                    }

                    let current_address = calculate_page_address::<S>(
                        self.storage.inner.flash_range.clone(),
                        next_page,
                    ) + S::WORD_SIZE as u32;

                    self.next_address = NextAddress::Address(current_address);

                    (next_page, current_address)
                }
                NextAddress::Address(address) => (
                    calculate_page_index::<S>(self.storage.inner.flash_range.clone(), address),
                    address,
                ),
            };

            let page_data_end_address = calculate_page_end_address::<S>(
                self.storage.inner.flash_range.clone(),
                current_page,
            ) - S::WORD_SIZE as u32;

            let mut it = ItemHeaderIter::new(current_address, page_data_end_address);
            // Skip erased items: traversal continues while the header's CRC is
            // cleared, stopping at the first potentially live item.
            if let (Some(found_item_header), found_item_address) = it
                .traverse(&mut self.storage.inner.flash, |header, _| {
                    header.crc.is_none()
                })
                .await?
            {
                let maybe_item = found_item_header
                    .read_item(
                        &mut self.storage.inner.flash,
                        data_buffer,
                        found_item_address,
                        page_data_end_address,
                    )
                    .await?;

                match maybe_item {
                    item::MaybeItem::Corrupted(header, _) => {
                        // Skip the corrupted item and continue right after it.
                        let next_address = header.next_item_address::<S>(found_item_address);
                        self.next_address = if next_address >= page_data_end_address {
                            NextAddress::PageAfter(current_page)
                        } else {
                            NextAddress::Address(next_address)
                        };
                    }
                    item::MaybeItem::Erased(_, _) => {
                        // The traversal callback filtered out erased headers,
                        // so reading one here is a bug in this crate.
                        return Err(Error::LogicBug {
                            #[cfg(feature = "_test")]
                            backtrace: std::backtrace::Backtrace::capture(),
                        });
                    }
                    item::MaybeItem::Present(item) => {
                        // Pre-compute where the next call resumes.
                        let next_address = item.header.next_item_address::<S>(found_item_address);
                        self.next_address = if next_address >= page_data_end_address {
                            NextAddress::PageAfter(current_page)
                        } else {
                            NextAddress::Address(next_address)
                        };

                        // Tentatively assume this entry will be popped; `next`
                        // downgrades the state if it was not.
                        if self.previous_item_states == PreviousItemStates::AllPopped {
                            self.previous_item_states = PreviousItemStates::AllButCurrentPopped;
                        }

                        self.storage.inner.cache.unmark_dirty();
                        return Ok(Some((item.unborrow(), found_item_address)));
                    }
                }
            } else {
                // No further items on this page; continue on the next page.
                self.next_address = NextAddress::PageAfter(current_page);
            }
        }
    }
}
788
/// A single item yielded by [`QueueIterator::next`]. Derefs to the item's data
/// bytes and can be [`pop`](Self::pop)ped to erase it from flash.
pub struct QueueIteratorEntry<'s, 'd, 'q, S: NorFlash, CI: CacheImpl> {
    iter: &'q mut QueueIterator<'s, S, CI>,
    // Absolute flash address of the item's header.
    address: u32,
    // The parsed item, borrowing the caller's data buffer.
    item: Item<'d>,
}
795
796impl<S: NorFlash, CI: CacheImpl> Deref for QueueIteratorEntry<'_, '_, '_, S, CI> {
797 type Target = [u8];
798
799 fn deref(&self) -> &Self::Target {
800 self.item.data()
801 }
802}
803
804impl<S: NorFlash, CI: CacheImpl> DerefMut for QueueIteratorEntry<'_, '_, '_, S, CI> {
805 fn deref_mut(&mut self) -> &mut Self::Target {
806 self.item.data_mut()
807 }
808}
809
impl<'d, S: NorFlash, CI: CacheImpl> QueueIteratorEntry<'_, 'd, '_, S, CI> {
    /// Consume the entry and return the subslice of the caller's buffer that
    /// holds this item's data.
    #[must_use]
    pub fn into_buf(self) -> &'d mut [u8] {
        self.item.data_owned()
    }

    /// Pop this entry: mark the stored item as erased in flash and return its
    /// data. Requires [`MultiwriteNorFlash`] since the erase marker is written
    /// over already-programmed words.
    pub async fn pop(self) -> Result<&'d mut [u8], Error<S::Error>>
    where
        S: MultiwriteNorFlash,
    {
        let (header, item_data_buffer) = self.item.header_and_data_owned();

        // Record that the current item is popped too, so the iterator may
        // reclaim fully-consumed pages.
        if self.iter.previous_item_states == PreviousItemStates::AllButCurrentPopped {
            self.iter.previous_item_states = PreviousItemStates::AllPopped;
        }

        header
            .erase_data(
                &mut self.iter.storage.inner.flash,
                self.iter.storage.inner.flash_range.clone(),
                &mut self.iter.storage.inner.cache,
                self.address,
            )
            .await?;

        self.iter.storage.inner.cache.unmark_dirty();
        Ok(item_data_buffer)
    }

    /// Absolute flash address of this entry's header (test helper).
    #[cfg(feature = "_test")]
    pub fn address(&self) -> u32 {
        self.address
    }
}
850
#[cfg(test)]
mod tests {
    use crate::{
        AlignedBuf,
        cache::NoCache,
        mock_flash::{self, FlashAverageStatsResult, FlashStatsResult, WriteCountCheck},
    };

    use super::*;
    use futures_test::test;

    // Big flash: 4 pages of 4 x 256-byte words. Tiny flash: 2 pages of 1 x 32-byte words.
    type MockFlashBig = mock_flash::MockFlashBase<4, 4, 256>;
    type MockFlashTiny = mock_flash::MockFlashBase<2, 1, 32>;

    // Fills a two-page flash, checks a push without overwrite fails when full,
    // then checks that overwriting evicts the oldest item.
    #[test]
    async fn peek_and_overwrite_old_data() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);
        const DATA_SIZE: usize = 22;

        assert_eq!(storage.space_left().await.unwrap(), 60);

        assert_eq!(storage.peek(&mut data_buffer).await.unwrap(), None);

        data_buffer[..DATA_SIZE].copy_from_slice(&[0xAA; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap();

        assert_eq!(storage.space_left().await.unwrap(), 30);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; DATA_SIZE]
        );
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xBB; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap();

        assert_eq!(storage.space_left().await.unwrap(), 0);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; DATA_SIZE]
        );

        // Full queue: push without overwrite must fail...
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xCC; DATA_SIZE]);
        storage
            .push(&data_buffer[..DATA_SIZE], false)
            .await
            .unwrap_err();
        // ...but with overwrite allowed the oldest item (0xAA) is evicted.
        data_buffer[..DATA_SIZE].copy_from_slice(&[0xDD; DATA_SIZE]);
        storage.push(&data_buffer[..DATA_SIZE], true).await.unwrap();

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; DATA_SIZE]
        );
        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; DATA_SIZE]
        );

        assert_eq!(storage.space_left().await.unwrap(), 30);

        assert_eq!(
            storage.peek(&mut data_buffer).await.unwrap().unwrap(),
            &[0xDD; DATA_SIZE]
        );
        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xDD; DATA_SIZE]
        );

        assert_eq!(storage.space_left().await.unwrap(), 60);

        assert_eq!(storage.peek(&mut data_buffer).await.unwrap(), None);
        assert_eq!(storage.pop(&mut data_buffer).await.unwrap(), None);
    }

    // Push/peek/pop round-trips with varying item sizes over many wrap-arounds.
    #[test]
    async fn push_pop() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        let mut data_buffer = AlignedBuf([0; 1024]);

        for i in 0..2000 {
            println!("{i}");
            let data = vec![i as u8; i % 512 + 1];

            storage.push(&data, true).await.unwrap();
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            assert_eq!(
                storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap(),
                None,
                "At {i}"
            );
        }
    }

    // The same round-trip exercise on the tiny two-page flash.
    #[test]
    async fn push_pop_tiny() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        for i in 0..2000 {
            println!("{i}");
            let data = vec![i as u8; i % 20 + 1];

            println!("PUSH");
            storage.push(&data, true).await.unwrap();
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            println!("POP");
            assert_eq!(
                storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                &data,
                "At {i}"
            );
            println!("PEEK");
            assert_eq!(
                storage.peek(&mut data_buffer).await.unwrap(),
                None,
                "At {i}"
            );
            println!("DONE");
        }
    }

    // Interleaved pushes, iterator peeks and iterator pops, while collecting
    // flash access statistics and pinning their averages.
    #[test]
    async fn push_peek_pop_many() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        let mut push_stats = FlashStatsResult::default();
        let mut pushes = 0;
        let mut peek_stats = FlashStatsResult::default();
        let mut peeks = 0;
        let mut pop_stats = FlashStatsResult::default();
        let mut pops = 0;

        for loop_index in 0..100 {
            println!("Loop index: {loop_index}");

            for i in 0..20 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 0..5 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = [i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .deref(),
                    &data[..],
                    "At {i}"
                );
                peeks += 1;
                peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 0..5 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .pop()
                        .await
                        .unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            for i in 20..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 5..25 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .deref(),
                    &data,
                    "At {i}"
                );
                peeks += 1;
                peek_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }

            let start_snapshot = storage.flash().stats_snapshot();
            let mut iterator = storage.iter().await.unwrap();
            pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            for i in 5..25 {
                let start_snapshot = iterator.storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    iterator
                        .next(&mut data_buffer)
                        .await
                        .unwrap()
                        .unwrap()
                        .pop()
                        .await
                        .unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(iterator.storage.flash().stats_snapshot());
            }
        }

        // The workload is deterministic, so these averages are exact; any
        // change to the storage algorithm will shift them.
        approx::assert_relative_eq!(
            push_stats.take_average(pushes),
            FlashAverageStatsResult {
                avg_erases: 0.0,
                avg_reads: 16.864,
                avg_writes: 3.1252,
                avg_bytes_read: 105.4112,
                avg_bytes_written: 60.5008
            }
        );
        approx::assert_relative_eq!(
            peek_stats.take_average(peeks),
            FlashAverageStatsResult {
                avg_erases: 0.0052,
                avg_reads: 3.8656,
                avg_writes: 0.0,
                avg_bytes_read: 70.4256,
                avg_bytes_written: 0.0
            }
        );
        approx::assert_relative_eq!(
            pop_stats.take_average(pops),
            FlashAverageStatsResult {
                avg_erases: 0.0572,
                avg_reads: 3.7772,
                avg_writes: 1.0,
                avg_bytes_read: 69.7184,
                avg_bytes_written: 8.0
            }
        );
    }

    // Bursty usage: batches of pushes followed by batches of pops, with flash
    // access statistics pinned.
    #[test]
    async fn push_lots_then_pop_lots() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        let mut push_stats = FlashStatsResult::default();
        let mut pushes = 0;
        let mut pop_stats = FlashStatsResult::default();
        let mut pops = 0;

        for loop_index in 0..100 {
            println!("Loop index: {loop_index}");

            for i in 0..20 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 0..5 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 20..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                storage.push(&data, false).await.unwrap();
                pushes += 1;
                push_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }

            for i in 5..25 {
                let start_snapshot = storage.flash().stats_snapshot();
                let data = vec![i as u8; 50];
                assert_eq!(
                    storage.pop(&mut data_buffer).await.unwrap().unwrap(),
                    &data,
                    "At {i}"
                );
                pops += 1;
                pop_stats += start_snapshot.compare_to(storage.flash().stats_snapshot());
            }
        }

        // Deterministic workload: exact averages, sensitive to algorithm changes.
        approx::assert_relative_eq!(
            push_stats.take_average(pushes),
            FlashAverageStatsResult {
                avg_erases: 0.0,
                avg_reads: 16.864,
                avg_writes: 3.1252,
                avg_bytes_read: 105.4112,
                avg_bytes_written: 60.5008
            }
        );
        approx::assert_relative_eq!(
            pop_stats.take_average(pops),
            FlashAverageStatsResult {
                avg_erases: 0.0624,
                avg_reads: 23.5768,
                avg_writes: 1.0,
                avg_bytes_read: 180.512,
                avg_bytes_written: 8.0
            }
        );
    }

    // FIFO order must hold across items even when earlier ones are popped
    // (leaving erased sections at the front of the page).
    #[test]
    async fn pop_with_empty_section() {
        let mut storage = QueueStorage::new(
            MockFlashTiny::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x00..0x40) },
            NoCache::new(),
        );
        let mut data_buffer = AlignedBuf([0; 1024]);

        data_buffer[..20].copy_from_slice(&[0xAA; 20]);
        storage.push(&data_buffer[0..20], false).await.unwrap();
        data_buffer[..20].copy_from_slice(&[0xBB; 20]);
        storage.push(&data_buffer[0..20], false).await.unwrap();

        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xAA; 20]
        );

        assert_eq!(
            storage.pop(&mut data_buffer).await.unwrap().unwrap(),
            &[0xBB; 20]
        );
    }

    // With pages 0/1 closed and page 2 partially open, youngest is 2 and
    // oldest is the first closed page (0).
    #[test]
    async fn search_pages() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        storage.inner.close_page(0).await.unwrap();
        storage.inner.close_page(1).await.unwrap();
        storage.inner.partial_close_page(2).await.unwrap();

        assert_eq!(storage.find_youngest_page().await.unwrap(), 2);
        assert_eq!(storage.find_oldest_page().await.unwrap(), 0);
    }

    // The largest item that fits a page succeeds; one byte more must fail
    // with ItemTooBig.
    #[test]
    async fn store_too_big_item() {
        let mut storage = QueueStorage::new(
            MockFlashBig::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x1000) },
            NoCache::new(),
        );

        storage
            .push(&AlignedBuf([0; 1024 - 4 * 2 - 8]), false)
            .await
            .unwrap();

        assert_eq!(
            storage
                .push(&AlignedBuf([0; 1024 - 4 * 2 - 8 + 1]), false,)
                .await,
            Err(Error::ItemTooBig)
        );
    }

    // A single-page flash must keep accepting overwriting pushes (the page
    // wraps onto itself).
    #[test]
    async fn push_on_single_page() {
        let mut storage = QueueStorage::new(
            mock_flash::MockFlashBase::<1, 4, 256>::new(WriteCountCheck::Twice, None, true),
            const { QueueConfig::new(0x000..0x400) },
            NoCache::new(),
        );

        for _ in 0..100 {
            match storage.push(&[0, 1, 2, 3, 4], true).await {
                Ok(_) => {}
                Err(e) => {
                    println!("{}", storage.print_items().await);
                    panic!("{e}");
                }
            }
        }
    }
}