1use std::{
5 collections::{BTreeSet, HashMap, VecDeque},
6 hash::Hash,
7 path::{Path, PathBuf},
8 sync::{Mutex, RwLock},
9};
10
11use super::{
12 fs_io::{AtomicWriteMode, write_atomic},
13 fs_paths::{actions_dir, blobs_dir, packs_dir, states_dir, trees_dir},
14};
15use crate::{
16 fs_atomic::sync_directory,
17 object::{Blob, ChangeId, ContentHash, State, Tree},
18 store::{
19 CompressionConfig, Result,
20 pack::{PackManager, PackObjectId},
21 },
22};
23
/// Maximum number of entries kept in the recent-blob cache.
const RECENT_BLOB_CACHE_CAPACITY: usize = 2_048;
/// Maximum number of entries kept in the recent-tree cache. The recent-state
/// cache is constructed with this same capacity.
const RECENT_TREE_CACHE_CAPACITY: usize = 1_024;
26
/// Durability strategy applied when loose objects are written.
///
/// Each variant is translated to the `AtomicWriteMode` variant of the same
/// name before the write is handed to `write_atomic`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LooseObjectWriteMode {
    /// Maps to `AtomicWriteMode::Durable`; this is the mode a freshly
    /// constructed store starts with.
    Durable,
    /// Maps to `AtomicWriteMode::BatchDirectorySync`; always selected while a
    /// snapshot write batch is open.
    /// NOTE(review): presumably defers the parent-directory sync into the
    /// store's pending set — confirm against `write_atomic`.
    BatchDirectorySync,
}
32
33#[derive(Debug)]
34pub(super) struct RecentObjectCache<K, V> {
35 entries: HashMap<K, V>,
36 order: VecDeque<K>,
37 capacity: usize,
38}
39
40impl<K, V> RecentObjectCache<K, V>
41where
42 K: Copy + Eq + Hash,
43{
44 fn with_capacity(capacity: usize) -> Self {
45 Self {
46 entries: HashMap::new(),
47 order: VecDeque::new(),
48 capacity,
49 }
50 }
51
52 pub(super) fn get(&self, key: &K) -> Option<&V> {
53 self.entries.get(key)
54 }
55
56 pub(super) fn insert(&mut self, key: K, value: V) {
57 if self.capacity == 0 {
58 return;
59 }
60 if self.entries.insert(key, value).is_none() {
61 self.order.push_back(key);
62 }
63 while self.entries.len() > self.capacity {
64 if let Some(oldest) = self.order.pop_front() {
65 self.entries.remove(&oldest);
66 }
67 }
68 }
69}
70
/// Filesystem-backed object store rooted at a single directory.
pub struct FsStore {
    /// Root directory; the blob, tree, state, action and pack directories are
    /// all derived from it.
    pub(super) root: PathBuf,
    /// Compression configuration supplied at construction or via
    /// `set_compression`.
    pub(super) compression: CompressionConfig,
    /// Pack manager created for `packs_dir(root)`; read-locked for queries and
    /// write-locked for reloads.
    pack_manager: RwLock<PackManager>,
    /// Bounded cache of blobs keyed by content hash.
    pub(super) recent_blobs: RwLock<RecentObjectCache<ContentHash, Blob>>,
    /// Bounded cache of trees keyed by content hash.
    pub(super) recent_trees: RwLock<RecentObjectCache<ContentHash, Tree>>,
    /// Bounded cache of states keyed by change id.
    pub(super) recent_states: RwLock<RecentObjectCache<ChangeId, State>>,
    /// Write mode used for loose objects while no snapshot write batch is open.
    loose_object_write_mode: LooseObjectWriteMode,
    /// Nesting depth of open snapshot write batches; any value above zero
    /// forces `BatchDirectorySync` for loose-object writes.
    snapshot_write_batch_depth: Mutex<usize>,
    /// Directories still waiting for `sync_directory`; handed to `write_atomic`
    /// for loose-object writes and drained when the outermost batch is flushed.
    pending_directory_syncs: Mutex<BTreeSet<PathBuf>>,
}
102
103impl FsStore {
104 pub fn new(root: impl AsRef<Path>) -> Self {
108 let root = root.as_ref().to_path_buf();
109 let pack_manager = PackManager::new(packs_dir(&root));
110 Self {
111 root,
112 compression: CompressionConfig::default(),
113 pack_manager: RwLock::new(pack_manager),
114 recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
115 recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
116 recent_states: RwLock::new(RecentObjectCache::with_capacity(
117 RECENT_TREE_CACHE_CAPACITY,
118 )),
119 loose_object_write_mode: LooseObjectWriteMode::Durable,
120 snapshot_write_batch_depth: Mutex::new(0),
121 pending_directory_syncs: Mutex::new(BTreeSet::new()),
122 }
123 }
124
125 pub fn with_compression(root: impl AsRef<Path>, compression: CompressionConfig) -> Self {
127 let root = root.as_ref().to_path_buf();
128 let pack_manager = PackManager::new(packs_dir(&root));
129 Self {
130 root,
131 compression,
132 pack_manager: RwLock::new(pack_manager),
133 recent_blobs: RwLock::new(RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY)),
134 recent_trees: RwLock::new(RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY)),
135 recent_states: RwLock::new(RecentObjectCache::with_capacity(
136 RECENT_TREE_CACHE_CAPACITY,
137 )),
138 loose_object_write_mode: LooseObjectWriteMode::Durable,
139 snapshot_write_batch_depth: Mutex::new(0),
140 pending_directory_syncs: Mutex::new(BTreeSet::new()),
141 }
142 }
143
144 pub fn init(&self) -> Result<()> {
146 std::fs::create_dir_all(blobs_dir(&self.root))?;
147 std::fs::create_dir_all(trees_dir(&self.root))?;
148 std::fs::create_dir_all(states_dir(&self.root))?;
149 std::fs::create_dir_all(actions_dir(&self.root))?;
150 std::fs::create_dir_all(packs_dir(&self.root))?;
151 Ok(())
152 }
153
154 pub fn root(&self) -> &Path {
156 &self.root
157 }
158
    /// Returns a copy of the store's current compression configuration.
    pub fn compression(&self) -> CompressionConfig {
        self.compression
    }
