Skip to main content

nono_proxy/
server.rs

//! Proxy server: TCP listener, connection dispatch, and lifecycle.
//!
//! The server binds to the configured address (`bind_addr:bind_port`; port 0
//! lets the OS assign one), accepts TCP connections, reads the request line
//! and headers to determine the mode, and dispatches to the appropriate
//! handler:
//!
//! - `CONNECT` method -> [`external`] handler when an upstream proxy is
//!   configured, otherwise [`connect`] handler
//! - Other methods -> [`reverse`] handler (credential injection) when
//!   credential routes are loaded, otherwise `400 Bad Request`
9
10use crate::audit;
11use crate::config::ProxyConfig;
12use crate::connect;
13use crate::credential::CredentialStore;
14use crate::error::{ProxyError, Result};
15use crate::external;
16use crate::filter::ProxyFilter;
17use crate::reverse;
18use crate::token;
19use std::net::SocketAddr;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::net::TcpListener;
24use tokio::sync::watch;
25use tracing::{debug, info, warn};
26use zeroize::Zeroizing;
27
/// Upper bound, in bytes, on the HTTP header data accepted from a client
/// (64 KiB = 2^16 bytes). Guards against memory exhaustion by clients that
/// send unbounded header data.
const MAX_HEADER_SIZE: usize = 1 << 16;
31
32/// Handle returned when the proxy server starts.
33///
34/// Contains the assigned port, session token, and a shutdown channel.
35/// Drop the handle or send to `shutdown_tx` to stop the proxy.
36pub struct ProxyHandle {
37    /// The actual port the proxy is listening on
38    pub port: u16,
39    /// Session token for client authentication
40    pub token: Zeroizing<String>,
41    /// Shared in-memory network audit log
42    audit_log: audit::SharedAuditLog,
43    /// Send `true` to trigger graceful shutdown
44    shutdown_tx: watch::Sender<bool>,
45    /// Route prefixes that have credentials actually loaded.
46    /// Routes whose credentials were unavailable are excluded so we
47    /// don't inject phantom tokens that shadow valid external credentials.
48    loaded_routes: std::collections::HashSet<String>,
49}
50
51impl ProxyHandle {
52    /// Signal the proxy to shut down gracefully.
53    pub fn shutdown(&self) {
54        let _ = self.shutdown_tx.send(true);
55    }
56
57    /// Drain and return collected network audit events.
58    #[must_use]
59    pub fn drain_audit_events(&self) -> Vec<nono::undo::NetworkAuditEvent> {
60        audit::drain_audit_events(&self.audit_log)
61    }
62
63    /// Environment variables to inject into the child process.
64    ///
65    /// The proxy URL includes `nono:<token>@` userinfo so that standard HTTP
66    /// clients (curl, Python requests, etc.) automatically send
67    /// `Proxy-Authorization: Basic ...` on every request. The raw token is
68    /// also provided via `NONO_PROXY_TOKEN` for nono-aware clients that
69    /// prefer Bearer auth.
70    #[must_use]
71    pub fn env_vars(&self) -> Vec<(String, String)> {
72        let proxy_url = format!("http://nono:{}@127.0.0.1:{}", &*self.token, self.port);
73
74        let mut vars = vec![
75            ("HTTP_PROXY".to_string(), proxy_url.clone()),
76            ("HTTPS_PROXY".to_string(), proxy_url.clone()),
77            ("NO_PROXY".to_string(), "localhost,127.0.0.1".to_string()),
78            ("NONO_PROXY_TOKEN".to_string(), self.token.to_string()),
79        ];
80
81        // Lowercase variants for compatibility
82        vars.push(("http_proxy".to_string(), proxy_url.clone()));
83        vars.push(("https_proxy".to_string(), proxy_url));
84        vars.push(("no_proxy".to_string(), "localhost,127.0.0.1".to_string()));
85
86        // Node.js v22.21.0+ / v24.0.0+ requires this flag for native fetch() to use HTTP_PROXY
87        vars.push(("NODE_USE_ENV_PROXY".to_string(), "1".to_string()));
88
89        vars
90    }
91
92    /// Environment variables for reverse proxy credential routes.
93    ///
94    /// Returns two types of env vars per route:
95    /// 1. SDK base URL overrides (e.g., `OPENAI_BASE_URL=http://127.0.0.1:PORT/openai`)
96    /// 2. SDK API key vars set to the session token (e.g., `OPENAI_API_KEY=<token>`)
97    ///
98    /// The SDK sends the session token as its "API key" (phantom token pattern).
99    /// The proxy validates this token and swaps it for the real credential.
100    #[must_use]
101    pub fn credential_env_vars(&self, config: &ProxyConfig) -> Vec<(String, String)> {
102        let mut vars = Vec::new();
103        for route in &config.routes {
104            // Base URL override (e.g., OPENAI_BASE_URL)
105            let base_url_name = format!("{}_BASE_URL", route.prefix.to_uppercase());
106            let url = format!("http://127.0.0.1:{}/{}", self.port, route.prefix);
107            vars.push((base_url_name, url));
108
109            // Only inject phantom token env vars for routes whose credentials
110            // were actually loaded. If a credential was unavailable (e.g.,
111            // GITHUB_TOKEN env var not set), injecting a phantom token would
112            // shadow valid credentials from other sources (keyring, gh auth).
113            if !self.loaded_routes.contains(&route.prefix) {
114                continue;
115            }
116
117            // API key set to session token (phantom token pattern).
118            // Use explicit env_var if set (required for op:// URIs), otherwise
119            // fall back to uppercasing the credential_key (e.g., "openai_api_key" -> "OPENAI_API_KEY").
120            if let Some(ref env_var) = route.env_var {
121                vars.push((env_var.clone(), self.token.to_string()));
122            } else if let Some(ref cred_key) = route.credential_key {
123                let api_key_name = cred_key.to_uppercase();
124                vars.push((api_key_name, self.token.to_string()));
125            }
126        }
127        vars
128    }
129}
130
131/// Shared state for the proxy server.
132struct ProxyState {
133    filter: ProxyFilter,
134    session_token: Zeroizing<String>,
135    credential_store: CredentialStore,
136    config: ProxyConfig,
137    /// Shared TLS connector for upstream connections (reverse proxy mode).
138    /// Created once at startup to avoid rebuilding the root cert store per request.
139    tls_connector: tokio_rustls::TlsConnector,
140    /// Active connection count for connection limiting.
141    active_connections: AtomicUsize,
142    /// Shared network audit log for this proxy session.
143    audit_log: audit::SharedAuditLog,
144}
145
146/// Start the proxy server.
147///
148/// Binds to `config.bind_addr:config.bind_port` (port 0 = OS-assigned),
149/// generates a session token, and begins accepting connections.
150///
151/// Returns a `ProxyHandle` with the assigned port and session token.
152/// The server runs until the handle is dropped or `shutdown()` is called.
153pub async fn start(config: ProxyConfig) -> Result<ProxyHandle> {
154    // Generate session token
155    let session_token = token::generate_session_token()?;
156
157    // Bind listener
158    let bind_addr = SocketAddr::new(config.bind_addr, config.bind_port);
159    let listener = TcpListener::bind(bind_addr)
160        .await
161        .map_err(|e| ProxyError::Bind {
162            addr: bind_addr.to_string(),
163            source: e,
164        })?;
165
166    let local_addr = listener.local_addr().map_err(|e| ProxyError::Bind {
167        addr: bind_addr.to_string(),
168        source: e,
169    })?;
170    let port = local_addr.port();
171
172    info!("Proxy server listening on {}", local_addr);
173
174    // Load credentials for reverse proxy routes
175    let credential_store = if config.routes.is_empty() {
176        CredentialStore::empty()
177    } else {
178        CredentialStore::load(&config.routes)?
179    };
180    let loaded_routes = credential_store.loaded_prefixes();
181
182    // Build filter
183    let filter = if config.allowed_hosts.is_empty() {
184        ProxyFilter::allow_all()
185    } else {
186        ProxyFilter::new(&config.allowed_hosts)
187    };
188
189    // Build shared TLS connector (root cert store is expensive to construct).
190    // Use the ring provider explicitly to avoid ambiguity when multiple
191    // crypto providers are in the dependency tree.
192    let mut root_store = rustls::RootCertStore::empty();
193    root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
194    let tls_config = rustls::ClientConfig::builder_with_provider(Arc::new(
195        rustls::crypto::ring::default_provider(),
196    ))
197    .with_safe_default_protocol_versions()
198    .map_err(|e| ProxyError::Config(format!("TLS config error: {}", e)))?
199    .with_root_certificates(root_store)
200    .with_no_client_auth();
201    let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
202
203    // Shutdown channel
204    let (shutdown_tx, shutdown_rx) = watch::channel(false);
205    let audit_log = audit::new_audit_log();
206
207    let state = Arc::new(ProxyState {
208        filter,
209        session_token: session_token.clone(),
210        credential_store,
211        config,
212        tls_connector,
213        active_connections: AtomicUsize::new(0),
214        audit_log: Arc::clone(&audit_log),
215    });
216
217    // Spawn accept loop as a task within the current runtime.
218    // The caller MUST ensure this runtime is being driven (e.g., via
219    // a dedicated thread calling block_on or a multi-thread runtime).
220    tokio::spawn(accept_loop(listener, state, shutdown_rx));
221
222    Ok(ProxyHandle {
223        port,
224        token: session_token,
225        audit_log,
226        shutdown_tx,
227        loaded_routes,
228    })
229}
230
231/// Accept loop: listen for connections until shutdown.
232async fn accept_loop(
233    listener: TcpListener,
234    state: Arc<ProxyState>,
235    mut shutdown_rx: watch::Receiver<bool>,
236) {
237    loop {
238        tokio::select! {
239            result = listener.accept() => {
240                match result {
241                    Ok((stream, addr)) => {
242                        // Connection limit enforcement
243                        let max = state.config.max_connections;
244                        if max > 0 {
245                            let current = state.active_connections.load(Ordering::Relaxed);
246                            if current >= max {
247                                warn!("Connection limit reached ({}/{}), rejecting {}", current, max, addr);
248                                // Drop the stream (connection refused)
249                                drop(stream);
250                                continue;
251                            }
252                        }
253                        state.active_connections.fetch_add(1, Ordering::Relaxed);
254
255                        debug!("Accepted connection from {}", addr);
256                        let state = Arc::clone(&state);
257                        tokio::spawn(async move {
258                            if let Err(e) = handle_connection(stream, &state).await {
259                                debug!("Connection handler error: {}", e);
260                            }
261                            state.active_connections.fetch_sub(1, Ordering::Relaxed);
262                        });
263                    }
264                    Err(e) => {
265                        warn!("Accept error: {}", e);
266                    }
267                }
268            }
269            _ = shutdown_rx.changed() => {
270                if *shutdown_rx.borrow() {
271                    info!("Proxy server shutting down");
272                    return;
273                }
274            }
275        }
276    }
277}
278
279/// Handle a single client connection.
280///
281/// Reads the first HTTP line to determine the proxy mode:
282/// - CONNECT method -> tunnel (Mode 1 or 3)
283/// - Other methods  -> reverse proxy (Mode 2)
284async fn handle_connection(mut stream: tokio::net::TcpStream, state: &ProxyState) -> Result<()> {
285    // Read the first line and headers through a BufReader.
286    // We keep the BufReader alive until we've consumed the full header
287    // to prevent data loss (BufReader may read ahead into the body).
288    let mut buf_reader = BufReader::new(&mut stream);
289    let mut first_line = String::new();
290    buf_reader.read_line(&mut first_line).await?;
291
292    if first_line.is_empty() {
293        return Ok(()); // Client disconnected
294    }
295
296    // Read remaining headers (up to empty line), with size limit to prevent OOM.
297    let mut header_bytes = Vec::new();
298    loop {
299        let mut line = String::new();
300        let n = buf_reader.read_line(&mut line).await?;
301        if n == 0 || line.trim().is_empty() {
302            break;
303        }
304        header_bytes.extend_from_slice(line.as_bytes());
305        if header_bytes.len() > MAX_HEADER_SIZE {
306            drop(buf_reader);
307            let response = "HTTP/1.1 431 Request Header Fields Too Large\r\n\r\n";
308            stream.write_all(response.as_bytes()).await?;
309            return Ok(());
310        }
311    }
312
313    // Extract any data buffered beyond headers before dropping BufReader.
314    // BufReader may have read ahead into the request body. We capture
315    // those bytes and pass them to the reverse proxy handler so no body
316    // data is lost. For CONNECT requests this is always empty (no body).
317    let buffered = buf_reader.buffer().to_vec();
318    drop(buf_reader);
319
320    let first_line = first_line.trim_end();
321
322    // Dispatch by method
323    if first_line.starts_with("CONNECT ") {
324        // Check if external proxy is configured
325        if let Some(ref ext_config) = state.config.external_proxy {
326            external::handle_external_proxy(
327                first_line,
328                &mut stream,
329                &header_bytes,
330                &state.filter,
331                &state.session_token,
332                ext_config,
333                Some(&state.audit_log),
334            )
335            .await
336        } else {
337            connect::handle_connect(
338                first_line,
339                &mut stream,
340                &state.filter,
341                &state.session_token,
342                &header_bytes,
343                Some(&state.audit_log),
344            )
345            .await
346        }
347    } else if !state.credential_store.is_empty() {
348        // Non-CONNECT request with credential routes -> reverse proxy
349        let ctx = reverse::ReverseProxyCtx {
350            credential_store: &state.credential_store,
351            session_token: &state.session_token,
352            filter: &state.filter,
353            tls_connector: &state.tls_connector,
354            audit_log: Some(&state.audit_log),
355        };
356        reverse::handle_reverse_proxy(first_line, &mut stream, &header_bytes, &ctx, &buffered).await
357    } else {
358        // No credential routes configured, reject non-CONNECT requests
359        let response = "HTTP/1.1 400 Bad Request\r\n\r\n";
360        stream.write_all(response.as_bytes()).await?;
361        Ok(())
362    }
363}
364
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::config::{InjectMode, RouteConfig};

    /// Header-injected route with the fields these tests vary; the other
    /// fields get the values every test in this module uses.
    fn header_route(
        prefix: &str,
        upstream: &str,
        credential_key: Option<&str>,
        credential_format: &str,
        env_var: Option<&str>,
    ) -> RouteConfig {
        RouteConfig {
            prefix: prefix.to_string(),
            upstream: upstream.to_string(),
            credential_key: credential_key.map(str::to_string),
            inject_mode: InjectMode::Header,
            inject_header: "Authorization".to_string(),
            credential_format: credential_format.to_string(),
            path_pattern: None,
            path_replacement: None,
            query_param_name: None,
            env_var: env_var.map(str::to_string),
        }
    }

    /// Handle built directly (no listener, no credential loading) whose
    /// `loaded_routes` contains exactly `loaded`.
    fn offline_handle(loaded: &[&str]) -> ProxyHandle {
        let (shutdown_tx, _) = tokio::sync::watch::channel(false);
        ProxyHandle {
            port: 12345,
            token: Zeroizing::new("test_token".to_string()),
            audit_log: audit::new_audit_log(),
            shutdown_tx,
            loaded_routes: loaded.iter().copied().map(String::from).collect(),
        }
    }

    /// Value of the first `name` entry in `vars`, if present.
    fn env_value<'a>(vars: &'a [(String, String)], name: &str) -> Option<&'a str> {
        vars.iter()
            .find(|(key, _)| key == name)
            .map(|(_, value)| value.as_str())
    }

    #[tokio::test]
    async fn test_proxy_starts_and_binds() {
        let handle = start(ProxyConfig::default()).await.unwrap();

        // The OS assigned a real port, and the token is 64 hex characters.
        assert!(handle.port > 0);
        assert_eq!(handle.token.len(), 64);

        handle.shutdown();
    }

    #[tokio::test]
    async fn test_proxy_env_vars() {
        let handle = start(ProxyConfig::default()).await.unwrap();
        let vars = handle.env_vars();

        let http_proxy = env_value(&vars, "HTTP_PROXY");
        assert!(http_proxy.is_some());
        assert!(http_proxy.unwrap().starts_with("http://nono:"));

        let token = env_value(&vars, "NONO_PROXY_TOKEN");
        assert!(token.is_some());
        assert_eq!(token.unwrap().len(), 64);

        handle.shutdown();
    }

    #[tokio::test]
    async fn test_proxy_credential_env_vars() {
        let config = ProxyConfig {
            routes: vec![header_route(
                "openai",
                "https://api.openai.com",
                None,
                "Bearer {}",
                None,
            )],
            ..Default::default()
        };
        let handle = start(config.clone()).await.unwrap();

        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 1);
        assert_eq!(vars[0].0, "OPENAI_BASE_URL");
        assert!(vars[0].1.contains("/openai"));

        handle.shutdown();
    }

    #[test]
    fn test_proxy_credential_env_vars_fallback_to_uppercase_key() {
        // Without an explicit env_var, the variable name comes from
        // uppercasing credential_key: the backward-compatible path for
        // keyring-backed credentials.
        let handle = offline_handle(&["openai"]);
        let config = ProxyConfig {
            routes: vec![header_route(
                "openai",
                "https://api.openai.com",
                Some("openai_api_key"),
                "Bearer {}",
                None, // No explicit env_var — should fall back to uppercase
            )],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 2); // BASE_URL + API_KEY

        let api_key = env_value(&vars, "OPENAI_API_KEY");
        assert!(
            api_key.is_some(),
            "Should derive env var name from credential_key.to_uppercase()"
        );
        assert_eq!(api_key.expect("OPENAI_API_KEY should exist"), "test_token");
    }

    #[test]
    fn test_proxy_credential_env_vars_with_explicit_env_var() {
        // An explicit env_var wins over deriving a name from credential_key.
        // This matters for op:// URIs, where uppercasing produces nonsensical
        // variable names.
        let handle = offline_handle(&["openai"]);
        let config = ProxyConfig {
            routes: vec![header_route(
                "openai",
                "https://api.openai.com",
                Some("op://Development/OpenAI/credential"),
                "Bearer {}",
                Some("OPENAI_API_KEY"),
            )],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 2); // BASE_URL + API_KEY

        let api_key = env_value(&vars, "OPENAI_API_KEY");
        assert!(
            api_key.is_some(),
            "Should use explicit env_var name, not derive from credential_key"
        );

        // The value is the phantom token, never the real credential.
        assert_eq!(api_key.expect("OPENAI_API_KEY var should exist"), "test_token");

        // No nonsensical OP://... variable was produced.
        let has_op_var = vars.iter().any(|(key, _)| key.starts_with("OP://"));
        assert!(
            !has_op_var,
            "Should not generate env var from op:// URI uppercase"
        );
    }

    #[test]
    fn test_proxy_credential_env_vars_skips_unloaded_routes() {
        // A route whose credential is unavailable (e.g. GITHUB_TOKEN unset)
        // must not export a phantom token; it would shadow valid credentials
        // from other sources such as the system keyring. See: #234
        //
        // Only "openai" was loaded; the "github" credential was unavailable.
        let handle = offline_handle(&["openai"]);
        let config = ProxyConfig {
            routes: vec![
                header_route(
                    "openai",
                    "https://api.openai.com",
                    Some("openai_api_key"),
                    "Bearer {}",
                    None,
                ),
                header_route(
                    "github",
                    "https://api.github.com",
                    Some("env://GITHUB_TOKEN"),
                    "token {}",
                    Some("GITHUB_TOKEN"),
                ),
            ],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);

        // Loaded route: BASE_URL and API key are both present.
        assert!(
            env_value(&vars, "OPENAI_BASE_URL").is_some(),
            "loaded route should have BASE_URL"
        );
        assert!(
            env_value(&vars, "OPENAI_API_KEY").is_some(),
            "loaded route should have API key"
        );

        // Unloaded route: BASE_URL is still declared, but GITHUB_TOKEN must
        // not be injected.
        assert!(
            env_value(&vars, "GITHUB_BASE_URL").is_some(),
            "declared route should still have BASE_URL"
        );
        assert!(
            env_value(&vars, "GITHUB_TOKEN").is_none(),
            "unloaded route must not inject phantom GITHUB_TOKEN"
        );
    }
}