use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{broadcast, oneshot, RwLock};
use uuid::Uuid;
use crate::database::models::FuseMount;
use crate::database::types::MountStatus;
use crate::database::Database;
use crate::fuse::cache::FileCacheConfig;
use crate::fuse::jax_fs::JaxFs;
use crate::fuse::sync_events::SyncEvent;
use crate::fuse::FileCache;
use crate::http_server::api::client::ApiClient;
use common::mount::{ConflictFile, Mount};
use common::peer::Peer;
/// Tunables for [`MountManager`] construction.
#[derive(Debug, Clone)]
pub struct MountManagerConfig {
    /// Capacity of the broadcast channel used for [`SyncEvent`]s.
    pub sync_event_capacity: usize,
    /// Local HTTP API port the FUSE filesystem's API client connects to.
    pub api_port: u16,
}
impl Default for MountManagerConfig {
fn default() -> Self {
Self {
sync_event_capacity: 256,
api_port: 5001,
}
}
}
/// Keeps a spawned FUSE background session alive on a dedicated OS thread and
/// allows it to be torn down via a one-shot signal.
pub struct SessionHandle {
    // Consumed by `unmount`; sending (or dropping) releases the session thread.
    unmount_tx: Option<oneshot::Sender<()>>,
}
impl SessionHandle {
    /// Moves `session` onto a dedicated OS thread that holds it alive until
    /// the returned handle's [`SessionHandle::unmount`] is called (or the
    /// handle is dropped, which drops the sender side of the channel).
    fn spawn(session: fuser::BackgroundSession) -> Self {
        let (signal_tx, signal_rx) = oneshot::channel();
        std::thread::spawn(move || {
            // Own the session on this thread; it is dropped (and the
            // filesystem detached) once the signal arrives or the sender
            // side goes away.
            let owned_session = session;
            let _ = signal_rx.blocking_recv();
            drop(owned_session);
        });
        Self {
            unmount_tx: Some(signal_tx),
        }
    }

    /// Signals the holder thread to release the session. Idempotent: only
    /// the first call sends; later calls find the sender already taken.
    fn unmount(&mut self) {
        if let Some(sender) = self.unmount_tx.take() {
            let _ = sender.send(());
        }
    }
}
/// A mount that is currently resident in memory: shared mount state, an
/// optional running FUSE session, its file cache, and the persisted
/// configuration it was started from.
pub struct LiveMount {
    // Shared, lock-protected bucket mount state (also handed to the FUSE fs).
    pub mount: Arc<RwLock<Mount>>,
    // Handle to the background FUSE session; `None` once stopped.
    pub session: Option<SessionHandle>,
    // Per-mount file cache; invalidated when the backing bucket syncs.
    pub cache: FileCache,
    // Persisted mount configuration (mount point, flags, cache settings).
    pub config: FuseMount,
}
impl std::fmt::Debug for LiveMount {
    /// Compact debug output: the config plus whether a session is attached
    /// (the mount state and cache are intentionally omitted).
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("LiveMount");
        builder.field("config", &self.config);
        builder.field("has_session", &self.session.is_some());
        builder.finish()
    }
}
/// Owns the set of live FUSE mounts and coordinates their lifecycle
/// (create/start/stop/delete) against the database and the network peer.
pub struct MountManager {
    // Live (in-memory) mounts keyed by mount id.
    mounts: RwLock<HashMap<Uuid, LiveMount>>,
    // Persistent store for mount configuration and status rows.
    db: Database,
    // Peer used to load/save bucket mount state.
    peer: Peer<Database>,
    // Broadcast channel for sync events (e.g. cache invalidation).
    sync_tx: broadcast::Sender<SyncEvent>,
    // Port of the local HTTP API the FUSE filesystem talks back to.
    api_port: u16,
}
impl MountManager {
    /// Builds a manager with no live mounts and a sync-event broadcast
    /// channel sized by `config.sync_event_capacity`.
    pub fn new(db: Database, peer: Peer<Database>, config: MountManagerConfig) -> Self {
        // The initial receiver is discarded; consumers attach later via
        // `subscribe_sync_events`.
        let (sync_tx, _) = broadcast::channel(config.sync_event_capacity);
        Self {
            mounts: RwLock::new(HashMap::new()),
            db,
            peer,
            sync_tx,
            api_port: config.api_port,
        }
    }

    /// Returns a fresh receiver for sync events emitted by this manager.
    pub fn subscribe_sync_events(&self) -> broadcast::Receiver<SyncEvent> {
        self.sync_tx.subscribe()
    }

