1use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::io::AsyncRead;
10use futures::stream::{self, Stream};
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{
15 decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node,
16};
17use crate::hash::sha256;
18use crate::reader::{ReaderError, TreeEntry, WalkEntry};
19use crate::store::Store;
20use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
21
22use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
23
/// Configuration for constructing a [`HashTree`].
#[derive(Clone)]
pub struct HashTreeConfig<S: Store> {
    // Backing block store shared with the tree.
    pub store: Arc<S>,
    // Maximum payload bytes per leaf chunk.
    pub chunk_size: usize,
    // Maximum number of links per interior tree node (fanout).
    pub max_links: usize,
    // When true, chunks and tree nodes are CHK-encrypted before storage.
    pub encrypted: bool,
}
33
34impl<S: Store> HashTreeConfig<S> {
35 pub fn new(store: Arc<S>) -> Self {
36 Self {
37 store,
38 chunk_size: DEFAULT_CHUNK_SIZE,
39 max_links: DEFAULT_MAX_LINKS,
40 encrypted: true,
41 }
42 }
43
44 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
45 self.chunk_size = chunk_size;
46 self
47 }
48
49 pub fn with_max_links(mut self, max_links: usize) -> Self {
50 self.max_links = max_links;
51 self
52 }
53
54 pub fn public(mut self) -> Self {
56 self.encrypted = false;
57 self
58 }
59}
60
/// Errors surfaced by [`HashTree`] operations.
#[derive(Debug, thiserror::Error)]
pub enum HashTreeError {
    /// Underlying block-store failure (stringified store error).
    #[error("Store error: {0}")]
    Store(String),
    /// Tree-node encode/decode failure.
    #[error("Codec error: {0}")]
    Codec(#[from] crate::codec::CodecError),
    /// A referenced chunk hash was absent from the store (hex-encoded hash).
    #[error("Missing chunk: {0}")]
    MissingChunk(String),
    /// A path lookup failed.
    #[error("Path not found: {0}")]
    PathNotFound(String),
    /// A named entry lookup failed.
    #[error("Entry not found: {0}")]
    EntryNotFound(String),
    /// Failure while encrypting content.
    #[error("Encryption error: {0}")]
    Encryption(String),
    /// Failure while decrypting content.
    #[error("Decryption error: {0}")]
    Decryption(String),
    /// Content exceeded the caller-supplied size cap.
    #[error("Content size {actual_size} exceeds max_size {max_size}")]
    SizeLimitExceeded { max_size: u64, actual_size: u64 },
}
81
82impl From<BuilderError> for HashTreeError {
83 fn from(e: BuilderError) -> Self {
84 match e {
85 BuilderError::Store(s) => HashTreeError::Store(s),
86 BuilderError::Codec(c) => HashTreeError::Codec(c),
87 BuilderError::Encryption(s) => HashTreeError::Encryption(s),
88 }
89 }
90}
91
92impl From<ReaderError> for HashTreeError {
93 fn from(e: ReaderError) -> Self {
94 match e {
95 ReaderError::Store(s) => HashTreeError::Store(s),
96 ReaderError::Codec(c) => HashTreeError::Codec(c),
97 ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
98 ReaderError::Decryption(s) => HashTreeError::Encryption(s),
99 ReaderError::MissingKey => {
100 HashTreeError::Encryption("missing decryption key".to_string())
101 }
102 }
103 }
104}
105
/// Content-addressed chunked storage tree over a generic block [`Store`],
/// with optional convergent (CHK) encryption. Field meanings mirror
/// [`HashTreeConfig`].
pub struct HashTree<S: Store> {
    store: Arc<S>,
    chunk_size: usize,
    max_links: usize,
    encrypted: bool,
}
113
114impl<S: Store> HashTree<S> {
115 fn is_legacy_internal_group_name(name: &str) -> bool {
116 name.starts_with('_') && !name.starts_with("_chunk_") && name.chars().count() == 2
117 }
118
119 fn node_uses_legacy_directory_fanout(node: &TreeNode) -> bool {
120 !node.links.is_empty()
121 && node.links.iter().all(|link| {
122 let Some(name) = link.name.as_deref() else {
123 return false;
124 };
125 Self::is_legacy_internal_group_name(name) && link.link_type == LinkType::Dir
126 })
127 }
128
129 fn is_internal_directory_link_with_legacy_fanout(
130 link: &Link,
131 uses_legacy_fanout: bool,
132 ) -> bool {
133 let Some(name) = link.name.as_deref() else {
134 return false;
135 };
136
137 if name.starts_with("_chunk_") {
138 return true;
139 }
140
141 uses_legacy_fanout
142 && Self::is_legacy_internal_group_name(name)
143 && link.link_type == LinkType::Dir
144 }
145
146 fn is_internal_directory_link(node: &TreeNode, link: &Link) -> bool {
147 Self::is_internal_directory_link_with_legacy_fanout(
148 link,
149 Self::node_uses_legacy_directory_fanout(node),
150 )
151 }
152
153 pub fn new(config: HashTreeConfig<S>) -> Self {
154 Self {
155 store: config.store,
156 chunk_size: config.chunk_size,
157 max_links: config.max_links,
158 encrypted: config.encrypted,
159 }
160 }
161
    /// Reports whether this tree encrypts chunks before storing them.
    pub fn is_encrypted(&self) -> bool {
        self.encrypted
    }
166
167 pub async fn put(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
172 let size = data.len() as u64;
173
174 if data.len() <= self.chunk_size {
176 let (hash, key) = self.put_chunk_internal(data).await?;
177 return Ok((Cid { hash, key }, size));
178 }
179
180 let mut links: Vec<Link> = Vec::new();
182 let mut offset = 0;
183
184 while offset < data.len() {
185 let end = (offset + self.chunk_size).min(data.len());
186 let chunk = &data[offset..end];
187 let chunk_size = chunk.len() as u64;
188 let (hash, key) = self.put_chunk_internal(chunk).await?;
189 links.push(Link {
190 hash,
191 name: None,
192 size: chunk_size,
193 key,
194 link_type: LinkType::Blob, meta: None,
196 });
197 offset = end;
198 }
199
200 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
202 Ok((
203 Cid {
204 hash: root_hash,
205 key: root_key,
206 },
207 size,
208 ))
209 }
210
211 pub async fn get(
216 &self,
217 cid: &Cid,
218 max_size: Option<u64>,
219 ) -> Result<Option<Vec<u8>>, HashTreeError> {
220 if let Some(key) = cid.key {
221 self.get_encrypted(&cid.hash, &key, max_size).await
222 } else {
223 self.read_file_with_limit(&cid.hash, max_size).await
224 }
225 }
226
227 pub async fn put_stream<R: AsyncRead + Unpin>(
233 &self,
234 mut reader: R,
235 ) -> Result<(Cid, u64), HashTreeError> {
236 let mut buffer = vec![0u8; self.chunk_size];
237 let mut links = Vec::new();
238 let mut total_size: u64 = 0;
239 let mut consistent_key: Option<[u8; 32]> = None;
240
241 loop {
242 let mut chunk = Vec::new();
243 let mut bytes_read = 0;
244
245 while bytes_read < self.chunk_size {
247 let n = reader
248 .read(&mut buffer[..self.chunk_size - bytes_read])
249 .await
250 .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
251 if n == 0 {
252 break; }
254 chunk.extend_from_slice(&buffer[..n]);
255 bytes_read += n;
256 }
257
258 if chunk.is_empty() {
259 break; }
261
262 let chunk_len = chunk.len() as u64;
263 total_size += chunk_len;
264
265 let (hash, key) = self.put_chunk_internal(&chunk).await?;
266
267 if links.is_empty() {
269 consistent_key = key;
270 } else if consistent_key != key {
271 consistent_key = None;
272 }
273
274 links.push(Link {
275 hash,
276 name: None,
277 size: chunk_len,
278 key,
279 link_type: LinkType::Blob, meta: None,
281 });
282 }
283
284 if links.is_empty() {
285 let (hash, key) = self.put_chunk_internal(&[]).await?;
287 return Ok((Cid { hash, key }, 0));
288 }
289
290 let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
292 Ok((
293 Cid {
294 hash: root_hash,
295 key: root_key,
296 },
297 total_size,
298 ))
299 }
300
301 pub fn get_stream(
306 &self,
307 cid: &Cid,
308 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
309 let hash = cid.hash;
310 let key = cid.key;
311
312 if let Some(k) = key {
313 Box::pin(self.read_file_stream_encrypted(hash, k))
315 } else {
316 self.read_file_stream(hash)
318 }
319 }
320
    /// Streams an encrypted file's leaf chunks in document order.
    ///
    /// Implemented as a `stream::unfold` state machine: `Init` fetches and
    /// decrypts the root, seeding a DFS stack with its links (pushed in
    /// reverse so the first link pops first); `Processing` keeps draining
    /// the stack; `Done` ends the stream.
    fn read_file_stream_encrypted(
        &self,
        hash: Hash,
        key: EncryptionKey,
    ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
        stream::unfold(
            EncryptedStreamState::Init {
                hash,
                key,
                tree: self,
            },
            |state| async move {
                match state {
                    EncryptedStreamState::Init { hash, key, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            // Missing root: end the stream without an error.
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    EncryptedStreamState::Done,
                                ))
                            }
                        };

                        let decrypted = match decrypt_chk(&data, &key) {
                            Ok(d) => d,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Decryption(e.to_string())),
                                    EncryptedStreamState::Done,
                                ))
                            }
                        };

                        // Single-chunk file: one item, then done.
                        if !is_tree_node(&decrypted) {
                            return Some((Ok(decrypted), EncryptedStreamState::Done));
                        }

                        let node = match decode_tree_node(&decrypted) {
                            Ok(n) => n,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Codec(e)),
                                    EncryptedStreamState::Done,
                                ))
                            }
                        };

                        // Reverse so links are popped in original order.
                        let mut stack: Vec<EncryptedStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(EncryptedStackItem {
                                hash: link.hash,
                                key: link.key,
                            });
                        }

                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Processing { mut stack, tree } => {
                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Done => None,
                }
            },
        )
    }
