use heed::{
Database, EnvFlags, EnvOpenOptions,
byteorder::BigEndian,
types::{Bytes, U64},
};
use super::journal_scan::{
ArchivedEnvelopeOf, JournalEvent, JournalScanSource, ScanStartPolicy, ScanSummary,
ValidatedScan, decode_journal_event, replay_wal, validate_recovery_window,
validate_segment_sequence,
};
use crate::disruptor::{
Envelope, RingSlot,
traits::{RkyvError, RkyvToBytes},
};
use crate::error::WalError;
use crate::{book::BookEventEnvelope, config::JournalConfig};
use chrono::Utc;
use core_affinity::CoreId;
use disrupt_rs::{EventPoller, GatedSequence, Polling, Sequence, wait_strategies::WaitStrategy};
use fs4::{FileExt, TryLockError};
use parking_lot::Mutex;
use std::{
cell::{Cell, RefCell},
collections::HashSet,
fs::{File, OpenOptions},
io::{Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
sync::{
Arc, LazyLock,
atomic::{AtomicBool, AtomicU64, Ordering},
},
thread,
time::Instant,
};
use tracing::{error, info, trace, warn};
/// Big-endian `u64` key codec: big-endian encoding makes byte-wise key order equal
/// numeric order, so LMDB iterates log entries in ascending sequence order.
type BEU64 = U64<BigEndian>;
/// Per-segment log table: sequence number -> rkyv-encoded envelope bytes.
type LogDb = Database<BEU64, Bytes>;
/// Per-segment metadata table: byte-string key -> `u64` value (e.g. the schema version).
type MetaDb = Database<Bytes, BEU64>;
/// Key under which each segment records its WAL schema version in the `meta` table.
const META_SCHEMA_VERSION_KEY: &[u8] = b"schema_version";
/// Schema version stamped into new segments; opening a segment recording another version fails.
const WAL_SCHEMA_VERSION: u64 = 3;
/// Directory under the WAL root whose contents `clear_trash_dir` deletes.
const TRASH_DIR_NAME: &str = "trash";
/// Directory under the WAL root that `prune_segments_before` moves sealed segments into.
const RETIRED_DIR_NAME: &str = "retired";
/// File in the WAL root that carries the cross-process writer lock and owner metadata.
const WRITER_LOCK_FILE_NAME: &str = ".writer.lock";
/// Canonical WAL roots currently leased by a writer in this process.
///
/// `WalWriterLease::acquire` checks and inserts here before taking the on-disk file lock,
/// and the lease's `Drop` removes the entry again.
static HELD_WAL_ROOTS: LazyLock<Mutex<HashSet<PathBuf>>> =
    LazyLock::new(|| Mutex::new(HashSet::new()));
/// Cloneable handle used to steer WAL maintenance from outside the journal thread.
///
/// Every clone shares a single retention watermark. When the journal poller rotates to a
/// new segment, it retires sealed segments whose last sequence is at or below this
/// watermark. A watermark of `0` means "nothing confirmed yet".
#[derive(Clone, Debug, Default)]
pub struct WalMaintenanceHandle {
    retention_watermark: Arc<AtomicU64>,
}

impl WalMaintenanceHandle {
    /// Raises the shared watermark to `seq`. Lower values are ignored, so the watermark
    /// never moves backwards.
    pub fn advance_retention(&self, seq: u64) {
        let _previous = self.retention_watermark.fetch_max(seq, Ordering::Release);
    }

    /// Current value of the shared watermark.
    pub fn retention_watermark(&self) -> u64 {
        let watermark = &self.retention_watermark;
        watermark.load(Ordering::Acquire)
    }
}
/// Memory ordering used when the journal poller raises the shared shutdown flag after an error.
///
/// `Release` pairs with `Acquire` loads of the same flag (such as the poller's own
/// `Polling::NoEvents` branch). A thread that observes `true` therefore also sees writes
/// made before the flag was raised.
#[inline]
fn journaler_error_shutdown_ordering() -> Ordering {
    Ordering::Release
}
thread_local! {
    /// Per-thread rkyv serialization buffer. `Segment::put_envelope` takes it out, encodes
    /// into it and puts it back, so its allocation is reused across writes on the same
    /// thread. It starts with 1024 bytes of capacity.
    static SCRATCH: RefCell<rkyv::util::AlignedVec> =
        RefCell::new(rkyv::util::AlignedVec::with_capacity(1024));
}
/// Log-table name used by `JournalHandler::open_engine`.
pub(crate) const ENGINE_WAL_DB_NAME: &str = "walw";
/// Directory name for the segment whose first sequence is `start_seq`.
///
/// The number is zero-padded to 20 digits (the width of `u64::MAX`), so lexicographic
/// directory order matches numeric order.
fn segment_dir_name(start_seq: u64) -> String {
    format!("seg_{start_seq:0>20}")
}
/// Extracts the start sequence from a segment directory name such as `seg_00000000000000000042`.
///
/// Returns `None` unless `name` is `seg_` followed by text that `u64::from_str` accepts.
pub(crate) fn parse_segment_dir(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("seg_")?;
    digits.parse().ok()
}
/// Lists the `seg_*` subdirectories of `root`, ordered by their parsed start sequence.
///
/// A missing `root` yields an empty list. Non-directories and directories whose names
/// `parse_segment_dir` rejects (e.g. `trash`, `retired`) are ignored. I/O errors while
/// reading the directory are returned.
pub(crate) fn list_segment_dirs(root: &Path) -> anyhow::Result<Vec<String>> {
    if !root.exists() {
        return Ok(Vec::new());
    }
    // Parse each name once and sort on the cached start sequence.
    let mut keyed: Vec<(u64, String)> = Vec::new();
    for item in std::fs::read_dir(root)? {
        let item = item?;
        if !item.file_type()?.is_dir() {
            continue;
        }
        let name = item.file_name().to_string_lossy().into_owned();
        if let Some(start_seq) = parse_segment_dir(&name) {
            keyed.push((start_seq, name));
        }
    }
    keyed.sort_unstable_by_key(|(start_seq, _)| *start_seq);
    Ok(keyed.into_iter().map(|(_, name)| name).collect())
}
/// Path of the trash directory inside the WAL root.
fn trash_dir(root: &Path) -> PathBuf {
    let mut path = root.to_path_buf();
    path.push(TRASH_DIR_NAME);
    path
}
/// Deletes every entry in the WAL root's trash directory and returns how many were removed.
///
/// Directories are removed recursively and anything else is unlinked. Afterwards the trash
/// directory itself is removed if it is empty; failure of that last step is ignored. A
/// missing trash directory yields `Ok(0)`.
fn clear_trash_dir(root: &Path) -> anyhow::Result<usize> {
    let trash = trash_dir(root);
    if !trash.exists() {
        return Ok(0);
    }
    let mut removed_entries = 0usize;
    for item in std::fs::read_dir(&trash)? {
        let item = item?;
        let victim = item.path();
        let is_directory = item.file_type()?.is_dir();
        if is_directory {
            std::fs::remove_dir_all(&victim)?;
        } else {
            std::fs::remove_file(&victim)?;
        }
        removed_entries += 1;
    }
    let now_empty = std::fs::read_dir(&trash)?.next().is_none();
    if now_empty {
        // Best-effort tidy-up: a leftover empty trash dir is harmless.
        let _ = std::fs::remove_dir(&trash);
    }
    Ok(removed_entries)
}
/// Index of the segment that should contain `target`.
///
/// `dirs` is expected to be sorted by start sequence, as returned by `list_segment_dirs`.
/// The result is the segment starting exactly at `target` if there is one, otherwise the
/// last segment starting before it. It is `0` when `target` precedes every segment or
/// `dirs` is empty, so callers must handle the empty case before indexing.
pub(crate) fn find_segment_for_seq(dirs: &[String], target: u64) -> usize {
    dirs.binary_search_by_key(&target, |dir| parse_segment_dir(dir).unwrap_or(0))
        .unwrap_or_else(|insert_at| insert_at.saturating_sub(1))
}
/// Lowest sequence stored in `log`, or `None` when the table is empty.
fn first_log_seq(env: &heed::Env, log: &LogDb) -> anyhow::Result<Option<u64>> {
    let rtxn = env.read_txn()?;
    let first_entry = log.first(&rtxn)?;
    Ok(first_entry.map(|(seq, _bytes)| seq))
}
/// Highest sequence stored in `log`, or `None` when the table is empty.
fn last_log_seq(env: &heed::Env, log: &LogDb) -> anyhow::Result<Option<u64>> {
    let rtxn = env.read_txn()?;
    let last_entry = log.last(&rtxn)?;
    Ok(last_entry.map(|(seq, _bytes)| seq))
}
/// Calls `on_entry(seq, bytes)` for each entry in `log` with `seq >= start_seq`, in
/// ascending sequence order.
///
/// When `up_to_seq` is given, the scan stops after `seq == up_to_seq` (inclusive). All
/// entries are read inside one read transaction. The first error, whether from the cursor
/// or from `on_entry`, aborts the scan and is returned.
fn visit_log_entries<F>(
    env: &heed::Env,
    log: &LogDb,
    start_seq: u64,
    up_to_seq: Option<u64>,
    mut on_entry: F,
) -> anyhow::Result<()>
where
    F: FnMut(u64, &[u8]) -> anyhow::Result<()>,
{
    use std::ops::Bound;

    // (Included(start), Included(end) | Unbounded) covers both the bounded and open scans
    // with a single RangeBounds value.
    let upper = match up_to_seq {
        Some(last) => Bound::Included(last),
        None => Bound::Unbounded,
    };
    let bounds = (Bound::Included(start_seq), upper);
    let rtxn = env.read_txn()?;
    for entry in log.range(&rtxn, &bounds)? {
        let (seq, val_bytes) = entry?;
        on_entry(seq, val_bytes)?;
    }
    Ok(())
}
struct Segment {
env: heed::Env,
log: LogDb,
event_count: Cell<u64>,
}
impl Segment {
fn open(dir_path: &Path, db_name: &str, map_size: usize) -> anyhow::Result<Self> {
std::fs::create_dir_all(dir_path)?;
let env = unsafe {
EnvOpenOptions::new()
.map_size(map_size)
.max_dbs(8)
.open(dir_path)?
};
let log = {
let mut wtxn = env.write_txn()?;
let db: LogDb = env.create_database(&mut wtxn, Some(db_name))?;
let meta: MetaDb = env.create_database(&mut wtxn, Some("meta"))?;
match meta.get(&wtxn, META_SCHEMA_VERSION_KEY)? {
Some(v) if v == WAL_SCHEMA_VERSION => {}
Some(v) => {
return Err(WalError::SchemaMismatch {
expected: WAL_SCHEMA_VERSION,
found: v,
}
.into());
}
None => {
meta.put(&mut wtxn, META_SCHEMA_VERSION_KEY, &WAL_SCHEMA_VERSION)?;
}
}
wtxn.commit()?;
db
};
let event_count = {
let rtxn = env.read_txn()?;
log.len(&rtxn)?
};
Ok(Self {
env,
log,
event_count: Cell::new(event_count),
})
}
fn write_txn(&self) -> anyhow::Result<heed::RwTxn<'_>> {
Ok(self.env.write_txn()?)
}
fn put_envelope<T>(
&self,
wtxn: &mut heed::RwTxn<'_>,
envelope: &Envelope<T>,
) -> anyhow::Result<()>
where
T: RkyvToBytes,
{
SCRATCH.with(|scratch| -> anyhow::Result<()> {
let mut bytes = scratch.borrow_mut();
let mut writer = std::mem::take(&mut *bytes);
writer.clear();
let writer = rkyv::api::high::to_bytes_in::<_, RkyvError>(envelope, writer)?;
self.log.put(wtxn, &envelope.seq, writer.as_slice())?;
*bytes = writer;
Ok(())
})?;
Ok(())
}
}
/// A read-only view of a WAL segment. It is opened with `EnvFlags::READ_ONLY` and never
/// creates tables or writes metadata.
pub(crate) struct ReadSegment {
    env: heed::Env,
    log: LogDb,
    event_count: u64,
}

