1use crate::audit;
15use crate::config::InjectMode;
16use crate::credential::CredentialStore;
17use crate::error::{ProxyError, Result};
18use crate::filter::ProxyFilter;
19use crate::forward::{self, AuditCtx, UpstreamScheme, UpstreamSpec, UpstreamStrategy};
20use crate::reverse;
21use crate::route::RouteStore;
22use crate::tls_intercept::acceptor;
23use crate::tls_intercept::cert_cache::CertCache;
24use std::sync::Arc;
25use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
26use tokio::net::TcpStream;
27use tokio_rustls::TlsAcceptor;
28use tracing::{debug, warn};
29use zeroize::Zeroizing;
30
/// Maximum number of bytes of inner-request header lines that are accepted.
///
/// When the accumulated header lines exceed this size the request is answered
/// with `431 Request Header Fields Too Large` instead of being forwarded.
const MAX_HEADER_SIZE: usize = 64 * 1024;
34
/// Borrowed state needed to TLS-intercept one `CONNECT` tunnel.
///
/// Passed by value to [`handle_intercept_connect`], which terminates TLS
/// towards the client and forwards the decrypted inner request upstream.
pub struct InterceptCtx<'a> {
    /// Route identifier recorded on the CONNECT-level audit events, if any.
    pub route_id: Option<&'a str>,
    /// Target host of the `CONNECT`; used for logging, route lookup, host
    /// filtering, the rewritten `Host` header and the upstream connection.
    pub host: &'a str,
    /// Target port of the `CONNECT`; used alongside `host` everywhere.
    pub port: u16,
    /// Route table, queried with the lower-cased `host:port` to find the
    /// candidate routes for the inner request.
    pub route_store: &'a RouteStore,
    /// Credential lookup, keyed by the name of the selected route.
    pub credential_store: &'a CredentialStore,
    /// Session token. Not read by the intercept code visible in this file.
    /// NOTE(review): confirm whether it is still needed here.
    pub session_token: &'a Zeroizing<String>,
    /// Certificate cache handed to `acceptor::build_server_config` to build
    /// the TLS server configuration presented to the client.
    pub cert_cache: Arc<CertCache>,
    /// Default upstream TLS connector, used when the selected route does not
    /// carry a connector of its own.
    pub tls_connector: &'a tokio_rustls::TlsConnector,
    /// Host filter consulted via `check_host` before connecting upstream.
    pub filter: &'a ProxyFilter,
    /// Optional audit sink passed to every audit call made during interception.
    pub audit_log: Option<&'a audit::SharedAuditLog>,
}
48
49pub async fn handle_intercept_connect(stream: &mut TcpStream, ctx: InterceptCtx<'_>) -> Result<()> {
55 debug!(
56 "tls_intercept: accepting CONNECT to {}:{} for L7 inspection",
57 ctx.host, ctx.port
58 );
59
60 let response = b"HTTP/1.1 200 Connection Established\r\n\r\n";
62 stream.write_all(response).await?;
63 stream.flush().await?;
64
65 let server_config = acceptor::build_server_config(Arc::clone(&ctx.cert_cache))?;
66 let tls_acceptor = TlsAcceptor::from(server_config);
67
68 let mut tls_stream = match tls_acceptor.accept(&mut *stream).await {
69 Ok(s) => s,
70 Err(e) => {
71 let reason = format!("tls handshake failed: {}", e);
74 warn!(
75 "tls_intercept: handshake failed for {}:{} — {}. \
76 Agent likely pins certs or carries a hard-coded trust list. \
77 Remove endpoint_rules / credential_key from the route to fall \
78 back to a transparent CONNECT tunnel.",
79 ctx.host, ctx.port, e
80 );
81 audit::log_denied(
82 ctx.audit_log,
83 audit::ProxyMode::ConnectIntercept,
84 &audit::EventContext {
85 route_id: ctx.route_id,
86 auth_mechanism: Some(nono::undo::NetworkAuditAuthMechanism::ProxyAuthorization),
87 auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Succeeded),
88 denial_category: Some(
89 nono::undo::NetworkAuditDenialCategory::InterceptHandshakeFailed,
90 ),
91 ..audit::EventContext::default()
92 },
93 ctx.host,
94 ctx.port,
95 &reason,
96 );
97 return Ok(());
98 }
99 };
100
101 audit::log_allowed(
104 ctx.audit_log,
105 audit::ProxyMode::ConnectIntercept,
106 &audit::EventContext {
107 route_id: ctx.route_id,
108 auth_mechanism: Some(nono::undo::NetworkAuditAuthMechanism::ProxyAuthorization),
109 auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Succeeded),
110 ..audit::EventContext::default()
111 },
112 ctx.host,
113 ctx.port,
114 "CONNECT",
115 );
116
117 if let Err(e) = forward_inner_request(&mut tls_stream, &ctx).await {
118 debug!(
119 "tls_intercept: inner-request handling failed for {}:{}: {}",
120 ctx.host, ctx.port, e
121 );
122 }
123 Ok(())
124}
125
126async fn forward_inner_request<S>(tls_stream: &mut S, ctx: &InterceptCtx<'_>) -> Result<()>
129where
130 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
131{
132 let mut buf_reader = BufReader::new(&mut *tls_stream);
134 let mut first_line = String::new();
135 buf_reader.read_line(&mut first_line).await?;
136 if first_line.is_empty() {
137 return Ok(());
138 }
139
140 let mut header_bytes = Vec::new();
141 loop {
142 let mut line = String::new();
143 let n = buf_reader.read_line(&mut line).await?;
144 if n == 0 || line.trim().is_empty() {
145 break;
146 }
147 header_bytes.extend_from_slice(line.as_bytes());
148 if header_bytes.len() > MAX_HEADER_SIZE {
149 let buffered = buf_reader.buffer().to_vec();
152 drop(buf_reader);
153 tls_stream
154 .write_all(b"HTTP/1.1 431 Request Header Fields Too Large\r\n\r\n")
155 .await?;
156 let _ = buffered;
157 return Ok(());
158 }
159 }
160 let buffered = buf_reader.buffer().to_vec();
161 drop(buf_reader);
162
163 let first_line = first_line.trim_end();
164 let (method, path, version) = parse_request_line(first_line)?;
165 debug!("tls_intercept: inner request {} {}", method, path);
166
167 let host_port = format!("{}:{}", ctx.host.to_lowercase(), ctx.port);
169 let candidates = ctx.route_store.lookup_all_by_upstream(&host_port);
170 if candidates.is_empty() {
171 warn!(
172 "tls_intercept: no route for {} after intercept handshake",
173 host_port
174 );
175 reverse::send_error_generic(tls_stream, 502, "Bad Gateway").await?;
176 return Ok(());
177 }
178
179 let mut matches: Vec<(&str, &crate::route::LoadedRoute)> = Vec::new();
180 let mut catch_all: Option<(&str, &crate::route::LoadedRoute)> = None;
181 let mut has_endpoint_only_route = false;
182 let mut endpoint_authorized = false;
183 for (prefix, route) in &candidates {
184 if route.endpoint_rules.is_empty() {
185 if catch_all.is_none() {
186 catch_all = Some((prefix, route));
187 }
188 } else if route.endpoint_rules.is_allowed(&method, &path) {
189 matches.push((prefix, route));
190 if !route.requires_managed_credential {
191 endpoint_authorized = true;
192 }
193 } else if !route.requires_managed_credential {
194 has_endpoint_only_route = true;
195 }
196 }
197
198 if has_endpoint_only_route && !endpoint_authorized {
203 let reason = format!(
204 "endpoint rules denied {} {}: no rule matched on {}:{}",
205 method, path, ctx.host, ctx.port
206 );
207 warn!("tls_intercept: {}", reason);
208 audit::log_denied(
209 ctx.audit_log,
210 audit::ProxyMode::ConnectIntercept,
211 &audit::EventContext {
212 denial_category: Some(nono::undo::NetworkAuditDenialCategory::EndpointPolicy),
213 ..audit::EventContext::default()
214 },
215 ctx.host,
216 ctx.port,
217 &reason,
218 );
219 reverse::send_error_generic(tls_stream, 403, "Forbidden").await?;
220 return Ok(());
221 }
222
223 let credential_matches: Vec<_> = matches
228 .iter()
229 .filter(|(_, route)| route.requires_managed_credential)
230 .collect();
231 if credential_matches.len() > 1 {
232 let names: Vec<_> = credential_matches.iter().map(|(p, _)| *p).collect();
233 let reason = format!(
234 "ambiguous route: {} {} matched {} credential routes: {:?}. \
235 Narrow endpoint_rules so each request matches exactly one route.",
236 method,
237 path,
238 credential_matches.len(),
239 names
240 );
241 warn!("tls_intercept: {}", reason);
242 audit::log_denied(
243 ctx.audit_log,
244 audit::ProxyMode::ConnectIntercept,
245 &audit::EventContext {
246 denial_category: Some(nono::undo::NetworkAuditDenialCategory::EndpointPolicy),
247 ..audit::EventContext::default()
248 },
249 ctx.host,
250 ctx.port,
251 &reason,
252 );
253 reverse::send_error_generic(tls_stream, 403, "Forbidden").await?;
254 return Ok(());
255 }
256
257 let selected = matches
259 .iter()
260 .find(|(_, route)| route.requires_managed_credential)
261 .or(matches.first())
262 .copied()
263 .or(catch_all);
264 let service: Option<&str> = selected.map(|(s, _)| s);
265 let route: Option<&crate::route::LoadedRoute> = selected.map(|(_, r)| r);
266 match service {
267 Some(svc) => debug!(
268 "tls_intercept: selected route '{}' for {} {}",
269 svc, method, path
270 ),
271 None => debug!(
272 "tls_intercept: no endpoint_rules matched {} {}, forwarding without credentials",
273 method, path
274 ),
275 }
276
277 let cred = service.and_then(|s| ctx.credential_store.get(s));
278 let oauth2_route = service.and_then(|s| ctx.credential_store.get_oauth2(s));
279
280 if let Some(rt) = route
281 && rt.missing_managed_credential(cred.is_some(), oauth2_route.is_some())
282 {
283 let svc = service.unwrap_or("unknown");
284 let reason = format!(
285 "managed credential unavailable for route '{}': intercepted request requires proxy-supplied auth",
286 svc
287 );
288 warn!("tls_intercept: {}", reason);
289 audit::log_denied(
290 ctx.audit_log,
291 audit::ProxyMode::ConnectIntercept,
292 &audit::EventContext {
293 route_id: service,
294 auth_mechanism: rt.managed_auth_mechanism.clone(),
295 auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Failed),
296 managed_credential_active: Some(false),
297 injection_mode: rt.managed_injection_mode.clone(),
298 denial_category: Some(
299 nono::undo::NetworkAuditDenialCategory::ManagedCredentialUnavailable,
300 ),
301 },
302 ctx.host,
303 ctx.port,
304 &reason,
305 );
306 reverse::send_error_generic(tls_stream, 503, "Service Unavailable").await?;
307 return Ok(());
308 }
309
310 let transformed_path = if let Some(cred) = cred {
312 let cleaned = reverse::strip_proxy_artifacts(
313 &path,
314 &cred.proxy_inject_mode,
315 &cred.inject_mode,
316 cred.proxy_path_pattern.as_deref(),
317 cred.proxy_query_param_name.as_deref(),
318 );
319 reverse::transform_path_for_mode(
320 &cred.inject_mode,
321 &cleaned,
322 cred.path_pattern.as_deref(),
323 cred.path_replacement.as_deref(),
324 cred.query_param_name.as_deref(),
325 &cred.raw_credential,
326 )?
327 } else {
328 path.clone()
329 };
330
331 let check = ctx.filter.check_host(ctx.host, ctx.port).await?;
333 if !check.result.is_allowed() {
334 let reason = check.result.reason();
335 warn!("tls_intercept: upstream host denied by filter: {}", reason);
336 audit::log_denied(
337 ctx.audit_log,
338 audit::ProxyMode::ConnectIntercept,
339 &audit::EventContext {
340 route_id: service,
341 managed_credential_active: Some(cred.is_some() || oauth2_route.is_some()),
342 injection_mode: cred.map(|c| match c.inject_mode {
343 InjectMode::Header => nono::undo::NetworkAuditInjectionMode::Header,
344 InjectMode::UrlPath => nono::undo::NetworkAuditInjectionMode::UrlPath,
345 InjectMode::QueryParam => nono::undo::NetworkAuditInjectionMode::QueryParam,
346 InjectMode::BasicAuth => nono::undo::NetworkAuditInjectionMode::BasicAuth,
347 }),
348 denial_category: Some(nono::undo::NetworkAuditDenialCategory::HostDenied),
349 ..audit::EventContext::default()
350 },
351 ctx.host,
352 ctx.port,
353 &reason,
354 );
355 reverse::send_error_generic(tls_stream, 403, "Forbidden").await?;
356 return Ok(());
357 }
358
359 let strip_header = cred.map(|c| c.proxy_header_name.as_str()).unwrap_or("");
362 let filtered_headers = reverse::filter_headers(&header_bytes, strip_header);
363 let content_length = reverse::extract_content_length(&header_bytes);
364 let body = match reverse::read_request_body(tls_stream, content_length, &buffered).await? {
365 Some(b) => b,
366 None => return Ok(()),
367 };
368
369 let upstream_authority = reverse::format_host_header(UpstreamScheme::Https, ctx.host, ctx.port);
371 let mut request = Zeroizing::new(format!(
372 "{} {} {}\r\nHost: {}\r\n",
373 method, transformed_path, version, upstream_authority
374 ));
375 if let Some(cred) = cred {
376 reverse::inject_credential_for_mode(cred, &mut request);
377 }
378 let auth_header_lower = cred.map(|c| c.header_name.to_lowercase());
379 for (name, value) in &filtered_headers {
380 if let (Some(cred), Some(hdr)) = (cred, auth_header_lower.as_ref())
381 && matches!(cred.inject_mode, InjectMode::Header | InjectMode::BasicAuth)
382 && name.to_lowercase() == *hdr
383 {
384 continue;
385 }
386 request.push_str(&format!("{}: {}\r\n", name, value));
387 }
388 request.push_str("Connection: close\r\n");
389 if !body.is_empty() {
390 request.push_str(&format!("Content-Length: {}\r\n", body.len()));
391 }
392 request.push_str("\r\n");
393
394 let connector = route
396 .and_then(|r| r.tls_connector.as_ref())
397 .unwrap_or(ctx.tls_connector);
398 let upstream_spec = UpstreamSpec {
399 scheme: UpstreamScheme::Https,
400 host: ctx.host,
401 port: ctx.port,
402 strategy: UpstreamStrategy::Direct {
403 resolved_addrs: &check.resolved_addrs,
404 },
405 tls_connector: connector,
406 };
407 let audit_ctx = AuditCtx {
408 log: ctx.audit_log,
409 mode: audit::ProxyMode::ConnectIntercept,
410 event_ctx: audit::EventContext {
411 route_id: service,
412 auth_mechanism: cred.map(|c| match c.proxy_inject_mode {
413 InjectMode::Header | InjectMode::BasicAuth => {
414 nono::undo::NetworkAuditAuthMechanism::PhantomHeader
415 }
416 InjectMode::UrlPath => nono::undo::NetworkAuditAuthMechanism::PhantomPath,
417 InjectMode::QueryParam => nono::undo::NetworkAuditAuthMechanism::PhantomQuery,
418 }),
419 auth_outcome: cred.map(|_| nono::undo::NetworkAuditAuthOutcome::Succeeded),
420 managed_credential_active: Some(cred.is_some() || oauth2_route.is_some()),
421 injection_mode: cred.map(|c| match c.inject_mode {
422 InjectMode::Header => nono::undo::NetworkAuditInjectionMode::Header,
423 InjectMode::UrlPath => nono::undo::NetworkAuditInjectionMode::UrlPath,
424 InjectMode::QueryParam => nono::undo::NetworkAuditInjectionMode::QueryParam,
425 InjectMode::BasicAuth => nono::undo::NetworkAuditInjectionMode::BasicAuth,
426 }),
427 denial_category: None,
428 },
429 target: ctx.host,
430 method: &method,
431 path: &path,
432 };
433 if let Err(e) = forward::forward_request(
434 tls_stream,
435 request.as_bytes(),
436 &body,
437 upstream_spec,
438 audit_ctx,
439 )
440 .await
441 {
442 warn!("tls_intercept: upstream forwarding failed: {}", e);
443 audit::log_denied(
444 ctx.audit_log,
445 audit::ProxyMode::ConnectIntercept,
446 &audit::EventContext {
447 route_id: service,
448 auth_mechanism: cred.map(|c| match c.proxy_inject_mode {
449 InjectMode::Header | InjectMode::BasicAuth => {
450 nono::undo::NetworkAuditAuthMechanism::PhantomHeader
451 }
452 InjectMode::UrlPath => nono::undo::NetworkAuditAuthMechanism::PhantomPath,
453 InjectMode::QueryParam => nono::undo::NetworkAuditAuthMechanism::PhantomQuery,
454 }),
455 auth_outcome: cred.map(|_| nono::undo::NetworkAuditAuthOutcome::Succeeded),
456 managed_credential_active: Some(cred.is_some() || oauth2_route.is_some()),
457 injection_mode: cred.map(|c| match c.inject_mode {
458 InjectMode::Header => nono::undo::NetworkAuditInjectionMode::Header,
459 InjectMode::UrlPath => nono::undo::NetworkAuditInjectionMode::UrlPath,
460 InjectMode::QueryParam => nono::undo::NetworkAuditInjectionMode::QueryParam,
461 InjectMode::BasicAuth => nono::undo::NetworkAuditInjectionMode::BasicAuth,
462 }),
463 denial_category: Some(
464 nono::undo::NetworkAuditDenialCategory::UpstreamConnectFailed,
465 ),
466 },
467 ctx.host,
468 ctx.port,
469 &e.to_string(),
470 );
471 let _ = reverse::send_error_generic(tls_stream, 502, "Bad Gateway").await;
472 }
473 Ok(())
474}
475
476fn parse_request_line(line: &str) -> Result<(String, String, String)> {
478 let parts: Vec<&str> = line.split_whitespace().collect();
479 if parts.len() < 3 {
480 return Err(ProxyError::HttpParse(format!(
481 "malformed inner request line: {}",
482 line
483 )));
484 }
485 Ok((
486 parts[0].to_string(),
487 parts[1].to_string(),
488 parts[2].to_string(),
489 ))
490}
491
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A well-formed request line yields method, target and version in order.
    #[test]
    fn parse_request_line_extracts_components() {
        let parsed = parse_request_line("GET /v1/models HTTP/1.1").unwrap();
        let (method, target, version) = parsed;
        assert_eq!(method, "GET");
        assert_eq!(target, "/v1/models");
        assert_eq!(version, "HTTP/1.1");
    }

    /// Lines with fewer than three tokens are rejected.
    #[test]
    fn parse_request_line_rejects_malformed() {
        for bad in ["malformed", ""] {
            assert!(parse_request_line(bad).is_err());
        }
    }
}