1use std::collections::{BTreeMap, VecDeque};
2use std::fs::File;
3use std::io::{Seek, Write};
4use std::num::{NonZeroU32, NonZeroU64};
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use anyhow::{Context, Result};
9use arc_swap::ArcSwapAny;
10use dashmap::DashMap;
11use parking_lot::Mutex;
12use tokio::sync::{Notify, Semaphore};
13use tokio::time::Instant;
14use tycho_block_util::block::BlockStuff;
15use tycho_block_util::queue::QueueStateHeader;
16use tycho_block_util::state::RefMcStateHandle;
17use tycho_storage::fs::{Dir, MappedFile};
18use tycho_types::models::{BlockId, PrevBlockRef};
19use tycho_util::FastHashSet;
20use tycho_util::sync::CancellationFlag;
21
22pub use self::queue_state::reader::{QueueDiffReader, QueueStateReader};
23pub use self::queue_state::writer::QueueStateWriter;
24pub use self::shard_state::reader::{BriefBocHeader, ShardStateReader};
25pub use self::shard_state::writer::ShardStateWriter;
26use super::{
27 BlockHandle, BlockHandleStorage, BlockStorage, CellsDb, KeyBlocksDirection, ShardStateStorage,
28};
29
30mod queue_state {
31 pub mod reader;
32 pub mod writer;
33}
34mod shard_state {
35 pub mod reader;
36 pub mod writer;
37}
38
39#[cfg(test)]
40mod tests;
41
42const BASE_DIR: &str = "states";
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
45pub enum PersistentStateKind {
46 Shard,
47 Queue,
48}
49
50impl PersistentStateKind {
51 fn make_file_name(&self, block_id: &BlockId) -> PathBuf {
52 match self {
53 Self::Shard => ShardStateWriter::file_name(block_id),
54 Self::Queue => QueueStateWriter::file_name(block_id),
55 }
56 }
57
58 fn make_temp_file_name(&self, block_id: &BlockId) -> PathBuf {
59 match self {
60 Self::Shard => ShardStateWriter::temp_file_name(block_id),
61 Self::Queue => QueueStateWriter::temp_file_name(block_id),
62 }
63 }
64
65 fn from_extension(extension: &str) -> Option<Self> {
66 match extension {
67 ShardStateWriter::FILE_EXTENSION => Some(Self::Shard),
68 QueueStateWriter::FILE_EXTENSION => Some(Self::Queue),
69 _ => None,
70 }
71 }
72}
73
74#[derive(Debug, Eq, Hash, PartialEq)]
75struct CacheKey {
76 block_id: BlockId,
77 kind: PersistentStateKind,
78}
79
80#[derive(Clone)]
81pub struct PersistentStateStorage {
82 inner: Arc<Inner>,
83}
84
85impl PersistentStateStorage {
86 pub fn new(
87 cells_db: CellsDb,
88 files_dir: &Dir,
89 block_handle_storage: Arc<BlockHandleStorage>,
90 block_storage: Arc<BlockStorage>,
91 shard_state_storage: Arc<ShardStateStorage>,
92 ) -> Result<Self> {
93 const MAX_PARALLEL_CHUNK_READS: usize = 20;
94
95 let storage_dir = files_dir.create_subdir(BASE_DIR)?;
96
97 Ok(Self {
98 inner: Arc::new(Inner {
99 cells_db,
100 storage_dir,
101 block_handles: block_handle_storage,
102 blocks: block_storage,
103 shard_states: shard_state_storage,
104 descriptor_cache: Default::default(),
105 mc_seqno_to_block_ids: Default::default(),
106 chunks_semaphore: Arc::new(Semaphore::new(MAX_PARALLEL_CHUNK_READS)),
107 handles_queue: Default::default(),
108 oldest_ps_changed: Default::default(),
109 oldest_ps_handle: Default::default(),
110 }),
111 })
112 }
113
114 pub fn load_oldest_known_handle(&self) -> Option<BlockHandle> {
115 self.inner.oldest_ps_handle.load_full()
116 }
117
118 pub fn oldest_known_handle_changed(&self) -> tokio::sync::futures::Notified<'_> {
119 self.inner.oldest_ps_changed.notified()
120 }
121
122 #[tracing::instrument(skip_all)]
123 pub async fn preload(&self) -> Result<()> {
124 self.preload_handles_queue()?;
125 self.preload_states().await
126 }
127
128 fn preload_handles_queue(&self) -> Result<()> {
129 let this = self.inner.as_ref();
130
131 let block_handles = this.block_handles.as_ref();
132
133 let mut changed = false;
134 let mut prev_utime = 0;
135 for block_id in block_handles.key_blocks_iterator(KeyBlocksDirection::ForwardFrom(0)) {
136 let block_handle = block_handles
137 .load_handle(&block_id)
138 .context("key block handle not found")?;
139
140 let gen_utime = block_handle.gen_utime();
141 if BlockStuff::compute_is_persistent(gen_utime, prev_utime) {
142 prev_utime = gen_utime;
143
144 let mut queue = this.handles_queue.lock();
145 if queue.push(block_handle) {
146 this.oldest_ps_handle.store(queue.oldest_known().cloned());
147 changed = true;
148 }
149 }
150 }
151
152 if changed {
153 this.oldest_ps_changed.notify_waiters();
154 }
155 Ok(())
156 }
157
158 async fn preload_states(&self) -> Result<()> {
159 let process_states = |this: &Inner, dir: &PathBuf, mc_seqno: u32| -> Result<()> {
161 'outer: for entry in std::fs::read_dir(dir)?.flatten() {
162 let path = entry.path();
163 if path.is_dir() {
165 tracing::warn!(path = %path.display(), "unexpected directory");
166 continue;
167 }
168
169 'file: {
170 let Ok(block_id) = path
172 .file_stem()
174 .unwrap_or_default()
175 .to_str()
176 .unwrap_or_default()
177 .parse::<BlockId>()
178 else {
179 break 'file;
180 };
181
182 let extension = path
183 .extension()
184 .and_then(|ext| ext.to_str())
185 .unwrap_or_default();
186
187 let Some(cache_type) = PersistentStateKind::from_extension(extension) else {
188 break 'file;
189 };
190
191 this.cache_state(mc_seqno, &block_id, cache_type)?;
192 continue 'outer;
193 }
194 tracing::warn!(path = %path.display(), "unexpected file");
195 }
196 Ok(())
197 };
198
199 let this = self.inner.clone();
200 let span = tracing::Span::current();
201 tokio::task::spawn_blocking(move || {
202 let _span = span.enter();
203
204 'outer: for entry in this.storage_dir.entries()?.flatten() {
206 let path = entry.path();
207 if path.is_file() {
209 tracing::warn!(path = %path.display(), "unexpected file");
210 continue;
211 }
212
213 'dir: {
214 let Ok(name) = entry.file_name().into_string() else {
216 break 'dir;
217 };
218 let Ok(mc_seqno) = name.parse::<u32>() else {
219 break 'dir;
220 };
221
222 process_states(&this, &path, mc_seqno)?;
224 continue 'outer;
225 }
226 tracing::warn!(path = %path.display(), "unexpected directory");
227 }
228
229 Ok(())
230 })
231 .await?
232 }
233
234 pub fn state_chunk_size(&self) -> NonZeroU32 {
237 NonZeroU32::new(STATE_CHUNK_SIZE as _).unwrap()
238 }
239
240 pub fn state_exists(&self, block_id: &BlockId, kind: PersistentStateKind) -> bool {
241 self.inner.descriptor_cache.contains_key(&CacheKey {
242 block_id: *block_id,
243 kind,
244 })
245 }
246
247 pub fn get_state_info(
248 &self,
249 block_id: &BlockId,
250 kind: PersistentStateKind,
251 ) -> Option<PersistentStateInfo> {
252 self.inner
253 .descriptor_cache
254 .get(&CacheKey {
255 block_id: *block_id,
256 kind,
257 })
258 .and_then(|cached| {
259 let size = NonZeroU64::new(cached.file.length() as u64)?;
260 Some(PersistentStateInfo {
261 size,
262 chunk_size: self.state_chunk_size(),
263 })
264 })
265 }
266
267 pub async fn read_state_part(
268 &self,
269 block_id: &BlockId,
270 offset: u64,
271 state_kind: PersistentStateKind,
272 ) -> Option<Vec<u8>> {
273 let offset = usize::try_from(offset).ok()?;
275 let chunk_size = self.state_chunk_size().get() as usize;
276 if offset % chunk_size != 0 {
277 return None;
278 }
279
280 let permit = {
281 let semaphore = self.inner.chunks_semaphore.clone();
282 semaphore.acquire_owned().await.ok()?
283 };
284
285 let key = CacheKey {
286 block_id: *block_id,
287 kind: state_kind,
288 };
289 let cached = self.inner.descriptor_cache.get(&key)?.clone();
290 if offset > cached.file.length() {
291 return None;
292 }
293
294 tokio::task::spawn_blocking(move || {
298 let _permit = permit;
300
301 let end = std::cmp::min(offset.saturating_add(chunk_size), cached.file.length());
302 cached.file.as_slice()[offset..end].to_vec()
303 })
304 .await
305 .ok()
306 }
307
308 #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
309 pub async fn store_shard_state(
310 &self,
311 mc_seqno: u32,
312 handle: &BlockHandle,
313 tracker_handle: RefMcStateHandle,
314 ) -> Result<()> {
315 if self
316 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
317 .await?
318 {
319 return Ok(());
320 }
321
322 let cancelled = CancellationFlag::new();
323 scopeguard::defer! {
324 cancelled.cancel();
325 }
326
327 let handle = handle.clone();
328 let this = self.inner.clone();
329 let cancelled = cancelled.clone();
330 let span = tracing::Span::current();
331
332 tokio::task::spawn_blocking(move || {
333 let _span = span.enter();
334
335 let guard = scopeguard::guard((), |_| {
336 tracing::warn!("cancelled");
337 });
338
339 let _tracker_handle = tracker_handle;
341
342 let root_hash = this.shard_states.load_state_root_hash(handle.id())?;
343
344 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
345
346 let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
347 match cell_writer.write(&root_hash, Some(&cancelled)) {
348 Ok(()) => {
349 this.block_handles.set_has_persistent_shard_state(&handle);
350 tracing::info!("persistent shard state saved");
351 }
352 Err(e) => {
353 tracing::error!("failed to write persistent shard state: {e:?}");
355 }
356 }
357
358 this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
359
360 scopeguard::ScopeGuard::into_inner(guard);
361 Ok(())
362 })
363 .await?
364 }
365
366 #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
367 pub async fn store_shard_state_file(
368 &self,
369 mc_seqno: u32,
370 handle: &BlockHandle,
371 file: File,
372 ) -> Result<()> {
373 if self
374 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
375 .await?
376 {
377 return Ok(());
378 }
379
380 let cancelled = CancellationFlag::new();
381 scopeguard::defer! {
382 cancelled.cancel();
383 }
384
385 let handle = handle.clone();
386 let this = self.inner.clone();
387 let cancelled = cancelled.clone();
388 let span = tracing::Span::current();
389
390 tokio::task::spawn_blocking(move || {
391 let _span = span.enter();
392
393 let guard = scopeguard::guard((), |_| {
394 tracing::warn!("cancelled");
395 });
396
397 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
398
399 let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
400 cell_writer.write_file(file, Some(&cancelled))?;
401 this.block_handles.set_has_persistent_shard_state(&handle);
402 this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
403
404 scopeguard::ScopeGuard::into_inner(guard);
405 Ok(())
406 })
407 .await?
408 }
409
410 #[tracing::instrument(skip_all, fields(mc_seqno = mc_seqno, block_id = %block.id()))]
411 pub async fn store_queue_state(
412 &self,
413 mc_seqno: u32,
414 handle: &BlockHandle,
415 block: BlockStuff,
416 ) -> Result<()> {
417 if self
418 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
419 .await?
420 {
421 return Ok(());
422 }
423
424 let this = self.inner.clone();
425
426 let shard_ident = handle.id().shard;
427
428 let mut queue_diffs = Vec::new();
429 let mut messages = Vec::new();
430
431 let mut top_block_handle = handle.clone();
432 let mut top_block = block;
433
434 let mut tail_len = top_block.block().out_msg_queue_updates.tail_len as usize;
435
436 while tail_len > 0 {
437 let queue_diff = this.blocks.load_queue_diff(&top_block_handle).await?;
438 let top_block_info = top_block.load_info()?;
439
440 let block_extra = top_block.load_extra()?;
441 let out_messages = block_extra.load_out_msg_description()?;
442
443 messages.push(queue_diff.zip(&out_messages));
444 queue_diffs.push(queue_diff.diff().clone());
445
446 if tail_len == 1 {
447 break;
448 }
449
450 let prev_block_id = match top_block_info.load_prev_ref()? {
451 PrevBlockRef::Single(block_ref) => block_ref.as_block_id(shard_ident),
452 PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"),
453 };
454
455 let Some(prev_block_handle) = this.block_handles.load_handle(&prev_block_id) else {
456 anyhow::bail!("prev block handle not found for: {prev_block_id}");
457 };
458 let prev_block = this.blocks.load_block_data(&prev_block_handle).await?;
459
460 top_block_handle = prev_block_handle;
461 top_block = prev_block;
462 tail_len -= 1;
463 }
464
465 let state = QueueStateHeader {
466 shard_ident,
467 seqno: handle.id().seqno,
468 queue_diffs,
469 };
470
471 let cancelled = CancellationFlag::new();
472 scopeguard::defer! {
473 cancelled.cancel();
474 }
475
476 let handle = handle.clone();
477 let cancelled = cancelled.clone();
478 let span = tracing::Span::current();
479
480 tokio::task::spawn_blocking(move || {
481 let _span = span.enter();
482
483 let guard = scopeguard::guard((), |_| {
484 tracing::warn!("cancelled");
485 });
486
487 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
488 match QueueStateWriter::new(&states_dir, handle.id(), state, messages)
489 .write(Some(&cancelled))
490 {
491 Ok(()) => {
492 this.block_handles.set_has_persistent_queue_state(&handle);
493 tracing::info!("persistent queue state saved");
494 }
495 Err(e) => {
496 tracing::error!("failed to write persistent queue state: {e:?}");
497 }
498 }
499
500 this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
501
502 scopeguard::ScopeGuard::into_inner(guard);
503 Ok(())
504 })
505 .await?
506 }
507
508 #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
509 pub async fn store_queue_state_file(
510 &self,
511 mc_seqno: u32,
512 handle: &BlockHandle,
513 file: File,
514 ) -> Result<()> {
515 if self
516 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
517 .await?
518 {
519 return Ok(());
520 }
521
522 let cancelled = CancellationFlag::new();
523 scopeguard::defer! {
524 cancelled.cancel();
525 }
526
527 let handle = handle.clone();
528 let this = self.inner.clone();
529 let cancelled = cancelled.clone();
530 let span = tracing::Span::current();
531
532 tokio::task::spawn_blocking(move || {
533 let _span = span.enter();
534
535 let guard = scopeguard::guard((), |_| {
536 tracing::warn!("cancelled");
537 });
538
539 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
540
541 QueueStateWriter::write_file(&states_dir, handle.id(), file, Some(&cancelled))?;
542 this.block_handles.set_has_persistent_queue_state(&handle);
543 this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
544
545 scopeguard::ScopeGuard::into_inner(guard);
546 Ok(())
547 })
548 .await?
549 }
550
551 pub async fn rotate_persistent_states(&self, top_handle: &BlockHandle) -> Result<()> {
552 anyhow::ensure!(
553 top_handle.is_masterchain(),
554 "top persistent state handle must be in the masterchain"
555 );
556
557 {
558 tracing::info!(
559 mc_block_id = %top_handle.id(),
560 "adding new persistent state to the queue"
561 );
562
563 let mut queue = self.inner.handles_queue.lock();
564 if queue.push(top_handle.clone()) {
565 self.inner
566 .oldest_ps_handle
567 .store(queue.oldest_known().cloned());
568 self.inner.oldest_ps_changed.notify_waiters();
569 }
570 }
571
572 tracing::info!("started clearing old persistent state directories");
573 let start = Instant::now();
574 scopeguard::defer! {
575 tracing::info!(
576 elapsed = %humantime::format_duration(start.elapsed()),
577 "clearing old persistent state directories completed"
578 );
579 }
580
581 let this = self.inner.clone();
582 let mut top_handle = top_handle.clone();
583 if top_handle.id().seqno == 0 {
584 return Ok(());
586 }
587
588 let span = tracing::Span::current();
589 tokio::task::spawn_blocking(move || {
590 let _span = span.enter();
591
592 let block_handles = &this.block_handles;
593
594 let now_utime = top_handle.gen_utime();
595
596 let mut has_suitable = false;
598 loop {
599 match block_handles.find_prev_persistent_key_block(top_handle.id().seqno) {
600 Some(handle) if !has_suitable => {
602 has_suitable |= BlockStuff::can_use_for_boot(handle.gen_utime(), now_utime);
603 top_handle = handle;
604 }
605 Some(handle) => {
607 top_handle = handle;
608 break;
609 }
610 None => return Ok(()),
612 }
613 }
614
615 let mut index = this.mc_seqno_to_block_ids.lock();
617 index.retain(|&mc_seqno, block_ids| {
618 if mc_seqno >= top_handle.id().seqno || mc_seqno == 0 {
619 return true;
620 }
621
622 for block_id in block_ids.drain() {
623 this.clear_cache(&block_id);
625 }
626 false
627 });
628
629 this.clear_outdated_state_entries(top_handle.id())
631 })
632 .await?
633 }
634
635 async fn try_reuse_persistent_state(
636 &self,
637 mc_seqno: u32,
638 handle: &BlockHandle,
639 kind: PersistentStateKind,
640 ) -> Result<bool> {
641 match kind {
643 PersistentStateKind::Shard if !handle.has_persistent_shard_state() => return Ok(false),
644 PersistentStateKind::Queue if !handle.has_persistent_queue_state() => return Ok(false),
645 _ => {}
646 }
647
648 let block_id = *handle.id();
649
650 let Some(cached) = self
651 .inner
652 .descriptor_cache
653 .get(&CacheKey { block_id, kind })
654 .map(|r| r.clone())
655 else {
656 return Ok(false);
658 };
659
660 if cached.mc_seqno >= mc_seqno {
661 return Ok(true);
663 }
664
665 let this = self.inner.clone();
666
667 let span = tracing::Span::current();
668 tokio::task::spawn_blocking(move || {
669 let _span = span.enter();
670
671 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
672
673 let temp_file = states_dir.file(kind.make_temp_file_name(&block_id));
674 std::fs::write(temp_file.path(), cached.file.as_slice())?;
675 temp_file.rename(kind.make_file_name(&block_id))?;
676
677 drop(cached);
678
679 this.cache_state(mc_seqno, &block_id, kind)?;
680 Ok(true)
681 })
682 .await?
683 }
684}
685
686struct Inner {
687 cells_db: CellsDb,
688 storage_dir: Dir,
689 block_handles: Arc<BlockHandleStorage>,
690 blocks: Arc<BlockStorage>,
691 shard_states: Arc<ShardStateStorage>,
692 descriptor_cache: DashMap<CacheKey, Arc<CachedState>>,
693 mc_seqno_to_block_ids: Mutex<BTreeMap<u32, FastHashSet<BlockId>>>,
694 chunks_semaphore: Arc<Semaphore>,
695 handles_queue: Mutex<HandlesQueue>,
696 oldest_ps_changed: Notify,
697 oldest_ps_handle: ArcSwapAny<Option<BlockHandle>>,
698}
699
700impl Inner {
701 fn prepare_persistent_states_dir(&self, mc_seqno: u32) -> Result<Dir> {
702 let states_dir = self.mc_states_dir(mc_seqno);
703 if !states_dir.path().is_dir() {
704 tracing::info!(mc_seqno, "creating persistent state directory");
705 states_dir.create_if_not_exists()?;
706 }
707 Ok(states_dir)
708 }
709
710 fn mc_states_dir(&self, mc_seqno: u32) -> Dir {
711 Dir::new_readonly(self.storage_dir.path().join(mc_seqno.to_string()))
712 }
713
714 fn clear_outdated_state_entries(&self, recent_block_id: &BlockId) -> Result<()> {
715 let mut directories_to_remove: Vec<PathBuf> = Vec::new();
716 let mut files_to_remove: Vec<PathBuf> = Vec::new();
717
718 for entry in self.storage_dir.entries()?.flatten() {
719 let path = entry.path();
720
721 if path.is_file() {
722 files_to_remove.push(path);
723 continue;
724 }
725
726 let Ok(name) = entry.file_name().into_string() else {
727 directories_to_remove.push(path);
728 continue;
729 };
730
731 let is_recent = matches!(
732 name.parse::<u32>(),
733 Ok(seqno) if seqno >= recent_block_id.seqno || seqno == 0
734 );
735 if !is_recent {
736 directories_to_remove.push(path);
737 }
738 }
739
740 for dir in directories_to_remove {
741 tracing::info!(dir = %dir.display(), "removing an old persistent state directory");
742 if let Err(e) = std::fs::remove_dir_all(&dir) {
743 tracing::error!(dir = %dir.display(), "failed to remove an old persistent state: {e:?}");
744 }
745 }
746
747 for file in files_to_remove {
748 tracing::info!(file = %file.display(), "removing file");
749 if let Err(e) = std::fs::remove_file(&file) {
750 tracing::error!(file = %file.display(), "failed to remove file: {e:?}");
751 }
752 }
753
754 Ok(())
755 }
756
757 fn cache_state(
758 &self,
759 mc_seqno: u32,
760 block_id: &BlockId,
761 kind: PersistentStateKind,
762 ) -> Result<()> {
763 use std::collections::btree_map;
764
765 use dashmap::mapref::entry::Entry;
766
767 let key = CacheKey {
768 block_id: *block_id,
769 kind,
770 };
771
772 let load_mapped = || {
773 let mut file = self
774 .mc_states_dir(mc_seqno)
775 .file(kind.make_file_name(block_id))
776 .read(true)
777 .open()?;
778
779 let mut temp_file = tempfile::tempfile_in(self.storage_dir.path())
783 .context("failed to create a temp file")?;
784
785 std::io::copy(&mut file, &mut temp_file).context("failed to copy a temp file")?;
789 temp_file.flush()?;
790 temp_file.seek(std::io::SeekFrom::Start(0))?;
791
792 MappedFile::from_existing_file(temp_file).context("failed to map a temp file")
793 };
794
795 let file =
796 load_mapped().with_context(|| format!("failed to cache {kind:?} for {block_id}"))?;
797
798 let new_state = Arc::new(CachedState { mc_seqno, file });
799
800 let prev_mc_seqno = match self.descriptor_cache.entry(key) {
801 Entry::Vacant(entry) => {
802 entry.insert(new_state);
803 None
804 }
805 Entry::Occupied(mut entry) => {
806 let prev_mc_seqno = entry.get().mc_seqno;
807 if mc_seqno <= prev_mc_seqno {
808 return Ok(());
810 }
811
812 entry.insert(new_state);
813 Some(prev_mc_seqno)
814 }
815 };
816
817 let mut index = self.mc_seqno_to_block_ids.lock();
818
819 if let Some(prev_mc_seqno) = prev_mc_seqno
821 && let btree_map::Entry::Occupied(mut entry) = index.entry(prev_mc_seqno)
822 {
823 entry.get_mut().remove(block_id);
824 if entry.get().is_empty() {
825 entry.remove();
826 }
827 }
828
829 index.entry(mc_seqno).or_default().insert(*block_id);
830
831 Ok(())
832 }
833
834 fn clear_cache(&self, block_id: &BlockId) {
835 self.descriptor_cache.remove(&CacheKey {
836 block_id: *block_id,
837 kind: PersistentStateKind::Shard,
838 });
839 self.descriptor_cache.remove(&CacheKey {
840 block_id: *block_id,
841 kind: PersistentStateKind::Queue,
842 });
843 }
844}
845
846#[derive(Debug, Clone, Copy)]
847pub struct PersistentStateInfo {
848 pub size: NonZeroU64,
849 pub chunk_size: NonZeroU32,
850}
851
852struct CachedState {
853 mc_seqno: u32,
854 file: MappedFile,
855}
856
857#[derive(Default)]
858struct HandlesQueue {
859 handles: VecDeque<BlockHandle>,
860}
861
862impl HandlesQueue {
863 fn oldest_known(&self) -> Option<&BlockHandle> {
864 self.handles.back()
865 }
866
867 fn push(&mut self, new_handle: BlockHandle) -> bool {
868 if let Some(newest) = self.handles.front()
870 && newest.id().seqno >= new_handle.id().seqno
871 {
872 return false;
873 }
874
875 let now_utime = new_handle.gen_utime();
877 let mut has_suitable = false;
878 self.handles.retain(|old_handle| {
879 if !has_suitable {
880 has_suitable |= BlockStuff::can_use_for_boot(old_handle.gen_utime(), now_utime);
881 true
882 } else {
883 false
884 }
885 });
886
887 self.handles.push_front(new_handle);
889 true
890 }
891}
892
893const STATE_CHUNK_SIZE: u64 = 1024 * 1024;