Skip to main content

hashtree_core/
hashtree.rs

1//! HashTree - Unified merkle tree operations
2//!
3//! Single struct for creating, reading, and editing content-addressed merkle trees.
4//! Mirrors the hashtree-ts HashTree class API.
5
6use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::io::AsyncRead;
10use futures::stream::{self, Stream};
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{
15    decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node,
16};
17use crate::hash::sha256;
18use crate::reader::{ReaderError, TreeEntry, WalkEntry};
19use crate::store::Store;
20use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
21
22use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
23
24/// HashTree configuration
/// HashTree configuration
///
/// Built with [`HashTreeConfig::new`] plus the `with_*`/`public` builder
/// methods, then consumed by [`HashTree::new`].
#[derive(Clone)]
pub struct HashTreeConfig<S: Store> {
    /// Backing block store shared with the tree.
    pub store: Arc<S>,
    /// Maximum plaintext bytes per leaf chunk.
    pub chunk_size: usize,
    /// Maximum links per tree node before fanning out into subtrees.
    pub max_links: usize,
    /// Whether to encrypt content (default: true when encryption feature enabled)
    pub encrypted: bool,
}
33
impl<S: Store> HashTreeConfig<S> {
    /// New config with library defaults: `DEFAULT_CHUNK_SIZE`,
    /// `DEFAULT_MAX_LINKS`, and encryption enabled.
    pub fn new(store: Arc<S>) -> Self {
        Self {
            store,
            chunk_size: DEFAULT_CHUNK_SIZE,
            max_links: DEFAULT_MAX_LINKS,
            encrypted: true,
        }
    }

    /// Override the leaf chunk size (builder style).
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.chunk_size = chunk_size;
        self
    }

    /// Override the per-node link fanout (builder style).
    pub fn with_max_links(mut self, max_links: usize) -> Self {
        self.max_links = max_links;
        self
    }

    /// Disable encryption (store content publicly)
    pub fn public(mut self) -> Self {
        self.encrypted = false;
        self
    }
}
60
/// HashTree error type
#[derive(Debug, thiserror::Error)]
pub enum HashTreeError {
    /// Underlying block-store failure (stringified store error).
    #[error("Store error: {0}")]
    Store(String),
    /// TreeNode encode/decode failure.
    #[error("Codec error: {0}")]
    Codec(#[from] crate::codec::CodecError),
    /// A linked chunk was absent from the store (hex-encoded hash).
    #[error("Missing chunk: {0}")]
    MissingChunk(String),
    /// Path lookup failed.
    #[error("Path not found: {0}")]
    PathNotFound(String),
    /// Directory entry lookup failed.
    #[error("Entry not found: {0}")]
    EntryNotFound(String),
    /// Encryption failure. Also used for decrypt failures in the
    /// `get_encrypted` path (see `get_encrypted_root`).
    #[error("Encryption error: {0}")]
    Encryption(String),
    /// Decryption failure.
    #[error("Decryption error: {0}")]
    Decryption(String),
    /// Content was larger than the caller-supplied `max_size` cap.
    #[error("Content size {actual_size} exceeds max_size {max_size}")]
    SizeLimitExceeded { max_size: u64, actual_size: u64 },
}
81
/// Map builder errors onto the corresponding HashTree variants 1:1.
impl From<BuilderError> for HashTreeError {
    fn from(e: BuilderError) -> Self {
        match e {
            BuilderError::Store(s) => HashTreeError::Store(s),
            BuilderError::Codec(c) => HashTreeError::Codec(c),
            BuilderError::Encryption(s) => HashTreeError::Encryption(s),
        }
    }
}
91
/// Map reader errors onto HashTree variants.
impl From<ReaderError> for HashTreeError {
    fn from(e: ReaderError) -> Self {
        match e {
            ReaderError::Store(s) => HashTreeError::Store(s),
            ReaderError::Codec(c) => HashTreeError::Codec(c),
            ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
            // NOTE(review): reader decryption failures are surfaced as
            // `Encryption` rather than `Decryption`. This matches the
            // mapping in `get_encrypted_root`, but confirm it is intentional
            // and not a copy-paste slip.
            ReaderError::Decryption(s) => HashTreeError::Encryption(s),
            ReaderError::MissingKey => {
                HashTreeError::Encryption("missing decryption key".to_string())
            }
        }
    }
}
105
/// HashTree - unified create, read, and edit merkle tree operations
///
/// Construct via [`HashTree::new`] from a [`HashTreeConfig`].
pub struct HashTree<S: Store> {
    // Backing block store.
    store: Arc<S>,
    // Max plaintext bytes per leaf chunk.
    chunk_size: usize,
    // Max links per tree node before fanning out into subtrees.
    max_links: usize,
    // When true, chunks and nodes are encrypted via `encrypt_chk` and
    // per-block keys are carried in links/Cids.
    encrypted: bool,
}
113
114impl<S: Store> HashTree<S> {
115    fn is_legacy_internal_group_name(name: &str) -> bool {
116        name.starts_with('_') && !name.starts_with("_chunk_") && name.chars().count() == 2
117    }
118
119    fn node_uses_legacy_directory_fanout(node: &TreeNode) -> bool {
120        !node.links.is_empty()
121            && node.links.iter().all(|link| {
122                let Some(name) = link.name.as_deref() else {
123                    return false;
124                };
125                Self::is_legacy_internal_group_name(name) && link.link_type == LinkType::Dir
126            })
127    }
128
129    fn is_internal_directory_link_with_legacy_fanout(
130        link: &Link,
131        uses_legacy_fanout: bool,
132    ) -> bool {
133        let Some(name) = link.name.as_deref() else {
134            return false;
135        };
136
137        if name.starts_with("_chunk_") {
138            return true;
139        }
140
141        uses_legacy_fanout
142            && Self::is_legacy_internal_group_name(name)
143            && link.link_type == LinkType::Dir
144    }
145
146    fn is_internal_directory_link(node: &TreeNode, link: &Link) -> bool {
147        Self::is_internal_directory_link_with_legacy_fanout(
148            link,
149            Self::node_uses_legacy_directory_fanout(node),
150        )
151    }
152
153    pub fn new(config: HashTreeConfig<S>) -> Self {
154        Self {
155            store: config.store,
156            chunk_size: config.chunk_size,
157            max_links: config.max_links,
158            encrypted: config.encrypted,
159        }
160    }
161
    /// Check if encryption is enabled
    ///
    /// Reflects the `encrypted` flag taken from [`HashTreeConfig`].
    pub fn is_encrypted(&self) -> bool {
        self.encrypted
    }
166
167    // ============ UNIFIED API ============
168
169    /// Store content, returns (Cid, size) where Cid is hash + optional key
170    /// Encrypts by default when encryption feature is enabled
171    pub async fn put(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
172        let size = data.len() as u64;
173
174        // Small data - store as single chunk
175        if data.len() <= self.chunk_size {
176            let (hash, key) = self.put_chunk_internal(data).await?;
177            return Ok((Cid { hash, key }, size));
178        }
179
180        // Large data - chunk it
181        let mut links: Vec<Link> = Vec::new();
182        let mut offset = 0;
183
184        while offset < data.len() {
185            let end = (offset + self.chunk_size).min(data.len());
186            let chunk = &data[offset..end];
187            let chunk_size = chunk.len() as u64;
188            let (hash, key) = self.put_chunk_internal(chunk).await?;
189            links.push(Link {
190                hash,
191                name: None,
192                size: chunk_size,
193                key,
194                link_type: LinkType::Blob, // Leaf chunk (raw blob)
195                meta: None,
196            });
197            offset = end;
198        }
199
200        // Build tree from chunks
201        let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
202        Ok((
203            Cid {
204                hash: root_hash,
205                key: root_key,
206            },
207            size,
208        ))
209    }
210
211    /// Get content by Cid (handles decryption automatically)
212    ///
213    /// - `max_size`: Optional max plaintext size in bytes. If exceeded, returns
214    ///   `HashTreeError::SizeLimitExceeded`.
215    pub async fn get(
216        &self,
217        cid: &Cid,
218        max_size: Option<u64>,
219    ) -> Result<Option<Vec<u8>>, HashTreeError> {
220        if let Some(key) = cid.key {
221            self.get_encrypted(&cid.hash, &key, max_size).await
222        } else {
223            self.read_file_with_limit(&cid.hash, max_size).await
224        }
225    }
226
227    /// Store content from an async reader (streaming put)
228    ///
229    /// Reads data in chunks and builds a merkle tree incrementally.
230    /// Useful for large files or streaming data sources.
231    /// Returns (Cid, size) where Cid is hash + optional key
232    pub async fn put_stream<R: AsyncRead + Unpin>(
233        &self,
234        mut reader: R,
235    ) -> Result<(Cid, u64), HashTreeError> {
236        let mut buffer = vec![0u8; self.chunk_size];
237        let mut links = Vec::new();
238        let mut total_size: u64 = 0;
239        let mut consistent_key: Option<[u8; 32]> = None;
240
241        loop {
242            let mut chunk = Vec::new();
243            let mut bytes_read = 0;
244
245            // Read until we have a full chunk or EOF
246            while bytes_read < self.chunk_size {
247                let n = reader
248                    .read(&mut buffer[..self.chunk_size - bytes_read])
249                    .await
250                    .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
251                if n == 0 {
252                    break; // EOF
253                }
254                chunk.extend_from_slice(&buffer[..n]);
255                bytes_read += n;
256            }
257
258            if chunk.is_empty() {
259                break; // No more data
260            }
261
262            let chunk_len = chunk.len() as u64;
263            total_size += chunk_len;
264
265            let (hash, key) = self.put_chunk_internal(&chunk).await?;
266
267            // Track consistent key for single-key result
268            if links.is_empty() {
269                consistent_key = key;
270            } else if consistent_key != key {
271                consistent_key = None;
272            }
273
274            links.push(Link {
275                hash,
276                name: None,
277                size: chunk_len,
278                key,
279                link_type: LinkType::Blob, // Leaf chunk (raw blob)
280                meta: None,
281            });
282        }
283
284        if links.is_empty() {
285            // Empty input
286            let (hash, key) = self.put_chunk_internal(&[]).await?;
287            return Ok((Cid { hash, key }, 0));
288        }
289
290        // Build tree from chunks
291        let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
292        Ok((
293            Cid {
294                hash: root_hash,
295                key: root_key,
296            },
297            total_size,
298        ))
299    }
300
301    /// Read content as a stream of chunks by Cid (handles decryption automatically)
302    ///
303    /// Returns an async stream that yields chunks as they are read.
304    /// Useful for large files or when you want to process data incrementally.
305    pub fn get_stream(
306        &self,
307        cid: &Cid,
308    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
309        let hash = cid.hash;
310        let key = cid.key;
311
312        if let Some(k) = key {
313            // Encrypted stream
314            Box::pin(self.read_file_stream_encrypted(hash, k))
315        } else {
316            // Unencrypted stream
317            self.read_file_stream(hash)
318        }
319    }
320
    /// Read encrypted file as stream (internal)
    ///
    /// Implemented as a `stream::unfold` state machine:
    /// - `Init`: fetch and decrypt the root. A plain blob is yielded in one
    ///   step; a tree node seeds a stack of child links.
    /// - `Processing`: pop links off the stack, yielding one leaf per step.
    /// - `Done`: terminal state after an error or a single-blob yield.
    fn read_file_stream_encrypted(
        &self,
        hash: Hash,
        key: EncryptionKey,
    ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
        stream::unfold(
            EncryptedStreamState::Init {
                hash,
                key,
                tree: self,
            },
            |state| async move {
                match state {
                    EncryptedStreamState::Init { hash, key, tree } => {
                        // A missing root ends the stream immediately (None
                        // terminates the unfold without yielding an error).
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    EncryptedStreamState::Done,
                                ))
                            }
                        };

                        // Try to decrypt
                        let decrypted = match decrypt_chk(&data, &key) {
                            Ok(d) => d,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Decryption(e.to_string())),
                                    EncryptedStreamState::Done,
                                ))
                            }
                        };

                        if !is_tree_node(&decrypted) {
                            // Single blob - yield decrypted data
                            return Some((Ok(decrypted), EncryptedStreamState::Done));
                        }

                        // Tree node - parse and traverse
                        let node = match decode_tree_node(&decrypted) {
                            Ok(n) => n,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Codec(e)),
                                    EncryptedStreamState::Done,
                                ))
                            }
                        };

                        // Push children in reverse so the first link is
                        // popped (and yielded) first.
                        let mut stack: Vec<EncryptedStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(EncryptedStackItem {
                                hash: link.hash,
                                key: link.key,
                            });
                        }

                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Processing { mut stack, tree } => {
                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Done => None,
                }
            },
        )
    }
