Skip to main content

erbium/
http.rs

1/*   Copyright 2024 Perry Lorier
2 *
3 *  Licensed under the Apache License, Version 2.0 (the "License");
4 *  you may not use this file except in compliance with the License.
5 *  You may obtain a copy of the License at
6 *
7 *      http://www.apache.org/licenses/LICENSE-2.0
8 *
9 *  Unless required by applicable law or agreed to in writing, software
10 *  distributed under the License is distributed on an "AS IS" BASIS,
11 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 *  See the License for the specific language governing permissions and
13 *  limitations under the License.
14 *
15 *  SPDX-License-Identifier: Apache-2.0
16 *
17 *  HTTP services for erbium
18 */
19
20use crate::acl;
21use ::prometheus;
22use erbium_net::addr::{NetAddr, ToNetAddr as _, tokio_to_unixaddr};
23use http_body_util::Full;
24use hyper::{
25    Request, Response,
26    body::{Body, Bytes},
27};
28use hyper_util::rt::TokioIo;
29
30use std::convert::Infallible;
31// TODO: the code here that depends on nix should move into erbium-net
32use erbium_net::nix;
33
34#[derive(Debug)]
35pub enum Error {
36    InvalidName(String),
37    ListenError(String, std::io::Error),
38    SocketInUse(String),
39    CleanupFailed(String, std::io::Error),
40}
41
42impl std::error::Error for Error {}
43
44impl std::fmt::Display for Error {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        use Error::*;
47        match self {
48            InvalidName(sock_name) => write!(f, "{} is not a valid socket name", sock_name),
49            ListenError(sock_name, err) => write!(f, "Failed to listen on {}: {}", sock_name, err),
50            SocketInUse(sock_name) => {
51                write!(
52                    f,
53                    "{} already in use by existing running process",
54                    sock_name
55                )
56            }
57            CleanupFailed(sock_name, err) => write!(f, "Failed to cleanup {}: {}", sock_name, err),
58        }
59    }
60}
61
62// tokio wanted to v1.0 by Eo2020.  tokio::Stream::Stream is RFC'd to be moved into rust std::
63// but hadn't happened yet.  So tokio removed the Stream impl's from the Listener sockets.  tokio
64// has a guarantee that it will support rust compilers for up to 6 months, so they won't be adding
65// it back until at least mid 2021 (assuming the RFC is merged with std very soon).  Now there is
66// no shared trait between UnixListener and TcpListener, so run_listener() can't take a trait.
67// This adapter is here to temporarily resolve that.  It can be cleaned up when tokio starts
68// supporting std's Stream for UnixListener and TcpListener again.
69use async_trait::async_trait;
70
71#[async_trait]
72trait Accepter {
73    type AcceptedSocket;
74    async fn accept_connection(&self) -> Result<(Self::AcceptedSocket, NetAddr), std::io::Error>;
75}
76
77#[async_trait]
78impl Accepter for TokioIo<tokio::net::UnixListener> {
79    type AcceptedSocket = TokioIo<tokio::net::UnixStream>;
80    async fn accept_connection(&self) -> Result<(Self::AcceptedSocket, NetAddr), std::io::Error> {
81        self.inner()
82            .accept()
83            .await
84            .map(|(sock, addr)| (TokioIo::new(sock), tokio_to_unixaddr(&addr).to_net_addr()))
85    }
86}
87
88#[async_trait]
89impl Accepter for TokioIo<tokio::net::TcpListener> {
90    type AcceptedSocket = TokioIo<tokio::net::TcpStream>;
91    async fn accept_connection(&self) -> Result<(Self::AcceptedSocket, NetAddr), std::io::Error> {
92        self.inner()
93            .accept()
94            .await
95            .map(|(sock, addr)| (TokioIo::new(sock), addr.into()))
96    }
97}
98
99async fn serve_metrics<IB: Body>(_req: Request<IB>) -> Result<Response<Full<Bytes>>, Infallible> {
100    use prometheus::{Encoder, TextEncoder};
101
102    // Register & measure some metrics.
103
104    let mut buffer = Vec::new();
105    let encoder = TextEncoder::new();
106
107    // Gather the metrics.
108    let metric_families = prometheus::gather();
109    // Encode them to send.
110    encoder.encode(&metric_families, &mut buffer).unwrap();
111
112    Ok(Response::builder()
113        .status(200)
114        .header("Content-type", "text/plain; version=0.0.4")
115        .body(buffer.into())
116        .unwrap())
117}
118
119async fn serve_leases<IB: Body>(
120    _req: Request<IB>,
121    dhcp: &std::sync::Arc<crate::dhcp::DhcpService>,
122) -> Result<Response<Full<Bytes>>, Infallible> {
123    let mut leases = dhcp.get_leases().await;
124    leases.sort();
125    let buffer = format!(
126        "{{ \"leases\" : [\n{}\n]}}\n",
127        leases
128            .iter()
129            .map(|li| format!(
130                " {{ \"ip\": \"{}\", \"client_id\": \"{}\", \"start\": {}, \"expire\": {}{} }}",
131                li.ip,
132                li.client_id
133                    .iter()
134                    .map(|b| format!("{:0>2x}", b))
135                    .collect::<Vec<_>>()
136                    .join(":"),
137                li.start,
138                li.expire,
139                crate::dhcp::dhcppkt::parse_options(crate::pktparser::Buffer::new(&li.options))
140                    .ok()
141                    .and_then(|o| o.get_hostname())
142                    .map(|h| format!(", \"host-name\": {:?}", h))
143                    .or_else(|| Some("".to_string()))
144                    .unwrap(),
145            ))
146            .collect::<Vec<_>>()
147            .join(",\n")
148    );
149
150    Ok(Response::builder()
151        .status(200)
152        .header("Content-type", "application/json")
153        .body(buffer.into())
154        .unwrap())
155}
156
157fn permission_denied() -> Response<Full<Bytes>> {
158    use hyper::StatusCode;
159    Response::builder()
160        .status(StatusCode::FORBIDDEN)
161        .header("Content-Type", "text/html")
162        .body(
163            "<!DOCTYPE html>
164<head>
165 <title>Forbidden</title>
166</head>
167<body>
168 <h1>Forbidden</h1>
169</body>
170</html>"
171                .into(),
172        )
173        .unwrap()
174}
175
176fn require_http_permission(
177    acl: &[acl::Acl],
178    client: &acl::Attributes,
179    perm: acl::PermissionType,
180) -> Option<Response<Full<Bytes>>> {
181    match acl::require_permission(acl, client, perm) {
182        Ok(()) => None,
183        Err(err) => {
184            log::trace!("{}: {}", client.addr, err);
185            Some(permission_denied())
186        }
187    }
188}
189
190async fn serve_request<IB: Body>(
191    conf: crate::config::SharedConfig,
192    req: Request<IB>,
193    addr: std::sync::Arc<NetAddr>,
194    dhcp: std::sync::Arc<crate::dhcp::DhcpService>,
195) -> Result<Response<Full<Bytes>>, Infallible> {
196    use hyper::{Method, StatusCode};
197
198    let client = acl::Attributes { addr: *addr };
199
200    match (req.method(), req.uri().path()) {
201        (&Method::GET, "/") => {
202            if let Some(ret) =
203                require_http_permission(&conf.read().await.acls, &client, acl::PermissionType::Http)
204            {
205                Ok(ret)
206            } else {
207                Ok(Response::new(
208                    format!("Welcome to Erbium {}", env!("CARGO_PKG_VERSION"),).into(),
209                ))
210            }
211        }
212        (&Method::GET, "/metrics") => {
213            if let Some(ret) = require_http_permission(
214                &conf.read().await.acls,
215                &client,
216                acl::PermissionType::HttpMetrics,
217            ) {
218                Ok(ret)
219            } else {
220                dhcp.update_metrics().await;
221                serve_metrics(req).await
222            }
223        }
224        (&Method::GET, "/api/v1/leases.json") => serve_leases(req, &dhcp).await,
225        _ => {
226            if let Some(ret) = require_http_permission(
227                &conf.read().await.acls,
228                &client,
229                acl::PermissionType::HttpLeases,
230            ) {
231                Ok(ret)
232            } else {
233                Ok(Response::builder()
234                    .status(StatusCode::NOT_FOUND)
235                    .body("Not found".into())
236                    .unwrap())
237            }
238        }
239    }
240}
241
242async fn run_listener<L>(
243    conf: crate::config::SharedConfig,
244    dhcp: std::sync::Arc<crate::dhcp::DhcpService>,
245    listener: L,
246) -> Result<(), hyper::Error>
247where
248    L: Accepter + Unpin,
249    <L as Accepter>::AcceptedSocket: Unpin + hyper::rt::Write + hyper::rt::Read + Send + 'static,
250{
251    use hyper::service::service_fn;
252
253    loop {
254        let (stream, addr) = match listener.accept_connection().await {
255            Ok((stream, addr)) => (stream, std::sync::Arc::new(addr)),
256            Err(e) => {
257                log::warn!("Failed to accept on API server: {}", e);
258                continue;
259            }
260        };
261        let conf_copy = conf.clone();
262        let dhcp_copy = dhcp.clone();
263        let srv = move |req| serve_request(conf_copy.clone(), req, addr.clone(), dhcp_copy.clone());
264        tokio::task::spawn(async move {
265            if let Err(http_err) = hyper::server::conn::http1::Builder::new()
266                .keep_alive(true)
267                .serve_connection(stream, service_fn(srv))
268                .await
269            {
270                log::warn!("Error while serving HTTP connection: {}", http_err);
271            }
272        });
273    }
274}
275
276pub async fn run(
277    dhcp: std::sync::Arc<crate::dhcp::DhcpService>,
278    conf: crate::config::SharedConfig,
279) -> Result<(), Error> {
280    // Set up all the listeners and listen on them.
281    for addr in &conf.read().await.listeners {
282        use nix::sys::socket::{AddressFamily::*, SockaddrLike as _};
283        use tokio::net::{TcpListener, UnixListener};
284        match addr.family() {
285            Some(Inet) => {
286                let s = addr.as_sockaddr_in().unwrap();
287
288                let listener = TcpListener::bind((s.ip(), s.port()))
289                    .await
290                    .map_err(|e| Error::ListenError(s.to_string(), e))?;
291                tokio::task::spawn(run_listener(
292                    conf.clone(),
293                    dhcp.clone(),
294                    TokioIo::new(listener),
295                ));
296            }
297            Some(Inet6) => {
298                let s = addr.as_sockaddr_in6().unwrap();
299
300                let listener = TcpListener::bind((s.ip(), s.port()))
301                    .await
302                    .map_err(|e| Error::ListenError(s.to_string(), e))?;
303                tokio::task::spawn(run_listener(
304                    conf.clone(),
305                    dhcp.clone(),
306                    TokioIo::new(listener),
307                ));
308            }
309            Some(Unix) => {
310                let s = addr.as_unix_addr().unwrap();
311                let listener;
312                if let Some(path) = s.path() {
313                    loop {
314                        use nix::sys::stat::*;
315                        let oldmask = umask(Mode::from_bits(0o077).unwrap());
316                        let mut newmask = oldmask;
317                        // Limit to at least 0o077
318                        newmask.insert(Mode::from_bits(0o077).unwrap());
319                        let _ = umask(newmask);
320                        let listener_status = UnixListener::bind(path);
321                        // Now restore it.
322                        umask(oldmask);
323                        use std::io;
324                        listener = match listener_status {
325                            Ok(l) => l,
326                            Err(listen_err) if listen_err.kind() == io::ErrorKind::AddrInUse => {
327                                // This is perhaps a socket left over from a previous encantation of
328                                // the program.  Test to see if it's live, if it's not, then remove it
329                                // and try again.
330                                match tokio::net::UnixStream::connect(path).await {
331                                    Err(e) if e.kind() == io::ErrorKind::ConnectionRefused => {
332                                        log::warn!(
333                                            "Cleaning up stale socket {}",
334                                            path.to_string_lossy()
335                                        );
336                                        std::fs::remove_file(path).map_err(|e| {
337                                            Error::CleanupFailed(path.to_string_lossy().into(), e)
338                                        })?;
339                                        // Try and rebind this socket again.
340                                        continue;
341                                    }
342                                    Err(_) => {
343                                        // We return the top level error ("Address in use")
344                                        return Err(Error::ListenError(
345                                            path.to_string_lossy().into(),
346                                            listen_err,
347                                        ));
348                                    }
349                                    Ok(_) => {
350                                        // We were able to connect to the unix domain socket, so
351                                        // there must be a process on the other side listening on
352                                        // it.  Fail the entire operation.
353                                        return Err(Error::SocketInUse(
354                                            path.to_string_lossy().into(),
355                                        ));
356                                    }
357                                }
358                            }
359                            Err(e) => {
360                                // We were unable to listen on the socket for some reason (eg the
361                                // containing directory doesn't exist).
362                                return Err(Error::ListenError(path.to_string_lossy().into(), e));
363                            }
364                        };
365                        break;
366                    }
367                } else if let Some(name) = s.as_abstract() {
368                    let mut name_bytes = vec![0x00_u8];
369                    name_bytes.extend(name);
370                    let sock_name = String::from_utf8(name_bytes)
371                        .map_err(|_| Error::InvalidName(String::from_utf8_lossy(name).into()))?;
372                    listener = UnixListener::bind(sock_name)
373                        .map_err(|e| Error::ListenError(String::from_utf8_lossy(name).into(), e))?;
374                } else {
375                    panic!("Unknown unix listener!");
376                }
377                log::trace!("Starting listener on {:?}", listener);
378                tokio::task::spawn(run_listener(
379                    conf.clone(),
380                    dhcp.clone(),
381                    TokioIo::new(listener),
382                ));
383            }
384            _ => panic!("Unknown listener type!"),
385        }
386    }
387    Ok(())
388}