Skip to main content

objects/store/fs/
fs_store.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Core FsStore structure.
3
4use std::{
5    collections::{BTreeSet, HashMap, VecDeque},
6    hash::Hash,
7    path::{Path, PathBuf},
8    sync::{Mutex, RwLock},
9};
10
11use super::{
12    fs_io::{AtomicWriteMode, write_atomic},
13    fs_paths::{actions_dir, blobs_dir, packs_dir, states_dir, trees_dir},
14};
15use crate::{
16    fs_atomic::sync_directory,
17    object::{Blob, ChangeId, ContentHash, State, Tree},
18    store::{
19        CompressionConfig, Result,
20        pack::{PackManager, PackObjectId},
21    },
22};
23
/// Maximum number of entries kept in the in-memory recent-blob cache.
const RECENT_BLOB_CACHE_CAPACITY: usize = 2_048;
/// Maximum number of entries kept in the in-memory recent-tree cache.
/// `FsStore` also reuses this value as the capacity of its recent-state
/// cache.
const RECENT_TREE_CACHE_CAPACITY: usize = 1_024;
/// Soft cap on the in-process loose-blob verification cache. Each
/// entry is keyed by one `ContentHash` (~32 bytes), so the keys alone
/// come to ≈2 MB at the upper bound; `RecentObjectCache` holds every
/// key in both its map and its FIFO queue, so budget roughly twice
/// that plus container overhead. Memory is bounded by this entry
/// count rather than by store size. 65k entries covers the typical hot
/// working set for million-blob monorepos; a daemon that materialises
/// dozens of unrelated trees won't drift toward unbounded growth.
const VERIFIED_LOOSE_BLOB_CACHE_CAPACITY: usize = 65_536;
33
/// How loose-object writes issued through an `FsStore` are persisted.
///
/// `FsStore::write_loose_object_atomic` translates the selected variant into
/// the `AtomicWriteMode` variant of the same name.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LooseObjectWriteMode {
    /// Maps to `AtomicWriteMode::Durable`. This is the mode a freshly
    /// constructed `FsStore` starts with.
    Durable,
    /// Maps to `AtomicWriteMode::BatchDirectorySync`. Also forced for every
    /// loose-object write while a snapshot write batch is open, regardless of
    /// the configured mode.
    BatchDirectorySync,
}
39
40#[derive(Debug)]
41pub(super) struct RecentObjectCache<K, V> {
42    entries: HashMap<K, V>,
43    order: VecDeque<K>,
44    capacity: usize,
45}
46
47impl<K, V> RecentObjectCache<K, V>
48where
49    K: Copy + Eq + Hash,
50{
51    pub(super) fn with_capacity(capacity: usize) -> Self {
52        Self {
53            entries: HashMap::new(),
54            order: VecDeque::new(),
55            capacity,
56        }
57    }
58
59    pub(super) fn get(&self, key: &K) -> Option<&V> {
60        self.entries.get(key)
61    }
62
63    pub(super) fn insert(&mut self, key: K, value: V) {
64        if self.capacity == 0 {
65            return;
66        }
67        if self.entries.insert(key, value).is_none() {
68            self.order.push_back(key);
69        }
70        while self.entries.len() > self.capacity {
71            if let Some(oldest) = self.order.pop_front() {
72                self.entries.remove(&oldest);
73            }
74        }
75    }
76}
77
/// Filesystem-based storage for Heddle objects.
///
/// Layout:
/// ```text
/// .heddle/
///   objects/
///     blobs/
///       ab/
///         cdef1234...
///     trees/
///       ab/
///         cdef1234...
///     states/
///       <change_id>.state
///   actions/
///     <action_id>.action
///   packs/
///     <hash>.pack
///     <hash>.idx
/// ```
pub struct FsStore {
    /// Root of the store; expected to be the `.heddle` directory.
    pub(super) root: PathBuf,
    /// Compression settings, readable and replaceable through
    /// `compression()` / `set_compression()`.
    pub(super) compression: CompressionConfig,
    /// Pack manager constructed over `packs_dir(&root)`.
    pack_manager: RwLock<PackManager>,
    /// FIFO cache of blobs, capped at `RECENT_BLOB_CACHE_CAPACITY` entries.
    pub(super) recent_blobs: RwLock<RecentObjectCache<ContentHash, Blob>>,
    /// FIFO cache of trees, capped at `RECENT_TREE_CACHE_CAPACITY` entries.
    pub(super) recent_trees: RwLock<RecentObjectCache<ContentHash, Tree>>,
    /// FIFO cache of states; shares `RECENT_TREE_CACHE_CAPACITY` as its cap.
    pub(super) recent_states: RwLock<RecentObjectCache<ChangeId, State>>,
    /// Configured write mode for loose objects. Overridden with
    /// `BatchDirectorySync` while `snapshot_write_batch_depth` is non-zero.
    loose_object_write_mode: LooseObjectWriteMode,
    /// Nesting depth of currently open snapshot write batches.
    snapshot_write_batch_depth: Mutex<usize>,
    /// Directories whose sync has been deferred. Handed to `write_atomic`
    /// for loose-object writes and drained by
    /// `flush_pending_directory_syncs`.
    pending_directory_syncs: Mutex<BTreeSet<PathBuf>>,
    /// In-process trust cache for loose-blob cache mirrors. A hash
    /// enters this cache when this process either (a) wrote the blob
    /// itself via `promote_to_loose_uncompressed` or (b) successfully
    /// hash-verified it on first read. Bytes-on-disk for any entry
    /// in this cache can be trusted without a re-hash by subsequent
    /// `loose_blob_path` calls within the same process.
    ///
    /// Capped at [`VERIFIED_LOOSE_BLOB_CACHE_CAPACITY`] entries so a
    /// long-lived process (`heddled`) materialising many unrelated
    /// trees doesn't drift into unbounded memory growth. Eviction is
    /// FIFO (not LRU: a read does not refresh an entry's position);
    /// an evicted hash pays one extra BLAKE3 on its next
    /// read (cost-of-evict ≈ working-set-size BLAKE3 ops). Stored as
    /// `RecentObjectCache<…, ()>` to share the FIFO-eviction
    /// machinery with the other on-store caches; the unit value is
    /// a marker that the corresponding loose mirror was verified.
    ///
    /// Pairs with `AtomicWriteMode::NoSync` on the write side: a
    /// crashed promote leaves a torn cache-mirror file, but its
    /// hash won't match on the next process's first-read verify,
    /// so the reader falls through to a fresh promote off the pack.
    pub(super) verified_loose_blobs: RwLock<RecentObjectCache<ContentHash, ()>>,
}
130
131impl FsStore {
132    /// Create a new filesystem store rooted at the given path.
133    ///
134    /// The path should be the `.heddle` directory.
135    pub fn new(root: impl AsRef<Path>) -> Self {
136        let root = root.as_ref().to_path_buf();
137        let pack_manager = PackManager::new(packs_dir(&root));
138        Self {
139            root,
140            compression: CompressionConfig::default(),
141            pack_manager: RwLock::new(pack_manager),
142            recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
143            recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
144            recent_states: RwLock::new(RecentObjectCache::with_capacity(
145                RECENT_TREE_CACHE_CAPACITY,
146            )),
147            loose_object_write_mode: LooseObjectWriteMode::Durable,
148            snapshot_write_batch_depth: Mutex::new(0),
149            pending_directory_syncs: Mutex::new(BTreeSet::new()),
150            verified_loose_blobs: RwLock::new(RecentObjectCache::with_capacity(
151                VERIFIED_LOOSE_BLOB_CACHE_CAPACITY,
152            )),
153        }
154    }
155
156    /// Create a new filesystem store with custom compression settings.
157    pub fn with_compression(root: impl AsRef<Path>, compression: CompressionConfig) -> Self {
158        let root = root.as_ref().to_path_buf();
159        let pack_manager = PackManager::new(packs_dir(&root));
160        Self {
161            root,
162            compression,
163            pack_manager: RwLock::new(pack_manager),
164            recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
165            recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
166            recent_states: RwLock::new(RecentObjectCache::with_capacity(
167                RECENT_TREE_CACHE_CAPACITY,
168            )),
169            loose_object_write_mode: LooseObjectWriteMode::Durable,
170            snapshot_write_batch_depth: Mutex::new(0),
171            pending_directory_syncs: Mutex::new(BTreeSet::new()),
172            verified_loose_blobs: RwLock::new(RecentObjectCache::with_capacity(
173                VERIFIED_LOOSE_BLOB_CACHE_CAPACITY,
174            )),
175        }
176    }
177
178    /// Initialize the directory structure.
179    pub fn init(&self) -> Result<()> {
180        std::fs::create_dir_all(blobs_dir(&self.root))?;
181        std::fs::create_dir_all(trees_dir(&self.root))?;
182        std::fs::create_dir_all(states_dir(&self.root))?;
183        std::fs::create_dir_all(actions_dir(&self.root))?;
184        std::fs::create_dir_all(packs_dir(&self.root))?;
185        Ok(())
186    }
187
188    /// Get the root path.
189    pub fn root(&self) -> &Path {
190        &self.root
191    }
192
    /// Get the compression configuration.
    ///
    /// Returns a copy of the current settings; changing the returned value
    /// does not affect the store (use `set_compression` for that).
    pub fn compression(&self) -> CompressionConfig {
        self.compression
    }
197
    /// Set the compression configuration.
    ///
    /// Replaces the stored settings wholesale. Only this field is written;
    /// nothing else in the store is touched by this call.
    pub fn set_compression(&mut self, compression: CompressionConfig) {
        self.compression = compression;
    }
202
    /// Get the configured loose-object write mode.
    ///
    /// This is the configured value only: while a snapshot write batch is
    /// open, `write_loose_object_atomic` uses `BatchDirectorySync` regardless
    /// of what this returns.
    pub fn loose_object_write_mode(&self) -> LooseObjectWriteMode {
        self.loose_object_write_mode
    }
206
    /// Set the loose-object write mode used when no snapshot write batch is
    /// open.
    pub fn set_loose_object_write_mode(&mut self, mode: LooseObjectWriteMode) {
        self.loose_object_write_mode = mode;
    }
210
211    fn flush_pending_directory_syncs(&self) -> Result<usize> {
212        let pending_dirs = {
213            let mut guard = self.pending_directory_syncs.lock().map_err(|_| {
214                crate::store::HeddleError::Config(
215                    "Failed to acquire pending directory sync lock".to_string(),
216                )
217            })?;
218            if guard.is_empty() {
219                return Ok(0);
220            }
221            let dirs = guard.iter().cloned().collect::<Vec<_>>();
222            guard.clear();
223            dirs
224        };
225
226        for (index, dir) in pending_dirs.iter().enumerate() {
227            if let Err(error) = sync_directory(dir) {
228                if let Ok(mut guard) = self.pending_directory_syncs.lock() {
229                    guard.extend(pending_dirs[index..].iter().cloned());
230                }
231                return Err(error.into());
232            }
233        }
234
235        Ok(pending_dirs.len())
236    }
237
238    /// Reload pack files from disk.
239    pub fn reload_packs(&self) -> Result<()> {
240        let mut manager = self.pack_manager.write().map_err(|_| {
241            crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
242        })?;
243        manager.reload()
244    }
245
246    /// Reload pack files only if the packs directory has grown on
247    /// disk since we last read it. Cheap (one `read_dir` + count)
248    /// when nothing changed; full reload only when a sibling
249    /// `FsStore` has installed a new pack.
250    ///
251    /// Returns `true` when a reload happened. Used by `get_*` and
252    /// `has_*` paths after an in-memory miss to recover from the
253    /// "two FsStores backing the same `.heddle/` directory" case
254    /// (typical for lightweight thread worktrees).
255    ///
256    /// Double-checked locking: the read-lock fast path means a
257    /// thundering herd of concurrent misses doesn't serialize on
258    /// the write lock; only the first thread that observes a stale
259    /// view escalates and does the reload.
260    pub(super) fn reload_packs_if_stale(&self) -> Result<bool> {
261        // Fast path: read-lock and bail out if disk hasn't grown.
262        {
263            let manager = self.pack_manager.read().map_err(|_| {
264                crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
265            })?;
266            if !manager.needs_reload()? {
267                return Ok(false);
268            }
269        }
270        // Slow path: take the write lock and re-check (another
271        // thread may have already reloaded between our drop and
272        // re-acquire).
273        let mut manager = self.pack_manager.write().map_err(|_| {
274            crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
275        })?;
276        manager.reload_if_disk_grew()
277    }
278
    /// Get the pack manager for pack operations.
    ///
    /// Hands out the lock itself rather than a guard; the caller chooses
    /// between `read()` and `write()` and is responsible for handling
    /// poisoning.
    pub fn pack_manager(&self) -> &RwLock<PackManager> {
        &self.pack_manager
    }
283
284    pub fn clear_recent_object_caches(&self) {
285        if let Ok(mut blobs) = self.recent_blobs.write() {
286            *blobs = RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY);
287        }
288        if let Ok(mut trees) = self.recent_trees.write() {
289            *trees = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
290        }
291        if let Ok(mut states) = self.recent_states.write() {
292            *states = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
293        }
294    }
295
296    pub fn pack_ids(&self) -> Result<Vec<PackObjectId>> {
297        let manager = self.pack_manager.read().map_err(|_| {
298            crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
299        })?;
300        manager.list_all_ids()
301    }
302
303    pub(super) fn write_loose_object_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
304        let batch_active = self.snapshot_write_batch_depth.lock().map_err(|_| {
305            crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
306        })?;
307        let configured_mode = if *batch_active > 0 {
308            LooseObjectWriteMode::BatchDirectorySync
309        } else {
310            self.loose_object_write_mode
311        };
312        drop(batch_active);
313
314        let mode = match configured_mode {
315            LooseObjectWriteMode::Durable => AtomicWriteMode::Durable,
316            LooseObjectWriteMode::BatchDirectorySync => AtomicWriteMode::BatchDirectorySync,
317        };
318        write_atomic(path, data, mode, Some(&self.pending_directory_syncs))
319    }
320
    /// Atomically write pack data to `path`.
    ///
    /// Always uses `AtomicWriteMode::Durable`, independent of the store's
    /// loose-object write mode and of any open snapshot write batch, and
    /// passes no pending-directory-sync set to `write_atomic`.
    pub(super) fn write_pack_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
        write_atomic(path, data, AtomicWriteMode::Durable, None)
    }
