use std::{
collections::{BTreeMap, BTreeSet, VecDeque},
ffi::{OsStr, OsString},
path::{Component, Path, PathBuf},
sync::{
Arc, Mutex, RwLock, Weak,
atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
},
thread::JoinHandle,
time::{Duration, Instant, SystemTime},
};
use objects::{
object::{
Attribution, Blob, ChangeId, ContentHash, EntryType, FileMode, State, Tree, TreeEntry,
TreeEntryTarget,
},
store::{AnyStore, ObjectStore},
sync::{LockExt, RwLockExt},
util::gitlink_placeholder_bytes,
};
use oplog::OpLog;
use refs::RefManager;
use repo::Repository;
use tracing::{debug, instrument, warn};
use crate::{
cache::BlobCachePool,
error::{MountError, Result},
shell::{
AttrUpdate, Attrs, DIR_UNIX_MODE, Entry, NodeId, NodeKind, PlatformShell, RenameOptions,
kind_for_mode,
},
};
/// Default for [`PromotionPolicy::idle_after`]: two seconds.
const DEFAULT_PROMOTION_IDLE: Duration = Duration::from_secs(2);
/// Default for [`PromotionPolicy::sweep_interval`]: a five second interval.
const DEFAULT_SWEEP_INTERVAL: Option<Duration> = Some(Duration::from_secs(5));
/// Size ceiling in bytes (100 MiB) that `validate_write_extent` and
/// `validate_truncate_size` enforce on the resulting file size.
pub(crate) const MAX_MOUNT_HOT_FILE_SIZE: u64 = 100 * 1024 * 1024;
/// Validates a write of `data_len` bytes at `offset` and returns the exclusive end
/// offset of the write as a `usize`.
///
/// # Errors
/// - `MountError::InvalidArgument` if `data_len` does not fit in `u64`, if
///   `offset + data_len` overflows `u64`, or if the end offset does not fit in `usize`.
/// - `MountError::FileTooLarge` if the end offset exceeds `MAX_MOUNT_HOT_FILE_SIZE`.
fn validate_write_extent(offset: u64, data_len: usize) -> Result<usize> {
    let Ok(data_len_u64) = u64::try_from(data_len) else {
        return Err(MountError::InvalidArgument(format!(
            "write length {data_len} does not fit in u64"
        )));
    };
    let Some(end) = offset.checked_add(data_len_u64) else {
        return Err(MountError::InvalidArgument(format!(
            "write offset {offset} + length {data_len} overflows u64"
        )));
    };
    // The size ceiling is checked before the platform-width conversion.
    if end > MAX_MOUNT_HOT_FILE_SIZE {
        return Err(MountError::FileTooLarge(format!(
            "write would extend file to {end} bytes (max {MAX_MOUNT_HOT_FILE_SIZE})"
        )));
    }
    match usize::try_from(end) {
        Ok(end) => Ok(end),
        Err(_) => Err(MountError::InvalidArgument(format!(
            "write extent end {end} does not fit in usize on this platform"
        ))),
    }
}
/// Validates a truncate target and returns it as a `usize`.
///
/// # Errors
/// - `MountError::FileTooLarge` if `new_size` exceeds `MAX_MOUNT_HOT_FILE_SIZE`.
/// - `MountError::InvalidArgument` if `new_size` does not fit in `usize`.
fn validate_truncate_size(new_size: u64) -> Result<usize> {
    if new_size <= MAX_MOUNT_HOT_FILE_SIZE {
        match usize::try_from(new_size) {
            Ok(size) => Ok(size),
            Err(_) => Err(MountError::InvalidArgument(format!(
                "truncate size {new_size} does not fit in usize on this platform"
            ))),
        }
    } else {
        Err(MountError::FileTooLarge(format!(
            "truncate to {new_size} bytes exceeds max {MAX_MOUNT_HOT_FILE_SIZE}"
        )))
    }
}
/// Tunables installed on a mount through `ContentAddressedMount::with_promotion_policy`.
///
/// NOTE(review): the consumers of these fields (the sweep worker and the idle-buffer
/// sweep) are not in this part of the file; the field descriptions below are inferred
/// from names and defaults and should be confirmed against them.
#[derive(Clone, Copy, Debug)]
pub struct PromotionPolicy {
/// Presumably how long a buffer must stay untouched before it is promoted.
/// Defaults to `DEFAULT_PROMOTION_IDLE`.
pub idle_after: Duration,
/// Presumably the period of the background sweep; `None` likely disables it.
/// Defaults to `DEFAULT_SWEEP_INTERVAL`.
pub sweep_interval: Option<Duration>,
}
impl Default for PromotionPolicy {
fn default() -> Self {
Self {
idle_after: DEFAULT_PROMOTION_IDLE,
sweep_interval: DEFAULT_SWEEP_INTERVAL,
}
}
}
/// What the inode table stores for one node id.
///
/// In this file the `Pending*` variants are interned for entries created through the
/// overlay (`create_file`, `make_dir`) or resolved from pending hits, while the other
/// variants are built from tree entries (`entry_from_tree_entry`) or, for `Root`, by
/// `Inodes::new`.
#[derive(Clone, Debug)]
enum NodeRecord {
/// The mount root; `tree` is the hash of its tree object.
Root {
tree: ContentHash,
},
/// A directory at `path` backed by the tree object `tree`.
Dir {
tree: ContentHash,
path: PathBuf,
},
/// A directory at `path` with no backing tree (`tree_for_record` yields an empty tree).
PendingDir {
path: PathBuf,
},
/// A file at `path` whose content is the blob `blob`.
File {
blob: ContentHash,
mode: FileMode,
path: PathBuf,
},
/// A gitlink entry at `path`, surfaced as a regular file containing `placeholder`.
Gitlink {
placeholder: Vec<u8>,
path: PathBuf,
},
/// A symlink identified only by the hash of its target blob (no path is stored).
Symlink {
blob: ContentHash,
},
/// A file at `path` that is tracked through the pending overlay rather than a blob hash.
PendingFile {
path: PathBuf,
mode: FileMode,
},
/// A symlink at `path` that is tracked through the pending overlay.
PendingSymlink {
path: PathBuf,
},
}
impl NodeRecord {
    /// Node kind reported to the shell for this record.
    ///
    /// Files derive their kind from their mode via `kind_for_mode`; gitlinks are
    /// reported as plain files.
    fn kind(&self) -> NodeKind {
        match self {
            Self::File { mode, .. } | Self::PendingFile { mode, .. } => kind_for_mode(*mode),
            Self::Gitlink { .. } => NodeKind::File,
            Self::Symlink { .. } | Self::PendingSymlink { .. } => NodeKind::Symlink,
            Self::Root { .. } | Self::Dir { .. } | Self::PendingDir { .. } => NodeKind::Directory,
        }
    }

    /// Unix mode bits reported for this record.
    fn unix_mode(&self) -> u32 {
        match self {
            Self::File { mode, .. } | Self::PendingFile { mode, .. } => mode.to_unix_mode(),
            Self::Gitlink { .. } => FileMode::Normal.to_unix_mode(),
            Self::Symlink { .. } | Self::PendingSymlink { .. } => FileMode::Symlink.to_unix_mode(),
            Self::Root { .. } | Self::Dir { .. } | Self::PendingDir { .. } => DIR_UNIX_MODE,
        }
    }
}
/// Inode table: maps node ids to their records and back.
#[derive(Default)]
struct Inodes {
/// Next id to hand out; `Inodes::new` starts it just past `NodeId::ROOT`.
next: u64,
/// Node id -> record.
by_id: BTreeMap<u64, NodeRecord>,
/// Reverse index for hash-keyed records (`Root` and `Symlink`).
by_hash: BTreeMap<HashKey, u64>,
/// Reverse index for path-keyed records (directories, files, gitlinks, pending entries).
by_path: BTreeMap<PathBuf, u64>,
}
/// Key of `Inodes::by_hash`: a content hash tagged with the kind of record it names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct HashKey {
/// Discriminator; this file uses `0` for root trees and `2` for symlink blobs.
kind: u8,
hash: ContentHash,
}
impl Inodes {
fn new(root_tree: ContentHash) -> Self {
let mut me = Self {
next: NodeId::ROOT.0 + 1,
by_id: BTreeMap::new(),
by_hash: BTreeMap::new(),
by_path: BTreeMap::new(),
};
me.by_id
.insert(NodeId::ROOT.0, NodeRecord::Root { tree: root_tree });
me.by_hash.insert(
HashKey {
kind: 0,
hash: root_tree,
},
NodeId::ROOT.0,
);
me
}
fn get(&self, id: NodeId) -> Option<NodeRecord> {
self.by_id.get(&id.0).cloned()
}
fn intern(&mut self, record: NodeRecord) -> NodeId {
match &record {
NodeRecord::Root { tree } => {
let key = HashKey {
kind: 0,
hash: *tree,
};
if let Some(&id) = self.by_hash.get(&key) {
return NodeId(id);
}
let id = self.next;
self.next += 1;
self.by_id.insert(id, record);
self.by_hash.insert(key, id);
NodeId(id)
}
NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
if let Some(&id) = self.by_path.get(path) {
self.by_id.insert(id, record);
return NodeId(id);
}
let id = self.next;
self.next += 1;
self.by_path.insert(path.clone(), id);
self.by_id.insert(id, record);
NodeId(id)
}
NodeRecord::File { path, .. }
| NodeRecord::Gitlink { path, .. }
| NodeRecord::PendingFile { path, .. }
| NodeRecord::PendingSymlink { path } => {
if let Some(&id) = self.by_path.get(path) {
self.by_id.insert(id, record);
return NodeId(id);
}
let id = self.next;
self.next += 1;
self.by_path.insert(path.clone(), id);
self.by_id.insert(id, record);
NodeId(id)
}
NodeRecord::Symlink { blob } => {
let key = HashKey {
kind: 2,
hash: *blob,
};
if let Some(&id) = self.by_hash.get(&key) {
return NodeId(id);
}
let id = self.next;
self.next += 1;
self.by_id.insert(id, record);
self.by_hash.insert(key, id);
NodeId(id)
}
}
}
fn forget(&mut self, id: NodeId) {
if id == NodeId::ROOT {
return;
}
if let Some(record) = self.by_id.remove(&id.0) {
match record {
NodeRecord::Root { tree } => {
self.by_hash.remove(&HashKey {
kind: 0,
hash: tree,
});
}
NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
if self.by_path.get(&path) == Some(&id.0) {
self.by_path.remove(&path);
}
}
NodeRecord::File { path, .. }
| NodeRecord::Gitlink { path, .. }
| NodeRecord::PendingFile { path, .. }
| NodeRecord::PendingSymlink { path } => {
if self.by_path.get(&path) == Some(&id.0) {
self.by_path.remove(&path);
}
}
NodeRecord::Symlink { blob } => {
self.by_hash.remove(&HashKey {
kind: 2,
hash: blob,
});
}
}
}
}
}
/// An in-memory file buffer held in `Pending::hot`, keyed there by node id.
struct HotBuffer {
/// Overlay path the buffer is currently bound to (rewritten on rename).
path: PathBuf,
mode: FileMode,
/// Current file contents.
bytes: Vec<u8>,
/// Set to `Instant::now()` when the buffer is created; presumably refreshed on
/// access and compared against `PromotionPolicy::idle_after` — confirm in the sweep code.
last_touched: Instant,
}
/// A blob-backed entry held in `Pending::warm`, keyed there by node id.
#[derive(Clone, Debug)]
struct PendingEntry {
/// Hash of the blob holding the content.
blob: ContentHash,
mode: FileMode,
/// Content size in bytes.
size: u64,
}
/// Lifecycle state tracked per node id in `Pending::state`.
///
/// Both variants carry the number of currently open handles (`on_open` increments it).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum NodeState {
/// The node has not been transitioned to orphan.
Live { open_count: u32 },
/// The node was transitioned to orphan (done here on unlink and when a rename
/// displaces the destination).
Orphan { open_count: u32 },
}
/// Uncommitted overlay state of the mount.
#[derive(Default)]
#[doc(hidden)]
pub struct Pending<'brand> {
/// Node id -> in-memory buffer.
hot: BTreeMap<u64, HotBuffer>,
/// Path -> node id of the hot buffer bound to that path.
hot_by_path: BTreeMap<PathBuf, u64>,
/// Node id -> blob-backed pending entry.
warm: BTreeMap<u64, PendingEntry>,
/// Paths recorded as removed (`unlink_*` and renames insert the old path here).
tombstones: BTreeSet<PathBuf>,
/// Directory paths recorded as removed by `rmdir_entry`.
dir_tombstones: BTreeSet<PathBuf>,
/// Directories created through the overlay (`make_dir`).
explicit_dirs: BTreeSet<PathBuf>,
/// Path -> symlink target bytes for overlay symlinks.
symlinks: BTreeMap<PathBuf, Vec<u8>>,
/// Node id -> lifecycle state and open-handle count.
state: BTreeMap<u64, NodeState>,
/// Zero-sized marker that ties the value to `'brand`; the `fn(&'brand ()) -> &'brand ()`
/// shape makes the type invariant in `'brand`.
_brand: std::marker::PhantomData<fn(&'brand ()) -> &'brand ()>,
}
impl<'brand> Pending<'brand> {
    /// True when `id` has a recorded state and that state is `Orphan`.
    fn is_orphan(&self, id: u64) -> bool {
        self.state
            .get(&id)
            .is_some_and(|state| matches!(state, NodeState::Orphan { .. }))
    }

    /// Open-handle count recorded for `id`; `0` when the node has no state entry.
    fn open_count(&self, id: u64) -> u32 {
        self.state.get(&id).map_or(0, |state| match *state {
            NodeState::Live { open_count } | NodeState::Orphan { open_count } => open_count,
        })
    }

    /// Copy of the lifecycle state recorded for `id`, if any.
    pub(crate) fn lookup_state(&self, id: u64) -> Option<NodeState> {
        let state = self.state.get(&id)?;
        Some(*state)
    }

    /// Marks the witnessed node as `Orphan`, carrying over its current open count.
    pub(crate) fn apply_transition_to_orphan(
        &mut self,
        w: &crate::pending::Witness<'_, 'brand, crate::pending::Orphan>,
    ) {
        let id = w.id();
        let orphaned = NodeState::Orphan {
            open_count: self.open_count(id),
        };
        self.state.insert(id, orphaned);
    }

