Skip to main content

hashtree_core/hashtree/
walk.rs

1use super::*;
2
3impl<S: Store> HashTree<S> {
4    /// Walk entire tree depth-first (returns Vec)
5    pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
6        let mut entries = Vec::new();
7        self.walk_recursive(cid, path, &mut entries).await?;
8        Ok(entries)
9    }
10
11    async fn walk_recursive(
12        &self,
13        cid: &Cid,
14        path: &str,
15        entries: &mut Vec<WalkEntry>,
16    ) -> Result<(), HashTreeError> {
17        let data = match self
18            .store
19            .get(&cid.hash)
20            .await
21            .map_err(|e| HashTreeError::Store(e.to_string()))?
22        {
23            Some(d) => d,
24            None => return Ok(()),
25        };
26
27        // Decrypt if key is present
28        let data = if let Some(key) = &cid.key {
29            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
30        } else {
31            data
32        };
33
34        let node = match try_decode_tree_node(&data) {
35            Some(n) => n,
36            None => {
37                entries.push(WalkEntry {
38                    path: path.to_string(),
39                    hash: cid.hash,
40                    link_type: LinkType::Blob,
41                    size: data.len() as u64,
42                    key: cid.key,
43                });
44                return Ok(());
45            }
46        };
47
48        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
49        entries.push(WalkEntry {
50            path: path.to_string(),
51            hash: cid.hash,
52            link_type: node.node_type,
53            size: node_size,
54            key: cid.key,
55        });
56
57        for link in &node.links {
58            let child_path = match &link.name {
59                Some(name) => {
60                    if Self::is_internal_directory_link(&node, link) {
61                        let sub_cid = Cid {
62                            hash: link.hash,
63                            key: link.key,
64                        };
65                        Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
66                        continue;
67                    }
68                    if path.is_empty() {
69                        name.clone()
70                    } else {
71                        format!("{}/{}", path, name)
72                    }
73                }
74                None => path.to_string(),
75            };
76
77            // Child nodes use their own key from link
78            let child_cid = Cid {
79                hash: link.hash,
80                key: link.key,
81            };
82            Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
83        }
84
85        Ok(())
86    }
87
88    /// Walk entire tree with parallel fetching
89    /// Uses a work-stealing approach: always keeps `concurrency` requests in flight
90    pub async fn walk_parallel(
91        &self,
92        cid: &Cid,
93        path: &str,
94        concurrency: usize,
95    ) -> Result<Vec<WalkEntry>, HashTreeError> {
96        self.walk_parallel_with_progress(cid, path, concurrency, None)
97            .await
98    }
99
100    /// Walk entire tree with parallel fetching and optional progress counter
101    /// The counter is incremented for each node fetched (not just entries found)
102    ///
103    /// OPTIMIZATION: Blobs are NOT fetched - their metadata (hash, size, link_type)
104    /// comes from the parent node's link, so we just add them directly to entries.
105    /// This avoids downloading file contents during tree traversal.
106    pub async fn walk_parallel_with_progress(
107        &self,
108        cid: &Cid,
109        path: &str,
110        concurrency: usize,
111        progress: Option<&std::sync::atomic::AtomicUsize>,
112    ) -> Result<Vec<WalkEntry>, HashTreeError> {
113        use futures::stream::{FuturesUnordered, StreamExt};
114        use std::collections::VecDeque;
115        use std::sync::atomic::Ordering;
116
117        let mut entries = Vec::new();
118        let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
119        let mut active = FuturesUnordered::new();
120
121        // Seed with root
122        pending.push_back((cid.clone(), path.to_string()));
123
124        loop {
125            // Fill up to concurrency limit from pending queue
126            while active.len() < concurrency {
127                if let Some((node_cid, node_path)) = pending.pop_front() {
128                    let store = &self.store;
129                    let fut = async move {
130                        let data = store
131                            .get(&node_cid.hash)
132                            .await
133                            .map_err(|e| HashTreeError::Store(e.to_string()))?;
134                        Ok::<_, HashTreeError>((node_cid, node_path, data))
135                    };
136                    active.push(fut);
137                } else {
138                    break;
139                }
140            }
141
142            // If nothing active, we're done
143            if active.is_empty() {
144                break;
145            }
146
147            // Wait for any future to complete
148            if let Some(result) = active.next().await {
149                let (node_cid, node_path, data) = result?;
150
151                // Update progress counter
152                if let Some(counter) = progress {
153                    counter.fetch_add(1, Ordering::Relaxed);
154                }
155
156                let data = match data {
157                    Some(d) => d,
158                    None => continue,
159                };
160
161                // Decrypt if key is present
162                let data = if let Some(key) = &node_cid.key {
163                    decrypt_chk(&data, key).map_err(|e| {
164                        HashTreeError::Decryption(format!(
165                            "{} at path '{}' hash {} key {}",
166                            e,
167                            node_path,
168                            hex::encode(node_cid.hash),
169                            hex::encode(key)
170                        ))
171                    })?
172                } else {
173                    data
174                };
175
176                let node = match try_decode_tree_node(&data) {
177                    Some(n) => n,
178                    None => {
179                        // It's a blob/file - this case only happens for root
180                        entries.push(WalkEntry {
181                            path: node_path,
182                            hash: node_cid.hash,
183                            link_type: LinkType::Blob,
184                            size: data.len() as u64,
185                            key: node_cid.key,
186                        });
187                        continue;
188                    }
189                };
190
191                // It's a directory/file node
192                let node_size: u64 = node.links.iter().map(|l| l.size).sum();
193                entries.push(WalkEntry {
194                    path: node_path.clone(),
195                    hash: node_cid.hash,
196                    link_type: node.node_type,
197                    size: node_size,
198                    key: node_cid.key,
199                });
200
201                // Queue children - but DON'T fetch blobs, just add them directly
202                for link in &node.links {
203                    let child_path = match &link.name {
204                        Some(name) => {
205                            if Self::is_internal_directory_link(&node, link) {
206                                let sub_cid = Cid {
207                                    hash: link.hash,
208                                    key: link.key,
209                                };
210                                pending.push_back((sub_cid, node_path.clone()));
211                                continue;
212                            }
213                            if node_path.is_empty() {
214                                name.clone()
215                            } else {
216                                format!("{}/{}", node_path, name)
217                            }
218                        }
219                        None => node_path.clone(),
220                    };
221
222                    // OPTIMIZATION: If it's a blob, add entry directly without fetching
223                    // The link already contains all the metadata we need
224                    if link.link_type == LinkType::Blob {
225                        entries.push(WalkEntry {
226                            path: child_path,
227                            hash: link.hash,
228                            link_type: LinkType::Blob,
229                            size: link.size,
230                            key: link.key,
231                        });
232                        if let Some(counter) = progress {
233                            counter.fetch_add(1, Ordering::Relaxed);
234                        }
235                        continue;
236                    }
237
238                    // For tree nodes (File/Dir), we need to fetch to see their children
239                    let child_cid = Cid {
240                        hash: link.hash,
241                        key: link.key,
242                    };
243                    pending.push_back((child_cid, child_path));
244                }
245            }
246        }
247
248        Ok(entries)
249    }
250
251    /// Walk tree as stream
252    pub fn walk_stream(
253        &self,
254        cid: Cid,
255        initial_path: String,
256    ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
257        Box::pin(stream::unfold(
258            WalkStreamState::Init {
259                cid,
260                path: initial_path,
261                tree: self,
262            },
263            |state| async move {
264                match state {
265                    WalkStreamState::Init { cid, path, tree } => {
266                        let data = match tree.store.get(&cid.hash).await {
267                            Ok(Some(d)) => d,
268                            Ok(None) => return None,
269                            Err(e) => {
270                                return Some((
271                                    Err(HashTreeError::Store(e.to_string())),
272                                    WalkStreamState::Done,
273                                ))
274                            }
275                        };
276
277                        // Decrypt if key is present
278                        let data = if let Some(key) = &cid.key {
279                            match decrypt_chk(&data, key) {
280                                Ok(d) => d,
281                                Err(e) => {
282                                    return Some((
283                                        Err(HashTreeError::Decryption(format!(
284                                            "{} at path '{}' hash {} key {}",
285                                            e,
286                                            path,
287                                            hex::encode(cid.hash),
288                                            hex::encode(key)
289                                        ))),
290                                        WalkStreamState::Done,
291                                    ))
292                                }
293                            }
294                        } else {
295                            data
296                        };
297
298                        let node = match try_decode_tree_node(&data) {
299                            Some(n) => n,
300                            None => {
301                                // Blob data
302                                let entry = WalkEntry {
303                                    path,
304                                    hash: cid.hash,
305                                    link_type: LinkType::Blob,
306                                    size: data.len() as u64,
307                                    key: cid.key,
308                                };
309                                return Some((Ok(entry), WalkStreamState::Done));
310                            }
311                        };
312
313                        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
314                        let entry = WalkEntry {
315                            path: path.clone(),
316                            hash: cid.hash,
317                            link_type: node.node_type,
318                            size: node_size,
319                            key: cid.key,
320                        };
321
322                        // Create stack with children to process
323                        let mut stack: Vec<WalkStackItem> = Vec::new();
324                        let uses_fanout = Self::node_uses_directory_fanout(&node);
325                        for link in node.links.into_iter().rev() {
326                            let is_internal =
327                                Self::is_internal_directory_link_with_fanout(&link, uses_fanout);
328                            let child_path = match &link.name {
329                                Some(name) if !is_internal => {
330                                    if path.is_empty() {
331                                        name.clone()
332                                    } else {
333                                        format!("{}/{}", path, name)
334                                    }
335                                }
336                                _ => path.clone(),
337                            };
338                            // Child nodes use their own key from link
339                            stack.push(WalkStackItem {
340                                hash: link.hash,
341                                path: child_path,
342                                key: link.key,
343                            });
344                        }
345
346                        Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
347                    }
348                    WalkStreamState::Processing { mut stack, tree } => {
349                        tree.process_walk_stack(&mut stack).await
350                    }
351                    WalkStreamState::Done => None,
352                }
353            },
354        ))
355    }
356
357    async fn process_walk_stack<'a>(
358        &'a self,
359        stack: &mut Vec<WalkStackItem>,
360    ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
361        while let Some(item) = stack.pop() {
362            let data = match self.store.get(&item.hash).await {
363                Ok(Some(d)) => d,
364                Ok(None) => continue,
365                Err(e) => {
366                    return Some((
367                        Err(HashTreeError::Store(e.to_string())),
368                        WalkStreamState::Done,
369                    ))
370                }
371            };
372
373            let data = if let Some(key) = &item.key {
374                match decrypt_chk(&data, key) {
375                    Ok(d) => d,
376                    Err(e) => {
377                        return Some((
378                            Err(HashTreeError::Decryption(format!(
379                                "{} at path '{}' hash {} key {}",
380                                e,
381                                item.path,
382                                hex::encode(item.hash),
383                                hex::encode(key)
384                            ))),
385                            WalkStreamState::Done,
386                        ))
387                    }
388                }
389            } else {
390                data
391            };
392
393            let node = match try_decode_tree_node(&data) {
394                Some(n) => n,
395                None => {
396                    // Blob data
397                    let entry = WalkEntry {
398                        path: item.path,
399                        hash: item.hash,
400                        link_type: LinkType::Blob,
401                        size: data.len() as u64,
402                        key: item.key,
403                    };
404                    return Some((
405                        Ok(entry),
406                        WalkStreamState::Processing {
407                            stack: std::mem::take(stack),
408                            tree: self,
409                        },
410                    ));
411                }
412            };
413
414            let node_size: u64 = node.links.iter().map(|l| l.size).sum();
415            let entry = WalkEntry {
416                path: item.path.clone(),
417                hash: item.hash,
418                link_type: node.node_type,
419                size: node_size,
420                key: item.key,
421            };
422
423            // Push children to stack
424            let uses_fanout = Self::node_uses_directory_fanout(&node);
425            for link in node.links.into_iter().rev() {
426                let is_internal = Self::is_internal_directory_link_with_fanout(&link, uses_fanout);
427                let child_path = match &link.name {
428                    Some(name) if !is_internal => {
429                        if item.path.is_empty() {
430                            name.clone()
431                        } else {
432                            format!("{}/{}", item.path, name)
433                        }
434                    }
435                    _ => item.path.clone(),
436                };
437                stack.push(WalkStackItem {
438                    hash: link.hash,
439                    path: child_path,
440                    key: link.key,
441                });
442            }
443
444            return Some((
445                Ok(entry),
446                WalkStreamState::Processing {
447                    stack: std::mem::take(stack),
448                    tree: self,
449                },
450            ));
451        }
452        None
453    }
454}
455
456struct WalkStackItem {
457    hash: Hash,
458    path: String,
459    key: Option<[u8; 32]>,
460}
461
462enum WalkStreamState<'a, S: Store> {
463    Init {
464        cid: Cid,
465        path: String,
466        tree: &'a HashTree<S>,
467    },
468    Processing {
469        stack: Vec<WalkStackItem>,
470        tree: &'a HashTree<S>,
471    },
472    Done,
473}