tycho_core/storage/persistent_state/
mod.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::fs::File;
3use std::io::{Seek, Write};
4use std::num::{NonZeroU32, NonZeroU64};
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, Weak};
8
9use anyhow::{Context, Result};
10use arc_swap::{ArcSwap, ArcSwapAny};
11use dashmap::DashMap;
12use parking_lot::Mutex;
13use tokio::sync::{Notify, Semaphore, mpsc};
14use tokio::time::Instant;
15use tycho_block_util::block::BlockStuff;
16use tycho_block_util::queue::QueueStateHeader;
17use tycho_block_util::state::RefMcStateHandle;
18use tycho_storage::fs::Dir;
19use tycho_types::models::{BlockId, PrevBlockRef};
20use tycho_util::fs::MappedFile;
21use tycho_util::sync::CancellationFlag;
22use tycho_util::{FastHashMap, FastHashSet};
23
24pub use self::queue_state::reader::{QueueDiffReader, QueueStateReader};
25pub use self::queue_state::writer::QueueStateWriter;
26pub use self::shard_state::reader::{BriefBocHeader, ShardStateReader};
27pub use self::shard_state::writer::ShardStateWriter;
28use super::{
29    BlockHandle, BlockHandleStorage, BlockStorage, CellsDb, KeyBlocksDirection, ShardStateStorage,
30};
31
32mod queue_state {
33    pub mod reader;
34    pub mod writer;
35}
36mod shard_state {
37    pub mod reader;
38    pub mod writer;
39}
40
41#[cfg(test)]
42mod tests;
43
/// Name of the subdirectory (created inside the files directory) that holds persistent states.
const BASE_DIR: &str = "states";
45
/// Kind of a persistent state file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PersistentStateKind {
    /// Shard state; file names and extension come from `ShardStateWriter`.
    Shard,
    /// Queue state; file names and extension come from `QueueStateWriter`.
    Queue,
}
51
52impl PersistentStateKind {
53    pub fn make_file_name(&self, block_id: &BlockId) -> PathBuf {
54        match self {
55            Self::Shard => ShardStateWriter::file_name(block_id),
56            Self::Queue => QueueStateWriter::file_name(block_id),
57        }
58    }
59
60    pub fn make_temp_file_name(&self, block_id: &BlockId) -> PathBuf {
61        match self {
62            Self::Shard => ShardStateWriter::temp_file_name(block_id),
63            Self::Queue => QueueStateWriter::temp_file_name(block_id),
64        }
65    }
66
67    pub fn from_extension(extension: &str) -> Option<Self> {
68        match extension {
69            ShardStateWriter::FILE_EXTENSION => Some(Self::Shard),
70            QueueStateWriter::FILE_EXTENSION => Some(Self::Queue),
71            _ => None,
72        }
73    }
74}
75
76#[derive(Debug, Eq, Hash, PartialEq)]
77struct CacheKey {
78    block_id: BlockId,
79    kind: PersistentStateKind,
80}
81
82#[derive(Clone)]
83pub struct PersistentStateStorage {
84    inner: Arc<Inner>,
85}
86
87impl PersistentStateStorage {
88    pub fn new(
89        cells_db: CellsDb,
90        files_dir: &Dir,
91        block_handle_storage: Arc<BlockHandleStorage>,
92        block_storage: Arc<BlockStorage>,
93        shard_state_storage: Arc<ShardStateStorage>,
94    ) -> Result<Self> {
95        const MAX_PARALLEL_CHUNK_READS: usize = 20;
96
97        let storage_dir = files_dir.create_subdir(BASE_DIR)?;
98
99        Ok(Self {
100            inner: Arc::new(Inner {
101                cells_db,
102                storage_dir,
103                block_handles: block_handle_storage,
104                blocks: block_storage,
105                shard_states: shard_state_storage,
106                descriptor_cache: Default::default(),
107                mc_seqno_to_block_ids: Default::default(),
108                chunks_semaphore: Arc::new(Semaphore::new(MAX_PARALLEL_CHUNK_READS)),
109                handles_queue: Default::default(),
110                oldest_ps_changed: Default::default(),
111                oldest_ps_handle: Default::default(),
112                subscriptions: Default::default(),
113                subscriptions_mutex: Default::default(),
114            }),
115        })
116    }
117
118    pub fn load_oldest_known_handle(&self) -> Option<BlockHandle> {
119        self.inner.oldest_ps_handle.load_full()
120    }
121
122    pub fn oldest_known_handle_changed(&self) -> tokio::sync::futures::Notified<'_> {
123        self.inner.oldest_ps_changed.notified()
124    }
125
126    #[tracing::instrument(skip_all)]
127    pub async fn preload(&self) -> Result<()> {
128        self.preload_handles_queue()?;
129        self.preload_states().await
130    }
131
132    fn preload_handles_queue(&self) -> Result<()> {
133        let this = self.inner.as_ref();
134
135        let block_handles = this.block_handles.as_ref();
136
137        let mut changed = false;
138        let mut prev_utime = 0;
139        for block_id in block_handles.key_blocks_iterator(KeyBlocksDirection::ForwardFrom(0)) {
140            let block_handle = block_handles
141                .load_handle(&block_id)
142                .context("key block handle not found")?;
143
144            let gen_utime = block_handle.gen_utime();
145            if BlockStuff::compute_is_persistent(gen_utime, prev_utime) {
146                prev_utime = gen_utime;
147
148                let mut queue = this.handles_queue.lock();
149                if queue.push(block_handle) {
150                    this.oldest_ps_handle.store(queue.oldest_known().cloned());
151                    changed = true;
152                }
153            }
154        }
155
156        if changed {
157            this.oldest_ps_changed.notify_waiters();
158        }
159        Ok(())
160    }
161
162    async fn preload_states(&self) -> Result<()> {
163        // For each mc_seqno directory
164        let process_states = |this: &Inner, dir: &PathBuf, mc_seqno: u32| -> Result<()> {
165            'outer: for entry in std::fs::read_dir(dir)?.flatten() {
166                let path = entry.path();
167                // Skip subdirectories
168                if path.is_dir() {
169                    tracing::warn!(path = %path.display(), "unexpected directory");
170                    continue;
171                }
172
173                'file: {
174                    // Try to parse the file name as a block_id
175                    let Ok(block_id) = path
176                        // TODO should use file_prefix
177                        .file_stem()
178                        .unwrap_or_default()
179                        .to_str()
180                        .unwrap_or_default()
181                        .parse::<BlockId>()
182                    else {
183                        break 'file;
184                    };
185
186                    let extension = path
187                        .extension()
188                        .and_then(|ext| ext.to_str())
189                        .unwrap_or_default();
190
191                    let Some(cache_type) = PersistentStateKind::from_extension(extension) else {
192                        break 'file;
193                    };
194
195                    this.cache_state(mc_seqno, &block_id, cache_type)?;
196                    continue 'outer;
197                }
198                tracing::warn!(path = %path.display(), "unexpected file");
199            }
200            Ok(())
201        };
202
203        let this = self.inner.clone();
204        let span = tracing::Span::current();
205        tokio::task::spawn_blocking(move || {
206            let _span = span.enter();
207
208            // For each entry in the storage directory
209            'outer: for entry in this.storage_dir.entries()?.flatten() {
210                let path = entry.path();
211                // Skip files
212                if path.is_file() {
213                    tracing::warn!(path = %path.display(), "unexpected file");
214                    continue;
215                }
216
217                'dir: {
218                    // Try to parse the directory name as an mc_seqno
219                    let Ok(name) = entry.file_name().into_string() else {
220                        break 'dir;
221                    };
222                    let Ok(mc_seqno) = name.parse::<u32>() else {
223                        break 'dir;
224                    };
225
226                    // Try to load files in the directory as persistent states
227                    process_states(&this, &path, mc_seqno)?;
228                    continue 'outer;
229                }
230                tracing::warn!(path = %path.display(), "unexpected directory");
231            }
232
233            Ok(())
234        })
235        .await?
236    }
237
238    // NOTE: This is intentionally a method, not a constant because
239    // it might be useful to allow configure it during the first run.
240    pub fn state_chunk_size(&self) -> NonZeroU32 {
241        NonZeroU32::new(STATE_CHUNK_SIZE as _).unwrap()
242    }
243
244    pub fn subscribe(&self) -> (Vec<PersistentState>, PersistentStateReceiver) {
245        let id = RECEIVER_ID.fetch_add(1, Ordering::Relaxed);
246        let (sender, receiver) = mpsc::channel(1);
247
248        // TODO: Hold `_guard` for the whole method body? So that we can know
249        // that no states will be send to subscriptions while we are collecting
250        // the current cache snapshot.
251        {
252            let _guard = self.inner.subscriptions_mutex.lock();
253            let mut subscriptions = self.inner.subscriptions.load_full();
254            {
255                let subscriptions = Arc::make_mut(&mut subscriptions);
256                let prev = subscriptions.insert(id, sender);
257                assert!(
258                    prev.is_none(),
259                    "persistent state subscription must be unique"
260                );
261            }
262            self.inner.subscriptions.store(subscriptions);
263        }
264
265        let receiver = PersistentStateReceiver {
266            id,
267            inner: Arc::downgrade(&self.inner),
268            receiver,
269        };
270
271        let initial_states = self
272            .inner
273            .descriptor_cache
274            .iter()
275            .map(|item| PersistentState {
276                block_id: item.key().block_id,
277                kind: item.key().kind,
278                cached: item.value().clone(),
279            })
280            .collect();
281
282        (initial_states, receiver)
283    }
284
285    pub fn state_exists(&self, block_id: &BlockId, kind: PersistentStateKind) -> bool {
286        self.inner.descriptor_cache.contains_key(&CacheKey {
287            block_id: *block_id,
288            kind,
289        })
290    }
291
292    pub fn get_state_info(
293        &self,
294        block_id: &BlockId,
295        kind: PersistentStateKind,
296    ) -> Option<PersistentStateInfo> {
297        self.inner
298            .descriptor_cache
299            .get(&CacheKey {
300                block_id: *block_id,
301                kind,
302            })
303            .and_then(|cached| {
304                let size = NonZeroU64::new(cached.file.length() as u64)?;
305                Some(PersistentStateInfo {
306                    size,
307                    chunk_size: self.state_chunk_size(),
308                })
309            })
310    }
311
312    pub async fn read_state_part(
313        &self,
314        block_id: &BlockId,
315        offset: u64,
316        state_kind: PersistentStateKind,
317    ) -> Option<Vec<u8>> {
318        // NOTE: Should be noop on x64
319        let offset = usize::try_from(offset).ok()?;
320        let chunk_size = self.state_chunk_size().get() as usize;
321        if offset % chunk_size != 0 {
322            return None;
323        }
324
325        let permit = {
326            let semaphore = self.inner.chunks_semaphore.clone();
327            semaphore.acquire_owned().await.ok()?
328        };
329
330        let key = CacheKey {
331            block_id: *block_id,
332            kind: state_kind,
333        };
334        let cached = self.inner.descriptor_cache.get(&key)?.clone();
335        if offset > cached.file.length() {
336            return None;
337        }
338
339        // NOTE: Cached file is a mapped file, therefore it can take a while to read from it.
340        // NOTE: `spawn_blocking` is called here because it is mostly IO-bound operation.
341        // TODO: Add semaphore to limit the number of concurrent operations.
342        tokio::task::spawn_blocking(move || {
343            // Ensure that permit is dropped only after cached state is used.
344            let _permit = permit;
345
346            let end = std::cmp::min(offset.saturating_add(chunk_size), cached.file.length());
347            cached.file.as_slice()[offset..end].to_vec()
348        })
349        .await
350        .ok()
351    }
352
353    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
354    pub async fn store_shard_state(
355        &self,
356        mc_seqno: u32,
357        handle: &BlockHandle,
358        tracker_handle: RefMcStateHandle,
359    ) -> Result<()> {
360        if self
361            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
362            .await?
363        {
364            return Ok(());
365        }
366
367        let cancelled = CancellationFlag::new();
368        scopeguard::defer! {
369            cancelled.cancel();
370        }
371
372        let handle = handle.clone();
373        let this = self.inner.clone();
374        let cancelled = cancelled.clone();
375        let span = tracing::Span::current();
376
377        let state = tokio::task::spawn_blocking(move || {
378            let _span = span.enter();
379
380            let guard = scopeguard::guard((), |_| {
381                tracing::warn!("cancelled");
382            });
383
384            // NOTE: Ensure that the tracker handle will outlive the state writer.
385            let _tracker_handle = tracker_handle;
386
387            let root_hash = this.shard_states.load_state_root_hash(handle.id())?;
388
389            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
390
391            let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
392            match cell_writer.write(&root_hash, Some(&cancelled)) {
393                Ok(()) => {
394                    this.block_handles.set_has_persistent_shard_state(&handle);
395                    tracing::info!("persistent shard state saved");
396                }
397                Err(e) => {
398                    // NOTE: We are ignoring an error here. It might be intentional
399                    tracing::error!("failed to write persistent shard state: {e:?}");
400                }
401            }
402
403            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
404
405            scopeguard::ScopeGuard::into_inner(guard);
406            Ok::<_, anyhow::Error>(state)
407        })
408        .await??;
409
410        self.notify_with_persistent_state(&state).await;
411        Ok(())
412    }
413
414    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
415    pub async fn store_shard_state_file(
416        &self,
417        mc_seqno: u32,
418        handle: &BlockHandle,
419        file: File,
420    ) -> Result<()> {
421        if self
422            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
423            .await?
424        {
425            return Ok(());
426        }
427
428        let cancelled = CancellationFlag::new();
429        scopeguard::defer! {
430            cancelled.cancel();
431        }
432
433        let handle = handle.clone();
434        let this = self.inner.clone();
435        let cancelled = cancelled.clone();
436        let span = tracing::Span::current();
437
438        let state = tokio::task::spawn_blocking(move || {
439            let _span = span.enter();
440
441            let guard = scopeguard::guard((), |_| {
442                tracing::warn!("cancelled");
443            });
444
445            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
446
447            let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
448            cell_writer.write_file(file, Some(&cancelled))?;
449            this.block_handles.set_has_persistent_shard_state(&handle);
450            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
451
452            scopeguard::ScopeGuard::into_inner(guard);
453            Ok::<_, anyhow::Error>(state)
454        })
455        .await??;
456
457        self.notify_with_persistent_state(&state).await;
458        Ok(())
459    }
460
461    #[tracing::instrument(skip_all, fields(mc_seqno = mc_seqno, block_id = %block.id()))]
462    pub async fn store_queue_state(
463        &self,
464        mc_seqno: u32,
465        handle: &BlockHandle,
466        block: BlockStuff,
467    ) -> Result<()> {
468        if self
469            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
470            .await?
471        {
472            return Ok(());
473        }
474
475        let this = self.inner.clone();
476
477        let shard_ident = handle.id().shard;
478
479        let mut queue_diffs = Vec::new();
480        let mut messages = Vec::new();
481
482        let mut top_block_handle = handle.clone();
483        let mut top_block = block;
484
485        let mut tail_len = top_block.block().out_msg_queue_updates.tail_len as usize;
486
487        while tail_len > 0 {
488            let queue_diff = this.blocks.load_queue_diff(&top_block_handle).await?;
489            let top_block_info = top_block.load_info()?;
490
491            let block_extra = top_block.load_extra()?;
492            let out_messages = block_extra.load_out_msg_description()?;
493
494            messages.push(queue_diff.zip(&out_messages));
495            queue_diffs.push(queue_diff.diff().clone());
496
497            if tail_len == 1 {
498                break;
499            }
500
501            let prev_block_id = match top_block_info.load_prev_ref()? {
502                PrevBlockRef::Single(block_ref) => block_ref.as_block_id(shard_ident),
503                PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"),
504            };
505
506            let Some(prev_block_handle) = this.block_handles.load_handle(&prev_block_id) else {
507                anyhow::bail!("prev block handle not found for: {prev_block_id}");
508            };
509            let prev_block = this.blocks.load_block_data(&prev_block_handle).await?;
510
511            top_block_handle = prev_block_handle;
512            top_block = prev_block;
513            tail_len -= 1;
514        }
515
516        let state = QueueStateHeader {
517            shard_ident,
518            seqno: handle.id().seqno,
519            queue_diffs,
520        };
521
522        let cancelled = CancellationFlag::new();
523        scopeguard::defer! {
524            cancelled.cancel();
525        }
526
527        let handle = handle.clone();
528        let cancelled = cancelled.clone();
529        let span = tracing::Span::current();
530
531        let state = tokio::task::spawn_blocking(move || {
532            let _span = span.enter();
533
534            let guard = scopeguard::guard((), |_| {
535                tracing::warn!("cancelled");
536            });
537
538            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
539            match QueueStateWriter::new(&states_dir, handle.id(), state, messages)
540                .write(Some(&cancelled))
541            {
542                Ok(()) => {
543                    this.block_handles.set_has_persistent_queue_state(&handle);
544                    tracing::info!("persistent queue state saved");
545                }
546                Err(e) => {
547                    tracing::error!("failed to write persistent queue state: {e:?}");
548                }
549            }
550
551            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
552
553            scopeguard::ScopeGuard::into_inner(guard);
554            Ok::<_, anyhow::Error>(state)
555        })
556        .await??;
557
558        self.notify_with_persistent_state(&state).await;
559        Ok(())
560    }
561
562    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
563    pub async fn store_queue_state_file(
564        &self,
565        mc_seqno: u32,
566        handle: &BlockHandle,
567        file: File,
568    ) -> Result<()> {
569        if self
570            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
571            .await?
572        {
573            return Ok(());
574        }
575
576        let cancelled = CancellationFlag::new();
577        scopeguard::defer! {
578            cancelled.cancel();
579        }
580
581        let handle = handle.clone();
582        let this = self.inner.clone();
583        let cancelled = cancelled.clone();
584        let span = tracing::Span::current();
585
586        let state = tokio::task::spawn_blocking(move || {
587            let _span = span.enter();
588
589            let guard = scopeguard::guard((), |_| {
590                tracing::warn!("cancelled");
591            });
592
593            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
594
595            QueueStateWriter::write_file(&states_dir, handle.id(), file, Some(&cancelled))?;
596            this.block_handles.set_has_persistent_queue_state(&handle);
597            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
598
599            scopeguard::ScopeGuard::into_inner(guard);
600            Ok::<_, anyhow::Error>(state)
601        })
602        .await??;
603
604        self.notify_with_persistent_state(&state).await;
605        Ok(())
606    }
607
608    pub async fn rotate_persistent_states(&self, top_handle: &BlockHandle) -> Result<()> {
609        anyhow::ensure!(
610            top_handle.is_masterchain(),
611            "top persistent state handle must be in the masterchain"
612        );
613
614        {
615            tracing::info!(
616                mc_block_id = %top_handle.id(),
617                "adding new persistent state to the queue"
618            );
619
620            let mut queue = self.inner.handles_queue.lock();
621            if queue.push(top_handle.clone()) {
622                self.inner
623                    .oldest_ps_handle
624                    .store(queue.oldest_known().cloned());
625                self.inner.oldest_ps_changed.notify_waiters();
626            }
627        }
628
629        tracing::info!("started clearing old persistent state directories");
630        let start = Instant::now();
631        scopeguard::defer! {
632            tracing::info!(
633                elapsed = %humantime::format_duration(start.elapsed()),
634                "clearing old persistent state directories completed"
635            );
636        }
637
638        let this = self.inner.clone();
639        let mut top_handle = top_handle.clone();
640        if top_handle.id().seqno == 0 {
641            // Nothing to clear for the zerostate
642            return Ok(());
643        }
644
645        let span = tracing::Span::current();
646        tokio::task::spawn_blocking(move || {
647            let _span = span.enter();
648
649            let block_handles = &this.block_handles;
650
651            let now_utime = top_handle.gen_utime();
652
653            // Find a state before the
654            let mut has_suitable = false;
655            loop {
656                match block_handles.find_prev_persistent_key_block(top_handle.id().seqno) {
657                    // Find the newest usable persistent state...
658                    Some(handle) if !has_suitable => {
659                        has_suitable |= BlockStuff::can_use_for_boot(handle.gen_utime(), now_utime);
660                        top_handle = handle;
661                    }
662                    // ...and return the previous one.
663                    Some(handle) => {
664                        top_handle = handle;
665                        break;
666                    }
667                    // Or do nothing if not found.
668                    None => return Ok(()),
669                }
670            }
671
672            // Remove cached states
673            let mut index = this.mc_seqno_to_block_ids.lock();
674            index.retain(|&mc_seqno, block_ids| {
675                if mc_seqno >= top_handle.id().seqno || mc_seqno == 0 {
676                    return true;
677                }
678
679                for block_id in block_ids.drain() {
680                    // TODO: Clear flag in block handle
681                    this.clear_cache(&block_id);
682                }
683                false
684            });
685
686            // Remove files
687            this.clear_outdated_state_entries(top_handle.id())
688        })
689        .await?
690    }
691
692    async fn try_reuse_persistent_state(
693        &self,
694        mc_seqno: u32,
695        handle: &BlockHandle,
696        kind: PersistentStateKind,
697    ) -> Result<bool> {
698        // Check if there is anything to reuse (return false if nothing)
699        match kind {
700            PersistentStateKind::Shard if !handle.has_persistent_shard_state() => return Ok(false),
701            PersistentStateKind::Queue if !handle.has_persistent_queue_state() => return Ok(false),
702            _ => {}
703        }
704
705        let block_id = *handle.id();
706
707        let Some(cached) = self
708            .inner
709            .descriptor_cache
710            .get(&CacheKey { block_id, kind })
711            .map(|r| r.clone())
712        else {
713            // Nothing to reuse
714            return Ok(false);
715        };
716
717        if cached.mc_seqno >= mc_seqno {
718            // We already have the recent enough state
719            return Ok(true);
720        }
721
722        let this = self.inner.clone();
723
724        let span = tracing::Span::current();
725        let state = tokio::task::spawn_blocking(move || {
726            let _span = span.enter();
727
728            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
729
730            let temp_file = states_dir.file(kind.make_temp_file_name(&block_id));
731            std::fs::write(temp_file.path(), cached.file.as_slice())?;
732            temp_file.rename(kind.make_file_name(&block_id))?;
733
734            drop(cached);
735
736            this.cache_state(mc_seqno, &block_id, kind)
737        })
738        .await??;
739
740        self.notify_with_persistent_state(&state).await;
741        Ok(true)
742    }
743
744    async fn notify_with_persistent_state(&self, state: &PersistentState) {
745        let subscriptions = self.inner.subscriptions.load_full();
746        for sender in subscriptions.values() {
747            sender.send(state.clone()).await.ok();
748        }
749    }
750}
751
752struct Inner {
753    cells_db: CellsDb,
754    storage_dir: Dir,
755    block_handles: Arc<BlockHandleStorage>,
756    blocks: Arc<BlockStorage>,
757    shard_states: Arc<ShardStateStorage>,
758    descriptor_cache: DashMap<CacheKey, Arc<CachedState>>,
759    mc_seqno_to_block_ids: Mutex<BTreeMap<u32, FastHashSet<BlockId>>>,
760    chunks_semaphore: Arc<Semaphore>,
761    handles_queue: Mutex<HandlesQueue>,
762    oldest_ps_changed: Notify,
763    oldest_ps_handle: ArcSwapAny<Option<BlockHandle>>,
764    subscriptions: ArcSwap<FastHashMap<usize, mpsc::Sender<PersistentState>>>,
765    subscriptions_mutex: Mutex<()>,
766}
767
768impl Inner {
769    fn prepare_persistent_states_dir(&self, mc_seqno: u32) -> Result<Dir> {
770        let states_dir = self.mc_states_dir(mc_seqno);
771        if !states_dir.path().is_dir() {
772            tracing::info!(mc_seqno, "creating persistent state directory");
773            states_dir.create_if_not_exists()?;
774        }
775        Ok(states_dir)
776    }
777
778    fn mc_states_dir(&self, mc_seqno: u32) -> Dir {
779        Dir::new_readonly(self.storage_dir.path().join(mc_seqno.to_string()))
780    }
781
782    fn clear_outdated_state_entries(&self, recent_block_id: &BlockId) -> Result<()> {
783        let mut directories_to_remove: Vec<PathBuf> = Vec::new();
784        let mut files_to_remove: Vec<PathBuf> = Vec::new();
785
786        for entry in self.storage_dir.entries()?.flatten() {
787            let path = entry.path();
788
789            if path.is_file() {
790                files_to_remove.push(path);
791                continue;
792            }
793
794            let Ok(name) = entry.file_name().into_string() else {
795                directories_to_remove.push(path);
796                continue;
797            };
798
799            let is_recent = matches!(
800                name.parse::<u32>(),
801                Ok(seqno) if seqno >= recent_block_id.seqno || seqno == 0
802            );
803            if !is_recent {
804                directories_to_remove.push(path);
805            }
806        }
807
808        for dir in directories_to_remove {
809            tracing::info!(dir = %dir.display(), "removing an old persistent state directory");
810            if let Err(e) = std::fs::remove_dir_all(&dir) {
811                tracing::error!(dir = %dir.display(), "failed to remove an old persistent state: {e:?}");
812            }
813        }
814
815        for file in files_to_remove {
816            tracing::info!(file = %file.display(), "removing file");
817            if let Err(e) = std::fs::remove_file(&file) {
818                tracing::error!(file = %file.display(), "failed to remove file: {e:?}");
819            }
820        }
821
822        Ok(())
823    }
824
825    fn cache_state(
826        &self,
827        mc_seqno: u32,
828        block_id: &BlockId,
829        kind: PersistentStateKind,
830    ) -> Result<PersistentState> {
831        use std::collections::btree_map;
832
833        use dashmap::mapref::entry::Entry;
834
835        let key = CacheKey {
836            block_id: *block_id,
837            kind,
838        };
839
840        let load_mapped = || {
841            let mut file = self
842                .mc_states_dir(mc_seqno)
843                .file(kind.make_file_name(block_id))
844                .read(true)
845                .open()?;
846
847            // We create a copy of the original file here to make sure
848            // that the underlying mapped file will not be changed outside
849            // of the node. Otherwise it will randomly fail with exit code 7/BUS.
850            let mut temp_file = tempfile::tempfile_in(self.storage_dir.path())
851                .context("failed to create a temp file")?;
852
853            // Underlying implementation will call something like `copy_file_range`,
854            // and we hope that it will be just COW pages.
855            // TODO: Find a way to cancel this operation.
856            std::io::copy(&mut file, &mut temp_file).context("failed to copy a temp file")?;
857            temp_file.flush()?;
858            temp_file.seek(std::io::SeekFrom::Start(0))?;
859
860            MappedFile::from_existing_file(temp_file).context("failed to map a temp file")
861        };
862
863        let file =
864            load_mapped().with_context(|| format!("failed to cache {kind:?} for {block_id}"))?;
865
866        let new_state = Arc::new(CachedState { mc_seqno, file });
867
868        let prev_mc_seqno = match self.descriptor_cache.entry(key) {
869            Entry::Vacant(entry) => {
870                entry.insert(new_state.clone());
871                None
872            }
873            Entry::Occupied(mut entry) => {
874                let prev_mc_seqno = entry.get().mc_seqno;
875                if mc_seqno <= prev_mc_seqno {
876                    // Cache only the most recent block (if changed)
877                    return Ok(PersistentState {
878                        block_id: *block_id,
879                        kind,
880                        cached: entry.get().clone(),
881                    });
882                }
883
884                entry.insert(new_state.clone());
885                Some(prev_mc_seqno)
886            }
887        };
888
889        let mut index = self.mc_seqno_to_block_ids.lock();
890
891        // Remove previous entry if exists
892        if let Some(prev_mc_seqno) = prev_mc_seqno
893            && let btree_map::Entry::Occupied(mut entry) = index.entry(prev_mc_seqno)
894        {
895            entry.get_mut().remove(block_id);
896            if entry.get().is_empty() {
897                entry.remove();
898            }
899        }
900
901        index.entry(mc_seqno).or_default().insert(*block_id);
902
903        Ok(PersistentState {
904            block_id: *block_id,
905            kind,
906            cached: new_state,
907        })
908    }
909
910    fn clear_cache(&self, block_id: &BlockId) {
911        self.descriptor_cache.remove(&CacheKey {
912            block_id: *block_id,
913            kind: PersistentStateKind::Shard,
914        });
915        self.descriptor_cache.remove(&CacheKey {
916            block_id: *block_id,
917            kind: PersistentStateKind::Queue,
918        });
919    }
920}
921
/// Size description of a persistent state.
///
/// Both fields use `NonZero*` types, so neither value can be zero.
///
/// NOTE(review): the units are not visible in this part of the file — `size`
/// is presumably the total length in bytes and `chunk_size` the length of one
/// chunk; confirm at the place where this struct is constructed.
#[derive(Debug, Clone, Copy)]
pub struct PersistentStateInfo {
    /// Total size of the state (never zero).
    pub size: NonZeroU64,
    /// Size of a single chunk (never zero).
    pub chunk_size: NonZeroU32,
}
927
/// A persistent state of a given kind for a given block, backed by a shared
/// cache entry.
///
/// Cloning copies the block id and kind and clones the `Arc`, so all clones
/// refer to the same `CachedState`.
#[derive(Clone)]
pub struct PersistentState {
    /// Id of the block this state belongs to.
    block_id: BlockId,
    /// Which kind of state this is (shard or queue).
    kind: PersistentStateKind,
    /// Shared cache entry holding the mapped file and its `mc_seqno`.
    cached: Arc<CachedState>,
}
934
impl PersistentState {
    /// Returns the id of the block this state belongs to.
    pub fn block_id(&self) -> &BlockId {
        &self.block_id
    }

