pub(crate) mod format;
pub(crate) mod log_buffer;
pub mod options;
pub mod reader;
pub use options::JournalOptions;
pub use reader::{JournalIter, JournalReader, JournalRecord, JournalTailState};
use crate::{Error, Result};
use log_buffer::LogBuffer;
use std::fs::{File, OpenOptions};
use std::io::Seek;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
/// Log sequence number.
///
/// A thin newtype over `u64`. Within this module the value is a byte offset
/// into the journal file: `append` returns the offset just past the frame it
/// wrote, and a freshly created journal starts at [`Lsn::ZERO`].
///
/// Ordering and equality are those of the wrapped integer.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct Lsn(pub u64);

impl Lsn {
    /// The LSN of an empty journal; also the `Default` value.
    pub const ZERO: Self = Self(0);

    /// Returns the wrapped integer.
    #[must_use]
    pub fn as_u64(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}

impl std::fmt::Display for Lsn {
    /// Renders as `Lsn(<value>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        write!(f, "Lsn({})", raw)
    }
}
/// Handle to an open append-only journal file.
///
/// Appends reserve or advance `next_lsn`; `sync_through` makes data durable
/// and publishes the result in `synced_lsn`. The handle operates either in
/// buffered mode (`log_buffer` is `None`) or in direct mode (`log_buffer` is
/// `Some`), decided when the journal is opened.
pub struct JournalHandle {
/// The underlying journal file that this module writes to and syncs.
pub(crate) file: File,
/// Byte offset just past the last appended frame, i.e. where the next
/// frame will start.
pub(crate) next_lsn: AtomicU64,
/// Highest LSN published by `sync_through` after a successful `sync_data`.
pub(crate) synced_lsn: AtomicU64,
/// Serialises the slow path of `sync_through` so only one caller at a time
/// flushes and syncs.
pub(crate) sync_gate: Mutex<()>,
/// Path the journal was opened with. Not read anywhere in this file, hence
/// the `dead_code` allowance.
#[allow(dead_code)]
path: std::path::PathBuf,
/// Lazily initialised slot for an io_uring driver (Linux with the `async`
/// feature only). The constructors in this file leave it empty.
// NOTE(review): who initialises and consumes this slot is not visible here.
#[cfg(all(target_os = "linux", feature = "async"))]
pub(crate) native_ring: std::sync::OnceLock<
Option<std::sync::Arc<crate::async_io::completion_driver::AsyncIoUring>>,
>,
/// `true` when the platform accepted the direct-I/O request at open time.
pub(crate) direct: bool,
/// Staging buffer used by direct-mode appends; `None` in buffered mode or
/// when direct I/O could not be enabled.
pub(crate) log_buffer: Option<Mutex<LogBuffer>>,
}
impl JournalHandle {
    /// Opens the journal at `path` using [`JournalOptions::default`].
    ///
    /// # Errors
    /// Propagates any error from [`Self::open_with_options`].
    pub(crate) fn open(path: &Path) -> Result<Self> {
        Self::open_with_options(path, JournalOptions::default())
    }

    /// Opens the journal at `path`, choosing the direct or buffered code path
    /// from `options.direct`.
    ///
    /// # Errors
    /// Returns the error of whichever open path was selected.
    pub(crate) fn open_with_options(path: &Path, options: JournalOptions) -> Result<Self> {
        if options.direct {
            Self::open_direct(path, options)
        } else {
            Self::open_buffered(path)
        }
    }

