use crate::traits::BlockStore;
use bytes::Bytes;
use ipfrs_core::{Block, Cid, Error, Result};
use std::path::Path;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
/// CAR format version that [`CarHeader::new`] stores in newly created headers.
pub const CAR_VERSION: u64 = 1;
/// Header of a CAR file: a format version plus a list of root CIDs.
#[derive(Debug, Clone)]
pub struct CarHeader {
/// Format version; [`CarHeader::new`] sets this to [`CAR_VERSION`].
pub version: u64,
/// Root CIDs listed in the header, in order.
pub roots: Vec<Cid>,
}
impl CarHeader {
pub fn new(roots: Vec<Cid>) -> Self {
Self {
version: CAR_VERSION,
roots,
}
}
pub fn to_cbor(&self) -> Result<Vec<u8>> {
let mut buf = Vec::new();
buf.push(0xa2);
buf.push(0x67); buf.extend_from_slice(b"version");
buf.push(0x01);
buf.push(0x65); buf.extend_from_slice(b"roots");
let roots_len = self.roots.len();
if roots_len < 24 {
buf.push(0x80 | roots_len as u8); } else if roots_len < 256 {
buf.push(0x98); buf.push(roots_len as u8);
} else {
return Err(Error::InvalidData("Too many roots".to_string()));
}
for root in &self.roots {
let cid_bytes = root.to_bytes();
buf.push(0xd8); buf.push(0x2a);
if cid_bytes.len() < 24 {
buf.push(0x40 | cid_bytes.len() as u8);
} else if cid_bytes.len() < 256 {
buf.push(0x58);
buf.push(cid_bytes.len() as u8);
} else {
buf.push(0x59);
buf.extend_from_slice(&(cid_bytes.len() as u16).to_be_bytes());
}
buf.extend_from_slice(&cid_bytes);
}
Ok(buf)
}
pub fn from_cbor(data: &[u8]) -> Result<Self> {
if data.is_empty() || (data[0] & 0xe0) != 0xa0 {
return Err(Error::InvalidData("Expected CBOR map".to_string()));
}
let (map_len, mut pos) = if data[0] == 0xa2 {
(2, 1)
} else {
return Err(Error::InvalidData("Expected map(2)".to_string()));
};
let mut version = 1u64;
let mut roots = Vec::new();
for _ in 0..map_len {
let (key, new_pos) = read_cbor_text(&data[pos..])?;
pos += new_pos;
match key.as_str() {
"version" => {
if pos >= data.len() {
return Err(Error::InvalidData("Unexpected end".to_string()));
}
let (v, new_pos) = read_cbor_uint(&data[pos..])?;
version = v;
pos += new_pos;
}
"roots" => {
let (r, new_pos) = read_cbor_roots(&data[pos..])?;
roots = r;
pos += new_pos;
}
_ => {
let new_pos = skip_cbor_value(&data[pos..])?;
pos += new_pos;
}
}
}
Ok(Self { version, roots })
}
}
fn read_cbor_text(data: &[u8]) -> Result<(String, usize)> {
if data.is_empty() {
return Err(Error::InvalidData("Unexpected end".to_string()));
}
let major = data[0] >> 5;
if major != 3 {
return Err(Error::InvalidData("Expected text string".to_string()));
}
let (len, header_len) = read_cbor_len(data)?;
let total_len = header_len + len;
if data.len() < total_len {
return Err(Error::InvalidData("Text string too short".to_string()));
}
let text = String::from_utf8(data[header_len..total_len].to_vec())
.map_err(|e| Error::InvalidData(format!("Invalid UTF-8: {e}")))?;
Ok((text, total_len))
}
fn read_cbor_uint(data: &[u8]) -> Result<(u64, usize)> {
if data.is_empty() {
return Err(Error::InvalidData("Unexpected end".to_string()));
}
let major = data[0] >> 5;
if major != 0 {
return Err(Error::InvalidData("Expected unsigned int".to_string()));
}
let (val, len) = read_cbor_len(data)?;
Ok((val as u64, len))
}
fn read_cbor_len(data: &[u8]) -> Result<(usize, usize)> {
if data.is_empty() {
return Err(Error::InvalidData("Unexpected end".to_string()));
}
let additional = data[0] & 0x1f;
match additional {
0..=23 => Ok((additional as usize, 1)),
24 => {
if data.len() < 2 {
return Err(Error::InvalidData("Length too short".to_string()));
}
Ok((data[1] as usize, 2))
}
25 => {
if data.len() < 3 {
return Err(Error::InvalidData("Length too short".to_string()));
}
Ok((u16::from_be_bytes([data[1], data[2]]) as usize, 3))
}
26 => {
if data.len() < 5 {
return Err(Error::InvalidData("Length too short".to_string()));
}
Ok((
u32::from_be_bytes([data[1], data[2], data[3], data[4]]) as usize,
5,
))
}
_ => Err(Error::InvalidData(
"Unsupported length encoding".to_string(),
)),
}
}
fn read_cbor_roots(data: &[u8]) -> Result<(Vec<Cid>, usize)> {
if data.is_empty() {
return Err(Error::InvalidData("Unexpected end".to_string()));
}
let major = data[0] >> 5;
if major != 4 {
return Err(Error::InvalidData("Expected array".to_string()));
}
let (arr_len, header_len) = read_cbor_len(data)?;
let mut pos = header_len;
let mut roots = Vec::with_capacity(arr_len);
for _ in 0..arr_len {
if pos < data.len() && data[pos] == 0xd8 {
pos += 2; }
if pos >= data.len() {
return Err(Error::InvalidData("Unexpected end in roots".to_string()));
}
let major = data[pos] >> 5;
if major != 2 {
return Err(Error::InvalidData(
"Expected byte string for CID".to_string(),
));
}
let (len, header) = read_cbor_len(&data[pos..])?;
pos += header;
if pos + len > data.len() {
return Err(Error::InvalidData("CID bytes too short".to_string()));
}
let cid = Cid::try_from(data[pos..pos + len].to_vec())
.map_err(|e| Error::Cid(format!("Invalid CID: {e}")))?;
roots.push(cid);
pos += len;
}
Ok((roots, pos))
}
fn skip_cbor_value(data: &[u8]) -> Result<usize> {
if data.is_empty() {
return Err(Error::InvalidData("Unexpected end".to_string()));
}
let major = data[0] >> 5;
let (len, header_len) = read_cbor_len(data)?;
match major {
0 | 1 => Ok(header_len), 2 | 3 => Ok(header_len + len), 4 => {
let mut pos = header_len;
for _ in 0..len {
pos += skip_cbor_value(&data[pos..])?;
}
Ok(pos)
}
5 => {
let mut pos = header_len;
for _ in 0..len {
pos += skip_cbor_value(&data[pos..])?; pos += skip_cbor_value(&data[pos..])?; }
Ok(pos)
}
6 => {
Ok(header_len + skip_cbor_value(&data[header_len..])?)
}
7 => Ok(header_len), _ => Err(Error::InvalidData("Unknown CBOR major type".to_string())),
}
}
/// Encodes `value` as an unsigned varint: seven payload bits per byte, least
/// significant group first, with the high bit set on every byte but the last.
fn encode_varint(mut value: u64) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        let low7 = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            // Final group: no continuation bit.
            out.push(low7);
            return out;
        }
        out.push(low7 | 0x80);
    }
}
fn decode_varint(data: &[u8]) -> Result<(u64, usize)> {
let mut result: u64 = 0;
let mut shift = 0;
for (i, &byte) in data.iter().enumerate() {
result |= ((byte & 0x7f) as u64) << shift;
if byte & 0x80 == 0 {
return Ok((result, i + 1));
}
shift += 7;
if shift >= 64 {
return Err(Error::InvalidData("Varint too long".to_string()));
}
}
Err(Error::InvalidData("Incomplete varint".to_string()))
}
/// Writes a CAR file: a length-prefixed header followed by length-prefixed
/// blocks, while tracking how much has been written.
pub struct CarWriter {
// Buffered handle to the output file.
writer: BufWriter<File>,
// Number of blocks passed to `write_block` so far.
blocks_written: u64,
// Bytes handed to the writer so far, header and length prefixes included.
bytes_written: u64,
}
impl CarWriter {
pub async fn create(path: &Path, roots: Vec<Cid>) -> Result<Self> {
let file = File::create(path)
.await
.map_err(|e| Error::Storage(format!("Failed to create CAR file: {e}")))?;
let mut writer = BufWriter::new(file);
let header = CarHeader::new(roots);
let header_bytes = header.to_cbor()?;
let header_len = encode_varint(header_bytes.len() as u64);
writer
.write_all(&header_len)
.await
.map_err(|e| Error::Storage(format!("Failed to write header length: {e}")))?;
writer
.write_all(&header_bytes)
.await
.map_err(|e| Error::Storage(format!("Failed to write header: {e}")))?;
let bytes_written = (header_len.len() + header_bytes.len()) as u64;
Ok(Self {
writer,
blocks_written: 0,
bytes_written,
})
}
pub async fn write_block(&mut self, block: &Block) -> Result<()> {
let cid_bytes = block.cid().to_bytes();
let data = block.data();
let block_len = cid_bytes.len() + data.len();
let len_bytes = encode_varint(block_len as u64);
self.writer
.write_all(&len_bytes)
.await
.map_err(|e| Error::Storage(format!("Failed to write block length: {e}")))?;
self.writer
.write_all(&cid_bytes)
.await
.map_err(|e| Error::Storage(format!("Failed to write CID: {e}")))?;
self.writer
.write_all(data)
.await
.map_err(|e| Error::Storage(format!("Failed to write block data: {e}")))?;
self.blocks_written += 1;
self.bytes_written += (len_bytes.len() + block_len) as u64;
Ok(())
}
pub async fn finish(mut self) -> Result<CarWriteStats> {
self.writer
.flush()
.await
.map_err(|e| Error::Storage(format!("Failed to flush CAR file: {e}")))?;
Ok(CarWriteStats {
blocks_written: self.blocks_written,
bytes_written: self.bytes_written,
})
}
pub fn stats(&self) -> CarWriteStats {
CarWriteStats {
blocks_written: self.blocks_written,
bytes_written: self.bytes_written,
}
}
}
/// Counters reported by a CAR writer.
///
/// Comparable and default-constructible so callers and tests can assert on
/// whole values instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct CarWriteStats {
    /// Number of blocks written.
    pub blocks_written: u64,
    /// Total bytes written, including the header and all length prefixes.
    pub bytes_written: u64,
}
/// Reads a CAR file: the header is parsed when the reader is opened, blocks
/// are then read one at a time.
pub struct CarReader {
// Buffered handle to the input file, positioned after the last item read.
reader: BufReader<File>,
// Header decoded by `open`.
header: CarHeader,
// Number of blocks returned by `read_block` so far.
blocks_read: u64,
// Bytes consumed so far, header and length prefixes included.
bytes_read: u64,
}
impl CarReader {
pub async fn open(path: &Path) -> Result<Self> {
let file = File::open(path)
.await
.map_err(|e| Error::Storage(format!("Failed to open CAR file: {e}")))?;
let mut reader = BufReader::new(file);
let mut header_len_buf = [0u8; 10];
let mut header_len_size = 0;
for i in 0..10 {
reader
.read_exact(&mut header_len_buf[i..i + 1])
.await
.map_err(|e| Error::Storage(format!("Failed to read header length: {e}")))?;
header_len_size = i + 1;
if header_len_buf[i] & 0x80 == 0 {
break;
}
}
let (header_len, _) = decode_varint(&header_len_buf[..header_len_size])?;
let mut header_bytes = vec![0u8; header_len as usize];
reader
.read_exact(&mut header_bytes)
.await
.map_err(|e| Error::Storage(format!("Failed to read header: {e}")))?;
let header = CarHeader::from_cbor(&header_bytes)?;
let bytes_read = (header_len_size + header_len as usize) as u64;
Ok(Self {
reader,
header,
blocks_read: 0,
bytes_read,
})
}
pub fn header(&self) -> &CarHeader {
&self.header
}
pub fn roots(&self) -> &[Cid] {
&self.header.roots
}
pub async fn read_block(&mut self) -> Result<Option<Block>> {
let mut len_buf = [0u8; 10];
let mut len_size = 0;
#[allow(clippy::needless_range_loop)]
for i in 0..10 {
let mut byte_buf = [0u8; 1];
match self.reader.read(&mut byte_buf).await {
Ok(0) => {
if i == 0 {
return Ok(None); }
return Err(Error::Storage("Incomplete block length".to_string()));
}
Ok(_) => {
len_buf[i] = byte_buf[0];
}
Err(e) => return Err(Error::Storage(format!("Failed to read block length: {e}"))),
}
len_size = i + 1;
if len_buf[i] & 0x80 == 0 {
break;
}
}
let (block_len, _) = decode_varint(&len_buf[..len_size])?;
let mut block_data = vec![0u8; block_len as usize];
self.reader
.read_exact(&mut block_data)
.await
.map_err(|e| Error::Storage(format!("Failed to read block data: {e}")))?;
let cid = Cid::try_from(block_data.clone())
.map_err(|e| Error::Cid(format!("Invalid CID in CAR: {e}")))?;
let cid_len = cid.to_bytes().len();
let data = Bytes::copy_from_slice(&block_data[cid_len..]);
self.blocks_read += 1;
self.bytes_read += (len_size + block_len as usize) as u64;
Ok(Some(Block::from_parts(cid, data)))
}
pub fn stats(&self) -> CarReadStats {
CarReadStats {
blocks_read: self.blocks_read,
bytes_read: self.bytes_read,
}
}
}
/// Counters reported by a CAR reader.
///
/// Comparable and default-constructible so callers and tests can assert on
/// whole values instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct CarReadStats {
    /// Number of blocks read.
    pub blocks_read: u64,
    /// Total bytes consumed, including the header and all length prefixes.
    pub bytes_read: u64,
}
/// Writes a CAR file at `path` whose header lists `roots` and whose body
/// contains every block that `store.list_cids()` reports.
///
/// Note that all blocks in the store are exported, not only those reachable
/// from `roots`. CIDs for which `store.get` returns `None` are skipped.
///
/// # Errors
///
/// Propagates any error from the store or from the CAR writer.
pub async fn export_to_car<S: BlockStore>(
    store: &S,
    path: &Path,
    roots: Vec<Cid>,
) -> Result<CarWriteStats> {
    // `roots` is not needed afterwards, so hand it over instead of cloning.
    let mut writer = CarWriter::create(path, roots).await?;
    for cid in store.list_cids()? {
        if let Some(block) = store.get(&cid).await? {
            writer.write_block(&block).await?;
        }
    }
    writer.finish().await
}
/// Reads every block from the CAR file at `path` and stores it with
/// `store.put`, returning the reader's final counters.
///
/// # Errors
///
/// Stops at, and returns, the first error from the reader or the store.
pub async fn import_from_car<S: BlockStore>(store: &S, path: &Path) -> Result<CarReadStats> {
    let mut car = CarReader::open(path).await?;
    loop {
        match car.read_block().await? {
            Some(block) => {
                store.put(&block).await?;
            }
            None => return Ok(car.stats()),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blockstore::{BlockStoreConfig, SledBlockStore};
    use std::path::PathBuf;

    /// Builds a block whose CID is derived from `data`.
    fn make_test_block(data: &[u8]) -> Block {
        Block::new(Bytes::copy_from_slice(data)).unwrap()
    }

    /// Returns a per-process path inside the platform temp directory, so the
    /// tests do not depend on `/tmp` existing and concurrent test runs do not
    /// clobber each other's files.
    fn temp_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!("ipfrs-car-{}-{name}", std::process::id()))
    }

    #[test]
    fn test_varint_encode_decode() {
        let test_values = [
            0,
            1,
            127,
            128,
            255,
            256,
            16383,
            16384,
            1000000,
            u64::MAX,
        ];
        for &val in &test_values {
            let encoded = encode_varint(val);
            let (decoded, consumed) = decode_varint(&encoded).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
            assert_eq!(consumed, encoded.len(), "Wrong length for value {}", val);
        }
    }

    #[test]
    fn test_car_header_roundtrip() {
        let block1 = make_test_block(b"test1");
        let block2 = make_test_block(b"test2");
        let roots = vec![*block1.cid(), *block2.cid()];
        let header = CarHeader::new(roots.clone());
        let cbor = header.to_cbor().unwrap();
        let decoded = CarHeader::from_cbor(&cbor).unwrap();
        assert_eq!(decoded.version, CAR_VERSION);
        assert_eq!(decoded.roots.len(), 2);
        assert_eq!(decoded.roots[0], roots[0]);
        assert_eq!(decoded.roots[1], roots[1]);
    }

    #[tokio::test]
    async fn test_car_write_read() {
        let path = temp_path("write-read.car");
        let _ = std::fs::remove_file(&path);

        let block1 = make_test_block(b"hello world");
        let block2 = make_test_block(b"goodbye world");
        let roots = vec![*block1.cid()];

        {
            let mut writer = CarWriter::create(&path, roots.clone()).await.unwrap();
            writer.write_block(&block1).await.unwrap();
            writer.write_block(&block2).await.unwrap();
            let stats = writer.finish().await.unwrap();
            assert_eq!(stats.blocks_written, 2);
        }

        {
            let mut reader = CarReader::open(&path).await.unwrap();
            assert_eq!(reader.roots().len(), 1);
            assert_eq!(reader.roots()[0], *block1.cid());

            let read_block1 = reader.read_block().await.unwrap().unwrap();
            assert_eq!(read_block1.cid(), block1.cid());
            assert_eq!(read_block1.data(), block1.data());

            let read_block2 = reader.read_block().await.unwrap().unwrap();
            assert_eq!(read_block2.cid(), block2.cid());
            assert_eq!(read_block2.data(), block2.data());

            assert!(reader.read_block().await.unwrap().is_none());
        }

        let _ = std::fs::remove_file(&path);
    }

    #[tokio::test]
    async fn test_export_import_car() {
        let store_path = temp_path("export-store");
        let store_path2 = temp_path("import-store");
        let car_path = temp_path("export.car");
        let _ = std::fs::remove_dir_all(&store_path);
        let _ = std::fs::remove_dir_all(&store_path2);
        let _ = std::fs::remove_file(&car_path);

        let config = BlockStoreConfig {
            path: store_path.clone(),
            cache_size: 1024 * 1024,
        };
        let store = SledBlockStore::new(config).unwrap();
        let block1 = make_test_block(b"block1");
        let block2 = make_test_block(b"block2");
        store.put(&block1).await.unwrap();
        store.put(&block2).await.unwrap();

        let write_stats = export_to_car(&store, &car_path, vec![*block1.cid()])
            .await
            .unwrap();
        assert_eq!(write_stats.blocks_written, 2);

        let config2 = BlockStoreConfig {
            path: store_path2.clone(),
            cache_size: 1024 * 1024,
        };
        let store2 = SledBlockStore::new(config2).unwrap();
        let read_stats = import_from_car(&store2, &car_path).await.unwrap();
        assert_eq!(read_stats.blocks_read, 2);
        assert!(store2.has(block1.cid()).await.unwrap());
        assert!(store2.has(block2.cid()).await.unwrap());

        let _ = std::fs::remove_dir_all(&store_path);
        let _ = std::fs::remove_dir_all(&store_path2);
        let _ = std::fs::remove_file(&car_path);
    }
}