    /// Returns the kind of this state.
    pub fn kind(&self) -> PersistentStateKind {
        self.kind
    }

    /// Returns the mapped file held by the shared cache entry.
    pub fn file(&self) -> &MappedFile {
        &self.cached.file
    }

    /// Returns the `mc_seqno` recorded in the shared cache entry.
    pub fn mc_seqno(&self) -> u32 {
        self.cached.mc_seqno
    }
}
952
/// Receiving side of a persistent state subscription.
///
/// Wraps an `mpsc::Receiver<PersistentState>` (reachable through
/// `Deref`/`DerefMut`) and removes its own entry from `Inner::subscriptions`
/// when dropped.
pub struct PersistentStateReceiver {
    /// Key under which this receiver is removed from `Inner::subscriptions` on drop.
    id: usize,
    /// Weak link to the shared state; if it can no longer be upgraded,
    /// dropping the receiver does nothing extra.
    inner: Weak<Inner>,
    /// The wrapped channel receiver.
    receiver: mpsc::Receiver<PersistentState>,
}
958
/// Exposes the wrapped channel receiver so that its `&self` methods can be
/// called directly on a `PersistentStateReceiver`.
impl std::ops::Deref for PersistentStateReceiver {
    type Target = mpsc::Receiver<PersistentState>;

