1use std::collections::{BTreeMap, VecDeque};
2use std::fs::File;
3use std::io::{Seek, Write};
4use std::num::{NonZeroU32, NonZeroU64};
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, Weak};
8
9use anyhow::{Context, Result};
10use arc_swap::{ArcSwap, ArcSwapAny};
11use dashmap::DashMap;
12use parking_lot::Mutex;
13use tokio::sync::{Notify, Semaphore, mpsc};
14use tokio::time::Instant;
15use tycho_block_util::block::BlockStuff;
16use tycho_block_util::queue::QueueStateHeader;
17use tycho_block_util::state::RefMcStateHandle;
18use tycho_storage::fs::Dir;
19use tycho_types::models::{BlockId, PrevBlockRef};
20use tycho_util::fs::MappedFile;
21use tycho_util::sync::CancellationFlag;
22use tycho_util::{FastHashMap, FastHashSet};
23
24pub use self::queue_state::reader::{QueueDiffReader, QueueStateReader};
25pub use self::queue_state::writer::QueueStateWriter;
26pub use self::shard_state::reader::{BriefBocHeader, ShardStateReader};
27pub use self::shard_state::writer::ShardStateWriter;
28use super::{
29 BlockHandle, BlockHandleStorage, BlockStorage, CellsDb, KeyBlocksDirection, NodeStateStorage,
30 ShardStateStorage,
31};
32
// Reader/writer implementations for persistent message-queue states.
mod queue_state {
    pub mod reader;
    pub mod writer;
}
// Reader/writer implementations for persistent shard states.
mod shard_state {
    pub mod reader;
    pub mod writer;
}

#[cfg(test)]
mod tests;

/// Name of the subdirectory (inside the provided files dir) that holds all
/// persistent state files, laid out as `states/<mc_seqno>/<block_id>.<ext>`.
const BASE_DIR: &str = "states";
46
/// Kind of a persistent state file stored on disk for a block.
///
/// The kind determines the file extension and which reader/writer pair
/// ([`ShardStateWriter`] or [`QueueStateWriter`]) handles the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PersistentStateKind {
    /// Shard state snapshot (written by [`ShardStateWriter`]).
    Shard,
    /// Outbound message queue state snapshot (written by [`QueueStateWriter`]).
    Queue,
}
52
53impl PersistentStateKind {
54 pub fn make_file_name(&self, block_id: &BlockId) -> PathBuf {
55 match self {
56 Self::Shard => ShardStateWriter::file_name(block_id),
57 Self::Queue => QueueStateWriter::file_name(block_id),
58 }
59 }
60
61 pub fn make_temp_file_name(&self, block_id: &BlockId) -> PathBuf {
62 match self {
63 Self::Shard => ShardStateWriter::temp_file_name(block_id),
64 Self::Queue => QueueStateWriter::temp_file_name(block_id),
65 }
66 }
67
68 pub fn from_extension(extension: &str) -> Option<Self> {
69 match extension {
70 ShardStateWriter::FILE_EXTENSION => Some(Self::Shard),
71 QueueStateWriter::FILE_EXTENSION => Some(Self::Queue),
72 _ => None,
73 }
74 }
75}
76
/// Key of the in-memory descriptor cache: one entry per (block, state kind).
#[derive(Debug, Eq, Hash, PartialEq)]
struct CacheKey {
    /// Id of the block the persistent state belongs to.
    block_id: BlockId,
    /// Which of the two state files this entry describes.
    kind: PersistentStateKind,
}
82
/// Storage of persistent shard and queue state files.
///
/// A cheaply cloneable handle; all clones share the same [`Inner`].
#[derive(Clone)]
pub struct PersistentStateStorage {
    inner: Arc<Inner>,
}
87
impl PersistentStateStorage {
    /// Creates a new storage rooted at `<files_dir>/states`.
    ///
    /// The directory is created if missing. Concurrent `read_state_part`
    /// calls are limited by an internal semaphore.
    pub fn new(
        cells_db: CellsDb,
        files_dir: &Dir,
        node_state: Arc<NodeStateStorage>,
        block_handle_storage: Arc<BlockHandleStorage>,
        block_storage: Arc<BlockStorage>,
        shard_state_storage: Arc<ShardStateStorage>,
    ) -> Result<Self> {
        // Upper bound on simultaneously served chunk reads (each read maps
        // a cached file slice on a blocking thread).
        const MAX_PARALLEL_CHUNK_READS: usize = 20;

        let storage_dir = files_dir.create_subdir(BASE_DIR)?;

        Ok(Self {
            inner: Arc::new(Inner {
                cells_db,
                storage_dir,
                node_state,
                block_handles: block_handle_storage,
                blocks: block_storage,
                shard_states: shard_state_storage,
                descriptor_cache: Default::default(),
                mc_seqno_to_block_ids: Default::default(),
                chunks_semaphore: Arc::new(Semaphore::new(MAX_PARALLEL_CHUNK_READS)),
                handles_queue: Default::default(),
                oldest_ps_changed: Default::default(),
                oldest_ps_handle: Default::default(),
                subscriptions: Default::default(),
                subscriptions_mutex: Default::default(),
            }),
        })
    }

