Skip to main content

noxu_log/
file_manager.rs

//! File manager for log files.
//!
//! The `FileManager` presents the abstraction of one contiguous log file,
//! managing the actual on-disk log files, file handles, and LSN allocation.
6
7use crate::error::{LogError, Result};
8use crate::file_handle::FileHandle;
9use crate::file_header::{
10    FILE_HEADER_SIZE, FileHeader, LOG_VERSION, on_disk_size,
11};
12use hashbrown::HashMap;
13use memmap2::Mmap;
14use noxu_latch::ExclusiveLatch;
15use noxu_sync::{Mutex, RwLock};
16use noxu_util::lsn::Lsn;
17use std::fs::{self, File, OpenOptions};
18use std::io::Write;
19use std::num::NonZeroUsize;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
23
/// Suffix shared by every noxu log file name (e.g. `0000002a.ndb`).
pub const LOG_FILE_EXTENSION: &str = ".ndb";

/// Name of the lock file that a read-write `FileManager` creates in the
/// environment directory and locks exclusively.
pub const LOCK_FILE_NAME: &str = "noxu.lck";

/// How a log file is to be opened.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FileMode {
    /// Open for reading only.
    ReadOnly,
    /// Open for both reading and writing.
    ReadWrite,
}
38
39/// Returns the byte offset of the first log entry in a **new** log file.
40///
41/// New files are always written as the current `LOG_VERSION`, so this
42/// returns `FILE_HEADER_SIZE` (36 for v3).  When reading an **existing**
43/// file use [`FileManager::file_header_size_for`] to account for v2 files
44/// whose first entry is at offset 32.
45#[inline]
46pub fn first_log_entry_offset() -> u32 {
47    FILE_HEADER_SIZE as u32
48}
49
/// Renders `file_num` as the 8-character, zero-padded, lowercase hex stem
/// used in log file names.
///
/// Example: 42 -> "0000002a"
fn format_file_number(file_num: u32) -> String {
    format!("{file_num:08x}")
}
56
57/// Parses a file number from a hex string filename.
58///
59/// Example: "0000002a.ndb" -> 42
60fn parse_file_number(filename: &str) -> Option<u32> {
61    let stem = filename.strip_suffix(LOG_FILE_EXTENSION)?;
62    u32::from_str_radix(stem, 16).ok()
63}
64
65/// LRU cache of open file handles.
66///
67/// The key is the log file
68/// number; values are `Arc`-wrapped so callers may hold a reference after the
69/// cache evicts the entry (matching `FileHandle` reference-counting
70/// pattern).  Capacity is configurable (default: `ENV_RUN_CLEANER_THREADS
71/// + 2`; Noxu default: 10).
72type FileHandleCache = lru::LruCache<u32, Arc<FileHandle>>;
73
/// Manages log files in the environment directory.
///
/// Presents the log as one contiguous sequence of `.ndb` files named by an
/// 8-digit hex file number. It caches open file handles and tracks the LSN
/// allocation state: current file, next available LSN and last used LSN.
pub struct FileManager {
    /// Environment directory path.
    env_dir: PathBuf,
    /// Whether the environment is read-only. When set, no lock file is taken
    /// and the file-creating, writing, truncating and deleting operations
    /// return `LogError::WriteFailed`.
    read_only: bool,
    /// Maximum size of a single log file (bytes). In the code shown this is
    /// only reported via `max_file_size()`; file flips are driven by the
    /// caller.
    max_file_size: u64,
    /// LRU cache of open file handles.
    ///
    /// Protected by `noxu_sync::Mutex` because `lru::LruCache::get()` mutates
    /// the eviction order, so a shared read lock would not be safe.
    file_cache: Mutex<FileHandleCache>,
    /// Current file number being written to.
    current_file_num: AtomicU32,
    /// Next available LSN for writing, stored as `Lsn::as_u64()`.
    next_available_lsn: AtomicU64,
    /// Last LSN used, stored as `Lsn::as_u64()`; `NULL_LSN` until set.
    last_used_lsn: AtomicU64,
    /// Map of file number to the last LSN used in that file. It is consulted
    /// when creating the next file, to record the previous file's last-entry
    /// offset in the new file header.
    per_file_last_lsn: RwLock<HashMap<u32, Lsn>>,
    /// Latch serializing file creation and file-number advancement
    /// (`create_file`, `flip_file`, `write_buffer_to_file`).
    file_latch: ExclusiveLatch,
    /// Open handle of the `LOCK_FILE_NAME` lock file. Keeping it open keeps
    /// the lock taken in `lock_environment` in place. `None` in read-only
    /// mode.
    lock_file: RwLock<Option<File>>,
    /// Number of log files opened (cache miss = new file open).
    pub n_file_opens: AtomicU64,
    /// Number of sequential read calls.
    pub n_sequential_reads: AtomicU64,
    /// Total bytes read sequentially.
    pub n_sequential_read_bytes: AtomicU64,
    /// Number of sequential write calls.
    pub n_sequential_writes: AtomicU64,
    /// Total bytes written sequentially.
    pub n_sequential_write_bytes: AtomicU64,
    /// Number of random (point-lookup) read operations.
    pub n_random_reads: AtomicU64,
    /// Total bytes from random read operations.
    pub n_random_read_bytes: AtomicU64,
}
114
115impl FileManager {
116    /// Creates a new FileManager.
117    ///
118    /// # Arguments
119    ///
120    /// * `env_dir` - Path to the environment directory
121    /// * `read_only` - Whether to open in read-only mode
122    /// * `max_file_size` - Maximum size of a single log file (bytes)
123    /// * `cache_size` - Maximum number of file handles to cache
124    ///
125    /// # Returns
126    ///
127    /// A new FileManager instance, or an error if the directory is invalid
128    /// or the environment is locked.
129    pub fn new(
130        env_dir: impl AsRef<Path>,
131        read_only: bool,
132        max_file_size: u64,
133        cache_size: usize,
134    ) -> Result<Self> {
135        let env_dir = env_dir.as_ref().to_path_buf();
136
137        // Verify directory exists
138        if !env_dir.exists() {
139            return Err(LogError::InvalidDirectory(format!(
140                "Environment directory does not exist: {}",
141                env_dir.display()
142            )));
143        }
144
145        if !env_dir.is_dir() {
146            return Err(LogError::InvalidDirectory(format!(
147                "Path is not a directory: {}",
148                env_dir.display()
149            )));
150        }
151
152        let capacity = NonZeroUsize::new(cache_size.max(1))
153            .expect("cache_size.max(1) is always >= 1");
154        let manager = FileManager {
155            env_dir,
156            read_only,
157            max_file_size,
158            file_cache: Mutex::new(lru::LruCache::new(capacity)),
159            current_file_num: AtomicU32::new(0),
160            next_available_lsn: AtomicU64::new(
161                Lsn::new(0, first_log_entry_offset()).as_u64(),
162            ),
163            last_used_lsn: AtomicU64::new(noxu_util::lsn::NULL_LSN.as_u64()),
164            per_file_last_lsn: RwLock::new(HashMap::new()),
165            file_latch: ExclusiveLatch::named("file_manager"),
166            lock_file: RwLock::new(None),
167            n_file_opens: AtomicU64::new(0),
168            n_sequential_reads: AtomicU64::new(0),
169            n_sequential_read_bytes: AtomicU64::new(0),
170            n_sequential_writes: AtomicU64::new(0),
171            n_sequential_write_bytes: AtomicU64::new(0),
172            n_random_reads: AtomicU64::new(0),
173            n_random_read_bytes: AtomicU64::new(0),
174        };
175
176        // Lock the environment
177        manager.lock_environment()?;
178
179        Ok(manager)
180    }
181
182    /// Locks the environment to prevent concurrent access.
183    fn lock_environment(&self) -> Result<()> {
184        if self.read_only {
185            // For read-only environments, we don't take an exclusive lock
186            // (in a full implementation, we'd use a shared lock)
187            return Ok(());
188        }
189
190        let lock_path = self.env_dir.join(LOCK_FILE_NAME);
191
192        // Try to create/open the lock file
193        let lock_file = OpenOptions::new()
194            .create(true)
195            .truncate(false)
196            .write(true)
197            .open(&lock_path)?;
198
199        // Try to acquire an exclusive lock.
200        // fs2::FileExt is supported on unix and windows; on other platforms
201        // (e.g. WASM, embedded) we skip the lock — acceptable since those
202        // targets run single-process environments.
203        #[cfg(any(unix, windows))]
204        {
205            use fs2::FileExt;
206            lock_file.try_lock_exclusive().map_err(|_| {
207                LogError::EnvironmentLocked(format!(
208                    "Environment is locked by another process: {}",
209                    self.env_dir.display()
210                ))
211            })?;
212        }
213
214        *self.lock_file.write() = Some(lock_file);
215
216        Ok(())
217    }
218
219    /// Returns the path to a log file for the given file number.
220    fn file_path(&self, file_num: u32) -> PathBuf {
221        let filename =
222            format!("{}{}", format_file_number(file_num), LOG_FILE_EXTENSION);
223        self.env_dir.join(filename)
224    }
225
226    /// Lists all log file numbers in the environment directory.
227    ///
228    /// Returns the file numbers sorted in ascending order.
229    pub fn list_file_numbers(&self) -> Result<Vec<u32>> {
230        let mut file_nums = Vec::new();
231
232        for entry in fs::read_dir(&self.env_dir)? {
233            let entry = entry?;
234            let filename = entry.file_name();
235            let filename_str = filename.to_string_lossy();
236
237            if let Some(file_num) = parse_file_number(&filename_str) {
238                file_nums.push(file_num);
239            }
240        }
241
242        file_nums.sort_unstable();
243        Ok(file_nums)
244    }
245
246    /// Returns the first (lowest numbered) file, or None if no files exist.
247    pub fn get_first_file_num(&self) -> Result<Option<u32>> {
248        Ok(self.list_file_numbers()?.into_iter().next())
249    }
250
251    /// Returns the last (highest numbered) file, or None if no files exist.
252    pub fn get_last_file_num(&self) -> Result<Option<u32>> {
253        Ok(self.list_file_numbers()?.into_iter().last())
254    }
255
256    /// Returns the configured maximum log file size in bytes.
257    /// Returns true if this FileManager was opened read-only.
258    pub fn is_read_only(&self) -> bool {
259        self.read_only
260    }
261
262    pub fn max_file_size(&self) -> u64 {
263        self.max_file_size
264    }
265
266    /// Returns the environment directory holding the log files.
267    pub fn env_dir(&self) -> &Path {
268        &self.env_dir
269    }
270
271    /// Returns the total size, in bytes, of all `.ndb` log files on disk.
272    ///
273    /// This is the disk-limit "total log size" used by the disk-usage probe.
274    /// JE computes the analogous value in
275    /// `FileProtector.getLogSizeStats()` by summing `activeFiles` (plus the
276    /// last file's length); Noxu has no reserved-file machinery (the cleaner
277    /// deletes files outright rather than parking them as "reserved"), so the
278    /// total is simply the sum of every log file's length — equivalent to
279    /// JE's `activeSize` with `reservedSize == 0`.
280    pub fn total_log_size(&self) -> Result<u64> {
281        let mut total = 0u64;
282        for file_num in self.list_file_numbers()? {
283            // A file may be deleted by the cleaner between listing and stat;
284            // skip it rather than fail the whole probe.
285            if let Ok(len) = self.get_file_length(file_num) {
286                total += len;
287            }
288        }
289        Ok(total)
290    }
291
292    /// Returns the filesystem free (usable) space, in bytes, for the
293    /// environment directory.
294    ///
295    /// JE calls `Cleaner.getDiskFreeSpace()` →
296    /// `FileStoreInfo.getUsableSpace()` (a `statvfs`). Noxu uses
297    /// `fs2::available_space` (also `statvfs`-backed), which reports space
298    /// available to a non-privileged process — the same notion JE uses.
299    pub fn disk_free_space(&self) -> Result<u64> {
300        Ok(fs2::available_space(&self.env_dir)?)
301    }
302
303    /// Returns the current file number being written to.
304    pub fn get_current_file_num(&self) -> u32 {
305        self.current_file_num.load(Ordering::Acquire)
306    }
307
308    /// Returns the next available LSN for writing.
309    pub fn get_next_available_lsn(&self) -> Lsn {
310        Lsn::from_u64(self.next_available_lsn.load(Ordering::Acquire))
311    }
312
313    /// Returns the last used LSN.
314    pub fn get_last_used_lsn(&self) -> Lsn {
315        Lsn::from_u64(self.last_used_lsn.load(Ordering::Acquire))
316    }
317
318    /// Sets the end-of-log position.
319    ///
320    /// Called during recovery to set where the log should continue from.
321    pub fn set_last_position(
322        &self,
323        next_available_lsn: Lsn,
324        last_used_lsn: Lsn,
325    ) {
326        self.last_used_lsn.store(last_used_lsn.as_u64(), Ordering::Release);
327        self.per_file_last_lsn
328            .write()
329            .insert(last_used_lsn.file_number(), last_used_lsn);
330        self.next_available_lsn
331            .store(next_available_lsn.as_u64(), Ordering::Release);
332        self.current_file_num
333            .store(next_available_lsn.file_number(), Ordering::Release);
334    }
335
336    /// Gets a file handle for the given file number.
337    ///
338    /// Checks the LRU cache first.
339    /// On a cache miss the file is opened, its header validated, and the
340    /// resulting `Arc<FileHandle>` is inserted — with automatic LRU eviction
341    /// when the cache is at capacity.  Because `lru::LruCache::get()` mutates
342    /// the eviction order, the entire lookup+insert is done under a single
343    /// `Mutex` lock, eliminating any TOCTOU race between a cache miss and the
344    /// subsequent insert.
345    pub fn get_file_handle(&self, file_num: u32) -> Result<Arc<FileHandle>> {
346        let mut cache = self.file_cache.lock();
347
348        // Fast path: cache hit — LruCache::get() promotes the entry to MRU.
349        if let Some(handle) = cache.get(&file_num) {
350            return Ok(handle.clone());
351        }
352
353        // Slow path: open the file, validate its header, and insert into cache.
354        let path = self.file_path(file_num);
355        if !path.exists() {
356            return Err(LogError::FileNotFound(format!(
357                "Log file not found: {}",
358                path.display()
359            )));
360        }
361
362        let mut handle = FileHandle::new(file_num);
363
364        // Open the file.
365        let file = if self.read_only {
366            File::open(&path)?
367        } else {
368            OpenOptions::new().read(true).write(true).open(&path)?
369        };
370
371        // Read and validate the header.
372        let log_version = self.read_and_validate_header(&file, file_num)?;
373
374        // Initialize the handle.
375        handle.init(file, log_version);
376
377        let handle = Arc::new(handle);
378
379        // Insert into the LRU cache (evicts the least-recently-used entry when
380        // the cache is at capacity, mirroring FileHandleCache eviction).
381        cache.put(file_num, handle.clone());
382        self.n_file_opens.fetch_add(1, Ordering::Relaxed);
383
384        Ok(handle)
385    }
386
387    /// Reads and validates the file header.
388    ///
389    /// Handles both v2 (32-byte) and v3 (36-byte) files:
390    ///
391    /// - Reads up to `FILE_HEADER_SIZE` (36) bytes, capped at the actual file
392    ///   length. For a v2 file the 32-byte header is followed by entry bytes
393    ///   that are irrelevant; `FileHeader::read_from` stops consuming after
394    ///   32 bytes when it sees `log_version == 2`.
395    /// - A v2 header-only file (exactly 32 bytes) is read cleanly without
396    ///   over-reading.
397    fn read_and_validate_header(
398        &self,
399        file: &File,
400        file_num: u32,
401    ) -> Result<u32> {
402        // Clamp to actual file length so we never over-read a legacy v2
403        // header-only file (exactly 32 bytes).
404        let file_len = file.metadata()?.len() as usize;
405        let read_size = FILE_HEADER_SIZE.min(file_len);
406        let mut header_buf = vec![0u8; read_size];
407        crate::posio::read_exact_at(file, &mut header_buf, 0)?;
408
409        // Parse header — read_from branches on version.
410        let mut cursor = std::io::Cursor::new(header_buf);
411        let header = FileHeader::read_from(&mut cursor)?;
412
413        // Validate
414        header.validate(file_num)
415    }
416
417    /// Returns the on-disk header size (= first-entry byte offset) for a
418    /// given log file.
419    ///
420    /// Opens (or cache-hits) the file to read its `log_version`, then
421    /// returns `FileHeader::on_disk_size(version)`:
422    ///
423    /// - v2 file → 32 bytes → first entry at offset 32
424    /// - v3 file → 36 bytes → first entry at offset 36
425    ///
426    /// Use this whenever computing the "first entry offset" for an
427    /// **existing** file instead of the bare `FILE_HEADER_SIZE` constant.
428    pub fn file_header_size_for(&self, file_num: u32) -> Result<usize> {
429        let handle = self.get_file_handle(file_num)?;
430        Ok(on_disk_size(handle.log_version()))
431    }
432
433    /// Creates a new log file with the given file number.
434    ///
435    /// Writes the file header with a link to the previous file.
436    pub fn create_file(&self, file_num: u32) -> Result<Arc<FileHandle>> {
437        let _guard = self
438            .file_latch
439            .acquire()
440            .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
441        self.create_file_internal(file_num)
442    }
443
444    /// Flips to the next log file.
445    ///
446    /// Called when the current file reaches its maximum size.
447    pub fn flip_file(&self) -> Result<u32> {
448        let _guard = self
449            .file_latch
450            .acquire()
451            .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
452
453        let current = self.current_file_num.load(Ordering::Acquire);
454        let next = current + 1;
455
456        // Save last LSN for current file
457        let last_lsn =
458            Lsn::from_u64(self.last_used_lsn.load(Ordering::Acquire));
459        if !last_lsn.is_null() {
460            self.per_file_last_lsn.write().insert(current, last_lsn);
461        }
462
463        // Create next file (note: create_file_internal doesn't acquire the latch)
464        self.create_file_internal(next)?;
465
466        // Update current file number
467        self.current_file_num.store(next, Ordering::Release);
468
469        // Update next available LSN to point to start of new file
470        self.next_available_lsn.store(
471            Lsn::new(next, first_log_entry_offset()).as_u64(),
472            Ordering::Release,
473        );
474
475        Ok(next)
476    }
477
478    /// Internal helper to create a file without acquiring the file latch.
479    fn create_file_internal(&self, file_num: u32) -> Result<Arc<FileHandle>> {
480        if self.read_only {
481            return Err(LogError::WriteFailed(
482                "Cannot create file in read-only mode".to_string(),
483            ));
484        }
485
486        let path = self.file_path(file_num);
487
488        // Create the file
489        let mut file = OpenOptions::new()
490            .read(true)
491            .write(true)
492            .create_new(true)
493            .open(&path)?;
494
495        // Determine last entry offset in previous file
496        let last_entry_offset = if file_num > 0 {
497            self.per_file_last_lsn
498                .read()
499                .get(&(file_num - 1))
500                .map(|lsn| lsn.file_offset())
501                .unwrap_or(0)
502        } else {
503            0
504        };
505
506        // Write the header
507        let header = FileHeader::new(file_num, last_entry_offset);
508        header.write_to(&mut file)?;
509        file.flush()?;
510        file.sync_all()?;
511
512        // C-1 (2026 audit F-3.1 / 2026 audit 1-G):
513        // After fsync-ing the new file, fsync the parent directory so the
514        // directory entry itself is durable.  Without this a power-loss between
515        // file creation and the next directory write loses the file entirely.
516        // Cross-platform: real dir-fsync on Unix; best-effort on Windows
517        // (directory handle needs FILE_FLAG_BACKUP_SEMANTICS; NTFS journals
518        // the entry).  See `crate::posio::sync_dir`.
519        crate::posio::sync_dir(&self.env_dir)?;
520
521        // Create handle
522        let mut handle = FileHandle::new(file_num);
523        handle.init(file, LOG_VERSION);
524
525        let handle = Arc::new(handle);
526
527        // Insert into the LRU cache.
528        self.file_cache.lock().put(file_num, handle.clone());
529
530        Ok(handle)
531    }
532
533    /// Deletes a log file.
534    ///
535    /// Used by the cleaner to remove old log files.
536    pub fn delete_file(&self, file_num: u32) -> Result<()> {
537        if self.read_only {
538            return Err(LogError::WriteFailed(
539                "Cannot delete file in read-only mode".to_string(),
540            ));
541        }
542
543        // Remove from cache.
544        self.file_cache.lock().pop(&file_num);
545
546        // Delete the file
547        let path = self.file_path(file_num);
548        if path.exists() {
549            fs::remove_file(&path)?;
550        }
551
552        Ok(())
553    }
554
555    /// Clears the file handle cache.
556    pub fn clear_cache(&self) {
557        self.file_cache.lock().clear();
558    }
559
560    /// Physically truncate log file `file_num` to `offset` bytes (JE
561    /// `FileManager.truncateSingleFile`, FileManager.java:2345). Used at
562    /// recovery to remove a torn / half-written trailing entry so it cannot
563    /// be misread on a later scan. Evicts the cached handle first so a stale
564    /// open handle does not see the old length.
565    pub fn truncate_single_file(
566        &self,
567        file_num: u32,
568        offset: u64,
569    ) -> Result<()> {
570        if self.read_only {
571            return Err(LogError::WriteFailed(
572                "Cannot truncate file in read-only mode".to_string(),
573            ));
574        }
575        // Drop any cached handle so the next open sees the truncated length.
576        self.file_cache.lock().pop(&file_num);
577        let path = self.file_path(file_num);
578        if path.exists() {
579            let f = fs::OpenOptions::new().write(true).open(&path)?;
580            f.set_len(offset)?;
581            f.sync_all()?;
582        }
583        Ok(())
584    }
585
586    /// Flip the invisible bit (flags 0x10) on each LSN's log-entry header,
587    /// in file order, WITHOUT recomputing the checksum.
588    ///
589    /// Port of JE `FileManager.makeInvisible` (called from
590    /// `RollbackTracker.setInvisible`). The invisible bit is excluded from the
591    /// CRC at read time (cloaked, see `LogEntryHeader.turnOffInvisible` /
592    /// `log_manager` checksum path), so flipping it in place is a single-byte
593    /// `pwrite` per entry. The flags byte is at `file_offset + FLAGS_OFFSET`
594    /// (offset 5) of each entry.
595    ///
596    /// Caller must `force` the affected files afterwards for durability
597    /// (JE `RollbackTracker.recoveryEndFsyncInvisible`).
598    pub fn make_invisible(&self, file_num: u32, offsets: &[u32]) -> Result<()> {
599        if self.read_only {
600            return Err(LogError::WriteFailed(
601                "Cannot make entries invisible in read-only mode".to_string(),
602            ));
603        }
604        if offsets.is_empty() {
605            return Ok(());
606        }
607        let path = self.file_path(file_num);
608        if !path.exists() {
609            return Ok(());
610        }
611        // Drop any cached handle so the bit flip is observed on the next read.
612        self.file_cache.lock().pop(&file_num);
613        let file = OpenOptions::new().read(true).write(true).open(&path)?;
614        // FLAGS_OFFSET within an entry header is 5 (checksum[0..4], type[4],
615        // flags[5]).
616        const FLAGS_OFFSET: u64 = 5;
617        const INVISIBLE_MASK: u8 = 0x10;
618        for &off in offsets {
619            let flags_pos = off as u64 + FLAGS_OFFSET;
620            let mut byte = [0u8; 1];
621            Self::pread_exact(&file, flags_pos, &mut byte)?;
622            byte[0] |= INVISIBLE_MASK;
623            Self::pwrite_exact(&file, flags_pos, &byte)?;
624        }
625        Ok(())
626    }
627
628    /// fsync the given set of log files (JE `FileManager.force`). Used after
629    /// `make_invisible` to make the rollback's invisible bits durable so a
630    /// crash mid-rollback does not re-apply rolled-back entries.
631    pub fn force(&self, file_nums: &[u32]) -> Result<()> {
632        if self.read_only {
633            return Ok(());
634        }
635        for &file_num in file_nums {
636            let path = self.file_path(file_num);
637            if path.exists() {
638                let f = OpenOptions::new().write(true).open(&path)?;
639                f.sync_all()?;
640            }
641        }
642        Ok(())
643    }
644
645    #[cfg(unix)]
646    fn pread_exact(file: &File, offset: u64, buf: &mut [u8]) -> Result<()> {
647        use std::os::unix::fs::FileExt;
648        file.read_exact_at(buf, offset)?;
649        Ok(())
650    }
651
652    #[cfg(unix)]
653    fn pwrite_exact(file: &File, offset: u64, buf: &[u8]) -> Result<()> {
654        // Route header writes through posio so the DST fault layer covers them
655        // too (inactive in production -> identical to a direct write_all_at).
656        crate::posio::write_all_at(file, buf, offset)?;
657        Ok(())
658    }
659
660    #[cfg(not(unix))]
661    fn pread_exact(file: &File, offset: u64, buf: &mut [u8]) -> Result<()> {
662        use std::io::{Read, Seek, SeekFrom};
663        let mut f = file.try_clone()?;
664        f.seek(SeekFrom::Start(offset))?;
665        f.read_exact(buf)?;
666        Ok(())
667    }
668
669    #[cfg(not(unix))]
670    fn pwrite_exact(file: &File, offset: u64, buf: &[u8]) -> Result<()> {
671        use std::io::{Seek, SeekFrom, Write};
672        let mut f = file.try_clone()?;
673        f.seek(SeekFrom::Start(offset))?;
674        f.write_all(buf)?;
675        Ok(())
676    }
677
678    /// Truncate the log at (`file_num`, `offset`): truncate `file_num` to
679    /// `offset` and delete every higher-numbered file, in descending order to
680    /// avoid a log-entry gap (JE `FileManager.truncateLog`, FileManager.java:2374,
681    /// SR [#19463]). If `offset == 0` the file header itself is gone, so the
682    /// whole file is deleted too.
683    pub fn truncate_log(&self, file_num: u32, offset: u64) -> Result<()> {
684        if self.read_only {
685            return Err(LogError::WriteFailed(
686                "Cannot truncate log in read-only mode".to_string(),
687            ));
688        }
689        let last = self.get_last_file_num()?.unwrap_or(file_num);
690        let mut i = last as i64;
691        while i >= file_num as i64 {
692            let fnum = i as u32;
693            if self.file_path(fnum).exists() {
694                if fnum == file_num {
695                    self.truncate_single_file(fnum, offset)?;
696                    if offset != 0 {
697                        i -= 1;
698                        continue;
699                    }
700                }
701                self.delete_file(fnum)?;
702            }
703            i -= 1;
704        }
705        Ok(())
706    }
707
708    /// Writes `data` to the current log file at the given file offset.
709    ///
710    /// `writeToFile()`.  The caller must supply the exact file-level byte
711    /// offset at which `data` should be written (i.e. `firstLsn.fileOffset`
712    /// in terms).  After a successful write the method checks whether the
713    /// file has grown past `max_file_size`; if so it calls `flip_file()` and
714    /// returns the new file number, otherwise it returns the current one.
715    ///
716    /// # Arguments
717    /// * `data`        - The raw bytes to append (header + payload).
718    /// * `file_offset` - Byte offset within the file at which to write.
719    ///
720    /// # Returns
721    /// The file number that was actually written to.
722    /// Writes `data` at `file_offset` within log file `file_num`.
723    ///
724    /// JE faithfulness: JE `FileManager.writeLogBuffer` uses
725    /// `fullBuffer.getFirstLsn()` to determine which file to write to, not
726    /// `currentFileNum`.  This method mirrors that by accepting an explicit
727    /// `file_num` parameter so `write_dirty` and `fill_flush_pending` can
728    /// write dirty buffers to the file their `first_lsn` belongs to.
729    ///
730    /// The auto-flip (check `file_len >= max_file_size` and call `flip_file`)
731    /// has been removed: file flips are managed exclusively by
732    /// `LogManager::log_internal` via the `flipped` flag and
733    /// `get_write_buffer`/`sync_log_end_and_finish_file`.  Auto-flip in this
734    /// method would race with the explicit flip and double-create files.
735    pub fn write_buffer_to_file(
736        &self,
737        file_num: u32,
738        data: &[u8],
739        file_offset: u64,
740    ) -> Result<()> {
741        if self.read_only {
742            return Err(LogError::WriteFailed(
743                "Cannot write in read-only mode".to_string(),
744            ));
745        }
746
747        // Obtain (or create) the file handle under `file_latch`.
748        //
749        // We MUST hold `file_latch` for the entire exists-check → get/create
750        // sequence to avoid a TOCTOU race:
751        //
752        //   Thread A: inside create_file_internal (created empty file, writing
753        //             header but not done yet)
754        //   Thread B: file_path.exists()=true → get_file_handle → tries to
755        //             read the header from an empty file → "failed to fill
756        //             whole buffer" (UnexpectedEof)
757        //
758        // Holding file_latch serialises creation and subsequent opens so that
759        // Thread B waits until Thread A's create_file_internal (which also
760        // holds file_latch) has written and fsynced the full header.
761        let handle = {
762            let _guard = self
763                .file_latch
764                .acquire()
765                .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
766
767            if self.file_path(file_num).exists() {
768                self.get_file_handle(file_num)?
769            } else {
770                // create_file_internal (called here directly, since we already
771                // hold file_latch) creates the file, writes the header, fsyncs.
772                self.create_file_internal(file_num)?
773            }
774        };
775
776        {
777            let mut guard = handle.acquire()?;
778            guard.write_at(file_offset, data)?;
779        }
780
781        self.n_sequential_writes.fetch_add(1, Ordering::Relaxed);
782        self.n_sequential_write_bytes
783            .fetch_add(data.len() as u64, Ordering::Relaxed);
784
785        Ok(())
786    }
787
788    /// Writes `data` at `file_offset` within the CURRENT log file.
789    ///
790    /// For new entries written by `LogManager::log_internal` when the entry is
791    /// too large for the buffer pool (temp-buffer path). The current file is
792    /// always correct here because `set_last_position` has already advanced
793    /// `current_file_num` to the file that holds `current_lsn`.
794    ///
795    /// Callers that write data belonging to a SPECIFIC file (dirty buffer
796    /// flush in `write_dirty` / `fill_flush_pending`) must use
797    /// `write_buffer_to_file` instead to avoid writing old data to the
798    /// wrong file after a flip.
799    pub fn write_buffer(&self, data: &[u8], file_offset: u64) -> Result<u32> {
800        if self.read_only {
801            return Err(LogError::WriteFailed(
802                "Cannot write in read-only mode".to_string(),
803            ));
804        }
805
806        let file_num = self.current_file_num.load(Ordering::Acquire);
807        self.write_buffer_to_file(file_num, data, file_offset)?;
808        Ok(file_num)
809    }
810
811    /// Reads bytes from a log file at a given offset.
812    ///
813    ///
814    ///
815    /// # Arguments
816    /// * `file_num` - The log file number to read from.
817    /// * `offset`   - Byte offset within the file.
818    /// * `buf`      - Output buffer; filled with as many bytes as available
819    ///   (may be less than `buf.len()` at end of file).
820    ///
821    /// # Returns
822    /// The number of bytes actually read.
823    pub fn read_from_file(
824        &self,
825        file_num: u32,
826        offset: u64,
827        buf: &mut [u8],
828    ) -> Result<usize> {
829        let handle = self.get_file_handle(file_num)?;
830        let mut guard = handle.acquire()?;
831        let n = guard.read_at(offset, buf)?;
832        self.n_sequential_reads.fetch_add(1, Ordering::Relaxed);
833        self.n_sequential_read_bytes.fetch_add(n as u64, Ordering::Relaxed);
834        Ok(n)
835    }
836
837    /// Reads bytes from a log file at a given offset, counted as a random
838    /// (point-lookup) read rather than a sequential scan read.
839    ///
840    /// Used by `LogManager::read_at_lsn` for in-flight log reads.
841    pub fn read_from_file_random(
842        &self,
843        file_num: u32,
844        offset: u64,
845        buf: &mut [u8],
846    ) -> Result<usize> {
847        let handle = self.get_file_handle(file_num)?;
848        let mut guard = handle.acquire()?;
849        let n = guard.read_at(offset, buf)?;
850        self.n_random_reads.fetch_add(1, Ordering::Relaxed);
851        self.n_random_read_bytes.fetch_add(n as u64, Ordering::Relaxed);
852        Ok(n)
853    }
854
855    /// Returns the length of a log file in bytes.
856    pub fn get_file_length(&self, file_num: u32) -> Result<u64> {
857        let path = self.file_path(file_num);
858        if !path.exists() {
859            return Err(LogError::FileNotFound(format!(
860                "Log file not found: {}",
861                path.display()
862            )));
863        }
864        Ok(path.metadata()?.len())
865    }
866
867    /// Memory-maps a log file for read-only sequential access.
868    ///
869    /// Returns a `Mmap` covering the entire file.  The OS handles page-in
870    /// lazily with automatic sequential read-ahead, eliminating all per-entry
871    /// `pread64` syscalls during recovery scanning.
872    ///
873    /// # Safety
874    /// The caller must not hold a mutable reference into the mapped memory
875    /// while other processes write to the file.  During recovery, log files
876    /// are read-only, making this safe.
877    pub fn mmap_file(&self, file_num: u32) -> Result<Mmap> {
878        // Never mmap the current write file. It can be appended to
879        // concurrently (pwrite64 on the log-writer thread) while a
880        // disk-ordered cursor reads it; `memmap2` requires that a mapped file
881        // is not modified for the lifetime of the mapping, so mapping the live
882        // write file would be undefined behaviour. Callers (e.g. the
883        // file-manager log scanner) fall back to positioned `pread` reads,
884        // which are safe under concurrent appends. Complete (sealed) files are
885        // never written again and are safe to map.
886        if file_num == self.get_current_file_num() {
887            return Err(LogError::Io(std::io::Error::other(format!(
888                "refusing to mmap the current write file {file_num} \
889                 (may be concurrently appended); use pread fallback"
890            ))));
891        }
892        let path = self.file_path(file_num);
893        let file = File::open(&path).map_err(|e| {
894            LogError::FileNotFound(format!(
895                "Cannot open {:?} for mmap: {}",
896                path, e
897            ))
898        })?;
899        // SAFETY: `file_num` is not the current write file (checked above), so
900        // it is a sealed log file whose bytes do not change for the lifetime
901        // of the mapping.
902        let mmap = unsafe { Mmap::map(&file) }.map_err(|e| {
903            LogError::Io(std::io::Error::other(format!(
904                "mmap {:?}: {}",
905                path, e
906            )))
907        })?;
908        Ok(mmap)
909    }
910
911    /// Returns current I/O statistics for this FileManager.
912    pub fn get_io_stats(&self) -> FileManagerIoStats {
913        FileManagerIoStats {
914            n_file_opens: self.n_file_opens.load(Ordering::Relaxed),
915            n_sequential_reads: self.n_sequential_reads.load(Ordering::Relaxed),
916            n_sequential_read_bytes: self
917                .n_sequential_read_bytes
918                .load(Ordering::Relaxed),
919            n_sequential_writes: self
920                .n_sequential_writes
921                .load(Ordering::Relaxed),
922            n_sequential_write_bytes: self
923                .n_sequential_write_bytes
924                .load(Ordering::Relaxed),
925            n_random_reads: self.n_random_reads.load(Ordering::Relaxed),
926            n_random_read_bytes: self
927                .n_random_read_bytes
928                .load(Ordering::Relaxed),
929        }
930    }
931
932    /// Fsyncs the current log file to stable storage and removes it from the
933    /// file-handle cache, making the old file handle eligible for GC.
934    ///
935    /// JE faithfulness (Part-3, DRIFT-3/7): mirrors
936    /// `FileManager.syncLogEndAndFinishFile()` which calls `syncLogEnd()` then
937    /// `endOfLog.close()`.  Called by `LogBufferPool.getWriteBuffer` when
938    /// `flippedFile=true`, under the LWL, BEFORE `advanceLsn` advances the
939    /// LSN bookkeeping to the new file.  This establishes the JE invariant
940    /// that the OLD file is durably closed before any entry is written to the
941    /// NEW file.
942    ///
943    /// References:
944    /// - JE `FileManager.syncLogEndAndFinishFile` (line 2077)
945    /// - JE `LogBufferPool.getWriteBuffer` (called after `bumpAndWriteDirty`
946    ///   when `flippedFile=true`)
947    pub fn sync_log_end_and_finish_file(&self) -> Result<()> {
948        self.sync_log_end()?;
949        // Evict the current (old) file from the LRU cache so its OS file
950        // descriptor is released promptly — JE `endOfLog.close()`.
951        let file_num = self.current_file_num.load(Ordering::Acquire);
952        let mut cache = self.file_cache.lock();
953        cache.pop(&file_num);
954        Ok(())
955    }
956
957    /// Fsyncs the current log file to stable storage.
958    ///
959    /// JE: `FileManager.syncLogEnd()` (called from `syncLogEndAndFinishFile`).
960    pub fn sync_log_end(&self) -> Result<()> {
961        if self.read_only {
962            return Ok(());
963        }
964
965        let file_num = self.current_file_num.load(Ordering::Acquire);
966        let path = self.file_path(file_num);
967
968        if !path.exists() {
969            // Nothing to sync yet.
970            return Ok(());
971        }
972
973        let handle = self.get_file_handle(file_num)?;
974        let mut guard = handle.acquire()?;
975        // Use fdatasync (sync_data) — only log data must be durable here,
976        // not file metadata.  uses FileChannel.force(false) for this.
977        guard.sync_data()?;
978        Ok(())
979    }
980
981    /// Closes the file manager, releasing all resources.
982    pub fn close(&self) -> Result<()> {
983        self.clear_cache();
984
985        // Release the lock file
986        if let Some(lock_file) = self.lock_file.write().take() {
987            {
988                #[allow(unused_imports)]
989                use fs2::FileExt;
990                let _ = lock_file.unlock();
991            }
992            drop(lock_file);
993        }
994
995        Ok(())
996    }
997}
998
999impl Drop for FileManager {
1000    fn drop(&mut self) {
1001        let _ = self.close();
1002    }
1003}
1004
/// Point-in-time snapshot of [`FileManager`] I/O counters, as returned by
/// `FileManager::get_io_stats`.
///
/// The fields correspond to the `FILEMGR_*` statistics
/// (`FILEMGR_FILE_OPENS`, `FILEMGR_SEQUENTIAL_READS`/`WRITES`,
/// `FILEMGR_RANDOM_READS`, etc.).
///
/// `PartialEq`/`Eq` are derived so that snapshots can be compared directly,
/// for example to assert that an operation performed no I/O.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FileManagerIoStats {
    /// Number of log files opened (LRU cache miss).
    pub n_file_opens: u64,
    /// Number of sequential read operations (recovery scan).
    pub n_sequential_reads: u64,
    /// Total bytes read sequentially.
    pub n_sequential_read_bytes: u64,
    /// Number of sequential write operations.
    pub n_sequential_writes: u64,
    /// Total bytes written sequentially.
    pub n_sequential_write_bytes: u64,
    /// Number of random (point-lookup) read operations.
    pub n_random_reads: u64,
    /// Total bytes from random read operations.
    pub n_random_read_bytes: u64,
}
1026
#[cfg(test)]
mod tests {
    //! Unit tests for `FileManager`: file-number formatting and parsing,
    //! file creation, listing and deletion, flipping, environment locking,
    //! read-only behaviour, and handle-cache clearing.

    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_format_parse_file_number() {
        assert_eq!(format_file_number(0), "00000000");
        assert_eq!(format_file_number(42), "0000002a");
        assert_eq!(format_file_number(255), "000000ff");
        assert_eq!(format_file_number(0x12345678), "12345678");

        assert_eq!(parse_file_number("00000000.ndb"), Some(0));
        assert_eq!(parse_file_number("0000002a.ndb"), Some(42));
        assert_eq!(parse_file_number("000000ff.ndb"), Some(255));
        assert_eq!(parse_file_number("12345678.ndb"), Some(0x12345678));

        assert_eq!(parse_file_number("invalid.ndb"), None);
        assert_eq!(parse_file_number("00000000.txt"), None);
    }

