pub mod backend;
pub(crate) mod format;
pub(crate) mod log_buffer;
pub mod options;
pub mod reader;
pub use backend::{JournalBackend, JournalBackendHealth, JournalBackendInfo, JournalBackendKind};
pub use options::{JournalOptions, SyncMode, WriteLifetimeHint};
pub use reader::{JournalIter, JournalReader, JournalRecord, JournalTailState};
use crate::{Error, Result};
use crossbeam_utils::CachePadded;
use log_buffer::LogBuffer;
use parking_lot::{Condvar, Mutex as PlMutex};
use std::fs::{File, OpenOptions};
use std::io::Seek;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// Largest encoded frame size, in bytes (payload plus framing overhead), that
/// `append_inner` encodes into an on-stack buffer; larger frames are encoded
/// into an owned buffer instead.
const STACK_FRAME_THRESHOLD: usize = 2048;
/// Log sequence number: a `u64` newtype identifying a position in the journal.
///
/// Within this module an LSN is the byte offset just past an appended frame,
/// so ordering between two values follows the ordering of the raw `u64`s.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Lsn(u64);

impl Lsn {
    /// The LSN whose raw value is zero (identical to `Lsn::default()`).
    pub const ZERO: Lsn = Lsn::new(0);

    /// Wraps a raw `u64` offset as an [`Lsn`].
    #[must_use]
    #[inline]
    pub const fn new(offset: u64) -> Self {
        Lsn(offset)
    }

    /// Returns the raw `u64` carried by this LSN.
    #[must_use]
    #[inline]
    pub const fn as_u64(self) -> u64 {
        let Lsn(raw) = self;
        raw
    }
}

impl From<u64> for Lsn {
    /// Same conversion as [`Lsn::new`].
    #[inline]
    fn from(offset: u64) -> Self {
        Lsn::new(offset)
    }
}

impl From<Lsn> for u64 {
    /// Same conversion as [`Lsn::as_u64`].
    #[inline]
    fn from(lsn: Lsn) -> Self {
        lsn.as_u64()
    }
}

