#![allow(dead_code)]
use crate::journal::format;
use crate::platform::{round_up, AlignedBuf};
use crate::{Error, Result};
use std::fs::File;
/// In-memory staging area for encoded log frames.
///
/// Frames are copied into `buf` and written to the file in sector-sized
/// multiples. `flush_pos + len` is the position handed out to the next frame.
pub(crate) struct LogBuffer {
    /// Backing storage; allocated via `AlignedBuf::new(capacity, sector_size)`.
    // NOTE(review): the second argument is presumably the alignment — confirm in `platform`.
    buf: AlignedBuf,
    /// Number of bytes at the front of `buf` that currently hold staged data.
    len: usize,
    /// Position passed to `write_at_direct` when the buffer is written, i.e. the
    /// log position that byte 0 of `buf` corresponds to. Asserted (debug builds
    /// only) to be a multiple of `sector_size`.
    flush_pos: u64,
    /// Sector size in bytes; asserted (debug builds only) to be a power of two.
    sector_size: usize,
}
impl LogBuffer {
    /// Creates an empty buffer whose first byte corresponds to position `flush_pos`.
    ///
    /// `capacity` is rounded up to a whole number of sectors and is never smaller
    /// than one sector. In debug builds this asserts that `sector_size` is a power
    /// of two and that `flush_pos` is a multiple of it.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `AlignedBuf::new`.
    pub(crate) fn new(capacity: u32, sector_size: u32, flush_pos: u64) -> Result<Self> {
        let sector = sector_size as usize;
        debug_assert!(sector.is_power_of_two(), "sector_size must be a power of two");
        debug_assert!(
            flush_pos % u64::from(sector_size) == 0,
            "flush_pos must be sector-aligned"
        );

        // Whole sectors only, and at least one of them.
        let rounded = round_up(capacity as usize, sector);
        let buf = AlignedBuf::new(rounded.max(sector), sector)?;

        Ok(Self {
            buf,
            len: 0,
            flush_pos,
            sector_size: sector,
        })
    }

    /// Position the next appended frame will start at: `flush_pos` plus the
    /// number of bytes currently staged.
    #[inline]
    pub(crate) fn next_lsn(&self) -> u64 {
        let staged = self.len as u64;
        self.flush_pos + staged
    }

    /// Total size of the staging buffer in bytes.
    #[inline]
    pub(crate) fn capacity(&self) -> usize {
        self.buf.len
    }

    /// Current `flush_pos`: the position the start of the buffer maps to.
    #[inline]
    pub(crate) fn flushed_through(&self) -> u64 {
        self.flush_pos
    }

    /// Number of bytes currently staged in the buffer.
    #[inline]
    pub(crate) fn buffered_len(&self) -> usize {
        self.len
    }

