Skip to main content

common/mount/
mount_inner.rs

1use std::collections::BTreeMap;
2use std::io::Read;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use tokio::sync::Mutex;
7use uuid::Uuid;
8
9use crate::crypto::{PublicKey, Secret, SecretError, SecretKey, SecretShare};
10use crate::linked_data::{BlockEncoded, CodecError, Link};
11use crate::peer::{BlobsStore, BlobsStoreError};
12
13use super::conflict::MergeResult;
14use super::manifest::{Manifest, ManifestError, Share};
15use super::node::{Node, NodeError, NodeLink};
16use super::path_ops::{OpType, PathOpLog};
17use super::pins::Pins;
18use super::principal::PrincipalRole;
19
/// Convert an absolute path into the root-relative form used inside the mount.
///
/// The first component of `path` is dropped and every remaining component is
/// re-encoded through a lossy UTF-8 conversion, so `/foo/bar` becomes
/// `foo/bar` and `/` becomes the empty path.
///
/// # Panics
///
/// Panics with `"path is not absolute"` when `path` is not absolute.
pub fn clean_path(path: &Path) -> PathBuf {
    assert!(path.is_absolute(), "path is not absolute");

    let mut relative = PathBuf::new();
    // Skip the leading component of the absolute path and re-append the rest.
    for component in path.iter().skip(1) {
        relative.push(component.to_string_lossy().into_owned());
    }
    relative
}
29
/// In-memory state of a mounted bucket.
///
/// A `Mount` keeps one of these behind an `Arc<Mutex<_>>`; the struct is
/// `Clone`, so `Mount::inner` can hand out snapshots of it.
#[derive(Clone)]
pub struct MountInner {
    /// Link to the manifest.
    pub link: Link,
    /// The loaded manifest.
    pub manifest: Manifest,
    /// The loaded, decrypted entry node.
    pub entry: Node,
    /// The loaded pins.
    pub pins: Pins,
    /// Convenience copy of the height of the mount.
    pub height: u64,
    /// The path operations log (CRDT for conflict resolution).
    pub ops_log: PathOpLog,
    /// The local peer ID (for recording operations).
    pub peer_id: PublicKey,
    /// The secret key for signing manifests.
    pub secret_key: SecretKey,
}
49
/// Read-only accessors for the fields of [`MountInner`].
impl MountInner {
    /// Borrow the link to the manifest.
    pub fn link(&self) -> &Link {
        &self.link
    }
    /// Borrow the loaded, decrypted entry node.
    pub fn entry(&self) -> &Node {
        &self.entry
    }
    /// Borrow the loaded manifest.
    pub fn manifest(&self) -> &Manifest {
        &self.manifest
    }
    /// Borrow the loaded pins.
    pub fn pins(&self) -> &Pins {
        &self.pins
    }
    /// Return the height of the mount.
    pub fn height(&self) -> u64 {
        self.height
    }
    /// Borrow the path operations log.
    pub fn ops_log(&self) -> &PathOpLog {
        &self.ops_log
    }
    /// Borrow the local peer ID used when recording operations.
    pub fn peer_id(&self) -> &PublicKey {
        &self.peer_id
    }
}
73
/// Handle to a mounted bucket.
///
/// Field `0` is the mount state behind an `Arc<Mutex<MountInner>>`; field `1`
/// is the blobs store the mount reads from and writes to. Cloning a `Mount`
/// clones the `Arc`, so clones share the same `MountInner`.
#[derive(Clone)]
pub struct Mount(Arc<Mutex<MountInner>>, BlobsStore);
76
/// Errors returned by [`Mount`] operations.
#[derive(Debug, thiserror::Error)]
pub enum MountError {
    /// Catch-all error; also used for ad-hoc failures such as
    /// "Cannot remove root" / "Cannot mkdir at root" / "Cannot move root".
    #[error("default error: {0}")]
    Default(#[from] anyhow::Error),
    /// A link could not be resolved. The link is carried in the variant but
    /// is not included in the display message.
    #[error("link not found")]
    LinkNotFound(Link),
    /// No entry exists at the given path.
    #[error("path not found: {0}")]
    PathNotFound(PathBuf),
    /// The path does not resolve to a node; `ls` reports this when the node
    /// lookup fails with `LinkNotFound`.
    #[error("path is not a node: {0}")]
    PathNotNode(PathBuf),
    /// An entry already exists at the target path (`mkdir`, `mv`).
    #[error("path already exists: {0}")]
    PathAlreadyExists(PathBuf),
    /// `mv` was asked to move a path onto itself or into its own subtree.
    #[error("cannot move '{from}' to '{to}': destination is inside source")]
    MoveIntoSelf { from: PathBuf, to: PathBuf },
    /// Error propagated from the blobs store.
    #[error("blobs store error: {0}")]
    BlobsStore(#[from] BlobsStoreError),
    /// Error propagated from secret handling (encryption / I/O).
    #[error("secret error: {0}")]
    Secret(#[from] SecretError),
    /// Error propagated from node handling.
    #[error("node error: {0}")]
    Node(#[from] NodeError),
    /// Error propagated from the codec.
    #[error("codec error: {0}")]
    Codec(#[from] CodecError),
    /// Error propagated from creating or recovering a secret share.
    #[error("share error: {0}")]
    Share(#[from] crate::crypto::SecretShareError),
    /// Error propagated from manifest handling (e.g. signing).
    #[error("manifest error: {0}")]
    Manifest(#[from] ManifestError),
    /// `load` found no share for the caller's public key, or the owner share
    /// carried no encrypted secret.
    #[error("peers share was not found")]
    ShareNotFound,
    /// `load` was called by a mirror but the manifest has no public secret.
    #[error("mirror cannot mount: bucket is not published")]
    MirrorCannotMount,
}
108
109impl Mount {
110    pub async fn inner(&self) -> MountInner {
111        self.0.lock().await.clone()
112    }
113
    /// Return a clone of the blobs store handle held by this mount.
    pub fn blobs(&self) -> BlobsStore {
        self.1.clone()
    }
117
118    pub async fn link(&self) -> Link {
119        let inner = self.0.lock().await;
120        inner.link.clone()
121    }
122
123    /// Save the current mount state to the blobs store.
124    ///
125    /// If `publish` is true, the secret will be stored in plaintext, allowing
126    /// mirrors to decrypt the bucket contents.
127    pub async fn save(
128        &self,
129        blobs: &BlobsStore,
130        publish: bool,
131    ) -> Result<(Link, Link, u64), MountError> {
132        // Clone data we need before any async operations
133        let (
134            entry_node,
135            mut pins,
136            previous_link,
137            previous_height,
138            manifest_template,
139            ops_log,
140            secret_key,
141        ) = {
142            let inner = self.0.lock().await;
143            (
144                inner.entry.clone(),
145                inner.pins.clone(),
146                inner.link.clone(),
147                inner.height,
148                inner.manifest.clone(),
149                inner.ops_log.clone(),
150                inner.secret_key.clone(),
151            )
152        };
153
154        // Increment the height of the mount
155        let height = previous_height + 1;
156
157        // Create a new secret for the updated root
158        let secret = Secret::generate();
159
160        // Put the current root node into blobs with the new secret
161        let entry = Self::_put_node_in_blobs(&entry_node, &secret, blobs).await?;
162
163        // Serialize current pins to blobs
164        // put the new root link into the pins, as well as the previous link
165        pins.insert(entry.clone().hash());
166        pins.insert(previous_link.hash());
167
168        // Encrypt and store the ops log if it has any operations
169        let ops_log_link = if !ops_log.is_empty() {
170            let link = Self::_put_ops_log_in_blobs(&ops_log, &secret, blobs).await?;
171            pins.insert(link.hash());
172            Some(link)
173        } else {
174            None
175        };
176
177        let pins_link = Self::_put_pins_in_blobs(&pins, blobs).await?;
178
179        // Re-encrypt owner shares with the new secret (mirrors stay unchanged)
180        let mut manifest = manifest_template;
181        for share in manifest.shares_mut().values_mut() {
182            if *share.role() == PrincipalRole::Owner {
183                let secret_share = SecretShare::new(&secret, &share.principal().identity)?;
184                share.set_share(secret_share);
185            }
186        }
187
188        // Update publish state: publish with new secret, or clear stale public secret
189        if publish {
190            manifest.publish(&secret);
191        } else {
192            // Clear any existing public secret since it would be stale
193            // (encrypted with old secret, not the new one)
194            manifest.unpublish();
195        }
196        manifest.set_pins(pins_link.clone());
197        manifest.set_previous(previous_link.clone());
198        manifest.set_entry(entry.clone());
199        manifest.set_height(height);
200
201        // Clear inherited ops_log from the template, then set if we have new operations
202        // Each version's ops_log is independent and encrypted with that version's secret
203        manifest.clear_ops_log();
204        if let Some(ops_link) = ops_log_link {
205            manifest.set_ops_log(ops_link);
206        }
207
208        // Sign the manifest with the stored secret key
209        manifest.sign(&secret_key)?;
210
211        // Put the updated manifest into blobs to determine the new link
212        let link = Self::_put_manifest_in_blobs(&manifest, blobs).await?;
213
214        // Update the internal state
215        {
216            let mut inner = self.0.lock().await;
217            inner.manifest = manifest;
218            inner.height = height;
219            inner.link = link.clone();
220            // Clear the ops_log - it's now persisted in the manifest
221            // Future operations start a fresh log for the next version
222            // IMPORTANT: Preserve the clock value so future ops have unique timestamps
223            inner.ops_log.clear_preserving_clock();
224        }
225
226        Ok((link, previous_link, height))
227    }
228
229    pub async fn init(
230        id: Uuid,
231        name: String,
232        owner: &SecretKey,
233        blobs: &BlobsStore,
234    ) -> Result<Self, MountError> {
235        // create a new root node for the bucket
236        let entry = Node::default();
237        // create a new secret for the owner
238        let secret = Secret::generate();
239        // put the node in the blobs store for the secret
240        let entry_link = Self::_put_node_in_blobs(&entry, &secret, blobs).await?;
241        // share the secret with the owner
242        let share = SecretShare::new(&secret, &owner.public())?;
243        // Initialize pins with root node hash
244        let mut pins = Pins::new();
245        pins.insert(entry_link.hash());
246        // Put the pins in blobs to get a pins link
247        let pins_link = Self::_put_pins_in_blobs(&pins, blobs).await?;
248        // construct the new manifest
249        let mut manifest = Manifest::new(
250            id,
251            name.clone(),
252            owner.public(),
253            share,
254            entry_link.clone(),
255            pins_link.clone(),
256            0, // initial height is 0
257        );
258        // Sign the manifest with the owner's key
259        manifest.sign(owner)?;
260        let link = Self::_put_manifest_in_blobs(&manifest, blobs).await?;
261
262        // return the new mount
263        Ok(Mount(
264            Arc::new(Mutex::new(MountInner {
265                link,
266                manifest,
267                entry,
268                pins,
269                height: 0,
270                ops_log: PathOpLog::new(),
271                peer_id: owner.public(),
272                secret_key: owner.clone(),
273            })),
274            blobs.clone(),
275        ))
276    }
277
278    pub async fn load(
279        link: &Link,
280        secret_key: &SecretKey,
281        blobs: &BlobsStore,
282    ) -> Result<Self, MountError> {
283        let public_key = &secret_key.public();
284        let manifest = Self::_get_manifest_from_blobs(link, blobs).await?;
285
286        let bucket_share = match manifest.get_share(public_key) {
287            Some(share) => share,
288            None => return Err(MountError::ShareNotFound),
289        };
290
291        // Get the secret based on role
292        let secret = match bucket_share.role() {
293            PrincipalRole::Owner => {
294                // Owners decrypt their individual share
295                let share = bucket_share.share().ok_or(MountError::ShareNotFound)?;
296                share.recover(secret_key)?
297            }
298            PrincipalRole::Mirror => {
299                // Mirrors use the public secret (if bucket is published)
300                manifest
301                    .public()
302                    .cloned()
303                    .ok_or(MountError::MirrorCannotMount)?
304            }
305        };
306
307        let pins = Self::_get_pins_from_blobs(manifest.pins(), blobs).await?;
308        let entry = Self::_get_node_from_blobs(
309            &NodeLink::Dir(manifest.entry().clone(), secret.clone()),
310            blobs,
311        )
312        .await?;
313
314        // Read height from the manifest
315        let height = manifest.height();
316
317        // Load the ops log if it exists, otherwise create a new one
318        let ops_log = if let Some(ops_link) = manifest.ops_log() {
319            let mut log = Self::_get_ops_log_from_blobs(ops_link, &secret, blobs).await?;
320            // Rebuild local clock from operations after deserialization
321            log.rebuild_clock();
322            log
323        } else {
324            PathOpLog::new()
325        };
326
327        Ok(Mount(
328            Arc::new(Mutex::new(MountInner {
329                link: link.clone(),
330                manifest,
331                entry,
332                pins,
333                height,
334                ops_log,
335                peer_id: secret_key.public(),
336                secret_key: secret_key.clone(),
337            })),
338            blobs.clone(),
339        ))
340    }
341
    /// Load just the manifest from a link without full mount decryption.
    ///
    /// This is useful for checking roles/shares before deciding which version to load.
    /// No secret key is involved: the call only delegates to
    /// `_get_manifest_from_blobs` and returns whatever that yields, so neither
    /// the entry node, the pins nor the ops log are fetched here.
    pub async fn load_manifest(link: &Link, blobs: &BlobsStore) -> Result<Manifest, MountError> {
        Self::_get_manifest_from_blobs(link, blobs).await
    }
349
350    /// Add an owner to this bucket.
351    /// Owners get an encrypted share immediately.
352    pub async fn add_owner(&mut self, peer: PublicKey) -> Result<(), MountError> {
353        let mut inner = self.0.lock().await;
354        let secret_share = SecretShare::new(&Secret::default(), &peer)?;
355        inner
356            .manifest
357            .add_share(Share::new_owner(secret_share, peer));
358        Ok(())
359    }
360
361    /// Add a mirror to this bucket.
362    /// Mirrors can sync bucket data but cannot decrypt until published.
363    pub async fn add_mirror(&mut self, peer: PublicKey) {
364        let mut inner = self.0.lock().await;
365        inner.manifest.add_share(Share::new_mirror(peer));
366    }
367
368    /// Check if this bucket is published (mirrors can decrypt).
369    pub async fn is_published(&self) -> bool {
370        let inner = self.0.lock().await;
371        inner.manifest.is_published()
372    }
373
    /// Save and publish this bucket, granting decryption access to all mirrors.
    ///
    /// This is a convenience method equivalent to `save(blobs, true)`, using
    /// the blobs store held by this mount. The returned tuple is the one
    /// produced by `save`.
    pub async fn publish(&self) -> Result<(Link, Link, u64), MountError> {
        self.save(&self.1, true).await
    }
380
381    pub async fn add<R>(&mut self, path: &Path, data: R) -> Result<(), MountError>
382    where
383        R: Read + Send + Sync + 'static + Unpin,
384    {
385        let secret = Secret::generate();
386
387        let encrypted_reader = secret.encrypt_reader(data)?;
388
389        // TODO (amiller68): this is incredibly dumb
390        use bytes::Bytes;
391        use futures::stream;
392        let encrypted_bytes = {
393            let mut buf = Vec::new();
394            let mut reader = encrypted_reader;
395            reader.read_to_end(&mut buf).map_err(SecretError::Io)?;
396            buf
397        };
398
399        let stream = Box::pin(stream::once(async move {
400            Ok::<_, std::io::Error>(Bytes::from(encrypted_bytes))
401        }));
402
403        let hash = self.1.put_stream(stream).await?;
404
405        let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
406
407        let node_link = NodeLink::new_data_from_path(link.clone(), secret, path);
408
409        let root_node = {
410            let inner = self.0.lock().await;
411            inner.entry.clone()
412        };
413
414        let (updated_link, node_hashes) =
415            Self::_set_node_link_at_path(root_node, node_link, path, &self.1).await?;
416
417        // Update entry if needed
418        let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
419            Some(
420                Self::_get_node_from_blobs(
421                    &NodeLink::Dir(new_root_link.clone(), new_secret),
422                    &self.1,
423                )
424                .await?,
425            )
426        } else {
427            None
428        };
429
430        // Update inner state
431        {
432            let mut inner = self.0.lock().await;
433            // Track pins: data blob + all created node hashes
434            inner.pins.insert(hash);
435            inner.pins.extend(node_hashes);
436
437            if let Some(entry) = new_entry {
438                inner.entry = entry;
439            }
440
441            // Record the add operation in the ops log
442            let peer_id = inner.peer_id;
443            inner
444                .ops_log
445                .record(peer_id, OpType::Add, clean_path(path), Some(link), false);
446        }
447
448        Ok(())
449    }
450
451    pub async fn rm(&mut self, path: &Path) -> Result<(), MountError> {
452        let path = clean_path(path);
453        let parent_path = path
454            .parent()
455            .ok_or_else(|| MountError::Default(anyhow::anyhow!("Cannot remove root")))?;
456
457        let entry = {
458            let inner = self.0.lock().await;
459            inner.entry.clone()
460        };
461
462        let mut parent_node = if parent_path == Path::new("") {
463            entry.clone()
464        } else {
465            Self::_get_node_at_path(&entry, parent_path, &self.1).await?
466        };
467
468        let file_name = path.file_name().unwrap().to_string_lossy().to_string();
469
470        // Get the node link before deleting to check if it's a directory
471        let removed_link = parent_node.del(&file_name);
472        if removed_link.is_none() {
473            return Err(MountError::PathNotFound(path.to_path_buf()));
474        }
475        let is_dir = removed_link.map(|l| l.is_dir()).unwrap_or(false);
476
477        // Store path for ops log before we lose ownership
478        let removed_path = path.to_path_buf();
479
480        if parent_path == Path::new("") {
481            let secret = Secret::generate();
482            let link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
483
484            let mut inner = self.0.lock().await;
485            // Track the new root node hash
486            inner.pins.insert(link.hash());
487            inner.entry = parent_node;
488        } else {
489            // Save the modified parent node to blobs
490            let secret = Secret::generate();
491            let parent_link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
492            let node_link = NodeLink::new_dir(parent_link.clone(), secret);
493
494            // Convert parent_path back to absolute for _set_node_link_at_path
495            let abs_parent_path = Path::new("/").join(parent_path);
496            let (updated_link, node_hashes) =
497                Self::_set_node_link_at_path(entry, node_link, &abs_parent_path, &self.1).await?;
498
499            let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
500                Some(
501                    Self::_get_node_from_blobs(
502                        &NodeLink::Dir(new_root_link.clone(), new_secret),
503                        &self.1,
504                    )
505                    .await?,
506                )
507            } else {
508                None
509            };
510
511            let mut inner = self.0.lock().await;
512            // Track the parent node hash and all created node hashes
513            inner.pins.insert(parent_link.hash());
514            inner.pins.extend(node_hashes);
515
516            if let Some(new_entry) = new_entry {
517                inner.entry = new_entry;
518            }
519        }
520
521        // Record the remove operation in the ops log
522        {
523            let mut inner = self.0.lock().await;
524            let peer_id = inner.peer_id;
525            inner
526                .ops_log
527                .record(peer_id, OpType::Remove, removed_path, None, is_dir);
528        }
529
530        Ok(())
531    }
532
533    pub async fn mkdir(&mut self, path: &Path) -> Result<(), MountError> {
534        let path = clean_path(path);
535
536        // Check if the path already exists
537        let entry = {
538            let inner = self.0.lock().await;
539            inner.entry.clone()
540        };
541
542        // Try to get the parent path and file name
543        let (parent_path, dir_name) = if let Some(parent) = path.parent() {
544            (
545                parent,
546                path.file_name().unwrap().to_string_lossy().to_string(),
547            )
548        } else {
549            return Err(MountError::Default(anyhow::anyhow!("Cannot mkdir at root")));
550        };
551
552        // Get the parent node (or use root if parent is empty)
553        let parent_node = if parent_path == Path::new("") {
554            entry.clone()
555        } else {
556            // Check if parent path exists, if not it will be created by _set_node_link_at_path
557            match Self::_get_node_at_path(&entry, parent_path, &self.1).await {
558                Ok(node) => node,
559                Err(MountError::PathNotFound(_)) => Node::default(), // Will be created
560                Err(err) => return Err(err),
561            }
562        };
563
564        // Check if a node with this name already exists in the parent
565        if parent_node.get_link(&dir_name).is_some() {
566            return Err(MountError::PathAlreadyExists(Path::new("/").join(&path)));
567        }
568
569        // Create an empty directory node
570        let new_dir_node = Node::default();
571
572        // Generate a secret for the new directory
573        let secret = Secret::generate();
574
575        // Store the node in blobs
576        let dir_link = Self::_put_node_in_blobs(&new_dir_node, &secret, &self.1).await?;
577
578        // Create a NodeLink for the directory
579        let node_link = NodeLink::new_dir(dir_link.clone(), secret);
580
581        // Convert path back to absolute for _set_node_link_at_path
582        let abs_path = Path::new("/").join(&path);
583
584        // Use _set_node_link_at_path to insert the directory into the tree
585        let (updated_link, node_hashes) =
586            Self::_set_node_link_at_path(entry, node_link, &abs_path, &self.1).await?;
587
588        // Update entry if the root was modified
589        let new_entry = if let NodeLink::Dir(new_root_link, new_secret) = updated_link {
590            Some(
591                Self::_get_node_from_blobs(
592                    &NodeLink::Dir(new_root_link.clone(), new_secret),
593                    &self.1,
594                )
595                .await?,
596            )
597        } else {
598            None
599        };
600
601        // Update inner state
602        {
603            let mut inner = self.0.lock().await;
604            // Track the directory node hash and all created node hashes
605            inner.pins.insert(dir_link.hash());
606            inner.pins.extend(node_hashes);
607
608            if let Some(new_entry) = new_entry {
609                inner.entry = new_entry;
610            }
611
612            // Record the mkdir operation in the ops log
613            let peer_id = inner.peer_id;
614            inner
615                .ops_log
616                .record(peer_id, OpType::Mkdir, path.to_path_buf(), None, true);
617        }
618
619        Ok(())
620    }
621
622    /// Move or rename a file or directory from one path to another.
623    ///
624    /// This operation:
625    /// 1. Validates that the move is legal (destination not inside source)
626    /// 2. Retrieves the node at the source path
627    /// 3. Removes it from the source location
628    /// 4. Inserts it at the destination location
629    ///
630    /// The node's content (files/subdirectories) is not re-encrypted during the move;
631    /// only the tree structure is updated. This makes moves efficient regardless of
632    /// the size of the subtree being moved.
633    ///
634    /// # Errors
635    ///
636    /// - `PathNotFound` - source path doesn't exist
637    /// - `PathAlreadyExists` - destination path already exists
638    /// - `MoveIntoSelf` - attempting to move a directory into itself (e.g., /foo -> /foo/bar)
639    /// - `Default` - attempting to move the root directory
640    pub async fn mv(&mut self, from: &Path, to: &Path) -> Result<(), MountError> {
641        // Convert absolute paths to relative paths for internal operations.
642        // The mount stores paths relative to root, so "/foo/bar" becomes "foo/bar".
643        let from_clean = clean_path(from);
644        let to_clean = clean_path(to);
645
646        // ============================================================
647        // VALIDATION: Prevent moving a directory into itself
648        // ============================================================
649        // This catches cases like:
650        //   - /foo -> /foo (same path, would delete then fail to insert)
651        //   - /foo -> /foo/bar (moving into subdirectory of itself)
652        //
653        // This is impossible in a filesystem sense - you can't put a box inside itself.
654        // We check this early to provide a clear error message and avoid corrupting
655        // the tree structure (the delete would succeed but the insert would fail).
656        if to.starts_with(from) {
657            return Err(MountError::MoveIntoSelf {
658                from: from.to_path_buf(),
659                to: to.to_path_buf(),
660            });
661        }
662
663        // ============================================================
664        // STEP 1: Retrieve the NodeLink at the source path
665        // ============================================================
666        // A NodeLink is a reference to either a file or directory. For directories,
667        // it contains the entire subtree. We'll reuse this same NodeLink at the
668        // destination, which means no re-encryption is needed for the content.
669        let node_link = self.get(from).await?;
670        let is_dir = node_link.is_dir();
671
672        // Store paths for ops log before any mutations
673        let from_path = from_clean.to_path_buf();
674        let to_path = to_clean.to_path_buf();
675
676        // ============================================================
677        // STEP 2: Verify destination doesn't already exist
678        // ============================================================
679        // Unlike Unix mv which can overwrite, we require the destination to be empty.
680        // This prevents accidental data loss.
681        if self.get(to).await.is_ok() {
682            return Err(MountError::PathAlreadyExists(to.to_path_buf()));
683        }
684
685        // ============================================================
686        // STEP 3: Remove the node from its source location
687        // ============================================================
688        // We need to update the parent directory to remove the reference to this node.
689        // This involves:
690        //   a) Finding the parent directory
691        //   b) Removing the child entry from it
692        //   c) Re-encrypting and saving the modified parent
693        //   d) Propagating changes up to the root (updating all ancestor nodes)
694        {
695            // Get the parent path (e.g., "foo" for "foo/bar")
696            let parent_path = from_clean
697                .parent()
698                .ok_or_else(|| MountError::Default(anyhow::anyhow!("Cannot move root")))?;
699
700            // Get the current root entry node
701            let entry = {
702                let inner = self.0.lock().await;
703                inner.entry.clone()
704            };
705
706            // Load the parent node - either the root itself or a subdirectory
707            let mut parent_node = if parent_path == Path::new("") {
708                // Source is at root level (e.g., "/foo"), parent is root
709                entry.clone()
710            } else {
711                // Source is nested, need to traverse to find parent
712                Self::_get_node_at_path(&entry, parent_path, &self.1).await?
713            };
714
715            // Extract the filename component (e.g., "bar" from "foo/bar")
716            let file_name = from_clean
717                .file_name()
718                .expect(
719                    "from_clean has no filename - this should be impossible after parent() check",
720                )
721                .to_string_lossy()
722                .to_string();
723
724            // Remove the child from the parent's children map
725            if parent_node.del(&file_name).is_none() {
726                return Err(MountError::PathNotFound(from_clean.to_path_buf()));
727            }
728
729            // Now we need to persist the modified parent and update the tree
730            if parent_path == Path::new("") {
731                // Parent is root - just update root directly
732                let secret = Secret::generate();
733                let link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
734
735                let mut inner = self.0.lock().await;
736                inner.pins.insert(link.hash());
737                inner.entry = parent_node;
738            } else {
739                // Parent is a subdirectory - need to propagate changes up the tree.
740                // This creates a new encrypted blob for the parent and updates
741                // all ancestor nodes to point to the new parent.
742                let secret = Secret::generate();
743                let parent_link = Self::_put_node_in_blobs(&parent_node, &secret, &self.1).await?;
744                let new_node_link = NodeLink::new_dir(parent_link.clone(), secret);
745
746                // Update the tree from root down to this parent
747                let abs_parent_path = Path::new("/").join(parent_path);
748                let (updated_root_link, node_hashes) =
749                    Self::_set_node_link_at_path(entry, new_node_link, &abs_parent_path, &self.1)
750                        .await?;
751
752                // Load the new root entry from the updated link.
753                // The root should always be a directory; if it's not, something is
754                // seriously wrong with the mount structure.
755                let new_entry = Self::_get_node_from_blobs(&updated_root_link, &self.1).await?;
756
757                // Update the mount's internal state with the new tree
758                let mut inner = self.0.lock().await;
759                inner.pins.insert(parent_link.hash());
760                inner.pins.extend(node_hashes);
761                inner.entry = new_entry;
762            }
763        }
764
765        // ============================================================
766        // STEP 4: Insert the node at the destination path
767        // ============================================================
768        // We reuse the same NodeLink from step 1. This means the actual file/directory
769        // content doesn't need to be re-encrypted - only the tree structure changes.
770        // This makes moves O(depth) rather than O(size of subtree).
771        let entry = {
772            let inner = self.0.lock().await;
773            inner.entry.clone()
774        };
775
776        let (updated_root_link, node_hashes) =
777            Self::_set_node_link_at_path(entry, node_link, to, &self.1).await?;
778
779        // ============================================================
780        // STEP 5: Update internal state with the final tree
781        // ============================================================
782        {
783            // Load the new root entry and update the mount
784            let new_entry = Self::_get_node_from_blobs(&updated_root_link, &self.1).await?;
785
786            let mut inner = self.0.lock().await;
787            inner.pins.extend(node_hashes);
788            inner.entry = new_entry;
789
790            // ============================================================
791            // STEP 6: Record mv operation in the ops log
792            // ============================================================
793            let peer_id = inner.peer_id;
794            inner.ops_log.record(
795                peer_id,
796                OpType::Mv { from: from_path },
797                to_path,
798                None,
799                is_dir,
800            );
801        }
802
803        Ok(())
804    }
805
806    pub async fn ls(&self, path: &Path) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
807        let mut items = BTreeMap::new();
808        let path = clean_path(path);
809
810        let inner = self.0.lock().await;
811        let root_node = inner.entry.clone();
812        drop(inner);
813
814        let node = if path == Path::new("") {
815            root_node
816        } else {
817            match Self::_get_node_at_path(&root_node, &path, &self.1).await {
818                Ok(node) => node,
819                Err(MountError::LinkNotFound(_)) => {
820                    return Err(MountError::PathNotNode(path.to_path_buf()))
821                }
822                Err(err) => return Err(err),
823            }
824        };
825
826        for (name, link) in node.get_links() {
827            let mut full_path = path.clone();
828            full_path.push(name);
829            items.insert(full_path, link.clone());
830        }
831
832        Ok(items)
833    }
834
835    pub async fn ls_deep(&self, path: &Path) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
836        let base_path = clean_path(path);
837        self._ls_deep(path, &base_path).await
838    }
839
840    async fn _ls_deep(
841        &self,
842        path: &Path,
843        base_path: &Path,
844    ) -> Result<BTreeMap<PathBuf, NodeLink>, MountError> {
845        let mut all_items = BTreeMap::new();
846
847        // get the initial items at the given path
848        let items = self.ls(path).await?;
849
850        for (item_path, link) in items {
851            // Make path relative to the base_path
852            let relative_path = if base_path == Path::new("") {
853                item_path.clone()
854            } else {
855                item_path
856                    .strip_prefix(base_path)
857                    .unwrap_or(&item_path)
858                    .to_path_buf()
859            };
860            all_items.insert(relative_path.clone(), link.clone());
861
862            if link.is_dir() {
863                // Recurse using the absolute path
864                let abs_item_path = Path::new("/").join(&item_path);
865                let sub_items = Box::pin(self._ls_deep(&abs_item_path, base_path)).await?;
866
867                // Sub items already have correct relative paths from base_path
868                for (sub_path, sub_link) in sub_items {
869                    all_items.insert(sub_path, sub_link);
870                }
871            }
872        }
873
874        Ok(all_items)
875    }
876
877    #[allow(clippy::await_holding_lock)]
878    pub async fn cat(&self, path: &Path) -> Result<Vec<u8>, MountError> {
879        let path = clean_path(path);
880
881        let inner = self.0.lock().await;
882        let root_node = inner.entry.clone();
883        drop(inner);
884
885        let (parent_path, file_name) = if let Some(parent) = path.parent() {
886            (
887                parent,
888                path.file_name().unwrap().to_string_lossy().to_string(),
889            )
890        } else {
891            return Err(MountError::PathNotFound(path.to_path_buf()));
892        };
893
894        let parent_node = if parent_path == Path::new("") {
895            root_node
896        } else {
897            Self::_get_node_at_path(&root_node, parent_path, &self.1).await?
898        };
899
900        let link = parent_node
901            .get_link(&file_name)
902            .ok_or_else(|| MountError::PathNotFound(path.to_path_buf()))?;
903
904        match link {
905            NodeLink::Data(link, secret, _) => {
906                let encrypted_data = self.1.get(&link.hash()).await?;
907                let data = secret.decrypt(&encrypted_data)?;
908                Ok(data)
909            }
910            NodeLink::Dir(_, _) => Err(MountError::PathNotNode(path.to_path_buf())),
911        }
912    }
913
914    /// Get the NodeLink for a file at a given path
915    #[allow(clippy::await_holding_lock)]
916    pub async fn get(&self, path: &Path) -> Result<NodeLink, MountError> {
917        let path = clean_path(path);
918
919        let inner = self.0.lock().await;
920        let root_node = inner.entry.clone();
921        drop(inner);
922
923        let (parent_path, file_name) = if let Some(parent) = path.parent() {
924            (
925                parent,
926                path.file_name().unwrap().to_string_lossy().to_string(),
927            )
928        } else {
929            return Err(MountError::PathNotFound(path.to_path_buf()));
930        };
931
932        let parent_node = if parent_path == Path::new("") {
933            root_node
934        } else {
935            Self::_get_node_at_path(&root_node, parent_path, &self.1).await?
936        };
937
938        parent_node
939            .get_link(&file_name)
940            .cloned()
941            .ok_or_else(|| MountError::PathNotFound(path.to_path_buf()))
942    }
943
944    async fn _get_node_at_path(
945        node: &Node,
946        path: &Path,
947        blobs: &BlobsStore,
948    ) -> Result<Node, MountError> {
949        let mut current_node = node.clone();
950        let mut consumed_path = PathBuf::from("/");
951
952        for part in path.iter() {
953            consumed_path.push(part);
954            let next = part.to_string_lossy().to_string();
955            let next_link = current_node
956                .get_link(&next)
957                .ok_or(MountError::PathNotFound(consumed_path.clone()))?;
958            current_node = Self::_get_node_from_blobs(next_link, blobs).await?
959        }
960        Ok(current_node)
961    }
962
963    pub async fn _set_node_link_at_path(
964        node: Node,
965        node_link: NodeLink,
966        path: &Path,
967        blobs: &BlobsStore,
968    ) -> Result<(NodeLink, Vec<crate::linked_data::Hash>), MountError> {
969        let path = clean_path(path);
970        let mut visited_nodes = Vec::new();
971        let mut name = path.file_name().unwrap().to_string_lossy().to_string();
972        let parent_path = path.parent().unwrap_or(Path::new(""));
973
974        let mut consumed_path = PathBuf::from("/");
975        let mut node = node;
976        visited_nodes.push((consumed_path.clone(), node.clone()));
977
978        for part in parent_path.iter() {
979            let next = part.to_string_lossy().to_string();
980            let next_link = node.get_link(&next);
981            if let Some(next_link) = next_link {
982                consumed_path.push(part);
983                match next_link {
984                    NodeLink::Dir(..) => {
985                        node = Self::_get_node_from_blobs(next_link, blobs).await?
986                    }
987                    NodeLink::Data(..) => {
988                        return Err(MountError::PathNotNode(consumed_path.clone()));
989                    }
990                }
991                visited_nodes.push((consumed_path.clone(), node.clone()));
992            } else {
993                // Create a new directory node
994                node = Node::default();
995                consumed_path.push(part);
996                visited_nodes.push((consumed_path.clone(), node.clone()));
997            }
998        }
999
1000        let mut node_link = node_link;
1001        let mut created_hashes = Vec::new();
1002        for (path, mut node) in visited_nodes.into_iter().rev() {
1003            node.insert(name, node_link.clone());
1004            let secret = Secret::generate();
1005            let link = Self::_put_node_in_blobs(&node, &secret, blobs).await?;
1006            created_hashes.push(link.hash());
1007            node_link = NodeLink::Dir(link, secret);
1008            name = path
1009                .file_name()
1010                .unwrap_or_default()
1011                .to_string_lossy()
1012                .to_string();
1013        }
1014
1015        Ok((node_link, created_hashes))
1016    }
1017
1018    async fn _get_manifest_from_blobs(
1019        link: &Link,
1020        blobs: &BlobsStore,
1021    ) -> Result<Manifest, MountError> {
1022        tracing::debug!(
1023            "_get_bucket_from_blobs: Checking for bucket data at link {:?}",
1024            link
1025        );
1026        let hash = link.hash();
1027        tracing::debug!("_get_bucket_from_blobs: Bucket hash: {}", hash);
1028
1029        match blobs.stat(&hash).await {
1030            Ok(true) => {
1031                tracing::debug!(
1032                    "_get_bucket_from_blobs: Bucket hash {} exists in blobs",
1033                    hash
1034                );
1035            }
1036            Ok(false) => {
1037                tracing::error!("_get_bucket_from_blobs: Bucket hash {} NOT FOUND in blobs - LinkNotFound error!", hash);
1038                return Err(MountError::LinkNotFound(link.clone()));
1039            }
1040            Err(e) => {
1041                tracing::error!(
1042                    "_get_bucket_from_blobs: Error checking bucket hash {}: {}",
1043                    hash,
1044                    e
1045                );
1046                return Err(e.into());
1047            }
1048        }
1049
1050        tracing::debug!("_get_bucket_from_blobs: Reading bucket data from blobs");
1051        let data = blobs.get(&hash).await?;
1052        tracing::debug!(
1053            "_get_bucket_from_blobs: Got {} bytes of bucket data",
1054            data.len()
1055        );
1056
1057        let bucket_data = Manifest::decode(&data)?;
1058        tracing::debug!(
1059            "_get_bucket_from_blobs: Successfully decoded BucketData for bucket '{}'",
1060            bucket_data.name()
1061        );
1062
1063        Ok(bucket_data)
1064    }
1065
1066    pub async fn _get_pins_from_blobs(link: &Link, blobs: &BlobsStore) -> Result<Pins, MountError> {
1067        tracing::debug!("_get_pins_from_blobs: Checking for pins at link {:?}", link);
1068        let hash = link.hash();
1069        tracing::debug!("_get_pins_from_blobs: Pins hash: {}", hash);
1070
1071        match blobs.stat(&hash).await {
1072            Ok(true) => {
1073                tracing::debug!("_get_pins_from_blobs: Pins hash {} exists in blobs", hash);
1074            }
1075            Ok(false) => {
1076                tracing::error!(
1077                    "_get_pins_from_blobs: Pins hash {} NOT FOUND in blobs - LinkNotFound error!",
1078                    hash
1079                );
1080                return Err(MountError::LinkNotFound(link.clone()));
1081            }
1082            Err(e) => {
1083                tracing::error!(
1084                    "_get_pins_from_blobs: Error checking pins hash {}: {}",
1085                    hash,
1086                    e
1087                );
1088                return Err(e.into());
1089            }
1090        }
1091
1092        tracing::debug!("_get_pins_from_blobs: Reading hash list from blobs");
1093        // Read hashes from the hash list blob
1094        let hashes = blobs.read_hash_list(hash).await?;
1095        tracing::debug!(
1096            "_get_pins_from_blobs: Successfully read {} hashes from pinset",
1097            hashes.len()
1098        );
1099
1100        Ok(Pins::from_vec(hashes))
1101    }
1102
1103    async fn _get_node_from_blobs(
1104        node_link: &NodeLink,
1105        blobs: &BlobsStore,
1106    ) -> Result<Node, MountError> {
1107        let link = node_link.link();
1108        let secret = node_link.secret();
1109        let hash = link.hash();
1110
1111        tracing::debug!("_get_node_from_blobs: Checking for node at hash {}", hash);
1112
1113        match blobs.stat(&hash).await {
1114            Ok(true) => {
1115                tracing::debug!("_get_node_from_blobs: Node hash {} exists in blobs", hash);
1116            }
1117            Ok(false) => {
1118                tracing::error!(
1119                    "_get_node_from_blobs: Node hash {} NOT FOUND in blobs - LinkNotFound error!",
1120                    hash
1121                );
1122                return Err(MountError::LinkNotFound(link.clone()));
1123            }
1124            Err(e) => {
1125                tracing::error!(
1126                    "_get_node_from_blobs: Error checking node hash {}: {}",
1127                    hash,
1128                    e
1129                );
1130                return Err(e.into());
1131            }
1132        }
1133
1134        tracing::debug!("_get_node_from_blobs: Reading encrypted node blob");
1135        let blob = blobs.get(&hash).await?;
1136        tracing::debug!(
1137            "_get_node_from_blobs: Got {} bytes of encrypted node data",
1138            blob.len()
1139        );
1140
1141        tracing::debug!("_get_node_from_blobs: Decrypting node data");
1142        let data = secret.decrypt(&blob)?;
1143        tracing::debug!("_get_node_from_blobs: Decrypted {} bytes", data.len());
1144
1145        let node = Node::decode(&data)?;
1146        tracing::debug!("_get_node_from_blobs: Successfully decoded Node");
1147
1148        Ok(node)
1149    }
1150
1151    // TODO (amiller68): you should inline a Link
1152    //  into the node when we store encrypt it,
1153    //  so that we have an integrity check
1154    async fn _put_node_in_blobs(
1155        node: &Node,
1156        secret: &Secret,
1157        blobs: &BlobsStore,
1158    ) -> Result<Link, MountError> {
1159        let _data = node.encode()?;
1160        let data = secret.encrypt(&_data)?;
1161        let hash = blobs.put(data).await?;
1162        // NOTE (amiller68): nodes are always stored as raw
1163        //  since they are encrypted blobs
1164        let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1165        Ok(link)
1166    }
1167
1168    pub async fn _put_manifest_in_blobs(
1169        bucket_data: &Manifest,
1170        blobs: &BlobsStore,
1171    ) -> Result<Link, MountError> {
1172        let data = bucket_data.encode()?;
1173        let hash = blobs.put(data).await?;
1174        // NOTE (amiller68): buckets are unencrypted, so they can inherit
1175        //  the codec of the bucket itself (which is currently always cbor)
1176        let link = Link::new(bucket_data.codec(), hash);
1177        Ok(link)
1178    }
1179
1180    pub async fn _put_pins_in_blobs(pins: &Pins, blobs: &BlobsStore) -> Result<Link, MountError> {
1181        // Create a hash list blob from the pins (raw bytes: 32 bytes per hash, concatenated)
1182        let hash = blobs.create_hash_list(pins.iter().copied()).await?;
1183        // Pins are stored as raw blobs containing concatenated hashes
1184        // Note: The underlying blob is HashSeq format, but Link doesn't track this
1185        let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1186        Ok(link)
1187    }
1188
1189    async fn _get_ops_log_from_blobs(
1190        link: &Link,
1191        secret: &Secret,
1192        blobs: &BlobsStore,
1193    ) -> Result<PathOpLog, MountError> {
1194        let hash = link.hash();
1195        tracing::debug!(
1196            "_get_ops_log_from_blobs: Checking for ops log at hash {}",
1197            hash
1198        );
1199
1200        match blobs.stat(&hash).await {
1201            Ok(true) => {
1202                tracing::debug!(
1203                    "_get_ops_log_from_blobs: Ops log hash {} exists in blobs",
1204                    hash
1205                );
1206            }
1207            Ok(false) => {
1208                tracing::error!(
1209                    "_get_ops_log_from_blobs: Ops log hash {} NOT FOUND in blobs - LinkNotFound error!",
1210                    hash
1211                );
1212                return Err(MountError::LinkNotFound(link.clone()));
1213            }
1214            Err(e) => {
1215                tracing::error!(
1216                    "_get_ops_log_from_blobs: Error checking ops log hash {}: {}",
1217                    hash,
1218                    e
1219                );
1220                return Err(e.into());
1221            }
1222        }
1223
1224        tracing::debug!("_get_ops_log_from_blobs: Reading encrypted ops log blob");
1225        let blob = blobs.get(&hash).await?;
1226        tracing::debug!(
1227            "_get_ops_log_from_blobs: Got {} bytes of encrypted ops log data",
1228            blob.len()
1229        );
1230
1231        tracing::debug!("_get_ops_log_from_blobs: Decrypting ops log data");
1232        let data = secret.decrypt(&blob)?;
1233        tracing::debug!("_get_ops_log_from_blobs: Decrypted {} bytes", data.len());
1234
1235        let ops_log = PathOpLog::decode(&data)?;
1236        tracing::debug!(
1237            "_get_ops_log_from_blobs: Successfully decoded PathOpLog with {} operations",
1238            ops_log.len()
1239        );
1240
1241        Ok(ops_log)
1242    }
1243
1244    async fn _put_ops_log_in_blobs(
1245        ops_log: &PathOpLog,
1246        secret: &Secret,
1247        blobs: &BlobsStore,
1248    ) -> Result<Link, MountError> {
1249        let _data = ops_log.encode()?;
1250        let data = secret.encrypt(&_data)?;
1251        let hash = blobs.put(data).await?;
1252        // Ops log is stored as an encrypted raw blob
1253        let link = Link::new(crate::linked_data::LD_RAW_CODEC, hash);
1254        tracing::debug!(
1255            "_put_ops_log_in_blobs: Stored ops log with {} operations at hash {}",
1256            ops_log.len(),
1257            hash
1258        );
1259        Ok(link)
1260    }
1261
1262    /// Collect all ops from manifest chain back to (but not including) ancestor_link.
1263    ///
1264    /// Traverses the manifest chain starting from the current version, collecting
1265    /// ops_logs from each manifest until we reach the ancestor (or genesis).
1266    /// The collected ops are merged in chronological order.
1267    ///
1268    /// # Arguments
1269    ///
1270    /// * `ancestor_link` - Stop when reaching this link (not included). None means collect all
1271    ///   accessible ops (stops when we can't decrypt a manifest).
1272    /// * `blobs` - The blob store to read manifests from
1273    ///
1274    /// # Returns
1275    ///
1276    /// A combined PathOpLog containing all operations since the ancestor.
1277    pub async fn collect_ops_since(
1278        &self,
1279        ancestor_link: Option<&Link>,
1280        blobs: &BlobsStore,
1281    ) -> Result<PathOpLog, MountError> {
1282        let inner = self.0.lock().await;
1283        let secret_key = inner.secret_key.clone();
1284        let current_link = inner.link.clone();
1285        let current_ops = inner.ops_log.clone();
1286        drop(inner);
1287
1288        let mut all_logs: Vec<PathOpLog> = Vec::new();
1289
1290        // Start with any unsaved operations in the current mount
1291        if !current_ops.is_empty() {
1292            all_logs.push(current_ops);
1293        }
1294
1295        // Walk the chain from current manifest backwards
1296        let mut link = current_link;
1297
1298        loop {
1299            // Check if we've reached the ancestor
1300            if let Some(ancestor) = ancestor_link {
1301                if &link == ancestor {
1302                    break;
1303                }
1304            }
1305
1306            // Load the manifest at this link
1307            let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1308
1309            // Get the secret for this manifest version
1310            // If we can't get the secret (e.g., we weren't a member at this point),
1311            // stop traversing - we can't read older ops anyway
1312            let secret = match self.get_secret_for_manifest(&manifest, &secret_key) {
1313                Ok(s) => s,
1314                Err(MountError::ShareNotFound) => {
1315                    tracing::debug!(
1316                        "collect_ops_since: stopping at link {} - no share for current user",
1317                        link.hash()
1318                    );
1319                    break;
1320                }
1321                Err(e) => return Err(e),
1322            };
1323
1324            // Load the ops_log if present
1325            if let Some(ops_link) = manifest.ops_log() {
1326                let mut ops_log = Self::_get_ops_log_from_blobs(ops_link, &secret, blobs).await?;
1327                ops_log.rebuild_clock();
1328                all_logs.push(ops_log);
1329            }
1330
1331            // Move to previous manifest
1332            match manifest.previous() {
1333                Some(prev) => link = prev.clone(),
1334                None => break, // Reached genesis
1335            }
1336        }
1337
1338        // Merge all logs in chronological order (oldest first, so reverse)
1339        all_logs.reverse();
1340        let mut merged = PathOpLog::new();
1341        for log in all_logs {
1342            merged.merge(&log);
1343        }
1344
1345        Ok(merged)
1346    }
1347
1348    /// Get the decryption secret for a manifest.
1349    ///
1350    /// Decrypts the secret share using the provided secret key.
1351    #[allow(clippy::result_large_err)]
1352    fn get_secret_for_manifest(
1353        &self,
1354        manifest: &Manifest,
1355        secret_key: &SecretKey,
1356    ) -> Result<Secret, MountError> {
1357        let public_key = secret_key.public();
1358        let share = manifest
1359            .get_share(&public_key)
1360            .ok_or(MountError::ShareNotFound)?;
1361
1362        match share.role() {
1363            PrincipalRole::Owner => {
1364                let secret_share = share.share().ok_or(MountError::ShareNotFound)?;
1365                Ok(secret_share.recover(secret_key)?)
1366            }
1367            PrincipalRole::Mirror => manifest
1368                .public()
1369                .cloned()
1370                .ok_or(MountError::MirrorCannotMount),
1371        }
1372    }
1373
1374    /// Find the common ancestor between this mount's chain and another's.
1375    ///
1376    /// Walks both chains backwards via the `previous` links and returns
1377    /// the first link where both chains converge.
1378    ///
1379    /// # Arguments
1380    ///
1381    /// * `other` - The other mount to find common ancestor with
1382    /// * `blobs` - The blob store to read manifests from
1383    ///
1384    /// # Returns
1385    ///
1386    /// The common ancestor link, or None if chains never converge (different buckets).
1387    pub async fn find_common_ancestor(
1388        &self,
1389        other: &Mount,
1390        blobs: &BlobsStore,
1391    ) -> Result<Option<Link>, MountError> {
1392        // Build set of all links in self's chain
1393        let mut self_chain: std::collections::HashSet<Link> = std::collections::HashSet::new();
1394
1395        let self_link = self.link().await;
1396        let mut link = self_link;
1397
1398        loop {
1399            self_chain.insert(link.clone());
1400            let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1401            match manifest.previous() {
1402                Some(prev) => link = prev.clone(),
1403                None => break,
1404            }
1405        }
1406
1407        // Walk other's chain and find first link in self's set
1408        let other_link = other.link().await;
1409        let mut link = other_link;
1410
1411        loop {
1412            if self_chain.contains(&link) {
1413                return Ok(Some(link));
1414            }
1415            let manifest = Self::_get_manifest_from_blobs(&link, blobs).await?;
1416            match manifest.previous() {
1417                Some(prev) => link = prev.clone(),
1418                None => break,
1419            }
1420        }
1421
1422        // No common ancestor found
1423        Ok(None)
1424    }
1425
1426    /// Apply resolved operations to the entry tree.
1427    ///
1428    /// For each operation in the resolved state:
1429    /// - Files with content links are checked for existence (cannot recreate without secret)
1430    /// - Directories are created if they don't exist
1431    async fn apply_resolved_state(&mut self, merged_ops: &PathOpLog) -> Result<(), MountError> {
1432        let resolved_state = merged_ops.resolve_all();
1433
1434        for (path, op) in &resolved_state {
1435            if op.content_link.is_some() {
1436                // File operation - check if it exists in our tree
1437                let abs_path = Path::new("/").join(path);
1438                match self.get(&abs_path).await {
1439                    Ok(_) => {
1440                        // File exists, skip
1441                    }
1442                    Err(MountError::PathNotFound(_)) => {
1443                        // Can't recreate - ops_log stores Link but not decryption secret
1444                        tracing::warn!(
1445                            "apply_resolved_state: cannot recreate file {} - no secret",
1446                            path.display()
1447                        );
1448                        // Record in ops_log so it's tracked
1449                        let mut inner = self.0.lock().await;
1450                        inner.ops_log.merge(&PathOpLog::from_operation(op));
1451                    }
1452                    Err(e) => return Err(e),
1453                }
1454            } else if op.is_dir && matches!(op.op_type, super::path_ops::OpType::Mkdir) {
1455                // Directory operation - create if missing
1456                let abs_path = Path::new("/").join(path);
1457                match self.mkdir(&abs_path).await {
1458                    Ok(()) => {}
1459                    Err(MountError::PathAlreadyExists(_)) => {}
1460                    Err(e) => return Err(e),
1461                }
1462            }
1463        }
1464
1465        Ok(())
1466    }
1467
1468    /// Merge another mount's changes into this one using the given resolver.
1469    ///
1470    /// This method:
1471    /// 1. Finds the common ancestor between the two chains
1472    /// 2. Collects ops from both chains since that ancestor
1473    /// 3. Merges using the resolver
1474    /// 4. Applies the merged state
1475    /// 5. Saves the result as a new version
1476    ///
1477    /// # Arguments
1478    ///
1479    /// * `incoming` - The mount to merge changes from
1480    /// * `resolver` - The conflict resolution strategy to use
1481    /// * `blobs` - The blob store
1482    ///
1483    /// # Returns
1484    ///
1485    /// A MergeResult containing conflict information and the new version link.
1486    pub async fn merge_from<R: super::ConflictResolver>(
1487        &mut self,
1488        incoming: &Mount,
1489        resolver: &R,
1490        blobs: &BlobsStore,
1491    ) -> Result<(MergeResult, Link), MountError> {
1492        // Find the common ancestor
1493        let ancestor = self.find_common_ancestor(incoming, blobs).await?;
1494
1495        // Collect ops from both chains since the ancestor
1496        let local_ops = self.collect_ops_since(ancestor.as_ref(), blobs).await?;
1497        let incoming_ops = incoming.collect_ops_since(ancestor.as_ref(), blobs).await?;
1498
1499        // Get local peer ID for tie-breaking
1500        let peer_id = {
1501            let inner = self.0.lock().await;
1502            inner.peer_id
1503        };
1504
1505        // Merge the operations
1506        let mut merged_ops = local_ops.clone();
1507        let merge_result = merged_ops.merge_with_resolver(&incoming_ops, resolver, &peer_id);
1508
1509        // Apply merged state to the entry tree
1510        self.apply_resolved_state(&merged_ops).await?;
1511
1512        // Merge the ops_log to include all merged operations
1513        {
1514            let mut inner = self.0.lock().await;
1515            inner.ops_log.merge(&merged_ops);
1516        }
1517
1518        // Save the merged state
1519        let (link, _, _) = self.save(blobs, false).await?;
1520
1521        Ok((merge_result, link))
1522    }
1523}