use std::{
collections::{HashMap, HashSet},
ffi::{CStr, CString, OsStr, OsString},
fmt,
fs::{File, canonicalize},
io::{BufRead, Read, Write},
os::{
fd::{AsFd, BorrowedFd, OwnedFd},
unix::ffi::OsStrExt,
},
path::{Path, PathBuf},
sync::Arc,
thread::available_parallelism,
};
use log::{debug, trace};
use tokio::sync::Semaphore;
use anyhow::{Context, Result, bail, ensure};
use fn_error_context::context;
use once_cell::sync::OnceCell;
use rustix::{
fs::{
Access, AtFlags, CWD, Dir, FileType, FlockOperation, Mode, OFlags, StatVfsMountFlags,
accessat, flock, fstatvfs, linkat, mkdirat, openat, readlinkat, statat, syncfs, unlinkat,
},
io::{Errno, Result as ErrnoResult},
};
use crate::{
fsverity::{
Algorithm, CompareVerityError, DEFAULT_LG_BLOCKSIZE, EnableVerityError, FsVerityHashValue,
FsVerityHasher, MeasureVerityError, compute_verity, enable_verity_maybe_copy,
ensure_verity_equal, has_verity, measure_verity, measure_verity_opt,
},
mount::{composefs_fsmount, mount_at},
shared_internals::IO_BUF_CAPACITY,
splitstream::{SplitStreamReader, SplitStreamWriter},
util::{ErrnoFilter, proc_self_fd, reopen_tmpfile_ro, replace_symlinkat},
};
/// Name of the metadata file stored at the root of every repository.
pub const REPO_METADATA_FILENAME: &str = "meta.json";
/// Errors returned when opening an existing repository (see `Repository::open_path`).
#[derive(Debug, thiserror::Error)]
pub enum RepositoryOpenError {
/// Neither `meta.json` nor an `objects/` directory exists.
#[error(
"{REPO_METADATA_FILENAME} not found; this repository must be initialized with `cfsctl init`"
)]
MetadataMissing,
/// `meta.json` is missing but an `objects/` directory exists: an old-format repository
/// that `Repository::open_upgrade` can migrate.
#[error(
"{REPO_METADATA_FILENAME} not found; this appears to be an old-format repository — use Repository::open_upgrade() or `cfsctl init` to migrate"
)]
OldFormatRepository,
/// `meta.json` exists but could not be deserialized.
#[error("failed to parse {REPO_METADATA_FILENAME}")]
MetadataInvalid(#[source] serde_json::Error),
/// The repository's hash algorithm is not compatible with the requested object ID type.
#[error("repository algorithm {found} does not match expected {expected}")]
AlgorithmMismatch {
found: Algorithm,
expected: Algorithm,
},
/// The repository's format version is newer than `REPO_FORMAT_VERSION`.
#[error(
"unsupported repository format version {found} (this tool supports up to {REPO_FORMAT_VERSION})"
)]
UnsupportedVersion {
found: u32,
},
/// The repository lists incompatible features that this build does not know.
#[error("repository requires unknown incompatible features: {0:?}")]
IncompatibleFeatures(Vec<String>),
/// Any other I/O or system-call failure.
#[error(transparent)]
Io(std::io::Error),
}
impl From<Errno> for RepositoryOpenError {
fn from(e: Errno) -> Self {
Self::Io(e.into())
}
}
impl From<std::io::Error> for RepositoryOpenError {
fn from(e: std::io::Error) -> Self {
Self::Io(e)
}
}
/// Repository format version written by this build.
///
/// Repositories with a higher version are rejected when opened.
pub const REPO_FORMAT_VERSION: u32 = 1;
/// Feature names this build understands, grouped by compatibility class.
///
/// `FeatureFlags::check` compares a repository's feature lists against these.
/// All lists are currently empty.
pub mod known_features {
/// Known compatible features. `FeatureFlags::check` ignores the compatible list entirely,
/// so unknown compatible features never block access.
pub const COMPAT: &[&str] = &[];
/// Known read-only-compatible features. Unknown ones downgrade access to read-only.
pub const RO_COMPAT: &[&str] = &[];
/// Known incompatible features. Unknown ones make the repository unopenable.
pub const INCOMPAT: &[&str] = &[];
}
/// Feature flags recorded in `meta.json`.
///
/// Serialized with kebab-case keys (`compatible`, `read-only-compatible`, `incompatible`).
/// Each list defaults to empty when absent.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct FeatureFlags {
/// Features that older tools may safely ignore.
#[serde(default)]
pub compatible: Vec<String>,
/// Features that older tools may read but must not write.
#[serde(default)]
pub read_only_compatible: Vec<String>,
/// Features that older tools must not touch at all.
#[serde(default)]
pub incompatible: Vec<String>,
}
/// Outcome of checking a repository's feature flags against this build.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FeatureCheck {
/// Every relevant feature is understood.
ReadWrite,
/// The listed read-only-compatible features are unknown, so only reading is considered safe.
ReadOnly(Vec<String>),
}
impl FeatureFlags {
    /// Classifies this feature set against [`known_features`].
    ///
    /// Unknown compatible features are ignored. Unknown read-only-compatible features
    /// yield [`FeatureCheck::ReadOnly`].
    ///
    /// # Errors
    /// Returns [`RepositoryOpenError::IncompatibleFeatures`] listing every unknown
    /// incompatible feature.
    pub fn check(&self) -> Result<FeatureCheck, RepositoryOpenError> {
        // Collects the entries of `features` that do not appear in `known`, preserving order.
        fn unknown_in(features: &[String], known: &[&str]) -> Vec<String> {
            features
                .iter()
                .filter(|name| !known.iter().any(|k| *k == name.as_str()))
                .cloned()
                .collect()
        }

        let blocking = unknown_in(&self.incompatible, known_features::INCOMPAT);
        if !blocking.is_empty() {
            return Err(RepositoryOpenError::IncompatibleFeatures(blocking));
        }

        let read_only = unknown_in(&self.read_only_compatible, known_features::RO_COMPAT);
        Ok(if read_only.is_empty() {
            FeatureCheck::ReadWrite
        } else {
            FeatureCheck::ReadOnly(read_only)
        })
    }
}
/// Contents of a repository's `meta.json`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct RepoMetadata {
/// Repository format version. Must not exceed `REPO_FORMAT_VERSION` for the repo to be opened.
pub version: u32,
/// fs-verity hash algorithm used to compute object IDs.
pub algorithm: Algorithm,
/// Feature flags. Treated as empty when absent from the JSON.
#[serde(default)]
pub features: FeatureFlags,
}
impl RepoMetadata {
    /// Metadata for a new repository whose algorithm matches `ObjectID`, with no features set.
    pub fn for_hash<ObjectID: FsVerityHashValue>() -> Self {
        Self::new(Algorithm::for_hash::<ObjectID>())
    }

    /// Metadata for a new repository at the current format version using `algorithm`.
    pub fn new(algorithm: Algorithm) -> Self {
        Self {
            algorithm,
            version: REPO_FORMAT_VERSION,
            features: Default::default(),
        }
    }

    /// Checks that a repository described by this metadata can be used with `ObjectID`.
    ///
    /// # Errors
    /// Returns `UnsupportedVersion` for a too-new format and `AlgorithmMismatch` for an
    /// incompatible hash type. Also propagates errors from [`FeatureFlags::check`].
    pub fn check_compatible<ObjectID: FsVerityHashValue>(
        &self,
    ) -> Result<FeatureCheck, RepositoryOpenError> {
        if self.version > REPO_FORMAT_VERSION {
            return Err(RepositoryOpenError::UnsupportedVersion {
                found: self.version,
            });
        }
        if !self.algorithm.is_compatible::<ObjectID>() {
            let expected = Algorithm::for_hash::<ObjectID>();
            return Err(RepositoryOpenError::AlgorithmMismatch {
                found: self.algorithm,
                expected,
            });
        }
        self.features.check()
    }

    /// Serializes to pretty-printed JSON terminated by a newline.
    pub fn to_json(&self) -> Result<Vec<u8>> {
        let mut json =
            serde_json::to_vec_pretty(self).context("serializing repository metadata")?;
        json.extend_from_slice(b"\n");
        Ok(json)
    }

    /// Parses metadata from raw JSON bytes.
    #[context("Parsing repository metadata JSON")]
    pub fn from_json(data: &[u8]) -> Result<Self> {
        serde_json::from_slice(data).context("deserializing repository metadata")
    }
}
/// Returns the algorithm recorded in the repository's `meta.json`, or `None` if that file is absent.
#[context("Reading repository algorithm")]
pub fn read_repo_algorithm(repo_fd: &impl AsFd) -> Result<Option<Algorithm>> {
    let metadata = read_repo_metadata(repo_fd)?;
    Ok(metadata.map(|meta| meta.algorithm))
}
/// Reads and parses `meta.json` from the repository directory `repo_fd`.
///
/// Returns `Ok(None)` when the file does not exist.
///
/// # Errors
/// Fails if the file exists but cannot be opened or parsed.
#[context("Reading repository metadata")]
pub(crate) fn read_repo_metadata(repo_fd: &impl AsFd) -> Result<Option<RepoMetadata>> {
    let flags = OFlags::RDONLY | OFlags::CLOEXEC;
    let fd = match openat(repo_fd, REPO_METADATA_FILENAME, flags, Mode::empty()) {
        Err(Errno::NOENT) => return Ok(None),
        other => other.context("opening meta.json")?,
    };
    let reader = std::io::BufReader::new(File::from(fd));
    let meta: RepoMetadata = serde_json::from_reader(reader).context("parsing meta.json")?;
    Ok(Some(meta))
}
fn enable_verity_for_algorithm(
dirfd: &impl AsFd,
fd: BorrowedFd,
algorithm: &Algorithm,
) -> Result<()> {
match algorithm {
Algorithm::Sha256 { .. } => {
enable_verity_maybe_copy::<crate::fsverity::Sha256HashValue>(dirfd, fd)
.context("enabling verity (sha256)")?;
}
Algorithm::Sha512 { .. } => {
enable_verity_maybe_copy::<crate::fsverity::Sha512HashValue>(dirfd, fd)
.context("enabling verity (sha512)")?;
}
}
Ok(())
}
/// Removes the `streams/` and `images/` directories and `meta.json` from the repository at
/// `path`, leaving `objects/` untouched.
///
/// Entries that are already gone are skipped. This is checked by attempting the removal and
/// treating "not found" as success, rather than probing with `exists()` first. That avoids
/// a check-then-act race, and it also removes dangling symlinks, which `exists()` reports as
/// absent.
///
/// # Errors
/// Fails if any entry exists but cannot be removed.
#[context("Resetting repository metadata at {}", path.as_ref().display())]
pub fn reset_metadata(path: impl AsRef<Path>) -> Result<()> {
    // Maps a "not found" outcome to success; every other result is passed through.
    fn ignore_missing(result: std::io::Result<()>) -> std::io::Result<()> {
        match result {
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            other => other,
        }
    }

    let path = path.as_ref();
    for dir in ["streams", "images"] {
        let p = path.join(dir);
        ignore_missing(std::fs::remove_dir_all(&p))
            .with_context(|| format!("removing {}", p.display()))?;
    }
    let meta_path = path.join(REPO_METADATA_FILENAME);
    ignore_missing(std::fs::remove_file(&meta_path))
        .with_context(|| format!("removing {}", meta_path.display()))?;
    Ok(())
}
pub fn user_path() -> Result<PathBuf> {
let home = std::env::var("HOME").with_context(|| "$HOME must be set when in user mode")?;
Ok(PathBuf::from(home).join(".var/lib/composefs"))
}
/// Location of the system-wide repository.
pub fn system_path() -> PathBuf {
    std::path::Path::new("/sysroot").join("composefs")
}
#[context("Writing repository metadata")]
pub(crate) fn write_repo_metadata(
repo_fd: &impl AsFd,
meta: &RepoMetadata,
enable_verity: bool,
) -> Result<()> {
let data = meta.to_json()?;
match openat(
repo_fd,
".",
OFlags::WRONLY | OFlags::TMPFILE | OFlags::CLOEXEC,
Mode::from_raw_mode(0o644),
) {
Ok(fd) => {
let mut file = File::from(fd);
file.write_all(&data)
.context("writing metadata to tmpfile")?;
file.sync_all().context("syncing metadata tmpfile")?;
let ro_fd = reopen_tmpfile_ro(file).context("re-opening tmpfile read-only")?;
if enable_verity {
enable_verity_for_algorithm(repo_fd, ro_fd.as_fd(), &meta.algorithm)
.context("enabling verity on meta.json")?;
}
linkat(
CWD,
proc_self_fd(&ro_fd),
repo_fd,
REPO_METADATA_FILENAME,
AtFlags::SYMLINK_FOLLOW,
)
.context("linking meta.json into repository")?;
}
Err(Errno::OPNOTSUPP | Errno::NOSYS) => {
let fd = openat(
repo_fd,
REPO_METADATA_FILENAME,
OFlags::WRONLY | OFlags::CREATE | OFlags::EXCL | OFlags::CLOEXEC,
Mode::from_raw_mode(0o644),
)
.context("creating meta.json")?;
let mut file = File::from(fd);
file.write_all(&data).context("writing meta.json")?;
file.sync_all().context("syncing meta.json to disk")?;
if enable_verity {
let ro_fd = openat(
repo_fd,
REPO_METADATA_FILENAME,
OFlags::RDONLY | OFlags::CLOEXEC,
Mode::empty(),
)
.context("re-opening meta.json for verity")?;
drop(file);
enable_verity_for_algorithm(repo_fd, ro_fd.as_fd(), &meta.algorithm)
.context("enabling verity on meta.json")?;
}
}
Err(e) => {
return Err(e).context("creating tmpfile for meta.json")?;
}
}
Ok(())
}
fn infer_metadata(repo_fd: &OwnedFd) -> Result<(Algorithm, bool)> {
let objects_fd = openat(
repo_fd,
"objects",
OFlags::RDONLY | OFlags::CLOEXEC,
Mode::empty(),
)
.context("opening objects/ directory")?;
let dir = Dir::read_from(&objects_fd).context("reading objects/ directory")?;
for entry in dir {
let entry = entry.context("reading objects/ directory entry")?;
let subdir_name = entry.file_name().to_bytes();
if subdir_name == b"." || subdir_name == b".." {
continue;
}
if subdir_name.len() != 2 {
continue;
}
let subdir_fd = openat(
&objects_fd,
entry.file_name(),
OFlags::RDONLY | OFlags::CLOEXEC,
Mode::empty(),
)
.with_context(|| {
format!(
"opening objects/{} subdirectory",
entry.file_name().to_string_lossy()
)
})?;
let subdir = Dir::read_from(&subdir_fd).context("reading object subdirectory")?;
for obj_entry in subdir {
let obj_entry = obj_entry.context("reading object subdirectory entry")?;
let obj_name = obj_entry.file_name().to_bytes();
if obj_name == b"." || obj_name == b".." {
continue;
}
let algorithm = match obj_name.len() {
62 => Algorithm::Sha256 {
lg_blocksize: DEFAULT_LG_BLOCKSIZE,
},
126 => Algorithm::Sha512 {
lg_blocksize: DEFAULT_LG_BLOCKSIZE,
},
_ => continue,
};
let obj_fd = openat(
&subdir_fd,
obj_entry.file_name(),
OFlags::RDONLY | OFlags::CLOEXEC,
Mode::empty(),
)
.with_context(|| {
format!(
"opening object file {}",
obj_entry.file_name().to_string_lossy()
)
})?;
let has_verity =
has_verity(&obj_fd, algorithm).context("probing fs-verity on object")?;
return Ok((algorithm, has_verity));
}
}
bail!("no objects found in repository — cannot infer metadata");
}
/// Infers the hash algorithm of a repository lacking `meta.json` from the objects it stores.
pub fn infer_repo_algorithm(repo_fd: &OwnedFd) -> Result<Algorithm> {
    let (algorithm, _has_verity) = infer_metadata(repo_fd)?;
    Ok(algorithm)
}
/// How an object ended up in the object store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObjectStoreMethod {
/// The source was reflinked (`FICLONE`) into a new object file.
Reflinked,
/// The source file itself was hardlinked into the object store.
Hardlinked,
/// The data was written into a fresh object file, which was then linked in.
Copied,
/// An object with the same path (and, where checked, the same size) already existed.
AlreadyPresent,
}
/// State shared across a batch of object imports.
///
/// It remembers `(source_dev, destination_dev)` pairs for which reflinking has already been
/// found unsupported, so later imports between the same devices can skip the attempt.
#[derive(Debug, Default)]
pub struct ImportContext {
    reflink_unsupported_devs: Vec<(u64, u64)>,
}

impl ImportContext {
    /// Whether reflinking from `src_dev` to `dst_dev` was previously marked unsupported.
    pub(crate) fn is_reflink_unsupported(&self, src_dev: u64, dst_dev: u64) -> bool {
        self.reflink_unsupported_devs.contains(&(src_dev, dst_dev))
    }

