Skip to main content

nono_proxy/
server.rs

//! Proxy server: TCP listener, connection dispatch, and lifecycle.
//!
//! The server binds to the configured address and port (port 0 lets the OS
//! assign one), accepts TCP connections, reads the first HTTP line to
//! determine the mode, and dispatches to the appropriate handler:
//!
//! - CONNECT method -> [`connect`] or [`external`] handler
//! - Other methods  -> [`reverse`] handler (credential injection)
9
10use crate::config::ProxyConfig;
11use crate::connect;
12use crate::credential::CredentialStore;
13use crate::error::{ProxyError, Result};
14use crate::external;
15use crate::filter::ProxyFilter;
16use crate::reverse;
17use crate::token;
18use std::net::SocketAddr;
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
22use tokio::net::TcpListener;
23use tokio::sync::watch;
24use tracing::{debug, info, warn};
25use zeroize::Zeroizing;
26
/// Upper bound (64 KiB) on the combined size of the HTTP header lines accepted
/// from a single client request; requests whose headers exceed it are rejected
/// rather than buffered further.
const MAX_HEADER_SIZE: usize = 1 << 16;
30
31/// Handle returned when the proxy server starts.
32///
33/// Contains the assigned port, session token, and a shutdown channel.
34/// Drop the handle or send to `shutdown_tx` to stop the proxy.
35pub struct ProxyHandle {
36    /// The actual port the proxy is listening on
37    pub port: u16,
38    /// Session token for client authentication
39    pub token: Zeroizing<String>,
40    /// Send `true` to trigger graceful shutdown
41    shutdown_tx: watch::Sender<bool>,
42}
43
44impl ProxyHandle {
45    /// Signal the proxy to shut down gracefully.
46    pub fn shutdown(&self) {
47        let _ = self.shutdown_tx.send(true);
48    }
49
50    /// Environment variables to inject into the child process.
51    ///
52    /// The proxy URL includes `nono:<token>@` userinfo so that standard HTTP
53    /// clients (curl, Python requests, etc.) automatically send
54    /// `Proxy-Authorization: Basic ...` on every request. The raw token is
55    /// also provided via `NONO_PROXY_TOKEN` for nono-aware clients that
56    /// prefer Bearer auth.
57    #[must_use]
58    pub fn env_vars(&self) -> Vec<(String, String)> {
59        let proxy_url = format!("http://nono:{}@127.0.0.1:{}", &*self.token, self.port);
60
61        let mut vars = vec![
62            ("HTTP_PROXY".to_string(), proxy_url.clone()),
63            ("HTTPS_PROXY".to_string(), proxy_url.clone()),
64            ("NO_PROXY".to_string(), "localhost,127.0.0.1".to_string()),
65            ("NONO_PROXY_TOKEN".to_string(), self.token.to_string()),
66        ];
67
68        // Lowercase variants for compatibility
69        vars.push(("http_proxy".to_string(), proxy_url.clone()));
70        vars.push(("https_proxy".to_string(), proxy_url));
71        vars.push(("no_proxy".to_string(), "localhost,127.0.0.1".to_string()));
72
73        // Node.js v22.21.0+ / v24.0.0+ requires this flag for native fetch() to use HTTP_PROXY
74        vars.push(("NODE_USE_ENV_PROXY".to_string(), "1".to_string()));
75
76        vars
77    }
78
79    /// Environment variables for reverse proxy credential routes.
80    ///
81    /// Returns two types of env vars per route:
82    /// 1. SDK base URL overrides (e.g., `OPENAI_BASE_URL=http://127.0.0.1:PORT/openai`)
83    /// 2. SDK API key vars set to the session token (e.g., `OPENAI_API_KEY=<token>`)
84    ///
85    /// The SDK sends the session token as its "API key" (phantom token pattern).
86    /// The proxy validates this token and swaps it for the real credential.
87    #[must_use]
88    pub fn credential_env_vars(&self, config: &ProxyConfig) -> Vec<(String, String)> {
89        let mut vars = Vec::new();
90        for route in &config.routes {
91            // Base URL override (e.g., OPENAI_BASE_URL)
92            let base_url_name = format!("{}_BASE_URL", route.prefix.to_uppercase());
93            let url = format!("http://127.0.0.1:{}/{}", self.port, route.prefix);
94            vars.push((base_url_name, url));
95
96            // API key set to session token (phantom token pattern).
97            // Use explicit env_var if set (required for op:// URIs), otherwise
98            // fall back to uppercasing the credential_key (e.g., "openai_api_key" -> "OPENAI_API_KEY").
99            if let Some(ref env_var) = route.env_var {
100                vars.push((env_var.clone(), self.token.to_string()));
101            } else if let Some(ref cred_key) = route.credential_key {
102                let api_key_name = cred_key.to_uppercase();
103                vars.push((api_key_name, self.token.to_string()));
104            }
105        }
106        vars
107    }
108}
109
110/// Shared state for the proxy server.
111struct ProxyState {
112    filter: ProxyFilter,
113    session_token: Zeroizing<String>,
114    credential_store: CredentialStore,
115    config: ProxyConfig,
116    /// Shared TLS connector for upstream connections (reverse proxy mode).
117    /// Created once at startup to avoid rebuilding the root cert store per request.
118    tls_connector: tokio_rustls::TlsConnector,
119    /// Active connection count for connection limiting.
120    active_connections: AtomicUsize,
121}
122
123/// Start the proxy server.
124///
125/// Binds to `config.bind_addr:config.bind_port` (port 0 = OS-assigned),
126/// generates a session token, and begins accepting connections.
127///
128/// Returns a `ProxyHandle` with the assigned port and session token.
129/// The server runs until the handle is dropped or `shutdown()` is called.
130pub async fn start(config: ProxyConfig) -> Result<ProxyHandle> {
131    // Generate session token
132    let session_token = token::generate_session_token()?;
133
134    // Bind listener
135    let bind_addr = SocketAddr::new(config.bind_addr, config.bind_port);
136    let listener = TcpListener::bind(bind_addr)
137        .await
138        .map_err(|e| ProxyError::Bind {
139            addr: bind_addr.to_string(),
140            source: e,
141        })?;
142
143    let local_addr = listener.local_addr().map_err(|e| ProxyError::Bind {
144        addr: bind_addr.to_string(),
145        source: e,
146    })?;
147    let port = local_addr.port();
148
149    info!("Proxy server listening on {}", local_addr);
150
151    // Load credentials for reverse proxy routes
152    let credential_store = if config.routes.is_empty() {
153        CredentialStore::empty()
154    } else {
155        CredentialStore::load(&config.routes)?
156    };
157
158    // Build filter
159    let filter = if config.allowed_hosts.is_empty() {
160        ProxyFilter::allow_all()
161    } else {
162        ProxyFilter::new(&config.allowed_hosts)
163    };
164
165    // Build shared TLS connector (root cert store is expensive to construct).
166    // Use the ring provider explicitly to avoid ambiguity when multiple
167    // crypto providers are in the dependency tree.
168    let mut root_store = rustls::RootCertStore::empty();
169    root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
170    let tls_config = rustls::ClientConfig::builder_with_provider(Arc::new(
171        rustls::crypto::ring::default_provider(),
172    ))
173    .with_safe_default_protocol_versions()
174    .map_err(|e| ProxyError::Config(format!("TLS config error: {}", e)))?
175    .with_root_certificates(root_store)
176    .with_no_client_auth();
177    let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
178
179    // Shutdown channel
180    let (shutdown_tx, shutdown_rx) = watch::channel(false);
181
182    let state = Arc::new(ProxyState {
183        filter,
184        session_token: session_token.clone(),
185        credential_store,
186        config,
187        tls_connector,
188        active_connections: AtomicUsize::new(0),
189    });
190
191    // Spawn accept loop as a task within the current runtime.
192    // The caller MUST ensure this runtime is being driven (e.g., via
193    // a dedicated thread calling block_on or a multi-thread runtime).
194    tokio::spawn(accept_loop(listener, state, shutdown_rx));
195
196    Ok(ProxyHandle {
197        port,
198        token: session_token,
199        shutdown_tx,
200    })
201}
202
203/// Accept loop: listen for connections until shutdown.
204async fn accept_loop(
205    listener: TcpListener,
206    state: Arc<ProxyState>,
207    mut shutdown_rx: watch::Receiver<bool>,
208) {
209    loop {
210        tokio::select! {
211            result = listener.accept() => {
212                match result {
213                    Ok((stream, addr)) => {
214                        // Connection limit enforcement
215                        let max = state.config.max_connections;
216                        if max > 0 {
217                            let current = state.active_connections.load(Ordering::Relaxed);
218                            if current >= max {
219                                warn!("Connection limit reached ({}/{}), rejecting {}", current, max, addr);
220                                // Drop the stream (connection refused)
221                                drop(stream);
222                                continue;
223                            }
224                        }
225                        state.active_connections.fetch_add(1, Ordering::Relaxed);
226
227                        debug!("Accepted connection from {}", addr);
228                        let state = Arc::clone(&state);
229                        tokio::spawn(async move {
230                            if let Err(e) = handle_connection(stream, &state).await {
231                                debug!("Connection handler error: {}", e);
232                            }
233                            state.active_connections.fetch_sub(1, Ordering::Relaxed);
234                        });
235                    }
236                    Err(e) => {
237                        warn!("Accept error: {}", e);
238                    }
239                }
240            }
241            _ = shutdown_rx.changed() => {
242                if *shutdown_rx.borrow() {
243                    info!("Proxy server shutting down");
244                    return;
245                }
246            }
247        }
248    }
249}
250
251/// Handle a single client connection.
252///
253/// Reads the first HTTP line to determine the proxy mode:
254/// - CONNECT method -> tunnel (Mode 1 or 3)
255/// - Other methods  -> reverse proxy (Mode 2)
256async fn handle_connection(mut stream: tokio::net::TcpStream, state: &ProxyState) -> Result<()> {
257    // Read the first line and headers through a BufReader.
258    // We keep the BufReader alive until we've consumed the full header
259    // to prevent data loss (BufReader may read ahead into the body).
260    let mut buf_reader = BufReader::new(&mut stream);
261    let mut first_line = String::new();
262    buf_reader.read_line(&mut first_line).await?;
263
264    if first_line.is_empty() {
265        return Ok(()); // Client disconnected
266    }
267
268    // Read remaining headers (up to empty line), with size limit to prevent OOM.
269    let mut header_bytes = Vec::new();
270    loop {
271        let mut line = String::new();
272        let n = buf_reader.read_line(&mut line).await?;
273        if n == 0 || line.trim().is_empty() {
274            break;
275        }
276        header_bytes.extend_from_slice(line.as_bytes());
277        if header_bytes.len() > MAX_HEADER_SIZE {
278            drop(buf_reader);
279            let response = "HTTP/1.1 431 Request Header Fields Too Large\r\n\r\n";
280            stream.write_all(response.as_bytes()).await?;
281            return Ok(());
282        }
283    }
284
285    // Extract any data buffered beyond headers before dropping BufReader.
286    // BufReader may have read ahead into the request body. We capture
287    // those bytes and pass them to the reverse proxy handler so no body
288    // data is lost. For CONNECT requests this is always empty (no body).
289    let buffered = buf_reader.buffer().to_vec();
290    drop(buf_reader);
291
292    let first_line = first_line.trim_end();
293
294    // Dispatch by method
295    if first_line.starts_with("CONNECT ") {
296        // Check if external proxy is configured
297        if let Some(ref ext_config) = state.config.external_proxy {
298            external::handle_external_proxy(
299                first_line,
300                &mut stream,
301                &header_bytes,
302                &state.filter,
303                &state.session_token,
304                ext_config,
305            )
306            .await
307        } else {
308            connect::handle_connect(
309                first_line,
310                &mut stream,
311                &state.filter,
312                &state.session_token,
313                &header_bytes,
314            )
315            .await
316        }
317    } else if !state.credential_store.is_empty() {
318        // Non-CONNECT request with credential routes -> reverse proxy
319        let ctx = reverse::ReverseProxyCtx {
320            credential_store: &state.credential_store,
321            session_token: &state.session_token,
322            filter: &state.filter,
323            tls_connector: &state.tls_connector,
324        };
325        reverse::handle_reverse_proxy(first_line, &mut stream, &header_bytes, &ctx, &buffered).await
326    } else {
327        // No credential routes configured, reject non-CONNECT requests
328        let response = "HTTP/1.1 400 Bad Request\r\n\r\n";
329        stream.write_all(response.as_bytes()).await?;
330        Ok(())
331    }
332}
333
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::config::{InjectMode, RouteConfig};

    /// An `openai` route to the public API with the given credential key and
    /// optional explicit env var name; all other optional fields are unset.
    fn openai_route(credential_key: Option<&str>, env_var: Option<&str>) -> RouteConfig {
        RouteConfig {
            prefix: "openai".to_string(),
            upstream: "https://api.openai.com".to_string(),
            credential_key: credential_key.map(str::to_string),
            inject_mode: InjectMode::Header,
            inject_header: "Authorization".to_string(),
            credential_format: "Bearer {}".to_string(),
            path_pattern: None,
            path_replacement: None,
            query_param_name: None,
            env_var: env_var.map(str::to_string),
        }
    }

    /// A handle with a fixed port and token that was never attached to a running
    /// proxy, so env-var generation can be tested without loading credentials.
    fn detached_handle() -> ProxyHandle {
        let (shutdown_tx, _) = tokio::sync::watch::channel(false);
        ProxyHandle {
            port: 12345,
            token: Zeroizing::new("test_token".to_string()),
            shutdown_tx,
        }
    }

    #[tokio::test]
    async fn test_proxy_starts_and_binds() {
        let handle = start(ProxyConfig::default()).await.unwrap();

        // The OS-assigned port is never zero.
        assert!(handle.port > 0);
        // The session token is 64 hex characters.
        assert_eq!(handle.token.len(), 64);

        handle.shutdown();
    }

    #[tokio::test]
    async fn test_proxy_env_vars() {
        let handle = start(ProxyConfig::default()).await.unwrap();
        let vars = handle.env_vars();

        let http_proxy = vars.iter().find(|(k, _)| k == "HTTP_PROXY");
        assert!(http_proxy.is_some());
        assert!(http_proxy.unwrap().1.starts_with("http://nono:"));

        let token_var = vars.iter().find(|(k, _)| k == "NONO_PROXY_TOKEN");
        assert!(token_var.is_some());
        assert_eq!(token_var.unwrap().1.len(), 64);

        handle.shutdown();
    }

    #[tokio::test]
    async fn test_proxy_credential_env_vars() {
        let config = ProxyConfig {
            routes: vec![openai_route(None, None)],
            ..Default::default()
        };
        let handle = start(config.clone()).await.unwrap();

        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 1);
        let (name, url) = &vars[0];
        assert_eq!(name, "OPENAI_BASE_URL");
        assert!(url.contains("/openai"));

        handle.shutdown();
    }

    #[test]
    fn test_proxy_credential_env_vars_fallback_to_uppercase_key() {
        // With no explicit env_var, the name comes from uppercasing credential_key.
        // This is the backward-compatible path for keyring-backed credentials.
        let handle = detached_handle();
        let config = ProxyConfig {
            routes: vec![openai_route(Some("openai_api_key"), None)],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 2); // BASE_URL + API_KEY

        // "openai_api_key" uppercased is OPENAI_API_KEY.
        let api_key_var = vars.iter().find(|(k, _)| k == "OPENAI_API_KEY");
        assert!(
            api_key_var.is_some(),
            "Should derive env var name from credential_key.to_uppercase()"
        );

        let (_, val) = api_key_var.expect("OPENAI_API_KEY should exist");
        assert_eq!(val, "test_token");
    }

    #[test]
    fn test_proxy_credential_env_vars_with_explicit_env_var() {
        // An explicit env_var wins over deriving a name from credential_key.
        // This matters for op:// URIs, where uppercasing yields nonsense names.
        let handle = detached_handle();
        let config = ProxyConfig {
            routes: vec![openai_route(
                Some("op://Development/OpenAI/credential"),
                Some("OPENAI_API_KEY"),
            )],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 2); // BASE_URL + API_KEY

        let api_key_var = vars.iter().find(|(k, _)| k == "OPENAI_API_KEY");
        assert!(
            api_key_var.is_some(),
            "Should use explicit env_var name, not derive from credential_key"
        );

        // The value is the phantom token, never the real credential.
        let (_, val) = api_key_var.expect("OPENAI_API_KEY var should exist");
        assert_eq!(val, "test_token");

        // No name derived from the uppercased op:// URI.
        let bad_var = vars.iter().find(|(k, _)| k.starts_with("OP://"));
        assert!(
            bad_var.is_none(),
            "Should not generate env var from op:// URI uppercase"
        );
    }
}