use super::{Cursor, CursorGapConfig};
use crate::coordinate::Region;
use crate::store::delivery::canal::CanalHandle;
use crate::store::delivery::observation::{AtLeastOnce, CheckpointId};
use crate::store::index::{IndexEntry, StoreIndex};
use crate::store::{RestartPolicy, Store, StoreError};
use parking_lot::Mutex;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// Verdict a cursor-worker handler returns for the batch it was just given.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub enum CursorWorkerAction {
/// Commit the batch: persist the cursor's current position and keep polling.
/// If persisting fails, the worker records the error, rewinds the in-memory
/// cursor to the last committed checkpoint, and stops.
Continue,
/// Make a best-effort persist of the current position (failures are
/// reported), then stop the worker.
Stop,
/// Rewind the in-memory cursor to the last committed checkpoint without
/// persisting anything, then stop the worker.
StopWithRollback,
}
/// Callback invoked at most once, when a handler panic finds the restart
/// budget exhausted and the worker is about to stop.
pub(crate) type RestartBudgetExhaustedCallback = Box<dyn FnOnce() + Send + 'static>;
/// Callback invoked with the checkpoint id and the underlying I/O error each
/// time persisting the durable cursor checkpoint fails.
pub(crate) type CheckpointFailureCallback = Box<dyn FnMut(&str, std::io::Error) + Send + 'static>;
/// Builds a `StoreError::CheckpointWriteFailed` for checkpoint `id`.
///
/// `std::io::Error` is not `Clone`, so the error stored in the result is a
/// fresh `io::Error` rebuilt from the original's kind and rendered message.
/// This lets the caller keep the original error, for example to hand it to a
/// failure callback.
fn checkpoint_write_failed(id: &str, error: &std::io::Error) -> StoreError {
    let detached_source = std::io::Error::new(error.kind(), error.to_string());
    StoreError::CheckpointWriteFailed {
        source: detached_source,
        id: id.to_owned(),
    }
}
/// Renders a caught panic payload as text.
///
/// `panic!("literal")` carries a `&'static str` and formatted panics carry a
/// `String`; both are returned verbatim. Any other payload type yields the
/// placeholder `"unknown panic"`.
fn stringify_panic_payload(panic_info: &(dyn std::any::Any + Send)) -> String {
    panic_info
        .downcast_ref::<&str>()
        .map(|literal| (*literal).to_owned())
        .or_else(|| panic_info.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "unknown panic".to_owned())
}
/// Constructs the cursor a worker iterates with.
///
/// * No checkpoint id: a fresh in-memory cursor over `region`.
/// * Checkpoint id with `load_saved_checkpoint`: a durable cursor that loads
///   the saved checkpoint from `data_dir` (this is the only fallible case).
/// * Checkpoint id without `load_saved_checkpoint`: a cursor bound to the id
///   that does not read the saved position. Restarts use this and then rewind
///   to an in-memory committed position.
fn build_worker_cursor(
    region: &Region,
    index: &Arc<StoreIndex>,
    data_dir: &Path,
    checkpoint_id: Option<&CheckpointId>,
    load_saved_checkpoint: bool,
) -> Result<Cursor, StoreError> {
    let owned_region = region.clone();
    let shared_index = Arc::clone(index);
    let Some(id) = checkpoint_id else {
        return Ok(Cursor::new(owned_region, shared_index));
    };
    if load_saved_checkpoint {
        Cursor::new_with_checkpoint(owned_region, shared_index, data_dir, id)
    } else {
        Ok(Cursor::new_bound_checkpoint(
            owned_region,
            shared_index,
            data_dir,
            id.clone(),
        ))
    }
}
/// Tuning and wiring for `Store::cursor_worker`.
///
/// `Clone` copies every public field but resets the two crate-private
/// callbacks to `None`, because boxed closures cannot be duplicated.
#[non_exhaustive]
pub struct CursorWorkerConfig {
/// Maximum number of index entries requested per `Cursor::poll_batch` call.
pub batch_size: usize,
/// Timeout handed to `Cursor::park_for_data` when a poll returns nothing.
pub idle_sleep: Duration,
/// Budget for rebuilding the cursor after the handler panics.
pub restart: RestartPolicy,
/// When set, the worker uses a durable checkpoint under this id. The
/// checkpoint is loaded at start and persisted after `Continue`/`Stop`. When
/// `None`, the cursor is in-memory only.
pub checkpoint_id: Option<CheckpointId>,
/// Gap-observation settings applied via `Cursor::with_gap_config`.
pub gap_observation: CursorGapConfig,
/// Invoked at most once, when a handler panic finds the restart budget
/// exhausted.
pub(crate) on_restart_budget_exhausted: Option<RestartBudgetExhaustedCallback>,
/// Invoked with the checkpoint id and I/O error when a checkpoint persist
/// fails. When absent, the failure is logged via `tracing::error!` instead.
pub(crate) on_checkpoint_failure: Option<CheckpointFailureCallback>,
}
impl Clone for CursorWorkerConfig {
    /// Clones the public tuning fields.
    ///
    /// The crate-private callbacks are boxed closures (one of them `FnOnce`)
    /// and cannot be duplicated, so a clone always starts with both unset.
    fn clone(&self) -> Self {
        let Self {
            batch_size,
            idle_sleep,
            restart,
            checkpoint_id,
            gap_observation,
            ..
        } = self;
        Self {
            batch_size: *batch_size,
            idle_sleep: *idle_sleep,
            restart: restart.clone(),
            checkpoint_id: checkpoint_id.clone(),
            gap_observation: *gap_observation,
            on_restart_budget_exhausted: None,
            on_checkpoint_failure: None,
        }
    }
}
impl std::fmt::Debug for CursorWorkerConfig {
    /// Renders every field. The callbacks have no `Debug` representation, so
    /// they appear only as presence flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_budget_callback = self.on_restart_budget_exhausted.is_some();
        let has_failure_callback = self.on_checkpoint_failure.is_some();
        let mut rendered = f.debug_struct("CursorWorkerConfig");
        rendered.field("batch_size", &self.batch_size);
        rendered.field("idle_sleep", &self.idle_sleep);
        rendered.field("restart", &self.restart);
        rendered.field("checkpoint_id", &self.checkpoint_id);
        rendered.field("gap_observation", &self.gap_observation);
        rendered.field("on_restart_budget_exhausted", &has_budget_callback);
        rendered.field("on_checkpoint_failure", &has_failure_callback);
        rendered.finish()
    }
}
impl Default for CursorWorkerConfig {
    /// Defaults:
    /// * 64-entry batches;
    /// * a 10 ms idle park;
    /// * a single restart (`RestartPolicy::Once`);
    /// * no durable checkpoint;
    /// * default gap observation;
    /// * no callbacks.
    fn default() -> Self {
        const DEFAULT_BATCH_SIZE: usize = 64;
        const DEFAULT_IDLE_SLEEP_MS: u64 = 10;
        Self {
            batch_size: DEFAULT_BATCH_SIZE,
            idle_sleep: Duration::from_millis(DEFAULT_IDLE_SLEEP_MS),
            restart: RestartPolicy::Once,
            gap_observation: CursorGapConfig::default(),
            checkpoint_id: None,
            on_checkpoint_failure: None,
            on_restart_budget_exhausted: None,
        }
    }
}
/// Handle to a running cursor worker started by `Store::cursor_worker`.
///
/// Dropping the handle raises the stop flag but does not join the worker, so
/// any error the worker records is never observed. Use `stop_and_join` (or
/// `stop` followed by `join`) to wait for the worker and read its result.
#[must_use = "dropping a CursorWorkerHandle leaks the background worker thread; call stop()/join() to wind it down"]
pub struct CursorWorkerHandle {
/// Stop flag shared with the worker loop, which checks it before every poll.
stop: Arc<AtomicBool>,
/// The spawned job; taken (left `None`) once it has been joined.
join: Option<Box<dyn crate::store::platform::spawn::JobHandle>>,
/// First error recorded by the worker; taken when the handle is joined.
error_slot: Arc<Mutex<Option<StoreError>>>,
}
impl CursorWorkerHandle {
    /// Joins the worker job (if not already joined), then returns the first
    /// error the worker recorded, if any.
    ///
    /// A job that fails to join is reported as `StoreError::WriterCrashed`. In
    /// that case the error slot is not consulted.
    fn finish_join(&mut self) -> Result<(), StoreError> {
        let crashed = matches!(self.join.take().map(|job| job.join()), Some(Err(_)));
        if crashed {
            return Err(StoreError::WriterCrashed);
        }
        let recorded = self.error_slot.lock().take();
        match recorded {
            Some(error) => Err(error),
            None => Ok(()),
        }
    }

