1use std::pin::Pin;
7use std::sync::Arc;
8
9use futures::stream::{self, Stream};
10use futures::io::AsyncRead;
11use futures::AsyncReadExt;
12
13use crate::builder::{BuilderError, DEFAULT_CHUNK_SIZE, DEFAULT_MAX_LINKS};
14use crate::codec::{decode_tree_node, encode_and_hash, is_directory_node, is_tree_node, try_decode_tree_node};
15use crate::hash::sha256;
16use crate::reader::{ReaderError, TreeEntry, WalkEntry};
17use crate::store::Store;
18use crate::types::{to_hex, Cid, DirEntry, Hash, Link, LinkType, TreeNode};
19
20use crate::crypto::{decrypt_chk, encrypt_chk, EncryptionKey};
21
22#[derive(Clone)]
24pub struct HashTreeConfig<S: Store> {
25 pub store: Arc<S>,
26 pub chunk_size: usize,
27 pub max_links: usize,
28 pub encrypted: bool,
30}
31
32impl<S: Store> HashTreeConfig<S> {
33 pub fn new(store: Arc<S>) -> Self {
34 Self {
35 store,
36 chunk_size: DEFAULT_CHUNK_SIZE,
37 max_links: DEFAULT_MAX_LINKS,
38 encrypted: true,
39 }
40 }
41
42 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
43 self.chunk_size = chunk_size;
44 self
45 }
46
47 pub fn with_max_links(mut self, max_links: usize) -> Self {
48 self.max_links = max_links;
49 self
50 }
51
52 pub fn public(mut self) -> Self {
54 self.encrypted = false;
55 self
56 }
57}
58
59#[derive(Debug, thiserror::Error)]
61pub enum HashTreeError {
62 #[error("Store error: {0}")]
63 Store(String),
64 #[error("Codec error: {0}")]
65 Codec(#[from] crate::codec::CodecError),
66 #[error("Missing chunk: {0}")]
67 MissingChunk(String),
68 #[error("Path not found: {0}")]
69 PathNotFound(String),
70 #[error("Entry not found: {0}")]
71 EntryNotFound(String),
72 #[error("Encryption error: {0}")]
73 Encryption(String),
74 #[error("Decryption error: {0}")]
75 Decryption(String),
76}
77
78impl From<BuilderError> for HashTreeError {
79 fn from(e: BuilderError) -> Self {
80 match e {
81 BuilderError::Store(s) => HashTreeError::Store(s),
82 BuilderError::Codec(c) => HashTreeError::Codec(c),
83 BuilderError::Encryption(s) => HashTreeError::Encryption(s),
84 }
85 }
86}
87
88impl From<ReaderError> for HashTreeError {
89 fn from(e: ReaderError) -> Self {
90 match e {
91 ReaderError::Store(s) => HashTreeError::Store(s),
92 ReaderError::Codec(c) => HashTreeError::Codec(c),
93 ReaderError::MissingChunk(s) => HashTreeError::MissingChunk(s),
94 ReaderError::Decryption(s) => HashTreeError::Encryption(s),
95 ReaderError::MissingKey => HashTreeError::Encryption("missing decryption key".to_string()),
96 }
97 }
98}
99
100pub struct HashTree<S: Store> {
102 store: Arc<S>,
103 chunk_size: usize,
104 max_links: usize,
105 encrypted: bool,
106}
107
108impl<S: Store> HashTree<S> {
109 pub fn new(config: HashTreeConfig<S>) -> Self {
110 Self {
111 store: config.store,
112 chunk_size: config.chunk_size,
113 max_links: config.max_links,
114 encrypted: config.encrypted,
115 }
116 }
117
118 pub fn is_encrypted(&self) -> bool {
120 self.encrypted
121 }
122
123 pub async fn put(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
128 let size = data.len() as u64;
129
130 if data.len() <= self.chunk_size {
132 let (hash, key) = self.put_chunk_internal(data).await?;
133 return Ok((Cid { hash, key }, size));
134 }
135
136 let mut links: Vec<Link> = Vec::new();
138 let mut offset = 0;
139
140 while offset < data.len() {
141 let end = (offset + self.chunk_size).min(data.len());
142 let chunk = &data[offset..end];
143 let chunk_size = chunk.len() as u64;
144 let (hash, key) = self.put_chunk_internal(chunk).await?;
145 links.push(Link {
146 hash,
147 name: None,
148 size: chunk_size,
149 key,
150 link_type: LinkType::Blob, meta: None,
152 });
153 offset = end;
154 }
155
156 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
158 Ok((Cid { hash: root_hash, key: root_key }, size))
159 }
160
161 pub async fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>, HashTreeError> {
163 if let Some(key) = cid.key {
164 self.get_encrypted(&cid.hash, &key).await
165 } else {
166 self.read_file(&cid.hash).await
167 }
168 }
169
170 pub async fn put_stream<R: AsyncRead + Unpin>(&self, mut reader: R) -> Result<(Cid, u64), HashTreeError> {
176 let mut buffer = vec![0u8; self.chunk_size];
177 let mut links = Vec::new();
178 let mut total_size: u64 = 0;
179 let mut consistent_key: Option<[u8; 32]> = None;
180
181 loop {
182 let mut chunk = Vec::new();
183 let mut bytes_read = 0;
184
185 while bytes_read < self.chunk_size {
187 let n = reader.read(&mut buffer[..self.chunk_size - bytes_read]).await
188 .map_err(|e| HashTreeError::Store(format!("read error: {}", e)))?;
189 if n == 0 {
190 break; }
192 chunk.extend_from_slice(&buffer[..n]);
193 bytes_read += n;
194 }
195
196 if chunk.is_empty() {
197 break; }
199
200 let chunk_len = chunk.len() as u64;
201 total_size += chunk_len;
202
203 let (hash, key) = self.put_chunk_internal(&chunk).await?;
204
205 if links.is_empty() {
207 consistent_key = key;
208 } else if consistent_key != key {
209 consistent_key = None;
210 }
211
212 links.push(Link {
213 hash,
214 name: None,
215 size: chunk_len,
216 key,
217 link_type: LinkType::Blob, meta: None,
219 });
220 }
221
222 if links.is_empty() {
223 let (hash, key) = self.put_chunk_internal(&[]).await?;
225 return Ok((Cid { hash, key }, 0));
226 }
227
228 let (root_hash, root_key) = self.build_tree_internal(links, Some(total_size)).await?;
230 Ok((Cid { hash: root_hash, key: root_key }, total_size))
231 }
232
233 pub fn get_stream(
238 &self,
239 cid: &Cid,
240 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
241 let hash = cid.hash;
242 let key = cid.key;
243
244 if let Some(k) = key {
245 Box::pin(self.read_file_stream_encrypted(hash, k))
247 } else {
248 self.read_file_stream(hash)
250 }
251 }
252
253 fn read_file_stream_encrypted(
255 &self,
256 hash: Hash,
257 key: EncryptionKey,
258 ) -> impl Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_ {
259 stream::unfold(
260 EncryptedStreamState::Init { hash, key, tree: self },
261 |state| async move {
262 match state {
263 EncryptedStreamState::Init { hash, key, tree } => {
264 let data = match tree.store.get(&hash).await {
265 Ok(Some(d)) => d,
266 Ok(None) => return None,
267 Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), EncryptedStreamState::Done)),
268 };
269
270 let decrypted = match decrypt_chk(&data, &key) {
272 Ok(d) => d,
273 Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), EncryptedStreamState::Done)),
274 };
275
276 if !is_tree_node(&decrypted) {
277 return Some((Ok(decrypted), EncryptedStreamState::Done));
279 }
280
281 let node = match decode_tree_node(&decrypted) {
283 Ok(n) => n,
284 Err(e) => return Some((Err(HashTreeError::Codec(e)), EncryptedStreamState::Done)),
285 };
286
287 let mut stack: Vec<EncryptedStackItem> = Vec::new();
288 for link in node.links.into_iter().rev() {
289 stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
290 }
291
292 tree.process_encrypted_stream_stack(&mut stack).await
293 }
294 EncryptedStreamState::Processing { mut stack, tree } => {
295 tree.process_encrypted_stream_stack(&mut stack).await
296 }
297 EncryptedStreamState::Done => None,
298 }
299 },
300 )
301 }
302
303 async fn process_encrypted_stream_stack<'a>(
304 &'a self,
305 stack: &mut Vec<EncryptedStackItem>,
306 ) -> Option<(Result<Vec<u8>, HashTreeError>, EncryptedStreamState<'a, S>)> {
307 while let Some(item) = stack.pop() {
308 let data = match self.store.get(&item.hash).await {
309 Ok(Some(d)) => d,
310 Ok(None) => {
311 return Some((
312 Err(HashTreeError::MissingChunk(to_hex(&item.hash))),
313 EncryptedStreamState::Done,
314 ))
315 }
316 Err(e) => {
317 return Some((
318 Err(HashTreeError::Store(e.to_string())),
319 EncryptedStreamState::Done,
320 ))
321 }
322 };
323
324 let decrypted = if let Some(key) = item.key {
326 match decrypt_chk(&data, &key) {
327 Ok(d) => d,
328 Err(e) => {
329 return Some((
330 Err(HashTreeError::Decryption(e.to_string())),
331 EncryptedStreamState::Done,
332 ))
333 }
334 }
335 } else {
336 data
337 };
338
339 if is_tree_node(&decrypted) {
340 let node = match decode_tree_node(&decrypted) {
342 Ok(n) => n,
343 Err(e) => {
344 return Some((
345 Err(HashTreeError::Codec(e)),
346 EncryptedStreamState::Done,
347 ))
348 }
349 };
350 for link in node.links.into_iter().rev() {
351 stack.push(EncryptedStackItem { hash: link.hash, key: link.key });
352 }
353 } else {
354 return Some((
356 Ok(decrypted),
357 EncryptedStreamState::Processing { stack: std::mem::take(stack), tree: self },
358 ));
359 }
360 }
361 None
362 }
363
364 async fn put_chunk_internal(&self, data: &[u8]) -> Result<(Hash, Option<EncryptionKey>), HashTreeError> {
366 if self.encrypted {
367 let (encrypted, key) = encrypt_chk(data)
368 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
369 let hash = sha256(&encrypted);
370 self.store
371 .put(hash, encrypted)
372 .await
373 .map_err(|e| HashTreeError::Store(e.to_string()))?;
374 Ok((hash, Some(key)))
375 } else {
376 let hash = self.put_blob(data).await?;
377 Ok((hash, None))
378 }
379 }
380
381 async fn build_tree_internal(
383 &self,
384 links: Vec<Link>,
385 total_size: Option<u64>,
386 ) -> Result<(Hash, Option<[u8; 32]>), HashTreeError> {
387 if links.len() == 1 {
389 if let Some(ts) = total_size {
390 if links[0].size == ts {
391 return Ok((links[0].hash, links[0].key));
392 }
393 }
394 }
395
396 if links.len() <= self.max_links {
397 let node = TreeNode {
398 node_type: LinkType::File,
399 links,
400 };
401 let (data, _) = encode_and_hash(&node)?;
402
403 if self.encrypted {
404 let (encrypted, key) = encrypt_chk(&data)
405 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
406 let hash = sha256(&encrypted);
407 self.store
408 .put(hash, encrypted)
409 .await
410 .map_err(|e| HashTreeError::Store(e.to_string()))?;
411 return Ok((hash, Some(key)));
412 }
413
414 let hash = sha256(&data);
416 self.store
417 .put(hash, data)
418 .await
419 .map_err(|e| HashTreeError::Store(e.to_string()))?;
420 return Ok((hash, None));
421 }
422
423 let mut sub_links = Vec::new();
425 for batch in links.chunks(self.max_links) {
426 let batch_size: u64 = batch.iter().map(|l| l.size).sum();
427 let (hash, key) = Box::pin(self.build_tree_internal(batch.to_vec(), Some(batch_size))).await?;
428 sub_links.push(Link {
429 hash,
430 name: None,
431 size: batch_size,
432 key,
433 link_type: LinkType::File, meta: None,
435 });
436 }
437
438 Box::pin(self.build_tree_internal(sub_links, total_size)).await
439 }
440
441 async fn get_encrypted(
443 &self,
444 hash: &Hash,
445 key: &EncryptionKey,
446 ) -> Result<Option<Vec<u8>>, HashTreeError> {
447 let encrypted_data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
448 Some(d) => d,
449 None => return Ok(None),
450 };
451
452 let decrypted = decrypt_chk(&encrypted_data, key)
454 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
455
456 if is_tree_node(&decrypted) {
458 let node = decode_tree_node(&decrypted)?;
459 let assembled = self.assemble_encrypted_chunks(&node).await?;
460 return Ok(Some(assembled));
461 }
462
463 Ok(Some(decrypted))
465 }
466
467 async fn assemble_encrypted_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
469 let mut parts: Vec<Vec<u8>> = Vec::new();
470
471 for link in &node.links {
472 let chunk_key = link.key.ok_or_else(|| HashTreeError::Encryption("missing chunk key".to_string()))?;
473
474 let encrypted_child = self
475 .store
476 .get(&link.hash)
477 .await
478 .map_err(|e| HashTreeError::Store(e.to_string()))?
479 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
480
481 let decrypted = decrypt_chk(&encrypted_child, &chunk_key)
482 .map_err(|e| HashTreeError::Encryption(e.to_string()))?;
483
484 if is_tree_node(&decrypted) {
485 let child_node = decode_tree_node(&decrypted)?;
487 let child_data = Box::pin(self.assemble_encrypted_chunks(&child_node)).await?;
488 parts.push(child_data);
489 } else {
490 parts.push(decrypted);
492 }
493 }
494
495 let total_len: usize = parts.iter().map(|p| p.len()).sum();
496 let mut result = Vec::with_capacity(total_len);
497 for part in parts {
498 result.extend_from_slice(&part);
499 }
500
501 Ok(result)
502 }
503
504 pub async fn put_blob(&self, data: &[u8]) -> Result<Hash, HashTreeError> {
509 let hash = sha256(data);
510 self.store
511 .put(hash, data.to_vec())
512 .await
513 .map_err(|e| HashTreeError::Store(e.to_string()))?;
514 Ok(hash)
515 }
516
517 pub async fn put_file(&self, data: &[u8]) -> Result<(Cid, u64), HashTreeError> {
520 let size = data.len() as u64;
521
522 if data.len() <= self.chunk_size {
524 let (hash, key) = self.put_chunk_internal(data).await?;
525 return Ok((Cid { hash, key }, size));
526 }
527
528 let mut links: Vec<Link> = Vec::new();
530 let mut offset = 0;
531
532 while offset < data.len() {
533 let end = (offset + self.chunk_size).min(data.len());
534 let chunk = &data[offset..end];
535 let chunk_size = (end - offset) as u64;
536
537 let (hash, key) = self.put_chunk_internal(chunk).await?;
538 links.push(Link {
539 hash,
540 name: None,
541 size: chunk_size,
542 key,
543 link_type: LinkType::Blob, meta: None,
545 });
546 offset = end;
547 }
548
549 let (root_hash, root_key) = self.build_tree_internal(links, Some(size)).await?;
551 Ok((Cid { hash: root_hash, key: root_key }, size))
552 }
553
554 pub async fn put_directory(
560 &self,
561 entries: Vec<DirEntry>,
562 ) -> Result<Cid, HashTreeError> {
563 let mut sorted = entries;
565 sorted.sort_by(|a, b| a.name.cmp(&b.name));
566
567 let links: Vec<Link> = sorted
568 .into_iter()
569 .map(|e| Link {
570 hash: e.hash,
571 name: Some(e.name),
572 size: e.size,
573 key: e.key,
574 link_type: e.link_type,
575 meta: e.meta,
576 })
577 .collect();
578
579 let node = TreeNode {
581 node_type: LinkType::Dir,
582 links,
583 };
584 let (data, _plain_hash) = encode_and_hash(&node)?;
585
586 let (cid, _size) = self.put(&data).await?;
591 Ok(cid)
592 }
593
594 pub async fn put_tree_node(
596 &self,
597 links: Vec<Link>,
598 ) -> Result<Hash, HashTreeError> {
599 let node = TreeNode {
600 node_type: LinkType::Dir,
601 links,
602 };
603
604 let (data, hash) = encode_and_hash(&node)?;
605 self.store
606 .put(hash, data)
607 .await
608 .map_err(|e| HashTreeError::Store(e.to_string()))?;
609 Ok(hash)
610 }
611
612 pub async fn get_blob(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
616 self.store
617 .get(hash)
618 .await
619 .map_err(|e| HashTreeError::Store(e.to_string()))
620 }
621
622 pub async fn get_tree_node(&self, hash: &Hash) -> Result<Option<TreeNode>, HashTreeError> {
624 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
625 Some(d) => d,
626 None => return Ok(None),
627 };
628
629 if !is_tree_node(&data) {
630 return Ok(None);
631 }
632
633 let node = decode_tree_node(&data)?;
634 Ok(Some(node))
635 }
636
637 pub async fn get_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
639 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
640 Some(d) => d,
641 None => return Ok(None),
642 };
643
644 let decrypted = if let Some(key) = &cid.key {
646 decrypt_chk(&data, key)
647 .map_err(|e| HashTreeError::Decryption(e.to_string()))?
648 } else {
649 data
650 };
651
652 if !is_tree_node(&decrypted) {
653 return Ok(None);
654 }
655
656 let node = decode_tree_node(&decrypted)?;
657 Ok(Some(node))
658 }
659
660 pub async fn get_directory_node(&self, cid: &Cid) -> Result<Option<TreeNode>, HashTreeError> {
663 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
664 Some(d) => d,
665 None => return Ok(None),
666 };
667
668 let decrypted = if let Some(key) = &cid.key {
670 decrypt_chk(&data, key)
671 .map_err(|e| HashTreeError::Decryption(e.to_string()))?
672 } else {
673 data
674 };
675
676 if !is_tree_node(&decrypted) {
677 return Ok(None);
678 }
679
680 let node = decode_tree_node(&decrypted)?;
681
682 if node.node_type == LinkType::File {
684 let assembled = self.assemble_chunks(&node).await?;
685 if is_tree_node(&assembled) {
686 let inner_node = decode_tree_node(&assembled)?;
687 return Ok(Some(inner_node));
688 }
689 }
690
691 Ok(Some(node))
692 }
693
694 pub async fn is_tree(&self, hash: &Hash) -> Result<bool, HashTreeError> {
696 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
697 Some(d) => d,
698 None => return Ok(false),
699 };
700 Ok(is_tree_node(&data))
701 }
702
703 pub async fn is_dir(&self, cid: &Cid) -> Result<bool, HashTreeError> {
705 let node = match self.get_node(cid).await? {
706 Some(n) => n,
707 None => return Ok(false),
708 };
709 Ok(node.links.iter().any(|l| l.name.as_ref().map(|n| !n.starts_with('_')).unwrap_or(false)))
711 }
712
713 pub async fn is_directory(&self, hash: &Hash) -> Result<bool, HashTreeError> {
715 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
716 Some(d) => d,
717 None => return Ok(false),
718 };
719 Ok(is_directory_node(&data))
720 }
721
722 pub async fn read_file(&self, hash: &Hash) -> Result<Option<Vec<u8>>, HashTreeError> {
724 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
725 Some(d) => d,
726 None => return Ok(None),
727 };
728
729 if !is_tree_node(&data) {
731 return Ok(Some(data));
732 }
733
734 let node = decode_tree_node(&data)?;
736 let assembled = self.assemble_chunks(&node).await?;
737 Ok(Some(assembled))
738 }
739
740 pub async fn read_file_range(
747 &self,
748 hash: &Hash,
749 start: u64,
750 end: Option<u64>,
751 ) -> Result<Option<Vec<u8>>, HashTreeError> {
752 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
753 Some(d) => d,
754 None => return Ok(None),
755 };
756
757 if !is_tree_node(&data) {
759 let start_idx = start as usize;
760 let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
761 if start_idx >= data.len() {
762 return Ok(Some(vec![]));
763 }
764 let end_idx = end_idx.min(data.len());
765 return Ok(Some(data[start_idx..end_idx].to_vec()));
766 }
767
768 let node = decode_tree_node(&data)?;
770 let range_data = self.assemble_chunks_range(&node, start, end).await?;
771 Ok(Some(range_data))
772 }
773
774 pub async fn read_file_range_cid(
776 &self,
777 cid: &Cid,
778 start: u64,
779 end: Option<u64>,
780 ) -> Result<Option<Vec<u8>>, HashTreeError> {
781 if let Some(_key) = cid.key {
782 let data = match self.get(cid).await? {
783 Some(d) => d,
784 None => return Ok(None),
785 };
786 let start_idx = start as usize;
787 let end_idx = end.map(|e| e as usize).unwrap_or(data.len());
788 if start_idx >= data.len() {
789 return Ok(Some(vec![]));
790 }
791 let end_idx = end_idx.min(data.len());
792 return Ok(Some(data[start_idx..end_idx].to_vec()));
793 }
794
795 self.read_file_range(&cid.hash, start, end).await
796 }
797
798 async fn assemble_chunks_range(
800 &self,
801 node: &TreeNode,
802 start: u64,
803 end: Option<u64>,
804 ) -> Result<Vec<u8>, HashTreeError> {
805 let chunks_info = self.collect_chunk_offsets(node).await?;
807
808 if chunks_info.is_empty() {
809 return Ok(vec![]);
810 }
811
812 let total_size: u64 = chunks_info.iter().map(|(_, _, size)| size).sum();
814 let actual_end = end.unwrap_or(total_size).min(total_size);
815
816 if start >= actual_end {
817 return Ok(vec![]);
818 }
819
820 let mut result = Vec::with_capacity((actual_end - start) as usize);
822 let mut current_offset = 0u64;
823
824 for (chunk_hash, _chunk_offset, chunk_size) in &chunks_info {
825 let chunk_start = current_offset;
826 let chunk_end = current_offset + chunk_size;
827
828 if chunk_end > start && chunk_start < actual_end {
830 let chunk_data = self
832 .store
833 .get(chunk_hash)
834 .await
835 .map_err(|e| HashTreeError::Store(e.to_string()))?
836 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(chunk_hash)))?;
837
838 let slice_start = if start > chunk_start {
840 (start - chunk_start) as usize
841 } else {
842 0
843 };
844 let slice_end = if actual_end < chunk_end {
845 (actual_end - chunk_start) as usize
846 } else {
847 chunk_data.len()
848 };
849
850 result.extend_from_slice(&chunk_data[slice_start..slice_end]);
851 }
852
853 current_offset = chunk_end;
854
855 if current_offset >= actual_end {
857 break;
858 }
859 }
860
861 Ok(result)
862 }
863
864 async fn collect_chunk_offsets(
867 &self,
868 node: &TreeNode,
869 ) -> Result<Vec<(Hash, u64, u64)>, HashTreeError> {
870 let mut chunks = Vec::new();
871 let mut offset = 0u64;
872 self.collect_chunk_offsets_recursive(node, &mut chunks, &mut offset).await?;
873 Ok(chunks)
874 }
875
876 async fn collect_chunk_offsets_recursive(
877 &self,
878 node: &TreeNode,
879 chunks: &mut Vec<(Hash, u64, u64)>,
880 offset: &mut u64,
881 ) -> Result<(), HashTreeError> {
882 for link in &node.links {
883 let child_data = self
884 .store
885 .get(&link.hash)
886 .await
887 .map_err(|e| HashTreeError::Store(e.to_string()))?
888 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
889
890 if is_tree_node(&child_data) {
891 let child_node = decode_tree_node(&child_data)?;
893 Box::pin(self.collect_chunk_offsets_recursive(&child_node, chunks, offset)).await?;
894 } else {
895 let size = child_data.len() as u64;
897 chunks.push((link.hash, *offset, size));
898 *offset += size;
899 }
900 }
901 Ok(())
902 }
903
904 async fn assemble_chunks(&self, node: &TreeNode) -> Result<Vec<u8>, HashTreeError> {
906 let mut parts: Vec<Vec<u8>> = Vec::new();
907
908 for link in &node.links {
909 let child_data = self
910 .store
911 .get(&link.hash)
912 .await
913 .map_err(|e| HashTreeError::Store(e.to_string()))?
914 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
915
916 if is_tree_node(&child_data) {
917 let child_node = decode_tree_node(&child_data)?;
918 parts.push(Box::pin(self.assemble_chunks(&child_node)).await?);
919 } else {
920 parts.push(child_data);
921 }
922 }
923
924 let total_length: usize = parts.iter().map(|p| p.len()).sum();
926 let mut result = Vec::with_capacity(total_length);
927 for part in parts {
928 result.extend_from_slice(&part);
929 }
930
931 Ok(result)
932 }
933
934 pub fn read_file_stream(
937 &self,
938 hash: Hash,
939 ) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, HashTreeError>> + Send + '_>> {
940 Box::pin(stream::unfold(
941 ReadStreamState::Init { hash, tree: self },
942 |state| async move {
943 match state {
944 ReadStreamState::Init { hash, tree } => {
945 let data = match tree.store.get(&hash).await {
946 Ok(Some(d)) => d,
947 Ok(None) => return None,
948 Err(e) => return Some((Err(HashTreeError::Store(e.to_string())), ReadStreamState::Done)),
949 };
950
951 if !is_tree_node(&data) {
952 return Some((Ok(data), ReadStreamState::Done));
954 }
955
956 let node = match decode_tree_node(&data) {
958 Ok(n) => n,
959 Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
960 };
961
962 let mut stack: Vec<StreamStackItem> = Vec::new();
964 for link in node.links.into_iter().rev() {
965 stack.push(StreamStackItem::Hash(link.hash));
966 }
967
968 tree.process_stream_stack(&mut stack).await
970 }
971 ReadStreamState::Processing { mut stack, tree } => {
972 tree.process_stream_stack(&mut stack).await
973 }
974 ReadStreamState::Done => None,
975 }
976 },
977 ))
978 }
979
980 async fn process_stream_stack<'a>(
981 &'a self,
982 stack: &mut Vec<StreamStackItem>,
983 ) -> Option<(Result<Vec<u8>, HashTreeError>, ReadStreamState<'a, S>)> {
984 while let Some(item) = stack.pop() {
985 match item {
986 StreamStackItem::Hash(hash) => {
987 let data = match self.store.get(&hash).await {
988 Ok(Some(d)) => d,
989 Ok(None) => {
990 return Some((
991 Err(HashTreeError::MissingChunk(to_hex(&hash))),
992 ReadStreamState::Done,
993 ))
994 }
995 Err(e) => {
996 return Some((
997 Err(HashTreeError::Store(e.to_string())),
998 ReadStreamState::Done,
999 ))
1000 }
1001 };
1002
1003 if is_tree_node(&data) {
1004 let node = match decode_tree_node(&data) {
1006 Ok(n) => n,
1007 Err(e) => return Some((Err(HashTreeError::Codec(e)), ReadStreamState::Done)),
1008 };
1009 for link in node.links.into_iter().rev() {
1010 stack.push(StreamStackItem::Hash(link.hash));
1011 }
1012 } else {
1013 return Some((Ok(data), ReadStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1015 }
1016 }
1017 }
1018 }
1019 None
1020 }
1021
1022 pub async fn read_file_chunks(&self, hash: &Hash) -> Result<Vec<Vec<u8>>, HashTreeError> {
1024 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1025 Some(d) => d,
1026 None => return Ok(vec![]),
1027 };
1028
1029 if !is_tree_node(&data) {
1030 return Ok(vec![data]);
1031 }
1032
1033 let node = decode_tree_node(&data)?;
1034 self.collect_chunks(&node).await
1035 }
1036
1037 async fn collect_chunks(&self, node: &TreeNode) -> Result<Vec<Vec<u8>>, HashTreeError> {
1038 let mut chunks = Vec::new();
1039
1040 for link in &node.links {
1041 let child_data = self
1042 .store
1043 .get(&link.hash)
1044 .await
1045 .map_err(|e| HashTreeError::Store(e.to_string()))?
1046 .ok_or_else(|| HashTreeError::MissingChunk(to_hex(&link.hash)))?;
1047
1048 if is_tree_node(&child_data) {
1049 let child_node = decode_tree_node(&child_data)?;
1050 chunks.extend(Box::pin(self.collect_chunks(&child_node)).await?);
1051 } else {
1052 chunks.push(child_data);
1053 }
1054 }
1055
1056 Ok(chunks)
1057 }
1058
1059 pub async fn list(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1061 let node = match self.get_node(cid).await? {
1062 Some(n) => n,
1063 None => return Ok(vec![]),
1064 };
1065
1066 let mut entries = Vec::new();
1067
1068 for link in &node.links {
1069 if let Some(ref name) = link.name {
1071 if name.starts_with("_chunk_") || name.starts_with('_') {
1072 let chunk_cid = Cid { hash: link.hash, key: link.key };
1073 let sub_entries = Box::pin(self.list(&chunk_cid)).await?;
1074 entries.extend(sub_entries);
1075 continue;
1076 }
1077 }
1078
1079 entries.push(TreeEntry {
1080 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1081 hash: link.hash,
1082 size: link.size,
1083 link_type: link.link_type,
1084 key: link.key,
1085 meta: link.meta.clone(),
1086 });
1087 }
1088
1089 Ok(entries)
1090 }
1091
1092 pub async fn list_directory(&self, cid: &Cid) -> Result<Vec<TreeEntry>, HashTreeError> {
1095 let node = match self.get_directory_node(cid).await? {
1097 Some(n) => n,
1098 None => return Ok(vec![]),
1099 };
1100
1101 let mut entries = Vec::new();
1102
1103 for link in &node.links {
1104 if let Some(ref name) = link.name {
1106 if name.starts_with("_chunk_") || name.starts_with('_') {
1107 let sub_cid = Cid { hash: link.hash, key: cid.key };
1109 let sub_entries = Box::pin(self.list_directory(&sub_cid)).await?;
1110 entries.extend(sub_entries);
1111 continue;
1112 }
1113 }
1114
1115 entries.push(TreeEntry {
1116 name: link.name.clone().unwrap_or_else(|| to_hex(&link.hash)),
1117 hash: link.hash,
1118 size: link.size,
1119 link_type: link.link_type,
1120 key: link.key,
1121 meta: link.meta.clone(),
1122 });
1123 }
1124
1125 Ok(entries)
1126 }
1127
1128 pub async fn resolve(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1130 let parts: Vec<&str> = path.split('/').filter(|p| !p.is_empty()).collect();
1131 if parts.is_empty() {
1132 return Ok(Some(cid.clone()));
1133 }
1134
1135 let mut current_cid = cid.clone();
1136
1137 for part in parts {
1138 let node = match self.get_directory_node(¤t_cid).await? {
1140 Some(n) => n,
1141 None => return Ok(None),
1142 };
1143
1144 if let Some(link) = self.find_link(&node, part) {
1145 current_cid = Cid {
1146 hash: link.hash,
1147 key: link.key,
1148 };
1149 } else {
1150 match self.find_link_in_subtrees_cid(&node, part, ¤t_cid).await? {
1152 Some(link) => {
1153 current_cid = Cid {
1154 hash: link.hash,
1155 key: link.key,
1156 };
1157 }
1158 None => return Ok(None),
1159 }
1160 }
1161 }
1162
1163 Ok(Some(current_cid))
1164 }
1165
1166 pub async fn resolve_path(&self, cid: &Cid, path: &str) -> Result<Option<Cid>, HashTreeError> {
1168 self.resolve(cid, path).await
1169 }
1170
1171 fn find_link(&self, node: &TreeNode, name: &str) -> Option<Link> {
1172 node.links
1173 .iter()
1174 .find(|l| l.name.as_deref() == Some(name))
1175 .cloned()
1176 }
1177
1178 async fn find_link_in_subtrees_cid(&self, node: &TreeNode, name: &str, _parent_cid: &Cid) -> Result<Option<Link>, HashTreeError> {
1180 for link in &node.links {
1181 if !link.name.as_ref().map(|n| n.starts_with('_')).unwrap_or(false) {
1182 continue;
1183 }
1184
1185 let sub_cid = Cid {
1187 hash: link.hash,
1188 key: link.key.clone(),
1189 };
1190
1191 let sub_node = match self.get_node(&sub_cid).await? {
1192 Some(n) => n,
1193 None => continue,
1194 };
1195
1196 if let Some(found) = self.find_link(&sub_node, name) {
1197 return Ok(Some(found));
1198 }
1199
1200 if let Some(deep_found) = Box::pin(self.find_link_in_subtrees_cid(&sub_node, name, &sub_cid)).await? {
1201 return Ok(Some(deep_found));
1202 }
1203 }
1204
1205 Ok(None)
1206 }
1207
1208 pub async fn get_size(&self, hash: &Hash) -> Result<u64, HashTreeError> {
1210 let data = match self.store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1211 Some(d) => d,
1212 None => return Ok(0),
1213 };
1214
1215 if !is_tree_node(&data) {
1216 return Ok(data.len() as u64);
1217 }
1218
1219 let node = decode_tree_node(&data)?;
1220 let mut total = 0u64;
1222 for link in &node.links {
1223 total += link.size;
1224 }
1225 Ok(total)
1226 }
1227
1228 pub async fn get_size_cid(&self, cid: &Cid) -> Result<u64, HashTreeError> {
1230 if cid.key.is_some() {
1231 let data = match self.get(cid).await? {
1232 Some(d) => d,
1233 None => return Ok(0),
1234 };
1235 return Ok(data.len() as u64);
1236 }
1237
1238 self.get_size(&cid.hash).await
1239 }
1240
1241 pub async fn walk(&self, cid: &Cid, path: &str) -> Result<Vec<WalkEntry>, HashTreeError> {
1243 let mut entries = Vec::new();
1244 self.walk_recursive(cid, path, &mut entries).await?;
1245 Ok(entries)
1246 }
1247
1248 async fn walk_recursive(
1249 &self,
1250 cid: &Cid,
1251 path: &str,
1252 entries: &mut Vec<WalkEntry>,
1253 ) -> Result<(), HashTreeError> {
1254 let data = match self.store.get(&cid.hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1255 Some(d) => d,
1256 None => return Ok(()),
1257 };
1258
1259 let data = if let Some(key) = &cid.key {
1261 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1262 } else {
1263 data
1264 };
1265
1266 let node = match try_decode_tree_node(&data) {
1267 Some(n) => n,
1268 None => {
1269 entries.push(WalkEntry {
1270 path: path.to_string(),
1271 hash: cid.hash,
1272 link_type: LinkType::Blob,
1273 size: data.len() as u64,
1274 key: cid.key,
1275 });
1276 return Ok(());
1277 }
1278 };
1279
1280 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1281 entries.push(WalkEntry {
1282 path: path.to_string(),
1283 hash: cid.hash,
1284 link_type: node.node_type,
1285 size: node_size,
1286 key: cid.key,
1287 });
1288
1289 for link in &node.links {
1290 let child_path = match &link.name {
1291 Some(name) => {
1292 if name.starts_with("_chunk_") || name.starts_with('_') {
1293 let sub_cid = Cid { hash: link.hash, key: cid.key };
1295 Box::pin(self.walk_recursive(&sub_cid, path, entries)).await?;
1296 continue;
1297 }
1298 if path.is_empty() {
1299 name.clone()
1300 } else {
1301 format!("{}/{}", path, name)
1302 }
1303 }
1304 None => path.to_string(),
1305 };
1306
1307 let child_cid = Cid { hash: link.hash, key: link.key };
1309 Box::pin(self.walk_recursive(&child_cid, &child_path, entries)).await?;
1310 }
1311
1312 Ok(())
1313 }
1314
1315 pub async fn walk_parallel(&self, cid: &Cid, path: &str, concurrency: usize) -> Result<Vec<WalkEntry>, HashTreeError> {
1318 self.walk_parallel_with_progress(cid, path, concurrency, None).await
1319 }
1320
1321 pub async fn walk_parallel_with_progress(
1328 &self,
1329 cid: &Cid,
1330 path: &str,
1331 concurrency: usize,
1332 progress: Option<&std::sync::atomic::AtomicUsize>,
1333 ) -> Result<Vec<WalkEntry>, HashTreeError> {
1334 use futures::stream::{FuturesUnordered, StreamExt};
1335 use std::collections::VecDeque;
1336 use std::sync::atomic::Ordering;
1337
1338 let mut entries = Vec::new();
1339 let mut pending: VecDeque<(Cid, String)> = VecDeque::new();
1340 let mut active = FuturesUnordered::new();
1341
1342 pending.push_back((cid.clone(), path.to_string()));
1344
1345 loop {
1346 while active.len() < concurrency {
1348 if let Some((node_cid, node_path)) = pending.pop_front() {
1349 let store = &self.store;
1350 let fut = async move {
1351 let data = store.get(&node_cid.hash).await
1352 .map_err(|e| HashTreeError::Store(e.to_string()))?;
1353 Ok::<_, HashTreeError>((node_cid, node_path, data))
1354 };
1355 active.push(fut);
1356 } else {
1357 break;
1358 }
1359 }
1360
1361 if active.is_empty() {
1363 break;
1364 }
1365
1366 if let Some(result) = active.next().await {
1368 let (node_cid, node_path, data) = result?;
1369
1370 if let Some(counter) = progress {
1372 counter.fetch_add(1, Ordering::Relaxed);
1373 }
1374
1375 let data = match data {
1376 Some(d) => d,
1377 None => continue,
1378 };
1379
1380 let data = if let Some(key) = &node_cid.key {
1382 decrypt_chk(&data, key).map_err(|e| HashTreeError::Decryption(e.to_string()))?
1383 } else {
1384 data
1385 };
1386
1387 let node = match try_decode_tree_node(&data) {
1388 Some(n) => n,
1389 None => {
1390 entries.push(WalkEntry {
1392 path: node_path,
1393 hash: node_cid.hash,
1394 link_type: LinkType::Blob,
1395 size: data.len() as u64,
1396 key: node_cid.key,
1397 });
1398 continue;
1399 }
1400 };
1401
1402 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1404 entries.push(WalkEntry {
1405 path: node_path.clone(),
1406 hash: node_cid.hash,
1407 link_type: node.node_type,
1408 size: node_size,
1409 key: node_cid.key,
1410 });
1411
1412 for link in &node.links {
1414 let child_path = match &link.name {
1415 Some(name) => {
1416 if name.starts_with("_chunk_") || name.starts_with('_') {
1417 let sub_cid = Cid { hash: link.hash, key: node_cid.key };
1419 pending.push_back((sub_cid, node_path.clone()));
1420 continue;
1421 }
1422 if node_path.is_empty() {
1423 name.clone()
1424 } else {
1425 format!("{}/{}", node_path, name)
1426 }
1427 }
1428 None => node_path.clone(),
1429 };
1430
1431 if link.link_type == LinkType::Blob {
1434 entries.push(WalkEntry {
1435 path: child_path,
1436 hash: link.hash,
1437 link_type: LinkType::Blob,
1438 size: link.size,
1439 key: link.key,
1440 });
1441 if let Some(counter) = progress {
1442 counter.fetch_add(1, Ordering::Relaxed);
1443 }
1444 continue;
1445 }
1446
1447 let child_cid = Cid { hash: link.hash, key: link.key };
1449 pending.push_back((child_cid, child_path));
1450 }
1451 }
1452 }
1453
1454 Ok(entries)
1455 }
1456
    /// Lazily walks the tree rooted at `cid` as a stream of [`WalkEntry`] values.
    ///
    /// The first poll (`WalkStreamState::Init`) fetches the root, decrypts it when `cid.key`
    /// is set, and yields its entry. Every later poll is delegated to `process_walk_stack`.
    /// A root that is absent from the store yields an empty stream. A store or decryption
    /// error is yielded once, and then the stream ends (`WalkStreamState::Done`).
    pub fn walk_stream(
        &self,
        cid: Cid,
        initial_path: String,
    ) -> Pin<Box<dyn Stream<Item = Result<WalkEntry, HashTreeError>> + Send + '_>> {
        Box::pin(stream::unfold(
            WalkStreamState::Init { cid, path: initial_path, tree: self },
            |state| async move {
                match state {
                    WalkStreamState::Init { cid, path, tree } => {
                        // Missing root: end the stream without yielding anything.
                        let data = match tree.store.get(&cid.hash).await {
                            Ok(Some(d)) => d,
                            Ok(None) => return None,
                            Err(e) => {
                                return Some((
                                    Err(HashTreeError::Store(e.to_string())),
                                    WalkStreamState::Done,
                                ))
                            }
                        };

                        let data = if let Some(key) = &cid.key {
                            // Encrypted root: decrypt before trying to decode it as a tree node.
                            match decrypt_chk(&data, key) {
                                Ok(d) => d,
                                Err(e) => return Some((Err(HashTreeError::Decryption(e.to_string())), WalkStreamState::Done)),
                            }
                        } else {
                            data
                        };

                        let node = match try_decode_tree_node(&data) {
                            Some(n) => n,
                            None => {
                                // Not a tree node: the root is a single blob, so this is the
                                // only entry the stream will produce.
                                let entry = WalkEntry {
                                    path,
                                    hash: cid.hash,
                                    link_type: LinkType::Blob,
                                    size: data.len() as u64,
                                    key: cid.key,
                                };
                                return Some((Ok(entry), WalkStreamState::Done));
                            }
                        };

                        // A tree node's reported size is the sum of its links' sizes.
                        let node_size: u64 = node.links.iter().map(|l| l.size).sum();
                        let entry = WalkEntry {
                            path: path.clone(),
                            hash: cid.hash,
                            link_type: node.node_type,
                            size: node_size,
                            key: cid.key,
                        };

                        let mut stack: Vec<WalkStackItem> = Vec::new();
                        // Children are pushed in reverse so the first link is popped first.
                        for link in node.links.into_iter().rev() {
                            let child_path = match &link.name {
                                // Named links extend the path, unless the name starts with `_`
                                // (internal sub-node). Internal and unnamed links keep the
                                // parent's path.
                                Some(name) if !name.starts_with('_') => {
                                    if path.is_empty() {
                                        name.clone()
                                    } else {
                                        format!("{}/{}", path, name)
                                    }
                                }
                                _ => path.clone(),
                            };
                            stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
                        }

                        Some((Ok(entry), WalkStreamState::Processing { stack, tree }))
                    }
                    WalkStreamState::Processing { mut stack, tree } => {
                        // All remaining work is driven off the explicit stack.
                        tree.process_walk_stack(&mut stack).await
                    }
                    WalkStreamState::Done => None,
                }
            },
        ))
    }