324
    /// Atomic write tuned for *cache-mirror* loose objects: no fsync
    /// at any level. The authoritative copy lives in a pack; if a
    /// crash leaves the cache mirror torn, the read-side hash check
    /// catches it and `promote_to_loose_uncompressed` rebuilds it
    /// from the pack on the next access.
    ///
    /// On macOS APFS, `sync_data` alone costs ~5 ms per call (it
    /// behaves like `F_FULLFSYNC` for tiny writes), and the parent
    /// directory fsync is ~3-10 ms on top. For 1k blobs, that's
    /// 5-15 seconds of pure fsync wallclock — the dominant cost in
    /// the cold materialize path. Dropping both pays back ~30× on
    /// raw create+rename throughput (measured: 200/s with sync_data
    /// vs 5500/s without).
    ///
    /// Safety contract: this is only valid for files whose authority
    /// lives elsewhere. Used by `promote_to_loose_uncompressed`; the
    /// matching `loose_blob_path` reader hash-verifies before
    /// trusting the bytes. Do *not* use for `put_blob` / `put_tree`
    /// / `put_state` — those are the authoritative copy and must
    /// survive a crash.
    pub(super) fn write_loose_object_cache(&self, path: &Path, data: &[u8]) -> Result<()> {
        // `None`: no pending-directory-sync set is handed over, in line with
        // the no-fsync contract described above.
        write_atomic(path, data, AtomicWriteMode::NoSync, None)
    }