392
    /// Advance the encrypted-stream traversal until one leaf can be yielded.
    ///
    /// Pops items off `stack` depth-first: nested tree nodes are expanded
    /// into their children (pushed in reverse to preserve order), and the
    /// first leaf found is returned together with the follow-up stream
    /// state. Returns `None` once the stack is exhausted, ending the stream.
    async fn process_encrypted_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<EncryptedStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                Ok(None) => {
                    return Some((
                        Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
                        EncryptedStreamState::Done,
                    ))
                }
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        EncryptedStreamState::Done,
                    ))
                }
            };

            // Decrypt if we have a key; keyless links carry plaintext.
            let decrypted = if let Some(key) = item.key {
                match decrypt_chk(&data, &key) {
                    Ok(d) => d,
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Decryption(e.to_string())),
                            EncryptedStreamState::Done,
                        ))
                    }
                }
            } else {
                data
            };

            if is_tree_node(&decrypted) {
                // Nested tree node - add children to stack
                let node = match decode_tree_node(&decrypted) {
                    Ok(n) => n,
                    Err(e) => {
                        return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done))
                    }
                };
                for link in node.links.into_iter().rev() {
                    stack.push(EncryptedStackItem {
                        hash: link.hash,
                        key: link.key,
                    });
                }
            } else {
                // Leaf chunk - yield decrypted data; the remaining stack is
                // moved into the next state so traversal resumes from here.
                return Some((
                    Ok(decrypted),
                    EncryptedStreamState::Processing {
                        stack: std::mem::take(stack),
                        tree: self,
                    },
                ));
            }
        }
        None
    }
456
457    /// Store a chunk with optional encryption
458    async fn put_chunk_internal(
459        &self,
460        data: &[u8],
461    ) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
462        if self.encrypted {
463            let (encrypted, key) =
464                encrypt_chk(data).map_err(|e| HashTreeError::Encryption(e.to_string()))?;
465            let hash = sha256(&encrypted);
466            self.store
467                .put(hash, encrypted)
468                .await
469                .map_err(|e| HashTreeError::Store(e.to_string()))?;
470            Ok((hash, Some(key)))
471        } else {
472            let hash = self.put_blob(data).await?;
473            Ok((hash, None))
474        }
475    }
476
    /// Build tree and return (hash, optional_key)
    ///
    /// `links` are the ordered children and `total_size` the expected total
    /// payload size (used only by the single-link shortcut). When the link
    /// count exceeds `max_links`, links are grouped into batches, each batch
    /// becomes a subtree, and the subtree links recurse through this method.
    async fn build_tree_internal(
        &self,
        links: Vec<Link>,
        total_size: Option<u64>,
    ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
        // Single link with matching size - return directly
        // (the link already addresses all the data; no wrapper node needed).
        if links.len() == 1 {
            if let Some(ts) = total_size {
                if links[0].size == ts {
                    return Ok((links[0].hash, links[0].key));
                }
            }
        }

        if links.len() <= self.max_links {
            let node = TreeNode {
                node_type: LinkType::File,
                links,
            };
            let (data, _) = encode_and_hash(&node)?;

            if self.encrypted {
                // Encrypted: address by the hash of the ciphertext and
                // return the key needed to decrypt the node.
                let (encrypted, key) =
                    encrypt_chk(&data).map_err(|e| HashTreeError::Encryption(e.to_string()))?;
                let hash = sha256(&encrypted);
                self.store
                    .put(hash, encrypted)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?;
                return Ok((hash, Some(key)));
            }

            // Unencrypted path
            let hash = sha256(&data);
            self.store
                .put(hash, data)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?;
            return Ok((hash, None));
        }

        // Too many links - create subtrees
        // (Box::pin because async recursion needs a heap-allocated future.)
        let mut sub_links = Vec::new();
        for batch in links.chunks(self.max_links) {
            let batch_size: u64 = batch.iter().map(|l| l.size).sum();
            let (hash, key) =
                Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
            sub_links.push(Link {
                hash,
                name: None,
                size: batch_size,
                key,
                link_type: LinkType::File, // Internal tree node
                meta: None,
            });
        }

        Box::pin(self.build_tree_internal(sub_links, total_size)).await
    }
537
    /// Get encrypted content by hash and key
    ///
    /// Decrypts the root block. A tree root is reassembled chunk by chunk
    /// with the running total checked against `max_size`; a blob root is
    /// returned directly after a size check. `Ok(None)` when the root block
    /// is missing from the store.
    async fn get_encrypted(
        &self,
        hash: &Hash,
        key: &EncryptionKey,
        max_size: Option<u64>,
    ) -> Result<Option<Vec<u8>>, HashTreeError> {
        let decrypted = match self.get_encrypted_root(hash, key).await? {
            Some(data) => data,
            None => return Ok(None),
        };

        // Check if it's a tree node
        if is_tree_node(&decrypted) {
            let node = decode_tree_node(&decrypted)?;
            // Fail fast on the declared total before fetching any chunks.
            let declared_size: u64 = node.links.iter().map(|l| l.size).sum();
            Self::ensure_size_limit(max_size, declared_size)?;

            let mut bytes_read = 0u64;
            let assembled = self
                .assemble_encrypted_chunks_limited(&node, max_size, &mut bytes_read)
                .await?;
            return Ok(Some(assembled));
        }

        // Single chunk data
        Self::ensure_size_limit(max_size, decrypted.len() as u64)?;
        Ok(Some(decrypted))
    }
567
568    async fn get_encrypted_root(
569        &self,
570        hash: &Hash,
571        key: &EncryptionKey,
572    ) -> Result<Option<Vec<u8>>, HashTreeError> {
573        self.get_cid_root_bytes(&Cid {
574            hash: *hash,
575            key: Some(*key),
576        })
577        .await
578        .map_err(|err| match err {
579            HashTreeError::Decryption(message) => HashTreeError::Encryption(message),
580            other => other,
581        })
582    }
583
584    fn ensure_size_limit(max_size: Option<u64>, actual_size: u64) -> Result<(), HashTreeError> {
585        if let Some(max_size) = max_size {
586            if actual_size > max_size {
587                return Err(HashTreeError::SizeLimitExceeded {
588                    max_size,
589                    actual_size,
590                });
591            }
592        }
593        Ok(())
594    }
595
    /// Assemble encrypted chunks from tree
    ///
    /// Depth-first walk over `node.links`: each child is fetched and
    /// decrypted with its per-link key; nested tree nodes recurse, leaves
    /// are appended in order. `bytes_read` accumulates decrypted leaf bytes
    /// across the whole recursion so `max_size` bounds the total output.
    async fn assemble_encrypted_chunks_limited(
        &self,
        node: &TreeNode,
        max_size: Option<u64>,
        bytes_read: &mut u64,
    ) -> Result<Vec<u8>, HashTreeError> {
        let mut parts: Vec<Vec<u8>> = Vec::new();

        for link in &node.links {
            // Pre-check with the declared link size so we can bail before
            // fetching a chunk that would blow past the cap.
            let projected = (*bytes_read).saturating_add(link.size);
            Self::ensure_size_limit(max_size, projected)?;

            let chunk_key = link
                .key
                .ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;

            let encrypted_child = self
                .store
                .get(&link.hash)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?
                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;

            let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
                .map_err(|e| HashTreeError::Encryption(e.to_string()))?;

            if is_tree_node(&decrypted) {
                // Intermediate tree node - recurse
                // (Box::pin: async recursion requires a boxed future.)
                let child_node = decode_tree_node(&decrypted)?;
                let child_data = Box::pin(self.assemble_encrypted_chunks_limited(
                    &child_node,
                    max_size,
                    bytes_read,
                ))
                .await?;
                parts.push(child_data);
            } else {
                // Leaf data chunk - re-check with the actual decrypted
                // length, which may differ from the declared link size.
                let projected = (*bytes_read).saturating_add(decrypted.len() as u64);
                Self::ensure_size_limit(max_size, projected)?;
                *bytes_read = projected;
                parts.push(decrypted);
            }
        }

        // Concatenate with a single pre-sized allocation.
        let total_len: usize = parts.iter().map(|p| p.len()).sum();
        let mut result = Vec::with_capacity(total_len);
        for part in parts {
            result.extend_from_slice(&part);
        }

        Ok(result)
    }
