use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::warn;
use nodedb_cluster::raft_loop::CommitApplier;
use nodedb_raft::message::LogEntry;
use super::propose_tracker::ProposeTracker;
/// A group of committed log entries travelling through the apply channel.
///
/// Values of this type are what [`DistributedApplier`] pushes into its
/// `mpsc::Sender`; the matching receiver is returned by
/// [`create_distributed_applier`].
pub struct ApplyBatch {
    /// Identifier of the group the entries were committed in.
    pub(crate) group_id: u64,
    /// The entries to apply. As built by `DistributedApplier::apply_committed`,
    /// entries with empty `data` have already been filtered out.
    pub(crate) entries: Vec<LogEntry>,
}
/// [`CommitApplier`] implementation that forwards committed entries over a
/// bounded tokio channel instead of applying them inline.
///
/// Entries with empty `data` are never forwarded; they are reported to the
/// [`ProposeTracker`] as a retryable leader-change error instead.
pub struct DistributedApplier {
    /// Producer side of the apply channel; written with non-blocking `try_send`.
    apply_tx: mpsc::Sender<ApplyBatch>,
    /// Shared tracker used to complete proposals at a given `(group_id, index)`.
    tracker: Arc<ProposeTracker>,
}
impl DistributedApplier {
pub fn new(apply_tx: mpsc::Sender<ApplyBatch>, tracker: Arc<ProposeTracker>) -> Self {
Self { apply_tx, tracker }
}
pub fn tracker(&self) -> &Arc<ProposeTracker> {
&self.tracker
}
}
impl CommitApplier for DistributedApplier {
fn apply_committed(&self, group_id: u64, entries: &[LogEntry]) -> u64 {
let last_index = entries.last().map(|e| e.index).unwrap_or(0);
for entry in entries {
if entry.data.is_empty() {
tracing::error!(
group_id,
log_index = entry.index,
"leader-change no-op committed at index where a proposer was waiting; \
surfacing RetryableLeaderChange so the gateway re-proposes"
);
self.tracker.complete(
group_id,
entry.index,
0,
Err(crate::Error::RetryableLeaderChange {
group_id,
log_index: entry.index,
}),
);
}
}
let real_entries: Vec<LogEntry> = entries
.iter()
.filter(|e| !e.data.is_empty())
.cloned()
.collect();
if real_entries.is_empty() {
return last_index;
}
if let Err(e) = self.apply_tx.try_send(ApplyBatch {
group_id,
entries: real_entries,
}) {
warn!(group_id, error = %e, "apply queue full, entries will be retried on next tick");
return 0;
}
last_index
}
}
/// Creates a [`DistributedApplier`] together with the receiving half of its
/// apply channel.
///
/// The channel is bounded at 1024 batches. The caller owns the returned
/// receiver and is responsible for draining it.
pub fn create_distributed_applier(
    tracker: Arc<ProposeTracker>,
) -> (DistributedApplier, mpsc::Receiver<ApplyBatch>) {
    /// Maximum number of batches the apply channel buffers.
    const APPLY_QUEUE_CAPACITY: usize = 1024;

    let (apply_tx, apply_rx) = mpsc::channel::<ApplyBatch>(APPLY_QUEUE_CAPACITY);
    (DistributedApplier::new(apply_tx, tracker), apply_rx)
}