impl std::fmt::Display for Lsn {
    /// Renders the value as `Lsn(<raw>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let raw = self.as_u64();
        write!(f, "Lsn({})", raw)
    }
}
/// Handle to an open journal file.
///
/// Appends take `&self` and return the [`Lsn`] just past the written frame;
/// `sync_through` makes everything up to a given LSN durable using a
/// leader/follower group commit.
pub struct JournalHandle {
/// The open journal file that appends and syncs operate on.
pub(crate) file: File,
/// End of the last reserved/appended frame; initialised from the file
/// length (buffered open) or the scanned resume point (direct open).
pub(crate) next_lsn: CachePadded<AtomicU64>,
/// Highest frontier recorded by a successful `sync_through`.
pub(crate) synced_lsn: CachePadded<AtomicU64>,
/// Leader/follower coordination state used by `sync_through`.
pub(crate) group_commit: GroupCommit,
/// Path the journal was opened with; not read anywhere in this file.
#[allow(dead_code)]
path: std::path::PathBuf,
/// Lazily-populated io_uring driver slot; `backend_kind` reports
/// `KernelIoUring` once this holds `Some(Some(_))`.
#[cfg(all(target_os = "linux", feature = "async"))]
pub(crate) native_ring: std::sync::OnceLock<
Option<std::sync::Arc<crate::async_io::completion_driver::AsyncIoUring>>,
>,
/// `true` when the file was opened with direct IO actually in effect.
pub(crate) direct: bool,
/// Staging buffer for direct-IO appends; `Some` only when `direct` is true.
log_buffer: Option<LogBuffer>,
/// Optional observer notified after each append and each leader sync.
observer: Option<std::sync::Arc<dyn crate::observer::FsysObserver>>,
/// Which sync primitive `sync_through` issues (full data sync or barrier).
sync_mode: options::SyncMode,
}
impl JournalHandle {
/// Opens (creating if needed) the journal at `path` with default options.
pub(crate) fn open(path: &Path) -> Result<Self> {
    let defaults = JournalOptions::default();
    Self::open_with_options(path, defaults)
}
/// Opens the journal at `path`, picking the direct-IO or buffered open path
/// according to `options.direct`.
pub(crate) fn open_with_options(path: &Path, options: JournalOptions) -> Result<Self> {
    if !options.direct {
        return Self::open_buffered(path, options);
    }
    Self::open_direct(path, options)
}
fn open_buffered(path: &Path, options: JournalOptions) -> Result<Self> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)
.map_err(Error::Io)?;
Self::apply_write_lifetime_hint(&file, options.write_lifetime_hint);
let len = file.seek(std::io::SeekFrom::End(0)).map_err(Error::Io)?;
Ok(Self {
file,
next_lsn: CachePadded::new(AtomicU64::new(len)),
synced_lsn: CachePadded::new(AtomicU64::new(len)),
group_commit: GroupCommit::new(
options.group_commit_window,
options.group_commit_max_batch,
len,
),
path: path.to_path_buf(),
#[cfg(all(target_os = "linux", feature = "async"))]
native_ring: std::sync::OnceLock::new(),
direct: false,
log_buffer: None,
observer: None,
sync_mode: options.sync_mode,
})
}
/// Forwards an optional write-lifetime hint to the platform layer.
///
/// Does nothing when `hint` is `None`. The platform call's result is
/// deliberately ignored, so a failure to set the hint never fails the open.
fn apply_write_lifetime_hint(file: &File, hint: Option<options::WriteLifetimeHint>) {
    let Some(requested) = hint else {
        return;
    };
    // Map each variant to the small ordinal the platform helper takes.
    let ordinal: u8 = match requested {
        options::WriteLifetimeHint::Short => 0,
        options::WriteLifetimeHint::Medium => 1,
        options::WriteLifetimeHint::Long => 2,
        options::WriteLifetimeHint::Extreme => 3,
    };
    let _ = crate::platform::set_write_lifetime_hint(file, ordinal);
}
/// Opens the journal requesting direct IO.
///
/// An existing file is scanned first to find the resume LSN. If the platform
/// open reports that direct IO is active, a [`LogBuffer`] is created and,
/// when resuming a non-empty journal, primed from the file's last sector.
/// Otherwise the handle carries no log buffer and `direct` is `false`.
fn open_direct(path: &Path, options: JournalOptions) -> Result<Self> {
    let mut resume_lsn = 0;
    if path.exists() {
        resume_lsn = scan_clean_end(path)?;
    }

    let sector_size = crate::platform::probe_sector_size(path);
    let (file, direct_active) = open_direct_journal(path, sector_size)?;
    Self::apply_write_lifetime_hint(&file, options.write_lifetime_hint);

    let mut log_buffer = None;
    if direct_active {
        let capacity_bytes = options.log_buffer_kib.saturating_mul(1024);
        let staging = LogBuffer::new(capacity_bytes, sector_size, 0)?;
        if resume_lsn > 0 {
            rehydrate_log_buffer(&staging, &file, sector_size, resume_lsn)?;
        }
        log_buffer = Some(staging);
    }

    let group_commit = GroupCommit::new(
        options.group_commit_window,
        options.group_commit_max_batch,
        resume_lsn,
    );

    Ok(Self {
        file,
        next_lsn: CachePadded::new(AtomicU64::new(resume_lsn)),
        synced_lsn: CachePadded::new(AtomicU64::new(resume_lsn)),
        group_commit,
        path: path.to_path_buf(),
        #[cfg(all(target_os = "linux", feature = "async"))]
        native_ring: std::sync::OnceLock::new(),
        direct: direct_active,
        log_buffer,
        observer: None,
        sync_mode: options.sync_mode,
    })
}
/// Installs `observer` as the handle's observer, or clears it with `None`.
/// Any previously installed observer is dropped.
pub(crate) fn set_observer(
    &mut self,
    observer: Option<std::sync::Arc<dyn crate::observer::FsysObserver>>,
) {
    let _previous = std::mem::replace(&mut self.observer, observer);
}
#[must_use]
#[inline]
pub fn is_direct_active(&self) -> bool {
self.direct
}
/// Appends one record and returns the LSN just past its frame.
///
/// When an observer is installed, the call is timed and reported through
/// `on_journal_append` whether or not the append succeeded.
pub fn append(&self, record: &[u8]) -> Result<Lsn> {
    #[cfg(feature = "tracing")]
    let _span = tracing::trace_span!(
        "fsys::journal::append",
        payload_bytes = record.len(),
        direct = self.direct,
    )
    .entered();

    // The clock is only read when somebody is listening.
    let started_at = self.observer.as_ref().map(|_| Instant::now());
    let outcome = self.append_inner(record);

    if let Some((observer, started)) = self.observer.as_ref().zip(started_at) {
        let payload_bytes = record.len() as u64;
        let overhead_bytes = format::FRAME_OVERHEAD as u64;
        observer.on_journal_append(crate::observer::JournalAppendEvent {
            bytes_written: payload_bytes + overhead_bytes,
            records: 1,
            duration: started.elapsed(),
            error: outcome.is_err(),
        });
    }
    outcome
}
/// Encodes `record` into a frame and writes it, returning the LSN just past
/// the frame.
///
/// * Direct mode (a log buffer is present): the frame goes through
///   `LogBuffer::append_frame`, and `next_lsn` is raised to the returned end
///   offset.
/// * Buffered mode: the frame is encoded, a byte range is reserved with an
///   atomic add on `next_lsn`, and the frame is written at the reserved
///   offset. Frames of at most `STACK_FRAME_THRESHOLD` bytes are encoded on
///   the stack, larger ones in an owned buffer.
///
/// # Errors
/// Returns `InvalidInput` if the payload exceeds `FRAME_MAX_PAYLOAD` or the
/// frame size overflows `usize`; otherwise propagates encode/write errors.
fn append_inner(&self, record: &[u8]) -> Result<Lsn> {
    if let Some(log_buffer) = &self.log_buffer {
        let (_start, end) = log_buffer.append_frame(&self.file, record)?;
        // Concurrent appenders can reach this point out of order; taking the
        // maximum keeps `next_lsn` from ever moving backwards.
        let _ = self.next_lsn.fetch_max(end, Ordering::AcqRel);
        #[cfg(feature = "tracing")]
        tracing::trace!(end_lsn = end, "direct append complete");
        return Ok(Lsn(end));
    }
    let payload_len = record.len();
    if (payload_len as u64) > (format::FRAME_MAX_PAYLOAD as u64) {
        return Err(Error::Io(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "journal record exceeds FRAME_MAX_PAYLOAD (256 MiB)",
        )));
    }
    let total = payload_len
        .checked_add(format::FRAME_OVERHEAD)
        .ok_or_else(|| {
            Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "journal frame size overflow",
            ))
        })?;
    let frame_len = total as u64;
    // Encode before reserving the LSN range so an encode failure cannot
    // leave a reserved-but-unwritten hole in the journal.
    // NOTE(review): a failed write after the reservation still leaves such a
    // hole; closing that gap needs a larger redesign.
    let end = if total <= STACK_FRAME_THRESHOLD {
        // Zero-initialised stack buffer: building a `&mut [u8]` over
        // uninitialised memory would break the slice constructor's contract.
        let mut stack = [0u8; STACK_FRAME_THRESHOLD];
        let frame = &mut stack[..total];
        let _ = format::encode_frame_into(record, frame)?;
        let start = self.next_lsn.fetch_add(frame_len, Ordering::Release);
        crate::platform::write_at(&self.file, start, frame)?;
        start + frame_len
    } else {
        let frame = format::encode_frame_owned(record)?;
        let start = self.next_lsn.fetch_add(frame_len, Ordering::Release);
        crate::platform::write_at(&self.file, start, &frame)?;
        start + frame_len
    };
    Ok(Lsn(end))
}
/// Appends all `records` as consecutive frames and returns the LSN just past
/// the last one.
///
/// When an observer is installed, the call is timed and reported through
/// `on_journal_append`; the record count saturates at `u32::MAX`.
pub fn append_batch(&self, records: &[&[u8]]) -> Result<Lsn> {
    #[cfg(feature = "tracing")]
    let _span = tracing::trace_span!(
        "fsys::journal::append_batch",
        record_count = records.len(),
        direct = self.direct,
    )
    .entered();

    // The clock is only read when somebody is listening.
    let started_at = self.observer.as_ref().map(|_| Instant::now());
    let outcome = self.append_batch_inner(records);

    if let Some((observer, started)) = self.observer.as_ref().zip(started_at) {
        let payload_bytes = records.iter().map(|r| r.len() as u64).sum::<u64>();
        let overhead_bytes = records.len() as u64 * format::FRAME_OVERHEAD as u64;
        let record_count = u32::try_from(records.len()).unwrap_or(u32::MAX);
        observer.on_journal_append(crate::observer::JournalAppendEvent {
            bytes_written: payload_bytes + overhead_bytes,
            records: record_count,
            duration: started.elapsed(),
            error: outcome.is_err(),
        });
    }
    outcome
}
/// Validates, encodes and writes a batch of records as consecutive frames,
/// returning the LSN just past the last frame.
///
/// An empty batch writes nothing and returns the current `next_lsn`.
///
/// * Direct mode: tries `LogBuffer::try_append_frames_batched`; if that
///   reports `None`, falls back to appending the records one at a time.
/// * Buffered mode: encodes every frame into one contiguous buffer, reserves
///   the whole byte range with a single atomic add, and issues one write.
///
/// # Errors
/// Returns `InvalidInput` if any payload exceeds `FRAME_MAX_PAYLOAD` or a
/// size computation overflows `usize`; otherwise propagates encode/write
/// errors.
fn append_batch_inner(&self, records: &[&[u8]]) -> Result<Lsn> {
    if records.is_empty() {
        return Ok(Lsn(self.next_lsn.load(Ordering::Acquire)));
    }
    // Validate every record and compute the total encoded size up front so
    // nothing is written for a batch that contains a bad record.
    let mut total: usize = 0;
    for record in records {
        if (record.len() as u64) > (format::FRAME_MAX_PAYLOAD as u64) {
            return Err(Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "journal record exceeds FRAME_MAX_PAYLOAD (256 MiB)",
            )));
        }
        let frame_size = record
            .len()
            .checked_add(format::FRAME_OVERHEAD)
            .ok_or_else(|| {
                Error::Io(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "journal frame size overflow",
                ))
            })?;
        total = total.checked_add(frame_size).ok_or_else(|| {
            Error::Io(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "journal append_batch total size overflow",
            ))
        })?;
    }
    if let Some(log_buffer) = &self.log_buffer {
        let last_end = match log_buffer.try_append_frames_batched(records, total)? {
            Some((_start, end)) => end,
            None => {
                let mut last: u64 = self.next_lsn.load(Ordering::Acquire);
                for record in records {
                    let (_start, end) = log_buffer.append_frame(&self.file, record)?;
                    last = end;
                }
                last
            }
        };
        // Concurrent appenders can reach this point out of order; taking the
        // maximum keeps `next_lsn` from ever moving backwards.
        let _ = self.next_lsn.fetch_max(last_end, Ordering::AcqRel);
        #[cfg(feature = "tracing")]
        tracing::trace!(end_lsn = last_end, "direct append_batch complete");
        return Ok(Lsn::new(last_end));
    }
    // Zero-filled buffer: `Vec::set_len` over uninitialised capacity would
    // violate its documented safety contract.
    let mut buf: Vec<u8> = vec![0u8; total];
    let mut cursor = 0usize;
    for record in records {
        let written = format::encode_frame_into(record, &mut buf[cursor..])?;
        cursor += written;
    }
    debug_assert_eq!(cursor, total);
    // Reserve the LSN range only after encoding succeeded, so an encode
    // failure cannot leave a reserved-but-unwritten hole.
    let frame_total = total as u64;
    let start = self.next_lsn.fetch_add(frame_total, Ordering::Release);
    let end = start + frame_total;
    crate::platform::write_at(&self.file, start, &buf)?;
    #[cfg(feature = "tracing")]
    tracing::trace!(end_lsn = end, "buffered append_batch complete");
    Ok(Lsn(end))
}
/// Makes every byte up to `lsn` durable, sharing the sync with concurrent
/// callers (group commit).
///
/// Protocol, as implemented below:
/// 1. Fast path: return if `synced_lsn` already covers `lsn`.
/// 2. Under the group-commit lock, either return because `committed_lsn`
///    covers `lsn`, become the leader (no sync in flight), or wait as a
///    follower until the in-flight leader finishes and then re-check.
/// 3. The leader optionally waits up to the configured window for followers
///    to gather (stopping early once `max_batch` of them are pending).
/// 4. The leader flushes the log buffer (direct mode), issues the sync, and
///    on success publishes the new frontier.
/// 5. Leadership is always released and followers are always woken, whether
///    or not the flush/sync succeeded.
///
/// # Errors
/// Returns the flush or sync error seen by the leader. Followers of a failed
/// leader are woken and retry, one of them becoming the next leader.
pub fn sync_through(&self, lsn: Lsn) -> Result<()> {
    #[cfg(feature = "tracing")]
    let _span = tracing::trace_span!(
        "fsys::journal::sync_through",
        target_lsn = lsn.0,
        direct = self.direct,
    )
    .entered();
    if self.synced_lsn.load(Ordering::Acquire) >= lsn.0 {
        #[cfg(feature = "tracing")]
        tracing::trace!(
            synced_lsn = self.synced_lsn.load(Ordering::Acquire),
            "fast-path sync skipped"
        );
        return Ok(());
    }
    let leader_start: Instant;
    {
        let mut state = self.group_commit.state.lock();
        loop {
            if state.committed_lsn >= lsn.0 {
                #[cfg(feature = "tracing")]
                tracing::trace!(
                    committed_lsn = state.committed_lsn,
                    "group-commit follower covered without leadership"
                );
                return Ok(());
            }
            if !state.in_flight {
                state.in_flight = true;
                leader_start = Instant::now();
                break;
            }
            // Follower: announce ourselves to a leader that may be waiting
            // out its batching window, then sleep until it finishes.
            let _ = self
                .group_commit
                .pending_followers
                .fetch_add(1, Ordering::Release);
            let _ = self.group_commit.cv_leader.notify_one();
            self.group_commit.cv_followers.wait(&mut state);
            drop(state);
            let _ = self
                .group_commit
                .pending_followers
                .fetch_sub(1, Ordering::AcqRel);
            if self.synced_lsn.load(Ordering::Acquire) >= lsn.0 {
                return Ok(());
            }
            state = self.group_commit.state.lock();
        }
        if let Some(window) = self.group_commit.window {
            let deadline = Instant::now() + window;
            while self.group_commit.pending_followers.load(Ordering::Acquire)
                < self.group_commit.max_batch
            {
                let now = Instant::now();
                if now >= deadline {
                    break;
                }
                let timeout = deadline - now;
                let result = self.group_commit.cv_leader.wait_for(&mut state, timeout);
                if result.timed_out() {
                    break;
                }
            }
        }
        drop(state);
    }
    // Capture the frontier BEFORE flushing: records appended after the flush
    // may still sit only in the log buffer, so they must not be counted as
    // covered by this sync.
    let frontier = self.next_lsn.load(Ordering::Acquire);
    // Run flush + sync without `?` so a failure still reaches the epilogue
    // below. Returning early here would leave `in_flight` set forever and
    // block every later caller.
    let sync_result = self.leader_flush_and_sync();
    let followers_at_commit;
    {
        let mut state = self.group_commit.state.lock();
        if sync_result.is_ok() && frontier > state.committed_lsn {
            state.committed_lsn = frontier;
            self.synced_lsn.store(frontier, Ordering::Release);
        }
        followers_at_commit = self.group_commit.pending_followers.load(Ordering::Acquire);
        state.in_flight = false;
        let _ = self.group_commit.cv_followers.notify_all();
    }
    #[cfg(feature = "tracing")]
    if sync_result.is_ok() {
        tracing::debug!(new_synced_lsn = frontier, "group-commit fsync completed");
    }
    if let Some(obs) = self.observer.as_ref() {
        obs.on_journal_sync(crate::observer::JournalSyncEvent {
            durable_lsn: frontier,
            duration: leader_start.elapsed(),
            followers_at_commit,
            error: sync_result.is_err(),
        });
    }
    sync_result
}

