use std::collections::BTreeMap;
use serde_json::Value as JsonValue;
use super::change_record::{ChangeRecord, RangeAdmitError, RangeAuthority};
use super::util::{get_opt_u64, get_u64, object_from_slice, Result};
/// Cursor into the change stream of a single range.
///
/// Holds the LSN most recently folded in by [`RangeStreamPosition::advance`]
/// together with the highest term and ownership epoch accepted so far; those
/// two values become the minimums returned by
/// [`RangeStreamPosition::authority`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RangeStreamPosition {
    /// Identifier of the range this cursor belongs to.
    pub range_id: u64,
    /// LSN of the last record applied for this range (`0` at origin).
    pub applied_lsn: u64,
    /// Highest record term accepted so far.
    pub accepted_term: u64,
    /// Highest ownership epoch accepted so far.
    pub accepted_epoch: u64,
}
impl RangeStreamPosition {
pub fn new(range_id: u64, applied_lsn: u64, accepted_term: u64, accepted_epoch: u64) -> Self {
Self {
range_id,
applied_lsn,
accepted_term,
accepted_epoch,
}
}
pub fn at_origin(range_id: u64) -> Self {
Self::new(range_id, 0, 0, 0)
}
pub fn authority(&self) -> RangeAuthority {
RangeAuthority {
range_id: self.range_id,
min_term: self.accepted_term,
min_ownership_epoch: self.accepted_epoch,
}
}
pub fn advance(&mut self, record: &ChangeRecord) {
if record.range_id != Some(self.range_id) || record.lsn <= self.applied_lsn {
return;
}
self.applied_lsn = record.lsn;
if record.term > self.accepted_term {
self.accepted_term = record.term;
}
if let Some(epoch) = record.ownership_epoch {
if epoch > self.accepted_epoch {
self.accepted_epoch = epoch;
}
}
}
pub fn encode_json(&self) -> Vec<u8> {
let mut obj = serde_json::Map::new();
obj.insert(
"range_id".to_string(),
JsonValue::Number(self.range_id.into()),
);
obj.insert(
"applied_lsn".to_string(),
JsonValue::Number(self.applied_lsn.into()),
);
obj.insert(
"accepted_term".to_string(),
JsonValue::Number(self.accepted_term.into()),
);
obj.insert(
"accepted_epoch".to_string(),
JsonValue::Number(self.accepted_epoch.into()),
);
serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
}
pub fn decode_json(bytes: &[u8]) -> Result<Self> {
let obj = object_from_slice(bytes)?;
Ok(Self {
range_id: get_u64(&obj, "range_id")?,
applied_lsn: get_opt_u64(&obj, "applied_lsn").unwrap_or(0),
accepted_term: get_opt_u64(&obj, "accepted_term").unwrap_or(0),
accepted_epoch: get_opt_u64(&obj, "accepted_epoch").unwrap_or(0),
})
}
}
/// Outcome of checking one record against a [`RangeStreamPosition`], as
/// produced by [`classify_range_record`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeStreamDecision {
    /// The record belongs to the range, is newer than the applied LSN, and was
    /// admitted by the position's authority.
    Apply,
    /// The record carries no range id or a different one.
    SkipOtherRange,
    /// The record's LSN is not greater than the position's applied LSN.
    SkipReplayed,
    /// The position's authority refused the record; carries the admit error.
    Reject(RangeAdmitError),
}
pub fn classify_range_record(
position: &RangeStreamPosition,
record: &ChangeRecord,
) -> RangeStreamDecision {
if record.range_id != Some(position.range_id) {
return RangeStreamDecision::SkipOtherRange;
}
if record.lsn <= position.applied_lsn {
return RangeStreamDecision::SkipReplayed;
}
match position.authority().admit(record) {
Ok(()) => RangeStreamDecision::Apply,
Err(error) => RangeStreamDecision::Reject(error),
}
}
/// A record that was refused while planning a catch-up, identified by its LSN.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RangeStreamReject {
    /// LSN of the refused record.
    pub lsn: u64,
    /// Reason the record was not admitted.
    pub error: RangeAdmitError,
}
/// Result of scanning a batch of records for one range, as built by
/// [`plan_range_catchup`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RangeCatchupPlan {
    /// Range id of the position the plan was computed from.
    pub range_id: u64,
    /// Indices into the scanned slice of the records to apply, in scan order.
    pub apply: Vec<usize>,
    /// Records refused during the scan, in scan order.
    pub rejected: Vec<RangeStreamReject>,
    /// Position after advancing over every record listed in `apply`.
    pub resume: RangeStreamPosition,
    /// Total number of records inspected (the length of the scanned slice).
    pub scanned: usize,
}
impl RangeCatchupPlan {
    /// Number of records selected for application.
    pub fn apply_count(&self) -> usize {
        let selected = &self.apply;
        selected.len()
    }

