1use crate::audit;
11use crate::config::ProxyConfig;
12use crate::connect;
13use crate::credential::CredentialStore;
14use crate::error::{ProxyError, Result};
15use crate::external;
16use crate::filter::ProxyFilter;
17use crate::reverse;
18use crate::token;
19use std::net::SocketAddr;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::net::TcpListener;
24use tokio::sync::watch;
25use tracing::{debug, info, warn};
26use zeroize::Zeroizing;
27
/// Upper bound, in bytes, on the accumulated request header lines of one
/// connection (64 KiB). Requests exceeding it are answered with HTTP 431.
const MAX_HEADER_SIZE: usize = 64 << 10;
31
/// Handle to a proxy started by [`start`].
///
/// It carries what a caller needs to point a child process at the proxy
/// (port and session token), to collect audit events, and to request
/// shutdown of the accept loop.
pub struct ProxyHandle {
    /// Port the listener is actually bound to (taken from the listener's
    /// local address, so it is meaningful even if the configured port was 0).
    pub port: u16,
    /// Session token generated at startup; embedded in the proxy URLs and
    /// credential env vars produced by this handle. Held in `Zeroizing`.
    pub token: Zeroizing<String>,
    /// Audit log shared with the connection handlers.
    audit_log: audit::SharedAuditLog,
    /// Sender half of the shutdown signal observed by the accept loop.
    shutdown_tx: watch::Sender<bool>,
    /// Route prefixes reported by the credential store as loaded at startup.
    loaded_routes: std::collections::HashSet<String>,
    /// Allowed hosts that are not credential upstreams; these are added to
    /// `NO_PROXY` by `env_vars`.
    no_proxy_hosts: Vec<String>,
}
53
54impl ProxyHandle {
55 pub fn shutdown(&self) {
57 let _ = self.shutdown_tx.send(true);
58 }
59
60 #[must_use]
62 pub fn drain_audit_events(&self) -> Vec<nono::undo::NetworkAuditEvent> {
63 audit::drain_audit_events(&self.audit_log)
64 }
65
66 #[must_use]
74 pub fn env_vars(&self) -> Vec<(String, String)> {
75 let proxy_url = format!("http://nono:{}@127.0.0.1:{}", &*self.token, self.port);
76
77 let mut no_proxy_parts = vec!["localhost".to_string(), "127.0.0.1".to_string()];
81 for host in &self.no_proxy_hosts {
82 let hostname = if host.contains("]:") {
85 host.rsplit_once("]:")
87 .map(|(h, _)| format!("{}]", h))
88 .unwrap_or_else(|| host.clone())
89 } else {
90 host.rsplit_once(':')
91 .and_then(|(h, p)| p.parse::<u16>().ok().map(|_| h.to_string()))
92 .unwrap_or_else(|| host.clone())
93 };
94 if !no_proxy_parts.contains(&hostname.to_string()) {
95 no_proxy_parts.push(hostname.to_string());
96 }
97 }
98 let no_proxy = no_proxy_parts.join(",");
99
100 let mut vars = vec![
101 ("HTTP_PROXY".to_string(), proxy_url.clone()),
102 ("HTTPS_PROXY".to_string(), proxy_url.clone()),
103 ("NO_PROXY".to_string(), no_proxy.clone()),
104 ("NONO_PROXY_TOKEN".to_string(), self.token.to_string()),
105 ];
106
107 vars.push(("http_proxy".to_string(), proxy_url.clone()));
109 vars.push(("https_proxy".to_string(), proxy_url));
110 vars.push(("no_proxy".to_string(), no_proxy));
111
112 vars
113 }
114
115 #[must_use]
124 pub fn credential_env_vars(&self, config: &ProxyConfig) -> Vec<(String, String)> {
125 let mut vars = Vec::new();
126 for route in &config.routes {
127 let base_url_name = format!("{}_BASE_URL", route.prefix.to_uppercase());
129 let url = format!("http://127.0.0.1:{}/{}", self.port, route.prefix);
130 vars.push((base_url_name, url));
131
132 if !self.loaded_routes.contains(&route.prefix) {
137 continue;
138 }
139
140 if let Some(ref env_var) = route.env_var {
144 vars.push((env_var.clone(), self.token.to_string()));
145 } else if let Some(ref cred_key) = route.credential_key {
146 let api_key_name = cred_key.to_uppercase();
147 vars.push((api_key_name, self.token.to_string()));
148 }
149 }
150 vars
151 }
152}
153
/// State shared (behind an `Arc`) between the accept loop and every
/// connection handler task.
struct ProxyState {
    /// Host filter built from `config.allowed_hosts` (allow-all when that
    /// list is empty).
    filter: ProxyFilter,
    /// Session token handed to the CONNECT, external and reverse handlers.
    session_token: Zeroizing<String>,
    /// Credentials loaded for the configured routes (empty when there are
    /// no routes).
    credential_store: CredentialStore,
    /// The configuration the proxy was started with.
    config: ProxyConfig,
    /// TLS connector passed to the reverse-proxy handler.
    tls_connector: tokio_rustls::TlsConnector,
    /// Number of connections currently being handled; compared against
    /// `config.max_connections` in the accept loop.
    active_connections: AtomicUsize,
    /// Audit log shared with the `ProxyHandle`.
    audit_log: audit::SharedAuditLog,
    /// Matcher for hosts that skip the external proxy; built from the
    /// external proxy's `bypass_hosts`, or from an empty list when no
    /// external proxy is configured.
    bypass_matcher: external::BypassMatcher,
}
171
172pub async fn start(config: ProxyConfig) -> Result<ProxyHandle> {
180 let session_token = token::generate_session_token()?;
182
183 let bind_addr = SocketAddr::new(config.bind_addr, config.bind_port);
185 let listener = TcpListener::bind(bind_addr)
186 .await
187 .map_err(|e| ProxyError::Bind {
188 addr: bind_addr.to_string(),
189 source: e,
190 })?;
191
192 let local_addr = listener.local_addr().map_err(|e| ProxyError::Bind {
193 addr: bind_addr.to_string(),
194 source: e,
195 })?;
196 let port = local_addr.port();
197
198 info!("Proxy server listening on {}", local_addr);
199
200 let credential_store = if config.routes.is_empty() {
202 CredentialStore::empty()
203 } else {
204 CredentialStore::load(&config.routes)?
205 };
206 let loaded_routes = credential_store.loaded_prefixes();
207
208 let filter = if config.allowed_hosts.is_empty() {
210 ProxyFilter::allow_all()
211 } else {
212 ProxyFilter::new(&config.allowed_hosts)
213 };
214
215 let mut root_store = rustls::RootCertStore::empty();
219 root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
220 let tls_config = rustls::ClientConfig::builder_with_provider(Arc::new(
221 rustls::crypto::ring::default_provider(),
222 ))
223 .with_safe_default_protocol_versions()
224 .map_err(|e| ProxyError::Config(format!("TLS config error: {}", e)))?
225 .with_root_certificates(root_store)
226 .with_no_client_auth();
227 let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
228
229 let bypass_matcher = config
231 .external_proxy
232 .as_ref()
233 .map(|ext| external::BypassMatcher::new(&ext.bypass_hosts))
234 .unwrap_or_else(|| external::BypassMatcher::new(&[]));
235
236 let (shutdown_tx, shutdown_rx) = watch::channel(false);
238 let audit_log = audit::new_audit_log();
239
240 let credential_hosts = credential_store.credential_upstream_hosts();
245 let no_proxy_hosts: Vec<String> = config
246 .allowed_hosts
247 .iter()
248 .filter(|host| {
249 let normalised = {
250 let h = host.to_lowercase();
251 if h.contains(':') {
252 h
253 } else {
254 format!("{}:443", h)
255 }
256 };
257 !credential_hosts.contains(&normalised)
258 })
259 .cloned()
260 .collect();
261
262 if !no_proxy_hosts.is_empty() {
263 debug!("Smart NO_PROXY bypass hosts: {:?}", no_proxy_hosts);
264 }
265
266 let state = Arc::new(ProxyState {
267 filter,
268 session_token: session_token.clone(),
269 credential_store,
270 config,
271 tls_connector,
272 active_connections: AtomicUsize::new(0),
273 audit_log: Arc::clone(&audit_log),
274 bypass_matcher,
275 });
276
277 tokio::spawn(accept_loop(listener, state, shutdown_rx));
281
282 Ok(ProxyHandle {
283 port,
284 token: session_token,
285 audit_log,
286 shutdown_tx,
287 loaded_routes,
288 no_proxy_hosts,
289 })
290}
291
292async fn accept_loop(
294 listener: TcpListener,
295 state: Arc<ProxyState>,
296 mut shutdown_rx: watch::Receiver<bool>,
297) {
298 loop {
299 tokio::select! {
300 result = listener.accept() => {
301 match result {
302 Ok((stream, addr)) => {
303 let max = state.config.max_connections;
305 if max > 0 {
306 let current = state.active_connections.load(Ordering::Relaxed);
307 if current >= max {
308 warn!("Connection limit reached ({}/{}), rejecting {}", current, max, addr);
309 drop(stream);
311 continue;
312 }
313 }
314 state.active_connections.fetch_add(1, Ordering::Relaxed);
315
316 debug!("Accepted connection from {}", addr);
317 let state = Arc::clone(&state);
318 tokio::spawn(async move {
319 if let Err(e) = handle_connection(stream, &state).await {
320 debug!("Connection handler error: {}", e);
321 }
322 state.active_connections.fetch_sub(1, Ordering::Relaxed);
323 });
324 }
325 Err(e) => {
326 warn!("Accept error: {}", e);
327 }
328 }
329 }
330 _ = shutdown_rx.changed() => {
331 if *shutdown_rx.borrow() {
332 info!("Proxy server shutting down");
333 return;
334 }
335 }
336 }
337 }
338}
339
340async fn handle_connection(mut stream: tokio::net::TcpStream, state: &ProxyState) -> Result<()> {
346 let mut buf_reader = BufReader::new(&mut stream);
350 let mut first_line = String::new();
351 buf_reader.read_line(&mut first_line).await?;
352
353 if first_line.is_empty() {
354 return Ok(()); }
356
357 let mut header_bytes = Vec::new();
359 loop {
360 let mut line = String::new();
361 let n = buf_reader.read_line(&mut line).await?;
362 if n == 0 || line.trim().is_empty() {
363 break;
364 }
365 header_bytes.extend_from_slice(line.as_bytes());
366 if header_bytes.len() > MAX_HEADER_SIZE {
367 drop(buf_reader);
368 let response = "HTTP/1.1 431 Request Header Fields Too Large\r\n\r\n";
369 stream.write_all(response.as_bytes()).await?;
370 return Ok(());
371 }
372 }
373
374 let buffered = buf_reader.buffer().to_vec();
379 drop(buf_reader);
380
381 let first_line = first_line.trim_end();
382
383 if first_line.starts_with("CONNECT ") {
385 if !state.credential_store.is_empty() {
390 if let Some(authority) = first_line.split_whitespace().nth(1) {
391 let host_port = if authority.starts_with('[') {
394 if authority.contains("]:") {
396 authority.to_lowercase()
397 } else {
398 format!("{}:443", authority.to_lowercase())
399 }
400 } else if authority.contains(':') {
401 authority.to_lowercase()
402 } else {
403 format!("{}:443", authority.to_lowercase())
404 };
405 if state.credential_store.is_credential_upstream(&host_port) {
406 let (host, port) = host_port
407 .rsplit_once(':')
408 .map(|(h, p)| (h, p.parse::<u16>().unwrap_or(443)))
409 .unwrap_or((&host_port, 443));
410 warn!(
411 "Blocked CONNECT to credential upstream {} — use reverse proxy path instead",
412 authority
413 );
414 audit::log_denied(
415 Some(&state.audit_log),
416 audit::ProxyMode::Connect,
417 host,
418 port,
419 "credential upstream: CONNECT bypasses L7 filtering",
420 );
421 let response = "HTTP/1.1 403 Forbidden\r\nContent-Length: 0\r\n\r\n";
422 stream.write_all(response.as_bytes()).await?;
423 return Ok(());
424 }
425 }
426 }
427
428 let use_external = if let Some(ref ext_config) = state.config.external_proxy {
430 if state.bypass_matcher.is_empty() {
431 Some(ext_config)
432 } else {
433 let host = first_line
435 .split_whitespace()
436 .nth(1)
437 .and_then(|authority| {
438 authority
439 .rsplit_once(':')
440 .map(|(h, _)| h)
441 .or(Some(authority))
442 })
443 .unwrap_or("");
444 if state.bypass_matcher.matches(host) {
445 debug!("Bypassing external proxy for {}", host);
446 None
447 } else {
448 Some(ext_config)
449 }
450 }
451 } else {
452 None
453 };
454
455 if let Some(ext_config) = use_external {
456 external::handle_external_proxy(
457 first_line,
458 &mut stream,
459 &header_bytes,
460 &state.filter,
461 &state.session_token,
462 ext_config,
463 Some(&state.audit_log),
464 )
465 .await
466 } else if state.config.external_proxy.is_some() {
467 token::validate_proxy_auth(&header_bytes, &state.session_token)?;
472 connect::handle_connect(
473 first_line,
474 &mut stream,
475 &state.filter,
476 &state.session_token,
477 &header_bytes,
478 Some(&state.audit_log),
479 )
480 .await
481 } else {
482 connect::handle_connect(
483 first_line,
484 &mut stream,
485 &state.filter,
486 &state.session_token,
487 &header_bytes,
488 Some(&state.audit_log),
489 )
490 .await
491 }
492 } else if !state.credential_store.is_empty() {
493 let ctx = reverse::ReverseProxyCtx {
495 credential_store: &state.credential_store,
496 session_token: &state.session_token,
497 filter: &state.filter,
498 tls_connector: &state.tls_connector,
499 audit_log: Some(&state.audit_log),
500 };
501 reverse::handle_reverse_proxy(first_line, &mut stream, &header_bytes, &ctx, &buffered).await
502 } else {
503 let response = "HTTP/1.1 400 Bad Request\r\n\r\n";
505 stream.write_all(response.as_bytes()).await?;
506 Ok(())
507 }
508}
509
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Builds a `ProxyHandle` that is not backed by a running server, with
    /// the given loaded route prefixes and NO_PROXY hosts.
    fn detached_handle(loaded: &[&str], bypass_hosts: &[&str]) -> ProxyHandle {
        let (shutdown_tx, _) = tokio::sync::watch::channel(false);
        ProxyHandle {
            port: 12345,
            token: Zeroizing::new("test_token".to_string()),
            audit_log: audit::new_audit_log(),
            shutdown_tx,
            loaded_routes: loaded.iter().map(|p| p.to_string()).collect(),
            no_proxy_hosts: bypass_hosts.iter().map(|h| h.to_string()).collect(),
        }
    }

    /// Builds a header-injecting route; only the fields that vary between
    /// tests are parameters.
    fn header_route(
        prefix: &str,
        upstream: &str,
        credential_key: Option<&str>,
        credential_format: &str,
        env_var: Option<&str>,
    ) -> crate::config::RouteConfig {
        crate::config::RouteConfig {
            prefix: prefix.to_string(),
            upstream: upstream.to_string(),
            credential_key: credential_key.map(str::to_string),
            inject_mode: crate::config::InjectMode::Header,
            inject_header: "Authorization".to_string(),
            credential_format: credential_format.to_string(),
            path_pattern: None,
            path_replacement: None,
            query_param_name: None,
            env_var: env_var.map(str::to_string),
            endpoint_rules: vec![],
            tls_ca: None,
        }
    }

    /// Looks up an environment variable by name in a list of pairs.
    fn find_var<'a>(vars: &'a [(String, String)], name: &str) -> Option<&'a (String, String)> {
        vars.iter().find(|(k, _)| k == name)
    }

    #[tokio::test]
    async fn test_proxy_starts_and_binds() {
        let handle = start(ProxyConfig::default()).await.unwrap();

        assert!(handle.port > 0);
        assert_eq!(handle.token.len(), 64);

        handle.shutdown();
    }

    #[tokio::test]
    async fn test_proxy_env_vars() {
        let handle = start(ProxyConfig::default()).await.unwrap();
        let vars = handle.env_vars();

        let http_proxy = find_var(&vars, "HTTP_PROXY");
        assert!(http_proxy.is_some());
        assert!(http_proxy.unwrap().1.starts_with("http://nono:"));

        let token_var = find_var(&vars, "NONO_PROXY_TOKEN");
        assert!(token_var.is_some());
        assert_eq!(token_var.unwrap().1.len(), 64);

        let node_proxy_flag = find_var(&vars, "NODE_USE_ENV_PROXY");
        assert!(
            node_proxy_flag.is_none(),
            "proxy env should avoid Node-specific flags that can perturb non-Node runtimes"
        );

        handle.shutdown();
    }

    #[tokio::test]
    async fn test_proxy_credential_env_vars() {
        let config = ProxyConfig {
            routes: vec![header_route(
                "openai",
                "https://api.openai.com",
                None,
                "Bearer {}",
                None,
            )],
            ..Default::default()
        };
        let handle = start(config.clone()).await.unwrap();

        // No credential configured: only the base URL is exported.
        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 1);
        assert_eq!(vars[0].0, "OPENAI_BASE_URL");
        assert!(vars[0].1.contains("/openai"));

        handle.shutdown();
    }

    #[test]
    fn test_proxy_credential_env_vars_fallback_to_uppercase_key() {
        let handle = detached_handle(&["openai"], &[]);
        let config = ProxyConfig {
            routes: vec![header_route(
                "openai",
                "https://api.openai.com",
                Some("openai_api_key"),
                "Bearer {}",
                None,
            )],
            ..Default::default()
        };

        // Base URL plus the derived credential variable.
        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 2);

        let api_key_var = find_var(&vars, "OPENAI_API_KEY");
        assert!(
            api_key_var.is_some(),
            "Should derive env var name from credential_key.to_uppercase()"
        );

        let (_, val) = api_key_var.expect("OPENAI_API_KEY should exist");
        assert_eq!(val, "test_token");
    }

    #[test]
    fn test_proxy_credential_env_vars_with_explicit_env_var() {
        let handle = detached_handle(&["openai"], &[]);
        let config = ProxyConfig {
            routes: vec![header_route(
                "openai",
                "https://api.openai.com",
                Some("op://Development/OpenAI/credential"),
                "Bearer {}",
                Some("OPENAI_API_KEY"),
            )],
            ..Default::default()
        };

        // Base URL plus the explicitly named credential variable.
        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 2);

        let api_key_var = find_var(&vars, "OPENAI_API_KEY");
        assert!(
            api_key_var.is_some(),
            "Should use explicit env_var name, not derive from credential_key"
        );

        let (_, val) = api_key_var.expect("OPENAI_API_KEY var should exist");
        assert_eq!(val, "test_token");

        let bad_var = vars.iter().find(|(k, _)| k.starts_with("OP://"));
        assert!(
            bad_var.is_none(),
            "Should not generate env var from op:// URI uppercase"
        );
    }

    #[test]
    fn test_proxy_credential_env_vars_skips_unloaded_routes() {
        // Only "openai" is loaded; "github" is declared but unavailable.
        let handle = detached_handle(&["openai"], &[]);
        let config = ProxyConfig {
            routes: vec![
                header_route(
                    "openai",
                    "https://api.openai.com",
                    Some("openai_api_key"),
                    "Bearer {}",
                    None,
                ),
                header_route(
                    "github",
                    "https://api.github.com",
                    Some("env://GITHUB_TOKEN"),
                    "token {}",
                    Some("GITHUB_TOKEN"),
                ),
            ],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);

        let openai_base = find_var(&vars, "OPENAI_BASE_URL");
        assert!(openai_base.is_some(), "loaded route should have BASE_URL");
        let openai_key = find_var(&vars, "OPENAI_API_KEY");
        assert!(openai_key.is_some(), "loaded route should have API key");

        let github_base = find_var(&vars, "GITHUB_BASE_URL");
        assert!(
            github_base.is_some(),
            "declared route should still have BASE_URL"
        );
        let github_token = find_var(&vars, "GITHUB_TOKEN");
        assert!(
            github_token.is_none(),
            "unloaded route must not inject phantom GITHUB_TOKEN"
        );
    }

    #[test]
    fn test_no_proxy_excludes_credential_upstreams() {
        let handle = detached_handle(&[], &["nats.internal:4222", "opencode.internal:4096"]);

        let vars = handle.env_vars();
        let no_proxy = find_var(&vars, "NO_PROXY").unwrap();
        assert!(
            no_proxy.1.contains("nats.internal"),
            "non-credential host should be in NO_PROXY"
        );
        assert!(
            no_proxy.1.contains("opencode.internal"),
            "non-credential host should be in NO_PROXY"
        );
        assert!(
            no_proxy.1.contains("localhost"),
            "localhost should always be in NO_PROXY"
        );
    }

    #[test]
    fn test_no_proxy_empty_when_no_non_credential_hosts() {
        let handle = detached_handle(&[], &[]);

        let vars = handle.env_vars();
        let no_proxy = find_var(&vars, "NO_PROXY").unwrap();
        assert_eq!(
            no_proxy.1, "localhost,127.0.0.1",
            "NO_PROXY should only contain loopback when no bypass hosts"
        );
    }
}