1use std::collections::BTreeMap;
2use std::io::Read;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use tokio::sync::Mutex;
7use uuid::Uuid;
8
9use crate::crypto::{PublicKey, Secret, SecretError, SecretKey, SecretShare};
10use crate::linked_data::{BlockEncoded, CodecError, Link};
11use crate::peer::{BlobsStore, BlobsStoreError};
12
13use super::conflict::MergeResult;
14use super::manifest::{Manifest, ManifestError, Share};
15use super::node::{Node, NodeError, NodeLink};
16use super::path_ops::{OpType, PathOpLog};
17use super::pins::Pins;
18use super::principal::PrincipalRole;
19
20pub fn clean_path(path: &Path) -> PathBuf {
21 if !path.is_absolute() {
22 panic!("path is not absolute");
23 }
24 path.iter()
25 .skip(1)
26 .map(|part| part.to_string_lossy().to_string())
27 .collect::<PathBuf>()
28}
29
30#[derive(Clone)]
31pub struct MountInner {
32 pub link: Link,
34 pub manifest: Manifest,
36 pub entry: Node,
38 pub pins: Pins,
40 pub height: u64,
42 pub ops_log: PathOpLog,
44 pub peer_id: PublicKey,
46 pub secret_key: SecretKey,
48}
49
50impl MountInner {
51 pub fn link(&self) -> &Link {
52 &self.link
53 }
54 pub fn entry(&self) -> &Node {
55 &self.entry
56 }
57 pub fn manifest(&self) -> &Manifest {
58 &self.manifest
59 }
60 pub fn pins(&self) -> &Pins {
61 &self.pins
62 }
63 pub fn height(&self) -> u64 {
64 self.height
65 }
66 pub fn ops_log(&self) -> &PathOpLog {
67 &self.ops_log
68 }
69 pub fn peer_id(&self) -> &PublicKey {
70 &self.peer_id
71 }
72}
73
74#[derive(Clone)]
75pub struct Mount(Arc<Mutex<MountInner>>, BlobsStore);
76
77#[derive(Debug, thiserror::Error)]
78pub enum MountError {
79 #[error("default error: {0}")]
80 Default(#[from] anyhow::Error),
81 #[error("link not found")]
82 LinkNotFound(Link),
83 #[error("path not found: {0}")]
84 PathNotFound(PathBuf),
85 #[error("path is not a node: {0}")]
86 PathNotNode(PathBuf),
87 #[error("path already exists: {0}")]
88 PathAlreadyExists(PathBuf),
89 #[error("cannot move '{from}' to '{to}': destination is inside source")]
90 MoveIntoSelf { from: PathBuf, to: PathBuf },
91 #[error("blobs store error: {0}")]
92 BlobsStore(#[from] BlobsStoreError),
93 #[error("secret error: {0}")]
94 Secret(#[from] SecretError),
95 #[error("node error: {0}")]
96 Node(#[from] NodeError),
97 #[error("codec error: {0}")]
98 Codec(#[from] CodecError),
99 #[error("share error: {0}")]
100 Share(#[from] crate::crypto::SecretShareError),
101 #[error("manifest error: {0}")]
102 Manifest(#[from] ManifestError),
103 #[error("peers share was not found")]
104 ShareNotFound,
105 #[error("mirror cannot mount: bucket is not published")]
106 MirrorCannotMount,
107}
108
109impl Mount {
110 pub async fn inner(&self) -> MountInner {
111 self.0.lock().await.clone()
112 }
113
114 pub fn blobs(&self) -> BlobsStore {
115 self.1.clone()
116 }
117
118 pub async fn link(&self) -> Link {
119 let inner = self.0.lock().await;
120 inner.link.clone()
121 }
122
123 pub async fn save(
128 &self,
129 blobs: &BlobsStore,
130 publish: bool,
131 ) -> Result<(Link, Link, u64), MountError> {
132 let (
134 entry_node,
135 mut pins,
136 previous_link,
137 previous_height,
138 manifest_template,
139 ops_log,
140 secret_key,
141 ) = {
142 let inner = self.0.lock().await;
143 (
144 inner.entry.clone(),
145 inner.pins.clone(),
146 inner.link.clone(),
147 inner.height,
148 inner.manifest.clone(),
149 inner.ops_log.clone(),
150 inner.secret_key.clone(),
151 )
152 };
153
154 let height = previous_height + 1;
156
157 let secret = Secret::generate();
159
160 let entry = Self::_put_node_in_blobs(&entry_node, &secret, blobs).await?;
162
163 pins.insert(entry.clone().hash());
166 pins.insert(previous_link.hash());
167
168 let ops_log_link = if !ops_log.is_empty() {
170 let link = Self::_put_ops_log_in_blobs(&ops_log, &secret, blobs).await?;
171 pins.insert(link.hash());
172 Some(link)
173 } else {
174 None
175 };
176
177 let pins_link = Self::_put_pins_in_blobs(&pins, blobs).await?;
178
179 let mut manifest = manifest_template;
181 for share in manifest.shares_mut().values_mut() {
182 if *share.role() == PrincipalRole::Owner {
183 let secret_share = SecretShare::new(&secret, &share.principal().identity)?;
184 share.set_share(secret_share);
185 }
186 }
187
188 if publish {
190 manifest.publish(&secret);
191 } else {
192 manifest.unpublish();
195 }
196 manifest.set_pins(pins_link.clone());
197 manifest.set_previous(previous_link.clone());
198 manifest.set_entry(entry.clone());
199 manifest.set_height(height);
200
201 manifest.clear_ops_log();
204 if let Some(ops_link) = ops_log_link {
205 manifest.set_ops_log(ops_link);
206 }
207
208 manifest.sign(&secret_key)?;
210
211 let link = Self::_put_manifest_in_blobs(&manifest, blobs).await?;
213
214 {
216 let mut inner = self.0.lock().await;
217 inner.manifest = manifest;
218 inner.height = height;
219 inner.link = link.clone();
220 inner.ops_log.clear_preserving_clock();
224 }
225
226 Ok((link, previous_link, height))
227 }
228
229 pub async fn init(
230 id: Uuid,
231 name: String,
232 owner: &SecretKey,
233 blobs: &BlobsStore,
234 ) -> Result<Self, MountError> {
235 let entry = Node::default();
237 let secret = Secret::generate();
239 let entry_link = Self::_put_node_in_blobs(&entry, &secret, blobs).await?;
241 let share = SecretShare::new(&secret, &owner.public())?;
243 let mut pins = Pins::new();
245 pins.insert(entry_link.hash());
246 let pins_link = Self::_put_pins_in_blobs(&pins, blobs).await?;
248 let mut manifest = Manifest::new(
250 id,
251 name.clone(),
252 owner.public(),
253 share,
254 entry_link.clone(),
255 pins_link.clone(),
256 0, );
258 manifest.sign(owner)?;
260 let link = Self::_put_manifest_in_blobs(&manifest, blobs).await?;
261
262 Ok(Mount(
264 Arc::new(Mutex::new(MountInner {
265 link,
266 manifest,
267 entry,
268 pins,
269 height: 0,
270 ops_log: PathOpLog::new(),
271 peer_id: owner.public(),
272 secret_key: owner.clone(),
273 })),
274 blobs.clone(),
275 ))
276 }
277
278 pub async fn load(
279 link: &Link,
280 secret_key: &SecretKey,
281 blobs: &BlobsStore,
282 ) -> Result<Self, MountError> {
283 let public_key = &secret_key.public();
284 let manifest = Self::_get_manifest_from_blobs(link, blobs).await?;
285
286 let bucket_share = match manifest.get_share(public_key) {
287 Some(share) => share,
288 None => return Err(MountError::ShareNotFound),
289 };
290
291 let secret = match bucket_share.role() {
293 PrincipalRole::Owner => {
294 let share = bucket_share.share().ok_or(MountError::ShareNotFound)?;
296 share.recover(secret_key)?
297 }
298 PrincipalRole::Mirror => {
299 manifest
301 .public()
302 .cloned()
303 .ok_or(MountError::MirrorCannotMount)?
304 }
305 };
306
307 let pins = Self::_get_pins_from_blobs(manifest.pins(), blobs).await?;
308 let entry = Self::_get_node_from_blobs(
309 &NodeLink::Dir(manifest.entry().clone(), secret.clone()),
310 blobs,
311 )
312 .await?;
313
314 let height = manifest.height();
316
317 let ops_log = if let Some(ops_link) = manifest.ops_log() {
319 let mut log = Self::_get_ops_log_from_blobs(ops_link, &secret, blobs).await?;
320 log.rebuild_clock();
322 log
323 } else {
324 PathOpLog::new()
325 };
326
327 Ok(Mount(
328 Arc::new(Mutex::new(MountInner {
329 link: link.clone(),
330 manifest,
331 entry,
332 pins,
333 height,
334 ops_log,
335 peer_id: secret_key.public(),
336 secret_key: secret_key.clone(),
337 })),
338 blobs.clone(),
339 ))
340 }
341
342 pub async fn load_manifest(link: &Link, blobs: &BlobsStore) -> Result<Manifest, MountError> {
347 Self::_get_manifest_from_blobs(link, blobs).await
348 }
349
350 pub async fn add_owner(&mut self, peer: PublicKey) -> Result<(), MountError> {
353 let mut inner = self.0.lock().await;
354 let secret_share = SecretShare::new(&Secret::default(), &peer)?;
355 inner
356 .manifest
357 .add_share(Share::new_owner(secret_share, peer));
358 Ok(())
359 }
360
361 pub async fn add_mirror(&mut self, peer: PublicKey) {
364 let mut inner = self.0.lock().await;
365 inner.manifest.add_share(Share::new_mirror(peer));
366 }
367
368 pub async fn is_published(&self) -> bool {
370 let inner = self.0.lock().await;
371 inner.manifest.is_published()
372 }
373
374 pub async fn publish(&self) -> Result<(Link, Link, u64), MountError> {
378 self.save(&self.1, true).await
379 }
380
381 pub async fn add<R>(&mut self, path: &Path, data: R) -> Result<(), MountError>
382 where
383 R: Read + Send + Sync + 'static + Unpin,
384 {
385 let secret = Secret::generate();
386
387 let encrypted_reader = secret.encrypt_reader(data)?;
388
389 use bytes::Bytes;
391 use futures::stream;
392 let encrypted_bytes = {
393 let mut buf = Vec::new();
394 let mut reader = encrypted_reader;
395 reader.read_to_end(&mut buf).map_err(SecretError::Io)?;
396 buf
397 };
398
399 let stream = Box::pin(stream::once(async move {
400 Ok::<_, std::io::Error>(Bytes::from(encrypted_bytes))
401 }));
402
403 let hash = self.1.put_stream(stream).await?;
404
405 let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
406
407 let node_link = NodeLink::new_data_from_path(link.clone(), secret, path);
408
409 let root_node = {
410 let inner = self.0.lock().await;
411 inner.entry.clone()
412 };
413
414 let (updated_link, node_hashes) =
415 Self::_set_node_link_at_path(root_node, node_link, path, &self.1).await?;
416
417 let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
419 Some(
420 Self::_get_node_from_blobs(
421 &NodeLink::Dir(new_root_link.clone(), new_secret),
422 &self.1,
423 )
424 .await?,
425 )
426 } else {
427 None
428 };
429
430 {
432 let mut inner = self.0.lock().await;
433 inner.pins.insert(hash);
435 inner.pins.extend(node_hashes);
436
437 if let Some(entry) = new_entry {
438 inner.entry = entry;
439 }
440
441 let peer_id = inner.peer_id;
443 inner
444 .ops_log
445 .record(peer_id, OpType::Add, clean_path(path), Some(link), false);
446 }
447
448 Ok(())
449 }
450
451 pub async fn rm(&mut self, path: &Path) -> Result<(), MountError> {
452 let path = clean_path(path);
453 let parent_path = path
454 .parent()
455 .ok_or_else(|| MountError::Default(anyhow::anyhow!("Cannot remove root")))?;
456
457 let entry = {
458 let inner = self.0.lock().await;
459 inner.entry.clone()
460 };
461
462 let mut parent_node = if parent_path == Path::new("") {
463 entry.clone()
464 } else {
465 Self::_get_node_at_path(&entry, parent_path, &self.1).await?
466 };
467
468 let file_name = path.file_name().unwrap().to_string_lossy().to_string();
469
470 let removed_link = parent_node.del(&file_name);
472 if removed_link.is_none() {
473 return Err(MountError::PathNotFound(path.to_path_buf()));
474 }
475 let is_dir = removed_link.map(|l| l.is_dir()).unwrap_or(false);
476
477 let removed_path = path.to_path_buf();
479
480 if parent_path == Path::new("") {
481 let secret = Secret::generate();
482 let link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
483
484 let mut inner = self.0.lock().await;
485 inner.pins.insert(link.hash());
487 inner.entry = parent_node;
488 } else {
489 let secret = Secret::generate();
491 let parent_link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
492 let node_link = NodeLink::new_dir(parent_link.clone(), secret);
493
494 let abs_parent_path = Path::new("/").join(parent_path);
496 let (updated_link, node_hashes) =
497 Self::_set_node_link_at_path(entry, node_link, &abs_parent_path, &self.1).await?;
498
499 let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
500 Some(
501 Self::_get_node_from_blobs(
502 &NodeLink::Dir(new_root_link.clone(), new_secret),
503 &self.1,
504 )
505 .await?,
506 )
507 } else {
508 None
509 };
510
511 let mut inner = self.0.lock().await;
512 inner.pins.insert(parent_link.hash());
514 inner.pins.extend(node_hashes);
515
516 if let Some(new_entry) = new_entry {
517 inner.entry = new_entry;
518 }
519 }
520
521 {
523 let mut inner = self.0.lock().await;
524 let peer_id = inner.peer_id;
525 inner
526 .ops_log
527 .record(peer_id, OpType::Remove, removed_path, None, is_dir);
528 }
529
530 Ok(())
531 }
532
533 pub async fn mkdir(&mut self, path: &Path) -> Result<(), MountError> {
534 let path = clean_path(path);
535
536 let entry = {
538 let inner = self.0.lock().await;
539 inner.entry.clone()
540 };
541
542 let (parent_path, dir_name) = if let Some(parent) = path.parent() {
544 (
545 parent,
546 path.file_name().unwrap().to_string_lossy().to_string(),
547 )
548 } else {
549 return Err(MountError::Default(anyhow::anyhow!("Cannot mkdir at root")));
550 };
551
552 let parent_node = if parent_path == Path::new("") {
554 entry.clone()
555 } else {
556 match Self::_get_node_at_path(&entry, parent_path, &self.1).await {
558 Ok(node) => node,
559 Err(MountError::PathNotFound(_)) => Node::default(), Err(err) => return Err(err),
561 }
562 };
563
564 if parent_node.get_link(&dir_name).is_some() {
566 return Err(MountError::PathAlreadyExists(Path::new("/").join(&path)));
567 }
568
569 let new_dir_node = Node::default();
571
572 let secret = Secret::generate();
574
575 let dir_link = Self::_put_node_in_blobs(&new_dir_node, &secret, &self.1).await?;
577
578 let node_link = NodeLink::new_dir(dir_link.clone(), secret);
580
581 let abs_path = Path::new("/").join(&path);
583
584 let (updated_link, node_hashes) =
586 Self::_set_node_link_at_path(entry, node_link, &abs_path, &self.1).await?;
587
588 let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
590 Some(
591 Self::_get_node_from_blobs(
592 &NodeLink::Dir(new_root_link.clone(), new_secret),
593 &self.1,
594 )
595 .await?,
596 )
597 } else {
598 None
599 };
600
601 {
603 let mut inner = self.0.lock().await;
604 inner.pins.insert(dir_link.hash());
606 inner.pins.extend(node_hashes);
607
608 if let Some(new_entry) = new_entry {
609 inner.entry = new_entry;
610 }
611
612 let peer_id = inner.peer_id;
614 inner
615 .ops_log
616 .record(peer_id, OpType::Mkdir, path.to_path_buf(), None, true);
617 }
618
619 Ok(())
620 }
621
622 pub async fn mv(&mut self, from: &Path, to: &Path) -> Result<(), MountError> {
641 let from_clean = clean_path(from);
644 let to_clean = clean_path(to);
645
646 if to.starts_with(from) {
657 return Err(MountError::MoveIntoSelf {
658 from: from.to_path_buf(),
659 to: to.to_path_buf(),
660 });
661 }
662
663 let node_link = self.get(from).await?;
670 let is_dir = node_link.is_dir();
671
672 let from_path = from_clean.to_path_buf();
674 let to_path = to_clean.to_path_buf();
675
676 if self.get(to).await.is_ok() {
682 return Err(MountError::PathAlreadyExists(to.to_path_buf()));
683 }
684
685 {
695 let parent_path = from_clean
697 .parent()
698 .ok_or_else(|| MountError::Default(anyhow::anyhow!("Cannot move root")))?;
699
700 let entry = {
702 let inner = self.0.lock().await;
703 inner.entry.clone()
704 };
705
706 let mut parent_node = if parent_path == Path::new("") {
708 entry.clone()
710 } else {
711 Self::_get_node_at_path(&entry, parent_path, &self.1).await?
713 };
714
715 let file_name = from_clean
717 .file_name()
718 .expect(
719 "from_clean has no filename - this should be impossible after parent() check",
720 )
721 .to_string_lossy()
722 .to_string();
723
724 if parent_node.del(&file_name).is_none() {
726 return Err(MountError::PathNotFound(from_clean.to_path_buf()));
727 }
728
729 if parent_path == Path::new("") {
731 let secret = Secret::generate();
733 let link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
734
735 let mut inner = self.0.lock().await;
736 inner.pins.insert(link.hash());
737 inner.entry = parent_node;
738 } else {
739 let secret = Secret::generate();
743 let parent_link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
744 let new_node_link = NodeLink::new_dir(parent_link.clone(), secret);
745
746 let abs_parent_path = Path::new("/").join(parent_path);
748 let (updated_root_link, node_hashes) =
749 Self::_set_node_link_at_path(entry, new_node_link, &abs_parent_path, &self.1)
750 .await?;
751
752 let new_entry = Self::_get_node_from_blobs(&updated_root_link, &self.1).await?;
756
757 let mut inner = self.0.lock().await;
759 inner.pins.insert(parent_link.hash());
760 inner.pins.extend(node_hashes);
761 inner.entry = new_entry;
762 }
763 }
764
765 let entry = {
772 let inner = self.0.lock().await;
773 inner.entry.clone()
774 };
775
776 let (updated_root_link, node_hashes) =
777 Self::_set_node_link_at_path(entry, node_link, to, &self.1).await?;
778
779 {
783 let new_entry = Self::_get_node_from_blobs(&updated_root_link, &self.1).await?;
785
786 let mut inner = self.0.lock().await;
787 inner.pins.extend(node_hashes);
788 inner.entry = new_entry;
789
790 let peer_id = inner.peer_id;
794 inner.ops_log.record(
795 peer_id,
796 OpType::Mv { from: from_path },
797 to_path,
798 None,
799 is_dir,
800 );
801 }
802
803 Ok(())
804 }
805
806 pub async fn ls(&self, path: &Path) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
807 let mut items = BTreeMap::new();
808 let path = clean_path(path);
809
810 let inner = self.0.lock().await;
811 let root_node = inner.entry.clone();
812 drop(inner);
813
814 let node = if path == Path::new("") {
815 root_node
816 } else {
817 match Self::_get_node_at_path(&root_node, &path, &self.1).await {
818 Ok(node) => node,
819 Err(MountError::LinkNotFound(_)) => {
820 return Err(MountError::PathNotNode(path.to_path_buf()))
821 }
822 Err(err) => return Err(err),
823 }
824 };
825
826 for (name, link) in node.get_links() {
827 let mut full_path = path.clone();
828 full_path.push(name);
829 items.insert(full_path, link.clone());
830 }
831
832 Ok(items)
833 }
834
835 pub async fn ls_deep(&self, path: &Path) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
836 let base_path = clean_path(path);
837 self._ls_deep(path, &base_path).await
838 }
839
840 async fn _ls_deep(
841 &self,
842 path: &Path,
843 base_path: &Path,
844 ) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
845 let mut all_items = BTreeMap::new();
846
847 let items = self.ls(path).await?;
849
850 for (item_path, link) in items {
851 let relative_path = if base_path == Path::new("") {
853 item_path.clone()
854 } else {
855 item_path
856 .strip_prefix(base_path)
857 .unwrap_or(&item_path)
858 .to_path_buf()
859 };
860 all_items.insert(relative_path.clone(), link.clone());
861
862 if link.is_dir() {
863 let abs_item_path = Path::new("/").join(&item_path);
865 let sub_items = Box::pin(self._ls_deep(&abs_item_path, base_path)).await?;
866
867 for (sub_path, sub_link) in sub_items {
869 all_items.insert(sub_path, sub_link);
870 }
871 }
872 }
873
874 Ok(all_items)
875 }
876
877 #[allow(clippy::await_holding_lock)]
878 pub async fn cat(&self, path: &Path) -> Result<Vec<u8>, MountError> {
879 let path = clean_path(path);
880
881 let inner = self.0.lock().await;
882 let root_node = inner.entry.clone();
883 drop(inner);
884
885 let (parent_path, file_name) = if let Some(parent) = path.parent() {
886 (
887 parent,
888 path.file_name().unwrap().to_string_lossy().to_string(),
889 )
890 } else {
891 return Err(MountError::PathNotFound(path.to_path_buf()));
892 };
893
894 let parent_node = if parent_path == Path::new("") {
895 root_node
896 } else {
897 Self::_get_node_at_path(&root_node, parent_path, &self.1).await?
898 };
899
900 let link = parent_node
901 .get_link(&file_name)
902 .ok_or_else(|| MountError::PathNotFound(path.to_path_buf()))?;
903
904 match link {
905 NodeLink::Data(link, secret, _) => {
906 let encrypted_data = self.1.get(&link.hash()).await?;
907 let data = secret.decrypt(&encrypted_data)?;
908 Ok(data)
909 }
910 NodeLink::Dir(_, _) => Err(MountError::PathNotNode(path.to_path_buf())),
911 }
912 }
913
914 #[allow(clippy::await_holding_lock)]
916 pub async fn get(&self, path: &Path) -> Result<NodeLink, MountError> {
917 let path = clean_path(path);
918
919 let inner = self.0.lock().await;
920 let root_node = inner.entry.clone();
921 drop(inner);
922
923 let (parent_path, file_name) = if let Some(parent) = path.parent() {
924 (
925 parent,
926 path.file_name().unwrap().to_string_lossy().to_string(),
927 )
928 } else {
929 return Err(MountError::PathNotFound(path.to_path_buf()));
930 };
931
932 let parent_node = if parent_path == Path::new("") {
933 root_node
934 } else {
935 Self::_get_node_at_path(&root_node, parent_path, &self.1).await?
936 };
937
938 parent_node
939 .get_link(&file_name)
940 .cloned()
941 .ok_or_else(|| MountError::PathNotFound(path.to_path_buf()))
942 }
943
944 async fn _get_node_at_path(
945 node: &Node,
946 path: &Path,
947 blobs: &BlobsStore,
948 ) -> Result<Node, MountError> {
949 let mut current_node = node.clone();
950 let mut consumed_path = PathBuf::from("/");
951
952 for part in path.iter() {
953 consumed_path.push(part);
954 let next = part.to_string_lossy().to_string();
955 let next_link = current_node
956 .get_link(&next)
957 .ok_or(MountError::PathNotFound(consumed_path.clone()))?;
958 current_node = Self::_get_node_from_blobs(next_link, blobs).await?
959 }
960 Ok(current_node)
961 }
962
963 pub async fn _set_node_link_at_path(
964 node: Node,
965 node_link: NodeLink,
966 path: &Path,
967 blobs: &BlobsStore,
968 ) -> Result<(NodeLink, Vec<crate::linked_data::Hash>), MountError> {
969 let path = clean_path(path);
970 let mut visited_nodes = Vec::new();
971 let mut name = path.file_name().unwrap().to_string_lossy().to_string();
972 let parent_path = path.parent().unwrap_or(Path::new(""));
973
974 let mut consumed_path = PathBuf::from("/");
975 let mut node = node;
976 visited_nodes.push((consumed_path.clone(), node.clone()));
977
978 for part in parent_path.iter() {
979 let next = part.to_string_lossy().to_string();
980 let next_link = node.get_link(&next);
981 if let Some(next_link) = next_link {
982 consumed_path.push(part);
983 match next_link {
984 NodeLink::Dir(..) => {
985 node = Self::_get_node_from_blobs(next_link, blobs).await?
986 }
987 NodeLink::Data(..) => {
988 return Err(MountError::PathNotNode(consumed_path.clone()));
989 }
990 }
991 visited_nodes.push((consumed_path.clone(), node.clone()));
992 } else {
993 node = Node::default();
995 consumed_path.push(part);
996 visited_nodes.push((consumed_path.clone(), node.clone()));
997 }
998 }
999
1000 let mut node_link = node_link;
1001 let mut created_hashes = Vec::new();
1002 for (path, mut node) in visited_nodes.into_iter().rev() {
1003 node.insert(name, node_link.clone());
1004 let secret = Secret::generate();
1005 let link = Self::_put_node_in_blobs(&node, &secret, blobs).await?;
1006 created_hashes.push(link.hash());
1007 node_link = NodeLink::Dir(link, secret);
1008 name = path
1009 .file_name()
1010 .unwrap_or_default()
1011 .to_string_lossy()
1012 .to_string();
1013 }
1014
1015 Ok((node_link, created_hashes))
1016 }
1017
1018 async fn _get_manifest_from_blobs(
1019 link: &Link,
1020 blobs: &BlobsStore,
1021 ) -> Result<Manifest, MountError> {
1022 tracing::debug!(
1023 "_get_bucket_from_blobs: Checking for bucket data at link {:?}",
1024 link
1025 );
1026 let hash = link.hash();
1027 tracing::debug!("_get_bucket_from_blobs: Bucket hash: {}", hash);
1028
1029 match blobs.stat(&hash).await {
1030 Ok(true) => {
1031 tracing::debug!(
1032 "_get_bucket_from_blobs: Bucket hash {} exists in blobs",
1033 hash
1034 );
1035 }
1036 Ok(false) => {
1037 tracing::error!("_get_bucket_from_blobs: Bucket hash {} NOT FOUND in blobs - LinkNotFound error!", hash);
1038 return Err(MountError::LinkNotFound(link.clone()));
1039 }
1040 Err(e) => {
1041 tracing::error!(
1042 "_get_bucket_from_blobs: Error checking bucket hash {}: {}",
1043 hash,
1044 e
1045 );
1046 return Err(e.into());
1047 }
1048 }
1049
1050 tracing::debug!("_get_bucket_from_blobs: Reading bucket data from blobs");
1051 let data = blobs.get(&hash).await?;
1052 tracing::debug!(
1053 "_get_bucket_from_blobs: Got {} bytes of bucket data",
1054 data.len()
1055 );
1056
1057 let bucket_data = Manifest::decode(&data)?;
1058 tracing::debug!(
1059 "_get_bucket_from_blobs: Successfully decoded BucketData for bucket '{}'",
1060 bucket_data.name()
1061 );
1062
1063 Ok(bucket_data)
1064 }
1065
1066 pub async fn _get_pins_from_blobs(link: &Link, blobs: &BlobsStore) -> Result<Pins, MountError> {
1067 tracing::debug!("_get_pins_from_blobs: Checking for pins at link {:?}", link);
1068 let hash = link.hash();
1069 tracing::debug!("_get_pins_from_blobs: Pins hash: {}", hash);
1070
1071 match blobs.stat(&hash).await {
1072 Ok(true) => {
1073 tracing::debug!("_get_pins_from_blobs: Pins hash {} exists in blobs", hash);
1074 }
1075 Ok(false) => {
1076 tracing::error!(
1077 "_get_pins_from_blobs: Pins hash {} NOT FOUND in blobs - LinkNotFound error!",
1078 hash
1079 );
1080 return Err(MountError::LinkNotFound(link.clone()));
1081 }
1082 Err(e) => {
1083 tracing::error!(
1084 "_get_pins_from_blobs: Error checking pins hash {}: {}",
1085 hash,
1086 e
1087 );
1088 return Err(e.into());
1089 }
1090 }
1091
1092 tracing::debug!("_get_pins_from_blobs: Reading hash list from blobs");
1093 let hashes = blobs.read_hash_list(hash).await?;
1095 tracing::debug!(
1096 "_get_pins_from_blobs: Successfully read {} hashes from pinset",
1097 hashes.len()
1098 );
1099
1100 Ok(Pins::from_vec(hashes))
1101 }
1102
1103 async fn _get_node_from_blobs(
1104 node_link: &NodeLink,
1105 blobs: &BlobsStore,
1106 ) -> Result<Node, MountError> {
1107 let link = node_link.link();
1108 let secret = node_link.secret();
1109 let hash = link.hash();
1110
1111 tracing::debug!("_get_node_from_blobs: Checking for node at hash {}", hash);
1112
1113 match blobs.stat(&hash).await {
1114 Ok(true) => {
1115 tracing::debug!("_get_node_from_blobs: Node hash {} exists in blobs", hash);
1116 }
1117 Ok(false) => {
1118 tracing::error!(
1119 "_get_node_from_blobs: Node hash {} NOT FOUND in blobs - LinkNotFound error!",
1120 hash
1121 );
1122 return Err(MountError::LinkNotFound(link.clone()));
1123 }
1124 Err(e) => {
1125 tracing::error!(
1126 "_get_node_from_blobs: Error checking node hash {}: {}",
1127 hash,
1128 e
1129 );
1130 return Err(e.into());
1131 }
1132 }
1133
1134 tracing::debug!("_get_node_from_blobs: Reading encrypted node blob");
1135 let blob = blobs.get(&hash).await?;
1136 tracing::debug!(
1137 "_get_node_from_blobs: Got {} bytes of encrypted node data",
1138 blob.len()
1139 );
1140
1141 tracing::debug!("_get_node_from_blobs: Decrypting node data");
1142 let data = secret.decrypt(&blob)?;
1143 tracing::debug!("_get_node_from_blobs: Decrypted {} bytes", data.len());
1144
1145 let node = Node::decode(&data)?;
1146 tracing::debug!("_get_node_from_blobs: Successfully decoded Node");
1147
1148 Ok(node)
1149 }
1150
1151 async fn _put_node_in_blobs(
1155 node: &Node,
1156 secret: &Secret,
1157 blobs: &BlobsStore,
1158 ) -> Result<Link, MountError> {
1159 let _data = node.encode()?;
1160 let data = secret.encrypt(&_data)?;
1161 let hash = blobs.put(data).await?;
1162 let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1165 Ok(link)
1166 }
1167
1168 pub async fn _put_manifest_in_blobs(
1169 bucket_data: &Manifest,
1170 blobs: &BlobsStore,
1171 ) -> Result<Link, MountError> {
1172 let data = bucket_data.encode()?;
1173 let hash = blobs.put(data).await?;
1174 let link = Link::new(bucket_data.codec(), hash);
1177 Ok(link)
1178 }
1179
1180 pub async fn _put_pins_in_blobs(pins: &Pins, blobs: &BlobsStore) -> Result<Link, MountError> {
1181 let hash = blobs.create_hash_list(pins.iter().copied()).await?;
1183 let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1186 Ok(link)
1187 }
1188
1189 async fn _get_ops_log_from_blobs(
1190 link: &Link,
1191 secret: &Secret,
1192 blobs: &BlobsStore,
1193 ) -> Result<PathOpLog, MountError> {
1194 let hash = link.hash();
1195 tracing::debug!(
1196 "_get_ops_log_from_blobs: Checking for ops log at hash {}",
1197 hash
1198 );
1199
1200 match blobs.stat(&hash).await {
1201 Ok(true) => {
1202 tracing::debug!(
1203 "_get_ops_log_from_blobs: Ops log hash {} exists in blobs",
1204 hash
1205 );
1206 }
1207 Ok(false) => {
1208 tracing::error!(
1209 "_get_ops_log_from_blobs: Ops log hash {} NOT FOUND in blobs - LinkNotFound error!",
1210 hash
1211 );
1212 return Err(MountError::LinkNotFound(link.clone()));
1213 }
1214 Err(e) => {
1215 tracing::error!(
1216 "_get_ops_log_from_blobs: Error checking ops log hash {}: {}",
1217 hash,
1218 e
1219 );
1220 return Err(e.into());
1221 }
1222 }
1223
1224 tracing::debug!("_get_ops_log_from_blobs: Reading encrypted ops log blob");
1225 let blob = blobs.get(&hash).await?;
1226 tracing::debug!(
1227 "_get_ops_log_from_blobs: Got {} bytes of encrypted ops log data",
1228 blob.len()
1229 );
1230
1231 tracing::debug!("_get_ops_log_from_blobs: Decrypting ops log data");
1232 let data = secret.decrypt(&blob)?;
1233 tracing::debug!("_get_ops_log_from_blobs: Decrypted {} bytes", data.len());
1234
1235 let ops_log = PathOpLog::decode(&data)?;
1236 tracing::debug!(
1237 "_get_ops_log_from_blobs: Successfully decoded PathOpLog with {} operations",
1238 ops_log.len()
1239 );
1240
1241 Ok(ops_log)
1242 }
1243
1244 async fn _put_ops_log_in_blobs(
1245 ops_log: &PathOpLog,
1246 secret: &Secret,
1247 blobs: &BlobsStore,
1248 ) -> Result<Link, MountError> {
1249 let _data = ops_log.encode()?;
1250 let data = secret.encrypt(&_data)?;
1251 let hash = blobs.put(data).await?;
1252 let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1254 tracing::debug!(
1255 "_put_ops_log_in_blobs: Stored ops log with {} operations at hash {}",
1256 ops_log.len(),
1257 hash
1258 );
1259 Ok(link)
1260 }
1261
1262 pub async fn collect_ops_since(
1278 &self,
1279 ancestor_link: Option<&Link>,
1280 blobs: &BlobsStore,
1281 ) -> Result<PathOpLog, MountError> {
1282 let inner = self.0.lock().await;
1283 let secret_key = inner.secret_key.clone();
1284 let current_link = inner.link.clone();
1285 let current_ops = inner.ops_log.clone();
1286 drop(inner);
1287
1288 let mut all_logs: Vec<PathOpLog> = Vec::new();
1289
1290 if !current_ops.is_empty() {
1292 all_logs.push(current_ops);
1293 }
1294
1295 let mut link = current_link;
1297
1298 loop {
1299 if let Some(ancestor) = ancestor_link {
1301 if &link == ancestor {
1302 break;
1303 }
1304 }
1305
1306 let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1308
1309 let secret = match self.get_secret_for_manifest(&manifest, &secret_key) {
1313 Ok(s) => s,
1314 Err(MountError::ShareNotFound) => {
1315 tracing::debug!(
1316 "collect_ops_since: stopping at link {} - no share for current user",
1317 link.hash()
1318 );
1319 break;
1320 }
1321 Err(e) => return Err(e),
1322 };
1323
1324 if let Some(ops_link) = manifest.ops_log() {
1326 let mut ops_log = Self::_get_ops_log_from_blobs(ops_link, &secret, blobs).await?;
1327 ops_log.rebuild_clock();
1328 all_logs.push(ops_log);
1329 }
1330
1331 match manifest.previous() {
1333 Some(prev) => link = prev.clone(),
1334 None => break, }
1336 }
1337
1338 all_logs.reverse();
1340 let mut merged = PathOpLog::new();
1341 for log in all_logs {
1342 merged.merge(&log);
1343 }
1344
1345 Ok(merged)
1346 }
1347
1348 #[allow(clippy::result_large_err)]
1352 fn get_secret_for_manifest(
1353 &self,
1354 manifest: &Manifest,
1355 secret_key: &SecretKey,
1356 ) -> Result<Secret, MountError> {
1357 let public_key = secret_key.public();
1358 let share = manifest
1359 .get_share(&public_key)
1360 .ok_or(MountError::ShareNotFound)?;
1361
1362 match share.role() {
1363 PrincipalRole::Owner => {
1364 let secret_share = share.share().ok_or(MountError::ShareNotFound)?;
1365 Ok(secret_share.recover(secret_key)?)
1366 }
1367 PrincipalRole::Mirror => manifest
1368 .public()
1369 .cloned()
1370 .ok_or(MountError::MirrorCannotMount),
1371 }
1372 }
1373
1374 pub async fn find_common_ancestor(
1388 &self,
1389 other: &Mount,
1390 blobs: &BlobsStore,
1391 ) -> Result<Option<Link>, MountError> {
1392 let mut self_chain: std::collections::HashSet<Link> = std::collections::HashSet::new();
1394
1395 let self_link = self.link().await;
1396 let mut link = self_link;
1397
1398 loop {
1399 self_chain.insert(link.clone());
1400 let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1401 match manifest.previous() {
1402 Some(prev) => link = prev.clone(),
1403 None => break,
1404 }
1405 }
1406
1407 let other_link = other.link().await;
1409 let mut link = other_link;
1410
1411 loop {
1412 if self_chain.contains(&link) {
1413 return Ok(Some(link));
1414 }
1415 let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1416 match manifest.previous() {
1417 Some(prev) => link = prev.clone(),
1418 None => break,
1419 }
1420 }
1421
1422 Ok(None)
1424 }
1425
1426 async fn apply_resolved_state(&mut self, merged_ops: &PathOpLog) -> Result<(), MountError> {
1432 let resolved_state = merged_ops.resolve_all();
1433
1434 for (path, op) in &resolved_state {
1435 if op.content_link.is_some() {
1436 let abs_path = Path::new("/").join(path);
1438 match self.get(&abs_path).await {
1439 Ok(_) => {
1440 }
1442 Err(MountError::PathNotFound(_)) => {
1443 tracing::warn!(
1445 "apply_resolved_state: cannot recreate file {} - no secret",
1446 path.display()
1447 );
1448 let mut inner = self.0.lock().await;
1450 inner.ops_log.merge(&PathOpLog::from_operation(op));
1451 }
1452 Err(e) => return Err(e),
1453 }
1454 } else if op.is_dir && matches!(op.op_type, super::path_ops::OpType::Mkdir) {
1455 let abs_path = Path::new("/").join(path);
1457 match self.mkdir(&abs_path).await {
1458 Ok(()) => {}
1459 Err(MountError::PathAlreadyExists(_)) => {}
1460 Err(e) => return Err(e),
1461 }
1462 }
1463 }
1464
1465 Ok(())
1466 }
1467
1468 pub async fn merge_from<R: super::ConflictResolver>(
1487 &mut self,
1488 incoming: &Mount,
1489 resolver: &R,
1490 blobs: &BlobsStore,
1491 ) -> Result<(MergeResult, Link), MountError> {
1492 let ancestor = self.find_common_ancestor(incoming, blobs).await?;
1494
1495 let local_ops = self.collect_ops_since(ancestor.as_ref(), blobs).await?;
1497 let incoming_ops = incoming.collect_ops_since(ancestor.as_ref(), blobs).await?;
1498
1499 let peer_id = {
1501 let inner = self.0.lock().await;
1502 inner.peer_id
1503 };
1504
1505 let mut merged_ops = local_ops.clone();
1507 let merge_result = merged_ops.merge_with_resolver(&incoming_ops, resolver, &peer_id);
1508
1509 self.apply_resolved_state(&merged_ops).await?;
1511
1512 {
1514 let mut inner = self.0.lock().await;
1515 inner.ops_log.merge(&merged_ops);
1516 }
1517
1518 let (link, _, _) = self.save(blobs, false).await?;
1520
1521 Ok((merge_result, link))
1522 }
1523}