use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::Path;
use crate::api::errors::{Error, Result};
#[cfg(test)]
use super::codec::encode_record;
use super::codec::{decode_file_header, encode_file_header, FileHeader, FILE_HEADER_SIZE};
#[cfg(test)]
use super::wal_op::WalOp;
pub const AUTO_FLUSH_THRESHOLD: usize = 64 * 1024;
#[derive(Debug)]
pub struct WalWriter {
file: File,
pending: Vec<u8>,
bytes_written: u64,
header: FileHeader,
}
impl WalWriter {
pub fn create(path: &Path, tree_id: u64) -> Result<Self> {
let mut file = OpenOptions::new().write(true).create_new(true).open(path)?;
let header = FileHeader::now(tree_id);
let mut buf = Vec::with_capacity(FILE_HEADER_SIZE);
encode_file_header(&header, &mut buf);
file.write_all(&buf)?;
file.sync_data()?;
let file = OpenOptions::new().append(true).open(path)?;
Ok(Self {
file,
pending: Vec::with_capacity(4096),
bytes_written: FILE_HEADER_SIZE as u64,
header,
})
}
pub fn open_existing(path: &Path) -> Result<Self> {
let mut header_bytes = [0u8; FILE_HEADER_SIZE];
{
let mut f = File::open(path)?;
use std::io::Read;
f.read_exact(&mut header_bytes)?;
}
let header = decode_file_header(&header_bytes)?;
let file = OpenOptions::new().append(true).open(path)?;
let bytes_written = file.metadata()?.len();
Ok(Self {
file,
pending: Vec::with_capacity(4096),
bytes_written,
header,
})
}
pub fn open_or_create(path: &Path, tree_id: u64) -> Result<Self> {
if path.exists() {
let w = Self::open_existing(path)?;
if w.header.tree_id != tree_id {
return Err(Error::ReplaySanityFailed {
context: "WAL file tree_id mismatch on open",
record_offset: 0,
});
}
Ok(w)
} else {
Self::create(path, tree_id)
}
}
#[cfg(test)]
#[must_use]
pub fn header(&self) -> FileHeader {
self.header
}
#[cfg(test)]
#[must_use]
pub fn bytes_written(&self) -> u64 {
self.bytes_written + self.pending.len() as u64
}
#[must_use]
pub(crate) fn has_records(&self) -> bool {
self.bytes_written + self.pending.len() as u64 > FILE_HEADER_SIZE as u64
}
#[cfg(test)]
pub fn append(&mut self, op: &WalOp, seq: u64) -> Result<()> {
encode_record(op, seq, &mut self.pending);
self.maybe_drain()
}
pub(crate) fn append_encoded(&mut self, record: &[u8]) -> Result<()> {
self.pending.extend_from_slice(record);
self.maybe_drain()
}
fn maybe_drain(&mut self) -> Result<()> {
if self.pending.len() >= AUTO_FLUSH_THRESHOLD {
self.drain_to_os()?;
}
Ok(())
}
pub(crate) fn drain_to_os(&mut self) -> Result<()> {
if self.pending.is_empty() {
return Ok(());
}
self.file.write_all(&self.pending)?;
self.bytes_written += self.pending.len() as u64;
self.pending.clear();
Ok(())
}
pub fn flush(&mut self) -> Result<()> {
self.drain_to_os()?;
self.file.sync_data()?;
Ok(())
}
#[cfg(test)]
pub fn discard_pending(&mut self) {
self.pending.clear();
}
pub fn truncate(&mut self) -> Result<()> {
self.pending.clear();
self.file.set_len(FILE_HEADER_SIZE as u64)?;
self.file.sync_data()?;
self.bytes_written = FILE_HEADER_SIZE as u64;
#[cfg(feature = "tracing")]
tracing::info!(target: "holt::wal", "wal truncated to header-only");
Ok(())
}
}