1use crate::audit;
15use crate::config::InjectMode;
16use crate::credential::CredentialStore;
17use crate::error::{ProxyError, Result};
18use crate::filter::ProxyFilter;
19use crate::forward::{self, AuditCtx, UpstreamScheme, UpstreamSpec, UpstreamStrategy};
20use crate::reverse;
21use crate::route::RouteStore;
22use crate::tls_intercept::acceptor;
23use crate::tls_intercept::cert_cache::CertCache;
24use std::sync::Arc;
25use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
26use tokio::net::TcpStream;
27use tokio_rustls::TlsAcceptor;
28use tracing::{debug, warn};
29use zeroize::Zeroizing;
30
/// Upper bound, in bytes (64 KiB), on the header lines accumulated for one
/// intercepted request. `parse_inner_request` answers
/// `431 Request Header Fields Too Large` once the accumulated headers exceed it.
const MAX_HEADER_SIZE: usize = 64 * 1024;
34
/// Borrowed description of an external (chained) proxy that intercepted
/// requests should be sent through instead of connecting directly.
///
/// Both fields are copied verbatim into `UpstreamStrategy::ExternalProxy` by
/// `select_upstream_strategy`.
pub struct InterceptUpstreamProxy<'a> {
    /// Address of the external proxy (the tests in this file use `host:port`
    /// strings such as `proxy.corp:3128`).
    pub proxy_addr: &'a str,
    /// Optional authentication value handed to the external proxy.
    /// NOTE(review): presumably a `Proxy-Authorization` header value
    /// (the tests use `Basic …`) — confirm against `forward`.
    pub proxy_auth_header: Option<&'a str>,
}
49
50pub fn select_upstream_strategy<'a>(
57 upstream_proxy: &'a Option<InterceptUpstreamProxy<'a>>,
58 resolved_addrs: &'a [std::net::SocketAddr],
59) -> UpstreamStrategy<'a> {
60 if let Some(proxy) = upstream_proxy {
61 UpstreamStrategy::ExternalProxy {
62 proxy_addr: proxy.proxy_addr,
63 proxy_auth_header: proxy.proxy_auth_header,
64 }
65 } else {
66 UpstreamStrategy::Direct { resolved_addrs }
67 }
68}
69
/// Everything `handle_intercept_connect` needs to terminate one CONNECT tunnel
/// and process the request carried inside it.
pub struct InterceptCtx<'a> {
    /// Route identifier attached to the CONNECT-level audit events, if any.
    pub route_id: Option<&'a str>,
    /// Host named by the CONNECT request; also the upstream host.
    pub host: &'a str,
    /// Port named by the CONNECT request; also the upstream port.
    pub port: u16,
    /// Route table, queried by lowercased `host:port` for candidate routes.
    pub route_store: &'a RouteStore,
    /// Source of the per-service credential, OAuth2 and AWS entries.
    pub credential_store: &'a CredentialStore,
    /// NOTE(review): not read by the code in this file — presumably consumed
    /// by callers or other modules; confirm before removing.
    pub session_token: &'a Zeroizing<String>,
    /// Certificate cache used to build the TLS server config presented to the
    /// client during interception.
    pub cert_cache: Arc<CertCache>,
    /// Default TLS connector for the upstream leg; a route's own connector
    /// takes precedence when it has one.
    pub tls_connector: &'a tokio_rustls::TlsConnector,
    /// Host filter consulted (via `check_host`) before connecting upstream.
    pub filter: &'a ProxyFilter,
    /// Optional audit log that allow/deny events are written to.
    pub audit_log: Option<&'a audit::SharedAuditLog>,
    /// External proxy to chain through; `None` means connect directly.
    pub upstream_proxy: Option<InterceptUpstreamProxy<'a>>,
}
86
87pub async fn handle_intercept_connect(stream: &mut TcpStream, ctx: InterceptCtx<'_>) -> Result<()> {
93 debug!(
94 "tls_intercept: accepting CONNECT to {}:{} for L7 inspection",
95 ctx.host, ctx.port
96 );
97
98 let response = b"HTTP/1.1 200 Connection Established\r\n\r\n";
100 stream.write_all(response).await?;
101 stream.flush().await?;
102
103 let server_config = acceptor::build_server_config(Arc::clone(&ctx.cert_cache))?;
104 let tls_acceptor = TlsAcceptor::from(server_config);
105
106 let mut tls_stream = match tls_acceptor.accept(&mut *stream).await {
107 Ok(s) => s,
108 Err(e) => {
109 let reason = format!("tls handshake failed: {}", e);
112 warn!(
113 "tls_intercept: handshake failed for {}:{} — {}. \
114 Agent likely pins certs or carries a hard-coded trust list. \
115 Remove endpoint_rules / credential_key from the route to fall \
116 back to a transparent CONNECT tunnel.",
117 ctx.host, ctx.port, e
118 );
119 audit::log_denied(
120 ctx.audit_log,
121 audit::ProxyMode::ConnectIntercept,
122 &audit::EventContext {
123 route_id: ctx.route_id,
124 auth_mechanism: Some(nono::undo::NetworkAuditAuthMechanism::ProxyAuthorization),
125 auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Succeeded),
126 denial_category: Some(
127 nono::undo::NetworkAuditDenialCategory::InterceptHandshakeFailed,
128 ),
129 ..audit::EventContext::default()
130 },
131 ctx.host,
132 ctx.port,
133 &reason,
134 );
135 return Ok(());
136 }
137 };
138
139 audit::log_allowed(
142 ctx.audit_log,
143 audit::ProxyMode::ConnectIntercept,
144 &audit::EventContext {
145 route_id: ctx.route_id,
146 auth_mechanism: Some(nono::undo::NetworkAuditAuthMechanism::ProxyAuthorization),
147 auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Succeeded),
148 ..audit::EventContext::default()
149 },
150 ctx.host,
151 ctx.port,
152 "CONNECT",
153 );
154
155 if let Err(e) = handle_inner_request(&mut tls_stream, &ctx).await {
156 debug!(
157 "tls_intercept: inner-request handling failed for {}:{}: {}",
158 ctx.host, ctx.port, e
159 );
160 }
161 Ok(())
162}
163
/// Head of one intercepted HTTP request, as produced by `parse_inner_request`.
struct ParsedRequest {
    /// First token of the request line (e.g. `GET`).
    method: String,
    /// Second token of the request line: the request target as sent by the client.
    path: String,
    /// Third token of the request line (e.g. `HTTP/1.1`).
    version: String,
    /// Raw header lines following the request line, line endings included,
    /// without the blank line that terminates the headers.
    header_bytes: Vec<u8>,
    /// Bytes the parser's `BufReader` had already read past the headers;
    /// passed to `reverse::read_request_body` together with the stream.
    buffered: Vec<u8>,
}
176
177async fn resolve_upstream_or_deny<S>(
183 stream: &mut S,
184 ctx: &InterceptCtx<'_>,
185 deny_event_ctx: audit::EventContext<'_>,
186) -> Result<Option<Vec<std::net::SocketAddr>>>
187where
188 S: tokio::io::AsyncWrite + Unpin,
189{
190 let check = ctx.filter.check_host(ctx.host, ctx.port).await?;
191 if !check.result.is_allowed() {
192 let reason = check.result.reason();
193 warn!("tls_intercept: upstream host denied by filter: {}", reason);
194 audit::log_denied(
195 ctx.audit_log,
196 audit::ProxyMode::ConnectIntercept,
197 &audit::EventContext {
198 denial_category: Some(nono::undo::NetworkAuditDenialCategory::HostDenied),
199 ..deny_event_ctx
200 },
201 ctx.host,
202 ctx.port,
203 &reason,
204 );
205 reverse::send_error_generic(stream, 403, "Forbidden").await?;
206 return Ok(None);
207 }
208 Ok(Some(check.resolved_addrs))
209}
210
211async fn parse_inner_request<S>(stream: &mut S) -> Result<Option<ParsedRequest>>
220where
221 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
222{
223 let mut buf_reader = BufReader::new(&mut *stream);
224 let mut first_line = String::new();
225 buf_reader.read_line(&mut first_line).await?;
226 if first_line.is_empty() {
227 return Ok(None);
228 }
229
230 let mut header_bytes = Vec::new();
231 loop {
232 let mut line = String::new();
233 let n = buf_reader.read_line(&mut line).await?;
234 if n == 0 || line.trim().is_empty() {
235 break;
236 }
237 header_bytes.extend_from_slice(line.as_bytes());
238 if header_bytes.len() > MAX_HEADER_SIZE {
239 drop(buf_reader);
242 stream
243 .write_all(b"HTTP/1.1 431 Request Header Fields Too Large\r\n\r\n")
244 .await?;
245 return Ok(None);
246 }
247 }
248 let buffered = buf_reader.buffer().to_vec();
249 drop(buf_reader);
250
251 let first_line = first_line.trim_end();
252 let (method, path, version) = parse_request_line(first_line)?;
253 Ok(Some(ParsedRequest {
254 method,
255 path,
256 version,
257 header_bytes,
258 buffered,
259 }))
260}
261
/// Handles the single HTTP request carried inside an intercepted TLS tunnel.
///
/// Steps, in order: parse the request head; look up candidate routes for the
/// CONNECT target; select a route from method + path; refuse when a managed
/// credential is required but unavailable; rewrite the path and inject the
/// credential; run the host filter; read the body; rebuild the request and
/// forward it upstream.
///
/// Every policy outcome (deny, missing route, unsupported AWS route, upstream
/// failure) is answered on `tls_stream` and returns `Ok(())`. `Err` is
/// returned only when one of the `?`-propagated operations fails (parsing,
/// writing an error response, path transformation, filter check, body read).
async fn handle_inner_request<S>(tls_stream: &mut S, ctx: &InterceptCtx<'_>) -> Result<()>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    // `None` means there is nothing to handle: the peer sent no request, or
    // the parser already answered with 431.
    let req = match parse_inner_request(tls_stream).await? {
        Some(r) => r,
        None => return Ok(()),
    };
    debug!("tls_intercept: inner request {} {}", req.method, req.path);

    // Routes are looked up by lowercased `host:port` of the CONNECT target.
    let host_port = format!("{}:{}", ctx.host.to_lowercase(), ctx.port);
    let candidates = ctx.route_store.lookup_all_by_upstream(&host_port);
    if candidates.is_empty() {
        warn!(
            "tls_intercept: no route for {} after intercept handshake",
            host_port
        );
        reverse::send_error_generic(tls_stream, 502, "Bad Gateway").await?;
        return Ok(());
    }

    // Pick the route for this method + path. Denied and ambiguous selections
    // are audited as `EndpointPolicy` denials and answered with 403.
    let selected = match crate::route::select_route(&candidates, &req.method, &req.path) {
        crate::route::RouteSelection::EndpointDenied => {
            let reason = format!(
                "endpoint rules denied {} {}: no rule matched on {}:{}",
                req.method, req.path, ctx.host, ctx.port
            );
            warn!("tls_intercept: {}", reason);
            audit::log_denied(
                ctx.audit_log,
                audit::ProxyMode::ConnectIntercept,
                &audit::EventContext {
                    denial_category: Some(nono::undo::NetworkAuditDenialCategory::EndpointPolicy),
                    ..audit::EventContext::default()
                },
                ctx.host,
                ctx.port,
                &reason,
            );
            reverse::send_error_generic(tls_stream, 403, "Forbidden").await?;
            return Ok(());
        }
        crate::route::RouteSelection::Ambiguous(names) => {
            let reason = format!(
                "ambiguous route: {} {} matched {} credential routes: {:?}. \
                 Narrow endpoint_rules so each request matches exactly one route.",
                req.method,
                req.path,
                names.len(),
                names
            );
            warn!("tls_intercept: {}", reason);
            audit::log_denied(
                ctx.audit_log,
                audit::ProxyMode::ConnectIntercept,
                &audit::EventContext {
                    denial_category: Some(nono::undo::NetworkAuditDenialCategory::EndpointPolicy),
                    ..audit::EventContext::default()
                },
                ctx.host,
                ctx.port,
                &reason,
            );
            reverse::send_error_generic(tls_stream, 403, "Forbidden").await?;
            return Ok(());
        }
        crate::route::RouteSelection::Selected(selected) => selected,
    };
    // `selected` is an optional (service name, route) pair; `None` means the
    // request is forwarded without credentials.
    let service: Option<&str> = selected.map(|(s, _)| s);
    let route: Option<&crate::route::LoadedRoute> = selected.map(|(_, r)| r);
    match service {
        Some(svc) => debug!(
            "tls_intercept: selected route '{}' for {} {}",
            svc, req.method, req.path
        ),
        None => debug!(
            "tls_intercept: no endpoint_rules matched {} {}, forwarding without credentials",
            req.method, req.path
        ),
    }

    // Look up each kind of credential registered for the selected service.
    let cred = service.and_then(|s| ctx.credential_store.get(s));
    let oauth2_route = service.and_then(|s| ctx.credential_store.get_oauth2(s));
    let aws_route = service.and_then(|s| ctx.credential_store.get_aws(s));

    // The route decides, from which credential kinds are present, whether a
    // required managed credential is missing; if so the request is refused
    // with 503 and audited rather than forwarded.
    if let Some(rt) = route
        && rt.missing_managed_credential(
            cred.is_some(),
            oauth2_route.is_some(),
            aws_route.is_some(),
        )
    {
        let svc = service.unwrap_or("unknown");
        let reason = format!(
            "managed credential unavailable for route '{}': intercepted request requires proxy-supplied auth",
            svc
        );
        warn!("tls_intercept: {}", reason);
        audit::log_denied(
            ctx.audit_log,
            audit::ProxyMode::ConnectIntercept,
            &audit::EventContext {
                route_id: service,
                auth_mechanism: rt.managed_auth_mechanism.clone(),
                auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Failed),
                managed_credential_active: Some(false),
                injection_mode: rt.managed_injection_mode.clone(),
                denial_category: Some(
                    nono::undo::NetworkAuditDenialCategory::ManagedCredentialUnavailable,
                ),
            },
            ctx.host,
            ctx.port,
            &reason,
        );
        reverse::send_error_generic(tls_stream, 503, "Service Unavailable").await?;
        return Ok(());
    }

    // AWS routes are answered with 501 on the intercept path. Note that no
    // audit event is written for this outcome.
    if aws_route.is_some() {
        reverse::send_error_generic(tls_stream, 501, "Not Implemented").await?;
        return Ok(());
    }

    // With a credential: first strip what the client-facing (proxy-side)
    // injection left in the path, then apply the upstream-side transformation.
    // Without one the path is forwarded unchanged.
    let transformed_path = if let Some(cred) = cred {
        let cleaned = reverse::strip_proxy_artifacts(
            &req.path,
            &cred.proxy_inject_mode,
            &cred.inject_mode,
            cred.proxy_path_pattern.as_deref(),
            cred.proxy_query_param_name.as_deref(),
        );
        reverse::transform_path_for_mode(
            &cred.inject_mode,
            &cleaned,
            cred.path_pattern.as_deref(),
            cred.path_replacement.as_deref(),
            cred.query_param_name.as_deref(),
            &cred.raw_credential,
        )?
    } else {
        req.path.clone()
    };

    // Host filter check; on denial the helper has already audited the event
    // and answered 403, so there is nothing left to do.
    let resolved_addrs = match resolve_upstream_or_deny(
        tls_stream,
        ctx,
        audit::EventContext {
            route_id: service,
            managed_credential_active: Some(cred.is_some() || oauth2_route.is_some()),
            injection_mode: cred.map(|c| match c.inject_mode {
                InjectMode::Header => nono::undo::NetworkAuditInjectionMode::Header,
                InjectMode::UrlPath => nono::undo::NetworkAuditInjectionMode::UrlPath,
                InjectMode::QueryParam => nono::undo::NetworkAuditInjectionMode::QueryParam,
                InjectMode::BasicAuth => nono::undo::NetworkAuditInjectionMode::BasicAuth,
            }),
            ..audit::EventContext::default()
        },
    )
    .await?
    {
        Some(addrs) => addrs,
        None => return Ok(()),
    };

    // The credential's proxy-side header name is passed to the header filter;
    // an empty name is passed when there is no credential.
    let strip_header = cred.map(|c| c.proxy_header_name.as_str()).unwrap_or("");
    let filtered_headers = reverse::filter_headers(&req.header_bytes, strip_header);
    let content_length = reverse::extract_content_length(&req.header_bytes);
    // The body reader is also given the bytes the parser had already buffered
    // past the headers.
    let body = match reverse::read_request_body(tls_stream, content_length, &req.buffered).await? {
        Some(b) => b,
        None => return Ok(()),
    };

    let upstream_authority = reverse::format_host_header(UpstreamScheme::Https, ctx.host, ctx.port);
    // Held in `Zeroizing` so the request text, which may contain the injected
    // credential, is wiped when dropped.
    let mut request = Zeroizing::new(format!(
        "{} {} {}\r\nHost: {}\r\n",
        req.method, transformed_path, req.version, upstream_authority
    ));
    if let Some(cred) = cred {
        reverse::inject_credential_for_mode(cred, &mut request);
    }
    let auth_header_lower = cred.map(|c| c.header_name.to_lowercase());
    for (name, value) in &filtered_headers {
        // For header-based injection modes, drop any client-supplied header
        // with the credential's header name (case-insensitive) so it does not
        // accompany the injected one.
        if let (Some(cred), Some(hdr)) = (cred, auth_header_lower.as_ref())
            && matches!(cred.inject_mode, InjectMode::Header | InjectMode::BasicAuth)
            && name.to_lowercase() == *hdr
        {
            continue;
        }
        request.push_str(&format!("{}: {}\r\n", name, value));
    }
    // The upstream is always asked to close the connection after responding.
    request.push_str("Connection: close\r\n");
    // Content-Length is emitted only for a non-empty body, using the length
    // of the body actually read.
    if !body.is_empty() {
        request.push_str(&format!("Content-Length: {}\r\n", body.len()));
    }
    request.push_str("\r\n");

    // A route-specific TLS connector takes precedence over the context default.
    let connector = route
        .and_then(|r| r.tls_connector.as_ref())
        .unwrap_or(ctx.tls_connector);
    let strategy = select_upstream_strategy(&ctx.upstream_proxy, &resolved_addrs);
    let upstream_spec = UpstreamSpec {
        scheme: UpstreamScheme::Https,
        host: ctx.host,
        port: ctx.port,
        strategy,
        tls_connector: connector,
    };
    // Audit context shared by the forwarder and the failure path below.
    // `auth_mechanism` is derived from the proxy-side injection mode,
    // `injection_mode` from the upstream-side one.
    let event_ctx = audit::EventContext {
        route_id: service,
        auth_mechanism: cred.map(|c| match c.proxy_inject_mode {
            InjectMode::Header | InjectMode::BasicAuth => {
                nono::undo::NetworkAuditAuthMechanism::PhantomHeader
            }
            InjectMode::UrlPath => nono::undo::NetworkAuditAuthMechanism::PhantomPath,
            InjectMode::QueryParam => nono::undo::NetworkAuditAuthMechanism::PhantomQuery,
        }),
        auth_outcome: cred.map(|_| nono::undo::NetworkAuditAuthOutcome::Succeeded),
        managed_credential_active: Some(cred.is_some() || oauth2_route.is_some()),
        injection_mode: cred.map(|c| match c.inject_mode {
            InjectMode::Header => nono::undo::NetworkAuditInjectionMode::Header,
            InjectMode::UrlPath => nono::undo::NetworkAuditInjectionMode::UrlPath,
            InjectMode::QueryParam => nono::undo::NetworkAuditInjectionMode::QueryParam,
            InjectMode::BasicAuth => nono::undo::NetworkAuditInjectionMode::BasicAuth,
        }),
        denial_category: None,
    };
    // The audit context carries the path as the client sent it (`req.path`),
    // not the transformed path.
    let audit_ctx = AuditCtx {
        log: ctx.audit_log,
        mode: audit::ProxyMode::ConnectIntercept,
        event_ctx: event_ctx.clone(),
        target: ctx.host,
        method: &req.method,
        path: &req.path,
    };
    if let Err(e) = forward::forward_request(
        tls_stream,
        request.as_bytes(),
        &body,
        upstream_spec,
        audit_ctx,
    )
    .await
    {
        // Any forwarding error is audited as `UpstreamConnectFailed`; the 502
        // is best-effort, so a failure to write it is deliberately ignored.
        warn!("tls_intercept: upstream forwarding failed: {}", e);
        audit::log_denied(
            ctx.audit_log,
            audit::ProxyMode::ConnectIntercept,
            &audit::EventContext {
                denial_category: Some(
                    nono::undo::NetworkAuditDenialCategory::UpstreamConnectFailed,
                ),
                ..event_ctx
            },
            ctx.host,
            ctx.port,
            &e.to_string(),
        );
        let _ = reverse::send_error_generic(tls_stream, 502, "Bad Gateway").await;
    }
    Ok(())
}
539
540fn parse_request_line(line: &str) -> Result<(String, String, String)> {
542 let parts: Vec<&str> = line.split_whitespace().collect();
543 if parts.len() < 3 {
544 return Err(ProxyError::HttpParse(format!(
545 "malformed inner request line: {}",
546 line
547 )));
548 }
549 Ok((
550 parts[0].to_string(),
551 parts[1].to_string(),
552 parts[2].to_string(),
553 ))
554}
555
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    // A well-formed request line yields its three components in order.
    #[test]
    fn parse_request_line_extracts_components() {
        let parsed = parse_request_line("GET /v1/models HTTP/1.1").unwrap();
        assert_eq!(
            parsed,
            (
                "GET".to_string(),
                "/v1/models".to_string(),
                "HTTP/1.1".to_string()
            )
        );
    }

    // Lines with fewer than three tokens are rejected.
    #[test]
    fn parse_request_line_rejects_malformed() {
        for bad_line in ["malformed", ""] {
            assert!(parse_request_line(bad_line).is_err());
        }
    }

    // A configured upstream proxy always wins over direct connection.
    #[test]
    fn upstream_strategy_selects_external_proxy_when_configured() {
        let some_proxy = Some(InterceptUpstreamProxy {
            proxy_addr: "proxy.corp:80",
            proxy_auth_header: None,
        });
        let UpstreamStrategy::ExternalProxy {
            proxy_addr,
            proxy_auth_header,
        } = select_upstream_strategy(&some_proxy, &[])
        else {
            panic!("expected ExternalProxy strategy, got Direct");
        };
        assert_eq!(proxy_addr, "proxy.corp:80");
        assert!(proxy_auth_header.is_none());
    }

    // Without an upstream proxy the resolved addresses are passed through.
    #[test]
    fn upstream_strategy_selects_direct_when_no_proxy() {
        let addrs: Vec<std::net::SocketAddr> = Vec::new();
        let UpstreamStrategy::Direct { resolved_addrs } = select_upstream_strategy(&None, &addrs)
        else {
            panic!("expected Direct strategy, got ExternalProxy");
        };
        assert!(resolved_addrs.is_empty());
    }

    // The proxy's auth header is carried into the strategy unchanged.
    #[test]
    fn upstream_strategy_external_proxy_with_auth_header() {
        let some_proxy = Some(InterceptUpstreamProxy {
            proxy_addr: "proxy.corp:3128",
            proxy_auth_header: Some("Basic dXNlcjpwYXNz"),
        });
        let UpstreamStrategy::ExternalProxy {
            proxy_addr,
            proxy_auth_header,
        } = select_upstream_strategy(&some_proxy, &[])
        else {
            panic!("expected ExternalProxy strategy, got Direct");
        };
        assert_eq!(proxy_addr, "proxy.corp:3128");
        assert_eq!(proxy_auth_header, Some("Basic dXNlcjpwYXNz"));
    }
}