hashtree_core/
hashtree.rs

1//! HashTree - Unified merkle tree operations
2//!
3//! Single struct for creating, reading, and editing content-addressed merkle trees.
4//! Mirrors the hashtree-ts HashTree class API.
5
6use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::stream::{self, Stream};
10use futures::io::AsyncRead;
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node};
15use crate::hash::sha256;
16use crate::reader::{ReaderError, TreeEntry, WalkEntry};
17use crate::store::Store;
18use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
19
20use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
21
22/// HashTree configuration
/// HashTree configuration
///
/// Built with [`HashTreeConfig::new`] and customized via the `with_*` /
/// `public` builder methods before being passed to `HashTree::new`.
#[derive(Clone)]
pub struct HashTreeConfig<S: Store> {
    /// Backing block store, shared via reference counting.
    pub store: Arc<S>,
    /// Maximum size in bytes of a single leaf chunk.
    pub chunk_size: usize,
    /// Maximum number of links per tree node before splitting into subtrees.
    pub max_links: usize,
    /// Whether to encrypt content (default: true when encryption feature enabled)
    pub encrypted: bool,
}
31
32impl<S: Store> HashTreeConfig<S> {
33    pub fn new(store: Arc<S>) -> Self {
34        Self {
35            store,
36            chunk_size: DEFAULT_CHUNK_SIZE,
37            max_links: DEFAULT_MAX_LINKS,
38            encrypted: true,
39        }
40    }
41
42    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
43        self.chunk_size = chunk_size;
44        self
45    }
46
47    pub fn with_max_links(mut self, max_links: usize) -> Self {
48        self.max_links = max_links;
49        self
50    }
51
52    /// Disable encryption (store content publicly)
53    pub fn public(mut self) -> Self {
54        self.encrypted = false;
55        self
56    }
57}
58
/// Result of put_file operation
#[derive(Debug, Clone)]
pub struct PutFileResult {
    /// Root hash of the stored file (blob hash for small files, tree root otherwise).
    pub hash: Hash,
    /// Total file size in bytes.
    pub size: u64,
}
65
/// HashTree error type
#[derive(Debug, thiserror::Error)]
pub enum HashTreeError {
    /// Underlying block store failure (message from the store implementation).
    #[error("Store error: {0}")]
    Store(String),
    /// TreeNode encode/decode failure.
    #[error("Codec error: {0}")]
    Codec(#[from] crate::codec::CodecError),
    /// A referenced chunk was absent from the store (hex-encoded hash).
    #[error("Missing chunk: {0}")]
    MissingChunk(String),
    /// No entry found at the requested path.
    #[error("Path not found: {0}")]
    PathNotFound(String),
    /// Named entry missing from a directory.
    #[error("Entry not found: {0}")]
    EntryNotFound(String),
    /// Failure while encrypting content.
    #[error("Encryption error: {0}")]
    Encryption(String),
    /// Failure while decrypting content.
    #[error("Decryption error: {0}")]
    Decryption(String),
}
84
85impl From<BuilderError> for HashTreeError {
86    fn from(e: BuilderError) -> Self {
87        match e {
88            BuilderError::Store(s) => HashTreeError::Store(s),
89            BuilderError::Codec(c) => HashTreeError::Codec(c),
90            BuilderError::Encryption(s) => HashTreeError::Encryption(s),
91        }
92    }
93}
94
95impl From<ReaderError> for HashTreeError {
96    fn from(e: ReaderError) -> Self {
97        match e {
98            ReaderError::Store(s) => HashTreeError::Store(s),
99            ReaderError::Codec(c) => HashTreeError::Codec(c),
100            ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
101            ReaderError::Decryption(s) => HashTreeError::Encryption(s),
102            ReaderError::MissingKey => HashTreeError::Encryption("missing decryption key".to_string()),
103        }
104    }
105}
106
/// HashTree - unified create, read, and edit merkle tree operations
pub struct HashTree<S: Store> {
    // Backing block store, shared via reference counting.
    store: Arc<S>,
    // Maximum leaf chunk size in bytes.
    chunk_size: usize,
    // Maximum links per tree node before splitting into subtrees.
    max_links: usize,
    // Whether content is encrypted on write (captured from config).
    encrypted: bool,
}
114
115impl<S: Store> HashTree<S> {
116    pub fn new(config: HashTreeConfig<S>) -> Self {
117        Self {
118            store: config.store,
119            chunk_size: config.chunk_size,
120            max_links: config.max_links,
121            encrypted: config.encrypted,
122        }
123    }
124
    /// Check if encryption is enabled
    ///
    /// Returns the `encrypted` flag captured from the configuration at
    /// construction time.
    pub fn is_encrypted(&self) -> bool {
        self.encrypted
    }
129
130    // ============ UNIFIED API ============
131
132    /// Store content, returns Cid (hash + optional key)
133    /// Encrypts by default when encryption feature is enabled
134    pub async fn put(&self, data: &[u8]) -> Result<Cid, HashTreeError> {
135        let size = data.len() as u64;
136
137        // Small data - store as single chunk
138        if data.len() <= self.chunk_size {
139            let (hash, key) = self.put_chunk_internal(data).await?;
140            return Ok(Cid { hash, key, size });
141        }
142
143        // Large data - chunk it
144        let mut links: Vec<Link> = Vec::new();
145        let mut offset = 0;
146
147        while offset < data.len() {
148            let end = (offset + self.chunk_size).min(data.len());
149            let chunk = &data[offset..end];
150            let chunk_size = chunk.len() as u64;
151            let (hash, key) = self.put_chunk_internal(chunk).await?;
152            links.push(Link {
153                hash,
154                name: None,
155                size: chunk_size,
156                key,
157                link_type: LinkType::Blob, // Leaf chunk (raw blob)
158                meta: None,
159            });
160            offset = end;
161        }
162
163        // Build tree from chunks
164        let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
165        Ok(Cid { hash: root_hash, key: root_key, size })
166    }
167
168    /// Get content by Cid (handles decryption automatically)
169    pub async fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
170        if let Some(key) = cid.key {
171            self.get_encrypted(&cid.hash, &key).await
172        } else {
173            self.read_file(&cid.hash).await
174        }
175    }
176
177    /// Store content from an async reader (streaming put)
178    ///
179    /// Reads data in chunks and builds a merkle tree incrementally.
180    /// Useful for large files or streaming data sources.
181    pub async fn put_stream<R: AsyncRead + Unpin>(&self, mut reader: R) -> Result<Cid, HashTreeError> {
182        let mut buffer = vec![0u8; self.chunk_size];
183        let mut links = Vec::new();
184        let mut total_size: u64 = 0;
185        let mut consistent_key: Option<[u8; 32]> = None;
186
187        loop {
188            let mut chunk = Vec::new();
189            let mut bytes_read = 0;
190
191            // Read until we have a full chunk or EOF
192            while bytes_read < self.chunk_size {
193                let n = reader.read(&mut buffer[..self.chunk_size - bytes_read]).await
194                    .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
195                if n == 0 {
196                    break; // EOF
197                }
198                chunk.extend_from_slice(&buffer[..n]);
199                bytes_read += n;
200            }
201
202            if chunk.is_empty() {
203                break; // No more data
204            }
205
206            let chunk_len = chunk.len() as u64;
207            total_size += chunk_len;
208
209            let (hash, key) = self.put_chunk_internal(&chunk).await?;
210
211            // Track consistent key for single-key result
212            if links.is_empty() {
213                consistent_key = key;
214            } else if consistent_key != key {
215                consistent_key = None;
216            }
217
218            links.push(Link {
219                hash,
220                name: None,
221                size: chunk_len,
222                key,
223                link_type: LinkType::Blob, // Leaf chunk (raw blob)
224                meta: None,
225            });
226        }
227
228        if links.is_empty() {
229            // Empty input
230            let (hash, key) = self.put_chunk_internal(&[]).await?;
231            return Ok(Cid { hash, key, size: 0 });
232        }
233
234        // Build tree from chunks
235        let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
236        Ok(Cid { hash: root_hash, key: root_key, size: total_size })
237    }
238
239    /// Read content as a stream of chunks by Cid (handles decryption automatically)
240    ///
241    /// Returns an async stream that yields chunks as they are read.
242    /// Useful for large files or when you want to process data incrementally.
243    pub fn get_stream(
244        &self,
245        cid: &Cid,
246    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
247        let hash = cid.hash;
248        let key = cid.key;
249
250        if let Some(k) = key {
251            // Encrypted stream
252            Box::pin(self.read_file_stream_encrypted(hash, k))
253        } else {
254            // Unencrypted stream
255            self.read_file_stream(hash)
256        }
257    }
258
    /// Read encrypted file as stream (internal)
    ///
    /// Drives a depth-first traversal of the (possibly nested) encrypted
    /// tree via `stream::unfold`. The root block is fetched and decrypted
    /// with `key`; a leaf is yielded directly, while a tree node seeds a
    /// stack (children pushed reversed so they pop in link order) that
    /// `process_encrypted_stream_stack` drains one decrypted leaf per poll.
    fn read_file_stream_encrypted(
        &self,
        hash: Hash,
        key: EncryptionKey,
    ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
        stream::unfold(
            EncryptedStreamState::Init { hash, key, tree: self },
            |state| async move {
                match state {
                    EncryptedStreamState::Init { hash, key, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            // Missing root: end the stream without an error item.
                            Ok(None) => return None,
                            Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), EncryptedStreamState::Done)),
                        };

                        // Try to decrypt
                        let decrypted = match decrypt_chk(&data, &key) {
                            Ok(d) => d,
                            Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), EncryptedStreamState::Done)),
                        };

                        if !is_tree_node(&decrypted) {
                            // Single blob - yield decrypted data
                            return Some((Ok(decrypted), EncryptedStreamState::Done));
                        }

                        // Tree node - parse and traverse
                        let node = match decode_tree_node(&decrypted) {
                            Ok(n) => n,
                            Err(e) => return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done)),
                        };

                        // Push children reversed so pops yield them in link order.
                        let mut stack: Vec<EncryptedStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
                        }

                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Processing { mut stack, tree } => {
                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Done => None,
                }
            },
        )
    }
