1use crate::audit;
11use crate::config::ProxyConfig;
12use crate::connect;
13use crate::credential::CredentialStore;
14use crate::error::{ProxyError, Result};
15use crate::external;
16use crate::filter::ProxyFilter;
17use crate::reverse;
18use crate::token;
19use std::net::SocketAddr;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
23use tokio::net::TcpListener;
24use tokio::sync::watch;
25use tracing::{debug, info, warn};
26use zeroize::Zeroizing;
27
/// Upper bound, in bytes, on the header block buffered for a single request (64 KiB).
///
/// `handle_connection` answers `431 Request Header Fields Too Large` once the
/// accumulated header lines grow past this size.
const MAX_HEADER_SIZE: usize = 64 * 1024;
31
32pub struct ProxyHandle {
37 pub port: u16,
39 pub token: Zeroizing<String>,
41 audit_log: audit::SharedAuditLog,
43 shutdown_tx: watch::Sender<bool>,
45 loaded_routes: std::collections::HashSet<String>,
49}
50
51impl ProxyHandle {
52 pub fn shutdown(&self) {
54 let _ = self.shutdown_tx.send(true);
55 }
56
57 #[must_use]
59 pub fn drain_audit_events(&self) -> Vec<nono::undo::NetworkAuditEvent> {
60 audit::drain_audit_events(&self.audit_log)
61 }
62
63 #[must_use]
71 pub fn env_vars(&self) -> Vec<(String, String)> {
72 let proxy_url = format!("http://nono:{}@127.0.0.1:{}", &*self.token, self.port);
73
74 let mut vars = vec![
75 ("HTTP_PROXY".to_string(), proxy_url.clone()),
76 ("HTTPS_PROXY".to_string(), proxy_url.clone()),
77 ("NO_PROXY".to_string(), "localhost,127.0.0.1".to_string()),
78 ("NONO_PROXY_TOKEN".to_string(), self.token.to_string()),
79 ];
80
81 vars.push(("http_proxy".to_string(), proxy_url.clone()));
83 vars.push(("https_proxy".to_string(), proxy_url));
84 vars.push(("no_proxy".to_string(), "localhost,127.0.0.1".to_string()));
85
86 vars.push(("NODE_USE_ENV_PROXY".to_string(), "1".to_string()));
88
89 vars
90 }
91
92 #[must_use]
101 pub fn credential_env_vars(&self, config: &ProxyConfig) -> Vec<(String, String)> {
102 let mut vars = Vec::new();
103 for route in &config.routes {
104 let base_url_name = format!("{}_BASE_URL", route.prefix.to_uppercase());
106 let url = format!("http://127.0.0.1:{}/{}", self.port, route.prefix);
107 vars.push((base_url_name, url));
108
109 if !self.loaded_routes.contains(&route.prefix) {
114 continue;
115 }
116
117 if let Some(ref env_var) = route.env_var {
121 vars.push((env_var.clone(), self.token.to_string()));
122 } else if let Some(ref cred_key) = route.credential_key {
123 let api_key_name = cred_key.to_uppercase();
124 vars.push((api_key_name, self.token.to_string()));
125 }
126 }
127 vars
128 }
129}
130
131struct ProxyState {
133 filter: ProxyFilter,
134 session_token: Zeroizing<String>,
135 credential_store: CredentialStore,
136 config: ProxyConfig,
137 tls_connector: tokio_rustls::TlsConnector,
140 active_connections: AtomicUsize,
142 audit_log: audit::SharedAuditLog,
144}
145
146pub async fn start(config: ProxyConfig) -> Result<ProxyHandle> {
154 let session_token = token::generate_session_token()?;
156
157 let bind_addr = SocketAddr::new(config.bind_addr, config.bind_port);
159 let listener = TcpListener::bind(bind_addr)
160 .await
161 .map_err(|e| ProxyError::Bind {
162 addr: bind_addr.to_string(),
163 source: e,
164 })?;
165
166 let local_addr = listener.local_addr().map_err(|e| ProxyError::Bind {
167 addr: bind_addr.to_string(),
168 source: e,
169 })?;
170 let port = local_addr.port();
171
172 info!("Proxy server listening on {}", local_addr);
173
174 let credential_store = if config.routes.is_empty() {
176 CredentialStore::empty()
177 } else {
178 CredentialStore::load(&config.routes)?
179 };
180 let loaded_routes = credential_store.loaded_prefixes();
181
182 let filter = if config.allowed_hosts.is_empty() {
184 ProxyFilter::allow_all()
185 } else {
186 ProxyFilter::new(&config.allowed_hosts)
187 };
188
189 let mut root_store = rustls::RootCertStore::empty();
193 root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
194 let tls_config = rustls::ClientConfig::builder_with_provider(Arc::new(
195 rustls::crypto::ring::default_provider(),
196 ))
197 .with_safe_default_protocol_versions()
198 .map_err(|e| ProxyError::Config(format!("TLS config error: {}", e)))?
199 .with_root_certificates(root_store)
200 .with_no_client_auth();
201 let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
202
203 let (shutdown_tx, shutdown_rx) = watch::channel(false);
205 let audit_log = audit::new_audit_log();
206
207 let state = Arc::new(ProxyState {
208 filter,
209 session_token: session_token.clone(),
210 credential_store,
211 config,
212 tls_connector,
213 active_connections: AtomicUsize::new(0),
214 audit_log: Arc::clone(&audit_log),
215 });
216
217 tokio::spawn(accept_loop(listener, state, shutdown_rx));
221
222 Ok(ProxyHandle {
223 port,
224 token: session_token,
225 audit_log,
226 shutdown_tx,
227 loaded_routes,
228 })
229}
230
231async fn accept_loop(
233 listener: TcpListener,
234 state: Arc<ProxyState>,
235 mut shutdown_rx: watch::Receiver<bool>,
236) {
237 loop {
238 tokio::select! {
239 result = listener.accept() => {
240 match result {
241 Ok((stream, addr)) => {
242 let max = state.config.max_connections;
244 if max > 0 {
245 let current = state.active_connections.load(Ordering::Relaxed);
246 if current >= max {
247 warn!("Connection limit reached ({}/{}), rejecting {}", current, max, addr);
248 drop(stream);
250 continue;
251 }
252 }
253 state.active_connections.fetch_add(1, Ordering::Relaxed);
254
255 debug!("Accepted connection from {}", addr);
256 let state = Arc::clone(&state);
257 tokio::spawn(async move {
258 if let Err(e) = handle_connection(stream, &state).await {
259 debug!("Connection handler error: {}", e);
260 }
261 state.active_connections.fetch_sub(1, Ordering::Relaxed);
262 });
263 }
264 Err(e) => {
265 warn!("Accept error: {}", e);
266 }
267 }
268 }
269 _ = shutdown_rx.changed() => {
270 if *shutdown_rx.borrow() {
271 info!("Proxy server shutting down");
272 return;
273 }
274 }
275 }
276 }
277}
278
279async fn handle_connection(mut stream: tokio::net::TcpStream, state: &ProxyState) -> Result<()> {
285 let mut buf_reader = BufReader::new(&mut stream);
289 let mut first_line = String::new();
290 buf_reader.read_line(&mut first_line).await?;
291
292 if first_line.is_empty() {
293 return Ok(()); }
295
296 let mut header_bytes = Vec::new();
298 loop {
299 let mut line = String::new();
300 let n = buf_reader.read_line(&mut line).await?;
301 if n == 0 || line.trim().is_empty() {
302 break;
303 }
304 header_bytes.extend_from_slice(line.as_bytes());
305 if header_bytes.len() > MAX_HEADER_SIZE {
306 drop(buf_reader);
307 let response = "HTTP/1.1 431 Request Header Fields Too Large\r\n\r\n";
308 stream.write_all(response.as_bytes()).await?;
309 return Ok(());
310 }
311 }
312
313 let buffered = buf_reader.buffer().to_vec();
318 drop(buf_reader);
319
320 let first_line = first_line.trim_end();
321
322 if first_line.starts_with("CONNECT ") {
324 if let Some(ref ext_config) = state.config.external_proxy {
326 external::handle_external_proxy(
327 first_line,
328 &mut stream,
329 &header_bytes,
330 &state.filter,
331 &state.session_token,
332 ext_config,
333 Some(&state.audit_log),
334 )
335 .await
336 } else {
337 connect::handle_connect(
338 first_line,
339 &mut stream,
340 &state.filter,
341 &state.session_token,
342 &header_bytes,
343 Some(&state.audit_log),
344 )
345 .await
346 }
347 } else if !state.credential_store.is_empty() {
348 let ctx = reverse::ReverseProxyCtx {
350 credential_store: &state.credential_store,
351 session_token: &state.session_token,
352 filter: &state.filter,
353 tls_connector: &state.tls_connector,
354 audit_log: Some(&state.audit_log),
355 };
356 reverse::handle_reverse_proxy(first_line, &mut stream, &header_bytes, &ctx, &buffered).await
357 } else {
358 let response = "HTTP/1.1 400 Bad Request\r\n\r\n";
360 stream.write_all(response.as_bytes()).await?;
361 Ok(())
362 }
363}
364
#[cfg(test)]
// Tests may unwrap: a failed unwrap is a test failure.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::config::{InjectMode, RouteConfig};

    /// Builds a route that injects its credential into the `Authorization` header.
    fn header_route(
        prefix: &str,
        upstream: &str,
        credential_format: &str,
        credential_key: Option<&str>,
        env_var: Option<&str>,
    ) -> RouteConfig {
        RouteConfig {
            prefix: prefix.to_string(),
            upstream: upstream.to_string(),
            credential_key: credential_key.map(String::from),
            inject_mode: InjectMode::Header,
            inject_header: "Authorization".to_string(),
            credential_format: credential_format.to_string(),
            path_pattern: None,
            path_replacement: None,
            query_param_name: None,
            env_var: env_var.map(String::from),
        }
    }

    /// Builds a handle without starting a server; only `openai` counts as loaded.
    fn offline_handle() -> ProxyHandle {
        let (shutdown_tx, _) = tokio::sync::watch::channel(false);
        ProxyHandle {
            port: 12345,
            token: Zeroizing::new("test_token".to_string()),
            audit_log: audit::new_audit_log(),
            shutdown_tx,
            loaded_routes: ["openai".to_string()].into_iter().collect(),
        }
    }

    /// Looks up the value of the variable called `name`, if present.
    fn lookup<'a>(vars: &'a [(String, String)], name: &str) -> Option<&'a str> {
        vars.iter()
            .find(|(key, _)| key == name)
            .map(|(_, value)| value.as_str())
    }

    #[tokio::test]
    async fn test_proxy_starts_and_binds() {
        let handle = start(ProxyConfig::default()).await.unwrap();

        assert!(handle.port > 0);
        assert_eq!(handle.token.len(), 64);

        handle.shutdown();
    }

    #[tokio::test]
    async fn test_proxy_env_vars() {
        let handle = start(ProxyConfig::default()).await.unwrap();
        let vars = handle.env_vars();

        let http_proxy = lookup(&vars, "HTTP_PROXY");
        assert!(http_proxy.is_some());
        assert!(http_proxy.unwrap().starts_with("http://nono:"));

        let token_var = lookup(&vars, "NONO_PROXY_TOKEN");
        assert!(token_var.is_some());
        assert_eq!(token_var.unwrap().len(), 64);

        handle.shutdown();
    }

    #[tokio::test]
    async fn test_proxy_credential_env_vars() {
        let config = ProxyConfig {
            routes: vec![header_route(
                "openai",
                "https://api.openai.com",
                "Bearer {}",
                None,
                None,
            )],
            ..Default::default()
        };
        let handle = start(config.clone()).await.unwrap();

        // No credential key, so only the BASE_URL variable is expected.
        let vars = handle.credential_env_vars(&config);
        assert_eq!(vars.len(), 1);
        assert_eq!(vars[0].0, "OPENAI_BASE_URL");
        assert!(vars[0].1.contains("/openai"));

        handle.shutdown();
    }

    #[test]
    fn test_proxy_credential_env_vars_fallback_to_uppercase_key() {
        let handle = offline_handle();
        let config = ProxyConfig {
            routes: vec![header_route(
                "openai",
                "https://api.openai.com",
                "Bearer {}",
                Some("openai_api_key"),
                None,
            )],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);
        // BASE_URL plus the derived credential variable.
        assert_eq!(vars.len(), 2);

        let api_key = lookup(&vars, "OPENAI_API_KEY");
        assert!(
            api_key.is_some(),
            "Should derive env var name from credential_key.to_uppercase()"
        );
        assert_eq!(api_key, Some("test_token"));
    }

    #[test]
    fn test_proxy_credential_env_vars_with_explicit_env_var() {
        let handle = offline_handle();
        let config = ProxyConfig {
            routes: vec![header_route(
                "openai",
                "https://api.openai.com",
                "Bearer {}",
                Some("op://Development/OpenAI/credential"),
                Some("OPENAI_API_KEY"),
            )],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);
        // BASE_URL plus the explicitly named credential variable.
        assert_eq!(vars.len(), 2);

        let api_key = lookup(&vars, "OPENAI_API_KEY");
        assert!(
            api_key.is_some(),
            "Should use explicit env_var name, not derive from credential_key"
        );
        assert_eq!(api_key, Some("test_token"));

        let derived_from_uri = vars.iter().any(|(key, _)| key.starts_with("OP://"));
        assert!(
            !derived_from_uri,
            "Should not generate env var from op:// URI uppercase"
        );
    }

    #[test]
    fn test_proxy_credential_env_vars_skips_unloaded_routes() {
        // `offline_handle` marks only `openai` as loaded; `github` is declared
        // in the config but has no loaded credential.
        let handle = offline_handle();
        let config = ProxyConfig {
            routes: vec![
                header_route(
                    "openai",
                    "https://api.openai.com",
                    "Bearer {}",
                    Some("openai_api_key"),
                    None,
                ),
                header_route(
                    "github",
                    "https://api.github.com",
                    "token {}",
                    Some("env://GITHUB_TOKEN"),
                    Some("GITHUB_TOKEN"),
                ),
            ],
            ..Default::default()
        };

        let vars = handle.credential_env_vars(&config);

        assert!(
            lookup(&vars, "OPENAI_BASE_URL").is_some(),
            "loaded route should have BASE_URL"
        );
        assert!(
            lookup(&vars, "OPENAI_API_KEY").is_some(),
            "loaded route should have API key"
        );

        assert!(
            lookup(&vars, "GITHUB_BASE_URL").is_some(),
            "declared route should still have BASE_URL"
        );
        assert!(
            lookup(&vars, "GITHUB_TOKEN").is_none(),
            "unloaded route must not inject phantom GITHUB_TOKEN"
        );
    }
}