impl ReadSegment {
    /// Opens the segment in `dir_path` read-only and checks its schema version.
    ///
    /// # Errors
    /// - The `db_name` log table does not exist.
    /// - `WalError::SchemaMismatch` with `found: 0` when the `meta` table or version key
    ///   is missing.
    /// - `WalError::SchemaMismatch` with the stored version when it differs from
    ///   `WAL_SCHEMA_VERSION`.
    pub(crate) fn open(dir_path: &Path, db_name: &str, map_size: usize) -> anyhow::Result<Self> {
        // SAFETY: see heed's `EnvOpenOptions::open`/`flags` contracts (no double-open in one
        // process, no external modification while mapped). NOTE(review): callers are assumed
        // not to open the same segment through `Segment::open` concurrently — confirm.
        let env = unsafe {
            let mut options = EnvOpenOptions::new();
            options.map_size(map_size).max_dbs(8);
            options.flags(EnvFlags::READ_ONLY);
            options.open(dir_path)?
        };

        let rtxn = env.read_txn()?;
        let log: LogDb = env
            .open_database(&rtxn, Some(db_name))?
            .ok_or_else(|| anyhow::anyhow!("journal database `{db_name}` not found"))?;
        let missing_meta = WalError::SchemaMismatch {
            expected: WAL_SCHEMA_VERSION,
            found: 0,
        };
        let meta: MetaDb = env.open_database(&rtxn, Some("meta"))?.ok_or(missing_meta)?;
        // A missing version key reports `found: 0`; WAL_SCHEMA_VERSION is never 0, so the
        // single comparison below covers both the "absent" and "different" cases.
        let found = meta.get(&rtxn, META_SCHEMA_VERSION_KEY)?.unwrap_or(0);
        if found != WAL_SCHEMA_VERSION {
            return Err(WalError::SchemaMismatch {
                expected: WAL_SCHEMA_VERSION,
                found,
            }
            .into());
        }
        let event_count = log.len(&rtxn)?;
        rtxn.commit()?;
        Ok(Self {
            env,
            log,
            event_count,
        })
    }