163
    /// Replaces the store's compression configuration with `compression`.
    pub fn set_compression(&mut self, compression: CompressionConfig) {
        self.compression = compression;
    }
168
    /// Returns the configured loose-object write mode.
    ///
    /// This is the mode used when no snapshot write batch is open; an open
    /// batch overrides it with `BatchDirectorySync`.
    pub fn loose_object_write_mode(&self) -> LooseObjectWriteMode {
        self.loose_object_write_mode
    }
172
    /// Sets the loose-object write mode used while no snapshot write batch is
    /// open.
    pub fn set_loose_object_write_mode(&mut self, mode: LooseObjectWriteMode) {
        self.loose_object_write_mode = mode;
    }
176
177 fn flush_pending_directory_syncs(&self) -> Result<usize> {
178 let pending_dirs = {
179 let mut guard = self.pending_directory_syncs.lock().map_err(|_| {
180 crate::store::HeddleError::Config(
181 "Failed to acquire pending directory sync lock".to_string(),
182 )
183 })?;
184 if guard.is_empty() {
185 return Ok(0);
186 }
187 let dirs = guard.iter().cloned().collect::<Vec<_>>();
188 guard.clear();
189 dirs
190 };
191
192 for (index, dir) in pending_dirs.iter().enumerate() {
193 if let Err(error) = sync_directory(dir) {
194 if let Ok(mut guard) = self.pending_directory_syncs.lock() {
195 guard.extend(pending_dirs[index..].iter().cloned());
196 }
197 return Err(error.into());
198 }
199 }
200
201 Ok(pending_dirs.len())
202 }
203
204 pub fn reload_packs(&self) -> Result<()> {
206 let mut manager = self.pack_manager.write().map_err(|_| {
207 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
208 })?;
209 manager.reload()
210 }
211
212 pub(super) fn reload_packs_if_stale(&self) -> Result<bool> {
227 {
229 let manager = self.pack_manager.read().map_err(|_| {
230 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
231 })?;
232 if !manager.needs_reload()? {
233 return Ok(false);
234 }
235 }
236 let mut manager = self.pack_manager.write().map_err(|_| {
240 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
241 })?;
242 manager.reload_if_disk_grew()
243 }
244
    /// Returns the lock guarding the store's pack manager, so callers can take
    /// their own read or write guard.
    pub fn pack_manager(&self) -> &RwLock<PackManager> {
        &self.pack_manager
    }
