tycho_core/storage/persistent_state/
mod.rs

1use std::collections::{BTreeMap, VecDeque};
2use std::fs::File;
3use std::io::{Seek, Write};
4use std::num::{NonZeroU32, NonZeroU64};
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, Weak};
8
9use anyhow::{Context, Result};
10use arc_swap::{ArcSwap, ArcSwapAny};
11use dashmap::DashMap;
12use parking_lot::Mutex;
13use tokio::sync::{Notify, Semaphore, mpsc};
14use tokio::time::Instant;
15use tycho_block_util::block::BlockStuff;
16use tycho_block_util::queue::QueueStateHeader;
17use tycho_block_util::state::RefMcStateHandle;
18use tycho_storage::fs::{Dir, MappedFile};
19use tycho_types::models::{BlockId, PrevBlockRef};
20use tycho_util::sync::CancellationFlag;
21use tycho_util::{FastHashMap, FastHashSet};
22
23pub use self::queue_state::reader::{QueueDiffReader, QueueStateReader};
24pub use self::queue_state::writer::QueueStateWriter;
25pub use self::shard_state::reader::{BriefBocHeader, ShardStateReader};
26pub use self::shard_state::writer::ShardStateWriter;
27use super::{
28    BlockHandle, BlockHandleStorage, BlockStorage, CellsDb, KeyBlocksDirection, ShardStateStorage,
29};
30
31mod queue_state {
32    pub mod reader;
33    pub mod writer;
34}
35mod shard_state {
36    pub mod reader;
37    pub mod writer;
38}
39
40#[cfg(test)]
41mod tests;
42
/// Name of the subdirectory that `PersistentStateStorage::new` creates inside
/// the provided files directory; all persistent state files live under it.
const BASE_DIR: &str = "states";
44
/// Kind of a persistent state file managed by this storage.
///
/// The kind selects which writer (`ShardStateWriter` or `QueueStateWriter`)
/// provides the file names and the file extension.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PersistentStateKind {
    /// State whose files are named by `ShardStateWriter`.
    Shard,
    /// State whose files are named by `QueueStateWriter`.
    Queue,
}
50
51impl PersistentStateKind {
52    pub fn make_file_name(&self, block_id: &BlockId) -> PathBuf {
53        match self {
54            Self::Shard => ShardStateWriter::file_name(block_id),
55            Self::Queue => QueueStateWriter::file_name(block_id),
56        }
57    }
58
59    pub fn make_temp_file_name(&self, block_id: &BlockId) -> PathBuf {
60        match self {
61            Self::Shard => ShardStateWriter::temp_file_name(block_id),
62            Self::Queue => QueueStateWriter::temp_file_name(block_id),
63        }
64    }
65
66    pub fn from_extension(extension: &str) -> Option<Self> {
67        match extension {
68            ShardStateWriter::FILE_EXTENSION => Some(Self::Shard),
69            QueueStateWriter::FILE_EXTENSION => Some(Self::Queue),
70            _ => None,
71        }
72    }
73}
74
/// Key of the descriptor cache: one entry per block id and state kind.
#[derive(Debug, Eq, Hash, PartialEq)]
struct CacheKey {
    /// Block the cached state belongs to.
    block_id: BlockId,
    /// Which kind of state (shard or queue) is cached for that block.
    kind: PersistentStateKind,
}
80
/// Storage of persistent state files and their in-memory descriptors.
///
/// Cloning is cheap: all clones share the same `Inner` through an `Arc`.
#[derive(Clone)]
pub struct PersistentStateStorage {
    /// Shared state of the storage.
    inner: Arc<Inner>,
}
85
86impl PersistentStateStorage {
87    pub fn new(
88        cells_db: CellsDb,
89        files_dir: &Dir,
90        block_handle_storage: Arc<BlockHandleStorage>,
91        block_storage: Arc<BlockStorage>,
92        shard_state_storage: Arc<ShardStateStorage>,
93    ) -> Result<Self> {
94        const MAX_PARALLEL_CHUNK_READS: usize = 20;
95
96        let storage_dir = files_dir.create_subdir(BASE_DIR)?;
97
98        Ok(Self {
99            inner: Arc::new(Inner {
100                cells_db,
101                storage_dir,
102                block_handles: block_handle_storage,
103                blocks: block_storage,
104                shard_states: shard_state_storage,
105                descriptor_cache: Default::default(),
106                mc_seqno_to_block_ids: Default::default(),
107                chunks_semaphore: Arc::new(Semaphore::new(MAX_PARALLEL_CHUNK_READS)),
108                handles_queue: Default::default(),
109                oldest_ps_changed: Default::default(),
110                oldest_ps_handle: Default::default(),
111                subscriptions: Default::default(),
112                subscriptions_mutex: Default::default(),
113            }),
114        })
115    }
116
117    pub fn load_oldest_known_handle(&self) -> Option<BlockHandle> {
118        self.inner.oldest_ps_handle.load_full()
119    }
120
121    pub fn oldest_known_handle_changed(&self) -> tokio::sync::futures::Notified<'_> {
122        self.inner.oldest_ps_changed.notified()
123    }
124
125    #[tracing::instrument(skip_all)]
126    pub async fn preload(&self) -> Result<()> {
127        self.preload_handles_queue()?;
128        self.preload_states().await
129    }
130
131    fn preload_handles_queue(&self) -> Result<()> {
132        let this = self.inner.as_ref();
133
134        let block_handles = this.block_handles.as_ref();
135
136        let mut changed = false;
137        let mut prev_utime = 0;
138        for block_id in block_handles.key_blocks_iterator(KeyBlocksDirection::ForwardFrom(0)) {
139            let block_handle = block_handles
140                .load_handle(&block_id)
141                .context("key block handle not found")?;
142
143            let gen_utime = block_handle.gen_utime();
144            if BlockStuff::compute_is_persistent(gen_utime, prev_utime) {
145                prev_utime = gen_utime;
146
147                let mut queue = this.handles_queue.lock();
148                if queue.push(block_handle) {
149                    this.oldest_ps_handle.store(queue.oldest_known().cloned());
150                    changed = true;
151                }
152            }
153        }
154
155        if changed {
156            this.oldest_ps_changed.notify_waiters();
157        }
158        Ok(())
159    }
160
161    async fn preload_states(&self) -> Result<()> {
162        // For each mc_seqno directory
163        let process_states = |this: &Inner, dir: &PathBuf, mc_seqno: u32| -> Result<()> {
164            'outer: for entry in std::fs::read_dir(dir)?.flatten() {
165                let path = entry.path();
166                // Skip subdirectories
167                if path.is_dir() {
168                    tracing::warn!(path = %path.display(), "unexpected directory");
169                    continue;
170                }
171
172                'file: {
173                    // Try to parse the file name as a block_id
174                    let Ok(block_id) = path
175                        // TODO should use file_prefix
176                        .file_stem()
177                        .unwrap_or_default()
178                        .to_str()
179                        .unwrap_or_default()
180                        .parse::<BlockId>()
181                    else {
182                        break 'file;
183                    };
184
185                    let extension = path
186                        .extension()
187                        .and_then(|ext| ext.to_str())
188                        .unwrap_or_default();
189
190                    let Some(cache_type) = PersistentStateKind::from_extension(extension) else {
191                        break 'file;
192                    };
193
194                    this.cache_state(mc_seqno, &block_id, cache_type)?;
195                    continue 'outer;
196                }
197                tracing::warn!(path = %path.display(), "unexpected file");
198            }
199            Ok(())
200        };
201
202        let this = self.inner.clone();
203        let span = tracing::Span::current();
204        tokio::task::spawn_blocking(move || {
205            let _span = span.enter();
206
207            // For each entry in the storage directory
208            'outer: for entry in this.storage_dir.entries()?.flatten() {
209                let path = entry.path();
210                // Skip files
211                if path.is_file() {
212                    tracing::warn!(path = %path.display(), "unexpected file");
213                    continue;
214                }
215
216                'dir: {
217                    // Try to parse the directory name as an mc_seqno
218                    let Ok(name) = entry.file_name().into_string() else {
219                        break 'dir;
220                    };
221                    let Ok(mc_seqno) = name.parse::<u32>() else {
222                        break 'dir;
223                    };
224
225                    // Try to load files in the directory as persistent states
226                    process_states(&this, &path, mc_seqno)?;
227                    continue 'outer;
228                }
229                tracing::warn!(path = %path.display(), "unexpected directory");
230            }
231
232            Ok(())
233        })
234        .await?
235    }
236
237    // NOTE: This is intentionally a method, not a constant because
238    // it might be useful to allow configure it during the first run.
239    pub fn state_chunk_size(&self) -> NonZeroU32 {
240        NonZeroU32::new(STATE_CHUNK_SIZE as _).unwrap()
241    }
242
243    pub fn subscribe(&self) -> (Vec<PersistentState>, PersistentStateReceiver) {
244        let id = RECEIVER_ID.fetch_add(1, Ordering::Relaxed);
245        let (sender, receiver) = mpsc::channel(1);
246
247        // TODO: Hold `_guard` for the whole method body? So that we can know
248        // that no states will be send to subscriptions while we are collecting
249        // the current cache snapshot.
250        {
251            let _guard = self.inner.subscriptions_mutex.lock();
252            let mut subscriptions = self.inner.subscriptions.load_full();
253            {
254                let subscriptions = Arc::make_mut(&mut subscriptions);
255                let prev = subscriptions.insert(id, sender);
256                assert!(
257                    prev.is_none(),
258                    "persistent state subscription must be unique"
259                );
260            }
261            self.inner.subscriptions.store(subscriptions);
262        }
263
264        let receiver = PersistentStateReceiver {
265            id,
266            inner: Arc::downgrade(&self.inner),
267            receiver,
268        };
269
270        let initial_states = self
271            .inner
272            .descriptor_cache
273            .iter()
274            .map(|item| PersistentState {
275                block_id: item.key().block_id,
276                kind: item.key().kind,
277                cached: item.value().clone(),
278            })
279            .collect();
280
281        (initial_states, receiver)
282    }
283
284    pub fn state_exists(&self, block_id: &BlockId, kind: PersistentStateKind) -> bool {
285        self.inner.descriptor_cache.contains_key(&CacheKey {
286            block_id: *block_id,
287            kind,
288        })
289    }
290
291    pub fn get_state_info(
292        &self,
293        block_id: &BlockId,
294        kind: PersistentStateKind,
295    ) -> Option<PersistentStateInfo> {
296        self.inner
297            .descriptor_cache
298            .get(&CacheKey {
299                block_id: *block_id,
300                kind,
301            })
302            .and_then(|cached| {
303                let size = NonZeroU64::new(cached.file.length() as u64)?;
304                Some(PersistentStateInfo {
305                    size,
306                    chunk_size: self.state_chunk_size(),
307                })
308            })
309    }
310
311    pub async fn read_state_part(
312        &self,
313        block_id: &BlockId,
314        offset: u64,
315        state_kind: PersistentStateKind,
316    ) -> Option<Vec<u8>> {
317        // NOTE: Should be noop on x64
318        let offset = usize::try_from(offset).ok()?;
319        let chunk_size = self.state_chunk_size().get() as usize;
320        if offset % chunk_size != 0 {
321            return None;
322        }
323
324        let permit = {
325            let semaphore = self.inner.chunks_semaphore.clone();
326            semaphore.acquire_owned().await.ok()?
327        };
328
329        let key = CacheKey {
330            block_id: *block_id,
331            kind: state_kind,
332        };
333        let cached = self.inner.descriptor_cache.get(&key)?.clone();
334        if offset > cached.file.length() {
335            return None;
336        }
337
338        // NOTE: Cached file is a mapped file, therefore it can take a while to read from it.
339        // NOTE: `spawn_blocking` is called here because it is mostly IO-bound operation.
340        // TODO: Add semaphore to limit the number of concurrent operations.
341        tokio::task::spawn_blocking(move || {
342            // Ensure that permit is dropped only after cached state is used.
343            let _permit = permit;
344
345            let end = std::cmp::min(offset.saturating_add(chunk_size), cached.file.length());
346            cached.file.as_slice()[offset..end].to_vec()
347        })
348        .await
349        .ok()
350    }
351
    /// Writes the persistent shard state of `handle` into the `mc_seqno`
    /// directory, caches the resulting file and notifies subscribers.
    ///
    /// Returns early if an already cached state could be reused.
    /// `tracker_handle` is moved into the blocking task and held there until
    /// the task finishes.
    ///
    /// # Errors
    ///
    /// Fails if the reuse attempt fails, the state root hash cannot be
    /// loaded, the directory cannot be prepared, the file cannot be cached,
    /// or the blocking task cannot be joined. A failed write is only logged.
    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
    pub async fn store_shard_state(
        &self,
        mc_seqno: u32,
        handle: &BlockHandle,
        tracker_handle: RefMcStateHandle,
    ) -> Result<()> {
        if self
            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
            .await?
        {
            return Ok(());
        }

        // The flag is set when this scope exits for any reason, including the
        // future being dropped; presumably it lets the writer stop early.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        // Owned copies that are moved into the blocking task.
        let handle = handle.clone();
        let this = self.inner.clone();
        let cancelled = cancelled.clone();
        let span = tracing::Span::current();

        let state = tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            // Logs a warning on every exit path that does not defuse the guard
            // below, i.e. also on early returns caused by `?`.
            let guard = scopeguard::guard((), |_| {
                tracing::warn!("cancelled");
            });

            // NOTE: Ensure that the tracker handle will outlive the state writer.
            let _tracker_handle = tracker_handle;

            let root_hash = this.shard_states.load_state_root_hash(handle.id())?;

            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;

            let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
            match cell_writer.write(&root_hash, Some(&cancelled)) {
                Ok(()) => {
                    // Mark the handle only after a successful write.
                    this.block_handles.set_has_persistent_shard_state(&handle);
                    tracing::info!("persistent shard state saved");
                }
                Err(e) => {
                    // NOTE: We are ignoring an error here. It might be intentional
                    tracing::error!("failed to write persistent shard state: {e:?}");
                }
            }

            // Runs even after a failed write; it then reports its own error if
            // the state file cannot be opened.
            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;

            // Success: defuse the warning guard.
            scopeguard::ScopeGuard::into_inner(guard);
            Ok::<_, anyhow::Error>(state)
        })
        .await??;

        self.notify_with_persistent_state(&state).await;
        Ok(())
    }
