hashtree_core/
hashtree.rs

1//! HashTree - Unified merkle tree operations
2//!
3//! Single struct for creating, reading, and editing content-addressed merkle trees.
4//! Mirrors the hashtree-ts HashTree class API.
5
6use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::stream::{self, Stream};
10use futures::io::AsyncRead;
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node};
15use crate::hash::sha256;
16use crate::reader::{ReaderError, TreeEntry, WalkEntry};
17use crate::store::Store;
18use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
19
20use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
21
22/// HashTree configuration
23#[derive(Clone)]
24pub struct HashTreeConfig<S: Store> {
25    pub store: Arc<S>,
26    pub chunk_size: usize,
27    pub max_links: usize,
28    /// Whether to encrypt content (default: true when encryption feature enabled)
29    pub encrypted: bool,
30}
31
32impl<S: Store> HashTreeConfig<S> {
33    pub fn new(store: Arc<S>) -> Self {
34        Self {
35            store,
36            chunk_size: DEFAULT_CHUNK_SIZE,
37            max_links: DEFAULT_MAX_LINKS,
38            encrypted: true,
39        }
40    }
41
42    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
43        self.chunk_size = chunk_size;
44        self
45    }
46
47    pub fn with_max_links(mut self, max_links: usize) -> Self {
48        self.max_links = max_links;
49        self
50    }
51
52    /// Disable encryption (store content publicly)
53    pub fn public(mut self) -> Self {
54        self.encrypted = false;
55        self
56    }
57}
58
59/// Result of put_file operation
60#[derive(Debug, Clone)]
61pub struct PutFileResult {
62    pub hash: Hash,
63    pub size: u64,
64}
65
66/// HashTree error type
67#[derive(Debug, thiserror::Error)]
68pub enum HashTreeError {
69    #[error("Store error: {0}")]
70    Store(String),
71    #[error("Codec error: {0}")]
72    Codec(#[from] crate::codec::CodecError),
73    #[error("Missing chunk: {0}")]
74    MissingChunk(String),
75    #[error("Path not found: {0}")]
76    PathNotFound(String),
77    #[error("Entry not found: {0}")]
78    EntryNotFound(String),
79    #[error("Encryption error: {0}")]
80    Encryption(String),
81    #[error("Decryption error: {0}")]
82    Decryption(String),
83}
84
85impl From<BuilderError> for HashTreeError {
86    fn from(e: BuilderError) -> Self {
87        match e {
88            BuilderError::Store(s) => HashTreeError::Store(s),
89            BuilderError::Codec(c) => HashTreeError::Codec(c),
90            BuilderError::Encryption(s) => HashTreeError::Encryption(s),
91        }
92    }
93}
94
95impl From<ReaderError> for HashTreeError {
96    fn from(e: ReaderError) -> Self {
97        match e {
98            ReaderError::Store(s) => HashTreeError::Store(s),
99            ReaderError::Codec(c) => HashTreeError::Codec(c),
100            ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
101            ReaderError::Decryption(s) => HashTreeError::Encryption(s),
102            ReaderError::MissingKey => HashTreeError::Encryption("missing decryption key".to_string()),
103        }
104    }
105}
106
107/// HashTree - unified create, read, and edit merkle tree operations
108pub struct HashTree<S: Store> {
109    store: Arc<S>,
110    chunk_size: usize,
111    max_links: usize,
112    encrypted: bool,
113}
114
115impl<S: Store> HashTree<S> {
116    pub fn new(config: HashTreeConfig<S>) -> Self {
117        Self {
118            store: config.store,
119            chunk_size: config.chunk_size,
120            max_links: config.max_links,
121            encrypted: config.encrypted,
122        }
123    }
124
125    /// Check if encryption is enabled
126    pub fn is_encrypted(&self) -> bool {
127        self.encrypted
128    }
129
130    // ============ UNIFIED API ============
131
132    /// Store content, returns Cid (hash + optional key)
133    /// Encrypts by default when encryption feature is enabled
134    pub async fn put(&self, data: &[u8]) -> Result<Cid, HashTreeError> {
135        let size = data.len() as u64;
136
137        // Small data - store as single chunk
138        if data.len() <= self.chunk_size {
139            let (hash, key) = self.put_chunk_internal(data).await?;
140            return Ok(Cid { hash, key, size });
141        }
142
143        // Large data - chunk it
144        let mut links: Vec<Link> = Vec::new();
145        let mut offset = 0;
146
147        while offset < data.len() {
148            let end = (offset + self.chunk_size).min(data.len());
149            let chunk = &data[offset..end];
150            let chunk_size = chunk.len() as u64;
151            let (hash, key) = self.put_chunk_internal(chunk).await?;
152            links.push(Link {
153                hash,
154                name: None,
155                size: chunk_size,
156                key,
157                link_type: LinkType::Blob, // Leaf chunk (raw blob)
158                meta: None,
159            });
160            offset = end;
161        }
162
163        // Build tree from chunks
164        let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
165        Ok(Cid { hash: root_hash, key: root_key, size })
166    }
167
168    /// Get content by Cid (handles decryption automatically)
169    pub async fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
170        if let Some(key) = cid.key {
171            self.get_encrypted(&cid.hash, &key).await
172        } else {
173            self.read_file(&cid.hash).await
174        }
175    }
176
177    /// Store content from an async reader (streaming put)
178    ///
179    /// Reads data in chunks and builds a merkle tree incrementally.
180    /// Useful for large files or streaming data sources.
181    pub async fn put_stream<R: AsyncRead + Unpin>(&self, mut reader: R) -> Result<Cid, HashTreeError> {
182        let mut buffer = vec![0u8; self.chunk_size];
183        let mut links = Vec::new();
184        let mut total_size: u64 = 0;
185        let mut consistent_key: Option<[u8; 32]> = None;
186
187        loop {
188            let mut chunk = Vec::new();
189            let mut bytes_read = 0;
190
191            // Read until we have a full chunk or EOF
192            while bytes_read < self.chunk_size {
193                let n = reader.read(&mut buffer[..self.chunk_size - bytes_read]).await
194                    .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
195                if n == 0 {
196                    break; // EOF
197                }
198                chunk.extend_from_slice(&buffer[..n]);
199                bytes_read += n;
200            }
201
202            if chunk.is_empty() {
203                break; // No more data
204            }
205
206            let chunk_len = chunk.len() as u64;
207            total_size += chunk_len;
208
209            let (hash, key) = self.put_chunk_internal(&chunk).await?;
210
211            // Track consistent key for single-key result
212            if links.is_empty() {
213                consistent_key = key;
214            } else if consistent_key != key {
215                consistent_key = None;
216            }
217
218            links.push(Link {
219                hash,
220                name: None,
221                size: chunk_len,
222                key,
223                link_type: LinkType::Blob, // Leaf chunk (raw blob)
224                meta: None,
225            });
226        }
227
228        if links.is_empty() {
229            // Empty input
230            let (hash, key) = self.put_chunk_internal(&[]).await?;
231            return Ok(Cid { hash, key, size: 0 });
232        }
233
234        // Build tree from chunks
235        let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
236        Ok(Cid { hash: root_hash, key: root_key, size: total_size })
237    }
238
239    /// Read content as a stream of chunks by Cid (handles decryption automatically)
240    ///
241    /// Returns an async stream that yields chunks as they are read.
242    /// Useful for large files or when you want to process data incrementally.
243    pub fn get_stream(
244        &self,
245        cid: &Cid,
246    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
247        let hash = cid.hash;
248        let key = cid.key;
249
250        if let Some(k) = key {
251            // Encrypted stream
252            Box::pin(self.read_file_stream_encrypted(hash, k))
253        } else {
254            // Unencrypted stream
255            self.read_file_stream(hash)
256        }
257    }
258
259    /// Read encrypted file as stream (internal)
260    fn read_file_stream_encrypted(
261        &self,
262        hash: Hash,
263        key: EncryptionKey,
264    ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
265        stream::unfold(
266            EncryptedStreamState::Init { hash, key, tree: self },
267            |state| async move {
268                match state {
269                    EncryptedStreamState::Init { hash, key, tree } => {
270                        let data = match tree.store.get(&hash).await {
271                            Ok(Some(d)) => d,
272                            Ok(None) => return None,
273                            Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), EncryptedStreamState::Done)),
274                        };
275
276                        // Try to decrypt
277                        let decrypted = match decrypt_chk(&data, &key) {
278                            Ok(d) => d,
279                            Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), EncryptedStreamState::Done)),
280                        };
281
282                        if !is_tree_node(&decrypted) {
283                            // Single blob - yield decrypted data
284                            return Some((Ok(decrypted), EncryptedStreamState::Done));
285                        }
286
287                        // Tree node - parse and traverse
288                        let node = match decode_tree_node(&decrypted) {
289                            Ok(n) => n,
290                            Err(e) => return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done)),
291                        };
292
293                        let mut stack: Vec<EncryptedStackItem> = Vec::new();
294                        for link in node.links.into_iter().rev() {
295                            stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
296                        }
297
298                        tree.process_encrypted_stream_stack(&mut stack).await
299                    }
300                    EncryptedStreamState::Processing { mut stack, tree } => {
301                        tree.process_encrypted_stream_stack(&mut stack).await
302                    }
303                    EncryptedStreamState::Done => None,
304                }
305            },
306        )
307    }
308
309    async fn process_encrypted_stream_stack<'a>(
310        &'a self,
311        stack: &mut Vec<EncryptedStackItem>,
312    ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
313        while let Some(item) = stack.pop() {
314            let data = match self.store.get(&item.hash).await {
315                Ok(Some(d)) => d,
316                Ok(None) => {
317                    return Some((
318                        Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
319                        EncryptedStreamState::Done,
320                    ))
321                }
322                Err(e) => {
323                    return Some((
324                        Err(HashTreeError::Store(e.to_string())),
325                        EncryptedStreamState::Done,
326                    ))
327                }
328            };
329
330            // Decrypt if we have a key
331            let decrypted = if let Some(key) = item.key {
332                match decrypt_chk(&data, &key) {
333                    Ok(d) => d,
334                    Err(e) => {
335                        return Some((
336                            Err(HashTreeError::Decryption(e.to_string())),
337                            EncryptedStreamState::Done,
338                        ))
339                    }
340                }
341            } else {
342                data
343            };
344
345            if is_tree_node(&decrypted) {
346                // Nested tree node - add children to stack
347                let node = match decode_tree_node(&decrypted) {
348                    Ok(n) => n,
349                    Err(e) => {
350                        return Some((
351                            Err(HashTreeError::Codec(e)),
352                            EncryptedStreamState::Done,
353                        ))
354                    }
355                };
356                for link in node.links.into_iter().rev() {
357                    stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
358                }
359            } else {
360                // Leaf chunk - yield decrypted data
361                return Some((
362                    Ok(decrypted),
363                    EncryptedStreamState::Processing { stack: std::mem::take(stack), tree: self },
364                ));
365            }
366        }
367        None
368    }
369
370    /// Store a chunk with optional encryption
371    async fn put_chunk_internal(&self, data: &[u8]) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
372        if self.encrypted {
373            let (encrypted, key) = encrypt_chk(data)
374                .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
375            let hash = sha256(&encrypted);
376            self.store
377                .put(hash, encrypted)
378                .await
379                .map_err(|e| HashTreeError::Store(e.to_string()))?;
380            Ok((hash, Some(key)))
381        } else {
382            let hash = self.put_blob(data).await?;
383            Ok((hash, None))
384        }
385    }
386
387    /// Build tree and return (hash, optional_key)
388    async fn build_tree_internal(
389        &self,
390        links: Vec<Link>,
391        total_size: Option<u64>,
392    ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
393        // Single link with matching size - return directly
394        if links.len() == 1 {
395            if let Some(ts) = total_size {
396                if links[0].size == ts {
397                    return Ok((links[0].hash, links[0].key));
398                }
399            }
400        }
401
402        if links.len() <= self.max_links {
403            let node = TreeNode {
404                node_type: LinkType::File,
405                links,
406            };
407            let (data, _) = encode_and_hash(&node)?;
408
409            if self.encrypted {
410                let (encrypted, key) = encrypt_chk(&data)
411                    .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
412                let hash = sha256(&encrypted);
413                self.store
414                    .put(hash, encrypted)
415                    .await
416                    .map_err(|e| HashTreeError::Store(e.to_string()))?;
417                return Ok((hash, Some(key)));
418            }
419
420            // Unencrypted path
421            let hash = sha256(&data);
422            self.store
423                .put(hash, data)
424                .await
425                .map_err(|e| HashTreeError::Store(e.to_string()))?;
426            return Ok((hash, None));
427        }
428
429        // Too many links - create subtrees
430        let mut sub_links = Vec::new();
431        for batch in links.chunks(self.max_links) {
432            let batch_size: u64 = batch.iter().map(|l| l.size).sum();
433            let (hash, key) = Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
434            sub_links.push(Link {
435                hash,
436                name: None,
437                size: batch_size,
438                key,
439                link_type: LinkType::File, // Internal tree node
440                meta: None,
441            });
442        }
443
444        Box::pin(self.build_tree_internal(sub_links, total_size)).await
445    }
446
447    /// Get encrypted content by hash and key
448    async fn get_encrypted(
449        &self,
450        hash: &Hash,
451        key: &EncryptionKey,
452    ) -> Result<Option<Vec<u8>>, HashTreeError> {
453        let encrypted_data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
454            Some(d) => d,
455            None => return Ok(None),
456        };
457
458        // Decrypt the data
459        let decrypted = decrypt_chk(&encrypted_data, key)
460            .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
461
462        // Check if it's a tree node
463        if is_tree_node(&decrypted) {
464            let node = decode_tree_node(&decrypted)?;
465            let assembled = self.assemble_encrypted_chunks(&node).await?;
466            return Ok(Some(assembled));
467        }
468
469        // Single chunk data
470        Ok(Some(decrypted))
471    }
472
473    /// Assemble encrypted chunks from tree
474    async fn assemble_encrypted_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
475        let mut parts: Vec<Vec<u8>> = Vec::new();
476
477        for link in &node.links {
478            let chunk_key = link.key.ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
479
480            let encrypted_child = self
481                .store
482                .get(&link.hash)
483                .await
484                .map_err(|e| HashTreeError::Store(e.to_string()))?
485                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
486
487            let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
488                .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
489
490            if is_tree_node(&decrypted) {
491                // Intermediate tree node - recurse
492                let child_node = decode_tree_node(&decrypted)?;
493                let child_data = Box::pin(self.assemble_encrypted_chunks(&child_node)).await?;
494                parts.push(child_data);
495            } else {
496                // Leaf data chunk
497                parts.push(decrypted);
498            }
499        }
500
501        let total_len: usize = parts.iter().map(|p| p.len()).sum();
502        let mut result = Vec::with_capacity(total_len);
503        for part in parts {
504            result.extend_from_slice(&part);
505        }
506
507        Ok(result)
508    }
509
510    // ============ LOW-LEVEL CREATE ============
511
512    /// Store a blob directly (small data, no encryption)
513    /// Returns the content hash
514    pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
515        let hash = sha256(data);
516        self.store
517            .put(hash, data.to_vec())
518            .await
519            .map_err(|e| HashTreeError::Store(e.to_string()))?;
520        Ok(hash)
521    }
522
523    /// Store a file, chunking if necessary
524    /// Returns root hash and total size
525    pub async fn put_file(&self, data: &[u8]) -> Result<PutFileResult, HashTreeError> {
526        let size = data.len() as u64;
527
528        // Small file - store as single blob
529        if data.len() <= self.chunk_size {
530            let hash = self.put_blob(data).await?;
531            return Ok(PutFileResult { hash, size });
532        }
533
534        // Large file - chunk it
535        let mut chunk_hashes: Vec<Hash> = Vec::new();
536        let mut offset = 0;
537
538        while offset < data.len() {
539            let end = (offset + self.chunk_size).min(data.len());
540            let chunk = &data[offset..end];
541            let hash = self.put_blob(chunk).await?;
542            chunk_hashes.push(hash);
543            offset = end;
544        }
545
546        // Build tree from chunks
547        let chunks: Vec<Link> = chunk_hashes
548            .iter()
549            .enumerate()
550            .map(|(i, &hash)| {
551                let chunk_size = if i < chunk_hashes.len() - 1 {
552                    self.chunk_size as u64
553                } else {
554                    (data.len() - i * self.chunk_size) as u64
555                };
556                Link {
557                    hash,
558                    name: None,
559                    size: chunk_size,
560                    key: None,
561                    link_type: LinkType::Blob, // Leaf chunk (raw blob)
562                    meta: None,
563                }
564            })
565            .collect();
566
567        let root_hash = self.build_tree(chunks, Some(size)).await?;
568        Ok(PutFileResult { hash: root_hash, size })
569    }
570
571    /// Build a directory from entries
572    /// Returns Cid with key if encrypted
573    pub async fn put_directory(
574        &self,
575        entries: Vec<DirEntry>,
576    ) -> Result<Cid, HashTreeError> {
577        // Sort entries by name for deterministic hashing
578        let mut sorted = entries;
579        sorted.sort_by(|a, b| a.name.cmp(&b.name));
580
581        let links: Vec<Link> = sorted
582            .into_iter()
583            .map(|e| Link {
584                hash: e.hash,
585                name: Some(e.name),
586                size: e.size,
587                key: e.key,
588                link_type: e.link_type,
589                meta: e.meta,
590            })
591            .collect();
592
593        let total_size: u64 = links.iter().map(|l| l.size).sum();
594
595        // Fits in one node
596        if links.len() <= self.max_links {
597            let node = TreeNode {
598                node_type: LinkType::Dir,
599                links,
600            };
601            let (data, _plain_hash) = encode_and_hash(&node)?;
602
603            // Encrypt directory data if encryption is enabled
604            let (hash, key) = self.put_chunk_internal(&data).await?;
605
606            return Ok(Cid { hash, key, size: total_size });
607        }
608
609        // Large directory - create sub-trees
610        self.build_directory_by_chunks(links, total_size).await
611    }
612
613    /// Build a balanced tree from links
614    async fn build_tree(&self, links: Vec<Link>, total_size: Option<u64>) -> Result<Hash, HashTreeError> {
615        // Single link with matching size - return it directly
616        if links.len() == 1 {
617            if let Some(ts) = total_size {
618                if links[0].size == ts {
619                    return Ok(links[0].hash);
620                }
621            }
622        }
623
624        // Fits in one node
625        if links.len() <= self.max_links {
626            let node = TreeNode {
627                node_type: LinkType::File,
628                links,
629            };
630            let (data, hash) = encode_and_hash(&node)?;
631            self.store
632                .put(hash, data)
633                .await
634                .map_err(|e| HashTreeError::Store(e.to_string()))?;
635            return Ok(hash);
636        }
637
638        // Need to split into sub-trees
639        let mut sub_trees: Vec<Link> = Vec::new();
640
641        for batch in links.chunks(self.max_links) {
642            let batch_size: u64 = batch.iter().map(|l| l.size).sum();
643
644            let node = TreeNode {
645                node_type: LinkType::File,
646                links: batch.to_vec(),
647            };
648            let (data, hash) = encode_and_hash(&node)?;
649            self.store
650                .put(hash, data)
651                .await
652                .map_err(|e| HashTreeError::Store(e.to_string()))?;
653
654            sub_trees.push(Link {
655                hash,
656                name: None,
657                size: batch_size,
658                key: None,
659                link_type: LinkType::File, // Internal tree node
660                meta: None,
661            });
662        }
663
664        // Recursively build parent level
665        Box::pin(self.build_tree(sub_trees, total_size)).await
666    }
667
668    /// Split directory into numeric chunks
669    /// Directory chunks are encrypted if encryption is enabled
670    async fn build_directory_by_chunks(
671        &self,
672        links: Vec<Link>,
673        total_size: u64,
674    ) -> Result<Cid, HashTreeError> {
675        let mut sub_trees: Vec<Link> = Vec::new();
676
677        for (i, batch) in links.chunks(self.max_links).enumerate() {
678            let batch_size: u64 = batch.iter().map(|l| l.size).sum();
679
680            let node = TreeNode {
681                node_type: LinkType::Dir,
682                links: batch.to_vec(),
683            };
684            let (data, _plain_hash) = encode_and_hash(&node)?;
685
686            // Encrypt directory chunk if encryption is enabled
687            let (hash, key) = self.put_chunk_internal(&data).await?;
688
689            sub_trees.push(Link {
690                hash,
691                name: Some(format!("_chunk_{}", i * self.max_links)),
692                size: batch_size,
693                key,
694                link_type: LinkType::Dir, // Internal chunk node
695                meta: None,
696            });
697        }
698
699        if sub_trees.len() <= self.max_links {
700            let node = TreeNode {
701                node_type: LinkType::Dir,
702                links: sub_trees,
703            };
704            let (data, _plain_hash) = encode_and_hash(&node)?;
705
706            // Encrypt root directory if encryption is enabled
707            let (hash, key) = self.put_chunk_internal(&data).await?;
708            return Ok(Cid { hash, key, size: total_size });
709        }
710
711        // Recursively build more levels
712        Box::pin(self.build_directory_by_chunks(sub_trees, total_size)).await
713    }
714
715    /// Create a tree node with custom links
716    pub async fn put_tree_node(
717        &self,
718        links: Vec<Link>,
719    ) -> Result<Hash, HashTreeError> {
720        let node = TreeNode {
721            node_type: LinkType::Dir,
722            links,
723        };
724
725        let (data, hash) = encode_and_hash(&node)?;
726        self.store
727            .put(hash, data)
728            .await
729            .map_err(|e| HashTreeError::Store(e.to_string()))?;
730        Ok(hash)
731    }
732
733    // ============ READ ============
734
735    /// Get raw data by hash
736    pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
737        self.store
738            .get(hash)
739            .await
740            .map_err(|e| HashTreeError::Store(e.to_string()))
741    }
742
743    /// Get and decode a tree node (unencrypted)
744    pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
745        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
746            Some(d) => d,
747            None => return Ok(None),
748        };
749
750        if !is_tree_node(&data) {
751            return Ok(None);
752        }
753
754        let node = decode_tree_node(&data)?;
755        Ok(Some(node))
756    }
757
758    /// Get and decode a tree node using Cid (with decryption if key present)
759    pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
760        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
761            Some(d) => d,
762            None => return Ok(None),
763        };
764
765        // Decrypt if key is present
766        let decrypted = if let Some(key) = &cid.key {
767            decrypt_chk(&data, key)
768                .map_err(|e| HashTreeError::Decryption(e.to_string()))?
769        } else {
770            data
771        };
772
773        if !is_tree_node(&decrypted) {
774            return Ok(None);
775        }
776
777        let node = decode_tree_node(&decrypted)?;
778        Ok(Some(node))
779    }
780
781    /// Check if hash points to a tree node (no decryption)
782    pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
783        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
784            Some(d) => d,
785            None => return Ok(false),
786        };
787        Ok(is_tree_node(&data))
788    }
789
790    /// Check if Cid points to a directory (with decryption)
791    pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
792        let node = match self.get_node(cid).await? {
793            Some(n) => n,
794            None => return Ok(false),
795        };
796        // Directory has named links (not just internal chunks)
797        Ok(node.links.iter().any(|l| l.name.as_ref().map(|n| !n.starts_with('_')).unwrap_or(false)))
798    }
799
800    /// Check if hash points to a directory (tree with named links, no decryption)
801    pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
802        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
803            Some(d) => d,
804            None => return Ok(false),
805        };
806        Ok(is_directory_node(&data))
807    }
808
809    /// Read a complete file (reassemble chunks if needed)
810    pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
811        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
812            Some(d) => d,
813            None => return Ok(None),
814        };
815
816        // Check if it's a tree (chunked file) or raw blob
817        if !is_tree_node(&data) {
818            return Ok(Some(data));
819        }
820
821        // It's a tree - reassemble chunks
822        let node = decode_tree_node(&data)?;
823        let assembled = self.assemble_chunks(&node).await?;
824        Ok(Some(assembled))
825    }
826
827    /// Recursively assemble chunks from tree
828    async fn assemble_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
829        let mut parts: Vec<Vec<u8>> = Vec::new();
830
831        for link in &node.links {
832            let child_data = self
833                .store
834                .get(&link.hash)
835                .await
836                .map_err(|e| HashTreeError::Store(e.to_string()))?
837                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
838
839            if is_tree_node(&child_data) {
840                let child_node = decode_tree_node(&child_data)?;
841                parts.push(Box::pin(self.assemble_chunks(&child_node)).await?);
842            } else {
843                parts.push(child_data);
844            }
845        }
846
847        // Concatenate all parts
848        let total_length: usize = parts.iter().map(|p| p.len()).sum();
849        let mut result = Vec::with_capacity(total_length);
850        for part in parts {
851            result.extend_from_slice(&part);
852        }
853
854        Ok(result)
855    }
856
857    /// Read a file as stream of chunks
858    /// Returns an async stream that yields chunks as they are read
859    pub fn read_file_stream(
860        &self,
861        hash: Hash,
862    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
863        Box::pin(stream::unfold(
864            ReadStreamState::Init { hash, tree: self },
865            |state| async move {
866                match state {
867                    ReadStreamState::Init { hash, tree } => {
868                        let data = match tree.store.get(&hash).await {
869                            Ok(Some(d)) => d,
870                            Ok(None) => return None,
871                            Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), ReadStreamState::Done)),
872                        };
873
874                        if !is_tree_node(&data) {
875                            // Single blob - yield it and finish
876                            return Some((Ok(data), ReadStreamState::Done));
877                        }
878
879                        // Tree node - start streaming chunks
880                        let node = match decode_tree_node(&data) {
881                            Ok(n) => n,
882                            Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
883                        };
884
885                        // Create stack with all links to process
886                        let mut stack: Vec<StreamStackItem> = Vec::new();
887                        for link in node.links.into_iter().rev() {
888                            stack.push(StreamStackItem::Hash(link.hash));
889                        }
890
891                        // Process first item
892                        tree.process_stream_stack(&mut stack).await
893                    }
894                    ReadStreamState::Processing { mut stack, tree } => {
895                        tree.process_stream_stack(&mut stack).await
896                    }
897                    ReadStreamState::Done => None,
898                }
899            },
900        ))
901    }
902
903    async fn process_stream_stack<'a>(
904        &'a self,
905        stack: &mut Vec<StreamStackItem>,
906    ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
907        while let Some(item) = stack.pop() {
908            match item {
909                StreamStackItem::Hash(hash) => {
910                    let data = match self.store.get(&hash).await {
911                        Ok(Some(d)) => d,
912                        Ok(None) => {
913                            return Some((
914                                Err(HashTreeError::MissingChunk(to_hex(&hash))),
915                                ReadStreamState::Done,
916                            ))
917                        }
918                        Err(e) => {
919                            return Some((
920                                Err(HashTreeError::Store(e.to_string())),
921                                ReadStreamState::Done,
922                            ))
923                        }
924                    };
925
926                    if is_tree_node(&data) {
927                        // Nested tree - push its children to stack
928                        let node = match decode_tree_node(&data) {
929                            Ok(n) => n,
930                            Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
931                        };
932                        for link in node.links.into_iter().rev() {
933                            stack.push(StreamStackItem::Hash(link.hash));
934                        }
935                    } else {
936                        // Leaf blob - yield it
937                        return Some((Ok(data), ReadStreamState::Processing { stack: std::mem::take(stack), tree: self }));
938                    }
939                }
940            }
941        }
942        None
943    }
944
945    /// Read file chunks as Vec (non-streaming version)
946    pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
947        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
948            Some(d) => d,
949            None => return Ok(vec![]),
950        };
951
952        if !is_tree_node(&data) {
953            return Ok(vec![data]);
954        }
955
956        let node = decode_tree_node(&data)?;
957        self.collect_chunks(&node).await
958    }
959
960    async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
961        let mut chunks = Vec::new();
962
963        for link in &node.links {
964            let child_data = self
965                .store
966                .get(&link.hash)
967                .await
968                .map_err(|e| HashTreeError::Store(e.to_string()))?
969                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
970
971            if is_tree_node(&child_data) {
972                let child_node = decode_tree_node(&child_data)?;
973                chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
974            } else {
975                chunks.push(child_data);
976            }
977        }
978
979        Ok(chunks)
980    }
981
982    /// List directory entries (Cid-based, supports encrypted directories)
983    pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
984        let node = match self.get_node(cid).await? {
985            Some(n) => n,
986            None => return Ok(vec![]),
987        };
988
989        let mut entries = Vec::new();
990
991        for link in &node.links {
992            // Skip internal chunk nodes - recurse into them
993            if let Some(ref name) = link.name {
994                if name.starts_with("_chunk_") || name.starts_with('_') {
995                    let chunk_cid = Cid { hash: link.hash, key: link.key, size: link.size };
996                    let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
997                    entries.extend(sub_entries);
998                    continue;
999                }
1000            }
1001
1002            entries.push(TreeEntry {
1003                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1004                hash: link.hash,
1005                size: link.size,
1006                link_type: link.link_type,
1007                key: link.key,
1008                meta: link.meta.clone(),
1009            });
1010        }
1011
1012        Ok(entries)
1013    }
1014
1015    /// List directory entries using Cid (with decryption if key present)
1016    pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1017        let node = match self.get_node(cid).await? {
1018            Some(n) => n,
1019            None => return Ok(vec![]),
1020        };
1021
1022        let mut entries = Vec::new();
1023
1024        for link in &node.links {
1025            // Skip internal chunk nodes
1026            if let Some(ref name) = link.name {
1027                if name.starts_with("_chunk_") || name.starts_with('_') {
1028                    // Internal nodes inherit parent's key for decryption
1029                    let sub_cid = Cid { hash: link.hash, key: cid.key, size: 0 };
1030                    let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
1031                    entries.extend(sub_entries);
1032                    continue;
1033                }
1034            }
1035
1036            entries.push(TreeEntry {
1037                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1038                hash: link.hash,
1039                size: link.size,
1040                link_type: link.link_type,
1041                key: link.key,
1042                meta: link.meta.clone(),
1043            });
1044        }
1045
1046        Ok(entries)
1047    }
1048
1049    /// Resolve a path within a tree (returns Cid with key if encrypted)
1050    pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1051        let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
1052        if parts.is_empty() {
1053            return Ok(Some(cid.clone()));
1054        }
1055
1056        let mut current_cid = cid.clone();
1057
1058        for part in parts {
1059            // Use get_node which handles decryption when key is present
1060            let node = match self.get_node(&current_cid).await? {
1061                Some(n) => n,
1062                None => return Ok(None),
1063            };
1064
1065            if let Some(link) = self.find_link(&node, part) {
1066                current_cid = Cid {
1067                    hash: link.hash,
1068                    key: link.key,
1069                    size: link.size,
1070                };
1071            } else {
1072                // Check internal nodes
1073                match self.find_link_in_subtrees_cid(&node, part, &current_cid).await? {
1074                    Some(link) => {
1075                        current_cid = Cid {
1076                            hash: link.hash,
1077                            key: link.key,
1078                            size: link.size,
1079                        };
1080                    }
1081                    None => return Ok(None),
1082                }
1083            }
1084        }
1085
1086        Ok(Some(current_cid))
1087    }
1088
1089    /// Resolve a path within a tree using Cid (with decryption if key present)
1090    pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1091        self.resolve(cid, path).await
1092    }
1093
1094    fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1095        node.links
1096            .iter()
1097            .find(|l| l.name.as_deref() == Some(name))
1098            .cloned()
1099    }
1100
1101    /// Find a link in subtrees using Cid (with decryption support)
1102    async fn find_link_in_subtrees_cid(&self, node: &TreeNode, name: &str, _parent_cid: &Cid) -> Result<Option<Link>, HashTreeError> {
1103        for link in &node.links {
1104            if !link.name.as_ref().map(|n| n.starts_with('_')).unwrap_or(false) {
1105                continue;
1106            }
1107
1108            // Internal nodes inherit encryption from parent context
1109            let sub_cid = Cid {
1110                hash: link.hash,
1111                key: link.key.clone(),
1112                size: link.size,
1113            };
1114
1115            let sub_node = match self.get_node(&sub_cid).await? {
1116                Some(n) => n,
1117                None => continue,
1118            };
1119
1120            if let Some(found) = self.find_link(&sub_node, name) {
1121                return Ok(Some(found));
1122            }
1123
1124            if let Some(deep_found) = Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await? {
1125                return Ok(Some(deep_found));
1126            }
1127        }
1128
1129        Ok(None)
1130    }
1131
1132    /// Get total size of a tree
1133    pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1134        let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1135            Some(d) => d,
1136            None => return Ok(0),
1137        };
1138
1139        if !is_tree_node(&data) {
1140            return Ok(data.len() as u64);
1141        }
1142
1143        let node = decode_tree_node(&data)?;
1144        // Calculate from children
1145        let mut total = 0u64;
1146        for link in &node.links {
1147            total += link.size;
1148        }
1149        Ok(total)
1150    }
1151
1152    /// Walk entire tree depth-first (returns Vec)
1153    pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
1154        let mut entries = Vec::new();
1155        self.walk_recursive(cid, path, &mut entries).await?;
1156        Ok(entries)
1157    }
1158
1159    async fn walk_recursive(
1160        &self,
1161        cid: &Cid,
1162        path: &str,
1163        entries: &mut Vec<WalkEntry>,
1164    ) -> Result<(), HashTreeError> {
1165        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1166            Some(d) => d,
1167            None => return Ok(()),
1168        };
1169
1170        // Decrypt if key is present
1171        let data = if let Some(key) = &cid.key {
1172            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1173        } else {
1174            data
1175        };
1176
1177        let node = match try_decode_tree_node(&data) {
1178            Some(n) => n,
1179            None => {
1180                entries.push(WalkEntry {
1181                    path: path.to_string(),
1182                    hash: cid.hash,
1183                    link_type: LinkType::Blob,
1184                    size: data.len() as u64,
1185                    key: cid.key,
1186                });
1187                return Ok(());
1188            }
1189        };
1190
1191        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1192        entries.push(WalkEntry {
1193            path: path.to_string(),
1194            hash: cid.hash,
1195            link_type: node.node_type,
1196            size: node_size,
1197            key: cid.key,
1198        });
1199
1200        for link in &node.links {
1201            let child_path = match &link.name {
1202                Some(name) => {
1203                    if name.starts_with("_chunk_") || name.starts_with('_') {
1204                        // Internal nodes inherit parent's key
1205                        let sub_cid = Cid { hash: link.hash, key: cid.key, size: 0 };
1206                        Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
1207                        continue;
1208                    }
1209                    if path.is_empty() {
1210                        name.clone()
1211                    } else {
1212                        format!("{}/{}", path, name)
1213                    }
1214                }
1215                None => path.to_string(),
1216            };
1217
1218            // Child nodes use their own key from link
1219            let child_cid = Cid { hash: link.hash, key: link.key, size: link.size };
1220            Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
1221        }
1222
1223        Ok(())
1224    }
1225
1226    /// Walk entire tree with parallel fetching
1227    /// Uses a work-stealing approach: always keeps `concurrency` requests in flight
1228    pub async fn walk_parallel(&self, cid: &Cid, path: &str, concurrency: usize) -> Result<Vec<WalkEntry>, HashTreeError> {
1229        self.walk_parallel_with_progress(cid, path, concurrency, None).await
1230    }
1231
1232    /// Walk entire tree with parallel fetching and optional progress counter
1233    /// The counter is incremented for each node fetched (not just entries found)
1234    ///
1235    /// OPTIMIZATION: Blobs are NOT fetched - their metadata (hash, size, link_type)
1236    /// comes from the parent node's link, so we just add them directly to entries.
1237    /// This avoids downloading file contents during tree traversal.
1238    pub async fn walk_parallel_with_progress(
1239        &self,
1240        cid: &Cid,
1241        path: &str,
1242        concurrency: usize,
1243        progress: Option<&std::sync::atomic::AtomicUsize>,
1244    ) -> Result<Vec<WalkEntry>, HashTreeError> {
1245        use futures::stream::{FuturesUnordered, StreamExt};
1246        use std::collections::VecDeque;
1247        use std::sync::atomic::Ordering;
1248
1249        let mut entries = Vec::new();
1250        let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
1251        let mut active = FuturesUnordered::new();
1252
1253        // Seed with root
1254        pending.push_back((cid.clone(), path.to_string()));
1255
1256        loop {
1257            // Fill up to concurrency limit from pending queue
1258            while active.len() < concurrency {
1259                if let Some((node_cid, node_path)) = pending.pop_front() {
1260                    let store = &self.store;
1261                    let fut = async move {
1262                        let data = store.get(&node_cid.hash).await
1263                            .map_err(|e| HashTreeError::Store(e.to_string()))?;
1264                        Ok::<_, HashTreeError>((node_cid, node_path, data))
1265                    };
1266                    active.push(fut);
1267                } else {
1268                    break;
1269                }
1270            }
1271
1272            // If nothing active, we're done
1273            if active.is_empty() {
1274                break;
1275            }
1276
1277            // Wait for any future to complete
1278            if let Some(result) = active.next().await {
1279                let (node_cid, node_path, data) = result?;
1280
1281                // Update progress counter
1282                if let Some(counter) = progress {
1283                    counter.fetch_add(1, Ordering::Relaxed);
1284                }
1285
1286                let data = match data {
1287                    Some(d) => d,
1288                    None => continue,
1289                };
1290
1291                // Decrypt if key is present
1292                let data = if let Some(key) = &node_cid.key {
1293                    decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1294                } else {
1295                    data
1296                };
1297
1298                let node = match try_decode_tree_node(&data) {
1299                    Some(n) => n,
1300                    None => {
1301                        // It's a blob/file - this case only happens for root
1302                        entries.push(WalkEntry {
1303                            path: node_path,
1304                            hash: node_cid.hash,
1305                            link_type: LinkType::Blob,
1306                            size: data.len() as u64,
1307                            key: node_cid.key,
1308                        });
1309                        continue;
1310                    }
1311                };
1312
1313                // It's a directory/file node
1314                let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1315                entries.push(WalkEntry {
1316                    path: node_path.clone(),
1317                    hash: node_cid.hash,
1318                    link_type: node.node_type,
1319                    size: node_size,
1320                    key: node_cid.key,
1321                });
1322
1323                // Queue children - but DON'T fetch blobs, just add them directly
1324                for link in &node.links {
1325                    let child_path = match &link.name {
1326                        Some(name) => {
1327                            if name.starts_with("_chunk_") || name.starts_with('_') {
1328                                // Internal chunked nodes - inherit parent's key, same path
1329                                let sub_cid = Cid { hash: link.hash, key: node_cid.key, size: 0 };
1330                                pending.push_back((sub_cid, node_path.clone()));
1331                                continue;
1332                            }
1333                            if node_path.is_empty() {
1334                                name.clone()
1335                            } else {
1336                                format!("{}/{}", node_path, name)
1337                            }
1338                        }
1339                        None => node_path.clone(),
1340                    };
1341
1342                    // OPTIMIZATION: If it's a blob, add entry directly without fetching
1343                    // The link already contains all the metadata we need
1344                    if link.link_type == LinkType::Blob {
1345                        entries.push(WalkEntry {
1346                            path: child_path,
1347                            hash: link.hash,
1348                            link_type: LinkType::Blob,
1349                            size: link.size,
1350                            key: link.key,
1351                        });
1352                        if let Some(counter) = progress {
1353                            counter.fetch_add(1, Ordering::Relaxed);
1354                        }
1355                        continue;
1356                    }
1357
1358                    // For tree nodes (File/Dir), we need to fetch to see their children
1359                    let child_cid = Cid { hash: link.hash, key: link.key, size: link.size };
1360                    pending.push_back((child_cid, child_path));
1361                }
1362            }
1363        }
1364
1365        Ok(entries)
1366    }
1367
1368    /// Walk tree as stream
1369    pub fn walk_stream(
1370        &self,
1371        cid: Cid,
1372        initial_path: String,
1373    ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
1374        Box::pin(stream::unfold(
1375            WalkStreamState::Init { cid, path: initial_path, tree: self },
1376            |state| async move {
1377                match state {
1378                    WalkStreamState::Init { cid, path, tree } => {
1379                        let data = match tree.store.get(&cid.hash).await {
1380                            Ok(Some(d)) => d,
1381                            Ok(None) => return None,
1382                            Err(e) => {
1383                                return Some((
1384                                    Err(HashTreeError::Store(e.to_string())),
1385                                    WalkStreamState::Done,
1386                                ))
1387                            }
1388                        };
1389
1390                        // Decrypt if key is present
1391                        let data = if let Some(key) = &cid.key {
1392                            match decrypt_chk(&data, key) {
1393                                Ok(d) => d,
1394                                Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), WalkStreamState::Done)),
1395                            }
1396                        } else {
1397                            data
1398                        };
1399
1400                        let node = match try_decode_tree_node(&data) {
1401                            Some(n) => n,
1402                            None => {
1403                                // Blob data
1404                                let entry = WalkEntry {
1405                                    path,
1406                                    hash: cid.hash,
1407                                    link_type: LinkType::Blob,
1408                                    size: data.len() as u64,
1409                                    key: cid.key,
1410                                };
1411                                return Some((Ok(entry), WalkStreamState::Done));
1412                            }
1413                        };
1414
1415                        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1416                        let entry = WalkEntry {
1417                            path: path.clone(),
1418                            hash: cid.hash,
1419                            link_type: node.node_type,
1420                            size: node_size,
1421                            key: cid.key,
1422                        };
1423
1424                        // Create stack with children to process
1425                        let mut stack: Vec<WalkStackItem> = Vec::new();
1426                        for link in node.links.into_iter().rev() {
1427                            let child_path = match &link.name {
1428                                Some(name) if !name.starts_with('_') => {
1429                                    if path.is_empty() {
1430                                        name.clone()
1431                                    } else {
1432                                        format!("{}/{}", path, name)
1433                                    }
1434                                }
1435                                _ => path.clone(),
1436                            };
1437                            // Child nodes use their own key from link
1438                            stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
1439                        }
1440
1441                        Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
1442                    }
1443                    WalkStreamState::Processing { mut stack, tree } => {
1444                        tree.process_walk_stack(&mut stack).await
1445                    }
1446                    WalkStreamState::Done => None,
1447                }
1448            },
1449        ))
1450    }
1451
1452    async fn process_walk_stack<'a>(
1453        &'a self,
1454        stack: &mut Vec<WalkStackItem>,
1455    ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
1456        while let Some(item) = stack.pop() {
1457            let data = match self.store.get(&item.hash).await {
1458                Ok(Some(d)) => d,
1459                Ok(None) => continue,
1460                Err(e) => {
1461                    return Some((
1462                        Err(HashTreeError::Store(e.to_string())),
1463                        WalkStreamState::Done,
1464                    ))
1465                }
1466            };
1467
1468            let node = match try_decode_tree_node(&data) {
1469                Some(n) => n,
1470                None => {
1471                    // Blob data
1472                    let entry = WalkEntry {
1473                        path: item.path,
1474                        hash: item.hash,
1475                        link_type: LinkType::Blob,
1476                        size: data.len() as u64,
1477                        key: item.key,
1478                    };
1479                    return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1480                }
1481            };
1482
1483            let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1484            let entry = WalkEntry {
1485                path: item.path.clone(),
1486                hash: item.hash,
1487                link_type: node.node_type,
1488                size: node_size,
1489                key: None, // directories are not encrypted
1490            };
1491
1492            // Push children to stack
1493            for link in node.links.into_iter().rev() {
1494                let child_path = match &link.name {
1495                    Some(name) if !name.starts_with('_') => {
1496                        if item.path.is_empty() {
1497                            name.clone()
1498                        } else {
1499                            format!("{}/{}", item.path, name)
1500                        }
1501                    }
1502                    _ => item.path.clone(),
1503                };
1504                stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
1505            }
1506
1507            return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1508        }
1509        None
1510    }
1511
1512    // ============ EDIT ============
1513
1514    /// Add or update an entry in a directory
1515    /// Returns new root Cid (immutable operation)
1516    pub async fn set_entry(
1517        &self,
1518        root: &Cid,
1519        path: &[&str],
1520        name: &str,
1521        entry_cid: &Cid,
1522        link_type: LinkType,
1523    ) -> Result<Cid, HashTreeError> {
1524        let dir_cid = self.resolve_path_array(root, path).await?;
1525        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1526
1527        let entries = self.list_directory(&dir_cid).await?;
1528        let mut new_entries: Vec<DirEntry> = entries
1529            .into_iter()
1530            .filter(|e| e.name != name)
1531            .map(|e| DirEntry {
1532                name: e.name,
1533                hash: e.hash,
1534                size: e.size,
1535                key: e.key,
1536                link_type: e.link_type,
1537                meta: e.meta,
1538            })
1539            .collect();
1540
1541        new_entries.push(DirEntry {
1542            name: name.to_string(),
1543            hash: entry_cid.hash,
1544            size: entry_cid.size,
1545            key: entry_cid.key,
1546            link_type,
1547            meta: None,
1548        });
1549
1550        let new_dir_cid = self.put_directory(new_entries).await?;
1551        self.rebuild_path(root, path, new_dir_cid).await
1552    }
1553
1554    /// Remove an entry from a directory
1555    /// Returns new root Cid
1556    pub async fn remove_entry(
1557        &self,
1558        root: &Cid,
1559        path: &[&str],
1560        name: &str,
1561    ) -> Result<Cid, HashTreeError> {
1562        let dir_cid = self.resolve_path_array(root, path).await?;
1563        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1564
1565        let entries = self.list_directory(&dir_cid).await?;
1566        let new_entries: Vec<DirEntry> = entries
1567            .into_iter()
1568            .filter(|e| e.name != name)
1569            .map(|e| DirEntry {
1570                name: e.name,
1571                hash: e.hash,
1572                size: e.size,
1573                key: e.key,
1574                link_type: e.link_type,
1575                meta: e.meta,
1576            })
1577            .collect();
1578
1579        let new_dir_cid = self.put_directory(new_entries).await?;
1580        self.rebuild_path(root, path, new_dir_cid).await
1581    }
1582
1583    /// Rename an entry in a directory
1584    /// Returns new root Cid
1585    pub async fn rename_entry(
1586        &self,
1587        root: &Cid,
1588        path: &[&str],
1589        old_name: &str,
1590        new_name: &str,
1591    ) -> Result<Cid, HashTreeError> {
1592        if old_name == new_name {
1593            return Ok(root.clone());
1594        }
1595
1596        let dir_cid = self.resolve_path_array(root, path).await?;
1597        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1598
1599        let entries = self.list_directory(&dir_cid).await?;
1600        let entry = entries
1601            .iter()
1602            .find(|e| e.name == old_name)
1603            .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1604
1605        let entry_hash = entry.hash;
1606        let entry_size = entry.size;
1607        let entry_key = entry.key;
1608        let entry_link_type = entry.link_type;
1609        let entry_meta = entry.meta.clone();
1610
1611        let new_entries: Vec<DirEntry> = entries
1612            .into_iter()
1613            .filter(|e| e.name != old_name)
1614            .map(|e| DirEntry {
1615                name: e.name,
1616                hash: e.hash,
1617                size: e.size,
1618                key: e.key,
1619                link_type: e.link_type,
1620                meta: e.meta,
1621            })
1622            .chain(std::iter::once(DirEntry {
1623                name: new_name.to_string(),
1624                hash: entry_hash,
1625                size: entry_size,
1626                key: entry_key,
1627                link_type: entry_link_type,
1628                meta: entry_meta,
1629            }))
1630            .collect();
1631
1632        let new_dir_cid = self.put_directory(new_entries).await?;
1633        self.rebuild_path(root, path, new_dir_cid).await
1634    }
1635
1636    /// Move an entry to a different directory
1637    /// Returns new root Cid
1638    pub async fn move_entry(
1639        &self,
1640        root: &Cid,
1641        source_path: &[&str],
1642        name: &str,
1643        target_path: &[&str],
1644    ) -> Result<Cid, HashTreeError> {
1645        let source_dir_cid = self.resolve_path_array(root, source_path).await?;
1646        let source_dir_cid = source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
1647
1648        let source_entries = self.list_directory(&source_dir_cid).await?;
1649        let entry = source_entries
1650            .iter()
1651            .find(|e| e.name == name)
1652            .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
1653
1654        let entry_cid = Cid {
1655            hash: entry.hash,
1656            key: entry.key,
1657            size: entry.size,
1658        };
1659        let entry_link_type = entry.link_type;
1660
1661        // Remove from source
1662        let new_root = self.remove_entry(root, source_path, name).await?;
1663
1664        // Add to target
1665        self.set_entry(&new_root, target_path, name, &entry_cid, entry_link_type).await
1666    }
1667
1668    async fn resolve_path_array(&self, root: &Cid, path: &[&str]) -> Result<Option<Cid>, HashTreeError> {
1669        if path.is_empty() {
1670            return Ok(Some(root.clone()));
1671        }
1672        self.resolve_path(root, &path.join("/")).await
1673    }
1674
1675    async fn rebuild_path(
1676        &self,
1677        root: &Cid,
1678        path: &[&str],
1679        new_child: Cid,
1680    ) -> Result<Cid, HashTreeError> {
1681        if path.is_empty() {
1682            return Ok(new_child);
1683        }
1684
1685        let mut child_cid = new_child;
1686        let parts: Vec<&str> = path.to_vec();
1687
1688        for i in (0..parts.len()).rev() {
1689            let child_name = parts[i];
1690            let parent_path = &parts[..i];
1691
1692            let parent_cid = if parent_path.is_empty() {
1693                root.clone()
1694            } else {
1695                self.resolve_path_array(root, parent_path)
1696                    .await?
1697                    .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
1698            };
1699
1700            let parent_entries = self.list_directory(&parent_cid).await?;
1701            let new_parent_entries: Vec<DirEntry> = parent_entries
1702                .into_iter()
1703                .map(|e| {
1704                    if e.name == child_name {
1705                        DirEntry {
1706                            name: e.name,
1707                            hash: child_cid.hash,
1708                            size: child_cid.size,
1709                            key: child_cid.key,
1710                            link_type: e.link_type,
1711                            meta: e.meta,
1712                        }
1713                    } else {
1714                        DirEntry {
1715                            name: e.name,
1716                            hash: e.hash,
1717                            size: e.size,
1718                            key: e.key,
1719                            link_type: e.link_type,
1720                            meta: e.meta,
1721                        }
1722                    }
1723                })
1724                .collect();
1725
1726            child_cid = self.put_directory(new_parent_entries).await?;
1727        }
1728
1729        Ok(child_cid)
1730    }
1731
1732    // ============ UTILITY ============
1733
1734    /// Get the underlying store
1735    pub fn get_store(&self) -> Arc<S> {
1736        self.store.clone()
1737    }
1738
1739    /// Get chunk size configuration
1740    pub fn chunk_size(&self) -> usize {
1741        self.chunk_size
1742    }
1743
1744    /// Get max links configuration
1745    pub fn max_links(&self) -> usize {
1746        self.max_links
1747    }
1748}
1749
1750// Internal state types for streaming
1751
1752enum StreamStackItem {
1753    Hash(Hash),
1754}
1755
1756enum ReadStreamState<'a, S: Store> {
1757    Init { hash: Hash, tree: &'a HashTree<S> },
1758    Processing { stack: Vec<StreamStackItem>, tree: &'a HashTree<S> },
1759    Done,
1760}
1761
1762struct WalkStackItem {
1763    hash: Hash,
1764    path: String,
1765    key: Option<[u8; 32]>,
1766}
1767
1768enum WalkStreamState<'a, S: Store> {
1769    Init { cid: Cid, path: String, tree: &'a HashTree<S> },
1770    Processing { stack: Vec<WalkStackItem>, tree: &'a HashTree<S> },
1771    Done,
1772}
1773
1774// Encrypted stream state types
1775struct EncryptedStackItem {
1776    hash: Hash,
1777    key: Option<[u8; 32]>,
1778}
1779
1780enum EncryptedStreamState<'a, S: Store> {
1781    Init { hash: Hash, key: [u8; 32], tree: &'a HashTree<S> },
1782    Processing { stack: Vec<EncryptedStackItem>, tree: &'a HashTree<S> },
1783    Done,
1784}
1785
1786/// Verify tree integrity - checks that all referenced hashes exist
1787pub async fn verify_tree<S: Store>(
1788    store: Arc<S>,
1789    root_hash: &Hash,
1790) -> Result<crate::reader::VerifyResult, HashTreeError> {
1791    let mut missing = Vec::new();
1792    let mut visited = std::collections::HashSet::new();
1793
1794    verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
1795
1796    Ok(crate::reader::VerifyResult {
1797        valid: missing.is_empty(),
1798        missing,
1799    })
1800}
1801
1802async fn verify_recursive<S: Store>(
1803    store: Arc<S>,
1804    hash: &Hash,
1805    missing: &mut Vec<Hash>,
1806    visited: &mut std::collections::HashSet<String>,
1807) -> Result<(), HashTreeError> {
1808    let hex = to_hex(hash);
1809    if visited.contains(&hex) {
1810        return Ok(());
1811    }
1812    visited.insert(hex);
1813
1814    let data = match store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1815        Some(d) => d,
1816        None => {
1817            missing.push(*hash);
1818            return Ok(());
1819        }
1820    };
1821
1822    if is_tree_node(&data) {
1823        let node = decode_tree_node(&data)?;
1824        for link in &node.links {
1825            Box::pin(verify_recursive(store.clone(), &link.hash, missing, visited)).await?;
1826        }
1827    }
1828
1829    Ok(())
1830}
1831
1832#[cfg(test)]
1833mod tests {
1834    use super::*;
1835    use crate::store::MemoryStore;
1836
1837    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
1838        let store = Arc::new(MemoryStore::new());
1839        // Use public (unencrypted) mode for these tests
1840        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
1841        (store, tree)
1842    }
1843
1844    #[tokio::test]
1845    async fn test_put_and_read_blob() {
1846        let (_store, tree) = make_tree();
1847
1848        let data = vec![1, 2, 3, 4, 5];
1849        let hash = tree.put_blob(&data).await.unwrap();
1850
1851        let result = tree.get_blob(&hash).await.unwrap();
1852        assert_eq!(result, Some(data));
1853    }
1854
1855    #[tokio::test]
1856    async fn test_put_and_read_file_small() {
1857        let (_store, tree) = make_tree();
1858
1859        let data = b"Hello, World!";
1860        let result = tree.put_file(data).await.unwrap();
1861
1862        assert_eq!(result.size, data.len() as u64);
1863
1864        let read_data = tree.read_file(&result.hash).await.unwrap();
1865        assert_eq!(read_data, Some(data.to_vec()));
1866    }
1867
1868    #[tokio::test]
1869    async fn test_put_and_read_directory() {
1870        let (_store, tree) = make_tree();
1871
1872        let file1 = tree.put_blob(b"content1").await.unwrap();
1873        let file2 = tree.put_blob(b"content2").await.unwrap();
1874
1875        let dir_cid = tree
1876            .put_directory(
1877                vec![
1878                    DirEntry::new("a.txt", file1).with_size(8),
1879                    DirEntry::new("b.txt", file2).with_size(8),
1880                ],
1881            )
1882            .await
1883            .unwrap();
1884
1885        let entries = tree.list_directory(&dir_cid).await.unwrap();
1886        assert_eq!(entries.len(), 2);
1887        let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
1888        assert!(names.contains(&"a.txt"));
1889        assert!(names.contains(&"b.txt"));
1890    }
1891
1892    #[tokio::test]
1893    async fn test_is_directory() {
1894        let (_store, tree) = make_tree();
1895
1896        let file_hash = tree.put_blob(b"data").await.unwrap();
1897        let dir_cid = tree.put_directory(vec![]).await.unwrap();
1898
1899        assert!(!tree.is_directory(&file_hash).await.unwrap());
1900        assert!(tree.is_directory(&dir_cid.hash).await.unwrap());
1901    }
1902
1903    #[tokio::test]
1904    async fn test_resolve_path() {
1905        let (_store, tree) = make_tree();
1906
1907        let file_hash = tree.put_blob(b"nested").await.unwrap();
1908        let sub_dir = tree.put_directory(
1909            vec![DirEntry::new("file.txt", file_hash).with_size(6)],
1910        ).await.unwrap();
1911        let root_dir = tree.put_directory(
1912            vec![DirEntry::new("subdir", sub_dir.hash)],
1913        ).await.unwrap();
1914
1915        let resolved = tree.resolve_path(&root_dir, "subdir/file.txt").await.unwrap();
1916        assert_eq!(resolved.map(|c| c.hash), Some(file_hash));
1917    }
1918
1919    // ============ UNIFIED API TESTS ============
1920
1921    #[tokio::test]
1922    async fn test_unified_put_get_public() {
1923        let store = Arc::new(MemoryStore::new());
1924        // Use .public() to disable encryption
1925        let tree = HashTree::new(HashTreeConfig::new(store).public());
1926
1927        let data = b"Hello, public world!";
1928        let cid = tree.put(data).await.unwrap();
1929
1930        assert_eq!(cid.size, data.len() as u64);
1931        assert!(cid.key.is_none()); // No key for public content
1932
1933        let retrieved = tree.get(&cid).await.unwrap().unwrap();
1934        assert_eq!(retrieved, data);
1935    }
1936
1937    #[tokio::test]
1938    async fn test_unified_put_get_encrypted() {
1939        let store = Arc::new(MemoryStore::new());
1940        // Default config has encryption enabled
1941        let tree = HashTree::new(HashTreeConfig::new(store));
1942
1943        let data = b"Hello, encrypted world!";
1944        let cid = tree.put(data).await.unwrap();
1945
1946        assert_eq!(cid.size, data.len() as u64);
1947        assert!(cid.key.is_some()); // Has encryption key
1948
1949        let retrieved = tree.get(&cid).await.unwrap().unwrap();
1950        assert_eq!(retrieved, data);
1951    }
1952
1953    #[tokio::test]
1954    async fn test_unified_put_get_encrypted_chunked() {
1955        let store = Arc::new(MemoryStore::new());
1956        let tree = HashTree::new(HashTreeConfig::new(store).with_chunk_size(100));
1957
1958        // Data larger than chunk size
1959        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
1960        let cid = tree.put(&data).await.unwrap();
1961
1962        assert_eq!(cid.size, data.len() as u64);
1963        assert!(cid.key.is_some());
1964
1965        let retrieved = tree.get(&cid).await.unwrap().unwrap();
1966        assert_eq!(retrieved, data);
1967    }
1968
1969    #[tokio::test]
1970    async fn test_cid_deterministic() {
1971        let store = Arc::new(MemoryStore::new());
1972        let tree = HashTree::new(HashTreeConfig::new(store));
1973
1974        let data = b"Same content produces same CID";
1975
1976        let cid1 = tree.put(data).await.unwrap();
1977        let cid2 = tree.put(data).await.unwrap();
1978
1979        // CHK: same content = same hash AND same key
1980        assert_eq!(cid1.hash, cid2.hash);
1981        assert_eq!(cid1.key, cid2.key);
1982        assert_eq!(cid1.to_string(), cid2.to_string());
1983    }
1984
1985    #[tokio::test]
1986    async fn test_cid_to_string_public() {
1987        let store = Arc::new(MemoryStore::new());
1988        let tree = HashTree::new(HashTreeConfig::new(store).public());
1989
1990        let cid = tree.put(b"test").await.unwrap();
1991        let s = cid.to_string();
1992
1993        // Public CID is just the hash (64 hex chars)
1994        assert_eq!(s.len(), 64);
1995        assert!(!s.contains(':'));
1996    }
1997
1998    #[tokio::test]
1999    async fn test_cid_to_string_encrypted() {
2000        let store = Arc::new(MemoryStore::new());
2001        let tree = HashTree::new(HashTreeConfig::new(store));
2002
2003        let cid = tree.put(b"test").await.unwrap();
2004        let s = cid.to_string();
2005
2006        // Encrypted CID is "hash:key" (64 + 1 + 64 = 129 chars)
2007        assert_eq!(s.len(), 129);
2008        assert!(s.contains(':'));
2009    }
2010}