use super::{AnyCar, ZstdFrameCache};
use crate::blocks::TipsetKey;
use crate::db::parity_db::GarbageCollectableDb;
use crate::db::{
BlockstoreWriteOpsSubscribable, EthMappingsStore, MemoryDB, PersistentStore, SettingsStore,
SettingsStoreExt,
};
use crate::libp2p_bitswap::BitswapStoreReadWrite;
use crate::rpc::eth::types::EthHash;
use crate::shim::clock::ChainEpoch;
use crate::utils::ShallowClone as _;
use crate::utils::io::EitherMmapOrRandomAccessFile;
use crate::utils::multihash::prelude::*;
use crate::{blocks::Tipset, libp2p_bitswap::BitswapStoreRead};
use anyhow::Context as _;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use parking_lot::RwLock;
use std::{
cmp::Ord,
collections::BinaryHeap,
path::{Path, PathBuf},
};
struct WithHeaviestEpoch {
pub car: AnyCar<Box<dyn super::RandomAccessFileReader>>,
epoch: ChainEpoch,
}
impl WithHeaviestEpoch {
pub fn new(car: AnyCar<Box<dyn super::RandomAccessFileReader>>) -> anyhow::Result<Self> {
let epoch = car
.heaviest_tipset()
.context("store doesn't have a heaviest tipset")?
.epoch();
Ok(Self { car, epoch })
}
}
impl Ord for WithHeaviestEpoch {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.epoch.cmp(&other.epoch)
}
}
impl Eq for WithHeaviestEpoch {}
impl PartialOrd for WithHeaviestEpoch {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for WithHeaviestEpoch {
fn eq(&self, other: &Self) -> bool {
self.epoch == other.epoch
}
}
pub struct ManyCar<WriterT = MemoryDB> {
shared_cache: RwLock<ZstdFrameCache>,
read_only: RwLock<BinaryHeap<WithHeaviestEpoch>>,
writer: WriterT,
}
impl<WriterT> ManyCar<WriterT> {
pub fn new(writer: WriterT) -> Self {
ManyCar {
shared_cache: RwLock::new(ZstdFrameCache::default()),
read_only: RwLock::new(BinaryHeap::default()),
writer,
}
}
pub fn writer(&self) -> &WriterT {
&self.writer
}
}
impl<WriterT: Default> Default for ManyCar<WriterT> {
fn default() -> Self {
Self::new(Default::default())
}
}
impl<WriterT> ManyCar<WriterT> {
pub fn with_read_only<ReaderT: super::RandomAccessFileReader>(
self,
any_car: AnyCar<ReaderT>,
) -> anyhow::Result<Self> {
self.read_only(any_car)?;
Ok(self)
}
pub fn read_only<ReaderT: super::RandomAccessFileReader>(
&self,
any_car: AnyCar<ReaderT>,
) -> anyhow::Result<()> {
let mut read_only = self.read_only.write();
Self::read_only_inner(
&mut read_only,
self.shared_cache.read().shallow_clone(),
any_car,
)
}
fn read_only_inner<ReaderT: super::RandomAccessFileReader>(
read_only: &mut BinaryHeap<WithHeaviestEpoch>,
shared_cache: ZstdFrameCache,
any_car: AnyCar<ReaderT>,
) -> anyhow::Result<()> {
let key = read_only.len() as u64;
read_only.push(WithHeaviestEpoch::new(
any_car.with_cache(shared_cache, key).into_dyn(),
)?);
Ok(())
}
pub fn with_read_only_files(
self,
files: impl Iterator<Item = PathBuf>,
) -> anyhow::Result<Self> {
self.read_only_files(files)?;
Ok(self)
}
pub fn read_only_files(&self, files: impl Iterator<Item = PathBuf>) -> anyhow::Result<()> {
for file in files {
self.read_only_file(file)?;
}
Ok(())
}
pub fn read_only_file(&self, file: impl AsRef<Path>) -> anyhow::Result<()> {
(|| {
self.read_only(AnyCar::new(EitherMmapOrRandomAccessFile::open(
file.as_ref(),
)?)?)
})()
.with_context(|| format!("failed to load CAR at {}", file.as_ref().display()))
}
pub fn clear_and_append_read_only_files(
&self,
files: impl Iterator<Item = PathBuf>,
) -> anyhow::Result<()> {
let mut read_only = BinaryHeap::default();
let shared_cache = ZstdFrameCache::default();
for f in files {
let car = AnyCar::new(EitherMmapOrRandomAccessFile::open(f)?)?;
Self::read_only_inner(&mut read_only, shared_cache.shallow_clone(), car)?;
}
*self.read_only.write() = read_only;
*self.shared_cache.write() = shared_cache;
Ok(())
}
pub fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
Ok(self
.read_only
.read()
.peek()
.map(|w| AnyCar::heaviest_tipset_key(&w.car)))
}
pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
self.read_only
.read()
.peek()
.map(|w| AnyCar::heaviest_tipset(&w.car))
.context("ManyCar store doesn't have a heaviest tipset")?
}
pub fn len(&self) -> usize {
self.read_only.read().len()
}
}
pub trait ReloadableManyCar {
fn clear_and_reload_cars(&self, files: impl Iterator<Item = PathBuf>) -> anyhow::Result<()>;
fn heaviest_car_tipset(&self) -> anyhow::Result<Tipset>;
}
impl<T> ReloadableManyCar for ManyCar<T> {
fn clear_and_reload_cars(&self, files: impl Iterator<Item = PathBuf>) -> anyhow::Result<()> {
self.clear_and_append_read_only_files(files)
}
fn heaviest_car_tipset(&self) -> anyhow::Result<Tipset> {
self.heaviest_tipset()
}
}
impl<ReaderT: super::RandomAccessFileReader> TryFrom<AnyCar<ReaderT>> for ManyCar<MemoryDB> {
type Error = anyhow::Error;
fn try_from(any_car: AnyCar<ReaderT>) -> anyhow::Result<Self> {
ManyCar::default().with_read_only(any_car)
}
}
impl TryFrom<Vec<PathBuf>> for ManyCar<MemoryDB> {
type Error = anyhow::Error;
fn try_from(files: Vec<PathBuf>) -> anyhow::Result<Self> {
ManyCar::default().with_read_only_files(files.into_iter())
}
}
impl<WriterT: Blockstore> Blockstore for ManyCar<WriterT> {
fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
if let Ok(Some(value)) = self.writer.get(k) {
return Ok(Some(value));
}
for reader in self.read_only.read().iter() {
if let Some(val) = reader.car.get(k)? {
return Ok(Some(val));
}
}
Ok(None)
}
fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
self.writer.put_keyed(k, block)
}
}
impl<WriterT: PersistentStore> PersistentStore for ManyCar<WriterT> {
fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
self.writer.put_keyed_persistent(k, block)
}
}
impl<WriterT: BitswapStoreRead + Blockstore> BitswapStoreRead for ManyCar<WriterT> {
fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
Blockstore::has(self, cid)
}
fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
Blockstore::get(self, cid)
}
}
impl<WriterT: BitswapStoreReadWrite + Blockstore> BitswapStoreReadWrite for ManyCar<WriterT> {
type Hashes = MultihashCode;
fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
self.put_keyed(block.cid(), block.data())
}
}
impl<WriterT: SettingsStore> SettingsStore for ManyCar<WriterT> {
fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
SettingsStore::read_bin(self.writer(), key)
}
fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
SettingsStore::write_bin(self.writer(), key, value)
}
fn exists(&self, key: &str) -> anyhow::Result<bool> {
SettingsStore::exists(self.writer(), key)
}
fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
SettingsStore::setting_keys(self.writer())
}
}
impl<WriterT: EthMappingsStore> EthMappingsStore for ManyCar<WriterT> {
fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
EthMappingsStore::read_bin(self.writer(), key)
}
fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
EthMappingsStore::write_bin(self.writer(), key, value)
}
fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
EthMappingsStore::exists(self.writer(), key)
}
fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
EthMappingsStore::get_message_cids(self.writer())
}
fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
EthMappingsStore::delete(self.writer(), keys)
}
}
impl<T: Blockstore + SettingsStore> super::super::HeaviestTipsetKeyProvider for ManyCar<T> {
fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
match SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)? {
Some(tsk) => Ok(Some(tsk)),
None => self.heaviest_tipset_key(),
}
}
fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
SettingsStoreExt::write_obj(self, crate::db::setting_keys::HEAD_KEY, tsk)
}
}
impl<WriterT: BlockstoreWriteOpsSubscribable> BlockstoreWriteOpsSubscribable for ManyCar<WriterT> {
fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, bytes::Bytes)>> {
self.writer().subscribe_write_ops()
}
fn unsubscribe_write_ops(&self) {
self.writer().unsubscribe_write_ops()
}
}
impl<T: GarbageCollectableDb> GarbageCollectableDb for ManyCar<T> {
fn reset_gc_columns(&self) -> anyhow::Result<()> {
self.writer().reset_gc_columns()
}
}
#[cfg(test)]
mod tests {
use super::super::AnyCar;
use super::*;
use crate::networks::{calibnet, mainnet};
#[test]
fn many_car_empty() {
let many = ManyCar::new(MemoryDB::default());
assert!(many.heaviest_tipset().is_err());
}
#[test]
fn many_car_idempotent() {
let many = ManyCar::new(MemoryDB::default())
.with_read_only(AnyCar::try_from(mainnet::DEFAULT_GENESIS).unwrap())
.unwrap()
.with_read_only(AnyCar::try_from(mainnet::DEFAULT_GENESIS).unwrap())
.unwrap();
assert_eq!(
many.heaviest_tipset().unwrap(),
AnyCar::try_from(mainnet::DEFAULT_GENESIS)
.unwrap()
.heaviest_tipset()
.unwrap()
);
}
#[test]
fn many_car_calibnet_heaviest() {
let many = ManyCar::try_from(AnyCar::try_from(calibnet::DEFAULT_GENESIS).unwrap()).unwrap();
let heaviest = many.heaviest_tipset().unwrap();
assert_eq!(
heaviest.min_ticket_block(),
&heaviest.genesis(&many).unwrap()
);
}
}