1use super::{CacheKey, ZstdFrameCache};
50use crate::blocks::{Tipset, TipsetKey};
51use crate::chain::FilecoinSnapshotMetadata;
52use crate::db::car::RandomAccessFileReader;
53use crate::db::car::plain::write_skip_frame_header_async;
54use crate::utils::db::car_stream::{CarBlock, CarV1Header, uvi_bytes};
55use crate::utils::encoding::from_slice_with_fallback;
56use crate::utils::get_size::CidWrapper;
57use crate::utils::io::EitherMmapOrRandomAccessFile;
58use byteorder::LittleEndian;
59use bytes::{BufMut as _, Bytes, BytesMut, buf::Writer};
60use cid::Cid;
61use futures::{Stream, TryStreamExt as _};
62use fvm_ipld_blockstore::Blockstore;
63use fvm_ipld_encoding::CborStore as _;
64use integer_encoding::VarIntReader;
65use nunny::Vec as NonEmpty;
66use positioned_io::{Cursor, ReadAt, ReadBytesAtExt, SizeCursor};
67use std::io::{Seek, SeekFrom};
68use std::path::Path;
69use std::sync::{Arc, OnceLock};
70use std::task::Poll;
71use std::{
72 io,
73 io::{Read, Write},
74};
75use tokio::io::{AsyncWrite, AsyncWriteExt};
76use tokio_util::codec::{Decoder, Encoder as _};
77
// The `index` submodule is declared twice under mutually exclusive `cfg`s:
// it is `pub` only when the `benchmark-private` feature is enabled and
// private otherwise.
#[cfg(feature = "benchmark-private")]
pub mod index;
#[cfg(not(feature = "benchmark-private"))]
mod index;
82
/// File extension (including the leading dot) of a Forest CAR archive.
pub const FOREST_CAR_FILE_EXTENSION: &str = ".forest.car.zst";
/// File extension used as the suffix of temporary Forest CAR files, see
/// [`new_forest_car_temp_path_in`].
pub const TEMP_FOREST_CAR_FILE_EXTENSION: &str = ".forest.car.zst.tmp";
/// Magic number written at the start of the footer by
/// `ForestCarFooter::to_le_bytes`; the bytes are `0x184D2A50` in little-endian
/// order.
pub const ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER: [u8; 4] = [0x50, 0x2A, 0x4D, 0x18];
/// Frame-size tripwire used by [`Encoder::compress_stream_default`]: 8000
/// rounded up to the next power of two, i.e. 8192 bytes.
pub const DEFAULT_FOREST_CAR_FRAME_SIZE: usize = 8000_usize.next_power_of_two();
/// Compression level used by [`Encoder::compress_stream_default`] and for the
/// header frame: the `zstd` crate's default level.
pub const DEFAULT_FOREST_CAR_COMPRESSION_LEVEL: u16 = zstd::DEFAULT_COMPRESSION_LEVEL as _;
/// Number of bytes added to the running offset in `Encoder::write` to skip the
/// skip-frame header that precedes the index (presumably 4 bytes of magic plus
/// a 4-byte length, matching the footer layout).
const ZSTD_SKIP_FRAME_LEN: u64 = 8;
90
/// One compressed frame as yielded by [`Encoder::compress_stream`]: the CIDs
/// of the blocks written into the frame, paired with the finished zstd frame
/// bytes.
pub type ForestCarFrame = (Vec<Cid>, Bytes);
93
/// Read-only handle on a `.forest.car.zst` archive backed by a `ReaderT`.
pub struct ForestCar<ReaderT> {
    /// Key passed to `frame_cache` on every lookup/insert; `0` until replaced
    /// through `with_cache`.
    cache_key: CacheKey,
    /// Index reader over the slice of the underlying reader that starts at the
    /// footer's index offset. The underlying reader (the entire file) is
    /// reached through `indexed.reader().get_ref()`.
    indexed: index::Reader<positioned_io::Slice<ReaderT>>,
    /// Little-endian `u32` read from the four bytes immediately preceding the
    /// index offset.
    index_size_bytes: u32,
    /// Cache of decoded zstd frames; may be shared via `with_cache`.
    frame_cache: Arc<ZstdFrameCache>,
    /// CAR header decoded from the first zstd frame of the file.
    header: CarV1Header,
    /// Lazily computed snapshot metadata, see `metadata()`.
    metadata: OnceLock<Option<FilecoinSnapshotMetadata>>,
}
104
105impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
106 pub fn new(reader: ReaderT) -> io::Result<ForestCar<ReaderT>> {
107 let (header, footer) = Self::validate_car(&reader)?;
108 let index_size_bytes = reader.read_u32_at::<LittleEndian>(
109 footer.index.saturating_sub(std::mem::size_of::<u32>() as _),
110 )?;
111 let indexed = index::Reader::new(positioned_io::Slice::new(
112 reader,
113 footer.index,
114 Some(index_size_bytes as u64),
115 ))?;
116 Ok(ForestCar {
117 cache_key: 0,
118 indexed,
119 index_size_bytes,
120 frame_cache: Arc::new(ZstdFrameCache::default()),
121 header,
122 metadata: OnceLock::new(),
123 })
124 }
125
126 pub fn metadata(&self) -> &Option<FilecoinSnapshotMetadata> {
127 self.metadata.get_or_init(|| {
128 if self.header.roots.len() == super::V2_SNAPSHOT_ROOT_COUNT {
129 let maybe_metadata_cid = self.header.roots.first();
130 if let Ok(Some(metadata)) =
131 self.get_cbor::<FilecoinSnapshotMetadata>(maybe_metadata_cid)
132 {
133 return Some(metadata);
134 }
135 }
136 None
137 })
138 }
139
140 pub fn is_valid(reader: &ReaderT) -> bool {
141 Self::validate_car(reader).is_ok()
142 }
143
144 fn validate_car(reader: &ReaderT) -> io::Result<(CarV1Header, ForestCarFooter)> {
145 let mut cursor = SizeCursor::new(&reader);
146 cursor.seek(SeekFrom::End(-(ForestCarFooter::SIZE as i64)))?;
147
148 let mut footer_buffer = [0; ForestCarFooter::SIZE];
149 cursor.read_exact(&mut footer_buffer)?;
150
151 let footer = ForestCarFooter::try_from_le_bytes(footer_buffer).ok_or_else(|| {
152 invalid_data(format!(
153 "not recognizable as a `{FOREST_CAR_FILE_EXTENSION}` file"
154 ))
155 })?;
156
157 let cursor = Cursor::new_pos(&reader, 0);
158 let mut header_zstd_frame = decode_zstd_single_frame(cursor)?;
159 let block_frame = uvi_bytes()
160 .decode(&mut header_zstd_frame)?
161 .ok_or_else(|| invalid_data("malformed uvibytes"))?;
162 let header = from_slice_with_fallback::<CarV1Header>(&block_frame)
163 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
164
165 Ok((header, footer))
166 }
167
168 pub fn head_tipset_key(&self) -> &NonEmpty<Cid> {
169 if let Some(metadata) = self.metadata() {
172 &metadata.head_tipset_key
173 } else {
174 &self.header.roots
175 }
176 }
177
178 pub fn index_size_bytes(&self) -> u32 {
179 self.index_size_bytes
180 }
181
182 pub fn heaviest_tipset_key(&self) -> TipsetKey {
183 TipsetKey::from(self.head_tipset_key().clone())
184 }
185
186 pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
187 Tipset::load_required(self, &self.heaviest_tipset_key())
188 }
189
190 pub fn into_dyn(self) -> ForestCar<Box<dyn super::RandomAccessFileReader>> {
191 ForestCar {
192 cache_key: self.cache_key,
193 indexed: self.indexed.map(|slice| {
194 let offset = slice.offset();
195 positioned_io::Slice::new(
196 Box::new(slice.into_inner()) as Box<dyn RandomAccessFileReader>,
197 offset,
198 None,
199 )
200 }),
201 index_size_bytes: self.index_size_bytes,
202 frame_cache: self.frame_cache,
203 header: self.header,
204 metadata: self.metadata,
205 }
206 }
207
208 pub fn with_cache(self, cache: Arc<ZstdFrameCache>, key: CacheKey) -> Self {
209 Self {
210 cache_key: key,
211 frame_cache: cache,
212 ..self
213 }
214 }
215
216 pub fn get_reader(&self, k: Cid) -> anyhow::Result<Option<impl Read>> {
218 for position in self.indexed.get(k)? {
219 let entire_file = self.indexed.reader().get_ref();
221 let cursor = Cursor::new_pos(entire_file, position);
223 let mut decoder = zstd::Decoder::new(cursor)?.single_frame();
224 while let Ok(car_block_len) = decoder.read_varint::<usize>() {
225 let cid = Cid::read_bytes(&mut decoder)?;
226 let data_len = car_block_len.saturating_sub(cid.encoded_len()) as u64;
227 if cid == k {
228 return Ok(Some(decoder.take(data_len)));
230 }
231 io::copy(&mut decoder.by_ref().take(data_len), &mut io::sink())?;
233 }
234 }
235 Ok(None)
236 }
237}
238
239impl TryFrom<&Path> for ForestCar<EitherMmapOrRandomAccessFile> {
240 type Error = std::io::Error;
241 fn try_from(path: &Path) -> std::io::Result<Self> {
242 ForestCar::new(EitherMmapOrRandomAccessFile::open(path)?)
243 }
244}
245
246impl<ReaderT> Blockstore for ForestCar<ReaderT>
247where
248 ReaderT: ReadAt,
249{
250 #[tracing::instrument(level = "trace", skip(self))]
251 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
252 let indexed = &self.indexed;
253 for position in indexed.get(*k)?.into_iter() {
254 let cache_query = self.frame_cache.get(position, self.cache_key, *k);
255 match cache_query {
256 Some(Some(val)) => return Ok(Some(val)),
258 Some(None) => {}
260 None => {
261 let entire_file = indexed.reader().get_ref(); let cursor = Cursor::new_pos(entire_file, position);
264 let mut zstd_frame = decode_zstd_single_frame(cursor)?;
265 let mut block_map = hashbrown::HashMap::new();
267 while let Some(block_frame) = uvi_bytes().decode_eof(&mut zstd_frame)? {
268 let CarBlock { cid, data } = CarBlock::from_bytes(block_frame)?;
269 block_map.insert(cid.into(), data);
270 }
271 let get_result = block_map.get(&CidWrapper::from(*k)).cloned();
272 self.frame_cache.put(position, self.cache_key, block_map);
273
274 if let Some(value) = get_result {
276 return Ok(Some(value));
277 }
278 }
279 }
280 }
281 Ok(None)
282 }
283
284 fn put_keyed(&self, _: &Cid, _: &[u8]) -> anyhow::Result<()> {
286 unreachable!("ForestCar is read-only, use ManyCar instead");
287 }
288}
289
290fn decode_zstd_single_frame<ReaderT: Read>(reader: ReaderT) -> io::Result<BytesMut> {
291 let mut zstd_frame = vec![];
292 zstd::Decoder::new(reader)?
293 .single_frame()
294 .read_to_end(&mut zstd_frame)?;
295 Ok(zstd_frame.into_iter().collect())
296}
297
/// Field-less type that groups the associated functions used to produce
/// Forest CAR archives (`write`, `compress_stream`, ...).
pub struct Encoder {}
299
300impl Encoder {
301 pub async fn write(
302 mut sink: impl AsyncWrite + Unpin,
303 roots: NonEmpty<Cid>,
304 mut stream: impl Stream<Item = anyhow::Result<ForestCarFrame>> + Unpin,
305 ) -> anyhow::Result<()> {
306 let mut offset = 0;
307
308 let mut header_encoder = new_encoder(DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
310
311 let header = CarV1Header { roots, version: 1 };
312 let mut header_uvi_frame = BytesMut::new();
313 uvi_bytes().encode(
314 Bytes::from(fvm_ipld_encoding::to_vec(&header)?),
315 &mut header_uvi_frame,
316 )?;
317 header_encoder.write_all(&header_uvi_frame)?;
318 let header_bytes = header_encoder.finish()?.into_inner().freeze();
319
320 sink.write_all(&header_bytes).await?;
321 let header_len = header_bytes.len();
322
323 offset += header_len;
324
325 let mut builder = index::Builder::new();
327 while let Some((cids, zstd_frame)) = stream.try_next().await? {
328 builder.extend(cids.into_iter().map(|cid| (cid, offset as u64)));
329 sink.write_all(&zstd_frame).await?;
330 offset += zstd_frame.len()
331 }
332
333 let writer = builder.into_writer();
335 write_skip_frame_header_async(&mut sink, writer.written_len().try_into().unwrap()).await?;
336 writer.write_into(&mut sink).await?;
337
338 let footer = ForestCarFooter {
340 index: offset as u64 + ZSTD_SKIP_FRAME_LEN,
341 };
342 sink.write_all(&footer.to_le_bytes()).await?;
343 Ok(())
344 }
345
346 pub fn compress_stream_default(
348 stream: impl Stream<Item = anyhow::Result<CarBlock>>,
349 ) -> impl Stream<Item = anyhow::Result<ForestCarFrame>> {
350 Self::compress_stream(
351 DEFAULT_FOREST_CAR_FRAME_SIZE,
352 DEFAULT_FOREST_CAR_COMPRESSION_LEVEL,
353 stream,
354 )
355 }
356
357 pub fn compress_stream(
360 zstd_frame_size_tripwire: usize,
361 zstd_compression_level: u16,
362 stream: impl Stream<Item = anyhow::Result<CarBlock>>,
363 ) -> impl Stream<Item = anyhow::Result<ForestCarFrame>> {
364 let mut encoder_store = new_encoder(zstd_compression_level);
365 let mut frame_cids = vec![];
366
367 let mut stream = Box::pin(stream.into_stream());
368 futures::stream::poll_fn(move |cx| {
369 let encoder = match encoder_store.as_mut() {
370 Err(e) => {
371 let dummy_error = io::Error::other("Error already consumed.");
372 return Poll::Ready(Some(Err(anyhow::Error::from(std::mem::replace(
373 e,
374 dummy_error,
375 )))));
376 }
377 Ok(encoder) => encoder,
378 };
379 loop {
380 if compressed_len(encoder) > zstd_frame_size_tripwire {
382 let cids = std::mem::take(&mut frame_cids);
383 let frame = finalize_frame(zstd_compression_level, encoder)?;
384 return Poll::Ready(Some(Ok((cids, frame))));
385 }
386 let ret = futures::ready!(stream.as_mut().poll_next(cx));
388 match ret {
389 None => {
391 if compressed_len(encoder) > 0 {
393 let cids = std::mem::take(&mut frame_cids);
394 let frame = finalize_frame(zstd_compression_level, encoder)?;
395 return Poll::Ready(Some(Ok((cids, frame))));
396 } else {
397 return Poll::Ready(None);
399 }
400 }
401 Some(Err(e)) => {
403 return Poll::Ready(Some(Err(anyhow::anyhow!(
404 "error polling CarBlock from stream: {e}"
405 ))));
406 }
407 Some(Ok(block)) => {
409 frame_cids.push(block.cid);
410 block.write(encoder)?;
411 encoder.flush()?;
412 }
413 }
414 }
415 })
416 }
417}
418
/// Wraps `inner` in an [`io::Error`] of kind [`io::ErrorKind::InvalidData`].
fn invalid_data(inner: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
    let kind = io::ErrorKind::InvalidData;
    io::Error::new(kind, inner)
}
422
423fn compressed_len(encoder: &zstd::Encoder<'static, Writer<BytesMut>>) -> usize {
424 encoder.get_ref().get_ref().len()
425}
426
427pub fn finalize_frame(
428 zstd_compression_level: u16,
429 encoder: &mut zstd::Encoder<'static, Writer<BytesMut>>,
430) -> io::Result<Bytes> {
431 let prev_encoder = std::mem::replace(encoder, new_encoder(zstd_compression_level)?);
432 Ok(prev_encoder.finish()?.into_inner().freeze())
433}
434
435pub fn new_encoder(
436 zstd_compression_level: u16,
437) -> io::Result<zstd::Encoder<'static, Writer<BytesMut>>> {
438 zstd::Encoder::new(BytesMut::new().writer(), i32::from(zstd_compression_level))
439}
440
/// Trailer stored in the last `ForestCarFooter::SIZE` bytes of a Forest CAR
/// archive.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
struct ForestCarFooter {
    /// Offset in the file at which the index data starts; `ForestCar::new`
    /// positions the index reader here.
    index: u64,
}
446
447impl ForestCarFooter {
448 pub const SIZE: usize = 16;
449
450 pub fn to_le_bytes(&self) -> [u8; Self::SIZE] {
451 let mut buffer = [0; 16];
452 buffer[0..4].copy_from_slice(&ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER);
454 buffer[4..8].copy_from_slice(&(std::mem::size_of_val(&self.index) as u32).to_le_bytes());
456 buffer[8..16].copy_from_slice(&self.index.to_le_bytes());
458 buffer
459 }
460
461 pub fn try_from_le_bytes(bytes: [u8; Self::SIZE]) -> Option<ForestCarFooter> {
462 let index = u64::from_le_bytes(bytes[8..16].try_into().expect("infallible"));
463 let footer = ForestCarFooter { index };
464 if bytes == footer.to_le_bytes() {
465 Some(footer)
466 } else {
467 None
468 }
469 }
470}
471
472pub fn new_forest_car_temp_path_in(
473 output_dir: impl AsRef<Path>,
474) -> std::io::Result<tempfile::TempPath> {
475 Ok(tempfile::Builder::new()
476 .suffix(TEMP_FOREST_CAR_FILE_EXTENSION)
477 .tempfile_in(output_dir)?
478 .into_temp_path())
479}
480
#[cfg(test)]
mod tests {
    use super::*;
    use crate::block_on;
    use nunny::vec as nonempty;
    use quickcheck_macros::quickcheck;

