1use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::stream::{self, Stream};
10use futures::io::AsyncRead;
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node};
15use crate::hash::sha256;
16use crate::reader::{ReaderError, TreeEntry, WalkEntry};
17use crate::store::Store;
18use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
19
20use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
21
/// Configuration for constructing a [`HashTree`].
///
/// Create with [`HashTreeConfig::new`] and adjust via the `with_*` builder
/// methods; encryption is on by default.
#[derive(Clone)]
pub struct HashTreeConfig<S: Store> {
    // Shared content-addressed backing store.
    pub store: Arc<S>,
    // Maximum payload bytes per leaf chunk.
    pub chunk_size: usize,
    // Maximum number of links per interior tree node.
    pub max_links: usize,
    // When true, every stored object is CHK-encrypted first.
    pub encrypted: bool,
}
31
32impl<S: Store> HashTreeConfig<S> {
33 pub fn new(store: Arc<S>) -> Self {
34 Self {
35 store,
36 chunk_size: DEFAULT_CHUNK_SIZE,
37 max_links: DEFAULT_MAX_LINKS,
38 encrypted: true,
39 }
40 }
41
42 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
43 self.chunk_size = chunk_size;
44 self
45 }
46
47 pub fn with_max_links(mut self, max_links: usize) -> Self {
48 self.max_links = max_links;
49 self
50 }
51
52 pub fn public(mut self) -> Self {
54 self.encrypted = false;
55 self
56 }
57}
58
/// Errors surfaced by [`HashTree`] operations.
#[derive(Debug, thiserror::Error)]
pub enum HashTreeError {
    /// The underlying store failed; carries the store's own error text.
    #[error("Store error: {0}")]
    Store(String),
    /// Tree-node encoding or decoding failed.
    #[error("Codec error: {0}")]
    Codec(#[from] crate::codec::CodecError),
    /// A linked chunk was absent from the store (hex hash of the chunk).
    #[error("Missing chunk: {0}")]
    MissingChunk(String),
    /// A path lookup failed part-way through resolution.
    #[error("Path not found: {0}")]
    PathNotFound(String),
    /// A named entry was not found in a directory node.
    #[error("Entry not found: {0}")]
    EntryNotFound(String),
    /// Encrypting data for storage failed.
    #[error("Encryption error: {0}")]
    Encryption(String),
    /// Decrypting stored data failed.
    #[error("Decryption error: {0}")]
    Decryption(String),
}
77
78impl From<BuilderError> for HashTreeError {
79 fn from(e: BuilderError) -> Self {
80 match e {
81 BuilderError::Store(s) => HashTreeError::Store(s),
82 BuilderError::Codec(c) => HashTreeError::Codec(c),
83 BuilderError::Encryption(s) => HashTreeError::Encryption(s),
84 }
85 }
86}
87
88impl From<ReaderError> for HashTreeError {
89 fn from(e: ReaderError) -> Self {
90 match e {
91 ReaderError::Store(s) => HashTreeError::Store(s),
92 ReaderError::Codec(c) => HashTreeError::Codec(c),
93 ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
94 ReaderError::Decryption(s) => HashTreeError::Encryption(s),
95 ReaderError::MissingKey => HashTreeError::Encryption("missing decryption key".to_string()),
96 }
97 }
98}
99
/// Content-addressed chunked storage: splits data into fixed-size chunks,
/// links them with Merkle tree nodes, and optionally encrypts every stored
/// object with convergent (CHK) encryption.
pub struct HashTree<S: Store> {
    // Shared backing store for all chunks and tree nodes.
    store: Arc<S>,
    // Maximum payload bytes per leaf chunk.
    chunk_size: usize,
    // Maximum number of links per interior node.
    max_links: usize,
    // Whether objects are CHK-encrypted before storage.
    encrypted: bool,
}
107
108impl<S: Store> HashTree<S> {
109 pub fn new(config: HashTreeConfig<S>) -> Self {
110 Self {
111 store: config.store,
112 chunk_size: config.chunk_size,
113 max_links: config.max_links,
114 encrypted: config.encrypted,
115 }
116 }
117
    /// Returns true when this tree CHK-encrypts objects before storing them.
    pub fn is_encrypted(&self) -> bool {
        self.encrypted
    }
122
123 pub async fn put(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
128 let size = data.len() as u64;
129
130 if data.len() <= self.chunk_size {
132 let (hash, key) = self.put_chunk_internal(data).await?;
133 return Ok((Cid { hash, key }, size));
134 }
135
136 let mut links: Vec<Link> = Vec::new();
138 let mut offset = 0;
139
140 while offset < data.len() {
141 let end = (offset + self.chunk_size).min(data.len());
142 let chunk = &data[offset..end];
143 let chunk_size = chunk.len() as u64;
144 let (hash, key) = self.put_chunk_internal(chunk).await?;
145 links.push(Link {
146 hash,
147 name: None,
148 size: chunk_size,
149 key,
150 link_type: LinkType::Blob, meta: None,
152 });
153 offset = end;
154 }
155
156 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
158 Ok((Cid { hash: root_hash, key: root_key }, size))
159 }
160
161 pub async fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
163 if let Some(key) = cid.key {
164 self.get_encrypted(&cid.hash, &key).await
165 } else {
166 self.read_file(&cid.hash).await
167 }
168 }
169
170 pub async fn put_stream<R: AsyncRead + Unpin>(&self, mut reader: R) -> Result<(Cid, u64), HashTreeError> {
176 let mut buffer = vec![0u8; self.chunk_size];
177 let mut links = Vec::new();
178 let mut total_size: u64 = 0;
179 let mut consistent_key: Option<[u8; 32]> = None;
180
181 loop {
182 let mut chunk = Vec::new();
183 let mut bytes_read = 0;
184
185 while bytes_read < self.chunk_size {
187 let n = reader.read(&mut buffer[..self.chunk_size - bytes_read]).await
188 .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
189 if n == 0 {
190 break; }
192 chunk.extend_from_slice(&buffer[..n]);
193 bytes_read += n;
194 }
195
196 if chunk.is_empty() {
197 break; }
199
200 let chunk_len = chunk.len() as u64;
201 total_size += chunk_len;
202
203 let (hash, key) = self.put_chunk_internal(&chunk).await?;
204
205 if links.is_empty() {
207 consistent_key = key;
208 } else if consistent_key != key {
209 consistent_key = None;
210 }
211
212 links.push(Link {
213 hash,
214 name: None,
215 size: chunk_len,
216 key,
217 link_type: LinkType::Blob, meta: None,
219 });
220 }
221
222 if links.is_empty() {
223 let (hash, key) = self.put_chunk_internal(&[]).await?;
225 return Ok((Cid { hash, key }, 0));
226 }
227
228 let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
230 Ok((Cid { hash: root_hash, key: root_key }, total_size))
231 }
232
    /// Returns a chunk-by-chunk stream of the file addressed by `cid`.
    ///
    /// With a key the encrypted walker is used; otherwise the plaintext one.
    /// Each stream item is one decoded leaf chunk, in file order.
    pub fn get_stream(
        &self,
        cid: &Cid,
    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
        let hash = cid.hash;
        let key = cid.key;

        if let Some(k) = key {
            // The encrypted walker returns `impl Stream`, so box it here;
            // `read_file_stream` already returns a boxed stream.
            Box::pin(self.read_file_stream_encrypted(hash, k))
        } else {
            self.read_file_stream(hash)
        }
    }
252
    /// Streams the decrypted leaf chunks of the file rooted at `hash`.
    ///
    /// Implemented as an `unfold` state machine: the first poll fetches and
    /// decrypts the root; a leaf root is yielded whole, otherwise the root's
    /// links seed an explicit stack drained depth-first by
    /// `process_encrypted_stream_stack`.
    fn read_file_stream_encrypted(
        &self,
        hash: Hash,
        key: EncryptionKey,
    ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
        stream::unfold(
            EncryptedStreamState::Init { hash, key, tree: self },
            |state| async move {
                match state {
                    EncryptedStreamState::Init { hash, key, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            // Absent root: end the stream without an error item.
                            Ok(None) => return None,
                            Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), EncryptedStreamState::Done)),
                        };

                        let decrypted = match decrypt_chk(&data, &key) {
                            Ok(d) => d,
                            Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), EncryptedStreamState::Done)),
                        };

                        // Single-leaf file: yield it whole and finish.
                        if !is_tree_node(&decrypted) {
                            return Some((Ok(decrypted), EncryptedStreamState::Done));
                        }

                        let node = match decode_tree_node(&decrypted) {
                            Ok(n) => n,
                            Err(e) => return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done)),
                        };

                        // Push links reversed so popping preserves file order.
                        let mut stack: Vec<EncryptedStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
                        }

                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Processing { mut stack, tree } => {
                        tree.process_encrypted_stream_stack(&mut stack).await
                    }
                    EncryptedStreamState::Done => None,
                }
            },
        )
    }
