Skip to main content

jax_daemon/fuse/
mount_manager.rs

1//! Mount manager for FUSE filesystems
2//!
3//! Manages the lifecycle of FUSE mounts, keeping them alive and synced.
4//! Subscribes to sync events to invalidate caches when bucket state changes.
5
6use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use tokio::sync::{broadcast, mpsc, oneshot, RwLock};
11use uuid::Uuid;
12
13use crate::database::models::FuseMount;
14use crate::database::types::MountStatus;
15use crate::database::Database;
16use crate::fuse::cache::FileCacheConfig;
17use crate::fuse::jax_fs::JaxFs;
18use crate::fuse::sync_events::{SaveRequest, SyncEvent};
19use crate::fuse::FileCache;
20use common::mount::{ConflictFile, Mount};
21use common::peer::Peer;
22
/// Configuration for mount manager
///
/// Currently only tunes the capacity of the broadcast channel used to fan
/// out [`SyncEvent`]s; see [`MountManagerConfig::default`] for the default.
#[derive(Debug, Clone)]
pub struct MountManagerConfig {
    /// Channel capacity for sync events. Once full, the slowest broadcast
    /// receivers start lagging and drop the oldest events.
    pub sync_event_capacity: usize,
}
29
30impl Default for MountManagerConfig {
31    fn default() -> Self {
32        Self {
33            sync_event_capacity: 256,
34        }
35    }
36}
37
/// Handle to a FUSE session running in a background thread
///
/// This wrapper makes the session Send+Sync by running the actual BackgroundSession
/// in a dedicated thread and communicating via channel. Dropping the handle
/// closes the channel, which also releases the session thread.
pub struct SessionHandle {
    /// Sending on this channel signals the session thread to drop the session and unmount.
    /// `None` after `unmount` has been called (the sender is consumed).
    unmount_tx: Option<oneshot::Sender<()>>,
}
46
47impl SessionHandle {
48    /// Create a new session handle that owns the FUSE session in a background thread
49    fn spawn(session: fuser::BackgroundSession) -> Self {
50        let (unmount_tx, unmount_rx) = oneshot::channel();
51
52        // Spawn a thread that owns the session
53        // When unmount_rx receives a message (or is dropped), the session is dropped
54        std::thread::spawn(move || {
55            // Keep the session alive until we receive the unmount signal
56            let _session = session;
57            // Block until unmount is requested or the sender is dropped
58            let _ = unmount_rx.blocking_recv();
59            // Session is dropped here, which unmounts the filesystem
60        });
61
62        Self {
63            unmount_tx: Some(unmount_tx),
64        }
65    }
66
67    /// Signal the session to unmount
68    fn unmount(&mut self) {
69        if let Some(tx) = self.unmount_tx.take() {
70            let _ = tx.send(());
71        }
72    }
73}
74
/// A live mount with its associated state
///
/// One of these exists per entry in `MountManager::mounts` while a mount is
/// registered in memory (whether or not a FUSE session is currently running).
pub struct LiveMount {
    /// The bucket mount (kept alive for quick access); shared with the FUSE
    /// filesystem and the save-handler task
    pub mount: Arc<RwLock<Mount>>,
    /// FUSE session handle (if running); `None` once stopped
    pub session: Option<SessionHandle>,
    /// File cache for this mount, invalidated when the bucket syncs
    pub cache: FileCache,
    /// Configuration from database
    pub config: FuseMount,
}
86
87impl std::fmt::Debug for LiveMount {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        f.debug_struct("LiveMount")
90            .field("config", &self.config)
91            .field("has_session", &self.session.is_some())
92            .finish()
93    }
94}
95
/// Manager for FUSE mounts
///
/// Keeps mounts alive and synced, handling lifecycle and cache invalidation.
/// All state is interior-mutable, so the manager can be shared behind `&self`.
pub struct MountManager {
    /// Live mounts: mount_id → LiveMount
    mounts: RwLock<HashMap<Uuid, LiveMount>>,
    /// Database for persistence
    db: Database,
    /// Peer for sync integration (loading/saving bucket mounts)
    peer: Peer<Database>,
    /// Sync event broadcaster; receivers are created via `subscribe_sync_events`
    sync_tx: broadcast::Sender<SyncEvent>,
}
109
110impl MountManager {
111    /// Create a new mount manager
112    pub fn new(db: Database, peer: Peer<Database>, config: MountManagerConfig) -> Self {
113        let (sync_tx, _) = broadcast::channel(config.sync_event_capacity);
114
115        Self {
116            mounts: RwLock::new(HashMap::new()),
117            db,
118            peer,
119            sync_tx,
120        }
121    }
122
123    /// Subscribe to sync events
124    pub fn subscribe_sync_events(&self) -> broadcast::Receiver<SyncEvent> {
125        self.sync_tx.subscribe()
126    }
127
128    /// Get a reference to the sync event sender
129    pub fn sync_sender(&self) -> broadcast::Sender<SyncEvent> {
130        self.sync_tx.clone()
131    }
132
    /// Called when a bucket sync completes - refresh affected mounts
    ///
    /// If the live mount has unsaved local changes (ops_log non-empty), merges them
    /// with incoming changes using conflict resolution. Otherwise, simply reloads
    /// from the new head. Every refreshed mount has its cache invalidated and a
    /// `SyncEvent::MountInvalidated` broadcast.
    ///
    /// # Errors
    ///
    /// Returns `MountError::MountLoad` when the updated mount cannot be loaded
    /// from the peer. Merge and save failures are logged and do not abort the
    /// scan over the remaining mounts.
    pub async fn on_bucket_synced(&self, bucket_id: Uuid) -> Result<(), MountError> {
        // NOTE(review): this read lock on `self.mounts` is held across the
        // awaits below; tokio's RwLock permits that, but it delays any caller
        // needing the write lock (start/stop/delete) until the scan finishes.
        let mounts = self.mounts.read().await;

        for (mount_id, live_mount) in mounts.iter() {
            if *live_mount.config.bucket_id == bucket_id {
                // Check if mount has unsaved local changes (non-empty ops log),
                // under a short-lived read lock on the mount itself.
                let has_local_changes = {
                    let mount_guard = live_mount.mount.read().await;
                    let inner = mount_guard.inner().await;
                    !inner.ops_log().is_empty()
                };

                if has_local_changes {
                    tracing::info!(
                        "Mount {} has local changes, merging with incoming sync",
                        mount_id
                    );

                    // Load the incoming mount from the new head
                    let incoming = self
                        .peer
                        .mount(bucket_id)
                        .await
                        .map_err(|e| MountError::MountLoad(e.into()))?;

                    // Use ConflictFile resolver to create conflict copies for concurrent edits
                    let resolver = ConflictFile::new();

                    // Merge incoming changes into local mount, holding the
                    // mount's write lock for the duration of the merge.
                    let mut mount_guard = live_mount.mount.write().await;
                    match mount_guard
                        .merge_from(&incoming, &resolver, self.peer.blobs())
                        .await
                    {
                        Ok((result, link)) => {
                            // Log any conflicts that were resolved
                            for resolved in &result.conflicts_resolved {
                                tracing::info!(
                                    "Resolved conflict for {:?}: {:?}",
                                    resolved.conflict.path,
                                    resolved.resolution
                                );
                            }

                            tracing::info!(
                                "Merged {} operations, {} conflicts resolved, new link: {}",
                                result.operations_added,
                                result.conflicts_resolved.len(),
                                link.hash()
                            );

                            // Save the merged result. A failed save is only
                            // logged; the merged state remains in memory.
                            if let Err(e) = self.peer.save_mount(&mount_guard, false).await {
                                tracing::error!("Failed to save merged mount {}: {}", mount_id, e);
                            }
                        }
                        Err(e) => {
                            tracing::error!(
                                "Failed to merge mount {} with incoming changes: {}",
                                mount_id,
                                e
                            );
                            // Fall back to simple reload.
                            // NOTE(review): this replaces the in-memory mount with
                            // the incoming head, discarding unsaved local ops when
                            // the merge fails — confirm that is the intended policy.
                            let new_mount = self
                                .peer
                                .mount(bucket_id)
                                .await
                                .map_err(|e| MountError::MountLoad(e.into()))?;
                            *mount_guard = new_mount;
                        }
                    }
                } else {
                    // No local changes - simple reload from updated head
                    let new_mount = self
                        .peer
                        .mount(bucket_id)
                        .await
                        .map_err(|e| MountError::MountLoad(e.into()))?;

                    *live_mount.mount.write().await = new_mount;
                }

                // Invalidate cache so stale file data is not served
                live_mount.cache.invalidate_all();

                // Notify subscribers
                let _ = self.sync_tx.send(SyncEvent::MountInvalidated {
                    mount_id: *mount_id,
                });

                tracing::debug!(
                    "Mount {} cache invalidated after bucket {} sync",
                    mount_id,
                    bucket_id
                );
            }
        }

        Ok(())
    }
238
239    /// Create a new mount configuration
240    pub async fn create_mount(
241        &self,
242        bucket_id: Uuid,
243        mount_point: &str,
244        auto_mount: bool,
245        read_only: bool,
246        cache_size_mb: Option<u32>,
247        cache_ttl_secs: Option<u32>,
248    ) -> Result<FuseMount, MountError> {
249        // Validate bucket exists
250        let bucket_info = self
251            .db
252            .get_bucket_info(&bucket_id)
253            .await
254            .map_err(MountError::Database)?;
255
256        if bucket_info.is_none() {
257            return Err(MountError::BucketNotFound(bucket_id));
258        }
259
260        // Validate or create mount point
261        let mount_path = PathBuf::from(mount_point);
262        if !mount_path.exists() {
263            std::fs::create_dir_all(&mount_path).map_err(|e| {
264                MountError::MountPointNotFound(format!("{} (failed to create: {})", mount_point, e))
265            })?;
266        }
267
268        if !mount_path.is_dir() {
269            return Err(MountError::MountPointNotDirectory(mount_point.to_string()));
270        }
271
272        // Create in database
273        let mount = FuseMount::create(
274            bucket_id,
275            mount_point,
276            auto_mount,
277            read_only,
278            cache_size_mb.map(|v| v as i64),
279            cache_ttl_secs.map(|v| v as i64),
280            &self.db,
281        )
282        .await
283        .map_err(MountError::Database)?;
284
285        tracing::info!(
286            "Created mount {} for bucket {} at {}",
287            mount.mount_id,
288            mount.bucket_id,
289            mount.mount_point
290        );
291
292        Ok(mount)
293    }
294
295    /// Get a mount by ID
296    pub async fn get(&self, mount_id: &Uuid) -> Result<Option<FuseMount>, MountError> {
297        FuseMount::get(*mount_id, &self.db)
298            .await
299            .map_err(MountError::Database)
300    }
301
302    /// List all mounts
303    pub async fn list(&self) -> Result<Vec<FuseMount>, MountError> {
304        FuseMount::list(&self.db)
305            .await
306            .map_err(MountError::Database)
307    }
308
309    /// Update a mount configuration
310    #[allow(clippy::too_many_arguments)]
311    pub async fn update(
312        &self,
313        mount_id: &Uuid,
314        mount_point: Option<&str>,
315        enabled: Option<bool>,
316        auto_mount: Option<bool>,
317        read_only: Option<bool>,
318        cache_size_mb: Option<u32>,
319        cache_ttl_secs: Option<u32>,
320    ) -> Result<Option<FuseMount>, MountError> {
321        FuseMount::update(
322            *mount_id,
323            mount_point,
324            enabled,
325            auto_mount,
326            read_only,
327            cache_size_mb.map(|v| v as i64),
328            cache_ttl_secs.map(|v| v as i64),
329            &self.db,
330        )
331        .await
332        .map_err(MountError::Database)
333    }
334
335    /// Delete a mount configuration
336    pub async fn delete(&self, mount_id: &Uuid) -> Result<bool, MountError> {
337        // First stop if running
338        let _ = self.stop(mount_id).await;
339
340        // Remove from live mounts
341        self.mounts.write().await.remove(mount_id);
342
343        // Delete from database
344        FuseMount::delete(*mount_id, &self.db)
345            .await
346            .map_err(MountError::Database)
347    }
348
    /// Start a mount (spawn FUSE process)
    ///
    /// Loads the configuration and the bucket mount, builds the FUSE
    /// filesystem, mounts it via `fuser::spawn_mount2`, registers the
    /// resulting [`LiveMount`], and records `MountStatus::Running`.
    ///
    /// # Errors
    ///
    /// - `MountNotFound` if no configuration exists for `mount_id`
    /// - `AlreadyRunning` if a session is already active for this id
    /// - `MountLoad`, `SpawnFailed`, or `Database` for the respective stages
    pub async fn start(&self, mount_id: &Uuid) -> Result<(), MountError> {
        // Get mount config
        let mount_config = FuseMount::get(*mount_id, &self.db)
            .await
            .map_err(MountError::Database)?
            .ok_or(MountError::MountNotFound(*mount_id))?;

        // Check if already running.
        // NOTE(review): the read lock is released here and the insert into
        // `mounts` happens much later, so two concurrent start() calls for the
        // same id could both pass this check — confirm callers serialize starts.
        {
            let mounts = self.mounts.read().await;
            if let Some(live) = mounts.get(mount_id) {
                if live.session.is_some() {
                    return Err(MountError::AlreadyRunning(*mount_id));
                }
            }
        }

        // Update status to starting
        FuseMount::update_status(*mount_id, MountStatus::Starting, None, &self.db)
            .await
            .map_err(MountError::Database)?;

        // Load the bucket mount
        let bucket_mount = self
            .peer
            .mount(*mount_config.bucket_id)
            .await
            .map_err(|e| MountError::MountLoad(e.into()))?;

        // Create cache.
        // NOTE(review): this cache is stored on the LiveMount, while JaxFs
        // below is constructed from its own (identical) FileCacheConfig —
        // verify whether these are meant to be two separate caches, since
        // on_bucket_synced only invalidates this one.
        let cache = FileCache::new(crate::fuse::cache::FileCacheConfig {
            max_size_mb: mount_config.cache_size_mb as u32,
            ttl_secs: mount_config.cache_ttl_secs as u32,
        });

        // Create the FUSE filesystem with direct Mount reference
        let mount_arc = Arc::new(RwLock::new(bucket_mount));
        let sync_rx = self.subscribe_sync_events();

        // Create save channel for persistence requests from FUSE
        let (save_tx, save_rx) = mpsc::channel::<SaveRequest>(32);

        let fs = JaxFs::new(
            tokio::runtime::Handle::current(),
            mount_arc.clone(),
            *mount_id,
            *mount_config.bucket_id,
            FileCacheConfig {
                max_size_mb: mount_config.cache_size_mb as u32,
                ttl_secs: mount_config.cache_ttl_secs as u32,
            },
            *mount_config.read_only,
            Some(sync_rx),
            Some(save_tx),
        );

        // Spawn save handler task; it exits when the save channel closes
        self.spawn_save_handler(save_rx, mount_arc.clone());

        // Mount options (platform-specific)
        #[cfg(target_os = "linux")]
        let options = vec![
            fuser::MountOption::FSName("jax".to_string()),
            fuser::MountOption::AutoUnmount,
            fuser::MountOption::AllowOther,
        ];

        #[cfg(target_os = "macos")]
        let options = {
            // Get bucket name for volume label (macOS only); fall back to
            // "jax" if the lookup fails
            let bucket_name = self
                .db
                .get_bucket_info(&mount_config.bucket_id)
                .await
                .ok()
                .flatten()
                .map(|info| info.name)
                .unwrap_or_else(|| "jax".to_string());

            vec![
                fuser::MountOption::FSName("jax".to_string()),
                fuser::MountOption::AutoUnmount,
                fuser::MountOption::CUSTOM(format!("volname={}", bucket_name)),
                fuser::MountOption::CUSTOM("local".to_string()),
                fuser::MountOption::CUSTOM("noappledouble".to_string()),
            ]
        };

        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
        let options = vec![
            fuser::MountOption::FSName("jax".to_string()),
            fuser::MountOption::AutoUnmount,
        ];

        // Spawn the FUSE session in background
        let mount_path = std::path::Path::new(&mount_config.mount_point);
        tracing::info!("Mounting FUSE filesystem at {:?}", mount_path);

        let session = fuser::spawn_mount2(fs, mount_path, &options).map_err(|e| {
            MountError::SpawnFailed(format!(
                "Failed to mount at {}: {}",
                mount_config.mount_point, e
            ))
        })?;

        // Wrap session in a handle for thread safety
        let session_handle = SessionHandle::spawn(session);

        // Create live mount
        let live_mount = LiveMount {
            mount: mount_arc,
            session: Some(session_handle),
            cache,
            config: mount_config.clone(),
        };

        // Store in live mounts
        self.mounts.write().await.insert(*mount_id, live_mount);

        // Update status to running
        FuseMount::update_status(*mount_id, MountStatus::Running, None, &self.db)
            .await
            .map_err(MountError::Database)?;

        tracing::info!("Started mount {} at {}", mount_id, mount_config.mount_point);

        Ok(())
    }
478
    /// Stop a mount
    ///
    /// Signals the FUSE session thread to drop its session, runs the
    /// platform-specific unmount command, and records `MountStatus::Stopped`.
    /// Calling this for a mount that is not live and has no database row is a
    /// no-op returning `Ok(())`.
    pub async fn stop(&self, mount_id: &Uuid) -> Result<(), MountError> {
        // Update status to stopping (best-effort; failures ignored so the
        // stop itself still proceeds)
        let _ = FuseMount::update_status(*mount_id, MountStatus::Stopping, None, &self.db).await;

        let mount_point = {
            let mut mounts = self.mounts.write().await;
            if let Some(live) = mounts.get_mut(mount_id) {
                // Signal the session thread to unmount, then drop the handle
                if let Some(ref mut session) = live.session {
                    session.unmount();
                }
                live.session.take();

                live.config.mount_point.clone()
            } else {
                // Not live — fall back to the persisted mount point so the
                // external unmount can still be attempted
                match FuseMount::get(*mount_id, &self.db).await {
                    Ok(Some(config)) => config.mount_point,
                    // Unknown mount: nothing to do
                    _ => return Ok(()),
                }
            }
        };

        // Platform-specific unmount.
        // NOTE(review): this runs in addition to the session drop above —
        // confirm the double-unmount is intentional (e.g. as a safety net).
        self.unmount_path(&mount_point).await?;

        // Update status to stopped
        FuseMount::update_status(*mount_id, MountStatus::Stopped, None, &self.db)
            .await
            .map_err(MountError::Database)?;

        tracing::info!("Stopped mount {} at {}", mount_id, mount_point);

        Ok(())
    }
515
516    /// Stop all running mounts
517    pub async fn stop_all(&self) -> Result<(), MountError> {
518        let mount_ids: Vec<Uuid> = {
519            let mounts = self.mounts.read().await;
520            mounts.keys().copied().collect()
521        };
522
523        for mount_id in mount_ids {
524            if let Err(e) = self.stop(&mount_id).await {
525                tracing::error!("Failed to stop mount {}: {}", mount_id, e);
526            }
527        }
528
529        Ok(())
530    }
531
532    /// Start all mounts configured for auto-mount
533    pub async fn start_auto(&self) -> Result<(), MountError> {
534        let auto_mounts = FuseMount::auto_list(&self.db)
535            .await
536            .map_err(MountError::Database)?;
537
538        tracing::info!("Starting {} auto-mount(s)", auto_mounts.len());
539
540        for mount in auto_mounts {
541            if let Err(e) = self.start(&mount.mount_id).await {
542                tracing::error!(
543                    "Failed to auto-mount {} at {}: {}",
544                    mount.mount_id,
545                    mount.mount_point,
546                    e
547                );
548
549                // Update status to error
550                let _ = FuseMount::update_status(
551                    *mount.mount_id,
552                    MountStatus::Error,
553                    Some(&e.to_string()),
554                    &self.db,
555                )
556                .await;
557            }
558        }
559
560        Ok(())
561    }
562
563    /// Get a live mount by ID
564    pub async fn get_live_mount(&self, mount_id: &Uuid) -> Option<Arc<RwLock<Mount>>> {
565        let mounts = self.mounts.read().await;
566        mounts.get(mount_id).map(|m| m.mount.clone())
567    }
568
569    /// Get the cache for a live mount
570    pub async fn get_mount_cache(&self, mount_id: &Uuid) -> Option<FileCache> {
571        let mounts = self.mounts.read().await;
572        mounts.get(mount_id).map(|m| m.cache.clone())
573    }
574
575    /// Spawn a background task to handle save requests from FUSE
576    fn spawn_save_handler(
577        &self,
578        mut save_rx: mpsc::Receiver<SaveRequest>,
579        mount: Arc<RwLock<Mount>>,
580    ) {
581        let peer = self.peer.clone();
582
583        tokio::spawn(async move {
584            while let Some(request) = save_rx.recv().await {
585                tracing::debug!("Received save request for mount {}", request.mount_id);
586
587                // Get the current mount state and save it
588                let mount_guard = mount.read().await;
589                match peer.save_mount(&mount_guard, false).await {
590                    Ok(link) => {
591                        tracing::info!(
592                            "Successfully saved mount {} to {}",
593                            request.mount_id,
594                            link.hash()
595                        );
596                    }
597                    Err(e) => {
598                        tracing::error!("Failed to save mount {}: {}", request.mount_id, e);
599                    }
600                }
601            }
602
603            tracing::debug!("Save handler shutting down");
604        });
605    }
606
607    /// Platform-specific unmount
608    async fn unmount_path(&self, mount_point: &str) -> Result<(), MountError> {
609        use std::process::Command;
610
611        #[cfg(target_os = "macos")]
612        {
613            let status = Command::new("umount")
614                .arg(mount_point)
615                .status()
616                .map_err(|e| MountError::UnmountFailed(e.to_string()))?;
617
618            if !status.success() {
619                // Try diskutil as fallback
620                let _ = Command::new("diskutil")
621                    .args(["unmount", "force", mount_point])
622                    .status();
623            }
624        }
625
626        #[cfg(target_os = "linux")]
627        {
628            let status = Command::new("fusermount")
629                .args(["-u", mount_point])
630                .status()
631                .map_err(|e| MountError::UnmountFailed(e.to_string()))?;
632
633            if !status.success() {
634                // Try lazy unmount as fallback
635                let _ = Command::new("fusermount")
636                    .args(["-uz", mount_point])
637                    .status();
638            }
639        }
640
641        #[cfg(not(any(target_os = "macos", target_os = "linux")))]
642        {
643            tracing::warn!("Unmount not implemented for this platform");
644        }
645
646        Ok(())
647    }
648}
649
650impl std::fmt::Debug for MountManager {
651    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
652        f.debug_struct("MountManager")
653            .field("peer_id", &self.peer.id())
654            .finish()
655    }
656}
657
/// Errors that can occur during mount operations
#[derive(Debug, thiserror::Error)]
pub enum MountError {
    /// Underlying database/sqlx failure.
    #[error("database error: {0}")]
    Database(#[from] sqlx::Error),

    /// The referenced bucket does not exist in the database.
    #[error("bucket not found: {0}")]
    BucketNotFound(Uuid),

    /// No mount configuration exists for this ID.
    #[error("mount not found: {0}")]
    MountNotFound(Uuid),

    /// The mount point path is missing and could not be created.
    #[error("mount point not found: {0}")]
    MountPointNotFound(String),

    /// The mount point path exists but is not a directory.
    #[error("mount point is not a directory: {0}")]
    MountPointNotDirectory(String),

    /// A FUSE session is already active for this mount ID.
    #[error("mount already running: {0}")]
    AlreadyRunning(Uuid),

    /// The bucket mount could not be loaded from the peer.
    #[error("failed to load bucket mount: {0}")]
    MountLoad(#[source] anyhow::Error),

    /// `fuser::spawn_mount2` failed to create the session.
    #[error("failed to spawn FUSE process: {0}")]
    SpawnFailed(String),

    /// The external unmount command could not be run.
    #[error("failed to unmount: {0}")]
    UnmountFailed(String),
}