    /// Visits entries of this segment in `[start_seq, up_to_seq]` (or `[start_seq, ..)`);
    /// see `visit_log_entries`.
    pub(crate) fn visit_entries<F>(
        &self,
        start_seq: u64,
        up_to_seq: Option<u64>,
        on_entry: F,
    ) -> anyhow::Result<()>
    where
        F: FnMut(u64, &[u8]) -> anyhow::Result<()>,
    {
        let (env, log) = (&self.env, &self.log);
        visit_log_entries(env, log, start_seq, up_to_seq, on_entry)
    }
}
/// Summary of one segment directory, as produced by `JournalHandler::list_segments` and
/// used for validation and pruning.
///
/// Derives `Debug` and `Clone` so callers can log, inspect and keep copies of listings.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentInfo {
    /// Directory name, e.g. `seg_00000000000000000042`.
    pub dir_name: String,
    /// Full path of the segment directory.
    pub path: PathBuf,
    /// Lowest sequence stored in the segment; `None` when it is empty.
    pub start_seq: Option<u64>,
    /// Highest sequence stored in the segment; `None` when it is empty.
    pub end_seq: Option<u64>,
    /// Entry count of the log table. For the active segment this is the handler's
    /// running counter.
    pub event_count: u64,
    /// Combined size in bytes of the regular files directly inside the segment directory.
    pub file_size_bytes: u64,
}
/// Assembles a [`SegmentInfo`] for `root_dir/dir_name`, measuring the directory's on-disk size.
fn build_segment_info(
    root_dir: &Path,
    dir_name: &str,
    start_seq: Option<u64>,
    end_seq: Option<u64>,
    event_count: u64,
) -> anyhow::Result<SegmentInfo> {
    let path = root_dir.join(dir_name);
    Ok(SegmentInfo {
        file_size_bytes: dir_size(&path)?,
        dir_name: dir_name.to_owned(),
        path,
        start_seq,
        end_seq,
        event_count,
    })
}
/// Summarizes a writable segment that is already open.
fn segment_info_from_open(
    root_dir: &Path,
    dir_name: &str,
    seg: &Segment,
) -> anyhow::Result<SegmentInfo> {
    let start_seq = first_log_seq(&seg.env, &seg.log)?;
    let end_seq = last_log_seq(&seg.env, &seg.log)?;
    let event_count = seg.event_count.get();
    build_segment_info(root_dir, dir_name, start_seq, end_seq, event_count)
}
/// Opens `root_dir/dir_name` with `Segment::open` and summarizes it.
///
/// NOTE: this goes through the writable open path. A segment lacking the log/meta tables
/// or a schema version gets them created as a side effect.
fn segment_info_static(
    root_dir: &Path,
    dir_name: &str,
    db_name: &str,
    map_size: usize,
) -> anyhow::Result<SegmentInfo> {
    let seg = Segment::open(&root_dir.join(dir_name), db_name, map_size)?;
    segment_info_from_open(root_dir, dir_name, &seg)
}
/// Summarizes a segment opened read-only.
fn segment_info_from_read(
    root_dir: &Path,
    dir_name: &str,
    seg: &ReadSegment,
) -> anyhow::Result<SegmentInfo> {
    let start_seq = first_log_seq(&seg.env, &seg.log)?;
    let end_seq = last_log_seq(&seg.env, &seg.log)?;
    build_segment_info(root_dir, dir_name, start_seq, end_seq, seg.event_count)
}
/// Opens `root_dir/dir_name` read-only (no tables or metadata are created) and summarizes it.
pub(crate) fn segment_info_readonly(
    root_dir: &Path,
    dir_name: &str,
    db_name: &str,
    map_size: usize,
) -> anyhow::Result<SegmentInfo> {
    let seg = ReadSegment::open(&root_dir.join(dir_name), db_name, map_size)?;
    segment_info_from_read(root_dir, dir_name, &seg)
}
/// Sum of the sizes of the regular files directly inside `path`. The scan does not recurse.
///
/// Returns `0` when `path` is not a directory. Entry metadata comes from
/// `DirEntry::metadata`, so symlinks are not followed on Unix.
fn dir_size(path: &Path) -> anyhow::Result<u64> {
    if !path.is_dir() {
        return Ok(0);
    }
    let mut total_bytes = 0u64;
    for item in std::fs::read_dir(path)? {
        let metadata = item?.metadata()?;
        if metadata.is_file() {
            total_bytes += metadata.len();
        }
    }
    Ok(total_bytes)
}
/// Exclusive right to write the WAL under one root directory.
///
/// The lease is held in two layers:
/// - an entry for the canonical root in the process-wide `HELD_WAL_ROOTS` set (same process);
/// - an advisory lock on `<root>/.writer.lock`, whose contents record the owner (other processes).
///
/// Both are released on drop.
struct WalWriterLease {
    canonical_root: PathBuf,
    lock_file: File,
}

