1use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::io::AsyncRead;
10use futures::stream::{self, Stream};
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{
15 decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node,
16};
17use crate::hash::sha256;
18use crate::reader::{ReaderError, TreeEntry, WalkEntry};
19use crate::store::Store;
20use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
21
22use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
23
/// Configuration for constructing a `HashTree`.
#[derive(Clone)]
pub struct HashTreeConfig<S: Store> {
    /// Backing block store shared with the tree.
    pub store: Arc<S>,
    /// Maximum number of bytes per leaf chunk.
    pub chunk_size: usize,
    /// Maximum number of links per interior tree node.
    pub max_links: usize,
    /// Whether chunks are passed through `encrypt_chk` before storage.
    pub encrypted: bool,
}
33
34impl<S: Store> HashTreeConfig<S> {
35 pub fn new(store: Arc<S>) -> Self {
36 Self {
37 store,
38 chunk_size: DEFAULT_CHUNK_SIZE,
39 max_links: DEFAULT_MAX_LINKS,
40 encrypted: true,
41 }
42 }
43
44 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
45 self.chunk_size = chunk_size;
46 self
47 }
48
49 pub fn with_max_links(mut self, max_links: usize) -> Self {
50 self.max_links = max_links;
51 self
52 }
53
54 pub fn public(mut self) -> Self {
56 self.encrypted = false;
57 self
58 }
59}
60
/// Errors produced by `HashTree` operations.
#[derive(Debug, thiserror::Error)]
pub enum HashTreeError {
    /// Underlying store failure; carries the store's error text.
    #[error("Store error: {0}")]
    Store(String),
    /// Tree-node encode/decode failure.
    #[error("Codec error: {0}")]
    Codec(#[from] crate::codec::CodecError),
    /// A referenced chunk was absent from the store (hex-encoded hash).
    #[error("Missing chunk: {0}")]
    MissingChunk(String),
    /// A path lookup failed.
    #[error("Path not found: {0}")]
    PathNotFound(String),
    /// A named entry lookup failed.
    #[error("Entry not found: {0}")]
    EntryNotFound(String),
    /// Failure while encrypting content.
    #[error("Encryption error: {0}")]
    Encryption(String),
    /// Failure while decrypting content.
    #[error("Decryption error: {0}")]
    Decryption(String),
}
79
80impl From<BuilderError> for HashTreeError {
81 fn from(e: BuilderError) -> Self {
82 match e {
83 BuilderError::Store(s) => HashTreeError::Store(s),
84 BuilderError::Codec(c) => HashTreeError::Codec(c),
85 BuilderError::Encryption(s) => HashTreeError::Encryption(s),
86 }
87 }
88}
89
90impl From<ReaderError> for HashTreeError {
91 fn from(e: ReaderError) -> Self {
92 match e {
93 ReaderError::Store(s) => HashTreeError::Store(s),
94 ReaderError::Codec(c) => HashTreeError::Codec(c),
95 ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
96 ReaderError::Decryption(s) => HashTreeError::Encryption(s),
97 ReaderError::MissingKey => {
98 HashTreeError::Encryption("missing decryption key".to_string())
99 }
100 }
101 }
102}
103
/// Chunked, content-addressed tree of blocks kept in a [`Store`].
///
/// Construct via [`HashTree::new`] from a [`HashTreeConfig`].
pub struct HashTree<S: Store> {
    // Backing block store.
    store: Arc<S>,
    // Maximum bytes per leaf chunk.
    chunk_size: usize,
    // Maximum links per interior node.
    max_links: usize,
    // Whether chunks are passed through encrypt_chk before storage.
    encrypted: bool,
}
111
112impl<S: Store> HashTree<S> {
113 pub fn new(config: HashTreeConfig<S>) -> Self {
114 Self {
115 store: config.store,
116 chunk_size: config.chunk_size,
117 max_links: config.max_links,
118 encrypted: config.encrypted,
119 }
120 }
121
    /// Returns `true` when chunks are encrypted before being stored.
    pub fn is_encrypted(&self) -> bool {
        self.encrypted
    }
126
127 pub async fn put(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
132 let size = data.len() as u64;
133
134 if data.len() <= self.chunk_size {
136 let (hash, key) = self.put_chunk_internal(data).await?;
137 return Ok((Cid { hash, key }, size));
138 }
139
140 let mut links: Vec<Link> = Vec::new();
142 let mut offset = 0;
143
144 while offset < data.len() {
145 let end = (offset + self.chunk_size).min(data.len());
146 let chunk = &data[offset..end];
147 let chunk_size = chunk.len() as u64;
148 let (hash, key) = self.put_chunk_internal(chunk).await?;
149 links.push(Link {
150 hash,
151 name: None,
152 size: chunk_size,
153 key,
154 link_type: LinkType::Blob, meta: None,
156 });
157 offset = end;
158 }
159
160 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
162 Ok((
163 Cid {
164 hash: root_hash,
165 key: root_key,
166 },
167 size,
168 ))
169 }
170
171 pub async fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
173 if let Some(key) = cid.key {
174 self.get_encrypted(&cid.hash, &key).await
175 } else {
176 self.read_file(&cid.hash).await
177 }
178 }
179
180 pub async fn put_stream<R: AsyncRead + Unpin>(
186 &self,
187 mut reader: R,
188 ) -> Result<(Cid, u64), HashTreeError> {
189 let mut buffer = vec![0u8; self.chunk_size];
190 let mut links = Vec::new();
191 let mut total_size: u64 = 0;
192 let mut consistent_key: Option<[u8; 32]> = None;
193
194 loop {
195 let mut chunk = Vec::new();
196 let mut bytes_read = 0;
197
198 while bytes_read < self.chunk_size {
200 let n = reader
201 .read(&mut buffer[..self.chunk_size - bytes_read])
202 .await
203 .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
204 if n == 0 {
205 break; }
207 chunk.extend_from_slice(&buffer[..n]);
208 bytes_read += n;
209 }
210
211 if chunk.is_empty() {
212 break; }
214
215 let chunk_len = chunk.len() as u64;
216 total_size += chunk_len;
217
218 let (hash, key) = self.put_chunk_internal(&chunk).await?;
219
220 if links.is_empty() {
222 consistent_key = key;
223 } else if consistent_key != key {
224 consistent_key = None;
225 }
226
227 links.push(Link {
228 hash,
229 name: None,
230 size: chunk_len,
231 key,
232 link_type: LinkType::Blob, meta: None,
234 });
235 }
236
237 if links.is_empty() {
238 let (hash, key) = self.put_chunk_internal(&[]).await?;
240 return Ok((Cid { hash, key }, 0));
241 }
242
243 let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
245 Ok((
246 Cid {
247 hash: root_hash,
248 key: root_key,
249 },
250 total_size,
251 ))
252 }
253
254 pub fn get_stream(
259 &self,
260 cid: &Cid,
261 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
262 let hash = cid.hash;
263 let key = cid.key;
264
265 if let Some(k) = key {
266 Box::pin(self.read_file_stream_encrypted(hash, k))
268 } else {
269 self.read_file_stream(hash)
271 }
272 }
273
    /// Lazily streams the plaintext leaf chunks of the encrypted file at
    /// `hash`, decrypting the root with `key` and each child with the key
    /// embedded in its link.
    ///
    /// The root is fetched on the first poll; a leaf root yields a single
    /// item, while an interior node seeds an explicit stack drained by
    /// `process_encrypted_stream_stack`.
    fn read_file_stream_encrypted(
        &self,
        hash: Hash,
        key: EncryptionKey,
    ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
        stream::unfold(
            EncryptedStreamState::Init {
                hash,
                key,
                tree: self,
            },
            |state| async move {
                match state {
                    // First poll: fetch and decrypt the root block.
                    EncryptedStreamState::Init { hash, key, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            // Absent root: empty stream, no error item.
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    EncryptedStreamState::Done,
                                ))
                            }
                        };

                        let decrypted = match decrypt_chk(&data, &key) {
                            Ok(d) => d,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Decryption(e.to_string())),
                                    EncryptedStreamState::Done,
                                ))
                            }
                        };

                        // Leaf root: the whole file is this one chunk.
                        if !is_tree_node(&decrypted) {
                            return Some((Ok(decrypted), EncryptedStreamState::Done));
                        }

                        let node = match decode_tree_node(&decrypted) {
                            Ok(n) => n,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Codec(e)),
                                    EncryptedStreamState::Done,
                                ))
                            }
                        };

                        // Push links in reverse so popping yields them in
                        // file order.
                        let mut stack: Vec<EncryptedStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(EncryptedStackItem {
                                hash: link.hash,
                                key: link.key,
                            });
                        }

                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    // Subsequent polls: keep draining the explicit stack.
                    EncryptedStreamState::Processing { mut stack, tree } => {
                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Done => None,
                }
            },
        )
    }
