use crate::coordinate::Region;
use crate::store::delivery::canal::{Canal, CanalBatch, CanalClosed};
use crate::store::delivery::observation::CheckpointId;
use crate::store::index::{IndexEntry, StoreIndex};
use crate::store::platform::fs::StoreFs;
use crate::store::StoreError;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
mod checkpoint;
mod gap;
mod worker;
pub use checkpoint::CursorCheckpoint;
use checkpoint::{cursor_checkpoint_path, CursorDurableBinding};
use gap::GapBuffer;
pub use gap::{CursorGapConfig, GapObservation};
pub use worker::{CursorWorkerAction, CursorWorkerConfig, CursorWorkerHandle};
/// A read position over a [`StoreIndex`], restricted to a single [`Region`].
///
/// Polling asks the index for hits after the current position and moves the
/// cursor to the global sequence of the last hit it received.
pub struct Cursor {
    /// Region handed to every index query issued by this cursor.
    region: Region,
    /// Global sequence of the most recently consumed hit. While `started` is
    /// `false` the first expected sequence is 0 regardless of this value.
    position: u64,
    /// Whether a hit has been consumed (or a started checkpoint restored).
    started: bool,
    /// Shared index the cursor queries.
    index: Arc<StoreIndex>,
    /// Buffer of gap observations; `None` while gap tracking is not enabled.
    gap_buffer: Option<GapBuffer>,
    /// Checkpoint location used by `persist_current`; `None` when the cursor
    /// has no durable binding.
    durable: Option<CursorDurableBinding>,
}
impl Cursor {
/// Creates a cursor over `region` that has not consumed anything yet.
///
/// Gap tracking is off and no durable checkpoint binding is attached.
pub(crate) fn new(region: Region, index: Arc<StoreIndex>) -> Self {
    let (position, started) = (0, false);
    Self {
        region,
        index,
        position,
        started,
        durable: None,
        gap_buffer: None,
    }
}
fn new_bound_checkpoint(
region: Region,
index: Arc<StoreIndex>,
data_dir: &Path,
id: CheckpointId,
) -> Self {
Self {
region,
position: 0,
started: false,
index,
gap_buffer: None,
durable: Some(CursorDurableBinding {
data_dir: data_dir.to_path_buf(),
id,
}),
}
}
pub(crate) fn new_with_checkpoint(
region: Region,
index: Arc<StoreIndex>,
data_dir: &Path,
id: &CheckpointId,
) -> Result<Self, StoreError> {
let mut cursor = Self::new_bound_checkpoint(region, index, data_dir, id.clone());
match Self::load_checkpoint(data_dir, id) {
Ok(Some(ckpt)) => {
let expected_region = cursor.region.checkpoint_identity();
if ckpt.region_identity.as_deref() != Some(expected_region.as_str()) {
return Err(StoreError::CursorCheckpointRegionMismatch {
path: cursor_checkpoint_path(data_dir, id),
stored: ckpt.region_identity,
expected: expected_region,
});
}
cursor.position = ckpt.position;
cursor.started = ckpt.started;
}
Ok(None) => {}
Err(error) => {
if error.kind() == std::io::ErrorKind::InvalidData {
return Err(StoreError::CursorCheckpointCorrupt {
path: cursor_checkpoint_path(data_dir, id),
reason: error.to_string(),
});
}
return Err(StoreError::Io(error));
}
}
Ok(cursor)
}
/// Fetches the next hit after the cursor position and advances past it.
///
/// Returns `None` when the index query yields no hit. The cursor state is
/// updated before the hit is upgraded, so a `None` from `upgrade_hit` is
/// returned as `None` as well, even though the cursor has already moved.
///
/// NOTE(review): callers that read `None` as "caught up" could stop early in
/// that second case — confirm whether `upgrade_hit` can fail for a hit the
/// query has just returned.
pub fn poll(&mut self) -> Option<IndexEntry> {
    let hit = self
        .index
        .query_hits_after(&self.region, self.position, self.started, 1)
        .into_iter()
        .next()?;

    // Before the first delivery the next expected sequence is 0; afterwards
    // it is the one right after the current position.
    let expected_sequence = if self.started {
        self.position.saturating_add(1)
    } else {
        0
    };
    let delivered_sequence = hit.global_sequence;
    self.record_gap(expected_sequence, delivered_sequence);

    self.position = delivered_sequence;
    self.started = true;
    self.index.upgrade_hit(hit)
}
/// Fetches a batch of hits after the cursor position (passing `max` as the
/// hit limit to the index query) and advances to the last hit returned.
///
/// Hits for which `upgrade_hit` returns `None` are dropped, so the result may
/// be shorter than the number of hits the cursor moved past. An empty query
/// result leaves the cursor untouched and returns an empty vector.
pub fn poll_batch(&mut self, max: usize) -> Vec<IndexEntry> {
    let hits = self
        .index
        .query_hits_after(&self.region, self.position, self.started, max);
    let Some(newest_sequence) = hits.last().map(|hit| hit.global_sequence) else {
        return Vec::new();
    };

    // Gap detection reads the pre-batch position, so it has to run before the
    // cursor state is advanced.
    self.record_gaps_for_hits(&hits);
    self.position = newest_sequence;
    self.started = true;

    let mut entries = Vec::new();
    for hit in hits {
        if let Some(entry) = self.index.upgrade_hit(hit) {
            entries.push(entry);
        }
    }
    entries
}
/// Returns the visibility epoch currently reported by the index's sequence
/// state.
pub(crate) fn visibility_epoch(&self) -> u64 {
    let sequence = &self.index.sequence;
    sequence.visibility_epoch()
}
/// Parks the calling thread via the index's `park_for_visibility_change`,
/// forwarding `since_epoch` and `timeout` unchanged.
///
/// NOTE(review): presumably this wakes early once the visibility epoch moves
/// past `since_epoch` and otherwise waits for `timeout` — confirm against the
/// sequence implementation.
pub(crate) fn park_for_data(&self, since_epoch: u64, timeout: Duration) {
    let sequence = &self.index.sequence;
    sequence.park_for_visibility_change(since_epoch, timeout);
}
/// Returns the cursor with its gap tracking set according to `config`.
///
/// `Enabled { capacity }` installs a buffer built with
/// `GapBuffer::new_nonzero(capacity)`, replacing any previous buffer;
/// `Disabled` removes the buffer.
#[must_use]
pub fn with_gap_config(mut self, config: CursorGapConfig) -> Self {
    let gap_buffer = match config {
        CursorGapConfig::Enabled { capacity } => Some(GapBuffer::new_nonzero(capacity)),
        CursorGapConfig::Disabled => None,
    };
    self.gap_buffer = gap_buffer;
    self
}
/// Hands back the gap observations held by the gap buffer (via `take_all`).
///
/// Returns an empty vector when gap tracking is not enabled.
pub fn take_gaps(&mut self) -> Vec<GapObservation> {
    self.gap_buffer
        .as_mut()
        .map_or_else(Vec::new, |buffer| buffer.take_all())
}
pub(crate) fn checkpoint(&self) -> (u64, bool) {
(self.position, self.started)
}
/// Overwrites the cursor state with the given `position` and `started` flag.
///
/// No validation is performed and nothing else on the cursor is touched.
pub(crate) fn restore_checkpoint(&mut self, position: u64, started: bool) {
    (self.position, self.started) = (position, started);
}
/// Saves the current position, started flag and region checkpoint identity
/// through `fs`, at the location named by the durable binding.
///
/// A cursor without a durable binding has nowhere to persist to; in that case
/// this returns `Ok(())` without touching `fs`.
///
/// # Errors
///
/// Propagates whatever `save_checkpoint_with_fs` returns.
pub(crate) fn persist_current(&self, fs: &dyn StoreFs) -> std::io::Result<()> {
    match self.durable.as_ref() {
        None => Ok(()),
        Some(binding) => {
            let region_identity = self.region.checkpoint_identity();
            let snapshot =
                CursorCheckpoint::from_checkpoint(self.position, self.started, region_identity);
            Self::save_checkpoint_with_fs(&binding.data_dir, &binding.id, &snapshot, fs)
        }
    }
}
/// Runs gap detection across a batch of hits, in order.
///
/// The first hit is compared against the sequence expected from the current
/// cursor state (0 when not started, otherwise `position + 1`); every later
/// hit is compared against the sequence right after its predecessor. Must be
/// called before the cursor state is advanced for the batch.
fn record_gaps_for_hits(&mut self, hits: &[crate::store::index::QueryHit]) {
    let mut next_expected = if self.started {
        self.position.saturating_add(1)
    } else {
        0
    };
    for delivered in hits.iter().map(|hit| hit.global_sequence) {
        self.record_gap(next_expected, delivered);
        next_expected = delivered.saturating_add(1);
    }
}
/// Records a gap observation when the skipped span
/// `[expected_sequence, delivered_sequence)` overlaps a cancelled visibility
/// range.
///
/// Nothing is recorded when no sequence was skipped
/// (`delivered_sequence <= expected_sequence`), when gap tracking is not
/// enabled, or when no cancelled range intersects the skipped span.
///
/// Global cancelled ranges are always considered; lane ranges are added only
/// when the region names a lane that has an entry. Each range is clipped to
/// the skipped span and zero-width intersections are discarded.
fn record_gap(&mut self, expected_sequence: u64, delivered_sequence: u64) {
    if delivered_sequence <= expected_sequence {
        return;
    }
    let Some(buffer) = self.gap_buffer.as_mut() else {
        return;
    };

    let visibility = self.index.cancelled_visibility_ranges();
    let lane_specific = self
        .region
        .lane
        .and_then(|lane| visibility.lanes.get(&lane));
    let candidates = visibility
        .global
        .iter()
        .chain(lane_specific.into_iter().flatten())
        .copied();

    let mut cancelled_ranges = Vec::new();
    for (start, end) in candidates {
        let clipped_start = start.max(expected_sequence);
        let clipped_end = end.min(delivered_sequence);
        // Strict comparison: a range that merely touches the boundary is not
        // an overlap.
        if clipped_start < clipped_end {
            cancelled_ranges.push((clipped_start, clipped_end));
        }
    }

    if !cancelled_ranges.is_empty() {
        buffer.push(GapObservation {
            expected_sequence,
            delivered_sequence,
            cancelled_ranges,
        });
    }
}
}
/// Blocking batch delivery on top of [`Cursor::poll_batch`].
impl Canal for Cursor {
    type Item = IndexEntry;
    type Error = CanalClosed;

