1use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::stream::{self, Stream};
10use futures::io::AsyncRead;
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node};
15use crate::hash::sha256;
16use crate::reader::{ReaderError, TreeEntry, WalkEntry};
17use crate::store::Store;
18use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
19
20use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
21
22#[derive(Clone)]
24pub struct HashTreeConfig<S: Store> {
25 pub store: Arc<S>,
26 pub chunk_size: usize,
27 pub max_links: usize,
28 pub encrypted: bool,
30}
31
32impl<S: Store> HashTreeConfig<S> {
33 pub fn new(store: Arc<S>) -> Self {
34 Self {
35 store,
36 chunk_size: DEFAULT_CHUNK_SIZE,
37 max_links: DEFAULT_MAX_LINKS,
38 encrypted: true,
39 }
40 }
41
42 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
43 self.chunk_size = chunk_size;
44 self
45 }
46
47 pub fn with_max_links(mut self, max_links: usize) -> Self {
48 self.max_links = max_links;
49 self
50 }
51
52 pub fn public(mut self) -> Self {
54 self.encrypted = false;
55 self
56 }
57}
58
/// Result of [`HashTree::put_file`]: the root hash of the stored (unencrypted) file and its
/// length in bytes.
#[derive(Debug, Clone)]
pub struct PutFileResult {
    /// Hash of the single blob, or of the root tree node when the file was chunked.
    pub hash: Hash,
    /// Total size of the input data in bytes.
    pub size: u64,
}
65
/// Errors returned by [`HashTree`] operations.
#[derive(Debug, thiserror::Error)]
pub enum HashTreeError {
    /// The backing store failed; carries the store error's string form.
    #[error("Store error: {0}")]
    Store(String),
    /// A tree node could not be encoded or decoded.
    #[error("Codec error: {0}")]
    Codec(#[from] crate::codec::CodecError),
    /// A chunk referenced by a tree node is absent from the store; carries its hex hash.
    #[error("Missing chunk: {0}")]
    MissingChunk(String),
    /// A path lookup failed (not constructed in the code visible in this chunk).
    #[error("Path not found: {0}")]
    PathNotFound(String),
    /// A named entry lookup failed (not constructed in the code visible in this chunk).
    #[error("Entry not found: {0}")]
    EntryNotFound(String),
    /// Encrypting data before storing it failed.
    #[error("Encryption error: {0}")]
    Encryption(String),
    /// Decrypting data read from the store failed.
    #[error("Decryption error: {0}")]
    Decryption(String),
}
84
85impl From<BuilderError> for HashTreeError {
86 fn from(e: BuilderError) -> Self {
87 match e {
88 BuilderError::Store(s) => HashTreeError::Store(s),
89 BuilderError::Codec(c) => HashTreeError::Codec(c),
90 BuilderError::Encryption(s) => HashTreeError::Encryption(s),
91 }
92 }
93}
94
95impl From<ReaderError> for HashTreeError {
96 fn from(e: ReaderError) -> Self {
97 match e {
98 ReaderError::Store(s) => HashTreeError::Store(s),
99 ReaderError::Codec(c) => HashTreeError::Codec(c),
100 ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
101 ReaderError::Decryption(s) => HashTreeError::Encryption(s),
102 ReaderError::MissingKey => HashTreeError::Encryption("missing decryption key".to_string()),
103 }
104 }
105}
106
/// Content-addressed Merkle tree over a [`Store`]: splits data into chunks, links them
/// through tree nodes, and reads them back, optionally with CHK encryption.
pub struct HashTree<S: Store> {
    /// Backing store for chunks and encoded tree nodes.
    store: Arc<S>,
    /// Maximum bytes per leaf chunk.
    chunk_size: usize,
    /// Maximum links per tree node before another level is added.
    max_links: usize,
    /// Whether writes via `put`/`put_stream` encrypt each chunk and node.
    encrypted: bool,
}
114
115impl<S: Store> HashTree<S> {
116 pub fn new(config: HashTreeConfig<S>) -> Self {
117 Self {
118 store: config.store,
119 chunk_size: config.chunk_size,
120 max_links: config.max_links,
121 encrypted: config.encrypted,
122 }
123 }
124
125 pub fn is_encrypted(&self) -> bool {
127 self.encrypted
128 }
129
130 pub async fn put(&self, data: &[u8]) -> Result<Cid, HashTreeError> {
135 let size = data.len() as u64;
136
137 if data.len() <= self.chunk_size {
139 let (hash, key) = self.put_chunk_internal(data).await?;
140 return Ok(Cid { hash, key, size });
141 }
142
143 let mut links: Vec<Link> = Vec::new();
145 let mut offset = 0;
146
147 while offset < data.len() {
148 let end = (offset + self.chunk_size).min(data.len());
149 let chunk = &data[offset..end];
150 let chunk_size = chunk.len() as u64;
151 let (hash, key) = self.put_chunk_internal(chunk).await?;
152 links.push(Link {
153 hash,
154 name: None,
155 size: chunk_size,
156 key,
157 link_type: LinkType::Blob, meta: None,
159 });
160 offset = end;
161 }
162
163 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
165 Ok(Cid { hash: root_hash, key: root_key, size })
166 }
167
168 pub async fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
170 if let Some(key) = cid.key {
171 self.get_encrypted(&cid.hash, &key).await
172 } else {
173 self.read_file(&cid.hash).await
174 }
175 }
176
177 pub async fn put_stream<R: AsyncRead + Unpin>(&self, mut reader: R) -> Result<Cid, HashTreeError> {
182 let mut buffer = vec![0u8; self.chunk_size];
183 let mut links = Vec::new();
184 let mut total_size: u64 = 0;
185 let mut consistent_key: Option<[u8; 32]> = None;
186
187 loop {
188 let mut chunk = Vec::new();
189 let mut bytes_read = 0;
190
191 while bytes_read < self.chunk_size {
193 let n = reader.read(&mut buffer[..self.chunk_size - bytes_read]).await
194 .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
195 if n == 0 {
196 break; }
198 chunk.extend_from_slice(&buffer[..n]);
199 bytes_read += n;
200 }
201
202 if chunk.is_empty() {
203 break; }
205
206 let chunk_len = chunk.len() as u64;
207 total_size += chunk_len;
208
209 let (hash, key) = self.put_chunk_internal(&chunk).await?;
210
211 if links.is_empty() {
213 consistent_key = key;
214 } else if consistent_key != key {
215 consistent_key = None;
216 }
217
218 links.push(Link {
219 hash,
220 name: None,
221 size: chunk_len,
222 key,
223 link_type: LinkType::Blob, meta: None,
225 });
226 }
227
228 if links.is_empty() {
229 let (hash, key) = self.put_chunk_internal(&[]).await?;
231 return Ok(Cid { hash, key, size: 0 });
232 }
233
234 let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
236 Ok(Cid { hash: root_hash, key: root_key, size: total_size })
237 }
238
239 pub fn get_stream(
244 &self,
245 cid: &Cid,
246 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
247 let hash = cid.hash;
248 let key = cid.key;
249
250 if let Some(k) = key {
251 Box::pin(self.read_file_stream_encrypted(hash, k))
253 } else {
254 self.read_file_stream(hash)
256 }
257 }
258
/// Lazily streams the decrypted leaf chunks of the encrypted object at `hash`, in order.
///
/// The root is decrypted with `key`. If it is not a tree node, it is yielded as the only item.
/// Otherwise its links are walked depth-first by `process_encrypted_stream_stack`.
/// If the root is absent from the store, the stream ends without yielding anything.
/// Any error is yielded once and then ends the stream (state becomes `Done`).
fn read_file_stream_encrypted(
    &self,
    hash: Hash,
    key: EncryptionKey,
) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
    stream::unfold(
        EncryptedStreamState::Init { hash, key, tree: self },
        |state| async move {
            match state {
                EncryptedStreamState::Init { hash, key, tree } => {
                    let data = match tree.store.get(&hash).await {
                        Ok(Some(d)) => d,
                        // Missing root: end the stream silently (no error item).
                        Ok(None) => return None,
                        Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), EncryptedStreamState::Done)),
                    };

                    let decrypted = match decrypt_chk(&data, &key) {
                        Ok(d) => d,
                        Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), EncryptedStreamState::Done)),
                    };

                    // Single-chunk object: yield it and finish.
                    if !is_tree_node(&decrypted) {
                        return Some((Ok(decrypted), EncryptedStreamState::Done));
                    }

                    let node = match decode_tree_node(&decrypted) {
                        Ok(n) => n,
                        Err(e) => return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done)),
                    };

                    // Push children in reverse so popping visits them left-to-right.
                    let mut stack: Vec<EncryptedStackItem> = Vec::new();
                    for link in node.links.into_iter().rev() {
                        stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
                    }

                    tree.process_encrypted_stream_stack(&mut stack).await
                }
                EncryptedStreamState::Processing { mut stack, tree } => {
                    tree.process_encrypted_stream_stack(&mut stack).await
                }
                EncryptedStreamState::Done => None,
            }
        },
    )
}
308
/// Pops items off `stack` until the next leaf chunk is found, descending into tree nodes.
///
/// Returns the leaf together with a `Processing` state that owns the rest of the stack.
/// Returns an error item paired with `Done` on store, decrypt or decode failure, and `None`
/// once the stack is exhausted. An item without a key is used as stored, without decryption.
async fn process_encrypted_stream_stack<'a>(
    &'a self,
    stack: &mut Vec<EncryptedStackItem>,
) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
    while let Some(item) = stack.pop() {
        let data = match self.store.get(&item.hash).await {
            Ok(Some(d)) => d,
            Ok(None) => {
                return Some((
                    Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
                    EncryptedStreamState::Done,
                ))
            }
            Err(e) => {
                return Some((
                    Err(HashTreeError::Store(e.to_string())),
                    EncryptedStreamState::Done,
                ))
            }
        };

        // Keyless items are passed through as-is.
        let decrypted = if let Some(key) = item.key {
            match decrypt_chk(&data, &key) {
                Ok(d) => d,
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Decryption(e.to_string())),
                        EncryptedStreamState::Done,
                    ))
                }
            }
        } else {
            data
        };

        if is_tree_node(&decrypted) {
            let node = match decode_tree_node(&decrypted) {
                Ok(n) => n,
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Codec(e)),
                        EncryptedStreamState::Done,
                    ))
                }
            };
            // Reverse push keeps left-to-right output order.
            for link in node.links.into_iter().rev() {
                stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
            }
        } else {
            return Some((
                Ok(decrypted),
                // Move the remaining work into the next state; the caller's stack is left empty.
                EncryptedStreamState::Processing { stack: std::mem::take(stack), tree: self },
            ));
        }
    }
    None
}
369
370 async fn put_chunk_internal(&self, data: &[u8]) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
372 if self.encrypted {
373 let (encrypted, key) = encrypt_chk(data)
374 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
375 let hash = sha256(&encrypted);
376 self.store
377 .put(hash, encrypted)
378 .await
379 .map_err(|e| HashTreeError::Store(e.to_string()))?;
380 Ok((hash, Some(key)))
381 } else {
382 let hash = self.put_blob(data).await?;
383 Ok((hash, None))
384 }
385 }
386
387 async fn build_tree_internal(
389 &self,
390 links: Vec<Link>,
391 total_size: Option<u64>,
392 ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
393 if links.len() == 1 {
395 if let Some(ts) = total_size {
396 if links[0].size == ts {
397 return Ok((links[0].hash, links[0].key));
398 }
399 }
400 }
401
402 if links.len() <= self.max_links {
403 let node = TreeNode {
404 node_type: LinkType::File,
405 links,
406 };
407 let (data, _) = encode_and_hash(&node)?;
408
409 if self.encrypted {
410 let (encrypted, key) = encrypt_chk(&data)
411 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
412 let hash = sha256(&encrypted);
413 self.store
414 .put(hash, encrypted)
415 .await
416 .map_err(|e| HashTreeError::Store(e.to_string()))?;
417 return Ok((hash, Some(key)));
418 }
419
420 let hash = sha256(&data);
422 self.store
423 .put(hash, data)
424 .await
425 .map_err(|e| HashTreeError::Store(e.to_string()))?;
426 return Ok((hash, None));
427 }
428
429 let mut sub_links = Vec::new();
431 for batch in links.chunks(self.max_links) {
432 let batch_size: u64 = batch.iter().map(|l| l.size).sum();
433 let (hash, key) = Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
434 sub_links.push(Link {
435 hash,
436 name: None,
437 size: batch_size,
438 key,
439 link_type: LinkType::File, meta: None,
441 });
442 }
443
444 Box::pin(self.build_tree_internal(sub_links, total_size)).await
445 }
446
447 async fn get_encrypted(
449 &self,
450 hash: &Hash,
451 key: &EncryptionKey,
452 ) -> Result<Option<Vec<u8>>, HashTreeError> {
453 let encrypted_data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
454 Some(d) => d,
455 None => return Ok(None),
456 };
457
458 let decrypted = decrypt_chk(&encrypted_data, key)
460 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
461
462 if is_tree_node(&decrypted) {
464 let node = decode_tree_node(&decrypted)?;
465 let assembled = self.assemble_encrypted_chunks(&node).await?;
466 return Ok(Some(assembled));
467 }
468
469 Ok(Some(decrypted))
471 }
472
473 async fn assemble_encrypted_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
475 let mut parts: Vec<Vec<u8>> = Vec::new();
476
477 for link in &node.links {
478 let chunk_key = link.key.ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
479
480 let encrypted_child = self
481 .store
482 .get(&link.hash)
483 .await
484 .map_err(|e| HashTreeError::Store(e.to_string()))?
485 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
486
487 let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
488 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
489
490 if is_tree_node(&decrypted) {
491 let child_node = decode_tree_node(&decrypted)?;
493 let child_data = Box::pin(self.assemble_encrypted_chunks(&child_node)).await?;
494 parts.push(child_data);
495 } else {
496 parts.push(decrypted);
498 }
499 }
500
501 let total_len: usize = parts.iter().map(|p| p.len()).sum();
502 let mut result = Vec::with_capacity(total_len);
503 for part in parts {
504 result.extend_from_slice(&part);
505 }
506
507 Ok(result)
508 }
509
510 pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
515 let hash = sha256(data);
516 self.store
517 .put(hash, data.to_vec())
518 .await
519 .map_err(|e| HashTreeError::Store(e.to_string()))?;
520 Ok(hash)
521 }
522
523 pub async fn put_file(&self, data: &[u8]) -> Result<PutFileResult, HashTreeError> {
526 let size = data.len() as u64;
527
528 if data.len() <= self.chunk_size {
530 let hash = self.put_blob(data).await?;
531 return Ok(PutFileResult { hash, size });
532 }
533
534 let mut chunk_hashes: Vec<Hash> = Vec::new();
536 let mut offset = 0;
537
538 while offset < data.len() {
539 let end = (offset + self.chunk_size).min(data.len());
540 let chunk = &data[offset..end];
541 let hash = self.put_blob(chunk).await?;
542 chunk_hashes.push(hash);
543 offset = end;
544 }
545
546 let chunks: Vec<Link> = chunk_hashes
548 .iter()
549 .enumerate()
550 .map(|(i, &hash)| {
551 let chunk_size = if i < chunk_hashes.len() - 1 {
552 self.chunk_size as u64
553 } else {
554 (data.len() - i * self.chunk_size) as u64
555 };
556 Link {
557 hash,
558 name: None,
559 size: chunk_size,
560 key: None,
561 link_type: LinkType::Blob, meta: None,
563 }
564 })
565 .collect();
566
567 let root_hash = self.build_tree(chunks, Some(size)).await?;
568 Ok(PutFileResult { hash: root_hash, size })
569 }
570
571 pub async fn put_directory(
577 &self,
578 entries: Vec<DirEntry>,
579 ) -> Result<Cid, HashTreeError> {
580 let mut sorted = entries;
582 sorted.sort_by(|a, b| a.name.cmp(&b.name));
583
584 let links: Vec<Link> = sorted
585 .into_iter()
586 .map(|e| Link {
587 hash: e.hash,
588 name: Some(e.name),
589 size: e.size,
590 key: e.key,
591 link_type: e.link_type,
592 meta: e.meta,
593 })
594 .collect();
595
596 let total_size: u64 = links.iter().map(|l| l.size).sum();
597
598 let node = TreeNode {
600 node_type: LinkType::Dir,
601 links,
602 };
603 let (data, _plain_hash) = encode_and_hash(&node)?;
604
605 let cid = self.put(&data).await?;
610 Ok(Cid { hash: cid.hash, key: cid.key, size: total_size })
611 }
612
613 async fn build_tree(&self, links: Vec<Link>, total_size: Option<u64>) -> Result<Hash, HashTreeError> {
615 if links.len() == 1 {
617 if let Some(ts) = total_size {
618 if links[0].size == ts {
619 return Ok(links[0].hash);
620 }
621 }
622 }
623
624 if links.len() <= self.max_links {
626 let node = TreeNode {
627 node_type: LinkType::File,
628 links,
629 };
630 let (data, hash) = encode_and_hash(&node)?;
631 self.store
632 .put(hash, data)
633 .await
634 .map_err(|e| HashTreeError::Store(e.to_string()))?;
635 return Ok(hash);
636 }
637
638 let mut sub_trees: Vec<Link> = Vec::new();
640
641 for batch in links.chunks(self.max_links) {
642 let batch_size: u64 = batch.iter().map(|l| l.size).sum();
643
644 let node = TreeNode {
645 node_type: LinkType::File,
646 links: batch.to_vec(),
647 };
648 let (data, hash) = encode_and_hash(&node)?;
649 self.store
650 .put(hash, data)
651 .await
652 .map_err(|e| HashTreeError::Store(e.to_string()))?;
653
654 sub_trees.push(Link {
655 hash,
656 name: None,
657 size: batch_size,
658 key: None,
659 link_type: LinkType::File, meta: None,
661 });
662 }
663
664 Box::pin(self.build_tree(sub_trees, total_size)).await
666 }
667
668 pub async fn put_tree_node(
670 &self,
671 links: Vec<Link>,
672 ) -> Result<Hash, HashTreeError> {
673 let node = TreeNode {
674 node_type: LinkType::Dir,
675 links,
676 };
677
678 let (data, hash) = encode_and_hash(&node)?;
679 self.store
680 .put(hash, data)
681 .await
682 .map_err(|e| HashTreeError::Store(e.to_string()))?;
683 Ok(hash)
684 }
685
686 pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
690 self.store
691 .get(hash)
692 .await
693 .map_err(|e| HashTreeError::Store(e.to_string()))
694 }
695
696 pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
698 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
699 Some(d) => d,
700 None => return Ok(None),
701 };
702
703 if !is_tree_node(&data) {
704 return Ok(None);
705 }
706
707 let node = decode_tree_node(&data)?;
708 Ok(Some(node))
709 }
710
711 pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
713 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
714 Some(d) => d,
715 None => return Ok(None),
716 };
717
718 let decrypted = if let Some(key) = &cid.key {
720 decrypt_chk(&data, key)
721 .map_err(|e| HashTreeError::Decryption(e.to_string()))?
722 } else {
723 data
724 };
725
726 if !is_tree_node(&decrypted) {
727 return Ok(None);
728 }
729
730 let node = decode_tree_node(&decrypted)?;
731 Ok(Some(node))
732 }
733
734 pub async fn get_directory_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
737 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
738 Some(d) => d,
739 None => return Ok(None),
740 };
741
742 let decrypted = if let Some(key) = &cid.key {
744 decrypt_chk(&data, key)
745 .map_err(|e| HashTreeError::Decryption(e.to_string()))?
746 } else {
747 data
748 };
749
750 if !is_tree_node(&decrypted) {
751 return Ok(None);
752 }
753
754 let node = decode_tree_node(&decrypted)?;
755
756 if node.node_type == LinkType::File {
758 let assembled = self.assemble_chunks(&node).await?;
759 if is_tree_node(&assembled) {
760 let inner_node = decode_tree_node(&assembled)?;
761 return Ok(Some(inner_node));
762 }
763 }
764
765 Ok(Some(node))
766 }
767
768 pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
770 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
771 Some(d) => d,
772 None => return Ok(false),
773 };
774 Ok(is_tree_node(&data))
775 }
776
777 pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
779 let node = match self.get_node(cid).await? {
780 Some(n) => n,
781 None => return Ok(false),
782 };
783 Ok(node.links.iter().any(|l| l.name.as_ref().map(|n| !n.starts_with('_')).unwrap_or(false)))
785 }
786
787 pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
789 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
790 Some(d) => d,
791 None => return Ok(false),
792 };
793 Ok(is_directory_node(&data))
794 }
795
796 pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
798 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
799 Some(d) => d,
800 None => return Ok(None),
801 };
802
803 if !is_tree_node(&data) {
805 return Ok(Some(data));
806 }
807
808 let node = decode_tree_node(&data)?;
810 let assembled = self.assemble_chunks(&node).await?;
811 Ok(Some(assembled))
812 }
813
814 async fn assemble_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
816 let mut parts: Vec<Vec<u8>> = Vec::new();
817
818 for link in &node.links {
819 let child_data = self
820 .store
821 .get(&link.hash)
822 .await
823 .map_err(|e| HashTreeError::Store(e.to_string()))?
824 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
825
826 if is_tree_node(&child_data) {
827 let child_node = decode_tree_node(&child_data)?;
828 parts.push(Box::pin(self.assemble_chunks(&child_node)).await?);
829 } else {
830 parts.push(child_data);
831 }
832 }
833
834 let total_length: usize = parts.iter().map(|p| p.len()).sum();
836 let mut result = Vec::with_capacity(total_length);
837 for part in parts {
838 result.extend_from_slice(&part);
839 }
840
841 Ok(result)
842 }
843
/// Lazily streams the unencrypted leaf chunks of the object at `hash`, in order.
///
/// A non-tree root is yielded as the only item. A tree root is walked depth-first by
/// `process_stream_stack`. If the root is absent, the stream ends without yielding anything.
/// Any error is yielded once and then ends the stream.
pub fn read_file_stream(
    &self,
    hash: Hash,
) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
    Box::pin(stream::unfold(
        ReadStreamState::Init { hash, tree: self },
        |state| async move {
            match state {
                ReadStreamState::Init { hash, tree } => {
                    let data = match tree.store.get(&hash).await {
                        Ok(Some(d)) => d,
                        // Missing root: end the stream silently (no error item).
                        Ok(None) => return None,
                        Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), ReadStreamState::Done)),
                    };

                    if !is_tree_node(&data) {
                        return Some((Ok(data), ReadStreamState::Done));
                    }

                    let node = match decode_tree_node(&data) {
                        Ok(n) => n,
                        Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
                    };

                    // Reverse push so popping visits children left-to-right.
                    let mut stack: Vec<StreamStackItem> = Vec::new();
                    for link in node.links.into_iter().rev() {
                        stack.push(StreamStackItem::Hash(link.hash));
                    }

                    tree.process_stream_stack(&mut stack).await
                }
                ReadStreamState::Processing { mut stack, tree } => {
                    tree.process_stream_stack(&mut stack).await
                }
                ReadStreamState::Done => None,
            }
        },
    ))
}
889
/// Pops hashes off `stack` until the next unencrypted leaf chunk is found, expanding tree
/// nodes in place.
///
/// Returns the leaf with a `Processing` state that owns the remaining stack, an error item
/// paired with `Done` on failure, or `None` once the stack is exhausted.
async fn process_stream_stack<'a>(
    &'a self,
    stack: &mut Vec<StreamStackItem>,
) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
    while let Some(item) = stack.pop() {
        match item {
            StreamStackItem::Hash(hash) => {
                let data = match self.store.get(&hash).await {
                    Ok(Some(d)) => d,
                    Ok(None) => {
                        return Some((
                            Err(HashTreeError::MissingChunk(to_hex(&hash))),
                            ReadStreamState::Done,
                        ))
                    }
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Store(e.to_string())),
                            ReadStreamState::Done,
                        ))
                    }
                };

                if is_tree_node(&data) {
                    let node = match decode_tree_node(&data) {
                        Ok(n) => n,
                        Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
                    };
                    // Reverse push keeps left-to-right output order.
                    for link in node.links.into_iter().rev() {
                        stack.push(StreamStackItem::Hash(link.hash));
                    }
                } else {
                    // Hand the remaining stack to the next state; the caller's stack is left empty.
                    return Some((Ok(data), ReadStreamState::Processing { stack: std::mem::take(stack), tree: self }));
                }
            }
        }
    }
    None
}
931
932 pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
934 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
935 Some(d) => d,
936 None => return Ok(vec![]),
937 };
938
939 if !is_tree_node(&data) {
940 return Ok(vec![data]);
941 }
942
943 let node = decode_tree_node(&data)?;
944 self.collect_chunks(&node).await
945 }
946
947 async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
948 let mut chunks = Vec::new();
949
950 for link in &node.links {
951 let child_data = self
952 .store
953 .get(&link.hash)
954 .await
955 .map_err(|e| HashTreeError::Store(e.to_string()))?
956 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
957
958 if is_tree_node(&child_data) {
959 let child_node = decode_tree_node(&child_data)?;
960 chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
961 } else {
962 chunks.push(child_data);
963 }
964 }
965
966 Ok(chunks)
967 }
968
969 pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
971 let node = match self.get_node(cid).await? {
972 Some(n) => n,
973 None => return Ok(vec![]),
974 };
975
976 let mut entries = Vec::new();
977
978 for link in &node.links {
979 if let Some(ref name) = link.name {
981 if name.starts_with("_chunk_") || name.starts_with('_') {
982 let chunk_cid = Cid { hash: link.hash, key: link.key, size: link.size };
983 let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
984 entries.extend(sub_entries);
985 continue;
986 }
987 }
988
989 entries.push(TreeEntry {
990 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
991 hash: link.hash,
992 size: link.size,
993 link_type: link.link_type,
994 key: link.key,
995 meta: link.meta.clone(),
996 });
997 }
998
999 Ok(entries)
1000 }
1001
1002 pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1005 let node = match self.get_directory_node(cid).await? {
1007 Some(n) => n,
1008 None => return Ok(vec![]),
1009 };
1010
1011 let mut entries = Vec::new();
1012
1013 for link in &node.links {
1014 if let Some(ref name) = link.name {
1016 if name.starts_with("_chunk_") || name.starts_with('_') {
1017 let sub_cid = Cid { hash: link.hash, key: cid.key, size: 0 };
1019 let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
1020 entries.extend(sub_entries);
1021 continue;
1022 }
1023 }
1024
1025 entries.push(TreeEntry {
1026 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1027 hash: link.hash,
1028 size: link.size,
1029 link_type: link.link_type,
1030 key: link.key,
1031 meta: link.meta.clone(),
1032 });
1033 }
1034
1035 Ok(entries)
1036 }
1037
1038 pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1040 let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
1041 if parts.is_empty() {
1042 return Ok(Some(cid.clone()));
1043 }
1044
1045 let mut current_cid = cid.clone();
1046
1047 for part in parts {
1048 let node = match self.get_directory_node(¤t_cid).await? {
1050 Some(n) => n,
1051 None => return Ok(None),
1052 };
1053
1054 if let Some(link) = self.find_link(&node, part) {
1055 current_cid = Cid {
1056 hash: link.hash,
1057 key: link.key,
1058 size: link.size,
1059 };
1060 } else {
1061 match self.find_link_in_subtrees_cid(&node, part, ¤t_cid).await? {
1063 Some(link) => {
1064 current_cid = Cid {
1065 hash: link.hash,
1066 key: link.key,
1067 size: link.size,
1068 };
1069 }
1070 None => return Ok(None),
1071 }
1072 }
1073 }
1074
1075 Ok(Some(current_cid))
1076 }
1077
1078 pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1080 self.resolve(cid, path).await
1081 }
1082
1083 fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1084 node.links
1085 .iter()
1086 .find(|l| l.name.as_deref() == Some(name))
1087 .cloned()
1088 }
1089
1090 async fn find_link_in_subtrees_cid(&self, node: &TreeNode, name: &str, _parent_cid: &Cid) -> Result<Option<Link>, HashTreeError> {
1092 for link in &node.links {
1093 if !link.name.as_ref().map(|n| n.starts_with('_')).unwrap_or(false) {
1094 continue;
1095 }
1096
1097 let sub_cid = Cid {
1099 hash: link.hash,
1100 key: link.key.clone(),
1101 size: link.size,
1102 };
1103
1104 let sub_node = match self.get_node(&sub_cid).await? {
1105 Some(n) => n,
1106 None => continue,
1107 };
1108
1109 if let Some(found) = self.find_link(&sub_node, name) {
1110 return Ok(Some(found));
1111 }
1112
1113 if let Some(deep_found) = Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await? {
1114 return Ok(Some(deep_found));
1115 }
1116 }
1117
1118 Ok(None)
1119 }
1120
1121 pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1123 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1124 Some(d) => d,
1125 None => return Ok(0),
1126 };
1127
1128 if !is_tree_node(&data) {
1129 return Ok(data.len() as u64);
1130 }
1131
1132 let node = decode_tree_node(&data)?;
1133 let mut total = 0u64;
1135 for link in &node.links {
1136 total += link.size;
1137 }
1138 Ok(total)
1139 }
1140
1141 pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
1143 let mut entries = Vec::new();
1144 self.walk_recursive(cid, path, &mut entries).await?;
1145 Ok(entries)
1146 }
1147
1148 async fn walk_recursive(
1149 &self,
1150 cid: &Cid,
1151 path: &str,
1152 entries: &mut Vec<WalkEntry>,
1153 ) -> Result<(), HashTreeError> {
1154 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1155 Some(d) => d,
1156 None => return Ok(()),
1157 };
1158
1159 let data = if let Some(key) = &cid.key {
1161 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1162 } else {
1163 data
1164 };
1165
1166 let node = match try_decode_tree_node(&data) {
1167 Some(n) => n,
1168 None => {
1169 entries.push(WalkEntry {
1170 path: path.to_string(),
1171 hash: cid.hash,
1172 link_type: LinkType::Blob,
1173 size: data.len() as u64,
1174 key: cid.key,
1175 });
1176 return Ok(());
1177 }
1178 };
1179
1180 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1181 entries.push(WalkEntry {
1182 path: path.to_string(),
1183 hash: cid.hash,
1184 link_type: node.node_type,
1185 size: node_size,
1186 key: cid.key,
1187 });
1188
1189 for link in &node.links {
1190 let child_path = match &link.name {
1191 Some(name) => {
1192 if name.starts_with("_chunk_") || name.starts_with('_') {
1193 let sub_cid = Cid { hash: link.hash, key: cid.key, size: 0 };
1195 Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
1196 continue;
1197 }
1198 if path.is_empty() {
1199 name.clone()
1200 } else {
1201 format!("{}/{}", path, name)
1202 }
1203 }
1204 None => path.to_string(),
1205 };
1206
1207 let child_cid = Cid { hash: link.hash, key: link.key, size: link.size };
1209 Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
1210 }
1211
1212 Ok(())
1213 }
1214
1215 pub async fn walk_parallel(&self, cid: &Cid, path: &str, concurrency: usize) -> Result<Vec<WalkEntry>, HashTreeError> {
1218 self.walk_parallel_with_progress(cid, path, concurrency, None).await
1219 }
1220
1221 pub async fn walk_parallel_with_progress(
1228 &self,
1229 cid: &Cid,
1230 path: &str,
1231 concurrency: usize,
1232 progress: Option<&std::sync::atomic::AtomicUsize>,
1233 ) -> Result<Vec<WalkEntry>, HashTreeError> {
1234 use futures::stream::{FuturesUnordered, StreamExt};
1235 use std::collections::VecDeque;
1236 use std::sync::atomic::Ordering;
1237
1238 let mut entries = Vec::new();
1239 let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
1240 let mut active = FuturesUnordered::new();
1241
1242 pending.push_back((cid.clone(), path.to_string()));
1244
1245 loop {
1246 while active.len() < concurrency {
1248 if let Some((node_cid, node_path)) = pending.pop_front() {
1249 let store = &self.store;
1250 let fut = async move {
1251 let data = store.get(&node_cid.hash).await
1252 .map_err(|e| HashTreeError::Store(e.to_string()))?;
1253 Ok::<_, HashTreeError>((node_cid, node_path, data))
1254 };
1255 active.push(fut);
1256 } else {
1257 break;
1258 }
1259 }
1260
1261 if active.is_empty() {
1263 break;
1264 }
1265
1266 if let Some(result) = active.next().await {
1268 let (node_cid, node_path, data) = result?;
1269
1270 if let Some(counter) = progress {
1272 counter.fetch_add(1, Ordering::Relaxed);
1273 }
1274
1275 let data = match data {
1276 Some(d) => d,
1277 None => continue,
1278 };
1279
1280 let data = if let Some(key) = &node_cid.key {
1282 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1283 } else {
1284 data
1285 };
1286
1287 let node = match try_decode_tree_node(&data) {
1288 Some(n) => n,
1289 None => {
1290 entries.push(WalkEntry {
1292 path: node_path,
1293 hash: node_cid.hash,
1294 link_type: LinkType::Blob,
1295 size: data.len() as u64,
1296 key: node_cid.key,
1297 });
1298 continue;
1299 }
1300 };
1301
1302 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1304 entries.push(WalkEntry {
1305 path: node_path.clone(),
1306 hash: node_cid.hash,
1307 link_type: node.node_type,
1308 size: node_size,
1309 key: node_cid.key,
1310 });
1311
1312 for link in &node.links {
1314 let child_path = match &link.name {
1315 Some(name) => {
1316 if name.starts_with("_chunk_") || name.starts_with('_') {
1317 let sub_cid = Cid { hash: link.hash, key: node_cid.key, size: 0 };
1319 pending.push_back((sub_cid, node_path.clone()));
1320 continue;
1321 }
1322 if node_path.is_empty() {
1323 name.clone()
1324 } else {
1325 format!("{}/{}", node_path, name)
1326 }
1327 }
1328 None => node_path.clone(),
1329 };
1330
1331 if link.link_type == LinkType::Blob {
1334 entries.push(WalkEntry {
1335 path: child_path,
1336 hash: link.hash,
1337 link_type: LinkType::Blob,
1338 size: link.size,
1339 key: link.key,
1340 });
1341 if let Some(counter) = progress {
1342 counter.fetch_add(1, Ordering::Relaxed);
1343 }
1344 continue;
1345 }
1346
1347 let child_cid = Cid { hash: link.hash, key: link.key, size: link.size };
1349 pending.push_back((child_cid, child_path));
1350 }
1351 }
1352 }
1353
1354 Ok(entries)
1355 }
1356
1357 pub fn walk_stream(
1359 &self,
1360 cid: Cid,
1361 initial_path: String,
1362 ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
1363 Box::pin(stream::unfold(
1364 WalkStreamState::Init { cid, path: initial_path, tree: self },
1365 |state| async move {
1366 match state {
1367 WalkStreamState::Init { cid, path, tree } => {
1368 let data = match tree.store.get(&cid.hash).await {
1369 Ok(Some(d)) => d,
1370 Ok(None) => return None,
1371 Err(e) => {
1372 return Some((
1373 Err(HashTreeError::Store(e.to_string())),
1374 WalkStreamState::Done,
1375 ))
1376 }
1377 };
1378
1379 let data = if let Some(key) = &cid.key {
1381 match decrypt_chk(&data, key) {
1382 Ok(d) => d,
1383 Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), WalkStreamState::Done)),
1384 }
1385 } else {
1386 data
1387 };
1388
1389 let node = match try_decode_tree_node(&data) {
1390 Some(n) => n,
1391 None => {
1392 let entry = WalkEntry {
1394 path,
1395 hash: cid.hash,
1396 link_type: LinkType::Blob,
1397 size: data.len() as u64,
1398 key: cid.key,
1399 };
1400 return Some((Ok(entry), WalkStreamState::Done));
1401 }
1402 };
1403
1404 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1405 let entry = WalkEntry {
1406 path: path.clone(),
1407 hash: cid.hash,
1408 link_type: node.node_type,
1409 size: node_size,
1410 key: cid.key,
1411 };
1412
1413 let mut stack: Vec<WalkStackItem> = Vec::new();
1415 for link in node.links.into_iter().rev() {
1416 let child_path = match &link.name {
1417 Some(name) if !name.starts_with('_') => {
1418 if path.is_empty() {
1419 name.clone()
1420 } else {
1421 format!("{}/{}", path, name)
1422 }
1423 }
1424 _ => path.clone(),
1425 };
1426 stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
1428 }
1429
1430 Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
1431 }
1432 WalkStreamState::Processing { mut stack, tree } => {
1433 tree.process_walk_stack(&mut stack).await
1434 }
1435 WalkStreamState::Done => None,
1436 }
1437 },
1438 ))
1439 }
1440
1441 async fn process_walk_stack<'a>(
1442 &'a self,
1443 stack: &mut Vec<WalkStackItem>,
1444 ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
1445 while let Some(item) = stack.pop() {
1446 let data = match self.store.get(&item.hash).await {
1447 Ok(Some(d)) => d,
1448 Ok(None) => continue,
1449 Err(e) => {
1450 return Some((
1451 Err(HashTreeError::Store(e.to_string())),
1452 WalkStreamState::Done,
1453 ))
1454 }
1455 };
1456
1457 let node = match try_decode_tree_node(&data) {
1458 Some(n) => n,
1459 None => {
1460 let entry = WalkEntry {
1462 path: item.path,
1463 hash: item.hash,
1464 link_type: LinkType::Blob,
1465 size: data.len() as u64,
1466 key: item.key,
1467 };
1468 return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1469 }
1470 };
1471
1472 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1473 let entry = WalkEntry {
1474 path: item.path.clone(),
1475 hash: item.hash,
1476 link_type: node.node_type,
1477 size: node_size,
1478 key: None, };
1480
1481 for link in node.links.into_iter().rev() {
1483 let child_path = match &link.name {
1484 Some(name) if !name.starts_with('_') => {
1485 if item.path.is_empty() {
1486 name.clone()
1487 } else {
1488 format!("{}/{}", item.path, name)
1489 }
1490 }
1491 _ => item.path.clone(),
1492 };
1493 stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
1494 }
1495
1496 return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1497 }
1498 None
1499 }
1500
1501 pub async fn set_entry(
1506 &self,
1507 root: &Cid,
1508 path: &[&str],
1509 name: &str,
1510 entry_cid: &Cid,
1511 link_type: LinkType,
1512 ) -> Result<Cid, HashTreeError> {
1513 let dir_cid = self.resolve_path_array(root, path).await?;
1514 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1515
1516 let entries = self.list_directory(&dir_cid).await?;
1517 let mut new_entries: Vec<DirEntry> = entries
1518 .into_iter()
1519 .filter(|e| e.name != name)
1520 .map(|e| DirEntry {
1521 name: e.name,
1522 hash: e.hash,
1523 size: e.size,
1524 key: e.key,
1525 link_type: e.link_type,
1526 meta: e.meta,
1527 })
1528 .collect();
1529
1530 new_entries.push(DirEntry {
1531 name: name.to_string(),
1532 hash: entry_cid.hash,
1533 size: entry_cid.size,
1534 key: entry_cid.key,
1535 link_type,
1536 meta: None,
1537 });
1538
1539 let new_dir_cid = self.put_directory(new_entries).await?;
1540 self.rebuild_path(root, path, new_dir_cid).await
1541 }
1542
1543 pub async fn remove_entry(
1546 &self,
1547 root: &Cid,
1548 path: &[&str],
1549 name: &str,
1550 ) -> Result<Cid, HashTreeError> {
1551 let dir_cid = self.resolve_path_array(root, path).await?;
1552 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1553
1554 let entries = self.list_directory(&dir_cid).await?;
1555 let new_entries: Vec<DirEntry> = entries
1556 .into_iter()
1557 .filter(|e| e.name != name)
1558 .map(|e| DirEntry {
1559 name: e.name,
1560 hash: e.hash,
1561 size: e.size,
1562 key: e.key,
1563 link_type: e.link_type,
1564 meta: e.meta,
1565 })
1566 .collect();
1567
1568 let new_dir_cid = self.put_directory(new_entries).await?;
1569 self.rebuild_path(root, path, new_dir_cid).await
1570 }
1571
1572 pub async fn rename_entry(
1575 &self,
1576 root: &Cid,
1577 path: &[&str],
1578 old_name: &str,
1579 new_name: &str,
1580 ) -> Result<Cid, HashTreeError> {
1581 if old_name == new_name {
1582 return Ok(root.clone());
1583 }
1584
1585 let dir_cid = self.resolve_path_array(root, path).await?;
1586 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1587
1588 let entries = self.list_directory(&dir_cid).await?;
1589 let entry = entries
1590 .iter()
1591 .find(|e| e.name == old_name)
1592 .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1593
1594 let entry_hash = entry.hash;
1595 let entry_size = entry.size;
1596 let entry_key = entry.key;
1597 let entry_link_type = entry.link_type;
1598 let entry_meta = entry.meta.clone();
1599
1600 let new_entries: Vec<DirEntry> = entries
1601 .into_iter()
1602 .filter(|e| e.name != old_name)
1603 .map(|e| DirEntry {
1604 name: e.name,
1605 hash: e.hash,
1606 size: e.size,
1607 key: e.key,
1608 link_type: e.link_type,
1609 meta: e.meta,
1610 })
1611 .chain(std::iter::once(DirEntry {
1612 name: new_name.to_string(),
1613 hash: entry_hash,
1614 size: entry_size,
1615 key: entry_key,
1616 link_type: entry_link_type,
1617 meta: entry_meta,
1618 }))
1619 .collect();
1620
1621 let new_dir_cid = self.put_directory(new_entries).await?;
1622 self.rebuild_path(root, path, new_dir_cid).await
1623 }
1624
1625 pub async fn move_entry(
1628 &self,
1629 root: &Cid,
1630 source_path: &[&str],
1631 name: &str,
1632 target_path: &[&str],
1633 ) -> Result<Cid, HashTreeError> {
1634 let source_dir_cid = self.resolve_path_array(root, source_path).await?;
1635 let source_dir_cid = source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
1636
1637 let source_entries = self.list_directory(&source_dir_cid).await?;
1638 let entry = source_entries
1639 .iter()
1640 .find(|e| e.name == name)
1641 .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
1642
1643 let entry_cid = Cid {
1644 hash: entry.hash,
1645 key: entry.key,
1646 size: entry.size,
1647 };
1648 let entry_link_type = entry.link_type;
1649
1650 let new_root = self.remove_entry(root, source_path, name).await?;
1652
1653 self.set_entry(&new_root, target_path, name, &entry_cid, entry_link_type).await
1655 }
1656
1657 async fn resolve_path_array(&self, root: &Cid, path: &[&str]) -> Result<Option<Cid>, HashTreeError> {
1658 if path.is_empty() {
1659 return Ok(Some(root.clone()));
1660 }
1661 self.resolve_path(root, &path.join("/")).await
1662 }
1663
1664 async fn rebuild_path(
1665 &self,
1666 root: &Cid,
1667 path: &[&str],
1668 new_child: Cid,
1669 ) -> Result<Cid, HashTreeError> {
1670 if path.is_empty() {
1671 return Ok(new_child);
1672 }
1673
1674 let mut child_cid = new_child;
1675 let parts: Vec<&str> = path.to_vec();
1676
1677 for i in (0..parts.len()).rev() {
1678 let child_name = parts[i];
1679 let parent_path = &parts[..i];
1680
1681 let parent_cid = if parent_path.is_empty() {
1682 root.clone()
1683 } else {
1684 self.resolve_path_array(root, parent_path)
1685 .await?
1686 .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
1687 };
1688
1689 let parent_entries = self.list_directory(&parent_cid).await?;
1690 let new_parent_entries: Vec<DirEntry> = parent_entries
1691 .into_iter()
1692 .map(|e| {
1693 if e.name == child_name {
1694 DirEntry {
1695 name: e.name,
1696 hash: child_cid.hash,
1697 size: child_cid.size,
1698 key: child_cid.key,
1699 link_type: e.link_type,
1700 meta: e.meta,
1701 }
1702 } else {
1703 DirEntry {
1704 name: e.name,
1705 hash: e.hash,
1706 size: e.size,
1707 key: e.key,
1708 link_type: e.link_type,
1709 meta: e.meta,
1710 }
1711 }
1712 })
1713 .collect();
1714
1715 child_cid = self.put_directory(new_parent_entries).await?;
1716 }
1717
1718 Ok(child_cid)
1719 }
1720
1721 pub fn get_store(&self) -> Arc<S> {
1725 self.store.clone()
1726 }
1727
1728 pub fn chunk_size(&self) -> usize {
1730 self.chunk_size
1731 }
1732
1733 pub fn max_links(&self) -> usize {
1735 self.max_links
1736 }
1737}
1738
1739enum StreamStackItem {
1742 Hash(Hash),
1743}
1744
1745enum ReadStreamState<'a, S: Store> {
1746 Init { hash: Hash, tree: &'a HashTree<S> },
1747 Processing { stack: Vec<StreamStackItem>, tree: &'a HashTree<S> },
1748 Done,
1749}
1750
1751struct WalkStackItem {
1752 hash: Hash,
1753 path: String,
1754 key: Option<[u8; 32]>,
1755}
1756
1757enum WalkStreamState<'a, S: Store> {
1758 Init { cid: Cid, path: String, tree: &'a HashTree<S> },
1759 Processing { stack: Vec<WalkStackItem>, tree: &'a HashTree<S> },
1760 Done,
1761}
1762
1763struct EncryptedStackItem {
1765 hash: Hash,
1766 key: Option<[u8; 32]>,
1767}
1768
1769enum EncryptedStreamState<'a, S: Store> {
1770 Init { hash: Hash, key: [u8; 32], tree: &'a HashTree<S> },
1771 Processing { stack: Vec<EncryptedStackItem>, tree: &'a HashTree<S> },
1772 Done,
1773}
1774
1775pub async fn verify_tree<S: Store>(
1777 store: Arc<S>,
1778 root_hash: &Hash,
1779) -> Result<crate::reader::VerifyResult, HashTreeError> {
1780 let mut missing = Vec::new();
1781 let mut visited = std::collections::HashSet::new();
1782
1783 verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
1784
1785 Ok(crate::reader::VerifyResult {
1786 valid: missing.is_empty(),
1787 missing,
1788 })
1789}
1790
1791async fn verify_recursive<S: Store>(
1792 store: Arc<S>,
1793 hash: &Hash,
1794 missing: &mut Vec<Hash>,
1795 visited: &mut std::collections::HashSet<String>,
1796) -> Result<(), HashTreeError> {
1797 let hex = to_hex(hash);
1798 if visited.contains(&hex) {
1799 return Ok(());
1800 }
1801 visited.insert(hex);
1802
1803 let data = match store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1804 Some(d) => d,
1805 None => {
1806 missing.push(*hash);
1807 return Ok(());
1808 }
1809 };
1810
1811 if is_tree_node(&data) {
1812 let node = decode_tree_node(&data)?;
1813 for link in &node.links {
1814 Box::pin(verify_recursive(store.clone(), &link.hash, missing, visited)).await?;
1815 }
1816 }
1817
1818 Ok(())
1819}
1820
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;

    /// Builds a public (unencrypted) tree over a fresh in-memory store and
    /// returns the store alongside it.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let config = HashTreeConfig::new(Arc::clone(&store)).public();
        (store, HashTree::new(config))
    }

    /// Builds a tree with the `HashTreeConfig::new` defaults (encryption
    /// enabled), optionally overriding the chunk size.
    fn make_encrypted_tree(chunk_size: Option<usize>) -> HashTree<MemoryStore> {
        let config = HashTreeConfig::new(Arc::new(MemoryStore::new()));
        let config = match chunk_size {
            Some(size) => config.with_chunk_size(size),
            None => config,
        };
        HashTree::new(config)
    }

    #[tokio::test]
    async fn test_put_and_read_blob() {
        let (_store, tree) = make_tree();
        let payload = vec![1, 2, 3, 4, 5];

        let blob_hash = tree.put_blob(&payload).await.unwrap();
        let fetched = tree.get_blob(&blob_hash).await.unwrap();

        assert_eq!(fetched, Some(payload));
    }

    #[tokio::test]
    async fn test_put_and_read_file_small() {
        let (_store, tree) = make_tree();
        let contents = b"Hello, World!";

        let stored = tree.put_file(contents).await.unwrap();
        assert_eq!(stored.size, contents.len() as u64);

        let read_back = tree.read_file(&stored.hash).await.unwrap();
        assert_eq!(read_back, Some(contents.to_vec()));
    }

    #[tokio::test]
    async fn test_put_and_read_directory() {
        let (_store, tree) = make_tree();
        let first = tree.put_blob(b"content1").await.unwrap();
        let second = tree.put_blob(b"content2").await.unwrap();

        let listing_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", first).with_size(8),
                DirEntry::new("b.txt", second).with_size(8),
            ])
            .await
            .unwrap();

        let listing = tree.list_directory(&listing_cid).await.unwrap();
        assert_eq!(listing.len(), 2);

        let names: Vec<&str> = listing.iter().map(|entry| entry.name.as_str()).collect();
        assert!(names.contains(&"a.txt"));
        assert!(names.contains(&"b.txt"));
    }

    #[tokio::test]
    async fn test_is_directory() {
        let (_store, tree) = make_tree();
        let blob_hash = tree.put_blob(b"data").await.unwrap();
        let empty_dir = tree.put_directory(vec![]).await.unwrap();

        assert!(!tree.is_directory(&blob_hash).await.unwrap());
        assert!(tree.is_directory(&empty_dir.hash).await.unwrap());
    }

    #[tokio::test]
    async fn test_resolve_path() {
        let (_store, tree) = make_tree();
        let leaf_hash = tree.put_blob(b"nested").await.unwrap();

        let inner = tree
            .put_directory(vec![DirEntry::new("file.txt", leaf_hash).with_size(6)])
            .await
            .unwrap();
        let outer = tree
            .put_directory(vec![DirEntry::new("subdir", inner.hash)])
            .await
            .unwrap();

        let found = tree.resolve_path(&outer, "subdir/file.txt").await.unwrap();
        assert_eq!(found.map(|cid| cid.hash), Some(leaf_hash));
    }

    #[tokio::test]
    async fn test_unified_put_get_public() {
        let (_store, tree) = make_tree();
        let data = b"Hello, public world!";

        let cid = tree.put(data).await.unwrap();
        assert_eq!(cid.size, data.len() as u64);
        // A public tree attaches no key to the CID.
        assert!(cid.key.is_none());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_unified_put_get_encrypted() {
        let tree = make_encrypted_tree(None);
        let data = b"Hello, encrypted world!";

        let cid = tree.put(data).await.unwrap();
        assert_eq!(cid.size, data.len() as u64);
        // An encrypted tree must hand back the key needed to read the content.
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_unified_put_get_encrypted_chunked() {
        let tree = make_encrypted_tree(Some(100));
        // 500 bytes with a chunk size of 100 forces the chunked path.
        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();

        let cid = tree.put(&data).await.unwrap();
        assert_eq!(cid.size, data.len() as u64);
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[tokio::test]
    async fn test_cid_deterministic() {
        let tree = make_encrypted_tree(None);
        let data = b"Same content produces same CID";

        let first = tree.put(data).await.unwrap();
        let second = tree.put(data).await.unwrap();

        // Identical content must yield an identical hash, key and string form.
        assert_eq!(first.hash, second.hash);
        assert_eq!(first.key, second.key);
        assert_eq!(first.to_string(), second.to_string());
    }

    #[tokio::test]
    async fn test_cid_to_string_public() {
        let (_store, tree) = make_tree();

        let rendered = tree.put(b"test").await.unwrap().to_string();

        // Hash only: 64 characters and no ':' separator.
        assert_eq!(rendered.len(), 64);
        assert!(!rendered.contains(':'));
    }

    #[tokio::test]
    async fn test_cid_to_string_encrypted() {
        let tree = make_encrypted_tree(None);

        let rendered = tree.put(b"test").await.unwrap().to_string();

        // Hash and key joined by ':' — 129 characters in total.
        assert_eq!(rendered.len(), 129);
        assert!(rendered.contains(':'));
    }
}