345
    /// Pops items off the traversal stack until a leaf chunk is produced.
    ///
    /// Interior nodes are decoded and their links pushed in reverse (so pop
    /// order matches file order); the first leaf found is yielded together
    /// with the remaining stack as the next stream state. Returns `None`
    /// when the stack is exhausted (end of stream).
    async fn process_encrypted_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<EncryptedStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                Ok(None) => {
                    return Some((
                        Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
                        EncryptedStreamState::Done,
                    ))
                }
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        EncryptedStreamState::Done,
                    ))
                }
            };

            // Links without a key are treated as plaintext blocks.
            let decrypted = if let Some(key) = item.key {
                match decrypt_chk(&data, &key) {
                    Ok(d) => d,
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Decryption(e.to_string())),
                            EncryptedStreamState::Done,
                        ))
                    }
                }
            } else {
                data
            };

            if is_tree_node(&decrypted) {
                let node = match decode_tree_node(&decrypted) {
                    Ok(n) => n,
                    Err(e) => {
                        return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done))
                    }
                };
                // Reverse push keeps pop order == file order.
                for link in node.links.into_iter().rev() {
                    stack.push(EncryptedStackItem {
                        hash: link.hash,
                        key: link.key,
                    });
                }
            } else {
                // Leaf: yield it; mem::take hands the remaining stack to
                // the next state, leaving an empty Vec behind.
                return Some((
                    Ok(decrypted),
                    EncryptedStreamState::Processing {
                        stack: std::mem::take(stack),
                        tree: self,
                    },
                ));
            }
        }
        None
    }
409
410 async fn put_chunk_internal(
412 &self,
413 data: &[u8],
414 ) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
415 if self.encrypted {
416 let (encrypted, key) =
417 encrypt_chk(data).map_err(|e| HashTreeError::Encryption(e.to_string()))?;
418 let hash = sha256(&encrypted);
419 self.store
420 .put(hash, encrypted)
421 .await
422 .map_err(|e| HashTreeError::Store(e.to_string()))?;
423 Ok((hash, Some(key)))
424 } else {
425 let hash = self.put_blob(data).await?;
426 Ok((hash, None))
427 }
428 }
429
    /// Builds the interior tree over `links` bottom-up and returns the
    /// root hash plus its key (for encrypted trees).
    ///
    /// A single link whose size equals `total_size` is returned as-is
    /// (no wrapper node). At most `max_links` links are encoded into one
    /// node; larger sets are split into batches and reduced recursively.
    async fn build_tree_internal(
        &self,
        links: Vec<Link>,
        total_size: Option<u64>,
    ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
        // Collapse: a lone link already covering the whole payload needs
        // no interior node.
        if links.len() == 1 {
            if let Some(ts) = total_size {
                if links[0].size == ts {
                    return Ok((links[0].hash, links[0].key));
                }
            }
        }

        if links.len() <= self.max_links {
            let node = TreeNode {
                node_type: LinkType::File,
                links,
            };
            let (data, _) = encode_and_hash(&node)?;

            if self.encrypted {
                // Encrypted trees hash the ciphertext and keep the key.
                let (encrypted, key) =
                    encrypt_chk(&data).map_err(|e| HashTreeError::Encryption(e.to_string()))?;
                let hash = sha256(&encrypted);
                self.store
                    .put(hash, encrypted)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?;
                return Ok((hash, Some(key)));
            }

            let hash = sha256(&data);
            self.store
                .put(hash, data)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?;
            return Ok((hash, None));
        }

        // Too many links for one node: encode each max_links-sized batch
        // as a subtree, then recurse over the subtree links.
        // Box::pin is needed for async recursion.
        let mut sub_links = Vec::new();
        for batch in links.chunks(self.max_links) {
            let batch_size: u64 = batch.iter().map(|l| l.size).sum();
            let (hash, key) =
                Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
            sub_links.push(Link {
                hash,
                name: None,
                size: batch_size,
                key,
                link_type: LinkType::File,
                meta: None,
            });
        }

        Box::pin(self.build_tree_internal(sub_links, total_size)).await
    }
490
491 async fn get_encrypted(
493 &self,
494 hash: &Hash,
495 key: &EncryptionKey,
496 ) -> Result<Option<Vec<u8>>, HashTreeError> {
497 let encrypted_data = match self
498 .store
499 .get(hash)
500 .await
501 .map_err(|e| HashTreeError::Store(e.to_string()))?
502 {
503 Some(d) => d,
504 None => return Ok(None),
505 };
506
507 let decrypted = decrypt_chk(&encrypted_data, key)
509 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
510
511 if is_tree_node(&decrypted) {
513 let node = decode_tree_node(&decrypted)?;
514 let assembled = self.assemble_encrypted_chunks(&node).await?;
515 return Ok(Some(assembled));
516 }
517
518 Ok(Some(decrypted))
520 }
521
522 async fn assemble_encrypted_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
524 let mut parts: Vec<Vec<u8>> = Vec::new();
525
526 for link in &node.links {
527 let chunk_key = link
528 .key
529 .ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
530
531 let encrypted_child = self
532 .store
533 .get(&link.hash)
534 .await
535 .map_err(|e| HashTreeError::Store(e.to_string()))?
536 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
537
538 let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
539 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
540
541 if is_tree_node(&decrypted) {
542 let child_node = decode_tree_node(&decrypted)?;
544 let child_data = Box::pin(self.assemble_encrypted_chunks(&child_node)).await?;
545 parts.push(child_data);
546 } else {
547 parts.push(decrypted);
549 }
550 }
551
552 let total_len: usize = parts.iter().map(|p| p.len()).sum();
553 let mut result = Vec::with_capacity(total_len);
554 for part in parts {
555 result.extend_from_slice(&part);
556 }
557
558 Ok(result)
559 }
560
561 pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
566 let hash = sha256(data);
567 self.store
568 .put(hash, data.to_vec())
569 .await
570 .map_err(|e| HashTreeError::Store(e.to_string()))?;
571 Ok(hash)
572 }
573
574 pub async fn put_file(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
577 let size = data.len() as u64;
578
579 if data.len() <= self.chunk_size {
581 let (hash, key) = self.put_chunk_internal(data).await?;
582 return Ok((Cid { hash, key }, size));
583 }
584
585 let mut links: Vec<Link> = Vec::new();
587 let mut offset = 0;
588
589 while offset < data.len() {
590 let end = (offset + self.chunk_size).min(data.len());
591 let chunk = &data[offset..end];
592 let chunk_size = (end - offset) as u64;
593
594 let (hash, key) = self.put_chunk_internal(chunk).await?;
595 links.push(Link {
596 hash,
597 name: None,
598 size: chunk_size,
599 key,
600 link_type: LinkType::Blob, meta: None,
602 });
603 offset = end;
604 }
605
606 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
608 Ok((
609 Cid {
610 hash: root_hash,
611 key: root_key,
612 },
613 size,
614 ))
615 }
616
617 pub async fn put_directory(&self, entries: Vec<DirEntry>) -> Result<Cid, HashTreeError> {
623 let mut sorted = entries;
625 sorted.sort_by(|a, b| a.name.cmp(&b.name));
626
627 let links: Vec<Link> = sorted
628 .into_iter()
629 .map(|e| Link {
630 hash: e.hash,
631 name: Some(e.name),
632 size: e.size,
633 key: e.key,
634 link_type: e.link_type,
635 meta: e.meta,
636 })
637 .collect();
638
639 let node = TreeNode {
641 node_type: LinkType::Dir,
642 links,
643 };
644 let (data, _plain_hash) = encode_and_hash(&node)?;
645
646 let (cid, _size) = self.put(&data).await?;
651 Ok(cid)
652 }
653
654 pub async fn put_tree_node(&self, links: Vec<Link>) -> Result<Hash, HashTreeError> {
656 let node = TreeNode {
657 node_type: LinkType::Dir,
658 links,
659 };
660
661 let (data, hash) = encode_and_hash(&node)?;
662 self.store
663 .put(hash, data)
664 .await
665 .map_err(|e| HashTreeError::Store(e.to_string()))?;
666 Ok(hash)
667 }
668
    /// Fetches a raw blob by hash; `Ok(None)` when absent. No decoding or
    /// decryption is performed.
    pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
        self.store
            .get(hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))
    }
