1use super::*;
2
3impl<S: Store> HashTree<S> {
4 pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
6 let mut entries = Vec::new();
7 self.walk_recursive(cid, path, &mut entries).await?;
8 Ok(entries)
9 }
10
11 async fn walk_recursive(
12 &self,
13 cid: &Cid,
14 path: &str,
15 entries: &mut Vec<WalkEntry>,
16 ) -> Result<(), HashTreeError> {
17 let data = match self
18 .store
19 .get(&cid.hash)
20 .await
21 .map_err(|e| HashTreeError::Store(e.to_string()))?
22 {
23 Some(d) => d,
24 None => return Ok(()),
25 };
26
27 let data = if let Some(key) = &cid.key {
29 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
30 } else {
31 data
32 };
33
34 let node = match try_decode_tree_node(&data) {
35 Some(n) => n,
36 None => {
37 entries.push(WalkEntry {
38 path: path.to_string(),
39 hash: cid.hash,
40 link_type: LinkType::Blob,
41 size: data.len() as u64,
42 key: cid.key,
43 });
44 return Ok(());
45 }
46 };
47
48 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
49 entries.push(WalkEntry {
50 path: path.to_string(),
51 hash: cid.hash,
52 link_type: node.node_type,
53 size: node_size,
54 key: cid.key,
55 });
56
57 for link in &node.links {
58 let child_path = match &link.name {
59 Some(name) => {
60 if Self::is_internal_directory_link(&node, link) {
61 let sub_cid = Cid {
62 hash: link.hash,
63 key: link.key,
64 };
65 Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
66 continue;
67 }
68 if path.is_empty() {
69 name.clone()
70 } else {
71 format!("{}/{}", path, name)
72 }
73 }
74 None => path.to_string(),
75 };
76
77 let child_cid = Cid {
79 hash: link.hash,
80 key: link.key,
81 };
82 Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
83 }
84
85 Ok(())
86 }
87
88 pub async fn walk_parallel(
91 &self,
92 cid: &Cid,
93 path: &str,
94 concurrency: usize,
95 ) -> Result<Vec<WalkEntry>, HashTreeError> {
96 self.walk_parallel_with_progress(cid, path, concurrency, None)
97 .await
98 }
99
100 pub async fn walk_parallel_with_progress(
107 &self,
108 cid: &Cid,
109 path: &str,
110 concurrency: usize,
111 progress: Option<&std::sync::atomic::AtomicUsize>,
112 ) -> Result<Vec<WalkEntry>, HashTreeError> {
113 use futures::stream::{FuturesUnordered, StreamExt};
114 use std::collections::VecDeque;
115 use std::sync::atomic::Ordering;
116
117 let mut entries = Vec::new();
118 let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
119 let mut active = FuturesUnordered::new();
120
121 pending.push_back((cid.clone(), path.to_string()));
123
124 loop {
125 while active.len() < concurrency {
127 if let Some((node_cid, node_path)) = pending.pop_front() {
128 let store = &self.store;
129 let fut = async move {
130 let data = store
131 .get(&node_cid.hash)
132 .await
133 .map_err(|e| HashTreeError::Store(e.to_string()))?;
134 Ok::<_, HashTreeError>((node_cid, node_path, data))
135 };
136 active.push(fut);
137 } else {
138 break;
139 }
140 }
141
142 if active.is_empty() {
144 break;
145 }
146
147 if let Some(result) = active.next().await {
149 let (node_cid, node_path, data) = result?;
150
151 if let Some(counter) = progress {
153 counter.fetch_add(1, Ordering::Relaxed);
154 }
155
156 let data = match data {
157 Some(d) => d,
158 None => continue,
159 };
160
161 let data = if let Some(key) = &node_cid.key {
163 decrypt_chk(&data, key).map_err(|e| {
164 HashTreeError::Decryption(format!(
165 "{} at path '{}' hash {} key {}",
166 e,
167 node_path,
168 hex::encode(node_cid.hash),
169 hex::encode(key)
170 ))
171 })?
172 } else {
173 data
174 };
175
176 let node = match try_decode_tree_node(&data) {
177 Some(n) => n,
178 None => {
179 entries.push(WalkEntry {
181 path: node_path,
182 hash: node_cid.hash,
183 link_type: LinkType::Blob,
184 size: data.len() as u64,
185 key: node_cid.key,
186 });
187 continue;
188 }
189 };
190
191 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
193 entries.push(WalkEntry {
194 path: node_path.clone(),
195 hash: node_cid.hash,
196 link_type: node.node_type,
197 size: node_size,
198 key: node_cid.key,
199 });
200
201 for link in &node.links {
203 let child_path = match &link.name {
204 Some(name) => {
205 if Self::is_internal_directory_link(&node, link) {
206 let sub_cid = Cid {
207 hash: link.hash,
208 key: link.key,
209 };
210 pending.push_back((sub_cid, node_path.clone()));
211 continue;
212 }
213 if node_path.is_empty() {
214 name.clone()
215 } else {
216 format!("{}/{}", node_path, name)
217 }
218 }
219 None => node_path.clone(),
220 };
221
222 if link.link_type == LinkType::Blob {
225 entries.push(WalkEntry {
226 path: child_path,
227 hash: link.hash,
228 link_type: LinkType::Blob,
229 size: link.size,
230 key: link.key,
231 });
232 if let Some(counter) = progress {
233 counter.fetch_add(1, Ordering::Relaxed);
234 }
235 continue;
236 }
237
238 let child_cid = Cid {
240 hash: link.hash,
241 key: link.key,
242 };
243 pending.push_back((child_cid, child_path));
244 }
245 }
246 }
247
248 Ok(entries)
249 }
250
251 pub fn walk_stream(
253 &self,
254 cid: Cid,
255 initial_path: String,
256 ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
257 Box::pin(stream::unfold(
258 WalkStreamState::Init {
259 cid,
260 path: initial_path,
261 tree: self,
262 },
263 |state| async move {
264 match state {
265 WalkStreamState::Init { cid, path, tree } => {
266 let data = match tree.store.get(&cid.hash).await {
267 Ok(Some(d)) => d,
268 Ok(None) => return None,
269 Err(e) => {
270 return Some((
271 Err(HashTreeError::Store(e.to_string())),
272 WalkStreamState::Done,
273 ))
274 }
275 };
276
277 let data = if let Some(key) = &cid.key {
279 match decrypt_chk(&data, key) {
280 Ok(d) => d,
281 Err(e) => {
282 return Some((
283 Err(HashTreeError::Decryption(format!(
284 "{} at path '{}' hash {} key {}",
285 e,
286 path,
287 hex::encode(cid.hash),
288 hex::encode(key)
289 ))),
290 WalkStreamState::Done,
291 ))
292 }
293 }
294 } else {
295 data
296 };
297
298 let node = match try_decode_tree_node(&data) {
299 Some(n) => n,
300 None => {
301 let entry = WalkEntry {
303 path,
304 hash: cid.hash,
305 link_type: LinkType::Blob,
306 size: data.len() as u64,
307 key: cid.key,
308 };
309 return Some((Ok(entry), WalkStreamState::Done));
310 }
311 };
312
313 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
314 let entry = WalkEntry {
315 path: path.clone(),
316 hash: cid.hash,
317 link_type: node.node_type,
318 size: node_size,
319 key: cid.key,
320 };
321
322 let mut stack: Vec<WalkStackItem> = Vec::new();
324 let uses_fanout = Self::node_uses_directory_fanout(&node);
325 for link in node.links.into_iter().rev() {
326 let is_internal =
327 Self::is_internal_directory_link_with_fanout(&link, uses_fanout);
328 let child_path = match &link.name {
329 Some(name) if !is_internal => {
330 if path.is_empty() {
331 name.clone()
332 } else {
333 format!("{}/{}", path, name)
334 }
335 }
336 _ => path.clone(),
337 };
338 stack.push(WalkStackItem {
340 hash: link.hash,
341 path: child_path,
342 key: link.key,
343 });
344 }
345
346 Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
347 }
348 WalkStreamState::Processing { mut stack, tree } => {
349 tree.process_walk_stack(&mut stack).await
350 }
351 WalkStreamState::Done => None,
352 }
353 },
354 ))
355 }
356
357 async fn process_walk_stack<'a>(
358 &'a self,
359 stack: &mut Vec<WalkStackItem>,
360 ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
361 while let Some(item) = stack.pop() {
362 let data = match self.store.get(&item.hash).await {
363 Ok(Some(d)) => d,
364 Ok(None) => continue,
365 Err(e) => {
366 return Some((
367 Err(HashTreeError::Store(e.to_string())),
368 WalkStreamState::Done,
369 ))
370 }
371 };
372
373 let data = if let Some(key) = &item.key {
374 match decrypt_chk(&data, key) {
375 Ok(d) => d,
376 Err(e) => {
377 return Some((
378 Err(HashTreeError::Decryption(format!(
379 "{} at path '{}' hash {} key {}",
380 e,
381 item.path,
382 hex::encode(item.hash),
383 hex::encode(key)
384 ))),
385 WalkStreamState::Done,
386 ))
387 }
388 }
389 } else {
390 data
391 };
392
393 let node = match try_decode_tree_node(&data) {
394 Some(n) => n,
395 None => {
396 let entry = WalkEntry {
398 path: item.path,
399 hash: item.hash,
400 link_type: LinkType::Blob,
401 size: data.len() as u64,
402 key: item.key,
403 };
404 return Some((
405 Ok(entry),
406 WalkStreamState::Processing {
407 stack: std::mem::take(stack),
408 tree: self,
409 },
410 ));
411 }
412 };
413
414 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
415 let entry = WalkEntry {
416 path: item.path.clone(),
417 hash: item.hash,
418 link_type: node.node_type,
419 size: node_size,
420 key: item.key,
421 };
422
423 let uses_fanout = Self::node_uses_directory_fanout(&node);
425 for link in node.links.into_iter().rev() {
426 let is_internal = Self::is_internal_directory_link_with_fanout(&link, uses_fanout);
427 let child_path = match &link.name {
428 Some(name) if !is_internal => {
429 if item.path.is_empty() {
430 name.clone()
431 } else {
432 format!("{}/{}", item.path, name)
433 }
434 }
435 _ => item.path.clone(),
436 };
437 stack.push(WalkStackItem {
438 hash: link.hash,
439 path: child_path,
440 key: link.key,
441 });
442 }
443
444 return Some((
445 Ok(entry),
446 WalkStreamState::Processing {
447 stack: std::mem::take(stack),
448 tree: self,
449 },
450 ));
451 }
452 None
453 }
454}
455
456struct WalkStackItem {
457 hash: Hash,
458 path: String,
459 key: Option<[u8; 32]>,
460}
461
462enum WalkStreamState<'a, S: Store> {
463 Init {
464 cid: Cid,
465 path: String,
466 tree: &'a HashTree<S>,
467 },
468 Processing {
469 stack: Vec<WalkStackItem>,
470 tree: &'a HashTree<S>,
471 },
472 Done,
473}