Skip to main content

sozu_command_lib/
request.rs

1use std::{
2    error,
3    fmt::{self, Display},
4    fs::File,
5    io::{BufReader, Read},
6    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
7    str::FromStr,
8};
9
10use prost::{DecodeError, Message};
11use rusty_ulid::Ulid;
12
13use crate::{
14    proto::{
15        command::{
16            InitialState, IpAddress, LoadBalancingAlgorithms, PathRuleKind, Request,
17            RequestHttpFrontend, RulePosition, SocketAddress, Uint128, WorkerRequest, ip_address,
18            request::RequestType,
19        },
20        display::format_request_type,
21    },
22    response::HttpFrontend,
23};
24
25#[derive(thiserror::Error, Debug)]
26pub enum RequestError {
27    #[error("invalid value {value} for field '{name}'")]
28    InvalidValue { name: String, value: i32 },
29    #[error("Could not read requests from file: {0}")]
30    ReadFile(std::io::Error),
31    #[error("Could not decode requests: {0}")]
32    Decode(DecodeError),
33}
34
35impl Request {
36    /// determine to which of the three proxies (HTTP, HTTPS, TCP) a request is destined
37    pub fn get_destinations(&self) -> ProxyDestinations {
38        let mut proxy_destination = ProxyDestinations {
39            to_http_proxy: false,
40            to_https_proxy: false,
41            to_tcp_proxy: false,
42        };
43        let request_type = match &self.request_type {
44            Some(t) => t,
45            None => return proxy_destination,
46        };
47
48        match request_type {
49            RequestType::AddHttpFrontend(_) | RequestType::RemoveHttpFrontend(_) => {
50                proxy_destination.to_http_proxy = true
51            }
52
53            RequestType::AddHttpsFrontend(_)
54            | RequestType::RemoveHttpsFrontend(_)
55            | RequestType::AddCertificate(_)
56            | RequestType::QueryCertificatesFromWorkers(_)
57            | RequestType::ReplaceCertificate(_)
58            | RequestType::RemoveCertificate(_) => proxy_destination.to_https_proxy = true,
59
60            RequestType::AddTcpFrontend(_) | RequestType::RemoveTcpFrontend(_) => {
61                proxy_destination.to_tcp_proxy = true
62            }
63
64            RequestType::AddCluster(_)
65            | RequestType::AddBackend(_)
66            | RequestType::RemoveCluster(_)
67            | RequestType::RemoveBackend(_)
68            | RequestType::SetHealthCheck(_)
69            | RequestType::RemoveHealthCheck(_)
70            | RequestType::SoftStop(_)
71            | RequestType::HardStop(_)
72            | RequestType::Status(_) => {
73                proxy_destination.to_http_proxy = true;
74                proxy_destination.to_https_proxy = true;
75                proxy_destination.to_tcp_proxy = true;
76            }
77
78            // handled at worker level prior to this call
79            RequestType::ConfigureMetrics(_)
80            | RequestType::SetMetricDetail(_)
81            | RequestType::QueryMetrics(_)
82            | RequestType::Logging(_)
83            | RequestType::QueryClustersHashes(_)
84            | RequestType::QueryClusterById(_)
85            | RequestType::QueryClustersByDomain(_)
86            | RequestType::SetMaxConnectionsPerIp(_)
87            | RequestType::QueryMaxConnectionsPerIp(_) => {}
88
89            // the Add***Listener / Update***Listener and other Listener orders will be
90            // handled separately by the notify_proxys function, so we don't give them
91            // destinations here
92            RequestType::AddHttpsListener(_)
93            | RequestType::AddHttpListener(_)
94            | RequestType::AddTcpListener(_)
95            | RequestType::UpdateHttpListener(_)
96            | RequestType::UpdateHttpsListener(_)
97            | RequestType::UpdateTcpListener(_)
98            | RequestType::RemoveListener(_)
99            | RequestType::ActivateListener(_)
100            | RequestType::DeactivateListener(_)
101            | RequestType::ReturnListenSockets(_) => {}
102
103            // These won't ever reach a worker anyway
104            RequestType::SaveState(_)
105            | RequestType::CountRequests(_)
106            | RequestType::QueryCertificatesFromTheState(_)
107            | RequestType::QueryHealthChecks(_)
108            | RequestType::LoadState(_)
109            | RequestType::ListWorkers(_)
110            | RequestType::ListFrontends(_)
111            | RequestType::ListListeners(_)
112            | RequestType::LaunchWorker(_)
113            | RequestType::UpgradeMain(_)
114            | RequestType::UpgradeWorker(_)
115            | RequestType::SubscribeEvents(_)
116            | RequestType::ReloadConfiguration(_) => {}
117        }
118        proxy_destination
119    }
120
121    /// True if the request is a SoftStop or a HardStop
122    pub fn is_a_stop(&self) -> bool {
123        matches!(
124            self.request_type,
125            Some(RequestType::SoftStop(_)) | Some(RequestType::HardStop(_))
126        )
127    }
128
129    pub fn short_name(&self) -> &str {
130        match &self.request_type {
131            Some(request_type) => format_request_type(request_type),
132            None => "Unallowed",
133        }
134    }
135}
136
137impl WorkerRequest {
138    pub fn new(id: String, content: Request) -> Self {
139        Self { id, content }
140    }
141}
142
143impl fmt::Display for WorkerRequest {
144    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
145        write!(f, "{}-{:?}", self.id, self.content)
146    }
147}
148
149pub fn read_initial_state_from_file(file: &mut File) -> Result<InitialState, RequestError> {
150    let mut buf_reader = BufReader::new(file);
151    read_initial_state(&mut buf_reader)
152}
153
154pub fn read_initial_state<R: Read>(reader: &mut R) -> Result<InitialState, RequestError> {
155    let mut buffer = Vec::new();
156    reader
157        .read_to_end(&mut buffer)
158        .map_err(RequestError::ReadFile)?;
159
160    InitialState::decode(&buffer[..]).map_err(RequestError::Decode)
161}
162
/// Set of proxies a request has to be forwarded to.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProxyDestinations {
    /// the request concerns the HTTP proxy
    pub to_http_proxy: bool,
    /// the request concerns the HTTPS proxy
    pub to_https_proxy: bool,
    /// the request concerns the TCP proxy
    pub to_tcp_proxy: bool,
}
169
170impl RequestHttpFrontend {
171    /// convert a requested frontend to a usable one by parsing its address
172    pub fn to_frontend(self) -> Result<HttpFrontend, RequestError> {
173        Ok(HttpFrontend {
174            address: self.address.into(),
175            cluster_id: self.cluster_id,
176            hostname: self.hostname,
177            path: self.path,
178            method: self.method,
179            position: RulePosition::try_from(self.position).map_err(|_| {
180                RequestError::InvalidValue {
181                    name: "position".to_string(),
182                    value: self.position,
183                }
184            })?,
185            tags: Some(self.tags),
186            redirect: self.redirect,
187            redirect_scheme: self.redirect_scheme,
188            redirect_template: self.redirect_template,
189            rewrite_host: self.rewrite_host,
190            rewrite_path: self.rewrite_path,
191            rewrite_port: self.rewrite_port,
192            required_auth: self.required_auth,
193            headers: self.headers,
194            hsts: self.hsts,
195        })
196    }
197}
198
199impl Display for RequestHttpFrontend {
200    /// Used to create a unique summary of the frontend, used as a key in maps
201    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202        let s = match &PathRuleKind::try_from(self.path.kind) {
203            Ok(PathRuleKind::Prefix) => {
204                format!("{};{};P{}", self.address, self.hostname, self.path.value)
205            }
206            Ok(PathRuleKind::Regex) => {
207                format!("{};{};R{}", self.address, self.hostname, self.path.value)
208            }
209            Ok(PathRuleKind::Equals) => {
210                format!("{};{};={}", self.address, self.hostname, self.path.value)
211            }
212            Err(e) => format!("Wrong variant of PathRuleKind: {e}"),
213        };
214
215        match &self.method {
216            Some(method) => write!(f, "{s};{method}"),
217            None => write!(f, "{s}"),
218        }
219    }
220}
221
/// Error returned when a string does not name a known load balancing algorithm.
#[derive(Debug)]
pub struct ParseErrorLoadBalancing;

