Skip to main content

hashtree_core/
hashtree.rs

1//! HashTree - Unified merkle tree operations
2//!
3//! Single struct for creating, reading, and editing content-addressed merkle trees.
4//! Mirrors the hashtree-ts HashTree class API.
5
6use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::stream::{self, Stream};
10use futures::io::AsyncRead;
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node};
15use crate::hash::sha256;
16use crate::reader::{ReaderError, TreeEntry, WalkEntry};
17use crate::store::Store;
18use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
19
20use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
21
/// HashTree configuration
#[derive(Clone)]
pub struct HashTreeConfig<S: Store> {
    /// Backing block store used for all reads and writes
    pub store: Arc<S>,
    /// Maximum bytes per leaf chunk when splitting large content
    pub chunk_size: usize,
    /// Maximum number of links per tree node before subtrees are introduced
    pub max_links: usize,
    /// Whether to encrypt content (default: true when encryption feature enabled)
    pub encrypted: bool,
}
31
32impl<S: Store> HashTreeConfig<S> {
33    pub fn new(store: Arc<S>) -> Self {
34        Self {
35            store,
36            chunk_size: DEFAULT_CHUNK_SIZE,
37            max_links: DEFAULT_MAX_LINKS,
38            encrypted: true,
39        }
40    }
41
42    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
43        self.chunk_size = chunk_size;
44        self
45    }
46
47    pub fn with_max_links(mut self, max_links: usize) -> Self {
48        self.max_links = max_links;
49        self
50    }
51
52    /// Disable encryption (store content publicly)
53    pub fn public(mut self) -> Self {
54        self.encrypted = false;
55        self
56    }
57}
58
/// HashTree error type
#[derive(Debug, thiserror::Error)]
pub enum HashTreeError {
    /// Underlying block store failed (message from the store implementation)
    #[error("Store error: {0}")]
    Store(String),
    /// Encoding or decoding of a tree node failed
    #[error("Codec error: {0}")]
    Codec(#[from] crate::codec::CodecError),
    /// A referenced chunk was absent from the store (hex-encoded hash)
    #[error("Missing chunk: {0}")]
    MissingChunk(String),
    /// A path lookup did not resolve
    #[error("Path not found: {0}")]
    PathNotFound(String),
    /// A directory entry lookup did not resolve
    #[error("Entry not found: {0}")]
    EntryNotFound(String),
    /// Encrypting content failed (also used here for missing-key conditions)
    #[error("Encryption error: {0}")]
    Encryption(String),
    /// Decrypting content failed
    #[error("Decryption error: {0}")]
    Decryption(String),
}
77
78impl From<BuilderError> for HashTreeError {
79    fn from(e: BuilderError) -> Self {
80        match e {
81            BuilderError::Store(s) => HashTreeError::Store(s),
82            BuilderError::Codec(c) => HashTreeError::Codec(c),
83            BuilderError::Encryption(s) => HashTreeError::Encryption(s),
84        }
85    }
86}
87
88impl From<ReaderError> for HashTreeError {
89    fn from(e: ReaderError) -> Self {
90        match e {
91            ReaderError::Store(s) => HashTreeError::Store(s),
92            ReaderError::Codec(c) => HashTreeError::Codec(c),
93            ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
94            ReaderError::Decryption(s) => HashTreeError::Encryption(s),
95            ReaderError::MissingKey => HashTreeError::Encryption("missing decryption key".to_string()),
96        }
97    }
98}
99
/// HashTree - unified create, read, and edit merkle tree operations
pub struct HashTree<S: Store> {
    /// Shared backing block store
    store: Arc<S>,
    /// Maximum bytes per leaf chunk
    chunk_size: usize,
    /// Maximum links per tree node
    max_links: usize,
    /// Whether content written through this tree is encrypted
    encrypted: bool,
}
107
108impl<S: Store> HashTree<S> {
109    pub fn new(config: HashTreeConfig<S>) -> Self {
110        Self {
111            store: config.store,
112            chunk_size: config.chunk_size,
113            max_links: config.max_links,
114            encrypted: config.encrypted,
115        }
116    }
117
    /// Check if encryption is enabled
    ///
    /// Reflects the `encrypted` flag from the config this tree was built with.
    pub fn is_encrypted(&self) -> bool {
        self.encrypted
    }
122
123    // ============ UNIFIED API ============
124
125    /// Store content, returns (Cid, size) where Cid is hash + optional key
126    /// Encrypts by default when encryption feature is enabled
127    pub async fn put(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
128        let size = data.len() as u64;
129
130        // Small data - store as single chunk
131        if data.len() <= self.chunk_size {
132            let (hash, key) = self.put_chunk_internal(data).await?;
133            return Ok((Cid { hash, key }, size));
134        }
135
136        // Large data - chunk it
137        let mut links: Vec<Link> = Vec::new();
138        let mut offset = 0;
139
140        while offset < data.len() {
141            let end = (offset + self.chunk_size).min(data.len());
142            let chunk = &data[offset..end];
143            let chunk_size = chunk.len() as u64;
144            let (hash, key) = self.put_chunk_internal(chunk).await?;
145            links.push(Link {
146                hash,
147                name: None,
148                size: chunk_size,
149                key,
150                link_type: LinkType::Blob, // Leaf chunk (raw blob)
151                meta: None,
152            });
153            offset = end;
154        }
155
156        // Build tree from chunks
157        let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
158        Ok((Cid { hash: root_hash, key: root_key }, size))
159    }
160
161    /// Get content by Cid (handles decryption automatically)
162    pub async fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
163        if let Some(key) = cid.key {
164            self.get_encrypted(&cid.hash, &key).await
165        } else {
166            self.read_file(&cid.hash).await
167        }
168    }
169
170    /// Store content from an async reader (streaming put)
171    ///
172    /// Reads data in chunks and builds a merkle tree incrementally.
173    /// Useful for large files or streaming data sources.
174    /// Returns (Cid, size) where Cid is hash + optional key
175    pub async fn put_stream<R: AsyncRead + Unpin>(&self, mut reader: R) -> Result<(Cid, u64), HashTreeError> {
176        let mut buffer = vec![0u8; self.chunk_size];
177        let mut links = Vec::new();
178        let mut total_size: u64 = 0;
179        let mut consistent_key: Option<[u8; 32]> = None;
180
181        loop {
182            let mut chunk = Vec::new();
183            let mut bytes_read = 0;
184
185            // Read until we have a full chunk or EOF
186            while bytes_read < self.chunk_size {
187                let n = reader.read(&mut buffer[..self.chunk_size - bytes_read]).await
188                    .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
189                if n == 0 {
190                    break; // EOF
191                }
192                chunk.extend_from_slice(&buffer[..n]);
193                bytes_read += n;
194            }
195
196            if chunk.is_empty() {
197                break; // No more data
198            }
199
200            let chunk_len = chunk.len() as u64;
201            total_size += chunk_len;
202
203            let (hash, key) = self.put_chunk_internal(&chunk).await?;
204
205            // Track consistent key for single-key result
206            if links.is_empty() {
207                consistent_key = key;
208            } else if consistent_key != key {
209                consistent_key = None;
210            }
211
212            links.push(Link {
213                hash,
214                name: None,
215                size: chunk_len,
216                key,
217                link_type: LinkType::Blob, // Leaf chunk (raw blob)
218                meta: None,
219            });
220        }
221
222        if links.is_empty() {
223            // Empty input
224            let (hash, key) = self.put_chunk_internal(&[]).await?;
225            return Ok((Cid { hash, key }, 0));
226        }
227
228        // Build tree from chunks
229        let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
230        Ok((Cid { hash: root_hash, key: root_key }, total_size))
231    }
232
233    /// Read content as a stream of chunks by Cid (handles decryption automatically)
234    ///
235    /// Returns an async stream that yields chunks as they are read.
236    /// Useful for large files or when you want to process data incrementally.
237    pub fn get_stream(
238        &self,
239        cid: &Cid,
240    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
241        let hash = cid.hash;
242        let key = cid.key;
243
244        if let Some(k) = key {
245            // Encrypted stream
246            Box::pin(self.read_file_stream_encrypted(hash, k))
247        } else {
248            // Unencrypted stream
249            self.read_file_stream(hash)
250        }
251    }
252
    /// Read encrypted file as stream (internal)
    ///
    /// Implemented as a `stream::unfold` state machine:
    /// - `Init`: fetch and decrypt the root object. A plain blob is yielded
    ///   whole; a tree node seeds a DFS stack of child links (pushed in
    ///   reverse so they pop in document order).
    /// - `Processing`: pop/fetch/decrypt until the next leaf chunk is found.
    /// - `Done`: terminal state after an error or exhaustion.
    ///
    /// NOTE(review): a missing root object ends the stream silently (yields
    /// nothing) instead of surfacing MissingChunk — confirm this is intended,
    /// since child chunks DO error when missing.
    fn read_file_stream_encrypted(
        &self,
        hash: Hash,
        key: EncryptionKey,
    ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
        stream::unfold(
            EncryptedStreamState::Init { hash, key, tree: self },
            |state| async move {
                match state {
                    EncryptedStreamState::Init { hash, key, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            Ok(None) => return None,
                            Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), EncryptedStreamState::Done)),
                        };

                        // Try to decrypt
                        let decrypted = match decrypt_chk(&data, &key) {
                            Ok(d) => d,
                            Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), EncryptedStreamState::Done)),
                        };

                        if !is_tree_node(&decrypted) {
                            // Single blob - yield decrypted data
                            return Some((Ok(decrypted), EncryptedStreamState::Done));
                        }

                        // Tree node - parse and traverse
                        let node = match decode_tree_node(&decrypted) {
                            Ok(n) => n,
                            Err(e) => return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done)),
                        };

                        // Push children in reverse so stack pops preserve
                        // the original link order.
                        let mut stack: Vec<EncryptedStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
                        }

                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Processing { mut stack, tree } => {
                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Done => None,
                }
            },
        )
    }