302
    /// Pops items off the traversal stack until a leaf chunk is found.
    ///
    /// Interior nodes have their links pushed back (reversed, preserving
    /// file order) and traversal continues; the first leaf is yielded with a
    /// `Processing` state carrying the remaining stack. Returns `None` when
    /// the stack is exhausted, which ends the stream.
    async fn process_encrypted_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<EncryptedStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            let data = match self.store.get(&item.hash).await {
                Ok(Some(d)) => d,
                Ok(None) => {
                    return Some((
                        Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
                        EncryptedStreamState::Done,
                    ))
                }
                Err(e) => {
                    return Some((
                        Err(HashTreeError::Store(e.to_string())),
                        EncryptedStreamState::Done,
                    ))
                }
            };

            // Items without a key are passed through undecrypted.
            let decrypted = if let Some(key) = item.key {
                match decrypt_chk(&data, &key) {
                    Ok(d) => d,
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Decryption(e.to_string())),
                            EncryptedStreamState::Done,
                        ))
                    }
                }
            } else {
                data
            };

            if is_tree_node(&decrypted) {
                let node = match decode_tree_node(&decrypted) {
                    Ok(n) => n,
                    Err(e) => {
                        return Some((
                            Err(HashTreeError::Codec(e)),
                            EncryptedStreamState::Done,
                        ))
                    }
                };
                for link in node.links.into_iter().rev() {
                    stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
                }
            } else {
                // Leaf found: yield it; `mem::take` moves the remaining
                // stack into the next state, leaving the borrowed Vec empty.
                return Some((
                    Ok(decrypted),
                    EncryptedStreamState::Processing { stack: std::mem::take(stack), tree: self },
                ));
            }
        }
        None
    }
363
364 async fn put_chunk_internal(&self, data: &[u8]) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
366 if self.encrypted {
367 let (encrypted, key) = encrypt_chk(data)
368 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
369 let hash = sha256(&encrypted);
370 self.store
371 .put(hash, encrypted)
372 .await
373 .map_err(|e| HashTreeError::Store(e.to_string()))?;
374 Ok((hash, Some(key)))
375 } else {
376 let hash = self.put_blob(data).await?;
377 Ok((hash, None))
378 }
379 }
380
    /// Builds interior nodes over `links` until a single root remains,
    /// returning the root hash and (when encrypted) its CHK key.
    ///
    /// `total_size` is the byte size the links cover; when a single link
    /// already covers it, that link becomes the root directly, avoiding a
    /// one-child wrapper node.
    async fn build_tree_internal(
        &self,
        links: Vec<Link>,
        total_size: Option<u64>,
    ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
        // Collapse a lone link that spans the whole payload: no wrapper.
        if links.len() == 1 {
            if let Some(ts) = total_size {
                if links[0].size == ts {
                    return Ok((links[0].hash, links[0].key));
                }
            }
        }

        // Base case: everything fits under one interior node.
        if links.len() <= self.max_links {
            let node = TreeNode {
                node_type: LinkType::File,
                links,
            };
            let (data, _) = encode_and_hash(&node)?;

            if self.encrypted {
                let (encrypted, key) = encrypt_chk(&data)
                    .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
                let hash = sha256(&encrypted);
                self.store
                    .put(hash, encrypted)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?;
                return Ok((hash, Some(key)));
            }

            let hash = sha256(&data);
            self.store
                .put(hash, data)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?;
            return Ok((hash, None));
        }

        // Too many links: build a subtree per `max_links`-sized batch, then
        // recurse over the batch roots until a single root remains.
        let mut sub_links = Vec::new();
        for batch in links.chunks(self.max_links) {
            let batch_size: u64 = batch.iter().map(|l| l.size).sum();
            // Box::pin keeps the recursive async future sized.
            let (hash, key) = Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
            sub_links.push(Link {
                hash,
                name: None,
                size: batch_size,
                key,
                link_type: LinkType::File, meta: None,
            });
        }

        Box::pin(self.build_tree_internal(sub_links, total_size)).await
    }
