1use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::io::AsyncRead;
10use futures::stream::{self, Stream};
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{
15 decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node,
16};
17use crate::hash::sha256;
18use crate::reader::{ReaderError, TreeEntry, WalkEntry};
19use crate::store::Store;
20use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
21
22use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
23
/// Configuration for constructing a [`HashTree`].
#[derive(Clone)]
pub struct HashTreeConfig<S: Store> {
    // Backing block store shared with the tree.
    pub store: Arc<S>,
    // Maximum payload size of a single leaf chunk, in bytes.
    pub chunk_size: usize,
    // Maximum number of links held by one interior tree node.
    pub max_links: usize,
    // When true, chunks and tree nodes are stored CHK-encrypted.
    pub encrypted: bool,
}
33
34impl<S: Store> HashTreeConfig<S> {
35 pub fn new(store: Arc<S>) -> Self {
36 Self {
37 store,
38 chunk_size: DEFAULT_CHUNK_SIZE,
39 max_links: DEFAULT_MAX_LINKS,
40 encrypted: true,
41 }
42 }
43
44 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
45 self.chunk_size = chunk_size;
46 self
47 }
48
49 pub fn with_max_links(mut self, max_links: usize) -> Self {
50 self.max_links = max_links;
51 self
52 }
53
54 pub fn public(mut self) -> Self {
56 self.encrypted = false;
57 self
58 }
59}
60
/// Errors produced by [`HashTree`] operations.
#[derive(Debug, thiserror::Error)]
pub enum HashTreeError {
    /// Underlying block-store failure (message taken from the store error).
    #[error("Store error: {0}")]
    Store(String),
    /// Tree-node encode/decode failure.
    #[error("Codec error: {0}")]
    Codec(#[from] crate::codec::CodecError),
    /// A linked chunk was absent from the store (hex-encoded hash).
    #[error("Missing chunk: {0}")]
    MissingChunk(String),
    /// A path could not be resolved to an entry.
    #[error("Path not found: {0}")]
    PathNotFound(String),
    /// A named entry was not present in a directory node.
    #[error("Entry not found: {0}")]
    EntryNotFound(String),
    /// Failure while encrypting content.
    #[error("Encryption error: {0}")]
    Encryption(String),
    /// Failure while decrypting content.
    #[error("Decryption error: {0}")]
    Decryption(String),
    /// Assembled content exceeded the caller-supplied `max_size` cap.
    #[error("Content size {actual_size} exceeds max_size {max_size}")]
    SizeLimitExceeded { max_size: u64, actual_size: u64 },
}
81
82impl From<BuilderError> for HashTreeError {
83 fn from(e: BuilderError) -> Self {
84 match e {
85 BuilderError::Store(s) => HashTreeError::Store(s),
86 BuilderError::Codec(c) => HashTreeError::Codec(c),
87 BuilderError::Encryption(s) => HashTreeError::Encryption(s),
88 }
89 }
90}
91
92impl From<ReaderError> for HashTreeError {
93 fn from(e: ReaderError) -> Self {
94 match e {
95 ReaderError::Store(s) => HashTreeError::Store(s),
96 ReaderError::Codec(c) => HashTreeError::Codec(c),
97 ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
98 ReaderError::Decryption(s) => HashTreeError::Encryption(s),
99 ReaderError::MissingKey => {
100 HashTreeError::Encryption("missing decryption key".to_string())
101 }
102 }
103 }
104}
105
/// Content-addressed Merkle tree over a block [`Store`], with optional
/// convergent (CHK) encryption of leaf chunks and interior nodes.
pub struct HashTree<S: Store> {
    // Shared backing store for chunks and tree nodes.
    store: Arc<S>,
    // Maximum leaf chunk payload size, in bytes.
    chunk_size: usize,
    // Maximum fan-out of an interior tree node.
    max_links: usize,
    // When true, every stored block is CHK-encrypted.
    encrypted: bool,
}
113
114impl<S: Store> HashTree<S> {
115 pub fn new(config: HashTreeConfig<S>) -> Self {
116 Self {
117 store: config.store,
118 chunk_size: config.chunk_size,
119 max_links: config.max_links,
120 encrypted: config.encrypted,
121 }
122 }
123
    /// Returns true when content written through this tree is CHK-encrypted.
    pub fn is_encrypted(&self) -> bool {
        self.encrypted
    }
128
129 pub async fn put(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
134 let size = data.len() as u64;
135
136 if data.len() <= self.chunk_size {
138 let (hash, key) = self.put_chunk_internal(data).await?;
139 return Ok((Cid { hash, key }, size));
140 }
141
142 let mut links: Vec<Link> = Vec::new();
144 let mut offset = 0;
145
146 while offset < data.len() {
147 let end = (offset + self.chunk_size).min(data.len());
148 let chunk = &data[offset..end];
149 let chunk_size = chunk.len() as u64;
150 let (hash, key) = self.put_chunk_internal(chunk).await?;
151 links.push(Link {
152 hash,
153 name: None,
154 size: chunk_size,
155 key,
156 link_type: LinkType::Blob, meta: None,
158 });
159 offset = end;
160 }
161
162 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
164 Ok((
165 Cid {
166 hash: root_hash,
167 key: root_key,
168 },
169 size,
170 ))
171 }
172
173 pub async fn get(
178 &self,
179 cid: &Cid,
180 max_size: Option<u64>,
181 ) -> Result<Option<Vec<u8>>, HashTreeError> {
182 if let Some(key) = cid.key {
183 self.get_encrypted(&cid.hash, &key, max_size).await
184 } else {
185 self.read_file_with_limit(&cid.hash, max_size).await
186 }
187 }
188
189 pub async fn put_stream<R: AsyncRead + Unpin>(
195 &self,
196 mut reader: R,
197 ) -> Result<(Cid, u64), HashTreeError> {
198 let mut buffer = vec![0u8; self.chunk_size];
199 let mut links = Vec::new();
200 let mut total_size: u64 = 0;
201 let mut consistent_key: Option<[u8; 32]> = None;
202
203 loop {
204 let mut chunk = Vec::new();
205 let mut bytes_read = 0;
206
207 while bytes_read < self.chunk_size {
209 let n = reader
210 .read(&mut buffer[..self.chunk_size - bytes_read])
211 .await
212 .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
213 if n == 0 {
214 break; }
216 chunk.extend_from_slice(&buffer[..n]);
217 bytes_read += n;
218 }
219
220 if chunk.is_empty() {
221 break; }
223
224 let chunk_len = chunk.len() as u64;
225 total_size += chunk_len;
226
227 let (hash, key) = self.put_chunk_internal(&chunk).await?;
228
229 if links.is_empty() {
231 consistent_key = key;
232 } else if consistent_key != key {
233 consistent_key = None;
234 }
235
236 links.push(Link {
237 hash,
238 name: None,
239 size: chunk_len,
240 key,
241 link_type: LinkType::Blob, meta: None,
243 });
244 }
245
246 if links.is_empty() {
247 let (hash, key) = self.put_chunk_internal(&[]).await?;
249 return Ok((Cid { hash, key }, 0));
250 }
251
252 let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
254 Ok((
255 Cid {
256 hash: root_hash,
257 key: root_key,
258 },
259 total_size,
260 ))
261 }
262
263 pub fn get_stream(
268 &self,
269 cid: &Cid,
270 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
271 let hash = cid.hash;
272 let key = cid.key;
273
274 if let Some(k) = key {
275 Box::pin(self.read_file_stream_encrypted(hash, k))
277 } else {
278 self.read_file_stream(hash)
280 }
281 }
282
    /// Lazily streams the decrypted leaf chunks of the file rooted at `hash`,
    /// driven by a three-state unfold machine (`Init` → `Processing` → `Done`).
    ///
    /// `Init` fetches and decrypts the root: a non-tree root is emitted as the
    /// single item; a tree root seeds a stack of child links (pushed in
    /// reverse so they pop in file order) that `Processing` then drains via
    /// `process_encrypted_stream_stack`.
    fn read_file_stream_encrypted(
        &self,
        hash: Hash,
        key: EncryptionKey,
    ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
        stream::unfold(
            EncryptedStreamState::Init {
                hash,
                key,
                tree: self,
            },
            |state| async move {
                match state {
                    EncryptedStreamState::Init { hash, key, tree } => {
                        // A missing root ends the stream silently (None).
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    EncryptedStreamState::Done,
                                ))
                            }
                        };

                        let decrypted = match decrypt_chk(&data, &key) {
                            Ok(d) => d,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Decryption(e.to_string())),
                                    EncryptedStreamState::Done,
                                ))
                            }
                        };

                        // Single-chunk file: emit it and finish.
                        if !is_tree_node(&decrypted) {
                            return Some((Ok(decrypted), EncryptedStreamState::Done));
                        }

                        let node = match decode_tree_node(&decrypted) {
                            Ok(n) => n,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Codec(e)),
                                    EncryptedStreamState::Done,
                                ))
                            }
                        };

                        // Push children in reverse so pop() yields file order.
                        let mut stack: Vec<EncryptedStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(EncryptedStackItem {
                                hash: link.hash,
                                key: link.key,
                            });
                        }

                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Processing { mut stack, tree } => {
                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Done => None,
                }
            },
        )
    }
