1use crate::chain::FilecoinSnapshotMetadata;
5use crate::db::car::plain::read_v2_header;
6use crate::utils::io::skip_bytes;
7use crate::utils::multihash::prelude::*;
8use async_compression::tokio::bufread::ZstdDecoder;
9use bytes::{Buf, BufMut, Bytes, BytesMut};
10use cid::Cid;
11use futures::ready;
12use futures::{Stream, sink::Sink};
13use fvm_ipld_encoding::to_vec;
14use integer_encoding::{VarInt, VarIntAsyncReader as _};
15use nunny::Vec as NonEmpty;
16use pin_project_lite::pin_project;
17use serde::{Deserialize, Serialize};
18use std::io::{self, Cursor, Read, SeekFrom, Write};
19use std::path::Path;
20use std::pin::Pin;
21use std::task::{Context, Poll};
22use tokio::io::{
23 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
24 Take,
25};
26use tokio_util::codec::{Encoder, FramedRead};
27use tokio_util::either::Either;
28use unsigned_varint::codec::UviBytes;
29
30use crate::utils::encoding::from_slice_with_fallback;
31
/// Upper bound (512 MiB) on the length of a single varint-prefixed CAR frame.
///
/// Checked by `read_frame` (CARv1 header and first block) and installed as the maximum
/// length of the [`uvi_bytes`] codec used for all subsequent frames.
const MAX_FRAME_LEN: usize = 512 * 1024 * 1024;
34
/// Header frame of a CARv1 payload.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CarV1Header {
    /// Root CIDs of the archive; the `NonEmpty` type guarantees at least one.
    pub roots: NonEmpty<Cid>,
    /// Format version; `read_v1_header` rejects any value other than `1`, and
    /// `CarWriter::new_carv1` always writes `1`.
    pub version: u64,
}
43
/// Fixed-size CARv2 header, as returned by `read_v2_header`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CarV2Header {
    /// The 16 "characteristics" bytes of the CARv2 header (a bitfield per the CARv2 spec).
    pub characteristics: [u8; 16],
    /// Offset of the inner CARv1 payload. `CarStream` skips this many bytes of the
    /// (decompressed) input before reading the CARv1 header; negative values are rejected.
    pub data_offset: i64,
    /// Length of the inner CARv1 payload. `CarStream` reads at most this many bytes;
    /// negative values are rejected.
    pub data_size: i64,
    /// Offset of the CARv2 index section (not consulted by `CarStream`).
    pub index_offset: i64,
}
52
/// A single CAR block: a CID together with the raw bytes it is claimed to address.
///
/// The derived ordering compares `cid` first, then `data`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CarBlock {
    /// Content identifier claimed for `data`; checked by `CarBlock::validate`.
    pub cid: Cid,
    /// Raw block bytes.
    pub data: Vec<u8>,
}
58
59impl CarBlock {
60 pub fn write(&self, writer: &mut impl Write) -> io::Result<()> {
62 writer.write_car_block(
63 self.cid,
64 self.data.len() as u64,
65 &mut Cursor::new(&self.data),
66 )
67 }
68
69 pub fn from_bytes(bytes: impl Into<Bytes>) -> io::Result<CarBlock> {
70 let bytes: Bytes = bytes.into();
71 let mut cursor = bytes.reader();
72 let cid = Cid::read_bytes(&mut cursor)
73 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
74 let bytes = cursor.into_inner();
75 Ok(CarBlock {
76 cid,
77 data: bytes.to_vec(),
78 })
79 }
80
81 pub fn valid(&self) -> bool {
82 self.validate().is_ok()
83 }
84
85 pub fn validate(&self) -> anyhow::Result<()> {
86 let actual = {
87 let code = MultihashCode::try_from(self.cid.hash().code())?;
88 Cid::new_v1(self.cid.codec(), code.digest(&self.data))
89 };
90 anyhow::ensure!(
91 actual == self.cid,
92 "CID/Block mismatch for block {}, actual: {actual}",
93 self.cid
94 );
95 Ok(())
96 }
97}
98
/// Writes a single CAR section (length prefix, CID, block data) to a byte sink.
pub trait CarBlockWrite {
    /// Writes `cid` followed by block bytes read from `data`.
    ///
    /// `data_len` is used to compute the varint frame-length prefix, so it is expected to
    /// match the number of block bytes written.
    fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()>;
}
102
103impl<T: Write> CarBlockWrite for T {
104 fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()> {
105 let frame_length = cid.encoded_len() as u64 + data_len;
106 self.write_all(&frame_length.encode_var_vec())?;
107 cid.write_bytes(&mut *self)
108 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
109 std::io::copy(data, self)?;
110 Ok(())
111 }
112}
113
pin_project! {
    /// Asynchronous stream of [`CarBlock`]s decoded from a CARv1 or CARv2 input, which may
    /// be zstd-compressed.
    pub struct CarStream<ReaderT> {
        // Varint-length-delimited frame decoder over the (optionally zstd-decompressed) CARv1
        // payload, capped at the CARv2 `data_size` when a CARv2 header was found.
        #[pin]
        reader: FramedRead<Take<Either<ReaderT, ZstdDecoder<ReaderT>>>,UviBytes>,
        // Decoded CARv1 header.
        pub header_v1: CarV1Header,
        // CARv2 header, when the input was detected as CARv2.
        pub header_v2: Option<CarV2Header>,
        // First block, read eagerly at construction and yielded before `reader` is polled;
        // `None` if there was no block or the first block was the hidden snapshot metadata.
        first_block: Option<CarBlock>,
    }
}
127
128fn is_zstd(buf: &[u8]) -> bool {
132 zstd::zstd_safe::get_frame_content_size(buf).is_ok()
133}
134
135impl<ReaderT: AsyncBufRead + Unpin> CarStream<ReaderT> {
136 #[allow(dead_code)]
144 pub async fn new_unsafe(mut reader: ReaderT) -> io::Result<Self> {
145 let header_v2 = Self::try_decode_header_v2_from_fill_buf(reader.fill_buf().await?)
146 .ok()
148 .flatten();
149 Self::new_with_header_v2(reader, header_v2).await
150 }
151
152 pub async fn new_with_header_v2(
154 mut reader: ReaderT,
155 header_v2: Option<CarV2Header>,
156 ) -> io::Result<Self> {
157 let is_compressed = is_zstd(reader.fill_buf().await?);
158 let mut reader = if is_compressed {
159 let mut zstd = ZstdDecoder::new(reader);
160 zstd.multiple_members(true);
161 Either::Right(zstd)
162 } else {
163 Either::Left(reader)
164 };
165
166 if let Some(header_v2) = &header_v2 {
168 reader = skip_bytes(
169 reader,
170 u64::try_from(header_v2.data_offset).map_err(std::io::Error::other)?,
171 )
172 .await?;
173 }
174
175 let max_car_v1_bytes = header_v2
176 .as_ref()
177 .map(|h| u64::try_from(h.data_size).map_err(std::io::Error::other))
178 .transpose()?
179 .unwrap_or(u64::MAX);
180 let mut reader = reader.take(max_car_v1_bytes);
181 let header_v1 = read_v1_header(&mut reader).await?;
182
183 if let Some(block) = read_car_block(&mut reader).await? {
186 if !block.valid() {
187 return Err(io::Error::new(
188 io::ErrorKind::InvalidData,
189 "invalid first block",
190 ));
191 }
192
193 let first_block = if header_v1.roots.len() == crate::db::car::V2_SNAPSHOT_ROOT_COUNT {
194 let maybe_metadata_cid = header_v1.roots.first();
195 if maybe_metadata_cid == &block.cid
196 && let Ok(metadata) =
197 fvm_ipld_encoding::from_slice::<FilecoinSnapshotMetadata>(&block.data)
198 {
199 if metadata.f3_data.is_some() {
201 const MAX_F3_FRAME_LEN: u64 = 16 * 1024 * 1024 * 1024;
203 let len: u64 = reader.read_varint_async().await?;
204 if len > MAX_F3_FRAME_LEN {
205 return Err(io::Error::new(
206 io::ErrorKind::InvalidData,
207 format!(
208 "f3 block frame length too large: {len} > {MAX_F3_FRAME_LEN}"
209 ),
210 ));
211 }
212 reader = skip_bytes(reader, len).await?;
213 }
214
215 None
217 } else {
218 Some(block)
219 }
220 } else {
221 Some(block)
222 };
223
224 Ok(CarStream {
225 reader: FramedRead::new(reader, uvi_bytes()),
226 header_v1,
227 header_v2,
228 first_block,
229 })
230 } else {
231 Ok(CarStream {
232 reader: FramedRead::new(reader, uvi_bytes()),
233 header_v1,
234 header_v2,
235 first_block: None,
236 })
237 }
238 }
239
240 pub async fn extract_header_v2(
245 mut reader: ReaderT,
246 ) -> io::Result<(ReaderT, Option<CarV2Header>)> {
247 let is_compressed = is_zstd(reader.fill_buf().await?);
248 let mut reader = if is_compressed {
249 let mut zstd = ZstdDecoder::new(reader);
250 zstd.multiple_members(true);
251 Either::Right(zstd)
252 } else {
253 Either::Left(reader)
254 };
255 let mut possible_header_bytes = [0; 51];
256 reader.read_exact(&mut possible_header_bytes).await?;
257 let header_v2 = read_v2_header(possible_header_bytes.as_slice())?;
258 let reader = match reader {
259 Either::Left(reader) => reader,
260 Either::Right(zstd) => zstd.into_inner(),
261 };
262 Ok((reader, header_v2))
263 }
264
265 fn try_decode_header_v2_from_fill_buf(fill_buf: &[u8]) -> io::Result<Option<CarV2Header>> {
266 let is_compressed = is_zstd(fill_buf);
267 let fill_buf_reader = if is_compressed {
268 itertools::Either::Right(zstd::Decoder::new(fill_buf)?)
269 } else {
270 itertools::Either::Left(fill_buf)
271 };
272 read_v2_header(fill_buf_reader)
273 }
274}
275
276impl<ReaderT: AsyncBufRead + AsyncSeek + Unpin> CarStream<ReaderT> {
277 pub async fn new(reader: ReaderT) -> io::Result<Self> {
279 let (reader, header_v2) = Self::extract_header_v2_and_reset_reader_position(reader).await?;
280 Self::new_with_header_v2(reader, header_v2).await
281 }
282
283 pub async fn extract_header_v2_and_reset_reader_position(
285 mut reader: ReaderT,
286 ) -> io::Result<(ReaderT, Option<CarV2Header>)> {
287 let stream_position = reader.stream_position().await?;
288 let (mut reader, header_v2) = Self::extract_header_v2(reader).await?;
289 reader.seek(SeekFrom::Start(stream_position)).await?;
290 Ok((reader, header_v2))
291 }
292}
293
294impl CarStream<tokio::io::BufReader<tokio::fs::File>> {
295 pub async fn new_from_path(path: impl AsRef<Path>) -> io::Result<Self> {
296 Self::new(tokio::io::BufReader::new(
297 tokio::fs::File::open(path.as_ref()).await?,
298 ))
299 .await
300 }
301}
302
303impl<ReaderT: AsyncBufRead> Stream for CarStream<ReaderT> {
304 type Item = io::Result<CarBlock>;
305
306 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
307 let this = self.project();
308 if let Some(block) = this.first_block.take() {
309 return Poll::Ready(Some(Ok(block)));
310 }
311 let item = futures::ready!(this.reader.poll_next(cx));
312 Poll::Ready(item.map(|ret| ret.and_then(CarBlock::from_bytes)))
313 }
314}
315
pin_project! {
    /// [`Sink`] that serializes [`CarBlock`]s into a CARv1 byte stream on an [`AsyncWrite`].
    pub struct CarWriter<W> {
        // Destination writer.
        #[pin]
        inner: W,
        // Encoded bytes not yet written to `inner`; starts out holding the CARv1 header frame.
        buffer: BytesMut,
    }
}
323
324impl<W: AsyncWrite> CarWriter<W> {
325 pub fn new_carv1(roots: NonEmpty<Cid>, writer: W) -> io::Result<Self> {
326 let car_header = CarV1Header { roots, version: 1 };
327
328 let mut header_uvi_frame = BytesMut::new();
329 uvi_bytes().encode(Bytes::from(to_vec(&car_header)?), &mut header_uvi_frame)?;
330
331 Ok(Self {
332 inner: writer,
333 buffer: header_uvi_frame,
334 })
335 }
336}
337
338impl<W: AsyncWrite> Sink<CarBlock> for CarWriter<W> {
339 type Error = io::Error;
340
341 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
342 let mut this = self.as_mut().project();
343
344 while !this.buffer.is_empty() {
345 this = self.as_mut().project();
346 let bytes_written = ready!(this.inner.poll_write(cx, this.buffer))?;
347 this.buffer.advance(bytes_written);
348 }
349 Poll::Ready(Ok(()))
350 }
351 fn start_send(self: Pin<&mut Self>, item: CarBlock) -> Result<(), Self::Error> {
352 item.write(&mut self.project().buffer.writer())
353 }
354 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
355 ready!(self.as_mut().poll_ready(cx))?;
356 self.project().inner.poll_flush(cx)
357 }
358 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
359 ready!(self.as_mut().poll_ready(cx))?;
360 self.project().inner.poll_shutdown(cx)
361 }
362}
363
364async fn read_v1_header<ReaderT: AsyncRead + Unpin>(
365 reader: &mut ReaderT,
366) -> std::io::Result<CarV1Header> {
367 let Some(frame) = read_frame(reader).await? else {
368 return Err(std::io::Error::new(
369 std::io::ErrorKind::UnexpectedEof,
370 "failed to decode v1 header frame",
371 ));
372 };
373 let header = from_slice_with_fallback::<CarV1Header>(&frame).map_err(std::io::Error::other)?;
374 if header.version != 1 {
375 return Err(std::io::Error::other(format!(
376 "unexpected header version {}, 1 expected",
377 header.version
378 )));
379 }
380 Ok(header)
381}
382
383async fn read_frame<ReaderT: AsyncRead + Unpin>(
384 reader: &mut ReaderT,
385) -> std::io::Result<Option<Vec<u8>>> {
386 let len: usize = match reader.read_varint_async().await {
387 Ok(len) if len > MAX_FRAME_LEN => {
388 return Err(std::io::Error::new(
389 std::io::ErrorKind::InvalidData,
390 format!("frame too large: {len} > {MAX_FRAME_LEN}"),
391 ));
392 }
393 Ok(len) => len,
394 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
395 Err(e) => return Err(e),
396 };
397 let mut bytes = vec![0; len];
398 reader.read_exact(&mut bytes[..]).await?;
399 Ok(Some(bytes))
400}
401
402async fn read_car_block<ReaderT: AsyncRead + Unpin>(
403 reader: &mut ReaderT,
404) -> std::io::Result<Option<CarBlock>> {
405 read_frame(reader)
406 .await?
407 .map(CarBlock::from_bytes)
408 .transpose()
409}
410
411pub fn uvi_bytes() -> UviBytes {
412 let mut decoder = UviBytes::default();
413 decoder.set_max_len(MAX_FRAME_LEN);
414 decoder
415}
416
// Unit tests for this module live in a separate `tests` module file.
#[cfg(test)]
mod tests;