Skip to main content

objects/store/fs/
fs_impl.rs

1// SPDX-License-Identifier: Apache-2.0
2//! ObjectStore implementation for FsStore.
3
4use std::{
5    fs,
6    path::{Path, PathBuf},
7};
8
9use tracing::{debug, instrument, trace};
10
11use super::{
12    FsStore,
13    fs_io::{list_hashes_from_dir, read_file_bytes, read_file_header},
14    fs_paths::{action_path, actions_dir, blobs_dir, hash_path, state_path, states_dir, trees_dir},
15};
16use crate::{
17    object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree},
18    store::{
19        HeddleError, ObjectStore, Result,
20        compression::{compress, decompress, header_uncompressed_size, is_compressed},
21        pack::{ObjectType, PackManager, PackObjectId},
22    },
23};
24
25/// Bytes we read off disk to recover a blob's uncompressed size: the
26/// 9-byte compression header is enough for both modern and legacy
27/// (5-byte) headers — `header_uncompressed_size` picks the right
28/// width.
29const BLOB_HEADER_PEEK: usize = 9;
30
31fn validate_loaded_tree(tree: Tree) -> Result<Tree> {
32    tree.validate()?;
33    Ok(tree)
34}
35
36fn validate_loaded_state(requested_id: &ChangeId, state: State) -> Result<State> {
37    if state.change_id != *requested_id {
38        return Err(HeddleError::InvalidObject(format!(
39            "state change_id mismatch: requested {}, found {}",
40            requested_id, state.change_id
41        )));
42    }
43
44    Ok(state)
45}
46
47fn validate_loaded_action(requested_id: &ActionId, action: Action) -> Result<Action> {
48    let found_id = action.compute_id();
49    if found_id != *requested_id {
50        return Err(HeddleError::InvalidObject(format!(
51            "action id mismatch: requested {}, found {}",
52            requested_id, found_id
53        )));
54    }
55
56    Ok(action)
57}
58
59impl FsStore {
60    /// Single-pass blob lookup. The wrapper in `ObjectStore::get_blob`
61    /// retries this once after a stale-reload on miss.
62    fn try_get_blob_once(&self, hash: &ContentHash) -> Result<Option<Blob>> {
63        let path = hash_path(&blobs_dir(&self.root), hash);
64        let loose_exists = path.exists();
65        let pack_has = if loose_exists {
66            false
67        } else if let Ok(manager) = self.pack_manager().read() {
68            manager.has_object(hash)
69        } else {
70            false
71        };
72        if (loose_exists || pack_has)
73            && let Ok(cache) = self.recent_blobs.read()
74            && let Some(blob) = cache.get(hash)
75        {
76            trace!("Found blob in recent object cache");
77            return Ok(Some(blob.clone()));
78        }
79
80        if let Ok(manager) = self.pack_manager().read()
81            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
82            && obj_type == ObjectType::Blob
83        {
84            trace!("Found blob in packfile");
85            let blob = Blob::new(data);
86            if blob.hash() != *hash {
87                return Err(HeddleError::Corruption {
88                    expected: *hash,
89                    found: blob.hash(),
90                });
91            }
92            return Ok(Some(blob));
93        }
94
95        match read_file_bytes(&path)? {
96            Some(data) => {
97                trace!(size = data.as_slice().len(), "Blob data read");
98                let content = if is_compressed(data.as_slice()) {
99                    decompress(data.as_slice())?
100                } else {
101                    data.into_vec()
102                };
103                let blob = Blob::new(content);
104                if blob.hash() != *hash {
105                    return Err(HeddleError::Corruption {
106                        expected: *hash,
107                        found: blob.hash(),
108                    });
109                }
110                if let Ok(mut cache) = self.recent_blobs.write() {
111                    cache.insert(*hash, blob.clone());
112                }
113                Ok(Some(blob))
114            }
115            None => Ok(None),
116        }
117    }
118
119    /// Shared body for `try_has_{blob,tree,state}_once`: object is
120    /// present iff the loose path exists or the pack manager
121    /// resolves it. Callers pass the loose path and the
122    /// pack-manager probe; the helper handles the lock.
123    fn loose_or_packed(
124        &self,
125        loose_path: &Path,
126        in_pack: impl FnOnce(&PackManager) -> bool,
127    ) -> Result<bool> {
128        if loose_path.exists() {
129            return Ok(true);
130        }
131        if let Ok(manager) = self.pack_manager().read() {
132            return Ok(in_pack(&manager));
133        }
134        Ok(false)
135    }
136
137    fn try_has_blob_once(&self, hash: &ContentHash) -> Result<bool> {
138        let path = hash_path(&blobs_dir(&self.root), hash);
139        self.loose_or_packed(&path, |m| m.has_object(hash))
140    }
141
142    /// Header-only size lookup for a single attempt. Tries:
143    /// 1. The recent-blob cache (we already have the bytes in
144    ///    memory — `len()` is free).
145    /// 2. The loose blob: peek the 9-byte compression header. For a
146    ///    compressed blob the recorded uncompressed size lives in the
147    ///    header. For an uncompressed blob (no recognised header) the
148    ///    on-disk file length IS the blob size.
149    /// 3. Any loaded pack: the pack format records the uncompressed
150    ///    size as a varint right after the tagged id, so we can decode
151    ///    it without touching the body.
152    ///
153    /// Cost: one short read (typically 9 bytes) for loose blobs, or a
154    /// pure in-memory varint decode for packed blobs. *No*
155    /// decompression.
156    fn try_get_blob_size_once(&self, hash: &ContentHash) -> Result<Option<u64>> {
157        if let Ok(cache) = self.recent_blobs.read()
158            && let Some(blob) = cache.get(hash)
159        {
160            return Ok(Some(blob.content().len() as u64));
161        }
162
163        let path = hash_path(&blobs_dir(&self.root), hash);
164        if let Some((header, file_len)) = read_file_header(&path, BLOB_HEADER_PEEK)? {
165            if let Some(size) = header_uncompressed_size(&header) {
166                return Ok(Some(size));
167            }
168            // No recognised compression header — the file is raw
169            // blob bytes. The on-disk length is the blob size.
170            return Ok(Some(file_len));
171        }
172
173        if let Ok(manager) = self.pack_manager().read()
174            && let Some(size) = manager.get_hashed_object_size(hash)?
175        {
176            return Ok(Some(size));
177        }
178        Ok(None)
179    }
180
181    fn try_get_tree_once(&self, hash: &ContentHash) -> Result<Option<Tree>> {
182        let path = hash_path(&trees_dir(&self.root), hash);
183        let loose_exists = path.exists();
184        let pack_has = if loose_exists {
185            false
186        } else if let Ok(manager) = self.pack_manager().read() {
187            manager.has_object(hash)
188        } else {
189            false
190        };
191        if (loose_exists || pack_has)
192            && let Ok(cache) = self.recent_trees.read()
193            && let Some(tree) = cache.get(hash)
194        {
195            trace!("Found tree in recent object cache");
196            return Ok(Some(tree.clone()));
197        }
198
199        if let Ok(manager) = self.pack_manager().read()
200            && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
201            && obj_type == ObjectType::Tree
202        {
203            trace!("Found tree in packfile");
204            let tree = validate_loaded_tree(rmp_serde::from_slice(&data)?)?;
205            if tree.hash() != *hash {
206                return Err(HeddleError::Corruption {
207                    expected: *hash,
208                    found: tree.hash(),
209                });
210            }
211            return Ok(Some(tree));
212        }
213
214        match read_file_bytes(&path)? {
215            Some(data) => {
216                trace!(size = data.as_slice().len(), "Tree data read");
217                let decoded = if is_compressed(data.as_slice()) {
218                    decompress(data.as_slice())?
219                } else {
220                    data.into_vec()
221                };
222                let tree = validate_loaded_tree(rmp_serde::from_slice(&decoded)?)?;
223                if tree.hash() != *hash {
224                    return Err(HeddleError::Corruption {
225                        expected: *hash,
226                        found: tree.hash(),
227                    });
228                }
229                if let Ok(mut cache) = self.recent_trees.write() {
230                    cache.insert(*hash, tree.clone());
231                }
232                Ok(Some(tree))
233            }
234            None => Ok(None),
235        }
236    }
237
238    fn try_has_tree_once(&self, hash: &ContentHash) -> Result<bool> {
239        let path = hash_path(&trees_dir(&self.root), hash);
240        self.loose_or_packed(&path, |m| m.has_object(hash))
241    }
242
243    fn try_get_state_once(&self, id: &ChangeId) -> Result<Option<State>> {
244        let path = state_path(&self.root, id);
245        let loose_exists = path.exists();
246        let pack_has = if loose_exists {
247            false
248        } else if let Ok(manager) = self.pack_manager().read() {
249            manager.has_object_id(&PackObjectId::ChangeId(*id))
250        } else {
251            false
252        };
253        if (loose_exists || pack_has)
254            && let Ok(cache) = self.recent_states.read()
255            && let Some(state) = cache.get(id)
256        {
257            trace!("Found state in recent object cache");
258            return Ok(Some(state.clone()));
259        }
260
261        if let Ok(manager) = self.pack_manager().read()
262            && let Some((obj_type, data)) = manager.get_object(&PackObjectId::ChangeId(*id))?
263            && obj_type == ObjectType::State
264        {
265            trace!("Found state in packfile");
266            let state = validate_loaded_state(id, rmp_serde::from_slice(&data)?)?;
267            if let Ok(mut cache) = self.recent_states.write() {
268                cache.insert(*id, state.clone());
269            }
270            return Ok(Some(state));
271        }
272
273        match read_file_bytes(&path)? {
274            Some(data) => {
275                trace!(size = data.as_slice().len(), "State data read");
276                let decoded = if is_compressed(data.as_slice()) {
277                    decompress(data.as_slice())?
278                } else {
279                    data.into_vec()
280                };
281                let state = validate_loaded_state(id, rmp_serde::from_slice(&decoded)?)?;
282                if let Ok(mut cache) = self.recent_states.write() {
283                    cache.insert(*id, state.clone());
284                }
285                Ok(Some(state))
286            }
287            None => Ok(None),
288        }
289    }
290
291    fn try_has_state_once(&self, id: &ChangeId) -> Result<bool> {
292        let path = state_path(&self.root, id);
293        self.loose_or_packed(&path, |m| m.has_object_id(&PackObjectId::ChangeId(*id)))
294    }
295}
296
297impl ObjectStore for FsStore {
298    #[instrument(skip(self), fields(hash = %hash.short()))]
299    fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
300        if let Some(blob) = self.try_get_blob_once(hash)? {
301            return Ok(Some(blob));
302        }
303        // Miss path: a sibling FsStore (e.g. the worktree's repo
304        // backing the same `.heddle/`) may have installed a new pack
305        // since we loaded ours. Cheap disk-count check first; full
306        // reload only when the count grew.
307        if self.reload_packs_if_stale()?
308            && let Some(blob) = self.try_get_blob_once(hash)?
309        {
310            return Ok(Some(blob));
311        }
312        trace!("Blob not found");
313        Ok(None)
314    }
315
316    #[instrument(skip(self, blob), fields(size = blob.content().len()))]
317    fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
318        let hash = blob.hash();
319        let path = hash_path(&blobs_dir(&self.root), &hash);
320
321        if !path.exists() {
322            let content = blob.content();
323            let data = compress(content, &self.compression)?.unwrap_or_else(|| content.to_vec());
324            trace!(compressed_size = data.len(), "Writing blob");
325            self.write_loose_object_atomic(&path, &data)?;
326        } else {
327            trace!("Blob already exists, skipping write");
328        }
329        if let Ok(mut cache) = self.recent_blobs.write() {
330            cache.insert(hash, blob.clone());
331        }
332
333        Ok(hash)
334    }
335
336    #[instrument(skip(self, blob), fields(hash = %hash.short()))]
337    fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
338        if blob.hash() != hash {
339            return Err(HeddleError::Corruption {
340                expected: hash,
341                found: blob.hash(),
342            });
343        }
344
345        let path = hash_path(&blobs_dir(&self.root), &hash);
346
347        if !path.exists() {
348            let content = blob.content();
349            let data = compress(content, &self.compression)?.unwrap_or_else(|| content.to_vec());
350            trace!(
351                compressed_size = data.len(),
352                "Writing blob with precomputed hash"
353            );
354            self.write_loose_object_atomic(&path, &data)?;
355        }
356        if let Ok(mut cache) = self.recent_blobs.write() {
357            cache.insert(hash, blob.clone());
358        }
359
360        Ok(hash)
361    }
362
363    #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
364    fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
365        let found = ContentHash::compute_typed("blob", data);
366        if found != hash {
367            return Err(HeddleError::Corruption {
368                expected: hash,
369                found,
370            });
371        }
372
373        let path = hash_path(&blobs_dir(&self.root), &hash);
374        if !path.exists() {
375            trace!(
376                size = data.len(),
377                "Writing raw blob bytes with precomputed hash"
378            );
379            self.write_loose_object_atomic(&path, data)?;
380        }
381        if let Ok(mut cache) = self.recent_blobs.write() {
382            cache.insert(hash, Blob::from_slice(data));
383        }
384
385        Ok(hash)
386    }
387
388    #[instrument(skip(self), fields(hash = %hash.short()))]
389    fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
390        if self.try_has_blob_once(hash)? {
391            return Ok(true);
392        }
393        if self.reload_packs_if_stale()? {
394            return self.try_has_blob_once(hash);
395        }
396        Ok(false)
397    }
398
399    /// Loose blob path safe for hardlink/clonefile materialization.
400    ///
401    /// Returns `Some(path)` only when the loose file exists *and* is
402    /// stored uncompressed — then the on-disk bytes are byte-identical
403    /// to the blob's content, so a hard link materializes the worktree
404    /// file without an extra copy. Compressed blobs and pack-only blobs
405    /// fall through to `None` and the caller writes decompressed bytes
406    /// the slow way.
407    fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
408        let path = hash_path(&blobs_dir(&self.root), hash);
409        // 9 bytes is enough to recognise the modern compression header
410        // (LEGACY_COMPRESSED_HEADER_LEN = 5 also fits inside).
411        let header = read_file_header(&path, 9).ok().flatten()?;
412        if is_compressed(&header.0) {
413            return None;
414        }
415        Some(path)
416    }
417
418    /// Promote a blob to its uncompressed-loose canonical path so
419    /// `loose_blob_path` returns `Some(path)` and hardlink-first
420    /// materialization fires.
421    ///
422    /// Three cases:
423    /// 1. Already loose+uncompressed: peek the header, no-op.
424    /// 2. Loose but compressed: read+decompress, atomically rewrite
425    ///    the canonical path with raw bytes.
426    /// 3. Pack-only: read out of the pack via `get_blob`, atomically
427    ///    write to the canonical loose path. Pack copy is left in
428    ///    place — the next prune cycle will discard the loose mirror
429    ///    and a future materialize will re-promote.
430    #[instrument(skip(self), fields(hash = %hash.short()))]
431    fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
432        let path = hash_path(&blobs_dir(&self.root), hash);
433
434        // Idempotent fast path: already loose AND uncompressed.
435        if let Some((header, _)) = read_file_header(&path, 9)?
436            && !is_compressed(&header)
437        {
438            trace!("Blob already loose+uncompressed; skipping promotion");
439            return Ok(false);
440        }
441
442        // Either compressed-loose or pack-only. Reading via
443        // `get_blob` covers both: compressed-loose decompresses on
444        // the way out, pack-only reads from the loaded pack manager.
445        let blob = self.get_blob(hash)?.ok_or_else(|| {
446            HeddleError::NotFound(format!(
447                "blob {} not found in store; cannot promote to loose-uncompressed",
448                hash
449            ))
450        })?;
451
452        // Atomically install the uncompressed bytes at the canonical
453        // loose path. `write_loose_object_atomic` writes to a temp
454        // path in the same parent dir and `rename(2)`s — so a
455        // concurrent reader either sees the old contents (compressed
456        // header → falls through to `get_blob` → still correct) or
457        // the new contents (uncompressed → safe to hardlink).
458        debug!(
459            size = blob.content().len(),
460            "Promoting blob to loose-uncompressed canonical store"
461        );
462        self.write_loose_object_atomic(&path, blob.content())?;
463        Ok(true)
464    }
465
466    #[instrument(skip(self), fields(hash = %hash.short()))]
467    fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
468        if let Some(size) = self.try_get_blob_size_once(hash)? {
469            return Ok(Some(size));
470        }
471        // Sibling-store recovery, mirroring the read path: if a
472        // concurrent writer just installed a pack we don't know about,
473        // reload and retry once before reporting a miss.
474        if self.reload_packs_if_stale()?
475            && let Some(size) = self.try_get_blob_size_once(hash)?
476        {
477            return Ok(Some(size));
478        }
479        Ok(None)
480    }
481
482    #[instrument(skip(self), fields(hash = %hash.short()))]
483    fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
484        if let Some(tree) = self.try_get_tree_once(hash)? {
485            return Ok(Some(tree));
486        }
487        if self.reload_packs_if_stale()?
488            && let Some(tree) = self.try_get_tree_once(hash)?
489        {
490            return Ok(Some(tree));
491        }
492        trace!("Tree not found");
493        Ok(None)
494    }
495
496    #[instrument(skip(self, tree), fields(entry_count = tree.entries().len()))]
497    fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
498        let hash = tree.hash();
499        let path = hash_path(&trees_dir(&self.root), &hash);
500
501        if !path.exists() {
502            let serialized = rmp_serde::to_vec(tree)?;
503            let data = compress(&serialized, &self.compression)?.unwrap_or(serialized);
504            trace!(compressed_size = data.len(), "Writing tree");
505            self.write_loose_object_atomic(&path, &data)?;
506        } else {
507            trace!("Tree already exists, skipping write");
508        }
509        if let Ok(mut cache) = self.recent_trees.write() {
510            cache.insert(hash, tree.clone());
511        }
512
513        Ok(hash)
514    }
515
516    #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
517    fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
518        let tree: Tree = rmp_serde::from_slice(data)?;
519        validate_loaded_tree(tree.clone())?;
520        let found = tree.hash();
521        if found != hash {
522            return Err(HeddleError::Corruption {
523                expected: hash,
524                found,
525            });
526        }
527
528        let path = hash_path(&trees_dir(&self.root), &hash);
529        if !path.exists() {
530            trace!(size = data.len(), "Writing raw serialized tree");
531            self.write_loose_object_atomic(&path, data)?;
532        }
533        if let Ok(mut cache) = self.recent_trees.write() {
534            cache.insert(hash, tree);
535        }
536
537        Ok(hash)
538    }
539
540    #[instrument(skip(self), fields(hash = %hash.short()))]
541    fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
542        if self.try_has_tree_once(hash)? {
543            return Ok(true);
544        }
545        if self.reload_packs_if_stale()? {
546            return self.try_has_tree_once(hash);
547        }
548        Ok(false)
549    }
550
551    #[instrument(skip(self), fields(id = %id.short()))]
552    fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
553        if let Some(state) = self.try_get_state_once(id)? {
554            return Ok(Some(state));
555        }
556        if self.reload_packs_if_stale()?
557            && let Some(state) = self.try_get_state_once(id)?
558        {
559            return Ok(Some(state));
560        }
561        trace!("State not found");
562        Ok(None)
563    }
564
565    #[instrument(skip(self, state), fields(id = %state.change_id.short()))]
566    fn put_state(&self, state: &State) -> Result<()> {
567        let path = state_path(&self.root, &state.change_id);
568        let serialized = rmp_serde::to_vec(state)?;
569        let data = compress(&serialized, &self.compression)?.unwrap_or(serialized);
570        trace!(compressed_size = data.len(), "Writing state");
571        self.write_loose_object_atomic(&path, &data)?;
572        if let Ok(mut cache) = self.recent_states.write() {
573            cache.insert(state.change_id, state.clone());
574        }
575        Ok(())
576    }
577
578    #[instrument(skip(self, data), fields(id = %id.short(), size = data.len()))]
579    fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
580        let state: State = rmp_serde::from_slice(data)?;
581        if state.change_id != id {
582            return Err(HeddleError::InvalidObject(format!(
583                "state change_id mismatch: expected {}, found {}",
584                id, state.change_id
585            )));
586        }
587        let path = state_path(&self.root, &id);
588        trace!(size = data.len(), "Writing raw serialized state");
589        self.write_loose_object_atomic(&path, data)?;
590        if let Ok(mut cache) = self.recent_states.write() {
591            cache.insert(id, state);
592        }
593        Ok(())
594    }
595
596    #[instrument(skip(self), fields(id = %id.short()))]
597    fn has_state(&self, id: &ChangeId) -> Result<bool> {
598        if self.try_has_state_once(id)? {
599            return Ok(true);
600        }
601        if self.reload_packs_if_stale()? {
602            return self.try_has_state_once(id);
603        }
604        Ok(false)
605    }
606
607    #[instrument(skip(self))]
608    fn list_states(&self) -> Result<Vec<ChangeId>> {
609        let dir = states_dir(&self.root);
610        if !dir.exists() {
611            return Ok(Vec::new());
612        }
613
614        let mut states = Vec::new();
615        for entry in fs::read_dir(&dir)? {
616            let entry = entry?;
617            let path = entry.path();
618            if let Some(name) = path.file_stem()
619                && let Some(name_str) = name.to_str()
620                && let Ok(id) = ChangeId::parse(name_str)
621            {
622                states.push(id);
623            }
624        }
625        if let Ok(manager) = self.pack_manager().read() {
626            for id in manager.list_all_ids()? {
627                if let PackObjectId::ChangeId(change_id) = id
628                    && !states.contains(&change_id)
629                {
630                    states.push(change_id);
631                }
632            }
633        }
634        debug!(count = states.len(), "Listed states");
635        Ok(states)
636    }
637
638    #[instrument(skip(self), fields(id = %id))]
639    fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
640        let path = action_path(&self.root, id);
641        if !path.exists()
642            && let Ok(manager) = self.pack_manager().read()
643            && let Some((obj_type, data)) = manager.get_hashed_object(id.as_hash())?
644            && obj_type == ObjectType::Action
645        {
646            trace!("Found action in packfile");
647            let action = validate_loaded_action(id, rmp_serde::from_slice(&data)?)?;
648            return Ok(Some(action));
649        }
650        match read_file_bytes(&path)? {
651            Some(data) => {
652                trace!(size = data.as_slice().len(), "Action data read");
653                let decoded = if is_compressed(data.as_slice()) {
654                    decompress(data.as_slice())?
655                } else {
656                    data.into_vec()
657                };
658                let action = validate_loaded_action(id, rmp_serde::from_slice(&decoded)?)?;
659                Ok(Some(action))
660            }
661            None => {
662                trace!("Action not found");
663                Ok(None)
664            }
665        }
666    }
667
668    #[instrument(skip(self, action))]
669    fn put_action(&self, action: &mut Action) -> Result<ActionId> {
670        let id = action.id();
671        let path = action_path(&self.root, &id);
672
673        if !path.exists() {
674            let serialized = rmp_serde::to_vec(action)?;
675            let data = compress(&serialized, &self.compression)?.unwrap_or(serialized);
676            trace!(id = %id, compressed_size = data.len(), "Writing action");
677            self.write_loose_object_atomic(&path, &data)?;
678        }
679
680        Ok(id)
681    }
682
683    #[instrument(skip(self))]
684    fn list_actions(&self) -> Result<Vec<ActionId>> {
685        let dir = actions_dir(&self.root);
686        let mut actions = Vec::new();
687        if dir.exists() {
688            for entry in fs::read_dir(&dir)? {
689                let entry = entry?;
690                let path = entry.path();
691                if let Some(name) = path.file_stem()
692                    && let Some(name_str) = name.to_str()
693                    && let Ok(hash) = ContentHash::from_hex(name_str)
694                {
695                    actions.push(ActionId::from_hash(hash));
696                }
697            }
698        }
699        if let Ok(manager) = self.pack_manager().read() {
700            for id in manager.list_all_ids()? {
701                if let PackObjectId::Hash(hash) = id
702                    && !actions.iter().any(|action_id| action_id.as_hash() == &hash)
703                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
704                    && obj_type == ObjectType::Action
705                {
706                    actions.push(ActionId::from_hash(hash));
707                }
708            }
709        }
710        debug!(count = actions.len(), "Listed actions");
711        Ok(actions)
712    }
713
714    #[instrument(skip(self))]
715    fn list_blobs(&self) -> Result<Vec<ContentHash>> {
716        let dir = blobs_dir(&self.root);
717        let mut blobs = list_hashes_from_dir(&dir)?;
718        if let Ok(manager) = self.pack_manager().read() {
719            for id in manager.list_all_ids()? {
720                if let PackObjectId::Hash(hash) = id
721                    && !blobs.contains(&hash)
722                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
723                    && obj_type == ObjectType::Blob
724                {
725                    blobs.push(hash);
726                }
727            }
728        }
729        Ok(blobs)
730    }
731
732    #[instrument(skip(self))]
733    fn list_trees(&self) -> Result<Vec<ContentHash>> {
734        let dir = trees_dir(&self.root);
735        let mut trees = list_hashes_from_dir(&dir)?;
736        if let Ok(manager) = self.pack_manager().read() {
737            for id in manager.list_all_ids()? {
738                if let PackObjectId::Hash(hash) = id
739                    && !trees.contains(&hash)
740                    && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
741                    && obj_type == ObjectType::Tree
742                {
743                    trees.push(hash);
744                }
745            }
746        }
747        Ok(trees)
748    }
749
750    #[instrument(skip(self))]
751    fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
752        self.pack_objects_impl(aggressive)
753    }
754
755    #[instrument(skip(self), fields(id = ?id))]
756    fn get_pack_object(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Vec<u8>)>> {
757        if let Ok(manager) = self.pack_manager().read()
758            && let Some((obj_type, data)) = manager.get_object(id)?
759        {
760            return Ok(Some((obj_type, data)));
761        }
762
763        match id {
764            PackObjectId::Hash(hash) => {
765                if let Some(blob) = self.get_blob(hash)? {
766                    return Ok(Some((ObjectType::Blob, blob.content().to_vec())));
767                }
768                if let Some(tree) = self.get_tree(hash)? {
769                    return Ok(Some((ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)));
770                }
771                if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
772                    return Ok(Some((
773                        ObjectType::Action,
774                        rmp_serde::to_vec_named(&action)?,
775                    )));
776                }
777                Ok(None)
778            }
779            PackObjectId::ChangeId(change_id) => {
780                if let Some(state) = self.get_state(change_id)? {
781                    Ok(Some((ObjectType::State, rmp_serde::to_vec_named(&state)?)))
782                } else {
783                    Ok(None)
784                }
785            }
786        }
787    }
788
789    #[instrument(skip(self, pack_data, index_data))]
790    fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<PackObjectId>> {
791        let reader =
792            crate::store::pack::PackReader::from_bytes(pack_data.to_vec(), index_data.to_vec())?;
793        let ids = reader.list_ids();
794        self.install_pack_files(pack_data, index_data)?;
795        Ok(ids)
796    }
797
798    #[instrument(skip(self, blobs), fields(count = blobs.len()))]
799    fn put_blobs_packed(&self, blobs: Vec<(crate::object::ContentHash, Vec<u8>)>) -> Result<()> {
800        self.put_blobs_packed_impl(blobs)
801    }
802
803    #[instrument(skip(self))]
804    fn install_pack_streaming(
805        &self,
806        pack_path: &std::path::Path,
807        index_path: &std::path::Path,
808    ) -> Result<()> {
809        self.install_pack_files_streaming(pack_path, index_path)
810    }
811
812    #[instrument(skip(self))]
813    fn prune_loose_objects(&self) -> Result<(u64, u64)> {
814        self.prune_loose_objects_impl()
815    }
816
817    #[instrument(skip(self))]
818    fn begin_snapshot_write_batch(&self) -> Result<()> {
819        self.begin_snapshot_write_batch_impl()
820    }
821
822    #[instrument(skip(self))]
823    fn flush_snapshot_write_batch(&self) -> Result<()> {
824        self.flush_snapshot_write_batch_impl()
825    }
826
827    #[instrument(skip(self))]
828    fn abort_snapshot_write_batch(&self) {
829        self.abort_snapshot_write_batch_impl();
830    }
831}