use super::entry::{serialize_marker_item, Entry};
use crate::{
batch::item::Item as BatchItem, file::fsync_directory, journal::recovery::JournalId,
keyspace::InternalKeyspaceId,
};
use lsm_tree::{CompressionType, SeqNo, ValueType};
use std::{
fs::{File, OpenOptions},
hash::Hasher,
io::{BufWriter, Seek, Write},
path::{Path, PathBuf},
};
/// Size in bytes that a newly created journal file is extended to up front (64 MiB).
pub const PRE_ALLOCATED_BYTES: u64 = 64 << 20;

/// Capacity in bytes requested for a journal writer's `BufWriter` (8 KiB).
pub const JOURNAL_BUFFER_BYTES: usize = 8 << 10;
/// Appends framed entries to a single journal file.
pub struct Writer {
    /// Location of the journal file this writer appends to.
    pub(crate) path: PathBuf,

    /// Buffered handle that every encoded entry is written through.
    file: BufWriter<File>,

    /// Set by the write methods; cleared once `persist` has flushed the buffer.
    is_buffer_dirty: bool,

    /// Scratch space each entry is encoded into before being written; reused across writes.
    buf: Vec<u8>,

    /// Codec applied to values that reach `compression_threshold`.
    compression: CompressionType,

    /// Minimum value length (bytes) for compression; `0` disables compression.
    compression_threshold: usize,
}
/// How far `Writer::persist` pushes buffered journal data toward stable storage.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PersistMode {
    /// Flush the in-process write buffer to the OS without requesting an fsync.
    Buffer,

    /// Flush, then call `File::sync_data` (file contents; metadata might not be synced).
    SyncData,

    /// Flush, then call `File::sync_all` (file contents and metadata).
    SyncAll,
}
impl Writer {
pub fn set_compression(&mut self, comp: CompressionType, threshold: usize) {
self.compression = comp;
self.compression_threshold = threshold;
}
pub fn pos(&mut self) -> crate::Result<u64> {
self.file.stream_position().map_err(Into::into)
}
pub fn len(&self) -> crate::Result<u64> {
Ok(self.file.get_ref().metadata()?.len())
}
pub fn rotate(&mut self) -> crate::Result<(PathBuf, PathBuf)> {
self.persist(PersistMode::SyncAll)?;
log::debug!(
"Sealing active journal at {}, len={}B",
self.path.display(),
self.path
.metadata()
.inspect_err(|e| {
log::error!(
"Failed to get file metadata of journal file at {}: {e:?}",
self.path.display()
);
})?
.len(),
);
let prev_path = self.path.clone();
let folder = self
.path
.parent()
.expect("should have parent")
.to_path_buf();
let Some(basename) = self
.path
.file_name()
.expect("should be valid file name")
.to_str()
.expect("should be valid utf-8")
.strip_suffix(".jnl")
else {
log::error!("Invalid journal file name: {}", self.path.display());
return Err(crate::Error::JournalRecovery(
crate::JournalRecoveryError::InvalidFileName,
));
};
let journal_id = basename.parse::<JournalId>().map_err(|_| {
log::error!("Invalid journal file name: {}", self.path.display());
crate::Error::JournalRecovery(crate::JournalRecoveryError::InvalidFileName)
})?;
let new_path = folder.join(format!("{}.jnl", journal_id + 1));
log::debug!("Rotating active journal to {}", new_path.display());
let comp = self.compression;
let compt = self.compression_threshold;
*self = Self::create_new(new_path.clone())?;
self.set_compression(comp, compt);
fsync_directory(&folder)?;
Ok((prev_path, new_path))
}
pub fn create_new<P: Into<PathBuf>>(path: P) -> crate::Result<Self> {
let path = path.into();
let file = File::create_new(&path).inspect_err(|e| {
log::error!("Failed to create journal file at {}: {e:?}", path.display());
})?;
file.set_len(PRE_ALLOCATED_BYTES).inspect_err(|e| {
log::error!(
"Failed to set journal file size to {PRE_ALLOCATED_BYTES}B at {}: {e:?}",
path.display(),
);
})?;
file.sync_all().inspect_err(|e| {
log::error!("Failed to fsync journal file at {}: {e:?}", path.display());
})?;
Ok(Self {
path,
file: BufWriter::new(file),
buf: Vec::new(),
is_buffer_dirty: false,
compression: CompressionType::None,
compression_threshold: 0,
})
}
pub fn from_file<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
let path = path.as_ref();
if !path.try_exists()? {
let file = OpenOptions::new()
.create_new(true)
.write(true)
.open(path)
.inspect_err(|e| {
log::error!("Failed to create journal file at {}: {e:?}", path.display());
})?;
file.set_len(PRE_ALLOCATED_BYTES).inspect_err(|e| {
log::error!(
"Failed to set journal file size to {PRE_ALLOCATED_BYTES}B at {}: {e:?}",
path.display(),
);
})?;
file.sync_all().inspect_err(|e| {
log::error!("Failed to fsync journal file at {}: {e:?}", path.display());
})?;
return Ok(Self {
path: path.into(),
file: BufWriter::with_capacity(JOURNAL_BUFFER_BYTES, file),
buf: Vec::new(),
is_buffer_dirty: false,
compression: CompressionType::None,
compression_threshold: 0,
});
}
let file = OpenOptions::new()
.append(true)
.open(path)
.inspect_err(|e| {
log::error!("Failed to open journal file at {}: {e:?}", path.display());
})?;
Ok(Self {
path: path.into(),
file: BufWriter::with_capacity(JOURNAL_BUFFER_BYTES, file),
buf: Vec::new(),
is_buffer_dirty: false,
compression: CompressionType::None,
compression_threshold: 0,
})
}
pub(crate) fn persist(&mut self, mode: PersistMode) -> std::io::Result<()> {
log::trace!(
"Persisting journal at {} with mode={mode:?}",
self.path.display(),
);
if self.is_buffer_dirty {
self.file.flush().inspect_err(|e| {
log::error!(
"Failed to flush journal IO buffers at {}: {e:?}",
self.path.display(),
);
})?;
self.is_buffer_dirty = false;
}
match mode {
PersistMode::SyncAll => self.file.get_mut().sync_all().inspect_err(|e| {
log::error!(
"Failed to fsync journal file at {}: {e:?}",
self.path.display(),
);
}),
PersistMode::SyncData => self.file.get_mut().sync_data().inspect_err(|e| {
log::error!(
"Failed to fsyncdata journal file at {}: {e:?}",
self.path.display(),
);
}),
PersistMode::Buffer => Ok(()),
}
}
fn write_start(&mut self, item_count: u32, seqno: SeqNo) -> Result<usize, crate::Error> {
debug_assert!(self.buf.is_empty());
Entry::Start { item_count, seqno }.encode_into(&mut self.buf)?;
self.file.write_all(&self.buf)?;
Ok(self.buf.len())
}
fn write_end(&mut self, checksum: u64) -> Result<usize, crate::Error> {
debug_assert!(self.buf.is_empty());
Entry::End(checksum).encode_into(&mut self.buf)?;
self.file.write_all(&self.buf)?;
Ok(self.buf.len())
}
pub(crate) fn write_raw(
&mut self,
keyspace_id: InternalKeyspaceId,
key: &[u8],
value: &[u8],
value_type: ValueType,
seqno: u64,
) -> crate::Result<usize> {
self.is_buffer_dirty = true;
let mut hasher = xxhash_rust::xxh3::Xxh3::default();
let mut byte_count = 0;
self.buf.clear();
byte_count += self.write_start(1, seqno)?;
self.buf.clear();
serialize_marker_item(
&mut self.buf,
keyspace_id,
key,
value,
value_type,
if self.compression_threshold > 0 && value.len() >= self.compression_threshold {
self.compression
} else {
CompressionType::None
},
)?;
self.file.write_all(&self.buf)?;
hasher.update(&self.buf);
byte_count += self.buf.len();
self.buf.clear();
let checksum = hasher.finish();
byte_count += self.write_end(checksum)?;
Ok(byte_count)
}
pub(crate) fn write_clear(
&mut self,
keyspace_id: InternalKeyspaceId,
seqno: SeqNo,
) -> crate::Result<usize> {
self.is_buffer_dirty = true;
let mut hasher = xxhash_rust::xxh3::Xxh3::default();
let mut byte_count = 0;
self.buf.clear();
byte_count += self.write_start(1, seqno)?;
self.buf.clear();
Entry::Clear { keyspace_id }.encode_into(&mut self.buf)?;
self.file.write_all(&self.buf)?;
hasher.update(&self.buf);
byte_count += self.buf.len();
self.buf.clear();
let checksum = hasher.finish();
byte_count += self.write_end(checksum)?;
Ok(byte_count)
}
pub fn write_batch<'a>(
&mut self,
items: impl Iterator<Item = &'a BatchItem>,
batch_size: usize,
seqno: SeqNo,
) -> crate::Result<usize> {
if batch_size == 0 {
return Ok(0);
}
self.is_buffer_dirty = true;
self.buf.clear();
#[expect(clippy::cast_possible_truncation)]
let item_count = batch_size as u32;
let mut hasher = xxhash_rust::xxh3::Xxh3::default();
let mut byte_count = 0;
byte_count += self.write_start(item_count, seqno)?;
self.buf.clear();
for item in items {
debug_assert!(self.buf.is_empty());
serialize_marker_item(
&mut self.buf,
item.keyspace.id,
&item.key,
&item.value,
item.value_type,
if self.compression_threshold > 0 && item.value.len() >= self.compression_threshold
{
self.compression
} else {
CompressionType::None
},
)?;
self.file.write_all(&self.buf)?;
hasher.update(&self.buf);
byte_count += self.buf.len();
self.buf.clear();
}
let checksum = hasher.finish();
byte_count += self.write_end(checksum)?;
Ok(byte_count)
}
}