650
651    // ============ LOW-LEVEL CREATE ============
652
653    /// Store a blob directly (small data, no encryption)
654    /// Returns the content hash
655    pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
656        let hash = sha256(data);
657        self.store
658            .put(hash, data.to_vec())
659            .await
660            .map_err(|e| HashTreeError::Store(e.to_string()))?;
661        Ok(hash)
662    }
663
664    /// Store a file, chunking if necessary
665    /// Returns (Cid, size) where Cid is hash + optional key
666    pub async fn put_file(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
667        let size = data.len() as u64;
668
669        // Small file - store as single chunk
670        if data.len() <= self.chunk_size {
671            let (hash, key) = self.put_chunk_internal(data).await?;
672            return Ok((Cid { hash, key }, size));
673        }
674
675        // Large file - chunk it
676        let mut links: Vec<Link> = Vec::new();
677        let mut offset = 0;
678
679        while offset < data.len() {
680            let end = (offset + self.chunk_size).min(data.len());
681            let chunk = &data[offset..end];
682            let chunk_size = (end - offset) as u64;
683
684            let (hash, key) = self.put_chunk_internal(chunk).await?;
685            links.push(Link {
686                hash,
687                name: None,
688                size: chunk_size,
689                key,
690                link_type: LinkType::Blob, // Leaf chunk
691                meta: None,
692            });
693            offset = end;
694        }
695
696        // Build tree from chunks (uses encryption if enabled)
697        let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
698        Ok((
699            Cid {
700                hash: root_hash,
701                key: root_key,
702            },
703            size,
704        ))
705    }
706
707    /// Build a directory from entries
708    /// Returns Cid with key if encrypted
709    ///
710    /// For large directories, the messagepack-encoded TreeNode is stored via put()
711    /// which automatically chunks the data. The reader uses read_file() to reassemble.
712    pub async fn put_directory(&self, entries: Vec<DirEntry>) -> Result<Cid, HashTreeError> {
713        // Sort entries by name for deterministic hashing
714        let mut sorted = entries;
715        sorted.sort_by(|a, b| a.name.cmp(&b.name));
716
717        let links: Vec<Link> = sorted
718            .into_iter()
719            .map(|e| Link {
720                hash: e.hash,
721                name: Some(e.name),
722                size: e.size,
723                key: e.key,
724                link_type: e.link_type,
725                meta: e.meta,
726            })
727            .collect();
728
729        // Create the directory node with all entries
730        let node = TreeNode {
731            node_type: LinkType::Dir,
732            links,
733        };
734        let (data, _plain_hash) = encode_and_hash(&node)?;
735
736        // Store directory data via put() - handles both small and large directories
737        // For small dirs, stores as single chunk
738        // For large dirs, chunks transparently via build_tree()
739        // Reader uses read_file() to reassemble before decoding
740        let (cid, _size) = self.put(&data).await?;
741        Ok(cid)
742    }
743
744    /// Create a tree node with custom links
745    pub async fn put_tree_node(&self, links: Vec<Link>) -> Result<Hash, HashTreeError> {
746        let node = TreeNode {
747            node_type: LinkType::Dir,
748            links,
749        };
750
751        let (data, hash) = encode_and_hash(&node)?;
752        self.store
753            .put(hash, data)
754            .await
755            .map_err(|e| HashTreeError::Store(e.to_string()))?;
756        Ok(hash)
757    }
758
759    // ============ READ ============
760
    /// Get raw data by hash
    ///
    /// Returns the stored bytes verbatim (no decryption, no chunk
    /// reassembly); `Ok(None)` when the block is absent.
    pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
        self.store
            .get(hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))
    }
768
769    /// Get and decode a tree node (unencrypted)
770    pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
771        let data = match self
772            .store
773            .get(hash)
774            .await
775            .map_err(|e| HashTreeError::Store(e.to_string()))?
776        {
777            Some(d) => d,
778            None => return Ok(None),
779        };
780
781        if !is_tree_node(&data) {
782            return Ok(None);
783        }
784
785        let node = decode_tree_node(&data)?;
786        Ok(Some(node))
787    }
788
    /// Fetch the root block for `cid` and return its plaintext bytes.
    ///
    /// Without a key the stored bytes are returned as-is. With a key the
    /// block is decrypted, with fallbacks for blocks that were actually
    /// stored in plaintext:
    /// - decryption succeeds: prefer the decrypted bytes, unless the raw
    ///   bytes already decode as a tree node while the decrypted ones do not
    ///   (presumably a plaintext tree node referenced by a keyed Cid —
    ///   TODO confirm intent);
    /// - decryption fails: fall back to the raw bytes if they form a tree
    ///   node, otherwise surface the `Decryption` error.
    async fn get_cid_root_bytes(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
        let data = match self
            .store
            .get(&cid.hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))?
        {
            Some(d) => d,
            None => return Ok(None),
        };

        // No key: the block is stored in plaintext.
        let Some(key) = &cid.key else {
            return Ok(Some(data));
        };

        let raw_is_tree = is_tree_node(&data);
        match decrypt_chk(&data, key) {
            Ok(decrypted) => {
                if is_tree_node(&decrypted) || !raw_is_tree {
                    Ok(Some(decrypted))
                } else {
                    Ok(Some(data))
                }
            }
            Err(err) => {
                if raw_is_tree {
                    Ok(Some(data))
                } else {
                    Err(HashTreeError::Decryption(err.to_string()))
                }
            }
        }
    }
822
823    /// Get and decode a tree node using Cid (with decryption if key present)
824    pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
825        let decrypted = match self.get_cid_root_bytes(cid).await? {
826            Some(d) => d,
827            None => return Ok(None),
828        };
829
830        if !is_tree_node(&decrypted) {
831            return Ok(None);
832        }
833
834        let node = decode_tree_node(&decrypted)?;
835        Ok(Some(node))
836    }
837
    /// Get directory node, handling chunked directory data
    /// Use this when you know the target is a directory (from parent link_type)
    ///
    /// Large directories are stored through `put()` as a chunked file whose
    /// reassembled bytes encode the real directory node, so a File-typed
    /// root is reassembled and decoded a second time before returning.
    pub async fn get_directory_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
        let decrypted = match self.get_cid_root_bytes(cid).await? {
            Some(d) => d,
            None => return Ok(None),
        };

        if !is_tree_node(&decrypted) {
            return Ok(None);
        }

        let node = decode_tree_node(&decrypted)?;

        // If this is a file tree (chunked data), reassemble to get actual directory
        if node.node_type == LinkType::File {
            let mut bytes_read = 0u64;
            let assembled = self
                .assemble_chunks_limited(&node, None, &mut bytes_read)
                .await?;
            // Only replace the node if the reassembled bytes decode as a
            // tree node; otherwise fall through and return the file tree.
            if is_tree_node(&assembled) {
                let inner_node = decode_tree_node(&assembled)?;
                return Ok(Some(inner_node));
            }
        }

        Ok(Some(node))
    }
866
867    /// Check if hash points to a tree node (no decryption)
868    pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
869        let data = match self
870            .store
871            .get(hash)
872            .await
873            .map_err(|e| HashTreeError::Store(e.to_string()))?
874        {
875            Some(d) => d,
876            None => return Ok(false),
877        };
878        Ok(is_tree_node(&data))
879    }
880
881    /// Check if Cid points to a directory (with decryption)
882    pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
883        Ok(matches!(
884            self.get_directory_node(cid).await?,
885            Some(node) if node.node_type == LinkType::Dir
886        ))
887    }
888
889    /// Check if hash points to a directory (tree with named links, no decryption)
890    pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
891        let data = match self
892            .store
893            .get(hash)
894            .await
895            .map_err(|e| HashTreeError::Store(e.to_string()))?
896        {
897            Some(d) => d,
898            None => return Ok(false),
899        };
900        Ok(is_directory_node(&data))
901    }
902
    /// Read a complete file (reassemble chunks if needed)
    ///
    /// Returns `Ok(None)` when the root hash is not present in the store.
    /// No size cap is applied; see `read_file_with_limit` for the bounded
    /// variant.
    pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
        self.read_file_with_limit(hash, None).await
    }
