1use std::collections::BTreeMap;
2use std::io::Read;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use tokio::sync::Mutex;
7use uuid::Uuid;
8
9use crate::crypto::{PublicKey, Secret, SecretError, SecretKey, SecretShare};
10use crate::linked_data::{BlockEncoded, CodecError, Link};
11use crate::peer::{BlobsStore, BlobsStoreError};
12
13use super::conflict::MergeResult;
14use super::manifest::{Manifest, ManifestError, Share};
15use super::node::{Node, NodeError, NodeLink};
16use super::path_ops::{OpType, PathOpLog};
17use super::pins::Pins;
18use super::principal::PrincipalRole;
19
/// Converts an absolute path into a mount-relative one by dropping its leading root
/// component.
///
/// Each remaining component goes through a lossy UTF-8 conversion before the relative
/// path is rebuilt, so `/a/b` becomes `a/b` and `/` becomes the empty path.
///
/// # Panics
///
/// Panics with `path is not absolute` when `path` is not absolute.
pub fn clean_path(path: &Path) -> PathBuf {
    assert!(path.is_absolute(), "path is not absolute");

    let mut relative = PathBuf::new();
    // The first element yielded for an absolute path is the root itself; skip it.
    for component in path.iter().skip(1) {
        let part = component.to_string_lossy();
        relative.push(&*part);
    }
    relative
}
29
/// In-memory state of a mounted bucket; [`Mount`] keeps one of these behind a mutex.
#[derive(Clone)]
pub struct MountInner {
    /// Link of the manifest this state was created from, loaded from, or last saved as.
    pub link: Link,
    /// Manifest of the bucket (shares, entry and pins links, height, ...).
    pub manifest: Manifest,
    /// Root node of the bucket's tree; replaced by `add`, `rm`, `mkdir` and `mv`.
    pub entry: Node,
    /// Hashes pinned for this bucket.
    pub pins: Pins,
    /// Version height: `Mount::init` starts at 0 and every `Mount::save` adds one.
    pub height: u64,
    /// Log of path operations; appended to by `add`/`rm`/`mkdir`/`mv`, cleared by `save`.
    pub ops_log: PathOpLog,
    /// Public key of the local peer; recorded as the author of logged operations.
    pub peer_id: PublicKey,
    /// Secret key of the local peer; used to sign manifests and recover secret shares.
    pub secret_key: SecretKey,
}
49
/// Read-only accessors for the fields of [`MountInner`].
impl MountInner {
    /// Returns the link of the current manifest.
    pub fn link(&self) -> &Link {
        &self.link
    }
    /// Returns the root node of the tree.
    pub fn entry(&self) -> &Node {
        &self.entry
    }
    /// Returns the manifest.
    pub fn manifest(&self) -> &Manifest {
        &self.manifest
    }
    /// Returns the pin set.
    pub fn pins(&self) -> &Pins {
        &self.pins
    }
    /// Returns the version height.
    pub fn height(&self) -> u64 {
        self.height
    }
    /// Returns the log of recorded path operations.
    pub fn ops_log(&self) -> &PathOpLog {
        &self.ops_log
    }
    /// Returns the public key of the local peer.
    pub fn peer_id(&self) -> &PublicKey {
        &self.peer_id
    }
}
73
/// Handle to a mounted bucket: the mutex-guarded [`MountInner`] state (field `0`) plus the
/// blob store used for reads and writes (field `1`).
///
/// Cloning the handle clones the `Arc`, so clones share the same inner state.
#[derive(Clone)]
pub struct Mount(Arc<Mutex<MountInner>>, BlobsStore);
76
/// Errors returned by [`Mount`] operations.
#[derive(Debug, thiserror::Error)]
pub enum MountError {
    /// Catch-all for ad-hoc errors (e.g. operating on the root path).
    #[error("default error: {0}")]
    Default(#[from] anyhow::Error),
    /// The blob behind the given link is not present in the blob store.
    #[error("link not found")]
    LinkNotFound(Link),
    /// No entry exists at the given path.
    #[error("path not found: {0}")]
    PathNotFound(PathBuf),
    /// The path exists but is not of the kind the operation needs.
    #[error("path is not a node: {0}")]
    PathNotNode(PathBuf),
    /// An entry already exists at the destination path.
    #[error("path already exists: {0}")]
    PathAlreadyExists(PathBuf),
    /// A move was requested whose destination lies inside (or equals) its source.
    #[error("cannot move '{from}' to '{to}': destination is inside source")]
    MoveIntoSelf { from: PathBuf, to: PathBuf },
    /// Error propagated from the blob store.
    #[error("blobs store error: {0}")]
    BlobsStore(#[from] BlobsStoreError),
    /// Error propagated from secret handling (encryption/decryption).
    #[error("secret error: {0}")]
    Secret(#[from] SecretError),
    /// Error propagated from node handling.
    #[error("node error: {0}")]
    Node(#[from] NodeError),
    /// Error propagated from encoding/decoding.
    #[error("codec error: {0}")]
    Codec(#[from] CodecError),
    /// Error propagated from creating or recovering a secret share.
    #[error("share error: {0}")]
    Share(#[from] crate::crypto::SecretShareError),
    /// Error propagated from manifest handling.
    #[error("manifest error: {0}")]
    Manifest(#[from] ManifestError),
    /// The manifest has no (usable) share for the peer in question.
    #[error("peers share was not found")]
    ShareNotFound,
    /// A mirror tried to open a bucket whose manifest carries no published secret.
    #[error("mirror cannot mount: bucket is not published")]
    MirrorCannotMount,
    /// The local peer is not an owner of the bucket.
    #[error("unauthorized: only owners can perform this operation")]
    Unauthorized,
}
110
111impl Mount {
112 pub async fn inner(&self) -> MountInner {
113 self.0.lock().await.clone()
114 }
115
116 pub fn blobs(&self) -> BlobsStore {
117 self.1.clone()
118 }
119
120 pub async fn link(&self) -> Link {
121 let inner = self.0.lock().await;
122 inner.link.clone()
123 }
124
125 pub async fn save(
130 &self,
131 blobs: &BlobsStore,
132 publish: bool,
133 ) -> Result<(Link, Link, u64), MountError> {
134 let (
136 entry_node,
137 mut pins,
138 previous_link,
139 previous_height,
140 manifest_template,
141 ops_log,
142 secret_key,
143 ) = {
144 let inner = self.0.lock().await;
145 (
146 inner.entry.clone(),
147 inner.pins.clone(),
148 inner.link.clone(),
149 inner.height,
150 inner.manifest.clone(),
151 inner.ops_log.clone(),
152 inner.secret_key.clone(),
153 )
154 };
155
156 let height = previous_height + 1;
158
159 let secret = Secret::generate();
161
162 let entry = Self::_put_node_in_blobs(&entry_node, &secret, blobs).await?;
164
165 pins.insert(entry.clone().hash());
168 pins.insert(previous_link.hash());
169
170 let ops_log_link = if !ops_log.is_empty() {
172 let link = Self::_put_ops_log_in_blobs(&ops_log, &secret, blobs).await?;
173 pins.insert(link.hash());
174 Some(link)
175 } else {
176 None
177 };
178
179 let pins_link = Self::_put_pins_in_blobs(&pins, blobs).await?;
180
181 let mut manifest = manifest_template;
183 for share in manifest.shares_mut().values_mut() {
184 if *share.role() == PrincipalRole::Owner {
185 let secret_share = SecretShare::new(&secret, &share.principal().identity)?;
186 share.set_share(secret_share);
187 }
188 }
189
190 if publish {
192 manifest.publish(&secret);
193 } else {
194 manifest.unpublish();
197 }
198 manifest.set_pins(pins_link.clone());
199 manifest.set_previous(previous_link.clone());
200 manifest.set_entry(entry.clone());
201 manifest.set_height(height);
202
203 manifest.clear_ops_log();
206 if let Some(ops_link) = ops_log_link {
207 manifest.set_ops_log(ops_link);
208 }
209
210 manifest.sign(&secret_key)?;
212
213 let link = Self::_put_manifest_in_blobs(&manifest, blobs).await?;
215
216 {
218 let mut inner = self.0.lock().await;
219 inner.manifest = manifest;
220 inner.height = height;
221 inner.link = link.clone();
222 inner.ops_log.clear_preserving_clock();
226 }
227
228 Ok((link, previous_link, height))
229 }
230
231 pub async fn init(
232 id: Uuid,
233 name: String,
234 owner: &SecretKey,
235 blobs: &BlobsStore,
236 ) -> Result<Self, MountError> {
237 let entry = Node::default();
239 let secret = Secret::generate();
241 let entry_link = Self::_put_node_in_blobs(&entry, &secret, blobs).await?;
243 let share = SecretShare::new(&secret, &owner.public())?;
245 let mut pins = Pins::new();
247 pins.insert(entry_link.hash());
248 let pins_link = Self::_put_pins_in_blobs(&pins, blobs).await?;
250 let mut manifest = Manifest::new(
252 id,
253 name.clone(),
254 owner.public(),
255 share,
256 entry_link.clone(),
257 pins_link.clone(),
258 0, );
260 manifest.sign(owner)?;
262 let link = Self::_put_manifest_in_blobs(&manifest, blobs).await?;
263
264 Ok(Mount(
266 Arc::new(Mutex::new(MountInner {
267 link,
268 manifest,
269 entry,
270 pins,
271 height: 0,
272 ops_log: PathOpLog::new(),
273 peer_id: owner.public(),
274 secret_key: owner.clone(),
275 })),
276 blobs.clone(),
277 ))
278 }
279
280 pub async fn load(
281 link: &Link,
282 secret_key: &SecretKey,
283 blobs: &BlobsStore,
284 ) -> Result<Self, MountError> {
285 let public_key = &secret_key.public();
286 let manifest = Self::_get_manifest_from_blobs(link, blobs).await?;
287
288 let bucket_share = match manifest.get_share(public_key) {
289 Some(share) => share,
290 None => return Err(MountError::ShareNotFound),
291 };
292
293 let secret = match bucket_share.role() {
295 PrincipalRole::Owner => {
296 let share = bucket_share.share().ok_or(MountError::ShareNotFound)?;
298 share.recover(secret_key)?
299 }
300 PrincipalRole::Mirror => {
301 manifest
303 .public()
304 .cloned()
305 .ok_or(MountError::MirrorCannotMount)?
306 }
307 };
308
309 let pins = Self::_get_pins_from_blobs(manifest.pins(), blobs).await?;
310 let entry = Self::_get_node_from_blobs(
311 &NodeLink::Dir(manifest.entry().clone(), secret.clone()),
312 blobs,
313 )
314 .await?;
315
316 let height = manifest.height();
318
319 let ops_log = if let Some(ops_link) = manifest.ops_log() {
321 let mut log = Self::_get_ops_log_from_blobs(ops_link, &secret, blobs).await?;
322 log.rebuild_clock();
324 log
325 } else {
326 PathOpLog::new()
327 };
328
329 Ok(Mount(
330 Arc::new(Mutex::new(MountInner {
331 link: link.clone(),
332 manifest,
333 entry,
334 pins,
335 height,
336 ops_log,
337 peer_id: secret_key.public(),
338 secret_key: secret_key.clone(),
339 })),
340 blobs.clone(),
341 ))
342 }
343
344 pub async fn load_manifest(link: &Link, blobs: &BlobsStore) -> Result<Manifest, MountError> {
349 Self::_get_manifest_from_blobs(link, blobs).await
350 }
351
352 pub async fn add_owner(&mut self, peer: PublicKey) -> Result<(), MountError> {
355 let mut inner = self.0.lock().await;
356 let secret_share = SecretShare::new(&Secret::default(), &peer)?;
357 inner
358 .manifest
359 .add_share(Share::new_owner(secret_share, peer));
360 Ok(())
361 }
362
363 pub async fn add_mirror(&mut self, peer: PublicKey) {
366 let mut inner = self.0.lock().await;
367 inner.manifest.add_share(Share::new_mirror(peer));
368 }
369
370 pub async fn remove_share(&self, peer_public_key: PublicKey) -> Result<(), MountError> {
375 let mut inner = self.0.lock().await;
376
377 let our_key = inner.secret_key.public();
379 let our_share = inner
380 .manifest
381 .get_share(&our_key)
382 .ok_or(MountError::ShareNotFound)?;
383 if *our_share.role() != PrincipalRole::Owner {
384 return Err(MountError::Unauthorized);
385 }
386
387 let key_hex = peer_public_key.to_hex();
389 if inner.manifest.shares_mut().remove(&key_hex).is_none() {
390 return Err(MountError::ShareNotFound);
391 }
392
393 Ok(())
394 }
395
396 pub async fn is_published(&self) -> bool {
398 let inner = self.0.lock().await;
399 inner.manifest.is_published()
400 }
401
402 pub async fn publish(&self) -> Result<(Link, Link, u64), MountError> {
406 self.save(&self.1, true).await
407 }
408
409 pub async fn add<R>(&mut self, path: &Path, data: R) -> Result<(), MountError>
410 where
411 R: Read + Send + Sync + 'static + Unpin,
412 {
413 let secret = Secret::generate();
414
415 let encrypted_reader = secret.encrypt_reader(data)?;
416
417 use bytes::Bytes;
419 use futures::stream;
420 let encrypted_bytes = {
421 let mut buf = Vec::new();
422 let mut reader = encrypted_reader;
423 reader.read_to_end(&mut buf).map_err(SecretError::Io)?;
424 buf
425 };
426
427 let stream = Box::pin(stream::once(async move {
428 Ok::<_, std::io::Error>(Bytes::from(encrypted_bytes))
429 }));
430
431 let hash = self.1.put_stream(stream).await?;
432
433 let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
434
435 let node_link = NodeLink::new_data_from_path(link.clone(), secret, path);
436
437 let root_node = {
438 let inner = self.0.lock().await;
439 inner.entry.clone()
440 };
441
442 let (updated_link, node_hashes) =
443 Self::_set_node_link_at_path(root_node, node_link, path, &self.1).await?;
444
445 let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
447 Some(
448 Self::_get_node_from_blobs(
449 &NodeLink::Dir(new_root_link.clone(), new_secret),
450 &self.1,
451 )
452 .await?,
453 )
454 } else {
455 None
456 };
457
458 {
460 let mut inner = self.0.lock().await;
461 inner.pins.insert(hash);
463 inner.pins.extend(node_hashes);
464
465 if let Some(entry) = new_entry {
466 inner.entry = entry;
467 }
468
469 let peer_id = inner.peer_id;
471 inner
472 .ops_log
473 .record(peer_id, OpType::Add, clean_path(path), Some(link), false);
474 }
475
476 Ok(())
477 }
478
479 pub async fn rm(&mut self, path: &Path) -> Result<(), MountError> {
480 let path = clean_path(path);
481 let parent_path = path
482 .parent()
483 .ok_or_else(|| MountError::Default(anyhow::anyhow!("Cannot remove root")))?;
484
485 let entry = {
486 let inner = self.0.lock().await;
487 inner.entry.clone()
488 };
489
490 let mut parent_node = if parent_path == Path::new("") {
491 entry.clone()
492 } else {
493 Self::_get_node_at_path(&entry, parent_path, &self.1).await?
494 };
495
496 let file_name = path.file_name().unwrap().to_string_lossy().to_string();
497
498 let removed_link = parent_node.del(&file_name);
500 if removed_link.is_none() {
501 return Err(MountError::PathNotFound(path.to_path_buf()));
502 }
503 let is_dir = removed_link.map(|l| l.is_dir()).unwrap_or(false);
504
505 let removed_path = path.to_path_buf();
507
508 if parent_path == Path::new("") {
509 let secret = Secret::generate();
510 let link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
511
512 let mut inner = self.0.lock().await;
513 inner.pins.insert(link.hash());
515 inner.entry = parent_node;
516 } else {
517 let secret = Secret::generate();
519 let parent_link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
520 let node_link = NodeLink::new_dir(parent_link.clone(), secret);
521
522 let abs_parent_path = Path::new("/").join(parent_path);
524 let (updated_link, node_hashes) =
525 Self::_set_node_link_at_path(entry, node_link, &abs_parent_path, &self.1).await?;
526
527 let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
528 Some(
529 Self::_get_node_from_blobs(
530 &NodeLink::Dir(new_root_link.clone(), new_secret),
531 &self.1,
532 )
533 .await?,
534 )
535 } else {
536 None
537 };
538
539 let mut inner = self.0.lock().await;
540 inner.pins.insert(parent_link.hash());
542 inner.pins.extend(node_hashes);
543
544 if let Some(new_entry) = new_entry {
545 inner.entry = new_entry;
546 }
547 }
548
549 {
551 let mut inner = self.0.lock().await;
552 let peer_id = inner.peer_id;
553 inner
554 .ops_log
555 .record(peer_id, OpType::Remove, removed_path, None, is_dir);
556 }
557
558 Ok(())
559 }
560
561 pub async fn mkdir(&mut self, path: &Path) -> Result<(), MountError> {
562 let path = clean_path(path);
563
564 let entry = {
566 let inner = self.0.lock().await;
567 inner.entry.clone()
568 };
569
570 let (parent_path, dir_name) = if let Some(parent) = path.parent() {
572 (
573 parent,
574 path.file_name().unwrap().to_string_lossy().to_string(),
575 )
576 } else {
577 return Err(MountError::Default(anyhow::anyhow!("Cannot mkdir at root")));
578 };
579
580 let parent_node = if parent_path == Path::new("") {
582 entry.clone()
583 } else {
584 match Self::_get_node_at_path(&entry, parent_path, &self.1).await {
586 Ok(node) => node,
587 Err(MountError::PathNotFound(_)) => Node::default(), Err(err) => return Err(err),
589 }
590 };
591
592 if parent_node.get_link(&dir_name).is_some() {
594 return Err(MountError::PathAlreadyExists(Path::new("/").join(&path)));
595 }
596
597 let new_dir_node = Node::default();
599
600 let secret = Secret::generate();
602
603 let dir_link = Self::_put_node_in_blobs(&new_dir_node, &secret, &self.1).await?;
605
606 let node_link = NodeLink::new_dir(dir_link.clone(), secret);
608
609 let abs_path = Path::new("/").join(&path);
611
612 let (updated_link, node_hashes) =
614 Self::_set_node_link_at_path(entry, node_link, &abs_path, &self.1).await?;
615
616 let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
618 Some(
619 Self::_get_node_from_blobs(
620 &NodeLink::Dir(new_root_link.clone(), new_secret),
621 &self.1,
622 )
623 .await?,
624 )
625 } else {
626 None
627 };
628
629 {
631 let mut inner = self.0.lock().await;
632 inner.pins.insert(dir_link.hash());
634 inner.pins.extend(node_hashes);
635
636 if let Some(new_entry) = new_entry {
637 inner.entry = new_entry;
638 }
639
640 let peer_id = inner.peer_id;
642 inner
643 .ops_log
644 .record(peer_id, OpType::Mkdir, path.to_path_buf(), None, true);
645 }
646
647 Ok(())
648 }
649
650 pub async fn mv(&mut self, from: &Path, to: &Path) -> Result<(), MountError> {
669 let from_clean = clean_path(from);
672 let to_clean = clean_path(to);
673
674 if to.starts_with(from) {
685 return Err(MountError::MoveIntoSelf {
686 from: from.to_path_buf(),
687 to: to.to_path_buf(),
688 });
689 }
690
691 let node_link = self.get(from).await?;
698 let is_dir = node_link.is_dir();
699
700 let from_path = from_clean.to_path_buf();
702 let to_path = to_clean.to_path_buf();
703
704 if self.get(to).await.is_ok() {
710 return Err(MountError::PathAlreadyExists(to.to_path_buf()));
711 }
712
713 {
723 let parent_path = from_clean
725 .parent()
726 .ok_or_else(|| MountError::Default(anyhow::anyhow!("Cannot move root")))?;
727
728 let entry = {
730 let inner = self.0.lock().await;
731 inner.entry.clone()
732 };
733
734 let mut parent_node = if parent_path == Path::new("") {
736 entry.clone()
738 } else {
739 Self::_get_node_at_path(&entry, parent_path, &self.1).await?
741 };
742
743 let file_name = from_clean
745 .file_name()
746 .expect(
747 "from_clean has no filename - this should be impossible after parent() check",
748 )
749 .to_string_lossy()
750 .to_string();
751
752 if parent_node.del(&file_name).is_none() {
754 return Err(MountError::PathNotFound(from_clean.to_path_buf()));
755 }
756
757 if parent_path == Path::new("") {
759 let secret = Secret::generate();
761 let link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
762
763 let mut inner = self.0.lock().await;
764 inner.pins.insert(link.hash());
765 inner.entry = parent_node;
766 } else {
767 let secret = Secret::generate();
771 let parent_link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
772 let new_node_link = NodeLink::new_dir(parent_link.clone(), secret);
773
774 let abs_parent_path = Path::new("/").join(parent_path);
776 let (updated_root_link, node_hashes) =
777 Self::_set_node_link_at_path(entry, new_node_link, &abs_parent_path, &self.1)
778 .await?;
779
780 let new_entry = Self::_get_node_from_blobs(&updated_root_link, &self.1).await?;
784
785 let mut inner = self.0.lock().await;
787 inner.pins.insert(parent_link.hash());
788 inner.pins.extend(node_hashes);
789 inner.entry = new_entry;
790 }
791 }
792
793 let entry = {
800 let inner = self.0.lock().await;
801 inner.entry.clone()
802 };
803
804 let (updated_root_link, node_hashes) =
805 Self::_set_node_link_at_path(entry, node_link, to, &self.1).await?;
806
807 {
811 let new_entry = Self::_get_node_from_blobs(&updated_root_link, &self.1).await?;
813
814 let mut inner = self.0.lock().await;
815 inner.pins.extend(node_hashes);
816 inner.entry = new_entry;
817
818 let peer_id = inner.peer_id;
822 inner.ops_log.record(
823 peer_id,
824 OpType::Mv { from: from_path },
825 to_path,
826 None,
827 is_dir,
828 );
829 }
830
831 Ok(())
832 }
833
834 pub async fn ls(&self, path: &Path) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
835 let mut items = BTreeMap::new();
836 let path = clean_path(path);
837
838 let inner = self.0.lock().await;
839 let root_node = inner.entry.clone();
840 drop(inner);
841
842 let node = if path == Path::new("") {
843 root_node
844 } else {
845 match Self::_get_node_at_path(&root_node, &path, &self.1).await {
846 Ok(node) => node,
847 Err(MountError::LinkNotFound(_)) => {
848 return Err(MountError::PathNotNode(path.to_path_buf()))
849 }
850 Err(err) => return Err(err),
851 }
852 };
853
854 for (name, link) in node.get_links() {
855 let mut full_path = path.clone();
856 full_path.push(name);
857 items.insert(full_path, link.clone());
858 }
859
860 Ok(items)
861 }
862
863 pub async fn ls_deep(&self, path: &Path) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
864 let base_path = clean_path(path);
865 self._ls_deep(path, &base_path).await
866 }
867
868 async fn _ls_deep(
869 &self,
870 path: &Path,
871 base_path: &Path,
872 ) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
873 let mut all_items = BTreeMap::new();
874
875 let items = self.ls(path).await?;
877
878 for (item_path, link) in items {
879 let relative_path = if base_path == Path::new("") {
881 item_path.clone()
882 } else {
883 item_path
884 .strip_prefix(base_path)
885 .unwrap_or(&item_path)
886 .to_path_buf()
887 };
888 all_items.insert(relative_path.clone(), link.clone());
889
890 if link.is_dir() {
891 let abs_item_path = Path::new("/").join(&item_path);
893 let sub_items = Box::pin(self._ls_deep(&abs_item_path, base_path)).await?;
894
895 for (sub_path, sub_link) in sub_items {
897 all_items.insert(sub_path, sub_link);
898 }
899 }
900 }
901
902 Ok(all_items)
903 }
904
905 #[allow(clippy::await_holding_lock)]
906 pub async fn cat(&self, path: &Path) -> Result<Vec<u8>, MountError> {
907 let path = clean_path(path);
908
909 let inner = self.0.lock().await;
910 let root_node = inner.entry.clone();
911 drop(inner);
912
913 let (parent_path, file_name) = if let Some(parent) = path.parent() {
914 (
915 parent,
916 path.file_name().unwrap().to_string_lossy().to_string(),
917 )
918 } else {
919 return Err(MountError::PathNotFound(path.to_path_buf()));
920 };
921
922 let parent_node = if parent_path == Path::new("") {
923 root_node
924 } else {
925 Self::_get_node_at_path(&root_node, parent_path, &self.1).await?
926 };
927
928 let link = parent_node
929 .get_link(&file_name)
930 .ok_or_else(|| MountError::PathNotFound(path.to_path_buf()))?;
931
932 match link {
933 NodeLink::Data(link, secret, _) => {
934 let encrypted_data = self.1.get(&link.hash()).await?;
935 let data = secret.decrypt(&encrypted_data)?;
936 Ok(data)
937 }
938 NodeLink::Dir(_, _) => Err(MountError::PathNotNode(path.to_path_buf())),
939 }
940 }
941
942 #[allow(clippy::await_holding_lock)]
944 pub async fn get(&self, path: &Path) -> Result<NodeLink, MountError> {
945 let path = clean_path(path);
946
947 let inner = self.0.lock().await;
948 let root_node = inner.entry.clone();
949 drop(inner);
950
951 let (parent_path, file_name) = if let Some(parent) = path.parent() {
952 (
953 parent,
954 path.file_name().unwrap().to_string_lossy().to_string(),
955 )
956 } else {
957 return Err(MountError::PathNotFound(path.to_path_buf()));
958 };
959
960 let parent_node = if parent_path == Path::new("") {
961 root_node
962 } else {
963 Self::_get_node_at_path(&root_node, parent_path, &self.1).await?
964 };
965
966 parent_node
967 .get_link(&file_name)
968 .cloned()
969 .ok_or_else(|| MountError::PathNotFound(path.to_path_buf()))
970 }
971
972 async fn _get_node_at_path(
973 node: &Node,
974 path: &Path,
975 blobs: &BlobsStore,
976 ) -> Result<Node, MountError> {
977 let mut current_node = node.clone();
978 let mut consumed_path = PathBuf::from("/");
979
980 for part in path.iter() {
981 consumed_path.push(part);
982 let next = part.to_string_lossy().to_string();
983 let next_link = current_node
984 .get_link(&next)
985 .ok_or(MountError::PathNotFound(consumed_path.clone()))?;
986 current_node = Self::_get_node_from_blobs(next_link, blobs).await?
987 }
988 Ok(current_node)
989 }
990
991 pub async fn _set_node_link_at_path(
992 node: Node,
993 node_link: NodeLink,
994 path: &Path,
995 blobs: &BlobsStore,
996 ) -> Result<(NodeLink, Vec<crate::linked_data::Hash>), MountError> {
997 let path = clean_path(path);
998 let mut visited_nodes = Vec::new();
999 let mut name = path.file_name().unwrap().to_string_lossy().to_string();
1000 let parent_path = path.parent().unwrap_or(Path::new(""));
1001
1002 let mut consumed_path = PathBuf::from("/");
1003 let mut node = node;
1004 visited_nodes.push((consumed_path.clone(), node.clone()));
1005
1006 for part in parent_path.iter() {
1007 let next = part.to_string_lossy().to_string();
1008 let next_link = node.get_link(&next);
1009 if let Some(next_link) = next_link {
1010 consumed_path.push(part);
1011 match next_link {
1012 NodeLink::Dir(..) => {
1013 node = Self::_get_node_from_blobs(next_link, blobs).await?
1014 }
1015 NodeLink::Data(..) => {
1016 return Err(MountError::PathNotNode(consumed_path.clone()));
1017 }
1018 }
1019 visited_nodes.push((consumed_path.clone(), node.clone()));
1020 } else {
1021 node = Node::default();
1023 consumed_path.push(part);
1024 visited_nodes.push((consumed_path.clone(), node.clone()));
1025 }
1026 }
1027
1028 let mut node_link = node_link;
1029 let mut created_hashes = Vec::new();
1030 for (path, mut node) in visited_nodes.into_iter().rev() {
1031 node.insert(name, node_link.clone());
1032 let secret = Secret::generate();
1033 let link = Self::_put_node_in_blobs(&node, &secret, blobs).await?;
1034 created_hashes.push(link.hash());
1035 node_link = NodeLink::Dir(link, secret);
1036 name = path
1037 .file_name()
1038 .unwrap_or_default()
1039 .to_string_lossy()
1040 .to_string();
1041 }
1042
1043 Ok((node_link, created_hashes))
1044 }
1045
1046 async fn _get_manifest_from_blobs(
1047 link: &Link,
1048 blobs: &BlobsStore,
1049 ) -> Result<Manifest, MountError> {
1050 tracing::debug!(
1051 "_get_bucket_from_blobs: Checking for bucket data at link {:?}",
1052 link
1053 );
1054 let hash = link.hash();
1055 tracing::debug!("_get_bucket_from_blobs: Bucket hash: {}", hash);
1056
1057 match blobs.stat(&hash).await {
1058 Ok(true) => {
1059 tracing::debug!(
1060 "_get_bucket_from_blobs: Bucket hash {} exists in blobs",
1061 hash
1062 );
1063 }
1064 Ok(false) => {
1065 tracing::error!("_get_bucket_from_blobs: Bucket hash {} NOT FOUND in blobs - LinkNotFound error!", hash);
1066 return Err(MountError::LinkNotFound(link.clone()));
1067 }
1068 Err(e) => {
1069 tracing::error!(
1070 "_get_bucket_from_blobs: Error checking bucket hash {}: {}",
1071 hash,
1072 e
1073 );
1074 return Err(e.into());
1075 }
1076 }
1077
1078 tracing::debug!("_get_bucket_from_blobs: Reading bucket data from blobs");
1079 let data = blobs.get(&hash).await?;
1080 tracing::debug!(
1081 "_get_bucket_from_blobs: Got {} bytes of bucket data",
1082 data.len()
1083 );
1084
1085 let bucket_data = Manifest::decode(&data)?;
1086 tracing::debug!(
1087 "_get_bucket_from_blobs: Successfully decoded BucketData for bucket '{}'",
1088 bucket_data.name()
1089 );
1090
1091 Ok(bucket_data)
1092 }
1093
1094 pub async fn _get_pins_from_blobs(link: &Link, blobs: &BlobsStore) -> Result<Pins, MountError> {
1095 tracing::debug!("_get_pins_from_blobs: Checking for pins at link {:?}", link);
1096 let hash = link.hash();
1097 tracing::debug!("_get_pins_from_blobs: Pins hash: {}", hash);
1098
1099 match blobs.stat(&hash).await {
1100 Ok(true) => {
1101 tracing::debug!("_get_pins_from_blobs: Pins hash {} exists in blobs", hash);
1102 }
1103 Ok(false) => {
1104 tracing::error!(
1105 "_get_pins_from_blobs: Pins hash {} NOT FOUND in blobs - LinkNotFound error!",
1106 hash
1107 );
1108 return Err(MountError::LinkNotFound(link.clone()));
1109 }
1110 Err(e) => {
1111 tracing::error!(
1112 "_get_pins_from_blobs: Error checking pins hash {}: {}",
1113 hash,
1114 e
1115 );
1116 return Err(e.into());
1117 }
1118 }
1119
1120 tracing::debug!("_get_pins_from_blobs: Reading hash list from blobs");
1121 let hashes = blobs.read_hash_list(hash).await?;
1123 tracing::debug!(
1124 "_get_pins_from_blobs: Successfully read {} hashes from pinset",
1125 hashes.len()
1126 );
1127
1128 Ok(Pins::from_vec(hashes))
1129 }
1130
1131 async fn _get_node_from_blobs(
1132 node_link: &NodeLink,
1133 blobs: &BlobsStore,
1134 ) -> Result<Node, MountError> {
1135 let link = node_link.link();
1136 let secret = node_link.secret();
1137 let hash = link.hash();
1138
1139 tracing::debug!("_get_node_from_blobs: Checking for node at hash {}", hash);
1140
1141 match blobs.stat(&hash).await {
1142 Ok(true) => {
1143 tracing::debug!("_get_node_from_blobs: Node hash {} exists in blobs", hash);
1144 }
1145 Ok(false) => {
1146 tracing::error!(
1147 "_get_node_from_blobs: Node hash {} NOT FOUND in blobs - LinkNotFound error!",
1148 hash
1149 );
1150 return Err(MountError::LinkNotFound(link.clone()));
1151 }
1152 Err(e) => {
1153 tracing::error!(
1154 "_get_node_from_blobs: Error checking node hash {}: {}",
1155 hash,
1156 e
1157 );
1158 return Err(e.into());
1159 }
1160 }
1161
1162 tracing::debug!("_get_node_from_blobs: Reading encrypted node blob");
1163 let blob = blobs.get(&hash).await?;
1164 tracing::debug!(
1165 "_get_node_from_blobs: Got {} bytes of encrypted node data",
1166 blob.len()
1167 );
1168
1169 tracing::debug!("_get_node_from_blobs: Decrypting node data");
1170 let data = secret.decrypt(&blob)?;
1171 tracing::debug!("_get_node_from_blobs: Decrypted {} bytes", data.len());
1172
1173 let node = Node::decode(&data)?;
1174 tracing::debug!("_get_node_from_blobs: Successfully decoded Node");
1175
1176 Ok(node)
1177 }
1178
1179 async fn _put_node_in_blobs(
1183 node: &Node,
1184 secret: &Secret,
1185 blobs: &BlobsStore,
1186 ) -> Result<Link, MountError> {
1187 let _data = node.encode()?;
1188 let data = secret.encrypt(&_data)?;
1189 let hash = blobs.put(data).await?;
1190 let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1193 Ok(link)
1194 }
1195
1196 pub async fn _put_manifest_in_blobs(
1197 bucket_data: &Manifest,
1198 blobs: &BlobsStore,
1199 ) -> Result<Link, MountError> {
1200 let data = bucket_data.encode()?;
1201 let hash = blobs.put(data).await?;
1202 let link = Link::new(bucket_data.codec(), hash);
1205 Ok(link)
1206 }
1207
1208 pub async fn _put_pins_in_blobs(pins: &Pins, blobs: &BlobsStore) -> Result<Link, MountError> {
1209 let hash = blobs.create_hash_list(pins.iter().copied()).await?;
1211 let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1214 Ok(link)
1215 }
1216
1217 async fn _get_ops_log_from_blobs(
1218 link: &Link,
1219 secret: &Secret,
1220 blobs: &BlobsStore,
1221 ) -> Result<PathOpLog, MountError> {
1222 let hash = link.hash();
1223 tracing::debug!(
1224 "_get_ops_log_from_blobs: Checking for ops log at hash {}",
1225 hash
1226 );
1227
1228 match blobs.stat(&hash).await {
1229 Ok(true) => {
1230 tracing::debug!(
1231 "_get_ops_log_from_blobs: Ops log hash {} exists in blobs",
1232 hash
1233 );
1234 }
1235 Ok(false) => {
1236 tracing::error!(
1237 "_get_ops_log_from_blobs: Ops log hash {} NOT FOUND in blobs - LinkNotFound error!",
1238 hash
1239 );
1240 return Err(MountError::LinkNotFound(link.clone()));
1241 }
1242 Err(e) => {
1243 tracing::error!(
1244 "_get_ops_log_from_blobs: Error checking ops log hash {}: {}",
1245 hash,
1246 e
1247 );
1248 return Err(e.into());
1249 }
1250 }
1251
1252 tracing::debug!("_get_ops_log_from_blobs: Reading encrypted ops log blob");
1253 let blob = blobs.get(&hash).await?;
1254 tracing::debug!(
1255 "_get_ops_log_from_blobs: Got {} bytes of encrypted ops log data",
1256 blob.len()
1257 );
1258
1259 tracing::debug!("_get_ops_log_from_blobs: Decrypting ops log data");
1260 let data = secret.decrypt(&blob)?;
1261 tracing::debug!("_get_ops_log_from_blobs: Decrypted {} bytes", data.len());
1262
1263 let ops_log = PathOpLog::decode(&data)?;
1264 tracing::debug!(
1265 "_get_ops_log_from_blobs: Successfully decoded PathOpLog with {} operations",
1266 ops_log.len()
1267 );
1268
1269 Ok(ops_log)
1270 }
1271
1272 async fn _put_ops_log_in_blobs(
1273 ops_log: &PathOpLog,
1274 secret: &Secret,
1275 blobs: &BlobsStore,
1276 ) -> Result<Link, MountError> {
1277 let _data = ops_log.encode()?;
1278 let data = secret.encrypt(&_data)?;
1279 let hash = blobs.put(data).await?;
1280 let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1282 tracing::debug!(
1283 "_put_ops_log_in_blobs: Stored ops log with {} operations at hash {}",
1284 ops_log.len(),
1285 hash
1286 );
1287 Ok(link)
1288 }
1289
1290 pub async fn collect_ops_since(
1306 &self,
1307 ancestor_link: Option<&Link>,
1308 blobs: &BlobsStore,
1309 ) -> Result<PathOpLog, MountError> {
1310 let inner = self.0.lock().await;
1311 let secret_key = inner.secret_key.clone();
1312 let current_link = inner.link.clone();
1313 let current_ops = inner.ops_log.clone();
1314 drop(inner);
1315
1316 let mut all_logs: Vec<PathOpLog> = Vec::new();
1317
1318 if !current_ops.is_empty() {
1320 all_logs.push(current_ops);
1321 }
1322
1323 let mut link = current_link;
1325
1326 loop {
1327 if let Some(ancestor) = ancestor_link {
1329 if &link == ancestor {
1330 break;
1331 }
1332 }
1333
1334 let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1336
1337 let secret = match self.get_secret_for_manifest(&manifest, &secret_key) {
1341 Ok(s) => s,
1342 Err(MountError::ShareNotFound) => {
1343 tracing::debug!(
1344 "collect_ops_since: stopping at link {} - no share for current user",
1345 link.hash()
1346 );
1347 break;
1348 }
1349 Err(e) => return Err(e),
1350 };
1351
1352 if let Some(ops_link) = manifest.ops_log() {
1354 let mut ops_log = Self::_get_ops_log_from_blobs(ops_link, &secret, blobs).await?;
1355 ops_log.rebuild_clock();
1356 all_logs.push(ops_log);
1357 }
1358
1359 match manifest.previous() {
1361 Some(prev) => link = prev.clone(),
1362 None => break, }
1364 }
1365
1366 all_logs.reverse();
1368 let mut merged = PathOpLog::new();
1369 for log in all_logs {
1370 merged.merge(&log);
1371 }
1372
1373 Ok(merged)
1374 }
1375
1376 #[allow(clippy::result_large_err)]
1380 fn get_secret_for_manifest(
1381 &self,
1382 manifest: &Manifest,
1383 secret_key: &SecretKey,
1384 ) -> Result<Secret, MountError> {
1385 let public_key = secret_key.public();
1386 let share = manifest
1387 .get_share(&public_key)
1388 .ok_or(MountError::ShareNotFound)?;
1389
1390 match share.role() {
1391 PrincipalRole::Owner => {
1392 let secret_share = share.share().ok_or(MountError::ShareNotFound)?;
1393 Ok(secret_share.recover(secret_key)?)
1394 }
1395 PrincipalRole::Mirror => manifest
1396 .public()
1397 .cloned()
1398 .ok_or(MountError::MirrorCannotMount),
1399 }
1400 }
1401
1402 pub async fn find_common_ancestor(
1416 &self,
1417 other: &Mount,
1418 blobs: &BlobsStore,
1419 ) -> Result<Option<Link>, MountError> {
1420 let mut self_chain: std::collections::HashSet<Link> = std::collections::HashSet::new();
1422
1423 let self_link = self.link().await;
1424 let mut link = self_link;
1425
1426 loop {
1427 self_chain.insert(link.clone());
1428 let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1429 match manifest.previous() {
1430 Some(prev) => link = prev.clone(),
1431 None => break,
1432 }
1433 }
1434
1435 let other_link = other.link().await;
1437 let mut link = other_link;
1438
1439 loop {
1440 if self_chain.contains(&link) {
1441 return Ok(Some(link));
1442 }
1443 let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1444 match manifest.previous() {
1445 Some(prev) => link = prev.clone(),
1446 None => break,
1447 }
1448 }
1449
1450 Ok(None)
1452 }
1453
1454 async fn apply_resolved_state(&mut self, merged_ops: &PathOpLog) -> Result<(), MountError> {
1460 let resolved_state = merged_ops.resolve_all();
1461
1462 for (path, op) in &resolved_state {
1463 if op.content_link.is_some() {
1464 let abs_path = Path::new("/").join(path);
1466 match self.get(&abs_path).await {
1467 Ok(_) => {
1468 }
1470 Err(MountError::PathNotFound(_)) => {
1471 tracing::warn!(
1473 "apply_resolved_state: cannot recreate file {} - no secret",
1474 path.display()
1475 );
1476 let mut inner = self.0.lock().await;
1478 inner.ops_log.merge(&PathOpLog::from_operation(op));
1479 }
1480 Err(e) => return Err(e),
1481 }
1482 } else if op.is_dir && matches!(op.op_type, super::path_ops::OpType::Mkdir) {
1483 let abs_path = Path::new("/").join(path);
1485 match self.mkdir(&abs_path).await {
1486 Ok(()) => {}
1487 Err(MountError::PathAlreadyExists(_)) => {}
1488 Err(e) => return Err(e),
1489 }
1490 }
1491 }
1492
1493 Ok(())
1494 }
1495
1496 pub async fn merge_from<R: super::ConflictResolver>(
1515 &mut self,
1516 incoming: &Mount,
1517 resolver: &R,
1518 blobs: &BlobsStore,
1519 ) -> Result<(MergeResult, Link), MountError> {
1520 let ancestor = self.find_common_ancestor(incoming, blobs).await?;
1522
1523 let local_ops = self.collect_ops_since(ancestor.as_ref(), blobs).await?;
1525 let incoming_ops = incoming.collect_ops_since(ancestor.as_ref(), blobs).await?;
1526
1527 let peer_id = {
1529 let inner = self.0.lock().await;
1530 inner.peer_id
1531 };
1532
1533 let mut merged_ops = local_ops.clone();
1535 let merge_result = merged_ops.merge_with_resolver(&incoming_ops, resolver, &peer_id);
1536
1537 self.apply_resolved_state(&merged_ops).await?;
1539
1540 {
1542 let mut inner = self.0.lock().await;
1543 inner.ops_log.merge(&merged_ops);
1544 }
1545
1546 let (link, _, _) = self.save(blobs, false).await?;
1548
1549 Ok((merge_result, link))
1550 }
1551}