    /// Encodes `payload` as a frame and appends it, returning the frame's
    /// `(start, end)` positions (`end` is exclusive).
    ///
    /// Three cases are handled, in this order:
    ///
    /// 1. The frame fits in the remaining space: it is copied into the buffer
    ///    and no I/O happens.
    /// 2. It does not fit, but would fit in an empty buffer: the whole buffer is
    ///    written out first (see `flush_full`, which advances `flush_pos` by the
    ///    full capacity), then the frame is staged at offset 0.
    /// 3. It is larger than the whole buffer: after the same flush, the frame is
    ///    copied into a sector-padded scratch buffer and written in one call.
    ///    `flush_pos` moves to the last sector boundary at or before `end`, and
    ///    the frame bytes past that boundary are copied into the buffer so later
    ///    appends continue right after them.
    ///
    /// # Errors
    ///
    /// Propagates errors from `format::encode_frame_owned`, `AlignedBuf::new`,
    /// `write_at_direct` and `flush_full`.
    pub(crate) fn append_frame(&mut self, file: &File, payload: &[u8]) -> Result<(u64, u64)> {
        let frame_size = payload.len() + format::FRAME_OVERHEAD;
        let frame = format::encode_frame_owned(payload)?;
        debug_assert_eq!(frame.len(), frame_size);

        let capacity = self.buf.len;

        // Case 1: room left behind the bytes that are already staged.
        let used = self.len;
        if used + frame_size <= capacity {
            let start = self.flush_pos + used as u64;
            let end = start + frame_size as u64;
            self.buf.as_mut_slice()[used..used + frame_size].copy_from_slice(&frame);
            self.len = used + frame_size;
            return Ok((start, end));
        }

        // Not enough room: push out whatever is staged so we start from an
        // empty buffer.
        if self.len != 0 {
            self.flush_full(file)?;
        }
        debug_assert_eq!(self.len, 0);

        let start = self.flush_pos;

        // Case 2: the frame fits once the buffer is empty.
        if frame_size <= capacity {
            let end = start + frame_size as u64;
            self.buf.as_mut_slice()[..frame_size].copy_from_slice(&frame);
            self.len = frame_size;
            return Ok((start, end));
        }

        // Case 3: frame is bigger than the buffer; write it through a
        // temporary buffer padded to a whole number of sectors.
        let sector = self.sector_size;
        let padded_len = round_up(frame_size, sector);
        let mut scratch = AlignedBuf::new(padded_len, sector)?;
        scratch.as_mut_slice()[..frame_size].copy_from_slice(&frame);
        crate::platform::write_at_direct(file, start, scratch.as_slice())?;

        let end = start + frame_size as u64;
        let sector_u64 = sector as u64;
        // Round `end` down to a sector boundary.
        let new_flush_pos = (end / sector_u64) * sector_u64;
        self.flush_pos = new_flush_pos;

        let tail = (end - new_flush_pos) as usize;
        if tail > 0 {
            // The frame ends mid-sector: keep a copy of that last sector's frame
            // bytes at the front of the buffer and clear the rest of the sector.
            let last_sector_start = padded_len - sector;
            let carried = (frame_size - last_sector_start).min(sector);
            let staging = self.buf.as_mut_slice();
            staging[..carried].copy_from_slice(
                &scratch.as_slice()[last_sector_start..last_sector_start + carried],
            );
            staging[carried..sector].fill(0);
            self.len = tail;
        }

        Ok((start, end))
    }

    /// Writes the entire buffer (all `capacity` bytes, including anything past
    /// `len`) at `flush_pos`, then advances `flush_pos` by the capacity, resets
    /// `len` to 0 and zeroes the buffer.
    ///
    /// # Errors
    ///
    /// Propagates the error from `write_at_direct`, or returns `Error::Io` if
    /// advancing `flush_pos` would overflow `u64`. On either error `len` and the
    /// buffer contents are left untouched.
    fn flush_full(&mut self, file: &File) -> Result<()> {
        let capacity = self.buf.len;
        crate::platform::write_at_direct(file, self.flush_pos, &self.buf.as_slice()[..capacity])?;

        let next_pos = self
            .flush_pos
            .checked_add(capacity as u64)
            .ok_or_else(|| Error::Io(std::io::Error::other("flush_pos overflow")))?;
        self.flush_pos = next_pos;
        self.len = 0;
        self.buf.as_mut_slice().fill(0);
        Ok(())
    }

    /// Re-points a fresh buffer at `flush_pos` and, when `in_sector_offset` is
    /// non-zero, pre-loads the front of the buffer from `prefix_bytes`.
    ///
    /// The number of bytes copied (and the resulting `len`) is the smallest of
    /// `in_sector_offset`, `prefix_bytes.len()` and the sector size. In debug
    /// builds this asserts that the buffer is empty and that `flush_pos` is
    /// sector-aligned.
    pub(crate) fn set_flush_pos_for_resume(
        &mut self,
        flush_pos: u64,
        in_sector_offset: usize,
        prefix_bytes: &[u8],
    ) {
        debug_assert_eq!(self.len, 0, "rehydrate must run on a fresh buffer");
        debug_assert!(
            flush_pos % self.sector_size as u64 == 0,
            "flush_pos must be sector-aligned"
        );

        self.flush_pos = flush_pos;
        if in_sector_offset == 0 {
            return;
        }

        let copy_len = self
            .sector_size
            .min(prefix_bytes.len())
            .min(in_sector_offset);
        let (head, _) = prefix_bytes.split_at(copy_len);
        self.buf.as_mut_slice()[..copy_len].copy_from_slice(head);
        self.len = copy_len;
    }

