use crate::coordinate::Region;
use crate::store::delivery::observation::{AtLeastOnce, CheckpointId};
use crate::store::index::{IndexEntry, StoreIndex};
use crate::store::{RestartPolicy, Store, StoreError};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tempfile::NamedTempFile;
/// On-disk resume point of a durable cursor.
///
/// Encoded as MessagePack with named fields by `Cursor::save_checkpoint` and
/// decoded by `Cursor::load_checkpoint`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CursorCheckpoint {
/// Global sequence of the last hit the cursor advanced past (only
/// meaningful once `started` is true).
pub position: u64,
/// Whether the cursor had consumed at least one hit when saved.
pub started: bool,
/// Always `None` when written through `Cursor::persist_current`; not read by
/// the code shown here. NOTE(review): the name suggests a process boot
/// timestamp in nanoseconds — confirm against other users.
pub process_boot_ns: Option<u64>,
/// Region identity the checkpoint was written for. `#[serde(default)]` lets
/// checkpoints without this field decode as `None`, which
/// `Cursor::new_with_checkpoint` rejects as a region mismatch.
#[serde(default)]
pub region_identity: Option<String>,
}
impl CursorCheckpoint {
    /// Builds a checkpoint for `position`/`started`, tagged with the cursor's
    /// region identity. `process_boot_ns` is always left unset here.
    fn from_checkpoint(position: u64, started: bool, region_identity: String) -> Self {
        let region_identity = Some(region_identity);
        Self {
            region_identity,
            process_boot_ns: None,
            started,
            position,
        }
    }
}
/// Directory that holds every cursor checkpoint file: `<data_dir>/cursors`.
fn cursor_checkpoint_dir(data_dir: &Path) -> PathBuf {
    let mut dir = data_dir.to_path_buf();
    dir.push("cursors");
    dir
}

/// Checkpoint file for cursor `id`: `<data_dir>/cursors/<id>.ckpt`.
fn cursor_checkpoint_path(data_dir: &Path, id: &str) -> PathBuf {
    let file_name = format!("{id}.ckpt");
    cursor_checkpoint_dir(data_dir).join(file_name)
}
/// Pull-based reader over the entries of one `Region` in a `StoreIndex`.
///
/// `started` becomes true once the first hit has been consumed; from then on
/// `position` is the global sequence of the last hit the cursor advanced past.
pub struct Cursor {
    region: Region,
    position: u64,
    started: bool,
    index: Arc<StoreIndex>,
    /// Present only when gap observation is enabled with a non-zero capacity.
    gap_buffer: Option<GapBuffer>,
    /// Present when the cursor persists its position to a checkpoint file.
    durable: Option<CursorDurableBinding>,
}
/// Where a durable cursor persists its checkpoint: the store data directory
/// plus the checkpoint id (file `<data_dir>/cursors/<id>.ckpt`).
#[derive(Clone, Debug)]
struct CursorDurableBinding {
/// Store data directory containing the `cursors/` subdirectory.
data_dir: PathBuf,
/// Checkpoint id, used as the checkpoint file stem.
id: CheckpointId,
}
/// Opt-in settings for recording sequence gaps that overlap cancelled ranges.
///
/// The default is disabled. Observation is active only when `enabled` is true
/// and `buffer_capacity` is non-zero.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct CursorGapConfig {
/// Turns gap observation on.
pub enabled: bool,
/// Maximum number of buffered observations; the oldest is evicted when full.
pub buffer_capacity: usize,
}
/// A skip in delivered sequence numbers that overlaps at least one cancelled
/// visibility range reported by the index.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct GapObservation {
    /// Sequence the cursor expected to deliver next.
    pub expected_sequence: u64,
    /// Sequence that was actually delivered.
    pub delivered_sequence: u64,
    /// Cancelled `(start, end)` ranges, clipped to the skipped window.
    pub cancelled_ranges: Vec<(u64, u64)>,
}

/// Bounded FIFO of gap observations; when full, the oldest entry is evicted.
#[derive(Clone, Debug)]
struct GapBuffer {
    /// Maximum number of retained observations (never zero; see `new`).
    capacity: usize,
    observations: VecDeque<GapObservation>,
}

