use std::{
collections::VecDeque,
future::poll_fn,
io,
path::{Path, PathBuf},
pin::Pin,
rc::Rc,
task::Poll,
};
use bytes::BytesMut;
use derive_more::{Display, Error, From};
use tempest_core::{
journal::{Journal, JournalError, JournalHandle},
utils::ByteSize,
};
use tempest_io::{Io, IoBuf, OpenOptions, Statx};
use tempest_rt::{
CloseFile, JoinHandle, OpenFile, SyncFile, WriteExact, close_file, open_file, read_exact,
stat_file, sync::oneshot, sync_file, write_exact, yield_now,
};
use crate::{
config::WalConfig,
wal::{
data::{WalData, WalDataEdit},
format::{WAL_HEADER_SIZE, WAL_RECORD_PREFIX_SIZE, WalHeader, WalRecordPrefix},
},
};
mod data;
mod format;
#[cfg(test)]
mod tests;
/// Builds the on-disk location of WAL file number `filenum`:
/// `<dir>/files/<filenum>.wal`.
fn wal_file_path(dir: impl AsRef<Path>, filenum: u64) -> PathBuf {
    let mut path = dir.as_ref().join("files");
    path.push(format!("{filenum}.wal"));
    path
}
/// Errors reported by the WAL.
///
/// The payloads are held behind `Rc` so that the enum can derive `Clone`: a single failure is
/// cloned and handed to every outstanding acknowledgement.
#[derive(Debug, Clone, Display, From, Error)]
pub enum WalError {
/// An I/O operation performed by the WAL failed.
#[display("io error: {}", _0)]
Io(Rc<io::Error>),
/// An operation on the WAL's journal failed.
#[display("journal error: {}", _0)]
Journal(Rc<JournalError>),
}
impl From<io::Error> for WalError {
fn from(e: io::Error) -> Self {
Self::Io(Rc::new(e))
}
}
impl From<JournalError> for WalError {
fn from(e: JournalError) -> Self {
Self::Journal(Rc::new(e))
}
}
/// Outcome delivered to the caller for one appended record.
pub(crate) type AckResult = Result<(), WalError>;
/// Receiving half of the one-shot channel handed back by `Wal::append`; it yields the
/// record's `AckResult`.
pub(crate) type WalAck = oneshot::Receiver<AckResult>;
/// Health of the WAL as tracked by `Wal::tick`.
#[derive(Debug)]
enum WalState {
/// No error has been recorded; appends are accepted.
Active,
/// An error was recorded. New appends are rejected with a clone of this error while `tick`
/// keeps polling the work that was already in flight.
Draining(WalError),
/// Terminal state: `tick` returns a clone of this error immediately.
Drained(WalError),
}
/// Progress of an in-flight switch to a new WAL file.
///
/// `Wal::tick` advances the variants in declaration order:
/// `Opening` -> `WritingHeader` -> `NeedsJournalWrite` -> `Ready`.
enum Rotating<I: Io> {
/// The new file is being opened.
Opening {
future: OpenFile<I>,
filenum: u64,
},
/// The new file is open and its header is being written at offset 0.
WritingHeader {
future: WriteExact<BytesMut, I>,
filenum: u64,
new_fd: I::Fd,
},
/// The header is written; `edit` still has to be appended to the journal. `tick` does this
/// after its poll phase because the append is awaited.
NeedsJournalWrite {
edit: WalDataEdit,
new_fd: I::Fd,
new_write_offset: u64,
},
/// The journal knows about the new file; `tick` switches over to `new_fd` once the writes
/// issued against the current file have left `pending`.
Ready {
new_fd: I::Fd,
new_write_offset: u64,
},
}
/// Write-ahead log driven by repeated calls to `tick`.
///
/// A record flows `queued` -> `pending` (write issued) -> `syncing` (write completed, fsync in
/// flight) and is acknowledged when that fsync completes.
pub(crate) struct Wal<I: Io> {
// Root directory; WAL files are located through `wal_file_path(dir, filenum)`.
dir: PathBuf,
// Journal of `WalData`; `add_file` edits are appended when a WAL file is created.
journal: JournalHandle<WalData>,
// Task returned alongside the journal handle; awaited in `close` after the handle is dropped.
journal_task: JoinHandle<()>,
state: WalState,
// File currently receiving writes.
fd: I::Fd,
// Offset in `fd` at which the next write will be placed.
write_offset: u64,
// Framed records accepted by `append` whose write has not been issued yet.
queued: VecDeque<(BytesMut, oneshot::Sender<AckResult>)>,
// Issued writes in submission order: the write future, the record's ack sender, and a flag
// that is set once the write future has completed.
pending: VecDeque<(WriteExact<BytesMut, I>, oneshot::Sender<AckResult>, bool)>,
// Ack senders of completed writes that are waiting for `fsync` to finish.
syncing: Vec<oneshot::Sender<AckResult>>,
// The in-flight fsync of `fd`, if any; a new one is only started while this is `None`.
fsync: Option<SyncFile<I>>,
// Rotation in progress, if any.
rotating: Option<Rotating<I>>,
// Close operations for files replaced by a rotation; polled by `tick` until they finish.
closing_fds: Vec<CloseFile<I>>,
config: WalConfig,
}
impl<I: Io> Wal<I> {
/// Opens the WAL stored under `dir`, replaying existing records through `on_record`.
///
/// * If the journal lists no live files, a new file is created, its header is written and the
///   file is then registered in the journal.
/// * Otherwise every live file is replayed in ascending file-number order. Replay of a file
///   stops at the first record that cannot be read in full or whose prefix does not validate;
///   the highest-numbered file stays open and writing resumes right after its last valid
///   record.
///
/// # Errors
/// Returns an error if the journal cannot be opened or updated, if a file cannot be opened,
/// stat'ed or closed, or if a file's header is unreadable or names a different file number.
pub(crate) async fn init(
    dir: PathBuf,
    config: WalConfig,
    mut on_record: impl FnMut(BytesMut),
) -> Result<Self, WalError> {
    let (journal, journal_task) =
        Journal::<WalData, I>::new(dir.join("journal"), config.journal.clone()).await?;
    let (fd, write_offset) = if journal.data().live_files.is_empty() {
        let filenum = journal.data().next_filenum;
        let path = wal_file_path(&dir, filenum);
        debug!(filenum, ?path, "creating WAL file");
        let fd = open_file::<I>(path, OpenOptions::new().create(true).write(true)).await?;
        // Write the header before registering the file (the same order rotation uses), so the
        // journal is not updated to reference a file whose header write has not completed.
        let header = WalHeader::new(filenum);
        let mut header_buf = BytesMut::with_capacity(WAL_HEADER_SIZE);
        header_buf.extend_from_slice(&header.encode());
        write_exact::<_, I>(fd, header_buf, 0).await.0?;
        journal.append(WalDataEdit::add_file(filenum)).await?;
        (fd, WAL_HEADER_SIZE as u64)
    } else {
        let mut filenums: Vec<u64> = journal.data().live_files.iter().cloned().collect();
        filenums.sort_unstable();
        let last_filenum = *filenums
            .last()
            .expect("live_files was checked to be non-empty");
        debug!(
            file_count = filenums.len(),
            filenums = ?filenums,
            "recovering WAL"
        );
        let mut last_fd = None;
        let mut last_write_offset = WAL_HEADER_SIZE as u64;
        for filenum in filenums {
            let is_last = filenum == last_filenum;
            let path = wal_file_path(&dir, filenum);
            let fd =
                open_file::<I>(path.clone(), OpenOptions::new().read(true).write(true)).await?;
            let file_size = stat_file::<I>(fd).await?.stx_size();
            debug!(filenum, file_size, ?path, "replaying WAL file");

            // Validate the header: it must decode and must name this very file.
            let mut header_buf = BytesMut::zeroed(WAL_HEADER_SIZE);
            let (res, slice) =
                read_exact::<_, I>(fd, header_buf.slice(0..WAL_HEADER_SIZE), 0).await;
            header_buf = slice.into_inner();
            res?;
            let header = WalHeader::decode(header_buf[..WAL_HEADER_SIZE].try_into().unwrap())
                .map_err(WalError::from)?;
            if header.filenum != filenum {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!(
                        "WAL header filenum mismatch: expected {}, got {}",
                        filenum, header.filenum
                    ),
                )
                .into());
            }

            let mut read_offset = WAL_HEADER_SIZE as u64;
            let mut record_count = 0u64;
            while read_offset < file_size {
                let mut prefix_buf = BytesMut::zeroed(WAL_RECORD_PREFIX_SIZE);
                let (res, slice) = read_exact::<_, I>(
                    fd,
                    prefix_buf.slice(0..WAL_RECORD_PREFIX_SIZE),
                    read_offset,
                )
                .await;
                prefix_buf = slice.into_inner();
                if res.is_err() {
                    warn!(
                        filenum,
                        read_offset, "failed to read record prefix, stopping replay"
                    );
                    break;
                }
                let prefix = WalRecordPrefix::decode(
                    prefix_buf[..WAL_RECORD_PREFIX_SIZE].try_into().unwrap(),
                );
                let data_len = prefix.len as usize;
                // The prefix has not been validated yet, so its length may be garbage (torn or
                // corrupt tail). Refuse lengths that cannot fit in the file instead of
                // allocating a buffer of that size only for the read to fail.
                let record_end = read_offset
                    .saturating_add(WAL_RECORD_PREFIX_SIZE as u64)
                    .saturating_add(data_len as u64);
                if record_end > file_size {
                    warn!(
                        filenum,
                        read_offset, "record extends past end of file, stopping replay"
                    );
                    break;
                }
                let mut data_buf = BytesMut::zeroed(data_len);
                let (res, slice) = read_exact::<_, I>(
                    fd,
                    data_buf.slice(0..data_len),
                    read_offset + WAL_RECORD_PREFIX_SIZE as u64,
                )
                .await;
                data_buf = slice.into_inner();
                if res.is_err() {
                    warn!(
                        filenum,
                        read_offset, "failed to read record data, stopping replay"
                    );
                    break;
                }
                if prefix.is_valid_record(&data_buf) {
                    trace!(filenum, read_offset, size = data_len, "replayed record");
                    on_record(data_buf);
                    read_offset = record_end;
                    record_count += 1;
                } else {
                    warn!(
                        filenum,
                        read_offset, "record checksum mismatch, stopping replay"
                    );
                    break;
                }
            }
            debug!(
                filenum,
                record_count,
                write_offset = read_offset,
                "finished replaying WAL file"
            );
            if is_last {
                // Keep the newest file open; new records go right after the last valid one.
                last_fd = Some(fd);
                last_write_offset = read_offset;
            } else {
                close_file::<I>(fd).await?;
            }
        }
        debug!(write_offset = last_write_offset, "WAL recovery complete");
        (
            last_fd.expect("the highest-numbered live file is always visited"),
            last_write_offset,
        )
    };
    Ok(Self {
        dir,
        journal,
        journal_task,
        state: WalState::Active,
        fd,
        write_offset,
        queued: VecDeque::new(),
        pending: VecDeque::new(),
        syncing: Vec::new(),
        fsync: None,
        rotating: None,
        closing_fds: Vec::new(),
        config,
    })
}
/// Reports whether an error has already been recorded, i.e. the state is anything but
/// `Active`.
fn has_crashed(&self) -> bool {
    match self.state {
        WalState::Active => false,
        WalState::Draining(_) | WalState::Drained(_) => true,
    }
}
/// Delivers a clone of `result` to every sender in `syncing`, leaving the list empty.
/// A dropped receiver is only logged.
fn drain_syncing(&mut self, result: AckResult) {
    self.syncing.drain(..).for_each(|ack_tx| {
        if ack_tx.send(result.clone()).is_err() {
            warn!("wal write acked, but ack receiver dropped")
        }
    });
}
/// Returns `true` when there is nothing left for `tick` to drive: no queued or in-flight
/// writes, no fsync, no rotation and no file still being closed.
pub(crate) fn is_idle(&self) -> bool {
    let has_outstanding_work = !self.queued.is_empty()
        || !self.pending.is_empty()
        || self.fsync.is_some()
        || self.rotating.is_some()
        || !self.closing_fds.is_empty();
    !has_outstanding_work
}
pub(crate) async fn tick(&mut self) -> Result<(), WalError> {
if let WalState::Drained(err) = &self.state {
return Err(err.clone());
}
poll_fn(|cx| {
if let Some(fsync) = self.fsync.as_mut() {
match Pin::new(fsync).poll(cx) {
Poll::Ready(Err(e)) => {
let err = WalError::Io(Rc::new(e));
self.drain_syncing(Err(err.clone()));
if !self.has_crashed() {
self.state = WalState::Draining(err)
} else {
error!("fsync failed while WAL is already crashing: {}", err)
}
self.fsync = None;
}
Poll::Ready(Ok(())) => {
let result = if let WalState::Draining(e) = &self.state {
Err(e.clone())
} else {
Ok(())
};
self.drain_syncing(result);
self.fsync = None;
}
Poll::Pending => {}
}
}
self.closing_fds.retain_mut(|fut| {
match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(())) => false,
Poll::Ready(Err(e)) => {
warn!("failed to close old WAL file: {}", e);
false
}
Poll::Pending => true,
}
});
let mut i = 0;
while i < self.pending.len() {
let (write, _, finished) = &mut self.pending[i];
if *finished {
i += 1;
continue;
}
match Pin::new(write).poll(cx) {
Poll::Ready((res, _)) => {
*finished = true;
if let Err(e) = res {
if !self.has_crashed() {
let err = WalError::Io(Rc::new(e));
self.state = WalState::Draining(err);
} else {
error!("write failed while WAL is already crashing: {}", e)
}
} else {
trace!(pos = i, "write completed");
}
}
Poll::Pending => {}
}
i += 1;
}
if let WalState::Draining(err) = &self.state {
for (_, tx) in self.queued.drain(..) {
let _ = tx.send(Err(err.clone()));
}
if self.pending.is_empty() && self.fsync.is_none() {
let err = err.clone();
self.state = WalState::Drained(err.clone());
Poll::Ready(Err(err.clone()))
} else {
Poll::Ready(Ok(()))
}
} else {
let ready_for_sync = self
.pending
.iter()
.take_while(|(_, _, finished)| *finished)
.count();
if self.fsync.is_none() && ready_for_sync > 0 {
trace!(count = ready_for_sync, "syncing completed writes");
self.syncing
.extend(self.pending.drain(..ready_for_sync).map(|(_, tx, _)| tx));
self.fsync = Some(sync_file(self.fd));
}
match self.rotating.as_mut() {
Some(Rotating::Opening { future, filenum }) => {
match Pin::new(future).poll(cx) {
Poll::Ready(Ok(new_fd)) => {
let filenum = *filenum;
trace!(filenum, "rotation: file opened, writing header");
let header = WalHeader::new(filenum);
let mut buf = BytesMut::with_capacity(WAL_HEADER_SIZE);
buf.extend_from_slice(&header.encode());
self.rotating = Some(Rotating::WritingHeader {
future: write_exact::<_, I>(new_fd, buf, 0),
filenum,
new_fd,
});
}
Poll::Ready(Err(e)) => {
let err = WalError::from(e);
error!(filenum, %err, "rotation failed: could not open new WAL file");
if !self.has_crashed() {
self.state = WalState::Draining(err);
}
self.rotating = None;
}
Poll::Pending => {}
}
}
Some(Rotating::WritingHeader { future, filenum, new_fd }) => {
match Pin::new(future).poll(cx) {
Poll::Ready((Ok(()), _)) => {
let filenum = *filenum;
let new_fd = *new_fd;
trace!(filenum, "rotation: header written, updating journal");
self.rotating = Some(Rotating::NeedsJournalWrite {
edit: WalDataEdit::add_file(filenum),
new_fd,
new_write_offset: WAL_HEADER_SIZE as u64,
});
}
Poll::Ready((Err(e), _)) => {
let err = WalError::from(e);
error!(filenum, %err, "rotation failed: could not write WAL header");
if !self.has_crashed() {
self.state = WalState::Draining(err);
}
self.rotating = None;
}
Poll::Pending => {}
}
}
Some(Rotating::Ready { new_fd, new_write_offset }) => {
if self.pending.is_empty() {
let old_fd = self.fd;
self.fd = *new_fd;
self.write_offset = *new_write_offset;
self.rotating = None;
self.closing_fds.push(close_file(old_fd));
debug!("rotation complete, closing old WAL file asynchronously");
}
}
_ => {}
}
if self.rotating.is_none() {
if self.write_offset >= self.config.rotate_file_size_threshold {
let filenum = self.journal.data().next_filenum + 1;
let path = wal_file_path(&self.dir, filenum);
debug!(filenum, write_offset = self.write_offset, threshold = self.config.rotate_file_size_threshold, "rotating WAL file");
self.rotating = Some(Rotating::Opening {
future: open_file::<I>(path, OpenOptions::new().write(true).create(true)),
filenum,
});
} else {
let write_slots_available =
self.config.max_concurrent_writes - self.pending.len();
let ready_for_write = self.queued.len().min(write_slots_available);
if ready_for_write > 0 {
trace!(count = ready_for_write, "moving queued writes into pending");
}
for (buf, tx) in self.queued.drain(..ready_for_write) {
let write_offset = self.write_offset;
self.write_offset += buf.len() as u64;
trace!(write_offset, size=?ByteSize(buf.len() as u64), "starting write");
let write = write_exact::<_, I>(self.fd, buf, write_offset);
self.pending.push_back((write, tx, false));
}
}
}
Poll::Ready(Ok(()))
}
})
.await?;
if matches!(self.rotating, Some(Rotating::NeedsJournalWrite { .. })) {
let Some(Rotating::NeedsJournalWrite {
edit,
new_fd,
new_write_offset,
}) = self.rotating.take()
else {
unreachable!()
};
trace!("rotation: writing journal entry");
match self.journal.append(edit).await {
Ok(()) => {
trace!("rotation: journal updated, waiting for pending writes to drain");
self.rotating = Some(Rotating::Ready {
new_fd,
new_write_offset,
});
}
Err(e) => {
let err = WalError::from(e);
error!(%err, "rotation failed: could not update journal");
if !self.has_crashed() {
self.state = WalState::Draining(err);
}
}
}
}
Ok(())
}
pub(crate) async fn close(mut self) -> Result<(), WalError> {
debug!("closing WAL");
loop {
let idle = self.queued.is_empty()
&& self.pending.is_empty()
&& self.fsync.is_none()
&& self.rotating.is_none()
&& self.closing_fds.is_empty();
if idle {
break;
}
match self.tick().await {
Ok(()) => {}
Err(_) => break,
}
yield_now().await;
}
close_file::<I>(self.fd).await?;
drop(self.journal); self.journal_task.await;
debug!("WAL closed");
Ok(())
}
/// Queues `record` for writing and returns the receiver on which its outcome is delivered.
///
/// The record is framed as an encoded `WalRecordPrefix` followed by the payload. Nothing is
/// written here; `tick` issues the write. If an error has already been recorded, the record
/// is not queued and the receiver is resolved with a clone of that error right away.
pub(crate) fn append(&mut self, record: &[u8]) -> WalAck {
    let (ack_tx, ack_rx) = oneshot::channel();
    match &self.state {
        WalState::Draining(err) | WalState::Drained(err) => {
            trace!("appended after WAL crashed; short circuiting, pushing error");
            let _ = ack_tx.send(Err(err.clone()));
        }
        WalState::Active => {
            let encoded_prefix = WalRecordPrefix::new(record).encode();
            let mut framed = BytesMut::with_capacity(record.len() + WAL_RECORD_PREFIX_SIZE);
            framed.extend_from_slice(&encoded_prefix);
            framed.extend_from_slice(record);
            self.queued.push_back((framed, ack_tx));
        }
    }
    ack_rx
}
}