    /// Returns the oldest known persistent-state key block handle, if any.
    pub fn load_oldest_known_handle(&self) -> Option<BlockHandle> {
        self.inner.oldest_ps_handle.load_full()
    }

    /// Returns a future that resolves the next time the oldest known handle
    /// changes (i.e. on the next `notify_waiters` call).
    pub fn oldest_known_handle_changed(&self) -> tokio::sync::futures::Notified<'_> {
        self.inner.oldest_ps_changed.notified()
    }

    /// Rebuilds in-memory structures from persisted data: first the handles
    /// queue from key blocks, then the descriptor cache from files on disk.
    #[tracing::instrument(skip_all)]
    pub async fn preload(&self) -> Result<()> {
        self.preload_handles_queue()?;
        self.preload_states().await
    }

    /// Scans all key blocks from seqno 0 and pushes every persistent one
    /// (per `BlockStuff::compute_is_persistent`) into the handles queue,
    /// notifying waiters if the oldest known handle changed.
    fn preload_handles_queue(&self) -> Result<()> {
        let this = self.inner.as_ref();

        let block_handles = this.block_handles.as_ref();

        let mut changed = false;
        let mut prev_utime = 0;
        for block_id in block_handles.key_blocks_iterator(KeyBlocksDirection::ForwardFrom(0)) {
            let block_handle = block_handles
                .load_handle(&block_id)
                .context("key block handle not found")?;

            let gen_utime = block_handle.gen_utime();
            if BlockStuff::compute_is_persistent(gen_utime, prev_utime) {
                prev_utime = gen_utime;

                // Lock is taken per-handle to keep critical sections short.
                let mut queue = this.handles_queue.lock();
                if queue.push(block_handle) {
                    this.oldest_ps_handle.store(queue.oldest_known().cloned());
                    changed = true;
                }
            }
        }

        if changed {
            this.oldest_ps_changed.notify_waiters();
        }
        Ok(())
    }

    /// Walks the on-disk layout `states/<mc_seqno>/<block_id>.<ext>` and
    /// re-caches every recognized state file; unexpected entries are logged
    /// and skipped. Runs on a blocking thread.
    async fn preload_states(&self) -> Result<()> {
        // Processes one `states/<mc_seqno>` directory: each file name must
        // parse as `<BlockId>.<known extension>` to be cached.
        let process_states = |this: &Inner, dir: &PathBuf, mc_seqno: u32| -> Result<()> {
            'outer: for entry in std::fs::read_dir(dir)?.flatten() {
                let path = entry.path();
                if path.is_dir() {
                    tracing::warn!(path = %path.display(), "unexpected directory");
                    continue;
                }

                // Labeled block: `break 'file` falls through to the
                // "unexpected file" warning below.
                'file: {
                    let Ok(block_id) = path
                        .file_stem()
                        .unwrap_or_default()
                        .to_str()
                        .unwrap_or_default()
                        .parse::<BlockId>()
                    else {
                        break 'file;
                    };

                    let extension = path
                        .extension()
                        .and_then(|ext| ext.to_str())
                        .unwrap_or_default();

                    let Some(cache_type) = PersistentStateKind::from_extension(extension) else {
                        break 'file;
                    };

                    this.cache_state(mc_seqno, &block_id, cache_type)?;
                    continue 'outer;
                }
                tracing::warn!(path = %path.display(), "unexpected file");
            }
            Ok(())
        };

        let this = self.inner.clone();
        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            'outer: for entry in this.storage_dir.entries()?.flatten() {
                let path = entry.path();
                if path.is_file() {
                    tracing::warn!(path = %path.display(), "unexpected file");
                    continue;
                }

                // Directory names must be a decimal `u32` mc seqno;
                // `break 'dir` falls through to the warning below.
                'dir: {
                    let Ok(name) = entry.file_name().into_string() else {
                        break 'dir;
                    };
                    let Ok(mc_seqno) = name.parse::<u32>() else {
                        break 'dir;
                    };

                    process_states(&this, &path, mc_seqno)?;
                    continue 'outer;
                }
                tracing::warn!(path = %path.display(), "unexpected directory");
            }

            Ok(())
        })
        .await?
    }

    /// Fixed chunk size used for serving state files in parts.
    ///
    /// `STATE_CHUNK_SIZE` is 1 MiB, so the narrowing cast and `unwrap`
    /// cannot fail.
    pub fn state_chunk_size(&self) -> NonZeroU32 {
        NonZeroU32::new(STATE_CHUNK_SIZE as _).unwrap()
    }

    /// Registers a new subscriber for stored persistent states.
    ///
    /// Returns a snapshot of all currently cached states plus a receiver
    /// that yields every state stored afterwards. The subscription is
    /// removed when the receiver is dropped.
    pub fn subscribe(&self) -> (Vec<PersistentState>, PersistentStateReceiver) {
        let id = RECEIVER_ID.fetch_add(1, Ordering::Relaxed);
        let (sender, receiver) = mpsc::channel(1);

        {
            // Copy-on-write update of the subscriptions map: the mutex
            // serializes writers while readers stay lock-free via ArcSwap.
            let _guard = self.inner.subscriptions_mutex.lock();
            let mut subscriptions = self.inner.subscriptions.load_full();
            {
                let subscriptions = Arc::make_mut(&mut subscriptions);
                let prev = subscriptions.insert(id, sender);
                assert!(
                    prev.is_none(),
                    "persistent state subscription must be unique"
                );
            }
            self.inner.subscriptions.store(subscriptions);
        }

        let receiver = PersistentStateReceiver {
            id,
            // Weak ref so the receiver does not keep the storage alive.
            inner: Arc::downgrade(&self.inner),
            receiver,
        };

        let initial_states = self
            .inner
            .descriptor_cache
            .iter()
            .map(|item| PersistentState {
                block_id: item.key().block_id,
                kind: item.key().kind,
                cached: item.value().clone(),
            })
            .collect();

        (initial_states, receiver)
    }

    /// Returns whether a cached state of the given kind exists for the block.
    pub fn state_exists(&self, block_id: &BlockId, kind: PersistentStateKind) -> bool {
        self.inner.descriptor_cache.contains_key(&CacheKey {
            block_id: *block_id,
            kind,
        })
    }

    /// Returns the size and chunk size of a cached state.
    ///
    /// `None` if the state is not cached or its file is empty
    /// (`NonZeroU64::new` rejects a zero length).
    pub fn get_state_info(
        &self,
        block_id: &BlockId,
        kind: PersistentStateKind,
    ) -> Option<PersistentStateInfo> {
        self.inner
            .descriptor_cache
            .get(&CacheKey {
                block_id: *block_id,
                kind,
            })
            .and_then(|cached| {
                let size = NonZeroU64::new(cached.file.length() as u64)?;
                Some(PersistentStateInfo {
                    size,
                    chunk_size: self.state_chunk_size(),
                })
            })
    }

    /// Reads one chunk of a cached state file starting at `offset`.
    ///
    /// Returns `None` when the offset is not chunk-aligned, the state is not
    /// cached, or the offset lies past the end of the file. The returned
    /// chunk may be shorter than the chunk size at the end of the file.
    pub async fn read_state_part(
        &self,
        block_id: &BlockId,
        offset: u64,
        state_kind: PersistentStateKind,
    ) -> Option<Vec<u8>> {
        let offset = usize::try_from(offset).ok()?;
        let chunk_size = self.state_chunk_size().get() as usize;
        // Only chunk-aligned offsets are served.
        if offset % chunk_size != 0 {
            return None;
        }

        // Bound the number of concurrent blocking reads.
        let permit = {
            let semaphore = self.inner.chunks_semaphore.clone();
            semaphore.acquire_owned().await.ok()?
        };

        let key = CacheKey {
            block_id: *block_id,
            kind: state_kind,
        };
        let cached = self.inner.descriptor_cache.get(&key)?.clone();
        if offset > cached.file.length() {
            return None;
        }

        // Copying from the mapped file may fault pages in, so do it off
        // the async runtime.
        tokio::task::spawn_blocking(move || {
            // Hold the permit for the duration of the read.
            let _permit = permit;

            let end = std::cmp::min(offset.saturating_add(chunk_size), cached.file.length());
            cached.file.as_slice()[offset..end].to_vec()
        })
        .await
        .ok()
    }

    /// Writes (or reuses) the persistent shard state for `handle` under the
    /// `mc_seqno` directory, caches it and notifies subscribers.
    ///
    /// `tracker_handle` keeps the referenced mc state alive for the
    /// duration of the write.
    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
    pub async fn store_shard_state(
        &self,
        mc_seqno: u32,
        handle: &BlockHandle,
        tracker_handle: RefMcStateHandle,
    ) -> Result<()> {
        // Fast path: copy an already-written state file if present.
        if self
            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
            .await?
        {
            return Ok(());
        }

        // Cancel the blocking writer if this future is dropped.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        let handle = handle.clone();
        let this = self.inner.clone();
        let cancelled = cancelled.clone();
        let span = tracing::Span::current();

        let state = tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            // Logs "cancelled" unless disarmed at the end of the closure.
            let guard = scopeguard::guard((), |_| {
                tracing::warn!("cancelled");
            });

            // Keep the mc state referenced while writing.
            let _tracker_handle = tracker_handle;

            let root_hash = this.shard_states.load_state_root_hash(handle.id())?;

            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;

            let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
            match cell_writer.write(&root_hash, Some(&cancelled)) {
                Ok(_) => {
                    this.block_handles.set_has_persistent_shard_state(&handle);
                    tracing::info!("persistent shard state saved");
                }
                Err(e) => {
                    // NOTE(review): the error is only logged; `cache_state`
                    // below will fail if no usable file was produced.
                    tracing::error!("failed to write persistent shard state: {e:?}");
                }
            }

            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;

            // Disarm the "cancelled" guard on success.
            scopeguard::ScopeGuard::into_inner(guard);
            Ok::<_, anyhow::Error>(state)
        })
        .await??;

        self.notify_with_persistent_state(&state).await;
        Ok(())
    }

    /// Stores an already-serialized shard state file for `handle`, caches
    /// it and notifies subscribers.
    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
    pub async fn store_shard_state_file(
        &self,
        mc_seqno: u32,
        handle: &BlockHandle,
        file: File,
    ) -> Result<()> {
        // Fast path: copy an already-written state file if present.
        if self
            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
            .await?
        {
            return Ok(());
        }

        // Cancel the blocking writer if this future is dropped.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        let handle = handle.clone();
        let this = self.inner.clone();
        let cancelled = cancelled.clone();
        let span = tracing::Span::current();

        let state = tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            // Logs "cancelled" unless disarmed at the end of the closure.
            let guard = scopeguard::guard((), |_| {
                tracing::warn!("cancelled");
            });

            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;

            let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
            cell_writer.write_file(file, Some(&cancelled))?;
            this.block_handles.set_has_persistent_shard_state(&handle);
            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;

            // Disarm the "cancelled" guard on success.
            scopeguard::ScopeGuard::into_inner(guard);
            Ok::<_, anyhow::Error>(state)
        })
        .await??;

        self.notify_with_persistent_state(&state).await;
        Ok(())
    }

    /// Builds and stores the persistent queue state for `handle`.
    ///
    /// Walks `tail_len` blocks backwards from `block` (following single
    /// prev refs; merges are not supported), collecting queue diffs and the
    /// matching out-message descriptions, then writes them on a blocking
    /// thread, caches the result and notifies subscribers.
    #[tracing::instrument(skip_all, fields(mc_seqno = mc_seqno, block_id = %block.id()))]
    pub async fn store_queue_state(
        &self,
        mc_seqno: u32,
        handle: &BlockHandle,
        block: BlockStuff,
    ) -> Result<()> {
        // Fast path: copy an already-written state file if present.
        if self
            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
            .await?
        {
            return Ok(());
        }

        let this = self.inner.clone();

        let shard_ident = handle.id().shard;

        let mut queue_diffs = Vec::new();
        let mut messages = Vec::new();

        let mut top_block_handle = handle.clone();
        let mut top_block = block;

        // Number of blocks whose diffs make up the queue state.
        let mut tail_len = top_block.block().out_msg_queue_updates.tail_len as usize;

        while tail_len > 0 {
            let queue_diff = this.blocks.load_queue_diff(&top_block_handle).await?;
            let top_block_info = top_block.load_info()?;

            let block_extra = top_block.load_extra()?;
            let out_messages = block_extra.load_out_msg_description()?;

            messages.push(queue_diff.zip(&out_messages));
            queue_diffs.push(queue_diff.diff().clone());

            if tail_len == 1 {
                break;
            }

            // Move to the previous block in the same shard.
            let prev_block_id = match top_block_info.load_prev_ref()? {
                PrevBlockRef::Single(block_ref) => block_ref.as_block_id(shard_ident),
                PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"),
            };

            let Some(prev_block_handle) = this.block_handles.load_handle(&prev_block_id) else {
                anyhow::bail!("prev block handle not found for: {prev_block_id}");
            };
            let prev_block = this.blocks.load_block_data(&prev_block_handle).await?;

            top_block_handle = prev_block_handle;
            top_block = prev_block;
            tail_len -= 1;
        }

        let state = QueueStateHeader {
            shard_ident,
            seqno: handle.id().seqno,
            queue_diffs,
        };

        // Cancel the blocking writer if this future is dropped.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        let handle = handle.clone();
        let cancelled = cancelled.clone();
        let span = tracing::Span::current();

        let state = tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            // Logs "cancelled" unless disarmed at the end of the closure.
            let guard = scopeguard::guard((), |_| {
                tracing::warn!("cancelled");
            });

            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
            match QueueStateWriter::new(&states_dir, handle.id(), state, messages)
                .write(Some(&cancelled))
            {
                Ok(()) => {
                    this.block_handles.set_has_persistent_queue_state(&handle);
                    tracing::info!("persistent queue state saved");
                }
                Err(e) => {
                    // NOTE(review): the error is only logged; `cache_state`
                    // below will fail if no usable file was produced.
                    tracing::error!("failed to write persistent queue state: {e:?}");
                }
            }

            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;

            // Disarm the "cancelled" guard on success.
            scopeguard::ScopeGuard::into_inner(guard);
            Ok::<_, anyhow::Error>(state)
        })
        .await??;

        self.notify_with_persistent_state(&state).await;
        Ok(())
    }

    /// Stores an already-serialized queue state file for `handle`, caches
    /// it and notifies subscribers.
    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
    pub async fn store_queue_state_file(
        &self,
        mc_seqno: u32,
        handle: &BlockHandle,
        file: File,
    ) -> Result<()> {
        // Fast path: copy an already-written state file if present.
        if self
            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
            .await?
        {
            return Ok(());
        }

        // Cancel the blocking writer if this future is dropped.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        let handle = handle.clone();
        let this = self.inner.clone();
        let cancelled = cancelled.clone();
        let span = tracing::Span::current();

        let state = tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            // Logs "cancelled" unless disarmed at the end of the closure.
            let guard = scopeguard::guard((), |_| {
                tracing::warn!("cancelled");
            });

            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;

            QueueStateWriter::write_file(&states_dir, handle.id(), file, Some(&cancelled))?;
            this.block_handles.set_has_persistent_queue_state(&handle);
            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;

            // Disarm the "cancelled" guard on success.
            scopeguard::ScopeGuard::into_inner(guard);
            Ok::<_, anyhow::Error>(state)
        })
        .await??;

        self.notify_with_persistent_state(&state).await;
        Ok(())
    }

    /// Registers `top_handle` as the newest persistent state and removes
    /// caches and on-disk directories that are no longer needed for boot.
    ///
    /// Directories with seqno at or above the last still-bootable key
    /// block, or at or below the zerostate seqno, are kept.
    pub async fn rotate_persistent_states(&self, top_handle: &BlockHandle) -> Result<()> {
        anyhow::ensure!(
            top_handle.is_masterchain(),
            "top persistent state handle must be in the masterchain"
        );

        {
            tracing::info!(
                mc_block_id = %top_handle.id(),
                "adding new persistent state to the queue"
            );

            let mut queue = self.inner.handles_queue.lock();
            if queue.push(top_handle.clone()) {
                self.inner
                    .oldest_ps_handle
                    .store(queue.oldest_known().cloned());
                self.inner.oldest_ps_changed.notify_waiters();
            }
        }

        tracing::info!("started clearing old persistent state directories");
        let start = Instant::now();
        scopeguard::defer! {
            tracing::info!(
                elapsed = %humantime::format_duration(start.elapsed()),
                "clearing old persistent state directories completed"
            );
        }

        let this = self.inner.clone();
        let zerostate_seqno = this
            .node_state
            .load_zerostate_mc_seqno()
            .unwrap_or_default();

        let mut top_handle = top_handle.clone();
        // Nothing below the zerostate can be cleared.
        if top_handle.id().seqno <= zerostate_seqno {
            return Ok(());
        }

        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            let block_handles = &this.block_handles;

            let now_utime = top_handle.gen_utime();

            // Walk key blocks backwards until one past the first that is
            // still usable for boot — that becomes the clearing threshold.
            let mut has_suitable = false;
            loop {
                match block_handles.find_prev_persistent_key_block(top_handle.id().seqno) {
                    Some(handle) if !has_suitable => {
                        has_suitable |= BlockStuff::can_use_for_boot(handle.gen_utime(), now_utime);
                        top_handle = handle;
                    }
                    Some(handle) => {
                        top_handle = handle;
                        break;
                    }
                    None => return Ok(()),
                }
            }

            // Drop cached descriptors for all seqnos strictly between the
            // zerostate and the threshold.
            let mut index = this.mc_seqno_to_block_ids.lock();
            index.retain(|&mc_seqno, block_ids| {
                if mc_seqno >= top_handle.id().seqno || mc_seqno <= zerostate_seqno {
                    return true;
                }

                for block_id in block_ids.drain() {
                    this.clear_cache(&block_id);
                }
                false
            });

            this.clear_outdated_state_entries(top_handle.id(), zerostate_seqno)
        })
        .await?
    }

    /// Tries to satisfy a store request from an existing cached state.
    ///
    /// Returns `Ok(true)` when the state is already stored for `mc_seqno`
    /// or newer, or when the cached file was successfully copied into the
    /// `mc_seqno` directory and re-cached; `Ok(false)` when the caller must
    /// produce the state itself.
    async fn try_reuse_persistent_state(
        &self,
        mc_seqno: u32,
        handle: &BlockHandle,
        kind: PersistentStateKind,
    ) -> Result<bool> {
        // The handle flags are the source of truth for whether a state was
        // ever fully written.
        match kind {
            PersistentStateKind::Shard if !handle.has_persistent_shard_state() => return Ok(false),
            PersistentStateKind::Queue if !handle.has_persistent_queue_state() => return Ok(false),
            _ => {}
        }

        let block_id = *handle.id();

        let Some(cached) = self
            .inner
            .descriptor_cache
            .get(&CacheKey { block_id, kind })
            .map(|r| r.clone())
        else {
            return Ok(false);
        };

        // Already stored for this or a newer mc seqno — nothing to do.
        if cached.mc_seqno >= mc_seqno {
            return Ok(true);
        }

        let this = self.inner.clone();

        let span = tracing::Span::current();
        let state = tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;

            // Write to a temp name first, then rename for atomicity.
            let temp_file = states_dir.file(kind.make_temp_file_name(&block_id));
            std::fs::write(temp_file.path(), cached.file.as_slice())?;
            temp_file.rename(kind.make_file_name(&block_id))?;

            drop(cached);

            this.cache_state(mc_seqno, &block_id, kind)
        })
        .await??;

        self.notify_with_persistent_state(&state).await;
        Ok(true)
    }

    /// Sends the stored state to every current subscriber; closed channels
    /// are ignored.
    async fn notify_with_persistent_state(&self, state: &PersistentState) {
        let subscriptions = self.inner.subscriptions.load_full();
        for sender in subscriptions.values() {
            sender.send(state.clone()).await.ok();
        }
    }
}
759
/// Shared state behind [`PersistentStateStorage`].
struct Inner {
    /// Cells database used to serialize shard states.
    cells_db: CellsDb,
    /// Root directory (`<files_dir>/states`) for all persistent state files.
    storage_dir: Dir,
    /// Node state, used to read the zerostate mc seqno during rotation.
    node_state: Arc<NodeStateStorage>,
    /// Block handle storage (key block iteration, persistent-state flags).
    block_handles: Arc<BlockHandleStorage>,
    /// Block storage, used to load block data and queue diffs.
    blocks: Arc<BlockStorage>,
    /// Shard state storage, used to load state root hashes.
    shard_states: Arc<ShardStateStorage>,
    /// Memory-mapped descriptors of cached state files.
    descriptor_cache: DashMap<CacheKey, Arc<CachedState>>,
    /// Reverse index: mc seqno -> block ids cached under it (for rotation).
    mc_seqno_to_block_ids: Mutex<BTreeMap<u32, FastHashSet<BlockId>>>,
    /// Limits concurrent `read_state_part` blocking reads.
    chunks_semaphore: Arc<Semaphore>,
    /// Queue of persistent-state key block handles (newest at the front).
    handles_queue: Mutex<HandlesQueue>,
    /// Signalled whenever the oldest known handle changes.
    oldest_ps_changed: Notify,
    /// Oldest known persistent-state handle, if any.
    oldest_ps_handle: ArcSwapAny<Option<BlockHandle>>,
    /// Subscriber channels keyed by receiver id (read lock-free, updated
    /// copy-on-write under `subscriptions_mutex`).
    subscriptions: ArcSwap<FastHashMap<usize, mpsc::Sender<PersistentState>>>,
    /// Serializes copy-on-write updates of `subscriptions`.
    subscriptions_mutex: Mutex<()>,
}
776
impl Inner {
    /// Returns the directory for `mc_seqno`, creating it on disk if needed.
    fn prepare_persistent_states_dir(&self, mc_seqno: u32) -> Result<Dir> {
        let states_dir = self.mc_states_dir(mc_seqno);
        if !states_dir.path().is_dir() {
            tracing::info!(mc_seqno, "creating persistent state directory");
            states_dir.create_if_not_exists()?;
        }
        Ok(states_dir)
    }

