tycho_core/overlay_client/
neighbour.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use parking_lot::RwLock;
5use tycho_network::PeerId;
6use tycho_util::time::now_sec;
7
8#[derive(Clone)]
9#[repr(transparent)]
10pub struct Neighbour {
11 inner: Arc<Inner>,
12}
13
14impl Neighbour {
15 pub fn new(peer_id: PeerId, expires_at: u32, default_roundtrip: &Duration) -> Self {
16 Self {
17 inner: Arc::new(Inner {
18 peer_id,
19 expires_at,
20 stats: RwLock::new(TrackedStats::new(truncate_time(default_roundtrip))),
21 }),
22 }
23 }
24
25 #[inline]
26 pub fn peer_id(&self) -> &PeerId {
27 &self.inner.peer_id
28 }
29
30 #[inline]
31 pub fn expires_at_secs(&self) -> u32 {
32 self.inner.expires_at
33 }
34
35 pub fn get_stats(&self) -> NeighbourStats {
36 let stats = self.inner.stats.read();
37 NeighbourStats {
38 score: stats.score,
39 total_requests: stats.total,
40 failed_requests: stats.failed,
41 avg_roundtrip: stats
42 .roundtrip
43 .get_avg()
44 .map(|avg| Duration::from_millis(avg as u64)),
45 created: stats.created,
46 }
47 }
48
49 pub fn cmp_score(&self, other: &Neighbour) -> std::cmp::Ordering {
50 let own_stats = self.inner.stats.read().score;
51 let other_stats = other.inner.stats.read().score;
52 own_stats.cmp(&other_stats)
53 }
54
55 pub fn is_reliable(&self) -> bool {
56 self.inner.stats.read().higher_than_threshold()
57 }
58
59 pub fn compute_selection_score(&self) -> Option<u8> {
60 self.inner.stats.read().compute_selection_score()
61 }
62
63 pub fn get_roundtrip(&self) -> Option<Duration> {
64 let roundtrip = self.inner.stats.read().roundtrip.get_avg()?;
65 Some(Duration::from_millis(roundtrip as u64))
66 }
67
68 pub fn track_request(&self, roundtrip: &Duration, success: bool) {
69 let roundtrip = truncate_time(roundtrip);
70 self.inner.stats.write().update(roundtrip, success);
71 }
72
73 pub fn punish(&self, reason: PunishReason) {
74 self.inner.stats.write().punish(reason);
75 }
76}
77
/// Why a neighbour is being punished; each reason maps to a fixed score
/// penalty via [`PunishReason::score`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PunishReason {
    /// Smallest penalty (4).
    Dumb,
    /// Medium penalty (8).
    Slow,
    /// Largest penalty (128).
    Malicious,
}

