use std::{
ffi::OsStr,
fs,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
use objects::{
error::HeddleError,
object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree},
store::ObjectStore,
};
use repo::Repository;
use tempfile::TempDir;
use crate::{
core::ContentAddressedMount,
error::MountError,
shell::{NodeId, NodeKind, PlatformShell},
};
/// Creates a repository in a fresh temporary directory, populates its working
/// tree with a small known layout, and snapshots it as `fixture`.
///
/// Files written before the snapshot:
/// - `hello.txt` containing `world`
/// - `nested/inner.txt` containing `deep`
/// - `nested/note.md` containing `# heading\nbody\n`
/// - `run.sh` containing `#!/bin/sh\n` (chmod 0o755 on Unix only)
///
/// The `TempDir` is returned alongside the repository so the directory is not
/// deleted while the repository is still in use.
fn fixture() -> (TempDir, Repository) {
    let temp = TempDir::new().unwrap();
    let root = temp.path();
    let repo = Repository::init_default(root).unwrap();

    let nested_dir = root.join("nested");
    fs::write(root.join("hello.txt"), b"world").unwrap();
    fs::create_dir_all(&nested_dir).unwrap();
    fs::write(nested_dir.join("inner.txt"), b"deep").unwrap();
    fs::write(nested_dir.join("note.md"), b"# heading\nbody\n").unwrap();

    let script = root.join("run.sh");
    fs::write(&script, b"#!/bin/sh\n").unwrap();
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        // Setting the mode wholesale is equivalent to overwriting it via set_mode.
        fs::set_permissions(&script, fs::Permissions::from_mode(0o755)).unwrap();
    }

    repo.snapshot(Some("fixture".into()), None).unwrap();
    (temp, repo)
}
/// Mounts the `main` thread of a freshly built [`fixture`] repository.
///
/// Returns the backing `TempDir` too, so the caller keeps it alive for as
/// long as the mount is used.
fn open_mount() -> (TempDir, ContentAddressedMount) {
    let (dir, repo) = fixture();
    ContentAddressedMount::new(repo, "main")
        .map(|mounted| (dir, mounted))
        .unwrap()
}
/// A name directly under the root resolves to a regular file whose size and
/// permission bits match what the fixture snapshotted.
#[test]
fn lookup_hits_root_entry() {
    let (_temp, mount) = open_mount();
    let found = mount.lookup(NodeId::ROOT, OsStr::new("hello.txt")).unwrap();
    let entry = found.expect("hello.txt should exist");
    assert_eq!(entry.kind, NodeKind::File);
    // `b"world"` is five bytes long.
    assert_eq!(entry.size, 5);
    // Compare only the permission bits, not the file-type bits.
    assert_eq!(entry.unix_mode & 0o777, 0o644);
}
/// Looking up a name that does not exist succeeds and yields `None`; it is
/// not reported as an error.
#[test]
fn lookup_misses_return_none() {
    let (_temp, mount) = open_mount();
    let result = mount.lookup(NodeId::ROOT, OsStr::new("does-not-exist"));
    assert!(result.unwrap().is_none());
}
/// Reading from offset 0 into a buffer larger than the file returns the whole
/// file and nothing more.
#[test]
fn read_full_file() {
    let (_temp, mount) = open_mount();
    let hello = mount.lookup_path("hello.txt").unwrap();
    let mut out = vec![0u8; 64];
    let len = mount.read(hello, 0, &mut out).unwrap();
    out.truncate(len);
    assert_eq!(out, b"world");
}
/// Reading at a non-zero offset returns only the bytes from that offset to EOF.
#[test]
fn read_with_offset_returns_tail() {
    let (_temp, mount) = open_mount();
    let note = mount.lookup_path("nested/note.md").unwrap();
    let mut out = vec![0u8; 64];
    // "# heading\n" is exactly ten bytes, so offset 10 starts at "body\n".
    let len = mount.read(note, 10, &mut out).unwrap();
    out.truncate(len);
    assert_eq!(out, b"body\n");
}
/// A read starting beyond end-of-file reports zero bytes read rather than
/// failing.
#[test]
fn read_past_eof_yields_zero() {
    let (_temp, mount) = open_mount();
    let hello = mount.lookup_path("hello.txt").unwrap();
    let mut scratch = vec![0u8; 16];
    let far_offset = 9_999;
    assert_eq!(mount.read(hello, far_offset, &mut scratch).unwrap(), 0);
}
/// Enumerating the root lists every top-level entry from the fixture: both
/// files and the `nested` directory.
#[test]
fn enumerate_root_lists_all_entries() {
    let (_temp, mount) = open_mount();
    let names: Vec<String> = mount
        .enumerate(NodeId::ROOT)
        .unwrap()
        .iter()
        .map(|entry| entry.name.to_string_lossy().into_owned())
        .collect();
    for expected in ["hello.txt", "nested", "run.sh"] {
        assert!(names.contains(&expected.to_string()));
    }
}
/// Enumerating a subdirectory lists exactly its own children, regardless of
/// the order the mount returns them in.
#[test]
fn enumerate_nested_lists_subdir_entries() {
    let (_temp, mount) = open_mount();
    let dir = mount.lookup_path("nested").unwrap();
    let mut names: Vec<String> = mount
        .enumerate(dir)
        .unwrap()
        .iter()
        .map(|entry| entry.name.to_string_lossy().into_owned())
        .collect();
    // Sort so the comparison does not depend on enumeration order.
    names.sort();
    assert_eq!(names, ["inner.txt", "note.md"]);
}
/// `attrs` reports the root as a directory (both via `kind` and the mode's
/// file-type bits) and a regular file with its byte size and a link count of 1.
#[test]
fn attrs_distinguish_file_and_directory() {
    let (_temp, mount) = open_mount();

    let dir = mount.attrs(NodeId::ROOT).unwrap();
    assert_eq!(dir.kind, NodeKind::Directory);
    // The S_IFMT field (mask 0o170000) must read as S_IFDIR (0o040000).
    assert_eq!(dir.unix_mode & 0o170000, 0o040000);
    assert!(dir.size >= 3);

    let hello = mount.lookup_path("hello.txt").unwrap();
    let file = mount.attrs(hello).unwrap();
    assert_eq!(file.kind, NodeKind::File);
    assert_eq!(file.size, 5);
    assert_eq!(file.nlink, 1);
}
/// The executable bits that `fixture` sets on `run.sh` survive the snapshot
/// and are reported by `attrs`.
///
/// Unix-only: `fixture` marks `run.sh` executable only under `#[cfg(unix)]`.
/// On other targets there is no executable bit to preserve, so the assertion
/// would fail for reasons unrelated to the mount.
#[cfg(unix)]
#[test]
fn attrs_preserve_executable_bit() {
    let (_temp, mount) = open_mount();
    let run = mount.lookup_path("run.sh").unwrap();
    let attrs = mount.attrs(run).unwrap();
    // All three execute bits (user/group/other) must be set.
    assert_eq!(attrs.unix_mode & 0o111, 0o111);
}
/// Overwriting a captured file buffers the change in the hot tier and
/// subsequent reads observe the new bytes immediately.
#[test]
fn write_to_overlay_then_read_back() {
    let (_temp, mount) = open_mount();
    let hello = mount.lookup_path("hello.txt").unwrap();
    assert_eq!(mount.write(hello, 0, b"WORLD").unwrap(), 5);
    // The write sits in one hot buffer; nothing has been promoted to warm.
    assert_eq!(mount.hot_buffer_count(), 1);
    assert!(mount.warm_keys().is_empty());
    let mut readback = vec![0u8; 16];
    let len = mount.read(hello, 0, &mut readback).unwrap();
    readback.truncate(len);
    assert_eq!(readback, b"WORLD");
}
/// Initialises a repository containing a single file `path` with `content`,
/// snapshots it as `seed`, and mounts its `main` thread.
///
/// Missing parent directories of `path` are created first.
fn mount_with_seed(path: &str, content: &[u8]) -> (TempDir, ContentAddressedMount) {
    let temp = TempDir::new().unwrap();
    let repo = Repository::init_default(temp.path()).unwrap();
    let target = temp.path().join(path);
    target
        .parent()
        .map(fs::create_dir_all)
        .transpose()
        .unwrap();
    fs::write(&target, content).unwrap();
    repo.snapshot(Some("seed".into()), None).unwrap();
    let mounted = ContentAddressedMount::new(repo, "main").unwrap();
    (temp, mounted)
}
/// Reads the bytes of the file at `path` from the tree recorded in state
/// `change_id`, walking one stored sub-tree per directory component.
///
/// # Panics
///
/// Panics if the state, any tree, or the blob is missing. It also panics if an
/// intermediate component is not a directory or if `path` has no normal
/// components. Non-UTF-8 components are skipped.
fn read_captured_blob(mount: &ContentAddressedMount, change_id: &ChangeId, path: &str) -> Vec<u8> {
    use std::path::{Component, Path};

    let store = mount.repo_handle().store();
    let root_hash = store.get_state(change_id).unwrap().unwrap().tree;
    let mut names: Vec<&str> = Path::new(path)
        .components()
        .filter_map(|component| match component {
            Component::Normal(part) => part.to_str(),
            _ => None,
        })
        .collect();
    let leaf = names.pop().expect("non-empty path");

    let mut tree = store.get_tree(&root_hash).unwrap().unwrap();
    for dir in names {
        let child = tree.get(dir).expect("intermediate dir");
        assert!(child.is_tree());
        let next = child.hash;
        tree = store.get_tree(&next).unwrap().unwrap();
    }

    let leaf_hash = tree.get(leaf).expect("leaf entry").hash;
    store.get_blob(&leaf_hash).unwrap().unwrap().into_content()
}
/// Overwriting only a prefix of a captured file keeps the captured bytes
/// beyond the written range.
#[test]
fn partial_overwrite_preserves_captured_tail() {
    let (_temp, mount) = mount_with_seed("greet.txt", b"hello world\n");
    let greet = mount.lookup_path("greet.txt").unwrap();
    mount.write(greet, 0, b"HELLO").unwrap();
    mount.flush(greet).unwrap();
    let captured = mount.capture(Some("partial overwrite".into())).unwrap();
    // Only the first five bytes change; " world\n" comes from the seed.
    assert_eq!(
        read_captured_blob(&mount, &captured, "greet.txt"),
        b"HELLO world\n"
    );
}
/// Overwriting a range in the middle of a captured file keeps the bytes on
/// both sides of it.
#[test]
fn partial_overwrite_at_nonzero_offset() {
    let (_temp, mount) = mount_with_seed("alpha.txt", b"abcdefgh");
    let alpha = mount.lookup_path("alpha.txt").unwrap();
    mount.write(alpha, 3, b"XYZ").unwrap();
    mount.flush(alpha).unwrap();
    let captured = mount.capture(Some("middle overwrite".into())).unwrap();
    // Bytes 3..6 ("def") are replaced; "abc" and "gh" survive.
    assert_eq!(read_captured_blob(&mount, &captured, "alpha.txt"), b"abcXYZgh");
}
/// Writing beyond the current end of a file fills the gap with zero bytes.
#[test]
fn write_past_end_zero_fills() {
    let (_temp, mount) = mount_with_seed("short.txt", b"abc");
    let short = mount.lookup_path("short.txt").unwrap();
    mount.write(short, 10, b"XYZ").unwrap();
    mount.flush(short).unwrap();
    let captured = mount.capture(Some("zero fill".into())).unwrap();
    let bytes = read_captured_blob(&mount, &captured, "short.txt");

    // "abc", seven zero bytes up to offset 10, then "XYZ".
    let mut expected = b"abc".to_vec();
    expected.resize(10, 0);
    expected.extend_from_slice(b"XYZ");
    assert_eq!(bytes, expected);
    assert_eq!(bytes.len(), 13);
}
/// A second write to an already-flushed file starts from the warm-tier bytes
/// (`FIRST_VERSION`), not from the captured contents (`original`).
#[test]
fn write_seeds_from_warm_tier_not_captured() {
    let (_temp, mount) = mount_with_seed("evolving.txt", b"original");

    let first = mount.lookup_path("evolving.txt").unwrap();
    mount.write(first, 0, b"FIRST_VERSION").unwrap();
    mount.flush(first).unwrap();
    assert_eq!(mount.hot_buffer_count(), 0);
    assert!(mount.warm_blob("evolving.txt").is_some());

    let second = mount.lookup_path("evolving.txt").unwrap();
    mount.write(second, 0, b"X").unwrap();
    mount.flush(second).unwrap();
    let captured = mount.capture(Some("warm seed".into())).unwrap();
    // Seeding from the captured blob would have produced "Xriginal" instead.
    assert_eq!(
        read_captured_blob(&mount, &captured, "evolving.txt"),
        b"XIRST_VERSION"
    );
}
/// A brand-new file starts from an empty buffer, so writing at offset 5
/// zero-fills the first five bytes.
#[test]
fn empty_buffer_for_new_file_path() {
    let (_temp, mount) = fresh_mount();
    let fresh = create_pending_file(&mount, "brand_new.txt", objects::object::FileMode::Normal);
    mount.write(fresh, 5, b"hi").unwrap();
    mount.flush(fresh).unwrap();
    let captured = mount.capture(Some("brand new".into())).unwrap();

    let mut expected = vec![0u8; 5];
    expected.extend_from_slice(b"hi");
    assert_eq!(read_captured_blob(&mount, &captured, "brand_new.txt"), expected);
}
/// Writing through a directory node is rejected with `MountError::ReadOnly`.
#[test]
fn write_to_directory_returns_read_only() {
    let (_temp, mount) = open_mount();
    let outcome = mount.write(NodeId::ROOT, 0, b"x");
    assert!(matches!(outcome, Err(MountError::ReadOnly)));
}
/// Mounting a thread name that does not exist fails with `UnknownThread`,
/// which maps to `ENOENT` at the errno boundary.
#[test]
fn unknown_thread_is_enoent_shaped() {
    let (_temp, repo) = fixture();
    let Err(err) = ContentAddressedMount::new(repo, "no-such-thread") else {
        panic!("expected unknown-thread error");
    };
    assert!(matches!(err, MountError::UnknownThread(_)));
    assert_eq!(err.to_errno(), libc::ENOENT);
}
/// After a node is invalidated, looking the same path up again yields a node
/// that still reads the captured content.
#[test]
fn invalidate_drops_the_mapping() {
    let (_temp, mount) = open_mount();
    let stale = mount.lookup_path("hello.txt").unwrap();
    mount.invalidate(stale).unwrap();

    let fresh = mount.lookup_path("hello.txt").unwrap();
    let mut out = vec![0u8; 16];
    let len = mount.read(fresh, 0, &mut out).unwrap();
    out.truncate(len);
    assert_eq!(out, b"world");
}
/// An [`ObjectStore`] wrapper that delegates every call to `inner` and counts
/// how often blob bytes (`get_blob`) and blob sizes (`blob_size`) are requested.
///
/// The counters are `Arc`s so the test keeps its own handle after the store
/// has been moved into the repository.
struct CountingStore {
    /// Incremented on every `get_blob` call.
    get_blob_calls: Arc<AtomicUsize>,
    /// Incremented on every `blob_size` call.
    blob_size_calls: Arc<AtomicUsize>,
    /// The real store all calls are forwarded to.
    inner: Box<dyn ObjectStore>,
}
// Forwarding implementation: every method delegates directly to `self.inner`.
// Only `get_blob` and `blob_size` do anything extra. Each bumps its counter
// before delegating, so calls that fail in the inner store are still counted.
// That lets a test tell "blob bytes were loaded" apart from "only the size was
// queried".
impl ObjectStore for CountingStore {
    // Counted: a call here means the full blob bytes were requested.
    fn get_blob(&self, hash: &ContentHash) -> objects::store::Result<Option<Blob>> {
        // NOTE(review): Relaxed is presumably enough because the counter is
        // only read for its value and does not order other memory accesses.
        self.get_blob_calls.fetch_add(1, Ordering::Relaxed);
        self.inner.get_blob(hash)
    }
    fn put_blob(&self, blob: &Blob) -> objects::store::Result<ContentHash> {
        self.inner.put_blob(blob)
    }
    fn has_blob(&self, hash: &ContentHash) -> objects::store::Result<bool> {
        self.inner.has_blob(hash)
    }
    // Counted: the size-only query that metadata paths are expected to use.
    fn blob_size(&self, hash: &ContentHash) -> objects::store::Result<Option<u64>> {
        self.blob_size_calls.fetch_add(1, Ordering::Relaxed);
        self.inner.blob_size(hash)
    }
    fn get_tree(&self, hash: &ContentHash) -> objects::store::Result<Option<Tree>> {
        self.inner.get_tree(hash)
    }
    fn put_tree(&self, tree: &Tree) -> objects::store::Result<ContentHash> {
        self.inner.put_tree(tree)
    }
    fn has_tree(&self, hash: &ContentHash) -> objects::store::Result<bool> {
        self.inner.has_tree(hash)
    }
    fn get_state(&self, id: &ChangeId) -> objects::store::Result<Option<State>> {
        self.inner.get_state(id)
    }
    fn put_state(&self, state: &State) -> objects::store::Result<()> {
        self.inner.put_state(state)
    }
    fn has_state(&self, id: &ChangeId) -> objects::store::Result<bool> {
        self.inner.has_state(id)
    }
    fn list_states(&self) -> objects::store::Result<Vec<ChangeId>> {
        self.inner.list_states()
    }
    fn get_action(&self, id: &ActionId) -> objects::store::Result<Option<Action>> {
        self.inner.get_action(id)
    }
    fn put_action(&self, action: &mut Action) -> objects::store::Result<ActionId> {
        self.inner.put_action(action)
    }
    fn list_actions(&self) -> objects::store::Result<Vec<ActionId>> {
        self.inner.list_actions()
    }
    fn list_blobs(&self) -> objects::store::Result<Vec<ContentHash>> {
        self.inner.list_blobs()
    }
    fn list_trees(&self) -> objects::store::Result<Vec<ContentHash>> {
        self.inner.list_trees()
    }
}
/// `enumerate` and `attrs` must answer file sizes through `blob_size` without
/// ever loading blob contents through `get_blob`.
#[test]
fn enumerate_serves_size_without_loading_blob_bytes() {
    let temp = TempDir::new().unwrap();
    let heddle_dir = temp.path().join(".heddle");

    // Snapshot three blobs with the default store, then close that handle so
    // the repository can be reopened through the counting wrapper.
    {
        let repo = Repository::init_default(temp.path()).unwrap();
        fs::write(temp.path().join("a.txt"), b"first").unwrap();
        fs::write(temp.path().join("b.txt"), b"second-larger-payload").unwrap();
        fs::write(temp.path().join("c.txt"), vec![0u8; 4096]).unwrap();
        repo.snapshot(Some("fixture".into()), None).unwrap();
    }

    let get_blob_calls = Arc::new(AtomicUsize::new(0));
    let blob_size_calls = Arc::new(AtomicUsize::new(0));
    let counting = CountingStore {
        inner: Box::new(objects::store::FsStore::new(heddle_dir.clone())),
        get_blob_calls: Arc::clone(&get_blob_calls),
        blob_size_calls: Arc::clone(&blob_size_calls),
    };
    let repo = Repository::open_with_store(heddle_dir, Box::new(counting)).unwrap();
    let mount = ContentAddressedMount::new(repo, "main").unwrap();

    let entries = mount.enumerate(NodeId::ROOT).unwrap();
    let names: Vec<_> = entries
        .iter()
        .map(|entry| entry.name.to_string_lossy().into_owned())
        .collect();
    for expected in ["a.txt", "b.txt", "c.txt"] {
        assert!(names.contains(&expected.to_string()));
    }

    let size_of = |wanted: &str| {
        entries
            .iter()
            .find(|entry| entry.name == wanted)
            .map(|entry| entry.size)
            .unwrap()
    };
    assert_eq!(size_of("a.txt"), 5);
    assert_eq!(size_of("c.txt"), 4096);

    assert_eq!(
        get_blob_calls.load(Ordering::Relaxed),
        0,
        "enumerate() pulled blob bytes when only size was needed"
    );
    assert!(
        blob_size_calls.load(Ordering::Relaxed) >= 3,
        "expected blob_size to be called at least once per blob entry"
    );

    // `attrs` must also avoid fetching bytes; compare against a baseline.
    let before_attrs = get_blob_calls.load(Ordering::Relaxed);
    let c_node = mount.lookup_path("c.txt").unwrap();
    let _attrs = mount.attrs(c_node).unwrap();
    assert_eq!(
        get_blob_calls.load(Ordering::Relaxed),
        before_attrs,
        "attrs() pulled blob bytes when only size was needed"
    );
}
/// Mounts the `main` thread of a newly initialised repository, without writing
/// any working-tree files first.
///
/// The `TempDir` is returned so the repository directory outlives the mount.
fn fresh_mount() -> (TempDir, ContentAddressedMount) {
    let temp = TempDir::new().unwrap();
    let mount = Repository::init_default(temp.path())
        .map(|repo| ContentAddressedMount::new(repo, "main").unwrap())
        .unwrap();
    (temp, mount)
}
/// Registers a new file at `name` with `mode` on `mount` and returns its node.
///
/// This is a thin wrapper over the crate's test helper `install_pending_file`.
fn create_pending_file(
    mount: &ContentAddressedMount,
    name: &str,
    mode: objects::object::FileMode,
) -> NodeId {
    crate::core::test_helpers::install_pending_file(mount, name, mode)
}
/// Bytes written to a newly created file can be read straight back through
/// the same node.
#[test]
fn write_then_read_same_file() {
    let (_temp, mount) = fresh_mount();
    let draft = create_pending_file(&mount, "draft.txt", objects::object::FileMode::Normal);
    assert_eq!(mount.write(draft, 0, b"hello mount").unwrap(), 11);
    let mut out = vec![0u8; 64];
    let len = mount.read(draft, 0, &mut out).unwrap();
    out.truncate(len);
    assert_eq!(out, b"hello mount");
}
/// `flush` drains a file's hot buffer and records it in the warm tier under
/// its repository-relative path.
#[test]
fn flush_promotes_buffer_to_warm_tier() {
    let (_temp, mount) = fresh_mount();
    let out = create_pending_file(&mount, "out.txt", objects::object::FileMode::Normal);
    mount.write(out, 0, b"promote me").unwrap();

    // Before the flush: one hot buffer and an empty warm tier.
    assert_eq!(mount.hot_buffer_count(), 1);
    assert!(mount.warm_keys().is_empty());

    mount.flush(out).unwrap();

    // After the flush: hot tier empty, exactly one warm key for the file.
    assert_eq!(mount.hot_buffer_count(), 0, "hot buffer should be drained");
    let keys = mount.warm_keys();
    assert_eq!(keys.len(), 1);
    assert_eq!(keys[0], std::path::PathBuf::from("out.txt"));
}
/// A fresh lookup of a newly written file serves the new bytes both before
/// and after the write is flushed.
#[test]
fn lookup_after_write_serves_new_content() {
    let (_temp, mount) = fresh_mount();
    let written = create_pending_file(&mount, "fresh.md", objects::object::FileMode::Normal);
    mount.write(written, 0, b"# fresh\n").unwrap();
    let mut buf = vec![0u8; 64];

    // Before any flush.
    let unflushed = mount.lookup_path("fresh.md").unwrap();
    let len = mount.read(unflushed, 0, &mut buf).unwrap();
    assert_eq!(&buf[..len], b"# fresh\n");

    mount.flush(written).unwrap();

    // After the flush has promoted the buffer.
    let flushed = mount.lookup_path("fresh.md").unwrap();
    let len = mount.read(flushed, 0, &mut buf).unwrap();
    assert_eq!(&buf[..len], b"# fresh\n");
}
/// Identical bytes flushed on two different threads hash to the same blob id,
/// and that blob is stored only once.
#[test]
fn cross_thread_blob_dedup() {
    let temp = TempDir::new().unwrap();

    // Fork `feature` from `main` using a short-lived handle, closed before the
    // per-thread repositories are opened.
    {
        let seed = Repository::init_default(temp.path()).unwrap();
        let main_id = seed.refs().get_thread("main").unwrap().unwrap();
        seed.refs().set_thread("feature", &main_id).unwrap();
    }

    let open_thread = |thread: &str| {
        let repo = Repository::open(temp.path()).unwrap();
        ContentAddressedMount::new(repo, thread).unwrap()
    };
    let mount_main = open_thread("main");
    let mount_feat = open_thread("feature");

    let payload = b"shared module content\n// dedup demo\n";
    let write_and_flush = |mount: &ContentAddressedMount, path: &str| {
        let node = create_pending_file(mount, path, objects::object::FileMode::Normal);
        mount.write(node, 0, payload).unwrap();
        mount.flush(node).unwrap();
    };
    write_and_flush(&mount_main, "lib.rs");
    write_and_flush(&mount_feat, "module.rs");

    let oid_a = mount_main.warm_blob("lib.rs").expect("a promoted");
    let oid_b = mount_feat.warm_blob("module.rs").expect("b promoted");
    assert_eq!(
        oid_a, oid_b,
        "identical content must hash to the same blob_oid across threads"
    );

    let repo_check = Repository::open(temp.path()).unwrap();
    let occurrences = repo_check
        .store()
        .list_blobs()
        .unwrap()
        .iter()
        .filter(|hash| **hash == oid_a)
        .count();
    assert_eq!(
        occurrences, 1,
        "writing the same payload to two threads must yield exactly one blob in the store"
    );
}
/// `capture` turns the warm tier into a new state whose tree references the
/// promoted blobs, and moves the `main` thread to point at that state.
#[test]
fn capture_builds_state_and_advances_thread() {
    let (_temp, mount) = fresh_mount();
    for (path, body) in [("alpha.txt", b"alpha"), ("beta.txt", b"beta!")] {
        let node = create_pending_file(&mount, path, objects::object::FileMode::Normal);
        mount.write(node, 0, body).unwrap();
    }
    mount.flush_all().unwrap();

    let alpha_oid = mount.warm_blob("alpha.txt").unwrap();
    let beta_oid = mount.warm_blob("beta.txt").unwrap();
    let prior_head = mount.current_change_id();

    let new_id = mount.capture(Some("two files".to_string())).unwrap();
    assert_ne!(new_id, prior_head, "capture should advance the thread");

    let new_state = dig_state(&mount, &new_id).expect("captured state not found in store");
    let store = mount.repo_handle().store();
    let new_tree = store.get_tree(&new_state.tree).unwrap().unwrap();

    assert!(new_tree.entries().iter().any(|e| e.name == "alpha.txt"));
    assert!(new_tree.entries().iter().any(|e| e.name == "beta.txt"));
    // Each tree entry must point at exactly the blob the warm tier promoted.
    assert_eq!(new_tree.get("alpha.txt").expect("alpha entry").hash, alpha_oid);
    assert_eq!(new_tree.get("beta.txt").expect("beta entry").hash, beta_oid);

    let head = mount.repo_handle().refs().get_thread("main").unwrap().unwrap();
    assert_eq!(head, new_id);
}
/// Fetches the [`State`] recorded under `id` from the mount's object store.
///
/// Returns `None` only when the store successfully reports that no such state
/// exists.
///
/// # Panics
///
/// Panics if the store itself returns an error. That keeps a storage failure
/// from being misreported by the caller as a missing state.
fn dig_state(mount: &ContentAddressedMount, id: &ChangeId) -> Option<State> {
    mount
        .repo_handle()
        .store()
        .get_state(id)
        .expect("object store failed while reading state")
}
/// Identity function over [`HeddleError`].
///
/// NOTE(review): nothing in the visible part of this file calls it. It appears
/// to exist only so the `HeddleError` import is referenced; confirm before
/// removing either. The leading underscore already suppresses `dead_code`, so
/// the explicit `allow` is redundant but harmless.
#[allow(dead_code)]
fn _force_unused(e: HeddleError) -> HeddleError {
    std::convert::identity(e)
}
/// Resolves `path` one component at a time with `lookup`, starting at the root,
/// and returns the entry for the final component.
///
/// Non-`Normal` components (such as `.` or a leading `/`) are ignored. The
/// result is `None` in three cases: the path has no normal components, a
/// component is missing, or a `lookup` call returns an error. Errors are not
/// distinguished from misses.
fn lookup_path_via_components(
    mount: &ContentAddressedMount,
    path: &str,
) -> Option<crate::shell::Entry> {
    std::path::Path::new(path)
        .components()
        .filter_map(|component| match component {
            std::path::Component::Normal(name) => Some(name),
            _ => None,
        })
        .try_fold(
            (NodeId::ROOT, None::<crate::shell::Entry>),
            |(parent, _), name| {
                let entry = mount.lookup(parent, name).ok().flatten()?;
                Some((entry.node, Some(entry)))
            },
        )
        .and_then(|(_, last)| last)
}
/// Capturing a new file inside an already-captured directory merges it with
/// that directory's existing children instead of replacing them.
#[test]
fn capture_nested_new_file_under_existing_dir() {
    let (_temp, mount) = open_mount();
    let extra = create_pending_file(&mount, "nested/extra.rs", objects::object::FileMode::Normal);
    mount.write(extra, 0, b"// fresh\n").unwrap();
    mount.flush_all().unwrap();
    let extra_blob = mount.warm_blob("nested/extra.rs").unwrap();
    let captured = mount.capture(Some("nested write".into())).unwrap();

    let store = mount.repo_handle().store();
    let root_hash = store.get_state(&captured).unwrap().unwrap().tree;
    let root_tree = store.get_tree(&root_hash).unwrap().unwrap();
    let nested_entry = root_tree.get("nested").expect("nested dir");
    assert!(nested_entry.is_tree());
    let nested = store.get_tree(&nested_entry.hash).unwrap().unwrap();

    for name in ["inner.txt", "note.md", "extra.rs"] {
        assert!(nested.entries().iter().any(|e| e.name == name));
    }
    assert_eq!(nested.get("extra.rs").unwrap().hash, extra_blob);
}
/// Capturing a file under a directory that does not exist yet creates that
/// directory as a sub-tree containing only the new file.
#[test]
fn capture_creates_new_intermediate_dirs() {
    let (_temp, mount) = open_mount();
    let bar = create_pending_file(&mount, "newdir/bar.rs", objects::object::FileMode::Normal);
    mount.write(bar, 0, b"newcontent").unwrap();
    mount.flush_all().unwrap();
    let captured = mount.capture(Some("new dir".into())).unwrap();

    let store = mount.repo_handle().store();
    let root_hash = store.get_state(&captured).unwrap().unwrap().tree;
    let root_tree = store.get_tree(&root_hash).unwrap().unwrap();
    let newdir_entry = root_tree.get("newdir").expect("newdir created");
    assert!(newdir_entry.is_tree());

    let children = store.get_tree(&newdir_entry.hash).unwrap().unwrap();
    let listing = children.entries();
    assert_eq!(listing.len(), 1);
    assert_eq!(listing[0].name, "bar.rs");
}
/// Captures new files at several depths in one go, and checks that each one is
/// reachable in the captured tree with exactly the bytes written to it. The
/// files are a top-level file, a file in the existing `nested/` directory, and
/// files under the brand-new `deep/` and `deep/again/` directories.
///
/// Resolution goes through `read_captured_blob`. It walks one stored sub-tree
/// per component, asserts every intermediate component is a directory, and
/// panics if any component is missing. That closes a gap in a hand-rolled
/// walk, which could accept a file found at an intermediate position as a hit.
#[test]
fn capture_handles_multiple_files_at_multiple_depths() {
    let (_temp, mount) = open_mount();
    let paths = [
        "top.txt",
        "nested/extra.rs",
        "deep/again/level3.rs",
        "deep/sibling.rs",
    ];
    // Each file's content is its own path, so files swapped or merged
    // incorrectly during tree construction are detected.
    for p in paths {
        let node = create_pending_file(&mount, p, objects::object::FileMode::Normal);
        mount.write(node, 0, p.as_bytes()).unwrap();
    }
    mount.flush_all().unwrap();
    let new_id = mount.capture(Some("many paths".into())).unwrap();

    for p in paths {
        let bytes = read_captured_blob(&mount, &new_id, p);
        assert_eq!(
            bytes,
            p.as_bytes(),
            "path {} has wrong content in captured tree",
            p
        );
    }
}
/// Before any flush or capture, the parent directory of a pending file resolves
/// as a directory, and the file resolves and reads through it.
#[test]
fn lookup_serves_implicit_pending_dir_before_capture() {
    let (_temp, mount) = fresh_mount();
    let foo = create_pending_file(&mount, "newdir/foo.rs", objects::object::FileMode::Normal);
    mount.write(foo, 0, b"hello").unwrap();

    let Some(dir_entry) = lookup_path_via_components(&mount, "newdir") else {
        panic!("newdir resolves as implicit pending dir");
    };
    assert_eq!(dir_entry.kind, NodeKind::Directory);

    let Some(file_entry) = lookup_path_via_components(&mount, "newdir/foo.rs") else {
        panic!("newdir/foo.rs resolves through implicit dir");
    };
    let mut out = vec![0u8; 16];
    let len = mount.read(file_entry.node, 0, &mut out).unwrap();
    out.truncate(len);
    assert_eq!(out, b"hello");
}
/// Unlinking the only file in a directory and capturing again removes the now
/// empty directory from the root tree.
#[test]
fn capture_unlink_prunes_empty_parent_trees() {
    let (_temp, mount) = fresh_mount();
    let only = create_pending_file(&mount, "dir/only.rs", objects::object::FileMode::Normal);
    mount.write(only, 0, b"x").unwrap();
    mount.flush_all().unwrap();
    let _planted = mount.capture(Some("plant".into())).unwrap();

    mount.unlink_path("dir/only.rs").unwrap();
    let after_delete = mount.capture(Some("delete".into())).unwrap();

    let store = mount.repo_handle().store();
    let root_hash = store.get_state(&after_delete).unwrap().unwrap().tree;
    let root_tree = store.get_tree(&root_hash).unwrap().unwrap();
    let remaining: Vec<_> = root_tree.entries().iter().map(|e| &e.name).collect();
    assert!(
        root_tree.get("dir").is_none(),
        "empty `dir/` should have been pruned, found: {:?}",
        remaining
    );
}
/// Unlinking one of two files in a directory removes only that file; the
/// directory and its other child survive the next capture.
#[test]
fn capture_unlink_drops_only_named_path() {
    let (_temp, mount) = fresh_mount();
    for p in ["dir/keep.rs", "dir/drop.rs"] {
        let node = create_pending_file(&mount, p, objects::object::FileMode::Normal);
        mount.write(node, 0, p.as_bytes()).unwrap();
    }
    mount.flush_all().unwrap();
    let _planted = mount.capture(Some("plant".into())).unwrap();

    mount.unlink_path("dir/drop.rs").unwrap();
    let after_delete = mount.capture(Some("delete".into())).unwrap();

    let store = mount.repo_handle().store();
    let root_hash = store.get_state(&after_delete).unwrap().unwrap().tree;
    let root_tree = store.get_tree(&root_hash).unwrap().unwrap();
    let dir_hash = root_tree.get("dir").expect("dir survives").hash;
    let dir = store.get_tree(&dir_hash).unwrap().unwrap();
    let names: Vec<&str> = dir.entries().iter().map(|e| e.name.as_str()).collect();
    assert_eq!(names, ["keep.rs"]);
}
/// `capture` appends to the repository oplog, and one of the recent entries is
/// a `Snapshot` whose `new_state` is the captured state.
#[test]
fn capture_records_oplog_entry() {
    use oplog::OpRecord;
    let (_temp, mount) = fresh_mount();
    // A read failure here counts as "no prior entries".
    // NOTE(review): presumably tolerated because a fresh repo may have no
    // oplog yet; confirm.
    let prior_count = mount
        .repo_handle()
        .oplog()
        .recent(1024)
        .map(|v| v.len())
        .unwrap_or(0);

    let node = create_pending_file(&mount, "x.txt", objects::object::FileMode::Normal);
    mount.write(node, 0, b"y").unwrap();
    mount.flush_all().unwrap();
    let new_id = mount.capture(Some("oplog".into())).unwrap();

    let entries = mount.repo_handle().oplog().recent(1024).unwrap();
    // The failure message describes the failure. The old text stated the
    // success condition, which was misleading whenever the assertion fired.
    assert!(
        entries.len() > prior_count,
        "oplog entry count did not grow after capture (before: {prior_count}, after: {})",
        entries.len()
    );
    let saw_snapshot = entries.iter().any(|entry| {
        matches!(&entry.operation, OpRecord::Snapshot { new_state, .. } if *new_state == new_id)
    });
    assert!(
        saw_snapshot,
        "expected a Snapshot oplog entry pointing at the new state {:?}",
        new_id
    );
}
/// When a persisted thread record exists for the mounted thread, `capture`
/// updates it: the captured path is listed in `changed_paths` and
/// `current_state` points at the new state.
#[test]
fn capture_refreshes_thread_metadata_when_thread_record_exists() {
    use chrono::Utc;
    use repo::{Thread, ThreadManager, ThreadMode, ThreadState};

    let temp = TempDir::new().unwrap();
    let repo = Repository::init_default(temp.path()).unwrap();

    // Seed a `main` thread record based on the current head state.
    let head_id = repo.refs().get_thread("main").unwrap().unwrap();
    let head_root_hex = repo
        .store()
        .get_state(&head_id)
        .unwrap()
        .unwrap()
        .tree
        .to_hex();
    let seed_manager = ThreadManager::new(repo.heddle_dir());
    let record = Thread {
        id: "main".to_string(),
        thread: "main".to_string(),
        target_thread: None,
        parent_thread: None,
        mode: ThreadMode::Virtualized,
        state: ThreadState::Active,
        base_state: head_id.short(),
        base_root: head_root_hex,
        current_state: None,
        merged_state: None,
        task: None,
        execution_path: temp.path().to_path_buf(),
        materialized_path: None,
        changed_paths: Vec::new(),
        impact_categories: Vec::new(),
        heavy_impact_paths: Vec::new(),
        promotion_suggested: false,
        freshness: repo::ThreadFreshness::Unknown,
        verification_summary: Default::default(),
        confidence_summary: Default::default(),
        integration_policy_result: Default::default(),
        created_at: Utc::now(),
        updated_at: Utc::now(),
        ephemeral: None,
        auto: false,
        shared_target_dir: None,
    };
    seed_manager.save(&record).unwrap();

    let mount = ContentAddressedMount::new(repo, "main").unwrap();
    let calculator = create_pending_file(
        &mount,
        "src/calculator.rs",
        objects::object::FileMode::Normal,
    );
    mount.write(calculator, 0, b"// fresh\n").unwrap();
    mount.flush_all().unwrap();
    let captured = mount.capture(Some("metadata refresh".into())).unwrap();

    let reader = ThreadManager::new(mount.repo_handle().heddle_dir());
    let refreshed = reader.load("main").unwrap().expect("thread row");
    assert!(
        refreshed
            .changed_paths
            .iter()
            .any(|p| p == "src/calculator.rs"),
        "changed_paths={:?}",
        refreshed.changed_paths
    );
    assert_eq!(refreshed.current_state, Some(captured.short()));
}
/// An attribution passed to `capture_with_attribution` is stored on the
/// resulting state, including the agent's provider and model.
#[test]
fn capture_with_explicit_attribution_lands_on_state() {
    use objects::object::{Agent, Attribution, Principal};
    let (_temp, mount) = fresh_mount();
    let node = create_pending_file(&mount, "x.txt", objects::object::FileMode::Normal);
    mount.write(node, 0, b"y").unwrap();
    mount.flush_all().unwrap();

    let who = Attribution::with_agent(
        Principal::new("Test User", "test@example.com"),
        Agent::new("anthropic", "claude-3"),
    );
    let captured = mount
        .capture_with_attribution(Some("attrib".into()), who)
        .unwrap();

    let state = mount
        .repo_handle()
        .store()
        .get_state(&captured)
        .unwrap()
        .unwrap();
    let agent = state.attribution.agent.as_ref();
    assert_eq!(
        agent.map(|a| a.provider.clone()),
        Some("anthropic".to_string()),
    );
    assert_eq!(agent.map(|a| a.model.clone()), Some("claude-3".to_string()));
}
/// With a sweep interval configured, a buffer idle for longer than
/// `idle_after` is promoted from the hot tier to the warm tier by the
/// background sweep, without an explicit flush.
#[test]
fn clock_sweep_promotes_idle_buffers() {
    use std::time::{Duration, Instant};
    let (_temp, repo) = fixture();
    let mount = ContentAddressedMount::new(repo, "main")
        .unwrap()
        .with_promotion_policy(crate::core::PromotionPolicy {
            idle_after: Duration::from_millis(50),
            sweep_interval: Some(Duration::from_millis(80)),
        });
    let node = create_pending_file(&mount, "draft.txt", objects::object::FileMode::Normal);
    mount.write(node, 0, b"sleeping").unwrap();
    assert_eq!(mount.hot_buffer_count(), 1);
    assert!(mount.warm_keys().is_empty());

    // Poll with a generous deadline instead of sleeping a fixed amount. A
    // loaded CI machine can delay the sweep thread well past any fixed sleep,
    // while an idle one finishes early. Both tiers are polled so a promotion
    // that is caught half-done does not fail the test spuriously.
    let deadline = Instant::now() + Duration::from_secs(5);
    while (mount.hot_buffer_count() != 0 || mount.warm_keys().len() != 1)
        && Instant::now() < deadline
    {
        std::thread::sleep(Duration::from_millis(20));
    }

    assert_eq!(
        mount.hot_buffer_count(),
        0,
        "clock sweep should have promoted the idle buffer"
    );
    assert_eq!(mount.warm_keys().len(), 1);
}
/// With no sweep interval, an idle buffer stays in the hot tier even after
/// well over `idle_after` has elapsed.
#[test]
fn no_sweep_interval_disables_clock_promotion() {
    use std::time::Duration;
    let policy = crate::core::PromotionPolicy {
        idle_after: Duration::from_millis(50),
        sweep_interval: None,
    };
    let (_temp, repo) = fixture();
    let mount = ContentAddressedMount::new(repo, "main")
        .unwrap()
        .with_promotion_policy(policy);
    let draft = create_pending_file(&mount, "draft.txt", objects::object::FileMode::Normal);
    mount.write(draft, 0, b"sleeping").unwrap();

    // A fixed wait is inherent here: the test asserts that nothing happens.
    std::thread::sleep(Duration::from_millis(250));
    assert_eq!(mount.hot_buffer_count(), 1);
    assert!(mount.warm_keys().is_empty());
}
/// Dropping a mount with an active sweep thread completes promptly (within 5s)
/// and without panicking.
///
/// The mount is created and dropped on a worker thread, which signals over a
/// channel once `drop` has returned.
#[test]
fn drop_joins_sweep_thread_cleanly() {
    use std::{
        sync::mpsc::{channel, RecvTimeoutError},
        time::Duration,
    };
    let (tx, rx) = channel();
    let join = std::thread::spawn(move || {
        let (_temp, repo) = fixture();
        let mount = ContentAddressedMount::new(repo, "main")
            .unwrap()
            .with_promotion_policy(crate::core::PromotionPolicy {
                idle_after: Duration::from_millis(20),
                sweep_interval: Some(Duration::from_millis(30)),
            });
        let node = create_pending_file(&mount, "x.txt", objects::object::FileMode::Normal);
        mount.write(node, 0, b"k").unwrap();
        std::thread::sleep(Duration::from_millis(80));
        drop(mount);
        let _ = tx.send(());
    });

    let result = rx.recv_timeout(Duration::from_secs(5));
    // Fail before joining on a timeout. Joining a worker whose drop is
    // deadlocked would block forever and turn this failure into a hung test.
    if matches!(result, Err(RecvTimeoutError::Timeout)) {
        panic!("drop did not complete within 5s");
    }
    // The worker either signalled success or exited without sending, which
    // means it panicked. Join in both cases so a panic is reported as such.
    let join_result = join.join();
    assert!(join_result.is_ok(), "test thread panicked");
    assert!(result.is_ok(), "drop did not complete within 5s");
}
/// Stands in for a crash between flush and capture by dropping the mount and
/// reopening the repository.
///
/// - The blob promoted to the warm tier (`durable.txt`) must still be in the
///   object store after reopening.
/// - The data that only ever lived in the hot tier (`transient.txt`) must not
///   reappear.
/// - Neither file was captured, so neither is visible in the reopened mount.
#[test]
fn crash_recovery_warm_durable_hot_lost() {
    let temp = TempDir::new().unwrap();
    let repo = Repository::init_default(temp.path()).unwrap();

    // Scope the first mount so it is dropped before the repository is
    // reopened. The hour-long idle window with no sweep keeps `transient.txt`
    // unpromoted.
    let durable_blob = {
        let mount = ContentAddressedMount::new(repo, "main")
            .unwrap()
            .with_promotion_policy(crate::core::PromotionPolicy {
                idle_after: std::time::Duration::from_secs(3600),
                sweep_interval: None,
            });
        let n1 = create_pending_file(&mount, "durable.txt", objects::object::FileMode::Normal);
        mount.write(n1, 0, b"durable").unwrap();
        mount.flush(n1).unwrap();

        let n2 = create_pending_file(&mount, "transient.txt", objects::object::FileMode::Normal);
        mount.write(n2, 0, b"gone").unwrap();

        let durable_blob = mount.warm_blob("durable.txt").unwrap();
        let blobs_before = mount.repo_handle().store().list_blobs().unwrap();
        assert!(blobs_before.contains(&durable_blob));
        durable_blob
    };

    let repo = Repository::open(temp.path()).unwrap();
    // The point of the warm tier: its blobs survive the process going away.
    assert!(
        repo.store().has_blob(&durable_blob).unwrap(),
        "warm-tier blob should still be in the store after reopening"
    );

    let mount = ContentAddressedMount::new(repo, "main").unwrap();
    let lookup = mount
        .lookup(NodeId::ROOT, OsStr::new("transient.txt"))
        .unwrap();
    assert!(
        lookup.is_none(),
        "hot-tier-only file should be gone after crash"
    );
    let durable_lookup = mount
        .lookup(NodeId::ROOT, OsStr::new("durable.txt"))
        .unwrap();
    assert!(
        durable_lookup.is_none(),
        "warm-tier-only file (no capture) is not in the captured tree"
    );
}
/// Ten threads (`main` plus nine forks) each write and flush the same set of
/// files. The first `SHARED_FILES` files have identical content on every
/// thread, and the rest are unique per thread. Identical content must be
/// stored once, so the blob count is about
/// `SHARED_FILES + threads * (FILES_PER_THREAD - SHARED_FILES)`.
///
/// The expected count is derived from the same constants that drive the loops,
/// so changing one dimension cannot silently desynchronise the assertion.
#[test]
fn cross_thread_blob_dedup_at_scale() {
    const FORKS: usize = 9;
    const FILES_PER_THREAD: usize = 10;
    const SHARED_FILES: usize = 5;
    // Tolerance for blobs this test did not write.
    // NOTE(review): kept at the original allowance of 4; where those extra
    // blobs come from is not visible here.
    const EXTRA_BLOB_SLACK: usize = 4;

    let temp = TempDir::new().unwrap();
    let repo = Repository::init_default(temp.path()).unwrap();
    let main_id = repo.refs().get_thread("main").unwrap().unwrap();
    for i in 0..FORKS {
        repo.refs()
            .set_thread(&format!("feat-{i}"), &main_id)
            .unwrap();
    }
    drop(repo);

    let thread_names: Vec<String> = std::iter::once("main".to_string())
        .chain((0..FORKS).map(|i| format!("feat-{i}")))
        .collect();
    for name in &thread_names {
        let r = Repository::open(temp.path()).unwrap();
        let mount = ContentAddressedMount::new(r, name).unwrap();
        for i in 0..FILES_PER_THREAD {
            let path = format!("file{i}.txt");
            let node = create_pending_file(&mount, &path, objects::object::FileMode::Normal);
            let bytes = if i < SHARED_FILES {
                format!("shared-content-{i}\n").into_bytes()
            } else {
                format!("unique-{name}-{i}\n").into_bytes()
            };
            mount.write(node, 0, &bytes).unwrap();
        }
        mount.flush_all().unwrap();
    }

    let repo = Repository::open(temp.path()).unwrap();
    let blobs: std::collections::HashSet<_> =
        repo.store().list_blobs().unwrap().into_iter().collect();
    let expected_unique =
        SHARED_FILES + thread_names.len() * (FILES_PER_THREAD - SHARED_FILES);
    assert!(
        blobs.len() >= expected_unique && blobs.len() <= expected_unique + EXTRA_BLOB_SLACK,
        "expected ~{expected_unique} distinct blobs, got {}",
        blobs.len()
    );
}
mod proptests {
use std::collections::BTreeMap;
use proptest::prelude::*;
use super::*;
use crate::core::test_helpers::install_pending_file;
#[derive(Clone, Debug)]
enum Op {
WriteFresh {
name: String,
offset: u64,
bytes: Vec<u8>,
},
Flush { name: String },
Read { name: String, len: usize },
Capture,
Unlink { name: String },
}
fn strategy_name() -> impl Strategy<Value = String> {
prop_oneof![
Just("a.txt".to_string()),
Just("b.txt".to_string()),
Just("nested/c.txt".to_string()),
Just("nested/d.txt".to_string()),
]
}
fn strategy_op() -> impl Strategy<Value = Op> {
prop_oneof![
(
strategy_name(),
0u64..32,
proptest::collection::vec(any::<u8>(), 1..32)
)
.prop_map(|(name, offset, bytes)| Op::WriteFresh {
name,
offset,
bytes,
}),
strategy_name().prop_map(|name| Op::Flush { name }),
(strategy_name(), 1usize..64usize).prop_map(|(name, len)| Op::Read { name, len }),
Just(Op::Capture),
strategy_name().prop_map(|name| Op::Unlink { name }),
]
}
fn model_pwrite(buf: &mut Vec<u8>, offset: usize, data: &[u8]) {
let end = offset + data.len();
if buf.len() < end {
buf.resize(end, 0);
}
buf[offset..end].copy_from_slice(data);
}
proptest! {
#![proptest_config(ProptestConfig {
cases: 256,
.. ProptestConfig::default()
})]
#[test]
fn mount_matches_in_memory_model(ops in proptest::collection::vec(strategy_op(), 1..32)) {
let temp = TempDir::new().unwrap();
let repo = Repository::init_default(temp.path()).unwrap();
let mount = ContentAddressedMount::new(repo, "main").unwrap().with_promotion_policy(
crate::core::PromotionPolicy {
idle_after: std::time::Duration::from_secs(3600),
sweep_interval: None,
},
);
let mut open: BTreeMap<String, NodeId> = BTreeMap::new();
let mut model: BTreeMap<String, Vec<u8>> = BTreeMap::new();
for op in ops {
match op {
Op::WriteFresh { name, offset, bytes } => {
let node = if let Some(entry) =
lookup_path_via_components(&mount, &name)
{
entry.node
} else {
install_pending_file(
&mount,
&name,
objects::object::FileMode::Normal,
)
};
mount.write(node, offset, &bytes).unwrap();
open.insert(name.clone(), node);
let buf = model.entry(name).or_default();
model_pwrite(buf, offset as usize, &bytes);
}
Op::Flush { name } => {
if let Some(node) = open.remove(&name) {
mount.flush(node).unwrap();
}
}
Op::Read { name, len } => {
let lookup = lookup_path_via_components(&mount, &name);
match (lookup, model.get(&name)) {
(Some(entry), Some(model_bytes)) => {
let mut buf = vec![0u8; len];
let n = mount.read(entry.node, 0, &mut buf).unwrap();
let take = std::cmp::min(len, model_bytes.len());
prop_assert_eq!(&buf[..n], &model_bytes[..take]);
}
(None, None) => {}
(Some(_), None) => prop_assert!(false, "mount has {} but model doesn't", name),
(None, Some(_)) => prop_assert!(false, "model has {} but mount doesn't", name),
}
}
Op::Capture => {
let _ = mount.capture(Some("proptest".into())).unwrap();
open.clear();
}
Op::Unlink { name } => {
mount.unlink_path(&name).unwrap();
open.remove(&name);
model.remove(&name);
}
}
for path in [
"a.txt",
"b.txt",
"nested/c.txt",
"nested/d.txt",
] {
let entry = lookup_path_via_components(&mount, path);
match (entry, model.get(path)) {
(Some(e), Some(model_bytes)) => {
let mut buf = vec![0u8; model_bytes.len().max(1)];
let n = mount.read(e.node, 0, &mut buf).unwrap();
prop_assert_eq!(
&buf[..n],
&model_bytes[..],
"mount/model mismatch on read of {}",
path
);
}
(None, None) => {}
(Some(_), None) => prop_assert!(
false,
"mount resolved {} but model has no entry (tombstone leak?)",
path
),
(None, Some(_)) => prop_assert!(
false,
"model has {} but mount lookup returned None (tombstone obscures hot/warm?)",
path
),
}
}
}
for path in [
"a.txt",
"b.txt",
"nested/c.txt",
"nested/d.txt",
] {
let entry = lookup_path_via_components(&mount, path);
match (entry, model.get(path)) {
(Some(e), Some(bytes)) => {
let mut buf = vec![0u8; bytes.len().max(1)];
let n = mount.read(e.node, 0, &mut buf).unwrap();
prop_assert_eq!(&buf[..n], &bytes[..]);
}
(None, None) => {}
(Some(_), None) => prop_assert!(
false,
"final: mount resolved {} but model deleted it",
path
),
(None, Some(_)) => prop_assert!(
false,
"final: model has {} but mount returned None",
path
),
}
}
}
}
}
#[cfg(all(target_os = "linux", feature = "fuse"))]
mod fuse_smoke {
    //! End-to-end smoke test through a real kernel FUSE mount.
    //!
    //! The test returns early, with a note on stderr, when `/dev/fuse` is
    //! missing or the background mount cannot be established. Hosts without
    //! FUSE therefore report it as passing.
    use std::time::{Duration, Instant};

