1use std::net::SocketAddr;
4use std::sync::Arc;
5
6use arbiter_metrics::ArbiterMetrics;
7use hyper::server::conn::http1;
8use hyper::service::service_fn;
9use hyper_util::rt::TokioIo;
10use tokio::net::TcpListener;
11use tokio::signal;
12
13use crate::config::ProxyConfig;
14use crate::middleware::MiddlewareChain;
15use crate::proxy::{ProxyState, build_audit, handle_request};
16
17pub async fn run(config: ProxyConfig) -> anyhow::Result<()> {
19 let addr: SocketAddr = format!(
20 "{}:{}",
21 config.server.listen_addr, config.server.listen_port
22 )
23 .parse()?;
24
25 let middleware = MiddlewareChain::from_config(&config.middleware);
26
27 let (audit_sink, redaction_config) = build_audit(&config.audit);
28 let metrics = Arc::new(
29 ArbiterMetrics::new().map_err(|e| anyhow::anyhow!("failed to create metrics: {e}"))?,
30 );
31
32 let state = Arc::new(ProxyState::new(
33 config.upstream.url.clone(),
34 middleware,
35 audit_sink,
36 redaction_config,
37 metrics,
38 ));
39
40 let listener = TcpListener::bind(addr).await?;
41 tracing::info!(%addr, upstream = %config.upstream.url, "proxy listening");
42
43 let shutdown = shutdown_signal();
44 tokio::pin!(shutdown);
45
46 loop {
47 tokio::select! {
48 result = listener.accept() => {
49 let (stream, remote_addr) = result?;
50 let state = Arc::clone(&state);
51 tracing::debug!(%remote_addr, "accepted connection");
52
53 tokio::spawn(async move {
54 let io = TokioIo::new(stream);
55 let svc = service_fn(move |req| {
56 let state = Arc::clone(&state);
57 handle_request(state, req)
58 });
59 if let Err(e) = http1::Builder::new()
60 .serve_connection(io, svc)
61 .await
62 {
63 tracing::error!(error = %e, %remote_addr, "connection error");
64 }
65 });
66 }
67 _ = &mut shutdown => {
68 tracing::info!("shutdown signal received, stopping");
69 break;
70 }
71 }
72 }
73
74 Ok(())
75}
76
77async fn shutdown_signal() {
79 let ctrl_c = async {
80 signal::ctrl_c()
81 .await
82 .expect("failed to install ctrl-c handler");
83 };
84
85 #[cfg(unix)]
86 let terminate = async {
87 signal::unix::signal(signal::unix::SignalKind::terminate())
88 .expect("failed to install SIGTERM handler")
89 .recv()
90 .await;
91 };
92
93 #[cfg(not(unix))]
94 let terminate = std::future::pending::<()>();
95
96 tokio::select! {
97 _ = ctrl_c => {}
98 _ = terminate => {}
99 }
100}