use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;
use nodedb_cluster::GroupAppliedWatchers;
/// Outcome delivered to a proposer through the tracker's oneshot channel:
/// a byte buffer on success, or a [`crate::Error`] on failure.
pub type ProposeResult = std::result::Result<Vec<u8>, crate::Error>;
/// State held for one `(group_id, log_index)` key in a [`ProposeTracker`].
enum TrackerSlot {
    /// `register` ran first: a proposer holds the receiving half of `tx`.
    Waiting {
        /// Channel on which the outcome is delivered.
        tx: oneshot::Sender<ProposeResult>,
        /// Key passed to `register`; `0` disables the mismatch check.
        expected_key: u64,
    },
    /// `complete` ran first: the outcome is buffered until `register` collects it.
    Completed {
        /// Key passed to `complete`. It is kept so that the mismatch check can
        /// still run when the proposer registers after the entry was applied.
        applied_key: u64,
        /// Outcome passed to `complete`.
        result: ProposeResult,
    },
}

/// Pairs proposal registrations with their apply-time outcomes.
///
/// Entries are keyed by `(group_id, log_index)`. `register` and `complete`
/// may be called in either order for a given key; whichever runs second
/// removes the entry and delivers the outcome.
///
/// NOTE(review): an outcome buffered by `complete` is only removed by a later
/// `register` for the same key; no other eviction is visible in this block.
pub struct ProposeTracker {
    /// Pending and buffered entries, guarded by a standard (blocking) mutex.
    slots: Mutex<HashMap<(u64, u64), TrackerSlot>>,
    /// Optional watchers that are bumped on every `complete` call.
    group_watchers: Option<Arc<GroupAppliedWatchers>>,
}

impl Default for ProposeTracker {
    /// Equivalent to [`ProposeTracker::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl ProposeTracker {
    /// Creates an empty tracker with no group watchers attached.
    pub fn new() -> Self {
        Self {
            slots: Mutex::new(HashMap::new()),
            group_watchers: None,
        }
    }

    /// Builder-style setter: attaches `watchers` and returns the tracker.
    pub fn with_group_watchers(mut self, watchers: Arc<GroupAppliedWatchers>) -> Self {
        self.group_watchers = Some(watchers);
        self
    }

    /// Registers interest in the outcome of the entry at `(group_id, log_index)`.
    ///
    /// * If nothing is stored for the key, a waiting slot is created and the
    ///   returned receiver resolves when `complete` is called for the key.
    /// * If an outcome was already buffered by `complete`, it is removed and
    ///   sent immediately, after the same key-mismatch check that `complete`
    ///   applies to waiting slots.
    /// * If another registration is already waiting, it is replaced; the
    ///   previous sender is dropped, so its receiver observes a closed channel.
    ///
    /// `expected_key` is compared against the key supplied to `complete`;
    /// a value of `0` on either side skips the comparison.
    pub fn register(
        &self,
        group_id: u64,
        log_index: u64,
        expected_key: u64,
    ) -> oneshot::Receiver<ProposeResult> {
        let (tx, rx) = oneshot::channel();
        // A poisoned lock is recovered rather than propagated as a panic.
        let mut slots = self.slots.lock().unwrap_or_else(|p| p.into_inner());
        match slots.entry((group_id, log_index)) {
            Entry::Vacant(vacant) => {
                vacant.insert(TrackerSlot::Waiting { tx, expected_key });
            }
            Entry::Occupied(occupied) => {
                if matches!(occupied.get(), TrackerSlot::Completed { .. }) {
                    if let TrackerSlot::Completed {
                        applied_key,
                        result,
                    } = occupied.remove()
                    {
                        // The entry was applied before we registered: validate
                        // the buffered outcome against our key before handing
                        // it over, exactly as `complete` would have done.
                        let outcome = Self::reconcile(
                            group_id,
                            log_index,
                            applied_key,
                            expected_key,
                            result,
                        );
                        // The receiver is still held locally, so a send error
                        // is not expected; ignore it as the original did.
                        let _ = tx.send(outcome);
                    }
                } else {
                    *occupied.into_mut() = TrackerSlot::Waiting { tx, expected_key };
                }
            }
        }
        rx
    }

    /// Records the outcome of the entry applied at `(group_id, log_index)`.
    ///
    /// Group watchers, when attached, are bumped with `(group_id, log_index)`
    /// before the slot map is touched.
    ///
    /// Returns `true` when a waiting registration existed and was removed
    /// (the outcome is sent to it; a closed receiver is ignored). Returns
    /// `false` when no registration was waiting, in which case the outcome is
    /// buffered (replacing any previously buffered one) for a later
    /// `register` call.
    ///
    /// If both `applied_key` and the registration's expected key are non-zero
    /// and differ, the waiter receives `Error::RetryableLeaderChange` instead
    /// of `result`.
    pub fn complete(
        &self,
        group_id: u64,
        log_index: u64,
        applied_key: u64,
        result: ProposeResult,
    ) -> bool {
        if let Some(watchers) = &self.group_watchers {
            watchers.bump(group_id, log_index);
        }
        // A poisoned lock is recovered rather than propagated as a panic.
        let mut slots = self.slots.lock().unwrap_or_else(|p| p.into_inner());
        match slots.entry((group_id, log_index)) {
            Entry::Vacant(vacant) => {
                vacant.insert(TrackerSlot::Completed {
                    applied_key,
                    result,
                });
                false
            }
            Entry::Occupied(occupied) => {
                if matches!(occupied.get(), TrackerSlot::Waiting { .. }) {
                    if let TrackerSlot::Waiting { tx, expected_key } = occupied.remove() {
                        let outcome = Self::reconcile(
                            group_id,
                            log_index,
                            applied_key,
                            expected_key,
                            result,
                        );
                        // The proposer may have dropped its receiver; that is
                        // not an error for the apply path.
                        let _ = tx.send(outcome);
                    }
                    true
                } else {
                    // A newer completion supersedes the buffered one.
                    *occupied.into_mut() = TrackerSlot::Completed {
                        applied_key,
                        result,
                    };
                    false
                }
            }
        }
    }

    /// Returns `result` unless both keys are non-zero and differ, in which
    /// case a warning is logged and `Error::RetryableLeaderChange` is returned.
    fn reconcile(
        group_id: u64,
        log_index: u64,
        applied_key: u64,
        expected_key: u64,
        result: ProposeResult,
    ) -> ProposeResult {
        let mismatch = applied_key != 0 && expected_key != 0 && applied_key != expected_key;
        if !mismatch {
            return result;
        }
        tracing::warn!(
            group_id,
            log_index,
            applied_key,
            expected_key,
            "raft entry at proposer's index was overwritten by \
             a different proposal (idempotency_key mismatch); \
             surfacing RetryableLeaderChange"
        );
        Err(crate::Error::RetryableLeaderChange {
            group_id,
            log_index,
        })
    }
}