302
    /// Advance the encrypted-stream DFS to the next leaf chunk.
    ///
    /// Pops stack items until a non-tree (leaf) payload is found, expanding
    /// intermediate tree nodes onto the stack (children pushed in reverse so
    /// they pop in order). Returns `Some((Ok(chunk), next_state))` for a
    /// leaf, `Some((Err(..), Done))` on failure, or `None` when the stack is
    /// exhausted (end of stream).
    async fn process_encrypted_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<EncryptedStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                Ok(None) => {
                    return Some((
                        Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
                        EncryptedStreamState::Done,
                    ))
                }
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        EncryptedStreamState::Done,
                    ))
                }
            };

            // Decrypt if we have a key (links without keys hold plaintext)
            let decrypted = if let Some(key) = item.key {
                match decrypt_chk(&data, &key) {
                    Ok(d) => d,
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Decryption(e.to_string())),
                            EncryptedStreamState::Done,
                        ))
                    }
                }
            } else {
                data
            };

            if is_tree_node(&decrypted) {
                // Nested tree node - add children to stack
                let node = match decode_tree_node(&decrypted) {
                    Ok(n) => n,
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Codec(e)),
                            EncryptedStreamState::Done,
                        ))
                    }
                };
                for link in node.links.into_iter().rev() {
                    stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
                }
            } else {
                // Leaf chunk - yield decrypted data. mem::take moves the
                // remaining stack into the next state, leaving the caller's
                // (about-to-be-dropped) vec empty.
                return Some((
                    Ok(decrypted),
                    EncryptedStreamState::Processing { stack: std::mem::take(stack), tree: self },
                ));
            }
        }
        None
    }
363
364    /// Store a chunk with optional encryption
365    async fn put_chunk_internal(&self, data: &[u8]) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
366        if self.encrypted {
367            let (encrypted, key) = encrypt_chk(data)
368                .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
369            let hash = sha256(&encrypted);
370            self.store
371                .put(hash, encrypted)
372                .await
373                .map_err(|e| HashTreeError::Store(e.to_string()))?;
374            Ok((hash, Some(key)))
375        } else {
376            let hash = self.put_blob(data).await?;
377            Ok((hash, None))
378        }
379    }
380
    /// Build tree and return (hash, optional_key)
    ///
    /// Recursively packs `links` into tree nodes no wider than `max_links`.
    /// `total_size` is the byte size of the content the links cover; when a
    /// single link already covers the whole size, that link is returned as
    /// the root without an extra wrapping node.
    async fn build_tree_internal(
        &self,
        links: Vec<Link>,
        total_size: Option<u64>,
    ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
        // Single link with matching size - return directly (no extra node)
        if links.len() == 1 {
            if let Some(ts) = total_size {
                if links[0].size == ts {
                    return Ok((links[0].hash, links[0].key));
                }
            }
        }

        if links.len() <= self.max_links {
            // Links fit in a single node - encode and store it
            let node = TreeNode {
                node_type: LinkType::File,
                links,
            };
            let (data, _) = encode_and_hash(&node)?;

            if self.encrypted {
                // Encrypt the encoded node; it is addressed by the hash of
                // the ciphertext, and the key travels in the parent link/Cid.
                let (encrypted, key) = encrypt_chk(&data)
                    .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
                let hash = sha256(&encrypted);
                self.store
                    .put(hash, encrypted)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?;
                return Ok((hash, Some(key)));
            }

            // Unencrypted path
            let hash = sha256(&data);
            self.store
                .put(hash, data)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?;
            return Ok((hash, None));
        }

        // Too many links - group into subtrees of at most max_links, then
        // recurse on the subtree links. Box::pin enables async recursion.
        let mut sub_links = Vec::new();
        for batch in links.chunks(self.max_links) {
            let batch_size: u64 = batch.iter().map(|l| l.size).sum();
            let (hash, key) = Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
            sub_links.push(Link {
                hash,
                name: None,
                size: batch_size,
                key,
                link_type: LinkType::File, // Internal tree node
                meta: None,
            });
        }

        Box::pin(self.build_tree_internal(sub_links, total_size)).await
    }