412
413    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
414    pub async fn store_shard_state_file(
415        &self,
416        mc_seqno: u32,
417        handle: &BlockHandle,
418        file: File,
419    ) -> Result<()> {
420        if self
421            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Shard)
422            .await?
423        {
424            return Ok(());
425        }
426
427        let cancelled = CancellationFlag::new();
428        scopeguard::defer! {
429            cancelled.cancel();
430        }
431
432        let handle = handle.clone();
433        let this = self.inner.clone();
434        let cancelled = cancelled.clone();
435        let span = tracing::Span::current();
436
437        let state = tokio::task::spawn_blocking(move || {
438            let _span = span.enter();
439
440            let guard = scopeguard::guard((), |_| {
441                tracing::warn!("cancelled");
442            });
443
444            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
445
446            let cell_writer = ShardStateWriter::new(&this.cells_db, &states_dir, handle.id());
447            cell_writer.write_file(file, Some(&cancelled))?;
448            this.block_handles.set_has_persistent_shard_state(&handle);
449            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Shard)?;
450
451            scopeguard::ScopeGuard::into_inner(guard);
452            Ok::<_, anyhow::Error>(state)
453        })
454        .await??;
455
456        self.notify_with_persistent_state(&state).await;
457        Ok(())
458    }
459
    /// Builds and writes the persistent queue state for `handle`, caches the
    /// resulting file and notifies subscribers.
    ///
    /// Starting from `block`, walks back over `tail_len` blocks (taken from
    /// the block's `out_msg_queue_updates`) and collects each block's queue
    /// diff together with its outgoing messages.
    ///
    /// Returns early if an already cached state could be reused.
    ///
    /// # Errors
    ///
    /// Fails if the reuse attempt fails, any block data cannot be loaded, a
    /// previous block is a merge (not supported yet) or has no handle, the
    /// directory cannot be prepared, the file cannot be cached, or the
    /// blocking task cannot be joined. A failed write is only logged.
    #[tracing::instrument(skip_all, fields(mc_seqno = mc_seqno, block_id = %block.id()))]
    pub async fn store_queue_state(
        &self,
        mc_seqno: u32,
        handle: &BlockHandle,
        block: BlockStuff,
    ) -> Result<()> {
        if self
            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
            .await?
        {
            return Ok(());
        }

        let this = self.inner.clone();

        let shard_ident = handle.id().shard;

        // Collected newest-first: index 0 corresponds to `handle` itself.
        let mut queue_diffs = Vec::new();
        let mut messages = Vec::new();

        let mut top_block_handle = handle.clone();
        let mut top_block = block;

        // Number of blocks (including the top one) whose diffs are collected.
        let mut tail_len = top_block.block().out_msg_queue_updates.tail_len as usize;

        while tail_len > 0 {
            let queue_diff = this.blocks.load_queue_diff(&top_block_handle).await?;
            let top_block_info = top_block.load_info()?;

            let block_extra = top_block.load_extra()?;
            let out_messages = block_extra.load_out_msg_description()?;

            messages.push(queue_diff.zip(&out_messages));
            queue_diffs.push(queue_diff.diff().clone());

            // The last required block was just processed; its predecessor
            // does not need to be loaded.
            if tail_len == 1 {
                break;
            }

            let prev_block_id = match top_block_info.load_prev_ref()? {
                PrevBlockRef::Single(block_ref) => block_ref.as_block_id(shard_ident),
                PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"),
            };

            let Some(prev_block_handle) = this.block_handles.load_handle(&prev_block_id) else {
                anyhow::bail!("prev block handle not found for: {prev_block_id}");
            };
            let prev_block = this.blocks.load_block_data(&prev_block_handle).await?;

            // Step back to the previous block.
            top_block_handle = prev_block_handle;
            top_block = prev_block;
            tail_len -= 1;
        }

        let state = QueueStateHeader {
            shard_ident,
            seqno: handle.id().seqno,
            queue_diffs,
        };

        // The flag is set when this scope exits for any reason, including the
        // future being dropped; presumably it lets the writer stop early.
        let cancelled = CancellationFlag::new();
        scopeguard::defer! {
            cancelled.cancel();
        }

        // Owned copies that are moved into the blocking task.
        let handle = handle.clone();
        let cancelled = cancelled.clone();
        let span = tracing::Span::current();

        let state = tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            // Logs a warning on every exit path that does not defuse the guard
            // below, i.e. also on early returns caused by `?`.
            let guard = scopeguard::guard((), |_| {
                tracing::warn!("cancelled");
            });

            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
            match QueueStateWriter::new(&states_dir, handle.id(), state, messages)
                .write(Some(&cancelled))
            {
                Ok(()) => {
                    // Mark the handle only after a successful write.
                    this.block_handles.set_has_persistent_queue_state(&handle);
                    tracing::info!("persistent queue state saved");
                }
                Err(e) => {
                    // The error is only logged; caching below is still attempted.
                    tracing::error!("failed to write persistent queue state: {e:?}");
                }
            }

            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;

            // Success: defuse the warning guard.
            scopeguard::ScopeGuard::into_inner(guard);
            Ok::<_, anyhow::Error>(state)
        })
        .await??;

        self.notify_with_persistent_state(&state).await;
        Ok(())
    }
