1use crate::audit;
11use crate::config::ProxyConfig;
12use crate::connect;
13use crate::credential::CredentialStore;
14use crate::error::{ProxyError, Result};
15use crate::external;
16use crate::filter::ProxyFilter;
17use crate::reverse;
18use crate::token;
19use std::net::SocketAddr;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::net::TcpListener;
24use tokio::sync::watch;
25use tracing::{debug, info, warn};
26use zeroize::Zeroizing;
27
28const MAX_HEADER_SIZE: usize = 64 * 1024;
31
32pub struct ProxyHandle {
37 pub port: u16,
39 pub token: Zeroizing<String>,
41 audit_log: audit::SharedAuditLog,
43 shutdown_tx: watch::Sender<bool>,
45 loaded_routes: std::collections::HashSet<String>,
49}
50
51impl ProxyHandle {
52 pub fn shutdown(&self) {
54 let _ = self.shutdown_tx.send(true);
55 }
56
57 #[must_use]
59 pub fn drain_audit_events(&self) -> Vec<nono::undo::NetworkAuditEvent> {
60 audit::drain_audit_events(&self.audit_log)
61 }
62
63 #[must_use]
71 pub fn env_vars(&self) -> Vec<(String, String)> {
72 let proxy_url = format!("http://nono:{}@127.0.0.1:{}", &*self.token, self.port);
73
74 let mut vars = vec![
75 ("HTTP_PROXY".to_string(), proxy_url.clone()),
76 ("HTTPS_PROXY".to_string(), proxy_url.clone()),
77 ("NO_PROXY".to_string(), "localhost,127.0.0.1".to_string()),
78 ("NONO_PROXY_TOKEN".to_string(), self.token.to_string()),
79 ];
80
81 vars.push(("http_proxy".to_string(), proxy_url.clone()));
83 vars.push(("https_proxy".to_string(), proxy_url));
84 vars.push(("no_proxy".to_string(), "localhost,127.0.0.1".to_string()));
85
86 vars.push(("NODE_USE_ENV_PROXY".to_string(), "1".to_string()));
88
89 vars
90 }
91
92 #[must_use]
101 pub fn credential_env_vars(&self, config: &ProxyConfig) -> Vec<(String, String)> {
102 let mut vars = Vec::new();
103 for route in &config.routes {
104 let base_url_name = format!("{}_BASE_URL", route.prefix.to_uppercase());
106 let url = format!("http://127.0.0.1:{}/{}", self.port, route.prefix);
107 vars.push((base_url_name, url));
108
109 if !self.loaded_routes.contains(&route.prefix) {
114 continue;
115 }
116
117 if let Some(ref env_var) = route.env_var {
121 vars.push((env_var.clone(), self.token.to_string()));
122 } else if let Some(ref cred_key) = route.credential_key {
123 let api_key_name = cred_key.to_uppercase();
124 vars.push((api_key_name, self.token.to_string()));
125 }
126 }
127 vars
128 }
129}
130
131struct ProxyState {
133 filter: ProxyFilter,
134 session_token: Zeroizing<String>,
135 credential_store: CredentialStore,
136 config: ProxyConfig,
137 tls_connector: tokio_rustls::TlsConnector,
140 active_connections: AtomicUsize,
142 audit_log: audit::SharedAuditLog,
144 bypass_matcher: external::BypassMatcher,
147}
148
149pub async fn start(config: ProxyConfig) -> Result<ProxyHandle> {
157 let session_token = token::generate_session_token()?;
159
160 let bind_addr = SocketAddr::new(config.bind_addr, config.bind_port);
162 let listener = TcpListener::bind(bind_addr)
163 .await
164 .map_err(|e| ProxyError::Bind {
165 addr: bind_addr.to_string(),
166 source: e,
167 })?;
168
169 let local_addr = listener.local_addr().map_err(|e| ProxyError::Bind {
170 addr: bind_addr.to_string(),
171 source: e,
172 })?;
173 let port = local_addr.port();
174
175 info!("Proxy server listening on {}", local_addr);
176
177 let credential_store = if config.routes.is_empty() {
179 CredentialStore::empty()
180 } else {
181 CredentialStore::load(&config.routes)?
182 };
183 let loaded_routes = credential_store.loaded_prefixes();
184
185 let filter = if config.allowed_hosts.is_empty() {
187 ProxyFilter::allow_all()
188 } else {
189 ProxyFilter::new(&config.allowed_hosts)
190 };
191
192 let mut root_store = rustls::RootCertStore::empty();
196 root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
197 let tls_config = rustls::ClientConfig::builder_with_provider(Arc::new(
198 rustls::crypto::ring::default_provider(),
199 ))
200 .with_safe_default_protocol_versions()
201 .map_err(|e| ProxyError::Config(format!("TLS config error: {}", e)))?
202 .with_root_certificates(root_store)
203 .with_no_client_auth();
204 let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
205
206 let bypass_matcher = config
208 .external_proxy
209 .as_ref()
210 .map(|ext| external::BypassMatcher::new(&ext.bypass_hosts))
211 .unwrap_or_else(|| external::BypassMatcher::new(&[]));
212
213 let (shutdown_tx, shutdown_rx) = watch::channel(false);
215 let audit_log = audit::new_audit_log();
216
217 let state = Arc::new(ProxyState {
218 filter,
219 session_token: session_token.clone(),
220 credential_store,
221 config,
222 tls_connector,
223 active_connections: AtomicUsize::new(0),
224 audit_log: Arc::clone(&audit_log),
225 bypass_matcher,
226 });
227
228 tokio::spawn(accept_loop(listener, state, shutdown_rx));
232
233 Ok(ProxyHandle {
234 port,
235 token: session_token,
236 audit_log,
237 shutdown_tx,
238 loaded_routes,
239 })
240}
241
242async fn accept_loop(
244 listener: TcpListener,
245 state: Arc<ProxyState>,
246 mut shutdown_rx: watch::Receiver<bool>,
247) {
248 loop {
249 tokio::select! {
250 result = listener.accept() => {
251 match result {
252 Ok((stream, addr)) => {
253 let max = state.config.max_connections;
255 if max > 0 {
256 let current = state.active_connections.load(Ordering::Relaxed);
257 if current >= max {
258 warn!("Connection limit reached ({}/{}), rejecting {}", current, max, addr);
259 drop(stream);
261 continue;
262 }
263 }
264 state.active_connections.fetch_add(1, Ordering::Relaxed);
265
266 debug!("Accepted connection from {}", addr);
267 let state = Arc::clone(&state);
268 tokio::spawn(async move {
269 if let Err(e) = handle_connection(stream, &state).await {
270 debug!("Connection handler error: {}", e);
271 }
272 state.active_connections.fetch_sub(1, Ordering::Relaxed);
273 });
274 }
275 Err(e) => {
276 warn!("Accept error: {}", e);
277 }
278 }
279 }
280 _ = shutdown_rx.changed() => {
281 if *shutdown_rx.borrow() {
282 info!("Proxy server shutting down");
283 return;
284 }
285 }
286 }
287 }
288}
289
290async fn handle_connection(mut stream: tokio::net::TcpStream, state: &ProxyState) -> Result<()> {
296 let mut buf_reader = BufReader::new(&mut stream);
300 let mut first_line = String::new();
301 buf_reader.read_line(&mut first_line).await?;
302
303 if first_line.is_empty() {
304 return Ok(()); }
306
307 let mut header_bytes = Vec::new();
309 loop {
310 let mut line = String::new();
311 let n = buf_reader.read_line(&mut line).await?;
312 if n == 0 || line.trim().is_empty() {
313 break;
314 }
315 header_bytes.extend_from_slice(line.as_bytes());
316 if header_bytes.len() > MAX_HEADER_SIZE {
317 drop(buf_reader);
318 let response = "HTTP/1.1 431 Request Header Fields Too Large\r\n\r\n";
319 stream.write_all(response.as_bytes()).await?;
320 return Ok(());
321 }
322 }
323
324 let buffered = buf_reader.buffer().to_vec();
329 drop(buf_reader);
330
331 let first_line = first_line.trim_end();
332
333 if first_line.starts_with("CONNECT ") {
335 let use_external = if let Some(ref ext_config) = state.config.external_proxy {
337 if state.bypass_matcher.is_empty() {
338 Some(ext_config)
339 } else {
340 let host = first_line
342 .split_whitespace()
343 .nth(1)
344 .and_then(|authority| {
345 authority
346 .rsplit_once(':')
347 .map(|(h, _)| h)
348 .or(Some(authority))
349 })
350 .unwrap_or("");
351 if state.bypass_matcher.matches(host) {
352 debug!("Bypassing external proxy for {}", host);
353 None
354 } else {
355 Some(ext_config)
356 }
357 }
358 } else {
359 None
360 };
361
362 if let Some(ext_config) = use_external {
363 external::handle_external_proxy(
364 first_line,
365 &mut stream,
366 &header_bytes,
367 &state.filter,
368 &state.session_token,
369 ext_config,
370 Some(&state.audit_log),
371 )
372 .await
373 } else if state.config.external_proxy.is_some() {
374 token::validate_proxy_auth(&header_bytes, &state.session_token)?;
379 connect::handle_connect(
380 first_line,
381 &mut stream,
382 &state.filter,
383 &state.session_token,
384 &header_bytes,
385 Some(&state.audit_log),
386 )
387 .await
388 } else {
389 connect::handle_connect(
390 first_line,
391 &mut stream,
392 &state.filter,
393 &state.session_token,
394 &header_bytes,
395 Some(&state.audit_log),
396 )
397 .await
398 }
399 } else if !state.credential_store.is_empty() {
400 let ctx = reverse::ReverseProxyCtx {
402 credential_store: &state.credential_store,
403 session_token: &state.session_token,
404 filter: &state.filter,
405 tls_connector: &state.tls_connector,
406 audit_log: Some(&state.audit_log),
407 };
408 reverse::handle_reverse_proxy(first_line, &mut stream, &header_bytes, &ctx, &buffered).await
409 } else {
410 let response = "HTTP/1.1 400 Bad Request\r\n\r\n";
412 stream.write_all(response.as_bytes()).await?;
413 Ok(())
414 }
415}
416
417#[cfg(test)]
418#[allow(clippy::unwrap_used)]
419mod tests {
420 use super::*;
421
422 #[tokio::test]
423 async fn test_proxy_starts_and_binds() {
424 let config = ProxyConfig::default();
425 let handle = start(config).await.unwrap();
426
427 assert!(handle.port > 0);
429 assert_eq!(handle.token.len(), 64);
431
432 handle.shutdown();
434 }
435
436 #[tokio::test]
437 async fn test_proxy_env_vars() {
438 let config = ProxyConfig::default();
439 let handle = start(config).await.unwrap();
440
441 let vars = handle.env_vars();
442 let http_proxy = vars.iter().find(|(k, _)| k == "HTTP_PROXY");
443 assert!(http_proxy.is_some());
444 assert!(http_proxy.unwrap().1.starts_with("http://nono:"));
445
446 let token_var = vars.iter().find(|(k, _)| k == "NONO_PROXY_TOKEN");
447 assert!(token_var.is_some());
448 assert_eq!(token_var.unwrap().1.len(), 64);
449
450 handle.shutdown();
451 }
452
453 #[tokio::test]
454 async fn test_proxy_credential_env_vars() {
455 let config = ProxyConfig {
456 routes: vec![crate::config::RouteConfig {
457 prefix: "openai".to_string(),
458 upstream: "https://api.openai.com".to_string(),
459 credential_key: None,
460 inject_mode: crate::config::InjectMode::Header,
461 inject_header: "Authorization".to_string(),
462 credential_format: "Bearer {}".to_string(),
463 path_pattern: None,
464 path_replacement: None,
465 query_param_name: None,
466 env_var: None,
467 }],
468 ..Default::default()
469 };
470 let handle = start(config.clone()).await.unwrap();
471
472 let vars = handle.credential_env_vars(&config);
473 assert_eq!(vars.len(), 1);
474 assert_eq!(vars[0].0, "OPENAI_BASE_URL");
475 assert!(vars[0].1.contains("/openai"));
476
477 handle.shutdown();
478 }
479
480 #[test]
481 fn test_proxy_credential_env_vars_fallback_to_uppercase_key() {
482 let (shutdown_tx, _) = tokio::sync::watch::channel(false);
486 let handle = ProxyHandle {
487 port: 12345,
488 token: Zeroizing::new("test_token".to_string()),
489 audit_log: audit::new_audit_log(),
490 shutdown_tx,
491 loaded_routes: ["openai".to_string()].into_iter().collect(),
492 };
493 let config = ProxyConfig {
494 routes: vec![crate::config::RouteConfig {
495 prefix: "openai".to_string(),
496 upstream: "https://api.openai.com".to_string(),
497 credential_key: Some("openai_api_key".to_string()),
498 inject_mode: crate::config::InjectMode::Header,
499 inject_header: "Authorization".to_string(),
500 credential_format: "Bearer {}".to_string(),
501 path_pattern: None,
502 path_replacement: None,
503 query_param_name: None,
504 env_var: None, }],
506 ..Default::default()
507 };
508
509 let vars = handle.credential_env_vars(&config);
510 assert_eq!(vars.len(), 2); let api_key_var = vars.iter().find(|(k, _)| k == "OPENAI_API_KEY");
514 assert!(
515 api_key_var.is_some(),
516 "Should derive env var name from credential_key.to_uppercase()"
517 );
518
519 let (_, val) = api_key_var.expect("OPENAI_API_KEY should exist");
520 assert_eq!(val, "test_token");
521 }
522
523 #[test]
524 fn test_proxy_credential_env_vars_with_explicit_env_var() {
525 let (shutdown_tx, _) = tokio::sync::watch::channel(false);
533 let handle = ProxyHandle {
534 port: 12345,
535 token: Zeroizing::new("test_token".to_string()),
536 audit_log: audit::new_audit_log(),
537 shutdown_tx,
538 loaded_routes: ["openai".to_string()].into_iter().collect(),
539 };
540 let config = ProxyConfig {
541 routes: vec![crate::config::RouteConfig {
542 prefix: "openai".to_string(),
543 upstream: "https://api.openai.com".to_string(),
544 credential_key: Some("op://Development/OpenAI/credential".to_string()),
545 inject_mode: crate::config::InjectMode::Header,
546 inject_header: "Authorization".to_string(),
547 credential_format: "Bearer {}".to_string(),
548 path_pattern: None,
549 path_replacement: None,
550 query_param_name: None,
551 env_var: Some("OPENAI_API_KEY".to_string()),
552 }],
553 ..Default::default()
554 };
555
556 let vars = handle.credential_env_vars(&config);
557 assert_eq!(vars.len(), 2); let api_key_var = vars.iter().find(|(k, _)| k == "OPENAI_API_KEY");
560 assert!(
561 api_key_var.is_some(),
562 "Should use explicit env_var name, not derive from credential_key"
563 );
564
565 let (_, val) = api_key_var.expect("OPENAI_API_KEY var should exist");
567 assert_eq!(val, "test_token");
568
569 let bad_var = vars.iter().find(|(k, _)| k.starts_with("OP://"));
571 assert!(
572 bad_var.is_none(),
573 "Should not generate env var from op:// URI uppercase"
574 );
575 }
576
577 #[test]
578 fn test_proxy_credential_env_vars_skips_unloaded_routes() {
579 let (shutdown_tx, _) = tokio::sync::watch::channel(false);
584 let handle = ProxyHandle {
585 port: 12345,
586 token: Zeroizing::new("test_token".to_string()),
587 audit_log: audit::new_audit_log(),
588 shutdown_tx,
589 loaded_routes: ["openai".to_string()].into_iter().collect(),
591 };
592 let config = ProxyConfig {
593 routes: vec![
594 crate::config::RouteConfig {
595 prefix: "openai".to_string(),
596 upstream: "https://api.openai.com".to_string(),
597 credential_key: Some("openai_api_key".to_string()),
598 inject_mode: crate::config::InjectMode::Header,
599 inject_header: "Authorization".to_string(),
600 credential_format: "Bearer {}".to_string(),
601 path_pattern: None,
602 path_replacement: None,
603 query_param_name: None,
604 env_var: None,
605 },
606 crate::config::RouteConfig {
607 prefix: "github".to_string(),
608 upstream: "https://api.github.com".to_string(),
609 credential_key: Some("env://GITHUB_TOKEN".to_string()),
610 inject_mode: crate::config::InjectMode::Header,
611 inject_header: "Authorization".to_string(),
612 credential_format: "token {}".to_string(),
613 path_pattern: None,
614 path_replacement: None,
615 query_param_name: None,
616 env_var: Some("GITHUB_TOKEN".to_string()),
617 },
618 ],
619 ..Default::default()
620 };
621
622 let vars = handle.credential_env_vars(&config);
623
624 let openai_base = vars.iter().find(|(k, _)| k == "OPENAI_BASE_URL");
626 assert!(openai_base.is_some(), "loaded route should have BASE_URL");
627 let openai_key = vars.iter().find(|(k, _)| k == "OPENAI_API_KEY");
628 assert!(openai_key.is_some(), "loaded route should have API key");
629
630 let github_base = vars.iter().find(|(k, _)| k == "GITHUB_BASE_URL");
633 assert!(
634 github_base.is_some(),
635 "declared route should still have BASE_URL"
636 );
637 let github_token = vars.iter().find(|(k, _)| k == "GITHUB_TOKEN");
638 assert!(
639 github_token.is_none(),
640 "unloaded route must not inject phantom GITHUB_TOKEN"
641 );
642 }
643}