1540
1541 async fn process_walk_stack<'a>(
1542 &'a self,
1543 stack: &mut Vec<WalkStackItem>,
1544 ) -> Option<(Result<WalkEntry, HashTreeError>, WalkStreamState<'a, S>)> {
1545 while let Some(item) = stack.pop() {
1546 let data = match self.store.get(&item.hash).await {
1547 Ok(Some(d)) => d,
1548 Ok(None) => continue,
1549 Err(e) => {
1550 return Some((
1551 Err(HashTreeError::Store(e.to_string())),
1552 WalkStreamState::Done,
1553 ))
1554 }
1555 };
1556
1557 let node = match try_decode_tree_node(&data) {
1558 Some(n) => n,
1559 None => {
1560 let entry = WalkEntry {
1562 path: item.path,
1563 hash: item.hash,
1564 link_type: LinkType::Blob,
1565 size: data.len() as u64,
1566 key: item.key,
1567 };
1568 return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1569 }
1570 };
1571
1572 let node_size: u64 = node.links.iter().map(|l| l.size).sum();
1573 let entry = WalkEntry {
1574 path: item.path.clone(),
1575 hash: item.hash,
1576 link_type: node.node_type,
1577 size: node_size,
1578 key: None, };
1580
1581 for link in node.links.into_iter().rev() {
1583 let child_path = match &link.name {
1584 Some(name) if !name.starts_with('_') => {
1585 if item.path.is_empty() {
1586 name.clone()
1587 } else {
1588 format!("{}/{}", item.path, name)
1589 }
1590 }
1591 _ => item.path.clone(),
1592 };
1593 stack.push(WalkStackItem { hash: link.hash, path: child_path, key: link.key });
1594 }
1595
1596 return Some((Ok(entry), WalkStreamState::Processing { stack: std::mem::take(stack), tree: self }));
1597 }
1598 None
1599 }
1600
1601 pub async fn set_entry(
1606 &self,
1607 root: &Cid,
1608 path: &[&str],
1609 name: &str,
1610 entry_cid: &Cid,
1611 size: u64,
1612 link_type: LinkType,
1613 ) -> Result<Cid, HashTreeError> {
1614 let dir_cid = self.resolve_path_array(root, path).await?;
1615 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1616
1617 let entries = self.list_directory(&dir_cid).await?;
1618 let mut new_entries: Vec<DirEntry> = entries
1619 .into_iter()
1620 .filter(|e| e.name != name)
1621 .map(|e| DirEntry {
1622 name: e.name,
1623 hash: e.hash,
1624 size: e.size,
1625 key: e.key,
1626 link_type: e.link_type,
1627 meta: e.meta,
1628 })
1629 .collect();
1630
1631 new_entries.push(DirEntry {
1632 name: name.to_string(),
1633 hash: entry_cid.hash,
1634 size,
1635 key: entry_cid.key,
1636 link_type,
1637 meta: None,
1638 });
1639
1640 let new_dir_cid = self.put_directory(new_entries).await?;
1641 self.rebuild_path(root, path, new_dir_cid).await
1642 }
1643
1644 pub async fn remove_entry(
1647 &self,
1648 root: &Cid,
1649 path: &[&str],
1650 name: &str,
1651 ) -> Result<Cid, HashTreeError> {
1652 let dir_cid = self.resolve_path_array(root, path).await?;
1653 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1654
1655 let entries = self.list_directory(&dir_cid).await?;
1656 let new_entries: Vec<DirEntry> = entries
1657 .into_iter()
1658 .filter(|e| e.name != name)
1659 .map(|e| DirEntry {
1660 name: e.name,
1661 hash: e.hash,
1662 size: e.size,
1663 key: e.key,
1664 link_type: e.link_type,
1665 meta: e.meta,
1666 })
1667 .collect();
1668
1669 let new_dir_cid = self.put_directory(new_entries).await?;
1670 self.rebuild_path(root, path, new_dir_cid).await
1671 }
1672
1673 pub async fn rename_entry(
1676 &self,
1677 root: &Cid,
1678 path: &[&str],
1679 old_name: &str,
1680 new_name: &str,
1681 ) -> Result<Cid, HashTreeError> {
1682 if old_name == new_name {
1683 return Ok(root.clone());
1684 }
1685
1686 let dir_cid = self.resolve_path_array(root, path).await?;
1687 let dir_cid = dir_cid.ok_or_else(|| HashTreeError::PathNotFound(path.join("/")))?;
1688
1689 let entries = self.list_directory(&dir_cid).await?;
1690 let entry = entries
1691 .iter()
1692 .find(|e| e.name == old_name)
1693 .ok_or_else(|| HashTreeError::EntryNotFound(old_name.to_string()))?;
1694
1695 let entry_hash = entry.hash;
1696 let entry_size = entry.size;
1697 let entry_key = entry.key;
1698 let entry_link_type = entry.link_type;
1699 let entry_meta = entry.meta.clone();
1700
1701 let new_entries: Vec<DirEntry> = entries
1702 .into_iter()
1703 .filter(|e| e.name != old_name)
1704 .map(|e| DirEntry {
1705 name: e.name,
1706 hash: e.hash,
1707 size: e.size,
1708 key: e.key,
1709 link_type: e.link_type,
1710 meta: e.meta,
1711 })
1712 .chain(std::iter::once(DirEntry {
1713 name: new_name.to_string(),
1714 hash: entry_hash,
1715 size: entry_size,
1716 key: entry_key,
1717 link_type: entry_link_type,
1718 meta: entry_meta,
1719 }))
1720 .collect();
1721
1722 let new_dir_cid = self.put_directory(new_entries).await?;
1723 self.rebuild_path(root, path, new_dir_cid).await
1724 }
1725
1726 pub async fn move_entry(
1729 &self,
1730 root: &Cid,
1731 source_path: &[&str],
1732 name: &str,
1733 target_path: &[&str],
1734 ) -> Result<Cid, HashTreeError> {
1735 let source_dir_cid = self.resolve_path_array(root, source_path).await?;
1736 let source_dir_cid = source_dir_cid.ok_or_else(|| HashTreeError::PathNotFound(source_path.join("/")))?;
1737
1738 let source_entries = self.list_directory(&source_dir_cid).await?;
1739 let entry = source_entries
1740 .iter()
1741 .find(|e| e.name == name)
1742 .ok_or_else(|| HashTreeError::EntryNotFound(name.to_string()))?;
1743
1744 let entry_cid = Cid {
1745 hash: entry.hash,
1746 key: entry.key,
1747 };
1748 let entry_size = entry.size;
1749 let entry_link_type = entry.link_type;
1750
1751 let new_root = self.remove_entry(root, source_path, name).await?;
1753
1754 self.set_entry(&new_root, target_path, name, &entry_cid, entry_size, entry_link_type).await
1756 }
1757
1758 async fn resolve_path_array(&self, root: &Cid, path: &[&str]) -> Result<Option<Cid>, HashTreeError> {
1759 if path.is_empty() {
1760 return Ok(Some(root.clone()));
1761 }
1762 self.resolve_path(root, &path.join("/")).await
1763 }
1764
1765 async fn rebuild_path(
1766 &self,
1767 root: &Cid,
1768 path: &[&str],
1769 new_child: Cid,
1770 ) -> Result<Cid, HashTreeError> {
1771 if path.is_empty() {
1772 return Ok(new_child);
1773 }
1774
1775 let mut child_cid = new_child;
1776 let parts: Vec<&str> = path.to_vec();
1777
1778 for i in (0..parts.len()).rev() {
1779 let child_name = parts[i];
1780 let parent_path = &parts[..i];
1781
1782 let parent_cid = if parent_path.is_empty() {
1783 root.clone()
1784 } else {
1785 self.resolve_path_array(root, parent_path)
1786 .await?
1787 .ok_or_else(|| HashTreeError::PathNotFound(parent_path.join("/")))?
1788 };
1789
1790 let parent_entries = self.list_directory(&parent_cid).await?;
1791 let new_parent_entries: Vec<DirEntry> = parent_entries
1792 .into_iter()
1793 .map(|e| {
1794 if e.name == child_name {
1795 DirEntry {
1796 name: e.name,
1797 hash: child_cid.hash,
1798 size: 0, key: child_cid.key,
1800 link_type: e.link_type,
1801 meta: e.meta,
1802 }
1803 } else {
1804 DirEntry {
1805 name: e.name,
1806 hash: e.hash,
1807 size: e.size,
1808 key: e.key,
1809 link_type: e.link_type,
1810 meta: e.meta,
1811 }
1812 }
1813 })
1814 .collect();
1815
1816 child_cid = self.put_directory(new_parent_entries).await?;
1817 }
1818
1819 Ok(child_cid)
1820 }
1821
1822 pub fn get_store(&self) -> Arc<S> {
1826 self.store.clone()
1827 }
1828
1829 pub fn chunk_size(&self) -> usize {
1831 self.chunk_size
1832 }
1833
1834 pub fn max_links(&self) -> usize {
1836 self.max_links
1837 }
1838}
1839
/// Pending work item for the read stream driven by `ReadStreamState`.
///
/// NOTE(review): only the `Hash` variant is visible here; its consumers live outside this
/// section, so confirm the exact meaning (presumably "node still to fetch") there.
enum StreamStackItem {
    Hash(Hash),
}
1845
/// State machine for a streaming read.
///
/// It mirrors `WalkStreamState`: `Init` holds the starting hash, `Processing` holds the
/// remaining stack, and `Done` terminates. NOTE(review): the code driving it is outside
/// this section.
enum ReadStreamState<'a, S: Store> {
    Init { hash: Hash, tree: &'a HashTree<S> },
    Processing { stack: Vec<StreamStackItem>, tree: &'a HashTree<S> },
    Done,
}
1851
/// A node still to be visited by `walk_stream`.
struct WalkStackItem {
    /// Hash used to fetch the node from the store.
    hash: Hash,
    /// Logical path the node is reported under.
    path: String,
    /// Key of the node, taken from the link that referenced it; `None` for unencrypted nodes.
    key: Option<[u8; 32]>,
}
1857
/// `stream::unfold` state for `walk_stream`.
enum WalkStreamState<'a, S: Store> {
    /// Root not yet fetched.
    Init { cid: Cid, path: String, tree: &'a HashTree<S> },
    /// Root already yielded; remaining nodes are on `stack` (last element is visited next).
    Processing { stack: Vec<WalkStackItem>, tree: &'a HashTree<S> },
    /// Stream finished, either exhausted or after yielding an error.
    Done,
}
1863
/// Pending node for the encrypted read stream driven by `EncryptedStreamState`.
///
/// NOTE(review): its consumers are outside this section.
struct EncryptedStackItem {
    /// Hash used to fetch the node from the store.
    hash: Hash,
    /// Key for the node, presumably used to decrypt it before decoding — confirm at use site.
    key: Option<[u8; 32]>,
}
1869
/// State machine for a streaming read of encrypted content.
///
/// `Init` carries the root hash and its (required) key, `Processing` carries the remaining
/// stack, and `Done` terminates. NOTE(review): the code driving it is outside this section.
enum EncryptedStreamState<'a, S: Store> {
    Init { hash: Hash, key: [u8; 32], tree: &'a HashTree<S> },
    Processing { stack: Vec<EncryptedStackItem>, tree: &'a HashTree<S> },
    Done,
}
1875
1876pub async fn verify_tree<S: Store>(
1878 store: Arc<S>,
1879 root_hash: &Hash,
1880) -> Result<crate::reader::VerifyResult, HashTreeError> {
1881 let mut missing = Vec::new();
1882 let mut visited = std::collections::HashSet::new();
1883
1884 verify_recursive(store, root_hash, &mut missing, &mut visited).await?;
1885
1886 Ok(crate::reader::VerifyResult {
1887 valid: missing.is_empty(),
1888 missing,
1889 })
1890}
1891
1892async fn verify_recursive<S: Store>(
1893 store: Arc<S>,
1894 hash: &Hash,
1895 missing: &mut Vec<Hash>,
1896 visited: &mut std::collections::HashSet<String>,
1897) -> Result<(), HashTreeError> {
1898 let hex = to_hex(hash);
1899 if visited.contains(&hex) {
1900 return Ok(());
1901 }
1902 visited.insert(hex);
1903
1904 let data = match store.get(hash).await.map_err(|e| HashTreeError::Store(e.to_string()))? {
1905 Some(d) => d,
1906 None => {
1907 missing.push(*hash);
1908 return Ok(());
1909 }
1910 };
1911
1912 if is_tree_node(&data) {
1913 let node = decode_tree_node(&data)?;
1914 for link in &node.links {
1915 Box::pin(verify_recursive(store.clone(), &link.hash, missing, visited)).await?;
1916 }
1917 }
1918
1919 Ok(())
1920}
1921
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;

    /// Builds an unencrypted (`public()`) tree over a fresh in-memory store and returns both.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    /// A raw blob round-trips through `put_blob` / `get_blob`.
    #[tokio::test]
    async fn test_put_and_read_blob() {
        let (_store, tree) = make_tree();

        let data = vec![1, 2, 3, 4, 5];
        let hash = tree.put_blob(&data).await.unwrap();

        let result = tree.get_blob(&hash).await.unwrap();
        assert_eq!(result, Some(data));
    }

    /// A small file round-trips through `put_file` / `read_file` and reports its byte length.
    #[tokio::test]
    async fn test_put_and_read_file_small() {
        let (_store, tree) = make_tree();

        let data = b"Hello, World!";
        let (cid, size) = tree.put_file(data).await.unwrap();

        assert_eq!(size, data.len() as u64);

        let read_data = tree.read_file(&cid.hash).await.unwrap();
        assert_eq!(read_data, Some(data.to_vec()));
    }

    /// A two-entry directory lists both names back; the order is not asserted.
    #[tokio::test]
    async fn test_put_and_read_directory() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();

        let dir_cid = tree
            .put_directory(
                vec![
                    DirEntry::new("a.txt", file1).with_size(8),
                    DirEntry::new("b.txt", file2).with_size(8),
                ],
            )
            .await
            .unwrap();

        let entries = tree.list_directory(&dir_cid).await.unwrap();
        assert_eq!(entries.len(), 2);
        let names: Vec<_> = entries.iter().map(|e| e.name.as_str()).collect();
        assert!(names.contains(&"a.txt"));
        assert!(names.contains(&"b.txt"));
    }

    /// A blob is not a directory; even an empty directory is.
    #[tokio::test]
    async fn test_is_directory() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"data").await.unwrap();
        let dir_cid = tree.put_directory(vec![]).await.unwrap();

        assert!(!tree.is_directory(&file_hash).await.unwrap());
        assert!(tree.is_directory(&dir_cid.hash).await.unwrap());
    }

    /// A two-level `subdir/file.txt` path resolves to the file's hash.
    #[tokio::test]
    async fn test_resolve_path() {
        let (_store, tree) = make_tree();

        let file_hash = tree.put_blob(b"nested").await.unwrap();
        let sub_dir = tree.put_directory(
            vec![DirEntry::new("file.txt", file_hash).with_size(6)],
        ).await.unwrap();
        let root_dir = tree.put_directory(
            vec![DirEntry::new("subdir", sub_dir.hash)],
        ).await.unwrap();

        let resolved = tree.resolve_path(&root_dir, "subdir/file.txt").await.unwrap();
        assert_eq!(resolved.map(|c| c.hash), Some(file_hash));
    }

    /// With `public()`, `put` returns a CID without a key and `get` returns the original bytes.
    #[tokio::test]
    async fn test_unified_put_get_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let data = b"Hello, public world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_none());
        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    /// With the default (encrypted) config, `put` returns a CID carrying a key, and `get`
    /// still returns the original plaintext.
    #[tokio::test]
    async fn test_unified_put_get_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Hello, encrypted world!";
        let (cid, size) = tree.put(data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some());
        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    /// Encrypted round trip for content larger than the 100-byte chunk size (500 bytes).
    #[tokio::test]
    async fn test_unified_put_get_encrypted_chunked() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).with_chunk_size(100));

        let data: Vec<u8> = (0..500).map(|i| (i % 256) as u8).collect();
        let (cid, size) = tree.put(&data).await.unwrap();

        assert_eq!(size, data.len() as u64);
        assert!(cid.key.is_some());

        let retrieved = tree.get(&cid).await.unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    /// With encryption on (the default), storing identical content twice must produce an
    /// identical hash, key and string form.
    #[tokio::test]
    async fn test_cid_deterministic() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let data = b"Same content produces same CID";

        let (cid1, _) = tree.put(data).await.unwrap();
        let (cid2, _) = tree.put(data).await.unwrap();

        assert_eq!(cid1.hash, cid2.hash);
        assert_eq!(cid1.key, cid2.key);
        assert_eq!(cid1.to_string(), cid2.to_string());
    }

    /// A public CID renders as 64 characters with no `:` separator.
    #[tokio::test]
    async fn test_cid_to_string_public() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store).public());

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        assert_eq!(s.len(), 64);
        assert!(!s.contains(':'));
    }

    /// An encrypted CID renders as 129 characters containing `:`.
    /// Presumably this is `hash:key` in hex (64 + 1 + 64).
    #[tokio::test]
    async fn test_cid_to_string_encrypted() {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store));

        let (cid, _) = tree.put(b"test").await.unwrap();
        let s = cid.to_string();

        assert_eq!(s.len(), 129);
        assert!(s.contains(':'));
    }
}