    /// Handle to the `<storage_dir>/<mc_seqno>` directory (not created).
    fn mc_states_dir(&self, mc_seqno: u32) -> Dir {
        Dir::new_readonly(self.storage_dir.path().join(mc_seqno.to_string()))
    }

    /// Removes from disk every entry that is neither a directory named
    /// with a seqno `>= recent_block_id.seqno` nor one `<= zerostate_seqno`.
    /// Stray files and unparseable directory names are removed as well;
    /// individual removal failures are logged but do not abort.
    fn clear_outdated_state_entries(
        &self,
        recent_block_id: &BlockId,
        zerostate_seqno: u32,
    ) -> Result<()> {
        let mut directories_to_remove: Vec<PathBuf> = Vec::new();
        let mut files_to_remove: Vec<PathBuf> = Vec::new();

        for entry in self.storage_dir.entries()?.flatten() {
            let path = entry.path();

            // Only directories are expected at the top level.
            if path.is_file() {
                files_to_remove.push(path);
                continue;
            }

            // Non-UTF-8 names cannot be valid seqno directories.
            let Ok(name) = entry.file_name().into_string() else {
                directories_to_remove.push(path);
                continue;
            };

            let is_recent = matches!(
                name.parse::<u32>(),
                Ok(seqno) if seqno >= recent_block_id.seqno || seqno <= zerostate_seqno
            );
            if !is_recent {
                directories_to_remove.push(path);
            }
        }

        for dir in directories_to_remove {
            tracing::info!(dir = %dir.display(), "removing an old persistent state directory");
            if let Err(e) = std::fs::remove_dir_all(&dir) {
                tracing::error!(dir = %dir.display(), "failed to remove an old persistent state: {e:?}");
            }
        }

        for file in files_to_remove {
            tracing::info!(file = %file.display(), "removing file");
            if let Err(e) = std::fs::remove_file(&file) {
                tracing::error!(file = %file.display(), "failed to remove file: {e:?}");
            }
        }

        Ok(())
    }

