octoproxy_lib/
metric.rs

1use std::{
2    borrow::Cow,
3    collections::HashMap,
4    fmt::Display,
5    net::SocketAddr,
6    time::{Duration, Instant},
7};
8
9use serde::{Deserialize, Serialize};
10
/// Description of one backend together with its recorded metrics.
///
/// This is the item type carried by `MetricApiResp::AllBackends`. The string
/// fields are `Cow`s, so a value may either borrow its text (lifetime `'a`)
/// or own it.
#[derive(Serialize, Deserialize)]
pub struct BackendMetric<'a> {
    /// Name of the backend.
    pub backend_name: Cow<'a, str>,
    /// Domain of the backend.
    /// NOTE(review): presumably the host name used to reach it — confirm.
    pub domain: Cow<'a, str>,
    /// Address of the backend, kept as text rather than as a parsed type.
    pub address: Cow<'a, str>,
    /// Counters and state recorded for this backend.
    pub metric: MetricData,
}
18
/// Status of a backend.
///
/// Variants are (de)serialized in `snake_case` (for example `ForceClosed`
/// becomes `force_closed`). `Normal` is the `Default` value.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum BackendStatus {
    /// Default status.
    #[default]
    Normal,
    /// A `Closed` status will be changed back to `Normal`.
    Closed,
    /// NOTE(review): presumably "status not determined yet" — confirm.
    Unknown,
    /// Set manually; the backend will never come back up automatically.
    ForceClosed,
}
30
/// Protocol recorded for a backend.
///
/// Variants are (de)serialized in lowercase (`http2`, `quic`). `Quic` is the
/// `Default` value.
#[derive(Debug, Deserialize, Serialize, Default, Clone, Copy)]
#[serde(rename_all = "lowercase")]
pub enum BackendProtocol {
    /// HTTP/2; serialized as `http2`.
    HTTP2,
    /// QUIC; serialized as `quic`. This is the default.
    #[default]
    Quic,
}
38
39impl Display for BackendProtocol {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        let s = match self {
42            BackendProtocol::HTTP2 => "http2",
43            BackendProtocol::Quic => "quic",
44        };
45
46        write!(f, "{:<12}", s)
47    }
48}
49
50impl Display for BackendStatus {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        let s = match self {
53            BackendStatus::Normal => "NORMAL",
54            BackendStatus::Closed => "CLOSED",
55            BackendStatus::Unknown => "UNKNOWN",
56            BackendStatus::ForceClosed => "FORCECLOSED",
57        };
58        write!(f, "{:<12}", s)
59    }
60}
61
/// Counters and state recorded for one backend.
///
/// Every field except `peak_ewma` is (de)serialized. `peak_ewma` is skipped,
/// so a deserialized value gets a fresh `PeakEWMA::default()` instead.
#[derive(Clone, Default, Serialize, Deserialize)]
pub struct MetricData {
    /// Number of active connections.
    pub active_connections: u32,
    /// Protocol recorded for the backend.
    pub protocol: BackendProtocol,

    /// Per-peer counter keyed by the peer's socket address. A missing field
    /// deserializes to an empty map.
    /// NOTE(review): presumably the active connection count per peer — confirm.
    #[serde(default)]
    pub active_peers: HashMap<SocketAddr, u32>,
    /// Current status of the backend.
    pub status: BackendStatus,
    /// NOTE(review): presumably the time taken to establish a connection —
    /// confirm against the code that writes it.
    pub connection_time: Duration,

    /// Latency estimator; never serialized, reset to its default on
    /// deserialization.
    #[serde(skip)]
    pub peak_ewma: PeakEWMA,
    /// NOTE(review): presumably time spent transferring data — confirm.
    pub transmission_time: Duration,
    /// Incoming byte counter.
    /// NOTE(review): how this differs from `total_incoming_bytes` (e.g. a
    /// resettable window vs. a lifetime total) is not visible here — confirm.
    pub incoming_bytes: u64,
    /// Outgoing byte counter; see the note on `incoming_bytes`.
    pub outgoing_bytes: u64,
    /// Total incoming bytes.
    pub total_incoming_bytes: u64,
    /// Total outgoing bytes.
    pub total_outgoing_bytes: u64,
    /// Failure counter.
    pub failures: u32,
}
81
/// Copy from Sozu: <https://github.com/sozu-proxy/sozu/blob/c92a92e47ce79a9dfe23530244dc0ce8604acbcd/lib/src/lib.rs#L975>
/// Exponentially weighted moving average with high sensitivity to latency bursts.
///
/// A sample higher than the current estimate replaces it immediately (the
/// "peak" part); lower samples only pull the estimate down gradually,
/// according to how much time has passed relative to `decay`.
///
/// cf Finagle for the original implementation: <https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala>
#[derive(Debug, PartialEq, Clone)]
pub struct PeakEWMA {
    /// Decay window in nanoseconds.
    ///
    /// Higher values make the EWMA decay more slowly towards 0.
    pub decay: f64,
    /// Estimated RTT in nanoseconds.
    ///
    /// Must start at a high enough default value so that new backends do not
    /// get all the traffic right away.
    pub rtt: f64,
    /// Instant of the last modification of `rtt`.
    pub last_event: Instant,
}

