manifold/column_family/
file_handle_pool.rs

1use crate::DatabaseError;
2use crate::StorageBackend;
3use std::collections::HashMap;
4use std::path::PathBuf;
5use std::sync::{Arc, Mutex};
6use std::time::Instant;
7
8use super::unlocked_backend::UnlockedFileBackend;
9
10/// Entry in the file handle pool tracking usage metadata.
11struct PoolEntry {
12    backend: Arc<dyn StorageBackend>,
13    last_used: Instant,
14}
15
16impl PoolEntry {
17    fn new(backend: Arc<dyn StorageBackend>) -> Self {
18        Self {
19            backend,
20            last_used: Instant::now(),
21        }
22    }
23
24    fn touch(&mut self) {
25        self.last_used = Instant::now();
26    }
27}
28
29/// Manages a pool of file handles for column families.
30///
31/// The pool maintains a fixed maximum number of open file descriptors and implements
32/// LRU eviction when the pool is full. This allows unlimited column families while
33/// keeping file descriptor usage bounded.
34///
35/// Each column family can acquire its own `FileBackend` to the same physical file,
36/// enabling true concurrent writes through independent file descriptors.
37pub struct FileHandlePool {
38    path: PathBuf,
39    max_size: usize,
40    entries: Mutex<HashMap<String, PoolEntry>>,
41    /// Global lock for file growth operations to prevent race conditions
42    /// when multiple column families try to grow the same underlying file concurrently.
43    /// Serializes `set_len()` calls across all file handles to the same file.
44    /// Stored as Arc so it can be shared with all `PartitionedStorageBackend` instances.
45    file_growth_lock: Arc<Mutex<()>>,
46}
47
48impl FileHandlePool {
49    /// Creates a new file handle pool.
50    ///
51    /// # Arguments
52    ///
53    /// * `path` - Path to the database file
54    /// * `max_size` - Maximum number of file handles to keep open
55    pub fn new(path: PathBuf, max_size: usize) -> Self {
56        Self {
57            path,
58            max_size,
59            entries: Mutex::new(HashMap::new()),
60            file_growth_lock: Arc::new(Mutex::new(())),
61        }
62    }
63
64    /// Acquires a file handle for the specified column family.
65    ///
66    /// If the column family already has an open handle, it is reused and its
67    /// `last_used` timestamp is updated. If the pool is full, the least recently
68    /// used handle is evicted.
69    ///
70    /// # Arguments
71    ///
72    /// * `cf_name` - Name of the column family requesting a handle
73    ///
74    /// # Returns
75    ///
76    /// An Arc-wrapped `StorageBackend` that the column family can use for I/O operations.
77    pub fn acquire(&self, cf_name: &str) -> Result<Arc<dyn StorageBackend>, DatabaseError> {
78        // Fast path: check if already exists (read-only, no eviction needed)
79        {
80            let mut entries = self.entries.lock().unwrap();
81            if let Some(entry) = entries.get_mut(cf_name) {
82                entry.touch();
83                return Ok(entry.backend.clone());
84            }
85        }
86
87        // Slow path: need to open a new file
88        // Open file WITHOUT holding the lock to avoid serializing all threads
89        let file = std::fs::OpenOptions::new()
90            .read(true)
91            .write(true)
92            .open(&self.path)?;
93
94        let backend: Arc<dyn StorageBackend> = Arc::new(UnlockedFileBackend::new(file)?);
95
96        // Acquire lock only for the insert
97        let mut entries = self.entries.lock().unwrap();
98
99        // Double-check: another thread might have inserted while we were opening the file
100        if let Some(entry) = entries.get_mut(cf_name) {
101            entry.touch();
102            return Ok(entry.backend.clone());
103        }
104
105        if entries.len() >= self.max_size {
106            Self::evict_lru(&mut entries, cf_name);
107        }
108
109        entries.insert(cf_name.to_string(), PoolEntry::new(backend.clone()));
110        Ok(backend)
111    }
112
113    /// Updates the `last_used` timestamp for a column family's handle.
114    ///
115    /// This should be called when a column family's `Database` is reused to prevent
116    /// premature eviction.
117    pub fn touch(&self, cf_name: &str) {
118        // Minimize lock hold time - just update timestamp
119        let mut entries = self.entries.lock().unwrap();
120        if let Some(entry) = entries.get_mut(cf_name) {
121            entry.touch();
122        }
123        // Lock released immediately
124    }
125
126    /// Explicitly releases a column family's file handle.
127    ///
128    /// This is optional and allows manual control over when handles are released.
129    /// If not called, handles are released automatically via LRU eviction.
130    pub fn release(&self, cf_name: &str) {
131        let mut entries = self.entries.lock().unwrap();
132        entries.remove(cf_name);
133    }
134
135    /// Returns the current number of open file handles.
136    pub fn len(&self) -> usize {
137        let entries = self.entries.lock().unwrap();
138        entries.len()
139    }
140
141    /// Returns true if the pool has no open handles.
142    pub fn is_empty(&self) -> bool {
143        self.len() == 0
144    }
145
146    /// Returns the maximum pool size.
147    pub fn max_size(&self) -> usize {
148        self.max_size
149    }
150
151    /// Evicts the least recently used entry from the pool.
152    ///
153    /// The entry being acquired (`cf_name`) is never evicted, even if it would be
154    /// the LRU candidate.
155    fn evict_lru(entries: &mut HashMap<String, PoolEntry>, exclude: &str) {
156        let mut lru_name: Option<String> = None;
157        let mut lru_time = Instant::now();
158
159        for (name, entry) in entries.iter() {
160            if name == exclude {
161                continue;
162            }
163
164            if lru_name.is_none() || entry.last_used < lru_time {
165                lru_name = Some(name.clone());
166                lru_time = entry.last_used;
167            }
168        }
169
170        if let Some(name) = lru_name {
171            entries.remove(&name);
172        }
173    }
174
175    /// Returns a clone of the Arc wrapping the file growth lock.
176    ///
177    /// This Arc should be passed to `PartitionedStorageBackend` instances so they can
178    /// serialize file growth operations across all column families using the same file.
179    ///
180    /// # Returns
181    ///
182    /// An Arc-wrapped Mutex that all backends for this file should use when calling `set_len()`.
183    pub fn file_growth_lock(&self) -> Arc<Mutex<()>> {
184        Arc::clone(&self.file_growth_lock)
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;
    use tempfile::NamedTempFile;

    /// Creates a scratch database file and a pool of capacity `max_size` over it.
    ///
    /// The temp file is returned alongside the pool so that it stays on disk
    /// for as long as the test keeps the binding alive.
    fn scratch_pool(max_size: usize) -> (NamedTempFile, FileHandlePool) {
        let tmpfile = NamedTempFile::new().unwrap();
        std::fs::write(tmpfile.path(), b"test").unwrap();
        let pool = FileHandlePool::new(tmpfile.path().to_path_buf(), max_size);
        (tmpfile, pool)
    }

    /// Sleeps briefly so that consecutive `last_used` timestamps are distinct.
    fn pause() {
        thread::sleep(Duration::from_millis(10));
    }

    /// Reads the `last_used` timestamp recorded for `cf_name`.
    fn last_used_of(pool: &FileHandlePool, cf_name: &str) -> Instant {
        pool.entries.lock().unwrap().get(cf_name).unwrap().last_used
    }

    /// Reports whether the pool currently caches a handle for `cf_name`.
    fn is_cached(pool: &FileHandlePool, cf_name: &str) -> bool {
        pool.entries.lock().unwrap().contains_key(cf_name)
    }

    #[test]
    fn test_pool_creation() {
        let (_tmpfile, pool) = scratch_pool(10);

        assert_eq!(pool.max_size(), 10);
        assert_eq!(pool.len(), 0);
        assert!(pool.is_empty());
    }

    #[test]
    fn test_acquire_creates_new_handle() {
        let (_tmpfile, pool) = scratch_pool(10);

        let first = pool.acquire("cf1").unwrap();
        assert_eq!(pool.len(), 1);
        assert!(!pool.is_empty());

        let second = pool.acquire("cf2").unwrap();
        assert_eq!(pool.len(), 2);

        // Distinct column families must not share a backend.
        assert!(!Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn test_acquire_reuses_existing_handle() {
        let (_tmpfile, pool) = scratch_pool(10);

        let first = pool.acquire("cf1").unwrap();
        let second = pool.acquire("cf1").unwrap();

        assert_eq!(pool.len(), 1);
        assert!(Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn test_touch_updates_timestamp() {
        let (_tmpfile, pool) = scratch_pool(10);

        pool.acquire("cf1").unwrap();
        let before = last_used_of(&pool, "cf1");

        pause();
        pool.touch("cf1");

        let after = last_used_of(&pool, "cf1");
        assert!(after > before);
    }

    #[test]
    fn test_release_removes_handle() {
        let (_tmpfile, pool) = scratch_pool(10);

        pool.acquire("cf1").unwrap();
        assert_eq!(pool.len(), 1);

        pool.release("cf1");
        assert_eq!(pool.len(), 0);
        assert!(pool.is_empty());
    }

    #[test]
    fn test_lru_eviction() {
        let (_tmpfile, pool) = scratch_pool(3);

        pool.acquire("cf1").unwrap();
        pause();

        pool.acquire("cf2").unwrap();
        pause();

        pool.acquire("cf3").unwrap();
        assert_eq!(pool.len(), 3);

        // The pool is full, so the fourth handle pushes out the oldest one.
        pool.acquire("cf4").unwrap();
        assert_eq!(pool.len(), 3);

        assert!(!is_cached(&pool, "cf1"));
        assert!(is_cached(&pool, "cf2"));
        assert!(is_cached(&pool, "cf3"));
        assert!(is_cached(&pool, "cf4"));
    }

    #[test]
    fn test_lru_respects_touch() {
        let (_tmpfile, pool) = scratch_pool(3);

        pool.acquire("cf1").unwrap();
        pause();

        pool.acquire("cf2").unwrap();
        pause();

        pool.acquire("cf3").unwrap();
        pause();

        // Refreshing cf1 leaves cf2 as the least recently used entry.
        pool.touch("cf1");

        pool.acquire("cf4").unwrap();

        assert!(is_cached(&pool, "cf1"));
        assert!(!is_cached(&pool, "cf2"));
        assert!(is_cached(&pool, "cf3"));
        assert!(is_cached(&pool, "cf4"));
    }

    #[test]
    fn test_concurrent_acquire() {
        let (_tmpfile, pool) = scratch_pool(10);
        let pool = Arc::new(pool);

        // Spawn every worker before joining any of them.
        let workers: Vec<_> = (0..5)
            .map(|i| {
                let pool = Arc::clone(&pool);
                thread::spawn(move || {
                    pool.acquire(&format!("cf{i}")).unwrap();
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(pool.len(), 5);
    }
}