Skip to main content

nodedb_cluster/swim/dissemination/
queue.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! `DisseminationQueue` — bounded, decaying piggyback buffer.
4//!
5//! The queue is a `HashMap<NodeId, PendingUpdate>` so that a fresher
6//! rumour about the same node **replaces** its stale predecessor in
7//! place — otherwise the queue would balloon with outdated tombstones.
8//!
9//! Emission picks the `max_piggyback` least-disseminated entries,
10//! increments their send counters, and drops any entry that has now
11//! reached the `lambda_log_n` fanout threshold.
12
13use std::collections::HashMap;
14use std::sync::Mutex;
15
16use nodedb_types::NodeId;
17
18use crate::swim::member::record::MemberUpdate;
19
20use super::entry::PendingUpdate;
21
/// Bounded dissemination buffer keyed by `NodeId`.
///
/// Keying by node id means a fresher rumour about a node *replaces* its
/// stale predecessor instead of accumulating next to it, so the buffer
/// stays bounded by the number of distinct nodes being gossiped about.
/// The interior `Mutex` makes the queue shareable across threads via a
/// plain `&self` — all public methods lock, mutate, and release.
#[derive(Debug, Default)]
pub struct DisseminationQueue {
    // Pending rumour per node; `PendingUpdate` carries the update plus
    // its `sent_count` dissemination counter.
    inner: Mutex<HashMap<NodeId, PendingUpdate>>,
}
27
28impl DisseminationQueue {
29    /// Fresh empty queue.
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    /// Insert or replace the entry for `update.node_id`. The send
35    /// counter resets to 0 so a fresh rumour is always gossiped anew.
36    pub fn enqueue(&self, update: MemberUpdate) {
37        let mut guard = self.inner.lock().expect("dissemination lock poisoned");
38        guard.insert(update.node_id.clone(), PendingUpdate::new(update));
39    }
40
41    /// Total number of rumours currently in the queue.
42    pub fn len(&self) -> usize {
43        self.inner
44            .lock()
45            .expect("dissemination lock poisoned")
46            .len()
47    }
48
49    /// True when the queue is empty.
50    pub fn is_empty(&self) -> bool {
51        self.len() == 0
52    }
53
54    /// Drop every rumour. Used by tests and on shutdown.
55    pub fn clear(&self) {
56        self.inner
57            .lock()
58            .expect("dissemination lock poisoned")
59            .clear();
60    }
61
62    /// Return up to `max` least-disseminated updates for a single
63    /// outgoing message. Increments each returned entry's `sent_count`
64    /// and drops entries whose new count has reached `lambda_log_n`.
65    ///
66    /// `lambda_log_n` is `ceil(lambda * log2(cluster_size + 1))` —
67    /// computed by the caller because it depends on the current
68    /// membership size and the [`super::super::config::SwimConfig`]
69    /// `fanout_lambda` knob.
70    pub fn take_for_message(&self, max: usize, lambda_log_n: u32) -> Vec<MemberUpdate> {
71        if max == 0 {
72            return Vec::new();
73        }
74        let mut guard = self.inner.lock().expect("dissemination lock poisoned");
75
76        // Sort a snapshot of keys by (sent_count, node_id) ascending;
77        // we can't use BinaryHeap directly without cloning because the
78        // values need to be mutated in place after the decision.
79        let mut keys: Vec<NodeId> = guard.keys().cloned().collect();
80        keys.sort_by(|a, b| {
81            let pa = &guard[a];
82            let pb = &guard[b];
83            pa.sent_count
84                .cmp(&pb.sent_count)
85                .then_with(|| a.as_str().cmp(b.as_str()))
86                .then_with(|| pa.update.incarnation.cmp(&pb.update.incarnation))
87        });
88        keys.truncate(max);
89
90        let mut out = Vec::with_capacity(keys.len());
91        for k in keys {
92            if let Some(pending) = guard.get_mut(&k) {
93                pending.record_sent();
94                out.push(pending.update.clone());
95                if pending.sent_count >= lambda_log_n {
96                    guard.remove(&k);
97                }
98            }
99        }
100        out
101    }
102
103    /// Compute `ceil(lambda * log2(cluster_size + 1))`. Exposed so the
104    /// runner can pass the result straight into [`take_for_message`].
105    pub fn fanout_threshold(cluster_size: usize, lambda: u32) -> u32 {
106        let n = (cluster_size + 1).max(2) as f64;
107        (lambda as f64 * n.log2()).ceil() as u32
108    }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use crate::swim::incarnation::Incarnation;
    use crate::swim::member::MemberState;

    /// Build an `Alive` update fixture for node `id` at incarnation `inc`.
    fn make_update(id: &str, inc: u64) -> MemberUpdate {
        let node_id = NodeId::try_new(id).expect("test fixture");
        MemberUpdate {
            node_id,
            addr: "127.0.0.1:7000".to_string(),
            state: MemberState::Alive,
            incarnation: Incarnation::new(inc),
        }
    }

    #[test]
    fn enqueue_replaces_by_node_id() {
        let queue = DisseminationQueue::new();
        queue.enqueue(make_update("n1", 1));
        queue.enqueue(make_update("n1", 5));
        assert_eq!(queue.len(), 1);

        // Draining must yield the newer incarnation, not the replaced one.
        let drained = queue.take_for_message(10, 4);
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0].incarnation, Incarnation::new(5));
    }

    #[test]
    fn take_caps_at_max() {
        let queue = DisseminationQueue::new();
        for id in ["n1", "n2", "n3"] {
            queue.enqueue(make_update(id, 0));
        }
        assert_eq!(queue.take_for_message(2, 10).len(), 2);
    }

    #[test]
    fn take_zero_max_returns_empty() {
        let queue = DisseminationQueue::new();
        queue.enqueue(make_update("n1", 0));
        assert!(queue.take_for_message(0, 4).is_empty());
    }

    #[test]
    fn entries_drop_after_fanout_threshold() {
        let queue = DisseminationQueue::new();
        queue.enqueue(make_update("n1", 0));

        // Threshold 2: the entry survives the first emission and is
        // evicted by the second.
        let _ = queue.take_for_message(1, 2);
        assert_eq!(queue.len(), 1);
        let _ = queue.take_for_message(1, 2);
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn least_disseminated_wins() {
        let queue = DisseminationQueue::new();
        queue.enqueue(make_update("a", 0));

        // Emit "a" twice so its sent_count climbs to 2.
        for _ in 0..2 {
            let _ = queue.take_for_message(1, 10);
        }

        // A freshly enqueued "b" starts at sent_count 0 and must be
        // preferred over "a" on the next emission.
        queue.enqueue(make_update("b", 0));
        let picked = queue.take_for_message(1, 10);
        assert_eq!(picked[0].node_id.as_str(), "b");
    }

    #[test]
    fn fanout_threshold_formula() {
        // (cluster_size, lambda, expected ceil(lambda * log2(size + 1)))
        // with the 0-node case clamped to log2(2).
        let cases = [(7, 3, 9), (1, 3, 3), (0, 3, 3)];
        for (size, lambda, want) in cases {
            assert_eq!(DisseminationQueue::fanout_threshold(size, lambda), want);
        }
    }

    #[test]
    fn clear_empties_queue() {
        let queue = DisseminationQueue::new();
        queue.enqueue(make_update("n1", 0));
        queue.enqueue(make_update("n2", 0));
        queue.clear();
        assert!(queue.is_empty());
    }
}
199}