Skip to main content

common/mount/
mount_inner.rs

1use std::collections::BTreeMap;
2use std::io::Read;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use tokio::sync::Mutex;
7use uuid::Uuid;
8
9use crate::crypto::{PublicKey, Secret, SecretError, SecretKey, SecretShare};
10use crate::linked_data::{BlockEncoded, CodecError, Link};
11use crate::peer::{BlobsStore, BlobsStoreError};
12
13use super::conflict::MergeResult;
14use super::manifest::{Manifest, ManifestError, Share};
15use super::node::{Node, NodeError, NodeLink};
16use super::path_ops::{OpType, PathOpLog};
17use super::pins::Pins;
18use super::principal::PrincipalRole;
19
/// Converts an absolute mount path into its root-relative form.
///
/// The leading root component is dropped, so `/foo/bar` becomes `foo/bar` and
/// `/` becomes the empty path. Each remaining component goes through a lossy
/// UTF-8 conversion before being pushed onto the result.
///
/// # Panics
///
/// Panics with `"path is not absolute"` when `path` is not absolute.
pub fn clean_path(path: &Path) -> PathBuf {
    assert!(path.is_absolute(), "path is not absolute");

    let mut relative = PathBuf::new();
    // The first item yielded for an absolute path is the root; skip it.
    for component in path.iter().skip(1) {
        relative.push(component.to_string_lossy().into_owned());
    }
    relative
}
29
30#[derive(Clone)]
31pub struct MountInner {
32    // link to the manifest
33    pub link: Link,
34    // the loaded manifest
35    pub manifest: Manifest,
36    // the loaded, decrypted entry node
37    pub entry: Node,
38    // the loaded pins
39    pub pins: Pins,
40    // convenience pointer to the height of the mount
41    pub height: u64,
42    // the path operations log (CRDT for conflict resolution)
43    pub ops_log: PathOpLog,
44    // the local peer ID (for recording operations)
45    pub peer_id: PublicKey,
46    // the secret key for signing manifests
47    pub secret_key: SecretKey,
48}
49
50impl MountInner {
51    pub fn link(&self) -> &Link {
52        &self.link
53    }
54    pub fn entry(&self) -> &Node {
55        &self.entry
56    }
57    pub fn manifest(&self) -> &Manifest {
58        &self.manifest
59    }
60    pub fn pins(&self) -> &Pins {
61        &self.pins
62    }
63    pub fn height(&self) -> u64 {
64        self.height
65    }
66    pub fn ops_log(&self) -> &PathOpLog {
67        &self.ops_log
68    }
69    pub fn peer_id(&self) -> &PublicKey {
70        &self.peer_id
71    }
72}
73
74#[derive(Clone)]
75pub struct Mount(Arc<Mutex<MountInner>>, BlobsStore);
76
77#[derive(Debug, thiserror::Error)]
78pub enum MountError {
79    #[error("default error: {0}")]
80    Default(#[from] anyhow::Error),
81    #[error("link not found")]
82    LinkNotFound(Link),
83    #[error("path not found: {0}")]
84    PathNotFound(PathBuf),
85    #[error("path is not a node: {0}")]
86    PathNotNode(PathBuf),
87    #[error("path already exists: {0}")]
88    PathAlreadyExists(PathBuf),
89    #[error("cannot move '{from}' to '{to}': destination is inside source")]
90    MoveIntoSelf { from: PathBuf, to: PathBuf },
91    #[error("blobs store error: {0}")]
92    BlobsStore(#[from] BlobsStoreError),
93    #[error("secret error: {0}")]
94    Secret(#[from] SecretError),
95    #[error("node error: {0}")]
96    Node(#[from] NodeError),
97    #[error("codec error: {0}")]
98    Codec(#[from] CodecError),
99    #[error("share error: {0}")]
100    Share(#[from] crate::crypto::SecretShareError),
101    #[error("manifest error: {0}")]
102    Manifest(#[from] ManifestError),
103    #[error("peers share was not found")]
104    ShareNotFound,
105    #[error("mirror cannot mount: bucket is not published")]
106    MirrorCannotMount,
107    #[error("unauthorized: only owners can perform this operation")]
108    Unauthorized,
109}
110
111impl Mount {
112    pub async fn inner(&self) -> MountInner {
113        self.0.lock().await.clone()
114    }
115
116    pub fn blobs(&self) -> BlobsStore {
117        self.1.clone()
118    }
119
120    pub async fn link(&self) -> Link {
121        let inner = self.0.lock().await;
122        inner.link.clone()
123    }
124
125    /// Save the current mount state to the blobs store.
126    ///
127    /// The `publish` parameter controls publish state:
128    /// - `None` — preserve current state (published stays published, unpublished stays unpublished)
129    /// - `Some(true)` — publish (store secret in plaintext so mirrors can decrypt)
130    /// - `Some(false)` — unpublish (clear plaintext secret, revoke mirror access)
131    pub async fn save(
132        &self,
133        blobs: &BlobsStore,
134        publish: Option<bool>,
135    ) -> Result<(Link, Link, u64), MountError> {
136        // Clone data we need before any async operations
137        let (
138            entry_node,
139            mut pins,
140            previous_link,
141            previous_height,
142            manifest_template,
143            ops_log,
144            secret_key,
145        ) = {
146            let inner = self.0.lock().await;
147            (
148                inner.entry.clone(),
149                inner.pins.clone(),
150                inner.link.clone(),
151                inner.height,
152                inner.manifest.clone(),
153                inner.ops_log.clone(),
154                inner.secret_key.clone(),
155            )
156        };
157
158        // Increment the height of the mount
159        let height = previous_height + 1;
160
161        // Create a new secret for the updated root
162        let secret = Secret::generate();
163
164        // Put the current root node into blobs with the new secret
165        let entry = Self::_put_node_in_blobs(&entry_node, &secret, blobs).await?;
166
167        // Serialize current pins to blobs
168        // put the new root link into the pins, as well as the previous link
169        pins.insert(entry.clone().hash());
170        pins.insert(previous_link.hash());
171
172        // Encrypt and store the ops log if it has any operations
173        let ops_log_link = if !ops_log.is_empty() {
174            let link = Self::_put_ops_log_in_blobs(&ops_log, &secret, blobs).await?;
175            pins.insert(link.hash());
176            Some(link)
177        } else {
178            None
179        };
180
181        let pins_link = Self::_put_pins_in_blobs(&pins, blobs).await?;
182
183        // Re-encrypt owner shares with the new secret (mirrors stay unchanged)
184        let mut manifest = manifest_template;
185        for share in manifest.shares_mut().values_mut() {
186            if *share.role() == PrincipalRole::Owner {
187                let secret_share = SecretShare::new(&secret, &share.principal().identity)?;
188                share.set_share(secret_share);
189            }
190        }
191
192        // Apply publish state:
193        // None = preserve current, Some(true) = publish, Some(false) = unpublish
194        let should_publish = publish.unwrap_or(manifest.is_published());
195        if should_publish {
196            manifest.publish(&secret);
197        } else {
198            manifest.unpublish();
199        }
200        manifest.set_pins(pins_link.clone());
201        manifest.set_previous(previous_link.clone());
202        manifest.set_entry(entry.clone());
203        manifest.set_height(height);
204
205        // Clear inherited ops_log from the template, then set if we have new operations
206        // Each version's ops_log is independent and encrypted with that version's secret
207        manifest.clear_ops_log();
208        if let Some(ops_link) = ops_log_link {
209            manifest.set_ops_log(ops_link);
210        }
211
212        // Sign the manifest with the stored secret key
213        manifest.sign(&secret_key)?;
214
215        // Put the updated manifest into blobs to determine the new link
216        let link = Self::_put_manifest_in_blobs(&manifest, blobs).await?;
217
218        // Update the internal state
219        {
220            let mut inner = self.0.lock().await;
221            inner.manifest = manifest;
222            inner.height = height;
223            inner.link = link.clone();
224            // Clear the ops_log - it's now persisted in the manifest
225            // Future operations start a fresh log for the next version
226            // IMPORTANT: Preserve the clock value so future ops have unique timestamps
227            inner.ops_log.clear_preserving_clock();
228        }
229
230        Ok((link, previous_link, height))
231    }
232
233    pub async fn init(
234        id: Uuid,
235        name: String,
236        owner: &SecretKey,
237        blobs: &BlobsStore,
238    ) -> Result<Self, MountError> {
239        // create a new root node for the bucket
240        let entry = Node::default();
241        // create a new secret for the owner
242        let secret = Secret::generate();
243        // put the node in the blobs store for the secret
244        let entry_link = Self::_put_node_in_blobs(&entry, &secret, blobs).await?;
245        // share the secret with the owner
246        let share = SecretShare::new(&secret, &owner.public())?;
247        // Initialize pins with root node hash
248        let mut pins = Pins::new();
249        pins.insert(entry_link.hash());
250        // Put the pins in blobs to get a pins link
251        let pins_link = Self::_put_pins_in_blobs(&pins, blobs).await?;
252        // construct the new manifest
253        let mut manifest = Manifest::new(
254            id,
255            name.clone(),
256            owner.public(),
257            share,
258            entry_link.clone(),
259            pins_link.clone(),
260            0, // initial height is 0
261        );
262        // Sign the manifest with the owner's key
263        manifest.sign(owner)?;
264        let link = Self::_put_manifest_in_blobs(&manifest, blobs).await?;
265
266        // return the new mount
267        Ok(Mount(
268            Arc::new(Mutex::new(MountInner {
269                link,
270                manifest,
271                entry,
272                pins,
273                height: 0,
274                ops_log: PathOpLog::new(),
275                peer_id: owner.public(),
276                secret_key: owner.clone(),
277            })),
278            blobs.clone(),
279        ))
280    }
281
282    pub async fn load(
283        link: &Link,
284        secret_key: &SecretKey,
285        blobs: &BlobsStore,
286    ) -> Result<Self, MountError> {
287        let public_key = &secret_key.public();
288        let manifest = Self::_get_manifest_from_blobs(link, blobs).await?;
289
290        let bucket_share = match manifest.get_share(public_key) {
291            Some(share) => share,
292            None => return Err(MountError::ShareNotFound),
293        };
294
295        // Get the secret based on role
296        let secret = match bucket_share.role() {
297            PrincipalRole::Owner => {
298                // Owners decrypt their individual share
299                let share = bucket_share.share().ok_or(MountError::ShareNotFound)?;
300                share.recover(secret_key)?
301            }
302            PrincipalRole::Mirror => {
303                // Mirrors use the public secret (if bucket is published)
304                manifest
305                    .public()
306                    .cloned()
307                    .ok_or(MountError::MirrorCannotMount)?
308            }
309        };
310
311        let pins = Self::_get_pins_from_blobs(manifest.pins(), blobs).await?;
312        let entry = Self::_get_node_from_blobs(
313            &NodeLink::Dir(manifest.entry().clone(), secret.clone()),
314            blobs,
315        )
316        .await?;
317
318        // Read height from the manifest
319        let height = manifest.height();
320
321        // Load the ops log if it exists, otherwise create a new one
322        let ops_log = if let Some(ops_link) = manifest.ops_log() {
323            let mut log = Self::_get_ops_log_from_blobs(ops_link, &secret, blobs).await?;
324            // Rebuild local clock from operations after deserialization
325            log.rebuild_clock();
326            log
327        } else {
328            PathOpLog::new()
329        };
330
331        Ok(Mount(
332            Arc::new(Mutex::new(MountInner {
333                link: link.clone(),
334                manifest,
335                entry,
336                pins,
337                height,
338                ops_log,
339                peer_id: secret_key.public(),
340                secret_key: secret_key.clone(),
341            })),
342            blobs.clone(),
343        ))
344    }
345
346    /// Load just the manifest from a link without full mount decryption.
347    ///
348    /// This is useful for checking roles/shares before deciding which version to load.
349    /// Returns the manifest if the blob exists, without attempting decryption.
350    pub async fn load_manifest(link: &Link, blobs: &BlobsStore) -> Result<Manifest, MountError> {
351        Self::_get_manifest_from_blobs(link, blobs).await
352    }
353
354    /// Add an owner to this bucket.
355    /// Owners get an encrypted share immediately.
356    pub async fn add_owner(&mut self, peer: PublicKey) -> Result<(), MountError> {
357        let mut inner = self.0.lock().await;
358        let secret_share = SecretShare::new(&Secret::default(), &peer)?;
359        inner
360            .manifest
361            .add_share(Share::new_owner(secret_share, peer));
362        Ok(())
363    }
364
365    /// Add a mirror to this bucket.
366    /// Mirrors can sync bucket data but cannot decrypt until published.
367    pub async fn add_mirror(&mut self, peer: PublicKey) {
368        let mut inner = self.0.lock().await;
369        inner.manifest.add_share(Share::new_mirror(peer));
370    }
371
372    /// Remove a share from the bucket.
373    ///
374    /// Only owners can remove shares. Returns an error if the caller is not an owner
375    /// or if the specified peer is not in the shares.
376    pub async fn remove_share(&self, peer_public_key: PublicKey) -> Result<(), MountError> {
377        let mut inner = self.0.lock().await;
378
379        // Verify caller is an owner
380        let our_key = inner.secret_key.public();
381        let our_share = inner
382            .manifest
383            .get_share(&our_key)
384            .ok_or(MountError::ShareNotFound)?;
385        if *our_share.role() != PrincipalRole::Owner {
386            return Err(MountError::Unauthorized);
387        }
388
389        // Verify the target share exists
390        let key_hex = peer_public_key.to_hex();
391        if inner.manifest.shares_mut().remove(&key_hex).is_none() {
392            return Err(MountError::ShareNotFound);
393        }
394
395        Ok(())
396    }
397
398    /// Check if this bucket is published (mirrors can decrypt).
399    pub async fn is_published(&self) -> bool {
400        let inner = self.0.lock().await;
401        inner.manifest.is_published()
402    }
403
404    /// Publish this bucket, granting decryption access to all mirrors.
405    pub async fn publish(&self) -> Result<(Link, Link, u64), MountError> {
406        self.save(&self.1, Some(true)).await
407    }
408
409    /// Unpublish this bucket, revoking mirror decryption access.
410    pub async fn unpublish(&self) -> Result<(Link, Link, u64), MountError> {
411        self.save(&self.1, Some(false)).await
412    }
413
414    pub async fn add<R>(&mut self, path: &Path, data: R) -> Result<(), MountError>
415    where
416        R: Read + Send + Sync + 'static + Unpin,
417    {
418        let secret = Secret::generate();
419
420        let encrypted_reader = secret.encrypt_reader(data)?;
421
422        // TODO (amiller68): this is incredibly dumb
423        use bytes::Bytes;
424        use futures::stream;
425        let encrypted_bytes = {
426            let mut buf = Vec::new();
427            let mut reader = encrypted_reader;
428            reader.read_to_end(&mut buf).map_err(SecretError::Io)?;
429            buf
430        };
431
432        let stream = Box::pin(stream::once(async move {
433            Ok::<_, std::io::Error>(Bytes::from(encrypted_bytes))
434        }));
435
436        let hash = self.1.put_stream(stream).await?;
437
438        let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
439
440        let node_link = NodeLink::new_data_from_path(link.clone(), secret, path);
441
442        let root_node = {
443            let inner = self.0.lock().await;
444            inner.entry.clone()
445        };
446
447        let (updated_link, node_hashes) =
448            Self::_set_node_link_at_path(root_node, node_link, path, &self.1).await?;
449
450        // Update entry if needed
451        let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
452            Some(
453                Self::_get_node_from_blobs(
454                    &NodeLink::Dir(new_root_link.clone(), new_secret),
455                    &self.1,
456                )
457                .await?,
458            )
459        } else {
460            None
461        };
462
463        // Update inner state
464        {
465            let mut inner = self.0.lock().await;
466            // Track pins: data blob + all created node hashes
467            inner.pins.insert(hash);
468            inner.pins.extend(node_hashes);
469
470            if let Some(entry) = new_entry {
471                inner.entry = entry;
472            }
473
474            // Record the add operation in the ops log
475            let peer_id = inner.peer_id;
476            inner
477                .ops_log
478                .record(peer_id, OpType::Add, clean_path(path), Some(link), false);
479        }
480
481        Ok(())
482    }
483
484    pub async fn rm(&mut self, path: &Path) -> Result<(), MountError> {
485        let path = clean_path(path);
486        let parent_path = path
487            .parent()
488            .ok_or_else(|| MountError::Default(anyhow::anyhow!("Cannot remove root")))?;
489
490        let entry = {
491            let inner = self.0.lock().await;
492            inner.entry.clone()
493        };
494
495        let mut parent_node = if parent_path == Path::new("") {
496            entry.clone()
497        } else {
498            Self::_get_node_at_path(&entry, parent_path, &self.1).await?
499        };
500
501        let file_name = path.file_name().unwrap().to_string_lossy().to_string();
502
503        // Get the node link before deleting to check if it's a directory
504        let removed_link = parent_node.del(&file_name);
505        if removed_link.is_none() {
506            return Err(MountError::PathNotFound(path.to_path_buf()));
507        }
508        let is_dir = removed_link.map(|l| l.is_dir()).unwrap_or(false);
509
510        // Store path for ops log before we lose ownership
511        let removed_path = path.to_path_buf();
512
513        if parent_path == Path::new("") {
514            let secret = Secret::generate();
515            let link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
516
517            let mut inner = self.0.lock().await;
518            // Track the new root node hash
519            inner.pins.insert(link.hash());
520            inner.entry = parent_node;
521        } else {
522            // Save the modified parent node to blobs
523            let secret = Secret::generate();
524            let parent_link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
525            let node_link = NodeLink::new_dir(parent_link.clone(), secret);
526
527            // Convert parent_path back to absolute for _set_node_link_at_path
528            let abs_parent_path = Path::new("/").join(parent_path);
529            let (updated_link, node_hashes) =
530                Self::_set_node_link_at_path(entry, node_link, &abs_parent_path, &self.1).await?;
531
532            let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
533                Some(
534                    Self::_get_node_from_blobs(
535                        &NodeLink::Dir(new_root_link.clone(), new_secret),
536                        &self.1,
537                    )
538                    .await?,
539                )
540            } else {
541                None
542            };
543
544            let mut inner = self.0.lock().await;
545            // Track the parent node hash and all created node hashes
546            inner.pins.insert(parent_link.hash());
547            inner.pins.extend(node_hashes);
548
549            if let Some(new_entry) = new_entry {
550                inner.entry = new_entry;
551            }
552        }
553
554        // Record the remove operation in the ops log
555        {
556            let mut inner = self.0.lock().await;
557            let peer_id = inner.peer_id;
558            inner
559                .ops_log
560                .record(peer_id, OpType::Remove, removed_path, None, is_dir);
561        }
562
563        Ok(())
564    }
565
566    pub async fn mkdir(&mut self, path: &Path) -> Result<(), MountError> {
567        let path = clean_path(path);
568
569        // Check if the path already exists
570        let entry = {
571            let inner = self.0.lock().await;
572            inner.entry.clone()
573        };
574
575        // Try to get the parent path and file name
576        let (parent_path, dir_name) = if let Some(parent) = path.parent() {
577            (
578                parent,
579                path.file_name().unwrap().to_string_lossy().to_string(),
580            )
581        } else {
582            return Err(MountError::Default(anyhow::anyhow!("Cannot mkdir at root")));
583        };
584
585        // Get the parent node (or use root if parent is empty)
586        let parent_node = if parent_path == Path::new("") {
587            entry.clone()
588        } else {
589            // Check if parent path exists, if not it will be created by _set_node_link_at_path
590            match Self::_get_node_at_path(&entry, parent_path, &self.1).await {
591                Ok(node) => node,
592                Err(MountError::PathNotFound(_)) => Node::default(), // Will be created
593                Err(err) => return Err(err),
594            }
595        };
596
597        // Check if a node with this name already exists in the parent
598        if parent_node.get_link(&dir_name).is_some() {
599            return Err(MountError::PathAlreadyExists(Path::new("/").join(&path)));
600        }
601
602        // Create an empty directory node
603        let new_dir_node = Node::default();
604
605        // Generate a secret for the new directory
606        let secret = Secret::generate();
607
608        // Store the node in blobs
609        let dir_link = Self::_put_node_in_blobs(&new_dir_node, &secret, &self.1).await?;
610
611        // Create a NodeLink for the directory
612        let node_link = NodeLink::new_dir(dir_link.clone(), secret);
613
614        // Convert path back to absolute for _set_node_link_at_path
615        let abs_path = Path::new("/").join(&path);
616
617        // Use _set_node_link_at_path to insert the directory into the tree
618        let (updated_link, node_hashes) =
619            Self::_set_node_link_at_path(entry, node_link, &abs_path, &self.1).await?;
620
621        // Update entry if the root was modified
622        let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
623            Some(
624                Self::_get_node_from_blobs(
625                    &NodeLink::Dir(new_root_link.clone(), new_secret),
626                    &self.1,
627                )
628                .await?,
629            )
630        } else {
631            None
632        };
633
634        // Update inner state
635        {
636            let mut inner = self.0.lock().await;
637            // Track the directory node hash and all created node hashes
638            inner.pins.insert(dir_link.hash());
639            inner.pins.extend(node_hashes);
640
641            if let Some(new_entry) = new_entry {
642                inner.entry = new_entry;
643            }
644
645            // Record the mkdir operation in the ops log
646            let peer_id = inner.peer_id;
647            inner
648                .ops_log
649                .record(peer_id, OpType::Mkdir, path.to_path_buf(), None, true);
650        }
651
652        Ok(())
653    }
654
655    /// Move or rename a file or directory from one path to another.
656    ///
657    /// This operation:
658    /// 1. Validates that the move is legal (destination not inside source)
659    /// 2. Retrieves the node at the source path
660    /// 3. Removes it from the source location
661    /// 4. Inserts it at the destination location
662    ///
663    /// The node's content (files/subdirectories) is not re-encrypted during the move;
664    /// only the tree structure is updated. This makes moves efficient regardless of
665    /// the size of the subtree being moved.
666    ///
667    /// # Errors
668    ///
669    /// - `PathNotFound` - source path doesn't exist
670    /// - `PathAlreadyExists` - destination path already exists
671    /// - `MoveIntoSelf` - attempting to move a directory into itself (e.g., /foo -> /foo/bar)
672    /// - `Default` - attempting to move the root directory
673    pub async fn mv(&mut self, from: &Path, to: &Path) -> Result<(), MountError> {
674        // Convert absolute paths to relative paths for internal operations.
675        // The mount stores paths relative to root, so "/foo/bar" becomes "foo/bar".
676        let from_clean = clean_path(from);
677        let to_clean = clean_path(to);
678
679        // ============================================================
680        // VALIDATION: Prevent moving a directory into itself
681        // ============================================================
682        // This catches cases like:
683        //   - /foo -> /foo (same path, would delete then fail to insert)
684        //   - /foo -> /foo/bar (moving into subdirectory of itself)
685        //
686        // This is impossible in a filesystem sense - you can't put a box inside itself.
687        // We check this early to provide a clear error message and avoid corrupting
688        // the tree structure (the delete would succeed but the insert would fail).
689        if to.starts_with(from) {
690            return Err(MountError::MoveIntoSelf {
691                from: from.to_path_buf(),
692                to: to.to_path_buf(),
693            });
694        }
695
696        // ============================================================
697        // STEP 1: Retrieve the NodeLink at the source path
698        // ============================================================
699        // A NodeLink is a reference to either a file or directory. For directories,
700        // it contains the entire subtree. We'll reuse this same NodeLink at the
701        // destination, which means no re-encryption is needed for the content.
702        let node_link = self.get(from).await?;
703        let is_dir = node_link.is_dir();
704
705        // Store paths for ops log before any mutations
706        let from_path = from_clean.to_path_buf();
707        let to_path = to_clean.to_path_buf();
708
709        // ============================================================
710        // STEP 2: Verify destination doesn't already exist
711        // ============================================================
712        // Unlike Unix mv which can overwrite, we require the destination to be empty.
713        // This prevents accidental data loss.
714        if self.get(to).await.is_ok() {
715            return Err(MountError::PathAlreadyExists(to.to_path_buf()));
716        }
717
718        // ============================================================
719        // STEP 3: Remove the node from its source location
720        // ============================================================
721        // We need to update the parent directory to remove the reference to this node.
722        // This involves:
723        //   a) Finding the parent directory
724        //   b) Removing the child entry from it
725        //   c) Re-encrypting and saving the modified parent
726        //   d) Propagating changes up to the root (updating all ancestor nodes)
727        {
728            // Get the parent path (e.g., "foo" for "foo/bar")
729            let parent_path = from_clean
730                .parent()
731                .ok_or_else(|| MountError::Default(anyhow::anyhow!("Cannot move root")))?;
732
733            // Get the current root entry node
734            let entry = {
735                let inner = self.0.lock().await;
736                inner.entry.clone()
737            };
738
739            // Load the parent node - either the root itself or a subdirectory
740            let mut parent_node = if parent_path == Path::new("") {
741                // Source is at root level (e.g., "/foo"), parent is root
742                entry.clone()
743            } else {
744                // Source is nested, need to traverse to find parent
745                Self::_get_node_at_path(&entry, parent_path, &self.1).await?
746            };
747
748            // Extract the filename component (e.g., "bar" from "foo/bar")
749            let file_name = from_clean
750                .file_name()
751                .expect(
752                    "from_clean has no filename - this should be impossible after parent() check",
753                )
754                .to_string_lossy()
755                .to_string();
756
757            // Remove the child from the parent's children map
758            if parent_node.del(&file_name).is_none() {
759                return Err(MountError::PathNotFound(from_clean.to_path_buf()));
760            }
761
762            // Now we need to persist the modified parent and update the tree
763            if parent_path == Path::new("") {
764                // Parent is root - just update root directly
765                let secret = Secret::generate();
766                let link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
767
768                let mut inner = self.0.lock().await;
769                inner.pins.insert(link.hash());
770                inner.entry = parent_node;
771            } else {
772                // Parent is a subdirectory - need to propagate changes up the tree.
773                // This creates a new encrypted blob for the parent and updates
774                // all ancestor nodes to point to the new parent.
775                let secret = Secret::generate();
776                let parent_link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
777                let new_node_link = NodeLink::new_dir(parent_link.clone(), secret);
778
779                // Update the tree from root down to this parent
780                let abs_parent_path = Path::new("/").join(parent_path);
781                let (updated_root_link, node_hashes) =
782                    Self::_set_node_link_at_path(entry, new_node_link, &abs_parent_path, &self.1)
783                        .await?;
784
785                // Load the new root entry from the updated link.
786                // The root should always be a directory; if it's not, something is
787                // seriously wrong with the mount structure.
788                let new_entry = Self::_get_node_from_blobs(&updated_root_link, &self.1).await?;
789
790                // Update the mount's internal state with the new tree
791                let mut inner = self.0.lock().await;
792                inner.pins.insert(parent_link.hash());
793                inner.pins.extend(node_hashes);
794                inner.entry = new_entry;
795            }
796        }
797
798        // ============================================================
799        // STEP 4: Insert the node at the destination path
800        // ============================================================
801        // We reuse the same NodeLink from step 1. This means the actual file/directory
802        // content doesn't need to be re-encrypted - only the tree structure changes.
803        // This makes moves O(depth) rather than O(size of subtree).
804        let entry = {
805            let inner = self.0.lock().await;
806            inner.entry.clone()
807        };
808
809        let (updated_root_link, node_hashes) =
810            Self::_set_node_link_at_path(entry, node_link, to, &self.1).await?;
811
812        // ============================================================
813        // STEP 5: Update internal state with the final tree
814        // ============================================================
815        {
816            // Load the new root entry and update the mount
817            let new_entry = Self::_get_node_from_blobs(&updated_root_link, &self.1).await?;
818
819            let mut inner = self.0.lock().await;
820            inner.pins.extend(node_hashes);
821            inner.entry = new_entry;
822
823            // ============================================================
824            // STEP 6: Record mv operation in the ops log
825            // ============================================================
826            let peer_id = inner.peer_id;
827            inner.ops_log.record(
828                peer_id,
829                OpType::Mv { from: from_path },
830                to_path,
831                None,
832                is_dir,
833            );
834        }
835
836        Ok(())
837    }
838
839    pub async fn ls(&self, path: &Path) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
840        let mut items = BTreeMap::new();
841        let path = clean_path(path);
842
843        let inner = self.0.lock().await;
844        let root_node = inner.entry.clone();
845        drop(inner);
846
847        let node = if path == Path::new("") {
848            root_node
849        } else {
850            match Self::_get_node_at_path(&root_node, &path, &self.1).await {
851                Ok(node) => node,
852                Err(MountError::LinkNotFound(_)) => {
853                    return Err(MountError::PathNotNode(path.to_path_buf()))
854                }
855                Err(err) => return Err(err),
856            }
857        };
858
859        for (name, link) in node.get_links() {
860            let mut full_path = path.clone();
861            full_path.push(name);
862            items.insert(full_path, link.clone());
863        }
864
865        Ok(items)
866    }
867
868    pub async fn ls_deep(&self, path: &Path) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
869        let base_path = clean_path(path);
870        self._ls_deep(path, &base_path).await
871    }
872
873    async fn _ls_deep(
874        &self,
875        path: &Path,
876        base_path: &Path,
877    ) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
878        let mut all_items = BTreeMap::new();
879
880        // get the initial items at the given path
881        let items = self.ls(path).await?;
882
883        for (item_path, link) in items {
884            // Make path relative to the base_path
885            let relative_path = if base_path == Path::new("") {
886                item_path.clone()
887            } else {
888                item_path
889                    .strip_prefix(base_path)
890                    .unwrap_or(&item_path)
891                    .to_path_buf()
892            };
893            all_items.insert(relative_path.clone(), link.clone());
894
895            if link.is_dir() {
896                // Recurse using the absolute path
897                let abs_item_path = Path::new("/").join(&item_path);
898                let sub_items = Box::pin(self._ls_deep(&abs_item_path, base_path)).await?;
899
900                // Sub items already have correct relative paths from base_path
901                for (sub_path, sub_link) in sub_items {
902                    all_items.insert(sub_path, sub_link);
903                }
904            }
905        }
906
907        Ok(all_items)
908    }
909
910    #[allow(clippy::await_holding_lock)]
911    pub async fn cat(&self, path: &Path) -> Result<Vec<u8>, MountError> {
912        let path = clean_path(path);
913
914        let inner = self.0.lock().await;
915        let root_node = inner.entry.clone();
916        drop(inner);
917
918        let (parent_path, file_name) = if let Some(parent) = path.parent() {
919            (
920                parent,
921                path.file_name().unwrap().to_string_lossy().to_string(),
922            )
923        } else {
924            return Err(MountError::PathNotFound(path.to_path_buf()));
925        };
926
927        let parent_node = if parent_path == Path::new("") {
928            root_node
929        } else {
930            Self::_get_node_at_path(&root_node, parent_path, &self.1).await?
931        };
932
933        let link = parent_node
934            .get_link(&file_name)
935            .ok_or_else(|| MountError::PathNotFound(path.to_path_buf()))?;
936
937        match link {
938            NodeLink::Data(link, secret, _) => {
939                let encrypted_data = self.1.get(&link.hash()).await?;
940                let data = secret.decrypt(&encrypted_data)?;
941                Ok(data)
942            }
943            NodeLink::Dir(_, _) => Err(MountError::PathNotNode(path.to_path_buf())),
944        }
945    }
946
947    /// Get the NodeLink for a file at a given path
948    #[allow(clippy::await_holding_lock)]
949    pub async fn get(&self, path: &Path) -> Result<NodeLink, MountError> {
950        let path = clean_path(path);
951
952        let inner = self.0.lock().await;
953        let root_node = inner.entry.clone();
954        drop(inner);
955
956        let (parent_path, file_name) = if let Some(parent) = path.parent() {
957            (
958                parent,
959                path.file_name().unwrap().to_string_lossy().to_string(),
960            )
961        } else {
962            return Err(MountError::PathNotFound(path.to_path_buf()));
963        };
964
965        let parent_node = if parent_path == Path::new("") {
966            root_node
967        } else {
968            Self::_get_node_at_path(&root_node, parent_path, &self.1).await?
969        };
970
971        parent_node
972            .get_link(&file_name)
973            .cloned()
974            .ok_or_else(|| MountError::PathNotFound(path.to_path_buf()))
975    }
976
977    async fn _get_node_at_path(
978        node: &Node,
979        path: &Path,
980        blobs: &BlobsStore,
981    ) -> Result<Node, MountError> {
982        let mut current_node = node.clone();
983        let mut consumed_path = PathBuf::from("/");
984
985        for part in path.iter() {
986            consumed_path.push(part);
987            let next = part.to_string_lossy().to_string();
988            let next_link = current_node
989                .get_link(&next)
990                .ok_or(MountError::PathNotFound(consumed_path.clone()))?;
991            current_node = Self::_get_node_from_blobs(next_link, blobs).await?
992        }
993        Ok(current_node)
994    }
995
996    pub async fn _set_node_link_at_path(
997        node: Node,
998        node_link: NodeLink,
999        path: &Path,
1000        blobs: &BlobsStore,
1001    ) -> Result<(NodeLink, Vec<crate::linked_data::Hash>), MountError> {
1002        let path = clean_path(path);
1003        let mut visited_nodes = Vec::new();
1004        let mut name = path.file_name().unwrap().to_string_lossy().to_string();
1005        let parent_path = path.parent().unwrap_or(Path::new(""));
1006
1007        let mut consumed_path = PathBuf::from("/");
1008        let mut node = node;
1009        visited_nodes.push((consumed_path.clone(), node.clone()));
1010
1011        for part in parent_path.iter() {
1012            let next = part.to_string_lossy().to_string();
1013            let next_link = node.get_link(&next);
1014            if let Some(next_link) = next_link {
1015                consumed_path.push(part);
1016                match next_link {
1017                    NodeLink::Dir(..) => {
1018                        node = Self::_get_node_from_blobs(next_link, blobs).await?
1019                    }
1020                    NodeLink::Data(..) => {
1021                        return Err(MountError::PathNotNode(consumed_path.clone()));
1022                    }
1023                }
1024                visited_nodes.push((consumed_path.clone(), node.clone()));
1025            } else {
1026                // Create a new directory node
1027                node = Node::default();
1028                consumed_path.push(part);
1029                visited_nodes.push((consumed_path.clone(), node.clone()));
1030            }
1031        }
1032
1033        let mut node_link = node_link;
1034        let mut created_hashes = Vec::new();
1035        for (path, mut node) in visited_nodes.into_iter().rev() {
1036            node.insert(name, node_link.clone());
1037            let secret = Secret::generate();
1038            let link = Self::_put_node_in_blobs(&node, &secret, blobs).await?;
1039            created_hashes.push(link.hash());
1040            node_link = NodeLink::Dir(link, secret);
1041            name = path
1042                .file_name()
1043                .unwrap_or_default()
1044                .to_string_lossy()
1045                .to_string();
1046        }
1047
1048        Ok((node_link, created_hashes))
1049    }
1050
1051    async fn _get_manifest_from_blobs(
1052        link: &Link,
1053        blobs: &BlobsStore,
1054    ) -> Result<Manifest, MountError> {
1055        tracing::debug!(
1056            "_get_bucket_from_blobs: Checking for bucket data at link {:?}",
1057            link
1058        );
1059        let hash = link.hash();
1060        tracing::debug!("_get_bucket_from_blobs: Bucket hash: {}", hash);
1061
1062        match blobs.stat(&hash).await {
1063            Ok(true) => {
1064                tracing::debug!(
1065                    "_get_bucket_from_blobs: Bucket hash {} exists in blobs",
1066                    hash
1067                );
1068            }
1069            Ok(false) => {
1070                tracing::error!("_get_bucket_from_blobs: Bucket hash {} NOT FOUND in blobs - LinkNotFound error!", hash);
1071                return Err(MountError::LinkNotFound(link.clone()));
1072            }
1073            Err(e) => {
1074                tracing::error!(
1075                    "_get_bucket_from_blobs: Error checking bucket hash {}: {}",
1076                    hash,
1077                    e
1078                );
1079                return Err(e.into());
1080            }
1081        }
1082
1083        tracing::debug!("_get_bucket_from_blobs: Reading bucket data from blobs");
1084        let data = blobs.get(&hash).await?;
1085        tracing::debug!(
1086            "_get_bucket_from_blobs: Got {} bytes of bucket data",
1087            data.len()
1088        );
1089
1090        let bucket_data = Manifest::decode(&data)?;
1091        tracing::debug!(
1092            "_get_bucket_from_blobs: Successfully decoded BucketData for bucket '{}'",
1093            bucket_data.name()
1094        );
1095
1096        Ok(bucket_data)
1097    }
1098
1099    pub async fn _get_pins_from_blobs(link: &Link, blobs: &BlobsStore) -> Result<Pins, MountError> {
1100        tracing::debug!("_get_pins_from_blobs: Checking for pins at link {:?}", link);
1101        let hash = link.hash();
1102        tracing::debug!("_get_pins_from_blobs: Pins hash: {}", hash);
1103
1104        match blobs.stat(&hash).await {
1105            Ok(true) => {
1106                tracing::debug!("_get_pins_from_blobs: Pins hash {} exists in blobs", hash);
1107            }
1108            Ok(false) => {
1109                tracing::error!(
1110                    "_get_pins_from_blobs: Pins hash {} NOT FOUND in blobs - LinkNotFound error!",
1111                    hash
1112                );
1113                return Err(MountError::LinkNotFound(link.clone()));
1114            }
1115            Err(e) => {
1116                tracing::error!(
1117                    "_get_pins_from_blobs: Error checking pins hash {}: {}",
1118                    hash,
1119                    e
1120                );
1121                return Err(e.into());
1122            }
1123        }
1124
1125        tracing::debug!("_get_pins_from_blobs: Reading hash list from blobs");
1126        // Read hashes from the hash list blob
1127        let hashes = blobs.read_hash_list(hash).await?;
1128        tracing::debug!(
1129            "_get_pins_from_blobs: Successfully read {} hashes from pinset",
1130            hashes.len()
1131        );
1132
1133        Ok(Pins::from_vec(hashes))
1134    }
1135
1136    async fn _get_node_from_blobs(
1137        node_link: &NodeLink,
1138        blobs: &BlobsStore,
1139    ) -> Result<Node, MountError> {
1140        let link = node_link.link();
1141        let secret = node_link.secret();
1142        let hash = link.hash();
1143
1144        tracing::debug!("_get_node_from_blobs: Checking for node at hash {}", hash);
1145
1146        match blobs.stat(&hash).await {
1147            Ok(true) => {
1148                tracing::debug!("_get_node_from_blobs: Node hash {} exists in blobs", hash);
1149            }
1150            Ok(false) => {
1151                tracing::error!(
1152                    "_get_node_from_blobs: Node hash {} NOT FOUND in blobs - LinkNotFound error!",
1153                    hash
1154                );
1155                return Err(MountError::LinkNotFound(link.clone()));
1156            }
1157            Err(e) => {
1158                tracing::error!(
1159                    "_get_node_from_blobs: Error checking node hash {}: {}",
1160                    hash,
1161                    e
1162                );
1163                return Err(e.into());
1164            }
1165        }
1166
1167        tracing::debug!("_get_node_from_blobs: Reading encrypted node blob");
1168        let blob = blobs.get(&hash).await?;
1169        tracing::debug!(
1170            "_get_node_from_blobs: Got {} bytes of encrypted node data",
1171            blob.len()
1172        );
1173
1174        tracing::debug!("_get_node_from_blobs: Decrypting node data");
1175        let data = secret.decrypt(&blob)?;
1176        tracing::debug!("_get_node_from_blobs: Decrypted {} bytes", data.len());
1177
1178        let node = Node::decode(&data)?;
1179        tracing::debug!("_get_node_from_blobs: Successfully decoded Node");
1180
1181        Ok(node)
1182    }
1183
1184    // TODO (amiller68): you should inline a Link
1185    //  into the node when we store encrypt it,
1186    //  so that we have an integrity check
1187    async fn _put_node_in_blobs(
1188        node: &Node,
1189        secret: &Secret,
1190        blobs: &BlobsStore,
1191    ) -> Result<Link, MountError> {
1192        let _data = node.encode()?;
1193        let data = secret.encrypt(&_data)?;
1194        let hash = blobs.put(data).await?;
1195        // NOTE (amiller68): nodes are always stored as raw
1196        //  since they are encrypted blobs
1197        let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1198        Ok(link)
1199    }
1200
1201    pub async fn _put_manifest_in_blobs(
1202        bucket_data: &Manifest,
1203        blobs: &BlobsStore,
1204    ) -> Result<Link, MountError> {
1205        let data = bucket_data.encode()?;
1206        let hash = blobs.put(data).await?;
1207        // NOTE (amiller68): buckets are unencrypted, so they can inherit
1208        //  the codec of the bucket itself (which is currently always cbor)
1209        let link = Link::new(bucket_data.codec(), hash);
1210        Ok(link)
1211    }
1212
1213    pub async fn _put_pins_in_blobs(pins: &Pins, blobs: &BlobsStore) -> Result<Link, MountError> {
1214        // Create a hash list blob from the pins (raw bytes: 32 bytes per hash, concatenated)
1215        let hash = blobs.create_hash_list(pins.iter().copied()).await?;
1216        // Pins are stored as raw blobs containing concatenated hashes
1217        // Note: The underlying blob is HashSeq format, but Link doesn't track this
1218        let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1219        Ok(link)
1220    }
1221
1222    async fn _get_ops_log_from_blobs(
1223        link: &Link,
1224        secret: &Secret,
1225        blobs: &BlobsStore,
1226    ) -> Result<PathOpLog, MountError> {
1227        let hash = link.hash();
1228        tracing::debug!(
1229            "_get_ops_log_from_blobs: Checking for ops log at hash {}",
1230            hash
1231        );
1232
1233        match blobs.stat(&hash).await {
1234            Ok(true) => {
1235                tracing::debug!(
1236                    "_get_ops_log_from_blobs: Ops log hash {} exists in blobs",
1237                    hash
1238                );
1239            }
1240            Ok(false) => {
1241                tracing::error!(
1242                    "_get_ops_log_from_blobs: Ops log hash {} NOT FOUND in blobs - LinkNotFound error!",
1243                    hash
1244                );
1245                return Err(MountError::LinkNotFound(link.clone()));
1246            }
1247            Err(e) => {
1248                tracing::error!(
1249                    "_get_ops_log_from_blobs: Error checking ops log hash {}: {}",
1250                    hash,
1251                    e
1252                );
1253                return Err(e.into());
1254            }
1255        }
1256
1257        tracing::debug!("_get_ops_log_from_blobs: Reading encrypted ops log blob");
1258        let blob = blobs.get(&hash).await?;
1259        tracing::debug!(
1260            "_get_ops_log_from_blobs: Got {} bytes of encrypted ops log data",
1261            blob.len()
1262        );
1263
1264        tracing::debug!("_get_ops_log_from_blobs: Decrypting ops log data");
1265        let data = secret.decrypt(&blob)?;
1266        tracing::debug!("_get_ops_log_from_blobs: Decrypted {} bytes", data.len());
1267
1268        let ops_log = PathOpLog::decode(&data)?;
1269        tracing::debug!(
1270            "_get_ops_log_from_blobs: Successfully decoded PathOpLog with {} operations",
1271            ops_log.len()
1272        );
1273
1274        Ok(ops_log)
1275    }
1276
1277    async fn _put_ops_log_in_blobs(
1278        ops_log: &PathOpLog,
1279        secret: &Secret,
1280        blobs: &BlobsStore,
1281    ) -> Result<Link, MountError> {
1282        let _data = ops_log.encode()?;
1283        let data = secret.encrypt(&_data)?;
1284        let hash = blobs.put(data).await?;
1285        // Ops log is stored as an encrypted raw blob
1286        let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1287        tracing::debug!(
1288            "_put_ops_log_in_blobs: Stored ops log with {} operations at hash {}",
1289            ops_log.len(),
1290            hash
1291        );
1292        Ok(link)
1293    }
1294
1295    /// Collect all ops from manifest chain back to (but not including) ancestor_link.
1296    ///
1297    /// Traverses the manifest chain starting from the current version, collecting
1298    /// ops_logs from each manifest until we reach the ancestor (or genesis).
1299    /// The collected ops are merged in chronological order.
1300    ///
1301    /// # Arguments
1302    ///
1303    /// * `ancestor_link` - Stop when reaching this link (not included). None means collect all
1304    ///   accessible ops (stops when we can't decrypt a manifest).
1305    /// * `blobs` - The blob store to read manifests from
1306    ///
1307    /// # Returns
1308    ///
1309    /// A combined PathOpLog containing all operations since the ancestor.
1310    pub async fn collect_ops_since(
1311        &self,
1312        ancestor_link: Option<&Link>,
1313        blobs: &BlobsStore,
1314    ) -> Result<PathOpLog, MountError> {
1315        let inner = self.0.lock().await;
1316        let secret_key = inner.secret_key.clone();
1317        let current_link = inner.link.clone();
1318        let current_ops = inner.ops_log.clone();
1319        drop(inner);
1320
1321        let mut all_logs: Vec<PathOpLog> = Vec::new();
1322
1323        // Start with any unsaved operations in the current mount
1324        if !current_ops.is_empty() {
1325            all_logs.push(current_ops);
1326        }
1327
1328        // Walk the chain from current manifest backwards
1329        let mut link = current_link;
1330
1331        loop {
1332            // Check if we've reached the ancestor
1333            if let Some(ancestor) = ancestor_link {
1334                if &link == ancestor {
1335                    break;
1336                }
1337            }
1338
1339            // Load the manifest at this link
1340            let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1341
1342            // Get the secret for this manifest version
1343            // If we can't get the secret (e.g., we weren't a member at this point),
1344            // stop traversing - we can't read older ops anyway
1345            let secret = match self.get_secret_for_manifest(&manifest, &secret_key) {
1346                Ok(s) => s,
1347                Err(MountError::ShareNotFound) => {
1348                    tracing::debug!(
1349                        "collect_ops_since: stopping at link {} - no share for current user",
1350                        link.hash()
1351                    );
1352                    break;
1353                }
1354                Err(e) => return Err(e),
1355            };
1356
1357            // Load the ops_log if present
1358            if let Some(ops_link) = manifest.ops_log() {
1359                let mut ops_log = Self::_get_ops_log_from_blobs(ops_link, &secret, blobs).await?;
1360                ops_log.rebuild_clock();
1361                all_logs.push(ops_log);
1362            }
1363
1364            // Move to previous manifest
1365            match manifest.previous() {
1366                Some(prev) => link = prev.clone(),
1367                None => break, // Reached genesis
1368            }
1369        }
1370
1371        // Merge all logs in chronological order (oldest first, so reverse)
1372        all_logs.reverse();
1373        let mut merged = PathOpLog::new();
1374        for log in all_logs {
1375            merged.merge(&log);
1376        }
1377
1378        Ok(merged)
1379    }
1380
1381    /// Get the decryption secret for a manifest.
1382    ///
1383    /// Decrypts the secret share using the provided secret key.
1384    #[allow(clippy::result_large_err)]
1385    fn get_secret_for_manifest(
1386        &self,
1387        manifest: &Manifest,
1388        secret_key: &SecretKey,
1389    ) -> Result<Secret, MountError> {
1390        let public_key = secret_key.public();
1391        let share = manifest
1392            .get_share(&public_key)
1393            .ok_or(MountError::ShareNotFound)?;
1394
1395        match share.role() {
1396            PrincipalRole::Owner => {
1397                let secret_share = share.share().ok_or(MountError::ShareNotFound)?;
1398                Ok(secret_share.recover(secret_key)?)
1399            }
1400            PrincipalRole::Mirror => manifest
1401                .public()
1402                .cloned()
1403                .ok_or(MountError::MirrorCannotMount),
1404        }
1405    }
1406
1407    /// Find the common ancestor between this mount's chain and another's.
1408    ///
1409    /// Walks both chains backwards via the `previous` links and returns
1410    /// the first link where both chains converge.
1411    ///
1412    /// # Arguments
1413    ///
1414    /// * `other` - The other mount to find common ancestor with
1415    /// * `blobs` - The blob store to read manifests from
1416    ///
1417    /// # Returns
1418    ///
1419    /// The common ancestor link, or None if chains never converge (different buckets).
1420    pub async fn find_common_ancestor(
1421        &self,
1422        other: &Mount,
1423        blobs: &BlobsStore,
1424    ) -> Result<Option<Link>, MountError> {
1425        // Build set of all links in self's chain
1426        let mut self_chain: std::collections::HashSet<Link> = std::collections::HashSet::new();
1427
1428        let self_link = self.link().await;
1429        let mut link = self_link;
1430
1431        loop {
1432            self_chain.insert(link.clone());
1433            let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1434            match manifest.previous() {
1435                Some(prev) => link = prev.clone(),
1436                None => break,
1437            }
1438        }
1439
1440        // Walk other's chain and find first link in self's set
1441        let other_link = other.link().await;
1442        let mut link = other_link;
1443
1444        loop {
1445            if self_chain.contains(&link) {
1446                return Ok(Some(link));
1447            }
1448            let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1449            match manifest.previous() {
1450                Some(prev) => link = prev.clone(),
1451                None => break,
1452            }
1453        }
1454
1455        // No common ancestor found
1456        Ok(None)
1457    }
1458
1459    /// Apply resolved operations to the entry tree.
1460    ///
1461    /// For each operation in the resolved state:
1462    /// - Files with content links are checked for existence (cannot recreate without secret)
1463    /// - Directories are created if they don't exist
1464    async fn apply_resolved_state(&mut self, merged_ops: &PathOpLog) -> Result<(), MountError> {
1465        let resolved_state = merged_ops.resolve_all();
1466
1467        for (path, op) in &resolved_state {
1468            if op.content_link.is_some() {
1469                // File operation - check if it exists in our tree
1470                let abs_path = Path::new("/").join(path);
1471                match self.get(&abs_path).await {
1472                    Ok(_) => {
1473                        // File exists, skip
1474                    }
1475                    Err(MountError::PathNotFound(_)) => {
1476                        // Can't recreate - ops_log stores Link but not decryption secret
1477                        tracing::warn!(
1478                            "apply_resolved_state: cannot recreate file {} - no secret",
1479                            path.display()
1480                        );
1481                        // Record in ops_log so it's tracked
1482                        let mut inner = self.0.lock().await;
1483                        inner.ops_log.merge(&PathOpLog::from_operation(op));
1484                    }
1485                    Err(e) => return Err(e),
1486                }
1487            } else if op.is_dir && matches!(op.op_type, super::path_ops::OpType::Mkdir) {
1488                // Directory operation - create if missing
1489                let abs_path = Path::new("/").join(path);
1490                match self.mkdir(&abs_path).await {
1491                    Ok(()) => {}
1492                    Err(MountError::PathAlreadyExists(_)) => {}
1493                    Err(e) => return Err(e),
1494                }
1495            }
1496        }
1497
1498        Ok(())
1499    }
1500
1501    /// Merge another mount's changes into this one using the given resolver.
1502    ///
1503    /// This method:
1504    /// 1. Finds the common ancestor between the two chains
1505    /// 2. Collects ops from both chains since that ancestor
1506    /// 3. Merges using the resolver
1507    /// 4. Applies the merged state
1508    /// 5. Saves the result as a new version
1509    ///
1510    /// # Arguments
1511    ///
1512    /// * `incoming` - The mount to merge changes from
1513    /// * `resolver` - The conflict resolution strategy to use
1514    /// * `blobs` - The blob store
1515    ///
1516    /// # Returns
1517    ///
1518    /// A MergeResult containing conflict information and the new version link.
1519    pub async fn merge_from<R: super::ConflictResolver>(
1520        &mut self,
1521        incoming: &Mount,
1522        resolver: &R,
1523        blobs: &BlobsStore,
1524    ) -> Result<(MergeResult, Link), MountError> {
1525        // Find the common ancestor
1526        let ancestor = self.find_common_ancestor(incoming, blobs).await?;
1527
1528        // Collect ops from both chains since the ancestor
1529        let local_ops = self.collect_ops_since(ancestor.as_ref(), blobs).await?;
1530        let incoming_ops = incoming.collect_ops_since(ancestor.as_ref(), blobs).await?;
1531
1532        // Get local peer ID for tie-breaking
1533        let peer_id = {
1534            let inner = self.0.lock().await;
1535            inner.peer_id
1536        };
1537
1538        // Merge the operations
1539        let mut merged_ops = local_ops.clone();
1540        let merge_result = merged_ops.merge_with_resolver(&incoming_ops, resolver, &peer_id);
1541
1542        // Apply merged state to the entry tree
1543        self.apply_resolved_state(&merged_ops).await?;
1544
1545        // Merge the ops_log to include all merged operations
1546        {
1547            let mut inner = self.0.lock().await;
1548            inner.ops_log.merge(&merged_ops);
1549        }
1550
1551        // Save the merged state
1552        let (link, _, _) = self.save(blobs, None).await?;
1553
1554        Ok((merge_result, link))
1555    }
1556}