    /// Returns a clone of the sync-event sender so callers can publish events.
    pub fn sync_sender(&self) -> broadcast::Sender<SyncEvent> {
        self.sync_tx.clone()
    }

    /// Reacts to a bucket having been synced: for each live mount backed by
    /// `bucket_id`, either merges local pending operations with the incoming
    /// state or replaces the in-memory mount outright, then invalidates the
    /// mount's file cache and broadcasts `SyncEvent::MountInvalidated`.
    ///
    /// # Errors
    /// Returns `MountError::MountLoad` if the synced state cannot be loaded
    /// from the peer; merge and save failures are logged but not propagated.
    pub async fn on_bucket_synced(&self, bucket_id: Uuid) -> Result<(), MountError> {
        // NOTE(review): this read lock is held across awaits below; any
        // concurrent `mounts.write()` will wait until this loop finishes.
        let mounts = self.mounts.read().await;
        for (mount_id, live_mount) in mounts.iter() {
            if *live_mount.config.bucket_id == bucket_id {
                // Local changes exist when the ops log is non-empty.
                let has_local_changes = {
                    let mount_guard = live_mount.mount.read().await;
                    let inner = mount_guard.inner().await;
                    !inner.ops_log().is_empty()
                };
                if has_local_changes {
                    tracing::info!(
                        "Mount {} has local changes, merging with incoming sync",
                        mount_id
                    );
                    // Load the freshly-synced state from the peer.
                    let incoming = self
                        .peer
                        .mount(bucket_id)
                        .await
                        .map_err(|e| MountError::MountLoad(e.into()))?;
                    let resolver = ConflictFile::new();
                    let mut mount_guard = live_mount.mount.write().await;
                    match mount_guard
                        .merge_from(&incoming, &resolver, self.peer.blobs())
                        .await
                    {
                        Ok((result, link)) => {
                            for resolved in &result.conflicts_resolved {
                                tracing::info!(
                                    "Resolved conflict for {:?}: {:?}",
                                    resolved.conflict.path,
                                    resolved.resolution
                                );
                            }
                            tracing::info!(
                                "Merged {} operations, {} conflicts resolved, new link: {}",
                                result.operations_added,
                                result.conflicts_resolved.len(),
                                link.hash()
                            );
                            // Persist the merged state; a save failure is
                            // logged but does not abort the other mounts.
                            if let Err(e) = self.peer.save_mount(&mount_guard, None).await {
                                tracing::error!("Failed to save merged mount {}: {}", mount_id, e);
                            }
                        }
                        Err(e) => {
                            tracing::error!(
                                "Failed to merge mount {} with incoming changes: {}",
                                mount_id,
                                e
                            );
                            // Merge failed: fall back to the incoming state,
                            // discarding the local in-memory mount.
                            let new_mount = self
                                .peer
                                .mount(bucket_id)
                                .await
                                .map_err(|e| MountError::MountLoad(e.into()))?;
                            *mount_guard = new_mount;
                        }
                    }
                } else {
                    // No local edits: simply swap in the synced state.
                    let new_mount = self
                        .peer
                        .mount(bucket_id)
                        .await
                        .map_err(|e| MountError::MountLoad(e.into()))?;
                    *live_mount.mount.write().await = new_mount;
                }
                // Cached file data may now be stale.
                live_mount.cache.invalidate_all();
                // Best-effort broadcast; `send` errs only with no receivers.
                let _ = self.sync_tx.send(SyncEvent::MountInvalidated {
                    mount_id: *mount_id,
                });
                tracing::debug!(
                    "Mount {} cache invalidated after bucket {} sync",
                    mount_id,
                    bucket_id
                );
            }
        }
        Ok(())
    }

