use heed::{
Database, EnvOpenOptions,
byteorder::BigEndian,
types::{Bytes, U64},
};
use crate::config::JournalConfig;
use crate::disruptor::{
Envelope, RingSlot,
traits::{RkyvError, RkyvToBytes},
};
use crate::error::{RecoveryError, SegmentError, WalError};
use core_affinity::CoreId;
use disrupt_rs::{EventPoller, GatedSequence, Polling, Sequence, wait_strategies::WaitStrategy};
use std::{
cell::{Cell, RefCell},
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::Instant,
};
use tracing::{error, info, trace, warn};
/// Big-endian `u64` codec, used for log keys and for meta values.
type BEU64 = U64<BigEndian>;
/// Log database: `u64` key (the envelope's `seq`, see `Segment::put_envelope`) to raw bytes.
type LogDb = Database<BEU64, Bytes>;
/// Metadata database: raw byte key to `u64` value.
type MetaDb = Database<Bytes, BEU64>;
/// Key in the meta database under which the schema version is stored.
const META_SCHEMA_VERSION_KEY: &[u8] = b"schema_version";
/// Schema version written into new segments and required of existing ones.
const WAL_SCHEMA_VERSION: u64 = 1;
/// Name of the directory (under the journal root) that pruned segments are moved into.
const TRASH_DIR_NAME: &str = "trash";
thread_local! {
/// Per-thread serialization buffer that `Segment::put_envelope` reuses between calls.
static SCRATCH: RefCell<rkyv::util::AlignedVec> =
RefCell::new(rkyv::util::AlignedVec::with_capacity(1024));
}
/// Pair of counters describing a recovery position.
///
/// NOTE(review): this type is not referenced anywhere else in the visible part of the
/// file; the field descriptions below are inferred from the names only. Confirm against
/// the callers.
#[derive(Debug, Clone, Copy)]
pub struct RecoveryState {
/// Presumably the last application sequence number that was recovered (TODO confirm).
pub last_app_seq: u64,
/// Presumably the id of the last transaction that was recovered (TODO confirm).
pub last_tx_id: u64,
}
/// The rkyv archived (zero-copy) representation of `Envelope<T>`.
pub type ArchivedEnvelopeOf<T> = <Envelope<T> as rkyv::Archive>::Archived;
/// Builds the directory name for the segment whose first sequence number is `start_seq`.
///
/// The number is zero-padded to 20 digits (the width of `u64::MAX`), so names sort
/// lexicographically in the same order as their sequence numbers.
fn segment_dir_name(start_seq: u64) -> String {
    format!("seg_{:020}", start_seq)
}
/// Extracts the start sequence number from a segment directory name.
///
/// Returns `Some(seq)` only for names of the form `seg_<digits>` whose digits fit in a
/// `u64`. Anything else yields `None`.
///
/// The suffix must consist solely of ASCII digits. `u64::from_str` on its own also
/// accepts a leading `+`, which would let a stray directory such as `seg_+5` be picked
/// up as a segment.
fn parse_segment_dir(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("seg_")?;
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // Still fallible: a long enough digit string overflows u64.
    digits.parse().ok()
}
/// Lists the segment directory names found directly under `root`, ordered by the
/// sequence number encoded in each name.
///
/// Only directories whose name `parse_segment_dir` accepts are returned. A missing
/// `root` yields an empty list rather than an error.
fn list_segment_dirs(root: &Path) -> anyhow::Result<Vec<String>> {
    if !root.exists() {
        return Ok(Vec::new());
    }
    let mut names = Vec::new();
    for item in std::fs::read_dir(root)? {
        let item = item?;
        if !item.file_type()?.is_dir() {
            continue;
        }
        let candidate = item.file_name().to_string_lossy().into_owned();
        if parse_segment_dir(&candidate).is_none() {
            continue;
        }
        names.push(candidate);
    }
    // Every name kept above parses, so the `0` fallback is never actually used.
    names.sort_unstable_by_key(|n| parse_segment_dir(n).unwrap_or(0));
    Ok(names)
}
/// Returns the path of the trash directory located under `root`.
fn trash_dir(root: &Path) -> PathBuf {
    let mut path = root.to_path_buf();
    path.push(TRASH_DIR_NAME);
    path
}
/// Deletes everything inside the trash directory under `root`.
///
/// Returns the number of top-level entries removed (`0` when the trash directory does
/// not exist). Once the directory is empty it is removed as well; a failure of that
/// final removal is deliberately ignored.
///
/// # Errors
/// Fails if the directory cannot be listed or an entry cannot be deleted.
fn clear_trash_dir(root: &Path) -> anyhow::Result<usize> {
    let trash = trash_dir(root);
    if !trash.exists() {
        return Ok(0);
    }
    let mut removed = 0usize;
    for item in std::fs::read_dir(&trash)? {
        let item = item?;
        let victim = item.path();
        let is_dir = item.file_type()?.is_dir();
        if is_dir {
            std::fs::remove_dir_all(&victim)?;
        } else {
            std::fs::remove_file(&victim)?;
        }
        removed += 1;
    }
    // Re-list instead of trusting the loop: entries may have appeared in the meantime.
    let now_empty = std::fs::read_dir(&trash)?.next().is_none();
    if now_empty {
        let _ = std::fs::remove_dir(&trash);
    }
    Ok(removed)
}
/// Returns the index in `dirs` of the segment that recovery should start from for
/// sequence number `target`.
///
/// `dirs` must be sorted by encoded start sequence (as `list_segment_dirs` returns it).
/// An exact match on a start sequence wins; otherwise the segment just before the
/// insertion point is chosen, clamped to index `0`.
fn find_segment_for_seq(dirs: &[String], target: u64) -> usize {
    dirs.binary_search_by_key(&target, |name| parse_segment_dir(name).unwrap_or(0))
        .unwrap_or_else(|insert_at| insert_at.saturating_sub(1))
}
/// One on-disk journal segment: a heed environment plus its log database.
struct Segment {
/// heed environment opened on the segment's directory.
env: heed::Env,
/// Log database inside `env`, keyed by sequence number.
log: LogDb,
/// Cached number of log entries: initialised from the database length in `open` and
/// incremented by `JournalHandler::put_envelope`.
event_count: Cell<u64>,
}
impl Segment {
    /// Opens the segment stored in `dir_path`, creating the directory, the log database
    /// `db_name` and the `meta` database when they do not exist yet.
    ///
    /// A fresh segment is stamped with `WAL_SCHEMA_VERSION`; an existing one must carry
    /// exactly that version.
    ///
    /// # Errors
    /// Returns `WalError::SchemaMismatch` when the stored schema version differs, or any
    /// filesystem / heed error encountered while opening.
    fn open(dir_path: &Path, db_name: &str, map_size: usize) -> anyhow::Result<Self> {
        std::fs::create_dir_all(dir_path)?;
        // SAFETY: heed marks `EnvOpenOptions::open` as unsafe.
        // NOTE(review): soundness relies on heed's documented preconditions for opening
        // an environment (see the heed docs) holding for segment directories — confirm
        // for the deployment.
        let env = unsafe {
            EnvOpenOptions::new()
                .map_size(map_size)
                .max_dbs(8)
                .open(dir_path)?
        };
        let log = {
            let mut wtxn = env.write_txn()?;
            let db: LogDb = env.create_database(&mut wtxn, Some(db_name))?;
            let meta: MetaDb = env.create_database(&mut wtxn, Some("meta"))?;
            match meta.get(&wtxn, META_SCHEMA_VERSION_KEY)? {
                Some(v) if v == WAL_SCHEMA_VERSION => {}
                Some(v) => {
                    // The early return drops `wtxn` without committing.
                    return Err(WalError::SchemaMismatch {
                        expected: WAL_SCHEMA_VERSION,
                        found: v,
                    }
                    .into());
                }
                None => {
                    meta.put(&mut wtxn, META_SCHEMA_VERSION_KEY, &WAL_SCHEMA_VERSION)?;
                }
            }
            wtxn.commit()?;
            db
        };
        let event_count = {
            let rtxn = env.read_txn()?;
            log.len(&rtxn)?
        };
        Ok(Self {
            env,
            log,
            event_count: Cell::new(event_count),
        })
    }

