tycho_core/storage/persistent_state/
mod.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::fs::File;
3use std::io::{Seek, Write};
4use std::num::{NonZeroU32, NonZeroU64};
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use anyhow::{Context, Result};
9use arc_swap::ArcSwapAny;
10use dashmap::DashMap;
11use parking_lot::Mutex;
12use tokio::sync::{Notify, Semaphore};
13use tokio::time::Instant;
14use tycho_block_util::block::BlockStuff;
15use tycho_block_util::queue::QueueStateHeader;
16use tycho_block_util::state::RefMcStateHandle;
17use tycho_storage::fs::{Dir, MappedFile};
18use tycho_types::models::{BlockId, PrevBlockRef};
19use tycho_util::FastHashSet;
20use tycho_util::sync::CancellationFlag;
21
22pub use self::queue_state::reader::{QueueDiffReader, QueueStateReader};
23pub use self::queue_state::writer::QueueStateWriter;
24pub use self::shard_state::reader::{BriefBocHeader, ShardStateReader};
25pub use self::shard_state::writer::ShardStateWriter;
26use super::{
27    BlockHandle, BlockHandleStorage, BlockStorage, CellsDb, KeyBlocksDirection, ShardStateStorage,
28};
29
30mod queue_state {
31    pub mod reader;
32    pub mod writer;
33}
34mod shard_state {
35    pub mod reader;
36    pub mod writer;
37}
38
39#[cfg(test)]
40mod tests;
41
/// Name of the subdirectory (created inside the files directory) that holds
/// all persistent state files managed by this module.
const BASE_DIR: &str = "states";
43
/// Kind of a persistent state file handled by this storage.
///
/// The kind selects which writer defines the file name and extension.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PersistentStateKind {
    /// Persistent shard state (file naming comes from `ShardStateWriter`).
    Shard,
    /// Persistent queue state (file naming comes from `QueueStateWriter`).
    Queue,
}
49
50impl PersistentStateKind {
51    fn make_file_name(&self, block_id: &BlockId) -> PathBuf {
52        match self {
53            Self::Shard => ShardStateWriter::file_name(block_id),
54            Self::Queue => QueueStateWriter::file_name(block_id),
55        }
56    }
57
58    fn make_temp_file_name(&self, block_id: &BlockId) -> PathBuf {
59        match self {
60            Self::Shard => ShardStateWriter::temp_file_name(block_id),
61            Self::Queue => QueueStateWriter::temp_file_name(block_id),
62        }
63    }
64
65    fn from_extension(extension: &str) -> Option<Self> {
66        match extension {
67            ShardStateWriter::FILE_EXTENSION => Some(Self::Shard),
68            QueueStateWriter::FILE_EXTENSION => Some(Self::Queue),
69            _ => None,
70        }
71    }
72}
73
/// Key of the in-memory descriptor cache: one entry per block and state kind.
#[derive(Debug, Eq, Hash, PartialEq)]
struct CacheKey {
    /// Block the cached state belongs to.
    block_id: BlockId,
    /// Which kind of persistent state is cached for that block.
    kind: PersistentStateKind,
}
79
/// Storage of persistent (shard and queue) state files with an in-memory cache
/// of memory-mapped copies.
///
/// Clones share the same underlying state through an `Arc`.
#[derive(Clone)]
pub struct PersistentStateStorage {
    /// Shared state; also moved into blocking tasks spawned by the methods.
    inner: Arc<Inner>,
}
84
85impl PersistentStateStorage {
86    pub fn new(
87        cells_db: CellsDb,
88        files_dir: &Dir,
89        block_handle_storage: Arc<BlockHandleStorage>,
90        block_storage: Arc<BlockStorage>,
91        shard_state_storage: Arc<ShardStateStorage>,
92    ) -> Result<Self> {
93        const MAX_PARALLEL_CHUNK_READS: usize = 20;
94
95        let storage_dir = files_dir.create_subdir(BASE_DIR)?;
96
97        Ok(Self {
98            inner: Arc::new(Inner {
99                cells_db,
100                storage_dir,
101                block_handles: block_handle_storage,
102                blocks: block_storage,
103                shard_states: shard_state_storage,
104                descriptor_cache: Default::default(),
105                mc_seqno_to_block_ids: Default::default(),
106                chunks_semaphore: Arc::new(Semaphore::new(MAX_PARALLEL_CHUNK_READS)),
107                handles_queue: Default::default(),
108                oldest_ps_changed: Default::default(),
109                oldest_ps_handle: Default::default(),
110            }),
111        })
112    }
113
114    pub fn load_oldest_known_handle(&self) -> Option<BlockHandle> {
115        self.inner.oldest_ps_handle.load_full()
116    }
117
118    pub fn oldest_known_handle_changed(&self) -> tokio::sync::futures::Notified<'_> {
119        self.inner.oldest_ps_changed.notified()
120    }
121
122    #[tracing::instrument(skip_all)]
123    pub async fn preload(&self) -> Result<()> {
124        self.preload_handles_queue()?;
125        self.preload_states().await
126    }
127
128    fn preload_handles_queue(&self) -> Result<()> {
129        let this = self.inner.as_ref();
130
131        let block_handles = this.block_handles.as_ref();
132
133        let mut changed = false;
134        let mut prev_utime = 0;
135        for block_id in block_handles.key_blocks_iterator(KeyBlocksDirection::ForwardFrom(0)) {
136            let block_handle = block_handles
137                .load_handle(&block_id)
138                .context("key block handle not found")?;
139
140            let gen_utime = block_handle.gen_utime();
141            if BlockStuff::compute_is_persistent(gen_utime, prev_utime) {
142                prev_utime = gen_utime;
143
144                let mut queue = this.handles_queue.lock();
145                if queue.push(block_handle) {
146                    this.oldest_ps_handle.store(queue.oldest_known().cloned());
147                    changed = true;
148                }
149            }
150        }
151
152        if changed {
153            this.oldest_ps_changed.notify_waiters();
154        }
155        Ok(())
156    }
157
    /// Scans the storage directory and caches every persistent state file found.
    ///
    /// Expected layout: `<storage_dir>/<mc_seqno>/<block_id>.<extension>`, where
    /// the extension selects the [`PersistentStateKind`]. Entries that do not
    /// match this layout are only reported with a warning. The scan runs inside
    /// `spawn_blocking`.
    ///
    /// # Errors
    /// Fails if a directory cannot be listed, if caching a matching file fails,
    /// or if the blocking task cannot be joined.
    async fn preload_states(&self) -> Result<()> {
        // For each mc_seqno directory
        let process_states = |this: &Inner, dir: &PathBuf, mc_seqno: u32| -> Result<()> {
            // NOTE: `flatten` silently skips directory entries that failed to be read.
            'outer: for entry in std::fs::read_dir(dir)?.flatten() {
                let path = entry.path();
                // Skip subdirectories
                if path.is_dir() {
                    tracing::warn!(path = %path.display(), "unexpected directory");
                    continue;
                }

                // Breaking out of this block means "not a state file" and falls
                // through to the warning below; success continues the outer loop.
                'file: {
                    // Try to parse the file name as a block_id
                    let Ok(block_id) = path
                        // TODO should use file_prefix
                        .file_stem()
                        .unwrap_or_default()
                        .to_str()
                        .unwrap_or_default()
                        .parse::<BlockId>()
                    else {
                        break 'file;
                    };

                    let extension = path
                        .extension()
                        .and_then(|ext| ext.to_str())
                        .unwrap_or_default();

                    let Some(cache_type) = PersistentStateKind::from_extension(extension) else {
                        break 'file;
                    };

                    this.cache_state(mc_seqno, &block_id, cache_type)?;
                    continue 'outer;
                }
                tracing::warn!(path = %path.display(), "unexpected file");
            }
            Ok(())
        };

        let this = self.inner.clone();
        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            // For each entry in the storage directory
            'outer: for entry in this.storage_dir.entries()?.flatten() {
                let path = entry.path();
                // Skip files
                if path.is_file() {
                    tracing::warn!(path = %path.display(), "unexpected file");
                    continue;
                }

                // Breaking out of this block means the directory name is not a valid
                // mc_seqno and falls through to the warning below.
                'dir: {
                    // Try to parse the directory name as an mc_seqno
                    let Ok(name) = entry.file_name().into_string() else {
                        break 'dir;
                    };
                    let Ok(mc_seqno) = name.parse::<u32>() else {
                        break 'dir;
                    };

                    // Try to load files in the directory as persistent states
                    process_states(&this, &path, mc_seqno)?;
                    continue 'outer;
                }
                tracing::warn!(path = %path.display(), "unexpected directory");
            }

            Ok(())
        })
        .await?
    }
