use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use crate::api::errors::{Error, Result};
use super::ring::{ReserveTicket, WalRing};
use super::writer::WalWriter;
/// Point-in-time snapshot of journal counters, as returned by `Journal::stats`.
///
/// The `*_work` fields are record counts; when the WAL already held records at
/// open time they are offset by one (see `Journal::open_or_create`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub(crate) struct JournalStats {
    /// Number of records accepted by `Journal::submit`.
    pub(crate) appends: u64,
    /// Number of flusher passes that copied bytes from the ring into the writer.
    pub(crate) batches: u64,
    /// Number of successful writer flushes (fsyncs) performed by the flusher.
    pub(crate) syncs: u64,
    /// Records queued so far (value of the `queued` counter).
    pub(crate) queued_work: u64,
    /// Records copied into the WAL writer (value of the `written` counter).
    pub(crate) written_work: u64,
    /// Records known to be synced (value of the `flushed` counter).
    pub(crate) flushed_work: u64,
    /// `queued` value captured at the last successful truncate.
    pub(crate) checkpointed_work: u64,
    /// `queued_work - flushed_work`, saturating at zero.
    pub(crate) pending_work: u64,
    /// `queued_work - checkpointed_work`, saturating at zero.
    pub(crate) checkpoint_debt: u64,
}
/// Byte capacity of the in-memory WAL ring (16 MiB). This is also the largest
/// single record `Journal::submit` accepts.
const RING_CAPACITY_BYTES: usize = 16 << 20;
/// How long the flusher waits for a command before running another drain pass.
/// Submitters waiting for ring space use four times this as their wait timeout.
const FLUSH_POLL: Duration = Duration::from_nanos(50_000);
/// Maximum number of buffers kept in the record-buffer recycle pool.
const RECORD_BUFFER_POOL_LIMIT: usize = 1 << 10;
/// Largest buffer capacity (64 KiB) the pool will retain or hand out.
const RECORD_BUFFER_RETAIN_MAX: usize = 64 << 10;
/// Commands sent from `Journal` / `Shared` to the flusher thread.
enum Control {
    /// Wake the flusher so it drains (and syncs if requested) right away.
    Flush,
    /// Drain the ring, truncate the WAL, and report the outcome on the sender.
    Truncate(Sender<Result<()>>),
    /// Perform a final drain and exit the flusher loop.
    Stop,
}
/// State shared by `Journal`, outstanding `JournalAck`s, and the flusher thread.
struct Shared {
    /// In-memory ring that submitters fill and the flusher copies into `writer`.
    ring: WalRing,
    /// On-disk WAL writer; locked by the flusher while appending/flushing and
    /// by truncation.
    writer: Mutex<WalWriter>,
    /// 1 when the WAL already contained records at open time, otherwise 0.
    record_base: u64,
    /// Records accepted by `Journal::submit` (starts at `record_base`).
    queued: AtomicU64,
    /// Records copied from the ring into `writer` (not necessarily synced).
    written: AtomicU64,
    /// Records known to be synced; `flush_to` waiters block until it reaches
    /// their target.
    flushed: AtomicU64,
    /// `queued` value captured at the last successful truncate.
    checkpointed: AtomicU64,
    /// Highest `flushed` value any `flush_to` caller has requested.
    sync_target: AtomicU64,
    /// Statistics: accepted records.
    appends: AtomicU64,
    /// Statistics: flusher passes that copied bytes.
    batches: AtomicU64,
    /// Statistics: successful writer flushes.
    syncs: AtomicU64,
    /// First fatal error; once set, submits and waits fail with it.
    err: Mutex<Option<&'static str>>,
    /// Mutex paired with `flushed_cv` for parking `flush_to` waiters.
    flushed_mx: Mutex<()>,
    /// Notified after a sync advances `flushed`, or when an error is recorded.
    flushed_cv: Condvar,
    /// Mutex paired with `space_cv` for parking submitters waiting on ring space.
    space_mx: Mutex<()>,
    /// Notified after the flusher copies bytes out of the ring.
    space_cv: Condvar,
    /// Command channel to the flusher thread.
    control_tx: Sender<Control>,
    /// Recycled record buffers handed out by `record_buffer`.
    record_pool: Mutex<Vec<Vec<u8>>>,
}
impl Shared {
    /// Returns the first fatal error recorded by `set_err`, if any.
    fn sticky_err(&self) -> Option<&'static str> {
        *self.err.lock().unwrap()
    }

    /// Records `msg` as the sticky error and wakes every blocked waiter.
    ///
    /// Only the first message is kept, but every call still notifies both
    /// `flush_to` waiters (`flushed_cv`) and `wait_for_ring_space` waiters
    /// (`space_cv`), so they re-check `sticky_err` promptly.
    fn set_err(&self, msg: &'static str) {
        {
            let mut slot = self.err.lock().unwrap();
            if slot.is_none() {
                *slot = Some(msg);
            }
        }
        {
            let _g = self.flushed_mx.lock().unwrap();
            self.flushed_cv.notify_all();
        }
        // Without this, submitters blocked on ring space would only notice the
        // error on their next timed wakeup.
        let _g = self.space_mx.lock().unwrap();
        self.space_cv.notify_all();
    }

    /// Runs one flusher pass.
    ///
    /// Appends every committed ring record to the WAL writer. If some
    /// `flush_to` caller requested more than is currently flushed, it also
    /// flushes (fsyncs) the writer and advances `flushed`.
    ///
    /// Does nothing once a sticky error is set. Append or flush failures become
    /// a sticky error. After the writer lock is released, waiters on
    /// `flushed_cv` are woken (after a sync), and waiters on `space_cv` are
    /// woken if bytes were copied out of the ring.
    fn drain_and_maybe_sync(&self) {
        if self.sticky_err().is_some() {
            return;
        }
        // Sampled before copying. `written`/`flushed` advance to
        // `record_base + rc`, which presumably is no more than the copy below
        // moves — TODO confirm against `WalRing`.
        let rc = self.ring.committed_records();
        let want_sync =
            self.sync_target.load(Ordering::Acquire) > self.flushed.load(Ordering::Acquire);
        let mut sink_err: Option<&'static str> = None;
        let mut freed_space = false;
        {
            let mut w = self.writer.lock().unwrap();
            let copied = self.ring.copy_committed_prefix(&mut |bytes| {
                // Stop appending after the first failure but let the copy finish.
                if sink_err.is_none() && w.append_encoded(bytes).is_err() {
                    sink_err = Some("journal flusher append failed");
                }
            });
            if let Some(msg) = sink_err {
                drop(w);
                self.set_err(msg);
                return;
            }
            if copied > 0 {
                self.written
                    .fetch_max(self.record_base + rc, Ordering::AcqRel);
                self.batches.fetch_add(1, Ordering::Relaxed);
                freed_space = true;
            }
            if want_sync {
                if w.flush().is_err() {
                    drop(w);
                    self.set_err("journal flusher fsync failed");
                    return;
                }
                self.syncs.fetch_add(1, Ordering::Relaxed);
                self.flushed
                    .fetch_max(self.record_base + rc, Ordering::AcqRel);
            }
        }
        // Notify only after releasing the writer lock.
        if want_sync {
            let _g = self.flushed_mx.lock().unwrap();
            self.flushed_cv.notify_all();
        }
        if freed_space {
            let _g = self.space_mx.lock().unwrap();
            self.space_cv.notify_all();
        }
    }

    /// Blocks until the ring has room for `ticket`, nudging the flusher with
    /// `Control::Flush` on each wakeup.
    ///
    /// # Errors
    /// Returns `Error::Internal` with the sticky error if one is set while
    /// waiting.
    fn wait_for_ring_space(&self, ticket: &ReserveTicket) -> Result<()> {
        let _ = self.control_tx.send(Control::Flush);
        let mut guard = self.space_mx.lock().unwrap();
        while !self.ring.reserve_space_ready(ticket) {
            if let Some(m) = self.sticky_err() {
                return Err(Error::Internal(m));
            }
            let _ = self.control_tx.send(Control::Flush);
            // Timed wait, so a missed notification only costs one poll interval.
            let (next, _timeout) = self
                .space_cv
                .wait_timeout(guard, FLUSH_POLL.saturating_mul(4))
                .unwrap();
            guard = next;
        }
        Ok(())
    }

    /// Blocks until `flushed >= target`, asking the flusher to sync if needed.
    ///
    /// # Errors
    /// Returns `Error::Internal` with the sticky error if one is set, even when
    /// `target` is already flushed.
    fn flush_to(&self, target: u64) -> Result<()> {
        if target <= self.flushed.load(Ordering::Acquire) {
            return match self.sticky_err() {
                Some(m) => Err(Error::Internal(m)),
                None => Ok(()),
            };
        }
        self.sync_target.fetch_max(target, Ordering::AcqRel);
        let _ = self.control_tx.send(Control::Flush);
        let mut guard = self.flushed_mx.lock().unwrap();
        loop {
            if let Some(m) = self.sticky_err() {
                return Err(Error::Internal(m));
            }
            if self.flushed.load(Ordering::Acquire) >= target {
                return Ok(());
            }
            guard = self.flushed_cv.wait(guard).unwrap();
        }
    }

    /// Returns an empty buffer with at least `min_capacity` bytes of capacity.
    ///
    /// If the request fits under `RECORD_BUFFER_RETAIN_MAX` and the pool lock
    /// is uncontended, it takes the most recently recycled buffer that is large
    /// enough. Smaller pooled buffers stay pooled for later small requests
    /// instead of being freed. Otherwise it allocates a fresh buffer.
    fn record_buffer(&self, min_capacity: usize) -> Vec<u8> {
        if min_capacity <= RECORD_BUFFER_RETAIN_MAX {
            if let Ok(mut pool) = self.record_pool.try_lock() {
                if let Some(idx) = pool
                    .iter()
                    .rposition(|buf| buf.capacity() >= min_capacity)
                {
                    let mut buf = pool.swap_remove(idx);
                    buf.clear();
                    return buf;
                }
            }
        }
        Vec::with_capacity(min_capacity)
    }

    /// Returns `buf` to the pool for reuse.
    ///
    /// The buffer is dropped instead when it has no allocation, exceeds
    /// `RECORD_BUFFER_RETAIN_MAX`, the pool is full, or the pool lock is
    /// contended.
    fn recycle(&self, mut buf: Vec<u8>) {
        if buf.capacity() == 0 || buf.capacity() > RECORD_BUFFER_RETAIN_MAX {
            return;
        }
        if let Ok(mut pool) = self.record_pool.try_lock() {
            if pool.len() < RECORD_BUFFER_POOL_LIMIT {
                buf.clear();
                pool.push(buf);
            }
        }
    }
}
/// Handle returned by `Journal::submit` when `sync` was requested; call `wait`
/// to block until the journal's flushed count covers the record.
pub(crate) struct JournalAck {
    /// Keeps the journal state alive independently of the `Journal` itself.
    shared: Arc<Shared>,
    /// `queued` count captured right after this record was published.
    target: u64,
}
impl JournalAck {
pub(crate) fn wait(self) -> Result<()> {
self.shared.flush_to(self.target)
}
}
/// Write-ahead journal: records are staged in an in-memory ring and written to
/// the WAL file by a dedicated `holt-journal-ring` flusher thread.
pub(crate) struct Journal {
    /// State shared with the flusher thread and outstanding acks.
    shared: Arc<Shared>,
    /// Flusher thread handle; taken and joined on drop.
    handle: Mutex<Option<JoinHandle<()>>>,
}
impl Journal {
pub(crate) fn open_or_create(path: &std::path::Path, tree_id: u64) -> Result<Self> {
let writer = WalWriter::open_or_create(path, tree_id)?;
let record_base = u64::from(writer.has_records());
let initial_flushed = record_base.saturating_sub(1);
let (control_tx, control_rx) = unbounded::<Control>();
let shared = Arc::new(Shared {
ring: WalRing::with_capacity(RING_CAPACITY_BYTES),
writer: Mutex::new(writer),
record_base,
queued: AtomicU64::new(record_base),
written: AtomicU64::new(record_base),
flushed: AtomicU64::new(initial_flushed),
checkpointed: AtomicU64::new(0),
sync_target: AtomicU64::new(0),
appends: AtomicU64::new(0),
batches: AtomicU64::new(0),
syncs: AtomicU64::new(0),
err: Mutex::new(None),
flushed_mx: Mutex::new(()),
flushed_cv: Condvar::new(),
space_mx: Mutex::new(()),
space_cv: Condvar::new(),
control_tx,
record_pool: Mutex::new(Vec::new()),
});
let worker_shared = Arc::clone(&shared);
let handle = thread::Builder::new()
.name("holt-journal-ring".to_owned())
.spawn(move || run_flusher(worker_shared, control_rx))
.map_err(|_| Error::Internal("OS rejected thread spawn for holt-journal-ring"))?;
Ok(Self {
shared,
handle: Mutex::new(Some(handle)),
})
}
pub(crate) fn submit(&self, bytes: Vec<u8>, sync: bool) -> Result<Option<JournalAck>> {
if let Some(m) = self.shared.sticky_err() {
return Err(Error::Internal(m));
}
if bytes.is_empty() {
return Err(Error::Internal("journal record must not be empty"));
}
if bytes.len() as u64 > self.shared.ring.capacity() {
return Err(Error::Internal("journal record exceeds WAL ring capacity"));
}
let ticket = self.shared.ring.reserve(bytes.len() as u64);
if !self.shared.ring.reserve_space_ready(&ticket) {
self.shared.wait_for_ring_space(&ticket)?;
}
self.shared.ring.fill(&ticket, &bytes);
self.shared.ring.publish(&ticket);
self.shared.recycle(bytes);
let n = self.shared.queued.fetch_add(1, Ordering::AcqRel) + 1;
self.shared.appends.fetch_add(1, Ordering::Relaxed);
if sync {
Ok(Some(JournalAck {
shared: Arc::clone(&self.shared),
target: n,
}))
} else {
Ok(None)
}
}
pub(crate) fn record_buffer(&self, min_capacity: usize) -> Vec<u8> {
self.shared.record_buffer(min_capacity)
}
pub(crate) fn queued_work(&self) -> u64 {
self.shared.queued.load(Ordering::Acquire)
}
pub(crate) fn flush_up_to(&self, observed: u64) -> Result<()> {
self.shared.flush_to(observed)
}
pub(crate) fn truncate(&self) -> Result<()> {
let observed = self.shared.queued.load(Ordering::Acquire);
if observed == self.shared.checkpointed.load(Ordering::Acquire) {
return Ok(());
}
let (ack, rx) = crossbeam_channel::bounded(1);
self.shared
.control_tx
.send(Control::Truncate(ack))
.map_err(|_| Error::Internal("journal flusher stopped before truncate"))?;
rx.recv()
.map_err(|_| Error::Internal("journal flusher dropped truncate acknowledgement"))??;
self.shared
.checkpointed
.fetch_max(observed, Ordering::AcqRel);
Ok(())
}
pub(crate) fn needs_checkpoint(&self) -> bool {
self.shared.queued.load(Ordering::Acquire)
!= self.shared.checkpointed.load(Ordering::Acquire)
}
#[cfg(test)]
fn needs_flush(&self) -> bool {
self.shared.queued.load(Ordering::Acquire) > self.shared.flushed.load(Ordering::Acquire)
}
pub(crate) fn stats(&self) -> JournalStats {
let queued_work = self.shared.queued.load(Ordering::Acquire);
let written_work = self.shared.written.load(Ordering::Acquire);
let flushed_work = self.shared.flushed.load(Ordering::Acquire);
let checkpointed_work = self.shared.checkpointed.load(Ordering::Acquire);
JournalStats {
appends: self.shared.appends.load(Ordering::Relaxed),
batches: self.shared.batches.load(Ordering::Relaxed),
syncs: self.shared.syncs.load(Ordering::Relaxed),
queued_work,
written_work,
flushed_work,
checkpointed_work,
pending_work: queued_work.saturating_sub(flushed_work),
checkpoint_debt: queued_work.saturating_sub(checkpointed_work),
}
}
}
impl Drop for Journal {
    /// Sends `Control::Stop` to the flusher thread and joins it.
    ///
    /// Drop cannot return an error, so send and join failures are ignored.
    fn drop(&mut self) {
        let _ = self.shared.control_tx.send(Control::Stop);
        // `&mut self` gives exclusive access, so no locking is required.
        let flusher = self.handle.get_mut().unwrap().take();
        if let Some(thread) = flusher {
            let _ = thread.join();
        }
    }
}
fn run_flusher(shared: Arc<Shared>, control_rx: Receiver<Control>) {
loop {
shared.drain_and_maybe_sync();
match control_rx.recv_timeout(FLUSH_POLL) {
Ok(Control::Flush) | Err(RecvTimeoutError::Timeout) => {}
Ok(Control::Truncate(ack)) => {
shared.drain_and_maybe_sync();
let result = do_truncate(&shared);
let _ = ack.send(result);
}
Ok(Control::Stop) => {
shared.drain_and_maybe_sync();
break;
}
Err(RecvTimeoutError::Disconnected) => break,
}
}
}
fn do_truncate(shared: &Shared) -> Result<()> {
if let Some(m) = shared.sticky_err() {
return Err(Error::Internal(m));
}
let mut w = shared.writer.lock().unwrap();
w.truncate()?;
drop(w);
shared.ring.reset_after_drain();
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::codec::FILE_HEADER_SIZE;

    /// Opens `journal.wal` inside `dir` with tree id 0. `dir` must outlive the
    /// returned journal.
    fn open_in(dir: &tempfile::TempDir) -> (std::path::PathBuf, Journal) {
        let path = dir.path().join("journal.wal");
        let journal = Journal::open_or_create(&path, 0).unwrap();
        (path, journal)
    }

    /// Current on-disk size of the WAL file.
    fn wal_len(path: &std::path::Path) -> u64 {
        std::fs::metadata(path).unwrap().len()
    }

    /// Flush barrier covering everything submitted so far.
    fn flush_all(journal: &Journal) {
        journal.flush_up_to(journal.queued_work()).unwrap();
    }

    #[test]
    fn fresh_journal_flush_and_truncate_are_noops() {
        let dir = tempfile::tempdir().unwrap();
        let (_, journal) = open_in(&dir);
        assert!(!journal.needs_checkpoint());
        flush_all(&journal);
        journal.truncate().unwrap();
        let JournalStats { appends, syncs, .. } = journal.stats();
        assert_eq!(appends, 0);
        assert_eq!(syncs, 0);
        assert!(!journal.needs_checkpoint());
    }

    #[test]
    fn append_requires_one_checkpoint_truncate() {
        let dir = tempfile::tempdir().unwrap();
        let (path, journal) = open_in(&dir);
        let header_len = FILE_HEADER_SIZE as u64;

        journal.submit(vec![1, 2, 3, 4], false).unwrap();
        assert!(journal.needs_checkpoint());
        flush_all(&journal);
        assert!(wal_len(&path) > header_len);

        journal.truncate().unwrap();
        assert!(!journal.needs_checkpoint());
        assert_eq!(wal_len(&path), header_len);

        // A second barrier + truncate with nothing new must not sync again.
        let syncs_before = journal.stats().syncs;
        flush_all(&journal);
        journal.truncate().unwrap();
        assert_eq!(journal.stats().syncs, syncs_before);
    }

    #[test]
    fn durable_append_satisfies_later_flush_barrier() {
        let dir = tempfile::tempdir().unwrap();
        let (_, journal) = open_in(&dir);
        journal
            .submit(vec![5, 6, 7, 8], true)
            .unwrap()
            .expect("durable append returns an ack")
            .wait()
            .unwrap();
        assert!(journal.needs_checkpoint());
        assert!(!journal.needs_flush());

        let syncs_before = journal.stats().syncs;
        flush_all(&journal);
        assert_eq!(journal.stats().syncs, syncs_before);

        journal.truncate().unwrap();
        assert!(!journal.needs_checkpoint());
    }

    #[test]
    fn enqueue_append_is_flushed_by_later_barrier() {
        let dir = tempfile::tempdir().unwrap();
        let (path, journal) = open_in(&dir);
        assert!(journal.submit(vec![1, 3, 5, 7], false).unwrap().is_none());
        flush_all(&journal);
        assert!(wal_len(&path) > FILE_HEADER_SIZE as u64);
        assert!(!journal.needs_flush());
        let stats = journal.stats();
        assert_eq!(stats.syncs, 1);
        assert_eq!(stats.appends, 1);
        assert!(stats.batches >= 1);
    }

    #[test]
    fn invalid_record_size_is_rejected_without_poisoning_journal() {
        let dir = tempfile::tempdir().unwrap();
        let (_, journal) = open_in(&dir);
        assert!(journal.submit(Vec::new(), false).is_err());
        let oversized = vec![0; RING_CAPACITY_BYTES + 1];
        assert!(journal.submit(oversized, false).is_err());
        journal.submit(vec![1, 2, 3, 4], false).unwrap();
        flush_all(&journal);
        assert_eq!(journal.stats().appends, 1);
    }

    #[test]
    fn encoded_record_buffers_are_recycled_after_flusher_append() {
        let dir = tempfile::tempdir().unwrap();
        let (_, journal) = open_in(&dir);
        let mut record = journal.record_buffer(64);
        let capacity = record.capacity();
        assert!(capacity >= 64);
        record.extend_from_slice(&[1; 32]);
        journal.submit(record, false).unwrap();
        flush_all(&journal);
        assert!(journal.record_buffer(16).capacity() >= capacity);
    }

    #[test]
    fn reopened_nonempty_wal_still_needs_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        {
            let (_, journal) = open_in(&dir);
            journal.submit(vec![9, 8, 7, 6], false).unwrap();
            flush_all(&journal);
            assert!(journal.needs_checkpoint());
        }
        let (_, journal) = open_in(&dir);
        assert!(journal.needs_checkpoint());
        assert!(journal.needs_flush());
        flush_all(&journal);
        assert!(!journal.needs_flush());
        journal.truncate().unwrap();
        assert!(!journal.needs_checkpoint());
    }
}