use std::collections::HashMap;
use std::sync::Mutex;
use nodedb_types::NodeId;
use crate::swim::member::record::MemberUpdate;
use super::entry::PendingUpdate;
/// Queue of membership updates awaiting dissemination, holding at most one
/// pending entry per node.
///
/// The entries live in a map keyed by [`NodeId`] behind a [`Mutex`], which is
/// why every method can take `&self`.
#[derive(Debug, Default)]
pub struct DisseminationQueue {
/// Pending update for each node; the key is the node the update describes.
inner: Mutex<HashMap<NodeId, PendingUpdate>>,
}
impl DisseminationQueue {
pub fn new() -> Self {
Self::default()
}
/// Queues `update`, replacing whatever entry is already pending for the
/// same node.
///
/// The update is wrapped in a new `PendingUpdate` built by
/// `PendingUpdate::new`, so the replaced entry's bookkeeping is discarded.
///
/// # Panics
///
/// Panics if the internal mutex is poisoned.
pub fn enqueue(&self, update: MemberUpdate) {
    let mut pending = self.inner.lock().expect("dissemination lock poisoned");
    // Clone the key before `update` is moved into the new entry.
    let key = update.node_id.clone();
    pending.insert(key, PendingUpdate::new(update));
}
/// Returns the number of nodes that currently have a pending update.
///
/// # Panics
///
/// Panics if the internal mutex is poisoned.
pub fn len(&self) -> usize {
    let pending = self.inner.lock().expect("dissemination lock poisoned");
    pending.len()
}
/// Returns `true` when no updates are pending.
///
/// # Panics
///
/// Panics if the internal mutex is poisoned.
pub fn is_empty(&self) -> bool {
    let pending = self.inner.lock().expect("dissemination lock poisoned");
    pending.is_empty()
}
/// Discards every pending update.
///
/// # Panics
///
/// Panics if the internal mutex is poisoned.
pub fn clear(&self) {
    let mut pending = self.inner.lock().expect("dissemination lock poisoned");
    pending.clear();
}
/// Selects up to `max` pending updates to attach to one outgoing message.
///
/// Entries are ordered by ascending `sent_count`, with ties broken by the
/// node id's string form, and the first `max` of them are chosen. For each
/// chosen entry, `record_sent()` is called, a clone of its update is
/// appended to the result, and the entry is removed from the queue once its
/// `sent_count` is at least `lambda_log_n`.
///
/// Returns an empty vector, without taking the lock, when `max` is zero.
///
/// # Panics
///
/// Panics if the internal mutex is poisoned.
pub fn take_for_message(&self, max: usize, lambda_log_n: u32) -> Vec<MemberUpdate> {
    if max == 0 {
        return Vec::new();
    }

    let mut pending = self.inner.lock().expect("dissemination lock poisoned");

    // Work on a snapshot of the keys so entries can be removed while walking it.
    let mut order: Vec<NodeId> = pending.keys().cloned().collect();
    order.sort_by(|lhs, rhs| {
        let (left, right) = (&pending[lhs], &pending[rhs]);
        let by_sends = left.sent_count.cmp(&right.sent_count);
        by_sends
            .then_with(|| lhs.as_str().cmp(rhs.as_str()))
            // NOTE(review): map keys are unique, so if `NodeId` equality agrees
            // with `as_str()` equality this last tie-break can never run — confirm.
            .then_with(|| left.update.incarnation.cmp(&right.update.incarnation))
    });
    order.truncate(max);

    let mut batch = Vec::with_capacity(order.len());
    for id in &order {
        // Decide on removal inside the match so the mutable borrow of the
        // entry has ended before the map itself is modified.
        let exhausted = match pending.get_mut(id) {
            Some(entry) => {
                entry.record_sent();
                batch.push(entry.update.clone());
                entry.sent_count >= lambda_log_n
            }
            None => false,
        };
        if exhausted {
            pending.remove(id);
        }
    }
    batch
}
/// Computes `ceil(lambda * log2(max(cluster_size + 1, 2)))`.
///
/// The result has the same type as the `lambda_log_n` argument of
/// `take_for_message` (presumably it is meant to be passed there — confirm
/// against callers).
///
/// Because the logarithm's argument is never below 2, the logarithm is at
/// least 1 and the result is at least `lambda`, even for a `cluster_size`
/// of 0 or 1.
///
/// The `+ 1` saturates, so `usize::MAX` is accepted instead of overflowing,
/// and the final float-to-integer conversion saturates at `u32::MAX`.
pub fn fanout_threshold(cluster_size: usize, lambda: u32) -> u32 {
    // A plain `cluster_size + 1` panics in overflow-checked builds (and wraps
    // to 0 otherwise) when `cluster_size == usize::MAX`.
    let n = cluster_size.saturating_add(1).max(2) as f64;
    (f64::from(lambda) * n.log2()).ceil() as u32
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::swim::incarnation::Incarnation;
    use crate::swim::member::MemberState;

    /// Builds an `Alive` update for node `id` at incarnation `inc`, using a
    /// fixed loopback address.
    fn upd(id: &str, inc: u64) -> MemberUpdate {
        let node_id = NodeId::new(id);
        let incarnation = Incarnation::new(inc);
        MemberUpdate {
            node_id,
            addr: "127.0.0.1:7000".to_string(),
            state: MemberState::Alive,
            incarnation,
        }
    }

    /// Returns a queue holding one incarnation-0 update per id, enqueued in order.
    fn queue_of(ids: &[&str]) -> DisseminationQueue {
        let queue = DisseminationQueue::new();
        for id in ids {
            queue.enqueue(upd(id, 0));
        }
        queue
    }

    /// A second update for the same node overwrites the first.
    #[test]
    fn enqueue_replaces_by_node_id() {
        let queue = DisseminationQueue::new();
        for inc in [1, 5] {
            queue.enqueue(upd("n1", inc));
        }
        assert_eq!(queue.len(), 1);

        let taken = queue.take_for_message(10, 4);
        assert_eq!(taken.len(), 1);
        assert_eq!(taken[0].incarnation, Incarnation::new(5));
    }

    /// No more than `max` updates are handed out per call.
    #[test]
    fn take_caps_at_max() {
        let queue = queue_of(&["n1", "n2", "n3"]);
        let taken = queue.take_for_message(2, 10);
        assert_eq!(taken.len(), 2);
    }

    /// A `max` of zero yields nothing even when updates are pending.
    #[test]
    fn take_zero_max_returns_empty() {
        let queue = queue_of(&["n1"]);
        let taken = queue.take_for_message(0, 4);
        assert!(taken.is_empty());
    }

    /// With a threshold of 2, an entry survives the first send and is
    /// dropped after the second.
    #[test]
    fn entries_drop_after_fanout_threshold() {
        let queue = queue_of(&["n1"]);
        for remaining in [1, 0] {
            let _ = queue.take_for_message(1, 2);
            assert_eq!(queue.len(), remaining);
        }
    }

    /// A freshly enqueued update is preferred over one already sent twice.
    #[test]
    fn least_disseminated_wins() {
        let queue = queue_of(&["a"]);
        for _ in 0..2 {
            let _ = queue.take_for_message(1, 10);
        }
        queue.enqueue(upd("b", 0));

        let taken = queue.take_for_message(1, 10);
        assert_eq!(taken[0].node_id.as_str(), "b");
    }

    /// Spot-checks the threshold for a few cluster sizes with `lambda = 3`.
    #[test]
    fn fanout_threshold_formula() {
        for (cluster_size, expected) in [(7, 9), (1, 3), (0, 3)] {
            assert_eq!(
                DisseminationQueue::fanout_threshold(cluster_size, 3),
                expected
            );
        }
    }

    /// `clear` removes every pending update.
    #[test]
    fn clear_empties_queue() {
        let queue = queue_of(&["n1", "n2"]);
        queue.clear();
        assert!(queue.is_empty());
    }
}