233
234    // NOTE: This is intentionally a method, not a constant because
235    // it might be useful to allow configure it during the first run.
236    pub fn state_chunk_size(&self) -> NonZeroU32 {
237        NonZeroU32::new(STATE_CHUNK_SIZE as _).unwrap()
238    }
239
240    pub fn state_exists(&self, block_id: &BlockId, kind: PersistentStateKind) -> bool {
241        self.inner.descriptor_cache.contains_key(&CacheKey {
242            block_id: *block_id,
243            kind,
244        })
245    }
246
247    pub fn get_state_info(
248        &self,
249        block_id: &BlockId,
250        kind: PersistentStateKind,
251    ) -> Option<PersistentStateInfo> {
252        self.inner
253            .descriptor_cache
254            .get(&CacheKey {
255                block_id: *block_id,
256                kind,
257            })
258            .and_then(|cached| {
259                let size = NonZeroU64::new(cached.file.length() as u64)?;
260                Some(PersistentStateInfo {
261                    size,
262                    chunk_size: self.state_chunk_size(),
263                })
264            })
265    }
266
267    pub async fn read_state_part(
268        &self,
269        block_id: &BlockId,
270        offset: u64,
271        state_kind: PersistentStateKind,
272    ) -> Option<Vec<u8>> {
273        // NOTE: Should be noop on x64
274        let offset = usize::try_from(offset).ok()?;
275        let chunk_size = self.state_chunk_size().get() as usize;
276        if offset % chunk_size != 0 {
277            return None;
278        }
279
280        let permit = {
281            let semaphore = self.inner.chunks_semaphore.clone();
282            semaphore.acquire_owned().await.ok()?
283        };
284
285        let key = CacheKey {
286            block_id: *block_id,
287            kind: state_kind,
288        };
289        let cached = self.inner.descriptor_cache.get(&key)?.clone();
290        if offset > cached.file.length() {
291            return None;
292        }
293
294        // NOTE: Cached file is a mapped file, therefore it can take a while to read from it.
295        // NOTE: `spawn_blocking` is called here because it is mostly IO-bound operation.
296        // TODO: Add semaphore to limit the number of concurrent operations.
297        tokio::task::spawn_blocking(move || {
298            // Ensure that permit is dropped only after cached state is used.
299            let _permit = permit;
300
301            let end = std::cmp::min(offset.saturating_add(chunk_size), cached.file.length());
302            cached.file.as_slice()[offset..end].to_vec()
303        })
304        .await
305        .ok()
306    }
307
    /// Writes the persistent shard state for `handle` into the `mc_seqno`
    /// directory and caches the resulting file.
    ///
    /// Does nothing else if an already stored state could be reused.
    /// The actual work runs inside `spawn_blocking`.
    ///
    /// # Errors
    /// Fails if the reuse attempt fails, the state root hash cannot be loaded,
    /// the directory cannot be prepared, the file cannot be cached, or the
    /// blocking task cannot be joined. A failure of the writer itself is only
    /// logged (see the note inside).
    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
    pub async fn store_shard_state(
        &self,
        mc_seqno: u32,
        handle: &BlockHandle,
        tracker_handle: RefMcStateHandle,
    ) -> Result<()> {
        if self
            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
            .await?
        {
            return Ok(());
        }

        // The flag is cancelled when this scope is left (including when the
        // future is dropped); a clone of it is handed to the writer below.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        let handle = handle.clone();
        let this = self.inner.clone();
        let cancelled = cancelled.clone();
        let span = tracing::Span::current();

        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            // Logs a warning unless disarmed at the end of the closure,
            // i.e. on every early exit.
            let guard = scopeguard::guard((), |_| {
                tracing::warn!("cancelled");
            });

            // NOTE: Ensure that the tracker handle will outlive the state writer.
            let _tracker_handle = tracker_handle;

            let root_hash = this.shard_states.load_state_root_hash(handle.id())?;

            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;

            let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
            match cell_writer.write(&root_hash, Some(&cancelled)) {
                Ok(()) => {
                    this.block_handles.set_has_persistent_shard_state(&handle);
                    tracing::info!("persistent shard state saved");
                }
                Err(e) => {
                    // NOTE: We are ignoring an error here. It might be intentional
                    tracing::error!("failed to write persistent shard state: {e:?}");
                }
            }

            // NOTE: This is reached even after a failed write; it returns an error
            // if the state file cannot be opened and mapped.
            this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;

            // Disarm the guard: the closure completed.
            scopeguard::ScopeGuard::into_inner(guard);
            Ok(())
        })
        .await?
    }
