mkit_core/store.rs
1//! Local content-addressed object store.
2//!
3//! Layout (rooted at the working-tree directory passed to [`ObjectStore::open`]
4//! / [`ObjectStore::init`]):
5//!
//! ```text
//! .mkit/
//!   objects/
//!     <2-hex>/<62-hex>   # raw canonical object bytes, BLAKE3-named
//! ```
11//!
//! Writes are atomic: bytes are first written to a sibling temp file
//! (`.<name>.tmp.<pid>.<seq>` followed by a random suffix), made durable, then renamed into place.
14//! A crash mid-write leaves only the temp file behind and never
15//! produces a visible object that fails the read-time hash check.
16//!
17//! Durability comes in three shapes (see [`crate::batch`] for the full
18//! contract): [`ObjectStore::write`] flushes per object
19//! ([`SyncPolicy::PerObject`]); [`ObjectStore::batch`] defers visibility
20//! and amortises durability to **one** full flush per batch — the write
21//! path used by every multi-object command (add, commit, pack unpack);
22//! and [`ObjectStore::bulk_writer`] fsyncs each object's contents before
23//! the rename and batches the dir fsyncs at commit (git import). In all
24//! three an object is never visible before its bytes are durable, so a
25//! ref or index written after the write/commit returns can never
26//! reference a non-durable object.
27//!
28//! Reads always verify integrity by recomputing BLAKE3 over the bytes
29//! and comparing against the requested hash; mismatch returns
30//! [`StoreError::HashMismatch`].
31//!
32//! See `docs/SPEC-OBJECTS.md` §10 for the path-layout rule.
33
34use std::fs::{self, File};
35use std::io::{self, Read, Write};
36use std::path::{Path, PathBuf};
37use std::process;
38use std::sync::Arc;
39use std::sync::atomic::{AtomicU64, Ordering};
40
41use tempfile::NamedTempFile;
42
43use crate::batch::{RealSyncer, Syncer};
44pub use crate::batch::{SyncPolicy, WriteBatch};
45use crate::hash::{self, Hash, object_path, to_hex};
46use crate::object::{MkitError, Object};
47use crate::serialize;
48
/// Name of the repository metadata directory created at the work-tree root.
pub const MKIT_DIR: &str = ".mkit";
/// Directory inside [`MKIT_DIR`] where the raw object files live.
pub const OBJECTS_DIR: &str = "objects";
/// Largest raw object, in bytes, the store accepts (1 GiB). Checked by both
/// [`ObjectStore::write`] and [`ObjectStore::read`].
pub const MAX_RAW_OBJECT_SIZE: usize = 1 << 30;
56
/// Maximum nesting depth a tree walker will descend before giving up.
///
/// The core walkers (`index::from_tree`, `ops::diff`, `ops::merge`,
/// `ops::restore`) use one native stack frame per directory level. Content
/// addressing rules out genuine cycles, because a child can never hash to its
/// parent. A hostile repository can still pack a few thousand single-entry
/// trees into a tiny pack and overflow the stack during
/// `clone`/`checkout`/`merge`/`diff`. To prevent that, walkers carry a depth
/// counter and fail with a typed `TreeTooDeep` error as soon as
/// `depth > MAX_TREE_DEPTH`.
///
/// 128 levels is well beyond any realistic source tree (Git working trees
/// seldom exceed a couple dozen levels). It still fits easily in a default
/// 8 MiB thread stack. It mirrors the `MAX_REF_DEPTH` cap used by the
/// ref-tree walker in `refs.rs`.
pub const MAX_TREE_DEPTH: usize = 0x80;
73
74/// Errors raised by the [`ObjectStore`] surface. Distinct from
75/// [`MkitError`] so callers can pattern-match on filesystem failures
76/// without losing the structured-decode-error variants.
77#[derive(Debug, thiserror::Error)]
78pub enum StoreError {
79 #[error("path is not an mkit repository (missing .mkit/objects)")]
80 NotAMkitRepository,
81 #[error(".mkit already exists in this directory")]
82 AlreadyInitialized,
83 #[error("object {0} not found")]
84 ObjectNotFound(String),
85 #[error("object exceeds {} byte cap", MAX_RAW_OBJECT_SIZE)]
86 ObjectTooLarge,
87 #[error("tree nesting exceeds {} levels", MAX_TREE_DEPTH)]
88 TreeTooDeep,
89 #[error("on-disk bytes hash to {actual}, expected {expected}")]
90 HashMismatch { expected: String, actual: String },
91 #[error(transparent)]
92 Io(#[from] io::Error),
93 #[error(transparent)]
94 Decode(#[from] MkitError),
95}
96
97/// Deferred-fsync writer returned by [`ObjectStore::bulk_writer`].
98/// See that method for the crash-safety contract.
99#[derive(Debug)]
100pub struct BulkWriter<'a> {
101 store: &'a ObjectStore,
102 dirs: std::collections::HashSet<PathBuf>,
103 files: std::collections::HashSet<PathBuf>,
104}
105
106impl BulkWriter<'_> {
107 /// Write one object durable-before-visible: contents are fsynced
108 /// before the rename publishes the object, and only the shard-dir
109 /// fsync (rename durability) is deferred to commit. This upholds the
110 /// store's global invariant (an object is never visible before its
111 /// bytes are durable), so another process's `contains()` dedup can
112 /// never reference a half-written object. An existing path is
113 /// byte-verified: matched objects are left in place (but still
114 /// fsynced at commit, see below), torn ones are rewritten.
115 ///
116 /// # Panics
117 /// Never in practice: object paths always have a 2-hex shard
118 /// parent by construction.
119 pub fn write(&mut self, bytes: &[u8]) -> StoreResult<Hash> {
120 if bytes.len() > MAX_RAW_OBJECT_SIZE {
121 return Err(StoreError::ObjectTooLarge);
122 }
123 let h = hash::hash(bytes);
124 let final_path = self.store.path_for(&h);
125 // VERIFY-skip an existing object: content addressing makes
126 // byte-equality the exact durability test — a good file (from
127 // a previously committed session, possibly referenced by
128 // native history) must NOT be replaced with an unsynced inode
129 // a power loss could tear, while a torn file from a crashed
130 // session fails the comparison and is healed by the rewrite.
131 let shard_dir = final_path
132 .parent()
133 .expect("object path always has a 2-hex parent");
134 if let Ok(existing) = fs::read(&final_path)
135 && existing == bytes
136 {
137 // The bytes may have matched out of the PAGE CACHE of a
138 // crashed session's unsynced write — byte equality is not a
139 // durability test, so this pre-existing file still needs a
140 // content fsync at commit.
141 self.dirs.insert(shard_dir.to_path_buf());
142 self.files.insert(final_path);
143 return Ok(h);
144 }
145 fs::create_dir_all(shard_dir)?;
146 // Content fsync happens BEFORE the rename, so the object is
147 // durable the instant it becomes visible. Only the rename's
148 // durability (the shard dir fsync) is deferred to commit.
149 crate::atomic::write_content_synced(&final_path, bytes)?;
150 self.dirs.insert(shard_dir.to_path_buf());
151 Ok(h)
152 }
153
154 /// Make the session durable: fsync the CONTENTS of any pre-existing
155 /// objects this batch re-used (newly written objects were already
156 /// content-fsynced before their rename in [`write`](Self::write)),
157 /// then every touched shard directory (renames become durable).
158 pub fn commit(self) -> StoreResult<()> {
159 for file in &self.files {
160 fs::OpenOptions::new().write(true).open(file)?.sync_all()?;
161 }
162 for dir in &self.dirs {
163 crate::atomic::sync_dir(dir)?;
164 }
165 Ok(())
166 }
167}
168
169/// Result alias used throughout this module.
170pub type StoreResult<T> = Result<T, StoreError>;
171
// Per-process sequence number mixed into temp-file names (see
// `temp_file_in`). Combined with the pid, it makes the name prefix unique
// across concurrently running processes on this host. `NamedTempFile` then
// appends its own random suffix and creates the file exclusively, so two
// writers never share a temp file. That is why `rand` is not needed here.
// `Relaxed` ordering suffices because only the uniqueness of the returned
// values matters, not their ordering relative to other memory operations.
static TEMP_SEQ: AtomicU64 = AtomicU64::new(0);
177
178/// Local content-addressed object store backed by the filesystem.
179#[derive(Debug, Clone)]
180pub struct ObjectStore {
181 /// Absolute path to `<root>/.mkit/objects`.
182 objects_root: PathBuf,
183 /// Flush/rename primitive seam. Production code always uses
184 /// [`RealSyncer`]; unit tests inject a recording double to assert
185 /// flush ordering and counts (the O(1)-flushes-per-batch contract).
186 syncer: Arc<dyn Syncer>,
187 /// Policy handed out by [`Self::batch`]. Defaults to
188 /// [`SyncPolicy::Batch`]; the CLI maps the repo config key
189 /// `durability.objects = per-object` onto it for deployments that
190 /// want the strict historical schedule.
191 default_policy: SyncPolicy,
192}
193
194impl ObjectStore {
195 /// Open an existing repository rooted at `root`. Returns
196 /// [`StoreError::NotAMkitRepository`] if `<root>/.mkit/objects` does
197 /// not exist.
198 pub fn open(root: &Path) -> StoreResult<Self> {
199 let objects_root = root.join(MKIT_DIR).join(OBJECTS_DIR);
200 if !objects_root.is_dir() {
201 return Err(StoreError::NotAMkitRepository);
202 }
203 Ok(Self {
204 objects_root,
205 syncer: Arc::new(RealSyncer),
206 default_policy: SyncPolicy::Batch,
207 })
208 }
209
210 /// Initialise a fresh `.mkit/` directory under `root`. Returns
211 /// [`StoreError::AlreadyInitialized`] if `.mkit/` already exists.
212 pub fn init(root: &Path) -> StoreResult<Self> {
213 let mkit_root = root.join(MKIT_DIR);
214 if mkit_root.exists() {
215 return Err(StoreError::AlreadyInitialized);
216 }
217 let objects_root = mkit_root.join(OBJECTS_DIR);
218 fs::create_dir_all(&objects_root)?;
219 Ok(Self {
220 objects_root,
221 syncer: Arc::new(RealSyncer),
222 default_policy: SyncPolicy::Batch,
223 })
224 }
225
226 /// Select the [`SyncPolicy`] that [`Self::batch`] hands out. The
227 /// CLI wires the repo config key `durability.objects` here so
228 /// deployments on filesystems where they prefer the strict
229 /// per-object schedule can opt into it (SPEC-OBJECTS §10.1).
230 pub fn set_sync_policy(&mut self, policy: SyncPolicy) {
231 self.default_policy = policy;
232 }
233
234 /// Replace the flush/rename primitives. Test-only seam — see the
235 /// `syncer` field. Not exposed publicly so the production sync
236 /// strategy cannot be silently weakened by downstream code.
237 #[cfg(test)]
238 pub(crate) fn set_syncer(&mut self, syncer: Arc<dyn Syncer>) {
239 self.syncer = syncer;
240 }
241
242 /// The active flush/rename primitives, shared with [`WriteBatch`].
243 pub(crate) fn syncer(&self) -> &Arc<dyn Syncer> {
244 &self.syncer
245 }
246
247 /// Start a batched write with the store's configured policy
248 /// (default [`SyncPolicy::Batch`]): objects staged by the batch
249 /// become durable and visible together at [`WriteBatch::commit`],
250 /// with O(1) full flushes per batch instead of per object.
251 #[must_use]
252 pub fn batch(&self) -> WriteBatch<'_> {
253 self.batch_with_policy(self.default_policy)
254 }
255
256 /// Start a batched write with an explicit [`SyncPolicy`].
257 #[must_use]
258 pub fn batch_with_policy(&self, policy: SyncPolicy) -> WriteBatch<'_> {
259 WriteBatch::new(self, policy)
260 }
261
262 /// Returns `true` when `root` contains a `.mkit/objects` directory.
263 #[must_use]
264 pub fn is_repo_root(root: &Path) -> bool {
265 root.join(MKIT_DIR).join(OBJECTS_DIR).is_dir()
266 }
267
268 /// Absolute path to the `objects/` directory.
269 #[must_use]
270 pub fn objects_root(&self) -> &Path {
271 &self.objects_root
272 }
273
274 /// Compute the on-disk path for `hash`, joined under `objects/`.
275 /// `pub(crate)` so [`WriteBatch`] shares the single layout rule.
276 pub(crate) fn path_for(&self, h: &Hash) -> PathBuf {
277 let p = object_path(h);
278 // Both halves are ASCII hex by construction in `object_path`.
279 let dir = std::str::from_utf8(&p.dir).expect("ascii hex");
280 let file = std::str::from_utf8(&p.file).expect("ascii hex");
281 self.objects_root.join(dir).join(file)
282 }
283
284 /// Returns `true` when the object `h` is present in the store. Does
285 /// **not** verify integrity — use [`Self::read`] for that.
286 #[must_use]
287 pub fn contains(&self, h: &Hash) -> bool {
288 self.path_for(h).is_file()
289 }
290
291 /// Write `bytes` to the store, returning their BLAKE3 hash. Atomic:
292 /// writes to a sibling temp file, `fsync`s, then renames into place.
293 /// Idempotent — re-writing the same bytes is a no-op (the temp file
294 /// is unlinked on the early-return path).
295 ///
296 /// # Panics
297 ///
298 /// Panics only if the internal hash-to-path mapping produces a path
299 /// without a parent directory, which is impossible by construction.
300 pub fn write(&self, bytes: &[u8]) -> StoreResult<Hash> {
301 if bytes.len() > MAX_RAW_OBJECT_SIZE {
302 return Err(StoreError::ObjectTooLarge);
303 }
304 let h = hash::hash(bytes);
305 let final_path = self.path_for(&h);
306 if final_path.exists() {
307 // Dedup hit: the object is visible, but if another process
308 // renamed it and has not yet flushed the dirent, it may not
309 // be durable — and our caller is about to reference it.
310 // Flush its shard dir before returning (SPEC-OBJECTS §10.1
311 // dedup rule, mirroring WriteBatch's touched_shards).
312 self.syncer()
313 .dir_sync(final_path.parent().expect("object path has parent"))?;
314 return Ok(h);
315 }
316 let shard_dir = final_path
317 .parent()
318 .expect("object path always has a 2-hex parent");
319 fs::create_dir_all(shard_dir)?;
320 write_atomic(&final_path, bytes, &**self.syncer())?;
321 Ok(h)
322 }
323
324 /// Begin a bulk-write session: each new object is written
325 /// durable-before-visible (contents fsynced, then renamed), and
326 /// [`BulkWriter::commit`] batches the directory fsyncs (rename
327 /// durability) plus a content fsync of any re-used pre-existing
328 /// objects once at the end, instead of fsyncing a dir per write.
329 ///
330 /// Because content is durable before the rename, the session upholds
331 /// the store's global invariant even under concurrent readers: an
332 /// object another process can `contains()`-dedup against is always
333 /// backed by durable bytes.
334 ///
335 /// Crash-safety contract (deliberately weaker than [`Self::write`]
336 /// only for the rename/dirent half, for callers whose whole
337 /// operation is idempotent — e.g. the deterministic git-import,
338 /// which re-runs from a retained source mirror): after a crash
339 /// BEFORE `commit`, a just-renamed object's dirent may be lost (the
340 /// shard dir is not yet fsynced), so objects may be missing — never
341 /// torn, since contents were fsynced first. Existing paths are
342 /// VERIFIED (byte compare)
343 /// rather than blindly rewritten or blindly trusted: a matching
344 /// file is left untouched (it may be durable and referenced by
345 /// native history — replacing it with an unsynced inode would put
346 /// it at risk), a torn one is healed by rewrite, and reads always
347 /// BLAKE3-verify. Callers MUST gate bulk sessions behind their
348 /// own crash marker and re-run on detection.
349 #[must_use]
350 pub fn bulk_writer(&self) -> BulkWriter<'_> {
351 BulkWriter {
352 store: self,
353 dirs: std::collections::HashSet::new(),
354 files: std::collections::HashSet::new(),
355 }
356 }
357
358 /// Read raw bytes for `h`. Verifies that BLAKE3 of the on-disk
359 /// bytes equals `h` and returns [`StoreError::HashMismatch`] on
360 /// failure (the bytes are still discarded so callers cannot
361 /// accidentally use corrupt data).
362 pub fn read(&self, h: &Hash) -> StoreResult<Vec<u8>> {
363 let path = self.path_for(h);
364 let mut file = File::open(&path).map_err(|e| match e.kind() {
365 io::ErrorKind::NotFound => StoreError::ObjectNotFound(to_hex(h)),
366 _ => StoreError::Io(e),
367 })?;
368 let meta = file.metadata()?;
369 let size = meta.len();
370 if u128::from(size) > MAX_RAW_OBJECT_SIZE as u128 {
371 return Err(StoreError::ObjectTooLarge);
372 }
373 // We've already bounded `size` by `MAX_RAW_OBJECT_SIZE` (1 GiB),
374 // which fits in `usize` on every platform we support (32-bit
375 // included). Pre-size the buffer to avoid the doubling
376 // re-allocations of `read_to_end` for large objects.
377 let cap = usize::try_from(size).map_err(|_| StoreError::ObjectTooLarge)?;
378 let mut bytes = Vec::with_capacity(cap);
379 file.read_to_end(&mut bytes)?;
380 let actual = hash::hash(&bytes);
381 if actual != *h {
382 return Err(StoreError::HashMismatch {
383 expected: to_hex(h),
384 actual: to_hex(&actual),
385 });
386 }
387 Ok(bytes)
388 }
389
390 /// The object's type tag, from its 6-byte prologue — without
391 /// reading or hash-verifying the body. Backs cheap shape checks
392 /// (e.g. "is this staged hash blob-like?") that previously paid a
393 /// full read+BLAKE3 of every staged blob per status/commit; the
394 /// real read path still integrity-verifies at use time.
395 ///
396 /// # Errors
397 /// [`StoreError::ObjectNotFound`] if absent; [`StoreError::Decode`]
398 /// for a short file, bad magic/version, or unknown tag.
399 pub fn object_type(&self, h: &Hash) -> StoreResult<crate::object::ObjectType> {
400 let path = self.path_for(h);
401 let mut file = File::open(&path).map_err(|e| match e.kind() {
402 io::ErrorKind::NotFound => StoreError::ObjectNotFound(to_hex(h)),
403 _ => StoreError::Io(e),
404 })?;
405 let mut prologue = [0u8; 6];
406 file.read_exact(&mut prologue)
407 .map_err(|_| StoreError::Decode(MkitError::EmptyData))?;
408 if prologue[1..5] != crate::object::MAGIC {
409 return Err(StoreError::Decode(MkitError::InvalidMagic));
410 }
411 if prologue[5] != crate::object::SCHEMA_VERSION {
412 return Err(StoreError::Decode(MkitError::UnsupportedObjectVersion));
413 }
414 crate::object::ObjectType::from_u8(prologue[0]).map_err(StoreError::Decode)
415 }
416
417 /// Convenience: read raw bytes and decode into a typed [`Object`].
418 pub fn read_object(&self, h: &Hash) -> StoreResult<Object> {
419 let bytes = self.read(h)?;
420 let obj = serialize::deserialize(&bytes)?;
421 Ok(obj)
422 }
423
424 /// Hash-verifying variant of [`object_type`](Self::object_type): reads
425 /// the whole object and confirms its content hashes to `h` before
426 /// returning the declared type. This is the integrity guard for
427 /// tree-publication paths (commit, merge, rebase, …) — `object_type`
428 /// alone reads only the 6-byte prologue, so a staged object corrupted
429 /// after `add` would otherwise be published into a durable tree and
430 /// only fail at later read time. Use `object_type` on hot read-only
431 /// paths (status/diff snapshots) where nothing durable is published.
432 pub fn verify_object_type(&self, h: &Hash) -> StoreResult<crate::object::ObjectType> {
433 let bytes = self.read(h)?; // read() re-hashes and rejects on mismatch
434 if bytes.len() < 6 {
435 return Err(StoreError::Decode(MkitError::EmptyData));
436 }
437 if bytes[1..5] != crate::object::MAGIC {
438 return Err(StoreError::Decode(MkitError::InvalidMagic));
439 }
440 if bytes[5] != crate::object::SCHEMA_VERSION {
441 return Err(StoreError::Decode(MkitError::UnsupportedObjectVersion));
442 }
443 crate::object::ObjectType::from_u8(bytes[0]).map_err(StoreError::Decode)
444 }
445
446 /// Enumerate every object hash currently in the store by walking
447 /// `objects/<2-hex>/<62-hex>`. Entries whose names are not the
448 /// expected hex shape (stray files, atomic-write temp files, unknown
449 /// dirs) are skipped — they are not objects. Used by `gc` to find
450 /// prune candidates.
451 ///
452 /// # Errors
453 /// [`StoreError::Io`] if a directory cannot be read (gc must then
454 /// fail closed rather than prune against a partial enumeration).
455 pub fn iter_object_hashes(&self) -> StoreResult<Vec<Hash>> {
456 let mut out = Vec::new();
457 let shards = match fs::read_dir(&self.objects_root) {
458 Ok(rd) => rd,
459 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
460 Err(e) => return Err(StoreError::Io(e)),
461 };
462 for shard in shards {
463 let shard = shard?;
464 if !shard.file_type()?.is_dir() {
465 continue;
466 }
467 let dir_name = shard.file_name();
468 let Some(dir) = dir_name.to_str() else {
469 continue;
470 };
471 if dir.len() != 2 || !dir.bytes().all(|b| b.is_ascii_hexdigit()) {
472 continue;
473 }
474 for entry in fs::read_dir(shard.path())? {
475 let entry = entry?;
476 if !entry.file_type()?.is_file() {
477 continue;
478 }
479 let file_name = entry.file_name();
480 let Some(file) = file_name.to_str() else {
481 continue;
482 };
483 if file.len() != 62 {
484 continue;
485 }
486 // Reassemble the 64-hex id and parse it; skips non-objects.
487 if let Ok(h) = hash::from_hex(&format!("{dir}{file}")) {
488 out.push(h);
489 }
490 }
491 }
492 Ok(out)
493 }
494
495 /// Filesystem metadata for object `h` (size + mtime), for gc's
496 /// grace-window check.
497 ///
498 /// # Errors
499 /// [`StoreError::ObjectNotFound`] if absent, else [`StoreError::Io`].
500 pub fn object_metadata(&self, h: &Hash) -> StoreResult<fs::Metadata> {
501 fs::metadata(self.path_for(h)).map_err(|e| match e.kind() {
502 io::ErrorKind::NotFound => StoreError::ObjectNotFound(to_hex(h)),
503 _ => StoreError::Io(e),
504 })
505 }
506
507 /// Delete object `h` from the store. Idempotent: a missing object is
508 /// not an error (it may have been pruned already).
509 ///
510 /// # Errors
511 /// [`StoreError::Io`] on a filesystem failure other than not-found.
512 pub fn remove_object(&self, h: &Hash) -> StoreResult<()> {
513 match fs::remove_file(self.path_for(h)) {
514 Ok(()) => Ok(()),
515 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
516 Err(e) => Err(StoreError::Io(e)),
517 }
518 }
519}
520
521/// Create a uniquely-named sibling temp file in `parent` for the object
522/// file `file_name`. The `.{name}.tmp.{pid}.{seq}` shape is what
523/// [`ObjectStore::iter_object_hashes`] and the stale-temp tests rely on
524/// to skip non-objects.
525pub(crate) fn temp_file_in(parent: &Path, file_name: &str) -> io::Result<NamedTempFile> {
526 let pid = process::id();
527 let seq = TEMP_SEQ.fetch_add(1, Ordering::Relaxed);
528 let tmp_name = format!(".{file_name}.tmp.{pid}.{seq}");
529 NamedTempFile::with_prefix_in(tmp_name, parent)
530}
531
532/// Atomically write `bytes` to `final_path` with per-object durability
533/// ([`SyncPolicy::PerObject`]): temp file, full flush, rename into
534/// place, parent-dir flush. On Unix, `rename(2)` is atomic with respect
535/// to concurrent readers and replaces the destination. On Windows,
536/// [`NamedTempFile::persist`] uses `MOVEFILE_REPLACE_EXISTING`
537/// semantics so the replace-existing path works there too.
538///
539/// After a successful rename we flush the parent directory on Unix to
540/// commit the dirent update — without this, the rename can survive a
541/// power loss only in the page cache and the file appears missing on
542/// reboot.
543///
544/// All flush/rename primitives go through `syncer` so unit tests can
545/// assert ordering; [`RealSyncer`] preserves the historical behaviour
546/// exactly.
547fn write_atomic(final_path: &Path, bytes: &[u8], syncer: &dyn Syncer) -> io::Result<()> {
548 let parent = final_path.parent().expect("write_atomic: path has parent");
549 let file_name = final_path
550 .file_name()
551 .expect("write_atomic: path has file name")
552 .to_string_lossy();
553
554 let mut tmp = temp_file_in(parent, &file_name)?;
555 tmp.as_file_mut().write_all(bytes)?;
556 syncer.full(tmp.as_file(), tmp.path())?;
557
558 // NamedTempFile::persist uses a cross-platform atomic replace:
559 // rename(2) on Unix, MoveFileExW with MOVEFILE_REPLACE_EXISTING on Windows.
560 syncer.rename(tmp.into_temp_path(), final_path)?;
561
562 syncer.dir_sync(parent)?;
563 Ok(())
564}
565
566/// Read source shared by [`ObjectStore`] and snapshot overlays, so
567/// tree-diff code can resolve objects from either the durable store or
568/// an in-memory [`EphemeralSink`].
569pub trait ObjectSource {
570 /// Read and integrity-verify the raw bytes of `h`.
571 ///
572 /// # Errors
573 /// [`StoreError::ObjectNotFound`] / [`StoreError::HashMismatch`] /
574 /// I/O errors, as for [`ObjectStore::read`].
575 fn read(&self, h: &Hash) -> StoreResult<Vec<u8>>;
576
577 /// Read and decode `h` into a typed [`Object`].
578 ///
579 /// # Errors
580 /// As [`Self::read`], plus decode errors.
581 fn read_object(&self, h: &Hash) -> StoreResult<Object> {
582 Ok(serialize::deserialize(&self.read(h)?)?)
583 }
584}
585
586impl ObjectSource for ObjectStore {
587 fn read(&self, h: &Hash) -> StoreResult<Vec<u8>> {
588 ObjectStore::read(self, h)
589 }
590}
591
592/// In-memory object overlay for **ephemeral worktree snapshots**
593/// (`status`, `diff`, conflict/restore safety checks).
594///
595/// Writes never touch the durable store: objects whose hash already
596/// exists on disk are deduplicated against it (the store's
597/// visible-implies-durable invariant makes that safe), everything else
598/// lives in a private map that vanishes with the sink. This is what
599/// keeps query commands from (a) paying any durability cost, (b)
600/// growing `objects/` with throwaway snapshot trees, and (c) ever
601/// making a non-durable object *visible* where another writer's dedup
602/// could durably reference it.
603///
604/// Reads fall through to the underlying store, so diff walkers can
605/// resolve a snapshot tree that references committed objects.
606#[derive(Debug)]
607pub struct EphemeralSink<'s> {
608 store: &'s ObjectStore,
609 objects: std::sync::Mutex<std::collections::HashMap<Hash, Vec<u8>>>,
610}
611
612impl<'s> EphemeralSink<'s> {
613 /// Create an empty overlay over `store`.
614 #[must_use]
615 pub fn new(store: &'s ObjectStore) -> Self {
616 Self {
617 store,
618 objects: std::sync::Mutex::new(std::collections::HashMap::new()),
619 }
620 }
621}
622
623impl ObjectSink for EphemeralSink<'_> {
624 fn put(&self, bytes: &[u8]) -> StoreResult<Hash> {
625 self.put_parts(&[bytes])
626 }
627
628 fn put_parts(&self, parts: &[&[u8]]) -> StoreResult<Hash> {
629 let mut total: usize = 0;
630 let mut hasher = hash::Hasher::new();
631 for p in parts {
632 total = total
633 .checked_add(p.len())
634 .ok_or(StoreError::ObjectTooLarge)?;
635 hasher.update(p);
636 }
637 if total > MAX_RAW_OBJECT_SIZE {
638 return Err(StoreError::ObjectTooLarge);
639 }
640 let h = hasher.finalize();
641 // Dedup against the durable store: visible store objects are
642 // durable by invariant, and skipping them keeps the overlay's
643 // memory bounded by the *changed* content, not the worktree.
644 if self.store.contains(&h) {
645 return Ok(h);
646 }
647 let mut map = self.objects.lock().expect("ephemeral sink mutex");
648 map.entry(h).or_insert_with(|| {
649 let mut buf = Vec::with_capacity(total);
650 for p in parts {
651 buf.extend_from_slice(p);
652 }
653 buf
654 });
655 Ok(h)
656 }
657
658 fn has(&self, h: &Hash) -> bool {
659 self.objects
660 .lock()
661 .expect("ephemeral sink mutex")
662 .contains_key(h)
663 || self.store.contains(h)
664 }
665}
666
667impl ObjectSource for EphemeralSink<'_> {
668 fn read(&self, h: &Hash) -> StoreResult<Vec<u8>> {
669 if let Some(bytes) = self
670 .objects
671 .lock()
672 .expect("ephemeral sink mutex")
673 .get(h)
674 .cloned()
675 {
676 return Ok(bytes);
677 }
678 self.store.read(h)
679 }
680}
681
682/// Write target shared by [`ObjectStore`] (per-object durability) and
683/// [`WriteBatch`] (batched durability), so ingest code can be written
684/// once against either sink.
685pub trait ObjectSink {
686 /// Store `bytes` as one object, returning its BLAKE3 hash.
687 fn put(&self, bytes: &[u8]) -> StoreResult<Hash>;
688 /// Store the concatenation of `parts` as one object without the
689 /// caller having to materialise the concatenated buffer.
690 fn put_parts(&self, parts: &[&[u8]]) -> StoreResult<Hash>;
691 /// True when the object is already present (or staged) in this sink.
692 fn has(&self, h: &Hash) -> bool;
693}
694
695impl ObjectSink for ObjectStore {
696 fn put(&self, bytes: &[u8]) -> StoreResult<Hash> {
697 self.write(bytes)
698 }
699
700 fn put_parts(&self, parts: &[&[u8]]) -> StoreResult<Hash> {
701 // Per-object path is the legacy/rare sink; hot ingest paths use
702 // WriteBatch, whose put_parts is copy-free.
703 let total: usize = parts.iter().map(|p| p.len()).sum();
704 let mut bytes = Vec::with_capacity(total);
705 for p in parts {
706 bytes.extend_from_slice(p);
707 }
708 self.write(&bytes)
709 }
710
711 fn has(&self, h: &Hash) -> bool {
712 self.contains(h)
713 }
714}
715
/// Fsync the directory `parent` so that a rename into it is durable (Unix).
///
/// A directory that no longer exists (`NotFound`, e.g. removed by an
/// external cleanup racing with us) is treated as success: with no
/// directory left there is no dirent to make durable. That error is
/// deliberately swallowed, not propagated. Every other error is returned.
#[cfg(unix)]
pub(crate) fn sync_parent_dir(parent: &Path) -> io::Result<()> {
    match File::open(parent) {
        Ok(dir) => dir.sync_all(),
        // Vanished directory: the durability invariant is moot (see above).
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
        Err(e) => Err(e),
    }
}

