Skip to main content

jax_daemon/fuse/
mount_manager.rs

1//! Mount manager for FUSE filesystems
2//!
3//! Manages the lifecycle of FUSE mounts, keeping them alive and synced.
4//! Subscribes to sync events to invalidate caches when bucket state changes.
5
6use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use tokio::sync::{broadcast, oneshot, RwLock};
11use uuid::Uuid;
12
13use crate::database::models::FuseMount;
14use crate::database::types::MountStatus;
15use crate::database::Database;
16use crate::fuse::cache::FileCacheConfig;
17use crate::fuse::jax_fs::JaxFs;
18use crate::fuse::sync_events::SyncEvent;
19use crate::fuse::FileCache;
20use crate::http_server::api::client::ApiClient;
21use common::mount::{ConflictFile, Mount};
22use common::peer::Peer;
23
24/// Configuration for mount manager
25#[derive(Debug, Clone)]
26pub struct MountManagerConfig {
27    /// Channel capacity for sync events
28    pub sync_event_capacity: usize,
29    /// Port of the daemon API server (used by FUSE to route mutations)
30    pub api_port: u16,
31}
32
33impl Default for MountManagerConfig {
34    fn default() -> Self {
35        Self {
36            sync_event_capacity: 256,
37            api_port: 5001,
38        }
39    }
40}
41
42/// Handle to a FUSE session running in a background thread
43///
44/// This wrapper makes the session Send+Sync by running the actual BackgroundSession
45/// in a dedicated thread and communicating via channel.
46pub struct SessionHandle {
47    /// Sending on this channel signals the session thread to drop the session and unmount
48    unmount_tx: Option<oneshot::Sender<()>>,
49}
50
51impl SessionHandle {
52    /// Create a new session handle that owns the FUSE session in a background thread
53    fn spawn(session: fuser::BackgroundSession) -> Self {
54        let (unmount_tx, unmount_rx) = oneshot::channel();
55
56        // Spawn a thread that owns the session
57        // When unmount_rx receives a message (or is dropped), the session is dropped
58        std::thread::spawn(move || {
59            // Keep the session alive until we receive the unmount signal
60            let _session = session;
61            // Block until unmount is requested or the sender is dropped
62            let _ = unmount_rx.blocking_recv();
63            // Session is dropped here, which unmounts the filesystem
64        });
65
66        Self {
67            unmount_tx: Some(unmount_tx),
68        }
69    }
70
71    /// Signal the session to unmount
72    fn unmount(&mut self) {
73        if let Some(tx) = self.unmount_tx.take() {
74            let _ = tx.send(());
75        }
76    }
77}
78
79/// A live mount with its associated state
80pub struct LiveMount {
81    /// The bucket mount (kept alive for quick access)
82    pub mount: Arc<RwLock<Mount>>,
83    /// FUSE session handle (if running)
84    pub session: Option<SessionHandle>,
85    /// File cache for this mount
86    pub cache: FileCache,
87    /// Configuration from database
88    pub config: FuseMount,
89}
90
91impl std::fmt::Debug for LiveMount {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        f.debug_struct("LiveMount")
94            .field("config", &self.config)
95            .field("has_session", &self.session.is_some())
96            .finish()
97    }
98}
99
100/// Manager for FUSE mounts
101///
102/// Keeps mounts alive and synced, handling lifecycle and cache invalidation.
103pub struct MountManager {
104    /// Live mounts: mount_id → LiveMount
105    mounts: RwLock<HashMap<Uuid, LiveMount>>,
106    /// Database for persistence
107    db: Database,
108    /// Peer for sync integration
109    peer: Peer<Database>,
110    /// Sync event broadcaster
111    sync_tx: broadcast::Sender<SyncEvent>,
112    /// Port for the daemon API server (FUSE routes mutations through this)
113    api_port: u16,
114}
115
116impl MountManager {
117    /// Create a new mount manager
118    pub fn new(db: Database, peer: Peer<Database>, config: MountManagerConfig) -> Self {
119        let (sync_tx, _) = broadcast::channel(config.sync_event_capacity);
120
121        Self {
122            mounts: RwLock::new(HashMap::new()),
123            db,
124            peer,
125            sync_tx,
126            api_port: config.api_port,
127        }
128    }
129
130    /// Subscribe to sync events
131    pub fn subscribe_sync_events(&self) -> broadcast::Receiver<SyncEvent> {
132        self.sync_tx.subscribe()
133    }
134
135    /// Get a reference to the sync event sender
136    pub fn sync_sender(&self) -> broadcast::Sender<SyncEvent> {
137        self.sync_tx.clone()
138    }
139
140    /// Called when a bucket sync completes - refresh affected mounts
141    ///
142    /// If the live mount has unsaved local changes (ops_log non-empty), merges them
143    /// with incoming changes using conflict resolution. Otherwise, simply reloads
144    /// from the new head.
145    pub async fn on_bucket_synced(&self, bucket_id: Uuid) -> Result<(), MountError> {
146        let mounts = self.mounts.read().await;
147
148        for (mount_id, live_mount) in mounts.iter() {
149            if *live_mount.config.bucket_id == bucket_id {
150                // Check if mount has unsaved local changes
151                let has_local_changes = {
152                    let mount_guard = live_mount.mount.read().await;
153                    let inner = mount_guard.inner().await;
154                    !inner.ops_log().is_empty()
155                };
156
157                if has_local_changes {
158                    tracing::info!(
159                        "Mount {} has local changes, merging with incoming sync",
160                        mount_id
161                    );
162
163                    // Load the incoming mount from the new head
164                    let incoming = self
165                        .peer
166                        .mount(bucket_id)
167                        .await
168                        .map_err(|e| MountError::MountLoad(e.into()))?;
169
170                    // Use ConflictFile resolver to create conflict copies for concurrent edits
171                    let resolver = ConflictFile::new();
172
173                    // Merge incoming changes into local mount
174                    let mut mount_guard = live_mount.mount.write().await;
175                    match mount_guard
176                        .merge_from(&incoming, &resolver, self.peer.blobs())
177                        .await
178                    {
179                        Ok((result, link)) => {
180                            // Log any conflicts that were resolved
181                            for resolved in &result.conflicts_resolved {
182                                tracing::info!(
183                                    "Resolved conflict for {:?}: {:?}",
184                                    resolved.conflict.path,
185                                    resolved.resolution
186                                );
187                            }
188
189                            tracing::info!(
190                                "Merged {} operations, {} conflicts resolved, new link: {}",
191                                result.operations_added,
192                                result.conflicts_resolved.len(),
193                                link.hash()
194                            );
195
196                            // Save the merged result
197                            if let Err(e) = self.peer.save_mount(&mount_guard, None).await {
198                                tracing::error!("Failed to save merged mount {}: {}", mount_id, e);
199                            }
200                        }
201                        Err(e) => {
202                            tracing::error!(
203                                "Failed to merge mount {} with incoming changes: {}",
204                                mount_id,
205                                e
206                            );
207                            // Fall back to simple reload
208                            let new_mount = self
209                                .peer
210                                .mount(bucket_id)
211                                .await
212                                .map_err(|e| MountError::MountLoad(e.into()))?;
213                            *mount_guard = new_mount;
214                        }
215                    }
216                } else {
217                    // No local changes - simple reload from updated head
218                    let new_mount = self
219                        .peer
220                        .mount(bucket_id)
221                        .await
222                        .map_err(|e| MountError::MountLoad(e.into()))?;
223
224                    *live_mount.mount.write().await = new_mount;
225                }
226
227                // Invalidate cache
228                live_mount.cache.invalidate_all();
229
230                // Notify subscribers
231                let _ = self.sync_tx.send(SyncEvent::MountInvalidated {
232                    mount_id: *mount_id,
233                });
234
235                tracing::debug!(
236                    "Mount {} cache invalidated after bucket {} sync",
237                    mount_id,
238                    bucket_id
239                );
240            }
241        }
242
243        Ok(())
244    }
245
246    /// Create a new mount configuration
247    pub async fn create_mount(
248        &self,
249        bucket_id: Uuid,
250        mount_point: &str,
251        auto_mount: bool,
252        read_only: bool,
253        cache_size_mb: Option<u32>,
254        cache_ttl_secs: Option<u32>,
255    ) -> Result<FuseMount, MountError> {
256        // Validate bucket exists
257        let bucket_info = self
258            .db
259            .get_bucket_info(&bucket_id)
260            .await
261            .map_err(MountError::Database)?;
262
263        if bucket_info.is_none() {
264            return Err(MountError::BucketNotFound(bucket_id));
265        }
266
267        // Validate or create mount point
268        let mount_path = PathBuf::from(mount_point);
269        if !mount_path.exists() {
270            std::fs::create_dir_all(&mount_path).map_err(|e| {
271                MountError::MountPointNotFound(format!("{} (failed to create: {})", mount_point, e))
272            })?;
273        }
274
275        if !mount_path.is_dir() {
276            return Err(MountError::MountPointNotDirectory(mount_point.to_string()));
277        }
278
279        // Create in database
280        let mount = FuseMount::create(
281            bucket_id,
282            mount_point,
283            auto_mount,
284            read_only,
285            cache_size_mb.map(|v| v as i64),
286            cache_ttl_secs.map(|v| v as i64),
287            &self.db,
288        )
289        .await
290        .map_err(MountError::Database)?;
291
292        tracing::info!(
293            "Created mount {} for bucket {} at {}",
294            mount.mount_id,
295            mount.bucket_id,
296            mount.mount_point
297        );
298
299        Ok(mount)
300    }
301
302    /// Get a mount by ID
303    pub async fn get(&self, mount_id: &Uuid) -> Result<Option<FuseMount>, MountError> {
304        FuseMount::get(*mount_id, &self.db)
305            .await
306            .map_err(MountError::Database)
307    }
308
309    /// List all mounts
310    pub async fn list(&self) -> Result<Vec<FuseMount>, MountError> {
311        FuseMount::list(&self.db)
312            .await
313            .map_err(MountError::Database)
314    }
315
316    /// Update a mount configuration
317    #[allow(clippy::too_many_arguments)]
318    pub async fn update(
319        &self,
320        mount_id: &Uuid,
321        mount_point: Option<&str>,
322        enabled: Option<bool>,
323        auto_mount: Option<bool>,
324        read_only: Option<bool>,
325        cache_size_mb: Option<u32>,
326        cache_ttl_secs: Option<u32>,
327    ) -> Result<Option<FuseMount>, MountError> {
328        FuseMount::update(
329            *mount_id,
330            mount_point,
331            enabled,
332            auto_mount,
333            read_only,
334            cache_size_mb.map(|v| v as i64),
335            cache_ttl_secs.map(|v| v as i64),
336            &self.db,
337        )
338        .await
339        .map_err(MountError::Database)
340    }
341
342    /// Delete a mount configuration
343    pub async fn delete(&self, mount_id: &Uuid) -> Result<bool, MountError> {
344        // First stop if running
345        let _ = self.stop(mount_id).await;
346
347        // Remove from live mounts
348        self.mounts.write().await.remove(mount_id);
349
350        // Delete from database
351        FuseMount::delete(*mount_id, &self.db)
352            .await
353            .map_err(MountError::Database)
354    }
355
356    /// Start a mount (spawn FUSE process)
357    pub async fn start(&self, mount_id: &Uuid) -> Result<(), MountError> {
358        // Get mount config
359        let mount_config = FuseMount::get(*mount_id, &self.db)
360            .await
361            .map_err(MountError::Database)?
362            .ok_or(MountError::MountNotFound(*mount_id))?;
363
364        // Check if already running
365        {
366            let mounts = self.mounts.read().await;
367            if let Some(live) = mounts.get(mount_id) {
368                if live.session.is_some() {
369                    return Err(MountError::AlreadyRunning(*mount_id));
370                }
371            }
372        }
373
374        // Update status to starting
375        FuseMount::update_status(*mount_id, MountStatus::Starting, None, &self.db)
376            .await
377            .map_err(MountError::Database)?;
378
379        // Load the bucket mount
380        let bucket_mount = self
381            .peer
382            .mount(*mount_config.bucket_id)
383            .await
384            .map_err(|e| MountError::MountLoad(e.into()))?;
385
386        // Create cache
387        let cache = FileCache::new(FileCacheConfig::from_basic(
388            mount_config.cache_size_mb as u32,
389            mount_config.cache_ttl_secs as u32,
390        ));
391
392        // Create the FUSE filesystem with direct Mount reference for reads
393        // and HTTP API client for mutations (persistence handled by the API)
394        let mount_arc = Arc::new(RwLock::new(bucket_mount));
395        let sync_rx = self.subscribe_sync_events();
396
397        let api_base_url = url::Url::parse(&format!("http://localhost:{}", self.api_port))
398            .expect("valid localhost URL");
399        let api_client = ApiClient::new(&api_base_url)
400            .map_err(|e| MountError::SpawnFailed(format!("Failed to create API client: {}", e)))?;
401
402        let fs = JaxFs::new(
403            tokio::runtime::Handle::current(),
404            mount_arc.clone(),
405            *mount_id,
406            *mount_config.bucket_id,
407            FileCacheConfig::from_basic(
408                mount_config.cache_size_mb as u32,
409                mount_config.cache_ttl_secs as u32,
410            ),
411            *mount_config.read_only,
412            Some(sync_rx),
413            api_client,
414        );
415
416        // Mount options
417        #[cfg(target_os = "linux")]
418        let options = vec![
419            fuser::MountOption::FSName("jax".to_string()),
420            fuser::MountOption::AutoUnmount,
421            fuser::MountOption::AllowOther,
422        ];
423
424        #[cfg(target_os = "macos")]
425        let options = {
426            // Get bucket name for volume label (macOS only)
427            let bucket_name = self
428                .db
429                .get_bucket_info(&mount_config.bucket_id)
430                .await
431                .ok()
432                .flatten()
433                .map(|info| info.name)
434                .unwrap_or_else(|| "jax".to_string());
435
436            vec![
437                fuser::MountOption::FSName("jax".to_string()),
438                fuser::MountOption::AutoUnmount,
439                fuser::MountOption::CUSTOM(format!("volname={}", bucket_name)),
440                fuser::MountOption::CUSTOM("local".to_string()),
441                fuser::MountOption::CUSTOM("noappledouble".to_string()),
442            ]
443        };
444
445        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
446        let options = vec![
447            fuser::MountOption::FSName("jax".to_string()),
448            fuser::MountOption::AutoUnmount,
449        ];
450
451        // Spawn the FUSE session in background
452        let mount_path = std::path::Path::new(&mount_config.mount_point);
453        tracing::info!("Mounting FUSE filesystem at {:?}", mount_path);
454
455        let session = fuser::spawn_mount2(fs, mount_path, &options).map_err(|e| {
456            MountError::SpawnFailed(format!(
457                "Failed to mount at {}: {}",
458                mount_config.mount_point, e
459            ))
460        })?;
461
462        // Wrap session in a handle for thread safety
463        let session_handle = SessionHandle::spawn(session);
464
465        // Create live mount
466        let live_mount = LiveMount {
467            mount: mount_arc,
468            session: Some(session_handle),
469            cache,
470            config: mount_config.clone(),
471        };
472
473        // Store in live mounts
474        self.mounts.write().await.insert(*mount_id, live_mount);
475
476        // Update status to running
477        FuseMount::update_status(*mount_id, MountStatus::Running, None, &self.db)
478            .await
479            .map_err(MountError::Database)?;
480
481        tracing::info!("Started mount {} at {}", mount_id, mount_config.mount_point);
482
483        Ok(())
484    }
485
486    /// Stop a mount
487    pub async fn stop(&self, mount_id: &Uuid) -> Result<(), MountError> {
488        // Update status to stopping
489        let _ = FuseMount::update_status(*mount_id, MountStatus::Stopping, None, &self.db).await;
490
491        let mount_point = {
492            let mut mounts = self.mounts.write().await;
493            if let Some(live) = mounts.get_mut(mount_id) {
494                // Signal the session to unmount
495                if let Some(ref mut session) = live.session {
496                    session.unmount();
497                }
498                live.session.take();
499
500                live.config.mount_point.clone()
501            } else {
502                // Try to get from database
503                match FuseMount::get(*mount_id, &self.db).await {
504                    Ok(Some(config)) => config.mount_point,
505                    _ => return Ok(()),
506                }
507            }
508        };
509
510        // Platform-specific unmount
511        self.unmount_path(&mount_point).await?;
512
513        // Update status to stopped
514        FuseMount::update_status(*mount_id, MountStatus::Stopped, None, &self.db)
515            .await
516            .map_err(MountError::Database)?;
517
518        tracing::info!("Stopped mount {} at {}", mount_id, mount_point);
519
520        Ok(())
521    }
522
523    /// Stop all running mounts
524    pub async fn stop_all(&self) -> Result<(), MountError> {
525        let mount_ids: Vec<Uuid> = {
526            let mounts = self.mounts.read().await;
527            mounts.keys().copied().collect()
528        };
529
530        for mount_id in mount_ids {
531            if let Err(e) = self.stop(&mount_id).await {
532                tracing::error!("Failed to stop mount {}: {}", mount_id, e);
533            }
534        }
535
536        Ok(())
537    }
538
539    /// Start all mounts configured for auto-mount
540    pub async fn start_auto(&self) -> Result<(), MountError> {
541        let auto_mounts = FuseMount::auto_list(&self.db)
542            .await
543            .map_err(MountError::Database)?;
544
545        tracing::info!("Starting {} auto-mount(s)", auto_mounts.len());
546
547        for mount in auto_mounts {
548            if let Err(e) = self.start(&mount.mount_id).await {
549                tracing::error!(
550                    "Failed to auto-mount {} at {}: {}",
551                    mount.mount_id,
552                    mount.mount_point,
553                    e
554                );
555
556                // Update status to error
557                let _ = FuseMount::update_status(
558                    *mount.mount_id,
559                    MountStatus::Error,
560                    Some(&e.to_string()),
561                    &self.db,
562                )
563                .await;
564            }
565        }
566
567        Ok(())
568    }
569
570    /// Get a live mount by ID
571    pub async fn get_live_mount(&self, mount_id: &Uuid) -> Option<Arc<RwLock<Mount>>> {
572        let mounts = self.mounts.read().await;
573        mounts.get(mount_id).map(|m| m.mount.clone())
574    }
575
576    /// Get the cache for a live mount
577    pub async fn get_mount_cache(&self, mount_id: &Uuid) -> Option<FileCache> {
578        let mounts = self.mounts.read().await;
579        mounts.get(mount_id).map(|m| m.cache.clone())
580    }
581
582    /// Platform-specific unmount
583    async fn unmount_path(&self, mount_point: &str) -> Result<(), MountError> {
584        use std::process::Command;
585
586        #[cfg(target_os = "macos")]
587        {
588            let status = Command::new("umount")
589                .arg(mount_point)
590                .status()
591                .map_err(|e| MountError::UnmountFailed(e.to_string()))?;
592
593            if !status.success() {
594                // Try diskutil as fallback
595                let _ = Command::new("diskutil")
596                    .args(["unmount", "force", mount_point])
597                    .status();
598            }
599        }
600
601        #[cfg(target_os = "linux")]
602        {
603            let status = Command::new("fusermount")
604                .args(["-u", mount_point])
605                .status()
606                .map_err(|e| MountError::UnmountFailed(e.to_string()))?;
607
608            if !status.success() {
609                // Try lazy unmount as fallback
610                let _ = Command::new("fusermount")
611                    .args(["-uz", mount_point])
612                    .status();
613            }
614        }
615
616        #[cfg(not(any(target_os = "macos", target_os = "linux")))]
617        {
618            tracing::warn!("Unmount not implemented for this platform");
619        }
620
621        Ok(())
622    }
623}
624
625impl std::fmt::Debug for MountManager {
626    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
627        f.debug_struct("MountManager")
628            .field("peer_id", &self.peer.id())
629            .finish()
630    }
631}
632
633/// Errors that can occur during mount operations
634#[derive(Debug, thiserror::Error)]
635pub enum MountError {
636    #[error("database error: {0}")]
637    Database(#[from] sqlx::Error),
638
639    #[error("bucket not found: {0}")]
640    BucketNotFound(Uuid),
641
642    #[error("mount not found: {0}")]
643    MountNotFound(Uuid),
644
645    #[error("mount point not found: {0}")]
646    MountPointNotFound(String),
647
648    #[error("mount point is not a directory: {0}")]
649    MountPointNotDirectory(String),
650
651    #[error("mount already running: {0}")]
652    AlreadyRunning(Uuid),
653
654    #[error("failed to load bucket mount: {0}")]
655    MountLoad(#[source] anyhow::Error),
656
657    #[error("failed to spawn FUSE process: {0}")]
658    SpawnFailed(String),
659
660    #[error("failed to unmount: {0}")]
661    UnmountFailed(String),
662}