365
366    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
367    pub async fn store_shard_state_file(
368        &self,
369        mc_seqno: u32,
370        handle: &BlockHandle,
371        file: File,
372    ) -> Result<()> {
373        if self
374            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
375            .await?
376        {
377            return Ok(());
378        }
379
380        let cancelled = CancellationFlag::new();
381        scopeguard::defer! {
382            cancelled.cancel();
383        }
384
385        let handle = handle.clone();
386        let this = self.inner.clone();
387        let cancelled = cancelled.clone();
388        let span = tracing::Span::current();
389
390        tokio::task::spawn_blocking(move || {
391            let _span = span.enter();
392
393            let guard = scopeguard::guard((), |_| {
394                tracing::warn!("cancelled");
395            });
396
397            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
398
399            let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
400            cell_writer.write_file(file, Some(&cancelled))?;
401            this.block_handles.set_has_persistent_shard_state(&handle);
402            this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
403
404            scopeguard::ScopeGuard::into_inner(guard);
405            Ok(())
406        })
407        .await?
408    }
409
    /// Builds and writes the persistent queue state for `handle` into the
    /// `mc_seqno` directory and caches the resulting file.
    ///
    /// Starting from `block`, walks back through previous blocks for
    /// `out_msg_queue_updates.tail_len` steps, collecting each block's queue
    /// diff together with its out messages.
    ///
    /// Does nothing else if an already stored state could be reused.
    ///
    /// # Errors
    /// Fails if the reuse attempt fails, any block/diff cannot be loaded,
    /// a block after merge is encountered, a previous block handle is missing,
    /// the directory cannot be prepared, the file cannot be cached, or the
    /// blocking task cannot be joined. A failure of the writer itself is only logged.
    #[tracing::instrument(skip_all, fields(mc_seqno = mc_seqno, block_id = %block.id()))]
    pub async fn store_queue_state(
        &self,
        mc_seqno: u32,
        handle: &BlockHandle,
        block: BlockStuff,
    ) -> Result<()> {
        if self
            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
            .await?
        {
            return Ok(());
        }

        let this = self.inner.clone();

        let shard_ident = handle.id().shard;

        let mut queue_diffs = Vec::new();
        let mut messages = Vec::new();

        // Cursor over the chain of blocks, starting at the requested one.
        let mut top_block_handle = handle.clone();
        let mut top_block = block;

        let mut tail_len = top_block.block().out_msg_queue_updates.tail_len as usize;

        while tail_len > 0 {
            let queue_diff = this.blocks.load_queue_diff(&top_block_handle).await?;
            let top_block_info = top_block.load_info()?;

            let block_extra = top_block.load_extra()?;
            let out_messages = block_extra.load_out_msg_description()?;

            messages.push(queue_diff.zip(&out_messages));
            queue_diffs.push(queue_diff.diff().clone());

            // The last required diff was collected, no need to load the previous block.
            if tail_len == 1 {
                break;
            }

            let prev_block_id = match top_block_info.load_prev_ref()? {
                PrevBlockRef::Single(block_ref) => block_ref.as_block_id(shard_ident),
                PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"),
            };

            let Some(prev_block_handle) = this.block_handles.load_handle(&prev_block_id) else {
                anyhow::bail!("prev block handle not found for: {prev_block_id}");
            };
            let prev_block = this.blocks.load_block_data(&prev_block_handle).await?;

            top_block_handle = prev_block_handle;
            top_block = prev_block;
            tail_len -= 1;
        }

        let state = QueueStateHeader {
            shard_ident,
            seqno: handle.id().seqno,
            queue_diffs,
        };

        // The flag is cancelled when this scope is left (including when the
        // future is dropped); a clone of it is handed to the writer below.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        let handle = handle.clone();
        let cancelled = cancelled.clone();
        let span = tracing::Span::current();

        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            // Logs a warning unless disarmed at the end of the closure,
            // i.e. on every early exit.
            let guard = scopeguard::guard((), |_| {
                tracing::warn!("cancelled");
            });

            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
            match QueueStateWriter::new(&states_dir, handle.id(), state, messages)
                .write(Some(&cancelled))
            {
                Ok(()) => {
                    this.block_handles.set_has_persistent_queue_state(&handle);
                    tracing::info!("persistent queue state saved");
                }
                Err(e) => {
                    // NOTE: The write error is only logged, execution continues.
                    tracing::error!("failed to write persistent queue state: {e:?}");
                }
            }

            // NOTE: This is reached even after a failed write; it returns an error
            // if the state file cannot be opened and mapped.
            this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;

            // Disarm the guard: the closure completed.
            scopeguard::ScopeGuard::into_inner(guard);
            Ok(())
        })
        .await?
    }