308
    /// Pop items off the traversal stack until a leaf chunk is found.
    ///
    /// Returns `Some((chunk, next_state))` when a decrypted leaf is ready,
    /// `Some((Err(..), Done))` on any failure, or `None` when the stack is
    /// exhausted (end of stream). Nested tree nodes are expanded in place by
    /// pushing their children reversed, preserving link order.
    async fn process_encrypted_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<EncryptedStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                // A referenced chunk missing mid-stream is an error (unlike
                // a missing root, which just ends the stream).
                Ok(None) => {
                    return Some((
                        Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
                        EncryptedStreamState::Done,
                    ))
                }
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        EncryptedStreamState::Done,
                    ))
                }
            };

            // Decrypt if we have a key
            let decrypted = if let Some(key) = item.key {
                match decrypt_chk(&data, &key) {
                    Ok(d) => d,
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Decryption(e.to_string())),
                            EncryptedStreamState::Done,
                        ))
                    }
                }
            } else {
                // Link without a key is passed through as stored.
                data
            };

            if is_tree_node(&decrypted) {
                // Nested tree node - add children to stack
                let node = match decode_tree_node(&decrypted) {
                    Ok(n) => n,
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Codec(e)),
                            EncryptedStreamState::Done,
                        ))
                    }
                };
                // Reversed push so children pop in link order.
                for link in node.links.into_iter().rev() {
                    stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
                }
            } else {
                // Leaf chunk - yield decrypted data; move the remaining stack
                // into the next state (mem::take leaves the caller's Vec empty).
                return Some((
                    Ok(decrypted),
                    EncryptedStreamState::Processing { stack: std::mem::take(stack), tree: self },
                ));
            }
        }
        // Stack drained: stream complete.
        None
    }
