use std::path::PathBuf;
use std::sync::Arc;
use bytes::BufMut;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::{Duration, Instant};
use uuid::Uuid;
use crabka_metadata::{
MetadataImage, MetadataRecord, VotersRecord, from_kraft_value, to_kraft_values,
};
use crabka_protocol::records::{Record, RecordBatch};
use crate::error::RaftError;
use crate::kraft::action::{Action, TimerKind};
use crate::kraft::core::QuorumStateMachine;
use crate::kraft::event::{Event, LogEnd};
use crate::kraft::log::KraftLog;
use crate::kraft::role::Role;
use crate::kraft::snapshot_fetch::{SnapshotFetchState, SnapshotFetchStep};
use crate::kraft::transport::{
Command, Inbound, MetadataFetchSlice, PeerSender, QuorumStateSnapshot, TimerTick, api_key, wire,
};
use crate::kraft::types::{LeaderEpoch, LogView, NodeId, QuorumState, ReplicaKey, SimInstant};
/// Number of fetch-timer expiries, counted while a leader is being followed, at which
/// `Event::FetchTimeout` is raised instead of re-sending the fetch.
const FETCH_MISS_LIMIT: u32 = 3;
/// The heartbeat period is the election timeout divided by this value (minimum 1 ms).
const HEARTBEAT_DIVISOR: u64 = 3;
/// NOTE(review): presumably the file name used for the persisted quorum state; it is not
/// referenced in this part of the file — confirm against the save/load helpers.
const QUORUM_STATE_FILE: &str = "quorum-state";
/// Error code placed in a FetchSnapshot response when the requested checkpoint cannot be loaded.
const SNAPSHOT_NOT_FOUND: i16 = 98;
/// Name of the subdirectory, relative to the data directory, returned by [`checkpoint_dir`].
const METADATA_SUBDIR: &str = "@metadata-0";