    /// Creates and persists a mount configuration for `bucket_id` at
    /// `mount_point`, creating the directory if it does not exist. Does NOT
    /// start the FUSE session; see [`MountManager::start`].
    ///
    /// # Errors
    /// - `BucketNotFound` if the bucket is unknown.
    /// - `MountPointNotFound` if the directory cannot be created.
    /// - `MountPointNotDirectory` if the path exists but is not a directory.
    /// - `Database` on persistence failures.
    pub async fn create_mount(
        &self,
        bucket_id: Uuid,
        mount_point: &str,
        auto_mount: bool,
        read_only: bool,
        cache_size_mb: Option<u32>,
        cache_ttl_secs: Option<u32>,
    ) -> Result<FuseMount, MountError> {
        let bucket_info = self
            .db
            .get_bucket_info(&bucket_id)
            .await
            .map_err(MountError::Database)?;
        if bucket_info.is_none() {
            return Err(MountError::BucketNotFound(bucket_id));
        }
        let mount_path = PathBuf::from(mount_point);
        if !mount_path.exists() {
            // NOTE(review): blocking fs call inside an async fn — consider
            // tokio::fs::create_dir_all.
            std::fs::create_dir_all(&mount_path).map_err(|e| {
                MountError::MountPointNotFound(format!("{} (failed to create: {})", mount_point, e))
            })?;
        }
        if !mount_path.is_dir() {
            return Err(MountError::MountPointNotDirectory(mount_point.to_string()));
        }
        let mount = FuseMount::create(
            bucket_id,
            mount_point,
            auto_mount,
            read_only,
            cache_size_mb.map(|v| v as i64),
            cache_ttl_secs.map(|v| v as i64),
            &self.db,
        )
        .await
        .map_err(MountError::Database)?;
        tracing::info!(
            "Created mount {} for bucket {} at {}",
            mount.mount_id,
            mount.bucket_id,
            mount.mount_point
        );
        Ok(mount)
    }

    /// Fetches a persisted mount configuration by id.
    pub async fn get(&self, mount_id: &Uuid) -> Result<Option<FuseMount>, MountError> {
        FuseMount::get(*mount_id, &self.db)
            .await
            .map_err(MountError::Database)
    }

    /// Lists all persisted mount configurations.
    pub async fn list(&self) -> Result<Vec<FuseMount>, MountError> {
        FuseMount::list(&self.db)
            .await
            .map_err(MountError::Database)
    }

    /// Updates a persisted mount configuration; `None` fields are left
    /// unchanged. Returns the updated row, or `None` if the id is unknown.
    /// Does not restart a running session to apply changes.
    #[allow(clippy::too_many_arguments)]
    pub async fn update(
        &self,
        mount_id: &Uuid,
        mount_point: Option<&str>,
        enabled: Option<bool>,
        auto_mount: Option<bool>,
        read_only: Option<bool>,
        cache_size_mb: Option<u32>,
        cache_ttl_secs: Option<u32>,
    ) -> Result<Option<FuseMount>, MountError> {
        FuseMount::update(
            *mount_id,
            mount_point,
            enabled,
            auto_mount,
            read_only,
            cache_size_mb.map(|v| v as i64),
            cache_ttl_secs.map(|v| v as i64),
            &self.db,
        )
        .await
        .map_err(MountError::Database)
    }

    /// Stops the mount (best effort), drops its live state, and deletes the
    /// persisted configuration, returning `FuseMount::delete`'s boolean
    /// result (presumably whether a row existed — confirm).
    pub async fn delete(&self, mount_id: &Uuid) -> Result<bool, MountError> {
        // Stop errors are ignored: deletion proceeds even if unmount fails.
        let _ = self.stop(mount_id).await;
        self.mounts.write().await.remove(mount_id);
        FuseMount::delete(*mount_id, &self.db)
            .await
            .map_err(MountError::Database)
    }