440
441    /// Get encrypted content by hash and key
442    async fn get_encrypted(
443        &self,
444        hash: &Hash,
445        key: &EncryptionKey,
446    ) -> Result<Option<Vec<u8>>, HashTreeError> {
447        let encrypted_data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
448            Some(d) => d,
449            None => return Ok(None),
450        };
451
452        // Decrypt the data
453        let decrypted = decrypt_chk(&encrypted_data, key)
454            .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
455
456        // Check if it's a tree node
457        if is_tree_node(&decrypted) {
458            let node = decode_tree_node(&decrypted)?;
459            let assembled = self.assemble_encrypted_chunks(&node).await?;
460            return Ok(Some(assembled));
461        }
462
463        // Single chunk data
464        Ok(Some(decrypted))
465    }
466
467    /// Assemble encrypted chunks from tree
468    async fn assemble_encrypted_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
469        let mut parts: Vec<Vec<u8>> = Vec::new();
470
471        for link in &node.links {
472            let chunk_key = link.key.ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
473
474            let encrypted_child = self
475                .store
476                .get(&link.hash)
477                .await
478                .map_err(|e| HashTreeError::Store(e.to_string()))?
479                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
480
481            let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
482                .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
483
484            if is_tree_node(&decrypted) {
485                // Intermediate tree node - recurse
486                let child_node = decode_tree_node(&decrypted)?;
487                let child_data = Box::pin(self.assemble_encrypted_chunks(&child_node)).await?;
488                parts.push(child_data);
489            } else {
490                // Leaf data chunk
491                parts.push(decrypted);
492            }
493        }
494
495        let total_len: usize = parts.iter().map(|p| p.len()).sum();
496        let mut result = Vec::with_capacity(total_len);
497        for part in parts {
498            result.extend_from_slice(&part);
499        }
500
501        Ok(result)
502    }
503
504    // ============ LOW-LEVEL CREATE ============
505
506    /// Store a blob directly (small data, no encryption)
507    /// Returns the content hash
508    pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
509        let hash = sha256(data);
510        self.store
511            .put(hash, data.to_vec())
512            .await
513            .map_err(|e| HashTreeError::Store(e.to_string()))?;
514        Ok(hash)
515    }
516
517    /// Store a file, chunking if necessary
518    /// Returns (Cid, size) where Cid is hash + optional key
519    pub async fn put_file(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
520        let size = data.len() as u64;
521
522        // Small file - store as single chunk
523        if data.len() <= self.chunk_size {
524            let (hash, key) = self.put_chunk_internal(data).await?;
525            return Ok((Cid { hash, key }, size));
526        }
527
528        // Large file - chunk it
529        let mut links: Vec<Link> = Vec::new();
530        let mut offset = 0;
531
532        while offset < data.len() {
533            let end = (offset + self.chunk_size).min(data.len());
534            let chunk = &data[offset..end];
535            let chunk_size = (end - offset) as u64;
536
537            let (hash, key) = self.put_chunk_internal(chunk).await?;
538            links.push(Link {
539                hash,
540                name: None,
541                size: chunk_size,
542                key,
543                link_type: LinkType::Blob, // Leaf chunk
544                meta: None,
545            });
546            offset = end;
547        }
548
549        // Build tree from chunks (uses encryption if enabled)
550        let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
551        Ok((Cid { hash: root_hash, key: root_key }, size))
552    }
553
554    /// Build a directory from entries
555    /// Returns Cid with key if encrypted
556    ///
557    /// For large directories, the messagepack-encoded TreeNode is stored via put()
558    /// which automatically chunks the data. The reader uses read_file() to reassemble.
559    pub async fn put_directory(
560        &self,
561        entries: Vec<DirEntry>,
562    ) -> Result<Cid, HashTreeError> {
563        // Sort entries by name for deterministic hashing
564        let mut sorted = entries;
565        sorted.sort_by(|a, b| a.name.cmp(&b.name));
566
567        let links: Vec<Link> = sorted
568            .into_iter()
569            .map(|e| Link {
570                hash: e.hash,
571                name: Some(e.name),
572                size: e.size,
573                key: e.key,
574                link_type: e.link_type,
575                meta: e.meta,
576            })
577            .collect();
578
579        // Create the directory node with all entries
580        let node = TreeNode {
581            node_type: LinkType::Dir,
582            links,
583        };
584        let (data, _plain_hash) = encode_and_hash(&node)?;
585
586        // Store directory data via put() - handles both small and large directories
587        // For small dirs, stores as single chunk
588        // For large dirs, chunks transparently via build_tree()
589        // Reader uses read_file() to reassemble before decoding
590        let (cid, _size) = self.put(&data).await?;
591        Ok(cid)
592    }
593
    /// Create a tree node with custom links
    ///
    /// Encodes a Dir-typed node and stores it keyed by the hash of the
    /// encoded bytes. NOTE(review): unlike put()/put_file(), this does not
    /// consult the `encrypted` flag — the node is always stored as
    /// plaintext. Confirm this is intended for callers in encrypted mode.
    pub async fn put_tree_node(
        &self,
        links: Vec<Link>,
    ) -> Result<Hash, HashTreeError> {
        let node = TreeNode {
            node_type: LinkType::Dir,
            links,
        };

        let (data, hash) = encode_and_hash(&node)?;
        self.store
            .put(hash, data)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))?;
        Ok(hash)
    }
611
612    // ============ READ ============
613
614    /// Get raw data by hash
615    pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
616        self.store
617            .get(hash)
618            .await
619            .map_err(|e| HashTreeError::Store(e.to_string()))
620    }
621
622    /// Get and decode a tree node (unencrypted)
623    pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
624        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
625            Some(d) => d,
626            None => return Ok(None),
627        };
628
629        if !is_tree_node(&data) {
630            return Ok(None);
631        }
632
633        let node = decode_tree_node(&data)?;
634        Ok(Some(node))
635    }
636
637    /// Get and decode a tree node using Cid (with decryption if key present)
638    pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
639        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
640            Some(d) => d,
641            None => return Ok(None),
642        };
643
644        // Decrypt if key is present
645        let decrypted = if let Some(key) = &cid.key {
646            decrypt_chk(&data, key)
647                .map_err(|e| HashTreeError::Decryption(e.to_string()))?
648        } else {
649            data
650        };
651
652        if !is_tree_node(&decrypted) {
653            return Ok(None);
654        }
655
656        let node = decode_tree_node(&decrypted)?;
657        Ok(Some(node))
658    }
659
    /// Get directory node, handling chunked directory data
    /// Use this when you know the target is a directory (from parent link_type)
    ///
    /// Large directories are stored via put(), which may wrap the encoded
    /// directory in a File-typed chunk tree; in that case the chunks are
    /// reassembled and the inner directory node decoded from the assembled
    /// bytes.
    pub async fn get_directory_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
            Some(d) => d,
            None => return Ok(None),
        };

        // Decrypt if key is present
        let decrypted = if let Some(key) = &cid.key {
            decrypt_chk(&data, key)
                .map_err(|e| HashTreeError::Decryption(e.to_string()))?
        } else {
            data
        };

        if !is_tree_node(&decrypted) {
            return Ok(None);
        }

        let node = decode_tree_node(&decrypted)?;

        // If this is a file tree (chunked data), reassemble to get actual directory
        if node.node_type == LinkType::File {
            let assembled = self.assemble_chunks(&node).await?;
            if is_tree_node(&assembled) {
                let inner_node = decode_tree_node(&assembled)?;
                return Ok(Some(inner_node));
            }
        }

        // Already a directory node (or a file tree whose reassembled payload
        // was not a tree node) - return it as-is
        Ok(Some(node))
    }
693
694    /// Check if hash points to a tree node (no decryption)
695    pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
696        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
697            Some(d) => d,
698            None => return Ok(false),
699        };
700        Ok(is_tree_node(&data))
701    }
702
703    /// Check if Cid points to a directory (with decryption)
704    pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
705        let node = match self.get_node(cid).await? {
706            Some(n) => n,
707            None => return Ok(false),
708        };
709        // Directory has named links (not just internal chunks)
710        Ok(node.links.iter().any(|l| l.name.as_ref().map(|n| !n.starts_with('_')).unwrap_or(false)))
711    }
712
713    /// Check if hash points to a directory (tree with named links, no decryption)
714    pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
715        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
716            Some(d) => d,
717            None => return Ok(false),
718        };
719        Ok(is_directory_node(&data))
720    }
721
722    /// Read a complete file (reassemble chunks if needed)
723    pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
724        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
725            Some(d) => d,
726            None => return Ok(None),
727        };
728
729        // Check if it's a tree (chunked file) or raw blob
730        if !is_tree_node(&data) {
731            return Ok(Some(data));
732        }
733
734        // It's a tree - reassemble chunks
735        let node = decode_tree_node(&data)?;
736        let assembled = self.assemble_chunks(&node).await?;
737        Ok(Some(assembled))
738    }
739
740    /// Read a byte range from a file (fetches only necessary chunks)
741    ///
742    /// - `start`: Starting byte offset (inclusive)
743    /// - `end`: Ending byte offset (exclusive), or None to read to end
744    ///
745    /// This is more efficient than read_file() for partial reads of large files.
746    pub async fn read_file_range(
747        &self,
748        hash: &Hash,
749        start: u64,
750        end: Option<u64>,
751    ) -> Result<Option<Vec<u8>>, HashTreeError> {
752        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
753            Some(d) => d,
754            None => return Ok(None),
755        };
756
757        // Single blob - just slice it
758        if !is_tree_node(&data) {
759            let start_idx = start as usize;
760            let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
761            if start_idx >= data.len() {
762                return Ok(Some(vec![]));
763            }
764            let end_idx = end_idx.min(data.len());
765            return Ok(Some(data[start_idx..end_idx].to_vec()));
766        }
767
768        // It's a chunked file - fetch only needed chunks
769        let node = decode_tree_node(&data)?;
770        let range_data = self.assemble_chunks_range(&node, start, end).await?;
771        Ok(Some(range_data))
772    }
773
774    /// Read a byte range from a file using a Cid (handles decryption if key present)
775    pub async fn read_file_range_cid(
776        &self,
777        cid: &Cid,
778        start: u64,
779        end: Option<u64>,
780    ) -> Result<Option<Vec<u8>>, HashTreeError> {
781        if let Some(_key) = cid.key {
782            let data = match self.get(cid).await? {
783                Some(d) => d,
784                None => return Ok(None),
785            };
786            let start_idx = start as usize;
787            let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
788            if start_idx >= data.len() {
789                return Ok(Some(vec![]));
790            }
791            let end_idx = end_idx.min(data.len());
792            return Ok(Some(data[start_idx..end_idx].to_vec()));
793        }
794
795        self.read_file_range(&cid.hash, start, end).await
796    }
797
    /// Assemble only the chunks needed for a byte range
    ///
    /// Flattens the tree into (hash, offset, size) leaf descriptors, fetches
    /// only the leaves overlapping [start, actual_end), and trims the first
    /// and last fetched chunks to the requested bounds. Out-of-range or
    /// inverted requests return an empty Vec.
    async fn assemble_chunks_range(
        &self,
        node: &TreeNode,
        start: u64,
        end: Option<u64>,
    ) -> Result<Vec<u8>, HashTreeError> {
        // First, flatten the tree to get all leaf chunks with their byte offsets
        let chunks_info = self.collect_chunk_offsets(node).await?;

        if chunks_info.is_empty() {
            return Ok(vec![]);
        }

        // Calculate total size and actual end (end=None means read to EOF)
        let total_size: u64 = chunks_info.iter().map(|(_, _, size)| size).sum();
        let actual_end = end.unwrap_or(total_size).min(total_size);

        if start >= actual_end {
            return Ok(vec![]);
        }

        // Find chunks that overlap with [start, actual_end)
        let mut result = Vec::with_capacity((actual_end - start) as usize);
        let mut current_offset = 0u64;

        // NOTE(review): offsets are recomputed via current_offset; the offset
        // element of chunks_info is not consumed by this loop.
        for (chunk_hash, _chunk_offset, chunk_size) in &chunks_info {
            let chunk_start = current_offset;
            let chunk_end = current_offset + chunk_size;

            // Check if this chunk overlaps with our range
            if chunk_end > start && chunk_start < actual_end {
                // Fetch this chunk
                let chunk_data = self
                    .store
                    .get(chunk_hash)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?
                    .ok_or_else(|| HashTreeError::MissingChunk(to_hex(chunk_hash)))?;

                // Calculate slice bounds within this chunk
                let slice_start = if start > chunk_start {
                    (start - chunk_start) as usize
                } else {
                    0
                };
                let slice_end = if actual_end < chunk_end {
                    (actual_end - chunk_start) as usize
                } else {
                    chunk_data.len()
                };

                result.extend_from_slice(&chunk_data[slice_start..slice_end]);
            }

            current_offset = chunk_end;

            // Early exit if we've passed the requested range
            if current_offset >= actual_end {
                break;
            }
        }

        Ok(result)
    }
