use crate::chain::FilecoinSnapshotMetadata;
use crate::cid_collections::CidHashMap;
use crate::db::PersistentStore;
use crate::utils::db::car_stream::{CarV1Header, CarV2Header};
use crate::{
blocks::{Tipset, TipsetKey},
utils::encoding::from_slice_with_fallback,
};
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore as _;
use integer_encoding::{FixedIntReader, VarIntReader};
use nunny::Vec as NonEmpty;
use parking_lot::RwLock;
use positioned_io::ReadAt;
use std::{
io::{
self, BufReader,
ErrorKind::{InvalidData, Unsupported},
Read, Seek, SeekFrom,
},
iter,
sync::OnceLock,
};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tracing::{debug, trace};
/// A read-only, random-access view over an uncompressed CARv1 or CARv2 file.
///
/// The location of every block is indexed once at construction time; block reads then go
/// straight to `reader` at the recorded offset.
pub struct PlainCar<ReaderT> {
    /// Random-access source of the raw CAR bytes.
    reader: ReaderT,
    /// Maps each block CID to the byte range of its data (the part after the CID) in `reader`.
    index: RwLock<CidHashMap<UncompressedBlockDataLocation>>,
    /// CAR format version of the file: `1` or `2`.
    version: u64,
    /// The CARv1 header (for CARv2 files, the header of the embedded CARv1 payload).
    header_v1: CarV1Header,
    /// The CARv2 header, present only for CARv2 files.
    header_v2: Option<CarV2Header>,
    /// Lazily decoded Filecoin snapshot metadata; an inner `None` means none was found.
    metadata: OnceLock<Option<FilecoinSnapshotMetadata>>,
}
impl<ReaderT: super::RandomAccessFileReader> PlainCar<ReaderT> {
    /// Opens an uncompressed CAR file and builds an index of every block it contains.
    ///
    /// Both CARv1 and CARv2 inputs are accepted. For CARv2, only the frames inside the
    /// `data_offset .. data_offset + data_size` payload window are indexed.
    ///
    /// # Errors
    ///
    /// Fails if a header or block frame cannot be read or decoded, if a CARv2 offset or size
    /// is negative, if the CARv1 header version is not `1`, or if no block is found.
    #[tracing::instrument(level = "debug", skip_all)]
    pub fn new(reader: ReaderT) -> io::Result<Self> {
        let mut cursor = positioned_io::Cursor::new(&reader);
        let start = cursor.position();
        let header_v2 = read_v2_header(&mut cursor)?;
        let (version, limit_position) = match &header_v2 {
            Some(v2) => {
                // Jump to the embedded CARv1 payload and remember where it ends, so anything
                // stored after the payload is never parsed as block frames.
                let data_offset = u64::try_from(v2.data_offset).map_err(io::Error::other)?;
                cursor.set_position(start.saturating_add(data_offset));
                let payload_start = cursor.stream_position()?;
                let data_size = u64::try_from(v2.data_size).map_err(io::Error::other)?;
                (2, Some(payload_start.saturating_add(data_size)))
            }
            None => {
                // Not CARv2: rewind over whatever was consumed while probing for the pragma.
                cursor.set_position(start);
                (1, None)
            }
        };
        let header_v1 = read_v1_header(&mut cursor)?;
        let mut frames = BufReader::with_capacity(1024, cursor);
        let index = iter::from_fn(|| {
            read_block_data_location_and_skip(&mut frames, limit_position).transpose()
        })
        .collect::<io::Result<CidHashMap<_>>>()?;
        let num_blocks = index.len();
        if num_blocks == 0 {
            return Err(io::Error::new(
                InvalidData,
                "CARv1 files must contain at least one block",
            ));
        }
        debug!(num_blocks, "indexed CAR");
        Ok(Self {
            reader,
            index: RwLock::new(index),
            version,
            header_v1,
            header_v2,
            metadata: OnceLock::new(),
        })
    }

