Skip to main content

osproxy_server/
handler.rs

1//! The ingress handler: authenticates the caller, builds a request context, and
2//! drives the engine pipeline, mapping the outcome to an HTTP response.
3//
4// JUSTIFY(file-length): the single ingress-orchestration point, pre-auth
5// introspection routing, the TLS and auth/authz gates, and data-plane dispatch
6// are one cohesive flow over the handler's private state; splitting it would
7// force those fields pub(crate) and scatter the request lifecycle across files.
8
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
12use osproxy_core::{Clock, EndpointKind, ErrorCode, RequestId};
13use osproxy_engine::{Pipeline, PipelineResponse, RequestError};
14use osproxy_observe::{
15    decode_directive_set, DirectiveStore, InMemoryDirectiveStore, Metrics, PoolSnapshot,
16};
17use osproxy_sink::OpenSearchSink;
18use osproxy_spi::{
19    Action, AuthError, Authenticator, Authorizer, ClientCredentials, HeaderView, HttpMethod,
20    Principal, RequestCtx,
21};
22use osproxy_tenancy::TenancyRouter;
23use osproxy_transport::{
24    Incoming, IngressHandler, IngressRequest, IngressResponse, StreamingResponse,
25};
26
27use crate::auth::AllowAllAuthorizer;
28use crate::forward_headers::ForwardPolicy;
29use crate::log::{NoLog, RequestLog};
30use crate::tenancy::ReferenceTenancy;
31use osproxy_capture::{Capture, CaptureRecord, NoCapture};
32
/// The privileged fleet-directive admin channel: a shared store to publish into,
/// gated by a bearer token, with a clock to resolve relative TTLs.
///
/// Present on the handler only after `with_directive_admin`; while it is absent
/// both `/admin/directives` handlers answer `not_enabled`.
struct DirectiveAdmin {
    /// Shared store the POST handler publishes into and the GET handler loads from.
    store: Arc<InMemoryDirectiveStore>,
    /// Bearer token every `/admin/directives` request is matched against.
    token: String,
    /// Clock passed to directive decoding and to the store's introspection view.
    clock: Arc<dyn Clock>,
}
40
/// The concrete pipeline this binary serves: the engine [`Pipeline`] instantiated
/// with a [`TenancyRouter`] over [`ReferenceTenancy`] and an [`OpenSearchSink`].
pub type AppPipeline = Pipeline<TenancyRouter<ReferenceTenancy>, OpenSearchSink>;
43
/// Adapts the engine pipeline to the transport's [`IngressHandler`] contract,
/// authenticating each request with the configured [`Authenticator`] and, after
/// authentication, authorizing it with the configured [`Authorizer`] (default
/// [`AllowAllAuthorizer`], no second policy layer until one is supplied).
pub struct AppHandler<A, Z = AllowAllAuthorizer> {
    /// The engine pipeline data-plane requests are dispatched to; also the source
    /// of explain documents, the break-glass tape, and upstream pool stats.
    pipeline: AppPipeline,
    /// Resolves a [`Principal`] from the request's credentials.
    authenticator: A,
    /// Decides, once a principal is resolved, whether it may perform the action.
    authorizer: Z,
    /// Monotonic counter behind the `req-{n}` correlation ids.
    request_seq: AtomicU64,
    /// Structured per-request logger; `NoLog` until `with_request_log` replaces it.
    request_log: Box<dyn RequestLog>,
    /// The `/admin/directives` channel; `None` (reported as `not_enabled`) until
    /// `with_directive_admin` configures it.
    directive_admin: Option<DirectiveAdmin>,
    /// Request tallies served, together with pool counters, by `/metrics`.
    metrics: Metrics,
    /// When true (default), a body-mutating request over cleartext is refused
    /// (NFR-S1), the proxy must terminate TLS to rewrite the stream. An operator
    /// on a trusted network can opt out.
    require_tls_for_mutation: bool,
    /// When true (default), the pre-auth `/debug/explain` and `/debug/breakglass`
    /// surfaces are served. They are shape-only, but still expose operational
    /// metadata to anyone who can reach the port, so production deployments turn
    /// them off; disabled, both report `not_enabled` (`/metrics` stays on).
    debug_endpoints: bool,
    /// Full-fidelity traffic capture (off by default). When enabled, each
    /// forwarded data-plane exchange is teed to this sink for replay/audit. Unlike
    /// the shape-only telemetry, the records carry bodies and values, so capture
    /// is deliberate and the stream is privileged (`capture` module).
    capture: Box<dyn Capture>,
    /// Which client headers to relay verbatim to the upstream. Default pass-all
    /// (sidecar trust), minus the mandatory hop-by-hop/framing set. Computed from
    /// the *raw* request headers (so the client `Authorization` is available),
    /// separate from the auth-stripped view the pipeline routes on.
    forward_policy: ForwardPolicy,
}
76
77impl<A, Z> std::fmt::Debug for AppHandler<A, Z> {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        // The injected logger is not `Debug`; show whether it is enabled.
80        f.debug_struct("AppHandler")
81            .field("logging", &self.request_log.enabled())
82            .finish_non_exhaustive()
83    }
84}
85
86impl<A: Authenticator> AppHandler<A, AllowAllAuthorizer> {
87    /// Wraps a pipeline and an authenticator (no request logging by default, and
88    /// the allow-all authorizer until [`Self::with_authorizer`] supplies one).
89    #[must_use]
90    pub fn new(pipeline: AppPipeline, authenticator: A) -> Self {
91        Self {
92            pipeline,
93            authenticator,
94            authorizer: AllowAllAuthorizer,
95            request_seq: AtomicU64::new(0),
96            request_log: Box::new(NoLog),
97            directive_admin: None,
98            metrics: Metrics::new(),
99            require_tls_for_mutation: true,
100            debug_endpoints: true,
101            capture: Box::new(NoCapture),
102            forward_policy: ForwardPolicy::pass_all(),
103        }
104    }
105}
106
107impl<A: Authenticator, Z: Authorizer> AppHandler<A, Z> {
108    /// Sets the post-authentication [`Authorizer`] (builder style). Replaces the
109    /// default allow-all policy; the principal is already resolved, so the
110    /// authorizer decides only whether that principal may perform the action.
111    #[must_use]
112    pub fn with_authorizer<Z2: Authorizer>(self, authorizer: Z2) -> AppHandler<A, Z2> {
113        AppHandler {
114            pipeline: self.pipeline,
115            authenticator: self.authenticator,
116            authorizer,
117            request_seq: self.request_seq,
118            request_log: self.request_log,
119            directive_admin: self.directive_admin,
120            metrics: self.metrics,
121            require_tls_for_mutation: self.require_tls_for_mutation,
122            debug_endpoints: self.debug_endpoints,
123            capture: self.capture,
124            forward_policy: self.forward_policy,
125        }
126    }
127
128    /// Sets the client-to-upstream header forwarding policy (builder style).
129    /// Default pass-all (sidecar trust). Restrict it to keep specific headers
130    /// (e.g. `authorization`) off the cluster, or disable forwarding entirely.
131    #[must_use]
132    pub fn with_forward_policy(mut self, policy: ForwardPolicy) -> Self {
133        self.forward_policy = policy;
134        self
135    }
136
137    /// Sets the full-fidelity traffic capture (builder style). Off by default.
138    /// Compose redaction with `capture::RedactingCapture`; the stream carries
139    /// bodies and values, so treat it as privileged.
140    #[must_use]
141    pub fn with_capture(mut self, capture: Box<dyn Capture>) -> Self {
142        self.capture = capture;
143        self
144    }
145
146    /// Sets whether the pre-auth `/debug/explain` and `/debug/breakglass`
147    /// surfaces are served (builder style). Default `true`; set `false` in
148    /// production so operational metadata is not exposed unauthenticated.
149    #[must_use]
150    pub fn with_debug_endpoints(mut self, enabled: bool) -> Self {
151        self.debug_endpoints = enabled;
152        self
153    }
154
155    /// Sets whether body-mutating requests are refused over cleartext (NFR-S1).
156    /// Builder style; default `true` (enforce). Pass `false` only on a trusted
157    /// network where the operator accepts mutating over cleartext.
158    #[must_use]
159    pub fn with_require_tls_for_mutation(mut self, require: bool) -> Self {
160        self.require_tls_for_mutation = require;
161        self
162    }
163
164    /// The pipeline this handler serves, a read-only accessor for introspection
165    /// (e.g. the perf harness reading upstream `pool_stats` after a load run).
166    #[must_use]
167    pub fn pipeline(&self) -> &AppPipeline {
168        &self.pipeline
169    }
170
171    /// Builds the shape-only `/metrics` snapshot JSON: request tallies plus every
172    /// configured cluster's upstream pool-reuse counters. No tenant data, so it is
173    /// always safe to expose.
174    fn metrics_snapshot(&self) -> String {
175        let pools = self
176            .pipeline
177            .sink()
178            .pool_stats_all()
179            .into_iter()
180            .map(|(id, s)| PoolSnapshot {
181                cluster: id.as_str().to_owned(),
182                opened: s.opened,
183                dispatched: s.dispatched,
184                reused: s.reused(),
185            })
186            .collect();
187        self.metrics.snapshot(pools).to_json()
188    }
189
190    /// Sets the structured per-request logger (builder style). Default: no logs.
191    #[must_use]
192    pub fn with_request_log(mut self, request_log: Box<dyn RequestLog>) -> Self {
193        self.request_log = request_log;
194        self
195    }
196
197    /// Enables the `POST /admin/directives` channel (builder style): publishes a
198    /// fleet directive set into `store` when the request carries the bearer
199    /// `token`. Without this, the endpoint reports `not_enabled`.
200    #[must_use]
201    pub fn with_directive_admin(
202        mut self,
203        store: Arc<InMemoryDirectiveStore>,
204        token: String,
205        clock: Arc<dyn Clock>,
206    ) -> Self {
207        self.directive_admin = Some(DirectiveAdmin {
208            store,
209            token,
210            clock,
211        });
212        self
213    }
214
215    /// A per-request correlation id. A monotonic counter is enough for the
216    /// reference binary; a real deployment would carry a propagated trace id.
217    fn next_request_id(&self) -> RequestId {
218        let n = self.request_seq.fetch_add(1, Ordering::Relaxed) + 1;
219        RequestId::from(format!("req-{n}").as_str())
220    }
221
222    /// Handles `POST /admin/directives`: publishes a fleet directive set into the
223    /// shared store when enabled and the bearer token matches. Fail-closed at
224    /// every step, disabled, wrong method, bad token, or malformed body all leave
225    /// the active set unchanged.
226    fn publish_directives(&self, req: &IngressRequest) -> IngressResponse {
227        let Some(admin) = &self.directive_admin else {
228            return IngressResponse::json(404, br#"{"error":"not_enabled"}"#.to_vec());
229        };
230        if req.method != HttpMethod::Post {
231            return IngressResponse::json(405, br#"{"error":"method_not_allowed"}"#.to_vec());
232        }
233        // Publishing a fleet directive set is a privileged mutation carrying a
234        // bearer token; refuse it over cleartext (same NFR-S1 stance as the data
235        // plane) so the token is never exposed on the wire. The introspection
236        // routes short-circuit before the data-plane TLS gate, so enforce it here.
237        if self.require_tls_for_mutation && !req.secure {
238            return IngressResponse::json(403, br#"{"error":"tls_required"}"#.to_vec());
239        }
240        if !crate::bearer::matches(&req.headers, &admin.token) {
241            return IngressResponse::json(401, br#"{"error":"unauthorized"}"#.to_vec());
242        }
243        match decode_directive_set(&req.body, admin.clock.as_ref()) {
244            Ok(set) => {
245                let count = set.len();
246                admin.store.publish(set);
247                IngressResponse::json(200, format!(r#"{{"published":{count}}}"#).into_bytes())
248            }
249            Err(reason) => {
250                IngressResponse::json(400, format!(r#"{{"error":"{reason}"}}"#).into_bytes())
251            }
252        }
253    }
254
255    /// The pre-auth introspection and control-plane surfaces, in one place: the
256    /// shape-only `/debug/*` tools, the always-on `/metrics` snapshot, and the
257    /// token-gated `/admin/directives` (GET reads, POST publishes). Returns `Some`
258    /// when `req` targets one of them, else `None` (the request is data plane).
259    fn introspection_route(&self, req: &IngressRequest) -> Option<IngressResponse> {
260        // /debug/*: the shape-only diagnostics surfaces, served only when enabled
261        // (off in production so operational metadata is not exposed unauthenticated).
262        // Disabled, they report `not_enabled` rather than 404, to distinguish "turned
263        // off here" from "no such route".
264        if req.path.starts_with("/debug/") {
265            if !self.debug_endpoints {
266                return Some(IngressResponse::json(
267                    404,
268                    br#"{"error":"not_enabled"}"#.to_vec(),
269                ));
270            }
271            // /debug/explain/{id}: the shape-only causal trace for one request.
272            if let Some(id) = req.path.strip_prefix("/debug/explain/") {
273                return Some(match self.pipeline.explain(&RequestId::from(id)) {
274                    Some(doc) => IngressResponse::json(200, doc.to_string().into_bytes()),
275                    None => {
276                        IngressResponse::json(404, br#"{"error":"unknown_request_id"}"#.to_vec())
277                    }
278                });
279            }
280            // /debug/breakglass: the forensic tape captured under a ring_buffer
281            // directive (`docs/05` §5), oldest first. Shape-only like the explain doc.
282            if req.path == "/debug/breakglass" {
283                let tape = serde_json::Value::Array(self.pipeline.break_glass().snapshot());
284                return Some(IngressResponse::json(200, tape.to_string().into_bytes()));
285            }
286        }
287        // /metrics: the always-on, prod-safe operational snapshot (shape-only
288        // counts/rates/cluster ids, so no auth; see `metrics_snapshot`).
289        if req.path == "/metrics" {
290            return Some(IngressResponse::json(
291                200,
292                self.metrics_snapshot().into_bytes(),
293            ));
294        }
295        // /admin/directives: privileged control-plane settings, GET introspects
296        // what this instance applies, POST publishes a new set; both token-gated
297        // and fail-closed (a forged token reveals/changes nothing, `docs/05` §3).
298        if req.path == "/admin/directives" {
299            return Some(match req.method {
300                HttpMethod::Get => self.introspect_directives(req),
301                _ => self.publish_directives(req),
302            });
303        }
304        None
305    }
306
307    /// Handles `GET /admin/directives`: returns the control-plane settings this
308    /// instance is currently applying, the read side of the directive store, so
309    /// an agent can see what is in effect (per instance; the replicating store
310    /// keeps the fleet consistent). Token-gated like the publish path (the
311    /// targeting selectors are operator config) and fail-closed: disabled or a bad
312    /// token reveals nothing.
313    fn introspect_directives(&self, req: &IngressRequest) -> IngressResponse {
314        let Some(admin) = &self.directive_admin else {
315            return IngressResponse::json(404, br#"{"error":"not_enabled"}"#.to_vec());
316        };
317        if !crate::bearer::matches(&req.headers, &admin.token) {
318            return IngressResponse::json(401, br#"{"error":"unauthorized"}"#.to_vec());
319        }
320        let view = admin.store.load().introspect(admin.clock.now());
321        IngressResponse::json(200, view.to_string().into_bytes())
322    }
323}
324
325impl<A: Authenticator, Z: Authorizer> AppHandler<A, Z> {
326    /// The shared pre-dispatch gate for both the buffered and streamed paths:
327    /// refuse mutation over cleartext (NFR-S1), authenticate (the bearer token is
328    /// consumed here, never reaching the pipeline or telemetry), and authorize.
329    /// Returns the resolved principal, or the error response to return verbatim.
330    async fn gate(
331        &self,
332        req: &IngressRequest,
333        request_id: &RequestId,
334    ) -> Result<Principal, IngressResponse> {
335        if self.require_tls_for_mutation && req.endpoint.is_tenancy_aware() && !req.secure {
336            return Err(
337                IngressResponse::json(403, br#"{"error":"tls_required"}"#.to_vec())
338                    .with_header("x-request-id", request_id.as_str()),
339            );
340        }
341        let principal = self
342            .authenticator
343            .authenticate(&credentials_from(req))
344            .await
345            .map_err(|err| {
346                IngressResponse::json(err.http_status(), auth_error_body(&err))
347                    .with_header("x-request-id", request_id.as_str())
348            })?;
349        let action = Action {
350            endpoint: req.endpoint,
351            logical_index: req.logical_index.clone(),
352        };
353        self.authorizer
354            .authorize(&principal, &action)
355            .await
356            .map_err(|err| {
357                IngressResponse::json(err.http_status(), auth_error_body(&err))
358                    .with_header("x-request-id", request_id.as_str())
359            })?;
360        Ok(principal)
361    }
362
363    /// Maps a streamed pipeline outcome to a response, tallying side effects, the
364    /// shared tail of the streaming forward and bulk paths.
365    fn finish_streamed(
366        &self,
367        req: &IngressRequest,
368        request_id: &RequestId,
369        result: Result<PipelineResponse, RequestError>,
370        should_capture: bool,
371    ) -> IngressResponse {
372        let (response, ok) = match result {
373            Ok(resp) => {
374                let ok = (200..300).contains(&resp.status);
375                (ingress_from(resp), ok)
376            }
377            Err(err) => (
378                IngressResponse::json(status_for(&err), error_body(&err)),
379                false,
380            ),
381        };
382        self.after_response(req, &response, request_id, ok, should_capture);
383        response.with_header("x-request-id", request_id.as_str())
384    }
385}
386
387impl<A: Authenticator, Z: Authorizer> IngressHandler for AppHandler<A, Z> {
388    async fn handle(&self, req: IngressRequest) -> IngressResponse {
389        let request_id = self.next_request_id();
390
391        // Introspection + admin surfaces short-circuit before auth; the data plane
392        // continues below.
393        if let Some(resp) = self.introspection_route(&req) {
394            return resp;
395        }
396
397        let principal = match self.gate(&req, &request_id).await {
398            Ok(principal) => principal,
399            Err(resp) => return resp,
400        };
401
402        // The credentials were consumed above; strip the `Authorization` header so
403        // the bearer token never reaches the pipeline, observability, or logs. The
404        // partition header, `traceparent`, and `x-debug-directive` are preserved,
405        // the engine still needs them.
406        let safe_headers = crate::bearer::without_authorization(&req.headers);
407        // The forwarded set is computed from the *raw* headers (so the client
408        // `Authorization` is available per the pass-all default), independent of
409        // the auth-stripped view the pipeline routes on.
410        let forward = self.forward_policy.forward_set(&req.headers);
411        let ctx = build_ctx(&req, &principal, &request_id, &safe_headers, &forward);
412
413        // Echo the request id so a client (or LLM) can fetch its
414        // /debug/explain/{id} afterward. `should_capture` is the live per-request
415        // capture decision, applied to both success and error responses.
416        let (result, should_capture) = self.pipeline.handle_with_capture(&ctx).await;
417        let (response, ok) = match result {
418            Ok(resp) => {
419                let ok = (200..300).contains(&resp.status);
420                (ingress_from(resp), ok)
421            }
422            Err(err) => (
423                IngressResponse::json(status_for(&err), error_body(&err)),
424                false,
425            ),
426        };
427        self.after_response(&req, &response, &request_id, ok, should_capture);
428        response.with_header("x-request-id", request_id.as_str())
429    }
430
431    fn forward_plan(&self, path: &str, logical_index: &str) -> bool {
432        // Full-fidelity capture tees the raw exchange, which needs the body in
433        // memory, so when capture is wired, buffer (take the `handle` path) rather
434        // than stream. Streaming and capture are mutually exclusive by nature.
435        if self.capture.enabled() {
436            return false;
437        }
438        // Never stream-forward the proxy-internal surfaces, they are served
439        // pre-auth in `handle` and must not be forwarded to a cluster, even under
440        // a whole-instance passthrough policy (which matches every index).
441        if path.starts_with("/debug/") || path == "/metrics" || path == "/admin/directives" {
442            return false;
443        }
444        self.pipeline.is_passthrough(logical_index)
445    }
446
447    async fn handle_forward(&self, req: IngressRequest, body: Incoming) -> StreamingResponse {
448        let request_id = self.next_request_id();
449        // `forward_plan` already excluded the introspection routes; apply the same
450        // TLS + auth + authz gate as the buffered path before forwarding.
451        let principal = match self.gate(&req, &request_id).await {
452            Ok(principal) => principal,
453            // The gate's refusal is a small buffered error; carry it as a streaming
454            // response so both arms share one return type.
455            Err(resp) => return to_streaming(resp),
456        };
457
458        let safe_headers = crate::bearer::without_authorization(&req.headers);
459        // The forwarded set is computed from the *raw* headers (so the client
460        // `Authorization` is available per the pass-all default), independent of
461        // the auth-stripped view the pipeline routes on.
462        let forward = self.forward_policy.forward_set(&req.headers);
463        // The body is the streamed `body` argument, not `req.body` (empty here).
464        let ctx = build_ctx(&req, &principal, &request_id, &safe_headers, &forward);
465
466        // Both directions stream: the request body pipes upstream, the upstream
467        // response pipes back, neither buffered.
468        let upstream = osproxy_sink::stream_body(body);
469        let (result, _capture) = self.pipeline.forward_streamed(&ctx, upstream).await;
470        let response = match result {
471            Ok(forward) => {
472                self.after_streamed(&request_id, (200..300).contains(&forward.status));
473                let mut response = StreamingResponse::stream(forward.status, forward.body);
474                // Carry the upstream content type so a non-JSON verbatim forward
475                // (e.g. a passed-through `_cat`) is not relabeled `application/json`.
476                if let Some(content_type) = forward.content_type {
477                    response = response.with_header("content-type", content_type);
478                }
479                response
480            }
481            Err(err) => {
482                self.after_streamed(&request_id, false);
483                StreamingResponse::buffered(status_for(&err), error_body(&err))
484            }
485        };
486        response.with_header("x-request-id", request_id.as_str())
487    }
488
489    fn wants_search_stream(&self, endpoint: EndpointKind, query: Option<&str>) -> bool {
490        // Capture must tee the buffered response, so streaming and capture are
491        // mutually exclusive. A scroll-opening search keeps the buffered path: its
492        // `_scroll_id` affinity wrap needs the whole response body. A PIT-pinned
493        // search is detected from the body inside the engine, which falls back to
494        // buffered there.
495        endpoint == EndpointKind::Search && !self.capture.enabled() && !opens_scroll(query)
496    }
497
498    async fn handle_search_stream(&self, req: IngressRequest) -> StreamingResponse {
499        let request_id = self.next_request_id();
500        // `wants_search_stream` does not gate; apply the same TLS + auth + authz
501        // gate as the buffered path before dispatching.
502        let principal = match self.gate(&req, &request_id).await {
503            Ok(principal) => principal,
504            Err(resp) => return to_streaming(resp),
505        };
506        let safe_headers = crate::bearer::without_authorization(&req.headers);
507        // The forwarded set is computed from the *raw* headers (so the client
508        // `Authorization` is available per the pass-all default), independent of
509        // the auth-stripped view the pipeline routes on.
510        let forward = self.forward_policy.forward_set(&req.headers);
511        // Unlike the forward/bulk streams, the search query body *is* needed and is
512        // the buffered `req.body`; only the response streams.
513        let ctx = build_ctx(&req, &principal, &request_id, &safe_headers, &forward);
514
515        let (result, _capture) = self.pipeline.search_streamed(&ctx).await;
516        let response = match result {
517            Ok(search) => {
518                self.after_streamed(&request_id, (200..300).contains(&search.status));
519                StreamingResponse::stream(search.status, search.body)
520            }
521            Err(err) => {
522                self.after_streamed(&request_id, false);
523                StreamingResponse::buffered(status_for(&err), error_body(&err))
524            }
525        };
526        response.with_header("x-request-id", request_id.as_str())
527    }
528
529    fn wants_bulk_stream(&self, endpoint: EndpointKind, headers: &[(String, String)]) -> bool {
530        // Capture must tee the buffered body, so streaming and capture are mutually
531        // exclusive; only sync `_bulk` streams (async fan-out keeps the buffered
532        // path, which enqueues per item).
533        endpoint == EndpointKind::IngestBulk
534            && !self.capture.enabled()
535            && self.pipeline.is_sync_write(headers)
536    }
537
538    async fn handle_bulk_stream(&self, req: IngressRequest, body: Incoming) -> IngressResponse {
539        let request_id = self.next_request_id();
540        let principal = match self.gate(&req, &request_id).await {
541            Ok(principal) => principal,
542            Err(resp) => return resp,
543        };
544        let safe_headers = crate::bearer::without_authorization(&req.headers);
545        // The forwarded set is computed from the *raw* headers (so the client
546        // `Authorization` is available per the pass-all default), independent of
547        // the auth-stripped view the pipeline routes on.
548        let forward = self.forward_policy.forward_set(&req.headers);
549        // The body is the streamed NDJSON batch, not `req.body` (empty here).
550        let ctx = build_ctx(&req, &principal, &request_id, &safe_headers, &forward);
551
552        let stream = osproxy_sink::stream_body(body);
553        let (result, should_capture) = self.pipeline.handle_bulk_streamed(&ctx, stream).await;
554        self.finish_streamed(&req, &request_id, result, should_capture)
555    }
556}
557
558impl<A, Z> AppHandler<A, Z> {
559    /// Post-response side effects: tally metrics (shape-only), emit the structured
560    /// log (opt-in), and tee the full-fidelity capture (opt-in).
561    fn after_response(
562        &self,
563        req: &IngressRequest,
564        response: &IngressResponse,
565        request_id: &RequestId,
566        ok: bool,
567        should_capture: bool,
568    ) {
569        self.metrics.record(ok);
570        // The structured log is the shape-only explain document, which carries the
571        // request's trace_id, so logs join the trace/spans.
572        if self.request_log.enabled() {
573            if let Some(record) = self.pipeline.explain(request_id) {
574                self.request_log.emit(&record);
575            }
576        }
577        self.tee_capture(req, response, request_id, should_capture);
578    }
579
580    /// Post-response side effects for a **streamed** response (no body retained):
581    /// tally metrics and emit the structured log. Capture is never available on a
582    /// streamed path (there is no buffered body to tee), so it is not attempted.
583    fn after_streamed(&self, request_id: &RequestId, ok: bool) {
584        self.metrics.record(ok);
585        if self.request_log.enabled() {
586            if let Some(record) = self.pipeline.explain(request_id) {
587                self.request_log.emit(&record);
588            }
589        }
590    }
591
592    /// Full-fidelity capture: tee the raw exchange for replay/audit when a capture
593    /// sink is wired *and* `should_capture` (the live directive decision) selected
594    /// this request, so capture is on demand, not whenever a sink exists. The
595    /// original request headers pass through; redaction (e.g. dropping
596    /// `Authorization`) is composed via `RedactingCapture`.
597    fn tee_capture(
598        &self,
599        req: &IngressRequest,
600        response: &IngressResponse,
601        request_id: &RequestId,
602        should_capture: bool,
603    ) {
604        if !should_capture || !self.capture.enabled() {
605            return;
606        }
607        self.capture.capture(&CaptureRecord {
608            request_id: request_id.as_str(),
609            method: req.method,
610            path: &req.path,
611            query: req.query.as_deref(),
612            headers: &req.headers,
613            body: &req.body,
614            response_status: response.status,
615            response_body: &response.body,
616        });
617    }
618}
619
/// Whether the query string opens a scroll, i.e. has a `scroll` parameter, with
/// or without a value. Such a search returns a `_scroll_id` that must be
/// affinity-wrapped against the whole response body, so it keeps the buffered
/// path rather than streaming. A missing query never opens a scroll.
fn opens_scroll(query: Option<&str>) -> bool {
    let Some(raw) = query else {
        return false;
    };
    for pair in raw.split('&') {
        // The key is everything before the first `=`, or the whole pair when
        // there is no `=`.
        let key = match pair.split_once('=') {
            Some((key, _value)) => key,
            None => pair,
        };
        if key == "scroll" {
            return true;
        }
    }
    false
}
629
630/// Carries a small buffered [`IngressResponse`] (e.g. an auth refusal) as a
631/// [`StreamingResponse`], preserving its status and headers, so the streamed
632/// forward path has one return type for both the gate refusal and the stream.
633fn to_streaming(resp: IngressResponse) -> StreamingResponse {
634    let mut streaming = StreamingResponse::buffered(resp.status, resp.body);
635    streaming.headers = resp.headers;
636    streaming
637}
638
639/// Builds the engine [`RequestCtx`] from an authenticated request, the one place
640/// the four data-plane entry points (`handle`, `handle_forward`,
641/// `handle_search_stream`, `handle_bulk_stream`) share, so they cannot drift in
642/// which fields they wire. The borrows (`principal`, `request_id`, `safe_headers`)
643/// are caller-owned locals; the body always rides as `req.body` (empty on the
644/// streamed paths, where the real body travels beside the ctx).
645fn build_ctx<'a>(
646    req: &'a IngressRequest,
647    principal: &'a Principal,
648    request_id: &'a RequestId,
649    safe_headers: &'a [(String, String)],
650    forward_headers: &'a [(String, String)],
651) -> RequestCtx<'a> {
652    RequestCtx::new(
653        principal,
654        request_id,
655        req.method,
656        req.endpoint,
657        req.protocol,
658        &req.logical_index,
659        HeaderView::new(safe_headers),
660        &req.body,
661    )
662    .with_doc_id(req.doc_id.as_deref())
663    .with_query(req.query.as_deref())
664    .with_path(&req.path)
665    .with_forward_headers(forward_headers)
666}
667
668/// Builds the ingress response from a pipeline response, carrying its content type
669/// when set, so a verbatim admin/passthrough body (e.g. `_cat` `text/plain`) is
670/// not mislabeled `application/json`. A shaped response leaves it `None` and the
671/// transport defaults to JSON.
672fn ingress_from(resp: PipelineResponse) -> IngressResponse {
673    let out = IngressResponse::json(resp.status, resp.body);
674    match resp.content_type {
675        Some(content_type) => out.with_header("content-type", content_type),
676        None => out,
677    }
678}
679
680/// Extracts client credentials from a request: a bearer token from
681/// `Authorization` and the verified mTLS client-certificate identity, if any.
682fn credentials_from(req: &IngressRequest) -> ClientCredentials {
683    ClientCredentials {
684        bearer_token: crate::bearer::parse(&req.headers).map(str::to_owned),
685        client_cert_subject: req.client_cert_subject.clone(),
686    }
687}
688
689/// A value-free JSON body for an auth failure.
690fn auth_error_body(err: &AuthError) -> Vec<u8> {
691    format!(r#"{{"error":"{}"}}"#, err.code().as_slug()).into_bytes()
692}
693
694/// Maps a request-path error to an HTTP status, by its stable code.
695fn status_for(err: &RequestError) -> u16 {
696    match err.code() {
697        ErrorCode::PartitionUnresolved | ErrorCode::UnsupportedEndpoint => 400,
698        ErrorCode::AuthFailed => 401,
699        ErrorCode::Unauthorized => 403,
700        ErrorCode::PlacementMissing => 404,
701        ErrorCode::StaleEpoch => 409,
702        ErrorCode::PayloadTooLarge => 413,
703        ErrorCode::UpstreamFailed => 502,
704        ErrorCode::PlacementBackendUnavailable | ErrorCode::Overloaded => 503,
705        // ErrorCode is non-exhaustive; an unmapped code is an internal fault.
706        _ => 500,
707    }
708}
709
710/// A value-free JSON error body carrying the stable code and retryability, so a
711/// client or LLM can act on it without any tenant data leaking (NFR-S2).
712fn error_body(err: &RequestError) -> Vec<u8> {
713    format!(
714        r#"{{"error":"{}","retryable":{}}}"#,
715        err.code().as_slug(),
716        err.retryable(),
717    )
718    .into_bytes()
719}
720
// Unit tests live in the sibling `handler_tests.rs` and are compiled only for
// test builds.
#[cfg(test)]
#[path = "handler_tests.rs"]
mod tests;