Skip to main content

nono_proxy/tls_intercept/
handle.rs

1//! CONNECT-intercept entry point.
2//!
3//! Owns:
4//!
5//! 1. Sending `200 Connection Established` to the agent.
6//! 2. Driving the inner TLS handshake using the cert resolver from
7//!    [`super::CertCache`].
8//! 3. Reading one HTTP/1.1 request inside the TLS stream.
9//! 4. Endpoint-rule + phantom-token enforcement.
10//! 5. Calling [`crate::forward::forward_request`] with the route's L7
11//!    context so the same upstream pipeline is used as the reverse proxy.
12//!
13//! Every failure mode is hard-fail — a route that asked for L7 visibility
14//! never silently degrades to a transparent tunnel.
15
16use crate::audit;
17use crate::config::InjectMode;
18use crate::credential::CredentialStore;
19use crate::error::{ProxyError, Result};
20use crate::forward::{self, AuditCtx, UpstreamScheme, UpstreamSpec, UpstreamStrategy};
21use crate::reverse;
22use crate::route::RouteStore;
23use crate::tls_intercept::acceptor;
24use crate::tls_intercept::cert_cache::CertCache;
25use crate::{filter::ProxyFilter, token};
26use std::sync::Arc;
27use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
28use tokio::net::TcpStream;
29use tokio_rustls::TlsAcceptor;
30use tracing::{debug, warn};
31use zeroize::Zeroizing;
32
33/// Header byte cap matching the outer proxy's `MAX_HEADER_SIZE` to keep the
34/// memory ceiling consistent.
35const MAX_HEADER_SIZE: usize = 64 * 1024;
36
37/// Per-connection context passed to [`handle_intercept_connect`].
38pub struct InterceptCtx<'a> {
39    pub route_id: Option<&'a str>,
40    pub host: &'a str,
41    pub port: u16,
42    pub route_store: &'a RouteStore,
43    pub credential_store: &'a CredentialStore,
44    pub session_token: &'a Zeroizing<String>,
45    pub cert_cache: Arc<CertCache>,
46    pub tls_connector: &'a tokio_rustls::TlsConnector,
47    pub filter: &'a ProxyFilter,
48    pub audit_log: Option<&'a audit::SharedAuditLog>,
49}
50
51/// Handle a CONNECT request that matched a route requiring L7 visibility.
52///
53/// Caller responsibilities (already enforced in `server.rs`):
54/// * Validate strict OUTER `Proxy-Authorization` against the session token.
55/// * Confirm `route_store.has_intercept_route(host, port)`.
56pub async fn handle_intercept_connect(stream: &mut TcpStream, ctx: InterceptCtx<'_>) -> Result<()> {
57    debug!(
58        "tls_intercept: accepting CONNECT to {}:{} for L7 inspection",
59        ctx.host, ctx.port
60    );
61
62    // 200 to the agent before the inner TLS handshake.
63    let response = b"HTTP/1.1 200 Connection Established\r\n\r\n";
64    stream.write_all(response).await?;
65    stream.flush().await?;
66
67    let server_config = acceptor::build_server_config(Arc::clone(&ctx.cert_cache))?;
68    let tls_acceptor = TlsAcceptor::from(server_config);
69
70    let mut tls_stream = match tls_acceptor.accept(&mut *stream).await {
71        Ok(s) => s,
72        Err(e) => {
73            // Hard fail: never silently degrade. Agent sees a TLS error,
74            // we record the failure with a sanitized rustls Display string.
75            let reason = format!("tls handshake failed: {}", e);
76            warn!(
77                "tls_intercept: handshake failed for {}:{} — {}. \
78                 Agent likely pins certs or carries a hard-coded trust list. \
79                 Remove endpoint_rules / credential_key from the route to fall \
80                 back to a transparent CONNECT tunnel.",
81                ctx.host, ctx.port, e
82            );
83            audit::log_denied(
84                ctx.audit_log,
85                audit::ProxyMode::ConnectIntercept,
86                &audit::EventContext {
87                    route_id: ctx.route_id,
88                    auth_mechanism: Some(nono::undo::NetworkAuditAuthMechanism::ProxyAuthorization),
89                    auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Succeeded),
90                    denial_category: Some(
91                        nono::undo::NetworkAuditDenialCategory::InterceptHandshakeFailed,
92                    ),
93                    ..audit::EventContext::default()
94                },
95                ctx.host,
96                ctx.port,
97                &reason,
98            );
99            return Ok(());
100        }
101    };
102
103    // Acceptance event: the inner TLS handshake completed. Per-request L7
104    // events are emitted by `forward_request` once we hand off below.
105    audit::log_allowed(
106        ctx.audit_log,
107        audit::ProxyMode::ConnectIntercept,
108        &audit::EventContext {
109            route_id: ctx.route_id,
110            auth_mechanism: Some(nono::undo::NetworkAuditAuthMechanism::ProxyAuthorization),
111            auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Succeeded),
112            ..audit::EventContext::default()
113        },
114        ctx.host,
115        ctx.port,
116        "CONNECT",
117    );
118
119    if let Err(e) = forward_inner_request(&mut tls_stream, &ctx).await {
120        debug!(
121            "tls_intercept: inner-request handling failed for {}:{}: {}",
122            ctx.host, ctx.port, e
123        );
124    }
125    Ok(())
126}
127
128/// Read one HTTP/1.1 request from the inner TLS stream, enforce endpoint
129/// rules + phantom token, and forward via the shared L7 pipeline.
130async fn forward_inner_request<S>(tls_stream: &mut S, ctx: &InterceptCtx<'_>) -> Result<()>
131where
132    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
133{
134    // --- Parse the inner request line + headers ---
135    let mut buf_reader = BufReader::new(&mut *tls_stream);
136    let mut first_line = String::new();
137    buf_reader.read_line(&mut first_line).await?;
138    if first_line.is_empty() {
139        return Ok(());
140    }
141
142    let mut header_bytes = Vec::new();
143    loop {
144        let mut line = String::new();
145        let n = buf_reader.read_line(&mut line).await?;
146        if n == 0 || line.trim().is_empty() {
147            break;
148        }
149        header_bytes.extend_from_slice(line.as_bytes());
150        if header_bytes.len() > MAX_HEADER_SIZE {
151            // Mirror the outer proxy's behaviour. We have to write into the
152            // BufReader's inner stream — release it first.
153            let buffered = buf_reader.buffer().to_vec();
154            drop(buf_reader);
155            tls_stream
156                .write_all(b"HTTP/1.1 431 Request Header Fields Too Large\r\n\r\n")
157                .await?;
158            let _ = buffered;
159            return Ok(());
160        }
161    }
162    let buffered = buf_reader.buffer().to_vec();
163    drop(buf_reader);
164
165    let first_line = first_line.trim_end();
166    let (method, path, version) = parse_request_line(first_line)?;
167    debug!("tls_intercept: inner request {} {}", method, path);
168
169    // --- Look up the route by upstream host:port ---
170    let host_port = format!("{}:{}", ctx.host.to_lowercase(), ctx.port);
171    let (service, route) = match ctx.route_store.lookup_by_upstream(&host_port) {
172        Some(hit) => hit,
173        None => {
174            // Shouldn't reach here — the dispatch path already confirmed
175            // a route match before invoking us. Treat as 502 defensively.
176            warn!(
177                "tls_intercept: no route for {} after intercept handshake",
178                host_port
179            );
180            reverse::send_error_generic(tls_stream, 502, "Bad Gateway").await?;
181            return Ok(());
182        }
183    };
184    let cred = ctx.credential_store.get(service);
185    let oauth2_route = ctx.credential_store.get_oauth2(service);
186
187    if route.missing_managed_credential(cred.is_some(), oauth2_route.is_some()) {
188        let reason = format!(
189            "managed credential unavailable for route '{}': intercepted request requires proxy-supplied auth",
190            service
191        );
192        warn!("tls_intercept: {}", reason);
193        audit::log_denied(
194            ctx.audit_log,
195            audit::ProxyMode::ConnectIntercept,
196            &audit::EventContext {
197                route_id: ctx.route_id,
198                auth_mechanism: route.managed_auth_mechanism.clone(),
199                auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Failed),
200                managed_credential_active: Some(false),
201                injection_mode: route.managed_injection_mode.clone(),
202                denial_category: Some(
203                    nono::undo::NetworkAuditDenialCategory::ManagedCredentialUnavailable,
204                ),
205            },
206            ctx.host,
207            ctx.port,
208            &reason,
209        );
210        reverse::send_error_generic(tls_stream, 503, "Service Unavailable").await?;
211        return Ok(());
212    }
213
214    // --- L7 endpoint filtering ---
215    if !route.endpoint_rules.is_allowed(&method, &path) {
216        let reason = format!("endpoint denied: {} {}", method, path);
217        warn!("tls_intercept: {}", reason);
218        audit::log_denied(
219            ctx.audit_log,
220            audit::ProxyMode::ConnectIntercept,
221            &audit::EventContext {
222                route_id: Some(service),
223                managed_credential_active: Some(cred.is_some() || oauth2_route.is_some()),
224                injection_mode: cred.map(|c| match c.inject_mode {
225                    InjectMode::Header => nono::undo::NetworkAuditInjectionMode::Header,
226                    InjectMode::UrlPath => nono::undo::NetworkAuditInjectionMode::UrlPath,
227                    InjectMode::QueryParam => nono::undo::NetworkAuditInjectionMode::QueryParam,
228                    InjectMode::BasicAuth => nono::undo::NetworkAuditInjectionMode::BasicAuth,
229                }),
230                denial_category: Some(nono::undo::NetworkAuditDenialCategory::EndpointPolicy),
231                ..audit::EventContext::default()
232            },
233            ctx.host,
234            ctx.port,
235            &reason,
236        );
237        reverse::send_error_generic(tls_stream, 403, "Forbidden").await?;
238        return Ok(());
239    }
240
241    // --- Phantom-token validation for credentialed routes ---
242    if let Some(cred) = cred {
243        if let Err(e) = reverse::validate_phantom_token_for_mode(
244            &cred.proxy_inject_mode,
245            &header_bytes,
246            &path,
247            &cred.proxy_header_name,
248            cred.proxy_path_pattern.as_deref(),
249            cred.proxy_query_param_name.as_deref(),
250            ctx.session_token,
251        ) {
252            audit::log_denied(
253                ctx.audit_log,
254                audit::ProxyMode::ConnectIntercept,
255                &audit::EventContext {
256                    route_id: Some(service),
257                    auth_mechanism: Some(match cred.proxy_inject_mode {
258                        InjectMode::Header | InjectMode::BasicAuth => {
259                            nono::undo::NetworkAuditAuthMechanism::PhantomHeader
260                        }
261                        InjectMode::UrlPath => nono::undo::NetworkAuditAuthMechanism::PhantomPath,
262                        InjectMode::QueryParam => {
263                            nono::undo::NetworkAuditAuthMechanism::PhantomQuery
264                        }
265                    }),
266                    auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Failed),
267                    managed_credential_active: Some(true),
268                    injection_mode: Some(match cred.inject_mode {
269                        InjectMode::Header => nono::undo::NetworkAuditInjectionMode::Header,
270                        InjectMode::UrlPath => nono::undo::NetworkAuditInjectionMode::UrlPath,
271                        InjectMode::QueryParam => nono::undo::NetworkAuditInjectionMode::QueryParam,
272                        InjectMode::BasicAuth => nono::undo::NetworkAuditInjectionMode::BasicAuth,
273                    }),
274                    denial_category: Some(
275                        nono::undo::NetworkAuditDenialCategory::AuthenticationFailed,
276                    ),
277                },
278                ctx.host,
279                ctx.port,
280                &e.to_string(),
281            );
282            reverse::send_error_generic(tls_stream, 401, "Unauthorized").await?;
283            return Ok(());
284        }
285    }
286    // L7-only routes have no inner-token check by design — the outer CONNECT
287    // Proxy-Authorization already proved the caller holds the session token.
288
289    // --- Path / credential transformation ---
290    let transformed_path = if let Some(cred) = cred {
291        let cleaned = reverse::strip_proxy_artifacts(
292            &path,
293            &cred.proxy_inject_mode,
294            &cred.inject_mode,
295            cred.proxy_path_pattern.as_deref(),
296            cred.proxy_query_param_name.as_deref(),
297        );
298        reverse::transform_path_for_mode(
299            &cred.inject_mode,
300            &cleaned,
301            cred.path_pattern.as_deref(),
302            cred.path_replacement.as_deref(),
303            cred.query_param_name.as_deref(),
304            &cred.raw_credential,
305        )?
306    } else {
307        path.clone()
308    };
309
310    // --- Resolve upstream IPs (DNS-rebind-safe via filter) ---
311    let check = ctx.filter.check_host(ctx.host, ctx.port).await?;
312    if !check.result.is_allowed() {
313        let reason = check.result.reason();
314        warn!("tls_intercept: upstream host denied by filter: {}", reason);
315        audit::log_denied(
316            ctx.audit_log,
317            audit::ProxyMode::ConnectIntercept,
318            &audit::EventContext {
319                route_id: Some(service),
320                managed_credential_active: Some(cred.is_some() || oauth2_route.is_some()),
321                injection_mode: cred.map(|c| match c.inject_mode {
322                    InjectMode::Header => nono::undo::NetworkAuditInjectionMode::Header,
323                    InjectMode::UrlPath => nono::undo::NetworkAuditInjectionMode::UrlPath,
324                    InjectMode::QueryParam => nono::undo::NetworkAuditInjectionMode::QueryParam,
325                    InjectMode::BasicAuth => nono::undo::NetworkAuditInjectionMode::BasicAuth,
326                }),
327                denial_category: Some(nono::undo::NetworkAuditDenialCategory::HostDenied),
328                ..audit::EventContext::default()
329            },
330            ctx.host,
331            ctx.port,
332            &reason,
333        );
334        reverse::send_error_generic(tls_stream, 403, "Forbidden").await?;
335        return Ok(());
336    }
337
338    // --- Read body (Content-Length only; chunked is rare in API requests
339    // and matches the existing reverse-proxy contract). ---
340    let strip_header = cred.map(|c| c.proxy_header_name.as_str()).unwrap_or("");
341    let filtered_headers = reverse::filter_headers(&header_bytes, strip_header);
342    let content_length = reverse::extract_content_length(&header_bytes);
343    let body = match reverse::read_request_body(tls_stream, content_length, &buffered).await? {
344        Some(b) => b,
345        None => return Ok(()),
346    };
347
348    // --- Build upstream request bytes ---
349    let upstream_authority = reverse::format_host_header(UpstreamScheme::Https, ctx.host, ctx.port);
350    let mut request = Zeroizing::new(format!(
351        "{} {} {}\r\nHost: {}\r\n",
352        method, transformed_path, version, upstream_authority
353    ));
354    if let Some(cred) = cred {
355        reverse::inject_credential_for_mode(cred, &mut request);
356    }
357    let auth_header_lower = cred.map(|c| c.header_name.to_lowercase());
358    for (name, value) in &filtered_headers {
359        if let (Some(cred), Some(hdr)) = (cred, auth_header_lower.as_ref()) {
360            if matches!(cred.inject_mode, InjectMode::Header | InjectMode::BasicAuth)
361                && name.to_lowercase() == *hdr
362            {
363                continue;
364            }
365        }
366        request.push_str(&format!("{}: {}\r\n", name, value));
367    }
368    request.push_str("Connection: close\r\n");
369    if !body.is_empty() {
370        request.push_str(&format!("Content-Length: {}\r\n", body.len()));
371    }
372    request.push_str("\r\n");
373
374    // --- Forward via shared pipeline ---
375    let connector = route.tls_connector.as_ref().unwrap_or(ctx.tls_connector);
376    let upstream_spec = UpstreamSpec {
377        scheme: UpstreamScheme::Https,
378        host: ctx.host,
379        port: ctx.port,
380        strategy: UpstreamStrategy::Direct {
381            resolved_addrs: &check.resolved_addrs,
382        },
383        tls_connector: connector,
384    };
385    let audit_ctx = AuditCtx {
386        log: ctx.audit_log,
387        mode: audit::ProxyMode::ConnectIntercept,
388        event_ctx: audit::EventContext {
389            route_id: Some(service),
390            auth_mechanism: cred.map(|c| match c.proxy_inject_mode {
391                InjectMode::Header | InjectMode::BasicAuth => {
392                    nono::undo::NetworkAuditAuthMechanism::PhantomHeader
393                }
394                InjectMode::UrlPath => nono::undo::NetworkAuditAuthMechanism::PhantomPath,
395                InjectMode::QueryParam => nono::undo::NetworkAuditAuthMechanism::PhantomQuery,
396            }),
397            auth_outcome: cred.map(|_| nono::undo::NetworkAuditAuthOutcome::Succeeded),
398            managed_credential_active: Some(cred.is_some() || oauth2_route.is_some()),
399            injection_mode: cred.map(|c| match c.inject_mode {
400                InjectMode::Header => nono::undo::NetworkAuditInjectionMode::Header,
401                InjectMode::UrlPath => nono::undo::NetworkAuditInjectionMode::UrlPath,
402                InjectMode::QueryParam => nono::undo::NetworkAuditInjectionMode::QueryParam,
403                InjectMode::BasicAuth => nono::undo::NetworkAuditInjectionMode::BasicAuth,
404            }),
405            denial_category: None,
406        },
407        target: ctx.host,
408        method: &method,
409        path: &path,
410    };
411    if let Err(e) = forward::forward_request(
412        tls_stream,
413        request.as_bytes(),
414        &body,
415        upstream_spec,
416        audit_ctx,
417    )
418    .await
419    {
420        warn!("tls_intercept: upstream forwarding failed: {}", e);
421        audit::log_denied(
422            ctx.audit_log,
423            audit::ProxyMode::ConnectIntercept,
424            &audit::EventContext {
425                route_id: Some(service),
426                auth_mechanism: cred.map(|c| match c.proxy_inject_mode {
427                    InjectMode::Header | InjectMode::BasicAuth => {
428                        nono::undo::NetworkAuditAuthMechanism::PhantomHeader
429                    }
430                    InjectMode::UrlPath => nono::undo::NetworkAuditAuthMechanism::PhantomPath,
431                    InjectMode::QueryParam => nono::undo::NetworkAuditAuthMechanism::PhantomQuery,
432                }),
433                auth_outcome: cred.map(|_| nono::undo::NetworkAuditAuthOutcome::Succeeded),
434                managed_credential_active: Some(cred.is_some() || oauth2_route.is_some()),
435                injection_mode: cred.map(|c| match c.inject_mode {
436                    InjectMode::Header => nono::undo::NetworkAuditInjectionMode::Header,
437                    InjectMode::UrlPath => nono::undo::NetworkAuditInjectionMode::UrlPath,
438                    InjectMode::QueryParam => nono::undo::NetworkAuditInjectionMode::QueryParam,
439                    InjectMode::BasicAuth => nono::undo::NetworkAuditInjectionMode::BasicAuth,
440                }),
441                denial_category: Some(
442                    nono::undo::NetworkAuditDenialCategory::UpstreamConnectFailed,
443                ),
444            },
445            ctx.host,
446            ctx.port,
447            &e.to_string(),
448        );
449        let _ = reverse::send_error_generic(tls_stream, 502, "Bad Gateway").await;
450    }
451    Ok(())
452}
453
454/// Parse a request line into (method, path, version).
455fn parse_request_line(line: &str) -> Result<(String, String, String)> {
456    let parts: Vec<&str> = line.split_whitespace().collect();
457    if parts.len() < 3 {
458        return Err(ProxyError::HttpParse(format!(
459            "malformed inner request line: {}",
460            line
461        )));
462    }
463    Ok((
464        parts[0].to_string(),
465        parts[1].to_string(),
466        parts[2].to_string(),
467    ))
468}
469
470/// Trim unused-arg warnings: `token` is used implicitly via `validate_phantom_token_for_mode`.
471#[allow(dead_code)]
472fn _suppress_token_dependency() {
473    let _ = token::constant_time_eq;
474}
475
476#[cfg(test)]
477#[allow(clippy::unwrap_used)]
478mod tests {
479    use super::*;
480
481    #[test]
482    fn parse_request_line_extracts_components() {
483        let (m, p, v) = parse_request_line("GET /v1/models HTTP/1.1").unwrap();
484        assert_eq!(m, "GET");
485        assert_eq!(p, "/v1/models");
486        assert_eq!(v, "HTTP/1.1");
487    }
488
489    #[test]
490    fn parse_request_line_rejects_malformed() {
491        assert!(parse_request_line("malformed").is_err());
492        assert!(parse_request_line("").is_err());
493    }
494}