impl WalWriterLease {
    /// Canonicalizes `root_dir` and takes both layers of the lease.
    ///
    /// Fails with `WalError::WalAlreadyLocked` if either layer is already held. If the
    /// file-lock step fails for any reason, the in-process reservation is rolled back.
    fn acquire(root_dir: &Path) -> anyhow::Result<Self> {
        let canonical_root = std::fs::canonicalize(root_dir)?;
        Self::reserve_in_process(&canonical_root)?;
        match Self::lock_on_disk(&canonical_root) {
            Ok(lock_file) => Ok(Self {
                canonical_root,
                lock_file,
            }),
            Err(err) => {
                HELD_WAL_ROOTS.lock().remove(&canonical_root);
                Err(err)
            }
        }
    }

    /// Records `canonical_root` in `HELD_WAL_ROOTS`, failing if this process already holds it.
    fn reserve_in_process(canonical_root: &Path) -> anyhow::Result<()> {
        let mut held = HELD_WAL_ROOTS.lock();
        if held.contains(canonical_root) {
            return Err(WalError::WalAlreadyLocked {
                wal_path: canonical_root.display().to_string(),
                owner: read_writer_owner_metadata(canonical_root),
            }
            .into());
        }
        held.insert(canonical_root.to_path_buf());
        Ok(())
    }

    /// Opens (creating if needed, without truncating) the lock file, takes its lock without
    /// blocking, and overwrites it with this process's owner metadata.
    fn lock_on_disk(canonical_root: &Path) -> anyhow::Result<File> {
        let lock_path = canonical_root.join(WRITER_LOCK_FILE_NAME);
        let mut lock_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&lock_path)?;
        match FileExt::try_lock(&lock_file) {
            Ok(()) => {}
            Err(TryLockError::WouldBlock) => {
                // Another process holds the lock; surface whatever owner info it wrote.
                return Err(WalError::WalAlreadyLocked {
                    wal_path: canonical_root.display().to_string(),
                    owner: read_owner_metadata_from_file(&mut lock_file),
                }
                .into());
            }
            Err(TryLockError::Error(err)) => return Err(err.into()),
        }
        write_writer_owner_metadata(&mut lock_file, canonical_root)?;
        Ok(lock_file)
    }
}

