// tycho_core/storage/persistent_state/mod.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::fs::File;
3use std::io::{Seek, Write};
4use std::num::{NonZeroU32, NonZeroU64};
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, Weak};
8
9use anyhow::{Context, Result};
10use arc_swap::{ArcSwap, ArcSwapAny};
11use dashmap::DashMap;
12use parking_lot::Mutex;
13use tokio::sync::{Notify, Semaphore, mpsc};
14use tokio::time::Instant;
15use tycho_block_util::block::BlockStuff;
16use tycho_block_util::queue::QueueStateHeader;
17use tycho_block_util::state::RefMcStateHandle;
18use tycho_storage::fs::Dir;
19use tycho_types::models::{BlockId, PrevBlockRef};
20use tycho_util::fs::MappedFile;
21use tycho_util::sync::CancellationFlag;
22use tycho_util::{FastHashMap, FastHashSet};
23
24pub use self::queue_state::reader::{QueueDiffReader, QueueStateReader};
25pub use self::queue_state::writer::QueueStateWriter;
26pub use self::shard_state::reader::{BriefBocHeader, ShardStateReader};
27pub use self::shard_state::writer::ShardStateWriter;
28use super::{
29    BlockHandle, BlockHandleStorage, BlockStorage, CellsDb, KeyBlocksDirection, NodeStateStorage,
30    ShardStateStorage,
31};
32
33mod queue_state {
34    pub mod reader;
35    pub mod writer;
36}
37mod shard_state {
38    pub mod reader;
39    pub mod writer;
40}
41
42#[cfg(test)]
43mod tests;
44
/// Name of the subdirectory of the node files directory that holds all persistent
/// states, with one nested directory per masterchain seqno.
const BASE_DIR: &str = "states";
46
/// Kind of a persistent state file.
///
/// The kind selects the writer (`ShardStateWriter` / `QueueStateWriter`) whose file
/// naming scheme and extension are used on disk.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum PersistentStateKind {
    /// Shard state file, produced by `ShardStateWriter`.
    Shard,
    /// Message queue state file, produced by `QueueStateWriter`.
    Queue,
}
52
53impl PersistentStateKind {
54    pub fn make_file_name(&self, block_id: &BlockId) -> PathBuf {
55        match self {
56            Self::Shard => ShardStateWriter::file_name(block_id),
57            Self::Queue => QueueStateWriter::file_name(block_id),
58        }
59    }
60
61    pub fn make_temp_file_name(&self, block_id: &BlockId) -> PathBuf {
62        match self {
63            Self::Shard => ShardStateWriter::temp_file_name(block_id),
64            Self::Queue => QueueStateWriter::temp_file_name(block_id),
65        }
66    }
67
68    pub fn from_extension(extension: &str) -> Option<Self> {
69        match extension {
70            ShardStateWriter::FILE_EXTENSION => Some(Self::Shard),
71            QueueStateWriter::FILE_EXTENSION => Some(Self::Queue),
72            _ => None,
73        }
74    }
75}
76
77#[derive(Debug, Eq, Hash, PartialEq)]
78struct CacheKey {
79    block_id: BlockId,
80    kind: PersistentStateKind,
81}
82
83#[derive(Clone)]
84pub struct PersistentStateStorage {
85    inner: Arc<Inner>,
86}
87
88impl PersistentStateStorage {
89    pub fn new(
90        cells_db: CellsDb,
91        files_dir: &Dir,
92        node_state: Arc<NodeStateStorage>,
93        block_handle_storage: Arc<BlockHandleStorage>,
94        block_storage: Arc<BlockStorage>,
95        shard_state_storage: Arc<ShardStateStorage>,
96    ) -> Result<Self> {
97        const MAX_PARALLEL_CHUNK_READS: usize = 20;
98
99        let storage_dir = files_dir.create_subdir(BASE_DIR)?;
100
101        Ok(Self {
102            inner: Arc::new(Inner {
103                cells_db,
104                storage_dir,
105                node_state,
106                block_handles: block_handle_storage,
107                blocks: block_storage,
108                shard_states: shard_state_storage,
109                descriptor_cache: Default::default(),
110                mc_seqno_to_block_ids: Default::default(),
111                chunks_semaphore: Arc::new(Semaphore::new(MAX_PARALLEL_CHUNK_READS)),
112                handles_queue: Default::default(),
113                oldest_ps_changed: Default::default(),
114                oldest_ps_handle: Default::default(),
115                subscriptions: Default::default(),
116                subscriptions_mutex: Default::default(),
117            }),
118        })
119    }
120
121    pub fn load_oldest_known_handle(&self) -> Option<BlockHandle> {
122        self.inner.oldest_ps_handle.load_full()
123    }
124
125    pub fn oldest_known_handle_changed(&self) -> tokio::sync::futures::Notified<'_> {
126        self.inner.oldest_ps_changed.notified()
127    }
128
129    #[tracing::instrument(skip_all)]
130    pub async fn preload(&self) -> Result<()> {
131        self.preload_handles_queue()?;
132        self.preload_states().await
133    }
134
135    fn preload_handles_queue(&self) -> Result<()> {
136        let this = self.inner.as_ref();
137
138        let block_handles = this.block_handles.as_ref();
139
140        let mut changed = false;
141        let mut prev_utime = 0;
142        for block_id in block_handles.key_blocks_iterator(KeyBlocksDirection::ForwardFrom(0)) {
143            let block_handle = block_handles
144                .load_handle(&block_id)
145                .context("key block handle not found")?;
146
147            let gen_utime = block_handle.gen_utime();
148            if BlockStuff::compute_is_persistent(gen_utime, prev_utime) {
149                prev_utime = gen_utime;
150
151                let mut queue = this.handles_queue.lock();
152                if queue.push(block_handle) {
153                    this.oldest_ps_handle.store(queue.oldest_known().cloned());
154                    changed = true;
155                }
156            }
157        }
158
159        if changed {
160            this.oldest_ps_changed.notify_waiters();
161        }
162        Ok(())
163    }
164
165    async fn preload_states(&self) -> Result<()> {
166        // For each mc_seqno directory
167        let process_states = |this: &Inner, dir: &PathBuf, mc_seqno: u32| -> Result<()> {
168            'outer: for entry in std::fs::read_dir(dir)?.flatten() {
169                let path = entry.path();
170                // Skip subdirectories
171                if path.is_dir() {
172                    tracing::warn!(path = %path.display(), "unexpected directory");
173                    continue;
174                }
175
176                'file: {
177                    // Try to parse the file name as a block_id
178                    let Ok(block_id) = path
179                        // TODO should use file_prefix
180                        .file_stem()
181                        .unwrap_or_default()
182                        .to_str()
183                        .unwrap_or_default()
184                        .parse::<BlockId>()
185                    else {
186                        break 'file;
187                    };
188
189                    let extension = path
190                        .extension()
191                        .and_then(|ext| ext.to_str())
192                        .unwrap_or_default();
193
194                    let Some(cache_type) = PersistentStateKind::from_extension(extension) else {
195                        break 'file;
196                    };
197
198                    this.cache_state(mc_seqno, &block_id, cache_type)?;
199                    continue 'outer;
200                }
201                tracing::warn!(path = %path.display(), "unexpected file");
202            }
203            Ok(())
204        };
205
206        let this = self.inner.clone();
207        let span = tracing::Span::current();
208        tokio::task::spawn_blocking(move || {
209            let _span = span.enter();
210
211            // For each entry in the storage directory
212            'outer: for entry in this.storage_dir.entries()?.flatten() {
213                let path = entry.path();
214                // Skip files
215                if path.is_file() {
216                    tracing::warn!(path = %path.display(), "unexpected file");
217                    continue;
218                }
219
220                'dir: {
221                    // Try to parse the directory name as an mc_seqno
222                    let Ok(name) = entry.file_name().into_string() else {
223                        break 'dir;
224                    };
225                    let Ok(mc_seqno) = name.parse::<u32>() else {
226                        break 'dir;
227                    };
228
229                    // Try to load files in the directory as persistent states
230                    process_states(&this, &path, mc_seqno)?;
231                    continue 'outer;
232                }
233                tracing::warn!(path = %path.display(), "unexpected directory");
234            }
235
236            Ok(())
237        })
238        .await?
239    }
240
241    // NOTE: This is intentionally a method, not a constant because
242    // it might be useful to allow configure it during the first run.
243    pub fn state_chunk_size(&self) -> NonZeroU32 {
244        NonZeroU32::new(STATE_CHUNK_SIZE as _).unwrap()
245    }
246
247    pub fn subscribe(&self) -> (Vec<PersistentState>, PersistentStateReceiver) {
248        let id = RECEIVER_ID.fetch_add(1, Ordering::Relaxed);
249        let (sender, receiver) = mpsc::channel(1);
250
251        // TODO: Hold `_guard` for the whole method body? So that we can know
252        // that no states will be send to subscriptions while we are collecting
253        // the current cache snapshot.
254        {
255            let _guard = self.inner.subscriptions_mutex.lock();
256            let mut subscriptions = self.inner.subscriptions.load_full();
257            {
258                let subscriptions = Arc::make_mut(&mut subscriptions);
259                let prev = subscriptions.insert(id, sender);
260                assert!(
261                    prev.is_none(),
262                    "persistent state subscription must be unique"
263                );
264            }
265            self.inner.subscriptions.store(subscriptions);
266        }
267
268        let receiver = PersistentStateReceiver {
269            id,
270            inner: Arc::downgrade(&self.inner),
271            receiver,
272        };
273
274        let initial_states = self
275            .inner
276            .descriptor_cache
277            .iter()
278            .map(|item| PersistentState {
279                block_id: item.key().block_id,
280                kind: item.key().kind,
281                cached: item.value().clone(),
282            })
283            .collect();
284
285        (initial_states, receiver)
286    }
287
288    pub fn state_exists(&self, block_id: &BlockId, kind: PersistentStateKind) -> bool {
289        self.inner.descriptor_cache.contains_key(&CacheKey {
290            block_id: *block_id,
291            kind,
292        })
293    }
294
295    pub fn get_state_info(
296        &self,
297        block_id: &BlockId,
298        kind: PersistentStateKind,
299    ) -> Option<PersistentStateInfo> {
300        self.inner
301            .descriptor_cache
302            .get(&CacheKey {
303                block_id: *block_id,
304                kind,
305            })
306            .and_then(|cached| {
307                let size = NonZeroU64::new(cached.file.length() as u64)?;
308                Some(PersistentStateInfo {
309                    size,
310                    chunk_size: self.state_chunk_size(),
311                })
312            })
313    }
314
315    pub async fn read_state_part(
316        &self,
317        block_id: &BlockId,
318        offset: u64,
319        state_kind: PersistentStateKind,
320    ) -> Option<Vec<u8>> {
321        // NOTE: Should be noop on x64
322        let offset = usize::try_from(offset).ok()?;
323        let chunk_size = self.state_chunk_size().get() as usize;
324        if offset % chunk_size != 0 {
325            return None;
326        }
327
328        let permit = {
329            let semaphore = self.inner.chunks_semaphore.clone();
330            semaphore.acquire_owned().await.ok()?
331        };
332
333        let key = CacheKey {
334            block_id: *block_id,
335            kind: state_kind,
336        };
337        let cached = self.inner.descriptor_cache.get(&key)?.clone();
338        if offset > cached.file.length() {
339            return None;
340        }
341
342        // NOTE: Cached file is a mapped file, therefore it can take a while to read from it.
343        // NOTE: `spawn_blocking` is called here because it is mostly IO-bound operation.
344        // TODO: Add semaphore to limit the number of concurrent operations.
345        tokio::task::spawn_blocking(move || {
346            // Ensure that permit is dropped only after cached state is used.
347            let _permit = permit;
348
349            let end = std::cmp::min(offset.saturating_add(chunk_size), cached.file.length());
350            cached.file.as_slice()[offset..end].to_vec()
351        })
352        .await
353        .ok()
354    }
355
356    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
357    pub async fn store_shard_state(
358        &self,
359        mc_seqno: u32,
360        handle: &BlockHandle,
361        tracker_handle: RefMcStateHandle,
362    ) -> Result<()> {
363        if self
364            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
365            .await?
366        {
367            return Ok(());
368        }
369
370        let cancelled = CancellationFlag::new();
371        scopeguard::defer! {
372            cancelled.cancel();
373        }
374
375        let handle = handle.clone();
376        let this = self.inner.clone();
377        let cancelled = cancelled.clone();
378        let span = tracing::Span::current();
379
380        let state = tokio::task::spawn_blocking(move || {
381            let _span = span.enter();
382
383            let guard = scopeguard::guard((), |_| {
384                tracing::warn!("cancelled");
385            });
386
387            // NOTE: Ensure that the tracker handle will outlive the state writer.
388            let _tracker_handle = tracker_handle;
389
390            let root_hash = this.shard_states.load_state_root_hash(handle.id())?;
391
392            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
393
394            let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
395            match cell_writer.write(&root_hash, Some(&cancelled)) {
396                Ok(_) => {
397                    this.block_handles.set_has_persistent_shard_state(&handle);
398                    tracing::info!("persistent shard state saved");
399                }
400                Err(e) => {
401                    // NOTE: We are ignoring an error here. It might be intentional
402                    tracing::error!("failed to write persistent shard state: {e:?}");
403                }
404            }
405
406            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
407
408            scopeguard::ScopeGuard::into_inner(guard);
409            Ok::<_, anyhow::Error>(state)
410        })
411        .await??;
412
413        self.notify_with_persistent_state(&state).await;
414        Ok(())
415    }
416
417    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
418    pub async fn store_shard_state_file(
419        &self,
420        mc_seqno: u32,
421        handle: &BlockHandle,
422        file: File,
423    ) -> Result<()> {
424        if self
425            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
426            .await?
427        {
428            return Ok(());
429        }
430
431        let cancelled = CancellationFlag::new();
432        scopeguard::defer! {
433            cancelled.cancel();
434        }
435
436        let handle = handle.clone();
437        let this = self.inner.clone();
438        let cancelled = cancelled.clone();
439        let span = tracing::Span::current();
440
441        let state = tokio::task::spawn_blocking(move || {
442            let _span = span.enter();
443
444            let guard = scopeguard::guard((), |_| {
445                tracing::warn!("cancelled");
446            });
447
448            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
449
450            let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
451            cell_writer.write_file(file, Some(&cancelled))?;
452            this.block_handles.set_has_persistent_shard_state(&handle);
453            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
454
455            scopeguard::ScopeGuard::into_inner(guard);
456            Ok::<_, anyhow::Error>(state)
457        })
458        .await??;
459
460        self.notify_with_persistent_state(&state).await;
461        Ok(())
462    }
463
464    #[tracing::instrument(skip_all, fields(mc_seqno = mc_seqno, block_id = %block.id()))]
465    pub async fn store_queue_state(
466        &self,
467        mc_seqno: u32,
468        handle: &BlockHandle,
469        block: BlockStuff,
470    ) -> Result<()> {
471        if self
472            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
473            .await?
474        {
475            return Ok(());
476        }
477
478        let this = self.inner.clone();
479
480        let shard_ident = handle.id().shard;
481
482        let mut queue_diffs = Vec::new();
483        let mut messages = Vec::new();
484
485        let mut top_block_handle = handle.clone();
486        let mut top_block = block;
487
488        let mut tail_len = top_block.block().out_msg_queue_updates.tail_len as usize;
489
490        while tail_len > 0 {
491            let queue_diff = this.blocks.load_queue_diff(&top_block_handle).await?;
492            let top_block_info = top_block.load_info()?;
493
494            let block_extra = top_block.load_extra()?;
495            let out_messages = block_extra.load_out_msg_description()?;
496
497            messages.push(queue_diff.zip(&out_messages));
498            queue_diffs.push(queue_diff.diff().clone());
499
500            if tail_len == 1 {
501                break;
502            }
503
504            let prev_block_id = match top_block_info.load_prev_ref()? {
505                PrevBlockRef::Single(block_ref) => block_ref.as_block_id(shard_ident),
506                PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"),
507            };
508
509            let Some(prev_block_handle) = this.block_handles.load_handle(&prev_block_id) else {
510                anyhow::bail!("prev block handle not found for: {prev_block_id}");
511            };
512            let prev_block = this.blocks.load_block_data(&prev_block_handle).await?;
513
514            top_block_handle = prev_block_handle;
515            top_block = prev_block;
516            tail_len -= 1;
517        }
518
519        let state = QueueStateHeader {
520            shard_ident,
521            seqno: handle.id().seqno,
522            queue_diffs,
523        };
524
525        let cancelled = CancellationFlag::new();
526        scopeguard::defer! {
527            cancelled.cancel();
528        }
529
530        let handle = handle.clone();
531        let cancelled = cancelled.clone();
532        let span = tracing::Span::current();
533
534        let state = tokio::task::spawn_blocking(move || {
535            let _span = span.enter();
536
537            let guard = scopeguard::guard((), |_| {
538                tracing::warn!("cancelled");
539            });
540
541            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
542            match QueueStateWriter::new(&states_dir, handle.id(), state, messages)
543                .write(Some(&cancelled))
544            {
545                Ok(()) => {
546                    this.block_handles.set_has_persistent_queue_state(&handle);
547                    tracing::info!("persistent queue state saved");
548                }
549                Err(e) => {
550                    tracing::error!("failed to write persistent queue state: {e:?}");
551                }
552            }
553
554            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
555
556            scopeguard::ScopeGuard::into_inner(guard);
557            Ok::<_, anyhow::Error>(state)
558        })
559        .await??;
560
561        self.notify_with_persistent_state(&state).await;
562        Ok(())
563    }
564
565    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
566    pub async fn store_queue_state_file(
567        &self,
568        mc_seqno: u32,
569        handle: &BlockHandle,
570        file: File,
571    ) -> Result<()> {
572        if self
573            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
574            .await?
575        {
576            return Ok(());
577        }
578
579        let cancelled = CancellationFlag::new();
580        scopeguard::defer! {
581            cancelled.cancel();
582        }
583
584        let handle = handle.clone();
585        let this = self.inner.clone();
586        let cancelled = cancelled.clone();
587        let span = tracing::Span::current();
588
589        let state = tokio::task::spawn_blocking(move || {
590            let _span = span.enter();
591
592            let guard = scopeguard::guard((), |_| {
593                tracing::warn!("cancelled");
594            });
595
596            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
597
598            QueueStateWriter::write_file(&states_dir, handle.id(), file, Some(&cancelled))?;
599            this.block_handles.set_has_persistent_queue_state(&handle);
600            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
601
602            scopeguard::ScopeGuard::into_inner(guard);
603            Ok::<_, anyhow::Error>(state)
604        })
605        .await??;
606
607        self.notify_with_persistent_state(&state).await;
608        Ok(())
609    }
610
611    pub async fn rotate_persistent_states(&self, top_handle: &BlockHandle) -> Result<()> {
612        anyhow::ensure!(
613            top_handle.is_masterchain(),
614            "top persistent state handle must be in the masterchain"
615        );
616
617        {
618            tracing::info!(
619                mc_block_id = %top_handle.id(),
620                "adding new persistent state to the queue"
621            );
622
623            let mut queue = self.inner.handles_queue.lock();
624            if queue.push(top_handle.clone()) {
625                self.inner
626                    .oldest_ps_handle
627                    .store(queue.oldest_known().cloned());
628                self.inner.oldest_ps_changed.notify_waiters();
629            }
630        }
631
632        tracing::info!("started clearing old persistent state directories");
633        let start = Instant::now();
634        scopeguard::defer! {
635            tracing::info!(
636                elapsed = %humantime::format_duration(start.elapsed()),
637                "clearing old persistent state directories completed"
638            );
639        }
640
641        let this = self.inner.clone();
642        let zerostate_seqno = this
643            .node_state
644            .load_zerostate_mc_seqno()
645            .unwrap_or_default();
646
647        let mut top_handle = top_handle.clone();
648        if top_handle.id().seqno <= zerostate_seqno {
649            // Nothing to clear for the zerostate
650            return Ok(());
651        }
652
653        let span = tracing::Span::current();
654        tokio::task::spawn_blocking(move || {
655            let _span = span.enter();
656
657            let block_handles = &this.block_handles;
658
659            let now_utime = top_handle.gen_utime();
660
661            // Find a state before the
662            let mut has_suitable = false;
663            loop {
664                match block_handles.find_prev_persistent_key_block(top_handle.id().seqno) {
665                    // Find the newest usable persistent state...
666                    Some(handle) if !has_suitable => {
667                        has_suitable |= BlockStuff::can_use_for_boot(handle.gen_utime(), now_utime);
668                        top_handle = handle;
669                    }
670                    // ...and return the previous one.
671                    Some(handle) => {
672                        top_handle = handle;
673                        break;
674                    }
675                    // Or do nothing if not found.
676                    None => return Ok(()),
677                }
678            }
679
680            // Remove cached states
681            let mut index = this.mc_seqno_to_block_ids.lock();
682            index.retain(|&mc_seqno, block_ids| {
683                if mc_seqno >= top_handle.id().seqno || mc_seqno <= zerostate_seqno {
684                    return true;
685                }
686
687                for block_id in block_ids.drain() {
688                    // TODO: Clear flag in block handle
689                    this.clear_cache(&block_id);
690                }
691                false
692            });
693
694            // Remove files
695            this.clear_outdated_state_entries(top_handle.id(), zerostate_seqno)
696        })
697        .await?
698    }
699
700    async fn try_reuse_persistent_state(
701        &self,
702        mc_seqno: u32,
703        handle: &BlockHandle,
704        kind: PersistentStateKind,
705    ) -> Result<bool> {
706        // Check if there is anything to reuse (return false if nothing)
707        match kind {
708            PersistentStateKind::Shard if !handle.has_persistent_shard_state() => return Ok(false),
709            PersistentStateKind::Queue if !handle.has_persistent_queue_state() => return Ok(false),
710            _ => {}
711        }
712
713        let block_id = *handle.id();
714
715        let Some(cached) = self
716            .inner
717            .descriptor_cache
718            .get(&CacheKey { block_id, kind })
719            .map(|r| r.clone())
720        else {
721            // Nothing to reuse
722            return Ok(false);
723        };
724
725        if cached.mc_seqno >= mc_seqno {
726            // We already have the recent enough state
727            return Ok(true);
728        }
729
730        let this = self.inner.clone();
731
732        let span = tracing::Span::current();
733        let state = tokio::task::spawn_blocking(move || {
734            let _span = span.enter();
735
736            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
737
738            let temp_file = states_dir.file(kind.make_temp_file_name(&block_id));
739            std::fs::write(temp_file.path(), cached.file.as_slice())?;
740            temp_file.rename(kind.make_file_name(&block_id))?;
741
742            drop(cached);
743
744            this.cache_state(mc_seqno, &block_id, kind)
745        })
746        .await??;
747
748        self.notify_with_persistent_state(&state).await;
749        Ok(true)
750    }
751
752    async fn notify_with_persistent_state(&self, state: &PersistentState) {
753        let subscriptions = self.inner.subscriptions.load_full();
754        for sender in subscriptions.values() {
755            sender.send(state.clone()).await.ok();
756        }
757    }
758}
759
760struct Inner {
761    cells_db: CellsDb,
762    storage_dir: Dir,
763    node_state: Arc<NodeStateStorage>,
764    block_handles: Arc<BlockHandleStorage>,
765    blocks: Arc<BlockStorage>,
766    shard_states: Arc<ShardStateStorage>,
767    descriptor_cache: DashMap<CacheKey, Arc<CachedState>>,
768    mc_seqno_to_block_ids: Mutex<BTreeMap<u32, FastHashSet<BlockId>>>,
769    chunks_semaphore: Arc<Semaphore>,
770    handles_queue: Mutex<HandlesQueue>,
771    oldest_ps_changed: Notify,
772    oldest_ps_handle: ArcSwapAny<Option<BlockHandle>>,
773    subscriptions: ArcSwap<FastHashMap<usize, mpsc::Sender<PersistentState>>>,
774    subscriptions_mutex: Mutex<()>,
775}
776
777impl Inner {
778    fn prepare_persistent_states_dir(&self, mc_seqno: u32) -> Result<Dir> {
779        let states_dir = self.mc_states_dir(mc_seqno);
780        if !states_dir.path().is_dir() {
781            tracing::info!(mc_seqno, "creating persistent state directory");
782            states_dir.create_if_not_exists()?;
783        }
784        Ok(states_dir)
785    }
786
787    fn mc_states_dir(&self, mc_seqno: u32) -> Dir {
788        Dir::new_readonly(self.storage_dir.path().join(mc_seqno.to_string()))
789    }
790
791    fn clear_outdated_state_entries(
792        &self,
793        recent_block_id: &BlockId,
794        zerostate_seqno: u32,
795    ) -> Result<()> {
796        let mut directories_to_remove: Vec<PathBuf> = Vec::new();
797        let mut files_to_remove: Vec<PathBuf> = Vec::new();
798
799        for entry in self.storage_dir.entries()?.flatten() {
800            let path = entry.path();
801
802            if path.is_file() {
803                files_to_remove.push(path);
804                continue;
805            }
806
807            let Ok(name) = entry.file_name().into_string() else {
808                directories_to_remove.push(path);
809                continue;
810            };
811
812            let is_recent = matches!(
813                name.parse::<u32>(),
814                Ok(seqno) if seqno >= recent_block_id.seqno || seqno <= zerostate_seqno
815            );
816            if !is_recent {
817                directories_to_remove.push(path);
818            }
819        }
820
821        for dir in directories_to_remove {
822            tracing::info!(dir = %dir.display(), "removing an old persistent state directory");
823            if let Err(e) = std::fs::remove_dir_all(&dir) {
824                tracing::error!(dir = %dir.display(), "failed to remove an old persistent state: {e:?}");
825            }
826        }
827
828        for file in files_to_remove {
829            tracing::info!(file = %file.display(), "removing file");
830            if let Err(e) = std::fs::remove_file(&file) {
831                tracing::error!(file = %file.display(), "failed to remove file: {e:?}");
832            }
833        }
834
835        Ok(())
836    }
837
    /// Copies the persistent state file for (`block_id`, `kind`) stored under
    /// `mc_seqno` into a temporary file, memory-maps that copy and registers it
    /// in `descriptor_cache`.
    ///
    /// If the cache already holds an entry for the same key whose `mc_seqno` is
    /// equal to or greater than `mc_seqno`, that existing entry is returned and
    /// the freshly made copy is discarded. Otherwise the new entry replaces the
    /// old one (or is inserted) and the `mc_seqno_to_block_ids` index is
    /// updated: `block_id` is removed from the bucket of the previous
    /// `mc_seqno` (dropping the bucket if it becomes empty) and added to the
    /// bucket of `mc_seqno`.
    ///
    /// # Errors
    ///
    /// Fails if the state file cannot be opened, or if creating, copying into,
    /// flushing, seeking or mapping the temporary file fails.
    ///
    /// NOTE(review): the (potentially large) file copy is made before the cache
    /// is consulted, so a call with an older `mc_seqno` still pays for a full
    /// copy that is then dropped.
    ///
    /// NOTE(review): the `descriptor_cache` entry guard is released at the end
    /// of the `match` statement, before `mc_seqno_to_block_ids` is locked, so
    /// the cache update and the index update are not atomic with respect to
    /// concurrent calls — confirm this is acceptable.
    ///
    /// NOTE(review): the index stores only `block_id` (not `kind`), so shard and
    /// queue entries for the same block share index slots; moving one kind to a
    /// newer `mc_seqno` can remove the block from a bucket the other kind is
    /// still cached under — confirm intended.
    fn cache_state(
        &self,
        mc_seqno: u32,
        block_id: &BlockId,
        kind: PersistentStateKind,
    ) -> Result<PersistentState> {
        use std::collections::btree_map;

        use dashmap::mapref::entry::Entry;

        // The cache is keyed by both the block and the kind of state.
        let key = CacheKey {
            block_id: *block_id,
            kind,
        };

        // Opens the on-disk state, copies it into a temp file in the storage
        // dir and maps the copy (not the original file).
        let load_mapped = || {
            let mut file = self
                .mc_states_dir(mc_seqno)
                .file(kind.make_file_name(block_id))
                .read(true)
                .open()?;

            // We create a copy of the original file here to make sure
            // that the underlying mapped file will not be changed outside
            // of the node. Otherwise it will randomly fail with exit code 7/BUS.
            let mut temp_file = tempfile::tempfile_in(self.storage_dir.path())
                .context("failed to create a temp file")?;

            // Underlying implementation will call something like `copy_file_range`,
            // and we hope that it will be just COW pages.
            // TODO: Find a way to cancel this operation.
            std::io::copy(&mut file, &mut temp_file).context("failed to copy a temp file")?;
            temp_file.flush()?;
            // Rewind the handle after writing, before handing it to the mapper.
            temp_file.seek(std::io::SeekFrom::Start(0))?;

            MappedFile::from_existing_file(temp_file).context("failed to map a temp file")
        };

        let file =
            load_mapped().with_context(|| format!("failed to cache {kind:?} for {block_id}"))?;

        let new_state = Arc::new(CachedState { mc_seqno, file });

        // Install the new state unless a same-or-newer one is already cached.
        // Yields the `mc_seqno` of the replaced entry (if any) so the index can
        // be fixed up below.
        let prev_mc_seqno = match self.descriptor_cache.entry(key) {
            Entry::Vacant(entry) => {
                entry.insert(new_state.clone());
                None
            }
            Entry::Occupied(mut entry) => {
                let prev_mc_seqno = entry.get().mc_seqno;
                if mc_seqno <= prev_mc_seqno {
                    // Cache only the most recent block (if changed)
                    return Ok(PersistentState {
                        block_id: *block_id,
                        kind,
                        cached: entry.get().clone(),
                    });
                }

                entry.insert(new_state.clone());
                Some(prev_mc_seqno)
            }
        };

        let mut index = self.mc_seqno_to_block_ids.lock();

        // Remove previous entry if exists
        if let Some(prev_mc_seqno) = prev_mc_seqno
            && let btree_map::Entry::Occupied(mut entry) = index.entry(prev_mc_seqno)
        {
            entry.get_mut().remove(block_id);
            // Drop the bucket entirely once no blocks remain in it.
            if entry.get().is_empty() {
                entry.remove();
            }
        }

        index.entry(mc_seqno).or_default().insert(*block_id);

        Ok(PersistentState {
            block_id: *block_id,
            kind,
            cached: new_state,
        })
    }
922
923    fn clear_cache(&self, block_id: &BlockId) {
924        self.descriptor_cache.remove(&CacheKey {
925            block_id: *block_id,
926            kind: PersistentStateKind::Shard,
927        });
928        self.descriptor_cache.remove(&CacheKey {
929            block_id: *block_id,
930            kind: PersistentStateKind::Queue,
931        });
932    }
933}
934
/// Size description of a stored persistent state.
#[derive(Clone, Copy, Debug)]
pub struct PersistentStateInfo {
    /// Total size of the state (presumably in bytes — confirm with producer).
    pub size: NonZeroU64,
    /// Size of a single chunk of the state (presumably in bytes — confirm).
    pub chunk_size: NonZeroU32,
}
940
941#[derive(Clone)]
942pub struct PersistentState {
943    block_id: BlockId,
944    kind: PersistentStateKind,
945    cached: Arc<CachedState>,
946}
947
948impl PersistentState {
949    pub fn block_id(&self) -> &BlockId {
950        &self.block_id
951    }
952
953    pub fn kind(&self) -> PersistentStateKind {
954        self.kind
955    }
956
957    pub fn file(&self) -> &MappedFile {
958        &self.cached.file
959    }
960
961    pub fn mc_seqno(&self) -> u32 {
962        self.cached.mc_seqno
963    }
964}
965
966pub struct PersistentStateReceiver {
967    id: usize,
968    inner: Weak<Inner>,
969    receiver: mpsc::Receiver<PersistentState>,
970}
971
972impl std::ops::Deref for PersistentStateReceiver {
973    type Target = mpsc::Receiver<PersistentState>;
974
975    #[inline]
976    fn deref(&self) -> &Self::Target {
977        &self.receiver
978    }
979}
980
981impl std::ops::DerefMut for PersistentStateReceiver {
982    #[inline]
983    fn deref_mut(&mut self) -> &mut Self::Target {
984        &mut self.receiver
985    }
986}
987
988impl Drop for PersistentStateReceiver {
989    fn drop(&mut self) {
990        if let Some(inner) = self.inner.upgrade() {
991            let _guard = inner.subscriptions_mutex.lock();
992            let mut subscriptions = inner.subscriptions.load_full();
993            {
994                let subscriptions = Arc::make_mut(&mut subscriptions);
995                subscriptions.remove(&self.id);
996            }
997            inner.subscriptions.store(subscriptions);
998        }
999    }
1000}
1001
1002static RECEIVER_ID: AtomicUsize = AtomicUsize::new(0);
1003
1004struct CachedState {
1005    mc_seqno: u32,
1006    file: MappedFile,
1007}
1008
1009#[derive(Default)]
1010struct HandlesQueue {
1011    handles: VecDeque<BlockHandle>,
1012}
1013
1014impl HandlesQueue {
1015    fn oldest_known(&self) -> Option<&BlockHandle> {
1016        self.handles.back()
1017    }
1018
1019    fn push(&mut self, new_handle: BlockHandle) -> bool {
1020        // Allow only new blocks
1021        if let Some(newest) = self.handles.front()
1022            && newest.id().seqno >= new_handle.id().seqno
1023        {
1024            return false;
1025        }
1026
1027        // Remove too old states
1028        let now_utime = new_handle.gen_utime();
1029        let mut has_suitable = false;
1030        self.handles.retain(|old_handle| {
1031            if !has_suitable {
1032                has_suitable |= BlockStuff::can_use_for_boot(old_handle.gen_utime(), now_utime);
1033                true
1034            } else {
1035                false
1036            }
1037        });
1038
1039        // Add the new one
1040        self.handles.push_front(new_handle);
1041        true
1042    }
1043}
1044
/// Persistent state chunk size in bytes: 1 MiB (2^20).
const STATE_CHUNK_SIZE: u64 = 1 << 20;