1use crate::{
20 bounded_reader::{
21 sync::{BoundedReader, ChainedBoundedReader},
22 traits::{Bounded as _, CloneAndRewind as _},
23 },
24 config::{CidCodec, Config},
25 dag_pb::{BlockLink, DagPb, DagPbType, Link, NamedLink},
26 ensure,
27 error::{Error, InvalidErr, LoopDetectedErr, NotFoundErr, NotSupportedErr, Result},
28 fail, proto,
29 traits::ContextLen,
30};
31
32use bytes::{Buf, Bytes};
33use libipld::{
34 multihash::MultihashDigest,
35 pb::{PbLink, PbNode},
36 Cid,
37};
38use petgraph::{
39 graph::{EdgeReference, Graph, NodeIndex},
40 visit::{Dfs, EdgeRef, Reversed, Walker},
41 Direction,
42};
43use smallvec::{smallvec, SmallVec};
44use std::{
45 collections::{HashMap, HashSet, VecDeque},
46 fs::File,
47 io::{copy, BufWriter, Read, Seek, SeekFrom, Write},
48 path::{Component, Path, PathBuf},
49};
50use tempfile::tempfile;
51use tracing::{debug, trace};
52
53mod metadata;
54pub use metadata::{FileType, Metadata};
55mod block;
56pub use block::{Block, BlockType};
57mod block_builder;
58use block_builder::BlockBuilder;
59mod block_def;
60pub(crate) use block_def::BlockDef;
61mod cbor_cid;
62#[cfg(feature = "vfs")]
63pub mod fs;
64mod header;
65pub(crate) use header::CarHeader;
66#[cfg(test)]
67mod tests;
68
69pub type BlockId = NodeIndex<u32>;
70pub type SmallBlockIds = SmallVec<[BlockId; 1]>;
71static BLOCK_INSERTED_QED: &str = "Block just added .qed";
72
73#[derive(derive_more::Debug)]
74pub struct ContentAddressableArchive<T> {
75 config: Config,
77
78 pub content: BoundedReader<T>,
80
81 pub dag: Graph<Block<T>, Link>,
83
84 root_ids: SmallBlockIds,
86
87 index_by_cid: HashMap<Cid, BlockId>,
89
90 mbf_pending_links: HashMap<Cid, BlockId>,
93
94 pub car_overhead_byte_counter: u64,
96}
97
98impl ContentAddressableArchive<BufWriter<File>> {
99 pub fn new_temp(config: Config) -> Result<Self> {
100 let content = BoundedReader::from_reader(BufWriter::new(tempfile()?))?;
101
102 Ok(Self::base_new(content, config))
103 }
104}
105
106impl<T> ContentAddressableArchive<T> {
107 pub fn new_without_root(config: Config) -> Self {
108 Self::base_new(BoundedReader::empty(), config)
109 }
110
111 fn base_new(content: BoundedReader<T>, config: Config) -> Self {
112 let root_ids = SmallBlockIds::new();
113 let index_by_cid = HashMap::new();
114 let mbf_pending_links = HashMap::new();
115 let dag = Graph::new();
116
117 Self { content, config, dag, index_by_cid, mbf_pending_links, root_ids, car_overhead_byte_counter: 0u64 }
118 }
119
120 pub(crate) fn add_block_without_cid(&mut self, block: Block<T>) -> BlockId {
121 let id = self.dag.add_node(block);
122 let block = self.dag.node_weight(id).expect(BLOCK_INSERTED_QED);
123 debug!(?id, ?block, "Added block without cid");
124 id
125 }
126
127 pub(crate) fn add_block(&mut self, block: Block<T>) -> BlockId {
128 let id = if let Some(id) = self.index_by_cid.get(&block.cid).copied() {
130 if let Some(pre_block) = self.dag.node_weight_mut(id) {
131 if let Some(DagPbType::MissingBlock(..)) = &pre_block.dag_pb_type() {
132 *pre_block = block
133 }
134 }
135 id
136 } else {
137 self.dag.add_node(block)
138 };
139
140 let (cid, pb_data_len) = {
141 let block = self.dag.node_weight(id).expect(BLOCK_INSERTED_QED);
142 debug!(?id, ?block, "Added block");
143 (block.cid, block.pb_data_len())
144 };
145
146 self.check_mbf_pending_link(id, cid, pb_data_len);
148
149 self.index_by_cid.insert(cid, id);
151 id
152 }
153
154 pub(crate) fn link_children(&mut self, id: BlockId, children: &[BlockId]) {
155 for child_id in children {
156 let child_pb_len = self.dag.node_weight(*child_id).map(|block| block.pb_data_len()).unwrap_or_default();
157 let link = BlockLink::new(child_pb_len).into();
158 self.dag.add_edge(id, *child_id, link);
159 }
160 }
161
162 fn check_mbf_pending_link(&mut self, id: BlockId, cid: Cid, pb_data_len: u64) {
163 if let Some(parent_id) = self.mbf_pending_links.get(&cid) {
164 let link = BlockLink::new(pb_data_len).into();
165 debug!(?parent_id, cid = cid.to_string(), ?link, "MBF pending link found");
166 self.dag.add_edge(*parent_id, id, link);
167 self.mbf_pending_links.remove(&cid);
168 }
169 }
170
171 pub(crate) fn add_multi_block_file(&mut self, block: Block<T>, links: &[PbLink]) -> BlockId {
172 let dag_pb_len = block.pb_data_len();
173 let id = self.add_block(block);
174
175 for l in links {
176 if let Some(link_id) = self.index_by_cid.get(&l.cid) {
177 let link = BlockLink::new(dag_pb_len).into();
178 self.dag.add_edge(id, *link_id, link);
179 } else {
180 self.mbf_pending_links.insert(l.cid, id);
181 }
182 }
183
184 id
185 }
186
187 pub(crate) fn add_directory(&mut self, block: Block<T>, links: &[PbLink]) -> BlockId {
188 let id = self.add_block(block);
189 tracing::debug!(?links, "Add directory");
190
191 for link in links {
192 let link_id = self.index_by_cid.get(&link.cid).copied().unwrap_or_else(|| {
193 let missing_block = Block::new_dag_pb(link.cid, DagPbType::MissingBlock(Box::new(link.clone())), ());
194 self.add_block(missing_block)
195 });
196 let link = NamedLink::new(link.name.clone().unwrap_or_default()).into();
197 self.dag.add_edge(id, link_id, link);
198 }
199
200 id
201 }
202
203 fn path_to_block_ids(&self, path: &Path) -> Result<SmallBlockIds> {
207 let not_found_path = || NotFoundErr::Path(path.to_owned());
208 let mut levels = vec![self.root_ids.clone()];
209
210 for path_component in path.components() {
211 match path_component {
212 Component::Normal(os_name) => {
213 let name = os_name.to_str().ok_or_else(not_found_path)?;
214
215 let mut new_level = SmallBlockIds::new();
216 for block_id in levels.last().ok_or_else(not_found_path)? {
217 let mut targets = self
218 .dag
219 .edges_directed(*block_id, Direction::Outgoing)
220 .filter_map(|edge| (edge.weight().name() == Some(name)).then_some(edge.target()))
221 .collect::<SmallBlockIds>();
222 new_level.append(&mut targets);
223 }
224
225 levels.push(new_level)
226 },
227 Component::RootDir | Component::CurDir => {},
228 Component::ParentDir => {
229 levels.pop().ok_or_else(not_found_path)?;
230 },
231 Component::Prefix(..) => fail!(NotSupportedErr::Prefix),
232 }
233 }
234
235 levels.pop().ok_or_else(|| not_found_path().into())
236 }
237
238 fn path_to_block_id(&self, path: &Path) -> Result<BlockId> {
242 let ids = self.path_to_block_ids(path)?;
243 ensure!(ids.len() < 2, Error::more_than_one(ids.len(), path));
244 ids.first().copied().ok_or_else(|| Error::NotFound(NotFoundErr::Path(path.to_owned())))
245 }
246
247 fn path_to_block(&self, path: &Path) -> Result<&'_ Block<T>> {
249 let id = self.path_to_block_id(path)?;
250 self.dag.node_weight(id).ok_or(NotFoundErr::BlockId(id).into())
251 }
252
253 pub fn path_to_cid(&self, path: &Path) -> Option<&Cid> {
254 self.path_to_block(path).map(|block| &block.cid).ok()
255 }
256
257 fn outgoing_links(&self, id: BlockId) -> Vec<BlockId> {
258 self.dag.edges_directed(id, Direction::Outgoing).map(|edge| edge.target()).collect()
259 }
260
261 fn outgoing_links_as_entries(&self, id: BlockId) -> Vec<PbLink> {
262 let into_pb_link = |edge: EdgeReference<'_, Link>, name: &str| {
263 let target_id = edge.target();
264 let target = self.dag.node_weight(target_id)?;
265 Some(proto::new_pb_link(target.cid, name.to_owned(), None))
266 };
267
268 let mut named_links = self
270 .dag
271 .edges_directed(id, Direction::Outgoing)
272 .filter_map(|edge| {
273 let name = edge.weight().name()?;
274 into_pb_link(edge, name)
275 })
276 .collect::<Vec<_>>();
277
278 named_links.sort_by(|a, b| {
280 static LINK_WITH_NAME_QED: &str = "Links with `None` as name were filtered previously .qed";
281 let a_name = a.name.as_ref().expect(LINK_WITH_NAME_QED);
282 let b_name = b.name.as_ref().expect(LINK_WITH_NAME_QED);
283 a_name.cmp(b_name)
284 });
285 named_links
286 }
287
288 fn outgoing_links_as_blocks(&self, id: BlockId) -> Vec<PbLink> {
289 self.dag
290 .edges_directed(id, Direction::Outgoing)
291 .filter_map(|edge| {
292 let cum_dag_size = edge.weight().cumulative_dag_size();
293 let target_id = edge.target();
294 let target = self.dag.node_weight(target_id)?;
295 Some(proto::new_pb_link(target.cid, None, cum_dag_size))
296 })
297 .collect::<Vec<_>>()
298 }
299
300 pub fn block_count(&self) -> usize {
301 self.dag.node_count()
302 }
303}
304
305impl<T: Read + Seek> ContentAddressableArchive<T> {
306 pub fn new(config: Config) -> Result<Self> {
307 let mut this = Self::new_without_root(config);
308
309 let root_folder = Block::new_dag_pb(Cid::default(), DagPb::directory(), ());
311 let root_folder_id = this.add_block_without_cid(root_folder);
312 this.rebuild(root_folder_id)?;
313 this.root_ids.push(root_folder_id);
314 Ok(this)
315 }
316
317 fn rebuild_ancestors(&mut self, id: BlockId) -> Result<()> {
323 let rev_dag = Reversed(&self.dag);
324 let ancestors = Dfs::new(&rev_dag, id).iter(&rev_dag).collect::<Vec<_>>();
325
326 for block_id in ancestors {
327 self.rebuild(block_id)?;
328 }
329
330 Ok(())
331 }
332
333 fn rebuild(&mut self, id: BlockId) -> Result<()> {
334 let mut hasher = self.config.hasher()?;
335
336 let (cid, dag_pb_data) = {
337 let block = self.dag.node_weight(id).ok_or(NotFoundErr::BlockId(id))?;
338
339 self.index_by_cid.remove(&block.cid);
341
342 let (cid_codec, dag_pb_data) = match &block.r#type {
344 BlockType::Raw => {
345 let _len = copy(&mut block.data.clone_and_rewind(), &mut hasher)?;
346 (CidCodec::Raw, Bytes::new())
347 },
348 BlockType::DagPb(dag_pb) => {
349 let pb_node = self.as_pb_node(id, dag_pb)?;
350 let dag_pb_data = Bytes::from(pb_node.into_bytes());
351 let _len = copy(&mut dag_pb_data.clone().reader(), &mut hasher)?;
352 (CidCodec::DagPb, dag_pb_data)
353 },
354 };
355
356 let digest = self.config.hash_code.wrap(hasher.finalize())?;
357 let cid = Cid::new_v1(cid_codec as u64, digest);
358 (cid, dag_pb_data)
359 };
360
361 let cumulative_dag_size = self
363 .dag
364 .edges_directed(id, Direction::Incoming)
365 .map(|edge| edge.weight().cumulative_dag_size())
366 .sum();
367 let block_outgoing_edges = self
368 .dag
369 .edges_directed(id, Direction::Outgoing)
370 .filter_map(|edge| match edge.weight() {
371 Link::Block(..) => Some(edge.id()),
372 _ => None,
373 })
374 .collect::<Vec<_>>();
375 for edge_id in block_outgoing_edges {
376 if let Some(Link::Block(block_link)) = self.dag.edge_weight_mut(edge_id) {
377 block_link.cumulative_dag_size = cumulative_dag_size;
378 }
379 }
380
381 self.index_by_cid.insert(cid, id);
383 let block = self.dag.node_weight_mut(id).ok_or(NotFoundErr::BlockId(id))?;
384 block.cid = cid;
385 if let BlockType::DagPb(dag_pb) = &mut block.r#type {
386 dag_pb.data = BoundedReader::from(dag_pb_data);
387 }
388 Ok(())
389 }
390
391 fn as_pb_node(&self, block_id: BlockId, dag_pb: &DagPb<T>) -> Result<PbNode> {
392 let pb_node = match &dag_pb.r#type {
393 DagPbType::Dir => {
394 let links = self.outgoing_links_as_entries(block_id);
395 let pb_data: Bytes = proto::Data::new_directory().into();
396 proto::new_pb_node(links, pb_data)
397 },
398 DagPbType::Symlink(s) => {
399 let pb_data: Bytes = proto::Data::new_symlink(s.posix_path.clone()).into();
400 proto::new_pb_node(vec![], pb_data)
401 },
402 DagPbType::SingleBlockFile => {
403 let mut sbf_buf = Vec::with_capacity(dag_pb.data.bound_len() as usize);
404 let _ = dag_pb.data.clone_and_rewind().read_to_end(&mut sbf_buf)?;
405 let pb_data: Bytes = proto::Data::new_file_with_data(sbf_buf).into();
406 proto::new_pb_node(vec![], pb_data)
407 },
408 DagPbType::MultiBlockFile(mbf) => {
409 let links = self.outgoing_links_as_blocks(block_id);
410 let pb_data: Bytes = proto::Data::new_file(mbf.block_sizes.clone()).into();
411 proto::new_pb_node(links, pb_data)
412 },
413 DagPbType::MissingBlock(l) => fail!(InvalidErr::is_a_miss_block(format!("block_id={block_id:?}"), &l.cid)),
414 };
415 Ok(pb_node)
416 }
417}
418
419impl<T> ContentAddressableArchive<T> {
423 pub fn read_dir(&self, path: &Path) -> Result<impl Iterator<Item = &str>> {
424 let block_id = self.path_to_block_id(path)?;
425 let mut entries = self
426 .dag
427 .edges_directed(block_id, Direction::Outgoing)
428 .filter_map(|edge| edge.weight().name())
429 .collect::<Vec<_>>();
430 entries.sort();
431
432 Ok(entries.into_iter())
433 }
434
435 pub fn open_file(&self, path: &Path) -> Result<BoundedReader<T>> {
436 self.open_file_with_loop_detector(path, smallvec![])
437 }
438
439 fn open_file_with_loop_detector(
440 &self,
441 path: &Path,
442 mut open_block_ids: SmallVec<[BlockId; 1]>,
443 ) -> Result<BoundedReader<T>> {
444 let id = self.path_to_block_id(path)?;
445 let block = self.dag.node_weight(id).ok_or(NotFoundErr::BlockId(id))?;
446 match &block.r#type {
447 BlockType::Raw => Ok(block.data.clone_and_rewind()),
448 BlockType::DagPb(dag_pb) => match &dag_pb.r#type {
449 DagPbType::SingleBlockFile => Ok(dag_pb.data.clone_and_rewind()),
450 DagPbType::MultiBlockFile(_mbf) => Ok(self.open_multi_block_file(id)),
451 DagPbType::Symlink(symlink) => {
452 check_loop_and_update(&mut open_block_ids, path, id)?;
453 let target_abs_path = self.resolve_open_symlink(path, Path::new(&symlink.posix_path));
454 self.open_file_with_loop_detector(&target_abs_path, open_block_ids)
455 },
456 DagPbType::Dir => fail!(InvalidErr::is_a_dir(path)),
457 DagPbType::MissingBlock(pb_link) => fail!(InvalidErr::is_a_miss_block(path, &pb_link.cid)),
458 },
459 }
460 }
461
462 fn resolve_open_symlink(&self, link_path: &Path, target_path: &Path) -> PathBuf {
463 if target_path.is_absolute() {
464 return target_path.to_path_buf();
465 }
466
467 let root = Path::new("/");
468 let mut link_path_parent = link_path.parent().unwrap_or(root);
469 if link_path_parent.as_os_str().is_empty() {
470 link_path_parent = root;
471 }
472
473 link_path_parent.join(target_path)
474 }
475
476 fn open_multi_block_file(&self, id: BlockId) -> BoundedReader<T> {
477 let dfs = Dfs::new(&self.dag, id);
478 let part_readers = dfs
479 .iter(&self.dag)
480 .filter_map(|child_id| {
481 let child = self.dag.node_weight(child_id)?;
482 child.as_sfb_data()
483 })
484 .collect::<Vec<_>>();
485
486 ChainedBoundedReader::new(part_readers).into()
487 }
488
489 pub fn metadata(&self, path: &Path) -> Result<Metadata> {
490 self.metadata_with_loop_detector(path, smallvec![])
491 }
492
493 fn metadata_with_loop_detector(&self, path: &Path, mut open_block_ids: SmallVec<[BlockId; 1]>) -> Result<Metadata> {
494 let block_id = self.path_to_block_id(path)?;
495 let block = self.dag.node_weight(block_id).ok_or(NotFoundErr::BlockId(block_id))?;
496
497 let meta = match &block.r#type {
498 BlockType::Raw => Metadata::file(block.data.bound_len()),
499 BlockType::DagPb(dag_pb) => match &dag_pb.r#type {
500 DagPbType::SingleBlockFile => Metadata::file(block.data_len()),
501 DagPbType::MultiBlockFile(mbf) => {
502 let acc_len = mbf.block_sizes.iter().sum::<u64>();
503 Metadata::file(acc_len)
504 },
505 DagPbType::Dir => Metadata::directory(),
506 DagPbType::Symlink(symlink) => {
507 check_loop_and_update(&mut open_block_ids, path, block_id)?;
508 let target_abs_path = self.resolve_open_symlink(path, Path::new(&symlink.posix_path));
509 let target_meta = self.metadata_with_loop_detector(&target_abs_path, open_block_ids)?;
510 Metadata::symlink(target_meta, Path::new(&symlink.posix_path))
511 },
512 DagPbType::MissingBlock(link) => fail!(InvalidErr::is_a_miss_block(path, &link.cid)),
513 },
514 };
515
516 Ok(meta)
517 }
518
519 pub fn exists(&self, path: &Path) -> bool {
520 self.path_to_block_id(path).ok().is_some()
521 }
522}
523
524impl<T: Read + Seek> ContentAddressableArchive<T> {
525 pub fn create_dir(&mut self, path: &Path) -> Result<()> {
527 let dir_name = path
528 .file_name()
529 .ok_or_else(|| InvalidErr::file_name(path))?
530 .to_str()
531 .ok_or_else(|| InvalidErr::not_utf8_path(path))?;
532 let parent_path = path.parent().unwrap_or_else(|| Path::new("."));
533 let parent_id = self.path_to_block_id(parent_path)?;
534
535 let found_dir_name = self
537 .dag
538 .edges_directed(parent_id, Direction::Outgoing)
539 .find(|edge| edge.weight().name() == Some(dir_name));
540 ensure!(found_dir_name.is_none(), InvalidErr::exists(dir_name));
541
542 let new_dir = Block::new_dag_pb(Cid::default(), DagPb::directory(), ());
543 let new_dir_id = self.add_block(new_dir);
544 self.dag.add_edge(parent_id, new_dir_id, NamedLink::new(dir_name).into());
545 self.rebuild_ancestors(new_dir_id)
546 }
547
548 pub fn add_file(&mut self, path: &Path, reader: T) -> Result<()> {
549 let os_name = path.file_name().ok_or_else(|| NotFoundErr::file_name(path))?;
550 let name = os_name.to_str().ok_or_else(|| InvalidErr::not_utf8_path(os_name))?;
551
552 let bounded = BoundedReader::from_reader(reader)?;
554 let block_id = BlockBuilder::new(self, bounded)?.build()?;
555
556 if !self.root_ids.is_empty() {
557 let parent_path = path.parent().unwrap_or(Path::new("."));
558 let parent_id = self.path_to_block_id(parent_path)?;
559 self.dag.add_edge(parent_id, block_id, NamedLink::new(name).into());
560 self.rebuild_ancestors(block_id)
561 } else {
562 self.root_ids.push(block_id);
563 self.dag.add_edge(block_id, block_id, NamedLink::new(name).into());
564 Ok(())
565 }
566 }
567
568 pub fn root_cids(&self) -> Result<Vec<Cid>> {
569 self.root_ids
570 .iter()
571 .map(|id| {
572 let block = self.dag.node_weight(*id).ok_or(NotFoundErr::BlockId(*id))?;
573 Ok(block.cid)
574 })
575 .collect()
576 }
577}
578
579impl<F: Read + Seek> ContentAddressableArchive<F> {
583 pub fn load(reader: F) -> Result<Self> {
584 let mut reader = BoundedReader::from_reader(reader)?;
585 let mut this = Self::base_new(reader.clone(), Config::default());
586
587 let header = CarHeader::load(&mut reader)?;
589 this.car_overhead_byte_counter += reader.stream_position()?;
590 trace!(?header, pos = this.car_overhead_byte_counter, "Header loaded");
591
592 while let Some(block_def) = BlockDef::load(&mut reader)? {
594 trace!(?block_def, "BlockDef loaded");
596 this.car_overhead_byte_counter += block_def.car_overhead_byte_counter;
597 let block_data = reader.sub(block_def.range.clone())?;
598
599 let cid_codec = block_def.cid.codec();
601 let codec = CidCodec::from_repr(cid_codec).ok_or(Error::CodecNotSupported(cid_codec))?;
602 match codec {
603 CidCodec::Raw => this.add_block(Block::new_raw(block_def.cid, block_data)),
604 CidCodec::DagPb => DagPb::load(&mut this, block_def.cid, block_data)?,
605 _other => fail!(Error::CodecNotSupported(cid_codec)),
606 };
607 reader.seek(SeekFrom::Start(block_def.range.end))?;
608 }
609
610 this.root_ids = header
612 .roots
613 .iter()
614 .filter_map(|cid| this.index_by_cid.get(&cid.0))
615 .cloned()
616 .collect::<SmallBlockIds>();
617
618 Ok(this)
619 }
620}
621
622impl<T: Read + Seek + 'static> ContentAddressableArchive<T> {
626 pub fn write<W: Write>(&mut self, writer: &mut W) -> Result<u64> {
627 let header = CarHeader::new_v1(self.root_cids()?);
629 let header_written = header.write(writer)? as u64;
630 let mut acc_written = 0u64;
636
637 for id in self.dag.node_indices() {
638 let block = self.dag.node_weight(id).ok_or(NotFoundErr::BlockId(id))?;
639 let cid = block.cid;
640 let written_bytes = match &block.r#type {
641 BlockType::Raw => {
642 let len = block.data.bound_len();
643 write_block(cid, len, &mut block.data.clone_and_rewind(), writer)?
644 },
645 BlockType::DagPb(dag_pb) => {
646 if block.data.bound_len() > 0 {
647 let len = block.data.bound_len();
649 write_block(cid, len, &mut block.data.clone_and_rewind(), writer)?
650 } else {
651 let pb_node = Bytes::from(self.as_pb_node(id, dag_pb)?.into_bytes());
653 let pb_node_len = pb_node.len() as u64;
654 write_block(cid, pb_node_len, &mut pb_node.reader(), writer)?
655 }
656 },
657 };
658 acc_written = acc_written.checked_add(written_bytes).ok_or(Error::FileTooLarge)?;
659 }
660
661 header_written.checked_add(acc_written).ok_or(Error::FileTooLarge)
662 }
663}
664
665fn write_block<R: Read, W: Write>(cid: Cid, reader_len: u64, reader: &mut R, w: &mut W) -> Result<u64> {
666 let cid = cid.to_bytes();
667 let section_len = reader_len.checked_add(cid.len() as u64).ok_or(Error::FileTooLarge)?;
668
669 let leb_written = leb128::write::unsigned(w, section_len)? as u64;
670 w.write_all(&cid)?;
671 let copied = copy(reader, w)?;
672
673 copied.checked_add(leb_written + cid.len() as u64).ok_or(Error::FileTooLarge)
674}
675
676impl<T> ContentAddressableArchive<T> {
677 fn traverse_blocks<F>(&self, mut len_fn: F) -> u64
678 where
679 F: FnMut(&Block<T>, BlockId, &mut VecDeque<BlockId>, &HashSet<BlockId>) -> u64,
680 {
681 let mut acc_len = 0u64;
682 let mut closed = HashSet::new();
683 let mut open = VecDeque::from_iter(self.root_ids.iter().copied());
684
685 while let Some(id) = open.pop_front() {
686 let Some(block) = self.dag.node_weight(id) else { continue };
687 let block_len = len_fn(block, id, &mut open, &closed);
688
689 closed.insert(id);
690 acc_len = acc_len.saturating_add(block_len);
691 }
692
693 acc_len
694 }
695}
696
697impl<T> ContextLen for ContentAddressableArchive<T> {
698 fn data_len(&self) -> u64 {
699 self.traverse_blocks(|block, id, open, closed| match &block.r#type {
700 BlockType::Raw => block.data.bound_len(),
701 BlockType::DagPb(dag_pb) => match &dag_pb.r#type {
702 DagPbType::Dir => {
703 for entry_id in self.outgoing_links(id) {
704 if entry_id != id && !open.contains(&entry_id) && !closed.contains(&entry_id) {
705 open.push_back(entry_id);
706 }
707 }
708 0u64
709 },
710 DagPbType::SingleBlockFile => dag_pb.data.bound_len(),
711 DagPbType::MultiBlockFile(mbf) => mbf.block_sizes.iter().sum(),
712 DagPbType::Symlink(..) | DagPbType::MissingBlock(..) => 0u64,
713 },
714 })
715 }
716
717 fn pb_data_len(&self) -> u64 {
718 self.traverse_blocks(|block, id, open, closed| {
719 if let BlockType::DagPb(dag_pb) = &block.r#type {
720 if let DagPbType::Dir = &dag_pb.r#type {
721 for entry_id in self.outgoing_links(id) {
722 if entry_id != id && !open.contains(&entry_id) && !closed.contains(&entry_id) {
723 open.push_back(entry_id);
724 }
725 }
726 }
727 }
728 block.data.bound_len()
729 })
730 }
731}
732
733fn check_loop_and_update(
739 open_block_ids: &mut SmallVec<[BlockId; 1]>,
740 target_path: &Path,
741 target_id: BlockId,
742) -> Result<()> {
743 ensure!(!open_block_ids.contains(&target_id), LoopDetectedErr::Symlink(target_path.to_owned()));
744
745 open_block_ids.push(target_id);
746 Ok(())
747}