    /// Opens (creating if missing, never truncating) the journal through the
    /// regular page cache and resumes at the current end of file.
    fn open_buffered(path: &Path) -> Result<Self> {
        let mut file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)
            .map_err(Error::Io)?;
        // The existing file length is both the next write offset and the
        // initial "already synced" watermark.
        let len = file.seek(std::io::SeekFrom::End(0)).map_err(Error::Io)?;
        Ok(Self {
            file,
            next_lsn: AtomicU64::new(len),
            synced_lsn: AtomicU64::new(len),
            sync_gate: Mutex::new(()),
            path: path.to_path_buf(),
            #[cfg(all(target_os = "linux", feature = "async"))]
            native_ring: std::sync::OnceLock::new(),
            direct: false,
            log_buffer: None,
        })
    }

    /// Opens the journal requesting direct (cache-bypassing) I/O.
    ///
    /// An existing file is scanned first to find the offset to resume at. If
    /// the platform accepts direct I/O, a [`LogBuffer`] sized from
    /// `options.log_buffer_kib` is created and, when resuming mid-file,
    /// rehydrated from the file. If direct I/O is refused, the handle works
    /// without a log buffer and `direct` is `false`.
    ///
    /// # Errors
    /// Fails if the existing journal cannot be scanned, the file cannot be
    /// opened, or the log buffer cannot be created or rehydrated.
    fn open_direct(path: &Path, options: JournalOptions) -> Result<Self> {
        let resume_lsn = if path.exists() {
            scan_clean_end(path)?
        } else {
            0
        };
        let sector_size = crate::platform::probe_sector_size(path);
        let (file, direct_active) = open_direct_journal(path, sector_size)?;
        let log_buffer = if direct_active {
            let cap_bytes = options.log_buffer_kib.saturating_mul(1024);
            let mut buf = LogBuffer::new(cap_bytes, sector_size, 0)?;
            if resume_lsn > 0 {
                rehydrate_log_buffer(&mut buf, &file, sector_size, resume_lsn)?;
            }
            Some(Mutex::new(buf))
        } else {
            None
        };
        Ok(Self {
            file,
            next_lsn: AtomicU64::new(resume_lsn),
            synced_lsn: AtomicU64::new(resume_lsn),
            sync_gate: Mutex::new(()),
            path: path.to_path_buf(),
            #[cfg(all(target_os = "linux", feature = "async"))]
            native_ring: std::sync::OnceLock::new(),
            direct: direct_active,
            log_buffer,
        })
    }

    /// Reports whether direct I/O was actually enabled when the journal was
    /// opened.
    #[must_use]
    pub fn is_direct_active(&self) -> bool {
        self.direct
    }

    /// Appends `record` as one frame and returns the LSN just past it.
    ///
    /// In direct mode the frame is handed to the log buffer under its lock.
    /// In buffered mode the byte range is reserved atomically and the encoded
    /// frame is then written at that offset.
    ///
    /// The returned LSN can be passed to [`Self::sync_through`] to make the
    /// record durable.
    ///
    /// # Errors
    /// Returns an error if the frame cannot be encoded or the write fails.
    pub fn append(&self, record: &[u8]) -> Result<Lsn> {
        #[cfg(feature = "tracing")]
        let _span = tracing::trace_span!(
            "fsys::journal::append",
            payload_bytes = record.len(),
            direct = self.direct,
        )
        .entered();
        if let Some(buffer_mutex) = &self.log_buffer {
            // A poisoned lock is recovered rather than propagated.
            let mut buf = buffer_mutex.lock().unwrap_or_else(|p| p.into_inner());
            let (_start, end) = buf.append_frame(&self.file, record)?;
            // Published while the buffer lock is held, so anyone who takes the
            // lock afterwards sees a `next_lsn` that matches the buffer.
            self.next_lsn.store(end, Ordering::Release);
            #[cfg(feature = "tracing")]
            tracing::trace!(end_lsn = end, "direct append complete");
            return Ok(Lsn(end));
        }
        let frame = format::encode_frame_owned(record)?;
        let frame_len = frame.len() as u64;
        // Reserve [start, end) first so concurrent appenders never overlap.
        let start = self.next_lsn.fetch_add(frame_len, Ordering::AcqRel);
        let end = start + frame_len;
        crate::platform::write_at(&self.file, start, &frame)?;
        Ok(Lsn(end))
    }

    /// Ensures everything up to `lsn` is durable on disk.
    ///
    /// Returns immediately if `synced_lsn` already covers `lsn`. Otherwise a
    /// single caller at a time flushes the log buffer (direct mode), calls
    /// `sync_data`, and publishes the new watermark; callers that were waiting
    /// on the gate re-check and usually return without a second sync.
    ///
    /// # Errors
    /// Returns an error if flushing the log buffer or `sync_data` fails; in
    /// that case `synced_lsn` is left unchanged.
    pub fn sync_through(&self, lsn: Lsn) -> Result<()> {
        #[cfg(feature = "tracing")]
        let _span = tracing::trace_span!(
            "fsys::journal::sync_through",
            target_lsn = lsn.0,
            direct = self.direct,
        )
        .entered();
        if self.synced_lsn.load(Ordering::Acquire) >= lsn.0 {
            #[cfg(feature = "tracing")]
            tracing::trace!(
                synced_lsn = self.synced_lsn.load(Ordering::Acquire),
                "fast-path sync skipped"
            );
            return Ok(());
        }
        let _guard = self.sync_gate.lock().unwrap_or_else(|p| p.into_inner());
        // Another caller may have synced past `lsn` while we waited.
        if self.synced_lsn.load(Ordering::Acquire) >= lsn.0 {
            return Ok(());
        }
        let frontier = match &self.log_buffer {
            Some(buffer_mutex) => {
                let mut buf = buffer_mutex.lock().unwrap_or_else(|p| p.into_inner());
                buf.flush_partial(&self.file)?;
                // Read the frontier while the buffer lock is still held.
                // Direct appends update `next_lsn` under this same lock, so
                // the value seen here is exactly what was just flushed.
                // Reading it after the lock is released would let a concurrent
                // append advance it past data that only exists in memory, and
                // that data would then be reported as synced.
                self.next_lsn.load(Ordering::Acquire)
            }
            // NOTE(review): buffered appends advance `next_lsn` before their
            // write completes, so this frontier can include a write still in
            // flight on another thread. Closing that gap would need a separate
            // "written" watermark.
            None => self.next_lsn.load(Ordering::Acquire),
        };
        self.file.sync_data().map_err(Error::Io)?;
        self.synced_lsn.store(frontier, Ordering::Release);
        #[cfg(feature = "tracing")]
        tracing::debug!(new_synced_lsn = frontier, "group-commit fsync completed");
        Ok(())
    }

    /// Returns the highest LSN known to be durable.
    #[must_use]
    pub fn synced_lsn(&self) -> Lsn {
        Lsn(self.synced_lsn.load(Ordering::Acquire))
    }

    /// Returns the LSN at which the next appended frame will start.
    #[must_use]
    pub fn next_lsn(&self) -> Lsn {
        Lsn(self.next_lsn.load(Ordering::Acquire))
    }

    /// Forwards to `crate::platform::preallocate` for the byte range
    /// `[offset, offset + len)` of the journal file.
    ///
    /// # Errors
    /// Returns whatever the platform layer reports.
    pub fn preallocate(&self, offset: u64, len: u64) -> Result<()> {
        crate::platform::preallocate(&self.file, offset, len)
    }

    /// Forwards an access-pattern hint for the given byte range to
    /// `crate::platform::advise`.
    ///
    /// # Errors
    /// Returns whatever the platform layer reports.
    pub fn advise(&self, offset: u64, len: u64, advice: crate::Advice) -> Result<()> {
        crate::platform::advise(&self.file, offset, len, advice)
    }

    /// Syncs everything appended so far and then drops the handle.
    ///
    /// Unlike relying on `Drop`, this reports sync failures to the caller.
    ///
    /// # Errors
    /// Returns the error from [`Self::sync_through`]; the handle is dropped
    /// either way.
    pub fn close(self) -> Result<()> {
        let frontier = self.next_lsn.load(Ordering::Acquire);
        self.sync_through(Lsn(frontier))?;
        drop(self);
        Ok(())
    }
}
/// Compile-time check that `JournalHandle` is both `Send` and `Sync`.
///
/// Never called; it only has to type-check, which is why `dead_code` is
/// allowed.
#[allow(dead_code)]
fn _assert_journal_handle_is_send() {
    fn require_send_and_sync<T: Send + Sync>(_: &T) {}
    let _ = |handle: &JournalHandle| require_send_and_sync(handle);
}
impl Drop for JournalHandle {
    /// Best-effort flush on drop.
    ///
    /// Only handles with a log buffer do any work here: the buffered bytes are
    /// flushed (skipped if the lock is poisoned) and the file is synced. All
    /// errors are ignored because `drop` cannot report them; use `close` to
    /// observe failures.
    fn drop(&mut self) {
        let Some(buffer_mutex) = &self.log_buffer else {
            return;
        };
        if let Ok(mut pending) = buffer_mutex.lock() {
            let _ = pending.flush_partial(&self.file);
        }
        let _ = self.file.sync_data();
    }
}
/// Scans the journal at `path` and returns the offset to resume appending at.
///
/// An empty file resumes at 0. Otherwise every frame is read until the
/// iterator is exhausted, and the reader's final position is returned when the
/// tail is clean, truncated, or fails its checksum.
///
/// # Errors
/// Returns an `InvalidData` I/O error when the tail shows bad magic or a frame
/// length overflow. Errors from opening or iterating the reader are
/// propagated.
fn scan_clean_end(path: &Path) -> Result<u64> {
    let mut reader = JournalReader::open(path)?;
    if reader.file_size() == 0 {
        return Ok(0);
    }

    // Walk the whole journal; only the reader's final state matters.
    let mut frames = reader.iter();
    loop {
        match frames.next() {
            Some(frame) => {
                let _ = frame?;
            }
            None => break,
        }
    }
    drop(frames);

    let tail = reader.tail_state();
    let offset = reader.position().0;
    match tail {
        JournalTailState::BadMagic => Err(Error::Io(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("journal at {:?} has bad magic at offset {} — refusing to open in direct mode", path, offset),
        ))),
        JournalTailState::LengthOverflow => Err(Error::Io(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("journal at {:?} has frame length overflow at offset {} — refusing to open in direct mode", path, offset),
        ))),
        JournalTailState::CleanEnd
        | JournalTailState::TruncatedHeader
        | JournalTailState::TruncatedPayload
        | JournalTailState::ChecksumMismatch => Ok(offset),
    }
}
/// Linux: opens (creating if missing) the journal with `O_DIRECT`.
///
/// Returns the file and a flag saying whether direct I/O is active. If the
/// `O_DIRECT` open is rejected with `EINVAL`, the open is retried without the
/// flag and the result is reported as not direct. New files are created with
/// mode `0o600`.
///
/// # Errors
/// Returns `InvalidInput` if the path contains a NUL byte, otherwise the OS
/// error from the failing `open` call.
#[cfg(target_os = "linux")]
fn open_direct_journal(path: &Path, _sector_size: u32) -> Result<(File, bool)> {
    use std::os::fd::FromRawFd;
    use std::os::unix::ffi::OsStrExt;

    // Use the raw path bytes. A lossy UTF-8 conversion would replace invalid
    // sequences with U+FFFD and silently open a different file.
    let path_cstr = std::ffi::CString::new(path.as_os_str().as_bytes()).map_err(|_| {
        Error::Io(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "journal path contains a NUL byte",
        ))
    })?;
    let mut flags = libc::O_RDWR | libc::O_CREAT | libc::O_CLOEXEC | libc::O_DIRECT;
    // SAFETY: `path_cstr` is a valid NUL-terminated string that outlives the
    // call; `open` does not retain the pointer.
    let fd = unsafe { libc::open(path_cstr.as_ptr(), flags, 0o600_i32) };
    if fd >= 0 {
        // SAFETY: `fd` was just returned by a successful `open` and is owned
        // by nothing else.
        return Ok((unsafe { File::from_raw_fd(fd) }, true));
    }
    // Capture errno before any other call can overwrite it.
    let err = std::io::Error::last_os_error();
    if err.raw_os_error() == Some(libc::EINVAL) {
        // The filesystem rejected O_DIRECT; fall back to a cached open.
        flags &= !libc::O_DIRECT;
        // SAFETY: same argument as for the first `open` call.
        let fd2 = unsafe { libc::open(path_cstr.as_ptr(), flags, 0o600_i32) };
        if fd2 >= 0 {
            // SAFETY: `fd2` is a fresh descriptor owned by nothing else.
            return Ok((unsafe { File::from_raw_fd(fd2) }, false));
        }
        return Err(Error::Io(std::io::Error::last_os_error()));
    }
    Err(Error::Io(err))
}
/// macOS: opens (creating if missing, never truncating) the journal and asks
/// the kernel to bypass the cache with `F_NOCACHE`.
///
/// The returned flag is `true` only if the `fcntl` call returned 0. A refused
/// `F_NOCACHE` is not an error; the file is still returned.
///
/// # Errors
/// Returns the I/O error from opening the file.
#[cfg(target_os = "macos")]
fn open_direct_journal(path: &Path, _sector_size: u32) -> Result<(File, bool)> {
    use std::os::unix::io::AsRawFd;

    let mut opts = OpenOptions::new();
    opts.read(true).write(true).create(true).truncate(false);
    let file = opts.open(path).map_err(Error::Io)?;

    let fd = file.as_raw_fd();
    // SAFETY: `fd` belongs to `file`, which is alive for the whole call.
    let nocache_enabled = unsafe { libc::fcntl(fd, libc::F_NOCACHE, 1) } == 0;
    Ok((file, nocache_enabled))
}
/// Windows: opens (creating if missing) the journal with
/// `FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH`.
///
/// Returns the file and a flag saying whether unbuffered I/O is active. If
/// `CreateFileW` fails with error code 87 (`ERROR_INVALID_PARAMETER`), the
/// file is reopened through `OpenOptions` and reported as not direct.
///
/// # Errors
/// Returns the OS error from `CreateFileW` for any other failure, or the I/O
/// error from the fallback open.
#[cfg(target_os = "windows")]
fn open_direct_journal(path: &Path, _sector_size: u32) -> Result<(File, bool)> {
    use std::os::windows::ffi::OsStrExt;
    use std::os::windows::io::FromRawHandle;
    use windows_sys::Win32::Foundation::{GetLastError, INVALID_HANDLE_VALUE};
    use windows_sys::Win32::Storage::FileSystem::{
        CreateFileW, FILE_FLAG_NO_BUFFERING, FILE_FLAG_WRITE_THROUGH, FILE_GENERIC_READ,
        FILE_GENERIC_WRITE, FILE_SHARE_READ, OPEN_ALWAYS,
    };

    // NUL-terminated UTF-16 path for the wide-character API.
    let wide_path: Vec<u16> = path
        .as_os_str()
        .encode_wide()
        .chain(std::iter::once(0))
        .collect();

    // SAFETY: `wide_path` is NUL-terminated and outlives the call; the
    // security-attributes and template-handle arguments may be null.
    let raw = unsafe {
        CreateFileW(
            wide_path.as_ptr(),
            FILE_GENERIC_READ | FILE_GENERIC_WRITE,
            FILE_SHARE_READ,
            std::ptr::null(),
            OPEN_ALWAYS,
            FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH,
            std::ptr::null_mut(),
        )
    };
    let opened = raw != INVALID_HANDLE_VALUE && !raw.is_null();
    if opened {
        // SAFETY: `raw` is a valid handle freshly returned by `CreateFileW`
        // and owned by nothing else.
        return Ok((unsafe { File::from_raw_handle(raw as _) }, true));
    }

    // Read the thread's last error before any other API call can change it.
    // SAFETY: `GetLastError` has no preconditions.
    match unsafe { GetLastError() } {
        87 => {
            let mut opts = OpenOptions::new();
            opts.read(true).write(true).create(true).truncate(false);
            let file = opts.open(path).map_err(Error::Io)?;
            Ok((file, false))
        }
        code => Err(Error::Io(std::io::Error::from_raw_os_error(code as i32))),
    }
}
/// Other platforms: no direct-I/O support, so the journal is opened normally
/// (created if missing, never truncated) and always reported as not direct.
///
/// # Errors
/// Returns the I/O error from opening the file.
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn open_direct_journal(path: &Path, _sector_size: u32) -> Result<(File, bool)> {
    let mut opts = OpenOptions::new();
    opts.read(true).write(true).create(true).truncate(false);
    opts.open(path)
        .map(|file| (file, false))
        .map_err(Error::Io)
}
/// Positions `buf` so that appending can continue at `resume_lsn`.
///
/// If `resume_lsn` falls exactly on a sector boundary, the buffer is positioned
/// there with no carried-over bytes. Otherwise the sector containing
/// `resume_lsn` is read back from `file` and handed to the buffer together
/// with the offset inside that sector.
///
/// # Errors
/// Propagates the error from reading the sector back from `file`.
///
/// # Panics
/// Panics on division by zero if `sector_size` is 0.
// NOTE(review): assumes the read returns a full sector even when the file
// ends inside it; confirm against `crate::platform::read_range`.
fn rehydrate_log_buffer(
    buf: &mut LogBuffer,
    file: &File,
    sector_size: u32,
    resume_lsn: u64,
) -> Result<()> {
    let sector_bytes = u64::from(sector_size);
    let sector_base = (resume_lsn / sector_bytes) * sector_bytes;
    let carried = (resume_lsn - sector_base) as usize;

    if carried > 0 {
        let tail = crate::platform::read_range(file, sector_base, sector_size as usize)?;
        buf.set_flush_pos_for_resume(sector_base, carried, &tail);
    } else {
        buf.set_flush_pos_for_resume(resume_lsn, 0, &[]);
    }
    Ok(())
}
// Unit tests for `Lsn` and for `JournalHandle` opened via the default options.
//
// The size expectations below imply that each record is framed with 12 bytes
// of overhead (for example, a 5-byte payload advances the LSN by 17).
#[cfg(test)]
mod tests {
use super::*;
// `Lsn::default()` and `Lsn::ZERO` are the same value, and that value is 0.
#[test]
fn lsn_default_is_zero() {
assert_eq!(Lsn::default(), Lsn::ZERO);
assert_eq!(Lsn::ZERO.as_u64(), 0);
}
// `Display` renders the wrapped integer inside `Lsn(...)`.
#[test]
fn lsn_display_format() {
assert_eq!(format!("{}", Lsn(42)), "Lsn(42)");
}
// Derived ordering and equality follow the wrapped `u64`.
#[test]
fn lsn_ordering_matches_u64() {
assert!(Lsn(100) < Lsn(200));
assert!(Lsn(0) < Lsn(1));
assert_eq!(Lsn(42), Lsn(42));
}
// Builds a per-test path in the system temp dir; process id, current time in
// nanoseconds and `tag` keep concurrently running tests from colliding.
fn tmp_path(tag: &str) -> std::path::PathBuf {
std::env::temp_dir().join(format!(
"fsys_journal_test_{}_{}_{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos(),
tag
))
}
// Guard that removes the test file when it goes out of scope; removal errors
// are ignored.
struct Cleanup(std::path::PathBuf);
impl Drop for Cleanup {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}
// A journal created from scratch starts with both watermarks at zero.
#[test]
fn open_creates_new_file_with_zero_lsn() {
let path = tmp_path("new");
let _g = Cleanup(path.clone());
let j = JournalHandle::open(&path).expect("open");
assert_eq!(j.next_lsn(), Lsn::ZERO);
assert_eq!(j.synced_lsn(), Lsn::ZERO);
}
// Each append returns the end offset of its frame: payload length + 12.
#[test]
fn append_advances_lsn_by_framed_record_length() {
let path = tmp_path("append");
let _g = Cleanup(path.clone());
let j = JournalHandle::open(&path).expect("open");
let lsn1 = j.append(b"hello").expect("append1");
assert_eq!(lsn1, Lsn(5 + 12));
assert_eq!(j.next_lsn(), Lsn(17));
let lsn2 = j.append(b" world").expect("append2");
assert_eq!(lsn2, Lsn(17 + 6 + 12));
assert_eq!(j.next_lsn(), Lsn(35));
}
// An empty payload still produces a frame of 12 bytes.
#[test]
fn append_empty_record_writes_framed_marker() {
let path = tmp_path("empty_record");
let _g = Cleanup(path.clone());
let j = JournalHandle::open(&path).expect("open");
let _ = j.append(b"first").expect("first");
assert_eq!(j.next_lsn(), Lsn(5 + 12));
let lsn = j.append(b"").expect("empty");
assert_eq!(lsn, Lsn(17 + 12));
assert_eq!(j.next_lsn(), Lsn(29));
}
// Syncing through LSN 0 on a fresh journal takes the fast path and leaves the
// synced watermark untouched.
#[test]
fn sync_through_zero_is_noop() {
let path = tmp_path("sync_zero");
let _g = Cleanup(path.clone());
let j = JournalHandle::open(&path).expect("open");
j.sync_through(Lsn::ZERO).expect("sync_through(0)");
assert_eq!(j.synced_lsn(), Lsn::ZERO);
}
// After a sync the watermark covers at least the requested LSN.
#[test]
fn sync_through_advances_synced_lsn() {
let path = tmp_path("sync_advance");
let _g = Cleanup(path.clone());
let j = JournalHandle::open(&path).expect("open");
let lsn = j.append(b"durable").expect("append");
j.sync_through(lsn).expect("sync");
assert!(j.synced_lsn() >= lsn);
}
// Reopening resumes at the existing file size: 12-byte payload + 12 = 24.
#[test]
fn append_then_reopen_resumes_at_existing_size() {
let path = tmp_path("resume");
let _g = Cleanup(path.clone());
{
let j = JournalHandle::open(&path).expect("open1");
let _ = j.append(b"persist this").expect("append");
j.close().expect("close");
}
let j2 = JournalHandle::open(&path).expect("reopen");
assert_eq!(j2.next_lsn(), Lsn(24));
assert_eq!(j2.synced_lsn(), Lsn(24));
}
// Reads the raw file back and decodes both frames to check the on-disk layout:
// 17 bytes for "alpha" followed by 16 bytes for "beta".
#[test]
fn append_writes_framed_records_to_file() {
let path = tmp_path("readback");
let _g = Cleanup(path.clone());
let j = JournalHandle::open(&path).expect("open");
let _ = j.append(b"alpha").expect("a1");
let _ = j.append(b"beta").expect("a2");
j.close().expect("close");
let bytes = std::fs::read(&path).expect("read");
assert_eq!(bytes.len(), 33);
match format::decode_frame(&bytes) {
format::FrameDecode::Ok {
consumed,
payload_start,
payload_end,
} => {
assert_eq!(consumed, 17);
assert_eq!(&bytes[payload_start..payload_end], b"alpha");
// The second frame starts right after the first; its payload
// indices are relative to the sub-slice, hence the `17 +` below.
match format::decode_frame(&bytes[17..]) {
format::FrameDecode::Ok {
consumed,
payload_start,
payload_end,
} => {
assert_eq!(consumed, 16);
assert_eq!(&bytes[17 + payload_start..17 + payload_end], b"beta");
}
other => panic!("frame 2 decode failed: {other:?}"),
}
}
other => panic!("frame 1 decode failed: {other:?}"),
}
}
// 32 threads each sync through a different LSN; all must succeed and the final
// watermark must cover the last record.
#[test]
fn group_commit_concurrent_sync_through() {
use std::sync::Arc;
let path = tmp_path("group_commit");
let _g = Cleanup(path.clone());
let j = Arc::new(JournalHandle::open(&path).expect("open"));
let mut lsns = Vec::new();
for i in 0..32 {
let lsn = j.append(format!("rec {i:04}").as_bytes()).expect("append");
lsns.push(lsn);
}
let mut handles = Vec::new();
for lsn in &lsns {
let j = j.clone();
let lsn = *lsn;
handles.push(std::thread::spawn(move || {
j.sync_through(lsn).expect("sync_through")
}));
}
for h in handles {
h.join().expect("join");
}
assert!(j.synced_lsn() >= *lsns.last().unwrap());
}
}