hashtree_core/
hashtree.rs

1//! HashTree - Unified merkle tree operations
2//!
3//! Single struct for creating, reading, and editing content-addressed merkle trees.
4//! Mirrors the hashtree-ts HashTree class API.
5
6use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::stream::{self, Stream};
10use futures::io::AsyncRead;
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node};
15use crate::hash::sha256;
16use crate::reader::{ReaderError, TreeEntry, WalkEntry};
17use crate::store::Store;
18use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
19
20use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
21
/// HashTree configuration
///
/// Construct with [`HashTreeConfig::new`], then adjust via the builder-style
/// methods (`with_chunk_size`, `with_max_links`, `public`).
#[derive(Clone)]
pub struct HashTreeConfig<S: Store> {
    /// Backing block store shared with the HashTree.
    pub store: Arc<S>,
    /// Maximum size in bytes of a single leaf chunk.
    pub chunk_size: usize,
    /// Maximum number of links per tree node before subtrees are created.
    pub max_links: usize,
    /// Whether to encrypt content (default: true when encryption feature enabled)
    pub encrypted: bool,
}
31
32impl<S: Store> HashTreeConfig<S> {
33    pub fn new(store: Arc<S>) -> Self {
34        Self {
35            store,
36            chunk_size: DEFAULT_CHUNK_SIZE,
37            max_links: DEFAULT_MAX_LINKS,
38            encrypted: true,
39        }
40    }
41
42    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
43        self.chunk_size = chunk_size;
44        self
45    }
46
47    pub fn with_max_links(mut self, max_links: usize) -> Self {
48        self.max_links = max_links;
49        self
50    }
51
52    /// Disable encryption (store content publicly)
53    pub fn public(mut self) -> Self {
54        self.encrypted = false;
55        self
56    }
57}
58
/// HashTree error type
///
/// Unifies failures from the block store, the codec, encryption/decryption,
/// and tree traversal. `BuilderError` and `ReaderError` convert into this type.
#[derive(Debug, thiserror::Error)]
pub enum HashTreeError {
    /// The underlying block store failed (message from the store's own error).
    #[error("Store error: {0}")]
    Store(String),
    /// Encoding or decoding a tree node failed.
    #[error("Codec error: {0}")]
    Codec(#[from] crate::codec::CodecError),
    /// A referenced chunk was absent from the store (hex-encoded hash).
    #[error("Missing chunk: {0}")]
    MissingChunk(String),
    /// A path lookup failed.
    #[error("Path not found: {0}")]
    PathNotFound(String),
    /// A directory entry lookup failed.
    #[error("Entry not found: {0}")]
    EntryNotFound(String),
    /// Encrypting content failed.
    #[error("Encryption error: {0}")]
    Encryption(String),
    /// Decrypting content failed.
    #[error("Decryption error: {0}")]
    Decryption(String),
}
77
78impl From<BuilderError> for HashTreeError {
79    fn from(e: BuilderError) -> Self {
80        match e {
81            BuilderError::Store(s) => HashTreeError::Store(s),
82            BuilderError::Codec(c) => HashTreeError::Codec(c),
83            BuilderError::Encryption(s) => HashTreeError::Encryption(s),
84        }
85    }
86}
87
88impl From<ReaderError> for HashTreeError {
89    fn from(e: ReaderError) -> Self {
90        match e {
91            ReaderError::Store(s) => HashTreeError::Store(s),
92            ReaderError::Codec(c) => HashTreeError::Codec(c),
93            ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
94            ReaderError::Decryption(s) => HashTreeError::Encryption(s),
95            ReaderError::MissingKey => HashTreeError::Encryption("missing decryption key".to_string()),
96        }
97    }
98}
99
/// HashTree - unified create, read, and edit merkle tree operations
pub struct HashTree<S: Store> {
    // Content-addressed block store holding chunks and tree nodes.
    store: Arc<S>,
    // Maximum leaf chunk size in bytes (copied from HashTreeConfig).
    chunk_size: usize,
    // Maximum links per tree node before fan-out into subtrees.
    max_links: usize,
    // When true, chunks and tree nodes are CHK-encrypted before storing.
    encrypted: bool,
}
107
108impl<S: Store> HashTree<S> {
109    pub fn new(config: HashTreeConfig<S>) -> Self {
110        Self {
111            store: config.store,
112            chunk_size: config.chunk_size,
113            max_links: config.max_links,
114            encrypted: config.encrypted,
115        }
116    }
117
    /// Check if encryption is enabled
    ///
    /// Mirrors the `encrypted` flag from [`HashTreeConfig`].
    pub fn is_encrypted(&self) -> bool {
        self.encrypted
    }
122
123    // ============ UNIFIED API ============
124
125    /// Store content, returns (Cid, size) where Cid is hash + optional key
126    /// Encrypts by default when encryption feature is enabled
127    pub async fn put(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
128        let size = data.len() as u64;
129
130        // Small data - store as single chunk
131        if data.len() <= self.chunk_size {
132            let (hash, key) = self.put_chunk_internal(data).await?;
133            return Ok((Cid { hash, key }, size));
134        }
135
136        // Large data - chunk it
137        let mut links: Vec<Link> = Vec::new();
138        let mut offset = 0;
139
140        while offset < data.len() {
141            let end = (offset + self.chunk_size).min(data.len());
142            let chunk = &data[offset..end];
143            let chunk_size = chunk.len() as u64;
144            let (hash, key) = self.put_chunk_internal(chunk).await?;
145            links.push(Link {
146                hash,
147                name: None,
148                size: chunk_size,
149                key,
150                link_type: LinkType::Blob, // Leaf chunk (raw blob)
151                meta: None,
152            });
153            offset = end;
154        }
155
156        // Build tree from chunks
157        let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
158        Ok((Cid { hash: root_hash, key: root_key }, size))
159    }
160
161    /// Get content by Cid (handles decryption automatically)
162    pub async fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
163        if let Some(key) = cid.key {
164            self.get_encrypted(&cid.hash, &key).await
165        } else {
166            self.read_file(&cid.hash).await
167        }
168    }
169
170    /// Store content from an async reader (streaming put)
171    ///
172    /// Reads data in chunks and builds a merkle tree incrementally.
173    /// Useful for large files or streaming data sources.
174    /// Returns (Cid, size) where Cid is hash + optional key
175    pub async fn put_stream<R: AsyncRead + Unpin>(&self, mut reader: R) -> Result<(Cid, u64), HashTreeError> {
176        let mut buffer = vec![0u8; self.chunk_size];
177        let mut links = Vec::new();
178        let mut total_size: u64 = 0;
179        let mut consistent_key: Option<[u8; 32]> = None;
180
181        loop {
182            let mut chunk = Vec::new();
183            let mut bytes_read = 0;
184
185            // Read until we have a full chunk or EOF
186            while bytes_read < self.chunk_size {
187                let n = reader.read(&mut buffer[..self.chunk_size - bytes_read]).await
188                    .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
189                if n == 0 {
190                    break; // EOF
191                }
192                chunk.extend_from_slice(&buffer[..n]);
193                bytes_read += n;
194            }
195
196            if chunk.is_empty() {
197                break; // No more data
198            }
199
200            let chunk_len = chunk.len() as u64;
201            total_size += chunk_len;
202
203            let (hash, key) = self.put_chunk_internal(&chunk).await?;
204
205            // Track consistent key for single-key result
206            if links.is_empty() {
207                consistent_key = key;
208            } else if consistent_key != key {
209                consistent_key = None;
210            }
211
212            links.push(Link {
213                hash,
214                name: None,
215                size: chunk_len,
216                key,
217                link_type: LinkType::Blob, // Leaf chunk (raw blob)
218                meta: None,
219            });
220        }
221
222        if links.is_empty() {
223            // Empty input
224            let (hash, key) = self.put_chunk_internal(&[]).await?;
225            return Ok((Cid { hash, key }, 0));
226        }
227
228        // Build tree from chunks
229        let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
230        Ok((Cid { hash: root_hash, key: root_key }, total_size))
231    }
232
233    /// Read content as a stream of chunks by Cid (handles decryption automatically)
234    ///
235    /// Returns an async stream that yields chunks as they are read.
236    /// Useful for large files or when you want to process data incrementally.
237    pub fn get_stream(
238        &self,
239        cid: &Cid,
240    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
241        let hash = cid.hash;
242        let key = cid.key;
243
244        if let Some(k) = key {
245            // Encrypted stream
246            Box::pin(self.read_file_stream_encrypted(hash, k))
247        } else {
248            // Unencrypted stream
249            self.read_file_stream(hash)
250        }
251    }
252
    /// Read encrypted file as stream (internal)
    ///
    /// Drives an `unfold` state machine: `Init` fetches and decrypts the root
    /// blob, then either yields it directly (single-chunk file) or seeds a
    /// depth-first traversal stack; `Processing` drains that stack one leaf
    /// at a time via `process_encrypted_stream_stack`; `Done` ends the stream.
    fn read_file_stream_encrypted(
        &self,
        hash: Hash,
        key: EncryptionKey,
    ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
        stream::unfold(
            EncryptedStreamState::Init { hash, key, tree: self },
            |state| async move {
                match state {
                    EncryptedStreamState::Init { hash, key, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            // NOTE(review): a missing root ends the stream with no
                            // items instead of yielding an error - callers cannot
                            // tell "absent" from "empty"; confirm this is intended.
                            Ok(None) => return None,
                            Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), EncryptedStreamState::Done)),
                        };

                        // Try to decrypt
                        let decrypted = match decrypt_chk(&data, &key) {
                            Ok(d) => d,
                            Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), EncryptedStreamState::Done)),
                        };

                        if !is_tree_node(&decrypted) {
                            // Single blob - yield decrypted data
                            return Some((Ok(decrypted), EncryptedStreamState::Done));
                        }

                        // Tree node - parse and traverse
                        let node = match decode_tree_node(&decrypted) {
                            Ok(n) => n,
                            Err(e) => return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done)),
                        };

                        // Push children in reverse so the stack pops them in
                        // original (left-to-right) order.
                        let mut stack: Vec<EncryptedStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
                        }

                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Processing { mut stack, tree } => {
                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Done => None,
                }
            },
        )
    }