863
864    /// Collect all leaf chunk hashes with their byte offsets
865    /// Returns Vec<(hash, offset, size)>
866    async fn collect_chunk_offsets(
867        &self,
868        node: &TreeNode,
869    ) -> Result<Vec<(Hash, u64, u64)>, HashTreeError> {
870        let mut chunks = Vec::new();
871        let mut offset = 0u64;
872        self.collect_chunk_offsets_recursive(node, &mut chunks, &mut offset).await?;
873        Ok(chunks)
874    }
875
876    async fn collect_chunk_offsets_recursive(
877        &self,
878        node: &TreeNode,
879        chunks: &mut Vec<(Hash, u64, u64)>,
880        offset: &mut u64,
881    ) -> Result<(), HashTreeError> {
882        for link in &node.links {
883            let child_data = self
884                .store
885                .get(&link.hash)
886                .await
887                .map_err(|e| HashTreeError::Store(e.to_string()))?
888                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
889
890            if is_tree_node(&child_data) {
891                // Intermediate node - recurse
892                let child_node = decode_tree_node(&child_data)?;
893                Box::pin(self.collect_chunk_offsets_recursive(&child_node, chunks, offset)).await?;
894            } else {
895                // Leaf chunk
896                let size = child_data.len() as u64;
897                chunks.push((link.hash, *offset, size));
898                *offset += size;
899            }
900        }
901        Ok(())
902    }
903
904    /// Recursively assemble chunks from tree
905    async fn assemble_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
906        let mut parts: Vec<Vec<u8>> = Vec::new();
907
908        for link in &node.links {
909            let child_data = self
910                .store
911                .get(&link.hash)
912                .await
913                .map_err(|e| HashTreeError::Store(e.to_string()))?
914                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
915
916            if is_tree_node(&child_data) {
917                let child_node = decode_tree_node(&child_data)?;
918                parts.push(Box::pin(self.assemble_chunks(&child_node)).await?);
919            } else {
920                parts.push(child_data);
921            }
922        }
923
924        // Concatenate all parts
925        let total_length: usize = parts.iter().map(|p| p.len()).sum();
926        let mut result = Vec::with_capacity(total_length);
927        for part in parts {
928            result.extend_from_slice(&part);
929        }
930
931        Ok(result)
932    }
933
    /// Read a file as stream of chunks
    /// Returns an async stream that yields chunks as they are read
    ///
    /// Implementation: `stream::unfold` drives a three-state machine.
    /// `Init` fetches the root hash: a raw blob is yielded as the single
    /// item; a tree node seeds a DFS stack (children pushed in reverse so
    /// they pop in file order). `Processing` then pops the stack via
    /// `process_stream_stack`, yielding one leaf chunk per poll.
    ///
    /// NOTE(review): a missing root (`Ok(None)`) ends the stream silently
    /// rather than yielding an error item — confirm that is intended.
    pub fn read_file_stream(
        &self,
        hash: Hash,
    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            ReadStreamState::Init { hash, tree: self },
            |state| async move {
                match state {
                    ReadStreamState::Init { hash, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            // Missing root: end of stream, no error item.
                            Ok(None) => return None,
                            Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), ReadStreamState::Done)),
                        };

                        if !is_tree_node(&data) {
                            // Single blob - yield it and finish
                            return Some((Ok(data), ReadStreamState::Done));
                        }

                        // Tree node - start streaming chunks
                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
                        };

                        // Create stack with all links to process
                        // (reversed so the first link ends up on top)
                        let mut stack: Vec<StreamStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }

                        // Process first item
                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Processing { mut stack, tree } => {
                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Done => None,
                }
            },
        ))
    }
979
    /// Pop items off the DFS stack until a leaf chunk is found, yielding it
    /// together with the next stream state.
    ///
    /// Tree nodes encountered on the way are expanded in place (children
    /// pushed in reverse for in-order traversal). Returns `None` when the
    /// stack is exhausted, which terminates the unfold stream.
    async fn process_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<StreamStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            match item {
                StreamStackItem::Hash(hash) => {
                    // Unlike the missing-root case, a missing interior/leaf
                    // node is surfaced as a MissingChunk error item.
                    let data = match self.store.get(&hash).await {
                        Ok(Some(d)) => d,
                        Ok(None) => {
                            return Some((
                                Err(HashTreeError::MissingChunk(to_hex(&hash))),
                                ReadStreamState::Done,
                            ))
                        }
                        Err(e) => {
                            return Some((
                                Err(HashTreeError::Store(e.to_string())),
                                ReadStreamState::Done,
                            ))
                        }
                    };

                    if is_tree_node(&data) {
                        // Nested tree - push its children to stack
                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
                        };
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }
                    } else {
                        // Leaf blob - yield it; mem::take moves the stack into
                        // the next state, leaving an empty Vec behind.
                        return Some((Ok(data), ReadStreamState::Processing { stack: std::mem::take(stack), tree: self }));
                    }
                }
            }
        }
        None
    }