392
    /// Pops stack items until a leaf chunk is produced.
    ///
    /// Interior tree nodes are decoded and their links pushed back (reversed
    /// to preserve document order); the first leaf found is yielded together
    /// with a `Processing` state carrying the remaining stack. Returns
    /// `None` when the stack is exhausted (stream complete).
    async fn process_encrypted_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<EncryptedStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                Ok(None) => {
                    return Some((
                        Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
                        EncryptedStreamState::Done,
                    ))
                }
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        EncryptedStreamState::Done,
                    ))
                }
            };

            // Links without a key carry plaintext bytes as stored.
            let decrypted = if let Some(key) = item.key {
                match decrypt_chk(&data, &key) {
                    Ok(d) => d,
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Decryption(e.to_string())),
                            EncryptedStreamState::Done,
                        ))
                    }
                }
            } else {
                data
            };

            if is_tree_node(&decrypted) {
                let node = match decode_tree_node(&decrypted) {
                    Ok(n) => n,
                    Err(e) => {
                        return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done))
                    }
                };
                for link in node.links.into_iter().rev() {
                    stack.push(EncryptedStackItem {
                        hash: link.hash,
                        key: link.key,
                    });
                }
            } else {
                // Leaf: yield it; the rest of the stack continues next poll.
                return Some((
                    Ok(decrypted),
                    EncryptedStreamState::Processing {
                        stack: std::mem::take(stack),
                        tree: self,
                    },
                ));
            }
        }
        None
    }
456
457 async fn put_chunk_internal(
459 &self,
460 data: &[u8],
461 ) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
462 if self.encrypted {
463 let (encrypted, key) =
464 encrypt_chk(data).map_err(|e| HashTreeError::Encryption(e.to_string()))?;
465 let hash = sha256(&encrypted);
466 self.store
467 .put(hash, encrypted)
468 .await
469 .map_err(|e| HashTreeError::Store(e.to_string()))?;
470 Ok((hash, Some(key)))
471 } else {
472 let hash = self.put_blob(data).await?;
473 Ok((hash, None))
474 }
475 }
476
    /// Builds an interior tree over `links`, recursing while the link count
    /// exceeds `max_links`. Returns the root hash plus its CHK key when
    /// encryption is on.
    async fn build_tree_internal(
        &self,
        links: Vec<Link>,
        total_size: Option<u64>,
    ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
        // A single link covering the full payload needs no wrapper node.
        if links.len() == 1 {
            if let Some(ts) = total_size {
                if links[0].size == ts {
                    return Ok((links[0].hash, links[0].key));
                }
            }
        }

        if links.len() <= self.max_links {
            let node = TreeNode {
                node_type: LinkType::File,
                links,
            };
            let (data, _) = encode_and_hash(&node)?;

            if self.encrypted {
                let (encrypted, key) =
                    encrypt_chk(&data).map_err(|e| HashTreeError::Encryption(e.to_string()))?;
                let hash = sha256(&encrypted);
                self.store
                    .put(hash, encrypted)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?;
                return Ok((hash, Some(key)));
            }

            let hash = sha256(&data);
            self.store
                .put(hash, data)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?;
            return Ok((hash, None));
        }

        // Too many links: group into batches of `max_links`, build a subtree
        // per batch, then recurse over the batch roots (boxed — async
        // recursion).
        let mut sub_links = Vec::new();
        for batch in links.chunks(self.max_links) {
            let batch_size: u64 = batch.iter().map(|l| l.size).sum();
            let (hash, key) =
                Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
            sub_links.push(Link {
                hash,
                name: None,
                size: batch_size,
                key,
                link_type: LinkType::File, meta: None,
            });
        }

        Box::pin(self.build_tree_internal(sub_links, total_size)).await
    }
537
    /// Fetches `hash`, decrypts with `key`, and — if the plaintext is a tree
    /// node — reassembles the encrypted chunks beneath it.
    ///
    /// `max_size` is enforced both against the declared link sizes and the
    /// actual bytes assembled.
    async fn get_encrypted(
        &self,
        hash: &Hash,
        key: &EncryptionKey,
        max_size: Option<u64>,
    ) -> Result<Option<Vec<u8>>, HashTreeError> {
        let encrypted_data = match self
            .store
            .get(hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))?
        {
            Some(d) => d,
            None => return Ok(None),
        };

        // NOTE(review): decrypt failures surface as the `Encryption` variant
        // here, while `get_node` uses `Decryption` for the same failure —
        // confirm which variant callers expect.
        let decrypted = decrypt_chk(&encrypted_data, key)
            .map_err(|e| HashTreeError::Encryption(e.to_string()))?;

        if is_tree_node(&decrypted) {
            let node = decode_tree_node(&decrypted)?;
            let declared_size: u64 = node.links.iter().map(|l| l.size).sum();
            Self::ensure_size_limit(max_size, declared_size)?;

            let mut bytes_read = 0u64;
            let assembled = self
                .assemble_encrypted_chunks_limited(&node, max_size, &mut bytes_read)
                .await?;
            return Ok(Some(assembled));
        }

        Self::ensure_size_limit(max_size, decrypted.len() as u64)?;
        Ok(Some(decrypted))
    }
