1use crate::blocks::TipsetKey;
5use crate::chain::FilecoinSnapshotMetadata;
6use crate::db::car::plain::read_v2_header;
7use crate::utils::io::skip_bytes;
8use crate::utils::multihash::prelude::*;
9use async_compression::tokio::bufread::ZstdDecoder;
10use bytes::{Buf, BufMut, Bytes, BytesMut};
11use cid::Cid;
12use futures::ready;
13use futures::{Stream, sink::Sink};
14use fvm_ipld_encoding::to_vec;
15use integer_encoding::{VarInt, VarIntAsyncReader as _};
16use nunny::Vec as NonEmpty;
17use pin_project_lite::pin_project;
18use serde::{Deserialize, Serialize};
19use std::io::{self, Cursor, Read, SeekFrom, Write};
20use std::path::Path;
21use std::pin::Pin;
22use std::task::{Context, Poll};
23use tokio::io::{
24 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
25 Take,
26};
27use tokio_util::codec::{Encoder, FramedRead};
28use tokio_util::either::Either;
29use unsigned_varint::codec::UviBytes;
30
31use crate::utils::encoding::from_slice_with_fallback;
32
/// Upper bound (512 MiB) on the length of a single varint-prefixed CAR frame.
/// Enforced by `read_frame` and by the codec returned from `uvi_bytes`.
const MAX_FRAME_LEN: usize = 512 << 20;
35
/// Header of a CARv1 archive.
///
/// `read_v1_header` rejects any `version` other than 1, and
/// `CarWriter::new_carv1` always writes version 1.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CarV1Header {
    /// Root CIDs of the archive; the `NonEmpty` type guarantees at least one.
    /// `CarStream::head_tipset_key` falls back to these when no snapshot
    /// metadata block was found.
    pub roots: NonEmpty<Cid>,
    /// Format version; expected to be 1.
    pub version: u64,
}
44
/// CARv2 header as decoded by `read_v2_header`.
///
/// Offsets and sizes are stored as `i64`. `CarStream::new_with_header_v2`
/// rejects negative `data_offset` / `data_size` values when converting them to
/// `u64`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CarV2Header {
    /// Characteristics (16 raw bytes); not interpreted in this module.
    pub characteristics: [u8; 16],
    /// Number of bytes `CarStream` skips before the inner CARv1 payload.
    pub data_offset: i64,
    /// Length of the inner CARv1 payload; `CarStream` reads no more than this.
    pub data_size: i64,
    /// Presumably the offset of the CARv2 index (per the field name); not read
    /// in this module.
    pub index_offset: i64,
}
53
/// A single CAR block: a CID and the raw bytes it identifies.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CarBlock {
    /// Content identifier of `data`; checked by [`CarBlock::validate`].
    pub cid: Cid,
    /// Raw block payload.
    pub data: Bytes,
}
59
60impl CarBlock {
61 pub fn write(&self, writer: &mut impl Write) -> io::Result<()> {
63 writer.write_car_block(
64 self.cid,
65 self.data.len() as u64,
66 &mut Cursor::new(&self.data),
67 )
68 }
69
70 pub fn from_bytes(bytes: impl Into<Bytes>) -> io::Result<CarBlock> {
71 let bytes: Bytes = bytes.into();
72 let mut cursor = bytes.reader();
73 let cid = Cid::read_bytes(&mut cursor)
74 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
75 let data = cursor.into_inner();
76 Ok(CarBlock { cid, data })
77 }
78
79 pub fn valid(&self) -> bool {
80 self.validate().is_ok()
81 }
82
83 pub fn validate(&self) -> anyhow::Result<()> {
84 let actual = {
85 let code = MultihashCode::try_from(self.cid.hash().code())?;
86 Cid::new_v1(self.cid.codec(), code.digest(&self.data))
87 };
88 anyhow::ensure!(
89 actual == self.cid,
90 "CID/Block mismatch for block {}, actual: {actual}",
91 self.cid
92 );
93 Ok(())
94 }
95}
96
/// Writing a single CAR block frame to a synchronous writer.
pub trait CarBlockWrite {
    /// Writes the frame for the block identified by `cid`.
    ///
    /// `data_len` is used for the frame's length prefix, so `data` is expected
    /// to yield exactly `data_len` bytes of block payload.
    fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()>;
}
100
101impl<T: Write> CarBlockWrite for T {
102 fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()> {
103 let frame_length = cid.encoded_len() as u64 + data_len;
104 self.write_all(&frame_length.encode_var_vec())?;
105 cid.write_bytes(&mut *self)
106 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
107 std::io::copy(data, self)?;
108 Ok(())
109 }
110}
111
pin_project! {
    /// Stream of [`CarBlock`]s decoded from an optionally zstd-compressed CAR
    /// archive, with an optional CARv2 header.
    ///
    /// A leading [`FilecoinSnapshotMetadata`] block is recognized during
    /// construction and is not yielded. It is exposed through `metadata`, and a
    /// following F3 data frame is skipped. See `CarStream::new_with_header_v2`
    /// for the exact rules.
    pub struct CarStream<ReaderT> {
        // Length-delimited frame decoder over the (possibly decompressed and
        // length-limited) CARv1 payload. It is positioned after the CARv1
        // header, the first block, and any skipped F3 frame.
        #[pin]
        reader: FramedRead<Take<Either<ReaderT, ZstdDecoder<ReaderT>>>,UviBytes>,
        // Decoded CARv1 header.
        pub header_v1: CarV1Header,
        // CARv2 header used to locate the CARv1 payload, if any.
        pub header_v2: Option<CarV2Header>,
        // Snapshot metadata taken from the first block, if it was recognized.
        pub metadata: Option<FilecoinSnapshotMetadata>,
        // Block read eagerly during construction; yielded first by `poll_next`.
        first_block: Option<CarBlock>,
    }
}
126
127fn is_zstd(buf: &[u8]) -> bool {
131 zstd::zstd_safe::get_frame_content_size(buf).is_ok()
132}
133
134impl<ReaderT: AsyncBufRead + Unpin> CarStream<ReaderT> {
135 #[allow(dead_code)]
143 pub async fn new_unsafe(mut reader: ReaderT) -> io::Result<Self> {
144 let header_v2 = Self::try_decode_header_v2_from_fill_buf(reader.fill_buf().await?)
145 .ok()
147 .flatten();
148 Self::new_with_header_v2(reader, header_v2).await
149 }
150
151 pub async fn new_with_header_v2(
153 mut reader: ReaderT,
154 header_v2: Option<CarV2Header>,
155 ) -> io::Result<Self> {
156 let is_compressed = is_zstd(reader.fill_buf().await?);
157 let mut reader = if is_compressed {
158 let mut zstd = ZstdDecoder::new(reader);
159 zstd.multiple_members(true);
160 Either::Right(zstd)
161 } else {
162 Either::Left(reader)
163 };
164
165 if let Some(header_v2) = &header_v2 {
167 reader = skip_bytes(
168 reader,
169 u64::try_from(header_v2.data_offset).map_err(std::io::Error::other)?,
170 )
171 .await?;
172 }
173
174 let max_car_v1_bytes = header_v2
175 .as_ref()
176 .map(|h| u64::try_from(h.data_size).map_err(std::io::Error::other))
177 .transpose()?
178 .unwrap_or(u64::MAX);
179 let mut reader = reader.take(max_car_v1_bytes);
180 let header_v1 = read_v1_header(&mut reader).await?;
181
182 if let Some(block) = read_car_block(&mut reader).await? {
185 if !block.valid() {
186 return Err(io::Error::new(
187 io::ErrorKind::InvalidData,
188 "invalid first block",
189 ));
190 }
191
192 let (first_block, metadata) = if header_v1.roots.len()
193 == crate::db::car::V2_SNAPSHOT_ROOT_COUNT
194 {
195 let maybe_metadata_cid = header_v1.roots.first();
196 if maybe_metadata_cid == &block.cid
197 && let Ok(metadata) =
198 fvm_ipld_encoding::from_slice::<FilecoinSnapshotMetadata>(&block.data)
199 {
200 if metadata.f3_data.is_some() {
202 const MAX_F3_FRAME_LEN: u64 = 16 * 1024 * 1024 * 1024;
204 let len: u64 = reader.read_varint_async().await?;
205 if len > MAX_F3_FRAME_LEN {
206 return Err(io::Error::new(
207 io::ErrorKind::InvalidData,
208 format!(
209 "f3 block frame length too large: {len} > {MAX_F3_FRAME_LEN}"
210 ),
211 ));
212 }
213 reader = skip_bytes(reader, len).await?;
214 }
215
216 (None, Some(metadata))
218 } else {
219 (Some(block), None)
220 }
221 } else {
222 (Some(block), None)
223 };
224
225 Ok(CarStream {
226 reader: FramedRead::new(reader, uvi_bytes()),
227 header_v1,
228 header_v2,
229 metadata,
230 first_block,
231 })
232 } else {
233 Ok(CarStream {
234 reader: FramedRead::new(reader, uvi_bytes()),
235 header_v1,
236 header_v2,
237 metadata: None,
238 first_block: None,
239 })
240 }
241 }
242
243 pub async fn extract_header_v2(
248 mut reader: ReaderT,
249 ) -> io::Result<(ReaderT, Option<CarV2Header>)> {
250 let is_compressed = is_zstd(reader.fill_buf().await?);
251 let mut reader = if is_compressed {
252 let mut zstd = ZstdDecoder::new(reader);
253 zstd.multiple_members(true);
254 Either::Right(zstd)
255 } else {
256 Either::Left(reader)
257 };
258 let mut possible_header_bytes = [0; 51];
259 reader.read_exact(&mut possible_header_bytes).await?;
260 let header_v2 = read_v2_header(possible_header_bytes.as_slice())?;
261 let reader = match reader {
262 Either::Left(reader) => reader,
263 Either::Right(zstd) => zstd.into_inner(),
264 };
265 Ok((reader, header_v2))
266 }
267
268 fn try_decode_header_v2_from_fill_buf(fill_buf: &[u8]) -> io::Result<Option<CarV2Header>> {
269 let is_compressed = is_zstd(fill_buf);
270 let fill_buf_reader = if is_compressed {
271 itertools::Either::Right(zstd::Decoder::new(fill_buf)?)
272 } else {
273 itertools::Either::Left(fill_buf)
274 };
275 read_v2_header(fill_buf_reader)
276 }
277}
278
279impl<ReaderT: AsyncBufRead + AsyncSeek + Unpin> CarStream<ReaderT> {
280 pub async fn new(reader: ReaderT) -> io::Result<Self> {
282 let (reader, header_v2) = Self::extract_header_v2_and_reset_reader_position(reader).await?;
283 Self::new_with_header_v2(reader, header_v2).await
284 }
285
286 pub async fn extract_header_v2_and_reset_reader_position(
288 mut reader: ReaderT,
289 ) -> io::Result<(ReaderT, Option<CarV2Header>)> {
290 let stream_position = reader.stream_position().await?;
291 let (mut reader, header_v2) = Self::extract_header_v2(reader).await?;
292 reader.seek(SeekFrom::Start(stream_position)).await?;
293 Ok((reader, header_v2))
294 }
295}
296
297impl<ReaderT> CarStream<ReaderT> {
298 pub fn head_tipset_key(&self) -> TipsetKey {
299 self.metadata
300 .as_ref()
301 .map(|m| m.head_tipset_key.clone())
302 .unwrap_or_else(|| self.header_v1.roots.clone())
303 .into()
304 }
305}
306
307impl CarStream<tokio::io::BufReader<tokio::fs::File>> {
308 pub async fn new_from_path(path: impl AsRef<Path>) -> io::Result<Self> {
309 Self::new(tokio::io::BufReader::new(
310 tokio::fs::File::open(path.as_ref()).await?,
311 ))
312 .await
313 }
314}
315
316impl<ReaderT: AsyncBufRead> Stream for CarStream<ReaderT> {
317 type Item = io::Result<CarBlock>;
318
319 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
320 let this = self.project();
321 if let Some(block) = this.first_block.take() {
322 return Poll::Ready(Some(Ok(block)));
323 }
324 let item = futures::ready!(this.reader.poll_next(cx));
325 Poll::Ready(item.map(|ret| ret.and_then(CarBlock::from_bytes)))
326 }
327}
328
pin_project! {
    /// [`Sink`] that encodes [`CarBlock`]s as CAR frames into an `AsyncWrite`.
    ///
    /// Encoded bytes are staged in `buffer`. They are drained to `inner` by
    /// `poll_ready`, `poll_flush` and `poll_close`.
    pub struct CarWriter<W> {
        // Destination writer.
        #[pin]
        inner: W,
        // Encoded bytes not yet written to `inner`. It starts out holding the
        // CARv1 header frame when built via `new_carv1`.
        buffer: BytesMut,
    }
}
336
337impl<W: AsyncWrite> CarWriter<W> {
338 pub fn new_carv1(roots: NonEmpty<Cid>, writer: W) -> io::Result<Self> {
339 let car_header = CarV1Header { roots, version: 1 };
340
341 let mut header_uvi_frame = BytesMut::new();
342 uvi_bytes().encode(Bytes::from(to_vec(&car_header)?), &mut header_uvi_frame)?;
343
344 Ok(Self {
345 inner: writer,
346 buffer: header_uvi_frame,
347 })
348 }
349}
350
351impl<W: AsyncWrite> Sink<CarBlock> for CarWriter<W> {
352 type Error = io::Error;
353
354 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
355 let mut this = self.as_mut().project();
356
357 while !this.buffer.is_empty() {
358 this = self.as_mut().project();
359 let bytes_written = ready!(this.inner.poll_write(cx, this.buffer))?;
360 this.buffer.advance(bytes_written);
361 }
362 Poll::Ready(Ok(()))
363 }
364 fn start_send(self: Pin<&mut Self>, item: CarBlock) -> Result<(), Self::Error> {
365 item.write(&mut self.project().buffer.writer())
366 }
367 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
368 ready!(self.as_mut().poll_ready(cx))?;
369 self.project().inner.poll_flush(cx)
370 }
371 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
372 ready!(self.as_mut().poll_ready(cx))?;
373 self.project().inner.poll_shutdown(cx)
374 }
375}
376
377async fn read_v1_header<ReaderT: AsyncRead + Unpin>(
378 reader: &mut ReaderT,
379) -> std::io::Result<CarV1Header> {
380 let Some(frame) = read_frame(reader).await? else {
381 return Err(std::io::Error::new(
382 std::io::ErrorKind::UnexpectedEof,
383 "failed to decode v1 header frame",
384 ));
385 };
386 let header = from_slice_with_fallback::<CarV1Header>(&frame).map_err(std::io::Error::other)?;
387 if header.version != 1 {
388 return Err(std::io::Error::other(format!(
389 "unexpected header version {}, 1 expected",
390 header.version
391 )));
392 }
393 Ok(header)
394}
395
396async fn read_frame<ReaderT: AsyncRead + Unpin>(
397 reader: &mut ReaderT,
398) -> std::io::Result<Option<Vec<u8>>> {
399 let len: usize = match reader.read_varint_async().await {
400 Ok(len) if len > MAX_FRAME_LEN => {
401 return Err(std::io::Error::new(
402 std::io::ErrorKind::InvalidData,
403 format!("frame too large: {len} > {MAX_FRAME_LEN}"),
404 ));
405 }
406 Ok(len) => len,
407 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
408 Err(e) => return Err(e),
409 };
410 let mut bytes = vec![0; len];
411 reader.read_exact(&mut bytes[..]).await?;
412 Ok(Some(bytes))
413}
414
415async fn read_car_block<ReaderT: AsyncRead + Unpin>(
416 reader: &mut ReaderT,
417) -> std::io::Result<Option<CarBlock>> {
418 read_frame(reader)
419 .await?
420 .map(CarBlock::from_bytes)
421 .transpose()
422}
423
424pub fn uvi_bytes() -> UviBytes {
425 let mut decoder = UviBytes::default();
426 decoder.set_max_len(MAX_FRAME_LEN);
427 decoder
428}
429
430#[cfg(test)]
431mod tests;