    /// `true` when no record was selected for application.
    ///
    /// Rejected records are not taken into account.
    pub fn is_empty(&self) -> bool {
        self.apply_count() == 0
    }
}
pub fn plan_range_catchup(
position: &RangeStreamPosition,
records: &[ChangeRecord],
) -> RangeCatchupPlan {
let mut resume = *position;
let mut apply = Vec::new();
let mut rejected = Vec::new();
for (index, record) in records.iter().enumerate() {
match classify_range_record(&resume, record) {
RangeStreamDecision::Apply => {
apply.push(index);
resume.advance(record);
}
RangeStreamDecision::Reject(error) => rejected.push(RangeStreamReject {
lsn: record.lsn,
error,
}),
RangeStreamDecision::SkipOtherRange | RangeStreamDecision::SkipReplayed => {}
}
}
RangeCatchupPlan {
range_id: position.range_id,
apply,
rejected,
resume,
scanned: records.len(),
}
}
/// Three LSN frontiers tracked for one range: applied, streamed and primary.
///
/// The lag helpers on this type measure how far `applied_lsn` and
/// `streamed_lsn` trail `primary_lsn`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RangeStreamProgress {
    /// Identifier of the range being tracked.
    pub range_id: u64,
    /// Highest LSN reported as applied.
    pub applied_lsn: u64,
    /// Highest LSN reported as streamed.
    pub streamed_lsn: u64,
    /// Highest LSN seen for the range; the reference point for lag.
    pub primary_lsn: u64,
}
impl RangeStreamProgress {
pub fn new(range_id: u64) -> Self {
Self {
range_id,
applied_lsn: 0,
streamed_lsn: 0,
primary_lsn: 0,
}
}
pub fn apply_lag(&self) -> u64 {
self.primary_lsn.saturating_sub(self.applied_lsn)
}
pub fn stream_lag(&self) -> u64 {
self.primary_lsn.saturating_sub(self.streamed_lsn)
}
pub fn is_caught_up(&self) -> bool {
self.primary_lsn > 0 && self.applied_lsn >= self.primary_lsn
}
pub fn failover_eligible(&self, max_lag: u64) -> bool {
self.primary_lsn > 0 && self.apply_lag() <= max_lag
}
pub fn encode_json(&self) -> Vec<u8> {
let mut obj = serde_json::Map::new();
obj.insert(
"range_id".to_string(),
JsonValue::Number(self.range_id.into()),
);
obj.insert(
"applied_lsn".to_string(),
JsonValue::Number(self.applied_lsn.into()),
);
obj.insert(
"streamed_lsn".to_string(),
JsonValue::Number(self.streamed_lsn.into()),
);
obj.insert(
"primary_lsn".to_string(),
JsonValue::Number(self.primary_lsn.into()),
);
serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
}
pub fn decode_json(bytes: &[u8]) -> Result<Self> {
let obj = object_from_slice(bytes)?;
Ok(Self {
range_id: get_u64(&obj, "range_id")?,
applied_lsn: get_opt_u64(&obj, "applied_lsn").unwrap_or(0),
streamed_lsn: get_opt_u64(&obj, "streamed_lsn").unwrap_or(0),
primary_lsn: get_opt_u64(&obj, "primary_lsn").unwrap_or(0),
})
}
}
/// Collection of [`RangeStreamProgress`] entries, one per range id.
///
/// Entries are created on first use and kept in a `BTreeMap`, so iteration
/// visits ranges in ascending range-id order.
#[derive(Debug, Clone, Default)]
pub struct RangeProgressTracker {
    /// Progress entries keyed by range id.
    ranges: BTreeMap<u64, RangeStreamProgress>,
}
impl RangeProgressTracker {
pub fn new() -> Self {
Self::default()
}
fn slot(&mut self, range_id: u64) -> &mut RangeStreamProgress {
self.ranges
.entry(range_id)
.or_insert_with(|| RangeStreamProgress::new(range_id))
}
pub fn index_record(&mut self, record: &ChangeRecord) {
let Some(range_id) = record.range_id else {
return;
};
let slot = self.slot(range_id);
if record.lsn > slot.primary_lsn {
slot.primary_lsn = record.lsn;
}
}
pub fn note_streamed(&mut self, range_id: u64, lsn: u64) {
let slot = self.slot(range_id);
if lsn > slot.streamed_lsn {
slot.streamed_lsn = lsn;
}
if lsn > slot.primary_lsn {
slot.primary_lsn = lsn;
}
}
pub fn note_applied(&mut self, range_id: u64, lsn: u64) {
let slot = self.slot(range_id);
if lsn > slot.applied_lsn {
slot.applied_lsn = lsn;
}
if lsn > slot.streamed_lsn {
slot.streamed_lsn = lsn;
}
if lsn > slot.primary_lsn {
slot.primary_lsn = lsn;
}
}
pub fn observe_position(&mut self, position: &RangeStreamPosition) {
self.note_applied(position.range_id, position.applied_lsn);
}
pub fn progress(&self, range_id: u64) -> Option<&RangeStreamProgress> {
self.ranges.get(&range_id)
}
pub fn apply_lag(&self, range_id: u64) -> Option<u64> {
self.ranges
.get(&range_id)
.map(RangeStreamProgress::apply_lag)
}
pub fn iter(&self) -> impl Iterator<Item = &RangeStreamProgress> {
self.ranges.values()
}
pub fn len(&self) -> usize {
self.ranges.len()
}
pub fn is_empty(&self) -> bool {
self.ranges.is_empty()
}
pub fn failover_eligible(&self, max_lag: u64) -> Vec<u64> {
self.ranges
.values()
.filter(|progress| progress.failover_eligible(max_lag))
.map(|progress| progress.range_id)
.collect()
}
}
// Unit tests for range stream positions, catch-up planning and the progress
// tracker.
#[cfg(test)]
mod tests {
use super::*;
use crate::replication::change_record::ChangeOperation;
// Builds an insert `ChangeRecord` whose range id, LSN, term and ownership
// epoch are the arguments; every other field is a fixed placeholder and the
// entity id mirrors the LSN.
fn record(range_id: Option<u64>, lsn: u64, term: u64, epoch: Option<u64>) -> ChangeRecord {
ChangeRecord {
term,
lsn,
timestamp: 1,
operation: ChangeOperation::Insert,
collection: "c".to_string(),
entity_id: lsn,
entity_kind: "row".to_string(),
entity_bytes: Some(vec![1]),
metadata: None,
refresh_records: None,
range_id,
ownership_epoch: epoch,
}
}
// Encoding a position and decoding the bytes yields an equal position.
#[test]
fn position_round_trips_on_the_json_wire() {
let pos = RangeStreamPosition::new(7, 42, 3, 5);
assert_eq!(
RangeStreamPosition::decode_json(&pos.encode_json()).unwrap(),
pos
);
}
// A record for the position's range is applied; a record for another range
// or without any range id is skipped as "other range".
#[test]
fn classify_routes_by_range_identity() {
let pos = RangeStreamPosition::at_origin(7);
assert_eq!(
classify_range_record(&pos, &record(Some(7), 1, 1, Some(1))),
RangeStreamDecision::Apply
);
assert_eq!(
classify_range_record(&pos, &record(Some(9), 1, 1, Some(1))),
RangeStreamDecision::SkipOtherRange
);
assert_eq!(
classify_range_record(&pos, &record(None, 1, 1, None)),
RangeStreamDecision::SkipOtherRange
);
}
// From a stream mixing ranges 7, 9 and range-less records, the plan for
// range 7 selects only the range-7 indices, counts every scanned record, and
// resumes at the last applied LSN.
#[test]
fn plan_filters_one_range_out_of_a_shared_stream() {
let stream = vec![
record(Some(7), 1, 1, Some(1)),
record(Some(9), 2, 1, Some(1)),
record(Some(7), 3, 1, Some(1)),
record(None, 4, 1, None), record(Some(7), 5, 1, Some(1)),
];
let plan = plan_range_catchup(&RangeStreamPosition::at_origin(7), &stream);
assert_eq!(plan.apply, vec![0, 2, 4]);
assert!(plan.rejected.is_empty());
assert_eq!(plan.scanned, 5);
assert_eq!(plan.resume.applied_lsn, 5);
assert_eq!(plan.apply_count(), 3);
}
// Starting from a position already at LSN 3, records at LSN 1 and 3 are
// treated as replayed and only the LSN-5 record (index 2) is selected.
#[test]
fn plan_resumes_from_a_known_range_position() {
let stream = vec![
record(Some(7), 1, 1, Some(1)),
record(Some(7), 3, 1, Some(1)),
record(Some(7), 5, 1, Some(1)),
];
let pos = RangeStreamPosition::new(7, 3, 1, 1);
let plan = plan_range_catchup(&pos, &stream);
assert_eq!(plan.apply, vec![2]);
assert_eq!(plan.resume.applied_lsn, 5);
}
// With accepted term 3 and epoch 4: a record at epoch 2 is rejected for a
// stale ownership epoch, a record at term 1 is rejected for a stale term, and
// a record matching term 3 / epoch 4 is applied.
#[test]
fn plan_rejects_stale_ownership_epoch_and_term() {
let pos = RangeStreamPosition::new(7, 0, 3, 4);
let stream = vec![
record(Some(7), 1, 3, Some(2)), record(Some(7), 2, 1, Some(9)), record(Some(7), 3, 3, Some(4)), ];
let plan = plan_range_catchup(&pos, &stream);
assert_eq!(plan.apply, vec![2]);
assert_eq!(
plan.rejected,
vec![
RangeStreamReject {
lsn: 1,
error: RangeAdmitError::StaleOwnershipEpoch {
record_epoch: 2,
accepted_epoch: 4,
},
},
RangeStreamReject {
lsn: 2,
error: RangeAdmitError::StaleTerm {
record_term: 1,
accepted_term: 3,
},
},
]
);
assert_eq!(plan.resume.applied_lsn, 3);
assert_eq!(plan.resume.accepted_epoch, 4);
}
// Advancing over a record raises the accepted term and epoch to the record's
// values, so a later record carrying a lower epoch is rejected.
#[test]
fn position_advance_ratchets_authority_so_a_later_stale_write_is_fenced() {
let mut pos = RangeStreamPosition::new(7, 0, 1, 1);
pos.advance(&record(Some(7), 10, 4, Some(6)));
assert_eq!(pos.applied_lsn, 10);
assert_eq!(pos.accepted_term, 4);
assert_eq!(pos.accepted_epoch, 6);
assert_eq!(
classify_range_record(&pos, &record(Some(7), 11, 4, Some(5))),
RangeStreamDecision::Reject(RangeAdmitError::StaleOwnershipEpoch {
record_epoch: 5,
accepted_epoch: 6,
})
);
}
// Lag is measured against the primary LSN (100 - 60 applied, 100 - 80
// streamed), failover eligibility compares apply lag with the threshold, and
// the progress value survives a JSON round trip.
#[test]
fn progress_round_trips_and_reports_lag() {
let mut progress = RangeStreamProgress::new(7);
progress.primary_lsn = 100;
progress.streamed_lsn = 80;
progress.applied_lsn = 60;
assert_eq!(progress.apply_lag(), 40);
assert_eq!(progress.stream_lag(), 20);
assert!(!progress.is_caught_up());
assert!(progress.failover_eligible(50));
assert!(!progress.failover_eligible(10));
assert_eq!(
RangeStreamProgress::decode_json(&progress.encode_json()).unwrap(),
progress
);
}
// Ranges 7 and 9 are tracked separately: the range-less record creates no
// entry, range 7 is caught up at LSN 3, range 9 trails its primary LSN 6 by 4,
// and an unknown range reports no lag at all.
#[test]
fn tracker_reports_independent_lag_for_multiple_ranges() {
let mut tracker = RangeProgressTracker::new();
for rec in [
record(Some(7), 1, 1, Some(1)),
record(Some(9), 2, 1, Some(1)),
record(Some(7), 3, 1, Some(1)),
record(Some(9), 4, 1, Some(1)),
record(None, 5, 1, None), record(Some(9), 6, 1, Some(1)),
] {
tracker.index_record(&rec);
}
tracker.note_applied(7, 3);
tracker.note_applied(9, 2);
assert_eq!(tracker.len(), 2);
assert_eq!(tracker.apply_lag(7), Some(0));
assert!(tracker.progress(7).unwrap().is_caught_up());
assert_eq!(tracker.apply_lag(9), Some(4));
assert!(!tracker.progress(9).unwrap().is_caught_up());
assert_eq!(tracker.apply_lag(99), None);
assert_eq!(tracker.failover_eligible(0), vec![7]);
assert_eq!(tracker.failover_eligible(10), vec![7, 9]);
}
// Once the frontiers reach 50, later notifications with smaller LSNs do not
// move any of them backwards.
#[test]
fn tracker_frontiers_are_monotonic() {
let mut tracker = RangeProgressTracker::new();
tracker.note_applied(7, 50);
tracker.note_streamed(7, 10);
tracker.note_applied(7, 20);
tracker.index_record(&record(Some(7), 5, 1, Some(1)));
let progress = tracker.progress(7).unwrap();
assert_eq!(progress.applied_lsn, 50);
assert_eq!(progress.streamed_lsn, 50);
assert_eq!(progress.primary_lsn, 50);
}
// Observing a position at applied LSN 7 against a primary LSN of 9 leaves an
// apply lag of 2.
#[test]
fn observe_position_adopts_follower_applied_frontier() {
let mut tracker = RangeProgressTracker::new();
tracker.index_record(&record(Some(7), 9, 1, Some(1)));
tracker.observe_position(&RangeStreamPosition::new(7, 7, 1, 1));
assert_eq!(tracker.apply_lag(7), Some(2));
}
}