Skip to main content

objects/store/fs/
fs_impl.rs

1// SPDX-License-Identifier: Apache-2.0
2//! ObjectStore implementation for FsStore.
3
4use std::{
5    fs,
6    path::{Path, PathBuf},
7};
8
9use tracing::{debug, instrument, trace};
10
11use super::{
12    FsStore,
13    fs_io::{list_hashes_from_dir, read_file_bytes, read_file_header},
14    fs_paths::{
15        action_path, actions_dir, blobs_dir, hash_path, redaction_path, redactions_dir, state_path,
16        state_visibility_dir, state_visibility_path, states_dir, trees_dir,
17    },
18};
19use crate::{
20    object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree},
21    store::{
22        HeddleError, ObjectStore, Result, codec,
23        compression::{header_uncompressed_size, is_compressed},
24        pack::{ObjectType, PackManager, PackObjectId},
25    },
26};
27
/// Number of leading bytes read from a loose blob file when only its
/// header is needed (e.g. to recover the uncompressed size).
///
/// The window is the 9-byte modern compression header followed by the
/// 4-byte ZSTD magic. `header_uncompressed_size` relies on that magic to
/// tell modern headers apart from legacy (5-byte) ones; with a 9-byte
/// window the magic is invisible, the lookup silently falls back to the
/// on-disk byte length, and `stat` ends up reporting the compressed size
/// of every loose blob.
const BLOB_HEADER_PEEK: usize = 9 + 4;
36
37fn validate_loaded_tree(tree: Tree) -> Result<Tree> {
38    tree.validate()?;
39    Ok(tree)
40}
41
42fn validate_blob_bytes(data: &[u8], hash: ContentHash) -> Result<()> {
43    let mut hasher = ContentHash::typed_hasher("blob", data.len() as u64);
44    hasher.update(data);
45    let found = ContentHash::from_bytes(hasher.finalize().into());
46    if found != hash {
47        return Err(HeddleError::Corruption {
48            expected: hash,
49            found,
50        });
51    }
52
53    Ok(())
54}
55
56fn validate_tree_serialized(data: &[u8], hash: ContentHash) -> Result<Tree> {
57    let tree: Tree = rmp_serde::from_slice(data)?;
58    let tree = validate_loaded_tree(tree)?;
59    let found = tree.hash();
60    if found != hash {
61        return Err(HeddleError::Corruption {
62            expected: hash,
63            found,
64        });
65    }
66
67    Ok(tree)
68}
69
70fn validate_loaded_state(requested_id: &ChangeId, state: State) -> Result<State> {
71    if state.change_id != *requested_id {
72        return Err(HeddleError::InvalidObject(format!(
73            "state change_id mismatch: requested {}, found {}",
74            requested_id, state.change_id
75        )));
76    }
77
78    Ok(state)
79}
80
81fn validate_state_serialized(data: &[u8], id: ChangeId) -> Result<State> {
82    let state: State = rmp_serde::from_slice(data)?;
83    validate_loaded_state(&id, state)
84}
85
86fn validate_loaded_action(requested_id: &ActionId, action: Action) -> Result<Action> {
87    let found_id = action.compute_id();
88    if found_id != *requested_id {
89        return Err(HeddleError::InvalidObject(format!(
90            "action id mismatch: requested {}, found {}",
91            requested_id, found_id
92        )));
93    }
94
95    Ok(action)
96}
97
98fn validate_action_serialized(data: &[u8], id: ActionId) -> Result<Action> {
99    let action: Action = rmp_serde::from_slice(data)?;
100    validate_loaded_action(&id, action)
101}
102
103/// Validate every entry in a pack against its tagged id (checksum
104/// validation) and return the installed id list. This is the shared
105/// validated core for both install seams: the byte-buffer install
106/// (`install_pack`) and the memory-bounded temp-file install
107/// (`install_pack_streaming`) both run their pack through here, so
108/// both apply the same checksum validation and report the same
109/// installed ids regardless of how the bytes reach the store.
110fn validate_and_list_pack(reader: &crate::store::pack::PackReader) -> Result<Vec<PackObjectId>> {
111    let ids = reader.list_ids();
112    for id in &ids {
113        let Some((obj_type, data)) = reader.get_object_bytes(id)? else {
114            continue;
115        };
116        validate_pack_entry(id, obj_type, data.as_ref())?;
117    }
118    Ok(ids)
119}
120
121fn state_entries_from_pack(
122    reader: &crate::store::pack::PackReader,
123    ids: &[PackObjectId],
124) -> Result<Vec<(ChangeId, Vec<u8>)>> {
125    let mut states = Vec::new();
126    for id in ids {
127        let PackObjectId::ChangeId(change_id) = id else {
128            continue;
129        };
130        let Some((obj_type, data)) = reader.get_object(id)? else {
131            continue;
132        };
133        if obj_type != ObjectType::State {
134            return Err(HeddleError::InvalidObject(format!(
135                "pack id {} is indexed as {:?}, expected State",
136                change_id.to_string_full(),
137                obj_type
138            )));
139        }
140        validate_state_serialized(&data, *change_id)?;
141        states.push((*change_id, data));
142    }
143    Ok(states)
144}
145
146fn validate_pack_entry(id: &PackObjectId, obj_type: ObjectType, data: &[u8]) -> Result<()> {
147    match (id, obj_type) {
148        (PackObjectId::Hash(hash), ObjectType::Blob) => validate_blob_bytes(data, *hash),
149        (PackObjectId::Hash(hash), ObjectType::Tree) => {
150            validate_tree_serialized(data, *hash).map(|_| ())
151        }
152        (PackObjectId::Hash(hash), ObjectType::Action) => {
153            validate_action_serialized(data, ActionId::from_hash(*hash)).map(|_| ())
154        }
155        (PackObjectId::ChangeId(change_id), ObjectType::State) => {
156            validate_state_serialized(data, *change_id).map(|_| ())
157        }
158        _ => Err(HeddleError::InvalidObject(format!(
159            "unsupported native pack object: {:?} {:?}",
160            id, obj_type
161        ))),
162    }
163}
164
165impl FsStore {
166    /// Single-pass blob lookup. The wrapper in `ObjectStore::get_blob`
167    /// retries this once after a stale-reload on miss.
168    fn try_get_blob_once(&self, hash: &ContentHash) -> Result<Option<Blob>> {
169        let path = hash_path(&blobs_dir(&self.root), hash);
170        let loose_exists = path.exists();
171        let pack_has = if loose_exists {
172            false
173        } else if let Ok(manager) = self.pack_manager().read() {
174            manager.has_object(hash)
175        } else {
176            false
177        };
178        if (loose_exists || pack_has)
179            && let Ok(cache) = self.recent_blobs.read()
180            && let Some(blob) = cache.get(hash)
181        {
182            trace!("Found blob in recent object cache");
183            return Ok(Some(blob.clone()));
184        }
185
186        if let Ok(manager) = self.pack_manager().read()
187            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
188            && obj_type == ObjectType::Blob
189        {
190            trace!("Found blob in packfile");
191            // Step 2: skip the BLAKE3 re-hash. The pack reader already
192            // located this entry by its content-addressed key in the
193            // pack index — anything served here either matches or
194            // means the pack itself is corrupted in ways a per-read
195            // hash check can't recover from cleanly. For multi-MB
196            // blobs the verify was the dominant tail of the cold
197            // read (~3GB/s × 10MB ≈ 3.3ms per call).
198            return Ok(Some(Blob::new(data)));
199        }
200
201        match read_file_bytes(&path)? {
202            Some(data) => {
203                trace!(size = data.as_slice().len(), "Blob data read");
204                let content = codec::decode_blob_content(data.as_slice())?;
205                let blob = Blob::new(content);
206                // Loose blobs are bare bytes on disk: a half-written
207                // file or bit-rot inside the payload would slip past
208                // the path-is-the-hash invariant. Keep the verify on
209                // this path. Pack-resident reads above skip it because
210                // pack entries are framed with offset + length records
211                // that fail to parse if the pack is corrupt.
212                if blob.hash() != *hash {
213                    return Err(HeddleError::Corruption {
214                        expected: *hash,
215                        found: blob.hash(),
216                    });
217                }
218                Ok(Some(blob))
219            }
220            None => Ok(None),
221        }
222    }
223
224    /// Shared body for `try_has_{blob,tree,state}_once`: object is
225    /// present iff the loose path exists or the pack manager
226    /// resolves it. Callers pass the loose path and the
227    /// pack-manager probe; the helper handles the lock.
228    fn loose_or_packed(
229        &self,
230        loose_path: &Path,
231        in_pack: impl FnOnce(&PackManager) -> bool,
232    ) -> Result<bool> {
233        if loose_path.exists() {
234            return Ok(true);
235        }
236        if let Ok(manager) = self.pack_manager().read() {
237            return Ok(in_pack(&manager));
238        }
239        Ok(false)
240    }
241
242    fn try_has_blob_once(&self, hash: &ContentHash) -> Result<bool> {
243        let path = hash_path(&blobs_dir(&self.root), hash);
244        self.loose_or_packed(&path, |m| m.has_object(hash))
245    }
246
247    /// Header-only size lookup for a single attempt. Tries:
248    /// 1. The recent-blob cache (we already have the bytes in
249    ///    memory — `len()` is free).
250    /// 2. The loose blob: peek the 9-byte compression header. For a
251    ///    compressed blob the recorded uncompressed size lives in the
252    ///    header. For an uncompressed blob (no recognised header) the
253    ///    on-disk file length IS the blob size.
254    /// 3. Any loaded pack: the pack format records the uncompressed
255    ///    size as a varint right after the tagged id, so we can decode
256    ///    it without touching the body.
257    ///
258    /// Cost: one short read (typically 9 bytes) for loose blobs, or a
259    /// pure in-memory varint decode for packed blobs. *No*
260    /// decompression.
261    fn try_get_blob_size_once(&self, hash: &ContentHash) -> Result<Option<u64>> {
262        if let Ok(cache) = self.recent_blobs.read()
263            && let Some(blob) = cache.get(hash)
264        {
265            return Ok(Some(blob.content().len() as u64));
266        }
267
268        let path = hash_path(&blobs_dir(&self.root), hash);
269        if let Some((header, file_len)) = read_file_header(&path, BLOB_HEADER_PEEK)? {
270            if let Some(size) = header_uncompressed_size(&header) {
271                return Ok(Some(size));
272            }
273            // No recognised compression header — the file is raw
274            // blob bytes. The on-disk length is the blob size.
275            return Ok(Some(file_len));
276        }
277
278        if let Ok(manager) = self.pack_manager().read()
279            && let Some(size) = manager.get_hashed_object_size(hash)?
280        {
281            return Ok(Some(size));
282        }
283        Ok(None)
284    }
285
286    fn try_get_tree_once(&self, hash: &ContentHash) -> Result<Option<Tree>> {
287        // Cache first. The recent-object cache only ever holds trees we
288        // wrote or read this process, so a hit is authoritative for a
289        // read — no need to confirm on-disk existence first. Skipping
290        // that probe removes a `stat` (and a pack-index lookup) per
291        // call, which dominates `heddle status` on directory-heavy
292        // trees (the walker loads one subtree per directory).
293        if let Ok(cache) = self.recent_trees.read()
294            && let Some(tree) = cache.get(hash)
295        {
296            trace!("Found tree in recent object cache");
297            return Ok(Some(tree.clone()));
298        }
299
300        if let Ok(manager) = self.pack_manager().read()
301            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
302            && obj_type == ObjectType::Tree
303        {
304            trace!("Found tree in packfile");
305            let tree = validate_loaded_tree(rmp_serde::from_slice(&data)?)?;
306            if tree.hash() != *hash {
307                return Err(HeddleError::Corruption {
308                    expected: *hash,
309                    found: tree.hash(),
310                });
311            }
312            return Ok(Some(tree));
313        }
314
315        let path = hash_path(&trees_dir(&self.root), hash);
316        match read_file_bytes(&path)? {
317            Some(data) => {
318                trace!(size = data.as_slice().len(), "Tree data read");
319                let tree = validate_loaded_tree(codec::decode_tree(data.as_slice())?)?;
320                if tree.hash() != *hash {
321                    return Err(HeddleError::Corruption {
322                        expected: *hash,
323                        found: tree.hash(),
324                    });
325                }
326                if let Ok(mut cache) = self.recent_trees.write() {
327                    cache.insert(*hash, tree.clone());
328                }
329                Ok(Some(tree))
330            }
331            None => Ok(None),
332        }
333    }
334
335    fn try_has_tree_once(&self, hash: &ContentHash) -> Result<bool> {
336        let path = hash_path(&trees_dir(&self.root), hash);
337        self.loose_or_packed(&path, |m| m.has_object(hash))
338    }
339
340    fn try_get_state_once(&self, id: &ChangeId) -> Result<Option<State>> {
341        let path = state_path(&self.root, id);
342        let loose_exists = path.exists();
343        let pack_has = if loose_exists {
344            false
345        } else if let Ok(manager) = self.pack_manager().read() {
346            manager.has_object_id(&PackObjectId::ChangeId(*id))
347        } else {
348            false
349        };
350        if (loose_exists || pack_has)
351            && let Ok(cache) = self.recent_states.read()
352            && let Some(state) = cache.get(id)
353        {
354            trace!("Found state in recent object cache");
355            return Ok(Some(state.clone()));
356        }
357
358        // States are addressed by `change_id`, NOT by content hash, so the same
359        // id can resolve to two different bodies: a copy packed at adopt/import
360        // time and a newer LOOSE copy written by an authorized rewrite (the #570
361        // fidelity backfill re-hashes + rewrites adopted states loose). The
362        // loose object MUST shadow the packed one — read it FIRST and only
363        // consult the pack when no loose object exists, or a cold read (cache
364        // miss) returns the stale packed body. (Trees/blobs are
365        // content-addressed and can't go stale this way, so their read paths
366        // deliberately keep pack-first ordering.)
367        if loose_exists && let Some(data) = read_file_bytes(&path)? {
368            trace!(
369                size = data.as_slice().len(),
370                "State read from loose object (shadows any packed copy)"
371            );
372            let state = validate_loaded_state(id, codec::decode_state(data.as_slice())?)?;
373            if let Ok(mut cache) = self.recent_states.write() {
374                cache.insert(*id, state.clone());
375            }
376            return Ok(Some(state));
377        }
378
379        if let Ok(manager) = self.pack_manager().read()
380            && let Some((obj_type, data)) = manager.get_object(&PackObjectId::ChangeId(*id))?
381            && obj_type == ObjectType::State
382        {
383            trace!("Found state in packfile");
384            let state = validate_loaded_state(id, rmp_serde::from_slice(&data)?)?;
385            if let Ok(mut cache) = self.recent_states.write() {
386                cache.insert(*id, state.clone());
387            }
388            return Ok(Some(state));
389        }
390
391        Ok(None)
392    }
393
394    fn try_has_state_once(&self, id: &ChangeId) -> Result<bool> {
395        let path = state_path(&self.root, id);
396        self.loose_or_packed(&path, |m| m.has_object_id(&PackObjectId::ChangeId(*id)))
397    }
398}
399
400impl ObjectStore for FsStore {
401    fn clear_recent_caches(&self) {
402        self.clear_recent_object_caches();
403    }
404
405    /// Zero-copy pack fast path. When the blob lives in a packfile
406    /// and is non-delta + uncompressed, returns a `Bytes::slice`
407    /// view of the pack's mmap — no decompression, no allocation,
408    /// no memcpy. Compressed pack entries, delta entries, and
409    /// loose blobs fall back to `get_blob` and wrap the result in a
410    /// `Bytes` (the `Vec` → `Bytes` conversion is itself zero-copy).
411    fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
412        if let Ok(manager) = self.pack_manager().read()
413            && let Some((obj_type, data)) = manager.get_hashed_object_bytes(hash)?
414            && obj_type == crate::store::pack::ObjectType::Blob
415        {
416            return Ok(Some(data));
417        }
418        Ok(self
419            .get_blob(hash)?
420            .map(|blob| bytes::Bytes::from(blob.into_content())))
421    }
422
423    #[instrument(skip(self), fields(hash = %hash.short()))]
424    fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
425        if let Some(blob) = self.try_get_blob_once(hash)? {
426            return Ok(Some(blob));
427        }
428        // Miss path: a sibling FsStore (e.g. the worktree's repo
429        // backing the same `.heddle/`) may have installed a new pack
430        // since we loaded ours. Cheap disk-count check first; full
431        // reload only when the count grew.
432        if self.reload_packs_if_stale()?
433            && let Some(blob) = self.try_get_blob_once(hash)?
434        {
435            return Ok(Some(blob));
436        }
437        trace!("Blob not found");
438        Ok(None)
439    }
440
441    #[instrument(skip(self, blob), fields(size = blob.content().len()))]
442    fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
443        let hash = blob.hash();
444        let path = hash_path(&blobs_dir(&self.root), &hash);
445
446        if !path.exists() {
447            let data = codec::encode_blob_content(blob.content(), &self.compression)?;
448            trace!(compressed_size = data.len(), "Writing blob");
449            self.write_loose_object_atomic(&path, &data)?;
450        } else {
451            trace!("Blob already exists, skipping write");
452        }
453        if let Ok(mut cache) = self.recent_blobs.write() {
454            cache.insert(hash, blob.clone());
455        }
456
457        Ok(hash)
458    }
459
460    #[instrument(skip(self, blob), fields(hash = %hash.short()))]
461    fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
462        if blob.hash() != hash {
463            return Err(HeddleError::Corruption {
464                expected: hash,
465                found: blob.hash(),
466            });
467        }
468
469        let path = hash_path(&blobs_dir(&self.root), &hash);
470
471        if !path.exists() {
472            let data = codec::encode_blob_content(blob.content(), &self.compression)?;
473            trace!(
474                compressed_size = data.len(),
475                "Writing blob with precomputed hash"
476            );
477            self.write_loose_object_atomic(&path, &data)?;
478        }
479        if let Ok(mut cache) = self.recent_blobs.write() {
480            cache.insert(hash, blob.clone());
481        }
482
483        Ok(hash)
484    }
485
486    #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
487    fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
488        validate_blob_bytes(data, hash)?;
489
490        let path = hash_path(&blobs_dir(&self.root), &hash);
491        if !path.exists() {
492            trace!(
493                size = data.len(),
494                "Writing raw blob bytes with precomputed hash"
495            );
496            self.write_loose_object_atomic(&path, data)?;
497        }
498        if let Ok(mut cache) = self.recent_blobs.write() {
499            cache.insert(hash, Blob::from_slice(data));
500        }
501
502        Ok(hash)
503    }
504
505    #[instrument(skip(self), fields(hash = %hash.short()))]
506    fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
507        if self.try_has_blob_once(hash)? {
508            return Ok(true);
509        }
510        if self.reload_packs_if_stale()? {
511            return self.try_has_blob_once(hash);
512        }
513        Ok(false)
514    }
515
516    /// Loose blob path safe for clonefile/copy materialization.
517    ///
518    /// Returns `Some(path)` only when the loose file exists, is
519    /// stored uncompressed, *and* its bytes hash to the expected
520    /// content hash. Compressed blobs and pack-only blobs fall
521    /// through to `None`; so do *torn* cache-mirror files (the
522    /// `AtomicWriteMode::NoSync` write side may leave one if the
523    /// host crashed during a previous promote). On the torn case
524    /// the caller re-promotes from the authoritative pack copy.
525    ///
526    /// Verification is amortised: a hash that passes the check once
527    /// in this process is recorded in `verified_loose_blobs` and
528    /// subsequent calls skip the read+hash. So the cost on the
529    /// materialize hot path is at most one BLAKE3 over each unique
530    /// blob per process lifetime — negligible for tiny blobs,
531    /// bounded by working-set size for huge ones.
532    fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
533        let path = hash_path(&blobs_dir(&self.root), hash);
534        // Fast path: this process already verified (or wrote) this
535        // hash's loose mirror in `promote_to_loose_uncompressed`.
536        // Trust without re-hashing — `path.exists()` is the only
537        // I/O we need.
538        if let Ok(verified) = self.verified_loose_blobs.read()
539            && verified.get(hash).is_some()
540            && path.exists()
541        {
542            return Some(path);
543        }
544
545        // First-time-this-process check: peek the header to filter
546        // out compressed-loose files cheaply, then verify the
547        // body's hash matches what the caller expects. A torn-write
548        // (post-crash) cache mirror fails this and the caller
549        // re-promotes from the pack.
550        //
551        // Header peek must cover the 9-byte modern header **plus**
552        // the 4-byte ZSTD magic that `is_compressed` checks —
553        // peeking only 9 bytes makes `is_compressed` falsely
554        // return `false` on a properly-compressed blob, and we'd
555        // hand the caller the compressed file path. Same off-by-4
556        // we fixed in `BLOB_HEADER_PEEK`.
557        let (header, _) = read_file_header(&path, BLOB_HEADER_PEEK).ok().flatten()?;
558        if is_compressed(&header) {
559            return None;
560        }
561        let bytes = read_file_bytes(&path).ok().flatten()?;
562        let actual = ContentHash::compute_typed("blob", bytes.as_slice());
563        if actual != *hash {
564            // Torn write or unrelated corruption. Leave the file on
565            // disk; the caller's `promote_to_loose_uncompressed`
566            // will overwrite it via the standard temp+rename path.
567            return None;
568        }
569        if let Ok(mut verified) = self.verified_loose_blobs.write() {
570            verified.insert(*hash, ());
571        }
572        Some(path)
573    }
574
575    /// Promote a blob to its uncompressed-loose canonical path so
576    /// `loose_blob_path` returns `Some(path)` and hardlink-first
577    /// materialization fires.
578    ///
579    /// Three cases:
580    /// 1. Already loose+uncompressed: peek the header, no-op.
581    /// 2. Loose but compressed: read+decompress, atomically rewrite
582    ///    the canonical path with raw bytes.
583    /// 3. Pack-only: read out of the pack via `get_blob`, atomically
584    ///    write to the canonical loose path. Pack copy is left in
585    ///    place — the next prune cycle will discard the loose mirror
586    ///    and a future materialize will re-promote.
587    #[instrument(skip(self), fields(hash = %hash.short()))]
588    fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
589        let path = hash_path(&blobs_dir(&self.root), hash);
590
591        // Idempotent fast path: already loose AND uncompressed.
592        if let Some((header, _)) = read_file_header(&path, 9)?
593            && !is_compressed(&header)
594        {
595            trace!("Blob already loose+uncompressed; skipping promotion");
596            return Ok(false);
597        }
598
599        // Either compressed-loose or pack-only. Reading via
600        // `get_blob` covers both: compressed-loose decompresses on
601        // the way out, pack-only reads from the loaded pack manager.
602        let blob = self.get_blob(hash)?.ok_or_else(|| {
603            HeddleError::NotFound(format!(
604                "blob {} not found in store; cannot promote to loose-uncompressed",
605                hash
606            ))
607        })?;
608
609        // Install the uncompressed bytes at the canonical loose path
610        // via the cache-mirror atomic-write variant: no fsync, just
611        // temp+rename. The fsync skip is what makes promotion fast
612        // (measured: ~5 ms/blob with `sync_data` vs ~0.2 ms without
613        // on macOS APFS); the safety comes from the read-side hash
614        // check in `loose_blob_path`. A torn write after a crash
615        // produces a file whose content hash doesn't match, so the
616        // next reader rejects it and re-promotes from the pack.
617        //
618        // Record the hash in this process's verified-blobs cache:
619        // we just wrote the bytes ourselves, so the subsequent read
620        // path can trust them without re-hashing.
621        debug!(
622            size = blob.content().len(),
623            "Promoting blob to loose-uncompressed canonical store"
624        );
625        self.write_loose_object_cache(&path, blob.content())?;
626        if let Ok(mut verified) = self.verified_loose_blobs.write() {
627            verified.insert(*hash, ());
628        }
629        Ok(true)
630    }
631
632    #[instrument(skip(self), fields(hash = %hash.short()))]
633    fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
634        if let Some(size) = self.try_get_blob_size_once(hash)? {
635            return Ok(Some(size));
636        }
637        // Sibling-store recovery, mirroring the read path: if a
638        // concurrent writer just installed a pack we don't know about,
639        // reload and retry once before reporting a miss.
640        if self.reload_packs_if_stale()?
641            && let Some(size) = self.try_get_blob_size_once(hash)?
642        {
643            return Ok(Some(size));
644        }
645        Ok(None)
646    }
647
648    #[instrument(skip(self), fields(hash = %hash.short()))]
649    fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
650        if let Some(tree) = self.try_get_tree_once(hash)? {
651            return Ok(Some(tree));
652        }
653        if self.reload_packs_if_stale()?
654            && let Some(tree) = self.try_get_tree_once(hash)?
655        {
656            return Ok(Some(tree));
657        }
658        trace!("Tree not found");
659        Ok(None)
660    }
661
662    #[instrument(skip(self, tree), fields(entry_count = tree.entries().len()))]
663    fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
664        let hash = tree.hash();
665        let path = hash_path(&trees_dir(&self.root), &hash);
666
667        if !path.exists() {
668            let (_, data) = codec::encode_tree(tree, &self.compression)?;
669            trace!(compressed_size = data.len(), "Writing tree");
670            self.write_loose_object_atomic(&path, &data)?;
671        } else {
672            trace!("Tree already exists, skipping write");
673        }
674        if let Ok(mut cache) = self.recent_trees.write() {
675            cache.insert(hash, tree.clone());
676        }
677
678        Ok(hash)
679    }
680
681    #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
682    fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
683        let tree = validate_tree_serialized(data, hash)?;
684
685        let path = hash_path(&trees_dir(&self.root), &hash);
686        if !path.exists() {
687            trace!(size = data.len(), "Writing raw serialized tree");
688            self.write_loose_object_atomic(&path, data)?;
689        }
690        if let Ok(mut cache) = self.recent_trees.write() {
691            cache.insert(hash, tree);
692        }
693
694        Ok(hash)
695    }
696
697    #[instrument(skip(self), fields(hash = %hash.short()))]
698    fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
699        if self.try_has_tree_once(hash)? {
700            return Ok(true);
701        }
702        if self.reload_packs_if_stale()? {
703            return self.try_has_tree_once(hash);
704        }
705        Ok(false)
706    }
707
708    #[instrument(skip(self), fields(id = %id.short()))]
709    fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
710        if let Some(state) = self.try_get_state_once(id)? {
711            return Ok(Some(state));
712        }
713        if self.reload_packs_if_stale()?
714            && let Some(state) = self.try_get_state_once(id)?
715        {
716            return Ok(Some(state));
717        }
718        trace!("State not found");
719        Ok(None)
720    }
721
722    #[instrument(skip(self, state), fields(id = %state.change_id.short()))]
723    fn put_state(&self, state: &State) -> Result<()> {
724        let path = state_path(&self.root, &state.change_id);
725        let data = codec::encode_state(state, &self.compression)?;
726        trace!(compressed_size = data.len(), "Writing state");
727        self.write_loose_object_atomic(&path, &data)?;
728        if let Ok(mut cache) = self.recent_states.write() {
729            cache.insert(state.change_id, state.clone());
730        }
731        Ok(())
732    }
733
734    #[instrument(skip(self, data), fields(id = %id.short(), size = data.len()))]
735    fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
736        let state = validate_state_serialized(data, id)?;
737        let path = state_path(&self.root, &id);
738        trace!(size = data.len(), "Writing raw serialized state");
739        self.write_loose_object_atomic(&path, data)?;
740        if let Ok(mut cache) = self.recent_states.write() {
741            cache.insert(id, state);
742        }
743        Ok(())
744    }
745
746    #[instrument(skip(self), fields(id = %id.short()))]
747    fn has_state(&self, id: &ChangeId) -> Result<bool> {
748        if self.try_has_state_once(id)? {
749            return Ok(true);
750        }
751        if self.reload_packs_if_stale()? {
752            return self.try_has_state_once(id);
753        }
754        Ok(false)
755    }
756
757    #[instrument(skip(self))]
758    fn list_states(&self) -> Result<Vec<ChangeId>> {
759        self.reload_packs_if_stale()?;
760
761        let dir = states_dir(&self.root);
762        if !dir.exists() {
763            return Ok(Vec::new());
764        }
765
766        let mut states = Vec::new();
767        for entry in fs::read_dir(&dir)? {
768            let entry = entry?;
769            let path = entry.path();
770            if let Some(name) = path.file_stem()
771                && let Some(name_str) = name.to_str()
772                && let Ok(id) = ChangeId::parse(name_str)
773            {
774                states.push(id);
775            }
776        }
777        if let Ok(manager) = self.pack_manager().read() {
778            for id in manager.list_all_ids()? {
779                if let PackObjectId::ChangeId(change_id) = id
780                    && !states.contains(&change_id)
781                {
782                    states.push(change_id);
783                }
784            }
785        }
786        debug!(count = states.len(), "Listed states");
787        Ok(states)
788    }
789
790    #[instrument(skip(self), fields(id = %id))]
791    fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
792        let path = action_path(&self.root, id);
793        if !path.exists()
794            && let Ok(manager) = self.pack_manager().read()
795            && let Some((obj_type, data)) = manager.get_hashed_object(id.as_hash())?
796            && obj_type == ObjectType::Action
797        {
798            trace!("Found action in packfile");
799            let action = validate_loaded_action(id, rmp_serde::from_slice(&data)?)?;
800            return Ok(Some(action));
801        }
802        match read_file_bytes(&path)? {
803            Some(data) => {
804                trace!(size = data.as_slice().len(), "Action data read");
805                let action = validate_loaded_action(id, codec::decode_action(data.as_slice())?)?;
806                Ok(Some(action))
807            }
808            None => {
809                trace!("Action not found");
810                Ok(None)
811            }
812        }
813    }
814
815    #[instrument(skip(self, action))]
816    fn put_action(&self, action: &mut Action) -> Result<ActionId> {
817        let id = action.id();
818        let path = action_path(&self.root, &id);
819
820        if !path.exists() {
821            let (_, data) = codec::encode_action(action, &self.compression)?;
822            trace!(id = %id, compressed_size = data.len(), "Writing action");
823            self.write_loose_object_atomic(&path, &data)?;
824        }
825
826        Ok(id)
827    }
828
829    #[instrument(skip(self))]
830    fn list_actions(&self) -> Result<Vec<ActionId>> {
831        let dir = actions_dir(&self.root);
832        let mut actions = Vec::new();
833        if dir.exists() {
834            for entry in fs::read_dir(&dir)? {
835                let entry = entry?;
836                let path = entry.path();
837                if let Some(name) = path.file_stem()
838                    && let Some(name_str) = name.to_str()
839                    && let Ok(hash) = ContentHash::from_hex(name_str)
840                {
841                    actions.push(ActionId::from_hash(hash));
842                }
843            }
844        }
845        if let Ok(manager) = self.pack_manager().read() {
846            for id in manager.list_all_ids()? {
847                if let PackObjectId::Hash(hash) = id
848                    && !actions.iter().any(|action_id| action_id.as_hash() == &hash)
849                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
850                    && obj_type == ObjectType::Action
851                {
852                    actions.push(ActionId::from_hash(hash));
853                }
854            }
855        }
856        debug!(count = actions.len(), "Listed actions");
857        Ok(actions)
858    }
859
860    #[instrument(skip(self))]
861    fn list_blobs(&self) -> Result<Vec<ContentHash>> {
862        let dir = blobs_dir(&self.root);
863        let mut blobs = list_hashes_from_dir(&dir)?;
864        if let Ok(manager) = self.pack_manager().read() {
865            for id in manager.list_all_ids()? {
866                if let PackObjectId::Hash(hash) = id
867                    && !blobs.contains(&hash)
868                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
869                    && obj_type == ObjectType::Blob
870                {
871                    blobs.push(hash);
872                }
873            }
874        }
875        Ok(blobs)
876    }
877
878    #[instrument(skip(self))]
879    fn list_trees(&self) -> Result<Vec<ContentHash>> {
880        let dir = trees_dir(&self.root);
881        let mut trees = list_hashes_from_dir(&dir)?;
882        if let Ok(manager) = self.pack_manager().read() {
883            for id in manager.list_all_ids()? {
884                if let PackObjectId::Hash(hash) = id
885                    && !trees.contains(&hash)
886                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
887                    && obj_type == ObjectType::Tree
888                {
889                    trees.push(hash);
890                }
891            }
892        }
893        Ok(trees)
894    }
895
    /// `ObjectStore::pack_objects` for the filesystem store.
    ///
    /// Forwards `aggressive` to `FsStore::pack_objects_impl` and returns its
    /// `(u64, u64)` result unchanged.
    #[instrument(skip(self))]
    fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
        self.pack_objects_impl(aggressive)
    }