440
441 async fn get_encrypted(
443 &self,
444 hash: &Hash,
445 key: &EncryptionKey,
446 ) -> Result<Option<Vec<u8>>, HashTreeError> {
447 let encrypted_data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
448 Some(d) => d,
449 None => return Ok(None),
450 };
451
452 let decrypted = decrypt_chk(&encrypted_data, key)
454 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
455
456 if is_tree_node(&decrypted) {
458 let node = decode_tree_node(&decrypted)?;
459 let assembled = self.assemble_encrypted_chunks(&node).await?;
460 return Ok(Some(assembled));
461 }
462
463 Ok(Some(decrypted))
465 }
466
467 async fn assemble_encrypted_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
469 let mut parts: Vec<Vec<u8>> = Vec::new();
470
471 for link in &node.links {
472 let chunk_key = link.key.ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
473
474 let encrypted_child = self
475 .store
476 .get(&link.hash)
477 .await
478 .map_err(|e| HashTreeError::Store(e.to_string()))?
479 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
480
481 let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
482 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
483
484 if is_tree_node(&decrypted) {
485 let child_node = decode_tree_node(&decrypted)?;
487 let child_data = Box::pin(self.assemble_encrypted_chunks(&child_node)).await?;
488 parts.push(child_data);
489 } else {
490 parts.push(decrypted);
492 }
493 }
494
495 let total_len: usize = parts.iter().map(|p| p.len()).sum();
496 let mut result = Vec::with_capacity(total_len);
497 for part in parts {
498 result.extend_from_slice(&part);
499 }
500
501 Ok(result)
502 }
503
504 pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
509 let hash = sha256(data);
510 self.store
511 .put(hash, data.to_vec())
512 .await
513 .map_err(|e| HashTreeError::Store(e.to_string()))?;
514 Ok(hash)
515 }
516
517 pub async fn put_file(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
520 let size = data.len() as u64;
521
522 if data.len() <= self.chunk_size {
524 let (hash, key) = self.put_chunk_internal(data).await?;
525 return Ok((Cid { hash, key }, size));
526 }
527
528 let mut links: Vec<Link> = Vec::new();
530 let mut offset = 0;
531
532 while offset < data.len() {
533 let end = (offset + self.chunk_size).min(data.len());
534 let chunk = &data[offset..end];
535 let chunk_size = (end - offset) as u64;
536
537 let (hash, key) = self.put_chunk_internal(chunk).await?;
538 links.push(Link {
539 hash,
540 name: None,
541 size: chunk_size,
542 key,
543 link_type: LinkType::Blob, meta: None,
545 });
546 offset = end;
547 }
548
549 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
551 Ok((Cid { hash: root_hash, key: root_key }, size))
552 }
553
554 pub async fn put_directory(
560 &self,
561 entries: Vec<DirEntry>,
562 ) -> Result<Cid, HashTreeError> {
563 let mut sorted = entries;
565 sorted.sort_by(|a, b| a.name.cmp(&b.name));
566
567 let links: Vec<Link> = sorted
568 .into_iter()
569 .map(|e| Link {
570 hash: e.hash,
571 name: Some(e.name),
572 size: e.size,
573 key: e.key,
574 link_type: e.link_type,
575 meta: e.meta,
576 })
577 .collect();
578
579 let node = TreeNode {
581 node_type: LinkType::Dir,
582 links,
583 };
584 let (data, _plain_hash) = encode_and_hash(&node)?;
585
586 let (cid, _size) = self.put(&data).await?;
591 Ok(cid)
592 }
593
    /// Encodes `links` as a `Dir` tree node and stores the *plaintext*
    /// encoding under its content hash, returning that hash.
    ///
    /// NOTE(review): unlike `put`/`put_directory`, this ignores
    /// `self.encrypted` and always writes plaintext — confirm this is the
    /// intended public-metadata path.
    pub async fn put_tree_node(
        &self,
        links: Vec<Link>,
    ) -> Result<Hash, HashTreeError> {
        let node = TreeNode {
            node_type: LinkType::Dir,
            links,
        };

        let (data, hash) = encode_and_hash(&node)?;
        self.store
            .put(hash, data)
            .await
            .map_err(|e| HashTreeError::Store(e.to_string()))?;
        Ok(hash)
    }
611
612 pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
616 self.store
617 .get(hash)
618 .await
619 .map_err(|e| HashTreeError::Store(e.to_string()))
620 }
621
622 pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
624 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
625 Some(d) => d,
626 None => return Ok(None),
627 };
628
629 if !is_tree_node(&data) {
630 return Ok(None);
631 }
632
633 let node = decode_tree_node(&data)?;
634 Ok(Some(node))
635 }
636
637 pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
639 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
640 Some(d) => d,
641 None => return Ok(None),
642 };
643
644 let decrypted = if let Some(key) = &cid.key {
646 decrypt_chk(&data, key)
647 .map_err(|e| HashTreeError::Decryption(e.to_string()))?
648 } else {
649 data
650 };
651
652 if !is_tree_node(&decrypted) {
653 return Ok(None);
654 }
655
656 let node = decode_tree_node(&decrypted)?;
657 Ok(Some(node))
658 }
659
    /// Fetches the node at `cid` and returns it as a directory node.
    ///
    /// A large directory may have been chunked by `put` into a `File`-typed
    /// wrapper tree; in that case the chunks are reassembled and the inner
    /// encoding decoded so callers always see the logical directory node.
    pub async fn get_directory_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
            Some(d) => d,
            None => return Ok(None),
        };

        let decrypted = if let Some(key) = &cid.key {
            decrypt_chk(&data, key)
                .map_err(|e| HashTreeError::Decryption(e.to_string()))?
        } else {
            data
        };

        if !is_tree_node(&decrypted) {
            return Ok(None);
        }

        let node = decode_tree_node(&decrypted)?;

        // `File`-typed root means this directory's encoding was itself
        // chunked: reassemble and decode the real directory node inside.
        // NOTE(review): `assemble_chunks` does not decrypt children, so this
        // path presumably applies only to unencrypted trees — confirm.
        if node.node_type == LinkType::File {
            let assembled = self.assemble_chunks(&node).await?;
            if is_tree_node(&assembled) {
                let inner_node = decode_tree_node(&assembled)?;
                return Ok(Some(inner_node));
            }
        }

        Ok(Some(node))
    }
693
694 pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
696 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
697 Some(d) => d,
698 None => return Ok(false),
699 };
700 Ok(is_tree_node(&data))
701 }
702
703 pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
705 let node = match self.get_node(cid).await? {
706 Some(n) => n,
707 None => return Ok(false),
708 };
709 Ok(node.links.iter().any(|l| l.name.as_ref().map(|n| !n.starts_with('_')).unwrap_or(false)))
711 }
712
713 pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
715 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
716 Some(d) => d,
717 None => return Ok(false),
718 };
719 Ok(is_directory_node(&data))
720 }
721
722 pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
724 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
725 Some(d) => d,
726 None => return Ok(None),
727 };
728
729 if !is_tree_node(&data) {
731 return Ok(Some(data));
732 }
733
734 let node = decode_tree_node(&data)?;
736 let assembled = self.assemble_chunks(&node).await?;
737 Ok(Some(assembled))
738 }
739
740 pub async fn read_file_range(
747 &self,
748 hash: &Hash,
749 start: u64,
750 end: Option<u64>,
751 ) -> Result<Option<Vec<u8>>, HashTreeError> {
752 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
753 Some(d) => d,
754 None => return Ok(None),
755 };
756
757 if !is_tree_node(&data) {
759 let start_idx = start as usize;
760 let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
761 if start_idx >= data.len() {
762 return Ok(Some(vec![]));
763 }
764 let end_idx = end_idx.min(data.len());
765 return Ok(Some(data[start_idx..end_idx].to_vec()));
766 }
767
768 let node = decode_tree_node(&data)?;
770 let range_data = self.assemble_chunks_range(&node, start, end).await?;
771 Ok(Some(range_data))
772 }
773
    /// Assembles the byte range `[start, end)` of the file described by
    /// `node`, downloading only the chunks that overlap the range.
    ///
    /// The tree is first flattened into `(hash, offset, size)` descriptors
    /// (which does fetch every child once to learn leaf sizes), then only
    /// overlapping chunks are fetched again and sliced.
    async fn assemble_chunks_range(
        &self,
        node: &TreeNode,
        start: u64,
        end: Option<u64>,
    ) -> Result<Vec<u8>, HashTreeError> {
        let chunks_info = self.collect_chunk_offsets(node).await?;

        if chunks_info.is_empty() {
            return Ok(vec![]);
        }

        // Clamp the requested end to the actual file size.
        let total_size: u64 = chunks_info.iter().map(|(_, _, size)| size).sum();
        let actual_end = end.unwrap_or(total_size).min(total_size);

        if start >= actual_end {
            return Ok(vec![]);
        }

        let mut result = Vec::with_capacity((actual_end - start) as usize);
        let mut current_offset = 0u64;

        for (chunk_hash, _chunk_offset, chunk_size) in &chunks_info {
            let chunk_start = current_offset;
            let chunk_end = current_offset + chunk_size;

            // Fetch the chunk only if it overlaps [start, actual_end).
            if chunk_end > start && chunk_start < actual_end {
                let chunk_data = self
                    .store
                    .get(chunk_hash)
                    .await
                    .map_err(|e| HashTreeError::Store(e.to_string()))?
                    .ok_or_else(|| HashTreeError::MissingChunk(to_hex(chunk_hash)))?;

                // Trim the chunk to the part that lies inside the range.
                let slice_start = if start > chunk_start {
                    (start - chunk_start) as usize
                } else {
                    0
                };
                let slice_end = if actual_end < chunk_end {
                    (actual_end - chunk_start) as usize
                } else {
                    chunk_data.len()
                };

                result.extend_from_slice(&chunk_data[slice_start..slice_end]);
            }

            current_offset = chunk_end;

            // Everything past this point lies after the range; stop early.
            if current_offset >= actual_end {
                break;
            }
        }

        Ok(result)
    }
