1use crate::chain::FilecoinSnapshotMetadata;
64use crate::cid_collections::CidHashMap;
65use crate::db::PersistentStore;
66use crate::utils::db::car_stream::{CarV1Header, CarV2Header};
67use crate::{
68 blocks::{Tipset, TipsetKey},
69 utils::encoding::from_slice_with_fallback,
70};
71use cid::Cid;
72use fvm_ipld_blockstore::Blockstore;
73use fvm_ipld_encoding::CborStore as _;
74use integer_encoding::{FixedIntReader, VarIntReader};
75use nunny::Vec as NonEmpty;
76use parking_lot::RwLock;
77use positioned_io::ReadAt;
78use std::{
79 io::{
80 self, BufReader,
81 ErrorKind::{InvalidData, Unsupported},
82 Read, Seek, SeekFrom,
83 },
84 iter,
85 sync::OnceLock,
86};
87use tokio::io::{AsyncWrite, AsyncWriteExt};
88use tracing::{debug, trace};
89
/// A read-only [`Blockstore`] over an uncompressed CARv1 or CARv2 archive.
///
/// Construction scans every block frame once and records where each block's
/// data lives. The data itself stays in `reader` and is read on demand.
pub struct PlainCar<ReaderT> {
    /// Random-access source holding the raw archive bytes.
    reader: ReaderT,
    /// Location of each block's data inside `reader`, keyed by the block's CID.
    index: RwLock<CidHashMap<UncompressedBlockDataLocation>>,
    /// CAR format version detected at construction (1 or 2).
    version: u64,
    /// The CARv1 header (for CARv2 files, the header of the embedded CARv1 payload).
    header_v1: CarV1Header,
    /// The CARv2 header; `None` for plain CARv1 archives.
    header_v2: Option<CarV2Header>,
    /// Lazily computed snapshot metadata. An initialized `None` means none was found.
    metadata: OnceLock<Option<FilecoinSnapshotMetadata>>,
}
119
120impl<ReaderT: super::RandomAccessFileReader> PlainCar<ReaderT> {
121 #[tracing::instrument(level = "debug", skip_all)]
126 pub fn new(reader: ReaderT) -> io::Result<Self> {
127 let mut cursor = positioned_io::Cursor::new(&reader);
128 let position = cursor.position();
129 let header_v2 = read_v2_header(&mut cursor)?;
130 let (limit_position, version) =
131 if let Some(header_v2) = &header_v2 {
132 cursor.set_position(position.saturating_add(
133 u64::try_from(header_v2.data_offset).map_err(io::Error::other)?,
134 ));
135 (
136 Some(cursor.stream_position()?.saturating_add(
137 u64::try_from(header_v2.data_size).map_err(io::Error::other)?,
138 )),
139 2,
140 )
141 } else {
142 cursor.set_position(position);
143 (None, 1)
144 };
145
146 let header_v1 = read_v1_header(&mut cursor)?;
147 let mut buf_reader = BufReader::with_capacity(1024, cursor);
150
151 let index = iter::from_fn(|| {
153 read_block_data_location_and_skip(&mut buf_reader, limit_position).transpose()
154 })
155 .collect::<Result<CidHashMap<_>, _>>()?;
156
157 match index.len() {
158 0 => Err(io::Error::new(
159 InvalidData,
160 "CARv1 files must contain at least one block",
161 )),
162 num_blocks => {
163 debug!(num_blocks, "indexed CAR");
164 Ok(Self {
165 reader,
166 index: RwLock::new(index),
167 version,
168 header_v1,
169 header_v2,
170 metadata: OnceLock::new(),
171 })
172 }
173 }
174 }
175
176 pub fn metadata(&self) -> Option<&FilecoinSnapshotMetadata> {
177 self.metadata
178 .get_or_init(|| {
179 if self.header_v1.roots.len() == super::V2_SNAPSHOT_ROOT_COUNT {
180 let maybe_metadata_cid = self.header_v1.roots.first();
181 if let Ok(Some(metadata)) =
182 self.get_cbor::<FilecoinSnapshotMetadata>(maybe_metadata_cid)
183 {
184 return Some(metadata);
185 }
186 }
187 None
188 })
189 .as_ref()
190 }
191
192 pub fn head_tipset_key(&self) -> &NonEmpty<Cid> {
193 if let Some(metadata) = self.metadata() {
196 &metadata.head_tipset_key
197 } else {
198 &self.header_v1.roots
199 }
200 }
201
202 pub fn version(&self) -> u64 {
203 self.version
204 }
205
206 pub fn heaviest_tipset_key(&self) -> TipsetKey {
207 TipsetKey::from(self.head_tipset_key().clone())
208 }
209
210 pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
211 Tipset::load_required(self, &self.heaviest_tipset_key())
212 }
213
214 #[cfg(test)]
216 pub fn cids(&self) -> Vec<Cid> {
217 self.index.read().keys().collect()
218 }
219
220 pub fn into_dyn(self) -> PlainCar<Box<dyn super::RandomAccessFileReader>> {
221 PlainCar {
222 reader: Box::new(self.reader),
223 index: self.index,
224 version: self.version,
225 header_v1: self.header_v1,
226 header_v2: self.header_v2,
227 metadata: self.metadata,
228 }
229 }
230
231 pub fn get_reader(&self, k: Cid) -> Option<impl Read> {
233 self.index
234 .read()
235 .get(&k)
236 .map(|UncompressedBlockDataLocation { offset, length }| {
237 positioned_io::Cursor::new_pos(&self.reader, *offset).take(u64::from(*length))
238 })
239 }
240}
241
242impl TryFrom<&'static [u8]> for PlainCar<&'static [u8]> {
243 type Error = io::Error;
244 fn try_from(bytes: &'static [u8]) -> io::Result<Self> {
245 PlainCar::new(bytes)
246 }
247}
248
/// Where one block's payload lives inside an uncompressed CAR archive.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct UncompressedBlockDataLocation {
    /// Byte offset of the block data (just past its CID), measured from the
    /// start of the underlying reader.
    offset: u64,
    /// Length of the block data in bytes; the CID is not included.
    length: u32,
}
256
257impl<ReaderT> Blockstore for PlainCar<ReaderT>
258where
259 ReaderT: ReadAt,
260{
261 #[tracing::instrument(level = "trace", skip(self))]
262 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
263 match self.index.read().get(k) {
264 Some(UncompressedBlockDataLocation { offset, length }) => {
265 trace!("fetching from disk");
266 let mut data = vec![0; usize::try_from(*length).unwrap()];
267 self.reader.read_exact_at(*offset, &mut data)?;
268 Ok(Some(data))
269 }
270 None => {
271 trace!("not found");
272 Ok(None)
273 }
274 }
275 }
276
277 fn put_keyed(&self, _: &Cid, _: &[u8]) -> anyhow::Result<()> {
279 unreachable!("PlainCar is read-only, use ManyCar instead");
280 }
281}
282
283impl<ReaderT> PersistentStore for PlainCar<ReaderT>
284where
285 ReaderT: ReadAt,
286{
287 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
288 self.put_keyed(k, block)
289 }
290}
291
292pub async fn write_skip_frame_header_async(
293 mut writer: impl AsyncWrite + Unpin,
294 data_len: u32,
295) -> std::io::Result<()> {
296 writer
297 .write_all(&super::forest::ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER)
298 .await?;
299 writer.write_all(&data_len.to_le_bytes()).await?;
300 Ok(())
301}
302
303fn cid_error_to_io_error(cid_error: cid::Error) -> io::Error {
304 match cid_error {
305 cid::Error::Io(io_error) => io_error,
306 other => io::Error::new(InvalidData, other),
307 }
308}
309
310pub fn read_v2_header(mut reader: impl Read) -> io::Result<Option<CarV2Header>> {
318 const CAR_V2_PRAGMA: [u8; 10] = [0xa1, 0x67, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x02];
320
321 let len = reader.read_fixedint::<u8>()? as usize;
322 if len == CAR_V2_PRAGMA.len() {
323 let mut buffer = vec![0; len];
324 reader.read_exact(&mut buffer)?;
325 if buffer[..] == CAR_V2_PRAGMA {
326 let mut characteristics = [0; 16];
327 reader.read_exact(&mut characteristics)?;
328 let data_offset: i64 = reader.read_fixedint()?;
329 let data_size: i64 = reader.read_fixedint()?;
330 let index_offset: i64 = reader.read_fixedint()?;
331 return Ok(Some(CarV2Header {
332 characteristics,
333 data_offset,
334 data_size,
335 index_offset,
336 }));
337 }
338 }
339 Ok(None)
340}
341
342#[tracing::instrument(level = "trace", skip_all, ret)]
349fn read_v1_header(mut reader: impl Read) -> io::Result<CarV1Header> {
350 let header_len = reader.read_varint()?;
351 let mut buffer = vec![0; header_len];
352 reader.read_exact(&mut buffer)?;
353 let header: CarV1Header =
354 from_slice_with_fallback(&buffer).map_err(|e| io::Error::new(InvalidData, e))?;
355 if header.version == 1 {
356 Ok(header)
357 } else {
358 Err(io::Error::new(
359 Unsupported,
360 format!("unsupported CAR version {}", header.version),
361 ))
362 }
363}
364
365#[tracing::instrument(level = "trace", skip_all, ret)]
381fn read_block_data_location_and_skip(
382 mut reader: impl Read + Seek,
383 limit_position: Option<u64>,
384) -> io::Result<Option<(Cid, UncompressedBlockDataLocation)>> {
385 if let Some(limit_position) = limit_position
386 && reader.stream_position()? >= limit_position
387 {
388 return Ok(None);
389 }
390 let Some(body_length) = read_varint_body_length_or_eof(&mut reader)? else {
391 return Ok(None);
392 };
393 let frame_body_offset = reader.stream_position()?;
394 let mut reader = CountRead::new(&mut reader);
395 let cid = Cid::read_bytes(&mut reader).map_err(cid_error_to_io_error)?;
396
397 let cid_length = reader.bytes_read();
399 let block_data_offset = frame_body_offset + u64::try_from(cid_length).unwrap();
400 let next_frame_offset = frame_body_offset + u64::from(body_length);
401 let block_data_length = u32::try_from(next_frame_offset - block_data_offset).unwrap();
402 reader
403 .into_inner()
404 .seek(SeekFrom::Start(next_frame_offset))?;
405 Ok(Some((
406 cid,
407 UncompressedBlockDataLocation {
408 offset: block_data_offset,
409 length: block_data_length,
410 },
411 )))
412}
413
414fn read_varint_body_length_or_eof(mut reader: impl Read) -> io::Result<Option<u32>> {
425 let mut byte = [0u8; 1]; match reader.read(&mut byte)? {
427 0 => Ok(None),
428 1 => (byte.chain(reader)).read_varint().map(Some),
429 _ => unreachable!(),
430 }
431}
432
/// A [`Read`] adapter that counts the bytes read through it.
struct CountRead<ReadT> {
    /// The wrapped reader.
    inner: ReadT,
    /// Total number of bytes returned by successful `read` calls so far.
    count: usize,
}
440
441impl<ReadT> CountRead<ReadT> {
442 pub fn new(inner: ReadT) -> Self {
443 Self { inner, count: 0 }
444 }
445 pub fn bytes_read(&self) -> usize {
446 self.count
447 }
448 pub fn into_inner(self) -> ReadT {
449 self.inner
450 }
451}
452
453impl<ReadT> Read for CountRead<ReadT>
454where
455 ReadT: Read,
456{
457 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
458 let n = self.inner.read(buf)?;
459 self.count += n;
460 Ok(n)
461 }
462}
463
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::db::{
        car_stream::{CarStream, CarV1Header},
        car_util::load_car,
    };
    use futures::TryStreamExt as _;
    use fvm_ipld_blockstore::{Blockstore, MemoryBlockstore};
    use std::io::Cursor;
    use std::sync::LazyLock;
    use tokio::io::{AsyncBufRead, AsyncSeek, BufReader};
    use tokio_test::block_on;

    #[test]
    fn test_uncompressed_v1() {
        check_against_references(chain4_car(), chain4_car_zst(), 1, 1222);
    }

    #[test]
    fn test_uncompressed_v2() {
        check_against_references(carv2_car(), carv2_car_zst(), 2, 7153);
    }

    /// Indexes `car` with [`PlainCar`] and checks its header-level accessors.
    /// It then checks that every block served through both `Blockstore::get`
    /// and `PlainCar::get_reader` matches the blocks loaded by the streaming
    /// reference readers. The references are built from the uncompressed copy
    /// and from the zstd-compressed copy (`car_zst`) of the same snapshot.
    fn check_against_references(
        car: &'static [u8],
        car_zst: &'static [u8],
        expected_version: u64,
        expected_num_blocks: usize,
    ) {
        let car_backed = PlainCar::new(car).unwrap();

        assert_eq!(car_backed.version(), expected_version);
        assert_eq!(car_backed.head_tipset_key().len(), 1);
        assert_eq!(car_backed.cids().len(), expected_num_blocks);

        let references = [
            reference(Cursor::new(car)),
            reference(Cursor::new(car_zst)),
            reference_unsafe(car_zst),
        ];
        for cid in car_backed.cids() {
            let actual = car_backed.get(&cid).unwrap().unwrap();
            for reference_car in &references {
                assert_eq!(reference_car.get(&cid).unwrap().unwrap(), actual);
            }
            let mut streamed = vec![];
            car_backed
                .get_reader(cid)
                .unwrap()
                .read_to_end(&mut streamed)
                .unwrap();
            assert_eq!(streamed, actual);
        }
    }

    fn reference(reader: impl AsyncBufRead + AsyncSeek + Unpin) -> MemoryBlockstore {
        let blockstore = MemoryBlockstore::new();
        block_on(load_car(&blockstore, reader)).unwrap();
        blockstore
    }

    fn reference_unsafe(reader: impl AsyncBufRead + Unpin) -> MemoryBlockstore {
        let blockstore = MemoryBlockstore::new();
        block_on(load_car_unsafe(&blockstore, reader)).unwrap();
        blockstore
    }

    pub async fn load_car_unsafe<R>(db: &impl Blockstore, reader: R) -> anyhow::Result<CarV1Header>
    where
        R: AsyncBufRead + Unpin,
    {
        let mut stream = CarStream::new_unsafe(BufReader::new(reader)).await?;
        while let Some(block) = stream.try_next().await? {
            db.put_keyed(&block.cid, &block.data)?;
        }
        Ok(stream.header_v1)
    }

    fn chain4_car_zst() -> &'static [u8] {
        include_bytes!("../../../test-snapshots/chain4.car.zst")
    }

    fn chain4_car() -> &'static [u8] {
        static CAR: LazyLock<Vec<u8>> =
            LazyLock::new(|| zstd::decode_all(chain4_car_zst()).unwrap());
        CAR.as_slice()
    }

    fn carv2_car_zst() -> &'static [u8] {
        include_bytes!("../../../test-snapshots/carv2.car.zst")
    }

    fn carv2_car() -> &'static [u8] {
        static CAR: LazyLock<Vec<u8>> =
            LazyLock::new(|| zstd::decode_all(carv2_car_zst()).unwrap());
        CAR.as_slice()
    }
}