900
901    #[instrument(skip(self), fields(id = ?id))]
902    fn get_pack_object(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Vec<u8>)>> {
903        if let Ok(manager) = self.pack_manager().read()
904            && let Some((obj_type, data)) = manager.get_object(id)?
905        {
906            return Ok(Some((obj_type, data)));
907        }
908
909        match id {
910            PackObjectId::Hash(hash) => {
911                if let Some(blob) = self.get_blob(hash)? {
912                    return Ok(Some((ObjectType::Blob, blob.content().to_vec())));
913                }
914                if let Some(tree) = self.get_tree(hash)? {
915                    return Ok(Some((ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)));
916                }
917                if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
918                    return Ok(Some((
919                        ObjectType::Action,
920                        rmp_serde::to_vec_named(&action)?,
921                    )));
922                }
923                Ok(None)
924            }
925            PackObjectId::ChangeId(change_id) => {
926                if let Some(state) = self.get_state(change_id)? {
927                    Ok(Some((ObjectType::State, rmp_serde::to_vec_named(&state)?)))
928                } else {
929                    Ok(None)
930                }
931            }
932        }
933    }
934
935    #[instrument(skip(self, pack_data, index_data))]
936    fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<PackObjectId>> {
937        let reader = crate::store::pack::PackReader::from_slice(pack_data, index_data)?;
938        let ids = validate_and_list_pack(&reader)?;
939        let state_entries = state_entries_from_pack(&reader, &ids)?;
940        self.install_pack_files(pack_data, index_data)?;
941        for (id, data) in state_entries {
942            self.put_state_serialized(&data, id)?;
943        }
944        self.clear_recent_object_caches();
945        Ok(ids)
946    }
947
    /// `ObjectStore::put_blobs_packed` for the filesystem store.
    ///
    /// Hands the whole `(hash, bytes)` batch, by value, to
    /// `FsStore::put_blobs_packed_impl`. The tracing span records the batch
    /// size as `count`.
    #[instrument(skip(self, blobs), fields(count = blobs.len()))]
    fn put_blobs_packed(&self, blobs: Vec<(crate::object::ContentHash, Vec<u8>)>) -> Result<()> {
        self.put_blobs_packed_impl(blobs)
    }
