Skip to main content

nono_proxy/
reverse.rs

1//! Reverse proxy handler (Mode 2 — Credential Injection).
2//!
3//! Routes requests by path prefix to upstream APIs, injecting credentials
4//! from the keystore. The agent uses `http://localhost:PORT/openai/v1/chat`
5//! and the proxy rewrites to `https://api.openai.com/v1/chat` with the
6//! real credential injected.
7//!
8//! Supports multiple injection modes:
9//! - `header`: Inject into HTTP header (e.g., `Authorization: Bearer ...`)
10//! - `url_path`: Replace pattern in URL path (e.g., Telegram `/bot{}/`)
11//! - `query_param`: Add/replace query parameter (e.g., `?api_key=...`)
12//! - `basic_auth`: HTTP Basic Authentication
13//!
14//! Streaming responses (SSE, MCP Streamable HTTP, A2A JSON-RPC) are
15//! forwarded without buffering.
16
17use crate::audit;
18use crate::config::InjectMode;
19use crate::credential::{CredentialStore, LoadedCredential};
20use crate::error::{ProxyError, Result};
21use crate::filter::ProxyFilter;
22use crate::forward::{self, AuditCtx, UpstreamScheme, UpstreamSpec, UpstreamStrategy};
23use crate::route::RouteStore;
24use crate::token;
25use std::net::SocketAddr;
26use tokio::io::AsyncReadExt;
27use tokio::io::AsyncWriteExt;
28use tokio::net::TcpStream;
29use tokio_rustls::TlsConnector;
30use tracing::{debug, warn};
31use zeroize::Zeroizing;
32
/// Maximum request body size (16 MiB). Prevents DoS from malicious Content-Length.
///
/// Enforced by `read_request_body`: a declared `Content-Length` above this
/// value is answered with `413 Payload Too Large` before any buffer is
/// allocated for the body.
const MAX_REQUEST_BODY: usize = 16 * 1024 * 1024;
35
36fn auth_mechanism_for_inject_mode(mode: &InjectMode) -> nono::undo::NetworkAuditAuthMechanism {
37    match mode {
38        InjectMode::Header | InjectMode::BasicAuth => {
39            nono::undo::NetworkAuditAuthMechanism::PhantomHeader
40        }
41        InjectMode::UrlPath => nono::undo::NetworkAuditAuthMechanism::PhantomPath,
42        InjectMode::QueryParam => nono::undo::NetworkAuditAuthMechanism::PhantomQuery,
43    }
44}
45
46fn audit_injection_mode_for_inject_mode(
47    mode: &InjectMode,
48) -> nono::undo::NetworkAuditInjectionMode {
49    match mode {
50        InjectMode::Header => nono::undo::NetworkAuditInjectionMode::Header,
51        InjectMode::UrlPath => nono::undo::NetworkAuditInjectionMode::UrlPath,
52        InjectMode::QueryParam => nono::undo::NetworkAuditInjectionMode::QueryParam,
53        InjectMode::BasicAuth => nono::undo::NetworkAuditInjectionMode::BasicAuth,
54    }
55}
56
57fn proxy_auth_event_ctx<'a>(route_id: &'a str) -> audit::EventContext<'a> {
58    audit::EventContext {
59        route_id: Some(route_id),
60        auth_mechanism: Some(nono::undo::NetworkAuditAuthMechanism::ProxyAuthorization),
61        ..audit::EventContext::default()
62    }
63}
64
65fn managed_credential_event_ctx<'a>(
66    route_id: &'a str,
67    proxy_mode: &InjectMode,
68    inject_mode: nono::undo::NetworkAuditInjectionMode,
69) -> audit::EventContext<'a> {
70    audit::EventContext {
71        route_id: Some(route_id),
72        auth_mechanism: Some(auth_mechanism_for_inject_mode(proxy_mode)),
73        managed_credential_active: Some(true),
74        injection_mode: Some(inject_mode),
75        ..audit::EventContext::default()
76    }
77}
78
/// Shared context passed from the server to the reverse proxy handler.
///
/// Every field is a borrow, so the context is cheap to build per request and
/// never owns the stores, the session token, or the TLS connector.
pub struct ReverseProxyCtx<'a> {
    /// Route store for upstream URL, L7 filtering, and per-route TLS
    pub route_store: &'a RouteStore,
    /// Credential store for service lookups (optional injection)
    pub credential_store: &'a CredentialStore,
    /// Session token for authentication
    pub session_token: &'a Zeroizing<String>,
    /// Host filter for upstream validation
    pub filter: &'a ProxyFilter,
    /// Shared TLS connector (used when the route has no connector of its own)
    pub tls_connector: &'a TlsConnector,
    /// Shared network audit sink for session metadata capture
    pub audit_log: Option<&'a audit::SharedAuditLog>,
}
98
99/// Handle a non-CONNECT HTTP request (reverse proxy mode).
100///
101/// `buffered_body` contains any bytes the BufReader read ahead beyond the
102/// headers. These are prepended to the body read from the stream to prevent
103/// data loss.
104///
105/// ## Phantom Token Pattern
106///
107/// The client (SDK) sends the session token as its "API key". The proxy:
108/// 1. Extracts the service from the path (e.g., `/openai/v1/chat` → `openai`)
109/// 2. Looks up which header that service uses (e.g., `Authorization` or `x-api-key`)
110/// 3. Validates the phantom token from that header
111/// 4. Replaces it with the real credential from keyring
112pub async fn handle_reverse_proxy(
113    first_line: &str,
114    stream: &mut TcpStream,
115    remaining_header: &[u8],
116    ctx: &ReverseProxyCtx<'_>,
117    buffered_body: &[u8],
118) -> Result<()> {
119    // Parse method, path, and HTTP version
120    let (method, path, version) = parse_request_line(first_line)?;
121    debug!("Reverse proxy: {} {}", method, path);
122
123    // Extract service prefix from path (e.g., "/openai/v1/chat" -> ("openai", "/v1/chat"))
124    let (service, upstream_path) = parse_service_prefix(&path)?;
125    let route = ctx
126        .route_store
127        .get(&service)
128        .ok_or_else(|| ProxyError::UnknownService {
129            prefix: service.clone(),
130        })?;
131    let static_cred = ctx.credential_store.get(&service);
132    let oauth2_route = ctx.credential_store.get_oauth2(&service);
133    let aws_route = ctx.credential_store.get_aws(&service);
134    let managed_ctx = static_cred.map(|cred| {
135        managed_credential_event_ctx(
136            &service,
137            &cred.proxy_inject_mode,
138            audit_injection_mode_for_inject_mode(&cred.inject_mode),
139        )
140    });
141    let oauth2_ctx = oauth2_route.map(|_| audit::EventContext {
142        route_id: Some(&service),
143        auth_mechanism: Some(nono::undo::NetworkAuditAuthMechanism::PhantomHeader),
144        managed_credential_active: Some(true),
145        injection_mode: Some(nono::undo::NetworkAuditInjectionMode::OAuth2),
146        ..audit::EventContext::default()
147    });
148    let route_ctx = managed_ctx
149        .clone()
150        .or_else(|| oauth2_ctx.clone())
151        .unwrap_or_else(|| audit::EventContext {
152            route_id: Some(&service),
153            managed_credential_active: Some(false),
154            ..audit::EventContext::default()
155        });
156
157    if route.missing_managed_credential(
158        static_cred.is_some(),
159        oauth2_route.is_some(),
160        aws_route.is_some(),
161    ) {
162        let reason = format!(
163            "managed credential unavailable for service '{}': route is configured for proxy-supplied auth",
164            service
165        );
166        warn!("{}", reason);
167        let deny_ctx = audit::EventContext {
168            route_id: Some(&service),
169            auth_mechanism: route.managed_auth_mechanism.clone(),
170            auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Failed),
171            managed_credential_active: Some(false),
172            injection_mode: route.managed_injection_mode.clone(),
173            denial_category: Some(
174                nono::undo::NetworkAuditDenialCategory::ManagedCredentialUnavailable,
175            ),
176        };
177        audit::log_denied(
178            ctx.audit_log,
179            audit::ProxyMode::Reverse,
180            &deny_ctx,
181            &service,
182            0,
183            &reason,
184        );
185        send_error(stream, 503, "Service Unavailable").await?;
186        return Ok(());
187    }
188
189    // L7 endpoint filtering runs for all reverse-proxy routes, whether or not
190    // they inject a credential.
191    if !route.endpoint_rules.is_allowed(&method, &upstream_path) {
192        let reason = format!(
193            "endpoint denied: {} {} on service '{}'",
194            method, upstream_path, service
195        );
196        warn!("{}", reason);
197        let deny_ctx = audit::EventContext {
198            denial_category: Some(nono::undo::NetworkAuditDenialCategory::EndpointPolicy),
199            ..route_ctx.clone()
200        };
201        audit::log_denied(
202            ctx.audit_log,
203            audit::ProxyMode::Reverse,
204            &deny_ctx,
205            &service,
206            0,
207            &reason,
208        );
209        send_error(stream, 403, "Forbidden").await?;
210        return Ok(());
211    }
212
213    if let Some(oauth2_route) = oauth2_route {
214        return handle_oauth2_credential(
215            oauth2_route,
216            route,
217            &service,
218            &upstream_path,
219            &method,
220            &version,
221            stream,
222            remaining_header,
223            buffered_body,
224            ctx,
225        )
226        .await;
227    }
228
229    // AWS SigV4 signing is not yet implemented. Return 501 so the caller
230    // knows the route exists but is not functional. This branch will be
231    // replaced with real SigV4 signing in a follow-up.
232    if aws_route.is_some() {
233        send_error(stream, 501, "Not Implemented").await?;
234        return Ok(());
235    }
236
237    let cred = static_cred;
238
239    // Authenticate the request. Every reverse proxy request must prove
240    // possession of the session token, regardless of whether a credential
241    // is configured — this is the localhost auth boundary.
242    if let Some(cred) = cred {
243        if let Err(e) = validate_phantom_token_for_mode(
244            &cred.proxy_inject_mode,
245            remaining_header,
246            &upstream_path,
247            &cred.proxy_header_name,
248            cred.proxy_path_pattern.as_deref(),
249            cred.proxy_query_param_name.as_deref(),
250            ctx.session_token,
251        ) {
252            let deny_ctx = audit::EventContext {
253                auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Failed),
254                denial_category: Some(nono::undo::NetworkAuditDenialCategory::AuthenticationFailed),
255                ..managed_ctx.clone().unwrap_or_else(|| route_ctx.clone())
256            };
257            audit::log_denied(
258                ctx.audit_log,
259                audit::ProxyMode::Reverse,
260                &deny_ctx,
261                &service,
262                0,
263                &e.to_string(),
264            );
265            send_error(stream, 401, "Unauthorized").await?;
266            return Ok(());
267        }
268    } else if let Err(e) = token::validate_proxy_auth(remaining_header, ctx.session_token) {
269        let deny_ctx = audit::EventContext {
270            auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Failed),
271            denial_category: Some(nono::undo::NetworkAuditDenialCategory::AuthenticationFailed),
272            ..proxy_auth_event_ctx(&service)
273        };
274        audit::log_denied(
275            ctx.audit_log,
276            audit::ProxyMode::Reverse,
277            &deny_ctx,
278            &service,
279            0,
280            &e.to_string(),
281        );
282        send_error(stream, 407, "Proxy Authentication Required").await?;
283        return Ok(());
284    }
285
286    let transformed_path = if let Some(cred) = cred {
287        let cleaned_path = strip_proxy_artifacts(
288            &upstream_path,
289            &cred.proxy_inject_mode,
290            &cred.inject_mode,
291            cred.proxy_path_pattern.as_deref(),
292            cred.proxy_query_param_name.as_deref(),
293        );
294        transform_path_for_mode(
295            &cred.inject_mode,
296            &cleaned_path,
297            cred.path_pattern.as_deref(),
298            cred.path_replacement.as_deref(),
299            cred.query_param_name.as_deref(),
300            &cred.raw_credential,
301        )?
302    } else {
303        upstream_path.clone()
304    };
305
306    let upstream_url = format!(
307        "{}{}",
308        route.upstream.trim_end_matches('/'),
309        transformed_path
310    );
311    debug!("Forwarding to upstream: {} {}", method, upstream_url);
312
313    let (upstream_scheme, upstream_host, upstream_port, upstream_path_full) =
314        parse_upstream_url(&upstream_url)?;
315    let check = ctx.filter.check_host(&upstream_host, upstream_port).await?;
316    if !check.result.is_allowed() {
317        let reason = check.result.reason();
318        warn!("Upstream host denied by filter: {}", reason);
319        send_error(stream, 403, "Forbidden").await?;
320        let deny_ctx = audit::EventContext {
321            denial_category: Some(nono::undo::NetworkAuditDenialCategory::HostDenied),
322            ..route_ctx.clone()
323        };
324        audit::log_denied(
325            ctx.audit_log,
326            audit::ProxyMode::Reverse,
327            &deny_ctx,
328            &service,
329            0,
330            &reason,
331        );
332        return Ok(());
333    }
334    if let Err(reason) =
335        validate_http_upstream_target(upstream_scheme, &upstream_host, &check.resolved_addrs)
336    {
337        warn!("{}", reason);
338        send_error(stream, 502, "Bad Gateway").await?;
339        let deny_ctx = audit::EventContext {
340            denial_category: Some(nono::undo::NetworkAuditDenialCategory::UpstreamConnectFailed),
341            ..route_ctx.clone()
342        };
343        audit::log_denied(
344            ctx.audit_log,
345            audit::ProxyMode::Reverse,
346            &deny_ctx,
347            &service,
348            0,
349            &reason,
350        );
351        return Ok(());
352    }
353
354    let success_ctx = if let Some(ctx) = managed_ctx.clone() {
355        audit::EventContext {
356            auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Succeeded),
357            ..ctx
358        }
359    } else if oauth2_ctx.is_some() {
360        audit::EventContext {
361            auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Succeeded),
362            ..oauth2_ctx.clone().unwrap_or_default()
363        }
364    } else {
365        audit::EventContext {
366            auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Succeeded),
367            managed_credential_active: Some(false),
368            ..proxy_auth_event_ctx(&service)
369        }
370    };
371
372    let strip_header = cred.map(|c| c.proxy_header_name.as_str()).unwrap_or("");
373    let filtered_headers = filter_headers(remaining_header, strip_header);
374    let content_length = extract_content_length(remaining_header);
375    let body = match read_request_body(stream, content_length, buffered_body).await? {
376        Some(body) => body,
377        None => return Ok(()),
378    };
379
380    let upstream_authority = format_host_header(upstream_scheme, &upstream_host, upstream_port);
381    let mut request = Zeroizing::new(format!(
382        "{} {} {}\r\nHost: {}\r\n",
383        method, upstream_path_full, version, upstream_authority
384    ));
385
386    if let Some(cred) = cred {
387        inject_credential_for_mode(cred, &mut request);
388    }
389
390    let auth_header_lower = cred.map(|c| c.header_name.to_lowercase());
391    for (name, value) in &filtered_headers {
392        if let (Some(cred), Some(header_lower)) = (cred, auth_header_lower.as_ref())
393            && matches!(cred.inject_mode, InjectMode::Header | InjectMode::BasicAuth)
394            && name.to_lowercase() == *header_lower
395        {
396            continue;
397        }
398        request.push_str(&format!("{}: {}\r\n", name, value));
399    }
400
401    request.push_str("Connection: close\r\n");
402    if !body.is_empty() {
403        request.push_str(&format!("Content-Length: {}\r\n", body.len()));
404    }
405    request.push_str("\r\n");
406
407    let connector = route.tls_connector.as_ref().unwrap_or(ctx.tls_connector);
408    let upstream_spec = UpstreamSpec {
409        scheme: upstream_scheme,
410        host: &upstream_host,
411        port: upstream_port,
412        strategy: UpstreamStrategy::Direct {
413            resolved_addrs: &check.resolved_addrs,
414        },
415        tls_connector: connector,
416    };
417    let audit_ctx = AuditCtx {
418        log: ctx.audit_log,
419        mode: audit::ProxyMode::Reverse,
420        event_ctx: success_ctx.clone(),
421        target: &service,
422        method: &method,
423        path: &upstream_path,
424    };
425    if let Err(e) =
426        forward::forward_request(stream, request.as_bytes(), &body, upstream_spec, audit_ctx).await
427    {
428        warn!("Upstream connection failed: {}", e);
429        send_error(stream, 502, "Bad Gateway").await?;
430        let deny_ctx = audit::EventContext {
431            denial_category: Some(nono::undo::NetworkAuditDenialCategory::UpstreamConnectFailed),
432            ..success_ctx.clone()
433        };
434        audit::log_denied(
435            ctx.audit_log,
436            audit::ProxyMode::Reverse,
437            &deny_ctx,
438            &service,
439            0,
440            &e.to_string(),
441        );
442    }
443    Ok(())
444}
445
446/// Handle a reverse proxy request using an OAuth2 token cache.
447///
448/// Retrieves a (possibly refreshed) access token from the cache and injects
449/// it as `Authorization: Bearer <token>`. The agent authenticates with the
450/// session token via the `Authorization: Bearer <phantom>` header, which is
451/// validated and then replaced with the real OAuth2 access token.
452#[allow(clippy::too_many_arguments)]
453async fn handle_oauth2_credential(
454    oauth2_route: &crate::credential::OAuth2Route,
455    route: &crate::route::LoadedRoute,
456    service: &str,
457    upstream_path: &str,
458    method: &str,
459    version: &str,
460    stream: &mut TcpStream,
461    remaining_header: &[u8],
462    buffered_body: &[u8],
463    ctx: &ReverseProxyCtx<'_>,
464) -> Result<()> {
465    // Get (possibly refreshed) OAuth2 access token
466    let access_token = oauth2_route.cache.get_or_refresh().await;
467
468    // Validate session token from Authorization header (phantom token pattern).
469    // OAuth2 routes still require the agent to authenticate with the session
470    // token — this prevents unauthorized access to the token-exchanged credential.
471    if let Err(e) = validate_phantom_token(remaining_header, "Authorization", ctx.session_token) {
472        let deny_ctx = audit::EventContext {
473            route_id: Some(service),
474            auth_mechanism: Some(nono::undo::NetworkAuditAuthMechanism::PhantomHeader),
475            auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Failed),
476            managed_credential_active: Some(true),
477            injection_mode: Some(nono::undo::NetworkAuditInjectionMode::OAuth2),
478            denial_category: Some(nono::undo::NetworkAuditDenialCategory::AuthenticationFailed),
479        };
480        audit::log_denied(
481            ctx.audit_log,
482            audit::ProxyMode::Reverse,
483            &deny_ctx,
484            service,
485            0,
486            &e.to_string(),
487        );
488        send_error(stream, 401, "Unauthorized").await?;
489        return Ok(());
490    }
491
492    let upstream_url = format!(
493        "{}{}",
494        oauth2_route.upstream.trim_end_matches('/'),
495        upstream_path
496    );
497    debug!("OAuth2 forwarding to upstream: {} {}", method, upstream_url);
498
499    let (upstream_scheme, upstream_host, upstream_port, upstream_path_full) =
500        parse_upstream_url(&upstream_url)?;
501    // DNS resolve + host check via the filter
502    let check = ctx.filter.check_host(&upstream_host, upstream_port).await?;
503    if !check.result.is_allowed() {
504        let reason = check.result.reason();
505        warn!("Upstream host denied by filter: {}", reason);
506        send_error(stream, 403, "Forbidden").await?;
507        let route_ctx = audit::EventContext {
508            route_id: Some(service),
509            managed_credential_active: Some(true),
510            injection_mode: Some(nono::undo::NetworkAuditInjectionMode::OAuth2),
511            denial_category: Some(nono::undo::NetworkAuditDenialCategory::HostDenied),
512            ..audit::EventContext::default()
513        };
514        audit::log_denied(
515            ctx.audit_log,
516            audit::ProxyMode::Reverse,
517            &route_ctx,
518            service,
519            0,
520            &reason,
521        );
522        return Ok(());
523    }
524    if let Err(reason) =
525        validate_http_upstream_target(upstream_scheme, &upstream_host, &check.resolved_addrs)
526    {
527        warn!("{}", reason);
528        send_error(stream, 502, "Bad Gateway").await?;
529        let route_ctx = audit::EventContext {
530            route_id: Some(service),
531            managed_credential_active: Some(true),
532            injection_mode: Some(nono::undo::NetworkAuditInjectionMode::OAuth2),
533            denial_category: Some(nono::undo::NetworkAuditDenialCategory::UpstreamConnectFailed),
534            ..audit::EventContext::default()
535        };
536        audit::log_denied(
537            ctx.audit_log,
538            audit::ProxyMode::Reverse,
539            &route_ctx,
540            service,
541            0,
542            &reason,
543        );
544        return Ok(());
545    }
546
547    // Collect remaining request headers, stripping the client-supplied
548    // Authorization header that carries the phantom token.
549    let filtered_headers = filter_headers(remaining_header, "Authorization");
550    let content_length = extract_content_length(remaining_header);
551
552    // Read request body
553    let body = match read_request_body(stream, content_length, buffered_body).await? {
554        Some(body) => body,
555        None => return Ok(()),
556    };
557
558    // Build upstream request with Bearer token injection
559    let upstream_authority = format_host_header(upstream_scheme, &upstream_host, upstream_port);
560    let mut request = Zeroizing::new(format!(
561        "{} {} {}\r\nHost: {}\r\n",
562        method, upstream_path_full, version, upstream_authority
563    ));
564
565    // Inject OAuth2 access token as Authorization: Bearer
566    request.push_str(&format!(
567        "Authorization: Bearer {}\r\n",
568        access_token.as_str()
569    ));
570
571    // Forward filtered headers (auth headers already stripped by filter_headers)
572    for (name, value) in &filtered_headers {
573        request.push_str(&format!("{}: {}\r\n", name, value));
574    }
575
576    if !body.is_empty() {
577        request.push_str(&format!("Content-Length: {}\r\n", body.len()));
578    }
579    request.push_str("\r\n");
580
581    let connector = route.tls_connector.as_ref().unwrap_or(ctx.tls_connector);
582    let upstream_spec = UpstreamSpec {
583        scheme: upstream_scheme,
584        host: &upstream_host,
585        port: upstream_port,
586        strategy: UpstreamStrategy::Direct {
587            resolved_addrs: &check.resolved_addrs,
588        },
589        tls_connector: connector,
590    };
591    let audit_ctx = AuditCtx {
592        log: ctx.audit_log,
593        mode: audit::ProxyMode::Reverse,
594        event_ctx: audit::EventContext {
595            route_id: Some(service),
596            auth_mechanism: Some(nono::undo::NetworkAuditAuthMechanism::PhantomHeader),
597            auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Succeeded),
598            managed_credential_active: Some(true),
599            injection_mode: Some(nono::undo::NetworkAuditInjectionMode::OAuth2),
600            denial_category: None,
601        },
602        target: service,
603        method,
604        path: upstream_path,
605    };
606    if let Err(e) =
607        forward::forward_request(stream, request.as_bytes(), &body, upstream_spec, audit_ctx).await
608    {
609        warn!("Upstream connection failed: {}", e);
610        send_error(stream, 502, "Bad Gateway").await?;
611        audit::log_denied(
612            ctx.audit_log,
613            audit::ProxyMode::Reverse,
614            &audit::EventContext {
615                route_id: Some(service),
616                auth_mechanism: Some(nono::undo::NetworkAuditAuthMechanism::PhantomHeader),
617                auth_outcome: Some(nono::undo::NetworkAuditAuthOutcome::Succeeded),
618                managed_credential_active: Some(true),
619                injection_mode: Some(nono::undo::NetworkAuditInjectionMode::OAuth2),
620                denial_category: Some(
621                    nono::undo::NetworkAuditDenialCategory::UpstreamConnectFailed,
622                ),
623            },
624            service,
625            0,
626            &e.to_string(),
627        );
628    }
629    Ok(())
630}
631
632/// Read request body from the client stream with size limit.
633///
634/// `buffered_body` contains bytes the BufReader read ahead beyond headers.
635/// Generic over the inbound stream so the TLS-intercept handler can reuse
636/// it on a `TlsStream<…>` without duplication.
637pub(crate) async fn read_request_body<S>(
638    stream: &mut S,
639    content_length: Option<usize>,
640    buffered_body: &[u8],
641) -> Result<Option<Vec<u8>>>
642where
643    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
644{
645    if let Some(len) = content_length {
646        if len > MAX_REQUEST_BODY {
647            send_error_generic(stream, 413, "Payload Too Large").await?;
648            return Ok(None);
649        }
650        let mut buf = Vec::with_capacity(len);
651        let pre = buffered_body.len().min(len);
652        buf.extend_from_slice(&buffered_body[..pre]);
653        let remaining = len - pre;
654        if remaining > 0 {
655            let mut rest = vec![0u8; remaining];
656            stream.read_exact(&mut rest).await?;
657            buf.extend_from_slice(&rest);
658        }
659        Ok(Some(buf))
660    } else {
661        Ok(Some(Vec::new()))
662    }
663}
664
665/// Generic equivalent of `send_error` used by [`read_request_body`].
666pub(crate) async fn send_error_generic<S>(stream: &mut S, status: u16, reason: &str) -> Result<()>
667where
668    S: tokio::io::AsyncWrite + Unpin,
669{
670    let body = format!("{{\"error\":\"{}\"}}", reason);
671    let response = format!(
672        "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
673        status,
674        reason,
675        body.len(),
676        body
677    );
678    stream.write_all(response.as_bytes()).await?;
679    stream.flush().await?;
680    Ok(())
681}
682
683/// Parse an HTTP request line into (method, path, version).
684fn parse_request_line(line: &str) -> Result<(String, String, String)> {
685    let parts: Vec<&str> = line.split_whitespace().collect();
686    if parts.len() < 3 {
687        return Err(ProxyError::HttpParse(format!(
688            "malformed request line: {}",
689            line
690        )));
691    }
692    Ok((
693        parts[0].to_string(),
694        parts[1].to_string(),
695        parts[2].to_string(),
696    ))
697}
698
699/// Extract service prefix from path.
700///
701/// "/openai/v1/chat/completions" -> ("openai", "/v1/chat/completions")
702/// "/anthropic/v1/messages" -> ("anthropic", "/v1/messages")
703fn parse_service_prefix(path: &str) -> Result<(String, String)> {
704    let trimmed = path.strip_prefix('/').unwrap_or(path);
705    if let Some((prefix, rest)) = trimmed.split_once('/') {
706        Ok((prefix.to_string(), format!("/{}", rest)))
707    } else {
708        // No sub-path, just the prefix
709        Ok((trimmed.to_string(), "/".to_string()))
710    }
711}
712
713/// Validate the phantom token from the service's auth header.
714///
715/// The SDK sends the session token as its "API key" in the standard auth header
716/// for that service (e.g., `Authorization: Bearer <token>` for OpenAI,
717/// `x-api-key: <token>` for Anthropic). We validate the token matches the
718/// session token before swapping in the real credential.
719fn validate_phantom_token(
720    header_bytes: &[u8],
721    header_name: &str,
722    session_token: &Zeroizing<String>,
723) -> Result<()> {
724    let header_str = std::str::from_utf8(header_bytes).map_err(|_| ProxyError::InvalidToken)?;
725    let header_name_lower = header_name.to_lowercase();
726
727    for line in header_str.lines() {
728        let lower = line.to_lowercase();
729        if lower.starts_with(&format!("{}:", header_name_lower)) {
730            let value = line.split_once(':').map(|(_, v)| v.trim()).unwrap_or("");
731
732            // Handle "Bearer <token>" format (strip "Bearer " prefix if present)
733            // Use case-insensitive check, then slice original value by length
734            let value_lower = value.to_lowercase();
735            let token_value = if value_lower.starts_with("bearer ") {
736                // "bearer ".len() == 7
737                value[7..].trim()
738            } else {
739                value
740            };
741
742            if token::constant_time_eq(token_value.as_bytes(), session_token.as_bytes()) {
743                return Ok(());
744            }
745            warn!("Invalid phantom token in {} header", header_name);
746            return Err(ProxyError::InvalidToken);
747        }
748    }
749
750    warn!(
751        "Missing {} header for phantom token validation",
752        header_name
753    );
754    Err(ProxyError::InvalidToken)
755}
756
/// Filter headers, removing hop-by-hop and proxy-internal headers.
///
/// Always strips:
/// - `Host` (rewritten to upstream)
/// - `Content-Length` (re-added after body is read)
/// - `Connection` (hop-by-hop; the proxy sets its own)
/// - `Proxy-Authorization` (hop-by-hop, contains session token)
///
/// When `cred_header` is non-empty, also strips that header (it contains
/// the phantom token that must not be forwarded alongside the real credential).
/// When `cred_header` is empty (no-credential route), all other headers
/// including `Authorization` are passed through to the upstream.
///
/// Header names are trimmed *before* they are matched (ASCII
/// case-insensitively), because the trimmed name is what gets forwarded.
/// This way a line such as `Content-Length : 5` or `Authorization : x`
/// cannot slip past the filter and reach the upstream under its canonical
/// name. Lines without a `:` or with an empty name are dropped.
///
/// A header block that is not valid UTF-8 yields an empty list.
pub(crate) fn filter_headers(header_bytes: &[u8], cred_header: &str) -> Vec<(String, String)> {
    const ALWAYS_STRIPPED: [&str; 4] = [
        "host",
        "content-length",
        "connection",
        "proxy-authorization",
    ];

    let header_str = std::str::from_utf8(header_bytes).unwrap_or("");

    header_str
        .lines()
        .filter_map(|line| {
            let (raw_name, raw_value) = line.split_once(':')?;
            let name = raw_name.trim();
            if name.is_empty() {
                return None;
            }
            let stripped = ALWAYS_STRIPPED
                .iter()
                .any(|blocked| name.eq_ignore_ascii_case(blocked))
                || (!cred_header.is_empty() && name.eq_ignore_ascii_case(cred_header));
            (!stripped).then(|| (name.to_string(), raw_value.trim().to_string()))
        })
        .collect()
}
795
/// Extract Content-Length value from raw headers.
///
/// The first header whose (trimmed) name equals `Content-Length`, compared
/// ASCII case-insensitively, decides the result; a value that does not parse
/// as `usize` yields `None`. Trimming the name keeps this function in step
/// with how header names are normalised before forwarding, so a line such as
/// `Content-Length : 5` is not silently treated as "no body".
///
/// Returns `None` when the header block is not valid UTF-8 or no such header
/// is present.
pub(crate) fn extract_content_length(header_bytes: &[u8]) -> Option<usize> {
    let header_str = std::str::from_utf8(header_bytes).ok()?;
    header_str
        .lines()
        .filter_map(|line| line.split_once(':'))
        .find(|(name, _)| name.trim().eq_ignore_ascii_case("content-length"))
        .and_then(|(_, value)| value.trim().parse().ok())
}
807
808fn validate_http_upstream_target(
809    scheme: UpstreamScheme,
810    host: &str,
811    resolved_addrs: &[SocketAddr],
812) -> std::result::Result<(), String> {
813    if matches!(scheme, UpstreamScheme::Https) {
814        return Ok(());
815    }
816
817    if is_local_only_target(host, resolved_addrs) {
818        Ok(())
819    } else {
820        Err(format!(
821            "refusing insecure http upstream for non-local host '{}'; http is only allowed for loopback addresses",
822            host
823        ))
824    }
825}
826
/// Report whether an upstream target is confined to the loopback interface.
///
/// When `resolved_addrs` is non-empty it is authoritative: every resolved
/// address must be a loopback address and `host` is not inspected.
///
/// With no resolved addresses, `host` must itself be a loopback IP literal.
/// Both the bare form (`127.0.0.1`, `::1`) and the bracketed IPv6 form used
/// in URLs and `Host` headers (`[::1]`) are recognised. Anything that does
/// not parse as an IP literal, including names such as `localhost`, is
/// treated as non-local.
fn is_local_only_target(host: &str, resolved_addrs: &[SocketAddr]) -> bool {
    if !resolved_addrs.is_empty() {
        return resolved_addrs.iter().all(|addr| addr.ip().is_loopback());
    }

    let bracketed = host
        .strip_prefix('[')
        .and_then(|rest| rest.strip_suffix(']'));

    match bracketed {
        // Brackets are only meaningful around IPv6 literals, so `[127.0.0.1]`
        // is deliberately not accepted.
        Some(inner) => inner
            .parse::<std::net::Ipv6Addr>()
            .map(|ip| ip.is_loopback())
            .unwrap_or(false),
        None => host
            .parse::<std::net::IpAddr>()
            .map(|ip| ip.is_loopback())
            .unwrap_or(false),
    }
}
838
839pub(crate) fn format_host_header(scheme: UpstreamScheme, host: &str, port: u16) -> String {
840    let default_port = match scheme {
841        UpstreamScheme::Http => 80,
842        UpstreamScheme::Https => 443,
843    };
844    let bracketed_host = if host.contains(':') && !host.starts_with('[') {
845        format!("[{}]", host)
846    } else {
847        host.to_string()
848    };
849
850    if port == default_port {
851        bracketed_host
852    } else {
853        format!("{}:{}", bracketed_host, port)
854    }
855}
856
857fn parse_upstream_url(url_str: &str) -> Result<(UpstreamScheme, String, u16, String)> {
858    let parsed = url::Url::parse(url_str)
859        .map_err(|e| ProxyError::HttpParse(format!("invalid upstream URL '{}': {}", url_str, e)))?;
860
861    let scheme = match parsed.scheme() {
862        "https" => UpstreamScheme::Https,
863        "http" => UpstreamScheme::Http,
864        _ => {
865            return Err(ProxyError::HttpParse(format!(
866                "unsupported URL scheme: {}",
867                url_str
868            )));
869        }
870    };
871
872    let host = parsed
873        .host_str()
874        .ok_or_else(|| ProxyError::HttpParse(format!("missing host in URL: {}", url_str)))?
875        .to_string();
876
877    let default_port = if matches!(scheme, UpstreamScheme::Https) {
878        443
879    } else {
880        80
881    };
882    let port = parsed.port().unwrap_or(default_port);
883
884    let path = parsed.path().to_string();
885    let path = if path.is_empty() {
886        "/".to_string()
887    } else {
888        path
889    };
890
891    // Include query string if present
892    let path_with_query = if let Some(query) = parsed.query() {
893        format!("{}?{}", path, query)
894    } else {
895        path
896    };
897
898    Ok((scheme, host, port, path_with_query))
899}
900
901/// Send an HTTP error response.
902async fn send_error(stream: &mut TcpStream, status: u16, reason: &str) -> Result<()> {
903    let body = format!("{{\"error\":\"{}\"}}", reason);
904    let response = format!(
905        "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
906        status,
907        reason,
908        body.len(),
909        body
910    );
911    stream.write_all(response.as_bytes()).await?;
912    stream.flush().await?;
913    Ok(())
914}
915
916// ============================================================================
917// Injection mode helpers
918// ============================================================================
919
920/// Validate phantom token based on injection mode.
921///
922/// Different modes extract the phantom token from different locations:
923/// - `Header`/`BasicAuth`: From the auth header (Authorization, x-api-key, etc.)
924/// - `UrlPath`: From the URL path pattern (e.g., `/bot<token>/getMe`)
925/// - `QueryParam`: From the query parameter (e.g., `?api_key=<token>`)
926pub(crate) fn validate_phantom_token_for_mode(
927    mode: &InjectMode,
928    header_bytes: &[u8],
929    path: &str,
930    header_name: &str,
931    path_pattern: Option<&str>,
932    query_param_name: Option<&str>,
933    session_token: &Zeroizing<String>,
934) -> Result<()> {
935    match mode {
936        InjectMode::Header | InjectMode::BasicAuth => {
937            // Validate from header (existing behavior)
938            validate_phantom_token(header_bytes, header_name, session_token)
939        }
940        InjectMode::UrlPath => {
941            // Validate from URL path
942            let pattern = path_pattern.ok_or_else(|| {
943                ProxyError::HttpParse("url_path mode requires path_pattern".to_string())
944            })?;
945            validate_phantom_token_in_path(path, pattern, session_token)
946        }
947        InjectMode::QueryParam => {
948            // Validate from query parameter
949            let param_name = query_param_name.ok_or_else(|| {
950                ProxyError::HttpParse("query_param mode requires query_param_name".to_string())
951            })?;
952            validate_phantom_token_in_query(path, param_name, session_token)
953        }
954    }
955}
956
957/// Validate phantom token embedded in URL path.
958///
959/// Extracts the token from the path using the pattern (e.g., `/bot{}/` matches
960/// `/bot<token>/getMe` and extracts `<token>`).
961fn validate_phantom_token_in_path(
962    path: &str,
963    pattern: &str,
964    session_token: &Zeroizing<String>,
965) -> Result<()> {
966    // Split pattern on {} to get prefix and suffix
967    let parts: Vec<&str> = pattern.split("{}").collect();
968    if parts.len() != 2 {
969        return Err(ProxyError::HttpParse(format!(
970            "invalid path_pattern '{}': must contain exactly one {{}}",
971            pattern
972        )));
973    }
974    let (prefix, suffix) = (parts[0], parts[1]);
975
976    // Find the token in the path
977    if let Some(start) = path.find(prefix) {
978        let after_prefix = start + prefix.len();
979
980        // Handle empty suffix case (token extends to end of path or next '/' or '?')
981        let end_offset = if suffix.is_empty() {
982            path[after_prefix..]
983                .find(['/', '?'])
984                .unwrap_or(path[after_prefix..].len())
985        } else {
986            match path[after_prefix..].find(suffix) {
987                Some(offset) => offset,
988                None => {
989                    warn!("Missing phantom token in URL path (pattern: {})", pattern);
990                    return Err(ProxyError::InvalidToken);
991                }
992            }
993        };
994
995        let token = &path[after_prefix..after_prefix + end_offset];
996        if token::constant_time_eq(token.as_bytes(), session_token.as_bytes()) {
997            return Ok(());
998        }
999        warn!("Invalid phantom token in URL path");
1000        return Err(ProxyError::InvalidToken);
1001    }
1002
1003    warn!("Missing phantom token in URL path (pattern: {})", pattern);
1004    Err(ProxyError::InvalidToken)
1005}
1006
1007/// Validate phantom token in query parameter.
1008fn validate_phantom_token_in_query(
1009    path: &str,
1010    param_name: &str,
1011    session_token: &Zeroizing<String>,
1012) -> Result<()> {
1013    // Parse query string from path
1014    if let Some(query_start) = path.find('?') {
1015        let query = &path[query_start + 1..];
1016        for pair in query.split('&') {
1017            if let Some((name, value)) = pair.split_once('=')
1018                && name == param_name
1019            {
1020                let decoded = urlencoding::decode(value).unwrap_or_else(|_| value.into());
1021                if token::constant_time_eq(decoded.as_bytes(), session_token.as_bytes()) {
1022                    return Ok(());
1023                }
1024                warn!("Invalid phantom token in query parameter '{}'", param_name);
1025                return Err(ProxyError::InvalidToken);
1026            }
1027        }
1028    }
1029
1030    warn!("Missing phantom token in query parameter '{}'", param_name);
1031    Err(ProxyError::InvalidToken)
1032}
1033
1034/// Transform URL path based on injection mode.
1035///
1036/// - `UrlPath`: Replace phantom token with real credential in path
1037/// - `QueryParam`: Add/replace query parameter with real credential
1038/// - `Header`/`BasicAuth`: No path transformation needed
1039pub(crate) fn transform_path_for_mode(
1040    mode: &InjectMode,
1041    path: &str,
1042    path_pattern: Option<&str>,
1043    path_replacement: Option<&str>,
1044    query_param_name: Option<&str>,
1045    credential: &Zeroizing<String>,
1046) -> Result<String> {
1047    match mode {
1048        InjectMode::Header | InjectMode::BasicAuth => {
1049            // No path transformation needed
1050            Ok(path.to_string())
1051        }
1052        InjectMode::UrlPath => {
1053            let pattern = path_pattern.ok_or_else(|| {
1054                ProxyError::HttpParse("url_path mode requires path_pattern".to_string())
1055            })?;
1056            let replacement = path_replacement.unwrap_or(pattern);
1057            transform_url_path(path, pattern, replacement, credential)
1058        }
1059        InjectMode::QueryParam => {
1060            let param_name = query_param_name.ok_or_else(|| {
1061                ProxyError::HttpParse("query_param mode requires query_param_name".to_string())
1062            })?;
1063            transform_query_param(path, param_name, credential)
1064        }
1065    }
1066}
1067
1068/// Transform URL path by replacing phantom token pattern with real credential.
1069///
1070/// Example: `/bot<phantom>/getMe` with pattern `/bot{}/` becomes `/bot<real>/getMe`
1071fn transform_url_path(
1072    path: &str,
1073    pattern: &str,
1074    replacement: &str,
1075    credential: &Zeroizing<String>,
1076) -> Result<String> {
1077    // Split pattern on {} to get prefix and suffix
1078    let parts: Vec<&str> = pattern.split("{}").collect();
1079    if parts.len() != 2 {
1080        return Err(ProxyError::HttpParse(format!(
1081            "invalid path_pattern '{}': must contain exactly one {{}}",
1082            pattern
1083        )));
1084    }
1085    let (pattern_prefix, pattern_suffix) = (parts[0], parts[1]);
1086
1087    // Split replacement on {}
1088    let repl_parts: Vec<&str> = replacement.split("{}").collect();
1089    if repl_parts.len() != 2 {
1090        return Err(ProxyError::HttpParse(format!(
1091            "invalid path_replacement '{}': must contain exactly one {{}}",
1092            replacement
1093        )));
1094    }
1095    let (repl_prefix, repl_suffix) = (repl_parts[0], repl_parts[1]);
1096
1097    // Find and replace the token in the path
1098    if let Some(start) = path.find(pattern_prefix) {
1099        let after_prefix = start + pattern_prefix.len();
1100
1101        // Handle empty suffix case (token extends to end of path or next '/' or '?')
1102        let end_offset = if pattern_suffix.is_empty() {
1103            // Find the next path segment delimiter or end of path
1104            path[after_prefix..]
1105                .find(['/', '?'])
1106                .unwrap_or(path[after_prefix..].len())
1107        } else {
1108            // Find the suffix in the remaining path
1109            match path[after_prefix..].find(pattern_suffix) {
1110                Some(offset) => offset,
1111                None => {
1112                    return Err(ProxyError::HttpParse(format!(
1113                        "path '{}' does not match pattern '{}'",
1114                        path, pattern
1115                    )));
1116                }
1117            }
1118        };
1119
1120        let before = &path[..start];
1121        let after = &path[after_prefix + end_offset + pattern_suffix.len()..];
1122        return Ok(format!(
1123            "{}{}{}{}{}",
1124            before,
1125            repl_prefix,
1126            credential.as_str(),
1127            repl_suffix,
1128            after
1129        ));
1130    }
1131
1132    Err(ProxyError::HttpParse(format!(
1133        "path '{}' does not match pattern '{}'",
1134        path, pattern
1135    )))
1136}
1137
1138/// Transform query string by adding or replacing a parameter with the credential.
1139fn transform_query_param(
1140    path: &str,
1141    param_name: &str,
1142    credential: &Zeroizing<String>,
1143) -> Result<String> {
1144    let encoded_value = urlencoding::encode(credential.as_str());
1145
1146    if let Some(query_start) = path.find('?') {
1147        let base_path = &path[..query_start];
1148        let query = &path[query_start + 1..];
1149
1150        // Check if parameter already exists
1151        let mut found = false;
1152        let new_query: Vec<String> = query
1153            .split('&')
1154            .map(|pair| {
1155                if let Some((name, _)) = pair.split_once('=')
1156                    && name == param_name
1157                {
1158                    found = true;
1159                    return format!("{}={}", param_name, encoded_value);
1160                }
1161                pair.to_string()
1162            })
1163            .collect();
1164
1165        if found {
1166            Ok(format!("{}?{}", base_path, new_query.join("&")))
1167        } else {
1168            // Append the parameter
1169            Ok(format!(
1170                "{}?{}&{}={}",
1171                base_path, query, param_name, encoded_value
1172            ))
1173        }
1174    } else {
1175        // No query string, add one
1176        Ok(format!("{}?{}={}", path, param_name, encoded_value))
1177    }
1178}
1179
1180/// Strip proxy-side artifacts from the path when proxy and upstream modes differ.
1181///
1182/// When the proxy validates the phantom token using a different injection mode
1183/// than the upstream (e.g., proxy uses `url_path` or `query_param` while upstream
1184/// uses `header`), the proxy-side token is embedded in the URL. This function
1185/// removes it before the path is forwarded to the upstream, preventing phantom
1186/// token leakage.
1187///
1188/// When both modes are the same, the upstream transform handles replacement
1189/// (phantom → real credential), so no stripping is needed.
1190pub(crate) fn strip_proxy_artifacts(
1191    path: &str,
1192    proxy_mode: &InjectMode,
1193    upstream_mode: &InjectMode,
1194    proxy_path_pattern: Option<&str>,
1195    proxy_query_param_name: Option<&str>,
1196) -> String {
1197    // Only strip when modes differ — same-mode cases are handled by the
1198    // upstream transform which replaces the phantom token with the real one.
1199    if proxy_mode == upstream_mode {
1200        return path.to_string();
1201    }
1202
1203    match proxy_mode {
1204        InjectMode::UrlPath => {
1205            if let Some(pattern) = proxy_path_pattern {
1206                strip_proxy_path_token(path, pattern)
1207            } else {
1208                path.to_string()
1209            }
1210        }
1211        InjectMode::QueryParam => {
1212            if let Some(param_name) = proxy_query_param_name {
1213                strip_proxy_query_param(path, param_name)
1214            } else {
1215                path.to_string()
1216            }
1217        }
1218        // Header and BasicAuth modes don't embed artifacts in the URL path.
1219        InjectMode::Header | InjectMode::BasicAuth => path.to_string(),
1220    }
1221}
1222
/// Remove the phantom-token segment matched by `pattern` from `path`.
///
/// `pattern` must contain exactly one `{}`; the text before it is the prefix
/// and the text after it the suffix. The first occurrence of the prefix in
/// `path` is used. The token runs up to the suffix, or, when the suffix is
/// empty, up to the next `/` or `?` or the end of the path. Prefix, token and
/// suffix are removed and the two remaining pieces are joined:
///
/// - a doubled `/` at the join is collapsed to one;
/// - a `/` is inserted when both pieces are non-empty and neither supplies
///   one, except when the remainder starts with `?` (a query string attaches
///   directly to the path);
/// - the result always starts with `/`.
///
/// If the pattern is malformed or the path does not match it, `path` is
/// returned unchanged.
///
/// Example: path `/TOKEN123/api/v1/pods` with pattern `/{}/` → `/api/v1/pods`
fn strip_proxy_path_token(path: &str, pattern: &str) -> String {
    let Some((prefix, suffix)) = pattern.split_once("{}") else {
        return path.to_string();
    };
    if suffix.contains("{}") {
        return path.to_string();
    }

    // `find` returns the earliest match, so a prefix at the very start of the
    // path is naturally preferred over later occurrences.
    let Some(start) = path.find(prefix) else {
        return path.to_string();
    };
    let rest = &path[start + prefix.len()..];

    let token_len = if suffix.is_empty() {
        rest.find(['/', '?']).unwrap_or(rest.len())
    } else {
        match rest.find(suffix) {
            Some(offset) => offset,
            None => return path.to_string(),
        }
    };

    let before = &path[..start];
    let after = &rest[token_len + suffix.len()..];

    let joined = match (before.ends_with('/'), after.starts_with('/')) {
        (true, true) => format!("{}{}", before, &after[1..]),
        (false, false)
            if !before.is_empty() && !after.is_empty() && !after.starts_with('?') =>
        {
            format!("{}/{}", before, after)
        }
        _ => format!("{}{}", before, after),
    };

    if joined.starts_with('/') {
        joined
    } else {
        format!("/{}", joined)
    }
}
1278
/// Drop every `param_name=...` pair from the query string of `path`.
///
/// Only pairs containing `=` are candidates for removal; a bare `param_name`
/// without a value is kept. The remaining pairs keep their order. If nothing
/// remains, the `?` is dropped as well; a path without `?` is returned as is.
///
/// Example: path `/api/v1/pods?token=XXX&limit=10` → `/api/v1/pods?limit=10`
fn strip_proxy_query_param(path: &str, param_name: &str) -> String {
    let Some((base_path, query)) = path.split_once('?') else {
        return path.to_string();
    };

    let mut rebuilt = String::with_capacity(path.len());
    rebuilt.push_str(base_path);

    // First kept pair is introduced by '?', every later one by '&'.
    let mut separator = '?';
    for pair in query.split('&') {
        let is_phantom = match pair.split_once('=') {
            Some((name, _)) => name == param_name,
            None => false,
        };
        if is_phantom {
            continue;
        }
        rebuilt.push(separator);
        rebuilt.push_str(pair);
        separator = '&';
    }

    rebuilt
}
1305
1306/// Inject credential into request based on mode.
1307///
1308/// For header/basic_auth modes, adds the credential header.
1309/// For url_path/query_param modes, the credential is already in the path.
1310pub(crate) fn inject_credential_for_mode(cred: &LoadedCredential, request: &mut Zeroizing<String>) {
1311    match cred.inject_mode {
1312        InjectMode::Header | InjectMode::BasicAuth => {
1313            // Inject credential header
1314            request.push_str(&format!(
1315                "{}: {}\r\n",
1316                cred.header_name,
1317                cred.header_value.as_str()
1318            ));
1319        }
1320        InjectMode::UrlPath | InjectMode::QueryParam => {
1321            // Credential is already injected into the URL path/query
1322            // No header injection needed
1323        }
1324    }
1325}
1326
1327#[cfg(test)]
1328#[allow(clippy::unwrap_used)]
1329mod tests {
1330    use super::*;
1331
1332    #[test]
1333    fn test_parse_request_line() {
1334        let (method, path, version) = parse_request_line("POST /openai/v1/chat HTTP/1.1").unwrap();
1335        assert_eq!(method, "POST");
1336        assert_eq!(path, "/openai/v1/chat");
1337        assert_eq!(version, "HTTP/1.1");
1338    }
1339
1340    #[test]
1341    fn test_parse_request_line_malformed() {
1342        assert!(parse_request_line("GET").is_err());
1343    }
1344
1345    #[test]
1346    fn test_parse_service_prefix() {
1347        let (service, path) = parse_service_prefix("/openai/v1/chat/completions").unwrap();
1348        assert_eq!(service, "openai");
1349        assert_eq!(path, "/v1/chat/completions");
1350    }
1351
1352    #[test]
1353    fn test_parse_service_prefix_no_subpath() {
1354        let (service, path) = parse_service_prefix("/anthropic").unwrap();
1355        assert_eq!(service, "anthropic");
1356        assert_eq!(path, "/");
1357    }
1358
1359    #[test]
1360    fn test_validate_phantom_token_bearer_valid() {
1361        let token = Zeroizing::new("secret123".to_string());
1362        let header = b"Authorization: Bearer secret123\r\nContent-Type: application/json\r\n\r\n";
1363        assert!(validate_phantom_token(header, "Authorization", &token).is_ok());
1364    }
1365
1366    #[test]
1367    fn test_validate_phantom_token_bearer_invalid() {
1368        let token = Zeroizing::new("secret123".to_string());
1369        let header = b"Authorization: Bearer wrong\r\n\r\n";
1370        assert!(validate_phantom_token(header, "Authorization", &token).is_err());
1371    }
1372
1373    #[test]
1374    fn test_validate_phantom_token_x_api_key_valid() {
1375        let token = Zeroizing::new("secret123".to_string());
1376        let header = b"x-api-key: secret123\r\nContent-Type: application/json\r\n\r\n";
1377        assert!(validate_phantom_token(header, "x-api-key", &token).is_ok());
1378    }
1379
1380    #[test]
1381    fn test_validate_phantom_token_x_goog_api_key_valid() {
1382        let token = Zeroizing::new("secret123".to_string());
1383        let header = b"x-goog-api-key: secret123\r\nContent-Type: application/json\r\n\r\n";
1384        assert!(validate_phantom_token(header, "x-goog-api-key", &token).is_ok());
1385    }
1386
1387    #[test]
1388    fn test_validate_phantom_token_missing() {
1389        let token = Zeroizing::new("secret123".to_string());
1390        let header = b"Content-Type: application/json\r\n\r\n";
1391        assert!(validate_phantom_token(header, "Authorization", &token).is_err());
1392    }
1393
1394    #[test]
1395    fn test_validate_phantom_token_case_insensitive_header() {
1396        let token = Zeroizing::new("secret123".to_string());
1397        let header = b"AUTHORIZATION: Bearer secret123\r\n\r\n";
1398        assert!(validate_phantom_token(header, "Authorization", &token).is_ok());
1399    }
1400
1401    #[test]
1402    fn test_filter_headers_removes_host_auth() {
1403        let header = b"Host: localhost:8080\r\nAuthorization: Bearer old\r\nContent-Type: application/json\r\nAccept: */*\r\n\r\n";
1404        let filtered = filter_headers(header, "Authorization");
1405        assert_eq!(filtered.len(), 2);
1406        assert_eq!(filtered[0].0, "Content-Type");
1407        assert_eq!(filtered[1].0, "Accept");
1408    }
1409
1410    #[test]
1411    fn test_filter_headers_removes_x_api_key() {
1412        let header = b"x-api-key: sk-old\r\nContent-Type: application/json\r\n\r\n";
1413        let filtered = filter_headers(header, "x-api-key");
1414        assert_eq!(filtered.len(), 1);
1415        assert_eq!(filtered[0].0, "Content-Type");
1416    }
1417
1418    #[test]
1419    fn test_filter_headers_removes_custom_header() {
1420        let header = b"PRIVATE-TOKEN: phantom123\r\nContent-Type: application/json\r\n\r\n";
1421        let filtered = filter_headers(header, "PRIVATE-TOKEN");
1422        assert_eq!(filtered.len(), 1);
1423        assert_eq!(filtered[0].0, "Content-Type");
1424    }
1425
1426    #[test]
1427    fn test_extract_content_length() {
1428        let header = b"Content-Type: application/json\r\nContent-Length: 42\r\n\r\n";
1429        assert_eq!(extract_content_length(header), Some(42));
1430    }
1431
1432    #[test]
1433    fn test_extract_content_length_missing() {
1434        let header = b"Content-Type: application/json\r\n\r\n";
1435        assert_eq!(extract_content_length(header), None);
1436    }
1437
1438    #[test]
1439    fn test_parse_upstream_url_https() {
1440        let (scheme, host, port, path) =
1441            parse_upstream_url("https://api.openai.com/v1/chat/completions").unwrap();
1442        assert_eq!(scheme, UpstreamScheme::Https);
1443        assert_eq!(host, "api.openai.com");
1444        assert_eq!(port, 443);
1445        assert_eq!(path, "/v1/chat/completions");
1446    }
1447
1448    #[test]
1449    fn test_parse_upstream_url_http_with_port() {
1450        let (scheme, host, port, path) = parse_upstream_url("http://localhost:8080/api").unwrap();
1451        assert_eq!(scheme, UpstreamScheme::Http);
1452        assert_eq!(host, "localhost");
1453        assert_eq!(port, 8080);
1454        assert_eq!(path, "/api");
1455    }
1456
1457    #[test]
1458    fn test_parse_upstream_url_no_path() {
1459        let (scheme, host, port, path) = parse_upstream_url("https://api.anthropic.com").unwrap();
1460        assert_eq!(scheme, UpstreamScheme::Https);
1461        assert_eq!(host, "api.anthropic.com");
1462        assert_eq!(port, 443);
1463        assert_eq!(path, "/");
1464    }
1465
1466    #[test]
1467    fn test_parse_upstream_url_invalid_scheme() {
1468        assert!(parse_upstream_url("ftp://example.com").is_err());
1469    }
1470
1471    #[test]
1472    fn test_validate_http_upstream_target_rejects_non_local_host() {
1473        let err = validate_http_upstream_target(UpstreamScheme::Http, "api.example.com", &[])
1474            .expect_err("non-local http upstream should be rejected");
1475        assert!(err.contains("refusing insecure http upstream"));
1476    }
1477
1478    #[test]
1479    fn test_validate_http_upstream_target_allows_loopback() {
1480        let loopback = [SocketAddr::from(([127, 0, 0, 1], 8080))];
1481        assert!(validate_http_upstream_target(UpstreamScheme::Http, "127.0.0.1", &[]).is_ok());
1482        assert!(validate_http_upstream_target(UpstreamScheme::Http, "::1", &[]).is_ok());
1483        assert!(
1484            validate_http_upstream_target(UpstreamScheme::Http, "localhost", &loopback).is_ok()
1485        );
1486    }
1487
1488    #[test]
1489    fn test_validate_http_upstream_target_rejects_unspecified_addresses() {
1490        let unspecified = [SocketAddr::from(([0, 0, 0, 0], 8080))];
1491        let err = validate_http_upstream_target(UpstreamScheme::Http, "0.0.0.0", &[])
1492            .expect_err("unspecified http upstream should be rejected");
1493        assert!(err.contains("loopback addresses"));
1494
1495        let err = validate_http_upstream_target(UpstreamScheme::Http, "localhost", &unspecified)
1496            .expect_err("localhost resolving to unspecified should be rejected");
1497        assert!(err.contains("loopback addresses"));
1498    }
1499
1500    #[test]
1501    fn test_validate_http_upstream_target_rejects_localhost_resolving_non_loopback() {
1502        let poisoned = [SocketAddr::from(([203, 0, 113, 10], 8080))];
1503        let err = validate_http_upstream_target(UpstreamScheme::Http, "localhost", &poisoned)
1504            .expect_err("localhost resolving off-host should be rejected");
1505        assert!(err.contains("refusing insecure http upstream"));
1506    }
1507
1508    #[test]
1509    fn test_format_host_header_uses_port_for_non_default_http() {
1510        assert_eq!(
1511            format_host_header(UpstreamScheme::Http, "localhost", 8080),
1512            "localhost:8080"
1513        );
1514    }
1515
1516    #[test]
1517    fn test_format_host_header_omits_default_https_port() {
1518        assert_eq!(
1519            format_host_header(UpstreamScheme::Https, "api.openai.com", 443),
1520            "api.openai.com"
1521        );
1522    }
1523
1524    #[test]
1525    fn test_format_host_header_brackets_ipv6() {
1526        assert_eq!(
1527            format_host_header(UpstreamScheme::Http, "::1", 8080),
1528            "[::1]:8080"
1529        );
1530    }
1531
1532    // Status-line parsing moved to `crate::forward` along with the upstream
1533    // response-streaming pipeline; coverage continues there.
1534
1535    // ============================================================================
1536    // URL Path Injection Mode Tests
1537    // ============================================================================
1538
1539    #[test]
1540    fn test_validate_phantom_token_in_path_valid() {
1541        let token = Zeroizing::new("session123".to_string());
1542        let path = "/bot/session123/getMe";
1543        let pattern = "/bot/{}/";
1544        assert!(validate_phantom_token_in_path(path, pattern, &token).is_ok());
1545    }
1546
1547    #[test]
1548    fn test_validate_phantom_token_in_path_invalid() {
1549        let token = Zeroizing::new("session123".to_string());
1550        let path = "/bot/wrong_token/getMe";
1551        let pattern = "/bot/{}/";
1552        assert!(validate_phantom_token_in_path(path, pattern, &token).is_err());
1553    }
1554
1555    #[test]
1556    fn test_validate_phantom_token_in_path_missing() {
1557        let token = Zeroizing::new("session123".to_string());
1558        let path = "/api/getMe";
1559        let pattern = "/bot/{}/";
1560        assert!(validate_phantom_token_in_path(path, pattern, &token).is_err());
1561    }
1562
1563    #[test]
1564    fn test_transform_url_path_basic() {
1565        let credential = Zeroizing::new("real_token".to_string());
1566        let path = "/bot/phantom_token/getMe";
1567        let pattern = "/bot/{}/";
1568        let replacement = "/bot/{}/";
1569        let result = transform_url_path(path, pattern, replacement, &credential).unwrap();
1570        assert_eq!(result, "/bot/real_token/getMe");
1571    }
1572
1573    #[test]
1574    fn test_transform_url_path_different_replacement() {
1575        let credential = Zeroizing::new("real_token".to_string());
1576        let path = "/api/v1/phantom_token/chat";
1577        let pattern = "/api/v1/{}/";
1578        let replacement = "/v2/bot/{}/";
1579        let result = transform_url_path(path, pattern, replacement, &credential).unwrap();
1580        assert_eq!(result, "/v2/bot/real_token/chat");
1581    }
1582
1583    #[test]
1584    fn test_transform_url_path_no_trailing_slash() {
1585        let credential = Zeroizing::new("real_token".to_string());
1586        let path = "/bot/phantom_token";
1587        let pattern = "/bot/{}";
1588        let replacement = "/bot/{}";
1589        let result = transform_url_path(path, pattern, replacement, &credential).unwrap();
1590        assert_eq!(result, "/bot/real_token");
1591    }
1592
1593    // ============================================================================
1594    // Query Param Injection Mode Tests
1595    // ============================================================================
1596
1597    #[test]
1598    fn test_validate_phantom_token_in_query_valid() {
1599        let token = Zeroizing::new("session123".to_string());
1600        let path = "/api/data?api_key=session123&other=value";
1601        assert!(validate_phantom_token_in_query(path, "api_key", &token).is_ok());
1602    }
1603
1604    #[test]
1605    fn test_validate_phantom_token_in_query_invalid() {
1606        let token = Zeroizing::new("session123".to_string());
1607        let path = "/api/data?api_key=wrong_token";
1608        assert!(validate_phantom_token_in_query(path, "api_key", &token).is_err());
1609    }
1610
1611    #[test]
1612    fn test_validate_phantom_token_in_query_missing_param() {
1613        let token = Zeroizing::new("session123".to_string());
1614        let path = "/api/data?other=value";
1615        assert!(validate_phantom_token_in_query(path, "api_key", &token).is_err());
1616    }
1617
1618    #[test]
1619    fn test_validate_phantom_token_in_query_no_query_string() {
1620        let token = Zeroizing::new("session123".to_string());
1621        let path = "/api/data";
1622        assert!(validate_phantom_token_in_query(path, "api_key", &token).is_err());
1623    }
1624
1625    #[test]
1626    fn test_validate_phantom_token_in_query_url_encoded() {
1627        let token = Zeroizing::new("token with spaces".to_string());
1628        let path = "/api/data?api_key=token%20with%20spaces";
1629        assert!(validate_phantom_token_in_query(path, "api_key", &token).is_ok());
1630    }
1631
1632    #[test]
1633    fn test_transform_query_param_add_to_no_query() {
1634        let credential = Zeroizing::new("real_key".to_string());
1635        let path = "/api/data";
1636        let result = transform_query_param(path, "api_key", &credential).unwrap();
1637        assert_eq!(result, "/api/data?api_key=real_key");
1638    }
1639
1640    #[test]
1641    fn test_transform_query_param_add_to_existing_query() {
1642        let credential = Zeroizing::new("real_key".to_string());
1643        let path = "/api/data?other=value";
1644        let result = transform_query_param(path, "api_key", &credential).unwrap();
1645        assert_eq!(result, "/api/data?other=value&api_key=real_key");
1646    }
1647
1648    #[test]
1649    fn test_transform_query_param_replace_existing() {
1650        let credential = Zeroizing::new("real_key".to_string());
1651        let path = "/api/data?api_key=phantom&other=value";
1652        let result = transform_query_param(path, "api_key", &credential).unwrap();
1653        assert_eq!(result, "/api/data?api_key=real_key&other=value");
1654    }
1655
1656    #[test]
1657    fn test_transform_query_param_url_encodes_special_chars() {
1658        let credential = Zeroizing::new("key with spaces".to_string());
1659        let path = "/api/data";
1660        let result = transform_query_param(path, "api_key", &credential).unwrap();
1661        assert_eq!(result, "/api/data?api_key=key%20with%20spaces");
1662    }
1663
1664    #[test]
1665    fn test_validate_phantom_token_uses_proxy_mode_over_upstream_mode() {
1666        let token = Zeroizing::new("session123".to_string());
1667        let header = b"Authorization: Bearer session123\r\n\r\n";
1668        let path = "/api/data?api_key=wrong";
1669
1670        // Simulate split config where proxy-side mode is header while upstream
1671        // mode might be query_param.
1672        let result = validate_phantom_token_for_mode(
1673            &InjectMode::Header,
1674            header,
1675            path,
1676            "Authorization",
1677            None,
1678            Some("api_key"),
1679            &token,
1680        );
1681
1682        assert!(result.is_ok());
1683    }
1684
1685    #[test]
1686    fn test_transform_path_uses_upstream_mode_independently() {
1687        let credential = Zeroizing::new("real_key".to_string());
1688        let path = "/api/data?api_key=phantom";
1689
1690        // Simulate split config where upstream mode is query_param.
1691        let transformed = transform_path_for_mode(
1692            &InjectMode::QueryParam,
1693            path,
1694            None,
1695            None,
1696            Some("api_key"),
1697            &credential,
1698        )
1699        .expect("query-param transform should succeed");
1700
1701        assert_eq!(transformed, "/api/data?api_key=real_key");
1702    }
1703
1704    // ========================================================================
1705    // Proxy artifact stripping tests
1706    // ========================================================================
1707
1708    #[test]
1709    fn test_strip_proxy_path_token_basic() {
1710        // Pattern: /{}/  — token is the first path segment
1711        let result = strip_proxy_path_token("/PHANTOM123/api/v1/pods", "/{}/");
1712        assert_eq!(result, "/api/v1/pods");
1713    }
1714
1715    #[test]
1716    fn test_strip_proxy_path_token_nested_pattern() {
1717        // Pattern: /auth/{}/  — token is in a nested segment
1718        let result = strip_proxy_path_token("/auth/PHANTOM123/api/v1/pods", "/auth/{}/");
1719        assert_eq!(result, "/api/v1/pods");
1720    }
1721
1722    #[test]
1723    fn test_strip_proxy_path_token_no_trailing_slash() {
1724        // Pattern: /{}  — token at end of path with no trailing content
1725        let result = strip_proxy_path_token("/PHANTOM123", "/{}");
1726        assert_eq!(result, "/");
1727    }
1728
1729    #[test]
1730    fn test_strip_proxy_path_token_preserves_query() {
1731        // Pattern: /{}/  — should preserve query string after stripping
1732        let result = strip_proxy_path_token("/PHANTOM123/api?limit=10", "/{}/");
1733        assert_eq!(result, "/api?limit=10");
1734    }
1735
1736    #[test]
1737    fn test_strip_proxy_path_token_no_match() {
1738        // Pattern doesn't match — return path unchanged
1739        let result = strip_proxy_path_token("/api/v1/pods", "/auth/{}/");
1740        assert_eq!(result, "/api/v1/pods");
1741    }
1742
1743    #[test]
1744    fn test_strip_proxy_path_token_mid_path_slash_join() {
1745        // Token in the middle: before="/api" after="data" must join with "/"
1746        let result = strip_proxy_path_token("/api/k8s/PHANTOM/data", "/k8s/{}/");
1747        assert_eq!(result, "/api/data");
1748    }
1749
1750    #[test]
1751    fn test_strip_proxy_path_token_no_double_slash() {
1752        // Before ends with "/" and after starts with "/" — collapse to one
1753        let result = strip_proxy_path_token("/prefix/PHANTOM//suffix", "/prefix/{}/");
1754        assert_eq!(result, "/suffix");
1755    }
1756
1757    #[test]
1758    fn test_strip_proxy_query_param_only_param() {
1759        let result = strip_proxy_query_param("/api/v1/pods?token=PHANTOM123", "token");
1760        assert_eq!(result, "/api/v1/pods");
1761    }
1762
1763    #[test]
1764    fn test_strip_proxy_query_param_with_other_params() {
1765        let result = strip_proxy_query_param("/api/v1/pods?token=PHANTOM123&limit=10", "token");
1766        assert_eq!(result, "/api/v1/pods?limit=10");
1767    }
1768
1769    #[test]
1770    fn test_strip_proxy_query_param_middle() {
1771        let result =
1772            strip_proxy_query_param("/api/v1/pods?limit=10&token=PHANTOM123&watch=true", "token");
1773        assert_eq!(result, "/api/v1/pods?limit=10&watch=true");
1774    }
1775
1776    #[test]
1777    fn test_strip_proxy_query_param_no_match() {
1778        let result = strip_proxy_query_param("/api/v1/pods?limit=10", "token");
1779        assert_eq!(result, "/api/v1/pods?limit=10");
1780    }
1781
1782    #[test]
1783    fn test_strip_proxy_query_param_no_query_string() {
1784        let result = strip_proxy_query_param("/api/v1/pods", "token");
1785        assert_eq!(result, "/api/v1/pods");
1786    }
1787
1788    #[test]
1789    fn test_strip_proxy_artifacts_same_mode_noop() {
1790        // When proxy and upstream use the same mode, no stripping (upstream transform handles it)
1791        let path = "/PHANTOM123/api/v1/pods";
1792        let result = strip_proxy_artifacts(
1793            path,
1794            &InjectMode::UrlPath,
1795            &InjectMode::UrlPath,
1796            Some("/{}/"),
1797            None,
1798        );
1799        assert_eq!(result, path);
1800    }
1801
1802    #[test]
1803    fn test_strip_proxy_artifacts_url_path_to_header() {
1804        // Proxy uses url_path, upstream uses header — must strip path token
1805        let result = strip_proxy_artifacts(
1806            "/PHANTOM123/api/v1/pods",
1807            &InjectMode::UrlPath,
1808            &InjectMode::Header,
1809            Some("/{}/"),
1810            None,
1811        );
1812        assert_eq!(result, "/api/v1/pods");
1813    }
1814
1815    #[test]
1816    fn test_strip_proxy_artifacts_query_param_to_header() {
1817        // Proxy uses query_param, upstream uses header — must strip query param
1818        let result = strip_proxy_artifacts(
1819            "/api/v1/pods?token=PHANTOM123",
1820            &InjectMode::QueryParam,
1821            &InjectMode::Header,
1822            None,
1823            Some("token"),
1824        );
1825        assert_eq!(result, "/api/v1/pods");
1826    }
1827
1828    #[test]
1829    fn test_strip_proxy_artifacts_header_to_query_param() {
1830        // Proxy uses header, upstream uses query_param — no URL artifacts to strip
1831        let path = "/api/v1/pods";
1832        let result = strip_proxy_artifacts(
1833            path,
1834            &InjectMode::Header,
1835            &InjectMode::QueryParam,
1836            None,
1837            None,
1838        );
1839        assert_eq!(result, path);
1840    }
1841
1842    #[test]
1843    fn test_end_to_end_url_path_proxy_header_upstream() {
1844        // Full flow: proxy validates via url_path, upstream injects via header.
1845        // The path token must be stripped before forwarding.
1846        let token = Zeroizing::new("session456".to_string());
1847        let credential = Zeroizing::new("real_bearer_token".to_string());
1848        let path = "/session456/api/v1/namespaces";
1849
1850        // 1. Proxy-side validation succeeds
1851        assert!(
1852            validate_phantom_token_for_mode(
1853                &InjectMode::UrlPath,
1854                b"\r\n\r\n", // no auth header needed for url_path mode
1855                path,
1856                "Authorization",
1857                Some("/{}/"),
1858                None,
1859                &token,
1860            )
1861            .is_ok()
1862        );
1863
1864        // 2. Strip proxy artifacts
1865        let cleaned = strip_proxy_artifacts(
1866            path,
1867            &InjectMode::UrlPath,
1868            &InjectMode::Header,
1869            Some("/{}/"),
1870            None,
1871        );
1872        assert_eq!(cleaned, "/api/v1/namespaces");
1873
1874        // 3. Upstream transform (header mode = no path change)
1875        let transformed =
1876            transform_path_for_mode(&InjectMode::Header, &cleaned, None, None, None, &credential)
1877                .unwrap();
1878        assert_eq!(transformed, "/api/v1/namespaces");
1879    }
1880
1881    #[test]
1882    fn test_end_to_end_query_param_proxy_header_upstream() {
1883        // Full flow: proxy validates via query_param, upstream injects via header.
1884        let token = Zeroizing::new("session789".to_string());
1885        let credential = Zeroizing::new("real_bearer_token".to_string());
1886        let path = "/api/v1/pods?token=session789&limit=100";
1887
1888        // 1. Proxy-side validation succeeds
1889        assert!(
1890            validate_phantom_token_for_mode(
1891                &InjectMode::QueryParam,
1892                b"\r\n\r\n",
1893                path,
1894                "Authorization",
1895                None,
1896                Some("token"),
1897                &token,
1898            )
1899            .is_ok()
1900        );
1901
1902        // 2. Strip proxy artifacts
1903        let cleaned = strip_proxy_artifacts(
1904            path,
1905            &InjectMode::QueryParam,
1906            &InjectMode::Header,
1907            None,
1908            Some("token"),
1909        );
1910        assert_eq!(cleaned, "/api/v1/pods?limit=100");
1911
1912        // 3. Upstream transform (header mode = no path change)
1913        let transformed =
1914            transform_path_for_mode(&InjectMode::Header, &cleaned, None, None, None, &credential)
1915                .unwrap();
1916        assert_eq!(transformed, "/api/v1/pods?limit=100");
1917    }
1918}