use std::{cell::RefCell, io, path::PathBuf, rc::Rc};
use bincode::Options;
use bytes::{BufMut, BytesMut};
use tempest_io::{Io, IoBuf, OpenOptions, Statx};
use tempest_rt::{
JoinHandle, close_file, list_dir, open_file, read_exact, remove_file, spawn, stat_file,
sync::{
mpsc::{BoundedSender, bounded},
oneshot,
},
sync_file, write_exact,
};
use crate::{bincode_options, utils::ByteSize};
mod config;
mod header;
#[cfg(test)]
mod tests;
pub use config::{JournalConfig, JournalError, Replayable};
pub use header::{EditPrefix, JOURNAL_MAGIC_NUM};
use header::{EDIT_PREFIX_SIZE, JOURNAL_HEADER_SIZE, JournalHeader};
/// A single append request sent from a [`JournalHandle`] to the journal worker task.
struct JournalMessage<T: Replayable> {
/// The edit to append to the journal.
edit: T::Edit,
/// Reply channel for the outcome of the append. If it is dropped without a reply,
/// the waiting handle reports `JournalError::WorkerDied`.
tx: oneshot::Sender<Result<(), JournalError>>,
}
/// Client-side handle to a journal: submits edits to the worker task spawned by
/// [`Journal::new`] and gives read access to the shared in-memory state.
pub struct JournalHandle<T: Replayable> {
/// Bounded channel to the journal worker.
tx: BoundedSender<JournalMessage<T>>,
/// In-memory state shared with the worker, which applies each edit after syncing it.
data: Rc<RefCell<T>>,
}
impl<T: Replayable> JournalHandle<T> {
    /// Sends `edit` to the journal worker and waits for its reply.
    ///
    /// The worker writes and syncs the edit to the journal file before applying it
    /// to the shared state, so a successful return means the edit is on disk and
    /// visible through [`JournalHandle::data`].
    ///
    /// # Errors
    /// Returns [`JournalError::WorkerDied`] if the worker no longer accepts requests
    /// or drops the reply channel without answering; otherwise returns whatever
    /// error the worker reported for this edit.
    pub async fn append(&self, edit: T::Edit) -> Result<(), JournalError> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = JournalMessage { edit, tx: reply_tx };
        if self.tx.clone().send(request).await.is_err() {
            return Err(JournalError::WorkerDied);
        }
        match reply_rx.recv().await {
            Ok(outcome) => outcome,
            Err(_) => Err(JournalError::WorkerDied),
        }
    }

    /// Borrows the current in-memory state.
    ///
    /// The state lives in a `RefCell` that the worker mutably borrows while
    /// applying edits, so holding the returned guard across an `.await` can make
    /// that mutable borrow panic.
    ///
    /// # Panics
    /// Panics if the state is currently mutably borrowed.
    pub fn data(&self) -> std::cell::Ref<'_, T> {
        self.data.borrow()
    }
}
/// Append-only, file-backed journal for a [`Replayable`] state `T`.
///
/// Each journal file starts with a header, followed by a snapshot record of the
/// state and then one record per appended edit. Once the active file reaches
/// `rotation_threshold` bytes, a new file is started from a fresh snapshot and the
/// previous file is removed.
#[derive(Debug)]
pub struct Journal<T: Replayable, I: Io> {
/// Directory containing the journal files.
dir: PathBuf,
/// In-memory state; shared with the `JournalHandle` returned by `new`.
data: Rc<RefCell<T>>,
/// Journal configuration; this file reads `growth_factor` and `growth_baseline`.
config: JournalConfig,
/// Descriptor of the active journal file. `None` before initialization and
/// while `rotate` is swapping files.
fd: Option<I::Fd>,
/// Number of the active journal file (appears in its file name and header).
filenum: u64,
/// Byte offset at which the next record will be written.
write_offset: u64,
/// File size at or above which an append triggers rotation.
rotation_threshold: u64,
}
impl<T: Replayable, I: Io> Journal<T, I> {
/// Returns the location of journal file number `filenum`:
/// `<dir>/<prefix>-<filenum>`, where `<prefix>` is `T::filename_prefix()`.
fn path(&self, filenum: u64) -> PathBuf {
    let file_name = format!("{}-{filenum}", T::filename_prefix());
    self.dir.join(file_name)
}
/// Appends one framed record for `edit` to the end of `scratch`.
///
/// A record is an `EDIT_PREFIX_SIZE`-byte [`EditPrefix`] followed by the
/// bincode-encoded edit. The prefix is computed from the encoded payload, so room
/// for it is reserved first and filled in after serialization. Bytes already in
/// `scratch` are left untouched.
///
/// Returns the number of bytes appended (prefix + payload).
///
/// # Errors
/// Returns the serialization error if `edit` cannot be encoded. In that case
/// `scratch` may still contain the reserved prefix bytes and a partial payload.
fn write_edit(scratch: &mut BytesMut, edit: &T::Edit) -> Result<u64, JournalError> {
    let record_start = scratch.len();
    let payload_start = record_start + EDIT_PREFIX_SIZE;
    scratch.put_bytes(0, EDIT_PREFIX_SIZE);
    bincode_options().serialize_into(scratch.writer(), edit)?;
    let prefix = EditPrefix::new(&scratch[payload_start..]);
    scratch[record_start..payload_start].copy_from_slice(&prefix.encode());
    Ok((scratch.len() - record_start) as u64)
}
/// Creates journal file `filenum`, writes its header plus a snapshot of the current
/// state as the first record, syncs it, and makes it the active file.
///
/// When `old_fd` is provided (rotation), the previous file is identified by the
/// current `self.filenum`. It is closed and removed only after the new file has been
/// synced and installed as the active file. Failures while closing or removing the
/// old file are logged rather than returned, because the new file already holds the
/// complete state at that point.
///
/// The new rotation threshold is `initial_size * growth_factor`, but never less than
/// `growth_baseline`.
///
/// NOTE(review): the containing directory is not fsynced, so the new file's
/// directory entry may not be durable by the time the old file is removed.
///
/// # Errors
/// Returns an error if the new file cannot be opened, written or synced. In that
/// case the new file is closed and removed (best effort) and `old_fd` is reinstated
/// as the active descriptor, so it is not leaked and the journal keeps its previous
/// file.
async fn create_file(
    &mut self,
    filenum: u64,
    old_fd: Option<I::Fd>,
) -> Result<(), JournalError> {
    let path = self.path(filenum);
    debug!(filenum, ?path, "creating new journal file");
    let fd = match open_file::<I>(
        path.clone(),
        OpenOptions::new().create(true).write(true).truncate(true),
    )
    .await
    {
        Ok(fd) => fd,
        Err(err) => {
            self.fd = old_fd;
            return Err(err.into());
        }
    };
    let initial_size = match self.write_initial_records(fd, filenum).await {
        Ok(size) => size,
        Err(err) => {
            // A partially written file may still carry a valid header with the highest
            // filenum, which `init` would then select on the next start. Remove it.
            close_file::<I>(fd).await.ok();
            if let Err(e) = remove_file::<I>(path).await {
                warn!("could not remove incomplete journal file: {}", e);
            }
            self.fd = old_fd;
            return Err(err);
        }
    };

    // The new file is durable: install it before touching the old one.
    let old_filenum = self.filenum;
    let factored = initial_size * self.config.growth_factor;
    self.fd = Some(fd);
    self.filenum = filenum;
    self.write_offset = initial_size;
    self.rotation_threshold = factored.max(self.config.growth_baseline);

    if let Some(old_fd) = old_fd {
        if let Err(e) = close_file::<I>(old_fd).await {
            warn!("could not close old journal file: {}", e);
        }
        if let Err(e) = remove_file::<I>(self.path(old_filenum)).await {
            warn!("could not remove old journal file: {}", e);
        }
    }
    Ok(())
}

/// Writes the header for `filenum`, followed by a snapshot record of the current
/// state, at offset 0 of `fd`, then syncs the file. Returns the number of bytes
/// written.
async fn write_initial_records(&self, fd: I::Fd, filenum: u64) -> Result<u64, JournalError> {
    let mut scratch = BytesMut::new();
    let header = JournalHeader::new(filenum).encode();
    scratch.put_slice(&header);
    Self::write_edit(&mut scratch, &self.data.borrow().snapshot())?;
    let initial_size = scratch.len() as u64;
    let (result, _) = write_exact::<_, I>(fd, scratch, 0).await;
    result?;
    sync_file::<I>(fd).await?;
    Ok(initial_size)
}
/// Replays the records of the active journal file into the in-memory state.
///
/// Records are read one after another, from just after the header up to
/// `file_size`. Each record is an [`EditPrefix`] followed by a bincode-encoded edit.
/// The prefix checksum is verified before the edit is decoded and applied. On
/// success, `write_offset` points just past the last record.
///
/// The first record of a journal file is the snapshot written by `create_file`, so
/// its end offset plays the role of `create_file`'s initial size. The rotation
/// threshold is derived from it with the same formula, which keeps rotation
/// behavior consistent across restarts instead of leaving the threshold at 0.
///
/// # Errors
/// - Returns an I/O error if a read fails.
/// - Returns `InvalidData` if a record's declared length runs past `file_size` or
///   its checksum does not match.
/// - Returns a decode error if an edit cannot be deserialized.
///
/// A record truncated by a crash during an append is therefore reported as an
/// error, not skipped.
///
/// # Panics
/// Panics if called before `self.fd` is set.
async fn replay(&mut self, file_size: u64) -> Result<(), JournalError> {
    let fd = self.fd.expect("replay called before fd is set");
    let mut read_offset = JOURNAL_HEADER_SIZE as u64;
    let mut seen_snapshot = false;
    while read_offset < file_size {
        let mut prefix_buf = BytesMut::zeroed(EDIT_PREFIX_SIZE);
        let (result, slice) =
            read_exact::<_, I>(fd, prefix_buf.slice(0..EDIT_PREFIX_SIZE), read_offset).await;
        prefix_buf = slice.into_inner();
        result?;
        let prefix = EditPrefix::decode_from_slice(&prefix_buf);
        read_offset += EDIT_PREFIX_SIZE as u64;
        let edit_len = prefix.len() as usize;
        // Reject impossible lengths before allocating. Otherwise a corrupted or torn
        // prefix could request an arbitrarily large buffer.
        if edit_len as u64 > file_size.saturating_sub(read_offset) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "journal record length exceeds remaining file size: potential corruption",
            )
            .into());
        }
        let mut edit_buf = BytesMut::zeroed(edit_len);
        let (result, slice) =
            read_exact::<_, I>(fd, edit_buf.slice(0..edit_len), read_offset).await;
        edit_buf = slice.into_inner();
        result?;
        if !prefix.is_valid(&edit_buf) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "journal record prefix checksum mismatch: potential corruption",
            )
            .into());
        }
        let edit: T::Edit = bincode_options().deserialize(&edit_buf)?;
        self.data.borrow_mut().apply(edit);
        read_offset += edit_len as u64;
        if !seen_snapshot {
            seen_snapshot = true;
            let factored = read_offset * self.config.growth_factor;
            self.rotation_threshold = factored.max(self.config.growth_baseline);
        }
    }
    self.write_offset = read_offset;
    Ok(())
}
/// Durably appends `edit` to the active journal file and applies it to the
/// in-memory state, rotating to a fresh file once the size threshold is reached.
///
/// The record is written at `write_offset` and the file is synced before the edit is
/// applied, so the in-memory state never runs ahead of what is on disk.
///
/// # Errors
/// Propagates serialization, write and sync errors; in those cases the edit is not
/// applied and `write_offset` is unchanged. A rotation failure is also returned as
/// an error, even though by then the edit has already been written, synced and
/// applied.
///
/// # Panics
/// Panics if called before the journal has an active file descriptor.
async fn append(&mut self, edit: T::Edit) -> Result<(), JournalError> {
    let fd = self.fd.expect("append called before init");
    let mut record = BytesMut::new();
    let record_len = Self::write_edit(&mut record, &edit)?;
    let (write_result, _) = write_exact::<_, I>(fd, record, self.write_offset).await;
    write_result?;
    sync_file::<I>(fd).await?;
    self.data.borrow_mut().apply(edit);
    self.write_offset += record_len;
    trace!(size=?ByteSize(record_len), "wrote edit");
    if self.write_offset < self.rotation_threshold {
        return Ok(());
    }
    debug!(
        size = ?ByteSize(self.write_offset),
        threshold = ?ByteSize(self.rotation_threshold),
        "journal size exceeds rotation threshold, rotating file",
    );
    self.rotate().await
}
/// Switches to a new journal file numbered `filenum + 1`, seeded with a snapshot of
/// the current state, handing the current descriptor to `create_file` so the
/// previous file can be retired.
///
/// # Errors
/// Returns any error reported by `create_file`.
pub async fn rotate(&mut self) -> Result<(), JournalError> {
    debug!("rotating journal file");
    let next_filenum = self.filenum + 1;
    let retiring_fd = self.fd.take();
    self.create_file(next_filenum, retiring_fd).await
}
/// Consumes the journal and closes its active file, if it has one.
///
/// # Errors
/// Returns the error from closing the file descriptor.
pub async fn close(mut self) -> Result<(), JournalError> {
    let Some(fd) = self.fd.take() else {
        return Ok(());
    };
    close_file::<I>(fd).await?;
    Ok(())
}
/// Opens the newest journal file in `dir` and replays it, or creates file 0 when no
/// usable journal file exists.
///
/// Every non-directory entry in `dir` is a candidate. Its header is read and
/// decoded, and the candidate with the highest header `filenum` wins. Candidates
/// that cannot be opened, read or decoded are skipped with a warning.
///
/// NOTE(review): candidates are not filtered by `T::filename_prefix()`. If several
/// journal types share `dir`, a sibling journal's file with a valid header could be
/// selected. Confirm that directories are never shared.
///
/// # Errors
/// Returns an error if:
/// - `dir` cannot be listed;
/// - the winning file cannot be opened or stat'ed (its descriptor is closed first);
/// - replay fails;
/// - creating file 0 fails.
async fn init(&mut self) -> Result<(), JournalError> {
    let candidates = list_dir::<I>(self.dir.clone())?
        .into_iter()
        .filter(|e| !e.is_dir)
        .map(|e| e.path);
    let mut winner: Option<(u64, PathBuf)> = None;
    for path in candidates {
        let fd = match open_file::<I>(path.clone(), OpenOptions::new().read(true)).await {
            Ok(fd) => fd,
            Err(err) => {
                warn!("could not open journal candidate (skipping): {err}");
                continue;
            }
        };
        let mut scratch = BytesMut::zeroed(JOURNAL_HEADER_SIZE);
        let (result, slice) =
            read_exact::<_, I>(fd, scratch.slice(0..JOURNAL_HEADER_SIZE), 0).await;
        scratch = slice.into_inner();
        // Read-only probe: a failed close cannot lose data, so it is ignored.
        close_file::<I>(fd).await.ok();
        if let Err(err) = result {
            warn!("could not read journal candidate header (skipping): {err}");
            continue;
        }
        match JournalHeader::decode_from_slice(&scratch) {
            Ok(header) => {
                if winner.as_ref().is_none_or(|(n, _)| header.filenum > *n) {
                    winner = Some((header.filenum, path));
                }
            }
            Err(err) => {
                warn!("could not decode journal candidate (skipping): {err}");
            }
        }
    }

    let Some((filenum, path)) = winner else {
        return self.create_file(0, None).await;
    };
    let fd = open_file::<I>(path, OpenOptions::new().read(true).write(true)).await?;
    let file_size = match stat_file::<I>(fd).await {
        Ok(stat) => stat.stx_size(),
        Err(err) => {
            // The descriptor is not yet owned by `self`; release it here so it isn't leaked.
            close_file::<I>(fd).await.ok();
            return Err(err.into());
        }
    };
    self.fd = Some(fd);
    self.filenum = filenum;
    self.write_offset = JOURNAL_HEADER_SIZE as u64;
    if file_size > JOURNAL_HEADER_SIZE as u64 {
        self.replay(file_size).await?;
    }
    Ok(())
}
pub async fn new(
dir: PathBuf,
config: JournalConfig,
) -> Result<(JournalHandle<T>, JoinHandle<()>), JournalError> {
let data = Rc::new(RefCell::new(T::initial()));
let mut instance = Self {
dir,
data: data.clone(),
config,
fd: None,
filenum: 0,
write_offset: 0,
rotation_threshold: 0,
};
instance.init().await?;
let (tx, mut rx) = bounded(256);
let journal_handle = JournalHandle { tx, data };
let join_handle = spawn(async move {
loop {
match rx.recv().await {
Ok(msg) => {
let result = instance.append(msg.edit).await;
if let Err(err) = &result {
error!("failed to append edit: {err}");
break;
}
let _ = msg.tx.send(result);
}
Err(_) => break,
}
}
if let Err(err) = instance.close().await {
error!("failed to close journal: {err}");
}
});
Ok((journal_handle, join_handle))
}
}