354
    /// Pops stack items until a leaf chunk is found, expanding interior tree
    /// nodes in place (children pushed in reverse for file order).
    ///
    /// Returns `Some((chunk, next_state))` carrying the remaining stack,
    /// `Some((Err, Done))` on any failure, or `None` when the stack is empty.
    async fn process_encrypted_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<EncryptedStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                // Unlike the missing root, a missing child is an error: the
                // parent link promised it exists.
                Ok(None) => {
                    return Some((
                        Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
                        EncryptedStreamState::Done,
                    ))
                }
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        EncryptedStreamState::Done,
                    ))
                }
            };

            // Links without a key are treated as plaintext blocks.
            let decrypted = if let Some(key) = item.key {
                match decrypt_chk(&data, &key) {
                    Ok(d) => d,
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Decryption(e.to_string())),
                            EncryptedStreamState::Done,
                        ))
                    }
                }
            } else {
                data
            };

            if is_tree_node(&decrypted) {
                // Interior node: expand and keep popping.
                let node = match decode_tree_node(&decrypted) {
                    Ok(n) => n,
                    Err(e) => {
                        return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done))
                    }
                };
                for link in node.links.into_iter().rev() {
                    stack.push(EncryptedStackItem {
                        hash: link.hash,
                        key: link.key,
                    });
                }
            } else {
                // Leaf chunk: yield it, handing the remaining stack to the
                // next state (mem::take leaves the borrowed Vec empty).
                return Some((
                    Ok(decrypted),
                    EncryptedStreamState::Processing {
                        stack: std::mem::take(stack),
                        tree: self,
                    },
                ));
            }
        }
        None
    }
418
419 async fn put_chunk_internal(
421 &self,
422 data: &[u8],
423 ) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
424 if self.encrypted {
425 let (encrypted, key) =
426 encrypt_chk(data).map_err(|e| HashTreeError::Encryption(e.to_string()))?;
427 let hash = sha256(&encrypted);
428 self.store
429 .put(hash, encrypted)
430 .await
431 .map_err(|e| HashTreeError::Store(e.to_string()))?;
432 Ok((hash, Some(key)))
433 } else {
434 let hash = self.put_blob(data).await?;
435 Ok((hash, None))
436 }
437 }
438
    /// Recursively builds interior tree nodes over `links` until a single root
    /// remains, returning the root hash and (when encrypted) its CHK key.
    ///
    /// `total_size` is the expected content size; it gates the single-link
    /// collapse below so a lone link that already covers the whole content is
    /// returned directly instead of being wrapped in a one-child node.
    async fn build_tree_internal(
        &self,
        links: Vec<Link>,
        total_size: Option<u64>,
    ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
        // Collapse: one link covering the full size needs no wrapper node.
        if links.len() == 1 {
            if let Some(ts) = total_size {
                if links[0].size == ts {
                    return Ok((links[0].hash, links[0].key));
                }
            }
        }

        // Base case: everything fits in one node — encode, (maybe) encrypt,
        // store, and return its address.
        if links.len() <= self.max_links {
            let node = TreeNode {
                node_type: LinkType::File,
                links,
            };
            let (data, _) = encode_and_hash(&node)?;

            if self.encrypted {
                let (encrypted, key) =
                    encrypt_chk(&data).map_err(|e| HashTreeError::Encryption(e.to_string()))?;
                // Address is the hash of the ciphertext.
                let hash = sha256(&encrypted);
                self.store
                    .put(hash, encrypted)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?;
                return Ok((hash, Some(key)));
            }

            let hash = sha256(&data);
            self.store
                .put(hash, data)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?;
            return Ok((hash, None));
        }

        // Too many links: group into max_links-sized batches, build a subtree
        // per batch, then recurse on the resulting layer of subtree links.
        let mut sub_links = Vec::new();
        for batch in links.chunks(self.max_links) {
            let batch_size: u64 = batch.iter().map(|l| l.size).sum();
            let (hash, key) =
                Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
            sub_links.push(Link {
                hash,
                name: None,
                size: batch_size,
                key,
                link_type: LinkType::File, meta: None,
            });
        }

        Box::pin(self.build_tree_internal(sub_links, total_size)).await
    }
499
500 async fn get_encrypted(
502 &self,
503 hash: &Hash,
504 key: &EncryptionKey,
505 max_size: Option<u64>,
506 ) -> Result<Option<Vec<u8>>, HashTreeError> {
507 let encrypted_data = match self
508 .store
509 .get(hash)
510 .await
511 .map_err(|e| HashTreeError::Store(e.to_string()))?
512 {
513 Some(d) => d,
514 None => return Ok(None),
515 };
516
517 let decrypted = decrypt_chk(&encrypted_data, key)
519 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
520
521 if is_tree_node(&decrypted) {
523 let node = decode_tree_node(&decrypted)?;
524 let declared_size: u64 = node.links.iter().map(|l| l.size).sum();
525 Self::ensure_size_limit(max_size, declared_size)?;
526
527 let mut bytes_read = 0u64;
528 let assembled = self
529 .assemble_encrypted_chunks_limited(&node, max_size, &mut bytes_read)
530 .await?;
531 return Ok(Some(assembled));
532 }
533
534 Self::ensure_size_limit(max_size, decrypted.len() as u64)?;
536 Ok(Some(decrypted))
537 }
538
539 fn ensure_size_limit(max_size: Option<u64>, actual_size: u64) -> Result<(), HashTreeError> {
540 if let Some(max_size) = max_size {
541 if actual_size > max_size {
542 return Err(HashTreeError::SizeLimitExceeded {
543 max_size,
544 actual_size,
545 });
546 }
547 }
548 Ok(())
549 }
550
    /// Recursively fetches, decrypts, and concatenates the leaf chunks under
    /// `node`, tracking cumulative output in `bytes_read` so `max_size` is
    /// enforced incrementally (we stop fetching as soon as the cap is hit).
    ///
    /// Every child link must carry a key; a key-less link is an error here.
    async fn assemble_encrypted_chunks_limited(
        &self,
        node: &TreeNode,
        max_size: Option<u64>,
        bytes_read: &mut u64,
    ) -> Result<Vec<u8>, HashTreeError> {
        let mut parts: Vec<Vec<u8>> = Vec::new();

        for link in &node.links {
            // Pre-check against the link's declared size before fetching.
            let projected = (*bytes_read).saturating_add(link.size);
            Self::ensure_size_limit(max_size, projected)?;

            let chunk_key = link
                .key
                .ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;

            let encrypted_child = self
                .store
                .get(&link.hash)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?
                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;

            let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
                .map_err(|e| HashTreeError::Encryption(e.to_string()))?;

            if is_tree_node(&decrypted) {
                // Interior node: recurse; `bytes_read` keeps the running total
                // across the whole traversal.
                let child_node = decode_tree_node(&decrypted)?;
                let child_data = Box::pin(self.assemble_encrypted_chunks_limited(
                    &child_node,
                    max_size,
                    bytes_read,
                ))
                .await?;
                parts.push(child_data);
            } else {
                // Leaf: re-check against the actual decrypted length (the
                // declared link size is not trusted).
                let projected = (*bytes_read).saturating_add(decrypted.len() as u64);
                Self::ensure_size_limit(max_size, projected)?;
                *bytes_read = projected;
                parts.push(decrypted);
            }
        }

        // Single allocation for the concatenated result.
        let total_len: usize = parts.iter().map(|p| p.len()).sum();
        let mut result = Vec::with_capacity(total_len);
        for part in parts {
            result.extend_from_slice(&part);
        }

        Ok(result)
    }
