use crate::share_coordinator::persistence::{ShareSnapshotValue, ShareUpdateValue, StateBatch};
/// Mutable, in-memory state tracked for a single share partition.
///
/// The state is overwritten wholesale by `apply_snapshot` and adjusted
/// incrementally by `apply_update`; `to_snapshot` turns it back into a
/// snapshot value.
#[derive(Debug, Clone, Default)]
pub struct SharePartitionState {
/// Copied from the snapshot in `apply_snapshot`; `apply_update` leaves it unchanged.
pub state_epoch: i32,
/// Overwritten by both `apply_snapshot` and `apply_update`.
pub leader_epoch: i32,
/// Start offset of the share partition (the tests call this the "SPSO").
/// `apply_snapshot` overwrites it; `merge_batches` only ever moves it forward.
pub start_offset: i64,
/// Overwritten by both `apply_snapshot` and `apply_update`.
pub delivery_complete_count: i32,
/// Tracked batches. `merge_batches` leaves this sorted by `first_offset`;
/// `apply_snapshot` copies the snapshot's batches in the order given.
pub state_batches: Vec<StateBatch>,
/// Epoch of the last applied snapshot; `to_snapshot` emits this value plus one.
pub snapshot_epoch: i64,
/// Not read or written by any method of the `impl` in this file.
/// NOTE(review): presumably maintained by the caller — confirm.
pub last_snapshot_offset: i64,
/// Number of `apply_update` calls since the last `apply_snapshot`
/// (which resets it to zero).
pub updates_since_snapshot: u32,
}
impl SharePartitionState {
    /// Replaces the tracked state with the contents of a full snapshot.
    ///
    /// Every field carried by `v` overwrites its counterpart here and
    /// `updates_since_snapshot` is reset to zero. `last_snapshot_offset` is
    /// not touched.
    pub fn apply_snapshot(&mut self, v: &ShareSnapshotValue) {
        self.snapshot_epoch = v.snapshot_epoch;
        self.state_epoch = v.state_epoch;
        self.leader_epoch = v.leader_epoch;
        self.start_offset = v.start_offset;
        self.delivery_complete_count = v.delivery_complete_count;
        // `clone_from` can reuse the allocation already held by `state_batches`.
        self.state_batches.clone_from(&v.state_batches);
        self.updates_since_snapshot = 0;
    }

    /// Applies an incremental update on top of the current state.
    ///
    /// The leader epoch and delivery-complete count are overwritten, the
    /// start offset and batches are folded in via [`Self::merge_batches`], and
    /// `updates_since_snapshot` is bumped. `state_epoch` and `snapshot_epoch`
    /// are left as they are; `v.snapshot_epoch` is not consulted.
    pub fn apply_update(&mut self, v: &ShareUpdateValue) {
        // NOTE(review): unlike `start_offset`, the leader epoch is taken
        // verbatim — confirm updates never carry a "not set" sentinel here.
        self.leader_epoch = v.leader_epoch;
        self.merge_batches(v.start_offset, &v.state_batches);
        self.delivery_complete_count = v.delivery_complete_count;
        // Saturate rather than overflow: a pegged counter still reads as
        // "many updates since the last snapshot".
        self.updates_since_snapshot = self.updates_since_snapshot.saturating_add(1);
    }

    /// Advances the start offset and merges `written` into the tracked batches.
    ///
    /// * The start offset only moves forward; a smaller `new_start` is ignored.
    /// * Tracked and written batches whose `last_offset` lies before the
    ///   (possibly advanced) start offset are dropped. Batches straddling the
    ///   start offset are kept whole.
    /// * A written batch replaces every tracked batch with the same
    ///   `first_offset`. If `written` repeats a `first_offset`, the last
    ///   occurrence wins.
    /// * The result is sorted by `first_offset`. Overlaps between batches with
    ///   different first offsets are not resolved.
    pub fn merge_batches(&mut self, new_start: i64, written: &[StateBatch]) {
        self.start_offset = self.start_offset.max(new_start);
        // Copied to a local so the closures below do not need to borrow
        // `self` while `self.state_batches` is mutably borrowed.
        let start_offset = self.start_offset;

        // Walk the written batches newest-first: the first time a
        // `first_offset` is seen is therefore its final write, and any earlier
        // write for the same offset is filtered out.
        let mut claimed = std::collections::HashSet::<i64>::with_capacity(written.len());
        let incoming: Vec<&StateBatch> = written
            .iter()
            .rev()
            .filter(|w| w.last_offset >= start_offset && claimed.insert(w.first_offset))
            .collect();

        // One pass over the tracked batches: drop those that ended before the
        // start offset and those superseded by a written batch.
        self.state_batches
            .retain(|b| b.last_offset >= start_offset && !claimed.contains(&b.first_offset));

        self.state_batches.extend(incoming.into_iter().rev().cloned());
        // Stable sort keeps the relative order of any tracked batches that
        // happen to share a `first_offset`.
        self.state_batches.sort_by_key(|b| b.first_offset);
    }

