1use super::{AnyCar, ZstdFrameCache};
12use crate::blocks::TipsetKey;
13use crate::db::parity_db::GarbageCollectableDb;
14use crate::db::{
15 BlockstoreWriteOpsSubscribable, EthMappingsStore, MemoryDB, PersistentStore, SettingsStore,
16 SettingsStoreExt,
17};
18use crate::libp2p_bitswap::BitswapStoreReadWrite;
19use crate::rpc::eth::types::EthHash;
20use crate::shim::clock::ChainEpoch;
21use crate::utils::ShallowClone as _;
22use crate::utils::io::EitherMmapOrRandomAccessFile;
23use crate::utils::multihash::prelude::*;
24use crate::{blocks::Tipset, libp2p_bitswap::BitswapStoreRead};
25use anyhow::Context as _;
26use cid::Cid;
27use fvm_ipld_blockstore::Blockstore;
28use parking_lot::RwLock;
29use std::{
30 cmp::Ord,
31 collections::BinaryHeap,
32 path::{Path, PathBuf},
33};
34
35struct WithHeaviestEpoch {
36 pub car: AnyCar<Box<dyn super::RandomAccessFileReader>>,
37 epoch: ChainEpoch,
38}
39
40impl WithHeaviestEpoch {
41 pub fn new(car: AnyCar<Box<dyn super::RandomAccessFileReader>>) -> anyhow::Result<Self> {
42 let epoch = car
43 .heaviest_tipset()
44 .context("store doesn't have a heaviest tipset")?
45 .epoch();
46 Ok(Self { car, epoch })
47 }
48}
49
50impl Ord for WithHeaviestEpoch {
51 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
52 self.epoch.cmp(&other.epoch)
53 }
54}
55
56impl Eq for WithHeaviestEpoch {}
57
58impl PartialOrd for WithHeaviestEpoch {
59 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
60 Some(self.cmp(other))
61 }
62}
63
64impl PartialEq for WithHeaviestEpoch {
65 fn eq(&self, other: &Self) -> bool {
66 self.epoch == other.epoch
67 }
68}
69
70pub struct ManyCar<WriterT = MemoryDB> {
71 shared_cache: RwLock<ZstdFrameCache>,
72 read_only: RwLock<BinaryHeap<WithHeaviestEpoch>>,
73 writer: WriterT,
74}
75
76impl<WriterT> ManyCar<WriterT> {
77 pub fn new(writer: WriterT) -> Self {
78 ManyCar {
79 shared_cache: RwLock::new(ZstdFrameCache::default()),
80 read_only: RwLock::new(BinaryHeap::default()),
81 writer,
82 }
83 }
84
85 pub fn writer(&self) -> &WriterT {
86 &self.writer
87 }
88}
89
90impl<WriterT: Default> Default for ManyCar<WriterT> {
91 fn default() -> Self {
92 Self::new(Default::default())
93 }
94}
95
96impl<WriterT> ManyCar<WriterT> {
97 pub fn with_read_only<ReaderT: super::RandomAccessFileReader>(
98 self,
99 any_car: AnyCar<ReaderT>,
100 ) -> anyhow::Result<Self> {
101 self.read_only(any_car)?;
102 Ok(self)
103 }
104
105 pub fn read_only<ReaderT: super::RandomAccessFileReader>(
106 &self,
107 any_car: AnyCar<ReaderT>,
108 ) -> anyhow::Result<()> {
109 let mut read_only = self.read_only.write();
110 Self::read_only_inner(
111 &mut read_only,
112 self.shared_cache.read().shallow_clone(),
113 any_car,
114 )
115 }
116
117 fn read_only_inner<ReaderT: super::RandomAccessFileReader>(
118 read_only: &mut BinaryHeap<WithHeaviestEpoch>,
119 shared_cache: ZstdFrameCache,
120 any_car: AnyCar<ReaderT>,
121 ) -> anyhow::Result<()> {
122 let key = read_only.len() as u64;
123 read_only.push(WithHeaviestEpoch::new(
124 any_car.with_cache(shared_cache, key).into_dyn(),
125 )?);
126 Ok(())
127 }
128
129 pub fn with_read_only_files(
130 self,
131 files: impl Iterator<Item = PathBuf>,
132 ) -> anyhow::Result<Self> {
133 self.read_only_files(files)?;
134 Ok(self)
135 }
136
137 pub fn read_only_files(&self, files: impl Iterator<Item = PathBuf>) -> anyhow::Result<()> {
138 for file in files {
139 self.read_only_file(file)?;
140 }
141 Ok(())
142 }
143
144 pub fn read_only_file(&self, file: impl AsRef<Path>) -> anyhow::Result<()> {
145 (|| {
146 self.read_only(AnyCar::new(EitherMmapOrRandomAccessFile::open(
147 file.as_ref(),
148 )?)?)
149 })()
150 .with_context(|| format!("failed to load CAR at {}", file.as_ref().display()))
151 }
152
153 pub fn clear_and_append_read_only_files(
155 &self,
156 files: impl Iterator<Item = PathBuf>,
157 ) -> anyhow::Result<()> {
158 let mut read_only = BinaryHeap::default();
159 let shared_cache = ZstdFrameCache::default();
160 for f in files {
161 let car = AnyCar::new(EitherMmapOrRandomAccessFile::open(f)?)?;
162 Self::read_only_inner(&mut read_only, shared_cache.shallow_clone(), car)?;
163 }
164 *self.read_only.write() = read_only;
165 *self.shared_cache.write() = shared_cache;
166 Ok(())
167 }
168
169 pub fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
170 Ok(self
171 .read_only
172 .read()
173 .peek()
174 .map(|w| AnyCar::heaviest_tipset_key(&w.car)))
175 }
176
177 pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
178 self.read_only
179 .read()
180 .peek()
181 .map(|w| AnyCar::heaviest_tipset(&w.car))
182 .context("ManyCar store doesn't have a heaviest tipset")?
183 }
184
185 pub fn len(&self) -> usize {
187 self.read_only.read().len()
188 }
189}
190
191pub trait ReloadableManyCar {
192 fn clear_and_reload_cars(&self, files: impl Iterator<Item = PathBuf>) -> anyhow::Result<()>;
193
194 fn heaviest_car_tipset(&self) -> anyhow::Result<Tipset>;
195}
196
197impl<T> ReloadableManyCar for ManyCar<T> {
198 fn clear_and_reload_cars(&self, files: impl Iterator<Item = PathBuf>) -> anyhow::Result<()> {
199 self.clear_and_append_read_only_files(files)
200 }
201
202 fn heaviest_car_tipset(&self) -> anyhow::Result<Tipset> {
203 self.heaviest_tipset()
204 }
205}
206
207impl<ReaderT: super::RandomAccessFileReader> TryFrom<AnyCar<ReaderT>> for ManyCar<MemoryDB> {
208 type Error = anyhow::Error;
209 fn try_from(any_car: AnyCar<ReaderT>) -> anyhow::Result<Self> {
210 ManyCar::default().with_read_only(any_car)
211 }
212}
213
214impl TryFrom<Vec<PathBuf>> for ManyCar<MemoryDB> {
215 type Error = anyhow::Error;
216 fn try_from(files: Vec<PathBuf>) -> anyhow::Result<Self> {
217 ManyCar::default().with_read_only_files(files.into_iter())
218 }
219}
220
221impl<WriterT: Blockstore> Blockstore for ManyCar<WriterT> {
222 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
223 if let Ok(Some(value)) = self.writer.get(k) {
227 return Ok(Some(value));
228 }
229 for reader in self.read_only.read().iter() {
230 if let Some(val) = reader.car.get(k)? {
231 return Ok(Some(val));
232 }
233 }
234 Ok(None)
235 }
236
237 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
238 self.writer.put_keyed(k, block)
239 }
240}
241
242impl<WriterT: PersistentStore> PersistentStore for ManyCar<WriterT> {
243 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
244 self.writer.put_keyed_persistent(k, block)
245 }
246}
247
248impl<WriterT: BitswapStoreRead + Blockstore> BitswapStoreRead for ManyCar<WriterT> {
249 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
250 Blockstore::has(self, cid)
251 }
252
253 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
254 Blockstore::get(self, cid)
255 }
256}
257
258impl<WriterT: BitswapStoreReadWrite + Blockstore> BitswapStoreReadWrite for ManyCar<WriterT> {
259 type Hashes = MultihashCode;
260
261 fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
262 self.put_keyed(block.cid(), block.data())
263 }
264}
265
266impl<WriterT: SettingsStore> SettingsStore for ManyCar<WriterT> {
267 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
268 SettingsStore::read_bin(self.writer(), key)
269 }
270
271 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
272 SettingsStore::write_bin(self.writer(), key, value)
273 }
274
275 fn exists(&self, key: &str) -> anyhow::Result<bool> {
276 SettingsStore::exists(self.writer(), key)
277 }
278
279 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
280 SettingsStore::setting_keys(self.writer())
281 }
282}
283
284impl<WriterT: EthMappingsStore> EthMappingsStore for ManyCar<WriterT> {
285 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
286 EthMappingsStore::read_bin(self.writer(), key)
287 }
288
289 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
290 EthMappingsStore::write_bin(self.writer(), key, value)
291 }
292
293 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
294 EthMappingsStore::exists(self.writer(), key)
295 }
296
297 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
298 EthMappingsStore::get_message_cids(self.writer())
299 }
300
301 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
302 EthMappingsStore::delete(self.writer(), keys)
303 }
304}
305
306impl<T: Blockstore + SettingsStore> super::super::HeaviestTipsetKeyProvider for ManyCar<T> {
307 fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
308 match SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)? {
309 Some(tsk) => Ok(Some(tsk)),
310 None => self.heaviest_tipset_key(),
311 }
312 }
313
314 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
315 SettingsStoreExt::write_obj(self, crate::db::setting_keys::HEAD_KEY, tsk)
316 }
317}
318
319impl<WriterT: BlockstoreWriteOpsSubscribable> BlockstoreWriteOpsSubscribable for ManyCar<WriterT> {
320 fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, bytes::Bytes)>> {
321 self.writer().subscribe_write_ops()
322 }
323
324 fn unsubscribe_write_ops(&self) {
325 self.writer().unsubscribe_write_ops()
326 }
327}
328
329impl<T: GarbageCollectableDb> GarbageCollectableDb for ManyCar<T> {
330 fn reset_gc_columns(&self) -> anyhow::Result<()> {
331 self.writer().reset_gc_columns()
332 }
333}
334
335#[cfg(test)]
336mod tests {
337 use super::super::AnyCar;
338 use super::*;
339 use crate::networks::{calibnet, mainnet};
340
341 #[test]
342 fn many_car_empty() {
343 let many = ManyCar::new(MemoryDB::default());
344 assert!(many.heaviest_tipset().is_err());
345 }
346
347 #[test]
348 fn many_car_idempotent() {
349 let many = ManyCar::new(MemoryDB::default())
350 .with_read_only(AnyCar::try_from(mainnet::DEFAULT_GENESIS).unwrap())
351 .unwrap()
352 .with_read_only(AnyCar::try_from(mainnet::DEFAULT_GENESIS).unwrap())
353 .unwrap();
354 assert_eq!(
355 many.heaviest_tipset().unwrap(),
356 AnyCar::try_from(mainnet::DEFAULT_GENESIS)
357 .unwrap()
358 .heaviest_tipset()
359 .unwrap()
360 );
361 }
362
363 #[test]
364 fn many_car_calibnet_heaviest() {
365 let many = ManyCar::try_from(AnyCar::try_from(calibnet::DEFAULT_GENESIS).unwrap()).unwrap();
366 let heaviest = many.heaviest_tipset().unwrap();
367 assert_eq!(
368 heaviest.min_ticket_block(),
369 &heaviest.genesis(&many).unwrap()
370 );
371 }
372}