Skip to main content

forest/db/car/
many.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! The [`ManyCar`] block store is the union of `N` read-only CAR-backed block
5//! stores and a single writable block store. Get requests are forwarded to each
6//! store (including the writable store) and the first hit is returned. Write
7//! requests are only forwarded to the writable store.
8//!
9//! A single z-frame cache is shared between all read-only stores.
10
11use super::{AnyCar, ZstdFrameCache};
12use crate::blocks::TipsetKey;
13use crate::db::{
14    BlockstoreWriteOpsSubscribable, EthMappingsStore, MemoryDB, PersistentStore, SettingsStore,
15    SettingsStoreExt,
16};
17use crate::libp2p_bitswap::BitswapStoreReadWrite;
18use crate::rpc::eth::types::EthHash;
19use crate::shim::clock::ChainEpoch;
20use crate::utils::io::EitherMmapOrRandomAccessFile;
21use crate::utils::multihash::prelude::*;
22use crate::{blocks::Tipset, libp2p_bitswap::BitswapStoreRead};
23use anyhow::Context as _;
24use cid::Cid;
25use fvm_ipld_blockstore::Blockstore;
26use parking_lot::RwLock;
27use std::cmp::Ord;
28use std::collections::BinaryHeap;
29use std::{path::PathBuf, sync::Arc};
30
31struct WithHeaviestEpoch {
32    pub car: AnyCar<Box<dyn super::RandomAccessFileReader>>,
33    epoch: ChainEpoch,
34}
35
36impl WithHeaviestEpoch {
37    pub fn new(car: AnyCar<Box<dyn super::RandomAccessFileReader>>) -> anyhow::Result<Self> {
38        let epoch = car
39            .heaviest_tipset()
40            .context("store doesn't have a heaviest tipset")?
41            .epoch();
42        Ok(Self { car, epoch })
43    }
44}
45
46impl Ord for WithHeaviestEpoch {
47    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
48        self.epoch.cmp(&other.epoch)
49    }
50}
51
52impl Eq for WithHeaviestEpoch {}
53
54impl PartialOrd for WithHeaviestEpoch {
55    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
56        Some(self.cmp(other))
57    }
58}
59
60impl PartialEq for WithHeaviestEpoch {
61    fn eq(&self, other: &Self) -> bool {
62        self.epoch == other.epoch
63    }
64}
65
66pub struct ManyCar<WriterT = MemoryDB> {
67    shared_cache: Arc<ZstdFrameCache>,
68    read_only: Arc<RwLock<BinaryHeap<WithHeaviestEpoch>>>,
69    writer: WriterT,
70}
71
72impl<WriterT> ManyCar<WriterT> {
73    pub fn new(writer: WriterT) -> Self {
74        ManyCar {
75            shared_cache: Arc::new(ZstdFrameCache::default()),
76            read_only: Arc::new(RwLock::new(BinaryHeap::default())),
77            writer,
78        }
79    }
80
81    pub fn writer(&self) -> &WriterT {
82        &self.writer
83    }
84}
85
86impl<WriterT: Default> Default for ManyCar<WriterT> {
87    fn default() -> Self {
88        Self::new(Default::default())
89    }
90}
91
92impl<WriterT> ManyCar<WriterT> {
93    pub fn with_read_only<ReaderT: super::RandomAccessFileReader>(
94        self,
95        any_car: AnyCar<ReaderT>,
96    ) -> anyhow::Result<Self> {
97        self.read_only(any_car)?;
98        Ok(self)
99    }
100
101    pub fn read_only<ReaderT: super::RandomAccessFileReader>(
102        &self,
103        any_car: AnyCar<ReaderT>,
104    ) -> anyhow::Result<()> {
105        let mut read_only = self.read_only.write();
106        let key = read_only.len() as u64;
107
108        read_only.push(WithHeaviestEpoch::new(
109            any_car
110                .with_cache(self.shared_cache.clone(), key)
111                .into_dyn(),
112        )?);
113
114        Ok(())
115    }
116
117    pub fn with_read_only_files(
118        self,
119        files: impl Iterator<Item = PathBuf>,
120    ) -> anyhow::Result<Self> {
121        self.read_only_files(files)?;
122        Ok(self)
123    }
124
125    pub fn read_only_files(&self, files: impl Iterator<Item = PathBuf>) -> anyhow::Result<()> {
126        for file in files {
127            self.read_only(AnyCar::new(EitherMmapOrRandomAccessFile::open(file)?)?)?;
128        }
129
130        Ok(())
131    }
132
133    pub fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
134        self.read_only
135            .read()
136            .peek()
137            .map(|w| AnyCar::heaviest_tipset_key(&w.car))
138            .context("ManyCar store doesn't have a heaviest tipset key")
139    }
140
141    pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
142        self.read_only
143            .read()
144            .peek()
145            .map(|w| AnyCar::heaviest_tipset(&w.car))
146            .context("ManyCar store doesn't have a heaviest tipset")?
147    }
148
149    /// Number of read-only `CAR`s
150    pub fn len(&self) -> usize {
151        self.read_only.read().len()
152    }
153}
154
155impl<ReaderT: super::RandomAccessFileReader> TryFrom<AnyCar<ReaderT>> for ManyCar<MemoryDB> {
156    type Error = anyhow::Error;
157    fn try_from(any_car: AnyCar<ReaderT>) -> anyhow::Result<Self> {
158        ManyCar::default().with_read_only(any_car)
159    }
160}
161
162impl TryFrom<Vec<PathBuf>> for ManyCar<MemoryDB> {
163    type Error = anyhow::Error;
164    fn try_from(files: Vec<PathBuf>) -> anyhow::Result<Self> {
165        ManyCar::default().with_read_only_files(files.into_iter())
166    }
167}
168
169impl<WriterT: Blockstore> Blockstore for ManyCar<WriterT> {
170    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
171        // Theoretically it should be easily parallelizable with `rayon`.
172        // In practice, there is a massive performance loss when providing
173        // more than a single reader.
174        if let Ok(Some(value)) = self.writer.get(k) {
175            return Ok(Some(value));
176        }
177        for reader in self.read_only.read().iter() {
178            if let Some(val) = reader.car.get(k)? {
179                return Ok(Some(val));
180            }
181        }
182        Ok(None)
183    }
184
185    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
186        self.writer.put_keyed(k, block)
187    }
188}
189
190impl<WriterT: PersistentStore> PersistentStore for ManyCar<WriterT> {
191    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
192        self.writer.put_keyed_persistent(k, block)
193    }
194}
195
196impl<WriterT: BitswapStoreRead + Blockstore> BitswapStoreRead for ManyCar<WriterT> {
197    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
198        Blockstore::has(self, cid)
199    }
200
201    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
202        Blockstore::get(self, cid)
203    }
204}
205
206impl<WriterT: BitswapStoreReadWrite + Blockstore> BitswapStoreReadWrite for ManyCar<WriterT> {
207    type Hashes = MultihashCode;
208
209    fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
210        self.put_keyed(block.cid(), block.data())
211    }
212}
213
214impl<WriterT: SettingsStore> SettingsStore for ManyCar<WriterT> {
215    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
216        SettingsStore::read_bin(self.writer(), key)
217    }
218
219    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
220        SettingsStore::write_bin(self.writer(), key, value)
221    }
222
223    fn exists(&self, key: &str) -> anyhow::Result<bool> {
224        SettingsStore::exists(self.writer(), key)
225    }
226
227    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
228        SettingsStore::setting_keys(self.writer())
229    }
230}
231
232impl<WriterT: EthMappingsStore> EthMappingsStore for ManyCar<WriterT> {
233    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
234        EthMappingsStore::read_bin(self.writer(), key)
235    }
236
237    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
238        EthMappingsStore::write_bin(self.writer(), key, value)
239    }
240
241    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
242        EthMappingsStore::exists(self.writer(), key)
243    }
244
245    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
246        EthMappingsStore::get_message_cids(self.writer())
247    }
248
249    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
250        EthMappingsStore::delete(self.writer(), keys)
251    }
252}
253
254impl<T: Blockstore + SettingsStore> super::super::HeaviestTipsetKeyProvider for ManyCar<T> {
255    fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
256        match SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)? {
257            Some(tsk) => Ok(tsk),
258            None => self.heaviest_tipset_key(),
259        }
260    }
261
262    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
263        SettingsStoreExt::write_obj(self, crate::db::setting_keys::HEAD_KEY, tsk)
264    }
265}
266
267impl<WriterT: BlockstoreWriteOpsSubscribable> BlockstoreWriteOpsSubscribable for ManyCar<WriterT> {
268    fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<(Cid, Vec<u8>)> {
269        self.writer().subscribe_write_ops()
270    }
271
272    fn unsubscribe_write_ops(&self) {
273        self.writer().unsubscribe_write_ops()
274    }
275}
276
277#[cfg(test)]
278mod tests {
279    use super::super::AnyCar;
280    use super::*;
281    use crate::networks::{calibnet, mainnet};
282
283    #[test]
284    fn many_car_empty() {
285        let many = ManyCar::new(MemoryDB::default());
286        assert!(many.heaviest_tipset().is_err());
287    }
288
289    #[test]
290    fn many_car_idempotent() {
291        let many = ManyCar::new(MemoryDB::default())
292            .with_read_only(AnyCar::try_from(mainnet::DEFAULT_GENESIS).unwrap())
293            .unwrap()
294            .with_read_only(AnyCar::try_from(mainnet::DEFAULT_GENESIS).unwrap())
295            .unwrap();
296        assert_eq!(
297            many.heaviest_tipset().unwrap(),
298            AnyCar::try_from(mainnet::DEFAULT_GENESIS)
299                .unwrap()
300                .heaviest_tipset()
301                .unwrap()
302        );
303    }
304
305    #[test]
306    fn many_car_calibnet_heaviest() {
307        let many = ManyCar::try_from(AnyCar::try_from(calibnet::DEFAULT_GENESIS).unwrap()).unwrap();
308        let heaviest = many.heaviest_tipset().unwrap();
309        assert_eq!(
310            heaviest.min_ticket_block(),
311            &heaviest.genesis(&many).unwrap()
312        );
313    }
314}