Skip to main content

objects/store/fs/
fs_store.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Core FsStore structure.
3
4use std::{
5    collections::{BTreeSet, HashMap, VecDeque},
6    hash::Hash,
7    path::{Path, PathBuf},
8    sync::{Mutex, RwLock},
9};
10
11use super::{
12    fs_io::{AtomicWriteMode, write_atomic},
13    fs_paths::{actions_dir, blobs_dir, packs_dir, states_dir, trees_dir},
14};
15use crate::{
16    fs_atomic::sync_directory,
17    object::{Blob, ChangeId, ContentHash, State, Tree},
18    store::{
19        CompressionConfig, Result,
20        pack::{PackManager, PackObjectId},
21    },
22};
23
/// Entry limit for the per-store recent-blob cache (`FsStore::recent_blobs`).
const RECENT_BLOB_CACHE_CAPACITY: usize = 2_048;
/// Entry limit for the per-store recent-tree cache (`FsStore::recent_trees`).
/// The recent-state cache is sized with this same constant.
const RECENT_TREE_CACHE_CAPACITY: usize = 1_024;
/// Soft cap on the in-process loose-blob verification cache.
///
/// Every entry is a single `ContentHash` (~32 bytes), which puts the
/// upper bound at roughly 2 MB of memory. Eviction is FIFO and is driven
/// by how many distinct hashes are seen, not by how large the store is.
/// 65k entries covers the typical hot working set for million-blob
/// monorepos, while a daemon that materialises dozens of unrelated trees
/// still cannot drift toward unbounded growth.
const VERIFIED_LOOSE_BLOB_CACHE_CAPACITY: usize = 65_536;
33
/// Durability policy selected for loose-object writes.
///
/// `FsStore::write_loose_object_atomic` translates each variant into the
/// `AtomicWriteMode` variant of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LooseObjectWriteMode {
    /// The mode every freshly constructed store starts in.
    Durable,
    /// The mode forced while a snapshot write batch is open. Presumably
    /// defers directory syncs to the batch flush — confirm against
    /// `write_atomic`.
    BatchDirectorySync,
}
39
40#[derive(Debug)]
41pub(super) struct RecentObjectCache<K, V> {
42    entries: HashMap<K, V>,
43    order: VecDeque<K>,
44    capacity: usize,
45}
46
47impl<K, V> RecentObjectCache<K, V>
48where
49    K: Copy + Eq + Hash,
50{
51    pub(super) fn with_capacity(capacity: usize) -> Self {
52        Self {
53            entries: HashMap::new(),
54            order: VecDeque::new(),
55            capacity,
56        }
57    }
58
59    pub(super) fn get(&self, key: &K) -> Option<&V> {
60        self.entries.get(key)
61    }
62
63    pub(super) fn insert(&mut self, key: K, value: V) {
64        if self.capacity == 0 {
65            return;
66        }
67        if self.entries.insert(key, value).is_none() {
68            self.order.push_back(key);
69        }
70        while self.entries.len() > self.capacity {
71            if let Some(oldest) = self.order.pop_front() {
72                self.entries.remove(&oldest);
73            }
74        }
75    }
76}
77
78/// Filesystem-based storage for Heddle objects.
79///
80/// Layout:
81/// ```text
82/// .heddle/
83///   objects/
84///     blobs/
85///       ab/
86///         cdef1234...
87///     trees/
88///       ab/
89///         cdef1234...
90///     states/
91///       <change_id>.state
92///   actions/
93///     <action_id>.action
94///   packs/
95///     <hash>.pack
96///     <hash>.idx
97/// ```
98pub struct FsStore {
99    pub(super) root: PathBuf,
100    pub(super) compression: CompressionConfig,
101    pack_manager: RwLock<PackManager>,
102    pub(super) recent_blobs: RwLock<RecentObjectCache<ContentHash, Blob>>,
103    pub(super) recent_trees: RwLock<RecentObjectCache<ContentHash, Tree>>,
104    pub(super) recent_states: RwLock<RecentObjectCache<ChangeId, State>>,
105    loose_object_write_mode: LooseObjectWriteMode,
106    snapshot_write_batch_depth: Mutex<usize>,
107    pending_directory_syncs: Mutex<BTreeSet<PathBuf>>,
108    /// In-process trust cache for loose-blob cache mirrors. A hash
109    /// enters this LRU when this process either (a) wrote the blob
110    /// itself via `promote_to_loose_uncompressed` or (b) successfully
111    /// hash-verified it on first read. Bytes-on-disk for any entry
112    /// in this cache can be trusted without a re-hash by subsequent
113    /// `loose_blob_path` calls within the same process.
114    ///
115    /// Capped at [`VERIFIED_LOOSE_BLOB_CACHE_CAPACITY`] entries so a
116    /// long-lived process (`heddled`) materialising many unrelated
117    /// trees doesn't drift into unbounded memory growth. FIFO
118    /// eviction; an evicted hash pays one extra BLAKE3 on its next
119    /// read (cost-of-evict ≈ working-set-size BLAKE3 ops). Stored as
120    /// `RecentObjectCache<…, ()>` to share the FIFO-eviction
121    /// machinery with the other on-store caches; the unit value is
122    /// a marker that the corresponding loose mirror was verified.
123    ///
124    /// Pairs with `AtomicWriteMode::NoSync` on the write side: a
125    /// crashed promote leaves a torn cache-mirror file, but its
126    /// hash won't match on the next process's first-read verify,
127    /// so the reader falls through to a fresh promote off the pack.
128    pub(super) verified_loose_blobs: RwLock<RecentObjectCache<ContentHash, ()>>,
129}
130
131impl Clone for FsStore {
132    fn clone(&self) -> Self {
133        let mut cloned = Self::with_compression(&self.root, self.compression);
134        cloned.loose_object_write_mode = self.loose_object_write_mode;
135        cloned
136    }
137}
138
139impl FsStore {
140    /// Create a new filesystem store rooted at the given path.
141    ///
142    /// The path should be the `.heddle` directory.
143    pub fn new(root: impl AsRef<Path>) -> Self {
144        let root = root.as_ref().to_path_buf();
145        let pack_manager = PackManager::new(packs_dir(&root));
146        Self {
147            root,
148            compression: CompressionConfig::default(),
149            pack_manager: RwLock::new(pack_manager),
150            recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
151            recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
152            recent_states: RwLock::new(RecentObjectCache::with_capacity(
153                RECENT_TREE_CACHE_CAPACITY,
154            )),
155            loose_object_write_mode: LooseObjectWriteMode::Durable,
156            snapshot_write_batch_depth: Mutex::new(0),
157            pending_directory_syncs: Mutex::new(BTreeSet::new()),
158            verified_loose_blobs: RwLock::new(RecentObjectCache::with_capacity(
159                VERIFIED_LOOSE_BLOB_CACHE_CAPACITY,
160            )),
161        }
162    }
163
164    /// Create a new filesystem store with custom compression settings.
165    pub fn with_compression(root: impl AsRef<Path>, compression: CompressionConfig) -> Self {
166        let root = root.as_ref().to_path_buf();
167        let pack_manager = PackManager::new(packs_dir(&root));
168        Self {
169            root,
170            compression,
171            pack_manager: RwLock::new(pack_manager),
172            recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
173            recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
174            recent_states: RwLock::new(RecentObjectCache::with_capacity(
175                RECENT_TREE_CACHE_CAPACITY,
176            )),
177            loose_object_write_mode: LooseObjectWriteMode::Durable,
178            snapshot_write_batch_depth: Mutex::new(0),
179            pending_directory_syncs: Mutex::new(BTreeSet::new()),
180            verified_loose_blobs: RwLock::new(RecentObjectCache::with_capacity(
181                VERIFIED_LOOSE_BLOB_CACHE_CAPACITY,
182            )),
183        }
184    }
185
186    /// Initialize the directory structure.
187    pub fn init(&self) -> Result<()> {
188        std::fs::create_dir_all(blobs_dir(&self.root))?;
189        std::fs::create_dir_all(trees_dir(&self.root))?;
190        std::fs::create_dir_all(states_dir(&self.root))?;
191        std::fs::create_dir_all(actions_dir(&self.root))?;
192        std::fs::create_dir_all(packs_dir(&self.root))?;
193        Ok(())
194    }
195
196    /// Get the root path.
197    pub fn root(&self) -> &Path {
198        &self.root
199    }
200
201    /// Get the compression configuration.
202    pub fn compression(&self) -> CompressionConfig {
203        self.compression
204    }
205
206    /// Set the compression configuration.
207    pub fn set_compression(&mut self, compression: CompressionConfig) {
208        self.compression = compression;
209    }
210
211    pub fn loose_object_write_mode(&self) -> LooseObjectWriteMode {
212        self.loose_object_write_mode
213    }
214
215    pub fn set_loose_object_write_mode(&mut self, mode: LooseObjectWriteMode) {
216        self.loose_object_write_mode = mode;
217    }
218
219    fn flush_pending_directory_syncs(&self) -> Result<usize> {
220        let pending_dirs = {
221            let mut guard = self.pending_directory_syncs.lock().map_err(|_| {
222                crate::store::HeddleError::Config(
223                    "Failed to acquire pending directory sync lock".to_string(),
224                )
225            })?;
226            if guard.is_empty() {
227                return Ok(0);
228            }
229            let dirs = guard.iter().cloned().collect::<Vec<_>>();
230            guard.clear();
231            dirs
232        };
233
234        for (index, dir) in pending_dirs.iter().enumerate() {
235            if let Err(error) = sync_directory(dir) {
236                if let Ok(mut guard) = self.pending_directory_syncs.lock() {
237                    guard.extend(pending_dirs[index..].iter().cloned());
238                }
239                return Err(error.into());
240            }
241        }
242
243        Ok(pending_dirs.len())
244    }
245
246    /// Reload pack files from disk.
247    pub fn reload_packs(&self) -> Result<()> {
248        let mut manager = self.pack_manager.write().map_err(|_| {
249            crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
250        })?;
251        manager.reload()
252    }
253
254    /// Reload pack files only if the packs directory has grown on
255    /// disk since we last read it. Cheap (one `read_dir` + count)
256    /// when nothing changed; full reload only when a sibling
257    /// `FsStore` has installed a new pack.
258    ///
259    /// Returns `true` when a reload happened. Used by `get_*` and
260    /// `has_*` paths after an in-memory miss to recover from the
261    /// "two FsStores backing the same `.heddle/` directory" case
262    /// (typical for lightweight thread worktrees).
263    ///
264    /// Double-checked locking: the read-lock fast path means a
265    /// thundering herd of concurrent misses doesn't serialize on
266    /// the write lock; only the first thread that observes a stale
267    /// view escalates and does the reload.
268    pub(super) fn reload_packs_if_stale(&self) -> Result<bool> {
269        // Fast path: read-lock and bail out if disk hasn't grown.
270        {
271            let manager = self.pack_manager.read().map_err(|_| {
272                crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
273            })?;
274            if !manager.needs_reload()? {
275                return Ok(false);
276            }
277        }
278        // Slow path: take the write lock and re-check (another
279        // thread may have already reloaded between our drop and
280        // re-acquire).
281        let mut manager = self.pack_manager.write().map_err(|_| {
282            crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
283        })?;
284        manager.reload_if_disk_grew()
285    }
286
287    /// Get the pack manager for pack operations.
288    pub fn pack_manager(&self) -> &RwLock<PackManager> {
289        &self.pack_manager
290    }
291
292    pub fn clear_recent_object_caches(&self) {
293        if let Ok(mut blobs) = self.recent_blobs.write() {
294            *blobs = RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY);
295        }
296        if let Ok(mut trees) = self.recent_trees.write() {
297            *trees = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
298        }
299        if let Ok(mut states) = self.recent_states.write() {
300            *states = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
301        }
302    }
303
304    pub fn pack_ids(&self) -> Result<Vec<PackObjectId>> {
305        let manager = self.pack_manager.read().map_err(|_| {
306            crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
307        })?;
308        manager.list_all_ids()
309    }
310
311    pub(super) fn write_loose_object_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
312        let batch_active = self.snapshot_write_batch_depth.lock().map_err(|_| {
313            crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
314        })?;
315        let configured_mode = if *batch_active > 0 {
316            LooseObjectWriteMode::BatchDirectorySync
317        } else {
318            self.loose_object_write_mode
319        };
320        drop(batch_active);
321
322        let mode = match configured_mode {
323            LooseObjectWriteMode::Durable => AtomicWriteMode::Durable,
324            LooseObjectWriteMode::BatchDirectorySync => AtomicWriteMode::BatchDirectorySync,
325        };
326        write_atomic(path, data, mode, Some(&self.pending_directory_syncs))
327    }
328
329    pub(super) fn write_pack_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
330        write_atomic(path, data, AtomicWriteMode::Durable, None)
331    }
332
333    /// Atomic write tuned for *cache-mirror* loose objects: no fsync
334    /// at any level. The authoritative copy lives in a pack; if a
335    /// crash leaves the cache mirror torn, the read-side hash check
336    /// catches it and `promote_to_loose_uncompressed` rebuilds it
337    /// from the pack on the next access.
338    ///
339    /// On macOS APFS, `sync_data` alone costs ~5 ms per call (it
340    /// behaves like `F_FULLFSYNC` for tiny writes), and the parent
341    /// directory fsync is ~3-10 ms on top. For 1k blobs, that's
342    /// 5-15 seconds of pure fsync wallclock — the dominant cost in
343    /// the cold materialize path. Dropping both pays back ~30× on
344    /// raw create+rename throughput (measured: 200/s with sync_data
345    /// vs 5500/s without).
346    ///
347    /// Safety contract: this is only valid for files whose authority
348    /// lives elsewhere. Used by `promote_to_loose_uncompressed`; the
349    /// matching `loose_blob_path` reader hash-verifies before
350    /// trusting the bytes. Do *not* use for `put_blob` / `put_tree`
351    /// / `put_state` — those are the authoritative copy and must
352    /// survive a crash.
353    pub(super) fn write_loose_object_cache(&self, path: &Path, data: &[u8]) -> Result<()> {
354        write_atomic(path, data, AtomicWriteMode::NoSync, None)
355    }
356
357    pub(super) fn begin_snapshot_write_batch_impl(&self) -> Result<()> {
358        let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
359            crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
360        })?;
361        *depth += 1;
362        Ok(())
363    }
364
365    pub(super) fn flush_snapshot_write_batch_impl(&self) -> Result<()> {
366        let should_flush = {
367            let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
368                crate::store::HeddleError::Config(
369                    "Failed to acquire snapshot batch lock".to_string(),
370                )
371            })?;
372            if *depth == 0 {
373                return Ok(());
374            }
375            *depth -= 1;
376            *depth == 0
377        };
378
379        if should_flush {
380            let _ = self.flush_pending_directory_syncs()?;
381        }
382
383        Ok(())
384    }
385
386    pub(super) fn abort_snapshot_write_batch_impl(&self) {
387        if let Ok(mut depth) = self.snapshot_write_batch_depth.lock() {
388            *depth = 0;
389        }
390        if let Ok(mut pending) = self.pending_directory_syncs.lock() {
391            pending.clear();
392        }
393    }
394
395    #[cfg(test)]
396    pub(super) fn pending_directory_sync_count(&self) -> usize {
397        self.pending_directory_syncs
398            .lock()
399            .map(|pending| pending.len())
400            .unwrap_or(0)
401    }
402}