    /// Memory-maps the on-disk state file for `(block_id, kind)` under
    /// `mc_seqno` and registers it in the descriptor cache and the
    /// seqno index.
    ///
    /// If an entry with an equal or newer `mc_seqno` is already cached,
    /// that entry is returned unchanged; otherwise the new mapping replaces
    /// it and the seqno index is moved from the previous seqno to the new
    /// one. Errors if the file cannot be opened, copied or mapped.
    fn cache_state(
        &self,
        mc_seqno: u32,
        block_id: &BlockId,
        kind: PersistentStateKind,
    ) -> Result<PersistentState> {
        use std::collections::btree_map;

        use dashmap::mapref::entry::Entry;

        let key = CacheKey {
            block_id: *block_id,
            kind,
        };

        // Copies the stored file into an anonymous temp file and maps that,
        // so the mapping stays valid even if the original directory is
        // removed during rotation.
        let load_mapped = || {
            let mut file = self
                .mc_states_dir(mc_seqno)
                .file(kind.make_file_name(block_id))
                .read(true)
                .open()?;

            let mut temp_file = tempfile::tempfile_in(self.storage_dir.path())
                .context("failed to create a temp file")?;

            std::io::copy(&mut file, &mut temp_file).context("failed to copy a temp file")?;
            temp_file.flush()?;
            temp_file.seek(std::io::SeekFrom::Start(0))?;

            MappedFile::from_existing_file(temp_file).context("failed to map a temp file")
        };

        let file =
            load_mapped().with_context(|| format!("failed to cache {kind:?} for {block_id}"))?;

        let new_state = Arc::new(CachedState { mc_seqno, file });

        // Insert into the descriptor cache, remembering which seqno (if
        // any) the key was previously indexed under.
        let prev_mc_seqno = match self.descriptor_cache.entry(key) {
            Entry::Vacant(entry) => {
                entry.insert(new_state.clone());
                None
            }
            Entry::Occupied(mut entry) => {
                let prev_mc_seqno = entry.get().mc_seqno;
                // Never downgrade to an older seqno; keep the existing entry.
                if mc_seqno <= prev_mc_seqno {
                    return Ok(PersistentState {
                        block_id: *block_id,
                        kind,
                        cached: entry.get().clone(),
                    });
                }

                entry.insert(new_state.clone());
                Some(prev_mc_seqno)
            }
        };

        let mut index = self.mc_seqno_to_block_ids.lock();

        // Move the block id from its previous seqno bucket, dropping the
        // bucket if it becomes empty.
        if let Some(prev_mc_seqno) = prev_mc_seqno
            && let btree_map::Entry::Occupied(mut entry) = index.entry(prev_mc_seqno)
        {
            entry.get_mut().remove(block_id);
            if entry.get().is_empty() {
                entry.remove();
            }
        }

        index.entry(mc_seqno).or_default().insert(*block_id);

        Ok(PersistentState {
            block_id: *block_id,
            kind,
            cached: new_state,
        })
    }