    /// Returns a shared reference to the wrapped receiver.
    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.receiver
    }
}
967
/// Exposes the wrapped channel receiver mutably, so that its `&mut self`
/// methods can be called directly on a `PersistentStateReceiver`.
impl std::ops::DerefMut for PersistentStateReceiver {
    /// Returns an exclusive reference to the wrapped receiver.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.receiver
    }
}
974
975impl Drop for PersistentStateReceiver {
976    fn drop(&mut self) {
977        if let Some(inner) = self.inner.upgrade() {
978            let _guard = inner.subscriptions_mutex.lock();
979            let mut subscriptions = inner.subscriptions.load_full();
980            {
981                let subscriptions = Arc::make_mut(&mut subscriptions);
982                subscriptions.remove(&self.id);
983            }
984            inner.subscriptions.store(subscriptions);
985        }
986    }
987}
988
/// Process-wide atomic counter, initialised to zero.
///
/// NOTE(review): presumably the source of `PersistentStateReceiver::id`
/// values; the code that advances it is outside this part of the file.
static RECEIVER_ID: AtomicUsize = AtomicUsize::new(0);
990
/// Entry stored (behind an `Arc`) in the descriptor cache and shared by the
/// `PersistentState` values created from it.
struct CachedState {
    /// The `mc_seqno` this entry was cached under.
    mc_seqno: u32,
    /// Memory-mapped file with the state data. The caching code maps a
    /// temporary copy of the state file rather than the original.
    file: MappedFile,
}
995
996#[derive(Default)]
997struct HandlesQueue {
998    handles: VecDeque<BlockHandle>,
999}
1000
1001impl HandlesQueue {
1002    fn oldest_known(&self) -> Option<&BlockHandle> {
1003        self.handles.back()
1004    }
1005
1006    fn push(&mut self, new_handle: BlockHandle) -> bool {
1007        // Allow only new blocks
1008        if let Some(newest) = self.handles.front()
1009            && newest.id().seqno >= new_handle.id().seqno
1010        {
1011            return false;
1012        }
1013
1014        // Remove too old states
1015        let now_utime = new_handle.gen_utime();
1016        let mut has_suitable = false;
1017        self.handles.retain(|old_handle| {
1018            if !has_suitable {
1019                has_suitable |= BlockStuff::can_use_for_boot(old_handle.gen_utime(), now_utime);
1020                true
1021            } else {
1022                false
1023            }
1024        });
1025
1026        // Add the new one
1027        self.handles.push_front(new_handle);
1028        true
1029    }
1030}
1031
/// Chunk size constant, `1024 * 1024` (1 MiB when counted in bytes).
///
/// NOTE(review): presumably the size of one persistent state chunk; its
/// usages are outside this part of the file.
const STATE_CHUNK_SIZE: u64 = 1024 * 1024; // 1 MiB