1use std::collections::{BTreeMap, VecDeque};
2use std::fs::File;
3use std::io::{Seek, Write};
4use std::num::{NonZeroU32, NonZeroU64};
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, Weak};
8
9use anyhow::{Context, Result};
10use arc_swap::{ArcSwap, ArcSwapAny};
11use dashmap::DashMap;
12use parking_lot::Mutex;
13use tokio::sync::{Notify, Semaphore, mpsc};
14use tokio::time::Instant;
15use tycho_block_util::block::BlockStuff;
16use tycho_block_util::queue::QueueStateHeader;
17use tycho_block_util::state::RefMcStateHandle;
18use tycho_storage::fs::Dir;
19use tycho_types::models::{BlockId, PrevBlockRef};
20use tycho_util::fs::MappedFile;
21use tycho_util::sync::CancellationFlag;
22use tycho_util::{FastHashMap, FastHashSet};
23
24pub use self::queue_state::reader::{QueueDiffReader, QueueStateReader};
25pub use self::queue_state::writer::QueueStateWriter;
26pub use self::shard_state::reader::{BriefBocHeader, ShardStateReader};
27pub use self::shard_state::writer::ShardStateWriter;
28use super::{
29 BlockHandle, BlockHandleStorage, BlockStorage, CellsDb, KeyBlocksDirection, ShardStateStorage,
30};
31
32mod queue_state {
33 pub mod reader;
34 pub mod writer;
35}
36mod shard_state {
37 pub mod reader;
38 pub mod writer;
39}
40
41#[cfg(test)]
42mod tests;
43
/// Name of the subdirectory, created inside the files directory, that holds
/// all persistent state data managed by this module.
const BASE_DIR: &str = "states";
45
/// Kind of a persistent state handled by the storage.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum PersistentStateKind {
    /// Persistent shard state.
    Shard,
    /// Persistent queue state.
    Queue,
}
51
52impl PersistentStateKind {
53 pub fn make_file_name(&self, block_id: &BlockId) -> PathBuf {
54 match self {
55 Self::Shard => ShardStateWriter::file_name(block_id),
56 Self::Queue => QueueStateWriter::file_name(block_id),
57 }
58 }
59
60 pub fn make_temp_file_name(&self, block_id: &BlockId) -> PathBuf {
61 match self {
62 Self::Shard => ShardStateWriter::temp_file_name(block_id),
63 Self::Queue => QueueStateWriter::temp_file_name(block_id),
64 }
65 }
66
67 pub fn from_extension(extension: &str) -> Option<Self> {
68 match extension {
69 ShardStateWriter::FILE_EXTENSION => Some(Self::Shard),
70 QueueStateWriter::FILE_EXTENSION => Some(Self::Queue),
71 _ => None,
72 }
73 }
74}
75
/// Key of the descriptor cache: a state is identified by its block id
/// together with the state kind.
#[derive(Debug, Eq, Hash, PartialEq)]
struct CacheKey {
    /// Block the state belongs to.
    block_id: BlockId,
    /// Whether this entry is a shard or a queue state.
    kind: PersistentStateKind,
}
81
/// Handle to the persistent state storage.
///
/// Cloning only clones the inner `Arc`, so all clones share the same state.
#[derive(Clone)]
pub struct PersistentStateStorage {
    /// Shared storage state.
    inner: Arc<Inner>,
}
86
87impl PersistentStateStorage {
88 pub fn new(
89 cells_db: CellsDb,
90 files_dir: &Dir,
91 block_handle_storage: Arc<BlockHandleStorage>,
92 block_storage: Arc<BlockStorage>,
93 shard_state_storage: Arc<ShardStateStorage>,
94 ) -> Result<Self> {
95 const MAX_PARALLEL_CHUNK_READS: usize = 20;
96
97 let storage_dir = files_dir.create_subdir(BASE_DIR)?;
98
99 Ok(Self {
100 inner: Arc::new(Inner {
101 cells_db,
102 storage_dir,
103 block_handles: block_handle_storage,
104 blocks: block_storage,
105 shard_states: shard_state_storage,
106 descriptor_cache: Default::default(),
107 mc_seqno_to_block_ids: Default::default(),
108 chunks_semaphore: Arc::new(Semaphore::new(MAX_PARALLEL_CHUNK_READS)),
109 handles_queue: Default::default(),
110 oldest_ps_changed: Default::default(),
111 oldest_ps_handle: Default::default(),
112 subscriptions: Default::default(),
113 subscriptions_mutex: Default::default(),
114 }),
115 })
116 }
117
118 pub fn load_oldest_known_handle(&self) -> Option<BlockHandle> {
119 self.inner.oldest_ps_handle.load_full()
120 }
121
122 pub fn oldest_known_handle_changed(&self) -> tokio::sync::futures::Notified<'_> {
123 self.inner.oldest_ps_changed.notified()
124 }
125
126 #[tracing::instrument(skip_all)]
127 pub async fn preload(&self) -> Result<()> {
128 self.preload_handles_queue()?;
129 self.preload_states().await
130 }
131
132 fn preload_handles_queue(&self) -> Result<()> {
133 let this = self.inner.as_ref();
134
135 let block_handles = this.block_handles.as_ref();
136
137 let mut changed = false;
138 let mut prev_utime = 0;
139 for block_id in block_handles.key_blocks_iterator(KeyBlocksDirection::ForwardFrom(0)) {
140 let block_handle = block_handles
141 .load_handle(&block_id)
142 .context("key block handle not found")?;
143
144 let gen_utime = block_handle.gen_utime();
145 if BlockStuff::compute_is_persistent(gen_utime, prev_utime) {
146 prev_utime = gen_utime;
147
148 let mut queue = this.handles_queue.lock();
149 if queue.push(block_handle) {
150 this.oldest_ps_handle.store(queue.oldest_known().cloned());
151 changed = true;
152 }
153 }
154 }
155
156 if changed {
157 this.oldest_ps_changed.notify_waiters();
158 }
159 Ok(())
160 }
161
162 async fn preload_states(&self) -> Result<()> {
163 let process_states = |this: &Inner, dir: &PathBuf, mc_seqno: u32| -> Result<()> {
165 'outer: for entry in std::fs::read_dir(dir)?.flatten() {
166 let path = entry.path();
167 if path.is_dir() {
169 tracing::warn!(path = %path.display(), "unexpected directory");
170 continue;
171 }
172
173 'file: {
174 let Ok(block_id) = path
176 .file_stem()
178 .unwrap_or_default()
179 .to_str()
180 .unwrap_or_default()
181 .parse::<BlockId>()
182 else {
183 break 'file;
184 };
185
186 let extension = path
187 .extension()
188 .and_then(|ext| ext.to_str())
189 .unwrap_or_default();
190
191 let Some(cache_type) = PersistentStateKind::from_extension(extension) else {
192 break 'file;
193 };
194
195 this.cache_state(mc_seqno, &block_id, cache_type)?;
196 continue 'outer;
197 }
198 tracing::warn!(path = %path.display(), "unexpected file");
199 }
200 Ok(())
201 };
202
203 let this = self.inner.clone();
204 let span = tracing::Span::current();
205 tokio::task::spawn_blocking(move || {
206 let _span = span.enter();
207
208 'outer: for entry in this.storage_dir.entries()?.flatten() {
210 let path = entry.path();
211 if path.is_file() {
213 tracing::warn!(path = %path.display(), "unexpected file");
214 continue;
215 }
216
217 'dir: {
218 let Ok(name) = entry.file_name().into_string() else {
220 break 'dir;
221 };
222 let Ok(mc_seqno) = name.parse::<u32>() else {
223 break 'dir;
224 };
225
226 process_states(&this, &path, mc_seqno)?;
228 continue 'outer;
229 }
230 tracing::warn!(path = %path.display(), "unexpected directory");
231 }
232
233 Ok(())
234 })
235 .await?
236 }
237
238 pub fn state_chunk_size(&self) -> NonZeroU32 {
241 NonZeroU32::new(STATE_CHUNK_SIZE as _).unwrap()
242 }
243
244 pub fn subscribe(&self) -> (Vec<PersistentState>, PersistentStateReceiver) {
245 let id = RECEIVER_ID.fetch_add(1, Ordering::Relaxed);
246 let (sender, receiver) = mpsc::channel(1);
247
248 {
252 let _guard = self.inner.subscriptions_mutex.lock();
253 let mut subscriptions = self.inner.subscriptions.load_full();
254 {
255 let subscriptions = Arc::make_mut(&mut subscriptions);
256 let prev = subscriptions.insert(id, sender);
257 assert!(
258 prev.is_none(),
259 "persistent state subscription must be unique"
260 );
261 }
262 self.inner.subscriptions.store(subscriptions);
263 }
264
265 let receiver = PersistentStateReceiver {
266 id,
267 inner: Arc::downgrade(&self.inner),
268 receiver,
269 };
270
271 let initial_states = self
272 .inner
273 .descriptor_cache
274 .iter()
275 .map(|item| PersistentState {
276 block_id: item.key().block_id,
277 kind: item.key().kind,
278 cached: item.value().clone(),
279 })
280 .collect();
281
282 (initial_states, receiver)
283 }
284
285 pub fn state_exists(&self, block_id: &BlockId, kind: PersistentStateKind) -> bool {
286 self.inner.descriptor_cache.contains_key(&CacheKey {
287 block_id: *block_id,
288 kind,
289 })
290 }
291
292 pub fn get_state_info(
293 &self,
294 block_id: &BlockId,
295 kind: PersistentStateKind,
296 ) -> Option<PersistentStateInfo> {
297 self.inner
298 .descriptor_cache
299 .get(&CacheKey {
300 block_id: *block_id,
301 kind,
302 })
303 .and_then(|cached| {
304 let size = NonZeroU64::new(cached.file.length() as u64)?;
305 Some(PersistentStateInfo {
306 size,
307 chunk_size: self.state_chunk_size(),
308 })
309 })
310 }
311
312 pub async fn read_state_part(
313 &self,
314 block_id: &BlockId,
315 offset: u64,
316 state_kind: PersistentStateKind,
317 ) -> Option<Vec<u8>> {
318 let offset = usize::try_from(offset).ok()?;
320 let chunk_size = self.state_chunk_size().get() as usize;
321 if offset % chunk_size != 0 {
322 return None;
323 }
324
325 let permit = {
326 let semaphore = self.inner.chunks_semaphore.clone();
327 semaphore.acquire_owned().await.ok()?
328 };
329
330 let key = CacheKey {
331 block_id: *block_id,
332 kind: state_kind,
333 };
334 let cached = self.inner.descriptor_cache.get(&key)?.clone();
335 if offset > cached.file.length() {
336 return None;
337 }
338
339 tokio::task::spawn_blocking(move || {
343 let _permit = permit;
345
346 let end = std::cmp::min(offset.saturating_add(chunk_size), cached.file.length());
347 cached.file.as_slice()[offset..end].to_vec()
348 })
349 .await
350 .ok()
351 }
352
353 #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
354 pub async fn store_shard_state(
355 &self,
356 mc_seqno: u32,
357 handle: &BlockHandle,
358 tracker_handle: RefMcStateHandle,
359 ) -> Result<()> {
360 if self
361 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
362 .await?
363 {
364 return Ok(());
365 }
366
367 let cancelled = CancellationFlag::new();
368 scopeguard::defer! {
369 cancelled.cancel();
370 }
371
372 let handle = handle.clone();
373 let this = self.inner.clone();
374 let cancelled = cancelled.clone();
375 let span = tracing::Span::current();
376
377 let state = tokio::task::spawn_blocking(move || {
378 let _span = span.enter();
379
380 let guard = scopeguard::guard((), |_| {
381 tracing::warn!("cancelled");
382 });
383
384 let _tracker_handle = tracker_handle;
386
387 let root_hash = this.shard_states.load_state_root_hash(handle.id())?;
388
389 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
390
391 let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
392 match cell_writer.write(&root_hash, Some(&cancelled)) {
393 Ok(()) => {
394 this.block_handles.set_has_persistent_shard_state(&handle);
395 tracing::info!("persistent shard state saved");
396 }
397 Err(e) => {
398 tracing::error!("failed to write persistent shard state: {e:?}");
400 }
401 }
402
403 let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
404
405 scopeguard::ScopeGuard::into_inner(guard);
406 Ok::<_, anyhow::Error>(state)
407 })
408 .await??;
409
410 self.notify_with_persistent_state(&state).await;
411 Ok(())
412 }
413
414 #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
415 pub async fn store_shard_state_file(
416 &self,
417 mc_seqno: u32,
418 handle: &BlockHandle,
419 file: File,
420 ) -> Result<()> {
421 if self
422 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
423 .await?
424 {
425 return Ok(());
426 }
427
428 let cancelled = CancellationFlag::new();
429 scopeguard::defer! {
430 cancelled.cancel();
431 }
432
433 let handle = handle.clone();
434 let this = self.inner.clone();
435 let cancelled = cancelled.clone();
436 let span = tracing::Span::current();
437
438 let state = tokio::task::spawn_blocking(move || {
439 let _span = span.enter();
440
441 let guard = scopeguard::guard((), |_| {
442 tracing::warn!("cancelled");
443 });
444
445 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
446
447 let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
448 cell_writer.write_file(file, Some(&cancelled))?;
449 this.block_handles.set_has_persistent_shard_state(&handle);
450 let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
451
452 scopeguard::ScopeGuard::into_inner(guard);
453 Ok::<_, anyhow::Error>(state)
454 })
455 .await??;
456
457 self.notify_with_persistent_state(&state).await;
458 Ok(())
459 }
460
461 #[tracing::instrument(skip_all, fields(mc_seqno = mc_seqno, block_id = %block.id()))]
462 pub async fn store_queue_state(
463 &self,
464 mc_seqno: u32,
465 handle: &BlockHandle,
466 block: BlockStuff,
467 ) -> Result<()> {
468 if self
469 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
470 .await?
471 {
472 return Ok(());
473 }
474
475 let this = self.inner.clone();
476
477 let shard_ident = handle.id().shard;
478
479 let mut queue_diffs = Vec::new();
480 let mut messages = Vec::new();
481
482 let mut top_block_handle = handle.clone();
483 let mut top_block = block;
484
485 let mut tail_len = top_block.block().out_msg_queue_updates.tail_len as usize;
486
487 while tail_len > 0 {
488 let queue_diff = this.blocks.load_queue_diff(&top_block_handle).await?;
489 let top_block_info = top_block.load_info()?;
490
491 let block_extra = top_block.load_extra()?;
492 let out_messages = block_extra.load_out_msg_description()?;
493
494 messages.push(queue_diff.zip(&out_messages));
495 queue_diffs.push(queue_diff.diff().clone());
496
497 if tail_len == 1 {
498 break;
499 }
500
501 let prev_block_id = match top_block_info.load_prev_ref()? {
502 PrevBlockRef::Single(block_ref) => block_ref.as_block_id(shard_ident),
503 PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"),
504 };
505
506 let Some(prev_block_handle) = this.block_handles.load_handle(&prev_block_id) else {
507 anyhow::bail!("prev block handle not found for: {prev_block_id}");
508 };
509 let prev_block = this.blocks.load_block_data(&prev_block_handle).await?;
510
511 top_block_handle = prev_block_handle;
512 top_block = prev_block;
513 tail_len -= 1;
514 }
515
516 let state = QueueStateHeader {
517 shard_ident,
518 seqno: handle.id().seqno,
519 queue_diffs,
520 };
521
522 let cancelled = CancellationFlag::new();
523 scopeguard::defer! {
524 cancelled.cancel();
525 }
526
527 let handle = handle.clone();
528 let cancelled = cancelled.clone();
529 let span = tracing::Span::current();
530
531 let state = tokio::task::spawn_blocking(move || {
532 let _span = span.enter();
533
534 let guard = scopeguard::guard((), |_| {
535 tracing::warn!("cancelled");
536 });
537
538 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
539 match QueueStateWriter::new(&states_dir, handle.id(), state, messages)
540 .write(Some(&cancelled))
541 {
542 Ok(()) => {
543 this.block_handles.set_has_persistent_queue_state(&handle);
544 tracing::info!("persistent queue state saved");
545 }
546 Err(e) => {
547 tracing::error!("failed to write persistent queue state: {e:?}");
548 }
549 }
550
551 let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
552
553 scopeguard::ScopeGuard::into_inner(guard);
554 Ok::<_, anyhow::Error>(state)
555 })
556 .await??;
557
558 self.notify_with_persistent_state(&state).await;
559 Ok(())
560 }
561
562 #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
563 pub async fn store_queue_state_file(
564 &self,
565 mc_seqno: u32,
566 handle: &BlockHandle,
567 file: File,
568 ) -> Result<()> {
569 if self
570 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
571 .await?
572 {
573 return Ok(());
574 }
575
576 let cancelled = CancellationFlag::new();
577 scopeguard::defer! {
578 cancelled.cancel();
579 }
580
581 let handle = handle.clone();
582 let this = self.inner.clone();
583 let cancelled = cancelled.clone();
584 let span = tracing::Span::current();
585
586 let state = tokio::task::spawn_blocking(move || {
587 let _span = span.enter();
588
589 let guard = scopeguard::guard((), |_| {
590 tracing::warn!("cancelled");
591 });
592
593 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
594
595 QueueStateWriter::write_file(&states_dir, handle.id(), file, Some(&cancelled))?;
596 this.block_handles.set_has_persistent_queue_state(&handle);
597 let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
598
599 scopeguard::ScopeGuard::into_inner(guard);
600 Ok::<_, anyhow::Error>(state)
601 })
602 .await??;
603
604 self.notify_with_persistent_state(&state).await;
605 Ok(())
606 }
607
608 pub async fn rotate_persistent_states(&self, top_handle: &BlockHandle) -> Result<()> {
609 anyhow::ensure!(
610 top_handle.is_masterchain(),
611 "top persistent state handle must be in the masterchain"
612 );
613
614 {
615 tracing::info!(
616 mc_block_id = %top_handle.id(),
617 "adding new persistent state to the queue"
618 );
619
620 let mut queue = self.inner.handles_queue.lock();
621 if queue.push(top_handle.clone()) {
622 self.inner
623 .oldest_ps_handle
624 .store(queue.oldest_known().cloned());
625 self.inner.oldest_ps_changed.notify_waiters();
626 }
627 }
628
629 tracing::info!("started clearing old persistent state directories");
630 let start = Instant::now();
631 scopeguard::defer! {
632 tracing::info!(
633 elapsed = %humantime::format_duration(start.elapsed()),
634 "clearing old persistent state directories completed"
635 );
636 }
637
638 let this = self.inner.clone();
639 let mut top_handle = top_handle.clone();
640 if top_handle.id().seqno == 0 {
641 return Ok(());
643 }
644
645 let span = tracing::Span::current();
646 tokio::task::spawn_blocking(move || {
647 let _span = span.enter();
648
649 let block_handles = &this.block_handles;
650
651 let now_utime = top_handle.gen_utime();
652
653 let mut has_suitable = false;
655 loop {
656 match block_handles.find_prev_persistent_key_block(top_handle.id().seqno) {
657 Some(handle) if !has_suitable => {
659 has_suitable |= BlockStuff::can_use_for_boot(handle.gen_utime(), now_utime);
660 top_handle = handle;
661 }
662 Some(handle) => {
664 top_handle = handle;
665 break;
666 }
667 None => return Ok(()),
669 }
670 }
671
672 let mut index = this.mc_seqno_to_block_ids.lock();
674 index.retain(|&mc_seqno, block_ids| {
675 if mc_seqno >= top_handle.id().seqno || mc_seqno == 0 {
676 return true;
677 }
678
679 for block_id in block_ids.drain() {
680 this.clear_cache(&block_id);
682 }
683 false
684 });
685
686 this.clear_outdated_state_entries(top_handle.id())
688 })
689 .await?
690 }
691
692 async fn try_reuse_persistent_state(
693 &self,
694 mc_seqno: u32,
695 handle: &BlockHandle,
696 kind: PersistentStateKind,
697 ) -> Result<bool> {
698 match kind {
700 PersistentStateKind::Shard if !handle.has_persistent_shard_state() => return Ok(false),
701 PersistentStateKind::Queue if !handle.has_persistent_queue_state() => return Ok(false),
702 _ => {}
703 }
704
705 let block_id = *handle.id();
706
707 let Some(cached) = self
708 .inner
709 .descriptor_cache
710 .get(&CacheKey { block_id, kind })
711 .map(|r| r.clone())
712 else {
713 return Ok(false);
715 };
716
717 if cached.mc_seqno >= mc_seqno {
718 return Ok(true);
720 }
721
722 let this = self.inner.clone();
723
724 let span = tracing::Span::current();
725 let state = tokio::task::spawn_blocking(move || {
726 let _span = span.enter();
727
728 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
729
730 let temp_file = states_dir.file(kind.make_temp_file_name(&block_id));
731 std::fs::write(temp_file.path(), cached.file.as_slice())?;
732 temp_file.rename(kind.make_file_name(&block_id))?;
733
734 drop(cached);
735
736 this.cache_state(mc_seqno, &block_id, kind)
737 })
738 .await??;
739
740 self.notify_with_persistent_state(&state).await;
741 Ok(true)
742 }
743
744 async fn notify_with_persistent_state(&self, state: &PersistentState) {
745 let subscriptions = self.inner.subscriptions.load_full();
746 for sender in subscriptions.values() {
747 sender.send(state.clone()).await.ok();
748 }
749 }
750}
751
/// Shared state behind `PersistentStateStorage`.
struct Inner {
    /// Cells database handed to the shard state writer.
    cells_db: CellsDb,
    /// The `states` directory; contains one subdirectory per masterchain
    /// seqno and the temp files backing cached states.
    storage_dir: Dir,
    /// Used to load block handles and to flag stored persistent states.
    block_handles: Arc<BlockHandleStorage>,
    /// Used to load queue diffs and block data.
    blocks: Arc<BlockStorage>,
    /// Used to load shard state root hashes.
    shard_states: Arc<ShardStateStorage>,
    /// Cached (memory-mapped) states keyed by block id and kind.
    descriptor_cache: DashMap<CacheKey, Arc<CachedState>>,
    /// Index from masterchain seqno to the block ids cached under it;
    /// consulted when old states are evicted.
    mc_seqno_to_block_ids: Mutex<BTreeMap<u32, FastHashSet<BlockId>>>,
    /// Limits the number of chunk reads running at the same time.
    chunks_semaphore: Arc<Semaphore>,
    /// Queue of persistent state block handles.
    handles_queue: Mutex<HandlesQueue>,
    /// Notified whenever `oldest_ps_handle` is replaced.
    oldest_ps_changed: Notify,
    /// Last published value of `handles_queue.oldest_known()`.
    oldest_ps_handle: ArcSwapAny<Option<BlockHandle>>,
    /// Senders of all live subscribers keyed by subscription id; the map is
    /// replaced as a whole on every change.
    subscriptions: ArcSwap<FastHashMap<usize, mpsc::Sender<PersistentState>>>,
    /// Serializes load-modify-store updates of `subscriptions`.
    subscriptions_mutex: Mutex<()>,
}
767
768impl Inner {
769 fn prepare_persistent_states_dir(&self, mc_seqno: u32) -> Result<Dir> {
770 let states_dir = self.mc_states_dir(mc_seqno);
771 if !states_dir.path().is_dir() {
772 tracing::info!(mc_seqno, "creating persistent state directory");
773 states_dir.create_if_not_exists()?;
774 }
775 Ok(states_dir)
776 }
777
778 fn mc_states_dir(&self, mc_seqno: u32) -> Dir {
779 Dir::new_readonly(self.storage_dir.path().join(mc_seqno.to_string()))
780 }
781
782 fn clear_outdated_state_entries(&self, recent_block_id: &BlockId) -> Result<()> {
783 let mut directories_to_remove: Vec<PathBuf> = Vec::new();
784 let mut files_to_remove: Vec<PathBuf> = Vec::new();
785
786 for entry in self.storage_dir.entries()?.flatten() {
787 let path = entry.path();
788
789 if path.is_file() {
790 files_to_remove.push(path);
791 continue;
792 }
793
794 let Ok(name) = entry.file_name().into_string() else {
795 directories_to_remove.push(path);
796 continue;
797 };
798
799 let is_recent = matches!(
800 name.parse::<u32>(),
801 Ok(seqno) if seqno >= recent_block_id.seqno || seqno == 0
802 );
803 if !is_recent {
804 directories_to_remove.push(path);
805 }
806 }
807
808 for dir in directories_to_remove {
809 tracing::info!(dir = %dir.display(), "removing an old persistent state directory");
810 if let Err(e) = std::fs::remove_dir_all(&dir) {
811 tracing::error!(dir = %dir.display(), "failed to remove an old persistent state: {e:?}");
812 }
813 }
814
815 for file in files_to_remove {
816 tracing::info!(file = %file.display(), "removing file");
817 if let Err(e) = std::fs::remove_file(&file) {
818 tracing::error!(file = %file.display(), "failed to remove file: {e:?}");
819 }
820 }
821
822 Ok(())
823 }
824
825 fn cache_state(
826 &self,
827 mc_seqno: u32,
828 block_id: &BlockId,
829 kind: PersistentStateKind,
830 ) -> Result<PersistentState> {
831 use std::collections::btree_map;
832
833 use dashmap::mapref::entry::Entry;
834
835 let key = CacheKey {
836 block_id: *block_id,
837 kind,
838 };
839
840 let load_mapped = || {
841 let mut file = self
842 .mc_states_dir(mc_seqno)
843 .file(kind.make_file_name(block_id))
844 .read(true)
845 .open()?;
846
847 let mut temp_file = tempfile::tempfile_in(self.storage_dir.path())
851 .context("failed to create a temp file")?;
852
853 std::io::copy(&mut file, &mut temp_file).context("failed to copy a temp file")?;
857 temp_file.flush()?;
858 temp_file.seek(std::io::SeekFrom::Start(0))?;
859
860 MappedFile::from_existing_file(temp_file).context("failed to map a temp file")
861 };
862
863 let file =
864 load_mapped().with_context(|| format!("failed to cache {kind:?} for {block_id}"))?;
865
866 let new_state = Arc::new(CachedState { mc_seqno, file });
867
868 let prev_mc_seqno = match self.descriptor_cache.entry(key) {
869 Entry::Vacant(entry) => {
870 entry.insert(new_state.clone());
871 None
872 }
873 Entry::Occupied(mut entry) => {
874 let prev_mc_seqno = entry.get().mc_seqno;
875 if mc_seqno <= prev_mc_seqno {
876 return Ok(PersistentState {
878 block_id: *block_id,
879 kind,
880 cached: entry.get().clone(),
881 });
882 }
883
884 entry.insert(new_state.clone());
885 Some(prev_mc_seqno)
886 }
887 };
888
889 let mut index = self.mc_seqno_to_block_ids.lock();
890
891 if let Some(prev_mc_seqno) = prev_mc_seqno
893 && let btree_map::Entry::Occupied(mut entry) = index.entry(prev_mc_seqno)
894 {
895 entry.get_mut().remove(block_id);
896 if entry.get().is_empty() {
897 entry.remove();
898 }
899 }
900
901 index.entry(mc_seqno).or_default().insert(*block_id);
902
903 Ok(PersistentState {
904 block_id: *block_id,
905 kind,
906 cached: new_state,
907 })
908 }
909
910 fn clear_cache(&self, block_id: &BlockId) {
911 self.descriptor_cache.remove(&CacheKey {
912 block_id: *block_id,
913 kind: PersistentStateKind::Shard,
914 });
915 self.descriptor_cache.remove(&CacheKey {
916 block_id: *block_id,
917 kind: PersistentStateKind::Queue,
918 });
919 }
920}
921
/// Size information about a cached persistent state.
#[derive(Debug, Clone, Copy)]
pub struct PersistentStateInfo {
    /// Length of the cached state file in bytes.
    pub size: NonZeroU64,
    /// Chunk size in which the state is served.
    pub chunk_size: NonZeroU32,
}
927
/// A cached persistent state as handed out to subscribers.
#[derive(Clone)]
pub struct PersistentState {
    /// Block the state belongs to.
    block_id: BlockId,
    /// Whether this is a shard or a queue state.
    kind: PersistentStateKind,
    /// Shared cache entry holding the mapped file and its masterchain seqno.
    cached: Arc<CachedState>,
}
934
935impl PersistentState {
936 pub fn block_id(&self) -> &BlockId {
937 &self.block_id
938 }
939
940 pub fn kind(&self) -> PersistentStateKind {
941 self.kind
942 }
943
944 pub fn file(&self) -> &MappedFile {
945 &self.cached.file
946 }
947
948 pub fn mc_seqno(&self) -> u32 {
949 self.cached.mc_seqno
950 }
951}
952
/// Receiving side of a persistent state subscription.
///
/// Dropping the receiver removes its sender from the storage subscriptions.
pub struct PersistentStateReceiver {
    /// Subscription id used as the key in the subscriptions map.
    id: usize,
    /// Weak link to the storage, used to unsubscribe on drop.
    inner: Weak<Inner>,
    /// Channel on which newly cached states arrive.
    receiver: mpsc::Receiver<PersistentState>,
}
958
959impl std::ops::Deref for PersistentStateReceiver {
960 type Target = mpsc::Receiver<PersistentState>;
961
962 #[inline]
963 fn deref(&self) -> &Self::Target {
964 &self.receiver
965 }
966}
967
968impl std::ops::DerefMut for PersistentStateReceiver {
969 #[inline]
970 fn deref_mut(&mut self) -> &mut Self::Target {
971 &mut self.receiver
972 }
973}
974
975impl Drop for PersistentStateReceiver {
976 fn drop(&mut self) {
977 if let Some(inner) = self.inner.upgrade() {
978 let _guard = inner.subscriptions_mutex.lock();
979 let mut subscriptions = inner.subscriptions.load_full();
980 {
981 let subscriptions = Arc::make_mut(&mut subscriptions);
982 subscriptions.remove(&self.id);
983 }
984 inner.subscriptions.store(subscriptions);
985 }
986 }
987}
988
/// Process-wide counter that hands out ids for new subscriptions.
static RECEIVER_ID: AtomicUsize = AtomicUsize::new(0);
990
/// Entry of the descriptor cache.
struct CachedState {
    /// Masterchain seqno of the directory the state was loaded from.
    mc_seqno: u32,
    /// Memory-mapped copy of the state file.
    file: MappedFile,
}
995
/// Queue of persistent state block handles.
#[derive(Default)]
struct HandlesQueue {
    /// Handles ordered from the newest (front) to the oldest (back).
    handles: VecDeque<BlockHandle>,
}
1000
1001impl HandlesQueue {
1002 fn oldest_known(&self) -> Option<&BlockHandle> {
1003 self.handles.back()
1004 }
1005
1006 fn push(&mut self, new_handle: BlockHandle) -> bool {
1007 if let Some(newest) = self.handles.front()
1009 && newest.id().seqno >= new_handle.id().seqno
1010 {
1011 return false;
1012 }
1013
1014 let now_utime = new_handle.gen_utime();
1016 let mut has_suitable = false;
1017 self.handles.retain(|old_handle| {
1018 if !has_suitable {
1019 has_suitable |= BlockStuff::can_use_for_boot(old_handle.gen_utime(), now_utime);
1020 true
1021 } else {
1022 false
1023 }
1024 });
1025
1026 self.handles.push_front(new_handle);
1028 true
1029 }
1030}
1031
/// Size of a single state chunk in bytes (1 MiB).
const STATE_CHUNK_SIZE: u64 = 1 << 20;