    /// Returns the Filecoin snapshot metadata block, decoding it on first access.
    ///
    /// Metadata is only looked for when the root count equals
    /// `super::V2_SNAPSHOT_ROOT_COUNT`; it is read from the first root CID. Any lookup or
    /// decode failure is treated as "no metadata".
    pub fn metadata(&self) -> Option<&FilecoinSnapshotMetadata> {
        self.metadata
            .get_or_init(|| {
                let roots = &self.header_v1.roots;
                if roots.len() != super::V2_SNAPSHOT_ROOT_COUNT {
                    return None;
                }
                self.get_cbor::<FilecoinSnapshotMetadata>(roots.first())
                    .ok()
                    .flatten()
            })
            .as_ref()
    }

    /// The head tipset key: taken from the snapshot metadata when present, otherwise the
    /// CARv1 header roots.
    pub fn head_tipset_key(&self) -> &NonEmpty<Cid> {
        self.metadata()
            .map_or(&self.header_v1.roots, |meta| &meta.head_tipset_key)
    }

    /// CAR format version of this file (`1` or `2`).
    pub fn version(&self) -> u64 {
        self.version
    }

    /// [`Self::head_tipset_key`] as an owned [`TipsetKey`].
    pub fn heaviest_tipset_key(&self) -> TipsetKey {
        self.head_tipset_key().clone().into()
    }