839
    /// Flattens the file tree under `node` into an ordered list of
    /// `(leaf hash, byte offset, byte size)` descriptors.
    async fn collect_chunk_offsets(
        &self,
        node: &TreeNode,
    ) -> Result<Vec<(Hash, u64, u64)>, HashTreeError> {
        let mut chunks = Vec::new();
        let mut offset = 0u64;
        self.collect_chunk_offsets_recursive(node, &mut chunks, &mut offset).await?;
        Ok(chunks)
    }
851
    /// Depth-first traversal appending `(hash, offset, size)` for each leaf
    /// chunk under `node`, advancing `offset` by each leaf's length.
    async fn collect_chunk_offsets_recursive(
        &self,
        node: &TreeNode,
        chunks: &mut Vec<(Hash, u64, u64)>,
        offset: &mut u64,
    ) -> Result<(), HashTreeError> {
        for link in &node.links {
            // Each child is fetched to learn whether it is an interior node
            // and, for leaves, its exact byte length.
            let child_data = self
                .store
                .get(&link.hash)
                .await
                .map_err(|e| HashTreeError::Store(e.to_string()))?
                .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;

            if is_tree_node(&child_data) {
                let child_node = decode_tree_node(&child_data)?;
                // Box::pin keeps the recursive async future sized.
                Box::pin(self.collect_chunk_offsets_recursive(&child_node, chunks, offset)).await?;
            } else {
                let size = child_data.len() as u64;
                chunks.push((link.hash, *offset, size));
                *offset += size;
            }
        }
        Ok(())
    }
879
880 async fn assemble_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
882 let mut parts: Vec<Vec<u8>> = Vec::new();
883
884 for link in &node.links {
885 let child_data = self
886 .store
887 .get(&link.hash)
888 .await
889 .map_err(|e| HashTreeError::Store(e.to_string()))?
890 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
891
892 if is_tree_node(&child_data) {
893 let child_node = decode_tree_node(&child_data)?;
894 parts.push(Box::pin(self.assemble_chunks(&child_node)).await?);
895 } else {
896 parts.push(child_data);
897 }
898 }
899
900 let total_length: usize = parts.iter().map(|p| p.len()).sum();
902 let mut result = Vec::with_capacity(total_length);
903 for part in parts {
904 result.extend_from_slice(&part);
905 }
906
907 Ok(result)
908 }
909
    /// Streams the leaf chunks of an *unencrypted* file rooted at `hash`.
    ///
    /// Mirrors `read_file_stream_encrypted`: an `unfold` state machine whose
    /// first poll loads the root (yielding it whole if it is a raw blob) and
    /// thereafter drains an explicit depth-first stack of child hashes.
    pub fn read_file_stream(
        &self,
        hash: Hash,
    ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            ReadStreamState::Init { hash, tree: self },
            |state| async move {
                match state {
                    ReadStreamState::Init { hash, tree } => {
                        let data = match tree.store.get(&hash).await {
                            Ok(Some(d)) => d,
                            // Absent root ends the stream without an error.
                            Ok(None) => return None,
                            Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), ReadStreamState::Done)),
                        };

                        // Raw blob: the whole file is a single chunk.
                        if !is_tree_node(&data) {
                            return Some((Ok(data), ReadStreamState::Done));
                        }

                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
                        };

                        // Push links reversed so popping preserves file order.
                        let mut stack: Vec<StreamStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }

                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Processing { mut stack, tree } => {
                        tree.process_stream_stack(&mut stack).await
                    }
                    ReadStreamState::Done => None,
                }
            },
        ))
    }
955
    /// Pops hashes off the traversal stack until a leaf chunk is found.
    ///
    /// Interior nodes push their links back (reversed, preserving file
    /// order); the first leaf is yielded with a `Processing` state carrying
    /// the remaining stack. Returns `None` when the stack empties, ending
    /// the stream.
    async fn process_stream_stack<'a>(
        &'a self,
        stack: &mut Vec<StreamStackItem>,
    ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
        while let Some(item) = stack.pop() {
            match item {
                StreamStackItem::Hash(hash) => {
                    let data = match self.store.get(&hash).await {
                        Ok(Some(d)) => d,
                        Ok(None) => {
                            return Some((
                                Err(HashTreeError::MissingChunk(to_hex(&hash))),
                                ReadStreamState::Done,
                            ))
                        }
                        Err(e) => {
                            return Some((
                                Err(HashTreeError::Store(e.to_string())),
                                ReadStreamState::Done,
                            ))
                        }
                    };

                    if is_tree_node(&data) {
                        let node = match decode_tree_node(&data) {
                            Ok(n) => n,
                            Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
                        };
                        for link in node.links.into_iter().rev() {
                            stack.push(StreamStackItem::Hash(link.hash));
                        }
                    } else {
                        // Leaf: yield it; `mem::take` moves the remaining
                        // stack into the next state, leaving the borrow empty.
                        return Some((Ok(data), ReadStreamState::Processing { stack: std::mem::take(stack), tree: self }));
                    }
                }
            }
        }
        None
    }
997
998 pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
1000 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1001 Some(d) => d,
1002 None => return Ok(vec![]),
1003 };
1004
1005 if !is_tree_node(&data) {
1006 return Ok(vec![data]);
1007 }
1008
1009 let node = decode_tree_node(&data)?;
1010 self.collect_chunks(&node).await
1011 }
1012
1013 async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
1014 let mut chunks = Vec::new();
1015
1016 for link in &node.links {
1017 let child_data = self
1018 .store
1019 .get(&link.hash)
1020 .await
1021 .map_err(|e| HashTreeError::Store(e.to_string()))?
1022 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1023
1024 if is_tree_node(&child_data) {
1025 let child_node = decode_tree_node(&child_data)?;
1026 chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
1027 } else {
1028 chunks.push(child_data);
1029 }
1030 }
1031
1032 Ok(chunks)
1033 }
1034
    /// Lists the entries of the directory node at `cid`.
    ///
    /// Internal '_'-prefixed links (sharded sub-nodes) are followed
    /// recursively and their entries inlined into the result.
    pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
        let node = match self.get_node(cid).await? {
            Some(n) => n,
            None => return Ok(vec![]),
        };

        let mut entries = Vec::new();

        for link in &node.links {
            if let Some(ref name) = link.name {
                // The "_chunk_" check is subsumed by the '_' prefix check.
                if name.starts_with("_chunk_") || name.starts_with('_') {
                    // NOTE(review): the sub-node CID uses the link's own key
                    // here, while `list_directory` uses the *parent's* key
                    // for the same kind of link — one of the two is likely
                    // wrong; confirm against the directory-sharding writer.
                    let chunk_cid = Cid { hash: link.hash, key: link.key };
                    let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
                    entries.extend(sub_entries);
                    continue;
                }
            }

            entries.push(TreeEntry {
                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
                hash: link.hash,
                size: link.size,
                link_type: link.link_type,
                key: link.key,
                meta: link.meta.clone(),
            });
        }

        Ok(entries)
    }