impl Drop for WalWriterLease {
    /// Releases the file lock (best-effort) and then the in-process reservation.
    fn drop(&mut self) {
        let _ = FileExt::unlock(&self.lock_file);
        HELD_WAL_ROOTS.lock().remove(&self.canonical_root);
    }
}
fn read_writer_owner_metadata(root_dir: &Path) -> Option<String> {
let lock_path = root_dir.join(WRITER_LOCK_FILE_NAME);
let mut lock_file = OpenOptions::new()
.read(true)
.write(true)
.open(&lock_path)
.ok()?;
read_owner_metadata_from_file(&mut lock_file)
}
/// Reads `file` from the beginning and returns its trimmed contents.
///
/// Returns `None` if seeking or reading fails (including non-UTF-8 content) or if the
/// trimmed text is empty.
fn read_owner_metadata_from_file(file: &mut File) -> Option<String> {
    file.rewind().ok()?;
    let mut raw = String::new();
    file.read_to_string(&mut raw).ok()?;
    match raw.trim() {
        "" => None,
        trimmed => Some(trimmed.to_owned()),
    }
}
/// Replaces the contents of the lock `file` with fresh owner metadata for `canonical_root`.
fn write_writer_owner_metadata(file: &mut File, canonical_root: &Path) -> anyhow::Result<()> {
    file.set_len(0)?;
    file.rewind()?;
    let metadata = build_writer_owner_metadata(canonical_root);
    file.write_all(metadata.as_bytes())?;
    file.flush()?;
    Ok(())
}
/// Human-readable description of the current process as WAL writer: pid, hostname
/// (`HOSTNAME`, then `COMPUTERNAME`, else `unknown`), executable path, RFC 3339
/// acquisition time, and the WAL path.
fn build_writer_owner_metadata(canonical_root: &Path) -> String {
    const UNKNOWN: &str = "unknown";
    let pid = std::process::id();
    let hostname = ["HOSTNAME", "COMPUTERNAME"]
        .into_iter()
        .find_map(|key| std::env::var(key).ok())
        .unwrap_or_else(|| UNKNOWN.to_string());
    let exe = match std::env::current_exe() {
        Ok(path) => path.display().to_string(),
        Err(_) => UNKNOWN.to_string(),
    };
    let acquired_at = Utc::now().to_rfc3339();
    let wal_path = canonical_root.display();
    format!(
        "pid={pid}; hostname={hostname}; exe={exe}; acquired_at={acquired_at}; wal_path={wal_path}"
    )
}
/// Writer for a segmented, LMDB-backed write-ahead log rooted at `root_dir`.
///
/// Appends always go to the single active segment. Older segments are no longer appended
/// to; they are opened on demand for scanning, reporting and pruning.
pub struct JournalHandler {
    // Canonicalized WAL root (taken from the writer lease).
    root_dir: PathBuf,
    // Name of the log table inside every segment environment.
    db_name: String,
    config: JournalConfig,
    // Segment currently receiving appends.
    active: Segment,
    // Directory name (`seg_<start>`) of `active`.
    active_dir_name: String,
    // Set by `request_rotation`; makes `should_rotate` true until the next `rotate`.
    rotation_requested: bool,
    // NOTE: struct fields drop in declaration order, so `active` (and its environment) is
    // dropped before this lease releases the writer lock. Keep this field last.
    _writer_lease: WalWriterLease,
}
impl JournalScanSource for JournalHandler {
    /// Canonicalized WAL root directory.
    fn root_dir(&self) -> &Path {
        self.root_dir.as_path()
    }

    /// Visits entries of segment `dir_name` within `[start_seq, up_to_seq]` (or `[start_seq, ..)`).
    ///
    /// The active segment is already open, so it is scanned in place. Any other segment
    /// is opened for the duration of the scan.
    fn visit_segment_entries<F>(
        &self,
        dir_name: &str,
        start_seq: u64,
        up_to_seq: Option<u64>,
        on_entry: F,
    ) -> anyhow::Result<()>
    where
        F: FnMut(u64, &[u8]) -> anyhow::Result<()>,
    {
        match self.open_segment_if_not_active(dir_name)? {
            Some(sealed) => {
                visit_log_entries(&sealed.env, &sealed.log, start_seq, up_to_seq, on_entry)
            }
            None => visit_log_entries(
                &self.active.env,
                &self.active.log,
                start_seq,
                up_to_seq,
                on_entry,
            ),
        }
    }
}
impl JournalHandler {
    /// Opens the engine WAL (log table `ENGINE_WAL_DB_NAME`) rooted at `root_dir`.
    ///
    /// Unlike [`JournalHandler::open`], this runs neither startup maintenance nor segment
    /// validation. It only acquires the writer lease and opens the active segment.
    ///
    /// # Errors
    /// Fails if the root cannot be created, another writer holds the lease, or the active
    /// segment cannot be opened (including a schema-version mismatch).
    pub fn open_engine(root_dir: impl AsRef<Path>, config: JournalConfig) -> anyhow::Result<Self> {
        Self::open_raw(root_dir, ENGINE_WAL_DB_NAME, config)
    }

    /// Opens the WAL whose log table is `db_name`, then applies startup maintenance and,
    /// when `config.validate_on_startup` is set, validates the segment sequence.
    pub fn open(
        root_dir: impl AsRef<Path>,
        db_name: &str,
        config: JournalConfig,
    ) -> anyhow::Result<Self> {
        let mut handler = Self::open_raw(root_dir, db_name, config)?;
        handler.apply_startup_maintenance()?;
        if handler.config.validate_on_startup {
            handler.validate_segments()?;
        }
        Ok(handler)
    }

    /// Creates `root_dir` if needed and takes the writer lease on it.
    ///
    /// It then opens the newest `seg_*` directory as the active segment, or creates the
    /// start-sequence-0 segment when none exist. All later paths are based on the lease's
    /// canonicalized root.
    fn open_raw(
        root_dir: impl AsRef<Path>,
        db_name: &str,
        config: JournalConfig,
    ) -> anyhow::Result<Self> {
        let requested_root = root_dir.as_ref();
        std::fs::create_dir_all(requested_root)?;
        let writer_lease = WalWriterLease::acquire(requested_root)?;
        let root_dir = writer_lease.canonical_root.clone();
        let dirs = list_segment_dirs(&root_dir)?;
        let active_dir_name = match dirs.last() {
            Some(last) => {
                info!(segment = %last, "opening last segment as active");
                last.clone()
            }
            None => {
                let dir_name = segment_dir_name(0);
                info!(segment = %dir_name, "creating initial segment");
                dir_name
            }
        };
        let active = Segment::open(
            &root_dir.join(&active_dir_name),
            db_name,
            config.segment_map_size_bytes,
        )?;
        Ok(Self {
            root_dir,
            db_name: db_name.to_string(),
            config,
            active,
            active_dir_name,
            rotation_requested: false,
            _writer_lease: writer_lease,
        })
    }