576
577 fn ensure_size_limit(max_size: Option<u64>, actual_size: u64) -> Result<(), HashTreeError> {
578 if let Some(max_size) = max_size {
579 if actual_size > max_size {
580 return Err(HashTreeError::SizeLimitExceeded {
581 max_size,
582 actual_size,
583 });
584 }
585 }
586 Ok(())
587 }
588
    /// Depth-first reassembly of an encrypted file tree.
    ///
    /// `bytes_read` accumulates plaintext bytes produced so far across the
    /// whole recursion; `max_size` is checked against each link's declared
    /// size before fetching and against the decrypted length after.
    async fn assemble_encrypted_chunks_limited(
        &self,
        node: &TreeNode,
        max_size: Option<u64>,
        bytes_read: &mut u64,
    ) -> Result<Vec<u8>, HashTreeError> {
        let mut parts: Vec<Vec<u8>> = Vec::new();

        for link in &node.links {
            let projected = (*bytes_read).saturating_add(link.size);
            Self::ensure_size_limit(max_size, projected)?;

            // Every child of an encrypted tree must carry its own CHK key.
            let chunk_key = link
                .key
                .ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;

            let encrypted_child = self
                .store
                .get(&link.hash)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?
                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;

            let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
                .map_err(|e| HashTreeError::Encryption(e.to_string()))?;

            if is_tree_node(&decrypted) {
                // Interior node: recurse (boxed — async recursion).
                let child_node = decode_tree_node(&decrypted)?;
                let child_data = Box::pin(self.assemble_encrypted_chunks_limited(
                    &child_node,
                    max_size,
                    bytes_read,
                ))
                .await?;
                parts.push(child_data);
            } else {
                let projected = (*bytes_read).saturating_add(decrypted.len() as u64);
                Self::ensure_size_limit(max_size, projected)?;
                *bytes_read = projected;
                parts.push(decrypted);
            }
        }

        // Concatenate with a single pre-sized allocation.
        let total_len: usize = parts.iter().map(|p| p.len()).sum();
        let mut result = Vec::with_capacity(total_len);
        for part in parts {
            result.extend_from_slice(&part);
        }

        Ok(result)
    }
643
644 pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
649 let hash = sha256(data);
650 self.store
651 .put(hash, data.to_vec())
652 .await
653 .map_err(|e| HashTreeError::Store(e.to_string()))?;
654 Ok(hash)
655 }
656
657 pub async fn put_file(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
660 let size = data.len() as u64;
661
662 if data.len() <= self.chunk_size {
664 let (hash, key) = self.put_chunk_internal(data).await?;
665 return Ok((Cid { hash, key }, size));
666 }
667
668 let mut links: Vec<Link> = Vec::new();
670 let mut offset = 0;
671
672 while offset < data.len() {
673 let end = (offset + self.chunk_size).min(data.len());
674 let chunk = &data[offset..end];
675 let chunk_size = (end - offset) as u64;
676
677 let (hash, key) = self.put_chunk_internal(chunk).await?;
678 links.push(Link {
679 hash,
680 name: None,
681 size: chunk_size,
682 key,
683 link_type: LinkType::Blob, meta: None,
685 });
686 offset = end;
687 }
688
689 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
691 Ok((
692 Cid {
693 hash: root_hash,
694 key: root_key,
695 },
696 size,
697 ))
698 }
699
700 pub async fn put_directory(&self, entries: Vec<DirEntry>) -> Result<Cid, HashTreeError> {
706 let mut sorted = entries;
708 sorted.sort_by(|a, b| a.name.cmp(&b.name));
709
710 let links: Vec<Link> = sorted
711 .into_iter()
712 .map(|e| Link {
713 hash: e.hash,
714 name: Some(e.name),
715 size: e.size,
716 key: e.key,
717 link_type: e.link_type,
718 meta: e.meta,
719 })
720 .collect();
721
722 let node = TreeNode {
724 node_type: LinkType::Dir,
725 links,
726 };
727 let (data, _plain_hash) = encode_and_hash(&node)?;
728
729 let (cid, _size) = self.put(&data).await?;
734 Ok(cid)
735 }
736
737 pub async fn put_tree_node(&self, links: Vec<Link>) -> Result<Hash, HashTreeError> {
739 let node = TreeNode {
740 node_type: LinkType::Dir,
741 links,
742 };
743
744 let (data, hash) = encode_and_hash(&node)?;
745 self.store
746 .put(hash, data)
747 .await
748 .map_err(|e| HashTreeError::Store(e.to_string()))?;
749 Ok(hash)
750 }
751
752 pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
756 self.store
757 .get(hash)
758 .await
759 .map_err(|e| HashTreeError::Store(e.to_string()))
760 }
761
762 pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
764 let data = match self
765 .store
766 .get(hash)
767 .await
768 .map_err(|e| HashTreeError::Store(e.to_string()))?
769 {
770 Some(d) => d,
771 None => return Ok(None),
772 };
773
774 if !is_tree_node(&data) {
775 return Ok(None);
776 }
777
778 let node = decode_tree_node(&data)?;
779 Ok(Some(node))
780 }
781
782 pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
784 let data = match self
785 .store
786 .get(&cid.hash)
787 .await
788 .map_err(|e| HashTreeError::Store(e.to_string()))?
789 {
790 Some(d) => d,
791 None => return Ok(None),
792 };
793
794 let decrypted = if let Some(key) = &cid.key {
796 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
797 } else {
798 data
799 };
800
801 if !is_tree_node(&decrypted) {
802 return Ok(None);
803 }
804
805 let node = decode_tree_node(&decrypted)?;
806 Ok(Some(node))
807 }
808
    /// Like [`Self::get_node`], but when the decoded node is a chunked
    /// `File` wrapper (a directory large enough to have been chunked by
    /// `put`), it reassembles the chunks and decodes the inner directory
    /// node.
    pub async fn get_directory_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
        let data = match self
            .store
            .get(&cid.hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))?
        {
            Some(d) => d,
            None => return Ok(None),
        };

        let decrypted = if let Some(key) = &cid.key {
            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
        } else {
            data
        };

        if !is_tree_node(&decrypted) {
            return Ok(None);
        }

        let node = decode_tree_node(&decrypted)?;

        if node.node_type == LinkType::File {
            // NOTE(review): chunks are reassembled via the plaintext path
            // (no per-link keys used) — confirm encrypted directory chunks
            // cannot reach this branch.
            let mut bytes_read = 0u64;
            let assembled = self
                .assemble_chunks_limited(&node, None, &mut bytes_read)
                .await?;
            if is_tree_node(&assembled) {
                let inner_node = decode_tree_node(&assembled)?;
                return Ok(Some(inner_node));
            }
        }

        Ok(Some(node))
    }
849
850 pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
852 let data = match self
853 .store
854 .get(hash)
855 .await
856 .map_err(|e| HashTreeError::Store(e.to_string()))?
857 {
858 Some(d) => d,
859 None => return Ok(false),
860 };
861 Ok(is_tree_node(&data))
862 }
863
864 pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
866 Ok(matches!(
867 self.get_directory_node(cid).await?,
868 Some(node) if node.node_type == LinkType::Dir
869 ))
870 }
871
872 pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
874 let data = match self
875 .store
876 .get(hash)
877 .await
878 .map_err(|e| HashTreeError::Store(e.to_string()))?
879 {
880 Some(d) => d,
881 None => return Ok(false),
882 };
883 Ok(is_directory_node(&data))
884 }
885
    /// Reads an entire plaintext file with no size cap.
    pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
        self.read_file_with_limit(hash, None).await
    }