    /// Builds a snapshot value from the current state.
    ///
    /// The emitted `snapshot_epoch` is one greater than the stored one; the
    /// stored epoch itself is not modified. All other fields are copied.
    #[must_use]
    pub fn to_snapshot(&self) -> ShareSnapshotValue {
        ShareSnapshotValue {
            snapshot_epoch: self.snapshot_epoch + 1,
            state_epoch: self.state_epoch,
            leader_epoch: self.leader_epoch,
            start_offset: self.start_offset,
            delivery_complete_count: self.delivery_complete_count,
            state_batches: self.state_batches.clone(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Builds a batch spanning `first` to `last` with fixed delivery metadata.
    fn batch(first: i64, last: i64) -> StateBatch {
        StateBatch {
            first_offset: first,
            last_offset: last,
            delivery_state: 0,
            delivery_count: 1,
        }
    }

    /// Lists the `first_offset` of every tracked batch, in stored order.
    fn first_offsets(state: &SharePartitionState) -> Vec<i64> {
        state.state_batches.iter().map(|b| b.first_offset).collect()
    }

    #[test]
    fn apply_snapshot_then_update_advances_spso_and_drops_sub_spso_batches() {
        let snapshot = ShareSnapshotValue {
            snapshot_epoch: 1,
            state_epoch: 2,
            leader_epoch: 3,
            start_offset: 0,
            delivery_complete_count: 0,
            state_batches: vec![batch(0, 9), batch(10, 19), batch(20, 29)],
        };
        let update = ShareUpdateValue {
            snapshot_epoch: 1,
            leader_epoch: 4,
            start_offset: 20,
            delivery_complete_count: 7,
            state_batches: vec![batch(30, 39)],
        };

        let mut state = SharePartitionState::default();

        state.apply_snapshot(&snapshot);
        assert!(state.state_epoch == 2);
        assert!(state.start_offset == 0);

        state.apply_update(&update);
        assert!(state.start_offset == 20);
        assert!(state.leader_epoch == 4);
        assert!(state.delivery_complete_count == 7);
        assert!(first_offsets(&state) == vec![20, 30]);
        assert!(state.updates_since_snapshot == 1);
    }

    #[test]
    fn merge_upserts_batch_by_first_offset() {
        // Same first offset as an existing batch, but with different contents.
        let replacement = StateBatch {
            first_offset: 10,
            last_offset: 25,
            delivery_state: 2,
            delivery_count: 5,
        };
        let mut state = SharePartitionState {
            state_batches: vec![batch(0, 9), batch(10, 19)],
            ..Default::default()
        };

        state.merge_batches(0, &[replacement, batch(30, 39)]);

        assert!(state.state_batches.len() == 3);
        let updated = state
            .state_batches
            .iter()
            .find(|b| b.first_offset == 10)
            .unwrap();
        assert!(updated.last_offset == 25);
        assert!(updated.delivery_count == 5);
        assert!(first_offsets(&state) == vec![0, 10, 30]);
    }

    #[test]
    fn merge_drops_written_batch_below_spso() {
        let mut state = SharePartitionState {
            start_offset: 50,
            ..Default::default()
        };

        // The written batch ends before the start offset, so it must not be tracked.
        let stale = [batch(0, 9)];
        state.merge_batches(50, &stale);

        assert!(state.state_batches.is_empty());
    }

    #[test]
    fn to_snapshot_bumps_snapshot_epoch() {
        let state = SharePartitionState {
            snapshot_epoch: 4,
            state_epoch: 1,
            start_offset: 10,
            state_batches: vec![batch(10, 19)],
            ..Default::default()
        };

        let snap = state.to_snapshot();

        assert!(snap.snapshot_epoch == 5);
        assert!(snap.start_offset == 10);
        assert!(snap.state_batches == state.state_batches);
    }
}