1use std::{
5 collections::{BTreeSet, HashMap, VecDeque},
6 hash::Hash,
7 path::{Path, PathBuf},
8 sync::{Mutex, RwLock},
9};
10
11use heddle_format::compression::CompressionConfig;
12
13use super::{
14 fs_io::{AtomicWriteMode, write_atomic},
15 fs_paths::{actions_dir, blobs_dir, packs_dir, states_dir, trees_dir},
16};
17use crate::{
18 fs_atomic::sync_directory,
19 object::{Blob, ChangeId, ContentHash, State, Tree},
20 store::{
21 Result,
22 pack::{PackManager, PackObjectId},
23 },
24};
25
/// Maximum number of entries held by the `recent_blobs` cache of an `FsStore`.
const RECENT_BLOB_CACHE_CAPACITY: usize = 2_048;
/// Maximum number of entries held by the `recent_trees` cache of an `FsStore`.
/// The `recent_states` cache is sized with this same constant.
const RECENT_TREE_CACHE_CAPACITY: usize = 1_024;
/// Maximum number of entries held by the `verified_loose_blobs` cache of an `FsStore`.
const VERIFIED_LOOSE_BLOB_CACHE_CAPACITY: usize = 65_536;
35
/// Selects how `FsStore` writes loose objects.
///
/// When a loose object is written, each variant is translated to the
/// `AtomicWriteMode` variant of the same name.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LooseObjectWriteMode {
    /// Mode a newly constructed store starts in.
    Durable,
    /// Mode that is forced for every loose-object write while a snapshot write
    /// batch is open, regardless of the configured mode.
    // NOTE(review): presumably the directory sync is deferred into the store's
    // pending set instead of being done per write — confirm in `write_atomic`.
    BatchDirectorySync,
}
41
42#[derive(Debug)]
43pub(super) struct RecentObjectCache<K, V> {
44 entries: HashMap<K, V>,
45 order: VecDeque<K>,
46 capacity: usize,
47}
48
49impl<K, V> RecentObjectCache<K, V>
50where
51 K: Copy + Eq + Hash,
52{
53 pub(super) fn with_capacity(capacity: usize) -> Self {
54 Self {
55 entries: HashMap::new(),
56 order: VecDeque::new(),
57 capacity,
58 }
59 }
60
61 pub(super) fn get(&self, key: &K) -> Option<&V> {
62 self.entries.get(key)
63 }
64
65 pub(super) fn insert(&mut self, key: K, value: V) {
66 if self.capacity == 0 {
67 return;
68 }
69 if self.entries.insert(key, value).is_none() {
70 self.order.push_back(key);
71 }
72 while self.entries.len() > self.capacity {
73 if let Some(oldest) = self.order.pop_front() {
74 self.entries.remove(&oldest);
75 }
76 }
77 }
78}
79
/// Object store backed by a directory tree on the local filesystem.
///
/// Holds the store root, the compression configuration, the pack manager, a set
/// of bounded in-memory caches, and the bookkeeping used for batched
/// loose-object writes.
pub struct FsStore {
    /// Root path from which the blob, tree, state, action and pack directories
    /// are derived.
    pub(super) root: PathBuf,
    /// Compression configuration reported by `compression()`.
    pub(super) compression: CompressionConfig,
    /// Pack manager created for the packs directory under `root`.
    pack_manager: RwLock<PackManager>,
    /// Bounded cache of blobs keyed by content hash.
    pub(super) recent_blobs: RwLock<RecentObjectCache<ContentHash, Blob>>,
    /// Bounded cache of trees keyed by content hash.
    pub(super) recent_trees: RwLock<RecentObjectCache<ContentHash, Tree>>,
    /// Bounded cache of states keyed by change id.
    pub(super) recent_states: RwLock<RecentObjectCache<ChangeId, State>>,
    /// Write mode used for loose objects when no snapshot write batch is open.
    loose_object_write_mode: LooseObjectWriteMode,
    /// Nesting depth of currently open snapshot write batches (0 = none open).
    snapshot_write_batch_depth: Mutex<usize>,
    /// Directories queued for `sync_directory` by the next pending-sync flush.
    pending_directory_syncs: Mutex<BTreeSet<PathBuf>>,
    /// Bounded set (unit values) of content hashes.
    // NOTE(review): presumably the hashes of loose blobs already verified —
    // confirm against the code that populates it.
    pub(super) verified_loose_blobs: RwLock<RecentObjectCache<ContentHash, ()>>,
}
132
133impl Clone for FsStore {
134 fn clone(&self) -> Self {
135 let mut cloned = Self::with_compression(&self.root, self.compression);
136 cloned.loose_object_write_mode = self.loose_object_write_mode;
137 cloned
138 }
139}
140
141impl FsStore {
142 pub fn new(root: impl AsRef<Path>) -> Self {
146 let root = root.as_ref().to_path_buf();
147 let pack_manager = PackManager::new(packs_dir(&root));
148 Self {
149 root,
150 compression: CompressionConfig::default(),
151 pack_manager: RwLock::new(pack_manager),
152 recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
153 recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
154 recent_states: RwLock::new(RecentObjectCache::with_capacity(
155 RECENT_TREE_CACHE_CAPACITY,
156 )),
157 loose_object_write_mode: LooseObjectWriteMode::Durable,
158 snapshot_write_batch_depth: Mutex::new(0),
159 pending_directory_syncs: Mutex::new(BTreeSet::new()),
160 verified_loose_blobs: RwLock::new(RecentObjectCache::with_capacity(
161 VERIFIED_LOOSE_BLOB_CACHE_CAPACITY,
162 )),
163 }
164 }
165
166 pub fn with_compression(root: impl AsRef<Path>, compression: CompressionConfig) -> Self {
168 let root = root.as_ref().to_path_buf();
169 let pack_manager = PackManager::new(packs_dir(&root));
170 Self {
171 root,
172 compression,
173 pack_manager: RwLock::new(pack_manager),
174 recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
175 recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
176 recent_states: RwLock::new(RecentObjectCache::with_capacity(
177 RECENT_TREE_CACHE_CAPACITY,
178 )),
179 loose_object_write_mode: LooseObjectWriteMode::Durable,
180 snapshot_write_batch_depth: Mutex::new(0),
181 pending_directory_syncs: Mutex::new(BTreeSet::new()),
182 verified_loose_blobs: RwLock::new(RecentObjectCache::with_capacity(
183 VERIFIED_LOOSE_BLOB_CACHE_CAPACITY,
184 )),
185 }
186 }
187
188 pub fn init(&self) -> Result<()> {
190 std::fs::create_dir_all(blobs_dir(&self.root))?;
191 std::fs::create_dir_all(trees_dir(&self.root))?;
192 std::fs::create_dir_all(states_dir(&self.root))?;
193 std::fs::create_dir_all(actions_dir(&self.root))?;
194 std::fs::create_dir_all(packs_dir(&self.root))?;
195 Ok(())
196 }
197
    /// Returns the root path this store was created with.
    pub fn root(&self) -> &Path {
        &self.root
    }
202
    /// Returns a copy of the store's current compression configuration.
    pub fn compression(&self) -> CompressionConfig {
        self.compression
    }
207
    /// Replaces the store's compression configuration with `compression`.
    pub fn set_compression(&mut self, compression: CompressionConfig) {
        self.compression = compression;
    }
212
    /// Returns the configured loose-object write mode.
    ///
    /// This is the mode used when no snapshot write batch is open; while a
    /// batch is open, loose-object writes use `BatchDirectorySync` instead.
    pub fn loose_object_write_mode(&self) -> LooseObjectWriteMode {
        self.loose_object_write_mode
    }
216
    /// Sets the loose-object write mode used when no snapshot write batch is
    /// open.
    pub fn set_loose_object_write_mode(&mut self, mode: LooseObjectWriteMode) {
        self.loose_object_write_mode = mode;
    }
220
221 fn flush_pending_directory_syncs(&self) -> Result<usize> {
222 let pending_dirs = {
223 let mut guard = self.pending_directory_syncs.lock().map_err(|_| {
224 crate::store::HeddleError::Config(
225 "Failed to acquire pending directory sync lock".to_string(),
226 )
227 })?;
228 if guard.is_empty() {
229 return Ok(0);
230 }
231 let dirs = guard.iter().cloned().collect::<Vec<_>>();
232 guard.clear();
233 dirs
234 };
235
236 for (index, dir) in pending_dirs.iter().enumerate() {
237 if let Err(error) = sync_directory(dir) {
238 if let Ok(mut guard) = self.pending_directory_syncs.lock() {
239 guard.extend(pending_dirs[index..].iter().cloned());
240 }
241 return Err(error.into());
242 }
243 }
244
245 Ok(pending_dirs.len())
246 }
247
248 pub fn reload_packs(&self) -> Result<()> {
250 let mut manager = self.pack_manager.write().map_err(|_| {
251 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
252 })?;
253 manager.reload()
254 }
255
256 pub(super) fn reload_packs_if_stale(&self) -> Result<bool> {
271 {
273 let manager = self.pack_manager.read().map_err(|_| {
274 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
275 })?;
276 if !manager.needs_reload()? {
277 return Ok(false);
278 }
279 }
280 let mut manager = self.pack_manager.write().map_err(|_| {
284 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
285 })?;
286 manager.reload_if_disk_grew()
287 }
288
    /// Returns the lock guarding the store's pack manager, letting callers
    /// take read or write access themselves.
    pub fn pack_manager(&self) -> &RwLock<PackManager> {
        &self.pack_manager
    }
293
294 pub fn clear_recent_object_caches(&self) {
295 if let Ok(mut blobs) = self.recent_blobs.write() {
296 *blobs = RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY);
297 }
298 if let Ok(mut trees) = self.recent_trees.write() {
299 *trees = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
300 }
301 if let Ok(mut states) = self.recent_states.write() {
302 *states = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
303 }
304 }
305
306 pub fn pack_ids(&self) -> Result<Vec<PackObjectId>> {
307 let manager = self.pack_manager.read().map_err(|_| {
308 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
309 })?;
310 manager.list_all_ids()
311 }
312
313 pub(super) fn write_loose_object_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
314 let batch_active = self.snapshot_write_batch_depth.lock().map_err(|_| {
315 crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
316 })?;
317 let configured_mode = if *batch_active > 0 {
318 LooseObjectWriteMode::BatchDirectorySync
319 } else {
320 self.loose_object_write_mode
321 };
322 drop(batch_active);
323
324 let mode = match configured_mode {
325 LooseObjectWriteMode::Durable => AtomicWriteMode::Durable,
326 LooseObjectWriteMode::BatchDirectorySync => AtomicWriteMode::BatchDirectorySync,
327 };
328 write_atomic(path, data, mode, Some(&self.pending_directory_syncs))
329 }
330
    /// Writes `data` to `path` through `write_atomic` using
    /// `AtomicWriteMode::Durable`.
    ///
    /// No pending directory sync set is passed, and the snapshot write batch
    /// depth is not consulted, so the mode is the same whether or not a batch
    /// is open.
    pub(super) fn write_pack_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
        write_atomic(path, data, AtomicWriteMode::Durable, None)
    }
