Skip to main content

objects/store/fs/
fs_store.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Core FsStore structure.
3
4use std::{
5    collections::{BTreeSet, HashMap, VecDeque},
6    hash::Hash,
7    path::{Path, PathBuf},
8    sync::{Mutex, RwLock},
9};
10
11use heddle_format::compression::CompressionConfig;
12
13use super::{
14    fs_io::{AtomicWriteMode, write_atomic},
15    fs_paths::{actions_dir, blobs_dir, packs_dir, states_dir, trees_dir},
16};
17use crate::{
18    fs_atomic::sync_directory,
19    object::{Blob, ChangeId, ContentHash, State, Tree},
20    store::{
21        Result,
22        pack::{PackManager, PackObjectId},
23    },
24};
25
26const RECENT_BLOB_CACHE_CAPACITY: usize = 2_048;
27const RECENT_TREE_CACHE_CAPACITY: usize = 1_024;
28/// Soft cap on the in-process loose-blob verification cache. Each
29/// entry is one `ContentHash` (~32 bytes) so this is ≈2 MB of memory
30/// for the upper bound, and the FIFO eviction is bounded by hash
31/// hits rather than store size. 65k entries covers the typical hot
32/// working set for million-blob monorepos; a daemon that materialises
33/// dozens of unrelated trees won't drift toward unbounded growth.
34const VERIFIED_LOOSE_BLOB_CACHE_CAPACITY: usize = 65_536;
35
36#[derive(Clone, Copy, Debug, Eq, PartialEq)]
37pub enum LooseObjectWriteMode {
38    Durable,
39    BatchDirectorySync,
40}
41
42#[derive(Debug)]
43pub(super) struct RecentObjectCache<K, V> {
44    entries: HashMap<K, V>,
45    order: VecDeque<K>,
46    capacity: usize,
47}
48
49impl<K, V> RecentObjectCache<K, V>
50where
51    K: Copy + Eq + Hash,
52{
53    pub(super) fn with_capacity(capacity: usize) -> Self {
54        Self {
55            entries: HashMap::new(),
56            order: VecDeque::new(),
57            capacity,
58        }
59    }
60
61    pub(super) fn get(&self, key: &K) -> Option<&V> {
62        self.entries.get(key)
63    }
64
65    pub(super) fn insert(&mut self, key: K, value: V) {
66        if self.capacity == 0 {
67            return;
68        }
69        if self.entries.insert(key, value).is_none() {
70            self.order.push_back(key);
71        }
72        while self.entries.len() > self.capacity {
73            if let Some(oldest) = self.order.pop_front() {
74                self.entries.remove(&oldest);
75            }
76        }
77    }
78}
79
80/// Filesystem-based storage for Heddle objects.
81///
82/// Layout:
83/// ```text
84/// .heddle/
85///   objects/
86///     blobs/
87///       ab/
88///         cdef1234...
89///     trees/
90///       ab/
91///         cdef1234...
92///     states/
93///       <change_id>.state
94///   actions/
95///     <action_id>.action
96///   packs/
97///     <hash>.pack
98///     <hash>.idx
99/// ```
100pub struct FsStore {
101    pub(super) root: PathBuf,
102    pub(super) compression: CompressionConfig,
103    pack_manager: RwLock<PackManager>,
104    pub(super) recent_blobs: RwLock<RecentObjectCache<ContentHash, Blob>>,
105    pub(super) recent_trees: RwLock<RecentObjectCache<ContentHash, Tree>>,
106    pub(super) recent_states: RwLock<RecentObjectCache<ChangeId, State>>,
107    loose_object_write_mode: LooseObjectWriteMode,
108    snapshot_write_batch_depth: Mutex<usize>,
109    pending_directory_syncs: Mutex<BTreeSet<PathBuf>>,
110    /// In-process trust cache for loose-blob cache mirrors. A hash
111    /// enters this LRU when this process either (a) wrote the blob
112    /// itself via `promote_to_loose_uncompressed` or (b) successfully
113    /// hash-verified it on first read. Bytes-on-disk for any entry
114    /// in this cache can be trusted without a re-hash by subsequent
115    /// `loose_blob_path` calls within the same process.
116    ///
117    /// Capped at [`VERIFIED_LOOSE_BLOB_CACHE_CAPACITY`] entries so a
118    /// long-lived process (`heddled`) materialising many unrelated
119    /// trees doesn't drift into unbounded memory growth. FIFO
120    /// eviction; an evicted hash pays one extra BLAKE3 on its next
121    /// read (cost-of-evict ≈ working-set-size BLAKE3 ops). Stored as
122    /// `RecentObjectCache<…, ()>` to share the FIFO-eviction
123    /// machinery with the other on-store caches; the unit value is
124    /// a marker that the corresponding loose mirror was verified.
125    ///
126    /// Pairs with `AtomicWriteMode::NoSync` on the write side: a
127    /// crashed promote leaves a torn cache-mirror file, but its
128    /// hash won't match on the next process's first-read verify,
129    /// so the reader falls through to a fresh promote off the pack.
130    pub(super) verified_loose_blobs: RwLock<RecentObjectCache<ContentHash, ()>>,
131}
132
133impl Clone for FsStore {
134    fn clone(&self) -> Self {
135        let mut cloned = Self::with_compression(&self.root, self.compression);
136        cloned.loose_object_write_mode = self.loose_object_write_mode;
137        cloned
138    }
139}
140
141impl FsStore {
142    /// Create a new filesystem store rooted at the given path.
143    ///
144    /// The path should be the `.heddle` directory.
145    pub fn new(root: impl AsRef<Path>) -> Self {
146        let root = root.as_ref().to_path_buf();
147        let pack_manager = PackManager::new(packs_dir(&root));
148        Self {
149            root,
150            compression: CompressionConfig::default(),
151            pack_manager: RwLock::new(pack_manager),
152            recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
153            recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
154            recent_states: RwLock::new(RecentObjectCache::with_capacity(
155                RECENT_TREE_CACHE_CAPACITY,
156            )),
157            loose_object_write_mode: LooseObjectWriteMode::Durable,
158            snapshot_write_batch_depth: Mutex::new(0),
159            pending_directory_syncs: Mutex::new(BTreeSet::new()),
160            verified_loose_blobs: RwLock::new(RecentObjectCache::with_capacity(
161                VERIFIED_LOOSE_BLOB_CACHE_CAPACITY,
162            )),
163        }
164    }
165
166    /// Create a new filesystem store with custom compression settings.
167    pub fn with_compression(root: impl AsRef<Path>, compression: CompressionConfig) -> Self {
168        let root = root.as_ref().to_path_buf();
169        let pack_manager = PackManager::new(packs_dir(&root));
170        Self {
171            root,
172            compression,
173            pack_manager: RwLock::new(pack_manager),
174            recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
175            recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
176            recent_states: RwLock::new(RecentObjectCache::with_capacity(
177                RECENT_TREE_CACHE_CAPACITY,
178            )),
179            loose_object_write_mode: LooseObjectWriteMode::Durable,
180            snapshot_write_batch_depth: Mutex::new(0),
181            pending_directory_syncs: Mutex::new(BTreeSet::new()),
182            verified_loose_blobs: RwLock::new(RecentObjectCache::with_capacity(
183                VERIFIED_LOOSE_BLOB_CACHE_CAPACITY,
184            )),
185        }
186    }
187
188    /// Initialize the directory structure.
189    pub fn init(&self) -> Result<()> {
190        std::fs::create_dir_all(blobs_dir(&self.root))?;
191        std::fs::create_dir_all(trees_dir(&self.root))?;
192        std::fs::create_dir_all(states_dir(&self.root))?;
193        std::fs::create_dir_all(actions_dir(&self.root))?;
194        std::fs::create_dir_all(packs_dir(&self.root))?;
195        Ok(())
196    }
197
198    /// Get the root path.
199    pub fn root(&self) -> &Path {
200        &self.root
201    }
202
203    /// Get the compression configuration.
204    pub fn compression(&self) -> CompressionConfig {
205        self.compression
206    }
207
208    /// Set the compression configuration.
209    pub fn set_compression(&mut self, compression: CompressionConfig) {
210        self.compression = compression;
211    }
212
213    pub fn loose_object_write_mode(&self) -> LooseObjectWriteMode {
214        self.loose_object_write_mode
215    }
216
217    pub fn set_loose_object_write_mode(&mut self, mode: LooseObjectWriteMode) {
218        self.loose_object_write_mode = mode;
219    }
220
221    fn flush_pending_directory_syncs(&self) -> Result<usize> {
222        let pending_dirs = {
223            let mut guard = self.pending_directory_syncs.lock().map_err(|_| {
224                crate::store::HeddleError::Config(
225                    "Failed to acquire pending directory sync lock".to_string(),
226                )
227            })?;
228            if guard.is_empty() {
229                return Ok(0);
230            }
231            let dirs = guard.iter().cloned().collect::<Vec<_>>();
232            guard.clear();
233            dirs
234        };
235
236        for (index, dir) in pending_dirs.iter().enumerate() {
237            if let Err(error) = sync_directory(dir) {
238                if let Ok(mut guard) = self.pending_directory_syncs.lock() {
239                    guard.extend(pending_dirs[index..].iter().cloned());
240                }
241                return Err(error.into());
242            }
243        }
244
245        Ok(pending_dirs.len())
246    }
247
248    /// Reload pack files from disk.
249    pub fn reload_packs(&self) -> Result<()> {
250        let mut manager = self.pack_manager.write().map_err(|_| {
251            crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
252        })?;
253        manager.reload()
254    }
255
256    /// Reload pack files only if the packs directory has grown on
257    /// disk since we last read it. Cheap (one `read_dir` + count)
258    /// when nothing changed; full reload only when a sibling
259    /// `FsStore` has installed a new pack.
260    ///
261    /// Returns `true` when a reload happened. Used by `get_*` and
262    /// `has_*` paths after an in-memory miss to recover from the
263    /// "two FsStores backing the same `.heddle/` directory" case
264    /// (typical for lightweight thread worktrees).
265    ///
266    /// Double-checked locking: the read-lock fast path means a
267    /// thundering herd of concurrent misses doesn't serialize on
268    /// the write lock; only the first thread that observes a stale
269    /// view escalates and does the reload.
270    pub(super) fn reload_packs_if_stale(&self) -> Result<bool> {
271        // Fast path: read-lock and bail out if disk hasn't grown.
272        {
273            let manager = self.pack_manager.read().map_err(|_| {
274                crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
275            })?;
276            if !manager.needs_reload()? {
277                return Ok(false);
278            }
279        }
280        // Slow path: take the write lock and re-check (another
281        // thread may have already reloaded between our drop and
282        // re-acquire).
283        let mut manager = self.pack_manager.write().map_err(|_| {
284            crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
285        })?;
286        manager.reload_if_disk_grew()
287    }
288
289    /// Get the pack manager for pack operations.
290    pub fn pack_manager(&self) -> &RwLock<PackManager> {
291        &self.pack_manager
292    }
293
294    pub fn clear_recent_object_caches(&self) {
295        if let Ok(mut blobs) = self.recent_blobs.write() {
296            *blobs = RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY);
297        }
298        if let Ok(mut trees) = self.recent_trees.write() {
299            *trees = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
300        }
301        if let Ok(mut states) = self.recent_states.write() {
302            *states = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
303        }
304    }
305
306    pub fn pack_ids(&self) -> Result<Vec<PackObjectId>> {
307        let manager = self.pack_manager.read().map_err(|_| {
308            crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
309        })?;
310        manager.list_all_ids()
311    }
312
313    pub(super) fn write_loose_object_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
314        let batch_active = self.snapshot_write_batch_depth.lock().map_err(|_| {
315            crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
316        })?;
317        let configured_mode = if *batch_active > 0 {
318            LooseObjectWriteMode::BatchDirectorySync
319        } else {
320            self.loose_object_write_mode
321        };
322        drop(batch_active);
323
324        let mode = match configured_mode {
325            LooseObjectWriteMode::Durable => AtomicWriteMode::Durable,
326            LooseObjectWriteMode::BatchDirectorySync => AtomicWriteMode::BatchDirectorySync,
327        };
328        write_atomic(path, data, mode, Some(&self.pending_directory_syncs))
329    }
330
331    pub(super) fn write_pack_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
332        write_atomic(path, data, AtomicWriteMode::Durable, None)
333    }
334
335    /// Atomic write tuned for *cache-mirror* loose objects: no fsync
336    /// at any level. The authoritative copy lives in a pack; if a
337    /// crash leaves the cache mirror torn, the read-side hash check
338    /// catches it and `promote_to_loose_uncompressed` rebuilds it
339    /// from the pack on the next access.
340    ///
341    /// On macOS APFS, `sync_data` alone costs ~5 ms per call (it
342    /// behaves like `F_FULLFSYNC` for tiny writes), and the parent
343    /// directory fsync is ~3-10 ms on top. For 1k blobs, that's
344    /// 5-15 seconds of pure fsync wallclock — the dominant cost in
345    /// the cold materialize path. Dropping both pays back ~30× on
346    /// raw create+rename throughput (measured: 200/s with sync_data
347    /// vs 5500/s without).
348    ///
349    /// Safety contract: this is only valid for files whose authority
350    /// lives elsewhere. Used by `promote_to_loose_uncompressed`; the
351    /// matching `loose_blob_path` reader hash-verifies before
352    /// trusting the bytes. Do *not* use for `put_blob` / `put_tree`
353    /// / `put_state` — those are the authoritative copy and must
354    /// survive a crash.
355    pub(super) fn write_loose_object_cache(&self, path: &Path, data: &[u8]) -> Result<()> {
356        write_atomic(path, data, AtomicWriteMode::NoSync, None)
357    }
358
359    pub(super) fn begin_snapshot_write_batch_impl(&self) -> Result<()> {
360        let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
361            crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
362        })?;
363        *depth += 1;
364        Ok(())
365    }
366
367    pub(super) fn flush_snapshot_write_batch_impl(&self) -> Result<()> {
368        let should_flush = {
369            let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
370                crate::store::HeddleError::Config(
371                    "Failed to acquire snapshot batch lock".to_string(),
372                )
373            })?;
374            if *depth == 0 {
375                return Ok(());
376            }
377            *depth -= 1;
378            *depth == 0
379        };
380
381        if should_flush {
382            let _ = self.flush_pending_directory_syncs()?;
383        }
384
385        Ok(())
386    }
387
388    pub(super) fn abort_snapshot_write_batch_impl(&self) {
389        if let Ok(mut depth) = self.snapshot_write_batch_depth.lock() {
390            *depth = 0;
391        }
392        if let Ok(mut pending) = self.pending_directory_syncs.lock() {
393            pending.clear();
394        }
395    }
396
397    #[cfg(test)]
398    pub(super) fn pending_directory_sync_count(&self) -> usize {
399        self.pending_directory_syncs
400            .lock()
401            .map(|pending| pending.len())
402            .unwrap_or(0)
403    }
404}