302
    /// Pop items off the traversal stack until a leaf chunk is found.
    ///
    /// Returns the next `(yielded item, next state)` pair for the `unfold`
    /// in `read_file_stream_encrypted`: a decrypted leaf plus a `Processing`
    /// state carrying the remaining stack, an error plus `Done`, or `None`
    /// when the stack is exhausted.
    async fn process_encrypted_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<EncryptedStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                // Unlike the root fetch, a missing interior chunk is an error.
                Ok(None) => {
                    return Some((
                        Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
                        EncryptedStreamState::Done,
                    ))
                }
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        EncryptedStreamState::Done,
                    ))
                }
            };

            // Decrypt if we have a key
            let decrypted = if let Some(key) = item.key {
                match decrypt_chk(&data, &key) {
                    Ok(d) => d,
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Decryption(e.to_string())),
                            EncryptedStreamState::Done,
                        ))
                    }
                }
            } else {
                data
            };

            if is_tree_node(&decrypted) {
                // Nested tree node - add children to stack
                let node = match decode_tree_node(&decrypted) {
                    Ok(n) => n,
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Codec(e)),
                            EncryptedStreamState::Done,
                        ))
                    }
                };
                // Reverse push keeps pop order equal to link order.
                for link in node.links.into_iter().rev() {
                    stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
                }
            } else {
                // Leaf chunk - yield decrypted data; the emptied local stack is
                // moved into the next state via mem::take.
                return Some((
                    Ok(decrypted),
                    EncryptedStreamState::Processing { stack: std::mem::take(stack), tree: self },
                ));
            }
        }
        None
    }