907
908    /// Read a complete file with optional size limit.
909    async fn read_file_with_limit(
910        &self,
911        hash: &Hash,
912        max_size: Option<u64>,
913    ) -> Result<Option<Vec<u8>>, HashTreeError> {
914        let data = match self
915            .store
916            .get(hash)
917            .await
918            .map_err(|e| HashTreeError::Store(e.to_string()))?
919        {
920            Some(d) => d,
921            None => return Ok(None),
922        };
923
924        // Check if it's a tree (chunked file) or raw blob
925        if !is_tree_node(&data) {
926            Self::ensure_size_limit(max_size, data.len() as u64)?;
927            return Ok(Some(data));
928        }
929
930        // It's a tree - reassemble chunks
931        let node = decode_tree_node(&data)?;
932        let declared_size: u64 = node.links.iter().map(|l| l.size).sum();
933        Self::ensure_size_limit(max_size, declared_size)?;
934
935        let mut bytes_read = 0u64;
936        let assembled = self
937            .assemble_chunks_limited(&node, max_size, &mut bytes_read)
938            .await?;
939        Ok(Some(assembled))
940    }
941
942    /// Read a byte range from a file (fetches only necessary chunks)
943    ///
944    /// - `start`: Starting byte offset (inclusive)
945    /// - `end`: Ending byte offset (exclusive), or None to read to end
946    ///
947    /// This is more efficient than read_file() for partial reads of large files.
948    pub async fn read_file_range(
949        &self,
950        hash: &Hash,
951        start: u64,
952        end: Option<u64>,
953    ) -> Result<Option<Vec<u8>>, HashTreeError> {
954        let data = match self
955            .store
956            .get(hash)
957            .await
958            .map_err(|e| HashTreeError::Store(e.to_string()))?
959        {
960            Some(d) => d,
961            None => return Ok(None),
962        };
963
964        // Single blob - just slice it
965        if !is_tree_node(&data) {
966            let start_idx = start as usize;
967            let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
968            if start_idx >= data.len() {
969                return Ok(Some(vec![]));
970            }
971            let end_idx = end_idx.min(data.len());
972            return Ok(Some(data[start_idx..end_idx].to_vec()));
973        }
974
975        // It's a chunked file - fetch only needed chunks
976        let node = decode_tree_node(&data)?;
977        let range_data = self.assemble_chunks_range(&node, start, end).await?;
978        Ok(Some(range_data))
979    }
980
981    /// Read a byte range from a file using a Cid (handles decryption if key present)
982    pub async fn read_file_range_cid(
983        &self,
984        cid: &Cid,
985        start: u64,
986        end: Option<u64>,
987    ) -> Result<Option<Vec<u8>>, HashTreeError> {
988        if let Some(key) = cid.key {
989            let data = match self.get_encrypted_root(&cid.hash, &key).await? {
990                Some(d) => d,
991                None => return Ok(None),
992            };
993
994            if is_tree_node(&data) {
995                let node = decode_tree_node(&data)?;
996                let total_size: u64 = node.links.iter().map(|link| link.size).sum();
997                let actual_end = end.unwrap_or(total_size).min(total_size);
998                if start >= actual_end {
999                    return Ok(Some(vec![]));
1000                }
1001
1002                let mut result = Vec::with_capacity((actual_end - start) as usize);
1003                self.append_encrypted_range(&node, start, actual_end, 0, &mut result)
1004                    .await?;
1005                return Ok(Some(result));
1006            }
1007
1008            let start_idx = start as usize;
1009            let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
1010            if start_idx >= data.len() {
1011                return Ok(Some(vec![]));
1012            }
1013            let end_idx = end_idx.min(data.len());
1014            return Ok(Some(data[start_idx..end_idx].to_vec()));
1015        }
1016
1017        self.read_file_range(&cid.hash, start, end).await
1018    }
1019
    /// Append the decrypted bytes of the absolute file range `[start, end)`
    /// that fall within `node`'s subtree onto `result`.
    ///
    /// `base_offset` is the absolute offset of `node`'s first byte. Declared
    /// link sizes are trusted to skip children entirely outside the range, so
    /// only overlapping chunks are fetched and decrypted.
    async fn append_encrypted_range(
        &self,
        node: &TreeNode,
        start: u64,
        end: u64,
        base_offset: u64,
        result: &mut Vec<u8>,
    ) -> Result<(), HashTreeError> {
        let mut current_offset = base_offset;

        for link in &node.links {
            // Absolute span this child covers according to its declared size.
            let child_start = current_offset;
            let child_end = child_start.saturating_add(link.size);
            current_offset = child_end;

            // Entirely before the requested range: skip without fetching.
            if child_end <= start {
                continue;
            }
            // Children are processed in order, so once one starts at/after
            // `end` no later child can overlap the range.
            if child_start >= end {
                break;
            }

            // Each encrypted child must carry its own CHK key on the link.
            let chunk_key = link
                .key
                .ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;

            let encrypted_child = self
                .store
                .get(&link.hash)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?
                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
            let decrypted_child = decrypt_chk(&encrypted_child, &chunk_key)
                .map_err(|e| HashTreeError::Encryption(e.to_string()))?;

            // Intermediate tree node: recurse (boxed for async recursion),
            // using this child's absolute start as the new base offset.
            if is_tree_node(&decrypted_child) {
                let child_node = decode_tree_node(&decrypted_child)?;
                Box::pin(self.append_encrypted_range(&child_node, start, end, child_start, result))
                    .await?;
                continue;
            }

            // Leaf chunk: copy only the part overlapping [start, end).
            let slice_start = if start > child_start {
                (start - child_start) as usize
            } else {
                0
            };
            let slice_end = if end < child_end {
                (end - child_start) as usize
            } else {
                decrypted_child.len()
            };
            result.extend_from_slice(&decrypted_child[slice_start..slice_end]);
        }

        Ok(())
    }
1077
    /// Assemble only the chunks needed for a byte range
    ///
    /// NOTE(review): `collect_chunk_offsets` fetches every leaf chunk to learn
    /// its actual length, so this path downloads the whole file before
    /// slicing — unlike `append_encrypted_range`, which trusts declared link
    /// sizes to skip non-overlapping subtrees. Consider deriving offsets from
    /// `link.size` if that invariant holds for unencrypted trees.
    async fn assemble_chunks_range(
        &self,
        node: &TreeNode,
        start: u64,
        end: Option<u64>,
    ) -> Result<Vec<u8>, HashTreeError> {
        // First, flatten the tree to get all leaf chunks with their byte offsets
        let chunks_info = self.collect_chunk_offsets(node).await?;

        if chunks_info.is_empty() {
            return Ok(vec![]);
        }

        // Calculate total size and actual end
        let total_size: u64 = chunks_info.iter().map(|(_, _, size)| size).sum();
        let actual_end = end.unwrap_or(total_size).min(total_size);

        // Degenerate (empty or inverted) range: nothing to return.
        if start >= actual_end {
            return Ok(vec![]);
        }

        // Find chunks that overlap with [start, actual_end)
        let mut result = Vec::with_capacity((actual_end - start) as usize);
        let mut current_offset = 0u64;

        // The stored per-chunk offset is ignored; offsets are recomputed from
        // a running total, which matches the collection order.
        for (chunk_hash, _chunk_offset, chunk_size) in &chunks_info {
            let chunk_start = current_offset;
            let chunk_end = current_offset + chunk_size;

            // Check if this chunk overlaps with our range
            if chunk_end > start && chunk_start < actual_end {
                // Fetch this chunk
                let chunk_data = self
                    .store
                    .get(chunk_hash)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?
                    .ok_or_else(|| HashTreeError::MissingChunk(to_hex(chunk_hash)))?;

                // Calculate slice bounds within this chunk
                let slice_start = if start > chunk_start {
                    (start - chunk_start) as usize
                } else {
                    0
                };
                let slice_end = if actual_end < chunk_end {
                    (actual_end - chunk_start) as usize
                } else {
                    chunk_data.len()
                };

                result.extend_from_slice(&chunk_data[slice_start..slice_end]);
            }

            current_offset = chunk_end;

            // Early exit if we've passed the requested range
            if current_offset >= actual_end {
                break;
            }
        }

        Ok(result)
    }
1143
1144    /// Collect all leaf chunk hashes with their byte offsets
1145    /// Returns Vec<(hash, offset, size)>
1146    async fn collect_chunk_offsets(
1147        &self,
1148        node: &TreeNode,
1149    ) -> Result<Vec<(Hash, u64, u64)>, HashTreeError> {
1150        let mut chunks = Vec::new();
1151        let mut offset = 0u64;
1152        self.collect_chunk_offsets_recursive(node, &mut chunks, &mut offset)
1153            .await?;
1154        Ok(chunks)
1155    }
1156
1157    async fn collect_chunk_offsets_recursive(
1158        &self,
1159        node: &TreeNode,
1160        chunks: &mut Vec<(Hash, u64, u64)>,
1161        offset: &mut u64,
1162    ) -> Result<(), HashTreeError> {
1163        for link in &node.links {
1164            let child_data = self
1165                .store
1166                .get(&link.hash)
1167                .await
1168                .map_err(|e| HashTreeError::Store(e.to_string()))?
1169                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1170
1171            if is_tree_node(&child_data) {
1172                // Intermediate node - recurse
1173                let child_node = decode_tree_node(&child_data)?;
1174                Box::pin(self.collect_chunk_offsets_recursive(&child_node, chunks, offset)).await?;
1175            } else {
1176                // Leaf chunk
1177                let size = child_data.len() as u64;
1178                chunks.push((link.hash, *offset, size));
1179                *offset += size;
1180            }
1181        }
1182        Ok(())
1183    }
1184
1185    /// Recursively assemble chunks from tree
1186    async fn assemble_chunks_limited(
1187        &self,
1188        node: &TreeNode,
1189        max_size: Option<u64>,
1190        bytes_read: &mut u64,
1191    ) -> Result<Vec<u8>, HashTreeError> {
1192        let mut parts: Vec<Vec<u8>> = Vec::new();
1193
1194        for link in &node.links {
1195            let projected = (*bytes_read).saturating_add(link.size);
1196            Self::ensure_size_limit(max_size, projected)?;
1197
1198            let child_data = self
1199                .store
1200                .get(&link.hash)
1201                .await
1202                .map_err(|e| HashTreeError::Store(e.to_string()))?
1203                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1204
1205            if is_tree_node(&child_data) {
1206                let child_node = decode_tree_node(&child_data)?;
1207                parts.push(
1208                    Box::pin(self.assemble_chunks_limited(&child_node, max_size, bytes_read))
1209                        .await?,
1210                );
1211            } else {
1212                let projected = (*bytes_read).saturating_add(child_data.len() as u64);
1213                Self::ensure_size_limit(max_size, projected)?;
1214                *bytes_read = projected;
1215                parts.push(child_data);
1216            }
1217        }
1218
1219        // Concatenate all parts
1220        let total_length: usize = parts.iter().map(|p| p.len()).sum();
1221        let mut result = Vec::with_capacity(total_length);
1222        for part in parts {
1223            result.extend_from_slice(&part);
1224        }
1225
1226        Ok(result)
1227    }
1228
    /// Read a file as stream of chunks
    /// Returns an async stream that yields chunks as they are read
    ///
    /// Implemented as `stream::unfold` over a small state machine:
    /// `Init` fetches and decodes the root, `Processing` drains a DFS stack
    /// of pending chunk hashes, and `Done` terminates the stream.
    pub fn read_file_stream(
        &self,
        hash: Hash,
    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            ReadStreamState::Init { hash, tree: self },
            |state| async move {
                match state {
                    ReadStreamState::Init { hash, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            // Missing root: end the stream without an error item.
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    ReadStreamState::Done,
                                ))
                            }
                        };

                        if !is_tree_node(&data) {
                            // Single blob - yield it and finish
                            return Some((Ok(data), ReadStreamState::Done));
                        }

                        // Tree node - start streaming chunks
                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => {
                                return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done))
                            }
                        };

                        // Create stack with all links to process
                        // (pushed in reverse so pops come out in file order)
                        let mut stack: Vec<StreamStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }

                        // Process first item
                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Processing { mut stack, tree } => {
                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Done => None,
                }
            },
        ))
    }