/// Leader-only step of `sync_through`: flushes the log buffer's partial
/// tail (direct mode) and then issues the sync selected by `sync_mode`.
fn leader_flush_and_sync(&self) -> Result<()> {
    if let Some(log_buffer) = &self.log_buffer {
        log_buffer.flush_partial(&self.file)?;
    }
    match self.sync_mode {
        options::SyncMode::Full => self.file.sync_data().map_err(Error::Io),
        options::SyncMode::Barrier => crate::platform::sync_barrier(&self.file),
    }
}
#[must_use]
#[inline]
pub fn synced_lsn(&self) -> Lsn {
Lsn(self.synced_lsn.load(Ordering::Acquire))
}
#[must_use]
#[inline]
pub fn next_lsn(&self) -> Lsn {
Lsn(self.next_lsn.load(Ordering::Acquire))
}
/// Reports which backend currently serves this journal.
///
/// On Linux with the `async` feature, an initialised io_uring slot wins;
/// otherwise the answer follows the `direct` flag.
#[must_use]
#[inline]
pub fn backend_kind(&self) -> JournalBackendKind {
    #[cfg(all(target_os = "linux", feature = "async"))]
    {
        if matches!(self.native_ring.get(), Some(Some(_))) {
            return JournalBackendKind::KernelIoUring;
        }
    }
    match self.direct {
        true => JournalBackendKind::KernelDirect,
        false => JournalBackendKind::KernelBuffered,
    }
}
/// Returns an empty health report tagged with the current backend kind.
#[must_use]
#[inline]
pub fn backend_health(&self) -> JournalBackendHealth {
    let kind = self.backend_kind();
    JournalBackendHealth::empty(kind)
}
#[must_use]
pub fn backend_info(&self) -> JournalBackendInfo {
let kind = self.backend_kind();
let reason = match kind {
JournalBackendKind::KernelIoUring => {
"Linux io_uring native substrate active for this journal"
}
JournalBackendKind::KernelDirect => {
"Direct-IO mode (JournalOptions::direct(true)) active"
}
JournalBackendKind::KernelBuffered => {
"default buffered-IO mode (lock-free pwrite + group-commit fdatasync)"
}
JournalBackendKind::Spdk => {
"SPDK backend in use (opened via fsys-spdk companion crate)"
}
};
JournalBackendInfo::single(kind, reason)
}
pub fn preallocate(&self, offset: u64, len: u64) -> Result<()> {
crate::platform::preallocate(&self.file, offset, len)
}
pub fn advise(&self, offset: u64, len: u64, advice: crate::Advice) -> Result<()> {
crate::platform::advise(&self.file, offset, len, advice)
}
/// Syncs everything appended so far and then drops the handle.
///
/// # Errors
/// Returns the sync error, if any; the handle is dropped either way.
pub fn close(self) -> Result<()> {
    let tail = Lsn(self.next_lsn.load(Ordering::Acquire));
    self.sync_through(tail)?;
    drop(self);
    Ok(())
}
}
/// Compile-time check that `JournalHandle` is both `Send` and `Sync`.
/// Never called; it only has to type-check.
#[allow(dead_code)]
fn _assert_journal_handle_is_send() {
    fn require_send_and_sync<T: Send + Sync>(_: &T) {}
    let _ = |handle: &JournalHandle| require_send_and_sync(handle);
}
impl Drop for JournalHandle {
    /// Best-effort flush on drop: when a log buffer exists, its partial tail
    /// is flushed and the file is synced. Errors are ignored because `drop`
    /// cannot report them. Handles without a log buffer do nothing here.
    fn drop(&mut self) {
        let Some(log_buffer) = self.log_buffer.as_ref() else {
            return;
        };
        let _ = log_buffer.flush_partial(&self.file);
        let _ = self.file.sync_data();
    }
}
/// Mutex-protected part of the group-commit state.
pub(crate) struct GroupCommitState {
/// `true` while a leader is running a flush + sync.
pub(crate) in_flight: bool,
/// Highest frontier published by a successful leader sync.
pub(crate) committed_lsn: u64,
}
/// Leader/follower coordination used by `JournalHandle::sync_through`.
pub(crate) struct GroupCommit {
/// Leadership flag and committed frontier, guarded by a mutex.
pub(crate) state: PlMutex<GroupCommitState>,
/// Followers wait here; the leader notifies all of them when it finishes.
pub(crate) cv_followers: Condvar,
/// The leader waits here during its batching window; each arriving
/// follower notifies it.
cv_leader: Condvar,
/// How long a leader may wait for followers to gather; `None` skips the wait.
window: Option<Duration>,
/// The leader stops waiting once this many followers are pending.
max_batch: u32,
/// Number of followers currently waiting on `cv_followers`.
pub(crate) pending_followers: std::sync::atomic::AtomicU32,
}
impl GroupCommit {
    /// Builds an idle group-commit coordinator.
    ///
    /// No sync is in flight, no followers are pending, and the committed
    /// frontier starts at `initial_committed_lsn`.
    pub(crate) fn new(
        window: Option<Duration>,
        max_batch: u32,
        initial_committed_lsn: u64,
    ) -> Self {
        let initial_state = GroupCommitState {
            in_flight: false,
            committed_lsn: initial_committed_lsn,
        };
        Self {
            state: PlMutex::new(initial_state),
            cv_followers: Condvar::new(),
            cv_leader: Condvar::new(),
            window,
            max_batch,
            pending_followers: std::sync::atomic::AtomicU32::new(0),
        }
    }
}
/// Scans an existing journal and returns the offset at which appends should
/// resume.
///
/// Every record is read in order. A clean end, a truncated header or
/// payload, or a checksum mismatch all resolve to the reader's final
/// position. Bad magic or a frame-length overflow is treated as corruption
/// and rejected with `InvalidData`.
fn scan_clean_end(path: &Path) -> Result<u64> {
    let mut reader = JournalReader::open(path)?;
    if reader.file_size() == 0 {
        return Ok(0);
    }

    // Drain the iterator; the first decode error aborts the scan.
    for entry in reader.iter() {
        let _ = entry?;
    }

    match reader.tail_state() {
        JournalTailState::BadMagic => Err(Error::Io(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("journal at {:?} has bad magic at offset {} — refusing to open in direct mode", path, reader.position().0),
        ))),
        JournalTailState::LengthOverflow => Err(Error::Io(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("journal at {:?} has frame length overflow at offset {} — refusing to open in direct mode", path, reader.position().0),
        ))),
        JournalTailState::CleanEnd
        | JournalTailState::TruncatedHeader
        | JournalTailState::TruncatedPayload
        | JournalTailState::ChecksumMismatch => Ok(reader.position().0),
    }
}
/// Linux: opens (creating if needed) the journal with `O_DIRECT`.
///
/// Returns the file and `true` when `O_DIRECT` took effect. If the first
/// open fails with `EINVAL`, the open is retried without `O_DIRECT` and the
/// flag is `false`. Any other failure is returned as an I/O error.
///
/// The path is handed to `open(2)` as its raw OS bytes, so paths that are
/// not valid UTF-8 are opened as-is instead of being lossily re-encoded.
#[cfg(target_os = "linux")]
fn open_direct_journal(path: &Path, _sector_size: u32) -> Result<(File, bool)> {
    use std::os::fd::FromRawFd;
    use std::os::unix::ffi::OsStrExt;
    let path_cstr = std::ffi::CString::new(path.as_os_str().as_bytes()).map_err(|_| {
        Error::Io(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "journal path contains a NUL byte",
        ))
    })?;
    let mut flags = libc::O_RDWR | libc::O_CREAT | libc::O_CLOEXEC | libc::O_DIRECT;
    // SAFETY: `path_cstr` is a valid NUL-terminated string that outlives the call.
    let fd = unsafe { libc::open(path_cstr.as_ptr(), flags, 0o600_i32) };
    if fd >= 0 {
        // SAFETY: `fd` was just returned by a successful `open` and is owned
        // by nothing else.
        return Ok((unsafe { File::from_raw_fd(fd) }, true));
    }
    let err = std::io::Error::last_os_error();
    if err.raw_os_error() == Some(libc::EINVAL) {
        // EINVAL on open: retry the same open without O_DIRECT.
        flags &= !libc::O_DIRECT;
        // SAFETY: same argument validity as the first call.
        let fd2 = unsafe { libc::open(path_cstr.as_ptr(), flags, 0o600_i32) };
        if fd2 >= 0 {
            // SAFETY: `fd2` is a fresh descriptor owned by nothing else.
            return Ok((unsafe { File::from_raw_fd(fd2) }, false));
        }
        return Err(Error::Io(std::io::Error::last_os_error()));
    }
    Err(Error::Io(err))
}
/// macOS: opens (creating if needed, never truncating) the journal and then
/// issues `fcntl(F_NOCACHE, 1)` on the descriptor.
///
/// The returned flag is `true` only when the `fcntl` call returned 0; a
/// failed `fcntl` is not an error, the file is simply reported as not direct.
#[cfg(target_os = "macos")]
fn open_direct_journal(path: &Path, _sector_size: u32) -> Result<(File, bool)> {
use std::os::unix::io::AsRawFd;
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)
.map_err(Error::Io)?;
// SAFETY: the descriptor comes from `file`, which is open for the whole call.
let ret = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_NOCACHE, 1) };
Ok((file, ret == 0))
}
/// Windows: opens (creating if needed) the journal via `CreateFileW` with
/// `FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH`.
///
/// Returns the file and `true` when that open succeeds. If it fails with
/// error code 87, the file is reopened through `OpenOptions` without those
/// flags and the flag is `false`. Any other failure is returned as an I/O
/// error built from the raw OS code.
#[cfg(target_os = "windows")]
fn open_direct_journal(path: &Path, _sector_size: u32) -> Result<(File, bool)> {
use std::os::windows::ffi::OsStrExt;
use std::os::windows::io::FromRawHandle;
use windows_sys::Win32::Foundation::{GetLastError, INVALID_HANDLE_VALUE};
use windows_sys::Win32::Storage::FileSystem::{
CreateFileW, FILE_FLAG_NO_BUFFERING, FILE_FLAG_WRITE_THROUGH, FILE_GENERIC_READ,
FILE_GENERIC_WRITE, FILE_SHARE_READ, OPEN_ALWAYS,
};
// Wide, NUL-terminated copy of the path for the W-suffixed API.
let mut wide: Vec<u16> = path.as_os_str().encode_wide().collect();
wide.push(0);
// SAFETY: `wide` is NUL-terminated and outlives the call; the remaining
// arguments are plain flags or null pointers.
let handle = unsafe {
CreateFileW(
wide.as_ptr(),
FILE_GENERIC_READ | FILE_GENERIC_WRITE,
FILE_SHARE_READ,
std::ptr::null(),
OPEN_ALWAYS,
FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH,
std::ptr::null_mut(),
)
};
if handle != INVALID_HANDLE_VALUE && !handle.is_null() {
// SAFETY: `handle` was just returned by a successful `CreateFileW` and
// is owned by nothing else.
return Ok((unsafe { File::from_raw_handle(handle as _) }, true));
}
// SAFETY: `GetLastError` has no preconditions.
let err_code = unsafe { GetLastError() };
// NOTE(review): 87 appears to be Win32 ERROR_INVALID_PARAMETER; a named
// constant would make this clearer — confirm.
if err_code == 87 {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)
.map_err(Error::Io)?;
return Ok((file, false));
}
Err(Error::Io(std::io::Error::from_raw_os_error(
err_code as i32,
)))
}
/// Other platforms: no direct-IO support. Opens (creating if needed, never
/// truncating) the journal normally and always reports direct IO as inactive.
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
fn open_direct_journal(path: &Path, _sector_size: u32) -> Result<(File, bool)> {
    OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(path)
        .map(|file| (file, false))
        .map_err(Error::Io)
}
/// Primes a fresh log buffer so appends resume at `resume_lsn`.
///
/// If `resume_lsn` sits exactly on a sector boundary the buffer is positioned
/// there with no carried-over bytes. Otherwise the sector containing
/// `resume_lsn` is read back from `file` and handed to the buffer together
/// with the offset of `resume_lsn` inside that sector.
fn rehydrate_log_buffer(
    buf: &LogBuffer,
    file: &File,
    sector_size: u32,
    resume_lsn: u64,
) -> Result<()> {
    let sector_bytes = sector_size as u64;
    let sector_index = resume_lsn / sector_bytes;
    let sector_start = sector_index * sector_bytes;
    let offset_in_sector = (resume_lsn - sector_start) as usize;

    if offset_in_sector != 0 {
        // Mid-sector resume: reload the partially filled sector from disk.
        let sector = crate::platform::read_range(file, sector_start, sector_size as usize)?;
        buf.set_flush_pos_for_resume(sector_start, offset_in_sector, &sector);
    } else {
        buf.set_flush_pos_for_resume(resume_lsn, 0, &[]);
    }
    Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn lsn_default_is_zero() {
    // `Default` and the `ZERO` constant must agree, and both carry raw 0.
    let default_lsn = Lsn::default();
    assert_eq!(default_lsn, Lsn::ZERO);
    assert_eq!(Lsn::ZERO.as_u64(), 0);
}
#[test]
fn lsn_display_format() {
    // `Display` renders as `Lsn(<raw>)`.
    let rendered = format!("{}", Lsn(42));
    assert_eq!(rendered, "Lsn(42)");
}
#[test]
fn lsn_ordering_matches_u64() {
    // Derived ordering and equality follow the wrapped integer.
    let (low, high) = (Lsn(100), Lsn(200));
    assert!(low < high);
    assert!(Lsn(0) < Lsn(1));
    assert_eq!(Lsn(42), Lsn(42));
}
/// Builds a per-test path in the system temp directory. The file name embeds
/// the process id, the current time in nanoseconds since the Unix epoch, and
/// `tag`.
fn tmp_path(tag: &str) -> std::path::PathBuf {
    let pid = std::process::id();
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    let file_name = format!("fsys_journal_test_{}_{}_{}", pid, nanos, tag);
    std::env::temp_dir().join(file_name)
}
/// Guard that removes the wrapped file when dropped; removal errors (such as
/// the file not existing) are ignored.
struct Cleanup(std::path::PathBuf);

impl Drop for Cleanup {
    fn drop(&mut self) {
        let Cleanup(target) = self;
        let _ = std::fs::remove_file(&*target);
    }
}
#[test]
fn open_creates_new_file_with_zero_lsn() {
    // A journal opened at a fresh path starts with both counters at zero.
    let journal_path = tmp_path("new");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");
    assert_eq!(journal.next_lsn(), Lsn::ZERO);
    assert_eq!(journal.synced_lsn(), Lsn::ZERO);
}
#[test]
fn append_advances_lsn_by_framed_record_length() {
    // Each append moves the frontier by payload length + 12 bytes of framing.
    let journal_path = tmp_path("append");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");

    let first_end = journal.append(b"hello").expect("append1");
    assert_eq!(first_end, Lsn(5 + 12));
    assert_eq!(journal.next_lsn(), Lsn(17));

    let second_end = journal.append(b" world").expect("append2");
    assert_eq!(second_end, Lsn(17 + 6 + 12));
    assert_eq!(journal.next_lsn(), Lsn(35));
}
#[test]
fn append_empty_record_writes_framed_marker() {
    // An empty payload still occupies a full 12-byte frame.
    let journal_path = tmp_path("empty_record");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");

    let _ = journal.append(b"first").expect("first");
    assert_eq!(journal.next_lsn(), Lsn(5 + 12));

    let empty_end = journal.append(b"").expect("empty");
    assert_eq!(empty_end, Lsn(17 + 12));
    assert_eq!(journal.next_lsn(), Lsn(29));
}
#[test]
fn append_one_byte_record_produces_13_byte_frame() {
    // Two one-byte payloads occupy 13 bytes each and read back in order.
    let journal_path = tmp_path("one_byte");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");

    let first_end = journal.append(b"x").expect("append");
    assert_eq!(first_end.as_u64(), 13, "1-byte payload + 12-byte overhead = 13");
    assert_eq!(journal.next_lsn().as_u64(), 13);

    let second_end = journal.append(b"y").expect("append 2");
    assert_eq!(second_end.as_u64(), 26, "two 1-byte frames = 26");
    journal.close().expect("close");

    let mut reader = JournalReader::open(&journal_path).expect("reader");
    let mut payloads: Vec<Vec<u8>> = Vec::new();
    for item in reader.iter() {
        payloads.push(item.expect("record").payload);
    }
    assert_eq!(payloads, vec![b"x".to_vec(), b"y".to_vec()]);
}
#[test]
fn append_frame_at_exact_4kb_boundary_round_trips() {
    // Payload sized so that the whole frame is exactly 4 KiB.
    let body_len = 4096 - format::FRAME_OVERHEAD;
    let body = vec![0xC3u8; body_len];
    let journal_path = tmp_path("exact_4kb");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");

    let end = journal.append(&body).expect("append");
    assert_eq!(end.as_u64(), 4096, "frame total at exact 4 KiB boundary");
    journal.close().expect("close");

    let mut reader = JournalReader::open(&journal_path).expect("reader");
    let first = reader
        .iter()
        .next()
        .expect("record present")
        .expect("decode");
    assert_eq!(first.payload.len(), body_len);
    let intact = first.payload.iter().all(|&byte| byte == 0xC3);
    assert!(intact, "payload content drift at 4 KiB boundary");
}
#[test]
fn append_frame_at_exact_16kb_boundary_round_trips() {
    // Payload sized so that the whole frame is exactly 16 KiB.
    let body_len = 16384 - format::FRAME_OVERHEAD;
    let body = vec![0x5Au8; body_len];
    let journal_path = tmp_path("exact_16kb");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");

    let end = journal.append(&body).expect("append");
    assert_eq!(end.as_u64(), 16384, "frame total at exact 16 KiB boundary");
    journal.close().expect("close");

    let mut reader = JournalReader::open(&journal_path).expect("reader");
    let first = reader
        .iter()
        .next()
        .expect("record present")
        .expect("decode");
    assert_eq!(first.payload.len(), body_len);
    let intact = first.payload.iter().all(|&byte| byte == 0x5A);
    assert!(intact);
}
#[test]
fn append_frame_at_exact_64kb_boundary_round_trips() {
    // Payload sized so that the whole frame is exactly 64 KiB.
    let body_len = 65536 - format::FRAME_OVERHEAD;
    let body = vec![0xA5u8; body_len];
    let journal_path = tmp_path("exact_64kb");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");

    let end = journal.append(&body).expect("append");
    assert_eq!(end.as_u64(), 65536, "frame total at exact 64 KiB boundary");
    journal.close().expect("close");

    let mut reader = JournalReader::open(&journal_path).expect("reader");
    let first = reader
        .iter()
        .next()
        .expect("record present")
        .expect("decode");
    assert_eq!(first.payload.len(), body_len);
    let intact = first.payload.iter().all(|&byte| byte == 0xA5);
    assert!(intact);
}
#[test]
fn append_batch_single_one_byte_record_round_trips() {
    // A batch of one 1-byte record yields exactly one 13-byte frame.
    let journal_path = tmp_path("batch_one_byte");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");

    let single: &[u8] = b"\x42";
    let end = journal.append_batch(&[single]).expect("append_batch");
    assert_eq!(end.as_u64(), 13);
    journal.close().expect("close");

    let mut reader = JournalReader::open(&journal_path).expect("reader");
    let first = reader.iter().next().expect("present").expect("decode");
    assert_eq!(first.payload, vec![0x42]);
    let leftover = reader.iter().next();
    assert!(leftover.is_none(), "exactly one record");
}
#[test]
fn append_batch_only_empty_records_round_trips() {
    // Four empty payloads still produce four frames of pure overhead.
    let journal_path = tmp_path("batch_empty_records");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");

    let blanks: [&[u8]; 4] = [b"", b"", b"", b""];
    let end = journal.append_batch(&blanks).expect("append_batch");
    assert_eq!(end.as_u64(), 4 * format::FRAME_OVERHEAD as u64);
    journal.close().expect("close");

    let mut reader = JournalReader::open(&journal_path).expect("reader");
    let mut payloads: Vec<Vec<u8>> = Vec::new();
    for item in reader.iter() {
        payloads.push(item.expect("decode").payload);
    }
    assert_eq!(payloads.len(), 4, "all four empty records present");
    for (i, payload) in payloads.iter().enumerate() {
        assert!(payload.is_empty(), "record {i} should be empty");
    }
}
#[test]
fn append_batch_mixed_empty_and_small_records_round_trips() {
    // Empty and one-byte payloads interleaved in a single batch.
    let journal_path = tmp_path("batch_mixed_empty");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");

    let inputs: [&[u8]; 4] = [b"", b"a", b"", b"b"];
    let end = journal.append_batch(&inputs).expect("append_batch");
    let overhead = format::FRAME_OVERHEAD as u64;
    let expected_total = (2 * overhead) + (2 * (1 + overhead));
    assert_eq!(end.as_u64(), expected_total);
    journal.close().expect("close");

    let mut reader = JournalReader::open(&journal_path).expect("reader");
    let mut payloads: Vec<Vec<u8>> = Vec::new();
    for item in reader.iter() {
        payloads.push(item.expect("decode").payload);
    }
    assert_eq!(
        payloads,
        vec![Vec::new(), b"a".to_vec(), Vec::new(), b"b".to_vec()]
    );
}
#[test]
fn sync_through_zero_is_noop() {
    // Syncing to LSN zero on an empty journal succeeds and changes nothing.
    let journal_path = tmp_path("sync_zero");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");
    journal.sync_through(Lsn::ZERO).expect("sync_through(0)");
    assert_eq!(journal.synced_lsn(), Lsn::ZERO);
}
#[test]
fn sync_through_advances_synced_lsn() {
    // After syncing through an appended record, the durable LSN covers it.
    let journal_path = tmp_path("sync_advance");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");
    let target = journal.append(b"durable").expect("append");
    journal.sync_through(target).expect("sync");
    assert!(journal.synced_lsn() >= target);
}
#[test]
fn append_then_reopen_resumes_at_existing_size() {
    // Reopening a closed journal resumes both counters at the file length.
    let journal_path = tmp_path("resume");
    let _cleanup = Cleanup(journal_path.clone());
    {
        let first = JournalHandle::open(&journal_path).expect("open1");
        let _ = first.append(b"persist this").expect("append");
        first.close().expect("close");
    }
    let reopened = JournalHandle::open(&journal_path).expect("reopen");
    assert_eq!(reopened.next_lsn(), Lsn(24));
    assert_eq!(reopened.synced_lsn(), Lsn(24));
}
#[test]
fn append_writes_framed_records_to_file() {
    // Raw file bytes must decode as two back-to-back frames.
    let journal_path = tmp_path("readback");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");
    let _ = journal.append(b"alpha").expect("a1");
    let _ = journal.append(b"beta").expect("a2");
    journal.close().expect("close");

    let raw = std::fs::read(&journal_path).expect("read");
    assert_eq!(raw.len(), 33);

    // First frame: 5-byte payload + 12 bytes of framing.
    match format::decode_frame(&raw) {
        format::FrameDecode::Ok {
            consumed,
            payload_start,
            payload_end,
        } => {
            assert_eq!(consumed, 17);
            assert_eq!(&raw[payload_start..payload_end], b"alpha");
        }
        other => panic!("frame 1 decode failed: {other:?}"),
    }

    // Second frame starts right after the first.
    match format::decode_frame(&raw[17..]) {
        format::FrameDecode::Ok {
            consumed,
            payload_start,
            payload_end,
        } => {
            assert_eq!(consumed, 16);
            assert_eq!(&raw[17 + payload_start..17 + payload_end], b"beta");
        }
        other => panic!("frame 2 decode failed: {other:?}"),
    }
}
#[test]
fn append_batch_empty_is_noop() {
    // An empty batch returns the current frontier and writes nothing.
    let journal_path = tmp_path("batch_empty");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");

    let on_fresh = journal.append_batch(&[]).expect("append_batch empty");
    assert_eq!(on_fresh, Lsn::ZERO);
    assert_eq!(journal.next_lsn(), Lsn::ZERO);

    let _ = journal.append(b"first").expect("append");
    let after_append = journal.append_batch(&[]).expect("append_batch empty 2");
    assert_eq!(after_append, journal.next_lsn());
}
#[test]
fn append_batch_single_record_matches_append() {
    // A one-record batch must produce the same LSN and bytes as `append`.
    let single_path = tmp_path("batch_single_a");
    let _single_cleanup = Cleanup(single_path.clone());
    let single_journal = JournalHandle::open(&single_path).expect("open a");
    let single_end = single_journal.append(b"singleton").expect("append");
    single_journal.close().expect("close a");
    let single_bytes = std::fs::read(&single_path).expect("read a");

    let batch_path = tmp_path("batch_single_b");
    let _batch_cleanup = Cleanup(batch_path.clone());
    let batch_journal = JournalHandle::open(&batch_path).expect("open b");
    let record: &[u8] = b"singleton";
    let batch_end = batch_journal.append_batch(&[record]).expect("append_batch");
    batch_journal.close().expect("close b");
    let batch_bytes = std::fs::read(&batch_path).expect("read b");

    assert_eq!(single_end, batch_end);
    assert_eq!(single_bytes, batch_bytes);
}
#[test]
fn append_batch_multi_record_parity_with_append_loop() {
    // Batch append must be byte-identical to appending one record at a time.
    let inputs: Vec<&[u8]> = vec![b"alpha", b"beta", b"", b"gamma!", b"delta-payload-x"];

    let loop_path = tmp_path("batch_parity_loop");
    let _loop_cleanup = Cleanup(loop_path.clone());
    let loop_journal = JournalHandle::open(&loop_path).expect("open loop");
    let mut loop_end = Lsn::ZERO;
    for input in &inputs {
        loop_end = loop_journal.append(input).expect("append");
    }
    loop_journal.close().expect("close loop");
    let loop_bytes = std::fs::read(&loop_path).expect("read loop");

    let batch_path = tmp_path("batch_parity_batch");
    let _batch_cleanup = Cleanup(batch_path.clone());
    let batch_journal = JournalHandle::open(&batch_path).expect("open batch");
    let batch_end = batch_journal.append_batch(&inputs).expect("append_batch");
    batch_journal.close().expect("close batch");
    let batch_bytes = std::fs::read(&batch_path).expect("read batch");

    assert_eq!(loop_end, batch_end);
    assert_eq!(loop_bytes, batch_bytes);
}
#[test]
fn append_batch_returns_end_lsn_of_last_record() {
    // Payloads of 5 + 4 + 7 bytes plus 3 x 12 bytes of framing end at 52.
    let journal_path = tmp_path("batch_lsn");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");
    let first: &[u8] = b"hello";
    let second: &[u8] = b"abcd";
    let third: &[u8] = b"journal";
    let end = journal
        .append_batch(&[first, second, third])
        .expect("append_batch");
    assert_eq!(end, Lsn(52));
    assert_eq!(journal.next_lsn(), Lsn(52));
}
#[test]
fn append_batch_decodes_back_via_journal_reader() {
    // 32 records of varying length must read back unchanged and in order.
    let journal_path = tmp_path("batch_readback");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = JournalHandle::open(&journal_path).expect("open");

    let originals: Vec<Vec<u8>> = (0..32)
        .map(|i| {
            let mut record = vec![0u8; 64 + (i % 17)];
            for (k, byte) in record.iter_mut().enumerate() {
                *byte = ((i.wrapping_mul(31).wrapping_add(k)) & 0xFF) as u8;
            }
            record
        })
        .collect();
    let borrowed: Vec<&[u8]> = originals.iter().map(|v| v.as_slice()).collect();
    let _ = journal.append_batch(&borrowed).expect("append_batch");
    journal.close().expect("close");

    let mut reader = JournalReader::open(&journal_path).expect("reader open");
    let mut decoded: Vec<Vec<u8>> = Vec::new();
    for item in reader.iter() {
        let record = item.expect("decode");
        decoded.push(record.payload.to_vec());
    }
    assert_eq!(decoded, originals);
}
#[test]
fn append_batch_oversize_record_rejected() {
    // A payload one byte over FRAME_MAX_PAYLOAD must be rejected with
    // InvalidInput before anything is reserved or written; a normal record
    // afterwards still lands at offset zero.
    let path = tmp_path("batch_oversize_smoke");
    let _g = Cleanup(path.clone());
    let j = JournalHandle::open(&path).expect("open");

    // Zero-filled allocation; the size check runs before the payload is read.
    let oversize = vec![0u8; format::FRAME_MAX_PAYLOAD as usize + 1];
    match j.append_batch(&[oversize.as_slice()]) {
        Err(Error::Io(e)) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput),
        other => panic!("expected InvalidInput rejection, got {other:?}"),
    }
    assert_eq!(j.next_lsn(), Lsn::ZERO, "rejected batch must not reserve LSNs");
    drop(oversize);

    let p: &[u8] = b"normal";
    let lsn = j.append_batch(&[p]).expect("append_batch");
    assert_eq!(lsn, Lsn(6 + 12));
}
#[test]
fn append_batch_concurrent_appenders_serialise_via_atomic() {
    // Two threads each append one batch of eight records. All sixteen must
    // be readable, and each batch must appear as one contiguous run.
    use std::sync::Arc;
    let journal_path = tmp_path("batch_concurrent");
    let _cleanup = Cleanup(journal_path.clone());
    let journal = Arc::new(JournalHandle::open(&journal_path).expect("open"));

    let batch_a: Vec<Vec<u8>> = (0..8).map(|i| format!("a{i:02}").into_bytes()).collect();
    let batch_b: Vec<Vec<u8>> = (0..8).map(|i| format!("b{i:02}").into_bytes()).collect();

    let journal_a = journal.clone();
    let owned_a = batch_a.clone();
    let worker_a = std::thread::spawn(move || {
        let refs: Vec<&[u8]> = owned_a.iter().map(|v| v.as_slice()).collect();
        journal_a.append_batch(&refs).expect("batch a")
    });
    let journal_b = journal.clone();
    let owned_b = batch_b.clone();
    let worker_b = std::thread::spawn(move || {
        let refs: Vec<&[u8]> = owned_b.iter().map(|v| v.as_slice()).collect();
        journal_b.append_batch(&refs).expect("batch b")
    });
    let _ = worker_a.join().expect("join a");
    let _ = worker_b.join().expect("join b");

    journal.sync_through(journal.next_lsn()).expect("sync");
    drop(journal);

    let mut reader = JournalReader::open(&journal_path).expect("reader");
    let mut decoded: Vec<Vec<u8>> = Vec::new();
    for item in reader.iter() {
        decoded.push(item.expect("decode").payload.to_vec());
    }
    assert_eq!(decoded.len(), 16);

    let start_a = decoded
        .iter()
        .position(|r| r == &batch_a[0])
        .expect("first a in stream");
    for (k, original) in batch_a.iter().enumerate() {
        assert_eq!(&decoded[start_a + k], original);
    }
    let start_b = decoded
        .iter()
        .position(|r| r == &batch_b[0])
        .expect("first b in stream");
    for (k, original) in batch_b.iter().enumerate() {
        assert_eq!(&decoded[start_b + k], original);
    }
}
#[test]
fn append_batch_resume_after_close_reopen() {
    // Three 8-byte records (3 x 20 bytes) end at 60; a reopened journal
    // continues appending from there.
    let journal_path = tmp_path("batch_resume");
    let _cleanup = Cleanup(journal_path.clone());
    {
        let first = JournalHandle::open(&journal_path).expect("open1");
        let batch: &[&[u8]] = &[b"persist1", b"persist2", b"persist3"];
        let _ = first.append_batch(batch).expect("batch");
        first.close().expect("close");
    }
    let reopened = JournalHandle::open(&journal_path).expect("reopen");
    assert_eq!(reopened.next_lsn(), Lsn(60));
    let after = reopened.append(b"after").expect("append after batch");
    assert_eq!(after, Lsn(60 + 5 + 12));
}
/// Opens a journal with `group_commit_window(None)` and checks that a single
/// append followed by `sync_through` still advances the synced frontier to
/// at least the appended LSN.
#[test]
fn group_commit_window_none_disables_batching() {
    let path = tmp_path("gc_window_none");
    let _g = Cleanup(path.clone());
    let j = JournalHandle::open_with_options(
        &path,
        JournalOptions::new().group_commit_window(None),
    )
    .expect("open");
    let target = j.append(b"x").expect("append");
    j.sync_through(target).expect("sync");
    let durable = j.synced_lsn();
    assert!(durable >= target);
}
/// Opens a journal with a 100 µs group-commit window and a max batch of 4,
/// then checks that append + `sync_through` advances the synced frontier to
/// at least the appended LSN.
#[test]
fn group_commit_window_some_succeeds() {
    let path = tmp_path("gc_window_some");
    let _g = Cleanup(path.clone());
    let window = Some(Duration::from_micros(100));
    let opts = JournalOptions::new()
        .group_commit_window(window)
        .group_commit_max_batch(4);
    let j = JournalHandle::open_with_options(&path, opts).expect("open");
    let target = j.append(b"y").expect("append");
    j.sync_through(target).expect("sync");
    let durable = j.synced_lsn();
    assert!(durable >= target);
}
/// Sixteen threads each run eight append + `sync_through` rounds against a
/// journal opened with a max batch of 4. Every thread asserts, after each
/// sync, that the synced frontier has reached its own LSN.
#[test]
fn group_commit_follower_promoted_when_target_above_leader_frontier() {
    use std::sync::Arc;
    let path = tmp_path("gc_follower_promote");
    let _g = Cleanup(path.clone());
    let opts = JournalOptions::new().group_commit_max_batch(4);
    let j = Arc::new(JournalHandle::open_with_options(&path, opts).expect("open"));

    // Spawn every worker before joining any of them.
    let workers: Vec<_> = (0..16u32)
        .map(|thread_id| {
            let journal = j.clone();
            std::thread::spawn(move || {
                for round in 0..8u32 {
                    let payload = format!("t{thread_id:02}r{round:02}");
                    let lsn = journal.append(payload.as_bytes()).expect("append");
                    journal.sync_through(lsn).expect("sync");
                    assert!(journal.synced_lsn() >= lsn);
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("join");
    }
}
/// Calls `sync_through` twice with the same LSN and once with `Lsn::ZERO`;
/// every call must succeed and the synced frontier must still cover the
/// appended record afterwards.
#[test]
fn group_commit_idempotent_when_already_synced() {
    let path = tmp_path("gc_idempotent");
    let _g = Cleanup(path.clone());
    let j = JournalHandle::open(&path).expect("open");
    let lsn = j.append(b"sync me").expect("append");
    // Same sequence as three back-to-back calls, expressed as a table.
    let attempts = [
        (lsn, "first sync"),
        (lsn, "second sync"),
        (Lsn::ZERO, "zero sync"),
    ];
    for (target, label) in attempts {
        j.sync_through(target).expect(label);
    }
    assert!(j.synced_lsn() >= lsn);
}
/// Releases 64 threads through a barrier so they all call `sync_through` on
/// the same target LSN at once. Checks that every caller returns with the
/// frontier at or past the target, that the whole run fits the wall-clock
/// budget, and that `pending_followers` reads zero afterwards.
#[test]
fn group_commit_wake_stampede_64_followers() {
    use std::sync::{Arc, Barrier};
    const FOLLOWER_COUNT: usize = 64;
    const BUDGET: Duration = Duration::from_secs(60);
    let path = tmp_path("gc_wake_stampede_64");
    let _g = Cleanup(path.clone());
    let opts = JournalOptions::new()
        .group_commit_window(Some(Duration::from_micros(100)))
        .group_commit_max_batch(FOLLOWER_COUNT as u32);
    let j = Arc::new(JournalHandle::open_with_options(&path, opts).expect("open"));
    let target_lsn = j.append(b"stampede-target-record").expect("append");
    let gate = Arc::new(Barrier::new(FOLLOWER_COUNT));
    let start = Instant::now();

    // All followers are spawned first; the barrier holds them until the last
    // one arrives.
    let followers: Vec<_> = (0..FOLLOWER_COUNT)
        .map(|_| {
            let journal = j.clone();
            let barrier = gate.clone();
            std::thread::spawn(move || {
                let _ = barrier.wait();
                journal.sync_through(target_lsn).expect("follower sync");
                assert!(
                    journal.synced_lsn() >= target_lsn,
                    "follower returned but durable frontier still below target",
                );
            })
        })
        .collect();
    for follower in followers {
        follower.join().expect("follower thread join");
    }

    let elapsed = start.elapsed();
    assert!(
        elapsed < BUDGET,
        "wake-stampede 64-follower test exceeded {BUDGET:?} budget: {elapsed:?} — \
         possible missed-wakeup regression",
    );
    let still_pending = j.group_commit.pending_followers.load(Ordering::Acquire);
    assert_eq!(
        still_pending,
        0,
        "pending_followers leaked a count — increment/decrement asymmetry",
    );
    assert!(j.synced_lsn() >= target_lsn);
}
/// Eight threads each append and sync fifty records against a journal opened
/// with a 500 µs group-commit window and a max batch of 8. The run must finish
/// within 30 seconds and leave a non-zero synced frontier.
#[test]
fn group_commit_batching_with_8_threads() {
    use std::sync::Arc;
    let path = tmp_path("gc_eight_threads");
    let _g = Cleanup(path.clone());
    let opts = JournalOptions::new()
        .group_commit_window(Some(Duration::from_micros(500)))
        .group_commit_max_batch(8);
    let j = Arc::new(JournalHandle::open_with_options(&path, opts).expect("open"));
    let start = Instant::now();

    let workers: Vec<_> = (0..8u32)
        .map(|thread_id| {
            let journal = j.clone();
            std::thread::spawn(move || {
                for record in 0..50u32 {
                    let payload = format!("th{thread_id:02}rc{record:04}");
                    let lsn = journal.append(payload.as_bytes()).expect("append");
                    journal.sync_through(lsn).expect("sync");
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("join");
    }

    let elapsed = start.elapsed();
    assert!(
        elapsed < Duration::from_secs(30),
        "group-commit harness exceeded 30s wall budget: {elapsed:?}"
    );
    let durable = j.synced_lsn();
    assert!(durable.0 > 0);
}
/// Manual benchmark (ignored by default): over several rounds, times N
/// single `append` calls against one `append_batch` call with the same
/// payloads, each followed by a sync, keeps the best time of each, prints
/// both plus the ratio, and asserts the batch path is more than 1.2× faster.
#[test]
#[ignore = "manual sanity bench; run with --ignored --nocapture"]
#[allow(clippy::print_stdout)]
fn append_batch_sanity_bench() {
    const N: usize = 10_000;
    const PAYLOAD_LEN: usize = 150;
    const ROUNDS: usize = 5;
    let payload = vec![0xABu8; PAYLOAD_LEN];
    // Sentinels that any measured duration will undercut.
    let mut best_loop = std::time::Duration::from_secs(u64::MAX);
    let mut best_batch = std::time::Duration::from_secs(u64::MAX);
    for round in 0..ROUNDS {
        // Per-record path: N individual appends, then one sync.
        let path_loop = tmp_path(&format!("bench_loop_{round}"));
        let _g1 = Cleanup(path_loop.clone());
        let j_loop = JournalHandle::open(&path_loop).expect("open");
        let loop_timer = Instant::now();
        for _ in 0..N {
            let _ = j_loop.append(&payload).expect("append");
        }
        j_loop.sync_through(j_loop.next_lsn()).expect("sync");
        best_loop = best_loop.min(loop_timer.elapsed());
        drop(j_loop);

        // Batched path: the slice list is built outside the timed region.
        let path_batch = tmp_path(&format!("bench_batch_{round}"));
        let _g2 = Cleanup(path_batch.clone());
        let j_batch = JournalHandle::open(&path_batch).expect("open");
        let refs: Vec<&[u8]> = vec![payload.as_slice(); N];
        let batch_timer = Instant::now();
        let _ = j_batch.append_batch(&refs).expect("batch");
        j_batch.sync_through(j_batch.next_lsn()).expect("sync");
        best_batch = best_batch.min(batch_timer.elapsed());
        drop(j_batch);
    }
    let speedup = best_loop.as_secs_f64() / best_batch.as_secs_f64();
    println!("append loop ({N}× {PAYLOAD_LEN} B), best of {ROUNDS}: {best_loop:?}");
    println!("append_batch ({N}× {PAYLOAD_LEN} B), best of {ROUNDS}: {best_batch:?}");
    println!("speedup: {speedup:.2}×");
    assert!(
        speedup > 1.2,
        "append_batch must be at least 1.2× faster than append-loop; got {speedup:.2}×"
    );
}
/// Installs a counting `FsysObserver` through the crate builder, performs
/// three single appends, one three-record batch and one sync, and checks the
/// totals the observer recorded: append callbacks, records, bytes written,
/// sync callbacks, and the last reported durable LSN.
#[test]
fn observer_fires_on_append_and_sync() {
    use crate::observer::{FsysObserver, JournalAppendEvent, JournalSyncEvent};
    use std::sync::atomic::AtomicU64;
    use std::sync::Arc;

    /// Accumulates everything the observer callbacks report.
    #[derive(Debug, Default)]
    struct Tally {
        append_calls: AtomicU64,
        append_records: AtomicU64,
        append_bytes: AtomicU64,
        sync_calls: AtomicU64,
        last_durable_lsn: AtomicU64,
    }
    impl FsysObserver for Tally {
        fn on_journal_append(&self, event: JournalAppendEvent) {
            let _ = self.append_calls.fetch_add(1, Ordering::Relaxed);
            let records = u64::from(event.records);
            let _ = self.append_records.fetch_add(records, Ordering::Relaxed);
            let bytes = event.bytes_written;
            let _ = self.append_bytes.fetch_add(bytes, Ordering::Relaxed);
        }
        fn on_journal_sync(&self, event: JournalSyncEvent) {
            let _ = self.sync_calls.fetch_add(1, Ordering::Relaxed);
            let durable = event.durable_lsn;
            self.last_durable_lsn.store(durable, Ordering::Relaxed);
        }
    }

    let path = tmp_path("observer_e2e");
    let _g = Cleanup(path.clone());
    let counts = Arc::new(Tally::default());
    let observer: Arc<dyn FsysObserver> = counts.clone();
    let fs = crate::builder().observer(observer).build().expect("build");
    let log = fs.journal(&path).expect("journal");
    let _ = log.append(b"alpha").expect("a1");
    let _ = log.append(b"beta").expect("a2");
    let _ = log.append(b"gamma").expect("a3");
    let batch: [&[u8]; 3] = [b"delta", b"epsilon", b"zeta"];
    let last = log.append_batch(&batch).expect("batch");
    log.sync_through(last).expect("sync");

    let seen = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
    // Three single appends plus one batch call, six records in total.
    assert_eq!(seen(&counts.append_calls), 4);
    assert_eq!(seen(&counts.append_records), 6);
    // 30 payload bytes; the expected total implies 12 bytes of framing per record.
    assert_eq!(seen(&counts.append_bytes), 102);
    assert_eq!(seen(&counts.sync_calls), 1);
    assert_eq!(seen(&counts.last_durable_lsn), last.0);
}
/// A handle opened directly via `JournalHandle::open` carries no observer;
/// append, batch append and sync must all succeed regardless.
#[test]
fn observer_no_op_when_handle_built_without_observer() {
    let path = tmp_path("observer_absent");
    let _g = Cleanup(path.clone());
    let j = JournalHandle::open(&path).expect("open");
    assert!(j.observer.is_none());
    let _ = j.append(b"unobserved").expect("append");
    let batch: [&[u8]; 3] = [b"a", b"b", b"c"];
    let _ = j.append_batch(&batch).expect("batch");
    let tail = j.next_lsn();
    j.sync_through(tail).expect("sync");
}
/// With `SyncMode::Full`, an append followed by `sync_through` must leave the
/// synced frontier at or past the appended LSN.
#[test]
fn sync_mode_full_round_trips_through_journal() {
    let path = tmp_path("sync_mode_full");
    let _g = Cleanup(path.clone());
    let j = JournalHandle::open_with_options(
        &path,
        JournalOptions::new().sync_mode(options::SyncMode::Full),
    )
    .expect("open");
    let target = j.append(b"full-sync payload").expect("append");
    j.sync_through(target).expect("sync_through full");
    let durable = j.synced_lsn();
    assert!(durable >= target);
}
/// With `SyncMode::Barrier`, an append followed by `sync_through` must leave
/// the synced frontier at or past the appended LSN.
#[test]
fn sync_mode_barrier_round_trips_through_journal() {
    let path = tmp_path("sync_mode_barrier");
    let _g = Cleanup(path.clone());
    let j = JournalHandle::open_with_options(
        &path,
        JournalOptions::new().sync_mode(options::SyncMode::Barrier),
    )
    .expect("open");
    let target = j.append(b"barrier-sync payload").expect("append");
    j.sync_through(target).expect("sync_through barrier");
    let durable = j.synced_lsn();
    assert!(durable >= target);
}
/// For each of the four `WriteLifetimeHint` variants listed below, opening a
/// journal with that hint must succeed and a subsequent append + sync must
/// advance the synced frontier to at least the appended LSN.
#[test]
fn write_lifetime_hint_open_succeeds_for_every_variant() {
    let hints = [
        options::WriteLifetimeHint::Short,
        options::WriteLifetimeHint::Medium,
        options::WriteLifetimeHint::Long,
        options::WriteLifetimeHint::Extreme,
    ];
    for hint in hints {
        // Each variant gets its own file, named after the variant.
        let path = tmp_path(&format!("rw_hint_{hint:?}"));
        let _g = Cleanup(path.clone());
        let j = JournalHandle::open_with_options(
            &path,
            JournalOptions::new().write_lifetime_hint(Some(hint)),
        )
        .expect("open with write_lifetime_hint must succeed");
        let target = j.append(b"hint payload").expect("append");
        j.sync_through(target).expect("sync_through");
        assert!(j.synced_lsn() >= target);
    }
}
/// Setting both `SyncMode::Barrier` and a `Long` write-lifetime hint on the
/// same options must still yield a journal where append + sync succeeds.
#[test]
fn sync_mode_and_lifetime_hint_compose() {
    let path = tmp_path("sync_and_hint");
    let _g = Cleanup(path.clone());
    let hint = Some(options::WriteLifetimeHint::Long);
    let opts = JournalOptions::new()
        .sync_mode(options::SyncMode::Barrier)
        .write_lifetime_hint(hint);
    let j = JournalHandle::open_with_options(&path, opts).expect("open");
    let target = j.append(b"compose payload").expect("append");
    j.sync_through(target).expect("sync_through");
    let durable = j.synced_lsn();
    assert!(durable >= target);
}
/// Appends 32 records on one thread, then spawns one thread per record, each
/// calling `sync_through` on that record's LSN. Once all have joined, the
/// synced frontier must cover the last appended LSN.
#[test]
fn group_commit_concurrent_sync_through() {
    use std::sync::Arc;
    let path = tmp_path("group_commit");
    let _g = Cleanup(path.clone());
    let j = Arc::new(JournalHandle::open(&path).expect("open"));

    // All appends complete before any syncing thread starts.
    let lsns: Vec<_> = (0..32)
        .map(|i| j.append(format!("rec {i:04}").as_bytes()).expect("append"))
        .collect();

    let syncers: Vec<_> = lsns
        .iter()
        .copied()
        .map(|lsn| {
            let journal = j.clone();
            std::thread::spawn(move || journal.sync_through(lsn).expect("sync_through"))
        })
        .collect();
    for syncer in syncers {
        syncer.join().expect("join");
    }

    let highest = lsns.last().copied().unwrap();
    assert!(j.synced_lsn() >= highest);
}
}