Skip to main content

objects/store/fs/
fs_impl.rs

1// SPDX-License-Identifier: Apache-2.0
2//! ObjectStore implementation for FsStore.
3
4use std::{
5    fs,
6    path::{Path, PathBuf},
7};
8
9use heddle_format::compression::{header_uncompressed_size, is_compressed};
10use tracing::{debug, instrument, trace};
11
12use super::{
13    FsStore,
14    fs_io::{list_hashes_from_dir, read_file_bytes, read_file_header},
15    fs_paths::{
16        action_path, actions_dir, blobs_dir, hash_path, redaction_path, redactions_dir, state_path,
17        state_visibility_dir, state_visibility_path, states_dir, trees_dir,
18    },
19};
20use crate::{
21    object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree},
22    store::{
23        HeddleError, ObjectStore, Result, codec,
24        pack::{ObjectType, PackManager, PackObjectId},
25    },
26};
27
/// Number of leading bytes read from a loose blob file when recovering
/// the blob's uncompressed size.
///
/// The peek has to span the 9-byte modern header **and** the 4-byte ZSTD
/// magic behind it: `header_uncompressed_size` uses that magic to tell
/// modern headers apart from legacy (5-byte) ones. If the magic is not in
/// the peek buffer the lookup silently falls back to the on-disk byte
/// length instead of the recorded uncompressed size, which is what made
/// `stat` report the compressed size of every loose blob.
const BLOB_HEADER_PEEK: usize = 9 + 4;
36
37fn validate_loaded_tree(tree: Tree) -> Result<Tree> {
38    tree.validate()?;
39    Ok(tree)
40}
41
42fn validate_blob_bytes(data: &[u8], hash: ContentHash) -> Result<()> {
43    let mut hasher = ContentHash::typed_hasher("blob", data.len() as u64);
44    hasher.update(data);
45    let found = ContentHash::from_bytes(hasher.finalize().into());
46    if found != hash {
47        return Err(HeddleError::Corruption {
48            expected: hash,
49            found,
50        });
51    }
52
53    Ok(())
54}
55
56fn validate_tree_serialized(data: &[u8], hash: ContentHash) -> Result<Tree> {
57    let tree = codec::decode_tree_serialized(data)?;
58    let tree = validate_loaded_tree(tree)?;
59    let found = tree.hash();
60    if found != hash {
61        return Err(HeddleError::Corruption {
62            expected: hash,
63            found,
64        });
65    }
66
67    Ok(tree)
68}
69
70fn validate_loaded_state(requested_id: &ChangeId, state: State) -> Result<State> {
71    if state.change_id != *requested_id {
72        return Err(HeddleError::InvalidObject(format!(
73            "state change_id mismatch: requested {}, found {}",
74            requested_id, state.change_id
75        )));
76    }
77
78    Ok(state)
79}
80
81fn validate_state_serialized(data: &[u8], id: ChangeId) -> Result<State> {
82    let state: State = rmp_serde::from_slice(data)?;
83    validate_loaded_state(&id, state)
84}
85
86fn validate_loaded_action(requested_id: &ActionId, action: Action) -> Result<Action> {
87    let found_id = action.compute_id();
88    if found_id != *requested_id {
89        return Err(HeddleError::InvalidObject(format!(
90            "action id mismatch: requested {}, found {}",
91            requested_id, found_id
92        )));
93    }
94
95    Ok(action)
96}
97
98fn validate_action_serialized(data: &[u8], id: ActionId) -> Result<Action> {
99    let action: Action = rmp_serde::from_slice(data)?;
100    validate_loaded_action(&id, action)
101}
102
103/// Validate every entry in a pack against its tagged id (checksum
104/// validation) and return the installed id list. This is the shared
105/// validated core for both install seams: the byte-buffer install
106/// (`install_pack`) and the memory-bounded temp-file install
107/// (`install_pack_streaming`) both run their pack through here, so
108/// both apply the same checksum validation and report the same
109/// installed ids regardless of how the bytes reach the store.
110fn validate_and_list_pack(reader: &crate::store::pack::PackReader) -> Result<Vec<PackObjectId>> {
111    let ids = reader.list_ids();
112    for id in &ids {
113        let Some((obj_type, data)) = reader.get_object_bytes(id)? else {
114            continue;
115        };
116        validate_pack_entry(id, obj_type, data.as_ref())?;
117    }
118    Ok(ids)
119}
120
121fn state_entries_from_pack(
122    reader: &crate::store::pack::PackReader,
123    ids: &[PackObjectId],
124) -> Result<Vec<(ChangeId, Vec<u8>)>> {
125    let mut states = Vec::new();
126    for id in ids {
127        let PackObjectId::ChangeId(change_id) = id else {
128            continue;
129        };
130        let Some((obj_type, data)) = reader.get_object(id)? else {
131            continue;
132        };
133        if obj_type != ObjectType::State {
134            return Err(HeddleError::InvalidObject(format!(
135                "pack id {} is indexed as {:?}, expected State",
136                change_id.to_string_full(),
137                obj_type
138            )));
139        }
140        validate_state_serialized(&data, *change_id)?;
141        states.push((*change_id, data));
142    }
143    Ok(states)
144}
145
146fn validate_pack_entry(id: &PackObjectId, obj_type: ObjectType, data: &[u8]) -> Result<()> {
147    match (id, obj_type) {
148        (PackObjectId::Hash(hash), ObjectType::Blob) => validate_blob_bytes(data, *hash),
149        (PackObjectId::Hash(hash), ObjectType::Tree) => {
150            validate_tree_serialized(data, *hash).map(|_| ())
151        }
152        (PackObjectId::Hash(hash), ObjectType::Action) => {
153            validate_action_serialized(data, ActionId::from_hash(*hash)).map(|_| ())
154        }
155        (PackObjectId::ChangeId(change_id), ObjectType::State) => {
156            validate_state_serialized(data, *change_id).map(|_| ())
157        }
158        _ => Err(HeddleError::InvalidObject(format!(
159            "unsupported native pack object: {:?} {:?}",
160            id, obj_type
161        ))),
162    }
163}
164
165impl FsStore {
166    /// Single-pass blob lookup. The wrapper in `ObjectStore::get_blob`
167    /// retries this once after a stale-reload on miss.
168    fn try_get_blob_once(&self, hash: &ContentHash) -> Result<Option<Blob>> {
169        let path = hash_path(&blobs_dir(&self.root), hash);
170        let loose_exists = path.exists();
171        let pack_has = if loose_exists {
172            false
173        } else if let Ok(manager) = self.pack_manager().read() {
174            manager.has_object(hash)
175        } else {
176            false
177        };
178        if (loose_exists || pack_has)
179            && let Ok(cache) = self.recent_blobs.read()
180            && let Some(blob) = cache.get(hash)
181        {
182            trace!("Found blob in recent object cache");
183            return Ok(Some(blob.clone()));
184        }
185
186        if let Ok(manager) = self.pack_manager().read()
187            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
188            && obj_type == ObjectType::Blob
189        {
190            trace!("Found blob in packfile");
191            // Step 2: skip the BLAKE3 re-hash. The pack reader already
192            // located this entry by its content-addressed key in the
193            // pack index — anything served here either matches or
194            // means the pack itself is corrupted in ways a per-read
195            // hash check can't recover from cleanly. For multi-MB
196            // blobs the verify was the dominant tail of the cold
197            // read (~3GB/s × 10MB ≈ 3.3ms per call).
198            return Ok(Some(Blob::new(data)));
199        }
200
201        match read_file_bytes(&path)? {
202            Some(data) => {
203                trace!(size = data.as_slice().len(), "Blob data read");
204                let content = codec::decode_blob_content(data.as_slice())?;
205                let blob = Blob::new(content);
206                // Loose blobs are bare bytes on disk: a half-written
207                // file or bit-rot inside the payload would slip past
208                // the path-is-the-hash invariant. Keep the verify on
209                // this path. Pack-resident reads above skip it because
210                // pack entries are framed with offset + length records
211                // that fail to parse if the pack is corrupt.
212                if blob.hash() != *hash {
213                    return Err(HeddleError::Corruption {
214                        expected: *hash,
215                        found: blob.hash(),
216                    });
217                }
218                Ok(Some(blob))
219            }
220            None => Ok(None),
221        }
222    }
223
224    /// Shared body for `try_has_{blob,tree,state}_once`: object is
225    /// present iff the loose path exists or the pack manager
226    /// resolves it. Callers pass the loose path and the
227    /// pack-manager probe; the helper handles the lock.
228    fn loose_or_packed(
229        &self,
230        loose_path: &Path,
231        in_pack: impl FnOnce(&PackManager) -> bool,
232    ) -> Result<bool> {
233        if loose_path.exists() {
234            return Ok(true);
235        }
236        if let Ok(manager) = self.pack_manager().read() {
237            return Ok(in_pack(&manager));
238        }
239        Ok(false)
240    }
241
242    fn try_has_blob_once(&self, hash: &ContentHash) -> Result<bool> {
243        let path = hash_path(&blobs_dir(&self.root), hash);
244        self.loose_or_packed(&path, |m| m.has_object(hash))
245    }
246
247    /// Header-only size lookup for a single attempt. Tries:
248    /// 1. The recent-blob cache (we already have the bytes in
249    ///    memory — `len()` is free).
250    /// 2. The loose blob: peek the 9-byte compression header. For a
251    ///    compressed blob the recorded uncompressed size lives in the
252    ///    header. For an uncompressed blob (no recognised header) the
253    ///    on-disk file length IS the blob size.
254    /// 3. Any loaded pack: the pack format records the uncompressed
255    ///    size as a varint right after the tagged id, so we can decode
256    ///    it without touching the body.
257    ///
258    /// Cost: one short read (typically 9 bytes) for loose blobs, or a
259    /// pure in-memory varint decode for packed blobs. *No*
260    /// decompression.
261    fn try_get_blob_size_once(&self, hash: &ContentHash) -> Result<Option<u64>> {
262        if let Ok(cache) = self.recent_blobs.read()
263            && let Some(blob) = cache.get(hash)
264        {
265            return Ok(Some(blob.content().len() as u64));
266        }
267
268        let path = hash_path(&blobs_dir(&self.root), hash);
269        if let Some((header, file_len)) = read_file_header(&path, BLOB_HEADER_PEEK)? {
270            if let Some(size) = header_uncompressed_size(&header) {
271                return Ok(Some(size));
272            }
273            // No recognised compression header — the file is raw
274            // blob bytes. The on-disk length is the blob size.
275            return Ok(Some(file_len));
276        }
277
278        if let Ok(manager) = self.pack_manager().read()
279            && let Some(size) = manager.get_hashed_object_size(hash)?
280        {
281            return Ok(Some(size));
282        }
283        Ok(None)
284    }
285
286    fn try_get_tree_once(&self, hash: &ContentHash) -> Result<Option<Tree>> {
287        // Cache first. The recent-object cache only ever holds trees we
288        // wrote or read this process, so a hit is authoritative for a
289        // read.
290        if let Ok(cache) = self.recent_trees.read()
291            && let Some(tree) = cache.get(hash)
292        {
293            trace!("Found tree in recent object cache");
294            return Ok(Some(tree.clone()));
295        }
296
297        // Loose trees may be migration-promoted V2 shadows of an older packed
298        // V1 encoding at the same semantic tree hash. Prefer the loose copy
299        // when it exists, then fall through to pack lookup.
300        let path = hash_path(&trees_dir(&self.root), hash);
301        if path.exists()
302            && let Some(data) = read_file_bytes(&path)?
303        {
304            trace!(size = data.as_slice().len(), "Tree data read");
305            let tree = validate_loaded_tree(codec::decode_tree(data.as_slice())?)?;
306            if tree.hash() != *hash {
307                return Err(HeddleError::Corruption {
308                    expected: *hash,
309                    found: tree.hash(),
310                });
311            }
312            if let Ok(mut cache) = self.recent_trees.write() {
313                cache.insert(*hash, tree.clone());
314            }
315            return Ok(Some(tree));
316        }
317
318        if let Ok(manager) = self.pack_manager().read()
319            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
320            && obj_type == ObjectType::Tree
321        {
322            trace!("Found tree in packfile");
323            let tree = validate_loaded_tree(codec::decode_tree_serialized(&data)?)?;
324            if tree.hash() != *hash {
325                return Err(HeddleError::Corruption {
326                    expected: *hash,
327                    found: tree.hash(),
328                });
329            }
330            return Ok(Some(tree));
331        }
332        Ok(None)
333    }
334
335    fn try_get_tree_serialized_once(&self, hash: &ContentHash) -> Result<Option<Vec<u8>>> {
336        let path = hash_path(&trees_dir(&self.root), hash);
337        if path.exists()
338            && let Some(data) = read_file_bytes(&path)?
339        {
340            return Ok(Some(codec::decode_tree_body(data.as_slice())?));
341        }
342
343        if let Ok(manager) = self.pack_manager().read()
344            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
345            && obj_type == ObjectType::Tree
346        {
347            return Ok(Some(data));
348        }
349
350        Ok(None)
351    }
352
353    fn try_has_tree_once(&self, hash: &ContentHash) -> Result<bool> {
354        let path = hash_path(&trees_dir(&self.root), hash);
355        self.loose_or_packed(&path, |m| m.has_object(hash))
356    }
357
358    fn try_get_state_once(&self, id: &ChangeId) -> Result<Option<State>> {
359        let path = state_path(&self.root, id);
360        let loose_exists = path.exists();
361        let pack_has = if loose_exists {
362            false
363        } else if let Ok(manager) = self.pack_manager().read() {
364            manager.has_object_id(&PackObjectId::ChangeId(*id))
365        } else {
366            false
367        };
368        if (loose_exists || pack_has)
369            && let Ok(cache) = self.recent_states.read()
370            && let Some(state) = cache.get(id)
371        {
372            trace!("Found state in recent object cache");
373            return Ok(Some(state.clone()));
374        }
375
376        // States are addressed by `change_id`, NOT by content hash, so the same
377        // id can resolve to two different bodies: a copy packed at adopt/import
378        // time and a newer LOOSE copy written by an authorized rewrite (the #570
379        // fidelity backfill re-hashes + rewrites adopted states loose). The
380        // loose object MUST shadow the packed one — read it FIRST and only
381        // consult the pack when no loose object exists, or a cold read (cache
382        // miss) returns the stale packed body. (Trees/blobs are
383        // content-addressed and can't go stale this way, so their read paths
384        // deliberately keep pack-first ordering.)
385        if loose_exists && let Some(data) = read_file_bytes(&path)? {
386            trace!(
387                size = data.as_slice().len(),
388                "State read from loose object (shadows any packed copy)"
389            );
390            let state = validate_loaded_state(id, codec::decode_state(data.as_slice())?)?;
391            if let Ok(mut cache) = self.recent_states.write() {
392                cache.insert(*id, state.clone());
393            }
394            return Ok(Some(state));
395        }
396
397        if let Ok(manager) = self.pack_manager().read()
398            && let Some((obj_type, data)) = manager.get_object(&PackObjectId::ChangeId(*id))?
399            && obj_type == ObjectType::State
400        {
401            trace!("Found state in packfile");
402            let state = validate_loaded_state(id, rmp_serde::from_slice(&data)?)?;
403            if let Ok(mut cache) = self.recent_states.write() {
404                cache.insert(*id, state.clone());
405            }
406            return Ok(Some(state));
407        }
408
409        Ok(None)
410    }
411
412    fn try_has_state_once(&self, id: &ChangeId) -> Result<bool> {
413        let path = state_path(&self.root, id);
414        self.loose_or_packed(&path, |m| m.has_object_id(&PackObjectId::ChangeId(*id)))
415    }
416}
417
418impl ObjectStore for FsStore {
419    fn clear_recent_caches(&self) {
420        self.clear_recent_object_caches();
421    }
422
423    /// Zero-copy pack fast path. When the blob lives in a packfile
424    /// and is non-delta + uncompressed, returns a `Bytes::slice`
425    /// view of the pack's mmap — no decompression, no allocation,
426    /// no memcpy. Compressed pack entries, delta entries, and
427    /// loose blobs fall back to `get_blob` and wrap the result in a
428    /// `Bytes` (the `Vec` → `Bytes` conversion is itself zero-copy).
429    fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
430        if let Ok(manager) = self.pack_manager().read()
431            && let Some((obj_type, data)) = manager.get_hashed_object_bytes(hash)?
432            && obj_type == crate::store::pack::ObjectType::Blob
433        {
434            return Ok(Some(data));
435        }
436        Ok(self
437            .get_blob(hash)?
438            .map(|blob| bytes::Bytes::from(blob.into_content())))
439    }
440
441    #[instrument(skip(self), fields(hash = %hash.short()))]
442    fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
443        if let Some(blob) = self.try_get_blob_once(hash)? {
444            return Ok(Some(blob));
445        }
446        // Miss path: a sibling FsStore (e.g. the worktree's repo
447        // backing the same `.heddle/`) may have installed a new pack
448        // since we loaded ours. Cheap disk-count check first; full
449        // reload only when the count grew.
450        if self.reload_packs_if_stale()?
451            && let Some(blob) = self.try_get_blob_once(hash)?
452        {
453            return Ok(Some(blob));
454        }
455        trace!("Blob not found");
456        Ok(None)
457    }
458
459    #[instrument(skip(self, blob), fields(size = blob.content().len()))]
460    fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
461        let hash = blob.hash();
462        let path = hash_path(&blobs_dir(&self.root), &hash);
463
464        if !path.exists() {
465            let data = codec::encode_blob_content(blob.content(), &self.compression)?;
466            trace!(compressed_size = data.len(), "Writing blob");
467            self.write_loose_object_atomic(&path, &data)?;
468        } else {
469            trace!("Blob already exists, skipping write");
470        }
471        if let Ok(mut cache) = self.recent_blobs.write() {
472            cache.insert(hash, blob.clone());
473        }
474
475        Ok(hash)
476    }
477
478    #[instrument(skip(self, blob), fields(hash = %hash.short()))]
479    fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
480        if blob.hash() != hash {
481            return Err(HeddleError::Corruption {
482                expected: hash,
483                found: blob.hash(),
484            });
485        }
486
487        let path = hash_path(&blobs_dir(&self.root), &hash);
488
489        if !path.exists() {
490            let data = codec::encode_blob_content(blob.content(), &self.compression)?;
491            trace!(
492                compressed_size = data.len(),
493                "Writing blob with precomputed hash"
494            );
495            self.write_loose_object_atomic(&path, &data)?;
496        }
497        if let Ok(mut cache) = self.recent_blobs.write() {
498            cache.insert(hash, blob.clone());
499        }
500
501        Ok(hash)
502    }
503
504    #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
505    fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
506        validate_blob_bytes(data, hash)?;
507
508        let path = hash_path(&blobs_dir(&self.root), &hash);
509        if !path.exists() {
510            trace!(
511                size = data.len(),
512                "Writing raw blob bytes with precomputed hash"
513            );
514            self.write_loose_object_atomic(&path, data)?;
515        }
516        if let Ok(mut cache) = self.recent_blobs.write() {
517            cache.insert(hash, Blob::from_slice(data));
518        }
519
520        Ok(hash)
521    }
522
523    #[instrument(skip(self), fields(hash = %hash.short()))]
524    fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
525        if self.try_has_blob_once(hash)? {
526            return Ok(true);
527        }
528        if self.reload_packs_if_stale()? {
529            return self.try_has_blob_once(hash);
530        }
531        Ok(false)
532    }
533
534    /// Loose blob path safe for clonefile/copy materialization.
535    ///
536    /// Returns `Some(path)` only when the loose file exists, is
537    /// stored uncompressed, *and* its bytes hash to the expected
538    /// content hash. Compressed blobs and pack-only blobs fall
539    /// through to `None`; so do *torn* cache-mirror files (the
540    /// `AtomicWriteMode::NoSync` write side may leave one if the
541    /// host crashed during a previous promote). On the torn case
542    /// the caller re-promotes from the authoritative pack copy.
543    ///
544    /// Verification is amortised: a hash that passes the check once
545    /// in this process is recorded in `verified_loose_blobs` and
546    /// subsequent calls skip the read+hash. So the cost on the
547    /// materialize hot path is at most one BLAKE3 over each unique
548    /// blob per process lifetime — negligible for tiny blobs,
549    /// bounded by working-set size for huge ones.
550    fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
551        let path = hash_path(&blobs_dir(&self.root), hash);
552        // Fast path: this process already verified (or wrote) this
553        // hash's loose mirror in `promote_to_loose_uncompressed`.
554        // Trust without re-hashing — `path.exists()` is the only
555        // I/O we need.
556        if let Ok(verified) = self.verified_loose_blobs.read()
557            && verified.get(hash).is_some()
558            && path.exists()
559        {
560            return Some(path);
561        }
562
563        // First-time-this-process check: peek the header to filter
564        // out compressed-loose files cheaply, then verify the
565        // body's hash matches what the caller expects. A torn-write
566        // (post-crash) cache mirror fails this and the caller
567        // re-promotes from the pack.
568        //
569        // Header peek must cover the 9-byte modern header **plus**
570        // the 4-byte ZSTD magic that `is_compressed` checks —
571        // peeking only 9 bytes makes `is_compressed` falsely
572        // return `false` on a properly-compressed blob, and we'd
573        // hand the caller the compressed file path. Same off-by-4
574        // we fixed in `BLOB_HEADER_PEEK`.
575        let (header, _) = read_file_header(&path, BLOB_HEADER_PEEK).ok().flatten()?;
576        if is_compressed(&header) {
577            return None;
578        }
579        let bytes = read_file_bytes(&path).ok().flatten()?;
580        let actual = ContentHash::compute_typed("blob", bytes.as_slice());
581        if actual != *hash {
582            // Torn write or unrelated corruption. Leave the file on
583            // disk; the caller's `promote_to_loose_uncompressed`
584            // will overwrite it via the standard temp+rename path.
585            return None;
586        }
587        if let Ok(mut verified) = self.verified_loose_blobs.write() {
588            verified.insert(*hash, ());
589        }
590        Some(path)
591    }
592
593    /// Promote a blob to its uncompressed-loose canonical path so
594    /// `loose_blob_path` returns `Some(path)` and hardlink-first
595    /// materialization fires.
596    ///
597    /// Three cases:
598    /// 1. Already loose+uncompressed: peek the header, no-op.
599    /// 2. Loose but compressed: read+decompress, atomically rewrite
600    ///    the canonical path with raw bytes.
601    /// 3. Pack-only: read out of the pack via `get_blob`, atomically
602    ///    write to the canonical loose path. Pack copy is left in
603    ///    place — the next prune cycle will discard the loose mirror
604    ///    and a future materialize will re-promote.
605    #[instrument(skip(self), fields(hash = %hash.short()))]
606    fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
607        let path = hash_path(&blobs_dir(&self.root), hash);
608
609        // Idempotent fast path: already loose AND uncompressed.
610        if let Some((header, _)) = read_file_header(&path, 9)?
611            && !is_compressed(&header)
612        {
613            trace!("Blob already loose+uncompressed; skipping promotion");
614            return Ok(false);
615        }
616
617        // Either compressed-loose or pack-only. Reading via
618        // `get_blob` covers both: compressed-loose decompresses on
619        // the way out, pack-only reads from the loaded pack manager.
620        let blob = self.get_blob(hash)?.ok_or_else(|| {
621            HeddleError::NotFound(format!(
622                "blob {} not found in store; cannot promote to loose-uncompressed",
623                hash
624            ))
625        })?;
626
627        // Install the uncompressed bytes at the canonical loose path
628        // via the cache-mirror atomic-write variant: no fsync, just
629        // temp+rename. The fsync skip is what makes promotion fast
630        // (measured: ~5 ms/blob with `sync_data` vs ~0.2 ms without
631        // on macOS APFS); the safety comes from the read-side hash
632        // check in `loose_blob_path`. A torn write after a crash
633        // produces a file whose content hash doesn't match, so the
634        // next reader rejects it and re-promotes from the pack.
635        //
636        // Record the hash in this process's verified-blobs cache:
637        // we just wrote the bytes ourselves, so the subsequent read
638        // path can trust them without re-hashing.
639        debug!(
640            size = blob.content().len(),
641            "Promoting blob to loose-uncompressed canonical store"
642        );
643        self.write_loose_object_cache(&path, blob.content())?;
644        if let Ok(mut verified) = self.verified_loose_blobs.write() {
645            verified.insert(*hash, ());
646        }
647        Ok(true)
648    }
649
650    #[instrument(skip(self), fields(hash = %hash.short()))]
651    fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
652        if let Some(size) = self.try_get_blob_size_once(hash)? {
653            return Ok(Some(size));
654        }
655        // Sibling-store recovery, mirroring the read path: if a
656        // concurrent writer just installed a pack we don't know about,
657        // reload and retry once before reporting a miss.
658        if self.reload_packs_if_stale()?
659            && let Some(size) = self.try_get_blob_size_once(hash)?
660        {
661            return Ok(Some(size));
662        }
663        Ok(None)
664    }
665
666    #[instrument(skip(self), fields(hash = %hash.short()))]
667    fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
668        if let Some(tree) = self.try_get_tree_once(hash)? {
669            return Ok(Some(tree));
670        }
671        if self.reload_packs_if_stale()?
672            && let Some(tree) = self.try_get_tree_once(hash)?
673        {
674            return Ok(Some(tree));
675        }
676        trace!("Tree not found");
677        Ok(None)
678    }
679
680    #[instrument(skip(self), fields(hash = %hash.short()))]
681    fn get_tree_serialized(&self, hash: &ContentHash) -> Result<Option<Vec<u8>>> {
682        if let Some(data) = self.try_get_tree_serialized_once(hash)? {
683            return Ok(Some(data));
684        }
685        if self.reload_packs_if_stale()?
686            && let Some(data) = self.try_get_tree_serialized_once(hash)?
687        {
688            return Ok(Some(data));
689        }
690        Ok(None)
691    }
692
693    #[instrument(skip(self, tree), fields(entry_count = tree.entries().len()))]
694    fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
695        let hash = tree.hash();
696        let path = hash_path(&trees_dir(&self.root), &hash);
697
698        if !path.exists() {
699            let (_, data) = codec::encode_tree(tree, &self.compression)?;
700            trace!(compressed_size = data.len(), "Writing tree");
701            self.write_loose_object_atomic(&path, &data)?;
702        } else {
703            trace!("Tree already exists, skipping write");
704        }
705        if let Ok(mut cache) = self.recent_trees.write() {
706            cache.insert(hash, tree.clone());
707        }
708
709        Ok(hash)
710    }
711
712    #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
713    fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
714        let tree = validate_tree_serialized(data, hash)?;
715
716        let path = hash_path(&trees_dir(&self.root), &hash);
717        let should_write = match read_file_bytes(&path)? {
718            Some(existing) => codec::decode_tree_body(existing.as_slice())? != data,
719            None => true,
720        };
721        if should_write {
722            trace!(size = data.len(), "Writing raw serialized tree");
723            self.write_loose_object_atomic(&path, data)?;
724        }
725        if let Ok(mut cache) = self.recent_trees.write() {
726            cache.insert(hash, tree);
727        }
728
729        Ok(hash)
730    }
731
732    #[instrument(skip(self), fields(hash = %hash.short()))]
733    fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
734        if self.try_has_tree_once(hash)? {
735            return Ok(true);
736        }
737        if self.reload_packs_if_stale()? {
738            return self.try_has_tree_once(hash);
739        }
740        Ok(false)
741    }
742
743    #[instrument(skip(self), fields(id = %id.short()))]
744    fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
745        if let Some(state) = self.try_get_state_once(id)? {
746            return Ok(Some(state));
747        }
748        if self.reload_packs_if_stale()?
749            && let Some(state) = self.try_get_state_once(id)?
750        {
751            return Ok(Some(state));
752        }
753        trace!("State not found");
754        Ok(None)
755    }
756
757    #[instrument(skip(self, state), fields(id = %state.change_id.short()))]
758    fn put_state(&self, state: &State) -> Result<()> {
759        let path = state_path(&self.root, &state.change_id);
760        let data = codec::encode_state(state, &self.compression)?;
761        trace!(compressed_size = data.len(), "Writing state");
762        self.write_loose_object_atomic(&path, &data)?;
763        if let Ok(mut cache) = self.recent_states.write() {
764            cache.insert(state.change_id, state.clone());
765        }
766        Ok(())
767    }
768
769    #[instrument(skip(self, data), fields(id = %id.short(), size = data.len()))]
770    fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
771        let state = validate_state_serialized(data, id)?;
772        let path = state_path(&self.root, &id);
773        trace!(size = data.len(), "Writing raw serialized state");
774        self.write_loose_object_atomic(&path, data)?;
775        if let Ok(mut cache) = self.recent_states.write() {
776            cache.insert(id, state);
777        }
778        Ok(())
779    }
780
781    #[instrument(skip(self), fields(id = %id.short()))]
782    fn has_state(&self, id: &ChangeId) -> Result<bool> {
783        if self.try_has_state_once(id)? {
784            return Ok(true);
785        }
786        if self.reload_packs_if_stale()? {
787            return self.try_has_state_once(id);
788        }
789        Ok(false)
790    }
791
792    #[instrument(skip(self))]
793    fn list_states(&self) -> Result<Vec<ChangeId>> {
794        self.reload_packs_if_stale()?;
795
796        let dir = states_dir(&self.root);
797        if !dir.exists() {
798            return Ok(Vec::new());
799        }
800
801        let mut states = Vec::new();
802        for entry in fs::read_dir(&dir)? {
803            let entry = entry?;
804            let path = entry.path();
805            if let Some(name) = path.file_stem()
806                && let Some(name_str) = name.to_str()
807                && let Ok(id) = ChangeId::parse(name_str)
808            {
809                states.push(id);
810            }
811        }
812        if let Ok(manager) = self.pack_manager().read() {
813            for id in manager.list_all_ids()? {
814                if let PackObjectId::ChangeId(change_id) = id
815                    && !states.contains(&change_id)
816                {
817                    states.push(change_id);
818                }
819            }
820        }
821        debug!(count = states.len(), "Listed states");
822        Ok(states)
823    }
824
825    #[instrument(skip(self), fields(id = %id))]
826    fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
827        let path = action_path(&self.root, id);
828        if !path.exists()
829            && let Ok(manager) = self.pack_manager().read()
830            && let Some((obj_type, data)) = manager.get_hashed_object(id.as_hash())?
831            && obj_type == ObjectType::Action
832        {
833            trace!("Found action in packfile");
834            let action = validate_loaded_action(id, rmp_serde::from_slice(&data)?)?;
835            return Ok(Some(action));
836        }
837        match read_file_bytes(&path)? {
838            Some(data) => {
839                trace!(size = data.as_slice().len(), "Action data read");
840                let action = validate_loaded_action(id, codec::decode_action(data.as_slice())?)?;
841                Ok(Some(action))
842            }
843            None => {
844                trace!("Action not found");
845                Ok(None)
846            }
847        }
848    }
849
850    #[instrument(skip(self, action))]
851    fn put_action(&self, action: &mut Action) -> Result<ActionId> {
852        let id = action.id();
853        let path = action_path(&self.root, &id);
854
855        if !path.exists() {
856            let (_, data) = codec::encode_action(action, &self.compression)?;
857            trace!(id = %id, compressed_size = data.len(), "Writing action");
858            self.write_loose_object_atomic(&path, &data)?;
859        }
860
861        Ok(id)
862    }
863
864    #[instrument(skip(self))]
865    fn list_actions(&self) -> Result<Vec<ActionId>> {
866        let dir = actions_dir(&self.root);
867        let mut actions = Vec::new();
868        if dir.exists() {
869            for entry in fs::read_dir(&dir)? {
870                let entry = entry?;
871                let path = entry.path();
872                if let Some(name) = path.file_stem()
873                    && let Some(name_str) = name.to_str()
874                    && let Ok(hash) = ContentHash::from_hex(name_str)
875                {
876                    actions.push(ActionId::from_hash(hash));
877                }
878            }
879        }
880        if let Ok(manager) = self.pack_manager().read() {
881            for id in manager.list_all_ids()? {
882                if let PackObjectId::Hash(hash) = id
883                    && !actions.iter().any(|action_id| action_id.as_hash() == &hash)
884                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
885                    && obj_type == ObjectType::Action
886                {
887                    actions.push(ActionId::from_hash(hash));
888                }
889            }
890        }
891        debug!(count = actions.len(), "Listed actions");
892        Ok(actions)
893    }
894
895    #[instrument(skip(self))]
896    fn list_blobs(&self) -> Result<Vec<ContentHash>> {
897        let dir = blobs_dir(&self.root);
898        let mut blobs = list_hashes_from_dir(&dir)?;
899        if let Ok(manager) = self.pack_manager().read() {
900            for id in manager.list_all_ids()? {
901                if let PackObjectId::Hash(hash) = id
902                    && !blobs.contains(&hash)
903                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
904                    && obj_type == ObjectType::Blob
905                {
906                    blobs.push(hash);
907                }
908            }
909        }
910        Ok(blobs)
911    }
912
913    #[instrument(skip(self))]
914    fn list_trees(&self) -> Result<Vec<ContentHash>> {
915        let dir = trees_dir(&self.root);
916        let mut trees = list_hashes_from_dir(&dir)?;
917        if let Ok(manager) = self.pack_manager().read() {
918            for id in manager.list_all_ids()? {
919                if let PackObjectId::Hash(hash) = id
920                    && !trees.contains(&hash)
921                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
922                    && obj_type == ObjectType::Tree
923                {
924                    trees.push(hash);
925                }
926            }
927        }
928        Ok(trees)
929    }
930
931    #[instrument(skip(self))]
932    fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
933        self.pack_objects_impl(aggressive)
934    }
935
936    #[instrument(skip(self), fields(id = ?id))]
937    fn get_pack_object(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Vec<u8>)>> {
938        if let Ok(manager) = self.pack_manager().read()
939            && let Some((obj_type, data)) = manager.get_object(id)?
940        {
941            return Ok(Some((obj_type, data)));
942        }
943
944        match id {
945            PackObjectId::Hash(hash) => {
946                if let Some(blob) = self.get_blob(hash)? {
947                    return Ok(Some((ObjectType::Blob, blob.content().to_vec())));
948                }
949                if let Some(tree) = self.get_tree(hash)? {
950                    return Ok(Some((ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)));
951                }
952                if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
953                    return Ok(Some((
954                        ObjectType::Action,
955                        rmp_serde::to_vec_named(&action)?,
956                    )));
957                }
958                Ok(None)
959            }
960            PackObjectId::ChangeId(change_id) => {
961                if let Some(state) = self.get_state(change_id)? {
962                    Ok(Some((ObjectType::State, rmp_serde::to_vec_named(&state)?)))
963                } else {
964                    Ok(None)
965                }
966            }
967        }
968    }
969
970    #[instrument(skip(self, pack_data, index_data))]
971    fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<PackObjectId>> {
972        let reader = crate::store::pack::PackReader::from_slice(pack_data, index_data)?;
973        let ids = validate_and_list_pack(&reader)?;
974        let state_entries = state_entries_from_pack(&reader, &ids)?;
975        self.install_pack_files(pack_data, index_data)?;
976        for (id, data) in state_entries {
977            self.put_state_serialized(&data, id)?;
978        }
979        self.clear_recent_object_caches();
980        Ok(ids)
981    }
982
983    #[instrument(skip(self, blobs), fields(count = blobs.len()))]
984    fn put_blobs_packed(&self, blobs: Vec<(crate::object::ContentHash, Vec<u8>)>) -> Result<()> {
985        self.put_blobs_packed_impl(blobs)
986    }
987
988    #[instrument(skip(self))]
989    fn install_pack_streaming(
990        &self,
991        pack_path: &std::path::Path,
992        index_path: &std::path::Path,
993    ) -> Result<Vec<PackObjectId>> {
994        // Validate + list ids through the same core as the byte-buffer
995        // seam, but via an mmap-backed reader so the pack is never
996        // copied into the heap — the memory-bounded promise survives.
997        // Drop the reader (releasing the mmap) before the rename so
998        // the file move isn't racing an open mapping.
999        let ids = {
1000            let reader = crate::store::pack::PackReader::open(pack_path, index_path)?;
1001            validate_and_list_pack(&reader)?
1002        };
1003        let state_entries = {
1004            let reader = crate::store::pack::PackReader::open(pack_path, index_path)?;
1005            state_entries_from_pack(&reader, &ids)?
1006        };
1007        self.install_pack_files_streaming(pack_path, index_path)?;
1008        for (id, data) in state_entries {
1009            self.put_state_serialized(&data, id)?;
1010        }
1011        Ok(ids)
1012    }
1013
1014    #[instrument(skip(self))]
1015    fn prune_loose_objects(&self) -> Result<(u64, u64)> {
1016        self.prune_loose_objects_impl()
1017    }
1018
1019    #[instrument(skip(self))]
1020    fn begin_snapshot_write_batch(&self) -> Result<()> {
1021        self.begin_snapshot_write_batch_impl()
1022    }
1023
1024    #[instrument(skip(self))]
1025    fn flush_snapshot_write_batch(&self) -> Result<()> {
1026        self.flush_snapshot_write_batch_impl()
1027    }
1028
1029    #[instrument(skip(self))]
1030    fn abort_snapshot_write_batch(&self) {
1031        self.abort_snapshot_write_batch_impl();
1032    }
1033
1034    fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
1035        Ok(redaction_path(&self.root, blob).exists())
1036    }
1037
1038    fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
1039        let path = redaction_path(&self.root, blob);
1040        match fs::read(&path) {
1041            Ok(bytes) => Ok(Some(bytes)),
1042            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
1043            Err(err) => Err(HeddleError::Io(err)),
1044        }
1045    }
1046
1047    fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
1048        let dir = redactions_dir(&self.root);
1049        if !dir.exists() {
1050            fs::create_dir_all(&dir)?;
1051        }
1052        let path = redaction_path(&self.root, blob);
1053        crate::fs_atomic::write_file_atomic(&path, bytes)?;
1054        Ok(())
1055    }
1056
1057    fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
1058        let dir = redactions_dir(&self.root);
1059        if !dir.exists() {
1060            return Ok(Vec::new());
1061        }
1062        let mut out = Vec::new();
1063        for entry in fs::read_dir(&dir)? {
1064            let entry = entry?;
1065            let path = entry.path();
1066            if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1067                continue;
1068            }
1069            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1070                continue;
1071            };
1072            if let Ok(hash) = ContentHash::from_hex(stem) {
1073                out.push(hash);
1074            }
1075        }
1076        Ok(out)
1077    }
1078
1079    fn has_state_visibility_for_state(&self, state: &ChangeId) -> Result<bool> {
1080        Ok(state_visibility_path(&self.root, state).exists())
1081    }
1082
1083    fn get_state_visibility_bytes_for_state(&self, state: &ChangeId) -> Result<Option<Vec<u8>>> {
1084        let path = state_visibility_path(&self.root, state);
1085        match fs::read(&path) {
1086            Ok(bytes) => Ok(Some(bytes)),
1087            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
1088            Err(err) => Err(HeddleError::Io(err)),
1089        }
1090    }
1091
1092    fn put_state_visibility_bytes_for_state(&self, state: &ChangeId, bytes: &[u8]) -> Result<()> {
1093        let dir = state_visibility_dir(&self.root);
1094        if !dir.exists() {
1095            fs::create_dir_all(&dir)?;
1096        }
1097        let path = state_visibility_path(&self.root, state);
1098        crate::fs_atomic::write_file_atomic(&path, bytes)?;
1099        Ok(())
1100    }
1101
1102    fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
1103        let dir = state_visibility_dir(&self.root);
1104        if !dir.exists() {
1105            return Ok(Vec::new());
1106        }
1107        let mut out = Vec::new();
1108        for entry in fs::read_dir(&dir)? {
1109            let entry = entry?;
1110            let path = entry.path();
1111            if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1112                continue;
1113            }
1114            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1115                continue;
1116            };
1117            if let Ok(state) = ChangeId::parse(stem) {
1118                out.push(state);
1119            }
1120        }
1121        Ok(out)
1122    }
1123}