    /// Loads the tipset named by [`Self::heaviest_tipset_key`] from this store.
    pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
        let key = self.heaviest_tipset_key();
        Tipset::load_required(self, &key)
    }

    /// All indexed CIDs (test helper).
    #[cfg(test)]
    pub fn cids(&self) -> Vec<Cid> {
        self.index.read().keys().collect()
    }

    /// Erases the concrete reader type behind a boxed trait object.
    pub fn into_dyn(self) -> PlainCar<Box<dyn super::RandomAccessFileReader>> {
        let Self {
            reader,
            index,
            version,
            header_v1,
            header_v2,
            metadata,
        } = self;
        PlainCar {
            reader: Box::new(reader),
            index,
            version,
            header_v1,
            header_v2,
            metadata,
        }
    }

    /// Returns a streaming reader over the data of block `k`, or `None` if it is not indexed.
    pub fn get_reader(&self, k: Cid) -> Option<impl Read> {
        let (offset, length) = self
            .index
            .read()
            .get(&k)
            .map(|location| (location.offset, location.length))?;
        Some(positioned_io::Cursor::new_pos(&self.reader, offset).take(u64::from(length)))
    }
}
impl TryFrom<&'static [u8]> for PlainCar<&'static [u8]> {
type Error = io::Error;
fn try_from(bytes: &'static [u8]) -> io::Result<Self> {
PlainCar::new(bytes)
}
}
/// Where a block's raw data lives inside an uncompressed CAR file.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct UncompressedBlockDataLocation {
    /// Byte offset of the block data (just past its CID) from the start of the reader.
    offset: u64,
    /// Length of the block data in bytes.
    length: u32,
}
impl<ReaderT: ReadAt> Blockstore for PlainCar<ReaderT> {
    /// Reads block `k` straight from the underlying reader using the prebuilt index.
    #[tracing::instrument(level = "trace", skip(self))]
    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
        // Copy the (small, `Copy`) location out so the index lock is released before disk I/O.
        let location = self
            .index
            .read()
            .get(k)
            .map(|UncompressedBlockDataLocation { offset, length }| (*offset, *length));
        let Some((offset, length)) = location else {
            trace!("not found");
            return Ok(None);
        };
        trace!("fetching from disk");
        let mut data = vec![0; usize::try_from(length).unwrap()];
        self.reader.read_exact_at(offset, &mut data)?;
        Ok(Some(data))
    }

    /// Writing is unsupported: this store is read-only.
    fn put_keyed(&self, _: &Cid, _: &[u8]) -> anyhow::Result<()> {
        unreachable!("PlainCar is read-only, use ManyCar instead");
    }
}
impl<ReaderT> PersistentStore for PlainCar<ReaderT>
where
ReaderT: ReadAt,
{
fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
self.put_keyed(k, block)
}
}
/// Writes a zstd skippable-frame header: the magic bytes followed by `data_len` encoded as
/// little-endian `u32`.
pub async fn write_skip_frame_header_async(
    mut writer: impl AsyncWrite + Unpin,
    data_len: u32,
) -> std::io::Result<()> {
    let magic = &super::forest::ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER;
    let frame_size = data_len.to_le_bytes();
    writer.write_all(magic).await?;
    writer.write_all(&frame_size).await
}
fn cid_error_to_io_error(cid_error: cid::Error) -> io::Error {
match cid_error {
cid::Error::Io(io_error) => io_error,
other => io::Error::new(InvalidData, other),
}
}
/// Tries to read a CARv2 header from the start of `reader`.
///
/// Returns `Ok(None)` if the leading bytes are not the CARv2 pragma. In that case some bytes
/// may already have been consumed, so the caller must rewind before reading a CARv1 header.
pub fn read_v2_header(mut reader: impl Read) -> io::Result<Option<CarV2Header>> {
    // CBOR encoding of the map `{"version": 2}`; it is preceded in the stream by a length byte.
    const CAR_V2_PRAGMA: [u8; 10] = [0xa1, 0x67, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x02];

    let pragma_len = usize::from(reader.read_fixedint::<u8>()?);
    if pragma_len != CAR_V2_PRAGMA.len() {
        return Ok(None);
    }
    let mut pragma = [0u8; CAR_V2_PRAGMA.len()];
    reader.read_exact(&mut pragma)?;
    if pragma != CAR_V2_PRAGMA {
        return Ok(None);
    }

    let mut characteristics = [0u8; 16];
    reader.read_exact(&mut characteristics)?;
    let data_offset = reader.read_fixedint::<i64>()?;
    let data_size = reader.read_fixedint::<i64>()?;
    let index_offset = reader.read_fixedint::<i64>()?;
    Ok(Some(CarV2Header {
        characteristics,
        data_offset,
        data_size,
        index_offset,
    }))
}
/// Reads a varint-length-prefixed CARv1 header and checks that its version is `1`.
///
/// # Errors
///
/// - `UnexpectedEof` if the stream ends before the declared header length.
/// - `InvalidData` if the header bytes cannot be decoded.
/// - `Unsupported` if the decoded header version is not `1`.
#[tracing::instrument(level = "trace", skip_all, ret)]
fn read_v1_header(mut reader: impl Read) -> io::Result<CarV1Header> {
    let header_len: usize = reader.read_varint()?;
    // `header_len` comes straight from the file. Reading through `take` bounds the buffer by
    // the bytes actually present, so a corrupt or hostile length prefix cannot force a huge
    // up-front allocation (`vec![0; header_len]` would abort on e.g. 2^60).
    let mut buffer = Vec::new();
    reader
        .by_ref()
        .take(u64::try_from(header_len).map_err(io::Error::other)?)
        .read_to_end(&mut buffer)?;
    if buffer.len() != header_len {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            format!(
                "truncated CAR header: expected {header_len} bytes, got {}",
                buffer.len()
            ),
        ));
    }
    let header: CarV1Header =
        from_slice_with_fallback(&buffer).map_err(|e| io::Error::new(InvalidData, e))?;
    if header.version == 1 {
        Ok(header)
    } else {
        Err(io::Error::new(
            Unsupported,
            format!("unsupported CAR version {}", header.version),
        ))
    }
}
#[tracing::instrument(level = "trace", skip_all, ret)]
fn read_block_data_location_and_skip(
mut reader: impl Read + Seek,
limit_position: Option<u64>,
) -> io::Result<Option<(Cid, UncompressedBlockDataLocation)>> {
if let Some(limit_position) = limit_position
&& reader.stream_position()? >= limit_position
{
return Ok(None);
}
let Some(body_length) = read_varint_body_length_or_eof(&mut reader)? else {
return Ok(None);
};
let frame_body_offset = reader.stream_position()?;
let mut reader = CountRead::new(&mut reader);
let cid = Cid::read_bytes(&mut reader).map_err(cid_error_to_io_error)?;
let cid_length = reader.bytes_read();
let block_data_offset = frame_body_offset + u64::try_from(cid_length).unwrap();
let next_frame_offset = frame_body_offset + u64::from(body_length);
let block_data_length = u32::try_from(next_frame_offset - block_data_offset).unwrap();
reader
.into_inner()
.seek(SeekFrom::Start(next_frame_offset))?;
Ok(Some((
cid,
UncompressedBlockDataLocation {
offset: block_data_offset,
length: block_data_length,
},
)))
}
/// Reads a frame's unsigned-varint length prefix, distinguishing a clean end of stream.
///
/// Returns `Ok(None)` when the reader is already at EOF before the first byte of the varint,
/// and `Ok(Some(len))` once the varint has been decoded. Any other I/O failure is propagated.
fn read_varint_body_length_or_eof(mut reader: impl Read) -> io::Result<Option<u32>> {
    let mut first_byte = [0u8; 1];
    // Probe one byte by hand so that EOF here can be told apart from a truncated varint.
    // `Read::read` may fail with `Interrupted`, which the std contract says to retry.
    loop {
        match reader.read(&mut first_byte) {
            Ok(0) => return Ok(None),
            Ok(_) => break,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    // Put the probed byte back in front of the remaining stream and decode the whole varint.
    (&first_byte[..]).chain(reader).read_varint().map(Some)
}
/// A [`Read`] adapter that tallies how many bytes have been read through it.
struct CountRead<ReadT> {
    inner: ReadT,
    count: usize,
}

impl<ReadT> CountRead<ReadT> {
    /// Wraps `inner` with a byte counter starting at zero.
    pub fn new(inner: ReadT) -> Self {
        CountRead { count: 0, inner }
    }

    /// Total number of bytes successfully read so far.
    pub fn bytes_read(&self) -> usize {
        let Self { count, .. } = self;
        *count
    }

    /// Discards the counter and returns the wrapped reader.
    pub fn into_inner(self) -> ReadT {
        let Self { inner, .. } = self;
        inner
    }
}

impl<ReadT: Read> Read for CountRead<ReadT> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf).map(|n| {
            self.count += n;
            n
        })
    }
}
#[cfg(test)]
mod tests {
    // Tests compare every block served by `PlainCar` against reference blockstores filled by
    // the streaming CAR loaders, for both a CARv1 and a CARv2 fixture.
    use super::*;
    use crate::utils::db::{
        car_stream::{CarStream, CarV1Header},
        car_util::load_car,
    };
    use futures::TryStreamExt as _;
    use fvm_ipld_blockstore::{Blockstore, MemoryBlockstore};
    use std::io::Cursor;
    use std::sync::LazyLock;
    use tokio::io::{AsyncBufRead, AsyncSeek, BufReader};
    use tokio_test::block_on;