560
561    #[tracing::instrument(skip_all, fields(mc_seqno, block_id = %handle.id()))]
562    pub async fn store_queue_state_file(
563        &self,
564        mc_seqno: u32,
565        handle: &BlockHandle,
566        file: File,
567    ) -> Result<()> {
568        if self
569            .try_reuse_persistent_state(mc_seqno, handle, PersistentStateKind::Queue)
570            .await?
571        {
572            return Ok(());
573        }
574
575        let cancelled = CancellationFlag::new();
576        scopeguard::defer! {
577            cancelled.cancel();
578        }
579
580        let handle = handle.clone();
581        let this = self.inner.clone();
582        let cancelled = cancelled.clone();
583        let span = tracing::Span::current();
584
585        let state = tokio::task::spawn_blocking(move || {
586            let _span = span.enter();
587
588            let guard = scopeguard::guard((), |_| {
589                tracing::warn!("cancelled");
590            });
591
592            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
593
594            QueueStateWriter::write_file(&states_dir, handle.id(), file, Some(&cancelled))?;
595            this.block_handles.set_has_persistent_queue_state(&handle);
596            let state = this.cache_state(mc_seqno, handle.id(), PersistentStateKind::Queue)?;
597
598            scopeguard::ScopeGuard::into_inner(guard);
599            Ok::<_, anyhow::Error>(state)
600        })
601        .await??;
602
603        self.notify_with_persistent_state(&state).await;
604        Ok(())
605    }
606
    /// Registers `top_handle` as a new persistent state and removes cached
    /// states and on-disk directories that are older than the retained range.
    ///
    /// Retained: the zerostate directory (`0`), the newest persistent state
    /// usable for boot relative to `top_handle`, the persistent state right
    /// before it, and everything newer. If no such pair of states is found,
    /// nothing is removed.
    ///
    /// # Errors
    ///
    /// Fails if `top_handle` is not a masterchain block, the storage
    /// directory cannot be listed, or the blocking task cannot be joined.
    pub async fn rotate_persistent_states(&self, top_handle: &BlockHandle) -> Result<()> {
        anyhow::ensure!(
            top_handle.is_masterchain(),
            "top persistent state handle must be in the masterchain"
        );

        {
            tracing::info!(
                mc_block_id = %top_handle.id(),
                "adding new persistent state to the queue"
            );

            // The lock is released at the end of this block, before any
            // cleanup work starts.
            let mut queue = self.inner.handles_queue.lock();
            if queue.push(top_handle.clone()) {
                self.inner
                    .oldest_ps_handle
                    .store(queue.oldest_known().cloned());
                self.inner.oldest_ps_changed.notify_waiters();
            }
        }

        tracing::info!("started clearing old persistent state directories");
        let start = Instant::now();
        // Logged on every exit from here on, including the early return below.
        scopeguard::defer! {
            tracing::info!(
                elapsed = %humantime::format_duration(start.elapsed()),
                "clearing old persistent state directories completed"
            );
        }

        let this = self.inner.clone();
        let mut top_handle = top_handle.clone();
        if top_handle.id().seqno == 0 {
            // Nothing to clear for the zerostate
            return Ok(());
        }

        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            let block_handles = &this.block_handles;

            // Reference time for the "usable for boot" check.
            let now_utime = top_handle.gen_utime();

            // Find the persistent state that precedes the newest one usable for boot.
            let mut has_suitable = false;
            loop {
                match block_handles.find_prev_persistent_key_block(top_handle.id().seqno) {
                    // Find the newest usable persistent state...
                    Some(handle) if !has_suitable => {
                        has_suitable |= BlockStuff::can_use_for_boot(handle.gen_utime(), now_utime);
                        top_handle = handle;
                    }
                    // ...and return the previous one.
                    Some(handle) => {
                        top_handle = handle;
                        break;
                    }
                    // Or do nothing if not found.
                    None => return Ok(()),
                }
            }

            // Remove cached states
            // NOTE: the index lock stays held until the end of this closure,
            // i.e. also while files are being removed below.
            let mut index = this.mc_seqno_to_block_ids.lock();
            index.retain(|&mc_seqno, block_ids| {
                // Keep the zerostate and everything not older than the boundary.
                if mc_seqno >= top_handle.id().seqno || mc_seqno == 0 {
                    return true;
                }

                for block_id in block_ids.drain() {
                    // TODO: Clear flag in block handle
                    this.clear_cache(&block_id);
                }
                false
            });

            // Remove files
            this.clear_outdated_state_entries(top_handle.id())
        })
        .await?
    }