678
679 pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
681 let data = match self
682 .store
683 .get(hash)
684 .await
685 .map_err(|e| HashTreeError::Store(e.to_string()))?
686 {
687 Some(d) => d,
688 None => return Ok(None),
689 };
690
691 if !is_tree_node(&data) {
692 return Ok(None);
693 }
694
695 let node = decode_tree_node(&data)?;
696 Ok(Some(node))
697 }
698
699 pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
701 let data = match self
702 .store
703 .get(&cid.hash)
704 .await
705 .map_err(|e| HashTreeError::Store(e.to_string()))?
706 {
707 Some(d) => d,
708 None => return Ok(None),
709 };
710
711 let decrypted = if let Some(key) = &cid.key {
713 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
714 } else {
715 data
716 };
717
718 if !is_tree_node(&decrypted) {
719 return Ok(None);
720 }
721
722 let node = decode_tree_node(&decrypted)?;
723 Ok(Some(node))
724 }
725
    /// Fetches and decodes the directory node behind `cid`, decrypting
    /// when a key is present. If the decoded node is a chunked `File` node
    /// (a directory listing too large for one block), the chunks are
    /// reassembled and the inner node decoded instead. Returns `Ok(None)`
    /// when the block is absent or not a tree node.
    pub async fn get_directory_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
        let data = match self
            .store
            .get(&cid.hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))?
        {
            Some(d) => d,
            None => return Ok(None),
        };

        let decrypted = if let Some(key) = &cid.key {
            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
        } else {
            data
        };

        if !is_tree_node(&decrypted) {
            return Ok(None);
        }

        let node = decode_tree_node(&decrypted)?;

        // A File-typed node here means the directory listing itself was
        // chunked; reassemble and decode the real directory node.
        // NOTE(review): assemble_chunks fetches children without
        // decryption — assumes the chunk children reached here are stored
        // as plaintext; confirm for encrypted directories.
        if node.node_type == LinkType::File {
            let assembled = self.assemble_chunks(&node).await?;
            if is_tree_node(&assembled) {
                let inner_node = decode_tree_node(&assembled)?;
                return Ok(Some(inner_node));
            }
        }

        Ok(Some(node))
    }
763
764 pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
766 let data = match self
767 .store
768 .get(hash)
769 .await
770 .map_err(|e| HashTreeError::Store(e.to_string()))?
771 {
772 Some(d) => d,
773 None => return Ok(false),
774 };
775 Ok(is_tree_node(&data))
776 }
777
778 pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
780 let node = match self.get_node(cid).await? {
781 Some(n) => n,
782 None => return Ok(false),
783 };
784 Ok(node.links.iter().any(|l| {
786 l.name
787 .as_ref()
788 .map(|n| !n.starts_with('_'))
789 .unwrap_or(false)
790 }))
791 }
792
793 pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
795 let data = match self
796 .store
797 .get(hash)
798 .await
799 .map_err(|e| HashTreeError::Store(e.to_string()))?
800 {
801 Some(d) => d,
802 None => return Ok(false),
803 };
804 Ok(is_directory_node(&data))
805 }
806
807 pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
809 let data = match self
810 .store
811 .get(hash)
812 .await
813 .map_err(|e| HashTreeError::Store(e.to_string()))?
814 {
815 Some(d) => d,
816 None => return Ok(None),
817 };
818
819 if !is_tree_node(&data) {
821 return Ok(Some(data));
822 }
823
824 let node = decode_tree_node(&data)?;
826 let assembled = self.assemble_chunks(&node).await?;
827 Ok(Some(assembled))
828 }
829
    /// Reads bytes `[start, end)` (with `end` defaulting to EOF) of the
    /// plaintext file at `hash` without assembling the whole file: only
    /// chunks overlapping the window are fetched. `Ok(None)` when the
    /// root is absent; a start past EOF yields an empty buffer.
    pub async fn read_file_range(
        &self,
        hash: &Hash,
        start: u64,
        end: Option<u64>,
    ) -> Result<Option<Vec<u8>>, HashTreeError> {
        let data = match self
            .store
            .get(hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))?
        {
            Some(d) => d,
            None => return Ok(None),
        };

        // Single-chunk file: slice the blob directly.
        if !is_tree_node(&data) {
            let start_idx = start as usize;
            let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
            if start_idx >= data.len() {
                return Ok(Some(vec![]));
            }
            // Clamp the end to the blob length.
            let end_idx = end_idx.min(data.len());
            return Ok(Some(data[start_idx..end_idx].to_vec()));
        }

        let node = decode_tree_node(&data)?;
        let range_data = self.assemble_chunks_range(&node, start, end).await?;
        Ok(Some(range_data))
    }
868
869 pub async fn read_file_range_cid(
871 &self,
872 cid: &Cid,
873 start: u64,
874 end: Option<u64>,
875 ) -> Result<Option<Vec<u8>>, HashTreeError> {
876 if let Some(_key) = cid.key {
877 let data = match self.get(cid).await? {
878 Some(d) => d,
879 None => return Ok(None),
880 };
881 let start_idx = start as usize;
882 let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
883 if start_idx >= data.len() {
884 return Ok(Some(vec![]));
885 }
886 let end_idx = end_idx.min(data.len());
887 return Ok(Some(data[start_idx..end_idx].to_vec()));
888 }
889
890 self.read_file_range(&cid.hash, start, end).await
891 }
892
    /// Assembles bytes `[start, end)` of a chunked file by first indexing
    /// all leaf chunks (hash, offset, size), then fetching only the
    /// chunks that overlap the requested window and trimming the edges.
    async fn assemble_chunks_range(
        &self,
        node: &TreeNode,
        start: u64,
        end: Option<u64>,
    ) -> Result<Vec<u8>, HashTreeError> {
        let chunks_info = self.collect_chunk_offsets(node).await?;

        if chunks_info.is_empty() {
            return Ok(vec![]);
        }

        let total_size: u64 = chunks_info.iter().map(|(_, _, size)| size).sum();
        // `None` means read to EOF; clamp any explicit end to the file size.
        let actual_end = end.unwrap_or(total_size).min(total_size);

        if start >= actual_end {
            return Ok(vec![]);
        }

        let mut result = Vec::with_capacity((actual_end - start) as usize);
        let mut current_offset = 0u64;

        for (chunk_hash, _chunk_offset, chunk_size) in &chunks_info {
            let chunk_start = current_offset;
            let chunk_end = current_offset + chunk_size;

            // Fetch only chunks overlapping [start, actual_end).
            if chunk_end > start && chunk_start < actual_end {
                let chunk_data = self
                    .store
                    .get(chunk_hash)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?
                    .ok_or_else(|| HashTreeError::MissingChunk(to_hex(chunk_hash)))?;

                // Trim the first/last overlapping chunk to the window.
                let slice_start = if start > chunk_start {
                    (start - chunk_start) as usize
                } else {
                    0
                };
                let slice_end = if actual_end < chunk_end {
                    (actual_end - chunk_start) as usize
                } else {
                    chunk_data.len()
                };

                result.extend_from_slice(&chunk_data[slice_start..slice_end]);
            }

            current_offset = chunk_end;

            // Past the window: no further chunk can overlap.
            if current_offset >= actual_end {
                break;
            }
        }

        Ok(result)
    }