363
364    /// Store a chunk with optional encryption
365    async fn put_chunk_internal(&self, data: &[u8]) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
366        if self.encrypted {
367            let (encrypted, key) = encrypt_chk(data)
368                .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
369            let hash = sha256(&encrypted);
370            self.store
371                .put(hash, encrypted)
372                .await
373                .map_err(|e| HashTreeError::Store(e.to_string()))?;
374            Ok((hash, Some(key)))
375        } else {
376            let hash = self.put_blob(data).await?;
377            Ok((hash, None))
378        }
379    }
380
381    /// Build tree and return (hash, optional_key)
382    async fn build_tree_internal(
383        &self,
384        links: Vec<Link>,
385        total_size: Option<u64>,
386    ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
387        // Single link with matching size - return directly
388        if links.len() == 1 {
389            if let Some(ts) = total_size {
390                if links[0].size == ts {
391                    return Ok((links[0].hash, links[0].key));
392                }
393            }
394        }
395
396        if links.len() <= self.max_links {
397            let node = TreeNode {
398                node_type: LinkType::File,
399                links,
400            };
401            let (data, _) = encode_and_hash(&node)?;
402
403            if self.encrypted {
404                let (encrypted, key) = encrypt_chk(&data)
405                    .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
406                let hash = sha256(&encrypted);
407                self.store
408                    .put(hash, encrypted)
409                    .await
410                    .map_err(|e| HashTreeError::Store(e.to_string()))?;
411                return Ok((hash, Some(key)));
412            }
413
414            // Unencrypted path
415            let hash = sha256(&data);
416            self.store
417                .put(hash, data)
418                .await
419                .map_err(|e| HashTreeError::Store(e.to_string()))?;
420            return Ok((hash, None));
421        }
422
423        // Too many links - create subtrees
424        let mut sub_links = Vec::new();
425        for batch in links.chunks(self.max_links) {
426            let batch_size: u64 = batch.iter().map(|l| l.size).sum();
427            let (hash, key) = Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
428            sub_links.push(Link {
429                hash,
430                name: None,
431                size: batch_size,
432                key,
433                link_type: LinkType::File, // Internal tree node
434                meta: None,
435            });
436        }
437
438        Box::pin(self.build_tree_internal(sub_links, total_size)).await
439    }
440
441    /// Get encrypted content by hash and key
442    async fn get_encrypted(
443        &self,
444        hash: &Hash,
445        key: &EncryptionKey,
446    ) -> Result<Option<Vec<u8>>, HashTreeError> {
447        let encrypted_data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
448            Some(d) => d,
449            None => return Ok(None),
450        };
451
452        // Decrypt the data
453        let decrypted = decrypt_chk(&encrypted_data, key)
454            .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
455
456        // Check if it's a tree node
457        if is_tree_node(&decrypted) {
458            let node = decode_tree_node(&decrypted)?;
459            let assembled = self.assemble_encrypted_chunks(&node).await?;
460            return Ok(Some(assembled));
461        }
462
463        // Single chunk data
464        Ok(Some(decrypted))
465    }
466
467    /// Assemble encrypted chunks from tree
468    async fn assemble_encrypted_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
469        let mut parts: Vec<Vec<u8>> = Vec::new();
470
471        for link in &node.links {
472            let chunk_key = link.key.ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
473
474            let encrypted_child = self
475                .store
476                .get(&link.hash)
477                .await
478                .map_err(|e| HashTreeError::Store(e.to_string()))?
479                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
480
481            let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
482                .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
483
484            if is_tree_node(&decrypted) {
485                // Intermediate tree node - recurse
486                let child_node = decode_tree_node(&decrypted)?;
487                let child_data = Box::pin(self.assemble_encrypted_chunks(&child_node)).await?;
488                parts.push(child_data);
489            } else {
490                // Leaf data chunk
491                parts.push(decrypted);
492            }
493        }
494
495        let total_len: usize = parts.iter().map(|p| p.len()).sum();
496        let mut result = Vec::with_capacity(total_len);
497        for part in parts {
498            result.extend_from_slice(&part);
499        }
500
501        Ok(result)
502    }
503
504    // ============ LOW-LEVEL CREATE ============
505
506    /// Store a blob directly (small data, no encryption)
507    /// Returns the content hash
508    pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
509        let hash = sha256(data);
510        self.store
511            .put(hash, data.to_vec())
512            .await
513            .map_err(|e| HashTreeError::Store(e.to_string()))?;
514        Ok(hash)
515    }
516
517    /// Store a file, chunking if necessary
518    /// Returns (Cid, size) where Cid is hash + optional key
519    pub async fn put_file(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
520        let size = data.len() as u64;
521
522        // Small file - store as single chunk
523        if data.len() <= self.chunk_size {
524            let (hash, key) = self.put_chunk_internal(data).await?;
525            return Ok((Cid { hash, key }, size));
526        }
527
528        // Large file - chunk it
529        let mut links: Vec<Link> = Vec::new();
530        let mut offset = 0;
531
532        while offset < data.len() {
533            let end = (offset + self.chunk_size).min(data.len());
534            let chunk = &data[offset..end];
535            let chunk_size = (end - offset) as u64;
536
537            let (hash, key) = self.put_chunk_internal(chunk).await?;
538            links.push(Link {
539                hash,
540                name: None,
541                size: chunk_size,
542                key,
543                link_type: LinkType::Blob, // Leaf chunk
544                meta: None,
545            });
546            offset = end;
547        }
548
549        // Build tree from chunks (uses encryption if enabled)
550        let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
551        Ok((Cid { hash: root_hash, key: root_key }, size))
552    }
553
554    /// Build a directory from entries
555    /// Returns Cid with key if encrypted
556    ///
557    /// For large directories, the messagepack-encoded TreeNode is stored via put()
558    /// which automatically chunks the data. The reader uses read_file() to reassemble.
559    pub async fn put_directory(
560        &self,
561        entries: Vec<DirEntry>,
562    ) -> Result<Cid, HashTreeError> {
563        // Sort entries by name for deterministic hashing
564        let mut sorted = entries;
565        sorted.sort_by(|a, b| a.name.cmp(&b.name));
566
567        let links: Vec<Link> = sorted
568            .into_iter()
569            .map(|e| Link {
570                hash: e.hash,
571                name: Some(e.name),
572                size: e.size,
573                key: e.key,
574                link_type: e.link_type,
575                meta: e.meta,
576            })
577            .collect();
578
579        // Create the directory node with all entries
580        let node = TreeNode {
581            node_type: LinkType::Dir,
582            links,
583        };
584        let (data, _plain_hash) = encode_and_hash(&node)?;
585
586        // Store directory data via put() - handles both small and large directories
587        // For small dirs, stores as single chunk
588        // For large dirs, chunks transparently via build_tree()
589        // Reader uses read_file() to reassemble before decoding
590        let (cid, _size) = self.put(&data).await?;
591        Ok(cid)
592    }
593
594    /// Create a tree node with custom links
595    pub async fn put_tree_node(
596        &self,
597        links: Vec<Link>,
598    ) -> Result<Hash, HashTreeError> {
599        let node = TreeNode {
600            node_type: LinkType::Dir,
601            links,
602        };
603
604        let (data, hash) = encode_and_hash(&node)?;
605        self.store
606            .put(hash, data)
607            .await
608            .map_err(|e| HashTreeError::Store(e.to_string()))?;
609        Ok(hash)
610    }
611
612    // ============ READ ============
613
614    /// Get raw data by hash
615    pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
616        self.store
617            .get(hash)
618            .await
619            .map_err(|e| HashTreeError::Store(e.to_string()))
620    }
621
622    /// Get and decode a tree node (unencrypted)
623    pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
624        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
625            Some(d) => d,
626            None => return Ok(None),
627        };
628
629        if !is_tree_node(&data) {
630            return Ok(None);
631        }
632
633        let node = decode_tree_node(&data)?;
634        Ok(Some(node))
635    }
636
637    /// Get and decode a tree node using Cid (with decryption if key present)
638    pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
639        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
640            Some(d) => d,
641            None => return Ok(None),
642        };
643
644        // Decrypt if key is present
645        let decrypted = if let Some(key) = &cid.key {
646            decrypt_chk(&data, key)
647                .map_err(|e| HashTreeError::Decryption(e.to_string()))?
648        } else {
649            data
650        };
651
652        if !is_tree_node(&decrypted) {
653            return Ok(None);
654        }
655
656        let node = decode_tree_node(&decrypted)?;
657        Ok(Some(node))
658    }
659
660    /// Get directory node, handling chunked directory data
661    /// Use this when you know the target is a directory (from parent link_type)
662    pub async fn get_directory_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
663        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
664            Some(d) => d,
665            None => return Ok(None),
666        };
667
668        // Decrypt if key is present
669        let decrypted = if let Some(key) = &cid.key {
670            decrypt_chk(&data, key)
671                .map_err(|e| HashTreeError::Decryption(e.to_string()))?
672        } else {
673            data
674        };
675
676        if !is_tree_node(&decrypted) {
677            return Ok(None);
678        }
679
680        let node = decode_tree_node(&decrypted)?;
681
682        // If this is a file tree (chunked data), reassemble to get actual directory
683        if node.node_type == LinkType::File {
684            let assembled = self.assemble_chunks(&node).await?;
685            if is_tree_node(&assembled) {
686                let inner_node = decode_tree_node(&assembled)?;
687                return Ok(Some(inner_node));
688            }
689        }
690
691        Ok(Some(node))
692    }
693
694    /// Check if hash points to a tree node (no decryption)
695    pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
696        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
697            Some(d) => d,
698            None => return Ok(false),
699        };
700        Ok(is_tree_node(&data))
701    }
702
703    /// Check if Cid points to a directory (with decryption)
704    pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
705        let node = match self.get_node(cid).await? {
706            Some(n) => n,
707            None => return Ok(false),
708        };
709        // Directory has named links (not just internal chunks)
710        Ok(node.links.iter().any(|l| l.name.as_ref().map(|n| !n.starts_with('_')).unwrap_or(false)))
711    }
712
713    /// Check if hash points to a directory (tree with named links, no decryption)
714    pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
715        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
716            Some(d) => d,
717            None => return Ok(false),
718        };
719        Ok(is_directory_node(&data))
720    }
721
722    /// Read a complete file (reassemble chunks if needed)
723    pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
724        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
725            Some(d) => d,
726            None => return Ok(None),
727        };
728
729        // Check if it's a tree (chunked file) or raw blob
730        if !is_tree_node(&data) {
731            return Ok(Some(data));
732        }
733
734        // It's a tree - reassemble chunks
735        let node = decode_tree_node(&data)?;
736        let assembled = self.assemble_chunks(&node).await?;
737        Ok(Some(assembled))
738    }
739
740    /// Read a byte range from a file (fetches only necessary chunks)
741    ///
742    /// - `start`: Starting byte offset (inclusive)
743    /// - `end`: Ending byte offset (exclusive), or None to read to end
744    ///
745    /// This is more efficient than read_file() for partial reads of large files.
746    pub async fn read_file_range(
747        &self,
748        hash: &Hash,
749        start: u64,
750        end: Option<u64>,
751    ) -> Result<Option<Vec<u8>>, HashTreeError> {
752        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
753            Some(d) => d,
754            None => return Ok(None),
755        };
756
757        // Single blob - just slice it
758        if !is_tree_node(&data) {
759            let start_idx = start as usize;
760            let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
761            if start_idx >= data.len() {
762                return Ok(Some(vec![]));
763            }
764            let end_idx = end_idx.min(data.len());
765            return Ok(Some(data[start_idx..end_idx].to_vec()));
766        }
767
768        // It's a chunked file - fetch only needed chunks
769        let node = decode_tree_node(&data)?;
770        let range_data = self.assemble_chunks_range(&node, start, end).await?;
771        Ok(Some(range_data))
772    }
773
    /// Assemble only the chunks needed for a byte range
    ///
    /// Flattens the tree into an ordered leaf list, then fetches and slices
    /// only the leaves overlapping [start, end). `end = None` means "to EOF".
    /// Returns an empty Vec when the range is empty or past the end.
    async fn assemble_chunks_range(
        &self,
        node: &TreeNode,
        start: u64,
        end: Option<u64>,
    ) -> Result<Vec<u8>, HashTreeError> {
        // First, flatten the tree to get all leaf chunks with their byte offsets
        let chunks_info = self.collect_chunk_offsets(node).await?;

        if chunks_info.is_empty() {
            return Ok(vec![]);
        }

        // Calculate total size and actual end (clamped to the file size)
        let total_size: u64 = chunks_info.iter().map(|(_, _, size)| size).sum();
        let actual_end = end.unwrap_or(total_size).min(total_size);

        // Empty or inverted range: nothing to read
        if start >= actual_end {
            return Ok(vec![]);
        }

        // Find chunks that overlap with [start, actual_end)
        let mut result = Vec::with_capacity((actual_end - start) as usize);
        let mut current_offset = 0u64;

        // NOTE(review): the offsets recorded in chunks_info are recomputed here
        // via current_offset; the stored value is intentionally unused.
        for (chunk_hash, _chunk_offset, chunk_size) in &chunks_info {
            let chunk_start = current_offset;
            let chunk_end = current_offset + chunk_size;

            // Check if this chunk overlaps with our range
            if chunk_end > start && chunk_start < actual_end {
                // Fetch this chunk
                let chunk_data = self
                    .store
                    .get(chunk_hash)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?
                    .ok_or_else(|| HashTreeError::MissingChunk(to_hex(chunk_hash)))?;

                // Calculate slice bounds within this chunk
                let slice_start = if start > chunk_start {
                    (start - chunk_start) as usize
                } else {
                    0
                };
                let slice_end = if actual_end < chunk_end {
                    (actual_end - chunk_start) as usize
                } else {
                    chunk_data.len()
                };

                result.extend_from_slice(&chunk_data[slice_start..slice_end]);
            }

            current_offset = chunk_end;

            // Early exit if we've passed the requested range
            if current_offset >= actual_end {
                break;
            }
        }

        Ok(result)
    }