507
508    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
509    pub async fn store_queue_state_file(
510        &self,
511        mc_seqno: u32,
512        handle: &BlockHandle,
513        file: File,
514    ) -> Result<()> {
515        if self
516            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
517            .await?
518        {
519            return Ok(());
520        }
521
522        let cancelled = CancellationFlag::new();
523        scopeguard::defer! {
524            cancelled.cancel();
525        }
526
527        let handle = handle.clone();
528        let this = self.inner.clone();
529        let cancelled = cancelled.clone();
530        let span = tracing::Span::current();
531
532        tokio::task::spawn_blocking(move || {
533            let _span = span.enter();
534
535            let guard = scopeguard::guard((), |_| {
536                tracing::warn!("cancelled");
537            });
538
539            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
540
541            QueueStateWriter::write_file(&states_dir, handle.id(), file, Some(&cancelled))?;
542            this.block_handles.set_has_persistent_queue_state(&handle);
543            this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
544
545            scopeguard::ScopeGuard::into_inner(guard);
546            Ok(())
547        })
548        .await?
549    }
550
    /// Registers `top_handle` as the newest persistent state and removes
    /// cached states and on-disk directories that became too old.
    ///
    /// `top_handle` must belong to the masterchain. The cleanup boundary is
    /// found by walking back over previous persistent key blocks (see the
    /// comments inside); the zerostate directory (`0`) is always kept.
    ///
    /// # Errors
    /// Fails if `top_handle` is not a masterchain block, if the storage
    /// directory cannot be listed, or if the blocking task cannot be joined.
    pub async fn rotate_persistent_states(&self, top_handle: &BlockHandle) -> Result<()> {
        anyhow::ensure!(
            top_handle.is_masterchain(),
            "top persistent state handle must be in the masterchain"
        );

        {
            tracing::info!(
                mc_block_id = %top_handle.id(),
                "adding new persistent state to the queue"
            );

            // Publish the new oldest known handle only if the queue accepted the push.
            let mut queue = self.inner.handles_queue.lock();
            if queue.push(top_handle.clone()) {
                self.inner
                    .oldest_ps_handle
                    .store(queue.oldest_known().cloned());
                self.inner.oldest_ps_changed.notify_waiters();
            }
        }

        tracing::info!("started clearing old persistent state directories");
        let start = Instant::now();
        // Logged when this scope is left, including the early return below.
        scopeguard::defer! {
            tracing::info!(
                elapsed = %humantime::format_duration(start.elapsed()),
                "clearing old persistent state directories completed"
            );
        }

        let this = self.inner.clone();
        let mut top_handle = top_handle.clone();
        if top_handle.id().seqno == 0 {
            // Nothing to clear for the zerostate
            return Ok(());
        }

        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            let block_handles = &this.block_handles;

            let now_utime = top_handle.gen_utime();

            // Find the persistent key block that precedes the newest state
            // which is still usable for boot. Everything older than it is removed.
            let mut has_suitable = false;
            loop {
                match block_handles.find_prev_persistent_key_block(top_handle.id().seqno) {
                    // Find the newest usable persistent state...
                    Some(handle) if !has_suitable => {
                        has_suitable |= BlockStuff::can_use_for_boot(handle.gen_utime(), now_utime);
                        top_handle = handle;
                    }
                    // ...and return the previous one.
                    Some(handle) => {
                        top_handle = handle;
                        break;
                    }
                    // Or do nothing if not found.
                    None => return Ok(()),
                }
            }

            // Remove cached states
            // NOTE: The index lock is held until the end of the closure,
            // i.e. also while the files are being removed below.
            let mut index = this.mc_seqno_to_block_ids.lock();
            index.retain(|&mc_seqno, block_ids| {
                // Keep everything at or after the boundary and the zerostate entry.
                if mc_seqno >= top_handle.id().seqno || mc_seqno == 0 {
                    return true;
                }

                for block_id in block_ids.drain() {
                    // TODO: Clear flag in block handle
                    this.clear_cache(&block_id);
                }
                false
            });

            // Remove files
            this.clear_outdated_state_entries(top_handle.id())
        })
        .await?
    }
