1use crate::acl;
21use ::prometheus;
22use erbium_net::addr::{NetAddr, ToNetAddr as _, tokio_to_unixaddr};
23use http_body_util::Full;
24use hyper::{
25 Request, Response,
26 body::{Body, Bytes},
27};
28use hyper_util::rt::TokioIo;
29
30use std::convert::Infallible;
31use erbium_net::nix;
33
34#[derive(Debug)]
35pub enum Error {
36 InvalidName(String),
37 ListenError(String, std::io::Error),
38 SocketInUse(String),
39 CleanupFailed(String, std::io::Error),
40}
41
42impl std::error::Error for Error {}
43
44impl std::fmt::Display for Error {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 use Error::*;
47 match self {
48 InvalidName(sock_name) => write!(f, "{} is not a valid socket name", sock_name),
49 ListenError(sock_name, err) => write!(f, "Failed to listen on {}: {}", sock_name, err),
50 SocketInUse(sock_name) => {
51 write!(
52 f,
53 "{} already in use by existing running process",
54 sock_name
55 )
56 }
57 CleanupFailed(sock_name, err) => write!(f, "Failed to cleanup {}: {}", sock_name, err),
58 }
59 }
60}
61
62use async_trait::async_trait;
70
71#[async_trait]
72trait Accepter {
73 type AcceptedSocket;
74 async fn accept_connection(&self) -> Result<(Self::AcceptedSocket, NetAddr), std::io::Error>;
75}
76
77#[async_trait]
78impl Accepter for TokioIo<tokio::net::UnixListener> {
79 type AcceptedSocket = TokioIo<tokio::net::UnixStream>;
80 async fn accept_connection(&self) -> Result<(Self::AcceptedSocket, NetAddr), std::io::Error> {
81 self.inner()
82 .accept()
83 .await
84 .map(|(sock, addr)| (TokioIo::new(sock), tokio_to_unixaddr(&addr).to_net_addr()))
85 }
86}
87
88#[async_trait]
89impl Accepter for TokioIo<tokio::net::TcpListener> {
90 type AcceptedSocket = TokioIo<tokio::net::TcpStream>;
91 async fn accept_connection(&self) -> Result<(Self::AcceptedSocket, NetAddr), std::io::Error> {
92 self.inner()
93 .accept()
94 .await
95 .map(|(sock, addr)| (TokioIo::new(sock), addr.into()))
96 }
97}
98
99async fn serve_metrics<IB: Body>(_req: Request<IB>) -> Result<Response<Full<Bytes>>, Infallible> {
100 use prometheus::{Encoder, TextEncoder};
101
102 let mut buffer = Vec::new();
105 let encoder = TextEncoder::new();
106
107 let metric_families = prometheus::gather();
109 encoder.encode(&metric_families, &mut buffer).unwrap();
111
112 Ok(Response::builder()
113 .status(200)
114 .header("Content-type", "text/plain; version=0.0.4")
115 .body(buffer.into())
116 .unwrap())
117}
118
119async fn serve_leases<IB: Body>(
120 _req: Request<IB>,
121 dhcp: &std::sync::Arc<crate::dhcp::DhcpService>,
122) -> Result<Response<Full<Bytes>>, Infallible> {
123 let mut leases = dhcp.get_leases().await;
124 leases.sort();
125 let buffer = format!(
126 "{{ \"leases\" : [\n{}\n]}}\n",
127 leases
128 .iter()
129 .map(|li| format!(
130 " {{ \"ip\": \"{}\", \"client_id\": \"{}\", \"start\": {}, \"expire\": {}{} }}",
131 li.ip,
132 li.client_id
133 .iter()
134 .map(|b| format!("{:0>2x}", b))
135 .collect::<Vec<_>>()
136 .join(":"),
137 li.start,
138 li.expire,
139 crate::dhcp::dhcppkt::parse_options(crate::pktparser::Buffer::new(&li.options))
140 .ok()
141 .and_then(|o| o.get_hostname())
142 .map(|h| format!(", \"host-name\": {:?}", h))
143 .or_else(|| Some("".to_string()))
144 .unwrap(),
145 ))
146 .collect::<Vec<_>>()
147 .join(",\n")
148 );
149
150 Ok(Response::builder()
151 .status(200)
152 .header("Content-type", "application/json")
153 .body(buffer.into())
154 .unwrap())
155}
156
157fn permission_denied() -> Response<Full<Bytes>> {
158 use hyper::StatusCode;
159 Response::builder()
160 .status(StatusCode::FORBIDDEN)
161 .header("Content-Type", "text/html")
162 .body(
163 "<!DOCTYPE html>
164<head>
165 <title>Forbidden</title>
166</head>
167<body>
168 <h1>Forbidden</h1>
169</body>
170</html>"
171 .into(),
172 )
173 .unwrap()
174}
175
176fn require_http_permission(
177 acl: &[acl::Acl],
178 client: &acl::Attributes,
179 perm: acl::PermissionType,
180) -> Option<Response<Full<Bytes>>> {
181 match acl::require_permission(acl, client, perm) {
182 Ok(()) => None,
183 Err(err) => {
184 log::trace!("{}: {}", client.addr, err);
185 Some(permission_denied())
186 }
187 }
188}
189
190async fn serve_request<IB: Body>(
191 conf: crate::config::SharedConfig,
192 req: Request<IB>,
193 addr: std::sync::Arc<NetAddr>,
194 dhcp: std::sync::Arc<crate::dhcp::DhcpService>,
195) -> Result<Response<Full<Bytes>>, Infallible> {
196 use hyper::{Method, StatusCode};
197
198 let client = acl::Attributes { addr: *addr };
199
200 match (req.method(), req.uri().path()) {
201 (&Method::GET, "/") => {
202 if let Some(ret) =
203 require_http_permission(&conf.read().await.acls, &client, acl::PermissionType::Http)
204 {
205 Ok(ret)
206 } else {
207 Ok(Response::new(
208 format!("Welcome to Erbium {}", env!("CARGO_PKG_VERSION"),).into(),
209 ))
210 }
211 }
212 (&Method::GET, "/metrics") => {
213 if let Some(ret) = require_http_permission(
214 &conf.read().await.acls,
215 &client,
216 acl::PermissionType::HttpMetrics,
217 ) {
218 Ok(ret)
219 } else {
220 dhcp.update_metrics().await;
221 serve_metrics(req).await
222 }
223 }
224 (&Method::GET, "/api/v1/leases.json") => serve_leases(req, &dhcp).await,
225 _ => {
226 if let Some(ret) = require_http_permission(
227 &conf.read().await.acls,
228 &client,
229 acl::PermissionType::HttpLeases,
230 ) {
231 Ok(ret)
232 } else {
233 Ok(Response::builder()
234 .status(StatusCode::NOT_FOUND)
235 .body("Not found".into())
236 .unwrap())
237 }
238 }
239 }
240}
241
242async fn run_listener<L>(
243 conf: crate::config::SharedConfig,
244 dhcp: std::sync::Arc<crate::dhcp::DhcpService>,
245 listener: L,
246) -> Result<(), hyper::Error>
247where
248 L: Accepter + Unpin,
249 <L as Accepter>::AcceptedSocket: Unpin + hyper::rt::Write + hyper::rt::Read + Send + 'static,
250{
251 use hyper::service::service_fn;
252
253 loop {
254 let (stream, addr) = match listener.accept_connection().await {
255 Ok((stream, addr)) => (stream, std::sync::Arc::new(addr)),
256 Err(e) => {
257 log::warn!("Failed to accept on API server: {}", e);
258 continue;
259 }
260 };
261 let conf_copy = conf.clone();
262 let dhcp_copy = dhcp.clone();
263 let srv = move |req| serve_request(conf_copy.clone(), req, addr.clone(), dhcp_copy.clone());
264 tokio::task::spawn(async move {
265 if let Err(http_err) = hyper::server::conn::http1::Builder::new()
266 .keep_alive(true)
267 .serve_connection(stream, service_fn(srv))
268 .await
269 {
270 log::warn!("Error while serving HTTP connection: {}", http_err);
271 }
272 });
273 }
274}
275
276pub async fn run(
277 dhcp: std::sync::Arc<crate::dhcp::DhcpService>,
278 conf: crate::config::SharedConfig,
279) -> Result<(), Error> {
280 for addr in &conf.read().await.listeners {
282 use nix::sys::socket::{AddressFamily::*, SockaddrLike as _};
283 use tokio::net::{TcpListener, UnixListener};
284 match addr.family() {
285 Some(Inet) => {
286 let s = addr.as_sockaddr_in().unwrap();
287
288 let listener = TcpListener::bind((s.ip(), s.port()))
289 .await
290 .map_err(|e| Error::ListenError(s.to_string(), e))?;
291 tokio::task::spawn(run_listener(
292 conf.clone(),
293 dhcp.clone(),
294 TokioIo::new(listener),
295 ));
296 }
297 Some(Inet6) => {
298 let s = addr.as_sockaddr_in6().unwrap();
299
300 let listener = TcpListener::bind((s.ip(), s.port()))
301 .await
302 .map_err(|e| Error::ListenError(s.to_string(), e))?;
303 tokio::task::spawn(run_listener(
304 conf.clone(),
305 dhcp.clone(),
306 TokioIo::new(listener),
307 ));
308 }
309 Some(Unix) => {
310 let s = addr.as_unix_addr().unwrap();
311 let listener;
312 if let Some(path) = s.path() {
313 loop {
314 use nix::sys::stat::*;
315 let oldmask = umask(Mode::from_bits(0o077).unwrap());
316 let mut newmask = oldmask;
317 newmask.insert(Mode::from_bits(0o077).unwrap());
319 let _ = umask(newmask);
320 let listener_status = UnixListener::bind(path);
321 umask(oldmask);
323 use std::io;
324 listener = match listener_status {
325 Ok(l) => l,
326 Err(listen_err) if listen_err.kind() == io::ErrorKind::AddrInUse => {
327 match tokio::net::UnixStream::connect(path).await {
331 Err(e) if e.kind() == io::ErrorKind::ConnectionRefused => {
332 log::warn!(
333 "Cleaning up stale socket {}",
334 path.to_string_lossy()
335 );
336 std::fs::remove_file(path).map_err(|e| {
337 Error::CleanupFailed(path.to_string_lossy().into(), e)
338 })?;
339 continue;
341 }
342 Err(_) => {
343 return Err(Error::ListenError(
345 path.to_string_lossy().into(),
346 listen_err,
347 ));
348 }
349 Ok(_) => {
350 return Err(Error::SocketInUse(
354 path.to_string_lossy().into(),
355 ));
356 }
357 }
358 }
359 Err(e) => {
360 return Err(Error::ListenError(path.to_string_lossy().into(), e));
363 }
364 };
365 break;
366 }
367 } else if let Some(name) = s.as_abstract() {
368 let mut name_bytes = vec![0x00_u8];
369 name_bytes.extend(name);
370 let sock_name = String::from_utf8(name_bytes)
371 .map_err(|_| Error::InvalidName(String::from_utf8_lossy(name).into()))?;
372 listener = UnixListener::bind(sock_name)
373 .map_err(|e| Error::ListenError(String::from_utf8_lossy(name).into(), e))?;
374 } else {
375 panic!("Unknown unix listener!");
376 }
377 log::trace!("Starting listener on {:?}", listener);
378 tokio::task::spawn(run_listener(
379 conf.clone(),
380 dhcp.clone(),
381 TokioIo::new(listener),
382 ));
383 }
384 _ => panic!("Unknown listener type!"),
385 }
386 }
387 Ok(())
388}