958
959 async fn collect_chunk_offsets(
962 &self,
963 node: &TreeNode,
964 ) -> Result<Vec<(Hash, u64, u64)>, HashTreeError> {
965 let mut chunks = Vec::new();
966 let mut offset = 0u64;
967 self.collect_chunk_offsets_recursive(node, &mut chunks, &mut offset)
968 .await?;
969 Ok(chunks)
970 }
971
    /// Depth-first helper for `collect_chunk_offsets`: appends
    /// `(hash, offset, size)` for every leaf chunk under `node`, advancing
    /// `offset` by each leaf's actual stored length.
    async fn collect_chunk_offsets_recursive(
        &self,
        node: &TreeNode,
        chunks: &mut Vec<(Hash, u64, u64)>,
        offset: &mut u64,
    ) -> Result<(), HashTreeError> {
        for link in &node.links {
            let child_data = self
                .store
                .get(&link.hash)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?
                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;

            if is_tree_node(&child_data) {
                // Interior node: recurse (boxed for async recursion).
                let child_node = decode_tree_node(&child_data)?;
                Box::pin(self.collect_chunk_offsets_recursive(&child_node, chunks, offset)).await?;
            } else {
                // Leaf: record its actual length, not the link's declared size.
                let size = child_data.len() as u64;
                chunks.push((link.hash, *offset, size));
                *offset += size;
            }
        }
        Ok(())
    }
999
1000 async fn assemble_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
1002 let mut parts: Vec<Vec<u8>> = Vec::new();
1003
1004 for link in &node.links {
1005 let child_data = self
1006 .store
1007 .get(&link.hash)
1008 .await
1009 .map_err(|e| HashTreeError::Store(e.to_string()))?
1010 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1011
1012 if is_tree_node(&child_data) {
1013 let child_node = decode_tree_node(&child_data)?;
1014 parts.push(Box::pin(self.assemble_chunks(&child_node)).await?);
1015 } else {
1016 parts.push(child_data);
1017 }
1018 }
1019
1020 let total_length: usize = parts.iter().map(|p| p.len()).sum();
1022 let mut result = Vec::with_capacity(total_length);
1023 for part in parts {
1024 result.extend_from_slice(&part);
1025 }
1026
1027 Ok(result)
1028 }
1029
    /// Lazily streams the plaintext leaf chunks of the file rooted at
    /// `hash`, traversing interior nodes with an explicit stack so chunks
    /// are yielded in file order. A non-tree root is a single item.
    pub fn read_file_stream(
        &self,
        hash: Hash,
    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            ReadStreamState::Init { hash, tree: self },
            |state| async move {
                match state {
                    // First poll: fetch and classify the root block.
                    ReadStreamState::Init { hash, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            // Absent root: empty stream, no error item.
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    ReadStreamState::Done,
                                ))
                            }
                        };

                        // Leaf root: the whole file is this one chunk.
                        if !is_tree_node(&data) {
                            return Some((Ok(data), ReadStreamState::Done));
                        }

                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => {
                                return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done))
                            }
                        };

                        // Push links in reverse so popping yields file order.
                        let mut stack: Vec<StreamStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }

                        tree.process_stream_stack(&mut stack).await
                    }
                    // Subsequent polls: keep draining the stack.
                    ReadStreamState::Processing { mut stack, tree } => {
                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Done => None,
                }
            },
        ))
    }
1082
    /// Pops hashes off the traversal stack until a leaf chunk is found.
    ///
    /// Interior nodes are decoded and their links pushed in reverse (so
    /// pop order matches file order); the leaf is yielded together with
    /// the remaining stack as the next stream state. Returns `None` when
    /// the stack is exhausted (end of stream).
    async fn process_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<StreamStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            match item {
                StreamStackItem::Hash(hash) => {
                    let data = match self.store.get(&hash).await {
                        Ok(Some(d)) => d,
                        Ok(None) => {
                            return Some((
                                Err(HashTreeError::MissingChunk(to_hex(&hash))),
                                ReadStreamState::Done,
                            ))
                        }
                        Err(e) => {
                            return Some((
                                Err(HashTreeError::Store(e.to_string())),
                                ReadStreamState::Done,
                            ))
                        }
                    };

                    if is_tree_node(&data) {
                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => {
                                return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done))
                            }
                        };
                        // Reverse push keeps pop order == file order.
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }
                    } else {
                        // Leaf: yield it; mem::take hands the remaining
                        // stack to the next state, leaving an empty Vec.
                        return Some((
                            Ok(data),
                            ReadStreamState::Processing {
                                stack: std::mem::take(stack),
                                tree: self,
                            },
                        ));
                    }
                }
            }
        }
        None
    }
1132
1133 pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
1135 let data = match self
1136 .store
1137 .get(hash)
1138 .await
1139 .map_err(|e| HashTreeError::Store(e.to_string()))?
1140 {
1141 Some(d) => d,
1142 None => return Ok(vec![]),
1143 };
1144
1145 if !is_tree_node(&data) {
1146 return Ok(vec![data]);
1147 }
1148
1149 let node = decode_tree_node(&data)?;
1150 self.collect_chunks(&node).await
1151 }
1152
    /// Depth-first helper for `read_file_chunks`: collects leaf chunk
    /// buffers under `node` in link order, recursing through interior
    /// nodes.
    async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
        let mut chunks = Vec::new();

        for link in &node.links {
            let child_data = self
                .store
                .get(&link.hash)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?
                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;

            if is_tree_node(&child_data) {
                // Interior node: recurse (boxed for async recursion).
                let child_node = decode_tree_node(&child_data)?;
                chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
            } else {
                chunks.push(child_data);
            }
        }

        Ok(chunks)
    }