839
840    /// Collect all leaf chunk hashes with their byte offsets
841    /// Returns Vec<(hash, offset, size)>
842    async fn collect_chunk_offsets(
843        &self,
844        node: &TreeNode,
845    ) -> Result<Vec<(Hash, u64, u64)>, HashTreeError> {
846        let mut chunks = Vec::new();
847        let mut offset = 0u64;
848        self.collect_chunk_offsets_recursive(node, &mut chunks, &mut offset).await?;
849        Ok(chunks)
850    }
851
852    async fn collect_chunk_offsets_recursive(
853        &self,
854        node: &TreeNode,
855        chunks: &mut Vec<(Hash, u64, u64)>,
856        offset: &mut u64,
857    ) -> Result<(), HashTreeError> {
858        for link in &node.links {
859            let child_data = self
860                .store
861                .get(&link.hash)
862                .await
863                .map_err(|e| HashTreeError::Store(e.to_string()))?
864                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
865
866            if is_tree_node(&child_data) {
867                // Intermediate node - recurse
868                let child_node = decode_tree_node(&child_data)?;
869                Box::pin(self.collect_chunk_offsets_recursive(&child_node, chunks, offset)).await?;
870            } else {
871                // Leaf chunk
872                let size = child_data.len() as u64;
873                chunks.push((link.hash, *offset, size));
874                *offset += size;
875            }
876        }
877        Ok(())
878    }
879
880    /// Recursively assemble chunks from tree
881    async fn assemble_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
882        let mut parts: Vec<Vec<u8>> = Vec::new();
883
884        for link in &node.links {
885            let child_data = self
886                .store
887                .get(&link.hash)
888                .await
889                .map_err(|e| HashTreeError::Store(e.to_string()))?
890                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
891
892            if is_tree_node(&child_data) {
893                let child_node = decode_tree_node(&child_data)?;
894                parts.push(Box::pin(self.assemble_chunks(&child_node)).await?);
895            } else {
896                parts.push(child_data);
897            }
898        }
899
900        // Concatenate all parts
901        let total_length: usize = parts.iter().map(|p| p.len()).sum();
902        let mut result = Vec::with_capacity(total_length);
903        for part in parts {
904            result.extend_from_slice(&part);
905        }
906
907        Ok(result)
908    }
909
    /// Read a file as stream of chunks
    /// Returns an async stream that yields chunks as they are read
    ///
    /// Implemented as a `stream::unfold` over a three-state machine:
    /// `Init` fetches and classifies the root, `Processing` drains a DFS stack
    /// of pending hashes (see `process_stream_stack`), `Done` terminates.
    /// An unknown root hash produces an empty stream; store/codec errors yield
    /// a single `Err` item and then end the stream.
    ///
    /// NOTE(review): this path reads raw store bytes and performs no
    /// decryption — confirm it is only used for public (unencrypted) content.
    pub fn read_file_stream(
        &self,
        hash: Hash,
    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            ReadStreamState::Init { hash, tree: self },
            |state| async move {
                match state {
                    ReadStreamState::Init { hash, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            // Missing root hash: end the stream without items.
                            Ok(None) => return None,
                            Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), ReadStreamState::Done)),
                        };

                        if !is_tree_node(&data) {
                            // Single blob - yield it and finish
                            return Some((Ok(data), ReadStreamState::Done));
                        }

                        // Tree node - start streaming chunks
                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
                        };

                        // Create stack with all links to process
                        // (pushed in reverse so pops come out in original link order)
                        let mut stack: Vec<StreamStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }

                        // Process first item
                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Processing { mut stack, tree } => {
                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Done => None,
                }
            },
        ))
    }
955
    /// Drive the DFS stack for `read_file_stream` until the next leaf blob.
    ///
    /// Pops hashes off `stack`; tree nodes have their children pushed (in
    /// reverse, preserving order), leaf blobs are yielded along with the
    /// remaining stack as the next stream state. Returns `None` when the
    /// stack is exhausted, ending the stream.
    async fn process_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<StreamStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            match item {
                StreamStackItem::Hash(hash) => {
                    let data = match self.store.get(&hash).await {
                        Ok(Some(d)) => d,
                        // A referenced chunk that is absent is an error (unlike
                        // a missing root, which just produces an empty stream).
                        Ok(None) => {
                            return Some((
                                Err(HashTreeError::MissingChunk(to_hex(&hash))),
                                ReadStreamState::Done,
                            ))
                        }
                        Err(e) => {
                            return Some((
                                Err(HashTreeError::Store(e.to_string())),
                                ReadStreamState::Done,
                            ))
                        }
                    };

                    if is_tree_node(&data) {
                        // Nested tree - push its children to stack
                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
                        };
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }
                    } else {
                        // Leaf blob - yield it
                        // (mem::take moves the stack into the next state, leaving
                        // the borrowed Vec empty — the caller's copy is done with.)
                        return Some((Ok(data), ReadStreamState::Processing { stack: std::mem::take(stack), tree: self }));
                    }
                }
            }
        }
        None
    }
