// nodedb_cluster/swim/dissemination/queue.rs

use std::collections::HashMap;
use std::sync::Mutex;

use nodedb_types::NodeId;

use crate::swim::member::record::MemberUpdate;

use super::entry::PendingUpdate;
/// SWIM dissemination queue: holds at most one pending update per node.
///
/// `enqueue` replaces any earlier pending update for the same `NodeId`, so the
/// queue always carries the latest known update per member.
#[derive(Debug, Default)]
pub struct DisseminationQueue {
    // Latest pending update keyed by node id; the Mutex makes the queue safe
    // to share across threads (all methods take `&self`).
    inner: Mutex<HashMap<NodeId, PendingUpdate>>,
}
28impl DisseminationQueue {
29 pub fn new() -> Self {
31 Self::default()
32 }
33
34 pub fn enqueue(&self, update: MemberUpdate) {
37 let mut guard = self.inner.lock().expect("dissemination lock poisoned");
38 guard.insert(update.node_id.clone(), PendingUpdate::new(update));
39 }
40
41 pub fn len(&self) -> usize {
43 self.inner
44 .lock()
45 .expect("dissemination lock poisoned")
46 .len()
47 }
48
49 pub fn is_empty(&self) -> bool {
51 self.len() == 0
52 }
53
54 pub fn clear(&self) {
56 self.inner
57 .lock()
58 .expect("dissemination lock poisoned")
59 .clear();
60 }
61
62 pub fn take_for_message(&self, max: usize, lambda_log_n: u32) -> Vec<MemberUpdate> {
71 if max == 0 {
72 return Vec::new();
73 }
74 let mut guard = self.inner.lock().expect("dissemination lock poisoned");
75
76 let mut keys: Vec<NodeId> = guard.keys().cloned().collect();
80 keys.sort_by(|a, b| {
81 let pa = &guard[a];
82 let pb = &guard[b];
83 pa.sent_count
84 .cmp(&pb.sent_count)
85 .then_with(|| a.as_str().cmp(b.as_str()))
86 .then_with(|| pa.update.incarnation.cmp(&pb.update.incarnation))
87 });
88 keys.truncate(max);
89
90 let mut out = Vec::with_capacity(keys.len());
91 for k in keys {
92 if let Some(pending) = guard.get_mut(&k) {
93 pending.record_sent();
94 out.push(pending.update.clone());
95 if pending.sent_count >= lambda_log_n {
96 guard.remove(&k);
97 }
98 }
99 }
100 out
101 }
102
103 pub fn fanout_threshold(cluster_size: usize, lambda: u32) -> u32 {
106 let n = (cluster_size + 1).max(2) as f64;
107 (lambda as f64 * n.log2()).ceil() as u32
108 }
109}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::swim::incarnation::Incarnation;
    use crate::swim::member::MemberState;

    /// Fixture: an `Alive` update for node `id` at incarnation `inc`.
    fn upd(id: &str, inc: u64) -> MemberUpdate {
        MemberUpdate {
            node_id: NodeId::try_new(id).expect("test fixture"),
            addr: "127.0.0.1:7000".to_string(),
            state: MemberState::Alive,
            incarnation: Incarnation::new(inc),
        }
    }

    #[test]
    fn enqueue_replaces_by_node_id() {
        let queue = DisseminationQueue::new();
        for inc in [1, 5] {
            queue.enqueue(upd("n1", inc));
        }
        assert_eq!(queue.len(), 1);
        let batch = queue.take_for_message(10, 4);
        assert_eq!(batch.len(), 1);
        // The second enqueue replaced the first, so incarnation 5 survives.
        assert_eq!(batch[0].incarnation, Incarnation::new(5));
    }

    #[test]
    fn take_caps_at_max() {
        let queue = DisseminationQueue::new();
        for id in ["n1", "n2", "n3"] {
            queue.enqueue(upd(id, 0));
        }
        assert_eq!(queue.take_for_message(2, 10).len(), 2);
    }

    #[test]
    fn take_zero_max_returns_empty() {
        let queue = DisseminationQueue::new();
        queue.enqueue(upd("n1", 0));
        assert!(queue.take_for_message(0, 4).is_empty());
    }

    #[test]
    fn entries_drop_after_fanout_threshold() {
        let queue = DisseminationQueue::new();
        queue.enqueue(upd("n1", 0));
        // First send: count 1 of 2 — the entry is retained.
        let _ = queue.take_for_message(1, 2);
        assert_eq!(queue.len(), 1);
        // Second send hits the threshold and the entry is removed.
        let _ = queue.take_for_message(1, 2);
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn least_disseminated_wins() {
        let queue = DisseminationQueue::new();
        queue.enqueue(upd("a", 0));
        // Disseminate "a" twice so the fresh entry "b" has a lower count.
        let _ = queue.take_for_message(1, 10);
        let _ = queue.take_for_message(1, 10);
        queue.enqueue(upd("b", 0));
        let picked = queue.take_for_message(1, 10);
        assert_eq!(picked[0].node_id.as_str(), "b");
    }

    #[test]
    fn fanout_threshold_formula() {
        // N = 7 + 1 = 8 → log2 = 3, times lambda 3.
        assert_eq!(DisseminationQueue::fanout_threshold(7, 3), 9);
        // Tiny clusters clamp to N = 2 → log2 = 1.
        assert_eq!(DisseminationQueue::fanout_threshold(1, 3), 3);
        assert_eq!(DisseminationQueue::fanout_threshold(0, 3), 3);
    }

    #[test]
    fn clear_empties_queue() {
        let queue = DisseminationQueue::new();
        for id in ["n1", "n2"] {
            queue.enqueue(upd(id, 0));
        }
        queue.clear();
        assert!(queue.is_empty());
    }
}
199}