1174
1175 pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1177 let node = match self.get_node(cid).await? {
1178 Some(n) => n,
1179 None => return Ok(vec![]),
1180 };
1181
1182 let mut entries = Vec::new();
1183
1184 for link in &node.links {
1185 if let Some(ref name) = link.name {
1187 if name.starts_with("_chunk_") || name.starts_with('_') {
1188 let chunk_cid = Cid {
1189 hash: link.hash,
1190 key: link.key,
1191 };
1192 let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
1193 entries.extend(sub_entries);
1194 continue;
1195 }
1196 }
1197
1198 entries.push(TreeEntry {
1199 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1200 hash: link.hash,
1201 size: link.size,
1202 link_type: link.link_type,
1203 key: link.key,
1204 meta: link.meta.clone(),
1205 });
1206 }
1207
1208 Ok(entries)
1209 }
1210
    /// Lists a directory's entries, flattening underscore-prefixed
    /// internal links by recursing into them. Missing nodes yield an
    /// empty list.
    ///
    /// NOTE(review): when recursing into an internal link, the *parent's*
    /// key (`cid.key`) is reused rather than `link.key`, unlike `list`
    /// which uses the link's own key — confirm which key the internal
    /// sub-nodes are actually encrypted with.
    pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
        let node = match self.get_directory_node(cid).await? {
            Some(n) => n,
            None => return Ok(vec![]),
        };

        let mut entries = Vec::new();

        for link in &node.links {
            if let Some(ref name) = link.name {
                // `starts_with("_chunk_")` is subsumed by `starts_with('_')`.
                if name.starts_with("_chunk_") || name.starts_with('_') {
                    let sub_cid = Cid {
                        hash: link.hash,
                        key: cid.key,
                    };
                    let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
                    entries.extend(sub_entries);
                    continue;
                }
            }

            entries.push(TreeEntry {
                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
                hash: link.hash,
                size: link.size,
                link_type: link.link_type,
                key: link.key,
                meta: link.meta.clone(),
            });
        }

        Ok(entries)
    }
1249
    /// Resolves a `/`-separated `path` against the directory tree rooted
    /// at `cid`, returning the CID of the final component. An empty path
    /// (or one of only separators) resolves to `cid` itself; any missing
    /// component yields `Ok(None)`.
    pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
        let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
        if parts.is_empty() {
            return Ok(Some(cid.clone()));
        }

        let mut current_cid = cid.clone();

        for part in parts {
            let node = match self.get_directory_node(&current_cid).await? {
                Some(n) => n,
                None => return Ok(None),
            };

            // Try a direct child first, then search internal (`_`-named)
            // subtrees for the name.
            if let Some(link) = self.find_link(&node, part) {
                current_cid = Cid {
                    hash: link.hash,
                    key: link.key,
                };
            } else {
                match self
                    .find_link_in_subtrees_cid(&node, part, &current_cid)
                    .await?
                {
                    Some(link) => {
                        current_cid = Cid {
                            hash: link.hash,
                            key: link.key,
                        };
                    }
                    None => return Ok(None),
                }
            }
        }

        Ok(Some(current_cid))
    }
1290
    /// Alias for `resolve`, kept for API compatibility.
    pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
        self.resolve(cid, path).await
    }
1295
1296 fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1297 node.links
1298 .iter()
1299 .find(|l| l.name.as_deref() == Some(name))
1300 .cloned()
1301 }
1302
    /// Searches underscore-prefixed (internal) subtree links of `node`
    /// for a child link named `name`, recursing depth-first. Returns the
    /// first match; unreadable or missing sub-nodes are skipped silently.
    async fn find_link_in_subtrees_cid(
        &self,
        node: &TreeNode,
        name: &str,
        _parent_cid: &Cid,
    ) -> Result<Option<Link>, HashTreeError> {
        for link in &node.links {
            // Only descend into internal links (names starting with '_').
            if !link
                .name
                .as_ref()
                .map(|n| n.starts_with('_'))
                .unwrap_or(false)
            {
                continue;
            }

            let sub_cid = Cid {
                hash: link.hash,
                key: link.key.clone(),
            };

            // Missing/non-node children: skip rather than fail.
            let sub_node = match self.get_node(&sub_cid).await? {
                Some(n) => n,
                None => continue,
            };

            if let Some(found) = self.find_link(&sub_node, name) {
                return Ok(Some(found));
            }

            // Recurse (boxed for async recursion).
            if let Some(deep_found) =
                Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await?
            {
                return Ok(Some(deep_found));
            }
        }

        Ok(None)
    }
1344
1345 pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1347 let data = match self
1348 .store
1349 .get(hash)
1350 .await
1351 .map_err(|e| HashTreeError::Store(e.to_string()))?
1352 {
1353 Some(d) => d,
1354 None => return Ok(0),
1355 };
1356
1357 if !is_tree_node(&data) {
1358 return Ok(data.len() as u64);
1359 }
1360
1361 let node = decode_tree_node(&data)?;
1362 let mut total = 0u64;
1364 for link in &node.links {
1365 total += link.size;
1366 }
1367 Ok(total)
1368 }
1369
1370 pub async fn get_size_cid(&self, cid: &Cid) -> Result<u64, HashTreeError> {
1372 if cid.key.is_some() {
1373 let data = match self.get(cid).await? {
1374 Some(d) => d,
1375 None => return Ok(0),
1376 };
1377 return Ok(data.len() as u64);
1378 }
1379
1380 self.get_size(&cid.hash).await
1381 }
1382
    /// Collects a recursive listing of everything reachable from `cid`,
    /// with `path` used as the root path prefix.
    pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
        let mut entries = Vec::new();
        self.walk_recursive(cid, path, &mut entries).await?;
        Ok(entries)
    }
1389
    /// Depth-first traversal behind `walk`: records an entry for the node
    /// at `cid` (or a blob entry if it is a leaf), then recurses into each
    /// link, extending `path` with the link's name.
    ///
    /// NOTE(review): underscore-prefixed links are followed with the
    /// parent's key (`cid.key`) and the parent's path (mirroring
    /// `list_directory`, unlike `list` which uses `link.key`) — confirm
    /// the parent key is correct for those sub-nodes.
    async fn walk_recursive(
        &self,
        cid: &Cid,
        path: &str,
        entries: &mut Vec<WalkEntry>,
    ) -> Result<(), HashTreeError> {
        let data = match self
            .store
            .get(&cid.hash)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))?
        {
            Some(d) => d,
            // Missing blocks are skipped silently.
            None => return Ok(()),
        };

        let data = if let Some(key) = &cid.key {
            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
        } else {
            data
        };

        let node = match try_decode_tree_node(&data) {
            Some(n) => n,
            None => {
                // Leaf blob: record it and stop descending.
                entries.push(WalkEntry {
                    path: path.to_string(),
                    hash: cid.hash,
                    link_type: LinkType::Blob,
                    size: data.len() as u64,
                    key: cid.key,
                });
                return Ok(());
            }
        };

        // Record the node itself; its size is the sum of its link sizes.
        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
        entries.push(WalkEntry {
            path: path.to_string(),
            hash: cid.hash,
            link_type: node.node_type,
            size: node_size,
            key: cid.key,
        });

        for link in &node.links {
            let child_path = match &link.name {
                Some(name) => {
                    // Internal links: same path, parent key, spliced in-line.
                    if name.starts_with("_chunk_") || name.starts_with('_') {
                        let sub_cid = Cid {
                            hash: link.hash,
                            key: cid.key,
                        };
                        Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
                        continue;
                    }
                    if path.is_empty() {
                        name.clone()
                    } else {
                        format!("{}/{}", path, name)
                    }
                }
                // Unnamed links keep the parent path.
                None => path.to_string(),
            };

            let child_cid = Cid {
                hash: link.hash,
                key: link.key,
            };
            // Box::pin is needed for async recursion.
            Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
        }

        Ok(())
    }
1467
    /// Like `walk`, but fetches up to `concurrency` blocks from the store in
    /// flight at once. Convenience wrapper forwarding to
    /// `walk_parallel_with_progress` with no progress counter.
    pub async fn walk_parallel(
        &self,
        cid: &Cid,
        path: &str,
        concurrency: usize,
    ) -> Result<Vec<WalkEntry>, HashTreeError> {
        self.walk_parallel_with_progress(cid, path, concurrency, None)
            .await
    }