1021
1022    /// Read file chunks as Vec (non-streaming version)
1023    pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
1024        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1025            Some(d) => d,
1026            None => return Ok(vec![]),
1027        };
1028
1029        if !is_tree_node(&data) {
1030            return Ok(vec![data]);
1031        }
1032
1033        let node = decode_tree_node(&data)?;
1034        self.collect_chunks(&node).await
1035    }
1036
1037    async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
1038        let mut chunks = Vec::new();
1039
1040        for link in &node.links {
1041            let child_data = self
1042                .store
1043                .get(&link.hash)
1044                .await
1045                .map_err(|e| HashTreeError::Store(e.to_string()))?
1046                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1047
1048            if is_tree_node(&child_data) {
1049                let child_node = decode_tree_node(&child_data)?;
1050                chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
1051            } else {
1052                chunks.push(child_data);
1053            }
1054        }
1055
1056        Ok(chunks)
1057    }
1058
1059    /// List directory entries (Cid-based, supports encrypted directories)
1060    pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1061        let node = match self.get_node(cid).await? {
1062            Some(n) => n,
1063            None => return Ok(vec![]),
1064        };
1065
1066        let mut entries = Vec::new();
1067
1068        for link in &node.links {
1069            // Skip internal chunk nodes - recurse into them
1070            if let Some(ref name) = link.name {
1071                if name.starts_with("_chunk_") || name.starts_with('_') {
1072                    let chunk_cid = Cid { hash: link.hash, key: link.key };
1073                    let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
1074                    entries.extend(sub_entries);
1075                    continue;
1076                }
1077            }
1078
1079            entries.push(TreeEntry {
1080                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1081                hash: link.hash,
1082                size: link.size,
1083                link_type: link.link_type,
1084                key: link.key,
1085                meta: link.meta.clone(),
1086            });
1087        }
1088
1089        Ok(entries)
1090    }
1091
1092    /// List directory entries using Cid (with decryption if key present)
1093    /// Handles both regular and chunked directory data
1094    pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1095        // Use get_directory_node which handles chunked directory data
1096        let node = match self.get_directory_node(cid).await? {
1097            Some(n) => n,
1098            None => return Ok(vec![]),
1099        };
1100
1101        let mut entries = Vec::new();
1102
1103        for link in &node.links {
1104            // Skip internal chunk nodes (backwards compat with old _chunk_ format)
1105            if let Some(ref name) = link.name {
1106                if name.starts_with("_chunk_") || name.starts_with('_') {
1107                    // Internal nodes inherit parent's key for decryption
1108                    let sub_cid = Cid { hash: link.hash, key: cid.key };
1109                    let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
1110                    entries.extend(sub_entries);
1111                    continue;
1112                }
1113            }
1114
1115            entries.push(TreeEntry {
1116                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1117                hash: link.hash,
1118                size: link.size,
1119                link_type: link.link_type,
1120                key: link.key,
1121                meta: link.meta.clone(),
1122            });
1123        }
1124
1125        Ok(entries)
1126    }
1127
1128    /// Resolve a path within a tree (returns Cid with key if encrypted)
1129    pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1130        let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
1131        if parts.is_empty() {
1132            return Ok(Some(cid.clone()));
1133        }
1134
1135        let mut current_cid = cid.clone();
1136
1137        for part in parts {
1138            // Use get_directory_node which handles chunked directory data
1139            let node = match self.get_directory_node(&current_cid).await? {
1140                Some(n) => n,
1141                None => return Ok(None),
1142            };
1143
1144            if let Some(link) = self.find_link(&node, part) {
1145                current_cid = Cid {
1146                    hash: link.hash,
1147                    key: link.key,
1148                };
1149            } else {
1150                // Check internal nodes
1151                match self.find_link_in_subtrees_cid(&node, part, &current_cid).await? {
1152                    Some(link) => {
1153                        current_cid = Cid {
1154                            hash: link.hash,
1155                            key: link.key,
1156                        };
1157                    }
1158                    None => return Ok(None),
1159                }
1160            }
1161        }
1162
1163        Ok(Some(current_cid))
1164    }
1165
    /// Resolve a path within a tree using Cid (with decryption if key present)
    ///
    /// Thin alias for [`resolve`] — presumably kept for API parity with the
    /// hashtree-ts class mentioned in the module docs; TODO confirm.
    pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
        self.resolve(cid, path).await
    }
1170
1171    fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1172        node.links
1173            .iter()
1174            .find(|l| l.name.as_deref() == Some(name))
1175            .cloned()
1176    }
1177
1178    /// Find a link in subtrees using Cid (with decryption support)
1179    async fn find_link_in_subtrees_cid(&self, node: &TreeNode, name: &str, _parent_cid: &Cid) -> Result<Option<Link>, HashTreeError> {
1180        for link in &node.links {
1181            if !link.name.as_ref().map(|n| n.starts_with('_')).unwrap_or(false) {
1182                continue;
1183            }
1184
1185            // Internal nodes inherit encryption from parent context
1186            let sub_cid = Cid {
1187                hash: link.hash,
1188                key: link.key.clone(),
1189            };
1190
1191            let sub_node = match self.get_node(&sub_cid).await? {
1192                Some(n) => n,
1193                None => continue,
1194            };
1195
1196            if let Some(found) = self.find_link(&sub_node, name) {
1197                return Ok(Some(found));
1198            }
1199
1200            if let Some(deep_found) = Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await? {
1201                return Ok(Some(deep_found));
1202            }
1203        }
1204
1205        Ok(None)
1206    }
1207
1208    /// Get total size of a tree
1209    pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1210        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1211            Some(d) => d,
1212            None => return Ok(0),
1213        };
1214
1215        if !is_tree_node(&data) {
1216            return Ok(data.len() as u64);
1217        }
1218
1219        let node = decode_tree_node(&data)?;
1220        // Calculate from children
1221        let mut total = 0u64;
1222        for link in &node.links {
1223            total += link.size;
1224        }
1225        Ok(total)
1226    }
1227
1228    /// Get total size using a Cid (handles decryption if key present)
1229    pub async fn get_size_cid(&self, cid: &Cid) -> Result<u64, HashTreeError> {
1230        if cid.key.is_some() {
1231            let data = match self.get(cid).await? {
1232                Some(d) => d,
1233                None => return Ok(0),
1234            };
1235            return Ok(data.len() as u64);
1236        }
1237
1238        self.get_size(&cid.hash).await
1239    }
1240
1241    /// Walk entire tree depth-first (returns Vec)
1242    pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
1243        let mut entries = Vec::new();
1244        self.walk_recursive(cid, path, &mut entries).await?;
1245        Ok(entries)
1246    }
1247
1248    async fn walk_recursive(
1249        &self,
1250        cid: &Cid,
1251        path: &str,
1252        entries: &mut Vec<WalkEntry>,
1253    ) -> Result<(), HashTreeError> {
1254        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1255            Some(d) => d,
1256            None => return Ok(()),
1257        };
1258
1259        // Decrypt if key is present
1260        let data = if let Some(key) = &cid.key {
1261            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1262        } else {
1263            data
1264        };
1265
1266        let node = match try_decode_tree_node(&data) {
1267            Some(n) => n,
1268            None => {
1269                entries.push(WalkEntry {
1270                    path: path.to_string(),
1271                    hash: cid.hash,
1272                    link_type: LinkType::Blob,
1273                    size: data.len() as u64,
1274                    key: cid.key,
1275                });
1276                return Ok(());
1277            }
1278        };
1279
1280        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1281        entries.push(WalkEntry {
1282            path: path.to_string(),
1283            hash: cid.hash,
1284            link_type: node.node_type,
1285            size: node_size,
1286            key: cid.key,
1287        });
1288
1289        for link in &node.links {
1290            let child_path = match &link.name {
1291                Some(name) => {
1292                    if name.starts_with("_chunk_") || name.starts_with('_') {
1293                        // Internal nodes inherit parent's key
1294                        let sub_cid = Cid { hash: link.hash, key: cid.key };
1295                        Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
1296                        continue;
1297                    }
1298                    if path.is_empty() {
1299                        name.clone()
1300                    } else {
1301                        format!("{}/{}", path, name)
1302                    }
1303                }
1304                None => path.to_string(),
1305            };
1306
1307            // Child nodes use their own key from link
1308            let child_cid = Cid { hash: link.hash, key: link.key };
1309            Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
1310        }
1311
1312        Ok(())
1313    }
1314
    /// Walk entire tree with parallel fetching
    /// Uses a work-stealing approach: always keeps `concurrency` requests in flight
    ///
    /// Convenience wrapper over [`walk_parallel_with_progress`] with no
    /// progress counter.
    pub async fn walk_parallel(&self, cid: &Cid, path: &str, concurrency: usize) -> Result<Vec<WalkEntry>, HashTreeError> {
        self.walk_parallel_with_progress(cid, path, concurrency, None).await
    }