    /// Iterates over every `(node id, state)` pair by value.
    pub(crate) fn lifecycle_iter(&self) -> impl Iterator<Item = (u64, NodeState)> + '_ {
        self.state.iter().map(|(id, state)| (*id, *state))
    }

    /// Drops the hot buffer and lifecycle state of the witnessed node.
    ///
    /// The path index entry is only removed when it still points at this node.
    /// Returns whether a warm entry for the node is still present.
    pub(crate) fn apply_kernel_forget(
        &mut self,
        w: &crate::pending::KernelForgetWitness<'_, 'brand>,
    ) -> bool {
        let id = w.id();
        if let Some(buf) = self.hot.remove(&id) {
            if self.hot_by_path.get(&buf.path) == Some(&id) {
                self.hot_by_path.remove(&buf.path);
            }
        }
        self.state.remove(&id);
        self.warm.contains_key(&id)
    }

    /// Clears all path-keyed overlay state and keeps only the id-keyed entries
    /// (`hot`, `warm`, `state`) whose id is in `surviving`.
    pub(crate) fn apply_drain_for_capture(&mut self, surviving: &BTreeSet<u64>) {
        self.hot_by_path.clear();
        self.tombstones.clear();
        self.dir_tombstones.clear();
        self.explicit_dirs.clear();
        self.symlinks.clear();
        self.hot.retain(|id, _| surviving.contains(id));
        self.warm.retain(|id, _| surviving.contains(id));
        self.state.retain(|id, _| surviving.contains(id));
    }

    /// Test helper: force the lifecycle state of `id`.
    #[cfg(test)]
    pub(crate) fn test_insert_state(&mut self, id: u64, state: NodeState) {
        self.state.insert(id, state);
    }

    /// Test helper: install a hot buffer for `id` (the path index is not touched).
    #[cfg(test)]
    pub(crate) fn test_insert_hot(&mut self, id: u64, path: PathBuf, bytes: Vec<u8>) {
        let buffer = HotBuffer {
            path,
            mode: FileMode::Normal,
            bytes,
            last_touched: Instant::now(),
        };
        self.hot.insert(id, buffer);
    }

    /// Test helper: whether `id` currently has a hot buffer.
    #[cfg(test)]
    pub(crate) fn test_has_hot(&self, id: u64) -> bool {
        self.hot.get(&id).is_some()
    }
}
/// A mount over a repository thread, backed by an object store `S`.
///
/// Owns the shared [`MountInner`] plus the handle of the background sweep worker,
/// which is signalled and joined when the mount is dropped.
pub struct ContentAddressedMount<S: ObjectStore + 'static = AnyStore> {
/// State shared with background threads.
inner: Arc<MountInner<S>>,
/// Handle of the current sweep worker, if one was spawned.
sweeper: Mutex<Option<SweepHandle>>,
}
/// Shared state of a mount, held behind an `Arc` by the mount and (weakly) by prewarm workers.
pub(crate) struct MountInner<S: ObjectStore> {
/// Repository the mount reads from.
repo: Repository<RefManager, OpLog, S>,
/// Name of the thread this mount tracks.
thread: String,
/// Change id and root tree the mount currently resolves to (replaced by `refresh`).
state: RwLock<MountState>,
/// Inode table.
inodes: Mutex<Inodes>,
/// Uncommitted overlay state.
pending: Mutex<Pending<'static>>,
/// Current promotion policy.
promotion: RwLock<PromotionPolicy>,
/// Wall-clock time captured when the mount was constructed.
mounted_at: SystemTime,
/// Taken by the namespace-mutating entry points (create, mkdir, unlink, rmdir, rename)
/// for their whole duration.
write_mu: Mutex<()>,
/// Blob byte cache consulted before the object store.
blob_cache: Arc<BlobCachePool>,
}
/// Owner side of the sweep worker: signals shutdown and joins the thread on drop.
struct SweepHandle {
/// Shutdown flag shared with the worker.
state: Arc<SweepShutdown>,
/// Join handle of the worker; taken (set to `None`) once joined.
join: Option<JoinHandle<()>>,
}
/// Shutdown flag plus the condition variable used to wake a waiter when it is set.
struct SweepShutdown {
/// `true` once shutdown has been requested.
shutdown: Mutex<bool>,
/// Notified by `signal`; waited on (with a timeout) by `wait`.
cv: std::sync::Condvar,
}
impl SweepShutdown {
    /// Creates the flag in the "not shut down" state.
    fn new() -> Self {
        let shutdown = Mutex::new(false);
        let cv = std::sync::Condvar::new();
        Self { shutdown, cv }
    }

    /// Requests shutdown and wakes every waiter.
    fn signal(&self) {
        {
            // Release the lock before notifying, so the guard does not outlive the store.
            let mut requested = self.shutdown.lock_or_poisoned();
            *requested = true;
        }
        self.cv.notify_all();
    }