1067
    /// Lists the entries of the directory at `cid`, resolving a chunked
    /// (`File`-wrapped) directory encoding via `get_directory_node`.
    ///
    /// Internal '_'-prefixed links are followed recursively and their
    /// entries inlined.
    pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
        let node = match self.get_directory_node(cid).await? {
            Some(n) => n,
            None => return Ok(vec![]),
        };

        let mut entries = Vec::new();

        for link in &node.links {
            if let Some(ref name) = link.name {
                if name.starts_with("_chunk_") || name.starts_with('_') {
                    // NOTE(review): the sub-node reuses the *parent's* key
                    // here, while `list` uses `link.key` for the same kind
                    // of link — one of the two is likely wrong; confirm
                    // against the directory-sharding writer.
                    let sub_cid = Cid { hash: link.hash, key: cid.key };
                    let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
                    entries.extend(sub_entries);
                    continue;
                }
            }

            entries.push(TreeEntry {
                name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
                hash: link.hash,
                size: link.size,
                link_type: link.link_type,
                key: link.key,
                meta: link.meta.clone(),
            });
        }

        Ok(entries)
    }
1103
1104 pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1106 let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
1107 if parts.is_empty() {
1108 return Ok(Some(cid.clone()));
1109 }
1110
1111 let mut current_cid = cid.clone();
1112
1113 for part in parts {
1114 let node = match self.get_directory_node(¤t_cid).await? {
1116 Some(n) => n,
1117 None => return Ok(None),
1118 };
1119
1120 if let Some(link) = self.find_link(&node, part) {
1121 current_cid = Cid {
1122 hash: link.hash,
1123 key: link.key,
1124 };
1125 } else {
1126 match self.find_link_in_subtrees_cid(&node, part, ¤t_cid).await? {
1128 Some(link) => {
1129 current_cid = Cid {
1130 hash: link.hash,
1131 key: link.key,
1132 };
1133 }
1134 None => return Ok(None),
1135 }
1136 }
1137 }
1138
1139 Ok(Some(current_cid))
1140 }
1141
    /// Convenience alias for [`HashTree::resolve`].
    pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
        self.resolve(cid, path).await
    }
1146
1147 fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1148 node.links
1149 .iter()
1150 .find(|l| l.name.as_deref() == Some(name))
1151 .cloned()
1152 }
1153
1154 async fn find_link_in_subtrees_cid(&self, node: &TreeNode, name: &str, _parent_cid: &Cid) -> Result<Option<Link>, HashTreeError> {
1156 for link in &node.links {
1157 if !link.name.as_ref().map(|n| n.starts_with('_')).unwrap_or(false) {
1158 continue;
1159 }
1160
1161 let sub_cid = Cid {
1163 hash: link.hash,
1164 key: link.key.clone(),
1165 };
1166
1167 let sub_node = match self.get_node(&sub_cid).await? {
1168 Some(n) => n,
1169 None => continue,
1170 };
1171
1172 if let Some(found) = self.find_link(&sub_node, name) {
1173 return Ok(Some(found));
1174 }
1175
1176 if let Some(deep_found) = Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await? {
1177 return Ok(Some(deep_found));
1178 }
1179 }
1180
1181 Ok(None)
1182 }
1183
1184 pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1186 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1187 Some(d) => d,
1188 None => return Ok(0),
1189 };
1190
1191 if !is_tree_node(&data) {
1192 return Ok(data.len() as u64);
1193 }
1194
1195 let node = decode_tree_node(&data)?;
1196 let mut total = 0u64;
1198 for link in &node.links {
1199 total += link.size;
1200 }
1201 Ok(total)
1202 }
1203
    /// Collects every entry reachable from `cid` into a flat list, using
    /// `path` as the root prefix. Sequential; see `walk_parallel` for a
    /// concurrent variant.
    pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
        let mut entries = Vec::new();
        self.walk_recursive(cid, path, &mut entries).await?;
        Ok(entries)
    }
1210
    /// Depth-first walk emitting one `WalkEntry` per visited object.
    ///
    /// Leaf blobs record their byte length; tree nodes record the sum of
    /// their links' sizes. Internal '_'-prefixed links are descended into
    /// with the *parent's* path so their entries appear inline.
    async fn walk_recursive(
        &self,
        cid: &Cid,
        path: &str,
        entries: &mut Vec<WalkEntry>,
    ) -> Result<(), HashTreeError> {
        let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
            Some(d) => d,
            // Missing objects are silently skipped, not reported as errors.
            None => return Ok(()),
        };

        let data = if let Some(key) = &cid.key {
            decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
        } else {
            data
        };

        let node = match try_decode_tree_node(&data) {
            Some(n) => n,
            None => {
                // Leaf blob: record it and stop descending.
                entries.push(WalkEntry {
                    path: path.to_string(),
                    hash: cid.hash,
                    link_type: LinkType::Blob,
                    size: data.len() as u64,
                    key: cid.key,
                });
                return Ok(());
            }
        };

        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
        entries.push(WalkEntry {
            path: path.to_string(),
            hash: cid.hash,
            link_type: node.node_type,
            size: node_size,
            key: cid.key,
        });

        for link in &node.links {
            let child_path = match &link.name {
                Some(name) => {
                    if name.starts_with("_chunk_") || name.starts_with('_') {
                        // NOTE(review): the sub-node inherits the parent's
                        // key here, whereas `list` uses `link.key` for the
                        // same kind of link — confirm which is correct for
                        // the sharding writer.
                        let sub_cid = Cid { hash: link.hash, key: cid.key };
                        Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
                        continue;
                    }
                    if path.is_empty() {
                        name.clone()
                    } else {
                        format!("{}/{}", path, name)
                    }
                }
                None => path.to_string(),
            };

            let child_cid = Cid { hash: link.hash, key: link.key };
            Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
        }

        Ok(())
    }
1277
    /// Concurrent variant of [`HashTree::walk`] fetching up to `concurrency`
    /// nodes at a time; delegates with no progress counter.
    pub async fn walk_parallel(&self, cid: &Cid, path: &str, concurrency: usize) -> Result<Vec<WalkEntry>, HashTreeError> {
        self.walk_parallel_with_progress(cid, path, concurrency, None).await
    }