1320
1321    /// Walk entire tree with parallel fetching and optional progress counter
1322    /// The counter is incremented for each node fetched (not just entries found)
1323    ///
1324    /// OPTIMIZATION: Blobs are NOT fetched - their metadata (hash, size, link_type)
1325    /// comes from the parent node's link, so we just add them directly to entries.
1326    /// This avoids downloading file contents during tree traversal.
1327    pub async fn walk_parallel_with_progress(
1328        &self,
1329        cid: &Cid,
1330        path: &str,
1331        concurrency: usize,
1332        progress: Option<&std::sync::atomic::AtomicUsize>,
1333    ) -> Result<Vec<WalkEntry>, HashTreeError> {
1334        use futures::stream::{FuturesUnordered, StreamExt};
1335        use std::collections::VecDeque;
1336        use std::sync::atomic::Ordering;
1337
1338        let mut entries = Vec::new();
1339        let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
1340        let mut active = FuturesUnordered::new();
1341
1342        // Seed with root
1343        pending.push_back((cid.clone(), path.to_string()));
1344
1345        loop {
1346            // Fill up to concurrency limit from pending queue
1347            while active.len() < concurrency {
1348                if let Some((node_cid, node_path)) = pending.pop_front() {
1349                    let store = &self.store;
1350                    let fut = async move {
1351                        let data = store.get(&node_cid.hash).await
1352                            .map_err(|e| HashTreeError::Store(e.to_string()))?;
1353                        Ok::<_, HashTreeError>((node_cid, node_path, data))
1354                    };
1355                    active.push(fut);
1356                } else {
1357                    break;
1358                }
1359            }
1360
1361            // If nothing active, we're done
1362            if active.is_empty() {
1363                break;
1364            }
1365
1366            // Wait for any future to complete
1367            if let Some(result) = active.next().await {
1368                let (node_cid, node_path, data) = result?;
1369
1370                // Update progress counter
1371                if let Some(counter) = progress {
1372                    counter.fetch_add(1, Ordering::Relaxed);
1373                }
1374
1375                let data = match data {
1376                    Some(d) => d,
1377                    None => continue,
1378                };
1379
1380                // Decrypt if key is present
1381                let data = if let Some(key) = &node_cid.key {
1382                    decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1383                } else {
1384                    data
1385                };
1386
1387                let node = match try_decode_tree_node(&data) {
1388                    Some(n) => n,
1389                    None => {
1390                        // It's a blob/file - this case only happens for root
1391                        entries.push(WalkEntry {
1392                            path: node_path,
1393                            hash: node_cid.hash,
1394                            link_type: LinkType::Blob,
1395                            size: data.len() as u64,
1396                            key: node_cid.key,
1397                        });
1398                        continue;
1399                    }
1400                };
1401
1402                // It's a directory/file node
1403                let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1404                entries.push(WalkEntry {
1405                    path: node_path.clone(),
1406                    hash: node_cid.hash,
1407                    link_type: node.node_type,
1408                    size: node_size,
1409                    key: node_cid.key,
1410                });
1411
1412                // Queue children - but DON'T fetch blobs, just add them directly
1413                for link in &node.links {
1414                    let child_path = match &link.name {
1415                        Some(name) => {
1416                            if name.starts_with("_chunk_") || name.starts_with('_') {
1417                                // Internal chunked nodes - inherit parent's key, same path
1418                                let sub_cid = Cid { hash: link.hash, key: node_cid.key };
1419                                pending.push_back((sub_cid, node_path.clone()));
1420                                continue;
1421                            }
1422                            if node_path.is_empty() {
1423                                name.clone()
1424                            } else {
1425                                format!("{}/{}", node_path, name)
1426                            }
1427                        }
1428                        None => node_path.clone(),
1429                    };
1430
1431                    // OPTIMIZATION: If it's a blob, add entry directly without fetching
1432                    // The link already contains all the metadata we need
1433                    if link.link_type == LinkType::Blob {
1434                        entries.push(WalkEntry {
1435                            path: child_path,
1436                            hash: link.hash,
1437                            link_type: LinkType::Blob,
1438                            size: link.size,
1439                            key: link.key,
1440                        });
1441                        if let Some(counter) = progress {
1442                            counter.fetch_add(1, Ordering::Relaxed);
1443                        }
1444                        continue;
1445                    }
1446
1447                    // For tree nodes (File/Dir), we need to fetch to see their children
1448                    let child_cid = Cid { hash: link.hash, key: link.key };
1449                    pending.push_back((child_cid, child_path));
1450                }
1451            }
1452        }
1453
1454        Ok(entries)
1455    }
1456
    /// Walk tree as stream
    ///
    /// `stream::unfold` state machine mirroring [`read_file_stream`]: `Init`
    /// fetches and (if keyed) decrypts the root, yields its `WalkEntry`, and
    /// seeds a DFS stack of children (pushed in reverse for in-order pops);
    /// `Processing` pops via `process_walk_stack`, one entry per poll.
    /// `_`-prefixed child links keep the parent's path; others extend it.
    pub fn walk_stream(
        &self,
        cid: Cid,
        initial_path: String,
    ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            WalkStreamState::Init { cid, path: initial_path, tree: self },
            |state| async move {
                match state {
                    WalkStreamState::Init { cid, path, tree } => {
                        let data = match tree.store.get(&cid.hash).await {
                            Ok(Some(d)) => d,
                            // Missing root: empty stream, no error item.
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    WalkStreamState::Done,
                                ))
                            }
                        };

                        // Decrypt if key is present
                        let data = if let Some(key) = &cid.key {
                            match decrypt_chk(&data, key) {
                                Ok(d) => d,
                                Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), WalkStreamState::Done)),
                            }
                        } else {
                            data
                        };

                        let node = match try_decode_tree_node(&data) {
                            Some(n) => n,
                            None => {
                                // Blob data: single entry, then the stream ends.
                                let entry = WalkEntry {
                                    path,
                                    hash: cid.hash,
                                    link_type: LinkType::Blob,
                                    size: data.len() as u64,
                                    key: cid.key,
                                };
                                return Some((Ok(entry), WalkStreamState::Done));
                            }
                        };

                        // Node size is the sum of its links' recorded sizes.
                        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
                        let entry = WalkEntry {
                            path: path.clone(),
                            hash: cid.hash,
                            link_type: node.node_type,
                            size: node_size,
                            key: cid.key,
                        };

                        // Create stack with children to process
                        // (reversed so the first child ends up on top)
                        let mut stack: Vec<WalkStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            let child_path = match &link.name {
                                Some(name) if !name.starts_with('_') => {
                                    if path.is_empty() {
                                        name.clone()
                                    } else {
                                        format!("{}/{}", path, name)
                                    }
                                }
                                // Internal (`_`-prefixed) or unnamed links
                                // keep the parent's path.
                                _ => path.clone(),
                            };
                            // Child nodes use their own key from link
                            stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
                        }

                        Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
                    }
                    WalkStreamState::Processing { mut stack, tree } => {
                        tree.process_walk_stack(&mut stack).await
                    }
                    WalkStreamState::Done => None,
                }
            },
        ))
    }
1540
    /// Pop the next stack item, emit its `WalkEntry`, and push its children.
    ///
    /// Returns `None` when the stack is exhausted (stream ends). Missing
    /// nodes are skipped silently.
    ///
    /// NOTE(review): unlike `walk_recursive`, this never calls `decrypt_chk`
    /// with `item.key` before decoding — encrypted items will fail to decode
    /// as tree nodes and be reported as blobs with ciphertext length.
    /// Confirm whether decryption was intentionally omitted here.
    async fn process_walk_stack<'a>(
        &'a self,
        stack: &mut Vec<WalkStackItem>,
    ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                // Missing node: skip it rather than erroring out.
                Ok(None) => continue,
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        WalkStreamState::Done,
                    ))
                }
            };

            let node = match try_decode_tree_node(&data) {
                Some(n) => n,
                None => {
                    // Blob data: emit a leaf entry; mem::take carries the
                    // remaining stack into the next state.
                    let entry = WalkEntry {
                        path: item.path,
                        hash: item.hash,
                        link_type: LinkType::Blob,
                        size: data.len() as u64,
                        key: item.key,
                    };
                    return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
                }
            };

            let node_size: u64 = node.links.iter().map(|l| l.size).sum();
            let entry = WalkEntry {
                path: item.path.clone(),
                hash: item.hash,
                link_type: node.node_type,
                // NOTE(review): `walk_recursive` keeps the node's own key
                // here; this hard-codes None — confirm which is correct.
                size: node_size,
                key: None, // directories are not encrypted
            };

            // Push children to stack
            // (reversed so the first child ends up on top)
            for link in node.links.into_iter().rev() {
                let child_path = match &link.name {
                    Some(name) if !name.starts_with('_') => {
                        if item.path.is_empty() {
                            name.clone()
                        } else {
                            format!("{}/{}", item.path, name)
                        }
                    }
                    // Internal (`_`-prefixed) or unnamed links keep the
                    // parent's path.
                    _ => item.path.clone(),
                };
                stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
            }

            return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
        }
        None
    }