890
891 async fn read_file_with_limit(
893 &self,
894 hash: &Hash,
895 max_size: Option<u64>,
896 ) -> Result<Option<Vec<u8>>, HashTreeError> {
897 let data = match self
898 .store
899 .get(hash)
900 .await
901 .map_err(|e| HashTreeError::Store(e.to_string()))?
902 {
903 Some(d) => d,
904 None => return Ok(None),
905 };
906
907 if !is_tree_node(&data) {
909 Self::ensure_size_limit(max_size, data.len() as u64)?;
910 return Ok(Some(data));
911 }
912
913 let node = decode_tree_node(&data)?;
915 let declared_size: u64 = node.links.iter().map(|l| l.size).sum();
916 Self::ensure_size_limit(max_size, declared_size)?;
917
918 let mut bytes_read = 0u64;
919 let assembled = self
920 .assemble_chunks_limited(&node, max_size, &mut bytes_read)
921 .await?;
922 Ok(Some(assembled))
923 }
924
925 pub async fn read_file_range(
932 &self,
933 hash: &Hash,
934 start: u64,
935 end: Option<u64>,
936 ) -> Result<Option<Vec<u8>>, HashTreeError> {
937 let data = match self
938 .store
939 .get(hash)
940 .await
941 .map_err(|e| HashTreeError::Store(e.to_string()))?
942 {
943 Some(d) => d,
944 None => return Ok(None),
945 };
946
947 if !is_tree_node(&data) {
949 let start_idx = start as usize;
950 let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
951 if start_idx >= data.len() {
952 return Ok(Some(vec![]));
953 }
954 let end_idx = end_idx.min(data.len());
955 return Ok(Some(data[start_idx..end_idx].to_vec()));
956 }
957
958 let node = decode_tree_node(&data)?;
960 let range_data = self.assemble_chunks_range(&node, start, end).await?;
961 Ok(Some(range_data))
962 }
963
964 pub async fn read_file_range_cid(
966 &self,
967 cid: &Cid,
968 start: u64,
969 end: Option<u64>,
970 ) -> Result<Option<Vec<u8>>, HashTreeError> {
971 if let Some(_key) = cid.key {
972 let data = match self.get(cid, None).await? {
973 Some(d) => d,
974 None => return Ok(None),
975 };
976 let start_idx = start as usize;
977 let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
978 if start_idx >= data.len() {
979 return Ok(Some(vec![]));
980 }
981 let end_idx = end_idx.min(data.len());
982 return Ok(Some(data[start_idx..end_idx].to_vec()));
983 }
984
985 self.read_file_range(&cid.hash, start, end).await
986 }
987
    /// Assembles `[start, end)` of a chunked file without concatenating the
    /// whole payload: chunk offsets are collected first, then only chunks
    /// overlapping the requested window are fetched and sliced.
    async fn assemble_chunks_range(
        &self,
        node: &TreeNode,
        start: u64,
        end: Option<u64>,
    ) -> Result<Vec<u8>, HashTreeError> {
        let chunks_info = self.collect_chunk_offsets(node).await?;

        if chunks_info.is_empty() {
            return Ok(vec![]);
        }

        // Clamp the requested end to the actual total size.
        let total_size: u64 = chunks_info.iter().map(|(_, _, size)| size).sum();
        let actual_end = end.unwrap_or(total_size).min(total_size);

        if start >= actual_end {
            return Ok(vec![]);
        }

        let mut result = Vec::with_capacity((actual_end - start) as usize);
        let mut current_offset = 0u64;

        for (chunk_hash, _chunk_offset, chunk_size) in &chunks_info {
            let chunk_start = current_offset;
            let chunk_end = current_offset + chunk_size;

            // Fetch only chunks overlapping [start, actual_end).
            if chunk_end > start && chunk_start < actual_end {
                let chunk_data = self
                    .store
                    .get(chunk_hash)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?
                    .ok_or_else(|| HashTreeError::MissingChunk(to_hex(chunk_hash)))?;

                let slice_start = if start > chunk_start {
                    (start - chunk_start) as usize
                } else {
                    0
                };
                let slice_end = if actual_end < chunk_end {
                    (actual_end - chunk_start) as usize
                } else {
                    chunk_data.len()
                };

                result.extend_from_slice(&chunk_data[slice_start..slice_end]);
            }

            current_offset = chunk_end;

            // Past the window: no later chunk can overlap.
            if current_offset >= actual_end {
                break;
            }
        }

        Ok(result)
    }
1053
1054 async fn collect_chunk_offsets(
1057 &self,
1058 node: &TreeNode,
1059 ) -> Result<Vec<(Hash, u64, u64)>, HashTreeError> {
1060 let mut chunks = Vec::new();
1061 let mut offset = 0u64;
1062 self.collect_chunk_offsets_recursive(node, &mut chunks, &mut offset)
1063 .await?;
1064 Ok(chunks)
1065 }
1066
    /// DFS helper for `collect_chunk_offsets`: appends `(hash, offset,
    /// size)` for every leaf chunk, advancing `offset` by each leaf's
    /// length.
    ///
    /// Note: each child is fetched here just to classify it and learn its
    /// length; the range reader then fetches overlapping chunks again.
    async fn collect_chunk_offsets_recursive(
        &self,
        node: &TreeNode,
        chunks: &mut Vec<(Hash, u64, u64)>,
        offset: &mut u64,
    ) -> Result<(), HashTreeError> {
        for link in &node.links {
            let child_data = self
                .store
                .get(&link.hash)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?
                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;

            if is_tree_node(&child_data) {
                // Interior node: recurse (boxed — async recursion).
                let child_node = decode_tree_node(&child_data)?;
                Box::pin(self.collect_chunk_offsets_recursive(&child_node, chunks, offset)).await?;
            } else {
                let size = child_data.len() as u64;
                chunks.push((link.hash, *offset, size));
                *offset += size;
            }
        }
        Ok(())
    }
1094
    /// Depth-first reassembly of a plaintext file tree — the unencrypted
    /// counterpart of `assemble_encrypted_chunks_limited`.
    ///
    /// `bytes_read` accumulates across the recursion; `max_size` is checked
    /// against both declared link sizes and actual chunk lengths.
    async fn assemble_chunks_limited(
        &self,
        node: &TreeNode,
        max_size: Option<u64>,
        bytes_read: &mut u64,
    ) -> Result<Vec<u8>, HashTreeError> {
        let mut parts: Vec<Vec<u8>> = Vec::new();

        for link in &node.links {
            let projected = (*bytes_read).saturating_add(link.size);
            Self::ensure_size_limit(max_size, projected)?;

            let child_data = self
                .store
                .get(&link.hash)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?
                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;

            if is_tree_node(&child_data) {
                // Interior node: recurse (boxed — async recursion).
                let child_node = decode_tree_node(&child_data)?;
                parts.push(
                    Box::pin(self.assemble_chunks_limited(&child_node, max_size, bytes_read))
                        .await?,
                );
            } else {
                let projected = (*bytes_read).saturating_add(child_data.len() as u64);
                Self::ensure_size_limit(max_size, projected)?;
                *bytes_read = projected;
                parts.push(child_data);
            }
        }

        // Concatenate with a single pre-sized allocation.
        let total_length: usize = parts.iter().map(|p| p.len()).sum();
        let mut result = Vec::with_capacity(total_length);
        for part in parts {
            result.extend_from_slice(&part);
        }

        Ok(result)
    }
1138
    /// Streams a plaintext file's leaf chunks in order.
    ///
    /// Uses an `unfold` state machine mirroring `read_file_stream_encrypted`:
    /// `Init` loads the root and seeds a DFS stack; `Processing` drains it
    /// one leaf per stream item; `Done` ends the stream.
    pub fn read_file_stream(
        &self,
        hash: Hash,
    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            ReadStreamState::Init { hash, tree: self },
            |state| async move {
                match state {
                    ReadStreamState::Init { hash, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            // Missing root: empty stream, no error.
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    ReadStreamState::Done,
                                ))
                            }
                        };

                        // Single-chunk file: one item, then done.
                        if !is_tree_node(&data) {
                            return Some((Ok(data), ReadStreamState::Done));
                        }

                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => {
                                return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done))
                            }
                        };

                        // Reverse so links pop in original order.
                        let mut stack: Vec<StreamStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }

                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Processing { mut stack, tree } => {
                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Done => None,
                }
            },
        ))
    }
1191
    /// Pops stack entries until a leaf chunk is found: interior nodes are
    /// expanded back onto the stack (reversed to preserve order); the leaf
    /// is yielded with a `Processing` state holding the remaining stack.
    /// Returns `None` once the stack is exhausted.
    async fn process_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<StreamStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            match item {
                StreamStackItem::Hash(hash) => {
                    let data = match self.store.get(&hash).await {
                        Ok(Some(d)) => d,
                        Ok(None) => {
                            return Some((
                                Err(HashTreeError::MissingChunk(to_hex(&hash))),
                                ReadStreamState::Done,
                            ))
                        }
                        Err(e) => {
                            return Some((
                                Err(HashTreeError::Store(e.to_string())),
                                ReadStreamState::Done,
                            ))
                        }
                    };

                    if is_tree_node(&data) {
                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => {
                                return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done))
                            }
                        };
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }
                    } else {
                        // Leaf: yield; the remaining stack continues next
                        // poll.
                        return Some((
                            Ok(data),
                            ReadStreamState::Processing {
                                stack: std::mem::take(stack),
                                tree: self,
                            },
                        ));
                    }
                }
            }
        }
        None
    }