1479
1480 pub async fn walk_parallel_with_progress(
1487 &self,
1488 cid: &Cid,
1489 path: &str,
1490 concurrency: usize,
1491 progress: Option<&std::sync::atomic::AtomicUsize>,
1492 ) -> Result<Vec<WalkEntry>, HashTreeError> {
1493 use futures::stream::{FuturesUnordered, StreamExt};
1494 use std::collections::VecDeque;
1495 use std::sync::atomic::Ordering;
1496
1497 let mut entries = Vec::new();
1498 let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
1499 let mut active = FuturesUnordered::new();
1500
1501 pending.push_back((cid.clone(), path.to_string()));
1503
1504 loop {
1505 while active.len() < concurrency {
1507 if let Some((node_cid, node_path)) = pending.pop_front() {
1508 let store = &self.store;
1509 let fut = async move {
1510 let data = store
1511 .get(&node_cid.hash)
1512 .await
1513 .map_err(|e| HashTreeError::Store(e.to_string()))?;
1514 Ok::<_, HashTreeError>((node_cid, node_path, data))
1515 };
1516 active.push(fut);
1517 } else {
1518 break;
1519 }
1520 }
1521
1522 if active.is_empty() {
1524 break;
1525 }
1526
1527 if let Some(result) = active.next().await {
1529 let (node_cid, node_path, data) = result?;
1530
1531 if let Some(counter) = progress {
1533 counter.fetch_add(1, Ordering::Relaxed);
1534 }
1535
1536 let data = match data {
1537 Some(d) => d,
1538 None => continue,
1539 };
1540
1541 let data = if let Some(key) = &node_cid.key {
1543 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1544 } else {
1545 data
1546 };
1547
1548 let node = match try_decode_tree_node(&data) {
1549 Some(n) => n,
1550 None => {
1551 entries.push(WalkEntry {
1553 path: node_path,
1554 hash: node_cid.hash,
1555 link_type: LinkType::Blob,
1556 size: data.len() as u64,
1557 key: node_cid.key,
1558 });
1559 continue;
1560 }
1561 };
1562
1563 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1565 entries.push(WalkEntry {
1566 path: node_path.clone(),
1567 hash: node_cid.hash,
1568 link_type: node.node_type,
1569 size: node_size,
1570 key: node_cid.key,
1571 });
1572
1573 for link in &node.links {
1575 let child_path = match &link.name {
1576 Some(name) => {
1577 if name.starts_with("_chunk_") || name.starts_with('_') {
1578 let sub_cid = Cid {
1580 hash: link.hash,
1581 key: node_cid.key,
1582 };
1583 pending.push_back((sub_cid, node_path.clone()));
1584 continue;
1585 }
1586 if node_path.is_empty() {
1587 name.clone()
1588 } else {
1589 format!("{}/{}", node_path, name)
1590 }
1591 }
1592 None => node_path.clone(),
1593 };
1594
1595 if link.link_type == LinkType::Blob {
1598 entries.push(WalkEntry {
1599 path: child_path,
1600 hash: link.hash,
1601 link_type: LinkType::Blob,
1602 size: link.size,
1603 key: link.key,
1604 });
1605 if let Some(counter) = progress {
1606 counter.fetch_add(1, Ordering::Relaxed);
1607 }
1608 continue;
1609 }
1610
1611 let child_cid = Cid {
1613 hash: link.hash,
1614 key: link.key,
1615 };
1616 pending.push_back((child_cid, child_path));
1617 }
1618 }
1619 }
1620
1621 Ok(entries)
1622 }
1623
1624 pub fn walk_stream(
1626 &self,
1627 cid: Cid,
1628 initial_path: String,
1629 ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
1630 Box::pin(stream::unfold(
1631 WalkStreamState::Init {
1632 cid,
1633 path: initial_path,
1634 tree: self,
1635 },
1636 |state| async move {
1637 match state {
1638 WalkStreamState::Init { cid, path, tree } => {
1639 let data = match tree.store.get(&cid.hash).await {
1640 Ok(Some(d)) => d,
1641 Ok(None) => return None,
1642 Err(e) => {
1643 return Some((
1644 Err(HashTreeError::Store(e.to_string())),
1645 WalkStreamState::Done,
1646 ))
1647 }
1648 };
1649
1650 let data = if let Some(key) = &cid.key {
1652 match decrypt_chk(&data, key) {
1653 Ok(d) => d,
1654 Err(e) => {
1655 return Some((
1656 Err(HashTreeError::Decryption(e.to_string())),
1657 WalkStreamState::Done,
1658 ))
1659 }
1660 }
1661 } else {
1662 data
1663 };
1664
1665 let node = match try_decode_tree_node(&data) {
1666 Some(n) => n,
1667 None => {
1668 let entry = WalkEntry {
1670 path,
1671 hash: cid.hash,
1672 link_type: LinkType::Blob,
1673 size: data.len() as u64,
1674 key: cid.key,
1675 };
1676 return Some((Ok(entry), WalkStreamState::Done));
1677 }
1678 };
1679
1680 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1681 let entry = WalkEntry {
1682 path: path.clone(),
1683 hash: cid.hash,
1684 link_type: node.node_type,
1685 size: node_size,
1686 key: cid.key,
1687 };
1688
1689 let mut stack: Vec<WalkStackItem> = Vec::new();
1691 for link in node.links.into_iter().rev() {
1692 let child_path = match &link.name {
1693 Some(name) if !name.starts_with('_') => {
1694 if path.is_empty() {
1695 name.clone()
1696 } else {
1697 format!("{}/{}", path, name)
1698 }
1699 }
1700 _ => path.clone(),
1701 };
1702 stack.push(WalkStackItem {
1704 hash: link.hash,
1705 path: child_path,
1706 key: link.key,
1707 });
1708 }
1709
1710 Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
1711 }
1712 WalkStreamState::Processing { mut stack, tree } => {
1713 tree.process_walk_stack(&mut stack).await
1714 }
1715 WalkStreamState::Done => None,
1716 }
1717 },
1718 ))
1719 }
1720
1721 async fn process_walk_stack<'a>(
1722 &'a self,
1723 stack: &mut Vec<WalkStackItem>,
1724 ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
1725 while let Some(item) = stack.pop() {
1726 let data = match self.store.get(&item.hash).await {
1727 Ok(Some(d)) => d,
1728 Ok(None) => continue,
1729 Err(e) => {
1730 return Some((
1731 Err(HashTreeError::Store(e.to_string())),
1732 WalkStreamState::Done,
1733 ))
1734 }
1735 };
1736
1737 let node = match try_decode_tree_node(&data) {
1738 Some(n) => n,
1739 None => {
1740 let entry = WalkEntry {
1742 path: item.path,
1743 hash: item.hash,
1744 link_type: LinkType::Blob,
1745 size: data.len() as u64,
1746 key: item.key,
1747 };
1748 return Some((
1749 Ok(entry),
1750 WalkStreamState::Processing {
1751 stack: std::mem::take(stack),
1752 tree: self,
1753 },
1754 ));
1755 }
1756 };
1757
1758 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1759 let entry = WalkEntry {
1760 path: item.path.clone(),
1761 hash: item.hash,
1762 link_type: node.node_type,
1763 size: node_size,
1764 key: None, };
1766
1767 for link in node.links.into_iter().rev() {
1769 let child_path = match &link.name {
1770 Some(name) if !name.starts_with('_') => {
1771 if item.path.is_empty() {
1772 name.clone()
1773 } else {
1774 format!("{}/{}", item.path, name)
1775 }
1776 }
1777 _ => item.path.clone(),
1778 };
1779 stack.push(WalkStackItem {
1780 hash: link.hash,
1781 path: child_path,
1782 key: link.key,
1783 });
1784 }
1785
1786 return Some((
1787 Ok(entry),
1788 WalkStreamState::Processing {
1789 stack: std::mem::take(stack),
1790 tree: self,
1791 },
1792 ));
1793 }
1794 None
1795 }
1796
1797 pub async fn set_entry(
1802 &self,
1803 root: &Cid,
1804 path: &[&str],
1805 name: &str,
1806 entry_cid: &Cid,
1807 size: u64,
1808 link_type: LinkType,
1809 ) -> Result<Cid, HashTreeError> {
1810 let dir_cid = self.resolve_path_array(root, path).await?;
1811 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1812
1813 let entries = self.list_directory(&dir_cid).await?;
1814 let mut new_entries: Vec<DirEntry> = entries
1815 .into_iter()
1816 .filter(|e| e.name != name)
1817 .map(|e| DirEntry {
1818 name: e.name,
1819 hash: e.hash,
1820 size: e.size,
1821 key: e.key,
1822 link_type: e.link_type,
1823 meta: e.meta,
1824 })
1825 .collect();
1826
1827 new_entries.push(DirEntry {
1828 name: name.to_string(),
1829 hash: entry_cid.hash,
1830 size,
1831 key: entry_cid.key,
1832 link_type,
1833 meta: None,
1834 });
1835
1836 let new_dir_cid = self.put_directory(new_entries).await?;
1837 self.rebuild_path(root, path, new_dir_cid).await
1838 }
1839
1840 pub async fn remove_entry(
1843 &self,
1844 root: &Cid,
1845 path: &[&str],
1846 name: &str,
1847 ) -> Result<Cid, HashTreeError> {
1848 let dir_cid = self.resolve_path_array(root, path).await?;
1849 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1850
1851 let entries = self.list_directory(&dir_cid).await?;
1852 let new_entries: Vec<DirEntry> = entries
1853 .into_iter()
1854 .filter(|e| e.name != name)
1855 .map(|e| DirEntry {
1856 name: e.name,
1857 hash: e.hash,
1858 size: e.size,
1859 key: e.key,
1860 link_type: e.link_type,
1861 meta: e.meta,
1862 })
1863 .collect();
1864
1865 let new_dir_cid = self.put_directory(new_entries).await?;
1866 self.rebuild_path(root, path, new_dir_cid).await
1867 }
1868
1869 pub async fn rename_entry(
1872 &self,
1873 root: &Cid,
1874 path: &[&str],
1875 old_name: &str,
1876 new_name: &str,
1877 ) -> Result<Cid, HashTreeError> {
1878 if old_name == new_name {
1879 return Ok(root.clone());
1880 }
1881
1882 let dir_cid = self.resolve_path_array(root, path).await?;
1883 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1884
1885 let entries = self.list_directory(&dir_cid).await?;
1886 let entry = entries
1887 .iter()
1888 .find(|e| e.name == old_name)
1889 .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1890
1891 let entry_hash = entry.hash;
1892 let entry_size = entry.size;
1893 let entry_key = entry.key;
1894 let entry_link_type = entry.link_type;
1895 let entry_meta = entry.meta.clone();
1896
1897 let new_entries: Vec<DirEntry> = entries
1898 .into_iter()
1899 .filter(|e| e.name != old_name)
1900 .map(|e| DirEntry {
1901 name: e.name,
1902 hash: e.hash,
1903 size: e.size,
1904 key: e.key,
1905 link_type: e.link_type,
1906 meta: e.meta,
1907 })
1908 .chain(std::iter::once(DirEntry {
1909 name: new_name.to_string(),
1910 hash: entry_hash,
1911 size: entry_size,
1912 key: entry_key,
1913 link_type: entry_link_type,
1914 meta: entry_meta,
1915 }))
1916 .collect();
1917
1918 let new_dir_cid = self.put_directory(new_entries).await?;
1919 self.rebuild_path(root, path, new_dir_cid).await
1920 }
1921
    /// Moves the entry `name` from the directory at `source_path` to the
    /// directory at `target_path`, returning the new root CID. Implemented as
    /// a lookup, a `remove_entry`, then a `set_entry` on the updated root.
    ///
    /// NOTE(review): the entry's `meta` is not captured here, and `set_entry`
    /// writes `meta: None` for the inserted entry — metadata is lost on move.
    /// Confirm this is intended.
    ///
    /// # Errors
    /// `PathNotFound` when `source_path` does not resolve; `EntryNotFound`
    /// when `name` is not present in the source directory.
    pub async fn move_entry(
        &self,
        root: &Cid,
        source_path: &[&str],
        name: &str,
        target_path: &[&str],
    ) -> Result<Cid, HashTreeError> {
        let source_dir_cid = self.resolve_path_array(root, source_path).await?;
        let source_dir_cid =
            source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;

        let source_entries = self.list_directory(&source_dir_cid).await?;
        let entry = source_entries
            .iter()
            .find(|e| e.name == name)
            .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;

        // Capture what set_entry needs before mutating the tree.
        let entry_cid = Cid {
            hash: entry.hash,
            key: entry.key,
        };
        let entry_size = entry.size;
        let entry_link_type = entry.link_type;

        // Remove first, then re-insert against the updated root so the
        // target path is resolved in the post-removal tree.
        let new_root = self.remove_entry(root, source_path, name).await?;

        self.set_entry(
            &new_root,
            target_path,
            name,
            &entry_cid,
            entry_size,
            entry_link_type,
        )
        .await
    }