/// Non-Unix fallback: always `Ok(())`. Windows exposes no stable
/// directory-fsync primitive through `std::fs`, so there is nothing to do.
#[cfg(not(unix))]
// The `io::Result` return is kept so both cfg variants share one signature
// and callers can use `?` uniformly.
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn sync_parent_dir(_parent: &Path) -> io::Result<()> {
    Ok(())
}
735
736#[cfg(test)]
737mod tests {
738 use super::*;
739 use crate::object::Blob;
740 use std::fs::OpenOptions;
741 use std::io::Seek;
742 use tempfile::TempDir;
743
744 fn fresh_store() -> (TempDir, ObjectStore) {
745 let dir = TempDir::new().expect("tempdir");
746 let store = ObjectStore::init(dir.path()).expect("init");
747 (dir, store)
748 }
749
750 #[test]
751 fn init_creates_layout() {
752 let dir = TempDir::new().unwrap();
753 let _ = ObjectStore::init(dir.path()).unwrap();
754 assert!(dir.path().join(MKIT_DIR).is_dir());
755 assert!(dir.path().join(MKIT_DIR).join(OBJECTS_DIR).is_dir());
756 }
757
758 #[test]
759 fn init_rejects_already_initialized() {
760 let dir = TempDir::new().unwrap();
761 ObjectStore::init(dir.path()).unwrap();
762 let err = ObjectStore::init(dir.path()).unwrap_err();
763 assert!(matches!(err, StoreError::AlreadyInitialized));
764 }
765
766 #[test]
767 fn open_rejects_non_repo() {
768 let dir = TempDir::new().unwrap();
769 let err = ObjectStore::open(dir.path()).unwrap_err();
770 assert!(matches!(err, StoreError::NotAMkitRepository));
771 }
772
773 #[test]
774 fn is_repo_root_predicate() {
775 let dir = TempDir::new().unwrap();
776 assert!(!ObjectStore::is_repo_root(dir.path()));
777 ObjectStore::init(dir.path()).unwrap();
778 assert!(ObjectStore::is_repo_root(dir.path()));
779 }
780
781 #[test]
782 fn write_then_read_roundtrip() {
783 let (_dir, store) = fresh_store();
784 let bytes = b"hello world".to_vec();
785 let h = store.write(&bytes).unwrap();
786 assert!(store.contains(&h));
787 let got = store.read(&h).unwrap();
788 assert_eq!(got, bytes);
789 }
790
791 #[test]
792 fn read_object_deserialises() {
793 let (_dir, store) = fresh_store();
794 let obj = Object::Blob(Blob {
795 data: b"object bytes".to_vec(),
796 });
797 let bytes = serialize::serialize(&obj).unwrap();
798 let h = store.write(&bytes).unwrap();
799 let parsed = store.read_object(&h).unwrap();
800 assert_eq!(parsed, obj);
801 }
802
803 #[test]
804 fn write_is_idempotent() {
805 let (_dir, store) = fresh_store();
806 let bytes = b"duplicate".to_vec();
807 let h1 = store.write(&bytes).unwrap();
808 let h2 = store.write(&bytes).unwrap();
809 assert_eq!(h1, h2);
810 // Second write must not have produced any stray temp files.
811 let shard = store.path_for(&h1);
812 let parent = shard.parent().unwrap();
813 let entries: Vec<_> = fs::read_dir(parent).unwrap().collect();
814 assert_eq!(
815 entries.len(),
816 1,
817 "shard dir should contain exactly the final object, no temp leaks"
818 );
819 }
820
821 #[test]
822 fn read_missing_returns_not_found() {
823 let (_dir, store) = fresh_store();
824 let phony = hash::hash(b"never written");
825 let err = store.read(&phony).unwrap_err();
826 assert!(matches!(err, StoreError::ObjectNotFound(_)));
827 assert!(!store.contains(&phony));
828 }
829
#[test]
fn write_rejects_oversize() {
    // NOTE: this test does NOT exercise the rejection path of `write` at
    // runtime. That would need an in-memory slice of
    // `MAX_RAW_OBJECT_SIZE + 1` bytes (over 1 GiB at the current cap),
    // which is too heavy for a unit test. The read-side guard is covered
    // at runtime by `read_rejects_oversize_on_disk`, which uses a sparse
    // file instead.
    //
    // What is pinned here is the accepting side of the boundary: a payload
    // that is provably below the cap is written and becomes visible.
    let (_dir, store) = fresh_store();
    let payload = [0u8; 16];
    // Runtime (not const) comparison, so the precondition is actually
    // checked rather than silently discarded like a bare `let _ = CONST;`.
    assert!(
        (payload.len() as u64) < MAX_RAW_OBJECT_SIZE as u64,
        "test payload must sit below the size cap"
    );
    let h = store.write(&payload).unwrap();
    assert!(store.contains(&h));
}
843
#[test]
fn read_rejects_oversize_on_disk() {
    // `read` must fail with `ObjectTooLarge` for an on-disk object whose
    // length exceeds `MAX_RAW_OBJECT_SIZE`.
    //
    // A genuine cap+1 file would cost over 1 GiB of disk. Instead, write a
    // small real object, then *extend* (not truncate) its file with
    // `set_len` to cap+1 bytes. On filesystems with sparse-file support
    // this allocates almost no blocks, yet the file's reported length is
    // over the cap. No part of the read path is patched or replaced.
    let (_dir, store) = fresh_store();
    let h = store.write(b"seed").unwrap();
    let path = store.path_for(&h);
    {
        let f = OpenOptions::new().write(true).open(&path).unwrap();
        f.set_len(MAX_RAW_OBJECT_SIZE as u64 + 1).unwrap();
        // `f` is closed at the end of this scope, before `read` opens the path.
    }
    let err = store.read(&h).unwrap_err();
    assert!(matches!(err, StoreError::ObjectTooLarge));
}
867
#[test]
fn read_detects_corruption() {
    let (_dir, store) = fresh_store();
    let original = b"trustworthy".to_vec();
    let id = store.write(&original).unwrap();

    // Invert the first byte of the stored file in place. The file's
    // contents then no longer hash to `id`, and `read` must notice.
    let on_disk = store.path_for(&id);
    let mut file = OpenOptions::new()
        .read(true)
        .write(true)
        .open(&on_disk)
        .unwrap();
    file.seek(io::SeekFrom::Start(0)).unwrap();
    file.write_all(&[original[0] ^ 0xFF]).unwrap();
    file.sync_all().unwrap();
    drop(file);

    match store.read(&id).unwrap_err() {
        StoreError::HashMismatch { expected, actual } => {
            // `expected` names the requested hash; `actual` is what the
            // tampered bytes hash to.
            assert_eq!(expected, to_hex(&id));
            assert_ne!(actual, expected, "actual must differ once corrupted");
        }
        other => panic!("expected HashMismatch, got {other:?}"),
    }
}
894
#[test]
fn path_layout_is_2_then_62_hex() {
    // An object lives at `<first 2 hex>/<remaining 62 hex>` of its hash.
    let (_dir, store) = fresh_store();
    let payload = b"layout test".to_vec();
    let id = store.write(&payload).unwrap();

    let hex = to_hex(&id);
    let (shard_hex, rest_hex) = hex.split_at(2);

    let object_path = store.path_for(&id);
    let shard = object_path
        .parent()
        .unwrap()
        .file_name()
        .unwrap()
        .to_str()
        .unwrap();
    let leaf = object_path.file_name().unwrap().to_str().unwrap();

    assert_eq!(shard.len(), 2);
    assert_eq!(leaf.len(), 62);
    assert_eq!(shard, shard_hex);
    assert_eq!(leaf, rest_hex);
    assert!(object_path.is_file(), "object file must exist at expected path");
}
911
#[test]
fn temp_file_left_behind_does_not_satisfy_contains() {
    // Model a crash after the temp file was written but before the rename:
    // a stale `.tmp.*` sibling sits in the shard directory. The object must
    // still be reported absent, and nothing in that directory may pass the
    // hash check for the target name.
    let (_dir, store) = fresh_store();
    let target = hash::hash(b"never finalised");
    let final_path = store.path_for(&target);
    let shard = final_path.parent().unwrap();
    fs::create_dir_all(shard).unwrap();

    let leaf = final_path.file_name().unwrap().to_string_lossy();
    let stale = shard.join(format!(".{leaf}.tmp.0.0"));
    fs::write(&stale, b"partial").unwrap();
    assert!(stale.is_file());
    assert!(!store.contains(&target));

    let shard_entries = fs::read_dir(shard).unwrap().map(|e| e.unwrap().path());
    for entry_path in shard_entries {
        let contents = fs::read(&entry_path).unwrap();
        assert_ne!(
            hash::hash(&contents),
            target,
            "stale temp file must not satisfy the target hash"
        );
    }
}
941}
942
#[cfg(test)]
mod bulk_writer_tests {
    use super::*;
    use crate::object::{Blob, Object};

    /// The bulk writer round-trips an object, accepts a repeat write of the
    /// same bytes, and agrees with the regular writer on the object's hash.
    #[test]
    fn bulk_writer_round_trips_and_rewrites() {
        let root = tempfile::tempdir().unwrap();
        let store = ObjectStore::init(root.path()).unwrap();
        let blob = Object::Blob(Blob {
            data: b"bulk".to_vec(),
        });
        let obj = crate::serialize::serialize(&blob).unwrap();

        let mut writer = store.bulk_writer();
        let first = writer.write(&obj).unwrap();
        // The bulk writer has no "already exists" short-circuit. Writing the
        // same bytes again is expected to succeed (re-runs heal torn files).
        let second = writer.write(&obj).unwrap();
        assert_eq!(first, second);
        writer.commit().unwrap();

        assert_eq!(store.read(&first).unwrap(), obj);
        // The normal, per-object fsynced writer yields the same name.
        assert_eq!(store.write(&obj).unwrap(), first);
    }
}
966}