369
370    /// Store a chunk with optional encryption
371    async fn put_chunk_internal(&self, data: &[u8]) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
372        if self.encrypted {
373            let (encrypted, key) = encrypt_chk(data)
374                .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
375            let hash = sha256(&encrypted);
376            self.store
377                .put(hash, encrypted)
378                .await
379                .map_err(|e| HashTreeError::Store(e.to_string()))?;
380            Ok((hash, Some(key)))
381        } else {
382            let hash = self.put_blob(data).await?;
383            Ok((hash, None))
384        }
385    }
386
387    /// Build tree and return (hash, optional_key)
388    async fn build_tree_internal(
389        &self,
390        links: Vec<Link>,
391        total_size: Option<u64>,
392    ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
393        // Single link with matching size - return directly
394        if links.len() == 1 {
395            if let Some(ts) = total_size {
396                if links[0].size == ts {
397                    return Ok((links[0].hash, links[0].key));
398                }
399            }
400        }
401
402        if links.len() <= self.max_links {
403            let node = TreeNode {
404                node_type: LinkType::File,
405                links,
406            };
407            let (data, _) = encode_and_hash(&node)?;
408
409            if self.encrypted {
410                let (encrypted, key) = encrypt_chk(&data)
411                    .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
412                let hash = sha256(&encrypted);
413                self.store
414                    .put(hash, encrypted)
415                    .await
416                    .map_err(|e| HashTreeError::Store(e.to_string()))?;
417                return Ok((hash, Some(key)));
418            }
419
420            // Unencrypted path
421            let hash = sha256(&data);
422            self.store
423                .put(hash, data)
424                .await
425                .map_err(|e| HashTreeError::Store(e.to_string()))?;
426            return Ok((hash, None));
427        }
428
429        // Too many links - create subtrees
430        let mut sub_links = Vec::new();
431        for batch in links.chunks(self.max_links) {
432            let batch_size: u64 = batch.iter().map(|l| l.size).sum();
433            let (hash, key) = Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
434            sub_links.push(Link {
435                hash,
436                name: None,
437                size: batch_size,
438                key,
439                link_type: LinkType::File, // Internal tree node
440                meta: None,
441            });
442        }
443
444        Box::pin(self.build_tree_internal(sub_links, total_size)).await
445    }
446
447    /// Get encrypted content by hash and key
448    async fn get_encrypted(
449        &self,
450        hash: &Hash,
451        key: &EncryptionKey,
452    ) -> Result<Option<Vec<u8>>, HashTreeError> {
453        let encrypted_data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
454            Some(d) => d,
455            None => return Ok(None),
456        };
457
458        // Decrypt the data
459        let decrypted = decrypt_chk(&encrypted_data, key)
460            .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
461
462        // Check if it's a tree node
463        if is_tree_node(&decrypted) {
464            let node = decode_tree_node(&decrypted)?;
465            let assembled = self.assemble_encrypted_chunks(&node).await?;
466            return Ok(Some(assembled));
467        }
468
469        // Single chunk data
470        Ok(Some(decrypted))
471    }
472
473    /// Assemble encrypted chunks from tree
474    async fn assemble_encrypted_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
475        let mut parts: Vec<Vec<u8>> = Vec::new();
476
477        for link in &node.links {
478            let chunk_key = link.key.ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
479
480            let encrypted_child = self
481                .store
482                .get(&link.hash)
483                .await
484                .map_err(|e| HashTreeError::Store(e.to_string()))?
485                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
486
487            let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
488                .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
489
490            if is_tree_node(&decrypted) {
491                // Intermediate tree node - recurse
492                let child_node = decode_tree_node(&decrypted)?;
493                let child_data = Box::pin(self.assemble_encrypted_chunks(&child_node)).await?;
494                parts.push(child_data);
495            } else {
496                // Leaf data chunk
497                parts.push(decrypted);
498            }
499        }
500
501        let total_len: usize = parts.iter().map(|p| p.len()).sum();
502        let mut result = Vec::with_capacity(total_len);
503        for part in parts {
504            result.extend_from_slice(&part);
505        }
506
507        Ok(result)
508    }
509
510    // ============ LOW-LEVEL CREATE ============
511
512    /// Store a blob directly (small data, no encryption)
513    /// Returns the content hash
514    pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
515        let hash = sha256(data);
516        self.store
517            .put(hash, data.to_vec())
518            .await
519            .map_err(|e| HashTreeError::Store(e.to_string()))?;
520        Ok(hash)
521    }
522
523    /// Store a file, chunking if necessary
524    /// Returns root hash and total size
525    pub async fn put_file(&self, data: &[u8]) -> Result<PutFileResult, HashTreeError> {
526        let size = data.len() as u64;
527
528        // Small file - store as single blob
529        if data.len() <= self.chunk_size {
530            let hash = self.put_blob(data).await?;
531            return Ok(PutFileResult { hash, size });
532        }
533
534        // Large file - chunk it
535        let mut chunk_hashes: Vec<Hash> = Vec::new();
536        let mut offset = 0;
537
538        while offset < data.len() {
539            let end = (offset + self.chunk_size).min(data.len());
540            let chunk = &data[offset..end];
541            let hash = self.put_blob(chunk).await?;
542            chunk_hashes.push(hash);
543            offset = end;
544        }
545
546        // Build tree from chunks
547        let chunks: Vec<Link> = chunk_hashes
548            .iter()
549            .enumerate()
550            .map(|(i, &hash)| {
551                let chunk_size = if i < chunk_hashes.len() - 1 {
552                    self.chunk_size as u64
553                } else {
554                    (data.len() - i * self.chunk_size) as u64
555                };
556                Link {
557                    hash,
558                    name: None,
559                    size: chunk_size,
560                    key: None,
561                    link_type: LinkType::Blob, // Leaf chunk (raw blob)
562                    meta: None,
563                }
564            })
565            .collect();
566
567        let root_hash = self.build_tree(chunks, Some(size)).await?;
568        Ok(PutFileResult { hash: root_hash, size })
569    }
570
571    /// Build a directory from entries
572    /// Returns Cid with key if encrypted
573    ///
574    /// For large directories, the messagepack-encoded TreeNode is stored via put()
575    /// which automatically chunks the data. The reader uses read_file() to reassemble.
576    pub async fn put_directory(
577        &self,
578        entries: Vec<DirEntry>,
579    ) -> Result<Cid, HashTreeError> {
580        // Sort entries by name for deterministic hashing
581        let mut sorted = entries;
582        sorted.sort_by(|a, b| a.name.cmp(&b.name));
583
584        let links: Vec<Link> = sorted
585            .into_iter()
586            .map(|e| Link {
587                hash: e.hash,
588                name: Some(e.name),
589                size: e.size,
590                key: e.key,
591                link_type: e.link_type,
592                meta: e.meta,
593            })
594            .collect();
595
596        let total_size: u64 = links.iter().map(|l| l.size).sum();
597
598        // Create the directory node with all entries
599        let node = TreeNode {
600            node_type: LinkType::Dir,
601            links,
602        };
603        let (data, _plain_hash) = encode_and_hash(&node)?;
604
605        // Store directory data via put() - handles both small and large directories
606        // For small dirs, stores as single chunk
607        // For large dirs, chunks transparently via build_tree()
608        // Reader uses read_file() to reassemble before decoding
609        let cid = self.put(&data).await?;
610        Ok(Cid { hash: cid.hash, key: cid.key, size: total_size })
611    }
612
    /// Build a balanced tree from links
    ///
    /// Unencrypted counterpart of `build_tree_internal`: packs `links` into
    /// nodes of at most `max_links` children, stores each encoded node by
    /// its plain content hash, and recurses until a single root remains.
    /// Node construction order is fixed, so the root hash is deterministic
    /// for a given link list.
    async fn build_tree(&self, links: Vec<Link>, total_size: Option<u64>) -> Result<Hash, HashTreeError> {
        // Single link with matching size - return it directly
        if links.len() == 1 {
            if let Some(ts) = total_size {
                if links[0].size == ts {
                    return Ok(links[0].hash);
                }
            }
        }

        // Fits in one node
        if links.len() <= self.max_links {
            let node = TreeNode {
                node_type: LinkType::File,
                links,
            };
            let (data, hash) = encode_and_hash(&node)?;
            self.store
                .put(hash, data)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?;
            return Ok(hash);
        }

        // Need to split into sub-trees
        let mut sub_trees: Vec<Link> = Vec::new();

        for batch in links.chunks(self.max_links) {
            let batch_size: u64 = batch.iter().map(|l| l.size).sum();

            // Each batch is encoded inline (no recursion at this level), so
            // even a single-link batch becomes its own node here.
            let node = TreeNode {
                node_type: LinkType::File,
                links: batch.to_vec(),
            };
            let (data, hash) = encode_and_hash(&node)?;
            self.store
                .put(hash, data)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?;

            sub_trees.push(Link {
                hash,
                name: None,
                size: batch_size,
                key: None,
                link_type: LinkType::File, // Internal tree node
                meta: None,
            });
        }

        // Recursively build parent level
        Box::pin(self.build_tree(sub_trees, total_size)).await
    }
667
668    /// Create a tree node with custom links
669    pub async fn put_tree_node(
670        &self,
671        links: Vec<Link>,
672    ) -> Result<Hash, HashTreeError> {
673        let node = TreeNode {
674            node_type: LinkType::Dir,
675            links,
676        };
677
678        let (data, hash) = encode_and_hash(&node)?;
679        self.store
680            .put(hash, data)
681            .await
682            .map_err(|e| HashTreeError::Store(e.to_string()))?;
683        Ok(hash)
684    }
685
686    // ============ READ ============
687
688    /// Get raw data by hash
689    pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
690        self.store
691            .get(hash)
692            .await
693            .map_err(|e| HashTreeError::Store(e.to_string()))
694    }
695
696    /// Get and decode a tree node (unencrypted)
697    pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
698        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
699            Some(d) => d,
700            None => return Ok(None),
701        };
702
703        if !is_tree_node(&data) {
704            return Ok(None);
705        }
706
707        let node = decode_tree_node(&data)?;
708        Ok(Some(node))
709    }
710
711    /// Get and decode a tree node using Cid (with decryption if key present)
712    pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
713        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
714            Some(d) => d,
715            None => return Ok(None),
716        };
717
718        // Decrypt if key is present
719        let decrypted = if let Some(key) = &cid.key {
720            decrypt_chk(&data, key)
721                .map_err(|e| HashTreeError::Decryption(e.to_string()))?
722        } else {
723            data
724        };
725
726        if !is_tree_node(&decrypted) {
727            return Ok(None);
728        }
729
730        let node = decode_tree_node(&decrypted)?;
731        Ok(Some(node))
732    }
733
    /// Get directory node, handling chunked directory data
    /// Use this when you know the target is a directory (from parent link_type)
    ///
    /// Fetches (and decrypts, when `cid.key` is set) the root block. If the
    /// decoded node is File-typed, the directory payload was chunked by
    /// `put_directory`; its chunks are reassembled and decoded again to
    /// recover the inner directory node.
    pub async fn get_directory_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
            Some(d) => d,
            None => return Ok(None),
        };

        // Decrypt if key is present
        let decrypted = if let Some(key) = &cid.key {
            decrypt_chk(&data, key)
                .map_err(|e| HashTreeError::Decryption(e.to_string()))?
        } else {
            data
        };

        if !is_tree_node(&decrypted) {
            return Ok(None);
        }

        let node = decode_tree_node(&decrypted)?;

        // If this is a file tree (chunked data), reassemble to get actual directory
        if node.node_type == LinkType::File {
            let assembled = self.assemble_chunks(&node).await?;
            if is_tree_node(&assembled) {
                let inner_node = decode_tree_node(&assembled)?;
                return Ok(Some(inner_node));
            }
            // NOTE(review): when the assembled payload is not a tree node we
            // fall through and return the outer File node - confirm callers
            // expect this rather than an error.
        }

        Ok(Some(node))
    }
