use std::{
collections::HashMap,
fs,
path::{Path, PathBuf},
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use anyhow::{Context, Result, anyhow};
use mount::{BlobCachePool, ContentAddressedMount, FuseShell, MountOptions, PrewarmHandle};
use repo::{
Repository,
daemon::{MountRegistryFile, PersistedMount, mount_daemon_registry_path},
};
use tracing::{debug, warn};
fn default_blob_cache_cap_bytes() -> usize {
const FOUR_GIB: usize = 4 * 1024 * 1024 * 1024;
const FALLBACK: usize = 1024 * 1024 * 1024;
let physical = probe_physical_ram_bytes().unwrap_or(0);
if physical == 0 {
return FALLBACK;
}
std::cmp::min(FOUR_GIB, physical / 4)
}
fn probe_physical_ram_bytes() -> Option<usize> {
#[cfg(target_os = "macos")]
{
use std::process::Command;
let out = Command::new("sysctl")
.args(["-n", "hw.memsize"])
.output()
.ok()?;
if !out.status.success() {
return None;
}
let s = std::str::from_utf8(&out.stdout).ok()?.trim();
s.parse::<usize>().ok()
}
#[cfg(target_os = "linux")]
{
let s = std::fs::read_to_string("/proc/meminfo").ok()?;
for line in s.lines() {
if let Some(rest) = line.strip_prefix("MemTotal:") {
let n = rest.split_whitespace().next()?.parse::<usize>().ok()?;
return Some(n * 1024); }
}
None
}
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
{
None
}
}
pub struct LiveMount {
session: Option<mount::BackgroundSession>,
pub mount_path: PathBuf,
pub since_ms: u64,
_prewarm: Option<PrewarmHandle>,
}
impl LiveMount {
pub fn shutdown(&mut self) {
self.session = None;
self._prewarm = None;
}
}
pub enum MountOutcome {
Created,
Existing,
}
pub struct MountRegistry {
repo_root: PathBuf,
mounts: HashMap<String, LiveMount>,
blob_cache: Arc<BlobCachePool>,
}
impl MountRegistry {
pub fn new(repo_root: PathBuf) -> Self {
Self::with_blob_cache_capacity(repo_root, default_blob_cache_cap_bytes())
}
pub fn with_blob_cache_capacity(repo_root: PathBuf, cap_bytes: usize) -> Self {
Self {
repo_root,
mounts: HashMap::new(),
blob_cache: Arc::new(BlobCachePool::with_capacity(cap_bytes)),
}
}
#[allow(dead_code)]
pub fn blob_cache_pool(&self) -> &Arc<BlobCachePool> {
&self.blob_cache
}
#[allow(dead_code)]
pub fn is_empty(&self) -> bool {
self.mounts.is_empty()
}
pub fn len(&self) -> usize {
self.mounts.len()
}
pub fn mount(&mut self, thread_id: &str, mount_path: &Path) -> Result<MountOutcome> {
if let Some(existing) = self.mounts.get(thread_id) {
return if existing.mount_path == mount_path {
Ok(MountOutcome::Existing)
} else {
Err(anyhow!(
"thread '{thread_id}' is already mounted at {} (requested {})",
existing.mount_path.display(),
mount_path.display(),
))
};
}
fs::create_dir_all(mount_path)
.with_context(|| format!("create mount point {}", mount_path.display()))?;
let repo = Repository::open(&self.repo_root)
.with_context(|| format!("open repo at {} for mount", self.repo_root.display()))?;
let mount = ContentAddressedMount::with_options(
repo,
thread_id,
MountOptions {
blob_cache: Some(Arc::clone(&self.blob_cache)),
},
)
.map_err(|e| anyhow!("open content-addressed mount for {thread_id}: {e}"))?;
let prewarm = mount.prewarm();
let shell = FuseShell::new(mount);
let session = shell.mount_background(mount_path).map_err(|e| {
anyhow!(
"spawn FUSE background session at {}: {e}",
mount_path.display()
)
})?;
let since_ms = current_millis();
self.mounts.insert(
thread_id.to_string(),
LiveMount {
session: Some(session),
mount_path: mount_path.to_path_buf(),
since_ms,
_prewarm: Some(prewarm),
},
);
self.persist()?;
debug!(thread = thread_id, path = %mount_path.display(), "mount registered");
Ok(MountOutcome::Created)
}
pub fn unmount(&mut self, thread_id: &str) -> Result<bool> {
let Some(mut live) = self.mounts.remove(thread_id) else {
return Ok(false);
};
live.shutdown();
self.persist()?;
debug!(thread = thread_id, "mount unregistered");
Ok(true)
}
pub fn shutdown_all(&mut self) {
let drained: Vec<_> = self.mounts.drain().collect();
for (thread_id, mut live) in drained {
debug!(thread = %thread_id, path = %live.mount_path.display(), "unmounting on shutdown");
live.shutdown();
}
if let Err(error) = self.persist() {
warn!(%error, "failed to persist empty mount registry on shutdown");
}
let _ = fs::remove_file(mount_daemon_registry_path(&self.repo_root));
}
pub fn snapshot(&self) -> Vec<PersistedMount> {
self.mounts
.iter()
.map(|(thread_id, live)| PersistedMount {
thread_id: thread_id.clone(),
mount_path: live.mount_path.clone(),
pid: std::process::id(),
since_ms: live.since_ms,
})
.collect()
}
fn persist(&self) -> Result<()> {
let file = MountRegistryFile {
mounts: self.snapshot(),
};
let path = mount_daemon_registry_path(&self.repo_root);
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let bytes = serde_json::to_vec_pretty(&file).context("encode mount registry")?;
objects::fs_atomic::write_file_atomic(&path, &bytes)
.with_context(|| format!("persist mount registry at {}", path.display()))?;
Ok(())
}
#[doc(hidden)]
#[allow(non_snake_case)]
pub fn __test_inject_phantom_mount(&mut self, thread_id: &str, mount_path: PathBuf) {
self.mounts.insert(
thread_id.to_string(),
LiveMount {
session: None,
mount_path,
since_ms: current_millis(),
_prewarm: None,
},
);
}
}
fn current_millis() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
#[allow(dead_code)]
pub type SharedRegistry = Arc<std::sync::Mutex<MountRegistry>>;