1283
    /// Concurrent walk with up to `concurrency` in-flight store fetches.
    ///
    /// `progress`, when provided, is incremented once per fetched node and
    /// once per blob link recorded without a fetch. Unlike the sequential
    /// `walk`, blob links are recorded directly from their link metadata
    /// (never fetched), and output order follows fetch completion, so it is
    /// not deterministic.
    ///
    /// NOTE(review): `concurrency == 0` spawns nothing and returns an empty
    /// list — confirm callers never pass 0.
    pub async fn walk_parallel_with_progress(
        &self,
        cid: &Cid,
        path: &str,
        concurrency: usize,
        progress: Option<&std::sync::atomic::AtomicUsize>,
    ) -> Result<Vec<WalkEntry>, HashTreeError> {
        use futures::stream::{FuturesUnordered, StreamExt};
        use std::collections::VecDeque;
        use std::sync::atomic::Ordering;

        let mut entries = Vec::new();
        let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
        let mut active = FuturesUnordered::new();

        pending.push_back((cid.clone(), path.to_string()));

        loop {
            // Top up the in-flight set from the pending queue.
            while active.len() < concurrency {
                if let Some((node_cid, node_path)) = pending.pop_front() {
                    let store = &self.store;
                    let fut = async move {
                        let data = store.get(&node_cid.hash).await
                            .map_err(|e| HashTreeError::Store(e.to_string()))?;
                        Ok::<_, HashTreeError>((node_cid, node_path, data))
                    };
                    active.push(fut);
                } else {
                    break;
                }
            }

            // Nothing in flight and nothing pending: traversal is complete.
            if active.is_empty() {
                break;
            }

            if let Some(result) = active.next().await {
                let (node_cid, node_path, data) = result?;

                if let Some(counter) = progress {
                    counter.fetch_add(1, Ordering::Relaxed);
                }

                // Missing objects are silently skipped.
                let data = match data {
                    Some(d) => d,
                    None => continue,
                };

                let data = if let Some(key) = &node_cid.key {
                    decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
                } else {
                    data
                };

                let node = match try_decode_tree_node(&data) {
                    Some(n) => n,
                    None => {
                        // Leaf blob reached directly (e.g. the root itself).
                        entries.push(WalkEntry {
                            path: node_path,
                            hash: node_cid.hash,
                            link_type: LinkType::Blob,
                            size: data.len() as u64,
                            key: node_cid.key,
                        });
                        continue;
                    }
                };

                let node_size: u64 = node.links.iter().map(|l| l.size).sum();
                entries.push(WalkEntry {
                    path: node_path.clone(),
                    hash: node_cid.hash,
                    link_type: node.node_type,
                    size: node_size,
                    key: node_cid.key,
                });

                for link in &node.links {
                    let child_path = match &link.name {
                        Some(name) => {
                            if name.starts_with("_chunk_") || name.starts_with('_') {
                                // NOTE(review): inherits the parent's key for
                                // internal links, like walk_recursive but
                                // unlike `list` (which uses link.key) —
                                // confirm which is correct.
                                let sub_cid = Cid { hash: link.hash, key: node_cid.key };
                                pending.push_back((sub_cid, node_path.clone()));
                                continue;
                            }
                            if node_path.is_empty() {
                                name.clone()
                            } else {
                                format!("{}/{}", node_path, name)
                            }
                        }
                        None => node_path.clone(),
                    };

                    // Blob links are recorded from link metadata without a
                    // fetch (progress still ticks once for them).
                    if link.link_type == LinkType::Blob {
                        entries.push(WalkEntry {
                            path: child_path,
                            hash: link.hash,
                            link_type: LinkType::Blob,
                            size: link.size,
                            key: link.key,
                        });
                        if let Some(counter) = progress {
                            counter.fetch_add(1, Ordering::Relaxed);
                        }
                        continue;
                    }

                    let child_cid = Cid { hash: link.hash, key: link.key };
                    pending.push_back((child_cid, child_path));
                }
            }
        }

        Ok(entries)
    }
1419
    /// Returns a lazy stream of `WalkEntry` items for a depth-first walk of
    /// the tree rooted at `cid`.
    ///
    /// The root node is fetched (and, when `cid.key` is set, CHK-decrypted)
    /// in the `Init` state; its children are pushed onto an explicit stack
    /// that `process_walk_stack` drains one entry per poll. A root that does
    /// not decode as a tree node is yielded as a single blob entry.
    ///
    /// A missing root (store returns `Ok(None)`) ends the stream without
    /// yielding anything.
    pub fn walk_stream(
        &self,
        cid: Cid,
        initial_path: String,
    ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            WalkStreamState::Init { cid, path: initial_path, tree: self },
            |state| async move {
                match state {
                    WalkStreamState::Init { cid, path, tree } => {
                        let data = match tree.store.get(&cid.hash).await {
                            Ok(Some(d)) => d,
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    WalkStreamState::Done,
                                ))
                            }
                        };

                        // Decrypt when the root CID carries a CHK key.
                        let data = if let Some(key) = &cid.key {
                            match decrypt_chk(&data, key) {
                                Ok(d) => d,
                                Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), WalkStreamState::Done)),
                            }
                        } else {
                            data
                        };

                        // Non-tree data: the root itself is a single blob.
                        let node = match try_decode_tree_node(&data) {
                            Some(n) => n,
                            None => {
                                let entry = WalkEntry {
                                    path,
                                    hash: cid.hash,
                                    link_type: LinkType::Blob,
                                    size: data.len() as u64,
                                    key: cid.key,
                                };
                                return Some((Ok(entry), WalkStreamState::Done));
                            }
                        };

                        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
                        let entry = WalkEntry {
                            path: path.clone(),
                            hash: cid.hash,
                            link_type: node.node_type,
                            size: node_size,
                            key: cid.key,
                        };

                        // Push children in reverse so they pop off the stack
                        // in original link order (depth-first, left-to-right).
                        let mut stack: Vec<WalkStackItem> = Vec::new();
                        for link in node.links.into_iter().rev() {
                            let child_path = match &link.name {
                                Some(name) if !name.starts_with('_') => {
                                    if path.is_empty() {
                                        name.clone()
                                    } else {
                                        format!("{}/{}", path, name)
                                    }
                                }
                                // Unnamed and internal ("_"-prefixed) links
                                // inherit the parent's path.
                                _ => path.clone(),
                            };
                            stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
                        }

                        Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
                    }
                    WalkStreamState::Processing { mut stack, tree } => {
                        tree.process_walk_stack(&mut stack).await
                    }
                    WalkStreamState::Done => None,
                }
            },
        ))
    }