1241
1242 pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
1244 let data = match self
1245 .store
1246 .get(hash)
1247 .await
1248 .map_err(|e| HashTreeError::Store(e.to_string()))?
1249 {
1250 Some(d) => d,
1251 None => return Ok(vec![]),
1252 };
1253
1254 if !is_tree_node(&data) {
1255 return Ok(vec![data]);
1256 }
1257
1258 let node = decode_tree_node(&data)?;
1259 self.collect_chunks(&node).await
1260 }
1261
1262 async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
1263 let mut chunks = Vec::new();
1264
1265 for link in &node.links {
1266 let child_data = self
1267 .store
1268 .get(&link.hash)
1269 .await
1270 .map_err(|e| HashTreeError::Store(e.to_string()))?
1271 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1272
1273 if is_tree_node(&child_data) {
1274 let child_node = decode_tree_node(&child_data)?;
1275 chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
1276 } else {
1277 chunks.push(child_data);
1278 }
1279 }
1280
1281 Ok(chunks)
1282 }
1283
    /// Lists the entries of the node at `cid`, transparently flattening
    /// internal fanout links ("_chunk_*" and legacy "_x" groups) by
    /// recursing into them with their own link keys.
    pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
        let node = match self.get_node(cid).await? {
            Some(n) => n,
            None => return Ok(vec![]),
        };

        let mut entries = Vec::new();

        for link in &node.links {
            if Self::is_internal_directory_link(&node, link) {
                let chunk_cid = Cid {
                    hash: link.hash,
                    key: link.key,
                };
                let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
                entries.extend(sub_entries);
                continue;
            }

            // Unnamed links fall back to their hex hash as the entry name.
            entries.push(TreeEntry {
                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
                hash: link.hash,
                size: link.size,
                link_type: link.link_type,
                key: link.key,
                meta: link.meta.clone(),
            });
        }

        Ok(entries)
    }
1317
    /// Directory-aware variant of [`Self::list`]: resolves `cid` through
    /// `get_directory_node` (unwrapping chunked directories) and flattens
    /// internal fanout links.
    pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
        let node = match self.get_directory_node(cid).await? {
            Some(n) => n,
            None => return Ok(vec![]),
        };

        let mut entries = Vec::new();

        for link in &node.links {
            if Self::is_internal_directory_link(&node, link) {
                // NOTE(review): this recursion reuses the *parent* CID's key
                // (`cid.key`) rather than `link.key`, unlike `list` above —
                // confirm this asymmetry is intentional.
                let sub_cid = Cid {
                    hash: link.hash,
                    key: cid.key,
                };
                let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
                entries.extend(sub_entries);
                continue;
            }

            // Unnamed links fall back to their hex hash as the entry name.
            entries.push(TreeEntry {
                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
                hash: link.hash,
                size: link.size,
                link_type: link.link_type,
                key: link.key,
                meta: link.meta.clone(),
            });
        }

        Ok(entries)
    }
1354
1355 pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1357 let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
1358 if parts.is_empty() {
1359 return Ok(Some(cid.clone()));
1360 }
1361
1362 let mut current_cid = cid.clone();
1363
1364 for part in parts {
1365 let node = match self.get_directory_node(¤t_cid).await? {
1367 Some(n) => n,
1368 None => return Ok(None),
1369 };
1370
1371 if let Some(link) = self.find_link(&node, part) {
1372 current_cid = Cid {
1373 hash: link.hash,
1374 key: link.key,
1375 };
1376 } else {
1377 match self
1379 .find_link_in_subtrees_cid(&node, part, ¤t_cid)
1380 .await?
1381 {
1382 Some(link) => {
1383 current_cid = Cid {
1384 hash: link.hash,
1385 key: link.key,
1386 };
1387 }
1388 None => return Ok(None),
1389 }
1390 }
1391 }
1392
1393 Ok(Some(current_cid))
1394 }
1395
    /// Alias for [`Self::resolve`], kept for API compatibility.
    pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
        self.resolve(cid, path).await
    }
1400
1401 fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1402 node.links
1403 .iter()
1404 .find(|l| l.name.as_deref() == Some(name))
1405 .cloned()
1406 }
1407
1408 async fn find_link_in_subtrees_cid(
1410 &self,
1411 node: &TreeNode,
1412 name: &str,
1413 _parent_cid: &Cid,
1414 ) -> Result<Option<Link>, HashTreeError> {
1415 for link in &node.links {
1416 if !Self::is_internal_directory_link(node, link) {
1417 continue;
1418 }
1419
1420 let sub_cid = Cid {
1422 hash: link.hash,
1423 key: link.key,
1424 };
1425
1426 let sub_node = match self.get_node(&sub_cid).await? {
1427 Some(n) => n,
1428 None => continue,
1429 };
1430
1431 if let Some(found) = self.find_link(&sub_node, name) {
1432 return Ok(Some(found));
1433 }
1434
1435 if let Some(deep_found) =
1436 Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await?
1437 {
1438 return Ok(Some(deep_found));
1439 }
1440 }
1441
1442 Ok(None)
1443 }
1444
1445 pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1447 let data = match self
1448 .store
1449 .get(hash)
1450 .await
1451 .map_err(|e| HashTreeError::Store(e.to_string()))?
1452 {
1453 Some(d) => d,
1454 None => return Ok(0),
1455 };
1456
1457 if !is_tree_node(&data) {
1458 return Ok(data.len() as u64);
1459 }
1460
1461 let node = decode_tree_node(&data)?;
1462 let mut total = 0u64;
1464 for link in &node.links {
1465 total += link.size;
1466 }
1467 Ok(total)
1468 }
1469
1470 pub async fn get_size_cid(&self, cid: &Cid) -> Result<u64, HashTreeError> {
1472 if cid.key.is_some() {
1473 let data = match self.get(cid, None).await? {
1474 Some(d) => d,
1475 None => return Ok(0),
1476 };
1477 return Ok(data.len() as u64);
1478 }
1479
1480 self.get_size(&cid.hash).await
1481 }
1482
    /// Walk the tree rooted at `cid` depth-first, returning one [`WalkEntry`]
    /// per visited node (tree nodes and blobs), with `path` as the root's
    /// path prefix.
    pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
        let mut entries = Vec::new();
        self.walk_recursive(cid, path, &mut entries).await?;
        Ok(entries)
    }
1489
    /// Depth-first walk appending a [`WalkEntry`] for every visited node.
    ///
    /// Internal directory fanout links are traversed without extending `path`
    /// (their entries belong to the parent directory); named links extend the
    /// path with `/`-separation; unnamed links inherit the current path.
    /// Nodes missing from the store are skipped silently.
    async fn walk_recursive(
        &self,
        cid: &Cid,
        path: &str,
        entries: &mut Vec<WalkEntry>,
    ) -> Result<(), HashTreeError> {
        let data = match self
            .store
            .get(&cid.hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))?
        {
            Some(d) => d,
            None => return Ok(()),
        };

        // Decrypt with the CID's key when present.
        let data = if let Some(key) = &cid.key {
            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
        } else {
            data
        };

        // Anything that does not decode as a tree node is a leaf blob.
        let node = match try_decode_tree_node(&data) {
            Some(n) => n,
            None => {
                entries.push(WalkEntry {
                    path: path.to_string(),
                    hash: cid.hash,
                    link_type: LinkType::Blob,
                    size: data.len() as u64,
                    key: cid.key,
                });
                return Ok(());
            }
        };

        // A tree node's logical size is the sum of its link sizes.
        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
        entries.push(WalkEntry {
            path: path.to_string(),
            hash: cid.hash,
            link_type: node.node_type,
            size: node_size,
            key: cid.key,
        });

        for link in &node.links {
            let child_path = match &link.name {
                Some(name) => {
                    if Self::is_internal_directory_link(&node, link) {
                        // Fanout subtree: inherits this node's key and path.
                        let sub_cid = Cid {
                            hash: link.hash,
                            key: cid.key,
                        };
                        Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
                        continue;
                    }
                    if path.is_empty() {
                        name.clone()
                    } else {
                        format!("{}/{}", path, name)
                    }
                }
                None => path.to_string(),
            };

            // Box::pin breaks the async recursion cycle.
            let child_cid = Cid {
                hash: link.hash,
                key: link.key,
            };
            Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
        }

        Ok(())
    }
