1use std::{
5 collections::{BTreeSet, HashMap, VecDeque},
6 hash::Hash,
7 path::{Path, PathBuf},
8 sync::{Mutex, RwLock},
9};
10
11use super::{
12 fs_io::{AtomicWriteMode, write_atomic},
13 fs_paths::{actions_dir, blobs_dir, packs_dir, states_dir, trees_dir},
14};
15use crate::{
16 fs_atomic::sync_directory,
17 object::{Blob, ChangeId, ContentHash, State, Tree},
18 store::{
19 CompressionConfig, Result,
20 pack::{PackManager, PackObjectId},
21 },
22};
23
/// Maximum number of entries kept in `FsStore::recent_blobs`.
const RECENT_BLOB_CACHE_CAPACITY: usize = 2_048;
/// Maximum number of entries kept in `FsStore::recent_trees`.
///
/// `FsStore::recent_states` is sized with this same constant.
const RECENT_TREE_CACHE_CAPACITY: usize = 1_024;
/// Maximum number of entries kept in `FsStore::verified_loose_blobs`.
const VERIFIED_LOOSE_BLOB_CACHE_CAPACITY: usize = 65_536;
33
/// Selects how `FsStore::write_loose_object_atomic` asks `write_atomic` to
/// persist a loose object.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LooseObjectWriteMode {
    /// Translated to `AtomicWriteMode::Durable`. This is the mode a freshly
    /// constructed `FsStore` starts with.
    Durable,
    /// Translated to `AtomicWriteMode::BatchDirectorySync`. Forced for every
    /// loose-object write while a snapshot write batch is active, regardless
    /// of the configured mode.
    // NOTE(review): presumably the directory sync is deferred and the
    // directory is queued in the store's pending set instead; confirm against
    // `write_atomic`.
    BatchDirectorySync,
}
39
40#[derive(Debug)]
41pub(super) struct RecentObjectCache<K, V> {
42 entries: HashMap<K, V>,
43 order: VecDeque<K>,
44 capacity: usize,
45}
46
47impl<K, V> RecentObjectCache<K, V>
48where
49 K: Copy + Eq + Hash,
50{
51 pub(super) fn with_capacity(capacity: usize) -> Self {
52 Self {
53 entries: HashMap::new(),
54 order: VecDeque::new(),
55 capacity,
56 }
57 }
58
59 pub(super) fn get(&self, key: &K) -> Option<&V> {
60 self.entries.get(key)
61 }
62
63 pub(super) fn insert(&mut self, key: K, value: V) {
64 if self.capacity == 0 {
65 return;
66 }
67 if self.entries.insert(key, value).is_none() {
68 self.order.push_back(key);
69 }
70 while self.entries.len() > self.capacity {
71 if let Some(oldest) = self.order.pop_front() {
72 self.entries.remove(&oldest);
73 }
74 }
75 }
76}
77
/// Object store kept in a directory tree under a single root path.
pub struct FsStore {
    /// Root directory; the blobs, trees, states, actions and packs directories
    /// are all derived from it.
    pub(super) root: PathBuf,
    /// Compression settings; `CompressionConfig::default()` unless supplied
    /// via `with_compression` or `set_compression`.
    pub(super) compression: CompressionConfig,
    /// Pack manager built over `packs_dir(root)`.
    pack_manager: RwLock<PackManager>,
    /// Bounded cache of blobs keyed by content hash.
    // NOTE(review): presumably holds recently read/written blobs; it is
    // populated outside this part of the file.
    pub(super) recent_blobs: RwLock<RecentObjectCache<ContentHash, Blob>>,
    /// Bounded cache of trees keyed by content hash.
    pub(super) recent_trees: RwLock<RecentObjectCache<ContentHash, Tree>>,
    /// Bounded cache of states keyed by change id; sized with the tree cache
    /// capacity.
    pub(super) recent_states: RwLock<RecentObjectCache<ChangeId, State>>,
    /// Write mode used for loose objects when no snapshot write batch is
    /// active.
    loose_object_write_mode: LooseObjectWriteMode,
    /// Nesting depth of active snapshot write batches; while it is non-zero
    /// loose-object writes use `LooseObjectWriteMode::BatchDirectorySync`.
    snapshot_write_batch_depth: Mutex<usize>,
    /// Directories waiting for `sync_directory`. The set is handed to
    /// `write_atomic` for loose-object writes and drained when the outermost
    /// snapshot write batch is flushed.
    pending_directory_syncs: Mutex<BTreeSet<PathBuf>>,
    /// Bounded set (the value type is `()`) of blob content hashes.
    // NOTE(review): presumably records blobs whose loose copy has already been
    // verified; this part of the file only constructs it, so confirm against
    // its users.
    pub(super) verified_loose_blobs: RwLock<RecentObjectCache<ContentHash, ()>>,
}
130
131impl FsStore {
132 pub fn new(root: impl AsRef<Path>) -> Self {
136 let root = root.as_ref().to_path_buf();
137 let pack_manager = PackManager::new(packs_dir(&root));
138 Self {
139 root,
140 compression: CompressionConfig::default(),
141 pack_manager: RwLock::new(pack_manager),
142 recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
143 recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
144 recent_states: RwLock::new(RecentObjectCache::with_capacity(
145 RECENT_TREE_CACHE_CAPACITY,
146 )),
147 loose_object_write_mode: LooseObjectWriteMode::Durable,
148 snapshot_write_batch_depth: Mutex::new(0),
149 pending_directory_syncs: Mutex::new(BTreeSet::new()),
150 verified_loose_blobs: RwLock::new(RecentObjectCache::with_capacity(
151 VERIFIED_LOOSE_BLOB_CACHE_CAPACITY,
152 )),
153 }
154 }
155
156 pub fn with_compression(root: impl AsRef<Path>, compression: CompressionConfig) -> Self {
158 let root = root.as_ref().to_path_buf();
159 let pack_manager = PackManager::new(packs_dir(&root));
160 Self {
161 root,
162 compression,
163 pack_manager: RwLock::new(pack_manager),
164 recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
165 recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
166 recent_states: RwLock::new(RecentObjectCache::with_capacity(
167 RECENT_TREE_CACHE_CAPACITY,
168 )),
169 loose_object_write_mode: LooseObjectWriteMode::Durable,
170 snapshot_write_batch_depth: Mutex::new(0),
171 pending_directory_syncs: Mutex::new(BTreeSet::new()),
172 verified_loose_blobs: RwLock::new(RecentObjectCache::with_capacity(
173 VERIFIED_LOOSE_BLOB_CACHE_CAPACITY,
174 )),
175 }
176 }
177
178 pub fn init(&self) -> Result<()> {
180 std::fs::create_dir_all(blobs_dir(&self.root))?;
181 std::fs::create_dir_all(trees_dir(&self.root))?;
182 std::fs::create_dir_all(states_dir(&self.root))?;
183 std::fs::create_dir_all(actions_dir(&self.root))?;
184 std::fs::create_dir_all(packs_dir(&self.root))?;
185 Ok(())
186 }
187
188 pub fn root(&self) -> &Path {
190 &self.root
191 }
192
193 pub fn compression(&self) -> CompressionConfig {
195 self.compression
196 }
197
198 pub fn set_compression(&mut self, compression: CompressionConfig) {
200 self.compression = compression;
201 }
202
203 pub fn loose_object_write_mode(&self) -> LooseObjectWriteMode {
204 self.loose_object_write_mode
205 }
206
207 pub fn set_loose_object_write_mode(&mut self, mode: LooseObjectWriteMode) {
208 self.loose_object_write_mode = mode;
209 }
210
211 fn flush_pending_directory_syncs(&self) -> Result<usize> {
212 let pending_dirs = {
213 let mut guard = self.pending_directory_syncs.lock().map_err(|_| {
214 crate::store::HeddleError::Config(
215 "Failed to acquire pending directory sync lock".to_string(),
216 )
217 })?;
218 if guard.is_empty() {
219 return Ok(0);
220 }
221 let dirs = guard.iter().cloned().collect::<Vec<_>>();
222 guard.clear();
223 dirs
224 };
225
226 for (index, dir) in pending_dirs.iter().enumerate() {
227 if let Err(error) = sync_directory(dir) {
228 if let Ok(mut guard) = self.pending_directory_syncs.lock() {
229 guard.extend(pending_dirs[index..].iter().cloned());
230 }
231 return Err(error.into());
232 }
233 }
234
235 Ok(pending_dirs.len())
236 }
237
238 pub fn reload_packs(&self) -> Result<()> {
240 let mut manager = self.pack_manager.write().map_err(|_| {
241 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
242 })?;
243 manager.reload()
244 }
245
246 pub(super) fn reload_packs_if_stale(&self) -> Result<bool> {
261 {
263 let manager = self.pack_manager.read().map_err(|_| {
264 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
265 })?;
266 if !manager.needs_reload()? {
267 return Ok(false);
268 }
269 }
270 let mut manager = self.pack_manager.write().map_err(|_| {
274 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
275 })?;
276 manager.reload_if_disk_grew()
277 }
278
279 pub fn pack_manager(&self) -> &RwLock<PackManager> {
281 &self.pack_manager
282 }
283
284 pub fn clear_recent_object_caches(&self) {
285 if let Ok(mut blobs) = self.recent_blobs.write() {
286 *blobs = RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY);
287 }
288 if let Ok(mut trees) = self.recent_trees.write() {
289 *trees = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
290 }
291 if let Ok(mut states) = self.recent_states.write() {
292 *states = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
293 }
294 }
295
296 pub fn pack_ids(&self) -> Result<Vec<PackObjectId>> {
297 let manager = self.pack_manager.read().map_err(|_| {
298 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
299 })?;
300 manager.list_all_ids()
301 }
302
303 pub(super) fn write_loose_object_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
304 let batch_active = self.snapshot_write_batch_depth.lock().map_err(|_| {
305 crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
306 })?;
307 let configured_mode = if *batch_active > 0 {
308 LooseObjectWriteMode::BatchDirectorySync
309 } else {
310 self.loose_object_write_mode
311 };
312 drop(batch_active);
313
314 let mode = match configured_mode {
315 LooseObjectWriteMode::Durable => AtomicWriteMode::Durable,
316 LooseObjectWriteMode::BatchDirectorySync => AtomicWriteMode::BatchDirectorySync,
317 };
318 write_atomic(path, data, mode, Some(&self.pending_directory_syncs))
319 }
320
321 pub(super) fn write_pack_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
322 write_atomic(path, data, AtomicWriteMode::Durable, None)
323 }
324
325 pub(super) fn write_loose_object_cache(&self, path: &Path, data: &[u8]) -> Result<()> {
346 write_atomic(path, data, AtomicWriteMode::NoSync, None)
347 }
348
349 pub(super) fn begin_snapshot_write_batch_impl(&self) -> Result<()> {
350 let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
351 crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
352 })?;
353 *depth += 1;
354 Ok(())
355 }
356
357 pub(super) fn flush_snapshot_write_batch_impl(&self) -> Result<()> {
358 let should_flush = {
359 let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
360 crate::store::HeddleError::Config(
361 "Failed to acquire snapshot batch lock".to_string(),
362 )
363 })?;
364 if *depth == 0 {
365 return Ok(());
366 }
367 *depth -= 1;
368 *depth == 0
369 };
370
371 if should_flush {
372 let _ = self.flush_pending_directory_syncs()?;
373 }
374
375 Ok(())
376 }
377
378 pub(super) fn abort_snapshot_write_batch_impl(&self) {
379 if let Ok(mut depth) = self.snapshot_write_batch_depth.lock() {
380 *depth = 0;
381 }
382 if let Ok(mut pending) = self.pending_directory_syncs.lock() {
383 pending.clear();
384 }
385 }
386
387 #[cfg(test)]
388 pub(super) fn pending_directory_sync_count(&self) -> usize {
389 self.pending_directory_syncs
390 .lock()
391 .map(|pending| pending.len())
392 .unwrap_or(0)
393 }
394}