    /// Runs the configured startup housekeeping.
    ///
    /// - If `config.prune_before_seq_on_startup` is set, sealed segments fully at or below
    ///   it are retired; errors from this step are returned.
    /// - If `config.clear_trash_on_startup` is set, the trash directory is emptied; errors
    ///   from this step are only logged.
    pub fn apply_startup_maintenance(&mut self) -> anyhow::Result<()> {
        if let Some(confirmed_seq) = self.config.prune_before_seq_on_startup {
            let moved = self.prune_segments_before(confirmed_seq)?;
            if !moved.is_empty() {
                info!(
                    moved_segments = moved.len(),
                    confirmed_seq, "retired sealed segments after startup recovery"
                );
            }
        }
        if self.config.clear_trash_on_startup {
            match clear_trash_dir(&self.root_dir) {
                Ok(0) => {}
                Ok(n) => info!(
                    removed_entries = n,
                    "deleted trashed journal segments after startup recovery"
                ),
                Err(e) => warn!(
                    error = %e,
                    "failed to clear trash directory after startup recovery"
                ),
            }
        }
        Ok(())
    }

    /// Summarizes segment `dir_name`.
    ///
    /// The active segment is summarized from the open handle. Any other segment is opened
    /// via `Segment::open`, which may stamp missing metadata into it.
    fn segment_info(&self, dir_name: &str) -> anyhow::Result<SegmentInfo> {
        if dir_name == self.active_dir_name {
            segment_info_from_open(&self.root_dir, dir_name, &self.active)
        } else {
            segment_info_static(
                &self.root_dir,
                dir_name,
                &self.db_name,
                self.config.segment_map_size_bytes,
            )
        }
    }

    /// Opens segment `dir_name` unless it is the active one.
    ///
    /// Returns `None` for the active segment, so it is never opened a second time.
    fn open_segment_if_not_active(&self, dir_name: &str) -> anyhow::Result<Option<Segment>> {
        if dir_name == self.active_dir_name {
            Ok(None)
        } else {
            let path = self.root_dir.join(dir_name);
            Ok(Some(Segment::open(
                &path,
                &self.db_name,
                self.config.segment_map_size_bytes,
            )?))
        }
    }