    /// Drops both cached descriptors (shard and queue) for a block.
    fn clear_cache(&self, block_id: &BlockId) {
        self.descriptor_cache.remove(&CacheKey {
            block_id: *block_id,
            kind: PersistentStateKind::Shard,
        });
        self.descriptor_cache.remove(&CacheKey {
            block_id: *block_id,
            kind: PersistentStateKind::Queue,
        });
    }
}
934
/// Size information about a cached persistent state file.
#[derive(Debug, Clone, Copy)]
pub struct PersistentStateInfo {
    /// Total file size in bytes (never zero — empty files are not reported).
    pub size: NonZeroU64,
    /// Chunk size used when serving the file in parts.
    pub chunk_size: NonZeroU32,
}
940
/// A cached persistent state: block id, kind and the mapped file contents.
///
/// Cloning is cheap — the mapped file is shared via `Arc`.
#[derive(Clone)]
pub struct PersistentState {
    block_id: BlockId,
    kind: PersistentStateKind,
    cached: Arc<CachedState>,
}
947
impl PersistentState {
    /// Id of the block this state belongs to.
    pub fn block_id(&self) -> &BlockId {
        &self.block_id
    }

    /// Whether this is a shard state or a queue state.
    pub fn kind(&self) -> PersistentStateKind {
        self.kind
    }

