tycho_core/overlay_client/
neighbours.rs1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use parking_lot::Mutex;
5use rand::Rng;
6use rand::distr::uniform::{UniformInt, UniformSampler};
7use tokio::sync::Notify;
8use tycho_network::PeerId;
9use tycho_util::FastHashSet;
10
11use crate::overlay_client::neighbour::Neighbour;
12#[derive(Clone)]
13#[repr(transparent)]
14pub struct Neighbours {
15 inner: Arc<Inner>,
16}
17
18impl Neighbours {
19 pub fn new(entries: Vec<Neighbour>, max_neighbours: usize) -> Self {
20 let mut selection_index = SelectionIndex::new(max_neighbours);
21 selection_index.update(&entries);
22
23 Self {
24 inner: Arc::new(Inner {
25 max_neighbours,
26 entries: ArcSwap::new(Arc::new(entries)),
27 selection_index: Mutex::new(selection_index),
28 changed: Notify::new(),
29 }),
30 }
31 }
32
33 pub async fn wait_for_peers(&self, count: usize) {
34 loop {
35 let changed = self.inner.changed.notified();
36
37 if self.inner.entries.load().len() >= count {
38 break;
39 }
40
41 changed.await;
42 }
43 }
44
45 pub fn changed(&self) -> &Notify {
46 &self.inner.changed
47 }
48
49 pub fn choose(&self) -> Option<Neighbour> {
50 let selection_index = self.inner.selection_index.lock();
51 selection_index.choose(&mut rand::rng())
52 }
53
54 pub fn choose_multiple(&self, n: usize, neighbour_type: NeighbourType) -> Vec<Neighbour> {
55 let selection_index = self.inner.selection_index.lock();
56 selection_index.choose_multiple(&mut rand::rng(), n, neighbour_type)
57 }
58
59 pub fn try_apply_score(&self, now: u32) -> bool {
62 let entires_arc = self.inner.entries.load_full();
63
64 let mut entries = entires_arc.as_ref().clone();
65 let mut entries_changed = false;
66 entries.retain(|x| {
67 let retain = x.is_reliable() && x.expires_at_secs() > now;
68 entries_changed |= !retain;
69 retain
70 });
71 let new_entries_arc = Arc::new(entries);
72
73 let mut lock = self.inner.selection_index.lock();
74
75 {
76 let prev_entires_arc = self
77 .inner
78 .entries
79 .compare_and_swap(&entires_arc, new_entries_arc.clone());
80
81 if !Arc::ptr_eq(&prev_entires_arc, &entires_arc) {
82 return false;
84 }
85 }
86
87 lock.update(new_entries_arc.as_ref());
89
90 if entries_changed {
91 self.inner.changed.notify_waiters();
93 }
94
95 true
96 }
97
98 pub fn update_metrics(&self, local_id: &PeerId) {
100 let entries = self.get_active_neighbours();
101
102 let local_id = local_id.to_string();
103
104 for neighbour in entries.iter() {
105 let peer_id = neighbour.peer_id();
106 let stats = neighbour.get_stats();
107
108 let labels = [
109 ("local_id", local_id.clone()),
110 ("peer_id", peer_id.to_string()),
111 ];
112
113 metrics::gauge!("tycho_core_overlay_client_neighbour_score", &labels)
114 .set(stats.score as f64);
115
116 metrics::gauge!(
117 "tycho_core_overlay_client_neighbour_total_requests",
118 &labels
119 )
120 .set(stats.total_requests as f64);
121
122 metrics::gauge!(
123 "tycho_core_overlay_client_neighbour_failed_requests",
124 &labels
125 )
126 .set(stats.failed_requests as f64);
127 }
128 }
129
130 pub fn get_sorted_neighbours(&self) -> Vec<(Neighbour, u32)> {
131 let mut index = self.inner.selection_index.lock();
132 index
133 .indices_with_weights
134 .sort_by(|(_, lw), (_, rw)| rw.cmp(lw));
135 index.indices_with_weights.clone()
136 }
137
138 pub fn get_active_neighbours(&self) -> Arc<Vec<Neighbour>> {
139 self.inner.entries.load_full()
140 }
141
142 pub fn update(&self, new: Vec<Neighbour>) {
143 let now = tycho_util::time::now_sec();
144
145 let mut new_peer_ids = new
146 .iter()
147 .map(|neighbour| *neighbour.peer_id())
148 .collect::<FastHashSet<_>>();
149
150 let mut entries = self.inner.entries.load().as_slice().to_vec();
151
152 let mut changed = false;
154 entries.retain(|x| {
155 new_peer_ids.remove(x.peer_id());
158
159 let retain = x.is_reliable() && x.expires_at_secs() > now;
160 changed |= !retain;
161 retain
162 });
163
164 if entries.len() >= self.inner.max_neighbours
166 && let Some((worst_index, _)) = entries
167 .iter()
168 .enumerate()
169 .min_by(|(_, l), (_, r)| l.cmp_score(r))
170 {
171 entries.swap_remove(worst_index);
172 changed = true;
173 }
174
175 for neighbour in new {
176 if entries.len() >= self.inner.max_neighbours {
177 break;
178 }
179 if !new_peer_ids.contains(neighbour.peer_id()) {
180 continue;
181 }
182
183 entries.push(neighbour);
184 changed = true;
185 }
186
187 let new_entries_arc = Arc::new(entries);
188 let mut lock = self.inner.selection_index.lock();
189
190 self.inner.entries.store(new_entries_arc.clone());
192 lock.update(new_entries_arc.as_ref());
194
195 if changed {
196 self.inner.changed.notify_waiters();
198 }
199 }
200}
201
202struct Inner {
203 max_neighbours: usize,
204 entries: ArcSwap<Vec<Neighbour>>,
205 selection_index: Mutex<SelectionIndex>,
206 changed: Notify,
207}
208
209struct SelectionIndex {
210 indices_with_weights: Vec<(Neighbour, u32)>,
212 distribution: Option<UniformInt<u32>>,
214}
215
216impl SelectionIndex {
217 fn new(capacity: usize) -> Self {
218 Self {
219 indices_with_weights: Vec::with_capacity(capacity),
220 distribution: None,
221 }
222 }
223
224 fn update(&mut self, neighbours: &[Neighbour]) {
225 self.indices_with_weights.clear();
226 let mut total_weight = 0;
227 for neighbour in neighbours.iter() {
228 if let Some(score) = neighbour.compute_selection_score() {
229 total_weight += score as u32;
230 self.indices_with_weights
231 .push((neighbour.clone(), total_weight));
232 }
233 }
234
235 self.distribution = UniformInt::new(0, total_weight).ok();
236
237 }
239
240 fn choose<R: Rng + ?Sized>(&self, rng: &mut R) -> Option<Neighbour> {
241 let chosen_weight = self.distribution.as_ref()?.sample(rng);
242
243 let i = self
245 .indices_with_weights
246 .partition_point(|(_, w)| *w <= chosen_weight);
247
248 self.indices_with_weights
249 .get(i)
250 .map(|(neighbour, _)| neighbour)
251 .cloned()
252 }
253
254 fn choose_multiple<R: Rng + ?Sized>(
255 &self,
256 rng: &mut R,
257 mut n: usize,
258 neighbour_type: NeighbourType,
259 ) -> Vec<Neighbour> {
260 struct Element<'a> {
261 key: f64,
262 neighbour: &'a Neighbour,
263 }
264
265 impl Eq for Element<'_> {}
266 impl PartialEq for Element<'_> {
267 #[inline]
268 fn eq(&self, other: &Self) -> bool {
269 self.key == other.key
271 }
272 }
273
274 impl PartialOrd for Element<'_> {
275 #[inline]
276 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
277 Some(self.cmp(other))
278 }
279 }
280
281 impl Ord for Element<'_> {
282 #[inline]
283 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
284 self.key.partial_cmp(&other.key).unwrap()
286 }
287 }
288
289 n = std::cmp::min(n, self.indices_with_weights.len());
290 if n == 0 {
291 return Vec::new();
292 }
293
294 let mut candidates = Vec::with_capacity(self.indices_with_weights.len());
295 let mut prev_total_weight = None;
296 for (neighbour, total_weight) in &self.indices_with_weights {
297 let weight = match prev_total_weight {
298 Some(prev) => total_weight - prev,
299 None => *total_weight,
300 };
301 prev_total_weight = Some(*total_weight);
302
303 debug_assert!(weight > 0);
304
305 let key = rng.random::<f64>().powf(1.0 / weight as f64);
306 candidates.push(Element { key, neighbour });
307 }
308
309 let (_, mid, greater) = candidates.select_nth_unstable(self.indices_with_weights.len() - n);
314
315 let mut result = Vec::with_capacity(n);
316
317 let check_reliable = matches!(neighbour_type, NeighbourType::Reliable);
318 let is_valid = |neighbour: &Neighbour| !check_reliable || neighbour.is_reliable();
319
320 if is_valid(mid.neighbour) {
321 result.push(mid.neighbour.clone());
322 }
323
324 for element in greater {
325 if is_valid(element.neighbour) {
326 result.push(element.neighbour.clone());
327 }
328 }
329
330 result
331 }
332}
333
/// Filter applied to neighbours returned by multi-selection.
#[derive(PartialEq, Eq)]
pub enum NeighbourType {
    /// No filtering: every selected neighbour is returned.
    All,
    /// Only neighbours for which `is_reliable()` holds are returned.
    Reliable,
}