1use std::collections::{BTreeMap, VecDeque};
2use std::fs::File;
3use std::io::{Seek, Write};
4use std::num::{NonZeroU32, NonZeroU64};
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, Weak};
8
9use anyhow::{Context, Result};
10use arc_swap::{ArcSwap, ArcSwapAny};
11use dashmap::DashMap;
12use parking_lot::Mutex;
13use tokio::sync::{Notify, Semaphore, mpsc};
14use tokio::time::Instant;
15use tycho_block_util::block::BlockStuff;
16use tycho_block_util::queue::QueueStateHeader;
17use tycho_storage::fs::Dir;
18use tycho_types::models::{BlockId, PrevBlockRef};
19use tycho_util::fs::MappedFile;
20use tycho_util::sync::CancellationFlag;
21use tycho_util::{FastHashMap, FastHashSet};
22
23pub use self::queue_state::reader::{QueueDiffReader, QueueStateReader};
24pub use self::queue_state::writer::QueueStateWriter;
25pub use self::shard_state::reader::{BriefBocHeader, ShardStateReader};
26pub use self::shard_state::writer::ShardStateWriter;
27use super::{
28 BlockHandle, BlockHandleStorage, BlockStorage, CellsDb, KeyBlocksDirection, NodeStateStorage,
29 ShardStateStorage,
30};
31
// Reader/writer implementations for queue state files (re-exported above as
// `QueueStateReader`, `QueueDiffReader` and `QueueStateWriter`).
mod queue_state {
    pub mod reader;
    pub mod writer;
}
// Reader/writer implementations for shard state files (re-exported above as
// `ShardStateReader`, `BriefBocHeader` and `ShardStateWriter`).
mod shard_state {
    pub mod reader;
    pub mod writer;
}

#[cfg(test)]
mod tests;

/// Name of the subdirectory (created inside the `files_dir` passed to
/// `PersistentStateStorage::new`) that holds all persistent state directories.
const BASE_DIR: &str = "states";
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
47pub enum PersistentStateKind {
48 Shard,
49 Queue,
50}
51
52impl PersistentStateKind {
53 pub fn make_file_name(&self, block_id: &BlockId) -> PathBuf {
54 match self {
55 Self::Shard => ShardStateWriter::file_name(block_id),
56 Self::Queue => QueueStateWriter::file_name(block_id),
57 }
58 }
59
60 pub fn make_temp_file_name(&self, block_id: &BlockId) -> PathBuf {
61 match self {
62 Self::Shard => ShardStateWriter::temp_file_name(block_id),
63 Self::Queue => QueueStateWriter::temp_file_name(block_id),
64 }
65 }
66
67 pub fn from_extension(extension: &str) -> Option<Self> {
68 match extension {
69 ShardStateWriter::FILE_EXTENSION => Some(Self::Shard),
70 QueueStateWriter::FILE_EXTENSION => Some(Self::Queue),
71 _ => None,
72 }
73 }
74}
75
/// Key of `Inner::descriptor_cache`: one cached file per (block, state kind) pair.
#[derive(Debug, Eq, Hash, PartialEq)]
struct CacheKey {
    block_id: BlockId,
    kind: PersistentStateKind,
}