605
606 pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
611 let hash = sha256(data);
612 self.store
613 .put(hash, data.to_vec())
614 .await
615 .map_err(|e| HashTreeError::Store(e.to_string()))?;
616 Ok(hash)
617 }
618
619 pub async fn put_file(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
622 let size = data.len() as u64;
623
624 if data.len() <= self.chunk_size {
626 let (hash, key) = self.put_chunk_internal(data).await?;
627 return Ok((Cid { hash, key }, size));
628 }
629
630 let mut links: Vec<Link> = Vec::new();
632 let mut offset = 0;
633
634 while offset < data.len() {
635 let end = (offset + self.chunk_size).min(data.len());
636 let chunk = &data[offset..end];
637 let chunk_size = (end - offset) as u64;
638
639 let (hash, key) = self.put_chunk_internal(chunk).await?;
640 links.push(Link {
641 hash,
642 name: None,
643 size: chunk_size,
644 key,
645 link_type: LinkType::Blob, meta: None,
647 });
648 offset = end;
649 }
650
651 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
653 Ok((
654 Cid {
655 hash: root_hash,
656 key: root_key,
657 },
658 size,
659 ))
660 }
661
662 pub async fn put_directory(&self, entries: Vec<DirEntry>) -> Result<Cid, HashTreeError> {
668 let mut sorted = entries;
670 sorted.sort_by(|a, b| a.name.cmp(&b.name));
671
672 let links: Vec<Link> = sorted
673 .into_iter()
674 .map(|e| Link {
675 hash: e.hash,
676 name: Some(e.name),
677 size: e.size,
678 key: e.key,
679 link_type: e.link_type,
680 meta: e.meta,
681 })
682 .collect();
683
684 let node = TreeNode {
686 node_type: LinkType::Dir,
687 links,
688 };
689 let (data, _plain_hash) = encode_and_hash(&node)?;
690
691 let (cid, _size) = self.put(&data).await?;
696 Ok(cid)
697 }
698
699 pub async fn put_tree_node(&self, links: Vec<Link>) -> Result<Hash, HashTreeError> {
701 let node = TreeNode {
702 node_type: LinkType::Dir,
703 links,
704 };
705
706 let (data, hash) = encode_and_hash(&node)?;
707 self.store
708 .put(hash, data)
709 .await
710 .map_err(|e| HashTreeError::Store(e.to_string()))?;
711 Ok(hash)
712 }
713
    /// Fetches the raw stored bytes at `hash` without decoding or decrypting.
    pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
        self.store
            .get(hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))
    }
723
724 pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
726 let data = match self
727 .store
728 .get(hash)
729 .await
730 .map_err(|e| HashTreeError::Store(e.to_string()))?
731 {
732 Some(d) => d,
733 None => return Ok(None),
734 };
735
736 if !is_tree_node(&data) {
737 return Ok(None);
738 }
739
740 let node = decode_tree_node(&data)?;
741 Ok(Some(node))
742 }
743
744 pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
746 let data = match self
747 .store
748 .get(&cid.hash)
749 .await
750 .map_err(|e| HashTreeError::Store(e.to_string()))?
751 {
752 Some(d) => d,
753 None => return Ok(None),
754 };
755
756 let decrypted = if let Some(key) = &cid.key {
758 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
759 } else {
760 data
761 };
762
763 if !is_tree_node(&decrypted) {
764 return Ok(None);
765 }
766
767 let node = decode_tree_node(&decrypted)?;
768 Ok(Some(node))
769 }
770
    /// Loads the node at `cid`, additionally reassembling a chunked root:
    /// when the decoded node is a File-type node whose concatenated chunks
    /// themselves decode as a tree node, that inner node is returned instead.
    /// This lets a large (chunked) directory encoding be read transparently.
    pub async fn get_directory_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
        let data = match self
            .store
            .get(&cid.hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))?
        {
            Some(d) => d,
            None => return Ok(None),
        };

        let decrypted = if let Some(key) = &cid.key {
            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
        } else {
            data
        };

        if !is_tree_node(&decrypted) {
            return Ok(None);
        }

        let node = decode_tree_node(&decrypted)?;

        // A File-type node here means the directory encoding was large enough
        // to be chunked; reassemble (no size cap) and try decoding the inner
        // payload as the real directory node.
        // NOTE(review): chunks are fetched via the plaintext path
        // (`assemble_chunks_limited`), so this branch presumably only fires
        // for unencrypted trees — confirm for encrypted chunked directories.
        if node.node_type == LinkType::File {
            let mut bytes_read = 0u64;
            let assembled = self
                .assemble_chunks_limited(&node, None, &mut bytes_read)
                .await?;
            if is_tree_node(&assembled) {
                let inner_node = decode_tree_node(&assembled)?;
                return Ok(Some(inner_node));
            }
        }

        Ok(Some(node))
    }
811
812 pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
814 let data = match self
815 .store
816 .get(hash)
817 .await
818 .map_err(|e| HashTreeError::Store(e.to_string()))?
819 {
820 Some(d) => d,
821 None => return Ok(false),
822 };
823 Ok(is_tree_node(&data))
824 }
825
826 pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
828 let node = match self.get_node(cid).await? {
829 Some(n) => n,
830 None => return Ok(false),
831 };
832 Ok(node.links.iter().any(|l| {
834 l.name
835 .as_ref()
836 .map(|n| !n.starts_with('_'))
837 .unwrap_or(false)
838 }))
839 }
840
841 pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
843 let data = match self
844 .store
845 .get(hash)
846 .await
847 .map_err(|e| HashTreeError::Store(e.to_string()))?
848 {
849 Some(d) => d,
850 None => return Ok(false),
851 };
852 Ok(is_directory_node(&data))
853 }
854
    /// Reads and reassembles the plaintext file at `hash` with no size cap.
    pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
        self.read_file_with_limit(hash, None).await
    }
859
    /// Reads the plaintext content at `hash`, reassembling chunked files, and
    /// fails with `SizeLimitExceeded` when `max_size` would be breached.
    /// Returns `Ok(None)` when the hash is absent from the store.
    async fn read_file_with_limit(
        &self,
        hash: &Hash,
        max_size: Option<u64>,
    ) -> Result<Option<Vec<u8>>, HashTreeError> {
        let data = match self
            .store
            .get(hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))?
        {
            Some(d) => d,
            None => return Ok(None),
        };

        // Single-chunk file: the stored bytes are the content.
        if !is_tree_node(&data) {
            Self::ensure_size_limit(max_size, data.len() as u64)?;
            return Ok(Some(data));
        }

        // Check the declared total before fetching any children.
        let node = decode_tree_node(&data)?;
        let declared_size: u64 = node.links.iter().map(|l| l.size).sum();
        Self::ensure_size_limit(max_size, declared_size)?;

        let mut bytes_read = 0u64;
        let assembled = self
            .assemble_chunks_limited(&node, max_size, &mut bytes_read)
            .await?;
        Ok(Some(assembled))
    }