impl GapBuffer {
    /// Returns a buffer retaining up to `capacity` observations, or `None` for
    /// a zero capacity (which would defeat the eviction check in `push`).
    fn new(capacity: usize) -> Option<Self> {
        if capacity == 0 {
            return None;
        }
        Some(Self {
            capacity,
            observations: VecDeque::with_capacity(capacity),
        })
    }

    /// Appends `observation`, evicting the oldest entry first when full.
    fn push(&mut self, observation: GapObservation) {
        let is_full = self.observations.len() == self.capacity;
        if is_full {
            self.observations.pop_front();
        }
        self.observations.push_back(observation);
    }

    /// Removes and returns every buffered observation, oldest first.
    fn take_all(&mut self) -> Vec<GapObservation> {
        let drained = self.observations.drain(..);
        drained.collect()
    }
}
impl Cursor {
pub(crate) fn new(region: Region, index: Arc<StoreIndex>) -> Self {
Self {
region,
position: 0,
started: false,
index,
gap_buffer: None,
durable: None,
}
}
fn new_bound_checkpoint(
region: Region,
index: Arc<StoreIndex>,
data_dir: &Path,
id: &str,
) -> Self {
Self {
region,
position: 0,
started: false,
index,
gap_buffer: None,
durable: Some(CursorDurableBinding {
data_dir: data_dir.to_path_buf(),
id: CheckpointId::new(id),
}),
}
}
pub(crate) fn new_with_checkpoint(
region: Region,
index: Arc<StoreIndex>,
data_dir: &Path,
id: &str,
) -> Result<Self, StoreError> {
let mut cursor = Self::new_bound_checkpoint(region, index, data_dir, id);
match Self::load_checkpoint(data_dir, id) {
Ok(Some(ckpt)) => {
let expected_region = cursor.region.checkpoint_identity();
if ckpt.region_identity.as_deref() != Some(expected_region.as_str()) {
return Err(StoreError::CursorCheckpointRegionMismatch {
path: cursor_checkpoint_path(data_dir, id),
stored: ckpt.region_identity,
expected: expected_region,
});
}
cursor.position = ckpt.position;
cursor.started = ckpt.started;
}
Ok(None) => {}
Err(error) => {
if error.kind() == std::io::ErrorKind::InvalidData {
return Err(StoreError::CursorCheckpointCorrupt {
path: cursor_checkpoint_path(data_dir, id),
reason: error.to_string(),
});
}
return Err(StoreError::Io(error));
}
}
Ok(cursor)
}
pub fn poll(&mut self) -> Option<IndexEntry> {
let hits = self
.index
.query_hits_after(&self.region, self.position, self.started, 1);
if let Some(hit) = hits.into_iter().next() {
let expected_sequence = if self.started {
self.position.saturating_add(1)
} else {
0
};
self.record_gap(expected_sequence, hit.global_sequence);
self.position = hit.global_sequence;
self.started = true;
self.index.upgrade_hit(hit)
} else {
None
}
}
pub fn poll_batch(&mut self, max: usize) -> Vec<IndexEntry> {
let hits = self
.index
.query_hits_after(&self.region, self.position, self.started, max);
if hits.is_empty() {
return Vec::new();
}
self.record_gaps_for_hits(&hits);
self.started = true;
self.position = hits[hits.len() - 1].global_sequence;
hits.into_iter()
.filter_map(|hit| self.index.upgrade_hit(hit))
.collect()
}
#[must_use]
pub fn with_gap_config(mut self, config: CursorGapConfig) -> Self {
self.gap_buffer = if config.enabled {
GapBuffer::new(config.buffer_capacity)
} else {
None
};
self
}
pub fn take_gaps(&mut self) -> Vec<GapObservation> {
match self.gap_buffer.as_mut() {
Some(buffer) => buffer.take_all(),
None => Vec::new(),
}
}
pub(crate) fn checkpoint(&self) -> (u64, bool) {
(self.position, self.started)
}
pub(crate) fn restore_checkpoint(&mut self, position: u64, started: bool) {
self.position = position;
self.started = started;
}
pub(crate) fn persist_current(&self) -> std::io::Result<()> {
let Some(binding) = &self.durable else {
return Ok(());
};
let ckpt = CursorCheckpoint::from_checkpoint(
self.position,
self.started,
self.region.checkpoint_identity(),
);
Self::save_checkpoint(&binding.data_dir, binding.id.as_str(), &ckpt)
}
pub fn load_checkpoint(data_dir: &Path, id: &str) -> std::io::Result<Option<CursorCheckpoint>> {
let path = cursor_checkpoint_path(data_dir, id);
let bytes = match std::fs::read(&path) {
Ok(b) => b,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(error) => return Err(error),
};
match rmp_serde::from_slice::<CursorCheckpoint>(&bytes) {
Ok(ckpt) => Ok(Some(ckpt)),
Err(error) => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("cursor checkpoint decode failed: {error}"),
)),
}
}
pub fn save_checkpoint(
data_dir: &Path,
id: &str,
ckpt: &CursorCheckpoint,
) -> std::io::Result<()> {
let dir = cursor_checkpoint_dir(data_dir);
std::fs::create_dir_all(&dir)?;
let bytes =
rmp_serde::to_vec_named(ckpt).map_err(|e| std::io::Error::other(e.to_string()))?;
let final_path = cursor_checkpoint_path(data_dir, id);
let mut tmp = NamedTempFile::new_in(&dir)?;
{
use std::io::Write;
tmp.write_all(&bytes)?;
tmp.flush()?;
}
crate::store::platform::sync::sync_file_all_io(tmp.as_file())?;
let admission = crate::store::platform::sync::admit_current_parent_dir_sync()
.map_err(|error| std::io::Error::other(error.to_string()))?;
crate::store::platform::sync::persist_temp_with_parent_sync(tmp, &final_path, admission)?;
Ok(())
}
fn record_gaps_for_hits(&mut self, hits: &[crate::store::index::QueryHit]) {
let mut expected_sequence = if self.started {
self.position.saturating_add(1)
} else {
0
};
for hit in hits {
self.record_gap(expected_sequence, hit.global_sequence);
expected_sequence = hit.global_sequence.saturating_add(1);
}
}
fn record_gap(&mut self, expected_sequence: u64, delivered_sequence: u64) {
let Some(buffer) = self.gap_buffer.as_mut() else {
return;
};
if delivered_sequence <= expected_sequence {
return;
}
let cancelled_ranges = self
.index
.cancelled_visibility_ranges()
.into_iter()
.filter_map(|(start, end)| {
let overlap_start = start.max(expected_sequence);
let overlap_end = end.min(delivered_sequence);
(overlap_start < overlap_end).then_some((overlap_start, overlap_end))
})
.collect::<Vec<_>>();
if cancelled_ranges.is_empty() {
return;
}
buffer.push(GapObservation {
expected_sequence,
delivered_sequence,
cancelled_ranges,
});
}
}
/// What a cursor worker does after its handler returns for a batch.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CursorWorkerAction {
/// Persist the cursor checkpoint (when durable) and keep polling.
Continue,
/// Persist the cursor checkpoint (when durable), then stop the worker.
Stop,
/// Rewind to the last committed checkpoint without persisting, then stop.
StopWithRollback,
}
/// Called at most once, when a cursor worker's restart budget is exhausted.
pub(crate) type RestartBudgetExhaustedCallback = Box<dyn FnOnce() + Send + 'static>;
/// Called with the checkpoint id and the I/O error when persisting a durable
/// cursor checkpoint fails.
pub(crate) type CheckpointFailureCallback = Box<dyn FnMut(&str, std::io::Error) + Send + 'static>;
/// Wraps a checkpoint persist failure as `StoreError::CheckpointWriteFailed`.
///
/// `std::io::Error` is not `Clone`, so the wrapped source is rebuilt from the
/// original's kind and message, leaving the original free to be handed on.
fn checkpoint_write_failed(id: &str, error: &std::io::Error) -> StoreError {
    let kind = error.kind();
    let message = error.to_string();
    let source = std::io::Error::new(kind, message);
    StoreError::CheckpointWriteFailed {
        id: id.to_owned(),
        source,
    }
}
/// Constructs the cursor a worker thread polls with.
///
/// Without a checkpoint id the cursor is purely in-memory. With one, the
/// cursor is bound to that checkpoint. Saved state is read from disk only when
/// `load_saved_checkpoint` is true; otherwise the cursor starts fresh.
fn build_worker_cursor(
    region: &Region,
    index: &Arc<StoreIndex>,
    data_dir: &Path,
    checkpoint_id: Option<&CheckpointId>,
    load_saved_checkpoint: bool,
) -> Result<Cursor, StoreError> {
    let Some(id) = checkpoint_id else {
        return Ok(Cursor::new(region.clone(), Arc::clone(index)));
    };
    let region = region.clone();
    let index = Arc::clone(index);
    if load_saved_checkpoint {
        Cursor::new_with_checkpoint(region, index, data_dir, id.as_str())
    } else {
        Ok(Cursor::new_bound_checkpoint(region, index, data_dir, id.as_str()))
    }
}
/// Configuration for `Store::cursor_worker`.
///
/// `Clone` copies only the plain settings; both callbacks are `None` on the
/// clone.
#[non_exhaustive]
pub struct CursorWorkerConfig {
/// Maximum number of index hits requested per poll.
pub batch_size: usize,
/// How long the worker sleeps after a poll that returned nothing.
pub idle_sleep: Duration,
/// How many handler panics the worker survives before stopping.
pub restart: RestartPolicy,
/// Durable checkpoint to resume from and persist to. `None` keeps the cursor
/// in memory only and passes `None` as the handler's `AtLeastOnce` argument.
pub checkpoint_id: Option<CheckpointId>,
/// Gap-observation settings applied to the worker's cursor.
pub gap_observation: CursorGapConfig,
/// Invoked once when the restart budget is exhausted.
pub(crate) on_restart_budget_exhausted: Option<RestartBudgetExhaustedCallback>,
/// Invoked with the checkpoint id and I/O error when persisting fails.
pub(crate) on_checkpoint_failure: Option<CheckpointFailureCallback>,
}
/// Clones the plain settings only.
///
/// The callback fields hold boxed `FnOnce`/`FnMut` closures, which cannot be
/// cloned, so the copy always has both callbacks unset.
impl Clone for CursorWorkerConfig {
    fn clone(&self) -> Self {
        let Self {
            batch_size,
            idle_sleep,
            restart,
            checkpoint_id,
            gap_observation,
            ..
        } = self;
        Self {
            batch_size: *batch_size,
            idle_sleep: *idle_sleep,
            restart: restart.clone(),
            checkpoint_id: checkpoint_id.clone(),
            gap_observation: *gap_observation,
            on_restart_budget_exhausted: None,
            on_checkpoint_failure: None,
        }
    }
}
impl std::fmt::Debug for CursorWorkerConfig {
    /// Formats every setting. Callbacks are shown as presence flags because
    /// boxed closures have no `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_budget_callback = self.on_restart_budget_exhausted.is_some();
        let has_failure_callback = self.on_checkpoint_failure.is_some();
        let mut out = f.debug_struct("CursorWorkerConfig");
        out.field("batch_size", &self.batch_size);
        out.field("idle_sleep", &self.idle_sleep);
        out.field("restart", &self.restart);
        out.field("checkpoint_id", &self.checkpoint_id);
        out.field("gap_observation", &self.gap_observation);
        out.field("on_restart_budget_exhausted", &has_budget_callback);
        out.field("on_checkpoint_failure", &has_failure_callback);
        out.finish()
    }
}
impl Default for CursorWorkerConfig {
    /// The defaults are:
    /// - batches of 64 hits and a 10 ms idle sleep;
    /// - a single permitted restart (`RestartPolicy::Once`);
    /// - no durable checkpoint, gap observation off, and no callbacks.
    fn default() -> Self {
        const DEFAULT_BATCH_SIZE: usize = 64;
        const DEFAULT_IDLE_SLEEP: Duration = Duration::from_millis(10);
        Self {
            batch_size: DEFAULT_BATCH_SIZE,
            idle_sleep: DEFAULT_IDLE_SLEEP,
            restart: RestartPolicy::Once,
            checkpoint_id: None,
            gap_observation: CursorGapConfig::default(),
            on_checkpoint_failure: None,
            on_restart_budget_exhausted: None,
        }
    }
}
/// Handle to a cursor worker thread spawned by `Store::cursor_worker`.
///
/// Dropping the handle requests a stop but does not wait for the thread.
pub struct CursorWorkerHandle {
/// Cooperative stop flag shared with the worker loop.
stop: Arc<AtomicBool>,
/// Worker thread; taken on the first join.
join: Option<std::thread::JoinHandle<()>>,
/// First error recorded by the worker, returned by `join`/`stop_and_join`.
error_slot: Arc<Mutex<Option<StoreError>>>,
}
impl CursorWorkerHandle {
    /// Waits for the worker thread (at most once), then reports the first
    /// error it recorded, if any.
    ///
    /// A worker thread that ended by panicking, or a poisoned error slot, is
    /// reported as `StoreError::WriterCrashed`.
    fn finish_join(&mut self) -> Result<(), StoreError> {
        if let Some(worker) = self.join.take() {
            if worker.join().is_err() {
                return Err(StoreError::WriterCrashed);
            }
        }
        let Ok(mut slot) = self.error_slot.lock() else {
            return Err(StoreError::WriterCrashed);
        };
        match slot.take() {
            Some(error) => Err(error),
            None => Ok(()),
        }
    }

