1use crate::blocks::TipsetKey;
5use crate::chain::FilecoinSnapshotMetadata;
6use crate::db::car::plain::read_v2_header;
7use crate::utils::io::skip_bytes;
8use crate::utils::multihash::prelude::*;
9use async_compression::tokio::bufread::ZstdDecoder;
10use bytes::{Buf, BufMut, Bytes, BytesMut};
11use cid::Cid;
12use futures::ready;
13use futures::{Stream, sink::Sink};
14use fvm_ipld_encoding::to_vec;
15use integer_encoding::{VarInt, VarIntAsyncReader as _};
16use nunny::Vec as NonEmpty;
17use pin_project_lite::pin_project;
18use serde::{Deserialize, Serialize};
19use std::io::{self, Cursor, Read, SeekFrom, Write};
20use std::path::Path;
21use std::pin::Pin;
22use std::task::{Context, Poll};
23use tokio::io::{
24 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
25 Take,
26};
27use tokio_util::codec::{Encoder, FramedRead};
28use tokio_util::either::Either;
29use unsigned_varint::codec::UviBytes;
30
31use crate::utils::encoding::from_slice_with_fallback;
32
/// Largest length-prefixed frame (512 MiB) accepted from a CAR stream: checked by
/// `read_frame` for the header and first block, and configured as the maximum length of the
/// [`uvi_bytes`] codec used for all later frames.
const MAX_FRAME_LEN: usize = 512 * 1024 * 1024;
35
/// Header of a CARv1 payload, stored as its first length-prefixed frame.
///
/// `read_v1_header` rejects any `version` other than `1`; `roots` can never be empty.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CarV1Header {
    /// Root CIDs declared by the archive.
    pub roots: NonEmpty<Cid>,
    /// CAR format version (`1` for CARv1).
    pub version: u64,
}
44
/// Fixed-size fields of a CARv2 header, as returned by `read_v2_header`.
///
/// The offsets/sizes are signed; `CarStream` rejects negative values when converting them
/// to `u64`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CarV2Header {
    /// Raw 16-byte characteristics field.
    pub characteristics: [u8; 16],
    /// Number of bytes `CarStream` skips from the start of the (decompressed) stream to
    /// reach the embedded CARv1 payload.
    pub data_offset: i64,
    /// Maximum number of bytes `CarStream` reads from the embedded CARv1 payload.
    pub data_size: i64,
    /// Offset of the index section; not read by `CarStream`.
    pub index_offset: i64,
}
53
/// A single CAR block: its CID and raw payload bytes.
///
/// Construction does not check `data` against `cid`; call `CarBlock::validate` for that.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CarBlock {
    /// Content identifier the block is stored under.
    pub cid: Cid,
    /// Raw block payload.
    pub data: Vec<u8>,
}
59
60impl CarBlock {
61 pub fn write(&self, writer: &mut impl Write) -> io::Result<()> {
63 writer.write_car_block(
64 self.cid,
65 self.data.len() as u64,
66 &mut Cursor::new(&self.data),
67 )
68 }
69
70 pub fn from_bytes(bytes: impl Into<Bytes>) -> io::Result<CarBlock> {
71 let bytes: Bytes = bytes.into();
72 let mut cursor = bytes.reader();
73 let cid = Cid::read_bytes(&mut cursor)
74 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
75 let bytes = cursor.into_inner();
76 Ok(CarBlock {
77 cid,
78 data: bytes.to_vec(),
79 })
80 }
81
82 pub fn valid(&self) -> bool {
83 self.validate().is_ok()
84 }
85
86 pub fn validate(&self) -> anyhow::Result<()> {
87 let actual = {
88 let code = MultihashCode::try_from(self.cid.hash().code())?;
89 Cid::new_v1(self.cid.codec(), code.digest(&self.data))
90 };
91 anyhow::ensure!(
92 actual == self.cid,
93 "CID/Block mismatch for block {}, actual: {actual}",
94 self.cid
95 );
96 Ok(())
97 }
98}
99
/// Writes CAR block frames (varint length prefix, binary CID, payload) to a sink.
pub trait CarBlockWrite {
    /// Writes a block with CID `cid` whose `data_len`-byte payload is read from `data`.
    fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()>;
}
103
104impl<T: Write> CarBlockWrite for T {
105 fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()> {
106 let frame_length = cid.encoded_len() as u64 + data_len;
107 self.write_all(&frame_length.encode_var_vec())?;
108 cid.write_bytes(&mut *self)
109 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
110 std::io::copy(data, self)?;
111 Ok(())
112 }
113}
114
pin_project! {
    // Asynchronous stream of `CarBlock`s read from a CARv1 or CARv2 archive. The archive may
    // be zstd-compressed.
    pub struct CarStream<ReaderT> {
        // Varint-length-delimited frames of the CARv1 payload. Reads are capped at the CARv2
        // `data_size` when a CARv2 header is present.
        #[pin]
        reader: FramedRead<Take<Either<ReaderT, ZstdDecoder<ReaderT>>>,UviBytes>,
        // Decoded CARv1 header.
        pub header_v1: CarV1Header,
        // CARv2 header, if the archive has one.
        pub header_v2: Option<CarV2Header>,
        // Snapshot metadata, when the first block was recognized as such. That block is then
        // not yielded by the stream.
        pub metadata: Option<FilecoinSnapshotMetadata>,
        // Block read eagerly during construction; yielded before any frame from `reader`.
        first_block: Option<CarBlock>,
    }
}
129
130fn is_zstd(buf: &[u8]) -> bool {
134 zstd::zstd_safe::get_frame_content_size(buf).is_ok()
135}
136
137impl<ReaderT: AsyncBufRead + Unpin> CarStream<ReaderT> {
138 #[allow(dead_code)]
146 pub async fn new_unsafe(mut reader: ReaderT) -> io::Result<Self> {
147 let header_v2 = Self::try_decode_header_v2_from_fill_buf(reader.fill_buf().await?)
148 .ok()
150 .flatten();
151 Self::new_with_header_v2(reader, header_v2).await
152 }
153
154 pub async fn new_with_header_v2(
156 mut reader: ReaderT,
157 header_v2: Option<CarV2Header>,
158 ) -> io::Result<Self> {
159 let is_compressed = is_zstd(reader.fill_buf().await?);
160 let mut reader = if is_compressed {
161 let mut zstd = ZstdDecoder::new(reader);
162 zstd.multiple_members(true);
163 Either::Right(zstd)
164 } else {
165 Either::Left(reader)
166 };
167
168 if let Some(header_v2) = &header_v2 {
170 reader = skip_bytes(
171 reader,
172 u64::try_from(header_v2.data_offset).map_err(std::io::Error::other)?,
173 )
174 .await?;
175 }
176
177 let max_car_v1_bytes = header_v2
178 .as_ref()
179 .map(|h| u64::try_from(h.data_size).map_err(std::io::Error::other))
180 .transpose()?
181 .unwrap_or(u64::MAX);
182 let mut reader = reader.take(max_car_v1_bytes);
183 let header_v1 = read_v1_header(&mut reader).await?;
184
185 if let Some(block) = read_car_block(&mut reader).await? {
188 if !block.valid() {
189 return Err(io::Error::new(
190 io::ErrorKind::InvalidData,
191 "invalid first block",
192 ));
193 }
194
195 let (first_block, metadata) = if header_v1.roots.len()
196 == crate::db::car::V2_SNAPSHOT_ROOT_COUNT
197 {
198 let maybe_metadata_cid = header_v1.roots.first();
199 if maybe_metadata_cid == &block.cid
200 && let Ok(metadata) =
201 fvm_ipld_encoding::from_slice::<FilecoinSnapshotMetadata>(&block.data)
202 {
203 if metadata.f3_data.is_some() {
205 const MAX_F3_FRAME_LEN: u64 = 16 * 1024 * 1024 * 1024;
207 let len: u64 = reader.read_varint_async().await?;
208 if len > MAX_F3_FRAME_LEN {
209 return Err(io::Error::new(
210 io::ErrorKind::InvalidData,
211 format!(
212 "f3 block frame length too large: {len} > {MAX_F3_FRAME_LEN}"
213 ),
214 ));
215 }
216 reader = skip_bytes(reader, len).await?;
217 }
218
219 (None, Some(metadata))
221 } else {
222 (Some(block), None)
223 }
224 } else {
225 (Some(block), None)
226 };
227
228 Ok(CarStream {
229 reader: FramedRead::new(reader, uvi_bytes()),
230 header_v1,
231 header_v2,
232 metadata,
233 first_block,
234 })
235 } else {
236 Ok(CarStream {
237 reader: FramedRead::new(reader, uvi_bytes()),
238 header_v1,
239 header_v2,
240 metadata: None,
241 first_block: None,
242 })
243 }
244 }
245
246 pub async fn extract_header_v2(
251 mut reader: ReaderT,
252 ) -> io::Result<(ReaderT, Option<CarV2Header>)> {
253 let is_compressed = is_zstd(reader.fill_buf().await?);
254 let mut reader = if is_compressed {
255 let mut zstd = ZstdDecoder::new(reader);
256 zstd.multiple_members(true);
257 Either::Right(zstd)
258 } else {
259 Either::Left(reader)
260 };
261 let mut possible_header_bytes = [0; 51];
262 reader.read_exact(&mut possible_header_bytes).await?;
263 let header_v2 = read_v2_header(possible_header_bytes.as_slice())?;
264 let reader = match reader {
265 Either::Left(reader) => reader,
266 Either::Right(zstd) => zstd.into_inner(),
267 };
268 Ok((reader, header_v2))
269 }
270
271 fn try_decode_header_v2_from_fill_buf(fill_buf: &[u8]) -> io::Result<Option<CarV2Header>> {
272 let is_compressed = is_zstd(fill_buf);
273 let fill_buf_reader = if is_compressed {
274 itertools::Either::Right(zstd::Decoder::new(fill_buf)?)
275 } else {
276 itertools::Either::Left(fill_buf)
277 };
278 read_v2_header(fill_buf_reader)
279 }
280}
281
282impl<ReaderT: AsyncBufRead + AsyncSeek + Unpin> CarStream<ReaderT> {
283 pub async fn new(reader: ReaderT) -> io::Result<Self> {
285 let (reader, header_v2) = Self::extract_header_v2_and_reset_reader_position(reader).await?;
286 Self::new_with_header_v2(reader, header_v2).await
287 }
288
289 pub async fn extract_header_v2_and_reset_reader_position(
291 mut reader: ReaderT,
292 ) -> io::Result<(ReaderT, Option<CarV2Header>)> {
293 let stream_position = reader.stream_position().await?;
294 let (mut reader, header_v2) = Self::extract_header_v2(reader).await?;
295 reader.seek(SeekFrom::Start(stream_position)).await?;
296 Ok((reader, header_v2))
297 }
298}
299
300impl<ReaderT> CarStream<ReaderT> {
301 pub fn head_tipset_key(&self) -> TipsetKey {
302 self.metadata
303 .as_ref()
304 .map(|m| m.head_tipset_key.clone())
305 .unwrap_or_else(|| self.header_v1.roots.clone())
306 .into()
307 }
308}
309
310impl CarStream<tokio::io::BufReader<tokio::fs::File>> {
311 pub async fn new_from_path(path: impl AsRef<Path>) -> io::Result<Self> {
312 Self::new(tokio::io::BufReader::new(
313 tokio::fs::File::open(path.as_ref()).await?,
314 ))
315 .await
316 }
317}
318
319impl<ReaderT: AsyncBufRead> Stream for CarStream<ReaderT> {
320 type Item = io::Result<CarBlock>;
321
322 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
323 let this = self.project();
324 if let Some(block) = this.first_block.take() {
325 return Poll::Ready(Some(Ok(block)));
326 }
327 let item = futures::ready!(this.reader.poll_next(cx));
328 Poll::Ready(item.map(|ret| ret.and_then(CarBlock::from_bytes)))
329 }
330}
331
pin_project! {
    // `Sink` of `CarBlock`s that encodes them as CAR frames into an `AsyncWrite`.
    pub struct CarWriter<W> {
        // Destination writer.
        #[pin]
        inner: W,
        // Encoded bytes (header and/or queued blocks) not yet written to `inner`.
        buffer: BytesMut,
    }
}
339
340impl<W: AsyncWrite> CarWriter<W> {
341 pub fn new_carv1(roots: NonEmpty<Cid>, writer: W) -> io::Result<Self> {
342 let car_header = CarV1Header { roots, version: 1 };
343
344 let mut header_uvi_frame = BytesMut::new();
345 uvi_bytes().encode(Bytes::from(to_vec(&car_header)?), &mut header_uvi_frame)?;
346
347 Ok(Self {
348 inner: writer,
349 buffer: header_uvi_frame,
350 })
351 }
352}
353
354impl<W: AsyncWrite> Sink<CarBlock> for CarWriter<W> {
355 type Error = io::Error;
356
357 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
358 let mut this = self.as_mut().project();
359
360 while !this.buffer.is_empty() {
361 this = self.as_mut().project();
362 let bytes_written = ready!(this.inner.poll_write(cx, this.buffer))?;
363 this.buffer.advance(bytes_written);
364 }
365 Poll::Ready(Ok(()))
366 }
367 fn start_send(self: Pin<&mut Self>, item: CarBlock) -> Result<(), Self::Error> {
368 item.write(&mut self.project().buffer.writer())
369 }
370 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
371 ready!(self.as_mut().poll_ready(cx))?;
372 self.project().inner.poll_flush(cx)
373 }
374 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
375 ready!(self.as_mut().poll_ready(cx))?;
376 self.project().inner.poll_shutdown(cx)
377 }
378}
379
380async fn read_v1_header<ReaderT: AsyncRead + Unpin>(
381 reader: &mut ReaderT,
382) -> std::io::Result<CarV1Header> {
383 let Some(frame) = read_frame(reader).await? else {
384 return Err(std::io::Error::new(
385 std::io::ErrorKind::UnexpectedEof,
386 "failed to decode v1 header frame",
387 ));
388 };
389 let header = from_slice_with_fallback::<CarV1Header>(&frame).map_err(std::io::Error::other)?;
390 if header.version != 1 {
391 return Err(std::io::Error::other(format!(
392 "unexpected header version {}, 1 expected",
393 header.version
394 )));
395 }
396 Ok(header)
397}
398
399async fn read_frame<ReaderT: AsyncRead + Unpin>(
400 reader: &mut ReaderT,
401) -> std::io::Result<Option<Vec<u8>>> {
402 let len: usize = match reader.read_varint_async().await {
403 Ok(len) if len > MAX_FRAME_LEN => {
404 return Err(std::io::Error::new(
405 std::io::ErrorKind::InvalidData,
406 format!("frame too large: {len} > {MAX_FRAME_LEN}"),
407 ));
408 }
409 Ok(len) => len,
410 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
411 Err(e) => return Err(e),
412 };
413 let mut bytes = vec![0; len];
414 reader.read_exact(&mut bytes[..]).await?;
415 Ok(Some(bytes))
416}
417
418async fn read_car_block<ReaderT: AsyncRead + Unpin>(
419 reader: &mut ReaderT,
420) -> std::io::Result<Option<CarBlock>> {
421 read_frame(reader)
422 .await?
423 .map(CarBlock::from_bytes)
424 .transpose()
425}
426
427pub fn uvi_bytes() -> UviBytes {
428 let mut decoder = UviBytes::default();
429 decoder.set_max_len(MAX_FRAME_LEN);
430 decoder
431}
432
433#[cfg(test)]
434mod tests;