1use crate::traits::BlockStore;
36use bytes::Bytes;
37use ipfrs_core::{Block, Cid, Error, Result};
38use std::path::Path;
39use tokio::fs::File;
40use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
41
/// CAR format version that this module writes into new archive headers (CARv1).
pub const CAR_VERSION: u64 = 1;
44
45#[derive(Debug, Clone)]
47pub struct CarHeader {
48 pub version: u64,
50 pub roots: Vec<Cid>,
52}
53
54impl CarHeader {
55 pub fn new(roots: Vec<Cid>) -> Self {
57 Self {
58 version: CAR_VERSION,
59 roots,
60 }
61 }
62
63 pub fn to_cbor(&self) -> Result<Vec<u8>> {
65 let mut buf = Vec::new();
68
69 buf.push(0xa2); buf.push(0x67); buf.extend_from_slice(b"version");
75 buf.push(0x01); buf.push(0x65); buf.extend_from_slice(b"roots");
81
82 let roots_len = self.roots.len();
84 if roots_len < 24 {
85 buf.push(0x80 | roots_len as u8); } else if roots_len < 256 {
87 buf.push(0x98); buf.push(roots_len as u8);
89 } else {
90 return Err(Error::InvalidData("Too many roots".to_string()));
91 }
92
93 for root in &self.roots {
94 let cid_bytes = root.to_bytes();
95 buf.push(0xd8); buf.push(0x2a);
98 if cid_bytes.len() < 24 {
100 buf.push(0x40 | cid_bytes.len() as u8);
101 } else if cid_bytes.len() < 256 {
102 buf.push(0x58);
103 buf.push(cid_bytes.len() as u8);
104 } else {
105 buf.push(0x59);
106 buf.extend_from_slice(&(cid_bytes.len() as u16).to_be_bytes());
107 }
108 buf.extend_from_slice(&cid_bytes);
109 }
110
111 Ok(buf)
112 }
113
114 pub fn from_cbor(data: &[u8]) -> Result<Self> {
116 if data.is_empty() || (data[0] & 0xe0) != 0xa0 {
120 return Err(Error::InvalidData("Expected CBOR map".to_string()));
121 }
122
123 let (map_len, mut pos) = if data[0] == 0xa2 {
124 (2, 1)
125 } else {
126 return Err(Error::InvalidData("Expected map(2)".to_string()));
127 };
128
129 let mut version = 1u64;
130 let mut roots = Vec::new();
131
132 for _ in 0..map_len {
133 let (key, new_pos) = read_cbor_text(&data[pos..])?;
135 pos += new_pos;
136
137 match key.as_str() {
138 "version" => {
139 if pos >= data.len() {
141 return Err(Error::InvalidData("Unexpected end".to_string()));
142 }
143 let (v, new_pos) = read_cbor_uint(&data[pos..])?;
144 version = v;
145 pos += new_pos;
146 }
147 "roots" => {
148 let (r, new_pos) = read_cbor_roots(&data[pos..])?;
150 roots = r;
151 pos += new_pos;
152 }
153 _ => {
154 let new_pos = skip_cbor_value(&data[pos..])?;
156 pos += new_pos;
157 }
158 }
159 }
160
161 Ok(Self { version, roots })
162 }
163}
164
165fn read_cbor_text(data: &[u8]) -> Result<(String, usize)> {
167 if data.is_empty() {
168 return Err(Error::InvalidData("Unexpected end".to_string()));
169 }
170
171 let major = data[0] >> 5;
172 if major != 3 {
173 return Err(Error::InvalidData("Expected text string".to_string()));
175 }
176
177 let (len, header_len) = read_cbor_len(data)?;
178 let total_len = header_len + len;
179
180 if data.len() < total_len {
181 return Err(Error::InvalidData("Text string too short".to_string()));
182 }
183
184 let text = String::from_utf8(data[header_len..total_len].to_vec())
185 .map_err(|e| Error::InvalidData(format!("Invalid UTF-8: {e}")))?;
186
187 Ok((text, total_len))
188}
189
190fn read_cbor_uint(data: &[u8]) -> Result<(u64, usize)> {
192 if data.is_empty() {
193 return Err(Error::InvalidData("Unexpected end".to_string()));
194 }
195
196 let major = data[0] >> 5;
197 if major != 0 {
198 return Err(Error::InvalidData("Expected unsigned int".to_string()));
199 }
200
201 let (val, len) = read_cbor_len(data)?;
202 Ok((val as u64, len))
203}
204
205fn read_cbor_len(data: &[u8]) -> Result<(usize, usize)> {
207 if data.is_empty() {
208 return Err(Error::InvalidData("Unexpected end".to_string()));
209 }
210
211 let additional = data[0] & 0x1f;
212
213 match additional {
214 0..=23 => Ok((additional as usize, 1)),
215 24 => {
216 if data.len() < 2 {
217 return Err(Error::InvalidData("Length too short".to_string()));
218 }
219 Ok((data[1] as usize, 2))
220 }
221 25 => {
222 if data.len() < 3 {
223 return Err(Error::InvalidData("Length too short".to_string()));
224 }
225 Ok((u16::from_be_bytes([data[1], data[2]]) as usize, 3))
226 }
227 26 => {
228 if data.len() < 5 {
229 return Err(Error::InvalidData("Length too short".to_string()));
230 }
231 Ok((
232 u32::from_be_bytes([data[1], data[2], data[3], data[4]]) as usize,
233 5,
234 ))
235 }
236 _ => Err(Error::InvalidData(
237 "Unsupported length encoding".to_string(),
238 )),
239 }
240}
241
242fn read_cbor_roots(data: &[u8]) -> Result<(Vec<Cid>, usize)> {
244 if data.is_empty() {
245 return Err(Error::InvalidData("Unexpected end".to_string()));
246 }
247
248 let major = data[0] >> 5;
249 if major != 4 {
250 return Err(Error::InvalidData("Expected array".to_string()));
252 }
253
254 let (arr_len, header_len) = read_cbor_len(data)?;
255 let mut pos = header_len;
256 let mut roots = Vec::with_capacity(arr_len);
257
258 for _ in 0..arr_len {
259 if pos < data.len() && data[pos] == 0xd8 {
261 pos += 2; }
263
264 if pos >= data.len() {
266 return Err(Error::InvalidData("Unexpected end in roots".to_string()));
267 }
268
269 let major = data[pos] >> 5;
270 if major != 2 {
271 return Err(Error::InvalidData(
273 "Expected byte string for CID".to_string(),
274 ));
275 }
276
277 let (len, header) = read_cbor_len(&data[pos..])?;
278 pos += header;
279
280 if pos + len > data.len() {
281 return Err(Error::InvalidData("CID bytes too short".to_string()));
282 }
283
284 let cid = Cid::try_from(data[pos..pos + len].to_vec())
285 .map_err(|e| Error::Cid(format!("Invalid CID: {e}")))?;
286 roots.push(cid);
287 pos += len;
288 }
289
290 Ok((roots, pos))
291}
292
293fn skip_cbor_value(data: &[u8]) -> Result<usize> {
295 if data.is_empty() {
296 return Err(Error::InvalidData("Unexpected end".to_string()));
297 }
298
299 let major = data[0] >> 5;
300 let (len, header_len) = read_cbor_len(data)?;
301
302 match major {
303 0 | 1 => Ok(header_len), 2 | 3 => Ok(header_len + len), 4 => {
306 let mut pos = header_len;
308 for _ in 0..len {
309 pos += skip_cbor_value(&data[pos..])?;
310 }
311 Ok(pos)
312 }
313 5 => {
314 let mut pos = header_len;
316 for _ in 0..len {
317 pos += skip_cbor_value(&data[pos..])?; pos += skip_cbor_value(&data[pos..])?; }
320 Ok(pos)
321 }
322 6 => {
323 Ok(header_len + skip_cbor_value(&data[header_len..])?)
325 }
326 7 => Ok(header_len), _ => Err(Error::InvalidData("Unknown CBOR major type".to_string())),
328 }
329}
330
/// Encodes `value` as an unsigned LEB128 varint.
///
/// Seven bits go into each byte, least-significant group first; every byte
/// except the last has its high bit (0x80) set as a continuation marker.
fn encode_varint(value: u64) -> Vec<u8> {
    let mut out = Vec::with_capacity(10);
    let mut remaining = value;
    loop {
        let low_bits = (remaining & 0x7f) as u8;
        remaining >>= 7;
        if remaining == 0 {
            out.push(low_bits);
            return out;
        }
        out.push(low_bits | 0x80);
    }
}
341
342fn decode_varint(data: &[u8]) -> Result<(u64, usize)> {
344 let mut result: u64 = 0;
345 let mut shift = 0;
346
347 for (i, &byte) in data.iter().enumerate() {
348 result |= ((byte & 0x7f) as u64) << shift;
349 if byte & 0x80 == 0 {
350 return Ok((result, i + 1));
351 }
352 shift += 7;
353 if shift >= 64 {
354 return Err(Error::InvalidData("Varint too long".to_string()));
355 }
356 }
357
358 Err(Error::InvalidData("Incomplete varint".to_string()))
359}
360
361pub struct CarWriter {
363 writer: BufWriter<File>,
364 blocks_written: u64,
365 bytes_written: u64,
366}
367
368impl CarWriter {
369 pub async fn create(path: &Path, roots: Vec<Cid>) -> Result<Self> {
371 let file = File::create(path)
372 .await
373 .map_err(|e| Error::Storage(format!("Failed to create CAR file: {e}")))?;
374
375 let mut writer = BufWriter::new(file);
376
377 let header = CarHeader::new(roots);
379 let header_bytes = header.to_cbor()?;
380 let header_len = encode_varint(header_bytes.len() as u64);
381
382 writer
383 .write_all(&header_len)
384 .await
385 .map_err(|e| Error::Storage(format!("Failed to write header length: {e}")))?;
386 writer
387 .write_all(&header_bytes)
388 .await
389 .map_err(|e| Error::Storage(format!("Failed to write header: {e}")))?;
390
391 let bytes_written = (header_len.len() + header_bytes.len()) as u64;
392
393 Ok(Self {
394 writer,
395 blocks_written: 0,
396 bytes_written,
397 })
398 }
399
400 pub async fn write_block(&mut self, block: &Block) -> Result<()> {
402 let cid_bytes = block.cid().to_bytes();
403 let data = block.data();
404
405 let block_len = cid_bytes.len() + data.len();
407 let len_bytes = encode_varint(block_len as u64);
408
409 self.writer
410 .write_all(&len_bytes)
411 .await
412 .map_err(|e| Error::Storage(format!("Failed to write block length: {e}")))?;
413 self.writer
414 .write_all(&cid_bytes)
415 .await
416 .map_err(|e| Error::Storage(format!("Failed to write CID: {e}")))?;
417 self.writer
418 .write_all(data)
419 .await
420 .map_err(|e| Error::Storage(format!("Failed to write block data: {e}")))?;
421
422 self.blocks_written += 1;
423 self.bytes_written += (len_bytes.len() + block_len) as u64;
424
425 Ok(())
426 }
427
428 pub async fn finish(mut self) -> Result<CarWriteStats> {
430 self.writer
431 .flush()
432 .await
433 .map_err(|e| Error::Storage(format!("Failed to flush CAR file: {e}")))?;
434
435 Ok(CarWriteStats {
436 blocks_written: self.blocks_written,
437 bytes_written: self.bytes_written,
438 })
439 }
440
441 pub fn stats(&self) -> CarWriteStats {
443 CarWriteStats {
444 blocks_written: self.blocks_written,
445 bytes_written: self.bytes_written,
446 }
447 }
448}
449
/// Counters reported by `CarWriter::stats` and `CarWriter::finish`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CarWriteStats {
    /// Number of blocks appended to the archive.
    pub blocks_written: u64,
    /// Total bytes emitted, including the header and every varint length prefix.
    pub bytes_written: u64,
}
456
457pub struct CarReader {
459 reader: BufReader<File>,
460 header: CarHeader,
461 blocks_read: u64,
462 bytes_read: u64,
463}
464
465impl CarReader {
466 pub async fn open(path: &Path) -> Result<Self> {
468 let file = File::open(path)
469 .await
470 .map_err(|e| Error::Storage(format!("Failed to open CAR file: {e}")))?;
471
472 let mut reader = BufReader::new(file);
473
474 let mut header_len_buf = [0u8; 10];
476 let mut header_len_size = 0;
477
478 for i in 0..10 {
479 reader
480 .read_exact(&mut header_len_buf[i..i + 1])
481 .await
482 .map_err(|e| Error::Storage(format!("Failed to read header length: {e}")))?;
483 header_len_size = i + 1;
484 if header_len_buf[i] & 0x80 == 0 {
485 break;
486 }
487 }
488
489 let (header_len, _) = decode_varint(&header_len_buf[..header_len_size])?;
490
491 let mut header_bytes = vec![0u8; header_len as usize];
493 reader
494 .read_exact(&mut header_bytes)
495 .await
496 .map_err(|e| Error::Storage(format!("Failed to read header: {e}")))?;
497
498 let header = CarHeader::from_cbor(&header_bytes)?;
499
500 let bytes_read = (header_len_size + header_len as usize) as u64;
501
502 Ok(Self {
503 reader,
504 header,
505 blocks_read: 0,
506 bytes_read,
507 })
508 }
509
510 pub fn header(&self) -> &CarHeader {
512 &self.header
513 }
514
515 pub fn roots(&self) -> &[Cid] {
517 &self.header.roots
518 }
519
520 pub async fn read_block(&mut self) -> Result<Option<Block>> {
522 let mut len_buf = [0u8; 10];
524 let mut len_size = 0;
525
526 #[allow(clippy::needless_range_loop)]
527 for i in 0..10 {
528 let mut byte_buf = [0u8; 1];
529 match self.reader.read(&mut byte_buf).await {
530 Ok(0) => {
531 if i == 0 {
532 return Ok(None); }
534 return Err(Error::Storage("Incomplete block length".to_string()));
535 }
536 Ok(_) => {
537 len_buf[i] = byte_buf[0];
538 }
539 Err(e) => return Err(Error::Storage(format!("Failed to read block length: {e}"))),
540 }
541 len_size = i + 1;
542 if len_buf[i] & 0x80 == 0 {
543 break;
544 }
545 }
546
547 let (block_len, _) = decode_varint(&len_buf[..len_size])?;
548
549 let mut block_data = vec![0u8; block_len as usize];
551 self.reader
552 .read_exact(&mut block_data)
553 .await
554 .map_err(|e| Error::Storage(format!("Failed to read block data: {e}")))?;
555
556 let cid = Cid::try_from(block_data.clone())
558 .map_err(|e| Error::Cid(format!("Invalid CID in CAR: {e}")))?;
559
560 let cid_len = cid.to_bytes().len();
561 let data = Bytes::copy_from_slice(&block_data[cid_len..]);
562
563 self.blocks_read += 1;
564 self.bytes_read += (len_size + block_len as usize) as u64;
565
566 Ok(Some(Block::from_parts(cid, data)))
567 }
568
569 pub fn stats(&self) -> CarReadStats {
571 CarReadStats {
572 blocks_read: self.blocks_read,
573 bytes_read: self.bytes_read,
574 }
575 }
576}
577
/// Counters reported by `CarReader::stats`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CarReadStats {
    /// Number of blocks returned by `read_block`.
    pub blocks_read: u64,
    /// Total bytes consumed, including the header and every varint length prefix.
    pub bytes_read: u64,
}
584
585pub async fn export_to_car<S: BlockStore>(
587 store: &S,
588 path: &Path,
589 roots: Vec<Cid>,
590) -> Result<CarWriteStats> {
591 let mut writer = CarWriter::create(path, roots.clone()).await?;
592
593 let all_cids = store.list_cids()?;
595
596 for cid in all_cids {
597 if let Some(block) = store.get(&cid).await? {
598 writer.write_block(&block).await?;
599 }
600 }
601
602 writer.finish().await
603}
604
605pub async fn import_from_car<S: BlockStore>(store: &S, path: &Path) -> Result<CarReadStats> {
607 let mut reader = CarReader::open(path).await?;
608
609 while let Some(block) = reader.read_block().await? {
610 store.put(&block).await?;
611 }
612
613 Ok(reader.stats())
614}
615
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blockstore::{BlockStoreConfig, SledBlockStore};
    use std::path::PathBuf;

    fn make_test_block(data: &[u8]) -> Block {
        Block::new(Bytes::copy_from_slice(data)).unwrap()
    }

    /// Path under the OS temp directory, tagged with the process id so that
    /// concurrent test runs on one machine (and non-Unix hosts) work.
    fn temp_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!("ipfrs-car-{}-{name}", std::process::id()))
    }

    #[test]
    fn test_varint_encode_decode() {
        let test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, 1000000, u64::MAX];

        for &val in &test_values {
            let encoded = encode_varint(val);
            let (decoded, used) = decode_varint(&encoded).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
            assert_eq!(used, encoded.len(), "Wrong length for value {}", val);
        }
    }

    #[test]
    fn test_car_header_roundtrip() {
        let block1 = make_test_block(b"test1");
        let block2 = make_test_block(b"test2");
        let roots = vec![*block1.cid(), *block2.cid()];

        let header = CarHeader::new(roots.clone());
        let cbor = header.to_cbor().unwrap();
        let decoded = CarHeader::from_cbor(&cbor).unwrap();

        assert_eq!(decoded.version, CAR_VERSION);
        assert_eq!(decoded.roots.len(), 2);
        assert_eq!(decoded.roots[0], roots[0]);
        assert_eq!(decoded.roots[1], roots[1]);
    }

    #[tokio::test]
    async fn test_car_write_read() {
        let path = temp_path("write-read.car");
        let _ = std::fs::remove_file(&path);

        let block1 = make_test_block(b"hello world");
        let block2 = make_test_block(b"goodbye world");
        let roots = vec![*block1.cid()];

        {
            let mut writer = CarWriter::create(&path, roots.clone()).await.unwrap();
            writer.write_block(&block1).await.unwrap();
            writer.write_block(&block2).await.unwrap();
            let stats = writer.finish().await.unwrap();
            assert_eq!(stats.blocks_written, 2);
        }

        {
            let mut reader = CarReader::open(&path).await.unwrap();
            assert_eq!(reader.roots().len(), 1);
            assert_eq!(reader.roots()[0], *block1.cid());

            let read_block1 = reader.read_block().await.unwrap().unwrap();
            assert_eq!(read_block1.cid(), block1.cid());
            assert_eq!(read_block1.data(), block1.data());

            let read_block2 = reader.read_block().await.unwrap().unwrap();
            assert_eq!(read_block2.cid(), block2.cid());
            assert_eq!(read_block2.data(), block2.data());

            assert!(reader.read_block().await.unwrap().is_none());
        }

        let _ = std::fs::remove_file(&path);
    }

    #[tokio::test]
    async fn test_truncated_block_is_error() {
        let path = temp_path("truncated.car");
        let _ = std::fs::remove_file(&path);

        let block = make_test_block(b"hello world");
        {
            let mut writer = CarWriter::create(&path, vec![*block.cid()]).await.unwrap();
            writer.write_block(&block).await.unwrap();
            writer.finish().await.unwrap();
        }

        // Chop the tail off the only block section.
        let full_len = std::fs::metadata(&path).unwrap().len();
        let file = std::fs::OpenOptions::new().write(true).open(&path).unwrap();
        file.set_len(full_len - 3).unwrap();
        drop(file);

        let mut reader = CarReader::open(&path).await.unwrap();
        assert!(reader.read_block().await.is_err());

        let _ = std::fs::remove_file(&path);
    }

    #[tokio::test]
    async fn test_export_import_car() {
        let store_path = temp_path("store");
        let car_path = temp_path("export.car");
        let _ = std::fs::remove_dir_all(&store_path);
        let _ = std::fs::remove_file(&car_path);

        let config = BlockStoreConfig {
            path: store_path.clone(),
            cache_size: 1024 * 1024,
        };
        let store = SledBlockStore::new(config).unwrap();

        let block1 = make_test_block(b"block1");
        let block2 = make_test_block(b"block2");
        store.put(&block1).await.unwrap();
        store.put(&block2).await.unwrap();

        let write_stats = export_to_car(&store, &car_path, vec![*block1.cid()])
            .await
            .unwrap();
        assert_eq!(write_stats.blocks_written, 2);

        let store_path2 = temp_path("store2");
        let _ = std::fs::remove_dir_all(&store_path2);
        let config2 = BlockStoreConfig {
            path: store_path2.clone(),
            cache_size: 1024 * 1024,
        };
        let store2 = SledBlockStore::new(config2).unwrap();

        let read_stats = import_from_car(&store2, &car_path).await.unwrap();
        assert_eq!(read_stats.blocks_read, 2);

        assert!(store2.has(block1.cid()).await.unwrap());
        assert!(store2.has(block2.cid()).await.unwrap());

        let _ = std::fs::remove_dir_all(&store_path);
        let _ = std::fs::remove_dir_all(&store_path2);
        let _ = std::fs::remove_file(&car_path);
    }
}