manifold/column_family/
file_handle_pool.rs1use crate::DatabaseError;
2use crate::StorageBackend;
3use std::collections::HashMap;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex};
6use std::time::Instant;
7
8use super::unlocked_backend::UnlockedFileBackend;
9
10struct PoolEntry {
12 backend: Arc<dyn StorageBackend>,
13 last_used: Instant,
14}
15
16impl PoolEntry {
17 fn new(backend: Arc<dyn StorageBackend>) -> Self {
18 Self {
19 backend,
20 last_used: Instant::now(),
21 }
22 }
23
24 fn touch(&mut self) {
25 self.last_used = Instant::now();
26 }
27}
28
29pub struct FileHandlePool {
38 path: PathBuf,
39 max_size: usize,
40 entries: Mutex<HashMap<String, PoolEntry>>,
41 file_growth_lock: Arc<Mutex<()>>,
46}
47
48impl FileHandlePool {
49 pub fn new(path: PathBuf, max_size: usize) -> Self {
56 Self {
57 path,
58 max_size,
59 entries: Mutex::new(HashMap::new()),
60 file_growth_lock: Arc::new(Mutex::new(())),
61 }
62 }
63
64 pub fn acquire(&self, cf_name: &str) -> Result<Arc<dyn StorageBackend>, DatabaseError> {
78 {
80 let mut entries = self.entries.lock().unwrap();
81 if let Some(entry) = entries.get_mut(cf_name) {
82 entry.touch();
83 return Ok(entry.backend.clone());
84 }
85 }
86
87 let file = std::fs::OpenOptions::new()
90 .read(true)
91 .write(true)
92 .open(&self.path)?;
93
94 let backend: Arc<dyn StorageBackend> = Arc::new(UnlockedFileBackend::new(file)?);
95
96 let mut entries = self.entries.lock().unwrap();
98
99 if let Some(entry) = entries.get_mut(cf_name) {
101 entry.touch();
102 return Ok(entry.backend.clone());
103 }
104
105 if entries.len() >= self.max_size {
106 Self::evict_lru(&mut entries, cf_name);
107 }
108
109 entries.insert(cf_name.to_string(), PoolEntry::new(backend.clone()));
110 Ok(backend)
111 }
112
113 pub fn touch(&self, cf_name: &str) {
118 let mut entries = self.entries.lock().unwrap();
120 if let Some(entry) = entries.get_mut(cf_name) {
121 entry.touch();
122 }
123 }
125
126 pub fn release(&self, cf_name: &str) {
131 let mut entries = self.entries.lock().unwrap();
132 entries.remove(cf_name);
133 }
134
135 pub fn len(&self) -> usize {
137 let entries = self.entries.lock().unwrap();
138 entries.len()
139 }
140
141 pub fn is_empty(&self) -> bool {
143 self.len() == 0
144 }
145
146 pub fn max_size(&self) -> usize {
148 self.max_size
149 }
150
151 fn evict_lru(entries: &mut HashMap<String, PoolEntry>, exclude: &str) {
156 let mut lru_name: Option<String> = None;
157 let mut lru_time = Instant::now();
158
159 for (name, entry) in entries.iter() {
160 if name == exclude {
161 continue;
162 }
163
164 if lru_name.is_none() || entry.last_used < lru_time {
165 lru_name = Some(name.clone());
166 lru_time = entry.last_used;
167 }
168 }
169
170 if let Some(name) = lru_name {
171 entries.remove(&name);
172 }
173 }
174
175 pub fn file_growth_lock(&self) -> Arc<Mutex<()>> {
184 Arc::clone(&self.file_growth_lock)
185 }
186}
187
188#[cfg(test)]
189mod tests {
190 use super::*;
191 use tempfile::NamedTempFile;
192
193 #[test]
194 fn test_pool_creation() {
195 let tmpfile = NamedTempFile::new().unwrap();
196 std::fs::write(tmpfile.path(), b"test").unwrap();
197
198 let pool = FileHandlePool::new(tmpfile.path().to_path_buf(), 10);
199 assert_eq!(pool.max_size(), 10);
200 assert_eq!(pool.len(), 0);
201 assert!(pool.is_empty());
202 }
203
204 #[test]
205 fn test_acquire_creates_new_handle() {
206 let tmpfile = NamedTempFile::new().unwrap();
207 std::fs::write(tmpfile.path(), b"test").unwrap();
208
209 let pool = FileHandlePool::new(tmpfile.path().to_path_buf(), 10);
210
211 let handle1 = pool.acquire("cf1").unwrap();
212 assert_eq!(pool.len(), 1);
213 assert!(!pool.is_empty());
214
215 let handle2 = pool.acquire("cf2").unwrap();
216 assert_eq!(pool.len(), 2);
217
218 assert!(!Arc::ptr_eq(&handle1, &handle2));
219 }
220
221 #[test]
222 fn test_acquire_reuses_existing_handle() {
223 let tmpfile = NamedTempFile::new().unwrap();
224 std::fs::write(tmpfile.path(), b"test").unwrap();
225
226 let pool = FileHandlePool::new(tmpfile.path().to_path_buf(), 10);
227
228 let handle1 = pool.acquire("cf1").unwrap();
229 let handle2 = pool.acquire("cf1").unwrap();
230
231 assert_eq!(pool.len(), 1);
232 assert!(Arc::ptr_eq(&handle1, &handle2));
233 }
234
235 #[test]
236 fn test_touch_updates_timestamp() {
237 let tmpfile = NamedTempFile::new().unwrap();
238 std::fs::write(tmpfile.path(), b"test").unwrap();
239
240 let pool = FileHandlePool::new(tmpfile.path().to_path_buf(), 10);
241
242 pool.acquire("cf1").unwrap();
243
244 let entries = pool.entries.lock().unwrap();
245 let time1 = entries.get("cf1").unwrap().last_used;
246 drop(entries);
247
248 std::thread::sleep(std::time::Duration::from_millis(10));
249 pool.touch("cf1");
250
251 let entries = pool.entries.lock().unwrap();
252 let time2 = entries.get("cf1").unwrap().last_used;
253
254 assert!(time2 > time1);
255 }
256
257 #[test]
258 fn test_release_removes_handle() {
259 let tmpfile = NamedTempFile::new().unwrap();
260 std::fs::write(tmpfile.path(), b"test").unwrap();
261
262 let pool = FileHandlePool::new(tmpfile.path().to_path_buf(), 10);
263
264 pool.acquire("cf1").unwrap();
265 assert_eq!(pool.len(), 1);
266
267 pool.release("cf1");
268 assert_eq!(pool.len(), 0);
269 assert!(pool.is_empty());
270 }
271
272 #[test]
273 fn test_lru_eviction() {
274 let tmpfile = NamedTempFile::new().unwrap();
275 std::fs::write(tmpfile.path(), b"test").unwrap();
276
277 let pool = FileHandlePool::new(tmpfile.path().to_path_buf(), 3);
278
279 pool.acquire("cf1").unwrap();
280 std::thread::sleep(std::time::Duration::from_millis(10));
281
282 pool.acquire("cf2").unwrap();
283 std::thread::sleep(std::time::Duration::from_millis(10));
284
285 pool.acquire("cf3").unwrap();
286 assert_eq!(pool.len(), 3);
287
288 pool.acquire("cf4").unwrap();
289 assert_eq!(pool.len(), 3);
290
291 let entries = pool.entries.lock().unwrap();
292 assert!(!entries.contains_key("cf1"));
293 assert!(entries.contains_key("cf2"));
294 assert!(entries.contains_key("cf3"));
295 assert!(entries.contains_key("cf4"));
296 }
297
298 #[test]
299 fn test_lru_respects_touch() {
300 let tmpfile = NamedTempFile::new().unwrap();
301 std::fs::write(tmpfile.path(), b"test").unwrap();
302
303 let pool = FileHandlePool::new(tmpfile.path().to_path_buf(), 3);
304
305 pool.acquire("cf1").unwrap();
306 std::thread::sleep(std::time::Duration::from_millis(10));
307
308 pool.acquire("cf2").unwrap();
309 std::thread::sleep(std::time::Duration::from_millis(10));
310
311 pool.acquire("cf3").unwrap();
312 std::thread::sleep(std::time::Duration::from_millis(10));
313
314 pool.touch("cf1");
315
316 pool.acquire("cf4").unwrap();
317
318 let entries = pool.entries.lock().unwrap();
319 assert!(entries.contains_key("cf1"));
320 assert!(!entries.contains_key("cf2"));
321 assert!(entries.contains_key("cf3"));
322 assert!(entries.contains_key("cf4"));
323 }
324
325 #[test]
326 fn test_concurrent_acquire() {
327 use std::sync::Arc;
328 use std::thread;
329
330 let tmpfile = NamedTempFile::new().unwrap();
331 std::fs::write(tmpfile.path(), b"test").unwrap();
332
333 let pool = Arc::new(FileHandlePool::new(tmpfile.path().to_path_buf(), 10));
334
335 let mut handles = vec![];
336 for i in 0..5 {
337 let pool_clone = pool.clone();
338 let handle = thread::spawn(move || {
339 pool_clone.acquire(&format!("cf{i}")).unwrap();
340 });
341 handles.push(handle);
342 }
343
344 for handle in handles {
345 handle.join().unwrap();
346 }
347
348 assert_eq!(pool.len(), 5);
349 }
350}