    /// Begins a write transaction on the active segment.
    ///
    /// The transaction borrows `self`, so it must be committed or dropped before calling
    /// `rotate`.
    pub fn write_txn(&self) -> anyhow::Result<heed::RwTxn<'_>> {
        self.active.write_txn()
    }

    /// Stores `envelope` in `wtxn` under `envelope.seq` and bumps the active segment's counter.
    ///
    /// The counter is bumped at put time. If `wtxn` is later aborted, the counter can
    /// therefore run ahead of the stored entries.
    pub fn put_envelope<T>(
        &self,
        wtxn: &mut heed::RwTxn<'_>,
        envelope: &Envelope<T>,
    ) -> anyhow::Result<()>
    where
        T: RkyvToBytes,
    {
        self.active.put_envelope(wtxn, envelope)?;
        self.active
            .event_count
            .set(self.active.event_count.get() + 1);
        Ok(())
    }

    /// Forces `should_rotate` to report `true` until the next `rotate`.
    pub fn request_rotation(&mut self) {
        self.rotation_requested = true;
    }

    /// `true` once rotation was requested or the active segment's counter reached
    /// `config.max_segment_events`.
    pub fn should_rotate(&self) -> bool {
        if self.rotation_requested {
            return true;
        }
        self.active.event_count.get() >= self.config.max_segment_events
    }

    /// Makes `seg_<expected_start_seq>` the active segment and clears any pending rotation
    /// request. The directory is created if needed; the previous active segment is dropped.
    pub fn rotate(&mut self, expected_start_seq: u64) -> anyhow::Result<()> {
        let dir_name = segment_dir_name(expected_start_seq);
        let seg_path = self.root_dir.join(&dir_name);
        info!(segment = %dir_name, expected_start_seq, "rotating to new segment");
        let new_seg = Segment::open(&seg_path, &self.db_name, self.config.segment_map_size_bytes)?;
        self.active = new_seg;
        self.active_dir_name = dir_name;
        self.rotation_requested = false;
        Ok(())
    }

    /// Validates the WAL after `after_seq`, then replays every validated envelope to
    /// `on_event` as an rkyv archived view.
    ///
    /// The `bool` passed to `on_event` is forwarded from `replay_wal` (its end-of-batch
    /// flag). Nothing is replayed when validation emits no events.
    pub fn recover_from<T, F>(&self, after_seq: Option<u64>, mut on_event: F) -> anyhow::Result<()>
    where
        T: rkyv::Archive,
        ArchivedEnvelopeOf<T>:
            for<'a> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'a, RkyvError>>,
        F: for<'a> FnMut(&'a ArchivedEnvelopeOf<T>, bool) -> anyhow::Result<()>,
    {
        let validation = validate_recovery_window::<T, _>(
            self,
            after_seq,
            None,
            ScanStartPolicy::ExactContinuation,
            None,
        )?;
        let Some(validated_up_to_seq) = validation.summary.last_emitted_seq else {
            return Ok(());
        };
        replay_wal::<T, _, _>(
            self,
            after_seq,
            Some(validated_up_to_seq),
            ScanStartPolicy::ExactContinuation,
            |bytes, end_of_batch| {
                let archived = rkyv::api::high::access::<ArchivedEnvelopeOf<T>, RkyvError>(bytes)?;
                on_event(archived, end_of_batch)
            },
        )?;
        Ok(())
    }

    /// Validates, then replays, engine events after `after_seq`.
    ///
    /// Returns the last validated sequence, or `None` when there was nothing to replay.
    pub fn replay_engine_events_from<F>(
        &self,
        after_seq: Option<u64>,
        mut on_event: F,
    ) -> anyhow::Result<Option<u64>>
    where
        F: FnMut(&JournalEvent, bool) -> anyhow::Result<()>,
    {
        let validation = self.validate_events_from_required(after_seq, None)?;
        let Some(validated_up_to_seq) = validation.summary.last_emitted_seq else {
            return Ok(None);
        };
        self.replay_events_from(after_seq, Some(validated_up_to_seq), &mut on_event)?;
        Ok(Some(validated_up_to_seq))
    }

    /// Validates the `BookEventEnvelope` window after `after_seq`, passing
    /// `required_through_seq` through to `validate_recovery_window`.
    pub(crate) fn validate_events_from_required(
        &self,
        after_seq: Option<u64>,
        required_through_seq: Option<u64>,
    ) -> anyhow::Result<ValidatedScan> {
        validate_recovery_window::<BookEventEnvelope, _>(
            self,
            after_seq,
            None,
            ScanStartPolicy::ExactContinuation,
            required_through_seq,
        )
    }

    /// Replays engine events after `after_seq` (up to `up_to_seq` when given) without a
    /// separate validation pass. Each entry is decoded with `decode_journal_event`.
    pub(crate) fn replay_events_from<F>(
        &self,
        after_seq: Option<u64>,
        up_to_seq: Option<u64>,
        mut on_event: F,
    ) -> anyhow::Result<ScanSummary>
    where
        F: FnMut(&JournalEvent, bool) -> anyhow::Result<()>,
    {
        replay_wal::<BookEventEnvelope, _, _>(
            self,
            after_seq,
            up_to_seq,
            ScanStartPolicy::ExactContinuation,
            |bytes, end_of_batch| {
                let event = decode_journal_event(bytes)?;
                on_event(&event, end_of_batch)
            },
        )
    }

    /// Runs `validate_segment_sequence` over the segments under the WAL root.
    pub fn validate_segments(&self) -> anyhow::Result<()> {
        validate_segment_sequence(&self.root_dir, |dir_name| self.segment_info(dir_name))
    }

    /// Summaries of all segment directories, in start-sequence order.
    pub fn list_segments(&self) -> anyhow::Result<Vec<SegmentInfo>> {
        let dirs = list_segment_dirs(&self.root_dir)?;
        let mut infos = Vec::with_capacity(dirs.len());
        for dir_name in &dirs {
            infos.push(self.segment_info(dir_name)?);
        }
        Ok(infos)
    }

    /// Moves sealed segments whose highest sequence is `<= confirmed_seq` into the
    /// `retired/` directory under the WAL root. Returns the moved directory names.
    ///
    /// The following are never moved:
    /// - the newest segment directory;
    /// - the active segment (even when it is not the newest directory);
    /// - empty segments (no `end_seq`).
    ///
    /// If a retired target of the same name already exists, this stops with an error;
    /// segments moved before that point stay moved.
    pub fn prune_segments_before(&self, confirmed_seq: u64) -> anyhow::Result<Vec<String>> {
        let dirs = list_segment_dirs(&self.root_dir)?;
        let Some((_newest, sealed)) = dirs.split_last() else {
            return Ok(Vec::new());
        };
        let retired_root = self.root_dir.join(RETIRED_DIR_NAME);
        let mut moved = Vec::new();
        for dir_name in sealed {
            // The active segment is normally the newest directory, but the public `rotate`
            // accepts any start sequence. Renaming an environment still open for writing
            // would move it out of the WAL root, so skip it explicitly.
            if *dir_name == self.active_dir_name {
                continue;
            }
            let info = self.segment_info(dir_name)?;
            if let Some(end_seq) = info.end_seq
                && end_seq <= confirmed_seq
            {
                std::fs::create_dir_all(&retired_root)?;
                let target = retired_root.join(dir_name);
                if target.exists() {
                    anyhow::bail!(
                        "retired segment target already exists: {}",
                        target.display()
                    );
                }
                info!(
                    segment = %dir_name,
                    end_seq,
                    confirmed_seq,
                    retired_dir = %retired_root.display(),
                    "moving confirmed segment to retired directory"
                );
                std::fs::rename(&info.path, &target)?;
                moved.push(dir_name.clone());
            }
        }
        Ok(moved)
    }

    /// Converts this handler into a [`JournalPoller`] fed by `poller`. Call `poll` or
    /// `poll_pinned` on the result to start the writer thread.
    pub fn into_poller<T, W, B>(
        self,
        poller: EventPoller<RingSlot<T>, B, W::Notifier>,
        wait_strategy: W,
        gate: GatedSequence<W::Notifier>,
        shutdown: Arc<AtomicBool>,
        maintenance: WalMaintenanceHandle,
    ) -> JournalPoller<T, W, B>
    where
        W: WaitStrategy,
    {
        JournalPoller::<T, W, B>::new(self, poller, wait_strategy, gate, shutdown, maintenance)
    }
}
/// Bundles a [`JournalHandler`] with the disruptor poller that feeds it.
///
/// `poll` / `poll_pinned` consume it and run the journal writer on its own thread.
pub struct JournalPoller<T, W: WaitStrategy, B> {
    journal: JournalHandler,
    poller: EventPoller<RingSlot<T>, B, W::Notifier>,
    wait_strategy: W,
    gate: GatedSequence<W::Notifier>,
    shutdown: Arc<AtomicBool>,
    maintenance: WalMaintenanceHandle,
}