    #[test]
    fn test_file_manager_create() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        assert_eq!(manager.get_current_file_num(), 0);
        assert_eq!(manager.get_first_file_num().unwrap(), None);
    }

    #[test]
    fn test_file_manager_create_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        let handle = manager.create_file(0).unwrap();
        assert_eq!(handle.file_num(), 0);
        assert_eq!(handle.log_version(), LOG_VERSION);

        // File should exist
        let path = manager.file_path(0);
        assert!(path.exists());

        // Should be able to get it again from cache
        let handle2 = manager.get_file_handle(0).unwrap();
        assert_eq!(handle2.file_num(), 0);
    }

    #[test]
    fn test_file_manager_list_files() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        manager.create_file(0).unwrap();
        manager.create_file(2).unwrap();
        manager.create_file(1).unwrap();

        let files = manager.list_file_numbers().unwrap();
        assert_eq!(files, vec![0, 1, 2]);

        assert_eq!(manager.get_first_file_num().unwrap(), Some(0));
        assert_eq!(manager.get_last_file_num().unwrap(), Some(2));
    }

    #[test]
    fn test_file_manager_flip_file() {
        let temp_dir = TempDir::new().unwrap();

        {
            let manager =
                FileManager::new(temp_dir.path(), false, 10_000_000, 100)
                    .unwrap();

            // Create initial file
            manager.create_file(0).unwrap();

            // Set current file
            manager.current_file_num.store(0, Ordering::Release);
            manager
                .last_used_lsn
                .store(Lsn::new(0, 1000).as_u64(), Ordering::Release);

            // Flip to next file
            let next = manager.flip_file().unwrap();
            assert_eq!(next, 1);
            assert_eq!(manager.get_current_file_num(), 1);

            // Should have created file 1
            let files = manager.list_file_numbers().unwrap();
            assert!(files.contains(&1));
        } // manager dropped here, releasing lock
    }

    #[test]
    fn test_environment_locking() {
        let temp_dir = TempDir::new().unwrap();

        // First manager locks the environment
        let _manager1 =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // Second manager should fail to lock
        let result = FileManager::new(temp_dir.path(), false, 10_000_000, 100);
        assert!(result.is_err());
        match result {
            Err(LogError::EnvironmentLocked(_)) => (),
            _ => panic!("Expected EnvironmentLocked error"),
        }
    }

    #[test]
    fn test_nonexistent_directory_fails() {
        // Derive the missing path from a fresh temp dir instead of
        // hard-coding `/tmp/...`. This keeps the test portable (no `/tmp` on
        // Windows) and guarantees the directory really does not exist.
        let temp_dir = TempDir::new().unwrap();
        let missing = temp_dir.path().join("does_not_exist_noxu_xyz");
        let result =
            FileManager::new(missing.as_path(), false, 10_000_000, 100);
        assert!(result.is_err());
        match result {
            Err(LogError::InvalidDirectory(_)) => (),
            _ => panic!("Expected InvalidDirectory error"),
        }
    }

    #[test]
    fn test_get_file_handle_missing_file_fails() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        let result = manager.get_file_handle(99);
        assert!(result.is_err());
        match result {
            Err(LogError::FileNotFound(_)) => (),
            _ => panic!("Expected FileNotFound error"),
        }
    }

    #[test]
    fn test_delete_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        manager.create_file(0).unwrap();
        assert!(manager.file_path(0).exists());

        manager.delete_file(0).unwrap();
        assert!(!manager.file_path(0).exists());
        assert_eq!(manager.list_file_numbers().unwrap(), Vec::<u32>::new());
    }

    #[test]
    fn test_delete_nonexistent_file_is_ok() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // Deleting a file that does not exist should not return an error.
        assert!(manager.delete_file(42).is_ok());
    }

    #[test]
    fn test_set_last_position() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        let next = Lsn::new(3, 1024);
        let last = Lsn::new(2, 512);
        manager.set_last_position(next, last);

        assert_eq!(manager.get_next_available_lsn(), next);
        assert_eq!(manager.get_last_used_lsn(), last);
        assert_eq!(manager.get_current_file_num(), 3);
    }

    #[test]
    fn test_read_only_create_file_fails() {
        let temp_dir = TempDir::new().unwrap();
        // Create a writable manager first to avoid the lock conflict.
        {
            let _mgr =
                FileManager::new(temp_dir.path(), false, 10_000_000, 100)
                    .unwrap();
        } // lock released on drop

        // Read-only mode must not create files.
        let ro_mgr =
            FileManager::new(temp_dir.path(), true, 10_000_000, 100).unwrap();
        let result = ro_mgr.create_file(0);
        assert!(result.is_err());
    }

    #[test]
    fn test_first_and_last_file_num_empty() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        assert_eq!(manager.get_first_file_num().unwrap(), None);
        assert_eq!(manager.get_last_file_num().unwrap(), None);
    }

    #[test]
    fn test_clear_cache() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        manager.create_file(0).unwrap();
        // Clearing the cache should not panic or corrupt state.
        manager.clear_cache();

        // After clearing, get_file_handle must re-open the file.
        let handle = manager.get_file_handle(0).unwrap();
        assert_eq!(handle.file_num(), 0);
    }

    /// C-1 regression: parent directory must be fsynced after creating each
    /// new log file so the directory entry is durable across a power loss.
    ///
    /// This test verifies that `create_file_internal` completes without error
    /// (which confirms the dir-open + sync_all code path runs), and that
    /// the created file is visible in a directory listing performed after the
    /// call returns — i.e. the same state recovery would see after a restart.
    #[test]
    fn test_parent_dir_fsynced_after_file_create() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // Creating file 0 must succeed (includes parent-dir fsync).
        manager.create_file(0).unwrap();

        // The file must be present in the directory listing — the same check
        // recovery performs when scanning for log files to replay.
        let listed = manager.list_file_numbers().unwrap();
        assert_eq!(
            listed,
            vec![0],
            "file 0 must be visible in dir listing after create"
        );

        // Create a second file (flip) to exercise the path for file_num > 0.
        manager.flip_file().unwrap();
        let listed2 = manager.list_file_numbers().unwrap();
        assert!(listed2.contains(&1), "file 1 must be visible after flip");
    }
}