1281
    /// Pop hashes off the DFS stack until a leaf blob is found, yielding it
    /// together with the next stream state. Nested tree nodes push their
    /// children (reversed, preserving file order). Returns `None` when the
    /// stack is exhausted, which ends the stream.
    async fn process_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<StreamStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            match item {
                StreamStackItem::Hash(hash) => {
                    let data = match self.store.get(&hash).await {
                        Ok(Some(d)) => d,
                        // A referenced chunk must exist; surface its absence
                        // as an error item and stop the stream.
                        Ok(None) => {
                            return Some((
                                Err(HashTreeError::MissingChunk(to_hex(&hash))),
                                ReadStreamState::Done,
                            ))
                        }
                        Err(e) => {
                            return Some((
                                Err(HashTreeError::Store(e.to_string())),
                                ReadStreamState::Done,
                            ))
                        }
                    };

                    if is_tree_node(&data) {
                        // Nested tree - push its children to stack
                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => {
                                return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done))
                            }
                        };
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }
                    } else {
                        // Leaf blob - yield it. `mem::take` moves the stack
                        // into the next state, leaving an empty Vec behind to
                        // satisfy the mutable borrow.
                        return Some((
                            Ok(data),
                            ReadStreamState::Processing {
                                stack: std::mem::take(stack),
                                tree: self,
                            },
                        ));
                    }
                }
            }
        }
        None
    }
1331
1332    /// Read file chunks as Vec (non-streaming version)
1333    pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
1334        let data = match self
1335            .store
1336            .get(hash)
1337            .await
1338            .map_err(|e| HashTreeError::Store(e.to_string()))?
1339        {
1340            Some(d) => d,
1341            None => return Ok(vec![]),
1342        };
1343
1344        if !is_tree_node(&data) {
1345            return Ok(vec![data]);
1346        }
1347
1348        let node = decode_tree_node(&data)?;
1349        self.collect_chunks(&node).await
1350    }
1351
1352    async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
1353        let mut chunks = Vec::new();
1354
1355        for link in &node.links {
1356            let child_data = self
1357                .store
1358                .get(&link.hash)
1359                .await
1360                .map_err(|e| HashTreeError::Store(e.to_string()))?
1361                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1362
1363            if is_tree_node(&child_data) {
1364                let child_node = decode_tree_node(&child_data)?;
1365                chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
1366            } else {
1367                chunks.push(child_data);
1368            }
1369        }
1370
1371        Ok(chunks)
1372    }
1373
1374    /// List directory entries (Cid-based, supports encrypted directories)
1375    pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1376        let node = match self.get_node(cid).await? {
1377            Some(n) => n,
1378            None => return Ok(vec![]),
1379        };
1380
1381        let mut entries = Vec::new();
1382
1383        for link in &node.links {
1384            // Skip internal chunk nodes - recurse into them
1385            if Self::is_internal_directory_link(&node, link) {
1386                let chunk_cid = Cid {
1387                    hash: link.hash,
1388                    key: link.key,
1389                };
1390                let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
1391                entries.extend(sub_entries);
1392                continue;
1393            }
1394
1395            entries.push(TreeEntry {
1396                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1397                hash: link.hash,
1398                size: link.size,
1399                link_type: link.link_type,
1400                key: link.key,
1401                meta: link.meta.clone(),
1402            });
1403        }
1404
1405        Ok(entries)
1406    }
1407
1408    /// List directory entries using Cid (with decryption if key present)
1409    /// Handles both regular and chunked directory data
1410    pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1411        // Use get_directory_node which handles chunked directory data
1412        let node = match self.get_directory_node(cid).await? {
1413            Some(n) => n,
1414            None => return Ok(vec![]),
1415        };
1416
1417        let mut entries = Vec::new();
1418
1419        for link in &node.links {
1420            // Skip internal chunk nodes (backwards compat with old _chunk_ format)
1421            if Self::is_internal_directory_link(&node, link) {
1422                // Internal nodes inherit parent's key for decryption
1423                let sub_cid = Cid {
1424                    hash: link.hash,
1425                    key: cid.key,
1426                };
1427                let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
1428                entries.extend(sub_entries);
1429                continue;
1430            }
1431
1432            entries.push(TreeEntry {
1433                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1434                hash: link.hash,
1435                size: link.size,
1436                link_type: link.link_type,
1437                key: link.key,
1438                meta: link.meta.clone(),
1439            });
1440        }
1441
1442        Ok(entries)
1443    }
1444
1445    /// Resolve a path within a tree (returns Cid with key if encrypted)
1446    pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1447        let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
1448        if parts.is_empty() {
1449            return Ok(Some(cid.clone()));
1450        }
1451
1452        let mut current_cid = cid.clone();
1453
1454        for part in parts {
1455            // Use get_directory_node which handles chunked directory data
1456            let node = match self.get_directory_node(&current_cid).await? {
1457                Some(n) => n,
1458                None => return Ok(None),
1459            };
1460
1461            if let Some(link) = self.find_link(&node, part) {
1462                current_cid = Cid {
1463                    hash: link.hash,
1464                    key: link.key,
1465                };
1466            } else {
1467                // Check internal nodes
1468                match self
1469                    .find_link_in_subtrees_cid(&node, part, &current_cid)
1470                    .await?
1471                {
1472                    Some(link) => {
1473                        current_cid = Cid {
1474                            hash: link.hash,
1475                            key: link.key,
1476                        };
1477                    }
1478                    None => return Ok(None),
1479                }
1480            }
1481        }
1482
1483        Ok(Some(current_cid))
1484    }
1485
    /// Resolve a path within a tree using Cid (with decryption if key present)
    ///
    /// Alias for `resolve`; behaves identically.
    pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
        self.resolve(cid, path).await
    }
1490
1491    fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1492        node.links
1493            .iter()
1494            .find(|l| l.name.as_deref() == Some(name))
1495            .cloned()
1496    }
1497
    /// Find a link in subtrees using Cid (with decryption support)
    ///
    /// For each internal (chunk) child, checks its direct links first, then
    /// recurses deeper. Missing subtrees are skipped, not treated as errors.
    ///
    /// NOTE(review): internal nodes are opened with `link.key` here, while
    /// `list_directory` opens them with the parent's `cid.key` — and the
    /// `_parent_cid` parameter is accepted but unused. Confirm which key
    /// convention is correct for legacy chunked directories.
    async fn find_link_in_subtrees_cid(
        &self,
        node: &TreeNode,
        name: &str,
        _parent_cid: &Cid,
    ) -> Result<Option<Link>, HashTreeError> {
        for link in &node.links {
            if !Self::is_internal_directory_link(node, link) {
                continue;
            }

            // Internal nodes inherit encryption from parent context
            let sub_cid = Cid {
                hash: link.hash,
                key: link.key,
            };

            // Missing or unreadable subtree: keep searching the siblings.
            let sub_node = match self.get_node(&sub_cid).await? {
                Some(n) => n,
                None => continue,
            };

            if let Some(found) = self.find_link(&sub_node, name) {
                return Ok(Some(found));
            }

            // Boxed for async recursion.
            if let Some(deep_found) =
                Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await?
            {
                return Ok(Some(deep_found));
            }
        }

        Ok(None)
    }