767
768    /// Check if hash points to a tree node (no decryption)
769    pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
770        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
771            Some(d) => d,
772            None => return Ok(false),
773        };
774        Ok(is_tree_node(&data))
775    }
776
777    /// Check if Cid points to a directory (with decryption)
778    pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
779        let node = match self.get_node(cid).await? {
780            Some(n) => n,
781            None => return Ok(false),
782        };
783        // Directory has named links (not just internal chunks)
784        Ok(node.links.iter().any(|l| l.name.as_ref().map(|n| !n.starts_with('_')).unwrap_or(false)))
785    }
786
787    /// Check if hash points to a directory (tree with named links, no decryption)
788    pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
789        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
790            Some(d) => d,
791            None => return Ok(false),
792        };
793        Ok(is_directory_node(&data))
794    }
795
796    /// Read a complete file (reassemble chunks if needed)
797    pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
798        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
799            Some(d) => d,
800            None => return Ok(None),
801        };
802
803        // Check if it's a tree (chunked file) or raw blob
804        if !is_tree_node(&data) {
805            return Ok(Some(data));
806        }
807
808        // It's a tree - reassemble chunks
809        let node = decode_tree_node(&data)?;
810        let assembled = self.assemble_chunks(&node).await?;
811        Ok(Some(assembled))
812    }
813
814    /// Recursively assemble chunks from tree
815    async fn assemble_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
816        let mut parts: Vec<Vec<u8>> = Vec::new();
817
818        for link in &node.links {
819            let child_data = self
820                .store
821                .get(&link.hash)
822                .await
823                .map_err(|e| HashTreeError::Store(e.to_string()))?
824                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
825
826            if is_tree_node(&child_data) {
827                let child_node = decode_tree_node(&child_data)?;
828                parts.push(Box::pin(self.assemble_chunks(&child_node)).await?);
829            } else {
830                parts.push(child_data);
831            }
832        }
833
834        // Concatenate all parts
835        let total_length: usize = parts.iter().map(|p| p.len()).sum();
836        let mut result = Vec::with_capacity(total_length);
837        for part in parts {
838            result.extend_from_slice(&part);
839        }
840
841        Ok(result)
842    }
843
    /// Read a file as stream of chunks
    /// Returns an async stream that yields chunks as they are read
    ///
    /// Unencrypted counterpart of `read_file_stream_encrypted`: the root is
    /// fetched once; a leaf is yielded directly, while a tree node seeds a
    /// stack (children reversed so they pop in link order) that
    /// `process_stream_stack` drains one leaf per poll.
    pub fn read_file_stream(
        &self,
        hash: Hash,
    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            ReadStreamState::Init { hash, tree: self },
            |state| async move {
                match state {
                    ReadStreamState::Init { hash, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            // Missing root: end the stream without an error item.
                            Ok(None) => return None,
                            Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), ReadStreamState::Done)),
                        };

                        if !is_tree_node(&data) {
                            // Single blob - yield it and finish
                            return Some((Ok(data), ReadStreamState::Done));
                        }

                        // Tree node - start streaming chunks
                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
                        };

                        // Create stack with all links to process
                        // (reversed so pops occur in link order)
                        let mut stack: Vec<StreamStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }

                        // Process first item
                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Processing { mut stack, tree } => {
                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Done => None,
                }
            },
        ))
    }
889
    /// Drive the `read_file_stream` state machine: pop hashes off the
    /// explicit DFS stack until a leaf blob is found, then yield it.
    ///
    /// Returns `None` when the stack is exhausted (stream ends), otherwise
    /// `Some((item, next_state))` per the `unfold` contract used by
    /// `read_file_stream`.
    async fn process_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<StreamStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            match item {
                StreamStackItem::Hash(hash) => {
                    // A chunk referenced by a tree node must exist: a missing
                    // chunk is an error here (unlike a missing root hash,
                    // which just produces an empty stream).
                    let data = match self.store.get(&hash).await {
                        Ok(Some(d)) => d,
                        Ok(None) => {
                            return Some((
                                Err(HashTreeError::MissingChunk(to_hex(&hash))),
                                ReadStreamState::Done,
                            ))
                        }
                        Err(e) => {
                            return Some((
                                Err(HashTreeError::Store(e.to_string())),
                                ReadStreamState::Done,
                            ))
                        }
                    };

                    if is_tree_node(&data) {
                        // Nested tree - push its children to stack
                        // (reversed so they pop in link order).
                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
                        };
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }
                    } else {
                        // Leaf blob - yield it; mem::take moves the stack into
                        // the next state without cloning its contents.
                        return Some((Ok(data), ReadStreamState::Processing { stack: std::mem::take(stack), tree: self }));
                    }
                }
            }
        }
        None
    }
931
932    /// Read file chunks as Vec (non-streaming version)
933    pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
934        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
935            Some(d) => d,
936            None => return Ok(vec![]),
937        };
938
939        if !is_tree_node(&data) {
940            return Ok(vec![data]);
941        }
942
943        let node = decode_tree_node(&data)?;
944        self.collect_chunks(&node).await
945    }
946
947    async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
948        let mut chunks = Vec::new();
949
950        for link in &node.links {
951            let child_data = self
952                .store
953                .get(&link.hash)
954                .await
955                .map_err(|e| HashTreeError::Store(e.to_string()))?
956                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
957
958            if is_tree_node(&child_data) {
959                let child_node = decode_tree_node(&child_data)?;
960                chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
961            } else {
962                chunks.push(child_data);
963            }
964        }
965
966        Ok(chunks)
967    }
968
    /// List directory entries (Cid-based, supports encrypted directories)
    ///
    /// Internal ('_'-prefixed, including legacy "_chunk_") links never appear
    /// as entries; they are recursed into and their entries are flattened
    /// into the result. Unnamed links fall back to the hex hash as name.
    /// An unknown Cid returns an empty Vec.
    ///
    /// NOTE(review): internal nodes are recursed here with the key/size from
    /// their own link, whereas `list_directory` inherits the parent Cid's key
    /// for internal nodes — confirm which key convention is intended.
    pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
        let node = match self.get_node(cid).await? {
            Some(n) => n,
            None => return Ok(vec![]),
        };

        let mut entries = Vec::new();

        for link in &node.links {
            // Skip internal chunk nodes - recurse into them
            if let Some(ref name) = link.name {
                if name.starts_with("_chunk_") || name.starts_with('_') {
                    let chunk_cid = Cid { hash: link.hash, key: link.key, size: link.size };
                    let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
                    entries.extend(sub_entries);
                    continue;
                }
            }

            entries.push(TreeEntry {
                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
                hash: link.hash,
                size: link.size,
                link_type: link.link_type,
                key: link.key,
                meta: link.meta.clone(),
            });
        }

        Ok(entries)
    }