952
953    #[instrument(skip(self))]
954    fn install_pack_streaming(
955        &self,
956        pack_path: &std::path::Path,
957        index_path: &std::path::Path,
958    ) -> Result<Vec<PackObjectId>> {
959        // Validate + list ids through the same core as the byte-buffer
960        // seam, but via an mmap-backed reader so the pack is never
961        // copied into the heap — the memory-bounded promise survives.
962        // Drop the reader (releasing the mmap) before the rename so
963        // the file move isn't racing an open mapping.
964        let ids = {
965            let reader = crate::store::pack::PackReader::open(pack_path, index_path)?;
966            validate_and_list_pack(&reader)?
967        };
968        let state_entries = {
969            let reader = crate::store::pack::PackReader::open(pack_path, index_path)?;
970            state_entries_from_pack(&reader, &ids)?
971        };
972        self.install_pack_files_streaming(pack_path, index_path)?;
973        for (id, data) in state_entries {
974            self.put_state_serialized(&data, id)?;
975        }
976        Ok(ids)
977    }
978
    /// `ObjectStore::prune_loose_objects` for the filesystem store.
    ///
    /// Delegates to `FsStore::prune_loose_objects_impl` and returns its
    /// `(u64, u64)` result unchanged.
    #[instrument(skip(self))]
    fn prune_loose_objects(&self) -> Result<(u64, u64)> {
        self.prune_loose_objects_impl()
    }