893
894 pub async fn read_file_range(
901 &self,
902 hash: &Hash,
903 start: u64,
904 end: Option<u64>,
905 ) -> Result<Option<Vec<u8>>, HashTreeError> {
906 let data = match self
907 .store
908 .get(hash)
909 .await
910 .map_err(|e| HashTreeError::Store(e.to_string()))?
911 {
912 Some(d) => d,
913 None => return Ok(None),
914 };
915
916 if !is_tree_node(&data) {
918 let start_idx = start as usize;
919 let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
920 if start_idx >= data.len() {
921 return Ok(Some(vec![]));
922 }
923 let end_idx = end_idx.min(data.len());
924 return Ok(Some(data[start_idx..end_idx].to_vec()));
925 }
926
927 let node = decode_tree_node(&data)?;
929 let range_data = self.assemble_chunks_range(&node, start, end).await?;
930 Ok(Some(range_data))
931 }
932
933 pub async fn read_file_range_cid(
935 &self,
936 cid: &Cid,
937 start: u64,
938 end: Option<u64>,
939 ) -> Result<Option<Vec<u8>>, HashTreeError> {
940 if let Some(_key) = cid.key {
941 let data = match self.get(cid, None).await? {
942 Some(d) => d,
943 None => return Ok(None),
944 };
945 let start_idx = start as usize;
946 let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
947 if start_idx >= data.len() {
948 return Ok(Some(vec![]));
949 }
950 let end_idx = end_idx.min(data.len());
951 return Ok(Some(data[start_idx..end_idx].to_vec()));
952 }
953
954 self.read_file_range(&cid.hash, start, end).await
955 }
956
    /// Assembles the byte range `[start, end)` of a chunked plaintext file by
    /// first mapping out all leaf chunks, then fetching only the chunks that
    /// overlap the requested range.
    async fn assemble_chunks_range(
        &self,
        node: &TreeNode,
        start: u64,
        end: Option<u64>,
    ) -> Result<Vec<u8>, HashTreeError> {
        // (hash, byte_offset, size) for every leaf chunk, in file order.
        let chunks_info = self.collect_chunk_offsets(node).await?;

        if chunks_info.is_empty() {
            return Ok(vec![]);
        }

        // Clamp the end of the range to the actual file size.
        let total_size: u64 = chunks_info.iter().map(|(_, _, size)| size).sum();
        let actual_end = end.unwrap_or(total_size).min(total_size);

        // Empty or inverted range.
        if start >= actual_end {
            return Ok(vec![]);
        }

        let mut result = Vec::with_capacity((actual_end - start) as usize);
        let mut current_offset = 0u64;

        for (chunk_hash, _chunk_offset, chunk_size) in &chunks_info {
            let chunk_start = current_offset;
            let chunk_end = current_offset + chunk_size;

            // Only fetch chunks that overlap [start, actual_end).
            if chunk_end > start && chunk_start < actual_end {
                let chunk_data = self
                    .store
                    .get(chunk_hash)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?
                    .ok_or_else(|| HashTreeError::MissingChunk(to_hex(chunk_hash)))?;

                // Trim the first/last overlapping chunk to the range bounds.
                let slice_start = if start > chunk_start {
                    (start - chunk_start) as usize
                } else {
                    0
                };
                let slice_end = if actual_end < chunk_end {
                    (actual_end - chunk_start) as usize
                } else {
                    chunk_data.len()
                };

                result.extend_from_slice(&chunk_data[slice_start..slice_end]);
            }

            current_offset = chunk_end;

            // Everything past actual_end is irrelevant.
            if current_offset >= actual_end {
                break;
            }
        }

        Ok(result)
    }
1022
1023 async fn collect_chunk_offsets(
1026 &self,
1027 node: &TreeNode,
1028 ) -> Result<Vec<(Hash, u64, u64)>, HashTreeError> {
1029 let mut chunks = Vec::new();
1030 let mut offset = 0u64;
1031 self.collect_chunk_offsets_recursive(node, &mut chunks, &mut offset)
1032 .await?;
1033 Ok(chunks)
1034 }
1035
1036 async fn collect_chunk_offsets_recursive(
1037 &self,
1038 node: &TreeNode,
1039 chunks: &mut Vec<(Hash, u64, u64)>,
1040 offset: &mut u64,
1041 ) -> Result<(), HashTreeError> {
1042 for link in &node.links {
1043 let child_data = self
1044 .store
1045 .get(&link.hash)
1046 .await
1047 .map_err(|e| HashTreeError::Store(e.to_string()))?
1048 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1049
1050 if is_tree_node(&child_data) {
1051 let child_node = decode_tree_node(&child_data)?;
1053 Box::pin(self.collect_chunk_offsets_recursive(&child_node, chunks, offset)).await?;
1054 } else {
1055 let size = child_data.len() as u64;
1057 chunks.push((link.hash, *offset, size));
1058 *offset += size;
1059 }
1060 }
1061 Ok(())
1062 }
1063
    /// Recursively fetches and concatenates the plaintext leaf chunks under
    /// `node`, tracking cumulative output in `bytes_read` so `max_size` is
    /// enforced incrementally (we stop fetching as soon as the cap is hit).
    async fn assemble_chunks_limited(
        &self,
        node: &TreeNode,
        max_size: Option<u64>,
        bytes_read: &mut u64,
    ) -> Result<Vec<u8>, HashTreeError> {
        let mut parts: Vec<Vec<u8>> = Vec::new();

        for link in &node.links {
            // Pre-check against the link's declared size before fetching.
            let projected = (*bytes_read).saturating_add(link.size);
            Self::ensure_size_limit(max_size, projected)?;

            let child_data = self
                .store
                .get(&link.hash)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?
                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;

            if is_tree_node(&child_data) {
                // Interior node: recurse; `bytes_read` keeps the running
                // total across the whole traversal.
                let child_node = decode_tree_node(&child_data)?;
                parts.push(
                    Box::pin(self.assemble_chunks_limited(&child_node, max_size, bytes_read))
                        .await?,
                );
            } else {
                // Leaf: re-check against the actual stored length (declared
                // link sizes are not trusted).
                let projected = (*bytes_read).saturating_add(child_data.len() as u64);
                Self::ensure_size_limit(max_size, projected)?;
                *bytes_read = projected;
                parts.push(child_data);
            }
        }

        // Single allocation for the concatenated result.
        let total_length: usize = parts.iter().map(|p| p.len()).sum();
        let mut result = Vec::with_capacity(total_length);
        for part in parts {
            result.extend_from_slice(&part);
        }

        Ok(result)
    }
1107
    /// Lazily streams the plaintext leaf chunks of the file at `hash`, driven
    /// by a three-state unfold machine (`Init` → `Processing` → `Done`).
    ///
    /// `Init` fetches the root: a non-tree root is emitted as the single
    /// item; a tree root seeds a stack of child hashes (pushed in reverse so
    /// they pop in file order) that `Processing` then drains.
    pub fn read_file_stream(
        &self,
        hash: Hash,
    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            ReadStreamState::Init { hash, tree: self },
            |state| async move {
                match state {
                    ReadStreamState::Init { hash, tree } => {
                        // A missing root ends the stream silently (None).
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    ReadStreamState::Done,
                                ))
                            }
                        };

                        // Single-chunk file: emit it and finish.
                        if !is_tree_node(&data) {
                            return Some((Ok(data), ReadStreamState::Done));
                        }

                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => {
                                return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done))
                            }
                        };

                        // Push children in reverse so pop() yields file order.
                        let mut stack: Vec<StreamStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }

                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Processing { mut stack, tree } => {
                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Done => None,
                }
            },
        ))
    }
1160
    /// Pops stack items until a leaf chunk is found, expanding interior tree
    /// nodes in place (children pushed in reverse for file order).
    ///
    /// Returns `Some((chunk, next_state))` carrying the remaining stack,
    /// `Some((Err, Done))` on any failure, or `None` when the stack is empty.
    async fn process_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<StreamStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            match item {
                StreamStackItem::Hash(hash) => {
                    let data = match self.store.get(&hash).await {
                        Ok(Some(d)) => d,
                        // Unlike the missing root, a missing child is an
                        // error: the parent link promised it exists.
                        Ok(None) => {
                            return Some((
                                Err(HashTreeError::MissingChunk(to_hex(&hash))),
                                ReadStreamState::Done,
                            ))
                        }
                        Err(e) => {
                            return Some((
                                Err(HashTreeError::Store(e.to_string())),
                                ReadStreamState::Done,
                            ))
                        }
                    };

                    if is_tree_node(&data) {
                        // Interior node: expand and keep popping.
                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => {
                                return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done))
                            }
                        };
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }
                    } else {
                        // Leaf chunk: yield it, handing the remaining stack to
                        // the next state (mem::take leaves the Vec empty).
                        return Some((
                            Ok(data),
                            ReadStreamState::Processing {
                                stack: std::mem::take(stack),
                                tree: self,
                            },
                        ));
                    }
                }
            }
        }
        None
    }