    /// Memory-mapped contents of the state file.
    pub fn file(&self) -> &MappedFile {
        &self.cached.file
    }

    /// Masterchain seqno the state is stored under.
    pub fn mc_seqno(&self) -> u32 {
        self.cached.mc_seqno
    }
}
965
/// Receiver half of a persistent-state subscription.
///
/// Dereferences to the inner channel receiver; unregisters itself from the
/// storage's subscription map on drop.
pub struct PersistentStateReceiver {
    /// Unique subscription id assigned from `RECEIVER_ID`.
    id: usize,
    /// Weak link back to the storage so the receiver does not keep it alive.
    inner: Weak<Inner>,
    receiver: mpsc::Receiver<PersistentState>,
}
971
// Expose the inner channel receiver directly (e.g. `recv().await`).
impl std::ops::Deref for PersistentStateReceiver {
    type Target = mpsc::Receiver<PersistentState>;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.receiver
    }
}
980
impl std::ops::DerefMut for PersistentStateReceiver {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.receiver
    }
}
987
impl Drop for PersistentStateReceiver {
    /// Unregisters this subscription, mirroring the copy-on-write update
    /// performed in `PersistentStateStorage::subscribe`. A no-op if the
    /// storage has already been dropped.
    fn drop(&mut self) {
        if let Some(inner) = self.inner.upgrade() {
            let _guard = inner.subscriptions_mutex.lock();
            let mut subscriptions = inner.subscriptions.load_full();
            {
                let subscriptions = Arc::make_mut(&mut subscriptions);
                subscriptions.remove(&self.id);
            }
            inner.subscriptions.store(subscriptions);
        }
    }
}
1001
/// Process-wide source of unique subscription ids.
static RECEIVER_ID: AtomicUsize = AtomicUsize::new(0);