    /// Blocks for up to `dur`, returning early once shutdown is requested.
    ///
    /// Returns `true` when shutdown has been requested.
    ///
    /// # Panics
    /// Panics if the condition-variable wait reports an error (poisoned mutex).
    fn wait(&self, dur: Duration) -> bool {
        let guard = self.shutdown.lock_or_poisoned();
        let (guard, _timed_out) = self
            .cv
            .wait_timeout_while(guard, dur, |requested| !*requested)
            .expect("sweep shutdown wait");
        *guard
    }
}
impl SweepHandle {
    /// Signals shutdown and joins the worker thread, if it has not been joined yet.
    ///
    /// A panic in the worker is ignored. Calling this a second time only re-signals.
    fn signal_and_join(&mut self) {
        self.state.signal();
        let Some(worker) = self.join.take() else {
            return;
        };
        let _ = worker.join();
    }
}
impl Drop for SweepHandle {
    /// Stops the sweep worker when the handle goes away.
    fn drop(&mut self) {
        Self::signal_and_join(self);
    }
}
/// Number of worker threads `prewarm_run` tries to spawn.
const PREWARM_WORKERS: usize = 4;
/// Percentage of the blob cache capacity at which prewarming stops loading more blobs.
const PREWARM_FULL_FRACTION: u8 = 90;
/// Counters reported by a prewarm run.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PrewarmStats {
/// Blob and symlink entries encountered while walking the trees.
pub hashes_discovered: u64,
/// Hashes a worker picked up (whether or not they were then loaded).
pub hashes_visited: u64,
/// Visited hashes that were already present in the blob cache.
pub already_cached: u64,
/// Visited hashes whose bytes were fetched from the store and inserted into the cache.
pub loaded: u64,
/// Whether the run finished without being cancelled or stopped by a full cache.
pub completed: bool,
}
/// Handle to a running prewarm; dropping it cancels the run and joins the coordinator.
pub struct PrewarmHandle {
/// Cancellation flag shared with the coordinator and its workers.
cancel: Arc<AtomicBool>,
/// Coordinator thread; `None` if spawning failed or after it has been joined.
join: Option<JoinHandle<PrewarmStats>>,
}
impl PrewarmHandle {
fn start<S: ObjectStore + 'static>(weak: Weak<MountInner<S>>) -> Self {
let cancel = Arc::new(AtomicBool::new(false));
let cancel_for_worker = Arc::clone(&cancel);
let join = std::thread::Builder::new()
.name("heddle-prewarm-coordinator".to_string())
.spawn(move || prewarm_run(weak, cancel_for_worker))
.ok();
Self { cancel, join }
}
pub fn cancel(&self) {
self.cancel.store(true, Ordering::SeqCst);
}
pub fn wait(mut self) -> PrewarmStats {
self.join
.take()
.and_then(|h| h.join().ok())
.unwrap_or_default()
}
}
impl Drop for PrewarmHandle {
    /// Cancels the run and blocks until the coordinator thread has exited.
    fn drop(&mut self) {
        self.cancel.store(true, Ordering::SeqCst);
        let Some(coordinator) = self.join.take() else {
            return;
        };
        // A panicked coordinator is ignored here.
        let _ = coordinator.join();
    }
}
/// Coordinator body of a prewarm: collects blob hashes reachable from the mount's
/// current root tree, then loads them into the blob cache from worker threads.
///
/// Phase 1 walks the trees breadth-first on the calling thread. Trees that cannot be
/// loaded are skipped; gitlink entries are ignored. Phase 2 spawns up to
/// `PREWARM_WORKERS` threads that pull hashes from a shared cursor. A worker stops when
/// cancellation is requested, when the mount has been dropped, when the cursor runs
/// past the end, or when the cache's resident bytes reach `PREWARM_FULL_FRACTION`
/// percent of its capacity. Store errors and missing blobs are skipped silently.
///
/// `completed` is only `true` when every collected hash was visited and the run was
/// neither cancelled nor stopped by a full cache. In particular it is `false` when no
/// worker could be spawned or when workers bailed out because the mount went away.
///
/// Returns default statistics when the mount is already gone at entry.
fn prewarm_run<S: ObjectStore + 'static>(
    weak: Weak<MountInner<S>>,
    cancel: Arc<AtomicBool>,
) -> PrewarmStats {
    let Some(inner) = weak.upgrade() else {
        return PrewarmStats::default();
    };
    let mut stats = PrewarmStats::default();
    let mut hashes: Vec<ContentHash> = Vec::new();
    let root_tree = inner.state.read_or_poisoned().tree;
    let mut queue: VecDeque<ContentHash> = VecDeque::from([root_tree]);
    let mut seen_trees: std::collections::HashSet<ContentHash> = std::collections::HashSet::new();
    while let Some(tree_hash) = queue.pop_front() {
        if cancel.load(Ordering::Relaxed) {
            return stats;
        }
        // Identical subtrees can be reachable through several parents; walk each once.
        if !seen_trees.insert(tree_hash) {
            continue;
        }
        let Ok(Some(tree)) = inner.repo.store().get_tree(&tree_hash) else {
            continue;
        };
        for entry in tree.entries() {
            match entry.entry_type() {
                EntryType::Tree => {
                    if let Some(hash) = entry.tree_hash() {
                        queue.push_back(hash);
                    }
                }
                EntryType::Blob | EntryType::Symlink => {
                    if let Some(hash) = entry.content_hash() {
                        hashes.push(hash);
                    }
                    stats.hashes_discovered += 1;
                }
                EntryType::Gitlink => {}
            }
        }
    }
    // Do not keep the mount alive while the workers run; they re-upgrade per item.
    drop(inner);
    if hashes.is_empty() {
        stats.completed = true;
        return stats;
    }
    let hashes = Arc::new(hashes);
    let cursor = Arc::new(AtomicUsize::new(0));
    let visited = Arc::new(AtomicU32::new(0));
    let already = Arc::new(AtomicU32::new(0));
    let loaded = Arc::new(AtomicU32::new(0));
    let stop_full = Arc::new(AtomicBool::new(false));
    let mut workers = Vec::with_capacity(PREWARM_WORKERS);
    for worker_id in 0..PREWARM_WORKERS {
        let weak = weak.clone();
        let cancel = Arc::clone(&cancel);
        let hashes = Arc::clone(&hashes);
        let cursor = Arc::clone(&cursor);
        let visited = Arc::clone(&visited);
        let already = Arc::clone(&already);
        let loaded = Arc::clone(&loaded);
        let stop_full = Arc::clone(&stop_full);
        let handle = std::thread::Builder::new()
            .name(format!("heddle-prewarm-{worker_id}"))
            .spawn(move || {
                loop {
                    if cancel.load(Ordering::Relaxed) || stop_full.load(Ordering::Relaxed) {
                        return;
                    }
                    let idx = cursor.fetch_add(1, Ordering::Relaxed);
                    if idx >= hashes.len() {
                        return;
                    }
                    let hash = hashes[idx];
                    let Some(inner) = weak.upgrade() else {
                        return;
                    };
                    visited.fetch_add(1, Ordering::Relaxed);
                    if inner.blob_cache.get(&hash).is_some() {
                        already.fetch_add(1, Ordering::Relaxed);
                        continue;
                    }
                    let pool = &inner.blob_cache;
                    let full_threshold = pool
                        .cap_bytes()
                        .saturating_mul(usize::from(PREWARM_FULL_FRACTION))
                        / 100;
                    if pool.resident_bytes() >= full_threshold {
                        stop_full.store(true, Ordering::Relaxed);
                        return;
                    }
                    match inner.repo.store().get_blob_bytes(&hash) {
                        Ok(Some(bytes)) => {
                            pool.insert(hash, bytes);
                            loaded.fetch_add(1, Ordering::Relaxed);
                        }
                        // Best effort: a missing or unreadable blob is simply not cached.
                        Ok(None) | Err(_) => {}
                    }
                }
            })
            .ok();
        // A failed spawn just means fewer workers; the others still drain the cursor.
        if let Some(h) = handle {
            workers.push(h);
        }
    }
    for w in workers {
        let _ = w.join();
    }
    stats.hashes_visited = u64::from(visited.load(Ordering::Relaxed));
    stats.already_cached = u64::from(already.load(Ordering::Relaxed));
    stats.loaded = u64::from(loaded.load(Ordering::Relaxed));
    // Workers can exit without draining the list (no thread spawned, mount dropped),
    // so "not cancelled and not full" alone does not prove the run finished.
    let every_hash_visited =
        u64::try_from(hashes.len()).is_ok_and(|total| total == stats.hashes_visited);
    stats.completed = every_hash_visited
        && !cancel.load(Ordering::Relaxed)
        && !stop_full.load(Ordering::Relaxed);
    stats
}
impl<S: ObjectStore + 'static> Drop for ContentAddressedMount<S> {
    /// Stops and joins the sweep worker, if one is running.
    fn drop(&mut self) {
        let mut slot = self.sweeper.lock_or_poisoned();
        if let Some(mut worker) = slot.take() {
            worker.signal_and_join();
        }
    }
}
/// What the mount's thread currently resolves to (produced by `resolve_thread`).
#[derive(Clone, Copy, Debug)]
struct MountState {
/// Change id reported by `current_change_id`.
change_id: ChangeId,
/// Hash of the root tree.
tree: ContentHash,
}
/// Optional construction parameters for `ContentAddressedMount::with_options`.
#[derive(Clone, Default)]
pub struct MountOptions {
/// Blob cache to use; when `None` the mount creates one with the default capacity.
pub blob_cache: Option<Arc<BlobCachePool>>,
}
impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
/// Creates a mount for `thread` with default [`MountOptions`].
///
/// # Errors
/// Propagates any error from `with_options`.
pub fn new(repo: Repository<RefManager, OpLog, S>, thread: impl Into<String>) -> Result<Self> {
    let options = MountOptions::default();
    Self::with_options(repo, thread, options)
}
/// Creates a mount for `thread`, resolving it against `repo` and starting the sweep worker.
///
/// Uses `options.blob_cache` when provided, otherwise a pool with the default capacity.
///
/// # Errors
/// Fails when `resolve_thread` cannot resolve `thread`.
pub fn with_options(
    repo: Repository<RefManager, OpLog, S>,
    thread: impl Into<String>,
    options: MountOptions,
) -> Result<Self> {
    let thread: String = thread.into();
    let resolved = resolve_thread(&repo, &thread)?;
    let inodes = Mutex::new(Inodes::new(resolved.tree));
    let blob_cache = match options.blob_cache {
        Some(shared) => shared,
        None => Arc::new(BlobCachePool::with_default_capacity()),
    };
    let inner = Arc::new(MountInner {
        repo,
        thread,
        state: RwLock::new(resolved),
        inodes,
        pending: Mutex::new(Pending::default()),
        promotion: RwLock::new(PromotionPolicy::default()),
        mounted_at: SystemTime::now(),
        blob_cache,
        write_mu: Mutex::new(()),
    });
    let sweeper = Mutex::new(spawn_sweep_worker(&inner));
    Ok(Self { inner, sweeper })
}
pub fn blob_cache_pool(&self) -> &Arc<BlobCachePool> {
&self.inner.blob_cache
}
/// Replaces the promotion policy and restarts the sweep worker under it.
///
/// The running worker (if any) is signalled and joined before the policy is swapped.
pub fn with_promotion_policy(self, policy: PromotionPolicy) -> Self {
    {
        let mut slot = self.sweeper.lock_or_poisoned();
        if let Some(mut previous) = slot.take() {
            previous.signal_and_join();
        }
    }
    {
        let mut current = self.inner.promotion.write_or_poisoned();
        *current = policy;
    }
    let replacement = spawn_sweep_worker(&self.inner);
    {
        let mut slot = self.sweeper.lock_or_poisoned();
        *slot = replacement;
    }
    self
}
/// Re-resolves the mount's thread and stores the resulting change id and root tree.
///
/// # Errors
/// Fails when `resolve_thread` fails; the stored state is left untouched in that case.
pub fn refresh(&self) -> Result<()> {
    let inner = &self.inner;
    let resolved = resolve_thread(&inner.repo, &inner.thread)?;
    let mut state = inner.state.write_or_poisoned();
    *state = resolved;
    Ok(())
}
/// Name of the thread this mount tracks.
pub fn thread(&self) -> &str {
    self.inner.thread.as_str()
}
/// Change id the mount currently resolves to (as of construction or the last `refresh`).
pub fn current_change_id(&self) -> ChangeId {
    let state = self.inner.state.read_or_poisoned();
    state.change_id
}
fn store(&self) -> &S {
self.inner.repo.store()
}
/// Loads the tree object `hash` from the store.
///
/// # Errors
/// Propagates store errors; yields `MountError::NotFound` when the tree is absent.
fn load_tree(&self, hash: &ContentHash) -> Result<Tree> {
    match self.store().get_tree(hash)? {
        Some(tree) => Ok(tree),
        None => Err(MountError::NotFound(format!("tree {hash}"))),
    }
}
/// Empties the mount's blob cache and then asks the store to clear its recent caches.
pub fn clear_blob_cache(&self) {
    let inner = &self.inner;
    inner.blob_cache.clear();
    inner.repo.store().clear_recent_caches();
}
/// Starts a background prewarm of the blob cache and returns its handle.
///
/// The prewarm only holds a weak reference to the mount's shared state.
pub fn prewarm(&self) -> PrewarmHandle {
    let weak = Arc::downgrade(&self.inner);
    PrewarmHandle::start(weak)
}
/// Returns the bytes of blob `hash`, serving from the blob cache when possible.
///
/// On a cache miss the bytes are fetched from the store and inserted into the cache.
///
/// # Errors
/// Propagates store errors; yields `MountError::NotFound` when the blob is absent.
fn load_blob_bytes(&self, hash: &ContentHash) -> Result<bytes::Bytes> {
    let cache = &self.inner.blob_cache;
    if let Some(hit) = cache.get(hash) {
        return Ok(hit);
    }
    let Some(bytes) = self.store().get_blob_bytes(hash)? else {
        return Err(MountError::NotFound(format!("blob {hash}")));
    };
    cache.insert(*hash, bytes.clone());
    Ok(bytes)
}
/// Size in bytes of blob `hash`, as reported by the store.
///
/// # Errors
/// Propagates store errors; yields `MountError::NotFound` when the blob is absent.
fn blob_size(&self, hash: &ContentHash) -> Result<u64> {
    match self.store().blob_size(hash)? {
        Some(size) => Ok(size),
        None => Err(MountError::NotFound(format!("blob {hash}"))),
    }
}
/// Returns a clone of the inode record stored for `id`.
///
/// The inode mutex is taken through `lock_or_poisoned`, the same helper every other
/// access to this mutex in the file uses, so poisoning is handled uniformly.
///
/// # Errors
/// Yields `MountError::Stale` when no record is stored for `id`.
fn record_for(&self, id: NodeId) -> Result<NodeRecord> {
    self.inner
        .inodes
        .lock_or_poisoned()
        .get(id)
        .ok_or_else(|| MountError::Stale(format!("node {}", id.0)))
}
/// Interns `record` in the inode table and returns its node id.
fn intern(&self, record: NodeRecord) -> NodeId {
    let mut table = self.inner.inodes.lock_or_poisoned();
    table.intern(record)
}
/// Resolves `path` to a node id by looking up each component from the root.
///
/// `.` and root components are skipped, so an empty path resolves to the root.
///
/// # Errors
/// Yields `MountError::NotFound` for prefix components, for `..`, and for any
/// component that does not exist; propagates errors from `lookup`.
pub fn lookup_path(&self, path: impl AsRef<Path>) -> Result<NodeId> {
    let path = path.as_ref();
    let mut node = NodeId::ROOT;
    for component in path.components() {
        let name = match component {
            Component::Normal(name) => name,
            Component::CurDir | Component::RootDir => continue,
            Component::Prefix(_) => {
                return Err(MountError::NotFound(format!(
                    "unsupported path component in {}",
                    path.display()
                )));
            }
            Component::ParentDir => {
                return Err(MountError::NotFound(format!(
                    "parent traversal not supported: {}",
                    path.display()
                )));
            }
        };
        let Some(entry) = self.lookup(node, name)? else {
            return Err(MountError::NotFound(name.to_string_lossy().into_owned()));
        };
        node = entry.node;
    }
    Ok(node)
}
/// Builds the shell `Entry` for a tree entry that lives under `parent_path`, interning
/// the matching inode record.
///
/// The reported size is the number of subtree entries for directories, the blob size
/// for files and symlinks, and the placeholder length for gitlinks. Kind and mode come
/// from the record itself.
///
/// # Errors
/// Propagates failures from loading the subtree or querying the blob size.
fn entry_from_tree_entry(&self, parent_path: &Path, tree_entry: &TreeEntry) -> Result<Entry> {
    let entry_path = join_child(parent_path, tree_entry.name());
    let (size, record) = match tree_entry.target() {
        TreeEntryTarget::Tree { hash } => {
            let child_count = self.load_tree(hash)?.entries().len() as u64;
            let record = NodeRecord::Dir {
                tree: *hash,
                path: entry_path,
            };
            (child_count, record)
        }
        TreeEntryTarget::Blob { hash, executable } => {
            let size = self.blob_size(hash)?;
            let mode = match *executable {
                true => FileMode::Executable,
                false => FileMode::Normal,
            };
            let record = NodeRecord::File {
                blob: *hash,
                mode,
                path: entry_path,
            };
            (size, record)
        }
        TreeEntryTarget::Symlink { hash } => {
            let size = self.blob_size(hash)?;
            (size, NodeRecord::Symlink { blob: *hash })
        }
        TreeEntryTarget::Gitlink { target } => {
            let placeholder = gitlink_placeholder_bytes(target);
            let size = placeholder.len() as u64;
            let record = NodeRecord::Gitlink {
                placeholder,
                path: entry_path,
            };
            (size, record)
        }
    };
    // Read kind and mode before the record is moved into the inode table.
    let kind = record.kind();
    let unix_mode = record.unix_mode();
    let node = self.intern(record);
    Ok(Entry {
        node,
        name: OsString::from(tree_entry.name()),
        kind,
        size,
        unix_mode,
    })
}
/// Turns a pending-overlay hit for `path` into a shell `Entry` named `name`.
///
/// Returns `None` for a tombstone. Hot hits reuse the node id they carry; warm and
/// symlink hits intern a pending record for `path` to obtain one.
fn entry_from_pending_hit(&self, hit: PendingHit, path: &Path, name: &OsStr) -> Option<Entry> {
    let (node, kind, size, unix_mode) = match hit {
        PendingHit::Tombstone => return None,
        PendingHit::Hot { node, size, mode } => {
            (node, kind_for_mode(mode), size, mode.to_unix_mode())
        }
        PendingHit::Warm {
            blob: _,
            size,
            mode,
        } => {
            let node = self.intern(NodeRecord::PendingFile {
                path: path.to_path_buf(),
                mode,
            });
            (node, kind_for_mode(mode), size, mode.to_unix_mode())
        }
        PendingHit::Symlink { target_len } => {
            let node = self.intern(NodeRecord::PendingSymlink {
                path: path.to_path_buf(),
            });
            (
                node,
                NodeKind::Symlink,
                target_len,
                FileMode::Symlink.to_unix_mode(),
            )
        }
    };
    Some(Entry {
        node,
        name: name.to_os_string(),
        kind,
        size,
        unix_mode,
    })
}
/// Returns the tree behind a directory record; pending directories yield an empty tree.
///
/// # Errors
/// Yields `MountError::NotADirectory` for non-directory records and propagates
/// failures from `load_tree`.
fn tree_for_record(&self, record: &NodeRecord) -> Result<Tree> {
    match record {
        NodeRecord::PendingDir { .. } => Ok(Tree::new()),
        NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => self.load_tree(tree),
        _ => Err(MountError::NotADirectory(format!("{record:?}"))),
    }
}
/// Overlay path of a directory record: empty for the root, `None` for non-directories.
fn dir_path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
    let path = match record {
        NodeRecord::Root { .. } => return Some(PathBuf::new()),
        NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => path,
        _ => return None,
    };
    Some(path.clone())
}
/// Overlay path stored in `record`, or `None` for the variants that carry no path
/// (`Root` and hash-keyed `Symlink`).
fn path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
    match record {
        NodeRecord::Root { .. } | NodeRecord::Symlink { .. } => None,
        NodeRecord::PendingFile { path, .. }
        | NodeRecord::File { path, .. }
        | NodeRecord::Gitlink { path, .. }
        | NodeRecord::Dir { path, .. }
        | NodeRecord::PendingDir { path }
        | NodeRecord::PendingSymlink { path } => Some(path.clone()),
    }
}
/// Runs one idle-buffer sweep by delegating to `MountInner::sweep_idle_buffers`.
fn promote_idle_buffers(&self) -> Result<()> {
    let inner = &self.inner;
    inner.sweep_idle_buffers()
}
/// Flushes `node` by delegating to `MountInner::flush_node`.
pub fn flush_node(&self, node: NodeId) -> Result<()> {
    let inner = &self.inner;
    inner.flush_node(node)
}
/// Releases `node` by delegating to `MountInner::release_node`.
pub fn release_node(&self, node: NodeId) -> Result<()> {
    let inner = &self.inner;
    inner.release_node(node)
}
/// Records one more open handle on `node`.
///
/// A node without lifecycle state becomes `Live` with a count of one; otherwise the
/// existing state (live or orphan) keeps its variant and its count is incremented,
/// saturating at `u32::MAX`.
pub fn on_open(&self, node: NodeId) -> Result<()> {
    let mut pending = self.inner.pending.lock_or_poisoned();
    let state = pending
        .state
        .entry(node.0)
        .or_insert(NodeState::Live { open_count: 0 });
    match state {
        NodeState::Live { open_count } | NodeState::Orphan { open_count } => {
            *open_count = open_count.saturating_add(1);
        }
    }
    Ok(())
}
/// Removes `path` from the overlay by path, without going through a parent lookup.
///
/// Drops the hot, warm and lifecycle entries of both the hot buffer bound to `path`
/// and the inode bound to `path`, forgets any overlay symlink there, records a
/// tombstone, and finally unbinds the path in the inode table.
///
/// NOTE(review): unlike `unlink_entry`, this does not take `write_mu` — confirm that
/// callers serialize it some other way.
pub fn unlink_path(&self, path: impl AsRef<Path>) -> Result<()> {
    let path = path.as_ref().to_path_buf();
    let bound_id = {
        let inodes = self.inner.inodes.lock_or_poisoned();
        inodes.by_path.get(&path).copied()
    };
    {
        let mut pending = self.inner.pending.lock_or_poisoned();
        let hot_id = pending.hot_by_path.remove(&path);
        // Purge the hot-bound id first, then the inode-bound id (they may coincide).
        for node_id in hot_id.into_iter().chain(bound_id) {
            pending.hot.remove(&node_id);
            pending.warm.remove(&node_id);
            pending.state.remove(&node_id);
        }
        pending.symlinks.remove(&path);
        pending.tombstones.insert(path.clone());
    }
    if bound_id.is_some() {
        let mut inodes = self.inner.inodes.lock_or_poisoned();
        inodes.by_path.remove(&path);
    }
    Ok(())
}
/// Creates an empty file `name` under `parent`, backed by a fresh hot buffer.
///
/// When the name already exists, the existing entry is returned unless `exclusive`
/// is set. Any tombstone or explicit-directory marker at the new path is cleared.
///
/// # Errors
/// - Whatever `validate_entry_name`, `lookup` or `record_for` report.
/// - `MountError::AlreadyExists` when `exclusive` is set and the name exists.
/// - `MountError::NotADirectory` when `parent` is not a directory.
pub fn create_file(
    &self,
    parent: NodeId,
    name: &OsStr,
    mode: FileMode,
    exclusive: bool,
) -> Result<Entry> {
    let _write_guard = self.inner.write_mu.lock_or_poisoned();
    let name_str = validate_entry_name(name)?;
    match self.lookup(parent, name)? {
        Some(_) if exclusive => {
            return Err(MountError::AlreadyExists(name_str.to_string()));
        }
        Some(existing) => return Ok(existing),
        None => {}
    }
    let parent_record = self.record_for(parent)?;
    let Some(parent_path) = self.dir_path_of(&parent_record) else {
        return Err(MountError::NotADirectory(format!("{parent_record:?}")));
    };
    let child_path = join_child(&parent_path, name_str);
    {
        let mut pending = self.inner.pending.lock_or_poisoned();
        pending.tombstones.remove(&child_path);
        pending.explicit_dirs.remove(&child_path);
    }
    // The pending lock is released around `intern`, which takes the inode lock.
    let node = self.intern(NodeRecord::PendingFile {
        path: child_path.clone(),
        mode,
    });
    {
        let mut pending = self.inner.pending.lock_or_poisoned();
        let buffer = HotBuffer {
            path: child_path.clone(),
            mode,
            bytes: Vec::new(),
            last_touched: Instant::now(),
        };
        pending.hot.insert(node.0, buffer);
        pending.hot_by_path.insert(child_path, node.0);
    }
    Ok(Entry {
        node,
        name: name.to_os_string(),
        kind: kind_for_mode(mode),
        size: 0,
        unix_mode: mode.to_unix_mode(),
    })
}
/// Creates directory `name` under `parent` as an explicit overlay directory.
///
/// Any file or directory tombstone at the new path is cleared.
///
/// # Errors
/// - Whatever `validate_entry_name`, `lookup` or `record_for` report.
/// - `MountError::AlreadyExists` when the name already exists.
/// - `MountError::NotADirectory` when `parent` is not a directory.
pub fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
    let _write_guard = self.inner.write_mu.lock_or_poisoned();
    let name_str = validate_entry_name(name)?;
    if let Some(_existing) = self.lookup(parent, name)? {
        return Err(MountError::AlreadyExists(name_str.to_string()));
    }
    let parent_record = self.record_for(parent)?;
    let Some(parent_path) = self.dir_path_of(&parent_record) else {
        return Err(MountError::NotADirectory(format!("{parent_record:?}")));
    };
    let child_path = join_child(&parent_path, name_str);
    {
        let mut pending = self.inner.pending.lock_or_poisoned();
        pending.dir_tombstones.remove(&child_path);
        pending.tombstones.remove(&child_path);
        pending.explicit_dirs.insert(child_path.clone());
    }
    let node = self.intern(NodeRecord::PendingDir { path: child_path });
    Ok(Entry {
        node,
        name: name.to_os_string(),
        kind: NodeKind::Directory,
        size: 0,
        unix_mode: DIR_UNIX_MODE,
    })
}
/// Unlinks the non-directory entry `name` under `parent`.
///
/// The node is transitioned to orphan rather than dropped (the result of the
/// transition is ignored), its path bindings are removed, and a tombstone is
/// recorded for the path.
///
/// # Errors
/// - Whatever `validate_entry_name`, `lookup` or `record_for` report.
/// - `MountError::NotFound` when the name does not exist.
/// - `MountError::IsADirectory` when the entry is a directory.
/// - `MountError::NotADirectory` when `parent` is not a directory.
pub fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
    let _write_guard = self.inner.write_mu.lock_or_poisoned();
    let name_str = validate_entry_name(name)?;
    let Some(entry) = self.lookup(parent, name)? else {
        return Err(MountError::NotFound(name_str.to_string()));
    };
    if matches!(entry.kind, NodeKind::Directory) {
        return Err(MountError::IsADirectory(name_str.to_string()));
    }
    let parent_record = self.record_for(parent)?;
    let Some(parent_path) = self.dir_path_of(&parent_record) else {
        return Err(MountError::NotADirectory(format!("{parent_record:?}")));
    };
    let child_path = join_child(&parent_path, name_str);
    let node_id = entry.node.0;
    {
        let mut pending = self.inner.pending.lock_or_poisoned();
        pending.hot_by_path.remove(&child_path);
        pending.with_brand(|bp| {
            let _ = bp.transition_to_orphan(node_id);
        });
        pending.symlinks.remove(&child_path);
        pending.tombstones.insert(child_path.clone());
    }
    let mut inodes = self.inner.inodes.lock_or_poisoned();
    inodes.by_path.remove(&child_path);
    Ok(())
}
/// Removes the empty directory `name` under `parent`.
///
/// Drops the explicit-directory marker, records a directory tombstone, and unbinds
/// the path in the inode table.
///
/// # Errors
/// - Whatever `validate_entry_name`, `lookup`, `enumerate` or `record_for` report.
/// - `MountError::NotFound` when the name does not exist.
/// - `MountError::NotADirectory` when the entry, or `parent`, is not a directory.
/// - `MountError::NotEmpty` when the directory still has children.
pub fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
    let _write_guard = self.inner.write_mu.lock_or_poisoned();
    let name_str = validate_entry_name(name)?;
    let Some(entry) = self.lookup(parent, name)? else {
        return Err(MountError::NotFound(name_str.to_string()));
    };
    if !matches!(entry.kind, NodeKind::Directory) {
        return Err(MountError::NotADirectory(name_str.to_string()));
    }
    if !self.enumerate(entry.node)?.is_empty() {
        return Err(MountError::NotEmpty(name_str.to_string()));
    }
    let parent_record = self.record_for(parent)?;
    let Some(parent_path) = self.dir_path_of(&parent_record) else {
        return Err(MountError::NotADirectory(format!("{parent_record:?}")));
    };
    let child_path = join_child(&parent_path, name_str);
    {
        let mut pending = self.inner.pending.lock_or_poisoned();
        pending.explicit_dirs.remove(&child_path);
        pending.dir_tombstones.insert(child_path.clone());
    }
    let mut inodes = self.inner.inodes.lock_or_poisoned();
    inodes.by_path.remove(&child_path);
    Ok(())
}
/// Renames `old_parent/old_name` to `new_parent/new_name` with default `RenameOptions`.
///
/// # Errors
/// Same as `rename_entry_with_options`.
pub fn rename_entry(
    &self,
    old_parent: NodeId,
    old_name: &OsStr,
    new_parent: NodeId,
    new_name: &OsStr,
) -> Result<()> {
    let options = RenameOptions::default();
    self.rename_entry_with_options(old_parent, old_name, new_parent, new_name, options)
}
/// Renames `old_parent/old_name` to `new_parent/new_name`.
///
/// Holds `write_mu` for the whole operation. Renaming a path onto itself is a no-op.
/// When the destination exists it is replaced (unless `options.no_replace` is set),
/// subject to the kind checks below, and its inode is transitioned to orphan.
///
/// # Errors
/// - Whatever `validate_entry_name`, `lookup`, `record_for`, `enumerate` or the
///   `move_*` helpers report.
/// - `MountError::NotFound` when the source does not exist.
/// - `MountError::NotADirectory` when a parent is not a directory, or when a
///   directory would replace a non-directory.
/// - `MountError::AlreadyExists` when the destination exists and `no_replace` is set.
/// - `MountError::NotEmpty` when a directory would replace a non-empty directory.
/// - `MountError::IsADirectory` when a non-directory would replace a directory.
pub fn rename_entry_with_options(
&self,
old_parent: NodeId,
old_name: &OsStr,
new_parent: NodeId,
new_name: &OsStr,
options: RenameOptions,
) -> Result<()> {
let _write_guard = self.inner.write_mu.lock_or_poisoned();
let old_name_str = validate_entry_name(old_name)?;
let new_name_str = validate_entry_name(new_name)?;
let src = self
.lookup(old_parent, old_name)?
.ok_or_else(|| MountError::NotFound(format!("rename src {old_name_str}")))?;
let old_parent_record = self.record_for(old_parent)?;
let new_parent_record = self.record_for(new_parent)?;
let old_parent_path = self
.dir_path_of(&old_parent_record)
.ok_or_else(|| MountError::NotADirectory(format!("{old_parent_record:?}")))?;
let new_parent_path = self
.dir_path_of(&new_parent_record)
.ok_or_else(|| MountError::NotADirectory(format!("{new_parent_record:?}")))?;
let old_path = join_child(&old_parent_path, old_name_str);
let new_path = join_child(&new_parent_path, new_name_str);
// Renaming onto the same path changes nothing.
if old_path == new_path {
return Ok(());
}
let dst = self.lookup(new_parent, new_name)?;
if dst.is_some() && options.no_replace {
return Err(MountError::AlreadyExists(new_name_str.to_string()));
}
// Kind compatibility between the source and an existing destination.
if let Some(ref d) = dst {
match (src.kind, d.kind) {
(NodeKind::Directory, NodeKind::Directory) => {
let dst_children = self.enumerate(d.node)?;
if !dst_children.is_empty() {
return Err(MountError::NotEmpty(new_name_str.to_string()));
}
}
(NodeKind::Directory, _) => {
return Err(MountError::NotADirectory(new_name_str.to_string()));
}
(_, NodeKind::Directory) => {
return Err(MountError::IsADirectory(new_name_str.to_string()));
}
_ => {}
}
}
let displaced_inode_id = dst.as_ref().map(|d| d.node.0);
// Step 1: move the overlay (pending) state for the source's kind.
match src.kind {
NodeKind::File => self.move_file(&old_path, &new_path, displaced_inode_id)?,
NodeKind::Symlink => self.move_symlink(&old_path, &new_path, displaced_inode_id)?,
NodeKind::Directory => self.move_overlay_dir(&old_path, &new_path)?,
}
{
// Step 2: rebind paths in the inode table so existing node ids follow the rename.
let mut inodes = self.inner.inodes.lock_or_poisoned();
let displaced_dest = inodes.by_path.remove(&new_path);
let rebased_src = if let Some(src_id) = inodes.by_path.remove(&old_path) {
if let Some(
NodeRecord::PendingFile { path, .. }
| NodeRecord::File { path, .. }
| NodeRecord::Gitlink { path, .. }
| NodeRecord::PendingSymlink { path }
| NodeRecord::Dir { path, .. }
| NodeRecord::PendingDir { path },
) = inodes.by_id.get_mut(&src_id)
{
*path = new_path.clone();
}
inodes.by_path.insert(new_path.clone(), src_id);
if src.kind == NodeKind::Directory {
// Collect first, then mutate: every path strictly below `old_path` is re-keyed
// under `new_path` (the empty tail, i.e. the directory itself, is skipped).
let descendants: Vec<(PathBuf, PathBuf, u64)> = inodes
.by_path
.iter()
.filter_map(|(p, id)| {
let tail = p.strip_prefix(&old_path).ok()?;
if tail.as_os_str().is_empty() {
return None;
}
Some((p.clone(), new_path.join(tail), *id))
})
.collect();
for (old_key, new_key, id) in descendants {
inodes.by_path.remove(&old_key);
if let Some(
NodeRecord::PendingFile { path, .. }
| NodeRecord::File { path, .. }
| NodeRecord::Gitlink { path, .. }
| NodeRecord::PendingSymlink { path }
| NodeRecord::Dir { path, .. }
| NodeRecord::PendingDir { path },
) = inodes.by_id.get_mut(&id)
{
*path = new_key.clone();
}
inodes.by_path.insert(new_key, id);
}
}
Some(src_id)
} else {
None
};
// Release the inode lock before taking the pending lock.
drop(inodes);
// Step 3: point the source's hot buffer at the new path and orphan whatever inode
// was bound to the destination path.
let mut pending = self.inner.pending.lock_or_poisoned();
if let Some(src_id) = rebased_src
&& let Some(buf) = pending.hot.get_mut(&src_id)
{
buf.path = new_path.clone();
}
if let Some(dest_id) = displaced_dest {
pending.with_brand(|bp| {
let _ = bp.transition_to_orphan(dest_id);
});
}
}
Ok(())
}
/// Moves the overlay state of the file at `old_path` to `new_path`.
///
/// Steps:
/// 1. If a hot buffer is bound to `old_path`, flush its node first.
/// 2. If the source inode has no warm entry afterwards (or no inode is bound to the
///    path at all), read the file's blob, mode and size through `captured_file_at`.
/// 3. Under the pending lock: clear whatever is staged at `new_path`, re-key a
///    remaining hot buffer to `new_path`, seed a warm entry from step 2 when the
///    source inode is known, and swap the tombstones (`old_path` gains one,
///    `new_path` loses one).
///
/// The pending mutex is always taken through `lock_or_poisoned`, matching the rest of
/// the file.
///
/// `displaced_inode_id` is accepted for signature parity with `move_symlink` and is
/// not used here.
///
/// # Errors
/// Propagates failures from `flush_node` and `captured_file_at`.
fn move_file(
    &self,
    old_path: &Path,
    new_path: &Path,
    displaced_inode_id: Option<u64>,
) -> Result<()> {
    let src_id_opt = self
        .inner
        .pending
        .lock_or_poisoned()
        .hot_by_path
        .get(old_path)
        .copied();
    // The pending guard above is already released here; `flush_node` needs the lock.
    if let Some(id) = src_id_opt {
        self.flush_node(NodeId(id))?;
    }
    let src_id = {
        let inodes = self.inner.inodes.lock_or_poisoned();
        inodes.by_path.get(old_path).copied()
    };
    let needs_synth = match src_id {
        Some(id) => {
            let pending = self.inner.pending.lock_or_poisoned();
            !pending.warm.contains_key(&id)
        }
        None => true,
    };
    let captured_seed = if needs_synth {
        Some(self.captured_file_at(old_path)?)
    } else {
        None
    };
    let mut pending = self.inner.pending.lock_or_poisoned();
    pending.hot_by_path.remove(new_path);
    pending.symlinks.remove(new_path);
    pending.symlinks.remove(old_path);
    if let Some(id) = pending.hot_by_path.remove(old_path) {
        if let Some(buf) = pending.hot.get_mut(&id) {
            buf.path = new_path.to_path_buf();
        }
        pending.hot_by_path.insert(new_path.to_path_buf(), id);
    }
    if let (Some(id), Some((blob, mode, size))) = (src_id, captured_seed) {
        pending.warm.insert(id, PendingEntry { blob, mode, size });
    }
    pending.tombstones.insert(old_path.to_path_buf());
    pending.tombstones.remove(new_path);
    let _ = displaced_inode_id;
    Ok(())
}
/// Moves the symlink at `old_path` to `new_path` within the overlay.
///
/// The target bytes come from the overlay when the symlink is staged there; otherwise
/// they are read from the blob returned by `captured_symlink_at`. Anything staged at
/// `new_path` is cleared, `new_path` loses its tombstone and `old_path` gains one.
///
/// `displaced_inode_id` is accepted for signature parity with `move_file` and is not
/// used here.
///
/// # Errors
/// Propagates failures from `captured_symlink_at` and `load_blob_bytes`.
fn move_symlink(
    &self,
    old_path: &Path,
    new_path: &Path,
    displaced_inode_id: Option<u64>,
) -> Result<()> {
    let staged_target = {
        let pending = self.inner.pending.lock_or_poisoned();
        pending.symlinks.get(old_path).cloned()
    };
    let target_bytes = if let Some(bytes) = staged_target {
        bytes
    } else {
        let blob = self.captured_symlink_at(old_path)?;
        let data = self.load_blob_bytes(&blob)?;
        (*data).to_vec()
    };
    let destination = new_path.to_path_buf();
    let mut pending = self.inner.pending.lock_or_poisoned();
    pending.hot_by_path.remove(new_path);
    pending.symlinks.remove(new_path);
    pending.symlinks.remove(old_path);
    pending.symlinks.insert(destination, target_bytes);
    pending.tombstones.remove(new_path);
    pending.tombstones.insert(old_path.to_path_buf());
    let _ = displaced_inode_id;
    Ok(())
}
fn move_overlay_dir(&self, old_path: &Path, new_path: &Path) -> Result<()> {
if self.captured_dir_exists(old_path)? {
return Err(MountError::InvalidArgument(format!(
"cross-tree directory rename {} → {} not supported by the overlay",
old_path.display(),
new_path.display()
)));
}
let mut pending = self.inner.pending.lock_or_poisoned();
fn rebase(p: &Path, old: &Path, new: &Path) -> Option<PathBuf> {
let tail = p.strip_prefix(old).ok()?;
Some(new.join(tail))
}
let mut new_explicit: BTreeSet<PathBuf> = BTreeSet::new();
let mut new_symlinks: BTreeMap<PathBuf, Vec<u8>> = BTreeMap::new();
let mut new_tombstones: BTreeSet<PathBuf> = BTreeSet::new();
let mut new_hot_by_path: BTreeMap<PathBuf, u64> = BTreeMap::new();
let mut hot_path_updates: Vec<(u64, PathBuf)> = Vec::new();
for explicit in std::mem::take(&mut pending.explicit_dirs) {
match rebase(&explicit, old_path, new_path) {
Some(rebased) => {
new_explicit.insert(rebased);
}
None => {
if explicit != old_path {
new_explicit.insert(explicit);
}
}
}
}
for (path, target) in std::mem::take(&mut pending.symlinks) {
match rebase(&path, old_path, new_path) {
Some(rebased) => {
new_symlinks.insert(rebased, target);
}
None => {
new_symlinks.insert(path, target);
}
}
}
for path in std::mem::take(&mut pending.tombstones) {
match rebase(&path, old_path, new_path) {
Some(rebased) => {
new_tombstones.insert(rebased);
}
None => {
new_tombstones.insert(path);
}
}
}
for (path, id) in std::mem::take(&mut pending.hot_by_path) {
match rebase(&path, old_path, new_path) {
Some(rebased) => {
hot_path_updates.push((id, rebased.clone()));
new_hot_by_path.insert(rebased, id);
}
None => {
new_hot_by_path.insert(path, id);
}
}
}
for (id, new_p) in hot_path_updates {
if let Some(buf) = pending.hot.get_mut(&id) {
buf.path = new_p;
}
}
new_explicit.insert(new_path.to_path_buf());
pending.explicit_dirs = new_explicit;
pending.symlinks = new_symlinks;
pending.tombstones = new_tombstones;
pending.hot_by_path = new_hot_by_path;
Ok(())
}
/// Resolves `path` in the captured tree and returns `(blob hash, mode, blob size)`.
///
/// # Errors
/// Returns `InvalidArgument` when the entry has no blob hash, and propagates failures
/// from `captured_tree_entry` and `blob_size`.
fn captured_file_at(&self, path: &Path) -> Result<(ContentHash, FileMode, u64)> {
    let entry = self.captured_tree_entry(path)?;
    match entry.blob_hash() {
        Some(hash) => {
            let mode = entry.mode();
            let size = self.blob_size(&hash)?;
            Ok((hash, mode, size))
        }
        None => Err(MountError::InvalidArgument(format!(
            "{} is not a mutable file in the captured tree",
            path.display()
        ))),
    }
}
/// Resolves `path` in the captured tree and returns the hash of its symlink blob.
///
/// # Errors
/// Returns `InvalidArgument` when the entry has no symlink hash, and propagates failures
/// from `captured_tree_entry`.
fn captured_symlink_at(&self, path: &Path) -> Result<ContentHash> {
    self.captured_tree_entry(path)?
        .symlink_hash()
        .ok_or_else(|| {
            MountError::InvalidArgument(format!(
                "{} is not a symlink in the captured tree",
                path.display()
            ))
        })
}
/// Walks the captured tree from the root record and returns the entry at `path`.
///
/// Only `Component::Normal` components take part in the walk; other component kinds
/// are skipped. A normal component that is not valid UTF-8 makes the lookup fail with
/// `NotFound` instead of being silently dropped, which would otherwise resolve a
/// different (shorter) path.
///
/// # Errors
/// * `NotFound` — `path` has no normal components, a component is not valid UTF-8, or
///   a component is missing from its parent tree.
/// * `NotADirectory` — an intermediate component is not a tree.
/// * Failures from `record_for`, `tree_for_record` and `load_tree` are propagated.
fn captured_tree_entry(&self, path: &Path) -> Result<TreeEntry> {
    let not_found = || MountError::NotFound(path.display().to_string());
    let root_record = self.record_for(NodeId::ROOT)?;
    let mut tree = self.tree_for_record(&root_record)?;
    let mut comps: Vec<&str> = Vec::new();
    for component in path.components() {
        if let Component::Normal(raw) = component {
            // Tree lookups are keyed by `&str`, so a non-UTF-8 name cannot match anything.
            comps.push(raw.to_str().ok_or_else(|| not_found())?);
        }
    }
    let (leaf, dirs) = comps.split_last().ok_or_else(|| not_found())?;
    for d in dirs {
        let e = tree.get(d).ok_or_else(|| not_found())?;
        if !e.is_tree() {
            return Err(MountError::NotADirectory(d.to_string()));
        }
        let Some(hash) = e.tree_hash() else {
            return Err(MountError::NotADirectory(d.to_string()));
        };
        tree = self.load_tree(&hash)?;
    }
    let entry = tree.get(leaf).cloned().ok_or_else(|| not_found())?;
    Ok(entry)
}
/// Reports whether `path` names a tree (directory) in the captured tree.
///
/// A `NotFound` from the lookup is turned into `Ok(false)`; every other error is
/// passed through unchanged.
fn captured_dir_exists(&self, path: &Path) -> Result<bool> {
    match self.captured_tree_entry(path) {
        Err(MountError::NotFound(_)) => Ok(false),
        other => other.map(|entry| entry.is_tree()),
    }
}
/// Applies an attribute update to `node` and returns the node's refreshed attributes.
///
/// The whole operation runs while holding `write_mu`.
///
/// * `update.mode` — only the owner-execute bit (`0o100`) is inspected: set maps to
///   `FileMode::Executable`, clear maps to `FileMode::Normal`. The new mode is written
///   to the inode record (for `File` / `PendingFile` records only) and mirrored into
///   the overlay's hot buffers and warm entry for the same file.
/// * `update.size` — forwarded to `apply_truncate`.
///
/// # Errors
/// Propagates failures from `record_for`, `apply_truncate` and `attrs`.
pub fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
let _write_guard = self.inner.write_mu.lock_or_poisoned();
if let Some(raw_mode) = update.mode {
let new_mode = if (raw_mode & 0o100) != 0 {
FileMode::Executable
} else {
FileMode::Normal
};
// Update the inode record first; records of other kinds are left untouched.
let mut inodes = self.inner.inodes.lock_or_poisoned();
if let Some(NodeRecord::File { mode, .. } | NodeRecord::PendingFile { mode, .. }) =
inodes.by_id.get_mut(&node.0)
{
*mode = new_mode;
}
// Released before `record_for` and before the pending lock is taken below.
drop(inodes);
let record = self.record_for(node)?;
if let Some(path) = match &record {
NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => Some(path),
_ => None,
} {
let path = path.clone();
let mut pending = self.inner.pending.lock_or_poisoned();
// Hot buffer keyed directly by this node.
if let Some(buf) = pending.hot.get_mut(&node.0) {
buf.mode = new_mode;
}
// Path-keyed overlay state is only touched when the node is not an orphan.
if !pending.is_orphan(node.0) {
if let Some(other_id) = pending.hot_by_path.get(&path).copied()
&& let Some(buf) = pending.hot.get_mut(&other_id)
{
buf.mode = new_mode;
}
// The inode lock is taken while the pending lock is held (pending, then inodes).
let warm_id = {
let inodes = self.inner.inodes.lock_or_poisoned();
inodes.by_path.get(&path).copied()
};
if let Some(id) = warm_id
&& let Some(entry) = pending.warm.get_mut(&id)
{
entry.mode = new_mode;
}
}
}
}
if let Some(new_size) = update.size {
self.apply_truncate(node, new_size)?;
}
self.attrs(node)
}
/// Resizes the file behind `node` to `new_size` bytes, zero-filling on growth.
///
/// Works in two phases so that blob loading happens without the pending lock held:
/// 1. Under the pending lock, resize an existing hot buffer in place if one is found;
///    otherwise pick the blob to seed a new buffer from.
/// 2. Load the seed (outside the lock), resize it, then install it as a hot buffer.
///
/// # Errors
/// * Whatever `validate_truncate_size` rejects.
/// * `IsADirectory` when the node's record is neither `File` nor `PendingFile`.
/// * Failures from `record_for` and `load_blob_bytes`.
fn apply_truncate(&self, node: NodeId, new_size: u64) -> Result<()> {
let new_size = validate_truncate_size(new_size)?;
let record = self.record_for(node)?;
let (path, mode, captured_blob) = match &record {
NodeRecord::File {
path, mode, blob, ..
} => (path.clone(), *mode, Some(*blob)),
NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
_ => {
return Err(MountError::IsADirectory(format!(
"setattr(size) on non-file {record:?}"
)));
}
};
// Outcome of the first locked phase.
enum Phase1 {
// An existing hot buffer was resized; nothing else to do.
ResizedInPlace,
// No hot buffer yet: `seed` is the blob to start from (if any).
NeedSeed {
orphan: bool,
seed: Option<ContentHash>,
},
}
let phase1 = {
// The inode lock is taken and released before the pending lock is acquired.
let path_owner = {
let inodes = self.inner.inodes.lock_or_poisoned();
inodes.by_path.get(&path).copied()
};
let mut pending = self.inner.pending.lock_or_poisoned();
let orphan = pending.is_orphan(node.0);
// Prefer the buffer keyed by this node; an orphan never falls back to the
// path index, a non-orphan does.
let id = if pending.hot.contains_key(&node.0) {
Some(node.0)
} else if orphan {
None
} else {
pending.hot_by_path.get(&path).copied()
};
if let Some(id) = id
&& let Some(buf) = pending.hot.get_mut(&id)
{
buf.bytes.resize(new_size, 0);
buf.last_touched = Instant::now();
Phase1::ResizedInPlace
} else {
// Seed preference: warm entry (by node for orphans, by path owner otherwise),
// then the blob recorded on the inode record.
let seed = if orphan {
pending.warm.get(&node.0).map(|e| e.blob).or(captured_blob)
} else {
path_owner
.and_then(|id| pending.warm.get(&id).map(|e| e.blob))
.or(captured_blob)
};
Phase1::NeedSeed { orphan, seed }
}
};
let (orphan, seed_blob) = match phase1 {
Phase1::ResizedInPlace => return Ok(()),
Phase1::NeedSeed { orphan, seed } => (orphan, seed),
};
// The pending lock is not held while the seed blob is loaded.
let mut bytes = match seed_blob {
Some(hash) => (*self.load_blob_bytes(&hash)?).to_vec(),
None => Vec::new(),
};
bytes.resize(new_size, 0);
let mut pending = self.inner.pending.lock_or_poisoned();
if orphan {
// Orphans get a buffer keyed by node only; the path index is left alone.
pending.hot.insert(
node.0,
HotBuffer {
path,
mode,
bytes,
last_touched: Instant::now(),
},
);
} else {
// Non-orphans also clear any tombstone and claim the path in the index.
pending.tombstones.remove(&path);
pending.hot.insert(
node.0,
HotBuffer {
path: path.clone(),
mode,
bytes,
last_touched: Instant::now(),
},
);
pending.hot_by_path.insert(path, node.0);
}
Ok(())
}
/// Creates a pending symlink called `name` under `parent`, pointing at `target`.
///
/// Runs under `write_mu`. The target is stored as the encoded bytes of `target`; the
/// returned entry's size is that byte length.
///
/// # Errors
/// * Whatever `validate_entry_name` rejects.
/// * `AlreadyExists` when `lookup` already finds `name` under `parent`.
/// * `NotADirectory` when `parent` has no directory path.
/// * Failures from `lookup` and `record_for`.
pub fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
    let _write_guard = self.inner.write_mu.lock_or_poisoned();
    let name_str = validate_entry_name(name)?;
    if self.lookup(parent, name)?.is_some() {
        return Err(MountError::AlreadyExists(name_str.to_string()));
    }
    let parent_record = self.record_for(parent)?;
    let Some(parent_path) = self.dir_path_of(&parent_record) else {
        return Err(MountError::NotADirectory(format!("{parent_record:?}")));
    };
    let child_path = join_child(&parent_path, name_str);
    let link_bytes = target.as_os_str().as_encoded_bytes().to_vec();
    let size = link_bytes.len() as u64;
    {
        // Scoped so the pending lock is released before the node is interned.
        let mut overlay = self.inner.pending.lock_or_poisoned();
        overlay.tombstones.remove(&child_path);
        overlay.symlinks.insert(child_path.clone(), link_bytes);
    }
    let node = self.intern(NodeRecord::PendingSymlink { path: child_path });
    Ok(Entry {
        node,
        name: name.to_os_string(),
        kind: NodeKind::Symlink,
        size,
        unix_mode: FileMode::Symlink.to_unix_mode(),
    })
}
/// Returns the target of the symlink behind `node`.
///
/// Captured symlinks are read from their blob; pending symlinks are read from the
/// overlay's `symlinks` map.
///
/// # Errors
/// * `Stale` when a pending symlink's path is no longer in the overlay.
/// * `InvalidArgument` when the node is not a symlink.
/// * Failures from `record_for`, `load_blob_bytes` and `symlink_target_from_bytes`.
pub fn read_link(&self, node: NodeId) -> Result<OsString> {
    match self.record_for(node)? {
        NodeRecord::Symlink { blob } => {
            let bytes = self.load_blob_bytes(&blob)?;
            symlink_target_from_bytes(&bytes)
        }
        NodeRecord::PendingSymlink { path } => {
            let overlay = self.inner.pending.lock_or_poisoned();
            match overlay.symlinks.get(&path) {
                Some(bytes) => symlink_target_from_bytes(bytes),
                None => Err(MountError::Stale(format!("symlink {}", path.display()))),
            }
        }
        other => Err(MountError::InvalidArgument(format!(
            "read_link on non-symlink record: {other:?}"
        ))),
    }
}
/// Flushes every node that currently has a hot buffer.
///
/// The set of node ids is snapshotted under the pending lock, which is released
/// before any flush runs. Flushing stops at the first failure.
///
/// # Errors
/// Propagates the first error returned by `flush_node`.
pub fn flush_all(&self) -> Result<()> {
    // Uses `lock_or_poisoned` like the other `pending` acquisitions in this file so
    // that poison handling does not differ from call site to call site.
    let ids: Vec<u64> = self
        .inner
        .pending
        .lock_or_poisoned()
        .hot
        .keys()
        .copied()
        .collect();
    for id in ids {
        self.flush_node(NodeId(id))?;
    }
    Ok(())
}
/// Probes the pending overlay for `path`.
///
/// Precedence: tombstone, then pending symlink, then hot buffer (via the path index),
/// then the warm entry of the inode registered at `path`. Returns `None` when the
/// overlay has nothing for the path.
fn pending_lookup(&self, path: &Path) -> Option<PendingHit> {
    let overlay = self.inner.pending.lock_or_poisoned();
    if overlay.tombstones.contains(path) {
        return Some(PendingHit::Tombstone);
    }
    if let Some(bytes) = overlay.symlinks.get(path) {
        return Some(PendingHit::Symlink {
            target_len: bytes.len() as u64,
        });
    }
    let hot_hit = overlay
        .hot_by_path
        .get(path)
        .and_then(|id| overlay.hot.get(id).map(|buf| (*id, buf)));
    if let Some((id, buf)) = hot_hit {
        return Some(PendingHit::Hot {
            node: NodeId(id),
            size: buf.bytes.len() as u64,
            mode: buf.mode,
        });
    }
    // Inode lock is taken while the pending lock is still held (pending, then inodes).
    let inodes = self.inner.inodes.lock_or_poisoned();
    let owner = inodes.by_path.get(path).copied()?;
    overlay.warm.get(&owner).map(|entry| PendingHit::Warm {
        blob: entry.blob,
        size: entry.size,
        mode: entry.mode,
    })
}
/// Reports whether any proper ancestor of `path` is listed in `pending.dir_tombstones`.
///
/// Ancestors are visited from the immediate parent upward; the walk ends at the first
/// empty ancestor (which is itself not checked).
fn ancestor_is_dir_tombstoned(&self, pending: &Pending, path: &Path) -> bool {
    path.ancestors()
        // `ancestors` yields `path` itself first; only proper ancestors count.
        .skip(1)
        .take_while(|dir| !dir.as_os_str().is_empty())
        .any(|dir| pending.dir_tombstones.contains(dir))
}
/// Reports whether the pending overlay implies that directory `dir` exists.
///
/// That is the case when `dir` is an explicit directory, or when some live overlay
/// path lies strictly below it: a non-orphan, non-tombstoned warm file, a
/// non-tombstoned hot file, or a pending symlink. The empty path always yields `false`.
fn pending_dir_exists(&self, dir: &Path) -> bool {
    if dir.as_os_str().is_empty() {
        return false;
    }
    let overlay = self.inner.pending.lock_or_poisoned();
    if overlay.explicit_dirs.contains(dir) {
        return true;
    }
    // True when `candidate` is strictly below `dir` (at least one extra component).
    let is_below = |candidate: &Path| -> bool {
        match candidate.strip_prefix(dir) {
            Ok(tail) => tail.components().next().is_some(),
            Err(_) => false,
        }
    };
    let warm_under = {
        let inodes = self.inner.inodes.lock_or_poisoned();
        overlay.warm.keys().any(|id| {
            !overlay.is_orphan(*id)
                && inodes
                    .by_id
                    .get(id)
                    .and_then(|record| warm_path_of_record(record))
                    .is_some_and(|p| !overlay.tombstones.contains(p) && is_below(p))
        })
    };
    if warm_under {
        return true;
    }
    let hot_under = overlay
        .hot_by_path
        .keys()
        .any(|p| !overlay.tombstones.contains(p) && is_below(p));
    hot_under || overlay.symlinks.keys().any(|p| is_below(p))
}
/// Lists the immediate children that the pending overlay contributes to directory `dir`.
///
/// Every overlay path under `dir` is projected onto its first component below `dir`;
/// a path with further components contributes a `Dir` child, otherwise a leaf of the
/// matching kind. Sources are visited in this order: hot files, warm files, pending
/// symlinks, explicit directories. A hot leaf overwrites whatever is already recorded
/// under the same name; every other source only fills names that are still free.
/// The result is ordered by name (it is collected from a `BTreeMap`).
fn pending_children_at(&self, dir: &Path) -> Vec<(String, PendingChildKind)> {
let pending = self.inner.pending.lock_or_poisoned();
let mut out: BTreeMap<String, PendingChildKind> = BTreeMap::new();
// Maps an overlay path to `(child name, has deeper components)`; `None` when the
// path is not under `dir`, has no component below it, or the first component is not
// a normal UTF-8 name.
let project = |path: &Path| -> Option<(String, bool)> {
let suffix = if dir.as_os_str().is_empty() {
Some(path)
} else {
path.strip_prefix(dir).ok()
}?;
let mut comps = suffix.components();
let first = comps.next()?;
let name = match first {
Component::Normal(n) => n.to_str()?.to_string(),
_ => return None,
};
let is_dir = comps.next().is_some();
Some((name, is_dir))
};
// Hot files: tombstoned paths are skipped.
for (path, node_id) in pending.hot_by_path.iter() {
if pending.tombstones.contains(path) {
continue;
}
let Some((name, is_dir)) = project(path) else {
continue;
};
if is_dir {
out.entry(name).or_insert(PendingChildKind::Dir);
} else if let Some(buf) = pending.hot.get(node_id) {
// `insert`, not `or_insert`: a hot leaf replaces an earlier entry of the same name.
out.insert(
name,
PendingChildKind::HotFile {
node: NodeId(*node_id),
size: buf.bytes.len() as u64,
mode: buf.mode,
},
);
}
}
// Warm files: orphans, ids without a file-like record, and tombstoned paths are skipped.
let inodes = self.inner.inodes.lock_or_poisoned();
for (id, entry) in pending.warm.iter() {
if pending.is_orphan(*id) {
continue;
}
let Some(record) = inodes.by_id.get(id) else {
continue;
};
let Some(path) = warm_path_of_record(record) else {
continue;
};
if pending.tombstones.contains(path) {
continue;
}
let Some((name, is_dir)) = project(path) else {
continue;
};
if is_dir {
out.entry(name).or_insert(PendingChildKind::Dir);
} else {
out.entry(name).or_insert(PendingChildKind::WarmFile {
size: entry.size,
mode: entry.mode,
});
}
}
drop(inodes);
// Pending symlinks (no tombstone check is applied here).
for (path, target) in pending.symlinks.iter() {
let Some((name, is_dir)) = project(path) else {
continue;
};
if is_dir {
out.entry(name).or_insert(PendingChildKind::Dir);
} else {
out.entry(name).or_insert(PendingChildKind::Symlink {
size: target.len() as u64,
});
}
}
// Explicit directories: at any depth below `dir` they surface as a `Dir` child.
for explicit in pending.explicit_dirs.iter() {
let Some((name, _is_deeper)) = project(explicit) else {
continue;
};
out.entry(name).or_insert(PendingChildKind::Dir);
}
out.into_iter().collect()
}
}
/// Checks that `name` is usable as a tree entry name and returns it as `&str`.
///
/// # Errors
/// Returns `InvalidArgument` when the name contains a NUL byte, is not valid UTF-8, or
/// is rejected by `objects::object::validate_tree_entry_name`.
fn validate_entry_name(name: &OsStr) -> Result<&str> {
    if name.as_encoded_bytes().iter().any(|&byte| byte == 0) {
        return Err(MountError::InvalidArgument(format!(
            "entry name {name:?} contains NUL"
        )));
    }
    let Some(name_str) = name.to_str() else {
        return Err(MountError::InvalidArgument(format!(
            "entry name {name:?} is not valid UTF-8"
        )));
    };
    match objects::object::validate_tree_entry_name(name_str) {
        Ok(_) => Ok(name_str),
        Err(e) => Err(MountError::InvalidArgument(e.to_string())),
    }
}
/// Returns the path stored on a `File` or `PendingFile` record, `None` for any other
/// record kind.
fn warm_path_of_record(record: &NodeRecord) -> Option<&Path> {
    if let NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } = record {
        Some(path.as_path())
    } else {
        None
    }
}
fn symlink_target_from_bytes(bytes: &[u8]) -> Result<OsString> {
#[cfg(unix)]
{
use std::os::unix::ffi::OsStrExt;
Ok(OsStr::from_bytes(bytes).to_os_string())
}
#[cfg(not(unix))]
{
match std::str::from_utf8(bytes) {
Ok(s) => Ok(OsString::from(s)),
Err(_) => Err(MountError::InvalidArgument(
"captured symlink target bytes are not valid UTF-8".into(),
)),
}
}
}
/// Builds the path of `name` inside `parent`; an empty `parent` yields just `name`.
#[inline]
fn join_child(parent: &Path, name: &str) -> PathBuf {
    let mut child = PathBuf::new();
    if !parent.as_os_str().is_empty() {
        child.push(parent);
    }
    child.push(name);
    child
}
/// Copies bytes from `src`, starting at `offset`, into `buf` and returns how many
/// bytes were copied.
///
/// The copy length is the smaller of `buf.len()` and the bytes available after
/// `offset`. Returns `0` when `offset` is at or past the end of `src`, including when
/// `offset` does not fit in `usize` (a plain `as` cast would wrap on targets where
/// `usize` is narrower than `u64` and read from the wrong position).
#[inline]
fn copy_into(src: &[u8], offset: u64, buf: &mut [u8]) -> usize {
    let Ok(start) = usize::try_from(offset) else {
        return 0;
    };
    // `get` yields `None` past the end and an empty slice exactly at the end.
    let Some(available) = src.get(start..) else {
        return 0;
    };
    let take = available.len().min(buf.len());
    buf[..take].copy_from_slice(&available[..take]);
    take
}
/// Overlay state that replaces the captured blob when reading a captured file.
enum Overlay {
// Content should be read from this blob instead of the captured one.
Warm(ContentHash),
// The path is tombstoned; `read` answers with a `Stale` error.
Gone,
}
/// What `pending_lookup` found for a path in the pending overlay.
// NOTE(review): `dead_code` is allowed for the whole enum, presumably because some
// variant fields are never read — confirm which ones and narrow the allow if possible.
#[allow(dead_code)] enum PendingHit {
// A hot (in-memory) buffer is registered for the path.
Hot {
node: NodeId,
size: u64,
mode: FileMode,
},
// The inode registered at the path has a warm entry.
Warm {
blob: ContentHash,
size: u64,
mode: FileMode,
},
// The path is a pending symlink; `target_len` is the target's byte length.
Symlink {
target_len: u64,
},
// The path is tombstoned in the overlay.
Tombstone,
}
/// Kind of a directory child contributed by the pending overlay, as produced by
/// `pending_children_at`.
enum PendingChildKind {
// Leaf backed by a hot buffer; `node` is the id the buffer is registered under.
HotFile {
node: NodeId,
size: u64,
mode: FileMode,
},
// Leaf backed by a warm entry.
WarmFile {
size: u64,
mode: FileMode,
},
// Pending symlink; `size` is the target's byte length.
Symlink {
size: u64,
},
// A directory, explicit or implied by a deeper overlay path.
Dir,
}
impl<S: ObjectStore> MountInner<S> {
fn sweep_idle_buffers(&self) -> Result<()> {
let now = Instant::now();
let idle_after = self.promotion.read_or_poisoned().idle_after;
let to_promote: Vec<u64> = {
let pending = self.pending.lock_or_poisoned();
pending
.hot
.iter()
.filter(|(_, buf)| now.saturating_duration_since(buf.last_touched) >= idle_after)
.map(|(id, _)| *id)
.collect()
};
for id in to_promote {
let _ = self.flush_node(NodeId(id));
}
Ok(())
}
/// Promotes the hot buffer of `node` to a warm entry by writing its bytes to the store.
///
/// Does nothing (returns `Ok(())`) when the node is an orphan or has no hot buffer.
/// Otherwise the buffer is taken out of the overlay, stored via `put_blob` without the
/// pending lock held, and then recorded in `warm` under the node id; a tombstone on
/// the buffer's path is cleared.
///
/// # Errors
/// Returns `MountError::Store` when `put_blob` fails.
fn flush_node(&self, node: NodeId) -> Result<()> {
let (path, mode, bytes) = {
let mut pending = self.pending.lock_or_poisoned();
if pending.is_orphan(node.0) {
return Ok(());
}
let Some(buf) = pending.hot.remove(&node.0) else {
return Ok(());
};
// Only drop the path index entry when it still points at this node.
if pending.hot_by_path.get(&buf.path) == Some(&node.0) {
pending.hot_by_path.remove(&buf.path);
}
(buf.path, buf.mode, buf.bytes)
};
let size = bytes.len() as u64;
let blob = Blob::new(bytes);
// NOTE(review): the hot buffer was already removed above, so if `put_blob` fails the
// buffered bytes appear to be dropped together with the error — confirm this is
// intended or restore the buffer on failure.
let blob_oid = self
.repo
.store()
.put_blob(&blob)
.map_err(MountError::Store)?;
debug!(?path, %blob_oid, size, "promoted hot buffer to CAS");
let mut pending = self.pending.lock_or_poisoned();
pending.warm.insert(
node.0,
PendingEntry {
blob: blob_oid,
mode,
size,
},
);
pending.tombstones.remove(&path);
Ok(())
}
/// Handles the release of one open handle on `node`.
///
/// The node's entry in `pending.state` decides what happens:
/// * no entry — flush if a hot buffer exists; otherwise only verify that the inode is known.
/// * `Live` — decrement the open count (removing the entry at zero), then flush.
/// * `Orphan` — decrement the open count; at zero, write any hot bytes to the store and
///   drop the node's state, hot buffer and warm entry. Above zero, the count is stored
///   back and `flush_node` is called (which returns early for orphans).
///
/// # Errors
/// * `NotFound` when the node has no state, no hot buffer and no inode record.
/// * `MountError::Store` when `put_blob` fails.
/// * Failures from `flush_node`.
fn release_node(&self, node: NodeId) -> Result<()> {
// Decision taken under the pending lock; acted on after the lock is released.
enum Outcome {
Flush,
OrphanFinal { hot: Option<(PathBuf, Vec<u8>)> },
MaybeUntrackedNoop,
}
let outcome = {
let mut pending = self.pending.lock_or_poisoned();
match pending.state.get(&node.0).copied() {
None => {
if pending.hot.contains_key(&node.0) {
Outcome::Flush
} else {
Outcome::MaybeUntrackedNoop
}
}
Some(NodeState::Live { open_count }) => {
let n = open_count.saturating_sub(1);
if n == 0 {
pending.state.remove(&node.0);
} else {
pending
.state
.insert(node.0, NodeState::Live { open_count: n });
}
Outcome::Flush
}
Some(NodeState::Orphan { open_count }) => {
let n = open_count.saturating_sub(1);
if n == 0 {
// The bytes are cloned here; the buffer itself is removed only after the
// store write below has succeeded.
let hot = pending
.hot
.get(&node.0)
.map(|buf| (buf.path.clone(), buf.bytes.clone()));
Outcome::OrphanFinal { hot }
} else {
pending
.state
.insert(node.0, NodeState::Orphan { open_count: n });
Outcome::Flush
}
}
}
};
match outcome {
Outcome::Flush => self.flush_node(node),
Outcome::MaybeUntrackedNoop => {
if !self
.inodes
.lock()
.expect("inode lock")
.by_id
.contains_key(&node.0)
{
return Err(MountError::NotFound(format!("node {}", node.0)));
}
Ok(())
}
Outcome::OrphanFinal { hot } => {
if let Some((path, bytes)) = hot {
let size = bytes.len() as u64;
let blob = Blob::new(bytes);
// An error here returns early and leaves the orphan's state untouched.
let blob_oid = self
.repo
.store()
.put_blob(&blob)
.map_err(MountError::Store)?;
debug!(?path, %blob_oid, size, "persisted orphan hot buffer to CAS");
}
let mut pending = self.pending.lock_or_poisoned();
pending.state.remove(&node.0);
pending.hot.remove(&node.0);
pending.warm.remove(&node.0);
Ok(())
}
}
}
}
fn spawn_sweep_worker<S: ObjectStore + 'static>(inner: &Arc<MountInner<S>>) -> Option<SweepHandle> {
let interval = inner
.promotion
.read()
.expect("promotion lock")
.sweep_interval?;
let weak = Arc::downgrade(inner);
let state = Arc::new(SweepShutdown::new());
let state_for_thread = Arc::clone(&state);
let join = std::thread::Builder::new()
.name("heddle-mount-sweep".into())
.spawn(move || sweep_worker_loop(weak, state_for_thread, interval))
.ok()?;
Some(SweepHandle {
state,
join: Some(join),
})
}
/// Body of the sweep thread: wait, sweep, repeat.
///
/// The loop ends when `state.wait(interval)` returns `true` or when the mount behind
/// `inner` can no longer be upgraded. Sweep errors are logged and do not end the loop.
fn sweep_worker_loop<S: ObjectStore + 'static>(
    inner: std::sync::Weak<MountInner<S>>,
    state: Arc<SweepShutdown>,
    interval: Duration,
) {
    while !state.wait(interval) {
        let Some(mount) = inner.upgrade() else {
            break;
        };
        if let Err(err) = mount.sweep_idle_buffers() {
            warn!(?err, "sweep worker hit error promoting idle buffers");
        }
        // `mount` goes out of scope here, so no strong reference is held while waiting.
    }
}
/// Resolves `thread` to the change it points at and that change's tree.
///
/// # Errors
/// Returns `UnknownThread` when the thread ref is missing or when no state is stored
/// for its change id; propagates failures from the ref and state lookups.
fn resolve_thread<S: ObjectStore>(
    repo: &Repository<RefManager, OpLog, S>,
    thread: &str,
) -> Result<MountState> {
    let unknown = || MountError::UnknownThread(thread.to_string());
    let thread_name = objects::object::ThreadName::from(thread);
    let Some(change_id) = repo.refs().get_thread(&thread_name)? else {
        return Err(unknown());
    };
    let Some(state) = repo.store().get_state(&change_id)? else {
        return Err(unknown());
    };
    Ok(MountState {
        change_id,
        tree: state.tree,
    })
}
impl<S: ObjectStore + 'static> PlatformShell for ContentAddressedMount<S> {
/// Resolves `name` inside directory `parent`.
///
/// Resolution order: the pending overlay for the child path (a tombstone hides the
/// entry), directory tombstones on the child or any ancestor, the parent's tree, and
/// finally a directory implied by the overlay. Returns `Ok(None)` when nothing
/// matches, when `parent` has no directory path, or when `name` is not valid UTF-8.
///
/// # Errors
/// Propagates failures from `record_for`, `tree_for_record` and `entry_from_tree_entry`.
fn lookup(&self, parent: NodeId, name: &OsStr) -> Result<Option<Entry>> {
    let record = self.record_for(parent)?;
    let Some(parent_path) = self.dir_path_of(&record) else {
        return Ok(None);
    };
    let Some(name_str) = name.to_str() else {
        return Ok(None);
    };
    let child_path = join_child(&parent_path, name_str);

    // Any overlay hit is final, even when it cannot be turned into an entry.
    if let Some(hit) = self.pending_lookup(&child_path) {
        return Ok(match hit {
            PendingHit::Tombstone => None,
            other => self.entry_from_pending_hit(other, &child_path, name),
        });
    }

    let hidden_by_dir_tombstone = {
        let overlay = self.inner.pending.lock_or_poisoned();
        overlay.dir_tombstones.contains(&child_path)
            || self.ancestor_is_dir_tombstoned(&overlay, &child_path)
    };
    if hidden_by_dir_tombstone {
        return Ok(None);
    }

    let parent_tree = self.tree_for_record(&record)?;
    if let Some(tree_entry) = parent_tree.get(name_str) {
        return Ok(Some(self.entry_from_tree_entry(&parent_path, tree_entry)?));
    }

    if !self.pending_dir_exists(&child_path) {
        return Ok(None);
    }
    let node = self.intern(NodeRecord::PendingDir {
        path: child_path.clone(),
    });
    Ok(Some(Entry {
        node,
        name: OsString::from(name_str),
        kind: NodeKind::Directory,
        size: self.pending_children_at(&child_path).len() as u64,
        unix_mode: DIR_UNIX_MODE,
    }))
}
/// Reads up to `buf.len()` bytes of `node`'s content starting at `offset`.
///
/// A hot buffer keyed by the node always wins. Otherwise the source depends on the
/// record kind and on overlay state (orphan warm entry, tombstone, hot buffer
/// registered under the path, warm entry of the path's inode, captured blob).
/// Returns the number of bytes copied (`0` at or past the end).
///
/// # Errors
/// * `Stale` — a pending file has no readable content, or a captured file's path is
///   tombstoned.
/// * `NotFound` — the node is not a readable kind (e.g. a directory record).
/// * Failures from `record_for` and `load_blob_bytes`.
fn read(&self, node: NodeId, offset: u64, buf: &mut [u8]) -> Result<usize> {
let record = self.record_for(node)?;
{
// Fast path: a hot buffer keyed by this node, regardless of record kind.
let pending = self.inner.pending.lock_or_poisoned();
if let Some(hot) = pending.hot.get(&node.0) {
return Ok(copy_into(&hot.bytes, offset, buf));
}
}
match &record {
NodeRecord::PendingFile { path, .. } => {
let warm_blob = {
let pending = self.inner.pending.lock_or_poisoned();
if pending.is_orphan(node.0) {
// Orphans only ever read their own warm entry, never path-keyed state.
return match pending.warm.get(&node.0).map(|e| e.blob) {
Some(blob) => {
// Release the pending lock before loading the blob.
drop(pending);
let bytes = self.load_blob_bytes(&blob)?;
Ok(copy_into(&bytes, offset, buf))
}
None => Err(MountError::Stale(format!(
"orphan pending file {} has no readable bytes",
path.display()
))),
};
}
// A hot buffer registered under the same path by another node id.
if let Some(id) = pending.hot_by_path.get(path).copied()
&& let Some(hot) = pending.hot.get(&id)
{
return Ok(copy_into(&hot.bytes, offset, buf));
}
// Inode lock taken while the pending lock is held (pending, then inodes).
let inodes = self.inner.inodes.lock_or_poisoned();
inodes
.by_path
.get(path)
.copied()
.and_then(|id| pending.warm.get(&id).map(|e| e.blob))
};
match warm_blob {
Some(blob) => {
let bytes = self.load_blob_bytes(&blob)?;
Ok(copy_into(&bytes, offset, buf))
}
None => Err(MountError::Stale(format!(
"pending file {}",
path.display()
))),
}
}
NodeRecord::File { blob, path, .. } => {
// `None` means no overlay applies and the captured blob is read.
let overlay = {
let pending = self.inner.pending.lock_or_poisoned();
if pending.is_orphan(node.0) {
pending
.warm
.get(&node.0)
.map(|warm| Overlay::Warm(warm.blob))
} else if pending.tombstones.contains(path) {
Some(Overlay::Gone)
} else if let Some(other_id) = pending.hot_by_path.get(path).copied()
&& let Some(hot) = pending.hot.get(&other_id)
{
return Ok(copy_into(&hot.bytes, offset, buf));
} else {
let inodes = self.inner.inodes.lock_or_poisoned();
inodes.by_path.get(path).copied().and_then(|id| {
pending.warm.get(&id).map(|warm| Overlay::Warm(warm.blob))
})
}
};
match overlay {
Some(Overlay::Gone) => Err(MountError::Stale(format!(
"file {} was unlinked through the mount",
path.display()
))),
Some(Overlay::Warm(blob)) => {
let bytes = self.load_blob_bytes(&blob)?;
Ok(copy_into(&bytes, offset, buf))
}
None => {
let bytes = self.load_blob_bytes(blob)?;
Ok(copy_into(&bytes, offset, buf))
}
}
}
// Gitlinks serve the placeholder bytes stored on the record.
NodeRecord::Gitlink { placeholder, .. } => Ok(copy_into(placeholder, offset, buf)),
// Reading a captured symlink yields its blob bytes.
NodeRecord::Symlink { blob } => {
let bytes = self.load_blob_bytes(blob)?;
Ok(copy_into(&bytes, offset, buf))
}
_ => Err(MountError::NotFound(format!(
"read on non-file node {}",
node.0
))),
}
}
/// Writes `data` at `offset` into the hot buffer of `node`, creating and seeding the
/// buffer first if none exists. Returns the number of bytes written (`data.len()`).
///
/// The seed for a new buffer is chosen under the pending lock and loaded with the
/// lock released; the buffer is then installed and written under a second
/// acquisition. After the write, an idle-buffer promotion pass is triggered
/// best-effort.
///
/// # Errors
/// * Whatever `validate_write_extent` rejects, or `InvalidArgument` when `offset`
///   does not fit in `usize`.
/// * `ReadOnly` when the node is neither a `File` nor a `PendingFile`.
/// * Failures from `record_for` and `load_blob_bytes`.
fn write(&self, node: NodeId, offset: u64, data: &[u8]) -> Result<usize> {
// `end` is the validated exclusive end of the written range, as `usize`.
let end = validate_write_extent(offset, data.len())?;
let offset = usize::try_from(offset).map_err(|_| {
MountError::InvalidArgument(format!("write offset {offset} does not fit in usize"))
})?;
let record = self.record_for(node)?;
let (path, mode, captured_blob) = match &record {
NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
NodeRecord::File {
path, mode, blob, ..
} => (path.clone(), *mode, Some(*blob)),
_ => return Err(MountError::ReadOnly),
};
// Where the initial content of a new hot buffer comes from.
enum Seed {
None,
Blob(ContentHash),
}
let seed = {
// The inode lock is taken and released before the pending lock is acquired.
let path_owner = {
let inodes = self.inner.inodes.lock_or_poisoned();
inodes.by_path.get(&path).copied()
};
let pending = self.inner.pending.lock_or_poisoned();
let orphan = pending.is_orphan(node.0);
if pending.hot.contains_key(&node.0) {
// A buffer already exists for this node; no seed needed.
Seed::None
} else if !orphan
&& pending
.hot_by_path
.get(&path)
.is_some_and(|id| pending.hot.contains_key(id))
{
// A buffer registered under the path will be adopted below; no seed needed.
Seed::None
} else if orphan {
pending
.warm
.get(&node.0)
.map(|e| Seed::Blob(e.blob))
.or_else(|| captured_blob.map(Seed::Blob))
.unwrap_or(Seed::None)
} else if pending.tombstones.contains(&path) {
// Writing to a tombstoned path starts from empty content.
Seed::None
} else if let Some(entry) = path_owner.and_then(|id| pending.warm.get(&id)) {
Seed::Blob(entry.blob)
} else if let Some(blob) = captured_blob {
Seed::Blob(blob)
} else {
Seed::None
}
};
// Loaded without the pending lock held.
let seed_bytes = match seed {
Seed::None => None,
Seed::Blob(hash) => Some((*self.load_blob_bytes(&hash)?).to_vec()),
};
let mut pending = self.inner.pending.lock_or_poisoned();
let orphan = pending.is_orphan(node.0);
if !orphan {
// Adopt a buffer that another node id registered under the same path.
if let Some(existing_id) = pending.hot_by_path.get(&path).copied()
&& existing_id != node.0
&& let Some(buf) = pending.hot.remove(&existing_id)
{
pending.hot.insert(node.0, buf);
}
pending.hot_by_path.insert(path.clone(), node.0);
pending.tombstones.remove(&path);
}
// The seed is only used when no buffer exists for the node at this point.
let buf = pending.hot.entry(node.0).or_insert_with(|| HotBuffer {
path: path.clone(),
mode,
bytes: seed_bytes.unwrap_or_default(),
last_touched: Instant::now(),
});
// Grow with zeros when the write extends past the current end.
if buf.bytes.len() < end {
buf.bytes.resize(end, 0);
}
buf.bytes[offset..end].copy_from_slice(data);
buf.last_touched = Instant::now();
let written = data.len();
drop(pending);
// Best-effort: a promotion failure does not fail the write.
let _ = self.promote_idle_buffers();
Ok(written)
}
/// Lists the entries of directory `dir`, merging the captured tree with the pending
/// overlay.
///
/// Tree entries come first: each one is dropped when directory-tombstoned or
/// tombstoned, replaced by its overlay hit when one exists, and otherwise taken from
/// the tree. Overlay-only children from `pending_children_at` are then added for names
/// not already present. The result is ordered by name (collected from a `BTreeMap`).
/// A directory that is itself, or lies under, a directory tombstone lists as empty.
///
/// # Errors
/// * `NotADirectory` when `dir` has no directory path.
/// * Failures from `record_for`, `tree_for_record` and `entry_from_tree_entry`.
fn enumerate(&self, dir: NodeId) -> Result<Vec<Entry>> {
let record = self.record_for(dir)?;
let parent_path = match self.dir_path_of(&record) {
Some(p) => p,
None => return Err(MountError::NotADirectory(format!("{record:?}"))),
};
let tree = self.tree_for_record(&record)?;
let mut by_name: BTreeMap<String, Entry> = BTreeMap::new();
{
let pending = self.inner.pending.lock_or_poisoned();
if pending.dir_tombstones.contains(&parent_path)
|| self.ancestor_is_dir_tombstoned(&pending, &parent_path)
{
return Ok(vec![]);
}
}
for tree_entry in tree.entries() {
let entry_path = join_child(&parent_path, tree_entry.name());
{
// The pending lock is taken and released once per tree entry.
let pending = self.inner.pending.lock_or_poisoned();
if pending.dir_tombstones.contains(&entry_path) {
continue;
}
}
match self.pending_lookup(&entry_path) {
Some(PendingHit::Tombstone) => continue,
Some(hit) => {
// An overlay hit shadows the tree entry even when it yields no entry.
if let Some(entry) =
self.entry_from_pending_hit(hit, &entry_path, OsStr::new(tree_entry.name()))
{
by_name.insert(tree_entry.name().to_string(), entry);
}
continue;
}
None => {}
}
let entry = self.entry_from_tree_entry(&parent_path, tree_entry)?;
by_name.insert(tree_entry.name().to_string(), entry);
}
let pending_children = self.pending_children_at(&parent_path);
for (name, kind) in pending_children {
// Names already produced from the tree pass are not overridden.
if by_name.contains_key(&name) {
continue;
}
let full_path = join_child(&parent_path, &name);
match kind {
PendingChildKind::HotFile { node, size, mode } => {
// Hot files reuse the node id the buffer is registered under.
by_name.insert(
name.clone(),
Entry {
node,
name: OsString::from(&name),
kind: kind_for_mode(mode),
size,
unix_mode: mode.to_unix_mode(),
},
);
}
PendingChildKind::WarmFile { size, mode } => {
let node = self.intern(NodeRecord::PendingFile {
path: full_path,
mode,
});
by_name.insert(
name.clone(),
Entry {
node,
name: OsString::from(&name),
kind: kind_for_mode(mode),
size,
unix_mode: mode.to_unix_mode(),
},
);
}
PendingChildKind::Dir => {
let node = self.intern(NodeRecord::PendingDir { path: full_path });
// Overlay directories are listed with size 0 here.
by_name.insert(
name.clone(),
Entry {
node,
name: OsString::from(&name),
kind: NodeKind::Directory,
size: 0,
unix_mode: DIR_UNIX_MODE,
},
);
}
PendingChildKind::Symlink { size } => {
let node = self.intern(NodeRecord::PendingSymlink { path: full_path });
by_name.insert(
name.clone(),
Entry {
node,
name: OsString::from(&name),
kind: NodeKind::Symlink,
size,
unix_mode: FileMode::Symlink.to_unix_mode(),
},
);
}
}
}
Ok(by_name.into_values().collect())
}
/// Returns the attributes of `node`.
///
/// `kind` and `unix_mode` come from the node's record. Directories report their entry
/// count as size and `nlink` 2; every other kind reports `nlink` 1 and a size taken
/// from the overlay where it applies, otherwise from the captured blob. `mtime` is
/// always the mount's `mounted_at`.
///
/// # Errors
/// * `Stale` — a captured file's path is tombstoned, or a pending file / symlink has
///   no overlay state to size it from.
/// * Failures from `record_for`, `load_tree` and `blob_size`.
fn attrs(&self, node: NodeId) -> Result<Attrs> {
let record = self.record_for(node)?;
let kind = record.kind();
let unix_mode = record.unix_mode();
let (size, nlink) = match &record {
NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => {
let tree = self.load_tree(tree)?;
(tree.entries().len() as u64, 2)
}
NodeRecord::PendingDir { path } => {
(self.pending_children_at(path).len() as u64, 2)
}
NodeRecord::File { blob, path, .. } => {
// Outer `None`: no overlay applies. `Some(None)`: path is tombstoned.
// `Some(Some(n))`: overlay size.
let overlay_size = {
let pending = self.inner.pending.lock_or_poisoned();
if let Some(buf) = pending.hot.get(&node.0) {
Some(Some(buf.bytes.len() as u64))
} else if pending.is_orphan(node.0) {
pending.warm.get(&node.0).map(|e| Some(e.size))
} else if pending.tombstones.contains(path) {
Some(None)
} else if let Some(other_id) = pending.hot_by_path.get(path).copied()
&& let Some(hot) = pending.hot.get(&other_id)
{
Some(Some(hot.bytes.len() as u64))
} else {
// Inode lock taken while the pending lock is held (pending, then inodes).
let inodes = self.inner.inodes.lock_or_poisoned();
inodes
.by_path
.get(path)
.copied()
.and_then(|id| pending.warm.get(&id).map(|warm| Some(warm.size)))
}
};
match overlay_size {
Some(Some(size)) => (size, 1),
Some(None) => {
return Err(MountError::Stale(format!(
"file {} was unlinked through the mount",
path.display()
)));
}
None => (self.blob_size(blob)?, 1),
}
}
NodeRecord::Gitlink { placeholder, .. } => (placeholder.len() as u64, 1),
NodeRecord::Symlink { blob } => (self.blob_size(blob)?, 1),
NodeRecord::PendingFile { path, .. } => {
// Outer `Some` means the node is an orphan; the inner option is its size
// from its own hot buffer or warm entry.
let orphan_size = {
let pending = self.inner.pending.lock_or_poisoned();
if pending.is_orphan(node.0) {
Some(
pending
.hot
.get(&node.0)
.map(|buf| buf.bytes.len() as u64)
.or_else(|| pending.warm.get(&node.0).map(|e| e.size)),
)
} else {
None
}
};
if let Some(opt) = orphan_size {
let size = opt.ok_or_else(|| {
MountError::Stale(format!(
"orphan pending file {} has no buffered bytes",
path.display()
))
})?;
(size, 1)
} else {
let hit = self.pending_lookup(path).ok_or_else(|| {
MountError::Stale(format!("pending file {}", path.display()))
})?;
// A tombstoned pending file reports size 0 rather than an error.
let size = match hit {
PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => size,
PendingHit::Symlink { target_len } => target_len,
PendingHit::Tombstone => 0,
};
(size, 1)
}
}
NodeRecord::PendingSymlink { path } => {
let pending = self.inner.pending.lock_or_poisoned();
let size = pending
.symlinks
.get(path)
.map(|t| t.len() as u64)
.ok_or_else(|| {
MountError::Stale(format!("pending symlink {}", path.display()))
})?;
(size, 1)
}
};
// NOTE(review): the result of `path_of` is discarded — confirm whether this call
// is still needed.
let _ = self.path_of(&record);
Ok(Attrs {
node,
kind,
size,
unix_mode,
nlink,
mtime: self.inner.mounted_at,
})
}
/// Handles a forget notification for `node`.
///
/// `kernel_forget_inode` is run under the pending lock. The inode record is retired
/// only when that call yields a value and the value is `false`; in every other case
/// the record is kept. Always returns `Ok(())`.
fn invalidate(&self, node: NodeId) -> Result<()> {
    let mut overlay = self.inner.pending.lock_or_poisoned();
    let retire_inode_record = overlay.with_brand(|bp| {
        bp.kernel_forget_inode(node.0)
            .map(|warm_still_references| !warm_still_references)
            .unwrap_or(false)
    });
    // Release the pending lock before touching the inode table.
    drop(overlay);
    if !retire_inode_record {
        return Ok(());
    }
    self.inner.inodes.lock_or_poisoned().forget(node);
    Ok(())
}
/// Flushes `node` by forwarding to `flush_node`.
fn flush(&self, node: NodeId) -> Result<()> {
self.flush_node(node)
}
/// Releases one open handle on `node` by forwarding to `release_node`.
fn release(&self, node: NodeId) -> Result<()> {
self.release_node(node)
}
/// Forwards to `ContentAddressedMount::on_open`, named through the type path
/// (presumably the inherent method of the same name — confirm).
fn on_open(&self, node: NodeId) -> Result<()> {
ContentAddressedMount::on_open(self, node)
}
/// Forwards to `ContentAddressedMount::create_file` with the arguments unchanged
/// (presumably the inherent method of the same name — confirm).
fn create_file(
&self,
parent: NodeId,
name: &OsStr,
mode: FileMode,
exclusive: bool,
) -> Result<Entry> {
ContentAddressedMount::create_file(self, parent, name, mode, exclusive)
}
/// Forwards to `ContentAddressedMount::make_dir` with the arguments unchanged
/// (presumably the inherent method of the same name — confirm).
fn make_dir(&self, parent: NodeId, name: &OsStr) -> Result<Entry> {
ContentAddressedMount::make_dir(self, parent, name)
}
/// Forwards to `ContentAddressedMount::unlink_entry` with the arguments unchanged
/// (presumably the inherent method of the same name — confirm).
fn unlink_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
ContentAddressedMount::unlink_entry(self, parent, name)
}
/// Forwards to `ContentAddressedMount::rmdir_entry` with the arguments unchanged
/// (presumably the inherent method of the same name — confirm).
fn rmdir_entry(&self, parent: NodeId, name: &OsStr) -> Result<()> {
ContentAddressedMount::rmdir_entry(self, parent, name)
}
/// Forwards to `ContentAddressedMount::rename_entry` with the arguments unchanged
/// (presumably the inherent method of the same name — confirm).
fn rename_entry(
&self,
old_parent: NodeId,
old_name: &OsStr,
new_parent: NodeId,
new_name: &OsStr,
) -> Result<()> {
ContentAddressedMount::rename_entry(self, old_parent, old_name, new_parent, new_name)
}
/// Forwards to `ContentAddressedMount::rename_entry_with_options` with the arguments
/// unchanged (presumably the inherent method of the same name — confirm).
fn rename_entry_with_options(
&self,
old_parent: NodeId,
old_name: &OsStr,
new_parent: NodeId,
new_name: &OsStr,
options: RenameOptions,
) -> Result<()> {
ContentAddressedMount::rename_entry_with_options(
self, old_parent, old_name, new_parent, new_name, options,
)
}
/// Forwards to `ContentAddressedMount::set_attrs` with the arguments unchanged
/// (presumably the inherent `set_attrs` — confirm).
fn set_attrs(&self, node: NodeId, update: AttrUpdate) -> Result<Attrs> {
ContentAddressedMount::set_attrs(self, node, update)
}
/// Forwards to `ContentAddressedMount::create_symlink` with the arguments unchanged
/// (presumably the inherent `create_symlink` — confirm).
fn create_symlink(&self, parent: NodeId, name: &OsStr, target: &Path) -> Result<Entry> {
ContentAddressedMount::create_symlink(self, parent, name, target)
}
/// Forwards to `ContentAddressedMount::read_link` with the argument unchanged
/// (presumably the inherent `read_link` — confirm).
fn read_link(&self, node: NodeId) -> Result<OsString> {
ContentAddressedMount::read_link(self, node)
}
}
impl ContentAddressedMount {
/// Captures the mount's pending writes into a new state, using the attribution
/// reported by the repository.
///
/// # Errors
/// Returns `MountError::Store` when the attribution cannot be obtained, and propagates
/// failures from `capture_with_attribution`.
pub fn capture(&self, intent: impl Into<Option<String>>) -> Result<ChangeId> {
    match self.inner.repo.get_attribution() {
        Ok(attribution) => self.capture_with_attribution(intent, attribution),
        Err(err) => Err(MountError::Store(err)),
    }
}
#[instrument(skip(self, attribution, intent), fields(thread = %self.inner.thread))]
pub fn capture_with_attribution(
&self,
intent: impl Into<Option<String>>,
attribution: Attribution,
) -> Result<ChangeId> {
self.flush_all()?;
let state_snapshot = *self.inner.state.read_or_poisoned();
let parent_tree = self.load_tree(&state_snapshot.tree)?;
let tree_hash = {
let pending = self.inner.pending.lock_or_poisoned();
let inodes = self.inner.inodes.lock_or_poisoned();
apply_pending_to_tree(self.store(), &parent_tree, &pending, &inodes)?
};
let parent_id = self.inner.repo.head().map_err(MountError::Store)?;
let parents = match parent_id {
Some(id) => vec![id],
None => vec![],
};
let mut state = State::new_snapshot(tree_hash, parents, attribution);
if let Some(intent) = intent.into() {
state = state.with_intent(intent);
}
state = state.with_confidence(self.inner.repo.config().defaults.confidence);
self.inner
.repo
.put_authored_state(&mut state)
.map_err(MountError::Store)?;
let change_id = state.change_id;
let prev_head_change_id = state_snapshot.change_id;
let served_thread = objects::object::ThreadName::from(self.inner.thread.as_str());
self.inner
.repo
.commit_snapshot_atomic_with_capture_visibility(
&change_id,
Some(prev_head_change_id),
Some(&served_thread),
false,
)
.map_err(MountError::Store)?;
let new_tree = self.load_tree(&tree_hash)?;
if let Err(err) = repo::snapshot_metadata::refresh_active_thread_metadata(
&self.inner.repo,
&state,
&new_tree,
) {
warn!(?err, "thread metadata refresh from mount capture failed");
}
{
let mut pending = self.inner.pending.lock_or_poisoned();
pending.drain_for_capture();
}
let mut state_lock = self.inner.state.write_or_poisoned();
*state_lock = MountState {
change_id,
tree: tree_hash,
};
let mut inodes = self.inner.inodes.lock_or_poisoned();
if let Some(record) = inodes.by_id.get_mut(&NodeId::ROOT.0) {
*record = NodeRecord::Root { tree: tree_hash };
}
warn!(
thread = %self.inner.thread,
change = %change_id,
"captured mount writes into new state"
);
Ok(change_id)
}
}
fn apply_pending_to_tree(
store: &impl ObjectStore,
parent: &Tree,
pending: &Pending,
inodes: &Inodes,
) -> Result<ContentHash> {
#[derive(Default)]
struct VDir {
files: BTreeMap<String, (ContentHash, FileMode)>,
symlinks: BTreeMap<String, ContentHash>,
deletions: BTreeSet<String>,
dir_deletions: BTreeSet<String>,
explicit_empty: BTreeSet<String>,
children: BTreeMap<String, VDir>,
}
let mut root = VDir::default();
fn descend<'a>(node: &'a mut VDir, components: &[&str]) -> &'a mut VDir {
let mut cursor = node;
for c in components {
if !cursor.children.contains_key(*c) {
cursor.children.insert((*c).to_string(), VDir::default());
}
cursor = cursor.children.get_mut(*c).unwrap();
}
cursor
}
for (id, entry) in &pending.warm {
if pending.is_orphan(*id) {
continue;
}
let Some(record) = inodes.by_id.get(id) else {
continue;
};
let Some(path) = warm_path_of_record(record) else {
continue;
};
if inodes.by_path.get(path) != Some(id) {
continue;
}
let comps: Vec<&str> = path
.components()
.filter_map(|c| match c {
Component::Normal(n) => n.to_str(),
_ => None,
})
.collect();
let Some((leaf_name, dir_components)) = comps.split_last() else {
continue;
};
let dir = descend(&mut root, dir_components);
dir.files
.insert((*leaf_name).to_string(), (entry.blob, entry.mode));
dir.deletions.remove(*leaf_name);
}
for (path, target_bytes) in &pending.symlinks {
let comps: Vec<&str> = path
.components()
.filter_map(|c| match c {
Component::Normal(n) => n.to_str(),
_ => None,
})
.collect();
let Some((leaf_name, dir_components)) = comps.split_last() else {
continue;
};
let blob = Blob::new(target_bytes.clone());
let blob_oid = store.put_blob(&blob).map_err(MountError::Store)?;
let dir = descend(&mut root, dir_components);
dir.symlinks.insert((*leaf_name).to_string(), blob_oid);
dir.deletions.remove(*leaf_name);
}
for explicit in &pending.explicit_dirs {
let comps: Vec<&str> = explicit
.components()
.filter_map(|c| match c {
Component::Normal(n) => n.to_str(),
_ => None,
})
.collect();
let Some((leaf_name, dir_components)) = comps.split_last() else {
continue;
};
let parent_dir = descend(&mut root, dir_components);
parent_dir.explicit_empty.insert((*leaf_name).to_string());
parent_dir
.children
.entry((*leaf_name).to_string())
.or_default();
}
for tomb in &pending.tombstones {
let comps: Vec<&str> = tomb
.components()
.filter_map(|c| match c {
Component::Normal(n) => n.to_str(),
_ => None,
})
.collect();
let Some((leaf_name, dir_components)) = comps.split_last() else {
continue;
};
let dir = descend(&mut root, dir_components);
dir.files.remove(*leaf_name);
dir.symlinks.remove(*leaf_name);
dir.deletions.insert((*leaf_name).to_string());
}
for tomb in &pending.dir_tombstones {
let comps: Vec<&str> = tomb
.components()
.filter_map(|c| match c {
Component::Normal(n) => n.to_str(),
_ => None,
})
.collect();
let Some((leaf_name, dir_components)) = comps.split_last() else {
continue;
};
let dir = descend(&mut root, dir_components);
dir.children.remove(*leaf_name);
dir.explicit_empty.remove(*leaf_name);
dir.dir_deletions.insert((*leaf_name).to_string());
}
fn materialize(
v: &VDir,
captured: &Tree,
store: &impl ObjectStore,
) -> Result<Option<ContentHash>> {
let mut entries: BTreeMap<String, TreeEntry> = captured
.entries()
.iter()
.map(|e| (e.name().to_string(), e.clone()))
.collect();
for name in &v.deletions {
entries.remove(name);
}
for name in &v.dir_deletions {
entries.remove(name);
}
for (name, (blob, mode)) in &v.files {
let executable = matches!(mode, FileMode::Executable);
let entry = TreeEntry::file(name.clone(), *blob, executable).map_err(|e| {
MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
})?;
entries.insert(name.clone(), entry);
}
for (name, blob) in &v.symlinks {
let entry = TreeEntry::symlink(name.clone(), *blob).map_err(|e| {
MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
})?;
entries.insert(name.clone(), entry);
}
for (name, child) in &v.children {
if v.dir_deletions.contains(name) {
entries.remove(name);
continue;
}
let child_captured = match captured.get(name) {
Some(e) if e.is_tree() => {
let hash = e.tree_hash().ok_or_else(|| {
MountError::Store(objects::error::HeddleError::MissingObject {
object_type: "tree".to_string(),
id: "<non-tree>".to_string(),
})
})?;
store
.get_tree(&hash)
.map_err(MountError::Store)?
.ok_or_else(|| {
MountError::Store(objects::error::HeddleError::MissingObject {
object_type: "tree".to_string(),
id: hash.to_string(),
})
})?
}
_ => Tree::new(),
};
let force_empty = v.explicit_empty.contains(name);
match materialize(child, &child_captured, store)? {
Some(hash) => {
let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
})?;
entries.insert(name.clone(), entry);
}
None if force_empty => {
let hash = store.put_tree(&Tree::new()).map_err(MountError::Store)?;
let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
})?;
entries.insert(name.clone(), entry);
}
None => {
entries.remove(name);
}
}
}
if entries.is_empty() {
return Ok(None);
}
let tree = Tree::from_entries(entries.into_values().collect());
let hash = store.put_tree(&tree).map_err(MountError::Store)?;
Ok(Some(hash))
}
let hash = match materialize(&root, parent, store)? {
Some(h) => h,
None => store.put_tree(&Tree::new()).map_err(MountError::Store)?,
};
Ok(hash)
}
/// Test-only accessors exposing the mount's pending state to unit tests.
///
/// All accessors take their locks through `lock_or_poisoned()`, matching the
/// rest of this file.
impl<S: ObjectStore + 'static> ContentAddressedMount<S> {
    /// Paths of all non-orphan warm entries whose inode record yields a path.
    #[cfg(test)]
    pub(crate) fn warm_keys(&self) -> Vec<PathBuf> {
        let pending = self.inner.pending.lock_or_poisoned();
        let inodes = self.inner.inodes.lock_or_poisoned();
        pending
            .warm
            .keys()
            .filter(|id| !pending.is_orphan(**id))
            .filter_map(|id| inodes.by_id.get(id).and_then(warm_path_of_record))
            .map(Path::to_path_buf)
            .collect()
    }

    /// Blob hash of the warm entry for `path`, or `None` when the path is not
    /// indexed or has no warm entry.
    #[cfg(test)]
    pub(crate) fn warm_blob(&self, path: impl AsRef<Path>) -> Option<ContentHash> {
        let path = path.as_ref();
        // The inode guard is a temporary released at the end of this
        // statement, before the pending lock is taken.
        let id = self
            .inner
            .inodes
            .lock_or_poisoned()
            .by_path
            .get(path)
            .copied()?;
        self.inner
            .pending
            .lock_or_poisoned()
            .warm
            .get(&id)
            .map(|e| e.blob)
    }

    /// Number of entries currently in the pending `hot` collection.
    #[cfg(test)]
    pub(crate) fn hot_buffer_count(&self) -> usize {
        self.inner.pending.lock_or_poisoned().hot.len()
    }

    /// Snapshot of the pending file tombstone paths.
    #[cfg(test)]
    // NOTE(review): presumably not referenced by every test configuration — confirm the allow is still needed.
    #[allow(dead_code)]
    pub(crate) fn tombstones(&self) -> Vec<PathBuf> {
        self.inner
            .pending
            .lock_or_poisoned()
            .tombstones
            .iter()
            .cloned()
            .collect()
    }

    /// Borrows the repository backing this mount.
    #[cfg(test)]
    pub(crate) fn repo_handle(&self) -> &Repository<RefManager, OpLog, S> {
        &self.inner.repo
    }

    /// Whether `node` is currently reported as an orphan by the pending state.
    #[cfg(test)]
    pub(crate) fn orphans_contains(&self, node: NodeId) -> bool {
        self.inner.pending.lock_or_poisoned().is_orphan(node.0)
    }
}
/// Helpers shared by unit tests that need to seed mount state directly.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;

    /// Interns a `NodeRecord::PendingFile` for `name` with the given `mode`
    /// and returns the node id produced by `mount.intern`.
    pub(crate) fn install_pending_file(
        mount: &ContentAddressedMount,
        name: &str,
        mode: FileMode,
    ) -> NodeId {
        let record = NodeRecord::PendingFile {
            path: PathBuf::from(name),
            mode,
        };
        mount.intern(record)
    }
}