1503
1504 async fn process_walk_stack<'a>(
1505 &'a self,
1506 stack: &mut Vec<WalkStackItem>,
1507 ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
1508 while let Some(item) = stack.pop() {
1509 let data = match self.store.get(&item.hash).await {
1510 Ok(Some(d)) => d,
1511 Ok(None) => continue,
1512 Err(e) => {
1513 return Some((
1514 Err(HashTreeError::Store(e.to_string())),
1515 WalkStreamState::Done,
1516 ))
1517 }
1518 };
1519
1520 let node = match try_decode_tree_node(&data) {
1521 Some(n) => n,
1522 None => {
1523 let entry = WalkEntry {
1525 path: item.path,
1526 hash: item.hash,
1527 link_type: LinkType::Blob,
1528 size: data.len() as u64,
1529 key: item.key,
1530 };
1531 return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1532 }
1533 };
1534
1535 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1536 let entry = WalkEntry {
1537 path: item.path.clone(),
1538 hash: item.hash,
1539 link_type: node.node_type,
1540 size: node_size,
1541 key: None, };
1543
1544 for link in node.links.into_iter().rev() {
1546 let child_path = match &link.name {
1547 Some(name) if !name.starts_with('_') => {
1548 if item.path.is_empty() {
1549 name.clone()
1550 } else {
1551 format!("{}/{}", item.path, name)
1552 }
1553 }
1554 _ => item.path.clone(),
1555 };
1556 stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
1557 }
1558
1559 return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1560 }
1561 None
1562 }
1563
1564 pub async fn set_entry(
1569 &self,
1570 root: &Cid,
1571 path: &[&str],
1572 name: &str,
1573 entry_cid: &Cid,
1574 size: u64,
1575 link_type: LinkType,
1576 ) -> Result<Cid, HashTreeError> {
1577 let dir_cid = self.resolve_path_array(root, path).await?;
1578 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1579
1580 let entries = self.list_directory(&dir_cid).await?;
1581 let mut new_entries: Vec<DirEntry> = entries
1582 .into_iter()
1583 .filter(|e| e.name != name)
1584 .map(|e| DirEntry {
1585 name: e.name,
1586 hash: e.hash,
1587 size: e.size,
1588 key: e.key,
1589 link_type: e.link_type,
1590 meta: e.meta,
1591 })
1592 .collect();
1593
1594 new_entries.push(DirEntry {
1595 name: name.to_string(),
1596 hash: entry_cid.hash,
1597 size,
1598 key: entry_cid.key,
1599 link_type,
1600 meta: None,
1601 });
1602
1603 let new_dir_cid = self.put_directory(new_entries).await?;
1604 self.rebuild_path(root, path, new_dir_cid).await
1605 }
1606
1607 pub async fn remove_entry(
1610 &self,
1611 root: &Cid,
1612 path: &[&str],
1613 name: &str,
1614 ) -> Result<Cid, HashTreeError> {
1615 let dir_cid = self.resolve_path_array(root, path).await?;
1616 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1617
1618 let entries = self.list_directory(&dir_cid).await?;
1619 let new_entries: Vec<DirEntry> = entries
1620 .into_iter()
1621 .filter(|e| e.name != name)
1622 .map(|e| DirEntry {
1623 name: e.name,
1624 hash: e.hash,
1625 size: e.size,
1626 key: e.key,
1627 link_type: e.link_type,
1628 meta: e.meta,
1629 })
1630 .collect();
1631
1632 let new_dir_cid = self.put_directory(new_entries).await?;
1633 self.rebuild_path(root, path, new_dir_cid).await
1634 }
1635
1636 pub async fn rename_entry(
1639 &self,
1640 root: &Cid,
1641 path: &[&str],
1642 old_name: &str,
1643 new_name: &str,
1644 ) -> Result<Cid, HashTreeError> {
1645 if old_name == new_name {
1646 return Ok(root.clone());
1647 }
1648
1649 let dir_cid = self.resolve_path_array(root, path).await?;
1650 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1651
1652 let entries = self.list_directory(&dir_cid).await?;
1653 let entry = entries
1654 .iter()
1655 .find(|e| e.name == old_name)
1656 .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1657
1658 let entry_hash = entry.hash;
1659 let entry_size = entry.size;
1660 let entry_key = entry.key;
1661 let entry_link_type = entry.link_type;
1662 let entry_meta = entry.meta.clone();
1663
1664 let new_entries: Vec<DirEntry> = entries
1665 .into_iter()
1666 .filter(|e| e.name != old_name)
1667 .map(|e| DirEntry {
1668 name: e.name,
1669 hash: e.hash,
1670 size: e.size,
1671 key: e.key,
1672 link_type: e.link_type,
1673 meta: e.meta,
1674 })
1675 .chain(std::iter::once(DirEntry {
1676 name: new_name.to_string(),
1677 hash: entry_hash,
1678 size: entry_size,
1679 key: entry_key,
1680 link_type: entry_link_type,
1681 meta: entry_meta,
1682 }))
1683 .collect();
1684
1685 let new_dir_cid = self.put_directory(new_entries).await?;
1686 self.rebuild_path(root, path, new_dir_cid).await
1687 }
1688
1689 pub async fn move_entry(
1692 &self,
1693 root: &Cid,
1694 source_path: &[&str],
1695 name: &str,
1696 target_path: &[&str],
1697 ) -> Result<Cid, HashTreeError> {
1698 let source_dir_cid = self.resolve_path_array(root, source_path).await?;
1699 let source_dir_cid = source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
1700
1701 let source_entries = self.list_directory(&source_dir_cid).await?;
1702 let entry = source_entries
1703 .iter()
1704 .find(|e| e.name == name)
1705 .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
1706
1707 let entry_cid = Cid {
1708 hash: entry.hash,
1709 key: entry.key,
1710 };
1711 let entry_size = entry.size;
1712 let entry_link_type = entry.link_type;
1713
1714 let new_root = self.remove_entry(root, source_path, name).await?;
1716
1717 self.set_entry(&new_root, target_path, name, &entry_cid, entry_size, entry_link_type).await
1719 }
1720
1721 async fn resolve_path_array(&self, root: &Cid, path: &[&str]) -> Result<Option<Cid>, HashTreeError> {
1722 if path.is_empty() {
1723 return Ok(Some(root.clone()));
1724 }
1725 self.resolve_path(root, &path.join("/")).await
1726 }
1727
    /// Re-creates every directory along `path` from the deepest level upward
    /// so that the directory at `path` points at `new_child`, returning the
    /// CID of the rebuilt root.
    ///
    /// With an empty `path`, `new_child` itself becomes the new root.
    async fn rebuild_path(
        &self,
        root: &Cid,
        path: &[&str],
        new_child: Cid,
    ) -> Result<Cid, HashTreeError> {
        if path.is_empty() {
            return Ok(new_child);
        }

        let mut child_cid = new_child;
        let parts: Vec<&str> = path.to_vec();

        // Walk from the leaf directory back toward the root: at each level,
        // rewrite the parent so its link for `child_name` points at the
        // freshly rebuilt child from the previous iteration.
        for i in (0..parts.len()).rev() {
            let child_name = parts[i];
            let parent_path = &parts[..i];

            // Parents are resolved against the ORIGINAL root; only the chain
            // being rebuilt changes.
            let parent_cid = if parent_path.is_empty() {
                root.clone()
            } else {
                self.resolve_path_array(root, parent_path)
                    .await?
                    .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
            };

            let parent_entries = self.list_directory(&parent_cid).await?;
            let new_parent_entries: Vec<DirEntry> = parent_entries
                .into_iter()
                .map(|e| {
                    if e.name == child_name {
                        DirEntry {
                            name: e.name,
                            hash: child_cid.hash,
                            // NOTE(review): the rebuilt child's size is reset
                            // to 0 here rather than recomputed — confirm
                            // whether callers rely on directory-entry sizes
                            // after a rebuild.
                            size: 0,
                            key: child_cid.key,
                            link_type: e.link_type,
                            meta: e.meta,
                        }
                    } else {
                        DirEntry {
                            name: e.name,
                            hash: e.hash,
                            size: e.size,
                            key: e.key,
                            link_type: e.link_type,
                            meta: e.meta,
                        }
                    }
                })
                .collect();

            child_cid = self.put_directory(new_parent_entries).await?;
        }

        Ok(child_cid)
    }
1784
1785 pub fn get_store(&self) -> Arc<S> {
1789 self.store.clone()
1790 }
1791
    /// Returns the configured chunk size in bytes.
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }
1796
    /// Returns the configured maximum number of links per tree node.
    pub fn max_links(&self) -> usize {
        self.max_links
    }