997
998    /// Read file chunks as Vec (non-streaming version)
999    pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
1000        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1001            Some(d) => d,
1002            None => return Ok(vec![]),
1003        };
1004
1005        if !is_tree_node(&data) {
1006            return Ok(vec![data]);
1007        }
1008
1009        let node = decode_tree_node(&data)?;
1010        self.collect_chunks(&node).await
1011    }
1012
1013    async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
1014        let mut chunks = Vec::new();
1015
1016        for link in &node.links {
1017            let child_data = self
1018                .store
1019                .get(&link.hash)
1020                .await
1021                .map_err(|e| HashTreeError::Store(e.to_string()))?
1022                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1023
1024            if is_tree_node(&child_data) {
1025                let child_node = decode_tree_node(&child_data)?;
1026                chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
1027            } else {
1028                chunks.push(child_data);
1029            }
1030        }
1031
1032        Ok(chunks)
1033    }
1034
    /// List directory entries (Cid-based, supports encrypted directories)
    ///
    /// Internal `_`-prefixed chunk nodes are transparent: their entries are
    /// flattened into the result via recursion. An unknown Cid yields an
    /// empty Vec.
    ///
    /// NOTE(review): this descends into internal nodes with `link.key`,
    /// whereas `list_directory` / `walk_recursive` inherit the PARENT's key
    /// for `_`-internal nodes — confirm which convention is correct here.
    pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
        let node = match self.get_node(cid).await? {
            Some(n) => n,
            None => return Ok(vec![]),
        };

        let mut entries = Vec::new();

        for link in &node.links {
            // Skip internal chunk nodes - recurse into them
            // ("_chunk_" check is subsumed by the '_' check; kept for clarity)
            if let Some(ref name) = link.name {
                if name.starts_with("_chunk_") || name.starts_with('_') {
                    let chunk_cid = Cid { hash: link.hash, key: link.key };
                    let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
                    entries.extend(sub_entries);
                    continue;
                }
            }

            // Regular entry: unnamed links fall back to their hex hash as name.
            entries.push(TreeEntry {
                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
                hash: link.hash,
                size: link.size,
                link_type: link.link_type,
                key: link.key,
                meta: link.meta.clone(),
            });
        }

        Ok(entries)
    }
1067
1068    /// List directory entries using Cid (with decryption if key present)
1069    /// Handles both regular and chunked directory data
1070    pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1071        // Use get_directory_node which handles chunked directory data
1072        let node = match self.get_directory_node(cid).await? {
1073            Some(n) => n,
1074            None => return Ok(vec![]),
1075        };
1076
1077        let mut entries = Vec::new();
1078
1079        for link in &node.links {
1080            // Skip internal chunk nodes (backwards compat with old _chunk_ format)
1081            if let Some(ref name) = link.name {
1082                if name.starts_with("_chunk_") || name.starts_with('_') {
1083                    // Internal nodes inherit parent's key for decryption
1084                    let sub_cid = Cid { hash: link.hash, key: cid.key };
1085                    let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
1086                    entries.extend(sub_entries);
1087                    continue;
1088                }
1089            }
1090
1091            entries.push(TreeEntry {
1092                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1093                hash: link.hash,
1094                size: link.size,
1095                link_type: link.link_type,
1096                key: link.key,
1097                meta: link.meta.clone(),
1098            });
1099        }
1100
1101        Ok(entries)
1102    }
1103
1104    /// Resolve a path within a tree (returns Cid with key if encrypted)
1105    pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1106        let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
1107        if parts.is_empty() {
1108            return Ok(Some(cid.clone()));
1109        }
1110
1111        let mut current_cid = cid.clone();
1112
1113        for part in parts {
1114            // Use get_directory_node which handles chunked directory data
1115            let node = match self.get_directory_node(&current_cid).await? {
1116                Some(n) => n,
1117                None => return Ok(None),
1118            };
1119
1120            if let Some(link) = self.find_link(&node, part) {
1121                current_cid = Cid {
1122                    hash: link.hash,
1123                    key: link.key,
1124                };
1125            } else {
1126                // Check internal nodes
1127                match self.find_link_in_subtrees_cid(&node, part, &current_cid).await? {
1128                    Some(link) => {
1129                        current_cid = Cid {
1130                            hash: link.hash,
1131                            key: link.key,
1132                        };
1133                    }
1134                    None => return Ok(None),
1135                }
1136            }
1137        }
1138
1139        Ok(Some(current_cid))
1140    }
1141
    /// Resolve a path within a tree using Cid (with decryption if key present)
    ///
    /// Thin alias for [`resolve`] — presumably kept for API parity with the
    /// hashtree-ts class (see module docs); confirm before removing.
    pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
        self.resolve(cid, path).await
    }
1146
1147    fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1148        node.links
1149            .iter()
1150            .find(|l| l.name.as_deref() == Some(name))
1151            .cloned()
1152    }
1153
1154    /// Find a link in subtrees using Cid (with decryption support)
1155    async fn find_link_in_subtrees_cid(&self, node: &TreeNode, name: &str, _parent_cid: &Cid) -> Result<Option<Link>, HashTreeError> {
1156        for link in &node.links {
1157            if !link.name.as_ref().map(|n| n.starts_with('_')).unwrap_or(false) {
1158                continue;
1159            }
1160
1161            // Internal nodes inherit encryption from parent context
1162            let sub_cid = Cid {
1163                hash: link.hash,
1164                key: link.key.clone(),
1165            };
1166
1167            let sub_node = match self.get_node(&sub_cid).await? {
1168                Some(n) => n,
1169                None => continue,
1170            };
1171
1172            if let Some(found) = self.find_link(&sub_node, name) {
1173                return Ok(Some(found));
1174            }
1175
1176            if let Some(deep_found) = Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await? {
1177                return Ok(Some(deep_found));
1178            }
1179        }
1180
1181        Ok(None)
1182    }
1183
1184    /// Get total size of a tree
1185    pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1186        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1187            Some(d) => d,
1188            None => return Ok(0),
1189        };
1190
1191        if !is_tree_node(&data) {
1192            return Ok(data.len() as u64);
1193        }
1194
1195        let node = decode_tree_node(&data)?;
1196        // Calculate from children
1197        let mut total = 0u64;
1198        for link in &node.links {
1199            total += link.size;
1200        }
1201        Ok(total)
1202    }
1203
1204    /// Walk entire tree depth-first (returns Vec)
1205    pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
1206        let mut entries = Vec::new();
1207        self.walk_recursive(cid, path, &mut entries).await?;
1208        Ok(entries)
1209    }
1210
    /// Depth-first worker for `walk`: appends a `WalkEntry` for the node at
    /// `cid` (decrypting with `cid.key` when present), then recurses into its
    /// links. Internal `_`-prefixed nodes are transparent: they keep the
    /// current `path` and inherit the parent's key. Missing hashes are
    /// silently skipped.
    async fn walk_recursive(
        &self,
        cid: &Cid,
        path: &str,
        entries: &mut Vec<WalkEntry>,
    ) -> Result<(), HashTreeError> {
        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
            Some(d) => d,
            None => return Ok(()),
        };

        // Decrypt if key is present
        let data = if let Some(key) = &cid.key {
            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
        } else {
            data
        };

        let node = match try_decode_tree_node(&data) {
            Some(n) => n,
            None => {
                // Not a tree node: record it as a blob leaf and stop here.
                entries.push(WalkEntry {
                    path: path.to_string(),
                    hash: cid.hash,
                    link_type: LinkType::Blob,
                    size: data.len() as u64,
                    key: cid.key,
                });
                return Ok(());
            }
        };

        // A node's logical size is the sum of its link sizes.
        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
        entries.push(WalkEntry {
            path: path.to_string(),
            hash: cid.hash,
            link_type: node.node_type,
            size: node_size,
            key: cid.key,
        });

        for link in &node.links {
            let child_path = match &link.name {
                Some(name) => {
                    if name.starts_with("_chunk_") || name.starts_with('_') {
                        // Internal nodes inherit parent's key
                        // and keep the parent's path (they are transparent).
                        let sub_cid = Cid { hash: link.hash, key: cid.key };
                        Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
                        continue;
                    }
                    if path.is_empty() {
                        name.clone()
                    } else {
                        format!("{}/{}", path, name)
                    }
                }
                // Unnamed links keep the parent's path.
                None => path.to_string(),
            };

            // Child nodes use their own key from link
            let child_cid = Cid { hash: link.hash, key: link.key };
            Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
        }

        Ok(())
    }
1277
    /// Walk entire tree with parallel fetching
    /// Uses a work-stealing approach: always keeps `concurrency` requests in flight
    ///
    /// Convenience wrapper over [`walk_parallel_with_progress`] with no
    /// progress counter.
    pub async fn walk_parallel(&self, cid: &Cid, path: &str, concurrency: usize) -> Result<Vec<WalkEntry>, HashTreeError> {
        self.walk_parallel_with_progress(cid, path, concurrency, None).await
    }