1534
1535    /// Get total size of a tree
1536    pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1537        let data = match self
1538            .store
1539            .get(hash)
1540            .await
1541            .map_err(|e| HashTreeError::Store(e.to_string()))?
1542        {
1543            Some(d) => d,
1544            None => return Ok(0),
1545        };
1546
1547        if !is_tree_node(&data) {
1548            return Ok(data.len() as u64);
1549        }
1550
1551        let node = decode_tree_node(&data)?;
1552        // Calculate from children
1553        let mut total = 0u64;
1554        for link in &node.links {
1555            total += link.size;
1556        }
1557        Ok(total)
1558    }
1559
1560    /// Get total size using a Cid (handles decryption if key present)
1561    pub async fn get_size_cid(&self, cid: &Cid) -> Result<u64, HashTreeError> {
1562        if let Some(key) = cid.key {
1563            let data = match self.get_encrypted_root(&cid.hash, &key).await? {
1564                Some(d) => d,
1565                None => return Ok(0),
1566            };
1567            if is_tree_node(&data) {
1568                let node = decode_tree_node(&data)?;
1569                return Ok(node.links.iter().map(|link| link.size).sum());
1570            }
1571            return Ok(data.len() as u64);
1572        }
1573
1574        self.get_size(&cid.hash).await
1575    }
1576
1577    /// Walk entire tree depth-first (returns Vec)
1578    pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
1579        let mut entries = Vec::new();
1580        self.walk_recursive(cid, path, &mut entries).await?;
1581        Ok(entries)
1582    }
1583
    /// Depth-first worker for `walk`: records an entry for the node at `cid`,
    /// then recurses into each link.
    ///
    /// Missing nodes are skipped silently. Note this fetches every node,
    /// including leaf blobs — unlike `walk_parallel_with_progress`, which
    /// takes blob metadata from the parent link.
    async fn walk_recursive(
        &self,
        cid: &Cid,
        path: &str,
        entries: &mut Vec<WalkEntry>,
    ) -> Result<(), HashTreeError> {
        let data = match self
            .store
            .get(&cid.hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))?
        {
            Some(d) => d,
            None => return Ok(()),
        };

        // Decrypt if key is present
        let data = if let Some(key) = &cid.key {
            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
        } else {
            data
        };

        // Not a tree node: record a leaf blob entry and stop descending.
        let node = match try_decode_tree_node(&data) {
            Some(n) => n,
            None => {
                entries.push(WalkEntry {
                    path: path.to_string(),
                    hash: cid.hash,
                    link_type: LinkType::Blob,
                    size: data.len() as u64,
                    key: cid.key,
                });
                return Ok(());
            }
        };

        // Tree node: its logical size is the sum of its links' declared sizes.
        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
        entries.push(WalkEntry {
            path: path.to_string(),
            hash: cid.hash,
            link_type: node.node_type,
            size: node_size,
            key: cid.key,
        });

        for link in &node.links {
            let child_path = match &link.name {
                Some(name) => {
                    if Self::is_internal_directory_link(&node, link) {
                        // Internal nodes inherit parent's key and keep the
                        // parent's path, so their entries splice in flat.
                        let sub_cid = Cid {
                            hash: link.hash,
                            key: cid.key,
                        };
                        Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
                        continue;
                    }
                    if path.is_empty() {
                        name.clone()
                    } else {
                        format!("{}/{}", path, name)
                    }
                }
                // Unnamed links (e.g. file chunks) keep the parent path.
                None => path.to_string(),
            };

            // Child nodes use their own key from link
            let child_cid = Cid {
                hash: link.hash,
                key: link.key,
            };
            Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
        }

        Ok(())
    }
1661
    /// Walk entire tree with parallel fetching
    /// Uses a work-stealing approach: always keeps `concurrency` requests in flight
    ///
    /// Convenience wrapper over `walk_parallel_with_progress` with no
    /// progress counter.
    pub async fn walk_parallel(
        &self,
        cid: &Cid,
        path: &str,
        concurrency: usize,
    ) -> Result<Vec<WalkEntry>, HashTreeError> {
        self.walk_parallel_with_progress(cid, path, concurrency, None)
            .await
    }
1673
1674    /// Walk entire tree with parallel fetching and optional progress counter
1675    /// The counter is incremented for each node fetched (not just entries found)
1676    ///
1677    /// OPTIMIZATION: Blobs are NOT fetched - their metadata (hash, size, link_type)
1678    /// comes from the parent node's link, so we just add them directly to entries.
1679    /// This avoids downloading file contents during tree traversal.
1680    pub async fn walk_parallel_with_progress(
1681        &self,
1682        cid: &Cid,
1683        path: &str,
1684        concurrency: usize,
1685        progress: Option<&std::sync::atomic::AtomicUsize>,
1686    ) -> Result<Vec<WalkEntry>, HashTreeError> {
1687        use futures::stream::{FuturesUnordered, StreamExt};
1688        use std::collections::VecDeque;
1689        use std::sync::atomic::Ordering;
1690
1691        let mut entries = Vec::new();
1692        let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
1693        let mut active = FuturesUnordered::new();
1694
1695        // Seed with root
1696        pending.push_back((cid.clone(), path.to_string()));
1697
1698        loop {
1699            // Fill up to concurrency limit from pending queue
1700            while active.len() < concurrency {
1701                if let Some((node_cid, node_path)) = pending.pop_front() {
1702                    let store = &self.store;
1703                    let fut = async move {
1704                        let data = store
1705                            .get(&node_cid.hash)
1706                            .await
1707                            .map_err(|e| HashTreeError::Store(e.to_string()))?;
1708                        Ok::<_, HashTreeError>((node_cid, node_path, data))
1709                    };
1710                    active.push(fut);
1711                } else {
1712                    break;
1713                }
1714            }
1715
1716            // If nothing active, we're done
1717            if active.is_empty() {
1718                break;
1719            }
1720
1721            // Wait for any future to complete
1722            if let Some(result) = active.next().await {
1723                let (node_cid, node_path, data) = result?;
1724
1725                // Update progress counter
1726                if let Some(counter) = progress {
1727                    counter.fetch_add(1, Ordering::Relaxed);
1728                }
1729
1730                let data = match data {
1731                    Some(d) => d,
1732                    None => continue,
1733                };
1734
1735                // Decrypt if key is present
1736                let data = if let Some(key) = &node_cid.key {
1737                    decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1738                } else {
1739                    data
1740                };
1741
1742                let node = match try_decode_tree_node(&data) {
1743                    Some(n) => n,
1744                    None => {
1745                        // It's a blob/file - this case only happens for root
1746                        entries.push(WalkEntry {
1747                            path: node_path,
1748                            hash: node_cid.hash,
1749                            link_type: LinkType::Blob,
1750                            size: data.len() as u64,
1751                            key: node_cid.key,
1752                        });
1753                        continue;
1754                    }
1755                };
1756
1757                // It's a directory/file node
1758                let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1759                entries.push(WalkEntry {
1760                    path: node_path.clone(),
1761                    hash: node_cid.hash,
1762                    link_type: node.node_type,
1763                    size: node_size,
1764                    key: node_cid.key,
1765                });
1766
1767                // Queue children - but DON'T fetch blobs, just add them directly
1768                for link in &node.links {
1769                    let child_path = match &link.name {
1770                        Some(name) => {
1771                            if Self::is_internal_directory_link(&node, link) {
1772                                // Internal chunked nodes - inherit parent's key, same path
1773                                let sub_cid = Cid {
1774                                    hash: link.hash,
1775                                    key: node_cid.key,
1776                                };
1777                                pending.push_back((sub_cid, node_path.clone()));
1778                                continue;
1779                            }
1780                            if node_path.is_empty() {
1781                                name.clone()
1782                            } else {
1783                                format!("{}/{}", node_path, name)
1784                            }
1785                        }
1786                        None => node_path.clone(),
1787                    };
1788
1789                    // OPTIMIZATION: If it's a blob, add entry directly without fetching
1790                    // The link already contains all the metadata we need
1791                    if link.link_type == LinkType::Blob {
1792                        entries.push(WalkEntry {
1793                            path: child_path,
1794                            hash: link.hash,
1795                            link_type: LinkType::Blob,
1796                            size: link.size,
1797                            key: link.key,
1798                        });
1799                        if let Some(counter) = progress {
1800                            counter.fetch_add(1, Ordering::Relaxed);
1801                        }
1802                        continue;
1803                    }
1804
1805                    // For tree nodes (File/Dir), we need to fetch to see their children
1806                    let child_cid = Cid {
1807                        hash: link.hash,
1808                        key: link.key,
1809                    };
1810                    pending.push_back((child_cid, child_path));
1811                }
1812            }
1813        }
1814
1815        Ok(entries)
1816    }
1817
    /// Walk tree as stream
    ///
    /// Yields one `WalkEntry` per reachable node, root first, then children
    /// depth-first. Implemented as a `stream::unfold` state machine:
    /// `Init` fetches/decrypts the root and seeds a child stack,
    /// `Processing` drains that stack one entry per poll (via
    /// `process_walk_stack`), and `Done` terminates the stream.
    pub fn walk_stream(
        &self,
        cid: Cid,
        initial_path: String,
    ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            WalkStreamState::Init {
                cid,
                path: initial_path,
                tree: self,
            },
            |state| async move {
                match state {
                    WalkStreamState::Init { cid, path, tree } => {
                        // Missing root ends the stream with no items; a store
                        // failure is surfaced as a final error item.
                        let data = match tree.store.get(&cid.hash).await {
                            Ok(Some(d)) => d,
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    WalkStreamState::Done,
                                ))
                            }
                        };

                        // Decrypt if key is present
                        let data = if let Some(key) = &cid.key {
                            match decrypt_chk(&data, key) {
                                Ok(d) => d,
                                Err(e) => {
                                    return Some((
                                        Err(HashTreeError::Decryption(e.to_string())),
                                        WalkStreamState::Done,
                                    ))
                                }
                            }
                        } else {
                            data
                        };

                        let node = match try_decode_tree_node(&data) {
                            Some(n) => n,
                            None => {
                                // Blob data: the root itself is a raw blob,
                                // so the stream yields exactly one entry.
                                let entry = WalkEntry {
                                    path,
                                    hash: cid.hash,
                                    link_type: LinkType::Blob,
                                    size: data.len() as u64,
                                    key: cid.key,
                                };
                                return Some((Ok(entry), WalkStreamState::Done));
                            }
                        };

                        // Node size is the sum of its links' sizes.
                        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
                        let entry = WalkEntry {
                            path: path.clone(),
                            hash: cid.hash,
                            link_type: node.node_type,
                            size: node_size,
                            key: cid.key,
                        };

                        // Create stack with children to process. Links are
                        // pushed in reverse so popping yields original order.
                        let mut stack: Vec<WalkStackItem> = Vec::new();
                        let uses_legacy_fanout = Self::node_uses_legacy_directory_fanout(&node);
                        for link in node.links.into_iter().rev() {
                            let is_internal = Self::is_internal_directory_link_with_legacy_fanout(
                                &link,
                                uses_legacy_fanout,
                            );
                            // Internal fan-out links keep the parent's path;
                            // named children extend it.
                            let child_path = match &link.name {
                                Some(name) if !is_internal => {
                                    if path.is_empty() {
                                        name.clone()
                                    } else {
                                        format!("{}/{}", path, name)
                                    }
                                }
                                _ => path.clone(),
                            };
                            // Child nodes use their own key from link
                            stack.push(WalkStackItem {
                                hash: link.hash,
                                path: child_path,
                                key: link.key,
                            });
                        }

                        Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
                    }
                    WalkStreamState::Processing { mut stack, tree } => {
                        tree.process_walk_stack(&mut stack).await
                    }
                    WalkStreamState::Done => None,
                }
            },
        ))
    }