impl Default for PeakEWMA {
    /// Same as [`PeakEWMA::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl PeakEWMA {
    /// Creates an estimator with the hardcoded defaults: a 1 s decay window
    /// and an initial RTT estimate of 50 ms, timestamped "now".
    pub fn new() -> Self {
        PeakEWMA {
            // 1s
            decay: 1_000_000_000f64,
            // 50ms
            rtt: 50_000_000f64,
            last_event: Instant::now(),
        }
    }

    /// Returns `e^(-elapsed / decay)`, where `elapsed` is the time between
    /// `last_event` and `now` in nanoseconds.
    ///
    /// The elapsed time saturates at zero, so a `last_event` that lies after
    /// `now` can never cause a panic.
    fn decay_weight(&self, now: Instant) -> f64 {
        let elapsed = now.saturating_duration_since(self.last_event);
        (-(elapsed.as_nanos() as f64) / self.decay).exp()
    }

    /// Returns `active_requests + 1` as an `f64`.
    ///
    /// The addition is done in floating point: in `u32` it would overflow for
    /// `u32::MAX` (panic in debug builds, wrap to 0 in release builds).
    fn load_factor(active_requests: u32) -> f64 {
        f64::from(active_requests) + 1.0
    }

    /// Feeds one RTT sample (in nanoseconds) into the estimator.
    ///
    /// * A sample above the current estimate replaces it immediately.
    /// * Otherwise the estimate moves towards the sample:
    ///   `new_rtt = old_rtt * w + sample * (1 - w)` with
    ///   `w = e^(-elapsed / decay)`.
    ///
    /// NaN samples are ignored entirely: once stored, a NaN could never be
    /// replaced again because every later comparison against it is false.
    pub fn observe(&mut self, rtt: f64) {
        if rtt.is_nan() {
            return;
        }

        let now = Instant::now();

        // if latency is rising, we will immediately raise the cost
        if rtt > self.rtt {
            self.rtt = rtt;
        } else {
            // new_rtt = old_rtt * e^(-elapsed/decay) + observed_rtt * (1 - e^(-elapsed/decay))
            let weight = self.decay_weight(now);
            self.rtt = self.rtt * weight + rtt * (1.0 - weight);
        }

        self.last_event = now;
    }

    /// Returns the current cost, `(active_requests + 1) * rtt`, with `rtt`
    /// decayed by the time elapsed since `last_event`.
    ///
    /// The stored state is left untouched.
    pub fn get(&self, active_requests: u32) -> f64 {
        let weight = self.decay_weight(Instant::now());
        Self::load_factor(active_requests) * self.rtt * weight
    }

    /// Decays the stored estimate in place, then returns
    /// `(active_requests + 1) * rtt`.
    pub fn get_mut(&mut self, active_requests: u32) -> f64 {
        // decay the current value
        // (we might not have seen a request in a long time)
        self.observe(0.0);

        Self::load_factor(active_requests) * self.rtt
    }
}
150
/// Requests accepted by the metric API.
///
/// (De)serialized with an internal tag: the variant name is stored in a
/// `type` field next to the variant's own fields.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MetricApiReq {
    /// Switch the status of the backend identified by `backend_id`.
    /// NOTE(review): which statuses it switches between is decided by the
    /// handler, not visible here.
    SwitchBackendStatus { backend_id: usize },
    /// Switch the protocol of the backend identified by `backend_id`.
    SwitchBackendProtocol { backend_id: usize },
    /// Reset the backend identified by `backend_id`.
    ResetBackend { backend_id: usize },
    /// Ask for the metrics of all backends.
    AllBackends,
}
159
/// Responses produced by the metric API; the first four variants mirror the
/// variants of `MetricApiReq` by name.
///
/// (De)serialized with an internal tag: the variant name is stored in a
/// `type` field next to the variant's own fields.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MetricApiResp<'a> {
    /// Reply to `MetricApiReq::SwitchBackendStatus`; carries no data.
    SwitchBackendStatus,
    /// Reply to `MetricApiReq::SwitchBackendProtocol`; carries no data.
    SwitchBackendProtocol,
    /// Reply to `MetricApiReq::ResetBackend`; carries no data.
    ResetBackend,
    /// Reply to `MetricApiReq::AllBackends`: one entry per backend.
    AllBackends { items: Vec<BackendMetric<'a>> },
    /// The request failed; `msg` holds the error text.
    Error { msg: String },
}