1283
    /// Walk entire tree with parallel fetching and optional progress counter
    /// The counter is incremented for each node fetched (not just entries found)
    ///
    /// OPTIMIZATION: Blobs are NOT fetched - their metadata (hash, size, link_type)
    /// comes from the parent node's link, so we just add them directly to entries.
    /// This avoids downloading file contents during tree traversal.
    ///
    /// NOTE(review): `concurrency == 0` makes the fill loop a no-op, so the
    /// walk returns an empty Vec instead of erroring — confirm intended.
    pub async fn walk_parallel_with_progress(
        &self,
        cid: &Cid,
        path: &str,
        concurrency: usize,
        progress: Option<&std::sync::atomic::AtomicUsize>,
    ) -> Result<Vec<WalkEntry>, HashTreeError> {
        use futures::stream::{FuturesUnordered, StreamExt};
        use std::collections::VecDeque;
        use std::sync::atomic::Ordering;

        // `pending` holds nodes not yet fetched; `active` holds in-flight gets.
        let mut entries = Vec::new();
        let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
        let mut active = FuturesUnordered::new();

        // Seed with root
        pending.push_back((cid.clone(), path.to_string()));

        loop {
            // Fill up to concurrency limit from pending queue
            while active.len() < concurrency {
                if let Some((node_cid, node_path)) = pending.pop_front() {
                    let store = &self.store;
                    let fut = async move {
                        let data = store.get(&node_cid.hash).await
                            .map_err(|e| HashTreeError::Store(e.to_string()))?;
                        Ok::<_, HashTreeError>((node_cid, node_path, data))
                    };
                    active.push(fut);
                } else {
                    break;
                }
            }

            // If nothing active, we're done
            if active.is_empty() {
                break;
            }

            // Wait for any future to complete
            if let Some(result) = active.next().await {
                // Any store error aborts the whole walk via `?`.
                let (node_cid, node_path, data) = result?;

                // Update progress counter
                if let Some(counter) = progress {
                    counter.fetch_add(1, Ordering::Relaxed);
                }

                // Missing nodes are skipped silently.
                let data = match data {
                    Some(d) => d,
                    None => continue,
                };

                // Decrypt if key is present
                let data = if let Some(key) = &node_cid.key {
                    decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
                } else {
                    data
                };

                let node = match try_decode_tree_node(&data) {
                    Some(n) => n,
                    None => {
                        // It's a blob/file - this case only happens for root
                        // (child blobs are recorded from link metadata below).
                        entries.push(WalkEntry {
                            path: node_path,
                            hash: node_cid.hash,
                            link_type: LinkType::Blob,
                            size: data.len() as u64,
                            key: node_cid.key,
                        });
                        continue;
                    }
                };

                // It's a directory/file node
                let node_size: u64 = node.links.iter().map(|l| l.size).sum();
                entries.push(WalkEntry {
                    path: node_path.clone(),
                    hash: node_cid.hash,
                    link_type: node.node_type,
                    size: node_size,
                    key: node_cid.key,
                });

                // Queue children - but DON'T fetch blobs, just add them directly
                for link in &node.links {
                    let child_path = match &link.name {
                        Some(name) => {
                            if name.starts_with("_chunk_") || name.starts_with('_') {
                                // Internal chunked nodes - inherit parent's key, same path
                                let sub_cid = Cid { hash: link.hash, key: node_cid.key };
                                pending.push_back((sub_cid, node_path.clone()));
                                continue;
                            }
                            if node_path.is_empty() {
                                name.clone()
                            } else {
                                format!("{}/{}", node_path, name)
                            }
                        }
                        None => node_path.clone(),
                    };

                    // OPTIMIZATION: If it's a blob, add entry directly without fetching
                    // The link already contains all the metadata we need
                    if link.link_type == LinkType::Blob {
                        entries.push(WalkEntry {
                            path: child_path,
                            hash: link.hash,
                            link_type: LinkType::Blob,
                            size: link.size,
                            key: link.key,
                        });
                        // Count the blob toward progress even though it was
                        // never actually fetched.
                        if let Some(counter) = progress {
                            counter.fetch_add(1, Ordering::Relaxed);
                        }
                        continue;
                    }

                    // For tree nodes (File/Dir), we need to fetch to see their children
                    let child_cid = Cid { hash: link.hash, key: link.key };
                    pending.push_back((child_cid, child_path));
                }
            }
        }

        Ok(entries)
    }
1419
    /// Walk tree as stream
    ///
    /// `stream::unfold` over a three-state machine: `Init` emits the root's
    /// entry and seeds a DFS stack, `Processing` drains it one entry at a
    /// time (see `process_walk_stack`), `Done` terminates.
    ///
    /// NOTE(review): unlike `walk_recursive`, `_`-prefixed children here get
    /// their own `link.key` and a separate stack entry (same path) instead of
    /// inheriting the parent's key — confirm this divergence is intended.
    pub fn walk_stream(
        &self,
        cid: Cid,
        initial_path: String,
    ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            WalkStreamState::Init { cid, path: initial_path, tree: self },
            |state| async move {
                match state {
                    WalkStreamState::Init { cid, path, tree } => {
                        let data = match tree.store.get(&cid.hash).await {
                            Ok(Some(d)) => d,
                            // Missing root: empty stream.
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    WalkStreamState::Done,
                                ))
                            }
                        };

                        // Decrypt if key is present
                        let data = if let Some(key) = &cid.key {
                            match decrypt_chk(&data, key) {
                                Ok(d) => d,
                                Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), WalkStreamState::Done)),
                            }
                        } else {
                            data
                        };

                        let node = match try_decode_tree_node(&data) {
                            Some(n) => n,
                            None => {
                                // Blob data
                                let entry = WalkEntry {
                                    path,
                                    hash: cid.hash,
                                    link_type: LinkType::Blob,
                                    size: data.len() as u64,
                                    key: cid.key,
                                };
                                return Some((Ok(entry), WalkStreamState::Done));
                            }
                        };

                        // A node's logical size is the sum of its link sizes.
                        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
                        let entry = WalkEntry {
                            path: path.clone(),
                            hash: cid.hash,
                            link_type: node.node_type,
                            size: node_size,
                            key: cid.key,
                        };

                        // Create stack with children to process
                        // (reversed so pops come out in original link order)
                        let mut stack: Vec<WalkStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            let child_path = match &link.name {
                                Some(name) if !name.starts_with('_') => {
                                    if path.is_empty() {
                                        name.clone()
                                    } else {
                                        format!("{}/{}", path, name)
                                    }
                                }
                                // Internal or unnamed links keep the parent path.
                                _ => path.clone(),
                            };
                            // Child nodes use their own key from link
                            stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
                        }

                        Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
                    }
                    WalkStreamState::Processing { mut stack, tree } => {
                        tree.process_walk_stack(&mut stack).await
                    }
                    WalkStreamState::Done => None,
                }
            },
        ))
    }
1503
    /// Drive the DFS stack for `walk_stream`, yielding the next `WalkEntry`.
    ///
    /// Pops one item, fetches it (missing hashes are skipped), emits a blob
    /// entry for non-tree data, or emits the node's entry after pushing its
    /// children. Returns `None` when the stack is exhausted.
    ///
    /// NOTE(review): this branch does not decrypt fetched data and emits
    /// `key: None` for tree nodes, while the `Init` branch decrypts and
    /// carries `cid.key` — confirm encrypted subtrees stream correctly.
    async fn process_walk_stack<'a>(
        &'a self,
        stack: &mut Vec<WalkStackItem>,
    ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                // Missing node: skip silently and try the next stack item.
                Ok(None) => continue,
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        WalkStreamState::Done,
                    ))
                }
            };

            let node = match try_decode_tree_node(&data) {
                Some(n) => n,
                None => {
                    // Blob data
                    let entry = WalkEntry {
                        path: item.path,
                        hash: item.hash,
                        link_type: LinkType::Blob,
                        size: data.len() as u64,
                        key: item.key,
                    };
                    // mem::take moves the remaining stack into the next state.
                    return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
                }
            };

            // A node's logical size is the sum of its link sizes.
            let node_size: u64 = node.links.iter().map(|l| l.size).sum();
            let entry = WalkEntry {
                path: item.path.clone(),
                hash: item.hash,
                link_type: node.node_type,
                size: node_size,
                key: None, // directories are not encrypted
            };

            // Push children to stack
            // (reversed so pops preserve original link order)
            for link in node.links.into_iter().rev() {
                let child_path = match &link.name {
                    Some(name) if !name.starts_with('_') => {
                        if item.path.is_empty() {
                            name.clone()
                        } else {
                            format!("{}/{}", item.path, name)
                        }
                    }
                    // Internal or unnamed links keep the parent path.
                    _ => item.path.clone(),
                };
                stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
            }

            return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
        }
        None
    }