1210
1211 pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
1213 let data = match self
1214 .store
1215 .get(hash)
1216 .await
1217 .map_err(|e| HashTreeError::Store(e.to_string()))?
1218 {
1219 Some(d) => d,
1220 None => return Ok(vec![]),
1221 };
1222
1223 if !is_tree_node(&data) {
1224 return Ok(vec![data]);
1225 }
1226
1227 let node = decode_tree_node(&data)?;
1228 self.collect_chunks(&node).await
1229 }
1230
1231 async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
1232 let mut chunks = Vec::new();
1233
1234 for link in &node.links {
1235 let child_data = self
1236 .store
1237 .get(&link.hash)
1238 .await
1239 .map_err(|e| HashTreeError::Store(e.to_string()))?
1240 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1241
1242 if is_tree_node(&child_data) {
1243 let child_node = decode_tree_node(&child_data)?;
1244 chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
1245 } else {
1246 chunks.push(child_data);
1247 }
1248 }
1249
1250 Ok(chunks)
1251 }
1252
1253 pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1255 let node = match self.get_node(cid).await? {
1256 Some(n) => n,
1257 None => return Ok(vec![]),
1258 };
1259
1260 let mut entries = Vec::new();
1261
1262 for link in &node.links {
1263 if let Some(ref name) = link.name {
1265 if name.starts_with("_chunk_") || name.starts_with('_') {
1266 let chunk_cid = Cid {
1267 hash: link.hash,
1268 key: link.key,
1269 };
1270 let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
1271 entries.extend(sub_entries);
1272 continue;
1273 }
1274 }
1275
1276 entries.push(TreeEntry {
1277 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1278 hash: link.hash,
1279 size: link.size,
1280 link_type: link.link_type,
1281 key: link.key,
1282 meta: link.meta.clone(),
1283 });
1284 }
1285
1286 Ok(entries)
1287 }
1288
1289 pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1292 let node = match self.get_directory_node(cid).await? {
1294 Some(n) => n,
1295 None => return Ok(vec![]),
1296 };
1297
1298 let mut entries = Vec::new();
1299
1300 for link in &node.links {
1301 if let Some(ref name) = link.name {
1303 if name.starts_with("_chunk_") || name.starts_with('_') {
1304 let sub_cid = Cid {
1306 hash: link.hash,
1307 key: cid.key,
1308 };
1309 let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
1310 entries.extend(sub_entries);
1311 continue;
1312 }
1313 }
1314
1315 entries.push(TreeEntry {
1316 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1317 hash: link.hash,
1318 size: link.size,
1319 link_type: link.link_type,
1320 key: link.key,
1321 meta: link.meta.clone(),
1322 });
1323 }
1324
1325 Ok(entries)
1326 }
1327
    /// Resolves a slash-separated `path` starting at `cid`, returning the CID
    /// of the final component, or `None` when any component is missing.
    ///
    /// Empty components are ignored, so "", "/", and "a//b" behave like their
    /// normalized forms; an effectively empty path resolves to `cid` itself.
    /// Components not found directly in a node are also searched for inside
    /// its internal ("_"-prefixed) sub-nodes.
    pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
        let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
        if parts.is_empty() {
            return Ok(Some(cid.clone()));
        }

        let mut current_cid = cid.clone();

        for part in parts {
            let node = match self.get_directory_node(&current_cid).await? {
                Some(n) => n,
                None => return Ok(None),
            };

            if let Some(link) = self.find_link(&node, part) {
                current_cid = Cid {
                    hash: link.hash,
                    key: link.key,
                };
            } else {
                // Not a direct child: search internal sub-nodes depth-first.
                match self
                    .find_link_in_subtrees_cid(&node, part, &current_cid)
                    .await?
                {
                    Some(link) => {
                        current_cid = Cid {
                            hash: link.hash,
                            key: link.key,
                        };
                    }
                    None => return Ok(None),
                }
            }
        }

        Ok(Some(current_cid))
    }
1368
    /// Alias for [`Self::resolve`], kept for API compatibility.
    pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
        self.resolve(cid, path).await
    }
1373
1374 fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1375 node.links
1376 .iter()
1377 .find(|l| l.name.as_deref() == Some(name))
1378 .cloned()
1379 }
1380
1381 async fn find_link_in_subtrees_cid(
1383 &self,
1384 node: &TreeNode,
1385 name: &str,
1386 _parent_cid: &Cid,
1387 ) -> Result<Option<Link>, HashTreeError> {
1388 for link in &node.links {
1389 if !link
1390 .name
1391 .as_ref()
1392 .map(|n| n.starts_with('_'))
1393 .unwrap_or(false)
1394 {
1395 continue;
1396 }
1397
1398 let sub_cid = Cid {
1400 hash: link.hash,
1401 key: link.key.clone(),
1402 };
1403
1404 let sub_node = match self.get_node(&sub_cid).await? {
1405 Some(n) => n,
1406 None => continue,
1407 };
1408
1409 if let Some(found) = self.find_link(&sub_node, name) {
1410 return Ok(Some(found));
1411 }
1412
1413 if let Some(deep_found) =
1414 Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await?
1415 {
1416 return Ok(Some(deep_found));
1417 }
1418 }
1419
1420 Ok(None)
1421 }
1422
1423 pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1425 let data = match self
1426 .store
1427 .get(hash)
1428 .await
1429 .map_err(|e| HashTreeError::Store(e.to_string()))?
1430 {
1431 Some(d) => d,
1432 None => return Ok(0),
1433 };
1434
1435 if !is_tree_node(&data) {
1436 return Ok(data.len() as u64);
1437 }
1438
1439 let node = decode_tree_node(&data)?;
1440 let mut total = 0u64;
1442 for link in &node.links {
1443 total += link.size;
1444 }
1445 Ok(total)
1446 }
1447
1448 pub async fn get_size_cid(&self, cid: &Cid) -> Result<u64, HashTreeError> {
1450 if cid.key.is_some() {
1451 let data = match self.get(cid, None).await? {
1452 Some(d) => d,
1453 None => return Ok(0),
1454 };
1455 return Ok(data.len() as u64);
1456 }
1457
1458 self.get_size(&cid.hash).await
1459 }
1460
1461 pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
1463 let mut entries = Vec::new();
1464 self.walk_recursive(cid, path, &mut entries).await?;
1465 Ok(entries)
1466 }
1467
1468 async fn walk_recursive(
1469 &self,
1470 cid: &Cid,
1471 path: &str,
1472 entries: &mut Vec<WalkEntry>,
1473 ) -> Result<(), HashTreeError> {
1474 let data = match self
1475 .store
1476 .get(&cid.hash)
1477 .await
1478 .map_err(|e| HashTreeError::Store(e.to_string()))?
1479 {
1480 Some(d) => d,
1481 None => return Ok(()),
1482 };
1483
1484 let data = if let Some(key) = &cid.key {
1486 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1487 } else {
1488 data
1489 };
1490
1491 let node = match try_decode_tree_node(&data) {
1492 Some(n) => n,
1493 None => {
1494 entries.push(WalkEntry {
1495 path: path.to_string(),
1496 hash: cid.hash,
1497 link_type: LinkType::Blob,
1498 size: data.len() as u64,
1499 key: cid.key,
1500 });
1501 return Ok(());
1502 }
1503 };
1504
1505 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1506 entries.push(WalkEntry {
1507 path: path.to_string(),
1508 hash: cid.hash,
1509 link_type: node.node_type,
1510 size: node_size,
1511 key: cid.key,
1512 });
1513
1514 for link in &node.links {
1515 let child_path = match &link.name {
1516 Some(name) => {
1517 if name.starts_with("_chunk_") || name.starts_with('_') {
1518 let sub_cid = Cid {
1520 hash: link.hash,
1521 key: cid.key,
1522 };
1523 Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
1524 continue;
1525 }
1526 if path.is_empty() {
1527 name.clone()
1528 } else {
1529 format!("{}/{}", path, name)
1530 }
1531 }
1532 None => path.to_string(),
1533 };
1534
1535 let child_cid = Cid {
1537 hash: link.hash,
1538 key: link.key,
1539 };
1540 Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
1541 }
1542
1543 Ok(())
1544 }
1545
    /// Walk the DAG under `cid`, fetching up to `concurrency` blocks from the
    /// store at a time. Convenience wrapper over
    /// [`walk_parallel_with_progress`] with no progress counter.
    pub async fn walk_parallel(
        &self,
        cid: &Cid,
        path: &str,
        concurrency: usize,
    ) -> Result<Vec<WalkEntry>, HashTreeError> {
        self.walk_parallel_with_progress(cid, path, concurrency, None)
            .await
    }