/// A memory-mapped state file together with the mc seqno it is stored under.
struct CachedState {
    /// Masterchain seqno of the directory the file was cached from.
    mc_seqno: u32,
    /// Mapped copy of the state file contents.
    file: MappedFile,
}
1008
/// Queue of persistent-state key block handles, newest at the front and
/// oldest at the back.
#[derive(Default)]
struct HandlesQueue {
    handles: VecDeque<BlockHandle>,
}
1013
1014impl HandlesQueue {
1015 fn oldest_known(&self) -> Option<&BlockHandle> {
1016 self.handles.back()
1017 }
1018
1019 fn push(&mut self, new_handle: BlockHandle) -> bool {
1020 if let Some(newest) = self.handles.front()
1022 && newest.id().seqno >= new_handle.id().seqno
1023 {
1024 return false;
1025 }
1026
1027 let now_utime = new_handle.gen_utime();
1029 let mut has_suitable = false;
1030 self.handles.retain(|old_handle| {
1031 if !has_suitable {
1032 has_suitable |= BlockStuff::can_use_for_boot(old_handle.gen_utime(), now_utime);
1033 true
1034 } else {
1035 false
1036 }
1037 });
1038
1039 self.handles.push_front(new_handle);
1041 true
1042 }
1043}
1044
1045const STATE_CHUNK_SIZE: u64 = 1024 * 1024;