1use super::{AnyCar, ZstdFrameCache};
12use crate::blocks::TipsetKey;
13use crate::db::{
14 BlockstoreWriteOpsSubscribable, EthMappingsStore, MemoryDB, PersistentStore, SettingsStore,
15 SettingsStoreExt,
16};
17use crate::libp2p_bitswap::BitswapStoreReadWrite;
18use crate::rpc::eth::types::EthHash;
19use crate::shim::clock::ChainEpoch;
20use crate::utils::io::EitherMmapOrRandomAccessFile;
21use crate::utils::multihash::prelude::*;
22use crate::{blocks::Tipset, libp2p_bitswap::BitswapStoreRead};
23use anyhow::Context as _;
24use cid::Cid;
25use fvm_ipld_blockstore::Blockstore;
26use parking_lot::RwLock;
27use std::cmp::Ord;
28use std::collections::BinaryHeap;
29use std::{path::PathBuf, sync::Arc};
30
31struct WithHeaviestEpoch {
32 pub car: AnyCar<Box<dyn super::RandomAccessFileReader>>,
33 epoch: ChainEpoch,
34}
35
36impl WithHeaviestEpoch {
37 pub fn new(car: AnyCar<Box<dyn super::RandomAccessFileReader>>) -> anyhow::Result<Self> {
38 let epoch = car
39 .heaviest_tipset()
40 .context("store doesn't have a heaviest tipset")?
41 .epoch();
42 Ok(Self { car, epoch })
43 }
44}
45
46impl Ord for WithHeaviestEpoch {
47 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
48 self.epoch.cmp(&other.epoch)
49 }
50}
51
52impl Eq for WithHeaviestEpoch {}
53
54impl PartialOrd for WithHeaviestEpoch {
55 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
56 Some(self.cmp(other))
57 }
58}
59
60impl PartialEq for WithHeaviestEpoch {
61 fn eq(&self, other: &Self) -> bool {
62 self.epoch == other.epoch
63 }
64}
65
66pub struct ManyCar<WriterT = MemoryDB> {
67 shared_cache: Arc<ZstdFrameCache>,
68 read_only: Arc<RwLock<BinaryHeap<WithHeaviestEpoch>>>,
69 writer: WriterT,
70}
71
72impl<WriterT> ManyCar<WriterT> {
73 pub fn new(writer: WriterT) -> Self {
74 ManyCar {
75 shared_cache: Arc::new(ZstdFrameCache::default()),
76 read_only: Arc::new(RwLock::new(BinaryHeap::default())),
77 writer,
78 }
79 }
80
81 pub fn writer(&self) -> &WriterT {
82 &self.writer
83 }
84}
85
86impl<WriterT: Default> Default for ManyCar<WriterT> {
87 fn default() -> Self {
88 Self::new(Default::default())
89 }
90}
91
92impl<WriterT> ManyCar<WriterT> {
93 pub fn with_read_only<ReaderT: super::RandomAccessFileReader>(
94 self,
95 any_car: AnyCar<ReaderT>,
96 ) -> anyhow::Result<Self> {
97 self.read_only(any_car)?;
98 Ok(self)
99 }
100
101 pub fn read_only<ReaderT: super::RandomAccessFileReader>(
102 &self,
103 any_car: AnyCar<ReaderT>,
104 ) -> anyhow::Result<()> {
105 let mut read_only = self.read_only.write();
106 let key = read_only.len() as u64;
107
108 read_only.push(WithHeaviestEpoch::new(
109 any_car
110 .with_cache(self.shared_cache.clone(), key)
111 .into_dyn(),
112 )?);
113
114 Ok(())
115 }
116
117 pub fn with_read_only_files(
118 self,
119 files: impl Iterator<Item = PathBuf>,
120 ) -> anyhow::Result<Self> {
121 self.read_only_files(files)?;
122 Ok(self)
123 }
124
125 pub fn read_only_files(&self, files: impl Iterator<Item = PathBuf>) -> anyhow::Result<()> {
126 for file in files {
127 self.read_only(AnyCar::new(EitherMmapOrRandomAccessFile::open(file)?)?)?;
128 }
129
130 Ok(())
131 }
132
133 pub fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
134 self.read_only
135 .read()
136 .peek()
137 .map(|w| AnyCar::heaviest_tipset_key(&w.car))
138 .context("ManyCar store doesn't have a heaviest tipset key")
139 }
140
141 pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
142 self.read_only
143 .read()
144 .peek()
145 .map(|w| AnyCar::heaviest_tipset(&w.car))
146 .context("ManyCar store doesn't have a heaviest tipset")?
147 }
148
149 pub fn len(&self) -> usize {
151 self.read_only.read().len()
152 }
153}
154
155impl<ReaderT: super::RandomAccessFileReader> TryFrom<AnyCar<ReaderT>> for ManyCar<MemoryDB> {
156 type Error = anyhow::Error;
157 fn try_from(any_car: AnyCar<ReaderT>) -> anyhow::Result<Self> {
158 ManyCar::default().with_read_only(any_car)
159 }
160}
161
162impl TryFrom<Vec<PathBuf>> for ManyCar<MemoryDB> {
163 type Error = anyhow::Error;
164 fn try_from(files: Vec<PathBuf>) -> anyhow::Result<Self> {
165 ManyCar::default().with_read_only_files(files.into_iter())
166 }
167}
168
169impl<WriterT: Blockstore> Blockstore for ManyCar<WriterT> {
170 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
171 if let Ok(Some(value)) = self.writer.get(k) {
175 return Ok(Some(value));
176 }
177 for reader in self.read_only.read().iter() {
178 if let Some(val) = reader.car.get(k)? {
179 return Ok(Some(val));
180 }
181 }
182 Ok(None)
183 }
184
185 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
186 self.writer.put_keyed(k, block)
187 }
188}
189
190impl<WriterT: PersistentStore> PersistentStore for ManyCar<WriterT> {
191 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
192 self.writer.put_keyed_persistent(k, block)
193 }
194}
195
196impl<WriterT: BitswapStoreRead + Blockstore> BitswapStoreRead for ManyCar<WriterT> {
197 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
198 Blockstore::has(self, cid)
199 }
200
201 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
202 Blockstore::get(self, cid)
203 }
204}
205
206impl<WriterT: BitswapStoreReadWrite + Blockstore> BitswapStoreReadWrite for ManyCar<WriterT> {
207 type Hashes = MultihashCode;
208
209 fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
210 self.put_keyed(block.cid(), block.data())
211 }
212}
213
214impl<WriterT: SettingsStore> SettingsStore for ManyCar<WriterT> {
215 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
216 SettingsStore::read_bin(self.writer(), key)
217 }
218
219 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
220 SettingsStore::write_bin(self.writer(), key, value)
221 }
222
223 fn exists(&self, key: &str) -> anyhow::Result<bool> {
224 SettingsStore::exists(self.writer(), key)
225 }
226
227 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
228 SettingsStore::setting_keys(self.writer())
229 }
230}
231
232impl<WriterT: EthMappingsStore> EthMappingsStore for ManyCar<WriterT> {
233 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
234 EthMappingsStore::read_bin(self.writer(), key)
235 }
236
237 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
238 EthMappingsStore::write_bin(self.writer(), key, value)
239 }
240
241 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
242 EthMappingsStore::exists(self.writer(), key)
243 }
244
245 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
246 EthMappingsStore::get_message_cids(self.writer())
247 }
248
249 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
250 EthMappingsStore::delete(self.writer(), keys)
251 }
252}
253
254impl<T: Blockstore + SettingsStore> super::super::HeaviestTipsetKeyProvider for ManyCar<T> {
255 fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
256 match SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)? {
257 Some(tsk) => Ok(tsk),
258 None => self.heaviest_tipset_key(),
259 }
260 }
261
262 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
263 SettingsStoreExt::write_obj(self, crate::db::setting_keys::HEAD_KEY, tsk)
264 }
265}
266
267impl<WriterT: BlockstoreWriteOpsSubscribable> BlockstoreWriteOpsSubscribable for ManyCar<WriterT> {
268 fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<(Cid, Vec<u8>)> {
269 self.writer().subscribe_write_ops()
270 }
271
272 fn unsubscribe_write_ops(&self) {
273 self.writer().unsubscribe_write_ops()
274 }
275}
276
277#[cfg(test)]
278mod tests {
279 use super::super::AnyCar;
280 use super::*;
281 use crate::networks::{calibnet, mainnet};
282
283 #[test]
284 fn many_car_empty() {
285 let many = ManyCar::new(MemoryDB::default());
286 assert!(many.heaviest_tipset().is_err());
287 }
288
289 #[test]
290 fn many_car_idempotent() {
291 let many = ManyCar::new(MemoryDB::default())
292 .with_read_only(AnyCar::try_from(mainnet::DEFAULT_GENESIS).unwrap())
293 .unwrap()
294 .with_read_only(AnyCar::try_from(mainnet::DEFAULT_GENESIS).unwrap())
295 .unwrap();
296 assert_eq!(
297 many.heaviest_tipset().unwrap(),
298 AnyCar::try_from(mainnet::DEFAULT_GENESIS)
299 .unwrap()
300 .heaviest_tipset()
301 .unwrap()
302 );
303 }
304
305 #[test]
306 fn many_car_calibnet_heaviest() {
307 let many = ManyCar::try_from(AnyCar::try_from(calibnet::DEFAULT_GENESIS).unwrap()).unwrap();
308 let heaviest = many.heaviest_tipset().unwrap();
309 assert_eq!(
310 heaviest.min_ticket_block(),
311 &heaviest.genesis(&many).unwrap()
312 );
313 }
314}