Skip to main content

forest/db/car/
many.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! The [`ManyCar`] block store is the union of `N` read-only CAR-backed block
5//! stores and a single writable block store. Get requests are forwarded to each
6//! store (including the writable store) and the first hit is returned. Write
7//! requests are only forwarded to the writable store.
8//!
9//! A single z-frame cache is shared between all read-only stores.
10
11use super::{AnyCar, ZstdFrameCache};
12use crate::blocks::TipsetKey;
13use crate::db::parity_db::GarbageCollectableDb;
14use crate::db::{
15    BlockstoreWriteOpsSubscribable, EthMappingsStore, MemoryDB, PersistentStore, SettingsStore,
16    SettingsStoreExt,
17};
18use crate::libp2p_bitswap::BitswapStoreReadWrite;
19use crate::rpc::eth::types::EthHash;
20use crate::shim::clock::ChainEpoch;
21use crate::utils::ShallowClone as _;
22use crate::utils::io::EitherMmapOrRandomAccessFile;
23use crate::utils::multihash::prelude::*;
24use crate::{blocks::Tipset, libp2p_bitswap::BitswapStoreRead};
25use anyhow::Context as _;
26use cid::Cid;
27use fvm_ipld_blockstore::Blockstore;
28use parking_lot::RwLock;
29use std::{
30    cmp::Ord,
31    collections::BinaryHeap,
32    path::{Path, PathBuf},
33};
34
35struct WithHeaviestEpoch {
36    pub car: AnyCar<Box<dyn super::RandomAccessFileReader>>,
37    epoch: ChainEpoch,
38}
39
40impl WithHeaviestEpoch {
41    pub fn new(car: AnyCar<Box<dyn super::RandomAccessFileReader>>) -> anyhow::Result<Self> {
42        let epoch = car
43            .heaviest_tipset()
44            .context("store doesn't have a heaviest tipset")?
45            .epoch();
46        Ok(Self { car, epoch })
47    }
48}
49
50impl Ord for WithHeaviestEpoch {
51    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
52        self.epoch.cmp(&other.epoch)
53    }
54}
55
56impl Eq for WithHeaviestEpoch {}
57
58impl PartialOrd for WithHeaviestEpoch {
59    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
60        Some(self.cmp(other))
61    }
62}
63
64impl PartialEq for WithHeaviestEpoch {
65    fn eq(&self, other: &Self) -> bool {
66        self.epoch == other.epoch
67    }
68}
69
70pub struct ManyCar<WriterT = MemoryDB> {
71    shared_cache: RwLock<ZstdFrameCache>,
72    read_only: RwLock<BinaryHeap<WithHeaviestEpoch>>,
73    writer: WriterT,
74}
75
76impl<WriterT> ManyCar<WriterT> {
77    pub fn new(writer: WriterT) -> Self {
78        ManyCar {
79            shared_cache: RwLock::new(ZstdFrameCache::default()),
80            read_only: RwLock::new(BinaryHeap::default()),
81            writer,
82        }
83    }
84
85    pub fn writer(&self) -> &WriterT {
86        &self.writer
87    }
88}
89
90impl<WriterT: Default> Default for ManyCar<WriterT> {
91    fn default() -> Self {
92        Self::new(Default::default())
93    }
94}
95
96impl<WriterT> ManyCar<WriterT> {
97    pub fn with_read_only<ReaderT: super::RandomAccessFileReader>(
98        self,
99        any_car: AnyCar<ReaderT>,
100    ) -> anyhow::Result<Self> {
101        self.read_only(any_car)?;
102        Ok(self)
103    }
104
105    pub fn read_only<ReaderT: super::RandomAccessFileReader>(
106        &self,
107        any_car: AnyCar<ReaderT>,
108    ) -> anyhow::Result<()> {
109        let mut read_only = self.read_only.write();
110        Self::read_only_inner(
111            &mut read_only,
112            self.shared_cache.read().shallow_clone(),
113            any_car,
114        )
115    }
116
117    fn read_only_inner<ReaderT: super::RandomAccessFileReader>(
118        read_only: &mut BinaryHeap<WithHeaviestEpoch>,
119        shared_cache: ZstdFrameCache,
120        any_car: AnyCar<ReaderT>,
121    ) -> anyhow::Result<()> {
122        let key = read_only.len() as u64;
123        read_only.push(WithHeaviestEpoch::new(
124            any_car.with_cache(shared_cache, key).into_dyn(),
125        )?);
126        Ok(())
127    }
128
129    pub fn with_read_only_files(
130        self,
131        files: impl Iterator<Item = PathBuf>,
132    ) -> anyhow::Result<Self> {
133        self.read_only_files(files)?;
134        Ok(self)
135    }
136
137    pub fn read_only_files(&self, files: impl Iterator<Item = PathBuf>) -> anyhow::Result<()> {
138        for file in files {
139            self.read_only_file(file)?;
140        }
141        Ok(())
142    }
143
144    pub fn read_only_file(&self, file: impl AsRef<Path>) -> anyhow::Result<()> {
145        (|| {
146            self.read_only(AnyCar::new(EitherMmapOrRandomAccessFile::open(
147                file.as_ref(),
148            )?)?)
149        })()
150        .with_context(|| format!("failed to load CAR at {}", file.as_ref().display()))
151    }
152
153    /// Reload `CAR` files after garbage collection.
154    pub fn clear_and_append_read_only_files(
155        &self,
156        files: impl Iterator<Item = PathBuf>,
157    ) -> anyhow::Result<()> {
158        let mut read_only = BinaryHeap::default();
159        let shared_cache = ZstdFrameCache::default();
160        for f in files {
161            let car = AnyCar::new(EitherMmapOrRandomAccessFile::open(f)?)?;
162            Self::read_only_inner(&mut read_only, shared_cache.shallow_clone(), car)?;
163        }
164        *self.read_only.write() = read_only;
165        *self.shared_cache.write() = shared_cache;
166        Ok(())
167    }
168
169    pub fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
170        Ok(self
171            .read_only
172            .read()
173            .peek()
174            .map(|w| AnyCar::heaviest_tipset_key(&w.car)))
175    }
176
177    pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
178        self.read_only
179            .read()
180            .peek()
181            .map(|w| AnyCar::heaviest_tipset(&w.car))
182            .context("ManyCar store doesn't have a heaviest tipset")?
183    }
184
185    /// Number of read-only `CAR`s
186    pub fn len(&self) -> usize {
187        self.read_only.read().len()
188    }
189}
190
191pub trait ReloadableManyCar {
192    fn clear_and_reload_cars(&self, files: impl Iterator<Item = PathBuf>) -> anyhow::Result<()>;
193
194    fn heaviest_car_tipset(&self) -> anyhow::Result<Tipset>;
195}
196
197impl<T> ReloadableManyCar for ManyCar<T> {
198    fn clear_and_reload_cars(&self, files: impl Iterator<Item = PathBuf>) -> anyhow::Result<()> {
199        self.clear_and_append_read_only_files(files)
200    }
201
202    fn heaviest_car_tipset(&self) -> anyhow::Result<Tipset> {
203        self.heaviest_tipset()
204    }
205}
206
207impl<ReaderT: super::RandomAccessFileReader> TryFrom<AnyCar<ReaderT>> for ManyCar<MemoryDB> {
208    type Error = anyhow::Error;
209    fn try_from(any_car: AnyCar<ReaderT>) -> anyhow::Result<Self> {
210        ManyCar::default().with_read_only(any_car)
211    }
212}
213
214impl TryFrom<Vec<PathBuf>> for ManyCar<MemoryDB> {
215    type Error = anyhow::Error;
216    fn try_from(files: Vec<PathBuf>) -> anyhow::Result<Self> {
217        ManyCar::default().with_read_only_files(files.into_iter())
218    }
219}
220
221impl<WriterT: Blockstore> Blockstore for ManyCar<WriterT> {
222    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
223        // Theoretically it should be easily parallelizable with `rayon`.
224        // In practice, there is a massive performance loss when providing
225        // more than a single reader.
226        if let Ok(Some(value)) = self.writer.get(k) {
227            return Ok(Some(value));
228        }
229        for reader in self.read_only.read().iter() {
230            if let Some(val) = reader.car.get(k)? {
231                return Ok(Some(val));
232            }
233        }
234        Ok(None)
235    }
236
237    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
238        self.writer.put_keyed(k, block)
239    }
240}
241
242impl<WriterT: PersistentStore> PersistentStore for ManyCar<WriterT> {
243    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
244        self.writer.put_keyed_persistent(k, block)
245    }
246}
247
248impl<WriterT: BitswapStoreRead + Blockstore> BitswapStoreRead for ManyCar<WriterT> {
249    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
250        Blockstore::has(self, cid)
251    }
252
253    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
254        Blockstore::get(self, cid)
255    }
256}
257
258impl<WriterT: BitswapStoreReadWrite + Blockstore> BitswapStoreReadWrite for ManyCar<WriterT> {
259    type Hashes = MultihashCode;
260
261    fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
262        self.put_keyed(block.cid(), block.data())
263    }
264}
265
266impl<WriterT: SettingsStore> SettingsStore for ManyCar<WriterT> {
267    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
268        SettingsStore::read_bin(self.writer(), key)
269    }
270
271    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
272        SettingsStore::write_bin(self.writer(), key, value)
273    }
274
275    fn exists(&self, key: &str) -> anyhow::Result<bool> {
276        SettingsStore::exists(self.writer(), key)
277    }
278
279    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
280        SettingsStore::setting_keys(self.writer())
281    }
282}
283
284impl<WriterT: EthMappingsStore> EthMappingsStore for ManyCar<WriterT> {
285    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
286        EthMappingsStore::read_bin(self.writer(), key)
287    }
288
289    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
290        EthMappingsStore::write_bin(self.writer(), key, value)
291    }
292
293    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
294        EthMappingsStore::exists(self.writer(), key)
295    }
296
297    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
298        EthMappingsStore::get_message_cids(self.writer())
299    }
300
301    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
302        EthMappingsStore::delete(self.writer(), keys)
303    }
304}
305
306impl<T: Blockstore + SettingsStore> super::super::HeaviestTipsetKeyProvider for ManyCar<T> {
307    fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
308        match SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)? {
309            Some(tsk) => Ok(Some(tsk)),
310            None => self.heaviest_tipset_key(),
311        }
312    }
313
314    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
315        SettingsStoreExt::write_obj(self, crate::db::setting_keys::HEAD_KEY, tsk)
316    }
317}
318
319impl<WriterT: BlockstoreWriteOpsSubscribable> BlockstoreWriteOpsSubscribable for ManyCar<WriterT> {
320    fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, bytes::Bytes)>> {
321        self.writer().subscribe_write_ops()
322    }
323
324    fn unsubscribe_write_ops(&self) {
325        self.writer().unsubscribe_write_ops()
326    }
327}
328
329impl<T: GarbageCollectableDb> GarbageCollectableDb for ManyCar<T> {
330    fn reset_gc_columns(&self) -> anyhow::Result<()> {
331        self.writer().reset_gc_columns()
332    }
333}
334
335#[cfg(test)]
336mod tests {
337    use super::super::AnyCar;
338    use super::*;
339    use crate::networks::{calibnet, mainnet};
340
341    #[test]
342    fn many_car_empty() {
343        let many = ManyCar::new(MemoryDB::default());
344        assert!(many.heaviest_tipset().is_err());
345    }
346
347    #[test]
348    fn many_car_idempotent() {
349        let many = ManyCar::new(MemoryDB::default())
350            .with_read_only(AnyCar::try_from(mainnet::DEFAULT_GENESIS).unwrap())
351            .unwrap()
352            .with_read_only(AnyCar::try_from(mainnet::DEFAULT_GENESIS).unwrap())
353            .unwrap();
354        assert_eq!(
355            many.heaviest_tipset().unwrap(),
356            AnyCar::try_from(mainnet::DEFAULT_GENESIS)
357                .unwrap()
358                .heaviest_tipset()
359                .unwrap()
360        );
361    }
362
363    #[test]
364    fn many_car_calibnet_heaviest() {
365        let many = ManyCar::try_from(AnyCar::try_from(calibnet::DEFAULT_GENESIS).unwrap()).unwrap();
366        let heaviest = many.heaviest_tipset().unwrap();
367        assert_eq!(
368            heaviest.min_ticket_block(),
369            &heaviest.genesis(&many).unwrap()
370        );
371    }
372}