Skip to main content

mkit_core/
store.rs

1//! Local content-addressed object store.
2//!
3//! Layout (rooted at the working-tree directory passed to [`ObjectStore::open`]
4//! / [`ObjectStore::init`]):
5//!
6//! ```text
7//! .mkit/
8//!   objects/
9//!     <2-hex>/<62-hex>   # raw canonical object bytes, BLAKE3-named
10//! ```
11//!
//! Writes are atomic: bytes are first written to a sibling temp file
//! (`.<name>.tmp.<pid>.<seq>` followed by the suffix `tempfile` appends),
//! made durable, then renamed into place. A crash mid-write leaves only
//! the temp file behind and never produces a visible object that fails
//! the read-time hash check.
16//!
17//! Durability comes in three shapes (see [`crate::batch`] for the full
18//! contract): [`ObjectStore::write`] flushes per object
19//! ([`SyncPolicy::PerObject`]); [`ObjectStore::batch`] defers visibility
20//! and amortises durability to **one** full flush per batch — the write
21//! path used by every multi-object command (add, commit, pack unpack);
22//! and [`ObjectStore::bulk_writer`] fsyncs each object's contents before
23//! the rename and batches the dir fsyncs at commit (git import). In all
24//! three an object is never visible before its bytes are durable, so a
25//! ref or index written after the write/commit returns can never
26//! reference a non-durable object.
27//!
28//! Reads always verify integrity by recomputing BLAKE3 over the bytes
29//! and comparing against the requested hash; mismatch returns
30//! [`StoreError::HashMismatch`].
31//!
32//! See `docs/SPEC-OBJECTS.md` §10 for the path-layout rule.
33
34use std::fs::{self, File};
35use std::io::{self, Read, Write};
36use std::path::{Path, PathBuf};
37use std::process;
38use std::sync::Arc;
39use std::sync::atomic::{AtomicU64, Ordering};
40
41use tempfile::NamedTempFile;
42
43use crate::batch::{RealSyncer, Syncer};
44pub use crate::batch::{SyncPolicy, WriteBatch};
45use crate::hash::{self, Hash, object_path, to_hex};
46use crate::object::{MkitError, Object};
47use crate::serialize;
48
/// Name of the repository metadata directory at the working-tree root.
pub const MKIT_DIR: &str = ".mkit";
/// Name of the directory under [`MKIT_DIR`] that holds raw object files.
pub const OBJECTS_DIR: &str = "objects";
/// Largest raw object, in bytes, the store will accept or return (1 GiB).
/// Checked by both [`ObjectStore::write`] and [`ObjectStore::read`].
pub const MAX_RAW_OBJECT_SIZE: usize = 1 << 30;