1962
1963 async fn resolve_path_array(
1964 &self,
1965 root: &Cid,
1966 path: &[&str],
1967 ) -> Result<Option<Cid>, HashTreeError> {
1968 if path.is_empty() {
1969 return Ok(Some(root.clone()));
1970 }
1971 self.resolve_path(root, &path.join("/")).await
1972 }
1973
    /// Re-links the directory chain from the deepest changed directory back
    /// up to the root after `new_child` replaced the directory at `path`,
    /// returning the CID of the rebuilt root.
    ///
    /// NOTE(review): each level re-resolves its parent from the original
    /// `root`, making this O(depth²) in store lookups — acceptable for
    /// shallow trees, worth revisiting for deep ones.
    async fn rebuild_path(
        &self,
        root: &Cid,
        path: &[&str],
        new_child: Cid,
    ) -> Result<Cid, HashTreeError> {
        if path.is_empty() {
            return Ok(new_child);
        }

        let mut child_cid = new_child;
        let parts: Vec<&str> = path.to_vec();

        // Walk from the deepest component back toward the root, rewriting
        // each parent directory to point at the freshly built child.
        for i in (0..parts.len()).rev() {
            let child_name = parts[i];
            let parent_path = &parts[..i];

            let parent_cid = if parent_path.is_empty() {
                root.clone()
            } else {
                self.resolve_path_array(root, parent_path)
                    .await?
                    .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
            };

            let parent_entries = self.list_directory(&parent_cid).await?;
            let new_parent_entries: Vec<DirEntry> = parent_entries
                .into_iter()
                .map(|e| {
                    if e.name == child_name {
                        // Point the parent at the rebuilt child.
                        // NOTE(review): the replaced entry's size is reset to
                        // 0 rather than recomputed or preserved — confirm
                        // callers do not rely on directory entry sizes here.
                        DirEntry {
                            name: e.name,
                            hash: child_cid.hash,
                            size: 0, key: child_cid.key,
                            link_type: e.link_type,
                            meta: e.meta,
                        }
                    } else {
                        // Unrelated sibling: carried over unchanged.
                        DirEntry {
                            name: e.name,
                            hash: e.hash,
                            size: e.size,
                            key: e.key,
                            link_type: e.link_type,
                            meta: e.meta,
                        }
                    }
                })
                .collect();

            child_cid = self.put_directory(new_parent_entries).await?;
        }

        Ok(child_cid)
    }