    /// Asks the worker to stop after its current iteration. Returns immediately.
    pub fn stop(&self) {
        self.stop.store(true, Ordering::Release);
    }

    /// Waits for the worker to exit and returns its first recorded error.
    ///
    /// This does not raise the stop flag itself. The worker must stop on its
    /// own, or be stopped via `stop` from another handle reference.
    pub fn join(mut self) -> Result<(), StoreError> {
        self.finish_join()
    }

    /// Raises the stop flag, then waits for the worker to exit and returns its
    /// first recorded error.
    pub fn stop_and_join(mut self) -> Result<(), StoreError> {
        self.stop();
        self.finish_join()
    }
}
impl CanalHandle for CursorWorkerHandle {
    /// Delegates to the inherent `CursorWorkerHandle::stop`. Inherent items win
    /// path resolution, so this does not recurse.
    fn stop(&self) {
        CursorWorkerHandle::stop(self);
    }

    /// Unboxes the handle and delegates to the inherent `join`.
    fn join(self: Box<Self>) -> Result<(), StoreError> {
        CursorWorkerHandle::join(*self)
    }

    /// Unboxes the handle and delegates to the inherent `stop_and_join`.
    fn stop_and_join(self: Box<Self>) -> Result<(), StoreError> {
        CursorWorkerHandle::stop_and_join(*self)
    }
}
impl Drop for CursorWorkerHandle {
fn drop(&mut self) {
self.stop.store(true, Ordering::Release);
}
}
impl Store<crate::store::Open> {
    /// Spawns a background job (named `"batpak-cursor-worker"`) that drives a
    /// cursor over `region`.
    ///
    /// The job polls in batches of up to `config.batch_size` entries. Each
    /// non-empty batch is passed to `handler` along with the store. When
    /// `config.checkpoint_id` is set, the handler also receives an
    /// `AtLeastOnce` value built from that id.
    ///
    /// The returned [`CursorWorkerAction`] decides whether the batch position
    /// is committed and whether the worker keeps running. Handler panics are
    /// caught and retried according to `config.restart`.
    ///
    /// # Errors
    ///
    /// * `StoreError::CheckpointId` if `config.checkpoint_id` cannot produce an
    ///   `AtLeastOnce` value.
    /// * The spawner's error, converted into `StoreError`, if the job cannot
    ///   be started.
    ///
    /// Failures inside the running worker are reported later through
    /// `CursorWorkerHandle::join` / `stop_and_join`.
    pub fn cursor_worker<F>(
        self: &Arc<Self>,
        region: &Region,
        config: CursorWorkerConfig,
        handler: F,
    ) -> Result<CursorWorkerHandle, StoreError>
    where
        F: FnMut(
                &[IndexEntry],
                &Store<crate::store::Open>,
                Option<&AtLeastOnce>,
            ) -> CursorWorkerAction
            + Send
            + 'static,
    {
        let CursorWorkerConfig {
            batch_size,
            idle_sleep,
            restart,
            checkpoint_id,
            gap_observation,
            on_restart_budget_exhausted,
            on_checkpoint_failure,
        } = config;

        // Validate the checkpoint id before any worker state or thread exists.
        let at_least_once = match checkpoint_id.as_ref() {
            Some(id) => Some(
                AtLeastOnce::from_cursor_callback(id.as_str())
                    .map_err(StoreError::CheckpointId)?,
            ),
            None => None,
        };

        let stop = Arc::new(AtomicBool::new(false));
        let error_slot = Arc::new(Mutex::new(None));
        let worker = CursorWorkerLoop {
            store: Arc::clone(self),
            region: region.clone(),
            stop: Arc::clone(&stop),
            error_slot: Arc::clone(&error_slot),
            batch_size,
            idle_sleep,
            restart,
            checkpoint_id,
            gap_observation,
            on_restart_budget_exhausted,
            on_checkpoint_failure,
            at_least_once,
        };

        let job = self.config.spawner().spawn(
            "batpak-cursor-worker".to_string(),
            None,
            Box::new(move || worker.run(handler)),
        )?;

        Ok(CursorWorkerHandle {
            stop,
            join: Some(job),
            error_slot,
        })
    }
}
/// Everything the worker job owns while it runs; consumed by `run`.
struct CursorWorkerLoop {
/// Store whose index the cursor reads; also passed to the handler.
store: Arc<Store>,
/// Region the cursor iterates.
region: Region,
/// Stop flag shared with `CursorWorkerHandle`; checked before each poll.
stop: Arc<AtomicBool>,
/// First error raised by the worker; read by `CursorWorkerHandle` on join.
error_slot: Arc<Mutex<Option<StoreError>>>,
// The following fields mirror `CursorWorkerConfig`.
batch_size: usize,
idle_sleep: Duration,
restart: RestartPolicy,
checkpoint_id: Option<CheckpointId>,
gap_observation: CursorGapConfig,
on_restart_budget_exhausted: Option<RestartBudgetExhaustedCallback>,
on_checkpoint_failure: Option<CheckpointFailureCallback>,
/// Built from `checkpoint_id` in `cursor_worker`; handed to the handler.
at_least_once: Option<AtLeastOnce>,
}
/// Loop-control decision produced by `handle_action` / `handle_panic`.
enum Step {
/// Keep polling.
Continue,
/// Raise the stop flag and leave the poll loop.
Stop,
}
/// Mutable per-run state of the worker loop.
struct CursorRunState {
/// The live cursor; replaced by a freshly built one after a handler panic.
cursor: Cursor,
/// Last committed `Cursor::checkpoint()` value, `(position, started)`.
/// Rollbacks and restarts rewind to it.
committed: (u64, bool),
/// Restarts consumed from the restart budget (per window for `Bounded`).
restarts: u32,
/// Monotonic timestamp (ns) at which the current restart window began.
window_start_ns: i64,
}
impl CursorWorkerLoop {
fn record_first_error(&self, error: StoreError) {
let mut guard = self.error_slot.lock();
if guard.is_none() {
*guard = Some(error);
}
}
fn signal_stop(&self) {
self.stop.store(true, Ordering::Release);
}
fn run<F>(mut self, mut handler: F)
where
F: FnMut(
&[IndexEntry],
&Store<crate::store::Open>,
Option<&AtLeastOnce>,
) -> CursorWorkerAction
+ Send
+ 'static,
{
let mut cursor = match build_worker_cursor(
&self.region,
&self.store.index,
&self.store.config.data_dir,
self.checkpoint_id.as_ref(),
true,
) {
Ok(cursor) => cursor,
Err(error) => {
self.record_first_error(error);
self.signal_stop();
return;
}
};
cursor = cursor.with_gap_config(self.gap_observation);
let committed = cursor.checkpoint();
let mut state = CursorRunState {
cursor,
committed,
restarts: 0,
window_start_ns: self.store.runtime.now_mono_ns(),
};
while !self.stop.load(Ordering::Acquire) {
let epoch = state.cursor.visibility_epoch();
let batch = state.cursor.poll_batch(self.batch_size);
if batch.is_empty() {
state.cursor.park_for_data(epoch, self.idle_sleep);
continue;
}
let at_least_once = self.at_least_once.as_ref();
let store = &self.store;
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
handler(&batch, store, at_least_once)
}));
let step = match result {
Ok(action) => self.handle_action(action, &mut state),
Err(panic_info) => self.handle_panic(panic_info.as_ref(), &mut state),
};
if matches!(step, Step::Stop) {
self.signal_stop();
}
}
}
fn handle_action(&mut self, action: CursorWorkerAction, state: &mut CursorRunState) -> Step {
match action {
CursorWorkerAction::Continue => {
let next_checkpoint = state.cursor.checkpoint();
if let Err(error) = state
.cursor
.persist_current(self.store.config.fs().as_ref())
{
self.report_persist_failure(
error,
"no \
failure callback wired — stopping worker to avoid \
silent durable-resume regression",
);
state
.cursor
.restore_checkpoint(state.committed.0, state.committed.1);
return Step::Stop;
}
state.committed = next_checkpoint;
Step::Continue
}
CursorWorkerAction::Stop => {
let final_checkpoint = state.cursor.checkpoint();
if let Err(error) = state
.cursor
.persist_current(self.store.config.fs().as_ref())
{
self.report_persist_failure(error, "no failure callback wired (clean stop)");
} else {
state.committed = final_checkpoint;
}
Step::Stop
}
CursorWorkerAction::StopWithRollback => {
state
.cursor
.restore_checkpoint(state.committed.0, state.committed.1);
Step::Stop
}
}
}
fn report_persist_failure(&mut self, error: std::io::Error, no_callback_note: &str) {
let Some(id) = self.checkpoint_id.as_ref() else {
debug_assert!(
false,
"in-memory cursor checkpoint persist failure is unreachable"
);
return;
};
{
let mut guard = self.error_slot.lock();
if guard.is_none() {
*guard = Some(checkpoint_write_failed(id.as_str(), &error));
}
}
if let Some(cb) = self.on_checkpoint_failure.as_mut() {
cb(id.as_str(), error);
} else {
tracing::error!(
cursor_id = %id.as_str(),
"durable cursor checkpoint persist failed; {no_callback_note}"
);
}
}
fn handle_panic(
&mut self,
panic_info: &(dyn std::any::Any + Send),
state: &mut CursorRunState,
) -> Step {
let panic_msg = stringify_panic_payload(panic_info);
if !self.restart_budget_ok(state) {
tracing::error!(
"cursor worker restart budget exhausted; stopping worker. \
Last panic: {panic_msg}"
);
if let Some(cb) = self.on_restart_budget_exhausted.take() {
cb();
}
return Step::Stop;
}
tracing::warn!(
"cursor worker panicked; restarting from last checkpoint. \
Panic: {panic_msg}"
);
let cursor = match build_worker_cursor(
&self.region,
&self.store.index,
&self.store.config.data_dir,
self.checkpoint_id.as_ref(),
false,
) {
Ok(cursor) => cursor,
Err(error) => {
self.record_first_error(error);
return Step::Stop;
}
};
state.cursor = cursor.with_gap_config(self.gap_observation);
state
.cursor
.restore_checkpoint(state.committed.0, state.committed.1);
Step::Continue
}
fn restart_budget_ok(&self, state: &mut CursorRunState) -> bool {
match &self.restart {
RestartPolicy::Once => {
if state.restarts >= 1 {
false
} else {
state.restarts += 1;
true
}
}
RestartPolicy::Bounded {
max_restarts,
within_ms,
} => {
let elapsed_ms = self
.store
.runtime
.now_mono_ns()
.saturating_sub(state.window_start_ns)
.max(0)
/ 1_000_000;
if elapsed_ms > i64::try_from(*within_ms).unwrap_or(i64::MAX) {
state.restarts = 0;
state.window_start_ns = self.store.runtime.now_mono_ns();
}
if state.restarts >= *max_restarts {
false
} else {
state.restarts += 1;
true
}
}
}
}
}
// Additional test-only module, loaded from `worker_mutation_kill.rs` and
// compiled only under `cfg(test)`.
// NOTE(review): the name suggests mutation-testing "kill" cases; confirm.
#[cfg(test)]
#[path = "worker_mutation_kill.rs"]
mod worker_mutation_kill;
// Unit tests for the private helpers in this module: config Debug rendering,
// cursor construction, and panic-payload rendering.
#[cfg(test)]
mod tests {
    use super::{build_worker_cursor, stringify_panic_payload, CursorWorkerConfig};
    use crate::coordinate::Region;
    use crate::store::delivery::cursor::{Cursor, CursorCheckpoint};
    use crate::store::delivery::observation::CheckpointId;
    use crate::store::index::StoreIndex;
    use std::any::Any;
    use std::sync::Arc;

