1use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::io::AsyncRead;
10use futures::stream::{self, Stream};
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{
15 decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node,
16};
17use crate::hash::sha256;
18use crate::reader::{ReaderError, TreeEntry, WalkEntry};
19use crate::store::Store;
20use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
21
22use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
23
24#[path = "hashtree/stream.rs"]
25mod read_stream;
26mod walk;
27
28#[derive(Clone)]
30pub struct HashTreeConfig<S: Store> {
31 pub store: Arc<S>,
32 pub chunk_size: usize,
33 pub max_links: usize,
34 pub encrypted: bool,
36}
37
38impl<S: Store> HashTreeConfig<S> {
39 pub fn new(store: Arc<S>) -> Self {
40 Self {
41 store,
42 chunk_size: DEFAULT_CHUNK_SIZE,
43 max_links: DEFAULT_MAX_LINKS,
44 encrypted: true,
45 }
46 }
47
48 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
49 self.chunk_size = chunk_size;
50 self
51 }
52
53 pub fn with_max_links(mut self, max_links: usize) -> Self {
54 self.max_links = max_links;
55 self
56 }
57
58 pub fn public(mut self) -> Self {
60 self.encrypted = false;
61 self
62 }
63}
64
65#[derive(Debug, thiserror::Error)]
67pub enum HashTreeError {
68 #[error("Store error: {0}")]
69 Store(String),
70 #[error("Codec error: {0}")]
71 Codec(#[from] crate::codec::CodecError),
72 #[error("Missing chunk: {0}")]
73 MissingChunk(String),
74 #[error("Path not found: {0}")]
75 PathNotFound(String),
76 #[error("Entry not found: {0}")]
77 EntryNotFound(String),
78 #[error("Encryption error: {0}")]
79 Encryption(String),
80 #[error("Decryption error: {0}")]
81 Decryption(String),
82 #[error("Content size {actual_size} exceeds max_size {max_size}")]
83 SizeLimitExceeded { max_size: u64, actual_size: u64 },
84}
85
86impl From<BuilderError> for HashTreeError {
87 fn from(e: BuilderError) -> Self {
88 match e {
89 BuilderError::Store(s) => HashTreeError::Store(s),
90 BuilderError::Codec(c) => HashTreeError::Codec(c),
91 BuilderError::Encryption(s) => HashTreeError::Encryption(s),
92 }
93 }
94}
95
96impl From<ReaderError> for HashTreeError {
97 fn from(e: ReaderError) -> Self {
98 match e {
99 ReaderError::Store(s) => HashTreeError::Store(s),
100 ReaderError::Codec(c) => HashTreeError::Codec(c),
101 ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
102 ReaderError::Decryption(s) => HashTreeError::Encryption(s),
103 ReaderError::MissingKey => {
104 HashTreeError::Encryption("missing decryption key".to_string())
105 }
106 }
107 }
108}
109
110pub struct HashTree<S: Store> {
112 store: Arc<S>,
113 chunk_size: usize,
114 max_links: usize,
115 encrypted: bool,
116}
117
118impl<S: Store> HashTree<S> {
119 fn is_legacy_internal_group_name(name: &str) -> bool {
120 name.starts_with('_') && !name.starts_with("_chunk_") && name.chars().count() == 2
121 }
122
123 fn node_uses_legacy_directory_fanout(node: &TreeNode) -> bool {
124 !node.links.is_empty()
125 && node.links.iter().all(|link| {
126 let Some(name) = link.name.as_deref() else {
127 return false;
128 };
129 Self::is_legacy_internal_group_name(name) && link.link_type == LinkType::Dir
130 })
131 }
132
133 fn is_internal_directory_link_with_legacy_fanout(
134 link: &Link,
135 uses_legacy_fanout: bool,
136 ) -> bool {
137 let Some(name) = link.name.as_deref() else {
138 return false;
139 };
140
141 if name.starts_with("_chunk_") {
142 return true;
143 }
144
145 uses_legacy_fanout
146 && Self::is_legacy_internal_group_name(name)
147 && link.link_type == LinkType::Dir
148 }
149
150 fn is_internal_directory_link(node: &TreeNode, link: &Link) -> bool {
151 Self::is_internal_directory_link_with_legacy_fanout(
152 link,
153 Self::node_uses_legacy_directory_fanout(node),
154 )
155 }
156
157 pub fn new(config: HashTreeConfig<S>) -> Self {
158 Self {
159 store: config.store,
160 chunk_size: config.chunk_size,
161 max_links: config.max_links,
162 encrypted: config.encrypted,
163 }
164 }
165
166 pub fn is_encrypted(&self) -> bool {
168 self.encrypted
169 }
170
171 pub async fn put(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
176 let size = data.len() as u64;
177
178 if data.len() <= self.chunk_size {
180 let (hash, key) = self.put_chunk_internal(data).await?;
181 return Ok((Cid { hash, key }, size));
182 }
183
184 let mut links: Vec<Link> = Vec::new();
186 let mut offset = 0;
187
188 while offset < data.len() {
189 let end = (offset + self.chunk_size).min(data.len());
190 let chunk = &data[offset..end];
191 let chunk_size = chunk.len() as u64;
192 let (hash, key) = self.put_chunk_internal(chunk).await?;
193 links.push(Link {
194 hash,
195 name: None,
196 size: chunk_size,
197 key,
198 link_type: LinkType::Blob, meta: None,
200 });
201 offset = end;
202 }
203
204 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
206 Ok((
207 Cid {
208 hash: root_hash,
209 key: root_key,
210 },
211 size,
212 ))
213 }
214
215 pub async fn get(
220 &self,
221 cid: &Cid,
222 max_size: Option<u64>,
223 ) -> Result<Option<Vec<u8>>, HashTreeError> {
224 if let Some(key) = cid.key {
225 self.get_encrypted(&cid.hash, &key, max_size).await
226 } else {
227 self.read_file_with_limit(&cid.hash, max_size).await
228 }
229 }
230
231 pub async fn put_stream<R: AsyncRead + Unpin>(
237 &self,
238 mut reader: R,
239 ) -> Result<(Cid, u64), HashTreeError> {
240 let mut buffer = vec![0u8; self.chunk_size];
241 let mut links = Vec::new();
242 let mut total_size: u64 = 0;
243 let mut consistent_key: Option<[u8; 32]> = None;
244
245 loop {
246 let mut chunk = Vec::new();
247 let mut bytes_read = 0;
248
249 while bytes_read < self.chunk_size {
251 let n = reader
252 .read(&mut buffer[..self.chunk_size - bytes_read])
253 .await
254 .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
255 if n == 0 {
256 break; }
258 chunk.extend_from_slice(&buffer[..n]);
259 bytes_read += n;
260 }
261
262 if chunk.is_empty() {
263 break; }
265
266 let chunk_len = chunk.len() as u64;
267 total_size += chunk_len;
268
269 let (hash, key) = self.put_chunk_internal(&chunk).await?;
270
271 if links.is_empty() {
273 consistent_key = key;
274 } else if consistent_key != key {
275 consistent_key = None;
276 }
277
278 links.push(Link {
279 hash,
280 name: None,
281 size: chunk_len,
282 key,
283 link_type: LinkType::Blob, meta: None,
285 });
286 }
287
288 if links.is_empty() {
289 let (hash, key) = self.put_chunk_internal(&[]).await?;
291 return Ok((Cid { hash, key }, 0));
292 }
293
294 let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
296 Ok((
297 Cid {
298 hash: root_hash,
299 key: root_key,
300 },
301 total_size,
302 ))
303 }
304
305 async fn put_chunk_internal(
307 &self,
308 data: &[u8],
309 ) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
310 if self.encrypted {
311 let (encrypted, key) =
312 encrypt_chk(data).map_err(|e| HashTreeError::Encryption(e.to_string()))?;
313 let hash = sha256(&encrypted);
314 self.store
315 .put(hash, encrypted)
316 .await
317 .map_err(|e| HashTreeError::Store(e.to_string()))?;
318 Ok((hash, Some(key)))
319 } else {
320 let hash = self.put_blob(data).await?;
321 Ok((hash, None))
322 }
323 }
324
325 async fn build_tree_internal(
327 &self,
328 links: Vec<Link>,
329 total_size: Option<u64>,
330 ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
331 if links.len() == 1 {
333 if let Some(ts) = total_size {
334 if links[0].size == ts {
335 return Ok((links[0].hash, links[0].key));
336 }
337 }
338 }
339
340 if links.len() <= self.max_links {
341 let node = TreeNode {
342 node_type: LinkType::File,
343 links,
344 };
345 let (data, _) = encode_and_hash(&node)?;
346
347 if self.encrypted {
348 let (encrypted, key) =
349 encrypt_chk(&data).map_err(|e| HashTreeError::Encryption(e.to_string()))?;
350 let hash = sha256(&encrypted);
351 self.store
352 .put(hash, encrypted)
353 .await
354 .map_err(|e| HashTreeError::Store(e.to_string()))?;
355 return Ok((hash, Some(key)));
356 }
357
358 let hash = sha256(&data);
360 self.store
361 .put(hash, data)
362 .await
363 .map_err(|e| HashTreeError::Store(e.to_string()))?;
364 return Ok((hash, None));
365 }
366
367 let mut sub_links = Vec::new();
369 for batch in links.chunks(self.max_links) {
370 let batch_size: u64 = batch.iter().map(|l| l.size).sum();
371 let (hash, key) =
372 Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
373 sub_links.push(Link {
374 hash,
375 name: None,
376 size: batch_size,
377 key,
378 link_type: LinkType::File, meta: None,
380 });
381 }
382
383 Box::pin(self.build_tree_internal(sub_links, total_size)).await
384 }
385
386 async fn get_encrypted(
388 &self,
389 hash: &Hash,
390 key: &EncryptionKey,
391 max_size: Option<u64>,
392 ) -> Result<Option<Vec<u8>>, HashTreeError> {
393 let decrypted = match self.get_encrypted_root(hash, key).await? {
394 Some(data) => data,
395 None => return Ok(None),
396 };
397
398 if is_tree_node(&decrypted) {
400 let node = decode_tree_node(&decrypted)?;
401 let declared_size: u64 = node.links.iter().map(|l| l.size).sum();
402 Self::ensure_size_limit(max_size, declared_size)?;
403
404 let mut bytes_read = 0u64;
405 let assembled = self
406 .assemble_encrypted_chunks_limited(&node, max_size, &mut bytes_read)
407 .await?;
408 return Ok(Some(assembled));
409 }
410
411 Self::ensure_size_limit(max_size, decrypted.len() as u64)?;
413 Ok(Some(decrypted))
414 }
415
416 async fn get_encrypted_root(
417 &self,
418 hash: &Hash,
419 key: &EncryptionKey,
420 ) -> Result<Option<Vec<u8>>, HashTreeError> {
421 self.get_cid_root_bytes(&Cid {
422 hash: *hash,
423 key: Some(*key),
424 })
425 .await
426 .map_err(|err| match err {
427 HashTreeError::Decryption(message) => HashTreeError::Encryption(message),
428 other => other,
429 })
430 }
431
432 fn ensure_size_limit(max_size: Option<u64>, actual_size: u64) -> Result<(), HashTreeError> {
433 if let Some(max_size) = max_size {
434 if actual_size > max_size {
435 return Err(HashTreeError::SizeLimitExceeded {
436 max_size,
437 actual_size,
438 });
439 }
440 }
441 Ok(())
442 }
443
444 async fn assemble_encrypted_chunks_limited(
446 &self,
447 node: &TreeNode,
448 max_size: Option<u64>,
449 bytes_read: &mut u64,
450 ) -> Result<Vec<u8>, HashTreeError> {
451 let mut parts: Vec<Vec<u8>> = Vec::new();
452
453 for link in &node.links {
454 let projected = (*bytes_read).saturating_add(link.size);
455 Self::ensure_size_limit(max_size, projected)?;
456
457 let chunk_key = link
458 .key
459 .ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
460
461 let encrypted_child = self
462 .store
463 .get(&link.hash)
464 .await
465 .map_err(|e| HashTreeError::Store(e.to_string()))?
466 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
467
468 let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
469 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
470
471 if is_tree_node(&decrypted) {
472 let child_node = decode_tree_node(&decrypted)?;
474 let child_data = Box::pin(self.assemble_encrypted_chunks_limited(
475 &child_node,
476 max_size,
477 bytes_read,
478 ))
479 .await?;
480 parts.push(child_data);
481 } else {
482 let projected = (*bytes_read).saturating_add(decrypted.len() as u64);
484 Self::ensure_size_limit(max_size, projected)?;
485 *bytes_read = projected;
486 parts.push(decrypted);
487 }
488 }
489
490 let total_len: usize = parts.iter().map(|p| p.len()).sum();
491 let mut result = Vec::with_capacity(total_len);
492 for part in parts {
493 result.extend_from_slice(&part);
494 }
495
496 Ok(result)
497 }
498
499 pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
504 let hash = sha256(data);
505 self.store
506 .put(hash, data.to_vec())
507 .await
508 .map_err(|e| HashTreeError::Store(e.to_string()))?;
509 Ok(hash)
510 }
511
512 pub async fn put_file(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
515 let size = data.len() as u64;
516
517 if data.len() <= self.chunk_size {
519 let (hash, key) = self.put_chunk_internal(data).await?;
520 return Ok((Cid { hash, key }, size));
521 }
522
523 let mut links: Vec<Link> = Vec::new();
525 let mut offset = 0;
526
527 while offset < data.len() {
528 let end = (offset + self.chunk_size).min(data.len());
529 let chunk = &data[offset..end];
530 let chunk_size = (end - offset) as u64;
531
532 let (hash, key) = self.put_chunk_internal(chunk).await?;
533 links.push(Link {
534 hash,
535 name: None,
536 size: chunk_size,
537 key,
538 link_type: LinkType::Blob, meta: None,
540 });
541 offset = end;
542 }
543
544 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
546 Ok((
547 Cid {
548 hash: root_hash,
549 key: root_key,
550 },
551 size,
552 ))
553 }
554
555 pub async fn put_directory(&self, entries: Vec<DirEntry>) -> Result<Cid, HashTreeError> {
561 let mut sorted = entries;
563 sorted.sort_by(|a, b| a.name.cmp(&b.name));
564
565 let links: Vec<Link> = sorted
566 .into_iter()
567 .map(|e| Link {
568 hash: e.hash,
569 name: Some(e.name),
570 size: e.size,
571 key: e.key,
572 link_type: e.link_type,
573 meta: e.meta,
574 })
575 .collect();
576
577 let node = TreeNode {
579 node_type: LinkType::Dir,
580 links,
581 };
582 let (data, _plain_hash) = encode_and_hash(&node)?;
583
584 let (cid, _size) = self.put(&data).await?;
589 Ok(cid)
590 }
591
592 pub async fn put_tree_node(&self, links: Vec<Link>) -> Result<Hash, HashTreeError> {
594 let node = TreeNode {
595 node_type: LinkType::Dir,
596 links,
597 };
598
599 let (data, hash) = encode_and_hash(&node)?;
600 self.store
601 .put(hash, data)
602 .await
603 .map_err(|e| HashTreeError::Store(e.to_string()))?;
604 Ok(hash)
605 }
606
607 pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
611 self.store
612 .get(hash)
613 .await
614 .map_err(|e| HashTreeError::Store(e.to_string()))
615 }
616
617 pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
619 let data = match self
620 .store
621 .get(hash)
622 .await
623 .map_err(|e| HashTreeError::Store(e.to_string()))?
624 {
625 Some(d) => d,
626 None => return Ok(None),
627 };
628
629 if !is_tree_node(&data) {
630 return Ok(None);
631 }
632
633 let node = decode_tree_node(&data)?;
634 Ok(Some(node))
635 }
636
637 async fn get_cid_root_bytes(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
638 let data = match self
639 .store
640 .get(&cid.hash)
641 .await
642 .map_err(|e| HashTreeError::Store(e.to_string()))?
643 {
644 Some(d) => d,
645 None => return Ok(None),
646 };
647
648 let Some(key) = &cid.key else {
649 return Ok(Some(data));
650 };
651
652 let raw_is_tree = is_tree_node(&data);
653 match decrypt_chk(&data, key) {
654 Ok(decrypted) => {
655 if is_tree_node(&decrypted) || !raw_is_tree {
656 Ok(Some(decrypted))
657 } else {
658 Ok(Some(data))
659 }
660 }
661 Err(err) => {
662 if raw_is_tree {
663 Ok(Some(data))
664 } else {
665 Err(HashTreeError::Decryption(err.to_string()))
666 }
667 }
668 }
669 }
670
671 pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
673 let decrypted = match self.get_cid_root_bytes(cid).await? {
674 Some(d) => d,
675 None => return Ok(None),
676 };
677
678 if !is_tree_node(&decrypted) {
679 return Ok(None);
680 }
681
682 let node = decode_tree_node(&decrypted)?;
683 Ok(Some(node))
684 }
685
686 pub async fn get_directory_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
689 let decrypted = match self.get_cid_root_bytes(cid).await? {
690 Some(d) => d,
691 None => return Ok(None),
692 };
693
694 if !is_tree_node(&decrypted) {
695 return Ok(None);
696 }
697
698 let node = decode_tree_node(&decrypted)?;
699
700 if node.node_type == LinkType::File {
702 let mut bytes_read = 0u64;
703 let assembled = self
704 .assemble_chunks_limited(&node, None, &mut bytes_read)
705 .await?;
706 if is_tree_node(&assembled) {
707 let inner_node = decode_tree_node(&assembled)?;
708 return Ok(Some(inner_node));
709 }
710 }
711
712 Ok(Some(node))
713 }
714
715 pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
717 let data = match self
718 .store
719 .get(hash)
720 .await
721 .map_err(|e| HashTreeError::Store(e.to_string()))?
722 {
723 Some(d) => d,
724 None => return Ok(false),
725 };
726 Ok(is_tree_node(&data))
727 }
728
729 pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
731 Ok(matches!(
732 self.get_directory_node(cid).await?,
733 Some(node) if node.node_type == LinkType::Dir
734 ))
735 }
736
737 pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
739 let data = match self
740 .store
741 .get(hash)
742 .await
743 .map_err(|e| HashTreeError::Store(e.to_string()))?
744 {
745 Some(d) => d,
746 None => return Ok(false),
747 };
748 Ok(is_directory_node(&data))
749 }
750
751 pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
753 self.read_file_with_limit(hash, None).await
754 }
755
756 async fn read_file_with_limit(
758 &self,
759 hash: &Hash,
760 max_size: Option<u64>,
761 ) -> Result<Option<Vec<u8>>, HashTreeError> {
762 let data = match self
763 .store
764 .get(hash)
765 .await
766 .map_err(|e| HashTreeError::Store(e.to_string()))?
767 {
768 Some(d) => d,
769 None => return Ok(None),
770 };
771
772 if !is_tree_node(&data) {
774 Self::ensure_size_limit(max_size, data.len() as u64)?;
775 return Ok(Some(data));
776 }
777
778 let node = decode_tree_node(&data)?;
780 let declared_size: u64 = node.links.iter().map(|l| l.size).sum();
781 Self::ensure_size_limit(max_size, declared_size)?;
782
783 let mut bytes_read = 0u64;
784 let assembled = self
785 .assemble_chunks_limited(&node, max_size, &mut bytes_read)
786 .await?;
787 Ok(Some(assembled))
788 }
789
790 pub async fn read_file_range(
797 &self,
798 hash: &Hash,
799 start: u64,
800 end: Option<u64>,
801 ) -> Result<Option<Vec<u8>>, HashTreeError> {
802 let data = match self
803 .store
804 .get(hash)
805 .await
806 .map_err(|e| HashTreeError::Store(e.to_string()))?
807 {
808 Some(d) => d,
809 None => return Ok(None),
810 };
811
812 if !is_tree_node(&data) {
814 let start_idx = start as usize;
815 let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
816 if start_idx >= data.len() {
817 return Ok(Some(vec![]));
818 }
819 let end_idx = end_idx.min(data.len());
820 return Ok(Some(data[start_idx..end_idx].to_vec()));
821 }
822
823 let node = decode_tree_node(&data)?;
825 let range_data = self.assemble_chunks_range(&node, start, end).await?;
826 Ok(Some(range_data))
827 }
828
829 pub async fn read_file_range_cid(
831 &self,
832 cid: &Cid,
833 start: u64,
834 end: Option<u64>,
835 ) -> Result<Option<Vec<u8>>, HashTreeError> {
836 if let Some(key) = cid.key {
837 let data = match self.get_encrypted_root(&cid.hash, &key).await? {
838 Some(d) => d,
839 None => return Ok(None),
840 };
841
842 if is_tree_node(&data) {
843 let node = decode_tree_node(&data)?;
844 let total_size: u64 = node.links.iter().map(|link| link.size).sum();
845 let actual_end = end.unwrap_or(total_size).min(total_size);
846 if start >= actual_end {
847 return Ok(Some(vec![]));
848 }
849
850 let mut result = Vec::with_capacity((actual_end - start) as usize);
851 self.append_encrypted_range(&node, start, actual_end, 0, &mut result)
852 .await?;
853 return Ok(Some(result));
854 }
855
856 let start_idx = start as usize;
857 let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
858 if start_idx >= data.len() {
859 return Ok(Some(vec![]));
860 }
861 let end_idx = end_idx.min(data.len());
862 return Ok(Some(data[start_idx..end_idx].to_vec()));
863 }
864
865 self.read_file_range(&cid.hash, start, end).await
866 }
867
868 async fn append_encrypted_range(
869 &self,
870 node: &TreeNode,
871 start: u64,
872 end: u64,
873 base_offset: u64,
874 result: &mut Vec<u8>,
875 ) -> Result<(), HashTreeError> {
876 let mut current_offset = base_offset;
877
878 for link in &node.links {
879 let child_start = current_offset;
880 let child_end = child_start.saturating_add(link.size);
881 current_offset = child_end;
882
883 if child_end <= start {
884 continue;
885 }
886 if child_start >= end {
887 break;
888 }
889
890 let chunk_key = link
891 .key
892 .ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
893
894 let encrypted_child = self
895 .store
896 .get(&link.hash)
897 .await
898 .map_err(|e| HashTreeError::Store(e.to_string()))?
899 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
900 let decrypted_child = decrypt_chk(&encrypted_child, &chunk_key)
901 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
902
903 if is_tree_node(&decrypted_child) {
904 let child_node = decode_tree_node(&decrypted_child)?;
905 Box::pin(self.append_encrypted_range(&child_node, start, end, child_start, result))
906 .await?;
907 continue;
908 }
909
910 let slice_start = if start > child_start {
911 (start - child_start) as usize
912 } else {
913 0
914 };
915 let slice_end = if end < child_end {
916 (end - child_start) as usize
917 } else {
918 decrypted_child.len()
919 };
920 result.extend_from_slice(&decrypted_child[slice_start..slice_end]);
921 }
922
923 Ok(())
924 }
925
926 async fn assemble_chunks_range(
928 &self,
929 node: &TreeNode,
930 start: u64,
931 end: Option<u64>,
932 ) -> Result<Vec<u8>, HashTreeError> {
933 let chunks_info = self.collect_chunk_offsets(node).await?;
935
936 if chunks_info.is_empty() {
937 return Ok(vec![]);
938 }
939
940 let total_size: u64 = chunks_info.iter().map(|(_, _, size)| size).sum();
942 let actual_end = end.unwrap_or(total_size).min(total_size);
943
944 if start >= actual_end {
945 return Ok(vec![]);
946 }
947
948 let mut result = Vec::with_capacity((actual_end - start) as usize);
950 let mut current_offset = 0u64;
951
952 for (chunk_hash, _chunk_offset, chunk_size) in &chunks_info {
953 let chunk_start = current_offset;
954 let chunk_end = current_offset + chunk_size;
955
956 if chunk_end > start && chunk_start < actual_end {
958 let chunk_data = self
960 .store
961 .get(chunk_hash)
962 .await
963 .map_err(|e| HashTreeError::Store(e.to_string()))?
964 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(chunk_hash)))?;
965
966 let slice_start = if start > chunk_start {
968 (start - chunk_start) as usize
969 } else {
970 0
971 };
972 let slice_end = if actual_end < chunk_end {
973 (actual_end - chunk_start) as usize
974 } else {
975 chunk_data.len()
976 };
977
978 result.extend_from_slice(&chunk_data[slice_start..slice_end]);
979 }
980
981 current_offset = chunk_end;
982
983 if current_offset >= actual_end {
985 break;
986 }
987 }
988
989 Ok(result)
990 }
991
992 async fn collect_chunk_offsets(
995 &self,
996 node: &TreeNode,
997 ) -> Result<Vec<(Hash, u64, u64)>, HashTreeError> {
998 let mut chunks = Vec::new();
999 let mut offset = 0u64;
1000 self.collect_chunk_offsets_recursive(node, &mut chunks, &mut offset)
1001 .await?;
1002 Ok(chunks)
1003 }
1004
1005 async fn collect_chunk_offsets_recursive(
1006 &self,
1007 node: &TreeNode,
1008 chunks: &mut Vec<(Hash, u64, u64)>,
1009 offset: &mut u64,
1010 ) -> Result<(), HashTreeError> {
1011 for link in &node.links {
1012 let child_data = self
1013 .store
1014 .get(&link.hash)
1015 .await
1016 .map_err(|e| HashTreeError::Store(e.to_string()))?
1017 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1018
1019 if is_tree_node(&child_data) {
1020 let child_node = decode_tree_node(&child_data)?;
1022 Box::pin(self.collect_chunk_offsets_recursive(&child_node, chunks, offset)).await?;
1023 } else {
1024 let size = child_data.len() as u64;
1026 chunks.push((link.hash, *offset, size));
1027 *offset += size;
1028 }
1029 }
1030 Ok(())
1031 }
1032
1033 async fn assemble_chunks_limited(
1035 &self,
1036 node: &TreeNode,
1037 max_size: Option<u64>,
1038 bytes_read: &mut u64,
1039 ) -> Result<Vec<u8>, HashTreeError> {
1040 let mut parts: Vec<Vec<u8>> = Vec::new();
1041
1042 for link in &node.links {
1043 let projected = (*bytes_read).saturating_add(link.size);
1044 Self::ensure_size_limit(max_size, projected)?;
1045
1046 let child_data = self
1047 .store
1048 .get(&link.hash)
1049 .await
1050 .map_err(|e| HashTreeError::Store(e.to_string()))?
1051 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1052
1053 if is_tree_node(&child_data) {
1054 let child_node = decode_tree_node(&child_data)?;
1055 parts.push(
1056 Box::pin(self.assemble_chunks_limited(&child_node, max_size, bytes_read))
1057 .await?,
1058 );
1059 } else {
1060 let projected = (*bytes_read).saturating_add(child_data.len() as u64);
1061 Self::ensure_size_limit(max_size, projected)?;
1062 *bytes_read = projected;
1063 parts.push(child_data);
1064 }
1065 }
1066
1067 let total_length: usize = parts.iter().map(|p| p.len()).sum();
1069 let mut result = Vec::with_capacity(total_length);
1070 for part in parts {
1071 result.extend_from_slice(&part);
1072 }
1073
1074 Ok(result)
1075 }
1076
1077 pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
1079 let data = match self
1080 .store
1081 .get(hash)
1082 .await
1083 .map_err(|e| HashTreeError::Store(e.to_string()))?
1084 {
1085 Some(d) => d,
1086 None => return Ok(vec![]),
1087 };
1088
1089 if !is_tree_node(&data) {
1090 return Ok(vec![data]);
1091 }
1092
1093 let node = decode_tree_node(&data)?;
1094 self.collect_chunks(&node).await
1095 }
1096
1097 async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
1098 let mut chunks = Vec::new();
1099
1100 for link in &node.links {
1101 let child_data = self
1102 .store
1103 .get(&link.hash)
1104 .await
1105 .map_err(|e| HashTreeError::Store(e.to_string()))?
1106 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1107
1108 if is_tree_node(&child_data) {
1109 let child_node = decode_tree_node(&child_data)?;
1110 chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
1111 } else {
1112 chunks.push(child_data);
1113 }
1114 }
1115
1116 Ok(chunks)
1117 }
1118
1119 pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1121 let node = match self.get_node(cid).await? {
1122 Some(n) => n,
1123 None => return Ok(vec![]),
1124 };
1125
1126 let mut entries = Vec::new();
1127
1128 for link in &node.links {
1129 if Self::is_internal_directory_link(&node, link) {
1131 let chunk_cid = Cid {
1132 hash: link.hash,
1133 key: link.key,
1134 };
1135 let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
1136 entries.extend(sub_entries);
1137 continue;
1138 }
1139
1140 entries.push(TreeEntry {
1141 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1142 hash: link.hash,
1143 size: link.size,
1144 link_type: link.link_type,
1145 key: link.key,
1146 meta: link.meta.clone(),
1147 });
1148 }
1149
1150 Ok(entries)
1151 }
1152
1153 pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1156 let node = match self.get_directory_node(cid).await? {
1158 Some(n) => n,
1159 None => return Ok(vec![]),
1160 };
1161
1162 let mut entries = Vec::new();
1163
1164 for link in &node.links {
1165 if Self::is_internal_directory_link(&node, link) {
1167 let sub_cid = Cid {
1169 hash: link.hash,
1170 key: cid.key,
1171 };
1172 let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
1173 entries.extend(sub_entries);
1174 continue;
1175 }
1176
1177 entries.push(TreeEntry {
1178 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1179 hash: link.hash,
1180 size: link.size,
1181 link_type: link.link_type,
1182 key: link.key,
1183 meta: link.meta.clone(),
1184 });
1185 }
1186
1187 Ok(entries)
1188 }
1189
1190 pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1192 let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
1193 if parts.is_empty() {
1194 return Ok(Some(cid.clone()));
1195 }
1196
1197 let mut current_cid = cid.clone();
1198
1199 for part in parts {
1200 let node = match self.get_directory_node(¤t_cid).await? {
1202 Some(n) => n,
1203 None => return Ok(None),
1204 };
1205
1206 if let Some(link) = self.find_link(&node, part) {
1207 current_cid = Cid {
1208 hash: link.hash,
1209 key: link.key,
1210 };
1211 } else {
1212 match self
1214 .find_link_in_subtrees_cid(&node, part, ¤t_cid)
1215 .await?
1216 {
1217 Some(link) => {
1218 current_cid = Cid {
1219 hash: link.hash,
1220 key: link.key,
1221 };
1222 }
1223 None => return Ok(None),
1224 }
1225 }
1226 }
1227
1228 Ok(Some(current_cid))
1229 }
1230
1231 pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1233 self.resolve(cid, path).await
1234 }
1235
1236 fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1237 node.links
1238 .iter()
1239 .find(|l| l.name.as_deref() == Some(name))
1240 .cloned()
1241 }
1242
1243 async fn find_link_in_subtrees_cid(
1245 &self,
1246 node: &TreeNode,
1247 name: &str,
1248 _parent_cid: &Cid,
1249 ) -> Result<Option<Link>, HashTreeError> {
1250 for link in &node.links {
1251 if !Self::is_internal_directory_link(node, link) {
1252 continue;
1253 }
1254
1255 let sub_cid = Cid {
1257 hash: link.hash,
1258 key: link.key,
1259 };
1260
1261 let sub_node = match self.get_node(&sub_cid).await? {
1262 Some(n) => n,
1263 None => continue,
1264 };
1265
1266 if let Some(found) = self.find_link(&sub_node, name) {
1267 return Ok(Some(found));
1268 }
1269
1270 if let Some(deep_found) =
1271 Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await?
1272 {
1273 return Ok(Some(deep_found));
1274 }
1275 }
1276
1277 Ok(None)
1278 }
1279
1280 pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1282 let data = match self
1283 .store
1284 .get(hash)
1285 .await
1286 .map_err(|e| HashTreeError::Store(e.to_string()))?
1287 {
1288 Some(d) => d,
1289 None => return Ok(0),
1290 };
1291
1292 if !is_tree_node(&data) {
1293 return Ok(data.len() as u64);
1294 }
1295
1296 let node = decode_tree_node(&data)?;
1297 let mut total = 0u64;
1299 for link in &node.links {
1300 total += link.size;
1301 }
1302 Ok(total)
1303 }
1304
1305 pub async fn get_size_cid(&self, cid: &Cid) -> Result<u64, HashTreeError> {
1307 if let Some(key) = cid.key {
1308 let data = match self.get_encrypted_root(&cid.hash, &key).await? {
1309 Some(d) => d,
1310 None => return Ok(0),
1311 };
1312 if is_tree_node(&data) {
1313 let node = decode_tree_node(&data)?;
1314 return Ok(node.links.iter().map(|link| link.size).sum());
1315 }
1316 return Ok(data.len() as u64);
1317 }
1318
1319 self.get_size(&cid.hash).await
1320 }
1321
1322 pub async fn set_entry(
1327 &self,
1328 root: &Cid,
1329 path: &[&str],
1330 name: &str,
1331 entry_cid: &Cid,
1332 size: u64,
1333 link_type: LinkType,
1334 ) -> Result<Cid, HashTreeError> {
1335 let dir_cid = self.resolve_path_array(root, path).await?;
1336 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1337
1338 let entries = self.list_directory(&dir_cid).await?;
1339 let mut new_entries: Vec<DirEntry> = entries
1340 .into_iter()
1341 .filter(|e| e.name != name)
1342 .map(|e| DirEntry {
1343 name: e.name,
1344 hash: e.hash,
1345 size: e.size,
1346 key: e.key,
1347 link_type: e.link_type,
1348 meta: e.meta,
1349 })
1350 .collect();
1351
1352 new_entries.push(DirEntry {
1353 name: name.to_string(),
1354 hash: entry_cid.hash,
1355 size,
1356 key: entry_cid.key,
1357 link_type,
1358 meta: None,
1359 });
1360
1361 let new_dir_cid = self.put_directory(new_entries).await?;
1362 self.rebuild_path(root, path, new_dir_cid).await
1363 }
1364
1365 pub async fn remove_entry(
1368 &self,
1369 root: &Cid,
1370 path: &[&str],
1371 name: &str,
1372 ) -> Result<Cid, HashTreeError> {
1373 let dir_cid = self.resolve_path_array(root, path).await?;
1374 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1375
1376 let entries = self.list_directory(&dir_cid).await?;
1377 let new_entries: Vec<DirEntry> = entries
1378 .into_iter()
1379 .filter(|e| e.name != name)
1380 .map(|e| DirEntry {
1381 name: e.name,
1382 hash: e.hash,
1383 size: e.size,
1384 key: e.key,
1385 link_type: e.link_type,
1386 meta: e.meta,
1387 })
1388 .collect();
1389
1390 let new_dir_cid = self.put_directory(new_entries).await?;
1391 self.rebuild_path(root, path, new_dir_cid).await
1392 }
1393
1394 pub async fn rename_entry(
1397 &self,
1398 root: &Cid,
1399 path: &[&str],
1400 old_name: &str,
1401 new_name: &str,
1402 ) -> Result<Cid, HashTreeError> {
1403 if old_name == new_name {
1404 return Ok(root.clone());
1405 }
1406
1407 let dir_cid = self.resolve_path_array(root, path).await?;
1408 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1409
1410 let entries = self.list_directory(&dir_cid).await?;
1411 let entry = entries
1412 .iter()
1413 .find(|e| e.name == old_name)
1414 .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1415
1416 let entry_hash = entry.hash;
1417 let entry_size = entry.size;
1418 let entry_key = entry.key;
1419 let entry_link_type = entry.link_type;
1420 let entry_meta = entry.meta.clone();
1421
1422 let new_entries: Vec<DirEntry> = entries
1423 .into_iter()
1424 .filter(|e| e.name != old_name)
1425 .map(|e| DirEntry {
1426 name: e.name,
1427 hash: e.hash,
1428 size: e.size,
1429 key: e.key,
1430 link_type: e.link_type,
1431 meta: e.meta,
1432 })
1433 .chain(std::iter::once(DirEntry {
1434 name: new_name.to_string(),
1435 hash: entry_hash,
1436 size: entry_size,
1437 key: entry_key,
1438 link_type: entry_link_type,
1439 meta: entry_meta,
1440 }))
1441 .collect();
1442
1443 let new_dir_cid = self.put_directory(new_entries).await?;
1444 self.rebuild_path(root, path, new_dir_cid).await
1445 }
1446
1447 pub async fn move_entry(
1450 &self,
1451 root: &Cid,
1452 source_path: &[&str],
1453 name: &str,
1454 target_path: &[&str],
1455 ) -> Result<Cid, HashTreeError> {
1456 let source_dir_cid = self.resolve_path_array(root, source_path).await?;
1457 let source_dir_cid =
1458 source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
1459
1460 let source_entries = self.list_directory(&source_dir_cid).await?;
1461 let entry = source_entries
1462 .iter()
1463 .find(|e| e.name == name)
1464 .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
1465
1466 let entry_cid = Cid {
1467 hash: entry.hash,
1468 key: entry.key,
1469 };
1470 let entry_size = entry.size;
1471 let entry_link_type = entry.link_type;
1472
1473 let new_root = self.remove_entry(root, source_path, name).await?;
1475
1476 self.set_entry(
1478 &new_root,
1479 target_path,
1480 name,
1481 &entry_cid,
1482 entry_size,
1483 entry_link_type,
1484 )
1485 .await
1486 }
1487
1488 async fn resolve_path_array(
1489 &self,
1490 root: &Cid,
1491 path: &[&str],
1492 ) -> Result<Option<Cid>, HashTreeError> {
1493 if path.is_empty() {
1494 return Ok(Some(root.clone()));
1495 }
1496 self.resolve_path(root, &path.join("/")).await
1497 }
1498
1499 async fn rebuild_path(
1500 &self,
1501 root: &Cid,
1502 path: &[&str],
1503 new_child: Cid,
1504 ) -> Result<Cid, HashTreeError> {
1505 if path.is_empty() {
1506 return Ok(new_child);
1507 }
1508
1509 let mut child_cid = new_child;
1510 let parts: Vec<&str> = path.to_vec();
1511
1512 for i in (0..parts.len()).rev() {
1513 let child_name = parts[i];
1514 let parent_path = &parts[..i];
1515
1516 let parent_cid = if parent_path.is_empty() {
1517 root.clone()
1518 } else {
1519 self.resolve_path_array(root, parent_path)
1520 .await?
1521 .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
1522 };
1523
1524 let parent_entries = self.list_directory(&parent_cid).await?;
1525 let new_parent_entries: Vec<DirEntry> = parent_entries
1526 .into_iter()
1527 .map(|e| {
1528 if e.name == child_name {
1529 DirEntry {
1530 name: e.name,
1531 hash: child_cid.hash,
1532 size: 0, key: child_cid.key,
1534 link_type: e.link_type,
1535 meta: e.meta,
1536 }
1537 } else {
1538 DirEntry {
1539 name: e.name,
1540 hash: e.hash,
1541 size: e.size,
1542 key: e.key,
1543 link_type: e.link_type,
1544 meta: e.meta,
1545 }
1546 }
1547 })
1548 .collect();
1549
1550 child_cid = self.put_directory(new_parent_entries).await?;
1551 }
1552
1553 Ok(child_cid)
1554 }
1555
1556 pub fn get_store(&self) -> Arc<S> {
1560 self.store.clone()
1561 }
1562
1563 pub fn chunk_size(&self) -> usize {
1565 self.chunk_size
1566 }
1567
1568 pub fn max_links(&self) -> usize {
1570 self.max_links
1571 }
1572}
1573
1574pub async fn verify_tree<S: Store>(
1576 store: Arc<S>,
1577 root_hash: &Hash,
1578) -> Result<crate::reader::VerifyResult, HashTreeError> {
1579 let mut missing = Vec::new();
1580 let mut visited = std::collections::HashSet::new();
1581
1582 verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
1583
1584 Ok(crate::reader::VerifyResult {
1585 valid: missing.is_empty(),
1586 missing,
1587 })
1588}
1589
1590async fn verify_recursive<S: Store>(
1591 store: Arc<S>,
1592 hash: &Hash,
1593 missing: &mut Vec<Hash>,
1594 visited: &mut std::collections::HashSet<String>,
1595) -> Result<(), HashTreeError> {
1596 let hex = to_hex(hash);
1597 if visited.contains(&hex) {
1598 return Ok(());
1599 }
1600 visited.insert(hex);
1601
1602 let data = match store
1603 .get(hash)
1604 .await
1605 .map_err(|e| HashTreeError::Store(e.to_string()))?
1606 {
1607 Some(d) => d,
1608 None => {
1609 missing.push(*hash);
1610 return Ok(());
1611 }
1612 };
1613
1614 if is_tree_node(&data) {
1615 let node = decode_tree_node(&data)?;
1616 for link in &node.links {
1617 Box::pin(verify_recursive(
1618 store.clone(),
1619 &link.hash,
1620 missing,
1621 visited,
1622 ))
1623 .await?;
1624 }
1625 }
1626
1627 Ok(())
1628}
1629
1630#[cfg(test)]
1631mod tests;