Skip to main content

nono_proxy/
server.rs

//! Proxy server: TCP listener, connection dispatch, and lifecycle.
//!
//! The server binds to `config.bind_addr:config.bind_port` (typically
//! `127.0.0.1:0`, i.e. an OS-assigned port), accepts TCP connections,
//! reads the first HTTP line to determine the mode, and dispatches to
//! the appropriate handler.
//!
//! CONNECT method -> [`connect`] or [`external`] handler
//! Other methods  -> [`reverse`] handler (credential injection)
9
10use crate::audit;
11use crate::config::ProxyConfig;
12use crate::connect;
13use crate::credential::CredentialStore;
14use crate::error::{ProxyError, Result};
15use crate::external;
16use crate::filter::ProxyFilter;
17use crate::reverse;
18use crate::token;
19use std::net::SocketAddr;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::net::TcpListener;
24use tokio::sync::watch;
25use tracing::{debug, info, warn};
26use zeroize::Zeroizing;
27
/// Cap, in bytes, on the accumulated request header lines (64 KiB).
/// A request whose headers grow past this is answered with
/// `431 Request Header Fields Too Large` instead of being buffered further.
const MAX_HEADER_SIZE: usize = 64 << 10;
31
32/// Handle returned when the proxy server starts.
33///
34/// Contains the assigned port, session token, and a shutdown channel.
35/// Drop the handle or send to `shutdown_tx` to stop the proxy.
36pub struct ProxyHandle {
37    /// The actual port the proxy is listening on
38    pub port: u16,
39    /// Session token for client authentication
40    pub token: Zeroizing<String>,
41    /// Shared in-memory network audit log
42    audit_log: audit::SharedAuditLog,
43    /// Send `true` to trigger graceful shutdown
44    shutdown_tx: watch::Sender<bool>,
45    /// Route prefixes that have credentials actually loaded.
46    /// Routes whose credentials were unavailable are excluded so we
47    /// don't inject phantom tokens that shadow valid external credentials.
48    loaded_routes: std::collections::HashSet<String>,
49}
50
51impl ProxyHandle {
52    /// Signal the proxy to shut down gracefully.
53    pub fn shutdown(&self) {
54        let _ = self.shutdown_tx.send(true);
55    }
56
57    /// Drain and return collected network audit events.
58    #[must_use]
59    pub fn drain_audit_events(&self) -> Vec<nono::undo::NetworkAuditEvent> {
60        audit::drain_audit_events(&self.audit_log)
61    }
62
63    /// Environment variables to inject into the child process.
64    ///
65    /// The proxy URL includes `nono:<token>@` userinfo so that standard HTTP
66    /// clients (curl, Python requests, etc.) automatically send
67    /// `Proxy-Authorization: Basic ...` on every request. The raw token is
68    /// also provided via `NONO_PROXY_TOKEN` for nono-aware clients that
69    /// prefer Bearer auth.
70    #[must_use]
71    pub fn env_vars(&self) -> Vec<(String, String)> {
72        let proxy_url = format!("http://nono:{}@127.0.0.1:{}", &*self.token, self.port);
73
74        let mut vars = vec![
75            ("HTTP_PROXY".to_string(), proxy_url.clone()),
76            ("HTTPS_PROXY".to_string(), proxy_url.clone()),
77            ("NO_PROXY".to_string(), "localhost,127.0.0.1".to_string()),
78            ("NONO_PROXY_TOKEN".to_string(), self.token.to_string()),
79        ];
80
81        // Lowercase variants for compatibility
82        vars.push(("http_proxy".to_string(), proxy_url.clone()));
83        vars.push(("https_proxy".to_string(), proxy_url));
84        vars.push(("no_proxy".to_string(), "localhost,127.0.0.1".to_string()));
85
86        // Node.js v22.21.0+ / v24.0.0+ requires this flag for native fetch() to use HTTP_PROXY
87        vars.push(("NODE_USE_ENV_PROXY".to_string(), "1".to_string()));
88
89        vars
90    }
91
92    /// Environment variables for reverse proxy credential routes.
93    ///
94    /// Returns two types of env vars per route:
95    /// 1. SDK base URL overrides (e.g., `OPENAI_BASE_URL=http://127.0.0.1:PORT/openai`)
96    /// 2. SDK API key vars set to the session token (e.g., `OPENAI_API_KEY=<token>`)
97    ///
98    /// The SDK sends the session token as its "API key" (phantom token pattern).
99    /// The proxy validates this token and swaps it for the real credential.
100    #[must_use]
101    pub fn credential_env_vars(&self, config: &ProxyConfig) -> Vec<(String, String)> {
102        let mut vars = Vec::new();
103        for route in &config.routes {
104            // Base URL override (e.g., OPENAI_BASE_URL)
105            let base_url_name = format!("{}_BASE_URL", route.prefix.to_uppercase());
106            let url = format!("http://127.0.0.1:{}/{}", self.port, route.prefix);
107            vars.push((base_url_name, url));
108
109            // Only inject phantom token env vars for routes whose credentials
110            // were actually loaded. If a credential was unavailable (e.g.,
111            // GITHUB_TOKEN env var not set), injecting a phantom token would
112            // shadow valid credentials from other sources (keyring, gh auth).
113            if !self.loaded_routes.contains(&route.prefix) {
114                continue;
115            }
116
117            // API key set to session token (phantom token pattern).
118            // Use explicit env_var if set (required for URI manager refs), otherwise
119            // fall back to uppercasing the credential_key (e.g., "openai_api_key" -> "OPENAI_API_KEY").
120            if let Some(ref env_var) = route.env_var {
121                vars.push((env_var.clone(), self.token.to_string()));
122            } else if let Some(ref cred_key) = route.credential_key {
123                let api_key_name = cred_key.to_uppercase();
124                vars.push((api_key_name, self.token.to_string()));
125            }
126        }
127        vars
128    }
129}
130
131/// Shared state for the proxy server.
132struct ProxyState {
133    filter: ProxyFilter,
134    session_token: Zeroizing<String>,
135    credential_store: CredentialStore,
136    config: ProxyConfig,
137    /// Shared TLS connector for upstream connections (reverse proxy mode).
138    /// Created once at startup to avoid rebuilding the root cert store per request.
139    tls_connector: tokio_rustls::TlsConnector,
140    /// Active connection count for connection limiting.
141    active_connections: AtomicUsize,
142    /// Shared network audit log for this proxy session.
143    audit_log: audit::SharedAuditLog,
144    /// Matcher for hosts that bypass the external proxy and route direct.
145    /// Built once at startup from `ExternalProxyConfig.bypass_hosts`.
146    bypass_matcher: external::BypassMatcher,
147}
148
149/// Start the proxy server.
150///
151/// Binds to `config.bind_addr:config.bind_port` (port 0 = OS-assigned),
152/// generates a session token, and begins accepting connections.
153///
154/// Returns a `ProxyHandle` with the assigned port and session token.
155/// The server runs until the handle is dropped or `shutdown()` is called.
156pub async fn start(config: ProxyConfig) -> Result<ProxyHandle> {
157    // Generate session token
158    let session_token = token::generate_session_token()?;
159
160    // Bind listener
161    let bind_addr = SocketAddr::new(config.bind_addr, config.bind_port);
162    let listener = TcpListener::bind(bind_addr)
163        .await
164        .map_err(|e| ProxyError::Bind {
165            addr: bind_addr.to_string(),
166            source: e,
167        })?;
168
169    let local_addr = listener.local_addr().map_err(|e| ProxyError::Bind {
170        addr: bind_addr.to_string(),
171        source: e,
172    })?;
173    let port = local_addr.port();
174
175    info!("Proxy server listening on {}", local_addr);
176
177    // Load credentials for reverse proxy routes
178    let credential_store = if config.routes.is_empty() {
179        CredentialStore::empty()
180    } else {
181        CredentialStore::load(&config.routes)?
182    };
183    let loaded_routes = credential_store.loaded_prefixes();
184
185    // Build filter
186    let filter = if config.allowed_hosts.is_empty() {
187        ProxyFilter::allow_all()
188    } else {
189        ProxyFilter::new(&config.allowed_hosts)
190    };
191
192    // Build shared TLS connector (root cert store is expensive to construct).
193    // Use the ring provider explicitly to avoid ambiguity when multiple
194    // crypto providers are in the dependency tree.
195    let mut root_store = rustls::RootCertStore::empty();
196    root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
197    let tls_config = rustls::ClientConfig::builder_with_provider(Arc::new(
198        rustls::crypto::ring::default_provider(),
199    ))
200    .with_safe_default_protocol_versions()
201    .map_err(|e| ProxyError::Config(format!("TLS config error: {}", e)))?
202    .with_root_certificates(root_store)
203    .with_no_client_auth();
204    let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
205
206    // Build bypass matcher from external proxy config (once, not per-request)
207    let bypass_matcher = config
208        .external_proxy
209        .as_ref()
210        .map(|ext| external::BypassMatcher::new(&ext.bypass_hosts))
211        .unwrap_or_else(|| external::BypassMatcher::new(&[]));
212
213    // Shutdown channel
214    let (shutdown_tx, shutdown_rx) = watch::channel(false);
215    let audit_log = audit::new_audit_log();
216
217    let state = Arc::new(ProxyState {
218        filter,
219        session_token: session_token.clone(),
220        credential_store,
221        config,
222        tls_connector,
223        active_connections: AtomicUsize::new(0),
224        audit_log: Arc::clone(&audit_log),
225        bypass_matcher,
226    });
227
228    // Spawn accept loop as a task within the current runtime.
229    // The caller MUST ensure this runtime is being driven (e.g., via
230    // a dedicated thread calling block_on or a multi-thread runtime).
231    tokio::spawn(accept_loop(listener, state, shutdown_rx));
232
233    Ok(ProxyHandle {
234        port,
235        token: session_token,
236        audit_log,
237        shutdown_tx,
238        loaded_routes,
239    })
240}
241
242/// Accept loop: listen for connections until shutdown.
243async fn accept_loop(
244    listener: TcpListener,
245    state: Arc<ProxyState>,
246    mut shutdown_rx: watch::Receiver<bool>,
247) {
248    loop {
249        tokio::select! {
250            result = listener.accept() => {
251                match result {
252                    Ok((stream, addr)) => {
253                        // Connection limit enforcement
254                        let max = state.config.max_connections;
255                        if max > 0 {
256                            let current = state.active_connections.load(Ordering::Relaxed);
257                            if current >= max {
258                                warn!("Connection limit reached ({}/{}), rejecting {}", current, max, addr);
259                                // Drop the stream (connection refused)
260                                drop(stream);
261                                continue;
262                            }
263                        }
264                        state.active_connections.fetch_add(1, Ordering::Relaxed);
265
266                        debug!("Accepted connection from {}", addr);
267                        let state = Arc::clone(&state);
268                        tokio::spawn(async move {
269                            if let Err(e) = handle_connection(stream, &state).await {
270                                debug!("Connection handler error: {}", e);
271                            }
272                            state.active_connections.fetch_sub(1, Ordering::Relaxed);
273                        });
274                    }
275                    Err(e) => {
276                        warn!("Accept error: {}", e);
277                    }
278                }
279            }
280            _ = shutdown_rx.changed() => {
281                if *shutdown_rx.borrow() {
282                    info!("Proxy server shutting down");
283                    return;
284                }
285            }
286        }
287    }
288}
289
290/// Handle a single client connection.
291///
292/// Reads the first HTTP line to determine the proxy mode:
293/// - CONNECT method -> tunnel (Mode 1 or 3)
294/// - Other methods  -> reverse proxy (Mode 2)
295async fn handle_connection(mut stream: tokio::net::TcpStream, state: &ProxyState) -> Result<()> {
296    // Read the first line and headers through a BufReader.
297    // We keep the BufReader alive until we've consumed the full header
298    // to prevent data loss (BufReader may read ahead into the body).
299    let mut buf_reader = BufReader::new(&mut stream);
300    let mut first_line = String::new();
301    buf_reader.read_line(&mut first_line).await?;
302
303    if first_line.is_empty() {
304        return Ok(()); // Client disconnected
305    }
306
307    // Read remaining headers (up to empty line), with size limit to prevent OOM.
308    let mut header_bytes = Vec::new();
309    loop {
310        let mut line = String::new();
311        let n = buf_reader.read_line(&mut line).await?;
312        if n == 0 || line.trim().is_empty() {
313            break;
314        }
315        header_bytes.extend_from_slice(line.as_bytes());
316        if header_bytes.len() > MAX_HEADER_SIZE {
317            drop(buf_reader);
318            let response = "HTTP/1.1 431 Request Header Fields Too Large\r\n\r\n";
319            stream.write_all(response.as_bytes()).await?;
320            return Ok(());
321        }
322    }
323
324    // Extract any data buffered beyond headers before dropping BufReader.
325    // BufReader may have read ahead into the request body. We capture
326    // those bytes and pass them to the reverse proxy handler so no body
327    // data is lost. For CONNECT requests this is always empty (no body).
328    let buffered = buf_reader.buffer().to_vec();
329    drop(buf_reader);
330
331    let first_line = first_line.trim_end();
332
333    // Dispatch by method
334    if first_line.starts_with("CONNECT ") {
335        // Check if external proxy is configured and host is not bypassed
336        let use_external = if let Some(ref ext_config) = state.config.external_proxy {
337            if state.bypass_matcher.is_empty() {
338                Some(ext_config)
339            } else {
340                // Parse host from CONNECT line to check bypass
341                let host = first_line
342                    .split_whitespace()
343                    .nth(1)
344                    .and_then(|authority| {
345                        authority
346                            .rsplit_once(':')
347                            .map(|(h, _)| h)
348                            .or(Some(authority))
349                    })
350                    .unwrap_or("");
351                if state.bypass_matcher.matches(host) {
352                    debug!("Bypassing external proxy for {}", host);
353                    None
354                } else {
355                    Some(ext_config)
356                }
357            }
358        } else {
359            None
360        };
361
362        if let Some(ext_config) = use_external {
363            external::handle_external_proxy(
364                first_line,
365                &mut stream,
366                &header_bytes,
367                &state.filter,
368                &state.session_token,
369                ext_config,
370                Some(&state.audit_log),
371            )
372            .await
373        } else if state.config.external_proxy.is_some() {
374            // Bypass route: enforce strict session token validation before
375            // routing direct. Without this, bypassed hosts would inherit
376            // connect::handle_connect()'s lenient auth (which tolerates
377            // missing Proxy-Authorization for Node.js undici compat).
378            token::validate_proxy_auth(&header_bytes, &state.session_token)?;
379            connect::handle_connect(
380                first_line,
381                &mut stream,
382                &state.filter,
383                &state.session_token,
384                &header_bytes,
385                Some(&state.audit_log),
386            )
387            .await
388        } else {
389            connect::handle_connect(
390                first_line,
391                &mut stream,
392                &state.filter,
393                &state.session_token,
394                &header_bytes,
395                Some(&state.audit_log),
396            )
397            .await
398        }
399    } else if !state.credential_store.is_empty() {
400        // Non-CONNECT request with credential routes -> reverse proxy
401        let ctx = reverse::ReverseProxyCtx {
402            credential_store: &state.credential_store,
403            session_token: &state.session_token,
404            filter: &state.filter,
405            tls_connector: &state.tls_connector,
406            audit_log: Some(&state.audit_log),
407        };
408        reverse::handle_reverse_proxy(first_line, &mut stream, &header_bytes, &ctx, &buffered).await
409    } else {
410        // No credential routes configured, reject non-CONNECT requests
411        let response = "HTTP/1.1 400 Bad Request\r\n\r\n";
412        stream.write_all(response.as_bytes()).await?;
413        Ok(())
414    }
415}
416
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::config::{InjectMode, RouteConfig};

    /// Build a header-injecting route; only the fields the tests vary are parameters.
    fn route(
        prefix: &str,
        upstream: &str,
        credential_key: Option<&str>,
        credential_format: &str,
        env_var: Option<&str>,
    ) -> RouteConfig {
        RouteConfig {
            prefix: prefix.to_string(),
            upstream: upstream.to_string(),
            credential_key: credential_key.map(String::from),
            inject_mode: InjectMode::Header,
            inject_header: "Authorization".to_string(),
            credential_format: credential_format.to_string(),
            path_pattern: None,
            path_replacement: None,
            query_param_name: None,
            env_var: env_var.map(String::from),
        }
    }

    /// Build a `ProxyHandle` directly, without starting a real proxy
    /// (which would try to load credentials).
    fn handle_with_loaded(loaded: &[&str]) -> ProxyHandle {
        let (shutdown_tx, _) = tokio::sync::watch::channel(false);
        ProxyHandle {
            port: 12345,
            token: Zeroizing::new("test_token".to_string()),
            audit_log: audit::new_audit_log(),
            shutdown_tx,
            loaded_routes: loaded.iter().map(|prefix| prefix.to_string()).collect(),
        }
    }

    /// Value of the env var called `name`, if `vars` contains it.
    fn lookup<'a>(vars: &'a [(String, String)], name: &str) -> Option<&'a str> {
        vars.iter()
            .find(|(key, _)| key == name)
            .map(|(_, value)| value.as_str())
    }

    #[tokio::test]
    async fn test_proxy_starts_and_binds() {
        let handle = start(ProxyConfig::default()).await.unwrap();

        // Port should be non-zero (OS-assigned)
        assert!(handle.port > 0);
        // Token should be 64 hex chars
        assert_eq!(handle.token.len(), 64);

        handle.shutdown();
    }

    #[tokio::test]
    async fn test_proxy_env_vars() {
        let handle = start(ProxyConfig::default()).await.unwrap();
        let vars = handle.env_vars();

        let http_proxy = lookup(&vars, "HTTP_PROXY");
        assert!(http_proxy.is_some());
        assert!(http_proxy.unwrap().starts_with("http://nono:"));

        let token_var = lookup(&vars, "NONO_PROXY_TOKEN");
        assert!(token_var.is_some());
        assert_eq!(token_var.unwrap().len(), 64);

        handle.shutdown();
    }

    #[tokio::test]
    async fn test_proxy_credential_env_vars() {
        let config = ProxyConfig {
            routes: vec![route(
                "openai",
                "https://api.openai.com",
                None,
                "Bearer {}",
                None,
            )],
            ..Default::default()
        };
        let handle = start(config.clone()).await.unwrap();

        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 1);
        assert_eq!(vars[0].0, "OPENAI_BASE_URL");
        assert!(vars[0].1.contains("/openai"));

        handle.shutdown();
    }

    #[test]
    fn test_proxy_credential_env_vars_fallback_to_uppercase_key() {
        // With env_var unset and credential_key set, the env var name is the
        // uppercased credential_key. This is the backward-compatible path
        // for keyring-backed credentials.
        let handle = handle_with_loaded(&["openai"]);
        let config = ProxyConfig {
            routes: vec![route(
                "openai",
                "https://api.openai.com",
                Some("openai_api_key"),
                "Bearer {}",
                None, // No explicit env_var — should fall back to uppercase
            )],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 2); // BASE_URL + API_KEY

        // Should derive OPENAI_API_KEY from uppercasing "openai_api_key"
        let api_key_var = lookup(&vars, "OPENAI_API_KEY");
        assert!(
            api_key_var.is_some(),
            "Should derive env var name from credential_key.to_uppercase()"
        );

        let val = api_key_var.expect("OPENAI_API_KEY should exist");
        assert_eq!(val, "test_token");
    }

    #[test]
    fn test_proxy_credential_env_vars_with_explicit_env_var() {
        // An explicit env_var on a route takes precedence over the name
        // derived from credential_key. This is essential for URI manager
        // credential refs (e.g., op://, apple-password://), where
        // uppercasing produces nonsensical env var names.
        let handle = handle_with_loaded(&["openai"]);
        let config = ProxyConfig {
            routes: vec![route(
                "openai",
                "https://api.openai.com",
                Some("op://Development/OpenAI/credential"),
                "Bearer {}",
                Some("OPENAI_API_KEY"),
            )],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 2); // BASE_URL + API_KEY

        let api_key_var = lookup(&vars, "OPENAI_API_KEY");
        assert!(
            api_key_var.is_some(),
            "Should use explicit env_var name, not derive from credential_key"
        );

        // Verify the value is the phantom token, not the real credential
        let val = api_key_var.expect("OPENAI_API_KEY var should exist");
        assert_eq!(val, "test_token");

        // Verify no nonsensical OP:// env var was generated
        assert!(
            !vars.iter().any(|(key, _)| key.starts_with("OP://")),
            "Should not generate env var from op:// URI uppercase"
        );
    }

    #[test]
    fn test_proxy_credential_env_vars_skips_unloaded_routes() {
        // When a credential is unavailable (e.g., GITHUB_TOKEN not set),
        // the route should NOT inject a phantom token env var. Otherwise
        // the phantom token shadows valid credentials from other sources
        // like the system keyring. See: #234
        //
        // Only "openai" was loaded; the "github" credential was unavailable.
        let handle = handle_with_loaded(&["openai"]);
        let config = ProxyConfig {
            routes: vec![
                route(
                    "openai",
                    "https://api.openai.com",
                    Some("openai_api_key"),
                    "Bearer {}",
                    None,
                ),
                route(
                    "github",
                    "https://api.github.com",
                    Some("env://GITHUB_TOKEN"),
                    "token {}",
                    Some("GITHUB_TOKEN"),
                ),
            ],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);

        // openai should have BASE_URL + API_KEY (credential loaded)
        assert!(
            lookup(&vars, "OPENAI_BASE_URL").is_some(),
            "loaded route should have BASE_URL"
        );
        assert!(
            lookup(&vars, "OPENAI_API_KEY").is_some(),
            "loaded route should have API key"
        );

        // github should have BASE_URL (always set for declared routes) but
        // must NOT have GITHUB_TOKEN (credential was not loaded)
        assert!(
            lookup(&vars, "GITHUB_BASE_URL").is_some(),
            "declared route should still have BASE_URL"
        );
        assert!(
            lookup(&vars, "GITHUB_TOKEN").is_none(),
            "unloaded route must not inject phantom GITHUB_TOKEN"
        );
    }
}