use async_trait::async_trait;
use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use std::time::Duration;
use std::time::Instant;
use tokio::time::sleep;
use tokio::time::timeout;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
use super::manager::MountManager;
use super::types::MountInfo;
use super::types::MountMetadata;
use super::types::MountOptions;
use super::types::MountStatus;
use super::utils;
use crate::error::Result;
use crate::error::ThoughtsError;
use crate::platform::common::MOUNT_RETRY_DELAY;
use crate::platform::common::MOUNT_TIMEOUT;
use crate::platform::common::MOUNT_VERIFY_TIMEOUT;
use crate::platform::common::UNMOUNT_TIMEOUT;
use crate::platform::detector::LinuxInfo;
use crate::platform::linux::DEFAULT_MOUNT_OPTIONS;
use crate::platform::linux::MERGERFS_FSTYPE;
use crate::platform::linux::PROC_MOUNTINFO;
use crate::platform::linux::PROC_MOUNTS;
/// Mount manager that merges several source directories into one mount point
/// by invoking the external `mergerfs` binary.
///
/// Both tool paths are optional so that a manager can still be constructed on
/// hosts where the tools are missing; operations that need `mergerfs` report
/// `ThoughtsError::ToolNotFound` in that case.
#[derive(Debug, Clone)]
pub struct MergerfsManager {
    /// Location of the `mergerfs` executable, if one was found.
    mergerfs_path: Option<PathBuf>,
    /// Location of `fusermount`/`fusermount3`, if one was found. When absent,
    /// unmounting goes straight to plain `umount`.
    fusermount_path: Option<PathBuf>,
}
impl MergerfsManager {
pub fn new(platform_info: LinuxInfo) -> Self {
let mergerfs_path = platform_info
.mergerfs_path
.clone()
.or_else(|| which::which("mergerfs").ok());
let fusermount_path = platform_info
.fusermount_path
.or_else(|| which::which("fusermount").ok())
.or_else(|| which::which("fusermount3").ok());
Self {
mergerfs_path,
fusermount_path,
}
}
/// Test-only constructor that stores the given paths verbatim, without any
/// `PATH` lookup, so tests can model a host where the tools are missing.
#[cfg(test)]
fn new_without_fallback(
    mergerfs_path: Option<PathBuf>,
    fusermount_path: Option<PathBuf>,
) -> Self {
    MergerfsManager {
        fusermount_path,
        mergerfs_path,
    }
}
/// Assembles the argument vector passed to `mergerfs`.
///
/// The result always has four entries:
/// 1. the literal `-o`;
/// 2. a comma-joined option string made of `DEFAULT_MOUNT_OPTIONS`, then `ro`
///    and/or `allow_other` when the matching flag is set, then any
///    caller-supplied extra options;
/// 3. the source directories joined with `:`;
/// 4. the mount target.
fn build_mount_args(sources: &[PathBuf], target: &Path, options: &MountOptions) -> Vec<String> {
    // Option string: defaults first, then flags, then caller extras.
    let mut option_list: Vec<String> = DEFAULT_MOUNT_OPTIONS
        .iter()
        .map(|opt| opt.to_string())
        .collect();
    if options.read_only {
        option_list.push(String::from("ro"));
    }
    if options.allow_other {
        option_list.push(String::from("allow_other"));
    }
    option_list.extend(options.extra_options.iter().cloned());

    // Branch list in the `a:b:c` form.
    let mut branch_list = String::new();
    for (idx, branch) in sources.iter().enumerate() {
        if idx > 0 {
            branch_list.push(':');
        }
        branch_list.push_str(&branch.display().to_string());
    }

    vec![
        String::from("-o"),
        option_list.join(","),
        branch_list,
        target.display().to_string(),
    ]
}
/// Reads `PROC_MOUNTS` and returns one [`MountInfo`] per mergerfs entry.
///
/// Lines with fewer than six whitespace-separated fields, or whose filesystem
/// type differs from `MERGERFS_FSTYPE`, are skipped. The first field is split
/// on `:` to obtain the source directories. The Linux-specific metadata is
/// left empty because this file does not carry mount IDs.
///
/// The kernel writes whitespace and backslashes inside paths as `\NNN` octal
/// escapes (a space becomes `\040`). These are decoded here so that mount
/// points containing such characters compare equal to real filesystem paths.
///
/// # Errors
/// Returns an error when `PROC_MOUNTS` cannot be read.
async fn parse_proc_mounts(&self) -> Result<Vec<MountInfo>> {
    use tokio::fs;

    /// Replaces every `\NNN` octal escape (value 0..=255) with the byte it
    /// denotes; any other backslash is copied through unchanged.
    fn decode_octal_escapes(field: &str) -> String {
        let raw = field.as_bytes();
        let mut decoded = Vec::with_capacity(raw.len());
        let mut idx = 0;
        while idx < raw.len() {
            if raw[idx] == b'\\' {
                if let Some(digits) = raw.get(idx + 1..idx + 4) {
                    // First digit <= 3 keeps the value within one byte.
                    let is_escape = digits[0] <= b'3'
                        && digits.iter().all(|d| (b'0'..=b'7').contains(d));
                    if is_escape {
                        let value = digits.iter().fold(0u8, |acc, d| acc * 8 + (d - b'0'));
                        decoded.push(value);
                        idx += 4;
                        continue;
                    }
                }
            }
            decoded.push(raw[idx]);
            idx += 1;
        }
        String::from_utf8_lossy(&decoded).into_owned()
    }

    let content = fs::read_to_string(PROC_MOUNTS).await?;
    let mut mounts = Vec::new();
    for line in content.lines() {
        let fields: Vec<&str> = line.split_whitespace().collect();
        if fields.len() < 6 {
            continue;
        }
        let fs_type = fields[2];
        if fs_type != MERGERFS_FSTYPE {
            continue;
        }
        let target = PathBuf::from(decode_octal_escapes(fields[1]));
        let sources: Vec<PathBuf> = fields[0]
            .split(':')
            .map(|branch| PathBuf::from(decode_octal_escapes(branch)))
            .collect();
        let options = fields[3]
            .split(',')
            .map(std::string::ToString::to_string)
            .collect();
        mounts.push(MountInfo {
            target,
            sources,
            status: MountStatus::Mounted,
            fs_type: fs_type.to_string(),
            options,
            mounted_at: None,
            pid: None,
            metadata: MountMetadata::Linux {
                mount_id: None,
                parent_id: None,
                major_minor: None,
            },
        });
    }
    Ok(mounts)
}
/// Looks up the mergerfs mount at `target` and returns it with Linux mount
/// metadata (mount ID, parent ID, `major:minor`) taken from `PROC_MOUNTINFO`.
///
/// `target` and each candidate mount point are compared after
/// canonicalization, falling back to the raw path when canonicalization
/// fails. If `PROC_MOUNTINFO` cannot be read, the lookup falls back to the
/// entries from `parse_proc_mounts`, which carry no metadata.
///
/// mountinfo line layout (see proc(5)):
/// `id parent major:minor root mount_point mount_opts [optional...] - fstype source super_opts`
///
/// Returns `Ok(None)` when no mergerfs mount matches `target`.
///
/// # Errors
/// Only the fallback path can fail, when `PROC_MOUNTS` is unreadable too.
async fn get_detailed_mount_info(&self, target: &Path) -> Result<Option<MountInfo>> {
    use tokio::fs;

    /// Replaces every `\NNN` octal escape (value 0..=255) with the byte it
    /// denotes; any other backslash is copied through unchanged.
    fn decode_octal_escapes(field: &str) -> String {
        let raw = field.as_bytes();
        let mut decoded = Vec::with_capacity(raw.len());
        let mut idx = 0;
        while idx < raw.len() {
            if raw[idx] == b'\\' {
                if let Some(digits) = raw.get(idx + 1..idx + 4) {
                    // First digit <= 3 keeps the value within one byte.
                    let is_escape = digits[0] <= b'3'
                        && digits.iter().all(|d| (b'0'..=b'7').contains(d));
                    if is_escape {
                        let value = digits.iter().fold(0u8, |acc, d| acc * 8 + (d - b'0'));
                        decoded.push(value);
                        idx += 4;
                        continue;
                    }
                }
            }
            decoded.push(raw[idx]);
            idx += 1;
        }
        String::from_utf8_lossy(&decoded).into_owned()
    }

    let target_canon = std::fs::canonicalize(target).unwrap_or_else(|_| target.to_path_buf());
    let Ok(content) = fs::read_to_string(PROC_MOUNTINFO).await else {
        let mounts = self.parse_proc_mounts().await?;
        return Ok(mounts.into_iter().find(|m| {
            let mt = std::fs::canonicalize(&m.target).unwrap_or_else(|_| m.target.clone());
            mt == target_canon
        }));
    };
    for line in content.lines() {
        let fields: Vec<&str> = line.split_whitespace().collect();
        // Six fixed fields + "-" + three trailing fields is the minimum.
        if fields.len() < 10 {
            continue;
        }
        // The separator can only appear after the six fixed fields; searching
        // from there also guarantees `fields[5]` is the mount-options field.
        let Some(separator_pos) = fields[6..]
            .iter()
            .position(|&f| f == "-")
            .map(|offset| offset + 6)
        else {
            continue;
        };
        if separator_pos + 3 >= fields.len() {
            continue;
        }
        // Filter on filesystem type first: it is a plain string compare, and
        // it avoids canonicalizing (i.e. touching) unrelated mount points.
        let fs_type = fields[separator_pos + 1];
        if fs_type != MERGERFS_FSTYPE {
            continue;
        }
        let mount_point = PathBuf::from(decode_octal_escapes(fields[4]));
        let mount_point_canon =
            std::fs::canonicalize(&mount_point).unwrap_or_else(|_| mount_point.clone());
        if mount_point_canon != target_canon {
            continue;
        }
        let sources: Vec<PathBuf> = fields[separator_pos + 2]
            .split(':')
            .map(|branch| PathBuf::from(decode_octal_escapes(branch)))
            .collect();
        // Only field 5 holds mount options. The fields between it and the
        // separator are propagation tags such as `shared:1`, not options.
        let options: Vec<String> = fields[5]
            .split(',')
            .map(std::string::ToString::to_string)
            .collect();
        return Ok(Some(MountInfo {
            target: mount_point,
            sources,
            status: MountStatus::Mounted,
            fs_type: fs_type.to_string(),
            options,
            mounted_at: None,
            pid: None,
            metadata: MountMetadata::Linux {
                // Unparseable IDs are reported as absent rather than as 0.
                mount_id: fields[0].parse::<u32>().ok(),
                parent_id: fields[1].parse::<u32>().ok(),
                major_minor: Some(fields[2].to_string()),
            },
        }));
    }
    Ok(None)
}
/// Unmounts `target` with the system `umount` command.
///
/// With `force` set, `-l` (lazy unmount) is passed. The command is bounded by
/// `UNMOUNT_TIMEOUT`.
///
/// # Errors
/// * `ThoughtsError::CommandTimeout` when `umount` does not finish in time.
/// * An I/O-derived error when the process cannot be run.
/// * `ThoughtsError::MountOperationFailed` carrying stderr when `umount`
///   exits unsuccessfully.
async fn unmount_with_umount(&self, target: &Path, force: bool) -> Result<()> {
    let mut umount = tokio::process::Command::new("umount");
    if force {
        umount.arg("-l");
    }
    umount.arg(target);

    let finished = match timeout(UNMOUNT_TIMEOUT, umount.output()).await {
        Ok(io_result) => io_result.map_err(ThoughtsError::from)?,
        Err(_) => {
            return Err(ThoughtsError::CommandTimeout {
                command: "umount".to_string(),
                timeout_secs: UNMOUNT_TIMEOUT.as_secs(),
            });
        }
    };

    if finished.status.success() {
        return Ok(());
    }
    let stderr = String::from_utf8_lossy(&finished.stderr);
    Err(ThoughtsError::MountOperationFailed {
        message: format!("umount failed: {stderr}"),
    })
}
}
#[async_trait]
impl MountManager for MergerfsManager {
async fn mount(
&self,
sources: &[PathBuf],
target: &Path,
options: &MountOptions,
) -> Result<()> {
let mergerfs_path =
self.mergerfs_path
.as_ref()
.ok_or_else(|| ThoughtsError::ToolNotFound {
tool: "mergerfs".to_string(),
})?;
if sources.is_empty() {
return Err(ThoughtsError::MountOperationFailed {
message: "No source directories provided".to_string(),
});
}
if !target.is_absolute() {
return Err(ThoughtsError::MountOperationFailed {
message: format!("Mount target must be absolute: {}", target.display()),
});
}
for source in sources {
if !source.is_absolute() {
return Err(ThoughtsError::MountOperationFailed {
message: format!("Mount source must be absolute: {}", source.display()),
});
}
}
for source in sources {
if !source.exists() {
return Err(ThoughtsError::MountSourceNotFound {
path: source.clone(),
});
}
}
utils::validate_mount_point(target).await?;
utils::ensure_mount_point(target).await?;
if self.is_mounted(target).await? {
info!("Target is already mounted: {}", target.display());
return Ok(());
}
let args = Self::build_mount_args(sources, target, options);
let _timeout = options.timeout.unwrap_or(MOUNT_TIMEOUT);
info!("Mounting {} sources to {}", sources.len(), target.display());
debug!(
"Mount command: {} {}",
mergerfs_path.display(),
args.join(" ")
);
for attempt in 0..=options.retries {
if attempt > 0 {
warn!("Mount attempt {} of {}", attempt + 1, options.retries + 1);
sleep(MOUNT_RETRY_DELAY).await;
}
let start = Instant::now();
let output = tokio::process::Command::new(mergerfs_path)
.args(&args)
.output()
.await?;
let duration = start.elapsed();
if output.status.success() {
info!("Successfully mounted in {:?}", duration);
let verified = utils::verify_with_polling(
|| async { self.is_mounted(target).await },
MOUNT_VERIFY_TIMEOUT,
Duration::from_millis(100),
)
.await?;
if verified {
return Ok(());
}
warn!(
"Mount command succeeded but target '{}' not visible after {}s polling",
target.display(),
MOUNT_VERIFY_TIMEOUT.as_secs()
);
if let Ok(content) = tokio::fs::read_to_string(PROC_MOUNTS).await {
let target_str = target.display().to_string();
let relevant: Vec<&str> = content
.lines()
.filter(|l| l.contains(&target_str) || l.contains(MERGERFS_FSTYPE))
.collect();
if !relevant.is_empty() {
warn!(
"Mount verification diagnostics for {}:\n {}",
target.display(),
relevant.join("\n ")
);
}
}
return Err(ThoughtsError::MountVerificationTimeout {
target: target.to_path_buf(),
timeout_secs: MOUNT_VERIFY_TIMEOUT.as_secs(),
});
}
let stderr = String::from_utf8_lossy(&output.stderr);
error!("Mount failed: {}", stderr);
if attempt == options.retries {
return Err(ThoughtsError::MountOperationFailed {
message: format!("mergerfs mount failed: {stderr}"),
});
}
}
Err(ThoughtsError::MountOperationFailed {
message: "Mount failed after all retries".to_string(),
})
}
/// Unmounts `target`, preferring `fusermount` and falling back to `umount`.
///
/// Returns `Ok(())` immediately when `target` is not a mergerfs mount. With
/// `force` set, `-z` is added to the `fusermount` call, and the flag is also
/// forwarded to the `umount` fallback. `umount` is used when no `fusermount`
/// path is known, or when `fusermount` exits unsuccessfully. After a
/// successful unmount the mount point is handed to
/// `utils::cleanup_mount_point`.
///
/// # Errors
/// * `ThoughtsError::CommandTimeout` when `fusermount` exceeds
///   `UNMOUNT_TIMEOUT`.
/// * Any error from the `umount` fallback, the mount-table lookup or the
///   cleanup step.
async fn unmount(&self, target: &Path, force: bool) -> Result<()> {
    let currently_mounted = self.is_mounted(target).await?;
    if !currently_mounted {
        debug!("Target is not mounted: {}", target.display());
        return Ok(());
    }
    info!("Unmounting {}", target.display());

    match self.fusermount_path.as_ref() {
        None => {
            debug!("fusermount not available, using umount");
            self.unmount_with_umount(target, force).await?;
        }
        Some(fusermount) => {
            let mut cmd = tokio::process::Command::new(fusermount);
            cmd.arg("-u");
            if force {
                cmd.arg("-z");
            }
            cmd.arg(target);

            let finished = timeout(UNMOUNT_TIMEOUT, cmd.output()).await.map_err(|_| {
                ThoughtsError::CommandTimeout {
                    command: "fusermount".to_string(),
                    timeout_secs: UNMOUNT_TIMEOUT.as_secs(),
                }
            })?;
            let output = finished.map_err(ThoughtsError::from)?;

            if !output.status.success() {
                let stderr = String::from_utf8_lossy(&output.stderr);
                warn!("fusermount failed: {}, trying umount", stderr);
                self.unmount_with_umount(target, force).await?;
            } else {
                info!(
                    "Successfully unmounted {} with fusermount",
                    target.display()
                );
            }
        }
    }

    utils::cleanup_mount_point(target).await?;
    info!("Successfully unmounted {}", target.display());
    Ok(())
}
/// Reports whether `target` is currently a mergerfs mount point.
///
/// Both `target` and each listed mount point are canonicalized before they
/// are compared; when canonicalization fails the path is compared as given.
///
/// # Errors
/// Propagates the failure to read the mount table.
async fn is_mounted(&self, target: &Path) -> Result<bool> {
    let mounts = self.parse_proc_mounts().await?;
    let wanted = match std::fs::canonicalize(target) {
        Ok(resolved) => resolved,
        Err(_) => target.to_path_buf(),
    };
    for entry in &mounts {
        let resolved = match std::fs::canonicalize(&entry.target) {
            Ok(path) => path,
            Err(_) => entry.target.clone(),
        };
        if resolved == wanted {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Lists every mergerfs mount found in the mount table.
///
/// # Errors
/// Propagates the failure to read the mount table.
async fn list_mounts(&self) -> Result<Vec<MountInfo>> {
    let mounts = self.parse_proc_mounts().await?;
    Ok(mounts)
}
/// Returns detailed information about the mergerfs mount at `target`, or
/// `None` when there is no such mount.
///
/// # Errors
/// Propagates any error from the underlying mount-table lookup.
async fn get_mount_info(&self, target: &Path) -> Result<Option<MountInfo>> {
    let details = self.get_detailed_mount_info(target).await?;
    Ok(details)
}
async fn check_health(&self) -> Result<()> {
let mergerfs_path =
self.mergerfs_path
.as_ref()
.ok_or_else(|| ThoughtsError::ToolNotFound {
tool: "mergerfs".to_string(),
})?;
if !Path::new("/dev/fuse").exists() {
return Err(ThoughtsError::MountOperationFailed {
message: "FUSE device not found. Is FUSE kernel module loaded?".to_string(),
});
}
let output = Command::new(mergerfs_path).arg("-V").output()?;
if !output.status.success() {
return Err(ThoughtsError::MountOperationFailed {
message: "Failed to get mergerfs version".to_string(),
});
}
debug!("mergerfs health check passed");
Ok(())
}
/// Renders the `mergerfs` command line that `mount` would run for these
/// arguments, as a single space-joined string.
///
/// Returns the placeholder `<mergerfs not available>` when no `mergerfs`
/// path is known. Arguments are joined verbatim, without shell quoting.
fn get_mount_command(
    &self,
    sources: &[PathBuf],
    target: &Path,
    options: &MountOptions,
) -> String {
    let rendered_args = Self::build_mount_args(sources, target, options).join(" ");
    self.mergerfs_path.as_ref().map_or_else(
        || "<mergerfs not available>".to_string(),
        |binary| format!("{} {}", binary.display(), rendered_args),
    )
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Platform description with a known `mergerfs` path and no `fusermount`.
    fn test_linux_info_stub() -> LinuxInfo {
        LinuxInfo {
            distro: "Ubuntu".to_string(),
            version: "22.04".to_string(),
            has_mergerfs: true,
            mergerfs_version: Some("2.33.5".to_string()),
            fuse_available: true,
            has_fusermount: false,
            mergerfs_path: Some(PathBuf::from("/usr/bin/mergerfs")),
            fusermount_path: None,
        }
    }

    /// The argument vector is `-o <options> <a:b> <target>`.
    #[test]
    fn test_build_mount_args() {
        let sources = vec![PathBuf::from("/tmp/a"), PathBuf::from("/tmp/b")];
        let target = Path::new("/mnt/merged");
        let options = MountOptions {
            read_only: true,
            ..Default::default()
        };
        let args = MergerfsManager::build_mount_args(&sources, target, &options);
        assert_eq!(args[0], "-o");
        assert!(args[1].contains("category.create=mfs"));
        // Match the exact option token: a substring check would also pass on
        // any unrelated option that merely contains the letters "ro".
        assert!(args[1].split(',').any(|opt| opt == "ro"));
        assert_eq!(args[2], "/tmp/a:/tmp/b");
        assert_eq!(args[3], "/mnt/merged");
    }

    /// Empty and non-existent sources are rejected before anything is mounted.
    #[tokio::test]
    async fn test_mount_validation() {
        let manager = MergerfsManager::new(test_linux_info_stub());
        let target = Path::new("/tmp/test_mount");
        let options = MountOptions::default();
        let result = manager.mount(&[], target, &options).await;
        assert!(result.is_err());
        let sources = vec![PathBuf::from("/this/does/not/exist")];
        let result = manager.mount(&sources, target, &options).await;
        assert!(result.is_err());
    }

    /// Without a `mergerfs` path the health check reports the missing tool.
    #[cfg(target_os = "linux")]
    #[tokio::test]
    async fn test_check_health_tool_missing() {
        let manager = MergerfsManager::new_without_fallback(None, None);
        let res = manager.check_health().await;
        assert!(matches!(res, Err(ThoughtsError::ToolNotFound { .. })));
    }

    /// A path supplied by platform detection is stored unchanged.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_manager_stores_absolute_path() {
        let mut info = test_linux_info_stub();
        let abs = PathBuf::from("/usr/local/bin/mergerfs");
        info.mergerfs_path = Some(abs.clone());
        let manager = MergerfsManager::new(info);
        assert_eq!(manager.mergerfs_path, Some(abs));
    }

    /// With no `mergerfs` path the rendered command is the placeholder text.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_get_mount_command_when_missing_path() {
        let manager = MergerfsManager::new_without_fallback(None, None);
        let cmd = manager.get_mount_command(
            &[PathBuf::from("/a")],
            Path::new("/b"),
            &MountOptions::default(),
        );
        assert_eq!(cmd, "<mergerfs not available>");
    }
}