1use std::collections::BTreeMap;
2use std::io::Read;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use tokio::sync::Mutex;
7use uuid::Uuid;
8
9use crate::crypto::{PublicKey, Secret, SecretError, SecretKey, SecretShare};
10use crate::linked_data::{BlockEncoded, CodecError, Link};
11use crate::peer::{BlobsStore, BlobsStoreError};
12
13use super::conflict::MergeResult;
14use super::manifest::{Manifest, ManifestError, Share};
15use super::node::{Node, NodeError, NodeLink};
16use super::path_ops::{OpType, PathOpLog};
17use super::pins::Pins;
18use super::principal::PrincipalRole;
19
/// Converts an absolute path into its mount-relative form by dropping the
/// leading root component.
///
/// Every remaining component is converted with `to_string_lossy`, so bytes
/// that are not valid UTF-8 are replaced rather than preserved.
///
/// # Panics
///
/// Panics with `"path is not absolute"` when `path` is not absolute.
pub fn clean_path(path: &Path) -> PathBuf {
    assert!(path.is_absolute(), "path is not absolute");

    let mut relative = PathBuf::new();
    // The first component of an absolute path is the root; skip it.
    for component in path.iter().skip(1) {
        relative.push(component.to_string_lossy().to_string());
    }
    relative
}
29
/// In-memory state of a mounted bucket.
///
/// A [`Mount`] keeps one of these behind a mutex; `Mount::inner` hands out
/// clones of it as snapshots.
#[derive(Clone)]
pub struct MountInner {
    /// Link of the manifest this state was created from, loaded from, or last saved as.
    pub link: Link,
    /// The manifest for the current version of the bucket.
    pub manifest: Manifest,
    /// Root directory node of the bucket's file tree.
    pub entry: Node,
    /// Hashes of blobs recorded as pinned for this bucket.
    pub pins: Pins,
    /// Version height; `Mount::save` stores `height + 1` on each save.
    pub height: u64,
    /// Path operations recorded since the last save.
    pub ops_log: PathOpLog,
    /// Public key of the local peer; passed as the author of recorded operations.
    pub peer_id: PublicKey,
    /// Secret key of the local peer; used to sign manifests on save.
    pub secret_key: SecretKey,
}
49
/// Read-only accessors for the fields of [`MountInner`].
impl MountInner {
    /// Returns the link of the current manifest.
    pub fn link(&self) -> &Link {
        &self.link
    }
    /// Returns the root directory node.
    pub fn entry(&self) -> &Node {
        &self.entry
    }
    /// Returns the current manifest.
    pub fn manifest(&self) -> &Manifest {
        &self.manifest
    }
    /// Returns the set of pinned hashes.
    pub fn pins(&self) -> &Pins {
        &self.pins
    }
    /// Returns the current version height.
    pub fn height(&self) -> u64 {
        self.height
    }
    /// Returns the log of path operations recorded since the last save.
    pub fn ops_log(&self) -> &PathOpLog {
        &self.ops_log
    }
    /// Returns the local peer's public key.
    pub fn peer_id(&self) -> &PublicKey {
        &self.peer_id
    }
}
73
/// Handle to a mounted bucket: mutex-guarded [`MountInner`] state plus the
/// blob store that backs it.
///
/// The state lives behind an `Arc`, so clones of a `Mount` share the same
/// inner state.
#[derive(Clone)]
pub struct Mount(Arc<Mutex<MountInner>>, BlobsStore);
76
/// Errors returned by [`Mount`] operations.
#[derive(Debug, thiserror::Error)]
pub enum MountError {
    /// Catch-all for failures that have no dedicated variant.
    #[error("default error: {0}")]
    Default(#[from] anyhow::Error),
    /// The blob a link points at is not present in the blob store.
    #[error("link not found")]
    LinkNotFound(Link),
    /// No entry exists at the given path.
    #[error("path not found: {0}")]
    PathNotFound(PathBuf),
    /// The path exists but is not the kind of entry the operation needs
    /// (for example a file where a directory is required, or vice versa).
    #[error("path is not a node: {0}")]
    PathNotNode(PathBuf),
    /// An entry already exists at the destination path.
    #[error("path already exists: {0}")]
    PathAlreadyExists(PathBuf),
    /// A move was requested whose destination lies inside its own source.
    #[error("cannot move '{from}' to '{to}': destination is inside source")]
    MoveIntoSelf { from: PathBuf, to: PathBuf },
    /// Failure reported by the blob store.
    #[error("blobs store error: {0}")]
    BlobsStore(#[from] BlobsStoreError),
    /// Failure while encrypting or decrypting with a `Secret`.
    #[error("secret error: {0}")]
    Secret(#[from] SecretError),
    /// Failure reported by the node layer.
    #[error("node error: {0}")]
    Node(#[from] NodeError),
    /// Failure while encoding or decoding block data.
    #[error("codec error: {0}")]
    Codec(#[from] CodecError),
    /// Failure while creating or recovering a secret share.
    #[error("share error: {0}")]
    Share(#[from] crate::crypto::SecretShareError),
    /// Failure reported by the manifest layer.
    #[error("manifest error: {0}")]
    Manifest(#[from] ManifestError),
    /// The manifest holds no usable share for the requested peer.
    #[error("peers share was not found")]
    ShareNotFound,
    /// A mirror tried to obtain the bucket secret but the manifest carries no public secret.
    #[error("mirror cannot mount: bucket is not published")]
    MirrorCannotMount,
    /// The local peer is not an owner and the operation requires one.
    #[error("unauthorized: only owners can perform this operation")]
    Unauthorized,
}
110
111impl Mount {
112 pub async fn inner(&self) -> MountInner {
113 self.0.lock().await.clone()
114 }
115
116 pub fn blobs(&self) -> BlobsStore {
117 self.1.clone()
118 }
119
120 pub async fn link(&self) -> Link {
121 let inner = self.0.lock().await;
122 inner.link.clone()
123 }
124
    /// Persists the current state as a new signed manifest version.
    ///
    /// A fresh secret is generated for every save; the root node and the
    /// operation log (if non-empty) are encrypted with it and stored, the
    /// pin list is stored, every owner share is re-created for the new
    /// secret, and the manifest is signed and stored.
    ///
    /// `publish` selects the published state of the new version:
    /// `Some(true)` publishes, `Some(false)` unpublishes, and `None` keeps
    /// whatever the current manifest reports.
    ///
    /// Returns `(new_manifest_link, previous_manifest_link, new_height)`.
    ///
    /// # Errors
    ///
    /// Propagates blob store, encryption, share-creation, and signing failures.
    pub async fn save(
        &self,
        blobs: &BlobsStore,
        publish: Option<bool>,
    ) -> Result<(Link, Link, u64), MountError> {
        // Snapshot everything needed under the lock, then release it so the
        // blob store calls below do not run while holding it.
        // NOTE(review): changes made through another clone of this mount
        // between this snapshot and the commit at the end are not part of
        // the saved version — confirm callers serialise saves.
        let (
            entry_node,
            mut pins,
            previous_link,
            previous_height,
            manifest_template,
            ops_log,
            secret_key,
        ) = {
            let inner = self.0.lock().await;
            (
                inner.entry.clone(),
                inner.pins.clone(),
                inner.link.clone(),
                inner.height,
                inner.manifest.clone(),
                inner.ops_log.clone(),
                inner.secret_key.clone(),
            )
        };

        let height = previous_height + 1;

        // Each saved version is encrypted under its own fresh secret.
        let secret = Secret::generate();

        let entry = Self::_put_node_in_blobs(&entry_node, &secret, blobs).await?;

        // Pin the new root node and the manifest this version supersedes.
        pins.insert(entry.clone().hash());
        pins.insert(previous_link.hash());

        // The operation log is stored (and pinned) only when it has entries.
        let ops_log_link = if !ops_log.is_empty() {
            let link = Self::_put_ops_log_in_blobs(&ops_log, &secret, blobs).await?;
            pins.insert(link.hash());
            Some(link)
        } else {
            None
        };

        let pins_link = Self::_put_pins_in_blobs(&pins, blobs).await?;

        let mut manifest = manifest_template;
        // Re-create every owner's share for the new secret; shares of other
        // roles are left untouched.
        for share in manifest.shares_mut().values_mut() {
            if *share.role() == PrincipalRole::Owner {
                let secret_share = SecretShare::new(&secret, &share.principal().identity)?;
                share.set_share(secret_share);
            }
        }

        let should_publish = publish.unwrap_or(manifest.is_published());
        if should_publish {
            manifest.publish(&secret);
        } else {
            manifest.unpublish();
        }
        manifest.set_pins(pins_link.clone());
        manifest.set_previous(previous_link.clone());
        manifest.set_entry(entry.clone());
        manifest.set_height(height);

        // Drop any ops-log link carried over from the template before
        // attaching the one stored for this version.
        manifest.clear_ops_log();
        if let Some(ops_link) = ops_log_link {
            manifest.set_ops_log(ops_link);
        }

        // Sign only after every field has been set.
        manifest.sign(&secret_key)?;

        let link = Self::_put_manifest_in_blobs(&manifest, blobs).await?;

        // Commit the new version to the shared state. The local `pins` copy
        // extended above is not written back here.
        {
            let mut inner = self.0.lock().await;
            inner.manifest = manifest;
            inner.height = height;
            inner.link = link.clone();
            inner.ops_log.clear_preserving_clock();
        }

        Ok((link, previous_link, height))
    }
232
233 pub async fn init(
234 id: Uuid,
235 name: String,
236 owner: &SecretKey,
237 blobs: &BlobsStore,
238 ) -> Result<Self, MountError> {
239 let entry = Node::default();
241 let secret = Secret::generate();
243 let entry_link = Self::_put_node_in_blobs(&entry, &secret, blobs).await?;
245 let share = SecretShare::new(&secret, &owner.public())?;
247 let mut pins = Pins::new();
249 pins.insert(entry_link.hash());
250 let pins_link = Self::_put_pins_in_blobs(&pins, blobs).await?;
252 let mut manifest = Manifest::new(
254 id,
255 name.clone(),
256 owner.public(),
257 share,
258 entry_link.clone(),
259 pins_link.clone(),
260 0, );
262 manifest.sign(owner)?;
264 let link = Self::_put_manifest_in_blobs(&manifest, blobs).await?;
265
266 Ok(Mount(
268 Arc::new(Mutex::new(MountInner {
269 link,
270 manifest,
271 entry,
272 pins,
273 height: 0,
274 ops_log: PathOpLog::new(),
275 peer_id: owner.public(),
276 secret_key: owner.clone(),
277 })),
278 blobs.clone(),
279 ))
280 }
281
282 pub async fn load(
283 link: &Link,
284 secret_key: &SecretKey,
285 blobs: &BlobsStore,
286 ) -> Result<Self, MountError> {
287 let public_key = &secret_key.public();
288 let manifest = Self::_get_manifest_from_blobs(link, blobs).await?;
289
290 let bucket_share = match manifest.get_share(public_key) {
291 Some(share) => share,
292 None => return Err(MountError::ShareNotFound),
293 };
294
295 let secret = match bucket_share.role() {
297 PrincipalRole::Owner => {
298 let share = bucket_share.share().ok_or(MountError::ShareNotFound)?;
300 share.recover(secret_key)?
301 }
302 PrincipalRole::Mirror => {
303 manifest
305 .public()
306 .cloned()
307 .ok_or(MountError::MirrorCannotMount)?
308 }
309 };
310
311 let pins = Self::_get_pins_from_blobs(manifest.pins(), blobs).await?;
312 let entry = Self::_get_node_from_blobs(
313 &NodeLink::Dir(manifest.entry().clone(), secret.clone()),
314 blobs,
315 )
316 .await?;
317
318 let height = manifest.height();
320
321 let ops_log = if let Some(ops_link) = manifest.ops_log() {
323 let mut log = Self::_get_ops_log_from_blobs(ops_link, &secret, blobs).await?;
324 log.rebuild_clock();
326 log
327 } else {
328 PathOpLog::new()
329 };
330
331 Ok(Mount(
332 Arc::new(Mutex::new(MountInner {
333 link: link.clone(),
334 manifest,
335 entry,
336 pins,
337 height,
338 ops_log,
339 peer_id: secret_key.public(),
340 secret_key: secret_key.clone(),
341 })),
342 blobs.clone(),
343 ))
344 }
345
346 pub async fn load_manifest(link: &Link, blobs: &BlobsStore) -> Result<Manifest, MountError> {
351 Self::_get_manifest_from_blobs(link, blobs).await
352 }
353
354 pub async fn add_owner(&mut self, peer: PublicKey) -> Result<(), MountError> {
357 let mut inner = self.0.lock().await;
358 let secret_share = SecretShare::new(&Secret::default(), &peer)?;
359 inner
360 .manifest
361 .add_share(Share::new_owner(secret_share, peer));
362 Ok(())
363 }
364
365 pub async fn add_mirror(&mut self, peer: PublicKey) {
368 let mut inner = self.0.lock().await;
369 inner.manifest.add_share(Share::new_mirror(peer));
370 }
371
372 pub async fn remove_share(&self, peer_public_key: PublicKey) -> Result<(), MountError> {
377 let mut inner = self.0.lock().await;
378
379 let our_key = inner.secret_key.public();
381 let our_share = inner
382 .manifest
383 .get_share(&our_key)
384 .ok_or(MountError::ShareNotFound)?;
385 if *our_share.role() != PrincipalRole::Owner {
386 return Err(MountError::Unauthorized);
387 }
388
389 let key_hex = peer_public_key.to_hex();
391 if inner.manifest.shares_mut().remove(&key_hex).is_none() {
392 return Err(MountError::ShareNotFound);
393 }
394
395 Ok(())
396 }
397
398 pub async fn is_published(&self) -> bool {
400 let inner = self.0.lock().await;
401 inner.manifest.is_published()
402 }
403
404 pub async fn publish(&self) -> Result<(Link, Link, u64), MountError> {
406 self.save(&self.1, Some(true)).await
407 }
408
409 pub async fn unpublish(&self) -> Result<(Link, Link, u64), MountError> {
411 self.save(&self.1, Some(false)).await
412 }
413
414 pub async fn add<R>(&mut self, path: &Path, data: R) -> Result<(), MountError>
415 where
416 R: Read + Send + Sync + 'static + Unpin,
417 {
418 let secret = Secret::generate();
419
420 let encrypted_reader = secret.encrypt_reader(data)?;
421
422 use bytes::Bytes;
424 use futures::stream;
425 let encrypted_bytes = {
426 let mut buf = Vec::new();
427 let mut reader = encrypted_reader;
428 reader.read_to_end(&mut buf).map_err(SecretError::Io)?;
429 buf
430 };
431
432 let stream = Box::pin(stream::once(async move {
433 Ok::<_, std::io::Error>(Bytes::from(encrypted_bytes))
434 }));
435
436 let hash = self.1.put_stream(stream).await?;
437
438 let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
439
440 let node_link = NodeLink::new_data_from_path(link.clone(), secret, path);
441
442 let root_node = {
443 let inner = self.0.lock().await;
444 inner.entry.clone()
445 };
446
447 let (updated_link, node_hashes) =
448 Self::_set_node_link_at_path(root_node, node_link, path, &self.1).await?;
449
450 let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
452 Some(
453 Self::_get_node_from_blobs(
454 &NodeLink::Dir(new_root_link.clone(), new_secret),
455 &self.1,
456 )
457 .await?,
458 )
459 } else {
460 None
461 };
462
463 {
465 let mut inner = self.0.lock().await;
466 inner.pins.insert(hash);
468 inner.pins.extend(node_hashes);
469
470 if let Some(entry) = new_entry {
471 inner.entry = entry;
472 }
473
474 let peer_id = inner.peer_id;
476 inner
477 .ops_log
478 .record(peer_id, OpType::Add, clean_path(path), Some(link), false);
479 }
480
481 Ok(())
482 }
483
484 pub async fn rm(&mut self, path: &Path) -> Result<(), MountError> {
485 let path = clean_path(path);
486 let parent_path = path
487 .parent()
488 .ok_or_else(|| MountError::Default(anyhow::anyhow!("Cannot remove root")))?;
489
490 let entry = {
491 let inner = self.0.lock().await;
492 inner.entry.clone()
493 };
494
495 let mut parent_node = if parent_path == Path::new("") {
496 entry.clone()
497 } else {
498 Self::_get_node_at_path(&entry, parent_path, &self.1).await?
499 };
500
501 let file_name = path.file_name().unwrap().to_string_lossy().to_string();
502
503 let removed_link = parent_node.del(&file_name);
505 if removed_link.is_none() {
506 return Err(MountError::PathNotFound(path.to_path_buf()));
507 }
508 let is_dir = removed_link.map(|l| l.is_dir()).unwrap_or(false);
509
510 let removed_path = path.to_path_buf();
512
513 if parent_path == Path::new("") {
514 let secret = Secret::generate();
515 let link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
516
517 let mut inner = self.0.lock().await;
518 inner.pins.insert(link.hash());
520 inner.entry = parent_node;
521 } else {
522 let secret = Secret::generate();
524 let parent_link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
525 let node_link = NodeLink::new_dir(parent_link.clone(), secret);
526
527 let abs_parent_path = Path::new("/").join(parent_path);
529 let (updated_link, node_hashes) =
530 Self::_set_node_link_at_path(entry, node_link, &abs_parent_path, &self.1).await?;
531
532 let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
533 Some(
534 Self::_get_node_from_blobs(
535 &NodeLink::Dir(new_root_link.clone(), new_secret),
536 &self.1,
537 )
538 .await?,
539 )
540 } else {
541 None
542 };
543
544 let mut inner = self.0.lock().await;
545 inner.pins.insert(parent_link.hash());
547 inner.pins.extend(node_hashes);
548
549 if let Some(new_entry) = new_entry {
550 inner.entry = new_entry;
551 }
552 }
553
554 {
556 let mut inner = self.0.lock().await;
557 let peer_id = inner.peer_id;
558 inner
559 .ops_log
560 .record(peer_id, OpType::Remove, removed_path, None, is_dir);
561 }
562
563 Ok(())
564 }
565
566 pub async fn mkdir(&mut self, path: &Path) -> Result<(), MountError> {
567 let path = clean_path(path);
568
569 let entry = {
571 let inner = self.0.lock().await;
572 inner.entry.clone()
573 };
574
575 let (parent_path, dir_name) = if let Some(parent) = path.parent() {
577 (
578 parent,
579 path.file_name().unwrap().to_string_lossy().to_string(),
580 )
581 } else {
582 return Err(MountError::Default(anyhow::anyhow!("Cannot mkdir at root")));
583 };
584
585 let parent_node = if parent_path == Path::new("") {
587 entry.clone()
588 } else {
589 match Self::_get_node_at_path(&entry, parent_path, &self.1).await {
591 Ok(node) => node,
592 Err(MountError::PathNotFound(_)) => Node::default(), Err(err) => return Err(err),
594 }
595 };
596
597 if parent_node.get_link(&dir_name).is_some() {
599 return Err(MountError::PathAlreadyExists(Path::new("/").join(&path)));
600 }
601
602 let new_dir_node = Node::default();
604
605 let secret = Secret::generate();
607
608 let dir_link = Self::_put_node_in_blobs(&new_dir_node, &secret, &self.1).await?;
610
611 let node_link = NodeLink::new_dir(dir_link.clone(), secret);
613
614 let abs_path = Path::new("/").join(&path);
616
617 let (updated_link, node_hashes) =
619 Self::_set_node_link_at_path(entry, node_link, &abs_path, &self.1).await?;
620
621 let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
623 Some(
624 Self::_get_node_from_blobs(
625 &NodeLink::Dir(new_root_link.clone(), new_secret),
626 &self.1,
627 )
628 .await?,
629 )
630 } else {
631 None
632 };
633
634 {
636 let mut inner = self.0.lock().await;
637 inner.pins.insert(dir_link.hash());
639 inner.pins.extend(node_hashes);
640
641 if let Some(new_entry) = new_entry {
642 inner.entry = new_entry;
643 }
644
645 let peer_id = inner.peer_id;
647 inner
648 .ops_log
649 .record(peer_id, OpType::Mkdir, path.to_path_buf(), None, true);
650 }
651
652 Ok(())
653 }
654
655 pub async fn mv(&mut self, from: &Path, to: &Path) -> Result<(), MountError> {
674 let from_clean = clean_path(from);
677 let to_clean = clean_path(to);
678
679 if to.starts_with(from) {
690 return Err(MountError::MoveIntoSelf {
691 from: from.to_path_buf(),
692 to: to.to_path_buf(),
693 });
694 }
695
696 let node_link = self.get(from).await?;
703 let is_dir = node_link.is_dir();
704
705 let from_path = from_clean.to_path_buf();
707 let to_path = to_clean.to_path_buf();
708
709 if self.get(to).await.is_ok() {
715 return Err(MountError::PathAlreadyExists(to.to_path_buf()));
716 }
717
718 {
728 let parent_path = from_clean
730 .parent()
731 .ok_or_else(|| MountError::Default(anyhow::anyhow!("Cannot move root")))?;
732
733 let entry = {
735 let inner = self.0.lock().await;
736 inner.entry.clone()
737 };
738
739 let mut parent_node = if parent_path == Path::new("") {
741 entry.clone()
743 } else {
744 Self::_get_node_at_path(&entry, parent_path, &self.1).await?
746 };
747
748 let file_name = from_clean
750 .file_name()
751 .expect(
752 "from_clean has no filename - this should be impossible after parent() check",
753 )
754 .to_string_lossy()
755 .to_string();
756
757 if parent_node.del(&file_name).is_none() {
759 return Err(MountError::PathNotFound(from_clean.to_path_buf()));
760 }
761
762 if parent_path == Path::new("") {
764 let secret = Secret::generate();
766 let link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
767
768 let mut inner = self.0.lock().await;
769 inner.pins.insert(link.hash());
770 inner.entry = parent_node;
771 } else {
772 let secret = Secret::generate();
776 let parent_link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
777 let new_node_link = NodeLink::new_dir(parent_link.clone(), secret);
778
779 let abs_parent_path = Path::new("/").join(parent_path);
781 let (updated_root_link, node_hashes) =
782 Self::_set_node_link_at_path(entry, new_node_link, &abs_parent_path, &self.1)
783 .await?;
784
785 let new_entry = Self::_get_node_from_blobs(&updated_root_link, &self.1).await?;
789
790 let mut inner = self.0.lock().await;
792 inner.pins.insert(parent_link.hash());
793 inner.pins.extend(node_hashes);
794 inner.entry = new_entry;
795 }
796 }
797
798 let entry = {
805 let inner = self.0.lock().await;
806 inner.entry.clone()
807 };
808
809 let (updated_root_link, node_hashes) =
810 Self::_set_node_link_at_path(entry, node_link, to, &self.1).await?;
811
812 {
816 let new_entry = Self::_get_node_from_blobs(&updated_root_link, &self.1).await?;
818
819 let mut inner = self.0.lock().await;
820 inner.pins.extend(node_hashes);
821 inner.entry = new_entry;
822
823 let peer_id = inner.peer_id;
827 inner.ops_log.record(
828 peer_id,
829 OpType::Mv { from: from_path },
830 to_path,
831 None,
832 is_dir,
833 );
834 }
835
836 Ok(())
837 }
838
839 pub async fn ls(&self, path: &Path) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
840 let mut items = BTreeMap::new();
841 let path = clean_path(path);
842
843 let inner = self.0.lock().await;
844 let root_node = inner.entry.clone();
845 drop(inner);
846
847 let node = if path == Path::new("") {
848 root_node
849 } else {
850 match Self::_get_node_at_path(&root_node, &path, &self.1).await {
851 Ok(node) => node,
852 Err(MountError::LinkNotFound(_)) => {
853 return Err(MountError::PathNotNode(path.to_path_buf()))
854 }
855 Err(err) => return Err(err),
856 }
857 };
858
859 for (name, link) in node.get_links() {
860 let mut full_path = path.clone();
861 full_path.push(name);
862 items.insert(full_path, link.clone());
863 }
864
865 Ok(items)
866 }
867
868 pub async fn ls_deep(&self, path: &Path) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
869 let base_path = clean_path(path);
870 self._ls_deep(path, &base_path).await
871 }
872
873 async fn _ls_deep(
874 &self,
875 path: &Path,
876 base_path: &Path,
877 ) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
878 let mut all_items = BTreeMap::new();
879
880 let items = self.ls(path).await?;
882
883 for (item_path, link) in items {
884 let relative_path = if base_path == Path::new("") {
886 item_path.clone()
887 } else {
888 item_path
889 .strip_prefix(base_path)
890 .unwrap_or(&item_path)
891 .to_path_buf()
892 };
893 all_items.insert(relative_path.clone(), link.clone());
894
895 if link.is_dir() {
896 let abs_item_path = Path::new("/").join(&item_path);
898 let sub_items = Box::pin(self._ls_deep(&abs_item_path, base_path)).await?;
899
900 for (sub_path, sub_link) in sub_items {
902 all_items.insert(sub_path, sub_link);
903 }
904 }
905 }
906
907 Ok(all_items)
908 }
909
910 #[allow(clippy::await_holding_lock)]
911 pub async fn cat(&self, path: &Path) -> Result<Vec<u8>, MountError> {
912 let path = clean_path(path);
913
914 let inner = self.0.lock().await;
915 let root_node = inner.entry.clone();
916 drop(inner);
917
918 let (parent_path, file_name) = if let Some(parent) = path.parent() {
919 (
920 parent,
921 path.file_name().unwrap().to_string_lossy().to_string(),
922 )
923 } else {
924 return Err(MountError::PathNotFound(path.to_path_buf()));
925 };
926
927 let parent_node = if parent_path == Path::new("") {
928 root_node
929 } else {
930 Self::_get_node_at_path(&root_node, parent_path, &self.1).await?
931 };
932
933 let link = parent_node
934 .get_link(&file_name)
935 .ok_or_else(|| MountError::PathNotFound(path.to_path_buf()))?;
936
937 match link {
938 NodeLink::Data(link, secret, _) => {
939 let encrypted_data = self.1.get(&link.hash()).await?;
940 let data = secret.decrypt(&encrypted_data)?;
941 Ok(data)
942 }
943 NodeLink::Dir(_, _) => Err(MountError::PathNotNode(path.to_path_buf())),
944 }
945 }
946
947 #[allow(clippy::await_holding_lock)]
949 pub async fn get(&self, path: &Path) -> Result<NodeLink, MountError> {
950 let path = clean_path(path);
951
952 let inner = self.0.lock().await;
953 let root_node = inner.entry.clone();
954 drop(inner);
955
956 let (parent_path, file_name) = if let Some(parent) = path.parent() {
957 (
958 parent,
959 path.file_name().unwrap().to_string_lossy().to_string(),
960 )
961 } else {
962 return Err(MountError::PathNotFound(path.to_path_buf()));
963 };
964
965 let parent_node = if parent_path == Path::new("") {
966 root_node
967 } else {
968 Self::_get_node_at_path(&root_node, parent_path, &self.1).await?
969 };
970
971 parent_node
972 .get_link(&file_name)
973 .cloned()
974 .ok_or_else(|| MountError::PathNotFound(path.to_path_buf()))
975 }
976
977 async fn _get_node_at_path(
978 node: &Node,
979 path: &Path,
980 blobs: &BlobsStore,
981 ) -> Result<Node, MountError> {
982 let mut current_node = node.clone();
983 let mut consumed_path = PathBuf::from("/");
984
985 for part in path.iter() {
986 consumed_path.push(part);
987 let next = part.to_string_lossy().to_string();
988 let next_link = current_node
989 .get_link(&next)
990 .ok_or(MountError::PathNotFound(consumed_path.clone()))?;
991 current_node = Self::_get_node_from_blobs(next_link, blobs).await?
992 }
993 Ok(current_node)
994 }
995
996 pub async fn _set_node_link_at_path(
997 node: Node,
998 node_link: NodeLink,
999 path: &Path,
1000 blobs: &BlobsStore,
1001 ) -> Result<(NodeLink, Vec<crate::linked_data::Hash>), MountError> {
1002 let path = clean_path(path);
1003 let mut visited_nodes = Vec::new();
1004 let mut name = path.file_name().unwrap().to_string_lossy().to_string();
1005 let parent_path = path.parent().unwrap_or(Path::new(""));
1006
1007 let mut consumed_path = PathBuf::from("/");
1008 let mut node = node;
1009 visited_nodes.push((consumed_path.clone(), node.clone()));
1010
1011 for part in parent_path.iter() {
1012 let next = part.to_string_lossy().to_string();
1013 let next_link = node.get_link(&next);
1014 if let Some(next_link) = next_link {
1015 consumed_path.push(part);
1016 match next_link {
1017 NodeLink::Dir(..) => {
1018 node = Self::_get_node_from_blobs(next_link, blobs).await?
1019 }
1020 NodeLink::Data(..) => {
1021 return Err(MountError::PathNotNode(consumed_path.clone()));
1022 }
1023 }
1024 visited_nodes.push((consumed_path.clone(), node.clone()));
1025 } else {
1026 node = Node::default();
1028 consumed_path.push(part);
1029 visited_nodes.push((consumed_path.clone(), node.clone()));
1030 }
1031 }
1032
1033 let mut node_link = node_link;
1034 let mut created_hashes = Vec::new();
1035 for (path, mut node) in visited_nodes.into_iter().rev() {
1036 node.insert(name, node_link.clone());
1037 let secret = Secret::generate();
1038 let link = Self::_put_node_in_blobs(&node, &secret, blobs).await?;
1039 created_hashes.push(link.hash());
1040 node_link = NodeLink::Dir(link, secret);
1041 name = path
1042 .file_name()
1043 .unwrap_or_default()
1044 .to_string_lossy()
1045 .to_string();
1046 }
1047
1048 Ok((node_link, created_hashes))
1049 }
1050
1051 async fn _get_manifest_from_blobs(
1052 link: &Link,
1053 blobs: &BlobsStore,
1054 ) -> Result<Manifest, MountError> {
1055 tracing::debug!(
1056 "_get_bucket_from_blobs: Checking for bucket data at link {:?}",
1057 link
1058 );
1059 let hash = link.hash();
1060 tracing::debug!("_get_bucket_from_blobs: Bucket hash: {}", hash);
1061
1062 match blobs.stat(&hash).await {
1063 Ok(true) => {
1064 tracing::debug!(
1065 "_get_bucket_from_blobs: Bucket hash {} exists in blobs",
1066 hash
1067 );
1068 }
1069 Ok(false) => {
1070 tracing::error!("_get_bucket_from_blobs: Bucket hash {} NOT FOUND in blobs - LinkNotFound error!", hash);
1071 return Err(MountError::LinkNotFound(link.clone()));
1072 }
1073 Err(e) => {
1074 tracing::error!(
1075 "_get_bucket_from_blobs: Error checking bucket hash {}: {}",
1076 hash,
1077 e
1078 );
1079 return Err(e.into());
1080 }
1081 }
1082
1083 tracing::debug!("_get_bucket_from_blobs: Reading bucket data from blobs");
1084 let data = blobs.get(&hash).await?;
1085 tracing::debug!(
1086 "_get_bucket_from_blobs: Got {} bytes of bucket data",
1087 data.len()
1088 );
1089
1090 let bucket_data = Manifest::decode(&data)?;
1091 tracing::debug!(
1092 "_get_bucket_from_blobs: Successfully decoded BucketData for bucket '{}'",
1093 bucket_data.name()
1094 );
1095
1096 Ok(bucket_data)
1097 }
1098
1099 pub async fn _get_pins_from_blobs(link: &Link, blobs: &BlobsStore) -> Result<Pins, MountError> {
1100 tracing::debug!("_get_pins_from_blobs: Checking for pins at link {:?}", link);
1101 let hash = link.hash();
1102 tracing::debug!("_get_pins_from_blobs: Pins hash: {}", hash);
1103
1104 match blobs.stat(&hash).await {
1105 Ok(true) => {
1106 tracing::debug!("_get_pins_from_blobs: Pins hash {} exists in blobs", hash);
1107 }
1108 Ok(false) => {
1109 tracing::error!(
1110 "_get_pins_from_blobs: Pins hash {} NOT FOUND in blobs - LinkNotFound error!",
1111 hash
1112 );
1113 return Err(MountError::LinkNotFound(link.clone()));
1114 }
1115 Err(e) => {
1116 tracing::error!(
1117 "_get_pins_from_blobs: Error checking pins hash {}: {}",
1118 hash,
1119 e
1120 );
1121 return Err(e.into());
1122 }
1123 }
1124
1125 tracing::debug!("_get_pins_from_blobs: Reading hash list from blobs");
1126 let hashes = blobs.read_hash_list(hash).await?;
1128 tracing::debug!(
1129 "_get_pins_from_blobs: Successfully read {} hashes from pinset",
1130 hashes.len()
1131 );
1132
1133 Ok(Pins::from_vec(hashes))
1134 }
1135
1136 async fn _get_node_from_blobs(
1137 node_link: &NodeLink,
1138 blobs: &BlobsStore,
1139 ) -> Result<Node, MountError> {
1140 let link = node_link.link();
1141 let secret = node_link.secret();
1142 let hash = link.hash();
1143
1144 tracing::debug!("_get_node_from_blobs: Checking for node at hash {}", hash);
1145
1146 match blobs.stat(&hash).await {
1147 Ok(true) => {
1148 tracing::debug!("_get_node_from_blobs: Node hash {} exists in blobs", hash);
1149 }
1150 Ok(false) => {
1151 tracing::error!(
1152 "_get_node_from_blobs: Node hash {} NOT FOUND in blobs - LinkNotFound error!",
1153 hash
1154 );
1155 return Err(MountError::LinkNotFound(link.clone()));
1156 }
1157 Err(e) => {
1158 tracing::error!(
1159 "_get_node_from_blobs: Error checking node hash {}: {}",
1160 hash,
1161 e
1162 );
1163 return Err(e.into());
1164 }
1165 }
1166
1167 tracing::debug!("_get_node_from_blobs: Reading encrypted node blob");
1168 let blob = blobs.get(&hash).await?;
1169 tracing::debug!(
1170 "_get_node_from_blobs: Got {} bytes of encrypted node data",
1171 blob.len()
1172 );
1173
1174 tracing::debug!("_get_node_from_blobs: Decrypting node data");
1175 let data = secret.decrypt(&blob)?;
1176 tracing::debug!("_get_node_from_blobs: Decrypted {} bytes", data.len());
1177
1178 let node = Node::decode(&data)?;
1179 tracing::debug!("_get_node_from_blobs: Successfully decoded Node");
1180
1181 Ok(node)
1182 }
1183
1184 async fn _put_node_in_blobs(
1188 node: &Node,
1189 secret: &Secret,
1190 blobs: &BlobsStore,
1191 ) -> Result<Link, MountError> {
1192 let _data = node.encode()?;
1193 let data = secret.encrypt(&_data)?;
1194 let hash = blobs.put(data).await?;
1195 let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1198 Ok(link)
1199 }
1200
1201 pub async fn _put_manifest_in_blobs(
1202 bucket_data: &Manifest,
1203 blobs: &BlobsStore,
1204 ) -> Result<Link, MountError> {
1205 let data = bucket_data.encode()?;
1206 let hash = blobs.put(data).await?;
1207 let link = Link::new(bucket_data.codec(), hash);
1210 Ok(link)
1211 }
1212
1213 pub async fn _put_pins_in_blobs(pins: &Pins, blobs: &BlobsStore) -> Result<Link, MountError> {
1214 let hash = blobs.create_hash_list(pins.iter().copied()).await?;
1216 let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1219 Ok(link)
1220 }
1221
1222 async fn _get_ops_log_from_blobs(
1223 link: &Link,
1224 secret: &Secret,
1225 blobs: &BlobsStore,
1226 ) -> Result<PathOpLog, MountError> {
1227 let hash = link.hash();
1228 tracing::debug!(
1229 "_get_ops_log_from_blobs: Checking for ops log at hash {}",
1230 hash
1231 );
1232
1233 match blobs.stat(&hash).await {
1234 Ok(true) => {
1235 tracing::debug!(
1236 "_get_ops_log_from_blobs: Ops log hash {} exists in blobs",
1237 hash
1238 );
1239 }
1240 Ok(false) => {
1241 tracing::error!(
1242 "_get_ops_log_from_blobs: Ops log hash {} NOT FOUND in blobs - LinkNotFound error!",
1243 hash
1244 );
1245 return Err(MountError::LinkNotFound(link.clone()));
1246 }
1247 Err(e) => {
1248 tracing::error!(
1249 "_get_ops_log_from_blobs: Error checking ops log hash {}: {}",
1250 hash,
1251 e
1252 );
1253 return Err(e.into());
1254 }
1255 }
1256
1257 tracing::debug!("_get_ops_log_from_blobs: Reading encrypted ops log blob");
1258 let blob = blobs.get(&hash).await?;
1259 tracing::debug!(
1260 "_get_ops_log_from_blobs: Got {} bytes of encrypted ops log data",
1261 blob.len()
1262 );
1263
1264 tracing::debug!("_get_ops_log_from_blobs: Decrypting ops log data");
1265 let data = secret.decrypt(&blob)?;
1266 tracing::debug!("_get_ops_log_from_blobs: Decrypted {} bytes", data.len());
1267
1268 let ops_log = PathOpLog::decode(&data)?;
1269 tracing::debug!(
1270 "_get_ops_log_from_blobs: Successfully decoded PathOpLog with {} operations",
1271 ops_log.len()
1272 );
1273
1274 Ok(ops_log)
1275 }
1276
1277 async fn _put_ops_log_in_blobs(
1278 ops_log: &PathOpLog,
1279 secret: &Secret,
1280 blobs: &BlobsStore,
1281 ) -> Result<Link, MountError> {
1282 let _data = ops_log.encode()?;
1283 let data = secret.encrypt(&_data)?;
1284 let hash = blobs.put(data).await?;
1285 let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1287 tracing::debug!(
1288 "_put_ops_log_in_blobs: Stored ops log with {} operations at hash {}",
1289 ops_log.len(),
1290 hash
1291 );
1292 Ok(link)
1293 }
1294
1295 pub async fn collect_ops_since(
1311 &self,
1312 ancestor_link: Option<&Link>,
1313 blobs: &BlobsStore,
1314 ) -> Result<PathOpLog, MountError> {
1315 let inner = self.0.lock().await;
1316 let secret_key = inner.secret_key.clone();
1317 let current_link = inner.link.clone();
1318 let current_ops = inner.ops_log.clone();
1319 drop(inner);
1320
1321 let mut all_logs: Vec<PathOpLog> = Vec::new();
1322
1323 if !current_ops.is_empty() {
1325 all_logs.push(current_ops);
1326 }
1327
1328 let mut link = current_link;
1330
1331 loop {
1332 if let Some(ancestor) = ancestor_link {
1334 if &link == ancestor {
1335 break;
1336 }
1337 }
1338
1339 let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1341
1342 let secret = match self.get_secret_for_manifest(&manifest, &secret_key) {
1346 Ok(s) => s,
1347 Err(MountError::ShareNotFound) => {
1348 tracing::debug!(
1349 "collect_ops_since: stopping at link {} - no share for current user",
1350 link.hash()
1351 );
1352 break;
1353 }
1354 Err(e) => return Err(e),
1355 };
1356
1357 if let Some(ops_link) = manifest.ops_log() {
1359 let mut ops_log = Self::_get_ops_log_from_blobs(ops_link, &secret, blobs).await?;
1360 ops_log.rebuild_clock();
1361 all_logs.push(ops_log);
1362 }
1363
1364 match manifest.previous() {
1366 Some(prev) => link = prev.clone(),
1367 None => break, }
1369 }
1370
1371 all_logs.reverse();
1373 let mut merged = PathOpLog::new();
1374 for log in all_logs {
1375 merged.merge(&log);
1376 }
1377
1378 Ok(merged)
1379 }
1380
1381 #[allow(clippy::result_large_err)]
1385 fn get_secret_for_manifest(
1386 &self,
1387 manifest: &Manifest,
1388 secret_key: &SecretKey,
1389 ) -> Result<Secret, MountError> {
1390 let public_key = secret_key.public();
1391 let share = manifest
1392 .get_share(&public_key)
1393 .ok_or(MountError::ShareNotFound)?;
1394
1395 match share.role() {
1396 PrincipalRole::Owner => {
1397 let secret_share = share.share().ok_or(MountError::ShareNotFound)?;
1398 Ok(secret_share.recover(secret_key)?)
1399 }
1400 PrincipalRole::Mirror => manifest
1401 .public()
1402 .cloned()
1403 .ok_or(MountError::MirrorCannotMount),
1404 }
1405 }
1406
1407 pub async fn find_common_ancestor(
1421 &self,
1422 other: &Mount,
1423 blobs: &BlobsStore,
1424 ) -> Result<Option<Link>, MountError> {
1425 let mut self_chain: std::collections::HashSet<Link> = std::collections::HashSet::new();
1427
1428 let self_link = self.link().await;
1429 let mut link = self_link;
1430
1431 loop {
1432 self_chain.insert(link.clone());
1433 let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1434 match manifest.previous() {
1435 Some(prev) => link = prev.clone(),
1436 None => break,
1437 }
1438 }
1439
1440 let other_link = other.link().await;
1442 let mut link = other_link;
1443
1444 loop {
1445 if self_chain.contains(&link) {
1446 return Ok(Some(link));
1447 }
1448 let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1449 match manifest.previous() {
1450 Some(prev) => link = prev.clone(),
1451 None => break,
1452 }
1453 }
1454
1455 Ok(None)
1457 }
1458
1459 async fn apply_resolved_state(&mut self, merged_ops: &PathOpLog) -> Result<(), MountError> {
1465 let resolved_state = merged_ops.resolve_all();
1466
1467 for (path, op) in &resolved_state {
1468 if op.content_link.is_some() {
1469 let abs_path = Path::new("/").join(path);
1471 match self.get(&abs_path).await {
1472 Ok(_) => {
1473 }
1475 Err(MountError::PathNotFound(_)) => {
1476 tracing::warn!(
1478 "apply_resolved_state: cannot recreate file {} - no secret",
1479 path.display()
1480 );
1481 let mut inner = self.0.lock().await;
1483 inner.ops_log.merge(&PathOpLog::from_operation(op));
1484 }
1485 Err(e) => return Err(e),
1486 }
1487 } else if op.is_dir && matches!(op.op_type, super::path_ops::OpType::Mkdir) {
1488 let abs_path = Path::new("/").join(path);
1490 match self.mkdir(&abs_path).await {
1491 Ok(()) => {}
1492 Err(MountError::PathAlreadyExists(_)) => {}
1493 Err(e) => return Err(e),
1494 }
1495 }
1496 }
1497
1498 Ok(())
1499 }
1500
1501 pub async fn merge_from<R: super::ConflictResolver>(
1520 &mut self,
1521 incoming: &Mount,
1522 resolver: &R,
1523 blobs: &BlobsStore,
1524 ) -> Result<(MergeResult, Link), MountError> {
1525 let ancestor = self.find_common_ancestor(incoming, blobs).await?;
1527
1528 let local_ops = self.collect_ops_since(ancestor.as_ref(), blobs).await?;
1530 let incoming_ops = incoming.collect_ops_since(ancestor.as_ref(), blobs).await?;
1531
1532 let peer_id = {
1534 let inner = self.0.lock().await;
1535 inner.peer_id
1536 };
1537
1538 let mut merged_ops = local_ops.clone();
1540 let merge_result = merged_ops.merge_with_resolver(&incoming_ops, resolver, &peer_id);
1541
1542 self.apply_resolved_state(&merged_ops).await?;
1544
1545 {
1547 let mut inner = self.0.lock().await;
1548 inner.ops_log.merge(&merged_ops);
1549 }
1550
1551 let (link, _, _) = self.save(blobs, None).await?;
1553
1554 Ok((merge_result, link))
1555 }
1556}