impl ParseErrorLoadBalancing {
    /// Human-readable message, shared by `Display` and the legacy `description`.
    const MESSAGE: &'static str = "Cannot find the load balancing policy asked";
}

impl fmt::Display for ParseErrorLoadBalancing {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(Self::MESSAGE)
    }
}

impl error::Error for ParseErrorLoadBalancing {
    // `description` is deprecated in favor of `Display`; it is kept only so
    // that existing callers of the legacy method keep getting the same text.
    fn description(&self) -> &str {
        Self::MESSAGE
    }

    // This error wraps no underlying error: the default `source()` (and the
    // deprecated `cause()`, which forwards to it) already returns `None`, so
    // neither is overridden.
}
240
241impl FromStr for LoadBalancingAlgorithms {
242    type Err = ParseErrorLoadBalancing;
243
244    fn from_str(s: &str) -> Result<Self, Self::Err> {
245        match s.to_lowercase().as_str() {
246            "round_robin" => Ok(LoadBalancingAlgorithms::RoundRobin),
247            "random" => Ok(LoadBalancingAlgorithms::Random),
248            "power_of_two" => Ok(LoadBalancingAlgorithms::PowerOfTwo),
249            "least_loaded" => Ok(LoadBalancingAlgorithms::LeastLoaded),
250            _ => Err(ParseErrorLoadBalancing {}),
251        }
252    }
253}
254
255impl SocketAddress {
256    pub fn new_v4(a: u8, b: u8, c: u8, d: u8, port: u16) -> Self {
257        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(a, b, c, d)), port).into()
258    }
259}
260
261impl From<SocketAddr> for SocketAddress {
262    fn from(socket_addr: SocketAddr) -> SocketAddress {
263        let ip_inner = match socket_addr {
264            SocketAddr::V4(ip_v4_addr) => ip_address::Inner::V4(u32::from(*ip_v4_addr.ip())),
265            SocketAddr::V6(ip_v6_addr) => {
266                ip_address::Inner::V6(Uint128::from(u128::from(*ip_v6_addr.ip())))
267            }
268        };
269
270        SocketAddress {
271            port: socket_addr.port() as u32,
272            ip: IpAddress {
273                inner: Some(ip_inner),
274            },
275        }
276    }
277}
278
279impl From<SocketAddress> for SocketAddr {
280    fn from(socket_address: SocketAddress) -> Self {
281        let port = socket_address.port as u16;
282
283        let ip = match socket_address.ip.inner {
284            Some(inner) => match inner {
285                ip_address::Inner::V4(v4_value) => IpAddr::V4(Ipv4Addr::from(v4_value)),
286                ip_address::Inner::V6(v6_value) => IpAddr::V6(Ipv6Addr::from(u128::from(v6_value))),
287            },
288            None => IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), // should never happen
289        };
290
291        SocketAddr::new(ip, port)
292    }
293}
294
295impl From<Uint128> for u128 {
296    fn from(value: Uint128) -> Self {
297        value.low as u128 | ((value.high as u128) << 64)
298    }
299}
300
301impl From<u128> for Uint128 {
302    fn from(value: u128) -> Self {
303        let low = value as u64;
304        let high = (value >> 64) as u64;
305        Uint128 { low, high }
306    }
307}
308
309impl From<i128> for Uint128 {
310    fn from(value: i128) -> Self {
311        Uint128::from(value as u128)
312    }
313}
314
315impl From<Ulid> for Uint128 {
316    fn from(value: Ulid) -> Self {
317        let (low, high) = value.into();
318        Uint128 { low, high }
319    }
320}
321
322impl From<Uint128> for Ulid {
323    fn from(value: Uint128) -> Self {
324        let Uint128 { low, high } = value;
325        Ulid::from((low, high))
326    }
327}