Skip to main content

ipld_car/
car.rs

//! IPLD CAR v1 (Content Addressable aRchive) format.
//!
//! # Wire format
//!
//! ```text
//! <uvarint(header-len)> <dag-cbor-header> (<uvarint(section-len)> <cid-bytes> <block-data>)*
//! ```
//!
//! `section-len` counts the CID bytes plus the block data that follow it.
//!
//! The header is a DAG-CBOR map (shown here in JSON-like notation):
//! ```text
//! { "version": 1, "roots": [<CID tag(42)>, ...] }
//! ```
//!
//! CIDs in the CBOR header are encoded as CBOR tag 42 over a byte string
//! with a leading `\x00` multibase-identity prefix. In block sections the
//! CID bytes appear **without** any prefix.
//!
//! Reference: <https://ipld.io/specs/transport/car/carv1/>
use crate::{
	bounded_reader::{
		sync::{BoundedReader, ChainedBoundedReader},
		traits::{Bounded as _, CloneAndRewind as _},
	},
	config::{CidCodec, Config},
	dag_pb::{BlockLink, DagPb, DagPbType, Link, NamedLink},
	ensure,
	error::{Error, InvalidErr, LoopDetectedErr, NotFoundErr, NotSupportedErr, Result},
	fail, proto,
	traits::ContextLen,
};

use bytes::{Buf, Bytes};
use libipld::{
	multihash::MultihashDigest,
	pb::{PbLink, PbNode},
	Cid,
};
use petgraph::{
	graph::{EdgeReference, Graph, NodeIndex},
	visit::{Dfs, DfsPostOrder, EdgeRef, Reversed, Walker},
	Direction,
};
use smallvec::{smallvec, SmallVec};
use std::{
	collections::{HashMap, HashSet, VecDeque},
	fs::File,
	io::{copy, BufWriter, Read, Seek, SeekFrom, Write},
	path::{Component, Path, PathBuf},
};
use tempfile::tempfile;
use tracing::{debug, trace};

mod metadata;
pub use metadata::{FileType, Metadata};
mod block;
pub use block::{Block, BlockType};
mod block_builder;
use block_builder::BlockBuilder;
mod block_def;
pub(crate) use block_def::BlockDef;
mod cbor_cid;
#[cfg(feature = "vfs")]
pub mod fs;
mod header;
pub(crate) use header::CarHeader;
#[cfg(test)]
mod tests;

