1use crate::chain::FilecoinSnapshotMetadata;
64use crate::cid_collections::CidHashMap;
65use crate::db::PersistentStore;
66use crate::utils::db::car_stream::{CarV1Header, CarV2Header};
67use crate::{
68 blocks::{Tipset, TipsetKey},
69 utils::encoding::from_slice_with_fallback,
70};
71use cid::Cid;
72use fvm_ipld_blockstore::Blockstore;
73use fvm_ipld_encoding::CborStore as _;
74use integer_encoding::{FixedIntReader, VarIntReader};
75use nunny::Vec as NonEmpty;
76use parking_lot::RwLock;
77use positioned_io::ReadAt;
78use std::{
79 io::{
80 self, BufReader,
81 ErrorKind::{InvalidData, Unsupported},
82 Read, Seek, SeekFrom,
83 },
84 iter,
85 sync::OnceLock,
86};
87use tokio::io::{AsyncWrite, AsyncWriteExt};
88use tracing::{debug, trace};
89
/// A read-only view over an uncompressed CAR file that serves blocks by
/// looking them up in an in-memory index and reading them from `reader` on
/// demand.
pub struct PlainCar<ReaderT> {
    /// Random-access source holding the raw CAR bytes.
    reader: ReaderT,
    /// Maps every indexed CID to the location of its block data in `reader`.
    index: RwLock<CidHashMap<UncompressedBlockDataLocation>>,
    /// CAR format version detected by `new`: `2` when a CARv2 header was
    /// found, `1` otherwise.
    version: u64,
    /// The CARv1 header (for CARv2 files, the header of the inner payload).
    header_v1: CarV1Header,
    /// The CARv2 header, when the file starts with the CARv2 pragma.
    header_v2: Option<CarV2Header>,
    /// Lazily resolved snapshot metadata; filled in by the first call to
    /// `metadata()`.
    metadata: OnceLock<Option<FilecoinSnapshotMetadata>>,
}
119
120impl<ReaderT: super::RandomAccessFileReader> PlainCar<ReaderT> {
121 #[tracing::instrument(level = "debug", skip_all)]
126 pub fn new(reader: ReaderT) -> io::Result<Self> {
127 let mut cursor = positioned_io::Cursor::new(&reader);
128 let position = cursor.position();
129 let header_v2 = read_v2_header(&mut cursor)?;
130 let (limit_position, version) =
131 if let Some(header_v2) = &header_v2 {
132 cursor.set_position(position.saturating_add(
133 u64::try_from(header_v2.data_offset).map_err(io::Error::other)?,
134 ));
135 (
136 Some(cursor.stream_position()?.saturating_add(
137 u64::try_from(header_v2.data_size).map_err(io::Error::other)?,
138 )),
139 2,
140 )
141 } else {
142 cursor.set_position(position);
143 (None, 1)
144 };
145
146 let header_v1 = read_v1_header(&mut cursor)?;
147 let mut buf_reader = BufReader::with_capacity(1024, cursor);
150
151 let index = iter::from_fn(|| {
153 read_block_data_location_and_skip(&mut buf_reader, limit_position).transpose()
154 })
155 .collect::<Result<CidHashMap<_>, _>>()?;
156
157 match index.len() {
158 0 => Err(io::Error::new(
159 InvalidData,
160 "CARv1 files must contain at least one block",
161 )),
162 num_blocks => {
163 debug!(num_blocks, "indexed CAR");
164 Ok(Self {
165 reader,
166 index: RwLock::new(index),
167 version,
168 header_v1,
169 header_v2,
170 metadata: OnceLock::new(),
171 })
172 }
173 }
174 }
175
176 pub fn metadata(&self) -> &Option<FilecoinSnapshotMetadata> {
177 self.metadata.get_or_init(|| {
178 if self.header_v1.roots.len() == super::V2_SNAPSHOT_ROOT_COUNT {
179 let maybe_metadata_cid = self.header_v1.roots.first();
180 if let Ok(Some(metadata)) =
181 self.get_cbor::<FilecoinSnapshotMetadata>(maybe_metadata_cid)
182 {
183 return Some(metadata);
184 }
185 }
186 None
187 })
188 }
189
190 pub fn head_tipset_key(&self) -> &NonEmpty<Cid> {
191 if let Some(metadata) = self.metadata() {
194 &metadata.head_tipset_key
195 } else {
196 &self.header_v1.roots
197 }
198 }
199
200 pub fn version(&self) -> u64 {
201 self.version
202 }
203
204 pub fn heaviest_tipset_key(&self) -> TipsetKey {
205 TipsetKey::from(self.head_tipset_key().clone())
206 }
207
208 pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
209 Tipset::load_required(self, &self.heaviest_tipset_key())
210 }
211
212 #[cfg(test)]
214 pub fn cids(&self) -> Vec<Cid> {
215 self.index.read().keys().collect()
216 }
217
218 pub fn into_dyn(self) -> PlainCar<Box<dyn super::RandomAccessFileReader>> {
219 PlainCar {
220 reader: Box::new(self.reader),
221 index: self.index,
222 version: self.version,
223 header_v1: self.header_v1,
224 header_v2: self.header_v2,
225 metadata: self.metadata,
226 }
227 }
228
229 pub fn get_reader(&self, k: Cid) -> Option<impl Read> {
231 self.index
232 .read()
233 .get(&k)
234 .map(|UncompressedBlockDataLocation { offset, length }| {
235 positioned_io::Cursor::new_pos(&self.reader, *offset).take(*length as u64)
236 })
237 }
238}
239
240impl TryFrom<&'static [u8]> for PlainCar<&'static [u8]> {
241 type Error = io::Error;
242 fn try_from(bytes: &'static [u8]) -> io::Result<Self> {
243 PlainCar::new(bytes)
244 }
245}
246
/// Where the data of a single block lives inside an uncompressed CAR file.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct UncompressedBlockDataLocation {
    /// Absolute position of the first byte of block data (just past the CID).
    offset: u64,
    /// Number of block data bytes starting at `offset`.
    length: u32,
}
254
255impl<ReaderT> Blockstore for PlainCar<ReaderT>
256where
257 ReaderT: ReadAt,
258{
259 #[tracing::instrument(level = "trace", skip(self))]
260 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
261 match self.index.read().get(k) {
262 Some(UncompressedBlockDataLocation { offset, length }) => {
263 trace!("fetching from disk");
264 let mut data = vec![0; usize::try_from(*length).unwrap()];
265 self.reader.read_exact_at(*offset, &mut data)?;
266 Ok(Some(data))
267 }
268 None => {
269 trace!("not found");
270 Ok(None)
271 }
272 }
273 }
274
275 fn put_keyed(&self, _: &Cid, _: &[u8]) -> anyhow::Result<()> {
277 unreachable!("PlainCar is read-only, use ManyCar instead");
278 }
279}
280
281impl<ReaderT> PersistentStore for PlainCar<ReaderT>
282where
283 ReaderT: ReadAt,
284{
285 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
286 self.put_keyed(k, block)
287 }
288}
289
290pub async fn write_skip_frame_header_async(
291 mut writer: impl AsyncWrite + Unpin,
292 data_len: u32,
293) -> std::io::Result<()> {
294 writer
295 .write_all(&super::forest::ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER)
296 .await?;
297 writer.write_all(&data_len.to_le_bytes()).await?;
298 Ok(())
299}
300
301fn cid_error_to_io_error(cid_error: cid::Error) -> io::Error {
302 match cid_error {
303 cid::Error::Io(io_error) => io_error,
304 other => io::Error::new(InvalidData, other),
305 }
306}
307
308pub fn read_v2_header(mut reader: impl Read) -> io::Result<Option<CarV2Header>> {
316 const CAR_V2_PRAGMA: [u8; 10] = [0xa1, 0x67, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x02];
318
319 let len = reader.read_fixedint::<u8>()? as usize;
320 if len == CAR_V2_PRAGMA.len() {
321 let mut buffer = vec![0; len];
322 reader.read_exact(&mut buffer)?;
323 if buffer[..] == CAR_V2_PRAGMA {
324 let mut characteristics = [0; 16];
325 reader.read_exact(&mut characteristics)?;
326 let data_offset: i64 = reader.read_fixedint()?;
327 let data_size: i64 = reader.read_fixedint()?;
328 let index_offset: i64 = reader.read_fixedint()?;
329 return Ok(Some(CarV2Header {
330 characteristics,
331 data_offset,
332 data_size,
333 index_offset,
334 }));
335 }
336 }
337 Ok(None)
338}
339
340#[tracing::instrument(level = "trace", skip_all, ret)]
347fn read_v1_header(mut reader: impl Read) -> io::Result<CarV1Header> {
348 let header_len = reader.read_varint()?;
349 let mut buffer = vec![0; header_len];
350 reader.read_exact(&mut buffer)?;
351 let header: CarV1Header =
352 from_slice_with_fallback(&buffer).map_err(|e| io::Error::new(InvalidData, e))?;
353 if header.version == 1 {
354 Ok(header)
355 } else {
356 Err(io::Error::new(
357 Unsupported,
358 format!("unsupported CAR version {}", header.version),
359 ))
360 }
361}
362
363#[tracing::instrument(level = "trace", skip_all, ret)]
379fn read_block_data_location_and_skip(
380 mut reader: impl Read + Seek,
381 limit_position: Option<u64>,
382) -> io::Result<Option<(Cid, UncompressedBlockDataLocation)>> {
383 if let Some(limit_position) = limit_position
384 && reader.stream_position()? >= limit_position
385 {
386 return Ok(None);
387 }
388 let Some(body_length) = read_varint_body_length_or_eof(&mut reader)? else {
389 return Ok(None);
390 };
391 let frame_body_offset = reader.stream_position()?;
392 let mut reader = CountRead::new(&mut reader);
393 let cid = Cid::read_bytes(&mut reader).map_err(cid_error_to_io_error)?;
394
395 let cid_length = reader.bytes_read();
397 let block_data_offset = frame_body_offset + u64::try_from(cid_length).unwrap();
398 let next_frame_offset = frame_body_offset + u64::from(body_length);
399 let block_data_length = u32::try_from(next_frame_offset - block_data_offset).unwrap();
400 reader
401 .into_inner()
402 .seek(SeekFrom::Start(next_frame_offset))?;
403 Ok(Some((
404 cid,
405 UncompressedBlockDataLocation {
406 offset: block_data_offset,
407 length: block_data_length,
408 },
409 )))
410}
411
412fn read_varint_body_length_or_eof(mut reader: impl Read) -> io::Result<Option<u32>> {
423 let mut byte = [0u8; 1]; match reader.read(&mut byte)? {
425 0 => Ok(None),
426 1 => (byte.chain(reader)).read_varint().map(Some),
427 _ => unreachable!(),
428 }
429}
430
/// A [`Read`] adaptor that keeps a running total of the bytes it has handed
/// out.
struct CountRead<R> {
    /// The wrapped reader.
    inner: R,
    /// Total number of bytes returned by `read` so far.
    count: usize,
}