1563
1564    // ============ EDIT ============
1565
1566    /// Add or update an entry in a directory
1567    /// Returns new root Cid (immutable operation)
1568    pub async fn set_entry(
1569        &self,
1570        root: &Cid,
1571        path: &[&str],
1572        name: &str,
1573        entry_cid: &Cid,
1574        size: u64,
1575        link_type: LinkType,
1576    ) -> Result<Cid, HashTreeError> {
1577        let dir_cid = self.resolve_path_array(root, path).await?;
1578        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1579
1580        let entries = self.list_directory(&dir_cid).await?;
1581        let mut new_entries: Vec<DirEntry> = entries
1582            .into_iter()
1583            .filter(|e| e.name != name)
1584            .map(|e| DirEntry {
1585                name: e.name,
1586                hash: e.hash,
1587                size: e.size,
1588                key: e.key,
1589                link_type: e.link_type,
1590                meta: e.meta,
1591            })
1592            .collect();
1593
1594        new_entries.push(DirEntry {
1595            name: name.to_string(),
1596            hash: entry_cid.hash,
1597            size,
1598            key: entry_cid.key,
1599            link_type,
1600            meta: None,
1601        });
1602
1603        let new_dir_cid = self.put_directory(new_entries).await?;
1604        self.rebuild_path(root, path, new_dir_cid).await
1605    }
1606
1607    /// Remove an entry from a directory
1608    /// Returns new root Cid
1609    pub async fn remove_entry(
1610        &self,
1611        root: &Cid,
1612        path: &[&str],
1613        name: &str,
1614    ) -> Result<Cid, HashTreeError> {
1615        let dir_cid = self.resolve_path_array(root, path).await?;
1616        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1617
1618        let entries = self.list_directory(&dir_cid).await?;
1619        let new_entries: Vec<DirEntry> = entries
1620            .into_iter()
1621            .filter(|e| e.name != name)
1622            .map(|e| DirEntry {
1623                name: e.name,
1624                hash: e.hash,
1625                size: e.size,
1626                key: e.key,
1627                link_type: e.link_type,
1628                meta: e.meta,
1629            })
1630            .collect();
1631
1632        let new_dir_cid = self.put_directory(new_entries).await?;
1633        self.rebuild_path(root, path, new_dir_cid).await
1634    }
1635
1636    /// Rename an entry in a directory
1637    /// Returns new root Cid
1638    pub async fn rename_entry(
1639        &self,
1640        root: &Cid,
1641        path: &[&str],
1642        old_name: &str,
1643        new_name: &str,
1644    ) -> Result<Cid, HashTreeError> {
1645        if old_name == new_name {
1646            return Ok(root.clone());
1647        }
1648
1649        let dir_cid = self.resolve_path_array(root, path).await?;
1650        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1651
1652        let entries = self.list_directory(&dir_cid).await?;
1653        let entry = entries
1654            .iter()
1655            .find(|e| e.name == old_name)
1656            .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1657
1658        let entry_hash = entry.hash;
1659        let entry_size = entry.size;
1660        let entry_key = entry.key;
1661        let entry_link_type = entry.link_type;
1662        let entry_meta = entry.meta.clone();
1663
1664        let new_entries: Vec<DirEntry> = entries
1665            .into_iter()
1666            .filter(|e| e.name != old_name)
1667            .map(|e| DirEntry {
1668                name: e.name,
1669                hash: e.hash,
1670                size: e.size,
1671                key: e.key,
1672                link_type: e.link_type,
1673                meta: e.meta,
1674            })
1675            .chain(std::iter::once(DirEntry {
1676                name: new_name.to_string(),
1677                hash: entry_hash,
1678                size: entry_size,
1679                key: entry_key,
1680                link_type: entry_link_type,
1681                meta: entry_meta,
1682            }))
1683            .collect();
1684
1685        let new_dir_cid = self.put_directory(new_entries).await?;
1686        self.rebuild_path(root, path, new_dir_cid).await
1687    }
1688
1689    /// Move an entry to a different directory
1690    /// Returns new root Cid
1691    pub async fn move_entry(
1692        &self,
1693        root: &Cid,
1694        source_path: &[&str],
1695        name: &str,
1696        target_path: &[&str],
1697    ) -> Result<Cid, HashTreeError> {
1698        let source_dir_cid = self.resolve_path_array(root, source_path).await?;
1699        let source_dir_cid = source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
1700
1701        let source_entries = self.list_directory(&source_dir_cid).await?;
1702        let entry = source_entries
1703            .iter()
1704            .find(|e| e.name == name)
1705            .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
1706
1707        let entry_cid = Cid {
1708            hash: entry.hash,
1709            key: entry.key,
1710        };
1711        let entry_size = entry.size;
1712        let entry_link_type = entry.link_type;
1713
1714        // Remove from source
1715        let new_root = self.remove_entry(root, source_path, name).await?;
1716
1717        // Add to target
1718        self.set_entry(&new_root, target_path, name, &entry_cid, entry_size, entry_link_type).await
1719    }
1720
1721    async fn resolve_path_array(&self, root: &Cid, path: &[&str]) -> Result<Option<Cid>, HashTreeError> {
1722        if path.is_empty() {
1723            return Ok(Some(root.clone()));
1724        }
1725        self.resolve_path(root, &path.join("/")).await
1726    }
1727
1728    async fn rebuild_path(
1729        &self,
1730        root: &Cid,
1731        path: &[&str],
1732        new_child: Cid,
1733    ) -> Result<Cid, HashTreeError> {
1734        if path.is_empty() {
1735            return Ok(new_child);
1736        }
1737
1738        let mut child_cid = new_child;
1739        let parts: Vec<&str> = path.to_vec();
1740
1741        for i in (0..parts.len()).rev() {
1742            let child_name = parts[i];
1743            let parent_path = &parts[..i];
1744
1745            let parent_cid = if parent_path.is_empty() {
1746                root.clone()
1747            } else {
1748                self.resolve_path_array(root, parent_path)
1749                    .await?
1750                    .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
1751            };
1752
1753            let parent_entries = self.list_directory(&parent_cid).await?;
1754            let new_parent_entries: Vec<DirEntry> = parent_entries
1755                .into_iter()
1756                .map(|e| {
1757                    if e.name == child_name {
1758                        DirEntry {
1759                            name: e.name,
1760                            hash: child_cid.hash,
1761                            size: 0, // Directories don't have a meaningful size in the link
1762                            key: child_cid.key,
1763                            link_type: e.link_type,
1764                            meta: e.meta,
1765                        }
1766                    } else {
1767                        DirEntry {
1768                            name: e.name,
1769                            hash: e.hash,
1770                            size: e.size,
1771                            key: e.key,
1772                            link_type: e.link_type,
1773                            meta: e.meta,
1774                        }
1775                    }
1776                })
1777                .collect();
1778
1779            child_cid = self.put_directory(new_parent_entries).await?;
1780        }
1781
1782        Ok(child_cid)
1783    }
1784
1785    // ============ UTILITY ============
1786
1787    /// Get the underlying store
1788    pub fn get_store(&self) -> Arc<S> {
1789        self.store.clone()
1790    }
1791
    /// Get chunk size configuration
    /// (bytes per leaf chunk when file content is split — see `with_chunk_size`).
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }

    /// Get max links configuration
    /// (upper bound on child links per tree node — see `with_max_links`).
    pub fn max_links(&self) -> usize {
        self.max_links
    }