    /// Pulls up to `max` entries, waiting until `deadline` has elapsed (measured
    /// from the start of this call) for data to appear.
    ///
    /// Returns `CanalBatch::Empty` immediately when `max` is 0, and after the
    /// deadline when every poll came back empty. A single entry is returned as
    /// `CanalBatch::One`, several as `CanalBatch::Many`. This implementation
    /// never returns `Err`.
    fn pull_batch(
        &mut self,
        max: usize,
        deadline: Duration,
    ) -> Result<CanalBatch<Self::Item>, Self::Error> {
        if max == 0 {
            return Ok(CanalBatch::Empty);
        }

        let started_at = Instant::now();
        loop {
            // The epoch is read before polling and handed to the park call.
            // NOTE(review): presumably so a publish landing between the poll
            // and the park wakes the park at once — confirm.
            let epoch = self.visibility_epoch();
            let mut entries = self.poll_batch(max);

            if entries.len() > 1 {
                return Ok(CanalBatch::Many(entries));
            }
            if let Some(only) = entries.pop() {
                return Ok(CanalBatch::One(only));
            }

            let elapsed = started_at.elapsed();
            if elapsed >= deadline {
                return Ok(CanalBatch::Empty);
            }
            self.park_for_data(epoch, deadline.saturating_sub(elapsed));
        }
    }
}
#[cfg(test)]
mod mutation_kill_tests {
use super::{Canal, CanalBatch, Cursor, CursorGapConfig, Region, StoreIndex};
use crate::store::hidden_ranges::CancelledVisibilityRanges;
use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Builds a new `StoreIndex` wrapped in an `Arc` for use by a test cursor.
fn fresh_index() -> Arc<StoreIndex> {
    let index = StoreIndex::new();
    Arc::new(index)
}
/// With cancelled ranges (2,4) and (5,7) and a gap from 1 to 5, only (2,4)
/// may be reported: (5,7) clips to the zero-width (5,5), which the strict `<`
/// in `record_gap` must reject.
#[test]
fn record_gap_intersects_with_a_half_open_range_excluding_zero_width_overlaps() {
    let index = fresh_index();
    let hidden = CancelledVisibilityRanges {
        global: vec![(2, 4), (5, 7)],
        lanes: BTreeMap::new(),
    };
    index.restore_cancelled_visibility_ranges(hidden);

    let capacity = NonZeroUsize::new(8).expect("nonzero capacity");
    let mut cursor =
        Cursor::new(Region::all(), index).with_gap_config(CursorGapConfig::Enabled { capacity });

    cursor.record_gap(1, 5);

    let gaps = cursor.take_gaps();
    assert_eq!(gaps.len(), 1, "exactly one gap observation is recorded");
    let observed = &gaps[0];
    assert_eq!(observed.expected_sequence, 1);
    assert_eq!(observed.delivered_sequence, 5);
    assert_eq!(
        observed.cancelled_ranges,
        vec![(2, 4)],
        "PROPERTY: only the genuine (2,4) overlap is retained; the `<=` mutant \
         also admits the zero-width (5,5) boundary touch, got {:?}",
        observed.cancelled_ranges
    );
}
/// An idle cursor must spend time waiting before `pull_batch` reports
/// `Empty`; with a 50 ms deadline at least 20 ms must have passed.
#[test]
fn pull_batch_blocks_until_the_deadline_before_reporting_empty() {
    let mut cursor = Cursor::new(Region::all(), fresh_index());
    let deadline = Duration::from_millis(50);
    let minimum_wait = Duration::from_millis(20);

    let start = Instant::now();
    let result = cursor
        .pull_batch(8, deadline)
        .expect("pull_batch on an empty cursor");
    let elapsed = start.elapsed();

    let reported_empty = matches!(result, CanalBatch::Empty);
    assert!(reported_empty, "an idle cursor eventually returns Empty");
    assert!(
        elapsed >= minimum_wait,
        "PROPERTY: pull_batch must block toward its deadline before reporting \
         Empty; the `<` mutant returns instantly, elapsed only {elapsed:?}"
    );
}
/// The epoch seen through the cursor must read 0 on a fresh index, 1 after
/// one publish and 2 after two. All three readings are checked before
/// asserting so that a single run reports every mismatch.
#[test]
fn visibility_epoch_advances_with_each_publish() {
    let index = fresh_index();
    let cursor = Cursor::new(Region::all(), Arc::clone(&index));

    let before = cursor.visibility_epoch();

    index.reserve_sequences(1);
    index
        .publish(1, "mutation-kill-visibility")
        .expect("publish 1");
    let after_one = cursor.visibility_epoch();

    index.reserve_sequences(1);
    index
        .publish(2, "mutation-kill-visibility")
        .expect("publish 2");
    let after_two = cursor.visibility_epoch();

    // Each entry pairs "did this reading fail?" with its failure message.
    let checks = vec![
        (
            before != 0,
            format!("fresh index epoch must be 0, got {before}"),
        ),
        (
            after_one != 1,
            format!("epoch after one publish must be 1, got {after_one}"),
        ),
        (
            after_two != 2,
            format!("epoch after two publishes must be 2, got {after_two}"),
        ),
    ];
    let failures: Vec<String> = checks
        .into_iter()
        .filter(|(failed, _)| *failed)
        .map(|(_, message)| message)
        .collect();

    assert!(
        failures.is_empty(),
        "visibility_epoch mismatches: {failures:?}"
    );
}
/// With no publish advancing the epoch, `park_for_data` must actually wait:
/// a 50 ms timeout has to consume at least 20 ms.
#[test]
fn park_for_data_blocks_until_the_timeout_when_no_publish_races() {
    let cursor = Cursor::new(Region::all(), fresh_index());
    let since_epoch = cursor.visibility_epoch();
    let timeout = Duration::from_millis(50);
    let minimum_wait = Duration::from_millis(20);

    let start = Instant::now();
    cursor.park_for_data(since_epoch, timeout);
    let elapsed = start.elapsed();

    assert!(
        elapsed >= minimum_wait,
        "park_for_data must block for ~timeout when no publish advances the epoch; \
         elapsed only {elapsed:?}"
    );
}
/// A cursor over an index with no data must come back with
/// `CanalBatch::Empty` once the 30 ms deadline has passed.
#[test]
fn pull_batch_returns_empty_not_many_when_no_data_arrives() {
    let mut cursor = Cursor::new(Region::all(), fresh_index());
    let deadline = Duration::from_millis(30);

    let outcome = cursor
        .pull_batch(8, deadline)
        .expect("pull_batch on empty cursor");

    let is_empty = matches!(outcome, CanalBatch::Empty);
    assert!(
        is_empty,
        "an empty cursor must return CanalBatch::Empty after the deadline, got a non-empty batch"
    );
}
}