/// Upper bound on how many directory levels a tree walk may descend.
///
/// The core tree-walkers (`index::from_tree`, `ops::diff`, `ops::merge`,
/// `ops::restore`) use one native stack frame per directory level.
/// Content-addressing rules out true cycles (a child's hash can never
/// equal its parent's), yet a crafted untrusted repo can still nest
/// thousands of single-entry trees in a tiny pack and overflow the
/// stack — a denial of service reachable from
/// `clone`/`checkout`/`merge`/`diff`. Walkers carry a depth counter and
/// fail with a typed `TreeTooDeep` error once `depth > MAX_TREE_DEPTH`.
///
/// 128 is far deeper than any legitimate source tree (Git's own working
/// trees rarely exceed a couple dozen levels) while staying comfortably
/// inside a default 8 MiB thread stack. Mirrors the `MAX_REF_DEPTH` cap
/// on the ref-tree walker in `refs.rs`.
pub const MAX_TREE_DEPTH: usize = 128;
73
74/// Errors raised by the [`ObjectStore`] surface. Distinct from
75/// [`MkitError`] so callers can pattern-match on filesystem failures
76/// without losing the structured-decode-error variants.
77#[derive(Debug, thiserror::Error)]
78pub enum StoreError {
79    #[error("path is not an mkit repository (missing .mkit/objects)")]
80    NotAMkitRepository,
81    #[error(".mkit already exists in this directory")]
82    AlreadyInitialized,
83    #[error("object {0} not found")]
84    ObjectNotFound(String),
85    #[error("object exceeds {} byte cap", MAX_RAW_OBJECT_SIZE)]
86    ObjectTooLarge,
87    #[error("tree nesting exceeds {} levels", MAX_TREE_DEPTH)]
88    TreeTooDeep,
89    #[error("on-disk bytes hash to {actual}, expected {expected}")]
90    HashMismatch { expected: String, actual: String },
91    #[error(transparent)]
92    Io(#[from] io::Error),
93    #[error(transparent)]
94    Decode(#[from] MkitError),
95}
96
97/// Deferred-fsync writer returned by [`ObjectStore::bulk_writer`].
98/// See that method for the crash-safety contract.
99#[derive(Debug)]
100pub struct BulkWriter<'a> {
101    store: &'a ObjectStore,
102    dirs: std::collections::HashSet<PathBuf>,
103    files: std::collections::HashSet<PathBuf>,
104}
105
106impl BulkWriter<'_> {
107    /// Write one object durable-before-visible: contents are fsynced
108    /// before the rename publishes the object, and only the shard-dir
109    /// fsync (rename durability) is deferred to commit. This upholds the
110    /// store's global invariant (an object is never visible before its
111    /// bytes are durable), so another process's `contains()` dedup can
112    /// never reference a half-written object. An existing path is
113    /// byte-verified: matched objects are left in place (but still
114    /// fsynced at commit, see below), torn ones are rewritten.
115    ///
116    /// # Panics
117    /// Never in practice: object paths always have a 2-hex shard
118    /// parent by construction.
119    pub fn write(&mut self, bytes: &[u8]) -> StoreResult<Hash> {
120        if bytes.len() > MAX_RAW_OBJECT_SIZE {
121            return Err(StoreError::ObjectTooLarge);
122        }
123        let h = hash::hash(bytes);
124        let final_path = self.store.path_for(&h);
125        // VERIFY-skip an existing object: content addressing makes
126        // byte-equality the exact durability test — a good file (from
127        // a previously committed session, possibly referenced by
128        // native history) must NOT be replaced with an unsynced inode
129        // a power loss could tear, while a torn file from a crashed
130        // session fails the comparison and is healed by the rewrite.
131        let shard_dir = final_path
132            .parent()
133            .expect("object path always has a 2-hex parent");
134        if let Ok(existing) = fs::read(&final_path)
135            && existing == bytes
136        {
137            // The bytes may have matched out of the PAGE CACHE of a
138            // crashed session's unsynced write — byte equality is not a
139            // durability test, so this pre-existing file still needs a
140            // content fsync at commit.
141            self.dirs.insert(shard_dir.to_path_buf());
142            self.files.insert(final_path);
143            return Ok(h);
144        }
145        fs::create_dir_all(shard_dir)?;
146        // Content fsync happens BEFORE the rename, so the object is
147        // durable the instant it becomes visible. Only the rename's
148        // durability (the shard dir fsync) is deferred to commit.
149        crate::atomic::write_content_synced(&final_path, bytes)?;
150        self.dirs.insert(shard_dir.to_path_buf());
151        Ok(h)
152    }
153
154    /// Make the session durable: fsync the CONTENTS of any pre-existing
155    /// objects this batch re-used (newly written objects were already
156    /// content-fsynced before their rename in [`write`](Self::write)),
157    /// then every touched shard directory (renames become durable).
158    pub fn commit(self) -> StoreResult<()> {
159        for file in &self.files {
160            fs::OpenOptions::new().write(true).open(file)?.sync_all()?;
161        }
162        for dir in &self.dirs {
163            crate::atomic::sync_dir(dir)?;
164        }
165        Ok(())
166    }
167}
168
169/// Result alias used throughout this module.
170pub type StoreResult<T> = Result<T, StoreError>;
171
172// Tiny per-process counter for unique temp-file names. We use this
173// instead of pulling in `rand` because the temp name only needs to be
174// unique within the process; the atomic `rename` enforces global
175// correctness even if two processes collide on a name.
176static TEMP_SEQ: AtomicU64 = AtomicU64::new(0);
177
178/// Local content-addressed object store backed by the filesystem.
179#[derive(Debug, Clone)]
180pub struct ObjectStore {
181    /// Absolute path to `<root>/.mkit/objects`.
182    objects_root: PathBuf,
183    /// Flush/rename primitive seam. Production code always uses
184    /// [`RealSyncer`]; unit tests inject a recording double to assert
185    /// flush ordering and counts (the O(1)-flushes-per-batch contract).
186    syncer: Arc<dyn Syncer>,
187    /// Policy handed out by [`Self::batch`]. Defaults to
188    /// [`SyncPolicy::Batch`]; the CLI maps the repo config key
189    /// `durability.objects = per-object` onto it for deployments that
190    /// want the strict historical schedule.
191    default_policy: SyncPolicy,
192}
193
194impl ObjectStore {
195    /// Open an existing repository rooted at `root`. Returns
196    /// [`StoreError::NotAMkitRepository`] if `<root>/.mkit/objects` does
197    /// not exist.
198    pub fn open(root: &Path) -> StoreResult<Self> {
199        let objects_root = root.join(MKIT_DIR).join(OBJECTS_DIR);
200        if !objects_root.is_dir() {
201            return Err(StoreError::NotAMkitRepository);
202        }
203        Ok(Self {
204            objects_root,
205            syncer: Arc::new(RealSyncer),
206            default_policy: SyncPolicy::Batch,
207        })
208    }
209
210    /// Initialise a fresh `.mkit/` directory under `root`. Returns
211    /// [`StoreError::AlreadyInitialized`] if `.mkit/` already exists.
212    pub fn init(root: &Path) -> StoreResult<Self> {
213        let mkit_root = root.join(MKIT_DIR);
214        if mkit_root.exists() {
215            return Err(StoreError::AlreadyInitialized);
216        }
217        let objects_root = mkit_root.join(OBJECTS_DIR);
218        fs::create_dir_all(&objects_root)?;
219        Ok(Self {
220            objects_root,
221            syncer: Arc::new(RealSyncer),
222            default_policy: SyncPolicy::Batch,
223        })
224    }
225
226    /// Select the [`SyncPolicy`] that [`Self::batch`] hands out. The
227    /// CLI wires the repo config key `durability.objects` here so
228    /// deployments on filesystems where they prefer the strict
229    /// per-object schedule can opt into it (SPEC-OBJECTS §10.1).
230    pub fn set_sync_policy(&mut self, policy: SyncPolicy) {
231        self.default_policy = policy;
232    }
233
234    /// Replace the flush/rename primitives. Test-only seam — see the
235    /// `syncer` field. Not exposed publicly so the production sync
236    /// strategy cannot be silently weakened by downstream code.
237    #[cfg(test)]
238    pub(crate) fn set_syncer(&mut self, syncer: Arc<dyn Syncer>) {
239        self.syncer = syncer;
240    }
241
242    /// The active flush/rename primitives, shared with [`WriteBatch`].
243    pub(crate) fn syncer(&self) -> &Arc<dyn Syncer> {
244        &self.syncer
245    }
246
247    /// Start a batched write with the store's configured policy
248    /// (default [`SyncPolicy::Batch`]): objects staged by the batch
249    /// become durable and visible together at [`WriteBatch::commit`],
250    /// with O(1) full flushes per batch instead of per object.
251    #[must_use]
252    pub fn batch(&self) -> WriteBatch<'_> {
253        self.batch_with_policy(self.default_policy)
254    }
255
256    /// Start a batched write with an explicit [`SyncPolicy`].
257    #[must_use]
258    pub fn batch_with_policy(&self, policy: SyncPolicy) -> WriteBatch<'_> {
259        WriteBatch::new(self, policy)
260    }
261
262    /// Returns `true` when `root` contains a `.mkit/objects` directory.
263    #[must_use]
264    pub fn is_repo_root(root: &Path) -> bool {
265        root.join(MKIT_DIR).join(OBJECTS_DIR).is_dir()
266    }
267
268    /// Absolute path to the `objects/` directory.
269    #[must_use]
270    pub fn objects_root(&self) -> &Path {
271        &self.objects_root
272    }
273
274    /// Compute the on-disk path for `hash`, joined under `objects/`.
275    /// `pub(crate)` so [`WriteBatch`] shares the single layout rule.
276    pub(crate) fn path_for(&self, h: &Hash) -> PathBuf {
277        let p = object_path(h);
278        // Both halves are ASCII hex by construction in `object_path`.
279        let dir = std::str::from_utf8(&p.dir).expect("ascii hex");
280        let file = std::str::from_utf8(&p.file).expect("ascii hex");
281        self.objects_root.join(dir).join(file)
282    }
283
284    /// Returns `true` when the object `h` is present in the store. Does
285    /// **not** verify integrity — use [`Self::read`] for that.
286    #[must_use]
287    pub fn contains(&self, h: &Hash) -> bool {
288        self.path_for(h).is_file()
289    }
290
291    /// Write `bytes` to the store, returning their BLAKE3 hash. Atomic:
292    /// writes to a sibling temp file, `fsync`s, then renames into place.
293    /// Idempotent — re-writing the same bytes is a no-op (the temp file
294    /// is unlinked on the early-return path).
295    ///
296    /// # Panics
297    ///
298    /// Panics only if the internal hash-to-path mapping produces a path
299    /// without a parent directory, which is impossible by construction.
300    pub fn write(&self, bytes: &[u8]) -> StoreResult<Hash> {
301        if bytes.len() > MAX_RAW_OBJECT_SIZE {
302            return Err(StoreError::ObjectTooLarge);
303        }
304        let h = hash::hash(bytes);
305        let final_path = self.path_for(&h);
306        if final_path.exists() {
307            // Dedup hit: the object is visible, but if another process
308            // renamed it and has not yet flushed the dirent, it may not
309            // be durable — and our caller is about to reference it.
310            // Flush its shard dir before returning (SPEC-OBJECTS §10.1
311            // dedup rule, mirroring WriteBatch's touched_shards).
312            self.syncer()
313                .dir_sync(final_path.parent().expect("object path has parent"))?;
314            return Ok(h);
315        }
316        let shard_dir = final_path
317            .parent()
318            .expect("object path always has a 2-hex parent");
319        fs::create_dir_all(shard_dir)?;
320        write_atomic(&final_path, bytes, &**self.syncer())?;
321        Ok(h)
322    }
323
324    /// Begin a bulk-write session: each new object is written
325    /// durable-before-visible (contents fsynced, then renamed), and
326    /// [`BulkWriter::commit`] batches the directory fsyncs (rename
327    /// durability) plus a content fsync of any re-used pre-existing
328    /// objects once at the end, instead of fsyncing a dir per write.
329    ///
330    /// Because content is durable before the rename, the session upholds
331    /// the store's global invariant even under concurrent readers: an
332    /// object another process can `contains()`-dedup against is always
333    /// backed by durable bytes.
334    ///
335    /// Crash-safety contract (deliberately weaker than [`Self::write`]
336    /// only for the rename/dirent half, for callers whose whole
337    /// operation is idempotent — e.g. the deterministic git-import,
338    /// which re-runs from a retained source mirror): after a crash
339    /// BEFORE `commit`, a just-renamed object's dirent may be lost (the
340    /// shard dir is not yet fsynced), so objects may be missing — never
341    /// torn, since contents were fsynced first. Existing paths are
342    /// VERIFIED (byte compare)
343    /// rather than blindly rewritten or blindly trusted: a matching
344    /// file is left untouched (it may be durable and referenced by
345    /// native history — replacing it with an unsynced inode would put
346    /// it at risk), a torn one is healed by rewrite, and reads always
347    /// BLAKE3-verify. Callers MUST gate bulk sessions behind their
348    /// own crash marker and re-run on detection.
349    #[must_use]
350    pub fn bulk_writer(&self) -> BulkWriter<'_> {
351        BulkWriter {
352            store: self,
353            dirs: std::collections::HashSet::new(),
354            files: std::collections::HashSet::new(),
355        }
356    }
357
358    /// Read raw bytes for `h`. Verifies that BLAKE3 of the on-disk
359    /// bytes equals `h` and returns [`StoreError::HashMismatch`] on
360    /// failure (the bytes are still discarded so callers cannot
361    /// accidentally use corrupt data).
362    pub fn read(&self, h: &Hash) -> StoreResult<Vec<u8>> {
363        let path = self.path_for(h);
364        let mut file = File::open(&path).map_err(|e| match e.kind() {
365            io::ErrorKind::NotFound => StoreError::ObjectNotFound(to_hex(h)),
366            _ => StoreError::Io(e),
367        })?;
368        let meta = file.metadata()?;
369        let size = meta.len();
370        if u128::from(size) > MAX_RAW_OBJECT_SIZE as u128 {
371            return Err(StoreError::ObjectTooLarge);
372        }
373        // We've already bounded `size` by `MAX_RAW_OBJECT_SIZE` (1 GiB),
374        // which fits in `usize` on every platform we support (32-bit
375        // included). Pre-size the buffer to avoid the doubling
376        // re-allocations of `read_to_end` for large objects.
377        let cap = usize::try_from(size).map_err(|_| StoreError::ObjectTooLarge)?;
378        let mut bytes = Vec::with_capacity(cap);
379        file.read_to_end(&mut bytes)?;
380        let actual = hash::hash(&bytes);
381        if actual != *h {
382            return Err(StoreError::HashMismatch {
383                expected: to_hex(h),
384                actual: to_hex(&actual),
385            });
386        }
387        Ok(bytes)
388    }
389
390    /// The object's type tag, from its 6-byte prologue — without
391    /// reading or hash-verifying the body. Backs cheap shape checks
392    /// (e.g. "is this staged hash blob-like?") that previously paid a
393    /// full read+BLAKE3 of every staged blob per status/commit; the
394    /// real read path still integrity-verifies at use time.
395    ///
396    /// # Errors
397    /// [`StoreError::ObjectNotFound`] if absent; [`StoreError::Decode`]
398    /// for a short file, bad magic/version, or unknown tag.
399    pub fn object_type(&self, h: &Hash) -> StoreResult<crate::object::ObjectType> {
400        let path = self.path_for(h);
401        let mut file = File::open(&path).map_err(|e| match e.kind() {
402            io::ErrorKind::NotFound => StoreError::ObjectNotFound(to_hex(h)),
403            _ => StoreError::Io(e),
404        })?;
405        let mut prologue = [0u8; 6];
406        file.read_exact(&mut prologue)
407            .map_err(|_| StoreError::Decode(MkitError::EmptyData))?;
408        if prologue[1..5] != crate::object::MAGIC {
409            return Err(StoreError::Decode(MkitError::InvalidMagic));
410        }
411        if prologue[5] != crate::object::SCHEMA_VERSION {
412            return Err(StoreError::Decode(MkitError::UnsupportedObjectVersion));
413        }
414        crate::object::ObjectType::from_u8(prologue[0]).map_err(StoreError::Decode)
415    }
416
417    /// Convenience: read raw bytes and decode into a typed [`Object`].
418    pub fn read_object(&self, h: &Hash) -> StoreResult<Object> {
419        let bytes = self.read(h)?;
420        let obj = serialize::deserialize(&bytes)?;
421        Ok(obj)
422    }
423
424    /// Hash-verifying variant of [`object_type`](Self::object_type): reads
425    /// the whole object and confirms its content hashes to `h` before
426    /// returning the declared type. This is the integrity guard for
427    /// tree-publication paths (commit, merge, rebase, …) — `object_type`
428    /// alone reads only the 6-byte prologue, so a staged object corrupted
429    /// after `add` would otherwise be published into a durable tree and
430    /// only fail at later read time. Use `object_type` on hot read-only
431    /// paths (status/diff snapshots) where nothing durable is published.
432    pub fn verify_object_type(&self, h: &Hash) -> StoreResult<crate::object::ObjectType> {
433        let bytes = self.read(h)?; // read() re-hashes and rejects on mismatch
434        if bytes.len() < 6 {
435            return Err(StoreError::Decode(MkitError::EmptyData));
436        }
437        if bytes[1..5] != crate::object::MAGIC {
438            return Err(StoreError::Decode(MkitError::InvalidMagic));
439        }
440        if bytes[5] != crate::object::SCHEMA_VERSION {
441            return Err(StoreError::Decode(MkitError::UnsupportedObjectVersion));
442        }
443        crate::object::ObjectType::from_u8(bytes[0]).map_err(StoreError::Decode)
444    }
445
446    /// Enumerate every object hash currently in the store by walking
447    /// `objects/<2-hex>/<62-hex>`. Entries whose names are not the
448    /// expected hex shape (stray files, atomic-write temp files, unknown
449    /// dirs) are skipped — they are not objects. Used by `gc` to find
450    /// prune candidates.
451    ///
452    /// # Errors
453    /// [`StoreError::Io`] if a directory cannot be read (gc must then
454    /// fail closed rather than prune against a partial enumeration).
455    pub fn iter_object_hashes(&self) -> StoreResult<Vec<Hash>> {
456        let mut out = Vec::new();
457        let shards = match fs::read_dir(&self.objects_root) {
458            Ok(rd) => rd,
459            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
460            Err(e) => return Err(StoreError::Io(e)),
461        };
462        for shard in shards {
463            let shard = shard?;
464            if !shard.file_type()?.is_dir() {
465                continue;
466            }
467            let dir_name = shard.file_name();
468            let Some(dir) = dir_name.to_str() else {
469                continue;
470            };
471            if dir.len() != 2 || !dir.bytes().all(|b| b.is_ascii_hexdigit()) {
472                continue;
473            }
474            for entry in fs::read_dir(shard.path())? {
475                let entry = entry?;
476                if !entry.file_type()?.is_file() {
477                    continue;
478                }
479                let file_name = entry.file_name();
480                let Some(file) = file_name.to_str() else {
481                    continue;
482                };
483                if file.len() != 62 {
484                    continue;
485                }
486                // Reassemble the 64-hex id and parse it; skips non-objects.
487                if let Ok(h) = hash::from_hex(&format!("{dir}{file}")) {
488                    out.push(h);
489                }
490            }
491        }
492        Ok(out)
493    }
494
495    /// Filesystem metadata for object `h` (size + mtime), for gc's
496    /// grace-window check.
497    ///
498    /// # Errors
499    /// [`StoreError::ObjectNotFound`] if absent, else [`StoreError::Io`].
500    pub fn object_metadata(&self, h: &Hash) -> StoreResult<fs::Metadata> {
501        fs::metadata(self.path_for(h)).map_err(|e| match e.kind() {
502            io::ErrorKind::NotFound => StoreError::ObjectNotFound(to_hex(h)),
503            _ => StoreError::Io(e),
504        })
505    }
506
507    /// Delete object `h` from the store. Idempotent: a missing object is
508    /// not an error (it may have been pruned already).
509    ///
510    /// # Errors
511    /// [`StoreError::Io`] on a filesystem failure other than not-found.
512    pub fn remove_object(&self, h: &Hash) -> StoreResult<()> {
513        match fs::remove_file(self.path_for(h)) {
514            Ok(()) => Ok(()),
515            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
516            Err(e) => Err(StoreError::Io(e)),
517        }
518    }
519}
520
521/// Create a uniquely-named sibling temp file in `parent` for the object
522/// file `file_name`. The `.{name}.tmp.{pid}.{seq}` shape is what
523/// [`ObjectStore::iter_object_hashes`] and the stale-temp tests rely on
524/// to skip non-objects.
525pub(crate) fn temp_file_in(parent: &Path, file_name: &str) -> io::Result<NamedTempFile> {
526    let pid = process::id();
527    let seq = TEMP_SEQ.fetch_add(1, Ordering::Relaxed);
528    let tmp_name = format!(".{file_name}.tmp.{pid}.{seq}");
529    NamedTempFile::with_prefix_in(tmp_name, parent)
530}
531
532/// Atomically write `bytes` to `final_path` with per-object durability
533/// ([`SyncPolicy::PerObject`]): temp file, full flush, rename into
534/// place, parent-dir flush. On Unix, `rename(2)` is atomic with respect
535/// to concurrent readers and replaces the destination. On Windows,
536/// [`NamedTempFile::persist`] uses `MOVEFILE_REPLACE_EXISTING`
537/// semantics so the replace-existing path works there too.
538///
539/// After a successful rename we flush the parent directory on Unix to
540/// commit the dirent update — without this, the rename can survive a
541/// power loss only in the page cache and the file appears missing on
542/// reboot.
543///
544/// All flush/rename primitives go through `syncer` so unit tests can
545/// assert ordering; [`RealSyncer`] preserves the historical behaviour
546/// exactly.
547fn write_atomic(final_path: &Path, bytes: &[u8], syncer: &dyn Syncer) -> io::Result<()> {
548    let parent = final_path.parent().expect("write_atomic: path has parent");
549    let file_name = final_path
550        .file_name()
551        .expect("write_atomic: path has file name")
552        .to_string_lossy();
553
554    let mut tmp = temp_file_in(parent, &file_name)?;
555    tmp.as_file_mut().write_all(bytes)?;
556    syncer.full(tmp.as_file(), tmp.path())?;
557
558    // NamedTempFile::persist uses a cross-platform atomic replace:
559    // rename(2) on Unix, MoveFileExW with MOVEFILE_REPLACE_EXISTING on Windows.
560    syncer.rename(tmp.into_temp_path(), final_path)?;
561
562    syncer.dir_sync(parent)?;
563    Ok(())
564}
565
566/// Read source shared by [`ObjectStore`] and snapshot overlays, so
567/// tree-diff code can resolve objects from either the durable store or
568/// an in-memory [`EphemeralSink`].
569pub trait ObjectSource {
570    /// Read and integrity-verify the raw bytes of `h`.
571    ///
572    /// # Errors
573    /// [`StoreError::ObjectNotFound`] / [`StoreError::HashMismatch`] /
574    /// I/O errors, as for [`ObjectStore::read`].
575    fn read(&self, h: &Hash) -> StoreResult<Vec<u8>>;
576
577    /// Read and decode `h` into a typed [`Object`].
578    ///
579    /// # Errors
580    /// As [`Self::read`], plus decode errors.
581    fn read_object(&self, h: &Hash) -> StoreResult<Object> {
582        Ok(serialize::deserialize(&self.read(h)?)?)
583    }
584}
585
586impl ObjectSource for ObjectStore {
587    fn read(&self, h: &Hash) -> StoreResult<Vec<u8>> {
588        ObjectStore::read(self, h)
589    }
590}
591
592/// In-memory object overlay for **ephemeral worktree snapshots**
593/// (`status`, `diff`, conflict/restore safety checks).
594///
595/// Writes never touch the durable store: objects whose hash already
596/// exists on disk are deduplicated against it (the store's
597/// visible-implies-durable invariant makes that safe), everything else
598/// lives in a private map that vanishes with the sink. This is what
599/// keeps query commands from (a) paying any durability cost, (b)
600/// growing `objects/` with throwaway snapshot trees, and (c) ever
601/// making a non-durable object *visible* where another writer's dedup
602/// could durably reference it.
603///
604/// Reads fall through to the underlying store, so diff walkers can
605/// resolve a snapshot tree that references committed objects.
606#[derive(Debug)]
607pub struct EphemeralSink<'s> {
608    store: &'s ObjectStore,
609    objects: std::sync::Mutex<std::collections::HashMap<Hash, Vec<u8>>>,
610}
611
612impl<'s> EphemeralSink<'s> {
613    /// Create an empty overlay over `store`.
614    #[must_use]
615    pub fn new(store: &'s ObjectStore) -> Self {
616        Self {
617            store,
618            objects: std::sync::Mutex::new(std::collections::HashMap::new()),
619        }
620    }
621}
622
623impl ObjectSink for EphemeralSink<'_> {
624    fn put(&self, bytes: &[u8]) -> StoreResult<Hash> {
625        self.put_parts(&[bytes])
626    }
627
628    fn put_parts(&self, parts: &[&[u8]]) -> StoreResult<Hash> {
629        let mut total: usize = 0;
630        let mut hasher = hash::Hasher::new();
631        for p in parts {
632            total = total
633                .checked_add(p.len())
634                .ok_or(StoreError::ObjectTooLarge)?;
635            hasher.update(p);
636        }
637        if total > MAX_RAW_OBJECT_SIZE {
638            return Err(StoreError::ObjectTooLarge);
639        }
640        let h = hasher.finalize();
641        // Dedup against the durable store: visible store objects are
642        // durable by invariant, and skipping them keeps the overlay's
643        // memory bounded by the *changed* content, not the worktree.
644        if self.store.contains(&h) {
645            return Ok(h);
646        }
647        let mut map = self.objects.lock().expect("ephemeral sink mutex");
648        map.entry(h).or_insert_with(|| {
649            let mut buf = Vec::with_capacity(total);
650            for p in parts {
651                buf.extend_from_slice(p);
652            }
653            buf
654        });
655        Ok(h)
656    }
657
658    fn has(&self, h: &Hash) -> bool {
659        self.objects
660            .lock()
661            .expect("ephemeral sink mutex")
662            .contains_key(h)
663            || self.store.contains(h)
664    }
665}
666
667impl ObjectSource for EphemeralSink<'_> {
668    fn read(&self, h: &Hash) -> StoreResult<Vec<u8>> {
669        if let Some(bytes) = self
670            .objects
671            .lock()
672            .expect("ephemeral sink mutex")
673            .get(h)
674            .cloned()
675        {
676            return Ok(bytes);
677        }
678        self.store.read(h)
679    }
680}
681
682/// Write target shared by [`ObjectStore`] (per-object durability) and
683/// [`WriteBatch`] (batched durability), so ingest code can be written
684/// once against either sink.
685pub trait ObjectSink {
686    /// Store `bytes` as one object, returning its BLAKE3 hash.
687    fn put(&self, bytes: &[u8]) -> StoreResult<Hash>;
688    /// Store the concatenation of `parts` as one object without the
689    /// caller having to materialise the concatenated buffer.
690    fn put_parts(&self, parts: &[&[u8]]) -> StoreResult<Hash>;
691    /// True when the object is already present (or staged) in this sink.
692    fn has(&self, h: &Hash) -> bool;
693}
694
695impl ObjectSink for ObjectStore {
696    fn put(&self, bytes: &[u8]) -> StoreResult<Hash> {
697        self.write(bytes)
698    }
699
700    fn put_parts(&self, parts: &[&[u8]]) -> StoreResult<Hash> {
701        // Per-object path is the legacy/rare sink; hot ingest paths use
702        // WriteBatch, whose put_parts is copy-free.
703        let total: usize = parts.iter().map(|p| p.len()).sum();
704        let mut bytes = Vec::with_capacity(total);
705        for p in parts {
706            bytes.extend_from_slice(p);
707        }
708        self.write(&bytes)
709    }
710
711    fn has(&self, h: &Hash) -> bool {
712        self.contains(h)
713    }
714}
715
/// Flush the directory that holds a just-renamed file so the dirent
/// update is durable. Only Unix does real work here; the non-Unix
/// variant is a no-op (Windows does not expose a stable
/// directory-fsync primitive via `std::fs`).
#[cfg(unix)]
pub(crate) fn sync_parent_dir(parent: &Path) -> io::Result<()> {
    let dir = match File::open(parent) {
        Ok(handle) => handle,
        // The directory vanished under us (race with external cleanup):
        // there is nothing left to make durable, so report success.
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(e),
    };
    dir.sync_all()
}

