1use crate::config::ProxyConfig;
11use crate::connect;
12use crate::credential::CredentialStore;
13use crate::error::{ProxyError, Result};
14use crate::external;
15use crate::filter::ProxyFilter;
16use crate::reverse;
17use crate::token;
18use std::net::SocketAddr;
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
22use tokio::net::TcpListener;
23use tokio::sync::watch;
24use tracing::{debug, info, warn};
25use zeroize::Zeroizing;
26
27const MAX_HEADER_SIZE: usize = 64 * 1024;
30
31pub struct ProxyHandle {
36 pub port: u16,
38 pub token: Zeroizing<String>,
40 shutdown_tx: watch::Sender<bool>,
42}
43
44impl ProxyHandle {
45 pub fn shutdown(&self) {
47 let _ = self.shutdown_tx.send(true);
48 }
49
50 #[must_use]
58 pub fn env_vars(&self) -> Vec<(String, String)> {
59 let proxy_url = format!("http://nono:{}@127.0.0.1:{}", &*self.token, self.port);
60
61 let mut vars = vec![
62 ("HTTP_PROXY".to_string(), proxy_url.clone()),
63 ("HTTPS_PROXY".to_string(), proxy_url.clone()),
64 ("NO_PROXY".to_string(), "localhost,127.0.0.1".to_string()),
65 ("NONO_PROXY_TOKEN".to_string(), self.token.to_string()),
66 ];
67
68 vars.push(("http_proxy".to_string(), proxy_url.clone()));
70 vars.push(("https_proxy".to_string(), proxy_url));
71 vars.push(("no_proxy".to_string(), "localhost,127.0.0.1".to_string()));
72
73 vars.push(("NODE_USE_ENV_PROXY".to_string(), "1".to_string()));
75
76 vars
77 }
78
79 #[must_use]
88 pub fn credential_env_vars(&self, config: &ProxyConfig) -> Vec<(String, String)> {
89 let mut vars = Vec::new();
90 for route in &config.routes {
91 let base_url_name = format!("{}_BASE_URL", route.prefix.to_uppercase());
93 let url = format!("http://127.0.0.1:{}/{}", self.port, route.prefix);
94 vars.push((base_url_name, url));
95
96 if let Some(ref env_var) = route.env_var {
100 vars.push((env_var.clone(), self.token.to_string()));
101 } else if let Some(ref cred_key) = route.credential_key {
102 let api_key_name = cred_key.to_uppercase();
103 vars.push((api_key_name, self.token.to_string()));
104 }
105 }
106 vars
107 }
108}
109
110struct ProxyState {
112 filter: ProxyFilter,
113 session_token: Zeroizing<String>,
114 credential_store: CredentialStore,
115 config: ProxyConfig,
116 tls_connector: tokio_rustls::TlsConnector,
119 active_connections: AtomicUsize,
121}
122
123pub async fn start(config: ProxyConfig) -> Result<ProxyHandle> {
131 let session_token = token::generate_session_token()?;
133
134 let bind_addr = SocketAddr::new(config.bind_addr, config.bind_port);
136 let listener = TcpListener::bind(bind_addr)
137 .await
138 .map_err(|e| ProxyError::Bind {
139 addr: bind_addr.to_string(),
140 source: e,
141 })?;
142
143 let local_addr = listener.local_addr().map_err(|e| ProxyError::Bind {
144 addr: bind_addr.to_string(),
145 source: e,
146 })?;
147 let port = local_addr.port();
148
149 info!("Proxy server listening on {}", local_addr);
150
151 let credential_store = if config.routes.is_empty() {
153 CredentialStore::empty()
154 } else {
155 CredentialStore::load(&config.routes)?
156 };
157
158 let filter = if config.allowed_hosts.is_empty() {
160 ProxyFilter::allow_all()
161 } else {
162 ProxyFilter::new(&config.allowed_hosts)
163 };
164
165 let mut root_store = rustls::RootCertStore::empty();
169 root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
170 let tls_config = rustls::ClientConfig::builder_with_provider(Arc::new(
171 rustls::crypto::ring::default_provider(),
172 ))
173 .with_safe_default_protocol_versions()
174 .map_err(|e| ProxyError::Config(format!("TLS config error: {}", e)))?
175 .with_root_certificates(root_store)
176 .with_no_client_auth();
177 let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config));
178
179 let (shutdown_tx, shutdown_rx) = watch::channel(false);
181
182 let state = Arc::new(ProxyState {
183 filter,
184 session_token: session_token.clone(),
185 credential_store,
186 config,
187 tls_connector,
188 active_connections: AtomicUsize::new(0),
189 });
190
191 tokio::spawn(accept_loop(listener, state, shutdown_rx));
195
196 Ok(ProxyHandle {
197 port,
198 token: session_token,
199 shutdown_tx,
200 })
201}
202
203async fn accept_loop(
205 listener: TcpListener,
206 state: Arc<ProxyState>,
207 mut shutdown_rx: watch::Receiver<bool>,
208) {
209 loop {
210 tokio::select! {
211 result = listener.accept() => {
212 match result {
213 Ok((stream, addr)) => {
214 let max = state.config.max_connections;
216 if max > 0 {
217 let current = state.active_connections.load(Ordering::Relaxed);
218 if current >= max {
219 warn!("Connection limit reached ({}/{}), rejecting {}", current, max, addr);
220 drop(stream);
222 continue;
223 }
224 }
225 state.active_connections.fetch_add(1, Ordering::Relaxed);
226
227 debug!("Accepted connection from {}", addr);
228 let state = Arc::clone(&state);
229 tokio::spawn(async move {
230 if let Err(e) = handle_connection(stream, &state).await {
231 debug!("Connection handler error: {}", e);
232 }
233 state.active_connections.fetch_sub(1, Ordering::Relaxed);
234 });
235 }
236 Err(e) => {
237 warn!("Accept error: {}", e);
238 }
239 }
240 }
241 _ = shutdown_rx.changed() => {
242 if *shutdown_rx.borrow() {
243 info!("Proxy server shutting down");
244 return;
245 }
246 }
247 }
248 }
249}
250
251async fn handle_connection(mut stream: tokio::net::TcpStream, state: &ProxyState) -> Result<()> {
257 let mut buf_reader = BufReader::new(&mut stream);
261 let mut first_line = String::new();
262 buf_reader.read_line(&mut first_line).await?;
263
264 if first_line.is_empty() {
265 return Ok(()); }
267
268 let mut header_bytes = Vec::new();
270 loop {
271 let mut line = String::new();
272 let n = buf_reader.read_line(&mut line).await?;
273 if n == 0 || line.trim().is_empty() {
274 break;
275 }
276 header_bytes.extend_from_slice(line.as_bytes());
277 if header_bytes.len() > MAX_HEADER_SIZE {
278 drop(buf_reader);
279 let response = "HTTP/1.1 431 Request Header Fields Too Large\r\n\r\n";
280 stream.write_all(response.as_bytes()).await?;
281 return Ok(());
282 }
283 }
284
285 let buffered = buf_reader.buffer().to_vec();
290 drop(buf_reader);
291
292 let first_line = first_line.trim_end();
293
294 if first_line.starts_with("CONNECT ") {
296 if let Some(ref ext_config) = state.config.external_proxy {
298 external::handle_external_proxy(
299 first_line,
300 &mut stream,
301 &header_bytes,
302 &state.filter,
303 &state.session_token,
304 ext_config,
305 )
306 .await
307 } else {
308 connect::handle_connect(
309 first_line,
310 &mut stream,
311 &state.filter,
312 &state.session_token,
313 &header_bytes,
314 )
315 .await
316 }
317 } else if !state.credential_store.is_empty() {
318 let ctx = reverse::ReverseProxyCtx {
320 credential_store: &state.credential_store,
321 session_token: &state.session_token,
322 filter: &state.filter,
323 tls_connector: &state.tls_connector,
324 };
325 reverse::handle_reverse_proxy(first_line, &mut stream, &header_bytes, &ctx, &buffered).await
326 } else {
327 let response = "HTTP/1.1 400 Bad Request\r\n\r\n";
329 stream.write_all(response.as_bytes()).await?;
330 Ok(())
331 }
332}
333
334#[cfg(test)]
335#[allow(clippy::unwrap_used)]
336mod tests {
337 use super::*;
338
339 #[tokio::test]
340 async fn test_proxy_starts_and_binds() {
341 let config = ProxyConfig::default();
342 let handle = start(config).await.unwrap();
343
344 assert!(handle.port > 0);
346 assert_eq!(handle.token.len(), 64);
348
349 handle.shutdown();
351 }
352
353 #[tokio::test]
354 async fn test_proxy_env_vars() {
355 let config = ProxyConfig::default();
356 let handle = start(config).await.unwrap();
357
358 let vars = handle.env_vars();
359 let http_proxy = vars.iter().find(|(k, _)| k == "HTTP_PROXY");
360 assert!(http_proxy.is_some());
361 assert!(http_proxy.unwrap().1.starts_with("http://nono:"));
362
363 let token_var = vars.iter().find(|(k, _)| k == "NONO_PROXY_TOKEN");
364 assert!(token_var.is_some());
365 assert_eq!(token_var.unwrap().1.len(), 64);
366
367 handle.shutdown();
368 }
369
370 #[tokio::test]
371 async fn test_proxy_credential_env_vars() {
372 let config = ProxyConfig {
373 routes: vec![crate::config::RouteConfig {
374 prefix: "openai".to_string(),
375 upstream: "https://api.openai.com".to_string(),
376 credential_key: None,
377 inject_mode: crate::config::InjectMode::Header,
378 inject_header: "Authorization".to_string(),
379 credential_format: "Bearer {}".to_string(),
380 path_pattern: None,
381 path_replacement: None,
382 query_param_name: None,
383 env_var: None,
384 }],
385 ..Default::default()
386 };
387 let handle = start(config.clone()).await.unwrap();
388
389 let vars = handle.credential_env_vars(&config);
390 assert_eq!(vars.len(), 1);
391 assert_eq!(vars[0].0, "OPENAI_BASE_URL");
392 assert!(vars[0].1.contains("/openai"));
393
394 handle.shutdown();
395 }
396
397 #[test]
398 fn test_proxy_credential_env_vars_fallback_to_uppercase_key() {
399 let (shutdown_tx, _) = tokio::sync::watch::channel(false);
403 let handle = ProxyHandle {
404 port: 12345,
405 token: Zeroizing::new("test_token".to_string()),
406 shutdown_tx,
407 };
408 let config = ProxyConfig {
409 routes: vec![crate::config::RouteConfig {
410 prefix: "openai".to_string(),
411 upstream: "https://api.openai.com".to_string(),
412 credential_key: Some("openai_api_key".to_string()),
413 inject_mode: crate::config::InjectMode::Header,
414 inject_header: "Authorization".to_string(),
415 credential_format: "Bearer {}".to_string(),
416 path_pattern: None,
417 path_replacement: None,
418 query_param_name: None,
419 env_var: None, }],
421 ..Default::default()
422 };
423
424 let vars = handle.credential_env_vars(&config);
425 assert_eq!(vars.len(), 2); let api_key_var = vars.iter().find(|(k, _)| k == "OPENAI_API_KEY");
429 assert!(
430 api_key_var.is_some(),
431 "Should derive env var name from credential_key.to_uppercase()"
432 );
433
434 let (_, val) = api_key_var.expect("OPENAI_API_KEY should exist");
435 assert_eq!(val, "test_token");
436 }
437
438 #[test]
439 fn test_proxy_credential_env_vars_with_explicit_env_var() {
440 let (shutdown_tx, _) = tokio::sync::watch::channel(false);
447 let handle = ProxyHandle {
448 port: 12345,
449 token: Zeroizing::new("test_token".to_string()),
450 shutdown_tx,
451 };
452 let config = ProxyConfig {
453 routes: vec![crate::config::RouteConfig {
454 prefix: "openai".to_string(),
455 upstream: "https://api.openai.com".to_string(),
456 credential_key: Some("op://Development/OpenAI/credential".to_string()),
457 inject_mode: crate::config::InjectMode::Header,
458 inject_header: "Authorization".to_string(),
459 credential_format: "Bearer {}".to_string(),
460 path_pattern: None,
461 path_replacement: None,
462 query_param_name: None,
463 env_var: Some("OPENAI_API_KEY".to_string()),
464 }],
465 ..Default::default()
466 };
467
468 let vars = handle.credential_env_vars(&config);
469 assert_eq!(vars.len(), 2); let api_key_var = vars.iter().find(|(k, _)| k == "OPENAI_API_KEY");
472 assert!(
473 api_key_var.is_some(),
474 "Should use explicit env_var name, not derive from credential_key"
475 );
476
477 let (_, val) = api_key_var.expect("OPENAI_API_KEY var should exist");
479 assert_eq!(val, "test_token");
480
481 let bad_var = vars.iter().find(|(k, _)| k.starts_with("OP://"));
483 assert!(
484 bad_var.is_none(),
485 "Should not generate env var from op:// URI uppercase"
486 );
487 }
488}