1001
1002    /// List directory entries using Cid (with decryption if key present)
1003    /// Handles both regular and chunked directory data
1004    pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1005        // Use get_directory_node which handles chunked directory data
1006        let node = match self.get_directory_node(cid).await? {
1007            Some(n) => n,
1008            None => return Ok(vec![]),
1009        };
1010
1011        let mut entries = Vec::new();
1012
1013        for link in &node.links {
1014            // Skip internal chunk nodes (backwards compat with old _chunk_ format)
1015            if let Some(ref name) = link.name {
1016                if name.starts_with("_chunk_") || name.starts_with('_') {
1017                    // Internal nodes inherit parent's key for decryption
1018                    let sub_cid = Cid { hash: link.hash, key: cid.key, size: 0 };
1019                    let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
1020                    entries.extend(sub_entries);
1021                    continue;
1022                }
1023            }
1024
1025            entries.push(TreeEntry {
1026                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1027                hash: link.hash,
1028                size: link.size,
1029                link_type: link.link_type,
1030                key: link.key,
1031                meta: link.meta.clone(),
1032            });
1033        }
1034
1035        Ok(entries)
1036    }
1037
1038    /// Resolve a path within a tree (returns Cid with key if encrypted)
1039    pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1040        let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
1041        if parts.is_empty() {
1042            return Ok(Some(cid.clone()));
1043        }
1044
1045        let mut current_cid = cid.clone();
1046
1047        for part in parts {
1048            // Use get_directory_node which handles chunked directory data
1049            let node = match self.get_directory_node(&current_cid).await? {
1050                Some(n) => n,
1051                None => return Ok(None),
1052            };
1053
1054            if let Some(link) = self.find_link(&node, part) {
1055                current_cid = Cid {
1056                    hash: link.hash,
1057                    key: link.key,
1058                    size: link.size,
1059                };
1060            } else {
1061                // Check internal nodes
1062                match self.find_link_in_subtrees_cid(&node, part, &current_cid).await? {
1063                    Some(link) => {
1064                        current_cid = Cid {
1065                            hash: link.hash,
1066                            key: link.key,
1067                            size: link.size,
1068                        };
1069                    }
1070                    None => return Ok(None),
1071                }
1072            }
1073        }
1074
1075        Ok(Some(current_cid))
1076    }
1077
    /// Resolve a path within a tree using Cid (with decryption if key present)
    ///
    /// Thin alias for [`Self::resolve`]; presumably kept for API parity with
    /// the hashtree-ts class this file mirrors — see module docs.
    pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
        self.resolve(cid, path).await
    }
1082
1083    fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1084        node.links
1085            .iter()
1086            .find(|l| l.name.as_deref() == Some(name))
1087            .cloned()
1088    }
1089
1090    /// Find a link in subtrees using Cid (with decryption support)
1091    async fn find_link_in_subtrees_cid(&self, node: &TreeNode, name: &str, _parent_cid: &Cid) -> Result<Option<Link>, HashTreeError> {
1092        for link in &node.links {
1093            if !link.name.as_ref().map(|n| n.starts_with('_')).unwrap_or(false) {
1094                continue;
1095            }
1096
1097            // Internal nodes inherit encryption from parent context
1098            let sub_cid = Cid {
1099                hash: link.hash,
1100                key: link.key.clone(),
1101                size: link.size,
1102            };
1103
1104            let sub_node = match self.get_node(&sub_cid).await? {
1105                Some(n) => n,
1106                None => continue,
1107            };
1108
1109            if let Some(found) = self.find_link(&sub_node, name) {
1110                return Ok(Some(found));
1111            }
1112
1113            if let Some(deep_found) = Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await? {
1114                return Ok(Some(deep_found));
1115            }
1116        }
1117
1118        Ok(None)
1119    }
1120
1121    /// Get total size of a tree
1122    pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1123        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1124            Some(d) => d,
1125            None => return Ok(0),
1126        };
1127
1128        if !is_tree_node(&data) {
1129            return Ok(data.len() as u64);
1130        }
1131
1132        let node = decode_tree_node(&data)?;
1133        // Calculate from children
1134        let mut total = 0u64;
1135        for link in &node.links {
1136            total += link.size;
1137        }
1138        Ok(total)
1139    }
1140
1141    /// Walk entire tree depth-first (returns Vec)
1142    pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
1143        let mut entries = Vec::new();
1144        self.walk_recursive(cid, path, &mut entries).await?;
1145        Ok(entries)
1146    }
1147
    /// Depth-first traversal worker for `walk`.
    ///
    /// Pushes one `WalkEntry` for the node at `cid` (a Blob entry when the
    /// data does not decode as a tree node), then recurses into each link.
    /// Internal ('_'-prefixed) links are transparent: traversed with the
    /// parent's key and the parent's path, producing no entry of their own.
    /// Hashes missing from the store are skipped silently.
    async fn walk_recursive(
        &self,
        cid: &Cid,
        path: &str,
        entries: &mut Vec<WalkEntry>,
    ) -> Result<(), HashTreeError> {
        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
            Some(d) => d,
            None => return Ok(()),
        };

        // Decrypt if key is present
        let data = if let Some(key) = &cid.key {
            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
        } else {
            data
        };

        // Not decodable as a tree node => leaf blob: record it, stop descending.
        let node = match try_decode_tree_node(&data) {
            Some(n) => n,
            None => {
                entries.push(WalkEntry {
                    path: path.to_string(),
                    hash: cid.hash,
                    link_type: LinkType::Blob,
                    size: data.len() as u64,
                    key: cid.key,
                });
                return Ok(());
            }
        };

        // A tree node's reported size is the sum of its direct link sizes.
        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
        entries.push(WalkEntry {
            path: path.to_string(),
            hash: cid.hash,
            link_type: node.node_type,
            size: node_size,
            key: cid.key,
        });

        for link in &node.links {
            let child_path = match &link.name {
                Some(name) => {
                    if name.starts_with("_chunk_") || name.starts_with('_') {
                        // Internal nodes inherit parent's key
                        let sub_cid = Cid { hash: link.hash, key: cid.key, size: 0 };
                        Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
                        continue;
                    }
                    if path.is_empty() {
                        name.clone()
                    } else {
                        format!("{}/{}", path, name)
                    }
                }
                // Unnamed links keep the parent's path.
                None => path.to_string(),
            };

            // Child nodes use their own key from link
            let child_cid = Cid { hash: link.hash, key: link.key, size: link.size };
            Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
        }

        Ok(())
    }
1214
    /// Walk entire tree with parallel fetching
    /// Uses a work-stealing approach: always keeps `concurrency` requests in flight
    ///
    /// Convenience wrapper over `walk_parallel_with_progress` without a
    /// progress counter.
    pub async fn walk_parallel(&self, cid: &Cid, path: &str, concurrency: usize) -> Result<Vec<WalkEntry>, HashTreeError> {
        self.walk_parallel_with_progress(cid, path, concurrency, None).await
    }
