rust_car/utils/
archive_local.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    fs,
4    io,
5    path::{Path, PathBuf},
6    rc::Rc,
7};
8
9use crate::{
10    codec::Encoder,
11    error::CarError,
12    header::CarHeaderV1,
13    unixfs::{FileType, Link, UnixFs},
14    writer::{CarWriter, CarWriterV1, WriteStream},
15    CarHeader, Ipld,
16};
17use cid::{
18    multihash::{
19        Code, 
20        MultihashDigest, 
21        Hasher,
22        Multihash,
23        Blake2b256,
24    },
25    Cid,
26};
27use ipld::{pb::DagPbCodec, prelude::Codec, raw::RawCodec};
28use path_absolutize::*;
29
30use super::BLAKE2B256_CODEC;
31
/// One step of the directory walk: the absolute path of a directory, paired
/// with the index of that directory's link inside its parent's link list.
/// The index is `None` for the walk root, which `walk_dir` appends itself.
type WalkPath = (Rc<PathBuf>, Option<usize>);

/// Directory listings collected by the walk, keyed by the directory's absolute path.
type WalkPathCache = HashMap<Rc<PathBuf>, UnixFs>;
35
36/// archive the directory to the target CAR format file
37/// `path` is the directory archived in to the CAR file.
38/// `to_carfile` is the target file.
39pub fn archive_local<T>(path: impl AsRef<Path>, to_carfile: T) -> Result<(), CarError>
40where
41    T: std::io::Write + std::io::Seek,
42{
43    let src_path = path.as_ref();
44    if !src_path.exists() {
45        return Err(CarError::IO(io::ErrorKind::NotFound.into()));
46    }
47    let root_path = src_path.absolutize().unwrap();
48    let path = root_path.to_path_buf();
49    // ensure sufficient file block size for head, after the root cid generated using the content, fill back the head.
50    let mut root_cid = Some(empty_pb_cid());
51    let header = CarHeader::new_v1(vec![root_cid.unwrap()]);
52    let mut writer = CarWriterV1::new(to_carfile, header);
53    walk_dir(
54        path,
55        |(abs_path, parent_idx), path_map| -> Result<(), CarError> {
56            let unixfs = path_map.get_mut(abs_path).unwrap();
57            for link in unixfs.links.iter_mut() {
58                match link.guess_type {
59                    FileType::Directory => {}
60                    FileType::File => {
61                        //TODO: split file when file size is bigger than the max section size.
62                        let filepath = abs_path.join(link.name_ref());
63                        let mut file = fs::OpenOptions::new().read(true).open(filepath)?;
64                        let mut hash_codec = Blake2b256::default();
65                        let cid_gen = |w: WriteStream| {
66                            match w {
67                                WriteStream::Bytes(bs) => {
68                                    hash_codec.update(bs);
69                                    None
70                                },
71                                WriteStream::End => {
72                                    let bs = hash_codec.finalize();
73                                    let h = match Multihash::wrap(BLAKE2B256_CODEC, bs) {
74                                        Ok(h) => h,
75                                        Err(e) => return Some(Err(CarError::Parsing(e.to_string()))),
76                                    };
77                                    Some(Ok(Cid::new_v1(RawCodec.into(), h)))
78                                },
79                            }
80                        };
81                        let file_cid = writer.write_stream(cid_gen, link.tsize as usize, &mut file)?;
82                        link.hash = file_cid;
83                    }
84                    _ => unreachable!("not support!"),
85                }
86            }
87            let fs_ipld: Ipld = unixfs.encode()?;
88            let bs = DagPbCodec
89                .encode(&fs_ipld)
90                .map_err(|e| CarError::Parsing(e.to_string()))?;
91            let cid = pb_cid(&bs);
92
93            if root_path.as_ref() == abs_path.as_ref() {
94                root_cid = Some(cid);
95            }
96            writer.write(cid, bs)?;
97            unixfs.cid = Some(cid);
98            match abs_path.parent() {
99                Some(parent) => {
100                    let parent = Rc::new(parent.to_path_buf());
101
102                    if let Some((p, pos)) = path_map.get_mut(&parent).zip(*parent_idx) {
103                        p.links[pos].hash = cid;
104                    }
105                }
106                None => unimplemented!("should not happend"),
107            }
108            Ok(())
109        },
110    )?;
111    let root_cid = root_cid.ok_or(CarError::NotFound("root cid not found.".to_string()))?;
112    let header = CarHeader::V1(CarHeaderV1::new(vec![root_cid]));
113    writer.rewrite_header(header)
114}
115
116pub fn pipe_raw_cid<R, W>(r: &mut R, w: &mut W) -> Result<Cid, CarError> 
117where
118    R: std::io::Read,
119    W: std::io::Write,
120{
121    let mut hash_codec = cid::multihash::Blake2b256::default();
122    let mut bs = [0u8; 1024];
123    while let Ok(n) = r.read(&mut bs) {
124        hash_codec.update(&bs[0..n]);
125        w.write_all(&bs[0..n])?;
126    }
127    let bs = hash_codec.finalize();
128    let h = cid::multihash::Multihash::wrap(BLAKE2B256_CODEC, bs);
129    let h = h.map_err(|e| CarError::Parsing(e.to_string()))?;
130    Ok(Cid::new_v1(DagPbCodec.into(), h))
131}
132
133#[inline(always)]
134pub fn empty_pb_cid() -> Cid {
135    pb_cid(&[])
136}
137
138#[inline(always)]
139pub fn pb_cid(data: &[u8]) -> Cid {
140    let h = Code::Blake2b256.digest(data);
141    Cid::new_v1(DagPbCodec.into(), h)
142}
143
144#[inline(always)]
145pub fn raw_cid(data: &[u8]) -> Cid {
146    let h = Code::Blake2b256.digest(data);
147    Cid::new_v1(RawCodec.into(), h)
148}
149
150/// walk all directory, and record the directory informations.
151/// `dir_queue` is the dir queue for hold the directory.
152/// `WalkPath` contain the index in children.
153fn walk_inner(
154    dir_queue: &mut VecDeque<Rc<PathBuf>>,
155    path_map: &mut WalkPathCache,
156) -> Result<Vec<WalkPath>, CarError> {
157    let mut dirs = Vec::new();
158    while let Some(dir_path) = dir_queue.pop_back() {
159        let mut unix_dir = UnixFs {
160            file_type: FileType::Directory,
161            ..Default::default()
162        };
163        for entry in fs::read_dir(&*dir_path)? {
164            let entry = entry?;
165            let file_type = entry.file_type()?;
166            let file_path = entry.path();
167            let abs_path = file_path.absolutize()?.to_path_buf();
168
169            let name = entry.file_name().to_str().unwrap_or("").to_string();
170            let tsize = entry.metadata()?.len();
171            let mut link = Link {
172                name,
173                tsize,
174                ..Default::default()
175            };
176            if file_type.is_file() {
177                link.guess_type = FileType::File;
178                unix_dir.add_link(link);
179            } else if file_type.is_dir() {
180                let rc_abs_path = Rc::new(abs_path);
181                link.guess_type = FileType::Directory;
182                let idx = unix_dir.add_link(link);
183                dirs.push((rc_abs_path.clone(), Some(idx)));
184                dir_queue.push_back(rc_abs_path);
185            }
186            //skip other types.
187        }
188        path_map.insert(dir_path, unix_dir);
189    }
190    dirs.reverse();
191    Ok(dirs)
192}
193
194pub fn walk_dir<T>(root: impl AsRef<Path>, mut walker: T) -> Result<(), CarError>
195where
196    T: FnMut(&WalkPath, &mut WalkPathCache) -> Result<(), CarError>,
197{
198    let src_path = root.as_ref().absolutize()?;
199    let mut queue = VecDeque::new();
200    let mut path_map: HashMap<Rc<PathBuf>, UnixFs> = HashMap::new();
201    let root_path: Rc<PathBuf> = Rc::new(src_path.into());
202    queue.push_back(root_path.clone());
203    let mut keys = walk_inner(&mut queue, &mut path_map)?;
204    keys.push((root_path, None));
205    for key in keys.iter() {
206        walker(key, &mut path_map)?;
207    }
208    Ok(())
209}