1use crate::audit;
11use crate::config::ProxyConfig;
12use crate::connect;
13use crate::credential::CredentialStore;
14use crate::error::{ProxyError, Result};
15use crate::external;
16use crate::filter::ProxyFilter;
17use crate::reverse;
18use crate::token;
19use std::net::SocketAddr;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::net::TcpListener;
24use tokio::sync::watch;
25use tracing::{debug, info, warn};
26use zeroize::Zeroizing;
27
/// Upper bound, in bytes, on the header block accepted from one client (64 KiB).
///
/// A connection whose accumulated header lines grow past this value is answered
/// with `431 Request Header Fields Too Large` instead of being processed.
const MAX_HEADER_SIZE: usize = 64 * 1024;
31
32pub struct ProxyHandle {
37 pub port: u16,
39 pub token: Zeroizing<String>,
41 audit_log: audit::SharedAuditLog,
43 shutdown_tx: watch::Sender<bool>,
45 loaded_routes: std::collections::HashSet<String>,
49 no_proxy_hosts: Vec<String>,
52}
53
54impl ProxyHandle {
55 pub fn shutdown(&self) {
57 let _ = self.shutdown_tx.send(true);
58 }
59
60 #[must_use]
62 pub fn drain_audit_events(&self) -> Vec<nono::undo::NetworkAuditEvent> {
63 audit::drain_audit_events(&self.audit_log)
64 }
65
66 #[must_use]
74 pub fn env_vars(&self) -> Vec<(String, String)> {
75 let proxy_url = format!("http://nono:{}@127.0.0.1:{}", &*self.token, self.port);
76
77 let mut no_proxy_parts = vec!["localhost".to_string(), "127.0.0.1".to_string()];
81 for host in &self.no_proxy_hosts {
82 let hostname = if host.contains("]:") {
85 host.rsplit_once("]:")
87 .map(|(h, _)| format!("{}]", h))
88 .unwrap_or_else(|| host.clone())
89 } else {
90 host.rsplit_once(':')
91 .and_then(|(h, p)| p.parse::<u16>().ok().map(|_| h.to_string()))
92 .unwrap_or_else(|| host.clone())
93 };
94 if !no_proxy_parts.contains(&hostname.to_string()) {
95 no_proxy_parts.push(hostname.to_string());
96 }
97 }
98 let no_proxy = no_proxy_parts.join(",");
99
100 let mut vars = vec![
101 ("HTTP_PROXY".to_string(), proxy_url.clone()),
102 ("HTTPS_PROXY".to_string(), proxy_url.clone()),
103 ("NO_PROXY".to_string(), no_proxy.clone()),
104 ("NONO_PROXY_TOKEN".to_string(), self.token.to_string()),
105 ];
106
107 vars.push(("http_proxy".to_string(), proxy_url.clone()));
109 vars.push(("https_proxy".to_string(), proxy_url));
110 vars.push(("no_proxy".to_string(), no_proxy));
111
112 vars.push(("NODE_USE_ENV_PROXY".to_string(), "1".to_string()));
114
115 vars
116 }
117
118 #[must_use]
127 pub fn credential_env_vars(&self, config: &ProxyConfig) -> Vec<(String, String)> {
128 let mut vars = Vec::new();
129 for route in &config.routes {
130 let base_url_name = format!("{}_BASE_URL", route.prefix.to_uppercase());
132 let url = format!("http://127.0.0.1:{}/{}", self.port, route.prefix);
133 vars.push((base_url_name, url));
134
135 if !self.loaded_routes.contains(&route.prefix) {
140 continue;
141 }
142
143 if let Some(ref env_var) = route.env_var {
147 vars.push((env_var.clone(), self.token.to_string()));
148 } else if let Some(ref cred_key) = route.credential_key {
149 let api_key_name = cred_key.to_uppercase();
150 vars.push((api_key_name, self.token.to_string()));
151 }
152 }
153 vars
154 }
155}
156
157struct ProxyState {
159 filter: ProxyFilter,
160 session_token: Zeroizing<String>,
161 credential_store: CredentialStore,
162 config: ProxyConfig,
163 tls_connector: tokio_rustls::TlsConnector,
166 active_connections: AtomicUsize,
168 audit_log: audit::SharedAuditLog,
170 bypass_matcher: external::BypassMatcher,
173}
174
175pub async fn start(config: ProxyConfig) -> Result<ProxyHandle> {
183 let session_token = token::generate_session_token()?;
185
186 let bind_addr = SocketAddr::new(config.bind_addr, config.bind_port);
188 let listener = TcpListener::bind(bind_addr)
189 .await
190 .map_err(|e| ProxyError::Bind {
191 addr: bind_addr.to_string(),
192 source: e,
193 })?;
194
195 let local_addr = listener.local_addr().map_err(|e| ProxyError::Bind {
196 addr: bind_addr.to_string(),
197 source: e,
198 })?;
199 let port = local_addr.port();
200
201 info!("Proxy server listening on {}", local_addr);
202
203 let credential_store = if config.routes.is_empty() {
205 CredentialStore::empty()
206 } else {
207 CredentialStore::load(&config.routes)?
208 };
209 let loaded_routes = credential_store.loaded_prefixes();
210
211 let filter = if config.allowed_hosts.is_empty() {
213 ProxyFilter::allow_all()
214 } else {
215 ProxyFilter::new(&config.allowed_hosts)
216 };
217
218 let mut root_store = rustls::RootCertStore::empty();
222 root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
223 let tls_config = rustls::ClientConfig::builder_with_provider(Arc::new(
224 rustls::crypto::ring::default_provider(),
225 ))
226 .with_safe_default_protocol_versions()
227 .map_err(|e| ProxyError::Config(format!("TLS config error: {}", e)))?
228 .with_root_certificates(root_store)
229 .with_no_client_auth();
230 let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
231
232 let bypass_matcher = config
234 .external_proxy
235 .as_ref()
236 .map(|ext| external::BypassMatcher::new(&ext.bypass_hosts))
237 .unwrap_or_else(|| external::BypassMatcher::new(&[]));
238
239 let (shutdown_tx, shutdown_rx) = watch::channel(false);
241 let audit_log = audit::new_audit_log();
242
243 let credential_hosts = credential_store.credential_upstream_hosts();
248 let no_proxy_hosts: Vec<String> = config
249 .allowed_hosts
250 .iter()
251 .filter(|host| {
252 let normalised = {
253 let h = host.to_lowercase();
254 if h.contains(':') {
255 h
256 } else {
257 format!("{}:443", h)
258 }
259 };
260 !credential_hosts.contains(&normalised)
261 })
262 .cloned()
263 .collect();
264
265 if !no_proxy_hosts.is_empty() {
266 debug!("Smart NO_PROXY bypass hosts: {:?}", no_proxy_hosts);
267 }
268
269 let state = Arc::new(ProxyState {
270 filter,
271 session_token: session_token.clone(),
272 credential_store,
273 config,
274 tls_connector,
275 active_connections: AtomicUsize::new(0),
276 audit_log: Arc::clone(&audit_log),
277 bypass_matcher,
278 });
279
280 tokio::spawn(accept_loop(listener, state, shutdown_rx));
284
285 Ok(ProxyHandle {
286 port,
287 token: session_token,
288 audit_log,
289 shutdown_tx,
290 loaded_routes,
291 no_proxy_hosts,
292 })
293}
294
295async fn accept_loop(
297 listener: TcpListener,
298 state: Arc<ProxyState>,
299 mut shutdown_rx: watch::Receiver<bool>,
300) {
301 loop {
302 tokio::select! {
303 result = listener.accept() => {
304 match result {
305 Ok((stream, addr)) => {
306 let max = state.config.max_connections;
308 if max > 0 {
309 let current = state.active_connections.load(Ordering::Relaxed);
310 if current >= max {
311 warn!("Connection limit reached ({}/{}), rejecting {}", current, max, addr);
312 drop(stream);
314 continue;
315 }
316 }
317 state.active_connections.fetch_add(1, Ordering::Relaxed);
318
319 debug!("Accepted connection from {}", addr);
320 let state = Arc::clone(&state);
321 tokio::spawn(async move {
322 if let Err(e) = handle_connection(stream, &state).await {
323 debug!("Connection handler error: {}", e);
324 }
325 state.active_connections.fetch_sub(1, Ordering::Relaxed);
326 });
327 }
328 Err(e) => {
329 warn!("Accept error: {}", e);
330 }
331 }
332 }
333 _ = shutdown_rx.changed() => {
334 if *shutdown_rx.borrow() {
335 info!("Proxy server shutting down");
336 return;
337 }
338 }
339 }
340 }
341}
342
343async fn handle_connection(mut stream: tokio::net::TcpStream, state: &ProxyState) -> Result<()> {
349 let mut buf_reader = BufReader::new(&mut stream);
353 let mut first_line = String::new();
354 buf_reader.read_line(&mut first_line).await?;
355
356 if first_line.is_empty() {
357 return Ok(()); }
359
360 let mut header_bytes = Vec::new();
362 loop {
363 let mut line = String::new();
364 let n = buf_reader.read_line(&mut line).await?;
365 if n == 0 || line.trim().is_empty() {
366 break;
367 }
368 header_bytes.extend_from_slice(line.as_bytes());
369 if header_bytes.len() > MAX_HEADER_SIZE {
370 drop(buf_reader);
371 let response = "HTTP/1.1 431 Request Header Fields Too Large\r\n\r\n";
372 stream.write_all(response.as_bytes()).await?;
373 return Ok(());
374 }
375 }
376
377 let buffered = buf_reader.buffer().to_vec();
382 drop(buf_reader);
383
384 let first_line = first_line.trim_end();
385
386 if first_line.starts_with("CONNECT ") {
388 if !state.credential_store.is_empty() {
393 if let Some(authority) = first_line.split_whitespace().nth(1) {
394 let host_port = if authority.starts_with('[') {
397 if authority.contains("]:") {
399 authority.to_lowercase()
400 } else {
401 format!("{}:443", authority.to_lowercase())
402 }
403 } else if authority.contains(':') {
404 authority.to_lowercase()
405 } else {
406 format!("{}:443", authority.to_lowercase())
407 };
408 if state.credential_store.is_credential_upstream(&host_port) {
409 let (host, port) = host_port
410 .rsplit_once(':')
411 .map(|(h, p)| (h, p.parse::<u16>().unwrap_or(443)))
412 .unwrap_or((&host_port, 443));
413 warn!(
414 "Blocked CONNECT to credential upstream {} — use reverse proxy path instead",
415 authority
416 );
417 audit::log_denied(
418 Some(&state.audit_log),
419 audit::ProxyMode::Connect,
420 host,
421 port,
422 "credential upstream: CONNECT bypasses L7 filtering",
423 );
424 let response = "HTTP/1.1 403 Forbidden\r\nContent-Length: 0\r\n\r\n";
425 stream.write_all(response.as_bytes()).await?;
426 return Ok(());
427 }
428 }
429 }
430
431 let use_external = if let Some(ref ext_config) = state.config.external_proxy {
433 if state.bypass_matcher.is_empty() {
434 Some(ext_config)
435 } else {
436 let host = first_line
438 .split_whitespace()
439 .nth(1)
440 .and_then(|authority| {
441 authority
442 .rsplit_once(':')
443 .map(|(h, _)| h)
444 .or(Some(authority))
445 })
446 .unwrap_or("");
447 if state.bypass_matcher.matches(host) {
448 debug!("Bypassing external proxy for {}", host);
449 None
450 } else {
451 Some(ext_config)
452 }
453 }
454 } else {
455 None
456 };
457
458 if let Some(ext_config) = use_external {
459 external::handle_external_proxy(
460 first_line,
461 &mut stream,
462 &header_bytes,
463 &state.filter,
464 &state.session_token,
465 ext_config,
466 Some(&state.audit_log),
467 )
468 .await
469 } else if state.config.external_proxy.is_some() {
470 token::validate_proxy_auth(&header_bytes, &state.session_token)?;
475 connect::handle_connect(
476 first_line,
477 &mut stream,
478 &state.filter,
479 &state.session_token,
480 &header_bytes,
481 Some(&state.audit_log),
482 )
483 .await
484 } else {
485 connect::handle_connect(
486 first_line,
487 &mut stream,
488 &state.filter,
489 &state.session_token,
490 &header_bytes,
491 Some(&state.audit_log),
492 )
493 .await
494 }
495 } else if !state.credential_store.is_empty() {
496 let ctx = reverse::ReverseProxyCtx {
498 credential_store: &state.credential_store,
499 session_token: &state.session_token,
500 filter: &state.filter,
501 tls_connector: &state.tls_connector,
502 audit_log: Some(&state.audit_log),
503 };
504 reverse::handle_reverse_proxy(first_line, &mut stream, &header_bytes, &ctx, &buffered).await
505 } else {
506 let response = "HTTP/1.1 400 Bad Request\r\n\r\n";
508 stream.write_all(response.as_bytes()).await?;
509 Ok(())
510 }
511}
512
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Builds a `ProxyHandle` that is not backed by a running server.
    fn offline_handle(loaded_routes: &[&str], no_proxy_hosts: &[&str]) -> ProxyHandle {
        let (shutdown_tx, _) = tokio::sync::watch::channel(false);
        ProxyHandle {
            port: 12345,
            token: Zeroizing::new("test_token".to_string()),
            audit_log: audit::new_audit_log(),
            shutdown_tx,
            loaded_routes: loaded_routes.iter().map(|p| (*p).to_string()).collect(),
            no_proxy_hosts: no_proxy_hosts.iter().map(|h| (*h).to_string()).collect(),
        }
    }

    /// Builds a header-injecting route on the `Authorization` header.
    fn header_route(
        prefix: &str,
        upstream: &str,
        credential_key: Option<&str>,
        credential_format: &str,
        env_var: Option<&str>,
    ) -> crate::config::RouteConfig {
        crate::config::RouteConfig {
            prefix: prefix.to_string(),
            upstream: upstream.to_string(),
            credential_key: credential_key.map(str::to_string),
            inject_mode: crate::config::InjectMode::Header,
            inject_header: "Authorization".to_string(),
            credential_format: credential_format.to_string(),
            path_pattern: None,
            path_replacement: None,
            query_param_name: None,
            env_var: env_var.map(str::to_string),
            endpoint_rules: vec![],
        }
    }

    /// Looks up the value of the first variable named `key`.
    fn lookup<'a>(vars: &'a [(String, String)], key: &str) -> Option<&'a str> {
        vars.iter()
            .find(|(name, _)| name == key)
            .map(|(_, value)| value.as_str())
    }

    #[tokio::test]
    async fn test_proxy_starts_and_binds() {
        let handle = start(ProxyConfig::default()).await.unwrap();

        assert!(handle.port > 0);
        assert_eq!(handle.token.len(), 64);

        handle.shutdown();
    }

    #[tokio::test]
    async fn test_proxy_env_vars() {
        let handle = start(ProxyConfig::default()).await.unwrap();

        let vars = handle.env_vars();
        let http_proxy = lookup(&vars, "HTTP_PROXY");
        assert!(http_proxy.is_some());
        assert!(http_proxy.unwrap().starts_with("http://nono:"));

        let token_var = lookup(&vars, "NONO_PROXY_TOKEN");
        assert!(token_var.is_some());
        assert_eq!(token_var.unwrap().len(), 64);

        handle.shutdown();
    }

    #[tokio::test]
    async fn test_proxy_credential_env_vars() {
        let config = ProxyConfig {
            routes: vec![header_route(
                "openai",
                "https://api.openai.com",
                None,
                "Bearer {}",
                None,
            )],
            ..Default::default()
        };
        let handle = start(config.clone()).await.unwrap();

        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 1);
        assert_eq!(vars[0].0, "OPENAI_BASE_URL");
        assert!(vars[0].1.contains("/openai"));

        handle.shutdown();
    }

    #[test]
    fn test_proxy_credential_env_vars_fallback_to_uppercase_key() {
        let handle = offline_handle(&["openai"], &[]);
        let config = ProxyConfig {
            routes: vec![header_route(
                "openai",
                "https://api.openai.com",
                Some("openai_api_key"),
                "Bearer {}",
                None,
            )],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 2);

        let api_key = lookup(&vars, "OPENAI_API_KEY");
        assert!(
            api_key.is_some(),
            "Should derive env var name from credential_key.to_uppercase()"
        );

        let val = api_key.expect("OPENAI_API_KEY should exist");
        assert_eq!(val, "test_token");
    }

    #[test]
    fn test_proxy_credential_env_vars_with_explicit_env_var() {
        let handle = offline_handle(&["openai"], &[]);
        let config = ProxyConfig {
            routes: vec![header_route(
                "openai",
                "https://api.openai.com",
                Some("op://Development/OpenAI/credential"),
                "Bearer {}",
                Some("OPENAI_API_KEY"),
            )],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 2);

        let api_key = lookup(&vars, "OPENAI_API_KEY");
        assert!(
            api_key.is_some(),
            "Should use explicit env_var name, not derive from credential_key"
        );

        let val = api_key.expect("OPENAI_API_KEY var should exist");
        assert_eq!(val, "test_token");

        let has_uri_named_var = vars.iter().any(|(k, _)| k.starts_with("OP://"));
        assert!(
            !has_uri_named_var,
            "Should not generate env var from op:// URI uppercase"
        );
    }

    #[test]
    fn test_proxy_credential_env_vars_skips_unloaded_routes() {
        // Only `openai` is loaded; `github` is declared but has no credential.
        let handle = offline_handle(&["openai"], &[]);
        let config = ProxyConfig {
            routes: vec![
                header_route(
                    "openai",
                    "https://api.openai.com",
                    Some("openai_api_key"),
                    "Bearer {}",
                    None,
                ),
                header_route(
                    "github",
                    "https://api.github.com",
                    Some("env://GITHUB_TOKEN"),
                    "token {}",
                    Some("GITHUB_TOKEN"),
                ),
            ],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);

        assert!(
            lookup(&vars, "OPENAI_BASE_URL").is_some(),
            "loaded route should have BASE_URL"
        );
        assert!(
            lookup(&vars, "OPENAI_API_KEY").is_some(),
            "loaded route should have API key"
        );

        assert!(
            lookup(&vars, "GITHUB_BASE_URL").is_some(),
            "declared route should still have BASE_URL"
        );
        assert!(
            lookup(&vars, "GITHUB_TOKEN").is_none(),
            "unloaded route must not inject phantom GITHUB_TOKEN"
        );
    }

    #[test]
    fn test_no_proxy_excludes_credential_upstreams() {
        let handle = offline_handle(&[], &["nats.internal:4222", "opencode.internal:4096"]);

        let vars = handle.env_vars();
        let no_proxy = lookup(&vars, "NO_PROXY").unwrap();
        assert!(
            no_proxy.contains("nats.internal"),
            "non-credential host should be in NO_PROXY"
        );
        assert!(
            no_proxy.contains("opencode.internal"),
            "non-credential host should be in NO_PROXY"
        );
        assert!(
            no_proxy.contains("localhost"),
            "localhost should always be in NO_PROXY"
        );
    }

    #[test]
    fn test_no_proxy_empty_when_no_non_credential_hosts() {
        let handle = offline_handle(&[], &[]);

        let vars = handle.env_vars();
        let no_proxy = lookup(&vars, "NO_PROXY").unwrap();
        assert_eq!(
            no_proxy, "localhost,127.0.0.1",
            "NO_PROXY should only contain loopback when no bypass hosts"
        );
    }
}