1220
    /// Walk entire tree with parallel fetching and optional progress counter
    /// The counter is incremented for each node fetched (not just entries found)
    ///
    /// OPTIMIZATION: Blobs are NOT fetched - their metadata (hash, size, link_type)
    /// comes from the parent node's link, so we just add them directly to entries.
    /// This avoids downloading file contents during tree traversal.
    ///
    /// Entry ordering follows fetch-completion order, not strict depth-first
    /// order (futures complete in any order).
    ///
    /// NOTE(review): `concurrency == 0` makes the fill loop admit no work, so
    /// the function returns an empty Vec without erroring — confirm intended.
    pub async fn walk_parallel_with_progress(
        &self,
        cid: &Cid,
        path: &str,
        concurrency: usize,
        progress: Option<&std::sync::atomic::AtomicUsize>,
    ) -> Result<Vec<WalkEntry>, HashTreeError> {
        use futures::stream::{FuturesUnordered, StreamExt};
        use std::collections::VecDeque;
        use std::sync::atomic::Ordering;

        let mut entries = Vec::new();
        // `pending` holds discovered-but-not-fetched nodes; `active` holds
        // in-flight store fetches (at most `concurrency` of them).
        let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
        let mut active = FuturesUnordered::new();

        // Seed with root
        pending.push_back((cid.clone(), path.to_string()));

        loop {
            // Fill up to concurrency limit from pending queue
            while active.len() < concurrency {
                if let Some((node_cid, node_path)) = pending.pop_front() {
                    let store = &self.store;
                    let fut = async move {
                        let data = store.get(&node_cid.hash).await
                            .map_err(|e| HashTreeError::Store(e.to_string()))?;
                        Ok::<_, HashTreeError>((node_cid, node_path, data))
                    };
                    active.push(fut);
                } else {
                    break;
                }
            }

            // If nothing active, we're done
            if active.is_empty() {
                break;
            }

            // Wait for any future to complete
            if let Some(result) = active.next().await {
                let (node_cid, node_path, data) = result?;

                // Update progress counter
                if let Some(counter) = progress {
                    counter.fetch_add(1, Ordering::Relaxed);
                }

                // Missing nodes are skipped silently (matches walk_recursive).
                let data = match data {
                    Some(d) => d,
                    None => continue,
                };

                // Decrypt if key is present
                let data = if let Some(key) = &node_cid.key {
                    decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
                } else {
                    data
                };

                let node = match try_decode_tree_node(&data) {
                    Some(n) => n,
                    None => {
                        // It's a blob/file - this case only happens for root
                        // (children that are blobs are short-circuited below).
                        entries.push(WalkEntry {
                            path: node_path,
                            hash: node_cid.hash,
                            link_type: LinkType::Blob,
                            size: data.len() as u64,
                            key: node_cid.key,
                        });
                        continue;
                    }
                };

                // It's a directory/file node
                let node_size: u64 = node.links.iter().map(|l| l.size).sum();
                entries.push(WalkEntry {
                    path: node_path.clone(),
                    hash: node_cid.hash,
                    link_type: node.node_type,
                    size: node_size,
                    key: node_cid.key,
                });

                // Queue children - but DON'T fetch blobs, just add them directly
                for link in &node.links {
                    let child_path = match &link.name {
                        Some(name) => {
                            if name.starts_with("_chunk_") || name.starts_with('_') {
                                // Internal chunked nodes - inherit parent's key, same path
                                let sub_cid = Cid { hash: link.hash, key: node_cid.key, size: 0 };
                                pending.push_back((sub_cid, node_path.clone()));
                                continue;
                            }
                            if node_path.is_empty() {
                                name.clone()
                            } else {
                                format!("{}/{}", node_path, name)
                            }
                        }
                        None => node_path.clone(),
                    };

                    // OPTIMIZATION: If it's a blob, add entry directly without fetching
                    // The link already contains all the metadata we need
                    if link.link_type == LinkType::Blob {
                        entries.push(WalkEntry {
                            path: child_path,
                            hash: link.hash,
                            link_type: LinkType::Blob,
                            size: link.size,
                            key: link.key,
                        });
                        // Blobs count toward progress even though not fetched.
                        if let Some(counter) = progress {
                            counter.fetch_add(1, Ordering::Relaxed);
                        }
                        continue;
                    }

                    // For tree nodes (File/Dir), we need to fetch to see their children
                    let child_cid = Cid { hash: link.hash, key: link.key, size: link.size };
                    pending.push_back((child_cid, child_path));
                }
            }
        }

        Ok(entries)
    }
1356
    /// Walk tree as stream
    ///
    /// Streaming counterpart of `walk`: yields one `WalkEntry` per node in
    /// depth-first order as an `unfold` state machine; after the root is
    /// handled, `process_walk_stack` drains the explicit stack.
    ///
    /// NOTE(review): internal ('_'-prefixed) links are pushed with their own
    /// `link.key` here, while `walk_recursive` inherits the parent's key for
    /// internal nodes — confirm the two walkers are meant to agree.
    pub fn walk_stream(
        &self,
        cid: Cid,
        initial_path: String,
    ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            WalkStreamState::Init { cid, path: initial_path, tree: self },
            |state| async move {
                match state {
                    // First poll: fetch + classify the root node.
                    WalkStreamState::Init { cid, path, tree } => {
                        let data = match tree.store.get(&cid.hash).await {
                            Ok(Some(d)) => d,
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    WalkStreamState::Done,
                                ))
                            }
                        };

                        // Decrypt if key is present
                        let data = if let Some(key) = &cid.key {
                            match decrypt_chk(&data, key) {
                                Ok(d) => d,
                                Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), WalkStreamState::Done)),
                            }
                        } else {
                            data
                        };

                        let node = match try_decode_tree_node(&data) {
                            Some(n) => n,
                            None => {
                                // Blob data: single entry, stream ends.
                                let entry = WalkEntry {
                                    path,
                                    hash: cid.hash,
                                    link_type: LinkType::Blob,
                                    size: data.len() as u64,
                                    key: cid.key,
                                };
                                return Some((Ok(entry), WalkStreamState::Done));
                            }
                        };

                        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
                        let entry = WalkEntry {
                            path: path.clone(),
                            hash: cid.hash,
                            link_type: node.node_type,
                            size: node_size,
                            key: cid.key,
                        };

                        // Create stack with children to process
                        // (reversed so pops come out in link order).
                        let mut stack: Vec<WalkStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            // '_'-prefixed and unnamed links keep the parent's path.
                            let child_path = match &link.name {
                                Some(name) if !name.starts_with('_') => {
                                    if path.is_empty() {
                                        name.clone()
                                    } else {
                                        format!("{}/{}", path, name)
                                    }
                                }
                                _ => path.clone(),
                            };
                            // Child nodes use their own key from link
                            stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
                        }

                        Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
                    }
                    // Subsequent polls: keep draining the stack.
                    WalkStreamState::Processing { mut stack, tree } => {
                        tree.process_walk_stack(&mut stack).await
                    }
                    WalkStreamState::Done => None,
                }
            },
        ))
    }
1440
    /// Drive the `walk_stream` state machine: pop stack items until one
    /// produces a `WalkEntry` (blob or tree node), yielding it along with
    /// the next `Processing` state. Missing hashes are skipped silently.
    ///
    /// NOTE(review): tree-node entries are emitted with `key: None` here,
    /// while `walk_recursive` reports `cid.key` for tree nodes — confirm
    /// the "directories are not encrypted" assumption holds everywhere.
    async fn process_walk_stack<'a>(
        &'a self,
        stack: &mut Vec<WalkStackItem>,
    ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                Ok(None) => continue,
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        WalkStreamState::Done,
                    ))
                }
            };

            let node = match try_decode_tree_node(&data) {
                Some(n) => n,
                None => {
                    // Blob data: yield it; mem::take moves the stack into the
                    // next state without cloning.
                    let entry = WalkEntry {
                        path: item.path,
                        hash: item.hash,
                        link_type: LinkType::Blob,
                        size: data.len() as u64,
                        key: item.key,
                    };
                    return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
                }
            };

            // Tree node: size is the sum of its direct link sizes.
            let node_size: u64 = node.links.iter().map(|l| l.size).sum();
            let entry = WalkEntry {
                path: item.path.clone(),
                hash: item.hash,
                link_type: node.node_type,
                size: node_size,
                key: None, // directories are not encrypted
            };

            // Push children to stack (reversed so they pop in link order);
            // '_'-prefixed and unnamed links keep the parent's path.
            for link in node.links.into_iter().rev() {
                let child_path = match &link.name {
                    Some(name) if !name.starts_with('_') => {
                        if item.path.is_empty() {
                            name.clone()
                        } else {
                            format!("{}/{}", item.path, name)
                        }
                    }
                    _ => item.path.clone(),
                };
                stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
            }

            return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
        }
        None
    }
