#![allow(dead_code)]
use crate::journal::format;
use crate::platform::{round_up, AlignedBuf};
use crate::{Error, Result};
use parking_lot::{Condvar, Mutex};
use std::cell::UnsafeCell;
use std::fs::File;
/// Double-buffered, sector-aligned staging area for journal frames.
///
/// Appenders copy encoded frames into the *active* slot under `state`. When a frame
/// does not fit, the slots swap and the previously active slot is written to the file
/// in full (`capacity` bytes, zero padded) at its recorded position, while appenders
/// keep filling the other slot. Frames larger than a slot are written directly from a
/// scratch buffer.
pub(crate) struct LogBuffer {
/// Optional io_uring with both slots registered as fixed buffers (buffer index ==
/// slot index); `None` if setup failed, in which case plain direct writes are used.
/// Keep this field declared before `bufs`: struct fields are dropped in declaration
/// order, so the ring is torn down before the buffers registered with it are freed.
#[cfg(target_os = "linux")]
iouring: Option<IouringFlushState>,
/// The two slots. `UnsafeCell` because they are written through `&self`; access is
/// coordinated through `state` (active slot under the lock, flushing slot by the
/// thread that set `State::flushing`).
bufs: [UnsafeCell<AlignedBuf>; 2],
/// Size in bytes of each slot; a multiple of `sector_size`.
capacity: usize,
/// Alignment unit for buffer addresses, write lengths and write offsets.
sector_size: usize,
/// Bookkeeping for the active slot and any in-flight rotation flush.
state: Mutex<State>,
/// Notified (with `state` held) when a rotation flush finishes and clears `flushing`.
flush_done: Condvar,
}
/// io_uring state used to flush slots via fixed (pre-registered) buffers.
#[cfg(target_os = "linux")]
struct IouringFlushState {
/// Ring on which both `LogBuffer` slots were registered, in slot order.
ring: crate::platform::linux_iouring::IoUringRing,
}
// SAFETY: `LogBuffer` is not automatically `Sync` because of its `UnsafeCell` slots
// (and possibly because of raw pointers inside `AlignedBuf` / the io_uring handle,
// which are not visible here). Sharing it across threads relies on the access
// discipline followed by its methods: the active slot is only touched while holding
// `state`, and the slot named by `State::flushing` is read only by the thread that set
// that flag (no other thread writes to it until the flag is cleared under the lock).
// NOTE(review): the io_uring ring is driven from whichever thread happens to flush;
// at most one flush uses it at a time, but this still assumes `IoUringRing` may be
// used from different threads — confirm against its implementation.
unsafe impl Send for LogBuffer {}
unsafe impl Sync for LogBuffer {}
/// Mutable bookkeeping guarded by `LogBuffer::state`.
#[derive(Debug)]
struct State {
/// Index (0 or 1) of the slot currently receiving appends.
active_idx: u8,
/// Number of bytes of the active slot holding appended data.
active_len: usize,
/// File offset at which the active slot will be written; the next append lands at
/// `active_flush_pos + active_len`.
active_flush_pos: u64,
/// `Some(idx)` while a rotation is writing slot `idx` to disk with the lock
/// released; rotations and `flush_partial` wait on `flush_done` until it is `None`.
flushing: Option<u8>,
}
impl LogBuffer {
pub(crate) fn new(capacity_per_slot: u32, sector_size: u32, flush_pos: u64) -> Result<Self> {
let ss = sector_size as usize;
debug_assert!(ss.is_power_of_two(), "sector_size must be a power of two");
debug_assert!(
flush_pos % sector_size as u64 == 0,
"flush_pos must be sector-aligned"
);
let cap = round_up(capacity_per_slot as usize, ss).max(ss);
let buf0 = AlignedBuf::new(cap, ss)?;
let buf1 = AlignedBuf::new(cap, ss)?;
let bufs = [UnsafeCell::new(buf0), UnsafeCell::new(buf1)];
#[cfg(target_os = "linux")]
let iouring = Self::try_init_iouring(&bufs, cap);
Ok(Self {
#[cfg(target_os = "linux")]
iouring,
bufs,
capacity: cap,
sector_size: ss,
state: Mutex::new(State {
active_idx: 0,
active_len: 0,
active_flush_pos: flush_pos,
flushing: None,
}),
flush_done: Condvar::new(),
})
}
#[cfg(target_os = "linux")]
fn try_init_iouring(
bufs: &[UnsafeCell<AlignedBuf>; 2],
cap: usize,
) -> Option<IouringFlushState> {
let ring = crate::platform::linux_iouring::IoUringRing::new(8, None).ok()?;
let iovs: Vec<(usize, usize)> = bufs
.iter()
.map(|cell| {
let buf = unsafe { (*cell.get()).as_slice() };
debug_assert_eq!(buf.len(), cap);
(buf.as_ptr() as usize, buf.len())
})
.collect();
ring.register_buffers(&iovs).ok()?;
Some(IouringFlushState { ring })
}
#[inline]
pub(crate) fn next_lsn(&self) -> u64 {
let state = self.state.lock();
state.active_flush_pos + state.active_len as u64
}
#[inline]
pub(crate) fn capacity(&self) -> usize {
self.capacity
}
#[inline]
pub(crate) fn flushed_through(&self) -> u64 {
self.state.lock().active_flush_pos
}
#[inline]
pub(crate) fn buffered_len(&self) -> usize {
self.state.lock().active_len
}
pub(crate) fn try_append_frames_batched(
&self,
records: &[&[u8]],
total_encoded_size: usize,
) -> Result<Option<(u64, u64)>> {
if records.is_empty() {
return Ok(Some((0, 0)));
}
let mut state = self.state.lock();
let remaining = self.capacity.saturating_sub(state.active_len);
if total_encoded_size > remaining {
return Ok(None);
}
let active_idx = state.active_idx as usize;
let offset_start = state.active_len;
let start_lsn = state.active_flush_pos + offset_start as u64;
let mut cursor = offset_start;
unsafe {
let slice = (*self.bufs[active_idx].get()).as_mut_slice();
for record in records {
let frame_size = record
.len()
.checked_add(format::FRAME_OVERHEAD)
.ok_or_else(|| {
Error::Io(std::io::Error::other("batched frame size overflow"))
})?;
let _ = format::encode_frame_into(record, &mut slice[cursor..cursor + frame_size])?;
cursor += frame_size;
}
}
state.active_len = cursor;
let end_lsn = state.active_flush_pos + cursor as u64;
Ok(Some((start_lsn, end_lsn)))
}
pub(crate) fn append_frame(&self, file: &File, payload: &[u8]) -> Result<(u64, u64)> {
let frame = format::encode_frame_owned(payload)?;
let frame_size = frame.len();
loop {
let mut state = self.state.lock();
if state.active_len + frame_size <= self.capacity {
let active_idx = state.active_idx as usize;
let offset = state.active_len;
let start = state.active_flush_pos + offset as u64;
let end = start + frame_size as u64;
unsafe {
let slice = (*self.bufs[active_idx].get()).as_mut_slice();
slice[offset..offset + frame_size].copy_from_slice(&frame);
}
state.active_len += frame_size;
return Ok((start, end));
}
if state.flushing.is_some() {
self.flush_done.wait(&mut state);
continue;
}
let old_idx = state.active_idx;
let old_len = state.active_len;
let old_flush_pos = state.active_flush_pos;
let new_idx = old_idx ^ 1;
if old_len > 0 {
state.active_idx = new_idx;
state.active_len = 0;
state.active_flush_pos = old_flush_pos
.checked_add(self.capacity as u64)
.ok_or_else(|| Error::Io(std::io::Error::other("flush_pos overflow")))?;
state.flushing = Some(old_idx);
drop(state);
let flush_result = unsafe {
let slice = (*self.bufs[old_idx as usize].get()).as_slice();
self.flush_slot_to_disk(file, old_idx, slice, old_flush_pos)
};
{
let mut state = self.state.lock();
unsafe {
let slice = (*self.bufs[old_idx as usize].get()).as_mut_slice();
slice.fill(0);
}
state.flushing = None;
let _ = self.flush_done.notify_all();
}
flush_result?;
continue;
}
debug_assert_eq!(old_len, 0);
debug_assert!(frame_size > self.capacity);
let start = old_flush_pos;
let aligned = round_up(frame_size, self.sector_size);
let mut scratch = AlignedBuf::new(aligned, self.sector_size)?;
scratch.as_mut_slice()[..frame_size].copy_from_slice(&frame);
let end = start + frame_size as u64;
let new_flush_pos = (end / self.sector_size as u64) * self.sector_size as u64;
let tail = (end - new_flush_pos) as usize;
drop(state);
crate::platform::write_at_direct(file, start, scratch.as_slice())?;
{
let mut state = self.state.lock();
state.active_flush_pos = new_flush_pos;
if tail > 0 {
let scratch_offset = aligned - self.sector_size;
let in_sector = (frame_size - scratch_offset).min(self.sector_size);
let active_idx = state.active_idx as usize;
unsafe {
let slice = (*self.bufs[active_idx].get()).as_mut_slice();
slice[..in_sector].copy_from_slice(
&scratch.as_slice()[scratch_offset..scratch_offset + in_sector],
);
for b in &mut slice[in_sector..self.sector_size] {
*b = 0;
}
}
state.active_len = tail;
}
}
return Ok((start, end));
}
}
pub(crate) fn flush_partial(&self, file: &File) -> Result<()> {
let mut state = self.state.lock();
while state.flushing.is_some() {
self.flush_done.wait(&mut state);
}
if state.active_len == 0 {
return Ok(()); }
let aligned = round_up(state.active_len, self.sector_size);
let active_idx = state.active_idx as usize;
let active_flush_pos = state.active_flush_pos;
unsafe {
let slice = (*self.bufs[active_idx].get()).as_slice();
self.flush_slot_to_disk(file, active_idx as u8, &slice[..aligned], active_flush_pos)?;
}
Ok(())
}
#[cfg_attr(not(target_os = "linux"), allow(unused_variables))]
fn flush_slot_to_disk(
&self,
file: &File,
slot_idx: u8,
slice: &[u8],
offset: u64,
) -> Result<()> {
#[cfg(target_os = "linux")]
if let Some(iouring) = self.iouring.as_ref() {
use std::os::fd::AsRawFd;
let fd = file.as_raw_fd();
debug_assert!(slot_idx < 2);
let written = iouring
.ring
.write_at_fixed(fd, slot_idx as u16, slice, offset)?;
if written != slice.len() {
return Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"iouring write_at_fixed returned short count",
)));
}
return Ok(());
}
crate::platform::write_at_direct(file, offset, slice)
}
pub(crate) fn set_flush_pos_for_resume(
&self,
flush_pos: u64,
in_sector_offset: usize,
prefix_bytes: &[u8],
) {
let mut state = self.state.lock();
debug_assert_eq!(state.active_len, 0, "rehydrate must run on a fresh buffer");
debug_assert!(
flush_pos % self.sector_size as u64 == 0,
"flush_pos must be sector-aligned"
);
debug_assert!(
state.flushing.is_none(),
"rehydrate must run before any flush has started"
);
state.active_flush_pos = flush_pos;
if in_sector_offset > 0 {
let copy_len = in_sector_offset
.min(prefix_bytes.len())
.min(self.sector_size);
let active_idx = state.active_idx as usize;
unsafe {
let slice = (*self.bufs[active_idx].get()).as_mut_slice();
slice[..copy_len].copy_from_slice(&prefix_bytes[..copy_len]);
}
state.active_len = copy_len;
}
}
}
// Tests for `LogBuffer`. The expected sizes below imply `format::FRAME_OVERHEAD == 12`
// (a 1-byte payload is a 13-byte frame; a 5000-byte payload ends at 5012).
#[cfg(test)]
mod tests {
use super::*;
use std::fs::OpenOptions;
use std::io::Read;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, Ordering};
// Per-process counter so concurrently running tests get distinct temp file names.
static C: AtomicU32 = AtomicU32::new(0);
fn tmp_path(tag: &str) -> PathBuf {
let n = C.fetch_add(1, Ordering::Relaxed);
std::env::temp_dir().join(format!("fsys_logbuf_{}_{}_{tag}", std::process::id(), n))
}
// Removes the temp file when the test ends, pass or fail.
struct Cleanup(PathBuf);
impl Drop for Cleanup {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}
// Creates a fresh, empty read/write temp file plus its cleanup guard.
fn make_file() -> (PathBuf, File, Cleanup) {
let path = tmp_path("logbuf");
let f = OpenOptions::new()
.read(true)
.write(true)
.create_new(true)
.open(&path)
.unwrap();
(path.clone(), f, Cleanup(path))
}
// NOTE(review): despite its name, this only checks capacity and the empty state; it
// does not inspect alignment or zeroing of the slots.
#[test]
fn new_buffer_is_aligned_and_zeroed() {
let buf = LogBuffer::new(4096, 512, 0).unwrap();
assert_eq!(buf.capacity(), 4096);
assert_eq!(buf.next_lsn(), 0);
assert_eq!(buf.buffered_len(), 0);
}
// A frame that fits is only buffered: nothing reaches the file.
#[test]
fn append_frame_fits_in_active_slot() {
let (path, file, _g) = make_file();
let buf = LogBuffer::new(4096, 512, 0).unwrap();
let (start, end) = buf.append_frame(&file, b"hello").unwrap();
assert_eq!(start, 0);
assert_eq!(end, 5 + format::FRAME_OVERHEAD as u64);
assert_eq!(buf.next_lsn(), end);
let on_disk = std::fs::read(&path).unwrap();
assert!(on_disk.is_empty());
}
// 100-byte payloads make 112-byte frames: 36 fit in a 4096-byte slot (4032 bytes),
// the 37th does not, so it triggers a rotation that writes the whole slot.
#[test]
fn active_full_triggers_rotation_and_real_flush() {
let (path, file, _g) = make_file();
let buf = LogBuffer::new(4096, 512, 0).unwrap();
let payload = vec![0xABu8; 100];
for _ in 0..36 {
let _ = buf.append_frame(&file, &payload).unwrap();
}
assert_eq!(buf.flushed_through(), 0);
let _ = buf.append_frame(&file, &payload).unwrap();
assert_eq!(buf.flushed_through(), 4096);
let mut f = OpenOptions::new().read(true).open(&path).unwrap();
let mut bytes = Vec::new();
let _ = f.read_to_end(&mut bytes).unwrap();
assert_eq!(bytes.len(), 4096);
}
// One 13-byte frame is written as a single zero-padded 512-byte sector, and the
// flush position does not move.
#[test]
fn flush_partial_writes_records_plus_zero_pad() {
let (path, file, _g) = make_file();
let buf = LogBuffer::new(4096, 512, 0).unwrap();
let _ = buf.append_frame(&file, b"x").unwrap(); buf.flush_partial(&file).unwrap();
let mut f = OpenOptions::new().read(true).open(&path).unwrap();
let mut bytes = Vec::new();
let _ = f.read_to_end(&mut bytes).unwrap();
assert_eq!(bytes.len(), 512);
assert!(bytes[13..].iter().all(|&b| b == 0));
assert_eq!(buf.flushed_through(), 0);
}
// A 5012-byte frame exceeds the 4096-byte slot, so it is written directly, padded
// to 5120 bytes (10 sectors). The position moves to the start of its last sector
// (4608), and the 404 bytes of the frame in that sector are carried into the
// active slot.
#[test]
fn oversize_record_writes_standalone_with_tail_carryover() {
let (path, file, _g) = make_file();
let buf = LogBuffer::new(4096, 512, 0).unwrap();
let payload = vec![0xCDu8; 5000];
let (start, end) = buf.append_frame(&file, &payload).unwrap();
assert_eq!(start, 0);
assert_eq!(end, 5012);
assert_eq!(buf.flushed_through(), 4608);
assert_eq!(buf.buffered_len(), 404);
let mut f = OpenOptions::new().read(true).open(&path).unwrap();
let mut bytes = Vec::new();
let _ = f.read_to_end(&mut bytes).unwrap();
assert_eq!(bytes.len(), 5120);
}
// 4-byte payloads make 16-byte frames, 32 per 512-byte slot. 129 appends cause
// four rotations, so the position is 4 * 512 = 2048.
#[test]
fn rotation_alternates_active_slot_indices() {
let (_path, file, _g) = make_file();
let buf = LogBuffer::new(512, 512, 0).unwrap();
let payload = [0xAAu8; 4];
for _ in 0..(32 * 4 + 1) {
let _ = buf.append_frame(&file, &payload).unwrap();
}
assert_eq!(buf.flushed_through(), 2048);
}
// 8 threads x 200 appends of 36-byte frames (24-byte payloads). After a final
// partial flush, the file must hold at least roughly all appended bytes. The bound
// is loose: rotation padding only makes the file larger.
#[test]
fn concurrent_appends_during_flush_dont_block_on_syscall() {
use std::sync::Arc;
let (path, file, _g) = make_file();
let buf = Arc::new(LogBuffer::new(4096, 512, 0).unwrap());
let file = Arc::new(file);
let n_threads = 8usize;
let per_thread = 200usize;
let payload_size = 24usize; let payload = vec![0xBBu8; payload_size];
let mut handles = Vec::with_capacity(n_threads);
for _ in 0..n_threads {
let buf = Arc::clone(&buf);
let file = Arc::clone(&file);
let payload = payload.clone();
handles.push(std::thread::spawn(move || {
for _ in 0..per_thread {
let _ = buf.append_frame(&file, &payload).expect("append");
}
}));
}
for h in handles {
h.join().expect("join");
}
buf.flush_partial(&file).expect("partial flush");
let total_bytes = (n_threads * per_thread * 36) as u64;
let on_disk_len = std::fs::metadata(&path).unwrap().len();
assert!(
on_disk_len >= total_bytes - 4096,
"on-disk {} < total {}",
on_disk_len,
total_bytes
);
}
// 33 appends of 16-byte frames fill one 512-byte slot and rotate. The partial flush
// then writes the new slot at 512, so the file is 1024 bytes. NOTE(review): in this
// single-threaded test the rotation flush completes inside `append_frame`, so the
// wait loop in `flush_partial` is not actually exercised concurrently.
#[test]
fn flush_partial_waits_for_in_flight_rotation_flush() {
let (path, file, _g) = make_file();
let buf = LogBuffer::new(512, 512, 0).unwrap();
let payload = [0xCCu8; 4];
for _ in 0..32 {
let _ = buf.append_frame(&file, &payload).unwrap();
}
let _ = buf.append_frame(&file, &payload).unwrap();
buf.flush_partial(&file).unwrap();
let on_disk_len = std::fs::metadata(&path).unwrap().len();
assert_eq!(on_disk_len, 1024);
}
// 36 frames of 112 bytes per slot; 181 appends trigger five rotations, so the
// position is 5 * 4096 = 20480.
#[test]
fn back_to_back_rotations_after_sustained_load() {
let (_path, file, _g) = make_file();
let buf = LogBuffer::new(4096, 512, 0).unwrap();
let payload = vec![0xDDu8; 100]; for _ in 0..(36 * 5 + 1) {
let _ = buf.append_frame(&file, &payload).unwrap();
}
assert_eq!(buf.flushed_through(), 20480);
}
}