1567
    /// Like [`walk`], but keeps up to `concurrency` store fetches in flight
    /// at once (via `FuturesUnordered` in the underlying implementation), so
    /// entries may be produced in a different order than the sequential walk.
    pub async fn walk_parallel(
        &self,
        cid: &Cid,
        path: &str,
        concurrency: usize,
    ) -> Result<Vec<WalkEntry>, HashTreeError> {
        self.walk_parallel_with_progress(cid, path, concurrency, None)
            .await
    }
1579
    /// Parallel walk with an optional progress counter.
    ///
    /// Up to `concurrency` store fetches are kept in flight via
    /// `FuturesUnordered`; discovered children are queued in `pending` and
    /// drained as slots free up. `progress` (when provided) is incremented
    /// once per processed node, including blob links that are emitted
    /// directly from link metadata without a fetch.
    ///
    /// NOTE(review): `concurrency == 0` makes the loop exit immediately and
    /// return no entries — confirm callers always pass a value >= 1.
    pub async fn walk_parallel_with_progress(
        &self,
        cid: &Cid,
        path: &str,
        concurrency: usize,
        progress: Option<&std::sync::atomic::AtomicUsize>,
    ) -> Result<Vec<WalkEntry>, HashTreeError> {
        use futures::stream::{FuturesUnordered, StreamExt};
        use std::collections::VecDeque;
        use std::sync::atomic::Ordering;

        let mut entries = Vec::new();
        let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
        let mut active = FuturesUnordered::new();

        pending.push_back((cid.clone(), path.to_string()));

        loop {
            // Refill the in-flight set up to the concurrency limit.
            while active.len() < concurrency {
                if let Some((node_cid, node_path)) = pending.pop_front() {
                    let store = &self.store;
                    let fut = async move {
                        let data = store
                            .get(&node_cid.hash)
                            .await
                            .map_err(|e| HashTreeError::Store(e.to_string()))?;
                        Ok::<_, HashTreeError>((node_cid, node_path, data))
                    };
                    active.push(fut);
                } else {
                    break;
                }
            }

            // Nothing in flight and nothing queued: traversal complete.
            if active.is_empty() {
                break;
            }

            if let Some(result) = active.next().await {
                let (node_cid, node_path, data) = result?;

                if let Some(counter) = progress {
                    counter.fetch_add(1, Ordering::Relaxed);
                }

                // Missing nodes are skipped silently.
                let data = match data {
                    Some(d) => d,
                    None => continue,
                };

                // Decrypt with the CID's key when present.
                let data = if let Some(key) = &node_cid.key {
                    decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
                } else {
                    data
                };

                // Non-tree data is a leaf blob.
                let node = match try_decode_tree_node(&data) {
                    Some(n) => n,
                    None => {
                        entries.push(WalkEntry {
                            path: node_path,
                            hash: node_cid.hash,
                            link_type: LinkType::Blob,
                            size: data.len() as u64,
                            key: node_cid.key,
                        });
                        continue;
                    }
                };

                let node_size: u64 = node.links.iter().map(|l| l.size).sum();
                entries.push(WalkEntry {
                    path: node_path.clone(),
                    hash: node_cid.hash,
                    link_type: node.node_type,
                    size: node_size,
                    key: node_cid.key,
                });

                for link in &node.links {
                    let child_path = match &link.name {
                        Some(name) => {
                            if Self::is_internal_directory_link(&node, link) {
                                // Fanout subtree: same key and path as parent.
                                let sub_cid = Cid {
                                    hash: link.hash,
                                    key: node_cid.key,
                                };
                                pending.push_back((sub_cid, node_path.clone()));
                                continue;
                            }
                            if node_path.is_empty() {
                                name.clone()
                            } else {
                                format!("{}/{}", node_path, name)
                            }
                        }
                        None => node_path.clone(),
                    };

                    // Blob links are emitted from link metadata directly,
                    // skipping the store fetch entirely.
                    if link.link_type == LinkType::Blob {
                        entries.push(WalkEntry {
                            path: child_path,
                            hash: link.hash,
                            link_type: LinkType::Blob,
                            size: link.size,
                            key: link.key,
                        });
                        if let Some(counter) = progress {
                            counter.fetch_add(1, Ordering::Relaxed);
                        }
                        continue;
                    }

                    let child_cid = Cid {
                        hash: link.hash,
                        key: link.key,
                    };
                    pending.push_back((child_cid, child_path));
                }
            }
        }

        Ok(entries)
    }
1723
    /// Stream the walk of the tree rooted at `cid` lazily, yielding one
    /// [`WalkEntry`] at a time instead of collecting them up front.
    ///
    /// The root node is handled in the `Init` state: it is fetched,
    /// decrypted with the CID's key when present, and its children are
    /// pushed onto an explicit stack (reversed so they pop in link order).
    /// Subsequent items are produced by [`process_walk_stack`] via the
    /// `Processing` state.
    pub fn walk_stream(
        &self,
        cid: Cid,
        initial_path: String,
    ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            WalkStreamState::Init {
                cid,
                path: initial_path,
                tree: self,
            },
            |state| async move {
                match state {
                    WalkStreamState::Init { cid, path, tree } => {
                        // A missing root simply ends the stream.
                        let data = match tree.store.get(&cid.hash).await {
                            Ok(Some(d)) => d,
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    WalkStreamState::Done,
                                ))
                            }
                        };

                        // Decrypt the root when the CID carries a key.
                        let data = if let Some(key) = &cid.key {
                            match decrypt_chk(&data, key) {
                                Ok(d) => d,
                                Err(e) => {
                                    return Some((
                                        Err(HashTreeError::Decryption(e.to_string())),
                                        WalkStreamState::Done,
                                    ))
                                }
                            }
                        } else {
                            data
                        };

                        // Non-tree data: emit one blob entry and finish.
                        let node = match try_decode_tree_node(&data) {
                            Some(n) => n,
                            None => {
                                let entry = WalkEntry {
                                    path,
                                    hash: cid.hash,
                                    link_type: LinkType::Blob,
                                    size: data.len() as u64,
                                    key: cid.key,
                                };
                                return Some((Ok(entry), WalkStreamState::Done));
                            }
                        };

                        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
                        let entry = WalkEntry {
                            path: path.clone(),
                            hash: cid.hash,
                            link_type: node.node_type,
                            size: node_size,
                            key: cid.key,
                        };

                        // Seed the stack with the root's children, reversed
                        // so the first link is processed first. Internal
                        // fanout links keep the parent path.
                        let mut stack: Vec<WalkStackItem> = Vec::new();
                        let uses_legacy_fanout = Self::node_uses_legacy_directory_fanout(&node);
                        for link in node.links.into_iter().rev() {
                            let is_internal = Self::is_internal_directory_link_with_legacy_fanout(
                                &link,
                                uses_legacy_fanout,
                            );
                            let child_path = match &link.name {
                                Some(name) if !is_internal => {
                                    if path.is_empty() {
                                        name.clone()
                                    } else {
                                        format!("{}/{}", path, name)
                                    }
                                }
                                _ => path.clone(),
                            };
                            stack.push(WalkStackItem {
                                hash: link.hash,
                                path: child_path,
                                key: link.key,
                            });
                        }

                        Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
                    }
                    WalkStreamState::Processing { mut stack, tree } => {
                        tree.process_walk_stack(&mut stack).await
                    }
                    WalkStreamState::Done => None,
                }
            },
        ))
    }
