Skip to main content

noxu_log/
file_handle.rs

1//! File handle with latch protection.
2//!
3//!
4//! A FileHandle wraps a file descriptor with a latch to ensure exclusive
5//! access during I/O operations.
6
7use crate::error::{LogError, Result};
8use noxu_latch::{ExclusiveLatch, ExclusiveLatchGuard};
9use noxu_sync::Mutex;
10use std::fs::File;
11use std::sync::Arc;
12
13use crate::posio;
14
15/// A file handle with latch protection for thread-safe I/O.
16///
17/// The handle holds a file descriptor and an exclusive latch.
18/// All I/O operations must be performed while holding the latch.
19pub struct FileHandle {
20    /// The underlying file (wrapped in Mutex for interior mutability).
21    file: Mutex<Option<File>>,
22    /// Latch protecting access to the file.
23    latch: Arc<ExclusiveLatch>,
24    /// Log version of this file.
25    log_version: u32,
26    /// File number this handle represents.
27    file_num: u32,
28}
29
30impl FileHandle {
31    /// Creates a new uninitialized file handle.
32    ///
33    /// The file must be initialized via `init()` before use.
34    pub fn new(file_num: u32) -> Self {
35        let latch =
36            Arc::new(ExclusiveLatch::named(format!("file_{:08x}", file_num)));
37
38        FileHandle { file: Mutex::new(None), latch, log_version: 0, file_num }
39    }
40
41    /// Initializes the handle with an open file and log version.
42    pub fn init(&mut self, file: File, log_version: u32) {
43        let mut f = self.file.lock();
44        assert!(f.is_none(), "FileHandle already initialized");
45        *f = Some(file);
46        self.log_version = log_version;
47    }
48
49    /// Returns the file number.
50    pub fn file_num(&self) -> u32 {
51        self.file_num
52    }
53
54    /// Returns the log version.
55    pub fn log_version(&self) -> u32 {
56        self.log_version
57    }
58
59    /// Returns true if the file is initialized.
60    pub fn is_initialized(&self) -> bool {
61        self.file.lock().is_some()
62    }
63
64    /// Acquires the latch and returns a guard that provides access to the file.
65    ///
66    /// Returns `Ok(guard)` on success, or `Err(LogError::LatchTimeout)` if the
67    /// latch acquisition times out. The latch is released when the guard drops.
68    pub fn acquire(&self) -> Result<FileHandleGuard<'_>> {
69        let _latch_guard = self
70            .latch
71            .acquire()
72            .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
73        Ok(FileHandleGuard { handle: self, _latch_guard })
74    }
75
76    /// Attempts to acquire the latch without blocking.
77    ///
78    /// Returns `None` if the latch is currently held.
79    pub fn try_acquire(&self) -> Option<FileHandleGuard<'_>> {
80        self.latch
81            .try_acquire()
82            .map(|_latch_guard| FileHandleGuard { handle: self, _latch_guard })
83    }
84
85    /// Closes the file handle.
86    ///
87    /// This should only be called when the handle is no longer in use.
88    pub fn close(&mut self) -> Result<()> {
89        if let Some(file) = self.file.lock().take() {
90            drop(file); // File is closed when dropped
91        }
92        Ok(())
93    }
94}
95
96impl Drop for FileHandle {
97    fn drop(&mut self) {
98        let _ = self.close();
99    }
100}
101
102/// RAII guard providing access to the file while the latch is held.
103pub struct FileHandleGuard<'a> {
104    handle: &'a FileHandle,
105    _latch_guard: ExclusiveLatchGuard<'a>,
106}
107
108impl<'a> FileHandleGuard<'a> {
109    /// Reads data from the file at the given offset.
110    ///
111    /// # Arguments
112    ///
113    /// * `offset` - File offset to read from
114    /// * `buf` - Buffer to read into
115    ///
116    /// # Returns
117    ///
118    /// Number of bytes read.
119    /// Reads data from the file at the given offset.
120    ///
121    /// Uses `pread64` (one syscall) instead of `lseek + read` (two syscalls).
122    /// The JVM
123    /// lowers to `pread64` on Linux.
124    pub fn read_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<usize> {
125        let file_guard = self.handle.file.lock();
126        let file = file_guard.as_ref().ok_or_else(|| {
127            LogError::Internal("FileHandle not initialized".to_string())
128        })?;
129        Ok(posio::read_at(file, buf, offset)?)
130    }
131
132    /// Reads exactly `buf.len()` bytes from the file at the given offset.
133    ///
134    /// Uses `pread64` in a retry loop.
135    /// Returns an error if fewer bytes are available.
136    pub fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<()> {
137        let file_guard = self.handle.file.lock();
138        let file = file_guard.as_ref().ok_or_else(|| {
139            LogError::Internal("FileHandle not initialized".to_string())
140        })?;
141        posio::read_exact_at(file, buf, offset)?;
142        Ok(())
143    }
144
145    /// Writes data to the file at the given offset.
146    ///
147    /// Uses `pwrite64` (one syscall) instead of `lseek + write` (two syscalls).
148    /// `FileChannel.write(ByteBuffer, position)` which the JVM
149    /// lowers to `pwrite64` on Linux.  This eliminates half the syscalls on
150    /// the hot write path and removes the need to serialise seek+write under
151    /// the guard (pwrite64 is inherently positional and thread-safe).
152    ///
153    /// # Arguments
154    ///
155    /// * `offset` - File offset to write to (passed directly to pwrite64)
156    /// * `buf` - Data to write
157    ///
158    /// # Returns
159    ///
160    /// Number of bytes written (always `buf.len()` on success).
161    pub fn write_at(&mut self, offset: u64, buf: &[u8]) -> Result<usize> {
162        let file_guard = self.handle.file.lock();
163        let file = file_guard.as_ref().ok_or_else(|| {
164            LogError::Internal("FileHandle not initialized".to_string())
165        })?;
166        posio::write_all_at(file, buf, offset)?;
167        Ok(buf.len())
168    }
169
170    /// Syncs all file data and metadata to disk (fsync).
171    ///
172    /// Use this when the file's metadata (size, mtime) must also be durable —
173    /// typically for file-header writes.  For log-data writes prefer
174    /// `sync_data()` which is faster.
175    pub fn sync(&mut self) -> Result<()> {
176        let file_guard = self.handle.file.lock();
177        let file = file_guard.as_ref().ok_or_else(|| {
178            LogError::Internal("FileHandle not initialized".to_string())
179        })?;
180        // DST fault layer (inactive in production): a dropped fsync is
181        // acknowledged without flushing, then power is cut so the unsynced
182        // bytes vanish — modelling a disk that lies about durability.
183        if crate::faultdisk::on_fsync() {
184            drop(file_guard);
185            crate::faultdisk::power_cut();
186        }
187        file.sync_all()?;
188        Ok(())
189    }
190
191    /// Syncs only the file data to disk (fdatasync).
192    ///
193    /// Faster than `sync()` because it does not flush file metadata (mtime
194    /// etc.).  uses `FileChannel.force(false)` (= fdatasync) for all
195    /// log-data writes and `force(true)` (= fsync) only for file-header writes.
196    ///
197    /// / `FileChannel.force(false)`.
198    pub fn sync_data(&mut self) -> Result<()> {
199        let file_guard = self.handle.file.lock();
200        let file = file_guard.as_ref().ok_or_else(|| {
201            LogError::Internal("FileHandle not initialized".to_string())
202        })?;
203        // DST fault layer (inactive in production): see `sync` above.
204        if crate::faultdisk::on_fsync() {
205            drop(file_guard);
206            crate::faultdisk::power_cut();
207        }
208        file.sync_data()?;
209        Ok(())
210    }
211
212    /// Returns true if the file is empty.
213    pub fn is_empty(&mut self) -> Result<bool> {
214        Ok(self.len()? == 0)
215    }
216
217    /// Returns the file length.
218    pub fn len(&mut self) -> Result<u64> {
219        let file_guard = self.handle.file.lock();
220        let file = file_guard.as_ref().ok_or_else(|| {
221            LogError::Internal("FileHandle not initialized".to_string())
222        })?;
223        Ok(file.metadata()?.len())
224    }
225
226    /// Truncates the file to the given length.
227    pub fn truncate(&mut self, len: u64) -> Result<()> {
228        let file_guard = self.handle.file.lock();
229        let file = file_guard.as_ref().ok_or_else(|| {
230            LogError::Internal("FileHandle not initialized".to_string())
231        })?;
232        file.set_len(len)?;
233        Ok(())
234    }
235
236    /// Returns the file number.
237    pub fn file_num(&self) -> u32 {
238        self.handle.file_num()
239    }
240
241    /// Returns the log version.
242    pub fn log_version(&self) -> u32 {
243        self.handle.log_version()
244    }
245}
246
247#[cfg(test)]
248mod tests {
249    use super::*;
250    use std::io::Write;
251    use tempfile::NamedTempFile;
252
253    #[test]
254    fn test_file_handle_basic() {
255        let mut temp_file = NamedTempFile::new().unwrap();
256        temp_file.write_all(b"Hello, world!").unwrap();
257        temp_file.flush().unwrap();
258
259        let file = File::open(temp_file.path()).unwrap();
260
261        let mut handle = FileHandle::new(0);
262        handle.init(file, 1);
263
264        assert_eq!(handle.file_num(), 0);
265        assert_eq!(handle.log_version(), 1);
266        assert!(handle.is_initialized());
267    }
268
269    #[test]
270    fn test_file_handle_read_write() {
271        let temp_file = NamedTempFile::new().unwrap();
272        let file = File::options()
273            .read(true)
274            .write(true)
275            .open(temp_file.path())
276            .unwrap();
277
278        let mut handle = FileHandle::new(0);
279        handle.init(file, 1);
280
281        {
282            let mut guard = handle.acquire().expect("acquire");
283            guard.write_at(0, b"test data").unwrap();
284            guard.sync().unwrap();
285        }
286
287        {
288            let mut guard = handle.acquire().expect("acquire");
289            let mut buf = vec![0u8; 9];
290            let n = guard.read_at(0, &mut buf).unwrap();
291            assert_eq!(n, 9);
292            assert_eq!(&buf, b"test data");
293        }
294    }
295
296    #[test]
297    fn test_file_handle_new_uninitialized() {
298        let handle = FileHandle::new(42);
299        assert_eq!(handle.file_num(), 42);
300        assert_eq!(handle.log_version(), 0);
301        assert!(!handle.is_initialized());
302    }
303
304    #[test]
305    fn test_file_handle_log_version_set_on_init() {
306        let temp_file = NamedTempFile::new().unwrap();
307        let file = File::open(temp_file.path()).unwrap();
308        let mut handle = FileHandle::new(7);
309        handle.init(file, 5);
310        assert_eq!(handle.log_version(), 5);
311        assert!(handle.is_initialized());
312    }
313
314    #[test]
315    fn test_file_handle_file_num_preserved() {
316        let mut handle = FileHandle::new(0xFF);
317        let temp_file = NamedTempFile::new().unwrap();
318        let file = File::open(temp_file.path()).unwrap();
319        handle.init(file, 1);
320        assert_eq!(handle.file_num(), 0xFF);
321    }
322
323    #[test]
324    fn test_file_handle_close_uninitialised() {
325        let mut handle = FileHandle::new(0);
326        // Closing a non-initialized handle should not error
327        assert!(handle.close().is_ok());
328        assert!(!handle.is_initialized());
329    }
330
331    #[test]
332    fn test_file_handle_close_initialized() {
333        let temp_file = NamedTempFile::new().unwrap();
334        let file = File::open(temp_file.path()).unwrap();
335        let mut handle = FileHandle::new(1);
336        handle.init(file, 1);
337        assert!(handle.is_initialized());
338        assert!(handle.close().is_ok());
339        assert!(!handle.is_initialized());
340    }
341
342    #[test]
343    fn test_file_handle_guard_file_num() {
344        let temp_file = NamedTempFile::new().unwrap();
345        let file = File::open(temp_file.path()).unwrap();
346        let mut handle = FileHandle::new(99);
347        handle.init(file, 3);
348        let guard = handle.acquire().expect("acquire");
349        assert_eq!(guard.file_num(), 99);
350        assert_eq!(guard.log_version(), 3);
351    }
352
353    #[test]
354    fn test_file_handle_guard_read_exact() {
355        let temp_file = NamedTempFile::new().unwrap();
356        let file = File::options()
357            .read(true)
358            .write(true)
359            .open(temp_file.path())
360            .unwrap();
361        let mut handle = FileHandle::new(0);
362        handle.init(file, 1);
363
364        {
365            let mut guard = handle.acquire().expect("acquire");
366            guard.write_at(0, b"hello").unwrap();
367        }
368        {
369            let mut guard = handle.acquire().expect("acquire");
370            let mut buf = vec![0u8; 5];
371            guard.read_exact_at(0, &mut buf).unwrap();
372            assert_eq!(&buf, b"hello");
373        }
374    }
375
376    #[test]
377    fn test_file_handle_guard_len_and_is_empty() {
378        let temp_file = NamedTempFile::new().unwrap();
379        let file = File::options()
380            .read(true)
381            .write(true)
382            .open(temp_file.path())
383            .unwrap();
384        let mut handle = FileHandle::new(0);
385        handle.init(file, 1);
386
387        {
388            let mut guard = handle.acquire().expect("acquire");
389            assert!(guard.is_empty().unwrap());
390            assert_eq!(guard.len().unwrap(), 0);
391            guard.write_at(0, b"abc").unwrap();
392        }
393        {
394            let mut guard = handle.acquire().expect("acquire");
395            assert!(!guard.is_empty().unwrap());
396            assert_eq!(guard.len().unwrap(), 3);
397        }
398    }
399
400    #[test]
401    fn test_file_handle_guard_truncate() {
402        let temp_file = NamedTempFile::new().unwrap();
403        let file = File::options()
404            .read(true)
405            .write(true)
406            .open(temp_file.path())
407            .unwrap();
408        let mut handle = FileHandle::new(0);
409        handle.init(file, 1);
410
411        {
412            let mut guard = handle.acquire().expect("acquire");
413            guard.write_at(0, b"hello world").unwrap();
414        }
415        {
416            let mut guard = handle.acquire().expect("acquire");
417            guard.truncate(5).unwrap();
418            assert_eq!(guard.len().unwrap(), 5);
419        }
420    }
421
422    #[test]
423    fn test_file_handle_try_acquire() {
424        let temp_file = NamedTempFile::new().unwrap();
425        let file = File::open(temp_file.path()).unwrap();
426        let mut handle = FileHandle::new(0);
427        handle.init(file, 1);
428
429        let guard = handle.try_acquire();
430        assert!(guard.is_some());
431        // Guard released when dropped, then try_acquire succeeds again
432        drop(guard);
433        let guard2 = handle.try_acquire();
434        assert!(guard2.is_some());
435    }
436
437    #[test]
438    fn test_file_handle_read_at_offset() {
439        let temp_file = NamedTempFile::new().unwrap();
440        let file = File::options()
441            .read(true)
442            .write(true)
443            .open(temp_file.path())
444            .unwrap();
445        let mut handle = FileHandle::new(0);
446        handle.init(file, 1);
447
448        {
449            let mut guard = handle.acquire().expect("acquire");
450            guard.write_at(0, b"ABCDEF").unwrap();
451        }
452        {
453            let mut guard = handle.acquire().expect("acquire");
454            let mut buf = vec![0u8; 3];
455            let n = guard.read_at(2, &mut buf).unwrap();
456            assert_eq!(n, 3);
457            assert_eq!(&buf, b"CDE");
458        }
459    }
460
461    #[test]
462    fn test_file_handle_write_at_offset() {
463        let temp_file = NamedTempFile::new().unwrap();
464        let file = File::options()
465            .read(true)
466            .write(true)
467            .open(temp_file.path())
468            .unwrap();
469        let mut handle = FileHandle::new(0);
470        handle.init(file, 1);
471
472        {
473            let mut guard = handle.acquire().expect("acquire");
474            guard.write_at(0, b"XXXXXXXX").unwrap();
475            guard.write_at(2, b"AB").unwrap();
476        }
477        {
478            let mut guard = handle.acquire().expect("acquire");
479            let mut buf = vec![0u8; 8];
480            guard.read_exact_at(0, &mut buf).unwrap();
481            assert_eq!(&buf[2..4], b"AB");
482        }
483    }
484}