Skip to main content

objects/store/fs/
fs_store.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Core FsStore structure.
3
4use std::{
5    collections::{BTreeSet, HashMap, VecDeque},
6    hash::Hash,
7    path::{Path, PathBuf},
8    sync::{Mutex, RwLock},
9};
10
11use super::{
12    fs_io::{AtomicWriteMode, write_atomic},
13    fs_paths::{actions_dir, blobs_dir, packs_dir, states_dir, trees_dir},
14};
15use crate::{
16    fs_atomic::sync_directory,
17    object::{Blob, ChangeId, ContentHash, State, Tree},
18    store::{
19        CompressionConfig, Result,
20        pack::{PackManager, PackObjectId},
21    },
22};
23
/// Maximum number of entries kept in `FsStore`'s recent-blob cache.
const RECENT_BLOB_CACHE_CAPACITY: usize = 2 * 1024;
/// Maximum number of entries kept in `FsStore`'s recent-tree cache; the
/// recent-state cache is sized with this value as well.
const RECENT_TREE_CACHE_CAPACITY: usize = 1024;
26
/// How loose-object writes treat the sync of their parent directory.
///
/// This is the store-wide default; while a snapshot write batch is open,
/// `FsStore` writes loose objects with
/// [`LooseObjectWriteMode::BatchDirectorySync`] regardless of this setting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LooseObjectWriteMode {
    /// Each write is made durable on its own (maps to
    /// `AtomicWriteMode::Durable`).
    Durable,
    /// Maps to `AtomicWriteMode::BatchDirectorySync`: directory syncs go to
    /// the store's pending set, which is flushed when the outermost snapshot
    /// write batch is closed.
    BatchDirectorySync,
}
32
33#[derive(Debug)]
34pub(super) struct RecentObjectCache<K, V> {
35    entries: HashMap<K, V>,
36    order: VecDeque<K>,
37    capacity: usize,
38}
39
40impl<K, V> RecentObjectCache<K, V>
41where
42    K: Copy + Eq + Hash,
43{
44    fn with_capacity(capacity: usize) -> Self {
45        Self {
46            entries: HashMap::new(),
47            order: VecDeque::new(),
48            capacity,
49        }
50    }
51
52    pub(super) fn get(&self, key: &K) -> Option<&V> {
53        self.entries.get(key)
54    }
55
56    pub(super) fn insert(&mut self, key: K, value: V) {
57        if self.capacity == 0 {
58            return;
59        }
60        if self.entries.insert(key, value).is_none() {
61            self.order.push_back(key);
62        }
63        while self.entries.len() > self.capacity {
64            if let Some(oldest) = self.order.pop_front() {
65                self.entries.remove(&oldest);
66            }
67        }
68    }
69}
70
/// Filesystem-based storage for Heddle objects.
///
/// Layout:
/// ```text
/// .heddle/
///   objects/
///     blobs/
///       ab/
///         cdef1234...
///     trees/
///       ab/
///         cdef1234...
///     states/
///       <change_id>.state
///   actions/
///     <action_id>.action
///   packs/
///     <hash>.pack
///     <hash>.idx
/// ```
pub struct FsStore {
    /// Store root; expected to be the `.heddle` directory.
    pub(super) root: PathBuf,
    /// Compression settings, exposed via `compression()` / `set_compression()`.
    pub(super) compression: CompressionConfig,
    /// Pack manager for the `packs` directory; reloaded under the write lock by
    /// `reload_packs` / `reload_packs_if_stale`.
    pack_manager: RwLock<PackManager>,
    /// Bounded FIFO cache of blobs, keyed by content hash.
    pub(super) recent_blobs: RwLock<RecentObjectCache<ContentHash, Blob>>,
    /// Bounded FIFO cache of trees, keyed by content hash.
    pub(super) recent_trees: RwLock<RecentObjectCache<ContentHash, Tree>>,
    /// Bounded FIFO cache of states, keyed by change id.
    pub(super) recent_states: RwLock<RecentObjectCache<ChangeId, State>>,
    /// Loose-object write mode used when no snapshot write batch is open.
    loose_object_write_mode: LooseObjectWriteMode,
    /// Nesting depth of open snapshot write batches; while non-zero, loose
    /// writes use `LooseObjectWriteMode::BatchDirectorySync`.
    snapshot_write_batch_depth: Mutex<usize>,
    /// Directories handed to `write_atomic` for deferred syncing; drained by
    /// `flush_pending_directory_syncs`, discarded by the batch abort path.
    pending_directory_syncs: Mutex<BTreeSet<PathBuf>>,
}
102
103impl FsStore {
104    /// Create a new filesystem store rooted at the given path.
105    ///
106    /// The path should be the `.heddle` directory.
107    pub fn new(root: impl AsRef<Path>) -> Self {
108        let root = root.as_ref().to_path_buf();
109        let pack_manager = PackManager::new(packs_dir(&root));
110        Self {
111            root,
112            compression: CompressionConfig::default(),
113            pack_manager: RwLock::new(pack_manager),
114            recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
115            recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
116            recent_states: RwLock::new(RecentObjectCache::with_capacity(
117                RECENT_TREE_CACHE_CAPACITY,
118            )),
119            loose_object_write_mode: LooseObjectWriteMode::Durable,
120            snapshot_write_batch_depth: Mutex::new(0),
121            pending_directory_syncs: Mutex::new(BTreeSet::new()),
122        }
123    }
124
125    /// Create a new filesystem store with custom compression settings.
126    pub fn with_compression(root: impl AsRef<Path>, compression: CompressionConfig) -> Self {
127        let root = root.as_ref().to_path_buf();
128        let pack_manager = PackManager::new(packs_dir(&root));
129        Self {
130            root,
131            compression,
132            pack_manager: RwLock::new(pack_manager),
133            recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
134            recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
135            recent_states: RwLock::new(RecentObjectCache::with_capacity(
136                RECENT_TREE_CACHE_CAPACITY,
137            )),
138            loose_object_write_mode: LooseObjectWriteMode::Durable,
139            snapshot_write_batch_depth: Mutex::new(0),
140            pending_directory_syncs: Mutex::new(BTreeSet::new()),
141        }
142    }
143
144    /// Initialize the directory structure.
145    pub fn init(&self) -> Result<()> {
146        std::fs::create_dir_all(blobs_dir(&self.root))?;
147        std::fs::create_dir_all(trees_dir(&self.root))?;
148        std::fs::create_dir_all(states_dir(&self.root))?;
149        std::fs::create_dir_all(actions_dir(&self.root))?;
150        std::fs::create_dir_all(packs_dir(&self.root))?;
151        Ok(())
152    }
153
154    /// Get the root path.
155    pub fn root(&self) -> &Path {
156        &self.root
157    }
158
159    /// Get the compression configuration.
160    pub fn compression(&self) -> CompressionConfig {
161        self.compression
162    }
163
164    /// Set the compression configuration.
165    pub fn set_compression(&mut self, compression: CompressionConfig) {
166        self.compression = compression;
167    }
168
169    pub fn loose_object_write_mode(&self) -> LooseObjectWriteMode {
170        self.loose_object_write_mode
171    }
172
173    pub fn set_loose_object_write_mode(&mut self, mode: LooseObjectWriteMode) {
174        self.loose_object_write_mode = mode;
175    }
176
177    fn flush_pending_directory_syncs(&self) -> Result<usize> {
178        let pending_dirs = {
179            let mut guard = self.pending_directory_syncs.lock().map_err(|_| {
180                crate::store::HeddleError::Config(
181                    "Failed to acquire pending directory sync lock".to_string(),
182                )
183            })?;
184            if guard.is_empty() {
185                return Ok(0);
186            }
187            let dirs = guard.iter().cloned().collect::<Vec<_>>();
188            guard.clear();
189            dirs
190        };
191
192        for (index, dir) in pending_dirs.iter().enumerate() {
193            if let Err(error) = sync_directory(dir) {
194                if let Ok(mut guard) = self.pending_directory_syncs.lock() {
195                    guard.extend(pending_dirs[index..].iter().cloned());
196                }
197                return Err(error.into());
198            }
199        }
200
201        Ok(pending_dirs.len())
202    }
203
204    /// Reload pack files from disk.
205    pub fn reload_packs(&self) -> Result<()> {
206        let mut manager = self.pack_manager.write().map_err(|_| {
207            crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
208        })?;
209        manager.reload()
210    }
211
212    /// Reload pack files only if the packs directory has grown on
213    /// disk since we last read it. Cheap (one `read_dir` + count)
214    /// when nothing changed; full reload only when a sibling
215    /// `FsStore` has installed a new pack.
216    ///
217    /// Returns `true` when a reload happened. Used by `get_*` and
218    /// `has_*` paths after an in-memory miss to recover from the
219    /// "two FsStores backing the same `.heddle/` directory" case
220    /// (typical for lightweight thread worktrees).
221    ///
222    /// Double-checked locking: the read-lock fast path means a
223    /// thundering herd of concurrent misses doesn't serialize on
224    /// the write lock; only the first thread that observes a stale
225    /// view escalates and does the reload.
226    pub(super) fn reload_packs_if_stale(&self) -> Result<bool> {
227        // Fast path: read-lock and bail out if disk hasn't grown.
228        {
229            let manager = self.pack_manager.read().map_err(|_| {
230                crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
231            })?;
232            if !manager.needs_reload()? {
233                return Ok(false);
234            }
235        }
236        // Slow path: take the write lock and re-check (another
237        // thread may have already reloaded between our drop and
238        // re-acquire).
239        let mut manager = self.pack_manager.write().map_err(|_| {
240            crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
241        })?;
242        manager.reload_if_disk_grew()
243    }
244
245    /// Get the pack manager for pack operations.
246    pub fn pack_manager(&self) -> &RwLock<PackManager> {
247        &self.pack_manager
248    }
249
250    pub fn clear_recent_object_caches(&self) {
251        if let Ok(mut blobs) = self.recent_blobs.write() {
252            *blobs = RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY);
253        }
254        if let Ok(mut trees) = self.recent_trees.write() {
255            *trees = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
256        }
257        if let Ok(mut states) = self.recent_states.write() {
258            *states = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
259        }
260    }
261
262    pub fn pack_ids(&self) -> Result<Vec<PackObjectId>> {
263        let manager = self.pack_manager.read().map_err(|_| {
264            crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
265        })?;
266        manager.list_all_ids()
267    }
268
269    pub(super) fn write_loose_object_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
270        let batch_active = self.snapshot_write_batch_depth.lock().map_err(|_| {
271            crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
272        })?;
273        let configured_mode = if *batch_active > 0 {
274            LooseObjectWriteMode::BatchDirectorySync
275        } else {
276            self.loose_object_write_mode
277        };
278        drop(batch_active);
279
280        let mode = match configured_mode {
281            LooseObjectWriteMode::Durable => AtomicWriteMode::Durable,
282            LooseObjectWriteMode::BatchDirectorySync => AtomicWriteMode::BatchDirectorySync,
283        };
284        write_atomic(path, data, mode, Some(&self.pending_directory_syncs))
285    }
286
287    pub(super) fn write_pack_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
288        write_atomic(path, data, AtomicWriteMode::Durable, None)
289    }
290
291    pub(super) fn begin_snapshot_write_batch_impl(&self) -> Result<()> {
292        let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
293            crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
294        })?;
295        *depth += 1;
296        Ok(())
297    }
298
299    pub(super) fn flush_snapshot_write_batch_impl(&self) -> Result<()> {
300        let should_flush = {
301            let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
302                crate::store::HeddleError::Config(
303                    "Failed to acquire snapshot batch lock".to_string(),
304                )
305            })?;
306            if *depth == 0 {
307                return Ok(());
308            }
309            *depth -= 1;
310            *depth == 0
311        };
312
313        if should_flush {
314            let _ = self.flush_pending_directory_syncs()?;
315        }
316
317        Ok(())
318    }
319
320    pub(super) fn abort_snapshot_write_batch_impl(&self) {
321        if let Ok(mut depth) = self.snapshot_write_batch_depth.lock() {
322            *depth = 0;
323        }
324        if let Ok(mut pending) = self.pending_directory_syncs.lock() {
325            pending.clear();
326        }
327    }
328
329    #[cfg(test)]
330    pub(super) fn pending_directory_sync_count(&self) -> usize {
331        self.pending_directory_syncs
332            .lock()
333            .map(|pending| pending.len())
334            .unwrap_or(0)
335    }
336}