1825
    /// Pop stack items until one yields a [`WalkEntry`], returning that entry
    /// plus the next stream state; `None` ends the stream when the stack is
    /// exhausted. Nodes missing from the store are skipped silently.
    async fn process_walk_stack<'a>(
        &'a self,
        stack: &mut Vec<WalkStackItem>,
    ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                Ok(None) => continue, // absent node: skip
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        WalkStreamState::Done,
                    ))
                }
            };

            // Non-tree data is emitted as a leaf blob entry.
            let node = match try_decode_tree_node(&data) {
                Some(n) => n,
                None => {
                    let entry = WalkEntry {
                        path: item.path,
                        hash: item.hash,
                        link_type: LinkType::Blob,
                        size: data.len() as u64,
                        key: item.key,
                    };
                    return Some((
                        Ok(entry),
                        WalkStreamState::Processing {
                            stack: std::mem::take(stack),
                            tree: self,
                        },
                    ));
                }
            };

            let node_size: u64 = node.links.iter().map(|l| l.size).sum();
            let entry = WalkEntry {
                path: item.path.clone(),
                hash: item.hash,
                link_type: node.node_type,
                size: node_size,
                // NOTE(review): tree-node entries report `key: None` here,
                // while the root entry in `walk_stream` reports `cid.key` —
                // confirm this asymmetry is intentional.
                key: None,
            };

            // Push children reversed so the first link pops first; internal
            // fanout links keep the parent path.
            let uses_legacy_fanout = Self::node_uses_legacy_directory_fanout(&node);
            for link in node.links.into_iter().rev() {
                let is_internal =
                    Self::is_internal_directory_link_with_legacy_fanout(&link, uses_legacy_fanout);
                let child_path = match &link.name {
                    Some(name) if !is_internal => {
                        if item.path.is_empty() {
                            name.clone()
                        } else {
                            format!("{}/{}", item.path, name)
                        }
                    }
                    _ => item.path.clone(),
                };
                stack.push(WalkStackItem {
                    hash: link.hash,
                    path: child_path,
                    key: link.key,
                });
            }

            return Some((
                Ok(entry),
                WalkStreamState::Processing {
                    stack: std::mem::take(stack),
                    tree: self,
                },
            ));
        }
        None
    }
1904
1905 pub async fn set_entry(
1910 &self,
1911 root: &Cid,
1912 path: &[&str],
1913 name: &str,
1914 entry_cid: &Cid,
1915 size: u64,
1916 link_type: LinkType,
1917 ) -> Result<Cid, HashTreeError> {
1918 let dir_cid = self.resolve_path_array(root, path).await?;
1919 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1920
1921 let entries = self.list_directory(&dir_cid).await?;
1922 let mut new_entries: Vec<DirEntry> = entries
1923 .into_iter()
1924 .filter(|e| e.name != name)
1925 .map(|e| DirEntry {
1926 name: e.name,
1927 hash: e.hash,
1928 size: e.size,
1929 key: e.key,
1930 link_type: e.link_type,
1931 meta: e.meta,
1932 })
1933 .collect();
1934
1935 new_entries.push(DirEntry {
1936 name: name.to_string(),
1937 hash: entry_cid.hash,
1938 size,
1939 key: entry_cid.key,
1940 link_type,
1941 meta: None,
1942 });
1943
1944 let new_dir_cid = self.put_directory(new_entries).await?;
1945 self.rebuild_path(root, path, new_dir_cid).await
1946 }
1947
1948 pub async fn remove_entry(
1951 &self,
1952 root: &Cid,
1953 path: &[&str],
1954 name: &str,
1955 ) -> Result<Cid, HashTreeError> {
1956 let dir_cid = self.resolve_path_array(root, path).await?;
1957 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1958
1959 let entries = self.list_directory(&dir_cid).await?;
1960 let new_entries: Vec<DirEntry> = entries
1961 .into_iter()
1962 .filter(|e| e.name != name)
1963 .map(|e| DirEntry {
1964 name: e.name,
1965 hash: e.hash,
1966 size: e.size,
1967 key: e.key,
1968 link_type: e.link_type,
1969 meta: e.meta,
1970 })
1971 .collect();
1972
1973 let new_dir_cid = self.put_directory(new_entries).await?;
1974 self.rebuild_path(root, path, new_dir_cid).await
1975 }
1976
1977 pub async fn rename_entry(
1980 &self,
1981 root: &Cid,
1982 path: &[&str],
1983 old_name: &str,
1984 new_name: &str,
1985 ) -> Result<Cid, HashTreeError> {
1986 if old_name == new_name {
1987 return Ok(root.clone());
1988 }
1989
1990 let dir_cid = self.resolve_path_array(root, path).await?;
1991 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1992
1993 let entries = self.list_directory(&dir_cid).await?;
1994 let entry = entries
1995 .iter()
1996 .find(|e| e.name == old_name)
1997 .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1998
1999 let entry_hash = entry.hash;
2000 let entry_size = entry.size;
2001 let entry_key = entry.key;
2002 let entry_link_type = entry.link_type;
2003 let entry_meta = entry.meta.clone();
2004
2005 let new_entries: Vec<DirEntry> = entries
2006 .into_iter()
2007 .filter(|e| e.name != old_name)
2008 .map(|e| DirEntry {
2009 name: e.name,
2010 hash: e.hash,
2011 size: e.size,
2012 key: e.key,
2013 link_type: e.link_type,
2014 meta: e.meta,
2015 })
2016 .chain(std::iter::once(DirEntry {
2017 name: new_name.to_string(),
2018 hash: entry_hash,
2019 size: entry_size,
2020 key: entry_key,
2021 link_type: entry_link_type,
2022 meta: entry_meta,
2023 }))
2024 .collect();
2025
2026 let new_dir_cid = self.put_directory(new_entries).await?;
2027 self.rebuild_path(root, path, new_dir_cid).await
2028 }
2029
2030 pub async fn move_entry(
2033 &self,
2034 root: &Cid,
2035 source_path: &[&str],
2036 name: &str,
2037 target_path: &[&str],
2038 ) -> Result<Cid, HashTreeError> {
2039 let source_dir_cid = self.resolve_path_array(root, source_path).await?;
2040 let source_dir_cid =
2041 source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
2042
2043 let source_entries = self.list_directory(&source_dir_cid).await?;
2044 let entry = source_entries
2045 .iter()
2046 .find(|e| e.name == name)
2047 .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
2048
2049 let entry_cid = Cid {
2050 hash: entry.hash,
2051 key: entry.key,
2052 };
2053 let entry_size = entry.size;
2054 let entry_link_type = entry.link_type;
2055
2056 let new_root = self.remove_entry(root, source_path, name).await?;
2058
2059 self.set_entry(
2061 &new_root,
2062 target_path,
2063 name,
2064 &entry_cid,
2065 entry_size,
2066 entry_link_type,
2067 )
2068 .await
2069 }
2070
    /// Resolve a path given as an array of components; an empty array refers
    /// to `root` itself.
    async fn resolve_path_array(
        &self,
        root: &Cid,
        path: &[&str],
    ) -> Result<Option<Cid>, HashTreeError> {
        if path.is_empty() {
            return Ok(Some(root.clone()));
        }
        self.resolve_path(root, &path.join("/")).await
    }
2081
    /// Rebuild the directory chain after `new_child` replaced the directory
    /// at `path`, returning the new root CID.
    ///
    /// Walks the path components from innermost to outermost; each parent is
    /// re-listed and rewritten so its link for the changed component points
    /// at the freshly built child.
    async fn rebuild_path(
        &self,
        root: &Cid,
        path: &[&str],
        new_child: Cid,
    ) -> Result<Cid, HashTreeError> {
        // Replacing the root itself: nothing to rebuild.
        if path.is_empty() {
            return Ok(new_child);
        }

        let mut child_cid = new_child;
        let parts: Vec<&str> = path.to_vec();

        for i in (0..parts.len()).rev() {
            let child_name = parts[i];
            let parent_path = &parts[..i];

            // Parents are resolved against the ORIGINAL root; the rewritten
            // chain below level `i` is carried forward in `child_cid`.
            let parent_cid = if parent_path.is_empty() {
                root.clone()
            } else {
                self.resolve_path_array(root, parent_path)
                    .await?
                    .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
            };

            let parent_entries = self.list_directory(&parent_cid).await?;
            let new_parent_entries: Vec<DirEntry> = parent_entries
                .into_iter()
                .map(|e| {
                    if e.name == child_name {
                        DirEntry {
                            name: e.name,
                            hash: child_cid.hash,
                            // NOTE(review): the rewritten entry's size is set
                            // to 0 rather than recomputed — confirm directory
                            // sizes are not relied upon after updates.
                            size: 0,
                            key: child_cid.key,
                            link_type: e.link_type,
                            meta: e.meta,
                        }
                    } else {
                        DirEntry {
                            name: e.name,
                            hash: e.hash,
                            size: e.size,
                            key: e.key,
                            link_type: e.link_type,
                            meta: e.meta,
                        }
                    }
                })
                .collect();

            child_cid = self.put_directory(new_parent_entries).await?;
        }

        Ok(child_cid)
    }