249
250 pub fn clear_recent_object_caches(&self) {
251 if let Ok(mut blobs) = self.recent_blobs.write() {
252 *blobs = RecentObjectCache::with_capacity(RECENT_BLOB_CACHE_CAPACITY);
253 }
254 if let Ok(mut trees) = self.recent_trees.write() {
255 *trees = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
256 }
257 if let Ok(mut states) = self.recent_states.write() {
258 *states = RecentObjectCache::with_capacity(RECENT_TREE_CACHE_CAPACITY);
259 }
260 }
261
262 pub fn pack_ids(&self) -> Result<Vec<PackObjectId>> {
263 let manager = self.pack_manager.read().map_err(|_| {
264 crate::store::HeddleError::Config("Failed to acquire pack manager lock".to_string())
265 })?;
266 manager.list_all_ids()
267 }
268
269 pub(super) fn write_loose_object_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
270 let batch_active = self.snapshot_write_batch_depth.lock().map_err(|_| {
271 crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
272 })?;
273 let configured_mode = if *batch_active > 0 {
274 LooseObjectWriteMode::BatchDirectorySync
275 } else {
276 self.loose_object_write_mode
277 };
278 drop(batch_active);
279
280 let mode = match configured_mode {
281 LooseObjectWriteMode::Durable => AtomicWriteMode::Durable,
282 LooseObjectWriteMode::BatchDirectorySync => AtomicWriteMode::BatchDirectorySync,
283 };
284 write_atomic(path, data, mode, Some(&self.pending_directory_syncs))
285 }
286
287 pub(super) fn write_pack_atomic(&self, path: &Path, data: &[u8]) -> Result<()> {
288 write_atomic(path, data, AtomicWriteMode::Durable, None)
289 }
290
291 pub(super) fn begin_snapshot_write_batch_impl(&self) -> Result<()> {
292 let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
293 crate::store::HeddleError::Config("Failed to acquire snapshot batch lock".to_string())
294 })?;
295 *depth += 1;
296 Ok(())
297 }
298
299 pub(super) fn flush_snapshot_write_batch_impl(&self) -> Result<()> {
300 let should_flush = {
301 let mut depth = self.snapshot_write_batch_depth.lock().map_err(|_| {
302 crate::store::HeddleError::Config(
303 "Failed to acquire snapshot batch lock".to_string(),
304 )
305 })?;
306 if *depth == 0 {
307 return Ok(());
308 }
309 *depth -= 1;
310 *depth == 0
311 };
312
313 if should_flush {
314 let _ = self.flush_pending_directory_syncs()?;
315 }
316
317 Ok(())
318 }
319
320 pub(super) fn abort_snapshot_write_batch_impl(&self) {
321 if let Ok(mut depth) = self.snapshot_write_batch_depth.lock() {
322 *depth = 0;
323 }
324 if let Ok(mut pending) = self.pending_directory_syncs.lock() {
325 pending.clear();
326 }
327 }
328
329 #[cfg(test)]
330 pub(super) fn pending_directory_sync_count(&self) -> usize {
331 self.pending_directory_syncs
332 .lock()
333 .map(|pending| pending.len())
334 .unwrap_or(0)
335 }
336}