1919
    /// Pop items off the walk stack until one yields a `WalkEntry` (or the
    /// stack is exhausted, ending the stream with `None`).
    ///
    /// Returns the emitted item together with the next `Processing` state;
    /// the stack is moved out via `mem::take` because `unfold` states are
    /// owned. Missing content is skipped silently (`Ok(None)` → `continue`).
    ///
    /// NOTE(review): unlike the `Init` state in `walk_stream`, this does NOT
    /// decrypt with `item.key` before decoding — encrypted child nodes would
    /// fail `try_decode_tree_node` and be reported as blobs. Presumably this
    /// path is only used for public trees; confirm.
    async fn process_walk_stack<'a>(
        &'a self,
        stack: &mut Vec<WalkStackItem>,
    ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                Ok(None) => continue,
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        WalkStreamState::Done,
                    ))
                }
            };

            let node = match try_decode_tree_node(&data) {
                Some(n) => n,
                None => {
                    // Blob data: emit a leaf entry carrying the key from the
                    // parent link.
                    let entry = WalkEntry {
                        path: item.path,
                        hash: item.hash,
                        link_type: LinkType::Blob,
                        size: data.len() as u64,
                        key: item.key,
                    };
                    return Some((
                        Ok(entry),
                        WalkStreamState::Processing {
                            stack: std::mem::take(stack),
                            tree: self,
                        },
                    ));
                }
            };

            let node_size: u64 = node.links.iter().map(|l| l.size).sum();
            let entry = WalkEntry {
                path: item.path.clone(),
                hash: item.hash,
                link_type: node.node_type,
                // NOTE(review): directories are treated as unencrypted here,
                // but `walk_stream`'s Init state emits `cid.key` for the root
                // node — confirm the asymmetry is intended.
                key: None, // directories are not encrypted
            };

            // Push children to stack (reversed so pop order matches link order)
            let uses_legacy_fanout = Self::node_uses_legacy_directory_fanout(&node);
            for link in node.links.into_iter().rev() {
                let is_internal =
                    Self::is_internal_directory_link_with_legacy_fanout(&link, uses_legacy_fanout);
                // Internal fan-out links keep the parent's path; named
                // children extend it.
                let child_path = match &link.name {
                    Some(name) if !is_internal => {
                        if item.path.is_empty() {
                            name.clone()
                        } else {
                            format!("{}/{}", item.path, name)
                        }
                    }
                    _ => item.path.clone(),
                };
                stack.push(WalkStackItem {
                    hash: link.hash,
                    path: child_path,
                    key: link.key,
                });
            }

            return Some((
                Ok(entry),
                WalkStreamState::Processing {
                    stack: std::mem::take(stack),
                    tree: self,
                },
            ));
        }
        None
    }
1998
1999    // ============ EDIT ============
2000
2001    /// Add or update an entry in a directory
2002    /// Returns new root Cid (immutable operation)
2003    pub async fn set_entry(
2004        &self,
2005        root: &Cid,
2006        path: &[&str],
2007        name: &str,
2008        entry_cid: &Cid,
2009        size: u64,
2010        link_type: LinkType,
2011    ) -> Result<Cid, HashTreeError> {
2012        let dir_cid = self.resolve_path_array(root, path).await?;
2013        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
2014
2015        let entries = self.list_directory(&dir_cid).await?;
2016        let mut new_entries: Vec<DirEntry> = entries
2017            .into_iter()
2018            .filter(|e| e.name != name)
2019            .map(|e| DirEntry {
2020                name: e.name,
2021                hash: e.hash,
2022                size: e.size,
2023                key: e.key,
2024                link_type: e.link_type,
2025                meta: e.meta,
2026            })
2027            .collect();
2028
2029        new_entries.push(DirEntry {
2030            name: name.to_string(),
2031            hash: entry_cid.hash,
2032            size,
2033            key: entry_cid.key,
2034            link_type,
2035            meta: None,
2036        });
2037
2038        let new_dir_cid = self.put_directory(new_entries).await?;
2039        self.rebuild_path(root, path, new_dir_cid).await
2040    }
2041
2042    /// Remove an entry from a directory
2043    /// Returns new root Cid
2044    pub async fn remove_entry(
2045        &self,
2046        root: &Cid,
2047        path: &[&str],
2048        name: &str,
2049    ) -> Result<Cid, HashTreeError> {
2050        let dir_cid = self.resolve_path_array(root, path).await?;
2051        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
2052
2053        let entries = self.list_directory(&dir_cid).await?;
2054        let new_entries: Vec<DirEntry> = entries
2055            .into_iter()
2056            .filter(|e| e.name != name)
2057            .map(|e| DirEntry {
2058                name: e.name,
2059                hash: e.hash,
2060                size: e.size,
2061                key: e.key,
2062                link_type: e.link_type,
2063                meta: e.meta,
2064            })
2065            .collect();
2066
2067        let new_dir_cid = self.put_directory(new_entries).await?;
2068        self.rebuild_path(root, path, new_dir_cid).await
2069    }
2070
2071    /// Rename an entry in a directory
2072    /// Returns new root Cid
2073    pub async fn rename_entry(
2074        &self,
2075        root: &Cid,
2076        path: &[&str],
2077        old_name: &str,
2078        new_name: &str,
2079    ) -> Result<Cid, HashTreeError> {
2080        if old_name == new_name {
2081            return Ok(root.clone());
2082        }
2083
2084        let dir_cid = self.resolve_path_array(root, path).await?;
2085        let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
2086
2087        let entries = self.list_directory(&dir_cid).await?;
2088        let entry = entries
2089            .iter()
2090            .find(|e| e.name == old_name)
2091            .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
2092
2093        let entry_hash = entry.hash;
2094        let entry_size = entry.size;
2095        let entry_key = entry.key;
2096        let entry_link_type = entry.link_type;
2097        let entry_meta = entry.meta.clone();
2098
2099        let new_entries: Vec<DirEntry> = entries
2100            .into_iter()
2101            .filter(|e| e.name != old_name)
2102            .map(|e| DirEntry {
2103                name: e.name,
2104                hash: e.hash,
2105                size: e.size,
2106                key: e.key,
2107                link_type: e.link_type,
2108                meta: e.meta,
2109            })
2110            .chain(std::iter::once(DirEntry {
2111                name: new_name.to_string(),
2112                hash: entry_hash,
2113                size: entry_size,
2114                key: entry_key,
2115                link_type: entry_link_type,
2116                meta: entry_meta,
2117            }))
2118            .collect();
2119
2120        let new_dir_cid = self.put_directory(new_entries).await?;
2121        self.rebuild_path(root, path, new_dir_cid).await
2122    }
2123
2124    /// Move an entry to a different directory
2125    /// Returns new root Cid
2126    pub async fn move_entry(
2127        &self,
2128        root: &Cid,
2129        source_path: &[&str],
2130        name: &str,
2131        target_path: &[&str],
2132    ) -> Result<Cid, HashTreeError> {
2133        let source_dir_cid = self.resolve_path_array(root, source_path).await?;
2134        let source_dir_cid =
2135            source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
2136
2137        let source_entries = self.list_directory(&source_dir_cid).await?;
2138        let entry = source_entries
2139            .iter()
2140            .find(|e| e.name == name)
2141            .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
2142
2143        let entry_cid = Cid {
2144            hash: entry.hash,
2145            key: entry.key,
2146        };
2147        let entry_size = entry.size;
2148        let entry_link_type = entry.link_type;
2149
2150        // Remove from source
2151        let new_root = self.remove_entry(root, source_path, name).await?;
2152
2153        // Add to target
2154        self.set_entry(
2155            &new_root,
2156            target_path,
2157            name,
2158            &entry_cid,
2159            entry_size,
2160            entry_link_type,
2161        )
2162        .await
2163    }
2164
2165    async fn resolve_path_array(
2166        &self,
2167        root: &Cid,
2168        path: &[&str],
2169    ) -> Result<Option<Cid>, HashTreeError> {
2170        if path.is_empty() {
2171            return Ok(Some(root.clone()));
2172        }
2173        self.resolve_path(root, &path.join("/")).await
2174    }
2175
2176    async fn rebuild_path(
2177        &self,
2178        root: &Cid,
2179        path: &[&str],
2180        new_child: Cid,
2181    ) -> Result<Cid, HashTreeError> {
2182        if path.is_empty() {
2183            return Ok(new_child);
2184        }
2185
2186        let mut child_cid = new_child;
2187        let parts: Vec<&str> = path.to_vec();
2188
2189        for i in (0..parts.len()).rev() {
2190            let child_name = parts[i];
2191            let parent_path = &parts[..i];
2192
2193            let parent_cid = if parent_path.is_empty() {
2194                root.clone()
2195            } else {
2196                self.resolve_path_array(root, parent_path)
2197                    .await?
2198                    .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
2199            };
2200
2201            let parent_entries = self.list_directory(&parent_cid).await?;
2202            let new_parent_entries: Vec<DirEntry> = parent_entries
2203                .into_iter()
2204                .map(|e| {
2205                    if e.name == child_name {
2206                        DirEntry {
2207                            name: e.name,
2208                            hash: child_cid.hash,
2209                            size: 0, // Directories don't have a meaningful size in the link
2210                            key: child_cid.key,
2211                            link_type: e.link_type,
2212                            meta: e.meta,
2213                        }
2214                    } else {
2215                        DirEntry {
2216                            name: e.name,
2217                            hash: e.hash,
2218                            size: e.size,
2219                            key: e.key,
2220                            link_type: e.link_type,
2221                            meta: e.meta,
2222                        }
2223                    }
2224                })
2225                .collect();
2226
2227            child_cid = self.put_directory(new_parent_entries).await?;
2228        }
2229
2230        Ok(child_cid)
2231    }
2232
2233    // ============ UTILITY ============
2234
2235    /// Get the underlying store
2236    pub fn get_store(&self) -> Arc<S> {
2237        self.store.clone()
2238    }
2239
    /// Get chunk size configuration
    ///
    /// The configured number of bytes per leaf chunk when writing files.
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }
2244
    /// Get max links configuration
    ///
    /// The configured maximum number of links per tree node (fan-out).
    pub fn max_links(&self) -> usize {
        self.max_links
    }
