// nodedb_cluster/swim/dissemination/queue.rs

//! `DisseminationQueue` — bounded, decaying piggyback buffer.
//!
//! The queue is a `HashMap<NodeId, PendingUpdate>` so that a fresher
//! rumour about the same node **replaces** its stale predecessor in
//! place — otherwise the queue would balloon with outdated tombstones.
//!
//! Emission picks the `max_piggyback` least-disseminated entries,
//! increments their send counters, and drops any entry that has now
//! reached the `lambda_log_n` fanout threshold.

use std::collections::HashMap;
use std::sync::Mutex;

use nodedb_types::NodeId;

use crate::swim::member::record::MemberUpdate;

use super::entry::PendingUpdate;
19
20/// Bounded dissemination buffer keyed by `NodeId`.
21#[derive(Debug, Default)]
22pub struct DisseminationQueue {
23    inner: Mutex<HashMap<NodeId, PendingUpdate>>,
24}
25
26impl DisseminationQueue {
27    /// Fresh empty queue.
28    pub fn new() -> Self {
29        Self::default()
30    }
31
32    /// Insert or replace the entry for `update.node_id`. The send
33    /// counter resets to 0 so a fresh rumour is always gossiped anew.
34    pub fn enqueue(&self, update: MemberUpdate) {
35        let mut guard = self.inner.lock().expect("dissemination lock poisoned");
36        guard.insert(update.node_id.clone(), PendingUpdate::new(update));
37    }
38
39    /// Total number of rumours currently in the queue.
40    pub fn len(&self) -> usize {
41        self.inner
42            .lock()
43            .expect("dissemination lock poisoned")
44            .len()
45    }
46
47    /// True when the queue is empty.
48    pub fn is_empty(&self) -> bool {
49        self.len() == 0
50    }
51
52    /// Drop every rumour. Used by tests and on shutdown.
53    pub fn clear(&self) {
54        self.inner
55            .lock()
56            .expect("dissemination lock poisoned")
57            .clear();
58    }
59
60    /// Return up to `max` least-disseminated updates for a single
61    /// outgoing message. Increments each returned entry's `sent_count`
62    /// and drops entries whose new count has reached `lambda_log_n`.
63    ///
64    /// `lambda_log_n` is `ceil(lambda * log2(cluster_size + 1))` —
65    /// computed by the caller because it depends on the current
66    /// membership size and the [`super::super::config::SwimConfig`]
67    /// `fanout_lambda` knob.
68    pub fn take_for_message(&self, max: usize, lambda_log_n: u32) -> Vec<MemberUpdate> {
69        if max == 0 {
70            return Vec::new();
71        }
72        let mut guard = self.inner.lock().expect("dissemination lock poisoned");
73
74        // Sort a snapshot of keys by (sent_count, node_id) ascending;
75        // we can't use BinaryHeap directly without cloning because the
76        // values need to be mutated in place after the decision.
77        let mut keys: Vec<NodeId> = guard.keys().cloned().collect();
78        keys.sort_by(|a, b| {
79            let pa = &guard[a];
80            let pb = &guard[b];
81            pa.sent_count
82                .cmp(&pb.sent_count)
83                .then_with(|| a.as_str().cmp(b.as_str()))
84                .then_with(|| pa.update.incarnation.cmp(&pb.update.incarnation))
85        });
86        keys.truncate(max);
87
88        let mut out = Vec::with_capacity(keys.len());
89        for k in keys {
90            if let Some(pending) = guard.get_mut(&k) {
91                pending.record_sent();
92                out.push(pending.update.clone());
93                if pending.sent_count >= lambda_log_n {
94                    guard.remove(&k);
95                }
96            }
97        }
98        out
99    }
100
101    /// Compute `ceil(lambda * log2(cluster_size + 1))`. Exposed so the
102    /// runner can pass the result straight into [`take_for_message`].
103    pub fn fanout_threshold(cluster_size: usize, lambda: u32) -> u32 {
104        let n = (cluster_size + 1).max(2) as f64;
105        (lambda as f64 * n.log2()).ceil() as u32
106    }
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use crate::swim::incarnation::Incarnation;
    use crate::swim::member::MemberState;

    /// Build an `Alive` update for node `id` at incarnation `inc`.
    fn upd(id: &str, inc: u64) -> MemberUpdate {
        MemberUpdate {
            node_id: NodeId::new(id),
            state: MemberState::Alive,
            addr: "127.0.0.1:7000".to_string(),
            incarnation: Incarnation::new(inc),
        }
    }

    #[test]
    fn enqueue_replaces_by_node_id() {
        let q = DisseminationQueue::new();
        for inc in [1, 5] {
            q.enqueue(upd("n1", inc));
        }
        assert_eq!(q.len(), 1);
        // Draining yields only the later incarnation.
        let out = q.take_for_message(10, 4);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].incarnation, Incarnation::new(5));
    }

    #[test]
    fn take_caps_at_max() {
        let q = DisseminationQueue::new();
        for id in ["n1", "n2", "n3"] {
            q.enqueue(upd(id, 0));
        }
        assert_eq!(q.take_for_message(2, 10).len(), 2);
    }

    #[test]
    fn take_zero_max_returns_empty() {
        let q = DisseminationQueue::new();
        q.enqueue(upd("n1", 0));
        assert!(q.take_for_message(0, 4).is_empty());
    }

    #[test]
    fn entries_drop_after_fanout_threshold() {
        let q = DisseminationQueue::new();
        q.enqueue(upd("n1", 0));
        // With threshold 2 the entry survives the first take and is
        // retired by the second.
        let _ = q.take_for_message(1, 2);
        assert_eq!(q.len(), 1);
        let _ = q.take_for_message(1, 2);
        assert!(q.is_empty());
    }

    #[test]
    fn least_disseminated_wins() {
        let q = DisseminationQueue::new();
        q.enqueue(upd("a", 0));
        // Push "a"'s sent_count up to 2.
        for _ in 0..2 {
            let _ = q.take_for_message(1, 10);
        }
        // A fresh "b" (count 0) must now outrank "a" (count 2).
        q.enqueue(upd("b", 0));
        let out = q.take_for_message(1, 10);
        assert_eq!(out[0].node_id.as_str(), "b");
    }

    #[test]
    fn fanout_threshold_formula() {
        // 7-node cluster, lambda=3 → ceil(3 * log2(8)) = 9.
        assert_eq!(DisseminationQueue::fanout_threshold(7, 3), 9);
        // 1-node cluster → clamped log argument of 2 → exactly lambda.
        assert_eq!(DisseminationQueue::fanout_threshold(1, 3), 3);
        // 0-node cluster → same clamp applies.
        assert_eq!(DisseminationQueue::fanout_threshold(0, 3), 3);
    }

    #[test]
    fn clear_empties_queue() {
        let q = DisseminationQueue::new();
        q.enqueue(upd("n1", 0));
        q.enqueue(upd("n2", 0));
        q.clear();
        assert!(q.is_empty());
    }
}