2138
    /// Handle to the underlying store (cheap `Arc` clone).
    pub fn get_store(&self) -> Arc<S> {
        self.store.clone()
    }
2145
    /// Configured chunk size in bytes used when splitting file content.
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }
2150
    /// Configured maximum number of links per tree node.
    pub fn max_links(&self) -> usize {
        self.max_links
    }
2155}
2156
/// Work item for the plain (unencrypted) read stream.
enum StreamStackItem {
    /// A node hash still to be fetched from the store.
    Hash(Hash),
}
2162
/// State machine driving the unencrypted read stream.
enum ReadStreamState<'a, S: Store> {
    /// Stream not started: the root hash has not been fetched yet.
    Init {
        hash: Hash,
        tree: &'a HashTree<S>,
    },
    /// Traversal in progress with an explicit stack of pending items.
    Processing {
        stack: Vec<StreamStackItem>,
        tree: &'a HashTree<S>,
    },
    /// Stream finished (normally or after yielding an error).
    Done,
}
2174
/// Pending node in the streaming walk: its hash, its display path, and the
/// decryption key carried by the link that referenced it (if any).
struct WalkStackItem {
    hash: Hash,
    path: String,
    key: Option<[u8; 32]>,
}
2180
/// State machine driving [`HashTree::walk_stream`].
enum WalkStreamState<'a, S: Store> {
    /// Stream not started: the root CID has not been processed yet.
    Init {
        cid: Cid,
        path: String,
        tree: &'a HashTree<S>,
    },
    /// Traversal in progress with an explicit stack of pending nodes.
    Processing {
        stack: Vec<WalkStackItem>,
        tree: &'a HashTree<S>,
    },
    /// Stream finished (normally or after yielding an error).
    Done,
}
2193
/// Pending node in the encrypted read stream: its hash plus the key needed
/// to decrypt it (if any).
struct EncryptedStackItem {
    hash: Hash,
    key: Option<[u8; 32]>,
}
2199
/// State machine driving the encrypted read stream.
enum EncryptedStreamState<'a, S: Store> {
    /// Stream not started: the encrypted root has not been fetched yet.
    Init {
        hash: Hash,
        key: [u8; 32],
        tree: &'a HashTree<S>,
    },
    /// Traversal in progress with an explicit stack of pending items.
    Processing {
        stack: Vec<EncryptedStackItem>,
        tree: &'a HashTree<S>,
    },
    /// Stream finished (normally or after yielding an error).
    Done,
}
2212
2213pub async fn verify_tree<S: Store>(
2215 store: Arc<S>,
2216 root_hash: &Hash,
2217) -> Result<crate::reader::VerifyResult, HashTreeError> {
2218 let mut missing = Vec::new();
2219 let mut visited = std::collections::HashSet::new();
2220
2221 verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
2222
2223 Ok(crate::reader::VerifyResult {
2224 valid: missing.is_empty(),
2225 missing,
2226 })
2227}
2228
2229async fn verify_recursive<S: Store>(
2230 store: Arc<S>,
2231 hash: &Hash,
2232 missing: &mut Vec<Hash>,
2233 visited: &mut std::collections::HashSet<String>,
2234) -> Result<(), HashTreeError> {
2235 let hex = to_hex(hash);
2236 if visited.contains(&hex) {
2237 return Ok(());
2238 }
2239 visited.insert(hex);
2240
2241 let data = match store
2242 .get(hash)
2243 .await
2244 .map_err(|e| HashTreeError::Store(e.to_string()))?
2245 {
2246 Some(d) => d,
2247 None => {
2248 missing.push(*hash);
2249 return Ok(());
2250 }
2251 };
2252
2253 if is_tree_node(&data) {
2254 let node = decode_tree_node(&data)?;
2255 for link in &node.links {
2256 Box::pin(verify_recursive(
2257 store.clone(),
2258 &link.hash,
2259 missing,
2260 visited,
2261 ))
2262 .await?;
2263 }
2264 }
2265
2266 Ok(())
2267}
2268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;

    /// Fresh in-memory store plus an unencrypted (public) tree over it.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    #[tokio::test]
    async fn test_put_and_read_blob() {
        let (_store, tree) = make_tree();

        let data = vec![1, 2, 3, 4, 5];
        let hash = tree.put_blob(&data).await.unwrap();

        // Round-trip: the stored blob comes back byte-identical.
        let result = tree.get_blob(&hash).await.unwrap();
        assert_eq!(result, Some(data));
    }

    #[tokio::test]
    async fn test_put_and_read_file_small() {
        let (_store, tree) = make_tree();

        let data = b"Hello, World!";
        let (cid, size) = tree.put_file(data).await.unwrap();

        // Reported size matches the input length.
        assert_eq!(size, data.len() as u64);

        let read_data = tree.read_file(&cid.hash).await.unwrap();
        assert_eq!(read_data, Some(data.to_vec()));
    }

    #[tokio::test]
    async fn test_put_and_read_directory() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();

        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        // Both names must come back; order is not asserted.
        let entries = tree.list_directory(&dir_cid).await.unwrap();
        assert_eq!(entries.len(), 2);
        let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
        assert!(names.contains(&"a.txt"));
        assert!(names.contains(&"b.txt"));
    }

    #[tokio::test]
    async fn test_is_directory() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"data").await.unwrap();
        let dir_cid = tree.put_directory(vec![]).await.unwrap();

        // A blob is not a directory; an (even empty) directory node is.
        assert!(!tree.is_directory(&file_hash).await.unwrap());
        assert!(tree.is_directory(&dir_cid.hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_resolve_path() {
        let (_store, tree) = make_tree();

        // Build root/subdir/file.txt and resolve the nested path.
        let file_hash = tree.put_blob(b"nested").await.unwrap();
        let sub_dir = tree
            .put_directory(vec![DirEntry::new("file.txt", file_hash).with_size(6)])
            .await
            .unwrap();
        let root_dir = tree
            .put_directory(vec![DirEntry::new("subdir", sub_dir.hash)])
            .await
            .unwrap();

        let resolved = tree
            .resolve_path(&root_dir, "subdir/file.txt")
            .await
            .unwrap();
        assert_eq!(resolved.map(|c| c.hash), Some(file_hash));
    }

    #[tokio::test]
    async fn test_unified_put_get_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let data = b"Hello, public world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        // Public mode: no encryption key in the CID.
        assert!(cid.key.is_none());

        let retrieved = tree.get(&cid, None).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_unified_put_get_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Hello, encrypted world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        // Default config encrypts, so the CID carries a key.
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid, None).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_unified_put_get_encrypted_chunked() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).with_chunk_size(100));

        // 500 bytes with a 100-byte chunk size forces multiple chunks.
        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
        let (cid, size) = tree.put(&data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid, None).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_cid_deterministic() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Same content produces same CID";

        // Convergent encryption: identical content yields identical CIDs.
        let (cid1, _) = tree.put(data).await.unwrap();
        let (cid2, _) = tree.put(data).await.unwrap();

        assert_eq!(cid1.hash, cid2.hash);
        assert_eq!(cid1.key, cid2.key);
        assert_eq!(cid1.to_string(), cid2.to_string());
    }

    #[tokio::test]
    async fn test_cid_to_string_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        // Public CID: 64 hex chars (hash only), no key separator.
        assert_eq!(s.len(), 64);
        assert!(!s.contains(':'));
    }

    #[tokio::test]
    async fn test_cid_to_string_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        // Encrypted CID: hash + ':' + key = 64 + 1 + 64 = 129 chars.
        assert_eq!(s.len(), 129);
        assert!(s.contains(':'));
    }
}