    /// Encodes `blocks` into an in-memory Forest CAR archive with the given
    /// frame-size tripwire, compression level and roots.
    fn mk_encoded_car(
        zstd_frame_size_tripwire: usize,
        zstd_compression_level: u16,
        roots: NonEmpty<Cid>,
        blocks: NonEmpty<CarBlock>,
    ) -> Vec<u8> {
        block_on(async {
            let frame_stream = Encoder::compress_stream(
                zstd_frame_size_tripwire,
                zstd_compression_level,
                futures::stream::iter(blocks.into_iter().map(Ok)),
            );
            let mut encoded = vec![];
            Encoder::write(&mut encoded, roots, frame_stream)
                .await
                .unwrap();
            encoded
        })
    }

    /// Round trip: every encoded block is readable through both `get` and
    /// `get_reader`, and the head tipset key equals the roots.
    #[quickcheck]
    fn forest_car_create_basic(blocks: nunny::Vec<CarBlock>) {
        let roots = nonempty!(blocks.first().cid);
        let forest_car =
            ForestCar::new(mk_encoded_car(1024 * 4, 3, roots.clone(), blocks.clone())).unwrap();
        assert_eq!(forest_car.head_tipset_key(), &roots);
        for block in blocks {
            assert_eq!(forest_car.get(&block.cid).unwrap().unwrap(), block.data);
            let mut buf = vec![];
            forest_car
                .get_reader(block.cid)
                .unwrap()
                .unwrap()
                .read_to_end(&mut buf)
                .unwrap();
            assert_eq!(buf, block.data);
        }
    }