    /// Records that reflinking from `src_dev` to `dst_dev` is unsupported.
    ///
    /// Marking the same pair twice is a no-op.
    pub(crate) fn mark_reflink_unsupported(&mut self, src_dev: u64, dst_dev: u64) {
        let pair = (src_dev, dst_dev);
        if !self.reflink_unsupported_devs.contains(&pair) {
            self.reflink_unsupported_devs.push(pair);
        }
    }
}
fn ensure_dir_and_openat(dirfd: impl AsFd, filename: &str, flags: OFlags) -> ErrnoResult<OwnedFd> {
match openat(
&dirfd,
filename,
flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
0o666.into(),
) {
Ok(file) => Ok(file),
Err(Errno::NOENT) => match mkdirat(&dirfd, filename, 0o777.into()) {
Ok(()) | Err(Errno::EXIST) => openat(
dirfd,
filename,
flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
0o666.into(),
),
Err(other) => Err(other),
},
Err(other) => Err(other),
}
}
/// Creates directory `path` under `dirfd` with `mode`, treating "already exists" as success.
fn ensure_dir_at(dirfd: impl AsFd, path: &str, mode: Mode) -> ErrnoResult<()> {
    mkdirat(dirfd, path, mode).or_else(|err| {
        if err == Errno::EXIST {
            Ok(())
        } else {
            Err(err)
        }
    })
}
/// Zero-sized token passed to internal write helpers (e.g. `create_object_tmpfile_impl`).
///
/// Public write entry points obtain it from `ensure_writable_token` (defined outside this
/// view). NOTE(review): holding one presumably means the repository was verified writable;
/// confirm.
#[derive(Debug, Clone, Copy)]
pub(crate) struct WritableRepo;
/// A handle to an open repository whose objects are identified by `ObjectID`.
///
/// A shared `flock` on the repository directory is held for the lifetime of the handle. It
/// is taken in `open_path` and released in `Drop`.
pub struct Repository<ObjectID: FsVerityHashValue> {
/// Directory fd for the repository root; also carries the shared lock.
repository: OwnedFd,
/// Lazily opened `objects/` directory (`O_PATH`); see `objects_dir`.
objects: OnceCell<OwnedFd>,
/// Lazily created semaphore; see `write_semaphore`.
write_semaphore: OnceCell<Arc<Semaphore>>,
/// `true` when `meta.json` does not have fs-verity enabled. In that mode, verity being
/// missing or unsupported on objects is tolerated, and digests may be computed in userspace.
insecure: bool,
/// Parsed contents of `meta.json`.
metadata: RepoMetadata,
/// Test-only switch exposed through `write_old_splitstream_format`.
#[cfg(any(test, feature = "test"))]
write_old_splitstream_format: std::sync::atomic::AtomicBool,
_data: std::marker::PhantomData<ObjectID>,
}
/// Debug output lists the repository fd, the cached objects fd and the `insecure` flag.
/// The remaining fields are elided.
impl<ObjectID: FsVerityHashValue> std::fmt::Debug for Repository<ObjectID> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Repository");
        out.field("repository", &self.repository);
        out.field("objects", &self.objects);
        out.field("insecure", &self.insecure);
        out.finish_non_exhaustive()
    }
}
impl<ObjectID: FsVerityHashValue> Drop for Repository<ObjectID> {
fn drop(&mut self) {
flock(&self.repository, FlockOperation::Unlock).expect("repository unlock failed");
}
}
/// How garbage collection walks a category directory (used by code outside this view).
///
/// NOTE(review): presumably `RefsOnly` visits only named references and `AllEntries` visits
/// every entry; confirm against the GC implementation.
enum GCCategoryWalkMode {
RefsOnly,
AllEntries,
}
/// Summary counters reported by a garbage-collection run.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GcResult {
/// Number of objects removed.
pub objects_removed: u64,
/// Bytes attributed to removed objects (presumably their total size; confirm).
pub objects_bytes: u64,
/// Number of image entries pruned.
pub images_pruned: u64,
/// Number of stream entries pruned.
pub streams_pruned: u64,
}
/// A single problem found while checking a repository.
///
/// `Display` renders each error as `fsck: <kind>: ...`. With serde, errors serialize as
/// internally tagged objects. A `"type"` field holds the kebab-case variant name (e.g.
/// `"object-digest-mismatch"`) next to the variant's fields. Variants marked
/// `rename_all = "camelCase"` use camelCase field names (e.g. `objectId`, `refName`).
#[derive(Debug, Clone, serde::Serialize, thiserror::Error)]
#[serde(tag = "type", rename_all = "kebab-case")]
#[non_exhaustive]
#[allow(missing_docs)]
pub enum FsckError {
// --- object store ---
#[error("fsck: object-invalid-name: {path}: {detail}")]
ObjectInvalidName { path: String, detail: String },
#[error("fsck: object-open-failed: {path}: {detail}")]
ObjectOpenFailed { path: String, detail: String },
#[error("fsck: object-digest-mismatch: {path}: measured {measured}")]
ObjectDigestMismatch { path: String, measured: String },
#[error("fsck: object-verity-failed: {path}: {detail}")]
ObjectVerityFailed { path: String, detail: String },
#[error("fsck: object-verity-missing: {path}")]
ObjectVerityMissing { path: String },
// --- symlink entries ---
#[error("fsck: entry-not-symlink: {path}")]
EntryNotSymlink { path: String },
#[error("fsck: broken-symlink: {path}")]
BrokenSymlink { path: String },
#[error("fsck: stat-failed: {path}: {detail}")]
StatFailed { path: String, detail: String },
#[error("fsck: unexpected-file-type: {path}: {detail}")]
UnexpectedFileType { path: String, detail: String },
// --- split streams ---
#[error("fsck: stream-open-failed: {path}: {detail}")]
StreamOpenFailed { path: String, detail: String },
#[error("fsck: missing-object-ref: {path}: {object_id}")]
#[serde(rename_all = "camelCase")]
MissingObjectRef { path: String, object_id: String },
#[error("fsck: stream-read-failed: {path}: {detail}")]
StreamReadFailed { path: String, detail: String },
#[error("fsck: missing-named-ref: {path}: ref {ref_name}: {object_id}")]
#[serde(rename_all = "camelCase")]
MissingNamedRef {
path: String,
ref_name: String,
object_id: String,
},
#[error("fsck: object-check-failed: {path}: {object_id}: {detail}")]
#[serde(rename_all = "camelCase")]
ObjectCheckFailed {
path: String,
object_id: String,
detail: String,
},
// --- images ---
#[error("fsck: image-open-failed: {path}: {detail}")]
ImageOpenFailed { path: String, detail: String },
#[error("fsck: image-read-failed: {path}: {detail}")]
ImageReadFailed { path: String, detail: String },
#[error("fsck: image-invalid: {path}: {detail}")]
ImageInvalid { path: String, detail: String },
#[error("fsck: image-missing-object: {path}: {object_id}")]
#[serde(rename_all = "camelCase")]
ImageMissingObject { path: String, object_id: String },
// --- meta.json ---
#[error("fsck: metadata-parse-failed: meta.json: {detail}")]
MetadataParseFailed { detail: String },
#[error(
"fsck: metadata-algorithm-mismatch: meta.json: expected {expected}, repository opened as {actual}"
)]
MetadataAlgorithmMismatch { expected: String, actual: String },
}
/// Aggregate result of a repository check, serialized with camelCase field names.
#[derive(Debug, Clone, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FsckResult {
/// Whether `meta.json` was present.
pub(crate) has_metadata: bool,
/// Objects examined / found corrupted.
pub(crate) objects_checked: u64,
pub(crate) objects_corrupted: u64,
/// Streams examined / found corrupted.
pub(crate) streams_checked: u64,
pub(crate) streams_corrupted: u64,
/// Images examined / found corrupted.
pub(crate) images_checked: u64,
pub(crate) images_corrupted: u64,
/// Symlinks whose target does not exist.
pub(crate) broken_links: u64,
/// References to objects that are not in the store.
pub(crate) missing_objects: u64,
/// Every problem found. Empty means the check passed (see `is_ok`).
pub(crate) errors: Vec<FsckError>,
}
impl FsckResult {
    /// Whether `meta.json` was present during the check.
    pub fn has_metadata(&self) -> bool {
        self.has_metadata
    }

    /// `true` when no errors were recorded.
    ///
    /// Debug builds also assert that non-zero corruption counters are always accompanied by
    /// at least one recorded error.
    pub fn is_ok(&self) -> bool {
        let clean = self.errors.is_empty();
        let counters_zero = [
            self.objects_corrupted,
            self.streams_corrupted,
            self.images_corrupted,
            self.broken_links,
            self.missing_objects,
        ]
        .iter()
        .all(|&count| count == 0);
        debug_assert!(
            counters_zero || !clean,
            "corruption counters are non-zero but no error messages recorded"
        );
        clean
    }

    /// Number of objects examined.
    pub fn objects_checked(&self) -> u64 {
        self.objects_checked
    }

    /// Number of objects found corrupted.
    pub fn objects_corrupted(&self) -> u64 {
        self.objects_corrupted
    }

    /// Number of streams examined.
    pub fn streams_checked(&self) -> u64 {
        self.streams_checked
    }

    /// Number of streams found corrupted.
    pub fn streams_corrupted(&self) -> u64 {
        self.streams_corrupted
    }

    /// Number of images examined.
    pub fn images_checked(&self) -> u64 {
        self.images_checked
    }

    /// Number of images found corrupted.
    pub fn images_corrupted(&self) -> u64 {
        self.images_corrupted
    }

    /// Number of broken symlinks.
    pub fn broken_links(&self) -> u64 {
        self.broken_links
    }

    /// Number of missing object references.
    pub fn missing_objects(&self) -> u64 {
        self.missing_objects
    }

    /// All recorded problems.
    pub fn errors(&self) -> &[FsckError] {
        &self.errors
    }
}
/// Multi-line human-readable summary.
///
/// The output has a `meta.json` status line, per-category `ok/checked` counts, optional
/// broken-link and missing-object counts, and a final status line that lists every error.
impl fmt::Display for FsckResult {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metadata_failed = self.errors.iter().any(|err| {
            matches!(
                err,
                FsckError::MetadataParseFailed { .. } | FsckError::MetadataAlgorithmMismatch { .. }
            )
        });
        let metadata_state = if metadata_failed {
            "error"
        } else if self.has_metadata {
            "ok"
        } else {
            "absent"
        };
        writeln!(f, "meta.json: {metadata_state}")?;

        let categories = [
            ("objects", self.objects_checked, self.objects_corrupted),
            ("streams", self.streams_checked, self.streams_corrupted),
            ("images", self.images_checked, self.images_corrupted),
        ];
        for (label, checked, corrupted) in categories {
            writeln!(f, "{label}: {}/{checked} ok", checked.saturating_sub(corrupted))?;
        }

        if self.broken_links > 0 {
            writeln!(f, "broken symlinks: {}", self.broken_links)?;
        }
        if self.missing_objects > 0 {
            writeln!(f, "missing objects: {}", self.missing_objects)?;
        }

