use std::{
collections::{BTreeMap, BTreeSet},
ffi::{OsStr, OsString},
path::{Component, Path, PathBuf},
sync::{
Arc, Mutex, RwLock,
atomic::{AtomicBool, Ordering},
},
thread::JoinHandle,
time::{Duration, Instant, SystemTime},
};
use objects::{
object::{
Attribution, Blob, ChangeId, ContentHash, EntryType, FileMode, State, Tree, TreeEntry,
},
store::ObjectStore,
};
use refs::Head;
use repo::Repository;
use tracing::{debug, instrument, warn};
use crate::{
error::{MountError, Result},
shell::{Attrs, DIR_UNIX_MODE, Entry, NodeId, NodeKind, PlatformShell, kind_for_mode},
};
/// Idle time after which a hot buffer becomes eligible for promotion.
const DEFAULT_PROMOTION_IDLE: Duration = Duration::from_secs(2);
/// Default period of the background sweeper thread.
const DEFAULT_SWEEP_INTERVAL: Option<Duration> = Some(Duration::from_secs(5));

/// Controls when in-memory ("hot") write buffers are promoted into blobs.
#[derive(Clone, Copy, Debug)]
pub struct PromotionPolicy {
    /// Minimum time since a buffer's last write before it may be promoted.
    pub idle_after: Duration,
    /// Period of the background sweeper; `None` means no sweeper thread is spawned.
    pub sweep_interval: Option<Duration>,
}

impl Default for PromotionPolicy {
    /// Builds the policy from [`DEFAULT_PROMOTION_IDLE`] and [`DEFAULT_SWEEP_INTERVAL`].
    fn default() -> Self {
        let idle_after = DEFAULT_PROMOTION_IDLE;
        let sweep_interval = DEFAULT_SWEEP_INTERVAL;
        PromotionPolicy {
            idle_after,
            sweep_interval,
        }
    }
}
/// What an inode number refers to.
///
/// `Root`, `Dir`, `File` and `Symlink` point at tree/blob hashes loaded from
/// the object store; `PendingDir` and `PendingFile` exist only because of the
/// in-memory pending overlay. All paths are relative to the mount root.
#[derive(Clone, Debug)]
enum NodeRecord {
    /// The mount root, backed by the tree of the mounted state.
    Root {
        tree: ContentHash,
    },
    /// A committed directory at `path`, backed by `tree`.
    Dir {
        tree: ContentHash,
        path: PathBuf,
    },
    /// A directory with no committed tree; created by `lookup`/`enumerate`
    /// when pending files exist beneath `path`.
    PendingDir {
        path: PathBuf,
    },
    /// A committed file at `path` whose content is `blob`.
    File {
        blob: ContentHash,
        mode: FileMode,
        path: PathBuf,
    },
    /// A symlink whose target is stored in `blob`. It carries no path and is
    /// interned by blob hash, so symlinks with identical targets share one id.
    Symlink {
        blob: ContentHash,
    },
    /// A file whose content lives in the pending overlay at `path`.
    PendingFile {
        path: PathBuf,
        mode: FileMode,
    },
}
impl NodeRecord {
    /// File-system kind reported for this node (files derive it from their mode).
    fn kind(&self) -> NodeKind {
        match self {
            Self::File { mode, .. } | Self::PendingFile { mode, .. } => kind_for_mode(*mode),
            Self::Symlink { .. } => NodeKind::Symlink,
            Self::Root { .. } | Self::Dir { .. } | Self::PendingDir { .. } => NodeKind::Directory,
        }
    }

