noxu_log/file_manager.rs
//! File manager for log files.
//!
//! The `FileManager` presents the abstraction of one contiguous log file,
//! managing the actual on-disk log files, file handles, and LSN allocation.
6
7use crate::error::{LogError, Result};
8use crate::file_handle::FileHandle;
9use crate::file_header::{
10 FILE_HEADER_SIZE, FileHeader, LOG_VERSION, on_disk_size,
11};
12use hashbrown::HashMap;
13use memmap2::Mmap;
14use noxu_latch::ExclusiveLatch;
15use noxu_sync::{Mutex, RwLock};
16use noxu_util::lsn::Lsn;
17use std::fs::{self, File, OpenOptions};
18use std::io::Write;
19use std::num::NonZeroUsize;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
23
/// File-name suffix of every noxu log file. A full log file name is the
/// eight-digit lowercase hex file number followed by this suffix
/// (e.g. `0000002a.ndb`).
pub const LOG_FILE_EXTENSION: &str = ".ndb";

/// Name of the lock file kept in the environment directory. A read-write
/// `FileManager` holds an exclusive lock on it (on Unix and Windows) so a
/// second read-write opener cannot use the same environment concurrently.
pub const LOCK_FILE_NAME: &str = "noxu.lck";
29
/// Access mode used when opening log files.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FileMode {
    /// Files are opened for reading only.
    ReadOnly,
    /// Files are opened for both reading and writing.
    ReadWrite,
}
37}
38
39/// Returns the byte offset of the first log entry in a **new** log file.
40///
41/// New files are always written as the current `LOG_VERSION`, so this
42/// returns `FILE_HEADER_SIZE` (36 for v3). When reading an **existing**
43/// file use [`FileManager::file_header_size_for`] to account for v2 files
44/// whose first entry is at offset 32.
45#[inline]
46pub fn first_log_entry_offset() -> u32 {
47 FILE_HEADER_SIZE as u32
48}
49
/// Renders `file_num` as the eight-character, zero-padded, lowercase hex
/// stem of a log file name.
///
/// Example: `42` -> `"0000002a"`
fn format_file_number(file_num: u32) -> String {
    format!("{file_num:08x}")
}
55}
56
57/// Parses a file number from a hex string filename.
58///
59/// Example: "0000002a.ndb" -> 42
60fn parse_file_number(filename: &str) -> Option<u32> {
61 let stem = filename.strip_suffix(LOG_FILE_EXTENSION)?;
62 u32::from_str_radix(stem, 16).ok()
63}
64
65/// LRU cache of open file handles.
66///
67/// The key is the log file
68/// number; values are `Arc`-wrapped so callers may hold a reference after the
69/// cache evicts the entry (matching `FileHandle` reference-counting
70/// pattern). Capacity is configurable (default: `ENV_RUN_CLEANER_THREADS
71/// + 2`; Noxu default: 10).
72type FileHandleCache = lru::LruCache<u32, Arc<FileHandle>>;
73
74/// Manages log files in the environment directory.
75pub struct FileManager {
76 /// Environment directory path.
77 env_dir: PathBuf,
78 /// Whether the environment is read-only.
79 read_only: bool,
80 /// Maximum size of a single log file (bytes).
81 max_file_size: u64,
82 /// LRU cache of open file handles.
83 ///
84 /// Protected by `noxu_sync::Mutex` because `lru::LruCache::get()` mutates
85 /// the eviction order, so a shared read lock would not be safe.
86 file_cache: Mutex<FileHandleCache>,
87 /// Current file number being written to.
88 current_file_num: AtomicU32,
89 /// Next available LSN for writing.
90 next_available_lsn: AtomicU64,
91 /// Last LSN that was used in the current file.
92 last_used_lsn: AtomicU64,
93 /// Map of file number to last LSN used in that file (for file headers).
94 per_file_last_lsn: RwLock<HashMap<u32, Lsn>>,
95 /// Latch protecting file creation and file number advancement.
96 file_latch: ExclusiveLatch,
97 /// Lock file handle (for environment locking).
98 lock_file: RwLock<Option<File>>,
99 /// Number of log files opened (cache miss = new file open).
100 pub n_file_opens: AtomicU64,
101 /// Number of sequential read calls.
102 pub n_sequential_reads: AtomicU64,
103 /// Total bytes read sequentially.
104 pub n_sequential_read_bytes: AtomicU64,
105 /// Number of sequential write calls.
106 pub n_sequential_writes: AtomicU64,
107 /// Total bytes written sequentially.
108 pub n_sequential_write_bytes: AtomicU64,
109 /// Number of random (point-lookup) read operations.
110 pub n_random_reads: AtomicU64,
111 /// Total bytes from random read operations.
112 pub n_random_read_bytes: AtomicU64,
113}
114
115impl FileManager {
116 /// Creates a new FileManager.
117 ///
118 /// # Arguments
119 ///
120 /// * `env_dir` - Path to the environment directory
121 /// * `read_only` - Whether to open in read-only mode
122 /// * `max_file_size` - Maximum size of a single log file (bytes)
123 /// * `cache_size` - Maximum number of file handles to cache
124 ///
125 /// # Returns
126 ///
127 /// A new FileManager instance, or an error if the directory is invalid
128 /// or the environment is locked.
129 pub fn new(
130 env_dir: impl AsRef<Path>,
131 read_only: bool,
132 max_file_size: u64,
133 cache_size: usize,
134 ) -> Result<Self> {
135 let env_dir = env_dir.as_ref().to_path_buf();
136
137 // Verify directory exists
138 if !env_dir.exists() {
139 return Err(LogError::InvalidDirectory(format!(
140 "Environment directory does not exist: {}",
141 env_dir.display()
142 )));
143 }
144
145 if !env_dir.is_dir() {
146 return Err(LogError::InvalidDirectory(format!(
147 "Path is not a directory: {}",
148 env_dir.display()
149 )));
150 }
151
152 let capacity = NonZeroUsize::new(cache_size.max(1))
153 .expect("cache_size.max(1) is always >= 1");
154 let manager = FileManager {
155 env_dir,
156 read_only,
157 max_file_size,
158 file_cache: Mutex::new(lru::LruCache::new(capacity)),
159 current_file_num: AtomicU32::new(0),
160 next_available_lsn: AtomicU64::new(
161 Lsn::new(0, first_log_entry_offset()).as_u64(),
162 ),
163 last_used_lsn: AtomicU64::new(noxu_util::lsn::NULL_LSN.as_u64()),
164 per_file_last_lsn: RwLock::new(HashMap::new()),
165 file_latch: ExclusiveLatch::named("file_manager"),
166 lock_file: RwLock::new(None),
167 n_file_opens: AtomicU64::new(0),
168 n_sequential_reads: AtomicU64::new(0),
169 n_sequential_read_bytes: AtomicU64::new(0),
170 n_sequential_writes: AtomicU64::new(0),
171 n_sequential_write_bytes: AtomicU64::new(0),
172 n_random_reads: AtomicU64::new(0),
173 n_random_read_bytes: AtomicU64::new(0),
174 };
175
176 // Lock the environment
177 manager.lock_environment()?;
178
179 Ok(manager)
180 }
181
182 /// Locks the environment to prevent concurrent access.
183 fn lock_environment(&self) -> Result<()> {
184 if self.read_only {
185 // For read-only environments, we don't take an exclusive lock
186 // (in a full implementation, we'd use a shared lock)
187 return Ok(());
188 }
189
190 let lock_path = self.env_dir.join(LOCK_FILE_NAME);
191
192 // Try to create/open the lock file
193 let lock_file = OpenOptions::new()
194 .create(true)
195 .truncate(false)
196 .write(true)
197 .open(&lock_path)?;
198
199 // Try to acquire an exclusive lock.
200 // fs2::FileExt is supported on unix and windows; on other platforms
201 // (e.g. WASM, embedded) we skip the lock — acceptable since those
202 // targets run single-process environments.
203 #[cfg(any(unix, windows))]
204 {
205 use fs2::FileExt;
206 lock_file.try_lock_exclusive().map_err(|_| {
207 LogError::EnvironmentLocked(format!(
208 "Environment is locked by another process: {}",
209 self.env_dir.display()
210 ))
211 })?;
212 }
213
214 *self.lock_file.write() = Some(lock_file);
215
216 Ok(())
217 }
218
219 /// Returns the path to a log file for the given file number.
220 fn file_path(&self, file_num: u32) -> PathBuf {
221 let filename =
222 format!("{}{}", format_file_number(file_num), LOG_FILE_EXTENSION);
223 self.env_dir.join(filename)
224 }
225
226 /// Lists all log file numbers in the environment directory.
227 ///
228 /// Returns the file numbers sorted in ascending order.
229 pub fn list_file_numbers(&self) -> Result<Vec<u32>> {
230 let mut file_nums = Vec::new();
231
232 for entry in fs::read_dir(&self.env_dir)? {
233 let entry = entry?;
234 let filename = entry.file_name();
235 let filename_str = filename.to_string_lossy();
236
237 if let Some(file_num) = parse_file_number(&filename_str) {
238 file_nums.push(file_num);
239 }
240 }
241
242 file_nums.sort_unstable();
243 Ok(file_nums)
244 }
245
246 /// Returns the first (lowest numbered) file, or None if no files exist.
247 pub fn get_first_file_num(&self) -> Result<Option<u32>> {
248 Ok(self.list_file_numbers()?.into_iter().next())
249 }
250
251 /// Returns the last (highest numbered) file, or None if no files exist.
252 pub fn get_last_file_num(&self) -> Result<Option<u32>> {
253 Ok(self.list_file_numbers()?.into_iter().last())
254 }
255
256 /// Returns the configured maximum log file size in bytes.
257 /// Returns true if this FileManager was opened read-only.
258 pub fn is_read_only(&self) -> bool {
259 self.read_only
260 }
261
262 pub fn max_file_size(&self) -> u64 {
263 self.max_file_size
264 }
265
266 /// Returns the environment directory holding the log files.
267 pub fn env_dir(&self) -> &Path {
268 &self.env_dir
269 }
270
271 /// Returns the total size, in bytes, of all `.ndb` log files on disk.
272 ///
273 /// This is the disk-limit "total log size" used by the disk-usage probe.
274 /// JE computes the analogous value in
275 /// `FileProtector.getLogSizeStats()` by summing `activeFiles` (plus the
276 /// last file's length); Noxu has no reserved-file machinery (the cleaner
277 /// deletes files outright rather than parking them as "reserved"), so the
278 /// total is simply the sum of every log file's length — equivalent to
279 /// JE's `activeSize` with `reservedSize == 0`.
280 pub fn total_log_size(&self) -> Result<u64> {
281 let mut total = 0u64;
282 for file_num in self.list_file_numbers()? {
283 // A file may be deleted by the cleaner between listing and stat;
284 // skip it rather than fail the whole probe.
285 if let Ok(len) = self.get_file_length(file_num) {
286 total += len;
287 }
288 }
289 Ok(total)
290 }
291
292 /// Returns the filesystem free (usable) space, in bytes, for the
293 /// environment directory.
294 ///
295 /// JE calls `Cleaner.getDiskFreeSpace()` →
296 /// `FileStoreInfo.getUsableSpace()` (a `statvfs`). Noxu uses
297 /// `fs2::available_space` (also `statvfs`-backed), which reports space
298 /// available to a non-privileged process — the same notion JE uses.
299 pub fn disk_free_space(&self) -> Result<u64> {
300 Ok(fs2::available_space(&self.env_dir)?)
301 }
302
303 /// Returns the current file number being written to.
304 pub fn get_current_file_num(&self) -> u32 {
305 self.current_file_num.load(Ordering::Acquire)
306 }
307
308 /// Returns the next available LSN for writing.
309 pub fn get_next_available_lsn(&self) -> Lsn {
310 Lsn::from_u64(self.next_available_lsn.load(Ordering::Acquire))
311 }
312
313 /// Returns the last used LSN.
314 pub fn get_last_used_lsn(&self) -> Lsn {
315 Lsn::from_u64(self.last_used_lsn.load(Ordering::Acquire))
316 }
317
318 /// Sets the end-of-log position.
319 ///
320 /// Called during recovery to set where the log should continue from.
321 pub fn set_last_position(
322 &self,
323 next_available_lsn: Lsn,
324 last_used_lsn: Lsn,
325 ) {
326 self.last_used_lsn.store(last_used_lsn.as_u64(), Ordering::Release);
327 self.per_file_last_lsn
328 .write()
329 .insert(last_used_lsn.file_number(), last_used_lsn);
330 self.next_available_lsn
331 .store(next_available_lsn.as_u64(), Ordering::Release);
332 self.current_file_num
333 .store(next_available_lsn.file_number(), Ordering::Release);
334 }
335
336 /// Gets a file handle for the given file number.
337 ///
338 /// Checks the LRU cache first.
339 /// On a cache miss the file is opened, its header validated, and the
340 /// resulting `Arc<FileHandle>` is inserted — with automatic LRU eviction
341 /// when the cache is at capacity. Because `lru::LruCache::get()` mutates
342 /// the eviction order, the entire lookup+insert is done under a single
343 /// `Mutex` lock, eliminating any TOCTOU race between a cache miss and the
344 /// subsequent insert.
345 pub fn get_file_handle(&self, file_num: u32) -> Result<Arc<FileHandle>> {
346 let mut cache = self.file_cache.lock();
347
348 // Fast path: cache hit — LruCache::get() promotes the entry to MRU.
349 if let Some(handle) = cache.get(&file_num) {
350 return Ok(handle.clone());
351 }
352
353 // Slow path: open the file, validate its header, and insert into cache.
354 let path = self.file_path(file_num);
355 if !path.exists() {
356 return Err(LogError::FileNotFound(format!(
357 "Log file not found: {}",
358 path.display()
359 )));
360 }
361
362 let mut handle = FileHandle::new(file_num);
363
364 // Open the file.
365 let file = if self.read_only {
366 File::open(&path)?
367 } else {
368 OpenOptions::new().read(true).write(true).open(&path)?
369 };
370
371 // Read and validate the header.
372 let log_version = self.read_and_validate_header(&file, file_num)?;
373
374 // Initialize the handle.
375 handle.init(file, log_version);
376
377 let handle = Arc::new(handle);
378
379 // Insert into the LRU cache (evicts the least-recently-used entry when
380 // the cache is at capacity, mirroring FileHandleCache eviction).
381 cache.put(file_num, handle.clone());
382 self.n_file_opens.fetch_add(1, Ordering::Relaxed);
383
384 Ok(handle)
385 }
386
387 /// Reads and validates the file header.
388 ///
389 /// Handles both v2 (32-byte) and v3 (36-byte) files:
390 ///
391 /// - Reads up to `FILE_HEADER_SIZE` (36) bytes, capped at the actual file
392 /// length. For a v2 file the 32-byte header is followed by entry bytes
393 /// that are irrelevant; `FileHeader::read_from` stops consuming after
394 /// 32 bytes when it sees `log_version == 2`.
395 /// - A v2 header-only file (exactly 32 bytes) is read cleanly without
396 /// over-reading.
397 fn read_and_validate_header(
398 &self,
399 file: &File,
400 file_num: u32,
401 ) -> Result<u32> {
402 // Clamp to actual file length so we never over-read a legacy v2
403 // header-only file (exactly 32 bytes).
404 let file_len = file.metadata()?.len() as usize;
405 let read_size = FILE_HEADER_SIZE.min(file_len);
406 let mut header_buf = vec![0u8; read_size];
407 crate::posio::read_exact_at(file, &mut header_buf, 0)?;
408
409 // Parse header — read_from branches on version.
410 let mut cursor = std::io::Cursor::new(header_buf);
411 let header = FileHeader::read_from(&mut cursor)?;
412
413 // Validate
414 header.validate(file_num)
415 }
416
417 /// Returns the on-disk header size (= first-entry byte offset) for a
418 /// given log file.
419 ///
420 /// Opens (or cache-hits) the file to read its `log_version`, then
421 /// returns `FileHeader::on_disk_size(version)`:
422 ///
423 /// - v2 file → 32 bytes → first entry at offset 32
424 /// - v3 file → 36 bytes → first entry at offset 36
425 ///
426 /// Use this whenever computing the "first entry offset" for an
427 /// **existing** file instead of the bare `FILE_HEADER_SIZE` constant.
428 pub fn file_header_size_for(&self, file_num: u32) -> Result<usize> {
429 let handle = self.get_file_handle(file_num)?;
430 Ok(on_disk_size(handle.log_version()))
431 }
432
433 /// Creates a new log file with the given file number.
434 ///
435 /// Writes the file header with a link to the previous file.
436 pub fn create_file(&self, file_num: u32) -> Result<Arc<FileHandle>> {
437 let _guard = self
438 .file_latch
439 .acquire()
440 .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
441 self.create_file_internal(file_num)
442 }
443
444 /// Flips to the next log file.
445 ///
446 /// Called when the current file reaches its maximum size.
447 pub fn flip_file(&self) -> Result<u32> {
448 let _guard = self
449 .file_latch
450 .acquire()
451 .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
452
453 let current = self.current_file_num.load(Ordering::Acquire);
454 let next = current + 1;
455
456 // Save last LSN for current file
457 let last_lsn =
458 Lsn::from_u64(self.last_used_lsn.load(Ordering::Acquire));
459 if !last_lsn.is_null() {
460 self.per_file_last_lsn.write().insert(current, last_lsn);
461 }
462
463 // Create next file (note: create_file_internal doesn't acquire the latch)
464 self.create_file_internal(next)?;
465
466 // Update current file number
467 self.current_file_num.store(next, Ordering::Release);
468
469 // Update next available LSN to point to start of new file
470 self.next_available_lsn.store(
471 Lsn::new(next, first_log_entry_offset()).as_u64(),
472 Ordering::Release,
473 );
474
475 Ok(next)
476 }
477
478 /// Internal helper to create a file without acquiring the file latch.
479 fn create_file_internal(&self, file_num: u32) -> Result<Arc<FileHandle>> {
480 if self.read_only {
481 return Err(LogError::WriteFailed(
482 "Cannot create file in read-only mode".to_string(),
483 ));
484 }
485
486 let path = self.file_path(file_num);
487
488 // Create the file
489 let mut file = OpenOptions::new()
490 .read(true)
491 .write(true)
492 .create_new(true)
493 .open(&path)?;
494
495 // Determine last entry offset in previous file
496 let last_entry_offset = if file_num > 0 {
497 self.per_file_last_lsn
498 .read()
499 .get(&(file_num - 1))
500 .map(|lsn| lsn.file_offset())
501 .unwrap_or(0)
502 } else {
503 0
504 };
505
506 // Write the header
507 let header = FileHeader::new(file_num, last_entry_offset);
508 header.write_to(&mut file)?;
509 file.flush()?;
510 file.sync_all()?;
511
512 // C-1 (2026 audit F-3.1 / 2026 audit 1-G):
513 // After fsync-ing the new file, fsync the parent directory so the
514 // directory entry itself is durable. Without this a power-loss between
515 // file creation and the next directory write loses the file entirely.
516 // Cross-platform: real dir-fsync on Unix; best-effort on Windows
517 // (directory handle needs FILE_FLAG_BACKUP_SEMANTICS; NTFS journals
518 // the entry). See `crate::posio::sync_dir`.
519 crate::posio::sync_dir(&self.env_dir)?;
520
521 // Create handle
522 let mut handle = FileHandle::new(file_num);
523 handle.init(file, LOG_VERSION);
524
525 let handle = Arc::new(handle);
526
527 // Insert into the LRU cache.
528 self.file_cache.lock().put(file_num, handle.clone());
529
530 Ok(handle)
531 }
532
533 /// Deletes a log file.
534 ///
535 /// Used by the cleaner to remove old log files.
536 pub fn delete_file(&self, file_num: u32) -> Result<()> {
537 if self.read_only {
538 return Err(LogError::WriteFailed(
539 "Cannot delete file in read-only mode".to_string(),
540 ));
541 }
542
543 // Remove from cache.
544 self.file_cache.lock().pop(&file_num);
545
546 // Delete the file
547 let path = self.file_path(file_num);
548 if path.exists() {
549 fs::remove_file(&path)?;
550 }
551
552 Ok(())
553 }
554
555 /// Clears the file handle cache.
556 pub fn clear_cache(&self) {
557 self.file_cache.lock().clear();
558 }
559
560 /// Physically truncate log file `file_num` to `offset` bytes (JE
561 /// `FileManager.truncateSingleFile`, FileManager.java:2345). Used at
562 /// recovery to remove a torn / half-written trailing entry so it cannot
563 /// be misread on a later scan. Evicts the cached handle first so a stale
564 /// open handle does not see the old length.
565 pub fn truncate_single_file(
566 &self,
567 file_num: u32,
568 offset: u64,
569 ) -> Result<()> {
570 if self.read_only {
571 return Err(LogError::WriteFailed(
572 "Cannot truncate file in read-only mode".to_string(),
573 ));
574 }
575 // Drop any cached handle so the next open sees the truncated length.
576 self.file_cache.lock().pop(&file_num);
577 let path = self.file_path(file_num);
578 if path.exists() {
579 let f = fs::OpenOptions::new().write(true).open(&path)?;
580 f.set_len(offset)?;
581 f.sync_all()?;
582 }
583 Ok(())
584 }
585
586 /// Flip the invisible bit (flags 0x10) on each LSN's log-entry header,
587 /// in file order, WITHOUT recomputing the checksum.
588 ///
589 /// Port of JE `FileManager.makeInvisible` (called from
590 /// `RollbackTracker.setInvisible`). The invisible bit is excluded from the
591 /// CRC at read time (cloaked, see `LogEntryHeader.turnOffInvisible` /
592 /// `log_manager` checksum path), so flipping it in place is a single-byte
593 /// `pwrite` per entry. The flags byte is at `file_offset + FLAGS_OFFSET`
594 /// (offset 5) of each entry.
595 ///
596 /// Caller must `force` the affected files afterwards for durability
597 /// (JE `RollbackTracker.recoveryEndFsyncInvisible`).
598 pub fn make_invisible(&self, file_num: u32, offsets: &[u32]) -> Result<()> {
599 if self.read_only {
600 return Err(LogError::WriteFailed(
601 "Cannot make entries invisible in read-only mode".to_string(),
602 ));
603 }
604 if offsets.is_empty() {
605 return Ok(());
606 }
607 let path = self.file_path(file_num);
608 if !path.exists() {
609 return Ok(());
610 }
611 // Drop any cached handle so the bit flip is observed on the next read.
612 self.file_cache.lock().pop(&file_num);
613 let file = OpenOptions::new().read(true).write(true).open(&path)?;
614 // FLAGS_OFFSET within an entry header is 5 (checksum[0..4], type[4],
615 // flags[5]).
616 const FLAGS_OFFSET: u64 = 5;
617 const INVISIBLE_MASK: u8 = 0x10;
618 for &off in offsets {
619 let flags_pos = off as u64 + FLAGS_OFFSET;
620 let mut byte = [0u8; 1];
621 Self::pread_exact(&file, flags_pos, &mut byte)?;
622 byte[0] |= INVISIBLE_MASK;
623 Self::pwrite_exact(&file, flags_pos, &byte)?;
624 }
625 Ok(())
626 }
627
628 /// fsync the given set of log files (JE `FileManager.force`). Used after
629 /// `make_invisible` to make the rollback's invisible bits durable so a
630 /// crash mid-rollback does not re-apply rolled-back entries.
631 pub fn force(&self, file_nums: &[u32]) -> Result<()> {
632 if self.read_only {
633 return Ok(());
634 }
635 for &file_num in file_nums {
636 let path = self.file_path(file_num);
637 if path.exists() {
638 let f = OpenOptions::new().write(true).open(&path)?;
639 f.sync_all()?;
640 }
641 }
642 Ok(())
643 }
644
645 #[cfg(unix)]
646 fn pread_exact(file: &File, offset: u64, buf: &mut [u8]) -> Result<()> {
647 use std::os::unix::fs::FileExt;
648 file.read_exact_at(buf, offset)?;
649 Ok(())
650 }
651
652 #[cfg(unix)]
653 fn pwrite_exact(file: &File, offset: u64, buf: &[u8]) -> Result<()> {
654 // Route header writes through posio so the DST fault layer covers them
655 // too (inactive in production -> identical to a direct write_all_at).
656 crate::posio::write_all_at(file, buf, offset)?;
657 Ok(())
658 }
659
660 #[cfg(not(unix))]
661 fn pread_exact(file: &File, offset: u64, buf: &mut [u8]) -> Result<()> {
662 use std::io::{Read, Seek, SeekFrom};
663 let mut f = file.try_clone()?;
664 f.seek(SeekFrom::Start(offset))?;
665 f.read_exact(buf)?;
666 Ok(())
667 }
668
669 #[cfg(not(unix))]
670 fn pwrite_exact(file: &File, offset: u64, buf: &[u8]) -> Result<()> {
671 use std::io::{Seek, SeekFrom, Write};
672 let mut f = file.try_clone()?;
673 f.seek(SeekFrom::Start(offset))?;
674 f.write_all(buf)?;
675 Ok(())
676 }
677
678 /// Truncate the log at (`file_num`, `offset`): truncate `file_num` to
679 /// `offset` and delete every higher-numbered file, in descending order to
680 /// avoid a log-entry gap (JE `FileManager.truncateLog`, FileManager.java:2374,
681 /// SR [#19463]). If `offset == 0` the file header itself is gone, so the
682 /// whole file is deleted too.
683 pub fn truncate_log(&self, file_num: u32, offset: u64) -> Result<()> {
684 if self.read_only {
685 return Err(LogError::WriteFailed(
686 "Cannot truncate log in read-only mode".to_string(),
687 ));
688 }
689 let last = self.get_last_file_num()?.unwrap_or(file_num);
690 let mut i = last as i64;
691 while i >= file_num as i64 {
692 let fnum = i as u32;
693 if self.file_path(fnum).exists() {
694 if fnum == file_num {
695 self.truncate_single_file(fnum, offset)?;
696 if offset != 0 {
697 i -= 1;
698 continue;
699 }
700 }
701 self.delete_file(fnum)?;
702 }
703 i -= 1;
704 }
705 Ok(())
706 }
707
708 /// Writes `data` to the current log file at the given file offset.
709 ///
710 /// `writeToFile()`. The caller must supply the exact file-level byte
711 /// offset at which `data` should be written (i.e. `firstLsn.fileOffset`
712 /// in terms). After a successful write the method checks whether the
713 /// file has grown past `max_file_size`; if so it calls `flip_file()` and
714 /// returns the new file number, otherwise it returns the current one.
715 ///
716 /// # Arguments
717 /// * `data` - The raw bytes to append (header + payload).
718 /// * `file_offset` - Byte offset within the file at which to write.
719 ///
720 /// # Returns
721 /// The file number that was actually written to.
722 /// Writes `data` at `file_offset` within log file `file_num`.
723 ///
724 /// JE faithfulness: JE `FileManager.writeLogBuffer` uses
725 /// `fullBuffer.getFirstLsn()` to determine which file to write to, not
726 /// `currentFileNum`. This method mirrors that by accepting an explicit
727 /// `file_num` parameter so `write_dirty` and `fill_flush_pending` can
728 /// write dirty buffers to the file their `first_lsn` belongs to.
729 ///
730 /// The auto-flip (check `file_len >= max_file_size` and call `flip_file`)
731 /// has been removed: file flips are managed exclusively by
732 /// `LogManager::log_internal` via the `flipped` flag and
733 /// `get_write_buffer`/`sync_log_end_and_finish_file`. Auto-flip in this
734 /// method would race with the explicit flip and double-create files.
735 pub fn write_buffer_to_file(
736 &self,
737 file_num: u32,
738 data: &[u8],
739 file_offset: u64,
740 ) -> Result<()> {
741 if self.read_only {
742 return Err(LogError::WriteFailed(
743 "Cannot write in read-only mode".to_string(),
744 ));
745 }
746
747 // Obtain (or create) the file handle under `file_latch`.
748 //
749 // We MUST hold `file_latch` for the entire exists-check → get/create
750 // sequence to avoid a TOCTOU race:
751 //
752 // Thread A: inside create_file_internal (created empty file, writing
753 // header but not done yet)
754 // Thread B: file_path.exists()=true → get_file_handle → tries to
755 // read the header from an empty file → "failed to fill
756 // whole buffer" (UnexpectedEof)
757 //
758 // Holding file_latch serialises creation and subsequent opens so that
759 // Thread B waits until Thread A's create_file_internal (which also
760 // holds file_latch) has written and fsynced the full header.
761 let handle = {
762 let _guard = self
763 .file_latch
764 .acquire()
765 .map_err(|e| LogError::LatchTimeout(e.to_string()))?;
766
767 if self.file_path(file_num).exists() {
768 self.get_file_handle(file_num)?
769 } else {
770 // create_file_internal (called here directly, since we already
771 // hold file_latch) creates the file, writes the header, fsyncs.
772 self.create_file_internal(file_num)?
773 }
774 };
775
776 {
777 let mut guard = handle.acquire()?;
778 guard.write_at(file_offset, data)?;
779 }
780
781 self.n_sequential_writes.fetch_add(1, Ordering::Relaxed);
782 self.n_sequential_write_bytes
783 .fetch_add(data.len() as u64, Ordering::Relaxed);
784
785 Ok(())
786 }
787
788 /// Writes `data` at `file_offset` within the CURRENT log file.
789 ///
790 /// For new entries written by `LogManager::log_internal` when the entry is
791 /// too large for the buffer pool (temp-buffer path). The current file is
792 /// always correct here because `set_last_position` has already advanced
793 /// `current_file_num` to the file that holds `current_lsn`.
794 ///
795 /// Callers that write data belonging to a SPECIFIC file (dirty buffer
796 /// flush in `write_dirty` / `fill_flush_pending`) must use
797 /// `write_buffer_to_file` instead to avoid writing old data to the
798 /// wrong file after a flip.
799 pub fn write_buffer(&self, data: &[u8], file_offset: u64) -> Result<u32> {
800 if self.read_only {
801 return Err(LogError::WriteFailed(
802 "Cannot write in read-only mode".to_string(),
803 ));
804 }
805
806 let file_num = self.current_file_num.load(Ordering::Acquire);
807 self.write_buffer_to_file(file_num, data, file_offset)?;
808 Ok(file_num)
809 }
810
811 /// Reads bytes from a log file at a given offset.
812 ///
813 ///
814 ///
815 /// # Arguments
816 /// * `file_num` - The log file number to read from.
817 /// * `offset` - Byte offset within the file.
818 /// * `buf` - Output buffer; filled with as many bytes as available
819 /// (may be less than `buf.len()` at end of file).
820 ///
821 /// # Returns
822 /// The number of bytes actually read.
823 pub fn read_from_file(
824 &self,
825 file_num: u32,
826 offset: u64,
827 buf: &mut [u8],
828 ) -> Result<usize> {
829 let handle = self.get_file_handle(file_num)?;
830 let mut guard = handle.acquire()?;
831 let n = guard.read_at(offset, buf)?;
832 self.n_sequential_reads.fetch_add(1, Ordering::Relaxed);
833 self.n_sequential_read_bytes.fetch_add(n as u64, Ordering::Relaxed);
834 Ok(n)
835 }
836
837 /// Reads bytes from a log file at a given offset, counted as a random
838 /// (point-lookup) read rather than a sequential scan read.
839 ///
840 /// Used by `LogManager::read_at_lsn` for in-flight log reads.
841 pub fn read_from_file_random(
842 &self,
843 file_num: u32,
844 offset: u64,
845 buf: &mut [u8],
846 ) -> Result<usize> {
847 let handle = self.get_file_handle(file_num)?;
848 let mut guard = handle.acquire()?;
849 let n = guard.read_at(offset, buf)?;
850 self.n_random_reads.fetch_add(1, Ordering::Relaxed);
851 self.n_random_read_bytes.fetch_add(n as u64, Ordering::Relaxed);
852 Ok(n)
853 }
854
855 /// Returns the length of a log file in bytes.
856 pub fn get_file_length(&self, file_num: u32) -> Result<u64> {
857 let path = self.file_path(file_num);
858 if !path.exists() {
859 return Err(LogError::FileNotFound(format!(
860 "Log file not found: {}",
861 path.display()
862 )));
863 }
864 Ok(path.metadata()?.len())
865 }
866
867 /// Memory-maps a log file for read-only sequential access.
868 ///
869 /// Returns a `Mmap` covering the entire file. The OS handles page-in
870 /// lazily with automatic sequential read-ahead, eliminating all per-entry
871 /// `pread64` syscalls during recovery scanning.
872 ///
873 /// # Safety
874 /// The caller must not hold a mutable reference into the mapped memory
875 /// while other processes write to the file. During recovery, log files
876 /// are read-only, making this safe.
877 pub fn mmap_file(&self, file_num: u32) -> Result<Mmap> {
878 // Never mmap the current write file. It can be appended to
879 // concurrently (pwrite64 on the log-writer thread) while a
880 // disk-ordered cursor reads it; `memmap2` requires that a mapped file
881 // is not modified for the lifetime of the mapping, so mapping the live
882 // write file would be undefined behaviour. Callers (e.g. the
883 // file-manager log scanner) fall back to positioned `pread` reads,
884 // which are safe under concurrent appends. Complete (sealed) files are
885 // never written again and are safe to map.
886 if file_num == self.get_current_file_num() {
887 return Err(LogError::Io(std::io::Error::other(format!(
888 "refusing to mmap the current write file {file_num} \
889 (may be concurrently appended); use pread fallback"
890 ))));
891 }
892 let path = self.file_path(file_num);
893 let file = File::open(&path).map_err(|e| {
894 LogError::FileNotFound(format!(
895 "Cannot open {:?} for mmap: {}",
896 path, e
897 ))
898 })?;
899 // SAFETY: `file_num` is not the current write file (checked above), so
900 // it is a sealed log file whose bytes do not change for the lifetime
901 // of the mapping.
902 let mmap = unsafe { Mmap::map(&file) }.map_err(|e| {
903 LogError::Io(std::io::Error::other(format!(
904 "mmap {:?}: {}",
905 path, e
906 )))
907 })?;
908 Ok(mmap)
909 }
910
911 /// Returns current I/O statistics for this FileManager.
912 pub fn get_io_stats(&self) -> FileManagerIoStats {
913 FileManagerIoStats {
914 n_file_opens: self.n_file_opens.load(Ordering::Relaxed),
915 n_sequential_reads: self.n_sequential_reads.load(Ordering::Relaxed),
916 n_sequential_read_bytes: self
917 .n_sequential_read_bytes
918 .load(Ordering::Relaxed),
919 n_sequential_writes: self
920 .n_sequential_writes
921 .load(Ordering::Relaxed),
922 n_sequential_write_bytes: self
923 .n_sequential_write_bytes
924 .load(Ordering::Relaxed),
925 n_random_reads: self.n_random_reads.load(Ordering::Relaxed),
926 n_random_read_bytes: self
927 .n_random_read_bytes
928 .load(Ordering::Relaxed),
929 }
930 }
931
932 /// Fsyncs the current log file to stable storage and removes it from the
933 /// file-handle cache, making the old file handle eligible for GC.
934 ///
935 /// JE faithfulness (Part-3, DRIFT-3/7): mirrors
936 /// `FileManager.syncLogEndAndFinishFile()` which calls `syncLogEnd()` then
937 /// `endOfLog.close()`. Called by `LogBufferPool.getWriteBuffer` when
938 /// `flippedFile=true`, under the LWL, BEFORE `advanceLsn` advances the
939 /// LSN bookkeeping to the new file. This establishes the JE invariant
940 /// that the OLD file is durably closed before any entry is written to the
941 /// NEW file.
942 ///
943 /// References:
944 /// - JE `FileManager.syncLogEndAndFinishFile` (line 2077)
945 /// - JE `LogBufferPool.getWriteBuffer` (called after `bumpAndWriteDirty`
946 /// when `flippedFile=true`)
947 pub fn sync_log_end_and_finish_file(&self) -> Result<()> {
948 self.sync_log_end()?;
949 // Evict the current (old) file from the LRU cache so its OS file
950 // descriptor is released promptly — JE `endOfLog.close()`.
951 let file_num = self.current_file_num.load(Ordering::Acquire);
952 let mut cache = self.file_cache.lock();
953 cache.pop(&file_num);
954 Ok(())
955 }
956
957 /// Fsyncs the current log file to stable storage.
958 ///
959 /// JE: `FileManager.syncLogEnd()` (called from `syncLogEndAndFinishFile`).
960 pub fn sync_log_end(&self) -> Result<()> {
961 if self.read_only {
962 return Ok(());
963 }
964
965 let file_num = self.current_file_num.load(Ordering::Acquire);
966 let path = self.file_path(file_num);
967
968 if !path.exists() {
969 // Nothing to sync yet.
970 return Ok(());
971 }
972
973 let handle = self.get_file_handle(file_num)?;
974 let mut guard = handle.acquire()?;
975 // Use fdatasync (sync_data) — only log data must be durable here,
976 // not file metadata. uses FileChannel.force(false) for this.
977 guard.sync_data()?;
978 Ok(())
979 }
980
981 /// Closes the file manager, releasing all resources.
982 pub fn close(&self) -> Result<()> {
983 self.clear_cache();
984
985 // Release the lock file
986 if let Some(lock_file) = self.lock_file.write().take() {
987 {
988 #[allow(unused_imports)]
989 use fs2::FileExt;
990 let _ = lock_file.unlock();
991 }
992 drop(lock_file);
993 }
994
995 Ok(())
996 }
997}
998
999impl Drop for FileManager {
1000 fn drop(&mut self) {
1001 let _ = self.close();
1002 }
1003}
1004
/// Point-in-time snapshot of FileManager I/O statistics, as returned by
/// `FileManager::get_io_stats`.
///
/// The counters correspond to JE's `FILEMGR_FILE_OPENS`,
/// `FILEMGR_SEQUENTIAL_READS` / `FILEMGR_SEQUENTIAL_WRITES`,
/// `FILEMGR_RANDOM_READS` and the related byte-count statistics.
///
/// `PartialEq`/`Eq` are derived so snapshots can be compared directly, e.g.
/// in tests asserting that an operation performed no additional I/O.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FileManagerIoStats {
    /// Number of log files opened (LRU cache miss).
    pub n_file_opens: u64,
    /// Number of sequential read operations (recovery scan).
    pub n_sequential_reads: u64,
    /// Total bytes read sequentially.
    pub n_sequential_read_bytes: u64,
    /// Number of sequential write operations.
    pub n_sequential_writes: u64,
    /// Total bytes written sequentially.
    pub n_sequential_write_bytes: u64,
    /// Number of random (point-lookup) read operations.
    pub n_random_reads: u64,
    /// Total bytes from random read operations.
    pub n_random_read_bytes: u64,
}
1026
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_format_parse_file_number() {
        assert_eq!(format_file_number(0), "00000000");
        assert_eq!(format_file_number(42), "0000002a");
        assert_eq!(format_file_number(255), "000000ff");
        assert_eq!(format_file_number(0x12345678), "12345678");

        assert_eq!(parse_file_number("00000000.ndb"), Some(0));
        assert_eq!(parse_file_number("0000002a.ndb"), Some(42));
        assert_eq!(parse_file_number("000000ff.ndb"), Some(255));
        assert_eq!(parse_file_number("12345678.ndb"), Some(0x12345678));

        assert_eq!(parse_file_number("invalid.ndb"), None);
        assert_eq!(parse_file_number("00000000.txt"), None);
    }

    #[test]
    fn test_file_manager_create() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        assert_eq!(manager.get_current_file_num(), 0);
        assert_eq!(manager.get_first_file_num().unwrap(), None);
    }

    #[test]
    fn test_file_manager_create_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        let handle = manager.create_file(0).unwrap();
        assert_eq!(handle.file_num(), 0);
        assert_eq!(handle.log_version(), LOG_VERSION);

        // File should exist
        let path = manager.file_path(0);
        assert!(path.exists());

        // Should be able to get it again from cache
        let handle2 = manager.get_file_handle(0).unwrap();
        assert_eq!(handle2.file_num(), 0);
    }

    #[test]
    fn test_file_manager_list_files() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        manager.create_file(0).unwrap();
        manager.create_file(2).unwrap();
        manager.create_file(1).unwrap();

        let files = manager.list_file_numbers().unwrap();
        assert_eq!(files, vec![0, 1, 2]);

        assert_eq!(manager.get_first_file_num().unwrap(), Some(0));
        assert_eq!(manager.get_last_file_num().unwrap(), Some(2));
    }

    #[test]
    fn test_file_manager_flip_file() {
        let temp_dir = TempDir::new().unwrap();

        {
            let manager =
                FileManager::new(temp_dir.path(), false, 10_000_000, 100)
                    .unwrap();

            // Create initial file
            manager.create_file(0).unwrap();

            // Set current file
            manager.current_file_num.store(0, Ordering::Release);
            manager
                .last_used_lsn
                .store(Lsn::new(0, 1000).as_u64(), Ordering::Release);

            // Flip to next file
            let next = manager.flip_file().unwrap();
            assert_eq!(next, 1);
            assert_eq!(manager.get_current_file_num(), 1);

            // Should have created file 1
            let files = manager.list_file_numbers().unwrap();
            assert!(files.contains(&1));
        } // manager dropped here, releasing lock
    }

    #[test]
    fn test_environment_locking() {
        let temp_dir = TempDir::new().unwrap();

        // First manager locks the environment
        let _manager1 =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // Second manager should fail to lock
        let result = FileManager::new(temp_dir.path(), false, 10_000_000, 100);
        assert!(result.is_err());
        match result {
            Err(LogError::EnvironmentLocked(_)) => (),
            _ => panic!("Expected EnvironmentLocked error"),
        }
    }

    #[test]
    fn test_nonexistent_directory_fails() {
        // Derive the missing path from a fresh temp dir rather than
        // hard-coding `/tmp/...`. The path is then guaranteed not to exist
        // (no collision with a leftover directory on a shared machine), and
        // the test is portable to platforms without `/tmp`.
        let temp_dir = TempDir::new().unwrap();
        let missing = temp_dir.path().join("does_not_exist_noxu_xyz");
        let result =
            FileManager::new(missing.as_path(), false, 10_000_000, 100);
        assert!(result.is_err());
        match result {
            Err(LogError::InvalidDirectory(_)) => (),
            _ => panic!("Expected InvalidDirectory error"),
        }
    }

    #[test]
    fn test_get_file_handle_missing_file_fails() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        let result = manager.get_file_handle(99);
        assert!(result.is_err());
        match result {
            Err(LogError::FileNotFound(_)) => (),
            _ => panic!("Expected FileNotFound error"),
        }
    }

    #[test]
    fn test_delete_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        manager.create_file(0).unwrap();
        assert!(manager.file_path(0).exists());

        manager.delete_file(0).unwrap();
        assert!(!manager.file_path(0).exists());
        assert_eq!(manager.list_file_numbers().unwrap(), Vec::<u32>::new());
    }

    #[test]
    fn test_delete_nonexistent_file_is_ok() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // Deleting a file that does not exist should not return an error.
        assert!(manager.delete_file(42).is_ok());
    }

    #[test]
    fn test_set_last_position() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        let next = Lsn::new(3, 1024);
        let last = Lsn::new(2, 512);
        manager.set_last_position(next, last);

        assert_eq!(manager.get_next_available_lsn(), next);
        assert_eq!(manager.get_last_used_lsn(), last);
        assert_eq!(manager.get_current_file_num(), 3);
    }

    #[test]
    fn test_read_only_create_file_fails() {
        let temp_dir = TempDir::new().unwrap();
        // Create a writable manager first to avoid the lock conflict.
        {
            let _mgr =
                FileManager::new(temp_dir.path(), false, 10_000_000, 100)
                    .unwrap();
        } // lock released on drop

        // Read-only mode must not create files.
        let ro_mgr =
            FileManager::new(temp_dir.path(), true, 10_000_000, 100).unwrap();
        let result = ro_mgr.create_file(0);
        assert!(result.is_err());
    }

    #[test]
    fn test_first_and_last_file_num_empty() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        assert_eq!(manager.get_first_file_num().unwrap(), None);
        assert_eq!(manager.get_last_file_num().unwrap(), None);
    }

    #[test]
    fn test_clear_cache() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        manager.create_file(0).unwrap();
        // Clearing the cache should not panic or corrupt state.
        manager.clear_cache();

        // After clearing, get_file_handle must re-open the file.
        let handle = manager.get_file_handle(0).unwrap();
        assert_eq!(handle.file_num(), 0);
    }

    /// `mmap_file` must refuse to map the current write file (it may be
    /// appended concurrently) but must map a sealed file.
    #[test]
    fn test_mmap_refuses_current_write_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        manager.create_file(0).unwrap();
        manager.create_file(1).unwrap();

        // File 0 is the current write file: mapping it must be refused.
        manager.current_file_num.store(0, Ordering::Release);
        assert!(matches!(manager.mmap_file(0), Err(LogError::Io(_))));

        // Once writing has moved on to file 1, file 0 is sealed and mappable.
        manager.current_file_num.store(1, Ordering::Release);
        assert!(manager.mmap_file(0).is_ok());
    }

    /// `sync_log_end` is a no-op before the current file exists, and
    /// `sync_log_end_and_finish_file` syncs and evicts the handle without
    /// making the file unreachable.
    #[test]
    fn test_sync_log_end_and_finish_file() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // No current file on disk yet: nothing to sync.
        manager.sync_log_end().unwrap();

        manager.create_file(0).unwrap();
        manager.sync_log_end_and_finish_file().unwrap();

        // The finished file is still on disk and can be reopened after its
        // handle was evicted from the cache.
        let handle = manager.get_file_handle(0).unwrap();
        assert_eq!(handle.file_num(), 0);
    }

    /// C-1 regression: parent directory must be fsynced after creating each
    /// new log file so the directory entry is durable across a power loss.
    ///
    /// This test verifies that `create_file_internal` completes without error
    /// (which confirms the dir-open + sync_all code path runs), and that
    /// the created file is visible in a directory listing performed after the
    /// call returns — i.e. the same state recovery would see after a restart.
    #[test]
    fn test_parent_dir_fsynced_after_file_create() {
        let temp_dir = TempDir::new().unwrap();
        let manager =
            FileManager::new(temp_dir.path(), false, 10_000_000, 100).unwrap();

        // Creating file 0 must succeed (includes parent-dir fsync).
        manager.create_file(0).unwrap();

        // The file must be present in the directory listing — the same check
        // recovery performs when scanning for log files to replay.
        let listed = manager.list_file_numbers().unwrap();
        assert_eq!(
            listed,
            vec![0],
            "file 0 must be visible in dir listing after create"
        );

        // Create a second file (flip) to exercise the path for file_num > 0.
        manager.flip_file().unwrap();
        let listed2 = manager.list_file_numbers().unwrap();
        assert!(listed2.contains(&1), "file 1 must be visible after flip");
    }
}