    /// Writes the staged bytes, rounded up to a whole number of sectors, at
    /// `flush_pos`. Does nothing when the buffer is empty.
    ///
    /// Neither `len` nor `flush_pos` is changed, so the same region is written
    /// again by a later flush.
    ///
    /// # Errors
    ///
    /// Propagates the error from `write_at_direct`.
    pub(crate) fn flush_partial(&mut self, file: &File) -> Result<()> {
        if self.len > 0 {
            let padded = round_up(self.len, self.sector_size);
            crate::platform::write_at_direct(file, self.flush_pos, &self.buf.as_slice()[..padded])?;
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::OpenOptions;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Counter mixed into temp-file names so every call gets a distinct path.
    static NEXT_FILE_ID: AtomicU32 = AtomicU32::new(0);

    /// Builds a path in the system temp directory from the process id, a
    /// per-process counter and `tag`.
    fn tmp_path(tag: &str) -> PathBuf {
        let n = NEXT_FILE_ID.fetch_add(1, Ordering::Relaxed);
        let name = format!("fsys_logbuf_{}_{}_{tag}", std::process::id(), n);
        std::env::temp_dir().join(name)
    }

    /// Removes the wrapped path when dropped; removal errors are ignored.
    struct Cleanup(PathBuf);

    impl Drop for Cleanup {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Creates a new, empty read/write temp file and returns its path, the open
    /// handle and a guard that deletes the file on drop.
    fn make_file() -> (PathBuf, File, Cleanup) {
        let path = tmp_path("logbuf");
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create_new(true)
            .open(&path)
            .unwrap();
        let guard = Cleanup(path.clone());
        (path, file, guard)
    }

    #[test]
    fn new_buffer_is_aligned_and_zeroed() {
        let log = LogBuffer::new(4096, 512, 0).unwrap();
        assert_eq!(log.capacity(), 4096);
        assert_eq!(log.next_lsn(), 0);
        assert_eq!(log.buffered_len(), 0);
    }

    #[test]
    fn append_frame_fits_in_buffer() {
        let (path, file, _guard) = make_file();
        let mut log = LogBuffer::new(4096, 512, 0).unwrap();

        let payload = b"hello";
        let (start, end) = log.append_frame(&file, payload).unwrap();
        let expected_end = (payload.len() + format::FRAME_OVERHEAD) as u64;

        assert_eq!(start, 0);
        assert_eq!(end, expected_end);
        assert_eq!(log.next_lsn(), end);

        // A frame that fits is only staged; nothing reaches the file yet.
        let on_disk = std::fs::read(&path).unwrap();
        assert!(on_disk.is_empty());
    }

    #[test]
    fn buffer_full_triggers_real_flush_and_advances_flush_pos() {
        let (path, file, _guard) = make_file();
        let mut log = LogBuffer::new(4096, 512, 0).unwrap();
        let payload = vec![0xABu8; 100];

        // The first 36 frames are expected to stay in the buffer...
        (0..36).for_each(|_| {
            let _ = log.append_frame(&file, &payload).unwrap();
        });
        assert_eq!(log.flushed_through(), 0);

        // ...and the 37th to force the whole buffer out.
        let _ = log.append_frame(&file, &payload).unwrap();
        assert_eq!(log.flushed_through(), 4096);

        let bytes = std::fs::read(&path).unwrap();
        assert_eq!(bytes.len(), 4096);
    }

    #[test]
    fn flush_partial_writes_records_plus_zero_pad() {
        let (path, file, _guard) = make_file();
        let mut log = LogBuffer::new(4096, 512, 0).unwrap();

        let _ = log.append_frame(&file, b"x").unwrap();
        log.flush_partial(&file).unwrap();

        let bytes = std::fs::read(&path).unwrap();
        // One full sector is written; everything after the 13-byte frame is padding.
        assert_eq!(bytes.len(), 512);
        assert!(bytes[13..].iter().all(|&b| b == 0));
        // A partial flush must not move the flush position.
        assert_eq!(log.flushed_through(), 0);
    }

    #[test]
    fn oversize_record_writes_directly_with_partial_sector_carryover() {
        let (path, file, _guard) = make_file();
        let mut log = LogBuffer::new(4096, 512, 0).unwrap();
        let payload = vec![0xCDu8; 5000];

        let (start, end) = log.append_frame(&file, &payload).unwrap();

        assert_eq!(start, 0);
        assert_eq!(end, 5012);
        // 5012 rounded down to a 512-byte boundary is 4608; the remaining
        // 404 bytes stay staged in the buffer.
        assert_eq!(log.flushed_through(), 4608);
        assert_eq!(log.buffered_len(), 404);

        // The direct write covers the frame padded up to whole sectors.
        let bytes = std::fs::read(&path).unwrap();
        assert_eq!(bytes.len(), 5120);
    }
}