    /// Starts the FUSE session for a configured mount: loads the bucket
    /// mount from the peer, builds the filesystem and cache, mounts at the
    /// configured path, and records the mount as live with status `Running`.
    ///
    /// # Errors
    /// Fails if the mount is unknown, already running, the bucket mount
    /// cannot be loaded, or the FUSE mount itself fails.
    pub async fn start(&self, mount_id: &Uuid) -> Result<(), MountError> {
        let mount_config = FuseMount::get(*mount_id, &self.db)
            .await
            .map_err(MountError::Database)?
            .ok_or(MountError::MountNotFound(*mount_id))?;
        // Reject a second start while a session is already attached.
        {
            let mounts = self.mounts.read().await;
            if let Some(live) = mounts.get(mount_id) {
                if live.session.is_some() {
                    return Err(MountError::AlreadyRunning(*mount_id));
                }
            }
        }
        // NOTE(review): if any step after this fails, the persisted status
        // stays `Starting` unless the caller fixes it (`start_auto` sets
        // `Error` itself) — confirm this is intended for direct callers.
        FuseMount::update_status(*mount_id, MountStatus::Starting, None, &self.db)
            .await
            .map_err(MountError::Database)?;
        let bucket_mount = self
            .peer
            .mount(*mount_config.bucket_id)
            .await
            .map_err(|e| MountError::MountLoad(e.into()))?;
        let cache = FileCache::new(FileCacheConfig::from_basic(
            mount_config.cache_size_mb as u32,
            mount_config.cache_ttl_secs as u32,
        ));
        let mount_arc = Arc::new(RwLock::new(bucket_mount));
        let sync_rx = self.subscribe_sync_events();
        // The filesystem talks back to the local HTTP API on `api_port`.
        let api_base_url = url::Url::parse(&format!("http://localhost:{}", self.api_port))
            .expect("valid localhost URL");
        let api_client = ApiClient::new(&api_base_url)
            .map_err(|e| MountError::SpawnFailed(format!("Failed to create API client: {}", e)))?;
        let fs = JaxFs::new(
            tokio::runtime::Handle::current(),
            mount_arc.clone(),
            *mount_id,
            *mount_config.bucket_id,
            FileCacheConfig::from_basic(
                mount_config.cache_size_mb as u32,
                mount_config.cache_ttl_secs as u32,
            ),
            *mount_config.read_only,
            Some(sync_rx),
            api_client,
        );
        // Platform-specific mount options.
        #[cfg(target_os = "linux")]
        let options = vec![
            fuser::MountOption::FSName("jax".to_string()),
            fuser::MountOption::AutoUnmount,
            fuser::MountOption::AllowOther,
        ];
        #[cfg(target_os = "macos")]
        let options = {
            // Use the bucket name as the volume name when available.
            let bucket_name = self
                .db
                .get_bucket_info(&mount_config.bucket_id)
                .await
                .ok()
                .flatten()
                .map(|info| info.name)
                .unwrap_or_else(|| "jax".to_string());
            vec![
                fuser::MountOption::FSName("jax".to_string()),
                fuser::MountOption::AutoUnmount,
                fuser::MountOption::CUSTOM(format!("volname={}", bucket_name)),
                fuser::MountOption::CUSTOM("local".to_string()),
                fuser::MountOption::CUSTOM("noappledouble".to_string()),
            ]
        };
        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
        let options = vec![
            fuser::MountOption::FSName("jax".to_string()),
            fuser::MountOption::AutoUnmount,
        ];
        let mount_path = std::path::Path::new(&mount_config.mount_point);
        tracing::info!("Mounting FUSE filesystem at {:?}", mount_path);
        let session = fuser::spawn_mount2(fs, mount_path, &options).map_err(|e| {
            MountError::SpawnFailed(format!(
                "Failed to mount at {}: {}",
                mount_config.mount_point, e
            ))
        })?;
        // Park the session on its own thread until `stop` signals it.
        let session_handle = SessionHandle::spawn(session);
        let live_mount = LiveMount {
            mount: mount_arc,
            session: Some(session_handle),
            cache,
            config: mount_config.clone(),
        };
        self.mounts.write().await.insert(*mount_id, live_mount);
        FuseMount::update_status(*mount_id, MountStatus::Running, None, &self.db)
            .await
            .map_err(MountError::Database)?;
        tracing::info!("Started mount {} at {}", mount_id, mount_config.mount_point);
        Ok(())
    }

    /// Stops a mount's FUSE session (if live), force-unmounts the path, and
    /// persists the `Stopped` status. Returns `Ok(())` as a no-op when the
    /// mount is neither live nor present in the database.
    pub async fn stop(&self, mount_id: &Uuid) -> Result<(), MountError> {
        // Best-effort status transition; ignore failures here.
        let _ = FuseMount::update_status(*mount_id, MountStatus::Stopping, None, &self.db).await;
        let mount_point = {
            let mut mounts = self.mounts.write().await;
            if let Some(live) = mounts.get_mut(mount_id) {
                // Signal the session thread to drop the FUSE session, then
                // clear the handle so the mount counts as not running.
                if let Some(ref mut session) = live.session {
                    session.unmount();
                }
                live.session.take();
                live.config.mount_point.clone()
            } else {
                // Not live: fall back to the persisted mount point, if any.
                match FuseMount::get(*mount_id, &self.db).await {
                    Ok(Some(config)) => config.mount_point,
                    _ => return Ok(()),
                }
            }
        };
        // Also run the platform unmount tool in case the kernel mount is
        // still attached.
        self.unmount_path(&mount_point).await?;
        FuseMount::update_status(*mount_id, MountStatus::Stopped, None, &self.db)
            .await
            .map_err(MountError::Database)?;
        tracing::info!("Stopped mount {} at {}", mount_id, mount_point);
        Ok(())
    }

    /// Stops every live mount, logging (not propagating) per-mount failures.
    pub async fn stop_all(&self) -> Result<(), MountError> {
        // Snapshot the ids first so `stop` can take the write lock itself.
        let mount_ids: Vec<Uuid> = {
            let mounts = self.mounts.read().await;
            mounts.keys().copied().collect()
        };
        for mount_id in mount_ids {
            if let Err(e) = self.stop(&mount_id).await {
                tracing::error!("Failed to stop mount {}: {}", mount_id, e);
            }
        }
        Ok(())
    }