334
    /// Writes `data` to `path` through `write_atomic` using
    /// `AtomicWriteMode::NoSync` and no pending directory sync set.
    // NOTE(review): presumably intended for re-creatable cache copies where
    // durability is not required — confirm against the callers.
    pub(super) fn write_loose_object_cache(&self, path: &Path, data: &[u8]) -> Result<()> {
        write_atomic(path, data, AtomicWriteMode::NoSync, None)
    }
358
359 pub(super) fn begin_snapshot_write_batch_impl(&self) -> Result<()> {
360 let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
361 crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
362 })?;
363 *depth += 1;
364 Ok(())
365 }
366
367 pub(super) fn flush_snapshot_write_batch_impl(&self) -> Result<()> {
368 let should_flush = {
369 let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
370 crate::store::HeddleError::Config(
371 "Failed to acquire snapshot batch lock".to_string(),
372 )
373 })?;
374 if *depth == 0 {
375 return Ok(());
376 }
377 *depth -= 1;
378 *depth == 0
379 };
380
381 if should_flush {
382 let _ = self.flush_pending_directory_syncs()?;
383 }
384
385 Ok(())
386 }
387
388 pub(super) fn abort_snapshot_write_batch_impl(&self) {
389 if let Ok(mut depth) = self.snapshot_write_batch_depth.lock() {
390 *depth = 0;
391 }
392 if let Ok(mut pending) = self.pending_directory_syncs.lock() {
393 pending.clear();
394 }
395 }
396
397 #[cfg(test)]
398 pub(super) fn pending_directory_sync_count(&self) -> usize {
399 self.pending_directory_syncs
400 .lock()
401 .map(|pending| pending.len())
402 .unwrap_or(0)
403 }
404}