shadowsocks_service/local/loadbalancing/
server_stat.rs

1//! Server latency statistic
2
3use std::{
4    collections::VecDeque,
5    time::{Duration, Instant},
6};
7
8/// Interval between each check
9pub const DEFAULT_CHECK_INTERVAL_SEC: u64 = 10;
10/// Timeout of each check
11pub const DEFAULT_CHECK_TIMEOUT_SEC: u64 = 5; // A common connection timeout of 5 seconds.
12
13/// Statistic score
14#[derive(Debug, Copy, Clone)]
15pub enum Score {
16    /// Unified latency
17    Latency(u32),
18    /// Request error
19    Errored,
20}
21
22/// Server statistic data
23#[derive(Debug, Clone, Copy)]
24pub struct ServerStatData {
25    /// Median of latency time (in millisec)
26    ///
27    /// Use median instead of average time,
28    /// because probing result may have some really bad cases
29    pub latency_median: u32,
30    /// Total_Fail / Total_Probe
31    pub fail_rate: f64,
32    /// Score's standard deviation
33    pub latency_stdev: f64,
34    /// Score's average
35    pub latency_mean: f64,
36    /// Score's median absolute deviation
37    pub latency_mad: u32,
38}
39
40/// Statistic of a remote server
41#[derive(Debug)]
42pub struct ServerStat {
43    /// MAX server's RTT, normally the check timeout milliseconds
44    max_server_rtt: u32,
45    /// Recently probe data
46    latency_queue: VecDeque<(Score, Instant)>,
47    /// Score's standard deviation MAX
48    max_latency_stdev: f64,
49    /// User's customized weight
50    user_weight: f32,
51    /// Checking window size
52    check_window: Duration,
53    /// Statistic Data
54    data: ServerStatData,
55}
56
57fn max_latency_stdev(max_server_rtt: u32) -> f64 {
58    let mrtt = max_server_rtt as f64;
59    let avg = (0.0 + mrtt) / 2.0;
60    let diff1 = (0.0 - avg) * (0.0 - avg);
61    let diff2 = (mrtt - avg) * (mrtt - avg);
62    // (1.0 / (2.0 - 1.0)) * (diff1 + diff2).sqrt()
63    (diff1 + diff2).sqrt()
64}
65
66impl ServerStat {
67    pub fn new(user_weight: f32, max_server_rtt: u32, check_window: Duration) -> Self {
68        assert!((0.0..=1.0).contains(&user_weight));
69
70        let max_latency_stdev = max_latency_stdev(max_server_rtt);
71        Self {
72            max_server_rtt,
73            latency_queue: VecDeque::new(),
74            max_latency_stdev,
75            user_weight,
76            check_window,
77            data: ServerStatData {
78                latency_median: max_server_rtt,
79                fail_rate: 1.0,
80                latency_stdev: max_latency_stdev,
81                latency_mean: max_server_rtt as f64,
82                latency_mad: max_server_rtt,
83            },
84        }
85    }
86
87    fn score(&self) -> u32 {
88        // Normalize rtt
89        let nrtt = self.data.latency_median as f64 / self.max_server_rtt as f64;
90
91        // Normalize stdev
92        // let nstdev = self.data.latency_stdev / self.max_latency_stdev;
93        // Mormalize mad
94        let nmad = self.data.latency_mad as f64 / self.max_server_rtt as f64;
95
96        const SCORE_RTT_WEIGHT: f64 = 1.0;
97        const SCORE_FAIL_WEIGHT: f64 = 3.0;
98        // const SCORE_STDEV_WEIGHT: f64 = 0.0;
99        const SCORE_MAD_WEIGHT: f64 = 1.0;
100
101        // [EPSILON, 1]
102        // Just for avoiding divide by 0
103        let user_weight = self.user_weight.max(f32::EPSILON);
104
105        // Score = (norm_lat * 1.0 + prop_err * 3.0 + (stdev || mad) * 1.0) / 5.0 / user_weight
106        //
107        // 1. The lower latency, the better
108        // 2. The lower errored count, the better
109        // 3. The lower latency's stdev / mad, the better
110        // 4. The higher user's weight, the better
111        let score = (nrtt * SCORE_RTT_WEIGHT + self.data.fail_rate * SCORE_FAIL_WEIGHT + nmad * SCORE_MAD_WEIGHT)
112            / (SCORE_RTT_WEIGHT + SCORE_FAIL_WEIGHT + SCORE_MAD_WEIGHT)
113            / user_weight as f64;
114
115        // Times 10000 converts to u32, for 0.0001 precision
116        (score * 10000.0) as u32
117    }
118
119    pub fn push_score(&mut self, score: Score) -> u32 {
120        let now = Instant::now();
121
122        self.latency_queue.push_back((score, now));
123
124        // Removes stats that are not in the check window
125        while let Some((_, inst)) = self.latency_queue.front() {
126            if now - *inst > self.check_window {
127                self.latency_queue.pop_front();
128            } else {
129                break;
130            }
131        }
132
133        self.recalculate_score()
134    }
135
136    fn recalculate_score(&mut self) -> u32 {
137        if self.latency_queue.is_empty() {
138            return self.score();
139        }
140
141        let mut vlat = Vec::with_capacity(self.latency_queue.len());
142        let mut cerr = 0;
143        for (s, _) in &self.latency_queue {
144            match *s {
145                Score::Errored => cerr += 1,
146                Score::Latency(lat) => vlat.push(lat),
147            }
148        }
149
150        // Error rate
151        self.data.fail_rate = cerr as f64 / self.latency_queue.len() as f64;
152
153        self.data.latency_median = self.max_server_rtt;
154        self.data.latency_stdev = self.max_latency_stdev;
155        self.data.latency_mean = self.max_server_rtt as f64;
156        self.data.latency_mad = self.max_server_rtt;
157
158        if !vlat.is_empty() {
159            vlat.sort_unstable();
160
161            // Find median of latency
162            let mid = vlat.len() / 2;
163
164            self.data.latency_median = if vlat.len() % 2 == 0 {
165                (vlat[mid] + vlat[mid - 1]) / 2
166            } else {
167                vlat[mid]
168            };
169
170            if vlat.len() > 1 {
171                let n = vlat.len() as f64;
172
173                // mean
174                let total_lat: u32 = vlat.iter().sum();
175                self.data.latency_mean = total_lat as f64 / n;
176
177                // STDEV
178                let acc_mean_diff_square: f64 = vlat
179                    .iter()
180                    .map(|s| {
181                        let diff = *s as f64 - self.data.latency_mean;
182                        diff * diff
183                    })
184                    .sum();
185                // Corrected Sample Standard Deviation
186                self.data.latency_stdev = (acc_mean_diff_square / (n - 1.0)).sqrt();
187
188                // MAD
189                let mut vlat_abs_diff: Vec<u32> = vlat
190                    .iter()
191                    .map(|s| (*s as i32 - self.data.latency_median as i32).unsigned_abs())
192                    .collect();
193                vlat_abs_diff.sort_unstable();
194
195                let abs_diff_median_mid = vlat_abs_diff.len() / 2;
196                self.data.latency_mad = if vlat_abs_diff.len().is_multiple_of(2) {
197                    (vlat_abs_diff[abs_diff_median_mid] + vlat_abs_diff[abs_diff_median_mid - 1]) / 2
198                } else {
199                    vlat_abs_diff[abs_diff_median_mid]
200                };
201            } else {
202                self.data.latency_mean = vlat[0] as f64;
203                self.data.latency_mad = 0;
204            }
205        }
206
207        self.score()
208    }
209
210    pub fn data(&self) -> &ServerStatData {
211        &self.data
212    }
213}