1use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::stream::{self, Stream};
10use futures::io::AsyncRead;
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node};
15use crate::hash::sha256;
16use crate::reader::{ReaderError, TreeEntry, WalkEntry};
17use crate::store::Store;
18use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
19
20use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
21
22#[derive(Clone)]
24pub struct HashTreeConfig<S: Store> {
25 pub store: Arc<S>,
26 pub chunk_size: usize,
27 pub max_links: usize,
28 pub encrypted: bool,
30}
31
32impl<S: Store> HashTreeConfig<S> {
33 pub fn new(store: Arc<S>) -> Self {
34 Self {
35 store,
36 chunk_size: DEFAULT_CHUNK_SIZE,
37 max_links: DEFAULT_MAX_LINKS,
38 encrypted: true,
39 }
40 }
41
42 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
43 self.chunk_size = chunk_size;
44 self
45 }
46
47 pub fn with_max_links(mut self, max_links: usize) -> Self {
48 self.max_links = max_links;
49 self
50 }
51
52 pub fn public(mut self) -> Self {
54 self.encrypted = false;
55 self
56 }
57}
58
59#[derive(Debug, thiserror::Error)]
61pub enum HashTreeError {
62 #[error("Store error: {0}")]
63 Store(String),
64 #[error("Codec error: {0}")]
65 Codec(#[from] crate::codec::CodecError),
66 #[error("Missing chunk: {0}")]
67 MissingChunk(String),
68 #[error("Path not found: {0}")]
69 PathNotFound(String),
70 #[error("Entry not found: {0}")]
71 EntryNotFound(String),
72 #[error("Encryption error: {0}")]
73 Encryption(String),
74 #[error("Decryption error: {0}")]
75 Decryption(String),
76}
77
78impl From<BuilderError> for HashTreeError {
79 fn from(e: BuilderError) -> Self {
80 match e {
81 BuilderError::Store(s) => HashTreeError::Store(s),
82 BuilderError::Codec(c) => HashTreeError::Codec(c),
83 BuilderError::Encryption(s) => HashTreeError::Encryption(s),
84 }
85 }
86}
87
88impl From<ReaderError> for HashTreeError {
89 fn from(e: ReaderError) -> Self {
90 match e {
91 ReaderError::Store(s) => HashTreeError::Store(s),
92 ReaderError::Codec(c) => HashTreeError::Codec(c),
93 ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
94 ReaderError::Decryption(s) => HashTreeError::Encryption(s),
95 ReaderError::MissingKey => HashTreeError::Encryption("missing decryption key".to_string()),
96 }
97 }
98}
99
100pub struct HashTree<S: Store> {
102 store: Arc<S>,
103 chunk_size: usize,
104 max_links: usize,
105 encrypted: bool,
106}
107
108impl<S: Store> HashTree<S> {
109 pub fn new(config: HashTreeConfig<S>) -> Self {
110 Self {
111 store: config.store,
112 chunk_size: config.chunk_size,
113 max_links: config.max_links,
114 encrypted: config.encrypted,
115 }
116 }
117
118 pub fn is_encrypted(&self) -> bool {
120 self.encrypted
121 }
122
123 pub async fn put(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
128 let size = data.len() as u64;
129
130 if data.len() <= self.chunk_size {
132 let (hash, key) = self.put_chunk_internal(data).await?;
133 return Ok((Cid { hash, key }, size));
134 }
135
136 let mut links: Vec<Link> = Vec::new();
138 let mut offset = 0;
139
140 while offset < data.len() {
141 let end = (offset + self.chunk_size).min(data.len());
142 let chunk = &data[offset..end];
143 let chunk_size = chunk.len() as u64;
144 let (hash, key) = self.put_chunk_internal(chunk).await?;
145 links.push(Link {
146 hash,
147 name: None,
148 size: chunk_size,
149 key,
150 link_type: LinkType::Blob, meta: None,
152 });
153 offset = end;
154 }
155
156 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
158 Ok((Cid { hash: root_hash, key: root_key }, size))
159 }
160
161 pub async fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
163 if let Some(key) = cid.key {
164 self.get_encrypted(&cid.hash, &key).await
165 } else {
166 self.read_file(&cid.hash).await
167 }
168 }
169
170 pub async fn put_stream<R: AsyncRead + Unpin>(&self, mut reader: R) -> Result<(Cid, u64), HashTreeError> {
176 let mut buffer = vec![0u8; self.chunk_size];
177 let mut links = Vec::new();
178 let mut total_size: u64 = 0;
179 let mut consistent_key: Option<[u8; 32]> = None;
180
181 loop {
182 let mut chunk = Vec::new();
183 let mut bytes_read = 0;
184
185 while bytes_read < self.chunk_size {
187 let n = reader.read(&mut buffer[..self.chunk_size - bytes_read]).await
188 .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
189 if n == 0 {
190 break; }
192 chunk.extend_from_slice(&buffer[..n]);
193 bytes_read += n;
194 }
195
196 if chunk.is_empty() {
197 break; }
199
200 let chunk_len = chunk.len() as u64;
201 total_size += chunk_len;
202
203 let (hash, key) = self.put_chunk_internal(&chunk).await?;
204
205 if links.is_empty() {
207 consistent_key = key;
208 } else if consistent_key != key {
209 consistent_key = None;
210 }
211
212 links.push(Link {
213 hash,
214 name: None,
215 size: chunk_len,
216 key,
217 link_type: LinkType::Blob, meta: None,
219 });
220 }
221
222 if links.is_empty() {
223 let (hash, key) = self.put_chunk_internal(&[]).await?;
225 return Ok((Cid { hash, key }, 0));
226 }
227
228 let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
230 Ok((Cid { hash: root_hash, key: root_key }, total_size))
231 }
232
233 pub fn get_stream(
238 &self,
239 cid: &Cid,
240 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
241 let hash = cid.hash;
242 let key = cid.key;
243
244 if let Some(k) = key {
245 Box::pin(self.read_file_stream_encrypted(hash, k))
247 } else {
248 self.read_file_stream(hash)
250 }
251 }
252
253 fn read_file_stream_encrypted(
255 &self,
256 hash: Hash,
257 key: EncryptionKey,
258 ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
259 stream::unfold(
260 EncryptedStreamState::Init { hash, key, tree: self },
261 |state| async move {
262 match state {
263 EncryptedStreamState::Init { hash, key, tree } => {
264 let data = match tree.store.get(&hash).await {
265 Ok(Some(d)) => d,
266 Ok(None) => return None,
267 Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), EncryptedStreamState::Done)),
268 };
269
270 let decrypted = match decrypt_chk(&data, &key) {
272 Ok(d) => d,
273 Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), EncryptedStreamState::Done)),
274 };
275
276 if !is_tree_node(&decrypted) {
277 return Some((Ok(decrypted), EncryptedStreamState::Done));
279 }
280
281 let node = match decode_tree_node(&decrypted) {
283 Ok(n) => n,
284 Err(e) => return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done)),
285 };
286
287 let mut stack: Vec<EncryptedStackItem> = Vec::new();
288 for link in node.links.into_iter().rev() {
289 stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
290 }
291
292 tree.process_encrypted_stream_stack(&mut stack).await
293 }
294 EncryptedStreamState::Processing { mut stack, tree } => {
295 tree.process_encrypted_stream_stack(&mut stack).await
296 }
297 EncryptedStreamState::Done => None,
298 }
299 },
300 )
301 }
302
303 async fn process_encrypted_stream_stack<'a>(
304 &'a self,
305 stack: &mut Vec<EncryptedStackItem>,
306 ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
307 while let Some(item) = stack.pop() {
308 let data = match self.store.get(&item.hash).await {
309 Ok(Some(d)) => d,
310 Ok(None) => {
311 return Some((
312 Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
313 EncryptedStreamState::Done,
314 ))
315 }
316 Err(e) => {
317 return Some((
318 Err(HashTreeError::Store(e.to_string())),
319 EncryptedStreamState::Done,
320 ))
321 }
322 };
323
324 let decrypted = if let Some(key) = item.key {
326 match decrypt_chk(&data, &key) {
327 Ok(d) => d,
328 Err(e) => {
329 return Some((
330 Err(HashTreeError::Decryption(e.to_string())),
331 EncryptedStreamState::Done,
332 ))
333 }
334 }
335 } else {
336 data
337 };
338
339 if is_tree_node(&decrypted) {
340 let node = match decode_tree_node(&decrypted) {
342 Ok(n) => n,
343 Err(e) => {
344 return Some((
345 Err(HashTreeError::Codec(e)),
346 EncryptedStreamState::Done,
347 ))
348 }
349 };
350 for link in node.links.into_iter().rev() {
351 stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
352 }
353 } else {
354 return Some((
356 Ok(decrypted),
357 EncryptedStreamState::Processing { stack: std::mem::take(stack), tree: self },
358 ));
359 }
360 }
361 None
362 }
363
364 async fn put_chunk_internal(&self, data: &[u8]) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
366 if self.encrypted {
367 let (encrypted, key) = encrypt_chk(data)
368 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
369 let hash = sha256(&encrypted);
370 self.store
371 .put(hash, encrypted)
372 .await
373 .map_err(|e| HashTreeError::Store(e.to_string()))?;
374 Ok((hash, Some(key)))
375 } else {
376 let hash = self.put_blob(data).await?;
377 Ok((hash, None))
378 }
379 }
380
381 async fn build_tree_internal(
383 &self,
384 links: Vec<Link>,
385 total_size: Option<u64>,
386 ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
387 if links.len() == 1 {
389 if let Some(ts) = total_size {
390 if links[0].size == ts {
391 return Ok((links[0].hash, links[0].key));
392 }
393 }
394 }
395
396 if links.len() <= self.max_links {
397 let node = TreeNode {
398 node_type: LinkType::File,
399 links,
400 };
401 let (data, _) = encode_and_hash(&node)?;
402
403 if self.encrypted {
404 let (encrypted, key) = encrypt_chk(&data)
405 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
406 let hash = sha256(&encrypted);
407 self.store
408 .put(hash, encrypted)
409 .await
410 .map_err(|e| HashTreeError::Store(e.to_string()))?;
411 return Ok((hash, Some(key)));
412 }
413
414 let hash = sha256(&data);
416 self.store
417 .put(hash, data)
418 .await
419 .map_err(|e| HashTreeError::Store(e.to_string()))?;
420 return Ok((hash, None));
421 }
422
423 let mut sub_links = Vec::new();
425 for batch in links.chunks(self.max_links) {
426 let batch_size: u64 = batch.iter().map(|l| l.size).sum();
427 let (hash, key) = Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
428 sub_links.push(Link {
429 hash,
430 name: None,
431 size: batch_size,
432 key,
433 link_type: LinkType::File, meta: None,
435 });
436 }
437
438 Box::pin(self.build_tree_internal(sub_links, total_size)).await
439 }
440
441 async fn get_encrypted(
443 &self,
444 hash: &Hash,
445 key: &EncryptionKey,
446 ) -> Result<Option<Vec<u8>>, HashTreeError> {
447 let encrypted_data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
448 Some(d) => d,
449 None => return Ok(None),
450 };
451
452 let decrypted = decrypt_chk(&encrypted_data, key)
454 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
455
456 if is_tree_node(&decrypted) {
458 let node = decode_tree_node(&decrypted)?;
459 let assembled = self.assemble_encrypted_chunks(&node).await?;
460 return Ok(Some(assembled));
461 }
462
463 Ok(Some(decrypted))
465 }
466
467 async fn assemble_encrypted_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
469 let mut parts: Vec<Vec<u8>> = Vec::new();
470
471 for link in &node.links {
472 let chunk_key = link.key.ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
473
474 let encrypted_child = self
475 .store
476 .get(&link.hash)
477 .await
478 .map_err(|e| HashTreeError::Store(e.to_string()))?
479 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
480
481 let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
482 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
483
484 if is_tree_node(&decrypted) {
485 let child_node = decode_tree_node(&decrypted)?;
487 let child_data = Box::pin(self.assemble_encrypted_chunks(&child_node)).await?;
488 parts.push(child_data);
489 } else {
490 parts.push(decrypted);
492 }
493 }
494
495 let total_len: usize = parts.iter().map(|p| p.len()).sum();
496 let mut result = Vec::with_capacity(total_len);
497 for part in parts {
498 result.extend_from_slice(&part);
499 }
500
501 Ok(result)
502 }
503
504 pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
509 let hash = sha256(data);
510 self.store
511 .put(hash, data.to_vec())
512 .await
513 .map_err(|e| HashTreeError::Store(e.to_string()))?;
514 Ok(hash)
515 }
516
517 pub async fn put_file(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
520 let size = data.len() as u64;
521
522 if data.len() <= self.chunk_size {
524 let (hash, key) = self.put_chunk_internal(data).await?;
525 return Ok((Cid { hash, key }, size));
526 }
527
528 let mut links: Vec<Link> = Vec::new();
530 let mut offset = 0;
531
532 while offset < data.len() {
533 let end = (offset + self.chunk_size).min(data.len());
534 let chunk = &data[offset..end];
535 let chunk_size = (end - offset) as u64;
536
537 let (hash, key) = self.put_chunk_internal(chunk).await?;
538 links.push(Link {
539 hash,
540 name: None,
541 size: chunk_size,
542 key,
543 link_type: LinkType::Blob, meta: None,
545 });
546 offset = end;
547 }
548
549 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
551 Ok((Cid { hash: root_hash, key: root_key }, size))
552 }
553
554 pub async fn put_directory(
560 &self,
561 entries: Vec<DirEntry>,
562 ) -> Result<Cid, HashTreeError> {
563 let mut sorted = entries;
565 sorted.sort_by(|a, b| a.name.cmp(&b.name));
566
567 let links: Vec<Link> = sorted
568 .into_iter()
569 .map(|e| Link {
570 hash: e.hash,
571 name: Some(e.name),
572 size: e.size,
573 key: e.key,
574 link_type: e.link_type,
575 meta: e.meta,
576 })
577 .collect();
578
579 let node = TreeNode {
581 node_type: LinkType::Dir,
582 links,
583 };
584 let (data, _plain_hash) = encode_and_hash(&node)?;
585
586 let (cid, _size) = self.put(&data).await?;
591 Ok(cid)
592 }
593
594 pub async fn put_tree_node(
596 &self,
597 links: Vec<Link>,
598 ) -> Result<Hash, HashTreeError> {
599 let node = TreeNode {
600 node_type: LinkType::Dir,
601 links,
602 };
603
604 let (data, hash) = encode_and_hash(&node)?;
605 self.store
606 .put(hash, data)
607 .await
608 .map_err(|e| HashTreeError::Store(e.to_string()))?;
609 Ok(hash)
610 }
611
612 pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
616 self.store
617 .get(hash)
618 .await
619 .map_err(|e| HashTreeError::Store(e.to_string()))
620 }
621
622 pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
624 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
625 Some(d) => d,
626 None => return Ok(None),
627 };
628
629 if !is_tree_node(&data) {
630 return Ok(None);
631 }
632
633 let node = decode_tree_node(&data)?;
634 Ok(Some(node))
635 }
636
637 pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
639 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
640 Some(d) => d,
641 None => return Ok(None),
642 };
643
644 let decrypted = if let Some(key) = &cid.key {
646 decrypt_chk(&data, key)
647 .map_err(|e| HashTreeError::Decryption(e.to_string()))?
648 } else {
649 data
650 };
651
652 if !is_tree_node(&decrypted) {
653 return Ok(None);
654 }
655
656 let node = decode_tree_node(&decrypted)?;
657 Ok(Some(node))
658 }
659
660 pub async fn get_directory_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
663 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
664 Some(d) => d,
665 None => return Ok(None),
666 };
667
668 let decrypted = if let Some(key) = &cid.key {
670 decrypt_chk(&data, key)
671 .map_err(|e| HashTreeError::Decryption(e.to_string()))?
672 } else {
673 data
674 };
675
676 if !is_tree_node(&decrypted) {
677 return Ok(None);
678 }
679
680 let node = decode_tree_node(&decrypted)?;
681
682 if node.node_type == LinkType::File {
684 let assembled = self.assemble_chunks(&node).await?;
685 if is_tree_node(&assembled) {
686 let inner_node = decode_tree_node(&assembled)?;
687 return Ok(Some(inner_node));
688 }
689 }
690
691 Ok(Some(node))
692 }
693
694 pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
696 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
697 Some(d) => d,
698 None => return Ok(false),
699 };
700 Ok(is_tree_node(&data))
701 }
702
703 pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
705 let node = match self.get_node(cid).await? {
706 Some(n) => n,
707 None => return Ok(false),
708 };
709 Ok(node.links.iter().any(|l| l.name.as_ref().map(|n| !n.starts_with('_')).unwrap_or(false)))
711 }
712
713 pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
715 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
716 Some(d) => d,
717 None => return Ok(false),
718 };
719 Ok(is_directory_node(&data))
720 }
721
722 pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
724 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
725 Some(d) => d,
726 None => return Ok(None),
727 };
728
729 if !is_tree_node(&data) {
731 return Ok(Some(data));
732 }
733
734 let node = decode_tree_node(&data)?;
736 let assembled = self.assemble_chunks(&node).await?;
737 Ok(Some(assembled))
738 }
739
740 async fn assemble_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
742 let mut parts: Vec<Vec<u8>> = Vec::new();
743
744 for link in &node.links {
745 let child_data = self
746 .store
747 .get(&link.hash)
748 .await
749 .map_err(|e| HashTreeError::Store(e.to_string()))?
750 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
751
752 if is_tree_node(&child_data) {
753 let child_node = decode_tree_node(&child_data)?;
754 parts.push(Box::pin(self.assemble_chunks(&child_node)).await?);
755 } else {
756 parts.push(child_data);
757 }
758 }
759
760 let total_length: usize = parts.iter().map(|p| p.len()).sum();
762 let mut result = Vec::with_capacity(total_length);
763 for part in parts {
764 result.extend_from_slice(&part);
765 }
766
767 Ok(result)
768 }
769
770 pub fn read_file_stream(
773 &self,
774 hash: Hash,
775 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
776 Box::pin(stream::unfold(
777 ReadStreamState::Init { hash, tree: self },
778 |state| async move {
779 match state {
780 ReadStreamState::Init { hash, tree } => {
781 let data = match tree.store.get(&hash).await {
782 Ok(Some(d)) => d,
783 Ok(None) => return None,
784 Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), ReadStreamState::Done)),
785 };
786
787 if !is_tree_node(&data) {
788 return Some((Ok(data), ReadStreamState::Done));
790 }
791
792 let node = match decode_tree_node(&data) {
794 Ok(n) => n,
795 Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
796 };
797
798 let mut stack: Vec<StreamStackItem> = Vec::new();
800 for link in node.links.into_iter().rev() {
801 stack.push(StreamStackItem::Hash(link.hash));
802 }
803
804 tree.process_stream_stack(&mut stack).await
806 }
807 ReadStreamState::Processing { mut stack, tree } => {
808 tree.process_stream_stack(&mut stack).await
809 }
810 ReadStreamState::Done => None,
811 }
812 },
813 ))
814 }
815
816 async fn process_stream_stack<'a>(
817 &'a self,
818 stack: &mut Vec<StreamStackItem>,
819 ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
820 while let Some(item) = stack.pop() {
821 match item {
822 StreamStackItem::Hash(hash) => {
823 let data = match self.store.get(&hash).await {
824 Ok(Some(d)) => d,
825 Ok(None) => {
826 return Some((
827 Err(HashTreeError::MissingChunk(to_hex(&hash))),
828 ReadStreamState::Done,
829 ))
830 }
831 Err(e) => {
832 return Some((
833 Err(HashTreeError::Store(e.to_string())),
834 ReadStreamState::Done,
835 ))
836 }
837 };
838
839 if is_tree_node(&data) {
840 let node = match decode_tree_node(&data) {
842 Ok(n) => n,
843 Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
844 };
845 for link in node.links.into_iter().rev() {
846 stack.push(StreamStackItem::Hash(link.hash));
847 }
848 } else {
849 return Some((Ok(data), ReadStreamState::Processing { stack: std::mem::take(stack), tree: self }));
851 }
852 }
853 }
854 }
855 None
856 }
857
858 pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
860 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
861 Some(d) => d,
862 None => return Ok(vec![]),
863 };
864
865 if !is_tree_node(&data) {
866 return Ok(vec![data]);
867 }
868
869 let node = decode_tree_node(&data)?;
870 self.collect_chunks(&node).await
871 }
872
873 async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
874 let mut chunks = Vec::new();
875
876 for link in &node.links {
877 let child_data = self
878 .store
879 .get(&link.hash)
880 .await
881 .map_err(|e| HashTreeError::Store(e.to_string()))?
882 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
883
884 if is_tree_node(&child_data) {
885 let child_node = decode_tree_node(&child_data)?;
886 chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
887 } else {
888 chunks.push(child_data);
889 }
890 }
891
892 Ok(chunks)
893 }
894
895 pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
897 let node = match self.get_node(cid).await? {
898 Some(n) => n,
899 None => return Ok(vec![]),
900 };
901
902 let mut entries = Vec::new();
903
904 for link in &node.links {
905 if let Some(ref name) = link.name {
907 if name.starts_with("_chunk_") || name.starts_with('_') {
908 let chunk_cid = Cid { hash: link.hash, key: link.key };
909 let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
910 entries.extend(sub_entries);
911 continue;
912 }
913 }
914
915 entries.push(TreeEntry {
916 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
917 hash: link.hash,
918 size: link.size,
919 link_type: link.link_type,
920 key: link.key,
921 meta: link.meta.clone(),
922 });
923 }
924
925 Ok(entries)
926 }
927
928 pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
931 let node = match self.get_directory_node(cid).await? {
933 Some(n) => n,
934 None => return Ok(vec![]),
935 };
936
937 let mut entries = Vec::new();
938
939 for link in &node.links {
940 if let Some(ref name) = link.name {
942 if name.starts_with("_chunk_") || name.starts_with('_') {
943 let sub_cid = Cid { hash: link.hash, key: cid.key };
945 let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
946 entries.extend(sub_entries);
947 continue;
948 }
949 }
950
951 entries.push(TreeEntry {
952 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
953 hash: link.hash,
954 size: link.size,
955 link_type: link.link_type,
956 key: link.key,
957 meta: link.meta.clone(),
958 });
959 }
960
961 Ok(entries)
962 }
963
964 pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
966 let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
967 if parts.is_empty() {
968 return Ok(Some(cid.clone()));
969 }
970
971 let mut current_cid = cid.clone();
972
973 for part in parts {
974 let node = match self.get_directory_node(¤t_cid).await? {
976 Some(n) => n,
977 None => return Ok(None),
978 };
979
980 if let Some(link) = self.find_link(&node, part) {
981 current_cid = Cid {
982 hash: link.hash,
983 key: link.key,
984 };
985 } else {
986 match self.find_link_in_subtrees_cid(&node, part, ¤t_cid).await? {
988 Some(link) => {
989 current_cid = Cid {
990 hash: link.hash,
991 key: link.key,
992 };
993 }
994 None => return Ok(None),
995 }
996 }
997 }
998
999 Ok(Some(current_cid))
1000 }
1001
1002 pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1004 self.resolve(cid, path).await
1005 }
1006
1007 fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1008 node.links
1009 .iter()
1010 .find(|l| l.name.as_deref() == Some(name))
1011 .cloned()
1012 }
1013
1014 async fn find_link_in_subtrees_cid(&self, node: &TreeNode, name: &str, _parent_cid: &Cid) -> Result<Option<Link>, HashTreeError> {
1016 for link in &node.links {
1017 if !link.name.as_ref().map(|n| n.starts_with('_')).unwrap_or(false) {
1018 continue;
1019 }
1020
1021 let sub_cid = Cid {
1023 hash: link.hash,
1024 key: link.key.clone(),
1025 };
1026
1027 let sub_node = match self.get_node(&sub_cid).await? {
1028 Some(n) => n,
1029 None => continue,
1030 };
1031
1032 if let Some(found) = self.find_link(&sub_node, name) {
1033 return Ok(Some(found));
1034 }
1035
1036 if let Some(deep_found) = Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await? {
1037 return Ok(Some(deep_found));
1038 }
1039 }
1040
1041 Ok(None)
1042 }
1043
1044 pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1046 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1047 Some(d) => d,
1048 None => return Ok(0),
1049 };
1050
1051 if !is_tree_node(&data) {
1052 return Ok(data.len() as u64);
1053 }
1054
1055 let node = decode_tree_node(&data)?;
1056 let mut total = 0u64;
1058 for link in &node.links {
1059 total += link.size;
1060 }
1061 Ok(total)
1062 }
1063
1064 pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
1066 let mut entries = Vec::new();
1067 self.walk_recursive(cid, path, &mut entries).await?;
1068 Ok(entries)
1069 }
1070
1071 async fn walk_recursive(
1072 &self,
1073 cid: &Cid,
1074 path: &str,
1075 entries: &mut Vec<WalkEntry>,
1076 ) -> Result<(), HashTreeError> {
1077 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1078 Some(d) => d,
1079 None => return Ok(()),
1080 };
1081
1082 let data = if let Some(key) = &cid.key {
1084 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1085 } else {
1086 data
1087 };
1088
1089 let node = match try_decode_tree_node(&data) {
1090 Some(n) => n,
1091 None => {
1092 entries.push(WalkEntry {
1093 path: path.to_string(),
1094 hash: cid.hash,
1095 link_type: LinkType::Blob,
1096 size: data.len() as u64,
1097 key: cid.key,
1098 });
1099 return Ok(());
1100 }
1101 };
1102
1103 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1104 entries.push(WalkEntry {
1105 path: path.to_string(),
1106 hash: cid.hash,
1107 link_type: node.node_type,
1108 size: node_size,
1109 key: cid.key,
1110 });
1111
1112 for link in &node.links {
1113 let child_path = match &link.name {
1114 Some(name) => {
1115 if name.starts_with("_chunk_") || name.starts_with('_') {
1116 let sub_cid = Cid { hash: link.hash, key: cid.key };
1118 Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
1119 continue;
1120 }
1121 if path.is_empty() {
1122 name.clone()
1123 } else {
1124 format!("{}/{}", path, name)
1125 }
1126 }
1127 None => path.to_string(),
1128 };
1129
1130 let child_cid = Cid { hash: link.hash, key: link.key };
1132 Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
1133 }
1134
1135 Ok(())
1136 }
1137
1138 pub async fn walk_parallel(&self, cid: &Cid, path: &str, concurrency: usize) -> Result<Vec<WalkEntry>, HashTreeError> {
1141 self.walk_parallel_with_progress(cid, path, concurrency, None).await
1142 }
1143
1144 pub async fn walk_parallel_with_progress(
1151 &self,
1152 cid: &Cid,
1153 path: &str,
1154 concurrency: usize,
1155 progress: Option<&std::sync::atomic::AtomicUsize>,
1156 ) -> Result<Vec<WalkEntry>, HashTreeError> {
1157 use futures::stream::{FuturesUnordered, StreamExt};
1158 use std::collections::VecDeque;
1159 use std::sync::atomic::Ordering;
1160
1161 let mut entries = Vec::new();
1162 let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
1163 let mut active = FuturesUnordered::new();
1164
1165 pending.push_back((cid.clone(), path.to_string()));
1167
1168 loop {
1169 while active.len() < concurrency {
1171 if let Some((node_cid, node_path)) = pending.pop_front() {
1172 let store = &self.store;
1173 let fut = async move {
1174 let data = store.get(&node_cid.hash).await
1175 .map_err(|e| HashTreeError::Store(e.to_string()))?;
1176 Ok::<_, HashTreeError>((node_cid, node_path, data))
1177 };
1178 active.push(fut);
1179 } else {
1180 break;
1181 }
1182 }
1183
1184 if active.is_empty() {
1186 break;
1187 }
1188
1189 if let Some(result) = active.next().await {
1191 let (node_cid, node_path, data) = result?;
1192
1193 if let Some(counter) = progress {
1195 counter.fetch_add(1, Ordering::Relaxed);
1196 }
1197
1198 let data = match data {
1199 Some(d) => d,
1200 None => continue,
1201 };
1202
1203 let data = if let Some(key) = &node_cid.key {
1205 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1206 } else {
1207 data
1208 };
1209
1210 let node = match try_decode_tree_node(&data) {
1211 Some(n) => n,
1212 None => {
1213 entries.push(WalkEntry {
1215 path: node_path,
1216 hash: node_cid.hash,
1217 link_type: LinkType::Blob,
1218 size: data.len() as u64,
1219 key: node_cid.key,
1220 });
1221 continue;
1222 }
1223 };
1224
1225 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1227 entries.push(WalkEntry {
1228 path: node_path.clone(),
1229 hash: node_cid.hash,
1230 link_type: node.node_type,
1231 size: node_size,
1232 key: node_cid.key,
1233 });
1234
1235 for link in &node.links {
1237 let child_path = match &link.name {
1238 Some(name) => {
1239 if name.starts_with("_chunk_") || name.starts_with('_') {
1240 let sub_cid = Cid { hash: link.hash, key: node_cid.key };
1242 pending.push_back((sub_cid, node_path.clone()));
1243 continue;
1244 }
1245 if node_path.is_empty() {
1246 name.clone()
1247 } else {
1248 format!("{}/{}", node_path, name)
1249 }
1250 }
1251 None => node_path.clone(),
1252 };
1253
1254 if link.link_type == LinkType::Blob {
1257 entries.push(WalkEntry {
1258 path: child_path,
1259 hash: link.hash,
1260 link_type: LinkType::Blob,
1261 size: link.size,
1262 key: link.key,
1263 });
1264 if let Some(counter) = progress {
1265 counter.fetch_add(1, Ordering::Relaxed);
1266 }
1267 continue;
1268 }
1269
1270 let child_cid = Cid { hash: link.hash, key: link.key };
1272 pending.push_back((child_cid, child_path));
1273 }
1274 }
1275 }
1276
1277 Ok(entries)
1278 }
1279
1280 pub fn walk_stream(
1282 &self,
1283 cid: Cid,
1284 initial_path: String,
1285 ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
1286 Box::pin(stream::unfold(
1287 WalkStreamState::Init { cid, path: initial_path, tree: self },
1288 |state| async move {
1289 match state {
1290 WalkStreamState::Init { cid, path, tree } => {
1291 let data = match tree.store.get(&cid.hash).await {
1292 Ok(Some(d)) => d,
1293 Ok(None) => return None,
1294 Err(e) => {
1295 return Some((
1296 Err(HashTreeError::Store(e.to_string())),
1297 WalkStreamState::Done,
1298 ))
1299 }
1300 };
1301
1302 let data = if let Some(key) = &cid.key {
1304 match decrypt_chk(&data, key) {
1305 Ok(d) => d,
1306 Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), WalkStreamState::Done)),
1307 }
1308 } else {
1309 data
1310 };
1311
1312 let node = match try_decode_tree_node(&data) {
1313 Some(n) => n,
1314 None => {
1315 let entry = WalkEntry {
1317 path,
1318 hash: cid.hash,
1319 link_type: LinkType::Blob,
1320 size: data.len() as u64,
1321 key: cid.key,
1322 };
1323 return Some((Ok(entry), WalkStreamState::Done));
1324 }
1325 };
1326
1327 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1328 let entry = WalkEntry {
1329 path: path.clone(),
1330 hash: cid.hash,
1331 link_type: node.node_type,
1332 size: node_size,
1333 key: cid.key,
1334 };
1335
1336 let mut stack: Vec<WalkStackItem> = Vec::new();
1338 for link in node.links.into_iter().rev() {
1339 let child_path = match &link.name {
1340 Some(name) if !name.starts_with('_') => {
1341 if path.is_empty() {
1342 name.clone()
1343 } else {
1344 format!("{}/{}", path, name)
1345 }
1346 }
1347 _ => path.clone(),
1348 };
1349 stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
1351 }
1352
1353 Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
1354 }
1355 WalkStreamState::Processing { mut stack, tree } => {
1356 tree.process_walk_stack(&mut stack).await
1357 }
1358 WalkStreamState::Done => None,
1359 }
1360 },
1361 ))
1362 }
1363
1364 async fn process_walk_stack<'a>(
1365 &'a self,
1366 stack: &mut Vec<WalkStackItem>,
1367 ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
1368 while let Some(item) = stack.pop() {
1369 let data = match self.store.get(&item.hash).await {
1370 Ok(Some(d)) => d,
1371 Ok(None) => continue,
1372 Err(e) => {
1373 return Some((
1374 Err(HashTreeError::Store(e.to_string())),
1375 WalkStreamState::Done,
1376 ))
1377 }
1378 };
1379
1380 let node = match try_decode_tree_node(&data) {
1381 Some(n) => n,
1382 None => {
1383 let entry = WalkEntry {
1385 path: item.path,
1386 hash: item.hash,
1387 link_type: LinkType::Blob,
1388 size: data.len() as u64,
1389 key: item.key,
1390 };
1391 return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1392 }
1393 };
1394
1395 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1396 let entry = WalkEntry {
1397 path: item.path.clone(),
1398 hash: item.hash,
1399 link_type: node.node_type,
1400 size: node_size,
1401 key: None, };
1403
1404 for link in node.links.into_iter().rev() {
1406 let child_path = match &link.name {
1407 Some(name) if !name.starts_with('_') => {
1408 if item.path.is_empty() {
1409 name.clone()
1410 } else {
1411 format!("{}/{}", item.path, name)
1412 }
1413 }
1414 _ => item.path.clone(),
1415 };
1416 stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
1417 }
1418
1419 return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1420 }
1421 None
1422 }
1423
1424 pub async fn set_entry(
1429 &self,
1430 root: &Cid,
1431 path: &[&str],
1432 name: &str,
1433 entry_cid: &Cid,
1434 size: u64,
1435 link_type: LinkType,
1436 ) -> Result<Cid, HashTreeError> {
1437 let dir_cid = self.resolve_path_array(root, path).await?;
1438 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1439
1440 let entries = self.list_directory(&dir_cid).await?;
1441 let mut new_entries: Vec<DirEntry> = entries
1442 .into_iter()
1443 .filter(|e| e.name != name)
1444 .map(|e| DirEntry {
1445 name: e.name,
1446 hash: e.hash,
1447 size: e.size,
1448 key: e.key,
1449 link_type: e.link_type,
1450 meta: e.meta,
1451 })
1452 .collect();
1453
1454 new_entries.push(DirEntry {
1455 name: name.to_string(),
1456 hash: entry_cid.hash,
1457 size,
1458 key: entry_cid.key,
1459 link_type,
1460 meta: None,
1461 });
1462
1463 let new_dir_cid = self.put_directory(new_entries).await?;
1464 self.rebuild_path(root, path, new_dir_cid).await
1465 }
1466
1467 pub async fn remove_entry(
1470 &self,
1471 root: &Cid,
1472 path: &[&str],
1473 name: &str,
1474 ) -> Result<Cid, HashTreeError> {
1475 let dir_cid = self.resolve_path_array(root, path).await?;
1476 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1477
1478 let entries = self.list_directory(&dir_cid).await?;
1479 let new_entries: Vec<DirEntry> = entries
1480 .into_iter()
1481 .filter(|e| e.name != name)
1482 .map(|e| DirEntry {
1483 name: e.name,
1484 hash: e.hash,
1485 size: e.size,
1486 key: e.key,
1487 link_type: e.link_type,
1488 meta: e.meta,
1489 })
1490 .collect();
1491
1492 let new_dir_cid = self.put_directory(new_entries).await?;
1493 self.rebuild_path(root, path, new_dir_cid).await
1494 }
1495
1496 pub async fn rename_entry(
1499 &self,
1500 root: &Cid,
1501 path: &[&str],
1502 old_name: &str,
1503 new_name: &str,
1504 ) -> Result<Cid, HashTreeError> {
1505 if old_name == new_name {
1506 return Ok(root.clone());
1507 }
1508
1509 let dir_cid = self.resolve_path_array(root, path).await?;
1510 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1511
1512 let entries = self.list_directory(&dir_cid).await?;
1513 let entry = entries
1514 .iter()
1515 .find(|e| e.name == old_name)
1516 .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1517
1518 let entry_hash = entry.hash;
1519 let entry_size = entry.size;
1520 let entry_key = entry.key;
1521 let entry_link_type = entry.link_type;
1522 let entry_meta = entry.meta.clone();
1523
1524 let new_entries: Vec<DirEntry> = entries
1525 .into_iter()
1526 .filter(|e| e.name != old_name)
1527 .map(|e| DirEntry {
1528 name: e.name,
1529 hash: e.hash,
1530 size: e.size,
1531 key: e.key,
1532 link_type: e.link_type,
1533 meta: e.meta,
1534 })
1535 .chain(std::iter::once(DirEntry {
1536 name: new_name.to_string(),
1537 hash: entry_hash,
1538 size: entry_size,
1539 key: entry_key,
1540 link_type: entry_link_type,
1541 meta: entry_meta,
1542 }))
1543 .collect();
1544
1545 let new_dir_cid = self.put_directory(new_entries).await?;
1546 self.rebuild_path(root, path, new_dir_cid).await
1547 }
1548
1549 pub async fn move_entry(
1552 &self,
1553 root: &Cid,
1554 source_path: &[&str],
1555 name: &str,
1556 target_path: &[&str],
1557 ) -> Result<Cid, HashTreeError> {
1558 let source_dir_cid = self.resolve_path_array(root, source_path).await?;
1559 let source_dir_cid = source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
1560
1561 let source_entries = self.list_directory(&source_dir_cid).await?;
1562 let entry = source_entries
1563 .iter()
1564 .find(|e| e.name == name)
1565 .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
1566
1567 let entry_cid = Cid {
1568 hash: entry.hash,
1569 key: entry.key,
1570 };
1571 let entry_size = entry.size;
1572 let entry_link_type = entry.link_type;
1573
1574 let new_root = self.remove_entry(root, source_path, name).await?;
1576
1577 self.set_entry(&new_root, target_path, name, &entry_cid, entry_size, entry_link_type).await
1579 }
1580
1581 async fn resolve_path_array(&self, root: &Cid, path: &[&str]) -> Result<Option<Cid>, HashTreeError> {
1582 if path.is_empty() {
1583 return Ok(Some(root.clone()));
1584 }
1585 self.resolve_path(root, &path.join("/")).await
1586 }
1587
1588 async fn rebuild_path(
1589 &self,
1590 root: &Cid,
1591 path: &[&str],
1592 new_child: Cid,
1593 ) -> Result<Cid, HashTreeError> {
1594 if path.is_empty() {
1595 return Ok(new_child);
1596 }
1597
1598 let mut child_cid = new_child;
1599 let parts: Vec<&str> = path.to_vec();
1600
1601 for i in (0..parts.len()).rev() {
1602 let child_name = parts[i];
1603 let parent_path = &parts[..i];
1604
1605 let parent_cid = if parent_path.is_empty() {
1606 root.clone()
1607 } else {
1608 self.resolve_path_array(root, parent_path)
1609 .await?
1610 .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
1611 };
1612
1613 let parent_entries = self.list_directory(&parent_cid).await?;
1614 let new_parent_entries: Vec<DirEntry> = parent_entries
1615 .into_iter()
1616 .map(|e| {
1617 if e.name == child_name {
1618 DirEntry {
1619 name: e.name,
1620 hash: child_cid.hash,
1621 size: 0, key: child_cid.key,
1623 link_type: e.link_type,
1624 meta: e.meta,
1625 }
1626 } else {
1627 DirEntry {
1628 name: e.name,
1629 hash: e.hash,
1630 size: e.size,
1631 key: e.key,
1632 link_type: e.link_type,
1633 meta: e.meta,
1634 }
1635 }
1636 })
1637 .collect();
1638
1639 child_cid = self.put_directory(new_parent_entries).await?;
1640 }
1641
1642 Ok(child_cid)
1643 }
1644
1645 pub fn get_store(&self) -> Arc<S> {
1649 self.store.clone()
1650 }
1651
1652 pub fn chunk_size(&self) -> usize {
1654 self.chunk_size
1655 }
1656
1657 pub fn max_links(&self) -> usize {
1659 self.max_links
1660 }
1661}
1662
1663enum StreamStackItem {
1666 Hash(Hash),
1667}
1668
1669enum ReadStreamState<'a, S: Store> {
1670 Init { hash: Hash, tree: &'a HashTree<S> },
1671 Processing { stack: Vec<StreamStackItem>, tree: &'a HashTree<S> },
1672 Done,
1673}
1674
1675struct WalkStackItem {
1676 hash: Hash,
1677 path: String,
1678 key: Option<[u8; 32]>,
1679}
1680
1681enum WalkStreamState<'a, S: Store> {
1682 Init { cid: Cid, path: String, tree: &'a HashTree<S> },
1683 Processing { stack: Vec<WalkStackItem>, tree: &'a HashTree<S> },
1684 Done,
1685}
1686
1687struct EncryptedStackItem {
1689 hash: Hash,
1690 key: Option<[u8; 32]>,
1691}
1692
1693enum EncryptedStreamState<'a, S: Store> {
1694 Init { hash: Hash, key: [u8; 32], tree: &'a HashTree<S> },
1695 Processing { stack: Vec<EncryptedStackItem>, tree: &'a HashTree<S> },
1696 Done,
1697}
1698
1699pub async fn verify_tree<S: Store>(
1701 store: Arc<S>,
1702 root_hash: &Hash,
1703) -> Result<crate::reader::VerifyResult, HashTreeError> {
1704 let mut missing = Vec::new();
1705 let mut visited = std::collections::HashSet::new();
1706
1707 verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
1708
1709 Ok(crate::reader::VerifyResult {
1710 valid: missing.is_empty(),
1711 missing,
1712 })
1713}
1714
1715async fn verify_recursive<S: Store>(
1716 store: Arc<S>,
1717 hash: &Hash,
1718 missing: &mut Vec<Hash>,
1719 visited: &mut std::collections::HashSet<String>,
1720) -> Result<(), HashTreeError> {
1721 let hex = to_hex(hash);
1722 if visited.contains(&hex) {
1723 return Ok(());
1724 }
1725 visited.insert(hex);
1726
1727 let data = match store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1728 Some(d) => d,
1729 None => {
1730 missing.push(*hash);
1731 return Ok(());
1732 }
1733 };
1734
1735 if is_tree_node(&data) {
1736 let node = decode_tree_node(&data)?;
1737 for link in &node.links {
1738 Box::pin(verify_recursive(store.clone(), &link.hash, missing, visited)).await?;
1739 }
1740 }
1741
1742 Ok(())
1743}
1744
1745#[cfg(test)]
1746mod tests {
1747 use super::*;
1748 use crate::store::MemoryStore;
1749
1750 fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
1751 let store = Arc::new(MemoryStore::new());
1752 let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
1754 (store, tree)
1755 }
1756
1757 #[tokio::test]
1758 async fn test_put_and_read_blob() {
1759 let (_store, tree) = make_tree();
1760
1761 let data = vec![1, 2, 3, 4, 5];
1762 let hash = tree.put_blob(&data).await.unwrap();
1763
1764 let result = tree.get_blob(&hash).await.unwrap();
1765 assert_eq!(result, Some(data));
1766 }
1767
1768 #[tokio::test]
1769 async fn test_put_and_read_file_small() {
1770 let (_store, tree) = make_tree();
1771
1772 let data = b"Hello, World!";
1773 let (cid, size) = tree.put_file(data).await.unwrap();
1774
1775 assert_eq!(size, data.len() as u64);
1776
1777 let read_data = tree.read_file(&cid.hash).await.unwrap();
1778 assert_eq!(read_data, Some(data.to_vec()));
1779 }
1780
1781 #[tokio::test]
1782 async fn test_put_and_read_directory() {
1783 let (_store, tree) = make_tree();
1784
1785 let file1 = tree.put_blob(b"content1").await.unwrap();
1786 let file2 = tree.put_blob(b"content2").await.unwrap();
1787
1788 let dir_cid = tree
1789 .put_directory(
1790 vec![
1791 DirEntry::new("a.txt", file1).with_size(8),
1792 DirEntry::new("b.txt", file2).with_size(8),
1793 ],
1794 )
1795 .await
1796 .unwrap();
1797
1798 let entries = tree.list_directory(&dir_cid).await.unwrap();
1799 assert_eq!(entries.len(), 2);
1800 let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
1801 assert!(names.contains(&"a.txt"));
1802 assert!(names.contains(&"b.txt"));
1803 }
1804
1805 #[tokio::test]
1806 async fn test_is_directory() {
1807 let (_store, tree) = make_tree();
1808
1809 let file_hash = tree.put_blob(b"data").await.unwrap();
1810 let dir_cid = tree.put_directory(vec![]).await.unwrap();
1811
1812 assert!(!tree.is_directory(&file_hash).await.unwrap());
1813 assert!(tree.is_directory(&dir_cid.hash).await.unwrap());
1814 }
1815
1816 #[tokio::test]
1817 async fn test_resolve_path() {
1818 let (_store, tree) = make_tree();
1819
1820 let file_hash = tree.put_blob(b"nested").await.unwrap();
1821 let sub_dir = tree.put_directory(
1822 vec![DirEntry::new("file.txt", file_hash).with_size(6)],
1823 ).await.unwrap();
1824 let root_dir = tree.put_directory(
1825 vec![DirEntry::new("subdir", sub_dir.hash)],
1826 ).await.unwrap();
1827
1828 let resolved = tree.resolve_path(&root_dir, "subdir/file.txt").await.unwrap();
1829 assert_eq!(resolved.map(|c| c.hash), Some(file_hash));
1830 }
1831
1832 #[tokio::test]
1835 async fn test_unified_put_get_public() {
1836 let store = Arc::new(MemoryStore::new());
1837 let tree = HashTree::new(HashTreeConfig::new(store).public());
1839
1840 let data = b"Hello, public world!";
1841 let (cid, size) = tree.put(data).await.unwrap();
1842
1843 assert_eq!(size, data.len() as u64);
1844 assert!(cid.key.is_none()); let retrieved = tree.get(&cid).await.unwrap().unwrap();
1847 assert_eq!(retrieved, data);
1848 }
1849
1850 #[tokio::test]
1851 async fn test_unified_put_get_encrypted() {
1852 let store = Arc::new(MemoryStore::new());
1853 let tree = HashTree::new(HashTreeConfig::new(store));
1855
1856 let data = b"Hello, encrypted world!";
1857 let (cid, size) = tree.put(data).await.unwrap();
1858
1859 assert_eq!(size, data.len() as u64);
1860 assert!(cid.key.is_some()); let retrieved = tree.get(&cid).await.unwrap().unwrap();
1863 assert_eq!(retrieved, data);
1864 }
1865
1866 #[tokio::test]
1867 async fn test_unified_put_get_encrypted_chunked() {
1868 let store = Arc::new(MemoryStore::new());
1869 let tree = HashTree::new(HashTreeConfig::new(store).with_chunk_size(100));
1870
1871 let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
1873 let (cid, size) = tree.put(&data).await.unwrap();
1874
1875 assert_eq!(size, data.len() as u64);
1876 assert!(cid.key.is_some());
1877
1878 let retrieved = tree.get(&cid).await.unwrap().unwrap();
1879 assert_eq!(retrieved, data);
1880 }
1881
1882 #[tokio::test]
1883 async fn test_cid_deterministic() {
1884 let store = Arc::new(MemoryStore::new());
1885 let tree = HashTree::new(HashTreeConfig::new(store));
1886
1887 let data = b"Same content produces same CID";
1888
1889 let (cid1, _) = tree.put(data).await.unwrap();
1890 let (cid2, _) = tree.put(data).await.unwrap();
1891
1892 assert_eq!(cid1.hash, cid2.hash);
1894 assert_eq!(cid1.key, cid2.key);
1895 assert_eq!(cid1.to_string(), cid2.to_string());
1896 }
1897
1898 #[tokio::test]
1899 async fn test_cid_to_string_public() {
1900 let store = Arc::new(MemoryStore::new());
1901 let tree = HashTree::new(HashTreeConfig::new(store).public());
1902
1903 let (cid, _) = tree.put(b"test").await.unwrap();
1904 let s = cid.to_string();
1905
1906 assert_eq!(s.len(), 64);
1908 assert!(!s.contains(':'));
1909 }
1910
1911 #[tokio::test]
1912 async fn test_cid_to_string_encrypted() {
1913 let store = Arc::new(MemoryStore::new());
1914 let tree = HashTree::new(HashTreeConfig::new(store));
1915
1916 let (cid, _) = tree.put(b"test").await.unwrap();
1917 let s = cid.to_string();
1918
1919 assert_eq!(s.len(), 129);
1921 assert!(s.contains(':'));
1922 }
1923}