69pub type BlockId = NodeIndex<u32>;
70pub type SmallBlockIds = SmallVec<[BlockId; 1]>;
71static BLOCK_INSERTED_QED: &str = "Block just added .qed";
72
73#[derive(derive_more::Debug)]
74pub struct ContentAddressableArchive<T> {
75	/// Configuration used only to generate consolidation info, like CIDs
76	config: Config,
77
78	/// Inner reader.
79	pub content: BoundedReader<T>,
80
81	/// Blocks DAG
82	pub dag: Graph<Block<T>, Link>,
83
84	/// CAR root IDs.
85	root_ids: SmallBlockIds,
86
87	/// Index block ID by CID.
88	index_by_cid: HashMap<Cid, BlockId>,
89
90	/// On MBF load process, this list stores any link (cid)
91	/// that is referenced but it is not yet loaded.
92	mbf_pending_links: HashMap<Cid, BlockId>,
93
94	/// On loads, it register bytes used by CAR
95	pub car_overhead_byte_counter: u64,
96}
97
98impl ContentAddressableArchive<BufWriter<File>> {
99	pub fn new_temp(config: Config) -> Result<Self> {
100		let content = BoundedReader::from_reader(BufWriter::new(tempfile()?))?;
101
102		Ok(Self::base_new(content, config))
103	}
104}
105
106impl<T> ContentAddressableArchive<T> {
107	pub fn new_without_root(config: Config) -> Self {
108		Self::base_new(BoundedReader::empty(), config)
109	}
110
111	fn base_new(content: BoundedReader<T>, config: Config) -> Self {
112		let root_ids = SmallBlockIds::new();
113		let index_by_cid = HashMap::new();
114		let mbf_pending_links = HashMap::new();
115		let dag = Graph::new();
116
117		Self { content, config, dag, index_by_cid, mbf_pending_links, root_ids, car_overhead_byte_counter: 0u64 }
118	}
119
120	pub(crate) fn add_block_without_cid(&mut self, block: Block<T>) -> BlockId {
121		let id = self.dag.add_node(block);
122		let block = self.dag.node_weight(id).expect(BLOCK_INSERTED_QED);
123		debug!(?id, ?block, "Added block without cid");
124		id
125	}
126
127	pub(crate) fn add_block(&mut self, block: Block<T>) -> BlockId {
128		// Check if block is a missing block
129		let id = if let Some(id) = self.index_by_cid.get(&block.cid).copied() {
130			if let Some(pre_block) = self.dag.node_weight_mut(id) {
131				if let Some(DagPbType::MissingBlock(..)) = &pre_block.dag_pb_type() {
132					*pre_block = block
133				}
134			}
135			id
136		} else {
137			self.dag.add_node(block)
138		};
139
140		let (cid, pb_data_len) = {
141			let block = self.dag.node_weight(id).expect(BLOCK_INSERTED_QED);
142			debug!(?id, ?block, "Added block");
143			(block.cid, block.pb_data_len())
144		};
145
146		// Double-check pending link list
147		self.check_mbf_pending_link(id, cid, pb_data_len);
148
149		// Add index by CID
150		self.index_by_cid.insert(cid, id);
151		id
152	}
153
154	pub(crate) fn link_children(&mut self, id: BlockId, children: &[BlockId]) {
155		for child_id in children {
156			let child_pb_len = self.dag.node_weight(*child_id).map(|block| block.pb_data_len()).unwrap_or_default();
157			let link = BlockLink::new(child_pb_len).into();
158			self.dag.add_edge(id, *child_id, link);
159		}
160	}
161
162	fn check_mbf_pending_link(&mut self, id: BlockId, cid: Cid, pb_data_len: u64) {
163		if let Some(parent_id) = self.mbf_pending_links.get(&cid) {
164			let link = BlockLink::new(pb_data_len).into();
165			debug!(?parent_id, cid = cid.to_string(), ?link, "MBF pending link found");
166			self.dag.add_edge(*parent_id, id, link);
167			self.mbf_pending_links.remove(&cid);
168		}
169	}
170
171	pub(crate) fn add_multi_block_file(&mut self, block: Block<T>, links: &[PbLink]) -> BlockId {
172		let dag_pb_len = block.pb_data_len();
173		let id = self.add_block(block);
174
175		for l in links {
176			if let Some(link_id) = self.index_by_cid.get(&l.cid) {
177				let link = BlockLink::new(dag_pb_len).into();
178				self.dag.add_edge(id, *link_id, link);
179			} else {
180				self.mbf_pending_links.insert(l.cid, id);
181			}
182		}
183
184		id
185	}
186
187	pub(crate) fn add_directory(&mut self, block: Block<T>, links: &[PbLink]) -> BlockId {
188		let id = self.add_block(block);
189		tracing::debug!(?links, "Add directory");
190
191		for link in links {
192			let link_id = self.index_by_cid.get(&link.cid).copied().unwrap_or_else(|| {
193				let missing_block = Block::new_dag_pb(link.cid, DagPbType::MissingBlock(Box::new(link.clone())), ());
194				self.add_block(missing_block)
195			});
196			let link = NamedLink::new(link.name.clone().unwrap_or_default()).into();
197			self.dag.add_edge(id, link_id, link);
198		}
199
200		id
201	}
202
203	/// Returns the `BlockId`s associated to `path`.
204	///
205	/// Please note that it can be more than one because a CAR can contains multiple roots.
206	fn path_to_block_ids(&self, path: &Path) -> Result<SmallBlockIds> {
207		let not_found_path = || NotFoundErr::Path(path.to_owned());
208		let mut levels = vec![self.root_ids.clone()];
209
210		for path_component in path.components() {
211			match path_component {
212				Component::Normal(os_name) => {
213					let name = os_name.to_str().ok_or_else(not_found_path)?;
214
215					let mut new_level = SmallBlockIds::new();
216					for block_id in levels.last().ok_or_else(not_found_path)? {
217						let mut targets = self
218							.dag
219							.edges_directed(*block_id, Direction::Outgoing)
220							.filter_map(|edge| (edge.weight().name() == Some(name)).then_some(edge.target()))
221							.collect::<SmallBlockIds>();
222						new_level.append(&mut targets);
223					}
224
225					levels.push(new_level)
226				},
227				Component::RootDir | Component::CurDir => {},
228				Component::ParentDir => {
229					levels.pop().ok_or_else(not_found_path)?;
230				},
231				Component::Prefix(..) => fail!(NotSupportedErr::Prefix),
232			}
233		}
234
235		levels.pop().ok_or_else(|| not_found_path().into())
236	}
237
238	/// Returns the  **unique**`BlockId` associated to `path`.
239	///
240	/// If there is more that one `BlockId`, it will fail with an `Error::MoreThanOneMatchOnPath(..)`
241	fn path_to_block_id(&self, path: &Path) -> Result<BlockId> {
242		let ids = self.path_to_block_ids(path)?;
243		ensure!(ids.len() < 2, Error::more_than_one(ids.len(), path));
244		ids.first().copied().ok_or_else(|| Error::NotFound(NotFoundErr::Path(path.to_owned())))
245	}
246
247	/// Returns the **unique** `Block` associated to `path`
248	fn path_to_block(&self, path: &Path) -> Result<&'_ Block<T>> {
249		let id = self.path_to_block_id(path)?;
250		self.dag.node_weight(id).ok_or(NotFoundErr::BlockId(id).into())
251	}
252
253	pub fn path_to_cid(&self, path: &Path) -> Option<&Cid> {
254		self.path_to_block(path).map(|block| &block.cid).ok()
255	}
256
257	fn outgoing_links(&self, id: BlockId) -> Vec<BlockId> {
258		self.dag.edges_directed(id, Direction::Outgoing).map(|edge| edge.target()).collect()
259	}
260
261	fn outgoing_links_as_entries(&self, id: BlockId) -> Vec<PbLink> {
262		let into_pb_link = |edge: EdgeReference<'_, Link>, name: &str| {
263			let target_id = edge.target();
264			let target = self.dag.node_weight(target_id)?;
265			Some(proto::new_pb_link(target.cid, name.to_owned(), None))
266		};
267
268		// Dev: Only `edge` with proper `name`
269		let mut named_links = self
270			.dag
271			.edges_directed(id, Direction::Outgoing)
272			.filter_map(|edge| {
273				let name = edge.weight().name()?;
274				into_pb_link(edge, name)
275			})
276			.collect::<Vec<_>>();
277
278		// NOTE: Links should be sorted by name, following the IPLD specs.
279		named_links.sort_by(|a, b| {
280			static LINK_WITH_NAME_QED: &str = "Links with `None` as name were filtered previously .qed";
281			let a_name = a.name.as_ref().expect(LINK_WITH_NAME_QED);
282			let b_name = b.name.as_ref().expect(LINK_WITH_NAME_QED);
283			a_name.cmp(b_name)
284		});
285		named_links
286	}
287
288	fn outgoing_links_as_blocks(&self, id: BlockId) -> Vec<PbLink> {
289		self.dag
290			.edges_directed(id, Direction::Outgoing)
291			.filter_map(|edge| {
292				let cum_dag_size = edge.weight().cumulative_dag_size();
293				let target_id = edge.target();
294				let target = self.dag.node_weight(target_id)?;
295				Some(proto::new_pb_link(target.cid, None, cum_dag_size))
296			})
297			.collect::<Vec<_>>()
298	}
299
300	pub fn block_count(&self) -> usize {
301		self.dag.node_count()
302	}
303}
304
305impl<T: Read + Seek> ContentAddressableArchive<T> {
306	pub fn new(config: Config) -> Result<Self> {
307		let mut this = Self::new_without_root(config);
308
309		// Add a root folder
310		let root_folder = Block::new_dag_pb(Cid::default(), DagPb::directory(), ());
311		let root_folder_id = this.add_block_without_cid(root_folder);
312		this.rebuild(root_folder_id)?;
313		this.root_ids.push(root_folder_id);
314		Ok(this)
315	}
316
317	/// Recomputes consolidation info (like `CID`) for each block that was marked as dirty,
318	/// then propagates upward through all ancestors.
319	///
320	/// Uses pre-order DFS on the reversed graph so `id` is rebuilt first and ancestors follow
321	/// bottom-up. Cycle-safe: the DFS tracks visited nodes and never re-enters them.
322	fn rebuild_ancestors(&mut self, id: BlockId) -> Result<()> {
323		let rev_dag = Reversed(&self.dag);
324		let ancestors = Dfs::new(&rev_dag, id).iter(&rev_dag).collect::<Vec<_>>();
325
326		for block_id in ancestors {
327			self.rebuild(block_id)?;
328		}
329
330		Ok(())
331	}
332
333	fn rebuild(&mut self, id: BlockId) -> Result<()> {
334		let mut hasher = self.config.hasher()?;
335
336		let (cid, dag_pb_data) = {
337			let block = self.dag.node_weight(id).ok_or(NotFoundErr::BlockId(id))?;
338
339			// Remove current CID from indexes
340			self.index_by_cid.remove(&block.cid);
341
342			// Rebuild CID
343			let (cid_codec, dag_pb_data) = match &block.r#type {
344				BlockType::Raw => {
345					let _len = copy(&mut block.data.clone_and_rewind(), &mut hasher)?;
346					(CidCodec::Raw, Bytes::new())
347				},
348				BlockType::DagPb(dag_pb) => {
349					let pb_node = self.as_pb_node(id, dag_pb)?;
350					let dag_pb_data = Bytes::from(pb_node.into_bytes());
351					let _len = copy(&mut dag_pb_data.clone().reader(), &mut hasher)?;
352					(CidCodec::DagPb, dag_pb_data)
353				},
354			};
355
356			let digest = self.config.hash_code.wrap(hasher.finalize())?;
357			let cid = Cid::new_v1(cid_codec as u64, digest);
358			(cid, dag_pb_data)
359		};
360
361		// Calculate the cumulative_dag_size
362		let cumulative_dag_size = self
363			.dag
364			.edges_directed(id, Direction::Incoming)
365			.map(|edge| edge.weight().cumulative_dag_size())
366			.sum();
367		let block_outgoing_edges = self
368			.dag
369			.edges_directed(id, Direction::Outgoing)
370			.filter_map(|edge| match edge.weight() {
371				Link::Block(..) => Some(edge.id()),
372				_ => None,
373			})
374			.collect::<Vec<_>>();
375		for edge_id in block_outgoing_edges {
376			if let Some(Link::Block(block_link)) = self.dag.edge_weight_mut(edge_id) {
377				block_link.cumulative_dag_size = cumulative_dag_size;
378			}
379		}
380
381		// Update Block
382		self.index_by_cid.insert(cid, id);
383		let block = self.dag.node_weight_mut(id).ok_or(NotFoundErr::BlockId(id))?;
384		block.cid = cid;
385		if let BlockType::DagPb(dag_pb) = &mut block.r#type {
386			dag_pb.data = BoundedReader::from(dag_pb_data);
387		}
388		Ok(())
389	}
390
391	fn as_pb_node(&self, block_id: BlockId, dag_pb: &DagPb<T>) -> Result<PbNode> {
392		let pb_node = match &dag_pb.r#type {
393			DagPbType::Dir => {
394				let links = self.outgoing_links_as_entries(block_id);
395				let pb_data: Bytes = proto::Data::new_directory().into();
396				proto::new_pb_node(links, pb_data)
397			},
398			DagPbType::Symlink(s) => {
399				let pb_data: Bytes = proto::Data::new_symlink(s.posix_path.clone()).into();
400				proto::new_pb_node(vec![], pb_data)
401			},
402			DagPbType::SingleBlockFile => {
403				let mut sbf_buf = Vec::with_capacity(dag_pb.data.bound_len() as usize);
404				let _ = dag_pb.data.clone_and_rewind().read_to_end(&mut sbf_buf)?;
405				let pb_data: Bytes = proto::Data::new_file_with_data(sbf_buf).into();
406				proto::new_pb_node(vec![], pb_data)
407			},
408			DagPbType::MultiBlockFile(mbf) => {
409				let links = self.outgoing_links_as_blocks(block_id);
410				let pb_data: Bytes = proto::Data::new_file(mbf.block_sizes.clone()).into();
411				proto::new_pb_node(links, pb_data)
412			},
413			DagPbType::MissingBlock(l) => fail!(InvalidErr::is_a_miss_block(format!("block_id={block_id:?}"), &l.cid)),
414		};
415		Ok(pb_node)
416	}
417}
418
419// File System interface
420// ===========================================================================
421
422impl<T> ContentAddressableArchive<T> {
423	pub fn read_dir(&self, path: &Path) -> Result<impl Iterator<Item = &str>> {
424		let block_id = self.path_to_block_id(path)?;
425		let mut entries = self
426			.dag
427			.edges_directed(block_id, Direction::Outgoing)
428			.filter_map(|edge| edge.weight().name())
429			.collect::<Vec<_>>();
430		entries.sort();
431
432		Ok(entries.into_iter())
433	}
434
435	pub fn open_file(&self, path: &Path) -> Result<BoundedReader<T>> {
436		self.open_file_with_loop_detector(path, smallvec![])
437	}
438
439	fn open_file_with_loop_detector(
440		&self,
441		path: &Path,
442		mut open_block_ids: SmallVec<[BlockId; 1]>,
443	) -> Result<BoundedReader<T>> {
444		let id = self.path_to_block_id(path)?;
445		let block = self.dag.node_weight(id).ok_or(NotFoundErr::BlockId(id))?;
446		match &block.r#type {
447			BlockType::Raw => Ok(block.data.clone_and_rewind()),
448			BlockType::DagPb(dag_pb) => match &dag_pb.r#type {
449				DagPbType::SingleBlockFile => Ok(dag_pb.data.clone_and_rewind()),
450				DagPbType::MultiBlockFile(_mbf) => Ok(self.open_multi_block_file(id)),
451				DagPbType::Symlink(symlink) => {
452					check_loop_and_update(&mut open_block_ids, path, id)?;
453					let target_abs_path = self.resolve_open_symlink(path, Path::new(&symlink.posix_path));
454					self.open_file_with_loop_detector(&target_abs_path, open_block_ids)
455				},
456				DagPbType::Dir => fail!(InvalidErr::is_a_dir(path)),
457				DagPbType::MissingBlock(pb_link) => fail!(InvalidErr::is_a_miss_block(path, &pb_link.cid)),
458			},
459		}
460	}
461
462	fn resolve_open_symlink(&self, link_path: &Path, target_path: &Path) -> PathBuf {
463		if target_path.is_absolute() {
464			return target_path.to_path_buf();
465		}
466
467		let root = Path::new("/");
468		let mut link_path_parent = link_path.parent().unwrap_or(root);
469		if link_path_parent.as_os_str().is_empty() {
470			link_path_parent = root;
471		}
472
473		link_path_parent.join(target_path)
474	}
475
476	fn open_multi_block_file(&self, id: BlockId) -> BoundedReader<T> {
477		let dfs = Dfs::new(&self.dag, id);
478		let part_readers = dfs
479			.iter(&self.dag)
480			.filter_map(|child_id| {
481				let child = self.dag.node_weight(child_id)?;
482				child.as_sfb_data()
483			})
484			.collect::<Vec<_>>();
485
486		ChainedBoundedReader::new(part_readers).into()
487	}
488
489	pub fn metadata(&self, path: &Path) -> Result<Metadata> {
490		self.metadata_with_loop_detector(path, smallvec![])
491	}
492
493	fn metadata_with_loop_detector(&self, path: &Path, mut open_block_ids: SmallVec<[BlockId; 1]>) -> Result<Metadata> {
494		let block_id = self.path_to_block_id(path)?;
495		let block = self.dag.node_weight(block_id).ok_or(NotFoundErr::BlockId(block_id))?;
496
497		let meta = match &block.r#type {
498			BlockType::Raw => Metadata::file(block.data.bound_len()),
499			BlockType::DagPb(dag_pb) => match &dag_pb.r#type {
500				DagPbType::SingleBlockFile => Metadata::file(block.data_len()),
501				DagPbType::MultiBlockFile(mbf) => {
502					let acc_len = mbf.block_sizes.iter().sum::<u64>();
503					Metadata::file(acc_len)
504				},
505				DagPbType::Dir => Metadata::directory(),
506				DagPbType::Symlink(symlink) => {
507					check_loop_and_update(&mut open_block_ids, path, block_id)?;
508					let target_abs_path = self.resolve_open_symlink(path, Path::new(&symlink.posix_path));
509					let target_meta = self.metadata_with_loop_detector(&target_abs_path, open_block_ids)?;
510					Metadata::symlink(target_meta, Path::new(&symlink.posix_path))
511				},
512				DagPbType::MissingBlock(link) => fail!(InvalidErr::is_a_miss_block(path, &link.cid)),
513			},
514		};
515
516		Ok(meta)
517	}
518
519	pub fn exists(&self, path: &Path) -> bool {
520		self.path_to_block_id(path).ok().is_some()
521	}
522}
523
524impl<T: Read + Seek> ContentAddressableArchive<T> {
525	/// Creates a new empty directory at `parent_path/dir_name`.
526	pub fn create_dir(&mut self, path: &Path) -> Result<()> {
527		let dir_name = path
528			.file_name()
529			.ok_or_else(|| InvalidErr::file_name(path))?
530			.to_str()
531			.ok_or_else(|| InvalidErr::not_utf8_path(path))?;
532		let parent_path = path.parent().unwrap_or_else(|| Path::new("."));
533		let parent_id = self.path_to_block_id(parent_path)?;
534
535		// `dir_name` is not already used.
536		let found_dir_name = self
537			.dag
538			.edges_directed(parent_id, Direction::Outgoing)
539			.find(|edge| edge.weight().name() == Some(dir_name));
540		ensure!(found_dir_name.is_none(), InvalidErr::exists(dir_name));
541
542		let new_dir = Block::new_dag_pb(Cid::default(), DagPb::directory(), ());
543		let new_dir_id = self.add_block(new_dir);
544		self.dag.add_edge(parent_id, new_dir_id, NamedLink::new(dir_name).into());
545		self.rebuild_ancestors(new_dir_id)
546	}
547
548	pub fn add_file(&mut self, path: &Path, reader: T) -> Result<()> {
549		let os_name = path.file_name().ok_or_else(|| NotFoundErr::file_name(path))?;
550		let name = os_name.to_str().ok_or_else(|| InvalidErr::not_utf8_path(os_name))?;
551
552		// Create and add block.
553		let bounded = BoundedReader::from_reader(reader)?;
554		let block_id = BlockBuilder::new(self, bounded)?.build()?;
555
556		if !self.root_ids.is_empty() {
557			let parent_path = path.parent().unwrap_or(Path::new("."));
558			let parent_id = self.path_to_block_id(parent_path)?;
559			self.dag.add_edge(parent_id, block_id, NamedLink::new(name).into());
560			self.rebuild_ancestors(block_id)
561		} else {
562			self.root_ids.push(block_id);
563			self.dag.add_edge(block_id, block_id, NamedLink::new(name).into());
564			Ok(())
565		}
566	}
567
568	pub fn root_cids(&self) -> Result<Vec<Cid>> {
569		self.root_ids
570			.iter()
571			.map(|id| {
572				let block = self.dag.node_weight(*id).ok_or(NotFoundErr::BlockId(*id))?;
573				Ok(block.cid)
574			})
575			.collect()
576	}
577}
578
579// Load functions
580// ===========================================================================
581
582impl<F: Read + Seek> ContentAddressableArchive<F> {
583	pub fn load(reader: F) -> Result<Self> {
584		let mut reader = BoundedReader::from_reader(reader)?;
585		let mut this = Self::base_new(reader.clone(), Config::default());
586
587		// Load header
588		let header = CarHeader::load(&mut reader)?;
589		this.car_overhead_byte_counter += reader.stream_position()?;
590		trace!(?header, pos = this.car_overhead_byte_counter, "Header loaded");
591
592		// load each blocka
593		while let Some(block_def) = BlockDef::load(&mut reader)? {
594			// Block elements: content & consolidation info from `reader`
595			trace!(?block_def, "BlockDef loaded");
596			this.car_overhead_byte_counter += block_def.car_overhead_byte_counter;
597			let block_data = reader.sub(block_def.range.clone())?;
598
599			// Load block based on its CID.
600			let cid_codec = block_def.cid.codec();
601			let codec = CidCodec::from_repr(cid_codec).ok_or(Error::CodecNotSupported(cid_codec))?;
602			match codec {
603				CidCodec::Raw => this.add_block(Block::new_raw(block_def.cid, block_data)),
604				CidCodec::DagPb => DagPb::load(&mut this, block_def.cid, block_data)?,
605				_other => fail!(Error::CodecNotSupported(cid_codec)),
606			};
607			reader.seek(SeekFrom::Start(block_def.range.end))?;
608		}
609
610		// Update roots.
611		this.root_ids = header
612			.roots
613			.iter()
614			.filter_map(|cid| this.index_by_cid.get(&cid.0))
615			.cloned()
616			.collect::<SmallBlockIds>();
617
618		Ok(this)
619	}
620}
621
622// Write functions
623// ===========================================================================
624
625impl<T: Read + Seek + 'static> ContentAddressableArchive<T> {
626	pub fn write<W: Write>(&mut self, writer: &mut W) -> Result<u64> {
627		// Write header
628		let header = CarHeader::new_v1(self.root_cids()?);
629		let header_written = header.write(writer)? as u64;
630		// debug!(?header, pos = header_written, "Header written");
631
632		// Write blocks in node insertion order, which preserves the original file block order
633		// on round-trips. BFS would visit children in reverse-insertion order due to petgraph's
634		// adjacency list being prepend-only.
635		let mut acc_written = 0u64;
636
637		for id in self.dag.node_indices() {
638			let block = self.dag.node_weight(id).ok_or(NotFoundErr::BlockId(id))?;
639			let cid = block.cid;
640			let written_bytes = match &block.r#type {
641				BlockType::Raw => {
642					let len = block.data.bound_len();
643					write_block(cid, len, &mut block.data.clone_and_rewind(), writer)?
644				},
645				BlockType::DagPb(dag_pb) => {
646					if block.data.bound_len() > 0 {
647						// Pass-through: write the original bytes from the loaded file.
648						let len = block.data.bound_len();
649						write_block(cid, len, &mut block.data.clone_and_rewind(), writer)?
650					} else {
651						// New block (no original bytes): encode from structure.
652						let pb_node = Bytes::from(self.as_pb_node(id, dag_pb)?.into_bytes());
653						let pb_node_len = pb_node.len() as u64;
654						write_block(cid, pb_node_len, &mut pb_node.reader(), writer)?
655					}
656				},
657			};
658			acc_written = acc_written.checked_add(written_bytes).ok_or(Error::FileTooLarge)?;
659		}
660
661		header_written.checked_add(acc_written).ok_or(Error::FileTooLarge)
662	}
663}
664
665fn write_block<R: Read, W: Write>(cid: Cid, reader_len: u64, reader: &mut R, w: &mut W) -> Result<u64> {
666	let cid = cid.to_bytes();
667	let section_len = reader_len.checked_add(cid.len() as u64).ok_or(Error::FileTooLarge)?;
668
669	let leb_written = leb128::write::unsigned(w, section_len)? as u64;
670	w.write_all(&cid)?;
671	let copied = copy(reader, w)?;
672
673	copied.checked_add(leb_written + cid.len() as u64).ok_or(Error::FileTooLarge)
674}
675
676impl<T> ContentAddressableArchive<T> {
677	fn traverse_blocks<F>(&self, mut len_fn: F) -> u64
678	where
679		F: FnMut(&Block<T>, BlockId, &mut VecDeque<BlockId>, &HashSet<BlockId>) -> u64,
680	{
681		let mut acc_len = 0u64;
682		let mut closed = HashSet::new();
683		let mut open = VecDeque::from_iter(self.root_ids.iter().copied());
684
685		while let Some(id) = open.pop_front() {
686			let Some(block) = self.dag.node_weight(id) else { continue };
687			let block_len = len_fn(block, id, &mut open, &closed);
688
689			closed.insert(id);
690			acc_len = acc_len.saturating_add(block_len);
691		}
692
693		acc_len
694	}
695}
696
697impl<T> ContextLen for ContentAddressableArchive<T> {
698	fn data_len(&self) -> u64 {
699		self.traverse_blocks(|block, id, open, closed| match &block.r#type {
700			BlockType::Raw => block.data.bound_len(),
701			BlockType::DagPb(dag_pb) => match &dag_pb.r#type {
702				DagPbType::Dir => {
703					for entry_id in self.outgoing_links(id) {
704						if entry_id != id && !open.contains(&entry_id) && !closed.contains(&entry_id) {
705							open.push_back(entry_id);
706						}
707					}
708					0u64
709				},
710				DagPbType::SingleBlockFile => dag_pb.data.bound_len(),
711				DagPbType::MultiBlockFile(mbf) => mbf.block_sizes.iter().sum(),
712				DagPbType::Symlink(..) | DagPbType::MissingBlock(..) => 0u64,
713			},
714		})
715	}
716
717	fn pb_data_len(&self) -> u64 {
718		self.traverse_blocks(|block, id, open, closed| {
719			if let BlockType::DagPb(dag_pb) = &block.r#type {
720				if let DagPbType::Dir = &dag_pb.r#type {
721					for entry_id in self.outgoing_links(id) {
722						if entry_id != id && !open.contains(&entry_id) && !closed.contains(&entry_id) {
723							open.push_back(entry_id);
724						}
725					}
726				}
727			}
728			block.data.bound_len()
729		})
730	}
731}
732
733// Tools
734// ===========================================================================
735
736/// Uses `open_block_ids` to track visited block IDs, in order to detect loops during the
737/// resolution of symbolic links.
738fn check_loop_and_update(
739	open_block_ids: &mut SmallVec<[BlockId; 1]>,
740	target_path: &Path,
741	target_id: BlockId,
742) -> Result<()> {
743	ensure!(!open_block_ids.contains(&target_id), LoopDetectedErr::Symlink(target_path.to_owned()));
744
745	open_block_ids.push(target_id);
746	Ok(())
747}