/// Storage of persistent shard and queue states.
///
/// Files live under `<files_dir>/states/<mc_seqno>/`. Cloning is cheap: all clones
/// share the same inner state through an `Arc`.
#[derive(Clone)]
pub struct PersistentStateStorage {
    inner: Arc<Inner>,
}
86
87impl PersistentStateStorage {
88 pub fn new(
89 cells_db: CellsDb,
90 files_dir: &Dir,
91 node_state: Arc<NodeStateStorage>,
92 block_handle_storage: Arc<BlockHandleStorage>,
93 block_storage: Arc<BlockStorage>,
94 shard_state_storage: Arc<ShardStateStorage>,
95 ) -> Result<Self> {
96 const MAX_PARALLEL_CHUNK_READS: usize = 20;
97
98 let storage_dir = files_dir.create_subdir(BASE_DIR)?;
99
100 Ok(Self {
101 inner: Arc::new(Inner {
102 cells_db,
103 storage_dir,
104 node_state,
105 block_handles: block_handle_storage,
106 blocks: block_storage,
107 shard_states: shard_state_storage,
108 descriptor_cache: Default::default(),
109 mc_seqno_to_block_ids: Default::default(),
110 chunks_semaphore: Arc::new(Semaphore::new(MAX_PARALLEL_CHUNK_READS)),
111 handles_queue: Default::default(),
112 oldest_ps_changed: Default::default(),
113 oldest_ps_handle: Default::default(),
114 subscriptions: Default::default(),
115 subscriptions_mutex: Default::default(),
116 }),
117 })
118 }
119
120 pub fn load_oldest_known_handle(&self) -> Option<BlockHandle> {
121 self.inner.oldest_ps_handle.load_full()
122 }
123
124 pub fn oldest_known_handle_changed(&self) -> tokio::sync::futures::Notified<'_> {
125 self.inner.oldest_ps_changed.notified()
126 }
127
128 #[tracing::instrument(skip_all)]
129 pub async fn preload(&self) -> Result<()> {
130 self.preload_handles_queue()?;
131 self.preload_states().await?;
132 Ok(())
133 }
134
135 fn preload_handles_queue(&self) -> Result<()> {
136 let this = self.inner.as_ref();
137
138 let block_handles = this.block_handles.as_ref();
139
140 let mut changed = false;
141 let mut prev_utime = 0;
142 for block_id in block_handles.key_blocks_iterator(KeyBlocksDirection::ForwardFrom(0)) {
143 let block_handle = block_handles
144 .load_handle(&block_id)
145 .context("key block handle not found")?;
146
147 let gen_utime = block_handle.gen_utime();
148 if BlockStuff::compute_is_persistent(gen_utime, prev_utime) {
149 prev_utime = gen_utime;
150
151 let mut queue = this.handles_queue.lock();
152 if queue.push(block_handle) {
153 this.oldest_ps_handle.store(queue.oldest_known().cloned());
154 changed = true;
155 }
156 }
157 }
158
159 if changed {
160 this.oldest_ps_changed.notify_waiters();
161 }
162 Ok(())
163 }
164
165 async fn preload_states(&self) -> Result<()> {
166 let process_states = |this: &Inner, dir: &PathBuf, mc_seqno: u32| -> Result<()> {
168 'outer: for entry in std::fs::read_dir(dir)?.flatten() {
169 let path = entry.path();
170 if path.is_dir() {
172 tracing::warn!(path = %path.display(), "unexpected directory");
173 continue;
174 }
175
176 'file: {
177 let Ok(block_id) = path
179 .file_stem()
181 .unwrap_or_default()
182 .to_str()
183 .unwrap_or_default()
184 .parse::<BlockId>()
185 else {
186 break 'file;
187 };
188
189 let extension = path
190 .extension()
191 .and_then(|ext| ext.to_str())
192 .unwrap_or_default();
193
194 let Some(cache_type) = PersistentStateKind::from_extension(extension) else {
195 break 'file;
196 };
197
198 this.cache_state(mc_seqno, &block_id, cache_type)?;
199 continue 'outer;
200 }
201 tracing::warn!(path = %path.display(), "unexpected file");
202 }
203 Ok(())
204 };
205
206 let this = self.inner.clone();
207 let span = tracing::Span::current();
208 tokio::task::spawn_blocking(move || {
209 let _span = span.enter();
210
211 'outer: for entry in this.storage_dir.entries()?.flatten() {
213 let path = entry.path();
214 if path.is_file() {
216 tracing::warn!(path = %path.display(), "unexpected file");
217 continue;
218 }
219
220 'dir: {
221 let Ok(name) = entry.file_name().into_string() else {
223 break 'dir;
224 };
225 let Ok(mc_seqno) = name.parse::<u32>() else {
226 break 'dir;
227 };
228
229 process_states(&this, &path, mc_seqno)?;
231 continue 'outer;
232 }
233 tracing::warn!(path = %path.display(), "unexpected directory");
234 }
235
236 Ok(())
237 })
238 .await?
239 }
240
241 pub fn state_chunk_size(&self) -> NonZeroU32 {
244 NonZeroU32::new(STATE_CHUNK_SIZE as _).unwrap()
245 }
246
247 pub fn subscribe(&self) -> (Vec<PersistentState>, PersistentStateReceiver) {
248 let id = RECEIVER_ID.fetch_add(1, Ordering::Relaxed);
249 let (sender, receiver) = mpsc::channel(1);
250
251 {
255 let _guard = self.inner.subscriptions_mutex.lock();
256 let mut subscriptions = self.inner.subscriptions.load_full();
257 {
258 let subscriptions = Arc::make_mut(&mut subscriptions);
259 let prev = subscriptions.insert(id, sender);
260 assert!(
261 prev.is_none(),
262 "persistent state subscription must be unique"
263 );
264 }
265 self.inner.subscriptions.store(subscriptions);
266 }
267
268 let receiver = PersistentStateReceiver {
269 id,
270 inner: Arc::downgrade(&self.inner),
271 receiver,
272 };
273
274 let initial_states = self
275 .inner
276 .descriptor_cache
277 .iter()
278 .map(|item| PersistentState {
279 block_id: item.key().block_id,
280 kind: item.key().kind,
281 cached: item.value().clone(),
282 })
283 .collect();
284
285 (initial_states, receiver)
286 }
287
288 pub fn state_exists(&self, block_id: &BlockId, kind: PersistentStateKind) -> bool {
289 self.inner.descriptor_cache.contains_key(&CacheKey {
290 block_id: *block_id,
291 kind,
292 })
293 }
294
295 pub fn get_state_info(
296 &self,
297 block_id: &BlockId,
298 kind: PersistentStateKind,
299 ) -> Option<PersistentStateInfo> {
300 self.inner
301 .descriptor_cache
302 .get(&CacheKey {
303 block_id: *block_id,
304 kind,
305 })
306 .and_then(|cached| {
307 let size = NonZeroU64::new(cached.file.length() as u64)?;
308 Some(PersistentStateInfo {
309 size,
310 chunk_size: self.state_chunk_size(),
311 })
312 })
313 }
314
315 pub async fn read_state_part(
316 &self,
317 block_id: &BlockId,
318 offset: u64,
319 state_kind: PersistentStateKind,
320 ) -> Option<Vec<u8>> {
321 let offset = usize::try_from(offset).ok()?;
323 let chunk_size = self.state_chunk_size().get() as usize;
324 if offset % chunk_size != 0 {
325 return None;
326 }
327
328 let permit = {
329 let semaphore = self.inner.chunks_semaphore.clone();
330 semaphore.acquire_owned().await.ok()?
331 };
332
333 let key = CacheKey {
334 block_id: *block_id,
335 kind: state_kind,
336 };
337 let cached = self.inner.descriptor_cache.get(&key)?.clone();
338 if offset > cached.file.length() {
339 return None;
340 }
341
342 tokio::task::spawn_blocking(move || {
346 let _permit = permit;
348
349 let end = std::cmp::min(offset.saturating_add(chunk_size), cached.file.length());
350 cached.file.as_slice()[offset..end].to_vec()
351 })
352 .await
353 .ok()
354 }
355
356 #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
357 pub async fn store_shard_state(&self, mc_seqno: u32, handle: &BlockHandle) -> Result<()> {
358 if self
359 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
360 .await?
361 {
362 return Ok(());
363 }
364
365 let cancelled = CancellationFlag::new();
366 scopeguard::defer! {
367 cancelled.cancel();
368 }
369
370 let handle = handle.clone();
371 let this = self.inner.clone();
372 let cancelled = cancelled.clone();
373 let span = tracing::Span::current();
374
375 let state = tokio::task::spawn_blocking(move || {
376 let _span = span.enter();
377
378 let guard = scopeguard::guard((), |_| {
379 tracing::warn!("cancelled");
380 });
381
382 let root_hash = this.shard_states.load_state_root_hash(handle.id())?;
383
384 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
385
386 let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
387 match cell_writer.write(&root_hash, Some(&cancelled)) {
388 Ok(_) => {
389 this.block_handles.set_has_persistent_shard_state(&handle);
390 tracing::info!("persistent shard state saved");
391 }
392 Err(e) => {
393 tracing::error!("failed to write persistent shard state: {e:?}");
395 }
396 }
397
398 let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
399
400 scopeguard::ScopeGuard::into_inner(guard);
401 Ok::<_, anyhow::Error>(state)
402 })
403 .await??;
404
405 self.notify_with_persistent_state(&state).await;
406 Ok(())
407 }
408
409 #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
410 pub async fn store_shard_state_file(
411 &self,
412 mc_seqno: u32,
413 handle: &BlockHandle,
414 file: File,
415 ) -> Result<()> {
416 if self
417 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
418 .await?
419 {
420 return Ok(());
421 }
422
423 let cancelled = CancellationFlag::new();
424 scopeguard::defer! {
425 cancelled.cancel();
426 }
427
428 let handle = handle.clone();
429 let this = self.inner.clone();
430 let cancelled = cancelled.clone();
431 let span = tracing::Span::current();
432
433 let state = tokio::task::spawn_blocking(move || {
434 let _span = span.enter();
435
436 let guard = scopeguard::guard((), |_| {
437 tracing::warn!("cancelled");
438 });
439
440 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
441
442 let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
443 cell_writer.write_file(file, Some(&cancelled))?;
444 this.block_handles.set_has_persistent_shard_state(&handle);
445 let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
446
447 scopeguard::ScopeGuard::into_inner(guard);
448 Ok::<_, anyhow::Error>(state)
449 })
450 .await??;
451
452 self.notify_with_persistent_state(&state).await;
453 Ok(())
454 }
455
456 #[tracing::instrument(skip_all, fields(mc_seqno = mc_seqno, block_id = %block.id()))]
457 pub async fn store_queue_state(
458 &self,
459 mc_seqno: u32,
460 handle: &BlockHandle,
461 block: BlockStuff,
462 ) -> Result<()> {
463 if self
464 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
465 .await?
466 {
467 return Ok(());
468 }
469
470 let this = self.inner.clone();
471
472 let shard_ident = handle.id().shard;
473
474 let mut queue_diffs = Vec::new();
475 let mut messages = Vec::new();
476
477 let mut top_block_handle = handle.clone();
478 let mut top_block = block;
479
480 let mut tail_len = top_block.block().out_msg_queue_updates.tail_len as usize;
481 while tail_len > 0 {
482 let queue_diff = this.blocks.load_queue_diff(&top_block_handle).await?;
483 let top_block_info = top_block.load_info()?;
484
485 let block_extra = top_block.load_extra()?;
486 let out_messages = block_extra.load_out_msg_description()?;
487
488 messages.push(queue_diff.zip(&out_messages));
489 queue_diffs.push(queue_diff.diff().clone());
490
491 if tail_len == 1 {
492 break;
493 }
494
495 let prev_block_id = match top_block_info.load_prev_ref()? {
496 PrevBlockRef::Single(block_ref) => block_ref.as_block_id(shard_ident),
497 PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"),
498 };
499
500 let Some(prev_block_handle) = this.block_handles.load_handle(&prev_block_id) else {
501 anyhow::bail!("prev block handle not found for: {prev_block_id}");
502 };
503 let prev_block = this.blocks.load_block_data(&prev_block_handle).await?;
504
505 top_block_handle = prev_block_handle;
506 top_block = prev_block;
507 tail_len -= 1;
508 }
509
510 let state = QueueStateHeader {
511 shard_ident,
512 seqno: handle.id().seqno,
513 queue_diffs,
514 };
515
516 let cancelled = CancellationFlag::new();
517 scopeguard::defer! {
518 cancelled.cancel();
519 }
520
521 let handle = handle.clone();
522
523 let cancelled = cancelled.clone();
524 let span = tracing::Span::current();
525
526 let state = tokio::task::spawn_blocking(move || {
527 let _span = span.enter();
528
529 let guard = scopeguard::guard((), |_| {
530 tracing::warn!("cancelled");
531 });
532
533 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
534 match QueueStateWriter::new(&states_dir, handle.id(), state, messages)
535 .write(Some(&cancelled))
536 {
537 Ok(()) => {
538 this.block_handles.set_has_persistent_queue_state(&handle);
539 tracing::info!("persistent queue state saved");
540 }
541 Err(e) => {
542 tracing::error!("failed to write persistent queue state: {e:?}");
543 }
544 }
545
546 let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
547
548 scopeguard::ScopeGuard::into_inner(guard);
549 Ok::<_, anyhow::Error>(state)
550 })
551 .await??;
552 self.notify_with_persistent_state(&state).await;
553 Ok(())
554 }
555
556 #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
557 pub async fn store_queue_state_file(
558 &self,
559 mc_seqno: u32,
560 handle: &BlockHandle,
561 file: File,
562 ) -> Result<()> {
563 if self
564 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
565 .await?
566 {
567 return Ok(());
568 }
569
570 let cancelled = CancellationFlag::new();
571 scopeguard::defer! {
572 cancelled.cancel();
573 }
574
575 let handle = handle.clone();
576 let this = self.inner.clone();
577 let cancelled = cancelled.clone();
578 let span = tracing::Span::current();
579
580 let state = tokio::task::spawn_blocking(move || {
581 let _span = span.enter();
582
583 let guard = scopeguard::guard((), |_| {
584 tracing::warn!("cancelled");
585 });
586
587 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
588
589 QueueStateWriter::write_file(&states_dir, handle.id(), file, Some(&cancelled))?;
590 this.block_handles.set_has_persistent_queue_state(&handle);
591 let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
592
593 scopeguard::ScopeGuard::into_inner(guard);
594 Ok::<_, anyhow::Error>(state)
595 })
596 .await??;
597
598 self.notify_with_persistent_state(&state).await;
599 Ok(())
600 }
601
602 pub async fn rotate_persistent_states(&self, top_handle: &BlockHandle) -> Result<()> {
603 anyhow::ensure!(
604 top_handle.is_masterchain(),
605 "top persistent state handle must be in the masterchain"
606 );
607
608 {
609 tracing::info!(
610 mc_block_id = %top_handle.id(),
611 "adding new persistent state to the queue"
612 );
613
614 let mut queue = self.inner.handles_queue.lock();
615 if queue.push(top_handle.clone()) {
616 self.inner
617 .oldest_ps_handle
618 .store(queue.oldest_known().cloned());
619 self.inner.oldest_ps_changed.notify_waiters();
620 }
621 }
622
623 tracing::info!("started clearing old persistent state directories");
624 let start = Instant::now();
625 scopeguard::defer! {
626 tracing::info!(
627 elapsed = %humantime::format_duration(start.elapsed()),
628 "clearing old persistent state directories completed"
629 );
630 }
631
632 let this = self.inner.clone();
633 let zerostate_seqno = this
634 .node_state
635 .load_zerostate_mc_seqno()
636 .unwrap_or_default();
637
638 let mut top_handle = top_handle.clone();
639 if top_handle.id().seqno <= zerostate_seqno {
640 return Ok(());
642 }
643
644 let span = tracing::Span::current();
645 tokio::task::spawn_blocking(move || {
646 let _span = span.enter();
647
648 let block_handles = &this.block_handles;
649
650 let now_utime = top_handle.gen_utime();
651
652 let mut has_suitable = false;
654 loop {
655 match block_handles.find_prev_persistent_key_block(top_handle.id().seqno) {
656 Some(handle) if !has_suitable => {
658 has_suitable |= BlockStuff::can_use_for_boot(handle.gen_utime(), now_utime);
659 top_handle = handle;
660 }
661 Some(handle) => {
663 top_handle = handle;
664 break;
665 }
666 None => return Ok(()),
668 }
669 }
670
671 let mut index = this.mc_seqno_to_block_ids.lock();
673 index.retain(|&mc_seqno, block_ids| {
674 if mc_seqno >= top_handle.id().seqno || mc_seqno <= zerostate_seqno {
675 return true;
676 }
677
678 for block_id in block_ids.drain() {
679 this.clear_cache(&block_id);
681 }
682 false
683 });
684
685 this.clear_outdated_state_entries(top_handle.id(), zerostate_seqno)
687 })
688 .await?
689 }
690
691 async fn try_reuse_persistent_state(
692 &self,
693 mc_seqno: u32,
694 handle: &BlockHandle,
695 kind: PersistentStateKind,
696 ) -> Result<bool> {
697 match kind {
699 PersistentStateKind::Shard if !handle.has_persistent_shard_state() => return Ok(false),
700 PersistentStateKind::Queue if !handle.has_persistent_queue_state() => return Ok(false),
701 _ => {}
702 }
703
704 let block_id = *handle.id();
705
706 let Some(cached) = self
707 .inner
708 .descriptor_cache
709 .get(&CacheKey { block_id, kind })
710 .map(|r| r.clone())
711 else {
712 return Ok(false);
714 };
715
716 if cached.mc_seqno >= mc_seqno {
717 return Ok(true);
719 }
720
721 let this = self.inner.clone();
722
723 let span = tracing::Span::current();
724 let state = tokio::task::spawn_blocking(move || {
725 let _span = span.enter();
726
727 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
728
729 let temp_file = states_dir.file(kind.make_temp_file_name(&block_id));
730 std::fs::write(temp_file.path(), cached.file.as_slice())?;
731 temp_file.rename(kind.make_file_name(&block_id))?;
732
733 drop(cached);
734
735 this.cache_state(mc_seqno, &block_id, kind)
736 })
737 .await??;
738
739 self.notify_with_persistent_state(&state).await;
740 Ok(true)
741 }
742
743 async fn notify_with_persistent_state(&self, state: &PersistentState) {
744 let subscriptions = self.inner.subscriptions.load_full();
745 for sender in subscriptions.values() {
746 sender.send(state.clone()).await.ok();
747 }
748 }
749}
750
/// Shared state behind every clone of `PersistentStateStorage`.
struct Inner {
    cells_db: CellsDb,
    /// Root directory of persistent states (`<files_dir>/states`); contains one
    /// subdirectory per masterchain seqno.
    storage_dir: Dir,
    node_state: Arc<NodeStateStorage>,
    block_handles: Arc<BlockHandleStorage>,
    blocks: Arc<BlockStorage>,
    shard_states: Arc<ShardStateStorage>,
    /// Cached (memory-mapped copies of) state files, keyed by block id and kind.
    descriptor_cache: DashMap<CacheKey, Arc<CachedState>>,
    /// Reverse index: masterchain seqno -> blocks that have a cached state recorded
    /// under that seqno. Used by rotation to evict outdated cache entries.
    /// NOTE(review): the index is not keyed by `PersistentStateKind`, so the Shard and
    /// Queue states of one block are not tracked independently if their seqnos differ.
    mc_seqno_to_block_ids: Mutex<BTreeMap<u32, FastHashSet<BlockId>>>,
    /// Limits the number of concurrent `read_state_part` chunk reads.
    chunks_semaphore: Arc<Semaphore>,
    /// Newest-first queue of persistent-state key block handles still needed.
    handles_queue: Mutex<HandlesQueue>,
    /// Notified when `oldest_ps_handle` changes.
    oldest_ps_changed: Notify,
    /// Oldest handle from `handles_queue`, readable without taking its lock.
    oldest_ps_handle: ArcSwapAny<Option<BlockHandle>>,
    /// Subscriber id -> channel; replaced copy-on-write under `subscriptions_mutex`.
    subscriptions: ArcSwap<FastHashMap<usize, mpsc::Sender<PersistentState>>>,
    /// Serializes writers of `subscriptions` (subscribe / receiver drop).
    subscriptions_mutex: Mutex<()>,
}
767
768impl Inner {
769 fn prepare_persistent_states_dir(&self, mc_seqno: u32) -> Result<Dir> {
770 let states_dir = self.mc_states_dir(mc_seqno);
771 if !states_dir.path().is_dir() {
772 tracing::info!(mc_seqno, "creating persistent state directory");
773 states_dir.create_if_not_exists()?;
774 }
775 Ok(states_dir)
776 }
777
778 fn mc_states_dir(&self, mc_seqno: u32) -> Dir {
779 Dir::new_readonly(self.storage_dir.path().join(mc_seqno.to_string()))
780 }
781
782 fn clear_outdated_state_entries(
783 &self,
784 recent_block_id: &BlockId,
785 zerostate_seqno: u32,
786 ) -> Result<()> {
787 let mut directories_to_remove: Vec<PathBuf> = Vec::new();
788 let mut files_to_remove: Vec<PathBuf> = Vec::new();
789
790 for entry in self.storage_dir.entries()?.flatten() {
791 let path = entry.path();
792
793 if path.is_file() {
794 files_to_remove.push(path);
795 continue;
796 }
797
798 let Ok(name) = entry.file_name().into_string() else {
799 directories_to_remove.push(path);
800 continue;
801 };
802
803 let is_recent = matches!(
804 name.parse::<u32>(),
805 Ok(seqno) if seqno >= recent_block_id.seqno || seqno <= zerostate_seqno
806 );
807 if !is_recent {
808 directories_to_remove.push(path);
809 }
810 }
811
812 for dir in directories_to_remove {
813 tracing::info!(dir = %dir.display(), "removing an old persistent state directory");
814 if let Err(e) = std::fs::remove_dir_all(&dir) {
815 tracing::error!(dir = %dir.display(), "failed to remove an old persistent state: {e:?}");
816 }
817 }
818
819 for file in files_to_remove {
820 tracing::info!(file = %file.display(), "removing file");
821 if let Err(e) = std::fs::remove_file(&file) {
822 tracing::error!(file = %file.display(), "failed to remove file: {e:?}");
823 }
824 }
825
826 Ok(())
827 }
828
829 fn cache_state(
830 &self,
831 mc_seqno: u32,
832 block_id: &BlockId,
833 kind: PersistentStateKind,
834 ) -> Result<PersistentState> {
835 use std::collections::btree_map;
836
837 use dashmap::mapref::entry::Entry;
838
839 let key = CacheKey {
840 block_id: *block_id,
841 kind,
842 };
843
844 let load_mapped = || {
845 let mut file = self
846 .mc_states_dir(mc_seqno)
847 .file(kind.make_file_name(block_id))
848 .read(true)
849 .open()?;
850
851 let mut temp_file = tempfile::tempfile_in(self.storage_dir.path())
855 .context("failed to create a temp file")?;
856
857 std::io::copy(&mut file, &mut temp_file).context("failed to copy a temp file")?;
861 temp_file.flush()?;
862 temp_file.seek(std::io::SeekFrom::Start(0))?;
863
864 MappedFile::from_existing_file(temp_file).context("failed to map a temp file")
865 };
866
867 let file =
868 load_mapped().with_context(|| format!("failed to cache {kind:?} for {block_id}"))?;
869
870 let new_state = Arc::new(CachedState { mc_seqno, file });
871
872 let prev_mc_seqno = match self.descriptor_cache.entry(key) {
873 Entry::Vacant(entry) => {
874 entry.insert(new_state.clone());
875 None
876 }
877 Entry::Occupied(mut entry) => {
878 let prev_mc_seqno = entry.get().mc_seqno;
879 if mc_seqno <= prev_mc_seqno {
880 return Ok(PersistentState {
882 block_id: *block_id,
883 kind,
884 cached: entry.get().clone(),
885 });
886 }
887
888 entry.insert(new_state.clone());
889 Some(prev_mc_seqno)
890 }
891 };
892
893 let mut index = self.mc_seqno_to_block_ids.lock();
894
895 if let Some(prev_mc_seqno) = prev_mc_seqno
897 && let btree_map::Entry::Occupied(mut entry) = index.entry(prev_mc_seqno)
898 {
899 entry.get_mut().remove(block_id);
900 if entry.get().is_empty() {
901 entry.remove();
902 }
903 }
904
905 index.entry(mc_seqno).or_default().insert(*block_id);
906
907 Ok(PersistentState {
908 block_id: *block_id,
909 kind,
910 cached: new_state,
911 })
912 }
913
914 fn clear_cache(&self, block_id: &BlockId) {
915 self.descriptor_cache.remove(&CacheKey {
916 block_id: *block_id,
917 kind: PersistentStateKind::Shard,
918 });
919 self.descriptor_cache.remove(&CacheKey {
920 block_id: *block_id,
921 kind: PersistentStateKind::Queue,
922 });
923 }
924}
925
/// Size information about a cached persistent state, as reported to peers.
#[derive(Clone, Copy, Debug)]
pub struct PersistentStateInfo {
    /// Total size of the state file in bytes (always non-zero).
    pub size: NonZeroU64,
    /// Size of the chunks in which the state is served.
    pub chunk_size: NonZeroU32,
}
931
932#[derive(Clone)]
933pub struct PersistentState {
934 block_id: BlockId,
935 kind: PersistentStateKind,
936 cached: Arc<CachedState>,
937}
938
939impl PersistentState {
940 pub fn block_id(&self) -> &BlockId {
941 &self.block_id
942 }
943
944 pub fn kind(&self) -> PersistentStateKind {
945 self.kind
946 }
947
948 pub fn file(&self) -> &MappedFile {
949 &self.cached.file
950 }
951
952 pub fn mc_seqno(&self) -> u32 {
953 self.cached.mc_seqno
954 }
955}
956
957pub struct PersistentStateReceiver {
958 id: usize,
959 inner: Weak<Inner>,
960 receiver: mpsc::Receiver<PersistentState>,
961}
962
963impl std::ops::Deref for PersistentStateReceiver {
964 type Target = mpsc::Receiver<PersistentState>;
965
966 #[inline]
967 fn deref(&self) -> &Self::Target {
968 &self.receiver
969 }
970}
971
972impl std::ops::DerefMut for PersistentStateReceiver {
973 #[inline]
974 fn deref_mut(&mut self) -> &mut Self::Target {
975 &mut self.receiver
976 }
977}
978
979impl Drop for PersistentStateReceiver {
980 fn drop(&mut self) {
981 if let Some(inner) = self.inner.upgrade() {
982 let _guard = inner.subscriptions_mutex.lock();
983 let mut subscriptions = inner.subscriptions.load_full();
984 {
985 let subscriptions = Arc::make_mut(&mut subscriptions);
986 subscriptions.remove(&self.id);
987 }
988 inner.subscriptions.store(subscriptions);
989 }
990 }
991}
992
/// Source of unique subscription ids handed out by `PersistentStateStorage::subscribe`.
static RECEIVER_ID: AtomicUsize = AtomicUsize::new(0);

