1use crate::{
2 bounded_reader::{
3 functions::slice_ref,
4 sync::BoundedReader,
5 traits::{Bounded, CloneAndRewind},
6 },
7 car::{Block, BlockId, ContentAddressableArchive},
8 error::{DagPbErr, DagPbResult, UnixFsErr},
9 fail, proto,
10 traits::ContextLen,
11};
12
13use bytes::{BufMut, BytesMut};
14use derivative::Derivative;
15use libipld::{
16 pb::{PbLink, PbNode},
17 Cid,
18};
19use nbytes::bytes;
20use prost::Message;
21use quick_protobuf::message::MessageWrite;
22use std::{
23 cmp::min,
24 io::{copy, Read, Seek},
25};
26
27mod multi_block_file;
28pub(crate) use multi_block_file::MultiBlockFile;
29mod symlink;
30pub use symlink::Symlink;
31mod link;
32pub use link::{BlockLink, Link, NamedLink};
33
/// Upper bound on the buffer capacity used while decoding a `PbNode`
/// (see `buffer_capacities_to_decode_pb_node`).
const BUF_LIMIT: usize = bytes!(2; MiB);
/// Intermediate buffer capacities that `decode_pb_node` tries, in increasing order, before it
/// falls back to the block length capped at `BUF_LIMIT`.
const BUF_CAPS: &[usize] = &[bytes!(4; KiB), bytes!(16; KiB), bytes!(128; KiB), bytes!(512; KiB), bytes!(1; MiB)];
36
/// Kind of UnixFS entry represented by a [`DagPb`] block.
#[derive(Debug, Clone)]
pub enum DagPbType {
    /// Directory entry; reports a data length of zero.
    Dir,
    /// Symbolic link, carrying the link description built from its POSIX path.
    Symlink(Symlink),
    /// File whose content is held by this block alone.
    SingleBlockFile,
    /// File spread over several linked blocks, described by its block sizes.
    MultiBlockFile(MultiBlockFile),
    /// NOTE(review): presumably a link whose target block is not present in the archive —
    /// this variant is not constructed in this part of the file; confirm against callers.
    MissingBlock(Box<PbLink>),
}
45
/// A DAG-PB block: its UnixFS kind plus a bounded reader over the bytes attached to it.
///
/// `Clone` is derived with an empty bound, so cloning does not require `T: Clone`.
#[derive(derive_more::Debug, Derivative)]
#[derivative(Clone(bound = ""))]
pub struct DagPb<T> {
    /// Which kind of entry this block represents.
    pub r#type: DagPbType,
    /// Bytes attached to the block; for `SingleBlockFile` this is what `as_sfb_data` hands out.
    pub data: BoundedReader<T>,
}
52
53impl<T> DagPb<T> {
54 pub fn directory() -> Self {
55 Self::no_data(DagPbType::Dir)
56 }
57
58 pub fn symlink<S: Into<String>>(posix_path: S) -> Self {
59 Self::no_data(DagPbType::Symlink(Symlink::new(posix_path)))
60 }
61
62 #[inline]
63 pub fn single_block_file<D: Into<BoundedReader<T>>>(data: D) -> Self {
64 Self { r#type: DagPbType::SingleBlockFile, data: data.into() }
65 }
66
67 #[inline]
68 pub fn multi_block_file<BS, D>(blocksizes: BS, data: D) -> Self
69 where
70 BS: Into<MultiBlockFile>,
71 D: Into<BoundedReader<T>>,
72 {
73 Self { r#type: DagPbType::MultiBlockFile(blocksizes.into()), data: data.into() }
74 }
75
76 #[inline]
77 pub fn no_data(r#type: DagPbType) -> Self {
78 Self { r#type, data: Default::default() }
79 }
80
81 pub fn as_sfb_data(&self) -> Option<BoundedReader<T>> {
82 match &self.r#type {
83 DagPbType::SingleBlockFile => Some(self.data.clone_and_rewind()),
84 _ => None,
85 }
86 }
87}
88
89impl<T> From<DagPbType> for DagPb<T> {
90 fn from(r#type: DagPbType) -> Self {
91 Self { r#type, data: ().into() }
92 }
93}
94
95impl<T> ContextLen for DagPb<T> {
96 fn data_len(&self) -> u64 {
97 match &self.r#type {
98 DagPbType::Dir | DagPbType::MissingBlock(..) | DagPbType::Symlink(..) => 0u64,
99 DagPbType::SingleBlockFile => self.data.bound_len(),
100 DagPbType::MultiBlockFile(mbf) => mbf.block_sizes.iter().sum(),
101 }
102 }
103
104 fn pb_data_len(&self) -> u64 {
105 match &self.r#type {
106 DagPbType::SingleBlockFile => 0u64,
107 _ => self.data.bound_len(),
108 }
109 }
110}
111
112impl<T: Read + Seek> DagPb<T> {
116 pub fn load(
117 car: &mut ContentAddressableArchive<T>,
118 cid: Cid,
119 mut block_data: BoundedReader<T>,
120 ) -> DagPbResult<BlockId> {
121 let decode_pb_node_max = block_data.bound_len();
123 let pb_node = decode_pb_node(&mut block_data, decode_pb_node_max)?;
124 let pb_node_len = pb_node.get_size() as u64;
125 debug_assert_eq!(pb_node_len, pb_node.clone().into_bytes().len() as u64);
126 let pb_data = block_data.sub(..pb_node_len)?;
127 let data = block_data.sub(pb_node_len..)?;
128
129 let pb_node_data_enc = pb_node.data.clone().ok_or(UnixFsErr::MissingData)?;
131 let unixfs = proto::Data::decode(pb_node_data_enc).map_err(|_| UnixFsErr::InvalidData)?;
132
133 let unixfs_type = proto::data::DataType::try_from(unixfs.r#type)
134 .map_err(|_| UnixFsErr::DataTypeNotSupported(unixfs.r#type))?;
135
136 let id = match unixfs_type {
137 proto::data::DataType::Directory => load_directory(car, cid, pb_data, &pb_node.links),
138 proto::data::DataType::Symlink => load_symlink(car, cid, pb_data, unixfs)?,
139 proto::data::DataType::Raw =>
140 car.add_block(Block::new_dag_pb(cid, DagPb::single_block_file(data), pb_data)),
141 proto::data::DataType::File => load_file(car, cid, pb_node, unixfs, pb_data, data)?,
142 _ => fail!(UnixFsErr::DataTypeNotSupported(unixfs.r#type)),
143 };
144
145 Ok(id)
146 }
147}
148
149fn load_directory<T>(
150 car: &mut ContentAddressableArchive<T>,
151 cid: Cid,
152 pb_data: BoundedReader<T>,
153 links: &[PbLink],
154) -> BlockId {
155 let block = Block::new_dag_pb(cid, DagPb::directory(), pb_data);
156 car.add_directory(block, links)
157}
158
159fn load_symlink<T>(
160 car: &mut ContentAddressableArchive<T>,
161 cid: Cid,
162 pb_data: BoundedReader<T>,
163 unixfs: proto::Data,
164) -> DagPbResult<BlockId> {
165 let posix_path = unixfs.data.ok_or(UnixFsErr::MissingSymlinkInfo)?;
166 let posix_path_utf8 = String::try_from(posix_path).map_err(|_| UnixFsErr::SymlinkPathUtf8)?;
167 let block = Block::new_dag_pb(cid, DagPb::symlink(posix_path_utf8), pb_data);
168 Ok(car.add_block(block))
169}
170
171fn load_file<T: Read + Seek>(
172 car: &mut ContentAddressableArchive<T>,
173 cid: Cid,
174 pb_node: PbNode,
175 unixfs: proto::Data,
176 pb_data: BoundedReader<T>,
177 data: BoundedReader<T>,
178) -> DagPbResult<BlockId> {
179 if pb_node.links.is_empty() {
181 let embedded_data = unixfs.data.and_then(|data| slice_ref(pb_data.clone_and_rewind(), &data));
182 let data = embedded_data.unwrap_or(data);
183 let sbf = DagPb::single_block_file(data);
184 let block = Block::new_dag_pb(cid, sbf, pb_data);
185 return Ok(car.add_block(block));
186 }
187
188 let mbf = DagPb::multi_block_file(unixfs.blocksizes, data);
190 let block = Block::new_dag_pb(cid, mbf, pb_data);
191 Ok(car.add_multi_block_file(block, &pb_node.links))
192}
193
194fn decode_pb_node<R: Read>(reader: &mut R, block_len: u64) -> DagPbResult<PbNode> {
196 let caps = buffer_capacities_to_decode_pb_node(block_len);
197 let init_capacity = caps.first().copied().unwrap_or(BUF_LIMIT);
198 let mut buf = BytesMut::with_capacity(init_capacity);
199
200 for cap in caps {
201 let additional_cap = cap.saturating_sub(buf.capacity());
203 if additional_cap > 0 {
204 buf.reserve(additional_cap);
205 }
206
207 let buf_len = buf.len();
208 let mut writer = buf.writer();
209 let additional = cap.saturating_sub(buf_len);
210 copy(&mut reader.take(additional as u64), &mut writer)?;
211
212 let freezed_buf = writer.into_inner().freeze();
214 if let Ok(pb_node) = PbNode::from_bytes(freezed_buf.clone()) {
215 return Ok(pb_node);
216 }
217 buf = freezed_buf.try_into_mut().map_err(|_| DagPbErr::ExceedBufLimitOnDecode)?;
218 }
219
220 Err(DagPbErr::ExceedBufLimitOnDecode)
221}
222
223fn buffer_capacities_to_decode_pb_node(block_len: u64) -> Vec<usize> {
224 let max_buf_cap: usize = min(block_len.try_into().unwrap_or(usize::MAX), BUF_LIMIT);
225 let mut buf_caps: Vec<usize> = BUF_CAPS.to_vec();
226 buf_caps.push(max_buf_cap);
227
228 buf_caps.sort();
229 buf_caps.retain(|len| *len <= max_buf_cap);
230 buf_caps
231}