osproxy_server/handler.rs
1//! The ingress handler: authenticates the caller, builds a request context, and
2//! drives the engine pipeline, mapping the outcome to an HTTP response.
3//
4// JUSTIFY(file-length): the single ingress-orchestration point, pre-auth
5// introspection routing, the TLS and auth/authz gates, and data-plane dispatch
6// are one cohesive flow over the handler's private state; splitting it would
7// force those fields pub(crate) and scatter the request lifecycle across files.
8
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
12use osproxy_core::{Clock, EndpointKind, ErrorCode, RequestId};
13use osproxy_engine::{Pipeline, PipelineResponse, RequestError};
14use osproxy_observe::{
15 decode_directive_set, DirectiveStore, InMemoryDirectiveStore, Metrics, PoolSnapshot,
16};
17use osproxy_sink::OpenSearchSink;
18use osproxy_spi::{
19 Action, AuthError, Authenticator, Authorizer, ClientCredentials, HeaderView, HttpMethod,
20 Principal, RequestCtx,
21};
22use osproxy_tenancy::TenancyRouter;
23use osproxy_transport::{
24 Incoming, IngressHandler, IngressRequest, IngressResponse, StreamingResponse,
25};
26
27use crate::auth::AllowAllAuthorizer;
28use crate::forward_headers::ForwardPolicy;
29use crate::log::{NoLog, RequestLog};
30use crate::tenancy::ReferenceTenancy;
31use osproxy_capture::{Capture, CaptureRecord, NoCapture};
32
33/// The privileged fleet-directive admin channel: a shared store to publish into,
34/// gated by a bearer token, with a clock to resolve relative TTLs.
35struct DirectiveAdmin {
36 store: Arc<InMemoryDirectiveStore>,
37 token: String,
38 clock: Arc<dyn Clock>,
39}
40
41/// The concrete pipeline this binary serves.
42pub type AppPipeline = Pipeline<TenancyRouter<ReferenceTenancy>, OpenSearchSink>;
43
44/// Adapts the engine pipeline to the transport's [`IngressHandler`] contract,
45/// authenticating each request with the configured [`Authenticator`] and, after
46/// authentication, authorizing it with the configured [`Authorizer`] (default
47/// [`AllowAllAuthorizer`], no second policy layer until one is supplied).
48pub struct AppHandler<A, Z = AllowAllAuthorizer> {
49 pipeline: AppPipeline,
50 authenticator: A,
51 authorizer: Z,
52 request_seq: AtomicU64,
53 request_log: Box<dyn RequestLog>,
54 directive_admin: Option<DirectiveAdmin>,
55 metrics: Metrics,
56 /// When true (default), a body-mutating request over cleartext is refused
57 /// (NFR-S1), the proxy must terminate TLS to rewrite the stream. An operator
58 /// on a trusted network can opt out.
59 require_tls_for_mutation: bool,
60 /// When true (default), the pre-auth `/debug/explain` and `/debug/breakglass`
61 /// surfaces are served. They are shape-only, but still expose operational
62 /// metadata to anyone who can reach the port, so production deployments turn
63 /// them off; disabled, both report `not_enabled` (`/metrics` stays on).
64 debug_endpoints: bool,
65 /// Full-fidelity traffic capture (off by default). When enabled, each
66 /// forwarded data-plane exchange is teed to this sink for replay/audit. Unlike
67 /// the shape-only telemetry, the records carry bodies and values, so capture
68 /// is deliberate and the stream is privileged (`capture` module).
69 capture: Box<dyn Capture>,
70 /// Which client headers to relay verbatim to the upstream. Default pass-all
71 /// (sidecar trust), minus the mandatory hop-by-hop/framing set. Computed from
72 /// the *raw* request headers (so the client `Authorization` is available),
73 /// separate from the auth-stripped view the pipeline routes on.
74 forward_policy: ForwardPolicy,
75}
76
77impl<A, Z> std::fmt::Debug for AppHandler<A, Z> {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 // The injected logger is not `Debug`; show whether it is enabled.
80 f.debug_struct("AppHandler")
81 .field("logging", &self.request_log.enabled())
82 .finish_non_exhaustive()
83 }
84}
85
86impl<A: Authenticator> AppHandler<A, AllowAllAuthorizer> {
87 /// Wraps a pipeline and an authenticator (no request logging by default, and
88 /// the allow-all authorizer until [`Self::with_authorizer`] supplies one).
89 #[must_use]
90 pub fn new(pipeline: AppPipeline, authenticator: A) -> Self {
91 Self {
92 pipeline,
93 authenticator,
94 authorizer: AllowAllAuthorizer,
95 request_seq: AtomicU64::new(0),
96 request_log: Box::new(NoLog),
97 directive_admin: None,
98 metrics: Metrics::new(),
99 require_tls_for_mutation: true,
100 debug_endpoints: true,
101 capture: Box::new(NoCapture),
102 forward_policy: ForwardPolicy::pass_all(),
103 }
104 }
105}
106
107impl<A: Authenticator, Z: Authorizer> AppHandler<A, Z> {
108 /// Sets the post-authentication [`Authorizer`] (builder style). Replaces the
109 /// default allow-all policy; the principal is already resolved, so the
110 /// authorizer decides only whether that principal may perform the action.
111 #[must_use]
112 pub fn with_authorizer<Z2: Authorizer>(self, authorizer: Z2) -> AppHandler<A, Z2> {
113 AppHandler {
114 pipeline: self.pipeline,
115 authenticator: self.authenticator,
116 authorizer,
117 request_seq: self.request_seq,
118 request_log: self.request_log,
119 directive_admin: self.directive_admin,
120 metrics: self.metrics,
121 require_tls_for_mutation: self.require_tls_for_mutation,
122 debug_endpoints: self.debug_endpoints,
123 capture: self.capture,
124 forward_policy: self.forward_policy,
125 }
126 }
127
128 /// Sets the client-to-upstream header forwarding policy (builder style).
129 /// Default pass-all (sidecar trust). Restrict it to keep specific headers
130 /// (e.g. `authorization`) off the cluster, or disable forwarding entirely.
131 #[must_use]
132 pub fn with_forward_policy(mut self, policy: ForwardPolicy) -> Self {
133 self.forward_policy = policy;
134 self
135 }
136
137 /// Sets the full-fidelity traffic capture (builder style). Off by default.
138 /// Compose redaction with `capture::RedactingCapture`; the stream carries
139 /// bodies and values, so treat it as privileged.
140 #[must_use]
141 pub fn with_capture(mut self, capture: Box<dyn Capture>) -> Self {
142 self.capture = capture;
143 self
144 }
145
146 /// Sets whether the pre-auth `/debug/explain` and `/debug/breakglass`
147 /// surfaces are served (builder style). Default `true`; set `false` in
148 /// production so operational metadata is not exposed unauthenticated.
149 #[must_use]
150 pub fn with_debug_endpoints(mut self, enabled: bool) -> Self {
151 self.debug_endpoints = enabled;
152 self
153 }
154
155 /// Sets whether body-mutating requests are refused over cleartext (NFR-S1).
156 /// Builder style; default `true` (enforce). Pass `false` only on a trusted
157 /// network where the operator accepts mutating over cleartext.
158 #[must_use]
159 pub fn with_require_tls_for_mutation(mut self, require: bool) -> Self {
160 self.require_tls_for_mutation = require;
161 self
162 }
163
164 /// The pipeline this handler serves, a read-only accessor for introspection
165 /// (e.g. the perf harness reading upstream `pool_stats` after a load run).
166 #[must_use]
167 pub fn pipeline(&self) -> &AppPipeline {
168 &self.pipeline
169 }
170
171 /// Builds the shape-only `/metrics` snapshot JSON: request tallies plus every
172 /// configured cluster's upstream pool-reuse counters. No tenant data, so it is
173 /// always safe to expose.
174 fn metrics_snapshot(&self) -> String {
175 let pools = self
176 .pipeline
177 .sink()
178 .pool_stats_all()
179 .into_iter()
180 .map(|(id, s)| PoolSnapshot {
181 cluster: id.as_str().to_owned(),
182 opened: s.opened,
183 dispatched: s.dispatched,
184 reused: s.reused(),
185 })
186 .collect();
187 self.metrics.snapshot(pools).to_json()
188 }
189
190 /// Sets the structured per-request logger (builder style). Default: no logs.
191 #[must_use]
192 pub fn with_request_log(mut self, request_log: Box<dyn RequestLog>) -> Self {
193 self.request_log = request_log;
194 self
195 }
196
197 /// Enables the `POST /admin/directives` channel (builder style): publishes a
198 /// fleet directive set into `store` when the request carries the bearer
199 /// `token`. Without this, the endpoint reports `not_enabled`.
200 #[must_use]
201 pub fn with_directive_admin(
202 mut self,
203 store: Arc<InMemoryDirectiveStore>,
204 token: String,
205 clock: Arc<dyn Clock>,
206 ) -> Self {
207 self.directive_admin = Some(DirectiveAdmin {
208 store,
209 token,
210 clock,
211 });
212 self
213 }
214
215 /// A per-request correlation id. A monotonic counter is enough for the
216 /// reference binary; a real deployment would carry a propagated trace id.
217 fn next_request_id(&self) -> RequestId {
218 let n = self.request_seq.fetch_add(1, Ordering::Relaxed) + 1;
219 RequestId::from(format!("req-{n}").as_str())
220 }
221
222 /// Handles `POST /admin/directives`: publishes a fleet directive set into the
223 /// shared store when enabled and the bearer token matches. Fail-closed at
224 /// every step, disabled, wrong method, bad token, or malformed body all leave
225 /// the active set unchanged.
226 fn publish_directives(&self, req: &IngressRequest) -> IngressResponse {
227 let Some(admin) = &self.directive_admin else {
228 return IngressResponse::json(404, br#"{"error":"not_enabled"}"#.to_vec());
229 };
230 if req.method != HttpMethod::Post {
231 return IngressResponse::json(405, br#"{"error":"method_not_allowed"}"#.to_vec());
232 }
233 // Publishing a fleet directive set is a privileged mutation carrying a
234 // bearer token; refuse it over cleartext (same NFR-S1 stance as the data
235 // plane) so the token is never exposed on the wire. The introspection
236 // routes short-circuit before the data-plane TLS gate, so enforce it here.
237 if self.require_tls_for_mutation && !req.secure {
238 return IngressResponse::json(403, br#"{"error":"tls_required"}"#.to_vec());
239 }
240 if !crate::bearer::matches(&req.headers, &admin.token) {
241 return IngressResponse::json(401, br#"{"error":"unauthorized"}"#.to_vec());
242 }
243 match decode_directive_set(&req.body, admin.clock.as_ref()) {
244 Ok(set) => {
245 let count = set.len();
246 admin.store.publish(set);
247 IngressResponse::json(200, format!(r#"{{"published":{count}}}"#).into_bytes())
248 }
249 Err(reason) => {
250 IngressResponse::json(400, format!(r#"{{"error":"{reason}"}}"#).into_bytes())
251 }
252 }
253 }
254
255 /// The pre-auth introspection and control-plane surfaces, in one place: the
256 /// shape-only `/debug/*` tools, the always-on `/metrics` snapshot, and the
257 /// token-gated `/admin/directives` (GET reads, POST publishes). Returns `Some`
258 /// when `req` targets one of them, else `None` (the request is data plane).
259 fn introspection_route(&self, req: &IngressRequest) -> Option<IngressResponse> {
260 // /debug/*: the shape-only diagnostics surfaces, served only when enabled
261 // (off in production so operational metadata is not exposed unauthenticated).
262 // Disabled, they report `not_enabled` rather than 404, to distinguish "turned
263 // off here" from "no such route".
264 if req.path.starts_with("/debug/") {
265 if !self.debug_endpoints {
266 return Some(IngressResponse::json(
267 404,
268 br#"{"error":"not_enabled"}"#.to_vec(),
269 ));
270 }
271 // /debug/explain/{id}: the shape-only causal trace for one request.
272 if let Some(id) = req.path.strip_prefix("/debug/explain/") {
273 return Some(match self.pipeline.explain(&RequestId::from(id)) {
274 Some(doc) => IngressResponse::json(200, doc.to_string().into_bytes()),
275 None => {
276 IngressResponse::json(404, br#"{"error":"unknown_request_id"}"#.to_vec())
277 }
278 });
279 }
280 // /debug/breakglass: the forensic tape captured under a ring_buffer
281 // directive (`docs/05` §5), oldest first. Shape-only like the explain doc.
282 if req.path == "/debug/breakglass" {
283 let tape = serde_json::Value::Array(self.pipeline.break_glass().snapshot());
284 return Some(IngressResponse::json(200, tape.to_string().into_bytes()));
285 }
286 }
287 // /metrics: the always-on, prod-safe operational snapshot (shape-only
288 // counts/rates/cluster ids, so no auth; see `metrics_snapshot`).
289 if req.path == "/metrics" {
290 return Some(IngressResponse::json(
291 200,
292 self.metrics_snapshot().into_bytes(),
293 ));
294 }
295 // /admin/directives: privileged control-plane settings, GET introspects
296 // what this instance applies, POST publishes a new set; both token-gated
297 // and fail-closed (a forged token reveals/changes nothing, `docs/05` §3).
298 if req.path == "/admin/directives" {
299 return Some(match req.method {
300 HttpMethod::Get => self.introspect_directives(req),
301 _ => self.publish_directives(req),
302 });
303 }
304 None
305 }
306
307 /// Handles `GET /admin/directives`: returns the control-plane settings this
308 /// instance is currently applying, the read side of the directive store, so
309 /// an agent can see what is in effect (per instance; the replicating store
310 /// keeps the fleet consistent). Token-gated like the publish path (the
311 /// targeting selectors are operator config) and fail-closed: disabled or a bad
312 /// token reveals nothing.
313 fn introspect_directives(&self, req: &IngressRequest) -> IngressResponse {
314 let Some(admin) = &self.directive_admin else {
315 return IngressResponse::json(404, br#"{"error":"not_enabled"}"#.to_vec());
316 };
317 if !crate::bearer::matches(&req.headers, &admin.token) {
318 return IngressResponse::json(401, br#"{"error":"unauthorized"}"#.to_vec());
319 }
320 let view = admin.store.load().introspect(admin.clock.now());
321 IngressResponse::json(200, view.to_string().into_bytes())
322 }
323}
324
325impl<A: Authenticator, Z: Authorizer> AppHandler<A, Z> {
326 /// The shared pre-dispatch gate for both the buffered and streamed paths:
327 /// refuse mutation over cleartext (NFR-S1), authenticate (the bearer token is
328 /// consumed here, never reaching the pipeline or telemetry), and authorize.
329 /// Returns the resolved principal, or the error response to return verbatim.
330 async fn gate(
331 &self,
332 req: &IngressRequest,
333 request_id: &RequestId,
334 ) -> Result<Principal, IngressResponse> {
335 if self.require_tls_for_mutation && req.endpoint.is_tenancy_aware() && !req.secure {
336 return Err(
337 IngressResponse::json(403, br#"{"error":"tls_required"}"#.to_vec())
338 .with_header("x-request-id", request_id.as_str()),
339 );
340 }
341 let principal = self
342 .authenticator
343 .authenticate(&credentials_from(req))
344 .await
345 .map_err(|err| {
346 IngressResponse::json(err.http_status(), auth_error_body(&err))
347 .with_header("x-request-id", request_id.as_str())
348 })?;
349 let action = Action {
350 endpoint: req.endpoint,
351 logical_index: req.logical_index.clone(),
352 };
353 self.authorizer
354 .authorize(&principal, &action)
355 .await
356 .map_err(|err| {
357 IngressResponse::json(err.http_status(), auth_error_body(&err))
358 .with_header("x-request-id", request_id.as_str())
359 })?;
360 Ok(principal)
361 }
362
363 /// Maps a streamed pipeline outcome to a response, tallying side effects, the
364 /// shared tail of the streaming forward and bulk paths.
365 fn finish_streamed(
366 &self,
367 req: &IngressRequest,
368 request_id: &RequestId,
369 result: Result<PipelineResponse, RequestError>,
370 should_capture: bool,
371 ) -> IngressResponse {
372 let (response, ok) = match result {
373 Ok(resp) => {
374 let ok = (200..300).contains(&resp.status);
375 (ingress_from(resp), ok)
376 }
377 Err(err) => (
378 IngressResponse::json(status_for(&err), error_body(&err)),
379 false,
380 ),
381 };
382 self.after_response(req, &response, request_id, ok, should_capture);
383 response.with_header("x-request-id", request_id.as_str())
384 }
385}
386
387impl<A: Authenticator, Z: Authorizer> IngressHandler for AppHandler<A, Z> {
388 async fn handle(&self, req: IngressRequest) -> IngressResponse {
389 let request_id = self.next_request_id();
390
391 // Introspection + admin surfaces short-circuit before auth; the data plane
392 // continues below.
393 if let Some(resp) = self.introspection_route(&req) {
394 return resp;
395 }
396
397 let principal = match self.gate(&req, &request_id).await {
398 Ok(principal) => principal,
399 Err(resp) => return resp,
400 };
401
402 // The credentials were consumed above; strip the `Authorization` header so
403 // the bearer token never reaches the pipeline, observability, or logs. The
404 // partition header, `traceparent`, and `x-debug-directive` are preserved,
405 // the engine still needs them.
406 let safe_headers = crate::bearer::without_authorization(&req.headers);
407 // The forwarded set is computed from the *raw* headers (so the client
408 // `Authorization` is available per the pass-all default), independent of
409 // the auth-stripped view the pipeline routes on.
410 let forward = self.forward_policy.forward_set(&req.headers);
411 let ctx = build_ctx(&req, &principal, &request_id, &safe_headers, &forward);
412
413 // Echo the request id so a client (or LLM) can fetch its
414 // /debug/explain/{id} afterward. `should_capture` is the live per-request
415 // capture decision, applied to both success and error responses.
416 let (result, should_capture) = self.pipeline.handle_with_capture(&ctx).await;
417 let (response, ok) = match result {
418 Ok(resp) => {
419 let ok = (200..300).contains(&resp.status);
420 (ingress_from(resp), ok)
421 }
422 Err(err) => (
423 IngressResponse::json(status_for(&err), error_body(&err)),
424 false,
425 ),
426 };
427 self.after_response(&req, &response, &request_id, ok, should_capture);
428 response.with_header("x-request-id", request_id.as_str())
429 }
430
431 fn forward_plan(&self, path: &str, logical_index: &str) -> bool {
432 // Full-fidelity capture tees the raw exchange, which needs the body in
433 // memory, so when capture is wired, buffer (take the `handle` path) rather
434 // than stream. Streaming and capture are mutually exclusive by nature.
435 if self.capture.enabled() {
436 return false;
437 }
438 // Never stream-forward the proxy-internal surfaces, they are served
439 // pre-auth in `handle` and must not be forwarded to a cluster, even under
440 // a whole-instance passthrough policy (which matches every index).
441 if path.starts_with("/debug/") || path == "/metrics" || path == "/admin/directives" {
442 return false;
443 }
444 self.pipeline.is_passthrough(logical_index)
445 }
446
447 async fn handle_forward(&self, req: IngressRequest, body: Incoming) -> StreamingResponse {
448 let request_id = self.next_request_id();
449 // `forward_plan` already excluded the introspection routes; apply the same
450 // TLS + auth + authz gate as the buffered path before forwarding.
451 let principal = match self.gate(&req, &request_id).await {
452 Ok(principal) => principal,
453 // The gate's refusal is a small buffered error; carry it as a streaming
454 // response so both arms share one return type.
455 Err(resp) => return to_streaming(resp),
456 };
457
458 let safe_headers = crate::bearer::without_authorization(&req.headers);
459 // The forwarded set is computed from the *raw* headers (so the client
460 // `Authorization` is available per the pass-all default), independent of
461 // the auth-stripped view the pipeline routes on.
462 let forward = self.forward_policy.forward_set(&req.headers);
463 // The body is the streamed `body` argument, not `req.body` (empty here).
464 let ctx = build_ctx(&req, &principal, &request_id, &safe_headers, &forward);
465
466 // Both directions stream: the request body pipes upstream, the upstream
467 // response pipes back, neither buffered.
468 let upstream = osproxy_sink::stream_body(body);
469 let (result, _capture) = self.pipeline.forward_streamed(&ctx, upstream).await;
470 let response = match result {
471 Ok(forward) => {
472 self.after_streamed(&request_id, (200..300).contains(&forward.status));
473 let mut response = StreamingResponse::stream(forward.status, forward.body);
474 // Carry the upstream content type so a non-JSON verbatim forward
475 // (e.g. a passed-through `_cat`) is not relabeled `application/json`.
476 if let Some(content_type) = forward.content_type {
477 response = response.with_header("content-type", content_type);
478 }
479 response
480 }
481 Err(err) => {
482 self.after_streamed(&request_id, false);
483 StreamingResponse::buffered(status_for(&err), error_body(&err))
484 }
485 };
486 response.with_header("x-request-id", request_id.as_str())
487 }
488
489 fn wants_search_stream(&self, endpoint: EndpointKind, query: Option<&str>) -> bool {
490 // Capture must tee the buffered response, so streaming and capture are
491 // mutually exclusive. A scroll-opening search keeps the buffered path: its
492 // `_scroll_id` affinity wrap needs the whole response body. A PIT-pinned
493 // search is detected from the body inside the engine, which falls back to
494 // buffered there.
495 endpoint == EndpointKind::Search && !self.capture.enabled() && !opens_scroll(query)
496 }
497
498 async fn handle_search_stream(&self, req: IngressRequest) -> StreamingResponse {
499 let request_id = self.next_request_id();
500 // `wants_search_stream` does not gate; apply the same TLS + auth + authz
501 // gate as the buffered path before dispatching.
502 let principal = match self.gate(&req, &request_id).await {
503 Ok(principal) => principal,
504 Err(resp) => return to_streaming(resp),
505 };
506 let safe_headers = crate::bearer::without_authorization(&req.headers);
507 // The forwarded set is computed from the *raw* headers (so the client
508 // `Authorization` is available per the pass-all default), independent of
509 // the auth-stripped view the pipeline routes on.
510 let forward = self.forward_policy.forward_set(&req.headers);
511 // Unlike the forward/bulk streams, the search query body *is* needed and is
512 // the buffered `req.body`; only the response streams.
513 let ctx = build_ctx(&req, &principal, &request_id, &safe_headers, &forward);
514
515 let (result, _capture) = self.pipeline.search_streamed(&ctx).await;
516 let response = match result {
517 Ok(search) => {
518 self.after_streamed(&request_id, (200..300).contains(&search.status));
519 StreamingResponse::stream(search.status, search.body)
520 }
521 Err(err) => {
522 self.after_streamed(&request_id, false);
523 StreamingResponse::buffered(status_for(&err), error_body(&err))
524 }
525 };
526 response.with_header("x-request-id", request_id.as_str())
527 }
528
529 fn wants_bulk_stream(&self, endpoint: EndpointKind, headers: &[(String, String)]) -> bool {
530 // Capture must tee the buffered body, so streaming and capture are mutually
531 // exclusive; only sync `_bulk` streams (async fan-out keeps the buffered
532 // path, which enqueues per item).
533 endpoint == EndpointKind::IngestBulk
534 && !self.capture.enabled()
535 && self.pipeline.is_sync_write(headers)
536 }
537
538 async fn handle_bulk_stream(&self, req: IngressRequest, body: Incoming) -> IngressResponse {
539 let request_id = self.next_request_id();
540 let principal = match self.gate(&req, &request_id).await {
541 Ok(principal) => principal,
542 Err(resp) => return resp,
543 };
544 let safe_headers = crate::bearer::without_authorization(&req.headers);
545 // The forwarded set is computed from the *raw* headers (so the client
546 // `Authorization` is available per the pass-all default), independent of
547 // the auth-stripped view the pipeline routes on.
548 let forward = self.forward_policy.forward_set(&req.headers);
549 // The body is the streamed NDJSON batch, not `req.body` (empty here).
550 let ctx = build_ctx(&req, &principal, &request_id, &safe_headers, &forward);
551
552 let stream = osproxy_sink::stream_body(body);
553 let (result, should_capture) = self.pipeline.handle_bulk_streamed(&ctx, stream).await;
554 self.finish_streamed(&req, &request_id, result, should_capture)
555 }
556}
557
558impl<A, Z> AppHandler<A, Z> {
559 /// Post-response side effects: tally metrics (shape-only), emit the structured
560 /// log (opt-in), and tee the full-fidelity capture (opt-in).
561 fn after_response(
562 &self,
563 req: &IngressRequest,
564 response: &IngressResponse,
565 request_id: &RequestId,
566 ok: bool,
567 should_capture: bool,
568 ) {
569 self.metrics.record(ok);
570 // The structured log is the shape-only explain document, which carries the
571 // request's trace_id, so logs join the trace/spans.
572 if self.request_log.enabled() {
573 if let Some(record) = self.pipeline.explain(request_id) {
574 self.request_log.emit(&record);
575 }
576 }
577 self.tee_capture(req, response, request_id, should_capture);
578 }
579
580 /// Post-response side effects for a **streamed** response (no body retained):
581 /// tally metrics and emit the structured log. Capture is never available on a
582 /// streamed path (there is no buffered body to tee), so it is not attempted.
583 fn after_streamed(&self, request_id: &RequestId, ok: bool) {
584 self.metrics.record(ok);
585 if self.request_log.enabled() {
586 if let Some(record) = self.pipeline.explain(request_id) {
587 self.request_log.emit(&record);
588 }
589 }
590 }
591
592 /// Full-fidelity capture: tee the raw exchange for replay/audit when a capture
593 /// sink is wired *and* `should_capture` (the live directive decision) selected
594 /// this request, so capture is on demand, not whenever a sink exists. The
595 /// original request headers pass through; redaction (e.g. dropping
596 /// `Authorization`) is composed via `RedactingCapture`.
597 fn tee_capture(
598 &self,
599 req: &IngressRequest,
600 response: &IngressResponse,
601 request_id: &RequestId,
602 should_capture: bool,
603 ) {
604 if !should_capture || !self.capture.enabled() {
605 return;
606 }
607 self.capture.capture(&CaptureRecord {
608 request_id: request_id.as_str(),
609 method: req.method,
610 path: &req.path,
611 query: req.query.as_deref(),
612 headers: &req.headers,
613 body: &req.body,
614 response_status: response.status,
615 response_body: &response.body,
616 });
617 }
618}
619
/// Reports whether the query string opens a scroll, i.e. contains a `scroll`
/// parameter (bare or as `scroll=…`) among its `&`-separated pairs. Such a
/// search returns a `_scroll_id` that must be affinity-wrapped against the
/// whole response body, so it keeps the buffered path rather than streaming.
fn opens_scroll(query: Option<&str>) -> bool {
    let Some(raw) = query else {
        return false;
    };
    for pair in raw.split('&') {
        // The parameter name is everything before the first `=`, or the whole
        // pair when there is no `=`.
        let name = pair.split_once('=').map_or(pair, |(name, _value)| name);
        if name == "scroll" {
            return true;
        }
    }
    false
}
629
630/// Carries a small buffered [`IngressResponse`] (e.g. an auth refusal) as a
631/// [`StreamingResponse`], preserving its status and headers, so the streamed
632/// forward path has one return type for both the gate refusal and the stream.
633fn to_streaming(resp: IngressResponse) -> StreamingResponse {
634 let mut streaming = StreamingResponse::buffered(resp.status, resp.body);
635 streaming.headers = resp.headers;
636 streaming
637}
638
639/// Builds the engine [`RequestCtx`] from an authenticated request, the one place
640/// the four data-plane entry points (`handle`, `handle_forward`,
641/// `handle_search_stream`, `handle_bulk_stream`) share, so they cannot drift in
642/// which fields they wire. The borrows (`principal`, `request_id`, `safe_headers`)
643/// are caller-owned locals; the body always rides as `req.body` (empty on the
644/// streamed paths, where the real body travels beside the ctx).
645fn build_ctx<'a>(
646 req: &'a IngressRequest,
647 principal: &'a Principal,
648 request_id: &'a RequestId,
649 safe_headers: &'a [(String, String)],
650 forward_headers: &'a [(String, String)],
651) -> RequestCtx<'a> {
652 RequestCtx::new(
653 principal,
654 request_id,
655 req.method,
656 req.endpoint,
657 req.protocol,
658 &req.logical_index,
659 HeaderView::new(safe_headers),
660 &req.body,
661 )
662 .with_doc_id(req.doc_id.as_deref())
663 .with_query(req.query.as_deref())
664 .with_path(&req.path)
665 .with_forward_headers(forward_headers)
666}
667
668/// Builds the ingress response from a pipeline response, carrying its content type
669/// when set, so a verbatim admin/passthrough body (e.g. `_cat` `text/plain`) is
670/// not mislabeled `application/json`. A shaped response leaves it `None` and the
671/// transport defaults to JSON.
672fn ingress_from(resp: PipelineResponse) -> IngressResponse {
673 let out = IngressResponse::json(resp.status, resp.body);
674 match resp.content_type {
675 Some(content_type) => out.with_header("content-type", content_type),
676 None => out,
677 }
678}
679
680/// Extracts client credentials from a request: a bearer token from
681/// `Authorization` and the verified mTLS client-certificate identity, if any.
682fn credentials_from(req: &IngressRequest) -> ClientCredentials {
683 ClientCredentials {
684 bearer_token: crate::bearer::parse(&req.headers).map(str::to_owned),
685 client_cert_subject: req.client_cert_subject.clone(),
686 }
687}
688
689/// A value-free JSON body for an auth failure.
690fn auth_error_body(err: &AuthError) -> Vec<u8> {
691 format!(r#"{{"error":"{}"}}"#, err.code().as_slug()).into_bytes()
692}
693
694/// Maps a request-path error to an HTTP status, by its stable code.
695fn status_for(err: &RequestError) -> u16 {
696 match err.code() {
697 ErrorCode::PartitionUnresolved | ErrorCode::UnsupportedEndpoint => 400,
698 ErrorCode::AuthFailed => 401,
699 ErrorCode::Unauthorized => 403,
700 ErrorCode::PlacementMissing => 404,
701 ErrorCode::StaleEpoch => 409,
702 ErrorCode::PayloadTooLarge => 413,
703 ErrorCode::UpstreamFailed => 502,
704 ErrorCode::PlacementBackendUnavailable | ErrorCode::Overloaded => 503,
705 // ErrorCode is non-exhaustive; an unmapped code is an internal fault.
706 _ => 500,
707 }
708}
709
710/// A value-free JSON error body carrying the stable code and retryability, so a
711/// client or LLM can act on it without any tenant data leaking (NFR-S2).
712fn error_body(err: &RequestError) -> Vec<u8> {
713 format!(
714 r#"{{"error":"{}","retryable":{}}}"#,
715 err.code().as_slug(),
716 err.retryable(),
717 )
718 .into_bytes()
719}
720
721#[cfg(test)]
722#[path = "handler_tests.rs"]
723mod tests;