    /// Starts a write transaction on this segment's environment.
    fn write_txn(&self) -> anyhow::Result<heed::RwTxn<'_>> {
        Ok(self.env.write_txn()?)
    }

    /// Serializes `envelope` with rkyv and stores it under key `envelope.seq` inside
    /// `wtxn`.
    ///
    /// Serialization goes through the thread-local `SCRATCH` buffer so its allocation is
    /// reused between calls. This method does not touch `event_count`.
    ///
    /// # Errors
    /// Fails if serialization or the database write fails.
    fn put_envelope<T>(
        &self,
        wtxn: &mut heed::RwTxn<'_>,
        envelope: &Envelope<T>,
    ) -> anyhow::Result<()>
    where
        T: RkyvToBytes,
    {
        SCRATCH.with(|scratch| -> anyhow::Result<()> {
            let mut slot = scratch.borrow_mut();
            // Move the buffer out so rkyv can own it while writing.
            let mut buf = std::mem::take(&mut *slot);
            buf.clear();
            let buf = rkyv::api::high::to_bytes_in::<_, RkyvError>(envelope, buf)?;
            let stored = self.log.put(wtxn, &envelope.seq, buf.as_slice());
            // Hand the buffer back before propagating a write error, so a failed `put`
            // does not throw away the grown allocation.
            *slot = buf;
            stored?;
            Ok(())
        })
    }

    /// Key of the first log entry, or `None` when the log is empty.
    fn first_seq(&self) -> anyhow::Result<Option<u64>> {
        let rtxn = self.env.read_txn()?;
        Ok(self.log.first(&rtxn)?.map(|(k, _)| k))
    }

    /// Key of the last log entry, or `None` when the log is empty.
    fn last_seq(&self) -> anyhow::Result<Option<u64>> {
        let rtxn = self.env.read_txn()?;
        Ok(self.log.last(&rtxn)?.map(|(k, _)| k))
    }

    /// Cached entry count (see the `event_count` field); does not query the database.
    fn len(&self) -> u64 {
        self.event_count.get()
    }
}
/// Summary of one journal segment, as produced by `segment_info_from_open`.
pub struct SegmentInfo {
/// Directory name of the segment (relative to the journal root).
pub dir_name: String,
/// Journal root joined with `dir_name`.
pub path: PathBuf,
/// Key of the first log entry, `None` if the segment holds no entries.
pub start_seq: Option<u64>,
/// Key of the last log entry, `None` if the segment holds no entries.
pub end_seq: Option<u64>,
/// The segment's cached entry count.
pub event_count: u64,
/// Sum of the sizes of the regular files directly inside the segment directory.
pub file_size_bytes: u64,
}
/// Builds a `SegmentInfo` for the already-open segment `seg`, which lives in
/// `root_dir/dir_name`.
///
/// # Errors
/// Fails if the first/last keys cannot be read or the directory size cannot be computed.
fn segment_info_from_open(
    root_dir: &Path,
    dir_name: &str,
    seg: &Segment,
) -> anyhow::Result<SegmentInfo> {
    let path = root_dir.join(dir_name);
    // Struct fields are evaluated in written order, so the fallible calls run in the
    // sequence first_seq -> last_seq -> dir_size; `path` is moved in last.
    Ok(SegmentInfo {
        dir_name: dir_name.to_string(),
        start_seq: seg.first_seq()?,
        end_seq: seg.last_seq()?,
        event_count: seg.len(),
        file_size_bytes: dir_size(&path)?,
        path,
    })
}
/// Opens the segment at `root_dir/dir_name` just long enough to describe it.
///
/// The temporary `Segment` is dropped when this function returns.
fn segment_info_static(
    root_dir: &Path,
    dir_name: &str,
    db_name: &str,
    map_size: usize,
) -> anyhow::Result<SegmentInfo> {
    let opened = Segment::open(&root_dir.join(dir_name), db_name, map_size)?;
    segment_info_from_open(root_dir, dir_name, &opened)
}
/// Sums the sizes of the regular files directly inside `path`.
///
/// Subdirectories are not descended into. Returns `0` when `path` is not a directory.
fn dir_size(path: &Path) -> anyhow::Result<u64> {
    if !path.is_dir() {
        return Ok(0);
    }
    let mut bytes = 0u64;
    for item in std::fs::read_dir(path)? {
        let md = item?.metadata()?;
        if md.is_file() {
            bytes += md.len();
        }
    }
    Ok(bytes)
}
/// Segmented write-ahead journal: a root directory of segment directories, one of which
/// is open as the active segment that receives writes.
pub struct JournalHandler {
/// Directory that contains the segment directories (and the trash directory).
root_dir: PathBuf,
/// Name of the log database inside every segment environment.
db_name: String,
/// Journal settings (map size, rotation threshold, startup behaviour).
config: JournalConfig,
/// The segment currently being written to.
active: Segment,
/// Directory name of `active`.
active_dir_name: String,
/// Set by `request_rotation`, cleared by `rotate`; forces `should_rotate` to return true.
rotation_requested: bool,
}
impl JournalHandler {
/// Opens the journal rooted at `root_dir`, creating the directory and an initial
/// segment (start sequence `0`) when no segment exists yet; otherwise the last
/// segment in sequence order becomes the active one.
///
/// Depending on `config`, startup then runs in this order:
/// 1. prune segments up to `prune_before_seq_on_startup`,
/// 2. clear the trash directory (failures are logged, not returned),
/// 3. validate segment contiguity when more than one segment remains.
///
/// # Errors
/// Fails on filesystem / database errors, schema mismatch, prune failure or failed
/// validation.
pub fn open(
    root_dir: impl AsRef<Path>,
    db_name: &str,
    config: JournalConfig,
) -> anyhow::Result<Self> {
    let root_dir = root_dir.as_ref().to_path_buf();
    std::fs::create_dir_all(&root_dir)?;

    let existing = list_segment_dirs(&root_dir)?;
    let active_dir_name = match existing.last() {
        Some(newest) => {
            info!(segment = %newest, "opening last segment as active");
            newest.clone()
        }
        None => {
            let first = segment_dir_name(0);
            info!(segment = %first, "creating initial segment");
            first
        }
    };
    let active = Segment::open(
        &root_dir.join(&active_dir_name),
        db_name,
        config.segment_map_size_bytes,
    )?;

    let mut handler = Self {
        root_dir,
        db_name: db_name.to_string(),
        config,
        active,
        active_dir_name,
        rotation_requested: false,
    };

    if let Some(confirmed_seq) = handler.config.prune_before_seq_on_startup {
        let moved = handler.prune_segments_before(confirmed_seq)?;
        if !moved.is_empty() {
            info!(
                moved_segments = moved.len(),
                confirmed_seq, "pruned sealed segments on startup"
            );
        }
    }

    if handler.config.clear_trash_on_startup {
        // Best effort: a failure here must not prevent the journal from opening.
        match clear_trash_dir(&handler.root_dir) {
            Err(e) => warn!(error = %e, "failed to clear trash directory on startup"),
            Ok(n) if n > 0 => info!(
                removed_entries = n,
                "deleted trashed journal segments on startup"
            ),
            Ok(_) => {}
        }
    }

    // The directory listing is only taken when validation is enabled.
    if handler.config.validate_on_startup && list_segment_dirs(&handler.root_dir)?.len() > 1 {
        handler.validate_segments()?;
    }

    Ok(handler)
}
/// Describes the segment `dir_name`.
///
/// The active segment is described through the already-open handle; any other
/// segment is opened temporarily.
fn segment_info(&self, dir_name: &str) -> anyhow::Result<SegmentInfo> {
    if dir_name != self.active_dir_name {
        return segment_info_static(
            &self.root_dir,
            dir_name,
            &self.db_name,
            self.config.segment_map_size_bytes,
        );
    }
    segment_info_from_open(&self.root_dir, dir_name, &self.active)
}
/// Opens segment `dir_name` unless it is the active one.
///
/// Returns `Ok(None)` for the active segment, so the caller can fall back to
/// `self.active` instead of opening the same directory a second time.
fn open_segment_if_not_active(&self, dir_name: &str) -> anyhow::Result<Option<Segment>> {
    if dir_name == self.active_dir_name {
        return Ok(None);
    }
    let opened = Segment::open(
        &self.root_dir.join(dir_name),
        &self.db_name,
        self.config.segment_map_size_bytes,
    )?;
    Ok(Some(opened))
}
/// Starts a write transaction on the active segment.
///
/// The returned transaction borrows `self`, so it must be committed or dropped before
/// any `&mut self` method (such as `rotate`) can be called.
pub fn write_txn(&self) -> anyhow::Result<heed::RwTxn<'_>> {
self.active.write_txn()
}
/// Writes `envelope` to the active segment inside `wtxn` and bumps the segment's
/// cached event count.
///
/// The count is incremented as soon as the write is staged, i.e. before `wtxn` is
/// committed.
///
/// # Errors
/// Fails (without touching the count) if serialization or the database write fails.
pub fn put_envelope<T>(
    &self,
    wtxn: &mut heed::RwTxn<'_>,
    envelope: &Envelope<T>,
) -> anyhow::Result<()>
where
    T: RkyvToBytes,
{
    let segment = &self.active;
    segment.put_envelope(wtxn, envelope)?;
    let counter = &segment.event_count;
    counter.set(counter.get() + 1);
    Ok(())
}
/// Flags the journal so that `should_rotate` returns `true` until the next `rotate`.
pub fn request_rotation(&mut self) {
self.rotation_requested = true;
}
/// Reports whether the active segment should be replaced: either rotation was
/// explicitly requested, or the cached event count has reached
/// `config.max_segment_events`.
pub fn should_rotate(&self) -> bool {
    self.rotation_requested
        || self.active.event_count.get() >= self.config.max_segment_events
}
/// Switches writing to a new segment named after `expected_start_seq`.
///
/// The new segment is opened first; only on success does it replace the active
/// segment (dropping the previous one) and clear any pending rotation request.
///
/// # Errors
/// Fails if the new segment cannot be opened; the journal is left unchanged then.
pub fn rotate(&mut self, expected_start_seq: u64) -> anyhow::Result<()> {
    let next_dir = segment_dir_name(expected_start_seq);
    let next_path = self.root_dir.join(&next_dir);
    info!(segment = %next_dir, expected_start_seq, "rotating to new segment");
    self.active = Segment::open(
        &next_path,
        &self.db_name,
        self.config.segment_map_size_bytes,
    )?;
    self.active_dir_name = next_dir;
    self.rotation_requested = false;
    Ok(())
}
/// Replays journaled records to `on_event`, starting after `after_seq`.
///
/// Reading starts at sequence `after_seq + 1` (or `0` when `after_seq` is `None`), in the
/// segment selected by `find_segment_for_seq`, and continues through all later segments.
/// Records are grouped into transactions using their `tx_id` / `tx_len` / `tx_ix`
/// framing, and only records of complete transactions are delivered. Delivery lags one
/// record behind so that the final delivered record can be passed with the flag `true`;
/// every other record is passed with `false`.
///
/// Records that cannot start a transaction (`tx_ix != 0` with no transaction in
/// progress, or a framing mismatch on a non-first record) are skipped. An incomplete
/// transaction at the end of the log is not delivered; it is only reported with a
/// warning, and nothing is removed from disk by this method.
///
/// # Errors
/// * `WalError::CorruptRecord` if a stored record fails rkyv validation.
/// * `WalError::InvalidTxFraming` if `tx_len == 0` or `tx_ix >= tx_len`.
/// * Any error returned by `on_event`, and any filesystem / database error.
pub fn recover_from<T, F>(&self, after_seq: Option<u64>, mut on_event: F) -> anyhow::Result<()>
where
T: rkyv::Archive,
ArchivedEnvelopeOf<T>:
for<'a> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'a, RkyvError>>,
F: for<'a> FnMut(&'a ArchivedEnvelopeOf<T>, bool) -> anyhow::Result<()>,
{
// First sequence number to replay.
let start_seq = after_seq.map(|s| s.saturating_add(1)).unwrap_or(0);
info!(after_seq = ?after_seq, start_seq, "starting segmented wal recovery");
let dirs = list_segment_dirs(&self.root_dir)?;
if dirs.is_empty() {
info!("no segments found, nothing to recover");
return Ok(());
}
let start_idx = find_segment_for_seq(&dirs, start_seq);
// Framing of the transaction currently being assembled.
struct TxState {
tx_id: u64,
tx_len: u16,
count: u16, }
let mut current_tx: Option<TxState> = None;
let mut last_seen_seq: Option<u64> = None;
// The most recent deliverable record, held back until we know whether it is the last.
let mut pending_last_bytes: Option<Vec<u8>> = None;
// Owned copies of the records of the transaction in progress.
let mut completed_tx_buf: Vec<Vec<u8>> = Vec::new();
for dir_name in &dirs[start_idx..] {
// The active segment is already open; reuse it rather than opening it again.
let owned_seg = self.open_segment_if_not_active(dir_name)?;
let seg = owned_seg.as_ref().unwrap_or(&self.active);
let rtxn = seg.env.read_txn()?;
let mut iter = seg.log.range(&rtxn, &(start_seq..))?;
while let Some(entry) = iter.next().transpose()? {
let (app_seq, val_bytes) = entry;
last_seen_seq = Some(app_seq);
let archived =
match rkyv::api::high::access::<ArchivedEnvelopeOf<T>, RkyvError>(val_bytes) {
Ok(a) => a,
Err(e) => {
error!(app_seq, error = %e, "corrupt WAL record");
return Err(WalError::CorruptRecord {
app_seq,
details: e.to_string(),
}
.into());
}
};
let tx_id: u64 = archived.tx_id.into();
let tx_len: u16 = archived.tx_len.into();
let tx_ix: u16 = archived.tx_ix.into();
if tx_len == 0 || tx_ix >= tx_len {
error!(app_seq, tx_id, tx_len, tx_ix, "invalid wal tx framing");
return Err(WalError::InvalidTxFraming {
app_seq,
tx_id,
tx_len,
tx_ix,
}
.into());
}
match &mut current_tx {
None => {
// No transaction in progress: only a first record (index 0) can start one.
if tx_ix != 0 {
continue;
}
completed_tx_buf.clear();
completed_tx_buf.push(val_bytes.to_vec());
current_tx = Some(TxState {
tx_id,
tx_len,
count: 1,
});
}
Some(tx) => {
let expected_ix = tx.count;
let framing_mismatch =
tx.tx_id != tx_id || tx.tx_len != tx_len || tx_ix != expected_ix;
if framing_mismatch {
// The record does not continue the transaction in progress: abandon it.
if tx_ix != 0 {
current_tx = None;
completed_tx_buf.clear();
continue;
}
// Index 0: this record starts a new transaction in place of the abandoned one.
completed_tx_buf.clear();
completed_tx_buf.push(val_bytes.to_vec());
*tx = TxState {
tx_id,
tx_len,
count: 1,
};
} else {
completed_tx_buf.push(val_bytes.to_vec());
tx.count += 1;
}
}
}
let is_complete = current_tx.as_ref().is_some_and(|tx| tx.count == tx.tx_len);
if is_complete {
// Deliver with a one-record lag: each buffered record pushes out the previously
// held one (flag false) and becomes the new held-back record.
for buf in completed_tx_buf.drain(..) {
if let Some(prev_bytes) = pending_last_bytes.take() {
let prev = rkyv::api::high::access::<ArchivedEnvelopeOf<T>, RkyvError>(
&prev_bytes,
)?;
on_event(prev, false)?;
}
pending_last_bytes = Some(buf);
}
current_tx = None;
}
}
}
// A transaction still in progress here never completed: report it and drop it.
if let Some(tx) = current_tx.take() {
let trimmed_count = tx.count;
if trimmed_count != 0 {
let Some(first_bytes) = completed_tx_buf.first() else {
return Err(RecoveryError::WalTailMissingFirst.into());
};
let Some(last_bytes) = completed_tx_buf.last() else {
return Err(RecoveryError::WalTailMissingLast.into());
};
let first =
rkyv::api::high::access::<ArchivedEnvelopeOf<T>, RkyvError>(first_bytes)?;
let last = rkyv::api::high::access::<ArchivedEnvelopeOf<T>, RkyvError>(last_bytes)?;
let trimmed_start: u64 = first.seq.into();
let trimmed_end: u64 = last.seq.into();
// `trim_to` is only logged; this method does not delete anything.
let trim_to = trimmed_start.saturating_sub(1);
warn!(
trimmed_start,
trimmed_end, trimmed_count, trim_to, "dropping incomplete wal tail"
);
}
}
// Flush the held-back record, flagged as the last one.
if let Some(last_bytes) = pending_last_bytes.take() {
let last = rkyv::api::high::access::<ArchivedEnvelopeOf<T>, RkyvError>(&last_bytes)?;
on_event(last, true)?;
}
// Logged value is the last sequence read from disk, which includes any dropped tail.
let last_seq = last_seen_seq.or(after_seq).unwrap_or(0);
info!(last_seq, "wal recovery complete");
Ok(())
}
/// Checks that consecutive segments hold contiguous sequence numbers.
///
/// Walking the segments in order, the first key of each non-empty segment must be
/// exactly one greater than the last key of the most recent non-empty segment before
/// it. Empty segments are skipped.
///
/// # Errors
/// * `SegmentError::NoSegments` when the root contains no segment directory.
/// * `SegmentError::ContiguityGap` when sequence numbers are missing between segments.
/// * `SegmentError::ContiguityOverlap` when a segment starts at or before the previous end.
pub fn validate_segments(&self) -> anyhow::Result<()> {
    let dirs = list_segment_dirs(&self.root_dir)?;
    if dirs.is_empty() {
        return Err(SegmentError::NoSegments {
            path: self.root_dir.display().to_string(),
        }
        .into());
    }
    let mut prev_end: Option<u64> = None;
    for dir_name in &dirs {
        let info = self.segment_info(dir_name)?;
        if let (Some(prev_end_seq), Some(next_start_seq)) = (prev_end, info.start_seq) {
            let expected = prev_end_seq + 1;
            // Fully qualified: `Ordering` in this file refers to the atomic ordering.
            match next_start_seq.cmp(&expected) {
                std::cmp::Ordering::Greater => {
                    return Err(SegmentError::ContiguityGap {
                        prev_end_seq,
                        next_start_seq,
                        expected,
                    }
                    .into());
                }
                std::cmp::Ordering::Less => {
                    return Err(SegmentError::ContiguityOverlap {
                        prev_end_seq,
                        next_start_seq,
                    }
                    .into());
                }
                std::cmp::Ordering::Equal => {}
            }
        }
        // An empty segment keeps the previous end in place.
        prev_end = info.end_seq.or(prev_end);
    }
    Ok(())
}
/// Returns a `SegmentInfo` for every segment directory, in sequence order.
///
/// # Errors
/// Stops at, and returns, the first error hit while listing or describing a segment.
pub fn list_segments(&self) -> anyhow::Result<Vec<SegmentInfo>> {
    list_segment_dirs(&self.root_dir)?
        .iter()
        .map(|dir_name| self.segment_info(dir_name))
        .collect()
}
/// Moves every segment whose last key is `<= confirmed_seq` into the trash directory
/// and returns the names of the segments that were moved.
///
/// The last segment in sequence order is never considered, and empty segments are
/// left in place. The trash directory is only created once there is something to
/// move; an existing trash entry with the same name is replaced.
///
/// # Errors
/// Fails on the first filesystem / database error. Segments moved before the failure
/// stay moved.
pub fn prune_segments_before(&mut self, confirmed_seq: u64) -> anyhow::Result<Vec<String>> {
    let dirs = list_segment_dirs(&self.root_dir)?;
    // Everything except the newest directory is a pruning candidate.
    let candidates = match dirs.split_last() {
        Some((_newest, older)) => older,
        None => return Ok(Vec::new()),
    };
    let trash_root = trash_dir(&self.root_dir);
    let mut moved = Vec::new();
    for dir_name in candidates {
        let info = self.segment_info(dir_name)?;
        let end_seq = match info.end_seq {
            Some(end) if end <= confirmed_seq => end,
            _ => continue,
        };
        std::fs::create_dir_all(&trash_root)?;
        let target = trash_root.join(dir_name);
        if target.exists() {
            warn!(segment = %dir_name, target = %target.display(), "trash target already exists, replacing");
            if target.is_dir() {
                std::fs::remove_dir_all(&target)?;
            } else {
                std::fs::remove_file(&target)?;
            }
        }
        info!(segment = %dir_name, end_seq, confirmed_seq, "moving confirmed segment to trash");
        std::fs::rename(&info.path, &target)?;
        moved.push(dir_name.clone());
    }
    Ok(moved)
}
/// Consumes the journal and wraps it in a `JournalPoller` together with the given
/// event poller, wait strategy, gate sequence and shutdown flag.
///
/// Equivalent to calling `JournalPoller::new` with `self` as the journal.
pub fn into_poller<T, W, B>(
self,
poller: EventPoller<RingSlot<T>, B, W::Notifier>,
wait_strategy: W,
gate: GatedSequence<W::Notifier>,
shutdown: Arc<AtomicBool>,
) -> JournalPoller<T, W, B>
where
W: WaitStrategy,
{
JournalPoller::<T, W, B>::new(self, poller, wait_strategy, gate, shutdown)
}
}
/// Couples a `JournalHandler` with a ring-buffer event poller so that polled envelopes
/// can be written to the journal on a dedicated thread (see `poll` / `poll_pinned`).
pub struct JournalPoller<T, W: WaitStrategy, B> {
/// Journal that receives the polled envelopes.
journal: JournalHandler,
/// Source of `RingSlot<T>` events.
poller: EventPoller<RingSlot<T>, B, W::Notifier>,
/// Produces the waiter passed to `poll_wait`.
wait_strategy: W,
/// Sequence set after each committed batch to the slot of the last completed transaction.
gate: GatedSequence<W::Notifier>,
/// Checked when no events are available; set by the poller thread itself on error.
shutdown: Arc<AtomicBool>,
}
impl<T, W: WaitStrategy, B> JournalPoller<T, W, B> {
/// Assembles a poller from its parts; nothing is started until `poll` or
/// `poll_pinned` is called.
pub fn new(
journal: JournalHandler,
poller: EventPoller<RingSlot<T>, B, W::Notifier>,
wait_strategy: W,
gate: GatedSequence<W::Notifier>,
shutdown: Arc<AtomicBool>,
) -> Self {
Self {
journal,
poller,
wait_strategy,
gate,
shutdown,
}
}
}
impl<T, W, B> JournalPoller<T, W, B>
where
W: WaitStrategy + Send + 'static,
W::Notifier: Send + 'static,
T: RkyvToBytes + Send + Sync + 'static,
B: disrupt_rs::Barrier + 'static,
{
/// Starts the journal thread without CPU pinning and returns its join handle.
pub fn poll(self) -> thread::JoinHandle<anyhow::Result<()>> {
self.spawn(None)
}
/// Starts the journal thread, pinning it to `core_id` when one is given.
///
/// A failed pin is logged as a warning and the thread keeps running unpinned.
pub fn poll_pinned(self, core_id: Option<usize>) -> thread::JoinHandle<anyhow::Result<()>> {
self.spawn(core_id)
}
/// Spawns the thread that drains the event poller into the journal.
///
/// For each polled batch: every envelope is written into the open write transaction,
/// the transaction is committed, the gate is set to the slot of the last envelope seen
/// so far that ended a transaction, the journal is rotated if required, and a fresh
/// write transaction is opened. Rotation only happens when the batch's final envelope
/// ended a transaction.
///
/// The loop ends when the poller reports shutdown, or when no events are available and
/// the shutdown flag is set. On error the shutdown flag is raised and the error is
/// returned through the join handle.
fn spawn(self, core_id: Option<usize>) -> thread::JoinHandle<anyhow::Result<()>> {
let mut this = self;
thread::spawn(move || {
trace!(core_id = ?core_id, "starting wal poller");
if let Some(id) = core_id
&& !core_affinity::set_for_current(CoreId { id })
{
warn!(
core_id = id,
"failed to pin wal poller thread to requested core"
);
}
crate::metrics_stage!("wal_poller");
let mut waiter = this.wait_strategy.new_waiter();
let res = (|| {
let mut wtxn = this.journal.write_txn()?;
// NOTE(review): starts at 0, so if the first batch completes no transaction the gate
// is still set to 0 below — confirm that is acceptable for the gate's consumers.
let mut last_completed_tx_slot: Sequence = 0;
'poll: loop {
match this.poller.poll_wait(&mut waiter) {
Ok(mut events) => {
let t0 = Instant::now();
let events_len = events.len();
crate::metric!({ inflight: 1, in_total: events_len as u64 });
// `Some(seq)` only while the most recently written envelope ended a transaction.
let mut batch_end_app_seq: Option<u64> = None;
for (s, e) in &mut events {
let envelope: &Envelope<T> = e;
this.journal.put_envelope(&mut wtxn, envelope)?;
if envelope.is_tx_end() {
last_completed_tx_slot = s;
batch_end_app_seq = Some(envelope.seq);
} else {
batch_end_app_seq = None;
}
}
// Commit before moving the gate, so the gate never runs ahead of committed data.
wtxn.commit()?;
this.gate.set(last_completed_tx_slot);
// Rotate only on a transaction boundary; the new segment is named after the next sequence.
if this.journal.should_rotate()
&& let Some(last_app_seq) = batch_end_app_seq
{
let next_seq = last_app_seq.saturating_add(1);
this.journal.rotate(next_seq)?;
}
// Opened after any rotation, so it targets whichever segment is now active.
wtxn = this.journal.write_txn()?;
crate::metric!({
out_total: 1, inflight: 0,
duration_ns: t0.elapsed().as_nanos() as u64
});
}
Err(Polling::NoEvents) => {
if this.shutdown.load(Ordering::Acquire) {
break 'poll;
}
continue;
}
Err(Polling::Shutdown) => break 'poll,
}
}
Ok::<_, anyhow::Error>(())
})();
if let Err(e) = res {
error!(error = ?e, "journal poller error");
crate::metric!({ err_total: 1 });
// NOTE(review): stored with `Relaxed` while the loop above loads with `Acquire`;
// `Release` would be the matching ordering — confirm whether that matters here.
this.shutdown.store(true, Ordering::Relaxed);
return Err(e);
}
Ok(())
})
}
}