1557
1558 pub async fn walk_parallel_with_progress(
1565 &self,
1566 cid: &Cid,
1567 path: &str,
1568 concurrency: usize,
1569 progress: Option<&std::sync::atomic::AtomicUsize>,
1570 ) -> Result<Vec<WalkEntry>, HashTreeError> {
1571 use futures::stream::{FuturesUnordered, StreamExt};
1572 use std::collections::VecDeque;
1573 use std::sync::atomic::Ordering;
1574
1575 let mut entries = Vec::new();
1576 let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
1577 let mut active = FuturesUnordered::new();
1578
1579 pending.push_back((cid.clone(), path.to_string()));
1581
1582 loop {
1583 while active.len() < concurrency {
1585 if let Some((node_cid, node_path)) = pending.pop_front() {
1586 let store = &self.store;
1587 let fut = async move {
1588 let data = store
1589 .get(&node_cid.hash)
1590 .await
1591 .map_err(|e| HashTreeError::Store(e.to_string()))?;
1592 Ok::<_, HashTreeError>((node_cid, node_path, data))
1593 };
1594 active.push(fut);
1595 } else {
1596 break;
1597 }
1598 }
1599
1600 if active.is_empty() {
1602 break;
1603 }
1604
1605 if let Some(result) = active.next().await {
1607 let (node_cid, node_path, data) = result?;
1608
1609 if let Some(counter) = progress {
1611 counter.fetch_add(1, Ordering::Relaxed);
1612 }
1613
1614 let data = match data {
1615 Some(d) => d,
1616 None => continue,
1617 };
1618
1619 let data = if let Some(key) = &node_cid.key {
1621 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1622 } else {
1623 data
1624 };
1625
1626 let node = match try_decode_tree_node(&data) {
1627 Some(n) => n,
1628 None => {
1629 entries.push(WalkEntry {
1631 path: node_path,
1632 hash: node_cid.hash,
1633 link_type: LinkType::Blob,
1634 size: data.len() as u64,
1635 key: node_cid.key,
1636 });
1637 continue;
1638 }
1639 };
1640
1641 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1643 entries.push(WalkEntry {
1644 path: node_path.clone(),
1645 hash: node_cid.hash,
1646 link_type: node.node_type,
1647 size: node_size,
1648 key: node_cid.key,
1649 });
1650
1651 for link in &node.links {
1653 let child_path = match &link.name {
1654 Some(name) => {
1655 if name.starts_with("_chunk_") || name.starts_with('_') {
1656 let sub_cid = Cid {
1658 hash: link.hash,
1659 key: node_cid.key,
1660 };
1661 pending.push_back((sub_cid, node_path.clone()));
1662 continue;
1663 }
1664 if node_path.is_empty() {
1665 name.clone()
1666 } else {
1667 format!("{}/{}", node_path, name)
1668 }
1669 }
1670 None => node_path.clone(),
1671 };
1672
1673 if link.link_type == LinkType::Blob {
1676 entries.push(WalkEntry {
1677 path: child_path,
1678 hash: link.hash,
1679 link_type: LinkType::Blob,
1680 size: link.size,
1681 key: link.key,
1682 });
1683 if let Some(counter) = progress {
1684 counter.fetch_add(1, Ordering::Relaxed);
1685 }
1686 continue;
1687 }
1688
1689 let child_cid = Cid {
1691 hash: link.hash,
1692 key: link.key,
1693 };
1694 pending.push_back((child_cid, child_path));
1695 }
1696 }
1697 }
1698
1699 Ok(entries)
1700 }
1701
    /// Lazily walk the DAG rooted at `cid`, yielding one `WalkEntry` per node.
    ///
    /// Implemented as `stream::unfold` over a small state machine: `Init`
    /// emits the root's entry and seeds a stack of its children, `Processing`
    /// pops that stack one node per poll (see `process_walk_stack`), and
    /// `Done` ends the stream.
    pub fn walk_stream(
        &self,
        cid: Cid,
        initial_path: String,
    ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            WalkStreamState::Init {
                cid,
                path: initial_path,
                tree: self,
            },
            |state| async move {
                match state {
                    WalkStreamState::Init { cid, path, tree } => {
                        // Fetch the root block; a missing root ends the
                        // stream without emitting anything.
                        let data = match tree.store.get(&cid.hash).await {
                            Ok(Some(d)) => d,
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    WalkStreamState::Done,
                                ))
                            }
                        };

                        // Decrypt when the CID carries a key.
                        let data = if let Some(key) = &cid.key {
                            match decrypt_chk(&data, key) {
                                Ok(d) => d,
                                Err(e) => {
                                    return Some((
                                        Err(HashTreeError::Decryption(e.to_string())),
                                        WalkStreamState::Done,
                                    ))
                                }
                            }
                        } else {
                            data
                        };

                        // Non-tree data is a single blob: emit it and finish.
                        let node = match try_decode_tree_node(&data) {
                            Some(n) => n,
                            None => {
                                let entry = WalkEntry {
                                    path,
                                    hash: cid.hash,
                                    link_type: LinkType::Blob,
                                    size: data.len() as u64,
                                    key: cid.key,
                                };
                                return Some((Ok(entry), WalkStreamState::Done));
                            }
                        };

                        // A tree node's logical size is the sum of its links' sizes.
                        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
                        let entry = WalkEntry {
                            path: path.clone(),
                            hash: cid.hash,
                            link_type: node.node_type,
                            size: node_size,
                            key: cid.key,
                        };

                        // Push children in reverse so they pop in link order.
                        // '_'-prefixed (internal) links keep the parent's path.
                        let mut stack: Vec<WalkStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            let child_path = match &link.name {
                                Some(name) if !name.starts_with('_') => {
                                    if path.is_empty() {
                                        name.clone()
                                    } else {
                                        format!("{}/{}", path, name)
                                    }
                                }
                                _ => path.clone(),
                            };
                            stack.push(WalkStackItem {
                                hash: link.hash,
                                path: child_path,
                                key: link.key,
                            });
                        }

                        Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
                    }
                    WalkStreamState::Processing { mut stack, tree } => {
                        tree.process_walk_stack(&mut stack).await
                    }
                    WalkStreamState::Done => None,
                }
            },
        ))
    }
1798
    /// Pop nodes off `stack` until one yields a `WalkEntry`, returning that
    /// entry plus the continuation state (the remaining stack is moved into
    /// the returned `Processing` state via `mem::take`). Returns `None` when
    /// the stack is exhausted.
    ///
    /// NOTE(review): unlike `walk_recursive`, this path never decrypts a
    /// block before decoding it, and tree-node entries are emitted with
    /// `key: None` while blob entries carry `item.key` — confirm this
    /// asymmetry is intentional for encrypted trees.
    async fn process_walk_stack<'a>(
        &'a self,
        stack: &mut Vec<WalkStackItem>,
    ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            // Missing blocks are skipped; store errors terminate the stream.
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                Ok(None) => continue,
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        WalkStreamState::Done,
                    ))
                }
            };

            // Raw (non-tree) data is a blob leaf: emit it as-is.
            let node = match try_decode_tree_node(&data) {
                Some(n) => n,
                None => {
                    let entry = WalkEntry {
                        path: item.path,
                        hash: item.hash,
                        link_type: LinkType::Blob,
                        size: data.len() as u64,
                        key: item.key,
                    };
                    return Some((
                        Ok(entry),
                        WalkStreamState::Processing {
                            stack: std::mem::take(stack),
                            tree: self,
                        },
                    ));
                }
            };

            let node_size: u64 = node.links.iter().map(|l| l.size).sum();
            let entry = WalkEntry {
                path: item.path.clone(),
                hash: item.hash,
                link_type: node.node_type,
                key: None, // see NOTE(review) in the doc comment above
            };

            // Queue children in reverse so they are visited in link order;
            // '_'-prefixed (internal) links inherit the parent's path.
            for link in node.links.into_iter().rev() {
                let child_path = match &link.name {
                    Some(name) if !name.starts_with('_') => {
                        if item.path.is_empty() {
                            name.clone()
                        } else {
                            format!("{}/{}", item.path, name)
                        }
                    }
                    _ => item.path.clone(),
                };
                stack.push(WalkStackItem {
                    hash: link.hash,
                    path: child_path,
                    key: link.key,
                });
            }

            return Some((
                Ok(entry),
                WalkStreamState::Processing {
                    stack: std::mem::take(stack),
                    tree: self,
                },
            ));
        }
        None
    }