    /// CARv1 fixture: checks version, root count, block count, and byte-for-byte agreement
    /// of `get` and `get_reader` with three reference loaders.
    #[test]
    fn test_uncompressed_v1() {
        let car = chain4_car();
        let car_backed = PlainCar::new(car).unwrap();
        assert_eq!(car_backed.version(), 1);
        assert_eq!(car_backed.head_tipset_key().len(), 1);
        assert_eq!(car_backed.cids().len(), 1222);
        // References: `load_car` on the raw and zstd inputs, plus `CarStream::new_unsafe`
        // on the zstd input.
        let reference_car = reference(Cursor::new(car));
        let reference_car_zst = reference(Cursor::new(chain4_car_zst()));
        let reference_car_zst_unsafe = reference_unsafe(chain4_car_zst());
        for cid in car_backed.cids() {
            let expected = reference_car.get(&cid).unwrap().unwrap();
            let expected2 = reference_car_zst.get(&cid).unwrap().unwrap();
            let expected3 = reference_car_zst_unsafe.get(&cid).unwrap().unwrap();
            // The streaming reader must yield the same bytes as `get`.
            let mut expected4 = vec![];
            car_backed
                .get_reader(cid)
                .unwrap()
                .read_to_end(&mut expected4)
                .unwrap();
            let actual = car_backed.get(&cid).unwrap().unwrap();
            assert_eq!(expected, actual);
            assert_eq!(expected2, actual);
            assert_eq!(expected3, actual);
            assert_eq!(expected4, actual);
        }
    }