    /// Unix permission/type bits reported for this node.
    fn unix_mode(&self) -> u32 {
        let file_mode = match self {
            Self::Root { .. } | Self::Dir { .. } | Self::PendingDir { .. } => {
                return DIR_UNIX_MODE;
            }
            Self::File { mode, .. } | Self::PendingFile { mode, .. } => *mode,
            Self::Symlink { .. } => FileMode::Symlink,
        };
        file_mode.to_unix_mode()
    }
}
/// Inode table mapping `NodeId`s to the records they stand for.
///
/// Root and symlink records are deduplicated by content hash (`by_hash`);
/// directory and file records, committed or pending, are deduplicated by path
/// (`by_path`), so a path keeps its id while its backing record is replaced.
#[derive(Default)]
struct Inodes {
    /// Next id to hand out.
    next: u64,
    /// Record for every live id.
    by_id: BTreeMap<u64, NodeRecord>,
    /// Hash-keyed index for root and symlink records.
    by_hash: BTreeMap<HashKey, u64>,
    /// Path-keyed index for directory and file records.
    by_path: BTreeMap<PathBuf, u64>,
}
/// Dedup key for hash-keyed records; `kind` keeps root trees (0) and symlink
/// blobs (2) with the same hash from colliding.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct HashKey {
    kind: u8,
    hash: ContentHash,
}
impl Inodes {
fn new(root_tree: ContentHash) -> Self {
let mut me = Self {
next: NodeId::ROOT.0 + 1,
by_id: BTreeMap::new(),
by_hash: BTreeMap::new(),
by_path: BTreeMap::new(),
};
me.by_id
.insert(NodeId::ROOT.0, NodeRecord::Root { tree: root_tree });
me.by_hash.insert(
HashKey {
kind: 0,
hash: root_tree,
},
NodeId::ROOT.0,
);
me
}
fn get(&self, id: NodeId) -> Option<NodeRecord> {
self.by_id.get(&id.0).cloned()
}
fn intern(&mut self, record: NodeRecord) -> NodeId {
match &record {
NodeRecord::Root { tree } => {
let key = HashKey {
kind: 0,
hash: *tree,
};
if let Some(&id) = self.by_hash.get(&key) {
return NodeId(id);
}
let id = self.next;
self.next += 1;
self.by_id.insert(id, record);
self.by_hash.insert(key, id);
NodeId(id)
}
NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
if let Some(&id) = self.by_path.get(path) {
self.by_id.insert(id, record);
return NodeId(id);
}
let id = self.next;
self.next += 1;
self.by_path.insert(path.clone(), id);
self.by_id.insert(id, record);
NodeId(id)
}
NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => {
if let Some(&id) = self.by_path.get(path) {
self.by_id.insert(id, record);
return NodeId(id);
}
let id = self.next;
self.next += 1;
self.by_path.insert(path.clone(), id);
self.by_id.insert(id, record);
NodeId(id)
}
NodeRecord::Symlink { blob } => {
let key = HashKey {
kind: 2,
hash: *blob,
};
if let Some(&id) = self.by_hash.get(&key) {
return NodeId(id);
}
let id = self.next;
self.next += 1;
self.by_id.insert(id, record);
self.by_hash.insert(key, id);
NodeId(id)
}
}
}
fn forget(&mut self, id: NodeId) {
if id == NodeId::ROOT {
return;
}
if let Some(record) = self.by_id.remove(&id.0) {
match record {
NodeRecord::Root { tree } => {
self.by_hash.remove(&HashKey {
kind: 0,
hash: tree,
});
}
NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => {
self.by_path.remove(&path);
}
NodeRecord::File { path, .. } | NodeRecord::PendingFile { path, .. } => {
self.by_path.remove(&path);
}
NodeRecord::Symlink { blob } => {
self.by_hash.remove(&HashKey {
kind: 2,
hash: blob,
});
}
}
}
}
}
/// In-memory write buffer for a file written since its last promotion.
/// A new buffer is seeded with the file's existing content by `write`, so
/// `bytes` is the whole file rather than just the written ranges.
struct HotBuffer {
    /// Path (relative to the mount root) this buffer holds.
    path: PathBuf,
    mode: FileMode,
    bytes: Vec<u8>,
    /// Time of the last write; the sweeper compares it against `idle_after`.
    last_touched: Instant,
}
/// A promoted ("warm") file whose content is already stored as `blob`.
#[derive(Clone, Debug)]
struct PendingEntry {
    blob: ContentHash,
    mode: FileMode,
    /// Byte length of `blob`, recorded at promotion time.
    size: u64,
}
/// Uncaptured changes layered over the mounted state.
#[derive(Default)]
struct Pending {
    /// Hot buffers keyed by the node id that owns them.
    hot: BTreeMap<u64, HotBuffer>,
    /// Path -> id of the node whose hot buffer currently holds that path.
    hot_by_path: BTreeMap<PathBuf, u64>,
    /// Promoted files by path; these are what capture applies to the tree.
    warm: BTreeMap<PathBuf, PendingEntry>,
    /// Paths unlinked since the last capture.
    tombstones: BTreeSet<PathBuf>,
}
/// File-system view of one thread's state, with writes collected in a
/// pending overlay until `capture` turns them into a new state.
pub struct ContentAddressedMount {
    /// Shared state; the sweeper thread holds only a weak reference to it.
    inner: Arc<MountInner>,
    /// Running sweeper, if any; replaceable via `with_promotion_policy`.
    sweeper: Mutex<Option<SweepHandle>>,
}
/// State shared between the mount handle and its sweeper thread.
pub(crate) struct MountInner {
    repo: Repository,
    /// Name of the thread this mount tracks.
    thread: String,
    /// Change id and root tree the mount is currently layered on.
    state: RwLock<MountState>,
    inodes: Mutex<Inodes>,
    pending: Mutex<Pending>,
    promotion: RwLock<PromotionPolicy>,
    /// Creation time of the mount; reported as every node's `mtime`.
    mounted_at: SystemTime,
}
/// Owns the background sweeper thread together with the flag that stops it.
struct SweepHandle {
    shutdown: Arc<AtomicBool>,
    join: Option<JoinHandle<()>>,
}
impl SweepHandle {
    /// Raises the shutdown flag and waits for the worker to exit.
    ///
    /// The join handle is taken on first use, so repeated calls only re-set
    /// the flag. A panic inside the worker is ignored.
    fn signal_and_join(&mut self) {
        self.shutdown.store(true, Ordering::SeqCst);
        if let Some(worker) = self.join.take() {
            let _joined = worker.join();
        }
    }
}
impl Drop for SweepHandle {
    /// Ensures the worker is stopped before the handle disappears.
    fn drop(&mut self) {
        Self::signal_and_join(self);
    }
}
impl Drop for ContentAddressedMount {
    /// Stops the background sweeper (if any) and waits for it to exit.
    ///
    /// `drop` has exclusive access, so `Mutex::get_mut` is used instead of
    /// locking. A poisoned mutex is recovered rather than unwrapped: panicking
    /// inside `drop` while already unwinding would abort the process.
    fn drop(&mut self) {
        let slot = self
            .sweeper
            .get_mut()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if let Some(mut handle) = slot.take() {
            handle.signal_and_join();
        }
    }
}
/// The state a mount is layered on: its change id and that state's root tree.
#[derive(Clone, Copy, Debug)]
struct MountState {
    change_id: ChangeId,
    tree: ContentHash,
}
impl ContentAddressedMount {
/// Opens a mount over `thread`, layered on the thread's current state, and
/// starts the sweeper according to the default [`PromotionPolicy`].
///
/// # Errors
/// Fails when the thread or its state cannot be resolved.
pub fn new(repo: Repository, thread: impl Into<String>) -> Result<Self> {
    let thread: String = thread.into();
    let initial = resolve_thread(&repo, &thread)?;
    let inner = Arc::new(MountInner {
        inodes: Mutex::new(Inodes::new(initial.tree)),
        state: RwLock::new(initial),
        pending: Mutex::new(Pending::default()),
        promotion: RwLock::new(PromotionPolicy::default()),
        mounted_at: SystemTime::now(),
        repo,
        thread,
    });
    let sweeper = Mutex::new(spawn_sweep_worker(&inner));
    Ok(Self { inner, sweeper })
}
/// Replaces the promotion policy and restarts the sweeper so that the new
/// interval takes effect (no sweeper runs if the interval is `None`).
pub fn with_promotion_policy(self, policy: PromotionPolicy) -> Self {
    let mut slot = self.sweeper.lock().expect("sweeper lock");
    if let Some(mut running) = slot.take() {
        running.signal_and_join();
    }
    *self.inner.promotion.write().expect("promotion lock") = policy;
    *slot = spawn_sweep_worker(&self.inner);
    drop(slot);
    self
}
pub fn refresh(&self) -> Result<()> {
let next = resolve_thread(&self.inner.repo, &self.inner.thread)?;
*self.inner.state.write().expect("mount state lock") = next;
Ok(())
}
/// Name of the thread this mount tracks.
pub fn thread(&self) -> &str {
    self.inner.thread.as_str()
}
/// Change id of the state the mount is currently layered on.
pub fn current_change_id(&self) -> ChangeId {
    let state = self.inner.state.read().expect("mount state lock");
    state.change_id
}
/// Object store of the underlying repository.
fn store(&self) -> &dyn ObjectStore {
    let repo = &self.inner.repo;
    repo.store()
}
/// Loads a tree, mapping a missing object to `NotFound`.
fn load_tree(&self, hash: &ContentHash) -> Result<Tree> {
    match self.store().get_tree(hash)? {
        Some(tree) => Ok(tree),
        None => Err(MountError::NotFound(format!("tree {hash}"))),
    }
}
/// Loads a blob's content, mapping a missing object to `NotFound`.
fn load_blob_bytes(&self, hash: &ContentHash) -> Result<Vec<u8>> {
    let Some(blob) = self.store().get_blob(hash)? else {
        return Err(MountError::NotFound(format!("blob {hash}")));
    };
    Ok(blob.into_content())
}
/// Byte length of a blob, mapping a missing object to `NotFound`.
fn blob_size(&self, hash: &ContentHash) -> Result<u64> {
    let size = self.store().blob_size(hash)?;
    size.ok_or_else(|| MountError::NotFound(format!("blob {hash}")))
}
/// Current record for `id`; `Stale` if the id is unknown or was forgotten.
fn record_for(&self, id: NodeId) -> Result<NodeRecord> {
    let inodes = self.inner.inodes.lock().expect("inode lock");
    match inodes.get(id) {
        Some(record) => Ok(record),
        None => Err(MountError::Stale(format!("node {}", id.0))),
    }
}
/// Registers `record` in the inode table and returns its id.
fn intern(&self, record: NodeRecord) -> NodeId {
    let mut inodes = self.inner.inodes.lock().expect("inode lock");
    inodes.intern(record)
}
/// Resolves a path relative to the mount root to a node id.
///
/// `/` and `.` components are ignored.
///
/// # Errors
/// `NotFound` for a missing component, a `..` component, or a platform prefix.
pub fn lookup_path(&self, path: impl AsRef<Path>) -> Result<NodeId> {
    let path = path.as_ref();
    path.components()
        .try_fold(NodeId::ROOT, |node, component| match component {
            Component::CurDir | Component::RootDir => Ok(node),
            Component::Prefix(_) => Err(MountError::NotFound(format!(
                "unsupported path component in {}",
                path.display()
            ))),
            Component::ParentDir => Err(MountError::NotFound(format!(
                "parent traversal not supported: {}",
                path.display()
            ))),
            Component::Normal(name) => {
                let entry = self
                    .lookup(node, name)?
                    .ok_or_else(|| MountError::NotFound(name.to_string_lossy().into_owned()))?;
                Ok(entry.node)
            }
        })
}
/// Builds a directory entry for a committed tree entry under `parent_path`,
/// interning its node.
///
/// Directory sizes are child counts (the subtree is loaded to count them);
/// file and symlink sizes are blob byte lengths.
fn entry_from_tree_entry(&self, parent_path: &Path, tree_entry: &TreeEntry) -> Result<Entry> {
    let entry_path = if parent_path.as_os_str().is_empty() {
        PathBuf::from(&tree_entry.name)
    } else {
        parent_path.join(&tree_entry.name)
    };
    let hash = tree_entry.hash;
    let (record, size) = match tree_entry.entry_type {
        EntryType::Tree => {
            let child_count = self.load_tree(&hash)?.entries().len() as u64;
            let dir = NodeRecord::Dir {
                tree: hash,
                path: entry_path,
            };
            (dir, child_count)
        }
        EntryType::Blob => {
            let byte_len = self.blob_size(&hash)?;
            let file = NodeRecord::File {
                blob: hash,
                mode: tree_entry.mode,
                path: entry_path,
            };
            (file, byte_len)
        }
        EntryType::Symlink => {
            let byte_len = self.blob_size(&hash)?;
            (NodeRecord::Symlink { blob: hash }, byte_len)
        }
    };
    // The record already encodes kind and mode for each entry type.
    let kind = record.kind();
    let unix_mode = record.unix_mode();
    let node = self.intern(record);
    Ok(Entry {
        node,
        name: OsString::from(&tree_entry.name),
        kind,
        size,
        unix_mode,
    })
}
/// Tree backing a directory record; a `PendingDir` has an empty one.
fn tree_for_record(&self, record: &NodeRecord) -> Result<Tree> {
    match record {
        NodeRecord::PendingDir { .. } => Ok(Tree::new()),
        NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => self.load_tree(tree),
        other => Err(MountError::NotADirectory(format!("{other:?}"))),
    }
}
/// Path of a directory record relative to the mount root (empty for the root).
fn dir_path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
    let path = match record {
        NodeRecord::Root { .. } => return Some(PathBuf::new()),
        NodeRecord::Dir { path, .. } | NodeRecord::PendingDir { path } => path,
        _ => return None,
    };
    Some(path.clone())
}
/// Path of any path-keyed record; `None` for the root and symlinks.
fn path_of(&self, record: &NodeRecord) -> Option<PathBuf> {
    match record {
        NodeRecord::File { path, .. }
        | NodeRecord::PendingFile { path, .. }
        | NodeRecord::Dir { path, .. }
        | NodeRecord::PendingDir { path } => Some(path.to_path_buf()),
        NodeRecord::Root { .. } | NodeRecord::Symlink { .. } => None,
    }
}
fn promote_idle_buffers(&self) -> Result<()> {
self.inner.sweep_idle_buffers()
}
pub fn flush_node(&self, node: NodeId) -> Result<()> {
self.inner.flush_node(node)
}
pub fn unlink_path(&self, path: impl AsRef<Path>) -> Result<()> {
let path = path.as_ref().to_path_buf();
let mut pending = self.inner.pending.lock().expect("pending lock");
if let Some(node_id) = pending.hot_by_path.remove(&path) {
pending.hot.remove(&node_id);
}
pending.warm.remove(&path);
pending.tombstones.insert(path);
Ok(())
}
pub fn flush_all(&self) -> Result<()> {
let ids: Vec<u64> = self
.inner
.pending
.lock()
.expect("pending lock")
.hot
.keys()
.copied()
.collect();
for id in ids {
self.flush_node(NodeId(id))?;
}
Ok(())
}
/// Looks up `path` in the pending overlay.
///
/// A tombstone takes precedence, then a hot buffer, then a promoted (warm) entry.
fn pending_lookup(&self, path: &Path) -> Option<PendingHit> {
    let pending = self.inner.pending.lock().expect("pending lock");
    if pending.tombstones.contains(path) {
        return Some(PendingHit::Tombstone);
    }
    let hot_hit = pending.hot_by_path.get(path).and_then(|&id| {
        pending.hot.get(&id).map(|buf| PendingHit::Hot {
            node: NodeId(id),
            size: buf.bytes.len() as u64,
            mode: buf.mode,
        })
    });
    hot_hit.or_else(|| {
        pending.warm.get(path).map(|entry| PendingHit::Warm {
            blob: entry.blob,
            size: entry.size,
            mode: entry.mode,
        })
    })
}
fn pending_dir_exists(&self, dir: &Path) -> bool {
if dir.as_os_str().is_empty() {
return false;
}
let pending = self.inner.pending.lock().expect("pending lock");
let prefix = dir;
let probe = |path: &Path| -> bool {
path.strip_prefix(prefix)
.ok()
.and_then(|tail| tail.components().next())
.is_some()
};
pending
.warm
.keys()
.any(|p| !pending.tombstones.contains(p) && probe(p))
|| pending
.hot_by_path
.keys()
.any(|p| !pending.tombstones.contains(p) && probe(p))
}
/// Lists the immediate children of `dir` that are contributed by the pending
/// overlay (`dir` empty means the mount root). The result is sorted by name.
///
/// Every hot or warm path below `dir` whose exact path is not tombstoned
/// contributes its first component beneath `dir`. That name is a file when it
/// is the leaf, otherwise a `Dir` marker. Hot buffers are processed first,
/// and a hot file overrides an earlier `Dir` marker for the same name; warm
/// entries only fill in names not yet seen. Non-UTF-8 names are skipped.
fn pending_children_at(&self, dir: &Path) -> Vec<(String, PendingChildKind)> {
    let pending = self.inner.pending.lock().expect("pending lock");
    let mut out: BTreeMap<String, PendingChildKind> = BTreeMap::new();
    // Maps a pending path to (first component below `dir`, whether more
    // components follow); `None` if the path is not below `dir` at all.
    let project = |path: &Path| -> Option<(String, bool)> {
        let suffix = if dir.as_os_str().is_empty() {
            Some(path)
        } else {
            path.strip_prefix(dir).ok()
        }?;
        let mut comps = suffix.components();
        let first = comps.next()?;
        let name = match first {
            Component::Normal(n) => n.to_str()?.to_string(),
            _ => return None,
        };
        let is_dir = comps.next().is_some();
        Some((name, is_dir))
    };
    for (path, node_id) in pending.hot_by_path.iter() {
        if pending.tombstones.contains(path) {
            continue;
        }
        let Some((name, is_dir)) = project(path) else {
            continue;
        };
        if is_dir {
            out.entry(name).or_insert(PendingChildKind::Dir);
        } else if let Some(buf) = pending.hot.get(node_id) {
            // Hot content is the freshest view of this name, so it overwrites.
            out.insert(
                name,
                PendingChildKind::HotFile {
                    node: NodeId(*node_id),
                    size: buf.bytes.len() as u64,
                    mode: buf.mode,
                },
            );
        }
    }
    for (path, entry) in pending.warm.iter() {
        if pending.tombstones.contains(path) {
            continue;
        }
        let Some((name, is_dir)) = project(path) else {
            continue;
        };
        if is_dir {
            out.entry(name).or_insert(PendingChildKind::Dir);
        } else {
            // Warm entries never displace a name already produced above.
            out.entry(name).or_insert(PendingChildKind::WarmFile {
                size: entry.size,
                mode: entry.mode,
            });
        }
    }
    out.into_iter().collect()
}
fn pending_bytes(&self, path: &Path) -> Result<Option<Vec<u8>>> {
let warm_blob = {
let pending = self.inner.pending.lock().expect("pending lock");
if let Some(node_id) = pending.hot_by_path.get(path)
&& let Some(buf) = pending.hot.get(node_id)
{
return Ok(Some(buf.bytes.clone()));
}
pending.warm.get(path).map(|e| e.blob)
};
match warm_blob {
Some(blob) => Ok(Some(self.load_blob_bytes(&blob)?)),
None => Ok(None),
}
}
}
/// Result of looking a single path up in the pending overlay.
// NOTE(review): the `allow(dead_code)` appears to exist because
// `Warm::blob` is never read in this file (callers match it as `blob: _`).
#[allow(dead_code)] enum PendingHit {
    /// A hot buffer owned by `node` holds the path's content.
    Hot {
        node: NodeId,
        size: u64,
        mode: FileMode,
    },
    /// The path's content was promoted to `blob`.
    Warm {
        blob: ContentHash,
        size: u64,
        mode: FileMode,
    },
    /// The path was unlinked since the last capture.
    Tombstone,
}
/// Kind of a directory child that exists only in the pending overlay.
enum PendingChildKind {
    /// A file with a hot buffer owned by `node`.
    HotFile {
        node: NodeId,
        size: u64,
        mode: FileMode,
    },
    /// A promoted file; it has no node id until interned.
    WarmFile {
        size: u64,
        mode: FileMode,
    },
    /// A directory implied by deeper pending paths.
    Dir,
}
impl MountInner {
fn sweep_idle_buffers(&self) -> Result<()> {
let now = Instant::now();
let idle_after = self.promotion.read().expect("promotion lock").idle_after;
let to_promote: Vec<u64> = {
let pending = self.pending.lock().expect("pending lock");
pending
.hot
.iter()
.filter(|(_, buf)| now.saturating_duration_since(buf.last_touched) >= idle_after)
.map(|(id, _)| *id)
.collect()
};
for id in to_promote {
let _ = self.flush_node(NodeId(id));
}
Ok(())
}
fn flush_node(&self, node: NodeId) -> Result<()> {
let (path, mode, bytes) = {
let mut pending = self.pending.lock().expect("pending lock");
let Some(buf) = pending.hot.remove(&node.0) else {
return Ok(());
};
pending.hot_by_path.remove(&buf.path);
(buf.path, buf.mode, buf.bytes)
};
let size = bytes.len() as u64;
let blob = Blob::new(bytes);
let blob_oid = self
.repo
.store()
.put_blob(&blob)
.map_err(MountError::Store)?;
debug!(?path, %blob_oid, size, "promoted hot buffer to CAS");
let mut pending = self.pending.lock().expect("pending lock");
pending.warm.insert(
path.clone(),
PendingEntry {
blob: blob_oid,
mode,
size,
},
);
pending.tombstones.remove(&path);
Ok(())
}
}
fn spawn_sweep_worker(inner: &Arc<MountInner>) -> Option<SweepHandle> {
let interval = inner
.promotion
.read()
.expect("promotion lock")
.sweep_interval?;
let weak = Arc::downgrade(inner);
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_for_thread = shutdown.clone();
let join = std::thread::Builder::new()
.name("heddle-mount-sweep".into())
.spawn(move || sweep_worker_loop(weak, shutdown_for_thread, interval))
.ok()?;
Some(SweepHandle {
shutdown,
join: Some(join),
})
}
/// Body of the background sweeper thread.
///
/// Sleeps in short slices so that a shutdown request is noticed promptly.
/// Once `interval` of sleep has accumulated, it upgrades the weak mount
/// handle and promotes idle buffers. It exits when `shutdown` is set or the
/// mount has been dropped. The upgraded `Arc` is released before sleeping
/// again, so the worker never keeps the mount alive. Errors are logged and
/// the loop continues.
fn sweep_worker_loop(
    inner: std::sync::Weak<MountInner>,
    shutdown: Arc<AtomicBool>,
    interval: Duration,
) {
    // At most 50ms for responsive shutdown. At least 1ms: with a zero
    // interval the slice used to be zero, so `sleep(ZERO)` returned
    // immediately and the thread spun a full core.
    let slice = interval.clamp(Duration::from_millis(1), Duration::from_millis(50));
    let mut elapsed = Duration::ZERO;
    while !shutdown.load(Ordering::SeqCst) {
        std::thread::sleep(slice);
        if shutdown.load(Ordering::SeqCst) {
            break;
        }
        elapsed += slice;
        if elapsed < interval {
            continue;
        }
        elapsed = Duration::ZERO;
        let Some(mount) = inner.upgrade() else {
            break;
        };
        if let Err(err) = mount.sweep_idle_buffers() {
            warn!(?err, "sweep worker hit error promoting idle buffers");
        }
        drop(mount);
    }
}
/// Resolves `thread` to its current change id and that state's root tree.
///
/// # Errors
/// `UnknownThread` if the thread ref or its state is missing; store/ref errors otherwise.
fn resolve_thread(repo: &Repository, thread: &str) -> Result<MountState> {
    let unknown = || MountError::UnknownThread(thread.to_string());
    let Some(change_id) = repo.refs().get_thread(thread)? else {
        return Err(unknown());
    };
    let state = repo.store().get_state(&change_id)?.ok_or_else(unknown)?;
    Ok(MountState {
        change_id,
        tree: state.tree,
    })
}
impl PlatformShell for ContentAddressedMount {
/// Looks up `name` inside directory `parent`, with the pending overlay applied.
///
/// Resolution order for the child path:
/// 1. pending overlay: a tombstone hides the name; a hot buffer yields its
///    owning node; a promoted (warm) entry is interned as a `PendingFile`;
/// 2. the parent's committed tree entry;
/// 3. a directory implied by deeper pending paths, interned as `PendingDir`.
///
/// Returns `Ok(None)` when `parent` is not a directory record, `name` is not
/// valid UTF-8, or nothing matches.
///
/// # Errors
/// `Stale` for an unknown `parent`; store errors while loading trees or blob sizes.
fn lookup(&self, parent: NodeId, name: &OsStr) -> Result<Option<Entry>> {
    let record = self.record_for(parent)?;
    let parent_path = match self.dir_path_of(&record) {
        Some(p) => p,
        None => return Ok(None),
    };
    let Some(name_str) = name.to_str() else {
        return Ok(None);
    };
    let child_path = if parent_path.as_os_str().is_empty() {
        PathBuf::from(name_str)
    } else {
        parent_path.join(name_str)
    };
    // The overlay shadows committed content, so consult it first.
    match self.pending_lookup(&child_path) {
        Some(PendingHit::Tombstone) => return Ok(None),
        Some(PendingHit::Hot { node, size, mode }) => {
            return Ok(Some(Entry {
                node,
                name: OsString::from(name_str),
                kind: kind_for_mode(mode),
                size,
                unix_mode: mode.to_unix_mode(),
            }));
        }
        Some(PendingHit::Warm {
            blob: _,
            size,
            mode,
        }) => {
            // Interning by path replaces any committed `File` record at this
            // path, so later reads go through the overlay.
            let node = self.intern(NodeRecord::PendingFile {
                path: child_path.clone(),
                mode,
            });
            return Ok(Some(Entry {
                node,
                name: OsString::from(name_str),
                kind: kind_for_mode(mode),
                size,
                unix_mode: mode.to_unix_mode(),
            }));
        }
        None => {}
    }
    let parent_tree = self.tree_for_record(&record)?;
    if let Some(tree_entry) = parent_tree.get(name_str) {
        return Ok(Some(self.entry_from_tree_entry(&parent_path, tree_entry)?));
    }
    // Not committed, but pending files live beneath it: surface it as a directory.
    if self.pending_dir_exists(&child_path) {
        let node = self.intern(NodeRecord::PendingDir {
            path: child_path.clone(),
        });
        return Ok(Some(Entry {
            node,
            name: OsString::from(name_str),
            kind: NodeKind::Directory,
            size: self.pending_children_at(&child_path).len() as u64,
            unix_mode: DIR_UNIX_MODE,
        }));
    }
    Ok(None)
}
/// Copies up to `buf.len()` bytes of `node`'s content, starting at `offset`,
/// into `buf`.
///
/// Content is resolved overlay-first:
/// 1. a hot buffer owned by the node;
/// 2. the pending overlay for the node's path (hot buffer by path, then the
///    promoted warm blob);
/// 3. the committed blob the node was created from.
///
/// A `File` node keeps its original blob hash after its writes are promoted
/// to `warm`. Skipping step 2, as before, returned pre-write content once
/// `flush`/`release` or the sweeper had promoted the buffer.
///
/// Returns the number of bytes copied; 0 at or past end of file (including
/// offsets that do not fit in `usize`).
///
/// # Errors
/// `Stale` when a pending file is missing from the overlay; `NotFound` for
/// non-file nodes; store errors while loading blobs.
fn read(&self, node: NodeId, offset: u64, buf: &mut [u8]) -> Result<usize> {
    let record = self.record_for(node)?;
    let bytes = match &record {
        NodeRecord::PendingFile { path, .. } => self
            .pending_bytes(path)?
            .ok_or_else(|| MountError::Stale(format!("pending file {}", path.display())))?,
        NodeRecord::File { blob, .. } | NodeRecord::Symlink { blob } => {
            let own_hot = self
                .inner
                .pending
                .lock()
                .expect("pending lock")
                .hot
                .get(&node.0)
                .map(|hot| hot.bytes.clone());
            let overlay = match own_hot {
                Some(bytes) => Some(bytes),
                // Symlinks have no path, so only files consult the path overlay.
                None => match self.path_of(&record) {
                    Some(path) => self.pending_bytes(&path)?,
                    None => None,
                },
            };
            match overlay {
                Some(bytes) => bytes,
                None => self.load_blob_bytes(blob)?,
            }
        }
        _ => {
            return Err(MountError::NotFound(format!(
                "read on non-file node {}",
                node.0
            )));
        }
    };
    let Ok(start) = usize::try_from(offset) else {
        return Ok(0);
    };
    if start >= bytes.len() {
        return Ok(0);
    }
    let take = buf.len().min(bytes.len() - start);
    buf[..take].copy_from_slice(&bytes[start..start + take]);
    Ok(take)
}
fn write(&self, node: NodeId, offset: u64, data: &[u8]) -> Result<usize> {
let record = self.record_for(node)?;
let (path, mode, captured_blob) = match &record {
NodeRecord::PendingFile { path, mode } => (path.clone(), *mode, None),
NodeRecord::File {
path, mode, blob, ..
} => (path.clone(), *mode, Some(*blob)),
_ => return Err(MountError::ReadOnly),
};
enum Seed {
None,
Blob(ContentHash),
}
let seed = {
let pending = self.inner.pending.lock().expect("pending lock");
if pending.hot.contains_key(&node.0)
|| pending
.hot_by_path
.get(&path)
.is_some_and(|id| pending.hot.contains_key(id))
{
Seed::None
} else if pending.tombstones.contains(&path) {
Seed::None
} else if let Some(entry) = pending.warm.get(&path) {
Seed::Blob(entry.blob)
} else if let Some(blob) = captured_blob {
Seed::Blob(blob)
} else {
Seed::None
}
};
let seed_bytes = match seed {
Seed::None => None,
Seed::Blob(hash) => Some(self.load_blob_bytes(&hash)?),
};
let mut pending = self.inner.pending.lock().expect("pending lock");
if let Some(existing_id) = pending.hot_by_path.get(&path).copied()
&& existing_id != node.0
&& let Some(buf) = pending.hot.remove(&existing_id)
{
pending.hot.insert(node.0, buf);
}
pending.hot_by_path.insert(path.clone(), node.0);
pending.tombstones.remove(&path);
let buf = pending.hot.entry(node.0).or_insert_with(|| HotBuffer {
path: path.clone(),
mode,
bytes: seed_bytes.unwrap_or_default(),
last_touched: Instant::now(),
});
let offset = offset as usize;
let end = offset + data.len();
if buf.bytes.len() < end {
buf.bytes.resize(end, 0);
}
buf.bytes[offset..end].copy_from_slice(data);
buf.last_touched = Instant::now();
let written = data.len();
drop(pending);
let _ = self.promote_idle_buffers();
Ok(written)
}
/// Lists directory `dir`: its committed tree entries with the pending
/// overlay applied, plus names that exist only in the overlay. The result is
/// sorted by name.
///
/// Each committed entry is checked against the overlay at its exact path. A
/// tombstone hides it. A hot buffer or promoted (warm) blob replaces it with
/// an entry built from the overlay's size and mode. Overlay-only children not
/// already listed are then added as hot files, warm files (interned as
/// `PendingFile`), or implied directories (interned as `PendingDir`).
///
/// # Errors
/// `NotADirectory` if `dir` is not a directory node; `Stale` for unknown
/// nodes; store errors while loading trees or blob sizes.
fn enumerate(&self, dir: NodeId) -> Result<Vec<Entry>> {
    let record = self.record_for(dir)?;
    let parent_path = match self.dir_path_of(&record) {
        Some(p) => p,
        None => return Err(MountError::NotADirectory(format!("{record:?}"))),
    };
    // Empty for a `PendingDir`, so only overlay children are listed for it.
    let tree = self.tree_for_record(&record)?;
    // Keyed by name so overlay children can be de-duplicated against committed
    // entries and the listing comes out sorted.
    let mut by_name: BTreeMap<String, Entry> = BTreeMap::new();
    for tree_entry in tree.entries() {
        let entry_path = if parent_path.as_os_str().is_empty() {
            PathBuf::from(&tree_entry.name)
        } else {
            parent_path.join(&tree_entry.name)
        };
        match self.pending_lookup(&entry_path) {
            Some(PendingHit::Tombstone) => continue,
            Some(PendingHit::Hot { node, size, mode }) => {
                by_name.insert(
                    tree_entry.name.clone(),
                    Entry {
                        node,
                        name: OsString::from(&tree_entry.name),
                        kind: kind_for_mode(mode),
                        size,
                        unix_mode: mode.to_unix_mode(),
                    },
                );
                continue;
            }
            Some(PendingHit::Warm {
                blob: _,
                size,
                mode,
            }) => {
                let node = self.intern(NodeRecord::PendingFile {
                    path: entry_path,
                    mode,
                });
                by_name.insert(
                    tree_entry.name.clone(),
                    Entry {
                        node,
                        name: OsString::from(&tree_entry.name),
                        kind: kind_for_mode(mode),
                        size,
                        unix_mode: mode.to_unix_mode(),
                    },
                );
                continue;
            }
            None => {}
        }
        let entry = self.entry_from_tree_entry(&parent_path, tree_entry)?;
        by_name.insert(tree_entry.name.clone(), entry);
    }
    // Names created purely in the overlay (new files and implied directories).
    let pending_children = self.pending_children_at(&parent_path);
    for (name, kind) in pending_children {
        if by_name.contains_key(&name) {
            continue;
        }
        let full_path = if parent_path.as_os_str().is_empty() {
            PathBuf::from(&name)
        } else {
            parent_path.join(&name)
        };
        match kind {
            PendingChildKind::HotFile { node, size, mode } => {
                by_name.insert(
                    name.clone(),
                    Entry {
                        node,
                        name: OsString::from(&name),
                        kind: kind_for_mode(mode),
                        size,
                        unix_mode: mode.to_unix_mode(),
                    },
                );
            }
            PendingChildKind::WarmFile { size, mode } => {
                let node = self.intern(NodeRecord::PendingFile {
                    path: full_path,
                    mode,
                });
                by_name.insert(
                    name.clone(),
                    Entry {
                        node,
                        name: OsString::from(&name),
                        kind: kind_for_mode(mode),
                        size,
                        unix_mode: mode.to_unix_mode(),
                    },
                );
            }
            PendingChildKind::Dir => {
                // Directory size is reported as its overlay child count.
                let child_count = self.pending_children_at(&full_path).len() as u64;
                let node = self.intern(NodeRecord::PendingDir { path: full_path });
                by_name.insert(
                    name.clone(),
                    Entry {
                        node,
                        name: OsString::from(&name),
                        kind: NodeKind::Directory,
                        size: child_count,
                        unix_mode: DIR_UNIX_MODE,
                    },
                );
            }
        }
    }
    Ok(by_name.into_values().collect())
}
/// Returns the attributes of `node`, preferring pending overlay state over
/// the committed objects the node was created from.
///
/// Directory sizes are entry counts: committed tree entries, or overlay
/// children for a `PendingDir`. File and symlink sizes are byte lengths.
/// For a `File` node the size comes from, in order:
/// 1. its own hot buffer;
/// 2. the overlay for its path (hot or promoted warm entry);
/// 3. the committed blob.
///
/// Consulting only the node's own buffer, as before, made the reported size
/// revert to the pre-write blob once `flush`/`release` or the sweeper had
/// promoted the write. `mtime` is always the mount creation time.
///
/// # Errors
/// `Stale` for unknown nodes or a pending file missing from the overlay; store
/// errors while loading trees or blob sizes.
fn attrs(&self, node: NodeId) -> Result<Attrs> {
    let record = self.record_for(node)?;
    let kind = record.kind();
    let unix_mode = record.unix_mode();
    let (size, nlink) = match &record {
        NodeRecord::Root { tree } | NodeRecord::Dir { tree, .. } => {
            let tree = self.load_tree(tree)?;
            (tree.entries().len() as u64, 2)
        }
        NodeRecord::PendingDir { path } => {
            (self.pending_children_at(path).len() as u64, 2)
        }
        NodeRecord::File { blob, .. } | NodeRecord::Symlink { blob } => {
            let own_hot = self
                .inner
                .pending
                .lock()
                .expect("pending lock")
                .hot
                .get(&node.0)
                .map(|hot| hot.bytes.len() as u64);
            // Symlinks have no path, so only files consult the path overlay.
            let overlay = own_hot.or_else(|| {
                let path = self.path_of(&record)?;
                match self.pending_lookup(&path)? {
                    PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => Some(size),
                    PendingHit::Tombstone => None,
                }
            });
            let size = match overlay {
                Some(size) => size,
                None => self.blob_size(blob)?,
            };
            (size, 1)
        }
        NodeRecord::PendingFile { path, .. } => {
            let hit = self
                .pending_lookup(path)
                .ok_or_else(|| MountError::Stale(format!("pending file {}", path.display())))?;
            let size = match hit {
                PendingHit::Hot { size, .. } | PendingHit::Warm { size, .. } => size,
                PendingHit::Tombstone => 0,
            };
            (size, 1)
        }
    };
    Ok(Attrs {
        node,
        kind,
        size,
        unix_mode,
        nlink,
        mtime: self.inner.mounted_at,
    })
}
fn invalidate(&self, node: NodeId) -> Result<()> {
{
let mut pending = self.inner.pending.lock().expect("pending lock");
if let Some(buf) = pending.hot.remove(&node.0) {
pending.hot_by_path.remove(&buf.path);
}
}
self.inner.inodes.lock().expect("inode lock").forget(node);
Ok(())
}
fn flush(&self, node: NodeId) -> Result<()> {
self.flush_node(node)
}
fn release(&self, node: NodeId) -> Result<()> {
self.flush_node(node)
}
}
impl ContentAddressedMount {
/// Captures pending writes as a new state, attributed using the repository's
/// configured attribution.
///
/// # Errors
/// Store errors reading the attribution, plus anything `capture_with_attribution` returns.
pub fn capture(&self, intent: impl Into<Option<String>>) -> Result<ChangeId> {
    let repo = &self.inner.repo;
    match repo.get_attribution() {
        Ok(attribution) => self.capture_with_attribution(intent, attribution),
        Err(err) => Err(MountError::Store(err)),
    }
}
#[instrument(skip(self, attribution, intent), fields(thread = %self.inner.thread))]
pub fn capture_with_attribution(
&self,
intent: impl Into<Option<String>>,
attribution: Attribution,
) -> Result<ChangeId> {
self.flush_all()?;
let state_snapshot = *self.inner.state.read().expect("mount state lock");
let parent_tree = self.load_tree(&state_snapshot.tree)?;
let tree_hash = {
let pending = self.inner.pending.lock().expect("pending lock");
apply_pending_to_tree(self.store(), &parent_tree, &pending)?
};
let parent_id = self.inner.repo.head().map_err(MountError::Store)?;
let parents = match parent_id {
Some(id) => vec![id],
None => vec![],
};
let mut state = State::new_snapshot(tree_hash, parents, attribution);
if let Some(intent) = intent.into() {
state = state.with_intent(intent);
}
state = state.with_confidence(self.inner.repo.config().defaults.confidence);
self.store().put_state(&state).map_err(MountError::Store)?;
let change_id = state.change_id;
let prev_head_change_id = state_snapshot.change_id;
match self.inner.repo.head_ref().map_err(MountError::Store)? {
Head::Attached { thread } if thread == self.inner.thread => {
self.inner
.repo
.refs()
.set_thread(&thread, &change_id)
.map_err(MountError::Store)?;
}
_ => {
self.inner
.repo
.refs()
.set_thread(&self.inner.thread, &change_id)
.map_err(MountError::Store)?;
}
}
if let Err(err) = repo::snapshot_metadata::record_snapshot_in_oplog(
&self.inner.repo,
&change_id,
Some(&prev_head_change_id),
Some(&self.inner.thread),
) {
warn!(?err, "oplog record_snapshot from mount capture failed");
}
let new_tree = self.load_tree(&tree_hash)?;
if let Err(err) = repo::snapshot_metadata::refresh_active_thread_metadata(
&self.inner.repo,
&state,
&new_tree,
) {
warn!(?err, "thread metadata refresh from mount capture failed");
}
{
let mut pending = self.inner.pending.lock().expect("pending lock");
pending.hot.clear();
pending.hot_by_path.clear();
pending.warm.clear();
pending.tombstones.clear();
}
let mut state_lock = self.inner.state.write().expect("mount state lock");
*state_lock = MountState {
change_id,
tree: tree_hash,
};
let mut inodes = self.inner.inodes.lock().expect("inode lock");
if let Some(record) = inodes.by_id.get_mut(&NodeId::ROOT.0) {
*record = NodeRecord::Root { tree: tree_hash };
}
warn!(
thread = %self.inner.thread,
change = %change_id,
"captured mount writes into new state"
);
Ok(change_id)
}
}
fn apply_pending_to_tree(
store: &dyn ObjectStore,
parent: &Tree,
pending: &Pending,
) -> Result<ContentHash> {
#[derive(Default)]
struct VDir {
files: BTreeMap<String, (ContentHash, FileMode)>,
deletions: BTreeSet<String>,
children: BTreeMap<String, VDir>,
}
let mut root = VDir::default();
fn descend<'a>(node: &'a mut VDir, components: &[&str]) -> &'a mut VDir {
let mut cursor = node;
for c in components {
cursor = cursor.children.entry((*c).to_string()).or_default();
}
cursor
}
for (path, entry) in &pending.warm {
let comps: Vec<&str> = path
.components()
.filter_map(|c| match c {
Component::Normal(n) => n.to_str(),
_ => None,
})
.collect();
let Some((leaf_name, dir_components)) = comps.split_last() else {
continue;
};
let dir = descend(&mut root, dir_components);
dir.files
.insert((*leaf_name).to_string(), (entry.blob, entry.mode));
dir.deletions.remove(*leaf_name);
}
for tomb in &pending.tombstones {
let comps: Vec<&str> = tomb
.components()
.filter_map(|c| match c {
Component::Normal(n) => n.to_str(),
_ => None,
})
.collect();
let Some((leaf_name, dir_components)) = comps.split_last() else {
continue;
};
let dir = descend(&mut root, dir_components);
dir.files.remove(*leaf_name);
dir.deletions.insert((*leaf_name).to_string());
}
fn materialize(
v: &VDir,
captured: &Tree,
store: &dyn ObjectStore,
) -> Result<Option<ContentHash>> {
let mut entries: BTreeMap<String, TreeEntry> = captured
.entries()
.iter()
.map(|e| (e.name.clone(), e.clone()))
.collect();
for name in &v.deletions {
entries.remove(name);
}
for (name, (blob, mode)) in &v.files {
let executable = matches!(mode, FileMode::Executable);
let entry = TreeEntry::file(name.clone(), *blob, executable).map_err(|e| {
MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
})?;
entries.insert(name.clone(), entry);
}
for (name, child) in &v.children {
let child_captured = match captured.get(name) {
Some(e) if e.is_tree() => store
.get_tree(&e.hash)
.map_err(MountError::Store)?
.unwrap_or_default(),
_ => Tree::new(),
};
match materialize(child, &child_captured, store)? {
Some(hash) => {
let entry = TreeEntry::directory(name.clone(), hash).map_err(|e| {
MountError::Store(objects::error::HeddleError::InvalidObject(e.to_string()))
})?;
entries.insert(name.clone(), entry);
}
None => {
entries.remove(name);
}
}
}
if entries.is_empty() {
return Ok(None);
}
let tree = Tree::from_entries(entries.into_values().collect());
let hash = store.put_tree(&tree).map_err(MountError::Store)?;
Ok(Some(hash))
}
let hash = match materialize(&root, parent, store)? {
Some(h) => h,
None => store.put_tree(&Tree::new()).map_err(MountError::Store)?,
};
Ok(hash)
}
impl ContentAddressedMount {
    /// Sorted paths that currently have a promoted (warm) blob.
    #[cfg(test)]
    pub(crate) fn warm_keys(&self) -> Vec<PathBuf> {
        let pending = self.inner.pending.lock().expect("pending lock");
        pending.warm.keys().cloned().collect()
    }
    /// Promoted blob for `path`, if any.
    #[cfg(test)]
    pub(crate) fn warm_blob(&self, path: impl AsRef<Path>) -> Option<ContentHash> {
        let pending = self.inner.pending.lock().expect("pending lock");
        pending.warm.get(path.as_ref()).map(|entry| entry.blob)
    }
    /// Number of buffers still held in memory.
    #[cfg(test)]
    pub(crate) fn hot_buffer_count(&self) -> usize {
        let pending = self.inner.pending.lock().expect("pending lock");
        pending.hot.len()
    }
    /// Sorted paths currently marked as unlinked.
    #[cfg(test)]
    // Presumably not every test build calls this helper, hence the allow.
    #[allow(dead_code)]
    pub(crate) fn tombstones(&self) -> Vec<PathBuf> {
        let pending = self.inner.pending.lock().expect("pending lock");
        pending.tombstones.iter().cloned().collect()
    }
    /// Repository the mount was opened on.
    #[cfg(test)]
    pub(crate) fn repo_handle(&self) -> &Repository {
        let inner: &MountInner = &self.inner;
        &inner.repo
    }
}
/// Helpers that let tests create overlay-only nodes directly.
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;
    /// Interns a `PendingFile` node at `name` (relative to the mount root)
    /// and returns its id. The overlay itself is left untouched.
    pub(crate) fn install_pending_file(
        mount: &ContentAddressedMount,
        name: &str,
        mode: FileMode,
    ) -> NodeId {
        let record = NodeRecord::PendingFile {
            path: PathBuf::from(name),
            mode,
        };
        mount.intern(record)
    }
}