1600
1601    // ============ EDIT ============
1602
    /// Add or update an entry in a directory
    /// Returns new root Cid (immutable operation)
    ///
    /// Any existing entry named `name` in the target directory is replaced
    /// (overwrite semantics). `path` addresses the directory relative to
    /// `root` (empty slice = the root itself); fails with `PathNotFound`
    /// if it does not resolve. The stored entry's `meta` is always `None`.
    pub async fn set_entry(
        &self,
        root: &Cid,
        path: &[&str],
        name: &str,
        entry_cid: &Cid,
        size: u64,
        link_type: LinkType,
    ) -> Result<Cid, HashTreeError> {
        let dir_cid = self.resolve_path_array(root, path).await?;
        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;

        // Drop any entry with the same name, then copy the listing
        // field-by-field into DirEntry values.
        // NOTE(review): presumably list_directory yields a different entry
        // type than DirEntry, hence the field-wise copy — confirm.
        let entries = self.list_directory(&dir_cid).await?;
        let mut new_entries: Vec<DirEntry> = entries
            .into_iter()
            .filter(|e| e.name != name)
            .map(|e| DirEntry {
                name: e.name,
                hash: e.hash,
                size: e.size,
                key: e.key,
                link_type: e.link_type,
                meta: e.meta,
            })
            .collect();

        // Append the new/updated entry; hash and (optional) key come from the Cid.
        new_entries.push(DirEntry {
            name: name.to_string(),
            hash: entry_cid.hash,
            size,
            key: entry_cid.key,
            link_type,
            meta: None,
        });

        // Re-encode the directory, then rebuild every ancestor up to the root.
        let new_dir_cid = self.put_directory(new_entries).await?;
        self.rebuild_path(root, path, new_dir_cid).await
    }
1643
1644    /// Remove an entry from a directory
1645    /// Returns new root Cid
1646    pub async fn remove_entry(
1647        &self,
1648        root: &Cid,
1649        path: &[&str],
1650        name: &str,
1651    ) -> Result<Cid, HashTreeError> {
1652        let dir_cid = self.resolve_path_array(root, path).await?;
1653        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1654
1655        let entries = self.list_directory(&dir_cid).await?;
1656        let new_entries: Vec<DirEntry> = entries
1657            .into_iter()
1658            .filter(|e| e.name != name)
1659            .map(|e| DirEntry {
1660                name: e.name,
1661                hash: e.hash,
1662                size: e.size,
1663                key: e.key,
1664                link_type: e.link_type,
1665                meta: e.meta,
1666            })
1667            .collect();
1668
1669        let new_dir_cid = self.put_directory(new_entries).await?;
1670        self.rebuild_path(root, path, new_dir_cid).await
1671    }
1672
1673    /// Rename an entry in a directory
1674    /// Returns new root Cid
1675    pub async fn rename_entry(
1676        &self,
1677        root: &Cid,
1678        path: &[&str],
1679        old_name: &str,
1680        new_name: &str,
1681    ) -> Result<Cid, HashTreeError> {
1682        if old_name == new_name {
1683            return Ok(root.clone());
1684        }
1685
1686        let dir_cid = self.resolve_path_array(root, path).await?;
1687        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1688
1689        let entries = self.list_directory(&dir_cid).await?;
1690        let entry = entries
1691            .iter()
1692            .find(|e| e.name == old_name)
1693            .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1694
1695        let entry_hash = entry.hash;
1696        let entry_size = entry.size;
1697        let entry_key = entry.key;
1698        let entry_link_type = entry.link_type;
1699        let entry_meta = entry.meta.clone();
1700
1701        let new_entries: Vec<DirEntry> = entries
1702            .into_iter()
1703            .filter(|e| e.name != old_name)
1704            .map(|e| DirEntry {
1705                name: e.name,
1706                hash: e.hash,
1707                size: e.size,
1708                key: e.key,
1709                link_type: e.link_type,
1710                meta: e.meta,
1711            })
1712            .chain(std::iter::once(DirEntry {
1713                name: new_name.to_string(),
1714                hash: entry_hash,
1715                size: entry_size,
1716                key: entry_key,
1717                link_type: entry_link_type,
1718                meta: entry_meta,
1719            }))
1720            .collect();
1721
1722        let new_dir_cid = self.put_directory(new_entries).await?;
1723        self.rebuild_path(root, path, new_dir_cid).await
1724    }
1725
    /// Move an entry to a different directory
    /// Returns new root Cid
    ///
    /// Fails with `PathNotFound` if `source_path` does not resolve, or
    /// `EntryNotFound` if `name` is not in the source directory. All
    /// operations are immutable, so an error at any step leaves the
    /// original `root` usable and unchanged.
    ///
    /// NOTE(review): the entry's `meta` is not carried over — `set_entry`
    /// always stores `meta: None`. Confirm that dropping metadata on move
    /// is intended.
    pub async fn move_entry(
        &self,
        root: &Cid,
        source_path: &[&str],
        name: &str,
        target_path: &[&str],
    ) -> Result<Cid, HashTreeError> {
        let source_dir_cid = self.resolve_path_array(root, source_path).await?;
        let source_dir_cid = source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;

        let source_entries = self.list_directory(&source_dir_cid).await?;
        let entry = source_entries
            .iter()
            .find(|e| e.name == name)
            .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;

        // Capture the link data (hash + optional key form the entry's Cid).
        let entry_cid = Cid {
            hash: entry.hash,
            key: entry.key,
        };
        let entry_size = entry.size;
        let entry_link_type = entry.link_type;

        // Remove from source, producing an intermediate root ...
        let new_root = self.remove_entry(root, source_path, name).await?;

        // ... then add to target, resolved against the intermediate root so
        // the removal is already reflected.
        self.set_entry(&new_root, target_path, name, &entry_cid, entry_size, entry_link_type).await
    }
1757
1758    async fn resolve_path_array(&self, root: &Cid, path: &[&str]) -> Result<Option<Cid>, HashTreeError> {
1759        if path.is_empty() {
1760            return Ok(Some(root.clone()));
1761        }
1762        self.resolve_path(root, &path.join("/")).await
1763    }
1764
1765    async fn rebuild_path(
1766        &self,
1767        root: &Cid,
1768        path: &[&str],
1769        new_child: Cid,
1770    ) -> Result<Cid, HashTreeError> {
1771        if path.is_empty() {
1772            return Ok(new_child);
1773        }
1774
1775        let mut child_cid = new_child;
1776        let parts: Vec<&str> = path.to_vec();
1777
1778        for i in (0..parts.len()).rev() {
1779            let child_name = parts[i];
1780            let parent_path = &parts[..i];
1781
1782            let parent_cid = if parent_path.is_empty() {
1783                root.clone()
1784            } else {
1785                self.resolve_path_array(root, parent_path)
1786                    .await?
1787                    .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
1788            };
1789
1790            let parent_entries = self.list_directory(&parent_cid).await?;
1791            let new_parent_entries: Vec<DirEntry> = parent_entries
1792                .into_iter()
1793                .map(|e| {
1794                    if e.name == child_name {
1795                        DirEntry {
1796                            name: e.name,
1797                            hash: child_cid.hash,
1798                            size: 0, // Directories don't have a meaningful size in the link
1799                            key: child_cid.key,
1800                            link_type: e.link_type,
1801                            meta: e.meta,
1802                        }
1803                    } else {
1804                        DirEntry {
1805                            name: e.name,
1806                            hash: e.hash,
1807                            size: e.size,
1808                            key: e.key,
1809                            link_type: e.link_type,
1810                            meta: e.meta,
1811                        }
1812                    }
1813                })
1814                .collect();
1815
1816            child_cid = self.put_directory(new_parent_entries).await?;
1817        }
1818
1819        Ok(child_cid)
1820    }
1821
1822    // ============ UTILITY ============
1823
1824    /// Get the underlying store
1825    pub fn get_store(&self) -> Arc<S> {
1826        self.store.clone()
1827    }
1828
    /// Get chunk size configuration
    ///
    /// Returns the `chunk_size` value supplied via `HashTreeConfig`
    /// (default `DEFAULT_CHUNK_SIZE`).
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }
1833
    /// Get max links configuration
    ///
    /// Returns the `max_links` value supplied via `HashTreeConfig`
    /// (default `DEFAULT_MAX_LINKS`).
    pub fn max_links(&self) -> usize {
        self.max_links
    }