634
635    async fn try_reuse_persistent_state(
636        &self,
637        mc_seqno: u32,
638        handle: &BlockHandle,
639        kind: PersistentStateKind,
640    ) -> Result<bool> {
641        // Check if there is anything to reuse (return false if nothing)
642        match kind {
643            PersistentStateKind::Shard if !handle.has_persistent_shard_state() => return Ok(false),
644            PersistentStateKind::Queue if !handle.has_persistent_queue_state() => return Ok(false),
645            _ => {}
646        }
647
648        let block_id = *handle.id();
649
650        let Some(cached) = self
651            .inner
652            .descriptor_cache
653            .get(&CacheKey { block_id, kind })
654            .map(|r| r.clone())
655        else {
656            // Nothing to reuse
657            return Ok(false);
658        };
659
660        if cached.mc_seqno >= mc_seqno {
661            // We already have the recent enough state
662            return Ok(true);
663        }
664
665        let this = self.inner.clone();
666
667        let span = tracing::Span::current();
668        tokio::task::spawn_blocking(move || {
669            let _span = span.enter();
670
671            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
672
673            let temp_file = states_dir.file(kind.make_temp_file_name(&block_id));
674            std::fs::write(temp_file.path(), cached.file.as_slice())?;
675            temp_file.rename(kind.make_file_name(&block_id))?;
676
677            drop(cached);
678
679            this.cache_state(mc_seqno, &block_id, kind)?;
680            Ok(true)
681        })
682        .await?
683    }
684}
685
/// Shared state behind [`PersistentStateStorage`].
struct Inner {
    /// Cells database handed to `ShardStateWriter`.
    cells_db: CellsDb,
    /// Root directory of persistent states; contains one subdirectory per mc seqno.
    storage_dir: Dir,
    /// Used to look up block handles and to set their persistent state flags.
    block_handles: Arc<BlockHandleStorage>,
    /// Used to load queue diffs and block data when building a queue state.
    blocks: Arc<BlockStorage>,
    /// Used to load the state root hash when writing a shard state.
    shard_states: Arc<ShardStateStorage>,
    /// Mapped state files, one entry per `(block_id, kind)`.
    descriptor_cache: DashMap<CacheKey, Arc<CachedState>>,
    /// For each mc seqno, ids of the blocks whose cached state is stored under it.
    mc_seqno_to_block_ids: Mutex<BTreeMap<u32, FastHashSet<BlockId>>>,
    /// Limits the number of concurrent chunk reads.
    chunks_semaphore: Arc<Semaphore>,
    /// Queue of persistent state handles (newest first).
    handles_queue: Mutex<HandlesQueue>,
    /// Notified when `oldest_ps_handle` is updated.
    oldest_ps_changed: Notify,
    /// The oldest handle of `handles_queue`, published for lock-free reads.
    oldest_ps_handle: ArcSwapAny<Option<BlockHandle>>,
}
699
700impl Inner {
701    fn prepare_persistent_states_dir(&self, mc_seqno: u32) -> Result<Dir> {
702        let states_dir = self.mc_states_dir(mc_seqno);
703        if !states_dir.path().is_dir() {
704            tracing::info!(mc_seqno, "creating persistent state directory");
705            states_dir.create_if_not_exists()?;
706        }
707        Ok(states_dir)
708    }
709
710    fn mc_states_dir(&self, mc_seqno: u32) -> Dir {
711        Dir::new_readonly(self.storage_dir.path().join(mc_seqno.to_string()))
712    }
713
714    fn clear_outdated_state_entries(&self, recent_block_id: &BlockId) -> Result<()> {
715        let mut directories_to_remove: Vec<PathBuf> = Vec::new();
716        let mut files_to_remove: Vec<PathBuf> = Vec::new();
717
718        for entry in self.storage_dir.entries()?.flatten() {
719            let path = entry.path();
720
721            if path.is_file() {
722                files_to_remove.push(path);
723                continue;
724            }
725
726            let Ok(name) = entry.file_name().into_string() else {
727                directories_to_remove.push(path);
728                continue;
729            };
730
731            let is_recent = matches!(
732                name.parse::<u32>(),
733                Ok(seqno) if seqno >= recent_block_id.seqno || seqno == 0
734            );
735            if !is_recent {
736                directories_to_remove.push(path);
737            }
738        }
739
740        for dir in directories_to_remove {
741            tracing::info!(dir = %dir.display(), "removing an old persistent state directory");
742            if let Err(e) = std::fs::remove_dir_all(&dir) {
743                tracing::error!(dir = %dir.display(), "failed to remove an old persistent state: {e:?}");
744            }
745        }
746
747        for file in files_to_remove {
748            tracing::info!(file = %file.display(), "removing file");
749            if let Err(e) = std::fs::remove_file(&file) {
750                tracing::error!(file = %file.display(), "failed to remove file: {e:?}");
751            }
752        }
753
754        Ok(())
755    }
756
757    fn cache_state(
758        &self,
759        mc_seqno: u32,
760        block_id: &BlockId,
761        kind: PersistentStateKind,
762    ) -> Result<()> {
763        use std::collections::btree_map;
764
765        use dashmap::mapref::entry::Entry;
766
767        let key = CacheKey {
768            block_id: *block_id,
769            kind,
770        };
771
772        let load_mapped = || {
773            let mut file = self
774                .mc_states_dir(mc_seqno)
775                .file(kind.make_file_name(block_id))
776                .read(true)
777                .open()?;
778
779            // We create a copy of the original file here to make sure
780            // that the underlying mapped file will not be changed outside
781            // of the node. Otherwise it will randomly fail with exit code 7/BUS.
782            let mut temp_file = tempfile::tempfile_in(self.storage_dir.path())
783                .context("failed to create a temp file")?;
784
785            // Underlying implementation will call something like `copy_file_range`,
786            // and we hope that it will be just COW pages.
787            // TODO: Find a way to cancel this operation.
788            std::io::copy(&mut file, &mut temp_file).context("failed to copy a temp file")?;
789            temp_file.flush()?;
790            temp_file.seek(std::io::SeekFrom::Start(0))?;
791
792            MappedFile::from_existing_file(temp_file).context("failed to map a temp file")
793        };
794
795        let file =
796            load_mapped().with_context(|| format!("failed to cache {kind:?} for {block_id}"))?;
797
798        let new_state = Arc::new(CachedState { mc_seqno, file });
799
800        let prev_mc_seqno = match self.descriptor_cache.entry(key) {
801            Entry::Vacant(entry) => {
802                entry.insert(new_state);
803                None
804            }
805            Entry::Occupied(mut entry) => {
806                let prev_mc_seqno = entry.get().mc_seqno;
807                if mc_seqno <= prev_mc_seqno {
808                    // Cache only the most recent block (if changed)
809                    return Ok(());
810                }
811
812                entry.insert(new_state);
813                Some(prev_mc_seqno)
814            }
815        };
816
817        let mut index = self.mc_seqno_to_block_ids.lock();
818
819        // Remove previous entry if exists
820        if let Some(prev_mc_seqno) = prev_mc_seqno
821            && let btree_map::Entry::Occupied(mut entry) = index.entry(prev_mc_seqno)
822        {
823            entry.get_mut().remove(block_id);
824            if entry.get().is_empty() {
825                entry.remove();
826            }
827        }
828
829        index.entry(mc_seqno).or_default().insert(*block_id);
830
831        Ok(())
832    }
833
834    fn clear_cache(&self, block_id: &BlockId) {
835        self.descriptor_cache.remove(&CacheKey {
836            block_id: *block_id,
837            kind: PersistentStateKind::Shard,
838        });
839        self.descriptor_cache.remove(&CacheKey {
840            block_id: *block_id,
841            kind: PersistentStateKind::Queue,
842        });
843    }
844}
845
/// Brief description of a cached persistent state.
#[derive(Debug, Clone, Copy)]
pub struct PersistentStateInfo {
    /// Total length of the state file in bytes.
    pub size: NonZeroU64,
    /// Size of a single chunk, in bytes, used when reading the state in parts.
    pub chunk_size: NonZeroU32,
}
851
/// A persistent state file held in the descriptor cache.
struct CachedState {
    /// Masterchain seqno of the directory the state was cached from.
    mc_seqno: u32,
    /// Memory-mapped private copy of the state file.
    file: MappedFile,
}
856
/// Queue of persistent state block handles.
#[derive(Default)]
struct HandlesQueue {
    /// Handles ordered from the newest (front) to the oldest (back).
    handles: VecDeque<BlockHandle>,
}
861
862impl HandlesQueue {
863    fn oldest_known(&self) -> Option<&BlockHandle> {
864        self.handles.back()
865    }
866
867    fn push(&mut self, new_handle: BlockHandle) -> bool {
868        // Allow only new blocks
869        if let Some(newest) = self.handles.front()
870            && newest.id().seqno >= new_handle.id().seqno
871        {
872            return false;
873        }
874
875        // Remove too old states
876        let now_utime = new_handle.gen_utime();
877        let mut has_suitable = false;
878        self.handles.retain(|old_handle| {
879            if !has_suitable {
880                has_suitable |= BlockStuff::can_use_for_boot(old_handle.gen_utime(), now_utime);
881                true
882            } else {
883                false
884            }
885        });
886
887        // Add the new one
888        self.handles.push_front(new_handle);
889        true
890    }
891}
892
/// Size in bytes of a single chunk returned when a state is read in parts.
const STATE_CHUNK_SIZE: u64 = 1024 * 1024; // 1 MiB