1801}
1802
// Internal state types for streaming

/// Work-list item for the plain read stream: a node hash still to visit.
enum StreamStackItem {
    Hash(Hash),
}

/// Driver state for the chunk read stream:
/// Init (root hash) -> Processing (explicit stack of pending nodes) -> Done.
enum ReadStreamState<'a, S: Store> {
    Init { hash: Hash, tree: &'a HashTree<S> },
    Processing { stack: Vec<StreamStackItem>, tree: &'a HashTree<S> },
    Done,
}
1814
/// Work-list item for the directory walk stream: node hash, its path relative
/// to the walk root, and the decryption key if the link carried one.
struct WalkStackItem {
    hash: Hash,
    path: String,
    key: Option<[u8; 32]>,
}

/// Driver state for the recursive directory walk stream:
/// Init (root cid + base path) -> Processing (pending stack) -> Done.
enum WalkStreamState<'a, S: Store> {
    Init { cid: Cid, path: String, tree: &'a HashTree<S> },
    Processing { stack: Vec<WalkStackItem>, tree: &'a HashTree<S> },
    Done,
}
1826
// Encrypted stream state types

/// Work-list item for the encrypted read stream: node hash plus the per-node
/// decryption key, when present.
struct EncryptedStackItem {
    hash: Hash,
    key: Option<[u8; 32]>,
}

/// Driver state for the encrypted chunk stream; `Init` carries the root hash
/// together with its (mandatory) decryption key.
enum EncryptedStreamState<'a, S: Store> {
    Init { hash: Hash, key: [u8; 32], tree: &'a HashTree<S> },
    Processing { stack: Vec<EncryptedStackItem>, tree: &'a HashTree<S> },
    Done,
}
1838
1839/// Verify tree integrity - checks that all referenced hashes exist
1840pub async fn verify_tree<S: Store>(
1841    store: Arc<S>,
1842    root_hash: &Hash,
1843) -> Result<crate::reader::VerifyResult, HashTreeError> {
1844    let mut missing = Vec::new();
1845    let mut visited = std::collections::HashSet::new();
1846
1847    verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
1848
1849    Ok(crate::reader::VerifyResult {
1850        valid: missing.is_empty(),
1851        missing,
1852    })
1853}
1854
1855async fn verify_recursive<S: Store>(
1856    store: Arc<S>,
1857    hash: &Hash,
1858    missing: &mut Vec<Hash>,
1859    visited: &mut std::collections::HashSet<String>,
1860) -> Result<(), HashTreeError> {
1861    let hex = to_hex(hash);
1862    if visited.contains(&hex) {
1863        return Ok(());
1864    }
1865    visited.insert(hex);
1866
1867    let data = match store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1868        Some(d) => d,
1869        None => {
1870            missing.push(*hash);
1871            return Ok(());
1872        }
1873    };
1874
1875    if is_tree_node(&data) {
1876        let node = decode_tree_node(&data)?;
1877        for link in &node.links {
1878            Box::pin(verify_recursive(store.clone(), &link.hash, missing, visited)).await?;
1879        }
1880    }
1881
1882    Ok(())
1883}
1884
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;

    /// Helper: fresh in-memory store plus a HashTree with encryption disabled.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        // Use public (unencrypted) mode for these tests
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    // Blob round-trip: bytes stored via put_blob come back unchanged.
    #[tokio::test]
    async fn test_put_and_read_blob() {
        let (_store, tree) = make_tree();

        let data = vec![1, 2, 3, 4, 5];
        let hash = tree.put_blob(&data).await.unwrap();

        let result = tree.get_blob(&hash).await.unwrap();
        assert_eq!(result, Some(data));
    }

    // Small-file round-trip: reported size matches input and content is preserved.
    #[tokio::test]
    async fn test_put_and_read_file_small() {
        let (_store, tree) = make_tree();

        let data = b"Hello, World!";
        let (cid, size) = tree.put_file(data).await.unwrap();

        assert_eq!(size, data.len() as u64);

        let read_data = tree.read_file(&cid.hash).await.unwrap();
        assert_eq!(read_data, Some(data.to_vec()));
    }

    // Directory round-trip: both entries are listed back by name.
    #[tokio::test]
    async fn test_put_and_read_directory() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();

        let dir_cid = tree
            .put_directory(
                vec![
                    DirEntry::new("a.txt", file1).with_size(8),
                    DirEntry::new("b.txt", file2).with_size(8),
                ],
            )
            .await
            .unwrap();

        let entries = tree.list_directory(&dir_cid).await.unwrap();
        assert_eq!(entries.len(), 2);
        let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
        assert!(names.contains(&"a.txt"));
        assert!(names.contains(&"b.txt"));
    }

    // is_directory distinguishes a raw blob from a (possibly empty) directory node.
    #[tokio::test]
    async fn test_is_directory() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"data").await.unwrap();
        let dir_cid = tree.put_directory(vec![]).await.unwrap();

        assert!(!tree.is_directory(&file_hash).await.unwrap());
        assert!(tree.is_directory(&dir_cid.hash).await.unwrap());
    }

    // resolve_path walks "subdir/file.txt" through a nested directory to the leaf.
    #[tokio::test]
    async fn test_resolve_path() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"nested").await.unwrap();
        let sub_dir = tree.put_directory(
            vec![DirEntry::new("file.txt", file_hash).with_size(6)],
        ).await.unwrap();
        let root_dir = tree.put_directory(
            vec![DirEntry::new("subdir", sub_dir.hash)],
        ).await.unwrap();

        let resolved = tree.resolve_path(&root_dir, "subdir/file.txt").await.unwrap();
        assert_eq!(resolved.map(|c| c.hash), Some(file_hash));
    }

    // ============ UNIFIED API TESTS ============

    // Public mode: put/get round-trips and the Cid carries no encryption key.
    #[tokio::test]
    async fn test_unified_put_get_public() {
        let store = Arc::new(MemoryStore::new());
        // Use .public() to disable encryption
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let data = b"Hello, public world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_none()); // No key for public content

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Default (encrypted) mode: round-trips and the Cid carries a key.
    #[tokio::test]
    async fn test_unified_put_get_encrypted() {
        let store = Arc::new(MemoryStore::new());
        // Default config has encryption enabled
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Hello, encrypted world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some()); // Has encryption key

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Encrypted + chunked: data larger than chunk_size still round-trips exactly.
    #[tokio::test]
    async fn test_unified_put_get_encrypted_chunked() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).with_chunk_size(100));

        // Data larger than chunk size
        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
        let (cid, size) = tree.put(&data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Determinism: identical content yields identical hash, key, and string form.
    #[tokio::test]
    async fn test_cid_deterministic() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Same content produces same CID";

        let (cid1, _) = tree.put(data).await.unwrap();
        let (cid2, _) = tree.put(data).await.unwrap();

        // CHK: same content = same hash AND same key
        assert_eq!(cid1.hash, cid2.hash);
        assert_eq!(cid1.key, cid2.key);
        assert_eq!(cid1.to_string(), cid2.to_string());
    }

    // Public Cid string form: bare 64-char hex hash, no key separator.
    #[tokio::test]
    async fn test_cid_to_string_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        // Public CID is just the hash (64 hex chars)
        assert_eq!(s.len(), 64);
        assert!(!s.contains(':'));
    }

    // Encrypted Cid string form: "hash:key", 129 chars total.
    #[tokio::test]
    async fn test_cid_to_string_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        // Encrypted CID is "hash:key" (64 + 1 + 64 = 129 chars)
        assert_eq!(s.len(), 129);
        assert!(s.contains(':'));
    }
}