1use crate::audit;
11use crate::config::ProxyConfig;
12use crate::connect;
13use crate::credential::CredentialStore;
14use crate::error::{ProxyError, Result};
15use crate::external;
16use crate::filter::ProxyFilter;
17use crate::reverse;
18use crate::token;
19use std::net::SocketAddr;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::net::TcpListener;
24use tokio::sync::watch;
25use tracing::{debug, info, warn};
26use zeroize::Zeroizing;
27
28const MAX_HEADER_SIZE: usize = 64 * 1024;
31
32pub struct ProxyHandle {
37 pub port: u16,
39 pub token: Zeroizing<String>,
41 audit_log: audit::SharedAuditLog,
43 shutdown_tx: watch::Sender<bool>,
45 loaded_routes: std::collections::HashSet<String>,
49 no_proxy_hosts: Vec<String>,
52}
53
54impl ProxyHandle {
55 pub fn shutdown(&self) {
57 let _ = self.shutdown_tx.send(true);
58 }
59
60 #[must_use]
62 pub fn drain_audit_events(&self) -> Vec<nono::undo::NetworkAuditEvent> {
63 audit::drain_audit_events(&self.audit_log)
64 }
65
66 #[must_use]
74 pub fn env_vars(&self) -> Vec<(String, String)> {
75 let proxy_url = format!("http://nono:{}@127.0.0.1:{}", &*self.token, self.port);
76
77 let mut no_proxy_parts = vec!["localhost".to_string(), "127.0.0.1".to_string()];
81 for host in &self.no_proxy_hosts {
82 let hostname = if host.contains("]:") {
85 host.rsplit_once("]:")
87 .map(|(h, _)| format!("{}]", h))
88 .unwrap_or_else(|| host.clone())
89 } else {
90 host.rsplit_once(':')
91 .and_then(|(h, p)| p.parse::<u16>().ok().map(|_| h.to_string()))
92 .unwrap_or_else(|| host.clone())
93 };
94 if !no_proxy_parts.contains(&hostname.to_string()) {
95 no_proxy_parts.push(hostname.to_string());
96 }
97 }
98 let no_proxy = no_proxy_parts.join(",");
99
100 let mut vars = vec![
101 ("HTTP_PROXY".to_string(), proxy_url.clone()),
102 ("HTTPS_PROXY".to_string(), proxy_url.clone()),
103 ("NO_PROXY".to_string(), no_proxy.clone()),
104 ("NONO_PROXY_TOKEN".to_string(), self.token.to_string()),
105 ];
106
107 vars.push(("http_proxy".to_string(), proxy_url.clone()));
109 vars.push(("https_proxy".to_string(), proxy_url));
110 vars.push(("no_proxy".to_string(), no_proxy));
111
112 vars
113 }
114
115 #[must_use]
124 pub fn credential_env_vars(&self, config: &ProxyConfig) -> Vec<(String, String)> {
125 let mut vars = Vec::new();
126 for route in &config.routes {
127 let base_url_name = format!("{}_BASE_URL", route.prefix.to_uppercase());
129 let url = format!("http://127.0.0.1:{}/{}", self.port, route.prefix);
130 vars.push((base_url_name, url));
131
132 if !self.loaded_routes.contains(&route.prefix) {
137 continue;
138 }
139
140 if let Some(ref env_var) = route.env_var {
144 vars.push((env_var.clone(), self.token.to_string()));
145 } else if let Some(ref cred_key) = route.credential_key {
146 let api_key_name = cred_key.to_uppercase();
147 vars.push((api_key_name, self.token.to_string()));
148 }
149 }
150 vars
151 }
152}
153
154struct ProxyState {
156 filter: ProxyFilter,
157 session_token: Zeroizing<String>,
158 credential_store: CredentialStore,
159 config: ProxyConfig,
160 tls_connector: tokio_rustls::TlsConnector,
163 active_connections: AtomicUsize,
165 audit_log: audit::SharedAuditLog,
167 bypass_matcher: external::BypassMatcher,
170}
171
172pub async fn start(config: ProxyConfig) -> Result<ProxyHandle> {
180 let session_token = token::generate_session_token()?;
182
183 let bind_addr = SocketAddr::new(config.bind_addr, config.bind_port);
185 let listener = TcpListener::bind(bind_addr)
186 .await
187 .map_err(|e| ProxyError::Bind {
188 addr: bind_addr.to_string(),
189 source: e,
190 })?;
191
192 let local_addr = listener.local_addr().map_err(|e| ProxyError::Bind {
193 addr: bind_addr.to_string(),
194 source: e,
195 })?;
196 let port = local_addr.port();
197
198 info!("Proxy server listening on {}", local_addr);
199
200 let credential_store = if config.routes.is_empty() {
202 CredentialStore::empty()
203 } else {
204 CredentialStore::load(&config.routes)?
205 };
206 let loaded_routes = credential_store.loaded_prefixes();
207
208 let filter = if config.allowed_hosts.is_empty() {
210 ProxyFilter::allow_all()
211 } else {
212 ProxyFilter::new(&config.allowed_hosts)
213 };
214
215 let mut root_store = rustls::RootCertStore::empty();
219 root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
220 let tls_config = rustls::ClientConfig::builder_with_provider(Arc::new(
221 rustls::crypto::ring::default_provider(),
222 ))
223 .with_safe_default_protocol_versions()
224 .map_err(|e| ProxyError::Config(format!("TLS config error: {}", e)))?
225 .with_root_certificates(root_store)
226 .with_no_client_auth();
227 let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
228
229 let bypass_matcher = config
231 .external_proxy
232 .as_ref()
233 .map(|ext| external::BypassMatcher::new(&ext.bypass_hosts))
234 .unwrap_or_else(|| external::BypassMatcher::new(&[]));
235
236 let (shutdown_tx, shutdown_rx) = watch::channel(false);
238 let audit_log = audit::new_audit_log();
239
240 let no_proxy_hosts: Vec<String> = if cfg!(target_os = "macos") {
251 Vec::new()
252 } else {
253 let credential_hosts = credential_store.credential_upstream_hosts();
254 config
255 .allowed_hosts
256 .iter()
257 .filter(|host| {
258 let normalised = {
259 let h = host.to_lowercase();
260 if h.starts_with('[') {
261 if h.contains("]:") {
263 h
264 } else {
265 format!("{}:443", h)
266 }
267 } else if h.contains(':') {
268 h
269 } else {
270 format!("{}:443", h)
271 }
272 };
273 !credential_hosts.contains(&normalised)
274 })
275 .cloned()
276 .collect()
277 };
278
279 if !no_proxy_hosts.is_empty() {
280 debug!("Smart NO_PROXY bypass hosts: {:?}", no_proxy_hosts);
281 }
282
283 let state = Arc::new(ProxyState {
284 filter,
285 session_token: session_token.clone(),
286 credential_store,
287 config,
288 tls_connector,
289 active_connections: AtomicUsize::new(0),
290 audit_log: Arc::clone(&audit_log),
291 bypass_matcher,
292 });
293
294 tokio::spawn(accept_loop(listener, state, shutdown_rx));
298
299 Ok(ProxyHandle {
300 port,
301 token: session_token,
302 audit_log,
303 shutdown_tx,
304 loaded_routes,
305 no_proxy_hosts,
306 })
307}
308
309async fn accept_loop(
311 listener: TcpListener,
312 state: Arc<ProxyState>,
313 mut shutdown_rx: watch::Receiver<bool>,
314) {
315 loop {
316 tokio::select! {
317 result = listener.accept() => {
318 match result {
319 Ok((stream, addr)) => {
320 let max = state.config.max_connections;
322 if max > 0 {
323 let current = state.active_connections.load(Ordering::Relaxed);
324 if current >= max {
325 warn!("Connection limit reached ({}/{}), rejecting {}", current, max, addr);
326 drop(stream);
328 continue;
329 }
330 }
331 state.active_connections.fetch_add(1, Ordering::Relaxed);
332
333 debug!("Accepted connection from {}", addr);
334 let state = Arc::clone(&state);
335 tokio::spawn(async move {
336 if let Err(e) = handle_connection(stream, &state).await {
337 debug!("Connection handler error: {}", e);
338 }
339 state.active_connections.fetch_sub(1, Ordering::Relaxed);
340 });
341 }
342 Err(e) => {
343 warn!("Accept error: {}", e);
344 }
345 }
346 }
347 _ = shutdown_rx.changed() => {
348 if *shutdown_rx.borrow() {
349 info!("Proxy server shutting down");
350 return;
351 }
352 }
353 }
354 }
355}
356
357async fn handle_connection(mut stream: tokio::net::TcpStream, state: &ProxyState) -> Result<()> {
363 let mut buf_reader = BufReader::new(&mut stream);
367 let mut first_line = String::new();
368 buf_reader.read_line(&mut first_line).await?;
369
370 if first_line.is_empty() {
371 return Ok(()); }
373
374 let mut header_bytes = Vec::new();
376 loop {
377 let mut line = String::new();
378 let n = buf_reader.read_line(&mut line).await?;
379 if n == 0 || line.trim().is_empty() {
380 break;
381 }
382 header_bytes.extend_from_slice(line.as_bytes());
383 if header_bytes.len() > MAX_HEADER_SIZE {
384 drop(buf_reader);
385 let response = "HTTP/1.1 431 Request Header Fields Too Large\r\n\r\n";
386 stream.write_all(response.as_bytes()).await?;
387 return Ok(());
388 }
389 }
390
391 let buffered = buf_reader.buffer().to_vec();
396 drop(buf_reader);
397
398 let first_line = first_line.trim_end();
399
400 if first_line.starts_with("CONNECT ") {
402 if !state.credential_store.is_empty() {
407 if let Some(authority) = first_line.split_whitespace().nth(1) {
408 let host_port = if authority.starts_with('[') {
411 if authority.contains("]:") {
413 authority.to_lowercase()
414 } else {
415 format!("{}:443", authority.to_lowercase())
416 }
417 } else if authority.contains(':') {
418 authority.to_lowercase()
419 } else {
420 format!("{}:443", authority.to_lowercase())
421 };
422 if state.credential_store.is_credential_upstream(&host_port) {
423 let (host, port) = host_port
424 .rsplit_once(':')
425 .map(|(h, p)| (h, p.parse::<u16>().unwrap_or(443)))
426 .unwrap_or((&host_port, 443));
427 warn!(
428 "Blocked CONNECT to credential upstream {} — use reverse proxy path instead",
429 authority
430 );
431 audit::log_denied(
432 Some(&state.audit_log),
433 audit::ProxyMode::Connect,
434 host,
435 port,
436 "credential upstream: CONNECT bypasses L7 filtering",
437 );
438 let response = "HTTP/1.1 403 Forbidden\r\nContent-Length: 0\r\n\r\n";
439 stream.write_all(response.as_bytes()).await?;
440 return Ok(());
441 }
442 }
443 }
444
445 let use_external = if let Some(ref ext_config) = state.config.external_proxy {
447 if state.bypass_matcher.is_empty() {
448 Some(ext_config)
449 } else {
450 let host = first_line
452 .split_whitespace()
453 .nth(1)
454 .and_then(|authority| {
455 authority
456 .rsplit_once(':')
457 .map(|(h, _)| h)
458 .or(Some(authority))
459 })
460 .unwrap_or("");
461 if state.bypass_matcher.matches(host) {
462 debug!("Bypassing external proxy for {}", host);
463 None
464 } else {
465 Some(ext_config)
466 }
467 }
468 } else {
469 None
470 };
471
472 if let Some(ext_config) = use_external {
473 external::handle_external_proxy(
474 first_line,
475 &mut stream,
476 &header_bytes,
477 &state.filter,
478 &state.session_token,
479 ext_config,
480 Some(&state.audit_log),
481 )
482 .await
483 } else if state.config.external_proxy.is_some() {
484 token::validate_proxy_auth(&header_bytes, &state.session_token)?;
489 connect::handle_connect(
490 first_line,
491 &mut stream,
492 &state.filter,
493 &state.session_token,
494 &header_bytes,
495 Some(&state.audit_log),
496 )
497 .await
498 } else {
499 connect::handle_connect(
500 first_line,
501 &mut stream,
502 &state.filter,
503 &state.session_token,
504 &header_bytes,
505 Some(&state.audit_log),
506 )
507 .await
508 }
509 } else if !state.credential_store.is_empty() {
510 let ctx = reverse::ReverseProxyCtx {
512 credential_store: &state.credential_store,
513 session_token: &state.session_token,
514 filter: &state.filter,
515 tls_connector: &state.tls_connector,
516 audit_log: Some(&state.audit_log),
517 };
518 reverse::handle_reverse_proxy(first_line, &mut stream, &header_bytes, &ctx, &buffered).await
519 } else {
520 let response = "HTTP/1.1 400 Bad Request\r\n\r\n";
522 stream.write_all(response.as_bytes()).await?;
523 Ok(())
524 }
525}
526
527#[cfg(test)]
528#[allow(clippy::unwrap_used)]
529mod tests {
530 use super::*;
531
532 #[tokio::test]
533 async fn test_proxy_starts_and_binds() {
534 let config = ProxyConfig::default();
535 let handle = start(config).await.unwrap();
536
537 assert!(handle.port > 0);
539 assert_eq!(handle.token.len(), 64);
541
542 handle.shutdown();
544 }
545
546 #[tokio::test]
547 async fn test_proxy_env_vars() {
548 let config = ProxyConfig::default();
549 let handle = start(config).await.unwrap();
550
551 let vars = handle.env_vars();
552 let http_proxy = vars.iter().find(|(k, _)| k == "HTTP_PROXY");
553 assert!(http_proxy.is_some());
554 assert!(http_proxy.unwrap().1.starts_with("http://nono:"));
555
556 let token_var = vars.iter().find(|(k, _)| k == "NONO_PROXY_TOKEN");
557 assert!(token_var.is_some());
558 assert_eq!(token_var.unwrap().1.len(), 64);
559
560 let node_proxy_flag = vars.iter().find(|(k, _)| k == "NODE_USE_ENV_PROXY");
561 assert!(
562 node_proxy_flag.is_none(),
563 "proxy env should avoid Node-specific flags that can perturb non-Node runtimes"
564 );
565
566 handle.shutdown();
567 }
568
569 #[tokio::test]
570 async fn test_proxy_credential_env_vars() {
571 let config = ProxyConfig {
572 routes: vec![crate::config::RouteConfig {
573 prefix: "openai".to_string(),
574 upstream: "https://api.openai.com".to_string(),
575 credential_key: None,
576 inject_mode: crate::config::InjectMode::Header,
577 inject_header: "Authorization".to_string(),
578 credential_format: "Bearer {}".to_string(),
579 path_pattern: None,
580 path_replacement: None,
581 query_param_name: None,
582 env_var: None,
583 endpoint_rules: vec![],
584 tls_ca: None,
585 }],
586 ..Default::default()
587 };
588 let handle = start(config.clone()).await.unwrap();
589
590 let vars = handle.credential_env_vars(&config);
591 assert_eq!(vars.len(), 1);
592 assert_eq!(vars[0].0, "OPENAI_BASE_URL");
593 assert!(vars[0].1.contains("/openai"));
594
595 handle.shutdown();
596 }
597
598 #[test]
599 fn test_proxy_credential_env_vars_fallback_to_uppercase_key() {
600 let (shutdown_tx, _) = tokio::sync::watch::channel(false);
604 let handle = ProxyHandle {
605 port: 12345,
606 token: Zeroizing::new("test_token".to_string()),
607 audit_log: audit::new_audit_log(),
608 shutdown_tx,
609 loaded_routes: ["openai".to_string()].into_iter().collect(),
610 no_proxy_hosts: Vec::new(),
611 };
612 let config = ProxyConfig {
613 routes: vec![crate::config::RouteConfig {
614 prefix: "openai".to_string(),
615 upstream: "https://api.openai.com".to_string(),
616 credential_key: Some("openai_api_key".to_string()),
617 inject_mode: crate::config::InjectMode::Header,
618 inject_header: "Authorization".to_string(),
619 credential_format: "Bearer {}".to_string(),
620 path_pattern: None,
621 path_replacement: None,
622 query_param_name: None,
623 env_var: None, endpoint_rules: vec![],
625 tls_ca: None,
626 }],
627 ..Default::default()
628 };
629
630 let vars = handle.credential_env_vars(&config);
631 assert_eq!(vars.len(), 2); let api_key_var = vars.iter().find(|(k, _)| k == "OPENAI_API_KEY");
635 assert!(
636 api_key_var.is_some(),
637 "Should derive env var name from credential_key.to_uppercase()"
638 );
639
640 let (_, val) = api_key_var.expect("OPENAI_API_KEY should exist");
641 assert_eq!(val, "test_token");
642 }
643
644 #[test]
645 fn test_proxy_credential_env_vars_with_explicit_env_var() {
646 let (shutdown_tx, _) = tokio::sync::watch::channel(false);
654 let handle = ProxyHandle {
655 port: 12345,
656 token: Zeroizing::new("test_token".to_string()),
657 audit_log: audit::new_audit_log(),
658 shutdown_tx,
659 loaded_routes: ["openai".to_string()].into_iter().collect(),
660 no_proxy_hosts: Vec::new(),
661 };
662 let config = ProxyConfig {
663 routes: vec![crate::config::RouteConfig {
664 prefix: "openai".to_string(),
665 upstream: "https://api.openai.com".to_string(),
666 credential_key: Some("op://Development/OpenAI/credential".to_string()),
667 inject_mode: crate::config::InjectMode::Header,
668 inject_header: "Authorization".to_string(),
669 credential_format: "Bearer {}".to_string(),
670 path_pattern: None,
671 path_replacement: None,
672 query_param_name: None,
673 env_var: Some("OPENAI_API_KEY".to_string()),
674 endpoint_rules: vec![],
675 tls_ca: None,
676 }],
677 ..Default::default()
678 };
679
680 let vars = handle.credential_env_vars(&config);
681 assert_eq!(vars.len(), 2); let api_key_var = vars.iter().find(|(k, _)| k == "OPENAI_API_KEY");
684 assert!(
685 api_key_var.is_some(),
686 "Should use explicit env_var name, not derive from credential_key"
687 );
688
689 let (_, val) = api_key_var.expect("OPENAI_API_KEY var should exist");
691 assert_eq!(val, "test_token");
692
693 let bad_var = vars.iter().find(|(k, _)| k.starts_with("OP://"));
695 assert!(
696 bad_var.is_none(),
697 "Should not generate env var from op:// URI uppercase"
698 );
699 }
700
701 #[test]
702 fn test_proxy_credential_env_vars_skips_unloaded_routes() {
703 let (shutdown_tx, _) = tokio::sync::watch::channel(false);
708 let handle = ProxyHandle {
709 port: 12345,
710 token: Zeroizing::new("test_token".to_string()),
711 audit_log: audit::new_audit_log(),
712 shutdown_tx,
713 loaded_routes: ["openai".to_string()].into_iter().collect(),
715 no_proxy_hosts: Vec::new(),
716 };
717 let config = ProxyConfig {
718 routes: vec![
719 crate::config::RouteConfig {
720 prefix: "openai".to_string(),
721 upstream: "https://api.openai.com".to_string(),
722 credential_key: Some("openai_api_key".to_string()),
723 inject_mode: crate::config::InjectMode::Header,
724 inject_header: "Authorization".to_string(),
725 credential_format: "Bearer {}".to_string(),
726 path_pattern: None,
727 path_replacement: None,
728 query_param_name: None,
729 env_var: None,
730 endpoint_rules: vec![],
731 tls_ca: None,
732 },
733 crate::config::RouteConfig {
734 prefix: "github".to_string(),
735 upstream: "https://api.github.com".to_string(),
736 credential_key: Some("env://GITHUB_TOKEN".to_string()),
737 inject_mode: crate::config::InjectMode::Header,
738 inject_header: "Authorization".to_string(),
739 credential_format: "token {}".to_string(),
740 path_pattern: None,
741 path_replacement: None,
742 query_param_name: None,
743 env_var: Some("GITHUB_TOKEN".to_string()),
744 endpoint_rules: vec![],
745 tls_ca: None,
746 },
747 ],
748 ..Default::default()
749 };
750
751 let vars = handle.credential_env_vars(&config);
752
753 let openai_base = vars.iter().find(|(k, _)| k == "OPENAI_BASE_URL");
755 assert!(openai_base.is_some(), "loaded route should have BASE_URL");
756 let openai_key = vars.iter().find(|(k, _)| k == "OPENAI_API_KEY");
757 assert!(openai_key.is_some(), "loaded route should have API key");
758
759 let github_base = vars.iter().find(|(k, _)| k == "GITHUB_BASE_URL");
762 assert!(
763 github_base.is_some(),
764 "declared route should still have BASE_URL"
765 );
766 let github_token = vars.iter().find(|(k, _)| k == "GITHUB_TOKEN");
767 assert!(
768 github_token.is_none(),
769 "unloaded route must not inject phantom GITHUB_TOKEN"
770 );
771 }
772
773 #[test]
774 fn test_no_proxy_excludes_credential_upstreams() {
775 let (shutdown_tx, _) = tokio::sync::watch::channel(false);
776 let handle = ProxyHandle {
777 port: 12345,
778 token: Zeroizing::new("test_token".to_string()),
779 audit_log: audit::new_audit_log(),
780 shutdown_tx,
781 loaded_routes: std::collections::HashSet::new(),
782 no_proxy_hosts: vec![
783 "nats.internal:4222".to_string(),
784 "opencode.internal:4096".to_string(),
785 ],
786 };
787
788 let vars = handle.env_vars();
789 let no_proxy = vars.iter().find(|(k, _)| k == "NO_PROXY").unwrap();
790 assert!(
791 no_proxy.1.contains("nats.internal"),
792 "non-credential host should be in NO_PROXY"
793 );
794 assert!(
795 no_proxy.1.contains("opencode.internal"),
796 "non-credential host should be in NO_PROXY"
797 );
798 assert!(
799 no_proxy.1.contains("localhost"),
800 "localhost should always be in NO_PROXY"
801 );
802 }
803
804 #[test]
805 fn test_no_proxy_empty_when_no_non_credential_hosts() {
806 let (shutdown_tx, _) = tokio::sync::watch::channel(false);
807 let handle = ProxyHandle {
808 port: 12345,
809 token: Zeroizing::new("test_token".to_string()),
810 audit_log: audit::new_audit_log(),
811 shutdown_tx,
812 loaded_routes: std::collections::HashSet::new(),
813 no_proxy_hosts: Vec::new(),
814 };
815
816 let vars = handle.env_vars();
817 let no_proxy = vars.iter().find(|(k, _)| k == "NO_PROXY").unwrap();
818 assert_eq!(
819 no_proxy.1, "localhost,127.0.0.1",
820 "NO_PROXY should only contain loopback when no bypass hosts"
821 );
822 }
823}