nodedb_cluster/swim/dissemination/
queue.rs1use std::collections::HashMap;
12use std::sync::Mutex;
13
14use nodedb_types::NodeId;
15
16use crate::swim::member::record::MemberUpdate;
17
18use super::entry::PendingUpdate;
19
20#[derive(Debug, Default)]
22pub struct DisseminationQueue {
23 inner: Mutex<HashMap<NodeId, PendingUpdate>>,
24}
25
26impl DisseminationQueue {
27 pub fn new() -> Self {
29 Self::default()
30 }
31
32 pub fn enqueue(&self, update: MemberUpdate) {
35 let mut guard = self.inner.lock().expect("dissemination lock poisoned");
36 guard.insert(update.node_id.clone(), PendingUpdate::new(update));
37 }
38
39 pub fn len(&self) -> usize {
41 self.inner
42 .lock()
43 .expect("dissemination lock poisoned")
44 .len()
45 }
46
47 pub fn is_empty(&self) -> bool {
49 self.len() == 0
50 }
51
52 pub fn clear(&self) {
54 self.inner
55 .lock()
56 .expect("dissemination lock poisoned")
57 .clear();
58 }
59
60 pub fn take_for_message(&self, max: usize, lambda_log_n: u32) -> Vec<MemberUpdate> {
69 if max == 0 {
70 return Vec::new();
71 }
72 let mut guard = self.inner.lock().expect("dissemination lock poisoned");
73
74 let mut keys: Vec<NodeId> = guard.keys().cloned().collect();
78 keys.sort_by(|a, b| {
79 let pa = &guard[a];
80 let pb = &guard[b];
81 pa.sent_count
82 .cmp(&pb.sent_count)
83 .then_with(|| a.as_str().cmp(b.as_str()))
84 .then_with(|| pa.update.incarnation.cmp(&pb.update.incarnation))
85 });
86 keys.truncate(max);
87
88 let mut out = Vec::with_capacity(keys.len());
89 for k in keys {
90 if let Some(pending) = guard.get_mut(&k) {
91 pending.record_sent();
92 out.push(pending.update.clone());
93 if pending.sent_count >= lambda_log_n {
94 guard.remove(&k);
95 }
96 }
97 }
98 out
99 }
100
101 pub fn fanout_threshold(cluster_size: usize, lambda: u32) -> u32 {
104 let n = (cluster_size + 1).max(2) as f64;
105 (lambda as f64 * n.log2()).ceil() as u32
106 }
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use crate::swim::incarnation::Incarnation;
    use crate::swim::member::MemberState;

    /// Builds an `Alive` update for node `id` at incarnation `inc`, using a
    /// fixed placeholder address.
    fn upd(id: &str, inc: u64) -> MemberUpdate {
        let node_id = NodeId::new(id);
        let incarnation = Incarnation::new(inc);
        MemberUpdate {
            node_id,
            addr: "127.0.0.1:7000".to_string(),
            state: MemberState::Alive,
            incarnation,
        }
    }

    #[test]
    fn enqueue_replaces_by_node_id() {
        let queue = DisseminationQueue::new();
        queue.enqueue(upd("n1", 1));
        queue.enqueue(upd("n1", 5));
        // The second enqueue overwrites the first: one entry for "n1".
        assert_eq!(queue.len(), 1);

        let taken = queue.take_for_message(10, 4);
        assert_eq!(taken.len(), 1);
        assert_eq!(taken[0].incarnation, Incarnation::new(5));
    }

    #[test]
    fn take_caps_at_max() {
        let queue = DisseminationQueue::new();
        for id in &["n1", "n2", "n3"] {
            queue.enqueue(upd(id, 0));
        }
        // Three entries are pending but only two may be taken.
        let taken = queue.take_for_message(2, 10);
        assert_eq!(taken.len(), 2);
    }

    #[test]
    fn take_zero_max_returns_empty() {
        let queue = DisseminationQueue::new();
        queue.enqueue(upd("n1", 0));
        let taken = queue.take_for_message(0, 4);
        assert!(taken.is_empty());
    }

    #[test]
    fn entries_drop_after_fanout_threshold() {
        let queue = DisseminationQueue::new();
        queue.enqueue(upd("n1", 0));

        // First send: below the threshold of 2, so the entry stays queued.
        let _ = queue.take_for_message(1, 2);
        assert_eq!(queue.len(), 1);

        // Second send reaches the threshold and the entry is removed.
        let _ = queue.take_for_message(1, 2);
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn least_disseminated_wins() {
        let queue = DisseminationQueue::new();
        queue.enqueue(upd("a", 0));
        // Send "a" twice so it is ahead of anything enqueued afterwards.
        for _ in 0..2 {
            let _ = queue.take_for_message(1, 10);
        }
        queue.enqueue(upd("b", 0));

        // "b" has never been sent, so it outranks "a" despite sorting later by id.
        let taken = queue.take_for_message(1, 10);
        assert_eq!(taken[0].node_id.as_str(), "b");
    }

    #[test]
    fn fanout_threshold_formula() {
        // (cluster_size, expected) with lambda = 3:
        //   7 -> n = 8 -> 3 * log2(8) = 9
        //   1 -> n = 2 -> 3 * log2(2) = 3
        //   0 -> n is clamped to 2 -> 3
        let cases: [(usize, u32); 3] = [(7, 9), (1, 3), (0, 3)];
        for &(cluster_size, expected) in &cases {
            assert_eq!(
                DisseminationQueue::fanout_threshold(cluster_size, 3),
                expected
            );
        }
    }

    #[test]
    fn clear_empties_queue() {
        let queue = DisseminationQueue::new();
        queue.enqueue(upd("n1", 0));
        queue.enqueue(upd("n2", 0));
        queue.clear();
        assert!(queue.is_empty());
    }
}