impl PunishReason {
    /// Returns the number of score points to subtract for this reason.
    pub fn score(self) -> u8 {
        const DUMB_PENALTY: u8 = 4;
        const SLOW_PENALTY: u8 = 8;
        const MALICIOUS_PENALTY: u8 = 128;

        match self {
            PunishReason::Dumb => DUMB_PENALTY,
            PunishReason::Slow => SLOW_PENALTY,
            PunishReason::Malicious => MALICIOUS_PENALTY,
        }
    }
}
94
/// Point-in-time copy of the statistics tracked for a neighbour.
#[derive(Debug, Clone)]
pub struct NeighbourStats {
    /// Current raw score of the neighbour.
    pub score: u8,
    /// Number of requests recorded so far, successful or not.
    pub total_requests: u64,
    /// Number of recorded requests that failed.
    pub failed_requests: u64,
    /// Average of the buffered roundtrip samples, with millisecond
    /// resolution; `None` when the buffer holds no samples.
    pub avg_roundtrip: Option<Duration>,
    /// Value of `now_sec()` at the moment the statistics were created.
    // NOTE(review): presumably unix time in seconds — confirm against `tycho_util`.
    pub created: u32,
}
110
111struct Inner {
112 peer_id: PeerId,
113 expires_at: u32,
114 stats: parking_lot::RwLock<TrackedStats>,
115}
116
117struct TrackedStats {
118 score: u8,
119 total: u64,
120 failed: u64,
121 failed_requests_history: u64,
122 roundtrip: PackedSmaBuffer,
123 created: u32,
124}
125
126impl TrackedStats {
127 const MAX_SCORE: u8 = 128;
128 const SCORE_THRESHOLD: u8 = 16;
129 const INITIAL_SCORE: u8 = Self::MAX_SCORE / 2;
130
131 fn new(default_roundtrip_ms: u16) -> Self {
132 Self {
133 score: Self::INITIAL_SCORE,
134 total: 0,
135 failed: 0,
136 failed_requests_history: 0,
137 roundtrip: PackedSmaBuffer(default_roundtrip_ms as u64),
138 created: now_sec(),
139 }
140 }
141
142 fn higher_than_threshold(&self) -> bool {
143 self.score >= TrackedStats::SCORE_THRESHOLD
144 }
145
146 fn compute_selection_score(&self) -> Option<u8> {
147 const OK_ROUNDTRIP: u16 = 160; const MAX_ROUNDTRIP_BONUS: u8 = 16;
149 const ROUNDTRIP_BONUS_THRESHOLD: u8 = 120;
150
151 const MAX_FAILED_REQUESTS: u8 = 4;
152 const FAILURE_PENALTY: u8 = 16;
153
154 const FAILED_REQUESTS_MASK: u64 = (1 << MAX_FAILED_REQUESTS) - 1;
155
156 let mut score = self.score;
157 if self.failed_requests_history & FAILED_REQUESTS_MASK == FAILED_REQUESTS_MASK {
158 score = score.saturating_sub(FAILURE_PENALTY);
160 } else if score >= ROUNDTRIP_BONUS_THRESHOLD {
161 if let Some(avg) = self.roundtrip.get_avg() {
163 let max = OK_ROUNDTRIP;
164 if let Some(inv_avg) = max.checked_sub(avg) {
165 let bonus = (inv_avg * MAX_ROUNDTRIP_BONUS as u16 / max) as u8;
167 score = score.saturating_add(std::cmp::max(bonus, 1));
168 }
169 }
170 }
171
172 (score >= Self::SCORE_THRESHOLD).then_some(score)
173 }
174
175 fn update(&mut self, roundtrip: u16, success: bool) {
176 const SUCCESS_REQUEST_SCORE: u8 = 8;
177 const FAILED_REQUEST_PENALTY: u8 = 8;
178
179 self.failed_requests_history <<= 1;
180 if success {
181 self.score = std::cmp::min(
182 self.score.saturating_add(SUCCESS_REQUEST_SCORE),
183 Self::MAX_SCORE,
184 );
185 } else {
186 self.score = self.score.saturating_sub(FAILED_REQUEST_PENALTY);
187 self.failed += 1;
188 self.failed_requests_history |= 1;
189 }
190 self.total += 1;
191
192 let roundtrip_buffer = &mut self.roundtrip;
193 roundtrip_buffer.add(roundtrip);
194 }
195
196 fn punish(&mut self, reason: PunishReason) {
197 self.score = self.score.saturating_sub(reason.score());
198 }
199}
200
/// Up to four `u16` samples packed into one `u64`, newest sample in the
/// lowest 16 bits.
///
/// There is no separate length: a slot counts as occupied only if it, or any
/// slot above it, is non-zero. Zero-valued samples in the highest positions
/// are therefore indistinguishable from empty slots.
#[repr(transparent)]
struct PackedSmaBuffer(u64);

impl PackedSmaBuffer {
    /// Pushes `value` as the newest sample, dropping the oldest of the four.
    fn add(&mut self, value: u16) {
        self.0 = (self.0 << 16) | u64::from(value);
    }

    /// Returns the integer average of the occupied slots, or `None` when the
    /// buffer is all zeros.
    fn get_avg(&self) -> Option<u16> {
        const SLOT_BITS: u32 = 16;
        const SLOT_COUNT: u32 = u64::BITS / SLOT_BITS;

        // Walk the slots from newest to oldest and stop as soon as nothing
        // but zeros remains above the current position.
        let (sum, samples) = (0..SLOT_COUNT)
            .map(|slot| self.0 >> (slot * SLOT_BITS))
            .take_while(|&remaining| remaining != 0)
            .fold((0u64, 0u64), |(sum, samples), remaining| {
                (sum + (remaining & 0xffff), samples + 1)
            });

        (samples != 0).then(|| (sum / samples) as u16)
    }
}
227
/// Converts `roundtrip` to whole milliseconds, saturating at `u16::MAX`.
///
/// Sub-millisecond remainders are discarded. The conversion is done on the
/// full `u128` returned by `Duration::as_millis`, so extremely long durations
/// saturate instead of wrapping around to a small value.
fn truncate_time(roundtrip: &Duration) -> u16 {
    u16::try_from(roundtrip.as_millis()).unwrap_or(u16::MAX)
}