Skip to main content

noxu_log/
file_manager.rs

1//! File manager for log files.
2//!
3//!
4//! The FileManager presents the abstraction of one contiguous log file,
5//! managing the actual on-disk log files, file handles, and LSN allocation.
6
7use crate::error::{LogError, Result};
8use crate::file_handle::FileHandle;
9use crate::file_header::{
10    FILE_HEADER_SIZE, FileHeader, LOG_VERSION, on_disk_size,
11};
12use hashbrown::HashMap;
13use memmap2::Mmap;
14use noxu_latch::ExclusiveLatch;
15use noxu_sync::{Mutex, RwLock};
16use noxu_util::lsn::Lsn;
17use std::fs::{self, File, OpenOptions};
18use std::io::Write;
19use std::num::NonZeroUsize;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
23
24/// File extension for noxu database log files.
25pub const LOG_FILE_EXTENSION: &str = ".ndb";
26
27/// Lock file name for environment locking.
28pub const LOCK_FILE_NAME: &str = "noxu.lck";
29
/// Access mode used when opening a log file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileMode {
    /// The file is opened for reading only.
    ReadOnly,
    /// The file is opened for both reading and writing.
    ReadWrite,
}
38
39/// Returns the byte offset of the first log entry in a **new** log file.
40///
41/// New files are always written as the current `LOG_VERSION`, so this
42/// returns `FILE_HEADER_SIZE` (36 for v3).  When reading an **existing**
43/// file use [`FileManager::file_header_size_for`] to account for v2 files
44/// whose first entry is at offset 32.
45#[inline]
46pub fn first_log_entry_offset() -> u32 {
47    FILE_HEADER_SIZE as u32
48}
49
/// Renders a log file number as the stem of its file name: exactly eight
/// lowercase hexadecimal digits, zero-padded on the left.
///
/// Example: `42` becomes `"0000002a"`.
fn format_file_number(file_num: u32) -> String {
    format!("{file_num:08x}")
}
56
57/// Parses a file number from a hex string filename.
58///
59/// Example: "0000002a.ndb" -> 42
60fn parse_file_number(filename: &str) -> Option<u32> {
61    let stem = filename.strip_suffix(LOG_FILE_EXTENSION)?;
62    u32::from_str_radix(stem, 16).ok()
63}
64
/// LRU cache of open file handles, keyed by log file number.
///
/// Values are `Arc`-wrapped so that a caller may keep using a handle after
/// the cache has evicted its entry.  The capacity is configurable (default:
/// `ENV_RUN_CLEANER_THREADS + 2`; Noxu default: 10).
type FileHandleCache = lru::LruCache<u32, Arc<FileHandle>>;
73
/// Manages the log files that live in one environment directory.
///
/// Holds the open-handle cache, the current write position (file number and
/// LSNs), the environment lock file, and a set of I/O counters.
pub struct FileManager {
    /// Environment directory path.
    env_dir: PathBuf,
    /// Whether the environment is read-only.  Mutating operations check this
    /// flag and refuse to run when it is set.
    read_only: bool,
    /// Maximum size of a single log file (bytes).
    max_file_size: u64,
    /// LRU cache of open file handles.
    ///
    /// Protected by `noxu_sync::Mutex` because `lru::LruCache::get()` mutates
    /// the eviction order, so a shared read lock would not be safe.
    file_cache: Mutex<FileHandleCache>,
    /// Current file number being written to.
    current_file_num: AtomicU32,
    /// Next available LSN for writing, stored in its `u64` encoding.
    next_available_lsn: AtomicU64,
    /// Last LSN that was used in the current file, stored in its `u64`
    /// encoding.
    last_used_lsn: AtomicU64,
    /// Map of file number to last LSN used in that file (for file headers).
    per_file_last_lsn: RwLock<HashMap<u32, Lsn>>,
    /// Latch protecting file creation and file number advancement.
    file_latch: ExclusiveLatch,
    /// Lock file handle (for environment locking).  Stays `None` for
    /// read-only managers, which do not take the lock.
    lock_file: RwLock<Option<File>>,
    /// Number of log files opened (cache miss = new file open).
    pub n_file_opens: AtomicU64,
    /// Number of sequential read calls.
    pub n_sequential_reads: AtomicU64,
    /// Total bytes read sequentially.
    pub n_sequential_read_bytes: AtomicU64,
    /// Number of sequential write calls.
    pub n_sequential_writes: AtomicU64,
    /// Total bytes written sequentially.
    pub n_sequential_write_bytes: AtomicU64,
    /// Number of random (point-lookup) read operations.
    pub n_random_reads: AtomicU64,
    /// Total bytes from random read operations.
    pub n_random_read_bytes: AtomicU64,
}
114
115impl FileManager {
116    /// Creates a new FileManager.
117    ///
118    /// # Arguments
119    ///
120    /// * `env_dir` - Path to the environment directory
121    /// * `read_only` - Whether to open in read-only mode
122    /// * `max_file_size` - Maximum size of a single log file (bytes)
123    /// * `cache_size` - Maximum number of file handles to cache
124    ///
125    /// # Returns
126    ///
127    /// A new FileManager instance, or an error if the directory is invalid
128    /// or the environment is locked.
129    pub fn new(
130        env_dir: impl AsRef<Path>,
131        read_only: bool,
132        max_file_size: u64,
133        cache_size: usize,
134    ) -> Result<Self> {
135        let env_dir = env_dir.as_ref().to_path_buf();
136
137        // Verify directory exists
138        if !env_dir.exists() {
139            return Err(LogError::InvalidDirectory(format!(
140                "Environment directory does not exist: {}",
141                env_dir.display()
142            )));
143        }
144
145        if !env_dir.is_dir() {
146            return Err(LogError::InvalidDirectory(format!(
147                "Path is not a directory: {}",
148                env_dir.display()
149            )));
150        }
151
152        let capacity = NonZeroUsize::new(cache_size.max(1))
153            .expect("cache_size.max(1) is always >= 1");
154        let manager = FileManager {
155            env_dir,
156            read_only,
157            max_file_size,
158            file_cache: Mutex::new(lru::LruCache::new(capacity)),
159            current_file_num: AtomicU32::new(0),
160            next_available_lsn: AtomicU64::new(
161                Lsn::new(0, first_log_entry_offset()).as_u64(),
162            ),
163            last_used_lsn: AtomicU64::new(noxu_util::lsn::NULL_LSN.as_u64()),
164            per_file_last_lsn: RwLock::new(HashMap::new()),
165            file_latch: ExclusiveLatch::named("file_manager"),
166            lock_file: RwLock::new(None),
167            n_file_opens: AtomicU64::new(0),
168            n_sequential_reads: AtomicU64::new(0),
169            n_sequential_read_bytes: AtomicU64::new(0),
170            n_sequential_writes: AtomicU64::new(0),
171            n_sequential_write_bytes: AtomicU64::new(0),
172            n_random_reads: AtomicU64::new(0),
173            n_random_read_bytes: AtomicU64::new(0),
174        };
175
176        // Lock the environment
177        manager.lock_environment()?;
178
179        Ok(manager)
180    }
181
182    /// Locks the environment to prevent concurrent access.
183    fn lock_environment(&self) -> Result<()> {
184        if self.read_only {
185            // For read-only environments, we don't take an exclusive lock
186            // (in a full implementation, we'd use a shared lock)
187            return Ok(());
188        }
189
190        let lock_path = self.env_dir.join(LOCK_FILE_NAME);
191
192        // Try to create/open the lock file
193        let lock_file = OpenOptions::new()
194            .create(true)
195            .truncate(false)
196            .write(true)
197            .open(&lock_path)?;
198
199        // Try to acquire an exclusive lock.
200        // fs2::FileExt is supported on unix and windows; on other platforms
201        // (e.g. WASM, embedded) we skip the lock — acceptable since those
202        // targets run single-process environments.
203        #[cfg(any(unix, windows))]
204        {
205            use fs2::FileExt;
206            lock_file.try_lock_exclusive().map_err(|_| {
207                LogError::EnvironmentLocked(format!(
208                    "Environment is locked by another process: {}",
209                    self.env_dir.display()
210                ))
211            })?;
212        }
213
214        *self.lock_file.write() = Some(lock_file);
215
216        Ok(())
217    }
218
219    /// Returns the path to a log file for the given file number.
220    fn file_path(&self, file_num: u32) -> PathBuf {
221        let filename =
222            format!("{}{}", format_file_number(file_num), LOG_FILE_EXTENSION);
223        self.env_dir.join(filename)
224    }
225
226    /// Lists all log file numbers in the environment directory.
227    ///
228    /// Returns the file numbers sorted in ascending order.
229    pub fn list_file_numbers(&self) -> Result<Vec<u32>> {
230        let mut file_nums = Vec::new();
231
232        for entry in fs::read_dir(&self.env_dir)? {
233            let entry = entry?;
234            let filename = entry.file_name();
235            let filename_str = filename.to_string_lossy();
236
237            if let Some(file_num) = parse_file_number(&filename_str) {
238                file_nums.push(file_num);
239            }
240        }
241
242        file_nums.sort_unstable();
243        Ok(file_nums)
244    }
245
246    /// Returns the first (lowest numbered) file, or None if no files exist.
247    pub fn get_first_file_num(&self) -> Result<Option<u32>> {
248        Ok(self.list_file_numbers()?.into_iter().next())
249    }
250
251    /// Returns the last (highest numbered) file, or None if no files exist.
252    pub fn get_last_file_num(&self) -> Result<Option<u32>> {
253        Ok(self.list_file_numbers()?.into_iter().last())
254    }
255
    /// Returns `true` if this `FileManager` was opened read-only.
    pub fn is_read_only(&self) -> bool {
        self.read_only
    }
261
    /// Returns the configured maximum log file size in bytes.
    pub fn max_file_size(&self) -> u64 {
        self.max_file_size
    }
265
    /// Returns the number of the log file currently being written to
    /// (an `Acquire` load of the shared counter).
    pub fn get_current_file_num(&self) -> u32 {
        self.current_file_num.load(Ordering::Acquire)
    }
270
271    /// Returns the next available LSN for writing.
272    pub fn get_next_available_lsn(&self) -> Lsn {
273        Lsn::from_u64(self.next_available_lsn.load(Ordering::Acquire))
274    }
275
276    /// Returns the last used LSN.
277    pub fn get_last_used_lsn(&self) -> Lsn {
278        Lsn::from_u64(self.last_used_lsn.load(Ordering::Acquire))
279    }
280
281    /// Sets the end-of-log position.
282    ///
283    /// Called during recovery to set where the log should continue from.
284    pub fn set_last_position(
285        &self,
286        next_available_lsn: Lsn,
287        last_used_lsn: Lsn,
288    ) {
289        self.last_used_lsn.store(last_used_lsn.as_u64(), Ordering::Release);
290        self.per_file_last_lsn
291            .write()
292            .insert(last_used_lsn.file_number(), last_used_lsn);
293        self.next_available_lsn
294            .store(next_available_lsn.as_u64(), Ordering::Release);
295        self.current_file_num
296            .store(next_available_lsn.file_number(), Ordering::Release);
297    }
298
299    /// Gets a file handle for the given file number.
300    ///
301    /// Checks the LRU cache first.
302    /// On a cache miss the file is opened, its header validated, and the
303    /// resulting `Arc<FileHandle>` is inserted — with automatic LRU eviction
304    /// when the cache is at capacity.  Because `lru::LruCache::get()` mutates
305    /// the eviction order, the entire lookup+insert is done under a single
306    /// `Mutex` lock, eliminating any TOCTOU race between a cache miss and the
307    /// subsequent insert.
308    pub fn get_file_handle(&self, file_num: u32) -> Result<Arc<FileHandle>> {
309        let mut cache = self.file_cache.lock();
310
311        // Fast path: cache hit — LruCache::get() promotes the entry to MRU.
312        if let Some(handle) = cache.get(&file_num) {
313            return Ok(handle.clone());
314        }
315
316        // Slow path: open the file, validate its header, and insert into cache.
317        let path = self.file_path(file_num);
318        if !path.exists() {
319            return Err(LogError::FileNotFound(format!(
320                "Log file not found: {}",
321                path.display()
322            )));
323        }
324
325        let mut handle = FileHandle::new(file_num);
326
327        // Open the file.
328        let file = if self.read_only {
329            File::open(&path)?
330        } else {
331            OpenOptions::new().read(true).write(true).open(&path)?
332        };
333
334        // Read and validate the header.
335        let log_version = self.read_and_validate_header(&file, file_num)?;
336
337        // Initialize the handle.
338        handle.init(file, log_version);
339
340        let handle = Arc::new(handle);
341
342        // Insert into the LRU cache (evicts the least-recently-used entry when
343        // the cache is at capacity, mirroring FileHandleCache eviction).
344        cache.put(file_num, handle.clone());
345        self.n_file_opens.fetch_add(1, Ordering::Relaxed);
346
347        Ok(handle)
348    }
349
350    /// Reads and validates the file header.
351    ///
352    /// Handles both v2 (32-byte) and v3 (36-byte) files:
353    ///
354    /// - Reads up to `FILE_HEADER_SIZE` (36) bytes, capped at the actual file
355    ///   length. For a v2 file the 32-byte header is followed by entry bytes
356    ///   that are irrelevant; `FileHeader::read_from` stops consuming after
357    ///   32 bytes when it sees `log_version == 2`.
358    /// - A v2 header-only file (exactly 32 bytes) is read cleanly without
359    ///   over-reading.
360    fn read_and_validate_header(
361        &self,
362        file: &File,
363        file_num: u32,
364    ) -> Result<u32> {
365        // Clamp to actual file length so we never over-read a legacy v2
366        // header-only file (exactly 32 bytes).
367        let file_len = file.metadata()?.len() as usize;
368        let read_size = FILE_HEADER_SIZE.min(file_len);
369        let mut header_buf = vec![0u8; read_size];
370        crate::posio::read_exact_at(file, &mut header_buf, 0)?;
371
372        // Parse header — read_from branches on version.
373        let mut cursor = std::io::Cursor::new(header_buf);
374        let header = FileHeader::read_from(&mut cursor)?;
375
376        // Validate
377        header.validate(file_num)
378    }
379
380    /// Returns the on-disk header size (= first-entry byte offset) for a
381    /// given log file.
382    ///
383    /// Opens (or cache-hits) the file to read its `log_version`, then
384    /// returns `FileHeader::on_disk_size(version)`:
385    ///
386    /// - v2 file → 32 bytes → first entry at offset 32
387    /// - v3 file → 36 bytes → first entry at offset 36
388    ///
389    /// Use this whenever computing the "first entry offset" for an
390    /// **existing** file instead of the bare `FILE_HEADER_SIZE` constant.
391    pub fn file_header_size_for(&self, file_num: u32) -> Result<usize> {
392        let handle = self.get_file_handle(file_num)?;
393        Ok(on_disk_size(handle.log_version()))
394    }
395
396    /// Creates a new log file with the given file number.
397    ///
398    /// Writes the file header with a link to the previous file.
399    pub fn create_file(&self, file_num: u32) -> Result<Arc<FileHandle>> {
400        let _guard = self
401            .file_latch
402            .acquire()
403            .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
404        self.create_file_internal(file_num)
405    }
406
407    /// Flips to the next log file.
408    ///
409    /// Called when the current file reaches its maximum size.
410    pub fn flip_file(&self) -> Result<u32> {
411        let _guard = self
412            .file_latch
413            .acquire()
414            .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
415
416        let current = self.current_file_num.load(Ordering::Acquire);
417        let next = current + 1;
418
419        // Save last LSN for current file
420        let last_lsn =
421            Lsn::from_u64(self.last_used_lsn.load(Ordering::Acquire));
422        if !last_lsn.is_null() {
423            self.per_file_last_lsn.write().insert(current, last_lsn);
424        }
425
426        // Create next file (note: create_file_internal doesn't acquire the latch)
427        self.create_file_internal(next)?;
428
429        // Update current file number
430        self.current_file_num.store(next, Ordering::Release);
431
432        // Update next available LSN to point to start of new file
433        self.next_available_lsn.store(
434            Lsn::new(next, first_log_entry_offset()).as_u64(),
435            Ordering::Release,
436        );
437
438        Ok(next)
439    }
440
441    /// Internal helper to create a file without acquiring the file latch.
442    fn create_file_internal(&self, file_num: u32) -> Result<Arc<FileHandle>> {
443        if self.read_only {
444            return Err(LogError::WriteFailed(
445                "Cannot create file in read-only mode".to_string(),
446            ));
447        }
448
449        let path = self.file_path(file_num);
450
451        // Create the file
452        let mut file = OpenOptions::new()
453            .read(true)
454            .write(true)
455            .create_new(true)
456            .open(&path)?;
457
458        // Determine last entry offset in previous file
459        let last_entry_offset = if file_num > 0 {
460            self.per_file_last_lsn
461                .read()
462                .get(&(file_num - 1))
463                .map(|lsn| lsn.file_offset())
464                .unwrap_or(0)
465        } else {
466            0
467        };
468
469        // Write the header
470        let header = FileHeader::new(file_num, last_entry_offset);
471        header.write_to(&mut file)?;
472        file.flush()?;
473        file.sync_all()?;
474
475        // C-1 (2026 audit F-3.1 / 2026 audit 1-G):
476        // After fsync-ing the new file, fsync the parent directory so the
477        // directory entry itself is durable.  Without this a power-loss between
478        // file creation and the next directory write loses the file entirely.
479        // Cross-platform: real dir-fsync on Unix; best-effort on Windows
480        // (directory handle needs FILE_FLAG_BACKUP_SEMANTICS; NTFS journals
481        // the entry).  See `crate::posio::sync_dir`.
482        crate::posio::sync_dir(&self.env_dir)?;
483
484        // Create handle
485        let mut handle = FileHandle::new(file_num);
486        handle.init(file, LOG_VERSION);
487
488        let handle = Arc::new(handle);
489
490        // Insert into the LRU cache.
491        self.file_cache.lock().put(file_num, handle.clone());
492
493        Ok(handle)
494    }
495
496    /// Deletes a log file.
497    ///
498    /// Used by the cleaner to remove old log files.
499    pub fn delete_file(&self, file_num: u32) -> Result<()> {
500        if self.read_only {
501            return Err(LogError::WriteFailed(
502                "Cannot delete file in read-only mode".to_string(),
503            ));
504        }
505
506        // Remove from cache.
507        self.file_cache.lock().pop(&file_num);
508
509        // Delete the file
510        let path = self.file_path(file_num);
511        if path.exists() {
512            fs::remove_file(&path)?;
513        }
514
515        Ok(())
516    }
517
    /// Clears the file handle cache.
    ///
    /// Every cached entry is dropped; handles that callers still hold
    /// through their own `Arc` remain usable.
    pub fn clear_cache(&self) {
        self.file_cache.lock().clear();
    }
522
523    /// Physically truncate log file `file_num` to `offset` bytes (JE
524    /// `FileManager.truncateSingleFile`, FileManager.java:2345). Used at
525    /// recovery to remove a torn / half-written trailing entry so it cannot
526    /// be misread on a later scan. Evicts the cached handle first so a stale
527    /// open handle does not see the old length.
528    pub fn truncate_single_file(
529        &self,
530        file_num: u32,
531        offset: u64,
532    ) -> Result<()> {
533        if self.read_only {
534            return Err(LogError::WriteFailed(
535                "Cannot truncate file in read-only mode".to_string(),
536            ));
537        }
538        // Drop any cached handle so the next open sees the truncated length.
539        self.file_cache.lock().pop(&file_num);
540        let path = self.file_path(file_num);
541        if path.exists() {
542            let f = fs::OpenOptions::new().write(true).open(&path)?;
543            f.set_len(offset)?;
544            f.sync_all()?;
545        }
546        Ok(())
547    }
548
549    /// Flip the invisible bit (flags 0x10) on each LSN's log-entry header,
550    /// in file order, WITHOUT recomputing the checksum.
551    ///
552    /// Port of JE `FileManager.makeInvisible` (called from
553    /// `RollbackTracker.setInvisible`). The invisible bit is excluded from the
554    /// CRC at read time (cloaked, see `LogEntryHeader.turnOffInvisible` /
555    /// `log_manager` checksum path), so flipping it in place is a single-byte
556    /// `pwrite` per entry. The flags byte is at `file_offset + FLAGS_OFFSET`
557    /// (offset 5) of each entry.
558    ///
559    /// Caller must `force` the affected files afterwards for durability
560    /// (JE `RollbackTracker.recoveryEndFsyncInvisible`).
561    pub fn make_invisible(&self, file_num: u32, offsets: &[u32]) -> Result<()> {
562        if self.read_only {
563            return Err(LogError::WriteFailed(
564                "Cannot make entries invisible in read-only mode".to_string(),
565            ));
566        }
567        if offsets.is_empty() {
568            return Ok(());
569        }
570        let path = self.file_path(file_num);
571        if !path.exists() {
572            return Ok(());
573        }
574        // Drop any cached handle so the bit flip is observed on the next read.
575        self.file_cache.lock().pop(&file_num);
576        let file = OpenOptions::new().read(true).write(true).open(&path)?;
577        // FLAGS_OFFSET within an entry header is 5 (checksum[0..4], type[4],
578        // flags[5]).
579        const FLAGS_OFFSET: u64 = 5;
580        const INVISIBLE_MASK: u8 = 0x10;
581        for &off in offsets {
582            let flags_pos = off as u64 + FLAGS_OFFSET;
583            let mut byte = [0u8; 1];
584            Self::pread_exact(&file, flags_pos, &mut byte)?;
585            byte[0] |= INVISIBLE_MASK;
586            Self::pwrite_exact(&file, flags_pos, &byte)?;
587        }
588        Ok(())
589    }
590
591    /// fsync the given set of log files (JE `FileManager.force`). Used after
592    /// `make_invisible` to make the rollback's invisible bits durable so a
593    /// crash mid-rollback does not re-apply rolled-back entries.
594    pub fn force(&self, file_nums: &[u32]) -> Result<()> {
595        if self.read_only {
596            return Ok(());
597        }
598        for &file_num in file_nums {
599            let path = self.file_path(file_num);
600            if path.exists() {
601                let f = OpenOptions::new().write(true).open(&path)?;
602                f.sync_all()?;
603            }
604        }
605        Ok(())
606    }
607
608    #[cfg(unix)]
609    fn pread_exact(file: &File, offset: u64, buf: &mut [u8]) -> Result<()> {
610        use std::os::unix::fs::FileExt;
611        file.read_exact_at(buf, offset)?;
612        Ok(())
613    }
614
615    #[cfg(unix)]
616    fn pwrite_exact(file: &File, offset: u64, buf: &[u8]) -> Result<()> {
617        use std::os::unix::fs::FileExt;
618        file.write_all_at(buf, offset)?;
619        Ok(())
620    }
621
622    #[cfg(not(unix))]
623    fn pread_exact(file: &File, offset: u64, buf: &mut [u8]) -> Result<()> {
624        use std::io::{Read, Seek, SeekFrom};
625        let mut f = file.try_clone()?;
626        f.seek(SeekFrom::Start(offset))?;
627        f.read_exact(buf)?;
628        Ok(())
629    }
630
631    #[cfg(not(unix))]
632    fn pwrite_exact(file: &File, offset: u64, buf: &[u8]) -> Result<()> {
633        use std::io::{Seek, SeekFrom, Write};
634        let mut f = file.try_clone()?;
635        f.seek(SeekFrom::Start(offset))?;
636        f.write_all(buf)?;
637        Ok(())
638    }
639
640    /// Truncate the log at (`file_num`, `offset`): truncate `file_num` to
641    /// `offset` and delete every higher-numbered file, in descending order to
642    /// avoid a log-entry gap (JE `FileManager.truncateLog`, FileManager.java:2374,
643    /// SR [#19463]). If `offset == 0` the file header itself is gone, so the
644    /// whole file is deleted too.
645    pub fn truncate_log(&self, file_num: u32, offset: u64) -> Result<()> {
646        if self.read_only {
647            return Err(LogError::WriteFailed(
648                "Cannot truncate log in read-only mode".to_string(),
649            ));
650        }
651        let last = self.get_last_file_num()?.unwrap_or(file_num);
652        let mut i = last as i64;
653        while i >= file_num as i64 {
654            let fnum = i as u32;
655            if self.file_path(fnum).exists() {
656                if fnum == file_num {
657                    self.truncate_single_file(fnum, offset)?;
658                    if offset != 0 {
659                        i -= 1;
660                        continue;
661                    }
662                }
663                self.delete_file(fnum)?;
664            }
665            i -= 1;
666        }
667        Ok(())
668    }
669
    /// Writes `data` at `file_offset` within log file `file_num`.
    ///
    /// If the file does not exist yet it is created first (header written
    /// and fsynced by `create_file_internal`); otherwise its handle is taken
    /// from the cache or opened.  After a successful write,
    /// `n_sequential_writes` is incremented and `data.len()` is added to
    /// `n_sequential_write_bytes`.
    ///
    /// JE faithfulness: JE `FileManager.writeLogBuffer` uses
    /// `fullBuffer.getFirstLsn()` to determine which file to write to, not
    /// `currentFileNum`.  This method mirrors that by accepting an explicit
    /// `file_num` parameter so `write_dirty` and `fill_flush_pending` can
    /// write dirty buffers to the file their `first_lsn` belongs to.
    ///
    /// This method never flips to a new file on its own.  The auto-flip
    /// (check `file_len >= max_file_size` and call `flip_file`) has been
    /// removed: file flips are managed exclusively by
    /// `LogManager::log_internal` via the `flipped` flag and
    /// `get_write_buffer`/`sync_log_end_and_finish_file`.  Auto-flip in this
    /// method would race with the explicit flip and double-create files.
    ///
    /// # Arguments
    /// * `file_num`    - The log file to write to.
    /// * `data`        - The raw bytes to write (header + payload).
    /// * `file_offset` - Byte offset within the file at which to write.
    ///
    /// # Errors
    /// * [`LogError::WriteFailed`] in read-only mode.
    /// * [`LogError::LatchTimeout`] if the file latch cannot be acquired.
    /// * Any error from opening/creating the file or from the write itself.
    pub fn write_buffer_to_file(
        &self,
        file_num: u32,
        data: &[u8],
        file_offset: u64,
    ) -> Result<()> {
        if self.read_only {
            return Err(LogError::WriteFailed(
                "Cannot write in read-only mode".to_string(),
            ));
        }

        // Obtain (or create) the file handle under `file_latch`.
        //
        // We MUST hold `file_latch` for the entire exists-check → get/create
        // sequence to avoid a TOCTOU race:
        //
        //   Thread A: inside create_file_internal (created empty file, writing
        //             header but not done yet)
        //   Thread B: file_path.exists()=true → get_file_handle → tries to
        //             read the header from an empty file → "failed to fill
        //             whole buffer" (UnexpectedEof)
        //
        // Holding file_latch serialises creation and subsequent opens so that
        // Thread B waits until Thread A's create_file_internal (which also
        // holds file_latch) has written and fsynced the full header.
        let handle = {
            let _guard = self
                .file_latch
                .acquire()
                .map_err(|e| LogError::LatchTimeout(e.to_string()))?;

            if self.file_path(file_num).exists() {
                self.get_file_handle(file_num)?
            } else {
                // create_file_internal (called here directly, since we already
                // hold file_latch) creates the file, writes the header, fsyncs.
                self.create_file_internal(file_num)?
            }
        };

        // The latch is released by now; only the per-handle guard is held
        // for the write itself.
        {
            let mut guard = handle.acquire()?;
            guard.write_at(file_offset, data)?;
        }

        self.n_sequential_writes.fetch_add(1, Ordering::Relaxed);
        self.n_sequential_write_bytes
            .fetch_add(data.len() as u64, Ordering::Relaxed);

        Ok(())
    }
749
750    /// Writes `data` at `file_offset` within the CURRENT log file.
751    ///
752    /// For new entries written by `LogManager::log_internal` when the entry is
753    /// too large for the buffer pool (temp-buffer path). The current file is
754    /// always correct here because `set_last_position` has already advanced
755    /// `current_file_num` to the file that holds `current_lsn`.
756    ///
757    /// Callers that write data belonging to a SPECIFIC file (dirty buffer
758    /// flush in `write_dirty` / `fill_flush_pending`) must use
759    /// `write_buffer_to_file` instead to avoid writing old data to the
760    /// wrong file after a flip.
761    pub fn write_buffer(&self, data: &[u8], file_offset: u64) -> Result<u32> {
762        if self.read_only {
763            return Err(LogError::WriteFailed(
764                "Cannot write in read-only mode".to_string(),
765            ));
766        }
767
768        let file_num = self.current_file_num.load(Ordering::Acquire);
769        self.write_buffer_to_file(file_num, data, file_offset)?;
770        Ok(file_num)
771    }
772
773    /// Reads bytes from a log file at a given offset.
774    ///
775    ///
776    ///
777    /// # Arguments
778    /// * `file_num` - The log file number to read from.
779    /// * `offset`   - Byte offset within the file.
780    /// * `buf`      - Output buffer; filled with as many bytes as available
781    ///   (may be less than `buf.len()` at end of file).
782    ///
783    /// # Returns
784    /// The number of bytes actually read.
785    pub fn read_from_file(
786        &self,
787        file_num: u32,
788        offset: u64,
789        buf: &mut [u8],
790    ) -> Result<usize> {
791        let handle = self.get_file_handle(file_num)?;
792        let mut guard = handle.acquire()?;
793        let n = guard.read_at(offset, buf)?;
794        self.n_sequential_reads.fetch_add(1, Ordering::Relaxed);
795        self.n_sequential_read_bytes.fetch_add(n as u64, Ordering::Relaxed);
796        Ok(n)
797    }
798
799    /// Reads bytes from a log file at a given offset, counted as a random
800    /// (point-lookup) read rather than a sequential scan read.
801    ///
802    /// Used by `LogManager::read_at_lsn` for in-flight log reads.
803    pub fn read_from_file_random(
804        &self,
805        file_num: u32,
806        offset: u64,
807        buf: &mut [u8],
808    ) -> Result<usize> {
809        let handle = self.get_file_handle(file_num)?;
810        let mut guard = handle.acquire()?;
811        let n = guard.read_at(offset, buf)?;
812        self.n_random_reads.fetch_add(1, Ordering::Relaxed);
813        self.n_random_read_bytes.fetch_add(n as u64, Ordering::Relaxed);
814        Ok(n)
815    }
816
817    /// Returns the length of a log file in bytes.
818    pub fn get_file_length(&self, file_num: u32) -> Result<u64> {
819        let path = self.file_path(file_num);
820        if !path.exists() {
821            return Err(LogError::FileNotFound(format!(
822                "Log file not found: {}",
823                path.display()
824            )));
825        }
826        Ok(path.metadata()?.len())
827    }
828
829    /// Memory-maps a log file for read-only sequential access.
830    ///
831    /// Returns a `Mmap` covering the entire file.  The OS handles page-in
832    /// lazily with automatic sequential read-ahead, eliminating all per-entry
833    /// `pread64` syscalls during recovery scanning.
834    ///
835    /// # Safety
836    /// The caller must not hold a mutable reference into the mapped memory
837    /// while other processes write to the file.  During recovery, log files
838    /// are read-only, making this safe.
839    pub fn mmap_file(&self, file_num: u32) -> Result<Mmap> {
840        // Never mmap the current write file. It can be appended to
841        // concurrently (pwrite64 on the log-writer thread) while a
842        // disk-ordered cursor reads it; `memmap2` requires that a mapped file
843        // is not modified for the lifetime of the mapping, so mapping the live
844        // write file would be undefined behaviour. Callers (e.g. the
845        // file-manager log scanner) fall back to positioned `pread` reads,
846        // which are safe under concurrent appends. Complete (sealed) files are
847        // never written again and are safe to map.
848        if file_num == self.get_current_file_num() {
849            return Err(LogError::Io(std::io::Error::other(format!(
850                "refusing to mmap the current write file {file_num} \
851                 (may be concurrently appended); use pread fallback"
852            ))));
853        }
854        let path = self.file_path(file_num);
855        let file = File::open(&path).map_err(|e| {
856            LogError::FileNotFound(format!(
857                "Cannot open {:?} for mmap: {}",
858                path, e
859            ))
860        })?;
861        // SAFETY: `file_num` is not the current write file (checked above), so
862        // it is a sealed log file whose bytes do not change for the lifetime
863        // of the mapping.
864        let mmap = unsafe { Mmap::map(&file) }.map_err(|e| {
865            LogError::Io(std::io::Error::other(format!(
866                "mmap {:?}: {}",
867                path, e
868            )))
869        })?;
870        Ok(mmap)
871    }
872
873    /// Returns current I/O statistics for this FileManager.
874    pub fn get_io_stats(&self) -> FileManagerIoStats {
875        FileManagerIoStats {
876            n_file_opens: self.n_file_opens.load(Ordering::Relaxed),
877            n_sequential_reads: self.n_sequential_reads.load(Ordering::Relaxed),
878            n_sequential_read_bytes: self
879                .n_sequential_read_bytes
880                .load(Ordering::Relaxed),
881            n_sequential_writes: self
882                .n_sequential_writes
883                .load(Ordering::Relaxed),
884            n_sequential_write_bytes: self
885                .n_sequential_write_bytes
886                .load(Ordering::Relaxed),
887            n_random_reads: self.n_random_reads.load(Ordering::Relaxed),
888            n_random_read_bytes: self
889                .n_random_read_bytes
890                .load(Ordering::Relaxed),
891        }
892    }
893
894    /// Fsyncs the current log file to stable storage and removes it from the
895    /// file-handle cache, making the old file handle eligible for GC.
896    ///
897    /// JE faithfulness (Part-3, DRIFT-3/7): mirrors
898    /// `FileManager.syncLogEndAndFinishFile()` which calls `syncLogEnd()` then
899    /// `endOfLog.close()`.  Called by `LogBufferPool.getWriteBuffer` when
900    /// `flippedFile=true`, under the LWL, BEFORE `advanceLsn` advances the
901    /// LSN bookkeeping to the new file.  This establishes the JE invariant
902    /// that the OLD file is durably closed before any entry is written to the
903    /// NEW file.
904    ///
905    /// References:
906    /// - JE `FileManager.syncLogEndAndFinishFile` (line 2077)
907    /// - JE `LogBufferPool.getWriteBuffer` (called after `bumpAndWriteDirty`
908    ///   when `flippedFile=true`)
909    pub fn sync_log_end_and_finish_file(&self) -> Result<()> {
910        self.sync_log_end()?;
911        // Evict the current (old) file from the LRU cache so its OS file
912        // descriptor is released promptly — JE `endOfLog.close()`.
913        let file_num = self.current_file_num.load(Ordering::Acquire);
914        let mut cache = self.file_cache.lock();
915        cache.pop(&file_num);
916        Ok(())
917    }
918
919    /// Fsyncs the current log file to stable storage.
920    ///
921    /// JE: `FileManager.syncLogEnd()` (called from `syncLogEndAndFinishFile`).
922    pub fn sync_log_end(&self) -> Result<()> {
923        if self.read_only {
924            return Ok(());
925        }
926
927        let file_num = self.current_file_num.load(Ordering::Acquire);
928        let path = self.file_path(file_num);
929
930        if !path.exists() {
931            // Nothing to sync yet.
932            return Ok(());
933        }
934
935        let handle = self.get_file_handle(file_num)?;
936        let mut guard = handle.acquire()?;
937        // Use fdatasync (sync_data) — only log data must be durable here,
938        // not file metadata.  uses FileChannel.force(false) for this.
939        guard.sync_data()?;
940        Ok(())
941    }
942
943    /// Closes the file manager, releasing all resources.
944    pub fn close(&self) -> Result<()> {
945        self.clear_cache();
946
947        // Release the lock file
948        if let Some(lock_file) = self.lock_file.write().take() {
949            {
950                #[allow(unused_imports)]
951                use fs2::FileExt;
952                let _ = lock_file.unlock();
953            }
954            drop(lock_file);
955        }
956
957        Ok(())
958    }
959}
960
961impl Drop for FileManager {
962    fn drop(&mut self) {
963        let _ = self.close();
964    }
965}
966
/// Point-in-time copy of a FileManager's I/O counters.
///
/// Corresponds to FILEMGR_FILE_OPENS, FILEMGR_SEQUENTIAL_READS/WRITES,
/// FILEMGR_RANDOM_READS etc.
#[derive(Clone, Debug, Default)]
pub struct FileManagerIoStats {
    /// Log files opened (LRU cache misses).
    pub n_file_opens: u64,
    /// Sequential read operations (recovery scan).
    pub n_sequential_reads: u64,
    /// Bytes read by sequential reads.
    pub n_sequential_read_bytes: u64,
    /// Sequential write operations.
    pub n_sequential_writes: u64,
    /// Bytes written by sequential writes.
    pub n_sequential_write_bytes: u64,
    /// Random (point-lookup) read operations.
    pub n_random_reads: u64,
    /// Bytes read by random reads.
    pub n_random_read_bytes: u64,
}
988
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// File numbers format as 8-digit lowercase hex and parse back from
    /// `<hex>.ndb`; a non-hex stem or a foreign extension is rejected.
    #[test]
    fn test_format_parse_file_number() {
        let cases: [(u32, &str); 4] = [
            (0, "00000000"),
            (42, "0000002a"),
            (255, "000000ff"),
            (0x12345678, "12345678"),
        ];
        for (file_num, hex) in cases {
            assert_eq!(format_file_number(file_num), hex);
            assert_eq!(
                parse_file_number(&format!("{hex}.ndb")),
                Some(file_num)
            );
        }

        assert_eq!(parse_file_number("invalid.ndb"), None);
        assert_eq!(parse_file_number("00000000.txt"), None);
    }

    /// A manager opened on an empty directory starts at file 0 and sees no
    /// log files.
    #[test]
    fn test_file_manager_create() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        assert_eq!(manager.get_current_file_num(), 0);
        assert!(manager.get_first_file_num().unwrap().is_none());
    }

    /// `create_file` writes the file to disk with the current log version
    /// and the handle can be fetched again afterwards.
    #[test]
    fn test_file_manager_create_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        let created = manager.create_file(0).unwrap();
        assert_eq!(created.file_num(), 0);
        assert_eq!(created.log_version(), LOG_VERSION);

        // The file must exist on disk.
        assert!(manager.file_path(0).exists());

        // Fetching the handle again must succeed (served from the cache).
        let fetched = manager.get_file_handle(0).unwrap();
        assert_eq!(fetched.file_num(), 0);
    }

    /// Files are listed in ascending order regardless of creation order.
    #[test]
    fn test_file_manager_list_files() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        for file_num in [0, 2, 1] {
            manager.create_file(file_num).unwrap();
        }

        assert_eq!(manager.list_file_numbers().unwrap(), vec![0, 1, 2]);
        assert_eq!(manager.get_first_file_num().unwrap(), Some(0));
        assert_eq!(manager.get_last_file_num().unwrap(), Some(2));
    }

    /// `flip_file` advances to the next file number and creates that file.
    #[test]
    fn test_file_manager_flip_file() {
        let temp_dir = TempDir::new().unwrap();

        {
            let manager =
                FileManager::new(temp_dir.path(), false, 10_000_000, 100)
                    .unwrap();

            // Start from an existing file 0 that has already been written to.
            manager.create_file(0).unwrap();
            manager.current_file_num.store(0, Ordering::Release);
            manager
                .last_used_lsn
                .store(Lsn::new(0, 1000).as_u64(), Ordering::Release);

            // Flip to the next file.
            let next = manager.flip_file().unwrap();
            assert_eq!(next, 1);
            assert_eq!(manager.get_current_file_num(), 1);

            // File 1 must now exist.
            assert!(manager.list_file_numbers().unwrap().contains(&1));
        } // manager dropped here, releasing lock
    }

    /// A second manager on the same directory is refused while the first
    /// one holds the environment lock.
    #[test]
    fn test_environment_locking() {
        let temp_dir = TempDir::new().unwrap();

        // The first manager locks the environment.
        let _manager1 =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // The second manager must fail to lock it.
        let result = FileManager::new(temp_dir.path(), false, 10_000_000, 100);
        assert!(
            matches!(result, Err(LogError::EnvironmentLocked(_))),
            "Expected EnvironmentLocked error"
        );
    }

    /// Opening a directory that does not exist is rejected.
    ///
    /// The missing path is a child of a fresh temporary directory, so the
    /// test does not depend on `/tmp` existing or on what it contains.
    #[test]
    fn test_nonexistent_directory_fails() {
        let temp_dir = TempDir::new().unwrap();
        let missing = temp_dir.path().join("does_not_exist_noxu_xyz");

        let result =
            FileManager::new(missing.as_path(), false, 10_000_000, 100);
        assert!(
            matches!(result, Err(LogError::InvalidDirectory(_))),
            "Expected InvalidDirectory error"
        );
    }

    /// Asking for the handle of a file that was never created fails with
    /// `FileNotFound`.
    #[test]
    fn test_get_file_handle_missing_file_fails() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        let result = manager.get_file_handle(99);
        assert!(
            matches!(result, Err(LogError::FileNotFound(_))),
            "Expected FileNotFound error"
        );
    }

    /// `delete_file` removes the file from disk and from the listing.
    #[test]
    fn test_delete_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        manager.create_file(0).unwrap();
        assert!(manager.file_path(0).exists());

        manager.delete_file(0).unwrap();
        assert!(!manager.file_path(0).exists());
        assert!(manager.list_file_numbers().unwrap().is_empty());
    }

    /// Deleting a file that does not exist is not an error.
    #[test]
    fn test_delete_nonexistent_file_is_ok() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        assert!(manager.delete_file(42).is_ok());
    }

    /// `set_last_position` updates the next/last LSNs and the current file
    /// number (taken from the next-available LSN).
    #[test]
    fn test_set_last_position() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        let next = Lsn::new(3, 1024);
        let last = Lsn::new(2, 512);
        manager.set_last_position(next, last);

        assert_eq!(manager.get_next_available_lsn(), next);
        assert_eq!(manager.get_last_used_lsn(), last);
        assert_eq!(manager.get_current_file_num(), 3);
    }

    /// A read-only manager refuses to create log files.
    #[test]
    fn test_read_only_create_file_fails() {
        let temp_dir = TempDir::new().unwrap();

        // Open and drop a writable manager first to avoid a lock conflict.
        {
            let _mgr =
                FileManager::new(temp_dir.path(), false, 10_000_000, 100)
                    .unwrap();
        } // lock released on drop

        // Read-only mode must not create files.
        let ro_mgr =
            FileManager::new(temp_dir.path(), true, 10_000_000, 100).unwrap();
        assert!(ro_mgr.create_file(0).is_err());
    }

    /// With no log files present, both first and last file number are
    /// `None`.
    #[test]
    fn test_first_and_last_file_num_empty() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        assert!(manager.get_first_file_num().unwrap().is_none());
        assert!(manager.get_last_file_num().unwrap().is_none());
    }

    /// Clearing the handle cache must not break later handle lookups.
    #[test]
    fn test_clear_cache() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        manager.create_file(0).unwrap();
        // Clearing the cache should not panic or corrupt state.
        manager.clear_cache();

        // After clearing, get_file_handle must re-open the file.
        let reopened = manager.get_file_handle(0).unwrap();
        assert_eq!(reopened.file_num(), 0);
    }

    /// C-1 regression: parent directory must be fsynced after creating each
    /// new log file so the directory entry is durable across a power loss.
    ///
    /// This test verifies that `create_file_internal` completes without error
    /// (which confirms the dir-open + sync_all code path runs), and that
    /// the created file is visible in a directory listing performed after the
    /// call returns — i.e. the same state recovery would see after a restart.
    #[test]
    fn test_parent_dir_fsynced_after_file_create() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // Creating file 0 must succeed (includes parent-dir fsync).
        manager.create_file(0).unwrap();

        // The file must be present in the directory listing — the same check
        // recovery performs when scanning for log files to replay.
        let after_create = manager.list_file_numbers().unwrap();
        assert_eq!(
            after_create,
            vec![0],
            "file 0 must be visible in dir listing after create"
        );

        // Create a second file (flip) to exercise the path for file_num > 0.
        manager.flip_file().unwrap();
        let after_flip = manager.list_file_numbers().unwrap();
        assert!(after_flip.contains(&1), "file 1 must be visible after flip");
    }
}