1838}
1839
1840// Internal state types for streaming
1841
/// Work item for the unencrypted read stream: a node hash still to fetch.
// NOTE(review): single-variant enum — presumably kept as an enum so more
// item kinds can be added later; a bare `Hash` would otherwise suffice.
enum StreamStackItem {
    Hash(Hash),
}
1845
/// State machine driving the unencrypted streaming read.
enum ReadStreamState<'a, S: Store> {
    /// Stream created; the root node has not been fetched yet.
    Init { hash: Hash, tree: &'a HashTree<S> },
    /// Traversal in progress; `stack` holds nodes still to visit.
    Processing { stack: Vec<StreamStackItem>, tree: &'a HashTree<S> },
    /// Stream exhausted.
    Done,
}
1851
/// Work item for the tree-walk stream.
struct WalkStackItem {
    // Hash of the node to visit.
    hash: Hash,
    // Slash-separated path accumulated from the walk root.
    path: String,
    // Decryption key carried from the parent link, if the content is encrypted.
    key: Option<[u8; 32]>,
}
1857
/// State machine driving the tree-walk stream.
enum WalkStreamState<'a, S: Store> {
    /// Walk created; the root has not been visited yet.
    Init { cid: Cid, path: String, tree: &'a HashTree<S> },
    /// DFS in progress; `stack` holds entries still to visit.
    Processing { stack: Vec<WalkStackItem>, tree: &'a HashTree<S> },
    /// Walk exhausted.
    Done,
}
1863
// Encrypted stream state types

/// Work item for the encrypted read stream.
struct EncryptedStackItem {
    // Hash of the node to visit.
    hash: Hash,
    // Per-node decryption key carried from the parent link, if any.
    key: Option<[u8; 32]>,
}
1869
/// State machine driving the encrypted streaming read.
enum EncryptedStreamState<'a, S: Store> {
    /// Stream created; `key` is the root's (mandatory) decryption key.
    Init { hash: Hash, key: [u8; 32], tree: &'a HashTree<S> },
    /// Traversal in progress; `stack` holds nodes still to visit.
    Processing { stack: Vec<EncryptedStackItem>, tree: &'a HashTree<S> },
    /// Stream exhausted.
    Done,
}
1875
1876/// Verify tree integrity - checks that all referenced hashes exist
1877pub async fn verify_tree<S: Store>(
1878    store: Arc<S>,
1879    root_hash: &Hash,
1880) -> Result<crate::reader::VerifyResult, HashTreeError> {
1881    let mut missing = Vec::new();
1882    let mut visited = std::collections::HashSet::new();
1883
1884    verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
1885
1886    Ok(crate::reader::VerifyResult {
1887        valid: missing.is_empty(),
1888        missing,
1889    })
1890}
1891
1892async fn verify_recursive<S: Store>(
1893    store: Arc<S>,
1894    hash: &Hash,
1895    missing: &mut Vec<Hash>,
1896    visited: &mut std::collections::HashSet<String>,
1897) -> Result<(), HashTreeError> {
1898    let hex = to_hex(hash);
1899    if visited.contains(&hex) {
1900        return Ok(());
1901    }
1902    visited.insert(hex);
1903
1904    let data = match store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1905        Some(d) => d,
1906        None => {
1907            missing.push(*hash);
1908            return Ok(());
1909        }
1910    };
1911
1912    if is_tree_node(&data) {
1913        let node = decode_tree_node(&data)?;
1914        for link in &node.links {
1915            Box::pin(verify_recursive(store.clone(), &link.hash, missing, visited)).await?;
1916        }
1917    }
1918
1919    Ok(())
1920}
1921
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;

    /// Build an in-memory, unencrypted HashTree plus its backing store.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        // Use public (unencrypted) mode for these tests
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    // Round-trips a raw blob through put_blob/get_blob.
    #[tokio::test]
    async fn test_put_and_read_blob() {
        let (_store, tree) = make_tree();

        let data = vec![1, 2, 3, 4, 5];
        let hash = tree.put_blob(&data).await.unwrap();

        let result = tree.get_blob(&hash).await.unwrap();
        assert_eq!(result, Some(data));
    }

    // A file below the chunk size round-trips and reports its byte length.
    #[tokio::test]
    async fn test_put_and_read_file_small() {
        let (_store, tree) = make_tree();

        let data = b"Hello, World!";
        let (cid, size) = tree.put_file(data).await.unwrap();

        assert_eq!(size, data.len() as u64);

        let read_data = tree.read_file(&cid.hash).await.unwrap();
        assert_eq!(read_data, Some(data.to_vec()));
    }

    // Directory listing returns exactly the entries that were stored.
    #[tokio::test]
    async fn test_put_and_read_directory() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();

        let dir_cid = tree
            .put_directory(
                vec![
                    DirEntry::new("a.txt", file1).with_size(8),
                    DirEntry::new("b.txt", file2).with_size(8),
                ],
            )
            .await
            .unwrap();

        let entries = tree.list_directory(&dir_cid).await.unwrap();
        assert_eq!(entries.len(), 2);
        let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
        assert!(names.contains(&"a.txt"));
        assert!(names.contains(&"b.txt"));
    }

    // is_directory distinguishes a blob from a (possibly empty) directory.
    #[tokio::test]
    async fn test_is_directory() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"data").await.unwrap();
        let dir_cid = tree.put_directory(vec![]).await.unwrap();

        assert!(!tree.is_directory(&file_hash).await.unwrap());
        assert!(tree.is_directory(&dir_cid.hash).await.unwrap());
    }

    // A two-level path resolves to the nested file's hash.
    #[tokio::test]
    async fn test_resolve_path() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"nested").await.unwrap();
        let sub_dir = tree.put_directory(
            vec![DirEntry::new("file.txt", file_hash).with_size(6)],
        ).await.unwrap();
        let root_dir = tree.put_directory(
            vec![DirEntry::new("subdir", sub_dir.hash)],
        ).await.unwrap();

        let resolved = tree.resolve_path(&root_dir, "subdir/file.txt").await.unwrap();
        assert_eq!(resolved.map(|c| c.hash), Some(file_hash));
    }

    // ============ UNIFIED API TESTS ============

    // put/get in public mode: no key in the Cid, content round-trips.
    #[tokio::test]
    async fn test_unified_put_get_public() {
        let store = Arc::new(MemoryStore::new());
        // Use .public() to disable encryption
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let data = b"Hello, public world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_none()); // No key for public content

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // put/get with default (encrypted) config: Cid carries a key.
    #[tokio::test]
    async fn test_unified_put_get_encrypted() {
        let store = Arc::new(MemoryStore::new());
        // Default config has encryption enabled
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Hello, encrypted world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some()); // Has encryption key

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Encrypted content that spans multiple chunks still round-trips.
    #[tokio::test]
    async fn test_unified_put_get_encrypted_chunked() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).with_chunk_size(100));

        // Data larger than chunk size
        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
        let (cid, size) = tree.put(&data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Convergent (CHK) encryption: identical content yields identical Cids.
    #[tokio::test]
    async fn test_cid_deterministic() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Same content produces same CID";

        let (cid1, _) = tree.put(data).await.unwrap();
        let (cid2, _) = tree.put(data).await.unwrap();

        // CHK: same content = same hash AND same key
        assert_eq!(cid1.hash, cid2.hash);
        assert_eq!(cid1.key, cid2.key);
        assert_eq!(cid1.to_string(), cid2.to_string());
    }

    // Public Cid renders as a bare 64-char hex hash.
    #[tokio::test]
    async fn test_cid_to_string_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        // Public CID is just the hash (64 hex chars)
        assert_eq!(s.len(), 64);
        assert!(!s.contains(':'));
    }

    // Encrypted Cid renders as "hash:key".
    #[tokio::test]
    async fn test_cid_to_string_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        // Encrypted CID is "hash:key" (64 + 1 + 64 = 129 chars)
        assert_eq!(s.len(), 129);
        assert!(s.contains(':'));
    }
}