    /// Asks the worker to stop after its current iteration; does not wait.
    pub fn stop(&self) {
        self.stop.store(true, Ordering::Release);
    }

    /// Waits for the worker to finish on its own and returns its first error.
    ///
    /// # Errors
    /// The first error the worker recorded, or `StoreError::WriterCrashed` if
    /// the thread panicked.
    pub fn join(mut self) -> Result<(), StoreError> {
        self.finish_join()
    }

    /// Requests a stop, waits for the worker, and returns its first error.
    ///
    /// # Errors
    /// Same as [`CursorWorkerHandle::join`].
    pub fn stop_and_join(mut self) -> Result<(), StoreError> {
        self.stop();
        self.finish_join()
    }
}
impl Drop for CursorWorkerHandle {
fn drop(&mut self) {
self.stop.store(true, Ordering::Release);
}
}
impl Store<crate::store::Open> {
pub fn cursor_worker<F>(
self: &Arc<Self>,
region: &Region,
config: CursorWorkerConfig,
mut handler: F,
) -> Result<CursorWorkerHandle, StoreError>
where
F: FnMut(
&[IndexEntry],
&Store<crate::store::Open>,
Option<&AtLeastOnce>,
) -> CursorWorkerAction
+ Send
+ 'static,
{
let store = Arc::clone(self);
let region = region.clone();
let stop = Arc::new(AtomicBool::new(false));
let stop_thread = Arc::clone(&stop);
let error_slot = Arc::new(Mutex::new(None));
let error_slot_thread = Arc::clone(&error_slot);
let CursorWorkerConfig {
batch_size,
idle_sleep,
restart,
checkpoint_id,
gap_observation,
on_restart_budget_exhausted,
on_checkpoint_failure,
} = config;
let at_least_once = checkpoint_id
.as_ref()
.map(|id| AtLeastOnce::from_cursor_callback(id.as_str()));
let join = std::thread::Builder::new()
.name("batpak-cursor-worker".into())
.spawn(move || {
let mut cursor = match build_worker_cursor(
®ion,
&store.index,
&store.config.data_dir,
checkpoint_id.as_ref(),
true,
) {
Ok(cursor) => cursor,
Err(error) => {
if let Ok(mut guard) = error_slot_thread.lock() {
if guard.is_none() {
*guard = Some(error);
}
}
stop_thread.store(true, Ordering::Release);
return;
}
};
cursor = cursor.with_gap_config(gap_observation);
let mut committed = cursor.checkpoint();
let mut restarts = 0u32;
let mut window_start = Instant::now();
let mut budget_callback = on_restart_budget_exhausted;
let checkpoint_error_slot = Arc::clone(&error_slot_thread);
let mut checkpoint_failure_callback = on_checkpoint_failure;
while !stop_thread.load(Ordering::Acquire) {
let batch = cursor.poll_batch(batch_size);
if batch.is_empty() {
std::thread::sleep(idle_sleep);
continue;
}
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
handler(&batch, &store, at_least_once.as_ref())
}));
match result {
Ok(CursorWorkerAction::Continue) => {
let next_checkpoint = cursor.checkpoint();
if let Err(error) = cursor.persist_current() {
let Some(id) = checkpoint_id.as_ref() else {
debug_assert!(
false,
"in-memory cursor checkpoint persist failure is unreachable"
);
stop_thread.store(true, Ordering::Release);
continue;
};
if let Ok(mut guard) = checkpoint_error_slot.lock() {
if guard.is_none() {
*guard = Some(checkpoint_write_failed(id.as_str(), &error));
}
}
if let Some(cb) = checkpoint_failure_callback.as_mut() {
cb(id.as_str(), error);
} else {
tracing::error!(
cursor_id = %id.as_str(),
"durable cursor checkpoint persist failed; no \
failure callback wired — stopping worker to \
avoid silent durable-resume regression"
);
}
cursor.restore_checkpoint(committed.0, committed.1);
stop_thread.store(true, Ordering::Release);
continue;
}
committed = next_checkpoint;
}
Ok(CursorWorkerAction::Stop) => {
let final_checkpoint = cursor.checkpoint();
if let Err(error) = cursor.persist_current() {
let Some(id) = checkpoint_id.as_ref() else {
debug_assert!(
false,
"in-memory cursor checkpoint persist failure is unreachable"
);
stop_thread.store(true, Ordering::Release);
continue;
};
if let Ok(mut guard) = checkpoint_error_slot.lock() {
if guard.is_none() {
*guard = Some(checkpoint_write_failed(id.as_str(), &error));
}
}
if let Some(cb) = checkpoint_failure_callback.as_mut() {
cb(id.as_str(), error);
} else {
tracing::error!(
cursor_id = %id.as_str(),
"durable cursor checkpoint persist failed on \
clean stop; no failure callback wired"
);
}
} else {
committed = final_checkpoint;
}
stop_thread.store(true, Ordering::Release);
}
Ok(CursorWorkerAction::StopWithRollback) => {
cursor.restore_checkpoint(committed.0, committed.1);
stop_thread.store(true, Ordering::Release);
}
Err(_) => {
let budget_ok = match &restart {
RestartPolicy::Once => {
if restarts >= 1 {
false
} else {
restarts += 1;
true
}
}
RestartPolicy::Bounded {
max_restarts,
within_ms,
} => {
if window_start.elapsed() > Duration::from_millis(*within_ms) {
restarts = 0;
window_start = Instant::now();
}
if restarts >= *max_restarts {
false
} else {
restarts += 1;
true
}
}
};
if !budget_ok {
tracing::error!(
"cursor worker restart budget exhausted; stopping worker"
);
if let Some(cb) = budget_callback.take() {
cb();
}
stop_thread.store(true, Ordering::Release);
continue;
}
tracing::warn!(
"cursor worker panicked; restarting from last checkpoint"
);
cursor = match build_worker_cursor(
®ion,
&store.index,
&store.config.data_dir,
checkpoint_id.as_ref(),
false,
) {
Ok(cursor) => cursor,
Err(error) => {
if let Ok(mut guard) = error_slot_thread.lock() {
if guard.is_none() {
*guard = Some(error);
}
}
stop_thread.store(true, Ordering::Release);
continue;
}
};
cursor = cursor.with_gap_config(gap_observation);
cursor.restore_checkpoint(committed.0, committed.1);
}
}
}
})
.map_err(StoreError::Io)?;
Ok(CursorWorkerHandle {
stop,
join: Some(join),
error_slot,
})
}
}