1use std::collections::{BTreeMap, VecDeque};
2use std::fs::File;
3use std::io::{Seek, Write};
4use std::num::{NonZeroU32, NonZeroU64};
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, Weak};
8
9use anyhow::{Context, Result};
10use arc_swap::{ArcSwap, ArcSwapAny};
11use dashmap::DashMap;
12use parking_lot::Mutex;
13use tokio::sync::{Notify, Semaphore, mpsc};
14use tokio::time::Instant;
15use tycho_block_util::block::BlockStuff;
16use tycho_block_util::queue::QueueStateHeader;
17use tycho_block_util::state::RefMcStateHandle;
18use tycho_storage::fs::{Dir, MappedFile};
19use tycho_types::models::{BlockId, PrevBlockRef};
20use tycho_util::sync::CancellationFlag;
21use tycho_util::{FastHashMap, FastHashSet};
22
23pub use self::queue_state::reader::{QueueDiffReader, QueueStateReader};
24pub use self::queue_state::writer::QueueStateWriter;
25pub use self::shard_state::reader::{BriefBocHeader, ShardStateReader};
26pub use self::shard_state::writer::ShardStateWriter;
27use super::{
28 BlockHandle, BlockHandleStorage, BlockStorage, CellsDb, KeyBlocksDirection, ShardStateStorage,
29};
30
31mod queue_state {
32 pub mod reader;
33 pub mod writer;
34}
35mod shard_state {
36 pub mod reader;
37 pub mod writer;
38}
39
40#[cfg(test)]
41mod tests;
42
/// Name of the subdirectory, created inside the provided files directory,
/// that holds all persistent state files.
const BASE_DIR: &str = "states";
44
/// Kind of a persistent state file handled by this storage.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum PersistentStateKind {
    /// Persistent shard state.
    Shard,
    /// Persistent queue state.
    Queue,
}
50
51impl PersistentStateKind {
52 pub fn make_file_name(&self, block_id: &BlockId) -> PathBuf {
53 match self {
54 Self::Shard => ShardStateWriter::file_name(block_id),
55 Self::Queue => QueueStateWriter::file_name(block_id),
56 }
57 }
58
59 pub fn make_temp_file_name(&self, block_id: &BlockId) -> PathBuf {
60 match self {
61 Self::Shard => ShardStateWriter::temp_file_name(block_id),
62 Self::Queue => QueueStateWriter::temp_file_name(block_id),
63 }
64 }
65
66 pub fn from_extension(extension: &str) -> Option<Self> {
67 match extension {
68 ShardStateWriter::FILE_EXTENSION => Some(Self::Shard),
69 QueueStateWriter::FILE_EXTENSION => Some(Self::Queue),
70 _ => None,
71 }
72 }
73}
74
75#[derive(Debug, Eq, Hash, PartialEq)]
76struct CacheKey {
77 block_id: BlockId,
78 kind: PersistentStateKind,
79}
80
/// Handle to the persistent state storage.
///
/// Cloning is cheap: all clones share the same inner state through an `Arc`.
#[derive(Clone)]
pub struct PersistentStateStorage {
    inner: Arc<Inner>,
}
85
86impl PersistentStateStorage {
87 pub fn new(
88 cells_db: CellsDb,
89 files_dir: &Dir,
90 block_handle_storage: Arc<BlockHandleStorage>,
91 block_storage: Arc<BlockStorage>,
92 shard_state_storage: Arc<ShardStateStorage>,
93 ) -> Result<Self> {
94 const MAX_PARALLEL_CHUNK_READS: usize = 20;
95
96 let storage_dir = files_dir.create_subdir(BASE_DIR)?;
97
98 Ok(Self {
99 inner: Arc::new(Inner {
100 cells_db,
101 storage_dir,
102 block_handles: block_handle_storage,
103 blocks: block_storage,
104 shard_states: shard_state_storage,
105 descriptor_cache: Default::default(),
106 mc_seqno_to_block_ids: Default::default(),
107 chunks_semaphore: Arc::new(Semaphore::new(MAX_PARALLEL_CHUNK_READS)),
108 handles_queue: Default::default(),
109 oldest_ps_changed: Default::default(),
110 oldest_ps_handle: Default::default(),
111 subscriptions: Default::default(),
112 subscriptions_mutex: Default::default(),
113 }),
114 })
115 }
116
117 pub fn load_oldest_known_handle(&self) -> Option<BlockHandle> {
118 self.inner.oldest_ps_handle.load_full()
119 }
120
121 pub fn oldest_known_handle_changed(&self) -> tokio::sync::futures::Notified<'_> {
122 self.inner.oldest_ps_changed.notified()
123 }
124
125 #[tracing::instrument(skip_all)]
126 pub async fn preload(&self) -> Result<()> {
127 self.preload_handles_queue()?;
128 self.preload_states().await
129 }
130
131 fn preload_handles_queue(&self) -> Result<()> {
132 let this = self.inner.as_ref();
133
134 let block_handles = this.block_handles.as_ref();
135
136 let mut changed = false;
137 let mut prev_utime = 0;
138 for block_id in block_handles.key_blocks_iterator(KeyBlocksDirection::ForwardFrom(0)) {
139 let block_handle = block_handles
140 .load_handle(&block_id)
141 .context("key block handle not found")?;
142
143 let gen_utime = block_handle.gen_utime();
144 if BlockStuff::compute_is_persistent(gen_utime, prev_utime) {
145 prev_utime = gen_utime;
146
147 let mut queue = this.handles_queue.lock();
148 if queue.push(block_handle) {
149 this.oldest_ps_handle.store(queue.oldest_known().cloned());
150 changed = true;
151 }
152 }
153 }
154
155 if changed {
156 this.oldest_ps_changed.notify_waiters();
157 }
158 Ok(())
159 }
160
161 async fn preload_states(&self) -> Result<()> {
162 let process_states = |this: &Inner, dir: &PathBuf, mc_seqno: u32| -> Result<()> {
164 'outer: for entry in std::fs::read_dir(dir)?.flatten() {
165 let path = entry.path();
166 if path.is_dir() {
168 tracing::warn!(path = %path.display(), "unexpected directory");
169 continue;
170 }
171
172 'file: {
173 let Ok(block_id) = path
175 .file_stem()
177 .unwrap_or_default()
178 .to_str()
179 .unwrap_or_default()
180 .parse::<BlockId>()
181 else {
182 break 'file;
183 };
184
185 let extension = path
186 .extension()
187 .and_then(|ext| ext.to_str())
188 .unwrap_or_default();
189
190 let Some(cache_type) = PersistentStateKind::from_extension(extension) else {
191 break 'file;
192 };
193
194 this.cache_state(mc_seqno, &block_id, cache_type)?;
195 continue 'outer;
196 }
197 tracing::warn!(path = %path.display(), "unexpected file");
198 }
199 Ok(())
200 };
201
202 let this = self.inner.clone();
203 let span = tracing::Span::current();
204 tokio::task::spawn_blocking(move || {
205 let _span = span.enter();
206
207 'outer: for entry in this.storage_dir.entries()?.flatten() {
209 let path = entry.path();
210 if path.is_file() {
212 tracing::warn!(path = %path.display(), "unexpected file");
213 continue;
214 }
215
216 'dir: {
217 let Ok(name) = entry.file_name().into_string() else {
219 break 'dir;
220 };
221 let Ok(mc_seqno) = name.parse::<u32>() else {
222 break 'dir;
223 };
224
225 process_states(&this, &path, mc_seqno)?;
227 continue 'outer;
228 }
229 tracing::warn!(path = %path.display(), "unexpected directory");
230 }
231
232 Ok(())
233 })
234 .await?
235 }
236
237 pub fn state_chunk_size(&self) -> NonZeroU32 {
240 NonZeroU32::new(STATE_CHUNK_SIZE as _).unwrap()
241 }
242
243 pub fn subscribe(&self) -> (Vec<PersistentState>, PersistentStateReceiver) {
244 let id = RECEIVER_ID.fetch_add(1, Ordering::Relaxed);
245 let (sender, receiver) = mpsc::channel(1);
246
247 {
251 let _guard = self.inner.subscriptions_mutex.lock();
252 let mut subscriptions = self.inner.subscriptions.load_full();
253 {
254 let subscriptions = Arc::make_mut(&mut subscriptions);
255 let prev = subscriptions.insert(id, sender);
256 assert!(
257 prev.is_none(),
258 "persistent state subscription must be unique"
259 );
260 }
261 self.inner.subscriptions.store(subscriptions);
262 }
263
264 let receiver = PersistentStateReceiver {
265 id,
266 inner: Arc::downgrade(&self.inner),
267 receiver,
268 };
269
270 let initial_states = self
271 .inner
272 .descriptor_cache
273 .iter()
274 .map(|item| PersistentState {
275 block_id: item.key().block_id,
276 kind: item.key().kind,
277 cached: item.value().clone(),
278 })
279 .collect();
280
281 (initial_states, receiver)
282 }
283
284 pub fn state_exists(&self, block_id: &BlockId, kind: PersistentStateKind) -> bool {
285 self.inner.descriptor_cache.contains_key(&CacheKey {
286 block_id: *block_id,
287 kind,
288 })
289 }
290
291 pub fn get_state_info(
292 &self,
293 block_id: &BlockId,
294 kind: PersistentStateKind,
295 ) -> Option<PersistentStateInfo> {
296 self.inner
297 .descriptor_cache
298 .get(&CacheKey {
299 block_id: *block_id,
300 kind,
301 })
302 .and_then(|cached| {
303 let size = NonZeroU64::new(cached.file.length() as u64)?;
304 Some(PersistentStateInfo {
305 size,
306 chunk_size: self.state_chunk_size(),
307 })
308 })
309 }
310
311 pub async fn read_state_part(
312 &self,
313 block_id: &BlockId,
314 offset: u64,
315 state_kind: PersistentStateKind,
316 ) -> Option<Vec<u8>> {
317 let offset = usize::try_from(offset).ok()?;
319 let chunk_size = self.state_chunk_size().get() as usize;
320 if offset % chunk_size != 0 {
321 return None;
322 }
323
324 let permit = {
325 let semaphore = self.inner.chunks_semaphore.clone();
326 semaphore.acquire_owned().await.ok()?
327 };
328
329 let key = CacheKey {
330 block_id: *block_id,
331 kind: state_kind,
332 };
333 let cached = self.inner.descriptor_cache.get(&key)?.clone();
334 if offset > cached.file.length() {
335 return None;
336 }
337
338 tokio::task::spawn_blocking(move || {
342 let _permit = permit;
344
345 let end = std::cmp::min(offset.saturating_add(chunk_size), cached.file.length());
346 cached.file.as_slice()[offset..end].to_vec()
347 })
348 .await
349 .ok()
350 }
351
352 #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
353 pub async fn store_shard_state(
354 &self,
355 mc_seqno: u32,
356 handle: &BlockHandle,
357 tracker_handle: RefMcStateHandle,
358 ) -> Result<()> {
359 if self
360 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
361 .await?
362 {
363 return Ok(());
364 }
365
366 let cancelled = CancellationFlag::new();
367 scopeguard::defer! {
368 cancelled.cancel();
369 }
370
371 let handle = handle.clone();
372 let this = self.inner.clone();
373 let cancelled = cancelled.clone();
374 let span = tracing::Span::current();
375
376 let state = tokio::task::spawn_blocking(move || {
377 let _span = span.enter();
378
379 let guard = scopeguard::guard((), |_| {
380 tracing::warn!("cancelled");
381 });
382
383 let _tracker_handle = tracker_handle;
385
386 let root_hash = this.shard_states.load_state_root_hash(handle.id())?;
387
388 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
389
390 let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
391 match cell_writer.write(&root_hash, Some(&cancelled)) {
392 Ok(()) => {
393 this.block_handles.set_has_persistent_shard_state(&handle);
394 tracing::info!("persistent shard state saved");
395 }
396 Err(e) => {
397 tracing::error!("failed to write persistent shard state: {e:?}");
399 }
400 }
401
402 let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
403
404 scopeguard::ScopeGuard::into_inner(guard);
405 Ok::<_, anyhow::Error>(state)
406 })
407 .await??;
408
409 self.notify_with_persistent_state(&state).await;
410 Ok(())
411 }
412
413 #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
414 pub async fn store_shard_state_file(
415 &self,
416 mc_seqno: u32,
417 handle: &BlockHandle,
418 file: File,
419 ) -> Result<()> {
420 if self
421 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
422 .await?
423 {
424 return Ok(());
425 }
426
427 let cancelled = CancellationFlag::new();
428 scopeguard::defer! {
429 cancelled.cancel();
430 }
431
432 let handle = handle.clone();
433 let this = self.inner.clone();
434 let cancelled = cancelled.clone();
435 let span = tracing::Span::current();
436
437 let state = tokio::task::spawn_blocking(move || {
438 let _span = span.enter();
439
440 let guard = scopeguard::guard((), |_| {
441 tracing::warn!("cancelled");
442 });
443
444 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
445
446 let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
447 cell_writer.write_file(file, Some(&cancelled))?;
448 this.block_handles.set_has_persistent_shard_state(&handle);
449 let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
450
451 scopeguard::ScopeGuard::into_inner(guard);
452 Ok::<_, anyhow::Error>(state)
453 })
454 .await??;
455
456 self.notify_with_persistent_state(&state).await;
457 Ok(())
458 }
459
460 #[tracing::instrument(skip_all, fields(mc_seqno = mc_seqno, block_id = %block.id()))]
461 pub async fn store_queue_state(
462 &self,
463 mc_seqno: u32,
464 handle: &BlockHandle,
465 block: BlockStuff,
466 ) -> Result<()> {
467 if self
468 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
469 .await?
470 {
471 return Ok(());
472 }
473
474 let this = self.inner.clone();
475
476 let shard_ident = handle.id().shard;
477
478 let mut queue_diffs = Vec::new();
479 let mut messages = Vec::new();
480
481 let mut top_block_handle = handle.clone();
482 let mut top_block = block;
483
484 let mut tail_len = top_block.block().out_msg_queue_updates.tail_len as usize;
485
486 while tail_len > 0 {
487 let queue_diff = this.blocks.load_queue_diff(&top_block_handle).await?;
488 let top_block_info = top_block.load_info()?;
489
490 let block_extra = top_block.load_extra()?;
491 let out_messages = block_extra.load_out_msg_description()?;
492
493 messages.push(queue_diff.zip(&out_messages));
494 queue_diffs.push(queue_diff.diff().clone());
495
496 if tail_len == 1 {
497 break;
498 }
499
500 let prev_block_id = match top_block_info.load_prev_ref()? {
501 PrevBlockRef::Single(block_ref) => block_ref.as_block_id(shard_ident),
502 PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"),
503 };
504
505 let Some(prev_block_handle) = this.block_handles.load_handle(&prev_block_id) else {
506 anyhow::bail!("prev block handle not found for: {prev_block_id}");
507 };
508 let prev_block = this.blocks.load_block_data(&prev_block_handle).await?;
509
510 top_block_handle = prev_block_handle;
511 top_block = prev_block;
512 tail_len -= 1;
513 }
514
515 let state = QueueStateHeader {
516 shard_ident,
517 seqno: handle.id().seqno,
518 queue_diffs,
519 };
520
521 let cancelled = CancellationFlag::new();
522 scopeguard::defer! {
523 cancelled.cancel();
524 }
525
526 let handle = handle.clone();
527 let cancelled = cancelled.clone();
528 let span = tracing::Span::current();
529
530 let state = tokio::task::spawn_blocking(move || {
531 let _span = span.enter();
532
533 let guard = scopeguard::guard((), |_| {
534 tracing::warn!("cancelled");
535 });
536
537 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
538 match QueueStateWriter::new(&states_dir, handle.id(), state, messages)
539 .write(Some(&cancelled))
540 {
541 Ok(()) => {
542 this.block_handles.set_has_persistent_queue_state(&handle);
543 tracing::info!("persistent queue state saved");
544 }
545 Err(e) => {
546 tracing::error!("failed to write persistent queue state: {e:?}");
547 }
548 }
549
550 let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
551
552 scopeguard::ScopeGuard::into_inner(guard);
553 Ok::<_, anyhow::Error>(state)
554 })
555 .await??;
556
557 self.notify_with_persistent_state(&state).await;
558 Ok(())
559 }
560
561 #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
562 pub async fn store_queue_state_file(
563 &self,
564 mc_seqno: u32,
565 handle: &BlockHandle,
566 file: File,
567 ) -> Result<()> {
568 if self
569 .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
570 .await?
571 {
572 return Ok(());
573 }
574
575 let cancelled = CancellationFlag::new();
576 scopeguard::defer! {
577 cancelled.cancel();
578 }
579
580 let handle = handle.clone();
581 let this = self.inner.clone();
582 let cancelled = cancelled.clone();
583 let span = tracing::Span::current();
584
585 let state = tokio::task::spawn_blocking(move || {
586 let _span = span.enter();
587
588 let guard = scopeguard::guard((), |_| {
589 tracing::warn!("cancelled");
590 });
591
592 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
593
594 QueueStateWriter::write_file(&states_dir, handle.id(), file, Some(&cancelled))?;
595 this.block_handles.set_has_persistent_queue_state(&handle);
596 let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
597
598 scopeguard::ScopeGuard::into_inner(guard);
599 Ok::<_, anyhow::Error>(state)
600 })
601 .await??;
602
603 self.notify_with_persistent_state(&state).await;
604 Ok(())
605 }
606
607 pub async fn rotate_persistent_states(&self, top_handle: &BlockHandle) -> Result<()> {
608 anyhow::ensure!(
609 top_handle.is_masterchain(),
610 "top persistent state handle must be in the masterchain"
611 );
612
613 {
614 tracing::info!(
615 mc_block_id = %top_handle.id(),
616 "adding new persistent state to the queue"
617 );
618
619 let mut queue = self.inner.handles_queue.lock();
620 if queue.push(top_handle.clone()) {
621 self.inner
622 .oldest_ps_handle
623 .store(queue.oldest_known().cloned());
624 self.inner.oldest_ps_changed.notify_waiters();
625 }
626 }
627
628 tracing::info!("started clearing old persistent state directories");
629 let start = Instant::now();
630 scopeguard::defer! {
631 tracing::info!(
632 elapsed = %humantime::format_duration(start.elapsed()),
633 "clearing old persistent state directories completed"
634 );
635 }
636
637 let this = self.inner.clone();
638 let mut top_handle = top_handle.clone();
639 if top_handle.id().seqno == 0 {
640 return Ok(());
642 }
643
644 let span = tracing::Span::current();
645 tokio::task::spawn_blocking(move || {
646 let _span = span.enter();
647
648 let block_handles = &this.block_handles;
649
650 let now_utime = top_handle.gen_utime();
651
652 let mut has_suitable = false;
654 loop {
655 match block_handles.find_prev_persistent_key_block(top_handle.id().seqno) {
656 Some(handle) if !has_suitable => {
658 has_suitable |= BlockStuff::can_use_for_boot(handle.gen_utime(), now_utime);
659 top_handle = handle;
660 }
661 Some(handle) => {
663 top_handle = handle;
664 break;
665 }
666 None => return Ok(()),
668 }
669 }
670
671 let mut index = this.mc_seqno_to_block_ids.lock();
673 index.retain(|&mc_seqno, block_ids| {
674 if mc_seqno >= top_handle.id().seqno || mc_seqno == 0 {
675 return true;
676 }
677
678 for block_id in block_ids.drain() {
679 this.clear_cache(&block_id);
681 }
682 false
683 });
684
685 this.clear_outdated_state_entries(top_handle.id())
687 })
688 .await?
689 }
690
691 async fn try_reuse_persistent_state(
692 &self,
693 mc_seqno: u32,
694 handle: &BlockHandle,
695 kind: PersistentStateKind,
696 ) -> Result<bool> {
697 match kind {
699 PersistentStateKind::Shard if !handle.has_persistent_shard_state() => return Ok(false),
700 PersistentStateKind::Queue if !handle.has_persistent_queue_state() => return Ok(false),
701 _ => {}
702 }
703
704 let block_id = *handle.id();
705
706 let Some(cached) = self
707 .inner
708 .descriptor_cache
709 .get(&CacheKey { block_id, kind })
710 .map(|r| r.clone())
711 else {
712 return Ok(false);
714 };
715
716 if cached.mc_seqno >= mc_seqno {
717 return Ok(true);
719 }
720
721 let this = self.inner.clone();
722
723 let span = tracing::Span::current();
724 let state = tokio::task::spawn_blocking(move || {
725 let _span = span.enter();
726
727 let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
728
729 let temp_file = states_dir.file(kind.make_temp_file_name(&block_id));
730 std::fs::write(temp_file.path(), cached.file.as_slice())?;
731 temp_file.rename(kind.make_file_name(&block_id))?;
732
733 drop(cached);
734
735 this.cache_state(mc_seqno, &block_id, kind)
736 })
737 .await??;
738
739 self.notify_with_persistent_state(&state).await;
740 Ok(true)
741 }
742
743 async fn notify_with_persistent_state(&self, state: &PersistentState) {
744 let subscriptions = self.inner.subscriptions.load_full();
745 for sender in subscriptions.values() {
746 sender.send(state.clone()).await.ok();
747 }
748 }
749}
750
/// Shared state behind [`PersistentStateStorage`].
struct Inner {
    // Passed to `ShardStateWriter` when writing shard states.
    cells_db: CellsDb,
    // Base directory of all persistent states; contains one subdirectory per
    // masterchain seqno.
    storage_dir: Dir,
    block_handles: Arc<BlockHandleStorage>,
    blocks: Arc<BlockStorage>,
    shard_states: Arc<ShardStateStorage>,
    // Cached (memory-mapped) state files keyed by block id and state kind.
    descriptor_cache: DashMap<CacheKey, Arc<CachedState>>,
    // Index from masterchain seqno to the block ids cached under it.
    mc_seqno_to_block_ids: Mutex<BTreeMap<u32, FastHashSet<BlockId>>>,
    // Limits the number of concurrent chunk reads in `read_state_part`.
    chunks_semaphore: Arc<Semaphore>,
    // Queue of persistent state handles, newest first.
    handles_queue: Mutex<HandlesQueue>,
    // Notified whenever `oldest_ps_handle` is updated.
    oldest_ps_changed: Notify,
    // Oldest known persistent state handle, as reported by `handles_queue`.
    oldest_ps_handle: ArcSwapAny<Option<BlockHandle>>,
    // Current subscribers keyed by receiver id; replaced as a whole on change.
    subscriptions: ArcSwap<FastHashMap<usize, mpsc::Sender<PersistentState>>>,
    // Serializes the load-modify-store updates of `subscriptions`.
    subscriptions_mutex: Mutex<()>,
}
766
767impl Inner {
768 fn prepare_persistent_states_dir(&self, mc_seqno: u32) -> Result<Dir> {
769 let states_dir = self.mc_states_dir(mc_seqno);
770 if !states_dir.path().is_dir() {
771 tracing::info!(mc_seqno, "creating persistent state directory");
772 states_dir.create_if_not_exists()?;
773 }
774 Ok(states_dir)
775 }
776
777 fn mc_states_dir(&self, mc_seqno: u32) -> Dir {
778 Dir::new_readonly(self.storage_dir.path().join(mc_seqno.to_string()))
779 }
780
781 fn clear_outdated_state_entries(&self, recent_block_id: &BlockId) -> Result<()> {
782 let mut directories_to_remove: Vec<PathBuf> = Vec::new();
783 let mut files_to_remove: Vec<PathBuf> = Vec::new();
784
785 for entry in self.storage_dir.entries()?.flatten() {
786 let path = entry.path();
787
788 if path.is_file() {
789 files_to_remove.push(path);
790 continue;
791 }
792
793 let Ok(name) = entry.file_name().into_string() else {
794 directories_to_remove.push(path);
795 continue;
796 };
797
798 let is_recent = matches!(
799 name.parse::<u32>(),
800 Ok(seqno) if seqno >= recent_block_id.seqno || seqno == 0
801 );
802 if !is_recent {
803 directories_to_remove.push(path);
804 }
805 }
806
807 for dir in directories_to_remove {
808 tracing::info!(dir = %dir.display(), "removing an old persistent state directory");
809 if let Err(e) = std::fs::remove_dir_all(&dir) {
810 tracing::error!(dir = %dir.display(), "failed to remove an old persistent state: {e:?}");
811 }
812 }
813
814 for file in files_to_remove {
815 tracing::info!(file = %file.display(), "removing file");
816 if let Err(e) = std::fs::remove_file(&file) {
817 tracing::error!(file = %file.display(), "failed to remove file: {e:?}");
818 }
819 }
820
821 Ok(())
822 }
823
824 fn cache_state(
825 &self,
826 mc_seqno: u32,
827 block_id: &BlockId,
828 kind: PersistentStateKind,
829 ) -> Result<PersistentState> {
830 use std::collections::btree_map;
831
832 use dashmap::mapref::entry::Entry;
833
834 let key = CacheKey {
835 block_id: *block_id,
836 kind,
837 };
838
839 let load_mapped = || {
840 let mut file = self
841 .mc_states_dir(mc_seqno)
842 .file(kind.make_file_name(block_id))
843 .read(true)
844 .open()?;
845
846 let mut temp_file = tempfile::tempfile_in(self.storage_dir.path())
850 .context("failed to create a temp file")?;
851
852 std::io::copy(&mut file, &mut temp_file).context("failed to copy a temp file")?;
856 temp_file.flush()?;
857 temp_file.seek(std::io::SeekFrom::Start(0))?;
858
859 MappedFile::from_existing_file(temp_file).context("failed to map a temp file")
860 };
861
862 let file =
863 load_mapped().with_context(|| format!("failed to cache {kind:?} for {block_id}"))?;
864
865 let new_state = Arc::new(CachedState { mc_seqno, file });
866
867 let prev_mc_seqno = match self.descriptor_cache.entry(key) {
868 Entry::Vacant(entry) => {
869 entry.insert(new_state.clone());
870 None
871 }
872 Entry::Occupied(mut entry) => {
873 let prev_mc_seqno = entry.get().mc_seqno;
874 if mc_seqno <= prev_mc_seqno {
875 return Ok(PersistentState {
877 block_id: *block_id,
878 kind,
879 cached: entry.get().clone(),
880 });
881 }
882
883 entry.insert(new_state.clone());
884 Some(prev_mc_seqno)
885 }
886 };
887
888 let mut index = self.mc_seqno_to_block_ids.lock();
889
890 if let Some(prev_mc_seqno) = prev_mc_seqno
892 && let btree_map::Entry::Occupied(mut entry) = index.entry(prev_mc_seqno)
893 {
894 entry.get_mut().remove(block_id);
895 if entry.get().is_empty() {
896 entry.remove();
897 }
898 }
899
900 index.entry(mc_seqno).or_default().insert(*block_id);
901
902 Ok(PersistentState {
903 block_id: *block_id,
904 kind,
905 cached: new_state,
906 })
907 }
908
909 fn clear_cache(&self, block_id: &BlockId) {
910 self.descriptor_cache.remove(&CacheKey {
911 block_id: *block_id,
912 kind: PersistentStateKind::Shard,
913 });
914 self.descriptor_cache.remove(&CacheKey {
915 block_id: *block_id,
916 kind: PersistentStateKind::Queue,
917 });
918 }
919}
920
/// Size information about a cached persistent state.
#[derive(Clone, Copy, Debug)]
pub struct PersistentStateInfo {
    /// Length of the state file.
    pub size: NonZeroU64,
    /// Chunk size reported by the storage for reading the state in parts.
    pub chunk_size: NonZeroU32,
}
926
/// A cached persistent state: its block id, its kind and a shared reference
/// to the cached file.
#[derive(Clone)]
pub struct PersistentState {
    block_id: BlockId,
    kind: PersistentStateKind,
    cached: Arc<CachedState>,
}
933
934impl PersistentState {
935 pub fn block_id(&self) -> &BlockId {
936 &self.block_id
937 }
938
939 pub fn kind(&self) -> PersistentStateKind {
940 self.kind
941 }
942
943 pub fn file(&self) -> &MappedFile {
944 &self.cached.file
945 }
946
947 pub fn mc_seqno(&self) -> u32 {
948 self.cached.mc_seqno
949 }
950}
951
/// Receiving side of a persistent state subscription.
///
/// Dereferences to the underlying channel receiver and removes its own
/// subscription from the storage when dropped.
pub struct PersistentStateReceiver {
    // Key of this subscription in `Inner::subscriptions`.
    id: usize,
    // Weak reference, so a receiver does not keep the storage alive.
    inner: Weak<Inner>,
    receiver: mpsc::Receiver<PersistentState>,
}
957
958impl std::ops::Deref for PersistentStateReceiver {
959 type Target = mpsc::Receiver<PersistentState>;
960
961 #[inline]
962 fn deref(&self) -> &Self::Target {
963 &self.receiver
964 }
965}
966
967impl std::ops::DerefMut for PersistentStateReceiver {
968 #[inline]
969 fn deref_mut(&mut self) -> &mut Self::Target {
970 &mut self.receiver
971 }
972}
973
974impl Drop for PersistentStateReceiver {
975 fn drop(&mut self) {
976 if let Some(inner) = self.inner.upgrade() {
977 let _guard = inner.subscriptions_mutex.lock();
978 let mut subscriptions = inner.subscriptions.load_full();
979 {
980 let subscriptions = Arc::make_mut(&mut subscriptions);
981 subscriptions.remove(&self.id);
982 }
983 inner.subscriptions.store(subscriptions);
984 }
985 }
986}
987
/// Process-wide counter used to generate ids for new subscriptions.
static RECEIVER_ID: AtomicUsize = AtomicUsize::new(0);
989
/// A cached state file together with the masterchain seqno of the directory
/// it was loaded from.
struct CachedState {
    mc_seqno: u32,
    // Memory-mapped copy of the state file (see `Inner::cache_state`).
    file: MappedFile,
}
994
/// Queue of persistent state block handles, ordered from the newest (front)
/// to the oldest (back).
#[derive(Default)]
struct HandlesQueue {
    handles: VecDeque<BlockHandle>,
}
999
1000impl HandlesQueue {
1001 fn oldest_known(&self) -> Option<&BlockHandle> {
1002 self.handles.back()
1003 }
1004
1005 fn push(&mut self, new_handle: BlockHandle) -> bool {
1006 if let Some(newest) = self.handles.front()
1008 && newest.id().seqno >= new_handle.id().seqno
1009 {
1010 return false;
1011 }
1012
1013 let now_utime = new_handle.gen_utime();
1015 let mut has_suitable = false;
1016 self.handles.retain(|old_handle| {
1017 if !has_suitable {
1018 has_suitable |= BlockStuff::can_use_for_boot(old_handle.gen_utime(), now_utime);
1019 true
1020 } else {
1021 false
1022 }
1023 });
1024
1025 self.handles.push_front(new_handle);
1027 true
1028 }
1029}
1030
/// Size of a single state chunk served by `read_state_part` (1 MiB).
const STATE_CHUNK_SIZE: u64 = 1024 * 1024;