1use super::*;
2
3impl<S: Store> HashTree<S> {
4 pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
6 let mut entries = Vec::new();
7 self.walk_recursive(cid, path, &mut entries).await?;
8 Ok(entries)
9 }
10
11 async fn walk_recursive(
12 &self,
13 cid: &Cid,
14 path: &str,
15 entries: &mut Vec<WalkEntry>,
16 ) -> Result<(), HashTreeError> {
17 let data = match self
18 .store
19 .get(&cid.hash)
20 .await
21 .map_err(|e| HashTreeError::Store(e.to_string()))?
22 {
23 Some(d) => d,
24 None => return Ok(()),
25 };
26
27 let data = if let Some(key) = &cid.key {
29 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
30 } else {
31 data
32 };
33
34 let node = match try_decode_tree_node(&data) {
35 Some(n) => n,
36 None => {
37 entries.push(WalkEntry {
38 path: path.to_string(),
39 hash: cid.hash,
40 link_type: LinkType::Blob,
41 size: data.len() as u64,
42 key: cid.key,
43 });
44 return Ok(());
45 }
46 };
47
48 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
49 entries.push(WalkEntry {
50 path: path.to_string(),
51 hash: cid.hash,
52 link_type: node.node_type,
53 size: node_size,
54 key: cid.key,
55 });
56
57 for link in &node.links {
58 let child_path = match &link.name {
59 Some(name) => {
60 if Self::is_internal_directory_link(&node, link) {
61 let sub_cid = Cid {
63 hash: link.hash,
64 key: cid.key,
65 };
66 Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
67 continue;
68 }
69 if path.is_empty() {
70 name.clone()
71 } else {
72 format!("{}/{}", path, name)
73 }
74 }
75 None => path.to_string(),
76 };
77
78 let child_cid = Cid {
80 hash: link.hash,
81 key: link.key,
82 };
83 Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
84 }
85
86 Ok(())
87 }
88
89 pub async fn walk_parallel(
92 &self,
93 cid: &Cid,
94 path: &str,
95 concurrency: usize,
96 ) -> Result<Vec<WalkEntry>, HashTreeError> {
97 self.walk_parallel_with_progress(cid, path, concurrency, None)
98 .await
99 }
100
101 pub async fn walk_parallel_with_progress(
108 &self,
109 cid: &Cid,
110 path: &str,
111 concurrency: usize,
112 progress: Option<&std::sync::atomic::AtomicUsize>,
113 ) -> Result<Vec<WalkEntry>, HashTreeError> {
114 use futures::stream::{FuturesUnordered, StreamExt};
115 use std::collections::VecDeque;
116 use std::sync::atomic::Ordering;
117
118 let mut entries = Vec::new();
119 let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
120 let mut active = FuturesUnordered::new();
121
122 pending.push_back((cid.clone(), path.to_string()));
124
125 loop {
126 while active.len() < concurrency {
128 if let Some((node_cid, node_path)) = pending.pop_front() {
129 let store = &self.store;
130 let fut = async move {
131 let data = store
132 .get(&node_cid.hash)
133 .await
134 .map_err(|e| HashTreeError::Store(e.to_string()))?;
135 Ok::<_, HashTreeError>((node_cid, node_path, data))
136 };
137 active.push(fut);
138 } else {
139 break;
140 }
141 }
142
143 if active.is_empty() {
145 break;
146 }
147
148 if let Some(result) = active.next().await {
150 let (node_cid, node_path, data) = result?;
151
152 if let Some(counter) = progress {
154 counter.fetch_add(1, Ordering::Relaxed);
155 }
156
157 let data = match data {
158 Some(d) => d,
159 None => continue,
160 };
161
162 let data = if let Some(key) = &node_cid.key {
164 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
165 } else {
166 data
167 };
168
169 let node = match try_decode_tree_node(&data) {
170 Some(n) => n,
171 None => {
172 entries.push(WalkEntry {
174 path: node_path,
175 hash: node_cid.hash,
176 link_type: LinkType::Blob,
177 size: data.len() as u64,
178 key: node_cid.key,
179 });
180 continue;
181 }
182 };
183
184 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
186 entries.push(WalkEntry {
187 path: node_path.clone(),
188 hash: node_cid.hash,
189 link_type: node.node_type,
190 size: node_size,
191 key: node_cid.key,
192 });
193
194 for link in &node.links {
196 let child_path = match &link.name {
197 Some(name) => {
198 if Self::is_internal_directory_link(&node, link) {
199 let sub_cid = Cid {
201 hash: link.hash,
202 key: node_cid.key,
203 };
204 pending.push_back((sub_cid, node_path.clone()));
205 continue;
206 }
207 if node_path.is_empty() {
208 name.clone()
209 } else {
210 format!("{}/{}", node_path, name)
211 }
212 }
213 None => node_path.clone(),
214 };
215
216 if link.link_type == LinkType::Blob {
219 entries.push(WalkEntry {
220 path: child_path,
221 hash: link.hash,
222 link_type: LinkType::Blob,
223 size: link.size,
224 key: link.key,
225 });
226 if let Some(counter) = progress {
227 counter.fetch_add(1, Ordering::Relaxed);
228 }
229 continue;
230 }
231
232 let child_cid = Cid {
234 hash: link.hash,
235 key: link.key,
236 };
237 pending.push_back((child_cid, child_path));
238 }
239 }
240 }
241
242 Ok(entries)
243 }
244
245 pub fn walk_stream(
247 &self,
248 cid: Cid,
249 initial_path: String,
250 ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
251 Box::pin(stream::unfold(
252 WalkStreamState::Init {
253 cid,
254 path: initial_path,
255 tree: self,
256 },
257 |state| async move {
258 match state {
259 WalkStreamState::Init { cid, path, tree } => {
260 let data = match tree.store.get(&cid.hash).await {
261 Ok(Some(d)) => d,
262 Ok(None) => return None,
263 Err(e) => {
264 return Some((
265 Err(HashTreeError::Store(e.to_string())),
266 WalkStreamState::Done,
267 ))
268 }
269 };
270
271 let data = if let Some(key) = &cid.key {
273 match decrypt_chk(&data, key) {
274 Ok(d) => d,
275 Err(e) => {
276 return Some((
277 Err(HashTreeError::Decryption(e.to_string())),
278 WalkStreamState::Done,
279 ))
280 }
281 }
282 } else {
283 data
284 };
285
286 let node = match try_decode_tree_node(&data) {
287 Some(n) => n,
288 None => {
289 let entry = WalkEntry {
291 path,
292 hash: cid.hash,
293 link_type: LinkType::Blob,
294 size: data.len() as u64,
295 key: cid.key,
296 };
297 return Some((Ok(entry), WalkStreamState::Done));
298 }
299 };
300
301 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
302 let entry = WalkEntry {
303 path: path.clone(),
304 hash: cid.hash,
305 link_type: node.node_type,
306 size: node_size,
307 key: cid.key,
308 };
309
310 let mut stack: Vec<WalkStackItem> = Vec::new();
312 let uses_legacy_fanout = Self::node_uses_legacy_directory_fanout(&node);
313 for link in node.links.into_iter().rev() {
314 let is_internal = Self::is_internal_directory_link_with_legacy_fanout(
315 &link,
316 uses_legacy_fanout,
317 );
318 let child_path = match &link.name {
319 Some(name) if !is_internal => {
320 if path.is_empty() {
321 name.clone()
322 } else {
323 format!("{}/{}", path, name)
324 }
325 }
326 _ => path.clone(),
327 };
328 stack.push(WalkStackItem {
330 hash: link.hash,
331 path: child_path,
332 key: link.key,
333 });
334 }
335
336 Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
337 }
338 WalkStreamState::Processing { mut stack, tree } => {
339 tree.process_walk_stack(&mut stack).await
340 }
341 WalkStreamState::Done => None,
342 }
343 },
344 ))
345 }
346
347 async fn process_walk_stack<'a>(
348 &'a self,
349 stack: &mut Vec<WalkStackItem>,
350 ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
351 while let Some(item) = stack.pop() {
352 let data = match self.store.get(&item.hash).await {
353 Ok(Some(d)) => d,
354 Ok(None) => continue,
355 Err(e) => {
356 return Some((
357 Err(HashTreeError::Store(e.to_string())),
358 WalkStreamState::Done,
359 ))
360 }
361 };
362
363 let node = match try_decode_tree_node(&data) {
364 Some(n) => n,
365 None => {
366 let entry = WalkEntry {
368 path: item.path,
369 hash: item.hash,
370 link_type: LinkType::Blob,
371 size: data.len() as u64,
372 key: item.key,
373 };
374 return Some((
375 Ok(entry),
376 WalkStreamState::Processing {
377 stack: std::mem::take(stack),
378 tree: self,
379 },
380 ));
381 }
382 };
383
384 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
385 let entry = WalkEntry {
386 path: item.path.clone(),
387 hash: item.hash,
388 link_type: node.node_type,
389 size: node_size,
390 key: None, };
392
393 let uses_legacy_fanout = Self::node_uses_legacy_directory_fanout(&node);
395 for link in node.links.into_iter().rev() {
396 let is_internal =
397 Self::is_internal_directory_link_with_legacy_fanout(&link, uses_legacy_fanout);
398 let child_path = match &link.name {
399 Some(name) if !is_internal => {
400 if item.path.is_empty() {
401 name.clone()
402 } else {
403 format!("{}/{}", item.path, name)
404 }
405 }
406 _ => item.path.clone(),
407 };
408 stack.push(WalkStackItem {
409 hash: link.hash,
410 path: child_path,
411 key: link.key,
412 });
413 }
414
415 return Some((
416 Ok(entry),
417 WalkStreamState::Processing {
418 stack: std::mem::take(stack),
419 tree: self,
420 },
421 ));
422 }
423 None
424 }
425}
426
427struct WalkStackItem {
428 hash: Hash,
429 path: String,
430 key: Option<[u8; 32]>,
431}
432
433enum WalkStreamState<'a, S: Store> {
434 Init {
435 cid: Cid,
436 path: String,
437 tree: &'a HashTree<S>,
438 },
439 Processing {
440 stack: Vec<WalkStackItem>,
441 tree: &'a HashTree<S>,
442 },
443 Done,
444}