1use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::stream::{self, Stream};
10use futures::io::AsyncRead;
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node};
15use crate::hash::sha256;
16use crate::reader::{ReaderError, TreeEntry, WalkEntry};
17use crate::store::Store;
18use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
19
20use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
21
22#[derive(Clone)]
24pub struct HashTreeConfig<S: Store> {
25 pub store: Arc<S>,
26 pub chunk_size: usize,
27 pub max_links: usize,
28 pub encrypted: bool,
30}
31
32impl<S: Store> HashTreeConfig<S> {
33 pub fn new(store: Arc<S>) -> Self {
34 Self {
35 store,
36 chunk_size: DEFAULT_CHUNK_SIZE,
37 max_links: DEFAULT_MAX_LINKS,
38 encrypted: true,
39 }
40 }
41
42 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
43 self.chunk_size = chunk_size;
44 self
45 }
46
47 pub fn with_max_links(mut self, max_links: usize) -> Self {
48 self.max_links = max_links;
49 self
50 }
51
52 pub fn public(mut self) -> Self {
54 self.encrypted = false;
55 self
56 }
57}
58
59#[derive(Debug, Clone)]
61pub struct PutFileResult {
62 pub hash: Hash,
63 pub size: u64,
64}
65
66#[derive(Debug, thiserror::Error)]
68pub enum HashTreeError {
69 #[error("Store error: {0}")]
70 Store(String),
71 #[error("Codec error: {0}")]
72 Codec(#[from] crate::codec::CodecError),
73 #[error("Missing chunk: {0}")]
74 MissingChunk(String),
75 #[error("Path not found: {0}")]
76 PathNotFound(String),
77 #[error("Entry not found: {0}")]
78 EntryNotFound(String),
79 #[error("Encryption error: {0}")]
80 Encryption(String),
81 #[error("Decryption error: {0}")]
82 Decryption(String),
83}
84
85impl From<BuilderError> for HashTreeError {
86 fn from(e: BuilderError) -> Self {
87 match e {
88 BuilderError::Store(s) => HashTreeError::Store(s),
89 BuilderError::Codec(c) => HashTreeError::Codec(c),
90 BuilderError::Encryption(s) => HashTreeError::Encryption(s),
91 }
92 }
93}
94
95impl From<ReaderError> for HashTreeError {
96 fn from(e: ReaderError) -> Self {
97 match e {
98 ReaderError::Store(s) => HashTreeError::Store(s),
99 ReaderError::Codec(c) => HashTreeError::Codec(c),
100 ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
101 ReaderError::Decryption(s) => HashTreeError::Encryption(s),
102 ReaderError::MissingKey => HashTreeError::Encryption("missing decryption key".to_string()),
103 }
104 }
105}
106
107pub struct HashTree<S: Store> {
109 store: Arc<S>,
110 chunk_size: usize,
111 max_links: usize,
112 encrypted: bool,
113}
114
115impl<S: Store> HashTree<S> {
116 pub fn new(config: HashTreeConfig<S>) -> Self {
117 Self {
118 store: config.store,
119 chunk_size: config.chunk_size,
120 max_links: config.max_links,
121 encrypted: config.encrypted,
122 }
123 }
124
125 pub fn is_encrypted(&self) -> bool {
127 self.encrypted
128 }
129
130 pub async fn put(&self, data: &[u8]) -> Result<Cid, HashTreeError> {
135 let size = data.len() as u64;
136
137 if data.len() <= self.chunk_size {
139 let (hash, key) = self.put_chunk_internal(data).await?;
140 return Ok(Cid { hash, key, size });
141 }
142
143 let mut links: Vec<Link> = Vec::new();
145 let mut offset = 0;
146
147 while offset < data.len() {
148 let end = (offset + self.chunk_size).min(data.len());
149 let chunk = &data[offset..end];
150 let chunk_size = chunk.len() as u64;
151 let (hash, key) = self.put_chunk_internal(chunk).await?;
152 links.push(Link {
153 hash,
154 name: None,
155 size: chunk_size,
156 key,
157 link_type: LinkType::Blob, meta: None,
159 });
160 offset = end;
161 }
162
163 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
165 Ok(Cid { hash: root_hash, key: root_key, size })
166 }
167
168 pub async fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
170 if let Some(key) = cid.key {
171 self.get_encrypted(&cid.hash, &key).await
172 } else {
173 self.read_file(&cid.hash).await
174 }
175 }
176
177 pub async fn put_stream<R: AsyncRead + Unpin>(&self, mut reader: R) -> Result<Cid, HashTreeError> {
182 let mut buffer = vec![0u8; self.chunk_size];
183 let mut links = Vec::new();
184 let mut total_size: u64 = 0;
185 let mut consistent_key: Option<[u8; 32]> = None;
186
187 loop {
188 let mut chunk = Vec::new();
189 let mut bytes_read = 0;
190
191 while bytes_read < self.chunk_size {
193 let n = reader.read(&mut buffer[..self.chunk_size - bytes_read]).await
194 .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
195 if n == 0 {
196 break; }
198 chunk.extend_from_slice(&buffer[..n]);
199 bytes_read += n;
200 }
201
202 if chunk.is_empty() {
203 break; }
205
206 let chunk_len = chunk.len() as u64;
207 total_size += chunk_len;
208
209 let (hash, key) = self.put_chunk_internal(&chunk).await?;
210
211 if links.is_empty() {
213 consistent_key = key;
214 } else if consistent_key != key {
215 consistent_key = None;
216 }
217
218 links.push(Link {
219 hash,
220 name: None,
221 size: chunk_len,
222 key,
223 link_type: LinkType::Blob, meta: None,
225 });
226 }
227
228 if links.is_empty() {
229 let (hash, key) = self.put_chunk_internal(&[]).await?;
231 return Ok(Cid { hash, key, size: 0 });
232 }
233
234 let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
236 Ok(Cid { hash: root_hash, key: root_key, size: total_size })
237 }
238
239 pub fn get_stream(
244 &self,
245 cid: &Cid,
246 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
247 let hash = cid.hash;
248 let key = cid.key;
249
250 if let Some(k) = key {
251 Box::pin(self.read_file_stream_encrypted(hash, k))
253 } else {
254 self.read_file_stream(hash)
256 }
257 }
258
259 fn read_file_stream_encrypted(
261 &self,
262 hash: Hash,
263 key: EncryptionKey,
264 ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
265 stream::unfold(
266 EncryptedStreamState::Init { hash, key, tree: self },
267 |state| async move {
268 match state {
269 EncryptedStreamState::Init { hash, key, tree } => {
270 let data = match tree.store.get(&hash).await {
271 Ok(Some(d)) => d,
272 Ok(None) => return None,
273 Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), EncryptedStreamState::Done)),
274 };
275
276 let decrypted = match decrypt_chk(&data, &key) {
278 Ok(d) => d,
279 Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), EncryptedStreamState::Done)),
280 };
281
282 if !is_tree_node(&decrypted) {
283 return Some((Ok(decrypted), EncryptedStreamState::Done));
285 }
286
287 let node = match decode_tree_node(&decrypted) {
289 Ok(n) => n,
290 Err(e) => return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done)),
291 };
292
293 let mut stack: Vec<EncryptedStackItem> = Vec::new();
294 for link in node.links.into_iter().rev() {
295 stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
296 }
297
298 tree.process_encrypted_stream_stack(&mut stack).await
299 }
300 EncryptedStreamState::Processing { mut stack, tree } => {
301 tree.process_encrypted_stream_stack(&mut stack).await
302 }
303 EncryptedStreamState::Done => None,
304 }
305 },
306 )
307 }
308
309 async fn process_encrypted_stream_stack<'a>(
310 &'a self,
311 stack: &mut Vec<EncryptedStackItem>,
312 ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
313 while let Some(item) = stack.pop() {
314 let data = match self.store.get(&item.hash).await {
315 Ok(Some(d)) => d,
316 Ok(None) => {
317 return Some((
318 Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
319 EncryptedStreamState::Done,
320 ))
321 }
322 Err(e) => {
323 return Some((
324 Err(HashTreeError::Store(e.to_string())),
325 EncryptedStreamState::Done,
326 ))
327 }
328 };
329
330 let decrypted = if let Some(key) = item.key {
332 match decrypt_chk(&data, &key) {
333 Ok(d) => d,
334 Err(e) => {
335 return Some((
336 Err(HashTreeError::Decryption(e.to_string())),
337 EncryptedStreamState::Done,
338 ))
339 }
340 }
341 } else {
342 data
343 };
344
345 if is_tree_node(&decrypted) {
346 let node = match decode_tree_node(&decrypted) {
348 Ok(n) => n,
349 Err(e) => {
350 return Some((
351 Err(HashTreeError::Codec(e)),
352 EncryptedStreamState::Done,
353 ))
354 }
355 };
356 for link in node.links.into_iter().rev() {
357 stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
358 }
359 } else {
360 return Some((
362 Ok(decrypted),
363 EncryptedStreamState::Processing { stack: std::mem::take(stack), tree: self },
364 ));
365 }
366 }
367 None
368 }
369
370 async fn put_chunk_internal(&self, data: &[u8]) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
372 if self.encrypted {
373 let (encrypted, key) = encrypt_chk(data)
374 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
375 let hash = sha256(&encrypted);
376 self.store
377 .put(hash, encrypted)
378 .await
379 .map_err(|e| HashTreeError::Store(e.to_string()))?;
380 Ok((hash, Some(key)))
381 } else {
382 let hash = self.put_blob(data).await?;
383 Ok((hash, None))
384 }
385 }
386
387 async fn build_tree_internal(
389 &self,
390 links: Vec<Link>,
391 total_size: Option<u64>,
392 ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
393 if links.len() == 1 {
395 if let Some(ts) = total_size {
396 if links[0].size == ts {
397 return Ok((links[0].hash, links[0].key));
398 }
399 }
400 }
401
402 if links.len() <= self.max_links {
403 let node = TreeNode {
404 node_type: LinkType::File,
405 links,
406 };
407 let (data, _) = encode_and_hash(&node)?;
408
409 if self.encrypted {
410 let (encrypted, key) = encrypt_chk(&data)
411 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
412 let hash = sha256(&encrypted);
413 self.store
414 .put(hash, encrypted)
415 .await
416 .map_err(|e| HashTreeError::Store(e.to_string()))?;
417 return Ok((hash, Some(key)));
418 }
419
420 let hash = sha256(&data);
422 self.store
423 .put(hash, data)
424 .await
425 .map_err(|e| HashTreeError::Store(e.to_string()))?;
426 return Ok((hash, None));
427 }
428
429 let mut sub_links = Vec::new();
431 for batch in links.chunks(self.max_links) {
432 let batch_size: u64 = batch.iter().map(|l| l.size).sum();
433 let (hash, key) = Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
434 sub_links.push(Link {
435 hash,
436 name: None,
437 size: batch_size,
438 key,
439 link_type: LinkType::File, meta: None,
441 });
442 }
443
444 Box::pin(self.build_tree_internal(sub_links, total_size)).await
445 }
446
447 async fn get_encrypted(
449 &self,
450 hash: &Hash,
451 key: &EncryptionKey,
452 ) -> Result<Option<Vec<u8>>, HashTreeError> {
453 let encrypted_data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
454 Some(d) => d,
455 None => return Ok(None),
456 };
457
458 let decrypted = decrypt_chk(&encrypted_data, key)
460 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
461
462 if is_tree_node(&decrypted) {
464 let node = decode_tree_node(&decrypted)?;
465 let assembled = self.assemble_encrypted_chunks(&node).await?;
466 return Ok(Some(assembled));
467 }
468
469 Ok(Some(decrypted))
471 }
472
473 async fn assemble_encrypted_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
475 let mut parts: Vec<Vec<u8>> = Vec::new();
476
477 for link in &node.links {
478 let chunk_key = link.key.ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
479
480 let encrypted_child = self
481 .store
482 .get(&link.hash)
483 .await
484 .map_err(|e| HashTreeError::Store(e.to_string()))?
485 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
486
487 let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
488 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
489
490 if is_tree_node(&decrypted) {
491 let child_node = decode_tree_node(&decrypted)?;
493 let child_data = Box::pin(self.assemble_encrypted_chunks(&child_node)).await?;
494 parts.push(child_data);
495 } else {
496 parts.push(decrypted);
498 }
499 }
500
501 let total_len: usize = parts.iter().map(|p| p.len()).sum();
502 let mut result = Vec::with_capacity(total_len);
503 for part in parts {
504 result.extend_from_slice(&part);
505 }
506
507 Ok(result)
508 }
509
510 pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
515 let hash = sha256(data);
516 self.store
517 .put(hash, data.to_vec())
518 .await
519 .map_err(|e| HashTreeError::Store(e.to_string()))?;
520 Ok(hash)
521 }
522
523 pub async fn put_file(&self, data: &[u8]) -> Result<PutFileResult, HashTreeError> {
526 let size = data.len() as u64;
527
528 if data.len() <= self.chunk_size {
530 let hash = self.put_blob(data).await?;
531 return Ok(PutFileResult { hash, size });
532 }
533
534 let mut chunk_hashes: Vec<Hash> = Vec::new();
536 let mut offset = 0;
537
538 while offset < data.len() {
539 let end = (offset + self.chunk_size).min(data.len());
540 let chunk = &data[offset..end];
541 let hash = self.put_blob(chunk).await?;
542 chunk_hashes.push(hash);
543 offset = end;
544 }
545
546 let chunks: Vec<Link> = chunk_hashes
548 .iter()
549 .enumerate()
550 .map(|(i, &hash)| {
551 let chunk_size = if i < chunk_hashes.len() - 1 {
552 self.chunk_size as u64
553 } else {
554 (data.len() - i * self.chunk_size) as u64
555 };
556 Link {
557 hash,
558 name: None,
559 size: chunk_size,
560 key: None,
561 link_type: LinkType::Blob, meta: None,
563 }
564 })
565 .collect();
566
567 let root_hash = self.build_tree(chunks, Some(size)).await?;
568 Ok(PutFileResult { hash: root_hash, size })
569 }
570
571 pub async fn put_directory(
574 &self,
575 entries: Vec<DirEntry>,
576 ) -> Result<Cid, HashTreeError> {
577 let mut sorted = entries;
579 sorted.sort_by(|a, b| a.name.cmp(&b.name));
580
581 let links: Vec<Link> = sorted
582 .into_iter()
583 .map(|e| Link {
584 hash: e.hash,
585 name: Some(e.name),
586 size: e.size,
587 key: e.key,
588 link_type: e.link_type,
589 meta: e.meta,
590 })
591 .collect();
592
593 let total_size: u64 = links.iter().map(|l| l.size).sum();
594
595 if links.len() <= self.max_links {
597 let node = TreeNode {
598 node_type: LinkType::Dir,
599 links,
600 };
601 let (data, _plain_hash) = encode_and_hash(&node)?;
602
603 let (hash, key) = self.put_chunk_internal(&data).await?;
605
606 return Ok(Cid { hash, key, size: total_size });
607 }
608
609 self.build_directory_by_chunks(links, total_size).await
611 }
612
613 async fn build_tree(&self, links: Vec<Link>, total_size: Option<u64>) -> Result<Hash, HashTreeError> {
615 if links.len() == 1 {
617 if let Some(ts) = total_size {
618 if links[0].size == ts {
619 return Ok(links[0].hash);
620 }
621 }
622 }
623
624 if links.len() <= self.max_links {
626 let node = TreeNode {
627 node_type: LinkType::File,
628 links,
629 };
630 let (data, hash) = encode_and_hash(&node)?;
631 self.store
632 .put(hash, data)
633 .await
634 .map_err(|e| HashTreeError::Store(e.to_string()))?;
635 return Ok(hash);
636 }
637
638 let mut sub_trees: Vec<Link> = Vec::new();
640
641 for batch in links.chunks(self.max_links) {
642 let batch_size: u64 = batch.iter().map(|l| l.size).sum();
643
644 let node = TreeNode {
645 node_type: LinkType::File,
646 links: batch.to_vec(),
647 };
648 let (data, hash) = encode_and_hash(&node)?;
649 self.store
650 .put(hash, data)
651 .await
652 .map_err(|e| HashTreeError::Store(e.to_string()))?;
653
654 sub_trees.push(Link {
655 hash,
656 name: None,
657 size: batch_size,
658 key: None,
659 link_type: LinkType::File, meta: None,
661 });
662 }
663
664 Box::pin(self.build_tree(sub_trees, total_size)).await
666 }
667
668 async fn build_directory_by_chunks(
671 &self,
672 links: Vec<Link>,
673 total_size: u64,
674 ) -> Result<Cid, HashTreeError> {
675 let mut sub_trees: Vec<Link> = Vec::new();
676
677 for (i, batch) in links.chunks(self.max_links).enumerate() {
678 let batch_size: u64 = batch.iter().map(|l| l.size).sum();
679
680 let node = TreeNode {
681 node_type: LinkType::Dir,
682 links: batch.to_vec(),
683 };
684 let (data, _plain_hash) = encode_and_hash(&node)?;
685
686 let (hash, key) = self.put_chunk_internal(&data).await?;
688
689 sub_trees.push(Link {
690 hash,
691 name: Some(format!("_chunk_{}", i * self.max_links)),
692 size: batch_size,
693 key,
694 link_type: LinkType::Dir, meta: None,
696 });
697 }
698
699 if sub_trees.len() <= self.max_links {
700 let node = TreeNode {
701 node_type: LinkType::Dir,
702 links: sub_trees,
703 };
704 let (data, _plain_hash) = encode_and_hash(&node)?;
705
706 let (hash, key) = self.put_chunk_internal(&data).await?;
708 return Ok(Cid { hash, key, size: total_size });
709 }
710
711 Box::pin(self.build_directory_by_chunks(sub_trees, total_size)).await
713 }
714
715 pub async fn put_tree_node(
717 &self,
718 links: Vec<Link>,
719 ) -> Result<Hash, HashTreeError> {
720 let node = TreeNode {
721 node_type: LinkType::Dir,
722 links,
723 };
724
725 let (data, hash) = encode_and_hash(&node)?;
726 self.store
727 .put(hash, data)
728 .await
729 .map_err(|e| HashTreeError::Store(e.to_string()))?;
730 Ok(hash)
731 }
732
733 pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
737 self.store
738 .get(hash)
739 .await
740 .map_err(|e| HashTreeError::Store(e.to_string()))
741 }
742
743 pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
745 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
746 Some(d) => d,
747 None => return Ok(None),
748 };
749
750 if !is_tree_node(&data) {
751 return Ok(None);
752 }
753
754 let node = decode_tree_node(&data)?;
755 Ok(Some(node))
756 }
757
758 pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
760 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
761 Some(d) => d,
762 None => return Ok(None),
763 };
764
765 let decrypted = if let Some(key) = &cid.key {
767 decrypt_chk(&data, key)
768 .map_err(|e| HashTreeError::Decryption(e.to_string()))?
769 } else {
770 data
771 };
772
773 if !is_tree_node(&decrypted) {
774 return Ok(None);
775 }
776
777 let node = decode_tree_node(&decrypted)?;
778 Ok(Some(node))
779 }
780
781 pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
783 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
784 Some(d) => d,
785 None => return Ok(false),
786 };
787 Ok(is_tree_node(&data))
788 }
789
790 pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
792 let node = match self.get_node(cid).await? {
793 Some(n) => n,
794 None => return Ok(false),
795 };
796 Ok(node.links.iter().any(|l| l.name.as_ref().map(|n| !n.starts_with('_')).unwrap_or(false)))
798 }
799
800 pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
802 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
803 Some(d) => d,
804 None => return Ok(false),
805 };
806 Ok(is_directory_node(&data))
807 }
808
809 pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
811 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
812 Some(d) => d,
813 None => return Ok(None),
814 };
815
816 if !is_tree_node(&data) {
818 return Ok(Some(data));
819 }
820
821 let node = decode_tree_node(&data)?;
823 let assembled = self.assemble_chunks(&node).await?;
824 Ok(Some(assembled))
825 }
826
827 async fn assemble_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
829 let mut parts: Vec<Vec<u8>> = Vec::new();
830
831 for link in &node.links {
832 let child_data = self
833 .store
834 .get(&link.hash)
835 .await
836 .map_err(|e| HashTreeError::Store(e.to_string()))?
837 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
838
839 if is_tree_node(&child_data) {
840 let child_node = decode_tree_node(&child_data)?;
841 parts.push(Box::pin(self.assemble_chunks(&child_node)).await?);
842 } else {
843 parts.push(child_data);
844 }
845 }
846
847 let total_length: usize = parts.iter().map(|p| p.len()).sum();
849 let mut result = Vec::with_capacity(total_length);
850 for part in parts {
851 result.extend_from_slice(&part);
852 }
853
854 Ok(result)
855 }
856
857 pub fn read_file_stream(
860 &self,
861 hash: Hash,
862 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
863 Box::pin(stream::unfold(
864 ReadStreamState::Init { hash, tree: self },
865 |state| async move {
866 match state {
867 ReadStreamState::Init { hash, tree } => {
868 let data = match tree.store.get(&hash).await {
869 Ok(Some(d)) => d,
870 Ok(None) => return None,
871 Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), ReadStreamState::Done)),
872 };
873
874 if !is_tree_node(&data) {
875 return Some((Ok(data), ReadStreamState::Done));
877 }
878
879 let node = match decode_tree_node(&data) {
881 Ok(n) => n,
882 Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
883 };
884
885 let mut stack: Vec<StreamStackItem> = Vec::new();
887 for link in node.links.into_iter().rev() {
888 stack.push(StreamStackItem::Hash(link.hash));
889 }
890
891 tree.process_stream_stack(&mut stack).await
893 }
894 ReadStreamState::Processing { mut stack, tree } => {
895 tree.process_stream_stack(&mut stack).await
896 }
897 ReadStreamState::Done => None,
898 }
899 },
900 ))
901 }
902
903 async fn process_stream_stack<'a>(
904 &'a self,
905 stack: &mut Vec<StreamStackItem>,
906 ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
907 while let Some(item) = stack.pop() {
908 match item {
909 StreamStackItem::Hash(hash) => {
910 let data = match self.store.get(&hash).await {
911 Ok(Some(d)) => d,
912 Ok(None) => {
913 return Some((
914 Err(HashTreeError::MissingChunk(to_hex(&hash))),
915 ReadStreamState::Done,
916 ))
917 }
918 Err(e) => {
919 return Some((
920 Err(HashTreeError::Store(e.to_string())),
921 ReadStreamState::Done,
922 ))
923 }
924 };
925
926 if is_tree_node(&data) {
927 let node = match decode_tree_node(&data) {
929 Ok(n) => n,
930 Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
931 };
932 for link in node.links.into_iter().rev() {
933 stack.push(StreamStackItem::Hash(link.hash));
934 }
935 } else {
936 return Some((Ok(data), ReadStreamState::Processing { stack: std::mem::take(stack), tree: self }));
938 }
939 }
940 }
941 }
942 None
943 }
944
945 pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
947 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
948 Some(d) => d,
949 None => return Ok(vec![]),
950 };
951
952 if !is_tree_node(&data) {
953 return Ok(vec![data]);
954 }
955
956 let node = decode_tree_node(&data)?;
957 self.collect_chunks(&node).await
958 }
959
960 async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
961 let mut chunks = Vec::new();
962
963 for link in &node.links {
964 let child_data = self
965 .store
966 .get(&link.hash)
967 .await
968 .map_err(|e| HashTreeError::Store(e.to_string()))?
969 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
970
971 if is_tree_node(&child_data) {
972 let child_node = decode_tree_node(&child_data)?;
973 chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
974 } else {
975 chunks.push(child_data);
976 }
977 }
978
979 Ok(chunks)
980 }
981
982 pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
984 let node = match self.get_node(cid).await? {
985 Some(n) => n,
986 None => return Ok(vec![]),
987 };
988
989 let mut entries = Vec::new();
990
991 for link in &node.links {
992 if let Some(ref name) = link.name {
994 if name.starts_with("_chunk_") || name.starts_with('_') {
995 let chunk_cid = Cid { hash: link.hash, key: link.key, size: link.size };
996 let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
997 entries.extend(sub_entries);
998 continue;
999 }
1000 }
1001
1002 entries.push(TreeEntry {
1003 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1004 hash: link.hash,
1005 size: link.size,
1006 link_type: link.link_type,
1007 key: link.key,
1008 meta: link.meta.clone(),
1009 });
1010 }
1011
1012 Ok(entries)
1013 }
1014
1015 pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1017 let node = match self.get_node(cid).await? {
1018 Some(n) => n,
1019 None => return Ok(vec![]),
1020 };
1021
1022 let mut entries = Vec::new();
1023
1024 for link in &node.links {
1025 if let Some(ref name) = link.name {
1027 if name.starts_with("_chunk_") || name.starts_with('_') {
1028 let sub_cid = Cid { hash: link.hash, key: cid.key, size: 0 };
1030 let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
1031 entries.extend(sub_entries);
1032 continue;
1033 }
1034 }
1035
1036 entries.push(TreeEntry {
1037 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1038 hash: link.hash,
1039 size: link.size,
1040 link_type: link.link_type,
1041 key: link.key,
1042 meta: link.meta.clone(),
1043 });
1044 }
1045
1046 Ok(entries)
1047 }
1048
1049 pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1051 let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
1052 if parts.is_empty() {
1053 return Ok(Some(cid.clone()));
1054 }
1055
1056 let mut current_hash = cid.hash;
1057 let mut current_link: Option<Link> = None;
1058
1059 for part in parts {
1060 let node = match self.get_tree_node(¤t_hash).await? {
1061 Some(n) => n,
1062 None => return Ok(None),
1063 };
1064
1065 if let Some(link) = self.find_link(&node, part) {
1066 current_hash = link.hash;
1067 current_link = Some(link);
1068 } else {
1069 match self.find_link_in_subtrees(&node, part).await? {
1071 Some(link) => {
1072 current_hash = link.hash;
1073 current_link = Some(link);
1074 }
1075 None => return Ok(None),
1076 }
1077 }
1078 }
1079
1080 let size = current_link.as_ref().map(|l| l.size).unwrap_or(0);
1082 let key = current_link.and_then(|l| l.key);
1083 Ok(Some(Cid { hash: current_hash, key, size }))
1084 }
1085
1086 pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1088 self.resolve(cid, path).await
1089 }
1090
1091 fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1092 node.links
1093 .iter()
1094 .find(|l| l.name.as_deref() == Some(name))
1095 .cloned()
1096 }
1097
1098 async fn find_link_in_subtrees(&self, node: &TreeNode, name: &str) -> Result<Option<Link>, HashTreeError> {
1099 for link in &node.links {
1100 if !link.name.as_ref().map(|n| n.starts_with('_')).unwrap_or(false) {
1101 continue;
1102 }
1103
1104 let sub_node = match self.get_tree_node(&link.hash).await? {
1105 Some(n) => n,
1106 None => continue,
1107 };
1108
1109 if let Some(found) = self.find_link(&sub_node, name) {
1110 return Ok(Some(found));
1111 }
1112
1113 if let Some(deep_found) = Box::pin(self.find_link_in_subtrees(&sub_node, name)).await? {
1114 return Ok(Some(deep_found));
1115 }
1116 }
1117
1118 Ok(None)
1119 }
1120
1121 pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1123 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1124 Some(d) => d,
1125 None => return Ok(0),
1126 };
1127
1128 if !is_tree_node(&data) {
1129 return Ok(data.len() as u64);
1130 }
1131
1132 let node = decode_tree_node(&data)?;
1133 let mut total = 0u64;
1135 for link in &node.links {
1136 total += link.size;
1137 }
1138 Ok(total)
1139 }
1140
1141 pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
1143 let mut entries = Vec::new();
1144 self.walk_recursive(cid, path, &mut entries).await?;
1145 Ok(entries)
1146 }
1147
1148 async fn walk_recursive(
1149 &self,
1150 cid: &Cid,
1151 path: &str,
1152 entries: &mut Vec<WalkEntry>,
1153 ) -> Result<(), HashTreeError> {
1154 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1155 Some(d) => d,
1156 None => return Ok(()),
1157 };
1158
1159 let data = if let Some(key) = &cid.key {
1161 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1162 } else {
1163 data
1164 };
1165
1166 let node = match try_decode_tree_node(&data) {
1167 Some(n) => n,
1168 None => {
1169 entries.push(WalkEntry {
1170 path: path.to_string(),
1171 hash: cid.hash,
1172 link_type: LinkType::Blob,
1173 size: data.len() as u64,
1174 key: cid.key,
1175 });
1176 return Ok(());
1177 }
1178 };
1179
1180 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1181 entries.push(WalkEntry {
1182 path: path.to_string(),
1183 hash: cid.hash,
1184 link_type: node.node_type,
1185 size: node_size,
1186 key: cid.key,
1187 });
1188
1189 for link in &node.links {
1190 let child_path = match &link.name {
1191 Some(name) => {
1192 if name.starts_with("_chunk_") || name.starts_with('_') {
1193 let sub_cid = Cid { hash: link.hash, key: cid.key, size: 0 };
1195 Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
1196 continue;
1197 }
1198 if path.is_empty() {
1199 name.clone()
1200 } else {
1201 format!("{}/{}", path, name)
1202 }
1203 }
1204 None => path.to_string(),
1205 };
1206
1207 let child_cid = Cid { hash: link.hash, key: link.key, size: link.size };
1209 Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
1210 }
1211
1212 Ok(())
1213 }
1214
1215 pub fn walk_stream(
1217 &self,
1218 cid: Cid,
1219 initial_path: String,
1220 ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
1221 Box::pin(stream::unfold(
1222 WalkStreamState::Init { cid, path: initial_path, tree: self },
1223 |state| async move {
1224 match state {
1225 WalkStreamState::Init { cid, path, tree } => {
1226 let data = match tree.store.get(&cid.hash).await {
1227 Ok(Some(d)) => d,
1228 Ok(None) => return None,
1229 Err(e) => {
1230 return Some((
1231 Err(HashTreeError::Store(e.to_string())),
1232 WalkStreamState::Done,
1233 ))
1234 }
1235 };
1236
1237 let data = if let Some(key) = &cid.key {
1239 match decrypt_chk(&data, key) {
1240 Ok(d) => d,
1241 Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), WalkStreamState::Done)),
1242 }
1243 } else {
1244 data
1245 };
1246
1247 let node = match try_decode_tree_node(&data) {
1248 Some(n) => n,
1249 None => {
1250 let entry = WalkEntry {
1252 path,
1253 hash: cid.hash,
1254 link_type: LinkType::Blob,
1255 size: data.len() as u64,
1256 key: cid.key,
1257 };
1258 return Some((Ok(entry), WalkStreamState::Done));
1259 }
1260 };
1261
1262 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1263 let entry = WalkEntry {
1264 path: path.clone(),
1265 hash: cid.hash,
1266 link_type: node.node_type,
1267 size: node_size,
1268 key: cid.key,
1269 };
1270
1271 let mut stack: Vec<WalkStackItem> = Vec::new();
1273 for link in node.links.into_iter().rev() {
1274 let child_path = match &link.name {
1275 Some(name) if !name.starts_with('_') => {
1276 if path.is_empty() {
1277 name.clone()
1278 } else {
1279 format!("{}/{}", path, name)
1280 }
1281 }
1282 _ => path.clone(),
1283 };
1284 stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
1286 }
1287
1288 Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
1289 }
1290 WalkStreamState::Processing { mut stack, tree } => {
1291 tree.process_walk_stack(&mut stack).await
1292 }
1293 WalkStreamState::Done => None,
1294 }
1295 },
1296 ))
1297 }
1298
1299 async fn process_walk_stack<'a>(
1300 &'a self,
1301 stack: &mut Vec<WalkStackItem>,
1302 ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
1303 while let Some(item) = stack.pop() {
1304 let data = match self.store.get(&item.hash).await {
1305 Ok(Some(d)) => d,
1306 Ok(None) => continue,
1307 Err(e) => {
1308 return Some((
1309 Err(HashTreeError::Store(e.to_string())),
1310 WalkStreamState::Done,
1311 ))
1312 }
1313 };
1314
1315 let node = match try_decode_tree_node(&data) {
1316 Some(n) => n,
1317 None => {
1318 let entry = WalkEntry {
1320 path: item.path,
1321 hash: item.hash,
1322 link_type: LinkType::Blob,
1323 size: data.len() as u64,
1324 key: item.key,
1325 };
1326 return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1327 }
1328 };
1329
1330 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1331 let entry = WalkEntry {
1332 path: item.path.clone(),
1333 hash: item.hash,
1334 link_type: node.node_type,
1335 size: node_size,
1336 key: None, };
1338
1339 for link in node.links.into_iter().rev() {
1341 let child_path = match &link.name {
1342 Some(name) if !name.starts_with('_') => {
1343 if item.path.is_empty() {
1344 name.clone()
1345 } else {
1346 format!("{}/{}", item.path, name)
1347 }
1348 }
1349 _ => item.path.clone(),
1350 };
1351 stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
1352 }
1353
1354 return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1355 }
1356 None
1357 }
1358
1359 pub async fn set_entry(
1364 &self,
1365 root: &Cid,
1366 path: &[&str],
1367 name: &str,
1368 entry_cid: &Cid,
1369 link_type: LinkType,
1370 ) -> Result<Cid, HashTreeError> {
1371 let dir_cid = self.resolve_path_array(root, path).await?;
1372 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1373
1374 let entries = self.list_directory(&dir_cid).await?;
1375 let mut new_entries: Vec<DirEntry> = entries
1376 .into_iter()
1377 .filter(|e| e.name != name)
1378 .map(|e| DirEntry {
1379 name: e.name,
1380 hash: e.hash,
1381 size: e.size,
1382 key: e.key,
1383 link_type: e.link_type,
1384 meta: e.meta,
1385 })
1386 .collect();
1387
1388 new_entries.push(DirEntry {
1389 name: name.to_string(),
1390 hash: entry_cid.hash,
1391 size: entry_cid.size,
1392 key: entry_cid.key,
1393 link_type,
1394 meta: None,
1395 });
1396
1397 let new_dir_cid = self.put_directory(new_entries).await?;
1398 self.rebuild_path(root, path, new_dir_cid).await
1399 }
1400
1401 pub async fn remove_entry(
1404 &self,
1405 root: &Cid,
1406 path: &[&str],
1407 name: &str,
1408 ) -> Result<Cid, HashTreeError> {
1409 let dir_cid = self.resolve_path_array(root, path).await?;
1410 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1411
1412 let entries = self.list_directory(&dir_cid).await?;
1413 let new_entries: Vec<DirEntry> = entries
1414 .into_iter()
1415 .filter(|e| e.name != name)
1416 .map(|e| DirEntry {
1417 name: e.name,
1418 hash: e.hash,
1419 size: e.size,
1420 key: e.key,
1421 link_type: e.link_type,
1422 meta: e.meta,
1423 })
1424 .collect();
1425
1426 let new_dir_cid = self.put_directory(new_entries).await?;
1427 self.rebuild_path(root, path, new_dir_cid).await
1428 }
1429
1430 pub async fn rename_entry(
1433 &self,
1434 root: &Cid,
1435 path: &[&str],
1436 old_name: &str,
1437 new_name: &str,
1438 ) -> Result<Cid, HashTreeError> {
1439 if old_name == new_name {
1440 return Ok(root.clone());
1441 }
1442
1443 let dir_cid = self.resolve_path_array(root, path).await?;
1444 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1445
1446 let entries = self.list_directory(&dir_cid).await?;
1447 let entry = entries
1448 .iter()
1449 .find(|e| e.name == old_name)
1450 .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1451
1452 let entry_hash = entry.hash;
1453 let entry_size = entry.size;
1454 let entry_key = entry.key;
1455 let entry_link_type = entry.link_type;
1456 let entry_meta = entry.meta.clone();
1457
1458 let new_entries: Vec<DirEntry> = entries
1459 .into_iter()
1460 .filter(|e| e.name != old_name)
1461 .map(|e| DirEntry {
1462 name: e.name,
1463 hash: e.hash,
1464 size: e.size,
1465 key: e.key,
1466 link_type: e.link_type,
1467 meta: e.meta,
1468 })
1469 .chain(std::iter::once(DirEntry {
1470 name: new_name.to_string(),
1471 hash: entry_hash,
1472 size: entry_size,
1473 key: entry_key,
1474 link_type: entry_link_type,
1475 meta: entry_meta,
1476 }))
1477 .collect();
1478
1479 let new_dir_cid = self.put_directory(new_entries).await?;
1480 self.rebuild_path(root, path, new_dir_cid).await
1481 }
1482
1483 pub async fn move_entry(
1486 &self,
1487 root: &Cid,
1488 source_path: &[&str],
1489 name: &str,
1490 target_path: &[&str],
1491 ) -> Result<Cid, HashTreeError> {
1492 let source_dir_cid = self.resolve_path_array(root, source_path).await?;
1493 let source_dir_cid = source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
1494
1495 let source_entries = self.list_directory(&source_dir_cid).await?;
1496 let entry = source_entries
1497 .iter()
1498 .find(|e| e.name == name)
1499 .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
1500
1501 let entry_cid = Cid {
1502 hash: entry.hash,
1503 key: entry.key,
1504 size: entry.size,
1505 };
1506 let entry_link_type = entry.link_type;
1507
1508 let new_root = self.remove_entry(root, source_path, name).await?;
1510
1511 self.set_entry(&new_root, target_path, name, &entry_cid, entry_link_type).await
1513 }
1514
1515 async fn resolve_path_array(&self, root: &Cid, path: &[&str]) -> Result<Option<Cid>, HashTreeError> {
1516 if path.is_empty() {
1517 return Ok(Some(root.clone()));
1518 }
1519 self.resolve_path(root, &path.join("/")).await
1520 }
1521
1522 async fn rebuild_path(
1523 &self,
1524 root: &Cid,
1525 path: &[&str],
1526 new_child: Cid,
1527 ) -> Result<Cid, HashTreeError> {
1528 if path.is_empty() {
1529 return Ok(new_child);
1530 }
1531
1532 let mut child_cid = new_child;
1533 let parts: Vec<&str> = path.to_vec();
1534
1535 for i in (0..parts.len()).rev() {
1536 let child_name = parts[i];
1537 let parent_path = &parts[..i];
1538
1539 let parent_cid = if parent_path.is_empty() {
1540 root.clone()
1541 } else {
1542 self.resolve_path_array(root, parent_path)
1543 .await?
1544 .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
1545 };
1546
1547 let parent_entries = self.list_directory(&parent_cid).await?;
1548 let new_parent_entries: Vec<DirEntry> = parent_entries
1549 .into_iter()
1550 .map(|e| {
1551 if e.name == child_name {
1552 DirEntry {
1553 name: e.name,
1554 hash: child_cid.hash,
1555 size: child_cid.size,
1556 key: child_cid.key,
1557 link_type: e.link_type,
1558 meta: e.meta,
1559 }
1560 } else {
1561 DirEntry {
1562 name: e.name,
1563 hash: e.hash,
1564 size: e.size,
1565 key: e.key,
1566 link_type: e.link_type,
1567 meta: e.meta,
1568 }
1569 }
1570 })
1571 .collect();
1572
1573 child_cid = self.put_directory(new_parent_entries).await?;
1574 }
1575
1576 Ok(child_cid)
1577 }
1578
1579 pub fn get_store(&self) -> Arc<S> {
1583 self.store.clone()
1584 }
1585
1586 pub fn chunk_size(&self) -> usize {
1588 self.chunk_size
1589 }
1590
1591 pub fn max_links(&self) -> usize {
1593 self.max_links
1594 }
1595}
1596
1597enum StreamStackItem {
1600 Hash(Hash),
1601}
1602
1603enum ReadStreamState<'a, S: Store> {
1604 Init { hash: Hash, tree: &'a HashTree<S> },
1605 Processing { stack: Vec<StreamStackItem>, tree: &'a HashTree<S> },
1606 Done,
1607}
1608
1609struct WalkStackItem {
1610 hash: Hash,
1611 path: String,
1612 key: Option<[u8; 32]>,
1613}
1614
1615enum WalkStreamState<'a, S: Store> {
1616 Init { cid: Cid, path: String, tree: &'a HashTree<S> },
1617 Processing { stack: Vec<WalkStackItem>, tree: &'a HashTree<S> },
1618 Done,
1619}
1620
1621struct EncryptedStackItem {
1623 hash: Hash,
1624 key: Option<[u8; 32]>,
1625}
1626
1627enum EncryptedStreamState<'a, S: Store> {
1628 Init { hash: Hash, key: [u8; 32], tree: &'a HashTree<S> },
1629 Processing { stack: Vec<EncryptedStackItem>, tree: &'a HashTree<S> },
1630 Done,
1631}
1632
1633pub async fn verify_tree<S: Store>(
1635 store: Arc<S>,
1636 root_hash: &Hash,
1637) -> Result<crate::reader::VerifyResult, HashTreeError> {
1638 let mut missing = Vec::new();
1639 let mut visited = std::collections::HashSet::new();
1640
1641 verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
1642
1643 Ok(crate::reader::VerifyResult {
1644 valid: missing.is_empty(),
1645 missing,
1646 })
1647}
1648
1649async fn verify_recursive<S: Store>(
1650 store: Arc<S>,
1651 hash: &Hash,
1652 missing: &mut Vec<Hash>,
1653 visited: &mut std::collections::HashSet<String>,
1654) -> Result<(), HashTreeError> {
1655 let hex = to_hex(hash);
1656 if visited.contains(&hex) {
1657 return Ok(());
1658 }
1659 visited.insert(hex);
1660
1661 let data = match store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1662 Some(d) => d,
1663 None => {
1664 missing.push(*hash);
1665 return Ok(());
1666 }
1667 };
1668
1669 if is_tree_node(&data) {
1670 let node = decode_tree_node(&data)?;
1671 for link in &node.links {
1672 Box::pin(verify_recursive(store.clone(), &link.hash, missing, visited)).await?;
1673 }
1674 }
1675
1676 Ok(())
1677}
1678
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;

    /// Builds a public (unencrypted) tree over a fresh in-memory store.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let config = HashTreeConfig::new(store.clone()).public();
        (store, HashTree::new(config))
    }

    /// Builds a tree with the default (encrypted) configuration.
    fn make_encrypted_tree() -> HashTree<MemoryStore> {
        HashTree::new(HashTreeConfig::new(Arc::new(MemoryStore::new())))
    }

    #[tokio::test]
    async fn test_put_and_read_blob() {
        let (_store, tree) = make_tree();

        let payload: Vec<u8> = vec![1, 2, 3, 4, 5];
        let blob_hash = tree.put_blob(&payload).await.unwrap();

        let fetched = tree.get_blob(&blob_hash).await.unwrap();
        assert_eq!(fetched, Some(payload));
    }

    #[tokio::test]
    async fn test_put_and_read_file_small() {
        let (_store, tree) = make_tree();

        let content = b"Hello, World!";
        let stored = tree.put_file(content).await.unwrap();
        assert_eq!(stored.size, content.len() as u64);

        let round_trip = tree.read_file(&stored.hash).await.unwrap();
        assert_eq!(round_trip, Some(content.to_vec()));
    }

    #[tokio::test]
    async fn test_put_and_read_directory() {
        let (_store, tree) = make_tree();

        let first = tree.put_blob(b"content1").await.unwrap();
        let second = tree.put_blob(b"content2").await.unwrap();

        let listing_input = vec![
            DirEntry::new("a.txt", first).with_size(8),
            DirEntry::new("b.txt", second).with_size(8),
        ];
        let dir_cid = tree.put_directory(listing_input).await.unwrap();

        let entries = tree.list_directory(&dir_cid).await.unwrap();
        assert_eq!(entries.len(), 2);
        assert!(entries.iter().any(|e| e.name == "a.txt"));
        assert!(entries.iter().any(|e| e.name == "b.txt"));
    }

    #[tokio::test]
    async fn test_is_directory() {
        let (_store, tree) = make_tree();

        let blob_hash = tree.put_blob(b"data").await.unwrap();
        let empty_dir = tree.put_directory(vec![]).await.unwrap();

        let blob_is_dir = tree.is_directory(&blob_hash).await.unwrap();
        let dir_is_dir = tree.is_directory(&empty_dir.hash).await.unwrap();
        assert!(!blob_is_dir);
        assert!(dir_is_dir);
    }

    #[tokio::test]
    async fn test_resolve_path() {
        let (_store, tree) = make_tree();

        let leaf_hash = tree.put_blob(b"nested").await.unwrap();
        let inner = tree
            .put_directory(vec![DirEntry::new("file.txt", leaf_hash).with_size(6)])
            .await
            .unwrap();
        let outer = tree
            .put_directory(vec![DirEntry::new("subdir", inner.hash)])
            .await
            .unwrap();

        let found = tree.resolve_path(&outer, "subdir/file.txt").await.unwrap();
        assert_eq!(found.map(|c| c.hash), Some(leaf_hash));
    }

    #[tokio::test]
    async fn test_unified_put_get_public() {
        let (_store, tree) = make_tree();

        let data = b"Hello, public world!";
        let cid = tree.put(data).await.unwrap();

        assert_eq!(cid.size, data.len() as u64);
        // Public content carries no key.
        assert!(cid.key.is_none());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_unified_put_get_encrypted() {
        let tree = make_encrypted_tree();

        let data = b"Hello, encrypted world!";
        let cid = tree.put(data).await.unwrap();

        assert_eq!(cid.size, data.len() as u64);
        // Encrypted content carries a key.
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_unified_put_get_encrypted_chunked() {
        let store = Arc::new(MemoryStore::new());
        let config = HashTreeConfig::new(store).with_chunk_size(100);
        let tree = HashTree::new(config);

        // 500 bytes with a 100-byte chunk size forces multiple chunks.
        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
        let cid = tree.put(&data).await.unwrap();

        assert_eq!(cid.size, data.len() as u64);
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_cid_deterministic() {
        let tree = make_encrypted_tree();

        let data = b"Same content produces same CID";

        let first = tree.put(data).await.unwrap();
        let second = tree.put(data).await.unwrap();

        assert_eq!(first.hash, second.hash);
        assert_eq!(first.key, second.key);
        assert_eq!(first.to_string(), second.to_string());
    }

    #[tokio::test]
    async fn test_cid_to_string_public() {
        let (_store, tree) = make_tree();

        let rendered = tree.put(b"test").await.unwrap().to_string();

        assert_eq!(rendered.len(), 64);
        assert!(!rendered.contains(':'));
    }

    #[tokio::test]
    async fn test_cid_to_string_encrypted() {
        let tree = make_encrypted_tree();

        let rendered = tree.put(b"test").await.unwrap().to_string();

        assert_eq!(rendered.len(), 129);
        assert!(rendered.contains(':'));
    }
}