1874
1875 pub async fn set_entry(
1880 &self,
1881 root: &Cid,
1882 path: &[&str],
1883 name: &str,
1884 entry_cid: &Cid,
1885 size: u64,
1886 link_type: LinkType,
1887 ) -> Result<Cid, HashTreeError> {
1888 let dir_cid = self.resolve_path_array(root, path).await?;
1889 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1890
1891 let entries = self.list_directory(&dir_cid).await?;
1892 let mut new_entries: Vec<DirEntry> = entries
1893 .into_iter()
1894 .filter(|e| e.name != name)
1895 .map(|e| DirEntry {
1896 name: e.name,
1897 hash: e.hash,
1898 size: e.size,
1899 key: e.key,
1900 link_type: e.link_type,
1901 meta: e.meta,
1902 })
1903 .collect();
1904
1905 new_entries.push(DirEntry {
1906 name: name.to_string(),
1907 hash: entry_cid.hash,
1908 size,
1909 key: entry_cid.key,
1910 link_type,
1911 meta: None,
1912 });
1913
1914 let new_dir_cid = self.put_directory(new_entries).await?;
1915 self.rebuild_path(root, path, new_dir_cid).await
1916 }
1917
1918 pub async fn remove_entry(
1921 &self,
1922 root: &Cid,
1923 path: &[&str],
1924 name: &str,
1925 ) -> Result<Cid, HashTreeError> {
1926 let dir_cid = self.resolve_path_array(root, path).await?;
1927 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1928
1929 let entries = self.list_directory(&dir_cid).await?;
1930 let new_entries: Vec<DirEntry> = entries
1931 .into_iter()
1932 .filter(|e| e.name != name)
1933 .map(|e| DirEntry {
1934 name: e.name,
1935 hash: e.hash,
1936 size: e.size,
1937 key: e.key,
1938 link_type: e.link_type,
1939 meta: e.meta,
1940 })
1941 .collect();
1942
1943 let new_dir_cid = self.put_directory(new_entries).await?;
1944 self.rebuild_path(root, path, new_dir_cid).await
1945 }
1946
1947 pub async fn rename_entry(
1950 &self,
1951 root: &Cid,
1952 path: &[&str],
1953 old_name: &str,
1954 new_name: &str,
1955 ) -> Result<Cid, HashTreeError> {
1956 if old_name == new_name {
1957 return Ok(root.clone());
1958 }
1959
1960 let dir_cid = self.resolve_path_array(root, path).await?;
1961 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1962
1963 let entries = self.list_directory(&dir_cid).await?;
1964 let entry = entries
1965 .iter()
1966 .find(|e| e.name == old_name)
1967 .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1968
1969 let entry_hash = entry.hash;
1970 let entry_size = entry.size;
1971 let entry_key = entry.key;
1972 let entry_link_type = entry.link_type;
1973 let entry_meta = entry.meta.clone();
1974
1975 let new_entries: Vec<DirEntry> = entries
1976 .into_iter()
1977 .filter(|e| e.name != old_name)
1978 .map(|e| DirEntry {
1979 name: e.name,
1980 hash: e.hash,
1981 size: e.size,
1982 key: e.key,
1983 link_type: e.link_type,
1984 meta: e.meta,
1985 })
1986 .chain(std::iter::once(DirEntry {
1987 name: new_name.to_string(),
1988 hash: entry_hash,
1989 size: entry_size,
1990 key: entry_key,
1991 link_type: entry_link_type,
1992 meta: entry_meta,
1993 }))
1994 .collect();
1995
1996 let new_dir_cid = self.put_directory(new_entries).await?;
1997 self.rebuild_path(root, path, new_dir_cid).await
1998 }
1999
2000 pub async fn move_entry(
2003 &self,
2004 root: &Cid,
2005 source_path: &[&str],
2006 name: &str,
2007 target_path: &[&str],
2008 ) -> Result<Cid, HashTreeError> {
2009 let source_dir_cid = self.resolve_path_array(root, source_path).await?;
2010 let source_dir_cid =
2011 source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
2012
2013 let source_entries = self.list_directory(&source_dir_cid).await?;
2014 let entry = source_entries
2015 .iter()
2016 .find(|e| e.name == name)
2017 .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
2018
2019 let entry_cid = Cid {
2020 hash: entry.hash,
2021 key: entry.key,
2022 };
2023 let entry_size = entry.size;
2024 let entry_link_type = entry.link_type;
2025
2026 let new_root = self.remove_entry(root, source_path, name).await?;
2028
2029 self.set_entry(
2031 &new_root,
2032 target_path,
2033 name,
2034 &entry_cid,
2035 entry_size,
2036 entry_link_type,
2037 )
2038 .await
2039 }
2040
2041 async fn resolve_path_array(
2042 &self,
2043 root: &Cid,
2044 path: &[&str],
2045 ) -> Result<Option<Cid>, HashTreeError> {
2046 if path.is_empty() {
2047 return Ok(Some(root.clone()));
2048 }
2049 self.resolve_path(root, &path.join("/")).await
2050 }
2051
    /// Rebuild the chain of parent directories after the child at `path` has
    /// been replaced by `new_child`, working bottom-up: at each level the
    /// parent's listing is rewritten so its link to the child points at the
    /// new CID, producing a new parent CID for the level above. Returns the
    /// new root CID.
    async fn rebuild_path(
        &self,
        root: &Cid,
        path: &[&str],
        new_child: Cid,
    ) -> Result<Cid, HashTreeError> {
        if path.is_empty() {
            // No parents to rewrite: the child itself is the new root.
            return Ok(new_child);
        }

        let mut child_cid = new_child;
        let parts: Vec<&str> = path.to_vec();

        for i in (0..parts.len()).rev() {
            let child_name = parts[i];
            let parent_path = &parts[..i];

            // Parents are resolved against the ORIGINAL root; levels below
            // this one have already been replaced via `child_cid`.
            let parent_cid = if parent_path.is_empty() {
                root.clone()
            } else {
                self.resolve_path_array(root, parent_path)
                    .await?
                    .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
            };

            let parent_entries = self.list_directory(&parent_cid).await?;
            let new_parent_entries: Vec<DirEntry> = parent_entries
                .into_iter()
                .map(|e| {
                    if e.name == child_name {
                        DirEntry {
                            name: e.name,
                            hash: child_cid.hash,
                            // NOTE(review): the replaced child's size is reset
                            // to 0 rather than recomputed — confirm callers
                            // treat directory entry sizes as advisory.
                            size: 0,
                            key: child_cid.key,
                            link_type: e.link_type,
                            meta: e.meta,
                        }
                    } else {
                        DirEntry {
                            name: e.name,
                            hash: e.hash,
                            size: e.size,
                            key: e.key,
                            link_type: e.link_type,
                            meta: e.meta,
                        }
                    }
                })
                .collect();

            child_cid = self.put_directory(new_parent_entries).await?;
        }

        Ok(child_cid)
    }
2108
2109 pub fn get_store(&self) -> Arc<S> {
2113 self.store.clone()
2114 }
2115
    /// Configured chunk size, in bytes, used when splitting content.
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }
2120
    /// Configured maximum number of links per tree node.
    pub fn max_links(&self) -> usize {
        self.max_links
    }