    /// Starts every mount flagged for auto-mount. Individual failures are
    /// logged and recorded as status `Error`, but do not stop the rest.
    pub async fn start_auto(&self) -> Result<(), MountError> {
        let auto_mounts = FuseMount::auto_list(&self.db)
            .await
            .map_err(MountError::Database)?;
        tracing::info!("Starting {} auto-mount(s)", auto_mounts.len());
        for mount in auto_mounts {
            if let Err(e) = self.start(&mount.mount_id).await {
                tracing::error!(
                    "Failed to auto-mount {} at {}: {}",
                    mount.mount_id,
                    mount.mount_point,
                    e
                );
                let _ = FuseMount::update_status(
                    *mount.mount_id,
                    MountStatus::Error,
                    Some(&e.to_string()),
                    &self.db,
                )
                .await;
            }
        }
        Ok(())
    }

    /// Returns the shared in-memory mount state for a live mount, if any.
    pub async fn get_live_mount(&self, mount_id: &Uuid) -> Option<Arc<RwLock<Mount>>> {
        let mounts = self.mounts.read().await;
        mounts.get(mount_id).map(|m| m.mount.clone())
    }

    /// Returns a clone of a live mount's file cache, if the mount is live.
    pub async fn get_mount_cache(&self, mount_id: &Uuid) -> Option<FileCache> {
        let mounts = self.mounts.read().await;
        mounts.get(mount_id).map(|m| m.cache.clone())
    }

    /// Runs the platform unmount tool for `mount_point`, escalating to a
    /// forced/lazy unmount when the first attempt exits non-zero. Errors
    /// only when the tool cannot be spawned at all; non-zero exit codes of
    /// the fallback are tolerated.
    async fn unmount_path(&self, mount_point: &str) -> Result<(), MountError> {
        use std::process::Command;
        // NOTE(review): `Command::status` blocks the async executor thread —
        // consider tokio::process::Command or spawn_blocking.
        #[cfg(target_os = "macos")]
        {
            // Plain `umount` first; fall back to `diskutil unmount force`.
            let status = Command::new("umount")
                .arg(mount_point)
                .status()
                .map_err(|e| MountError::UnmountFailed(e.to_string()))?;
            if !status.success() {
                let _ = Command::new("diskutil")
                    .args(["unmount", "force", mount_point])
                    .status();
            }
        }
        #[cfg(target_os = "linux")]
        {
            // `fusermount -u` first; fall back to a lazy unmount (`-z`).
            let status = Command::new("fusermount")
                .args(["-u", mount_point])
                .status()
                .map_err(|e| MountError::UnmountFailed(e.to_string()))?;
            if !status.success() {
                let _ = Command::new("fusermount")
                    .args(["-uz", mount_point])
                    .status();
            }
        }
        #[cfg(not(any(target_os = "macos", target_os = "linux")))]
        {
            tracing::warn!("Unmount not implemented for this platform");
        }
        Ok(())
    }
}
impl std::fmt::Debug for MountManager {
    /// Compact debug output: only the peer id (the mount map, database, and
    /// channel handles are intentionally omitted).
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("MountManager");
        builder.field("peer_id", &self.peer.id());
        builder.finish()
    }
}
/// Errors produced by [`MountManager`] operations.
#[derive(Debug, thiserror::Error)]
pub enum MountError {
    /// Underlying database (sqlx) failure.
    #[error("database error: {0}")]
    Database(#[from] sqlx::Error),
    /// No bucket with the given id exists.
    #[error("bucket not found: {0}")]
    BucketNotFound(Uuid),
    /// No mount configuration with the given id exists.
    #[error("mount not found: {0}")]
    MountNotFound(Uuid),
    /// The mount-point directory is missing and could not be created.
    #[error("mount point not found: {0}")]
    MountPointNotFound(String),
    /// The mount-point path exists but is not a directory.
    #[error("mount point is not a directory: {0}")]
    MountPointNotDirectory(String),
    /// A FUSE session for this mount is already attached.
    #[error("mount already running: {0}")]
    AlreadyRunning(Uuid),
    /// Loading the bucket mount from the peer failed.
    #[error("failed to load bucket mount: {0}")]
    MountLoad(#[source] anyhow::Error),
    /// Mounting the filesystem (or building its API client) failed.
    #[error("failed to spawn FUSE process: {0}")]
    SpawnFailed(String),
    /// The platform unmount tool could not be executed.
    #[error("failed to unmount: {0}")]
    UnmountFailed(String),
}