2030
    /// Returns a cloned handle (`Arc`) to the underlying block store.
    pub fn get_store(&self) -> Arc<S> {
        self.store.clone()
    }
2037
    /// Returns the configured chunk size in bytes used when splitting files.
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }
2042
    /// Returns the configured maximum number of links per tree node.
    pub fn max_links(&self) -> usize {
        self.max_links
    }
2047}
2048
/// Stack element for `ReadStreamState::Processing`: a block hash still to be
/// fetched and expanded.
enum StreamStackItem {
    Hash(Hash),
}
2054
/// Unfold state for a plain (unencrypted) read stream: `Init` before the
/// root block is fetched, `Processing` while draining the pending stack,
/// `Done` when exhausted.
/// NOTE(review): no consumer is visible in this part of the file — the
/// driving stream function presumably lives elsewhere; confirm.
enum ReadStreamState<'a, S: Store> {
    Init {
        hash: Hash,
        tree: &'a HashTree<S>,
    },
    Processing {
        stack: Vec<StreamStackItem>,
        tree: &'a HashTree<S>,
    },
    Done,
}
2066
/// Pending node for `walk_stream`/`process_walk_stack`: the block hash, the
/// path accumulated so far, and the optional decryption key for the block.
struct WalkStackItem {
    hash: Hash,
    path: String,
    key: Option<[u8; 32]>,
}
2072
/// Unfold state for `walk_stream`: `Init` before the root is fetched,
/// `Processing` while the depth-first stack is drained via
/// `process_walk_stack`, `Done` once the walk terminates.
enum WalkStreamState<'a, S: Store> {
    Init {
        cid: Cid,
        path: String,
        tree: &'a HashTree<S>,
    },
    Processing {
        stack: Vec<WalkStackItem>,
        tree: &'a HashTree<S>,
    },
    Done,
}
2085
/// Stack element for `EncryptedStreamState::Processing`: a block hash plus
/// the key needed to decrypt it (None for unencrypted child blocks).
struct EncryptedStackItem {
    hash: Hash,
    key: Option<[u8; 32]>,
}
2091
/// Unfold state for an encrypted read stream; unlike `ReadStreamState`, the
/// root carries a mandatory decryption key.
/// NOTE(review): no consumer is visible in this part of the file — the
/// driving stream function presumably lives elsewhere; confirm.
enum EncryptedStreamState<'a, S: Store> {
    Init {
        hash: Hash,
        key: [u8; 32],
        tree: &'a HashTree<S>,
    },
    Processing {
        stack: Vec<EncryptedStackItem>,
        tree: &'a HashTree<S>,
    },
    Done,
}
2104
2105pub async fn verify_tree<S: Store>(
2107 store: Arc<S>,
2108 root_hash: &Hash,
2109) -> Result<crate::reader::VerifyResult, HashTreeError> {
2110 let mut missing = Vec::new();
2111 let mut visited = std::collections::HashSet::new();
2112
2113 verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
2114
2115 Ok(crate::reader::VerifyResult {
2116 valid: missing.is_empty(),
2117 missing,
2118 })
2119}
2120
2121async fn verify_recursive<S: Store>(
2122 store: Arc<S>,
2123 hash: &Hash,
2124 missing: &mut Vec<Hash>,
2125 visited: &mut std::collections::HashSet<String>,
2126) -> Result<(), HashTreeError> {
2127 let hex = to_hex(hash);
2128 if visited.contains(&hex) {
2129 return Ok(());
2130 }
2131 visited.insert(hex);
2132
2133 let data = match store
2134 .get(hash)
2135 .await
2136 .map_err(|e| HashTreeError::Store(e.to_string()))?
2137 {
2138 Some(d) => d,
2139 None => {
2140 missing.push(*hash);
2141 return Ok(());
2142 }
2143 };
2144
2145 if is_tree_node(&data) {
2146 let node = decode_tree_node(&data)?;
2147 for link in &node.links {
2148 Box::pin(verify_recursive(
2149 store.clone(),
2150 &link.hash,
2151 missing,
2152 visited,
2153 ))
2154 .await?;
2155 }
2156 }
2157
2158 Ok(())
2159}
2160
// Unit tests for HashTree: blob/file/directory round-trips, path resolution,
// and the unified put/get API in both public and encrypted modes.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;

    // Shared fixture: an in-memory store and a public (unencrypted) tree.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    #[tokio::test]
    async fn test_put_and_read_blob() {
        let (_store, tree) = make_tree();

        let data = vec![1, 2, 3, 4, 5];
        let hash = tree.put_blob(&data).await.unwrap();

        let result = tree.get_blob(&hash).await.unwrap();
        assert_eq!(result, Some(data));
    }

    #[tokio::test]
    async fn test_put_and_read_file_small() {
        let (_store, tree) = make_tree();

        let data = b"Hello, World!";
        let (cid, size) = tree.put_file(data).await.unwrap();

        assert_eq!(size, data.len() as u64);

        let read_data = tree.read_file(&cid.hash).await.unwrap();
        assert_eq!(read_data, Some(data.to_vec()));
    }

    #[tokio::test]
    async fn test_put_and_read_directory() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();

        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let entries = tree.list_directory(&dir_cid).await.unwrap();
        assert_eq!(entries.len(), 2);
        let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
        assert!(names.contains(&"a.txt"));
        assert!(names.contains(&"b.txt"));
    }

    #[tokio::test]
    async fn test_is_directory() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"data").await.unwrap();
        let dir_cid = tree.put_directory(vec![]).await.unwrap();

        assert!(!tree.is_directory(&file_hash).await.unwrap());
        assert!(tree.is_directory(&dir_cid.hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_resolve_path() {
        let (_store, tree) = make_tree();

        // Build root -> subdir -> file.txt, then resolve the nested path.
        let file_hash = tree.put_blob(b"nested").await.unwrap();
        let sub_dir = tree
            .put_directory(vec![DirEntry::new("file.txt", file_hash).with_size(6)])
            .await
            .unwrap();
        let root_dir = tree
            .put_directory(vec![DirEntry::new("subdir", sub_dir.hash)])
            .await
            .unwrap();

        let resolved = tree
            .resolve_path(&root_dir, "subdir/file.txt")
            .await
            .unwrap();
        assert_eq!(resolved.map(|c| c.hash), Some(file_hash));
    }

    #[tokio::test]
    async fn test_unified_put_get_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let data = b"Hello, public world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        // Public trees produce key-less CIDs.
        assert!(cid.key.is_none()); let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_unified_put_get_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Hello, encrypted world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        // Default config encrypts, so the CID carries a key.
        assert!(cid.key.is_some()); let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_unified_put_get_encrypted_chunked() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).with_chunk_size(100));

        // 500 bytes with a 100-byte chunk size forces multi-chunk storage.
        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
        let (cid, size) = tree.put(&data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_cid_deterministic() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Same content produces same CID";

        // Convergent encryption: identical content must yield identical
        // hash, key, and string form across puts.
        let (cid1, _) = tree.put(data).await.unwrap();
        let (cid2, _) = tree.put(data).await.unwrap();

        assert_eq!(cid1.hash, cid2.hash);
        assert_eq!(cid1.key, cid2.key);
        assert_eq!(cid1.to_string(), cid2.to_string());
    }

    #[tokio::test]
    async fn test_cid_to_string_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        // 64 hex chars: hash only, no key separator.
        assert_eq!(s.len(), 64);
        assert!(!s.contains(':'));
    }

    #[tokio::test]
    async fn test_cid_to_string_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        // 129 = 64-char hash + ':' + 64-char key.
        assert_eq!(s.len(), 129);
        assert!(s.contains(':'));
    }
}