tiny_proxy/proxy/proxy.rs
1use hyper::body::Incoming;
2use hyper::service::service_fn;
3use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
4use hyper_util::client::legacy::connect::HttpConnector;
5use hyper_util::client::legacy::Client;
6use hyper_util::rt::TokioExecutor;
7use hyper_util::rt::TokioIo;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::net::TcpListener;
12use tokio::sync::Semaphore;
13use tracing::{info, warn};
14
15use crate::config::Config;
16use crate::proxy::handler::proxy;
17
18/// HTTP Proxy server that can be embedded into other applications
19///
20/// This struct encapsulates the proxy state and allows programmatic control
21/// over the proxy lifecycle.
22///
23/// # Example
24///
25/// ```no_run
26/// use tiny_proxy::{Config, Proxy};
27///
28/// #[tokio::main]
29/// async fn main() -> anyhow::Result<()> {
30/// let config = Config::from_file("file.caddy")?;
31/// let proxy = Proxy::new(config);
32/// proxy.start("127.0.0.1:8080").await?;
33/// Ok(())
34/// }
35/// ```
36pub struct Proxy {
37 config: Config,
38 client: Client<HttpsConnector<HttpConnector>, Incoming>,
39 max_concurrency: usize,
40 semaphore: Arc<Semaphore>,
41}
42
43impl Proxy {
44 /// Create a new proxy instance with the given configuration
45 ///
46 /// # Arguments
47 ///
48 /// * `config` - Configuration loaded from file or constructed programmatically
49 ///
50 /// # Returns
51 ///
52 /// A new `Proxy` instance ready to be started
53 pub fn new(config: Config) -> Self {
54 let mut http = HttpConnector::new();
55 http.set_keepalive(Some(Duration::from_secs(60)));
56 http.set_nodelay(true);
57 let https = HttpsConnectorBuilder::new()
58 .with_native_roots()
59 .unwrap()
60 .https_or_http()
61 .enable_http1()
62 .wrap_connector(http);
63
64 let client = Client::builder(TokioExecutor::new())
65 .pool_max_idle_per_host(100)
66 .pool_idle_timeout(Duration::from_secs(90))
67 .build::<_, Incoming>(https);
68
69 let max_concurrency = std::env::var("TINY_PROXY_MAX_CONCURRENCY")
70 .ok()
71 .and_then(|v| v.parse().ok())
72 .unwrap_or_else(|| num_cpus::get() * 256);
73
74 let semaphore = Arc::new(Semaphore::new(max_concurrency));
75
76 info!(
77 "Proxy initialized with max_concurrency={} (default: {})",
78 max_concurrency,
79 num_cpus::get() * 256
80 );
81
82 Self {
83 config,
84 client,
85 max_concurrency,
86 semaphore,
87 }
88 }
89
90 /// Start the proxy server on the specified address
91 ///
92 /// This method blocks indefinitely, handling incoming connections.
93 /// To run the proxy in the background, spawn it in a tokio task.
94 ///
95 /// # Arguments
96 ///
97 /// * `addr` - Address to listen on (e.g., "127.0.0.1:8080" or "0.0.0.0:8080")
98 ///
99 /// # Example
100 ///
101 /// ```no_run
102 /// # use tiny_proxy::{Config, Proxy};
103 /// # #[tokio::main]
104 /// # async fn main() -> anyhow::Result<()> {
105 /// # let config = Config::from_file("config.caddy")?;
106 /// # let proxy = Proxy::new(config);
107 /// proxy.start("127.0.0.1:8080").await?;
108 /// # Ok(())
109 /// # }
110 /// ```
111 ///
112 /// To run in background:
113 /// ```no_run
114 /// # use tiny_proxy::{Config, Proxy};
115 /// # #[tokio::main]
116 /// # async fn main() -> anyhow::Result<()> {
117 /// # let config = Config::from_file("config.caddy")?;
118 /// # let proxy = std::sync::Arc::new(Proxy::new(config));
119 /// let handle = tokio::spawn(async move {
120 /// if let Err(e) = proxy.start("127.0.0.1:8080").await {
121 /// eprintln!("Proxy error: {}", e);
122 /// }
123 /// });
124 /// # handle.await?;
125 /// # Ok(())
126 /// # }
127 /// ```
128 pub async fn start(&self, addr: &str) -> anyhow::Result<()> {
129 let addr: SocketAddr = addr.parse()?;
130 self.start_with_addr(addr).await
131 }
132
133 /// Start the proxy server with a parsed SocketAddr
134 ///
135 /// This is a convenience method if you already have a parsed SocketAddr.
136 ///
137 /// # Arguments
138 ///
139 /// * `addr` - Parsed SocketAddr to listen on
140 pub async fn start_with_addr(&self, addr: SocketAddr) -> anyhow::Result<()> {
141 let listener = TcpListener::bind(&addr).await?;
142
143 info!("Tiny Proxy listening on http://{}", addr);
144 info!(
145 "Max concurrency: {} ({})",
146 self.max_concurrency,
147 if self.max_concurrency == num_cpus::get() * 256 {
148 "default"
149 } else {
150 "custom"
151 }
152 );
153
154 loop {
155 let (stream, _) = listener.accept().await?;
156 let io = TokioIo::new(stream);
157 let client = self.client.clone();
158 let config = Arc::new(self.config.clone());
159 let semaphore = self.semaphore.clone();
160
161 match semaphore.try_acquire_owned() {
162 Ok(permit) => {
163 tokio::task::spawn(async move {
164 let _permit = permit;
165 let service = service_fn(move |req| {
166 let client = client.clone();
167 let config = config.clone();
168 proxy(req, client, config)
169 });
170
171 let mut builder = hyper::server::conn::http1::Builder::new();
172 builder.keep_alive(true).pipeline_flush(false);
173
174 builder.serve_connection(io, service).await
175 });
176 }
177 Err(_) => {
178 warn!(
179 "Concurrency limit exceeded ({}), rejecting connection",
180 self.max_concurrency
181 );
182 }
183 }
184 }
185 }
186
187 /// Get a reference to current configuration
188 ///
189 /// This allows inspection of current proxy configuration
190 /// without modifying it.
191 ///
192 /// # Returns
193 ///
194 /// Reference to current `Config`
195 pub fn config(&self) -> &Config {
196 &self.config
197 }
198
199 /// Get current concurrency limit
200 ///
201 /// # Returns
202 ///
203 /// Current maximum number of concurrent connections
204 pub fn max_concurrency(&self) -> usize {
205 self.max_concurrency
206 }
207
208 /// Update concurrency limit at runtime
209 ///
210 /// # Arguments
211 ///
212 /// * `max` - New maximum number of concurrent connections
213 ///
214 /// # Note
215 ///
216 /// This updates the semaphore immediately. New connections will use
217 /// the new limit, but existing connections are not affected.
218 pub fn set_max_concurrency(&mut self, max: usize) {
219 self.max_concurrency = max;
220 self.semaphore = Arc::new(Semaphore::new(max));
221 info!("Max concurrency updated to {}", max);
222 }
223
224 /// Update the configuration
225 ///
226 /// This allows hot-reload of configuration without restarting the proxy.
227 /// New connections will use the updated configuration immediately.
228 ///
229 /// # Arguments
230 ///
231 /// * `config` - New configuration to use
232 ///
233 /// # Note
234 ///
235 /// This operation is atomic for new connections. Existing connections
236 /// will continue to use their original configuration.
237 pub fn update_config(&mut self, config: Config) {
238 self.config = config;
239 info!("Configuration updated");
240 }
241}
242
243#[cfg(test)]
244mod tests {
245 use super::*;
246
247 #[test]
248 fn test_proxy_creation() {
249 let config = Config {
250 sites: std::collections::HashMap::new(),
251 };
252 let proxy = Proxy::new(config);
253 assert_eq!(proxy.config().sites.len(), 0);
254 }
255
256 #[test]
257 fn test_config_access() {
258 let mut config = Config {
259 sites: std::collections::HashMap::new(),
260 };
261 config.sites.insert(
262 "localhost:8080".to_string(),
263 crate::config::SiteConfig {
264 address: "localhost:8080".to_string(),
265 directives: vec![],
266 },
267 );
268
269 let proxy = Proxy::new(config);
270 assert_eq!(proxy.config().sites.len(), 1);
271 assert!(proxy.config().sites.contains_key("localhost:8080"));
272 }
273
274 #[test]
275 fn test_config_update() {
276 let config1 = Config {
277 sites: std::collections::HashMap::new(),
278 };
279 let mut proxy = Proxy::new(config1);
280 assert_eq!(proxy.config().sites.len(), 0);
281
282 let mut config2 = Config {
283 sites: std::collections::HashMap::new(),
284 };
285 config2.sites.insert(
286 "test.local".to_string(),
287 crate::config::SiteConfig {
288 address: "test.local".to_string(),
289 directives: vec![],
290 },
291 );
292
293 proxy.update_config(config2);
294 assert_eq!(proxy.config().sites.len(), 1);
295 assert!(proxy.config().sites.contains_key("test.local"));
296 }
297}