690
691    async fn try_reuse_persistent_state(
692        &self,
693        mc_seqno: u32,
694        handle: &BlockHandle,
695        kind: PersistentStateKind,
696    ) -> Result<bool> {
697        // Check if there is anything to reuse (return false if nothing)
698        match kind {
699            PersistentStateKind::Shard if !handle.has_persistent_shard_state() => return Ok(false),
700            PersistentStateKind::Queue if !handle.has_persistent_queue_state() => return Ok(false),
701            _ => {}
702        }
703
704        let block_id = *handle.id();
705
706        let Some(cached) = self
707            .inner
708            .descriptor_cache
709            .get(&CacheKey { block_id, kind })
710            .map(|r| r.clone())
711        else {
712            // Nothing to reuse
713            return Ok(false);
714        };
715
716        if cached.mc_seqno >= mc_seqno {
717            // We already have the recent enough state
718            return Ok(true);
719        }
720
721        let this = self.inner.clone();
722
723        let span = tracing::Span::current();
724        let state = tokio::task::spawn_blocking(move || {
725            let _span = span.enter();
726
727            let states_dir = this.prepare_persistent_states_dir(mc_seqno)?;
728
729            let temp_file = states_dir.file(kind.make_temp_file_name(&block_id));
730            std::fs::write(temp_file.path(), cached.file.as_slice())?;
731            temp_file.rename(kind.make_file_name(&block_id))?;
732
733            drop(cached);
734
735            this.cache_state(mc_seqno, &block_id, kind)
736        })
737        .await??;
738
739        self.notify_with_persistent_state(&state).await;
740        Ok(true)
741    }
742
743    async fn notify_with_persistent_state(&self, state: &PersistentState) {
744        let subscriptions = self.inner.subscriptions.load_full();
745        for sender in subscriptions.values() {
746            sender.send(state.clone()).await.ok();
747        }
748    }
749}
750
/// Shared state behind `PersistentStateStorage`.
struct Inner {
    /// Cells database handed to `ShardStateWriter`.
    cells_db: CellsDb,
    /// Root directory of persistent states; contains one subdirectory per `mc_seqno`.
    storage_dir: Dir,
    /// Storage used to load block handles and to set their persistent state flags.
    block_handles: Arc<BlockHandleStorage>,
    /// Storage used to load queue diffs and block data.
    blocks: Arc<BlockStorage>,
    /// Storage used to load state root hashes.
    shard_states: Arc<ShardStateStorage>,
    /// Cached state files keyed by block id and state kind.
    descriptor_cache: DashMap<CacheKey, Arc<CachedState>>,
    /// Index from `mc_seqno` to the block ids cached under it; used to evict
    /// cache entries during rotation.
    mc_seqno_to_block_ids: Mutex<BTreeMap<u32, FastHashSet<BlockId>>>,
    /// Limits the number of concurrent chunk reads.
    chunks_semaphore: Arc<Semaphore>,
    /// Queue of persistent state block handles.
    handles_queue: Mutex<HandlesQueue>,
    /// Notified whenever `oldest_ps_handle` is updated.
    oldest_ps_changed: Notify,
    /// The oldest known persistent state handle as reported by `handles_queue`.
    oldest_ps_handle: ArcSwapAny<Option<BlockHandle>>,
    /// Senders of active subscriptions keyed by receiver id.
    subscriptions: ArcSwap<FastHashMap<usize, mpsc::Sender<PersistentState>>>,
    /// Serializes load-modify-store updates of `subscriptions`.
    subscriptions_mutex: Mutex<()>,
}
766
767impl Inner {
768    fn prepare_persistent_states_dir(&self, mc_seqno: u32) -> Result<Dir> {
769        let states_dir = self.mc_states_dir(mc_seqno);
770        if !states_dir.path().is_dir() {
771            tracing::info!(mc_seqno, "creating persistent state directory");
772            states_dir.create_if_not_exists()?;
773        }
774        Ok(states_dir)
775    }
776
777    fn mc_states_dir(&self, mc_seqno: u32) -> Dir {
778        Dir::new_readonly(self.storage_dir.path().join(mc_seqno.to_string()))
779    }
780
781    fn clear_outdated_state_entries(&self, recent_block_id: &BlockId) -> Result<()> {
782        let mut directories_to_remove: Vec<PathBuf> = Vec::new();
783        let mut files_to_remove: Vec<PathBuf> = Vec::new();
784
785        for entry in self.storage_dir.entries()?.flatten() {
786            let path = entry.path();
787
788            if path.is_file() {
789                files_to_remove.push(path);
790                continue;
791            }
792
793            let Ok(name) = entry.file_name().into_string() else {
794                directories_to_remove.push(path);
795                continue;
796            };
797
798            let is_recent = matches!(
799                name.parse::<u32>(),
800                Ok(seqno) if seqno >= recent_block_id.seqno || seqno == 0
801            );
802            if !is_recent {
803                directories_to_remove.push(path);
804            }
805        }
806
807        for dir in directories_to_remove {
808            tracing::info!(dir = %dir.display(), "removing an old persistent state directory");
809            if let Err(e) = std::fs::remove_dir_all(&dir) {
810                tracing::error!(dir = %dir.display(), "failed to remove an old persistent state: {e:?}");
811            }
812        }
813
814        for file in files_to_remove {
815            tracing::info!(file = %file.display(), "removing file");
816            if let Err(e) = std::fs::remove_file(&file) {
817                tracing::error!(file = %file.display(), "failed to remove file: {e:?}");
818            }
819        }
820
821        Ok(())
822    }
823
    /// Maps the state file of `block_id` from the `mc_seqno` directory and
    /// registers it in the descriptor cache and the `mc_seqno` index.
    ///
    /// If an entry with the same or a newer `mc_seqno` is already cached, it
    /// is returned instead and the freshly mapped copy is discarded.
    ///
    /// # Errors
    ///
    /// Fails if the state file cannot be opened, copied into a temp file, or
    /// memory-mapped.
    fn cache_state(
        &self,
        mc_seqno: u32,
        block_id: &BlockId,
        kind: PersistentStateKind,
    ) -> Result<PersistentState> {
        use std::collections::btree_map;

        use dashmap::mapref::entry::Entry;

        let key = CacheKey {
            block_id: *block_id,
            kind,
        };

        // Opens the state file and returns a memory-mapped private copy of it.
        let load_mapped = || {
            let mut file = self
                .mc_states_dir(mc_seqno)
                .file(kind.make_file_name(block_id))
                .read(true)
                .open()?;

            // We create a copy of the original file here to make sure
            // that the underlying mapped file will not be changed outside
            // of the node. Otherwise it will randomly fail with exit code 7/BUS.
            let mut temp_file = tempfile::tempfile_in(self.storage_dir.path())
                .context("failed to create a temp file")?;

            // Underlying implementation will call something like `copy_file_range`,
            // and we hope that it will be just COW pages.
            // TODO: Find a way to cancel this operation.
            std::io::copy(&mut file, &mut temp_file).context("failed to copy a temp file")?;
            temp_file.flush()?;
            // Rewind so the file is handed over positioned at its start.
            temp_file.seek(std::io::SeekFrom::Start(0))?;

            MappedFile::from_existing_file(temp_file).context("failed to map a temp file")
        };

        let file =
            load_mapped().with_context(|| format!("failed to cache {kind:?} for {block_id}"))?;

        let new_state = Arc::new(CachedState { mc_seqno, file });

        // Insert or replace the cache entry; remember the replaced entry's
        // `mc_seqno` so the index can be updated below.
        let prev_mc_seqno = match self.descriptor_cache.entry(key) {
            Entry::Vacant(entry) => {
                entry.insert(new_state.clone());
                None
            }
            Entry::Occupied(mut entry) => {
                let prev_mc_seqno = entry.get().mc_seqno;
                if mc_seqno <= prev_mc_seqno {
                    // Cache only the most recent block (if changed)
                    return Ok(PersistentState {
                        block_id: *block_id,
                        kind,
                        cached: entry.get().clone(),
                    });
                }

                entry.insert(new_state.clone());
                Some(prev_mc_seqno)
            }
        };

        // The index is locked only after the cache entry has been released.
        let mut index = self.mc_seqno_to_block_ids.lock();

        // Remove previous entry if exists
        // NOTE(review): the index is keyed by block id only, so this also
        // drops the block id when the other state kind is still cached under
        // `prev_mc_seqno` — confirm this is intended.
        if let Some(prev_mc_seqno) = prev_mc_seqno
            && let btree_map::Entry::Occupied(mut entry) = index.entry(prev_mc_seqno)
        {
            entry.get_mut().remove(block_id);
            if entry.get().is_empty() {
                entry.remove();
            }
        }

        index.entry(mc_seqno).or_default().insert(*block_id);

        Ok(PersistentState {
            block_id: *block_id,
            kind,
            cached: new_state,
        })
    }
908
909    fn clear_cache(&self, block_id: &BlockId) {
910        self.descriptor_cache.remove(&CacheKey {
911            block_id: *block_id,
912            kind: PersistentStateKind::Shard,
913        });
914        self.descriptor_cache.remove(&CacheKey {
915            block_id: *block_id,
916            kind: PersistentStateKind::Queue,
917        });
918    }
919}
920
/// Size description of a persistent state.
///
/// NOTE(review): units are not visible from here; `size` is presumably the
/// total state size in bytes and `chunk_size` the size of a single chunk —
/// confirm against the code that builds this struct.
#[derive(Debug, Clone, Copy)]
pub struct PersistentStateInfo {
    /// Non-zero size of the whole state.
    pub size: NonZeroU64,
    /// Non-zero size of one chunk.
    pub chunk_size: NonZeroU32,
}
926
/// Handle to a cached persistent state.
///
/// The cached data is shared through an `Arc`, so clones of this handle refer
/// to the same cache entry.
#[derive(Clone)]
pub struct PersistentState {
    /// Id of the block this state was cached for.
    block_id: BlockId,
    /// Whether this is a shard state or a queue state.
    kind: PersistentStateKind,
    /// Shared cache entry holding the mapped file and its `mc_seqno`.
    cached: Arc<CachedState>,
}
933
934impl PersistentState {
935    pub fn block_id(&self) -> &BlockId {
936        &self.block_id
937    }
938
939    pub fn kind(&self) -> PersistentStateKind {
940        self.kind
941    }
942
943    pub fn file(&self) -> &MappedFile {
944        &self.cached.file
945    }
946
947    pub fn mc_seqno(&self) -> u32 {
948        self.cached.mc_seqno
949    }
950}
951
/// Receiving end of a persistent state subscription.
///
/// Dereferences to the wrapped `mpsc::Receiver` and, when dropped, removes its
/// own entry from `Inner::subscriptions`.
pub struct PersistentStateReceiver {
    /// Key under which this receiver is stored in `Inner::subscriptions`.
    id: usize,
    /// Weak reference to the shared state, so the receiver does not keep it alive.
    inner: Weak<Inner>,
    /// Channel end that yields `PersistentState` values.
    receiver: mpsc::Receiver<PersistentState>,
}
957
958impl std::ops::Deref for PersistentStateReceiver {
959    type Target = mpsc::Receiver<PersistentState>;
960
961    #[inline]
962    fn deref(&self) -> &Self::Target {
963        &self.receiver
964    }
965}
966
967impl std::ops::DerefMut for PersistentStateReceiver {
968    #[inline]
969    fn deref_mut(&mut self) -> &mut Self::Target {
970        &mut self.receiver
971    }
972}
973
974impl Drop for PersistentStateReceiver {
975    fn drop(&mut self) {
976        if let Some(inner) = self.inner.upgrade() {
977            let _guard = inner.subscriptions_mutex.lock();
978            let mut subscriptions = inner.subscriptions.load_full();
979            {
980                let subscriptions = Arc::make_mut(&mut subscriptions);
981                subscriptions.remove(&self.id);
982            }
983            inner.subscriptions.store(subscriptions);
984        }
985    }
986}
987
// Process-wide counter starting at zero.
// NOTE(review): its use is outside this view; presumably the source of
// `PersistentStateReceiver::id` values — confirm.
static RECEIVER_ID: AtomicUsize = AtomicUsize::new(0);
989
/// Cache entry shared between `PersistentState` handles.
struct CachedState {
    /// The `mc_seqno` this entry was cached under.
    mc_seqno: u32,
    /// Mapped file with the state data.
    file: MappedFile,
}
994
/// Queue of block handles ordered from newest (front) to oldest (back).
#[derive(Default)]
struct HandlesQueue {
    /// New handles are pushed to the front; the back holds the oldest kept one.
    handles: VecDeque<BlockHandle>,
}
999
1000impl HandlesQueue {
1001    fn oldest_known(&self) -> Option<&BlockHandle> {
1002        self.handles.back()
1003    }
1004
1005    fn push(&mut self, new_handle: BlockHandle) -> bool {
1006        // Allow only new blocks
1007        if let Some(newest) = self.handles.front()
1008            && newest.id().seqno >= new_handle.id().seqno
1009        {
1010            return false;
1011        }
1012
1013        // Remove too old states
1014        let now_utime = new_handle.gen_utime();
1015        let mut has_suitable = false;
1016        self.handles.retain(|old_handle| {
1017            if !has_suitable {
1018                has_suitable |= BlockStuff::can_use_for_boot(old_handle.gen_utime(), now_utime);
1019                true
1020            } else {
1021                false
1022            }
1023        });
1024
1025        // Add the new one
1026        self.handles.push_front(new_handle);
1027        true
1028    }
1029}
1030
const STATE_CHUNK_SIZE: u64 = 1024 * 1024; // 1 MiB