Skip to main content

tycho_core/overlay_client/
neighbours.rs

1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use parking_lot::Mutex;
5use rand::Rng;
6use rand::distr::uniform::{UniformInt, UniformSampler};
7use tokio::sync::Notify;
8use tycho_network::PeerId;
9use tycho_util::FastHashSet;
10
11use crate::overlay_client::neighbour::Neighbour;
12#[derive(Clone)]
13#[repr(transparent)]
14pub struct Neighbours {
15    inner: Arc<Inner>,
16}
17
18impl Neighbours {
19    pub fn new(entries: Vec<Neighbour>, max_neighbours: usize) -> Self {
20        let mut selection_index = SelectionIndex::new(max_neighbours);
21        selection_index.update(&entries);
22
23        Self {
24            inner: Arc::new(Inner {
25                max_neighbours,
26                entries: ArcSwap::new(Arc::new(entries)),
27                selection_index: Mutex::new(selection_index),
28                changed: Notify::new(),
29            }),
30        }
31    }
32
33    pub async fn wait_for_peers(&self, count: usize) {
34        loop {
35            let changed = self.inner.changed.notified();
36
37            if self.inner.entries.load().len() >= count {
38                break;
39            }
40
41            changed.await;
42        }
43    }
44
45    pub fn changed(&self) -> &Notify {
46        &self.inner.changed
47    }
48
49    pub fn choose(&self) -> Option<Neighbour> {
50        let selection_index = self.inner.selection_index.lock();
51        selection_index.choose(&mut rand::rng())
52    }
53
54    pub fn choose_multiple(&self, n: usize, neighbour_type: NeighbourType) -> Vec<Neighbour> {
55        let selection_index = self.inner.selection_index.lock();
56        selection_index.choose_multiple(&mut rand::rng(), n, neighbour_type)
57    }
58
59    /// Tries to apply neighbours score to selection index.
60    /// Returns `true` if new values were stored. Did nothing otherwise.
61    pub fn try_apply_score(&self, now: u32) -> bool {
62        let entires_arc = self.inner.entries.load_full();
63
64        let mut entries = entires_arc.as_ref().clone();
65        let mut entries_changed = false;
66        entries.retain(|x| {
67            let retain = x.is_reliable() && x.expires_at_secs() > now;
68            entries_changed |= !retain;
69            retain
70        });
71        let new_entries_arc = Arc::new(entries);
72
73        let mut lock = self.inner.selection_index.lock();
74
75        {
76            let prev_entires_arc = self
77                .inner
78                .entries
79                .compare_and_swap(&entires_arc, new_entries_arc.clone());
80
81            if !Arc::ptr_eq(&prev_entires_arc, &entires_arc) {
82                // Entires were already changed, we must not overwrite the index
83                return false;
84            }
85        }
86
87        // Recompute distribution
88        lock.update(new_entries_arc.as_ref());
89
90        if entries_changed {
91            // Notify waiters if some peers were removed
92            self.inner.changed.notify_waiters();
93        }
94
95        true
96    }
97
98    /// Update neighbours metrics.
99    pub fn update_metrics(&self, local_id: &PeerId) {
100        let entries = self.get_active_neighbours();
101
102        let local_id = local_id.to_string();
103
104        for neighbour in entries.iter() {
105            let peer_id = neighbour.peer_id();
106            let stats = neighbour.get_stats();
107
108            let labels = [
109                ("local_id", local_id.clone()),
110                ("peer_id", peer_id.to_string()),
111            ];
112
113            metrics::gauge!("tycho_core_overlay_client_neighbour_score", &labels)
114                .set(stats.score as f64);
115
116            metrics::gauge!(
117                "tycho_core_overlay_client_neighbour_total_requests",
118                &labels
119            )
120            .set(stats.total_requests as f64);
121
122            metrics::gauge!(
123                "tycho_core_overlay_client_neighbour_failed_requests",
124                &labels
125            )
126            .set(stats.failed_requests as f64);
127        }
128    }
129
130    pub fn get_sorted_neighbours(&self) -> Vec<(Neighbour, u32)> {
131        let mut index = self.inner.selection_index.lock();
132        index
133            .indices_with_weights
134            .sort_by(|(_, lw), (_, rw)| rw.cmp(lw));
135        index.indices_with_weights.clone()
136    }
137
138    pub fn get_active_neighbours(&self) -> Arc<Vec<Neighbour>> {
139        self.inner.entries.load_full()
140    }
141
142    pub fn update(&self, new: Vec<Neighbour>) {
143        let now = tycho_util::time::now_sec();
144
145        let mut new_peer_ids = new
146            .iter()
147            .map(|neighbour| *neighbour.peer_id())
148            .collect::<FastHashSet<_>>();
149
150        let mut entries = self.inner.entries.load().as_slice().to_vec();
151
152        // Remove unreliable and expired neighbours.
153        let mut changed = false;
154        entries.retain(|x| {
155            // Remove the existing peer from the `new_peers` list to prevent it
156            // from appearing in the same list again (especially if it was unreliable).
157            new_peer_ids.remove(x.peer_id());
158
159            let retain = x.is_reliable() && x.expires_at_secs() > now;
160            changed |= !retain;
161            retain
162        });
163
164        // If all neighbours are reliable and valid then remove the worst
165        if entries.len() >= self.inner.max_neighbours
166            && let Some((worst_index, _)) = entries
167                .iter()
168                .enumerate()
169                .min_by(|(_, l), (_, r)| l.cmp_score(r))
170        {
171            entries.swap_remove(worst_index);
172            changed = true;
173        }
174
175        for neighbour in new {
176            if entries.len() >= self.inner.max_neighbours {
177                break;
178            }
179            if !new_peer_ids.contains(neighbour.peer_id()) {
180                continue;
181            }
182
183            entries.push(neighbour);
184            changed = true;
185        }
186
187        let new_entries_arc = Arc::new(entries);
188        let mut lock = self.inner.selection_index.lock();
189
190        // Overwrite current entries
191        self.inner.entries.store(new_entries_arc.clone());
192        // Recompute distribution
193        lock.update(new_entries_arc.as_ref());
194
195        if changed {
196            // Notify waiter if some peers were added or removed
197            self.inner.changed.notify_waiters();
198        }
199    }
200}
201
202struct Inner {
203    max_neighbours: usize,
204    entries: ArcSwap<Vec<Neighbour>>,
205    selection_index: Mutex<SelectionIndex>,
206    changed: Notify,
207}
208
209struct SelectionIndex {
210    /// Neighbour indices with cumulative weight.
211    indices_with_weights: Vec<(Neighbour, u32)>,
212    /// Optional uniform distribution `[0; total_weight)`.
213    distribution: Option<UniformInt<u32>>,
214}
215
216impl SelectionIndex {
217    fn new(capacity: usize) -> Self {
218        Self {
219            indices_with_weights: Vec::with_capacity(capacity),
220            distribution: None,
221        }
222    }
223
224    fn update(&mut self, neighbours: &[Neighbour]) {
225        self.indices_with_weights.clear();
226        let mut total_weight = 0;
227        for neighbour in neighbours.iter() {
228            if let Some(score) = neighbour.compute_selection_score() {
229                total_weight += score as u32;
230                self.indices_with_weights
231                    .push((neighbour.clone(), total_weight));
232            }
233        }
234
235        self.distribution = UniformInt::new(0, total_weight).ok();
236
237        // TODO: fallback to uniform sample from any neighbour
238    }
239
240    fn choose<R: Rng + ?Sized>(&self, rng: &mut R) -> Option<Neighbour> {
241        let chosen_weight = self.distribution.as_ref()?.sample(rng);
242
243        // Find the first item which has a weight higher than the chosen weight.
244        let i = self
245            .indices_with_weights
246            .partition_point(|(_, w)| *w <= chosen_weight);
247
248        self.indices_with_weights
249            .get(i)
250            .map(|(neighbour, _)| neighbour)
251            .cloned()
252    }
253
254    fn choose_multiple<R: Rng + ?Sized>(
255        &self,
256        rng: &mut R,
257        mut n: usize,
258        neighbour_type: NeighbourType,
259    ) -> Vec<Neighbour> {
260        struct Element<'a> {
261            key: f64,
262            neighbour: &'a Neighbour,
263        }
264
265        impl Eq for Element<'_> {}
266        impl PartialEq for Element<'_> {
267            #[inline]
268            fn eq(&self, other: &Self) -> bool {
269                // Bitwise comparison is safe
270                self.key == other.key
271            }
272        }
273
274        impl PartialOrd for Element<'_> {
275            #[inline]
276            fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
277                Some(self.cmp(other))
278            }
279        }
280
281        impl Ord for Element<'_> {
282            #[inline]
283            fn cmp(&self, other: &Self) -> std::cmp::Ordering {
284                // Weight must not be nan
285                self.key.partial_cmp(&other.key).unwrap()
286            }
287        }
288
289        n = std::cmp::min(n, self.indices_with_weights.len());
290        if n == 0 {
291            return Vec::new();
292        }
293
294        let mut candidates = Vec::with_capacity(self.indices_with_weights.len());
295        let mut prev_total_weight = None;
296        for (neighbour, total_weight) in &self.indices_with_weights {
297            let weight = match prev_total_weight {
298                Some(prev) => total_weight - prev,
299                None => *total_weight,
300            };
301            prev_total_weight = Some(*total_weight);
302
303            debug_assert!(weight > 0);
304
305            let key = rng.random::<f64>().powf(1.0 / weight as f64);
306            candidates.push(Element { key, neighbour });
307        }
308
309        // Partially sort the array to find the `n` elements with the greatest
310        // keys. Do this by using `select_nth_unstable` to put the elements with
311        // the *smallest* keys at the beginning of the list in `O(n)` time, which
312        // provides equivalent information about the elements with the *greatest* keys.
313        let (_, mid, greater) = candidates.select_nth_unstable(self.indices_with_weights.len() - n);
314
315        let mut result = Vec::with_capacity(n);
316
317        let check_reliable = matches!(neighbour_type, NeighbourType::Reliable);
318        let is_valid = |neighbour: &Neighbour| !check_reliable || neighbour.is_reliable();
319
320        if is_valid(mid.neighbour) {
321            result.push(mid.neighbour.clone());
322        }
323
324        for element in greater {
325            if is_valid(element.neighbour) {
326                result.push(element.neighbour.clone());
327            }
328        }
329
330        result
331    }
332}
333
334#[derive(Eq, PartialEq)]
335pub enum NeighbourType {
336    All,
337    Reliable,
338}