Skip to main content

hashtree_core/hashtree/
walk.rs

1use super::*;
2
3impl<S: Store> HashTree<S> {
4    /// Walk entire tree depth-first (returns Vec)
5    pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
6        let mut entries = Vec::new();
7        self.walk_recursive(cid, path, &mut entries).await?;
8        Ok(entries)
9    }
10
11    async fn walk_recursive(
12        &self,
13        cid: &Cid,
14        path: &str,
15        entries: &mut Vec<WalkEntry>,
16    ) -> Result<(), HashTreeError> {
17        let data = match self
18            .store
19            .get(&cid.hash)
20            .await
21            .map_err(|e| HashTreeError::Store(e.to_string()))?
22        {
23            Some(d) => d,
24            None => return Ok(()),
25        };
26
27        // Decrypt if key is present
28        let data = if let Some(key) = &cid.key {
29            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
30        } else {
31            data
32        };
33
34        let node = match try_decode_tree_node(&data) {
35            Some(n) => n,
36            None => {
37                entries.push(WalkEntry {
38                    path: path.to_string(),
39                    hash: cid.hash,
40                    link_type: LinkType::Blob,
41                    size: data.len() as u64,
42                    key: cid.key,
43                });
44                return Ok(());
45            }
46        };
47
48        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
49        entries.push(WalkEntry {
50            path: path.to_string(),
51            hash: cid.hash,
52            link_type: node.node_type,
53            size: node_size,
54            key: cid.key,
55        });
56
57        for link in &node.links {
58            let child_path = match &link.name {
59                Some(name) => {
60                    if Self::is_internal_directory_link(&node, link) {
61                        // Internal nodes inherit parent's key
62                        let sub_cid = Cid {
63                            hash: link.hash,
64                            key: cid.key,
65                        };
66                        Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
67                        continue;
68                    }
69                    if path.is_empty() {
70                        name.clone()
71                    } else {
72                        format!("{}/{}", path, name)
73                    }
74                }
75                None => path.to_string(),
76            };
77
78            // Child nodes use their own key from link
79            let child_cid = Cid {
80                hash: link.hash,
81                key: link.key,
82            };
83            Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
84        }
85
86        Ok(())
87    }
88
89    /// Walk entire tree with parallel fetching
90    /// Uses a work-stealing approach: always keeps `concurrency` requests in flight
91    pub async fn walk_parallel(
92        &self,
93        cid: &Cid,
94        path: &str,
95        concurrency: usize,
96    ) -> Result<Vec<WalkEntry>, HashTreeError> {
97        self.walk_parallel_with_progress(cid, path, concurrency, None)
98            .await
99    }
100
101    /// Walk entire tree with parallel fetching and optional progress counter
102    /// The counter is incremented for each node fetched (not just entries found)
103    ///
104    /// OPTIMIZATION: Blobs are NOT fetched - their metadata (hash, size, link_type)
105    /// comes from the parent node's link, so we just add them directly to entries.
106    /// This avoids downloading file contents during tree traversal.
107    pub async fn walk_parallel_with_progress(
108        &self,
109        cid: &Cid,
110        path: &str,
111        concurrency: usize,
112        progress: Option<&std::sync::atomic::AtomicUsize>,
113    ) -> Result<Vec<WalkEntry>, HashTreeError> {
114        use futures::stream::{FuturesUnordered, StreamExt};
115        use std::collections::VecDeque;
116        use std::sync::atomic::Ordering;
117
118        let mut entries = Vec::new();
119        let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
120        let mut active = FuturesUnordered::new();
121
122        // Seed with root
123        pending.push_back((cid.clone(), path.to_string()));
124
125        loop {
126            // Fill up to concurrency limit from pending queue
127            while active.len() < concurrency {
128                if let Some((node_cid, node_path)) = pending.pop_front() {
129                    let store = &self.store;
130                    let fut = async move {
131                        let data = store
132                            .get(&node_cid.hash)
133                            .await
134                            .map_err(|e| HashTreeError::Store(e.to_string()))?;
135                        Ok::<_, HashTreeError>((node_cid, node_path, data))
136                    };
137                    active.push(fut);
138                } else {
139                    break;
140                }
141            }
142
143            // If nothing active, we're done
144            if active.is_empty() {
145                break;
146            }
147
148            // Wait for any future to complete
149            if let Some(result) = active.next().await {
150                let (node_cid, node_path, data) = result?;
151
152                // Update progress counter
153                if let Some(counter) = progress {
154                    counter.fetch_add(1, Ordering::Relaxed);
155                }
156
157                let data = match data {
158                    Some(d) => d,
159                    None => continue,
160                };
161
162                // Decrypt if key is present
163                let data = if let Some(key) = &node_cid.key {
164                    decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
165                } else {
166                    data
167                };
168
169                let node = match try_decode_tree_node(&data) {
170                    Some(n) => n,
171                    None => {
172                        // It's a blob/file - this case only happens for root
173                        entries.push(WalkEntry {
174                            path: node_path,
175                            hash: node_cid.hash,
176                            link_type: LinkType::Blob,
177                            size: data.len() as u64,
178                            key: node_cid.key,
179                        });
180                        continue;
181                    }
182                };
183
184                // It's a directory/file node
185                let node_size: u64 = node.links.iter().map(|l| l.size).sum();
186                entries.push(WalkEntry {
187                    path: node_path.clone(),
188                    hash: node_cid.hash,
189                    link_type: node.node_type,
190                    size: node_size,
191                    key: node_cid.key,
192                });
193
194                // Queue children - but DON'T fetch blobs, just add them directly
195                for link in &node.links {
196                    let child_path = match &link.name {
197                        Some(name) => {
198                            if Self::is_internal_directory_link(&node, link) {
199                                // Internal chunked nodes - inherit parent's key, same path
200                                let sub_cid = Cid {
201                                    hash: link.hash,
202                                    key: node_cid.key,
203                                };
204                                pending.push_back((sub_cid, node_path.clone()));
205                                continue;
206                            }
207                            if node_path.is_empty() {
208                                name.clone()
209                            } else {
210                                format!("{}/{}", node_path, name)
211                            }
212                        }
213                        None => node_path.clone(),
214                    };
215
216                    // OPTIMIZATION: If it's a blob, add entry directly without fetching
217                    // The link already contains all the metadata we need
218                    if link.link_type == LinkType::Blob {
219                        entries.push(WalkEntry {
220                            path: child_path,
221                            hash: link.hash,
222                            link_type: LinkType::Blob,
223                            size: link.size,
224                            key: link.key,
225                        });
226                        if let Some(counter) = progress {
227                            counter.fetch_add(1, Ordering::Relaxed);
228                        }
229                        continue;
230                    }
231
232                    // For tree nodes (File/Dir), we need to fetch to see their children
233                    let child_cid = Cid {
234                        hash: link.hash,
235                        key: link.key,
236                    };
237                    pending.push_back((child_cid, child_path));
238                }
239            }
240        }
241
242        Ok(entries)
243    }
244
245    /// Walk tree as stream
246    pub fn walk_stream(
247        &self,
248        cid: Cid,
249        initial_path: String,
250    ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
251        Box::pin(stream::unfold(
252            WalkStreamState::Init {
253                cid,
254                path: initial_path,
255                tree: self,
256            },
257            |state| async move {
258                match state {
259                    WalkStreamState::Init { cid, path, tree } => {
260                        let data = match tree.store.get(&cid.hash).await {
261                            Ok(Some(d)) => d,
262                            Ok(None) => return None,
263                            Err(e) => {
264                                return Some((
265                                    Err(HashTreeError::Store(e.to_string())),
266                                    WalkStreamState::Done,
267                                ))
268                            }
269                        };
270
271                        // Decrypt if key is present
272                        let data = if let Some(key) = &cid.key {
273                            match decrypt_chk(&data, key) {
274                                Ok(d) => d,
275                                Err(e) => {
276                                    return Some((
277                                        Err(HashTreeError::Decryption(e.to_string())),
278                                        WalkStreamState::Done,
279                                    ))
280                                }
281                            }
282                        } else {
283                            data
284                        };
285
286                        let node = match try_decode_tree_node(&data) {
287                            Some(n) => n,
288                            None => {
289                                // Blob data
290                                let entry = WalkEntry {
291                                    path,
292                                    hash: cid.hash,
293                                    link_type: LinkType::Blob,
294                                    size: data.len() as u64,
295                                    key: cid.key,
296                                };
297                                return Some((Ok(entry), WalkStreamState::Done));
298                            }
299                        };
300
301                        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
302                        let entry = WalkEntry {
303                            path: path.clone(),
304                            hash: cid.hash,
305                            link_type: node.node_type,
306                            size: node_size,
307                            key: cid.key,
308                        };
309
310                        // Create stack with children to process
311                        let mut stack: Vec<WalkStackItem> = Vec::new();
312                        let uses_legacy_fanout = Self::node_uses_legacy_directory_fanout(&node);
313                        for link in node.links.into_iter().rev() {
314                            let is_internal = Self::is_internal_directory_link_with_legacy_fanout(
315                                &link,
316                                uses_legacy_fanout,
317                            );
318                            let child_path = match &link.name {
319                                Some(name) if !is_internal => {
320                                    if path.is_empty() {
321                                        name.clone()
322                                    } else {
323                                        format!("{}/{}", path, name)
324                                    }
325                                }
326                                _ => path.clone(),
327                            };
328                            // Child nodes use their own key from link
329                            stack.push(WalkStackItem {
330                                hash: link.hash,
331                                path: child_path,
332                                key: link.key,
333                            });
334                        }
335
336                        Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
337                    }
338                    WalkStreamState::Processing { mut stack, tree } => {
339                        tree.process_walk_stack(&mut stack).await
340                    }
341                    WalkStreamState::Done => None,
342                }
343            },
344        ))
345    }
346
347    async fn process_walk_stack<'a>(
348        &'a self,
349        stack: &mut Vec<WalkStackItem>,
350    ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
351        while let Some(item) = stack.pop() {
352            let data = match self.store.get(&item.hash).await {
353                Ok(Some(d)) => d,
354                Ok(None) => continue,
355                Err(e) => {
356                    return Some((
357                        Err(HashTreeError::Store(e.to_string())),
358                        WalkStreamState::Done,
359                    ))
360                }
361            };
362
363            let node = match try_decode_tree_node(&data) {
364                Some(n) => n,
365                None => {
366                    // Blob data
367                    let entry = WalkEntry {
368                        path: item.path,
369                        hash: item.hash,
370                        link_type: LinkType::Blob,
371                        size: data.len() as u64,
372                        key: item.key,
373                    };
374                    return Some((
375                        Ok(entry),
376                        WalkStreamState::Processing {
377                            stack: std::mem::take(stack),
378                            tree: self,
379                        },
380                    ));
381                }
382            };
383
384            let node_size: u64 = node.links.iter().map(|l| l.size).sum();
385            let entry = WalkEntry {
386                path: item.path.clone(),
387                hash: item.hash,
388                link_type: node.node_type,
389                size: node_size,
390                key: None, // directories are not encrypted
391            };
392
393            // Push children to stack
394            let uses_legacy_fanout = Self::node_uses_legacy_directory_fanout(&node);
395            for link in node.links.into_iter().rev() {
396                let is_internal =
397                    Self::is_internal_directory_link_with_legacy_fanout(&link, uses_legacy_fanout);
398                let child_path = match &link.name {
399                    Some(name) if !is_internal => {
400                        if item.path.is_empty() {
401                            name.clone()
402                        } else {
403                            format!("{}/{}", item.path, name)
404                        }
405                    }
406                    _ => item.path.clone(),
407                };
408                stack.push(WalkStackItem {
409                    hash: link.hash,
410                    path: child_path,
411                    key: link.key,
412                });
413            }
414
415            return Some((
416                Ok(entry),
417                WalkStreamState::Processing {
418                    stack: std::mem::take(stack),
419                    tree: self,
420                },
421            ));
422        }
423        None
424    }
425}
426
427struct WalkStackItem {
428    hash: Hash,
429    path: String,
430    key: Option<[u8; 32]>,
431}
432
433enum WalkStreamState<'a, S: Store> {
434    Init {
435        cid: Cid,
436        path: String,
437        tree: &'a HashTree<S>,
438    },
439    Processing {
440        stack: Vec<WalkStackItem>,
441        tree: &'a HashTree<S>,
442    },
443    Done,
444}