// common/mount/mount_inner.rs

1use std::collections::BTreeMap;
2use std::io::Read;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use tokio::sync::Mutex;
7use uuid::Uuid;
8
9use crate::crypto::{PublicKey, Secret, SecretError, SecretKey, SecretShare};
10use crate::linked_data::{BlockEncoded, CodecError, Link};
11use crate::peer::{BlobsStore, BlobsStoreError};
12
13use super::conflict::MergeResult;
14use super::manifest::{Manifest, ManifestError, Share};
15use super::node::{Node, NodeError, NodeLink};
16use super::path_ops::{OpType, PathOpLog};
17use super::pins::Pins;
18use super::principal::PrincipalRole;
19
/// Convert an absolute path into the root-relative form used inside a mount.
///
/// The first path component (the root) is dropped, so `/foo/bar` becomes
/// `foo/bar` and `/` becomes the empty path. Every remaining component is
/// passed through `to_string_lossy`, so invalid UTF-8 is replaced rather than
/// preserved.
///
/// # Panics
///
/// Panics with `"path is not absolute"` when `path` is not absolute.
pub fn clean_path(path: &Path) -> PathBuf {
    assert!(path.is_absolute(), "path is not absolute");

    let mut relative = PathBuf::new();
    for component in path.components().skip(1) {
        let part = component.as_os_str().to_string_lossy().into_owned();
        relative.push(part);
    }
    relative
}
29
30#[derive(Clone)]
31pub struct MountInner {
32    // link to the manifest
33    pub link: Link,
34    // the loaded manifest
35    pub manifest: Manifest,
36    // the loaded, decrypted entry node
37    pub entry: Node,
38    // the loaded pins
39    pub pins: Pins,
40    // convenience pointer to the height of the mount
41    pub height: u64,
42    // the path operations log (CRDT for conflict resolution)
43    pub ops_log: PathOpLog,
44    // the local peer ID (for recording operations)
45    pub peer_id: PublicKey,
46    // the secret key for signing manifests
47    pub secret_key: SecretKey,
48}
49
50impl MountInner {
51    pub fn link(&self) -> &Link {
52        &self.link
53    }
54    pub fn entry(&self) -> &Node {
55        &self.entry
56    }
57    pub fn manifest(&self) -> &Manifest {
58        &self.manifest
59    }
60    pub fn pins(&self) -> &Pins {
61        &self.pins
62    }
63    pub fn height(&self) -> u64 {
64        self.height
65    }
66    pub fn ops_log(&self) -> &PathOpLog {
67        &self.ops_log
68    }
69    pub fn peer_id(&self) -> &PublicKey {
70        &self.peer_id
71    }
72}
73
74#[derive(Clone)]
75pub struct Mount(Arc<Mutex<MountInner>>, BlobsStore);
76
77#[derive(Debug, thiserror::Error)]
78pub enum MountError {
79    #[error("default error: {0}")]
80    Default(#[from] anyhow::Error),
81    #[error("link not found")]
82    LinkNotFound(Link),
83    #[error("path not found: {0}")]
84    PathNotFound(PathBuf),
85    #[error("path is not a node: {0}")]
86    PathNotNode(PathBuf),
87    #[error("path already exists: {0}")]
88    PathAlreadyExists(PathBuf),
89    #[error("cannot move '{from}' to '{to}': destination is inside source")]
90    MoveIntoSelf { from: PathBuf, to: PathBuf },
91    #[error("blobs store error: {0}")]
92    BlobsStore(#[from] BlobsStoreError),
93    #[error("secret error: {0}")]
94    Secret(#[from] SecretError),
95    #[error("node error: {0}")]
96    Node(#[from] NodeError),
97    #[error("codec error: {0}")]
98    Codec(#[from] CodecError),
99    #[error("share error: {0}")]
100    Share(#[from] crate::crypto::SecretShareError),
101    #[error("manifest error: {0}")]
102    Manifest(#[from] ManifestError),
103    #[error("peers share was not found")]
104    ShareNotFound,
105    #[error("mirror cannot mount: bucket is not published")]
106    MirrorCannotMount,
107    #[error("unauthorized: only owners can perform this operation")]
108    Unauthorized,
109}
110
111impl Mount {
112    pub async fn inner(&self) -> MountInner {
113        self.0.lock().await.clone()
114    }
115
116    pub fn blobs(&self) -> BlobsStore {
117        self.1.clone()
118    }
119
120    pub async fn link(&self) -> Link {
121        let inner = self.0.lock().await;
122        inner.link.clone()
123    }
124
125    /// Save the current mount state to the blobs store.
126    ///
127    /// If `publish` is true, the secret will be stored in plaintext, allowing
128    /// mirrors to decrypt the bucket contents.
129    pub async fn save(
130        &self,
131        blobs: &BlobsStore,
132        publish: bool,
133    ) -> Result<(Link, Link, u64), MountError> {
134        // Clone data we need before any async operations
135        let (
136            entry_node,
137            mut pins,
138            previous_link,
139            previous_height,
140            manifest_template,
141            ops_log,
142            secret_key,
143        ) = {
144            let inner = self.0.lock().await;
145            (
146                inner.entry.clone(),
147                inner.pins.clone(),
148                inner.link.clone(),
149                inner.height,
150                inner.manifest.clone(),
151                inner.ops_log.clone(),
152                inner.secret_key.clone(),
153            )
154        };
155
156        // Increment the height of the mount
157        let height = previous_height + 1;
158
159        // Create a new secret for the updated root
160        let secret = Secret::generate();
161
162        // Put the current root node into blobs with the new secret
163        let entry = Self::_put_node_in_blobs(&entry_node, &secret, blobs).await?;
164
165        // Serialize current pins to blobs
166        // put the new root link into the pins, as well as the previous link
167        pins.insert(entry.clone().hash());
168        pins.insert(previous_link.hash());
169
170        // Encrypt and store the ops log if it has any operations
171        let ops_log_link = if !ops_log.is_empty() {
172            let link = Self::_put_ops_log_in_blobs(&ops_log, &secret, blobs).await?;
173            pins.insert(link.hash());
174            Some(link)
175        } else {
176            None
177        };
178
179        let pins_link = Self::_put_pins_in_blobs(&pins, blobs).await?;
180
181        // Re-encrypt owner shares with the new secret (mirrors stay unchanged)
182        let mut manifest = manifest_template;
183        for share in manifest.shares_mut().values_mut() {
184            if *share.role() == PrincipalRole::Owner {
185                let secret_share = SecretShare::new(&secret, &share.principal().identity)?;
186                share.set_share(secret_share);
187            }
188        }
189
190        // Update publish state: publish with new secret, or clear stale public secret
191        if publish {
192            manifest.publish(&secret);
193        } else {
194            // Clear any existing public secret since it would be stale
195            // (encrypted with old secret, not the new one)
196            manifest.unpublish();
197        }
198        manifest.set_pins(pins_link.clone());
199        manifest.set_previous(previous_link.clone());
200        manifest.set_entry(entry.clone());
201        manifest.set_height(height);
202
203        // Clear inherited ops_log from the template, then set if we have new operations
204        // Each version's ops_log is independent and encrypted with that version's secret
205        manifest.clear_ops_log();
206        if let Some(ops_link) = ops_log_link {
207            manifest.set_ops_log(ops_link);
208        }
209
210        // Sign the manifest with the stored secret key
211        manifest.sign(&secret_key)?;
212
213        // Put the updated manifest into blobs to determine the new link
214        let link = Self::_put_manifest_in_blobs(&manifest, blobs).await?;
215
216        // Update the internal state
217        {
218            let mut inner = self.0.lock().await;
219            inner.manifest = manifest;
220            inner.height = height;
221            inner.link = link.clone();
222            // Clear the ops_log - it's now persisted in the manifest
223            // Future operations start a fresh log for the next version
224            // IMPORTANT: Preserve the clock value so future ops have unique timestamps
225            inner.ops_log.clear_preserving_clock();
226        }
227
228        Ok((link, previous_link, height))
229    }
230
231    pub async fn init(
232        id: Uuid,
233        name: String,
234        owner: &SecretKey,
235        blobs: &BlobsStore,
236    ) -> Result<Self, MountError> {
237        // create a new root node for the bucket
238        let entry = Node::default();
239        // create a new secret for the owner
240        let secret = Secret::generate();
241        // put the node in the blobs store for the secret
242        let entry_link = Self::_put_node_in_blobs(&entry, &secret, blobs).await?;
243        // share the secret with the owner
244        let share = SecretShare::new(&secret, &owner.public())?;
245        // Initialize pins with root node hash
246        let mut pins = Pins::new();
247        pins.insert(entry_link.hash());
248        // Put the pins in blobs to get a pins link
249        let pins_link = Self::_put_pins_in_blobs(&pins, blobs).await?;
250        // construct the new manifest
251        let mut manifest = Manifest::new(
252            id,
253            name.clone(),
254            owner.public(),
255            share,
256            entry_link.clone(),
257            pins_link.clone(),
258            0, // initial height is 0
259        );
260        // Sign the manifest with the owner's key
261        manifest.sign(owner)?;
262        let link = Self::_put_manifest_in_blobs(&manifest, blobs).await?;
263
264        // return the new mount
265        Ok(Mount(
266            Arc::new(Mutex::new(MountInner {
267                link,
268                manifest,
269                entry,
270                pins,
271                height: 0,
272                ops_log: PathOpLog::new(),
273                peer_id: owner.public(),
274                secret_key: owner.clone(),
275            })),
276            blobs.clone(),
277        ))
278    }
279
280    pub async fn load(
281        link: &Link,
282        secret_key: &SecretKey,
283        blobs: &BlobsStore,
284    ) -> Result<Self, MountError> {
285        let public_key = &secret_key.public();
286        let manifest = Self::_get_manifest_from_blobs(link, blobs).await?;
287
288        let bucket_share = match manifest.get_share(public_key) {
289            Some(share) => share,
290            None => return Err(MountError::ShareNotFound),
291        };
292
293        // Get the secret based on role
294        let secret = match bucket_share.role() {
295            PrincipalRole::Owner => {
296                // Owners decrypt their individual share
297                let share = bucket_share.share().ok_or(MountError::ShareNotFound)?;
298                share.recover(secret_key)?
299            }
300            PrincipalRole::Mirror => {
301                // Mirrors use the public secret (if bucket is published)
302                manifest
303                    .public()
304                    .cloned()
305                    .ok_or(MountError::MirrorCannotMount)?
306            }
307        };
308
309        let pins = Self::_get_pins_from_blobs(manifest.pins(), blobs).await?;
310        let entry = Self::_get_node_from_blobs(
311            &NodeLink::Dir(manifest.entry().clone(), secret.clone()),
312            blobs,
313        )
314        .await?;
315
316        // Read height from the manifest
317        let height = manifest.height();
318
319        // Load the ops log if it exists, otherwise create a new one
320        let ops_log = if let Some(ops_link) = manifest.ops_log() {
321            let mut log = Self::_get_ops_log_from_blobs(ops_link, &secret, blobs).await?;
322            // Rebuild local clock from operations after deserialization
323            log.rebuild_clock();
324            log
325        } else {
326            PathOpLog::new()
327        };
328
329        Ok(Mount(
330            Arc::new(Mutex::new(MountInner {
331                link: link.clone(),
332                manifest,
333                entry,
334                pins,
335                height,
336                ops_log,
337                peer_id: secret_key.public(),
338                secret_key: secret_key.clone(),
339            })),
340            blobs.clone(),
341        ))
342    }
343
344    /// Load just the manifest from a link without full mount decryption.
345    ///
346    /// This is useful for checking roles/shares before deciding which version to load.
347    /// Returns the manifest if the blob exists, without attempting decryption.
348    pub async fn load_manifest(link: &Link, blobs: &BlobsStore) -> Result<Manifest, MountError> {
349        Self::_get_manifest_from_blobs(link, blobs).await
350    }
351
352    /// Add an owner to this bucket.
353    /// Owners get an encrypted share immediately.
354    pub async fn add_owner(&mut self, peer: PublicKey) -> Result<(), MountError> {
355        let mut inner = self.0.lock().await;
356        let secret_share = SecretShare::new(&Secret::default(), &peer)?;
357        inner
358            .manifest
359            .add_share(Share::new_owner(secret_share, peer));
360        Ok(())
361    }
362
363    /// Add a mirror to this bucket.
364    /// Mirrors can sync bucket data but cannot decrypt until published.
365    pub async fn add_mirror(&mut self, peer: PublicKey) {
366        let mut inner = self.0.lock().await;
367        inner.manifest.add_share(Share::new_mirror(peer));
368    }
369
370    /// Remove a share from the bucket.
371    ///
372    /// Only owners can remove shares. Returns an error if the caller is not an owner
373    /// or if the specified peer is not in the shares.
374    pub async fn remove_share(&self, peer_public_key: PublicKey) -> Result<(), MountError> {
375        let mut inner = self.0.lock().await;
376
377        // Verify caller is an owner
378        let our_key = inner.secret_key.public();
379        let our_share = inner
380            .manifest
381            .get_share(&our_key)
382            .ok_or(MountError::ShareNotFound)?;
383        if *our_share.role() != PrincipalRole::Owner {
384            return Err(MountError::Unauthorized);
385        }
386
387        // Verify the target share exists
388        let key_hex = peer_public_key.to_hex();
389        if inner.manifest.shares_mut().remove(&key_hex).is_none() {
390            return Err(MountError::ShareNotFound);
391        }
392
393        Ok(())
394    }
395
396    /// Check if this bucket is published (mirrors can decrypt).
397    pub async fn is_published(&self) -> bool {
398        let inner = self.0.lock().await;
399        inner.manifest.is_published()
400    }
401
402    /// Save and publish this bucket, granting decryption access to all mirrors.
403    ///
404    /// This is a convenience method equivalent to `save(blobs, true)`.
405    pub async fn publish(&self) -> Result<(Link, Link, u64), MountError> {
406        self.save(&self.1, true).await
407    }
408
409    pub async fn add<R>(&mut self, path: &Path, data: R) -> Result<(), MountError>
410    where
411        R: Read + Send + Sync + 'static + Unpin,
412    {
413        let secret = Secret::generate();
414
415        let encrypted_reader = secret.encrypt_reader(data)?;
416
417        // TODO (amiller68): this is incredibly dumb
418        use bytes::Bytes;
419        use futures::stream;
420        let encrypted_bytes = {
421            let mut buf = Vec::new();
422            let mut reader = encrypted_reader;
423            reader.read_to_end(&mut buf).map_err(SecretError::Io)?;
424            buf
425        };
426
427        let stream = Box::pin(stream::once(async move {
428            Ok::<_, std::io::Error>(Bytes::from(encrypted_bytes))
429        }));
430
431        let hash = self.1.put_stream(stream).await?;
432
433        let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
434
435        let node_link = NodeLink::new_data_from_path(link.clone(), secret, path);
436
437        let root_node = {
438            let inner = self.0.lock().await;
439            inner.entry.clone()
440        };
441
442        let (updated_link, node_hashes) =
443            Self::_set_node_link_at_path(root_node, node_link, path, &self.1).await?;
444
445        // Update entry if needed
446        let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
447            Some(
448                Self::_get_node_from_blobs(
449                    &NodeLink::Dir(new_root_link.clone(), new_secret),
450                    &self.1,
451                )
452                .await?,
453            )
454        } else {
455            None
456        };
457
458        // Update inner state
459        {
460            let mut inner = self.0.lock().await;
461            // Track pins: data blob + all created node hashes
462            inner.pins.insert(hash);
463            inner.pins.extend(node_hashes);
464
465            if let Some(entry) = new_entry {
466                inner.entry = entry;
467            }
468
469            // Record the add operation in the ops log
470            let peer_id = inner.peer_id;
471            inner
472                .ops_log
473                .record(peer_id, OpType::Add, clean_path(path), Some(link), false);
474        }
475
476        Ok(())
477    }
478
479    pub async fn rm(&mut self, path: &Path) -> Result<(), MountError> {
480        let path = clean_path(path);
481        let parent_path = path
482            .parent()
483            .ok_or_else(|| MountError::Default(anyhow::anyhow!("Cannot remove root")))?;
484
485        let entry = {
486            let inner = self.0.lock().await;
487            inner.entry.clone()
488        };
489
490        let mut parent_node = if parent_path == Path::new("") {
491            entry.clone()
492        } else {
493            Self::_get_node_at_path(&entry, parent_path, &self.1).await?
494        };
495
496        let file_name = path.file_name().unwrap().to_string_lossy().to_string();
497
498        // Get the node link before deleting to check if it's a directory
499        let removed_link = parent_node.del(&file_name);
500        if removed_link.is_none() {
501            return Err(MountError::PathNotFound(path.to_path_buf()));
502        }
503        let is_dir = removed_link.map(|l| l.is_dir()).unwrap_or(false);
504
505        // Store path for ops log before we lose ownership
506        let removed_path = path.to_path_buf();
507
508        if parent_path == Path::new("") {
509            let secret = Secret::generate();
510            let link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
511
512            let mut inner = self.0.lock().await;
513            // Track the new root node hash
514            inner.pins.insert(link.hash());
515            inner.entry = parent_node;
516        } else {
517            // Save the modified parent node to blobs
518            let secret = Secret::generate();
519            let parent_link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
520            let node_link = NodeLink::new_dir(parent_link.clone(), secret);
521
522            // Convert parent_path back to absolute for _set_node_link_at_path
523            let abs_parent_path = Path::new("/").join(parent_path);
524            let (updated_link, node_hashes) =
525                Self::_set_node_link_at_path(entry, node_link, &abs_parent_path, &self.1).await?;
526
527            let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
528                Some(
529                    Self::_get_node_from_blobs(
530                        &NodeLink::Dir(new_root_link.clone(), new_secret),
531                        &self.1,
532                    )
533                    .await?,
534                )
535            } else {
536                None
537            };
538
539            let mut inner = self.0.lock().await;
540            // Track the parent node hash and all created node hashes
541            inner.pins.insert(parent_link.hash());
542            inner.pins.extend(node_hashes);
543
544            if let Some(new_entry) = new_entry {
545                inner.entry = new_entry;
546            }
547        }
548
549        // Record the remove operation in the ops log
550        {
551            let mut inner = self.0.lock().await;
552            let peer_id = inner.peer_id;
553            inner
554                .ops_log
555                .record(peer_id, OpType::Remove, removed_path, None, is_dir);
556        }
557
558        Ok(())
559    }
560
561    pub async fn mkdir(&mut self, path: &Path) -> Result<(), MountError> {
562        let path = clean_path(path);
563
564        // Check if the path already exists
565        let entry = {
566            let inner = self.0.lock().await;
567            inner.entry.clone()
568        };
569
570        // Try to get the parent path and file name
571        let (parent_path, dir_name) = if let Some(parent) = path.parent() {
572            (
573                parent,
574                path.file_name().unwrap().to_string_lossy().to_string(),
575            )
576        } else {
577            return Err(MountError::Default(anyhow::anyhow!("Cannot mkdir at root")));
578        };
579
580        // Get the parent node (or use root if parent is empty)
581        let parent_node = if parent_path == Path::new("") {
582            entry.clone()
583        } else {
584            // Check if parent path exists, if not it will be created by _set_node_link_at_path
585            match Self::_get_node_at_path(&entry, parent_path, &self.1).await {
586                Ok(node) => node,
587                Err(MountError::PathNotFound(_)) => Node::default(), // Will be created
588                Err(err) => return Err(err),
589            }
590        };
591
592        // Check if a node with this name already exists in the parent
593        if parent_node.get_link(&dir_name).is_some() {
594            return Err(MountError::PathAlreadyExists(Path::new("/").join(&path)));
595        }
596
597        // Create an empty directory node
598        let new_dir_node = Node::default();
599
600        // Generate a secret for the new directory
601        let secret = Secret::generate();
602
603        // Store the node in blobs
604        let dir_link = Self::_put_node_in_blobs(&new_dir_node, &secret, &self.1).await?;
605
606        // Create a NodeLink for the directory
607        let node_link = NodeLink::new_dir(dir_link.clone(), secret);
608
609        // Convert path back to absolute for _set_node_link_at_path
610        let abs_path = Path::new("/").join(&path);
611
612        // Use _set_node_link_at_path to insert the directory into the tree
613        let (updated_link, node_hashes) =
614            Self::_set_node_link_at_path(entry, node_link, &abs_path, &self.1).await?;
615
616        // Update entry if the root was modified
617        let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
618            Some(
619                Self::_get_node_from_blobs(
620                    &NodeLink::Dir(new_root_link.clone(), new_secret),
621                    &self.1,
622                )
623                .await?,
624            )
625        } else {
626            None
627        };
628
629        // Update inner state
630        {
631            let mut inner = self.0.lock().await;
632            // Track the directory node hash and all created node hashes
633            inner.pins.insert(dir_link.hash());
634            inner.pins.extend(node_hashes);
635
636            if let Some(new_entry) = new_entry {
637                inner.entry = new_entry;
638            }
639
640            // Record the mkdir operation in the ops log
641            let peer_id = inner.peer_id;
642            inner
643                .ops_log
644                .record(peer_id, OpType::Mkdir, path.to_path_buf(), None, true);
645        }
646
647        Ok(())
648    }
649
650    /// Move or rename a file or directory from one path to another.
651    ///
652    /// This operation:
653    /// 1. Validates that the move is legal (destination not inside source)
654    /// 2. Retrieves the node at the source path
655    /// 3. Removes it from the source location
656    /// 4. Inserts it at the destination location
657    ///
658    /// The node's content (files/subdirectories) is not re-encrypted during the move;
659    /// only the tree structure is updated. This makes moves efficient regardless of
660    /// the size of the subtree being moved.
661    ///
662    /// # Errors
663    ///
664    /// - `PathNotFound` - source path doesn't exist
665    /// - `PathAlreadyExists` - destination path already exists
666    /// - `MoveIntoSelf` - attempting to move a directory into itself (e.g., /foo -> /foo/bar)
667    /// - `Default` - attempting to move the root directory
668    pub async fn mv(&mut self, from: &Path, to: &Path) -> Result<(), MountError> {
669        // Convert absolute paths to relative paths for internal operations.
670        // The mount stores paths relative to root, so "/foo/bar" becomes "foo/bar".
671        let from_clean = clean_path(from);
672        let to_clean = clean_path(to);
673
674        // ============================================================
675        // VALIDATION: Prevent moving a directory into itself
676        // ============================================================
677        // This catches cases like:
678        //   - /foo -> /foo (same path, would delete then fail to insert)
679        //   - /foo -> /foo/bar (moving into subdirectory of itself)
680        //
681        // This is impossible in a filesystem sense - you can't put a box inside itself.
682        // We check this early to provide a clear error message and avoid corrupting
683        // the tree structure (the delete would succeed but the insert would fail).
684        if to.starts_with(from) {
685            return Err(MountError::MoveIntoSelf {
686                from: from.to_path_buf(),
687                to: to.to_path_buf(),
688            });
689        }
690
691        // ============================================================
692        // STEP 1: Retrieve the NodeLink at the source path
693        // ============================================================
694        // A NodeLink is a reference to either a file or directory. For directories,
695        // it contains the entire subtree. We'll reuse this same NodeLink at the
696        // destination, which means no re-encryption is needed for the content.
697        let node_link = self.get(from).await?;
698        let is_dir = node_link.is_dir();
699
700        // Store paths for ops log before any mutations
701        let from_path = from_clean.to_path_buf();
702        let to_path = to_clean.to_path_buf();
703
704        // ============================================================
705        // STEP 2: Verify destination doesn't already exist
706        // ============================================================
707        // Unlike Unix mv which can overwrite, we require the destination to be empty.
708        // This prevents accidental data loss.
709        if self.get(to).await.is_ok() {
710            return Err(MountError::PathAlreadyExists(to.to_path_buf()));
711        }
712
713        // ============================================================
714        // STEP 3: Remove the node from its source location
715        // ============================================================
716        // We need to update the parent directory to remove the reference to this node.
717        // This involves:
718        //   a) Finding the parent directory
719        //   b) Removing the child entry from it
720        //   c) Re-encrypting and saving the modified parent
721        //   d) Propagating changes up to the root (updating all ancestor nodes)
722        {
723            // Get the parent path (e.g., "foo" for "foo/bar")
724            let parent_path = from_clean
725                .parent()
726                .ok_or_else(|| MountError::Default(anyhow::anyhow!("Cannot move root")))?;
727
728            // Get the current root entry node
729            let entry = {
730                let inner = self.0.lock().await;
731                inner.entry.clone()
732            };
733
734            // Load the parent node - either the root itself or a subdirectory
735            let mut parent_node = if parent_path == Path::new("") {
736                // Source is at root level (e.g., "/foo"), parent is root
737                entry.clone()
738            } else {
739                // Source is nested, need to traverse to find parent
740                Self::_get_node_at_path(&entry, parent_path, &self.1).await?
741            };
742
743            // Extract the filename component (e.g., "bar" from "foo/bar")
744            let file_name = from_clean
745                .file_name()
746                .expect(
747                    "from_clean has no filename - this should be impossible after parent() check",
748                )
749                .to_string_lossy()
750                .to_string();
751
752            // Remove the child from the parent's children map
753            if parent_node.del(&file_name).is_none() {
754                return Err(MountError::PathNotFound(from_clean.to_path_buf()));
755            }
756
757            // Now we need to persist the modified parent and update the tree
758            if parent_path == Path::new("") {
759                // Parent is root - just update root directly
760                let secret = Secret::generate();
761                let link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
762
763                let mut inner = self.0.lock().await;
764                inner.pins.insert(link.hash());
765                inner.entry = parent_node;
766            } else {
767                // Parent is a subdirectory - need to propagate changes up the tree.
768                // This creates a new encrypted blob for the parent and updates
769                // all ancestor nodes to point to the new parent.
770                let secret = Secret::generate();
771                let parent_link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
772                let new_node_link = NodeLink::new_dir(parent_link.clone(), secret);
773
774                // Update the tree from root down to this parent
775                let abs_parent_path = Path::new("/").join(parent_path);
776                let (updated_root_link, node_hashes) =
777                    Self::_set_node_link_at_path(entry, new_node_link, &abs_parent_path, &self.1)
778                        .await?;
779
780                // Load the new root entry from the updated link.
781                // The root should always be a directory; if it's not, something is
782                // seriously wrong with the mount structure.
783                let new_entry = Self::_get_node_from_blobs(&updated_root_link, &self.1).await?;
784
785                // Update the mount's internal state with the new tree
786                let mut inner = self.0.lock().await;
787                inner.pins.insert(parent_link.hash());
788                inner.pins.extend(node_hashes);
789                inner.entry = new_entry;
790            }
791        }
792
793        // ============================================================
794        // STEP 4: Insert the node at the destination path
795        // ============================================================
796        // We reuse the same NodeLink from step 1. This means the actual file/directory
797        // content doesn't need to be re-encrypted - only the tree structure changes.
798        // This makes moves O(depth) rather than O(size of subtree).
799        let entry = {
800            let inner = self.0.lock().await;
801            inner.entry.clone()
802        };
803
804        let (updated_root_link, node_hashes) =
805            Self::_set_node_link_at_path(entry, node_link, to, &self.1).await?;
806
807        // ============================================================
808        // STEP 5: Update internal state with the final tree
809        // ============================================================
810        {
811            // Load the new root entry and update the mount
812            let new_entry = Self::_get_node_from_blobs(&updated_root_link, &self.1).await?;
813
814            let mut inner = self.0.lock().await;
815            inner.pins.extend(node_hashes);
816            inner.entry = new_entry;
817
818            // ============================================================
819            // STEP 6: Record mv operation in the ops log
820            // ============================================================
821            let peer_id = inner.peer_id;
822            inner.ops_log.record(
823                peer_id,
824                OpType::Mv { from: from_path },
825                to_path,
826                None,
827                is_dir,
828            );
829        }
830
831        Ok(())
832    }
833
834    pub async fn ls(&self, path: &Path) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
835        let mut items = BTreeMap::new();
836        let path = clean_path(path);
837
838        let inner = self.0.lock().await;
839        let root_node = inner.entry.clone();
840        drop(inner);
841
842        let node = if path == Path::new("") {
843            root_node
844        } else {
845            match Self::_get_node_at_path(&root_node, &path, &self.1).await {
846                Ok(node) => node,
847                Err(MountError::LinkNotFound(_)) => {
848                    return Err(MountError::PathNotNode(path.to_path_buf()))
849                }
850                Err(err) => return Err(err),
851            }
852        };
853
854        for (name, link) in node.get_links() {
855            let mut full_path = path.clone();
856            full_path.push(name);
857            items.insert(full_path, link.clone());
858        }
859
860        Ok(items)
861    }
862
863    pub async fn ls_deep(&self, path: &Path) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
864        let base_path = clean_path(path);
865        self._ls_deep(path, &base_path).await
866    }
867
868    async fn _ls_deep(
869        &self,
870        path: &Path,
871        base_path: &Path,
872    ) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
873        let mut all_items = BTreeMap::new();
874
875        // get the initial items at the given path
876        let items = self.ls(path).await?;
877
878        for (item_path, link) in items {
879            // Make path relative to the base_path
880            let relative_path = if base_path == Path::new("") {
881                item_path.clone()
882            } else {
883                item_path
884                    .strip_prefix(base_path)
885                    .unwrap_or(&item_path)
886                    .to_path_buf()
887            };
888            all_items.insert(relative_path.clone(), link.clone());
889
890            if link.is_dir() {
891                // Recurse using the absolute path
892                let abs_item_path = Path::new("/").join(&item_path);
893                let sub_items = Box::pin(self._ls_deep(&abs_item_path, base_path)).await?;
894
895                // Sub items already have correct relative paths from base_path
896                for (sub_path, sub_link) in sub_items {
897                    all_items.insert(sub_path, sub_link);
898                }
899            }
900        }
901
902        Ok(all_items)
903    }
904
905    #[allow(clippy::await_holding_lock)]
906    pub async fn cat(&self, path: &Path) -> Result<Vec<u8>, MountError> {
907        let path = clean_path(path);
908
909        let inner = self.0.lock().await;
910        let root_node = inner.entry.clone();
911        drop(inner);
912
913        let (parent_path, file_name) = if let Some(parent) = path.parent() {
914            (
915                parent,
916                path.file_name().unwrap().to_string_lossy().to_string(),
917            )
918        } else {
919            return Err(MountError::PathNotFound(path.to_path_buf()));
920        };
921
922        let parent_node = if parent_path == Path::new("") {
923            root_node
924        } else {
925            Self::_get_node_at_path(&root_node, parent_path, &self.1).await?
926        };
927
928        let link = parent_node
929            .get_link(&file_name)
930            .ok_or_else(|| MountError::PathNotFound(path.to_path_buf()))?;
931
932        match link {
933            NodeLink::Data(link, secret, _) => {
934                let encrypted_data = self.1.get(&link.hash()).await?;
935                let data = secret.decrypt(&encrypted_data)?;
936                Ok(data)
937            }
938            NodeLink::Dir(_, _) => Err(MountError::PathNotNode(path.to_path_buf())),
939        }
940    }
941
942    /// Get the NodeLink for a file at a given path
943    #[allow(clippy::await_holding_lock)]
944    pub async fn get(&self, path: &Path) -> Result<NodeLink, MountError> {
945        let path = clean_path(path);
946
947        let inner = self.0.lock().await;
948        let root_node = inner.entry.clone();
949        drop(inner);
950
951        let (parent_path, file_name) = if let Some(parent) = path.parent() {
952            (
953                parent,
954                path.file_name().unwrap().to_string_lossy().to_string(),
955            )
956        } else {
957            return Err(MountError::PathNotFound(path.to_path_buf()));
958        };
959
960        let parent_node = if parent_path == Path::new("") {
961            root_node
962        } else {
963            Self::_get_node_at_path(&root_node, parent_path, &self.1).await?
964        };
965
966        parent_node
967            .get_link(&file_name)
968            .cloned()
969            .ok_or_else(|| MountError::PathNotFound(path.to_path_buf()))
970    }
971
972    async fn _get_node_at_path(
973        node: &Node,
974        path: &Path,
975        blobs: &BlobsStore,
976    ) -> Result<Node, MountError> {
977        let mut current_node = node.clone();
978        let mut consumed_path = PathBuf::from("/");
979
980        for part in path.iter() {
981            consumed_path.push(part);
982            let next = part.to_string_lossy().to_string();
983            let next_link = current_node
984                .get_link(&next)
985                .ok_or(MountError::PathNotFound(consumed_path.clone()))?;
986            current_node = Self::_get_node_from_blobs(next_link, blobs).await?
987        }
988        Ok(current_node)
989    }
990
991    pub async fn _set_node_link_at_path(
992        node: Node,
993        node_link: NodeLink,
994        path: &Path,
995        blobs: &BlobsStore,
996    ) -> Result<(NodeLink, Vec<crate::linked_data::Hash>), MountError> {
997        let path = clean_path(path);
998        let mut visited_nodes = Vec::new();
999        let mut name = path.file_name().unwrap().to_string_lossy().to_string();
1000        let parent_path = path.parent().unwrap_or(Path::new(""));
1001
1002        let mut consumed_path = PathBuf::from("/");
1003        let mut node = node;
1004        visited_nodes.push((consumed_path.clone(), node.clone()));
1005
1006        for part in parent_path.iter() {
1007            let next = part.to_string_lossy().to_string();
1008            let next_link = node.get_link(&next);
1009            if let Some(next_link) = next_link {
1010                consumed_path.push(part);
1011                match next_link {
1012                    NodeLink::Dir(..) => {
1013                        node = Self::_get_node_from_blobs(next_link, blobs).await?
1014                    }
1015                    NodeLink::Data(..) => {
1016                        return Err(MountError::PathNotNode(consumed_path.clone()));
1017                    }
1018                }
1019                visited_nodes.push((consumed_path.clone(), node.clone()));
1020            } else {
1021                // Create a new directory node
1022                node = Node::default();
1023                consumed_path.push(part);
1024                visited_nodes.push((consumed_path.clone(), node.clone()));
1025            }
1026        }
1027
1028        let mut node_link = node_link;
1029        let mut created_hashes = Vec::new();
1030        for (path, mut node) in visited_nodes.into_iter().rev() {
1031            node.insert(name, node_link.clone());
1032            let secret = Secret::generate();
1033            let link = Self::_put_node_in_blobs(&node, &secret, blobs).await?;
1034            created_hashes.push(link.hash());
1035            node_link = NodeLink::Dir(link, secret);
1036            name = path
1037                .file_name()
1038                .unwrap_or_default()
1039                .to_string_lossy()
1040                .to_string();
1041        }
1042
1043        Ok((node_link, created_hashes))
1044    }
1045
1046    async fn _get_manifest_from_blobs(
1047        link: &Link,
1048        blobs: &BlobsStore,
1049    ) -> Result<Manifest, MountError> {
1050        tracing::debug!(
1051            "_get_bucket_from_blobs: Checking for bucket data at link {:?}",
1052            link
1053        );
1054        let hash = link.hash();
1055        tracing::debug!("_get_bucket_from_blobs: Bucket hash: {}", hash);
1056
1057        match blobs.stat(&hash).await {
1058            Ok(true) => {
1059                tracing::debug!(
1060                    "_get_bucket_from_blobs: Bucket hash {} exists in blobs",
1061                    hash
1062                );
1063            }
1064            Ok(false) => {
1065                tracing::error!("_get_bucket_from_blobs: Bucket hash {} NOT FOUND in blobs - LinkNotFound error!", hash);
1066                return Err(MountError::LinkNotFound(link.clone()));
1067            }
1068            Err(e) => {
1069                tracing::error!(
1070                    "_get_bucket_from_blobs: Error checking bucket hash {}: {}",
1071                    hash,
1072                    e
1073                );
1074                return Err(e.into());
1075            }
1076        }
1077
1078        tracing::debug!("_get_bucket_from_blobs: Reading bucket data from blobs");
1079        let data = blobs.get(&hash).await?;
1080        tracing::debug!(
1081            "_get_bucket_from_blobs: Got {} bytes of bucket data",
1082            data.len()
1083        );
1084
1085        let bucket_data = Manifest::decode(&data)?;
1086        tracing::debug!(
1087            "_get_bucket_from_blobs: Successfully decoded BucketData for bucket '{}'",
1088            bucket_data.name()
1089        );
1090
1091        Ok(bucket_data)
1092    }
1093
1094    pub async fn _get_pins_from_blobs(link: &Link, blobs: &BlobsStore) -> Result<Pins, MountError> {
1095        tracing::debug!("_get_pins_from_blobs: Checking for pins at link {:?}", link);
1096        let hash = link.hash();
1097        tracing::debug!("_get_pins_from_blobs: Pins hash: {}", hash);
1098
1099        match blobs.stat(&hash).await {
1100            Ok(true) => {
1101                tracing::debug!("_get_pins_from_blobs: Pins hash {} exists in blobs", hash);
1102            }
1103            Ok(false) => {
1104                tracing::error!(
1105                    "_get_pins_from_blobs: Pins hash {} NOT FOUND in blobs - LinkNotFound error!",
1106                    hash
1107                );
1108                return Err(MountError::LinkNotFound(link.clone()));
1109            }
1110            Err(e) => {
1111                tracing::error!(
1112                    "_get_pins_from_blobs: Error checking pins hash {}: {}",
1113                    hash,
1114                    e
1115                );
1116                return Err(e.into());
1117            }
1118        }
1119
1120        tracing::debug!("_get_pins_from_blobs: Reading hash list from blobs");
1121        // Read hashes from the hash list blob
1122        let hashes = blobs.read_hash_list(hash).await?;
1123        tracing::debug!(
1124            "_get_pins_from_blobs: Successfully read {} hashes from pinset",
1125            hashes.len()
1126        );
1127
1128        Ok(Pins::from_vec(hashes))
1129    }
1130
1131    async fn _get_node_from_blobs(
1132        node_link: &NodeLink,
1133        blobs: &BlobsStore,
1134    ) -> Result<Node, MountError> {
1135        let link = node_link.link();
1136        let secret = node_link.secret();
1137        let hash = link.hash();
1138
1139        tracing::debug!("_get_node_from_blobs: Checking for node at hash {}", hash);
1140
1141        match blobs.stat(&hash).await {
1142            Ok(true) => {
1143                tracing::debug!("_get_node_from_blobs: Node hash {} exists in blobs", hash);
1144            }
1145            Ok(false) => {
1146                tracing::error!(
1147                    "_get_node_from_blobs: Node hash {} NOT FOUND in blobs - LinkNotFound error!",
1148                    hash
1149                );
1150                return Err(MountError::LinkNotFound(link.clone()));
1151            }
1152            Err(e) => {
1153                tracing::error!(
1154                    "_get_node_from_blobs: Error checking node hash {}: {}",
1155                    hash,
1156                    e
1157                );
1158                return Err(e.into());
1159            }
1160        }
1161
1162        tracing::debug!("_get_node_from_blobs: Reading encrypted node blob");
1163        let blob = blobs.get(&hash).await?;
1164        tracing::debug!(
1165            "_get_node_from_blobs: Got {} bytes of encrypted node data",
1166            blob.len()
1167        );
1168
1169        tracing::debug!("_get_node_from_blobs: Decrypting node data");
1170        let data = secret.decrypt(&blob)?;
1171        tracing::debug!("_get_node_from_blobs: Decrypted {} bytes", data.len());
1172
1173        let node = Node::decode(&data)?;
1174        tracing::debug!("_get_node_from_blobs: Successfully decoded Node");
1175
1176        Ok(node)
1177    }
1178
1179    // TODO (amiller68): you should inline a Link
1180    //  into the node when we store encrypt it,
1181    //  so that we have an integrity check
1182    async fn _put_node_in_blobs(
1183        node: &Node,
1184        secret: &Secret,
1185        blobs: &BlobsStore,
1186    ) -> Result<Link, MountError> {
1187        let _data = node.encode()?;
1188        let data = secret.encrypt(&_data)?;
1189        let hash = blobs.put(data).await?;
1190        // NOTE (amiller68): nodes are always stored as raw
1191        //  since they are encrypted blobs
1192        let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1193        Ok(link)
1194    }
1195
1196    pub async fn _put_manifest_in_blobs(
1197        bucket_data: &Manifest,
1198        blobs: &BlobsStore,
1199    ) -> Result<Link, MountError> {
1200        let data = bucket_data.encode()?;
1201        let hash = blobs.put(data).await?;
1202        // NOTE (amiller68): buckets are unencrypted, so they can inherit
1203        //  the codec of the bucket itself (which is currently always cbor)
1204        let link = Link::new(bucket_data.codec(), hash);
1205        Ok(link)
1206    }
1207
1208    pub async fn _put_pins_in_blobs(pins: &Pins, blobs: &BlobsStore) -> Result<Link, MountError> {
1209        // Create a hash list blob from the pins (raw bytes: 32 bytes per hash, concatenated)
1210        let hash = blobs.create_hash_list(pins.iter().copied()).await?;
1211        // Pins are stored as raw blobs containing concatenated hashes
1212        // Note: The underlying blob is HashSeq format, but Link doesn't track this
1213        let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1214        Ok(link)
1215    }
1216
1217    async fn _get_ops_log_from_blobs(
1218        link: &Link,
1219        secret: &Secret,
1220        blobs: &BlobsStore,
1221    ) -> Result<PathOpLog, MountError> {
1222        let hash = link.hash();
1223        tracing::debug!(
1224            "_get_ops_log_from_blobs: Checking for ops log at hash {}",
1225            hash
1226        );
1227
1228        match blobs.stat(&hash).await {
1229            Ok(true) => {
1230                tracing::debug!(
1231                    "_get_ops_log_from_blobs: Ops log hash {} exists in blobs",
1232                    hash
1233                );
1234            }
1235            Ok(false) => {
1236                tracing::error!(
1237                    "_get_ops_log_from_blobs: Ops log hash {} NOT FOUND in blobs - LinkNotFound error!",
1238                    hash
1239                );
1240                return Err(MountError::LinkNotFound(link.clone()));
1241            }
1242            Err(e) => {
1243                tracing::error!(
1244                    "_get_ops_log_from_blobs: Error checking ops log hash {}: {}",
1245                    hash,
1246                    e
1247                );
1248                return Err(e.into());
1249            }
1250        }
1251
1252        tracing::debug!("_get_ops_log_from_blobs: Reading encrypted ops log blob");
1253        let blob = blobs.get(&hash).await?;
1254        tracing::debug!(
1255            "_get_ops_log_from_blobs: Got {} bytes of encrypted ops log data",
1256            blob.len()
1257        );
1258
1259        tracing::debug!("_get_ops_log_from_blobs: Decrypting ops log data");
1260        let data = secret.decrypt(&blob)?;
1261        tracing::debug!("_get_ops_log_from_blobs: Decrypted {} bytes", data.len());
1262
1263        let ops_log = PathOpLog::decode(&data)?;
1264        tracing::debug!(
1265            "_get_ops_log_from_blobs: Successfully decoded PathOpLog with {} operations",
1266            ops_log.len()
1267        );
1268
1269        Ok(ops_log)
1270    }
1271
1272    async fn _put_ops_log_in_blobs(
1273        ops_log: &PathOpLog,
1274        secret: &Secret,
1275        blobs: &BlobsStore,
1276    ) -> Result<Link, MountError> {
1277        let _data = ops_log.encode()?;
1278        let data = secret.encrypt(&_data)?;
1279        let hash = blobs.put(data).await?;
1280        // Ops log is stored as an encrypted raw blob
1281        let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1282        tracing::debug!(
1283            "_put_ops_log_in_blobs: Stored ops log with {} operations at hash {}",
1284            ops_log.len(),
1285            hash
1286        );
1287        Ok(link)
1288    }
1289
1290    /// Collect all ops from manifest chain back to (but not including) ancestor_link.
1291    ///
1292    /// Traverses the manifest chain starting from the current version, collecting
1293    /// ops_logs from each manifest until we reach the ancestor (or genesis).
1294    /// The collected ops are merged in chronological order.
1295    ///
1296    /// # Arguments
1297    ///
1298    /// * `ancestor_link` - Stop when reaching this link (not included). None means collect all
1299    ///   accessible ops (stops when we can't decrypt a manifest).
1300    /// * `blobs` - The blob store to read manifests from
1301    ///
1302    /// # Returns
1303    ///
1304    /// A combined PathOpLog containing all operations since the ancestor.
1305    pub async fn collect_ops_since(
1306        &self,
1307        ancestor_link: Option<&Link>,
1308        blobs: &BlobsStore,
1309    ) -> Result<PathOpLog, MountError> {
1310        let inner = self.0.lock().await;
1311        let secret_key = inner.secret_key.clone();
1312        let current_link = inner.link.clone();
1313        let current_ops = inner.ops_log.clone();
1314        drop(inner);
1315
1316        let mut all_logs: Vec<PathOpLog> = Vec::new();
1317
1318        // Start with any unsaved operations in the current mount
1319        if !current_ops.is_empty() {
1320            all_logs.push(current_ops);
1321        }
1322
1323        // Walk the chain from current manifest backwards
1324        let mut link = current_link;
1325
1326        loop {
1327            // Check if we've reached the ancestor
1328            if let Some(ancestor) = ancestor_link {
1329                if &link == ancestor {
1330                    break;
1331                }
1332            }
1333
1334            // Load the manifest at this link
1335            let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1336
1337            // Get the secret for this manifest version
1338            // If we can't get the secret (e.g., we weren't a member at this point),
1339            // stop traversing - we can't read older ops anyway
1340            let secret = match self.get_secret_for_manifest(&manifest, &secret_key) {
1341                Ok(s) => s,
1342                Err(MountError::ShareNotFound) => {
1343                    tracing::debug!(
1344                        "collect_ops_since: stopping at link {} - no share for current user",
1345                        link.hash()
1346                    );
1347                    break;
1348                }
1349                Err(e) => return Err(e),
1350            };
1351
1352            // Load the ops_log if present
1353            if let Some(ops_link) = manifest.ops_log() {
1354                let mut ops_log = Self::_get_ops_log_from_blobs(ops_link, &secret, blobs).await?;
1355                ops_log.rebuild_clock();
1356                all_logs.push(ops_log);
1357            }
1358
1359            // Move to previous manifest
1360            match manifest.previous() {
1361                Some(prev) => link = prev.clone(),
1362                None => break, // Reached genesis
1363            }
1364        }
1365
1366        // Merge all logs in chronological order (oldest first, so reverse)
1367        all_logs.reverse();
1368        let mut merged = PathOpLog::new();
1369        for log in all_logs {
1370            merged.merge(&log);
1371        }
1372
1373        Ok(merged)
1374    }
1375
1376    /// Get the decryption secret for a manifest.
1377    ///
1378    /// Decrypts the secret share using the provided secret key.
1379    #[allow(clippy::result_large_err)]
1380    fn get_secret_for_manifest(
1381        &self,
1382        manifest: &Manifest,
1383        secret_key: &SecretKey,
1384    ) -> Result<Secret, MountError> {
1385        let public_key = secret_key.public();
1386        let share = manifest
1387            .get_share(&public_key)
1388            .ok_or(MountError::ShareNotFound)?;
1389
1390        match share.role() {
1391            PrincipalRole::Owner => {
1392                let secret_share = share.share().ok_or(MountError::ShareNotFound)?;
1393                Ok(secret_share.recover(secret_key)?)
1394            }
1395            PrincipalRole::Mirror => manifest
1396                .public()
1397                .cloned()
1398                .ok_or(MountError::MirrorCannotMount),
1399        }
1400    }
1401
1402    /// Find the common ancestor between this mount's chain and another's.
1403    ///
1404    /// Walks both chains backwards via the `previous` links and returns
1405    /// the first link where both chains converge.
1406    ///
1407    /// # Arguments
1408    ///
1409    /// * `other` - The other mount to find common ancestor with
1410    /// * `blobs` - The blob store to read manifests from
1411    ///
1412    /// # Returns
1413    ///
1414    /// The common ancestor link, or None if chains never converge (different buckets).
1415    pub async fn find_common_ancestor(
1416        &self,
1417        other: &Mount,
1418        blobs: &BlobsStore,
1419    ) -> Result<Option<Link>, MountError> {
1420        // Build set of all links in self's chain
1421        let mut self_chain: std::collections::HashSet<Link> = std::collections::HashSet::new();
1422
1423        let self_link = self.link().await;
1424        let mut link = self_link;
1425
1426        loop {
1427            self_chain.insert(link.clone());
1428            let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1429            match manifest.previous() {
1430                Some(prev) => link = prev.clone(),
1431                None => break,
1432            }
1433        }
1434
1435        // Walk other's chain and find first link in self's set
1436        let other_link = other.link().await;
1437        let mut link = other_link;
1438
1439        loop {
1440            if self_chain.contains(&link) {
1441                return Ok(Some(link));
1442            }
1443            let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1444            match manifest.previous() {
1445                Some(prev) => link = prev.clone(),
1446                None => break,
1447            }
1448        }
1449
1450        // No common ancestor found
1451        Ok(None)
1452    }
1453
1454    /// Apply resolved operations to the entry tree.
1455    ///
1456    /// For each operation in the resolved state:
1457    /// - Files with content links are checked for existence (cannot recreate without secret)
1458    /// - Directories are created if they don't exist
1459    async fn apply_resolved_state(&mut self, merged_ops: &PathOpLog) -> Result<(), MountError> {
1460        let resolved_state = merged_ops.resolve_all();
1461
1462        for (path, op) in &resolved_state {
1463            if op.content_link.is_some() {
1464                // File operation - check if it exists in our tree
1465                let abs_path = Path::new("/").join(path);
1466                match self.get(&abs_path).await {
1467                    Ok(_) => {
1468                        // File exists, skip
1469                    }
1470                    Err(MountError::PathNotFound(_)) => {
1471                        // Can't recreate - ops_log stores Link but not decryption secret
1472                        tracing::warn!(
1473                            "apply_resolved_state: cannot recreate file {} - no secret",
1474                            path.display()
1475                        );
1476                        // Record in ops_log so it's tracked
1477                        let mut inner = self.0.lock().await;
1478                        inner.ops_log.merge(&PathOpLog::from_operation(op));
1479                    }
1480                    Err(e) => return Err(e),
1481                }
1482            } else if op.is_dir && matches!(op.op_type, super::path_ops::OpType::Mkdir) {
1483                // Directory operation - create if missing
1484                let abs_path = Path::new("/").join(path);
1485                match self.mkdir(&abs_path).await {
1486                    Ok(()) => {}
1487                    Err(MountError::PathAlreadyExists(_)) => {}
1488                    Err(e) => return Err(e),
1489                }
1490            }
1491        }
1492
1493        Ok(())
1494    }
1495
1496    /// Merge another mount's changes into this one using the given resolver.
1497    ///
1498    /// This method:
1499    /// 1. Finds the common ancestor between the two chains
1500    /// 2. Collects ops from both chains since that ancestor
1501    /// 3. Merges using the resolver
1502    /// 4. Applies the merged state
1503    /// 5. Saves the result as a new version
1504    ///
1505    /// # Arguments
1506    ///
1507    /// * `incoming` - The mount to merge changes from
1508    /// * `resolver` - The conflict resolution strategy to use
1509    /// * `blobs` - The blob store
1510    ///
1511    /// # Returns
1512    ///
1513    /// A MergeResult containing conflict information and the new version link.
1514    pub async fn merge_from<R: super::ConflictResolver>(
1515        &mut self,
1516        incoming: &Mount,
1517        resolver: &R,
1518        blobs: &BlobsStore,
1519    ) -> Result<(MergeResult, Link), MountError> {
1520        // Find the common ancestor
1521        let ancestor = self.find_common_ancestor(incoming, blobs).await?;
1522
1523        // Collect ops from both chains since the ancestor
1524        let local_ops = self.collect_ops_since(ancestor.as_ref(), blobs).await?;
1525        let incoming_ops = incoming.collect_ops_since(ancestor.as_ref(), blobs).await?;
1526
1527        // Get local peer ID for tie-breaking
1528        let peer_id = {
1529            let inner = self.0.lock().await;
1530            inner.peer_id
1531        };
1532
1533        // Merge the operations
1534        let mut merged_ops = local_ops.clone();
1535        let merge_result = merged_ops.merge_with_resolver(&incoming_ops, resolver, &peer_id);
1536
1537        // Apply merged state to the entry tree
1538        self.apply_resolved_state(&merged_ops).await?;
1539
1540        // Merge the ops_log to include all merged operations
1541        {
1542            let mut inner = self.0.lock().await;
1543            inner.ops_log.merge(&merged_ops);
1544        }
1545
1546        // Save the merged state
1547        let (link, _, _) = self.save(blobs, false).await?;
1548
1549        Ok((merge_result, link))
1550    }
1551}