2125}
2126
/// Work item for the plaintext read stream: the hash of the next block to
/// fetch. (An enum so further item kinds can be added without reshaping the
/// stack.)
enum StreamStackItem {
    Hash(Hash),
}
2132
/// State machine for the plaintext read stream: `Init` holds the root hash,
/// `Processing` holds the stack of blocks still to visit, `Done` terminates
/// the stream. Borrows the tree for the stream's lifetime.
enum ReadStreamState<'a, S: Store> {
    Init {
        hash: Hash,
        tree: &'a HashTree<S>,
    },
    Processing {
        stack: Vec<StreamStackItem>,
        tree: &'a HashTree<S>,
    },
    Done,
}
2144
/// Pending node in a streaming walk: the block hash, the path it will be
/// reported under, and the key (if any) inherited from its link.
struct WalkStackItem {
    hash: Hash,
    path: String,
    key: Option<[u8; 32]>,
}
2150
/// State machine for `walk_stream`: `Init` holds the root CID and path,
/// `Processing` carries the stack of nodes still to visit, `Done` terminates
/// the stream. Borrows the tree for the stream's lifetime.
enum WalkStreamState<'a, S: Store> {
    Init {
        cid: Cid,
        path: String,
        tree: &'a HashTree<S>,
    },
    Processing {
        stack: Vec<WalkStackItem>,
        tree: &'a HashTree<S>,
    },
    Done,
}
2163
/// Pending block in the encrypted read stream: its hash plus the key needed
/// to decrypt it (`None` for blocks stored in the clear).
struct EncryptedStackItem {
    hash: Hash,
    key: Option<[u8; 32]>,
}
2169
/// State machine for the encrypted read stream: `Init` holds the root hash
/// and its decryption key, `Processing` carries the remaining blocks, `Done`
/// terminates the stream. Borrows the tree for the stream's lifetime.
enum EncryptedStreamState<'a, S: Store> {
    Init {
        hash: Hash,
        key: [u8; 32],
        tree: &'a HashTree<S>,
    },
    Processing {
        stack: Vec<EncryptedStackItem>,
        tree: &'a HashTree<S>,
    },
    Done,
}
2182
2183pub async fn verify_tree<S: Store>(
2185 store: Arc<S>,
2186 root_hash: &Hash,
2187) -> Result<crate::reader::VerifyResult, HashTreeError> {
2188 let mut missing = Vec::new();
2189 let mut visited = std::collections::HashSet::new();
2190
2191 verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
2192
2193 Ok(crate::reader::VerifyResult {
2194 valid: missing.is_empty(),
2195 missing,
2196 })
2197}
2198
2199async fn verify_recursive<S: Store>(
2200 store: Arc<S>,
2201 hash: &Hash,
2202 missing: &mut Vec<Hash>,
2203 visited: &mut std::collections::HashSet<String>,
2204) -> Result<(), HashTreeError> {
2205 let hex = to_hex(hash);
2206 if visited.contains(&hex) {
2207 return Ok(());
2208 }
2209 visited.insert(hex);
2210
2211 let data = match store
2212 .get(hash)
2213 .await
2214 .map_err(|e| HashTreeError::Store(e.to_string()))?
2215 {
2216 Some(d) => d,
2217 None => {
2218 missing.push(*hash);
2219 return Ok(());
2220 }
2221 };
2222
2223 if is_tree_node(&data) {
2224 let node = decode_tree_node(&data)?;
2225 for link in &node.links {
2226 Box::pin(verify_recursive(
2227 store.clone(),
2228 &link.hash,
2229 missing,
2230 visited,
2231 ))
2232 .await?;
2233 }
2234 }
2235
2236 Ok(())
2237}
2238
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;

    /// Convenience constructor: a `HashTree` over a fresh in-memory store
    /// with encryption turned off.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let backing = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(backing.clone()).public());
        (backing, tree)
    }

    #[tokio::test]
    async fn test_put_and_read_blob() {
        let (_store, tree) = make_tree();
        let payload = vec![1, 2, 3, 4, 5];

        let hash = tree.put_blob(&payload).await.unwrap();

        assert_eq!(tree.get_blob(&hash).await.unwrap(), Some(payload));
    }

    #[tokio::test]
    async fn test_put_and_read_file_small() {
        let (_store, tree) = make_tree();
        let data = b"Hello, World!";

        let (cid, size) = tree.put_file(data).await.unwrap();
        assert_eq!(size, data.len() as u64);

        let round_tripped = tree.read_file(&cid.hash).await.unwrap();
        assert_eq!(round_tripped, Some(data.to_vec()));
    }

    #[tokio::test]
    async fn test_put_and_read_directory() {
        let (_store, tree) = make_tree();

        let first = tree.put_blob(b"content1").await.unwrap();
        let second = tree.put_blob(b"content2").await.unwrap();

        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", first).with_size(8),
                DirEntry::new("b.txt", second).with_size(8),
            ])
            .await
            .unwrap();

        let listing = tree.list_directory(&dir_cid).await.unwrap();
        assert_eq!(listing.len(), 2);
        let names: Vec<_> = listing.iter().map(|e| e.name.as_str()).collect();
        assert!(names.contains(&"a.txt"));
        assert!(names.contains(&"b.txt"));
    }

    #[tokio::test]
    async fn test_is_directory() {
        let (_store, tree) = make_tree();

        let blob_hash = tree.put_blob(b"data").await.unwrap();
        let empty_dir = tree.put_directory(vec![]).await.unwrap();

        assert!(!tree.is_directory(&blob_hash).await.unwrap());
        assert!(tree.is_directory(&empty_dir.hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_resolve_path() {
        let (_store, tree) = make_tree();

        let leaf = tree.put_blob(b"nested").await.unwrap();
        let inner = tree
            .put_directory(vec![DirEntry::new("file.txt", leaf).with_size(6)])
            .await
            .unwrap();
        let outer = tree
            .put_directory(vec![DirEntry::new("subdir", inner.hash)])
            .await
            .unwrap();

        let resolved = tree.resolve_path(&outer, "subdir/file.txt").await.unwrap();
        assert_eq!(resolved.map(|c| c.hash), Some(leaf));
    }

    #[tokio::test]
    async fn test_unified_put_get_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());
        let data = b"Hello, public world!";

        let (cid, size) = tree.put(data).await.unwrap();
        assert_eq!(size, data.len() as u64);
        // Public trees never attach an encryption key.
        assert!(cid.key.is_none());

        let retrieved = tree.get(&cid, None).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_unified_put_get_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));
        let data = b"Hello, encrypted world!";

        let (cid, size) = tree.put(data).await.unwrap();
        assert_eq!(size, data.len() as u64);
        // Encryption is on by default, so the CID must carry a key.
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid, None).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_unified_put_get_encrypted_chunked() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).with_chunk_size(100));

        // 500 bytes with a 100-byte chunk size forces multiple chunks.
        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
        let (cid, size) = tree.put(&data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some());
        assert_eq!(tree.get(&cid, None).await.unwrap().unwrap(), data);
    }

    #[tokio::test]
    async fn test_cid_deterministic() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));
        let data = b"Same content produces same CID";

        let (first, _) = tree.put(data).await.unwrap();
        let (second, _) = tree.put(data).await.unwrap();

        assert_eq!(first.hash, second.hash);
        assert_eq!(first.key, second.key);
        assert_eq!(first.to_string(), second.to_string());
    }

    #[tokio::test]
    async fn test_cid_to_string_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _) = tree.put(b"test").await.unwrap();
        let rendered = cid.to_string();

        // Plain 32-byte hash rendered as hex: 64 chars, no key separator.
        assert_eq!(rendered.len(), 64);
        assert!(!rendered.contains(':'));
    }

    #[tokio::test]
    async fn test_cid_to_string_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let (cid, _) = tree.put(b"test").await.unwrap();
        let rendered = cid.to_string();

        // hash (64) + ':' + key (64) = 129 chars.
        assert_eq!(rendered.len(), 129);
        assert!(rendered.contains(':'));
    }
}