        match self.errors.len() {
            0 => writeln!(f, "status: ok")?,
            count => {
                writeln!(f, "status: {count} error(s)")?;
                for err in &self.errors {
                    writeln!(f, " - {err}")?;
                }
            }
        }
        Ok(())
    }
}
impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
/// Test hook: sets the flag read by `write_old_splitstream_format`.
#[cfg(any(test, feature = "test"))]
pub fn set_write_old_splitstream_format(&self, enabled: bool) {
    use std::sync::atomic::Ordering;
    self.write_old_splitstream_format.store(enabled, Ordering::Relaxed);
}
/// Test hook: returns the flag set by `set_write_old_splitstream_format`.
#[cfg(any(test, feature = "test"))]
pub(crate) fn write_old_splitstream_format(&self) -> bool {
    use std::sync::atomic::Ordering;
    self.write_old_splitstream_format.load(Ordering::Relaxed)
}
/// Returns the `objects/` directory fd.
///
/// On first use the directory is opened with `O_PATH` (and created if missing); the fd is
/// cached after that.
pub fn objects_dir(&self) -> ErrnoResult<&OwnedFd> {
    let root = &self.repository;
    self.objects
        .get_or_try_init(move || ensure_dir_and_openat(root, "objects", OFlags::PATH))
}
/// Returns the shared write semaphore, creating it on first use.
///
/// It has one permit per unit of `available_parallelism()`, or 4 if that cannot be
/// determined. It is presumably used by callers to bound concurrent object writes.
pub fn write_semaphore(&self) -> Arc<Semaphore> {
    let semaphore = self.write_semaphore.get_or_init(|| {
        let permits = match available_parallelism() {
            Ok(n) => n.get(),
            Err(_) => 4,
        };
        Arc::new(Semaphore::new(permits))
    });
    Arc::clone(semaphore)
}
/// Creates (or re-opens) a repository at `path`, relative to `dirfd`.
///
/// The directory is created with mode `0o700` if missing, and `meta.json` for `algorithm`
/// is written. When `enable_verity` is set, fs-verity is enabled on `meta.json`.
///
/// Returns the opened repository together with `true` if this call wrote `meta.json`, or
/// `false` if an identical `meta.json` was already present.
///
/// # Errors
/// Fails in these cases:
/// - `algorithm` is incompatible with `ObjectID`;
/// - the repository already has different metadata;
/// - writing `meta.json` failed and no `meta.json` exists afterwards;
/// - the final `open_path` fails.
#[context("Initializing repository at {}", path.as_ref().display())]
pub fn init_path(
dirfd: impl AsFd,
path: impl AsRef<Path>,
algorithm: Algorithm,
enable_verity: bool,
) -> Result<(Self, bool)> {
let path = path.as_ref();
if !algorithm.is_compatible::<ObjectID>() {
bail!(
"algorithm {} is not compatible with this repository type (expected {})",
algorithm,
Algorithm::for_hash::<ObjectID>(),
);
}
// An already-existing directory is fine: re-initialization is checked via meta.json below.
mkdirat(&dirfd, path, Mode::from_raw_mode(0o700))
.or_else(|e| if e == Errno::EXIST { Ok(()) } else { Err(e) })
.with_context(|| format!("creating repository directory {}", path.display()))?;
let repo_fd = openat(
&dirfd,
path,
OFlags::RDONLY | OFlags::CLOEXEC,
Mode::empty(),
)
.with_context(|| format!("opening repository directory {}", path.display()))?;
let meta = RepoMetadata::new(algorithm);
// write_repo_metadata never overwrites an existing meta.json. On failure, compare with
// what is on disk to tell "already initialized identically" apart from a real conflict.
if let Err(write_err) = write_repo_metadata(&repo_fd, &meta, enable_verity) {
match read_repo_metadata(&repo_fd)? {
Some(existing) if existing == meta => {
let repo = Self::open_path(dirfd, path)?;
return Ok((repo, false));
}
// NOTE(review): also reached when only `version` or `features` differ; the message
// then names the same algorithm twice.
Some(existing) => {
bail!(
"repository already initialized with algorithm '{}'; \
cannot re-initialize with '{}'",
existing.algorithm,
meta.algorithm,
);
}
// Nothing on disk, so the write failure is the real error.
None => {
return Err(write_err);
}
}
}
drop(repo_fd);
let repo = Self::open_path(dirfd, path)?;
Ok((repo, true))
}
/// Opens an existing repository at `path`, relative to `dirfd`.
///
/// A shared `flock` is taken on the repository directory, blocking until it is available;
/// it is released when the handle is dropped. `meta.json` is then read and checked for
/// format version, algorithm compatibility with `ObjectID`, and feature flags. The handle
/// is marked insecure when `meta.json` does not have fs-verity enabled.
///
/// NOTE(review): the `FeatureCheck` returned by `check_compatible` is discarded here, so a
/// `ReadOnly` result does not by itself restrict this handle. Confirm that write paths
/// (e.g. `ensure_writable_token`) re-check `metadata.features`.
///
/// # Errors
/// See [`RepositoryOpenError`]. `OldFormatRepository` can be handled via `open_upgrade`.
pub fn open_path(
dirfd: impl AsFd,
path: impl AsRef<Path>,
) -> Result<Self, RepositoryOpenError> {
let path = path.as_ref();
let repository = openat(dirfd, path, OFlags::RDONLY | OFlags::CLOEXEC, Mode::empty())?;
flock(&repository, FlockOperation::LockShared)?;
let (metadata, has_verity) = Self::read_and_probe_metadata(&repository)?;
metadata.check_compatible::<ObjectID>()?;
Ok(Self {
repository,
objects: OnceCell::new(),
write_semaphore: OnceCell::new(),
insecure: !has_verity,
metadata,
#[cfg(any(test, feature = "test"))]
write_old_splitstream_format: std::sync::atomic::AtomicBool::new(false),
_data: std::marker::PhantomData,
})
}
/// Opens a repository, first migrating an old-format repository (no `meta.json`) if needed.
///
/// For an old-format repository, the algorithm and verity state are inferred from the first
/// recognisable object. `meta.json` is then written, with verity if that object had it, and
/// the repository is reopened.
///
/// Returns the repository and `true` if a migration took place.
///
/// # Errors
/// Fails in these cases:
/// - opening fails for any reason other than `OldFormatRepository`;
/// - inference finds no usable object;
/// - the inferred algorithm is incompatible with `ObjectID`;
/// - writing `meta.json` fails.
pub fn open_upgrade(dirfd: impl AsFd, path: impl AsRef<Path>) -> Result<(Self, bool)> {
let path = path.as_ref();
match Self::open_path(&dirfd, path) {
Ok(repo) => Ok((repo, false)),
Err(RepositoryOpenError::OldFormatRepository) => {
let repo_fd = openat(
&dirfd,
path,
OFlags::RDONLY | OFlags::CLOEXEC,
Mode::empty(),
)
.with_context(|| format!("opening repository directory {}", path.display()))?;
let (algorithm, has_verity) = infer_metadata(&repo_fd)?;
if !algorithm.is_compatible::<ObjectID>() {
bail!(
"inferred algorithm {} is not compatible with this repository type \
(expected {})",
algorithm,
Algorithm::for_hash::<ObjectID>(),
);
}
let meta = RepoMetadata::new(algorithm);
// Protect meta.json with verity only if existing objects already use it.
write_repo_metadata(&repo_fd, &meta, has_verity)?;
drop(repo_fd);
let repo = Self::open_path(&dirfd, path)
.context("opening repository after writing meta.json")?;
Ok((repo, true))
}
Err(other) => Err(other.into()),
}
}
/// Opens and parses `meta.json`, and reports whether the file has fs-verity enabled.
///
/// # Errors
/// - `OldFormatRepository` if `meta.json` is missing but `objects/` exists.
/// - `MetadataMissing` if both are missing.
/// - `MetadataInvalid` if the JSON does not parse.
/// - `Io` for anything else, including failures while probing verity.
fn read_and_probe_metadata(
    repo_fd: &OwnedFd,
) -> Result<(RepoMetadata, bool), RepositoryOpenError> {
    let opened = openat(
        repo_fd,
        REPO_METADATA_FILENAME,
        OFlags::RDONLY | OFlags::CLOEXEC,
        Mode::empty(),
    );
    let meta_fd = match opened {
        Ok(fd) => fd,
        Err(Errno::NOENT) => {
            let objects = statat(repo_fd, "objects", AtFlags::empty()).filter_errno(Errno::NOENT);
            let err: RepositoryOpenError = match objects {
                Ok(Some(_)) => RepositoryOpenError::OldFormatRepository,
                Ok(None) => RepositoryOpenError::MetadataMissing,
                Err(e) => e.into(),
            };
            return Err(err);
        }
        Err(e) => return Err(e.into()),
    };

    // Parse through a duplicate so `meta_fd` stays available for the verity probe.
    let reader = std::io::BufReader::new(File::from(meta_fd.try_clone()?));
    let meta: RepoMetadata =
        serde_json::from_reader(reader).map_err(RepositoryOpenError::MetadataInvalid)?;
    let measured = measure_verity_opt::<ObjectID>(&meta_fd)
        .map_err(|e| std::io::Error::other(e.to_string()))?;
    Ok((meta, measured.is_some()))
}
/// Opens the per-user repository located at [`user_path`].
#[context("Opening user repository")]
pub fn open_user() -> Result<Self> {
    let location = user_path()?;
    let repo = Self::open_path(CWD, location)?;
    Ok(repo)
}
/// Opens the system-wide repository located at [`system_path`].
#[context("Opening system repository")]
pub fn open_system() -> Result<Self> {
    let repo = Self::open_path(CWD, system_path())?;
    Ok(repo)
}
/// Creates `dir` relative to the repository root (mode `0o755`, before umask), unless it
/// already exists.
fn ensure_dir(&self, dir: impl AsRef<Path>) -> ErrnoResult<()> {
    match mkdirat(&self.repository, dir.as_ref(), 0o755.into()) {
        Err(Errno::EXIST) => Ok(()),
        result => result,
    }
}
/// Async variant of [`Self::ensure_object`].
///
/// Hashing and storing `data` run on Tokio's blocking thread pool.
#[context("Ensuring object asynchronously")]
pub async fn ensure_object_async(self: &Arc<Self>, data: Vec<u8>) -> Result<ObjectID> {
    let writable = self.ensure_writable_token()?;
    let repo = Arc::clone(self);
    let task = tokio::task::spawn_blocking(move || repo.ensure_object_impl(&data, &writable));
    task.await?
}
#[context("Ensuring object from file descriptor")]
pub(crate) fn ensure_object_from_fd(&self, source: OwnedFd, size: u64) -> Result<ObjectID> {
let writable = self.ensure_writable_token()?;
let tmpfile_fd = self.create_object_tmpfile_impl(&writable)?;
if self.insecure {
let mut hasher = FsVerityHasher::<ObjectID>::new();
let mut src = std::io::BufReader::with_capacity(IO_BUF_CAPACITY, File::from(source));
let mut dst = File::from(tmpfile_fd.try_clone()?);
loop {
let buf = src.fill_buf()?;
if buf.is_empty() {
break;
}
let chunk = &buf[..buf.len().min(FsVerityHasher::<ObjectID>::BLOCK_SIZE)];
hasher.add_block(chunk);
dst.write_all(chunk)?;
let n = chunk.len();
src.consume(n);
}
drop(dst);
let id = hasher.digest();
let ro_fd = reopen_tmpfile_ro(File::from(tmpfile_fd))
.context("Re-opening tmpfile as read-only")?;
let objects_dir = self.objects_dir().context("Getting objects directory")?;
let (id, _method) = self.link_tmpfile_as_object(objects_dir, &ro_fd, &id, size)?;
Ok(id)
} else {
let mut src = File::from(source);
let mut dst = File::from(tmpfile_fd.try_clone()?);
let copied = std::io::copy(&mut src, &mut dst)?;
ensure!(copied == size, "Expected {size} bytes, got {copied}");
drop(dst);
let (id, _method) =
self.finalize_object_tmpfile_impl(File::from(tmpfile_fd), size, &writable)?;
Ok(id)
}
}
/// Creates an anonymous `O_TMPFILE` in `objects/`.
///
/// The caller fills it and then hands it to [`Self::finalize_object_tmpfile`].
#[context("Creating object tmpfile")]
pub fn create_object_tmpfile(&self) -> Result<OwnedFd> {
    let token = self.ensure_writable_token()?;
    self.create_object_tmpfile_impl(&token)
}
/// Creates a read-write anonymous tmpfile (mode `0o644`) inside `objects/`.
#[context("Creating object tmpfile")]
pub(crate) fn create_object_tmpfile_impl(&self, _writable: &WritableRepo) -> Result<OwnedFd> {
    let dir = self
        .objects_dir()
        .context("Getting objects directory for tmpfile creation")?;
    let flags = OFlags::RDWR | OFlags::TMPFILE | OFlags::CLOEXEC;
    openat(dir, ".", flags, Mode::from_raw_mode(0o644))
        .context("Opening temp file in objects directory")
}
/// Imports `src` (of `size` bytes) as an object.
///
/// Tries reflink, then hardlink, and finally falls back to copying.
pub fn ensure_object_from_file(
    &self,
    src: &std::fs::File,
    size: u64,
    ctx: &mut ImportContext,
) -> Result<(ObjectID, ObjectStoreMethod)> {
    let allow_copy = true;
    self.ensure_object_from_file_inner(src, size, allow_copy, ctx)
}
/// Like [`Self::ensure_object_from_file`], but never copies the data.
///
/// Fails instead if neither reflink nor hardlink succeeds.
pub fn ensure_object_from_file_zerocopy(
    &self,
    src: &std::fs::File,
    size: u64,
    ctx: &mut ImportContext,
) -> Result<(ObjectID, ObjectStoreMethod)> {
    let allow_copy = false;
    self.ensure_object_from_file_inner(src, size, allow_copy, ctx)
}
fn ensure_object_from_file_inner(
&self,
src: &std::fs::File,
size: u64,
allow_copy: bool,
ctx: &mut ImportContext,
) -> Result<(ObjectID, ObjectStoreMethod)> {
use rustix::fs::{fstat, ioctl_ficlone};
let writable = self.ensure_writable_token()?;
let src_dev = fstat(src)?.st_dev;
let dst_dev = fstat(self.objects_dir()?)?.st_dev;
if !ctx.is_reflink_unsupported(src_dev, dst_dev) {
let tmpfile_fd = self.create_object_tmpfile_impl(&writable)?;
let tmpfile = File::from(tmpfile_fd);
match ioctl_ficlone(&tmpfile, src) {
Ok(()) => {
let stat = fstat(&tmpfile)?;
anyhow::ensure!(
stat.st_size as u64 == size,
"Reflink size mismatch: expected {}, got {}",
size,
stat.st_size
);
let (object_id, method) = self.finalize_object_tmpfile(tmpfile, size)?;
let method = match method {
ObjectStoreMethod::Copied => ObjectStoreMethod::Reflinked,
other => other,
};
return Ok((object_id, method));
}
Err(Errno::OPNOTSUPP | Errno::XDEV) => {
ctx.mark_reflink_unsupported(src_dev, dst_dev);
drop(tmpfile);
}
Err(e) => {
return Err(e).context("Reflinking source file to objects directory")?;
}
}
}
match self.try_hardlink_object(src, size) {
Ok(result) => return Ok(result),
Err(_) if allow_copy => {
}
Err(e) => {
return Err(e).context(
"reflink and hardlink both failed; copy fallback is disabled (zerocopy mode)",
);
}
}
let tmpfile_fd = self.create_object_tmpfile_impl(&writable)?;
let mut tmpfile = File::from(tmpfile_fd);
{
use std::io::{Seek, SeekFrom};
let mut src_clone = src.try_clone()?;
src_clone.seek(SeekFrom::Start(0))?;
std::io::copy(&mut src_clone, &mut tmpfile)?;
}
let (object_id, method) = self.finalize_object_tmpfile(tmpfile, size)?;
Ok((object_id, method))
}
fn try_hardlink_object(
&self,
src: &std::fs::File,
size: u64,
) -> Result<(ObjectID, ObjectStoreMethod)> {
use crate::fsverity::enable_verity_with_retry;
use rustix::thread::{CapabilitySet, capabilities};
let has_cap = capabilities(None)
.map(|caps| caps.effective.contains(CapabilitySet::DAC_READ_SEARCH))
.unwrap_or(false);
if !has_cap {
anyhow::bail!(
"hardlinking objects requires CAP_DAC_READ_SEARCH \
(run as root or use the copy fallback)"
);
}
let objects_dir = self.objects_dir()?;
let verity_enabled = match enable_verity_with_retry::<ObjectID>(src) {
Ok(()) => true,
Err(EnableVerityError::AlreadyEnabled) => true,
Err(EnableVerityError::FilesystemNotSupported) if self.insecure => false,
Err(e) => {
return Err(e).context("enabling verity on source file for hardlink")?;
}
};
let id: ObjectID = if verity_enabled {
measure_verity(src).context("measuring verity digest on source file")?
} else {
let mut reader = std::io::BufReader::new(
src.try_clone()
.context("cloning fd for digest computation")?,
);
Self::compute_verity_digest(&mut reader)
.context("computing verity digest in insecure mode")?
};
let path = id.to_object_pathname();
match statat(objects_dir, &path, AtFlags::empty()) {
Ok(stat) if stat.st_size as u64 == size => {
return Ok((id, ObjectStoreMethod::AlreadyPresent));
}
_ => {}
}
let parent_dir = id.to_object_dir();
ensure_dir_at(objects_dir, &parent_dir, Mode::from_raw_mode(0o755))
.context("creating object parent directory")?;
match linkat(src, "", objects_dir, &path, AtFlags::EMPTY_PATH) {
Ok(()) => Ok((id, ObjectStoreMethod::Hardlinked)),
Err(Errno::EXIST) => Ok((id, ObjectStoreMethod::AlreadyPresent)),
Err(e) => Err(e).context("hardlinking source file into objects directory")?,
}
}
/// Turns a filled object tmpfile (see [`Self::create_object_tmpfile`]) into a stored object.
///
/// Returns its ID and how it was stored.
#[context("Finalizing object tempfile")]
pub fn finalize_object_tmpfile(
    &self,
    file: File,
    size: u64,
) -> Result<(ObjectID, ObjectStoreMethod)> {
    let token = self.ensure_writable_token()?;
    self.finalize_object_tmpfile_impl(file, size, &token)
}
/// Enables fs-verity on a filled object tmpfile, determines its ID, and links it into the
/// object store.
///
/// The tmpfile is first reopened read-only. If verity is, or already was, enabled, the ID is
/// measured by the kernel. In an insecure repository on a filesystem without verity support,
/// the digest is computed in userspace instead. Linking is delegated to
/// `link_tmpfile_as_object`.
#[context("Finalizing object tempfile")]
pub(crate) fn finalize_object_tmpfile_impl(
&self,
file: File,
size: u64,
_writable: &WritableRepo,
) -> Result<(ObjectID, ObjectStoreMethod)> {
let ro_fd =
reopen_tmpfile_ro(file).context("Re-opening tmpfile as read-only for verity")?;
let objects_dir = self
.objects_dir()
.context("Getting objects directory for finalization")?;
let (ro_fd, verity_enabled) =
match enable_verity_maybe_copy::<ObjectID>(objects_dir, ro_fd.as_fd()) {
Ok(None) => (ro_fd, true),
// A replacement fd was returned (presumably a copy with verity enabled); use it from here on.
Ok(Some(new_fd)) => (new_fd, true),
Err(EnableVerityError::FilesystemNotSupported) if self.insecure => (ro_fd, false),
Err(EnableVerityError::AlreadyEnabled) => (ro_fd, true),
Err(other) => return Err(other).context("Enabling verity on tmpfile")?,
};
let id: ObjectID = if verity_enabled {
measure_verity(&ro_fd).context("Measuring verity digest")?
} else {
// Freshly reopened fd, so reading starts at offset 0.
let mut reader = std::io::BufReader::new(File::from(
ro_fd
.try_clone()
.context("Cloning fd for digest computation")?,
));
Self::compute_verity_digest(&mut reader)
.context("Computing verity digest in insecure mode")?
};
self.link_tmpfile_as_object(objects_dir, &ro_fd, &id, size)
}
/// Links the anonymous tmpfile `ro_fd` into the object store under `id`.
///
/// If a file of exactly `size` bytes already exists at the object path, nothing is linked and
/// `AlreadyPresent` is returned. Losing a `linkat` race (`EEXIST`) is reported the same way.
/// Otherwise the object's parent directory is created as needed, and `Copied` is returned.
fn link_tmpfile_as_object(
    &self,
    objects_dir: &OwnedFd,
    ro_fd: &impl AsFd,
    id: &ObjectID,
    size: u64,
) -> Result<(ObjectID, ObjectStoreMethod)> {
    let path = id.to_object_pathname();
    let already_stored = matches!(
        statat(objects_dir, &path, AtFlags::empty()),
        Ok(stat) if stat.st_size as u64 == size
    );
    if already_stored {
        return Ok((id.clone(), ObjectStoreMethod::AlreadyPresent));
    }

    let parent = id.to_object_dir();
    ensure_dir_at(objects_dir, &parent, Mode::from_raw_mode(0o755))
        .context("creating object parent directory")?;

    let linked = linkat(
        CWD,
        proc_self_fd(ro_fd),
        objects_dir,
        &path,
        AtFlags::SYMLINK_FOLLOW,
    );
    let method = match linked {
        Ok(()) => ObjectStoreMethod::Copied,
        Err(Errno::EXIST) => ObjectStoreMethod::AlreadyPresent,
        Err(e) => return Err(e).context("Linking tmpfile into objects directory"),
    };
    Ok((id.clone(), method))
}
#[context("Computing verity digest in userspace")]
fn compute_verity_digest(reader: &mut impl std::io::BufRead) -> Result<ObjectID> {
let mut hasher = FsVerityHasher::<ObjectID>::new();
loop {
let buf = reader
.fill_buf()
.context("Reading buffer for verity computation")?;
if buf.is_empty() {
break;
}
let chunk_size = buf.len().min(FsVerityHasher::<ObjectID>::BLOCK_SIZE);
hasher.add_block(&buf[..chunk_size]);
reader.consume(chunk_size);
}
Ok(hasher.digest())
}
/// Ensures that an object with contents `data` and ID `id` exists in the store.
///
/// `id` is expected to be the fs-verity digest of `data`.
///
/// If a file already exists at the object path, its verity digest is checked against `id`.
/// In an insecure repository, verity is first enabled if it is missing, and a filesystem
/// without verity support is tolerated. Otherwise `data` is written to a tmpfile in the
/// object's subdirectory, verity is enabled and re-checked, and the file is linked into
/// place. Losing that link race (`EEXIST`) is not an error.
///
/// # Errors
/// Fails if an existing object's digest does not match, or on any I/O or verity error not
/// tolerated by insecure mode.
#[context("Storing object with ID {id:?}")]
fn store_object_with_id(
&self,
data: &[u8],
id: &ObjectID,
_writable: &WritableRepo,
) -> Result<()> {
let dirfd = self
.objects_dir()
.context("Getting objects directory for storage")?;
let path = id.to_object_pathname();
match openat(
dirfd,
&path,
OFlags::RDONLY | OFlags::CLOEXEC,
Mode::empty(),
) {
// An object already exists at this path: verify it instead of writing a new one.
Ok(fd) => {
match ensure_verity_equal(&fd, id) {
Ok(()) => {}
Err(CompareVerityError::Measure(MeasureVerityError::VerityMissing))
if self.insecure =>
{
// NOTE(review): when enable_verity_maybe_copy returns a replacement fd, only that
// fd is verified; the file at `path` is not replaced. Confirm this is intended.
match enable_verity_maybe_copy::<ObjectID>(dirfd, fd.as_fd()) {
Ok(Some(fd)) => ensure_verity_equal(&fd, id)
.context("Verifying verity after enabling (copied)")?,
Ok(None) => ensure_verity_equal(&fd, id)
.context("Verifying verity after enabling (original)")?,
Err(other) => {
Err(other).context("Enabling verity on existing object")?
}
}
}
// Insecure repository on a filesystem without verity: the content is not verified.
Err(CompareVerityError::Measure(
MeasureVerityError::FilesystemNotSupported,
)) if self.insecure => {}
Err(other) => Err(other).context("Verifying existing object integrity")?,
}
return Ok(());
}
Err(Errno::NOENT) => {
// No object yet: fall through and create it.
}
Err(other) => {
return Err(other).context("Checking for existing object in repository")?;
}
}
// Create the tmpfile inside the object's subdirectory (created if missing).
let fd = ensure_dir_and_openat(dirfd, &id.to_object_dir(), OFlags::RDWR | OFlags::TMPFILE)
.with_context(|| "Creating tempfile in object subdirectory")?;
let mut file = File::from(fd);
file.write_all(data).context("Writing data to tmpfile")?;
let ro_fd = reopen_tmpfile_ro(file).context("Re-opening file as read-only for verity")?;
let ro_fd = match enable_verity_maybe_copy::<ObjectID>(dirfd, ro_fd.as_fd()) {
Ok(maybe_fd) => {
// Continue with the replacement fd if one was returned.
let ro_fd = maybe_fd.unwrap_or(ro_fd);
match ensure_verity_equal(&ro_fd, id) {
Ok(()) => ro_fd,
Err(CompareVerityError::Measure(
MeasureVerityError::VerityMissing
| MeasureVerityError::FilesystemNotSupported,
)) if self.insecure => ro_fd,
Err(other) => Err(other).context("Double-checking verity digest")?,
}
}
Err(EnableVerityError::FilesystemNotSupported) if self.insecure => ro_fd,
Err(other) => Err(other).context("Enabling verity digest")?,
};
match linkat(
CWD,
proc_self_fd(&ro_fd),
dirfd,
path,
AtFlags::SYMLINK_FOLLOW,
) {
Ok(()) => {}
Err(Errno::EXIST) => {
// Another writer linked the same path first. The path is derived from `id`, so the
// existing file is assumed to be the same object.
}
Err(other) => {
return Err(other).context("Linking created object file");
}
}
Ok(())
}
/// Stores `data` as a content-addressed object and returns its object ID.
///
/// # Errors
/// Fails if the repository is not writable or the object cannot be stored.
#[context("Ensuring object exists in repository")]
pub fn ensure_object(&self, data: &[u8]) -> Result<ObjectID> {
    let token = self.ensure_writable_token()?;
    let object_id = self.ensure_object_impl(data, &token)?;
    Ok(object_id)
}
/// Computes the fs-verity digest of `data` and stores `data` under that object ID.
///
/// The caller must already hold a [`WritableRepo`] token. Returns the computed ID.
#[context("Ensuring object exists in repository")]
pub(crate) fn ensure_object_impl(
    &self,
    data: &[u8],
    writable: &WritableRepo,
) -> Result<ObjectID> {
    let digest: ObjectID = compute_verity(data);
    self.store_object_with_id(data, &digest, writable)
        .map(|()| digest)
}
/// Opens `filename` (relative to the repository) read-only and checks its fs-verity
/// digest against `expected_verity`.
///
/// In insecure mode, a file without verity or a filesystem without verity support is
/// returned unchecked. Any other mismatch or measurement error is an error.
#[context("Opening file '{filename}' with verity verification")]
fn open_with_verity(&self, filename: &str, expected_verity: &ObjectID) -> Result<OwnedFd> {
    let fd = self
        .openat(filename, OFlags::RDONLY)
        .with_context(|| format!("Opening file '{filename}' in repository"))?;
    let verdict = ensure_verity_equal(&fd, expected_verity);
    let tolerated = self.insecure
        && matches!(
            verdict,
            Err(CompareVerityError::Measure(
                MeasureVerityError::VerityMissing | MeasureVerityError::FilesystemNotSupported
            ))
        );
    if !tolerated {
        verdict.context("Verifying file verity digest")?;
    }
    Ok(fd)
}
pub fn is_insecure(&self) -> bool {
self.insecure
}
/// Switches the repository into insecure mode, in which missing fs-verity support is
/// tolerated. Returns `self` so calls can be chained.
pub fn set_insecure(&mut self) -> &mut Self {
    self.insecure = true;
    self
}
pub fn require_verity(&self) -> Result<()> {
if self.insecure {
bail!(
"repository was not initialized with fs-verity \
(hint: re-create with `cfsctl init` on a \
verity-capable filesystem)"
);
}
Ok(())
}
/// Checks that the repository can be written to.
///
/// # Errors
/// Fails if the repository is on a read-only filesystem or is not writable by us.
pub fn ensure_writable(&self) -> Result<()> {
    self.ensure_writable_token().map(|_token| ())
}
pub(crate) fn ensure_writable_token(&self) -> Result<WritableRepo> {
let st = fstatvfs(&self.repository).context("Repository is not writable")?;
if st.f_flag.contains(StatVfsMountFlags::RDONLY) {
anyhow::bail!("Repository is not writable: read-only file system");
}
accessat(&self.repository, ".", Access::WRITE_OK, AtFlags::empty())
.context("Repository is not writable")?;
Ok(WritableRepo)
}
/// Starts a new split stream with the given `content_type`, backed by this repository.
///
/// # Errors
/// Fails if the repository is not writable.
pub fn create_stream(
    self: &Arc<Self>,
    content_type: u64,
) -> Result<SplitStreamWriter<ObjectID>> {
    self.ensure_writable_token()
        .map(|writable| SplitStreamWriter::new(self, content_type, writable))
}
/// Returns the repository-relative path of object `id`: `objects/<object pathname>`.
fn format_object_path(id: &ObjectID) -> String {
    let pathname = id.to_object_pathname();
    format!("objects/{pathname}")
}
/// Returns the repository-relative path of stream `content_identifier`:
/// `streams/<content_identifier>`.
fn format_stream_path(content_identifier: &str) -> String {
    let mut path = String::from("streams/");
    path.push_str(content_identifier);
    path
}
/// Looks up stream `content_identifier` and returns the object ID it points to, if any.
///
/// A stream is a symlink `streams/<content_identifier>`. Its target must start with
/// `../` and name an object. A missing symlink yields `Ok(None)`.
///
/// # Errors
/// Fails if the symlink cannot be read, has the wrong prefix, or does not name a valid
/// object.
#[context("Checking if stream '{content_identifier}' exists")]
pub fn has_stream(&self, content_identifier: &str) -> Result<Option<ObjectID>> {
    let stream_path = Self::format_stream_path(content_identifier);
    let target = match readlinkat(&self.repository, &stream_path, []) {
        Err(Errno::NOENT) => return Ok(None),
        other => other.context("Reading stream symlink")?,
    };
    let bytes = target.as_bytes();
    ensure!(
        bytes.starts_with(b"../"),
        "stream symlink has incorrect prefix"
    );
    let object_id = ObjectID::from_object_pathname(bytes)
        .context("Parsing object ID from stream symlink target")?;
    Ok(Some(object_id))
}
/// Finalizes `writer` and publishes the result as stream `content_identifier`.
///
/// The repository is synced before any symlink is created. Then
/// `streams/<content_identifier>` is pointed at the stream object. If `reference` is
/// given, `streams/refs/<reference>` is pointed at the stream as well.
/// Returns the stream's object ID.
#[context("Writing stream '{content_identifier}' to repository")]
pub fn write_stream(
    &self,
    writer: SplitStreamWriter<ObjectID>,
    content_identifier: &str,
    reference: Option<&str>,
) -> Result<ObjectID> {
    let token = *writer.writable();
    let object_id = writer.done().context("Finalizing split stream writer")?;
    self.sync()?;

    let stream_path = Self::format_stream_path(content_identifier);
    self.symlink_impl(&stream_path, &Self::format_object_path(&object_id), &token)?;
    if let Some(name) = reference {
        self.symlink_impl(&format!("streams/refs/{name}"), &stream_path, &token)?;
    }
    Ok(object_id)
}
/// Publishes an already-stored stream object `object_id` as stream `content_identifier`.
///
/// The repository is synced (on a blocking task) before `streams/<content_identifier>`
/// and, if `reference` is given, `streams/refs/<reference>` are created.
#[context("Registering stream '{content_identifier}' with object ID {object_id:?}")]
pub async fn register_stream(
    self: &Arc<Self>,
    object_id: &ObjectID,
    content_identifier: &str,
    reference: Option<&str>,
) -> Result<()> {
    let token = self.ensure_writable_token()?;
    self.sync_async().await?;

    let stream_path = Self::format_stream_path(content_identifier);
    self.symlink_impl(&stream_path, &Self::format_object_path(object_id), &token)?;
    match reference {
        Some(name) => self.symlink_impl(&format!("streams/refs/{name}"), &stream_path, &token),
        None => Ok(()),
    }
}
/// Async counterpart of `write_stream`: finalizes `writer`, syncs the repository on a
/// blocking task, then links `streams/<content_identifier>` and optionally
/// `streams/refs/<reference>`. Returns the stream's object ID.
#[context("Writing stream '{content_identifier}' to repository (async)")]
pub async fn write_stream_async(
    self: &Arc<Self>,
    writer: SplitStreamWriter<ObjectID>,
    content_identifier: &str,
    reference: Option<&str>,
) -> Result<ObjectID> {
    let token = *writer.writable();
    let finished = writer.done_async().await;
    let object_id = finished.context("Finalizing split stream writer (async)")?;
    self.sync_async().await?;

    let stream_path = Self::format_stream_path(content_identifier);
    self.symlink_impl(&stream_path, &Self::format_object_path(&object_id), &token)?;
    if let Some(name) = reference {
        let reference_path = format!("streams/refs/{name}");
        self.symlink_impl(&reference_path, &stream_path, &token)?;
    }
    Ok(object_id)
}
/// Returns whether `streams/refs/<name>` exists and is itself a symlink.
///
/// The ref entry is inspected with `AT_SYMLINK_NOFOLLOW`. Stat'ing it while following
/// symlinks would report the type of the final target (a regular object file), so the
/// `is_symlink` check could never succeed. A missing entry yields `false`. A directory
/// (nested refs) also yields `false`. A dangling ref symlink still counts as present.
///
/// # Errors
/// Fails if the entry cannot be stat'ed for a reason other than `ENOENT`.
#[context("Checking if named stream '{name}' exists")]
pub fn has_named_stream(&self, name: &str) -> Result<bool> {
    let stream_path = format!("streams/refs/{name}");
    let stat = statat(&self.repository, &stream_path, AtFlags::SYMLINK_NOFOLLOW)
        .filter_errno(Errno::NOENT)
        .with_context(|| format!("Looking for stream '{name}' in repository"))?;
    Ok(stat.is_some_and(|s| FileType::from_raw_mode(s.st_mode).is_symlink()))
}
/// Points `streams/refs/<name>` at the stream `content_identifier`.
///
/// # Errors
/// Fails if the repository is not writable or the symlink cannot be created.
#[context("Naming stream '{content_identifier}' as '{name}'")]
pub fn name_stream(&self, content_identifier: &str, name: &str) -> Result<()> {
    let token = self.ensure_writable_token()?;
    let target = Self::format_stream_path(content_identifier);
    self.symlink_impl(&format!("streams/refs/{name}"), &target, &token)
}
/// Returns stream `content_identifier`, creating it with `callback` if it does not exist.
///
/// If the stream already exists, `callback` is not run and `T::default()` is returned as
/// the extra value. Otherwise a new stream of `content_type` is created, `callback` fills
/// it, and the stream is written with `write_stream`. In both cases, if `reference` is
/// given, `streams/refs/<reference>` ends up pointing at the stream.
///
/// `write_stream` already links `reference` for a newly written stream. The reference is
/// therefore only linked here for a pre-existing stream, which avoids replacing the same
/// symlink twice.
#[context("Ensuring stream '{content_identifier}' exists")]
pub fn ensure_stream<T: Default>(
    self: &Arc<Self>,
    content_identifier: &str,
    content_type: u64,
    callback: impl FnOnce(&mut SplitStreamWriter<ObjectID>) -> Result<T>,
    reference: Option<&str>,
) -> Result<(ObjectID, T)> {
    // Fail early, before running `callback`, if the repository is not writable.
    let writable = self.ensure_writable_token()?;
    if let Some(object_id) = self.has_stream(content_identifier)? {
        if let Some(name) = reference {
            let stream_path = Self::format_stream_path(content_identifier);
            let reference_path = format!("streams/refs/{name}");
            self.symlink_impl(&reference_path, &stream_path, &writable)?;
        }
        return Ok((object_id, T::default()));
    }
    let mut writer = self.create_stream(content_type)?;
    let extra = callback(&mut writer).context("Writing stream content via callback")?;
    // write_stream creates both the stream symlink and, if requested, the reference.
    let object_id = self.write_stream(writer, content_identifier, reference)?;
    Ok((object_id, extra))
}
/// Opens a split stream for reading.
///
/// If `verity` is given, the stream's object is opened directly by that ID, with verity
/// checking, and `content_identifier` is not consulted. Otherwise the
/// `streams/<content_identifier>` symlink is opened. `expected_content_type` is passed
/// through to the reader.
#[context("Opening stream '{content_identifier}'")]
pub fn open_stream(
    &self,
    content_identifier: &str,
    verity: Option<&ObjectID>,
    expected_content_type: Option<u64>,
) -> Result<SplitStreamReader<ObjectID>> {
    let fd = match verity {
        Some(verity_hash) => self
            .open_object(verity_hash)
            .with_context(|| format!("Opening object '{verity_hash:?}'"))?,
        None => {
            let filename = Self::format_stream_path(content_identifier);
            self.openat(&filename, OFlags::RDONLY)
                .with_context(|| format!("Opening ref '{filename}'"))?
        }
    };
    SplitStreamReader::new(File::from(fd), expected_content_type)
}
/// Opens object `id` read-only and verifies its fs-verity digest against `id`.
#[context("Opening object {id:?}")]
pub fn open_object(&self, id: &ObjectID) -> Result<OwnedFd> {
    let path = Self::format_object_path(id);
    self.open_with_verity(&path, id)
}
/// Reads the full contents of object `id` (verity-checked on open) into memory.
#[context("Reading object {id:?} into memory")]
pub fn read_object(&self, id: &ObjectID) -> Result<Vec<u8>> {
    let mut file = File::from(self.open_object(id)?);
    let mut contents = Vec::new();
    file.read_to_end(&mut contents)
        .context("Reading object data")?;
    Ok(contents)
}
/// Reassembles the split stream `content_identifier` and writes its contents to `output`.
///
/// `verity` and `expected_content_type` have the same meaning as for `open_stream`.
#[context("Merging splitstream '{content_identifier}'")]
pub fn merge_splitstream(
    &self,
    content_identifier: &str,
    verity: Option<&ObjectID>,
    expected_content_type: Option<u64>,
    output: &mut impl Write,
) -> Result<()> {
    self.open_stream(content_identifier, verity, expected_content_type)?
        .cat(self, output)
}
/// Stores `data` as an image object and links it as `images/<hex digest>`.
///
/// If `name` is given, `images/refs/<name>` is also pointed at the image.
/// Returns the image's object ID.
#[context("Writing image to repository")]
pub fn write_image(&self, name: Option<&str>, data: &[u8]) -> Result<ObjectID> {
    let token = self.ensure_writable_token()?;
    let image_id = self.ensure_object_impl(data, &token)?;
    let image_path = format!("images/{}", image_id.to_hex());
    self.symlink_impl(&image_path, &Self::format_object_path(&image_id), &token)?;
    if let Some(reference) = name {
        self.symlink_impl(&format!("images/refs/{reference}"), &image_path, &token)?;
    }
    Ok(image_id)
}
/// Reads an entire image from `image` and stores it under the reference `name`.
#[context("Importing image '{name}' from reader")]
pub fn import_image<R: Read>(&self, name: &str, image: &mut R) -> Result<ObjectID> {
    let mut contents = Vec::new();
    image
        .read_to_end(&mut contents)
        .context("Reading image data from input")?;
    self.write_image(Some(name), &contents)
}
#[context("Opening image '{name}'")]
pub fn open_image(&self, name: &str) -> Result<(OwnedFd, bool)> {
let image = self
.openat(&format!("images/{name}"), OFlags::RDONLY)
.with_context(|| format!("Opening ref 'images/{name}'"))?;
if name.contains("/") {
return Ok((image, true));
}
match measure_verity::<ObjectID>(&image) {
Ok(found)
if found
== FsVerityHashValue::from_hex(name)
.context("Parsing expected verity hash from image name")? =>
{
Ok((image, true))
}
Ok(_) => bail!("fs-verity content mismatch"),
Err(MeasureVerityError::VerityMissing | MeasureVerityError::FilesystemNotSupported)
if self.insecure =>
{
Ok((image, false))
}
Err(other) => Err(other).context("Measuring image verity digest")?,
}
}
/// Creates a composefs filesystem mount for image `name`, backed by this repository's
/// objects directory, and returns its file descriptor.
///
/// Whether fs-verity is enabled for the mount is decided by `open_image`.
#[context("Mounting image '{name}'")]
pub fn mount(&self, name: &str) -> Result<OwnedFd> {
    let (image, enable_verity) = self.open_image(name)?;
    let objects = self
        .objects_dir()
        .context("Getting objects directory for mount")?;
    composefs_fsmount(image, name, objects, enable_verity).context("Creating filesystem mount")
}
/// Mounts image `name` and attaches the mount at `mountpoint` (canonicalized first).
#[context("Mounting image '{name}' at path")]
pub fn mount_at(&self, name: &str, mountpoint: impl AsRef<Path>) -> Result<()> {
    let mount_fd = self.mount(name)?;
    let target = canonicalize(mountpoint).context("Canonicalizing mountpoint path")?;
    mount_at(mount_fd, CWD, &target).context("Attaching mount at target path")
}
/// Creates or replaces the symlink `name` pointing at `target`. Both paths are relative
/// to the repository root.
///
/// # Errors
/// Fails if the repository is not writable or the symlink cannot be created.
pub fn symlink(
    &self,
    name: impl AsRef<Path> + std::fmt::Debug,
    target: impl AsRef<Path> + std::fmt::Debug,
) -> anyhow::Result<()> {
    let token = self.ensure_writable_token()?;
    self.symlink_impl(name, target, &token)
}
#[context("Creating symlink from {name:?} to {target:?}")]
pub(crate) fn symlink_impl(
&self,
name: impl AsRef<Path> + std::fmt::Debug,
target: impl AsRef<Path> + std::fmt::Debug,
_writable: &WritableRepo,
) -> anyhow::Result<()> {
let name = name.as_ref();
let mut symlink_components = name.parent().unwrap().components().peekable();
let mut target_components = target.as_ref().components().peekable();
let mut symlink_ancestor = PathBuf::new();
while symlink_components.peek() == target_components.peek() {
symlink_ancestor.push(symlink_components.next().unwrap());
target_components.next().unwrap();
}
let mut relative = PathBuf::new();
for symlink_component in symlink_components {
symlink_ancestor.push(symlink_component);
self.ensure_dir(&symlink_ancestor)?;
relative.push("..");
}
for target_component in target_components {
relative.push(target_component);
}
Ok(replace_symlinkat(&relative, &self.repository, name)?)
}
/// Reads the symlink `name` in `dirfd` and parses its target as an object pathname.
#[context("Reading symlink hash value from {name:?}")]
fn read_symlink_hashvalue(dirfd: &OwnedFd, name: &CStr) -> Result<ObjectID> {
    let target = readlinkat(dirfd, name, []).context("Reading symlink target")?;
    let bytes = target.to_bytes();
    ObjectID::from_object_pathname(bytes).context("Parsing object ID from symlink target")
}
/// Recursively walks directory `fd` and inserts the final path component of every
/// symlink target into `entry_digests`.
///
/// Subdirectories other than `.` and `..` are descended into. Symlinks whose target has
/// no final component are ignored. Any other entry type is an error.
#[context("Walking symlink directory")]
fn walk_symlinkdir(fd: OwnedFd, entry_digests: &mut HashSet<OsString>) -> Result<()> {
    for item in Dir::read_from(&fd).context("Reading directory entries")? {
        let entry = item.context("Reading directory entry")?;
        let filename = entry.file_name();
        match entry.file_type() {
            FileType::Directory if filename == c"." || filename == c".." => {}
            FileType::Directory => {
                let subdir = openat(
                    &fd,
                    filename,
                    OFlags::RDONLY | OFlags::CLOEXEC,
                    Mode::empty(),
                )
                .context("Opening subdirectory for walking")?;
                Self::walk_symlinkdir(subdir, entry_digests)?;
            }
            FileType::Symlink => {
                let target = readlinkat(&fd, filename, []).context("Reading symlink content")?;
                let target_path = Path::new(OsStr::from_bytes(target.as_bytes()));
                if let Some(last) = target_path.file_name() {
                    entry_digests.insert(last.to_os_string());
                }
            }
            _ => bail!("Unexpected file type encountered"),
        }
    }
    Ok(())
}
/// Opens `name` relative to the repository directory with `flags`. `O_CLOEXEC` is
/// always added.
fn openat(&self, name: &str, flags: OFlags) -> ErrnoResult<OwnedFd> {
    let flags = flags | OFlags::CLOEXEC;
    openat(&self.repository, name, flags, Mode::empty())
}
#[context("Walking GC category '{category}'")]
fn gc_category(
&self,
category: &str,
mode: GCCategoryWalkMode,
) -> Result<Vec<(ObjectID, String)>> {
let Some(category_fd) = self
.openat(category, OFlags::RDONLY | OFlags::DIRECTORY)
.filter_errno(Errno::NOENT)
.context(format!("Opening {category} dir in repository"))?
else {
return Ok(Vec::new());
};
let mut entry_digests = HashSet::new();
match mode {
GCCategoryWalkMode::RefsOnly => {
if let Some(refs) = openat(
&category_fd,
"refs",
OFlags::RDONLY | OFlags::DIRECTORY | OFlags::CLOEXEC,
Mode::empty(),
)
.filter_errno(Errno::NOENT)
.context(format!("Opening {category}/refs dir in repository"))?
{
Self::walk_symlinkdir(refs, &mut entry_digests)
.context("Walking refs symlink directory")?;
}
}
GCCategoryWalkMode::AllEntries => {
for item in Dir::read_from(&category_fd).context("Reading category directory")? {
let entry = item.context("Reading category directory entry")?;
let filename = entry.file_name();
if filename != c"refs" && filename != c"." && filename != c".." {
if entry.file_type() != FileType::Symlink {
bail!("category directory contains non-symlink");
}
entry_digests.insert(OsString::from(&OsStr::from_bytes(
entry.file_name().to_bytes(),
)));
}
}
}
}
let objects = entry_digests
.into_iter()
.map(|entry_fn| {
Ok((
Self::read_symlink_hashvalue(
&category_fd,
CString::new(entry_fn.as_bytes())
.context("Creating CString from filename")?
.as_c_str(),
)
.context("Reading symlink hash value")?,
entry_fn
.to_str()
.context("str conversion fails")?
.to_owned(),
))
})
.collect::<Result<_>>()?;
Ok(objects)
}
/// Finds symlinks in directory `fd` whose target does not exist (stat fails with
/// `ENOENT`) and unlinks them unless `dry_run` is set. Returns how many were found.
///
/// With `recursive`, subdirectories are processed too; otherwise they are skipped. Any
/// entry that is neither a directory nor a symlink is an error.
#[context("Cleaning up broken links")]
fn cleanup_broken_links(fd: &OwnedFd, recursive: bool, dry_run: bool) -> Result<u64> {
    let mut removed = 0;
    for item in Dir::read_from(fd).context("Reading directory for broken links cleanup")? {
        let entry = item.context("Reading directory entry for broken links cleanup")?;
        let filename = entry.file_name();
        match entry.file_type() {
            FileType::Directory => {
                if !recursive || filename == c"." || filename == c".." {
                    continue;
                }
                let subdir = openat(
                    fd,
                    filename,
                    OFlags::RDONLY | OFlags::CLOEXEC,
                    Mode::empty(),
                )
                .context("Opening subdirectory for recursive broken link cleanup")?;
                removed += Self::cleanup_broken_links(&subdir, recursive, dry_run)
                    .context("Cleaning up broken links in subdirectory")?;
            }
            FileType::Symlink => {
                let dangling = statat(fd, filename, AtFlags::empty())
                    .filter_errno(Errno::NOENT)
                    .context("Testing for broken links")?
                    .is_none();
                if dangling {
                    removed += 1;
                    if !dry_run {
                        unlinkat(fd, filename, AtFlags::empty())
                            .context("Unlinking broken symlink")?;
                    }
                }
            }
            _ => bail!("Unexpected file type encountered"),
        }
    }
    Ok(removed)
}
/// Prunes broken symlinks in one GC category.
///
/// The top level of `<category>/` is cleaned non-recursively. `<category>/refs` is
/// cleaned recursively. Returns the total number of broken links found. A missing
/// category or refs directory contributes zero.
#[context("Cleaning up broken links in {category} category")]
fn cleanup_gc_category(&self, category: &'static str, dry_run: bool) -> Result<u64> {
    let category_fd = match self
        .openat(category, OFlags::RDONLY | OFlags::DIRECTORY)
        .filter_errno(Errno::NOENT)
        .with_context(|| format!("Opening {category} dir in repository"))?
    {
        Some(fd) => fd,
        None => return Ok(0),
    };
    let top_level = Self::cleanup_broken_links(&category_fd, false, dry_run)
        .with_context(|| format!("Cleaning up broken links in {category}/"))?;
    let refs_fd = openat(
        &category_fd,
        "refs",
        OFlags::RDONLY | OFlags::DIRECTORY | OFlags::CLOEXEC,
        Mode::empty(),
    )
    .filter_errno(Errno::NOENT)
    .with_context(|| format!("Opening {category}/refs to clean up broken links"))?;
    let in_refs = match refs_fd {
        Some(dirfd) => Self::cleanup_broken_links(&dirfd, true, dry_run).with_context(|| {
            format!("Cleaning up broken links recursively in {category}/refs")
        })?,
        None => 0,
    };
    Ok(top_level + in_refs)
}
/// Marks stream `stream_name` and everything it references as live.
///
/// Every object referenced by the stream is added to `objects`. For each named reference
/// in the stream, the referenced object is added too. If that object is itself a stream
/// (found through `stream_name_map`), it is walked recursively. `walked_streams` ensures
/// each stream is visited at most once.
#[context("Walking streams starting from '{stream_name}'")]
fn walk_streams(
    &self,
    stream_name_map: &HashMap<ObjectID, String>,
    stream_name: &str,
    walked_streams: &mut HashSet<String>,
    objects: &mut HashSet<ObjectID>,
) -> Result<()> {
    // `insert` returns false when the stream was already visited.
    if !walked_streams.insert(stream_name.to_owned()) {
        return Ok(());
    }
    let mut split_stream = self
        .open_stream(stream_name, None, None)
        .context("Opening stream for walking")?;
    split_stream
        .get_object_refs(|id| {
            trace!(" with {id:?}");
            objects.insert(id.clone());
        })
        .context("Getting object references from stream")?;
    let named_refs: Vec<_> = split_stream.iter_named_refs().collect();
    for (stream_name_in_table, stream_object_id) in named_refs {
        trace!(
            " named reference stream {stream_name_in_table} lives, with {stream_object_id:?}"
        );
        objects.insert(stream_object_id.clone());
        match stream_name_map.get(stream_object_id) {
            Some(stream_name_in_repo) => self
                .walk_streams(stream_name_map, stream_name_in_repo, walked_streams, objects)
                .context("Walking referenced stream")?,
            None => trace!(
                "broken repo: named reference stream {stream_name_in_table} not found as stream in repo"
            ),
        }
    }
    Ok(())
}
#[context("Collecting objects for image '{name}'")]
pub fn objects_for_image(&self, name: &str) -> Result<HashSet<ObjectID>> {
let (image, _) = self.open_image(name)?;
let mut data = vec![];
std::fs::File::from(image)
.read_to_end(&mut data)
.context("Reading image data")?;
crate::erofs::reader::collect_objects(&data)
.context("Collecting objects from erofs image data")
}
/// Flushes the filesystem containing the repository to disk (`syncfs`).
#[context("Syncing repository to disk")]
pub fn sync(&self) -> Result<()> {
    syncfs(&self.repository).context("Syncing filesystem")
}
/// Runs `sync` on a tokio blocking task so the async executor is not blocked.
#[context("Syncing repository to disk (async)")]
pub async fn sync_async(self: &Arc<Self>) -> Result<()> {
    let repo = Arc::clone(self);
    let joined = tokio::task::spawn_blocking(move || repo.sync()).await;
    joined.context("Spawning blocking sync task")?
}
#[context("Running garbage collection")]
pub fn gc(&self, additional_roots: &[&str]) -> Result<GcResult> {
self.ensure_writable_token()?;
flock(&self.repository, FlockOperation::LockExclusive)
.context("Acquiring exclusive lock for GC")?;
self.gc_impl(additional_roots, false)
}
/// Reports what `gc` would remove without deleting anything.
///
/// Only a shared lock is taken, since nothing is modified.
#[context("Running garbage collection dry run")]
pub fn gc_dry_run(&self, additional_roots: &[&str]) -> Result<GcResult> {
    flock(&self.repository, FlockOperation::LockShared)
        .context("Acquiring shared lock for GC dry run")?;
    let dry_run = true;
    self.gc_impl(additional_roots, dry_run)
}
#[context("GC implementation (dry_run: {dry_run})")]
fn gc_impl(&self, additional_roots: &[&str], dry_run: bool) -> Result<GcResult> {
let mut result = GcResult::default();
let mut live_objects = HashSet::new();
let extra_roots: HashSet<_> = additional_roots.iter().map(|s| s.to_string()).collect();
let all_images = self
.gc_category("images", GCCategoryWalkMode::AllEntries)
.context("Collecting all images")?;
let root_images: Vec<_> = self
.gc_category("images", GCCategoryWalkMode::RefsOnly)
.context("Collecting image refs")?
.into_iter()
.chain(
all_images
.into_iter()
.filter(|(_, name)| extra_roots.contains(name)),
)
.collect();
for ref image in root_images {
trace!("{image:?} lives as an image");
live_objects.insert(image.0.clone());
self.objects_for_image(&image.1)
.with_context(|| format!("Collecting objects for image {}", image.1))?
.iter()
.for_each(|id| {
trace!(" with {id:?}");
live_objects.insert(id.clone());
});
}
let all_streams = self
.gc_category("streams", GCCategoryWalkMode::AllEntries)
.context("Collecting all streams")?;
let stream_name_map: HashMap<_, _> = all_streams.iter().cloned().collect();
let root_streams: Vec<_> = self
.gc_category("streams", GCCategoryWalkMode::RefsOnly)
.context("Collecting stream refs")?
.into_iter()
.chain(
all_streams
.into_iter()
.filter(|(_, name)| extra_roots.contains(name)),
)
.collect();
let mut walked_streams = HashSet::new();
for stream in root_streams {
trace!("{stream:?} lives as a stream");
live_objects.insert(stream.0.clone());
self.walk_streams(
&stream_name_map,
&stream.1,
&mut walked_streams,
&mut live_objects,
)
.with_context(|| format!("Walking stream {}", stream.1))?;
}
for first_byte in 0x0..=0xff {
let dirfd = match self.openat(
&format!("objects/{first_byte:02x}"),
OFlags::RDONLY | OFlags::DIRECTORY,
) {
Ok(fd) => fd,
Err(Errno::NOENT) => continue,
Err(e) => Err(e)?,
};
for item in Dir::read_from(&dirfd)
.with_context(|| format!("Reading objects/{first_byte:02x} directory"))?
{
let entry = item.context("Reading object directory entry")?;
let filename = entry.file_name();
if filename != c"." && filename != c".." {
let id =
ObjectID::from_object_dir_and_basename(first_byte, filename.to_bytes())
.context("Parsing object ID from directory entry")?;
if !live_objects.contains(&id) {
if let Ok(stat) = statat(&dirfd, filename, AtFlags::empty()) {
result.objects_bytes += stat.st_size as u64;
}
result.objects_removed += 1;
debug!(
"{}: objects/{first_byte:02x}/{filename:?}",
if dry_run { "would remove" } else { "removing" },
);
if !dry_run {
unlinkat(&dirfd, filename, AtFlags::empty()).with_context(|| {
format!("Unlinking object {first_byte:02x}/{filename:?}")
})?;
}
} else {
trace!("objects/{first_byte:02x}/{filename:?} lives");
}
}
}
}
result.images_pruned = self
.cleanup_gc_category("images", dry_run)
.context("Cleaning up broken image symlinks")?;
result.streams_pruned = self
.cleanup_gc_category("streams", dry_run)
.context("Cleaning up broken stream symlinks")?;
if !dry_run {
flock(&self.repository, FlockOperation::LockShared)
.context("Downgrading to shared lock after GC")?;
}
Ok(result)
}
/// Checks repository consistency and returns a report of everything found.
///
/// The checks run in this order: metadata, all objects, the `streams` category, and the
/// `images` category. Problems are collected in the returned [`FsckResult`]. Only
/// failures that prevent a check from running are returned as `Err`.
#[context("Running filesystem consistency check")]
pub async fn fsck(&self) -> Result<FsckResult> {
    let mut report = FsckResult::default();
    self.fsck_metadata(&mut report);
    self.fsck_objects(&mut report)
        .await
        .context("Checking objects")?;
    for (category, step) in [("streams", "Checking streams"), ("images", "Checking images")] {
        self.fsck_category(category, &mut report).context(step)?;
    }
    Ok(report)
}
/// Checks the repository metadata file and records any problem in `result`.
///
/// A missing or unreadable metadata file is recorded as `MetadataParseFailed`. If the
/// file exists, `has_metadata` is set. If the file's algorithm is not compatible with
/// `ObjectID`, a `MetadataAlgorithmMismatch` is recorded and a warning is logged.
fn fsck_metadata(&self, result: &mut FsckResult) {
    let meta = match read_repo_metadata(&self.repository) {
        Ok(Some(meta)) => meta,
        Ok(None) => {
            result.errors.push(FsckError::MetadataParseFailed {
                detail: format!(
                    "{REPO_METADATA_FILENAME} not found; \
                     expected because repository was opened successfully"
                ),
            });
            return;
        }
        Err(e) => {
            result.errors.push(FsckError::MetadataParseFailed {
                detail: format!("{e:#}"),
            });
            return;
        }
    };
    result.has_metadata = true;
    if let Err(e) = meta.check_compatible::<ObjectID>() {
        result.errors.push(FsckError::MetadataAlgorithmMismatch {
            expected: meta.algorithm.to_string(),
            actual: ObjectID::ALGORITHM.hash_name().to_string(),
        });
        log::warn!("meta.json algorithm mismatch: {e}");
    }
}
/// Checks every object under `objects/00` … `objects/ff` and merges the per-directory
/// results into `result`.
///
/// Each existing subdirectory is checked by `fsck_object_dir` on a tokio blocking task.
/// At most `available_parallelism()` tasks run at once, falling back to 4 if that is
/// unavailable. Results are merged in completion order, and only after all tasks have
/// finished, so `result` is left untouched if any task fails.
async fn fsck_objects(&self, result: &mut FsckResult) -> Result<()> {
    let limit = available_parallelism().map_or(4, |n| n.get());
    let insecure = self.insecure;
    let mut tasks = tokio::task::JoinSet::new();
    let mut finished = Vec::new();
    for first_byte in 0x00..=0xffu8 {
        // Wait for a slot before spawning another task. The set is non-empty here, so
        // join_next always yields a value.
        while tasks.len() >= limit {
            let joined = tasks.join_next().await.unwrap();
            finished.push(joined??);
        }
        let dirfd = match self.openat(
            &format!("objects/{first_byte:02x}"),
            OFlags::RDONLY | OFlags::DIRECTORY,
        ) {
            Ok(fd) => fd,
            Err(Errno::NOENT) => continue,
            Err(e) => {
                return Err(e)
                    .with_context(|| format!("Opening objects/{first_byte:02x} directory"));
            }
        };
        tasks.spawn_blocking(move || fsck_object_dir::<ObjectID>(dirfd, first_byte, insecure));
    }
    while let Some(joined) = tasks.join_next().await {
        finished.push(joined??);
    }
    for partial in finished {
        result.objects_checked += partial.objects_checked;
        result.objects_corrupted += partial.objects_corrupted;
        result.errors.extend(partial.errors);
    }
    Ok(())
}
/// Checks every top-level entry of `category` (`streams` or `images`) and then its
/// `refs` tree.
///
/// Each entry other than `.`, `..` and `refs` is counted as checked. It must be a symlink
/// and must resolve. A resolving entry is then checked with `fsck_splitstream` (streams)
/// or `fsck_image` (images). Problems are recorded in `result`. A missing category or
/// `refs` directory is not an error.
#[context("Checking {category} integrity")]
fn fsck_category(&self, category: &str, result: &mut FsckResult) -> Result<()> {
    // Bumps the corrupted counter that matches the category being checked.
    fn mark_corrupted(result: &mut FsckResult, is_streams: bool) {
        if is_streams {
            result.streams_corrupted += 1;
        } else {
            result.images_corrupted += 1;
        }
    }

    let is_streams = category == "streams";
    let category_fd = match self
        .openat(category, OFlags::RDONLY | OFlags::DIRECTORY)
        .filter_errno(Errno::NOENT)
        .with_context(|| format!("Opening {category} directory"))?
    {
        Some(fd) => fd,
        None => return Ok(()),
    };

    for item in
        Dir::read_from(&category_fd).with_context(|| format!("Reading {category} directory"))?
    {
        let entry = item.context("Reading directory entry")?;
        let filename = entry.file_name();
        if filename == c"." || filename == c".." || filename == c"refs" {
            continue;
        }
        if is_streams {
            result.streams_checked += 1;
        } else {
            result.images_checked += 1;
        }

        let name = String::from_utf8_lossy(filename.to_bytes()).to_string();
        let path = format!("{category}/{name}");
        if entry.file_type() != FileType::Symlink {
            mark_corrupted(result, is_streams);
            result.errors.push(FsckError::EntryNotSymlink { path });
            continue;
        }
        match statat(&category_fd, filename, AtFlags::empty()) {
            Ok(_) => {}
            Err(Errno::NOENT) => {
                result.broken_links += 1;
                mark_corrupted(result, is_streams);
                result.errors.push(FsckError::BrokenSymlink { path });
                continue;
            }
            Err(e) => {
                result.errors.push(FsckError::StatFailed {
                    path,
                    detail: e.to_string(),
                });
                continue;
            }
        }

        if is_streams {
            self.fsck_splitstream(&name, result);
        } else {
            self.fsck_image(&name, result);
        }
    }

    let refs_fd = openat(
        &category_fd,
        c"refs",
        OFlags::RDONLY | OFlags::DIRECTORY | OFlags::CLOEXEC,
        Mode::empty(),
    )
    .filter_errno(Errno::NOENT)
    .with_context(|| format!("Opening {category}/refs directory"))?;
    let Some(refs_fd) = refs_fd else {
        return Ok(());
    };
    self.fsck_refs_dir(&refs_fd, category, "", result)
        .with_context(|| format!("Checking {category}/refs"))
}
/// Recursively checks a `refs` directory of `category`. `prefix` is the path below
/// `<category>/refs`, used only for reporting.
///
/// Subdirectories are descended into. Symlinks must resolve: a dangling one counts as a
/// broken link, and other stat failures are recorded as `StatFailed`. Any other entry
/// type is recorded as `UnexpectedFileType`. Only failures to read or open directories
/// are returned as `Err`.
fn fsck_refs_dir(
    &self,
    refs_fd: &OwnedFd,
    category: &str,
    prefix: &str,
    result: &mut FsckResult,
) -> Result<()> {
    for item in Dir::read_from(refs_fd)
        .with_context(|| format!("Reading {category}/refs/{prefix} directory"))?
    {
        let entry = item.context("Reading refs directory entry")?;
        let filename = entry.file_name();
        if filename == c"." || filename == c".." {
            continue;
        }
        let name = String::from_utf8_lossy(filename.to_bytes()).to_string();
        // Path of this entry relative to `<category>/refs`.
        let relative = if prefix.is_empty() {
            name
        } else {
            format!("{prefix}/{name}")
        };
        let display_path = format!("{category}/refs/{relative}");

        match entry.file_type() {
            FileType::Directory => {
                let subdir = openat(
                    refs_fd,
                    filename,
                    OFlags::RDONLY | OFlags::DIRECTORY | OFlags::CLOEXEC,
                    Mode::empty(),
                )
                .with_context(|| format!("Opening {display_path}"))?;
                self.fsck_refs_dir(&subdir, category, &relative, result)?;
            }
            FileType::Symlink => match statat(refs_fd, filename, AtFlags::empty()) {
                Ok(_) => {}
                Err(Errno::NOENT) => {
                    result.broken_links += 1;
                    result
                        .errors
                        .push(FsckError::BrokenSymlink { path: display_path });
                }
                Err(e) => result.errors.push(FsckError::StatFailed {
                    path: display_path,
                    detail: e.to_string(),
                }),
            },
            other => result.errors.push(FsckError::UnexpectedFileType {
                path: display_path,
                detail: format!("{other:?}"),
            }),
        }
    }
    Ok(())
}
/// Checks one stream. The stream must open and parse, and every object it references
/// (directly or through a named ref) must exist in `objects/`.
///
/// Problems are recorded in `result` rather than returned. Error details from opening
/// or reading the stream are rendered with `{:#}` so the whole anyhow context chain is
/// kept. Plain `to_string()` shows only the outermost context (e.g. "Opening stream
/// '…'") and drops the actual cause. This matches how `fsck_metadata` reports errors.
fn fsck_splitstream(&self, stream_name: &str, result: &mut FsckResult) {
    let stream_path = format!("streams/{stream_name}");
    let mut split_stream = match self.open_stream(stream_name, None, None) {
        Ok(s) => s,
        Err(e) => {
            result.streams_corrupted += 1;
            result.errors.push(FsckError::StreamOpenFailed {
                path: stream_path,
                detail: format!("{e:#}"),
            });
            return;
        }
    };
    let check_result = split_stream.get_object_refs(|id| {
        let obj_path = Self::format_object_path(id);
        match self.openat(&obj_path, OFlags::RDONLY) {
            Ok(_) => {}
            Err(Errno::NOENT) => {
                result.missing_objects += 1;
                result.errors.push(FsckError::MissingObjectRef {
                    path: stream_path.clone(),
                    object_id: id.to_hex(),
                });
            }
            Err(e) => {
                result.errors.push(FsckError::ObjectCheckFailed {
                    path: stream_path.clone(),
                    object_id: id.to_hex(),
                    detail: e.to_string(),
                });
            }
        }
    });
    if let Err(e) = check_result {
        result.streams_corrupted += 1;
        result.errors.push(FsckError::StreamReadFailed {
            path: stream_path,
            detail: format!("{e:#}"),
        });
        return;
    }
    for (ref_name, ref_id) in split_stream.iter_named_refs() {
        let obj_path = Self::format_object_path(ref_id);
        match self.openat(&obj_path, OFlags::RDONLY) {
            Ok(_) => {}
            Err(Errno::NOENT) => {
                result.missing_objects += 1;
                result.errors.push(FsckError::MissingNamedRef {
                    path: stream_path.clone(),
                    ref_name: ref_name.to_string(),
                    object_id: ref_id.to_hex(),
                });
            }
            Err(e) => {
                result.errors.push(FsckError::ObjectCheckFailed {
                    path: stream_path.clone(),
                    object_id: ref_id.to_hex(),
                    detail: format!("checking named ref '{ref_name}': {e}"),
                });
            }
        }
    }
}
/// Checks one image. The image must open, be readable, and parse as EROFS, and every
/// object it references must exist in `objects/`.
///
/// Problems are recorded in `result` rather than returned.
fn fsck_image(&self, image_name: &str, result: &mut FsckResult) {
    let image_path = format!("images/{image_name}");
    let fd = match self.openat(&image_path, OFlags::RDONLY) {
        Ok(fd) => fd,
        Err(e) => {
            result.images_corrupted += 1;
            result.errors.push(FsckError::ImageOpenFailed {
                path: image_path,
                detail: e.to_string(),
            });
            return;
        }
    };

    let mut data = Vec::new();
    let read_outcome = File::from(fd).read_to_end(&mut data);
    if let Err(e) = read_outcome {
        result.images_corrupted += 1;
        result.errors.push(FsckError::ImageReadFailed {
            path: image_path,
            detail: e.to_string(),
        });
        return;
    }

    let referenced = match crate::erofs::reader::collect_objects::<ObjectID>(&data) {
        Ok(objects) => objects,
        Err(e) => {
            result.images_corrupted += 1;
            result.errors.push(FsckError::ImageInvalid {
                path: image_path,
                detail: e.to_string(),
            });
            return;
        }
    };

    for obj_id in &referenced {
        match self.openat(&Self::format_object_path(obj_id), OFlags::RDONLY) {
            Ok(_) => {}
            Err(Errno::NOENT) => {
                result.missing_objects += 1;
                result.errors.push(FsckError::ImageMissingObject {
                    path: image_path.clone(),
                    object_id: obj_id.to_hex(),
                });
            }
            Err(e) => result.errors.push(FsckError::ObjectCheckFailed {
                path: image_path.clone(),
                object_id: obj_id.to_hex(),
                detail: e.to_string(),
            }),
        }
    }
}
/// Borrows the file descriptor of the repository directory.
pub fn repo_fd(&self) -> BorrowedFd<'_> {
    AsFd::as_fd(&self.repository)
}
pub fn metadata(&self) -> &RepoMetadata {
&self.metadata
}
pub fn list_stream_refs(&self, prefix: &str) -> Result<Vec<(String, String)>> {
let ref_path = format!("streams/refs/{prefix}");
let dir_fd = match self.openat(&ref_path, OFlags::RDONLY | OFlags::DIRECTORY) {
Ok(fd) => fd,
Err(Errno::NOENT) => return Ok(Vec::new()),
Err(e) => return Err(e.into()),
};
let mut refs = Vec::new();
for item in Dir::read_from(&dir_fd)? {
let entry = item?;
let name_bytes = entry.file_name().to_bytes();
if name_bytes == b"." || name_bytes == b".." {
continue;
}
let name = match std::str::from_utf8(name_bytes) {
Ok(s) => s.to_string(),
Err(_) => continue,
};
if let Ok(target) = readlinkat(&dir_fd, name_bytes, vec![])
&& let Ok(target_str) = target.into_string()
{
refs.push((name, target_str));
}
}
Ok(refs)
}
}
/// Verifies every object stored in one `objects/XX` fan-out directory.
///
/// `first_byte` is the directory's two-hex-digit prefix. Each entry other than `.` and `..`
/// counts toward `objects_checked`. Its file name must decode into an object ID via
/// `from_object_dir_and_basename`, it must open read-only, and its measured digest (see
/// `fsck_measure_object`) must equal that ID. Each failure bumps `objects_corrupted` and
/// records one error.
///
/// # Errors
///
/// Only failures to read the directory itself, or one of its entries, are returned; everything
/// else is accumulated in the returned [`FsckResult`].
fn fsck_object_dir<ObjectID: FsVerityHashValue>(
    dirfd: OwnedFd,
    first_byte: u8,
    insecure: bool,
) -> Result<FsckResult> {
    let mut result = FsckResult::default();
    let entries = Dir::read_from(&dirfd)
        .with_context(|| format!("Reading objects/{first_byte:02x} directory"))?;

    for item in entries {
        let entry = item.context("Reading object directory entry")?;
        let basename = entry.file_name();
        let is_dot_entry = basename == c"." || basename == c"..";
        if is_dot_entry {
            continue;
        }
        result.objects_checked += 1;

        // The entry's location as listed on disk; built only when an error is reported.
        let listed_path = || {
            format!(
                "objects/{first_byte:02x}/{}",
                String::from_utf8_lossy(basename.to_bytes())
            )
        };

        let expected_id =
            match ObjectID::from_object_dir_and_basename(first_byte, basename.to_bytes()) {
                Err(e) => {
                    result.objects_corrupted += 1;
                    result.errors.push(FsckError::ObjectInvalidName {
                        path: listed_path(),
                        detail: e.to_string(),
                    });
                    continue;
                }
                Ok(id) => id,
            };

        let fd = match openat(
            &dirfd,
            basename,
            OFlags::RDONLY | OFlags::CLOEXEC,
            Mode::empty(),
        ) {
            Err(e) => {
                result.objects_corrupted += 1;
                result.errors.push(FsckError::ObjectOpenFailed {
                    path: listed_path(),
                    detail: e.to_string(),
                });
                continue;
            }
            Ok(fd) => fd,
        };

        // `None` means the helper already recorded why no digest could be obtained.
        let measured =
            match fsck_measure_object::<ObjectID>(fd, &expected_id, insecure, &mut result) {
                Some(digest) => digest,
                None => continue,
            };

        if measured == expected_id {
            continue;
        }
        result.objects_corrupted += 1;
        result.errors.push(FsckError::ObjectDigestMismatch {
            path: format!("objects/{}", expected_id.to_object_pathname()),
            measured: measured.to_hex(),
        });
    }
    Ok(result)
}
/// Obtains the fs-verity digest of an opened object file, or records why it could not.
///
/// First tries `measure_verity` on `fd`.
/// - If that fails and `insecure` is set, the digest is instead computed by reading the file
///   contents through `Repository::compute_verity_digest`. A failure there records
///   `ObjectVerityFailed`.
/// - If that fails and `insecure` is not set, `ObjectVerityMissing` is recorded.
///
/// Either recorded error also bumps `objects_corrupted` and makes the function return `None`.
/// Error paths are derived from `expected_id`.
///
/// NOTE(review): every `measure_verity` error, not only "verity not enabled", lands in
/// the fallback / `ObjectVerityMissing` path — confirm this is intended.
fn fsck_measure_object<ObjectID: FsVerityHashValue>(
    fd: OwnedFd,
    expected_id: &ObjectID,
    insecure: bool,
    result: &mut FsckResult,
) -> Option<ObjectID> {
    if let Ok(digest) = measure_verity::<ObjectID>(&fd) {
        return Some(digest);
    }

    let object_path = || format!("objects/{}", expected_id.to_object_pathname());

    if !insecure {
        result.objects_corrupted += 1;
        result.errors.push(FsckError::ObjectVerityMissing {
            path: object_path(),
        });
        return None;
    }

    // Insecure mode: hash the contents ourselves instead of trusting the kernel's measurement.
    let mut reader = std::io::BufReader::new(File::from(fd));
    match Repository::<ObjectID>::compute_verity_digest(&mut reader) {
        Ok(digest) => Some(digest),
        Err(e) => {
            result.objects_corrupted += 1;
            result.errors.push(FsckError::ObjectVerityFailed {
                path: object_path(),
                detail: e.to_string(),
            });
            None
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::fsverity::{Sha256HashValue, Sha512HashValue};
use crate::test::tempdir;
use rustix::fs::{CWD, statat};
use tempfile::TempDir;
fn create_test_repo(path: &Path) -> Result<Arc<Repository<Sha512HashValue>>> {
let (repo, _) = Repository::init_path(CWD, path, Algorithm::SHA512, false)?;
Ok(Arc::new(repo))
}
fn generate_test_data(size: u64, seed: u8) -> Vec<u8> {
(0..size)
.map(|i| ((i as u8).wrapping_add(seed)).wrapping_mul(17))
.collect()
}
fn read_links_in_repo<P>(tmp: &TempDir, repo_sub_path: P) -> Result<Option<PathBuf>>
where
P: AsRef<Path>,
{
let full_path = tmp.path().join("repo").join(repo_sub_path);
match readlinkat(CWD, &full_path, Vec::new()) {
Ok(result) => Ok(Some(PathBuf::from(result.to_str()?))),
Err(rustix::io::Errno::NOENT) => Ok(None),
Err(e) => Err(e.into()),
}
}
fn test_path_exists_in_repo<P>(tmp: &TempDir, repo_sub_path: P) -> Result<bool>
where
P: AsRef<Path>,
{
let full_path = tmp.path().join("repo").join(repo_sub_path);
match statat(CWD, &full_path, AtFlags::SYMLINK_NOFOLLOW) {
Ok(_) => Ok(true),
Err(rustix::io::Errno::NOENT) => Ok(false),
Err(e) => Err(e.into()),
}
}
fn test_object_exists(tmp: &TempDir, obj: &Sha512HashValue) -> Result<bool> {
let digest = obj.to_hex();
let (first_two, remainder) = digest.split_at(2);
test_path_exists_in_repo(tmp, &format!("objects/{first_two}/{remainder}"))
}
#[test]
fn test_gc_removes_one_stream() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj1 = generate_test_data(32 * 1024, 0xAE);
let obj2 = generate_test_data(64 * 1024, 0xEA);
let obj1_id = repo.ensure_object(&obj1)?;
let obj2_id: Sha512HashValue = compute_verity(&obj2);
let mut writer = repo.create_stream(0)?;
writer.write_external(&obj2)?;
let _stream_id = repo.write_stream(writer, "test-stream", None)?;
repo.sync()?;
assert!(test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
let result = repo.gc(&[])?;
assert!(!test_object_exists(&tmp, &obj1_id)?);
assert!(!test_object_exists(&tmp, &obj2_id)?);
assert!(!test_path_exists_in_repo(&tmp, "streams/test-stream")?);
assert_eq!(result.objects_removed, 3);
assert!(result.objects_bytes > 0);
assert_eq!(result.streams_pruned, 1);
assert_eq!(result.images_pruned, 0);
Ok(())
}
#[test]
fn test_gc_keeps_one_stream() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj1 = generate_test_data(32 * 1024, 0xAE);
let obj2 = generate_test_data(64 * 1024, 0xEA);
let obj1_id = repo.ensure_object(&obj1)?;
let obj2_id: Sha512HashValue = compute_verity(&obj2);
let mut writer = repo.create_stream(0)?;
writer.write_external(&obj2)?;
let _stream_id = repo.write_stream(writer, "test-stream", None)?;
repo.sync()?;
assert!(test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
let result = repo.gc(&["test-stream"])?;
assert!(!test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert_eq!(result.objects_removed, 1);
assert!(result.objects_bytes > 0);
assert_eq!(result.streams_pruned, 0);
assert_eq!(result.images_pruned, 0);
Ok(())
}
#[test]
fn test_gc_keeps_one_stream_from_refs() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj1 = generate_test_data(32 * 1024, 0xAE);
let obj2 = generate_test_data(64 * 1024, 0xEA);
let obj1_id = repo.ensure_object(&obj1)?;
let obj2_id: Sha512HashValue = compute_verity(&obj2);
let mut writer = repo.create_stream(0)?;
writer.write_external(&obj2)?;
let _stream_id = repo.write_stream(writer, "test-stream", Some("ref-name"))?;
repo.sync()?;
assert!(test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
let result = repo.gc(&[])?;
assert!(!test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert_eq!(result.objects_removed, 1);
assert!(result.objects_bytes > 0);
assert_eq!(result.streams_pruned, 0);
assert_eq!(result.images_pruned, 0);
Ok(())
}
#[test]
fn test_gc_keeps_one_stream_from_two_overlapped() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj1 = generate_test_data(32 * 1024, 0xAE);
let obj2 = generate_test_data(64 * 1024, 0xEA);
let obj3 = generate_test_data(64 * 1024, 0xAA);
let obj4 = generate_test_data(64 * 1024, 0xEE);
let obj1_id = repo.ensure_object(&obj1)?;
let obj2_id: Sha512HashValue = compute_verity(&obj2);
let obj3_id: Sha512HashValue = compute_verity(&obj3);
let obj4_id: Sha512HashValue = compute_verity(&obj4);
let mut writer1 = repo.create_stream(0)?;
writer1.write_external(&obj2)?;
writer1.write_external(&obj3)?;
let _stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
let mut writer2 = repo.create_stream(0)?;
writer2.write_external(&obj2)?;
writer2.write_external(&obj4)?;
let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
repo.sync()?;
assert!(test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_object_exists(&tmp, &obj3_id)?);
assert!(test_object_exists(&tmp, &obj4_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
let result = repo.gc(&["test-stream1"])?;
assert!(!test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_object_exists(&tmp, &obj3_id)?);
assert!(!test_object_exists(&tmp, &obj4_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(!test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
assert_eq!(result.objects_removed, 3);
assert!(result.objects_bytes > 0);
assert_eq!(result.streams_pruned, 1);
assert_eq!(result.images_pruned, 0);
Ok(())
}
#[test]
fn test_gc_keeps_named_references() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj1 = generate_test_data(32 * 1024, 0xAE);
let obj2 = generate_test_data(64 * 1024, 0xEA);
let obj1_id = repo.ensure_object(&obj1)?;
let obj2_id: Sha512HashValue = compute_verity(&obj2);
let mut writer1 = repo.create_stream(0)?;
writer1.write_external(&obj2)?;
let stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
let mut writer2 = repo.create_stream(0)?;
writer2.add_named_stream_ref("test-stream1", &stream1_id);
let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
repo.sync()?;
assert!(test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
let result = repo.gc(&["test-stream2"])?;
assert!(!test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert_eq!(result.objects_removed, 1);
assert!(result.objects_bytes > 0);
assert_eq!(result.streams_pruned, 0);
assert_eq!(result.images_pruned, 0);
Ok(())
}
#[test]
fn test_gc_keeps_named_references_with_different_table_name() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj1 = generate_test_data(32 * 1024, 0xAE);
let obj2 = generate_test_data(64 * 1024, 0xEA);
let obj1_id = repo.ensure_object(&obj1)?;
let obj2_id: Sha512HashValue = compute_verity(&obj2);
let mut writer1 = repo.create_stream(0)?;
writer1.write_external(&obj2)?;
let stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
let mut writer2 = repo.create_stream(0)?;
writer2.add_named_stream_ref("different-table-name-for-test-stream1", &stream1_id);
let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
repo.sync()?;
assert!(test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
let result = repo.gc(&["test-stream2"])?;
assert!(!test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert_eq!(result.objects_removed, 1);
assert!(result.objects_bytes > 0);
assert_eq!(result.streams_pruned, 0);
assert_eq!(result.images_pruned, 0);
Ok(())
}
#[test]
fn test_gc_keeps_one_named_reference_from_two_overlapped() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj1 = generate_test_data(32 * 1024, 0xAE);
let obj2 = generate_test_data(64 * 1024, 0xEA);
let obj3 = generate_test_data(64 * 1024, 0xAA);
let obj4 = generate_test_data(64 * 1024, 0xEE);
let obj1_id = repo.ensure_object(&obj1)?;
let obj2_id: Sha512HashValue = compute_verity(&obj2);
let obj3_id: Sha512HashValue = compute_verity(&obj3);
let obj4_id: Sha512HashValue = compute_verity(&obj4);
let mut writer = repo.create_stream(0)?;
writer.write_external(&obj2)?;
let stream1_id = repo.write_stream(writer, "test-stream1", None)?;
let mut writer = repo.create_stream(0)?;
writer.write_external(&obj3)?;
let stream2_id = repo.write_stream(writer, "test-stream2", None)?;
let mut writer = repo.create_stream(0)?;
writer.write_external(&obj4)?;
let stream3_id = repo.write_stream(writer, "test-stream3", None)?;
let mut writer = repo.create_stream(0)?;
writer.add_named_stream_ref("test-stream1", &stream1_id);
writer.add_named_stream_ref("test-stream2", &stream2_id);
let _ref_stream1_id = repo.write_stream(writer, "ref-stream1", None)?;
let mut writer = repo.create_stream(0)?;
writer.add_named_stream_ref("test-stream1", &stream1_id);
writer.add_named_stream_ref("test-stream3", &stream3_id);
let _ref_stream2_id = repo.write_stream(writer, "ref-stream2", None)?;
repo.sync()?;
assert!(test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_object_exists(&tmp, &obj3_id)?);
assert!(test_object_exists(&tmp, &obj4_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream3")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream3")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(test_path_exists_in_repo(&tmp, "streams/ref-stream1")?);
let link_target =
read_links_in_repo(&tmp, "streams/ref-stream1")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(test_path_exists_in_repo(&tmp, "streams/ref-stream2")?);
let link_target =
read_links_in_repo(&tmp, "streams/ref-stream2")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
let result = repo.gc(&["ref-stream1"])?;
assert!(!test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_object_exists(&tmp, &obj3_id)?);
assert!(!test_object_exists(&tmp, &obj4_id)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
let link_target =
read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(!test_path_exists_in_repo(&tmp, "streams/test-stream3")?);
assert!(test_path_exists_in_repo(&tmp, "streams/ref-stream1")?);
let link_target =
read_links_in_repo(&tmp, "streams/ref-stream1")?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("streams").join(&link_target)
)?);
assert!(!test_path_exists_in_repo(&tmp, "streams/ref-stream2")?);
assert_eq!(result.objects_removed, 4);
assert!(result.objects_bytes > 0);
assert_eq!(result.streams_pruned, 2);
assert_eq!(result.images_pruned, 0);
Ok(())
}
use crate::tree::{FileSystem, Inode, LeafContent, RegularFile, Stat};
fn test_root_stat() -> Stat {
Stat {
st_mode: 0o755,
st_uid: 0,
st_gid: 0,
st_mtim_sec: 0,
xattrs: Default::default(),
}
}
fn make_test_fs(obj: &Sha512HashValue, size: u64) -> FileSystem<Sha512HashValue> {
let mut fs: FileSystem<Sha512HashValue> = FileSystem::new(test_root_stat());
let leaf_id = fs.push_leaf(
Stat {
st_mode: 0o644,
st_uid: 0,
st_gid: 0,
st_mtim_sec: 0,
xattrs: Default::default(),
},
LeafContent::Regular(RegularFile::External(obj.clone(), size)),
);
let inode = Inode::leaf(leaf_id);
fs.root.insert(OsStr::new("data"), inode);
fs
}
#[test]
fn test_gc_removes_one_image() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj1_size: u64 = 32 * 1024;
let obj1 = generate_test_data(obj1_size, 0xAE);
let obj2_size: u64 = 64 * 1024;
let obj2 = generate_test_data(obj2_size, 0xEA);
let obj1_id = repo.ensure_object(&obj1)?;
let obj2_id = repo.ensure_object(&obj2)?;
let fs = make_test_fs(&obj2_id, obj2_size);
let image1 = fs.commit_image(&repo, None)?;
let image1_path = format!("images/{}", image1.to_hex());
repo.sync()?;
assert!(test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("images").join(&link_target)
)?);
let result = repo.gc(&[])?;
assert!(!test_object_exists(&tmp, &obj1_id)?);
assert!(!test_object_exists(&tmp, &obj2_id)?);
assert!(!test_path_exists_in_repo(&tmp, &image1_path)?);
assert_eq!(result.objects_removed, 3);
assert!(result.objects_bytes > 0);
assert_eq!(result.images_pruned, 1);
assert_eq!(result.streams_pruned, 0);
Ok(())
}
#[test]
fn test_gc_keeps_one_image() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj1_size: u64 = 32 * 1024;
let obj1 = generate_test_data(obj1_size, 0xAE);
let obj2_size: u64 = 64 * 1024;
let obj2 = generate_test_data(obj2_size, 0xEA);
let obj1_id = repo.ensure_object(&obj1)?;
let obj2_id = repo.ensure_object(&obj2)?;
let fs = make_test_fs(&obj2_id, obj2_size);
let image1 = fs.commit_image(&repo, None)?;
let image1_path = format!("images/{}", image1.to_hex());
repo.sync()?;
assert!(test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("images").join(&link_target)
)?);
let image1_hex = image1.to_hex();
let result = repo.gc(&[image1_hex.as_str()])?;
assert!(!test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("images").join(&link_target)
)?);
assert_eq!(result.objects_removed, 1);
assert!(result.objects_bytes > 0);
assert_eq!(result.images_pruned, 0);
assert_eq!(result.streams_pruned, 0);
Ok(())
}
#[test]
fn test_gc_keeps_one_image_from_refs() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj1_size: u64 = 32 * 1024;
let obj1 = generate_test_data(obj1_size, 0xAE);
let obj2_size: u64 = 64 * 1024;
let obj2 = generate_test_data(obj2_size, 0xEA);
let obj1_id = repo.ensure_object(&obj1)?;
let obj2_id = repo.ensure_object(&obj2)?;
let fs = make_test_fs(&obj2_id, obj2_size);
let image1 = fs.commit_image(&repo, Some("ref-name"))?;
let image1_path = format!("images/{}", image1.to_hex());
repo.sync()?;
assert!(test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("images").join(&link_target)
)?);
let result = repo.gc(&[])?;
assert!(!test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("images").join(&link_target)
)?);
assert_eq!(result.objects_removed, 1);
assert!(result.objects_bytes > 0);
assert_eq!(result.images_pruned, 0);
assert_eq!(result.streams_pruned, 0);
Ok(())
}
fn make_test_fs_with_two_files(
obj1: &Sha512HashValue,
size1: u64,
obj2: &Sha512HashValue,
size2: u64,
) -> FileSystem<Sha512HashValue> {
let mut fs = make_test_fs(obj1, size1);
let leaf_id = fs.push_leaf(
Stat {
st_mode: 0o644,
st_uid: 0,
st_gid: 0,
st_mtim_sec: 0,
xattrs: Default::default(),
},
LeafContent::Regular(RegularFile::External(obj2.clone(), size2)),
);
let inode = Inode::leaf(leaf_id);
fs.root.insert(OsStr::new("extra_data"), inode);
fs
}
#[test]
fn test_gc_keeps_one_image_from_two_overlapped() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj1_size: u64 = 32 * 1024;
let obj1 = generate_test_data(obj1_size, 0xAE);
let obj2_size: u64 = 64 * 1024;
let obj2 = generate_test_data(obj2_size, 0xEA);
let obj3_size: u64 = 64 * 1024;
let obj3 = generate_test_data(obj2_size, 0xAA);
let obj4_size: u64 = 64 * 1024;
let obj4 = generate_test_data(obj2_size, 0xEE);
let obj1_id = repo.ensure_object(&obj1)?;
let obj2_id = repo.ensure_object(&obj2)?;
let obj3_id = repo.ensure_object(&obj3)?;
let obj4_id = repo.ensure_object(&obj4)?;
let fs = make_test_fs_with_two_files(&obj2_id, obj2_size, &obj3_id, obj3_size);
let image1 = fs.commit_image(&repo, None)?;
let image1_path = format!("images/{}", image1.to_hex());
let fs = make_test_fs_with_two_files(&obj2_id, obj2_size, &obj4_id, obj4_size);
let image2 = fs.commit_image(&repo, None)?;
let image2_path = format!("images/{}", image2.to_hex());
repo.sync()?;
assert!(test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_object_exists(&tmp, &obj3_id)?);
assert!(test_object_exists(&tmp, &obj4_id)?);
assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("images").join(&link_target)
)?);
assert!(test_path_exists_in_repo(&tmp, &image2_path)?);
let link_target = read_links_in_repo(&tmp, &image2_path)?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("images").join(&link_target)
)?);
let image1_hex = image1.to_hex();
let result = repo.gc(&[image1_hex.as_str()])?;
assert!(!test_object_exists(&tmp, &obj1_id)?);
assert!(test_object_exists(&tmp, &obj2_id)?);
assert!(test_object_exists(&tmp, &obj3_id)?);
assert!(!test_object_exists(&tmp, &obj4_id)?);
assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
assert!(test_path_exists_in_repo(
&tmp,
PathBuf::from("images").join(&link_target)
)?);
assert!(!test_path_exists_in_repo(&tmp, &image2_path)?);
assert_eq!(result.objects_removed, 3);
assert!(result.objects_bytes > 0);
assert_eq!(result.images_pruned, 1);
assert_eq!(result.streams_pruned, 0);
Ok(())
}
#[test]
fn test_ensure_object_from_file() -> Result<()> {
use std::io::{Seek, SeekFrom, Write};
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let mut ctx = ImportContext::default();
let test_data = generate_test_data(64 * 1024, 0xBE);
let mut temp_file = crate::test::tempfile();
temp_file.write_all(&test_data)?;
temp_file.seek(SeekFrom::Start(0))?;
let (object_id, method) =
repo.ensure_object_from_file(&temp_file, test_data.len() as u64, &mut ctx)?;
assert_ne!(method, ObjectStoreMethod::AlreadyPresent);
assert!(test_object_exists(&tmp, &object_id)?);
let stored_data = repo.read_object(&object_id)?;
assert_eq!(stored_data, test_data);
temp_file.seek(SeekFrom::Start(0))?;
let (object_id_2, method_2) =
repo.ensure_object_from_file(&temp_file, test_data.len() as u64, &mut ctx)?;
assert_eq!(object_id, object_id_2);
assert_eq!(method_2, ObjectStoreMethod::AlreadyPresent);
Ok(())
}
#[tokio::test]
async fn test_fsck_empty_repo() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let result = repo.fsck().await?;
assert!(result.is_ok());
assert_eq!(result.objects_checked, 0);
assert_eq!(result.objects_corrupted, 0);
assert_eq!(result.streams_checked, 0);
assert_eq!(result.streams_corrupted, 0);
assert_eq!(result.images_checked, 0);
assert_eq!(result.images_corrupted, 0);
assert_eq!(result.broken_links, 0);
assert_eq!(result.missing_objects, 0);
assert!(result.errors.is_empty());
Ok(())
}
#[tokio::test]
async fn test_fsck_healthy_repo_with_objects() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj1 = generate_test_data(32 * 1024, 0xAE);
let obj2 = generate_test_data(64 * 1024, 0xEA);
let _obj1_id = repo.ensure_object(&obj1)?;
let _obj2_id: Sha512HashValue = compute_verity(&obj2);
let mut writer = repo.create_stream(0)?;
writer.write_external(&obj2)?;
let _stream_id = repo.write_stream(writer, "test-stream", None)?;
repo.sync()?;
let result = repo.fsck().await?;
assert!(result.is_ok(), "fsck should pass: {result}");
assert!(result.objects_checked >= 3);
assert_eq!(result.objects_corrupted, 0);
assert_eq!(result.streams_checked, 1);
assert_eq!(result.streams_corrupted, 0);
assert_eq!(result.broken_links, 0);
assert_eq!(result.missing_objects, 0);
assert!(result.errors.is_empty());
Ok(())
}
#[tokio::test]
async fn test_fsck_detects_corrupted_object() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj = generate_test_data(32 * 1024, 0xAE);
let obj_id = repo.ensure_object(&obj)?;
repo.sync()?;
let hex = obj_id.to_hex();
let (dir, file) = hex.split_at(2);
let obj_path = tmp
.path()
.join("repo")
.join(format!("objects/{dir}/{file}"));
std::fs::remove_file(&obj_path)?;
std::fs::write(&obj_path, b"corrupted data")?;
let result = repo.fsck().await?;
assert!(!result.is_ok(), "fsck should detect corruption");
assert!(
result.objects_corrupted > 0,
"should report corrupted objects"
);
assert!(
result
.errors
.iter()
.any(|e| e.to_string().contains("object-digest-mismatch")),
"errors should mention digest mismatch: {:?}",
result.errors
);
Ok(())
}
#[tokio::test]
async fn test_fsck_detects_broken_stream_link() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj = generate_test_data(64 * 1024, 0xEA);
let _obj_verity: Sha512HashValue = compute_verity(&obj);
let mut writer = repo.create_stream(0)?;
writer.write_external(&obj)?;
let _stream_id = repo.write_stream(writer, "test-stream", None)?;
repo.sync()?;
let stream_symlink = tmp.path().join("repo/streams/test-stream");
let link_target = std::fs::read_link(&stream_symlink)?;
let backing_path = tmp.path().join("repo/streams").join(&link_target);
std::fs::remove_file(&backing_path)?;
let result = repo.fsck().await?;
assert!(!result.is_ok(), "fsck should detect broken link");
assert!(
result.broken_links > 0,
"should report broken links: {result}"
);
Ok(())
}
#[tokio::test]
async fn test_fsck_detects_missing_stream_object_ref() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
let obj = generate_test_data(64 * 1024, 0xEA);
let obj_verity: Sha512HashValue = compute_verity(&obj);
let mut writer = repo.create_stream(0)?;
writer.write_external(&obj)?;
let _stream_id = repo.write_stream(writer, "test-stream", None)?;
repo.sync()?;
let hex = obj_verity.to_hex();
let (dir, file) = hex.split_at(2);
let obj_path = tmp
.path()
.join("repo")
.join(format!("objects/{dir}/{file}"));
std::fs::remove_file(&obj_path)?;
let result = repo.fsck().await?;
assert!(!result.is_ok(), "fsck should detect missing object ref");
assert!(
result.missing_objects > 0,
"should report missing objects: {result}"
);
assert!(
result
.errors
.iter()
.any(|e| e.to_string().contains("missing-object-ref")),
"errors should mention missing object: {:?}",
result.errors
);
Ok(())
}
fn open_test_repo_dir(tmp: &tempfile::TempDir) -> cap_std::fs::Dir {
cap_std::fs::Dir::open_ambient_dir(tmp.path().join("repo"), cap_std::ambient_authority())
.unwrap()
}
#[tokio::test]
async fn test_fsck_detects_non_symlink_in_streams() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
repo.sync()?;
let dir = open_test_repo_dir(&tmp);
dir.create_dir_all("streams")?;
dir.write("streams/bogus-entry", b"not a symlink")?;
let result = repo.fsck().await?;
assert!(!result.is_ok(), "fsck should detect non-symlink in streams");
assert_eq!(result.streams_corrupted, 1);
assert!(
result
.errors
.iter()
.any(|e| e.to_string().contains("entry-not-symlink")),
"errors should mention non-symlink: {:?}",
result.errors
);
Ok(())
}
#[tokio::test]
async fn test_fsck_detects_non_symlink_in_images() -> Result<()> {
let tmp = tempdir();
let repo = create_test_repo(&tmp.path().join("repo"))?;
repo.sync()?;
let dir = open_test_repo_dir(&tmp);
dir.create_dir_all("images")?;
dir.write("images/bogus-image", b"not a symlink")?;
let result = repo.fsck().await?;
assert!(!result.is_ok(), "fsck should detect non-symlink in images");
assert_eq!(result.images_corrupted, 1);
assert!(
result
.errors
.iter()
.any(|e| e.to_string().contains("entry-not-symlink")),
"errors should mention non-symlink: {:?}",
result.errors
);
Ok(())
}
#[tokio::test]
async fn test_fsck_detects_broken_ref_symlink() -> Result<()> {
    let tmp = tempdir();
    let repo = create_test_repo(&tmp.path().join("repo"))?;
    repo.sync()?;

    // Create a ref pointing at `streams/nonexistent-stream`, which was never
    // written, so the symlink dangles.
    let repo_dir = open_test_repo_dir(&tmp);
    repo_dir.create_dir_all("streams/refs")?;
    repo_dir.symlink("../nonexistent-stream", "streams/refs/broken-ref")?;

    let report = repo.fsck().await?;
    assert!(!report.is_ok(), "fsck should detect broken ref symlink");
    assert!(report.broken_links > 0, "should report broken links");
    // Render each error once; both markers must appear in the same message.
    let mentions_broken_ref = report.errors.iter().any(|err| {
        let text = err.to_string();
        text.contains("broken-symlink") && text.contains("refs")
    });
    assert!(
        mentions_broken_ref,
        "errors should mention broken ref symlink: {:?}",
        report.errors
    );
    Ok(())
}
#[tokio::test]
async fn test_fsck_refs_dir_unexpected_file_type() -> Result<()> {
    let tmp = tempdir();
    let repo = create_test_repo(&tmp.path().join("repo"))?;
    repo.sync()?;

    // fsck should report a plain regular file under `streams/refs/` as an
    // unexpected file type.
    let repo_dir = open_test_repo_dir(&tmp);
    repo_dir.create_dir_all("streams/refs")?;
    repo_dir.write("streams/refs/stray-file", b"should not be here")?;

    let report = repo.fsck().await?;
    assert!(!report.is_ok(), "fsck should detect unexpected file type");
    let found = report
        .errors
        .iter()
        .map(|err| err.to_string())
        .any(|text| text.contains("unexpected-file-type"));
    assert!(
        found,
        "errors should mention unexpected file type: {:?}",
        report.errors
    );
    Ok(())
}
#[tokio::test]
async fn test_fsck_refs_dir_recursive() -> Result<()> {
    let tmp = tempdir();
    let repo = create_test_repo(&tmp.path().join("repo"))?;
    repo.sync()?;

    // Bury a dangling ref two directories deep to check that fsck descends
    // into subdirectories of `streams/refs/`. The target resolves to
    // `streams/nonexistent-stream`, which does not exist.
    let repo_dir = open_test_repo_dir(&tmp);
    repo_dir.create_dir_all("streams/refs/nested/deep")?;
    repo_dir.symlink(
        "../../../nonexistent-stream",
        "streams/refs/nested/deep/broken-nested-ref",
    )?;

    let report = repo.fsck().await?;
    assert!(
        !report.is_ok(),
        "fsck should detect broken symlink in nested refs"
    );
    assert!(report.broken_links > 0);
    let names_nested_path = report.errors.iter().any(|err| {
        let text = err.to_string();
        text.contains("nested/deep") && text.contains("broken-symlink")
    });
    assert!(
        names_nested_path,
        "error should reference the nested path: {:?}",
        report.errors
    );
    Ok(())
}
#[tokio::test]
async fn test_fsck_detects_invalid_object_filename() -> Result<()> {
    let tmp = tempdir();
    let repo = create_test_repo(&tmp.path().join("repo"))?;
    repo.sync()?;

    // Objects live at `objects/<2 hex chars>/<rest of hex digest>`; this
    // file name is not a hex digest.
    let repo_dir = open_test_repo_dir(&tmp);
    repo_dir.create_dir_all("objects/ab")?;
    repo_dir.write("objects/ab/not-a-valid-hex-hash", b"junk")?;

    let report = repo.fsck().await?;
    assert!(
        !report.is_ok(),
        "fsck should detect invalid object filename"
    );
    assert!(report.objects_corrupted > 0);
    let flagged = report
        .errors
        .iter()
        .any(|err| err.to_string().contains("object-invalid-name"));
    assert!(
        flagged,
        "errors should mention invalid filename: {:?}",
        report.errors
    );
    Ok(())
}
#[tokio::test]
async fn test_fsck_detects_broken_image_symlink() -> Result<()> {
    let tmp = tempdir();
    let repo = create_test_repo(&tmp.path().join("repo"))?;

    // Commit a single-object image so that `images/<image id>` exists.
    let obj_size: u64 = 32 * 1024;
    let payload = generate_test_data(obj_size, 0xBB);
    let obj_id = repo.ensure_object(&payload)?;
    let tree = make_test_fs(&obj_id, obj_size);
    let image_id = tree.commit_image(&repo, None)?;
    repo.sync()?;

    // Delete whatever the image symlink points at, leaving it dangling.
    // The link target is interpreted relative to `images/`.
    let repo_dir = open_test_repo_dir(&tmp);
    let link_target = repo_dir.read_link(format!("images/{}", image_id.to_hex()))?;
    let backing = PathBuf::from("images").join(link_target);
    repo_dir.remove_file(&backing)?;

    let result = repo.fsck().await?;
    assert!(
        !result.is_ok(),
        "fsck should detect broken image symlink: {result}"
    );
    assert!(result.broken_links > 0);
    assert!(result.images_corrupted > 0);
    Ok(())
}
#[tokio::test]
async fn test_fsck_detects_missing_named_ref_object() -> Result<()> {
    let tmp = tempdir();
    let repo = create_test_repo(&tmp.path().join("repo"))?;
    let payload = generate_test_data(64 * 1024, 0xEA);

    // stream1 carries the external payload; stream2 refers to stream1 by name.
    let mut first = repo.create_stream(0)?;
    first.write_external(&payload)?;
    let stream1_id = repo.write_stream(first, "test-stream1", None)?;

    let mut second = repo.create_stream(0)?;
    second.add_named_stream_ref("test-stream1", &stream1_id);
    let _stream2_id = repo.write_stream(second, "test-stream2", None)?;
    repo.sync()?;

    // Remove the object backing stream1 so stream2's named ref has no target.
    let stream1_hex = stream1_id.to_hex();
    let (prefix, rest) = stream1_hex.split_at(2);
    let repo_dir = open_test_repo_dir(&tmp);
    repo_dir.remove_file(format!("objects/{prefix}/{rest}"))?;

    let result = repo.fsck().await?;
    assert!(
        !result.is_ok(),
        "fsck should detect missing named ref object"
    );
    assert!(
        result.missing_objects > 0,
        "should report missing objects: {result}"
    );
    let mentions_named_ref = result
        .errors
        .iter()
        .map(|err| err.to_string())
        .any(|text| text.contains("missing-named-ref"));
    assert!(
        mentions_named_ref,
        "errors should mention missing named ref object: {:?}",
        result.errors
    );
    Ok(())
}
#[tokio::test]
async fn test_fsck_healthy_repo_with_refs() -> Result<()> {
    let tmp = tempdir();
    let repo = create_test_repo(&tmp.path().join("repo"))?;

    // Writing a stream together with a ref name ("my-ref") must leave the
    // repository in a state fsck accepts.
    let payload = generate_test_data(64 * 1024, 0xEA);
    let mut stream_writer = repo.create_stream(0)?;
    stream_writer.write_external(&payload)?;
    let _stream_id = repo.write_stream(stream_writer, "test-stream", Some("my-ref"))?;
    repo.sync()?;

    let result = repo.fsck().await?;
    assert!(result.is_ok(), "fsck should pass with valid refs: {result}");
    assert!(result.errors.is_empty());
    Ok(())
}
#[tokio::test]
async fn test_fsck_detects_corrupted_splitstream_object() -> Result<()> {
    let tmp = tempdir();
    let repo = create_test_repo(&tmp.path().join("repo"))?;
    let payload = generate_test_data(64 * 1024, 0xEA);
    let mut stream_writer = repo.create_stream(0)?;
    stream_writer.write_external(&payload)?;
    let _stream_id = repo.write_stream(stream_writer, "test-stream", None)?;
    repo.sync()?;

    // Replace the object behind `streams/test-stream` with bytes that are not
    // a splitstream. The file is removed and recreated rather than edited in
    // place (NOTE(review): presumably because the stored object may be
    // read-only; confirm).
    let repo_dir = open_test_repo_dir(&tmp);
    let link_target = repo_dir.read_link("streams/test-stream")?;
    let backing = PathBuf::from("streams").join(link_target);
    repo_dir.remove_file(&backing)?;
    repo_dir.write(&backing, b"corrupted splitstream header")?;

    let result = repo.fsck().await?;
    assert!(
        !result.is_ok(),
        "fsck should detect corrupted splitstream: {result}"
    );
    let any_corruption = result.objects_corrupted > 0 || result.streams_corrupted > 0;
    assert!(any_corruption, "should report corruption: {result}");
    Ok(())
}
#[tokio::test]
async fn test_fsck_validates_erofs_image_objects() -> Result<()> {
    let tmp = tempdir();
    let repo = create_test_repo(&tmp.path().join("repo"))?;

    // Commit an EROFS image that references a single external object.
    let obj_size: u64 = 32 * 1024;
    let payload = generate_test_data(obj_size, 0xCC);
    let obj_id = repo.ensure_object(&payload)?;
    let tree = make_test_fs(&obj_id, obj_size);
    let image_id = tree.commit_image(&repo, None)?;
    repo.sync()?;

    // Baseline: the intact repository passes, and the image was inspected.
    let result = repo.fsck().await?;
    assert!(result.is_ok(), "healthy image should pass fsck: {result}");
    assert!(result.images_checked > 0, "should have checked the image");

    // Delete the object the image references; fsck should now attribute the
    // missing object to that image.
    let obj_hex = obj_id.to_hex();
    let (prefix, rest) = obj_hex.split_at(2);
    let repo_dir = open_test_repo_dir(&tmp);
    repo_dir.remove_file(format!("objects/{prefix}/{rest}"))?;

    let result = repo.fsck().await?;
    assert!(
        !result.is_ok(),
        "fsck should detect missing object referenced by erofs image: {result}"
    );
    assert!(
        result.missing_objects > 0,
        "should report missing objects: {result}"
    );
    // Hoisted out of the closure: the image id does not change per error.
    let image_hex = image_id.to_hex();
    let blames_image = result.errors.iter().any(|err| {
        let text = err.to_string();
        text.contains(&image_hex) && text.contains("image-missing-object")
    });
    assert!(
        blames_image,
        "error should reference the image: {:?}",
        result.errors
    );
    Ok(())
}
#[tokio::test]
async fn test_fsck_detects_corrupt_erofs_image() -> Result<()> {
    let tmp = tempdir();
    let repo = create_test_repo(&tmp.path().join("repo"))?;
    let obj_size: u64 = 32 * 1024;
    let payload = generate_test_data(obj_size, 0xDD);
    let obj_id = repo.ensure_object(&payload)?;
    let tree = make_test_fs(&obj_id, obj_size);
    let image_id = tree.commit_image(&repo, None)?;
    repo.sync()?;

    // Overwrite the image's own object with bytes that are not EROFS.
    let image_hex = image_id.to_hex();
    let (prefix, rest) = image_hex.split_at(2);
    let object_path = format!("objects/{prefix}/{rest}");
    let repo_dir = open_test_repo_dir(&tmp);
    repo_dir.remove_file(&object_path)?;
    repo_dir.write(&object_path, b"this is not a valid erofs image")?;

    let result = repo.fsck().await?;
    assert!(
        !result.is_ok(),
        "fsck should detect corrupt erofs image: {result}"
    );
    // Either the content check or the digest check may fire first.
    let reports_corruption = result.errors.iter().any(|err| {
        let text = err.to_string();
        text.contains("image-invalid") || text.contains("digest mismatch")
    });
    assert!(
        reports_corruption,
        "error should mention erofs corruption or digest mismatch: {:?}",
        result.errors
    );
    Ok(())
}
#[tokio::test]
async fn test_fsck_valid_metadata() -> Result<()> {
    let tmp = tempdir();
    let repo = create_test_repo(&tmp.path().join("repo"))?;

    // A freshly created repository's meta.json should be found and accepted,
    // and the Display output should say so.
    let result = repo.fsck().await?;
    assert!(result.is_ok());
    assert!(result.has_metadata());
    assert!(result.errors().is_empty());
    let rendered = result.to_string();
    assert!(
        rendered.contains("meta.json: ok"),
        "display should show ok: {result}"
    );
    Ok(())
}
#[tokio::test]
async fn test_fsck_corrupt_metadata() -> Result<()> {
    let tmp = tempdir();
    let repo = create_test_repo(&tmp.path().join("repo"))?;

    // Swap meta.json for unparseable JSON after the repository is open.
    let repo_dir = open_test_repo_dir(&tmp);
    repo_dir.remove_file(REPO_METADATA_FILENAME)?;
    repo_dir.write(REPO_METADATA_FILENAME, b"not valid json {{")?;

    let result = repo.fsck().await?;
    assert!(!result.is_ok());
    let parse_failed = result
        .errors()
        .iter()
        .any(|err| matches!(err, FsckError::MetadataParseFailed { .. }));
    assert!(parse_failed);
    let rendered = result.to_string();
    assert!(
        rendered.contains("meta.json: error"),
        "display should show error: {result}"
    );
    Ok(())
}
#[test]
fn test_open_path_requires_metadata() {
    // An empty directory (no meta.json, no objects/) must be rejected as
    // uninitialized rather than treated as an old-format repository.
    let tmp = tempdir();
    let bare = tmp.path().join("bare-repo");
    mkdirat(CWD, &bare, Mode::from_raw_mode(0o755)).unwrap();
    let opened = Repository::<Sha512HashValue>::open_path(CWD, &bare);
    assert!(matches!(opened, Err(RepositoryOpenError::MetadataMissing)));
}
#[test]
fn test_open_path_detects_old_format() {
    // A directory containing `objects/` but no meta.json is reported as an
    // old-format repository.
    let tmp = tempdir();
    let path = tmp.path().join("old-repo");
    let dir_mode = Mode::from_raw_mode(0o755);
    mkdirat(CWD, &path, dir_mode).unwrap();
    mkdirat(CWD, path.join("objects"), dir_mode).unwrap();
    let opened = Repository::<Sha512HashValue>::open_path(CWD, &path);
    assert!(matches!(
        opened,
        Err(RepositoryOpenError::OldFormatRepository)
    ));
}
#[test]
fn test_open_path_algorithm_mismatch() {
    // Initialise with SHA-512, then try to open the same path as SHA-256.
    let tmp = tempdir();
    let path = tmp.path().join("sha512-repo");
    Repository::<Sha512HashValue>::init_path(CWD, &path, Algorithm::SHA512, false).unwrap();
    let opened = Repository::<Sha256HashValue>::open_path(CWD, &path);
    assert!(matches!(
        opened,
        Err(RepositoryOpenError::AlgorithmMismatch { .. })
    ));
}
#[test]
fn test_metadata_json_with_features() {
    let mut meta = RepoMetadata::for_hash::<Sha512HashValue>();
    let features = &mut meta.features;
    features.compatible.push(String::from("some-compat"));
    features
        .read_only_compatible
        .push(String::from("some-rocompat"));

    // Feature lists appear in the JSON; the read-only list uses the
    // hyphenated key `read-only-compatible`.
    let json = meta.to_json().unwrap();
    let value: serde_json::Value = serde_json::from_slice(&json).unwrap();
    assert_eq!(value["features"]["compatible"][0], "some-compat");
    assert_eq!(
        value["features"]["read-only-compatible"][0],
        "some-rocompat"
    );

    // The serialized form round-trips back to an equal value.
    let reparsed = RepoMetadata::from_json(&json).unwrap();
    assert_eq!(meta, reparsed);
}
#[test]
fn test_feature_flags_unknown_incompat() {
    // An incompatible feature this build does not know makes
    // `check_compatible` fail, and the error names the feature.
    let mut meta = RepoMetadata::for_hash::<Sha512HashValue>();
    meta.features
        .incompatible
        .push(String::from("fancy-new-thing"));
    let err = meta.check_compatible::<Sha512HashValue>().unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("fancy-new-thing"),
        "error should name the unknown feature: {err}"
    );
}
#[test]
fn test_feature_flags_unknown_ro_compat() {
    // An unknown read-only-compatible feature yields `ReadOnly`, carrying
    // the names of the unrecognised features.
    let mut meta = RepoMetadata::for_hash::<Sha512HashValue>();
    meta.features
        .read_only_compatible
        .push(String::from("new-index"));
    let expected = FeatureCheck::ReadOnly(vec![String::from("new-index")]);
    assert_eq!(
        meta.check_compatible::<Sha512HashValue>().unwrap(),
        expected
    );
}
#[test]
fn test_feature_flags_unknown_compat_ignored() {
    // An unknown *compatible* feature imposes no restriction at all.
    let mut meta = RepoMetadata::for_hash::<Sha512HashValue>();
    meta.features
        .compatible
        .push(String::from("optional-hint"));
    let check = meta.check_compatible::<Sha512HashValue>().unwrap();
    assert_eq!(check, FeatureCheck::ReadWrite);
}
#[test]
fn test_object_store_method_variants() {
    let methods = [
        ObjectStoreMethod::Reflinked,
        ObjectStoreMethod::Hardlinked,
        ObjectStoreMethod::Copied,
        ObjectStoreMethod::AlreadyPresent,
    ];
    // Every variant equals itself and differs from every other variant.
    for (row, lhs) in methods.iter().enumerate() {
        for (col, rhs) in methods.iter().enumerate() {
            if row != col {
                assert_ne!(lhs, rhs);
            } else {
                assert_eq!(lhs, rhs);
            }
        }
    }
    // The derived Debug output is just the variant name.
    assert_eq!(format!("{:?}", ObjectStoreMethod::Hardlinked), "Hardlinked");
}
#[test]
fn test_open_upgrade_sha256() {
    let tmp = tempdir();
    let repo_path = tmp.path().join("repo");
    let meta_path = repo_path.join(REPO_METADATA_FILENAME);

    // Start from a current-format repository holding one object, then delete
    // meta.json so that `open_path` sees an old-format repository.
    let (repo, _) =
        Repository::<Sha256HashValue>::init_path(CWD, &repo_path, Algorithm::SHA256, false)
            .unwrap();
    let data = b"hello world";
    let obj_id = repo.ensure_object(data).unwrap();
    drop(repo);
    std::fs::remove_file(&meta_path).unwrap();
    assert!(matches!(
        Repository::<Sha256HashValue>::open_path(CWD, &repo_path),
        Err(RepositoryOpenError::OldFormatRepository)
    ));

    // The first upgrade reports that it ran and writes meta.json with an
    // algorithm compatible with SHA-256.
    let (repo, upgraded) =
        Repository::<Sha256HashValue>::open_upgrade(CWD, &repo_path).unwrap();
    assert!(upgraded);
    assert!(meta_path.exists());
    let repo_fd = openat(
        CWD,
        &repo_path,
        OFlags::RDONLY | OFlags::CLOEXEC,
        Mode::empty(),
    )
    .unwrap();
    let meta = read_repo_metadata(&repo_fd).unwrap().unwrap();
    drop(repo_fd);
    assert!(meta.algorithm.is_compatible::<Sha256HashValue>());

    // Objects written before the upgrade remain readable.
    let read_data = repo.read_object(&obj_id).unwrap();
    assert_eq!(&read_data[..], data);
    drop(repo);

    // Upgrading an already-upgraded repository does nothing.
    let (_repo, upgraded) =
        Repository::<Sha256HashValue>::open_upgrade(CWD, &repo_path).unwrap();
    assert!(!upgraded);
}
#[test]
fn test_open_upgrade_sha512() {
    let tmp = tempdir();
    let repo_path = tmp.path().join("repo");

    // Same scenario as the SHA-256 variant: populate, strip meta.json, upgrade.
    let (repo, _) =
        Repository::<Sha512HashValue>::init_path(CWD, &repo_path, Algorithm::SHA512, false)
            .unwrap();
    let data = b"sha512 test data";
    let obj_id = repo.ensure_object(data).unwrap();
    drop(repo);
    std::fs::remove_file(repo_path.join(REPO_METADATA_FILENAME)).unwrap();

    let (repo, upgraded) =
        Repository::<Sha512HashValue>::open_upgrade(CWD, &repo_path).unwrap();
    assert!(upgraded);
    let repo_fd = openat(
        CWD,
        &repo_path,
        OFlags::RDONLY | OFlags::CLOEXEC,
        Mode::empty(),
    )
    .unwrap();
    let meta = read_repo_metadata(&repo_fd).unwrap().unwrap();
    drop(repo_fd);
    assert!(meta.algorithm.is_compatible::<Sha512HashValue>());

    // The pre-existing object is still readable after the upgrade.
    let read_data = repo.read_object(&obj_id).unwrap();
    assert_eq!(&read_data[..], data);
}
#[test]
fn test_open_upgrade_algorithm_mismatch() {
    let tmp = tempdir();
    let repo_path = tmp.path().join("repo");

    // Build a SHA-512 repository with one object; the handle is dropped at
    // the end of this scope.
    {
        let (repo, _) =
            Repository::<Sha512HashValue>::init_path(CWD, &repo_path, Algorithm::SHA512, false)
                .unwrap();
        repo.ensure_object(b"some data").unwrap();
    }
    std::fs::remove_file(repo_path.join(REPO_METADATA_FILENAME)).unwrap();

    // Upgrading it as a SHA-256 repository must be refused.
    let err = Repository::<Sha256HashValue>::open_upgrade(CWD, &repo_path).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("not compatible"),
        "expected algorithm mismatch error, got: {msg}"
    );
}
#[test]
fn test_open_upgrade_empty_objects() {
    // No meta.json and an empty `objects/` directory: the upgrade fails with
    // "no objects found" (presumably because there is nothing to infer the
    // hash algorithm from).
    let tmp = tempdir();
    let repo_path = tmp.path().join("repo");
    let dir_mode = Mode::from_raw_mode(0o755);
    mkdirat(CWD, &repo_path, dir_mode).unwrap();
    mkdirat(CWD, repo_path.join("objects"), dir_mode).unwrap();

    let err = Repository::<Sha256HashValue>::open_upgrade(CWD, &repo_path).unwrap_err();
    let msg = format!("{err:#}");
    assert!(
        msg.contains("no objects found"),
        "expected 'no objects found' error, got: {msg}"
    );
}
#[test]
fn test_open_upgrade_already_initialized() {
    // `open_upgrade` on a repository that already has meta.json reports that
    // no upgrade was performed.
    let tmp = tempdir();
    let repo_path = tmp.path().join("repo");
    Repository::<Sha256HashValue>::init_path(CWD, &repo_path, Algorithm::SHA256, false)
        .unwrap();
    let (_repo, did_upgrade) =
        Repository::<Sha256HashValue>::open_upgrade(CWD, &repo_path).unwrap();
    assert!(!did_upgrade);
}
}