Skip to main content

tycho_core/overlay_client/
neighbour.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use parking_lot::RwLock;
5use tycho_network::PeerId;
6use tycho_util::time::now_sec;
7
8#[derive(Clone)]
9#[repr(transparent)]
10pub struct Neighbour {
11    inner: Arc<Inner>,
12}
13
14impl Neighbour {
15    pub fn new(peer_id: PeerId, expires_at: u32, default_roundtrip: &Duration) -> Self {
16        Self {
17            inner: Arc::new(Inner {
18                peer_id,
19                expires_at,
20                stats: RwLock::new(TrackedStats::new(truncate_time(default_roundtrip))),
21            }),
22        }
23    }
24
25    #[inline]
26    pub fn peer_id(&self) -> &PeerId {
27        &self.inner.peer_id
28    }
29
30    #[inline]
31    pub fn expires_at_secs(&self) -> u32 {
32        self.inner.expires_at
33    }
34
35    pub fn get_stats(&self) -> NeighbourStats {
36        let stats = self.inner.stats.read();
37        NeighbourStats {
38            score: stats.score,
39            total_requests: stats.total,
40            failed_requests: stats.failed,
41            avg_roundtrip: stats
42                .roundtrip
43                .get_avg()
44                .map(|avg| Duration::from_millis(avg as u64)),
45            created: stats.created,
46        }
47    }
48
49    pub fn cmp_score(&self, other: &Neighbour) -> std::cmp::Ordering {
50        let own_stats = self.inner.stats.read().score;
51        let other_stats = other.inner.stats.read().score;
52        own_stats.cmp(&other_stats)
53    }
54
55    pub fn is_reliable(&self) -> bool {
56        self.inner.stats.read().higher_than_threshold()
57    }
58
59    pub fn compute_selection_score(&self) -> Option<u8> {
60        self.inner.stats.read().compute_selection_score()
61    }
62
63    pub fn get_roundtrip(&self) -> Option<Duration> {
64        let roundtrip = self.inner.stats.read().roundtrip.get_avg()?;
65        Some(Duration::from_millis(roundtrip as u64))
66    }
67
68    pub fn track_request(&self, roundtrip: &Duration, success: bool) {
69        let roundtrip = truncate_time(roundtrip);
70        self.inner.stats.write().update(roundtrip, success);
71    }
72
73    pub fn punish(&self, reason: PunishReason) {
74        self.inner.stats.write().punish(reason);
75    }
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
79pub enum PunishReason {
80    Dumb,
81    Slow,
82    Malicious,
83}
84
85impl PunishReason {
86    pub fn score(self) -> u8 {
87        match self {
88            Self::Dumb => 4,
89            Self::Slow => 8,
90            Self::Malicious => 128,
91        }
92    }
93}
94
95/// Neighbour request statistics.
96#[derive(Debug, Clone)]
97pub struct NeighbourStats {
98    /// Current reliability score.
99    pub score: u8,
100    /// Total number of requests to the neighbour.
101    pub total_requests: u64,
102    /// The number of failed requests to the neighbour.
103    pub failed_requests: u64,
104    /// Average roundtrip.
105    /// NONE if there were no requests to the neighbour.
106    pub avg_roundtrip: Option<Duration>,
107    /// Neighbour first appearance
108    pub created: u32,
109}
110
111struct Inner {
112    peer_id: PeerId,
113    expires_at: u32,
114    stats: parking_lot::RwLock<TrackedStats>,
115}
116
117struct TrackedStats {
118    score: u8,
119    total: u64,
120    failed: u64,
121    failed_requests_history: u64,
122    roundtrip: PackedSmaBuffer,
123    created: u32,
124}
125
126impl TrackedStats {
127    const MAX_SCORE: u8 = 128;
128    const SCORE_THRESHOLD: u8 = 16;
129    const INITIAL_SCORE: u8 = Self::MAX_SCORE / 2;
130
131    fn new(default_roundtrip_ms: u16) -> Self {
132        Self {
133            score: Self::INITIAL_SCORE,
134            total: 0,
135            failed: 0,
136            failed_requests_history: 0,
137            roundtrip: PackedSmaBuffer(default_roundtrip_ms as u64),
138            created: now_sec(),
139        }
140    }
141
142    fn higher_than_threshold(&self) -> bool {
143        self.score >= TrackedStats::SCORE_THRESHOLD
144    }
145
146    fn compute_selection_score(&self) -> Option<u8> {
147        const OK_ROUNDTRIP: u16 = 160; // ms
148        const MAX_ROUNDTRIP_BONUS: u8 = 16;
149        const ROUNDTRIP_BONUS_THRESHOLD: u8 = 120;
150
151        const MAX_FAILED_REQUESTS: u8 = 4;
152        const FAILURE_PENALTY: u8 = 16;
153
154        const FAILED_REQUESTS_MASK: u64 = (1 << MAX_FAILED_REQUESTS) - 1;
155
156        let mut score = self.score;
157        if self.failed_requests_history & FAILED_REQUESTS_MASK == FAILED_REQUESTS_MASK {
158            // Reduce the score if there were several sequential failures
159            score = score.saturating_sub(FAILURE_PENALTY);
160        } else if score >= ROUNDTRIP_BONUS_THRESHOLD {
161            // Try to compute a score bonus for neighbours with short roundtrip
162            if let Some(avg) = self.roundtrip.get_avg() {
163                let max = OK_ROUNDTRIP;
164                if let Some(inv_avg) = max.checked_sub(avg) {
165                    // Scale bonus
166                    let bonus = (inv_avg * MAX_ROUNDTRIP_BONUS as u16 / max) as u8;
167                    score = score.saturating_add(std::cmp::max(bonus, 1));
168                }
169            }
170        }
171
172        (score >= Self::SCORE_THRESHOLD).then_some(score)
173    }
174
175    fn update(&mut self, roundtrip: u16, success: bool) {
176        const SUCCESS_REQUEST_SCORE: u8 = 8;
177        const FAILED_REQUEST_PENALTY: u8 = 8;
178
179        self.failed_requests_history <<= 1;
180        if success {
181            self.score = std::cmp::min(
182                self.score.saturating_add(SUCCESS_REQUEST_SCORE),
183                Self::MAX_SCORE,
184            );
185        } else {
186            self.score = self.score.saturating_sub(FAILED_REQUEST_PENALTY);
187            self.failed += 1;
188            self.failed_requests_history |= 1;
189        }
190        self.total += 1;
191
192        let roundtrip_buffer = &mut self.roundtrip;
193        roundtrip_buffer.add(roundtrip);
194    }
195
196    fn punish(&mut self, reason: PunishReason) {
197        self.score = self.score.saturating_sub(reason.score());
198    }
199}
200
201#[repr(transparent)]
202struct PackedSmaBuffer(u64);
203
204impl PackedSmaBuffer {
205    fn add(&mut self, value: u16) {
206        self.0 <<= 16;
207        self.0 |= value as u64;
208    }
209
210    fn get_avg(&self) -> Option<u16> {
211        let mut storage = self.0;
212        let mut total = 0;
213        let mut i = 0;
214        while storage > 0 {
215            total += storage & 0xffff;
216            storage >>= 16;
217            i += 1;
218        }
219
220        if i == 0 {
221            None
222        } else {
223            Some((total / i) as u16)
224        }
225    }
226}
227
228fn truncate_time(roundtrip: &Duration) -> u16 {
229    std::cmp::min(roundtrip.as_millis() as u64, u16::MAX as u64) as u16
230}