2249}
2250
2251// Internal state types for streaming
2252
/// Work item for the plain read stream: a node/chunk hash still to fetch.
enum StreamStackItem {
    Hash(Hash),
}
2256
/// State machine for the unencrypted read stream (`stream::unfold` state).
enum ReadStreamState<'a, S: Store> {
    /// Not started yet: fetch `hash` first.
    Init {
        hash: Hash,
        tree: &'a HashTree<S>,
    },
    /// Draining a depth-first stack of pending nodes.
    Processing {
        stack: Vec<StreamStackItem>,
        tree: &'a HashTree<S>,
    },
    /// Stream exhausted.
    Done,
}
2268
/// Pending node in a streaming walk: its hash, the logical path accumulated
/// so far, and the (optional) decryption key taken from the parent link.
struct WalkStackItem {
    hash: Hash,
    path: String,
    key: Option<[u8; 32]>,
}
2274
/// State machine for `walk_stream` (`stream::unfold` state).
enum WalkStreamState<'a, S: Store> {
    /// Not started yet: fetch and emit the root first.
    Init {
        cid: Cid,
        path: String,
        tree: &'a HashTree<S>,
    },
    /// Draining a depth-first stack of discovered children.
    Processing {
        stack: Vec<WalkStackItem>,
        tree: &'a HashTree<S>,
    },
    /// Stream exhausted.
    Done,
}
2287
2288// Encrypted stream state types
/// Work item for the encrypted read stream: hash plus its decryption key
/// (None when the node is stored in plaintext).
struct EncryptedStackItem {
    hash: Hash,
    key: Option<[u8; 32]>,
}
2293
/// State machine for the encrypted read stream (`stream::unfold` state).
enum EncryptedStreamState<'a, S: Store> {
    /// Not started yet: fetch `hash` and decrypt it with `key`.
    Init {
        hash: Hash,
        key: [u8; 32],
        tree: &'a HashTree<S>,
    },
    /// Draining a depth-first stack of pending encrypted nodes.
    Processing {
        stack: Vec<EncryptedStackItem>,
        tree: &'a HashTree<S>,
    },
    /// Stream exhausted.
    Done,
}
2306
2307/// Verify tree integrity - checks that all referenced hashes exist
2308pub async fn verify_tree<S: Store>(
2309    store: Arc<S>,
2310    root_hash: &Hash,
2311) -> Result<crate::reader::VerifyResult, HashTreeError> {
2312    let mut missing = Vec::new();
2313    let mut visited = std::collections::HashSet::new();
2314
2315    verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
2316
2317    Ok(crate::reader::VerifyResult {
2318        valid: missing.is_empty(),
2319        missing,
2320    })
2321}
2322
2323async fn verify_recursive<S: Store>(
2324    store: Arc<S>,
2325    hash: &Hash,
2326    missing: &mut Vec<Hash>,
2327    visited: &mut std::collections::HashSet<String>,
2328) -> Result<(), HashTreeError> {
2329    let hex = to_hex(hash);
2330    if visited.contains(&hex) {
2331        return Ok(());
2332    }
2333    visited.insert(hex);
2334
2335    let data = match store
2336        .get(hash)
2337        .await
2338        .map_err(|e| HashTreeError::Store(e.to_string()))?
2339    {
2340        Some(d) => d,
2341        None => {
2342            missing.push(*hash);
2343            return Ok(());
2344        }
2345    };
2346
2347    if is_tree_node(&data) {
2348        let node = decode_tree_node(&data)?;
2349        for link in &node.links {
2350            Box::pin(verify_recursive(
2351                store.clone(),
2352                &link.hash,
2353                missing,
2354                visited,
2355            ))
2356            .await?;
2357        }
2358    }
2359
2360    Ok(())
2361}
2362
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;

    /// Build a fresh in-memory store plus an UNENCRYPTED HashTree over it.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        // Use public (unencrypted) mode for these tests
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    #[tokio::test]
    async fn test_put_and_read_blob() {
        let (_store, tree) = make_tree();

        let data = vec![1, 2, 3, 4, 5];
        let hash = tree.put_blob(&data).await.unwrap();

        let result = tree.get_blob(&hash).await.unwrap();
        assert_eq!(result, Some(data));
    }

    #[tokio::test]
    async fn test_put_and_read_file_small() {
        let (_store, tree) = make_tree();

        let data = b"Hello, World!";
        let (cid, size) = tree.put_file(data).await.unwrap();

        assert_eq!(size, data.len() as u64);

        let read_data = tree.read_file(&cid.hash).await.unwrap();
        assert_eq!(read_data, Some(data.to_vec()));
    }

    #[tokio::test]
    async fn test_put_and_read_directory() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();

        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        // Listing should return both entries (order not asserted).
        let entries = tree.list_directory(&dir_cid).await.unwrap();
        assert_eq!(entries.len(), 2);
        let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
        assert!(names.contains(&"a.txt"));
        assert!(names.contains(&"b.txt"));
    }

    #[tokio::test]
    async fn test_is_directory() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"data").await.unwrap();
        let dir_cid = tree.put_directory(vec![]).await.unwrap();

        assert!(!tree.is_directory(&file_hash).await.unwrap());
        assert!(tree.is_directory(&dir_cid.hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_plaintext_directory_with_stray_key_still_lists_as_directory() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"data").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![DirEntry::new("thumbnail.jpg", file_hash).with_size(4)])
            .await
            .unwrap();
        // Simulate a legacy CID that carries a key even though the directory
        // was stored in plaintext.
        let legacy_cid = Cid {
            hash: dir_cid.hash,
            key: Some([7u8; 32]),
        };

        assert!(tree.is_dir(&legacy_cid).await.unwrap());
        let entries = tree.list_directory(&legacy_cid).await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].name, "thumbnail.jpg");
    }

    #[tokio::test]
    async fn test_resolve_path() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"nested").await.unwrap();
        let sub_dir = tree
            .put_directory(vec![DirEntry::new("file.txt", file_hash).with_size(6)])
            .await
            .unwrap();
        let root_dir = tree
            .put_directory(vec![DirEntry::new("subdir", sub_dir.hash)])
            .await
            .unwrap();

        let resolved = tree
            .resolve_path(&root_dir, "subdir/file.txt")
            .await
            .unwrap();
        assert_eq!(resolved.map(|c| c.hash), Some(file_hash));
    }

    // ============ UNIFIED API TESTS ============

    #[tokio::test]
    async fn test_unified_put_get_public() {
        let store = Arc::new(MemoryStore::new());
        // Use .public() to disable encryption
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let data = b"Hello, public world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_none()); // No key for public content

        let retrieved = tree.get(&cid, None).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_unified_put_get_encrypted() {
        let store = Arc::new(MemoryStore::new());
        // Default config has encryption enabled
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Hello, encrypted world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some()); // Has encryption key

        let retrieved = tree.get(&cid, None).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_unified_put_get_encrypted_chunked() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).with_chunk_size(100));

        // Data larger than chunk size
        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
        let (cid, size) = tree.put(&data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid, None).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_encrypted_range_reads_do_not_require_unrelated_leaf_chunks() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).with_chunk_size(100));

        let data: Vec<u8> = (0..350).map(|i| (i % 256) as u8).collect();
        let (cid, _) = tree.put(&data).await.unwrap();
        let root = tree.get_node(&cid).await.unwrap().unwrap();

        // Delete a leaf chunk outside the requested range; size + range
        // reads below must still succeed.
        store.delete(&root.links[3].hash).await.unwrap();

        assert_eq!(tree.get_size_cid(&cid).await.unwrap(), data.len() as u64);
        let range = tree
            .read_file_range_cid(&cid, 0, Some(50))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(range, data[..50].to_vec());
    }

    #[tokio::test]
    async fn test_encrypted_range_reads_do_not_require_unrelated_nested_subtrees() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(
            HashTreeConfig::new(store.clone())
                .with_chunk_size(100)
                .with_max_links(2),
        );

        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
        let (cid, _) = tree.put(&data).await.unwrap();
        let root = tree.get_node(&cid).await.unwrap().unwrap();

        // Delete an entire subtree not covered by the requested range.
        store.delete(&root.links[1].hash).await.unwrap();

        assert_eq!(tree.get_size_cid(&cid).await.unwrap(), data.len() as u64);
        let range = tree
            .read_file_range_cid(&cid, 0, Some(50))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(range, data[..50].to_vec());
    }

    #[tokio::test]
    async fn test_cid_deterministic() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Same content produces same CID";

        let (cid1, _) = tree.put(data).await.unwrap();
        let (cid2, _) = tree.put(data).await.unwrap();

        // CHK: same content = same hash AND same key
        assert_eq!(cid1.hash, cid2.hash);
        assert_eq!(cid1.key, cid2.key);
        assert_eq!(cid1.to_string(), cid2.to_string());
    }

    #[tokio::test]
    async fn test_cid_to_string_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        // Public CID is just the hash (64 hex chars)
        assert_eq!(s.len(), 64);
        assert!(!s.contains(':'));
    }

    #[tokio::test]
    async fn test_cid_to_string_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        // Encrypted CID is "hash:key" (64 + 1 + 64 = 129 chars)
        assert_eq!(s.len(), 129);
        assert!(s.contains(':'));
    }
}