Skip to main content

tiny_proxy/proxy/
proxy.rs

1use hyper::body::Incoming;
2use hyper::service::service_fn;
3use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
4use hyper_util::client::legacy::connect::HttpConnector;
5use hyper_util::client::legacy::Client;
6use hyper_util::rt::TokioExecutor;
7use hyper_util::rt::TokioIo;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::net::TcpListener;
12use tokio::sync::Semaphore;
13use tracing::{info, warn};
14
15use crate::config::Config;
16use crate::proxy::handler::proxy;
17
18/// HTTP Proxy server that can be embedded into other applications
19///
20/// This struct encapsulates the proxy state and allows programmatic control
21/// over the proxy lifecycle.
22///
23/// # Example
24///
25/// ```no_run
26/// use tiny_proxy::{Config, Proxy};
27///
28/// #[tokio::main]
29/// async fn main() -> anyhow::Result<()> {
30///     let config = Config::from_file("file.caddy")?;
31///     let proxy = Proxy::new(config);
32///     proxy.start("127.0.0.1:8080").await?;
33///     Ok(())
34/// }
35/// ```
36pub struct Proxy {
37    config: Config,
38    client: Client<HttpsConnector<HttpConnector>, Incoming>,
39    max_concurrency: usize,
40    semaphore: Arc<Semaphore>,
41}
42
43impl Proxy {
44    /// Create a new proxy instance with the given configuration
45    ///
46    /// # Arguments
47    ///
48    /// * `config` - Configuration loaded from file or constructed programmatically
49    ///
50    /// # Returns
51    ///
52    /// A new `Proxy` instance ready to be started
53    pub fn new(config: Config) -> Self {
54        let mut http = HttpConnector::new();
55        http.set_keepalive(Some(Duration::from_secs(60)));
56        http.set_nodelay(true);
57        let https = HttpsConnectorBuilder::new()
58            .with_native_roots()
59            .unwrap()
60            .https_or_http()
61            .enable_http1()
62            .wrap_connector(http);
63
64        let client = Client::builder(TokioExecutor::new())
65            .pool_max_idle_per_host(100)
66            .pool_idle_timeout(Duration::from_secs(90))
67            .build::<_, Incoming>(https);
68
69        let max_concurrency = std::env::var("TINY_PROXY_MAX_CONCURRENCY")
70            .ok()
71            .and_then(|v| v.parse().ok())
72            .unwrap_or_else(|| num_cpus::get() * 256);
73
74        let semaphore = Arc::new(Semaphore::new(max_concurrency));
75
76        info!(
77            "Proxy initialized with max_concurrency={} (default: {})",
78            max_concurrency,
79            num_cpus::get() * 256
80        );
81
82        Self {
83            config,
84            client,
85            max_concurrency,
86            semaphore,
87        }
88    }
89
90    /// Start the proxy server on the specified address
91    ///
92    /// This method blocks indefinitely, handling incoming connections.
93    /// To run the proxy in the background, spawn it in a tokio task.
94    ///
95    /// # Arguments
96    ///
97    /// * `addr` - Address to listen on (e.g., "127.0.0.1:8080" or "0.0.0.0:8080")
98    ///
99    /// # Example
100    ///
101    /// ```no_run
102    /// # use tiny_proxy::{Config, Proxy};
103    /// # #[tokio::main]
104    /// # async fn main() -> anyhow::Result<()> {
105    /// # let config = Config::from_file("config.caddy")?;
106    /// # let proxy = Proxy::new(config);
107    /// proxy.start("127.0.0.1:8080").await?;
108    /// # Ok(())
109    /// # }
110    /// ```
111    ///
112    /// To run in background:
113    /// ```no_run
114    /// # use tiny_proxy::{Config, Proxy};
115    /// # #[tokio::main]
116    /// # async fn main() -> anyhow::Result<()> {
117    /// # let config = Config::from_file("config.caddy")?;
118    /// # let proxy = std::sync::Arc::new(Proxy::new(config));
119    /// let handle = tokio::spawn(async move {
120    ///     if let Err(e) = proxy.start("127.0.0.1:8080").await {
121    ///         eprintln!("Proxy error: {}", e);
122    ///     }
123    /// });
124    /// # handle.await?;
125    /// # Ok(())
126    /// # }
127    /// ```
128    pub async fn start(&self, addr: &str) -> anyhow::Result<()> {
129        let addr: SocketAddr = addr.parse()?;
130        self.start_with_addr(addr).await
131    }
132
133    /// Start the proxy server with a parsed SocketAddr
134    ///
135    /// This is a convenience method if you already have a parsed SocketAddr.
136    ///
137    /// # Arguments
138    ///
139    /// * `addr` - Parsed SocketAddr to listen on
140    pub async fn start_with_addr(&self, addr: SocketAddr) -> anyhow::Result<()> {
141        let listener = TcpListener::bind(&addr).await?;
142
143        info!("Tiny Proxy listening on http://{}", addr);
144        info!(
145            "Max concurrency: {} ({})",
146            self.max_concurrency,
147            if self.max_concurrency == num_cpus::get() * 256 {
148                "default"
149            } else {
150                "custom"
151            }
152        );
153
154        loop {
155            let (stream, _) = listener.accept().await?;
156            let io = TokioIo::new(stream);
157            let client = self.client.clone();
158            let config = Arc::new(self.config.clone());
159            let semaphore = self.semaphore.clone();
160
161            match semaphore.try_acquire_owned() {
162                Ok(permit) => {
163                    tokio::task::spawn(async move {
164                        let _permit = permit;
165                        let service = service_fn(move |req| {
166                            let client = client.clone();
167                            let config = config.clone();
168                            proxy(req, client, config)
169                        });
170
171                        let mut builder = hyper::server::conn::http1::Builder::new();
172                        builder.keep_alive(true).pipeline_flush(false);
173
174                        builder.serve_connection(io, service).await
175                    });
176                }
177                Err(_) => {
178                    warn!(
179                        "Concurrency limit exceeded ({}), rejecting connection",
180                        self.max_concurrency
181                    );
182                }
183            }
184        }
185    }
186
187    /// Get a reference to current configuration
188    ///
189    /// This allows inspection of current proxy configuration
190    /// without modifying it.
191    ///
192    /// # Returns
193    ///
194    /// Reference to current `Config`
195    pub fn config(&self) -> &Config {
196        &self.config
197    }
198
199    /// Get current concurrency limit
200    ///
201    /// # Returns
202    ///
203    /// Current maximum number of concurrent connections
204    pub fn max_concurrency(&self) -> usize {
205        self.max_concurrency
206    }
207
208    /// Update concurrency limit at runtime
209    ///
210    /// # Arguments
211    ///
212    /// * `max` - New maximum number of concurrent connections
213    ///
214    /// # Note
215    ///
216    /// This updates the semaphore immediately. New connections will use
217    /// the new limit, but existing connections are not affected.
218    pub fn set_max_concurrency(&mut self, max: usize) {
219        self.max_concurrency = max;
220        self.semaphore = Arc::new(Semaphore::new(max));
221        info!("Max concurrency updated to {}", max);
222    }
223
224    /// Update the configuration
225    ///
226    /// This allows hot-reload of configuration without restarting the proxy.
227    /// New connections will use the updated configuration immediately.
228    ///
229    /// # Arguments
230    ///
231    /// * `config` - New configuration to use
232    ///
233    /// # Note
234    ///
235    /// This operation is atomic for new connections. Existing connections
236    /// will continue to use their original configuration.
237    pub fn update_config(&mut self, config: Config) {
238        self.config = config;
239        info!("Configuration updated");
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SiteConfig;
    use std::collections::HashMap;

    /// Builds a configuration that contains no sites.
    fn empty_config() -> Config {
        Config {
            sites: HashMap::new(),
        }
    }

    /// Builds a configuration with a single directive-less site keyed by `address`.
    fn single_site_config(address: &str) -> Config {
        let mut config = empty_config();
        config.sites.insert(
            address.to_string(),
            SiteConfig {
                address: address.to_string(),
                directives: vec![],
            },
        );
        config
    }

    /// A proxy built from an empty configuration exposes no sites.
    #[test]
    fn test_proxy_creation() {
        let proxy = Proxy::new(empty_config());
        assert_eq!(proxy.config().sites.len(), 0);
    }

    /// The configuration passed to `new` is visible through `config()`.
    #[test]
    fn test_config_access() {
        let proxy = Proxy::new(single_site_config("localhost:8080"));

        let sites = &proxy.config().sites;
        assert_eq!(sites.len(), 1);
        assert!(sites.contains_key("localhost:8080"));
    }

    /// `update_config` replaces the configuration returned by `config()`.
    #[test]
    fn test_config_update() {
        let mut proxy = Proxy::new(empty_config());
        assert_eq!(proxy.config().sites.len(), 0);

        proxy.update_config(single_site_config("test.local"));

        let sites = &proxy.config().sites;
        assert_eq!(sites.len(), 1);
        assert!(sites.contains_key("test.local"));
    }
}