1801}
1802
/// Work item for a hash-driven read stream: the single variant carries the
/// hash of the next node to fetch (see `ReadStreamState::Processing`).
enum StreamStackItem {
    Hash(Hash),
}
1808
/// Driver states for an unfold-based read stream over a `HashTree` (the
/// stream consumer is defined elsewhere in this file): `Init` holds the root
/// hash, `Processing` drains a stack of pending node hashes, `Done` ends the
/// stream.
enum ReadStreamState<'a, S: Store> {
    Init { hash: Hash, tree: &'a HashTree<S> },
    Processing { stack: Vec<StreamStackItem>, tree: &'a HashTree<S> },
    Done,
}
1814
/// Pending node in the depth-first walk driven by `walk_stream` /
/// `process_walk_stack`: the node hash, the path accumulated so far, and the
/// optional CHK key taken from the parent link.
struct WalkStackItem {
    hash: Hash,
    path: String,
    key: Option<[u8; 32]>,
}
1820
/// Driver states for `walk_stream`: `Init` fetches and yields the root,
/// `Processing` pops `WalkStackItem`s until the stack is empty, and `Done`
/// ends the stream.
enum WalkStreamState<'a, S: Store> {
    Init { cid: Cid, path: String, tree: &'a HashTree<S> },
    Processing { stack: Vec<WalkStackItem>, tree: &'a HashTree<S> },
    Done,
}
1826
/// Pending node in an encrypted read stream (consumer not shown in this
/// chunk): the node hash plus an optional per-node decryption key.
struct EncryptedStackItem {
    hash: Hash,
    key: Option<[u8; 32]>,
}
1832
/// Driver states for an encrypted read stream (consumer not shown in this
/// chunk): `Init` carries the root hash with its mandatory key, `Processing`
/// drains `EncryptedStackItem`s, and `Done` ends the stream.
enum EncryptedStreamState<'a, S: Store> {
    Init { hash: Hash, key: [u8; 32], tree: &'a HashTree<S> },
    Processing { stack: Vec<EncryptedStackItem>, tree: &'a HashTree<S> },
    Done,
}
1838
1839pub async fn verify_tree<S: Store>(
1841 store: Arc<S>,
1842 root_hash: &Hash,
1843) -> Result<crate::reader::VerifyResult, HashTreeError> {
1844 let mut missing = Vec::new();
1845 let mut visited = std::collections::HashSet::new();
1846
1847 verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
1848
1849 Ok(crate::reader::VerifyResult {
1850 valid: missing.is_empty(),
1851 missing,
1852 })
1853}
1854
1855async fn verify_recursive<S: Store>(
1856 store: Arc<S>,
1857 hash: &Hash,
1858 missing: &mut Vec<Hash>,
1859 visited: &mut std::collections::HashSet<String>,
1860) -> Result<(), HashTreeError> {
1861 let hex = to_hex(hash);
1862 if visited.contains(&hex) {
1863 return Ok(());
1864 }
1865 visited.insert(hex);
1866
1867 let data = match store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1868 Some(d) => d,
1869 None => {
1870 missing.push(*hash);
1871 return Ok(());
1872 }
1873 };
1874
1875 if is_tree_node(&data) {
1876 let node = decode_tree_node(&data)?;
1877 for link in &node.links {
1878 Box::pin(verify_recursive(store.clone(), &link.hash, missing, visited)).await?;
1879 }
1880 }
1881
1882 Ok(())
1883}
1884
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;

    // Convenience constructor: an in-memory store wrapped in an UNENCRYPTED
    // (`.public()`) tree, for tests that don't exercise encryption.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    // A raw blob round-trips byte-identically through put_blob/get_blob.
    #[tokio::test]
    async fn test_put_and_read_blob() {
        let (_store, tree) = make_tree();

        let data = vec![1, 2, 3, 4, 5];
        let hash = tree.put_blob(&data).await.unwrap();

        let result = tree.get_blob(&hash).await.unwrap();
        assert_eq!(result, Some(data));
    }

    // A small file round-trips and put_file reports its byte length.
    #[tokio::test]
    async fn test_put_and_read_file_small() {
        let (_store, tree) = make_tree();

        let data = b"Hello, World!";
        let (cid, size) = tree.put_file(data).await.unwrap();

        assert_eq!(size, data.len() as u64);

        let read_data = tree.read_file(&cid.hash).await.unwrap();
        assert_eq!(read_data, Some(data.to_vec()));
    }

    // A directory listing returns exactly the entries that were stored.
    #[tokio::test]
    async fn test_put_and_read_directory() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();

        let dir_cid = tree
            .put_directory(
                vec![
                    DirEntry::new("a.txt", file1).with_size(8),
                    DirEntry::new("b.txt", file2).with_size(8),
                ],
            )
            .await
            .unwrap();

        let entries = tree.list_directory(&dir_cid).await.unwrap();
        assert_eq!(entries.len(), 2);
        let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
        assert!(names.contains(&"a.txt"));
        assert!(names.contains(&"b.txt"));
    }

    // is_directory distinguishes directory nodes from raw blobs.
    #[tokio::test]
    async fn test_is_directory() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"data").await.unwrap();
        let dir_cid = tree.put_directory(vec![]).await.unwrap();

        assert!(!tree.is_directory(&file_hash).await.unwrap());
        assert!(tree.is_directory(&dir_cid.hash).await.unwrap());
    }

    // A nested path ("subdir/file.txt") resolves to the file's hash.
    #[tokio::test]
    async fn test_resolve_path() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"nested").await.unwrap();
        let sub_dir = tree.put_directory(
            vec![DirEntry::new("file.txt", file_hash).with_size(6)],
        ).await.unwrap();
        let root_dir = tree.put_directory(
            vec![DirEntry::new("subdir", sub_dir.hash)],
        ).await.unwrap();

        let resolved = tree.resolve_path(&root_dir, "subdir/file.txt").await.unwrap();
        assert_eq!(resolved.map(|c| c.hash), Some(file_hash));
    }

    // Public (unencrypted) put/get round-trips; the CID carries no key.
    #[tokio::test]
    async fn test_unified_put_get_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let data = b"Hello, public world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_none());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Encrypted put/get round-trips; the CID carries the CHK key.
    #[tokio::test]
    async fn test_unified_put_get_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Hello, encrypted world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Encrypted data larger than the chunk size (500 bytes, 100-byte chunks)
    // still round-trips.
    #[tokio::test]
    async fn test_unified_put_get_encrypted_chunked() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).with_chunk_size(100));

        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
        let (cid, size) = tree.put(&data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Putting identical content twice yields identical hash, key, and string
    // form (convergent / content-addressed encryption).
    #[tokio::test]
    async fn test_cid_deterministic() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Same content produces same CID";

        let (cid1, _) = tree.put(data).await.unwrap();
        let (cid2, _) = tree.put(data).await.unwrap();

        assert_eq!(cid1.hash, cid2.hash);
        assert_eq!(cid1.key, cid2.key);
        assert_eq!(cid1.to_string(), cid2.to_string());
    }

    // A public CID renders as a bare 64-char hex hash with no separator.
    #[tokio::test]
    async fn test_cid_to_string_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        assert_eq!(s.len(), 64);
        assert!(!s.contains(':'));
    }

    // An encrypted CID renders as 129 chars: hash and key hex separated by
    // ':' (64 + 1 + 64).
    #[tokio::test]
    async fn test_cid_to_string_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        assert_eq!(s.len(), 129);
        assert!(s.contains(':'));
    }
}