carbites 0.1.2

CAR file splitter implementation in Rust
Documentation
use ipld::{Cid, Ipld};
use std::collections::VecDeque;
use std::io::{Read, Seek};

use crate::error::CarSplitterError;
use crate::header::CarHeader;
use crate::reader::CarReader;
use crate::writer::CarWriter;
use crate::CarSplitter;

/// Splits a single-root CAR file into several smaller CAR chunks by walking
/// the DAG from the root through each node's `"Links"` list.
///
/// Every chunk is started with the root in its header followed by the
/// ancestor blocks of the block being processed (see `new_car`).
pub(crate) struct TreewalkSplitter<R> {
    /// The single root CID of the input CAR; used as the root of every chunk header.
    root: Cid,
    /// Size threshold compared against `writer.len() + block.len()` to decide
    /// when the current chunk is handed out.
    target_size: usize,
    /// Reader over the input CAR; blocks are looked up by CID.
    reader: CarReader<R>,
    /// Writer accumulating the chunk currently being built.
    writer: CarWriter,
    /// Blocks still to be visited, in visiting order (front is next).
    pending_blocks: VecDeque<PendingBlock>,
}

impl<R> TreewalkSplitter<R>
where
    R: Read + Seek,
{
    pub(crate) fn new(r: R, target_size: usize) -> Result<TreewalkSplitter<R>, CarSplitterError> {
        let mut reader = CarReader::new(r)?;
        let header = reader.header();

        if header.roots.len() != 1 {
            return Err(CarSplitterError::InvalidFile(
                "multiple roots not allowed".to_owned(),
            ));
        }
        let root = header.roots.first().unwrap().clone();
        let parents = vec![Parent::new(reader.read_section_data(&root)?, root)];

        let writer = new_car(root, parents.clone())?;

        let mut pending_blocks: VecDeque<PendingBlock> = VecDeque::new();
        let ipld = reader.ipld(&root)?;
        match ipld {
            Ipld::Map(_) => match ipld.take("Links") {
                Ok(Ipld::List(links)) => {
                    for link in links.iter() {
                        pending_blocks.push_back(PendingBlock {
                            cid: extract_cid(link)?,
                            parents: parents.clone(),
                        });
                    }
                }
                Ok(_) => {
                    return Err(CarSplitterError::Parsing(
                        "root node does not have links".to_owned(),
                    ))
                }
                Err(e) => {
                    return Err(CarSplitterError::Parsing(e.to_string()));
                }
            },
            Ipld::Bytes(_) => {}
            _ => {
                return Err(CarSplitterError::InvalidFile(
                    "root node is not a map".to_owned(),
                ))
            }
        }

        Ok(TreewalkSplitter {
            root,
            target_size,
            writer,
            reader,
            pending_blocks,
        })
    }

    fn next(&mut self) -> Result<Option<Vec<u8>>, CarSplitterError> {
        loop {
            let pending_block = match self.pending_blocks.pop_front() {
                Some(pending_block) => pending_block,
                None => {
                    if !self.writer.is_empty() {
                        let car = self.writer.flush();
                        return Ok(Some(car));
                    }
                    break;
                }
            };

            let data = self.reader.read_section_data(&pending_block.cid)?;
            let (ready_car, links) = self.add_block(&data, pending_block.cid)?;

            let mut parents = pending_block.parents;

            if !links.is_empty() {
                parents.push(Parent::new(data, pending_block.cid));

                let mut pb = VecDeque::<PendingBlock>::new();
                for link in links.iter() {
                    pb.push_back(PendingBlock {
                        cid: extract_cid(link)?,
                        parents: parents.clone(),
                    });
                }

                pb.append(&mut self.pending_blocks);
                self.pending_blocks = pb;
            }

            if let Some(ready_car) = ready_car {
                self.writer = new_car(self.root, parents)?;
                return Ok(Some(ready_car));
            }
        }

        Ok(None)
    }

    fn add_block(
        &mut self,
        block: &Vec<u8>,
        cid: Cid,
    ) -> Result<(Option<Vec<u8>>, Vec<Ipld>), CarSplitterError> {
        let mut car_ready = false;
        if block.len() + self.writer.len() > self.target_size {
            car_ready = true;
        }

        self.writer.write(cid, &block)?;

        let ipld = self.reader.ipld(&cid)?;
        let links = ipld.take("Links").unwrap_or(Ipld::List(Vec::new()));
        match (car_ready, links) {
            (true, Ipld::List(links)) => Ok((Some(self.writer.flush()), links)),
            (false, Ipld::List(links)) => Ok((None, links)),
            _ => Err(CarSplitterError::Parsing(
                "root node does not have links".to_owned(),
            )),
        }
    }
}

/// `CarSplitter` front-end: each call yields the next chunk of the walk.
impl<R> CarSplitter for TreewalkSplitter<R>
where
    R: Read + Seek,
{
    /// Delegates to the inherent `next` method.
    fn next_chunk(&mut self) -> Result<Option<Vec<u8>>, CarSplitterError> {
        self.next()
    }
}

/// Returns the CID stored under the `"Hash"` key of a link entry.
///
/// # Errors
///
/// * `CarSplitterError::Parsing` when `"Hash"` cannot be looked up on `ipld`.
/// * `CarSplitterError::InvalidFile` when the `"Hash"` value is not an
///   `Ipld::Link`.
fn extract_cid(ipld: &Ipld) -> Result<Cid, CarSplitterError> {
    // Borrowing lookup: there is no need to clone the whole entry just to
    // read a single field out of it.
    match ipld.get("Hash") {
        Ok(Ipld::Link(cid)) => Ok(*cid),
        Ok(_) => Err(CarSplitterError::InvalidFile(
            "link \"Hash\" is not a CID".to_owned(),
        )),
        Err(e) => Err(CarSplitterError::Parsing(e.to_string())),
    }
}

fn new_car(root: Cid, parents: Vec<Parent>) -> Result<CarWriter, CarSplitterError> {
    let header = CarHeader::new(vec![root]);

    let mut writer = CarWriter::new(header);
    writer.write_header()?;

    for parent in parents {
        writer.write(parent.cid, &parent.data)?;
    }

    Ok(writer)
}

/// An ancestor block: its CID together with its section data, kept so it can
/// be written again at the start of a new chunk (see `new_car`).
#[derive(Debug, Clone)]
struct Parent {
    /// Section data of the block as returned by the reader.
    pub data: Vec<u8>,
    /// CID identifying the block.
    pub cid: Cid,
}

impl Parent {
    pub fn new(data: Vec<u8>, cid: Cid) -> Parent {
        Parent { data, cid }
    }
}

/// A block queued for the walk, together with the ancestor chain it was
/// reached through.
struct PendingBlock {
    /// Ancestors of the block, starting with the root.
    pub parents: Vec<Parent>,
    /// CID of the block to visit.
    pub cid: Cid,
}