1500
1501    // ============ EDIT ============
1502
1503    /// Add or update an entry in a directory
1504    /// Returns new root Cid (immutable operation)
1505    pub async fn set_entry(
1506        &self,
1507        root: &Cid,
1508        path: &[&str],
1509        name: &str,
1510        entry_cid: &Cid,
1511        link_type: LinkType,
1512    ) -> Result<Cid, HashTreeError> {
1513        let dir_cid = self.resolve_path_array(root, path).await?;
1514        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1515
1516        let entries = self.list_directory(&dir_cid).await?;
1517        let mut new_entries: Vec<DirEntry> = entries
1518            .into_iter()
1519            .filter(|e| e.name != name)
1520            .map(|e| DirEntry {
1521                name: e.name,
1522                hash: e.hash,
1523                size: e.size,
1524                key: e.key,
1525                link_type: e.link_type,
1526                meta: e.meta,
1527            })
1528            .collect();
1529
1530        new_entries.push(DirEntry {
1531            name: name.to_string(),
1532            hash: entry_cid.hash,
1533            size: entry_cid.size,
1534            key: entry_cid.key,
1535            link_type,
1536            meta: None,
1537        });
1538
1539        let new_dir_cid = self.put_directory(new_entries).await?;
1540        self.rebuild_path(root, path, new_dir_cid).await
1541    }
1542
1543    /// Remove an entry from a directory
1544    /// Returns new root Cid
1545    pub async fn remove_entry(
1546        &self,
1547        root: &Cid,
1548        path: &[&str],
1549        name: &str,
1550    ) -> Result<Cid, HashTreeError> {
1551        let dir_cid = self.resolve_path_array(root, path).await?;
1552        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1553
1554        let entries = self.list_directory(&dir_cid).await?;
1555        let new_entries: Vec<DirEntry> = entries
1556            .into_iter()
1557            .filter(|e| e.name != name)
1558            .map(|e| DirEntry {
1559                name: e.name,
1560                hash: e.hash,
1561                size: e.size,
1562                key: e.key,
1563                link_type: e.link_type,
1564                meta: e.meta,
1565            })
1566            .collect();
1567
1568        let new_dir_cid = self.put_directory(new_entries).await?;
1569        self.rebuild_path(root, path, new_dir_cid).await
1570    }
1571
1572    /// Rename an entry in a directory
1573    /// Returns new root Cid
1574    pub async fn rename_entry(
1575        &self,
1576        root: &Cid,
1577        path: &[&str],
1578        old_name: &str,
1579        new_name: &str,
1580    ) -> Result<Cid, HashTreeError> {
1581        if old_name == new_name {
1582            return Ok(root.clone());
1583        }
1584
1585        let dir_cid = self.resolve_path_array(root, path).await?;
1586        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1587
1588        let entries = self.list_directory(&dir_cid).await?;
1589        let entry = entries
1590            .iter()
1591            .find(|e| e.name == old_name)
1592            .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1593
1594        let entry_hash = entry.hash;
1595        let entry_size = entry.size;
1596        let entry_key = entry.key;
1597        let entry_link_type = entry.link_type;
1598        let entry_meta = entry.meta.clone();
1599
1600        let new_entries: Vec<DirEntry> = entries
1601            .into_iter()
1602            .filter(|e| e.name != old_name)
1603            .map(|e| DirEntry {
1604                name: e.name,
1605                hash: e.hash,
1606                size: e.size,
1607                key: e.key,
1608                link_type: e.link_type,
1609                meta: e.meta,
1610            })
1611            .chain(std::iter::once(DirEntry {
1612                name: new_name.to_string(),
1613                hash: entry_hash,
1614                size: entry_size,
1615                key: entry_key,
1616                link_type: entry_link_type,
1617                meta: entry_meta,
1618            }))
1619            .collect();
1620
1621        let new_dir_cid = self.put_directory(new_entries).await?;
1622        self.rebuild_path(root, path, new_dir_cid).await
1623    }
1624
1625    /// Move an entry to a different directory
1626    /// Returns new root Cid
1627    pub async fn move_entry(
1628        &self,
1629        root: &Cid,
1630        source_path: &[&str],
1631        name: &str,
1632        target_path: &[&str],
1633    ) -> Result<Cid, HashTreeError> {
1634        let source_dir_cid = self.resolve_path_array(root, source_path).await?;
1635        let source_dir_cid = source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
1636
1637        let source_entries = self.list_directory(&source_dir_cid).await?;
1638        let entry = source_entries
1639            .iter()
1640            .find(|e| e.name == name)
1641            .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
1642
1643        let entry_cid = Cid {
1644            hash: entry.hash,
1645            key: entry.key,
1646            size: entry.size,
1647        };
1648        let entry_link_type = entry.link_type;
1649
1650        // Remove from source
1651        let new_root = self.remove_entry(root, source_path, name).await?;
1652
1653        // Add to target
1654        self.set_entry(&new_root, target_path, name, &entry_cid, entry_link_type).await
1655    }
1656
1657    async fn resolve_path_array(&self, root: &Cid, path: &[&str]) -> Result<Option<Cid>, HashTreeError> {
1658        if path.is_empty() {
1659            return Ok(Some(root.clone()));
1660        }
1661        self.resolve_path(root, &path.join("/")).await
1662    }
1663
1664    async fn rebuild_path(
1665        &self,
1666        root: &Cid,
1667        path: &[&str],
1668        new_child: Cid,
1669    ) -> Result<Cid, HashTreeError> {
1670        if path.is_empty() {
1671            return Ok(new_child);
1672        }
1673
1674        let mut child_cid = new_child;
1675        let parts: Vec<&str> = path.to_vec();
1676
1677        for i in (0..parts.len()).rev() {
1678            let child_name = parts[i];
1679            let parent_path = &parts[..i];
1680
1681            let parent_cid = if parent_path.is_empty() {
1682                root.clone()
1683            } else {
1684                self.resolve_path_array(root, parent_path)
1685                    .await?
1686                    .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
1687            };
1688
1689            let parent_entries = self.list_directory(&parent_cid).await?;
1690            let new_parent_entries: Vec<DirEntry> = parent_entries
1691                .into_iter()
1692                .map(|e| {
1693                    if e.name == child_name {
1694                        DirEntry {
1695                            name: e.name,
1696                            hash: child_cid.hash,
1697                            size: child_cid.size,
1698                            key: child_cid.key,
1699                            link_type: e.link_type,
1700                            meta: e.meta,
1701                        }
1702                    } else {
1703                        DirEntry {
1704                            name: e.name,
1705                            hash: e.hash,
1706                            size: e.size,
1707                            key: e.key,
1708                            link_type: e.link_type,
1709                            meta: e.meta,
1710                        }
1711                    }
1712                })
1713                .collect();
1714
1715            child_cid = self.put_directory(new_parent_entries).await?;
1716        }
1717
1718        Ok(child_cid)
1719    }
1720
1721    // ============ UTILITY ============
1722
    /// Get the underlying store
    ///
    /// Cheap: clones the `Arc` handle (refcount bump), not the store itself.
    pub fn get_store(&self) -> Arc<S> {
        self.store.clone()
    }
1727
    /// Get chunk size configuration
    ///
    /// Value set via `HashTreeConfig` (defaults to `DEFAULT_CHUNK_SIZE`).
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }
1732
    /// Get max links configuration
    ///
    /// Value set via `HashTreeConfig` (defaults to `DEFAULT_MAX_LINKS`).
    pub fn max_links(&self) -> usize {
        self.max_links
    }
