use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use crate::api::errors::{Error, Result};
use super::codec::{
decode_file_header, encode_erase_record, encode_file_header, encode_insert_record,
encode_record, encode_rename_object_record, FileHeader, FILE_HEADER_SIZE,
};
use super::txn_op::TxnOp;
pub const AUTO_FLUSH_THRESHOLD: usize = 64 * 1024;
#[derive(Debug)]
pub struct WalWriter {
path: PathBuf,
file: File,
pending: Vec<u8>,
bytes_written: u64,
header: FileHeader,
}
impl WalWriter {
pub fn create(path: &Path, tree_id: u64) -> Result<Self> {
let mut file = OpenOptions::new().write(true).create_new(true).open(path)?;
let header = FileHeader::now(tree_id);
let mut buf = Vec::with_capacity(FILE_HEADER_SIZE);
encode_file_header(&header, &mut buf);
file.write_all(&buf)?;
file.sync_data()?;
let file = OpenOptions::new().append(true).open(path)?;
Ok(Self {
path: path.to_path_buf(),
file,
pending: Vec::with_capacity(4096),
bytes_written: FILE_HEADER_SIZE as u64,
header,
})
}
pub fn open_existing(path: &Path) -> Result<Self> {
let mut header_bytes = [0u8; FILE_HEADER_SIZE];
{
let mut f = File::open(path)?;
use std::io::Read;
f.read_exact(&mut header_bytes)?;
}
let header = decode_file_header(&header_bytes)?;
let file = OpenOptions::new().append(true).open(path)?;
let bytes_written = file.metadata()?.len();
Ok(Self {
path: path.to_path_buf(),
file,
pending: Vec::with_capacity(4096),
bytes_written,
header,
})
}
pub fn open_or_create(path: &Path, tree_id: u64) -> Result<Self> {
if path.exists() {
let w = Self::open_existing(path)?;
if w.header.tree_id != tree_id {
return Err(Error::ReplaySanityFailed {
context: "WAL file tree_id mismatch on open",
record_offset: 0,
});
}
Ok(w)
} else {
Self::create(path, tree_id)
}
}
#[must_use]
pub fn header(&self) -> FileHeader {
self.header
}
#[must_use]
pub fn bytes_written(&self) -> u64 {
self.bytes_written + self.pending.len() as u64
}
pub fn append(&mut self, op: &TxnOp, seq: u64) -> Result<()> {
encode_record(op, seq, &mut self.pending)?;
self.maybe_drain()
}
pub fn append_insert(
&mut self,
seq: u64,
tree_id: u64,
key: &[u8],
value: &[u8],
prev_value: Option<&[u8]>,
) -> Result<()> {
encode_insert_record(&mut self.pending, seq, tree_id, key, value, prev_value);
self.maybe_drain()
}
pub fn append_erase(&mut self, seq: u64, tree_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
encode_erase_record(&mut self.pending, seq, tree_id, key, value);
self.maybe_drain()
}
pub fn append_rename_object(
&mut self,
seq: u64,
tree_id: u64,
src_key: &[u8],
dst_key: &[u8],
force: bool,
) -> Result<()> {
encode_rename_object_record(&mut self.pending, seq, tree_id, src_key, dst_key, force);
self.maybe_drain()
}
fn maybe_drain(&mut self) -> Result<()> {
if self.pending.len() >= AUTO_FLUSH_THRESHOLD {
self.drain_to_os()?;
}
Ok(())
}
fn drain_to_os(&mut self) -> Result<()> {
if self.pending.is_empty() {
return Ok(());
}
self.file.write_all(&self.pending)?;
self.bytes_written += self.pending.len() as u64;
self.pending.clear();
Ok(())
}
pub fn flush(&mut self) -> Result<()> {
self.drain_to_os()?;
self.file.sync_data()?;
Ok(())
}
pub fn discard_pending(&mut self) {
self.pending.clear();
}
pub fn truncate(&mut self) -> Result<()> {
self.pending.clear();
let tmp_path = self.path.with_extension("wal.tmp");
{
let mut tmp = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&tmp_path)?;
let header = FileHeader::now(self.header.tree_id);
let mut buf = Vec::with_capacity(FILE_HEADER_SIZE);
encode_file_header(&header, &mut buf);
tmp.write_all(&buf)?;
tmp.sync_data()?;
self.header = header;
}
std::fs::rename(&tmp_path, &self.path)?;
self.file = OpenOptions::new().append(true).open(&self.path)?;
self.bytes_written = FILE_HEADER_SIZE as u64;
Ok(())
}
}