use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::time::Instant;
use rkyv::api::high;
use rkyv::rancor::Error as RkyvError;
use rkyv::util::AlignedVec;
use super::entry::PerCoreWalEntry;
use super::error::PerCoreWalError;
/// Size in bytes of the fixed header written in front of every record payload:
/// a 4-byte little-endian payload length followed by a 4-byte little-endian
/// CRC32C of the payload.
const RECORD_HEADER_SIZE: u64 = 8;
/// Buffered writer that appends length- and CRC-prefixed records to a single
/// core's WAL file and tracks how far the file has been written and synced.
pub struct CoreWalWriter {
/// Identifier of the owning core; narrowed to `u16` when stamped into entries.
core_id: usize,
/// Buffered handle (64 KiB buffer) on the WAL file, opened in append mode.
writer: BufWriter<File>,
/// Location of the WAL file; used to reopen it when truncating.
path: PathBuf,
/// Logical end offset in bytes: the starting length plus every record appended so far.
position: u64,
/// Value of `position` at the last `sync`, `mark_synced`, or `truncate`.
synced_position: u64,
/// Epoch stamped into entries built by the `append_*` helpers.
epoch: u64,
/// Sequence number stamped into the next entry; incremented after each append.
sequence: u64,
/// Time of construction or of the most recent `sync` / `mark_synced`.
last_sync: Instant,
/// Number of records appended since the last `sync` / `mark_synced`.
entries_since_sync: u64,
/// Reusable scratch buffer holding the framed record (header + payload) to write.
write_buffer: Vec<u8>,
/// Reusable aligned buffer handed to rkyv for serializing the entry payload.
serialize_buffer: AlignedVec,
}
impl CoreWalWriter {
/// Opens the WAL file at `path` for appending, creating it if it does not exist.
///
/// Both `position` and `synced_position` start at the file's current length;
/// `epoch` and `sequence` start at zero.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or its metadata cannot be read.
pub fn new(core_id: usize, path: &Path) -> Result<Self, PerCoreWalError> {
    let mut options = OpenOptions::new();
    options.create(true).append(true);
    let handle = options.open(path)?;

    // Resume after whatever is already in the file.
    let existing_len = handle.metadata()?.len();

    let writer = BufWriter::with_capacity(64 * 1024, handle);
    Ok(Self {
        core_id,
        writer,
        path: path.to_path_buf(),
        position: existing_len,
        synced_position: existing_len,
        epoch: 0,
        sequence: 0,
        last_sync: Instant::now(),
        entries_since_sync: 0,
        write_buffer: Vec::with_capacity(4096),
        serialize_buffer: AlignedVec::with_capacity(256),
    })
}
/// Opens an existing WAL file, resizes it to exactly `position` bytes, and
/// returns a writer that appends from there.
///
/// Unlike `new`, the file is not created if it is missing. `epoch` and
/// `sequence` start at zero.
///
/// Note that `File::set_len` *extends* the file with zero bytes when
/// `position` is larger than the current length, so callers should pass an
/// offset that lies within the existing file.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, resized, or synced.
pub fn open_at(core_id: usize, path: &Path, position: u64) -> Result<Self, PerCoreWalError> {
    let resize_handle = OpenOptions::new().write(true).open(path)?;
    resize_handle.set_len(position)?;
    // Persist the new length before any record is appended, matching what
    // `truncate` does after its own `set_len`.
    resize_handle.sync_all()?;

    let file = OpenOptions::new().append(true).open(path)?;
    Ok(Self {
        core_id,
        writer: BufWriter::with_capacity(64 * 1024, file),
        path: path.to_path_buf(),
        position,
        synced_position: position,
        epoch: 0,
        sequence: 0,
        last_sync: Instant::now(),
        entries_since_sync: 0,
        write_buffer: Vec::with_capacity(4096),
        serialize_buffer: AlignedVec::with_capacity(256),
    })
}
/// Returns the core identifier this writer was constructed with.
#[must_use]
pub fn core_id(&self) -> usize {
self.core_id
}
/// Returns the logical end offset in bytes, including records that are still
/// only in the write buffer.
#[must_use]
pub fn position(&self) -> u64 {
self.position
}
/// Returns the value `position` had at the last `sync`, `mark_synced`, or
/// `truncate` (or at construction if none has happened yet).
#[must_use]
pub fn synced_position(&self) -> u64 {
self.synced_position
}
/// Returns the epoch currently stamped into entries built by this writer.
#[must_use]
pub fn epoch(&self) -> u64 {
self.epoch
}
/// Returns the sequence number that will be stamped into the next entry.
#[must_use]
pub fn sequence(&self) -> u64 {
self.sequence
}
/// Returns the path of the WAL file this writer operates on.
#[must_use]
pub fn path(&self) -> &Path {
&self.path
}
/// Returns how many records have been appended since the last `sync` or
/// `mark_synced`.
#[must_use]
pub fn entries_since_sync(&self) -> u64 {
self.entries_since_sync
}
/// Sets the epoch stamped into entries built by subsequent `append_*` calls.
/// Nothing is written to the file by this call.
pub fn set_epoch(&mut self, epoch: u64) {
self.epoch = epoch;
}
/// Builds a put entry for `key` / `value`, stamped with this writer's core id,
/// epoch, and next sequence number, and appends it.
///
/// Returns the byte offset at which the record starts.
///
/// # Errors
///
/// Returns an error if the entry cannot be serialized or written.
#[inline]
// `core_id` is narrowed to the `u16` the entry constructor takes.
#[allow(clippy::cast_possible_truncation)]
pub fn append_put(&mut self, key: &[u8], value: &[u8]) -> Result<u64, PerCoreWalError> {
    let stamp = PerCoreWalEntry::now_ns();
    let core = self.core_id as u16;
    let record = PerCoreWalEntry::put(
        core,
        self.epoch,
        self.sequence,
        key.to_vec(),
        value.to_vec(),
        stamp,
    );
    self.append(&record)
}
/// Builds a delete entry for `key`, stamped with this writer's core id,
/// epoch, and next sequence number, and appends it.
///
/// Returns the byte offset at which the record starts.
///
/// # Errors
///
/// Returns an error if the entry cannot be serialized or written.
#[inline]
// `core_id` is narrowed to the `u16` the entry constructor takes.
#[allow(clippy::cast_possible_truncation)]
pub fn append_delete(&mut self, key: &[u8]) -> Result<u64, PerCoreWalError> {
    let stamp = PerCoreWalEntry::now_ns();
    let core = self.core_id as u16;
    let record = PerCoreWalEntry::delete(core, self.epoch, self.sequence, key.to_vec(), stamp);
    self.append(&record)
}
/// Serializes `entry` with rkyv and frames it into `self.write_buffer`.
///
/// After a successful call `write_buffer` holds one complete record:
/// `[payload length: u32 LE][CRC32C of payload: u32 LE][payload bytes]`.
/// The rkyv output buffer is kept in `self.serialize_buffer` so its
/// allocation can be reused by the next call.
///
/// # Errors
///
/// Returns `PerCoreWalError::Serialization` if rkyv fails, or if the payload
/// is too large for its length to fit the `u32` header field.
fn serialize_entry(&mut self, entry: &PerCoreWalEntry) -> Result<(), PerCoreWalError> {
    // rkyv takes the output buffer by value; move ours out and put it back below.
    self.serialize_buffer.clear();
    let scratch = std::mem::take(&mut self.serialize_buffer);
    let bytes = high::to_bytes_in::<_, RkyvError>(entry, scratch)
        .map_err(|e| PerCoreWalError::Serialization(e.to_string()))?;

    // A silently truncated length would produce a record no reader can frame,
    // so refuse oversized payloads instead of casting with `as`.
    let len = u32::try_from(bytes.len()).map_err(|_| {
        PerCoreWalError::Serialization(format!(
            "serialized entry is {} bytes, which does not fit the u32 record length field",
            bytes.len()
        ))
    })?;
    let crc = crc32c::crc32c(&bytes);

    self.write_buffer.clear();
    #[allow(clippy::cast_possible_truncation)]
    self.write_buffer
        .reserve(RECORD_HEADER_SIZE as usize + bytes.len());
    self.write_buffer.extend_from_slice(&len.to_le_bytes());
    self.write_buffer.extend_from_slice(&crc.to_le_bytes());
    self.write_buffer.extend_from_slice(&bytes);

    // Keep the (possibly grown) allocation for the next entry.
    self.serialize_buffer = bytes;
    Ok(())
}
fn advance_position(&mut self) {
#[allow(clippy::cast_possible_truncation)]
let data_len = self.write_buffer.len() as u64 - RECORD_HEADER_SIZE;
self.position += RECORD_HEADER_SIZE + data_len;
self.sequence += 1;
self.entries_since_sync += 1;
}
/// Serializes `entry`, frames it, and writes the record into the buffered writer.
///
/// Returns the byte offset at which the record starts. The bytes may still be
/// sitting in the in-memory buffer until `sync` is called. The sequence number
/// carried by `entry` is not compared with this writer's own counter.
///
/// # Errors
///
/// Returns an error if serialization or the buffered write fails; in that
/// case `position` and `sequence` are left unchanged.
pub fn append(&mut self, entry: &PerCoreWalEntry) -> Result<u64, PerCoreWalError> {
    let record_offset = self.position;

    self.serialize_entry(entry)?;
    let framed: &[u8] = &self.write_buffer;
    self.writer.write_all(framed)?;

    self.advance_position();
    Ok(record_offset)
}
/// Records the current position as synced and resets the sync bookkeeping
/// without performing any I/O itself.
///
/// NOTE(review): presumably intended for callers that flush and sync the file
/// by other means — confirm against call sites.
pub fn mark_synced(&mut self) {
    self.entries_since_sync = 0;
    self.last_sync = Instant::now();
    self.synced_position = self.position;
}
/// Builds a checkpoint entry carrying `checkpoint_id`, stamped with this
/// writer's core id, epoch, and next sequence number, and appends it.
///
/// Returns the byte offset at which the record starts.
///
/// # Errors
///
/// Returns an error if the entry cannot be serialized or written.
// `core_id` is narrowed to the `u16` the entry constructor takes.
#[allow(clippy::cast_possible_truncation)]
pub fn append_checkpoint(&mut self, checkpoint_id: u64) -> Result<u64, PerCoreWalError> {
    let stamp = PerCoreWalEntry::now_ns();
    let core = self.core_id as u16;
    let record =
        PerCoreWalEntry::checkpoint(core, self.epoch, self.sequence, checkpoint_id, stamp);
    self.append(&record)
}
/// Builds an epoch-barrier entry for the writer's current epoch, stamped with
/// its core id and next sequence number, and appends it.
///
/// Returns the byte offset at which the record starts.
///
/// # Errors
///
/// Returns an error if the entry cannot be serialized or written.
// `core_id` is narrowed to the `u16` the entry constructor takes.
#[allow(clippy::cast_possible_truncation)]
pub fn append_epoch_barrier(&mut self) -> Result<u64, PerCoreWalError> {
    let stamp = PerCoreWalEntry::now_ns();
    let core = self.core_id as u16;
    let record = PerCoreWalEntry::epoch_barrier(core, self.epoch, self.sequence, stamp);
    self.append(&record)
}
/// Builds a commit entry carrying `offsets` and an optional `watermark`,
/// stamped with this writer's core id, epoch, and next sequence number, and
/// appends it.
///
/// Returns the byte offset at which the record starts.
///
/// # Errors
///
/// Returns an error if the entry cannot be serialized or written.
// `core_id` is narrowed to the `u16` the entry constructor takes.
#[allow(clippy::cast_possible_truncation)]
// The std `HashMap` is part of this method's public signature.
#[allow(clippy::disallowed_types)]
pub fn append_commit(
    &mut self,
    offsets: std::collections::HashMap<String, u64>,
    watermark: Option<i64>,
) -> Result<u64, PerCoreWalError> {
    let stamp = PerCoreWalEntry::now_ns();
    let core = self.core_id as u16;
    let record =
        PerCoreWalEntry::commit(core, self.epoch, self.sequence, offsets, watermark, stamp);
    self.append(&record)
}
pub fn sync(&mut self) -> Result<(), PerCoreWalError> {
self.writer.flush()?;
self.writer.get_ref().sync_data()?;
self.synced_position = self.position;
self.last_sync = Instant::now();
self.entries_since_sync = 0;
Ok(())
}
/// Resizes the WAL file to `position` bytes and continues appending from there.
///
/// Buffered records are flushed and synced first, then the file length is
/// changed and synced through a separate write handle, and finally the
/// buffered writer is replaced by a fresh append handle. `sequence` and
/// `epoch` are left unchanged.
///
/// # Errors
///
/// Returns an error if syncing, reopening, resizing, or the final sync fails.
pub fn truncate(&mut self, position: u64) -> Result<(), PerCoreWalError> {
    // Drain the buffer before touching the file length.
    self.sync()?;

    let mut resize_options = OpenOptions::new();
    resize_options.write(true).truncate(false);
    let resize_handle = resize_options.open(&self.path)?;
    resize_handle.set_len(position)?;
    resize_handle.sync_all()?;

    let append_handle = OpenOptions::new().append(true).open(&self.path)?;
    self.writer = BufWriter::with_capacity(64 * 1024, append_handle);

    self.synced_position = position;
    self.position = position;
    Ok(())
}
/// Empties the WAL file and restarts the sequence counter at zero.
/// The epoch is left unchanged.
///
/// # Errors
///
/// Returns an error if truncating the file to zero length fails; the sequence
/// counter is not reset in that case.
pub fn reset(&mut self) -> Result<(), PerCoreWalError> {
    self.truncate(0).map(|()| {
        self.sequence = 0;
    })
}
}
/// Manual `Debug` that prints the writer's bookkeeping fields and omits the
/// file handle, timestamps, and scratch buffers (hence the non-exhaustive finish).
impl std::fmt::Debug for CoreWalWriter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("CoreWalWriter");
        out.field("core_id", &self.core_id);
        out.field("path", &self.path);
        out.field("position", &self.position);
        out.field("synced_position", &self.synced_position);
        out.field("epoch", &self.epoch);
        out.field("sequence", &self.sequence);
        out.field("entries_since_sync", &self.entries_since_sync);
        out.finish_non_exhaustive()
    }
}
// Unit tests for `CoreWalWriter`. Each test works on a WAL file inside its own
// temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Creates a writer on a fresh file; the `TempDir` is returned so the directory
// stays alive for as long as the caller holds it.
fn create_temp_writer(core_id: usize) -> (CoreWalWriter, TempDir) {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join(format!("wal-{core_id}.log"));
let writer = CoreWalWriter::new(core_id, &path).unwrap();
(writer, temp_dir)
}
// A writer on a new file starts with every counter at zero.
#[test]
fn test_writer_creation() {
let (writer, _temp_dir) = create_temp_writer(0);
assert_eq!(writer.core_id(), 0);
assert_eq!(writer.position(), 0);
assert_eq!(writer.epoch(), 0);
assert_eq!(writer.sequence(), 0);
}
// Each put returns its start offset and advances position and sequence.
#[test]
fn test_append_put() {
let (mut writer, _temp_dir) = create_temp_writer(0);
let pos = writer.append_put(b"key1", b"value1").unwrap();
assert_eq!(pos, 0);
assert!(writer.position() > 0);
assert_eq!(writer.sequence(), 1);
let pos2 = writer.append_put(b"key2", b"value2").unwrap();
assert!(pos2 > pos);
assert_eq!(writer.sequence(), 2);
}
// A delete is appended at offset 0 of an empty log and advances the position.
#[test]
fn test_append_delete() {
let (mut writer, _temp_dir) = create_temp_writer(1);
let pos = writer.append_delete(b"key1").unwrap();
assert_eq!(pos, 0);
assert!(writer.position() > 0);
}
// `sync` resets the unsynced-entry counter.
#[test]
fn test_sync() {
let (mut writer, _temp_dir) = create_temp_writer(0);
writer.append_put(b"key1", b"value1").unwrap();
assert_eq!(writer.entries_since_sync(), 1);
writer.sync().unwrap();
assert_eq!(writer.entries_since_sync(), 0);
}
// `set_epoch` is reflected by the `epoch` getter.
#[test]
fn test_epoch_setting() {
let (mut writer, _temp_dir) = create_temp_writer(0);
assert_eq!(writer.epoch(), 0);
writer.set_epoch(5);
assert_eq!(writer.epoch(), 5);
}
// Truncating to the end of the first record rewinds the position to it.
#[test]
fn test_truncate() {
let (mut writer, _temp_dir) = create_temp_writer(0);
writer.append_put(b"key1", b"value1").unwrap();
let pos1 = writer.position();
writer.append_put(b"key2", b"value2").unwrap();
let pos2 = writer.position();
assert!(pos2 > pos1);
writer.truncate(pos1).unwrap();
assert_eq!(writer.position(), pos1);
}
// `reset` returns both position and sequence to zero.
#[test]
fn test_reset() {
let (mut writer, _temp_dir) = create_temp_writer(0);
writer.append_put(b"key1", b"value1").unwrap();
writer.append_put(b"key2", b"value2").unwrap();
assert!(writer.position() > 0);
assert_eq!(writer.sequence(), 2);
writer.reset().unwrap();
assert_eq!(writer.position(), 0);
assert_eq!(writer.sequence(), 0);
}
// A checkpoint record is appended at offset 0 of an empty log.
#[test]
fn test_append_checkpoint() {
let (mut writer, _temp_dir) = create_temp_writer(0);
let pos = writer.append_checkpoint(100).unwrap();
assert_eq!(pos, 0);
assert!(writer.position() > 0);
}
// An epoch barrier can be appended after the epoch has been changed.
#[test]
fn test_append_epoch_barrier() {
let (mut writer, _temp_dir) = create_temp_writer(0);
writer.set_epoch(5);
let pos = writer.append_epoch_barrier().unwrap();
assert_eq!(pos, 0);
assert!(writer.position() > 0);
}
// A commit record with one offset and a watermark is appended at offset 0.
#[test]
fn test_append_commit() {
let (mut writer, _temp_dir) = create_temp_writer(0);
#[allow(clippy::disallowed_types)] let mut offsets = std::collections::HashMap::new();
offsets.insert("topic1".to_string(), 100);
let pos = writer.append_commit(offsets, Some(12345)).unwrap();
assert_eq!(pos, 0);
assert!(writer.position() > 0);
}
// `synced_position` only moves on `sync`, not on append.
#[test]
fn test_synced_position_tracks_sync() {
let (mut writer, _temp_dir) = create_temp_writer(0);
assert_eq!(writer.position(), 0);
assert_eq!(writer.synced_position(), 0);
writer.append_put(b"key1", b"value1").unwrap();
assert!(writer.position() > 0);
assert_eq!(writer.synced_position(), 0);
let pos_before_sync = writer.position();
writer.sync().unwrap();
assert_eq!(writer.synced_position(), pos_before_sync);
writer.append_put(b"key2", b"value2").unwrap();
assert!(writer.position() > writer.synced_position());
assert_eq!(writer.synced_position(), pos_before_sync);
}
// The `Debug` output names the type and includes the core id.
#[test]
fn test_debug_format() {
let (writer, _temp_dir) = create_temp_writer(42);
let debug_str = format!("{writer:?}");
assert!(debug_str.contains("CoreWalWriter"));
assert!(debug_str.contains("42"));
}
// `open_at(.., 0)` on a populated, synced file shrinks the file on disk.
#[test]
fn test_open_at() {
let temp_dir = TempDir::new().unwrap();
let path = temp_dir.path().join("wal-0.log");
{
// Scope the first writer so it is dropped before the file is reopened.
let mut writer = CoreWalWriter::new(0, &path).unwrap();
writer.append_put(b"key1", b"value1").unwrap();
writer.append_put(b"key2", b"value2").unwrap();
writer.sync().unwrap();
}
let file_size = std::fs::metadata(&path).unwrap().len();
let writer = CoreWalWriter::open_at(0, &path, 0).unwrap();
assert_eq!(writer.position(), 0);
let new_size = std::fs::metadata(&path).unwrap().len();
assert!(new_size < file_size);
}
}