    use super::*;
    use crate::FuseShell;

    /// Whether the FUSE character device `/dev/fuse` exists on this host.
    fn fuse_available() -> bool {
        std::path::Path::new("/dev/fuse").exists()
    }

    /// Polls every 20 ms until `path` exists or `deadline` has elapsed.
    ///
    /// Returns `true` if the path appeared in time and `false` on timeout,
    /// so callers can fail with a precise message instead of a later,
    /// less obvious I/O error.
    fn wait_until_exists(path: &std::path::Path, deadline: Duration) -> bool {
        let start = Instant::now();
        while !path.exists() {
            if start.elapsed() >= deadline {
                return false;
            }
            std::thread::sleep(Duration::from_millis(20));
        }
        true
    }

    #[test]
    fn fuse_open_close_read_round_trip() {
        if !fuse_available() {
            eprintln!("skipping: /dev/fuse not present");
            return;
        }
        let repo_dir = TempDir::new().unwrap();
        let repo = Repository::init_default(repo_dir.path()).unwrap();
        std::fs::write(repo_dir.path().join("seed.txt"), b"hello").unwrap();
        repo.snapshot(Some("seed".into()), None).unwrap();
        let mount = ContentAddressedMount::new(repo, "main").unwrap();
        let mountpoint = TempDir::new().unwrap();
        let session = match FuseShell::new(mount).mount_background(mountpoint.path()) {
            Ok(session) => session,
            Err(err) => {
                // Surface the reason so a real regression is distinguishable
                // from a host that simply lacks FUSE support.
                eprintln!("skipping: FUSE mount failed (likely no kernel module): {err:?}");
                return;
            }
        };
        let seed = mountpoint.path().join("seed.txt");
        assert!(
            wait_until_exists(&seed, Duration::from_secs(5)),
            "seed.txt did not appear under the FUSE mountpoint within 5s"
        );
        let read = std::fs::read_to_string(&seed).unwrap();
        assert_eq!(read, "hello");
        // Drop the background session (presumably tearing down the mount)
        // before the temporary directories are removed.
        drop(session);
    }
}