    /// CARv2 fixture: same checks as the CARv1 test, minus `get_reader`.
    #[test]
    fn test_uncompressed_v2() {
        let car = carv2_car();
        let car_backed = PlainCar::new(car).unwrap();
        assert_eq!(car_backed.version(), 2);
        assert_eq!(car_backed.head_tipset_key().len(), 1);
        assert_eq!(car_backed.cids().len(), 7153);
        let reference_car = reference(Cursor::new(car));
        let reference_car_zst = reference(Cursor::new(carv2_car_zst()));
        let reference_car_zst_unsafe = reference_unsafe(carv2_car_zst());
        for cid in car_backed.cids() {
            let expected = reference_car.get(&cid).unwrap().unwrap();
            let expected2 = reference_car_zst.get(&cid).unwrap().unwrap();
            let expected3 = reference_car_zst_unsafe.get(&cid).unwrap().unwrap();
            let actual = car_backed.get(&cid).unwrap().unwrap();
            assert_eq!(expected, actual);
            assert_eq!(expected2, actual);
            assert_eq!(expected3, actual);
        }
    }

    /// Loads a CAR (raw or zstd) into a fresh in-memory blockstore via `load_car`.
    fn reference(reader: impl AsyncBufRead + AsyncSeek + Unpin) -> MemoryBlockstore {
        let blockstore = MemoryBlockstore::new();
        block_on(load_car(&blockstore, reader)).unwrap();
        blockstore
    }

    /// Loads a CAR into a fresh in-memory blockstore via `load_car_unsafe`.
    fn reference_unsafe(reader: impl AsyncBufRead + Unpin) -> MemoryBlockstore {
        let blockstore = MemoryBlockstore::new();
        block_on(load_car_unsafe(&blockstore, reader)).unwrap();
        blockstore
    }

    /// Streams every block through `CarStream::new_unsafe` into `db` and returns the CARv1
    /// header. NOTE(review): "unsafe" presumably means some stream validation is skipped —
    /// confirm against `CarStream::new_unsafe`.
    pub async fn load_car_unsafe<R>(db: &impl Blockstore, reader: R) -> anyhow::Result<CarV1Header>
    where
        R: AsyncBufRead + Unpin,
    {
        let mut stream = CarStream::new_unsafe(BufReader::new(reader)).await?;
        while let Some(block) = stream.try_next().await? {
            db.put_keyed(&block.cid, &block.data)?;
        }
        Ok(stream.header_v1)
    }

    /// zstd-compressed CARv1 fixture embedded at compile time.
    fn chain4_car_zst() -> &'static [u8] {
        include_bytes!("../../../test-snapshots/chain4.car.zst")
    }

    /// Decompressed CARv1 fixture, decoded once and cached for the process lifetime.
    fn chain4_car() -> &'static [u8] {
        static CAR: LazyLock<Vec<u8>> =
            LazyLock::new(|| zstd::decode_all(chain4_car_zst()).unwrap());
        CAR.as_slice()
    }

    /// zstd-compressed CARv2 fixture embedded at compile time.
    fn carv2_car_zst() -> &'static [u8] {
        include_bytes!("../../../test-snapshots/carv2.car.zst")
    }

    /// Decompressed CARv2 fixture, decoded once and cached for the process lifetime.
    fn carv2_car() -> &'static [u8] {
        static CAR: LazyLock<Vec<u8>> =
            LazyLock::new(|| zstd::decode_all(carv2_car_zst()).unwrap());
        CAR.as_slice()
    }
}