/// Cached contents of one persistent state file.
struct CachedState {
    /// Masterchain seqno of the directory the file was loaded from.
    mc_seqno: u32,
    /// Memory-mapped private copy of the file (see `Inner::cache_state`).
    file: MappedFile,
}
999
1000#[derive(Default)]
1001struct HandlesQueue {
1002 handles: VecDeque<BlockHandle>,
1003}
1004
1005impl HandlesQueue {
1006 fn oldest_known(&self) -> Option<&BlockHandle> {
1007 self.handles.back()
1008 }
1009
1010 fn push(&mut self, new_handle: BlockHandle) -> bool {
1011 if let Some(newest) = self.handles.front()
1013 && newest.id().seqno >= new_handle.id().seqno
1014 {
1015 return false;
1016 }
1017
1018 let now_utime = new_handle.gen_utime();
1020 let mut has_suitable = false;
1021 self.handles.retain(|old_handle| {
1022 if !has_suitable {
1023 has_suitable |= BlockStuff::can_use_for_boot(old_handle.gen_utime(), now_utime);
1024 true
1025 } else {
1026 false
1027 }
1028 });
1029
1030 self.handles.push_front(new_handle);
1032 true
1033 }
1034}
1035
/// Size of one chunk served by `PersistentStateStorage::read_state_part` (1 MiB).
const STATE_CHUNK_SIZE: u64 = 1024 * 1024;