1use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::stream::{self, Stream};
10use futures::io::AsyncRead;
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node};
15use crate::hash::sha256;
16use crate::reader::{ReaderError, TreeEntry, WalkEntry};
17use crate::store::Store;
18use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
19
20use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
21
22#[derive(Clone)]
24pub struct HashTreeConfig<S: Store> {
25 pub store: Arc<S>,
26 pub chunk_size: usize,
27 pub max_links: usize,
28 pub encrypted: bool,
30}
31
32impl<S: Store> HashTreeConfig<S> {
33 pub fn new(store: Arc<S>) -> Self {
34 Self {
35 store,
36 chunk_size: DEFAULT_CHUNK_SIZE,
37 max_links: DEFAULT_MAX_LINKS,
38 encrypted: true,
39 }
40 }
41
42 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
43 self.chunk_size = chunk_size;
44 self
45 }
46
47 pub fn with_max_links(mut self, max_links: usize) -> Self {
48 self.max_links = max_links;
49 self
50 }
51
52 pub fn public(mut self) -> Self {
54 self.encrypted = false;
55 self
56 }
57}
58
59#[derive(Debug, Clone)]
61pub struct PutFileResult {
62 pub hash: Hash,
63 pub size: u64,
64}
65
66#[derive(Debug, thiserror::Error)]
68pub enum HashTreeError {
69 #[error("Store error: {0}")]
70 Store(String),
71 #[error("Codec error: {0}")]
72 Codec(#[from] crate::codec::CodecError),
73 #[error("Missing chunk: {0}")]
74 MissingChunk(String),
75 #[error("Path not found: {0}")]
76 PathNotFound(String),
77 #[error("Entry not found: {0}")]
78 EntryNotFound(String),
79 #[error("Encryption error: {0}")]
80 Encryption(String),
81 #[error("Decryption error: {0}")]
82 Decryption(String),
83}
84
85impl From<BuilderError> for HashTreeError {
86 fn from(e: BuilderError) -> Self {
87 match e {
88 BuilderError::Store(s) => HashTreeError::Store(s),
89 BuilderError::Codec(c) => HashTreeError::Codec(c),
90 BuilderError::Encryption(s) => HashTreeError::Encryption(s),
91 }
92 }
93}
94
95impl From<ReaderError> for HashTreeError {
96 fn from(e: ReaderError) -> Self {
97 match e {
98 ReaderError::Store(s) => HashTreeError::Store(s),
99 ReaderError::Codec(c) => HashTreeError::Codec(c),
100 ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
101 ReaderError::Decryption(s) => HashTreeError::Encryption(s),
102 ReaderError::MissingKey => HashTreeError::Encryption("missing decryption key".to_string()),
103 }
104 }
105}
106
107pub struct HashTree<S: Store> {
109 store: Arc<S>,
110 chunk_size: usize,
111 max_links: usize,
112 encrypted: bool,
113}
114
115impl<S: Store> HashTree<S> {
116 pub fn new(config: HashTreeConfig<S>) -> Self {
117 Self {
118 store: config.store,
119 chunk_size: config.chunk_size,
120 max_links: config.max_links,
121 encrypted: config.encrypted,
122 }
123 }
124
125 pub fn is_encrypted(&self) -> bool {
127 self.encrypted
128 }
129
130 pub async fn put(&self, data: &[u8]) -> Result<Cid, HashTreeError> {
135 let size = data.len() as u64;
136
137 if data.len() <= self.chunk_size {
139 let (hash, key) = self.put_chunk_internal(data).await?;
140 return Ok(Cid { hash, key, size });
141 }
142
143 let mut links: Vec<Link> = Vec::new();
145 let mut offset = 0;
146
147 while offset < data.len() {
148 let end = (offset + self.chunk_size).min(data.len());
149 let chunk = &data[offset..end];
150 let chunk_size = chunk.len() as u64;
151 let (hash, key) = self.put_chunk_internal(chunk).await?;
152 links.push(Link {
153 hash,
154 name: None,
155 size: chunk_size,
156 key,
157 link_type: LinkType::Blob, meta: None,
159 });
160 offset = end;
161 }
162
163 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
165 Ok(Cid { hash: root_hash, key: root_key, size })
166 }
167
168 pub async fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
170 if let Some(key) = cid.key {
171 self.get_encrypted(&cid.hash, &key).await
172 } else {
173 self.read_file(&cid.hash).await
174 }
175 }
176
177 pub async fn put_stream<R: AsyncRead + Unpin>(&self, mut reader: R) -> Result<Cid, HashTreeError> {
182 let mut buffer = vec![0u8; self.chunk_size];
183 let mut links = Vec::new();
184 let mut total_size: u64 = 0;
185 let mut consistent_key: Option<[u8; 32]> = None;
186
187 loop {
188 let mut chunk = Vec::new();
189 let mut bytes_read = 0;
190
191 while bytes_read < self.chunk_size {
193 let n = reader.read(&mut buffer[..self.chunk_size - bytes_read]).await
194 .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
195 if n == 0 {
196 break; }
198 chunk.extend_from_slice(&buffer[..n]);
199 bytes_read += n;
200 }
201
202 if chunk.is_empty() {
203 break; }
205
206 let chunk_len = chunk.len() as u64;
207 total_size += chunk_len;
208
209 let (hash, key) = self.put_chunk_internal(&chunk).await?;
210
211 if links.is_empty() {
213 consistent_key = key;
214 } else if consistent_key != key {
215 consistent_key = None;
216 }
217
218 links.push(Link {
219 hash,
220 name: None,
221 size: chunk_len,
222 key,
223 link_type: LinkType::Blob, meta: None,
225 });
226 }
227
228 if links.is_empty() {
229 let (hash, key) = self.put_chunk_internal(&[]).await?;
231 return Ok(Cid { hash, key, size: 0 });
232 }
233
234 let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
236 Ok(Cid { hash: root_hash, key: root_key, size: total_size })
237 }
238
239 pub fn get_stream(
244 &self,
245 cid: &Cid,
246 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
247 let hash = cid.hash;
248 let key = cid.key;
249
250 if let Some(k) = key {
251 Box::pin(self.read_file_stream_encrypted(hash, k))
253 } else {
254 self.read_file_stream(hash)
256 }
257 }
258
259 fn read_file_stream_encrypted(
261 &self,
262 hash: Hash,
263 key: EncryptionKey,
264 ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
265 stream::unfold(
266 EncryptedStreamState::Init { hash, key, tree: self },
267 |state| async move {
268 match state {
269 EncryptedStreamState::Init { hash, key, tree } => {
270 let data = match tree.store.get(&hash).await {
271 Ok(Some(d)) => d,
272 Ok(None) => return None,
273 Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), EncryptedStreamState::Done)),
274 };
275
276 let decrypted = match decrypt_chk(&data, &key) {
278 Ok(d) => d,
279 Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), EncryptedStreamState::Done)),
280 };
281
282 if !is_tree_node(&decrypted) {
283 return Some((Ok(decrypted), EncryptedStreamState::Done));
285 }
286
287 let node = match decode_tree_node(&decrypted) {
289 Ok(n) => n,
290 Err(e) => return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done)),
291 };
292
293 let mut stack: Vec<EncryptedStackItem> = Vec::new();
294 for link in node.links.into_iter().rev() {
295 stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
296 }
297
298 tree.process_encrypted_stream_stack(&mut stack).await
299 }
300 EncryptedStreamState::Processing { mut stack, tree } => {
301 tree.process_encrypted_stream_stack(&mut stack).await
302 }
303 EncryptedStreamState::Done => None,
304 }
305 },
306 )
307 }
308
309 async fn process_encrypted_stream_stack<'a>(
310 &'a self,
311 stack: &mut Vec<EncryptedStackItem>,
312 ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
313 while let Some(item) = stack.pop() {
314 let data = match self.store.get(&item.hash).await {
315 Ok(Some(d)) => d,
316 Ok(None) => {
317 return Some((
318 Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
319 EncryptedStreamState::Done,
320 ))
321 }
322 Err(e) => {
323 return Some((
324 Err(HashTreeError::Store(e.to_string())),
325 EncryptedStreamState::Done,
326 ))
327 }
328 };
329
330 let decrypted = if let Some(key) = item.key {
332 match decrypt_chk(&data, &key) {
333 Ok(d) => d,
334 Err(e) => {
335 return Some((
336 Err(HashTreeError::Decryption(e.to_string())),
337 EncryptedStreamState::Done,
338 ))
339 }
340 }
341 } else {
342 data
343 };
344
345 if is_tree_node(&decrypted) {
346 let node = match decode_tree_node(&decrypted) {
348 Ok(n) => n,
349 Err(e) => {
350 return Some((
351 Err(HashTreeError::Codec(e)),
352 EncryptedStreamState::Done,
353 ))
354 }
355 };
356 for link in node.links.into_iter().rev() {
357 stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
358 }
359 } else {
360 return Some((
362 Ok(decrypted),
363 EncryptedStreamState::Processing { stack: std::mem::take(stack), tree: self },
364 ));
365 }
366 }
367 None
368 }
369
370 async fn put_chunk_internal(&self, data: &[u8]) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
372 if self.encrypted {
373 let (encrypted, key) = encrypt_chk(data)
374 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
375 let hash = sha256(&encrypted);
376 self.store
377 .put(hash, encrypted)
378 .await
379 .map_err(|e| HashTreeError::Store(e.to_string()))?;
380 Ok((hash, Some(key)))
381 } else {
382 let hash = self.put_blob(data).await?;
383 Ok((hash, None))
384 }
385 }
386
387 async fn build_tree_internal(
389 &self,
390 links: Vec<Link>,
391 total_size: Option<u64>,
392 ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
393 if links.len() == 1 {
395 if let Some(ts) = total_size {
396 if links[0].size == ts {
397 return Ok((links[0].hash, links[0].key));
398 }
399 }
400 }
401
402 if links.len() <= self.max_links {
403 let node = TreeNode {
404 node_type: LinkType::File,
405 links,
406 };
407 let (data, _) = encode_and_hash(&node)?;
408
409 if self.encrypted {
410 let (encrypted, key) = encrypt_chk(&data)
411 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
412 let hash = sha256(&encrypted);
413 self.store
414 .put(hash, encrypted)
415 .await
416 .map_err(|e| HashTreeError::Store(e.to_string()))?;
417 return Ok((hash, Some(key)));
418 }
419
420 let hash = sha256(&data);
422 self.store
423 .put(hash, data)
424 .await
425 .map_err(|e| HashTreeError::Store(e.to_string()))?;
426 return Ok((hash, None));
427 }
428
429 let mut sub_links = Vec::new();
431 for batch in links.chunks(self.max_links) {
432 let batch_size: u64 = batch.iter().map(|l| l.size).sum();
433 let (hash, key) = Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
434 sub_links.push(Link {
435 hash,
436 name: None,
437 size: batch_size,
438 key,
439 link_type: LinkType::File, meta: None,
441 });
442 }
443
444 Box::pin(self.build_tree_internal(sub_links, total_size)).await
445 }
446
447 async fn get_encrypted(
449 &self,
450 hash: &Hash,
451 key: &EncryptionKey,
452 ) -> Result<Option<Vec<u8>>, HashTreeError> {
453 let encrypted_data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
454 Some(d) => d,
455 None => return Ok(None),
456 };
457
458 let decrypted = decrypt_chk(&encrypted_data, key)
460 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
461
462 if is_tree_node(&decrypted) {
464 let node = decode_tree_node(&decrypted)?;
465 let assembled = self.assemble_encrypted_chunks(&node).await?;
466 return Ok(Some(assembled));
467 }
468
469 Ok(Some(decrypted))
471 }
472
473 async fn assemble_encrypted_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
475 let mut parts: Vec<Vec<u8>> = Vec::new();
476
477 for link in &node.links {
478 let chunk_key = link.key.ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
479
480 let encrypted_child = self
481 .store
482 .get(&link.hash)
483 .await
484 .map_err(|e| HashTreeError::Store(e.to_string()))?
485 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
486
487 let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
488 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
489
490 if is_tree_node(&decrypted) {
491 let child_node = decode_tree_node(&decrypted)?;
493 let child_data = Box::pin(self.assemble_encrypted_chunks(&child_node)).await?;
494 parts.push(child_data);
495 } else {
496 parts.push(decrypted);
498 }
499 }
500
501 let total_len: usize = parts.iter().map(|p| p.len()).sum();
502 let mut result = Vec::with_capacity(total_len);
503 for part in parts {
504 result.extend_from_slice(&part);
505 }
506
507 Ok(result)
508 }
509
510 pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
515 let hash = sha256(data);
516 self.store
517 .put(hash, data.to_vec())
518 .await
519 .map_err(|e| HashTreeError::Store(e.to_string()))?;
520 Ok(hash)
521 }
522
523 pub async fn put_file(&self, data: &[u8]) -> Result<PutFileResult, HashTreeError> {
526 let size = data.len() as u64;
527
528 if data.len() <= self.chunk_size {
530 let hash = self.put_blob(data).await?;
531 return Ok(PutFileResult { hash, size });
532 }
533
534 let mut chunk_hashes: Vec<Hash> = Vec::new();
536 let mut offset = 0;
537
538 while offset < data.len() {
539 let end = (offset + self.chunk_size).min(data.len());
540 let chunk = &data[offset..end];
541 let hash = self.put_blob(chunk).await?;
542 chunk_hashes.push(hash);
543 offset = end;
544 }
545
546 let chunks: Vec<Link> = chunk_hashes
548 .iter()
549 .enumerate()
550 .map(|(i, &hash)| {
551 let chunk_size = if i < chunk_hashes.len() - 1 {
552 self.chunk_size as u64
553 } else {
554 (data.len() - i * self.chunk_size) as u64
555 };
556 Link {
557 hash,
558 name: None,
559 size: chunk_size,
560 key: None,
561 link_type: LinkType::Blob, meta: None,
563 }
564 })
565 .collect();
566
567 let root_hash = self.build_tree(chunks, Some(size)).await?;
568 Ok(PutFileResult { hash: root_hash, size })
569 }
570
571 pub async fn put_directory(
574 &self,
575 entries: Vec<DirEntry>,
576 ) -> Result<Cid, HashTreeError> {
577 let mut sorted = entries;
579 sorted.sort_by(|a, b| a.name.cmp(&b.name));
580
581 let links: Vec<Link> = sorted
582 .into_iter()
583 .map(|e| Link {
584 hash: e.hash,
585 name: Some(e.name),
586 size: e.size,
587 key: e.key,
588 link_type: e.link_type,
589 meta: e.meta,
590 })
591 .collect();
592
593 let total_size: u64 = links.iter().map(|l| l.size).sum();
594
595 if links.len() <= self.max_links {
597 let node = TreeNode {
598 node_type: LinkType::Dir,
599 links,
600 };
601 let (data, _plain_hash) = encode_and_hash(&node)?;
602
603 let (hash, key) = self.put_chunk_internal(&data).await?;
605
606 return Ok(Cid { hash, key, size: total_size });
607 }
608
609 self.build_directory_by_chunks(links, total_size).await
611 }
612
613 async fn build_tree(&self, links: Vec<Link>, total_size: Option<u64>) -> Result<Hash, HashTreeError> {
615 if links.len() == 1 {
617 if let Some(ts) = total_size {
618 if links[0].size == ts {
619 return Ok(links[0].hash);
620 }
621 }
622 }
623
624 if links.len() <= self.max_links {
626 let node = TreeNode {
627 node_type: LinkType::File,
628 links,
629 };
630 let (data, hash) = encode_and_hash(&node)?;
631 self.store
632 .put(hash, data)
633 .await
634 .map_err(|e| HashTreeError::Store(e.to_string()))?;
635 return Ok(hash);
636 }
637
638 let mut sub_trees: Vec<Link> = Vec::new();
640
641 for batch in links.chunks(self.max_links) {
642 let batch_size: u64 = batch.iter().map(|l| l.size).sum();
643
644 let node = TreeNode {
645 node_type: LinkType::File,
646 links: batch.to_vec(),
647 };
648 let (data, hash) = encode_and_hash(&node)?;
649 self.store
650 .put(hash, data)
651 .await
652 .map_err(|e| HashTreeError::Store(e.to_string()))?;
653
654 sub_trees.push(Link {
655 hash,
656 name: None,
657 size: batch_size,
658 key: None,
659 link_type: LinkType::File, meta: None,
661 });
662 }
663
664 Box::pin(self.build_tree(sub_trees, total_size)).await
666 }
667
668 async fn build_directory_by_chunks(
671 &self,
672 links: Vec<Link>,
673 total_size: u64,
674 ) -> Result<Cid, HashTreeError> {
675 let mut sub_trees: Vec<Link> = Vec::new();
676
677 for (i, batch) in links.chunks(self.max_links).enumerate() {
678 let batch_size: u64 = batch.iter().map(|l| l.size).sum();
679
680 let node = TreeNode {
681 node_type: LinkType::Dir,
682 links: batch.to_vec(),
683 };
684 let (data, _plain_hash) = encode_and_hash(&node)?;
685
686 let (hash, key) = self.put_chunk_internal(&data).await?;
688
689 sub_trees.push(Link {
690 hash,
691 name: Some(format!("_chunk_{}", i * self.max_links)),
692 size: batch_size,
693 key,
694 link_type: LinkType::Dir, meta: None,
696 });
697 }
698
699 if sub_trees.len() <= self.max_links {
700 let node = TreeNode {
701 node_type: LinkType::Dir,
702 links: sub_trees,
703 };
704 let (data, _plain_hash) = encode_and_hash(&node)?;
705
706 let (hash, key) = self.put_chunk_internal(&data).await?;
708 return Ok(Cid { hash, key, size: total_size });
709 }
710
711 Box::pin(self.build_directory_by_chunks(sub_trees, total_size)).await
713 }
714
715 pub async fn put_tree_node(
717 &self,
718 links: Vec<Link>,
719 ) -> Result<Hash, HashTreeError> {
720 let node = TreeNode {
721 node_type: LinkType::Dir,
722 links,
723 };
724
725 let (data, hash) = encode_and_hash(&node)?;
726 self.store
727 .put(hash, data)
728 .await
729 .map_err(|e| HashTreeError::Store(e.to_string()))?;
730 Ok(hash)
731 }
732
733 pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
737 self.store
738 .get(hash)
739 .await
740 .map_err(|e| HashTreeError::Store(e.to_string()))
741 }
742
743 pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
745 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
746 Some(d) => d,
747 None => return Ok(None),
748 };
749
750 if !is_tree_node(&data) {
751 return Ok(None);
752 }
753
754 let node = decode_tree_node(&data)?;
755 Ok(Some(node))
756 }
757
758 pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
760 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
761 Some(d) => d,
762 None => return Ok(None),
763 };
764
765 let decrypted = if let Some(key) = &cid.key {
767 decrypt_chk(&data, key)
768 .map_err(|e| HashTreeError::Decryption(e.to_string()))?
769 } else {
770 data
771 };
772
773 if !is_tree_node(&decrypted) {
774 return Ok(None);
775 }
776
777 let node = decode_tree_node(&decrypted)?;
778 Ok(Some(node))
779 }
780
781 pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
783 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
784 Some(d) => d,
785 None => return Ok(false),
786 };
787 Ok(is_tree_node(&data))
788 }
789
790 pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
792 let node = match self.get_node(cid).await? {
793 Some(n) => n,
794 None => return Ok(false),
795 };
796 Ok(node.links.iter().any(|l| l.name.as_ref().map(|n| !n.starts_with('_')).unwrap_or(false)))
798 }
799
800 pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
802 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
803 Some(d) => d,
804 None => return Ok(false),
805 };
806 Ok(is_directory_node(&data))
807 }
808
809 pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
811 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
812 Some(d) => d,
813 None => return Ok(None),
814 };
815
816 if !is_tree_node(&data) {
818 return Ok(Some(data));
819 }
820
821 let node = decode_tree_node(&data)?;
823 let assembled = self.assemble_chunks(&node).await?;
824 Ok(Some(assembled))
825 }
826
827 async fn assemble_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
829 let mut parts: Vec<Vec<u8>> = Vec::new();
830
831 for link in &node.links {
832 let child_data = self
833 .store
834 .get(&link.hash)
835 .await
836 .map_err(|e| HashTreeError::Store(e.to_string()))?
837 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
838
839 if is_tree_node(&child_data) {
840 let child_node = decode_tree_node(&child_data)?;
841 parts.push(Box::pin(self.assemble_chunks(&child_node)).await?);
842 } else {
843 parts.push(child_data);
844 }
845 }
846
847 let total_length: usize = parts.iter().map(|p| p.len()).sum();
849 let mut result = Vec::with_capacity(total_length);
850 for part in parts {
851 result.extend_from_slice(&part);
852 }
853
854 Ok(result)
855 }
856
857 pub fn read_file_stream(
860 &self,
861 hash: Hash,
862 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
863 Box::pin(stream::unfold(
864 ReadStreamState::Init { hash, tree: self },
865 |state| async move {
866 match state {
867 ReadStreamState::Init { hash, tree } => {
868 let data = match tree.store.get(&hash).await {
869 Ok(Some(d)) => d,
870 Ok(None) => return None,
871 Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), ReadStreamState::Done)),
872 };
873
874 if !is_tree_node(&data) {
875 return Some((Ok(data), ReadStreamState::Done));
877 }
878
879 let node = match decode_tree_node(&data) {
881 Ok(n) => n,
882 Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
883 };
884
885 let mut stack: Vec<StreamStackItem> = Vec::new();
887 for link in node.links.into_iter().rev() {
888 stack.push(StreamStackItem::Hash(link.hash));
889 }
890
891 tree.process_stream_stack(&mut stack).await
893 }
894 ReadStreamState::Processing { mut stack, tree } => {
895 tree.process_stream_stack(&mut stack).await
896 }
897 ReadStreamState::Done => None,
898 }
899 },
900 ))
901 }
902
903 async fn process_stream_stack<'a>(
904 &'a self,
905 stack: &mut Vec<StreamStackItem>,
906 ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
907 while let Some(item) = stack.pop() {
908 match item {
909 StreamStackItem::Hash(hash) => {
910 let data = match self.store.get(&hash).await {
911 Ok(Some(d)) => d,
912 Ok(None) => {
913 return Some((
914 Err(HashTreeError::MissingChunk(to_hex(&hash))),
915 ReadStreamState::Done,
916 ))
917 }
918 Err(e) => {
919 return Some((
920 Err(HashTreeError::Store(e.to_string())),
921 ReadStreamState::Done,
922 ))
923 }
924 };
925
926 if is_tree_node(&data) {
927 let node = match decode_tree_node(&data) {
929 Ok(n) => n,
930 Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
931 };
932 for link in node.links.into_iter().rev() {
933 stack.push(StreamStackItem::Hash(link.hash));
934 }
935 } else {
936 return Some((Ok(data), ReadStreamState::Processing { stack: std::mem::take(stack), tree: self }));
938 }
939 }
940 }
941 }
942 None
943 }
944
945 pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
947 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
948 Some(d) => d,
949 None => return Ok(vec![]),
950 };
951
952 if !is_tree_node(&data) {
953 return Ok(vec![data]);
954 }
955
956 let node = decode_tree_node(&data)?;
957 self.collect_chunks(&node).await
958 }
959
960 async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
961 let mut chunks = Vec::new();
962
963 for link in &node.links {
964 let child_data = self
965 .store
966 .get(&link.hash)
967 .await
968 .map_err(|e| HashTreeError::Store(e.to_string()))?
969 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
970
971 if is_tree_node(&child_data) {
972 let child_node = decode_tree_node(&child_data)?;
973 chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
974 } else {
975 chunks.push(child_data);
976 }
977 }
978
979 Ok(chunks)
980 }
981
982 pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
984 let node = match self.get_node(cid).await? {
985 Some(n) => n,
986 None => return Ok(vec![]),
987 };
988
989 let mut entries = Vec::new();
990
991 for link in &node.links {
992 if let Some(ref name) = link.name {
994 if name.starts_with("_chunk_") || name.starts_with('_') {
995 let chunk_cid = Cid { hash: link.hash, key: link.key, size: link.size };
996 let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
997 entries.extend(sub_entries);
998 continue;
999 }
1000 }
1001
1002 entries.push(TreeEntry {
1003 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1004 hash: link.hash,
1005 size: link.size,
1006 link_type: link.link_type,
1007 key: link.key,
1008 meta: link.meta.clone(),
1009 });
1010 }
1011
1012 Ok(entries)
1013 }
1014
1015 pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1017 let node = match self.get_node(cid).await? {
1018 Some(n) => n,
1019 None => return Ok(vec![]),
1020 };
1021
1022 let mut entries = Vec::new();
1023
1024 for link in &node.links {
1025 if let Some(ref name) = link.name {
1027 if name.starts_with("_chunk_") || name.starts_with('_') {
1028 let sub_cid = Cid { hash: link.hash, key: cid.key, size: 0 };
1030 let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
1031 entries.extend(sub_entries);
1032 continue;
1033 }
1034 }
1035
1036 entries.push(TreeEntry {
1037 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1038 hash: link.hash,
1039 size: link.size,
1040 link_type: link.link_type,
1041 key: link.key,
1042 meta: link.meta.clone(),
1043 });
1044 }
1045
1046 Ok(entries)
1047 }
1048
1049 pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1051 let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
1052 if parts.is_empty() {
1053 return Ok(Some(cid.clone()));
1054 }
1055
1056 let mut current_cid = cid.clone();
1057
1058 for part in parts {
1059 let node = match self.get_node(¤t_cid).await? {
1061 Some(n) => n,
1062 None => return Ok(None),
1063 };
1064
1065 if let Some(link) = self.find_link(&node, part) {
1066 current_cid = Cid {
1067 hash: link.hash,
1068 key: link.key,
1069 size: link.size,
1070 };
1071 } else {
1072 match self.find_link_in_subtrees_cid(&node, part, ¤t_cid).await? {
1074 Some(link) => {
1075 current_cid = Cid {
1076 hash: link.hash,
1077 key: link.key,
1078 size: link.size,
1079 };
1080 }
1081 None => return Ok(None),
1082 }
1083 }
1084 }
1085
1086 Ok(Some(current_cid))
1087 }
1088
1089 pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1091 self.resolve(cid, path).await
1092 }
1093
1094 fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1095 node.links
1096 .iter()
1097 .find(|l| l.name.as_deref() == Some(name))
1098 .cloned()
1099 }
1100
1101 async fn find_link_in_subtrees(&self, node: &TreeNode, name: &str) -> Result<Option<Link>, HashTreeError> {
1102 for link in &node.links {
1103 if !link.name.as_ref().map(|n| n.starts_with('_')).unwrap_or(false) {
1104 continue;
1105 }
1106
1107 let sub_node = match self.get_tree_node(&link.hash).await? {
1108 Some(n) => n,
1109 None => continue,
1110 };
1111
1112 if let Some(found) = self.find_link(&sub_node, name) {
1113 return Ok(Some(found));
1114 }
1115
1116 if let Some(deep_found) = Box::pin(self.find_link_in_subtrees(&sub_node, name)).await? {
1117 return Ok(Some(deep_found));
1118 }
1119 }
1120
1121 Ok(None)
1122 }
1123
1124 async fn find_link_in_subtrees_cid(&self, node: &TreeNode, name: &str, _parent_cid: &Cid) -> Result<Option<Link>, HashTreeError> {
1126 for link in &node.links {
1127 if !link.name.as_ref().map(|n| n.starts_with('_')).unwrap_or(false) {
1128 continue;
1129 }
1130
1131 let sub_cid = Cid {
1133 hash: link.hash,
1134 key: link.key.clone(),
1135 size: link.size,
1136 };
1137
1138 let sub_node = match self.get_node(&sub_cid).await? {
1139 Some(n) => n,
1140 None => continue,
1141 };
1142
1143 if let Some(found) = self.find_link(&sub_node, name) {
1144 return Ok(Some(found));
1145 }
1146
1147 if let Some(deep_found) = Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await? {
1148 return Ok(Some(deep_found));
1149 }
1150 }
1151
1152 Ok(None)
1153 }
1154
1155 pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1157 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1158 Some(d) => d,
1159 None => return Ok(0),
1160 };
1161
1162 if !is_tree_node(&data) {
1163 return Ok(data.len() as u64);
1164 }
1165
1166 let node = decode_tree_node(&data)?;
1167 let mut total = 0u64;
1169 for link in &node.links {
1170 total += link.size;
1171 }
1172 Ok(total)
1173 }
1174
1175 pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
1177 let mut entries = Vec::new();
1178 self.walk_recursive(cid, path, &mut entries).await?;
1179 Ok(entries)
1180 }
1181
1182 async fn walk_recursive(
1183 &self,
1184 cid: &Cid,
1185 path: &str,
1186 entries: &mut Vec<WalkEntry>,
1187 ) -> Result<(), HashTreeError> {
1188 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1189 Some(d) => d,
1190 None => return Ok(()),
1191 };
1192
1193 let data = if let Some(key) = &cid.key {
1195 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1196 } else {
1197 data
1198 };
1199
1200 let node = match try_decode_tree_node(&data) {
1201 Some(n) => n,
1202 None => {
1203 entries.push(WalkEntry {
1204 path: path.to_string(),
1205 hash: cid.hash,
1206 link_type: LinkType::Blob,
1207 size: data.len() as u64,
1208 key: cid.key,
1209 });
1210 return Ok(());
1211 }
1212 };
1213
1214 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1215 entries.push(WalkEntry {
1216 path: path.to_string(),
1217 hash: cid.hash,
1218 link_type: node.node_type,
1219 size: node_size,
1220 key: cid.key,
1221 });
1222
1223 for link in &node.links {
1224 let child_path = match &link.name {
1225 Some(name) => {
1226 if name.starts_with("_chunk_") || name.starts_with('_') {
1227 let sub_cid = Cid { hash: link.hash, key: cid.key, size: 0 };
1229 Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
1230 continue;
1231 }
1232 if path.is_empty() {
1233 name.clone()
1234 } else {
1235 format!("{}/{}", path, name)
1236 }
1237 }
1238 None => path.to_string(),
1239 };
1240
1241 let child_cid = Cid { hash: link.hash, key: link.key, size: link.size };
1243 Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
1244 }
1245
1246 Ok(())
1247 }
1248
1249 pub fn walk_stream(
1251 &self,
1252 cid: Cid,
1253 initial_path: String,
1254 ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
1255 Box::pin(stream::unfold(
1256 WalkStreamState::Init { cid, path: initial_path, tree: self },
1257 |state| async move {
1258 match state {
1259 WalkStreamState::Init { cid, path, tree } => {
1260 let data = match tree.store.get(&cid.hash).await {
1261 Ok(Some(d)) => d,
1262 Ok(None) => return None,
1263 Err(e) => {
1264 return Some((
1265 Err(HashTreeError::Store(e.to_string())),
1266 WalkStreamState::Done,
1267 ))
1268 }
1269 };
1270
1271 let data = if let Some(key) = &cid.key {
1273 match decrypt_chk(&data, key) {
1274 Ok(d) => d,
1275 Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), WalkStreamState::Done)),
1276 }
1277 } else {
1278 data
1279 };
1280
1281 let node = match try_decode_tree_node(&data) {
1282 Some(n) => n,
1283 None => {
1284 let entry = WalkEntry {
1286 path,
1287 hash: cid.hash,
1288 link_type: LinkType::Blob,
1289 size: data.len() as u64,
1290 key: cid.key,
1291 };
1292 return Some((Ok(entry), WalkStreamState::Done));
1293 }
1294 };
1295
1296 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1297 let entry = WalkEntry {
1298 path: path.clone(),
1299 hash: cid.hash,
1300 link_type: node.node_type,
1301 size: node_size,
1302 key: cid.key,
1303 };
1304
1305 let mut stack: Vec<WalkStackItem> = Vec::new();
1307 for link in node.links.into_iter().rev() {
1308 let child_path = match &link.name {
1309 Some(name) if !name.starts_with('_') => {
1310 if path.is_empty() {
1311 name.clone()
1312 } else {
1313 format!("{}/{}", path, name)
1314 }
1315 }
1316 _ => path.clone(),
1317 };
1318 stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
1320 }
1321
1322 Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
1323 }
1324 WalkStreamState::Processing { mut stack, tree } => {
1325 tree.process_walk_stack(&mut stack).await
1326 }
1327 WalkStreamState::Done => None,
1328 }
1329 },
1330 ))
1331 }
1332
1333 async fn process_walk_stack<'a>(
1334 &'a self,
1335 stack: &mut Vec<WalkStackItem>,
1336 ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
1337 while let Some(item) = stack.pop() {
1338 let data = match self.store.get(&item.hash).await {
1339 Ok(Some(d)) => d,
1340 Ok(None) => continue,
1341 Err(e) => {
1342 return Some((
1343 Err(HashTreeError::Store(e.to_string())),
1344 WalkStreamState::Done,
1345 ))
1346 }
1347 };
1348
1349 let node = match try_decode_tree_node(&data) {
1350 Some(n) => n,
1351 None => {
1352 let entry = WalkEntry {
1354 path: item.path,
1355 hash: item.hash,
1356 link_type: LinkType::Blob,
1357 size: data.len() as u64,
1358 key: item.key,
1359 };
1360 return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1361 }
1362 };
1363
1364 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1365 let entry = WalkEntry {
1366 path: item.path.clone(),
1367 hash: item.hash,
1368 link_type: node.node_type,
1369 size: node_size,
1370 key: None, };
1372
1373 for link in node.links.into_iter().rev() {
1375 let child_path = match &link.name {
1376 Some(name) if !name.starts_with('_') => {
1377 if item.path.is_empty() {
1378 name.clone()
1379 } else {
1380 format!("{}/{}", item.path, name)
1381 }
1382 }
1383 _ => item.path.clone(),
1384 };
1385 stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
1386 }
1387
1388 return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1389 }
1390 None
1391 }
1392
1393 pub async fn set_entry(
1398 &self,
1399 root: &Cid,
1400 path: &[&str],
1401 name: &str,
1402 entry_cid: &Cid,
1403 link_type: LinkType,
1404 ) -> Result<Cid, HashTreeError> {
1405 let dir_cid = self.resolve_path_array(root, path).await?;
1406 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1407
1408 let entries = self.list_directory(&dir_cid).await?;
1409 let mut new_entries: Vec<DirEntry> = entries
1410 .into_iter()
1411 .filter(|e| e.name != name)
1412 .map(|e| DirEntry {
1413 name: e.name,
1414 hash: e.hash,
1415 size: e.size,
1416 key: e.key,
1417 link_type: e.link_type,
1418 meta: e.meta,
1419 })
1420 .collect();
1421
1422 new_entries.push(DirEntry {
1423 name: name.to_string(),
1424 hash: entry_cid.hash,
1425 size: entry_cid.size,
1426 key: entry_cid.key,
1427 link_type,
1428 meta: None,
1429 });
1430
1431 let new_dir_cid = self.put_directory(new_entries).await?;
1432 self.rebuild_path(root, path, new_dir_cid).await
1433 }
1434
1435 pub async fn remove_entry(
1438 &self,
1439 root: &Cid,
1440 path: &[&str],
1441 name: &str,
1442 ) -> Result<Cid, HashTreeError> {
1443 let dir_cid = self.resolve_path_array(root, path).await?;
1444 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1445
1446 let entries = self.list_directory(&dir_cid).await?;
1447 let new_entries: Vec<DirEntry> = entries
1448 .into_iter()
1449 .filter(|e| e.name != name)
1450 .map(|e| DirEntry {
1451 name: e.name,
1452 hash: e.hash,
1453 size: e.size,
1454 key: e.key,
1455 link_type: e.link_type,
1456 meta: e.meta,
1457 })
1458 .collect();
1459
1460 let new_dir_cid = self.put_directory(new_entries).await?;
1461 self.rebuild_path(root, path, new_dir_cid).await
1462 }
1463
1464 pub async fn rename_entry(
1467 &self,
1468 root: &Cid,
1469 path: &[&str],
1470 old_name: &str,
1471 new_name: &str,
1472 ) -> Result<Cid, HashTreeError> {
1473 if old_name == new_name {
1474 return Ok(root.clone());
1475 }
1476
1477 let dir_cid = self.resolve_path_array(root, path).await?;
1478 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1479
1480 let entries = self.list_directory(&dir_cid).await?;
1481 let entry = entries
1482 .iter()
1483 .find(|e| e.name == old_name)
1484 .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1485
1486 let entry_hash = entry.hash;
1487 let entry_size = entry.size;
1488 let entry_key = entry.key;
1489 let entry_link_type = entry.link_type;
1490 let entry_meta = entry.meta.clone();
1491
1492 let new_entries: Vec<DirEntry> = entries
1493 .into_iter()
1494 .filter(|e| e.name != old_name)
1495 .map(|e| DirEntry {
1496 name: e.name,
1497 hash: e.hash,
1498 size: e.size,
1499 key: e.key,
1500 link_type: e.link_type,
1501 meta: e.meta,
1502 })
1503 .chain(std::iter::once(DirEntry {
1504 name: new_name.to_string(),
1505 hash: entry_hash,
1506 size: entry_size,
1507 key: entry_key,
1508 link_type: entry_link_type,
1509 meta: entry_meta,
1510 }))
1511 .collect();
1512
1513 let new_dir_cid = self.put_directory(new_entries).await?;
1514 self.rebuild_path(root, path, new_dir_cid).await
1515 }
1516
1517 pub async fn move_entry(
1520 &self,
1521 root: &Cid,
1522 source_path: &[&str],
1523 name: &str,
1524 target_path: &[&str],
1525 ) -> Result<Cid, HashTreeError> {
1526 let source_dir_cid = self.resolve_path_array(root, source_path).await?;
1527 let source_dir_cid = source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
1528
1529 let source_entries = self.list_directory(&source_dir_cid).await?;
1530 let entry = source_entries
1531 .iter()
1532 .find(|e| e.name == name)
1533 .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
1534
1535 let entry_cid = Cid {
1536 hash: entry.hash,
1537 key: entry.key,
1538 size: entry.size,
1539 };
1540 let entry_link_type = entry.link_type;
1541
1542 let new_root = self.remove_entry(root, source_path, name).await?;
1544
1545 self.set_entry(&new_root, target_path, name, &entry_cid, entry_link_type).await
1547 }
1548
1549 async fn resolve_path_array(&self, root: &Cid, path: &[&str]) -> Result<Option<Cid>, HashTreeError> {
1550 if path.is_empty() {
1551 return Ok(Some(root.clone()));
1552 }
1553 self.resolve_path(root, &path.join("/")).await
1554 }
1555
1556 async fn rebuild_path(
1557 &self,
1558 root: &Cid,
1559 path: &[&str],
1560 new_child: Cid,
1561 ) -> Result<Cid, HashTreeError> {
1562 if path.is_empty() {
1563 return Ok(new_child);
1564 }
1565
1566 let mut child_cid = new_child;
1567 let parts: Vec<&str> = path.to_vec();
1568
1569 for i in (0..parts.len()).rev() {
1570 let child_name = parts[i];
1571 let parent_path = &parts[..i];
1572
1573 let parent_cid = if parent_path.is_empty() {
1574 root.clone()
1575 } else {
1576 self.resolve_path_array(root, parent_path)
1577 .await?
1578 .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
1579 };
1580
1581 let parent_entries = self.list_directory(&parent_cid).await?;
1582 let new_parent_entries: Vec<DirEntry> = parent_entries
1583 .into_iter()
1584 .map(|e| {
1585 if e.name == child_name {
1586 DirEntry {
1587 name: e.name,
1588 hash: child_cid.hash,
1589 size: child_cid.size,
1590 key: child_cid.key,
1591 link_type: e.link_type,
1592 meta: e.meta,
1593 }
1594 } else {
1595 DirEntry {
1596 name: e.name,
1597 hash: e.hash,
1598 size: e.size,
1599 key: e.key,
1600 link_type: e.link_type,
1601 meta: e.meta,
1602 }
1603 }
1604 })
1605 .collect();
1606
1607 child_cid = self.put_directory(new_parent_entries).await?;
1608 }
1609
1610 Ok(child_cid)
1611 }
1612
1613 pub fn get_store(&self) -> Arc<S> {
1617 self.store.clone()
1618 }
1619
1620 pub fn chunk_size(&self) -> usize {
1622 self.chunk_size
1623 }
1624
1625 pub fn max_links(&self) -> usize {
1627 self.max_links
1628 }
1629}
1630
1631enum StreamStackItem {
1634 Hash(Hash),
1635}
1636
1637enum ReadStreamState<'a, S: Store> {
1638 Init { hash: Hash, tree: &'a HashTree<S> },
1639 Processing { stack: Vec<StreamStackItem>, tree: &'a HashTree<S> },
1640 Done,
1641}
1642
1643struct WalkStackItem {
1644 hash: Hash,
1645 path: String,
1646 key: Option<[u8; 32]>,
1647}
1648
1649enum WalkStreamState<'a, S: Store> {
1650 Init { cid: Cid, path: String, tree: &'a HashTree<S> },
1651 Processing { stack: Vec<WalkStackItem>, tree: &'a HashTree<S> },
1652 Done,
1653}
1654
1655struct EncryptedStackItem {
1657 hash: Hash,
1658 key: Option<[u8; 32]>,
1659}
1660
1661enum EncryptedStreamState<'a, S: Store> {
1662 Init { hash: Hash, key: [u8; 32], tree: &'a HashTree<S> },
1663 Processing { stack: Vec<EncryptedStackItem>, tree: &'a HashTree<S> },
1664 Done,
1665}
1666
1667pub async fn verify_tree<S: Store>(
1669 store: Arc<S>,
1670 root_hash: &Hash,
1671) -> Result<crate::reader::VerifyResult, HashTreeError> {
1672 let mut missing = Vec::new();
1673 let mut visited = std::collections::HashSet::new();
1674
1675 verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
1676
1677 Ok(crate::reader::VerifyResult {
1678 valid: missing.is_empty(),
1679 missing,
1680 })
1681}
1682
1683async fn verify_recursive<S: Store>(
1684 store: Arc<S>,
1685 hash: &Hash,
1686 missing: &mut Vec<Hash>,
1687 visited: &mut std::collections::HashSet<String>,
1688) -> Result<(), HashTreeError> {
1689 let hex = to_hex(hash);
1690 if visited.contains(&hex) {
1691 return Ok(());
1692 }
1693 visited.insert(hex);
1694
1695 let data = match store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1696 Some(d) => d,
1697 None => {
1698 missing.push(*hash);
1699 return Ok(());
1700 }
1701 };
1702
1703 if is_tree_node(&data) {
1704 let node = decode_tree_node(&data)?;
1705 for link in &node.links {
1706 Box::pin(verify_recursive(store.clone(), &link.hash, missing, visited)).await?;
1707 }
1708 }
1709
1710 Ok(())
1711}
1712
1713#[cfg(test)]
1714mod tests {
1715 use super::*;
1716 use crate::store::MemoryStore;
1717
1718 fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
1719 let store = Arc::new(MemoryStore::new());
1720 let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
1722 (store, tree)
1723 }
1724
1725 #[tokio::test]
1726 async fn test_put_and_read_blob() {
1727 let (_store, tree) = make_tree();
1728
1729 let data = vec![1, 2, 3, 4, 5];
1730 let hash = tree.put_blob(&data).await.unwrap();
1731
1732 let result = tree.get_blob(&hash).await.unwrap();
1733 assert_eq!(result, Some(data));
1734 }
1735
1736 #[tokio::test]
1737 async fn test_put_and_read_file_small() {
1738 let (_store, tree) = make_tree();
1739
1740 let data = b"Hello, World!";
1741 let result = tree.put_file(data).await.unwrap();
1742
1743 assert_eq!(result.size, data.len() as u64);
1744
1745 let read_data = tree.read_file(&result.hash).await.unwrap();
1746 assert_eq!(read_data, Some(data.to_vec()));
1747 }
1748
1749 #[tokio::test]
1750 async fn test_put_and_read_directory() {
1751 let (_store, tree) = make_tree();
1752
1753 let file1 = tree.put_blob(b"content1").await.unwrap();
1754 let file2 = tree.put_blob(b"content2").await.unwrap();
1755
1756 let dir_cid = tree
1757 .put_directory(
1758 vec![
1759 DirEntry::new("a.txt", file1).with_size(8),
1760 DirEntry::new("b.txt", file2).with_size(8),
1761 ],
1762 )
1763 .await
1764 .unwrap();
1765
1766 let entries = tree.list_directory(&dir_cid).await.unwrap();
1767 assert_eq!(entries.len(), 2);
1768 let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
1769 assert!(names.contains(&"a.txt"));
1770 assert!(names.contains(&"b.txt"));
1771 }
1772
1773 #[tokio::test]
1774 async fn test_is_directory() {
1775 let (_store, tree) = make_tree();
1776
1777 let file_hash = tree.put_blob(b"data").await.unwrap();
1778 let dir_cid = tree.put_directory(vec![]).await.unwrap();
1779
1780 assert!(!tree.is_directory(&file_hash).await.unwrap());
1781 assert!(tree.is_directory(&dir_cid.hash).await.unwrap());
1782 }
1783
1784 #[tokio::test]
1785 async fn test_resolve_path() {
1786 let (_store, tree) = make_tree();
1787
1788 let file_hash = tree.put_blob(b"nested").await.unwrap();
1789 let sub_dir = tree.put_directory(
1790 vec![DirEntry::new("file.txt", file_hash).with_size(6)],
1791 ).await.unwrap();
1792 let root_dir = tree.put_directory(
1793 vec![DirEntry::new("subdir", sub_dir.hash)],
1794 ).await.unwrap();
1795
1796 let resolved = tree.resolve_path(&root_dir, "subdir/file.txt").await.unwrap();
1797 assert_eq!(resolved.map(|c| c.hash), Some(file_hash));
1798 }
1799
1800 #[tokio::test]
1803 async fn test_unified_put_get_public() {
1804 let store = Arc::new(MemoryStore::new());
1805 let tree = HashTree::new(HashTreeConfig::new(store).public());
1807
1808 let data = b"Hello, public world!";
1809 let cid = tree.put(data).await.unwrap();
1810
1811 assert_eq!(cid.size, data.len() as u64);
1812 assert!(cid.key.is_none()); let retrieved = tree.get(&cid).await.unwrap().unwrap();
1815 assert_eq!(retrieved, data);
1816 }
1817
1818 #[tokio::test]
1819 async fn test_unified_put_get_encrypted() {
1820 let store = Arc::new(MemoryStore::new());
1821 let tree = HashTree::new(HashTreeConfig::new(store));
1823
1824 let data = b"Hello, encrypted world!";
1825 let cid = tree.put(data).await.unwrap();
1826
1827 assert_eq!(cid.size, data.len() as u64);
1828 assert!(cid.key.is_some()); let retrieved = tree.get(&cid).await.unwrap().unwrap();
1831 assert_eq!(retrieved, data);
1832 }
1833
1834 #[tokio::test]
1835 async fn test_unified_put_get_encrypted_chunked() {
1836 let store = Arc::new(MemoryStore::new());
1837 let tree = HashTree::new(HashTreeConfig::new(store).with_chunk_size(100));
1838
1839 let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
1841 let cid = tree.put(&data).await.unwrap();
1842
1843 assert_eq!(cid.size, data.len() as u64);
1844 assert!(cid.key.is_some());
1845
1846 let retrieved = tree.get(&cid).await.unwrap().unwrap();
1847 assert_eq!(retrieved, data);
1848 }
1849
1850 #[tokio::test]
1851 async fn test_cid_deterministic() {
1852 let store = Arc::new(MemoryStore::new());
1853 let tree = HashTree::new(HashTreeConfig::new(store));
1854
1855 let data = b"Same content produces same CID";
1856
1857 let cid1 = tree.put(data).await.unwrap();
1858 let cid2 = tree.put(data).await.unwrap();
1859
1860 assert_eq!(cid1.hash, cid2.hash);
1862 assert_eq!(cid1.key, cid2.key);
1863 assert_eq!(cid1.to_string(), cid2.to_string());
1864 }
1865
1866 #[tokio::test]
1867 async fn test_cid_to_string_public() {
1868 let store = Arc::new(MemoryStore::new());
1869 let tree = HashTree::new(HashTreeConfig::new(store).public());
1870
1871 let cid = tree.put(b"test").await.unwrap();
1872 let s = cid.to_string();
1873
1874 assert_eq!(s.len(), 64);
1876 assert!(!s.contains(':'));
1877 }
1878
1879 #[tokio::test]
1880 async fn test_cid_to_string_encrypted() {
1881 let store = Arc::new(MemoryStore::new());
1882 let tree = HashTree::new(HashTreeConfig::new(store));
1883
1884 let cid = tree.put(b"test").await.unwrap();
1885 let s = cid.to_string();
1886
1887 assert_eq!(s.len(), 129);
1889 assert!(s.contains(':'));
1890 }
1891}