impl<T, W: WaitStrategy, B> JournalPoller<T, W, B> {
    /// Stores the journal and its disruptor wiring. No thread is started until
    /// `poll` / `poll_pinned` is called.
    pub fn new(
        journal: JournalHandler,
        poller: EventPoller<RingSlot<T>, B, W::Notifier>,
        wait_strategy: W,
        gate: GatedSequence<W::Notifier>,
        shutdown: Arc<AtomicBool>,
        maintenance: WalMaintenanceHandle,
    ) -> Self {
        JournalPoller {
            maintenance,
            shutdown,
            gate,
            wait_strategy,
            poller,
            journal,
        }
    }
}
/// Retires sealed segments whose last sequence is at or below the current retention watermark.
///
/// A watermark of `0` is treated as "nothing confirmed yet" and skips pruning. Errors
/// from `prune_segments_before` are propagated.
fn apply_retention_watermark(
    journal: &JournalHandler,
    maintenance: &WalMaintenanceHandle,
) -> anyhow::Result<()> {
    let watermark = maintenance.retention_watermark();
    if watermark != 0 {
        let retired = journal.prune_segments_before(watermark)?;
        if !retired.is_empty() {
            info!(
                retired_segments = retired.len(),
                retention_watermark = watermark,
                "retired sealed WAL segments below retention watermark"
            );
        }
    }
    Ok(())
}
/// Thread entry points. The whole poller is moved onto a new OS thread, hence the
/// `Send + 'static` bounds.
impl<T, W, B> JournalPoller<T, W, B>
where
    W: WaitStrategy + Send + 'static,
    W::Notifier: Send + 'static,
    T: RkyvToBytes + Send + Sync + 'static,
    B: disrupt_rs::Barrier + 'static,
{
    /// Spawns the journal writer thread without CPU pinning.
    pub fn poll(self) -> thread::JoinHandle<anyhow::Result<()>> {
        self.spawn(None)
    }
    /// Spawns the journal writer thread, pinning it to `core_id` when given. A pinning
    /// failure only logs a warning.
    pub fn poll_pinned(self, core_id: Option<usize>) -> thread::JoinHandle<anyhow::Result<()>> {
        self.spawn(core_id)
    }
    /// Runs the journal write loop on a new thread.
    ///
    /// For each polled batch:
    /// - every envelope is put into one LMDB write transaction, which is then committed;
    /// - the gate is set to the ring slot of the last envelope that ended a transaction;
    /// - the journal rotates only if the batch's final envelope ended a transaction, so
    ///   rotation happens on a transaction boundary, and retention pruning runs after
    ///   each rotation.
    ///
    /// The loop ends on `Polling::Shutdown`, or when idle and the shutdown flag is set. On
    /// any error the thread logs it, raises the shared shutdown flag and returns the error.
    fn spawn(self, core_id: Option<usize>) -> thread::JoinHandle<anyhow::Result<()>> {
        let JournalPoller {
            mut journal,
            mut poller,
            wait_strategy,
            gate,
            shutdown,
            maintenance,
        } = self;
        thread::spawn(move || {
            trace!(core_id = ?core_id, "starting wal poller");
            if let Some(id) = core_id
                && !core_affinity::set_for_current(CoreId { id })
            {
                warn!(
                    core_id = id,
                    "failed to pin wal poller thread to requested core"
                );
            }
            crate::metrics_stage!("wal_poller");
            let mut waiter = wait_strategy.new_waiter();
            let res = (|| {
                // One write transaction is kept open between batches; it is replaced after
                // every commit.
                let mut wtxn = journal.write_txn()?;
                // NOTE(review): starts at 0, so a first batch with no tx end sets the gate
                // to slot 0 — confirm 0 is the intended initial `Sequence` value.
                let mut last_completed_tx_slot: Sequence = 0;
                'poll: loop {
                    match poller.poll_wait(&mut waiter) {
                        Ok(mut events) => {
                            let t0 = Instant::now();
                            let events_len = events.len();
                            crate::metric!({ inflight: 1, in_total: events_len as u64 });
                            // Some(seq) only if the batch's *last* envelope ended a tx.
                            let mut batch_end_app_seq: Option<u64> = None;
                            for (s, e) in &mut events {
                                let envelope: &Envelope<T> = e;
                                journal.put_envelope(&mut wtxn, envelope)?;
                                if envelope.is_tx_end() {
                                    last_completed_tx_slot = s;
                                    batch_end_app_seq = Some(envelope.seq);
                                } else {
                                    batch_end_app_seq = None;
                                }
                            }
                            wtxn.commit()?;
                            // Publish progress only after the batch is committed.
                            gate.set(last_completed_tx_slot);
                            // `wtxn` was consumed by `commit`, so `journal` may be borrowed
                            // mutably by `rotate` here.
                            if journal.should_rotate()
                                && let Some(last_app_seq) = batch_end_app_seq
                            {
                                let next_seq = last_app_seq.saturating_add(1);
                                journal.rotate(next_seq)?;
                                apply_retention_watermark(&journal, &maintenance)?;
                            }
                            // After a rotation this opens the txn on the new active segment.
                            wtxn = journal.write_txn()?;
                            crate::metric!({
                                out_total: 1, inflight: 0,
                                duration_ns: t0.elapsed().as_nanos() as u64
                            });
                        }
                        Err(Polling::NoEvents) => {
                            // Idle: honour an external shutdown request.
                            if shutdown.load(Ordering::Acquire) {
                                break 'poll;
                            }
                            continue;
                        }
                        Err(Polling::Shutdown) => break 'poll,
                    }
                }
                Ok::<_, anyhow::Error>(())
            })();
            if let Err(e) = res {
                error!(error = ?e, "journal poller error");
                crate::metric!({ err_total: 1 });
                // Tell the rest of the pipeline to stop; the journal can no longer persist.
                shutdown.store(true, journaler_error_shutdown_ordering());
                return Err(e);
            }
            Ok(())
        })
    }
}