1use std::{
5 collections::{BTreeSet, HashMap, VecDeque},
6 hash::Hash,
7 path::{Path, PathBuf},
8 sync::{Mutex, RwLock},
9};
10
11use super::{
12 fs_io::{AtomicWriteMode, write_atomic},
13 fs_paths::{actions_dir, blobs_dir, packs_dir, states_dir, trees_dir},
14};
15use crate::{
16 fs_atomic::sync_directory,
17 object::{Blob, ChangeId, ContentHash, State, Tree},
18 store::{
19 CompressionConfig, Result,
20 pack::{PackManager, PackObjectId},
21 },
22};
23
24const RECENT_BLOB_CACHE_CAPACITY: usize = 2_048;
25const RECENT_TREE_CACHE_CAPACITY: usize = 1_024;
26const VERIFIED_LOOSE_BLOB_CACHE_CAPACITY: usize = 65_536;
33
34#[derive(Clone, Copy, Debug, Eq, PartialEq)]
35pub enum LooseObjectWriteMode {
36 Durable,
37 BatchDirectorySync,
38}
39
40#[derive(Debug)]
41pub(super) struct RecentObjectCache<K, V> {
42 entries: HashMap<K, V>,
43 order: VecDeque<K>,
44 capacity: usize,
45}
46
47impl<K, V> RecentObjectCache<K, V>
48where
49 K: Copy + Eq + Hash,
50{
51 pub(super) fn with_capacity(capacity: usize) -> Self {
52 Self {
53 entries: HashMap::new(),
54 order: VecDeque::new(),
55 capacity,
56 }
57 }
58
59 pub(super) fn get(&self, key: &K) -> Option<&V> {
60 self.entries.get(key)
61 }
62
63 pub(super) fn insert(&mut self, key: K, value: V) {
64 if self.capacity == 0 {
65 return;
66 }
67 if self.entries.insert(key, value).is_none() {
68 self.order.push_back(key);
69 }
70 while self.entries.len() > self.capacity {
71 if let Some(oldest) = self.order.pop_front() {
72 self.entries.remove(&oldest);
73 }
74 }
75 }
76}
77
78pub struct FsStore {
99 pub(super) root: PathBuf,
100 pub(super) compression: CompressionConfig,
101 pack_manager: RwLock<PackManager>,
102 pub(super) recent_blobs: RwLock<RecentObjectCache<ContentHash, Blob>>,
103 pub(super) recent_trees: RwLock<RecentObjectCache<ContentHash, Tree>>,
104 pub(super) recent_states: RwLock<RecentObjectCache<ChangeId, State>>,
105 loose_object_write_mode: LooseObjectWriteMode,
106 snapshot_write_batch_depth: Mutex<usize>,
107 pending_directory_syncs: Mutex<BTreeSet<PathBuf>>,
108 pub(super) verified_loose_blobs: RwLock<RecentObjectCache<ContentHash, ()>>,
129}
130
131impl Clone for FsStore {
132 fn clone(&self) -> Self {
133 let mut cloned = Self::with_compression(&self.root, self.compression);
134 cloned.loose_object_write_mode = self.loose_object_write_mode;
135 cloned
136 }
137}
138
139impl FsStore {
140 pub fn new(root: impl AsRef<Path>) -> Self {
144 let root = root.as_ref().to_path_buf();
145 let pack_manager = PackManager::new(packs_dir(&root));
146 Self {
147 root,
148 compression: CompressionConfig::default(),
149 pack_manager: RwLock::new(pack_manager),
150 recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
151 recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
152 recent_states: RwLock::new(RecentObjectCache::with_capacity(
153 RECENT_TREE_CACHE_CAPACITY,
154 )),
155 loose_object_write_mode: LooseObjectWriteMode::Durable,
156 snapshot_write_batch_depth: Mutex::new(0),
157 pending_directory_syncs: Mutex::new(BTreeSet::new()),
158 verified_loose_blobs: RwLock::new(RecentObjectCache::with_capacity(
159 VERIFIED_LOOSE_BLOB_CACHE_CAPACITY,
160 )),
161 }
162 }
163
164 pub fn with_compression(root: impl AsRef<Path>, compression: CompressionConfig) -> Self {
166 let root = root.as_ref().to_path_buf();
167 let pack_manager = PackManager::new(packs_dir(&root));
168 Self {
169 root,
170 compression,
171 pack_manager: RwLock::new(pack_manager),
172 recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
173 recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
174 recent_states: RwLock::new(RecentObjectCache::with_capacity(
175 RECENT_TREE_CACHE_CAPACITY,
176 )),
177 loose_object_write_mode: LooseObjectWriteMode::Durable,
178 snapshot_write_batch_depth: Mutex::new(0),
179 pending_directory_syncs: Mutex::new(BTreeSet::new()),
180 verified_loose_blobs: RwLock::new(RecentObjectCache::with_capacity(
181 VERIFIED_LOOSE_BLOB_CACHE_CAPACITY,
182 )),
183 }
184 }
185
186 pub fn init(&self) -> Result<()> {
188 std::fs::create_dir_all(blobs_dir(&self.root))?;
189 std::fs::create_dir_all(trees_dir(&self.root))?;
190 std::fs::create_dir_all(states_dir(&self.root))?;
191 std::fs::create_dir_all(actions_dir(&self.root))?;
192 std::fs::create_dir_all(packs_dir(&self.root))?;
193 Ok(())
194 }
195
196 pub fn root(&self) -> &Path {
198 &self.root
199 }
200
201 pub fn compression(&self) -> CompressionConfig {
203 self.compression
204 }
205
206 pub fn set_compression(&mut self, compression: CompressionConfig) {
208 self.compression = compression;
209 }
210
211 pub fn loose_object_write_mode(&self) -> LooseObjectWriteMode {
212 self.loose_object_write_mode
213 }
214
215 pub fn set_loose_object_write_mode(&mut self, mode: LooseObjectWriteMode) {
216 self.loose_object_write_mode = mode;
217 }
218
219 fn flush_pending_directory_syncs(&self) -> Result<usize> {
220 let pending_dirs = {
221 let mut guard = self.pending_directory_syncs.lock().map_err(|_| {
222 crate::store::HeddleError::Config(
223 "Failed to acquire pending directory sync lock".to_string(),
224 )
225 })?;
226 if guard.is_empty() {
227 return Ok(0);
228 }
229 let dirs = guard.iter().cloned().collect::<Vec<_>>();
230 guard.clear();
231 dirs
232 };
233
234 for (index, dir) in pending_dirs.iter().enumerate() {
235 if let Err(error) = sync_directory(dir) {
236 if let Ok(mut guard) = self.pending_directory_syncs.lock() {
237 guard.extend(pending_dirs[index..].iter().cloned());
238 }
239 return Err(error.into());
240 }
241 }
242
243 Ok(pending_dirs.len())
244 }
245
246 pub fn reload_packs(&self) -> Result<()> {
248 let mut manager = self.pack_manager.write().map_err(|_| {
249 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
250 })?;
251 manager.reload()
252 }
253
254 pub(super) fn reload_packs_if_stale(&self) -> Result<bool> {
269 {
271 let manager = self.pack_manager.read().map_err(|_| {
272 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
273 })?;
274 if !manager.needs_reload()? {
275 return Ok(false);
276 }
277 }
278 let mut manager = self.pack_manager.write().map_err(|_| {
282 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
283 })?;
284 manager.reload_if_disk_grew()
285 }
286
287 pub fn pack_manager(&self) -> &RwLock<PackManager> {
289 &self.pack_manager
290 }
291
292 pub fn clear_recent_object_caches(&self) {
293 if let Ok(mut blobs) = self.recent_blobs.write() {
294 *blobs = RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY);
295 }
296 if let Ok(mut trees) = self.recent_trees.write() {
297 *trees = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
298 }
299 if let Ok(mut states) = self.recent_states.write() {
300 *states = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
301 }
302 }
303
304 pub fn pack_ids(&self) -> Result<Vec<PackObjectId>> {
305 let manager = self.pack_manager.read().map_err(|_| {
306 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
307 })?;
308 manager.list_all_ids()
309 }
310
311 pub(super) fn write_loose_object_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
312 let batch_active = self.snapshot_write_batch_depth.lock().map_err(|_| {
313 crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
314 })?;
315 let configured_mode = if *batch_active > 0 {
316 LooseObjectWriteMode::BatchDirectorySync
317 } else {
318 self.loose_object_write_mode
319 };
320 drop(batch_active);
321
322 let mode = match configured_mode {
323 LooseObjectWriteMode::Durable => AtomicWriteMode::Durable,
324 LooseObjectWriteMode::BatchDirectorySync => AtomicWriteMode::BatchDirectorySync,
325 };
326 write_atomic(path, data, mode, Some(&self.pending_directory_syncs))
327 }
328
329 pub(super) fn write_pack_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
330 write_atomic(path, data, AtomicWriteMode::Durable, None)
331 }
332
333 pub(super) fn write_loose_object_cache(&self, path: &Path, data: &[u8]) -> Result<()> {
354 write_atomic(path, data, AtomicWriteMode::NoSync, None)
355 }
356
357 pub(super) fn begin_snapshot_write_batch_impl(&self) -> Result<()> {
358 let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
359 crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
360 })?;
361 *depth += 1;
362 Ok(())
363 }
364
365 pub(super) fn flush_snapshot_write_batch_impl(&self) -> Result<()> {
366 let should_flush = {
367 let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
368 crate::store::HeddleError::Config(
369 "Failed to acquire snapshot batch lock".to_string(),
370 )
371 })?;
372 if *depth == 0 {
373 return Ok(());
374 }
375 *depth -= 1;
376 *depth == 0
377 };
378
379 if should_flush {
380 let _ = self.flush_pending_directory_syncs()?;
381 }
382
383 Ok(())
384 }
385
386 pub(super) fn abort_snapshot_write_batch_impl(&self) {
387 if let Ok(mut depth) = self.snapshot_write_batch_depth.lock() {
388 *depth = 0;
389 }
390 if let Ok(mut pending) = self.pending_directory_syncs.lock() {
391 pending.clear();
392 }
393 }
394
395 #[cfg(test)]
396 pub(super) fn pending_directory_sync_count(&self) -> usize {
397 self.pending_directory_syncs
398 .lock()
399 .map(|pending| pending.len())
400 .unwrap_or(0)
401 }
402}