Skip to main content

objects/store/fs/
fs_impl.rs

1// SPDX-License-Identifier: Apache-2.0
2//! ObjectStore implementation for FsStore.
3
4use std::{
5    fs,
6    path::{Path, PathBuf},
7};
8
9use tracing::{debug, instrument, trace};
10
11use super::{
12    FsStore,
13    fs_io::{list_hashes_from_dir, read_file_bytes, read_file_header},
14    fs_paths::{
15        action_path, actions_dir, blobs_dir, hash_path, redaction_path, redactions_dir, state_path,
16        state_visibility_dir, state_visibility_path, states_dir, trees_dir,
17    },
18};
19use crate::{
20    object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree},
21    store::{
22        HeddleError, ObjectStore, Result,
23        compression::{compress, decompress, header_uncompressed_size, is_compressed},
24        pack::{ObjectType, PackManager, PackObjectId},
25    },
26};
27
/// Number of leading bytes read off disk when peeking at a loose blob's
/// header, either to recover its uncompressed size or to test whether
/// the file is compressed.
///
/// Must cover the 9-byte modern header **plus** the 4-byte ZSTD
/// magic that `header_uncompressed_size` uses to disambiguate
/// modern from legacy (5-byte) headers — without the magic in the
/// peek buffer the lookup silently returns the on-disk byte length
/// instead of the recorded uncompressed size, which left `stat`
/// reporting the compressed size of every loose blob.
const BLOB_HEADER_PEEK: usize = 13;
36
/// Runs the tree's own `validate()` check on a freshly decoded tree.
///
/// Returns the tree unchanged when validation succeeds; a validation
/// failure is propagated to the caller.
fn validate_loaded_tree(tree: Tree) -> Result<Tree> {
    tree.validate()?;
    Ok(tree)
}
41
42fn validate_blob_bytes(data: &[u8], hash: ContentHash) -> Result<()> {
43    let mut hasher = ContentHash::typed_hasher("blob", data.len() as u64);
44    hasher.update(data);
45    let found = ContentHash::from_bytes(hasher.finalize().into());
46    if found != hash {
47        return Err(HeddleError::Corruption {
48            expected: hash,
49            found,
50        });
51    }
52
53    Ok(())
54}
55
56fn validate_tree_serialized(data: &[u8], hash: ContentHash) -> Result<Tree> {
57    let tree: Tree = rmp_serde::from_slice(data)?;
58    let tree = validate_loaded_tree(tree)?;
59    let found = tree.hash();
60    if found != hash {
61        return Err(HeddleError::Corruption {
62            expected: hash,
63            found,
64        });
65    }
66
67    Ok(tree)
68}
69
70fn validate_loaded_state(requested_id: &ChangeId, state: State) -> Result<State> {
71    if state.change_id != *requested_id {
72        return Err(HeddleError::InvalidObject(format!(
73            "state change_id mismatch: requested {}, found {}",
74            requested_id, state.change_id
75        )));
76    }
77
78    Ok(state)
79}
80
81fn validate_state_serialized(data: &[u8], id: ChangeId) -> Result<State> {
82    let state: State = rmp_serde::from_slice(data)?;
83    validate_loaded_state(&id, state)
84}
85
86fn validate_loaded_action(requested_id: &ActionId, action: Action) -> Result<Action> {
87    let found_id = action.compute_id();
88    if found_id != *requested_id {
89        return Err(HeddleError::InvalidObject(format!(
90            "action id mismatch: requested {}, found {}",
91            requested_id, found_id
92        )));
93    }
94
95    Ok(action)
96}
97
98fn validate_action_serialized(data: &[u8], id: ActionId) -> Result<Action> {
99    let action: Action = rmp_serde::from_slice(data)?;
100    validate_loaded_action(&id, action)
101}
102
103/// Validate every entry in a pack against its tagged id (checksum
104/// validation) and return the installed id list. This is the shared
105/// validated core for both install seams: the byte-buffer install
106/// (`install_pack`) and the memory-bounded temp-file install
107/// (`install_pack_streaming`) both run their pack through here, so
108/// both apply the same checksum validation and report the same
109/// installed ids regardless of how the bytes reach the store.
110fn validate_and_list_pack(reader: &crate::store::pack::PackReader) -> Result<Vec<PackObjectId>> {
111    let ids = reader.list_ids();
112    for id in &ids {
113        let Some((obj_type, data)) = reader.get_object_bytes(id)? else {
114            continue;
115        };
116        validate_pack_entry(id, obj_type, data.as_ref())?;
117    }
118    Ok(ids)
119}
120
121fn validate_pack_entry(id: &PackObjectId, obj_type: ObjectType, data: &[u8]) -> Result<()> {
122    match (id, obj_type) {
123        (PackObjectId::Hash(hash), ObjectType::Blob) => validate_blob_bytes(data, *hash),
124        (PackObjectId::Hash(hash), ObjectType::Tree) => {
125            validate_tree_serialized(data, *hash).map(|_| ())
126        }
127        (PackObjectId::Hash(hash), ObjectType::Action) => {
128            validate_action_serialized(data, ActionId::from_hash(*hash)).map(|_| ())
129        }
130        (PackObjectId::ChangeId(change_id), ObjectType::State) => {
131            validate_state_serialized(data, *change_id).map(|_| ())
132        }
133        _ => Err(HeddleError::InvalidObject(format!(
134            "unsupported native pack object: {:?} {:?}",
135            id, obj_type
136        ))),
137    }
138}
139
140impl FsStore {
141    /// Single-pass blob lookup. The wrapper in `ObjectStore::get_blob`
142    /// retries this once after a stale-reload on miss.
143    fn try_get_blob_once(&self, hash: &ContentHash) -> Result<Option<Blob>> {
144        let path = hash_path(&blobs_dir(&self.root), hash);
145        let loose_exists = path.exists();
146        let pack_has = if loose_exists {
147            false
148        } else if let Ok(manager) = self.pack_manager().read() {
149            manager.has_object(hash)
150        } else {
151            false
152        };
153        if (loose_exists || pack_has)
154            && let Ok(cache) = self.recent_blobs.read()
155            && let Some(blob) = cache.get(hash)
156        {
157            trace!("Found blob in recent object cache");
158            return Ok(Some(blob.clone()));
159        }
160
161        if let Ok(manager) = self.pack_manager().read()
162            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
163            && obj_type == ObjectType::Blob
164        {
165            trace!("Found blob in packfile");
166            // Step 2: skip the BLAKE3 re-hash. The pack reader already
167            // located this entry by its content-addressed key in the
168            // pack index — anything served here either matches or
169            // means the pack itself is corrupted in ways a per-read
170            // hash check can't recover from cleanly. For multi-MB
171            // blobs the verify was the dominant tail of the cold
172            // read (~3GB/s × 10MB ≈ 3.3ms per call).
173            return Ok(Some(Blob::new(data)));
174        }
175
176        match read_file_bytes(&path)? {
177            Some(data) => {
178                trace!(size = data.as_slice().len(), "Blob data read");
179                let content = if is_compressed(data.as_slice()) {
180                    decompress(data.as_slice())?
181                } else {
182                    data.into_vec()
183                };
184                let blob = Blob::new(content);
185                // Loose blobs are bare bytes on disk: a half-written
186                // file or bit-rot inside the payload would slip past
187                // the path-is-the-hash invariant. Keep the verify on
188                // this path. Pack-resident reads above skip it because
189                // pack entries are framed with offset + length records
190                // that fail to parse if the pack is corrupt.
191                if blob.hash() != *hash {
192                    return Err(HeddleError::Corruption {
193                        expected: *hash,
194                        found: blob.hash(),
195                    });
196                }
197                Ok(Some(blob))
198            }
199            None => Ok(None),
200        }
201    }
202
203    /// Shared body for `try_has_{blob,tree,state}_once`: object is
204    /// present iff the loose path exists or the pack manager
205    /// resolves it. Callers pass the loose path and the
206    /// pack-manager probe; the helper handles the lock.
207    fn loose_or_packed(
208        &self,
209        loose_path: &Path,
210        in_pack: impl FnOnce(&PackManager) -> bool,
211    ) -> Result<bool> {
212        if loose_path.exists() {
213            return Ok(true);
214        }
215        if let Ok(manager) = self.pack_manager().read() {
216            return Ok(in_pack(&manager));
217        }
218        Ok(false)
219    }
220
221    fn try_has_blob_once(&self, hash: &ContentHash) -> Result<bool> {
222        let path = hash_path(&blobs_dir(&self.root), hash);
223        self.loose_or_packed(&path, |m| m.has_object(hash))
224    }
225
226    /// Header-only size lookup for a single attempt. Tries:
227    /// 1. The recent-blob cache (we already have the bytes in
228    ///    memory — `len()` is free).
229    /// 2. The loose blob: peek the 9-byte compression header. For a
230    ///    compressed blob the recorded uncompressed size lives in the
231    ///    header. For an uncompressed blob (no recognised header) the
232    ///    on-disk file length IS the blob size.
233    /// 3. Any loaded pack: the pack format records the uncompressed
234    ///    size as a varint right after the tagged id, so we can decode
235    ///    it without touching the body.
236    ///
237    /// Cost: one short read (typically 9 bytes) for loose blobs, or a
238    /// pure in-memory varint decode for packed blobs. *No*
239    /// decompression.
240    fn try_get_blob_size_once(&self, hash: &ContentHash) -> Result<Option<u64>> {
241        if let Ok(cache) = self.recent_blobs.read()
242            && let Some(blob) = cache.get(hash)
243        {
244            return Ok(Some(blob.content().len() as u64));
245        }
246
247        let path = hash_path(&blobs_dir(&self.root), hash);
248        if let Some((header, file_len)) = read_file_header(&path, BLOB_HEADER_PEEK)? {
249            if let Some(size) = header_uncompressed_size(&header) {
250                return Ok(Some(size));
251            }
252            // No recognised compression header — the file is raw
253            // blob bytes. The on-disk length is the blob size.
254            return Ok(Some(file_len));
255        }
256
257        if let Ok(manager) = self.pack_manager().read()
258            && let Some(size) = manager.get_hashed_object_size(hash)?
259        {
260            return Ok(Some(size));
261        }
262        Ok(None)
263    }
264
265    fn try_get_tree_once(&self, hash: &ContentHash) -> Result<Option<Tree>> {
266        // Cache first. The recent-object cache only ever holds trees we
267        // wrote or read this process, so a hit is authoritative for a
268        // read — no need to confirm on-disk existence first. Skipping
269        // that probe removes a `stat` (and a pack-index lookup) per
270        // call, which dominates `heddle status` on directory-heavy
271        // trees (the walker loads one subtree per directory).
272        if let Ok(cache) = self.recent_trees.read()
273            && let Some(tree) = cache.get(hash)
274        {
275            trace!("Found tree in recent object cache");
276            return Ok(Some(tree.clone()));
277        }
278
279        if let Ok(manager) = self.pack_manager().read()
280            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
281            && obj_type == ObjectType::Tree
282        {
283            trace!("Found tree in packfile");
284            let tree = validate_loaded_tree(rmp_serde::from_slice(&data)?)?;
285            if tree.hash() != *hash {
286                return Err(HeddleError::Corruption {
287                    expected: *hash,
288                    found: tree.hash(),
289                });
290            }
291            return Ok(Some(tree));
292        }
293
294        let path = hash_path(&trees_dir(&self.root), hash);
295        match read_file_bytes(&path)? {
296            Some(data) => {
297                trace!(size = data.as_slice().len(), "Tree data read");
298                let decoded = if is_compressed(data.as_slice()) {
299                    decompress(data.as_slice())?
300                } else {
301                    data.into_vec()
302                };
303                let tree = validate_loaded_tree(rmp_serde::from_slice(&decoded)?)?;
304                if tree.hash() != *hash {
305                    return Err(HeddleError::Corruption {
306                        expected: *hash,
307                        found: tree.hash(),
308                    });
309                }
310                if let Ok(mut cache) = self.recent_trees.write() {
311                    cache.insert(*hash, tree.clone());
312                }
313                Ok(Some(tree))
314            }
315            None => Ok(None),
316        }
317    }
318
319    fn try_has_tree_once(&self, hash: &ContentHash) -> Result<bool> {
320        let path = hash_path(&trees_dir(&self.root), hash);
321        self.loose_or_packed(&path, |m| m.has_object(hash))
322    }
323
324    fn try_get_state_once(&self, id: &ChangeId) -> Result<Option<State>> {
325        let path = state_path(&self.root, id);
326        let loose_exists = path.exists();
327        let pack_has = if loose_exists {
328            false
329        } else if let Ok(manager) = self.pack_manager().read() {
330            manager.has_object_id(&PackObjectId::ChangeId(*id))
331        } else {
332            false
333        };
334        if (loose_exists || pack_has)
335            && let Ok(cache) = self.recent_states.read()
336            && let Some(state) = cache.get(id)
337        {
338            trace!("Found state in recent object cache");
339            return Ok(Some(state.clone()));
340        }
341
342        // States are addressed by `change_id`, NOT by content hash, so the same
343        // id can resolve to two different bodies: a copy packed at adopt/import
344        // time and a newer LOOSE copy written by an authorized rewrite (the #570
345        // fidelity backfill re-hashes + rewrites adopted states loose). The
346        // loose object MUST shadow the packed one — read it FIRST and only
347        // consult the pack when no loose object exists, or a cold read (cache
348        // miss) returns the stale packed body. (Trees/blobs are
349        // content-addressed and can't go stale this way, so their read paths
350        // deliberately keep pack-first ordering.)
351        if loose_exists && let Some(data) = read_file_bytes(&path)? {
352            trace!(
353                size = data.as_slice().len(),
354                "State read from loose object (shadows any packed copy)"
355            );
356            let decoded = if is_compressed(data.as_slice()) {
357                decompress(data.as_slice())?
358            } else {
359                data.into_vec()
360            };
361            let state = validate_loaded_state(id, rmp_serde::from_slice(&decoded)?)?;
362            if let Ok(mut cache) = self.recent_states.write() {
363                cache.insert(*id, state.clone());
364            }
365            return Ok(Some(state));
366        }
367
368        if let Ok(manager) = self.pack_manager().read()
369            && let Some((obj_type, data)) = manager.get_object(&PackObjectId::ChangeId(*id))?
370            && obj_type == ObjectType::State
371        {
372            trace!("Found state in packfile");
373            let state = validate_loaded_state(id, rmp_serde::from_slice(&data)?)?;
374            if let Ok(mut cache) = self.recent_states.write() {
375                cache.insert(*id, state.clone());
376            }
377            return Ok(Some(state));
378        }
379
380        Ok(None)
381    }
382
383    fn try_has_state_once(&self, id: &ChangeId) -> Result<bool> {
384        let path = state_path(&self.root, id);
385        self.loose_or_packed(&path, |m| m.has_object_id(&PackObjectId::ChangeId(*id)))
386    }
387}
388
389impl ObjectStore for FsStore {
    /// Trait entry point for dropping the in-memory recent-object
    /// caches; delegates to `clear_recent_object_caches`.
    fn clear_recent_caches(&self) {
        self.clear_recent_object_caches();
    }
393
394    /// Zero-copy pack fast path. When the blob lives in a packfile
395    /// and is non-delta + uncompressed, returns a `Bytes::slice`
396    /// view of the pack's mmap — no decompression, no allocation,
397    /// no memcpy. Compressed pack entries, delta entries, and
398    /// loose blobs fall back to `get_blob` and wrap the result in a
399    /// `Bytes` (the `Vec` → `Bytes` conversion is itself zero-copy).
400    fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
401        if let Ok(manager) = self.pack_manager().read()
402            && let Some((obj_type, data)) = manager.get_hashed_object_bytes(hash)?
403            && obj_type == crate::store::pack::ObjectType::Blob
404        {
405            return Ok(Some(data));
406        }
407        Ok(self
408            .get_blob(hash)?
409            .map(|blob| bytes::Bytes::from(blob.into_content())))
410    }
411
412    #[instrument(skip(self), fields(hash = %hash.short()))]
413    fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
414        if let Some(blob) = self.try_get_blob_once(hash)? {
415            return Ok(Some(blob));
416        }
417        // Miss path: a sibling FsStore (e.g. the worktree's repo
418        // backing the same `.heddle/`) may have installed a new pack
419        // since we loaded ours. Cheap disk-count check first; full
420        // reload only when the count grew.
421        if self.reload_packs_if_stale()?
422            && let Some(blob) = self.try_get_blob_once(hash)?
423        {
424            return Ok(Some(blob));
425        }
426        trace!("Blob not found");
427        Ok(None)
428    }
429
430    #[instrument(skip(self, blob), fields(size = blob.content().len()))]
431    fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
432        let hash = blob.hash();
433        let path = hash_path(&blobs_dir(&self.root), &hash);
434
435        if !path.exists() {
436            let content = blob.content();
437            let data = compress(content, &self.compression)?.unwrap_or_else(|| content.to_vec());
438            trace!(compressed_size = data.len(), "Writing blob");
439            self.write_loose_object_atomic(&path, &data)?;
440        } else {
441            trace!("Blob already exists, skipping write");
442        }
443        if let Ok(mut cache) = self.recent_blobs.write() {
444            cache.insert(hash, blob.clone());
445        }
446
447        Ok(hash)
448    }
449
450    #[instrument(skip(self, blob), fields(hash = %hash.short()))]
451    fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
452        if blob.hash() != hash {
453            return Err(HeddleError::Corruption {
454                expected: hash,
455                found: blob.hash(),
456            });
457        }
458
459        let path = hash_path(&blobs_dir(&self.root), &hash);
460
461        if !path.exists() {
462            let content = blob.content();
463            let data = compress(content, &self.compression)?.unwrap_or_else(|| content.to_vec());
464            trace!(
465                compressed_size = data.len(),
466                "Writing blob with precomputed hash"
467            );
468            self.write_loose_object_atomic(&path, &data)?;
469        }
470        if let Ok(mut cache) = self.recent_blobs.write() {
471            cache.insert(hash, blob.clone());
472        }
473
474        Ok(hash)
475    }
476
477    #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
478    fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
479        validate_blob_bytes(data, hash)?;
480
481        let path = hash_path(&blobs_dir(&self.root), &hash);
482        if !path.exists() {
483            trace!(
484                size = data.len(),
485                "Writing raw blob bytes with precomputed hash"
486            );
487            self.write_loose_object_atomic(&path, data)?;
488        }
489        if let Ok(mut cache) = self.recent_blobs.write() {
490            cache.insert(hash, Blob::from_slice(data));
491        }
492
493        Ok(hash)
494    }
495
496    #[instrument(skip(self), fields(hash = %hash.short()))]
497    fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
498        if self.try_has_blob_once(hash)? {
499            return Ok(true);
500        }
501        if self.reload_packs_if_stale()? {
502            return self.try_has_blob_once(hash);
503        }
504        Ok(false)
505    }
506
507    /// Loose blob path safe for clonefile/copy materialization.
508    ///
509    /// Returns `Some(path)` only when the loose file exists, is
510    /// stored uncompressed, *and* its bytes hash to the expected
511    /// content hash. Compressed blobs and pack-only blobs fall
512    /// through to `None`; so do *torn* cache-mirror files (the
513    /// `AtomicWriteMode::NoSync` write side may leave one if the
514    /// host crashed during a previous promote). On the torn case
515    /// the caller re-promotes from the authoritative pack copy.
516    ///
517    /// Verification is amortised: a hash that passes the check once
518    /// in this process is recorded in `verified_loose_blobs` and
519    /// subsequent calls skip the read+hash. So the cost on the
520    /// materialize hot path is at most one BLAKE3 over each unique
521    /// blob per process lifetime — negligible for tiny blobs,
522    /// bounded by working-set size for huge ones.
523    fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
524        let path = hash_path(&blobs_dir(&self.root), hash);
525        // Fast path: this process already verified (or wrote) this
526        // hash's loose mirror in `promote_to_loose_uncompressed`.
527        // Trust without re-hashing — `path.exists()` is the only
528        // I/O we need.
529        if let Ok(verified) = self.verified_loose_blobs.read()
530            && verified.get(hash).is_some()
531            && path.exists()
532        {
533            return Some(path);
534        }
535
536        // First-time-this-process check: peek the header to filter
537        // out compressed-loose files cheaply, then verify the
538        // body's hash matches what the caller expects. A torn-write
539        // (post-crash) cache mirror fails this and the caller
540        // re-promotes from the pack.
541        //
542        // Header peek must cover the 9-byte modern header **plus**
543        // the 4-byte ZSTD magic that `is_compressed` checks —
544        // peeking only 9 bytes makes `is_compressed` falsely
545        // return `false` on a properly-compressed blob, and we'd
546        // hand the caller the compressed file path. Same off-by-4
547        // we fixed in `BLOB_HEADER_PEEK`.
548        let (header, _) = read_file_header(&path, BLOB_HEADER_PEEK).ok().flatten()?;
549        if is_compressed(&header) {
550            return None;
551        }
552        let bytes = read_file_bytes(&path).ok().flatten()?;
553        let actual = ContentHash::compute_typed("blob", bytes.as_slice());
554        if actual != *hash {
555            // Torn write or unrelated corruption. Leave the file on
556            // disk; the caller's `promote_to_loose_uncompressed`
557            // will overwrite it via the standard temp+rename path.
558            return None;
559        }
560        if let Ok(mut verified) = self.verified_loose_blobs.write() {
561            verified.insert(*hash, ());
562        }
563        Some(path)
564    }
565
566    /// Promote a blob to its uncompressed-loose canonical path so
567    /// `loose_blob_path` returns `Some(path)` and hardlink-first
568    /// materialization fires.
569    ///
570    /// Three cases:
571    /// 1. Already loose+uncompressed: peek the header, no-op.
572    /// 2. Loose but compressed: read+decompress, atomically rewrite
573    ///    the canonical path with raw bytes.
574    /// 3. Pack-only: read out of the pack via `get_blob`, atomically
575    ///    write to the canonical loose path. Pack copy is left in
576    ///    place — the next prune cycle will discard the loose mirror
577    ///    and a future materialize will re-promote.
578    #[instrument(skip(self), fields(hash = %hash.short()))]
579    fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
580        let path = hash_path(&blobs_dir(&self.root), hash);
581
582        // Idempotent fast path: already loose AND uncompressed.
583        if let Some((header, _)) = read_file_header(&path, 9)?
584            && !is_compressed(&header)
585        {
586            trace!("Blob already loose+uncompressed; skipping promotion");
587            return Ok(false);
588        }
589
590        // Either compressed-loose or pack-only. Reading via
591        // `get_blob` covers both: compressed-loose decompresses on
592        // the way out, pack-only reads from the loaded pack manager.
593        let blob = self.get_blob(hash)?.ok_or_else(|| {
594            HeddleError::NotFound(format!(
595                "blob {} not found in store; cannot promote to loose-uncompressed",
596                hash
597            ))
598        })?;
599
600        // Install the uncompressed bytes at the canonical loose path
601        // via the cache-mirror atomic-write variant: no fsync, just
602        // temp+rename. The fsync skip is what makes promotion fast
603        // (measured: ~5 ms/blob with `sync_data` vs ~0.2 ms without
604        // on macOS APFS); the safety comes from the read-side hash
605        // check in `loose_blob_path`. A torn write after a crash
606        // produces a file whose content hash doesn't match, so the
607        // next reader rejects it and re-promotes from the pack.
608        //
609        // Record the hash in this process's verified-blobs cache:
610        // we just wrote the bytes ourselves, so the subsequent read
611        // path can trust them without re-hashing.
612        debug!(
613            size = blob.content().len(),
614            "Promoting blob to loose-uncompressed canonical store"
615        );
616        self.write_loose_object_cache(&path, blob.content())?;
617        if let Ok(mut verified) = self.verified_loose_blobs.write() {
618            verified.insert(*hash, ());
619        }
620        Ok(true)
621    }
622
623    #[instrument(skip(self), fields(hash = %hash.short()))]
624    fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
625        if let Some(size) = self.try_get_blob_size_once(hash)? {
626            return Ok(Some(size));
627        }
628        // Sibling-store recovery, mirroring the read path: if a
629        // concurrent writer just installed a pack we don't know about,
630        // reload and retry once before reporting a miss.
631        if self.reload_packs_if_stale()?
632            && let Some(size) = self.try_get_blob_size_once(hash)?
633        {
634            return Ok(Some(size));
635        }
636        Ok(None)
637    }
638
639    #[instrument(skip(self), fields(hash = %hash.short()))]
640    fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
641        if let Some(tree) = self.try_get_tree_once(hash)? {
642            return Ok(Some(tree));
643        }
644        if self.reload_packs_if_stale()?
645            && let Some(tree) = self.try_get_tree_once(hash)?
646        {
647            return Ok(Some(tree));
648        }
649        trace!("Tree not found");
650        Ok(None)
651    }
652
653    #[instrument(skip(self, tree), fields(entry_count = tree.entries().len()))]
654    fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
655        let hash = tree.hash();
656        let path = hash_path(&trees_dir(&self.root), &hash);
657
658        if !path.exists() {
659            let serialized = rmp_serde::to_vec(tree)?;
660            let data = compress(&serialized, &self.compression)?.unwrap_or(serialized);
661            trace!(compressed_size = data.len(), "Writing tree");
662            self.write_loose_object_atomic(&path, &data)?;
663        } else {
664            trace!("Tree already exists, skipping write");
665        }
666        if let Ok(mut cache) = self.recent_trees.write() {
667            cache.insert(hash, tree.clone());
668        }
669
670        Ok(hash)
671    }
672
673    #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
674    fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
675        let tree = validate_tree_serialized(data, hash)?;
676
677        let path = hash_path(&trees_dir(&self.root), &hash);
678        if !path.exists() {
679            trace!(size = data.len(), "Writing raw serialized tree");
680            self.write_loose_object_atomic(&path, data)?;
681        }
682        if let Ok(mut cache) = self.recent_trees.write() {
683            cache.insert(hash, tree);
684        }
685
686        Ok(hash)
687    }
688
689    #[instrument(skip(self), fields(hash = %hash.short()))]
690    fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
691        if self.try_has_tree_once(hash)? {
692            return Ok(true);
693        }
694        if self.reload_packs_if_stale()? {
695            return self.try_has_tree_once(hash);
696        }
697        Ok(false)
698    }
699
700    #[instrument(skip(self), fields(id = %id.short()))]
701    fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
702        if let Some(state) = self.try_get_state_once(id)? {
703            return Ok(Some(state));
704        }
705        if self.reload_packs_if_stale()?
706            && let Some(state) = self.try_get_state_once(id)?
707        {
708            return Ok(Some(state));
709        }
710        trace!("State not found");
711        Ok(None)
712    }
713
714    #[instrument(skip(self, state), fields(id = %state.change_id.short()))]
715    fn put_state(&self, state: &State) -> Result<()> {
716        let path = state_path(&self.root, &state.change_id);
717        let serialized = rmp_serde::to_vec(state)?;
718        let data = compress(&serialized, &self.compression)?.unwrap_or(serialized);
719        trace!(compressed_size = data.len(), "Writing state");
720        self.write_loose_object_atomic(&path, &data)?;
721        if let Ok(mut cache) = self.recent_states.write() {
722            cache.insert(state.change_id, state.clone());
723        }
724        Ok(())
725    }
726
727    #[instrument(skip(self, data), fields(id = %id.short(), size = data.len()))]
728    fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
729        let state = validate_state_serialized(data, id)?;
730        let path = state_path(&self.root, &id);
731        trace!(size = data.len(), "Writing raw serialized state");
732        self.write_loose_object_atomic(&path, data)?;
733        if let Ok(mut cache) = self.recent_states.write() {
734            cache.insert(id, state);
735        }
736        Ok(())
737    }
738
739    #[instrument(skip(self), fields(id = %id.short()))]
740    fn has_state(&self, id: &ChangeId) -> Result<bool> {
741        if self.try_has_state_once(id)? {
742            return Ok(true);
743        }
744        if self.reload_packs_if_stale()? {
745            return self.try_has_state_once(id);
746        }
747        Ok(false)
748    }
749
750    #[instrument(skip(self))]
751    fn list_states(&self) -> Result<Vec<ChangeId>> {
752        self.reload_packs_if_stale()?;
753
754        let dir = states_dir(&self.root);
755        if !dir.exists() {
756            return Ok(Vec::new());
757        }
758
759        let mut states = Vec::new();
760        for entry in fs::read_dir(&dir)? {
761            let entry = entry?;
762            let path = entry.path();
763            if let Some(name) = path.file_stem()
764                && let Some(name_str) = name.to_str()
765                && let Ok(id) = ChangeId::parse(name_str)
766            {
767                states.push(id);
768            }
769        }
770        if let Ok(manager) = self.pack_manager().read() {
771            for id in manager.list_all_ids()? {
772                if let PackObjectId::ChangeId(change_id) = id
773                    && !states.contains(&change_id)
774                {
775                    states.push(change_id);
776                }
777            }
778        }
779        debug!(count = states.len(), "Listed states");
780        Ok(states)
781    }
782
783    #[instrument(skip(self), fields(id = %id))]
784    fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
785        let path = action_path(&self.root, id);
786        if !path.exists()
787            && let Ok(manager) = self.pack_manager().read()
788            && let Some((obj_type, data)) = manager.get_hashed_object(id.as_hash())?
789            && obj_type == ObjectType::Action
790        {
791            trace!("Found action in packfile");
792            let action = validate_loaded_action(id, rmp_serde::from_slice(&data)?)?;
793            return Ok(Some(action));
794        }
795        match read_file_bytes(&path)? {
796            Some(data) => {
797                trace!(size = data.as_slice().len(), "Action data read");
798                let decoded = if is_compressed(data.as_slice()) {
799                    decompress(data.as_slice())?
800                } else {
801                    data.into_vec()
802                };
803                let action = validate_loaded_action(id, rmp_serde::from_slice(&decoded)?)?;
804                Ok(Some(action))
805            }
806            None => {
807                trace!("Action not found");
808                Ok(None)
809            }
810        }
811    }
812
813    #[instrument(skip(self, action))]
814    fn put_action(&self, action: &mut Action) -> Result<ActionId> {
815        let id = action.id();
816        let path = action_path(&self.root, &id);
817
818        if !path.exists() {
819            let serialized = rmp_serde::to_vec(action)?;
820            let data = compress(&serialized, &self.compression)?.unwrap_or(serialized);
821            trace!(id = %id, compressed_size = data.len(), "Writing action");
822            self.write_loose_object_atomic(&path, &data)?;
823        }
824
825        Ok(id)
826    }
827
828    #[instrument(skip(self))]
829    fn list_actions(&self) -> Result<Vec<ActionId>> {
830        let dir = actions_dir(&self.root);
831        let mut actions = Vec::new();
832        if dir.exists() {
833            for entry in fs::read_dir(&dir)? {
834                let entry = entry?;
835                let path = entry.path();
836                if let Some(name) = path.file_stem()
837                    && let Some(name_str) = name.to_str()
838                    && let Ok(hash) = ContentHash::from_hex(name_str)
839                {
840                    actions.push(ActionId::from_hash(hash));
841                }
842            }
843        }
844        if let Ok(manager) = self.pack_manager().read() {
845            for id in manager.list_all_ids()? {
846                if let PackObjectId::Hash(hash) = id
847                    && !actions.iter().any(|action_id| action_id.as_hash() == &hash)
848                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
849                    && obj_type == ObjectType::Action
850                {
851                    actions.push(ActionId::from_hash(hash));
852                }
853            }
854        }
855        debug!(count = actions.len(), "Listed actions");
856        Ok(actions)
857    }
858
859    #[instrument(skip(self))]
860    fn list_blobs(&self) -> Result<Vec<ContentHash>> {
861        let dir = blobs_dir(&self.root);
862        let mut blobs = list_hashes_from_dir(&dir)?;
863        if let Ok(manager) = self.pack_manager().read() {
864            for id in manager.list_all_ids()? {
865                if let PackObjectId::Hash(hash) = id
866                    && !blobs.contains(&hash)
867                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
868                    && obj_type == ObjectType::Blob
869                {
870                    blobs.push(hash);
871                }
872            }
873        }
874        Ok(blobs)
875    }
876
877    #[instrument(skip(self))]
878    fn list_trees(&self) -> Result<Vec<ContentHash>> {
879        let dir = trees_dir(&self.root);
880        let mut trees = list_hashes_from_dir(&dir)?;
881        if let Ok(manager) = self.pack_manager().read() {
882            for id in manager.list_all_ids()? {
883                if let PackObjectId::Hash(hash) = id
884                    && !trees.contains(&hash)
885                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
886                    && obj_type == ObjectType::Tree
887                {
888                    trees.push(hash);
889                }
890            }
891        }
892        Ok(trees)
893    }
894
895    #[instrument(skip(self))]
896    fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
897        self.pack_objects_impl(aggressive)
898    }
899
900    #[instrument(skip(self), fields(id = ?id))]
901    fn get_pack_object(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Vec<u8>)>> {
902        if let Ok(manager) = self.pack_manager().read()
903            && let Some((obj_type, data)) = manager.get_object(id)?
904        {
905            return Ok(Some((obj_type, data)));
906        }
907
908        match id {
909            PackObjectId::Hash(hash) => {
910                if let Some(blob) = self.get_blob(hash)? {
911                    return Ok(Some((ObjectType::Blob, blob.content().to_vec())));
912                }
913                if let Some(tree) = self.get_tree(hash)? {
914                    return Ok(Some((ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)));
915                }
916                if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
917                    return Ok(Some((
918                        ObjectType::Action,
919                        rmp_serde::to_vec_named(&action)?,
920                    )));
921                }
922                Ok(None)
923            }
924            PackObjectId::ChangeId(change_id) => {
925                if let Some(state) = self.get_state(change_id)? {
926                    Ok(Some((ObjectType::State, rmp_serde::to_vec_named(&state)?)))
927                } else {
928                    Ok(None)
929                }
930            }
931        }
932    }
933
934    #[instrument(skip(self, pack_data, index_data))]
935    fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<PackObjectId>> {
936        let reader = crate::store::pack::PackReader::from_slice(pack_data, index_data)?;
937        let ids = validate_and_list_pack(&reader)?;
938        self.install_pack_files(pack_data, index_data)?;
939        self.clear_recent_object_caches();
940        Ok(ids)
941    }
942
943    #[instrument(skip(self, blobs), fields(count = blobs.len()))]
944    fn put_blobs_packed(&self, blobs: Vec<(crate::object::ContentHash, Vec<u8>)>) -> Result<()> {
945        self.put_blobs_packed_impl(blobs)
946    }
947
948    #[instrument(skip(self))]
949    fn install_pack_streaming(
950        &self,
951        pack_path: &std::path::Path,
952        index_path: &std::path::Path,
953    ) -> Result<Vec<PackObjectId>> {
954        // Validate + list ids through the same core as the byte-buffer
955        // seam, but via an mmap-backed reader so the pack is never
956        // copied into the heap — the memory-bounded promise survives.
957        // Drop the reader (releasing the mmap) before the rename so
958        // the file move isn't racing an open mapping.
959        let ids = {
960            let reader = crate::store::pack::PackReader::open(pack_path, index_path)?;
961            validate_and_list_pack(&reader)?
962        };
963        self.install_pack_files_streaming(pack_path, index_path)?;
964        Ok(ids)
965    }
966
967    #[instrument(skip(self))]
968    fn prune_loose_objects(&self) -> Result<(u64, u64)> {
969        self.prune_loose_objects_impl()
970    }
971
972    #[instrument(skip(self))]
973    fn begin_snapshot_write_batch(&self) -> Result<()> {
974        self.begin_snapshot_write_batch_impl()
975    }
976
977    #[instrument(skip(self))]
978    fn flush_snapshot_write_batch(&self) -> Result<()> {
979        self.flush_snapshot_write_batch_impl()
980    }
981
982    #[instrument(skip(self))]
983    fn abort_snapshot_write_batch(&self) {
984        self.abort_snapshot_write_batch_impl();
985    }
986
987    fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
988        Ok(redaction_path(&self.root, blob).exists())
989    }
990
991    fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
992        let path = redaction_path(&self.root, blob);
993        match fs::read(&path) {
994            Ok(bytes) => Ok(Some(bytes)),
995            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
996            Err(err) => Err(HeddleError::Io(err)),
997        }
998    }
999
1000    fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
1001        let dir = redactions_dir(&self.root);
1002        if !dir.exists() {
1003            fs::create_dir_all(&dir)?;
1004        }
1005        let path = redaction_path(&self.root, blob);
1006        crate::fs_atomic::write_file_atomic(&path, bytes)?;
1007        Ok(())
1008    }
1009
1010    fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
1011        let dir = redactions_dir(&self.root);
1012        if !dir.exists() {
1013            return Ok(Vec::new());
1014        }
1015        let mut out = Vec::new();
1016        for entry in fs::read_dir(&dir)? {
1017            let entry = entry?;
1018            let path = entry.path();
1019            if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1020                continue;
1021            }
1022            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1023                continue;
1024            };
1025            if let Ok(hash) = ContentHash::from_hex(stem) {
1026                out.push(hash);
1027            }
1028        }
1029        Ok(out)
1030    }
1031
1032    fn has_state_visibility_for_state(&self, state: &ChangeId) -> Result<bool> {
1033        Ok(state_visibility_path(&self.root, state).exists())
1034    }
1035
1036    fn get_state_visibility_bytes_for_state(&self, state: &ChangeId) -> Result<Option<Vec<u8>>> {
1037        let path = state_visibility_path(&self.root, state);
1038        match fs::read(&path) {
1039            Ok(bytes) => Ok(Some(bytes)),
1040            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
1041            Err(err) => Err(HeddleError::Io(err)),
1042        }
1043    }
1044
1045    fn put_state_visibility_bytes_for_state(&self, state: &ChangeId, bytes: &[u8]) -> Result<()> {
1046        let dir = state_visibility_dir(&self.root);
1047        if !dir.exists() {
1048            fs::create_dir_all(&dir)?;
1049        }
1050        let path = state_visibility_path(&self.root, state);
1051        crate::fs_atomic::write_file_atomic(&path, bytes)?;
1052        Ok(())
1053    }
1054
1055    fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
1056        let dir = state_visibility_dir(&self.root);
1057        if !dir.exists() {
1058            return Ok(Vec::new());
1059        }
1060        let mut out = Vec::new();
1061        for entry in fs::read_dir(&dir)? {
1062            let entry = entry?;
1063            let path = entry.path();
1064            if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1065                continue;
1066            }
1067            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1068                continue;
1069            };
1070            if let Ok(state) = ChangeId::parse(stem) {
1071                out.push(state);
1072            }
1073        }
1074        Ok(out)
1075    }
1076}