1737}
1738
1739// Internal state types for streaming
1740
/// Work item for the plain (unencrypted) read stream: a node hash still to
/// be fetched. Single-variant today; presumably kept as an enum so the
/// stream driver can grow more item kinds — confirm against the driver code.
enum StreamStackItem {
    Hash(Hash),
}
1744
/// State machine for the unencrypted read stream.
/// `Init` holds the root hash before the first poll; `Processing` carries a
/// stack of pending node hashes; `Done` ends the stream.
/// NOTE(review): the driver lives outside this chunk — meanings inferred
/// from variant payloads; verify against the stream implementation.
enum ReadStreamState<'a, S: Store> {
    Init { hash: Hash, tree: &'a HashTree<S> },
    Processing { stack: Vec<StreamStackItem>, tree: &'a HashTree<S> },
    Done,
}
1750
/// Pending frame for a tree walk: the node hash, the path accumulated so
/// far, and the node's decryption key (`None` when the content is stored
/// unencrypted).
struct WalkStackItem {
    hash: Hash,
    path: String,
    key: Option<[u8; 32]>,
}
1756
/// State machine for the directory-walk stream.
/// `Init` holds the starting Cid and base path; `Processing` carries the
/// stack of frames still to visit; `Done` ends the stream.
/// NOTE(review): driver code is outside this chunk — confirm semantics there.
enum WalkStreamState<'a, S: Store> {
    Init { cid: Cid, path: String, tree: &'a HashTree<S> },
    Processing { stack: Vec<WalkStackItem>, tree: &'a HashTree<S> },
    Done,
}
1762
1763// Encrypted stream state types
/// Pending frame for the encrypted read stream: a node hash plus the key
/// needed to decrypt it (`None` presumably for nodes stored in the clear —
/// confirm against the driver).
struct EncryptedStackItem {
    hash: Hash,
    key: Option<[u8; 32]>,
}
1768
/// State machine for the encrypted read stream.
/// `Init` holds the root hash and its (mandatory) decryption key;
/// `Processing` carries the stack of frames still to fetch; `Done` ends the
/// stream. NOTE(review): driver code is outside this chunk.
enum EncryptedStreamState<'a, S: Store> {
    Init { hash: Hash, key: [u8; 32], tree: &'a HashTree<S> },
    Processing { stack: Vec<EncryptedStackItem>, tree: &'a HashTree<S> },
    Done,
}
1774
1775/// Verify tree integrity - checks that all referenced hashes exist
1776pub async fn verify_tree<S: Store>(
1777    store: Arc<S>,
1778    root_hash: &Hash,
1779) -> Result<crate::reader::VerifyResult, HashTreeError> {
1780    let mut missing = Vec::new();
1781    let mut visited = std::collections::HashSet::new();
1782
1783    verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
1784
1785    Ok(crate::reader::VerifyResult {
1786        valid: missing.is_empty(),
1787        missing,
1788    })
1789}
1790
1791async fn verify_recursive<S: Store>(
1792    store: Arc<S>,
1793    hash: &Hash,
1794    missing: &mut Vec<Hash>,
1795    visited: &mut std::collections::HashSet<String>,
1796) -> Result<(), HashTreeError> {
1797    let hex = to_hex(hash);
1798    if visited.contains(&hex) {
1799        return Ok(());
1800    }
1801    visited.insert(hex);
1802
1803    let data = match store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1804        Some(d) => d,
1805        None => {
1806            missing.push(*hash);
1807            return Ok(());
1808        }
1809    };
1810
1811    if is_tree_node(&data) {
1812        let node = decode_tree_node(&data)?;
1813        for link in &node.links {
1814            Box::pin(verify_recursive(store.clone(), &link.hash, missing, visited)).await?;
1815        }
1816    }
1817
1818    Ok(())
1819}
1820
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;

    /// Build a fresh in-memory tree with encryption disabled, so hashes in
    /// these tests address plaintext content directly.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        // Use public (unencrypted) mode for these tests
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    // Round-trip a small blob through put/get.
    #[tokio::test]
    async fn test_put_and_read_blob() {
        let (_store, tree) = make_tree();

        let data = vec![1, 2, 3, 4, 5];
        let hash = tree.put_blob(&data).await.unwrap();

        let result = tree.get_blob(&hash).await.unwrap();
        assert_eq!(result, Some(data));
    }

    // A file smaller than the chunk size should round-trip intact and
    // report its byte length.
    #[tokio::test]
    async fn test_put_and_read_file_small() {
        let (_store, tree) = make_tree();

        let data = b"Hello, World!";
        let result = tree.put_file(data).await.unwrap();

        assert_eq!(result.size, data.len() as u64);

        let read_data = tree.read_file(&result.hash).await.unwrap();
        assert_eq!(read_data, Some(data.to_vec()));
    }

    // Directory listing returns exactly the entries that were put.
    #[tokio::test]
    async fn test_put_and_read_directory() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();

        let dir_cid = tree
            .put_directory(
                vec![
                    DirEntry::new("a.txt", file1).with_size(8),
                    DirEntry::new("b.txt", file2).with_size(8),
                ],
            )
            .await
            .unwrap();

        let entries = tree.list_directory(&dir_cid).await.unwrap();
        assert_eq!(entries.len(), 2);
        let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
        assert!(names.contains(&"a.txt"));
        assert!(names.contains(&"b.txt"));
    }

    // Blobs are not directories; an (empty) directory node is.
    #[tokio::test]
    async fn test_is_directory() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"data").await.unwrap();
        let dir_cid = tree.put_directory(vec![]).await.unwrap();

        assert!(!tree.is_directory(&file_hash).await.unwrap());
        assert!(tree.is_directory(&dir_cid.hash).await.unwrap());
    }

    // A nested "subdir/file.txt" path resolves to the file's hash.
    #[tokio::test]
    async fn test_resolve_path() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"nested").await.unwrap();
        let sub_dir = tree.put_directory(
            vec![DirEntry::new("file.txt", file_hash).with_size(6)],
        ).await.unwrap();
        let root_dir = tree.put_directory(
            vec![DirEntry::new("subdir", sub_dir.hash)],
        ).await.unwrap();

        let resolved = tree.resolve_path(&root_dir, "subdir/file.txt").await.unwrap();
        assert_eq!(resolved.map(|c| c.hash), Some(file_hash));
    }

    // ============ UNIFIED API TESTS ============

    // Public mode: no encryption key in the Cid, content round-trips.
    #[tokio::test]
    async fn test_unified_put_get_public() {
        let store = Arc::new(MemoryStore::new());
        // Use .public() to disable encryption
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let data = b"Hello, public world!";
        let cid = tree.put(data).await.unwrap();

        assert_eq!(cid.size, data.len() as u64);
        assert!(cid.key.is_none()); // No key for public content

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Default (encrypted) mode: Cid carries a key, content round-trips.
    #[tokio::test]
    async fn test_unified_put_get_encrypted() {
        let store = Arc::new(MemoryStore::new());
        // Default config has encryption enabled
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Hello, encrypted world!";
        let cid = tree.put(data).await.unwrap();

        assert_eq!(cid.size, data.len() as u64);
        assert!(cid.key.is_some()); // Has encryption key

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Encryption still round-trips when the payload spans multiple chunks.
    #[tokio::test]
    async fn test_unified_put_get_encrypted_chunked() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).with_chunk_size(100));

        // Data larger than chunk size
        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
        let cid = tree.put(&data).await.unwrap();

        assert_eq!(cid.size, data.len() as u64);
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Convergent encryption (CHK): identical content must yield an
    // identical hash AND key, so the Cid is fully deterministic.
    #[tokio::test]
    async fn test_cid_deterministic() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Same content produces same CID";

        let cid1 = tree.put(data).await.unwrap();
        let cid2 = tree.put(data).await.unwrap();

        // CHK: same content = same hash AND same key
        assert_eq!(cid1.hash, cid2.hash);
        assert_eq!(cid1.key, cid2.key);
        assert_eq!(cid1.to_string(), cid2.to_string());
    }

    // String form of a public Cid is the bare 64-char hex hash.
    #[tokio::test]
    async fn test_cid_to_string_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let cid = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        // Public CID is just the hash (64 hex chars)
        assert_eq!(s.len(), 64);
        assert!(!s.contains(':'));
    }

    // String form of an encrypted Cid is "hash:key".
    #[tokio::test]
    async fn test_cid_to_string_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let cid = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        // Encrypted CID is "hash:key" (64 + 1 + 64 = 129 chars)
        assert_eq!(s.len(), 129);
        assert!(s.contains(':'));
    }
}