    #[test]
    fn cursor_worker_config_debug_renders_struct_name_and_fields() {
        let rendered = format!("{:?}", CursorWorkerConfig::default());
        assert!(
            rendered.contains("CursorWorkerConfig"),
            "PROPERTY: Debug must name the struct; the empty-string mutant erases it, got {rendered:?}"
        );
        assert!(
            rendered.contains("batch_size"),
            "PROPERTY: Debug must render the configured fields, got {rendered:?}"
        );
    }

    #[test]
    fn build_worker_cursor_honors_the_load_saved_checkpoint_flag() {
        let dir = tempfile::TempDir::new().expect("temp dir");
        let region = Region::entity("entity:cursor-load-flag");
        let index = Arc::new(StoreIndex::new());
        let id = CheckpointId::new("batpak-build-worker-cursor-flag").expect("valid id");
        let persisted = CursorCheckpoint {
            position: 7,
            started: true,
            process_boot_ns: None,
            region_identity: Some(region.checkpoint_identity()),
        };
        Cursor::save_checkpoint(dir.path(), &id, &persisted).expect("save checkpoint");
        // `&region` here (and below) was previously mangled into `®ion`,
        // an HTML-entity artifact that does not compile.
        let loaded = build_worker_cursor(&region, &index, dir.path(), Some(&id), true)
            .expect("build loading cursor");
        assert_eq!(
            loaded.checkpoint(),
            (7, true),
            "PROPERTY: Some(id) + load=true must load the persisted position"
        );
        let bound = build_worker_cursor(&region, &index, dir.path(), Some(&id), false)
            .expect("build bound cursor");
        assert_eq!(
            bound.checkpoint(),
            (0, false),
            "PROPERTY: Some(id) + load=false must BIND without loading — the match \
            guard `if load_saved_checkpoint` is load-bearing, not `if true`"
        );
    }

