Skip to main content

ipld_car/
dag_pb.rs

1use crate::{
2	bounded_reader::{
3		functions::slice_ref,
4		sync::BoundedReader,
5		traits::{Bounded, CloneAndRewind},
6	},
7	car::{Block, BlockId, ContentAddressableArchive},
8	error::{DagPbErr, DagPbResult, UnixFsErr},
9	fail, proto,
10	traits::ContextLen,
11};
12
13use bytes::{BufMut, BytesMut};
14use derivative::Derivative;
15use libipld::{
16	pb::{PbLink, PbNode},
17	Cid,
18};
19use nbytes::bytes;
20use prost::Message;
21use quick_protobuf::message::MessageWrite;
22use std::{
23	cmp::min,
24	io::{copy, Read, Seek},
25};
26
27mod multi_block_file;
28pub(crate) use multi_block_file::MultiBlockFile;
29mod symlink;
30pub use symlink::Symlink;
31mod link;
32pub use link::{BlockLink, Link, NamedLink};
33
34const BUF_LIMIT: usize = bytes!(2; MiB);
35const BUF_CAPS: &[usize] = &[bytes!(4; KiB), bytes!(16; KiB), bytes!(128; KiB), bytes!(512; KiB), bytes!(1; MiB)];
36
37#[derive(Debug, Clone)]
38pub enum DagPbType {
39	Dir,
40	Symlink(Symlink),
41	SingleBlockFile,
42	MultiBlockFile(MultiBlockFile),
43	MissingBlock(Box<PbLink>),
44}
45
46#[derive(derive_more::Debug, Derivative)]
47#[derivative(Clone(bound = ""))]
48pub struct DagPb<T> {
49	pub r#type: DagPbType,
50	pub data: BoundedReader<T>,
51}
52
53impl<T> DagPb<T> {
54	pub fn directory() -> Self {
55		Self::no_data(DagPbType::Dir)
56	}
57
58	pub fn symlink<S: Into<String>>(posix_path: S) -> Self {
59		Self::no_data(DagPbType::Symlink(Symlink::new(posix_path)))
60	}
61
62	#[inline]
63	pub fn single_block_file<D: Into<BoundedReader<T>>>(data: D) -> Self {
64		Self { r#type: DagPbType::SingleBlockFile, data: data.into() }
65	}
66
67	#[inline]
68	pub fn multi_block_file<BS, D>(blocksizes: BS, data: D) -> Self
69	where
70		BS: Into<MultiBlockFile>,
71		D: Into<BoundedReader<T>>,
72	{
73		Self { r#type: DagPbType::MultiBlockFile(blocksizes.into()), data: data.into() }
74	}
75
76	#[inline]
77	pub fn no_data(r#type: DagPbType) -> Self {
78		Self { r#type, data: Default::default() }
79	}
80
81	pub fn as_sfb_data(&self) -> Option<BoundedReader<T>> {
82		match &self.r#type {
83			DagPbType::SingleBlockFile => Some(self.data.clone_and_rewind()),
84			_ => None,
85		}
86	}
87}
88
89impl<T> From<DagPbType> for DagPb<T> {
90	fn from(r#type: DagPbType) -> Self {
91		Self { r#type, data: ().into() }
92	}
93}
94
95impl<T> ContextLen for DagPb<T> {
96	fn data_len(&self) -> u64 {
97		match &self.r#type {
98			DagPbType::Dir | DagPbType::MissingBlock(..) | DagPbType::Symlink(..) => 0u64,
99			DagPbType::SingleBlockFile => self.data.bound_len(),
100			DagPbType::MultiBlockFile(mbf) => mbf.block_sizes.iter().sum(),
101		}
102	}
103
104	fn pb_data_len(&self) -> u64 {
105		match &self.r#type {
106			DagPbType::SingleBlockFile => 0u64,
107			_ => self.data.bound_len(),
108		}
109	}
110}
111
112// Load part
113// ===========================================================================
114
115impl<T: Read + Seek> DagPb<T> {
116	pub fn load(
117		car: &mut ContentAddressableArchive<T>,
118		cid: Cid,
119		mut block_data: BoundedReader<T>,
120	) -> DagPbResult<BlockId> {
121		// Try to decode `PbNode` and rebounds to data.
122		let decode_pb_node_max = block_data.bound_len();
123		let pb_node = decode_pb_node(&mut block_data, decode_pb_node_max)?;
124		let pb_node_len = pb_node.get_size() as u64;
125		debug_assert_eq!(pb_node_len, pb_node.clone().into_bytes().len() as u64);
126		let pb_data = block_data.sub(..pb_node_len)?;
127		let data = block_data.sub(pb_node_len..)?;
128
129		// Decode Unixfs Data
130		let pb_node_data_enc = pb_node.data.clone().ok_or(UnixFsErr::MissingData)?;
131		let unixfs = proto::Data::decode(pb_node_data_enc).map_err(|_| UnixFsErr::InvalidData)?;
132
133		let unixfs_type = proto::data::DataType::try_from(unixfs.r#type)
134			.map_err(|_| UnixFsErr::DataTypeNotSupported(unixfs.r#type))?;
135
136		let id = match unixfs_type {
137			proto::data::DataType::Directory => load_directory(car, cid, pb_data, &pb_node.links),
138			proto::data::DataType::Symlink => load_symlink(car, cid, pb_data, unixfs)?,
139			proto::data::DataType::Raw =>
140				car.add_block(Block::new_dag_pb(cid, DagPb::single_block_file(data), pb_data)),
141			proto::data::DataType::File => load_file(car, cid, pb_node, unixfs, pb_data, data)?,
142			_ => fail!(UnixFsErr::DataTypeNotSupported(unixfs.r#type)),
143		};
144
145		Ok(id)
146	}
147}
148
149fn load_directory<T>(
150	car: &mut ContentAddressableArchive<T>,
151	cid: Cid,
152	pb_data: BoundedReader<T>,
153	links: &[PbLink],
154) -> BlockId {
155	let block = Block::new_dag_pb(cid, DagPb::directory(), pb_data);
156	car.add_directory(block, links)
157}
158
159fn load_symlink<T>(
160	car: &mut ContentAddressableArchive<T>,
161	cid: Cid,
162	pb_data: BoundedReader<T>,
163	unixfs: proto::Data,
164) -> DagPbResult<BlockId> {
165	let posix_path = unixfs.data.ok_or(UnixFsErr::MissingSymlinkInfo)?;
166	let posix_path_utf8 = String::try_from(posix_path).map_err(|_| UnixFsErr::SymlinkPathUtf8)?;
167	let block = Block::new_dag_pb(cid, DagPb::symlink(posix_path_utf8), pb_data);
168	Ok(car.add_block(block))
169}
170
171fn load_file<T: Read + Seek>(
172	car: &mut ContentAddressableArchive<T>,
173	cid: Cid,
174	pb_node: PbNode,
175	unixfs: proto::Data,
176	pb_data: BoundedReader<T>,
177	data: BoundedReader<T>,
178) -> DagPbResult<BlockId> {
179	// Load as SBF
180	if pb_node.links.is_empty() {
181		let embedded_data = unixfs.data.and_then(|data| slice_ref(pb_data.clone_and_rewind(), &data));
182		let data = embedded_data.unwrap_or(data);
183		let sbf = DagPb::single_block_file(data);
184		let block = Block::new_dag_pb(cid, sbf, pb_data);
185		return Ok(car.add_block(block));
186	}
187
188	// Load as MBF
189	let mbf = DagPb::multi_block_file(unixfs.blocksizes, data);
190	let block = Block::new_dag_pb(cid, mbf, pb_data);
191	Ok(car.add_multi_block_file(block, &pb_node.links))
192}
193
194/// It tries to decode a `pbNode` using a progressive increase on the buffer capacity, up to 2 `MiB`.
195fn decode_pb_node<R: Read>(reader: &mut R, block_len: u64) -> DagPbResult<PbNode> {
196	let caps = buffer_capacities_to_decode_pb_node(block_len);
197	let init_capacity = caps.first().copied().unwrap_or(BUF_LIMIT);
198	let mut buf = BytesMut::with_capacity(init_capacity);
199
200	for cap in caps {
201		// Reserve and real from `reader`, trying to fill `buf_len`
202		let additional_cap = cap.saturating_sub(buf.capacity());
203		if additional_cap > 0 {
204			buf.reserve(additional_cap);
205		}
206
207		let buf_len = buf.len();
208		let mut writer = buf.writer();
209		let additional = cap.saturating_sub(buf_len);
210		copy(&mut reader.take(additional as u64), &mut writer)?;
211
212		// Try to decode the PbNode.
213		let freezed_buf = writer.into_inner().freeze();
214		if let Ok(pb_node) = PbNode::from_bytes(freezed_buf.clone()) {
215			return Ok(pb_node);
216		}
217		buf = freezed_buf.try_into_mut().map_err(|_| DagPbErr::ExceedBufLimitOnDecode)?;
218	}
219
220	Err(DagPbErr::ExceedBufLimitOnDecode)
221}
222
223fn buffer_capacities_to_decode_pb_node(block_len: u64) -> Vec<usize> {
224	let max_buf_cap: usize = min(block_len.try_into().unwrap_or(usize::MAX), BUF_LIMIT);
225	let mut buf_caps: Vec<usize> = BUF_CAPS.to_vec();
226	buf_caps.push(max_buf_cap);
227
228	buf_caps.sort();
229	buf_caps.retain(|len| *len <= max_buf_cap);
230	buf_caps
231}