Skip to main content

objects/store/fs/
fs_impl.rs

1// SPDX-License-Identifier: Apache-2.0
2//! ObjectStore implementation for FsStore.
3
4use std::{
5    fs,
6    path::{Path, PathBuf},
7};
8
9use tracing::{debug, instrument, trace};
10
11use super::{
12    FsStore,
13    fs_io::{list_hashes_from_dir, read_file_bytes, read_file_header},
14    fs_paths::{
15        action_path, actions_dir, blobs_dir, hash_path, redaction_path, redactions_dir, state_path,
16        state_visibility_dir, state_visibility_path, states_dir, trees_dir,
17    },
18};
19use crate::{
20    object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree},
21    store::{
22        HeddleError, ObjectStore, Result, codec,
23        compression::{header_uncompressed_size, is_compressed},
24        pack::{ObjectType, PackManager, PackObjectId},
25    },
26};
27
/// Number of leading bytes peeked from a loose blob file to classify it
/// without reading the body.
///
/// The peek spans the 9-byte modern compression header followed by the
/// 4-byte ZSTD magic. `header_uncompressed_size` (and `is_compressed`)
/// need that magic to tell a modern header from a legacy 5-byte one. With
/// a 9-byte peek the magic is cut off, and the size lookup falls back to
/// the on-disk length. `stat` then reports the *compressed* size of every
/// loose blob.
const BLOB_HEADER_PEEK: usize = 9 + 4;
36
/// Run `Tree::validate` on a tree decoded from storage and hand the same
/// tree back on success.
///
/// Only the tree's own structural check runs here. Callers compare the
/// tree's hash against the requested hash themselves.
///
/// # Errors
/// Whatever `Tree::validate` reports, converted into `HeddleError` by `?`.
fn validate_loaded_tree(tree: Tree) -> Result<Tree> {
    tree.validate()?;
    Ok(tree)
}
41
42fn validate_blob_bytes(data: &[u8], hash: ContentHash) -> Result<()> {
43    let mut hasher = ContentHash::typed_hasher("blob", data.len() as u64);
44    hasher.update(data);
45    let found = ContentHash::from_bytes(hasher.finalize().into());
46    if found != hash {
47        return Err(HeddleError::Corruption {
48            expected: hash,
49            found,
50        });
51    }
52
53    Ok(())
54}
55
56fn validate_tree_serialized(data: &[u8], hash: ContentHash) -> Result<Tree> {
57    let tree: Tree = rmp_serde::from_slice(data)?;
58    let tree = validate_loaded_tree(tree)?;
59    let found = tree.hash();
60    if found != hash {
61        return Err(HeddleError::Corruption {
62            expected: hash,
63            found,
64        });
65    }
66
67    Ok(tree)
68}
69
70fn validate_loaded_state(requested_id: &ChangeId, state: State) -> Result<State> {
71    if state.change_id != *requested_id {
72        return Err(HeddleError::InvalidObject(format!(
73            "state change_id mismatch: requested {}, found {}",
74            requested_id, state.change_id
75        )));
76    }
77
78    Ok(state)
79}
80
81fn validate_state_serialized(data: &[u8], id: ChangeId) -> Result<State> {
82    let state: State = rmp_serde::from_slice(data)?;
83    validate_loaded_state(&id, state)
84}
85
86fn validate_loaded_action(requested_id: &ActionId, action: Action) -> Result<Action> {
87    let found_id = action.compute_id();
88    if found_id != *requested_id {
89        return Err(HeddleError::InvalidObject(format!(
90            "action id mismatch: requested {}, found {}",
91            requested_id, found_id
92        )));
93    }
94
95    Ok(action)
96}
97
98fn validate_action_serialized(data: &[u8], id: ActionId) -> Result<Action> {
99    let action: Action = rmp_serde::from_slice(data)?;
100    validate_loaded_action(&id, action)
101}
102
103/// Validate every entry in a pack against its tagged id (checksum
104/// validation) and return the installed id list. This is the shared
105/// validated core for both install seams: the byte-buffer install
106/// (`install_pack`) and the memory-bounded temp-file install
107/// (`install_pack_streaming`) both run their pack through here, so
108/// both apply the same checksum validation and report the same
109/// installed ids regardless of how the bytes reach the store.
110fn validate_and_list_pack(reader: &crate::store::pack::PackReader) -> Result<Vec<PackObjectId>> {
111    let ids = reader.list_ids();
112    for id in &ids {
113        let Some((obj_type, data)) = reader.get_object_bytes(id)? else {
114            continue;
115        };
116        validate_pack_entry(id, obj_type, data.as_ref())?;
117    }
118    Ok(ids)
119}
120
121fn validate_pack_entry(id: &PackObjectId, obj_type: ObjectType, data: &[u8]) -> Result<()> {
122    match (id, obj_type) {
123        (PackObjectId::Hash(hash), ObjectType::Blob) => validate_blob_bytes(data, *hash),
124        (PackObjectId::Hash(hash), ObjectType::Tree) => {
125            validate_tree_serialized(data, *hash).map(|_| ())
126        }
127        (PackObjectId::Hash(hash), ObjectType::Action) => {
128            validate_action_serialized(data, ActionId::from_hash(*hash)).map(|_| ())
129        }
130        (PackObjectId::ChangeId(change_id), ObjectType::State) => {
131            validate_state_serialized(data, *change_id).map(|_| ())
132        }
133        _ => Err(HeddleError::InvalidObject(format!(
134            "unsupported native pack object: {:?} {:?}",
135            id, obj_type
136        ))),
137    }
138}
139
140impl FsStore {
141    /// Single-pass blob lookup. The wrapper in `ObjectStore::get_blob`
142    /// retries this once after a stale-reload on miss.
143    fn try_get_blob_once(&self, hash: &ContentHash) -> Result<Option<Blob>> {
144        let path = hash_path(&blobs_dir(&self.root), hash);
145        let loose_exists = path.exists();
146        let pack_has = if loose_exists {
147            false
148        } else if let Ok(manager) = self.pack_manager().read() {
149            manager.has_object(hash)
150        } else {
151            false
152        };
153        if (loose_exists || pack_has)
154            && let Ok(cache) = self.recent_blobs.read()
155            && let Some(blob) = cache.get(hash)
156        {
157            trace!("Found blob in recent object cache");
158            return Ok(Some(blob.clone()));
159        }
160
161        if let Ok(manager) = self.pack_manager().read()
162            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
163            && obj_type == ObjectType::Blob
164        {
165            trace!("Found blob in packfile");
166            // Step 2: skip the BLAKE3 re-hash. The pack reader already
167            // located this entry by its content-addressed key in the
168            // pack index — anything served here either matches or
169            // means the pack itself is corrupted in ways a per-read
170            // hash check can't recover from cleanly. For multi-MB
171            // blobs the verify was the dominant tail of the cold
172            // read (~3GB/s × 10MB ≈ 3.3ms per call).
173            return Ok(Some(Blob::new(data)));
174        }
175
176        match read_file_bytes(&path)? {
177            Some(data) => {
178                trace!(size = data.as_slice().len(), "Blob data read");
179                let content = codec::decode_blob_content(data.as_slice())?;
180                let blob = Blob::new(content);
181                // Loose blobs are bare bytes on disk: a half-written
182                // file or bit-rot inside the payload would slip past
183                // the path-is-the-hash invariant. Keep the verify on
184                // this path. Pack-resident reads above skip it because
185                // pack entries are framed with offset + length records
186                // that fail to parse if the pack is corrupt.
187                if blob.hash() != *hash {
188                    return Err(HeddleError::Corruption {
189                        expected: *hash,
190                        found: blob.hash(),
191                    });
192                }
193                Ok(Some(blob))
194            }
195            None => Ok(None),
196        }
197    }
198
199    /// Shared body for `try_has_{blob,tree,state}_once`: object is
200    /// present iff the loose path exists or the pack manager
201    /// resolves it. Callers pass the loose path and the
202    /// pack-manager probe; the helper handles the lock.
203    fn loose_or_packed(
204        &self,
205        loose_path: &Path,
206        in_pack: impl FnOnce(&PackManager) -> bool,
207    ) -> Result<bool> {
208        if loose_path.exists() {
209            return Ok(true);
210        }
211        if let Ok(manager) = self.pack_manager().read() {
212            return Ok(in_pack(&manager));
213        }
214        Ok(false)
215    }
216
217    fn try_has_blob_once(&self, hash: &ContentHash) -> Result<bool> {
218        let path = hash_path(&blobs_dir(&self.root), hash);
219        self.loose_or_packed(&path, |m| m.has_object(hash))
220    }
221
222    /// Header-only size lookup for a single attempt. Tries:
223    /// 1. The recent-blob cache (we already have the bytes in
224    ///    memory — `len()` is free).
225    /// 2. The loose blob: peek the 9-byte compression header. For a
226    ///    compressed blob the recorded uncompressed size lives in the
227    ///    header. For an uncompressed blob (no recognised header) the
228    ///    on-disk file length IS the blob size.
229    /// 3. Any loaded pack: the pack format records the uncompressed
230    ///    size as a varint right after the tagged id, so we can decode
231    ///    it without touching the body.
232    ///
233    /// Cost: one short read (typically 9 bytes) for loose blobs, or a
234    /// pure in-memory varint decode for packed blobs. *No*
235    /// decompression.
236    fn try_get_blob_size_once(&self, hash: &ContentHash) -> Result<Option<u64>> {
237        if let Ok(cache) = self.recent_blobs.read()
238            && let Some(blob) = cache.get(hash)
239        {
240            return Ok(Some(blob.content().len() as u64));
241        }
242
243        let path = hash_path(&blobs_dir(&self.root), hash);
244        if let Some((header, file_len)) = read_file_header(&path, BLOB_HEADER_PEEK)? {
245            if let Some(size) = header_uncompressed_size(&header) {
246                return Ok(Some(size));
247            }
248            // No recognised compression header — the file is raw
249            // blob bytes. The on-disk length is the blob size.
250            return Ok(Some(file_len));
251        }
252
253        if let Ok(manager) = self.pack_manager().read()
254            && let Some(size) = manager.get_hashed_object_size(hash)?
255        {
256            return Ok(Some(size));
257        }
258        Ok(None)
259    }
260
261    fn try_get_tree_once(&self, hash: &ContentHash) -> Result<Option<Tree>> {
262        // Cache first. The recent-object cache only ever holds trees we
263        // wrote or read this process, so a hit is authoritative for a
264        // read — no need to confirm on-disk existence first. Skipping
265        // that probe removes a `stat` (and a pack-index lookup) per
266        // call, which dominates `heddle status` on directory-heavy
267        // trees (the walker loads one subtree per directory).
268        if let Ok(cache) = self.recent_trees.read()
269            && let Some(tree) = cache.get(hash)
270        {
271            trace!("Found tree in recent object cache");
272            return Ok(Some(tree.clone()));
273        }
274
275        if let Ok(manager) = self.pack_manager().read()
276            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
277            && obj_type == ObjectType::Tree
278        {
279            trace!("Found tree in packfile");
280            let tree = validate_loaded_tree(rmp_serde::from_slice(&data)?)?;
281            if tree.hash() != *hash {
282                return Err(HeddleError::Corruption {
283                    expected: *hash,
284                    found: tree.hash(),
285                });
286            }
287            return Ok(Some(tree));
288        }
289
290        let path = hash_path(&trees_dir(&self.root), hash);
291        match read_file_bytes(&path)? {
292            Some(data) => {
293                trace!(size = data.as_slice().len(), "Tree data read");
294                let tree = validate_loaded_tree(codec::decode_tree(data.as_slice())?)?;
295                if tree.hash() != *hash {
296                    return Err(HeddleError::Corruption {
297                        expected: *hash,
298                        found: tree.hash(),
299                    });
300                }
301                if let Ok(mut cache) = self.recent_trees.write() {
302                    cache.insert(*hash, tree.clone());
303                }
304                Ok(Some(tree))
305            }
306            None => Ok(None),
307        }
308    }
309
310    fn try_has_tree_once(&self, hash: &ContentHash) -> Result<bool> {
311        let path = hash_path(&trees_dir(&self.root), hash);
312        self.loose_or_packed(&path, |m| m.has_object(hash))
313    }
314
315    fn try_get_state_once(&self, id: &ChangeId) -> Result<Option<State>> {
316        let path = state_path(&self.root, id);
317        let loose_exists = path.exists();
318        let pack_has = if loose_exists {
319            false
320        } else if let Ok(manager) = self.pack_manager().read() {
321            manager.has_object_id(&PackObjectId::ChangeId(*id))
322        } else {
323            false
324        };
325        if (loose_exists || pack_has)
326            && let Ok(cache) = self.recent_states.read()
327            && let Some(state) = cache.get(id)
328        {
329            trace!("Found state in recent object cache");
330            return Ok(Some(state.clone()));
331        }
332
333        // States are addressed by `change_id`, NOT by content hash, so the same
334        // id can resolve to two different bodies: a copy packed at adopt/import
335        // time and a newer LOOSE copy written by an authorized rewrite (the #570
336        // fidelity backfill re-hashes + rewrites adopted states loose). The
337        // loose object MUST shadow the packed one — read it FIRST and only
338        // consult the pack when no loose object exists, or a cold read (cache
339        // miss) returns the stale packed body. (Trees/blobs are
340        // content-addressed and can't go stale this way, so their read paths
341        // deliberately keep pack-first ordering.)
342        if loose_exists && let Some(data) = read_file_bytes(&path)? {
343            trace!(
344                size = data.as_slice().len(),
345                "State read from loose object (shadows any packed copy)"
346            );
347            let state = validate_loaded_state(id, codec::decode_state(data.as_slice())?)?;
348            if let Ok(mut cache) = self.recent_states.write() {
349                cache.insert(*id, state.clone());
350            }
351            return Ok(Some(state));
352        }
353
354        if let Ok(manager) = self.pack_manager().read()
355            && let Some((obj_type, data)) = manager.get_object(&PackObjectId::ChangeId(*id))?
356            && obj_type == ObjectType::State
357        {
358            trace!("Found state in packfile");
359            let state = validate_loaded_state(id, rmp_serde::from_slice(&data)?)?;
360            if let Ok(mut cache) = self.recent_states.write() {
361                cache.insert(*id, state.clone());
362            }
363            return Ok(Some(state));
364        }
365
366        Ok(None)
367    }
368
369    fn try_has_state_once(&self, id: &ChangeId) -> Result<bool> {
370        let path = state_path(&self.root, id);
371        self.loose_or_packed(&path, |m| m.has_object_id(&PackObjectId::ChangeId(*id)))
372    }
373}
374
375impl ObjectStore for FsStore {
376    fn clear_recent_caches(&self) {
377        self.clear_recent_object_caches();
378    }
379
380    /// Zero-copy pack fast path. When the blob lives in a packfile
381    /// and is non-delta + uncompressed, returns a `Bytes::slice`
382    /// view of the pack's mmap — no decompression, no allocation,
383    /// no memcpy. Compressed pack entries, delta entries, and
384    /// loose blobs fall back to `get_blob` and wrap the result in a
385    /// `Bytes` (the `Vec` → `Bytes` conversion is itself zero-copy).
386    fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
387        if let Ok(manager) = self.pack_manager().read()
388            && let Some((obj_type, data)) = manager.get_hashed_object_bytes(hash)?
389            && obj_type == crate::store::pack::ObjectType::Blob
390        {
391            return Ok(Some(data));
392        }
393        Ok(self
394            .get_blob(hash)?
395            .map(|blob| bytes::Bytes::from(blob.into_content())))
396    }
397
398    #[instrument(skip(self), fields(hash = %hash.short()))]
399    fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
400        if let Some(blob) = self.try_get_blob_once(hash)? {
401            return Ok(Some(blob));
402        }
403        // Miss path: a sibling FsStore (e.g. the worktree's repo
404        // backing the same `.heddle/`) may have installed a new pack
405        // since we loaded ours. Cheap disk-count check first; full
406        // reload only when the count grew.
407        if self.reload_packs_if_stale()?
408            && let Some(blob) = self.try_get_blob_once(hash)?
409        {
410            return Ok(Some(blob));
411        }
412        trace!("Blob not found");
413        Ok(None)
414    }
415
416    #[instrument(skip(self, blob), fields(size = blob.content().len()))]
417    fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
418        let hash = blob.hash();
419        let path = hash_path(&blobs_dir(&self.root), &hash);
420
421        if !path.exists() {
422            let data = codec::encode_blob_content(blob.content(), &self.compression)?;
423            trace!(compressed_size = data.len(), "Writing blob");
424            self.write_loose_object_atomic(&path, &data)?;
425        } else {
426            trace!("Blob already exists, skipping write");
427        }
428        if let Ok(mut cache) = self.recent_blobs.write() {
429            cache.insert(hash, blob.clone());
430        }
431
432        Ok(hash)
433    }
434
435    #[instrument(skip(self, blob), fields(hash = %hash.short()))]
436    fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
437        if blob.hash() != hash {
438            return Err(HeddleError::Corruption {
439                expected: hash,
440                found: blob.hash(),
441            });
442        }
443
444        let path = hash_path(&blobs_dir(&self.root), &hash);
445
446        if !path.exists() {
447            let data = codec::encode_blob_content(blob.content(), &self.compression)?;
448            trace!(
449                compressed_size = data.len(),
450                "Writing blob with precomputed hash"
451            );
452            self.write_loose_object_atomic(&path, &data)?;
453        }
454        if let Ok(mut cache) = self.recent_blobs.write() {
455            cache.insert(hash, blob.clone());
456        }
457
458        Ok(hash)
459    }
460
461    #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
462    fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
463        validate_blob_bytes(data, hash)?;
464
465        let path = hash_path(&blobs_dir(&self.root), &hash);
466        if !path.exists() {
467            trace!(
468                size = data.len(),
469                "Writing raw blob bytes with precomputed hash"
470            );
471            self.write_loose_object_atomic(&path, data)?;
472        }
473        if let Ok(mut cache) = self.recent_blobs.write() {
474            cache.insert(hash, Blob::from_slice(data));
475        }
476
477        Ok(hash)
478    }
479
480    #[instrument(skip(self), fields(hash = %hash.short()))]
481    fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
482        if self.try_has_blob_once(hash)? {
483            return Ok(true);
484        }
485        if self.reload_packs_if_stale()? {
486            return self.try_has_blob_once(hash);
487        }
488        Ok(false)
489    }
490
491    /// Loose blob path safe for clonefile/copy materialization.
492    ///
493    /// Returns `Some(path)` only when the loose file exists, is
494    /// stored uncompressed, *and* its bytes hash to the expected
495    /// content hash. Compressed blobs and pack-only blobs fall
496    /// through to `None`; so do *torn* cache-mirror files (the
497    /// `AtomicWriteMode::NoSync` write side may leave one if the
498    /// host crashed during a previous promote). On the torn case
499    /// the caller re-promotes from the authoritative pack copy.
500    ///
501    /// Verification is amortised: a hash that passes the check once
502    /// in this process is recorded in `verified_loose_blobs` and
503    /// subsequent calls skip the read+hash. So the cost on the
504    /// materialize hot path is at most one BLAKE3 over each unique
505    /// blob per process lifetime — negligible for tiny blobs,
506    /// bounded by working-set size for huge ones.
507    fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
508        let path = hash_path(&blobs_dir(&self.root), hash);
509        // Fast path: this process already verified (or wrote) this
510        // hash's loose mirror in `promote_to_loose_uncompressed`.
511        // Trust without re-hashing — `path.exists()` is the only
512        // I/O we need.
513        if let Ok(verified) = self.verified_loose_blobs.read()
514            && verified.get(hash).is_some()
515            && path.exists()
516        {
517            return Some(path);
518        }
519
520        // First-time-this-process check: peek the header to filter
521        // out compressed-loose files cheaply, then verify the
522        // body's hash matches what the caller expects. A torn-write
523        // (post-crash) cache mirror fails this and the caller
524        // re-promotes from the pack.
525        //
526        // Header peek must cover the 9-byte modern header **plus**
527        // the 4-byte ZSTD magic that `is_compressed` checks —
528        // peeking only 9 bytes makes `is_compressed` falsely
529        // return `false` on a properly-compressed blob, and we'd
530        // hand the caller the compressed file path. Same off-by-4
531        // we fixed in `BLOB_HEADER_PEEK`.
532        let (header, _) = read_file_header(&path, BLOB_HEADER_PEEK).ok().flatten()?;
533        if is_compressed(&header) {
534            return None;
535        }
536        let bytes = read_file_bytes(&path).ok().flatten()?;
537        let actual = ContentHash::compute_typed("blob", bytes.as_slice());
538        if actual != *hash {
539            // Torn write or unrelated corruption. Leave the file on
540            // disk; the caller's `promote_to_loose_uncompressed`
541            // will overwrite it via the standard temp+rename path.
542            return None;
543        }
544        if let Ok(mut verified) = self.verified_loose_blobs.write() {
545            verified.insert(*hash, ());
546        }
547        Some(path)
548    }
549
550    /// Promote a blob to its uncompressed-loose canonical path so
551    /// `loose_blob_path` returns `Some(path)` and hardlink-first
552    /// materialization fires.
553    ///
554    /// Three cases:
555    /// 1. Already loose+uncompressed: peek the header, no-op.
556    /// 2. Loose but compressed: read+decompress, atomically rewrite
557    ///    the canonical path with raw bytes.
558    /// 3. Pack-only: read out of the pack via `get_blob`, atomically
559    ///    write to the canonical loose path. Pack copy is left in
560    ///    place — the next prune cycle will discard the loose mirror
561    ///    and a future materialize will re-promote.
562    #[instrument(skip(self), fields(hash = %hash.short()))]
563    fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
564        let path = hash_path(&blobs_dir(&self.root), hash);
565
566        // Idempotent fast path: already loose AND uncompressed.
567        if let Some((header, _)) = read_file_header(&path, 9)?
568            && !is_compressed(&header)
569        {
570            trace!("Blob already loose+uncompressed; skipping promotion");
571            return Ok(false);
572        }
573
574        // Either compressed-loose or pack-only. Reading via
575        // `get_blob` covers both: compressed-loose decompresses on
576        // the way out, pack-only reads from the loaded pack manager.
577        let blob = self.get_blob(hash)?.ok_or_else(|| {
578            HeddleError::NotFound(format!(
579                "blob {} not found in store; cannot promote to loose-uncompressed",
580                hash
581            ))
582        })?;
583
584        // Install the uncompressed bytes at the canonical loose path
585        // via the cache-mirror atomic-write variant: no fsync, just
586        // temp+rename. The fsync skip is what makes promotion fast
587        // (measured: ~5 ms/blob with `sync_data` vs ~0.2 ms without
588        // on macOS APFS); the safety comes from the read-side hash
589        // check in `loose_blob_path`. A torn write after a crash
590        // produces a file whose content hash doesn't match, so the
591        // next reader rejects it and re-promotes from the pack.
592        //
593        // Record the hash in this process's verified-blobs cache:
594        // we just wrote the bytes ourselves, so the subsequent read
595        // path can trust them without re-hashing.
596        debug!(
597            size = blob.content().len(),
598            "Promoting blob to loose-uncompressed canonical store"
599        );
600        self.write_loose_object_cache(&path, blob.content())?;
601        if let Ok(mut verified) = self.verified_loose_blobs.write() {
602            verified.insert(*hash, ());
603        }
604        Ok(true)
605    }
606
607    #[instrument(skip(self), fields(hash = %hash.short()))]
608    fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
609        if let Some(size) = self.try_get_blob_size_once(hash)? {
610            return Ok(Some(size));
611        }
612        // Sibling-store recovery, mirroring the read path: if a
613        // concurrent writer just installed a pack we don't know about,
614        // reload and retry once before reporting a miss.
615        if self.reload_packs_if_stale()?
616            && let Some(size) = self.try_get_blob_size_once(hash)?
617        {
618            return Ok(Some(size));
619        }
620        Ok(None)
621    }
622
623    #[instrument(skip(self), fields(hash = %hash.short()))]
624    fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
625        if let Some(tree) = self.try_get_tree_once(hash)? {
626            return Ok(Some(tree));
627        }
628        if self.reload_packs_if_stale()?
629            && let Some(tree) = self.try_get_tree_once(hash)?
630        {
631            return Ok(Some(tree));
632        }
633        trace!("Tree not found");
634        Ok(None)
635    }
636
637    #[instrument(skip(self, tree), fields(entry_count = tree.entries().len()))]
638    fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
639        let hash = tree.hash();
640        let path = hash_path(&trees_dir(&self.root), &hash);
641
642        if !path.exists() {
643            let (_, data) = codec::encode_tree(tree, &self.compression)?;
644            trace!(compressed_size = data.len(), "Writing tree");
645            self.write_loose_object_atomic(&path, &data)?;
646        } else {
647            trace!("Tree already exists, skipping write");
648        }
649        if let Ok(mut cache) = self.recent_trees.write() {
650            cache.insert(hash, tree.clone());
651        }
652
653        Ok(hash)
654    }
655
656    #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
657    fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
658        let tree = validate_tree_serialized(data, hash)?;
659
660        let path = hash_path(&trees_dir(&self.root), &hash);
661        if !path.exists() {
662            trace!(size = data.len(), "Writing raw serialized tree");
663            self.write_loose_object_atomic(&path, data)?;
664        }
665        if let Ok(mut cache) = self.recent_trees.write() {
666            cache.insert(hash, tree);
667        }
668
669        Ok(hash)
670    }
671
672    #[instrument(skip(self), fields(hash = %hash.short()))]
673    fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
674        if self.try_has_tree_once(hash)? {
675            return Ok(true);
676        }
677        if self.reload_packs_if_stale()? {
678            return self.try_has_tree_once(hash);
679        }
680        Ok(false)
681    }
682
683    #[instrument(skip(self), fields(id = %id.short()))]
684    fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
685        if let Some(state) = self.try_get_state_once(id)? {
686            return Ok(Some(state));
687        }
688        if self.reload_packs_if_stale()?
689            && let Some(state) = self.try_get_state_once(id)?
690        {
691            return Ok(Some(state));
692        }
693        trace!("State not found");
694        Ok(None)
695    }
696
697    #[instrument(skip(self, state), fields(id = %state.change_id.short()))]
698    fn put_state(&self, state: &State) -> Result<()> {
699        let path = state_path(&self.root, &state.change_id);
700        let data = codec::encode_state(state, &self.compression)?;
701        trace!(compressed_size = data.len(), "Writing state");
702        self.write_loose_object_atomic(&path, &data)?;
703        if let Ok(mut cache) = self.recent_states.write() {
704            cache.insert(state.change_id, state.clone());
705        }
706        Ok(())
707    }
708
709    #[instrument(skip(self, data), fields(id = %id.short(), size = data.len()))]
710    fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
711        let state = validate_state_serialized(data, id)?;
712        let path = state_path(&self.root, &id);
713        trace!(size = data.len(), "Writing raw serialized state");
714        self.write_loose_object_atomic(&path, data)?;
715        if let Ok(mut cache) = self.recent_states.write() {
716            cache.insert(id, state);
717        }
718        Ok(())
719    }
720
721    #[instrument(skip(self), fields(id = %id.short()))]
722    fn has_state(&self, id: &ChangeId) -> Result<bool> {
723        if self.try_has_state_once(id)? {
724            return Ok(true);
725        }
726        if self.reload_packs_if_stale()? {
727            return self.try_has_state_once(id);
728        }
729        Ok(false)
730    }
731
732    #[instrument(skip(self))]
733    fn list_states(&self) -> Result<Vec<ChangeId>> {
734        self.reload_packs_if_stale()?;
735
736        let dir = states_dir(&self.root);
737        if !dir.exists() {
738            return Ok(Vec::new());
739        }
740
741        let mut states = Vec::new();
742        for entry in fs::read_dir(&dir)? {
743            let entry = entry?;
744            let path = entry.path();
745            if let Some(name) = path.file_stem()
746                && let Some(name_str) = name.to_str()
747                && let Ok(id) = ChangeId::parse(name_str)
748            {
749                states.push(id);
750            }
751        }
752        if let Ok(manager) = self.pack_manager().read() {
753            for id in manager.list_all_ids()? {
754                if let PackObjectId::ChangeId(change_id) = id
755                    && !states.contains(&change_id)
756                {
757                    states.push(change_id);
758                }
759            }
760        }
761        debug!(count = states.len(), "Listed states");
762        Ok(states)
763    }
764
765    #[instrument(skip(self), fields(id = %id))]
766    fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
767        let path = action_path(&self.root, id);
768        if !path.exists()
769            && let Ok(manager) = self.pack_manager().read()
770            && let Some((obj_type, data)) = manager.get_hashed_object(id.as_hash())?
771            && obj_type == ObjectType::Action
772        {
773            trace!("Found action in packfile");
774            let action = validate_loaded_action(id, rmp_serde::from_slice(&data)?)?;
775            return Ok(Some(action));
776        }
777        match read_file_bytes(&path)? {
778            Some(data) => {
779                trace!(size = data.as_slice().len(), "Action data read");
780                let action = validate_loaded_action(id, codec::decode_action(data.as_slice())?)?;
781                Ok(Some(action))
782            }
783            None => {
784                trace!("Action not found");
785                Ok(None)
786            }
787        }
788    }
789
790    #[instrument(skip(self, action))]
791    fn put_action(&self, action: &mut Action) -> Result<ActionId> {
792        let id = action.id();
793        let path = action_path(&self.root, &id);
794
795        if !path.exists() {
796            let (_, data) = codec::encode_action(action, &self.compression)?;
797            trace!(id = %id, compressed_size = data.len(), "Writing action");
798            self.write_loose_object_atomic(&path, &data)?;
799        }
800
801        Ok(id)
802    }
803
804    #[instrument(skip(self))]
805    fn list_actions(&self) -> Result<Vec<ActionId>> {
806        let dir = actions_dir(&self.root);
807        let mut actions = Vec::new();
808        if dir.exists() {
809            for entry in fs::read_dir(&dir)? {
810                let entry = entry?;
811                let path = entry.path();
812                if let Some(name) = path.file_stem()
813                    && let Some(name_str) = name.to_str()
814                    && let Ok(hash) = ContentHash::from_hex(name_str)
815                {
816                    actions.push(ActionId::from_hash(hash));
817                }
818            }
819        }
820        if let Ok(manager) = self.pack_manager().read() {
821            for id in manager.list_all_ids()? {
822                if let PackObjectId::Hash(hash) = id
823                    && !actions.iter().any(|action_id| action_id.as_hash() == &hash)
824                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
825                    && obj_type == ObjectType::Action
826                {
827                    actions.push(ActionId::from_hash(hash));
828                }
829            }
830        }
831        debug!(count = actions.len(), "Listed actions");
832        Ok(actions)
833    }
834
835    #[instrument(skip(self))]
836    fn list_blobs(&self) -> Result<Vec<ContentHash>> {
837        let dir = blobs_dir(&self.root);
838        let mut blobs = list_hashes_from_dir(&dir)?;
839        if let Ok(manager) = self.pack_manager().read() {
840            for id in manager.list_all_ids()? {
841                if let PackObjectId::Hash(hash) = id
842                    && !blobs.contains(&hash)
843                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
844                    && obj_type == ObjectType::Blob
845                {
846                    blobs.push(hash);
847                }
848            }
849        }
850        Ok(blobs)
851    }
852
853    #[instrument(skip(self))]
854    fn list_trees(&self) -> Result<Vec<ContentHash>> {
855        let dir = trees_dir(&self.root);
856        let mut trees = list_hashes_from_dir(&dir)?;
857        if let Ok(manager) = self.pack_manager().read() {
858            for id in manager.list_all_ids()? {
859                if let PackObjectId::Hash(hash) = id
860                    && !trees.contains(&hash)
861                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
862                    && obj_type == ObjectType::Tree
863                {
864                    trees.push(hash);
865                }
866            }
867        }
868        Ok(trees)
869    }
870
871    #[instrument(skip(self))]
872    fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
873        self.pack_objects_impl(aggressive)
874    }
875
876    #[instrument(skip(self), fields(id = ?id))]
877    fn get_pack_object(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Vec<u8>)>> {
878        if let Ok(manager) = self.pack_manager().read()
879            && let Some((obj_type, data)) = manager.get_object(id)?
880        {
881            return Ok(Some((obj_type, data)));
882        }
883
884        match id {
885            PackObjectId::Hash(hash) => {
886                if let Some(blob) = self.get_blob(hash)? {
887                    return Ok(Some((ObjectType::Blob, blob.content().to_vec())));
888                }
889                if let Some(tree) = self.get_tree(hash)? {
890                    return Ok(Some((ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)));
891                }
892                if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
893                    return Ok(Some((
894                        ObjectType::Action,
895                        rmp_serde::to_vec_named(&action)?,
896                    )));
897                }
898                Ok(None)
899            }
900            PackObjectId::ChangeId(change_id) => {
901                if let Some(state) = self.get_state(change_id)? {
902                    Ok(Some((ObjectType::State, rmp_serde::to_vec_named(&state)?)))
903                } else {
904                    Ok(None)
905                }
906            }
907        }
908    }
909
910    #[instrument(skip(self, pack_data, index_data))]
911    fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<PackObjectId>> {
912        let reader = crate::store::pack::PackReader::from_slice(pack_data, index_data)?;
913        let ids = validate_and_list_pack(&reader)?;
914        self.install_pack_files(pack_data, index_data)?;
915        self.clear_recent_object_caches();
916        Ok(ids)
917    }
918
919    #[instrument(skip(self, blobs), fields(count = blobs.len()))]
920    fn put_blobs_packed(&self, blobs: Vec<(crate::object::ContentHash, Vec<u8>)>) -> Result<()> {
921        self.put_blobs_packed_impl(blobs)
922    }
923
924    #[instrument(skip(self))]
925    fn install_pack_streaming(
926        &self,
927        pack_path: &std::path::Path,
928        index_path: &std::path::Path,
929    ) -> Result<Vec<PackObjectId>> {
930        // Validate + list ids through the same core as the byte-buffer
931        // seam, but via an mmap-backed reader so the pack is never
932        // copied into the heap — the memory-bounded promise survives.
933        // Drop the reader (releasing the mmap) before the rename so
934        // the file move isn't racing an open mapping.
935        let ids = {
936            let reader = crate::store::pack::PackReader::open(pack_path, index_path)?;
937            validate_and_list_pack(&reader)?
938        };
939        self.install_pack_files_streaming(pack_path, index_path)?;
940        Ok(ids)
941    }
942
943    #[instrument(skip(self))]
944    fn prune_loose_objects(&self) -> Result<(u64, u64)> {
945        self.prune_loose_objects_impl()
946    }
947
948    #[instrument(skip(self))]
949    fn begin_snapshot_write_batch(&self) -> Result<()> {
950        self.begin_snapshot_write_batch_impl()
951    }
952
953    #[instrument(skip(self))]
954    fn flush_snapshot_write_batch(&self) -> Result<()> {
955        self.flush_snapshot_write_batch_impl()
956    }
957
958    #[instrument(skip(self))]
959    fn abort_snapshot_write_batch(&self) {
960        self.abort_snapshot_write_batch_impl();
961    }
962
963    fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
964        Ok(redaction_path(&self.root, blob).exists())
965    }
966
967    fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
968        let path = redaction_path(&self.root, blob);
969        match fs::read(&path) {
970            Ok(bytes) => Ok(Some(bytes)),
971            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
972            Err(err) => Err(HeddleError::Io(err)),
973        }
974    }
975
976    fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
977        let dir = redactions_dir(&self.root);
978        if !dir.exists() {
979            fs::create_dir_all(&dir)?;
980        }
981        let path = redaction_path(&self.root, blob);
982        crate::fs_atomic::write_file_atomic(&path, bytes)?;
983        Ok(())
984    }
985
986    fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
987        let dir = redactions_dir(&self.root);
988        if !dir.exists() {
989            return Ok(Vec::new());
990        }
991        let mut out = Vec::new();
992        for entry in fs::read_dir(&dir)? {
993            let entry = entry?;
994            let path = entry.path();
995            if path.extension().and_then(|e| e.to_str()) != Some("bin") {
996                continue;
997            }
998            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
999                continue;
1000            };
1001            if let Ok(hash) = ContentHash::from_hex(stem) {
1002                out.push(hash);
1003            }
1004        }
1005        Ok(out)
1006    }
1007
1008    fn has_state_visibility_for_state(&self, state: &ChangeId) -> Result<bool> {
1009        Ok(state_visibility_path(&self.root, state).exists())
1010    }
1011
1012    fn get_state_visibility_bytes_for_state(&self, state: &ChangeId) -> Result<Option<Vec<u8>>> {
1013        let path = state_visibility_path(&self.root, state);
1014        match fs::read(&path) {
1015            Ok(bytes) => Ok(Some(bytes)),
1016            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
1017            Err(err) => Err(HeddleError::Io(err)),
1018        }
1019    }
1020
1021    fn put_state_visibility_bytes_for_state(&self, state: &ChangeId, bytes: &[u8]) -> Result<()> {
1022        let dir = state_visibility_dir(&self.root);
1023        if !dir.exists() {
1024            fs::create_dir_all(&dir)?;
1025        }
1026        let path = state_visibility_path(&self.root, state);
1027        crate::fs_atomic::write_file_atomic(&path, bytes)?;
1028        Ok(())
1029    }
1030
1031    fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
1032        let dir = state_visibility_dir(&self.root);
1033        if !dir.exists() {
1034            return Ok(Vec::new());
1035        }
1036        let mut out = Vec::new();
1037        for entry in fs::read_dir(&dir)? {
1038            let entry = entry?;
1039            let path = entry.path();
1040            if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1041                continue;
1042            }
1043            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1044                continue;
1045            };
1046            if let Ok(state) = ChangeId::parse(stem) {
1047                out.push(state);
1048            }
1049        }
1050        Ok(out)
1051    }
1052}