/// Returns `data_dir` extended with the metadata subdirectory component.
#[must_use]
pub fn checkpoint_dir(data_dir: &std::path::Path) -> PathBuf {
    let mut dir = data_dir.to_path_buf();
    dir.push(METADATA_SUBDIR);
    dir
}
/// State owned by the engine task started from `KraftController::spawn_with_image`.
/// All mutation happens inside `Engine::run` and the handlers it calls.
struct Engine {
// Id of this node.
me: NodeId,
// State machine that turns events into `Action`s.
core: QuorumStateMachine,
// The replicated log.
log: KraftLog,
// Metadata image that committed records are applied to.
image: MetadataImage,
// Transport used to send requests to peers.
peers: Arc<dyn PeerSender>,
// Publishes a fresh `Arc<MetadataImage>` after the image changes.
image_tx: watch::Sender<Arc<MetadataImage>>,
// Publishes the current leader id when it changes.
leader_tx: watch::Sender<Option<NodeId>>,
// Publishes a quorum snapshot after every processed event.
quorum_tx: watch::Sender<QuorumStateSnapshot>,
// Cloned into spawned send tasks so peer responses come back as commands.
cmd_tx: mpsc::Sender<Command>,
// Base data directory; the checkpoint directory is derived from it.
data_dir: PathBuf,
// Instant captured at spawn; `SimInstant` values are millisecond offsets from it.
clock_base: Instant,
election_timeout_ms: u64,
// Pending election deadline; `None` means the timer is disarmed.
election_at: Option<Instant>,
// Pending fetch deadline; `None` means the timer is disarmed.
fetch_at: Option<Instant>,
// Fetch-timer expiries counted since the counter was last reset.
fetch_misses: u32,
// Submitters waiting for their appended records to be covered by the high watermark.
commit_waiters: Vec<CommitWaiter>,
// Leadership and epoch observed at the previous reconcile, used to detect leadership loss.
was_leader: bool,
held_epoch: LeaderEpoch,
// Automatic snapshot threshold in records; 0 disables automatic snapshots.
snapshot_interval_records: u64,
// High watermark / end offset recorded for the most recent snapshot.
last_snapshot_end_offset: i64,
// In-progress snapshot download, if any.
snapshot_fetch: Option<SnapshotFetchState>,
// Epoch of the last installed snapshot; used as the fetch epoch while the log is empty
// and cleared once a fetched batch is appended.
installed_snapshot_epoch: Option<LeaderEpoch>,
}
/// A pending `submit_change` call waiting for its batch to be committed.
struct CommitWaiter {
// Base offset returned by the log append for this submission.
base_offset: i64,
// The waiter is resolved once the high watermark reaches this offset.
need_offset: i64,
// First validation error recorded for this submission during apply, if any.
rejection: Option<RaftError>,
// Channel on which the final result is delivered to the submitter.
reply: oneshot::Sender<Result<(), RaftError>>,
}
/// Cloneable handle to a running engine task: commands go in through `cmd_tx`,
/// published state is read through the watch receivers.
#[derive(Clone)]
pub struct KraftController {
// Sends commands to the engine task.
cmd_tx: mpsc::Sender<Command>,
// Latest published metadata image.
image_rx: watch::Receiver<Arc<MetadataImage>>,
// Latest published leader id.
leader_rx: watch::Receiver<Option<NodeId>>,
// Latest published quorum snapshot.
quorum_rx: watch::Receiver<QuorumStateSnapshot>,
// Id of the local node.
me: NodeId,
}
/// Parameters consumed by `KraftController::spawn`.
pub struct KraftConfig {
/// Id of the local node.
pub me: NodeId,
/// Cluster id used to create the initial metadata image in `spawn`.
pub cluster_id: Uuid,
/// Quorum state handed to the state machine at start-up.
pub initial_state: QuorumState,
/// Election timeout in milliseconds; also determines the heartbeat period.
pub election_timeout_ms: u64,
/// Transport used to reach peers.
pub peers: Arc<dyn PeerSender>,
/// Automatic snapshot threshold in records; 0 disables automatic snapshots.
pub snapshot_interval_records: u64,
}
impl KraftController {
/// Starts a controller whose metadata image begins empty for `config.cluster_id`
/// and whose last-snapshot offset is 0.
#[must_use]
pub fn spawn(config: KraftConfig, log: KraftLog, data_dir: PathBuf) -> Self {
    let empty_image = MetadataImage::new(config.cluster_id);
    Self::spawn_with_image(config, log, data_dir, empty_image, 0)
}
/// Builds the engine around an already-populated `image`, spawns its task on the
/// current tokio runtime, and returns the handle connected to it.
///
/// The voter set from the initial quorum state is applied to the image before it is
/// first published.
fn spawn_with_image(
    config: KraftConfig,
    log: KraftLog,
    data_dir: PathBuf,
    image: MetadataImage,
    last_snapshot_end_offset: i64,
) -> Self {
    let me = config.me;
    let election_timeout_ms = config.election_timeout_ms;
    let snapshot_interval_records = config.snapshot_interval_records;
    let peers = config.peers;
    let core = QuorumStateMachine::new(me, config.initial_state, election_timeout_ms);

    let initial_leader = core.quorum_state().leader_id;
    let initial_epoch = core.quorum_state().leader_epoch;
    let initial_was_leader = core.role().is_leader();

    // Seed the image with the voter set the state machine starts from.
    let mut image = image;
    let voters_record = VotersRecord {
        voters: core.quorum_state().voters.clone(),
    };
    image.apply(&MetadataRecord::V1Voters(voters_record));

    let (cmd_tx, cmd_rx) = mpsc::channel(256);
    let (image_tx, image_rx) = watch::channel(Arc::new(image.clone()));
    let (leader_tx, leader_rx) = watch::channel(initial_leader);
    let (quorum_tx, quorum_rx) = watch::channel(QuorumStateSnapshot {
        leader_id: initial_leader,
        leader_epoch: initial_epoch,
        high_watermark: log.hwm(),
        log_end_offset: log.log_end_offset(),
        log_start_offset: log.log_start_offset(),
        voters: core.quorum_state().voters.ids().into_iter().collect(),
        per_voter_fetch_offset: std::collections::BTreeMap::new(),
    });

    let clock_base = Instant::now();
    // Only a voter with no known leader arms the election timer at start-up; a
    // single-voter quorum fires immediately, otherwise timeout plus jitter.
    let election_at = match (core.is_voter(), initial_leader) {
        (true, None) if core.quorum_state().voters.len() == 1 => Some(clock_base),
        (true, None) => {
            let jitter =
                crate::kraft::core::election_jitter_ms(me, initial_epoch, election_timeout_ms);
            Some(clock_base + Duration::from_millis(election_timeout_ms + jitter))
        }
        _ => None,
    };

    let engine = Engine {
        me,
        core,
        log,
        image,
        peers,
        image_tx,
        leader_tx,
        quorum_tx,
        cmd_tx: cmd_tx.clone(),
        data_dir,
        clock_base,
        election_timeout_ms,
        election_at,
        fetch_at: None,
        fetch_misses: 0,
        commit_waiters: Vec::new(),
        was_leader: initial_was_leader,
        held_epoch: initial_epoch,
        snapshot_interval_records,
        last_snapshot_end_offset,
        snapshot_fetch: None,
        installed_snapshot_epoch: None,
    };
    tokio::spawn(engine.run(cmd_rx));

    Self {
        cmd_tx,
        image_rx,
        leader_rx,
        quorum_rx,
        me,
    }
}
/// Opens (creating if needed) the on-disk state under `data_dir` and starts a controller.
///
/// The image is restored from the latest checkpoint when one exists, committed log
/// records are replayed into it, and the persisted quorum state is used when present;
/// otherwise the quorum is bootstrapped from `bootstrap_voters`.
///
/// # Errors
///
/// Returns an error if the directory cannot be created, or if opening the log, loading
/// or reading the checkpoint, or loading the quorum state fails.
pub fn open(
    data_dir: PathBuf,
    me: NodeId,
    cluster_id: Uuid,
    bootstrap_voters: crabka_metadata::voters::VoterSet,
    election_timeout_ms: u64,
    peers: Arc<dyn PeerSender>,
    snapshot_interval_records: u64,
) -> Result<Self, RaftError> {
    std::fs::create_dir_all(&data_dir).map_err(crabka_log::LogError::Io)?;
    let mut log = KraftLog::open(&data_dir)?;

    let ckpt_dir = checkpoint_dir(&data_dir);
    let (mut image, last_snapshot_end_offset) = match load_latest_checkpoint(&ckpt_dir)? {
        Some(bytes) => {
            let records = crate::snapshot::SnapshotReader::read_records(&bytes)?;
            let restored = MetadataImage::from_records(cluster_id, &records);
            let end_offset = latest_checkpoint_id(&ckpt_dir).map_or(0, |(off, _ep)| off);
            (restored, end_offset)
        }
        None => (MetadataImage::new(cluster_id), 0),
    };

    replay_committed(&log, &mut image, 0);
    log.advance_hwm(log.log_end_offset());

    let initial_state = match load_quorum_state(&data_dir, cluster_id, &bootstrap_voters)? {
        Some(persisted) => persisted,
        None => QuorumState::bootstrap(cluster_id, bootstrap_voters),
    };

    let config = KraftConfig {
        me,
        cluster_id,
        initial_state,
        election_timeout_ms,
        peers,
        snapshot_interval_records,
    };
    Ok(Self::spawn_with_image(
        config,
        log,
        data_dir,
        image,
        last_snapshot_end_offset,
    ))
}
/// Returns the id of the local node.
#[must_use]
pub fn node_id(&self) -> NodeId {
self.me
}
/// Returns a shared handle to the most recently published metadata image.
#[must_use]
pub fn current_image(&self) -> Arc<MetadataImage> {
    Arc::clone(&self.image_rx.borrow())
}
/// Returns a new receiver for published metadata images.
#[must_use]
pub fn watch_image(&self) -> watch::Receiver<Arc<MetadataImage>> {
self.image_rx.clone()
}
/// Returns a new receiver for published leader changes.
#[must_use]
pub fn watch_leader(&self) -> watch::Receiver<Option<NodeId>> {
self.leader_rx.clone()
}
/// Returns a copy of the most recently published quorum snapshot without contacting
/// the engine task.
#[must_use]
pub fn quorum_snapshot(&self) -> QuorumStateSnapshot {
self.quorum_rx.borrow().clone()
}
/// Sends `records` to the engine and waits for the engine's verdict.
///
/// # Errors
///
/// Returns `RaftError::Shutdown` if the engine is no longer reachable or drops the
/// reply channel; otherwise returns whatever error the engine reports.
pub async fn submit_change(
    &self,
    records: Vec<crabka_metadata::MetadataRecord>,
) -> Result<(), RaftError> {
    let (reply, outcome_rx) = oneshot::channel();
    let request = Command::SubmitChange { records, reply };
    if self.cmd_tx.send(request).await.is_err() {
        return Err(RaftError::Shutdown);
    }
    match outcome_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(RaftError::Shutdown),
    }
}
/// Asks the engine task for a freshly computed quorum snapshot.
///
/// # Errors
///
/// Returns `RaftError::Shutdown` if the engine is no longer reachable or drops the
/// reply channel.
pub async fn quorum_state(&self) -> Result<QuorumStateSnapshot, RaftError> {
    let (reply, snapshot_rx) = oneshot::channel();
    let request = Command::QuorumStateSnapshot { reply };
    if self.cmd_tx.send(request).await.is_err() {
        return Err(RaftError::Shutdown);
    }
    match snapshot_rx.await {
        Ok(snapshot) => Ok(snapshot),
        Err(_) => Err(RaftError::Shutdown),
    }
}
/// Asks the engine task for committed records starting at `fetch_offset`, reading at
/// most `max_bytes` from the log.
///
/// # Errors
///
/// Returns `RaftError::Shutdown` if the engine is no longer reachable or drops the
/// reply channel.
pub async fn metadata_fetch(
    &self,
    fetch_offset: i64,
    max_bytes: usize,
) -> Result<MetadataFetchSlice, RaftError> {
    let (reply, slice_rx) = oneshot::channel();
    let request = Command::MetadataFetch {
        fetch_offset,
        max_bytes,
        reply,
    };
    if self.cmd_tx.send(request).await.is_err() {
        return Err(RaftError::Shutdown);
    }
    match slice_rx.await {
        Ok(slice) => Ok(slice),
        Err(_) => Err(RaftError::Shutdown),
    }
}
/// Asks the engine task to write a checkpoint now and waits for the result.
///
/// # Errors
///
/// Returns `RaftError::Shutdown` if the engine is no longer reachable or drops the
/// reply channel; otherwise returns the error reported by the engine.
pub async fn trigger_snapshot(&self) -> Result<(), RaftError> {
    let (reply, outcome_rx) = oneshot::channel();
    let request = Command::TriggerSnapshot { reply };
    if self.cmd_tx.send(request).await.is_err() {
        return Err(RaftError::Shutdown);
    }
    match outcome_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(RaftError::Shutdown),
    }
}
/// Queues `event` for the engine task without waiting for it to be processed.
///
/// # Errors
///
/// Returns `RaftError::Shutdown` if the command channel is closed.
pub async fn inject_event(&self, event: Event) -> Result<(), RaftError> {
    match self.cmd_tx.send(Command::Event(event)).await {
        Ok(()) => Ok(()),
        Err(_) => Err(RaftError::Shutdown),
    }
}
/// Queues an inbound peer request for the engine task; the reply travels through the
/// channel carried inside `inbound`.
///
/// # Errors
///
/// Returns `RaftError::Shutdown` if the command channel is closed.
pub async fn deliver(&self, inbound: Inbound) -> Result<(), RaftError> {
    match self.cmd_tx.send(Command::Inbound(inbound)).await {
        Ok(()) => Ok(()),
        Err(_) => Err(RaftError::Shutdown),
    }
}
/// Sends `Command::Shutdown` to the engine task. A closed channel is ignored, and this
/// does not wait for the task to finish.
pub async fn shutdown(&self) {
let _ = self.cmd_tx.send(Command::Shutdown).await;
}
/// Test-only: asks the engine to append `records` and commit them immediately,
/// returning the value the engine replies with.
#[cfg(test)]
async fn test_append_and_commit(
    &self,
    records: Vec<crabka_metadata::MetadataRecord>,
) -> Result<i64, RaftError> {
    let (reply, offset_rx) = oneshot::channel();
    let request = Command::TestAppendAndCommit { records, reply };
    if self.cmd_tx.send(request).await.is_err() {
        return Err(RaftError::Shutdown);
    }
    match offset_rx.await {
        Ok(offset) => Ok(offset),
        Err(_) => Err(RaftError::Shutdown),
    }
}
}
impl Engine {
/// Engine main loop. Waits on the command channel, the election and fetch deadlines,
/// and a periodic heartbeat tick, dispatching whichever is ready. The loop ends on
/// `Command::Shutdown` or when the command channel closes; any remaining commit
/// waiters are then failed with `RaftError::Shutdown`.
async fn run(mut self, mut cmd_rx: mpsc::Receiver<Command>) {
// Heartbeat period is a fraction of the election timeout, never below 1 ms.
let hb_period =
Duration::from_millis((self.election_timeout_ms / HEARTBEAT_DIVISOR).max(1));
let mut heartbeat = tokio::time::interval(hb_period);
heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
// The sleeps are rebuilt on every iteration so they reflect deadlines changed by the
// previous handler; a `None` deadline yields a future that never completes.
let election_sleep = sleep_until_opt(self.election_at);
let fetch_sleep = sleep_until_opt(self.fetch_at);
tokio::pin!(election_sleep);
tokio::pin!(fetch_sleep);
tokio::select! {
cmd = cmd_rx.recv() => {
match cmd {
None | Some(Command::Shutdown) => break,
Some(c) => self.on_command(c),
}
}
() = &mut election_sleep => {
// Disarm before handling so the handler can re-arm the timer.
self.election_at = None;
self.on_timer(TimerTick::Election);
}
() = &mut fetch_sleep => {
// Disarm before handling so the handler can re-arm the timer.
self.fetch_at = None;
self.on_timer(TimerTick::Fetch);
}
_ = heartbeat.tick() => {
self.on_timer(TimerTick::Heartbeat);
}
}
}
// Nothing will commit after the loop exits; release every pending submitter.
for w in self.commit_waiters.drain(..) {
let _ = w.reply.send(Err(RaftError::Shutdown));
}
}
/// Returns the time elapsed since `clock_base` in whole milliseconds, saturating at
/// `u64::MAX`.
fn now(&self) -> SimInstant {
    let elapsed = Instant::now().saturating_duration_since(self.clock_base);
    match u64::try_from(elapsed.as_millis()) {
        Ok(ms) => SimInstant(ms),
        Err(_) => SimInstant(u64::MAX),
    }
}
/// Dispatches one command to its handler. Request/reply commands answer on the
/// channel they carry; a dropped receiver is ignored. `Command::Shutdown` is a no-op
/// here.
fn on_command(&mut self, cmd: Command) {
    match cmd {
        Command::Shutdown => {}
        Command::Timer(tick) => self.on_timer(tick),
        Command::Event(event) => self.on_event(event),
        Command::Inbound(inbound) => self.on_inbound(inbound),
        Command::FetchResponse { from, body } => self.on_fetch_response(from, &body),
        Command::FetchSnapshotResponse { from, body } => {
            self.on_fetch_snapshot_response(from, &body);
        }
        Command::SubmitChange { records, reply } => self.on_submit_change(records, reply),
        Command::QuorumStateSnapshot { reply } => {
            let snapshot = self.quorum_state_snapshot();
            let _ = reply.send(snapshot);
        }
        Command::MetadataFetch {
            fetch_offset,
            max_bytes,
            reply,
        } => {
            let slice = self.metadata_fetch_slice(fetch_offset, max_bytes);
            let _ = reply.send(slice);
        }
        Command::TriggerSnapshot { reply } => {
            let outcome = self.do_trigger_snapshot();
            let _ = reply.send(outcome);
        }
        #[cfg(test)]
        Command::TestAppendAndCommit { records, reply } => {
            let base = self.test_append_and_commit(records);
            let _ = reply.send(base);
        }
    }
}
/// Feeds `event` to the state machine, executes the resulting actions, then
/// reconciles timers and publishes the leader and quorum snapshot.
fn on_event(&mut self, event: Event) {
    let role_before = self.core.role().name();
    let at = self.now();
    let produced = self.core.on_event(event, &self.log, at);
    self.execute(produced);
    self.reconcile_timers(role_before);
    self.publish_leader();
}
/// Handles a timer expiry.
///
/// * Election: a non-leader voter raises `Event::ElectionTimeout`.
/// * Fetch: while following a leader, the fetch is re-sent and the timer re-armed
///   until `FETCH_MISS_LIMIT` expiries have accumulated, at which point
///   `Event::FetchTimeout` is raised; with no leader, only a voter raises it.
/// * Heartbeat: a leader re-broadcasts BeginQuorumEpoch for its current epoch.
fn on_timer(&mut self, tick: TimerTick) {
    match tick {
        TimerTick::Election => {
            let may_campaign = self.core.is_voter() && !self.core.role().is_leader();
            if may_campaign {
                self.on_event(Event::ElectionTimeout);
            }
        }
        TimerTick::Fetch => match self.following_leader() {
            Some(leader_id) => {
                self.fetch_misses += 1;
                if self.fetch_misses < FETCH_MISS_LIMIT {
                    self.send_fetch(leader_id);
                    self.arm_fetch_timer();
                } else {
                    self.fetch_misses = 0;
                    self.on_event(Event::FetchTimeout);
                }
            }
            None => {
                if self.core.is_voter() {
                    self.on_event(Event::FetchTimeout);
                }
            }
        },
        TimerTick::Heartbeat => {
            if !self.core.role().is_leader() {
                return;
            }
            let epoch = self.core.quorum_state().leader_epoch;
            self.broadcast_begin_quorum_epoch(epoch);
        }
    }
}
/// Returns the leader this node is currently fetching from: a follower's leader, an
/// observer's leader when it knows one, and `None` in every other role.
fn following_leader(&self) -> Option<NodeId> {
    let role = self.core.role();
    if let Role::Follower { leader_id, .. } = role {
        return Some(*leader_id);
    }
    if let Role::Observer { leader_id, .. } = role {
        return *leader_id;
    }
    None
}
#[allow(clippy::too_many_lines)]
fn on_inbound(&mut self, inbound: Inbound) {
match inbound {
Inbound::Vote { req, reply } => {
if let Some(wire::PeerRequest::Vote {
voter_id,
candidate_epoch,
candidate,
last_epoch,
last_offset,
pre_vote,
}) = wire::decode_vote(&req)
{
let event = Event::ReceiveVoteRequest {
from: candidate,
voter_id,
candidate_epoch,
candidate,
candidate_log_end: LogEnd {
last_epoch,
last_offset,
},
pre_vote,
};
let resp = self.run_inbound_reply(event);
let _ = reply.send(resp);
}
}
Inbound::BeginQuorumEpoch { req, reply } => {
if let Some(wire::PeerRequest::BeginQuorumEpoch {
leader_id,
leader_epoch,
}) = wire::decode_begin(&req)
{
self.on_event(Event::ReceiveBeginQuorumEpoch {
leader_id,
leader_epoch,
});
let ack = wire::PeerResponse::Ack {
epoch: self.core.quorum_state().leader_epoch,
};
let _ = reply.send(ack.encode());
}
}
Inbound::EndQuorumEpoch { req, reply } => {
if let Some(wire::PeerRequest::EndQuorumEpoch {
leader_id,
leader_epoch,
}) = wire::decode_end(&req)
{
self.on_event(Event::ReceiveEndQuorumEpoch {
leader_id,
leader_epoch,
});
let ack = wire::PeerResponse::Ack {
epoch: self.core.quorum_state().leader_epoch,
};
let _ = reply.send(ack.encode());
}
}
Inbound::Fetch { req, reply } => {
if let Some(wire::PeerRequest::Fetch {
from,
fetch_epoch,
fetch_offset,
}) = wire::decode_fetch(&req)
{
let now = self.now();
let prev_role = self.core.role().name();
let actions = self.core.on_event(
Event::ReceiveFetch {
from,
fetch_epoch,
fetch_offset,
},
&self.log,
now,
);
let mut diverging = None;
for action in &actions {
if let Action::TruncateTo(point) = action {
diverging = Some(*point);
}
}
self.execute(actions);
self.reconcile_timers(prev_role);
self.publish_leader();
let log_start = self.log.log_start_offset();
let snapshot_id = if fetch_offset >= 0 && fetch_offset < log_start {
self.latest_snapshot_id()
} else {
None
};
let records = if snapshot_id.is_some()
|| diverging.is_some()
|| !self.core.role().is_leader()
{
bytes::Bytes::new()
} else {
self.serve_fetch_records(fetch_offset)
};
let advertised_leader = self.core.quorum_state().leader_id.unwrap_or(self.me);
let resp = wire::PeerResponse::Fetch {
leader_id: advertised_leader,
leader_epoch: self.core.quorum_state().leader_epoch,
diverging,
snapshot_id,
hwm: self.log.hwm(),
records,
};
let _ = reply.send(resp.encode());
}
}
Inbound::FetchSnapshot { req, reply } => {
if let Some(wire::PeerRequest::FetchSnapshot {
snapshot_id,
position,
max_bytes,
..
}) = wire::decode_fetch_snapshot(&req)
{
let (end_offset, epoch) = snapshot_id;
let resp = match load_checkpoint_by_id(
&checkpoint_dir(&self.data_dir),
end_offset,
epoch,
) {
Some(bytes) => {
let max = usize::try_from(max_bytes.max(0)).unwrap_or(0);
let pos = usize::try_from(position.max(0)).unwrap_or(0);
let chunk =
crate::snapshot::SnapshotReader::byte_range(&bytes, pos, max);
wire::PeerResponse::FetchSnapshot {
snapshot_id,
size: i64::try_from(bytes.len()).unwrap_or(i64::MAX),
position,
bytes: bytes::Bytes::copy_from_slice(chunk),
error_code: 0,
}
}
None => wire::PeerResponse::FetchSnapshot {
snapshot_id,
size: 0,
position,
bytes: bytes::Bytes::new(),
error_code: SNAPSHOT_NOT_FOUND,
},
};
let _ = reply.send(resp.encode());
}
}
}
}
/// Runs `event` through the state machine and returns the encoded Vote response.
///
/// The response defaults to "not granted" at the post-event epoch and is overridden
/// by any `Action::ReplyVote` the state machine emitted; all other actions are
/// executed locally.
fn run_inbound_reply(&mut self, event: Event) -> bytes::Bytes {
    let at = self.now();
    let role_before = self.core.role().name();
    let produced = self.core.on_event(event, &self.log, at);

    let mut reply_epoch = self.core.quorum_state().leader_epoch;
    let mut reply_granted = false;
    let mut remaining = Vec::new();
    for action in produced {
        match action {
            Action::ReplyVote { epoch, granted, .. } => {
                reply_epoch = epoch;
                reply_granted = granted;
            }
            other => remaining.push(other),
        }
    }

    self.execute_local_only(remaining);
    self.reconcile_timers(role_before);
    self.publish_leader();

    wire::PeerResponse::Vote {
        epoch: reply_epoch,
        granted: reply_granted,
    }
    .encode()
}
/// Executes `actions` in order: network actions are sent to peers here, everything
/// else is handed to `execute_one_local`. Sending a fetch also resets the miss counter.
fn execute(&mut self, actions: Vec<Action>) {
    for step in actions {
        match step {
            Action::SendFetch { leader_id } => {
                self.send_fetch(leader_id);
                self.fetch_misses = 0;
            }
            Action::SendVoteRequest { epoch, pre_vote } => self.broadcast_vote(epoch, pre_vote),
            Action::SendBeginQuorumEpoch { epoch } => self.broadcast_begin_quorum_epoch(epoch),
            Action::SendEndQuorumEpoch { epoch } => self.broadcast_end_quorum_epoch(epoch),
            local => self.execute_one_local(local),
        }
    }
}
/// Executes `actions` exactly as `execute` does, except that any `Action::ReplyVote`
/// is silently skipped (the caller has already turned it into the wire response).
fn execute_local_only(&mut self, actions: Vec<Action>) {
    let without_replies: Vec<Action> = actions
        .into_iter()
        .filter(|action| !matches!(action, Action::ReplyVote { .. }))
        .collect();
    self.execute(without_replies);
}
/// Executes a single non-network action. Failures of log or persistence operations
/// are logged and otherwise ignored. Network and reply actions must not reach this
/// function; in debug builds they trip an assertion.
fn execute_one_local(&mut self, action: Action) {
    match action {
        Action::TransitionedTo(_name) => {}
        Action::ResetTimer { kind, deadline } => {
            let at = Some(self.deadline_instant(deadline));
            match kind {
                TimerKind::Election => self.election_at = at,
                TimerKind::Fetch => self.fetch_at = at,
            }
        }
        Action::AdvanceHighWatermark(n) => self.advance_and_apply(n),
        Action::PersistQuorumState => {
            if let Err(e) = self.persist_quorum_state() {
                tracing::error!(?e, "kraft: persist quorum-state failed");
            }
        }
        Action::AppendLeaderChange { epoch } => {
            if let Err(e) = self.append_leader_change(epoch) {
                tracing::error!(?e, "kraft: append leader-change failed");
            }
        }
        Action::TruncateTo(point) => {
            if let Err(e) = self.log.truncate_to(point.offset) {
                tracing::error!(?e, "kraft: truncate failed");
            }
        }
        Action::SendVoteRequest { .. }
        | Action::SendBeginQuorumEpoch { .. }
        | Action::SendEndQuorumEpoch { .. }
        | Action::SendFetch { .. }
        | Action::ReplyVote { .. } => {
            debug_assert!(false, "network/reply action routed to local executor");
        }
    }
}
/// Disarms the timers that do not apply to the current role, then fails commit
/// waiters if leadership was lost.
///
/// Leader: both timers off. Follower/Observer: election timer off.
/// Prospective/Candidate/Unattached/Voted: fetch timer off. Resigned: untouched.
/// Disarming the fetch timer also resets the miss counter. `_prev_role` is unused.
fn reconcile_timers(&mut self, _prev_role: &'static str) {
    let (drop_election, drop_fetch) = match self.core.role() {
        Role::Leader { .. } => (true, true),
        Role::Follower { .. } | Role::Observer { .. } => (true, false),
        Role::Prospective { .. }
        | Role::Candidate { .. }
        | Role::Unattached { .. }
        | Role::Voted { .. } => (false, true),
        Role::Resigned => (false, false),
    };
    if drop_election {
        self.election_at = None;
    }
    if drop_fetch {
        self.fetch_at = None;
        self.fetch_misses = 0;
    }
    self.fail_waiters_on_leadership_loss();
}
/// Fails every pending commit waiter with `RaftError::NotLeader` if this node was
/// leader at the previous check and is now either not leader or leader of a
/// different epoch, then records the current role and epoch for the next check.
fn fail_waiters_on_leadership_loss(&mut self) {
    let leading_now = self.core.role().is_leader();
    let epoch_now = self.core.quorum_state().leader_epoch;
    let still_same_term = leading_now && epoch_now == self.held_epoch;

    if self.was_leader && !still_same_term && !self.commit_waiters.is_empty() {
        let current_leader = self.core.quorum_state().leader_id;
        for waiter in self.commit_waiters.drain(..) {
            let _ = waiter.reply.send(Err(RaftError::NotLeader { current_leader }));
        }
    }

    self.was_leader = leading_now;
    self.held_epoch = epoch_now;
}
/// Arms the fetch timer one election timeout from now.
fn arm_fetch_timer(&mut self) {
    let wait = Duration::from_millis(self.election_timeout_ms);
    self.fetch_at = Some(Instant::now() + wait);
}
/// Converts a state-machine deadline (milliseconds since `clock_base`) to an `Instant`.
fn deadline_instant(&self, deadline: SimInstant) -> Instant {
    let offset = Duration::from_millis(deadline.0);
    self.clock_base + offset
}
/// Appends a leader-change batch for `epoch`, naming this node and the current voter
/// ids, and returns the result of the log append.
///
/// # Errors
///
/// Propagates the error returned by the log append.
fn append_leader_change(&mut self, epoch: LeaderEpoch) -> Result<i64, RaftError> {
    let voter_ids: Vec<NodeId> = self.core.quorum_state().voters.ids().into_iter().collect();
    let mut leader_change = leader_change_batch(epoch, self.me, &voter_ids);
    self.log.append(&mut leader_change)
}
/// Validates, encodes and appends `records` as one batch, then registers `reply` to
/// be resolved once the batch is committed.
///
/// Replies immediately with `NotLeader` when this node is not leader, with the
/// validation or encoding error of the first bad record, with `Ok(())` when the
/// records encode to nothing, or with the log error when the append fails. Broker
/// registrations get `broker_epoch` stamped from the log end offset plus the number
/// of values encoded so far. When the quorum majority is 1 the batch is committed
/// immediately.
#[allow(clippy::needless_pass_by_value)]
fn on_submit_change(
    &mut self,
    records: Vec<crabka_metadata::MetadataRecord>,
    reply: oneshot::Sender<Result<(), RaftError>>,
) {
    if !self.core.role().is_leader() {
        let current_leader = self.core.quorum_state().leader_id;
        let _ = reply.send(Err(RaftError::NotLeader { current_leader }));
        return;
    }

    let assign_base = self.log.log_end_offset();
    // Validation runs against a scratch copy so a rejected submission leaves the real
    // image untouched.
    let mut scratch = self.image.clone();
    let mut value_blobs: Vec<bytes::Bytes> = Vec::new();
    for submitted in &records {
        let stamped;
        let record: &MetadataRecord =
            if let MetadataRecord::V1BrokerRegistration(registration) = submitted {
                let delta = i64::try_from(value_blobs.len()).unwrap_or(i64::MAX);
                let mut registration = registration.clone();
                registration.broker_epoch = assign_base + delta;
                stamped = MetadataRecord::V1BrokerRegistration(registration);
                &stamped
            } else {
                submitted
            };

        if let Err(e) = scratch.validate(record) {
            let _ = reply.send(Err(RaftError::Metadata(e)));
            return;
        }
        let encoded = match to_kraft_values(record, &scratch) {
            Ok(blobs) => blobs,
            Err(e) => {
                let _ = reply.send(Err(RaftError::ChangeRejected(format!("encode: {e}"))));
                return;
            }
        };
        value_blobs.extend(encoded);
        scratch.apply(record);
    }

    let value_count = value_blobs.len();
    if value_count == 0 {
        let _ = reply.send(Ok(()));
        return;
    }

    let leader_epoch = self.core.quorum_state().leader_epoch;
    let kafka_records: Vec<Record> = value_blobs
        .into_iter()
        .map(|blob| Record {
            value: Some(blob),
            ..Default::default()
        })
        .collect();
    let mut batch = RecordBatch {
        partition_leader_epoch: i32::try_from(leader_epoch).unwrap_or(i32::MAX),
        last_offset_delta: i32::try_from(value_count.saturating_sub(1)).unwrap_or(0),
        records: kafka_records,
        ..Default::default()
    };

    let base = match self.log.append(&mut batch) {
        Ok(offset) => offset,
        Err(e) => {
            let _ = reply.send(Err(e));
            return;
        }
    };
    self.commit_waiters.push(CommitWaiter {
        base_offset: base,
        need_offset: base + i64::try_from(value_count).unwrap_or(1),
        rejection: None,
        reply,
    });

    // With a majority of one there is nobody to wait for: commit up to the log end.
    if self.core.quorum_state().majority() == 1 {
        self.advance_and_apply(self.log.log_end_offset());
    }
    self.try_resolve_waiters();
}
/// Test-only: encodes `records` (skipping any that fail to encode), appends them as
/// one batch, commits up to the log end, and returns the batch base offset, or `-1`
/// if the append fails.
#[cfg(test)]
#[allow(clippy::needless_pass_by_value)]
fn test_append_and_commit(&mut self, records: Vec<crabka_metadata::MetadataRecord>) -> i64 {
    let leader_epoch = self.core.quorum_state().leader_epoch;
    let mut scratch = self.image.clone();
    let mut blobs: Vec<bytes::Bytes> = Vec::new();
    for record in &records {
        if let Ok(encoded) = to_kraft_values(record, &scratch) {
            blobs.extend(encoded);
        }
        scratch.apply(record);
    }

    let blob_count = blobs.len();
    let kafka_records: Vec<Record> = blobs
        .into_iter()
        .map(|blob| Record {
            value: Some(blob),
            ..Default::default()
        })
        .collect();
    let mut batch = RecordBatch {
        partition_leader_epoch: i32::try_from(leader_epoch).unwrap_or(i32::MAX),
        last_offset_delta: i32::try_from(blob_count.saturating_sub(1)).unwrap_or(0),
        records: kafka_records,
        ..Default::default()
    };

    match self.log.append(&mut batch) {
        Ok(base) => {
            self.advance_and_apply(self.log.log_end_offset());
            base
        }
        Err(e) => {
            tracing::error!(?e, "kraft: test append failed");
            -1
        }
    }
}
/// Advances the high watermark to `new_hwm` and applies the newly committed records
/// to the metadata image.
///
/// Only non-control batches whose base offset lies in `[previous hwm, new hwm)` are
/// applied, reading at most `MAX_APPLY_BYTES` from the log. Records that fail to
/// decode are logged and skipped; records that fail validation are logged, skipped,
/// and noted against the matching commit waiter. The image is published if at least
/// one record was applied. Waiters are resolved and the snapshot check runs whether
/// or not the watermark moved.
fn advance_and_apply(&mut self, new_hwm: i64) {
    let prev_hwm = self.log.hwm();
    self.log.advance_hwm(new_hwm);
    let applied_hwm = self.log.hwm();

    if applied_hwm > prev_hwm {
        match self.log.read_decoded(prev_hwm, MAX_APPLY_BYTES) {
            Err(e) => tracing::error!(?e, "kraft: read for apply failed"),
            Ok(batches) => {
                let mut changed = false;
                for batch in &batches {
                    let newly_committed =
                        batch.base_offset >= prev_hwm && batch.base_offset < applied_hwm;
                    if !newly_committed || batch.attributes.is_control_batch() {
                        continue;
                    }
                    for value in batch.records.iter().filter_map(|rec| rec.value.as_ref()) {
                        let meta = match from_kraft_value(value, &self.image) {
                            Ok(meta) => meta,
                            Err(e) => {
                                tracing::debug!(?e, "kraft: failed to decode committed record");
                                continue;
                            }
                        };
                        if let Err(e) = self.image.validate(&meta) {
                            self.note_rejection(batch.base_offset, &e);
                            tracing::debug!(
                                ?e,
                                "kraft: rejected committed record on apply"
                            );
                            continue;
                        }
                        self.image.apply(&meta);
                        changed = true;
                    }
                }
                if changed {
                    let _ = self.image_tx.send(Arc::new(self.image.clone()));
                }
            }
        }
    }

    self.try_resolve_waiters();
    self.maybe_snapshot_and_prune();
}
/// On the leader, writes a checkpoint at the high watermark and prunes the log once
/// at least `snapshot_interval_records` offsets have been committed since the last
/// snapshot. Does nothing when the interval is 0 or this node is not leader.
///
/// Serialization or checkpoint-write failures are logged and abort before pruning; a
/// prune failure is logged and older checkpoints are still cleaned up.
fn maybe_snapshot_and_prune(&mut self) {
    let enabled = self.snapshot_interval_records != 0;
    if !enabled || !self.core.role().is_leader() {
        return;
    }

    let hwm = self.log.hwm();
    let since_last = (hwm - self.last_snapshot_end_offset).max(0);
    if u64::try_from(since_last).unwrap_or(0) < self.snapshot_interval_records {
        return;
    }

    let serialized = crate::snapshot::SnapshotWriter::serialize(&self.image, 0);
    let Ok(bytes) = serialized.map_err(|e| {
        tracing::error!(?e, "kraft: snapshot serialize failed");
    }) else {
        return;
    };

    let epoch = i32::try_from(self.core.quorum_state().leader_epoch).unwrap_or(i32::MAX);
    let dir = checkpoint_dir(&self.data_dir);
    if let Err(e) = write_checkpoint(&dir, hwm, epoch, &bytes) {
        tracing::error!(?e, "kraft: checkpoint write failed; skipping prune");
        return;
    }

    self.last_snapshot_end_offset = hwm;
    if let Err(e) = self.log.prune_to(hwm) {
        tracing::error!(?e, "kraft: prune_to failed");
    }
    retain_latest_checkpoint(&dir);
}
/// Returns the id of the latest checkpoint in this node's checkpoint directory, if any.
/// NOTE(review): the pair is presumably `(end_offset, epoch)`, matching how other code in
/// this file destructures snapshot ids — confirm against `latest_checkpoint_id`.
fn latest_snapshot_id(&self) -> Option<(i64, i32)> {
latest_checkpoint_id(&checkpoint_dir(&self.data_dir))
}
/// Records `err` as the rejection of every commit waiter whose offset range
/// `[base_offset, need_offset)` contains `record_offset` and that has no rejection
/// recorded yet.
fn note_rejection(&mut self, record_offset: i64, err: &crabka_metadata::MetadataError) {
    let affected = self.commit_waiters.iter_mut().filter(|waiter| {
        waiter.rejection.is_none()
            && waiter.base_offset <= record_offset
            && record_offset < waiter.need_offset
    });
    for waiter in affected {
        waiter.rejection = Some(RaftError::Metadata(err.clone()));
    }
}
/// Replies to every commit waiter whose `need_offset` is covered by the high
/// watermark — with its recorded rejection if it has one, otherwise `Ok(())` — and
/// keeps the rest waiting in their original order.
fn try_resolve_waiters(&mut self) {
    let hwm = self.log.hwm();
    let mut pending = Vec::new();
    for waiter in self.commit_waiters.drain(..) {
        if hwm < waiter.need_offset {
            pending.push(waiter);
            continue;
        }
        let outcome = match waiter.rejection {
            Some(rejection) => Err(rejection),
            None => Ok(()),
        };
        let _ = waiter.reply.send(outcome);
    }
    self.commit_waiters = pending;
}
/// Serializes the current image and writes it as a checkpoint at the current high
/// watermark and leader epoch. Does not prune the log or update snapshot bookkeeping.
///
/// # Errors
///
/// Returns the serialization error or the checkpoint-write error.
fn do_trigger_snapshot(&self) -> Result<(), RaftError> {
    let serialized = crate::snapshot::SnapshotWriter::serialize(&self.image, 0)?;
    let end_offset = self.log.hwm();
    let leader_epoch = self.core.quorum_state().leader_epoch;
    let epoch = i32::try_from(leader_epoch).unwrap_or(i32::MAX);
    let dir = checkpoint_dir(&self.data_dir);
    write_checkpoint(&dir, end_offset, epoch, &serialized)
}
/// Saves the state machine's current quorum state under `data_dir` via
/// `save_quorum_state`, propagating its error.
fn persist_quorum_state(&self) -> Result<(), RaftError> {
save_quorum_state(&self.data_dir, self.core.quorum_state())
}
/// Builds a snapshot of the current quorum and log state. Per-voter fetch offsets are
/// filled in only on the leader: its own entry is the log end offset, followed by
/// the tracked offset of each replica.
fn quorum_state_snapshot(&self) -> QuorumStateSnapshot {
    let mut per_voter_fetch_offset = std::collections::BTreeMap::new();
    if let Role::Leader { replicas, .. } = self.core.role() {
        per_voter_fetch_offset.insert(self.core.me(), self.log.log_end_offset());
        for (id, progress) in replicas {
            per_voter_fetch_offset.insert(*id, progress.fetch_offset);
        }
    }

    let state = self.core.quorum_state();
    let voters = state.voters.ids().into_iter().collect();
    QuorumStateSnapshot {
        leader_id: state.leader_id,
        leader_epoch: state.leader_epoch,
        high_watermark: self.log.hwm(),
        log_end_offset: self.log.log_end_offset(),
        log_start_offset: self.log.log_start_offset(),
        voters,
        per_voter_fetch_offset,
    }
}
/// Returns the encoded batches starting at `fetch_offset` whose base offset is below
/// the high watermark, together with the log start offset and high watermark.
///
/// The record set is empty when `fetch_offset` is negative or at/after the high
/// watermark, or when the log read fails (logged). At least 1 byte is requested from
/// the log even when `max_bytes` is 0.
fn metadata_fetch_slice(&self, fetch_offset: i64, max_bytes: usize) -> MetadataFetchSlice {
    let high_watermark = self.log.hwm();
    let log_start_offset = self.log.log_start_offset();

    let in_committed_range = fetch_offset >= 0 && fetch_offset < high_watermark;
    let records = if in_committed_range {
        match self.log.read_decoded(fetch_offset, max_bytes.max(1)) {
            Ok(batches) => {
                let mut committed: Vec<RecordBatch> = Vec::new();
                for batch in batches {
                    if batch.base_offset < high_watermark {
                        committed.push(batch);
                    }
                }
                encode_batches(&committed)
            }
            Err(e) => {
                tracing::error!(?e, "kraft: metadata fetch read failed");
                bytes::Bytes::new()
            }
        }
    } else {
        bytes::Bytes::new()
    };

    MetadataFetchSlice {
        records,
        log_start_offset,
        high_watermark,
    }
}
/// Returns the ids of every voter in the current quorum state except this node.
fn other_voters(&self) -> Vec<NodeId> {
    let mut others = Vec::new();
    for id in self.core.quorum_state().voters.ids() {
        if id != self.me {
            others.push(id);
        }
    }
    others
}
/// Sends a Vote (or pre-vote) request for `epoch` to every other voter, carrying this
/// node's last log epoch and end offset.
fn broadcast_vote(&self, epoch: LeaderEpoch, pre_vote: bool) {
    let last_offset = self.log.end_offset();
    let last_epoch = self.log.last_epoch();
    for peer in self.other_voters() {
        let request = wire::PeerRequest::Vote {
            voter_id: peer,
            candidate_epoch: epoch,
            candidate: self.me,
            last_epoch,
            last_offset,
            pre_vote,
        };
        self.spawn_send(peer, api_key::VOTE, request.encode());
    }
}
/// Sends BeginQuorumEpoch for `epoch`, naming this node as leader, to every other voter.
fn broadcast_begin_quorum_epoch(&self, epoch: LeaderEpoch) {
    let request = wire::PeerRequest::BeginQuorumEpoch {
        leader_id: self.me,
        leader_epoch: epoch,
    };
    let body = request.encode();
    for peer in self.other_voters() {
        self.spawn_send(peer, api_key::BEGIN_QUORUM_EPOCH, body.clone());
    }
}
/// Sends EndQuorumEpoch for `epoch`, naming this node as leader, to every other voter.
fn broadcast_end_quorum_epoch(&self, epoch: LeaderEpoch) {
    let request = wire::PeerRequest::EndQuorumEpoch {
        leader_id: self.me,
        leader_epoch: epoch,
    };
    let body = request.encode();
    for peer in self.other_voters() {
        self.spawn_send(peer, api_key::END_QUORUM_EPOCH, body.clone());
    }
}
/// Sends a Fetch request for this node's log end to `leader_id`; does nothing when
/// `leader_id` is this node.
///
/// The fetch epoch is the installed snapshot's epoch while one is recorded and the
/// log is empty (end offset equals start offset); otherwise the log's last epoch.
fn send_fetch(&self, leader_id: NodeId) {
    if leader_id == self.me {
        return;
    }
    let fetch_offset = self.log.end_offset();
    let fetch_epoch = match self.installed_snapshot_epoch {
        None => self.log.last_epoch(),
        Some(snapshot_epoch) => {
            if self.log.log_end_offset() == self.log.log_start_offset() {
                snapshot_epoch
            } else {
                self.log.last_epoch()
            }
        }
    };
    let request = wire::PeerRequest::Fetch {
        from: self.me,
        fetch_epoch,
        fetch_offset,
    };
    self.spawn_send(leader_id, api_key::FETCH, request.encode());
}
/// Requests the chunk of snapshot `snapshot_id` starting at `position` from
/// `leader_id`, asking for at most `MAX_APPLY_BYTES` bytes; does nothing when
/// `leader_id` is this node.
fn send_fetch_snapshot(&self, leader_id: NodeId, snapshot_id: (i64, i32), position: i64) {
    if leader_id == self.me {
        return;
    }
    let max_bytes = i32::try_from(MAX_APPLY_BYTES).unwrap_or(i32::MAX);
    let request = wire::PeerRequest::FetchSnapshot {
        from: self.me,
        snapshot_id,
        position,
        max_bytes,
    };
    self.spawn_send(leader_id, api_key::FETCH_SNAPSHOT, request.encode());
}
/// Returns the encoded batches read from the log starting at `fetch_offset`, up to
/// `MAX_APPLY_BYTES`. The result is empty when `fetch_offset` is negative or at/after
/// the log end, or when the read fails (logged).
fn serve_fetch_records(&self, fetch_offset: i64) -> bytes::Bytes {
    let log_end = self.log.log_end_offset();
    let in_log = fetch_offset >= 0 && fetch_offset < log_end;
    if !in_log {
        return bytes::Bytes::new();
    }
    match self.log.read_decoded(fetch_offset, MAX_APPLY_BYTES) {
        Ok(batches) => encode_batches(&batches),
        Err(e) => {
            tracing::error!(?e, "kraft: serve_fetch read failed");
            bytes::Bytes::new()
        }
    }
}
fn on_fetch_response(&mut self, from: NodeId, body: &[u8]) {
let Some(wire::PeerResponse::Fetch {
leader_id,
leader_epoch,
diverging,
snapshot_id,
hwm,
records,
}) = wire::PeerResponse::decode_fetch(body)
else {
return;
};
let _ = from;
if let Some(id) = snapshot_id {
if id.0 > self.log.log_end_offset()
&& self
.snapshot_fetch
.as_ref()
.is_none_or(|s| s.snapshot_id != id)
{
self.snapshot_fetch = Some(SnapshotFetchState::new(id, leader_id));
self.send_fetch_snapshot(leader_id, id, 0);
}
self.on_event(Event::ReceiveFetchResponse {
leader_id,
leader_epoch,
diverging,
});
return;
}
if let Some(point) = diverging {
if let Err(e) = self.log.truncate_to(point.offset) {
tracing::error!(?e, "kraft: follower truncate failed");
}
} else if !records.is_empty() {
match decode_batches(&records) {
Ok(batches) => {
for mut batch in batches {
let at = batch.base_offset;
let log_end = self.log.log_end_offset();
if at < log_end {
continue; }
if at > log_end {
break;
}
if let Err(e) = self.log.append_at(&mut batch, at) {
tracing::error!(?e, at, "kraft: follower append_at failed");
break;
}
self.installed_snapshot_epoch = None;
}
}
Err(e) => tracing::error!(?e, "kraft: follower decode batches failed"),
}
let target = hwm.min(self.log.log_end_offset());
self.advance_and_apply(target);
} else {
let target = hwm.min(self.log.log_end_offset());
self.advance_and_apply(target);
}
self.on_event(Event::ReceiveFetchResponse {
leader_id,
leader_epoch,
diverging,
});
}
/// Processes a FetchSnapshot response body from `from`. Undecodable bodies, and
/// responses that arrive while no snapshot download is in progress, are ignored.
fn on_fetch_snapshot_response(&mut self, from: NodeId, body: &[u8]) {
let Some(wire::PeerResponse::FetchSnapshot {
snapshot_id,
size,
position,
bytes,
error_code,
}) = wire::PeerResponse::decode_fetch_snapshot(body)
else {
return;
};
let Some(state) = self.snapshot_fetch.as_mut() else {
return;
};
// An error code, or a response from a node other than the one the download was started
// against, abandons the download and falls back to a plain fetch.
if error_code != 0 || from != state.leader_id {
self.snapshot_fetch = None;
self.send_fetch(from);
return;
}
match state.on_chunk(snapshot_id, size, position, &bytes) {
// More data needed: request the next chunk.
SnapshotFetchStep::Continue { next_position } => {
self.send_fetch_snapshot(from, snapshot_id, next_position);
}
// The download state asked to start over: drop it and resume fetching.
SnapshotFetchStep::Restart => {
self.snapshot_fetch = None;
self.send_fetch(from);
}
// All bytes assembled: install the snapshot (a failure is only logged) and resume
// fetching either way.
SnapshotFetchStep::Complete(assembled) => {
// Copy the id out before the download state is cleared.
let id = state.snapshot_id;
self.snapshot_fetch = None;
if let Err(e) = self.install_fetched_snapshot(id, &assembled) {
tracing::error!(?e, "kraft: snapshot install failed; will re-fetch");
}
self.send_fetch(from);
}
}
}
/// Installs a fully downloaded snapshot `id = (end_offset, epoch)` whose serialized
/// form is `bytes`.
///
/// A snapshot that does not extend past the local log end is ignored (after its
/// bytes have been parsed). Otherwise the checkpoint is written to disk, the log is
/// reset to the snapshot, the image is replaced and published, and older
/// checkpoints are cleaned up. The current voter set is re-applied on top of the
/// snapshot's records.
///
/// In-memory state (`image`, snapshot bookkeeping) is only changed after every
/// fallible step has succeeded, so a failed install leaves the image consistent with
/// the log.
///
/// # Errors
///
/// Returns the error from parsing the snapshot, writing the checkpoint, or
/// installing the snapshot into the log.
fn install_fetched_snapshot(&mut self, id: (i64, i32), bytes: &[u8]) -> Result<(), RaftError> {
    let (end_offset, epoch) = id;
    let records = crate::snapshot::SnapshotReader::read_records(bytes)?;
    if end_offset <= self.log.log_end_offset() {
        return Ok(());
    }

    let cluster_id = self.image.cluster_id();
    let mut new_image = MetadataImage::from_records(cluster_id, &records);
    new_image.apply(&MetadataRecord::V1Voters(VotersRecord {
        voters: self.core.quorum_state().voters.clone(),
    }));

    write_checkpoint(&checkpoint_dir(&self.data_dir), end_offset, epoch, bytes)?;
    // Reset the log first: if this fails the old image must stay in place, because it
    // still matches the old log.
    self.log.install_snapshot(end_offset)?;

    self.image = new_image;
    self.last_snapshot_end_offset = end_offset;
    self.installed_snapshot_epoch = Some(u32::try_from(epoch).unwrap_or(0));
    let _ = self.image_tx.send(Arc::new(self.image.clone()));
    retain_latest_checkpoint(&checkpoint_dir(&self.data_dir));
    Ok(())
}
/// Sends `body` to `peer` on a spawned task and routes the response back to the
/// engine as a command: Fetch and FetchSnapshot responses are forwarded raw, other
/// responses are forwarded only if `response_to_event` maps them to an event.
/// Send failures are logged at debug level; a closed command channel is ignored.
fn spawn_send(&self, peer: NodeId, api_key: i16, body: bytes::Bytes) {
    let peers = Arc::clone(&self.peers);
    let cmd_tx = self.cmd_tx.clone();
    tokio::spawn(async move {
        let resp_body = match peers.send(peer, api_key, body).await {
            Ok(resp_body) => resp_body,
            Err(e) => {
                tracing::debug!(peer, ?e, "kraft: peer send failed");
                return;
            }
        };
        let follow_up = if api_key == self::api_key::FETCH {
            Some(Command::FetchResponse {
                from: peer,
                body: resp_body,
            })
        } else if api_key == self::api_key::FETCH_SNAPSHOT {
            Some(Command::FetchSnapshotResponse {
                from: peer,
                body: resp_body,
            })
        } else {
            response_to_event(peer, api_key, &resp_body).map(Command::Event)
        };
        if let Some(command) = follow_up {
            let _ = cmd_tx.send(command).await;
        }
    });
}
/// Publishes the current leader and a fresh quorum-state snapshot to watchers.
///
/// The leader channel is only written when the leader id differs from the
/// value it already holds; the quorum snapshot is replaced unconditionally.
fn publish_leader(&self) {
    let current = self.core.quorum_state().leader_id;
    let leader_changed = *self.leader_tx.borrow() != current;
    if leader_changed {
        // A send error just means there are no receivers left.
        let _ = self.leader_tx.send(current);
    }
    self.quorum_tx.send_replace(self.quorum_state_snapshot());
}
}
/// Translates a raw peer response into a state-machine [`Event`], if it maps to one.
///
/// Only `VOTE` responses are translated here: a body that decodes to
/// `PeerResponse::Vote` becomes `Event::ReceiveVoteResponse` attributed to `peer`.
/// Any other API key, an undecodable body, or a different decoded variant
/// yields `None`.
fn response_to_event(peer: NodeId, api_key: i16, body: &[u8]) -> Option<Event> {
    if api_key != self::api_key::VOTE {
        return None;
    }
    let wire::PeerResponse::Vote { epoch, granted } = wire::PeerResponse::decode_vote(body)?
    else {
        return None;
    };
    Some(Event::ReceiveVoteResponse {
        from: peer,
        epoch,
        vote_granted: granted,
    })
}
/// Sleeps until `deadline`, or never completes when no deadline is set.
///
/// With `None` the future stays pending forever, so the function can be awaited
/// wherever an optional timer is needed without special-casing a disarmed one.
async fn sleep_until_opt(deadline: Option<Instant>) {
    if let Some(at) = deadline {
        tokio::time::sleep_until(at).await;
    } else {
        std::future::pending::<()>().await;
    }
}
/// 8 MiB. Passed as the second argument to `KraftLog::read_decoded` by
/// `replay_committed`; presumably an upper bound on the bytes read per call —
/// confirm against `read_decoded`.
const MAX_APPLY_BYTES: usize = 8 * 1024 * 1024;
fn encode_batches(batches: &[RecordBatch]) -> bytes::Bytes {
let mut out = bytes::BytesMut::new();
for batch in batches {
if let Err(e) = batch.encode(&mut out) {
tracing::error!(?e, "kraft: encode batch for fetch serve failed");
}
}
out.freeze()
}
/// Decodes every record batch contained in `buf`, in order.
///
/// Batches are read until the buffer is exhausted.
///
/// # Errors
/// Returns `RaftError::ChangeRejected` describing the decode failure as soon as
/// one batch cannot be decoded; batches decoded before it are discarded.
fn decode_batches(mut buf: &[u8]) -> Result<Vec<RecordBatch>, RaftError> {
    let mut decoded = Vec::new();
    while !buf.is_empty() {
        let batch = RecordBatch::decode(&mut buf)
            .map_err(|e| RaftError::ChangeRejected(format!("decode replicated batch: {e}")))?;
        decoded.push(batch);
    }
    Ok(decoded)
}
/// Collects the ids of all voters in the state machine's current quorum state.
fn initial_state_voters(core: &QuorumStateMachine) -> Vec<NodeId> {
    let state = core.quorum_state();
    let mut ids = Vec::new();
    ids.extend(state.voters.ids());
    ids
}
/// Builds the control batch that announces a leader change.
///
/// The batch carries a single record whose key is the `LeaderChange` control
/// key and whose value is a version-0 `LeaderChangeMessage` naming `leader_id`.
/// Every id in `voter_ids` is listed both as a voter and as a granting voter.
/// Ids and the epoch are narrowed to `i32`; values that do not fit are clamped
/// to `i32::MAX`. A failure to encode the message is ignored, in which case the
/// record value holds whatever was written before the failure.
fn leader_change_batch(epoch: LeaderEpoch, leader_id: NodeId, voter_ids: &[NodeId]) -> RecordBatch {
    use crabka_protocol::Encode;
    use crabka_protocol::owned::common::leader_change_message::voter::Voter;
    use crabka_protocol::owned::leader_change_message::LeaderChangeMessage;
    use crabka_protocol::records::header::Attributes;
    use crabka_protocol::records::metadata::control::{ControlRecordType, control_record_key};

    let mut voters: Vec<Voter> = Vec::with_capacity(voter_ids.len());
    for &id in voter_ids {
        voters.push(Voter {
            voter_id: i32::try_from(id).unwrap_or(i32::MAX),
            ..Default::default()
        });
    }
    let granting_voters = voters.clone();
    let message = LeaderChangeMessage {
        version: 0,
        leader_id: i32::try_from(leader_id).unwrap_or(i32::MAX),
        voters,
        granting_voters,
        ..Default::default()
    };

    let mut payload = bytes::BytesMut::new();
    let _ = message.encode(&mut payload, 0);

    let record = Record {
        offset_delta: 0,
        key: Some(control_record_key(ControlRecordType::LeaderChange)),
        value: Some(payload.freeze()),
        ..Default::default()
    };
    RecordBatch {
        partition_leader_epoch: i32::try_from(epoch).unwrap_or(i32::MAX),
        attributes: Attributes::default().with_control(true),
        last_offset_delta: 0,
        records: vec![record],
        ..Default::default()
    }
}
fn replay_committed(log: &KraftLog, image: &mut MetadataImage, from: i64) {
match log.read_decoded(from, MAX_APPLY_BYTES) {
Ok(batches) => {
for batch in &batches {
if batch.attributes.is_control_batch() {
continue;
}
for rec in &batch.records {
let Some(value) = rec.value.as_ref() else {
continue;
};
if let Ok(meta) = from_kraft_value(value, image)
&& image.validate(&meta).is_ok()
{
image.apply(&meta);
}
}
}
}
Err(e) => tracing::error!(?e, "kraft: replay for recovery failed"),
}
}
/// Persists the durable part of `state` to the `quorum-state` file inside `dir`.
///
/// The file is a fixed 54-byte record, integers in big-endian order:
/// - 16 bytes: cluster id
/// - 4 bytes: leader epoch
/// - 1 byte presence flag + 8 bytes leader id (zero when absent)
/// - 1 byte presence flag + 8 bytes voted id + 16 bytes voted directory id
///   (all zero when absent)
///
/// The record is written to a sibling `.tmp` file, flushed to stable storage
/// with `sync_all`, and only then renamed over the real file. Without the flush
/// a crash shortly after the rename could leave an empty or truncated file
/// under the final name, losing the persisted epoch and vote. The containing
/// directory is synced afterwards on a best-effort basis so the rename itself
/// is durable where the platform supports syncing a directory handle.
///
/// # Errors
/// Returns the I/O error (wrapped as `LogError::Io`) if creating, writing,
/// syncing, or renaming the temporary file fails.
fn save_quorum_state(dir: &std::path::Path, state: &QuorumState) -> Result<(), RaftError> {
    let mut buf = Vec::with_capacity(64);
    buf.extend_from_slice(state.cluster_id.as_bytes());
    buf.put_u32(state.leader_epoch);
    if let Some(id) = state.leader_id {
        buf.put_u8(1);
        buf.put_u64(id);
    } else {
        buf.put_u8(0);
        buf.put_u64(0);
    }
    if let Some(k) = state.voted_key {
        buf.put_u8(1);
        buf.put_u64(k.id);
        buf.extend_from_slice(k.directory_id.as_bytes());
    } else {
        // Absent vote: keep the record fixed-size by zero-filling the slot.
        buf.put_u8(0);
        buf.put_u64(0);
        buf.extend_from_slice(&[0u8; 16]);
    }
    let path = dir.join(QUORUM_STATE_FILE);
    let tmp = path.with_extension("tmp");
    let persist = || -> std::io::Result<()> {
        let mut file = std::fs::File::create(&tmp)?;
        std::io::Write::write_all(&mut file, &buf)?;
        // The data must reach disk before the rename makes it visible.
        file.sync_all()?;
        drop(file);
        std::fs::rename(&tmp, &path)
    };
    persist().map_err(crabka_log::LogError::Io)?;
    // Best effort: opening a directory as a file is not possible everywhere.
    if let Ok(parent) = std::fs::File::open(dir) {
        let _ = parent.sync_all();
    }
    Ok(())
}
/// Loads the persisted quorum state from the `quorum-state` file inside `dir`.
///
/// Returns `Ok(None)` when the file does not exist or is shorter than the
/// fixed 54-byte record. Otherwise the stored cluster id, leader epoch and
/// voted key are restored and combined with a clone of `voters`.
///
/// Two things are deliberately not restored from the file as written here:
/// - the stored leader id is read past but the result always has
///   `leader_id: None`;
/// - the `cluster_id` argument is not compared with the stored cluster id
///   (NOTE(review): confirm a mismatch should not be rejected here).
///
/// # Errors
/// Returns `RaftError::Storage` for any read failure other than "not found".
fn load_quorum_state(
    dir: &std::path::Path,
    cluster_id: Uuid,
    voters: &crabka_metadata::voters::VoterSet,
) -> Result<Option<QuorumState>, RaftError> {
    use bytes::Buf;
    // cluster id + epoch + (flag, leader id) + (flag, voted id, directory id)
    const ENCODED_LEN: usize = 16 + 4 + (1 + 8) + (1 + 8 + 16);

    let _ = cluster_id;

    let raw = match std::fs::read(dir.join(QUORUM_STATE_FILE)) {
        Ok(raw) => raw,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(e) => return Err(RaftError::Storage(crabka_log::LogError::Io(e))),
    };
    if raw.len() < ENCODED_LEN {
        return Ok(None);
    }

    let mut cursor: &[u8] = &raw;
    let mut stored_cluster = [0u8; 16];
    cursor.copy_to_slice(&mut stored_cluster);
    let leader_epoch = cursor.get_u32();
    // Leader flag and leader id are consumed only to move the cursor forward.
    let _ = cursor.get_u8();
    let _ = cursor.get_u64();
    let has_vote = cursor.get_u8() != 0;
    let voted_id = cursor.get_u64();
    let mut voted_dir = [0u8; 16];
    cursor.copy_to_slice(&mut voted_dir);
    let voted_key = if has_vote {
        Some(ReplicaKey {
            id: voted_id,
            directory_id: Uuid::from_bytes(voted_dir),
        })
    } else {
        None
    };

    Ok(Some(QuorumState {
        cluster_id: Uuid::from_bytes(stored_cluster),
        leader_epoch,
        leader_id: None,
        voted_key,
        voters: voters.clone(),
    }))
}
/// Writes snapshot `bytes` as a checkpoint file inside `dir`, creating `dir` if needed.
///
/// The file is named `{end_offset:020}-{epoch:010}.checkpoint`. The content is
/// first written to a sibling file with the `.tmp` extension, flushed to stable
/// storage with `sync_all`, and then renamed into place. Without the flush a
/// crash shortly after the rename could leave an empty or truncated checkpoint
/// under the final name. The directory is synced afterwards on a best-effort
/// basis so the rename itself is durable where the platform supports syncing a
/// directory handle.
///
/// # Errors
/// Returns the I/O error (wrapped as `LogError::Io`) if creating the directory
/// or creating, writing, syncing, or renaming the temporary file fails.
fn write_checkpoint(
    dir: &std::path::Path,
    end_offset: i64,
    epoch: i32,
    bytes: &[u8],
) -> Result<(), RaftError> {
    std::fs::create_dir_all(dir).map_err(crabka_log::LogError::Io)?;
    let name = format!("{end_offset:020}-{epoch:010}.checkpoint");
    let path = dir.join(name);
    let tmp = path.with_extension("tmp");
    let persist = || -> std::io::Result<()> {
        let mut file = std::fs::File::create(&tmp)?;
        std::io::Write::write_all(&mut file, bytes)?;
        // The data must reach disk before the rename makes it visible.
        file.sync_all()?;
        drop(file);
        std::fs::rename(&tmp, &path)
    };
    persist().map_err(crabka_log::LogError::Io)?;
    // Best effort: opening a directory as a file is not possible everywhere.
    if let Ok(parent) = std::fs::File::open(dir) {
        let _ = parent.sync_all();
    }
    Ok(())
}
/// Reads the content of the newest checkpoint file in `dir`.
///
/// "Newest" is whatever `latest_checkpoint_id` reports. Returns `Ok(None)` when
/// it finds no checkpoint.
///
/// # Errors
/// Returns the I/O error (wrapped as `LogError::Io`) if the selected checkpoint
/// file cannot be read. The file name is rebuilt in the zero-padded canonical
/// form, so a checkpoint stored under a differently padded name fails here.
fn load_latest_checkpoint(dir: &std::path::Path) -> Result<Option<Vec<u8>>, RaftError> {
    match latest_checkpoint_id(dir) {
        None => Ok(None),
        Some((end_offset, epoch)) => {
            let file = dir.join(format!("{end_offset:020}-{epoch:010}.checkpoint"));
            let content = std::fs::read(file).map_err(crabka_log::LogError::Io)?;
            Ok(Some(content))
        }
    }
}
/// Finds the greatest `(end_offset, epoch)` id among the checkpoint files in `dir`.
///
/// A file counts as a checkpoint when its name is valid UTF-8, ends in
/// `.checkpoint`, and the remainder splits at the first `-` into an `i64`
/// offset and an `i32` epoch. Ids are compared as tuples, so the offset
/// dominates and the epoch breaks ties. Everything else in the directory,
/// including unreadable entries, is ignored.
///
/// Returns `None` when the directory cannot be listed or holds no checkpoint.
fn latest_checkpoint_id(dir: &std::path::Path) -> Option<(i64, i32)> {
    std::fs::read_dir(dir)
        .ok()?
        .flatten()
        .filter_map(|entry| {
            let file_name = entry.file_name();
            let stem = file_name.to_str()?.strip_suffix(".checkpoint")?;
            let (offset_part, epoch_part) = stem.split_once('-')?;
            let offset = offset_part.parse::<i64>().ok()?;
            let epoch = epoch_part.parse::<i32>().ok()?;
            Some((offset, epoch))
        })
        .max()
}
/// Deletes every checkpoint file in `dir` except the one with the greatest id.
///
/// The id to keep is taken from `latest_checkpoint_id`. Only files whose names
/// parse as `<i64 offset>-<i32 epoch>.checkpoint` are candidates for removal;
/// other files are left alone. The whole operation is best-effort: nothing
/// happens when there is no checkpoint or the directory cannot be listed, and
/// individual removal failures are ignored.
fn retain_latest_checkpoint(dir: &std::path::Path) {
    let Some(keep) = latest_checkpoint_id(dir) else {
        return;
    };
    let Ok(listing) = std::fs::read_dir(dir) else {
        return;
    };
    // Parses a checkpoint file name into its `(end_offset, epoch)` id.
    let parse_id = |file_name: &std::ffi::OsStr| -> Option<(i64, i32)> {
        let stem = file_name.to_str()?.strip_suffix(".checkpoint")?;
        let (offset_part, epoch_part) = stem.split_once('-')?;
        let offset = offset_part.parse::<i64>().ok()?;
        let epoch = epoch_part.parse::<i32>().ok()?;
        Some((offset, epoch))
    };
    for entry in listing.flatten() {
        match parse_id(entry.file_name().as_os_str()) {
            Some(found) if found != keep => {
                let _ = std::fs::remove_file(entry.path());
            }
            _ => {}
        }
    }
}
/// Reads the checkpoint file for the given `(end_offset, epoch)` id from `dir`.
///
/// The file name is the zero-padded form `{end_offset:020}-{epoch:010}.checkpoint`.
/// Any read failure, including a missing file, is reported as `None`.
fn load_checkpoint_by_id(dir: &std::path::Path, end_offset: i64, epoch: i32) -> Option<Vec<u8>> {
    let file = dir.join(format!("{end_offset:020}-{epoch:010}.checkpoint"));
    match std::fs::read(file) {
        Ok(content) => Some(content),
        Err(_) => None,
    }
}
// Tests for the controller engine. Each test spawns a `KraftController` over a
// temporary data directory with a `NullPeerSender`, drives it by injecting
// events or submitting changes, and observes it through the leader/image
// watches and `quorum_state()`.
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use std::time::Duration as StdDuration;
use crate::kraft::transport::NullPeerSender;
// Builds a voter set from bare node ids: every voter gets a nil directory id,
// no endpoints, and the default kraft version range.
fn voter_set(ids: &[NodeId]) -> crabka_metadata::voters::VoterSet {
crabka_metadata::voters::VoterSet::from_voters(ids.iter().map(|&id| {
crabka_metadata::voters::Voter {
id,
directory_id: uuid::Uuid::nil(),
endpoints: Vec::new(),
kraft_version: crabka_metadata::voters::KRaftVersionRange::default(),
}
}))
}
// Controller with a 1000 ms election timeout and snapshot interval 0.
fn build(me: NodeId, ids: &[NodeId]) -> (KraftController, tempfile::TempDir) {
build_with_timeout(me, ids, 1000)
}
// Controller with a caller-chosen election timeout and snapshot interval 0.
fn build_with_timeout(
me: NodeId,
ids: &[NodeId],
timeout_ms: u64,
) -> (KraftController, tempfile::TempDir) {
build_full(me, ids, timeout_ms, 0)
}
// Controller with a 1000 ms election timeout and a caller-chosen snapshot interval.
fn build_with_snapshot_interval(
me: NodeId,
ids: &[NodeId],
snapshot_interval_records: u64,
) -> (KraftController, tempfile::TempDir) {
build_full(me, ids, 1000, snapshot_interval_records)
}
// Common constructor: opens a log in a fresh temp dir, bootstraps quorum state
// with a nil cluster id and the given voters, and spawns the controller with a
// `NullPeerSender`. The `TempDir` is returned alongside the controller so the
// caller decides how long the directory handle lives.
fn build_full(
me: NodeId,
ids: &[NodeId],
timeout_ms: u64,
snapshot_interval_records: u64,
) -> (KraftController, tempfile::TempDir) {
let dir = tempfile::tempdir().expect("tempdir");
let log = KraftLog::open(dir.path()).expect("open log");
let state = QuorumState::bootstrap(uuid::Uuid::nil(), voter_set(ids));
let ctrl = KraftController::spawn(
KraftConfig {
me,
cluster_id: uuid::Uuid::nil(),
initial_state: state,
election_timeout_ms: timeout_ms,
peers: Arc::new(NullPeerSender),
snapshot_interval_records,
},
log,
dir.path().to_path_buf(),
);
(ctrl, dir)
}
// Topic + partition records for `name`, using topic id 1.
fn topic_record(name: &str) -> Vec<crabka_metadata::MetadataRecord> {
topic_record_named(name, 1)
}
// Makes `me` leader of a multi-voter quorum: injects an election timeout, then
// a granted vote from `helper` at epoch 0 and again at epoch 1, and waits for
// the leader watch to report `me`.
// NOTE(review): presumably both epochs are sent so one of them matches the
// candidate's epoch — confirm.
async fn elect_leader_with_helper(ctrl: &KraftController, me: NodeId, helper: NodeId) {
ctrl.inject_event(Event::ElectionTimeout).await.unwrap();
ctrl.inject_event(Event::ReceiveVoteResponse {
from: helper,
epoch: 0,
vote_granted: true,
})
.await
.unwrap();
ctrl.inject_event(Event::ReceiveVoteResponse {
from: helper,
epoch: 1,
vote_granted: true,
})
.await
.unwrap();
await_leader(ctrl, Some(me)).await;
}
// Waits until the leader watch holds `want`. Checks up to 200 times, waiting
// for a change (bounded at 5 s per wait) between checks, then asserts.
async fn await_leader(ctrl: &KraftController, want: Option<NodeId>) {
let mut rx = ctrl.watch_leader();
for _ in 0..200 {
if *rx.borrow() == want {
return;
}
let _ = tokio::time::timeout(StdDuration::from_secs(5), rx.changed()).await;
}
assert!(*rx.borrow() == want, "leader did not reach {want:?}");
}
// A freshly spawned single-voter controller reports no leader.
#[tokio::test]
async fn single_voter_engine_starts_with_no_initial_leader() {
let (ctrl, _dir) = build(1, &[1]);
let initial = *ctrl.watch_leader().borrow();
assert!(initial.is_none());
ctrl.shutdown().await;
}
// An injected election timeout is enough for a single voter to become leader.
#[tokio::test]
async fn injected_election_makes_single_voter_leader() {
let (ctrl, _dir) = build(1, &[1]);
ctrl.inject_event(Event::ElectionTimeout).await.unwrap();
await_leader(&ctrl, Some(1)).await;
ctrl.shutdown().await;
}
// After `test_append_and_commit` the new topic is visible both through the
// image watch and through `current_image()`.
#[tokio::test]
async fn committed_batch_applies_to_image() {
let (ctrl, _dir) = build(1, &[1]);
ctrl.inject_event(Event::ElectionTimeout).await.unwrap();
await_leader(&ctrl, Some(1)).await;
assert!(ctrl.current_image().topic("t").is_none());
let off = ctrl
.test_append_and_commit(topic_record("t"))
.await
.unwrap();
assert!(off >= 0);
let mut img_rx = ctrl.watch_image();
assert!(img_rx.borrow_and_update().topic("t").is_some());
assert!(ctrl.current_image().topic("t").is_some());
ctrl.shutdown().await;
}
// Committing the same topic record twice via `test_append_and_commit` returns
// Ok both times and the topic stays present in the image.
#[tokio::test]
async fn duplicate_committed_record_rejected_on_apply() {
let (ctrl, _dir) = build(1, &[1]);
ctrl.inject_event(Event::ElectionTimeout).await.unwrap();
await_leader(&ctrl, Some(1)).await;
ctrl.test_append_and_commit(topic_record("t"))
.await
.unwrap();
assert!(ctrl.current_image().topic("t").is_some());
ctrl.test_append_and_commit(topic_record("t"))
.await
.unwrap();
assert!(ctrl.current_image().topic("t").is_some());
ctrl.shutdown().await;
}
// With an 80 ms election timeout and no injected event, a single voter becomes
// leader by itself within 5 s.
#[tokio::test]
async fn single_voter_auto_elects_on_election_timeout() {
let (ctrl, _dir) = build_with_timeout(1, &[1], 80);
tokio::time::timeout(StdDuration::from_secs(5), await_leader(&ctrl, Some(1)))
.await
.expect("auto-elected within timeout");
ctrl.shutdown().await;
}
// A follower that keeps receiving BeginQuorumEpoch from leader 2 every 40 ms
// (election timeout 120 ms) must still report leader 2 afterwards.
#[tokio::test]
async fn follower_with_live_leader_does_not_elect() {
let (ctrl, _dir) = build_with_timeout(1, &[1, 2, 3], 120);
ctrl.inject_event(Event::ReceiveBeginQuorumEpoch {
leader_id: 2,
leader_epoch: 1,
})
.await
.unwrap();
await_leader(&ctrl, Some(2)).await;
for _ in 0..6 {
tokio::time::sleep(StdDuration::from_millis(40)).await;
ctrl.inject_event(Event::ReceiveBeginQuorumEpoch {
leader_id: 2,
leader_epoch: 1,
})
.await
.unwrap();
}
let leader = *ctrl.watch_leader().borrow();
assert!(
leader == Some(2),
"follower spuriously left leader 2: {leader:?}"
);
ctrl.shutdown().await;
}
// `submit_change` on a single-voter leader completes within 5 s, the topic
// shows up in the image, and the reported high watermark is positive.
#[tokio::test]
async fn submit_change_commits_on_single_voter_leader() {
let (ctrl, _dir) = build(1, &[1]);
ctrl.inject_event(Event::ElectionTimeout).await.unwrap();
await_leader(&ctrl, Some(1)).await;
tokio::time::timeout(
StdDuration::from_secs(5),
ctrl.submit_change(topic_record("orders")),
)
.await
.expect("submit did not hang")
.expect("submit ok");
assert!(ctrl.current_image().topic("orders").is_some());
let qs = ctrl.quorum_state().await.unwrap();
assert!(qs.leader_id == Some(1));
assert!(qs.high_watermark > 0);
ctrl.shutdown().await;
}
// Submitting the same topic twice through `submit_change` fails the second
// time with `RaftError::Metadata`.
#[tokio::test]
async fn submit_change_duplicate_rejected() {
let (ctrl, _dir) = build(1, &[1]);
ctrl.inject_event(Event::ElectionTimeout).await.unwrap();
await_leader(&ctrl, Some(1)).await;
ctrl.submit_change(topic_record("t")).await.unwrap();
let dup = ctrl.submit_change(topic_record("t")).await;
assert!(matches!(dup, Err(RaftError::Metadata(_))), "got {dup:?}");
ctrl.shutdown().await;
}
// A pending `submit_change` on a three-voter leader resolves with
// `NotLeader { current_leader: Some(2) }` once a BeginQuorumEpoch from node 2
// at epoch 9 is injected.
#[tokio::test]
async fn submit_waiter_fails_on_leadership_loss() {
let (ctrl, _dir) = build(1, &[1, 2, 3]);
elect_leader_with_helper(&ctrl, 1, 2).await;
let ctrl2 = ctrl.clone();
let submit = tokio::spawn(async move { ctrl2.submit_change(topic_record("orders")).await });
// Give the spawned submit time to be queued before leadership is taken away.
tokio::time::sleep(StdDuration::from_millis(50)).await;
ctrl.inject_event(Event::ReceiveBeginQuorumEpoch {
leader_id: 2,
leader_epoch: 9,
})
.await
.unwrap();
let result = tokio::time::timeout(StdDuration::from_secs(5), submit)
.await
.expect("submit did not hang on leadership loss")
.expect("join");
assert!(
matches!(
result,
Err(RaftError::NotLeader {
current_leader: Some(2)
})
),
"got {result:?}"
);
ctrl.shutdown().await;
}
// `submit_change` on a controller that was never elected fails with `NotLeader`.
#[tokio::test]
async fn submit_change_on_non_leader_rejects() {
let (ctrl, _dir) = build(1, &[1, 2, 3]);
let r = ctrl.submit_change(topic_record("t")).await;
assert!(matches!(r, Err(RaftError::NotLeader { .. })), "got {r:?}");
ctrl.shutdown().await;
}
// A topic record plus its single partition 0 (leader, replicas and ISR all
// node 1), with the topic id derived from `id`.
fn topic_record_named(name: &str, id: u128) -> Vec<crabka_metadata::MetadataRecord> {
vec![
crabka_metadata::MetadataRecord::V1Topic(crabka_metadata::TopicRecord {
name: name.to_string(),
topic_id: uuid::Uuid::from_u128(id),
partitions: 1,
replication_factor: 1,
}),
crabka_metadata::MetadataRecord::V1Partition(crabka_metadata::PartitionRecord {
topic: name.to_string(),
partition: 0,
leader: 1,
replicas: vec![1],
isr: vec![1],
leader_epoch: 0,
adding_replicas: vec![],
removing_replicas: vec![],
directories: vec![],
partition_epoch: 0,
}),
]
}
// Three submits are queued on a three-voter leader: A (valid), B (duplicate of
// A) and C (a different topic). A fetch from node 2 at the leader's log end
// offset is then injected. Expected: A and C succeed, only B is rejected with
// `RaftError::Metadata`.
#[tokio::test]
async fn rejection_scoped_to_owning_waiter_range() {
let (ctrl, _dir) = build(1, &[1, 2, 3]);
elect_leader_with_helper(&ctrl, 1, 2).await;
let ca = ctrl.clone();
let cb = ctrl.clone();
let cc = ctrl.clone();
// The sleeps between spawns are there to order the three submits A, B, C.
let a = tokio::spawn(async move { ca.submit_change(topic_record_named("first", 1)).await });
tokio::time::sleep(StdDuration::from_millis(20)).await;
let b = tokio::spawn(async move { cb.submit_change(topic_record_named("first", 1)).await });
tokio::time::sleep(StdDuration::from_millis(20)).await;
let c = tokio::spawn(async move { cc.submit_change(topic_record_named("third", 3)).await });
tokio::time::sleep(StdDuration::from_millis(40)).await;
let qs = ctrl.quorum_state().await.unwrap();
ctrl.inject_event(Event::ReceiveFetch {
from: 2,
fetch_epoch: qs.leader_epoch,
fetch_offset: qs.log_end_offset,
})
.await
.unwrap();
let ra = tokio::time::timeout(StdDuration::from_secs(5), a)
.await
.expect("A did not hang")
.expect("join");
let rb = tokio::time::timeout(StdDuration::from_secs(5), b)
.await
.expect("B did not hang")
.expect("join");
let rc = tokio::time::timeout(StdDuration::from_secs(5), c)
.await
.expect("C did not hang")
.expect("join");
assert!(ra.is_ok(), "A (first valid) should commit: {ra:?}");
assert!(
matches!(rb, Err(RaftError::Metadata(_))),
"B (duplicate) should be rejected: {rb:?}"
);
assert!(
rc.is_ok(),
"C (distinct valid) must NOT bleed B's rejection: {rc:?}"
);
ctrl.shutdown().await;
}
// Commits a topic, triggers a snapshot, shuts down, then reopens the same data
// directory with `KraftController::open` and expects the topic to be present
// in the recovered image.
#[tokio::test]
async fn snapshot_then_restart_recovers_image() {
let dir = tempfile::tempdir().expect("tempdir");
let data_dir = dir.path().to_path_buf();
let cluster_id = uuid::Uuid::from_u128(7);
let voters = voter_set(&[1]);
{
let log = KraftLog::open(&data_dir).expect("open log");
let ctrl = KraftController::spawn(
KraftConfig {
me: 1,
cluster_id,
initial_state: QuorumState::bootstrap(cluster_id, voters.clone()),
election_timeout_ms: 1000,
peers: Arc::new(NullPeerSender),
snapshot_interval_records: 0,
},
log,
data_dir.clone(),
);
ctrl.inject_event(Event::ElectionTimeout).await.unwrap();
await_leader(&ctrl, Some(1)).await;
ctrl.submit_change(topic_record("recovered")).await.unwrap();
assert!(ctrl.current_image().topic("recovered").is_some());
ctrl.trigger_snapshot().await.unwrap();
ctrl.shutdown().await;
// NOTE(review): presumably lets the first engine finish and release the
// data directory before it is reopened — confirm.
tokio::time::sleep(StdDuration::from_millis(50)).await;
}
let ctrl2 = KraftController::open(
data_dir.clone(),
1,
cluster_id,
voters,
1000,
Arc::new(NullPeerSender),
0,
)
.expect("reopen");
assert!(ctrl2.current_image().topic("recovered").is_some());
ctrl2.shutdown().await;
}
// Saves a quorum state with epoch 5, leader 2 and a vote for node 3, then loads
// it back: epoch, voted id and cluster id survive, while the leader id comes
// back as `None`.
#[tokio::test]
async fn quorum_state_file_round_trips() {
let dir = tempfile::tempdir().expect("tempdir");
let cid = uuid::Uuid::from_u128(9);
let mut state = QuorumState::bootstrap(cid, voter_set(&[1, 2, 3]));
state.leader_epoch = 5;
state.leader_id = Some(2);
state.voted_key = Some(ReplicaKey {
id: 3,
directory_id: uuid::Uuid::from_u128(3),
});
save_quorum_state(dir.path(), &state).unwrap();
let loaded = load_quorum_state(dir.path(), cid, &voter_set(&[1, 2, 3]))
.unwrap()
.expect("present");
assert!(loaded.leader_epoch == 5);
assert!(loaded.leader_id.is_none());
assert!(loaded.voted_key.map(|k| k.id) == Some(3));
assert!(loaded.cluster_id == cid);
}
// With a snapshot interval of 3, four committed topic changes must leave a
// non-empty checkpoint on disk and a log start offset above 0.
#[tokio::test]
async fn leader_snapshots_and_prunes_at_threshold() {
let (ctrl, dir) = build_with_snapshot_interval(1, &[1], 3);
ctrl.inject_event(Event::ElectionTimeout).await.unwrap();
await_leader(&ctrl, Some(1)).await;
for name in ["a", "b", "c", "d"] {
ctrl.submit_change(topic_record(name)).await.unwrap();
}
let cp = load_latest_checkpoint(&checkpoint_dir(dir.path()))
.expect("scan checkpoints")
.expect("a checkpoint exists");
assert!(!cp.is_empty());
let qs = ctrl.quorum_state().await.unwrap();
assert!(
qs.log_start_offset > 0,
"log not pruned: log_start_offset = {}",
qs.log_start_offset
);
ctrl.shutdown().await;
}
// Registers broker 7 twice with `broker_epoch: 0` in the record. Each time the
// epoch reported by the image must equal the log end offset observed just
// before the submit, and the second epoch must be greater than the first.
#[tokio::test]
async fn broker_registration_epoch_equals_commit_offset() {
use crabka_metadata::{BrokerRegistrationRecord, MetadataRecord};
let (ctrl, _dir) = build(1, &[1]);
ctrl.inject_event(Event::ElectionTimeout).await.unwrap();
await_leader(&ctrl, Some(1)).await;
// Builds a one-record registration for broker `id`.
let reg = |id: u64| {
vec![MetadataRecord::V1BrokerRegistration(
BrokerRegistrationRecord {
node_id: id,
broker_epoch: 0, incarnation_id: uuid::Uuid::from_u128(u128::from(id)),
host: "h".into(),
port: 9092,
rack: None,
endpoints: vec![],
},
)]
};
let base1 = ctrl.quorum_state().await.unwrap().log_end_offset;
ctrl.submit_change(reg(7))
.await
.expect("first registration");
let e1 = ctrl.current_image().broker_epoch(7);
assert!(e1 == Some(base1), "epoch {e1:?} != commit offset {base1}");
let base2 = ctrl.quorum_state().await.unwrap().log_end_offset;
ctrl.submit_change(reg(7)).await.expect("re-registration");
let e2 = ctrl.current_image().broker_epoch(7);
assert!(e2 == Some(base2), "re-reg epoch {e2:?} != offset {base2}");
assert!(base2 > base1 && e2 > e1, "epoch must strictly increase");
ctrl.shutdown().await;
}
}