impl<R> CountRead<R> {
    /// Wraps `inner` with the byte counter starting at zero.
    pub fn new(inner: R) -> Self {
        CountRead { count: 0, inner }
    }

    /// Returns how many bytes have been read through this adaptor.
    pub fn bytes_read(&self) -> usize {
        self.count
    }

    /// Discards the counter and gives back the wrapped reader.
    pub fn into_inner(self) -> R {
        let CountRead { inner, .. } = self;
        inner
    }
}

impl<R: Read> Read for CountRead<R> {
    /// Delegates to the wrapped reader and, on success, adds the number of
    /// bytes obtained to the running total.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf).map(|bytes| {
            self.count += bytes;
            bytes
        })
    }
}
461
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::db::{
        car_stream::{CarStream, CarV1Header},
        car_util::load_car,
    };
    use futures::{TryStreamExt as _, executor::block_on};
    use fvm_ipld_blockstore::{Blockstore, MemoryBlockstore};
    use std::io::Cursor;
    use std::sync::LazyLock;
    use tokio::io::{AsyncBufRead, AsyncSeek, BufReader};

    /// A CARv1 file indexed by `PlainCar` must return the same bytes as the
    /// reference loaders, through both `get` and `get_reader`.
    #[test]
    fn test_uncompressed_v1() {
        let raw_car = chain4_car();
        let plain = PlainCar::new(raw_car).unwrap();

        assert_eq!(plain.version(), 1);
        assert_eq!(plain.head_tipset_key().len(), 1);
        assert_eq!(plain.cids().len(), 1222);

        let from_car = reference(Cursor::new(raw_car));
        let from_zst = reference(Cursor::new(chain4_car_zst()));
        let from_zst_unsafe = reference_unsafe(chain4_car_zst());
        for cid in plain.cids() {
            let via_car = from_car.get(&cid).unwrap().unwrap();
            let via_zst = from_zst.get(&cid).unwrap().unwrap();
            let via_zst_unsafe = from_zst_unsafe.get(&cid).unwrap().unwrap();
            let mut via_reader = vec![];
            plain
                .get_reader(cid)
                .unwrap()
                .read_to_end(&mut via_reader)
                .unwrap();
            let actual = plain.get(&cid).unwrap().unwrap();
            assert_eq!(via_car, actual);
            assert_eq!(via_zst, actual);
            assert_eq!(via_zst_unsafe, actual);
            assert_eq!(via_reader, actual);
        }
    }

    /// A CARv2 file indexed by `PlainCar` must return the same bytes as the
    /// reference loaders.
    #[test]
    fn test_uncompressed_v2() {
        let raw_car = carv2_car();
        let plain = PlainCar::new(raw_car).unwrap();

        assert_eq!(plain.version(), 2);
        assert_eq!(plain.head_tipset_key().len(), 1);
        assert_eq!(plain.cids().len(), 7153);

        let from_car = reference(Cursor::new(raw_car));
        let from_zst = reference(Cursor::new(carv2_car_zst()));
        let from_zst_unsafe = reference_unsafe(carv2_car_zst());
        for cid in plain.cids() {
            let via_car = from_car.get(&cid).unwrap().unwrap();
            let via_zst = from_zst.get(&cid).unwrap().unwrap();
            let via_zst_unsafe = from_zst_unsafe.get(&cid).unwrap().unwrap();
            let actual = plain.get(&cid).unwrap().unwrap();
            assert_eq!(via_car, actual);
            assert_eq!(via_zst, actual);
            assert_eq!(via_zst_unsafe, actual);
        }
    }

    /// Loads every block of `reader` into a fresh in-memory blockstore using
    /// `load_car`.
    fn reference(reader: impl AsyncBufRead + AsyncSeek + Unpin) -> MemoryBlockstore {
        let store = MemoryBlockstore::new();
        block_on(load_car(&store, reader)).unwrap();
        store
    }

    /// Same as `reference`, but goes through `load_car_unsafe`.
    fn reference_unsafe(reader: impl AsyncBufRead + Unpin) -> MemoryBlockstore {
        let store = MemoryBlockstore::new();
        block_on(load_car_unsafe(&store, reader)).unwrap();
        store
    }

    /// Streams every block from a `CarStream::new_unsafe` stream into `db`
    /// and returns the stream's CARv1 header.
    pub async fn load_car_unsafe<R>(db: &impl Blockstore, reader: R) -> anyhow::Result<CarV1Header>
    where
        R: AsyncBufRead + Unpin,
    {
        let mut car_stream = CarStream::new_unsafe(BufReader::new(reader)).await?;
        while let Some(car_block) = car_stream.try_next().await? {
            db.put_keyed(&car_block.cid, &car_block.data)?;
        }
        Ok(car_stream.header_v1)
    }

    /// The zstd-compressed `chain4` test snapshot.
    fn chain4_car_zst() -> &'static [u8] {
        include_bytes!("../../../test-snapshots/chain4.car.zst")
    }

    /// The `chain4` test snapshot, decompressed once and cached.
    fn chain4_car() -> &'static [u8] {
        static DECOMPRESSED: LazyLock<Vec<u8>> =
            LazyLock::new(|| zstd::decode_all(chain4_car_zst()).unwrap());
        DECOMPRESSED.as_slice()
    }

    /// The zstd-compressed `carv2` test snapshot.
    fn carv2_car_zst() -> &'static [u8] {
        include_bytes!("../../../test-snapshots/carv2.car.zst")
    }

    /// The `carv2` test snapshot, decompressed once and cached.
    fn carv2_car() -> &'static [u8] {
        static DECOMPRESSED: LazyLock<Vec<u8>> =
            LazyLock::new(|| zstd::decode_all(carv2_car_zst()).unwrap());
        DECOMPRESSED.as_slice()
    }
}