Skip to main content

objects/store/fs/
fs_impl.rs

1// SPDX-License-Identifier: Apache-2.0
2//! ObjectStore implementation for FsStore.
3
4use std::{
5    fs,
6    path::{Path, PathBuf},
7};
8
9use tracing::{debug, instrument, trace};
10
11use super::{
12    FsStore,
13    fs_io::{list_hashes_from_dir, read_file_bytes, read_file_header},
14    fs_paths::{
15        action_path, actions_dir, blobs_dir, hash_path, redaction_path, redactions_dir, state_path,
16        states_dir, trees_dir,
17    },
18};
19use crate::{
20    object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree},
21    store::{
22        HeddleError, ObjectStore, Result,
23        compression::{compress, decompress, header_uncompressed_size, is_compressed},
24        pack::{ObjectType, PackManager, PackObjectId},
25    },
26};
27
/// Number of leading bytes read from a loose blob file when only the
/// blob's uncompressed size is wanted. Nine bytes covers the modern
/// compression header and also the shorter 5-byte legacy header;
/// `header_uncompressed_size` decides which layout applies.
const BLOB_HEADER_PEEK: usize = 9;
33
34fn validate_loaded_tree(tree: Tree) -> Result<Tree> {
35    tree.validate()?;
36    Ok(tree)
37}
38
39fn validate_loaded_state(requested_id: &ChangeId, state: State) -> Result<State> {
40    if state.change_id != *requested_id {
41        return Err(HeddleError::InvalidObject(format!(
42            "state change_id mismatch: requested {}, found {}",
43            requested_id, state.change_id
44        )));
45    }
46
47    Ok(state)
48}
49
50fn validate_loaded_action(requested_id: &ActionId, action: Action) -> Result<Action> {
51    let found_id = action.compute_id();
52    if found_id != *requested_id {
53        return Err(HeddleError::InvalidObject(format!(
54            "action id mismatch: requested {}, found {}",
55            requested_id, found_id
56        )));
57    }
58
59    Ok(action)
60}
61
62impl FsStore {
63    /// Single-pass blob lookup. The wrapper in `ObjectStore::get_blob`
64    /// retries this once after a stale-reload on miss.
65    fn try_get_blob_once(&self, hash: &ContentHash) -> Result<Option<Blob>> {
66        let path = hash_path(&blobs_dir(&self.root), hash);
67        let loose_exists = path.exists();
68        let pack_has = if loose_exists {
69            false
70        } else if let Ok(manager) = self.pack_manager().read() {
71            manager.has_object(hash)
72        } else {
73            false
74        };
75        if (loose_exists || pack_has)
76            && let Ok(cache) = self.recent_blobs.read()
77            && let Some(blob) = cache.get(hash)
78        {
79            trace!("Found blob in recent object cache");
80            return Ok(Some(blob.clone()));
81        }
82
83        if let Ok(manager) = self.pack_manager().read()
84            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
85            && obj_type == ObjectType::Blob
86        {
87            trace!("Found blob in packfile");
88            let blob = Blob::new(data);
89            if blob.hash() != *hash {
90                return Err(HeddleError::Corruption {
91                    expected: *hash,
92                    found: blob.hash(),
93                });
94            }
95            return Ok(Some(blob));
96        }
97
98        match read_file_bytes(&path)? {
99            Some(data) => {
100                trace!(size = data.as_slice().len(), "Blob data read");
101                let content = if is_compressed(data.as_slice()) {
102                    decompress(data.as_slice())?
103                } else {
104                    data.into_vec()
105                };
106                let blob = Blob::new(content);
107                if blob.hash() != *hash {
108                    return Err(HeddleError::Corruption {
109                        expected: *hash,
110                        found: blob.hash(),
111                    });
112                }
113                if let Ok(mut cache) = self.recent_blobs.write() {
114                    cache.insert(*hash, blob.clone());
115                }
116                Ok(Some(blob))
117            }
118            None => Ok(None),
119        }
120    }
121
122    /// Shared body for `try_has_{blob,tree,state}_once`: object is
123    /// present iff the loose path exists or the pack manager
124    /// resolves it. Callers pass the loose path and the
125    /// pack-manager probe; the helper handles the lock.
126    fn loose_or_packed(
127        &self,
128        loose_path: &Path,
129        in_pack: impl FnOnce(&PackManager) -> bool,
130    ) -> Result<bool> {
131        if loose_path.exists() {
132            return Ok(true);
133        }
134        if let Ok(manager) = self.pack_manager().read() {
135            return Ok(in_pack(&manager));
136        }
137        Ok(false)
138    }
139
140    fn try_has_blob_once(&self, hash: &ContentHash) -> Result<bool> {
141        let path = hash_path(&blobs_dir(&self.root), hash);
142        self.loose_or_packed(&path, |m| m.has_object(hash))
143    }
144
145    /// Header-only size lookup for a single attempt. Tries:
146    /// 1. The recent-blob cache (we already have the bytes in
147    ///    memory — `len()` is free).
148    /// 2. The loose blob: peek the 9-byte compression header. For a
149    ///    compressed blob the recorded uncompressed size lives in the
150    ///    header. For an uncompressed blob (no recognised header) the
151    ///    on-disk file length IS the blob size.
152    /// 3. Any loaded pack: the pack format records the uncompressed
153    ///    size as a varint right after the tagged id, so we can decode
154    ///    it without touching the body.
155    ///
156    /// Cost: one short read (typically 9 bytes) for loose blobs, or a
157    /// pure in-memory varint decode for packed blobs. *No*
158    /// decompression.
159    fn try_get_blob_size_once(&self, hash: &ContentHash) -> Result<Option<u64>> {
160        if let Ok(cache) = self.recent_blobs.read()
161            && let Some(blob) = cache.get(hash)
162        {
163            return Ok(Some(blob.content().len() as u64));
164        }
165
166        let path = hash_path(&blobs_dir(&self.root), hash);
167        if let Some((header, file_len)) = read_file_header(&path, BLOB_HEADER_PEEK)? {
168            if let Some(size) = header_uncompressed_size(&header) {
169                return Ok(Some(size));
170            }
171            // No recognised compression header — the file is raw
172            // blob bytes. The on-disk length is the blob size.
173            return Ok(Some(file_len));
174        }
175
176        if let Ok(manager) = self.pack_manager().read()
177            && let Some(size) = manager.get_hashed_object_size(hash)?
178        {
179            return Ok(Some(size));
180        }
181        Ok(None)
182    }
183
184    fn try_get_tree_once(&self, hash: &ContentHash) -> Result<Option<Tree>> {
185        let path = hash_path(&trees_dir(&self.root), hash);
186        let loose_exists = path.exists();
187        let pack_has = if loose_exists {
188            false
189        } else if let Ok(manager) = self.pack_manager().read() {
190            manager.has_object(hash)
191        } else {
192            false
193        };
194        if (loose_exists || pack_has)
195            && let Ok(cache) = self.recent_trees.read()
196            && let Some(tree) = cache.get(hash)
197        {
198            trace!("Found tree in recent object cache");
199            return Ok(Some(tree.clone()));
200        }
201
202        if let Ok(manager) = self.pack_manager().read()
203            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
204            && obj_type == ObjectType::Tree
205        {
206            trace!("Found tree in packfile");
207            let tree = validate_loaded_tree(rmp_serde::from_slice(&data)?)?;
208            if tree.hash() != *hash {
209                return Err(HeddleError::Corruption {
210                    expected: *hash,
211                    found: tree.hash(),
212                });
213            }
214            return Ok(Some(tree));
215        }
216
217        match read_file_bytes(&path)? {
218            Some(data) => {
219                trace!(size = data.as_slice().len(), "Tree data read");
220                let decoded = if is_compressed(data.as_slice()) {
221                    decompress(data.as_slice())?
222                } else {
223                    data.into_vec()
224                };
225                let tree = validate_loaded_tree(rmp_serde::from_slice(&decoded)?)?;
226                if tree.hash() != *hash {
227                    return Err(HeddleError::Corruption {
228                        expected: *hash,
229                        found: tree.hash(),
230                    });
231                }
232                if let Ok(mut cache) = self.recent_trees.write() {
233                    cache.insert(*hash, tree.clone());
234                }
235                Ok(Some(tree))
236            }
237            None => Ok(None),
238        }
239    }
240
241    fn try_has_tree_once(&self, hash: &ContentHash) -> Result<bool> {
242        let path = hash_path(&trees_dir(&self.root), hash);
243        self.loose_or_packed(&path, |m| m.has_object(hash))
244    }
245
246    fn try_get_state_once(&self, id: &ChangeId) -> Result<Option<State>> {
247        let path = state_path(&self.root, id);
248        let loose_exists = path.exists();
249        let pack_has = if loose_exists {
250            false
251        } else if let Ok(manager) = self.pack_manager().read() {
252            manager.has_object_id(&PackObjectId::ChangeId(*id))
253        } else {
254            false
255        };
256        if (loose_exists || pack_has)
257            && let Ok(cache) = self.recent_states.read()
258            && let Some(state) = cache.get(id)
259        {
260            trace!("Found state in recent object cache");
261            return Ok(Some(state.clone()));
262        }
263
264        if let Ok(manager) = self.pack_manager().read()
265            && let Some((obj_type, data)) = manager.get_object(&PackObjectId::ChangeId(*id))?
266            && obj_type == ObjectType::State
267        {
268            trace!("Found state in packfile");
269            let state = validate_loaded_state(id, rmp_serde::from_slice(&data)?)?;
270            if let Ok(mut cache) = self.recent_states.write() {
271                cache.insert(*id, state.clone());
272            }
273            return Ok(Some(state));
274        }
275
276        match read_file_bytes(&path)? {
277            Some(data) => {
278                trace!(size = data.as_slice().len(), "State data read");
279                let decoded = if is_compressed(data.as_slice()) {
280                    decompress(data.as_slice())?
281                } else {
282                    data.into_vec()
283                };
284                let state = validate_loaded_state(id, rmp_serde::from_slice(&decoded)?)?;
285                if let Ok(mut cache) = self.recent_states.write() {
286                    cache.insert(*id, state.clone());
287                }
288                Ok(Some(state))
289            }
290            None => Ok(None),
291        }
292    }
293
294    fn try_has_state_once(&self, id: &ChangeId) -> Result<bool> {
295        let path = state_path(&self.root, id);
296        self.loose_or_packed(&path, |m| m.has_object_id(&PackObjectId::ChangeId(*id)))
297    }
298}
299
300impl ObjectStore for FsStore {
301    #[instrument(skip(self), fields(hash = %hash.short()))]
302    fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
303        if let Some(blob) = self.try_get_blob_once(hash)? {
304            return Ok(Some(blob));
305        }
306        // Miss path: a sibling FsStore (e.g. the worktree's repo
307        // backing the same `.heddle/`) may have installed a new pack
308        // since we loaded ours. Cheap disk-count check first; full
309        // reload only when the count grew.
310        if self.reload_packs_if_stale()?
311            && let Some(blob) = self.try_get_blob_once(hash)?
312        {
313            return Ok(Some(blob));
314        }
315        trace!("Blob not found");
316        Ok(None)
317    }
318
319    #[instrument(skip(self, blob), fields(size = blob.content().len()))]
320    fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
321        let hash = blob.hash();
322        let path = hash_path(&blobs_dir(&self.root), &hash);
323
324        if !path.exists() {
325            let content = blob.content();
326            let data = compress(content, &self.compression)?.unwrap_or_else(|| content.to_vec());
327            trace!(compressed_size = data.len(), "Writing blob");
328            self.write_loose_object_atomic(&path, &data)?;
329        } else {
330            trace!("Blob already exists, skipping write");
331        }
332        if let Ok(mut cache) = self.recent_blobs.write() {
333            cache.insert(hash, blob.clone());
334        }
335
336        Ok(hash)
337    }
338
339    #[instrument(skip(self, blob), fields(hash = %hash.short()))]
340    fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
341        if blob.hash() != hash {
342            return Err(HeddleError::Corruption {
343                expected: hash,
344                found: blob.hash(),
345            });
346        }
347
348        let path = hash_path(&blobs_dir(&self.root), &hash);
349
350        if !path.exists() {
351            let content = blob.content();
352            let data = compress(content, &self.compression)?.unwrap_or_else(|| content.to_vec());
353            trace!(
354                compressed_size = data.len(),
355                "Writing blob with precomputed hash"
356            );
357            self.write_loose_object_atomic(&path, &data)?;
358        }
359        if let Ok(mut cache) = self.recent_blobs.write() {
360            cache.insert(hash, blob.clone());
361        }
362
363        Ok(hash)
364    }
365
366    #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
367    fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
368        let found = ContentHash::compute_typed("blob", data);
369        if found != hash {
370            return Err(HeddleError::Corruption {
371                expected: hash,
372                found,
373            });
374        }
375
376        let path = hash_path(&blobs_dir(&self.root), &hash);
377        if !path.exists() {
378            trace!(
379                size = data.len(),
380                "Writing raw blob bytes with precomputed hash"
381            );
382            self.write_loose_object_atomic(&path, data)?;
383        }
384        if let Ok(mut cache) = self.recent_blobs.write() {
385            cache.insert(hash, Blob::from_slice(data));
386        }
387
388        Ok(hash)
389    }
390
391    #[instrument(skip(self), fields(hash = %hash.short()))]
392    fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
393        if self.try_has_blob_once(hash)? {
394            return Ok(true);
395        }
396        if self.reload_packs_if_stale()? {
397            return self.try_has_blob_once(hash);
398        }
399        Ok(false)
400    }
401
402    /// Loose blob path safe for hardlink/clonefile materialization.
403    ///
404    /// Returns `Some(path)` only when the loose file exists *and* is
405    /// stored uncompressed — then the on-disk bytes are byte-identical
406    /// to the blob's content, so a hard link materializes the worktree
407    /// file without an extra copy. Compressed blobs and pack-only blobs
408    /// fall through to `None` and the caller writes decompressed bytes
409    /// the slow way.
410    fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
411        let path = hash_path(&blobs_dir(&self.root), hash);
412        // 9 bytes is enough to recognise the modern compression header
413        // (LEGACY_COMPRESSED_HEADER_LEN = 5 also fits inside).
414        let header = read_file_header(&path, 9).ok().flatten()?;
415        if is_compressed(&header.0) {
416            return None;
417        }
418        Some(path)
419    }
420
421    /// Promote a blob to its uncompressed-loose canonical path so
422    /// `loose_blob_path` returns `Some(path)` and hardlink-first
423    /// materialization fires.
424    ///
425    /// Three cases:
426    /// 1. Already loose+uncompressed: peek the header, no-op.
427    /// 2. Loose but compressed: read+decompress, atomically rewrite
428    ///    the canonical path with raw bytes.
429    /// 3. Pack-only: read out of the pack via `get_blob`, atomically
430    ///    write to the canonical loose path. Pack copy is left in
431    ///    place — the next prune cycle will discard the loose mirror
432    ///    and a future materialize will re-promote.
433    #[instrument(skip(self), fields(hash = %hash.short()))]
434    fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
435        let path = hash_path(&blobs_dir(&self.root), hash);
436
437        // Idempotent fast path: already loose AND uncompressed.
438        if let Some((header, _)) = read_file_header(&path, 9)?
439            && !is_compressed(&header)
440        {
441            trace!("Blob already loose+uncompressed; skipping promotion");
442            return Ok(false);
443        }
444
445        // Either compressed-loose or pack-only. Reading via
446        // `get_blob` covers both: compressed-loose decompresses on
447        // the way out, pack-only reads from the loaded pack manager.
448        let blob = self.get_blob(hash)?.ok_or_else(|| {
449            HeddleError::NotFound(format!(
450                "blob {} not found in store; cannot promote to loose-uncompressed",
451                hash
452            ))
453        })?;
454
455        // Atomically install the uncompressed bytes at the canonical
456        // loose path. `write_loose_object_atomic` writes to a temp
457        // path in the same parent dir and `rename(2)`s — so a
458        // concurrent reader either sees the old contents (compressed
459        // header → falls through to `get_blob` → still correct) or
460        // the new contents (uncompressed → safe to hardlink).
461        debug!(
462            size = blob.content().len(),
463            "Promoting blob to loose-uncompressed canonical store"
464        );
465        self.write_loose_object_atomic(&path, blob.content())?;
466        Ok(true)
467    }
468
469    #[instrument(skip(self), fields(hash = %hash.short()))]
470    fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
471        if let Some(size) = self.try_get_blob_size_once(hash)? {
472            return Ok(Some(size));
473        }
474        // Sibling-store recovery, mirroring the read path: if a
475        // concurrent writer just installed a pack we don't know about,
476        // reload and retry once before reporting a miss.
477        if self.reload_packs_if_stale()?
478            && let Some(size) = self.try_get_blob_size_once(hash)?
479        {
480            return Ok(Some(size));
481        }
482        Ok(None)
483    }
484
485    #[instrument(skip(self), fields(hash = %hash.short()))]
486    fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
487        if let Some(tree) = self.try_get_tree_once(hash)? {
488            return Ok(Some(tree));
489        }
490        if self.reload_packs_if_stale()?
491            && let Some(tree) = self.try_get_tree_once(hash)?
492        {
493            return Ok(Some(tree));
494        }
495        trace!("Tree not found");
496        Ok(None)
497    }
498
499    #[instrument(skip(self, tree), fields(entry_count = tree.entries().len()))]
500    fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
501        let hash = tree.hash();
502        let path = hash_path(&trees_dir(&self.root), &hash);
503
504        if !path.exists() {
505            let serialized = rmp_serde::to_vec(tree)?;
506            let data = compress(&serialized, &self.compression)?.unwrap_or(serialized);
507            trace!(compressed_size = data.len(), "Writing tree");
508            self.write_loose_object_atomic(&path, &data)?;
509        } else {
510            trace!("Tree already exists, skipping write");
511        }
512        if let Ok(mut cache) = self.recent_trees.write() {
513            cache.insert(hash, tree.clone());
514        }
515
516        Ok(hash)
517    }
518
519    #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
520    fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
521        let tree: Tree = rmp_serde::from_slice(data)?;
522        validate_loaded_tree(tree.clone())?;
523        let found = tree.hash();
524        if found != hash {
525            return Err(HeddleError::Corruption {
526                expected: hash,
527                found,
528            });
529        }
530
531        let path = hash_path(&trees_dir(&self.root), &hash);
532        if !path.exists() {
533            trace!(size = data.len(), "Writing raw serialized tree");
534            self.write_loose_object_atomic(&path, data)?;
535        }
536        if let Ok(mut cache) = self.recent_trees.write() {
537            cache.insert(hash, tree);
538        }
539
540        Ok(hash)
541    }
542
543    #[instrument(skip(self), fields(hash = %hash.short()))]
544    fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
545        if self.try_has_tree_once(hash)? {
546            return Ok(true);
547        }
548        if self.reload_packs_if_stale()? {
549            return self.try_has_tree_once(hash);
550        }
551        Ok(false)
552    }
553
554    #[instrument(skip(self), fields(id = %id.short()))]
555    fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
556        if let Some(state) = self.try_get_state_once(id)? {
557            return Ok(Some(state));
558        }
559        if self.reload_packs_if_stale()?
560            && let Some(state) = self.try_get_state_once(id)?
561        {
562            return Ok(Some(state));
563        }
564        trace!("State not found");
565        Ok(None)
566    }
567
568    #[instrument(skip(self, state), fields(id = %state.change_id.short()))]
569    fn put_state(&self, state: &State) -> Result<()> {
570        let path = state_path(&self.root, &state.change_id);
571        let serialized = rmp_serde::to_vec(state)?;
572        let data = compress(&serialized, &self.compression)?.unwrap_or(serialized);
573        trace!(compressed_size = data.len(), "Writing state");
574        self.write_loose_object_atomic(&path, &data)?;
575        if let Ok(mut cache) = self.recent_states.write() {
576            cache.insert(state.change_id, state.clone());
577        }
578        Ok(())
579    }
580
581    #[instrument(skip(self, data), fields(id = %id.short(), size = data.len()))]
582    fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
583        let state: State = rmp_serde::from_slice(data)?;
584        if state.change_id != id {
585            return Err(HeddleError::InvalidObject(format!(
586                "state change_id mismatch: expected {}, found {}",
587                id, state.change_id
588            )));
589        }
590        let path = state_path(&self.root, &id);
591        trace!(size = data.len(), "Writing raw serialized state");
592        self.write_loose_object_atomic(&path, data)?;
593        if let Ok(mut cache) = self.recent_states.write() {
594            cache.insert(id, state);
595        }
596        Ok(())
597    }
598
599    #[instrument(skip(self), fields(id = %id.short()))]
600    fn has_state(&self, id: &ChangeId) -> Result<bool> {
601        if self.try_has_state_once(id)? {
602            return Ok(true);
603        }
604        if self.reload_packs_if_stale()? {
605            return self.try_has_state_once(id);
606        }
607        Ok(false)
608    }
609
610    #[instrument(skip(self))]
611    fn list_states(&self) -> Result<Vec<ChangeId>> {
612        let dir = states_dir(&self.root);
613        if !dir.exists() {
614            return Ok(Vec::new());
615        }
616
617        let mut states = Vec::new();
618        for entry in fs::read_dir(&dir)? {
619            let entry = entry?;
620            let path = entry.path();
621            if let Some(name) = path.file_stem()
622                && let Some(name_str) = name.to_str()
623                && let Ok(id) = ChangeId::parse(name_str)
624            {
625                states.push(id);
626            }
627        }
628        if let Ok(manager) = self.pack_manager().read() {
629            for id in manager.list_all_ids()? {
630                if let PackObjectId::ChangeId(change_id) = id
631                    && !states.contains(&change_id)
632                {
633                    states.push(change_id);
634                }
635            }
636        }
637        debug!(count = states.len(), "Listed states");
638        Ok(states)
639    }
640
641    #[instrument(skip(self), fields(id = %id))]
642    fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
643        let path = action_path(&self.root, id);
644        if !path.exists()
645            && let Ok(manager) = self.pack_manager().read()
646            && let Some((obj_type, data)) = manager.get_hashed_object(id.as_hash())?
647            && obj_type == ObjectType::Action
648        {
649            trace!("Found action in packfile");
650            let action = validate_loaded_action(id, rmp_serde::from_slice(&data)?)?;
651            return Ok(Some(action));
652        }
653        match read_file_bytes(&path)? {
654            Some(data) => {
655                trace!(size = data.as_slice().len(), "Action data read");
656                let decoded = if is_compressed(data.as_slice()) {
657                    decompress(data.as_slice())?
658                } else {
659                    data.into_vec()
660                };
661                let action = validate_loaded_action(id, rmp_serde::from_slice(&decoded)?)?;
662                Ok(Some(action))
663            }
664            None => {
665                trace!("Action not found");
666                Ok(None)
667            }
668        }
669    }
670
671    #[instrument(skip(self, action))]
672    fn put_action(&self, action: &mut Action) -> Result<ActionId> {
673        let id = action.id();
674        let path = action_path(&self.root, &id);
675
676        if !path.exists() {
677            let serialized = rmp_serde::to_vec(action)?;
678            let data = compress(&serialized, &self.compression)?.unwrap_or(serialized);
679            trace!(id = %id, compressed_size = data.len(), "Writing action");
680            self.write_loose_object_atomic(&path, &data)?;
681        }
682
683        Ok(id)
684    }
685
686    #[instrument(skip(self))]
687    fn list_actions(&self) -> Result<Vec<ActionId>> {
688        let dir = actions_dir(&self.root);
689        let mut actions = Vec::new();
690        if dir.exists() {
691            for entry in fs::read_dir(&dir)? {
692                let entry = entry?;
693                let path = entry.path();
694                if let Some(name) = path.file_stem()
695                    && let Some(name_str) = name.to_str()
696                    && let Ok(hash) = ContentHash::from_hex(name_str)
697                {
698                    actions.push(ActionId::from_hash(hash));
699                }
700            }
701        }
702        if let Ok(manager) = self.pack_manager().read() {
703            for id in manager.list_all_ids()? {
704                if let PackObjectId::Hash(hash) = id
705                    && !actions.iter().any(|action_id| action_id.as_hash() == &hash)
706                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
707                    && obj_type == ObjectType::Action
708                {
709                    actions.push(ActionId::from_hash(hash));
710                }
711            }
712        }
713        debug!(count = actions.len(), "Listed actions");
714        Ok(actions)
715    }
716
717    #[instrument(skip(self))]
718    fn list_blobs(&self) -> Result<Vec<ContentHash>> {
719        let dir = blobs_dir(&self.root);
720        let mut blobs = list_hashes_from_dir(&dir)?;
721        if let Ok(manager) = self.pack_manager().read() {
722            for id in manager.list_all_ids()? {
723                if let PackObjectId::Hash(hash) = id
724                    && !blobs.contains(&hash)
725                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
726                    && obj_type == ObjectType::Blob
727                {
728                    blobs.push(hash);
729                }
730            }
731        }
732        Ok(blobs)
733    }
734
735    #[instrument(skip(self))]
736    fn list_trees(&self) -> Result<Vec<ContentHash>> {
737        let dir = trees_dir(&self.root);
738        let mut trees = list_hashes_from_dir(&dir)?;
739        if let Ok(manager) = self.pack_manager().read() {
740            for id in manager.list_all_ids()? {
741                if let PackObjectId::Hash(hash) = id
742                    && !trees.contains(&hash)
743                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
744                    && obj_type == ObjectType::Tree
745                {
746                    trees.push(hash);
747                }
748            }
749        }
750        Ok(trees)
751    }
752
753    #[instrument(skip(self))]
754    fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
755        self.pack_objects_impl(aggressive)
756    }
757
758    #[instrument(skip(self), fields(id = ?id))]
759    fn get_pack_object(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Vec<u8>)>> {
760        if let Ok(manager) = self.pack_manager().read()
761            && let Some((obj_type, data)) = manager.get_object(id)?
762        {
763            return Ok(Some((obj_type, data)));
764        }
765
766        match id {
767            PackObjectId::Hash(hash) => {
768                if let Some(blob) = self.get_blob(hash)? {
769                    return Ok(Some((ObjectType::Blob, blob.content().to_vec())));
770                }
771                if let Some(tree) = self.get_tree(hash)? {
772                    return Ok(Some((ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)));
773                }
774                if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
775                    return Ok(Some((
776                        ObjectType::Action,
777                        rmp_serde::to_vec_named(&action)?,
778                    )));
779                }
780                Ok(None)
781            }
782            PackObjectId::ChangeId(change_id) => {
783                if let Some(state) = self.get_state(change_id)? {
784                    Ok(Some((ObjectType::State, rmp_serde::to_vec_named(&state)?)))
785                } else {
786                    Ok(None)
787                }
788            }
789        }
790    }
791
792    #[instrument(skip(self, pack_data, index_data))]
793    fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<PackObjectId>> {
794        let reader =
795            crate::store::pack::PackReader::from_bytes(pack_data.to_vec(), index_data.to_vec())?;
796        let ids = reader.list_ids();
797        self.install_pack_files(pack_data, index_data)?;
798        Ok(ids)
799    }
800
801    #[instrument(skip(self, blobs), fields(count = blobs.len()))]
802    fn put_blobs_packed(&self, blobs: Vec<(crate::object::ContentHash, Vec<u8>)>) -> Result<()> {
803        self.put_blobs_packed_impl(blobs)
804    }
805
806    #[instrument(skip(self))]
807    fn install_pack_streaming(
808        &self,
809        pack_path: &std::path::Path,
810        index_path: &std::path::Path,
811    ) -> Result<()> {
812        self.install_pack_files_streaming(pack_path, index_path)
813    }
814
815    #[instrument(skip(self))]
816    fn prune_loose_objects(&self) -> Result<(u64, u64)> {
817        self.prune_loose_objects_impl()
818    }
819
820    #[instrument(skip(self))]
821    fn begin_snapshot_write_batch(&self) -> Result<()> {
822        self.begin_snapshot_write_batch_impl()
823    }
824
825    #[instrument(skip(self))]
826    fn flush_snapshot_write_batch(&self) -> Result<()> {
827        self.flush_snapshot_write_batch_impl()
828    }
829
830    #[instrument(skip(self))]
831    fn abort_snapshot_write_batch(&self) {
832        self.abort_snapshot_write_batch_impl();
833    }
834
835    fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
836        Ok(redaction_path(&self.root, blob).exists())
837    }
838
839    fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
840        let path = redaction_path(&self.root, blob);
841        match fs::read(&path) {
842            Ok(bytes) => Ok(Some(bytes)),
843            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
844            Err(err) => Err(HeddleError::Io(err)),
845        }
846    }
847
848    fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
849        let dir = redactions_dir(&self.root);
850        if !dir.exists() {
851            fs::create_dir_all(&dir)?;
852        }
853        let path = redaction_path(&self.root, blob);
854        crate::fs_atomic::write_file_atomic(&path, bytes)?;
855        Ok(())
856    }
857
858    fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
859        let dir = redactions_dir(&self.root);
860        if !dir.exists() {
861            return Ok(Vec::new());
862        }
863        let mut out = Vec::new();
864        for entry in fs::read_dir(&dir)? {
865            let entry = entry?;
866            let path = entry.path();
867            if path.extension().and_then(|e| e.to_str()) != Some("bin") {
868                continue;
869            }
870            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
871                continue;
872            };
873            if let Ok(hash) = ContentHash::from_hex(stem) {
874                out.push(hash);
875            }
876        }
877        Ok(out)
878    }
879}