/// Non-Unix stand-in for the directory flush: does nothing and always
/// succeeds.
#[cfg(not(unix))]
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn sync_parent_dir(_parent: &Path) -> io::Result<()> {
    Ok(())
}
735
#[cfg(test)]
mod tests {
    use super::*;
    use crate::object::Blob;
    use std::fs::OpenOptions;
    use std::io::Seek;
    use tempfile::TempDir;

    /// Initialise a store inside a brand-new temp dir. The `TempDir`
    /// guard is handed back so the directory lives as long as the test
    /// keeps it bound.
    fn fresh_store() -> (TempDir, ObjectStore) {
        let tmp = TempDir::new().expect("tempdir");
        let store = ObjectStore::init(tmp.path()).expect("init");
        (tmp, store)
    }

    #[test]
    fn init_creates_layout() {
        let tmp = TempDir::new().unwrap();
        let _ = ObjectStore::init(tmp.path()).unwrap();
        let mkit_dir = tmp.path().join(MKIT_DIR);
        assert!(mkit_dir.is_dir());
        assert!(mkit_dir.join(OBJECTS_DIR).is_dir());
    }

    #[test]
    fn init_rejects_already_initialized() {
        let tmp = TempDir::new().unwrap();
        ObjectStore::init(tmp.path()).unwrap();
        let second = ObjectStore::init(tmp.path());
        assert!(matches!(second, Err(StoreError::AlreadyInitialized)));
    }