348
349    pub(super) fn begin_snapshot_write_batch_impl(&self) -> Result<()> {
350        let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
351            crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
352        })?;
353        *depth += 1;
354        Ok(())
355    }
356
357    pub(super) fn flush_snapshot_write_batch_impl(&self) -> Result<()> {
358        let should_flush = {
359            let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
360                crate::store::HeddleError::Config(
361                    "Failed to acquire snapshot batch lock".to_string(),
362                )
363            })?;
364            if *depth == 0 {
365                return Ok(());
366            }
367            *depth -= 1;
368            *depth == 0
369        };
370
371        if should_flush {
372            let _ = self.flush_pending_directory_syncs()?;
373        }
374
375        Ok(())
376    }
377
    /// Abandon every open snapshot write batch.
    ///
    /// Resets the batch nesting depth to zero and discards all queued
    /// directory syncs without performing them. Best effort: if either lock
    /// is poisoned, that piece of state is silently left as it was.
    pub(super) fn abort_snapshot_write_batch_impl(&self) {
        if let Ok(mut depth) = self.snapshot_write_batch_depth.lock() {
            // Closes all nesting levels at once, not just the innermost.
            *depth = 0;
        }
        if let Ok(mut pending) = self.pending_directory_syncs.lock() {
            // Queued directories are dropped, not synced.
            pending.clear();
        }
    }
386
387    #[cfg(test)]
388    pub(super) fn pending_directory_sync_count(&self) -> usize {
389        self.pending_directory_syncs
390            .lock()
391            .map(|pending| pending.len())
392            .unwrap_or(0)
393    }
394}