    #[test]
    fn build_worker_cursor_with_no_id_is_in_memory_and_fresh() {
        let region = Region::entity("entity:cursor-no-id");
        let index = Arc::new(StoreIndex::new());
        let cursor = build_worker_cursor(
            &region,
            &index,
            std::path::Path::new("/nonexistent"),
            None,
            true,
        )
        .expect("build in-memory cursor");
        assert_eq!(
            cursor.checkpoint(),
            (0, false),
            "PROPERTY: a None checkpoint id yields a fresh in-memory cursor regardless of the flag"
        );
    }

    #[test]
    fn stringify_panic_payload_preserves_static_str_payload() {
        let payload: Box<dyn Any + Send> = Box::new("cursor panic detail");
        assert_eq!(
            stringify_panic_payload(payload.as_ref()),
            "cursor panic detail",
            "PROPERTY: cursor worker panic evidence should include literal panic payloads"
        );
    }

    #[test]
    fn stringify_panic_payload_preserves_string_payload() {
        let payload: Box<dyn Any + Send> = Box::new(String::from("owned cursor panic detail"));
        assert_eq!(
            stringify_panic_payload(payload.as_ref()),
            "owned cursor panic detail",
            "PROPERTY: cursor worker panic evidence should include owned String panic payloads"
        );
    }

    #[test]
    fn stringify_panic_payload_falls_back_for_non_string_payload() {
        let payload: Box<dyn Any + Send> = Box::new(17_u32);
        assert_eq!(
            stringify_panic_payload(payload.as_ref()),
            "unknown panic",
            "PROPERTY: cursor worker panic evidence should remain well-formed for opaque payloads"
        );
    }
}