983
    /// `ObjectStore::begin_snapshot_write_batch` for the filesystem store.
    ///
    /// Delegates to `FsStore::begin_snapshot_write_batch_impl`.
    #[instrument(skip(self))]
    fn begin_snapshot_write_batch(&self) -> Result<()> {
        self.begin_snapshot_write_batch_impl()
    }
988
    /// `ObjectStore::flush_snapshot_write_batch` for the filesystem store.
    ///
    /// Delegates to `FsStore::flush_snapshot_write_batch_impl`.
    #[instrument(skip(self))]
    fn flush_snapshot_write_batch(&self) -> Result<()> {
        self.flush_snapshot_write_batch_impl()
    }
993
    /// `ObjectStore::abort_snapshot_write_batch` for the filesystem store.
    ///
    /// Delegates to `FsStore::abort_snapshot_write_batch_impl`. This trait
    /// method has no return value, so nothing is reported to the caller.
    #[instrument(skip(self))]
    fn abort_snapshot_write_batch(&self) {
        self.abort_snapshot_write_batch_impl();
    }
998
999    fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
1000        Ok(redaction_path(&self.root, blob).exists())
1001    }
1002
1003    fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
1004        let path = redaction_path(&self.root, blob);
1005        match fs::read(&path) {
1006            Ok(bytes) => Ok(Some(bytes)),
1007            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
1008            Err(err) => Err(HeddleError::Io(err)),
1009        }
1010    }
1011
1012    fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
1013        let dir = redactions_dir(&self.root);
1014        if !dir.exists() {
1015            fs::create_dir_all(&dir)?;
1016        }
1017        let path = redaction_path(&self.root, blob);
1018        crate::fs_atomic::write_file_atomic(&path, bytes)?;
1019        Ok(())
1020    }
1021
1022    fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
1023        let dir = redactions_dir(&self.root);
1024        if !dir.exists() {
1025            return Ok(Vec::new());
1026        }
1027        let mut out = Vec::new();
1028        for entry in fs::read_dir(&dir)? {
1029            let entry = entry?;
1030            let path = entry.path();
1031            if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1032                continue;
1033            }
1034            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1035                continue;
1036            };
1037            if let Ok(hash) = ContentHash::from_hex(stem) {
1038                out.push(hash);
1039            }
1040        }
1041        Ok(out)
1042    }
1043
1044    fn has_state_visibility_for_state(&self, state: &ChangeId) -> Result<bool> {
1045        Ok(state_visibility_path(&self.root, state).exists())
1046    }
1047
1048    fn get_state_visibility_bytes_for_state(&self, state: &ChangeId) -> Result<Option<Vec<u8>>> {
1049        let path = state_visibility_path(&self.root, state);
1050        match fs::read(&path) {
1051            Ok(bytes) => Ok(Some(bytes)),
1052            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
1053            Err(err) => Err(HeddleError::Io(err)),
1054        }
1055    }
1056
1057    fn put_state_visibility_bytes_for_state(&self, state: &ChangeId, bytes: &[u8]) -> Result<()> {
1058        let dir = state_visibility_dir(&self.root);
1059        if !dir.exists() {
1060            fs::create_dir_all(&dir)?;
1061        }
1062        let path = state_visibility_path(&self.root, state);
1063        crate::fs_atomic::write_file_atomic(&path, bytes)?;
1064        Ok(())
1065    }
1066
1067    fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
1068        let dir = state_visibility_dir(&self.root);
1069        if !dir.exists() {
1070            return Ok(Vec::new());
1071        }
1072        let mut out = Vec::new();
1073        for entry in fs::read_dir(&dir)? {
1074            let entry = entry?;
1075            let path = entry.path();
1076            if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1077                continue;
1078            }
1079            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1080                continue;
1081            };
1082            if let Ok(state) = ChangeId::parse(stem) {
1083                out.push(state);
1084            }
1085        }
1086        Ok(out)
1087    }
1088}