    /// Round trip with arbitrary frame sizes and compression levels.
    #[quickcheck]
    fn forest_car_create_options(
        blocks: nunny::Vec<CarBlock>,
        frame_size: usize,
        mut compression_level: u16,
    ) {
        // Together with `.max(1)` below this keeps the level within 1..=14.
        compression_level %= 15;
        let roots = nonempty!(blocks.first().cid);

        let forest_car = ForestCar::new(mk_encoded_car(
            frame_size,
            compression_level.max(1),
            roots.clone(),
            blocks.clone(),
        ))
        .unwrap();
        assert_eq!(forest_car.head_tipset_key(), &roots);
        for block in blocks {
            assert_eq!(forest_car.get(&block.cid).unwrap(), Some(block.data));
        }
    }

    /// Arbitrary bytes must be rejected rather than accepted or panicked on.
    #[quickcheck]
    fn forest_car_open_invalid(junk: Vec<u8>) {
        assert!(ForestCar::new(junk).is_err());
    }

    /// Encoding a footer and decoding it again yields the same footer.
    #[quickcheck]
    fn forest_footer_roundtrip(footer: ForestCarFooter) {
        let footer_recoded = ForestCarFooter::try_from_le_bytes(footer.to_le_bytes());
        assert_eq!(footer_recoded, Some(footer));
    }

