noxu_log/file_manager.rs
//! File manager for log files.
//!
//! The `FileManager` presents the abstraction of one contiguous log file,
//! managing the actual on-disk log files, file handles, and LSN allocation.
6
7use crate::error::{LogError, Result};
8use crate::file_handle::FileHandle;
9use crate::file_header::{
10 FILE_HEADER_SIZE, FileHeader, LOG_VERSION, on_disk_size,
11};
12use hashbrown::HashMap;
13use memmap2::Mmap;
14use noxu_latch::ExclusiveLatch;
15use noxu_sync::{Mutex, RwLock};
16use noxu_util::lsn::Lsn;
17use std::fs::{self, File, OpenOptions};
18use std::io::Write;
19use std::num::NonZeroUsize;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
23
/// Suffix shared by every noxu log file name (`<8 hex digits>.ndb`).
pub const LOG_FILE_EXTENSION: &str = ".ndb";
26
/// Name of the file, inside the environment directory, on which the
/// exclusive environment lock is taken.
pub const LOCK_FILE_NAME: &str = "noxu.lck";
29
/// Access mode requested when opening a log file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileMode {
    /// Open for reading only; the file is never modified.
    ReadOnly,
    /// Open for both reading and writing.
    ReadWrite,
}
38
39/// Returns the byte offset of the first log entry in a **new** log file.
40///
41/// New files are always written as the current `LOG_VERSION`, so this
42/// returns `FILE_HEADER_SIZE` (36 for v3). When reading an **existing**
43/// file use [`FileManager::file_header_size_for`] to account for v2 files
44/// whose first entry is at offset 32.
45#[inline]
46pub fn first_log_entry_offset() -> u32 {
47 FILE_HEADER_SIZE as u32
48}
49
/// Renders `file_num` as the zero-padded, 8-digit, lowercase hex stem used
/// for log file names (e.g. `42` becomes `"0000002a"`).
fn format_file_number(file_num: u32) -> String {
    format!("{file_num:08x}")
}
56
57/// Parses a file number from a hex string filename.
58///
59/// Example: "0000002a.ndb" -> 42
60fn parse_file_number(filename: &str) -> Option<u32> {
61 let stem = filename.strip_suffix(LOG_FILE_EXTENSION)?;
62 u32::from_str_radix(stem, 16).ok()
63}
64
65/// LRU cache of open file handles.
66///
67/// The key is the log file
68/// number; values are `Arc`-wrapped so callers may hold a reference after the
69/// cache evicts the entry (matching `FileHandle` reference-counting
70/// pattern). Capacity is configurable (default: `ENV_RUN_CLEANER_THREADS
71/// + 2`; Noxu default: 10).
72type FileHandleCache = lru::LruCache<u32, Arc<FileHandle>>;
73
74/// Manages log files in the environment directory.
75pub struct FileManager {
76 /// Environment directory path.
77 env_dir: PathBuf,
78 /// Whether the environment is read-only.
79 read_only: bool,
80 /// Maximum size of a single log file (bytes).
81 max_file_size: u64,
82 /// LRU cache of open file handles.
83 ///
84 /// Protected by `noxu_sync::Mutex` because `lru::LruCache::get()` mutates
85 /// the eviction order, so a shared read lock would not be safe.
86 file_cache: Mutex<FileHandleCache>,
87 /// Current file number being written to.
88 current_file_num: AtomicU32,
89 /// Next available LSN for writing.
90 next_available_lsn: AtomicU64,
91 /// Last LSN that was used in the current file.
92 last_used_lsn: AtomicU64,
93 /// Map of file number to last LSN used in that file (for file headers).
94 per_file_last_lsn: RwLock<HashMap<u32, Lsn>>,
95 /// Latch protecting file creation and file number advancement.
96 file_latch: ExclusiveLatch,
97 /// Lock file handle (for environment locking).
98 lock_file: RwLock<Option<File>>,
99 /// Number of log files opened (cache miss = new file open).
100 pub n_file_opens: AtomicU64,
101 /// Number of sequential read calls.
102 pub n_sequential_reads: AtomicU64,
103 /// Total bytes read sequentially.
104 pub n_sequential_read_bytes: AtomicU64,
105 /// Number of sequential write calls.
106 pub n_sequential_writes: AtomicU64,
107 /// Total bytes written sequentially.
108 pub n_sequential_write_bytes: AtomicU64,
109 /// Number of random (point-lookup) read operations.
110 pub n_random_reads: AtomicU64,
111 /// Total bytes from random read operations.
112 pub n_random_read_bytes: AtomicU64,
113}
114
115impl FileManager {
116 /// Creates a new FileManager.
117 ///
118 /// # Arguments
119 ///
120 /// * `env_dir` - Path to the environment directory
121 /// * `read_only` - Whether to open in read-only mode
122 /// * `max_file_size` - Maximum size of a single log file (bytes)
123 /// * `cache_size` - Maximum number of file handles to cache
124 ///
125 /// # Returns
126 ///
127 /// A new FileManager instance, or an error if the directory is invalid
128 /// or the environment is locked.
129 pub fn new(
130 env_dir: impl AsRef<Path>,
131 read_only: bool,
132 max_file_size: u64,
133 cache_size: usize,
134 ) -> Result<Self> {
135 let env_dir = env_dir.as_ref().to_path_buf();
136
137 // Verify directory exists
138 if !env_dir.exists() {
139 return Err(LogError::InvalidDirectory(format!(
140 "Environment directory does not exist: {}",
141 env_dir.display()
142 )));
143 }
144
145 if !env_dir.is_dir() {
146 return Err(LogError::InvalidDirectory(format!(
147 "Path is not a directory: {}",
148 env_dir.display()
149 )));
150 }
151
152 let capacity = NonZeroUsize::new(cache_size.max(1))
153 .expect("cache_size.max(1) is always >= 1");
154 let manager = FileManager {
155 env_dir,
156 read_only,
157 max_file_size,
158 file_cache: Mutex::new(lru::LruCache::new(capacity)),
159 current_file_num: AtomicU32::new(0),
160 next_available_lsn: AtomicU64::new(
161 Lsn::new(0, first_log_entry_offset()).as_u64(),
162 ),
163 last_used_lsn: AtomicU64::new(noxu_util::lsn::NULL_LSN.as_u64()),
164 per_file_last_lsn: RwLock::new(HashMap::new()),
165 file_latch: ExclusiveLatch::named("file_manager"),
166 lock_file: RwLock::new(None),
167 n_file_opens: AtomicU64::new(0),
168 n_sequential_reads: AtomicU64::new(0),
169 n_sequential_read_bytes: AtomicU64::new(0),
170 n_sequential_writes: AtomicU64::new(0),
171 n_sequential_write_bytes: AtomicU64::new(0),
172 n_random_reads: AtomicU64::new(0),
173 n_random_read_bytes: AtomicU64::new(0),
174 };
175
176 // Lock the environment
177 manager.lock_environment()?;
178
179 Ok(manager)
180 }
181
182 /// Locks the environment to prevent concurrent access.
183 fn lock_environment(&self) -> Result<()> {
184 if self.read_only {
185 // For read-only environments, we don't take an exclusive lock
186 // (in a full implementation, we'd use a shared lock)
187 return Ok(());
188 }
189
190 let lock_path = self.env_dir.join(LOCK_FILE_NAME);
191
192 // Try to create/open the lock file
193 let lock_file = OpenOptions::new()
194 .create(true)
195 .truncate(false)
196 .write(true)
197 .open(&lock_path)?;
198
199 // Try to acquire an exclusive lock.
200 // fs2::FileExt is supported on unix and windows; on other platforms
201 // (e.g. WASM, embedded) we skip the lock — acceptable since those
202 // targets run single-process environments.
203 #[cfg(any(unix, windows))]
204 {
205 use fs2::FileExt;
206 lock_file.try_lock_exclusive().map_err(|_| {
207 LogError::EnvironmentLocked(format!(
208 "Environment is locked by another process: {}",
209 self.env_dir.display()
210 ))
211 })?;
212 }
213
214 *self.lock_file.write() = Some(lock_file);
215
216 Ok(())
217 }
218
219 /// Returns the path to a log file for the given file number.
220 fn file_path(&self, file_num: u32) -> PathBuf {
221 let filename =
222 format!("{}{}", format_file_number(file_num), LOG_FILE_EXTENSION);
223 self.env_dir.join(filename)
224 }
225
226 /// Lists all log file numbers in the environment directory.
227 ///
228 /// Returns the file numbers sorted in ascending order.
229 pub fn list_file_numbers(&self) -> Result<Vec<u32>> {
230 let mut file_nums = Vec::new();
231
232 for entry in fs::read_dir(&self.env_dir)? {
233 let entry = entry?;
234 let filename = entry.file_name();
235 let filename_str = filename.to_string_lossy();
236
237 if let Some(file_num) = parse_file_number(&filename_str) {
238 file_nums.push(file_num);
239 }
240 }
241
242 file_nums.sort_unstable();
243 Ok(file_nums)
244 }
245
246 /// Returns the first (lowest numbered) file, or None if no files exist.
247 pub fn get_first_file_num(&self) -> Result<Option<u32>> {
248 Ok(self.list_file_numbers()?.into_iter().next())
249 }
250
251 /// Returns the last (highest numbered) file, or None if no files exist.
252 pub fn get_last_file_num(&self) -> Result<Option<u32>> {
253 Ok(self.list_file_numbers()?.into_iter().last())
254 }
255
256 /// Returns the configured maximum log file size in bytes.
257 /// Returns true if this FileManager was opened read-only.
258 pub fn is_read_only(&self) -> bool {
259 self.read_only
260 }
261
262 pub fn max_file_size(&self) -> u64 {
263 self.max_file_size
264 }
265
266 /// Returns the current file number being written to.
267 pub fn get_current_file_num(&self) -> u32 {
268 self.current_file_num.load(Ordering::Acquire)
269 }
270
271 /// Returns the next available LSN for writing.
272 pub fn get_next_available_lsn(&self) -> Lsn {
273 Lsn::from_u64(self.next_available_lsn.load(Ordering::Acquire))
274 }
275
276 /// Returns the last used LSN.
277 pub fn get_last_used_lsn(&self) -> Lsn {
278 Lsn::from_u64(self.last_used_lsn.load(Ordering::Acquire))
279 }
280
281 /// Sets the end-of-log position.
282 ///
283 /// Called during recovery to set where the log should continue from.
284 pub fn set_last_position(
285 &self,
286 next_available_lsn: Lsn,
287 last_used_lsn: Lsn,
288 ) {
289 self.last_used_lsn.store(last_used_lsn.as_u64(), Ordering::Release);
290 self.per_file_last_lsn
291 .write()
292 .insert(last_used_lsn.file_number(), last_used_lsn);
293 self.next_available_lsn
294 .store(next_available_lsn.as_u64(), Ordering::Release);
295 self.current_file_num
296 .store(next_available_lsn.file_number(), Ordering::Release);
297 }
298
299 /// Gets a file handle for the given file number.
300 ///
301 /// Checks the LRU cache first.
302 /// On a cache miss the file is opened, its header validated, and the
303 /// resulting `Arc<FileHandle>` is inserted — with automatic LRU eviction
304 /// when the cache is at capacity. Because `lru::LruCache::get()` mutates
305 /// the eviction order, the entire lookup+insert is done under a single
306 /// `Mutex` lock, eliminating any TOCTOU race between a cache miss and the
307 /// subsequent insert.
308 pub fn get_file_handle(&self, file_num: u32) -> Result<Arc<FileHandle>> {
309 let mut cache = self.file_cache.lock();
310
311 // Fast path: cache hit — LruCache::get() promotes the entry to MRU.
312 if let Some(handle) = cache.get(&file_num) {
313 return Ok(handle.clone());
314 }
315
316 // Slow path: open the file, validate its header, and insert into cache.
317 let path = self.file_path(file_num);
318 if !path.exists() {
319 return Err(LogError::FileNotFound(format!(
320 "Log file not found: {}",
321 path.display()
322 )));
323 }
324
325 let mut handle = FileHandle::new(file_num);
326
327 // Open the file.
328 let file = if self.read_only {
329 File::open(&path)?
330 } else {
331 OpenOptions::new().read(true).write(true).open(&path)?
332 };
333
334 // Read and validate the header.
335 let log_version = self.read_and_validate_header(&file, file_num)?;
336
337 // Initialize the handle.
338 handle.init(file, log_version);
339
340 let handle = Arc::new(handle);
341
342 // Insert into the LRU cache (evicts the least-recently-used entry when
343 // the cache is at capacity, mirroring FileHandleCache eviction).
344 cache.put(file_num, handle.clone());
345 self.n_file_opens.fetch_add(1, Ordering::Relaxed);
346
347 Ok(handle)
348 }
349
350 /// Reads and validates the file header.
351 ///
352 /// Handles both v2 (32-byte) and v3 (36-byte) files:
353 ///
354 /// - Reads up to `FILE_HEADER_SIZE` (36) bytes, capped at the actual file
355 /// length. For a v2 file the 32-byte header is followed by entry bytes
356 /// that are irrelevant; `FileHeader::read_from` stops consuming after
357 /// 32 bytes when it sees `log_version == 2`.
358 /// - A v2 header-only file (exactly 32 bytes) is read cleanly without
359 /// over-reading.
360 fn read_and_validate_header(
361 &self,
362 file: &File,
363 file_num: u32,
364 ) -> Result<u32> {
365 // Clamp to actual file length so we never over-read a legacy v2
366 // header-only file (exactly 32 bytes).
367 let file_len = file.metadata()?.len() as usize;
368 let read_size = FILE_HEADER_SIZE.min(file_len);
369 let mut header_buf = vec![0u8; read_size];
370 crate::posio::read_exact_at(file, &mut header_buf, 0)?;
371
372 // Parse header — read_from branches on version.
373 let mut cursor = std::io::Cursor::new(header_buf);
374 let header = FileHeader::read_from(&mut cursor)?;
375
376 // Validate
377 header.validate(file_num)
378 }
379
380 /// Returns the on-disk header size (= first-entry byte offset) for a
381 /// given log file.
382 ///
383 /// Opens (or cache-hits) the file to read its `log_version`, then
384 /// returns `FileHeader::on_disk_size(version)`:
385 ///
386 /// - v2 file → 32 bytes → first entry at offset 32
387 /// - v3 file → 36 bytes → first entry at offset 36
388 ///
389 /// Use this whenever computing the "first entry offset" for an
390 /// **existing** file instead of the bare `FILE_HEADER_SIZE` constant.
391 pub fn file_header_size_for(&self, file_num: u32) -> Result<usize> {
392 let handle = self.get_file_handle(file_num)?;
393 Ok(on_disk_size(handle.log_version()))
394 }
395
396 /// Creates a new log file with the given file number.
397 ///
398 /// Writes the file header with a link to the previous file.
399 pub fn create_file(&self, file_num: u32) -> Result<Arc<FileHandle>> {
400 let _guard = self
401 .file_latch
402 .acquire()
403 .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
404 self.create_file_internal(file_num)
405 }
406
407 /// Flips to the next log file.
408 ///
409 /// Called when the current file reaches its maximum size.
410 pub fn flip_file(&self) -> Result<u32> {
411 let _guard = self
412 .file_latch
413 .acquire()
414 .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
415
416 let current = self.current_file_num.load(Ordering::Acquire);
417 let next = current + 1;
418
419 // Save last LSN for current file
420 let last_lsn =
421 Lsn::from_u64(self.last_used_lsn.load(Ordering::Acquire));
422 if !last_lsn.is_null() {
423 self.per_file_last_lsn.write().insert(current, last_lsn);
424 }
425
426 // Create next file (note: create_file_internal doesn't acquire the latch)
427 self.create_file_internal(next)?;
428
429 // Update current file number
430 self.current_file_num.store(next, Ordering::Release);
431
432 // Update next available LSN to point to start of new file
433 self.next_available_lsn.store(
434 Lsn::new(next, first_log_entry_offset()).as_u64(),
435 Ordering::Release,
436 );
437
438 Ok(next)
439 }
440
441 /// Internal helper to create a file without acquiring the file latch.
442 fn create_file_internal(&self, file_num: u32) -> Result<Arc<FileHandle>> {
443 if self.read_only {
444 return Err(LogError::WriteFailed(
445 "Cannot create file in read-only mode".to_string(),
446 ));
447 }
448
449 let path = self.file_path(file_num);
450
451 // Create the file
452 let mut file = OpenOptions::new()
453 .read(true)
454 .write(true)
455 .create_new(true)
456 .open(&path)?;
457
458 // Determine last entry offset in previous file
459 let last_entry_offset = if file_num > 0 {
460 self.per_file_last_lsn
461 .read()
462 .get(&(file_num - 1))
463 .map(|lsn| lsn.file_offset())
464 .unwrap_or(0)
465 } else {
466 0
467 };
468
469 // Write the header
470 let header = FileHeader::new(file_num, last_entry_offset);
471 header.write_to(&mut file)?;
472 file.flush()?;
473 file.sync_all()?;
474
475 // C-1 (2026 audit F-3.1 / 2026 audit 1-G):
476 // After fsync-ing the new file, fsync the parent directory so the
477 // directory entry itself is durable. Without this a power-loss between
478 // file creation and the next directory write loses the file entirely.
479 // Cross-platform: real dir-fsync on Unix; best-effort on Windows
480 // (directory handle needs FILE_FLAG_BACKUP_SEMANTICS; NTFS journals
481 // the entry). See `crate::posio::sync_dir`.
482 crate::posio::sync_dir(&self.env_dir)?;
483
484 // Create handle
485 let mut handle = FileHandle::new(file_num);
486 handle.init(file, LOG_VERSION);
487
488 let handle = Arc::new(handle);
489
490 // Insert into the LRU cache.
491 self.file_cache.lock().put(file_num, handle.clone());
492
493 Ok(handle)
494 }
495
496 /// Deletes a log file.
497 ///
498 /// Used by the cleaner to remove old log files.
499 pub fn delete_file(&self, file_num: u32) -> Result<()> {
500 if self.read_only {
501 return Err(LogError::WriteFailed(
502 "Cannot delete file in read-only mode".to_string(),
503 ));
504 }
505
506 // Remove from cache.
507 self.file_cache.lock().pop(&file_num);
508
509 // Delete the file
510 let path = self.file_path(file_num);
511 if path.exists() {
512 fs::remove_file(&path)?;
513 }
514
515 Ok(())
516 }
517
518 /// Clears the file handle cache.
519 pub fn clear_cache(&self) {
520 self.file_cache.lock().clear();
521 }
522
523 /// Physically truncate log file `file_num` to `offset` bytes (JE
524 /// `FileManager.truncateSingleFile`, FileManager.java:2345). Used at
525 /// recovery to remove a torn / half-written trailing entry so it cannot
526 /// be misread on a later scan. Evicts the cached handle first so a stale
527 /// open handle does not see the old length.
528 pub fn truncate_single_file(
529 &self,
530 file_num: u32,
531 offset: u64,
532 ) -> Result<()> {
533 if self.read_only {
534 return Err(LogError::WriteFailed(
535 "Cannot truncate file in read-only mode".to_string(),
536 ));
537 }
538 // Drop any cached handle so the next open sees the truncated length.
539 self.file_cache.lock().pop(&file_num);
540 let path = self.file_path(file_num);
541 if path.exists() {
542 let f = fs::OpenOptions::new().write(true).open(&path)?;
543 f.set_len(offset)?;
544 f.sync_all()?;
545 }
546 Ok(())
547 }
548
549 /// Flip the invisible bit (flags 0x10) on each LSN's log-entry header,
550 /// in file order, WITHOUT recomputing the checksum.
551 ///
552 /// Port of JE `FileManager.makeInvisible` (called from
553 /// `RollbackTracker.setInvisible`). The invisible bit is excluded from the
554 /// CRC at read time (cloaked, see `LogEntryHeader.turnOffInvisible` /
555 /// `log_manager` checksum path), so flipping it in place is a single-byte
556 /// `pwrite` per entry. The flags byte is at `file_offset + FLAGS_OFFSET`
557 /// (offset 5) of each entry.
558 ///
559 /// Caller must `force` the affected files afterwards for durability
560 /// (JE `RollbackTracker.recoveryEndFsyncInvisible`).
561 pub fn make_invisible(&self, file_num: u32, offsets: &[u32]) -> Result<()> {
562 if self.read_only {
563 return Err(LogError::WriteFailed(
564 "Cannot make entries invisible in read-only mode".to_string(),
565 ));
566 }
567 if offsets.is_empty() {
568 return Ok(());
569 }
570 let path = self.file_path(file_num);
571 if !path.exists() {
572 return Ok(());
573 }
574 // Drop any cached handle so the bit flip is observed on the next read.
575 self.file_cache.lock().pop(&file_num);
576 let file = OpenOptions::new().read(true).write(true).open(&path)?;
577 // FLAGS_OFFSET within an entry header is 5 (checksum[0..4], type[4],
578 // flags[5]).
579 const FLAGS_OFFSET: u64 = 5;
580 const INVISIBLE_MASK: u8 = 0x10;
581 for &off in offsets {
582 let flags_pos = off as u64 + FLAGS_OFFSET;
583 let mut byte = [0u8; 1];
584 Self::pread_exact(&file, flags_pos, &mut byte)?;
585 byte[0] |= INVISIBLE_MASK;
586 Self::pwrite_exact(&file, flags_pos, &byte)?;
587 }
588 Ok(())
589 }
590
591 /// fsync the given set of log files (JE `FileManager.force`). Used after
592 /// `make_invisible` to make the rollback's invisible bits durable so a
593 /// crash mid-rollback does not re-apply rolled-back entries.
594 pub fn force(&self, file_nums: &[u32]) -> Result<()> {
595 if self.read_only {
596 return Ok(());
597 }
598 for &file_num in file_nums {
599 let path = self.file_path(file_num);
600 if path.exists() {
601 let f = OpenOptions::new().write(true).open(&path)?;
602 f.sync_all()?;
603 }
604 }
605 Ok(())
606 }
607
608 #[cfg(unix)]
609 fn pread_exact(file: &File, offset: u64, buf: &mut [u8]) -> Result<()> {
610 use std::os::unix::fs::FileExt;
611 file.read_exact_at(buf, offset)?;
612 Ok(())
613 }
614
615 #[cfg(unix)]
616 fn pwrite_exact(file: &File, offset: u64, buf: &[u8]) -> Result<()> {
617 use std::os::unix::fs::FileExt;
618 file.write_all_at(buf, offset)?;
619 Ok(())
620 }
621
622 #[cfg(not(unix))]
623 fn pread_exact(file: &File, offset: u64, buf: &mut [u8]) -> Result<()> {
624 use std::io::{Read, Seek, SeekFrom};
625 let mut f = file.try_clone()?;
626 f.seek(SeekFrom::Start(offset))?;
627 f.read_exact(buf)?;
628 Ok(())
629 }
630
631 #[cfg(not(unix))]
632 fn pwrite_exact(file: &File, offset: u64, buf: &[u8]) -> Result<()> {
633 use std::io::{Seek, SeekFrom, Write};
634 let mut f = file.try_clone()?;
635 f.seek(SeekFrom::Start(offset))?;
636 f.write_all(buf)?;
637 Ok(())
638 }
639
640 /// Truncate the log at (`file_num`, `offset`): truncate `file_num` to
641 /// `offset` and delete every higher-numbered file, in descending order to
642 /// avoid a log-entry gap (JE `FileManager.truncateLog`, FileManager.java:2374,
643 /// SR [#19463]). If `offset == 0` the file header itself is gone, so the
644 /// whole file is deleted too.
645 pub fn truncate_log(&self, file_num: u32, offset: u64) -> Result<()> {
646 if self.read_only {
647 return Err(LogError::WriteFailed(
648 "Cannot truncate log in read-only mode".to_string(),
649 ));
650 }
651 let last = self.get_last_file_num()?.unwrap_or(file_num);
652 let mut i = last as i64;
653 while i >= file_num as i64 {
654 let fnum = i as u32;
655 if self.file_path(fnum).exists() {
656 if fnum == file_num {
657 self.truncate_single_file(fnum, offset)?;
658 if offset != 0 {
659 i -= 1;
660 continue;
661 }
662 }
663 self.delete_file(fnum)?;
664 }
665 i -= 1;
666 }
667 Ok(())
668 }
669
670 /// Writes `data` to the current log file at the given file offset.
671 ///
672 /// `writeToFile()`. The caller must supply the exact file-level byte
673 /// offset at which `data` should be written (i.e. `firstLsn.fileOffset`
674 /// in terms). After a successful write the method checks whether the
675 /// file has grown past `max_file_size`; if so it calls `flip_file()` and
676 /// returns the new file number, otherwise it returns the current one.
677 ///
678 /// # Arguments
679 /// * `data` - The raw bytes to append (header + payload).
680 /// * `file_offset` - Byte offset within the file at which to write.
681 ///
682 /// # Returns
683 /// The file number that was actually written to.
684 /// Writes `data` at `file_offset` within log file `file_num`.
685 ///
686 /// JE faithfulness: JE `FileManager.writeLogBuffer` uses
687 /// `fullBuffer.getFirstLsn()` to determine which file to write to, not
688 /// `currentFileNum`. This method mirrors that by accepting an explicit
689 /// `file_num` parameter so `write_dirty` and `fill_flush_pending` can
690 /// write dirty buffers to the file their `first_lsn` belongs to.
691 ///
692 /// The auto-flip (check `file_len >= max_file_size` and call `flip_file`)
693 /// has been removed: file flips are managed exclusively by
694 /// `LogManager::log_internal` via the `flipped` flag and
695 /// `get_write_buffer`/`sync_log_end_and_finish_file`. Auto-flip in this
696 /// method would race with the explicit flip and double-create files.
697 pub fn write_buffer_to_file(
698 &self,
699 file_num: u32,
700 data: &[u8],
701 file_offset: u64,
702 ) -> Result<()> {
703 if self.read_only {
704 return Err(LogError::WriteFailed(
705 "Cannot write in read-only mode".to_string(),
706 ));
707 }
708
709 // Obtain (or create) the file handle under `file_latch`.
710 //
711 // We MUST hold `file_latch` for the entire exists-check → get/create
712 // sequence to avoid a TOCTOU race:
713 //
714 // Thread A: inside create_file_internal (created empty file, writing
715 // header but not done yet)
716 // Thread B: file_path.exists()=true → get_file_handle → tries to
717 // read the header from an empty file → "failed to fill
718 // whole buffer" (UnexpectedEof)
719 //
720 // Holding file_latch serialises creation and subsequent opens so that
721 // Thread B waits until Thread A's create_file_internal (which also
722 // holds file_latch) has written and fsynced the full header.
723 let handle = {
724 let _guard = self
725 .file_latch
726 .acquire()
727 .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
728
729 if self.file_path(file_num).exists() {
730 self.get_file_handle(file_num)?
731 } else {
732 // create_file_internal (called here directly, since we already
733 // hold file_latch) creates the file, writes the header, fsyncs.
734 self.create_file_internal(file_num)?
735 }
736 };
737
738 {
739 let mut guard = handle.acquire()?;
740 guard.write_at(file_offset, data)?;
741 }
742
743 self.n_sequential_writes.fetch_add(1, Ordering::Relaxed);
744 self.n_sequential_write_bytes
745 .fetch_add(data.len() as u64, Ordering::Relaxed);
746
747 Ok(())
748 }
749
750 /// Writes `data` at `file_offset` within the CURRENT log file.
751 ///
752 /// For new entries written by `LogManager::log_internal` when the entry is
753 /// too large for the buffer pool (temp-buffer path). The current file is
754 /// always correct here because `set_last_position` has already advanced
755 /// `current_file_num` to the file that holds `current_lsn`.
756 ///
757 /// Callers that write data belonging to a SPECIFIC file (dirty buffer
758 /// flush in `write_dirty` / `fill_flush_pending`) must use
759 /// `write_buffer_to_file` instead to avoid writing old data to the
760 /// wrong file after a flip.
761 pub fn write_buffer(&self, data: &[u8], file_offset: u64) -> Result<u32> {
762 if self.read_only {
763 return Err(LogError::WriteFailed(
764 "Cannot write in read-only mode".to_string(),
765 ));
766 }
767
768 let file_num = self.current_file_num.load(Ordering::Acquire);
769 self.write_buffer_to_file(file_num, data, file_offset)?;
770 Ok(file_num)
771 }
772
773 /// Reads bytes from a log file at a given offset.
774 ///
775 ///
776 ///
777 /// # Arguments
778 /// * `file_num` - The log file number to read from.
779 /// * `offset` - Byte offset within the file.
780 /// * `buf` - Output buffer; filled with as many bytes as available
781 /// (may be less than `buf.len()` at end of file).
782 ///
783 /// # Returns
784 /// The number of bytes actually read.
785 pub fn read_from_file(
786 &self,
787 file_num: u32,
788 offset: u64,
789 buf: &mut [u8],
790 ) -> Result<usize> {
791 let handle = self.get_file_handle(file_num)?;
792 let mut guard = handle.acquire()?;
793 let n = guard.read_at(offset, buf)?;
794 self.n_sequential_reads.fetch_add(1, Ordering::Relaxed);
795 self.n_sequential_read_bytes.fetch_add(n as u64, Ordering::Relaxed);
796 Ok(n)
797 }
798
799 /// Reads bytes from a log file at a given offset, counted as a random
800 /// (point-lookup) read rather than a sequential scan read.
801 ///
802 /// Used by `LogManager::read_at_lsn` for in-flight log reads.
803 pub fn read_from_file_random(
804 &self,
805 file_num: u32,
806 offset: u64,
807 buf: &mut [u8],
808 ) -> Result<usize> {
809 let handle = self.get_file_handle(file_num)?;
810 let mut guard = handle.acquire()?;
811 let n = guard.read_at(offset, buf)?;
812 self.n_random_reads.fetch_add(1, Ordering::Relaxed);
813 self.n_random_read_bytes.fetch_add(n as u64, Ordering::Relaxed);
814 Ok(n)
815 }
816
817 /// Returns the length of a log file in bytes.
818 pub fn get_file_length(&self, file_num: u32) -> Result<u64> {
819 let path = self.file_path(file_num);
820 if !path.exists() {
821 return Err(LogError::FileNotFound(format!(
822 "Log file not found: {}",
823 path.display()
824 )));
825 }
826 Ok(path.metadata()?.len())
827 }
828
829 /// Memory-maps a log file for read-only sequential access.
830 ///
831 /// Returns a `Mmap` covering the entire file. The OS handles page-in
832 /// lazily with automatic sequential read-ahead, eliminating all per-entry
833 /// `pread64` syscalls during recovery scanning.
834 ///
835 /// # Safety
836 /// The caller must not hold a mutable reference into the mapped memory
837 /// while other processes write to the file. During recovery, log files
838 /// are read-only, making this safe.
839 pub fn mmap_file(&self, file_num: u32) -> Result<Mmap> {
840 // Never mmap the current write file. It can be appended to
841 // concurrently (pwrite64 on the log-writer thread) while a
842 // disk-ordered cursor reads it; `memmap2` requires that a mapped file
843 // is not modified for the lifetime of the mapping, so mapping the live
844 // write file would be undefined behaviour. Callers (e.g. the
845 // file-manager log scanner) fall back to positioned `pread` reads,
846 // which are safe under concurrent appends. Complete (sealed) files are
847 // never written again and are safe to map.
848 if file_num == self.get_current_file_num() {
849 return Err(LogError::Io(std::io::Error::other(format!(
850 "refusing to mmap the current write file {file_num} \
851 (may be concurrently appended); use pread fallback"
852 ))));
853 }
854 let path = self.file_path(file_num);
855 let file = File::open(&path).map_err(|e| {
856 LogError::FileNotFound(format!(
857 "Cannot open {:?} for mmap: {}",
858 path, e
859 ))
860 })?;
861 // SAFETY: `file_num` is not the current write file (checked above), so
862 // it is a sealed log file whose bytes do not change for the lifetime
863 // of the mapping.
864 let mmap = unsafe { Mmap::map(&file) }.map_err(|e| {
865 LogError::Io(std::io::Error::other(format!(
866 "mmap {:?}: {}",
867 path, e
868 )))
869 })?;
870 Ok(mmap)
871 }
872
873 /// Returns current I/O statistics for this FileManager.
874 pub fn get_io_stats(&self) -> FileManagerIoStats {
875 FileManagerIoStats {
876 n_file_opens: self.n_file_opens.load(Ordering::Relaxed),
877 n_sequential_reads: self.n_sequential_reads.load(Ordering::Relaxed),
878 n_sequential_read_bytes: self
879 .n_sequential_read_bytes
880 .load(Ordering::Relaxed),
881 n_sequential_writes: self
882 .n_sequential_writes
883 .load(Ordering::Relaxed),
884 n_sequential_write_bytes: self
885 .n_sequential_write_bytes
886 .load(Ordering::Relaxed),
887 n_random_reads: self.n_random_reads.load(Ordering::Relaxed),
888 n_random_read_bytes: self
889 .n_random_read_bytes
890 .load(Ordering::Relaxed),
891 }
892 }
893
894 /// Fsyncs the current log file to stable storage and removes it from the
895 /// file-handle cache, making the old file handle eligible for GC.
896 ///
897 /// JE faithfulness (Part-3, DRIFT-3/7): mirrors
898 /// `FileManager.syncLogEndAndFinishFile()` which calls `syncLogEnd()` then
899 /// `endOfLog.close()`. Called by `LogBufferPool.getWriteBuffer` when
900 /// `flippedFile=true`, under the LWL, BEFORE `advanceLsn` advances the
901 /// LSN bookkeeping to the new file. This establishes the JE invariant
902 /// that the OLD file is durably closed before any entry is written to the
903 /// NEW file.
904 ///
905 /// References:
906 /// - JE `FileManager.syncLogEndAndFinishFile` (line 2077)
907 /// - JE `LogBufferPool.getWriteBuffer` (called after `bumpAndWriteDirty`
908 /// when `flippedFile=true`)
909 pub fn sync_log_end_and_finish_file(&self) -> Result<()> {
910 self.sync_log_end()?;
911 // Evict the current (old) file from the LRU cache so its OS file
912 // descriptor is released promptly — JE `endOfLog.close()`.
913 let file_num = self.current_file_num.load(Ordering::Acquire);
914 let mut cache = self.file_cache.lock();
915 cache.pop(&file_num);
916 Ok(())
917 }
918
919 /// Fsyncs the current log file to stable storage.
920 ///
921 /// JE: `FileManager.syncLogEnd()` (called from `syncLogEndAndFinishFile`).
922 pub fn sync_log_end(&self) -> Result<()> {
923 if self.read_only {
924 return Ok(());
925 }
926
927 let file_num = self.current_file_num.load(Ordering::Acquire);
928 let path = self.file_path(file_num);
929
930 if !path.exists() {
931 // Nothing to sync yet.
932 return Ok(());
933 }
934
935 let handle = self.get_file_handle(file_num)?;
936 let mut guard = handle.acquire()?;
937 // Use fdatasync (sync_data) — only log data must be durable here,
938 // not file metadata. uses FileChannel.force(false) for this.
939 guard.sync_data()?;
940 Ok(())
941 }
942
943 /// Closes the file manager, releasing all resources.
944 pub fn close(&self) -> Result<()> {
945 self.clear_cache();
946
947 // Release the lock file
948 if let Some(lock_file) = self.lock_file.write().take() {
949 {
950 #[allow(unused_imports)]
951 use fs2::FileExt;
952 let _ = lock_file.unlock();
953 }
954 drop(lock_file);
955 }
956
957 Ok(())
958 }
959}
960
961impl Drop for FileManager {
962 fn drop(&mut self) {
963 let _ = self.close();
964 }
965}
966
/// Snapshot of FileManager I/O statistics.
///
/// Plain-value counterpart of the `FILEMGR_FILE_OPENS`,
/// `FILEMGR_SEQUENTIAL_READS` / `FILEMGR_SEQUENTIAL_WRITES` and
/// `FILEMGR_RANDOM_READS` statistics (plus their byte counts). Each field is
/// read independently from the manager's counters, so a snapshot is not an
/// atomic view across fields.
///
/// All fields are `u64`, so snapshots can be compared with `==` (useful in
/// tests and when checking whether any I/O happened between two snapshots).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FileManagerIoStats {
    /// Number of log files opened (LRU cache miss).
    pub n_file_opens: u64,
    /// Number of sequential read operations (recovery scan).
    pub n_sequential_reads: u64,
    /// Total bytes read sequentially.
    pub n_sequential_read_bytes: u64,
    /// Number of sequential write operations.
    pub n_sequential_writes: u64,
    /// Total bytes written sequentially.
    pub n_sequential_write_bytes: u64,
    /// Number of random (point-lookup) read operations.
    pub n_random_reads: u64,
    /// Total bytes from random read operations.
    pub n_random_read_bytes: u64,
}
988
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_format_parse_file_number() {
        assert_eq!(format_file_number(0), "00000000");
        assert_eq!(format_file_number(42), "0000002a");
        assert_eq!(format_file_number(255), "000000ff");
        assert_eq!(format_file_number(0x12345678), "12345678");

        assert_eq!(parse_file_number("00000000.ndb"), Some(0));
        assert_eq!(parse_file_number("0000002a.ndb"), Some(42));
        assert_eq!(parse_file_number("000000ff.ndb"), Some(255));
        assert_eq!(parse_file_number("12345678.ndb"), Some(0x12345678));

        assert_eq!(parse_file_number("invalid.ndb"), None);
        assert_eq!(parse_file_number("00000000.txt"), None);
    }

    #[test]
    fn test_file_manager_create() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        assert_eq!(manager.get_current_file_num(), 0);
        assert_eq!(manager.get_first_file_num().unwrap(), None);
    }

    #[test]
    fn test_file_manager_create_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        let handle = manager.create_file(0).unwrap();
        assert_eq!(handle.file_num(), 0);
        assert_eq!(handle.log_version(), LOG_VERSION);

        // File should exist
        let path = manager.file_path(0);
        assert!(path.exists());

        // Should be able to get it again from cache
        let handle2 = manager.get_file_handle(0).unwrap();
        assert_eq!(handle2.file_num(), 0);
    }

    #[test]
    fn test_file_manager_list_files() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        manager.create_file(0).unwrap();
        manager.create_file(2).unwrap();
        manager.create_file(1).unwrap();

        let files = manager.list_file_numbers().unwrap();
        assert_eq!(files, vec![0, 1, 2]);

        assert_eq!(manager.get_first_file_num().unwrap(), Some(0));
        assert_eq!(manager.get_last_file_num().unwrap(), Some(2));
    }

    #[test]
    fn test_file_manager_flip_file() {
        let temp_dir = TempDir::new().unwrap();

        {
            let manager =
                FileManager::new(temp_dir.path(), false, 10_000_000, 100)
                    .unwrap();

            // Create initial file
            manager.create_file(0).unwrap();

            // Set current file
            manager.current_file_num.store(0, Ordering::Release);
            manager
                .last_used_lsn
                .store(Lsn::new(0, 1000).as_u64(), Ordering::Release);

            // Flip to next file
            let next = manager.flip_file().unwrap();
            assert_eq!(next, 1);
            assert_eq!(manager.get_current_file_num(), 1);

            // Should have created file 1
            let files = manager.list_file_numbers().unwrap();
            assert!(files.contains(&1));
        } // manager dropped here, releasing lock
    }

    #[test]
    fn test_environment_locking() {
        let temp_dir = TempDir::new().unwrap();

        // First manager locks the environment
        let _manager1 =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // Second manager should fail to lock
        let result = FileManager::new(temp_dir.path(), false, 10_000_000, 100);
        assert!(result.is_err());
        match result {
            Err(LogError::EnvironmentLocked(_)) => (),
            _ => panic!("Expected EnvironmentLocked error"),
        }
    }

    #[test]
    fn test_nonexistent_directory_fails() {
        // Derive the missing path from a fresh temp dir instead of a
        // hard-coded `/tmp/...` location, so the outcome does not depend on
        // the host's `/tmp` contents or layout.
        let temp_dir = TempDir::new().unwrap();
        let missing = temp_dir.path().join("does_not_exist_noxu_xyz");

        let result =
            FileManager::new(missing.as_path(), false, 10_000_000, 100);
        assert!(result.is_err());
        match result {
            Err(LogError::InvalidDirectory(_)) => (),
            _ => panic!("Expected InvalidDirectory error"),
        }
    }

    #[test]
    fn test_get_file_handle_missing_file_fails() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        let result = manager.get_file_handle(99);
        assert!(result.is_err());
        match result {
            Err(LogError::FileNotFound(_)) => (),
            _ => panic!("Expected FileNotFound error"),
        }
    }

    #[test]
    fn test_delete_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        manager.create_file(0).unwrap();
        assert!(manager.file_path(0).exists());

        manager.delete_file(0).unwrap();
        assert!(!manager.file_path(0).exists());
        assert_eq!(manager.list_file_numbers().unwrap(), Vec::<u32>::new());
    }

    #[test]
    fn test_delete_nonexistent_file_is_ok() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // Deleting a file that does not exist should not return an error.
        assert!(manager.delete_file(42).is_ok());
    }

    #[test]
    fn test_set_last_position() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        let next = Lsn::new(3, 1024);
        let last = Lsn::new(2, 512);
        manager.set_last_position(next, last);

        assert_eq!(manager.get_next_available_lsn(), next);
        assert_eq!(manager.get_last_used_lsn(), last);
        assert_eq!(manager.get_current_file_num(), 3);
    }

    #[test]
    fn test_read_only_create_file_fails() {
        let temp_dir = TempDir::new().unwrap();
        // Create a writable manager first to avoid the lock conflict.
        {
            let _mgr =
                FileManager::new(temp_dir.path(), false, 10_000_000, 100)
                    .unwrap();
        } // lock released on drop

        // Read-only mode must not create files.
        let ro_mgr =
            FileManager::new(temp_dir.path(), true, 10_000_000, 100).unwrap();
        let result = ro_mgr.create_file(0);
        assert!(result.is_err());
    }

    #[test]
    fn test_first_and_last_file_num_empty() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        assert_eq!(manager.get_first_file_num().unwrap(), None);
        assert_eq!(manager.get_last_file_num().unwrap(), None);
    }

    #[test]
    fn test_clear_cache() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        manager.create_file(0).unwrap();
        // Clearing the cache should not panic or corrupt state.
        manager.clear_cache();

        // After clearing, get_file_handle must re-open the file.
        let handle = manager.get_file_handle(0).unwrap();
        assert_eq!(handle.file_num(), 0);
    }

    /// `sync_log_end` and `sync_log_end_and_finish_file` must succeed as
    /// no-ops before the current log file exists, and must succeed once it
    /// does. After finishing, the handle is evicted from the cache, but
    /// `get_file_handle` must still be able to re-open the file.
    #[test]
    fn test_sync_log_end_and_finish_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // Current file 0 has not been created yet: nothing to sync.
        manager.sync_log_end().unwrap();
        manager.sync_log_end_and_finish_file().unwrap();

        manager.create_file(0).unwrap();
        manager.sync_log_end().unwrap();
        manager.sync_log_end_and_finish_file().unwrap();

        // The evicted file must be re-openable.
        let handle = manager.get_file_handle(0).unwrap();
        assert_eq!(handle.file_num(), 0);
    }

    /// C-1 regression: parent directory must be fsynced after creating each
    /// new log file so the directory entry is durable across a power loss.
    ///
    /// This test verifies that `create_file_internal` completes without error
    /// (which confirms the dir-open + sync_all code path runs), and that
    /// the created file is visible in a directory listing performed after the
    /// call returns — i.e. the same state recovery would see after a restart.
    #[test]
    fn test_parent_dir_fsynced_after_file_create() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // Creating file 0 must succeed (includes parent-dir fsync).
        manager.create_file(0).unwrap();

        // The file must be present in the directory listing — the same check
        // recovery performs when scanning for log files to replay.
        let listed = manager.list_file_numbers().unwrap();
        assert_eq!(
            listed,
            vec![0],
            "file 0 must be visible in dir listing after create"
        );

        // Create a second file (flip) to exercise the path for file_num > 0.
        manager.flip_file().unwrap();
        let listed2 = manager.list_file_numbers().unwrap();
        assert!(listed2.contains(&1), "file 1 must be visible after flip");
    }
}