    #[test]
    fn open_rejects_non_repo() {
        let tmp = TempDir::new().unwrap();
        let opened = ObjectStore::open(tmp.path());
        assert!(matches!(opened, Err(StoreError::NotAMkitRepository)));
    }

    #[test]
    fn is_repo_root_predicate() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();
        assert!(!ObjectStore::is_repo_root(root));
        ObjectStore::init(root).unwrap();
        assert!(ObjectStore::is_repo_root(root));
    }

    #[test]
    fn write_then_read_roundtrip() {
        let (_tmp, store) = fresh_store();
        let payload = b"hello world".to_vec();
        let id = store.write(&payload).unwrap();
        assert!(store.contains(&id));
        assert_eq!(store.read(&id).unwrap(), payload);
    }

    #[test]
    fn read_object_deserialises() {
        let (_tmp, store) = fresh_store();
        let blob = Object::Blob(Blob {
            data: b"object bytes".to_vec(),
        });
        let encoded = serialize::serialize(&blob).unwrap();
        let id = store.write(&encoded).unwrap();
        assert_eq!(store.read_object(&id).unwrap(), blob);
    }

    #[test]
    fn write_is_idempotent() {
        let (_tmp, store) = fresh_store();
        let payload = b"duplicate".to_vec();
        let first = store.write(&payload).unwrap();
        let second = store.write(&payload).unwrap();
        assert_eq!(first, second);
        // After the repeated write the shard directory must hold the
        // final object and nothing else (no leftover temp files).
        let final_path = store.path_for(&first);
        let shard_dir = final_path.parent().unwrap();
        let entry_count = fs::read_dir(shard_dir).unwrap().count();
        assert_eq!(
            entry_count,
            1,
            "shard dir should contain exactly the final object, no temp leaks"
        );
    }

    #[test]
    fn read_missing_returns_not_found() {
        let (_tmp, store) = fresh_store();
        let absent = hash::hash(b"never written");
        assert!(matches!(
            store.read(&absent),
            Err(StoreError::ObjectNotFound(_))
        ));
        assert!(!store.contains(&absent));
    }

    #[test]
    fn write_rejects_oversize() {
        let (_tmp, store) = fresh_store();
        // A buffer larger than `MAX_RAW_OBJECT_SIZE` is impractical to
        // allocate in a unit test, so the rejection branch itself is not
        // driven here. This test only references the cap constant and
        // confirms that an ordinary small write still succeeds.
        let _ = MAX_RAW_OBJECT_SIZE;
        let id = store.write(&[0u8; 16]).unwrap();
        assert!(store.contains(&id));
    }

    #[test]
    fn read_rejects_oversize_on_disk() {
        // Write a small object, then grow its on-disk file to
        // `MAX_RAW_OBJECT_SIZE + 1` bytes with `set_len`. The extension
        // is intended to be sparse so that no real disk is consumed
        // (this relies on the filesystem supporting sparse files).
        // `read` must refuse a file whose length exceeds the cap.
        let (_tmp, store) = fresh_store();
        let id = store.write(b"seed").unwrap();
        let on_disk = store.path_for(&id);
        {
            let file = OpenOptions::new().write(true).open(&on_disk).unwrap();
            file.set_len(MAX_RAW_OBJECT_SIZE as u64 + 1).unwrap();
        }
        assert!(matches!(store.read(&id), Err(StoreError::ObjectTooLarge)));
    }

    #[test]
    fn read_detects_corruption() {
        let (_tmp, store) = fresh_store();
        let payload = b"trustworthy".to_vec();
        let id = store.write(&payload).unwrap();
        // Invert the first byte of the stored file in place; the next
        // read must then fail with HashMismatch.
        let on_disk = store.path_for(&id);
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&on_disk)
            .unwrap();
        file.seek(io::SeekFrom::Start(0)).unwrap();
        file.write_all(&[payload[0] ^ 0xFF]).unwrap();
        file.sync_all().unwrap();
        drop(file);
        match store.read(&id).unwrap_err() {
            StoreError::HashMismatch { expected, actual } => {
                assert_eq!(expected, to_hex(&id));
                assert_ne!(actual, expected, "actual must differ once corrupted");
            }
            other => panic!("expected HashMismatch, got {other:?}"),
        }
    }

    #[test]
    fn path_layout_is_2_then_62_hex() {
        let (_tmp, store) = fresh_store();
        let payload = b"layout test".to_vec();
        let id = store.write(&payload).unwrap();
        let hex = to_hex(&id);
        // First two hex digits name the shard dir, the rest the file.
        let (shard_hex, leaf_hex) = hex.split_at(2);
        let final_path = store.path_for(&id);
        let shard_dir = final_path.parent().unwrap();
        let shard_name = shard_dir.file_name().unwrap().to_str().unwrap();
        let leaf_name = final_path.file_name().unwrap().to_str().unwrap();
        assert_eq!(shard_name.len(), 2);
        assert_eq!(leaf_name.len(), 62);
        assert_eq!(shard_name, shard_hex);
        assert_eq!(leaf_name, leaf_hex);
        assert!(
            final_path.is_file(),
            "object file must exist at expected path"
        );
    }

    #[test]
    fn temp_file_left_behind_does_not_satisfy_contains() {
        // Model a crash in the middle of a write: a `.tmp.*` file sits in
        // the shard dir but was never renamed into place. `contains()`
        // must still say the object is absent, and nothing in the shard
        // dir may hash to the target.
        let (_tmp, store) = fresh_store();
        let target = hash::hash(b"never finalised");
        let final_path = store.path_for(&target);
        let shard_dir = final_path.parent().unwrap();
        fs::create_dir_all(shard_dir).unwrap();
        let leaf = final_path.file_name().unwrap();
        let stale = shard_dir.join(format!(".{}.tmp.0.0", leaf.to_string_lossy()));
        fs::write(&stale, b"partial").unwrap();
        assert!(stale.is_file());
        assert!(!store.contains(&target));
        // Hash every file in the shard dir; none may match `target`.
        for entry in fs::read_dir(shard_dir).unwrap() {
            let contents = fs::read(entry.unwrap().path()).unwrap();
            assert_ne!(
                hash::hash(&contents),
                target,
                "stale temp file must not satisfy the target hash"
            );
        }
    }
}
942
#[cfg(test)]
mod bulk_writer_tests {
    use super::*;

    #[test]
    fn bulk_writer_round_trips_and_rewrites() {
        let tmp = tempfile::tempdir().unwrap();
        let store = ObjectStore::init(tmp.path()).unwrap();
        let blob = crate::object::Object::Blob(crate::object::Blob {
            data: b"bulk".to_vec(),
        });
        let encoded = crate::serialize::serialize(&blob).unwrap();
        let mut writer = store.bulk_writer();
        let first = writer.write(&encoded).unwrap();
        // Writing the same bytes again is allowed (there is no existence
        // short-circuit), which lets idempotent re-runs heal torn files.
        let second = writer.write(&encoded).unwrap();
        assert_eq!(first, second);
        writer.commit().unwrap();
        assert_eq!(store.read(&first).unwrap(), encoded);
        // The normal (fsynced) writer agrees on the hash for these bytes.
        assert_eq!(store.write(&encoded).unwrap(), first);
    }
}