    /// Two different CIDs with the same index hash must both stay retrievable.
    #[test]
    fn encode_hash_collisions() {
        use crate::utils::multihash::prelude::*;

        let cid_a = Cid::new_v1(0, MultihashCode::Identity.digest(&[10]));
        let cid_b = Cid::new_v1(0, MultihashCode::Identity.digest(&[0]));
        // The CIDs differ...
        assert_ne!(cid_a, cid_b);
        // ...but collide in the index hash.
        assert_eq!(index::hash::summary(&cid_a), index::hash::summary(&cid_b));

        let blocks = nonempty![
            CarBlock {
                cid: cid_a,
                data: Vec::from_iter(*b"bill and ben"),
            },
            CarBlock {
                cid: cid_b,
                data: Vec::from_iter(*b"the flowerpot men"),
            },
        ];

        let forest_car = ForestCar::new(mk_encoded_car(
            // A tripwire of 0 is exceeded by any non-empty compressed output,
            // so the two blocks should end up in separate frames.
            0,
            3,
            nonempty![blocks.first().cid],
            blocks.clone(),
        ))
        .unwrap();

        assert_eq!(forest_car.get(&cid_a).unwrap().unwrap(), blocks[0].data);
        assert_eq!(forest_car.get(&cid_b).unwrap().unwrap(), blocks[1].data);
    }
}