Skip to main content

composefs/
repository.rs

1//! Content-addressable repository for composefs objects.
2//!
3//! This module provides a repository abstraction for storing and retrieving
4//! content-addressed objects, splitstreams, and images with fs-verity
5//! verification and garbage collection support.
6//!
7//! # Repository Layout
8//!
9//! A composefs repository is a directory with the following structure:
10//!
11//! ```text
12//! repository/
13//! ├── objects/                  # Content-addressed object storage
14//! │   ├── 4e/                   # First byte of fs-verity hash (hex)
15//! │   │   └── 67eaccd9fd...     # Remaining bytes of hash
16//! │   └── ...
17//! ├── images/                   # Composefs (erofs) image tracking
18//! │   ├── 4e67eaccd9fd... → ../objects/4e/67eaccd9fd...
19//! │   └── refs/
20//! │       └── myimage → ../../4e67eaccd9fd...
21//! └── streams/                  # Splitstream storage
22//!     ├── oci-config-sha256:... → ../objects/XX/YYY...
23//!     ├── oci-layer-sha256:... → ../objects/XX/YYY...
24//!     └── refs/                 # Named references (GC roots)
25//!         └── mytarball → ../../oci-layer-sha256:...
26//! ```
27//!
28//! # Object Storage
29//!
30//! All content is stored in `objects/` using fs-verity hashes as filenames,
31//! split into 256 subdirectories (`00`-`ff`) by the first byte for filesystem
32//! efficiency. Objects are immutable and deduplicated by content. Every file
33//! must have fs-verity enabled (except in "insecure" mode).
34//!
35//! # Images vs Streams
36//!
37//! The repository distinguishes between two types of derived content:
38//!
39//! - **Images** (`images/`): Composefs/erofs filesystem images that can be mounted.
40//!   These are tracked separately for security: only images produced by the repository
41//!   (via mkcomposefs) should be mounted, to avoid exposing the kernel's filesystem
42//!   code to untrusted data.
43//!
44//! - **Streams** (`streams/`): Splitstreams storing arbitrary data (e.g., OCI
45//!   image layers and configs). Symlinks map content identifiers to objects.
46//!
47//! # References (GC Roots)
48//!
49//! Both `images/refs/` and `streams/refs/` contain named symlinks that serve as
50//! garbage collection roots. Any object reachable from a ref is protected from GC.
51//! Refs can be organized hierarchically (e.g., `refs/myapp/layer1`).
52//!
53//! See [`Repository::name_stream`] for creating stream refs.
54//!
55//! # Garbage Collection
56//!
57//! The repository supports garbage collection via [`Repository::gc()`]. Objects
58//! not reachable from any reference are deleted. The GC algorithm:
59//!
60//! 1. Walks all references in `images/refs/` and `streams/refs/` to find roots
61//! 2. Transitively follows stream references to find all reachable objects
62//! 3. Deletes unreferenced objects, images, and streams
63//!
64//! # fs-verity Integration
65//!
66//! When running on a filesystem that supports fs-verity (ext4, btrfs, etc.), objects
67//! are stored with fs-verity enabled, providing kernel-level integrity verification.
68//! In "insecure" mode, fs-verity is not required, allowing operation on filesystems
69//! like tmpfs or overlayfs.
70//!
71//! # Concurrency
72//!
73//! The repository uses advisory file locking (flock) to coordinate concurrent access.
74//! Opening a repository acquires a shared lock, while garbage collection requires
75//! an exclusive lock. This ensures GC cannot run while other processes have the
76//! repository open.
77//!
78//! For more details, see the [repository design documentation](../../../doc/repository.md).
79
80use std::{
81    collections::{HashMap, HashSet},
82    ffi::{CStr, CString, OsStr, OsString},
83    fmt,
84    fs::{File, canonicalize},
85    io::{BufRead, Read, Write},
86    os::{
87        fd::{AsFd, BorrowedFd, OwnedFd},
88        unix::ffi::OsStrExt,
89    },
90    path::{Path, PathBuf},
91    sync::Arc,
92    thread::available_parallelism,
93};
94
95use log::{debug, trace};
96use tokio::sync::Semaphore;
97
98use anyhow::{Context, Result, bail, ensure};
99use fn_error_context::context;
100use once_cell::sync::OnceCell;
101use rustix::{
102    fs::{
103        Access, AtFlags, CWD, Dir, FileType, FlockOperation, Mode, OFlags, StatVfsMountFlags,
104        accessat, flock, fstatvfs, linkat, mkdirat, openat, readlinkat, statat, syncfs, unlinkat,
105    },
106    io::{Errno, Result as ErrnoResult},
107};
108
109use crate::{
110    erofs::format::{FormatConfig, FormatVersion},
111    fsverity::{
112        Algorithm, CompareVerityError, DEFAULT_LG_BLOCKSIZE, EnableVerityError, FsVerityHashValue,
113        FsVerityHasher, MeasureVerityError, compute_verity, enable_verity_maybe_copy,
114        ensure_verity_equal, has_verity, measure_verity, measure_verity_opt,
115    },
116    mount::{MountOptions, composefs_fsmount, mount_at},
117    shared_internals::IO_BUF_CAPACITY,
118    splitstream::{SplitStreamReader, SplitStreamWriter},
119    util::{ErrnoFilter, proc_self_fd, reopen_tmpfile_ro, replace_symlinkat},
120};
121
/// The filename used for repository metadata.
///
/// The file lives at the repository root and holds a serialized
/// [`RepoMetadata`].  A missing file corresponds to
/// [`RepositoryOpenError::MetadataMissing`] or, for repositories that
/// predate it, [`RepositoryOpenError::OldFormatRepository`].
pub const REPO_METADATA_FILENAME: &str = "meta.json";
124
/// Error marker indicating a named image/ref does not exist in the repository.
///
/// Its `Display` output is `image not found: <name>`.
#[derive(Debug, thiserror::Error)]
#[error("image not found: {name}")]
pub struct ImageNotFound {
    /// The name/ref that was not found.
    pub name: String,
}
132
/// Errors that can occur when opening a repository.
///
/// Problems with `meta.json` (missing, unparsable, or incompatible) each have
/// a dedicated variant.  Low-level failures are wrapped in
/// [`RepositoryOpenError::Io`], which can be built from either a
/// [`std::io::Error`] or a rustix [`Errno`] via `From`.
#[derive(Debug, thiserror::Error)]
pub enum RepositoryOpenError {
    /// `meta.json` is missing and the directory does not appear to be
    /// an existing repository.
    #[error(
        "{REPO_METADATA_FILENAME} not found; this repository must be initialized with `cfsctl init`"
    )]
    MetadataMissing,
    /// `meta.json` is missing but `objects/` exists, indicating an
    /// old-format repository that predates `meta.json`.
    #[error(
        "{REPO_METADATA_FILENAME} not found; this appears to be an old-format repository — use Repository::open_upgrade() or `cfsctl init` to migrate"
    )]
    OldFormatRepository,
    /// `meta.json` exists but could not be parsed.
    ///
    /// The underlying `serde_json` error is exposed as the error source.
    #[error("failed to parse {REPO_METADATA_FILENAME}")]
    MetadataInvalid(#[source] serde_json::Error),
    /// The algorithm in `meta.json` does not match the expected type.
    #[error("repository algorithm {found} does not match expected {expected}")]
    AlgorithmMismatch {
        /// The algorithm found in `meta.json`.
        found: Algorithm,
        /// The algorithm expected for this repository type.
        expected: Algorithm,
    },
    /// The repository format version is newer than this tool supports,
    /// i.e. greater than [`REPO_FORMAT_VERSION`].
    #[error(
        "unsupported repository format version {found} (this tool supports up to {REPO_FORMAT_VERSION})"
    )]
    UnsupportedVersion {
        /// The version found in `meta.json`.
        found: u32,
    },
    /// The repository requires features this tool does not understand.
    ///
    /// The payload lists the unrecognized incompatible feature names.
    #[error("repository requires unknown incompatible features: {0:?}")]
    IncompatibleFeatures(Vec<String>),
    /// An I/O error occurred while opening or probing the repository.
    ///
    /// `#[error(transparent)]` forwards `Display` and `source` to the
    /// wrapped error.
    #[error(transparent)]
    Io(std::io::Error),
}
174
175impl From<Errno> for RepositoryOpenError {
176    fn from(e: Errno) -> Self {
177        Self::Io(e.into())
178    }
179}
180
181impl From<std::io::Error> for RepositoryOpenError {
182    fn from(e: std::io::Error) -> Self {
183        Self::Io(e)
184    }
185}
186
/// The current repository format version.
///
/// This is a simple integer that is bumped only for fundamental,
/// incompatible changes to the repository layout.  Finer-grained
/// evolution uses the [`FeatureFlags`] system instead.
///
/// Newly built [`RepoMetadata`] is stamped with this value, and
/// [`RepoMetadata::check_compatible`] rejects any repository whose
/// `version` is greater than it.
pub const REPO_FORMAT_VERSION: u32 = 1;
193
/// Set of feature flags understood by this version of the code.
///
/// When reading a repository whose metadata lists features not in
/// these sets, the rules are:
///
/// - Unknown **compatible** features are silently ignored.
/// - Unknown **read-only compatible** features allow read operations
///   but prevent any writes (adding objects, creating images, GC, …).
/// - Unknown **incompatible** features cause the repository to be
///   rejected entirely.
///
/// The lists below are what [`FeatureFlags::check`] compares against.
pub mod known_features {
    /// The ro-compat feature flag for repositories whose primary EROFS
    /// format is V1.
    ///
    /// `RepoMetadata::new_with_formats` records this flag whenever the
    /// primary (`default`) format version is V1, including dual-format
    /// (V1 + V2) configurations.  For older repositories that lack an
    /// explicit `erofs_formats` field, its presence means V1-only (C-tool
    /// compatible) images.  Old tools that don't recognize this flag will
    /// open the repository as read-only, preventing accidental V2 image
    /// writes into a V1 repo.
    pub const V1_EROFS: &str = "v1_erofs";

    /// Compatible features understood by this version.
    ///
    /// Currently empty; compatible features are never inspected anyway.
    pub const COMPAT: &[&str] = &[];
    /// Read-only compatible features understood by this version.
    pub const RO_COMPAT: &[&str] = &[V1_EROFS];
    /// Incompatible features understood by this version.
    ///
    /// Currently empty, so any listed incompatible feature causes rejection.
    pub const INCOMPAT: &[&str] = &[];
}
220
/// Feature flags for a composefs repository.
///
/// Inspired by the ext4/XFS/EROFS on-disk feature model:
///
/// - **compatible**: old tools that don't understand these can still
///   fully read and write the repository.
/// - **read_only_compatible**: old tools can read but must not write.
/// - **incompatible**: old tools must refuse to open the repository.
///
/// In JSON the keys are kebab-case (`compatible`, `read-only-compatible`,
/// `incompatible`), and any list that is absent deserializes as empty.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct FeatureFlags {
    /// Features that can be safely ignored by older tools.
    #[serde(default)]
    pub compatible: Vec<String>,

    /// Features that allow reading but prevent writing by older tools.
    #[serde(default)]
    pub read_only_compatible: Vec<String>,

    /// Features that require newer tools; older tools must refuse entirely.
    #[serde(default)]
    pub incompatible: Vec<String>,
}
244
/// Result of checking repository feature compatibility.
///
/// Produced by [`FeatureFlags::check`] and passed through by
/// [`RepoMetadata::check_compatible`].  Unknown *incompatible* features are
/// not represented here; they are reported as an error instead.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FeatureCheck {
    /// All features are understood; full read-write access.
    ReadWrite,
    /// Unknown read-only-compatible features present; read access only.
    /// The vec contains the unknown feature names.
    ReadOnly(Vec<String>),
}
254
255impl FeatureFlags {
256    /// Check these flags against the known feature sets.
257    ///
258    /// Returns an error if any unknown incompatible features are present.
259    /// Returns [`FeatureCheck::ReadOnly`] if unknown ro-compat features
260    /// are present. Returns [`FeatureCheck::ReadWrite`] otherwise.
261    pub fn check(&self) -> Result<FeatureCheck, RepositoryOpenError> {
262        // Check incompatible features first
263        let unknown_incompat: Vec<String> = self
264            .incompatible
265            .iter()
266            .filter(|f| !known_features::INCOMPAT.contains(&f.as_str()))
267            .cloned()
268            .collect();
269        if !unknown_incompat.is_empty() {
270            return Err(RepositoryOpenError::IncompatibleFeatures(unknown_incompat));
271        }
272
273        // Check ro-compat features
274        let unknown_ro: Vec<String> = self
275            .read_only_compatible
276            .iter()
277            .filter(|f| !known_features::RO_COMPAT.contains(&f.as_str()))
278            .cloned()
279            .collect();
280        if !unknown_ro.is_empty() {
281            return Ok(FeatureCheck::ReadOnly(unknown_ro));
282        }
283
284        // Compatible features are ignored by definition
285        Ok(FeatureCheck::ReadWrite)
286    }
287}
288
/// Repository metadata stored in `meta.json` at the repository root.
///
/// This file records the repository's format version, digest algorithm,
/// feature flags, and EROFS format configuration so that tools can detect
/// misconfigured invocations (e.g. opening a sha256 repo with `--hash sha512`)
/// and so the algorithm and format don't need to be specified on every command.
///
/// The versioning model is inspired by Linux filesystem superblocks
/// (ext4, XFS, EROFS): a base version integer for fundamental layout
/// changes, plus three tiers of feature flags for finer-grained
/// evolution.
///
/// The EROFS format configuration is stored in two ways for compatibility:
/// - The `erofs_formats` field (new) persists the full [`FormatConfig`] directly.
/// - The `"v1_erofs"` ro_compat flag (legacy) is kept for old-tool compat.
///
/// When reading old repos without an `erofs_formats` field, the serde default
/// gives `FormatConfig::single(V2)`, which is then overridden to `single(V1)` if
/// the `"v1_erofs"` flag is present.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct RepoMetadata {
    /// Base repository format version.  Tools must refuse to operate
    /// on a repository whose version exceeds what they understand.
    ///
    /// Newly built metadata uses [`REPO_FORMAT_VERSION`].
    pub version: u32,

    /// The fs-verity algorithm configuration for this repository.
    pub algorithm: Algorithm,

    /// Feature flags.
    ///
    /// When absent from the JSON, all three feature lists are empty.
    #[serde(default)]
    pub features: FeatureFlags,

    /// EROFS format configuration for images produced by this repository.
    ///
    /// Persisted directly so that dual-format repositories (e.g. V1+V2) can
    /// round-trip through `meta.json`.  For repositories created before this
    /// field existed, the field is absent from JSON and defaults to
    /// `FormatConfig::single(FormatVersion::V2)`; the `"v1_erofs"` flag is then
    /// checked for seamless migration of old V1 repos.
    #[serde(default)]
    pub erofs_formats: FormatConfig,
}
331
332impl RepoMetadata {
333    /// Derive the effective EROFS format version for this repository.
334    ///
335    /// Uses `erofs_formats.default` when the field was explicitly set (i.e. is
336    /// not the serde default of `single(V2)`).  For old repos that predate the
337    /// `erofs_formats` field, falls back to the `"v1_erofs"` ro_compat flag:
338    ///
339    /// - `"v1_erofs"` present → [`FormatVersion::V1`]
340    /// - absent → [`FormatVersion::V2`]
341    pub fn erofs_version(&self) -> FormatVersion {
342        self.format_config().default
343    }
344
345    /// Return the effective [`FormatConfig`] for this repository.
346    ///
347    /// If `erofs_formats` was explicitly set in `meta.json` (i.e. it is not the
348    /// serde default `single(V2)`), it is returned as-is.  Otherwise the config
349    /// is derived from the legacy `"v1_erofs"` ro_compat flag for backward
350    /// compatibility with repos created before the `erofs_formats` field existed.
351    pub fn format_config(&self) -> FormatConfig {
352        let default_config = FormatConfig::default();
353        if self.erofs_formats != default_config {
354            // Field was explicitly stored — use it directly.
355            self.erofs_formats.clone()
356        } else if self
357            .features
358            .read_only_compatible
359            .iter()
360            .any(|f| f == known_features::V1_EROFS)
361        {
362            // Legacy V1 repo: flag present but new field absent/default.
363            FormatConfig::single(FormatVersion::V1)
364        } else {
365            // V2 default (new repos or old repos without the flag).
366            default_config
367        }
368    }
369}
370
371impl RepoMetadata {
372    /// Build metadata for a repository using the given hash type, with the default EROFS version.
373    pub fn for_hash<ObjectID: FsVerityHashValue>() -> Self {
374        Self::new_with_formats(
375            Algorithm::for_hash::<ObjectID>(),
376            &FormatConfig::single(FormatVersion::default()),
377        )
378    }
379
380    /// Build metadata from an explicit [`Algorithm`], with the default EROFS format version.
381    pub fn new(algorithm: Algorithm) -> Self {
382        Self::new_with_formats(algorithm, &FormatConfig::single(FormatVersion::default()))
383    }
384
385    /// Build metadata with the correct feature flags for the given [`FormatConfig`].
386    ///
387    /// The on-disk encoding uses a feature flag for compatibility:
388    /// - `erofs_formats` field: stores the full [`FormatConfig`] directly.
389    /// - `"v1_erofs"` ro_compat flag: present when the primary version is V1,
390    ///   so that older tools that don't know `erofs_formats` open the repository
391    ///   read-only rather than writing images in the wrong format.
392    pub fn new_with_formats(algorithm: Algorithm, formats: &FormatConfig) -> Self {
393        let mut features = FeatureFlags::default();
394        if formats.default == FormatVersion::V1 {
395            features
396                .read_only_compatible
397                .push(known_features::V1_EROFS.to_string());
398        }
399        Self {
400            version: REPO_FORMAT_VERSION,
401            algorithm,
402            features,
403            erofs_formats: formats.clone(),
404        }
405    }
406
407    /// Check whether this metadata is compatible with the given hash type.
408    ///
409    /// Validates the base version, feature flags, and algorithm.
410    /// Returns a [`FeatureCheck`] indicating read-write or read-only access.
411    pub fn check_compatible<ObjectID: FsVerityHashValue>(
412        &self,
413    ) -> Result<FeatureCheck, RepositoryOpenError> {
414        if self.version > REPO_FORMAT_VERSION {
415            return Err(RepositoryOpenError::UnsupportedVersion {
416                found: self.version,
417            });
418        }
419        if !self.algorithm.is_compatible::<ObjectID>() {
420            return Err(RepositoryOpenError::AlgorithmMismatch {
421                found: self.algorithm,
422                expected: Algorithm::for_hash::<ObjectID>(),
423            });
424        }
425        let access = self.features.check()?;
426        Ok(access)
427    }
428
429    /// Serialize to pretty-printed JSON with a trailing newline.
430    pub fn to_json(&self) -> Result<Vec<u8>> {
431        let mut buf = serde_json::to_vec_pretty(self).context("serializing repository metadata")?;
432        buf.push(b'\n');
433        Ok(buf)
434    }
435
436    /// Deserialize from JSON bytes.
437    #[context("Parsing repository metadata JSON")]
438    pub fn from_json(data: &[u8]) -> Result<Self> {
439        serde_json::from_slice(data).context("deserializing repository metadata")
440    }
441}
442
443/// Configuration for initializing a new composefs repository.
444///
445/// Passed to [`Repository::init_path`] to specify the algorithm,
446/// fs-verity policy, and default EROFS format version.
447///
448/// fs-verity is **required by default**.  Call [`set_insecure`](Self::set_insecure)
449/// to opt out (e.g. on tmpfs or in tests).
450///
451/// # Examples
452///
453/// ```no_run
454/// use composefs::repository::RepositoryConfig;
455/// use composefs::fsverity::Algorithm;
456///
457/// // Default: SHA-256, fs-verity required, EROFS V2.
458/// let config = RepositoryConfig::default();
459///
460/// // SHA-512 with fs-verity required.
461/// let config = RepositoryConfig::new(Algorithm::SHA512);
462///
463/// // Insecure mode (tmpfs, testing).
464/// let config = RepositoryConfig::default().set_insecure();
465///
466/// // Custom algorithm, insecure.
467/// let config = RepositoryConfig::new(Algorithm::SHA512).set_insecure();
468/// ```
469#[derive(Debug, Clone, PartialEq, Eq)]
470pub struct RepositoryConfig {
471    /// The fs-verity hash algorithm for content-addressed objects.
472    pub algorithm: Algorithm,
473    /// EROFS format configuration for images produced by this repository.
474    ///
475    /// The full [`FormatConfig`] is written to `meta.json` (both the `default`
476    /// primary version and any `extra` versions), so dual-format repos round-trip
477    /// correctly.  The `"v1_erofs"` ro_compat flag is also kept for old-tool compat.
478    ///
479    /// Use `FormatConfig::single(FormatVersion::V1)` for C-tool compatible output,
480    /// or `FormatConfig { default: FormatVersion::V1, extra: [FormatVersion::V2].into() }`
481    /// when both V1 and V2 images should be produced (e.g. for bootc workflows).
482    pub erofs_formats: FormatConfig,
483    /// When `true`, fs-verity is NOT enabled on `meta.json` and is not required
484    /// on stored objects.  Use [`set_insecure`](Self::set_insecure) to set this.
485    insecure: bool,
486}
487
488impl RepositoryConfig {
489    /// Create a config with the given algorithm and all other settings at their defaults
490    /// (fs-verity required, EROFS format = default version only).
491    pub fn new(algorithm: Algorithm) -> Self {
492        Self {
493            algorithm,
494            ..Self::default()
495        }
496    }
497
498    /// Disable fs-verity for this repository.
499    ///
500    /// Suitable for use on filesystems that do not support fs-verity (tmpfs,
501    /// overlayfs) or in test environments.  Returns `self` for chaining.
502    pub fn set_insecure(mut self) -> Self {
503        self.insecure = true;
504        self
505    }
506}
507
508impl Default for RepositoryConfig {
509    fn default() -> Self {
510        Self {
511            algorithm: Algorithm::SHA256,
512            erofs_formats: FormatConfig::single(FormatVersion::default()),
513            insecure: false,
514        }
515    }
516}
517
518/// Read the fs-verity algorithm from a repository's `meta.json`.
519///
520/// This is the public API for determining which algorithm a repository
521/// uses before opening it (needed to choose the correct `ObjectID`
522/// generic parameter for [`Repository::open_path`]).
523///
524/// Returns `Ok(None)` when `meta.json` is absent.
525#[context("Reading repository algorithm")]
526pub fn read_repo_algorithm(repo_fd: &impl AsFd) -> Result<Option<Algorithm>> {
527    Ok(read_repo_metadata(repo_fd)?.map(|m| m.algorithm))
528}
529
530/// Read `meta.json` from a repository directory fd, if it exists.
531///
532/// Returns `Ok(None)` when the file is absent.
533#[context("Reading repository metadata")]
534pub(crate) fn read_repo_metadata(repo_fd: &impl AsFd) -> Result<Option<RepoMetadata>> {
535    match openat(
536        repo_fd,
537        REPO_METADATA_FILENAME,
538        OFlags::RDONLY | OFlags::CLOEXEC,
539        Mode::empty(),
540    ) {
541        Ok(fd) => {
542            let meta = serde_json::from_reader(std::io::BufReader::new(File::from(fd)))
543                .context("parsing meta.json")?;
544            Ok(Some(meta))
545        }
546        Err(Errno::NOENT) => Ok(None),
547        Err(e) => Err(e).context("opening meta.json")?,
548    }
549}
550
551/// Enable fs-verity on an fd, dispatching to the correct hash type
552/// based on the [`Algorithm`].
553fn enable_verity_for_algorithm(
554    dirfd: &impl AsFd,
555    fd: BorrowedFd,
556    algorithm: &Algorithm,
557) -> Result<()> {
558    match algorithm {
559        Algorithm::Sha256 { .. } => {
560            enable_verity_maybe_copy::<crate::fsverity::Sha256HashValue>(dirfd, fd)
561                .context("enabling verity (sha256)")?;
562        }
563        Algorithm::Sha512 { .. } => {
564            enable_verity_maybe_copy::<crate::fsverity::Sha512HashValue>(dirfd, fd)
565                .context("enabling verity (sha512)")?;
566        }
567    }
568    Ok(())
569}
570
571/// Remove algorithm-specific data from a repository directory.
572///
573/// Deletes `streams/`, `images/`, and `meta.json` but preserves
574/// `objects/` (content-addressed blobs that are algorithm-agnostic).
575/// This prepares the repository for re-initialization with a
576/// (potentially different) algorithm via [`Repository::init_path`].
577///
578/// After calling this, streams and images will need to be re-imported.
579#[context("Resetting repository metadata at {}", path.as_ref().display())]
580pub fn reset_metadata(path: impl AsRef<Path>) -> Result<()> {
581    let path = path.as_ref();
582    for dir in ["streams", "images"] {
583        let p = path.join(dir);
584        if p.exists() {
585            std::fs::remove_dir_all(&p).with_context(|| format!("removing {}", p.display()))?;
586        }
587    }
588    let meta_path = path.join(REPO_METADATA_FILENAME);
589    if meta_path.exists() {
590        std::fs::remove_file(&meta_path)
591            .with_context(|| format!("removing {}", meta_path.display()))?;
592    }
593    Ok(())
594}
595
596/// Return the default path for the user-owned composefs repository.
597pub fn user_path() -> Result<PathBuf> {
598    let home = std::env::var("HOME").with_context(|| "$HOME must be set when in user mode")?;
599    Ok(PathBuf::from(home).join(".var/lib/composefs"))
600}
601
602/// Return the default path for the system-global composefs repository.
603pub fn system_path() -> PathBuf {
604    PathBuf::from("/sysroot/composefs")
605}
606
607/// Derive the primary [`FormatConfig`] from a [`RepoMetadata`].
608///
609/// Delegates to [`RepoMetadata::format_config`], which prefers the explicit
610/// `erofs_formats` field when present and falls back to the legacy `"v1_erofs"`
611/// ro_compat flag for backward compatibility.
612fn repo_format_config_from_meta(meta: &RepoMetadata) -> FormatConfig {
613    meta.format_config()
614}
615
616/// Write `meta.json` into a repository directory fd.
617///
618/// This atomically writes (via O_TMPFILE + linkat) the metadata file.
619/// It will fail if the file already exists.
620///
621/// If `enable_verity` is true, fs-verity is enabled on `meta.json`
622/// before linking it into place.  This signals to future
623/// [`Repository::open_path`] callers that verity is required on all
624/// objects.
625#[context("Writing repository metadata")]
626pub(crate) fn write_repo_metadata(
627    repo_fd: &impl AsFd,
628    meta: &RepoMetadata,
629    enable_verity: bool,
630) -> Result<()> {
631    let data = meta.to_json()?;
632
633    // Try O_TMPFILE for atomic creation
634    match openat(
635        repo_fd,
636        ".",
637        OFlags::WRONLY | OFlags::TMPFILE | OFlags::CLOEXEC,
638        Mode::from_raw_mode(0o644),
639    ) {
640        Ok(fd) => {
641            let mut file = File::from(fd);
642            file.write_all(&data)
643                .context("writing metadata to tmpfile")?;
644            file.sync_all().context("syncing metadata tmpfile")?;
645
646            let ro_fd = reopen_tmpfile_ro(file).context("re-opening tmpfile read-only")?;
647
648            if enable_verity {
649                enable_verity_for_algorithm(repo_fd, ro_fd.as_fd(), &meta.algorithm)
650                    .context("enabling verity on meta.json")?;
651            }
652
653            linkat(
654                CWD,
655                proc_self_fd(&ro_fd),
656                repo_fd,
657                REPO_METADATA_FILENAME,
658                AtFlags::SYMLINK_FOLLOW,
659            )
660            .context("linking meta.json into repository")?;
661        }
662        Err(Errno::OPNOTSUPP | Errno::NOSYS) => {
663            // Fallback: direct create (no tmpfs O_TMPFILE support).
664            // Use O_EXCL to avoid overwriting, and fsync to ensure the
665            // file is complete on disk before we consider init done.
666            let fd = openat(
667                repo_fd,
668                REPO_METADATA_FILENAME,
669                OFlags::WRONLY | OFlags::CREATE | OFlags::EXCL | OFlags::CLOEXEC,
670                Mode::from_raw_mode(0o644),
671            )
672            .context("creating meta.json")?;
673            let mut file = File::from(fd);
674            file.write_all(&data).context("writing meta.json")?;
675            file.sync_all().context("syncing meta.json to disk")?;
676
677            if enable_verity {
678                let ro_fd = openat(
679                    repo_fd,
680                    REPO_METADATA_FILENAME,
681                    OFlags::RDONLY | OFlags::CLOEXEC,
682                    Mode::empty(),
683                )
684                .context("re-opening meta.json for verity")?;
685                drop(file);
686                enable_verity_for_algorithm(repo_fd, ro_fd.as_fd(), &meta.algorithm)
687                    .context("enabling verity on meta.json")?;
688            }
689        }
690        Err(e) => {
691            return Err(e).context("creating tmpfile for meta.json")?;
692        }
693    }
694    Ok(())
695}
696
697/// Infer repository metadata by examining existing objects.
698///
699/// Walks `objects/` to find any stored object, determines the hash
700/// algorithm from the filename length, and probes for fs-verity.
701///
702/// Returns `(Algorithm, has_verity)` or an error if the objects
703/// directory is empty or the algorithm can't be determined.
704fn infer_metadata(repo_fd: &OwnedFd) -> Result<(Algorithm, bool)> {
705    let objects_fd = openat(
706        repo_fd,
707        "objects",
708        OFlags::RDONLY | OFlags::CLOEXEC,
709        Mode::empty(),
710    )
711    .context("opening objects/ directory")?;
712
713    let dir = Dir::read_from(&objects_fd).context("reading objects/ directory")?;
714
715    for entry in dir {
716        let entry = entry.context("reading objects/ directory entry")?;
717        let subdir_name = entry.file_name().to_bytes();
718
719        if subdir_name == b"." || subdir_name == b".." {
720            continue;
721        }
722
723        // Each subdirectory should be a 2-char hex prefix
724        if subdir_name.len() != 2 {
725            continue;
726        }
727
728        let subdir_fd = openat(
729            &objects_fd,
730            entry.file_name(),
731            OFlags::RDONLY | OFlags::CLOEXEC,
732            Mode::empty(),
733        )
734        .with_context(|| {
735            format!(
736                "opening objects/{} subdirectory",
737                entry.file_name().to_string_lossy()
738            )
739        })?;
740
741        let subdir = Dir::read_from(&subdir_fd).context("reading object subdirectory")?;
742        for obj_entry in subdir {
743            let obj_entry = obj_entry.context("reading object subdirectory entry")?;
744            let obj_name = obj_entry.file_name().to_bytes();
745
746            if obj_name == b"." || obj_name == b".." {
747                continue;
748            }
749
750            // Infer algorithm from filename length.
751            // Objects are stored as objects/XX/<remaining_hex>, where XX is the first
752            // byte (2 hex chars). The filename is the remaining bytes in hex.
753            // SHA-256: 32 bytes total → 62 hex char filename
754            // SHA-512: 64 bytes total → 126 hex char filename
755            let algorithm = match obj_name.len() {
756                62 => Algorithm::Sha256 {
757                    lg_blocksize: DEFAULT_LG_BLOCKSIZE,
758                },
759                126 => Algorithm::Sha512 {
760                    lg_blocksize: DEFAULT_LG_BLOCKSIZE,
761                },
762                _ => continue,
763            };
764
765            let obj_fd = openat(
766                &subdir_fd,
767                obj_entry.file_name(),
768                OFlags::RDONLY | OFlags::CLOEXEC,
769                Mode::empty(),
770            )
771            .with_context(|| {
772                format!(
773                    "opening object file {}",
774                    obj_entry.file_name().to_string_lossy()
775                )
776            })?;
777
778            let has_verity =
779                has_verity(&obj_fd, algorithm).context("probing fs-verity on object")?;
780
781            return Ok((algorithm, has_verity));
782        }
783    }
784
785    bail!("no objects found in repository — cannot infer metadata");
786}
787
788/// Infer the repository algorithm by examining existing object filenames.
789///
790/// This is useful when `meta.json` is missing (old-format repos) and the
791/// caller needs to determine the hash type before constructing a typed
792/// [`Repository`].  For example, the CLI uses this to pick the correct
793/// `ObjectID` generic parameter before calling [`Repository::open_upgrade`].
794///
795/// Returns the inferred [`Algorithm`], or an error if the objects
796/// directory is empty or contains no recognizable filenames.
797pub fn infer_repo_algorithm(repo_fd: &OwnedFd) -> Result<Algorithm> {
798    Ok(infer_metadata(repo_fd)?.0)
799}
800
/// How an object was stored in the repository.
///
/// Returned by [`Repository::ensure_object_from_file`] to indicate
/// whether the operation used a zero-copy reflink or hardlink, a regular
/// copy, or found an existing object.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObjectStoreMethod {
    /// Object was stored via reflink (zero-copy, FICLONE ioctl).
    Reflinked,
    /// Object was stored via hardlink (zero-copy, source file linked directly).
    Hardlinked,
    /// Object was stored via regular file copy (reflink not supported).
    Copied,
    /// Object already existed in the repository (deduplicated); no data was
    /// written by this call.
    AlreadyPresent,
}
817
/// Per-operation context for [`Repository::ensure_object_from_file`].
///
/// Create one of these at the start of a bulk import operation (e.g. importing
/// all layers of a container image) and pass it to every
/// `ensure_object_from_file` call.  The context caches which
/// `(source_device, dest_device)` pairs do not support reflinks, so that
/// after the first `EOPNOTSUPP` / `EXDEV` on a given pair, subsequent
/// calls skip the FICLONE probe entirely.
///
/// This correctly handles multi-store imports where layers may come from
/// different filesystems: a reflink failure on ext4→xfs does not suppress
/// reflink attempts for a later xfs→xfs pair.
///
/// `Default` yields an empty cache, i.e. reflinks are attempted everywhere.
#[derive(Debug, Default)]
pub struct ImportContext {
    /// Device-ID pairs where FICLONE has already failed.  Stored as
    /// `(source_dev, dest_dev)` from `fstat().st_dev`.
    // A plain Vec searched linearly: presumably only a handful of distinct
    // device pairs occur within a single import — revisit if that changes.
    reflink_unsupported_devs: Vec<(u64, u64)>,
}
836
837impl ImportContext {
838    /// Check whether reflinks are known to be unsupported for this
839    /// source→destination device pair.
840    pub(crate) fn is_reflink_unsupported(&self, src_dev: u64, dst_dev: u64) -> bool {
841        self.reflink_unsupported_devs
842            .iter()
843            .any(|&(s, d)| s == src_dev && d == dst_dev)
844    }
845
846    /// Record that reflinks are unsupported for this device pair.
847    pub(crate) fn mark_reflink_unsupported(&mut self, src_dev: u64, dst_dev: u64) {
848        if !self.is_reflink_unsupported(src_dev, dst_dev) {
849            self.reflink_unsupported_devs.push((src_dev, dst_dev));
850        }
851    }
852}
853
854/// Call openat() on the named subdirectory of "dirfd", possibly creating it first.
855///
856/// We assume that the directory will probably exist (ie: we try the open first), and on ENOENT, we
857/// mkdirat() and retry.
858fn ensure_dir_and_openat(dirfd: impl AsFd, filename: &str, flags: OFlags) -> ErrnoResult<OwnedFd> {
859    match openat(
860        &dirfd,
861        filename,
862        flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
863        0o666.into(),
864    ) {
865        Ok(file) => Ok(file),
866        Err(Errno::NOENT) => match mkdirat(&dirfd, filename, 0o777.into()) {
867            Ok(()) | Err(Errno::EXIST) => openat(
868                dirfd,
869                filename,
870                flags | OFlags::CLOEXEC | OFlags::DIRECTORY,
871                0o666.into(),
872            ),
873            Err(other) => Err(other),
874        },
875        Err(other) => Err(other),
876    }
877}
878
879/// Create a directory under `dirfd` if it doesn't already exist.
880///
881/// Returns `Ok(())` on success or if the directory already exists.
882/// Propagates all other errors from `mkdirat`.
883fn ensure_dir_at(dirfd: impl AsFd, path: &str, mode: Mode) -> ErrnoResult<()> {
884    match mkdirat(dirfd, path, mode) {
885        Ok(()) | Err(Errno::EXIST) => Ok(()),
886        Err(e) => Err(e),
887    }
888}
889
/// A zero-sized proof token confirming that a [`Repository`] is writable.
///
/// Obtained by calling [`Repository::ensure_writable`], which performs a fast
/// `faccessat(2)` check.  Write methods on `Repository` require this token
/// internally so that the writable check is performed exactly once per
/// top-level operation rather than at every leaf call.
///
/// Because the type is zero-sized, passing it around has no runtime cost.
// NOTE(review): the call sites visible here obtain the token via
// `ensure_writable_token()`; confirm the `ensure_writable` link target above.
#[derive(Debug, Clone, Copy)]
pub(crate) struct WritableRepo;
900
/// A content-addressable repository for composefs objects.
///
/// Stores content-addressed objects, splitstreams, and images with fsverity
/// verification. Objects are stored by their fsverity digest, streams by SHA256
/// content hash, and both support named references for persistence across
/// garbage collection.
///
/// Opening a repository (see [`Repository::open_path`]) takes a shared
/// `flock` on the repository directory; the `Drop` impl unlocks it again.
pub struct Repository<ObjectID: FsVerityHashValue> {
    /// Descriptor for the repository root directory; carries the shared `flock`.
    repository: OwnedFd,
    /// `objects/` directory, opened lazily (and created if missing) by
    /// [`objects_dir`](Self::objects_dir).
    objects: OnceCell<OwnedFd>,
    /// Lazily created limiter for concurrent object writes; see
    /// [`write_semaphore`](Self::write_semaphore).
    write_semaphore: OnceCell<Arc<Semaphore>>,
    /// Optional override for the number of concurrent object writes.
    /// Set via [`set_write_concurrency`](Self::set_write_concurrency) before the semaphore
    /// is first used; if `None`, defaults to [`available_parallelism`].
    write_concurrency: Option<usize>,
    /// When true, objects are not required to have fs-verity enabled.
    /// `open_path` sets this when `meta.json` itself has no fs-verity.
    insecure: bool,
    /// Parsed `meta.json`, checked for compatibility with `ObjectID` at open time.
    metadata: RepoMetadata,
    /// Per-invocation EROFS version override set by [`set_erofs_version`](Self::set_erofs_version).
    /// Does not rewrite `meta.json`; only affects this `Repository` instance.
    erofs_version_override: Option<FormatVersion>,
    /// When true, SplitStreamWriter::done() writes old-format (pre-repr(C))
    /// headers. Used to test backward compatibility with splitstreams
    /// written before #[repr(C)] was added to SplitstreamHeader.
    #[cfg(any(test, feature = "test"))]
    write_old_splitstream_format: std::sync::atomic::AtomicBool,
    /// Binds the repository to its hash type without storing a value of it.
    _data: std::marker::PhantomData<ObjectID>,
}
927
928impl<ObjectID: FsVerityHashValue> std::fmt::Debug for Repository<ObjectID> {
929    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
930        f.debug_struct("Repository")
931            .field("repository", &self.repository)
932            .field("objects", &self.objects)
933            .field("insecure", &self.insecure)
934            .finish_non_exhaustive()
935    }
936}
937
938impl<ObjectID: FsVerityHashValue> Drop for Repository<ObjectID> {
939    fn drop(&mut self) {
940        flock(&self.repository, FlockOperation::Unlock).expect("repository unlock failed");
941    }
942}
943
/// For Repository::gc_category: selects which entries of a category
/// directory are walked.
// NOTE(review): the per-variant descriptions below are inferred from the
// variant names and the `refs/` layout in the module docs; confirm against
// `gc_category`.
enum GCCategoryWalkMode {
    /// Presumably: walk only the named references under `<category>/refs/`.
    RefsOnly,
    /// Presumably: walk every entry of the category directory.
    AllEntries,
}
949
/// Statistics from a garbage collection operation.
///
/// Returned by [`Repository::gc`] to report what was (or would be) removed.
///
/// With the `varlink` feature enabled, this type can also be serialized,
/// deserialized and introspected for the varlink interface.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(
    feature = "varlink",
    derive(serde::Serialize, serde::Deserialize, zlink_core::introspect::Type),
    zlink(crate = "zlink_core")
)]
pub struct GcResult {
    /// Number of unreferenced objects removed (or that would be removed)
    pub objects_removed: u64,
    /// Total bytes of object data removed (or that would be removed)
    pub objects_bytes: u64,
    /// Number of broken symlinks removed in images/
    pub images_pruned: u64,
    /// Number of broken symlinks removed in streams/
    pub streams_pruned: u64,
}
969
/// A structured error found during a filesystem consistency check.
///
/// Each variant corresponds to a specific kind of repository integrity problem.
/// The `Display` implementation produces a kebab-case error type prefix followed
/// by the path/context and any relevant details, suitable for both human display
/// and structured logging.
///
/// When serialized, each error becomes an object whose `type` key holds the
/// kebab-case variant name (e.g. `"object-digest-mismatch"`), with the
/// variant's fields alongside it.  Multi-word field names are camelCased
/// (e.g. `objectId`, `refName`).
#[derive(Debug, Clone, serde::Serialize, thiserror::Error)]
#[serde(tag = "type", rename_all = "kebab-case")]
#[non_exhaustive]
#[allow(missing_docs)]
pub enum FsckError {
    #[error("fsck: object-invalid-name: {path}: {detail}")]
    ObjectInvalidName { path: String, detail: String },

    #[error("fsck: object-open-failed: {path}: {detail}")]
    ObjectOpenFailed { path: String, detail: String },

    #[error("fsck: object-digest-mismatch: {path}: measured {measured}")]
    ObjectDigestMismatch { path: String, measured: String },

    #[error("fsck: object-verity-failed: {path}: {detail}")]
    ObjectVerityFailed { path: String, detail: String },

    #[error("fsck: object-verity-missing: {path}")]
    ObjectVerityMissing { path: String },

    #[error("fsck: entry-not-symlink: {path}")]
    EntryNotSymlink { path: String },

    #[error("fsck: broken-symlink: {path}")]
    BrokenSymlink { path: String },

    #[error("fsck: stat-failed: {path}: {detail}")]
    StatFailed { path: String, detail: String },

    #[error("fsck: unexpected-file-type: {path}: {detail}")]
    UnexpectedFileType { path: String, detail: String },

    #[error("fsck: stream-open-failed: {path}: {detail}")]
    StreamOpenFailed { path: String, detail: String },

    // Variants with multi-word fields opt into camelCase field names.
    #[error("fsck: missing-object-ref: {path}: {object_id}")]
    #[serde(rename_all = "camelCase")]
    MissingObjectRef { path: String, object_id: String },

    #[error("fsck: stream-read-failed: {path}: {detail}")]
    StreamReadFailed { path: String, detail: String },

    #[error("fsck: missing-named-ref: {path}: ref {ref_name}: {object_id}")]
    #[serde(rename_all = "camelCase")]
    MissingNamedRef {
        path: String,
        ref_name: String,
        object_id: String,
    },

    #[error("fsck: object-check-failed: {path}: {object_id}: {detail}")]
    #[serde(rename_all = "camelCase")]
    ObjectCheckFailed {
        path: String,
        object_id: String,
        detail: String,
    },

    #[error("fsck: image-open-failed: {path}: {detail}")]
    ImageOpenFailed { path: String, detail: String },

    #[error("fsck: image-read-failed: {path}: {detail}")]
    ImageReadFailed { path: String, detail: String },

    #[error("fsck: image-invalid: {path}: {detail}")]
    ImageInvalid { path: String, detail: String },

    #[error("fsck: image-missing-object: {path}: {object_id}")]
    #[serde(rename_all = "camelCase")]
    ImageMissingObject { path: String, object_id: String },

    // The two metadata variants are what `FsckResult`'s `Display` reports as
    // "meta.json: error".
    #[error("fsck: metadata-parse-failed: meta.json: {detail}")]
    MetadataParseFailed { detail: String },

    #[error(
        "fsck: metadata-algorithm-mismatch: meta.json: expected {expected}, repository opened as {actual}"
    )]
    MetadataAlgorithmMismatch { expected: String, actual: String },
}
1055
/// Results from a filesystem consistency check.
///
/// Returned by [`Repository::fsck`] to report repository integrity status.
///
/// The counters are summary statistics.  `errors` holds one structured entry
/// per problem found, and [`is_ok`](Self::is_ok) is decided by `errors` alone.
/// Serialized with camelCase field names (e.g. `objectsChecked`).
#[derive(Debug, Clone, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FsckResult {
    /// Whether the repository has a `meta.json` metadata file.
    pub has_metadata: bool,
    /// Number of objects whose fs-verity digests were verified.
    pub objects_checked: u64,
    /// Number of objects found to have a bad fs-verity digest.
    pub objects_corrupted: u64,
    /// Number of splitstreams verified.
    pub streams_checked: u64,
    /// Number of splitstreams with issues (bad header, missing refs, etc.).
    pub streams_corrupted: u64,
    /// Number of images verified.
    pub images_checked: u64,
    /// Number of images with issues.
    pub images_corrupted: u64,
    /// Number of broken symlinks found.
    pub broken_links: u64,
    /// Number of missing objects referenced by streams.
    pub missing_objects: u64,
    /// Structured descriptions of each error found.
    pub errors: Vec<FsckError>,
}
1083
1084impl FsckResult {
1085    /// Whether the repository has a `meta.json` file.
1086    pub fn has_metadata(&self) -> bool {
1087        self.has_metadata
1088    }
1089
1090    /// Returns true if no corruption or errors were found.
1091    pub fn is_ok(&self) -> bool {
1092        debug_assert!(
1093            self.objects_corrupted == 0
1094                && self.streams_corrupted == 0
1095                && self.images_corrupted == 0
1096                && self.broken_links == 0
1097                && self.missing_objects == 0
1098                || !self.errors.is_empty(),
1099            "corruption counters are non-zero but no error messages recorded"
1100        );
1101        self.errors.is_empty()
1102    }
1103
1104    /// Number of objects verified.
1105    pub fn objects_checked(&self) -> u64 {
1106        self.objects_checked
1107    }
1108
1109    /// Number of objects with bad fsverity digests.
1110    pub fn objects_corrupted(&self) -> u64 {
1111        self.objects_corrupted
1112    }
1113
1114    /// Number of streams verified.
1115    pub fn streams_checked(&self) -> u64 {
1116        self.streams_checked
1117    }
1118
1119    /// Number of streams with issues (bad header, missing refs, etc.).
1120    pub fn streams_corrupted(&self) -> u64 {
1121        self.streams_corrupted
1122    }
1123
1124    /// Number of images verified.
1125    pub fn images_checked(&self) -> u64 {
1126        self.images_checked
1127    }
1128
1129    /// Number of images with issues.
1130    pub fn images_corrupted(&self) -> u64 {
1131        self.images_corrupted
1132    }
1133
1134    /// Number of broken symlinks found.
1135    pub fn broken_links(&self) -> u64 {
1136        self.broken_links
1137    }
1138
1139    /// Number of missing objects referenced by streams.
1140    pub fn missing_objects(&self) -> u64 {
1141        self.missing_objects
1142    }
1143
1144    /// Errors found during the check.
1145    pub fn errors(&self) -> &[FsckError] {
1146        &self.errors
1147    }
1148}
1149
1150impl fmt::Display for FsckResult {
1151    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1152        let metadata_errors = self.errors.iter().any(|e| {
1153            matches!(
1154                e,
1155                FsckError::MetadataParseFailed { .. } | FsckError::MetadataAlgorithmMismatch { .. }
1156            )
1157        });
1158        if metadata_errors {
1159            writeln!(f, "meta.json: error")?;
1160        } else if self.has_metadata {
1161            writeln!(f, "meta.json: ok")?;
1162        } else {
1163            writeln!(f, "meta.json: absent")?;
1164        }
1165        writeln!(
1166            f,
1167            "objects: {}/{} ok",
1168            self.objects_checked.saturating_sub(self.objects_corrupted),
1169            self.objects_checked
1170        )?;
1171        writeln!(
1172            f,
1173            "streams: {}/{} ok",
1174            self.streams_checked.saturating_sub(self.streams_corrupted),
1175            self.streams_checked
1176        )?;
1177        writeln!(
1178            f,
1179            "images: {}/{} ok",
1180            self.images_checked.saturating_sub(self.images_corrupted),
1181            self.images_checked
1182        )?;
1183        if self.broken_links > 0 {
1184            writeln!(f, "broken symlinks: {}", self.broken_links)?;
1185        }
1186        if self.missing_objects > 0 {
1187            writeln!(f, "missing objects: {}", self.missing_objects)?;
1188        }
1189        if self.errors.is_empty() {
1190            writeln!(f, "status: ok")?;
1191        } else {
1192            writeln!(f, "status: {} error(s)", self.errors.len())?;
1193            for err in &self.errors {
1194                writeln!(f, "  - {err}")?;
1195            }
1196        }
1197        Ok(())
1198    }
1199}
1200
1201impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
1202    /// Enable or disable writing old-format splitstream headers.
1203    ///
1204    /// When enabled, all splitstreams created via [`create_stream`] will be
1205    /// written with the pre-`repr(C)` header layout, simulating data written
1206    /// by composefs-rs versions before the `#[repr(C)]` fix.
1207    #[cfg(any(test, feature = "test"))]
1208    pub fn set_write_old_splitstream_format(&self, enabled: bool) {
1209        self.write_old_splitstream_format
1210            .store(enabled, std::sync::atomic::Ordering::Relaxed);
1211    }
1212
1213    /// Whether splitstream writers should use the old (pre-repr(C)) header format.
1214    #[cfg(any(test, feature = "test"))]
1215    pub(crate) fn write_old_splitstream_format(&self) -> bool {
1216        self.write_old_splitstream_format
1217            .load(std::sync::atomic::Ordering::Relaxed)
1218    }
1219
1220    /// Return the objects directory.
1221    pub fn objects_dir(&self) -> ErrnoResult<&OwnedFd> {
1222        self.objects
1223            .get_or_try_init(|| ensure_dir_and_openat(&self.repository, "objects", OFlags::PATH))
1224    }
1225
1226    /// Override the maximum number of concurrent object writes.
1227    ///
1228    /// Must be called before the first use of [`write_semaphore`](Self::write_semaphore);
1229    /// has no effect if the semaphore has already been initialized.
1230    pub fn set_write_concurrency(&mut self, n: usize) {
1231        // Guard: the semaphore is lazily initialized on first use. If it's
1232        // already been initialized, this call has no effect. Callers must
1233        // set concurrency before any write operations begin.
1234        debug_assert!(
1235            self.write_semaphore.get().is_none(),
1236            "set_write_concurrency called after write_semaphore was already initialized; \
1237             call this before any write operations"
1238        );
1239        if self.write_semaphore.get().is_some() {
1240            log::warn!(
1241                "set_write_concurrency called after semaphore was already initialized; ignoring"
1242            );
1243            return;
1244        }
1245        self.write_concurrency = Some(n);
1246    }
1247
1248    /// Return a shared semaphore for limiting concurrent object writes.
1249    ///
1250    /// This semaphore is lazily initialized with `available_parallelism()` permits
1251    /// (or the value set via [`set_write_concurrency`](Self::set_write_concurrency)),
1252    /// and shared across all operations on this repository. Use this to limit
1253    /// concurrent I/O when processing multiple files or layers in parallel.
1254    pub fn write_semaphore(&self) -> Arc<Semaphore> {
1255        self.write_semaphore
1256            .get_or_init(|| {
1257                let max_concurrent = self
1258                    .write_concurrency
1259                    .unwrap_or_else(|| available_parallelism().map(|n| n.get()).unwrap_or(4));
1260                Arc::new(Semaphore::new(max_concurrent))
1261            })
1262            .clone()
1263    }
1264
    /// Initialize a new repository at the target path and open it.
    ///
    /// Creates the directory (mode 0700) if it does not exist, writes
    /// `meta.json` using the parameters from `config`, and returns the opened
    /// repository together with a flag indicating whether this was a
    /// fresh initialization (`true`) or an idempotent open of an
    /// existing repository with the same algorithm (`false`).
    ///
    /// The `config.algorithm` must be compatible with this repository's
    /// `ObjectID` type (e.g. `Algorithm::Sha512` for
    /// `Repository<Sha512HashValue>`).
    ///
    /// Unless `config` has been made insecure via [`RepositoryConfig::set_insecure`],
    /// fs-verity is enabled on `meta.json`, signaling that all objects must also have verity.
    ///
    /// If `meta.json` already exists with a different algorithm, an
    /// error is returned.
    ///
    /// # Errors
    ///
    /// Fails if:
    /// - the algorithm is incompatible with `ObjectID`;
    /// - the directory cannot be created or opened;
    /// - an existing `meta.json` differs from the requested configuration;
    /// - writing `meta.json` fails for a reason other than it already
    ///   existing;
    /// - opening the repository afterwards fails.
    #[context("Initializing repository at {}", path.as_ref().display())]
    pub fn init_path(
        dirfd: impl AsFd,
        path: impl AsRef<Path>,
        config: RepositoryConfig,
    ) -> Result<(Self, bool)> {
        let path = path.as_ref();
        let RepositoryConfig {
            algorithm,
            erofs_formats,
            insecure,
        } = config;
        let require_fsverity = !insecure;

        // Refuse an algorithm that this `ObjectID` type cannot represent.
        if !algorithm.is_compatible::<ObjectID>() {
            bail!(
                "algorithm {} is not compatible with this repository type (expected {})",
                algorithm,
                Algorithm::for_hash::<ObjectID>(),
            );
        }

        // An existing directory is fine: initialization is meant to be idempotent.
        mkdirat(&dirfd, path, Mode::from_raw_mode(0o700))
            .or_else(|e| if e == Errno::EXIST { Ok(()) } else { Err(e) })
            .with_context(|| format!("creating repository directory {}", path.display()))?;

        let repo_fd = openat(
            &dirfd,
            path,
            OFlags::RDONLY | OFlags::CLOEXEC,
            Mode::empty(),
        )
        .with_context(|| format!("opening repository directory {}", path.display()))?;

        let meta = RepoMetadata::new_with_formats(algorithm, &erofs_formats);

        // Try to write meta.json.  If it already exists, check for
        // idempotency: same config is fine; certain upgrades are allowed;
        // incompatible changes are errors.
        if let Err(write_err) = write_repo_metadata(&repo_fd, &meta, require_fsverity) {
            match read_repo_metadata(&repo_fd)? {
                // NOTE(review): `meta` is built from the algorithm and EROFS
                // formats only, so this comparison does not look at
                // `require_fsverity`.  An existing insecure repository
                // re-initialized in secure mode is opened as-is (and
                // `open_path` will mark it insecure) — confirm this is intended.
                Some(existing) if existing == meta => {
                    // Idempotent: same config, already initialized.
                    let repo = Self::open_path(dirfd, path)?;
                    return Ok((repo, false));
                }
                Some(existing) => {
                    bail!(
                        "repository already initialized with different configuration \
                         (algorithm: {}, erofs_version: {:?}); \
                         cannot re-initialize with (algorithm: {}, erofs_version: {:?})",
                        existing.algorithm,
                        existing.erofs_version(),
                        meta.algorithm,
                        meta.erofs_version(),
                    );
                }
                None => {
                    // meta.json doesn't exist, so the write failure
                    // was something else — propagate original error.
                    return Err(write_err);
                }
            }
        }

        // Close this unlocked descriptor; `open_path` opens and locks its own.
        drop(repo_fd);
        let repo = Self::open_path(dirfd, path)?;
        Ok((repo, true))
    }
1351
1352    /// Open a repository at the target directory and path.
1353    ///
1354    /// `meta.json` is read, parsed, and validated against this
1355    /// repository's `ObjectID` type.  Parsing or compatibility errors
1356    /// are propagated immediately so that broken metadata is never
1357    /// silently ignored.
1358    ///
1359    /// The repository's security mode is auto-detected: if `meta.json`
1360    /// has fs-verity enabled the repo requires verity on all objects
1361    /// (secure mode).  Otherwise the repository operates in insecure
1362    /// mode.  Use [`set_insecure`] to override after opening.
1363    pub fn open_path(
1364        dirfd: impl AsFd,
1365        path: impl AsRef<Path>,
1366    ) -> Result<Self, RepositoryOpenError> {
1367        let path = path.as_ref();
1368
1369        // O_PATH isn't enough because flock()
1370        let repository = openat(dirfd, path, OFlags::RDONLY | OFlags::CLOEXEC, Mode::empty())?;
1371
1372        flock(&repository, FlockOperation::LockShared)?;
1373
1374        // Read, parse, and validate meta.json up front so that broken
1375        // or incompatible metadata is caught immediately rather than
1376        // being discovered lazily on first use.
1377        let (metadata, has_verity) = Self::read_and_probe_metadata(&repository)?;
1378        metadata.check_compatible::<ObjectID>()?;
1379
1380        Ok(Self {
1381            repository,
1382            objects: OnceCell::new(),
1383            write_semaphore: OnceCell::new(),
1384            write_concurrency: None,
1385            insecure: !has_verity,
1386            metadata,
1387            erofs_version_override: None,
1388            #[cfg(any(test, feature = "test"))]
1389            write_old_splitstream_format: std::sync::atomic::AtomicBool::new(false),
1390            _data: std::marker::PhantomData,
1391        })
1392    }
1393
1394    /// Open a repository, upgrading old-format repos that lack `meta.json`.
1395    ///
1396    /// This method first tries [`open_path`](Self::open_path). If that fails
1397    /// with [`OldFormatRepository`](RepositoryOpenError::OldFormatRepository),
1398    /// it infers the algorithm and verity mode from existing objects,
1399    /// writes `meta.json`, and retries the open.
1400    ///
1401    /// This is the non-destructive upgrade path for repositories created
1402    /// by composefs-rs versions that predated `meta.json`.
1403    ///
1404    /// Returns `(repo, upgraded)` where `upgraded` is true if `meta.json`
1405    /// was written.
1406    pub fn open_upgrade(dirfd: impl AsFd, path: impl AsRef<Path>) -> Result<(Self, bool)> {
1407        let path = path.as_ref();
1408
1409        match Self::open_path(&dirfd, path) {
1410            Ok(repo) => Ok((repo, false)),
1411            Err(RepositoryOpenError::OldFormatRepository) => {
1412                let repo_fd = openat(
1413                    &dirfd,
1414                    path,
1415                    OFlags::RDONLY | OFlags::CLOEXEC,
1416                    Mode::empty(),
1417                )
1418                .with_context(|| format!("opening repository directory {}", path.display()))?;
1419
1420                let (algorithm, has_verity) = infer_metadata(&repo_fd)?;
1421
1422                if !algorithm.is_compatible::<ObjectID>() {
1423                    bail!(
1424                        "inferred algorithm {} is not compatible with this repository type \
1425                         (expected {})",
1426                        algorithm,
1427                        Algorithm::for_hash::<ObjectID>(),
1428                    );
1429                }
1430
1431                // Use `new` (no `v1_erofs` flag) for legacy repos
1432                // that pre-date the FormatConfig feature.  No feature flags → V2-only,
1433                // which is correct: old repos used V2 exclusively.
1434                let meta = RepoMetadata::new(algorithm);
1435                write_repo_metadata(&repo_fd, &meta, has_verity)?;
1436
1437                drop(repo_fd);
1438
1439                let repo = Self::open_path(&dirfd, path)
1440                    .context("opening repository after writing meta.json")?;
1441
1442                Ok((repo, true))
1443            }
1444            Err(other) => Err(other.into()),
1445        }
1446    }
1447
1448    /// Read, parse, and probe verity on `meta.json`.
1449    ///
1450    /// Returns `Ok((metadata, has_verity))` when the file exists,
1451    /// and `Err` when absent or on I/O / parse failures.
1452    fn read_and_probe_metadata(
1453        repo_fd: &OwnedFd,
1454    ) -> Result<(RepoMetadata, bool), RepositoryOpenError> {
1455        let meta_fd = match openat(
1456            repo_fd,
1457            REPO_METADATA_FILENAME,
1458            OFlags::RDONLY | OFlags::CLOEXEC,
1459            Mode::empty(),
1460        ) {
1461            Ok(fd) => fd,
1462            Err(Errno::NOENT) => {
1463                // Detect old-format repositories that have objects/ but
1464                // no meta.json.  Use filter_errno so non-ENOENT errors
1465                // from statat are propagated.
1466                return Err(
1467                    match statat(repo_fd, "objects", AtFlags::empty()).filter_errno(Errno::NOENT) {
1468                        Ok(Some(_)) => RepositoryOpenError::OldFormatRepository,
1469                        Ok(None) => RepositoryOpenError::MetadataMissing,
1470                        Err(e) => e.into(),
1471                    },
1472                );
1473            }
1474            Err(e) => return Err(e.into()),
1475        };
1476
1477        // Clone the fd: one for reading, one for the verity probe.
1478        let read_fd = meta_fd.try_clone()?;
1479        let meta: RepoMetadata =
1480            serde_json::from_reader(std::io::BufReader::new(File::from(read_fd)))
1481                .map_err(RepositoryOpenError::MetadataInvalid)?;
1482
1483        // Probe verity on the original fd.
1484        let has_verity = measure_verity_opt::<ObjectID>(&meta_fd)
1485            .map_err(|e| std::io::Error::other(e.to_string()))?
1486            .is_some();
1487
1488        Ok((meta, has_verity))
1489    }
1490
1491    /// Open the default user-owned composefs repository.
1492    #[context("Opening user repository")]
1493    pub fn open_user() -> Result<Self> {
1494        Ok(Self::open_path(CWD, user_path()?)?)
1495    }
1496
1497    /// Open the default system-global composefs repository.
1498    #[context("Opening system repository")]
1499    pub fn open_system() -> Result<Self> {
1500        Ok(Self::open_path(CWD, system_path())?)
1501    }
1502
1503    fn ensure_dir(&self, dir: impl AsRef<Path>) -> ErrnoResult<()> {
1504        mkdirat(&self.repository, dir.as_ref(), 0o755.into()).or_else(|e| match e {
1505            Errno::EXIST => Ok(()),
1506            _ => Err(e),
1507        })
1508    }
1509
1510    /// Asynchronously ensures an object exists in the repository.
1511    ///
1512    /// Same as `ensure_object` but runs the operation on a blocking thread pool
1513    /// to avoid blocking async tasks. Returns the fsverity digest of the object.
1514    ///
1515    /// For performance reasons, this function does *not* call fsync() or similar.  After you're
1516    /// done with everything, call `Repository::sync_async()`.
1517    #[context("Ensuring object asynchronously")]
1518    pub async fn ensure_object_async(self: &Arc<Self>, data: Vec<u8>) -> Result<ObjectID> {
1519        let writable = self.ensure_writable_token()?;
1520        let self_ = Arc::clone(self);
1521        tokio::task::spawn_blocking(move || self_.ensure_object_impl(&data, &writable)).await?
1522    }
1523
1524    /// Import an object by streaming from a file descriptor into a
1525    /// tmpfile, without buffering the entire file in memory.
1526    ///
1527    /// In insecure mode the verity digest is computed while copying,
1528    /// avoiding a second read pass.
1529    #[context("Ensuring object from file descriptor")]
1530    pub(crate) fn ensure_object_from_fd(&self, source: OwnedFd, size: u64) -> Result<ObjectID> {
1531        let writable = self.ensure_writable_token()?;
1532        let tmpfile_fd = self.create_object_tmpfile_impl(&writable)?;
1533
1534        if self.insecure {
1535            // Insecure mode: compute verity digest while copying, avoiding
1536            // a second read of the data in finalize_object_tmpfile_impl.
1537            let mut hasher = FsVerityHasher::<ObjectID>::new();
1538            let mut src = std::io::BufReader::with_capacity(IO_BUF_CAPACITY, File::from(source));
1539            let mut dst = File::from(tmpfile_fd.try_clone()?);
1540
1541            loop {
1542                let buf = src.fill_buf()?;
1543                if buf.is_empty() {
1544                    break;
1545                }
1546                let chunk = &buf[..buf.len().min(FsVerityHasher::<ObjectID>::BLOCK_SIZE)];
1547                hasher.add_block(chunk);
1548                dst.write_all(chunk)?;
1549                let n = chunk.len();
1550                src.consume(n);
1551            }
1552            drop(dst);
1553
1554            let id = hasher.digest();
1555            let ro_fd = reopen_tmpfile_ro(File::from(tmpfile_fd))
1556                .context("Re-opening tmpfile as read-only")?;
1557            let objects_dir = self.objects_dir().context("Getting objects directory")?;
1558            let (id, _method) = self.link_tmpfile_as_object(objects_dir, &ro_fd, &id, size)?;
1559            Ok(id)
1560        } else {
1561            // Secure mode: let std::io::copy use copy_file_range for
1562            // potential reflinks, then finalize_object_tmpfile_impl
1563            // enables kernel verity and measures the digest.
1564            let mut src = File::from(source);
1565            let mut dst = File::from(tmpfile_fd.try_clone()?);
1566            let copied = std::io::copy(&mut src, &mut dst)?;
1567            ensure!(copied == size, "Expected {size} bytes, got {copied}");
1568            drop(dst);
1569
1570            let (id, _method) =
1571                self.finalize_object_tmpfile_impl(File::from(tmpfile_fd), size, &writable)?;
1572            Ok(id)
1573        }
1574    }
1575
1576    /// Create an O_TMPFILE in the objects directory for streaming writes.
1577    ///
1578    /// Returns the file descriptor for writing. The caller should write data to this fd,
1579    /// then call [`finalize_object_tmpfile`](Self::finalize_object_tmpfile) to compute
1580    /// the verity digest, enable fs-verity, and link the file into the objects directory.
1581    #[context("Creating object tmpfile")]
1582    pub fn create_object_tmpfile(&self) -> Result<OwnedFd> {
1583        let writable = self.ensure_writable_token()?;
1584        self.create_object_tmpfile_impl(&writable)
1585    }
1586
1587    #[context("Creating object tmpfile")]
1588    pub(crate) fn create_object_tmpfile_impl(&self, _writable: &WritableRepo) -> Result<OwnedFd> {
1589        let objects_dir = self
1590            .objects_dir()
1591            .context("Getting objects directory for tmpfile creation")?;
1592        let fd = openat(
1593            objects_dir,
1594            ".",
1595            OFlags::RDWR | OFlags::TMPFILE | OFlags::CLOEXEC,
1596            Mode::from_raw_mode(0o644),
1597        )
1598        .context("Opening temp file in objects directory")?;
1599        Ok(fd)
1600    }
1601
1602    /// Ensure an object exists by reflinking or hardlinking from a source file.
1603    ///
1604    /// The fallback chain is: reflink -> hardlink -> copy.
1605    ///
1606    /// - **Reflink** (FICLONE): zero-copy clone on btrfs/XFS. Uses a tmpfile.
1607    /// - **Hardlink**: enables fs-verity on the source file in-place, then
1608    ///   hardlinks it directly into the objects directory. This avoids all data
1609    ///   copying on filesystems like ext4 that don't support reflinks.
1610    /// - **Copy**: regular data copy into a tmpfile as last resort.
1611    ///
1612    /// The `ctx` argument accumulates knowledge across calls in the same
1613    /// import operation.  After the first reflink attempt fails with
1614    /// `EOPNOTSUPP` / `EXDEV`, the context records this so that subsequent
1615    /// calls skip straight to the hardlink path.
1616    ///
1617    /// This is particularly useful for importing from containers-storage where
1618    /// we already have the file on disk and want to avoid copying data.
1619    pub fn ensure_object_from_file(
1620        &self,
1621        src: &std::fs::File,
1622        size: u64,
1623        ctx: &mut ImportContext,
1624    ) -> Result<(ObjectID, ObjectStoreMethod)> {
1625        self.ensure_object_from_file_inner(src, size, true, ctx)
1626    }
1627
1628    /// Like [`ensure_object_from_file`](Self::ensure_object_from_file) but
1629    /// errors if neither reflink nor hardlink succeeds, instead of falling back
1630    /// to a regular copy.
1631    ///
1632    /// Intended for bootc's unified storage path where the composefs repo and
1633    /// containers-storage are always on the same filesystem, so zero-copy
1634    /// should always be possible.
1635    pub fn ensure_object_from_file_zerocopy(
1636        &self,
1637        src: &std::fs::File,
1638        size: u64,
1639        ctx: &mut ImportContext,
1640    ) -> Result<(ObjectID, ObjectStoreMethod)> {
1641        self.ensure_object_from_file_inner(src, size, false, ctx)
1642    }
1643
1644    /// Inner implementation for [`ensure_object_from_file`](Self::ensure_object_from_file) and
1645    /// [`ensure_object_from_file_zerocopy`](Self::ensure_object_from_file_zerocopy).
1646    ///
1647    /// When `allow_copy` is false, the copy fallback returns an error instead.
1648    fn ensure_object_from_file_inner(
1649        &self,
1650        src: &std::fs::File,
1651        size: u64,
1652        allow_copy: bool,
1653        ctx: &mut ImportContext,
1654    ) -> Result<(ObjectID, ObjectStoreMethod)> {
1655        use rustix::fs::{fstat, ioctl_ficlone};
1656
1657        let writable = self.ensure_writable_token()?;
1658
1659        // Determine the source and destination device IDs so we can look up
1660        // whether this particular filesystem pair supports reflinks.
1661        let src_dev = fstat(src)?.st_dev;
1662        let dst_dev = fstat(self.objects_dir()?)?.st_dev;
1663
1664        // Try reflink first, unless a previous call on this device pair
1665        // already discovered that FICLONE is unsupported.
1666        if !ctx.is_reflink_unsupported(src_dev, dst_dev) {
1667            let tmpfile_fd = self.create_object_tmpfile_impl(&writable)?;
1668            let tmpfile = File::from(tmpfile_fd);
1669
1670            match ioctl_ficlone(&tmpfile, src) {
1671                Ok(()) => {
1672                    // Reflink succeeded — verify size matches
1673                    let stat = fstat(&tmpfile)?;
1674                    anyhow::ensure!(
1675                        stat.st_size as u64 == size,
1676                        "Reflink size mismatch: expected {}, got {}",
1677                        size,
1678                        stat.st_size
1679                    );
1680
1681                    let (object_id, method) = self.finalize_object_tmpfile(tmpfile, size)?;
1682                    let method = match method {
1683                        ObjectStoreMethod::Copied => ObjectStoreMethod::Reflinked,
1684                        other => other,
1685                    };
1686                    return Ok((object_id, method));
1687                }
1688                Err(Errno::OPNOTSUPP | Errno::XDEV) => {
1689                    // Record for this device pair so subsequent calls skip.
1690                    ctx.mark_reflink_unsupported(src_dev, dst_dev);
1691                    drop(tmpfile);
1692                }
1693                Err(e) => {
1694                    return Err(e).context("Reflinking source file to objects directory")?;
1695                }
1696            }
1697        }
1698
1699        // Try hardlink: enable verity on the source in-place, then link it
1700        // directly into objects/. This avoids all data copying.
1701        match self.try_hardlink_object(src, size) {
1702            Ok(result) => return Ok(result),
1703            Err(_) if allow_copy => {
1704                // Hardlink failed, fall through to copy.
1705                // Common causes: cross-mount (overlay bind mount), EPERM,
1706                // or verity enablement failure.
1707            }
1708            Err(e) => {
1709                return Err(e).context(
1710                    "reflink and hardlink both failed; copy fallback is disabled (zerocopy mode)",
1711                );
1712            }
1713        }
1714
1715        // Final fallback: copy data into a new tmpfile.
1716        let tmpfile_fd = self.create_object_tmpfile_impl(&writable)?;
1717        let mut tmpfile = File::from(tmpfile_fd);
1718        {
1719            use std::io::{Seek, SeekFrom};
1720            let mut src_clone = src.try_clone()?;
1721            src_clone.seek(SeekFrom::Start(0))?;
1722            std::io::copy(&mut src_clone, &mut tmpfile)?;
1723        }
1724
1725        let (object_id, method) = self.finalize_object_tmpfile(tmpfile, size)?;
1726        Ok((object_id, method))
1727    }
1728
1729    /// Try to hardlink a source file directly into the objects directory.
1730    ///
1731    /// Enables fs-verity on the source file in-place, measures the digest to
1732    /// determine the object ID, then hardlinks the source into `objects/<hash>`.
1733    ///
1734    /// Returns an error if verity cannot be enabled, the digest cannot be
1735    /// measured, or the hardlink fails (e.g. cross-device).
1736    fn try_hardlink_object(
1737        &self,
1738        src: &std::fs::File,
1739        size: u64,
1740    ) -> Result<(ObjectID, ObjectStoreMethod)> {
1741        use crate::fsverity::enable_verity_with_retry;
1742        use rustix::thread::{CapabilitySet, capabilities};
1743
1744        // AT_EMPTY_PATH linkat requires CAP_DAC_READ_SEARCH.  Check upfront
1745        // so callers get a clear error instead of a confusing ENOENT.
1746        let has_cap = capabilities(None)
1747            .map(|caps| caps.effective.contains(CapabilitySet::DAC_READ_SEARCH))
1748            .unwrap_or(false);
1749        if !has_cap {
1750            anyhow::bail!(
1751                "hardlinking objects requires CAP_DAC_READ_SEARCH \
1752                 (run as root or use the copy fallback)"
1753            );
1754        }
1755
1756        let objects_dir = self.objects_dir()?;
1757
1758        // Enable fs-verity on the source file in-place.
1759        // This is safe because the caller (bootc/containers-storage) owns the
1760        // source files and they are immutable image data.
1761        // AlreadyEnabled is fine — the file was already verity-protected.
1762        let verity_enabled = match enable_verity_with_retry::<ObjectID>(src) {
1763            Ok(()) => true,
1764            Err(EnableVerityError::AlreadyEnabled) => true,
1765            Err(EnableVerityError::FilesystemNotSupported) if self.insecure => false,
1766            Err(e) => {
1767                return Err(e).context("enabling verity on source file for hardlink")?;
1768            }
1769        };
1770
1771        // Get the object ID from the verity digest (kernel-measured or userspace-computed)
1772        let id: ObjectID = if verity_enabled {
1773            measure_verity(src).context("measuring verity digest on source file")?
1774        } else {
1775            // Insecure mode on a filesystem without verity: compute digest in userspace
1776            let mut reader = std::io::BufReader::new(
1777                src.try_clone()
1778                    .context("cloning fd for digest computation")?,
1779            );
1780            Self::compute_verity_digest(&mut reader)
1781                .context("computing verity digest in insecure mode")?
1782        };
1783
1784        // Check if object already exists (dedup)
1785        let path = id.to_object_pathname();
1786        match statat(objects_dir, &path, AtFlags::empty()) {
1787            Ok(stat) if stat.st_size as u64 == size => {
1788                return Ok((id, ObjectStoreMethod::AlreadyPresent));
1789            }
1790            _ => {}
1791        }
1792
1793        // Ensure parent directory exists (e.g. objects/4e/)
1794        let parent_dir = id.to_object_dir();
1795        ensure_dir_at(objects_dir, &parent_dir, Mode::from_raw_mode(0o755))
1796            .context("creating object parent directory")?;
1797
1798        // Hardlink the source file directly into objects/<hash>.
1799        // Use AT_EMPTY_PATH to link by fd, which avoids the kernel's
1800        // may_linkat() restriction that rejects AT_SYMLINK_FOLLOW on
1801        // /proc/self/fd/<N> magic symlinks for non-root mounts.
1802        // AT_EMPTY_PATH requires CAP_DAC_READ_SEARCH, which is available
1803        // when running as root (the expected case for containers-storage).
1804        match linkat(src, "", objects_dir, &path, AtFlags::EMPTY_PATH) {
1805            Ok(()) => Ok((id, ObjectStoreMethod::Hardlinked)),
1806            Err(Errno::EXIST) => Ok((id, ObjectStoreMethod::AlreadyPresent)),
1807            Err(e) => Err(e).context("hardlinking source file into objects directory")?,
1808        }
1809    }
1810
1811    /// Finalize a tmpfile as an object.
1812    ///
1813    /// This method should be called from a blocking context (e.g., `spawn_blocking`)
1814    /// as it performs synchronous I/O operations.
1815    ///
1816    /// This method:
1817    /// 1. Re-opens the file as read-only
1818    /// 2. Enables fs-verity on the file (kernel computes digest)
1819    /// 3. Reads the digest from the kernel
1820    /// 4. Checks if object already exists (deduplication)
1821    /// 5. Links the file into the objects directory
1822    ///
1823    /// By letting the kernel compute the digest during verity enable, we avoid
1824    /// reading the file an extra time in userspace.
1825    #[context("Finalizing object tempfile")]
1826    pub fn finalize_object_tmpfile(
1827        &self,
1828        file: File,
1829        size: u64,
1830    ) -> Result<(ObjectID, ObjectStoreMethod)> {
1831        let writable = self.ensure_writable_token()?;
1832        self.finalize_object_tmpfile_impl(file, size, &writable)
1833    }
1834
1835    #[context("Finalizing object tempfile")]
1836    pub(crate) fn finalize_object_tmpfile_impl(
1837        &self,
1838        file: File,
1839        size: u64,
1840        _writable: &WritableRepo,
1841    ) -> Result<(ObjectID, ObjectStoreMethod)> {
1842        let ro_fd =
1843            reopen_tmpfile_ro(file).context("Re-opening tmpfile as read-only for verity")?;
1844
1845        // Get objects_dir early since we may need it for verity copy
1846        let objects_dir = self
1847            .objects_dir()
1848            .context("Getting objects directory for finalization")?;
1849
1850        // Enable verity - the kernel reads the file and computes the digest.
1851        // Use enable_verity_maybe_copy to handle the case where forked processes
1852        // have inherited writable fds to this file.
1853        let (ro_fd, verity_enabled) =
1854            match enable_verity_maybe_copy::<ObjectID>(objects_dir, ro_fd.as_fd()) {
1855                Ok(None) => (ro_fd, true),
1856                Ok(Some(new_fd)) => (new_fd, true),
1857                Err(EnableVerityError::FilesystemNotSupported) if self.insecure => (ro_fd, false),
1858                Err(EnableVerityError::AlreadyEnabled) => (ro_fd, true),
1859                Err(other) => return Err(other).context("Enabling verity on tmpfile")?,
1860            };
1861
1862        // Get the digest - either from kernel (fast) or compute in userspace (fallback)
1863        let id: ObjectID = if verity_enabled {
1864            measure_verity(&ro_fd).context("Measuring verity digest")?
1865        } else {
1866            // Insecure mode: compute digest in userspace from ro_fd
1867            let mut reader = std::io::BufReader::new(File::from(
1868                ro_fd
1869                    .try_clone()
1870                    .context("Cloning fd for digest computation")?,
1871            ));
1872            Self::compute_verity_digest(&mut reader)
1873                .context("Computing verity digest in insecure mode")?
1874        };
1875
1876        self.link_tmpfile_as_object(objects_dir, &ro_fd, &id, size)
1877    }
1878
1879    /// Link a read-only tmpfile into the objects directory with dedup check.
1880    ///
1881    /// If an object with the same digest and size already exists, the
1882    /// tmpfile is discarded and `AlreadyPresent` is returned.
1883    fn link_tmpfile_as_object(
1884        &self,
1885        objects_dir: &OwnedFd,
1886        ro_fd: &impl AsFd,
1887        id: &ObjectID,
1888        size: u64,
1889    ) -> Result<(ObjectID, ObjectStoreMethod)> {
1890        let path = id.to_object_pathname();
1891
1892        match statat(objects_dir, &path, AtFlags::empty()) {
1893            Ok(stat) if stat.st_size as u64 == size => {
1894                return Ok((id.clone(), ObjectStoreMethod::AlreadyPresent));
1895            }
1896            _ => {}
1897        }
1898
1899        let parent_dir = id.to_object_dir();
1900        ensure_dir_at(objects_dir, &parent_dir, Mode::from_raw_mode(0o755))
1901            .context("creating object parent directory")?;
1902
1903        match linkat(
1904            CWD,
1905            proc_self_fd(ro_fd),
1906            objects_dir,
1907            &path,
1908            AtFlags::SYMLINK_FOLLOW,
1909        ) {
1910            Ok(()) => Ok((id.clone(), ObjectStoreMethod::Copied)),
1911            Err(Errno::EXIST) => Ok((id.clone(), ObjectStoreMethod::AlreadyPresent)),
1912            Err(e) => Err(e).context("Linking tmpfile into objects directory")?,
1913        }
1914    }
1915
1916    /// Compute fs-verity digest in userspace by reading from a buffered source.
1917    /// Used as fallback when kernel verity is not available (insecure mode).
1918    #[context("Computing verity digest in userspace")]
1919    fn compute_verity_digest(reader: &mut impl std::io::BufRead) -> Result<ObjectID> {
1920        let mut hasher = FsVerityHasher::<ObjectID>::new();
1921
1922        loop {
1923            let buf = reader
1924                .fill_buf()
1925                .context("Reading buffer for verity computation")?;
1926            if buf.is_empty() {
1927                break;
1928            }
1929            // add_block expects at most one block at a time
1930            let chunk_size = buf.len().min(FsVerityHasher::<ObjectID>::BLOCK_SIZE);
1931            hasher.add_block(&buf[..chunk_size]);
1932            reader.consume(chunk_size);
1933        }
1934
1935        Ok(hasher.digest())
1936    }
1937
    /// Store an object with a pre-computed fs-verity ID.
    ///
    /// This is an internal helper that stores data assuming the caller has already
    /// computed the correct fs-verity digest. The digest is verified after storage.
    ///
    /// Flow:
    /// - If `objects/<id>` already exists, its verity digest is compared with `id`
    ///   and the function returns without writing.  In insecure mode a missing
    ///   digest triggers an attempt to enable verity on the existing file, and a
    ///   filesystem without verity support is tolerated.
    /// - Otherwise `data` is written to an `O_TMPFILE` in the object's
    ///   subdirectory.  The file is reopened read-only, verity is enabled, the
    ///   digest is double-checked against `id`, and the file is linked into place.
    ///
    /// No fsync is performed here; durability is coordinated by the caller.
    ///
    /// # Errors
    /// Fails if:
    /// - the objects directory cannot be obtained;
    /// - an existing object's digest does not match `id` (outside the tolerated
    ///   insecure cases);
    /// - the tmpfile cannot be created or written;
    /// - verity cannot be enabled, or the resulting digest does not match;
    /// - the final link fails for any reason other than `EEXIST`.
    #[context("Storing object with ID {id:?}")]
    fn store_object_with_id(
        &self,
        data: &[u8],
        id: &ObjectID,
        _writable: &WritableRepo,
    ) -> Result<()> {
        let dirfd = self
            .objects_dir()
            .context("Getting objects directory for storage")?;
        let path = id.to_object_pathname();

        // the usual case is that the file will already exist
        match openat(
            dirfd,
            &path,
            OFlags::RDONLY | OFlags::CLOEXEC,
            Mode::empty(),
        ) {
            Ok(fd) => {
                // measure the existing file to ensure that it's correct
                // TODO: try to replace file if it's broken?
                match ensure_verity_equal(&fd, id) {
                    Ok(()) => {}
                    // Insecure mode, existing object without verity: try to
                    // enable it now and re-check the digest.
                    // NOTE(review): in the `Some(fd)` case only the returned copy
                    // is measured; it is not linked back over `objects/<id>`.
                    // Confirm this is intended.
                    Err(CompareVerityError::Measure(MeasureVerityError::VerityMissing))
                        if self.insecure =>
                    {
                        match enable_verity_maybe_copy::<ObjectID>(dirfd, fd.as_fd()) {
                            Ok(Some(fd)) => ensure_verity_equal(&fd, id)
                                .context("Verifying verity after enabling (copied)")?,
                            Ok(None) => ensure_verity_equal(&fd, id)
                                .context("Verifying verity after enabling (original)")?,
                            Err(other) => {
                                Err(other).context("Enabling verity on existing object")?
                            }
                        }
                    }
                    // Insecure mode on a filesystem without verity: accept the
                    // existing object unverified.
                    Err(CompareVerityError::Measure(
                        MeasureVerityError::FilesystemNotSupported,
                    )) if self.insecure => {}
                    Err(other) => Err(other).context("Verifying existing object integrity")?,
                }
                return Ok(());
            }
            Err(Errno::NOENT) => {
                // in this case we'll create the file
            }
            Err(other) => {
                return Err(other).context("Checking for existing object in repository")?;
            }
        }

        // Unnamed tmpfile in the object's subdirectory; it only becomes
        // visible once linked under its final name below.
        let fd = ensure_dir_and_openat(dirfd, &id.to_object_dir(), OFlags::RDWR | OFlags::TMPFILE)
            .with_context(|| "Creating tempfile in object subdirectory")?;
        let mut file = File::from(fd);
        file.write_all(data).context("Writing data to tmpfile")?;
        // NB: We should do fdatasync() or fsync() here, but doing this for each file forces the
        // creation of a massive number of journal commits and is a performance disaster.  We need
        // to coordinate this at a higher level.  See .write_stream().
        let ro_fd = reopen_tmpfile_ro(file).context("Re-opening file as read-only for verity")?;

        // Enable verity (possibly on a copy), then confirm the kernel-measured
        // digest equals the caller-supplied `id`.  In insecure mode a missing
        // or unsupported verity is tolerated.
        let ro_fd = match enable_verity_maybe_copy::<ObjectID>(dirfd, ro_fd.as_fd()) {
            Ok(maybe_fd) => {
                let ro_fd = maybe_fd.unwrap_or(ro_fd);
                match ensure_verity_equal(&ro_fd, id) {
                    Ok(()) => ro_fd,
                    Err(CompareVerityError::Measure(
                        MeasureVerityError::VerityMissing
                        | MeasureVerityError::FilesystemNotSupported,
                    )) if self.insecure => ro_fd,
                    Err(other) => Err(other).context("Double-checking verity digest")?,
                }
            }
            Err(EnableVerityError::FilesystemNotSupported) if self.insecure => ro_fd,
            Err(other) => Err(other).context("Enabling verity digest")?,
        };

        // Publish the tmpfile under `objects/<id>` via its /proc/self/fd path.
        match linkat(
            CWD,
            proc_self_fd(&ro_fd),
            dirfd,
            path,
            AtFlags::SYMLINK_FOLLOW,
        ) {
            Ok(()) => {}
            // Presumably a concurrent writer linked the same object first.
            Err(Errno::EXIST) => {
                // TODO: strictly, we should measure the newly-appeared file
            }
            Err(other) => {
                return Err(other).context("Linking created object file");
            }
        }

        Ok(())
    }
2037
2038    /// Given a blob of data, store it in the repository.
2039    ///
2040    /// For performance reasons, this function does *not* call fsync() or similar.  After you're
2041    /// done with everything, call `Repository::sync()`.
2042    #[context("Ensuring object exists in repository")]
2043    pub fn ensure_object(&self, data: &[u8]) -> Result<ObjectID> {
2044        let writable = self.ensure_writable_token()?;
2045        self.ensure_object_impl(data, &writable)
2046    }
2047
2048    /// Like [`ensure_object`] but requires a [`WritableRepo`] token
2049    /// instead of performing the check itself.
2050    ///
2051    /// This exists so that [`SplitStreamWriter`] (which carries a token)
2052    /// can store objects without redundant `faccessat` calls.
2053    #[context("Ensuring object exists in repository")]
2054    pub(crate) fn ensure_object_impl(
2055        &self,
2056        data: &[u8],
2057        writable: &WritableRepo,
2058    ) -> Result<ObjectID> {
2059        let id: ObjectID = compute_verity(data);
2060        self.store_object_with_id(data, &id, writable)?;
2061        Ok(id)
2062    }
2063
2064    #[context("Opening file '{filename}' with verity verification")]
2065    fn open_with_verity(&self, filename: &str, expected_verity: &ObjectID) -> Result<OwnedFd> {
2066        let fd = self
2067            .openat(filename, OFlags::RDONLY)
2068            .with_context(|| format!("Opening file '{filename}' in repository"))?;
2069        match ensure_verity_equal(&fd, expected_verity) {
2070            Ok(()) => {}
2071            Err(CompareVerityError::Measure(
2072                MeasureVerityError::VerityMissing | MeasureVerityError::FilesystemNotSupported,
2073            )) if self.insecure => {}
2074            Err(other) => Err(other).context("Verifying file verity digest")?,
2075        }
2076        Ok(fd)
2077    }
2078
2079    /// Returns whether the repository is in insecure mode.
2080    ///
2081    /// This is auto-detected from whether `meta.json` has fs-verity
2082    /// enabled, but can be overridden with [`set_insecure`].
2083    pub fn is_insecure(&self) -> bool {
2084        self.insecure
2085    }
2086
2087    /// Override the EROFS format version for this repository session.
2088    ///
2089    /// Changes the in-memory default used by [`FileSystem::commit_image`]
2090    /// and [`FileSystem::compute_image_id`] for the lifetime of this
2091    /// Override the EROFS format version for this `Repository` instance only.
2092    ///
2093    /// Does **not** rewrite `meta.json`.  Intended for CLI tools that accept a
2094    /// per-invocation `--erofs-version` flag to override the repository's stored default.
2095    pub fn set_erofs_version(&mut self, version: FormatVersion) -> &mut Self {
2096        self.erofs_version_override = Some(version);
2097        self
2098    }
2099
2100    /// Mark this repository as insecure, disabling verification of
2101    /// fs-verity digests.  This allows operation on filesystems
2102    /// without verity support.
2103    pub fn set_insecure(&mut self) -> &mut Self {
2104        self.insecure = true;
2105        self
2106    }
2107
2108    /// Require that this repository has fs-verity enabled.
2109    ///
2110    /// Returns an error if the repository was not initialized with
2111    /// verity on `meta.json`, since there is no mechanism to
2112    /// retroactively enable verity on existing objects.
2113    pub fn require_verity(&self) -> Result<()> {
2114        if self.insecure {
2115            bail!(
2116                "repository was not initialized with fs-verity \
2117                 (hint: re-create with `cfsctl init` on a \
2118                 verity-capable filesystem)"
2119            );
2120        }
2121        Ok(())
2122    }
2123
2124    /// Fast pre-flight check that the repository is writable.
2125    ///
2126    /// Uses `faccessat(W_OK)` to catch read-only mounts and permission
2127    /// issues before starting expensive network or I/O work.  Callers
2128    /// that want to fail early (e.g. before downloading an image) should
2129    /// call this; individual write methods already check internally.
2130    pub fn ensure_writable(&self) -> Result<()> {
2131        self.ensure_writable_token()?;
2132        Ok(())
2133    }
2134
2135    /// Like [`ensure_writable`] but returns a proof token for internal use.
2136    pub(crate) fn ensure_writable_token(&self) -> Result<WritableRepo> {
2137        // fstatvfs catches read-only mounts (ST_RDONLY).  faccessat(W_OK)
2138        // alone is insufficient because it only checks DAC permission bits
2139        // and root bypasses those, so a root process on a read-only
2140        // bind-mounted repo would pass the faccessat check.  Conversely,
2141        // fstatvfs alone misses writable filesystems where the caller lacks
2142        // write permission (e.g. a repo owned by another user), so we follow
2143        // up with faccessat to catch that case.
2144        let st = fstatvfs(&self.repository).context("Repository is not writable")?;
2145        if st.f_flag.contains(StatVfsMountFlags::RDONLY) {
2146            anyhow::bail!("Repository is not writable: read-only file system");
2147        }
2148        accessat(&self.repository, ".", Access::WRITE_OK, AtFlags::empty())
2149            .context("Repository is not writable")?;
2150        Ok(WritableRepo)
2151    }
2152
2153    /// Creates a SplitStreamWriter for writing a split stream.
2154    /// You should write the data to the returned object and then pass it to .store_stream() to
2155    /// store the result.
2156    ///
2157    /// The writable check is performed here so that callers cannot obtain
2158    /// a writer without first verifying the repository is writable.
2159    /// The [`WritableRepo`] token is carried by the writer so that
2160    /// subsequent object writes skip redundant checks.
2161    pub fn create_stream(
2162        self: &Arc<Self>,
2163        content_type: u64,
2164    ) -> Result<SplitStreamWriter<ObjectID>> {
2165        let writable = self.ensure_writable_token()?;
2166        Ok(SplitStreamWriter::new(self, content_type, writable))
2167    }
2168
2169    fn format_object_path(id: &ObjectID) -> String {
2170        format!("objects/{}", id.to_object_pathname())
2171    }
2172
2173    fn format_stream_path(content_identifier: &str) -> String {
2174        format!("streams/{content_identifier}")
2175    }
2176
2177    /// Check if the provided splitstream is present in the repository;
2178    /// if so, return its fsverity digest.
2179    #[context("Checking if stream '{content_identifier}' exists")]
2180    pub fn has_stream(&self, content_identifier: &str) -> Result<Option<ObjectID>> {
2181        let stream_path = Self::format_stream_path(content_identifier);
2182
2183        match readlinkat(&self.repository, &stream_path, []) {
2184            Ok(target) => {
2185                let bytes = target.as_bytes();
2186                ensure!(
2187                    bytes.starts_with(b"../"),
2188                    "stream symlink has incorrect prefix"
2189                );
2190                Ok(Some(
2191                    ObjectID::from_object_pathname(bytes)
2192                        .context("Parsing object ID from stream symlink target")?,
2193                ))
2194            }
2195            Err(Errno::NOENT) => Ok(None),
2196            Err(err) => Err(err).context("Reading stream symlink")?,
2197        }
2198    }
2199
2200    /// Write the given splitstream to the repository with the provided content identifier and
2201    /// optional reference name.
2202    ///
2203    /// This call contains an internal barrier that guarantees that, in event of a crash, either:
2204    ///  - the named stream (by `content_identifier`) will not be available; or
2205    ///  - the stream and all of its linked data will be available
2206    ///
2207    /// In other words: it will not be possible to boot a system which contained a stream named
2208    /// `content_identifier` but is missing linked streams or objects from that stream.
2209    #[context("Writing stream '{content_identifier}' to repository")]
2210    pub fn write_stream(
2211        &self,
2212        writer: SplitStreamWriter<ObjectID>,
2213        content_identifier: &str,
2214        reference: Option<&str>,
2215    ) -> Result<ObjectID> {
2216        let writable = *writer.writable();
2217        let object_id = writer.done().context("Finalizing split stream writer")?;
2218
2219        // Right now we have:
2220        //   - all of the linked external objects and streams; and
2221        //   - the binary data of this splitstream itself
2222        //
2223        // in the filesystem but but not yet guaranteed to be synced to disk.  This is OK because
2224        // nobody knows that the binary data of the splitstream is a splitstream yet: it could just
2225        // as well be a random data file contained in an OS image or something.
2226        //
2227        // We need to make sure that all of that makes it to the disk before the splitstream is
2228        // visible as a splitstream.
2229        self.sync()?;
2230
2231        let stream_path = Self::format_stream_path(content_identifier);
2232        let object_path = Self::format_object_path(&object_id);
2233        self.symlink_impl(&stream_path, &object_path, &writable)?;
2234
2235        if let Some(name) = reference {
2236            let reference_path = format!("streams/refs/{name}");
2237            self.symlink_impl(&reference_path, &stream_path, &writable)?;
2238        }
2239
2240        Ok(object_id)
2241    }
2242
2243    /// Register an already-stored object as a named stream.
2244    ///
2245    /// This is useful when using `SplitStreamBuilder` which stores the splitstream
2246    /// directly via `finish()`. After calling `finish()`, call this method to
2247    /// sync all data to disk and create the stream symlink.
2248    ///
2249    /// This method ensures atomicity: the stream symlink is only created after
2250    /// all objects have been synced to disk.
2251    #[context("Registering stream '{content_identifier}' with object ID {object_id:?}")]
2252    pub async fn register_stream(
2253        self: &Arc<Self>,
2254        object_id: &ObjectID,
2255        content_identifier: &str,
2256        reference: Option<&str>,
2257    ) -> Result<()> {
2258        let writable = self.ensure_writable_token()?;
2259        self.sync_async().await?;
2260
2261        let stream_path = Self::format_stream_path(content_identifier);
2262        let object_path = Self::format_object_path(object_id);
2263        self.symlink_impl(&stream_path, &object_path, &writable)?;
2264
2265        if let Some(name) = reference {
2266            let reference_path = format!("streams/refs/{name}");
2267            self.symlink_impl(&reference_path, &stream_path, &writable)?;
2268        }
2269
2270        Ok(())
2271    }
2272
2273    /// Async version of `write_stream` for use with parallel object storage.
2274    ///
2275    /// This method awaits any pending parallel object storage tasks before
2276    /// finalizing the stream. Use this when you've called `write_external_parallel()`
2277    /// on the writer.
2278    #[context("Writing stream '{content_identifier}' to repository (async)")]
2279    pub async fn write_stream_async(
2280        self: &Arc<Self>,
2281        writer: SplitStreamWriter<ObjectID>,
2282        content_identifier: &str,
2283        reference: Option<&str>,
2284    ) -> Result<ObjectID> {
2285        let writable = *writer.writable();
2286        let object_id = writer
2287            .done_async()
2288            .await
2289            .context("Finalizing split stream writer (async)")?;
2290
2291        self.sync_async().await?;
2292
2293        let stream_path = Self::format_stream_path(content_identifier);
2294        let object_path = Self::format_object_path(&object_id);
2295        self.symlink_impl(&stream_path, &object_path, &writable)?;
2296
2297        if let Some(name) = reference {
2298            let reference_path = format!("streams/refs/{name}");
2299            self.symlink_impl(&reference_path, &stream_path, &writable)?;
2300        }
2301
2302        Ok(object_id)
2303    }
2304
2305    /// Check if a splitstream with a given name exists in the "refs" in the repository.
2306    #[context("Checking if named stream '{name}' exists")]
2307    pub fn has_named_stream(&self, name: &str) -> Result<bool> {
2308        let stream_path = format!("streams/refs/{name}");
2309
2310        Ok(statat(&self.repository, &stream_path, AtFlags::empty())
2311            .filter_errno(Errno::NOENT)
2312            .with_context(|| format!("Looking for stream '{name}' in repository"))?
2313            .map(|s| FileType::from_raw_mode(s.st_mode).is_symlink())
2314            .unwrap_or(false))
2315    }
2316
2317    /// Assign a named reference to a stream, making it a GC root.
2318    ///
2319    /// Creates a symlink at `streams/refs/{name}` pointing to the stream identified
2320    /// by `content_identifier`. The stream must already exist in the repository.
2321    ///
2322    /// Named references serve two purposes:
2323    /// 1. They provide human-readable names for streams
2324    /// 2. They act as GC roots - streams reachable from refs are not garbage collected
2325    ///
2326    /// The `name` can include path separators to organize refs hierarchically
2327    /// (e.g., `myapp/layer1`), and intermediate directories are created automatically.
2328    #[context("Naming stream '{content_identifier}' as '{name}'")]
2329    pub fn name_stream(&self, content_identifier: &str, name: &str) -> Result<()> {
2330        let writable = self.ensure_writable_token()?;
2331        let stream_path = Self::format_stream_path(content_identifier);
2332        let reference_path = format!("streams/refs/{name}");
2333        self.symlink_impl(&reference_path, &stream_path, &writable)?;
2334        Ok(())
2335    }
2336
2337    /// Ensures that the stream with a given content identifier digest exists in the repository.
2338    ///
2339    /// This tries to find the stream by the content identifier.  If the stream is already in the
2340    /// repository, the object ID (fs-verity digest) is read from the symlink.  If the stream is
2341    /// not already in the repository, a `SplitStreamWriter` is created and passed to `callback`.
2342    /// On return, the object ID of the stream will be calculated and it will be written to disk
2343    /// (if it wasn't already created by someone else in the meantime).
2344    ///
2345    /// In both cases, if `reference` is provided, it is used to provide a fixed name for the
2346    /// object.  Any object that doesn't have a fixed reference to it is subject to garbage
2347    /// collection.  It is an error if this reference already exists.
2348    ///
2349    /// On success, the object ID of the new object is returned.  It is expected that this object
2350    /// ID will be used when referring to the stream from other linked streams.
2351    #[context("Ensuring stream '{content_identifier}' exists")]
2352    pub fn ensure_stream<T: Default>(
2353        self: &Arc<Self>,
2354        content_identifier: &str,
2355        content_type: u64,
2356        callback: impl FnOnce(&mut SplitStreamWriter<ObjectID>) -> Result<T>,
2357        reference: Option<&str>,
2358    ) -> Result<(ObjectID, T)> {
2359        let writable = self.ensure_writable_token()?;
2360        let stream_path = Self::format_stream_path(content_identifier);
2361
2362        let (object_id, extra) = match self.has_stream(content_identifier)? {
2363            Some(id) => (id, T::default()),
2364            None => {
2365                let mut writer = self.create_stream(content_type)?;
2366                let extra = callback(&mut writer).context("Writing stream content via callback")?;
2367                let id = self.write_stream(writer, content_identifier, reference)?;
2368                (id, extra)
2369            }
2370        };
2371
2372        if let Some(name) = reference {
2373            let reference_path = format!("streams/refs/{name}");
2374            self.symlink_impl(&reference_path, &stream_path, &writable)?;
2375        }
2376
2377        Ok((object_id, extra))
2378    }
2379
2380    /// Open a splitstream with the given name.
2381    #[context("Opening stream '{content_identifier}'")]
2382    pub fn open_stream(
2383        &self,
2384        content_identifier: &str,
2385        verity: Option<&ObjectID>,
2386        expected_content_type: Option<u64>,
2387    ) -> Result<SplitStreamReader<ObjectID>> {
2388        let file = File::from(if let Some(verity_hash) = verity {
2389            self.open_object(verity_hash)
2390                .with_context(|| format!("Opening object '{verity_hash:?}'"))?
2391        } else {
2392            let filename = Self::format_stream_path(content_identifier);
2393            self.openat(&filename, OFlags::RDONLY)
2394                .with_context(|| format!("Opening ref '{filename}'"))?
2395        });
2396
2397        SplitStreamReader::new(file, expected_content_type)
2398    }
2399
2400    /// Given an object identifier (a digest), return a read-only file descriptor
2401    /// for its contents. The fsverity digest is verified (if the repository is not in `insecure` mode).
2402    #[context("Opening object {id:?}")]
2403    pub fn open_object(&self, id: &ObjectID) -> Result<OwnedFd> {
2404        self.open_with_verity(&Self::format_object_path(id), id)
2405    }
2406
2407    /// Read the contents of an object into a Vec
2408    #[context("Reading object {id:?} into memory")]
2409    pub fn read_object(&self, id: &ObjectID) -> Result<Vec<u8>> {
2410        let mut data = vec![];
2411        File::from(self.open_object(id)?)
2412            .read_to_end(&mut data)
2413            .context("Reading object data")?;
2414        Ok(data)
2415    }
2416
2417    /// Merges a splitstream into a single continuous stream.
2418    ///
2419    /// Opens the named splitstream, resolves all object references, and writes
2420    /// the complete merged content to the provided writer. Optionally verifies
2421    /// the splitstream's fsverity digest matches the expected value.
2422    #[context("Merging splitstream '{content_identifier}'")]
2423    pub fn merge_splitstream(
2424        &self,
2425        content_identifier: &str,
2426        verity: Option<&ObjectID>,
2427        expected_content_type: Option<u64>,
2428        output: &mut impl Write,
2429    ) -> Result<()> {
2430        let mut split_stream =
2431            self.open_stream(content_identifier, verity, expected_content_type)?;
2432        split_stream.cat(self, output)
2433    }
2434
2435    /// Write `data into the repository as an image with the given `name`.
2436    ///
2437    /// The fsverity digest is returned.
2438    ///
2439    /// # Integrity
2440    ///
2441    /// This function is not safe for untrusted users.
2442    #[context("Writing image to repository")]
2443    pub fn write_image(&self, name: Option<&str>, data: &[u8]) -> Result<ObjectID> {
2444        let writable = self.ensure_writable_token()?;
2445        let object_id = self.ensure_object_impl(data, &writable)?;
2446
2447        let object_path = Self::format_object_path(&object_id);
2448        let image_path = format!("images/{}", object_id.to_hex());
2449
2450        self.symlink_impl(&image_path, &object_path, &writable)?;
2451
2452        if let Some(reference) = name {
2453            let ref_path = format!("images/refs/{reference}");
2454            self.symlink_impl(&ref_path, &image_path, &writable)?;
2455        }
2456
2457        Ok(object_id)
2458    }
2459
2460    /// Import the data from the provided read into the repository as an image.
2461    ///
2462    /// The fsverity digest is returned.
2463    ///
2464    /// # Integrity
2465    ///
2466    /// This function is not safe for untrusted users.
2467    #[context("Importing image '{name}' from reader")]
2468    pub fn import_image<R: Read>(&self, name: &str, image: &mut R) -> Result<ObjectID> {
2469        let mut data = vec![];
2470        image
2471            .read_to_end(&mut data)
2472            .context("Reading image data from input")?;
2473        self.write_image(Some(name), &data)
2474    }
2475
2476    /// Returns the fd of the image and whether or not verity should be
2477    /// enabled when mounting it.
2478    #[context("Opening image '{name}'")]
2479    pub fn open_image(&self, name: &str) -> Result<(OwnedFd, bool)> {
2480        let image = match self.openat(&format!("images/{name}"), OFlags::RDONLY) {
2481            Ok(fd) => fd,
2482            Err(Errno::NOENT) => {
2483                return Err(anyhow::Error::new(ImageNotFound {
2484                    name: name.to_string(),
2485                }));
2486            }
2487            Err(e) => {
2488                return Err(e).with_context(|| format!("Opening ref 'images/{name}'"));
2489            }
2490        };
2491
2492        if name.contains("/") {
2493            return Ok((image, true));
2494        }
2495
2496        // A name with no slashes in it is taken to be a sha256 fs-verity digest
2497        match measure_verity::<ObjectID>(&image) {
2498            Ok(found)
2499                if found
2500                    == FsVerityHashValue::from_hex(name)
2501                        .context("Parsing expected verity hash from image name")? =>
2502            {
2503                Ok((image, true))
2504            }
2505            Ok(_) => bail!("fs-verity content mismatch"),
2506            Err(MeasureVerityError::VerityMissing | MeasureVerityError::FilesystemNotSupported)
2507                if self.insecure =>
2508            {
2509                Ok((image, false))
2510            }
2511            Err(other) => Err(other).context("Measuring image verity digest")?,
2512        }
2513    }
2514
2515    /// Create a detached mount of an image. This file descriptor can then
2516    /// be attached via e.g. `move_mount`.
2517    #[context("Mounting image '{name}'")]
2518    pub fn mount_with_options(&self, name: &str, options: &MountOptions) -> Result<OwnedFd> {
2519        let (image, enable_verity) = self.open_image(name)?;
2520
2521        composefs_fsmount(
2522            image,
2523            name,
2524            self.objects_dir()
2525                .context("Getting objects directory for mount")?,
2526            enable_verity,
2527            options,
2528        )
2529        .context("Creating filesystem mount")
2530    }
2531
2532    /// Create a detached read-only mount of an image.
2533    /// This file descriptor can then be attached via e.g. `move_mount`.
2534    #[context("Mounting image '{name}'")]
2535    pub fn mount(&self, name: &str) -> Result<OwnedFd> {
2536        self.mount_with_options(name, &MountOptions::default())
2537    }
2538
2539    /// Mount the image with the provided digest at the target path.
2540    #[context("Mounting image '{name}' at path")]
2541    pub fn mount_at(
2542        &self,
2543        name: &str,
2544        mountpoint: impl AsRef<Path>,
2545        options: &MountOptions,
2546    ) -> Result<()> {
2547        mount_at(
2548            self.mount_with_options(name, options)?,
2549            CWD,
2550            &canonicalize(mountpoint).context("Canonicalizing mountpoint path")?,
2551        )
2552        .context("Attaching mount at target path")
2553    }
2554
2555    /// Creates a relative symlink within the repository.
2556    ///
2557    /// Computes the correct relative path from the symlink location to the target,
2558    /// creating any necessary intermediate directories. Atomically replaces any
2559    /// existing symlink at the specified name.
2560    pub fn symlink(
2561        &self,
2562        name: impl AsRef<Path> + std::fmt::Debug,
2563        target: impl AsRef<Path> + std::fmt::Debug,
2564    ) -> anyhow::Result<()> {
2565        let writable = self.ensure_writable_token()?;
2566        self.symlink_impl(name, target, &writable)
2567    }
2568
2569    #[context("Creating symlink from {name:?} to {target:?}")]
2570    pub(crate) fn symlink_impl(
2571        &self,
2572        name: impl AsRef<Path> + std::fmt::Debug,
2573        target: impl AsRef<Path> + std::fmt::Debug,
2574        _writable: &WritableRepo,
2575    ) -> anyhow::Result<()> {
2576        let name = name.as_ref();
2577
2578        let mut symlink_components = name.parent().unwrap().components().peekable();
2579        let mut target_components = target.as_ref().components().peekable();
2580
2581        let mut symlink_ancestor = PathBuf::new();
2582
2583        // remove common leading components
2584        while symlink_components.peek() == target_components.peek() {
2585            symlink_ancestor.push(symlink_components.next().unwrap());
2586            target_components.next().unwrap();
2587        }
2588
2589        let mut relative = PathBuf::new();
2590        // prepend a "../" for each ancestor of the symlink
2591        // and create those ancestors as we do so
2592        for symlink_component in symlink_components {
2593            symlink_ancestor.push(symlink_component);
2594            self.ensure_dir(&symlink_ancestor)?;
2595            relative.push("..");
2596        }
2597
2598        // now build the relative path from the remaining components of the target
2599        for target_component in target_components {
2600            relative.push(target_component);
2601        }
2602
2603        // Atomically replace existing symlink
2604        Ok(replace_symlinkat(&relative, &self.repository, name)?)
2605    }
2606
2607    #[context("Reading symlink hash value from {name:?}")]
2608    fn read_symlink_hashvalue(dirfd: &OwnedFd, name: &CStr) -> Result<ObjectID> {
2609        let link_content = readlinkat(dirfd, name, []).context("Reading symlink target")?;
2610        ObjectID::from_object_pathname(link_content.to_bytes())
2611            .context("Parsing object ID from symlink target")
2612    }
2613
2614    #[context("Walking symlink directory")]
2615    fn walk_symlinkdir(fd: OwnedFd, entry_digests: &mut HashSet<OsString>) -> Result<()> {
2616        for item in Dir::read_from(&fd).context("Reading directory entries")? {
2617            let entry = item.context("Reading directory entry")?;
2618            // NB: the underlying filesystem must support returning filetype via direntry
2619            // that's a reasonable assumption, since it must also support fsverity...
2620            match entry.file_type() {
2621                FileType::Directory => {
2622                    let filename = entry.file_name();
2623                    if filename != c"." && filename != c".." {
2624                        let dirfd = openat(
2625                            &fd,
2626                            filename,
2627                            OFlags::RDONLY | OFlags::CLOEXEC,
2628                            Mode::empty(),
2629                        )
2630                        .context("Opening subdirectory for walking")?;
2631                        Self::walk_symlinkdir(dirfd, entry_digests)?;
2632                    }
2633                }
2634                FileType::Symlink => {
2635                    let link_content = readlinkat(&fd, entry.file_name(), [])
2636                        .context("Reading symlink content")?;
2637                    let linked_path = Path::new(OsStr::from_bytes(link_content.as_bytes()));
2638                    if let Some(entry_name) = linked_path.file_name() {
2639                        entry_digests.insert(entry_name.to_os_string());
2640                    } else {
2641                        // Does not have a proper file base name (i.e. "..")
2642                        // TODO: this case needs to be checked in fsck implementation
2643                        continue;
2644                    }
2645                }
2646                _ => {
2647                    bail!("Unexpected file type encountered");
2648                }
2649            }
2650        }
2651
2652        Ok(())
2653    }
2654
2655    /// Open the provided path in the repository.
2656    fn openat(&self, name: &str, flags: OFlags) -> ErrnoResult<OwnedFd> {
2657        // Unconditionally add CLOEXEC as we always want it.
2658        openat(
2659            &self.repository,
2660            name,
2661            flags | OFlags::CLOEXEC,
2662            Mode::empty(),
2663        )
2664    }
2665
2666    // For a GC category (images / streams), return underlying entry digests and
2667    // object IDs for each entry
2668    // Under RefsOnly mode, only entries explicitly referenced in `<category>/refs`
2669    // directory structure would be walked and returned
2670    // Under AllEntries mode, all entires will be returned
2671    // Note that this function assumes all`*/refs/` links link to 1st level entries
2672    // and all 1st level entries link to object store
2673    // TODO: fsck the above noted assumption
2674    #[context("Walking GC category '{category}'")]
2675    fn gc_category(
2676        &self,
2677        category: &str,
2678        mode: GCCategoryWalkMode,
2679    ) -> Result<Vec<(ObjectID, String)>> {
2680        let Some(category_fd) = self
2681            .openat(category, OFlags::RDONLY | OFlags::DIRECTORY)
2682            .filter_errno(Errno::NOENT)
2683            .context(format!("Opening {category} dir in repository"))?
2684        else {
2685            return Ok(Vec::new());
2686        };
2687
2688        let mut entry_digests = HashSet::new();
2689        match mode {
2690            GCCategoryWalkMode::RefsOnly => {
2691                if let Some(refs) = openat(
2692                    &category_fd,
2693                    "refs",
2694                    OFlags::RDONLY | OFlags::DIRECTORY | OFlags::CLOEXEC,
2695                    Mode::empty(),
2696                )
2697                .filter_errno(Errno::NOENT)
2698                .context(format!("Opening {category}/refs dir in repository"))?
2699                {
2700                    Self::walk_symlinkdir(refs, &mut entry_digests)
2701                        .context("Walking refs symlink directory")?;
2702                }
2703            }
2704            GCCategoryWalkMode::AllEntries => {
2705                // All first-level link entries should be directly object references
2706                for item in Dir::read_from(&category_fd).context("Reading category directory")? {
2707                    let entry = item.context("Reading category directory entry")?;
2708                    let filename = entry.file_name();
2709                    if filename != c"refs" && filename != c"." && filename != c".." {
2710                        if entry.file_type() != FileType::Symlink {
2711                            bail!("category directory contains non-symlink");
2712                        }
2713                        entry_digests.insert(OsString::from(&OsStr::from_bytes(
2714                            entry.file_name().to_bytes(),
2715                        )));
2716                    }
2717                }
2718            }
2719        }
2720
2721        let objects = entry_digests
2722            .into_iter()
2723            .map(|entry_fn| {
2724                Ok((
2725                    Self::read_symlink_hashvalue(
2726                        &category_fd,
2727                        CString::new(entry_fn.as_bytes())
2728                            .context("Creating CString from filename")?
2729                            .as_c_str(),
2730                    )
2731                    .context("Reading symlink hash value")?,
2732                    entry_fn
2733                        .to_str()
2734                        .context("str conversion fails")?
2735                        .to_owned(),
2736                ))
2737            })
2738            .collect::<Result<_>>()?;
2739
2740        Ok(objects)
2741    }
2742
2743    // Remove all broken links from a directory, may operate recursively
2744    /// Remove broken symlinks from a directory.
2745    /// If `dry_run` is true, counts but does not remove. Returns the count.
2746    #[context("Cleaning up broken links")]
2747    fn cleanup_broken_links(fd: &OwnedFd, recursive: bool, dry_run: bool) -> Result<u64> {
2748        let mut count = 0;
2749        for item in Dir::read_from(fd).context("Reading directory for broken links cleanup")? {
2750            let entry = item.context("Reading directory entry for broken links cleanup")?;
2751            match entry.file_type() {
2752                FileType::Directory => {
2753                    if !recursive {
2754                        continue;
2755                    }
2756                    let filename = entry.file_name();
2757                    if filename != c"." && filename != c".." {
2758                        let dirfd = openat(
2759                            fd,
2760                            filename,
2761                            OFlags::RDONLY | OFlags::CLOEXEC,
2762                            Mode::empty(),
2763                        )
2764                        .context("Opening subdirectory for recursive broken link cleanup")?;
2765                        count += Self::cleanup_broken_links(&dirfd, recursive, dry_run)
2766                            .context("Cleaning up broken links in subdirectory")?;
2767                    }
2768                }
2769
2770                FileType::Symlink => {
2771                    let filename = entry.file_name();
2772                    let result = statat(fd, filename, AtFlags::empty())
2773                        .filter_errno(Errno::NOENT)
2774                        .context("Testing for broken links")?;
2775                    if result.is_none() {
2776                        count += 1;
2777                        if !dry_run {
2778                            unlinkat(fd, filename, AtFlags::empty())
2779                                .context("Unlinking broken symlink")?;
2780                        }
2781                    }
2782                }
2783
2784                _ => {
2785                    bail!("Unexpected file type encountered");
2786                }
2787            }
2788        }
2789        Ok(count)
2790    }
2791
2792    /// Clean up broken links in a gc category. Returns count of links removed.
2793    #[context("Cleaning up broken links in {category} category")]
2794    fn cleanup_gc_category(&self, category: &'static str, dry_run: bool) -> Result<u64> {
2795        let Some(category_fd) = self
2796            .openat(category, OFlags::RDONLY | OFlags::DIRECTORY)
2797            .filter_errno(Errno::NOENT)
2798            .context(format!("Opening {category} dir in repository"))?
2799        else {
2800            return Ok(0);
2801        };
2802        // Always cleanup first-level first, then the refs
2803        let mut count = Self::cleanup_broken_links(&category_fd, false, dry_run)
2804            .with_context(|| format!("Cleaning up broken links in {category}/"))?;
2805        let ref_fd = openat(
2806            &category_fd,
2807            "refs",
2808            OFlags::RDONLY | OFlags::DIRECTORY | OFlags::CLOEXEC,
2809            Mode::empty(),
2810        )
2811        .filter_errno(Errno::NOENT)
2812        .context(format!("Opening {category}/refs to clean up broken links"))?;
2813        if let Some(ref dirfd) = ref_fd {
2814            count += Self::cleanup_broken_links(dirfd, true, dry_run).with_context(|| {
2815                format!("Cleaning up broken links recursively in {category}/refs")
2816            })?;
2817        }
2818        Ok(count)
2819    }
2820
2821    // Traverse split streams to resolve all linked objects
2822    #[context("Walking streams starting from '{stream_name}'")]
2823    fn walk_streams(
2824        &self,
2825        stream_name_map: &HashMap<ObjectID, String>,
2826        stream_name: &str,
2827        walked_streams: &mut HashSet<String>,
2828        objects: &mut HashSet<ObjectID>,
2829    ) -> Result<()> {
2830        if walked_streams.contains(stream_name) {
2831            return Ok(());
2832        }
2833        walked_streams.insert(stream_name.to_owned());
2834
2835        let mut split_stream = self
2836            .open_stream(stream_name, None, None)
2837            .context("Opening stream for walking")?;
2838        // Plain object references, add to live objects set
2839        split_stream
2840            .get_object_refs(|id| {
2841                trace!("   with {id:?}");
2842                objects.insert(id.clone());
2843            })
2844            .context("Getting object references from stream")?;
2845        // Collect all stream names from named references table to be walked next
2846        let streams_to_walk: Vec<_> = split_stream.iter_named_refs().collect();
2847        // Note that stream name from the named references table is not stream name in repository
2848        // In practice repository name is often table name prefixed with stream types (e.g. oci-config-<table name>)
2849        // Here we always match objectID to be absolutely sure
2850        for (stream_name_in_table, stream_object_id) in streams_to_walk {
2851            trace!(
2852                "   named reference stream {stream_name_in_table} lives, with {stream_object_id:?}"
2853            );
2854            objects.insert(stream_object_id.clone());
2855            if let Some(stream_name_in_repo) = stream_name_map.get(stream_object_id) {
2856                self.walk_streams(
2857                    stream_name_map,
2858                    stream_name_in_repo,
2859                    walked_streams,
2860                    objects,
2861                )
2862                .context("Walking referenced stream")?;
2863            } else {
2864                // stream is in table but not in repo, the repo is potentially broken, issue a warning
2865                trace!(
2866                    "broken repo: named reference stream {stream_name_in_table} not found as stream in repo"
2867                );
2868            }
2869        }
2870        Ok(())
2871    }
2872
2873    /// Given an image, return the set of all objects referenced by it.
2874    #[context("Collecting objects for image '{name}'")]
2875    pub fn objects_for_image(&self, name: &str) -> Result<HashSet<ObjectID>> {
2876        let (image, _) = self.open_image(name)?;
2877        let mut data = vec![];
2878        std::fs::File::from(image)
2879            .read_to_end(&mut data)
2880            .context("Reading image data")?;
2881        crate::erofs::reader::collect_objects(&data)
2882            .context("Collecting objects from erofs image data")
2883    }
2884
2885    /// Makes sure all content is written to the repository.
2886    ///
2887    /// This is currently just syncfs() on the repository's root directory because we don't have
2888    /// any better options at present.  This blocks until the data is written out.
2889    #[context("Syncing repository to disk")]
2890    pub fn sync(&self) -> Result<()> {
2891        syncfs(&self.repository).context("Syncing filesystem")?;
2892        Ok(())
2893    }
2894
2895    /// Makes sure all content is written to the repository.
2896    ///
2897    /// This is currently just syncfs() on the repository's root directory because we don't have
2898    /// any better options at present.  This won't return until the data is written out.
2899    #[context("Syncing repository to disk (async)")]
2900    pub async fn sync_async(self: &Arc<Self>) -> Result<()> {
2901        let self_ = Arc::clone(self);
2902        tokio::task::spawn_blocking(move || self_.sync())
2903            .await
2904            .context("Spawning blocking sync task")?
2905    }
2906
2907    /// Perform garbage collection, removing unreferenced objects.
2908    ///
2909    /// Objects reachable from `images/refs/` or `streams/refs/` are preserved,
2910    /// plus any `additional_roots` (looked up in both images and streams).
2911    /// Returns statistics about what was removed.
2912    ///
2913    /// # Locking
2914    ///
2915    /// An exclusive lock is held for the duration of this operation.
2916    #[context("Running garbage collection")]
2917    pub fn gc(&self, additional_roots: &[&str]) -> Result<GcResult> {
2918        self.ensure_writable_token()?;
2919        flock(&self.repository, FlockOperation::LockExclusive)
2920            .context("Acquiring exclusive lock for GC")?;
2921        self.gc_impl(additional_roots, false)
2922    }
2923
2924    /// Preview what garbage collection would remove, without deleting.
2925    ///
2926    /// Returns the same statistics that [`gc`](Self::gc) would return,
2927    /// but no files are actually deleted.
2928    ///
2929    /// # Locking
2930    ///
2931    /// A shared lock is held for the duration of this operation (readers
2932    /// are not blocked).
2933    #[context("Running garbage collection dry run")]
2934    pub fn gc_dry_run(&self, additional_roots: &[&str]) -> Result<GcResult> {
2935        // Shared lock is sufficient since we don't modify anything
2936        flock(&self.repository, FlockOperation::LockShared)
2937            .context("Acquiring shared lock for GC dry run")?;
2938        self.gc_impl(additional_roots, true)
2939    }
2940
2941    /// Internal GC implementation (lock must already be held).
2942    #[context("GC implementation (dry_run: {dry_run})")]
2943    fn gc_impl(&self, additional_roots: &[&str], dry_run: bool) -> Result<GcResult> {
2944        let mut result = GcResult::default();
2945        let mut live_objects = HashSet::new();
2946
2947        // Build set of additional roots (checked in both images and streams)
2948        let extra_roots: HashSet<_> = additional_roots.iter().map(|s| s.to_string()).collect();
2949
2950        // Collect images: those in images/refs plus caller-specified roots
2951        let all_images = self
2952            .gc_category("images", GCCategoryWalkMode::AllEntries)
2953            .context("Collecting all images")?;
2954        let root_images: Vec<_> = self
2955            .gc_category("images", GCCategoryWalkMode::RefsOnly)
2956            .context("Collecting image refs")?
2957            .into_iter()
2958            .chain(
2959                all_images
2960                    .into_iter()
2961                    .filter(|(_, name)| extra_roots.contains(name)),
2962            )
2963            .collect();
2964
2965        for ref image in root_images {
2966            trace!("{image:?} lives as an image");
2967            live_objects.insert(image.0.clone());
2968            self.objects_for_image(&image.1)
2969                .with_context(|| format!("Collecting objects for image {}", image.1))?
2970                .iter()
2971                .for_each(|id| {
2972                    trace!("   with {id:?}");
2973                    live_objects.insert(id.clone());
2974                });
2975        }
2976
2977        // Collect all streams for the name map, then filter to roots
2978        let all_streams = self
2979            .gc_category("streams", GCCategoryWalkMode::AllEntries)
2980            .context("Collecting all streams")?;
2981        let stream_name_map: HashMap<_, _> = all_streams.iter().cloned().collect();
2982        let root_streams: Vec<_> = self
2983            .gc_category("streams", GCCategoryWalkMode::RefsOnly)
2984            .context("Collecting stream refs")?
2985            .into_iter()
2986            .chain(
2987                all_streams
2988                    .into_iter()
2989                    .filter(|(_, name)| extra_roots.contains(name)),
2990            )
2991            .collect();
2992
2993        let mut walked_streams = HashSet::new();
2994        for stream in root_streams {
2995            trace!("{stream:?} lives as a stream");
2996            live_objects.insert(stream.0.clone());
2997            self.walk_streams(
2998                &stream_name_map,
2999                &stream.1,
3000                &mut walked_streams,
3001                &mut live_objects,
3002            )
3003            .with_context(|| format!("Walking stream {}", stream.1))?;
3004        }
3005
3006        // Walk all objects and remove unreferenced ones
3007        for first_byte in 0x0..=0xff {
3008            let dirfd = match self.openat(
3009                &format!("objects/{first_byte:02x}"),
3010                OFlags::RDONLY | OFlags::DIRECTORY,
3011            ) {
3012                Ok(fd) => fd,
3013                Err(Errno::NOENT) => continue,
3014                Err(e) => Err(e)?,
3015            };
3016            for item in Dir::read_from(&dirfd)
3017                .with_context(|| format!("Reading objects/{first_byte:02x} directory"))?
3018            {
3019                let entry = item.context("Reading object directory entry")?;
3020                let filename = entry.file_name();
3021                if filename != c"." && filename != c".." {
3022                    let id =
3023                        ObjectID::from_object_dir_and_basename(first_byte, filename.to_bytes())
3024                            .context("Parsing object ID from directory entry")?;
3025                    if !live_objects.contains(&id) {
3026                        // Get file size before removing
3027                        if let Ok(stat) = statat(&dirfd, filename, AtFlags::empty()) {
3028                            result.objects_bytes += stat.st_size as u64;
3029                        }
3030                        result.objects_removed += 1;
3031
3032                        debug!(
3033                            "{}: objects/{first_byte:02x}/{filename:?}",
3034                            if dry_run { "would remove" } else { "removing" },
3035                        );
3036
3037                        if !dry_run {
3038                            unlinkat(&dirfd, filename, AtFlags::empty()).with_context(|| {
3039                                format!("Unlinking object {first_byte:02x}/{filename:?}")
3040                            })?;
3041                        }
3042                    } else {
3043                        trace!("objects/{first_byte:02x}/{filename:?} lives");
3044                    }
3045                }
3046            }
3047        }
3048
3049        // Clean up broken symlinks
3050        result.images_pruned = self
3051            .cleanup_gc_category("images", dry_run)
3052            .context("Cleaning up broken image symlinks")?;
3053        result.streams_pruned = self
3054            .cleanup_gc_category("streams", dry_run)
3055            .context("Cleaning up broken stream symlinks")?;
3056
3057        // Downgrade to shared lock if we had exclusive (for actual GC)
3058        if !dry_run {
3059            flock(&self.repository, FlockOperation::LockShared)
3060                .context("Downgrading to shared lock after GC")?;
3061        }
3062        Ok(result)
3063    }
3064
3065    /// Check the structural integrity of the repository.
3066    ///
3067    /// Walks all objects, streams, and images in the repository, verifying:
3068    /// - Object fsverity digests match their path-derived identifiers
3069    /// - Stream and image symlinks resolve to existing objects
3070    /// - Stream/image refs resolve to valid entries
3071    /// - Splitstreams have valid headers and reference only existing objects
3072    ///
3073    /// Object directories are checked in parallel using `spawn_blocking`,
3074    /// with concurrency bounded by `available_parallelism()`.
3075    ///
3076    /// Returns a [`FsckResult`] summarizing the findings. Does not modify
3077    /// any repository contents.
3078    #[context("Running filesystem consistency check")]
3079    pub async fn fsck(&self) -> Result<FsckResult> {
3080        self.fsck_inner(true).await
3081    }
3082
3083    /// Run a metadata-only consistency check.
3084    ///
3085    /// This validates `meta.json` and the stream/image symlinks (including
3086    /// splitstream structure and referenced-object existence) but skips the
3087    /// expensive per-object fs-verity digest verification done by [`fsck`].
3088    /// Useful for a fast structural check on large repositories.
3089    ///
3090    /// [`fsck`]: Self::fsck
3091    pub async fn fsck_metadata_only(&self) -> Result<FsckResult> {
3092        self.fsck_inner(false).await
3093    }
3094
3095    /// Shared implementation for [`fsck`] and [`fsck_metadata_only`].
3096    ///
3097    /// When `check_objects` is false, the object fs-verity verification phase
3098    /// is skipped (so `objects_checked` stays zero).
3099    ///
3100    /// [`fsck`]: Self::fsck
3101    /// [`fsck_metadata_only`]: Self::fsck_metadata_only
3102    async fn fsck_inner(&self, check_objects: bool) -> Result<FsckResult> {
3103        let mut result = FsckResult::default();
3104
3105        // Phase 0: Validate meta.json if present
3106        self.fsck_metadata(&mut result);
3107
3108        // Phase 1: Verify all objects (parallel across object subdirectories)
3109        if check_objects {
3110            self.fsck_objects(&mut result)
3111                .await
3112                .context("Checking objects")?;
3113        }
3114
3115        // Phase 2: Verify stream symlinks and splitstream integrity
3116        self.fsck_category("streams", &mut result)
3117            .context("Checking streams")?;
3118
3119        // Phase 3: Verify image symlinks
3120        self.fsck_category("images", &mut result)
3121            .context("Checking images")?;
3122
3123        Ok(result)
3124    }
3125
3126    /// Validate `meta.json`.
3127    ///
3128    /// Since `open_path` already requires `meta.json` to exist and be
3129    /// parseable, this re-reads from disk to verify on-disk integrity
3130    /// and checks algorithm compatibility.
3131    fn fsck_metadata(&self, result: &mut FsckResult) {
3132        match read_repo_metadata(&self.repository) {
3133            Ok(Some(meta)) => {
3134                result.has_metadata = true;
3135                if let Err(e) = meta.check_compatible::<ObjectID>() {
3136                    result.errors.push(FsckError::MetadataAlgorithmMismatch {
3137                        expected: meta.algorithm.to_string(),
3138                        actual: ObjectID::ALGORITHM.hash_name().to_string(),
3139                    });
3140                    log::warn!("meta.json algorithm mismatch: {e}");
3141                }
3142            }
3143            Ok(None) => {
3144                // Should not happen since open_path requires meta.json,
3145                // but report it if the file was removed after open.
3146                result.errors.push(FsckError::MetadataParseFailed {
3147                    detail: format!(
3148                        "{REPO_METADATA_FILENAME} not found; \
3149                         expected because repository was opened successfully"
3150                    ),
3151                });
3152            }
3153            Err(e) => {
3154                result.errors.push(FsckError::MetadataParseFailed {
3155                    detail: format!("{e:#}"),
3156                });
3157            }
3158        }
3159    }
3160
3161    /// Verify all objects in the repository have correct fsverity digests.
3162    ///
3163    /// Each `objects/XX/` subdirectory is checked on a blocking thread via
3164    /// `tokio::task::spawn_blocking`, with bounded concurrency to avoid
3165    /// overwhelming the system with I/O.
3166    async fn fsck_objects(&self, result: &mut FsckResult) -> Result<()> {
3167        // Cap at available CPUs; the work is a mix of I/O (reading objects)
3168        // and CPU (computing verity hashes).
3169        let max_concurrent = available_parallelism().map(|n| n.get()).unwrap_or(4);
3170        let insecure = self.insecure;
3171
3172        let mut joinset = tokio::task::JoinSet::new();
3173        let mut partial_results = Vec::new();
3174
3175        for first_byte in 0x00..=0xffu8 {
3176            // Drain completed tasks if we're at the concurrency limit
3177            while joinset.len() >= max_concurrent {
3178                partial_results.push(joinset.join_next().await.unwrap()??);
3179            }
3180
3181            let dirfd = match self.openat(
3182                &format!("objects/{first_byte:02x}"),
3183                OFlags::RDONLY | OFlags::DIRECTORY,
3184            ) {
3185                Ok(fd) => fd,
3186                Err(Errno::NOENT) => continue,
3187                Err(e) => {
3188                    Err(e).with_context(|| format!("Opening objects/{first_byte:02x} directory"))?
3189                }
3190            };
3191
3192            joinset
3193                .spawn_blocking(move || fsck_object_dir::<ObjectID>(dirfd, first_byte, insecure));
3194        }
3195
3196        // Drain remaining tasks
3197        while let Some(output) = joinset.join_next().await {
3198            partial_results.push(output??);
3199        }
3200
3201        // Fold all per-directory results into the main result
3202        for partial in partial_results {
3203            result.objects_checked += partial.objects_checked;
3204            result.objects_corrupted += partial.objects_corrupted;
3205            result.errors.extend(partial.errors);
3206        }
3207
3208        Ok(())
3209    }
3210
3211    /// Verify symlink integrity and splitstream/image validity for a category
3212    /// ("streams" or "images").
3213    #[context("Checking {category} integrity")]
3214    fn fsck_category(&self, category: &str, result: &mut FsckResult) -> Result<()> {
3215        let is_streams = category == "streams";
3216
3217        let Some(category_fd) = self
3218            .openat(category, OFlags::RDONLY | OFlags::DIRECTORY)
3219            .filter_errno(Errno::NOENT)
3220            .with_context(|| format!("Opening {category} directory"))?
3221        else {
3222            return Ok(());
3223        };
3224
3225        // Check first-level symlinks: each should point to an existing object
3226        for item in
3227            Dir::read_from(&category_fd).with_context(|| format!("Reading {category} directory"))?
3228        {
3229            let entry = item.context("Reading directory entry")?;
3230            let filename = entry.file_name();
3231            if filename == c"." || filename == c".." || filename == c"refs" {
3232                continue;
3233            }
3234
3235            if is_streams {
3236                result.streams_checked += 1;
3237            } else {
3238                result.images_checked += 1;
3239            }
3240
3241            if entry.file_type() != FileType::Symlink {
3242                if is_streams {
3243                    result.streams_corrupted += 1;
3244                } else {
3245                    result.images_corrupted += 1;
3246                }
3247                result.errors.push(FsckError::EntryNotSymlink {
3248                    path: format!(
3249                        "{category}/{}",
3250                        String::from_utf8_lossy(filename.to_bytes())
3251                    ),
3252                });
3253                continue;
3254            }
3255
3256            // Check the symlink resolves (follows through to the object)
3257            match statat(&category_fd, filename, AtFlags::empty()) {
3258                Ok(_) => {}
3259                Err(Errno::NOENT) => {
3260                    result.broken_links += 1;
3261                    if is_streams {
3262                        result.streams_corrupted += 1;
3263                    } else {
3264                        result.images_corrupted += 1;
3265                    }
3266                    result.errors.push(FsckError::BrokenSymlink {
3267                        path: format!(
3268                            "{category}/{}",
3269                            String::from_utf8_lossy(filename.to_bytes())
3270                        ),
3271                    });
3272                    continue;
3273                }
3274                Err(e) => {
3275                    result.errors.push(FsckError::StatFailed {
3276                        path: format!(
3277                            "{category}/{}",
3278                            String::from_utf8_lossy(filename.to_bytes())
3279                        ),
3280                        detail: e.to_string(),
3281                    });
3282                    continue;
3283                }
3284            }
3285
3286            let name = String::from_utf8_lossy(filename.to_bytes()).to_string();
3287            if is_streams {
3288                // Validate splitstream contents
3289                self.fsck_splitstream(&name, result);
3290            } else {
3291                // Validate erofs image structure and object references
3292                self.fsck_image(&name, result);
3293            }
3294        }
3295
3296        // Check refs/ symlinks
3297        let refs_fd = match openat(
3298            &category_fd,
3299            c"refs",
3300            OFlags::RDONLY | OFlags::DIRECTORY | OFlags::CLOEXEC,
3301            Mode::empty(),
3302        )
3303        .filter_errno(Errno::NOENT)
3304        .with_context(|| format!("Opening {category}/refs directory"))?
3305        {
3306            Some(fd) => fd,
3307            None => return Ok(()),
3308        };
3309
3310        self.fsck_refs_dir(&refs_fd, category, "", result)
3311            .with_context(|| format!("Checking {category}/refs"))
3312    }
3313
3314    /// Recursively verify that all ref symlinks resolve to valid entries in the
3315    /// parent category directory.
3316    fn fsck_refs_dir(
3317        &self,
3318        refs_fd: &OwnedFd,
3319        category: &str,
3320        prefix: &str,
3321        result: &mut FsckResult,
3322    ) -> Result<()> {
3323        for item in Dir::read_from(refs_fd)
3324            .with_context(|| format!("Reading {category}/refs/{prefix} directory"))?
3325        {
3326            let entry = item.context("Reading refs directory entry")?;
3327            let filename = entry.file_name();
3328            if filename == c"." || filename == c".." {
3329                continue;
3330            }
3331
3332            let name = String::from_utf8_lossy(filename.to_bytes()).to_string();
3333            let display_path = if prefix.is_empty() {
3334                format!("{category}/refs/{name}")
3335            } else {
3336                format!("{category}/refs/{prefix}/{name}")
3337            };
3338
3339            match entry.file_type() {
3340                FileType::Directory => {
3341                    let subdir = openat(
3342                        refs_fd,
3343                        filename,
3344                        OFlags::RDONLY | OFlags::DIRECTORY | OFlags::CLOEXEC,
3345                        Mode::empty(),
3346                    )
3347                    .with_context(|| format!("Opening {display_path}"))?;
3348                    let sub_prefix = if prefix.is_empty() {
3349                        name.clone()
3350                    } else {
3351                        format!("{prefix}/{name}")
3352                    };
3353                    self.fsck_refs_dir(&subdir, category, &sub_prefix, result)?;
3354                }
3355                FileType::Symlink => {
3356                    // The ref should ultimately resolve to a file (following
3357                    // the chain: refs/X -> ../../entry -> ../objects/XX/YY)
3358                    match statat(refs_fd, filename, AtFlags::empty()) {
3359                        Ok(_) => {}
3360                        Err(Errno::NOENT) => {
3361                            result.broken_links += 1;
3362                            result.errors.push(FsckError::BrokenSymlink {
3363                                path: display_path.clone(),
3364                            });
3365                        }
3366                        Err(e) => {
3367                            result.errors.push(FsckError::StatFailed {
3368                                path: display_path.clone(),
3369                                detail: e.to_string(),
3370                            });
3371                        }
3372                    }
3373                }
3374                other => {
3375                    result.errors.push(FsckError::UnexpectedFileType {
3376                        path: display_path.clone(),
3377                        detail: format!("{other:?}"),
3378                    });
3379                }
3380            }
3381        }
3382        Ok(())
3383    }
3384
3385    /// Validate a single splitstream: check header and object references.
3386    fn fsck_splitstream(&self, stream_name: &str, result: &mut FsckResult) {
3387        let stream_path = format!("streams/{stream_name}");
3388        let mut split_stream = match self.open_stream(stream_name, None, None) {
3389            Ok(s) => s,
3390            Err(e) => {
3391                result.streams_corrupted += 1;
3392                result.errors.push(FsckError::StreamOpenFailed {
3393                    path: stream_path,
3394                    detail: e.to_string(),
3395                });
3396                return;
3397            }
3398        };
3399
3400        // Check that all object_refs point to existing objects
3401        let check_result = split_stream.get_object_refs(|id| {
3402            let obj_path = Self::format_object_path(id);
3403            match self.openat(&obj_path, OFlags::RDONLY) {
3404                Ok(_) => {}
3405                Err(Errno::NOENT) => {
3406                    result.missing_objects += 1;
3407                    result.errors.push(FsckError::MissingObjectRef {
3408                        path: stream_path.clone(),
3409                        object_id: id.to_hex(),
3410                    });
3411                }
3412                Err(e) => {
3413                    result.errors.push(FsckError::ObjectCheckFailed {
3414                        path: stream_path.clone(),
3415                        object_id: id.to_hex(),
3416                        detail: e.to_string(),
3417                    });
3418                }
3419            }
3420        });
3421        if let Err(e) = check_result {
3422            result.streams_corrupted += 1;
3423            result.errors.push(FsckError::StreamReadFailed {
3424                path: stream_path,
3425                detail: e.to_string(),
3426            });
3427            return;
3428        }
3429
3430        // Check that all named refs (stream refs) point to existing objects
3431        for (ref_name, ref_id) in split_stream.iter_named_refs() {
3432            // The named ref's object should exist
3433            let obj_path = Self::format_object_path(ref_id);
3434            match self.openat(&obj_path, OFlags::RDONLY) {
3435                Ok(_) => {}
3436                Err(Errno::NOENT) => {
3437                    result.missing_objects += 1;
3438                    result.errors.push(FsckError::MissingNamedRef {
3439                        path: stream_path.clone(),
3440                        ref_name: ref_name.to_string(),
3441                        object_id: ref_id.to_hex(),
3442                    });
3443                }
3444                Err(e) => {
3445                    result.errors.push(FsckError::ObjectCheckFailed {
3446                        path: stream_path.clone(),
3447                        object_id: ref_id.to_hex(),
3448                        detail: format!("checking named ref '{ref_name}': {e}"),
3449                    });
3450                }
3451            }
3452            // The stream entry itself should also exist (but don't double-count).
3453            // Note: the named ref name may not correspond to an actual stream
3454            // entry.  OCI images use named refs with keys like
3455            // "config:sha256:..." or layer diff_ids that aren't stream names.
3456            // We only warn if the object itself is missing (handled above);
3457            // a missing stream entry with an existing object is benign.
3458        }
3459    }
3460
3461    /// Validate a single erofs image: parse structure, enforce composefs
3462    /// invariants, and verify all referenced objects exist.
3463    fn fsck_image(&self, image_name: &str, result: &mut FsckResult) {
3464        // Read the image data
3465        let image_path = format!("images/{image_name}");
3466        let mut data = vec![];
3467        let fd = match self.openat(&image_path, OFlags::RDONLY) {
3468            Ok(fd) => fd,
3469            Err(e) => {
3470                result.images_corrupted += 1;
3471                result.errors.push(FsckError::ImageOpenFailed {
3472                    path: image_path,
3473                    detail: e.to_string(),
3474                });
3475                return;
3476            }
3477        };
3478        if let Err(e) = File::from(fd).read_to_end(&mut data) {
3479            result.images_corrupted += 1;
3480            result.errors.push(FsckError::ImageReadFailed {
3481                path: image_path,
3482                detail: e.to_string(),
3483            });
3484            return;
3485        }
3486
3487        // Parse the erofs image with composefs-specific structural validation
3488        // (header magic, superblock, no unsupported features, etc.) and walk
3489        // the directory tree to collect all referenced object IDs.
3490        let objects = match crate::erofs::reader::collect_objects::<ObjectID>(&data) {
3491            Ok(objects) => objects,
3492            Err(e) => {
3493                result.images_corrupted += 1;
3494                result.errors.push(FsckError::ImageInvalid {
3495                    path: image_path,
3496                    detail: e.to_string(),
3497                });
3498                return;
3499            }
3500        };
3501
3502        // Verify all referenced objects exist
3503        for obj_id in &objects {
3504            let path = Self::format_object_path(obj_id);
3505            match self.openat(&path, OFlags::RDONLY) {
3506                Ok(_) => {}
3507                Err(Errno::NOENT) => {
3508                    result.missing_objects += 1;
3509                    result.errors.push(FsckError::ImageMissingObject {
3510                        path: image_path.clone(),
3511                        object_id: obj_id.to_hex(),
3512                    });
3513                }
3514                Err(e) => {
3515                    result.errors.push(FsckError::ObjectCheckFailed {
3516                        path: image_path.clone(),
3517                        object_id: obj_id.to_hex(),
3518                        detail: e.to_string(),
3519                    });
3520                }
3521            }
3522        }
3523    }
3524
3525    /// Returns a borrowed file descriptor for the repository root.
3526    ///
3527    /// This allows low-level operations on the repository directory.
3528    pub fn repo_fd(&self) -> BorrowedFd<'_> {
3529        self.repository.as_fd()
3530    }
3531
3532    /// Return the repository metadata parsed from `meta.json` at open time.
3533    ///
3534    /// The metadata was already validated against this repository's
3535    /// `ObjectID` type when the repository was opened, so no further
3536    /// compatibility check is needed.
3537    pub fn metadata(&self) -> &RepoMetadata {
3538        &self.metadata
3539    }
3540
3541    /// Returns the effective EROFS format version for this repository.
3542    ///
3543    /// Returns the per-invocation override set by [`set_erofs_version`](Self::set_erofs_version)
3544    /// if one is active, otherwise returns the default version from the stored
3545    /// [`FormatConfig`] (see [`format_config`](Self::format_config)).
3546    pub fn erofs_version(&self) -> FormatVersion {
3547        self.erofs_version_override
3548            .unwrap_or_else(|| self.metadata.erofs_version())
3549    }
3550
3551    /// Returns the effective [`FormatConfig`] for this repository.
3552    ///
3553    /// When a per-invocation version override is active (set via
3554    /// [`set_erofs_version`](Self::set_erofs_version)), returns a single-version
3555    /// config for that override — the override narrows generation to exactly one
3556    /// format, discarding any `extra` versions from `meta.json`.
3557    ///
3558    /// Otherwise returns the full config from `meta.json`, including any `extra`
3559    /// versions.  For repositories created before the `erofs_formats` field was
3560    /// added, the config is derived from the legacy `"v1_erofs"` ro_compat flag.
3561    pub fn format_config(&self) -> FormatConfig {
3562        if let Some(v) = self.erofs_version_override {
3563            FormatConfig::single(v)
3564        } else {
3565            repo_format_config_from_meta(&self.metadata)
3566        }
3567    }
3568
3569    /// Returns the primary [`FormatConfig`] configured for this repository.
3570    ///
3571    /// Alias for [`format_config`](Self::format_config).
3572    pub fn default_format_config(&self) -> FormatConfig {
3573        self.format_config()
3574    }
3575
3576    /// Lists all named stream references under a given prefix.
3577    ///
3578    /// Returns (name, target) pairs where name is relative to the prefix.
3579    pub fn list_stream_refs(&self, prefix: &str) -> Result<Vec<(String, String)>> {
3580        let ref_path = format!("streams/refs/{prefix}");
3581
3582        let dir_fd = match self.openat(&ref_path, OFlags::RDONLY | OFlags::DIRECTORY) {
3583            Ok(fd) => fd,
3584            Err(Errno::NOENT) => return Ok(Vec::new()),
3585            Err(e) => return Err(e.into()),
3586        };
3587
3588        let mut refs = Vec::new();
3589        for item in Dir::read_from(&dir_fd)? {
3590            let entry = item?;
3591            let name_bytes = entry.file_name().to_bytes();
3592
3593            if name_bytes == b"." || name_bytes == b".." {
3594                continue;
3595            }
3596
3597            let name = match std::str::from_utf8(name_bytes) {
3598                Ok(s) => s.to_string(),
3599                Err(_) => continue,
3600            };
3601
3602            if let Ok(target) = readlinkat(&dir_fd, name_bytes, vec![])
3603                && let Ok(target_str) = target.into_string()
3604            {
3605                refs.push((name, target_str));
3606            }
3607        }
3608
3609        Ok(refs)
3610    }
3611}
3612
3613/// Verify each object in a single `objects/XX/` subdirectory.
3614///
3615/// This is a free function (not a method) so it can be used with
3616/// `spawn_blocking` — it captures only owned/`Send` values.
3617fn fsck_object_dir<ObjectID: FsVerityHashValue>(
3618    dirfd: OwnedFd,
3619    first_byte: u8,
3620    insecure: bool,
3621) -> Result<FsckResult> {
3622    let mut result = FsckResult::default();
3623
3624    for item in Dir::read_from(&dirfd)
3625        .with_context(|| format!("Reading objects/{first_byte:02x} directory"))?
3626    {
3627        let entry = item.context("Reading object directory entry")?;
3628        let filename = entry.file_name();
3629        if filename == c"." || filename == c".." {
3630            continue;
3631        }
3632
3633        result.objects_checked += 1;
3634
3635        let expected_id =
3636            match ObjectID::from_object_dir_and_basename(first_byte, filename.to_bytes()) {
3637                Ok(id) => id,
3638                Err(e) => {
3639                    result.objects_corrupted += 1;
3640                    result.errors.push(FsckError::ObjectInvalidName {
3641                        path: format!(
3642                            "objects/{first_byte:02x}/{}",
3643                            String::from_utf8_lossy(filename.to_bytes())
3644                        ),
3645                        detail: e.to_string(),
3646                    });
3647                    continue;
3648                }
3649            };
3650
3651        let fd = match openat(
3652            &dirfd,
3653            filename,
3654            OFlags::RDONLY | OFlags::CLOEXEC,
3655            Mode::empty(),
3656        ) {
3657            Ok(fd) => fd,
3658            Err(e) => {
3659                result.objects_corrupted += 1;
3660                result.errors.push(FsckError::ObjectOpenFailed {
3661                    path: format!(
3662                        "objects/{first_byte:02x}/{}",
3663                        String::from_utf8_lossy(filename.to_bytes())
3664                    ),
3665                    detail: e.to_string(),
3666                });
3667                continue;
3668            }
3669        };
3670
3671        let Some(measured) =
3672            fsck_measure_object::<ObjectID>(fd, &expected_id, insecure, &mut result)
3673        else {
3674            continue;
3675        };
3676
3677        if measured != expected_id {
3678            result.objects_corrupted += 1;
3679            result.errors.push(FsckError::ObjectDigestMismatch {
3680                path: format!("objects/{}", expected_id.to_object_pathname()),
3681                measured: measured.to_hex(),
3682            });
3683        }
3684    }
3685    Ok(result)
3686}
3687
3688/// Measure the verity digest of a single object file.
3689///
3690/// Returns `Some(digest)` on success, or `None` after recording the error
3691/// in `result` (so the caller can `continue`).
3692fn fsck_measure_object<ObjectID: FsVerityHashValue>(
3693    fd: OwnedFd,
3694    expected_id: &ObjectID,
3695    insecure: bool,
3696    result: &mut FsckResult,
3697) -> Option<ObjectID> {
3698    if let Ok(digest) = measure_verity::<ObjectID>(&fd) {
3699        return Some(digest);
3700    }
3701
3702    // Kernel measurement failed — in insecure mode, try userspace computation
3703    if insecure {
3704        match Repository::<ObjectID>::compute_verity_digest(&mut std::io::BufReader::new(
3705            File::from(fd),
3706        )) {
3707            Ok(digest) => return Some(digest),
3708            Err(e) => {
3709                result.objects_corrupted += 1;
3710                result.errors.push(FsckError::ObjectVerityFailed {
3711                    path: format!("objects/{}", expected_id.to_object_pathname()),
3712                    detail: e.to_string(),
3713                });
3714                return None;
3715            }
3716        }
3717    }
3718
3719    // Not insecure — verity is required but missing/unsupported
3720    result.objects_corrupted += 1;
3721    result.errors.push(FsckError::ObjectVerityMissing {
3722        path: format!("objects/{}", expected_id.to_object_pathname()),
3723    });
3724    None
3725}
3726
3727#[cfg(test)]
3728mod tests {
3729    use super::*;
3730    use crate::fsverity::{Sha256HashValue, Sha512HashValue};
3731    use crate::test::tempdir;
3732    use rustix::fs::{CWD, statat};
3733    use tempfile::TempDir;
3734
3735    /// Create a test repository in insecure mode (no fs-verity required).
3736    fn create_test_repo(path: &Path) -> Result<Arc<Repository<Sha512HashValue>>> {
3737        let (repo, _) = Repository::init_path(
3738            CWD,
3739            path,
3740            RepositoryConfig::new(Algorithm::SHA512).set_insecure(),
3741        )?;
3742        Ok(Arc::new(repo))
3743    }
3744
3745    /// Generate deterministic test data of a given size.
3746    fn generate_test_data(size: u64, seed: u8) -> Vec<u8> {
3747        (0..size)
3748            .map(|i| ((i as u8).wrapping_add(seed)).wrapping_mul(17))
3749            .collect()
3750    }
3751
3752    fn read_links_in_repo<P>(tmp: &TempDir, repo_sub_path: P) -> Result<Option<PathBuf>>
3753    where
3754        P: AsRef<Path>,
3755    {
3756        let full_path = tmp.path().join("repo").join(repo_sub_path);
3757        match readlinkat(CWD, &full_path, Vec::new()) {
3758            Ok(result) => Ok(Some(PathBuf::from(result.to_str()?))),
3759            Err(rustix::io::Errno::NOENT) => Ok(None),
3760            Err(e) => Err(e.into()),
3761        }
3762    }
3763
3764    // Does not follow symlinks
3765    fn test_path_exists_in_repo<P>(tmp: &TempDir, repo_sub_path: P) -> Result<bool>
3766    where
3767        P: AsRef<Path>,
3768    {
3769        let full_path = tmp.path().join("repo").join(repo_sub_path);
3770        match statat(CWD, &full_path, AtFlags::SYMLINK_NOFOLLOW) {
3771            Ok(_) => Ok(true),
3772            Err(rustix::io::Errno::NOENT) => Ok(false),
3773            Err(e) => Err(e.into()),
3774        }
3775    }
3776
3777    fn test_object_exists(tmp: &TempDir, obj: &Sha512HashValue) -> Result<bool> {
3778        let digest = obj.to_hex();
3779        let (first_two, remainder) = digest.split_at(2);
3780        test_path_exists_in_repo(tmp, &format!("objects/{first_two}/{remainder}"))
3781    }
3782
3783    #[test]
3784    fn test_gc_removes_one_stream() -> Result<()> {
3785        let tmp = tempdir();
3786        let repo = create_test_repo(&tmp.path().join("repo"))?;
3787
3788        let obj1 = generate_test_data(32 * 1024, 0xAE);
3789        let obj2 = generate_test_data(64 * 1024, 0xEA);
3790
3791        let obj1_id = repo.ensure_object(&obj1)?;
3792        let obj2_id: Sha512HashValue = compute_verity(&obj2);
3793
3794        let mut writer = repo.create_stream(0)?;
3795        writer.write_external(&obj2)?;
3796        let _stream_id = repo.write_stream(writer, "test-stream", None)?;
3797
3798        repo.sync()?;
3799
3800        assert!(test_object_exists(&tmp, &obj1_id)?);
3801        assert!(test_object_exists(&tmp, &obj2_id)?);
3802        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
3803        let link_target =
3804            read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
3805        assert!(test_path_exists_in_repo(
3806            &tmp,
3807            PathBuf::from("streams").join(&link_target)
3808        )?);
3809
3810        // Now perform gc - should remove 2 objects (obj1 + obj2) and 1 stream symlink
3811        let result = repo.gc(&[])?;
3812
3813        assert!(!test_object_exists(&tmp, &obj1_id)?);
3814        assert!(!test_object_exists(&tmp, &obj2_id)?);
3815        assert!(!test_path_exists_in_repo(&tmp, "streams/test-stream")?);
3816
3817        // Verify GcResult: 3 objects removed (obj1, obj2, splitstream), stream symlink pruned
3818        assert_eq!(result.objects_removed, 3);
3819        assert!(result.objects_bytes > 0);
3820        assert_eq!(result.streams_pruned, 1);
3821        assert_eq!(result.images_pruned, 0);
3822        Ok(())
3823    }
3824
3825    #[test]
3826    fn test_gc_keeps_one_stream() -> Result<()> {
3827        let tmp = tempdir();
3828        let repo = create_test_repo(&tmp.path().join("repo"))?;
3829
3830        let obj1 = generate_test_data(32 * 1024, 0xAE);
3831        let obj2 = generate_test_data(64 * 1024, 0xEA);
3832
3833        let obj1_id = repo.ensure_object(&obj1)?;
3834        let obj2_id: Sha512HashValue = compute_verity(&obj2);
3835
3836        let mut writer = repo.create_stream(0)?;
3837        writer.write_external(&obj2)?;
3838        let _stream_id = repo.write_stream(writer, "test-stream", None)?;
3839
3840        repo.sync()?;
3841
3842        assert!(test_object_exists(&tmp, &obj1_id)?);
3843        assert!(test_object_exists(&tmp, &obj2_id)?);
3844        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
3845        let link_target =
3846            read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
3847        assert!(test_path_exists_in_repo(
3848            &tmp,
3849            PathBuf::from("streams").join(&link_target)
3850        )?);
3851
3852        // Now perform gc - should remove only obj1, keep obj2 and stream
3853        let result = repo.gc(&["test-stream"])?;
3854
3855        assert!(!test_object_exists(&tmp, &obj1_id)?);
3856        assert!(test_object_exists(&tmp, &obj2_id)?);
3857        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
3858        let link_target =
3859            read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
3860        assert!(test_path_exists_in_repo(
3861            &tmp,
3862            PathBuf::from("streams").join(&link_target)
3863        )?);
3864
3865        // Verify GcResult: only 1 object removed, no symlinks pruned
3866        assert_eq!(result.objects_removed, 1);
3867        assert!(result.objects_bytes > 0);
3868        assert_eq!(result.streams_pruned, 0);
3869        assert_eq!(result.images_pruned, 0);
3870        Ok(())
3871    }
3872
3873    #[test]
3874    fn test_gc_keeps_one_stream_from_refs() -> Result<()> {
3875        let tmp = tempdir();
3876        let repo = create_test_repo(&tmp.path().join("repo"))?;
3877
3878        let obj1 = generate_test_data(32 * 1024, 0xAE);
3879        let obj2 = generate_test_data(64 * 1024, 0xEA);
3880
3881        let obj1_id = repo.ensure_object(&obj1)?;
3882        let obj2_id: Sha512HashValue = compute_verity(&obj2);
3883
3884        let mut writer = repo.create_stream(0)?;
3885        writer.write_external(&obj2)?;
3886        let _stream_id = repo.write_stream(writer, "test-stream", Some("ref-name"))?;
3887
3888        repo.sync()?;
3889
3890        assert!(test_object_exists(&tmp, &obj1_id)?);
3891        assert!(test_object_exists(&tmp, &obj2_id)?);
3892        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
3893        let link_target =
3894            read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
3895        assert!(test_path_exists_in_repo(
3896            &tmp,
3897            PathBuf::from("streams").join(&link_target)
3898        )?);
3899
3900        // Now perform gc - stream is kept via ref, only obj1 removed
3901        let result = repo.gc(&[])?;
3902
3903        assert!(!test_object_exists(&tmp, &obj1_id)?);
3904        assert!(test_object_exists(&tmp, &obj2_id)?);
3905        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream")?);
3906        let link_target =
3907            read_links_in_repo(&tmp, "streams/test-stream")?.expect("link is not broken");
3908        assert!(test_path_exists_in_repo(
3909            &tmp,
3910            PathBuf::from("streams").join(&link_target)
3911        )?);
3912
3913        // Verify GcResult: 1 object removed, no symlinks pruned (stream has ref)
3914        assert_eq!(result.objects_removed, 1);
3915        assert!(result.objects_bytes > 0);
3916        assert_eq!(result.streams_pruned, 0);
3917        assert_eq!(result.images_pruned, 0);
3918        Ok(())
3919    }
3920
3921    #[test]
3922    fn test_gc_keeps_one_stream_from_two_overlapped() -> Result<()> {
3923        let tmp = tempdir();
3924        let repo = create_test_repo(&tmp.path().join("repo"))?;
3925
3926        let obj1 = generate_test_data(32 * 1024, 0xAE);
3927        let obj2 = generate_test_data(64 * 1024, 0xEA);
3928        let obj3 = generate_test_data(64 * 1024, 0xAA);
3929        let obj4 = generate_test_data(64 * 1024, 0xEE);
3930
3931        let obj1_id = repo.ensure_object(&obj1)?;
3932        let obj2_id: Sha512HashValue = compute_verity(&obj2);
3933        let obj3_id: Sha512HashValue = compute_verity(&obj3);
3934        let obj4_id: Sha512HashValue = compute_verity(&obj4);
3935
3936        let mut writer1 = repo.create_stream(0)?;
3937        writer1.write_external(&obj2)?;
3938        writer1.write_external(&obj3)?;
3939        let _stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
3940
3941        let mut writer2 = repo.create_stream(0)?;
3942        writer2.write_external(&obj2)?;
3943        writer2.write_external(&obj4)?;
3944        let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
3945
3946        repo.sync()?;
3947
3948        assert!(test_object_exists(&tmp, &obj1_id)?);
3949        assert!(test_object_exists(&tmp, &obj2_id)?);
3950        assert!(test_object_exists(&tmp, &obj3_id)?);
3951        assert!(test_object_exists(&tmp, &obj4_id)?);
3952        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
3953        let link_target =
3954            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
3955        assert!(test_path_exists_in_repo(
3956            &tmp,
3957            PathBuf::from("streams").join(&link_target)
3958        )?);
3959        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
3960        let link_target =
3961            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
3962        assert!(test_path_exists_in_repo(
3963            &tmp,
3964            PathBuf::from("streams").join(&link_target)
3965        )?);
3966
3967        // Now perform gc - keep stream1, remove obj1, obj4, and stream2
3968        let result = repo.gc(&["test-stream1"])?;
3969
3970        assert!(!test_object_exists(&tmp, &obj1_id)?);
3971        assert!(test_object_exists(&tmp, &obj2_id)?);
3972        assert!(test_object_exists(&tmp, &obj3_id)?);
3973        assert!(!test_object_exists(&tmp, &obj4_id)?);
3974        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
3975        let link_target =
3976            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
3977        assert!(test_path_exists_in_repo(
3978            &tmp,
3979            PathBuf::from("streams").join(&link_target)
3980        )?);
3981        assert!(!test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
3982
3983        // Verify GcResult: 3 objects removed (obj1, obj4, stream2's splitstream), 1 stream pruned
3984        assert_eq!(result.objects_removed, 3);
3985        assert!(result.objects_bytes > 0);
3986        assert_eq!(result.streams_pruned, 1);
3987        assert_eq!(result.images_pruned, 0);
3988        Ok(())
3989    }
3990
3991    #[test]
3992    fn test_gc_keeps_named_references() -> Result<()> {
3993        let tmp = tempdir();
3994        let repo = create_test_repo(&tmp.path().join("repo"))?;
3995
3996        let obj1 = generate_test_data(32 * 1024, 0xAE);
3997        let obj2 = generate_test_data(64 * 1024, 0xEA);
3998
3999        let obj1_id = repo.ensure_object(&obj1)?;
4000        let obj2_id: Sha512HashValue = compute_verity(&obj2);
4001
4002        let mut writer1 = repo.create_stream(0)?;
4003        writer1.write_external(&obj2)?;
4004        let stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
4005
4006        let mut writer2 = repo.create_stream(0)?;
4007        writer2.add_named_stream_ref("test-stream1", &stream1_id);
4008        let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
4009
4010        repo.sync()?;
4011
4012        assert!(test_object_exists(&tmp, &obj1_id)?);
4013        assert!(test_object_exists(&tmp, &obj2_id)?);
4014        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
4015        let link_target =
4016            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
4017        assert!(test_path_exists_in_repo(
4018            &tmp,
4019            PathBuf::from("streams").join(&link_target)
4020        )?);
4021        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
4022        let link_target =
4023            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
4024        assert!(test_path_exists_in_repo(
4025            &tmp,
4026            PathBuf::from("streams").join(&link_target)
4027        )?);
4028
4029        // Now perform gc - stream2 refs stream1, both kept, only obj1 removed
4030        let result = repo.gc(&["test-stream2"])?;
4031
4032        assert!(!test_object_exists(&tmp, &obj1_id)?);
4033        assert!(test_object_exists(&tmp, &obj2_id)?);
4034        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
4035        let link_target =
4036            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
4037        assert!(test_path_exists_in_repo(
4038            &tmp,
4039            PathBuf::from("streams").join(&link_target)
4040        )?);
4041        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
4042        let link_target =
4043            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
4044        assert!(test_path_exists_in_repo(
4045            &tmp,
4046            PathBuf::from("streams").join(&link_target)
4047        )?);
4048
4049        // Verify GcResult: 1 object removed, no symlinks pruned
4050        assert_eq!(result.objects_removed, 1);
4051        assert!(result.objects_bytes > 0);
4052        assert_eq!(result.streams_pruned, 0);
4053        assert_eq!(result.images_pruned, 0);
4054        Ok(())
4055    }
4056
4057    #[test]
4058    fn test_gc_keeps_named_references_with_different_table_name() -> Result<()> {
4059        let tmp = tempdir();
4060        let repo = create_test_repo(&tmp.path().join("repo"))?;
4061
4062        let obj1 = generate_test_data(32 * 1024, 0xAE);
4063        let obj2 = generate_test_data(64 * 1024, 0xEA);
4064
4065        let obj1_id = repo.ensure_object(&obj1)?;
4066        let obj2_id: Sha512HashValue = compute_verity(&obj2);
4067
4068        let mut writer1 = repo.create_stream(0)?;
4069        writer1.write_external(&obj2)?;
4070        let stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
4071
4072        let mut writer2 = repo.create_stream(0)?;
4073        writer2.add_named_stream_ref("different-table-name-for-test-stream1", &stream1_id);
4074        let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
4075
4076        repo.sync()?;
4077
4078        assert!(test_object_exists(&tmp, &obj1_id)?);
4079        assert!(test_object_exists(&tmp, &obj2_id)?);
4080        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
4081        let link_target =
4082            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
4083        assert!(test_path_exists_in_repo(
4084            &tmp,
4085            PathBuf::from("streams").join(&link_target)
4086        )?);
4087        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
4088        let link_target =
4089            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
4090        assert!(test_path_exists_in_repo(
4091            &tmp,
4092            PathBuf::from("streams").join(&link_target)
4093        )?);
4094
4095        // Now perform gc - different table name, but same object ID links them
4096        let result = repo.gc(&["test-stream2"])?;
4097
4098        assert!(!test_object_exists(&tmp, &obj1_id)?);
4099        assert!(test_object_exists(&tmp, &obj2_id)?);
4100        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
4101        let link_target =
4102            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
4103        assert!(test_path_exists_in_repo(
4104            &tmp,
4105            PathBuf::from("streams").join(&link_target)
4106        )?);
4107        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
4108        let link_target =
4109            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
4110        assert!(test_path_exists_in_repo(
4111            &tmp,
4112            PathBuf::from("streams").join(&link_target)
4113        )?);
4114
4115        // Verify GcResult: 1 object removed, no symlinks pruned
4116        assert_eq!(result.objects_removed, 1);
4117        assert!(result.objects_bytes > 0);
4118        assert_eq!(result.streams_pruned, 0);
4119        assert_eq!(result.images_pruned, 0);
4120        Ok(())
4121    }
4122
4123    #[test]
4124    fn test_gc_keeps_one_named_reference_from_two_overlapped() -> Result<()> {
4125        let tmp = tempdir();
4126        let repo = create_test_repo(&tmp.path().join("repo"))?;
4127
4128        let obj1 = generate_test_data(32 * 1024, 0xAE);
4129        let obj2 = generate_test_data(64 * 1024, 0xEA);
4130        let obj3 = generate_test_data(64 * 1024, 0xAA);
4131        let obj4 = generate_test_data(64 * 1024, 0xEE);
4132
4133        let obj1_id = repo.ensure_object(&obj1)?;
4134        let obj2_id: Sha512HashValue = compute_verity(&obj2);
4135        let obj3_id: Sha512HashValue = compute_verity(&obj3);
4136        let obj4_id: Sha512HashValue = compute_verity(&obj4);
4137
4138        let mut writer = repo.create_stream(0)?;
4139        writer.write_external(&obj2)?;
4140        let stream1_id = repo.write_stream(writer, "test-stream1", None)?;
4141
4142        let mut writer = repo.create_stream(0)?;
4143        writer.write_external(&obj3)?;
4144        let stream2_id = repo.write_stream(writer, "test-stream2", None)?;
4145
4146        let mut writer = repo.create_stream(0)?;
4147        writer.write_external(&obj4)?;
4148        let stream3_id = repo.write_stream(writer, "test-stream3", None)?;
4149
4150        let mut writer = repo.create_stream(0)?;
4151        writer.add_named_stream_ref("test-stream1", &stream1_id);
4152        writer.add_named_stream_ref("test-stream2", &stream2_id);
4153        let _ref_stream1_id = repo.write_stream(writer, "ref-stream1", None)?;
4154
4155        let mut writer = repo.create_stream(0)?;
4156        writer.add_named_stream_ref("test-stream1", &stream1_id);
4157        writer.add_named_stream_ref("test-stream3", &stream3_id);
4158        let _ref_stream2_id = repo.write_stream(writer, "ref-stream2", None)?;
4159
4160        repo.sync()?;
4161
4162        assert!(test_object_exists(&tmp, &obj1_id)?);
4163        assert!(test_object_exists(&tmp, &obj2_id)?);
4164        assert!(test_object_exists(&tmp, &obj3_id)?);
4165        assert!(test_object_exists(&tmp, &obj4_id)?);
4166        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
4167        let link_target =
4168            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
4169        assert!(test_path_exists_in_repo(
4170            &tmp,
4171            PathBuf::from("streams").join(&link_target)
4172        )?);
4173        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
4174        let link_target =
4175            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
4176        assert!(test_path_exists_in_repo(
4177            &tmp,
4178            PathBuf::from("streams").join(&link_target)
4179        )?);
4180        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream3")?);
4181        let link_target =
4182            read_links_in_repo(&tmp, "streams/test-stream3")?.expect("link is not broken");
4183        assert!(test_path_exists_in_repo(
4184            &tmp,
4185            PathBuf::from("streams").join(&link_target)
4186        )?);
4187        assert!(test_path_exists_in_repo(&tmp, "streams/ref-stream1")?);
4188        let link_target =
4189            read_links_in_repo(&tmp, "streams/ref-stream1")?.expect("link is not broken");
4190        assert!(test_path_exists_in_repo(
4191            &tmp,
4192            PathBuf::from("streams").join(&link_target)
4193        )?);
4194        assert!(test_path_exists_in_repo(&tmp, "streams/ref-stream2")?);
4195        let link_target =
4196            read_links_in_repo(&tmp, "streams/ref-stream2")?.expect("link is not broken");
4197        assert!(test_path_exists_in_repo(
4198            &tmp,
4199            PathBuf::from("streams").join(&link_target)
4200        )?);
4201
4202        // Now perform gc - ref-stream1 refs stream1+stream2, so keep those and their objects
4203        let result = repo.gc(&["ref-stream1"])?;
4204
4205        assert!(!test_object_exists(&tmp, &obj1_id)?);
4206        assert!(test_object_exists(&tmp, &obj2_id)?);
4207        assert!(test_object_exists(&tmp, &obj3_id)?);
4208        assert!(!test_object_exists(&tmp, &obj4_id)?);
4209        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream1")?);
4210        let link_target =
4211            read_links_in_repo(&tmp, "streams/test-stream1")?.expect("link is not broken");
4212        assert!(test_path_exists_in_repo(
4213            &tmp,
4214            PathBuf::from("streams").join(&link_target)
4215        )?);
4216        assert!(test_path_exists_in_repo(&tmp, "streams/test-stream2")?);
4217        let link_target =
4218            read_links_in_repo(&tmp, "streams/test-stream2")?.expect("link is not broken");
4219        assert!(test_path_exists_in_repo(
4220            &tmp,
4221            PathBuf::from("streams").join(&link_target)
4222        )?);
4223        assert!(!test_path_exists_in_repo(&tmp, "streams/test-stream3")?);
4224        assert!(test_path_exists_in_repo(&tmp, "streams/ref-stream1")?);
4225        let link_target =
4226            read_links_in_repo(&tmp, "streams/ref-stream1")?.expect("link is not broken");
4227        assert!(test_path_exists_in_repo(
4228            &tmp,
4229            PathBuf::from("streams").join(&link_target)
4230        )?);
4231        assert!(!test_path_exists_in_repo(&tmp, "streams/ref-stream2")?);
4232
4233        // Verify GcResult: objects removed include obj1, obj4, plus splitstreams for stream3 and ref-stream2
4234        assert_eq!(result.objects_removed, 4);
4235        assert!(result.objects_bytes > 0);
4236        assert_eq!(result.streams_pruned, 2);
4237        assert_eq!(result.images_pruned, 0);
4238
4239        Ok(())
4240    }
4241
4242    use crate::tree::{FileSystem, Inode, LeafContent, RegularFile, Stat};
4243
4244    /// Create a default root stat for test filesystems
4245    fn test_root_stat() -> Stat {
4246        Stat {
4247            st_mode: 0o755,
4248            st_uid: 0,
4249            st_gid: 0,
4250            st_mtim_sec: 0,
4251            st_mtim_nsec: 0,
4252            xattrs: Default::default(),
4253        }
4254    }
4255
4256    /// Make a test in-memory filesystem that only contains one externally referenced object
4257    fn make_test_fs(obj: &Sha512HashValue, size: u64) -> FileSystem<Sha512HashValue> {
4258        let mut fs: FileSystem<Sha512HashValue> = FileSystem::new(test_root_stat());
4259        let leaf_id = fs.push_leaf(
4260            Stat {
4261                st_mode: 0o644,
4262                st_uid: 0,
4263                st_gid: 0,
4264                st_mtim_sec: 0,
4265                st_mtim_nsec: 0,
4266                xattrs: Default::default(),
4267            },
4268            LeafContent::Regular(RegularFile::External(obj.clone(), size)),
4269        );
4270        let inode = Inode::leaf(leaf_id);
4271        fs.root.insert(OsStr::new("data"), inode);
4272        fs
4273    }
4274
4275    #[test]
4276    fn test_gc_removes_one_image() -> Result<()> {
4277        let tmp = tempdir();
4278        let repo = create_test_repo(&tmp.path().join("repo"))?;
4279
4280        let obj1_size: u64 = 32 * 1024;
4281        let obj1 = generate_test_data(obj1_size, 0xAE);
4282        let obj2_size: u64 = 64 * 1024;
4283        let obj2 = generate_test_data(obj2_size, 0xEA);
4284
4285        let obj1_id = repo.ensure_object(&obj1)?;
4286        let obj2_id = repo.ensure_object(&obj2)?;
4287
4288        let fs = make_test_fs(&obj2_id, obj2_size);
4289        let image1 = fs.commit_image(&repo, None)?;
4290        let image1_path = format!("images/{}", image1.to_hex());
4291
4292        repo.sync()?;
4293
4294        assert!(test_object_exists(&tmp, &obj1_id)?);
4295        assert!(test_object_exists(&tmp, &obj2_id)?);
4296        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
4297        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
4298        assert!(test_path_exists_in_repo(
4299            &tmp,
4300            PathBuf::from("images").join(&link_target)
4301        )?);
4302
4303        // Now perform gc - no refs, so image and both objects removed
4304        let result = repo.gc(&[])?;
4305
4306        assert!(!test_object_exists(&tmp, &obj1_id)?);
4307        assert!(!test_object_exists(&tmp, &obj2_id)?);
4308        assert!(!test_path_exists_in_repo(&tmp, &image1_path)?);
4309
4310        // Verify GcResult: 3 objects removed (obj1, obj2, image erofs), 1 image pruned
4311        assert_eq!(result.objects_removed, 3);
4312        assert!(result.objects_bytes > 0);
4313        assert_eq!(result.images_pruned, 1);
4314        assert_eq!(result.streams_pruned, 0);
4315        Ok(())
4316    }
4317
4318    #[test]
4319    fn test_gc_keeps_one_image() -> Result<()> {
4320        let tmp = tempdir();
4321        let repo = create_test_repo(&tmp.path().join("repo"))?;
4322
4323        let obj1_size: u64 = 32 * 1024;
4324        let obj1 = generate_test_data(obj1_size, 0xAE);
4325        let obj2_size: u64 = 64 * 1024;
4326        let obj2 = generate_test_data(obj2_size, 0xEA);
4327
4328        let obj1_id = repo.ensure_object(&obj1)?;
4329        let obj2_id = repo.ensure_object(&obj2)?;
4330
4331        let fs = make_test_fs(&obj2_id, obj2_size);
4332        let image1 = fs.commit_image(&repo, None)?;
4333        let image1_path = format!("images/{}", image1.to_hex());
4334
4335        repo.sync()?;
4336
4337        assert!(test_object_exists(&tmp, &obj1_id)?);
4338        assert!(test_object_exists(&tmp, &obj2_id)?);
4339        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
4340        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
4341        assert!(test_path_exists_in_repo(
4342            &tmp,
4343            PathBuf::from("images").join(&link_target)
4344        )?);
4345
4346        // Now perform gc - keep image via additional_roots
4347        let image1_hex = image1.to_hex();
4348        let result = repo.gc(&[image1_hex.as_str()])?;
4349
4350        assert!(!test_object_exists(&tmp, &obj1_id)?);
4351        assert!(test_object_exists(&tmp, &obj2_id)?);
4352        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
4353        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
4354        assert!(test_path_exists_in_repo(
4355            &tmp,
4356            PathBuf::from("images").join(&link_target)
4357        )?);
4358
4359        // Verify GcResult: 1 object removed (obj1), no symlinks pruned
4360        assert_eq!(result.objects_removed, 1);
4361        assert!(result.objects_bytes > 0);
4362        assert_eq!(result.images_pruned, 0);
4363        assert_eq!(result.streams_pruned, 0);
4364        Ok(())
4365    }
4366
4367    #[test]
4368    fn test_gc_keeps_one_image_from_refs() -> Result<()> {
4369        let tmp = tempdir();
4370        let repo = create_test_repo(&tmp.path().join("repo"))?;
4371
4372        let obj1_size: u64 = 32 * 1024;
4373        let obj1 = generate_test_data(obj1_size, 0xAE);
4374        let obj2_size: u64 = 64 * 1024;
4375        let obj2 = generate_test_data(obj2_size, 0xEA);
4376
4377        let obj1_id = repo.ensure_object(&obj1)?;
4378        let obj2_id = repo.ensure_object(&obj2)?;
4379
4380        let fs = make_test_fs(&obj2_id, obj2_size);
4381        let image1 = fs.commit_image(&repo, Some("ref-name"))?;
4382        let image1_path = format!("images/{}", image1.to_hex());
4383
4384        repo.sync()?;
4385
4386        assert!(test_object_exists(&tmp, &obj1_id)?);
4387        assert!(test_object_exists(&tmp, &obj2_id)?);
4388        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
4389        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
4390        assert!(test_path_exists_in_repo(
4391            &tmp,
4392            PathBuf::from("images").join(&link_target)
4393        )?);
4394
4395        // Now perform gc - image kept via ref, only obj1 removed
4396        let result = repo.gc(&[])?;
4397
4398        assert!(!test_object_exists(&tmp, &obj1_id)?);
4399        assert!(test_object_exists(&tmp, &obj2_id)?);
4400        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
4401        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
4402        assert!(test_path_exists_in_repo(
4403            &tmp,
4404            PathBuf::from("images").join(&link_target)
4405        )?);
4406
4407        // Verify GcResult: 1 object removed, no symlinks pruned (image has ref)
4408        assert_eq!(result.objects_removed, 1);
4409        assert!(result.objects_bytes > 0);
4410        assert_eq!(result.images_pruned, 0);
4411        assert_eq!(result.streams_pruned, 0);
4412        Ok(())
4413    }
4414
4415    fn make_test_fs_with_two_files(
4416        obj1: &Sha512HashValue,
4417        size1: u64,
4418        obj2: &Sha512HashValue,
4419        size2: u64,
4420    ) -> FileSystem<Sha512HashValue> {
4421        let mut fs = make_test_fs(obj1, size1);
4422        let leaf_id = fs.push_leaf(
4423            Stat {
4424                st_mode: 0o644,
4425                st_uid: 0,
4426                st_gid: 0,
4427                st_mtim_sec: 0,
4428                st_mtim_nsec: 0,
4429                xattrs: Default::default(),
4430            },
4431            LeafContent::Regular(RegularFile::External(obj2.clone(), size2)),
4432        );
4433        let inode = Inode::leaf(leaf_id);
4434        fs.root.insert(OsStr::new("extra_data"), inode);
4435        fs
4436    }
4437
4438    #[test]
4439    fn test_gc_keeps_one_image_from_two_overlapped() -> Result<()> {
4440        let tmp = tempdir();
4441        let repo = create_test_repo(&tmp.path().join("repo"))?;
4442
4443        let obj1_size: u64 = 32 * 1024;
4444        let obj1 = generate_test_data(obj1_size, 0xAE);
4445        let obj2_size: u64 = 64 * 1024;
4446        let obj2 = generate_test_data(obj2_size, 0xEA);
4447        let obj3_size: u64 = 64 * 1024;
4448        let obj3 = generate_test_data(obj2_size, 0xAA);
4449        let obj4_size: u64 = 64 * 1024;
4450        let obj4 = generate_test_data(obj2_size, 0xEE);
4451
4452        let obj1_id = repo.ensure_object(&obj1)?;
4453        let obj2_id = repo.ensure_object(&obj2)?;
4454        let obj3_id = repo.ensure_object(&obj3)?;
4455        let obj4_id = repo.ensure_object(&obj4)?;
4456
4457        let fs = make_test_fs_with_two_files(&obj2_id, obj2_size, &obj3_id, obj3_size);
4458        let image1 = fs.commit_image(&repo, None)?;
4459        let image1_path = format!("images/{}", image1.to_hex());
4460
4461        let fs = make_test_fs_with_two_files(&obj2_id, obj2_size, &obj4_id, obj4_size);
4462        let image2 = fs.commit_image(&repo, None)?;
4463        let image2_path = format!("images/{}", image2.to_hex());
4464
4465        repo.sync()?;
4466
4467        assert!(test_object_exists(&tmp, &obj1_id)?);
4468        assert!(test_object_exists(&tmp, &obj2_id)?);
4469        assert!(test_object_exists(&tmp, &obj3_id)?);
4470        assert!(test_object_exists(&tmp, &obj4_id)?);
4471        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
4472        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
4473        assert!(test_path_exists_in_repo(
4474            &tmp,
4475            PathBuf::from("images").join(&link_target)
4476        )?);
4477        assert!(test_path_exists_in_repo(&tmp, &image2_path)?);
4478        let link_target = read_links_in_repo(&tmp, &image2_path)?.expect("link is not broken");
4479        assert!(test_path_exists_in_repo(
4480            &tmp,
4481            PathBuf::from("images").join(&link_target)
4482        )?);
4483
4484        // Now perform gc - keep image1, remove image2 and its unique objects
4485        let image1_hex = image1.to_hex();
4486        let result = repo.gc(&[image1_hex.as_str()])?;
4487
4488        assert!(!test_object_exists(&tmp, &obj1_id)?);
4489        assert!(test_object_exists(&tmp, &obj2_id)?);
4490        assert!(test_object_exists(&tmp, &obj3_id)?);
4491        assert!(!test_object_exists(&tmp, &obj4_id)?);
4492        assert!(test_path_exists_in_repo(&tmp, &image1_path)?);
4493        let link_target = read_links_in_repo(&tmp, &image1_path)?.expect("link is not broken");
4494        assert!(test_path_exists_in_repo(
4495            &tmp,
4496            PathBuf::from("images").join(&link_target)
4497        )?);
4498        assert!(!test_path_exists_in_repo(&tmp, &image2_path)?);
4499
4500        // Verify GcResult: 3 objects removed (obj1, obj4, image2 erofs), 1 image pruned
4501        assert_eq!(result.objects_removed, 3);
4502        assert!(result.objects_bytes > 0);
4503        assert_eq!(result.images_pruned, 1);
4504        assert_eq!(result.streams_pruned, 0);
4505        Ok(())
4506    }
4507
4508    #[test]
4509    fn test_ensure_object_from_file() -> Result<()> {
4510        use std::io::{Seek, SeekFrom, Write};
4511
4512        let tmp = tempdir();
4513        let repo = create_test_repo(&tmp.path().join("repo"))?;
4514        let mut ctx = ImportContext::default();
4515
4516        let test_data = generate_test_data(64 * 1024, 0xBE);
4517        let mut temp_file = crate::test::tempfile();
4518        temp_file.write_all(&test_data)?;
4519        temp_file.seek(SeekFrom::Start(0))?;
4520
4521        // First store should return Copied or Reflinked (depending on fs)
4522        let (object_id, method) =
4523            repo.ensure_object_from_file(&temp_file, test_data.len() as u64, &mut ctx)?;
4524        assert_ne!(method, ObjectStoreMethod::AlreadyPresent);
4525        assert!(test_object_exists(&tmp, &object_id)?);
4526
4527        // Read back and verify contents match
4528        let stored_data = repo.read_object(&object_id)?;
4529        assert_eq!(stored_data, test_data);
4530
4531        // Second store of same data should return AlreadyPresent
4532        temp_file.seek(SeekFrom::Start(0))?;
4533        let (object_id_2, method_2) =
4534            repo.ensure_object_from_file(&temp_file, test_data.len() as u64, &mut ctx)?;
4535        assert_eq!(object_id, object_id_2);
4536        assert_eq!(method_2, ObjectStoreMethod::AlreadyPresent);
4537
4538        Ok(())
4539    }
4540
4541    // ==================== Fsck Tests ====================
4542
4543    #[tokio::test]
4544    async fn test_fsck_empty_repo() -> Result<()> {
4545        let tmp = tempdir();
4546        let repo = create_test_repo(&tmp.path().join("repo"))?;
4547
4548        let result = repo.fsck().await?;
4549
4550        assert!(result.is_ok());
4551        assert_eq!(result.objects_checked, 0);
4552        assert_eq!(result.objects_corrupted, 0);
4553        assert_eq!(result.streams_checked, 0);
4554        assert_eq!(result.streams_corrupted, 0);
4555        assert_eq!(result.images_checked, 0);
4556        assert_eq!(result.images_corrupted, 0);
4557        assert_eq!(result.broken_links, 0);
4558        assert_eq!(result.missing_objects, 0);
4559        assert!(result.errors.is_empty());
4560        Ok(())
4561    }
4562
4563    #[tokio::test]
4564    async fn test_fsck_healthy_repo_with_objects() -> Result<()> {
4565        let tmp = tempdir();
4566        let repo = create_test_repo(&tmp.path().join("repo"))?;
4567
4568        let obj1 = generate_test_data(32 * 1024, 0xAE);
4569        let obj2 = generate_test_data(64 * 1024, 0xEA);
4570
4571        let _obj1_id = repo.ensure_object(&obj1)?;
4572        let _obj2_id: Sha512HashValue = compute_verity(&obj2);
4573
4574        let mut writer = repo.create_stream(0)?;
4575        writer.write_external(&obj2)?;
4576        let _stream_id = repo.write_stream(writer, "test-stream", None)?;
4577        repo.sync()?;
4578
4579        let result = repo.fsck().await?;
4580
4581        assert!(result.is_ok(), "fsck should pass: {result}");
4582        // 3 objects: obj1, obj2, and the splitstream object
4583        assert!(result.objects_checked >= 3);
4584        assert_eq!(result.objects_corrupted, 0);
4585        assert_eq!(result.streams_checked, 1);
4586        assert_eq!(result.streams_corrupted, 0);
4587        assert_eq!(result.broken_links, 0);
4588        assert_eq!(result.missing_objects, 0);
4589        assert!(result.errors.is_empty());
4590        Ok(())
4591    }
4592
4593    #[tokio::test]
4594    async fn test_fsck_detects_corrupted_object() -> Result<()> {
4595        let tmp = tempdir();
4596        let repo = create_test_repo(&tmp.path().join("repo"))?;
4597
4598        let obj = generate_test_data(32 * 1024, 0xAE);
4599        let obj_id = repo.ensure_object(&obj)?;
4600        repo.sync()?;
4601
4602        // Corrupt the object by replacing the file (objects may be
4603        // immutable due to fs-verity, so we delete and recreate).
4604        let hex = obj_id.to_hex();
4605        let (dir, file) = hex.split_at(2);
4606        let obj_path = tmp
4607            .path()
4608            .join("repo")
4609            .join(format!("objects/{dir}/{file}"));
4610        std::fs::remove_file(&obj_path)?;
4611        std::fs::write(&obj_path, b"corrupted data")?;
4612
4613        let result = repo.fsck().await?;
4614
4615        assert!(!result.is_ok(), "fsck should detect corruption");
4616        assert!(
4617            result.objects_corrupted > 0,
4618            "should report corrupted objects"
4619        );
4620        assert!(
4621            result
4622                .errors
4623                .iter()
4624                .any(|e| e.to_string().contains("object-digest-mismatch")),
4625            "errors should mention digest mismatch: {:?}",
4626            result.errors
4627        );
4628        Ok(())
4629    }
4630
4631    #[tokio::test]
4632    async fn test_fsck_detects_broken_stream_link() -> Result<()> {
4633        let tmp = tempdir();
4634        let repo = create_test_repo(&tmp.path().join("repo"))?;
4635
4636        let obj = generate_test_data(64 * 1024, 0xEA);
4637        let _obj_verity: Sha512HashValue = compute_verity(&obj);
4638
4639        let mut writer = repo.create_stream(0)?;
4640        writer.write_external(&obj)?;
4641        let _stream_id = repo.write_stream(writer, "test-stream", None)?;
4642        repo.sync()?;
4643
4644        // The stream symlink points to a splitstream object. Find and
4645        // read the symlink target, then delete the backing object.
4646        let stream_symlink = tmp.path().join("repo/streams/test-stream");
4647        let link_target = std::fs::read_link(&stream_symlink)?;
4648        // link_target is relative to streams/, e.g. "../objects/XX/YY..."
4649        let backing_path = tmp.path().join("repo/streams").join(&link_target);
4650        std::fs::remove_file(&backing_path)?;
4651
4652        let result = repo.fsck().await?;
4653
4654        assert!(!result.is_ok(), "fsck should detect broken link");
4655        assert!(
4656            result.broken_links > 0,
4657            "should report broken links: {result}"
4658        );
4659        Ok(())
4660    }
4661
4662    #[tokio::test]
4663    async fn test_fsck_detects_missing_stream_object_ref() -> Result<()> {
4664        let tmp = tempdir();
4665        let repo = create_test_repo(&tmp.path().join("repo"))?;
4666
4667        let obj = generate_test_data(64 * 1024, 0xEA);
4668        let obj_verity: Sha512HashValue = compute_verity(&obj);
4669
4670        // Create a stream with an external reference to the object.
4671        // write_external calls ensure_object internally, so the object
4672        // will exist.
4673        let mut writer = repo.create_stream(0)?;
4674        writer.write_external(&obj)?;
4675        let _stream_id = repo.write_stream(writer, "test-stream", None)?;
4676        repo.sync()?;
4677
4678        // Delete the referenced object (but leave the splitstream intact)
4679        let hex = obj_verity.to_hex();
4680        let (dir, file) = hex.split_at(2);
4681        let obj_path = tmp
4682            .path()
4683            .join("repo")
4684            .join(format!("objects/{dir}/{file}"));
4685        std::fs::remove_file(&obj_path)?;
4686
4687        let result = repo.fsck().await?;
4688
4689        assert!(!result.is_ok(), "fsck should detect missing object ref");
4690        assert!(
4691            result.missing_objects > 0,
4692            "should report missing objects: {result}"
4693        );
4694        assert!(
4695            result
4696                .errors
4697                .iter()
4698                .any(|e| e.to_string().contains("missing-object-ref")),
4699            "errors should mention missing object: {:?}",
4700            result.errors
4701        );
4702        Ok(())
4703    }
4704
4705    // ==================== Additional Fsck Gap Tests ====================
4706
4707    fn open_test_repo_dir(tmp: &tempfile::TempDir) -> cap_std::fs::Dir {
4708        cap_std::fs::Dir::open_ambient_dir(tmp.path().join("repo"), cap_std::ambient_authority())
4709            .unwrap()
4710    }
4711
4712    #[tokio::test]
4713    async fn test_fsck_detects_non_symlink_in_streams() -> Result<()> {
4714        // Exercises fsck_category non-symlink detection (line ~1695).
4715        // The code checks entry.file_type() != FileType::Symlink and reports
4716        // "not a symlink" for regular files or directories in streams/.
4717        let tmp = tempdir();
4718        let repo = create_test_repo(&tmp.path().join("repo"))?;
4719        repo.sync()?;
4720
4721        // Create a regular file directly in streams/ (not a symlink)
4722        let dir = open_test_repo_dir(&tmp);
4723        dir.create_dir_all("streams")?;
4724        dir.write("streams/bogus-entry", b"not a symlink")?;
4725
4726        let result = repo.fsck().await?;
4727
4728        assert!(!result.is_ok(), "fsck should detect non-symlink in streams");
4729        assert_eq!(result.streams_corrupted, 1);
4730        assert!(
4731            result
4732                .errors
4733                .iter()
4734                .any(|e| e.to_string().contains("entry-not-symlink")),
4735            "errors should mention non-symlink: {:?}",
4736            result.errors
4737        );
4738        Ok(())
4739    }
4740
4741    #[tokio::test]
4742    async fn test_fsck_detects_non_symlink_in_images() -> Result<()> {
4743        // Exercises fsck_category non-symlink detection for the "images"
4744        // category (same code path as streams, but counting images_corrupted).
4745        let tmp = tempdir();
4746        let repo = create_test_repo(&tmp.path().join("repo"))?;
4747        repo.sync()?;
4748
4749        let dir = open_test_repo_dir(&tmp);
4750        dir.create_dir_all("images")?;
4751        dir.write("images/bogus-image", b"not a symlink")?;
4752
4753        let result = repo.fsck().await?;
4754
4755        assert!(!result.is_ok(), "fsck should detect non-symlink in images");
4756        assert_eq!(result.images_corrupted, 1);
4757        assert!(
4758            result
4759                .errors
4760                .iter()
4761                .any(|e| e.to_string().contains("entry-not-symlink")),
4762            "errors should mention non-symlink: {:?}",
4763            result.errors
4764        );
4765        Ok(())
4766    }
4767
4768    #[tokio::test]
4769    async fn test_fsck_detects_broken_ref_symlink() -> Result<()> {
4770        // Exercises fsck_refs_dir broken symlink detection (line ~1804).
4771        // Creates a ref symlink that points to a non-existent stream entry,
4772        // so following the chain refs/X -> ../../stream-entry -> object fails.
4773        let tmp = tempdir();
4774        let repo = create_test_repo(&tmp.path().join("repo"))?;
4775        repo.sync()?;
4776
4777        // Create refs directory under streams
4778        let dir = open_test_repo_dir(&tmp);
4779        dir.create_dir_all("streams/refs")?;
4780
4781        // Create a dangling symlink in refs/
4782        dir.symlink("../nonexistent-stream", "streams/refs/broken-ref")?;
4783
4784        let result = repo.fsck().await?;
4785
4786        assert!(!result.is_ok(), "fsck should detect broken ref symlink");
4787        assert!(result.broken_links > 0, "should report broken links");
4788        assert!(
4789            result
4790                .errors
4791                .iter()
4792                .any(|e| e.to_string().contains("broken-symlink")
4793                    && e.to_string().contains("refs")),
4794            "errors should mention broken ref symlink: {:?}",
4795            result.errors
4796        );
4797        Ok(())
4798    }
4799
4800    #[tokio::test]
4801    async fn test_fsck_refs_dir_unexpected_file_type() -> Result<()> {
4802        // Exercises the "unexpected file type" branch in fsck_refs_dir
4803        // (line ~1817). Regular files in refs/ are neither symlinks nor
4804        // directories — they should be flagged.
4805        let tmp = tempdir();
4806        let repo = create_test_repo(&tmp.path().join("repo"))?;
4807        repo.sync()?;
4808
4809        let dir = open_test_repo_dir(&tmp);
4810        dir.create_dir_all("streams/refs")?;
4811
4812        // Put a regular file directly in refs/
4813        dir.write("streams/refs/stray-file", b"should not be here")?;
4814
4815        let result = repo.fsck().await?;
4816
4817        assert!(!result.is_ok(), "fsck should detect unexpected file type");
4818        assert!(
4819            result
4820                .errors
4821                .iter()
4822                .any(|e| e.to_string().contains("unexpected-file-type")),
4823            "errors should mention unexpected file type: {:?}",
4824            result.errors
4825        );
4826        Ok(())
4827    }
4828
4829    #[tokio::test]
4830    async fn test_fsck_refs_dir_recursive() -> Result<()> {
4831        // Exercises the recursive walk in fsck_refs_dir: creates a nested
4832        // subdirectory under refs/ with a broken symlink inside to verify
4833        // the recursion actually descends into subdirs.
4834        let tmp = tempdir();
4835        let repo = create_test_repo(&tmp.path().join("repo"))?;
4836        repo.sync()?;
4837
4838        let dir = open_test_repo_dir(&tmp);
4839        dir.create_dir_all("streams/refs/nested/deep")?;
4840
4841        // Broken symlink in the nested directory
4842        dir.symlink(
4843            "../../../nonexistent-stream",
4844            "streams/refs/nested/deep/broken-nested-ref",
4845        )?;
4846
4847        let result = repo.fsck().await?;
4848
4849        assert!(
4850            !result.is_ok(),
4851            "fsck should detect broken symlink in nested refs"
4852        );
4853        assert!(result.broken_links > 0);
4854        assert!(
4855            result
4856                .errors
4857                .iter()
4858                .any(|e| e.to_string().contains("nested/deep")
4859                    && e.to_string().contains("broken-symlink")),
4860            "error should reference the nested path: {:?}",
4861            result.errors
4862        );
4863        Ok(())
4864    }
4865
4866    #[tokio::test]
4867    async fn test_fsck_detects_invalid_object_filename() -> Result<()> {
4868        // Exercises fsck_object_dir invalid filename detection (line ~1581).
4869        // Creates a file with a name that can't be parsed as a hex hash
4870        // remainder in objects/XX/.
4871        let tmp = tempdir();
4872        let repo = create_test_repo(&tmp.path().join("repo"))?;
4873        repo.sync()?;
4874
4875        let dir = open_test_repo_dir(&tmp);
4876        dir.create_dir_all("objects/ab")?;
4877        dir.write("objects/ab/not-a-valid-hex-hash", b"junk")?;
4878
4879        let result = repo.fsck().await?;
4880
4881        assert!(
4882            !result.is_ok(),
4883            "fsck should detect invalid object filename"
4884        );
4885        assert!(result.objects_corrupted > 0);
4886        assert!(
4887            result
4888                .errors
4889                .iter()
4890                .any(|e| e.to_string().contains("object-invalid-name")),
4891            "errors should mention invalid filename: {:?}",
4892            result.errors
4893        );
4894        Ok(())
4895    }
4896
4897    #[tokio::test]
4898    async fn test_fsck_detects_broken_image_symlink() -> Result<()> {
4899        // Exercises the broken symlink path in fsck_category for images
4900        // (line ~1711). The stream broken-symlink test covers streams;
4901        // this covers the same logic for images/.
4902        let tmp = tempdir();
4903        let repo = create_test_repo(&tmp.path().join("repo"))?;
4904
4905        let obj_size: u64 = 32 * 1024;
4906        let obj = generate_test_data(obj_size, 0xBB);
4907        let obj_id = repo.ensure_object(&obj)?;
4908
4909        let fs = make_test_fs(&obj_id, obj_size);
4910        let image_id = fs.commit_image(&repo, None)?;
4911        repo.sync()?;
4912
4913        // Delete the backing object that the image symlink points to
4914        let dir = open_test_repo_dir(&tmp);
4915        let image_rel = format!("images/{}", image_id.to_hex());
4916        let link_target = dir.read_link(&image_rel)?;
4917        let backing_rel = PathBuf::from("images").join(&link_target);
4918        dir.remove_file(&backing_rel)?;
4919
4920        let result = repo.fsck().await?;
4921
4922        assert!(
4923            !result.is_ok(),
4924            "fsck should detect broken image symlink: {result}"
4925        );
4926        assert!(result.broken_links > 0);
4927        assert!(result.images_corrupted > 0);
4928        Ok(())
4929    }
4930
4931    #[tokio::test]
4932    async fn test_fsck_detects_missing_named_ref_object() -> Result<()> {
4933        // Exercises fsck_splitstream named ref checking (line ~1869).
4934        // Creates a stream with a named ref pointing to a non-existent
4935        // object, which should be detected as a missing object.
4936        let tmp = tempdir();
4937        let repo = create_test_repo(&tmp.path().join("repo"))?;
4938
4939        let obj = generate_test_data(64 * 1024, 0xEA);
4940
4941        // Create stream1 that references obj
4942        let mut writer1 = repo.create_stream(0)?;
4943        writer1.write_external(&obj)?;
4944        let stream1_id = repo.write_stream(writer1, "test-stream1", None)?;
4945
4946        // Create stream2 with a named ref to stream1
4947        let mut writer2 = repo.create_stream(0)?;
4948        writer2.add_named_stream_ref("test-stream1", &stream1_id);
4949        let _stream2_id = repo.write_stream(writer2, "test-stream2", None)?;
4950        repo.sync()?;
4951
4952        // Delete the object that the named ref points to (the stream1 splitstream object)
4953        let hex = stream1_id.to_hex();
4954        let (prefix, rest) = hex.split_at(2);
4955        let repo_dir = open_test_repo_dir(&tmp);
4956        repo_dir.remove_file(format!("objects/{prefix}/{rest}"))?;
4957
4958        let result = repo.fsck().await?;
4959
4960        assert!(
4961            !result.is_ok(),
4962            "fsck should detect missing named ref object"
4963        );
4964        assert!(
4965            result.missing_objects > 0,
4966            "should report missing objects: {result}"
4967        );
4968        assert!(
4969            result
4970                .errors
4971                .iter()
4972                .any(|e| e.to_string().contains("missing-named-ref")),
4973            "errors should mention missing named ref object: {:?}",
4974            result.errors
4975        );
4976        Ok(())
4977    }
4978
4979    #[tokio::test]
4980    async fn test_fsck_healthy_repo_with_refs() -> Result<()> {
4981        // Verifies fsck_refs_dir passes on valid refs. Prior tests only
4982        // checked that fsck detects broken refs; this confirms a repo
4983        // with valid refs reports ok.
4984        let tmp = tempdir();
4985        let repo = create_test_repo(&tmp.path().join("repo"))?;
4986
4987        let obj = generate_test_data(64 * 1024, 0xEA);
4988
4989        let mut writer = repo.create_stream(0)?;
4990        writer.write_external(&obj)?;
4991        // write_stream with reference creates a ref symlink
4992        let _stream_id = repo.write_stream(writer, "test-stream", Some("my-ref"))?;
4993        repo.sync()?;
4994
4995        let result = repo.fsck().await?;
4996
4997        assert!(result.is_ok(), "fsck should pass with valid refs: {result}");
4998        assert!(result.errors.is_empty());
4999        Ok(())
5000    }
5001
5002    #[tokio::test]
5003    async fn test_fsck_detects_corrupted_splitstream_object() -> Result<()> {
5004        // Exercises fsck_splitstream failure-to-open path (line ~1829).
5005        // Corrupts the splitstream object so that open_stream fails to
5006        // parse it, which is different from a missing external object ref.
5007        let tmp = tempdir();
5008        let repo = create_test_repo(&tmp.path().join("repo"))?;
5009
5010        let obj = generate_test_data(64 * 1024, 0xEA);
5011
5012        let mut writer = repo.create_stream(0)?;
5013        writer.write_external(&obj)?;
5014        let _stream_id = repo.write_stream(writer, "test-stream", None)?;
5015        repo.sync()?;
5016
5017        // Find the splitstream object path via the stream symlink
5018        let dir = open_test_repo_dir(&tmp);
5019        let link_target = dir.read_link("streams/test-stream")?;
5020        let backing_rel = PathBuf::from("streams").join(&link_target);
5021
5022        // Corrupt the splitstream object (not the data object it references)
5023        dir.remove_file(&backing_rel)?;
5024        dir.write(&backing_rel, b"corrupted splitstream header")?;
5025
5026        let result = repo.fsck().await?;
5027
5028        assert!(
5029            !result.is_ok(),
5030            "fsck should detect corrupted splitstream: {result}"
5031        );
5032        // The object digest mismatch is detected by object checking,
5033        // and the stream is also flagged because open_stream will fail
5034        // or the object refs check will fail.
5035        assert!(
5036            result.objects_corrupted > 0 || result.streams_corrupted > 0,
5037            "should report corruption: {result}"
5038        );
5039        Ok(())
5040    }
5041
5042    #[tokio::test]
5043    async fn test_fsck_validates_erofs_image_objects() -> Result<()> {
5044        // Exercises fsck_image: creates a valid erofs image, then deletes
5045        // one of its referenced objects. Fsck should detect the missing
5046        // object via erofs parsing.
5047        let tmp = tempdir();
5048        let repo = create_test_repo(&tmp.path().join("repo"))?;
5049
5050        let obj_size: u64 = 32 * 1024;
5051        let obj = generate_test_data(obj_size, 0xCC);
5052        let obj_id = repo.ensure_object(&obj)?;
5053
5054        let fs = make_test_fs(&obj_id, obj_size);
5055        let image_id = fs.commit_image(&repo, None)?;
5056        repo.sync()?;
5057
5058        // Sanity: fsck passes on a healthy image
5059        let result = repo.fsck().await?;
5060        assert!(result.is_ok(), "healthy image should pass fsck: {result}");
5061        assert!(result.images_checked > 0, "should have checked the image");
5062
5063        // Delete the object referenced by the erofs image
5064        let hex = obj_id.to_hex();
5065        let (prefix, rest) = hex.split_at(2);
5066        let dir = open_test_repo_dir(&tmp);
5067        dir.remove_file(format!("objects/{prefix}/{rest}"))?;
5068
5069        let result = repo.fsck().await?;
5070        assert!(
5071            !result.is_ok(),
5072            "fsck should detect missing object referenced by erofs image: {result}"
5073        );
5074        assert!(
5075            result.missing_objects > 0,
5076            "should report missing objects: {result}"
5077        );
5078        assert!(
5079            result
5080                .errors
5081                .iter()
5082                .any(|e| e.to_string().contains(&image_id.to_hex())
5083                    && e.to_string().contains("image-missing-object")),
5084            "error should reference the image: {:?}",
5085            result.errors
5086        );
5087        Ok(())
5088    }
5089
5090    #[tokio::test]
5091    async fn test_fsck_detects_corrupt_erofs_image() -> Result<()> {
5092        // Exercises fsck_image: corrupts the erofs image data so that
5093        // parsing fails. fsck_image returns an error rather than panicking.
5094        let tmp = tempdir();
5095        let repo = create_test_repo(&tmp.path().join("repo"))?;
5096
5097        let obj_size: u64 = 32 * 1024;
5098        let obj = generate_test_data(obj_size, 0xDD);
5099        let obj_id = repo.ensure_object(&obj)?;
5100
5101        let fs = make_test_fs(&obj_id, obj_size);
5102        let image_id = fs.commit_image(&repo, None)?;
5103        repo.sync()?;
5104
5105        // Corrupt the erofs image data (replace the backing object)
5106        let hex = image_id.to_hex();
5107        let (prefix, rest) = hex.split_at(2);
5108        let dir = open_test_repo_dir(&tmp);
5109        let obj_path = format!("objects/{prefix}/{rest}");
5110        dir.remove_file(&obj_path)?;
5111        dir.write(&obj_path, b"this is not a valid erofs image")?;
5112
5113        let result = repo.fsck().await?;
5114        assert!(
5115            !result.is_ok(),
5116            "fsck should detect corrupt erofs image: {result}"
5117        );
5118        assert!(
5119            result
5120                .errors
5121                .iter()
5122                .any(|e| e.to_string().contains("image-invalid")
5123                    || e.to_string().contains("digest mismatch")),
5124            "error should mention erofs corruption or digest mismatch: {:?}",
5125            result.errors
5126        );
5127        Ok(())
5128    }
5129
5130    /// Helper to create a V1 (C-compatible) EROFS image and write it to the repo.
5131    fn commit_v1_image(
5132        repo: &Repository<Sha512HashValue>,
5133        obj_id: &Sha512HashValue,
5134        obj_size: u64,
5135    ) -> Result<Sha512HashValue> {
5136        use crate::erofs::writer::{ValidatedFileSystem, mkfs_erofs_versioned};
5137
5138        let fs = make_test_fs(obj_id, obj_size);
5139        let image_data = mkfs_erofs_versioned(
5140            &mut ValidatedFileSystem::new(fs).unwrap(),
5141            FormatVersion::V1,
5142        );
5143        repo.write_image(None, &image_data)
5144    }
5145
5146    #[tokio::test]
5147    async fn test_fsck_validates_v1_erofs_image() -> Result<()> {
5148        // V1 images (C-compatible format) should pass fsck just like V2.
5149        // This catches regressions where fsck or the reader doesn't handle
5150        // compact inodes, BFS ordering, or the whiteout table.
5151        let tmp = tempdir();
5152        let repo = create_test_repo(&tmp.path().join("repo"))?;
5153
5154        let obj_size: u64 = 32 * 1024;
5155        let obj = generate_test_data(obj_size, 0xBB);
5156        let obj_id = repo.ensure_object(&obj)?;
5157
5158        commit_v1_image(&repo, &obj_id, obj_size)?;
5159        repo.sync()?;
5160
5161        let result = repo.fsck().await?;
5162        assert!(
5163            result.is_ok(),
5164            "V1 (C-compatible) erofs image should pass fsck: {result}"
5165        );
5166        assert!(result.images_checked > 0, "should have checked the image");
5167        Ok(())
5168    }
5169
5170    // ---- Fsck metadata validation tests ----
5171
5172    #[tokio::test]
5173    async fn test_fsck_valid_metadata() -> Result<()> {
5174        let tmp = tempdir();
5175        let repo = create_test_repo(&tmp.path().join("repo"))?;
5176
5177        let result = repo.fsck().await?;
5178        assert!(result.is_ok());
5179        assert!(result.has_metadata());
5180        assert!(result.errors().is_empty());
5181        assert!(
5182            result.to_string().contains("meta.json: ok"),
5183            "display should show ok: {result}"
5184        );
5185        Ok(())
5186    }
5187
5188    #[tokio::test]
5189    async fn test_fsck_corrupt_metadata() -> Result<()> {
5190        // Write garbage to meta.json after opening — fsck re-reads from disk.
5191        let tmp = tempdir();
5192        let repo = create_test_repo(&tmp.path().join("repo"))?;
5193
5194        let dir = open_test_repo_dir(&tmp);
5195        // Remove the valid meta.json and replace with garbage
5196        dir.remove_file(REPO_METADATA_FILENAME)?;
5197        dir.write(REPO_METADATA_FILENAME, b"not valid json {{")?;
5198
5199        let result = repo.fsck().await?;
5200        assert!(!result.is_ok());
5201        assert!(
5202            result
5203                .errors()
5204                .iter()
5205                .any(|e| matches!(e, FsckError::MetadataParseFailed { .. }))
5206        );
5207        assert!(
5208            result.to_string().contains("meta.json: error"),
5209            "display should show error: {result}"
5210        );
5211        Ok(())
5212    }
5213
5214    #[test]
5215    fn test_open_path_requires_metadata() {
5216        // Opening a directory without meta.json should fail with MetadataMissing.
5217        let tmp = tempdir();
5218        let path = tmp.path().join("bare-repo");
5219        mkdirat(CWD, &path, Mode::from_raw_mode(0o755)).unwrap();
5220        assert!(matches!(
5221            Repository::<Sha512HashValue>::open_path(CWD, &path),
5222            Err(RepositoryOpenError::MetadataMissing)
5223        ));
5224    }
5225
5226    #[test]
5227    fn test_open_path_detects_old_format() {
5228        // A directory with objects/ but no meta.json → OldFormatRepository.
5229        let tmp = tempdir();
5230        let path = tmp.path().join("old-repo");
5231        mkdirat(CWD, &path, Mode::from_raw_mode(0o755)).unwrap();
5232        mkdirat(CWD, &path.join("objects"), Mode::from_raw_mode(0o755)).unwrap();
5233        assert!(matches!(
5234            Repository::<Sha512HashValue>::open_path(CWD, &path),
5235            Err(RepositoryOpenError::OldFormatRepository)
5236        ));
5237    }
5238
5239    #[test]
5240    fn test_open_path_algorithm_mismatch() {
5241        // Open a sha512 repo as sha256 → AlgorithmMismatch.
5242        let tmp = tempdir();
5243        let path = tmp.path().join("sha512-repo");
5244        Repository::<Sha512HashValue>::init_path(
5245            CWD,
5246            &path,
5247            RepositoryConfig::new(Algorithm::SHA512).set_insecure(),
5248        )
5249        .unwrap();
5250        assert!(matches!(
5251            Repository::<Sha256HashValue>::open_path(CWD, &path),
5252            Err(RepositoryOpenError::AlgorithmMismatch { .. })
5253        ));
5254    }
5255
5256    // ---- RepoMetadata / FeatureFlags tests ----
5257    //
5258    // Basic metadata construction, JSON roundtrip, algorithm compatibility,
5259    // and read/write are covered by the fsck tests above and the CLI
5260    // integration tests (init, hash-mismatch, backcompat).  The tests
5261    // below focus on the three-tier feature-flag compatibility model and
5262    // JSON serialization of populated feature vectors, which aren't
5263    // exercised elsewhere.
5264
5265    #[test]
5266    fn test_metadata_json_with_features() {
5267        let mut meta = RepoMetadata::for_hash::<Sha512HashValue>();
5268        meta.features.compatible.push("some-compat".to_string());
5269        meta.features
5270            .read_only_compatible
5271            .push("some-rocompat".to_string());
5272
5273        let json = meta.to_json().unwrap();
5274        let parsed: serde_json::Value = serde_json::from_slice(&json).unwrap();
5275
5276        assert_eq!(parsed["features"]["compatible"][0], "some-compat");
5277        assert_eq!(
5278            parsed["features"]["read-only-compatible"][0],
5279            "some-rocompat"
5280        );
5281
5282        // Roundtrip
5283        let meta2 = RepoMetadata::from_json(&json).unwrap();
5284        assert_eq!(meta, meta2);
5285    }
5286
5287    #[test]
5288    fn test_feature_flags_unknown_incompat() {
5289        let mut meta = RepoMetadata::for_hash::<Sha512HashValue>();
5290        meta.features
5291            .incompatible
5292            .push("fancy-new-thing".to_string());
5293        let err = meta.check_compatible::<Sha512HashValue>().unwrap_err();
5294        assert!(
5295            format!("{err}").contains("fancy-new-thing"),
5296            "error should name the unknown feature: {err}"
5297        );
5298    }
5299
5300    #[test]
5301    fn test_feature_flags_unknown_ro_compat() {
5302        let mut meta = RepoMetadata::for_hash::<Sha512HashValue>();
5303        meta.features
5304            .read_only_compatible
5305            .push("new-index".to_string());
5306        let check = meta.check_compatible::<Sha512HashValue>().unwrap();
5307        assert_eq!(check, FeatureCheck::ReadOnly(vec!["new-index".to_string()]));
5308    }
5309
5310    #[test]
5311    fn test_feature_flags_unknown_compat_ignored() {
5312        let mut meta = RepoMetadata::for_hash::<Sha512HashValue>();
5313        meta.features.compatible.push("optional-hint".to_string());
5314        assert_eq!(
5315            meta.check_compatible::<Sha512HashValue>().unwrap(),
5316            FeatureCheck::ReadWrite
5317        );
5318    }
5319
5320    // ---- erofs_version / v1_erofs feature tests ----
5321
5322    #[test]
5323    fn test_init_v1_repo_metadata() {
5324        let meta = RepoMetadata::new_with_formats(
5325            Algorithm::SHA256,
5326            &FormatConfig::single(FormatVersion::V1),
5327        );
5328        assert_eq!(meta.erofs_version(), FormatVersion::V1);
5329        assert!(
5330            meta.features
5331                .read_only_compatible
5332                .contains(&known_features::V1_EROFS.to_string()),
5333            "V1 repo must list v1_erofs in ro_compat, got: {:?}",
5334            meta.features.read_only_compatible
5335        );
5336    }
5337
5338    #[test]
5339    fn test_init_v2_repo_metadata() {
5340        let meta = RepoMetadata::new_with_formats(
5341            Algorithm::SHA256,
5342            &FormatConfig::single(FormatVersion::V2),
5343        );
5344        assert_eq!(meta.erofs_version(), FormatVersion::V2);
5345        assert!(
5346            !meta
5347                .features
5348                .read_only_compatible
5349                .contains(&known_features::V1_EROFS.to_string()),
5350            "V2 repo must NOT list v1_erofs in ro_compat"
5351        );
5352    }
5353
5354    #[test]
5355    fn test_init_path_erofs_version_mismatch() -> Result<()> {
5356        let tmp = tempdir();
5357        let path = tmp.path().join("repo");
5358
5359        // First init: V1
5360        let config_v1 = RepositoryConfig {
5361            algorithm: Algorithm::SHA256,
5362            erofs_formats: FormatConfig::single(FormatVersion::V1),
5363            ..RepositoryConfig::default().set_insecure()
5364        };
5365        Repository::<Sha256HashValue>::init_path(CWD, &path, config_v1)?;
5366
5367        // Second init: V2 — should fail because meta.json already exists with V1 config
5368        let config_v2 = RepositoryConfig {
5369            algorithm: Algorithm::SHA256,
5370            erofs_formats: FormatConfig::single(FormatVersion::V2),
5371            ..RepositoryConfig::default().set_insecure()
5372        };
5373        let result = Repository::<Sha256HashValue>::init_path(CWD, &path, config_v2);
5374        assert!(
5375            result.is_err(),
5376            "re-initializing with different erofs_version must fail"
5377        );
5378        let err = result.unwrap_err();
5379        // Use the full chain representation so we see the inner bail! message,
5380        // not just the outermost fn_error_context wrapper.
5381        let msg = format!("{err:#}");
5382        assert!(
5383            msg.contains("erofs_version"),
5384            "error message must mention erofs_version, got: {msg}"
5385        );
5386        Ok(())
5387    }
5388
5389    #[test]
5390    fn test_init_path_same_erofs_version_is_idempotent() -> Result<()> {
5391        let tmp = tempdir();
5392        let path = tmp.path().join("repo");
5393
5394        let config = RepositoryConfig {
5395            algorithm: Algorithm::SHA256,
5396            erofs_formats: FormatConfig::single(FormatVersion::V1),
5397            ..RepositoryConfig::default().set_insecure()
5398        };
5399        let (_, was_new) = Repository::<Sha256HashValue>::init_path(CWD, &path, config.clone())?;
5400        assert!(was_new, "first init must be fresh");
5401
5402        let (repo, was_new) = Repository::<Sha256HashValue>::init_path(CWD, &path, config)?;
5403        assert!(!was_new, "second init with same config must be idempotent");
5404        assert_eq!(repo.erofs_version(), FormatVersion::V1);
5405        Ok(())
5406    }
5407
5408    #[test]
5409    fn test_legacy_repo_defaults_to_v2() {
5410        // A repo with no feature flags → no v1_erofs → derived version is V2.
5411        let json = br#"{"version":1,"algorithm":"fsverity-sha256-12","features":{}}"#;
5412        let meta: RepoMetadata = serde_json::from_slice(json).unwrap();
5413        assert_eq!(
5414            meta.erofs_version(),
5415            FormatVersion::V2,
5416            "repo with no v1_erofs flag should derive V2"
5417        );
5418
5419        // A repo with v1_erofs in ro_compat → derived version is V1.
5420        let json_v1 = br#"{"version":1,"algorithm":"fsverity-sha256-12","features":{"read-only-compatible":["v1_erofs"]}}"#;
5421        let meta_v1: RepoMetadata = serde_json::from_slice(json_v1).unwrap();
5422        assert_eq!(
5423            meta_v1.erofs_version(),
5424            FormatVersion::V1,
5425            "repo with v1_erofs flag should derive V1"
5426        );
5427
5428        // Old JSON that happens to have an erofs_version field (written by a previous
5429        // version of this code) must deserialize successfully — serde ignores unknown fields.
5430        let json_old =
5431            br#"{"version":1,"algorithm":"fsverity-sha256-12","features":{},"erofs_version":2}"#;
5432        let meta_old: RepoMetadata = serde_json::from_slice(json_old).unwrap();
5433        assert_eq!(
5434            meta_old.erofs_version(),
5435            FormatVersion::V2,
5436            "old JSON with explicit erofs_version field should still derive V2 from flags"
5437        );
5438    }
5439
5440    #[test]
5441    fn test_old_tool_blocked_on_v1_repo() {
5442        // Simulate an old tool that does not know about "v1_erofs".
5443        // A V1 repo places "v1_erofs" in ro_compat, so any tool that
5444        // does not recognise that feature must open the repo read-only.
5445        // We model this by constructing the FeatureFlags directly and filtering
5446        // against an empty ro_compat allowlist.
5447        let features = FeatureFlags {
5448            compatible: vec![],
5449            read_only_compatible: vec![known_features::V1_EROFS.to_string()],
5450            incompatible: vec![],
5451        };
5452
5453        // An unknown ro_compat feature must not prevent opening, but must
5454        // signal read-only access.
5455        let unknown_ro: Vec<String> = features
5456            .read_only_compatible
5457            .iter()
5458            .filter(|f| ![].contains(&f.as_str())) // empty old-tool allowlist
5459            .cloned()
5460            .collect();
5461        assert_eq!(
5462            unknown_ro,
5463            vec![known_features::V1_EROFS.to_string()],
5464            "old tool should see v1_erofs as an unknown ro_compat feature"
5465        );
5466        // And the current tool knows about it, so check() returns ReadWrite.
5467        assert_eq!(features.check().unwrap(), FeatureCheck::ReadWrite);
5468    }
5469
5470    #[test]
5471    fn test_object_store_method_variants() {
5472        // Verify all variants exist and are distinct
5473        let methods = [
5474            ObjectStoreMethod::Reflinked,
5475            ObjectStoreMethod::Hardlinked,
5476            ObjectStoreMethod::Copied,
5477            ObjectStoreMethod::AlreadyPresent,
5478        ];
5479
5480        for (i, a) in methods.iter().enumerate() {
5481            for (j, b) in methods.iter().enumerate() {
5482                if i == j {
5483                    assert_eq!(a, b);
5484                } else {
5485                    assert_ne!(a, b);
5486                }
5487            }
5488        }
5489
5490        // Verify Debug impl works
5491        assert_eq!(format!("{:?}", ObjectStoreMethod::Hardlinked), "Hardlinked");
5492    }
5493
5494    // ---- open_upgrade tests ----
5495
5496    #[test]
5497    fn test_open_upgrade_sha256() {
5498        let tmp = tempdir();
5499        let repo_path = tmp.path().join("repo");
5500
5501        // Create a repo, store an object, then remove meta.json to
5502        // simulate an old-format repository.
5503        let (repo, _) = Repository::<Sha256HashValue>::init_path(
5504            CWD,
5505            &repo_path,
5506            RepositoryConfig::default().set_insecure(),
5507        )
5508        .unwrap();
5509        let data = b"hello world";
5510        let obj_id = repo.ensure_object(data).unwrap();
5511        drop(repo);
5512
5513        std::fs::remove_file(repo_path.join(REPO_METADATA_FILENAME)).unwrap();
5514
5515        // open_path should fail with OldFormatRepository
5516        assert!(matches!(
5517            Repository::<Sha256HashValue>::open_path(CWD, &repo_path),
5518            Err(RepositoryOpenError::OldFormatRepository)
5519        ));
5520
5521        // open_upgrade should infer metadata and succeed
5522        let (repo, upgraded) =
5523            Repository::<Sha256HashValue>::open_upgrade(CWD, &repo_path).unwrap();
5524        assert!(upgraded);
5525        assert!(repo_path.join(REPO_METADATA_FILENAME).exists());
5526
5527        // Verify the algorithm was inferred correctly
5528        let meta = read_repo_metadata(
5529            &openat(
5530                CWD,
5531                &repo_path,
5532                OFlags::RDONLY | OFlags::CLOEXEC,
5533                Mode::empty(),
5534            )
5535            .unwrap(),
5536        )
5537        .unwrap()
5538        .unwrap();
5539        assert!(meta.algorithm.is_compatible::<Sha256HashValue>());
5540
5541        // The repo should work — read back the object
5542        let read_data = repo.read_object(&obj_id).unwrap();
5543        assert_eq!(&read_data[..], data);
5544
5545        // Second call should not upgrade
5546        drop(repo);
5547        let (_repo, upgraded) =
5548            Repository::<Sha256HashValue>::open_upgrade(CWD, &repo_path).unwrap();
5549        assert!(!upgraded);
5550    }
5551
5552    #[test]
5553    fn test_open_upgrade_sha512() {
5554        let tmp = tempdir();
5555        let repo_path = tmp.path().join("repo");
5556
5557        let (repo, _) = Repository::<Sha512HashValue>::init_path(
5558            CWD,
5559            &repo_path,
5560            RepositoryConfig::new(Algorithm::SHA512).set_insecure(),
5561        )
5562        .unwrap();
5563        let data = b"sha512 test data";
5564        let obj_id = repo.ensure_object(data).unwrap();
5565        drop(repo);
5566
5567        std::fs::remove_file(repo_path.join(REPO_METADATA_FILENAME)).unwrap();
5568
5569        let (repo, upgraded) =
5570            Repository::<Sha512HashValue>::open_upgrade(CWD, &repo_path).unwrap();
5571        assert!(upgraded);
5572
5573        let meta = read_repo_metadata(
5574            &openat(
5575                CWD,
5576                &repo_path,
5577                OFlags::RDONLY | OFlags::CLOEXEC,
5578                Mode::empty(),
5579            )
5580            .unwrap(),
5581        )
5582        .unwrap()
5583        .unwrap();
5584        assert!(meta.algorithm.is_compatible::<Sha512HashValue>());
5585
5586        let read_data = repo.read_object(&obj_id).unwrap();
5587        assert_eq!(&read_data[..], data);
5588    }
5589
5590    #[test]
5591    fn test_open_upgrade_algorithm_mismatch() {
5592        // Create a sha512 repo, remove meta.json, then try to
5593        // open_upgrade as sha256 — should fail with algorithm mismatch.
5594        let tmp = tempdir();
5595        let repo_path = tmp.path().join("repo");
5596
5597        let (repo, _) = Repository::<Sha512HashValue>::init_path(
5598            CWD,
5599            &repo_path,
5600            RepositoryConfig::new(Algorithm::SHA512).set_insecure(),
5601        )
5602        .unwrap();
5603        repo.ensure_object(b"some data").unwrap();
5604        drop(repo);
5605
5606        std::fs::remove_file(repo_path.join(REPO_METADATA_FILENAME)).unwrap();
5607
5608        let err = Repository::<Sha256HashValue>::open_upgrade(CWD, &repo_path).unwrap_err();
5609        let msg = format!("{err:#}");
5610        assert!(
5611            msg.contains("not compatible"),
5612            "expected algorithm mismatch error, got: {msg}"
5613        );
5614    }
5615
5616    #[test]
5617    fn test_open_upgrade_empty_objects() {
5618        // An old-format repo with an empty objects/ directory should
5619        // fail because we can't infer the algorithm.
5620        let tmp = tempdir();
5621        let repo_path = tmp.path().join("repo");
5622        mkdirat(CWD, &repo_path, Mode::from_raw_mode(0o755)).unwrap();
5623        mkdirat(CWD, &repo_path.join("objects"), Mode::from_raw_mode(0o755)).unwrap();
5624
5625        let err = Repository::<Sha256HashValue>::open_upgrade(CWD, &repo_path).unwrap_err();
5626        let msg = format!("{err:#}");
5627        assert!(
5628            msg.contains("no objects found"),
5629            "expected 'no objects found' error, got: {msg}"
5630        );
5631    }
5632
5633    #[test]
5634    fn test_open_upgrade_already_initialized() {
5635        // open_upgrade on a repo that already has meta.json should
5636        // return upgraded=false.
5637        let tmp = tempdir();
5638        let repo_path = tmp.path().join("repo");
5639
5640        Repository::<Sha256HashValue>::init_path(
5641            CWD,
5642            &repo_path,
5643            RepositoryConfig::default().set_insecure(),
5644        )
5645        .unwrap();
5646
5647        let (_repo, upgraded) =
5648            Repository::<Sha256HashValue>::open_upgrade(CWD, &repo_path).unwrap();
5649        assert!(!upgraded);
5650    }
5651
5652    #[tokio::test]
5653    async fn test_fsck_v1_image_detects_missing_object() -> Result<()> {
5654        // Same as test_fsck_validates_erofs_image_objects but with a V1 image,
5655        // ensuring fsck correctly parses V1 images to find object references.
5656        let tmp = tempdir();
5657        let repo = create_test_repo(&tmp.path().join("repo"))?;
5658
5659        let obj_size: u64 = 32 * 1024;
5660        let obj = generate_test_data(obj_size, 0xBC);
5661        let obj_id = repo.ensure_object(&obj)?;
5662
5663        commit_v1_image(&repo, &obj_id, obj_size)?;
5664        repo.sync()?;
5665
5666        // Sanity: passes before we break it
5667        let result = repo.fsck().await?;
5668        assert!(
5669            result.is_ok(),
5670            "healthy V1 image should pass fsck: {result}"
5671        );
5672
5673        // Delete the referenced object
5674        let hex = obj_id.to_hex();
5675        let (prefix, rest) = hex.split_at(2);
5676        let dir = open_test_repo_dir(&tmp);
5677        dir.remove_file(format!("objects/{prefix}/{rest}"))?;
5678
5679        let result = repo.fsck().await?;
5680        assert!(
5681            !result.is_ok(),
5682            "fsck should detect missing object in V1 erofs image: {result}"
5683        );
5684        assert!(
5685            result.missing_objects > 0,
5686            "should report missing objects: {result}"
5687        );
5688        Ok(())
5689    }
5690
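    // ---- v1_erofs feature flag tests ----
    //
    // The `v1_erofs` ro_compat flag marks a repository whose default EROFS
    // format is V1, so that older tools which don't understand
    // `erofs_formats` still see the signal. The flag is derived from
    // `erofs_formats.default` alone. The `extra` list does not influence it;
    // `extra` is recorded only inside the `erofs_formats` object.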
5696
5697    #[test]
5698    fn test_v1_erofs_flag_set_for_v1_repos() {
5699        // V1 primary → v1_erofs present
5700        let meta = RepoMetadata::new_with_formats(
5701            Algorithm::SHA256,
5702            &FormatConfig::single(FormatVersion::V1),
5703        );
5704        assert!(
5705            meta.features
5706                .read_only_compatible
5707                .contains(&known_features::V1_EROFS.to_string()),
5708            "V1 repo must set v1_erofs in ro_compat, got: {:?}",
5709            meta.features.read_only_compatible
5710        );
5711        assert_eq!(meta.erofs_version(), FormatVersion::V1);
5712    }
5713
5714    #[test]
5715    fn test_v1_erofs_flag_absent_for_v2_repos() {
5716        let meta = RepoMetadata::new_with_formats(
5717            Algorithm::SHA256,
5718            &FormatConfig::single(FormatVersion::V2),
5719        );
5720        assert!(
5721            !meta
5722                .features
5723                .read_only_compatible
5724                .contains(&known_features::V1_EROFS.to_string()),
5725            "V2 repo must NOT set v1_erofs in ro_compat, got: {:?}",
5726            meta.features.read_only_compatible
5727        );
5728        assert!(
5729            meta.features.incompatible.is_empty(),
5730            "V2 repo must have no incompat flags, got: {:?}",
5731            meta.features.incompatible
5732        );
5733        assert_eq!(meta.erofs_version(), FormatVersion::V2);
5734    }
5735
5736    #[test]
5737    fn test_default_format_config_from_v1_erofs_flag() {
5738        // v1_erofs present → V1 primary
5739        let meta_v1 = RepoMetadata::new_with_formats(
5740            Algorithm::SHA256,
5741            &FormatConfig::single(FormatVersion::V1),
5742        );
5743        assert_eq!(
5744            repo_format_config_from_meta(&meta_v1),
5745            FormatConfig::single(FormatVersion::V1)
5746        );
5747
5748        // v1_erofs absent → V2 primary
5749        let meta_v2 = RepoMetadata::new_with_formats(
5750            Algorithm::SHA256,
5751            &FormatConfig::single(FormatVersion::V2),
5752        );
5753        assert_eq!(
5754            repo_format_config_from_meta(&meta_v2),
5755            FormatConfig::single(FormatVersion::V2)
5756        );
5757    }
5758
5759    #[test]
5760    fn test_init_path_v1_format_config() -> Result<()> {
5761        let tmp = tempdir();
5762        let path = tmp.path().join("repo");
5763
5764        let config = RepositoryConfig {
5765            algorithm: Algorithm::SHA256,
5766            erofs_formats: FormatConfig::single(FormatVersion::V1),
5767            ..RepositoryConfig::default().set_insecure()
5768        };
5769        let (repo, was_new) = Repository::<Sha256HashValue>::init_path(CWD, &path, config)?;
5770        assert!(was_new);
5771        assert_eq!(repo.erofs_version(), FormatVersion::V1);
5772        assert_eq!(
5773            repo.default_format_config(),
5774            FormatConfig::single(FormatVersion::V1)
5775        );
5776        assert!(
5777            repo.metadata()
5778                .features
5779                .read_only_compatible
5780                .contains(&known_features::V1_EROFS.to_string()),
5781            "v1_erofs must be in ro_compat for V1 repos"
5782        );
5783        Ok(())
5784    }
5785
5786    #[test]
5787    fn test_init_path_v2_format_config() -> Result<()> {
5788        let tmp = tempdir();
5789        let path = tmp.path().join("repo");
5790
5791        let config = RepositoryConfig {
5792            algorithm: Algorithm::SHA256,
5793            erofs_formats: FormatConfig::single(FormatVersion::V2),
5794            ..RepositoryConfig::default().set_insecure()
5795        };
5796        let (repo, was_new) = Repository::<Sha256HashValue>::init_path(CWD, &path, config)?;
5797        assert!(was_new);
5798        assert_eq!(repo.erofs_version(), FormatVersion::V2);
5799        assert_eq!(
5800            repo.default_format_config(),
5801            FormatConfig::single(FormatVersion::V2)
5802        );
5803        assert!(
5804            !repo
5805                .metadata()
5806                .features
5807                .read_only_compatible
5808                .contains(&known_features::V1_EROFS.to_string()),
5809            "v1_erofs must NOT be in ro_compat for V2-only repos"
5810        );
5811        assert!(
5812            repo.metadata().features.incompatible.is_empty(),
5813            "V2-only repo must have no incompat flags, got: {:?}",
5814            repo.metadata().features.incompatible
5815        );
5816        Ok(())
5817    }
5818
5819    /// Verify `commit_images` with a dual-format config:
5820    /// - both ObjectIDs are in the returned map,
5821    /// - both image symlinks exist in `images/`,
5822    /// - the named ref points to the V1 image (the default / primary version).
5823    #[test]
5824    fn test_commit_images_both_named_ref_points_to_v1() -> Result<()> {
5825        use crate::tree::{FileSystem, Stat};
5826
5827        let tmp = tempdir();
5828        let repo_path = tmp.path().join("repo");
5829        // V1 is default (gets the named ref); V2 is extra.
5830        let dual_fmt = FormatConfig {
5831            default: FormatVersion::V1,
5832            extra: [FormatVersion::V2].into(),
5833        };
5834        let config = RepositoryConfig {
5835            algorithm: Algorithm::SHA256,
5836            erofs_formats: dual_fmt,
5837            ..RepositoryConfig::default().set_insecure()
5838        };
5839        let (repo, _) = Repository::<Sha256HashValue>::init_path(CWD, &repo_path, config)?;
5840
5841        // Build a minimal filesystem (empty root dir is enough).
5842        let root_stat = Stat {
5843            st_mode: 0o755,
5844            st_uid: 0,
5845            st_gid: 0,
5846            st_mtim_sec: 0,
5847            st_mtim_nsec: 0,
5848            xattrs: Default::default(),
5849        };
5850        let fs: FileSystem<Sha256HashValue> = FileSystem::new(root_stat);
5851
5852        // commit_images now reads the FormatConfig from the repository itself.
5853        let map = fs.commit_images(&repo, Some("myref"))?;
5854        repo.sync()?;
5855
5856        // Both versions must be in the result.
5857        let v1_id = map
5858            .get(&FormatVersion::V1)
5859            .expect("V1 must be in result map");
5860        let v2_id = map
5861            .get(&FormatVersion::V2)
5862            .expect("V2 must be in result map");
5863
5864        // Both image symlinks must exist under images/.
5865        let v1_image_path = format!("images/{}", v1_id.to_hex());
5866        let v2_image_path = format!("images/{}", v2_id.to_hex());
5867        assert!(
5868            test_path_exists_in_repo(&tmp, &v1_image_path)?,
5869            "V1 image symlink must exist: {v1_image_path}"
5870        );
5871        assert!(
5872            test_path_exists_in_repo(&tmp, &v2_image_path)?,
5873            "V2 image symlink must exist: {v2_image_path}"
5874        );
5875
5876        // The named ref must exist and must point to the V1 image (primary).
5877        let ref_path = "images/refs/myref";
5878        assert!(
5879            test_path_exists_in_repo(&tmp, ref_path)?,
5880            "named ref images/refs/myref must exist"
5881        );
5882        // The ref symlink target should contain the V1 image hex, not V2.
5883        let ref_full = tmp.path().join("repo").join(ref_path);
5884        let target = readlinkat(CWD, &ref_full, Vec::new())?;
5885        let target_str = target.to_str()?;
5886        assert!(
5887            target_str.contains(&v1_id.to_hex()),
5888            "named ref must point to V1 image ({}), but points to: {target_str}",
5889            v1_id.to_hex()
5890        );
5891        assert!(
5892            !target_str.contains(&v2_id.to_hex()),
5893            "named ref must NOT point to V2 image, but points to: {target_str}"
5894        );
5895        Ok(())
5896    }
5897
5898    #[test]
5899    fn test_meta_json_shapes() {
5900        // Verify the serialized JSON shapes for each repo type.
5901        let v2 = RepoMetadata::new_with_formats(
5902            Algorithm::SHA256,
5903            &FormatConfig::single(FormatVersion::V2),
5904        );
5905        let v2_json = String::from_utf8(v2.to_json().unwrap()).unwrap();
5906        // erofs_formats field must be present with default:2, no extra key
5907        assert!(
5908            v2_json.contains("\"erofs_formats\""),
5909            "V2 meta.json must contain erofs_formats, got: {v2_json}"
5910        );
5911        assert!(
5912            v2_json.contains("\"default\": 2"),
5913            "V2 meta.json must have default:2, got: {v2_json}"
5914        );
5915        assert!(
5916            !v2_json.contains("\"extra\""),
5917            "V2 single-format meta.json must not have extra key, got: {v2_json}"
5918        );
5919
5920        let v1 = RepoMetadata::new_with_formats(
5921            Algorithm::SHA256,
5922            &FormatConfig::single(FormatVersion::V1),
5923        );
5924        let v1_json = String::from_utf8(v1.to_json().unwrap()).unwrap();
5925        assert!(
5926            v1_json.contains("\"erofs_formats\""),
5927            "V1 meta.json must contain erofs_formats, got: {v1_json}"
5928        );
5929        assert!(
5930            v1_json.contains("\"default\": 1"),
5931            "V1 meta.json must have default:1, got: {v1_json}"
5932        );
5933        assert!(
5934            !v1_json.contains("\"extra\""),
5935            "V1 single-format meta.json must not have extra key, got: {v1_json}"
5936        );
5937        assert!(
5938            v1_json.contains("v1_erofs"),
5939            "V1 meta.json must still contain v1_erofs flag for old-tool compat, got: {v1_json}"
5940        );
5941
5942        let dual = RepoMetadata::new_with_formats(
5943            Algorithm::SHA256,
5944            &FormatConfig {
5945                default: FormatVersion::V1,
5946                extra: [FormatVersion::V2].into(),
5947            },
5948        );
5949        let dual_json = String::from_utf8(dual.to_json().unwrap()).unwrap();
5950        assert!(
5951            dual_json.contains("\"erofs_formats\""),
5952            "dual meta.json must contain erofs_formats, got: {dual_json}"
5953        );
5954        assert!(
5955            dual_json.contains("\"default\": 1"),
5956            "dual meta.json must have default:1, got: {dual_json}"
5957        );
5958        assert!(
5959            dual_json.contains("\"extra\""),
5960            "dual meta.json must have extra key, got: {dual_json}"
5961        );
5962        assert!(
5963            dual_json.contains("v1_erofs"),
5964            "dual meta.json must still contain v1_erofs flag, got: {dual_json}"
5965        );
5966
5967        // Verify round-trip: parse back and check format_config()
5968        let v2_parsed: RepoMetadata = serde_json::from_str(&v2_json).unwrap();
5969        assert_eq!(
5970            v2_parsed.format_config(),
5971            FormatConfig::single(FormatVersion::V2)
5972        );
5973
5974        let v1_parsed: RepoMetadata = serde_json::from_str(&v1_json).unwrap();
5975        assert_eq!(
5976            v1_parsed.format_config(),
5977            FormatConfig::single(FormatVersion::V1)
5978        );
5979
5980        let dual_parsed: RepoMetadata = serde_json::from_str(&dual_json).unwrap();
5981        assert_eq!(
5982            dual_parsed.format_config(),
5983            FormatConfig {
5984                default: FormatVersion::V1,
5985                extra: [FormatVersion::V2].into(),
5986            }
5987        );
5988    }
5989}