Skip to main content

arbiter_proxy/
server.rs

1//! Server bootstrap and graceful shutdown.
2
3use std::net::SocketAddr;
4use std::sync::Arc;
5
6use arbiter_metrics::ArbiterMetrics;
7use hyper::server::conn::http1;
8use hyper::service::service_fn;
9use hyper_util::rt::TokioIo;
10use tokio::net::TcpListener;
11use tokio::signal;
12
13use crate::config::ProxyConfig;
14use crate::middleware::MiddlewareChain;
15use crate::proxy::{ProxyState, build_audit, handle_request};
16
17/// Run the proxy server. Blocks until a shutdown signal is received.
18pub async fn run(config: ProxyConfig) -> anyhow::Result<()> {
19    let addr: SocketAddr = format!(
20        "{}:{}",
21        config.server.listen_addr, config.server.listen_port
22    )
23    .parse()?;
24
25    let middleware = MiddlewareChain::from_config(&config.middleware);
26
27    let (audit_sink, redaction_config) = build_audit(&config.audit);
28    let metrics = Arc::new(
29        ArbiterMetrics::new().map_err(|e| anyhow::anyhow!("failed to create metrics: {e}"))?,
30    );
31
32    let state = Arc::new(ProxyState::new(
33        config.upstream.url.clone(),
34        middleware,
35        audit_sink,
36        redaction_config,
37        metrics,
38    ));
39
40    let listener = TcpListener::bind(addr).await?;
41    tracing::info!(%addr, upstream = %config.upstream.url, "proxy listening");
42
43    let shutdown = shutdown_signal();
44    tokio::pin!(shutdown);
45
46    loop {
47        tokio::select! {
48            result = listener.accept() => {
49                let (stream, remote_addr) = result?;
50                let state = Arc::clone(&state);
51                tracing::debug!(%remote_addr, "accepted connection");
52
53                tokio::spawn(async move {
54                    let io = TokioIo::new(stream);
55                    let svc = service_fn(move |req| {
56                        let state = Arc::clone(&state);
57                        handle_request(state, req)
58                    });
59                    if let Err(e) = http1::Builder::new()
60                        .serve_connection(io, svc)
61                        .await
62                    {
63                        tracing::error!(error = %e, %remote_addr, "connection error");
64                    }
65                });
66            }
67            _ = &mut shutdown => {
68                tracing::info!("shutdown signal received, stopping");
69                break;
70            }
71        }
72    }
73
74    Ok(())
75}
76
77/// Wait for SIGTERM or SIGINT (ctrl-c).
78async fn shutdown_signal() {
79    let ctrl_c = async {
80        signal::ctrl_c()
81            .await
82            .expect("failed to install ctrl-c handler");
83    };
84
85    #[cfg(unix)]
86    let terminate = async {
87        signal::unix::signal(signal::unix::SignalKind::terminate())
88            .expect("failed to install SIGTERM handler")
89            .recv()
90            .await;
91    };
92
93    #[cfg(not(unix))]
94    let terminate = std::future::pending::<()>();
95
96    tokio::select! {
97        _ = ctrl_c => {}
98        _ = terminate => {}
99    }
100}