osproxy-server 1.0.1

The osproxy binary: process lifecycle and wiring. No business logic.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
//! The ingress handler: authenticates the caller, builds a request context, and
//! drives the engine pipeline, mapping the outcome to an HTTP response.
//
// JUSTIFY(file-length): the single ingress-orchestration point, pre-auth
// introspection routing, the TLS and auth/authz gates, and data-plane dispatch
// are one cohesive flow over the handler's private state; splitting it would
// force those fields pub(crate) and scatter the request lifecycle across files.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use osproxy_core::{Clock, EndpointKind, ErrorCode, RequestId};
use osproxy_engine::{Pipeline, PipelineResponse, RequestError};
use osproxy_observe::{
    decode_directive_set, DirectiveStore, InMemoryDirectiveStore, Metrics, PoolSnapshot,
};
use osproxy_sink::OpenSearchSink;
use osproxy_spi::{
    Action, AuthError, Authenticator, Authorizer, ClientCredentials, HeaderView, HttpMethod,
    Principal, RequestCtx,
};
use osproxy_tenancy::TenancyRouter;
use osproxy_transport::{
    Incoming, IngressHandler, IngressRequest, IngressResponse, StreamingResponse,
};

use crate::auth::AllowAllAuthorizer;
use crate::forward_headers::ForwardPolicy;
use crate::log::{NoLog, RequestLog};
use crate::tenancy::ReferenceTenancy;
use osproxy_capture::{Capture, CaptureRecord, NoCapture};

/// The privileged fleet-directive admin channel: a shared store to publish into,
/// gated by a bearer token, with a clock to resolve relative TTLs.
///
/// The handler holds this as an `Option`; while it is `None` the
/// `/admin/directives` handlers answer 404 `not_enabled`.
struct DirectiveAdmin {
    /// The shared directive store: published into on the publish path and
    /// loaded on the introspection path.
    store: Arc<InMemoryDirectiveStore>,
    /// The expected bearer token, checked with `crate::bearer::matches`.
    token: String,
    /// Handed to `decode_directive_set` when publishing, and read (`now`) when
    /// introspecting the active set.
    clock: Arc<dyn Clock>,
}

/// The concrete pipeline this binary serves: the engine [`Pipeline`]
/// instantiated with a [`TenancyRouter`] over [`ReferenceTenancy`] and an
/// [`OpenSearchSink`].
pub type AppPipeline = Pipeline<TenancyRouter<ReferenceTenancy>, OpenSearchSink>;

/// Adapts the engine pipeline to the transport's [`IngressHandler`] contract,
/// authenticating each request with the configured [`Authenticator`] and, after
/// authentication, authorizing it with the configured [`Authorizer`] (default
/// [`AllowAllAuthorizer`]; no second policy layer until one is supplied).
pub struct AppHandler<A, Z = AllowAllAuthorizer> {
    /// The engine pipeline every data-plane request is dispatched to.
    pipeline: AppPipeline,
    /// Resolves the caller's credentials to a principal.
    authenticator: A,
    /// Decides whether the resolved principal may perform the requested action.
    authorizer: Z,
    /// Counter behind the per-request correlation ids (`req-1`, `req-2`, ...).
    request_seq: AtomicU64,
    /// The structured per-request logger; consulted only while it reports
    /// itself enabled.
    request_log: Box<dyn RequestLog>,
    /// The `/admin/directives` channel; `None` until it is configured, in which
    /// case those routes answer `not_enabled`.
    directive_admin: Option<DirectiveAdmin>,
    /// Request tallies surfaced by the `/metrics` snapshot.
    metrics: Metrics,
    /// When true (default), a body-mutating request over cleartext is refused
    /// (NFR-S1): the proxy must terminate TLS to rewrite the stream. An operator
    /// on a trusted network can opt out.
    require_tls_for_mutation: bool,
    /// When true (default), the pre-auth `/debug/explain` and `/debug/breakglass`
    /// surfaces are served. They are shape-only, but still expose operational
    /// metadata to anyone who can reach the port, so production deployments turn
    /// them off; disabled, both report `not_enabled` (`/metrics` stays on).
    debug_endpoints: bool,
    /// Full-fidelity traffic capture (off by default). When enabled, each
    /// forwarded data-plane exchange is teed to this sink for replay/audit. Unlike
    /// the shape-only telemetry, the records carry bodies and values, so capture
    /// is deliberate and the stream is privileged (`capture` module).
    capture: Box<dyn Capture>,
    /// Which client headers to relay verbatim to the upstream. Default pass-all
    /// (sidecar trust), minus the mandatory hop-by-hop/framing set. Computed from
    /// the *raw* request headers (so the client `Authorization` is available),
    /// separate from the auth-stripped view the pipeline routes on.
    forward_policy: ForwardPolicy,
}

impl<A, Z> std::fmt::Debug for AppHandler<A, Z> {
    /// Renders the handler as a non-exhaustive `AppHandler` struct exposing a
    /// single `logging` field.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The injected logger has no `Debug` of its own, so report only
        // whether it is switched on.
        let logging_enabled = self.request_log.enabled();
        let mut out = f.debug_struct("AppHandler");
        out.field("logging", &logging_enabled);
        out.finish_non_exhaustive()
    }
}

impl<A: Authenticator> AppHandler<A, AllowAllAuthorizer> {
    /// Wraps a pipeline and an authenticator (no request logging by default, and
    /// the allow-all authorizer until [`Self::with_authorizer`] supplies one).
    #[must_use]
    pub fn new(pipeline: AppPipeline, authenticator: A) -> Self {
        Self {
            pipeline,
            authenticator,
            authorizer: AllowAllAuthorizer,
            request_seq: AtomicU64::new(0),
            request_log: Box::new(NoLog),
            directive_admin: None,
            metrics: Metrics::new(),
            require_tls_for_mutation: true,
            debug_endpoints: true,
            capture: Box::new(NoCapture),
            forward_policy: ForwardPolicy::pass_all(),
        }
    }
}

impl<A: Authenticator, Z: Authorizer> AppHandler<A, Z> {
    /// Swaps in the post-authentication [`Authorizer`] (builder style),
    /// replacing whichever policy the handler carried. By the time it is
    /// consulted the principal is already resolved, so it decides only whether
    /// that principal may perform the action. Every other field carries over
    /// unchanged.
    #[must_use]
    pub fn with_authorizer<Z2: Authorizer>(self, authorizer: Z2) -> AppHandler<A, Z2> {
        // Take the old handler apart; `authorizer: _` leaves the previous
        // policy behind instead of moving it into the new handler.
        let AppHandler {
            pipeline,
            authenticator,
            authorizer: _,
            request_seq,
            request_log,
            directive_admin,
            metrics,
            require_tls_for_mutation,
            debug_endpoints,
            capture,
            forward_policy,
        } = self;

        AppHandler {
            pipeline,
            authenticator,
            authorizer,
            request_seq,
            request_log,
            directive_admin,
            metrics,
            require_tls_for_mutation,
            debug_endpoints,
            capture,
            forward_policy,
        }
    }

    /// Sets the client-to-upstream header forwarding policy (builder style).
    /// Default pass-all (sidecar trust). Restrict it to keep specific headers
    /// (e.g. `authorization`) off the cluster, or disable forwarding entirely.
    #[must_use]
    pub fn with_forward_policy(mut self, policy: ForwardPolicy) -> Self {
        self.forward_policy = policy;
        self
    }

    /// Sets the full-fidelity traffic capture (builder style). Off by default.
    /// Compose redaction with `capture::RedactingCapture`; the stream carries
    /// bodies and values, so treat it as privileged.
    #[must_use]
    pub fn with_capture(mut self, capture: Box<dyn Capture>) -> Self {
        self.capture = capture;
        self
    }

    /// Sets whether the pre-auth `/debug/explain` and `/debug/breakglass`
    /// surfaces are served (builder style). Default `true`; set `false` in
    /// production so operational metadata is not exposed unauthenticated.
    #[must_use]
    pub fn with_debug_endpoints(mut self, enabled: bool) -> Self {
        self.debug_endpoints = enabled;
        self
    }

    /// Sets whether body-mutating requests are refused over cleartext (NFR-S1).
    /// Builder style; default `true` (enforce). Pass `false` only on a trusted
    /// network where the operator accepts mutating over cleartext.
    #[must_use]
    pub fn with_require_tls_for_mutation(mut self, require: bool) -> Self {
        self.require_tls_for_mutation = require;
        self
    }

    /// The pipeline this handler serves, a read-only accessor for introspection
    /// (e.g. the perf harness reading upstream `pool_stats` after a load run).
    #[must_use]
    pub fn pipeline(&self) -> &AppPipeline {
        &self.pipeline
    }

    /// Renders the shape-only `/metrics` snapshot as JSON: the request tallies
    /// plus one pool-reuse entry (opened / dispatched / reused) per cluster
    /// reported by the sink. No tenant data, so it is always safe to expose.
    fn metrics_snapshot(&self) -> String {
        let pools = self
            .pipeline
            .sink()
            .pool_stats_all()
            .into_iter()
            .map(|(cluster_id, stats)| {
                let cluster = cluster_id.as_str().to_owned();
                PoolSnapshot {
                    cluster,
                    opened: stats.opened,
                    dispatched: stats.dispatched,
                    reused: stats.reused(),
                }
            })
            .collect();
        let snapshot = self.metrics.snapshot(pools);
        snapshot.to_json()
    }

    /// Sets the structured per-request logger (builder style). Default: no logs.
    #[must_use]
    pub fn with_request_log(mut self, request_log: Box<dyn RequestLog>) -> Self {
        self.request_log = request_log;
        self
    }

    /// Enables the `POST /admin/directives` channel (builder style): publishes a
    /// fleet directive set into `store` when the request carries the bearer
    /// `token`. Without this, the endpoint reports `not_enabled`.
    #[must_use]
    pub fn with_directive_admin(
        mut self,
        store: Arc<InMemoryDirectiveStore>,
        token: String,
        clock: Arc<dyn Clock>,
    ) -> Self {
        self.directive_admin = Some(DirectiveAdmin {
            store,
            token,
            clock,
        });
        self
    }

    /// Mints the next per-request correlation id, `req-<n>` with `n` counting
    /// up from 1. A monotonic counter is enough for the reference binary; a
    /// real deployment would carry a propagated trace id.
    fn next_request_id(&self) -> RequestId {
        // `fetch_add` hands back the value *before* the increment.
        let previous = self.request_seq.fetch_add(1, Ordering::Relaxed);
        let label = format!("req-{}", previous + 1);
        RequestId::from(label.as_str())
    }

    /// Handles `POST /admin/directives`: publishes a fleet directive set into the
    /// shared store when enabled and the bearer token matches. Fail-closed at
    /// every step, disabled, wrong method, bad token, or malformed body all leave
    /// the active set unchanged.
    fn publish_directives(&self, req: &IngressRequest) -> IngressResponse {
        let Some(admin) = &self.directive_admin else {
            return IngressResponse::json(404, br#"{"error":"not_enabled"}"#.to_vec());
        };
        if req.method != HttpMethod::Post {
            return IngressResponse::json(405, br#"{"error":"method_not_allowed"}"#.to_vec());
        }
        // Publishing a fleet directive set is a privileged mutation carrying a
        // bearer token; refuse it over cleartext (same NFR-S1 stance as the data
        // plane) so the token is never exposed on the wire. The introspection
        // routes short-circuit before the data-plane TLS gate, so enforce it here.
        if self.require_tls_for_mutation && !req.secure {
            return IngressResponse::json(403, br#"{"error":"tls_required"}"#.to_vec());
        }
        if !crate::bearer::matches(&req.headers, &admin.token) {
            return IngressResponse::json(401, br#"{"error":"unauthorized"}"#.to_vec());
        }
        match decode_directive_set(&req.body, admin.clock.as_ref()) {
            Ok(set) => {
                let count = set.len();
                admin.store.publish(set);
                IngressResponse::json(200, format!(r#"{{"published":{count}}}"#).into_bytes())
            }
            Err(reason) => {
                IngressResponse::json(400, format!(r#"{{"error":"{reason}"}}"#).into_bytes())
            }
        }
    }

    /// Routes the pre-auth introspection and control-plane surfaces in one
    /// place: the shape-only `/debug/*` tools, the always-on `/metrics`
    /// snapshot, and the token-gated `/admin/directives` (GET reads; every other
    /// method goes to the publish handler, which accepts only POST).
    ///
    /// Returns `Some(response)` when `req` targets one of them, else `None`
    /// (the request is data plane).
    fn introspection_route(&self, req: &IngressRequest) -> Option<IngressResponse> {
        let path: &str = &req.path;

        // /debug/*: the shape-only diagnostics surfaces, served only when
        // enabled (off in production so operational metadata is not exposed
        // unauthenticated). Disabled, they answer 404 with a `not_enabled`
        // body, which tells "turned off here" apart from "no such route".
        if path.starts_with("/debug/") {
            if !self.debug_endpoints {
                let body = br#"{"error":"not_enabled"}"#.to_vec();
                return Some(IngressResponse::json(404, body));
            }
            // /debug/explain/{id}: the shape-only causal trace for one request.
            if let Some(id) = path.strip_prefix("/debug/explain/") {
                let response = if let Some(doc) = self.pipeline.explain(&RequestId::from(id)) {
                    IngressResponse::json(200, doc.to_string().into_bytes())
                } else {
                    IngressResponse::json(404, br#"{"error":"unknown_request_id"}"#.to_vec())
                };
                return Some(response);
            }
            // /debug/breakglass: the forensic tape captured under a ring_buffer
            // directive (`docs/05` §5), oldest first. Shape-only like the
            // explain doc.
            if path == "/debug/breakglass" {
                let entries = self.pipeline.break_glass().snapshot();
                let tape = serde_json::Value::Array(entries);
                return Some(IngressResponse::json(200, tape.to_string().into_bytes()));
            }
            // Any other `/debug/...` path matches nothing below and ends up as
            // `None`, i.e. it is treated as data plane.
        }

        match path {
            // The always-on, prod-safe operational snapshot (shape-only
            // counts/rates/cluster ids, so no auth; see `metrics_snapshot`).
            "/metrics" => {
                let body = self.metrics_snapshot().into_bytes();
                Some(IngressResponse::json(200, body))
            }
            // Privileged control-plane settings: GET introspects what this
            // instance applies, anything else is handed to the publish path;
            // both are token-gated and fail-closed (a forged token
            // reveals/changes nothing, `docs/05` §3).
            "/admin/directives" => {
                let response = if req.method == HttpMethod::Get {
                    self.introspect_directives(req)
                } else {
                    self.publish_directives(req)
                };
                Some(response)
            }
            _ => None,
        }
    }

    /// Handles `GET /admin/directives`: returns the control-plane settings this
    /// instance is currently applying, the read side of the directive store, so
    /// an agent can see what is in effect (per instance; the replicating store
    /// keeps the fleet consistent). Token-gated like the publish path (the
    /// targeting selectors are operator config) and fail-closed: disabled or a bad
    /// token reveals nothing.
    fn introspect_directives(&self, req: &IngressRequest) -> IngressResponse {
        let Some(admin) = &self.directive_admin else {
            return IngressResponse::json(404, br#"{"error":"not_enabled"}"#.to_vec());
        };
        if !crate::bearer::matches(&req.headers, &admin.token) {
            return IngressResponse::json(401, br#"{"error":"unauthorized"}"#.to_vec());
        }
        let view = admin.store.load().introspect(admin.clock.now());
        IngressResponse::json(200, view.to_string().into_bytes())
    }
}

impl<A: Authenticator, Z: Authorizer> AppHandler<A, Z> {
    /// The shared pre-dispatch gate for both the buffered and streamed paths:
    /// refuse mutation over cleartext (NFR-S1), authenticate (the bearer token is
    /// consumed here, never reaching the pipeline or telemetry), and authorize.
    /// Returns the resolved principal, or the error response to return verbatim.
    async fn gate(
        &self,
        req: &IngressRequest,
        request_id: &RequestId,
    ) -> Result<Principal, IngressResponse> {
        if self.require_tls_for_mutation && req.endpoint.is_tenancy_aware() && !req.secure {
            return Err(
                IngressResponse::json(403, br#"{"error":"tls_required"}"#.to_vec())
                    .with_header("x-request-id", request_id.as_str()),
            );
        }
        let principal = self
            .authenticator
            .authenticate(&credentials_from(req))
            .await
            .map_err(|err| {
                IngressResponse::json(err.http_status(), auth_error_body(&err))
                    .with_header("x-request-id", request_id.as_str())
            })?;
        let action = Action {
            endpoint: req.endpoint,
            logical_index: req.logical_index.clone(),
        };
        self.authorizer
            .authorize(&principal, &action)
            .await
            .map_err(|err| {
                IngressResponse::json(err.http_status(), auth_error_body(&err))
                    .with_header("x-request-id", request_id.as_str())
            })?;
        Ok(principal)
    }

    /// Turns a pipeline outcome into the ingress response and runs the
    /// post-response side effects; used as the tail of the bulk-stream path.
    ///
    /// A 2xx pipeline response counts as a success; any other status, or a
    /// pipeline error (mapped through `status_for` / `error_body`), counts as a
    /// failure. The returned response carries the `x-request-id` header.
    fn finish_streamed(
        &self,
        req: &IngressRequest,
        request_id: &RequestId,
        result: Result<PipelineResponse, RequestError>,
        should_capture: bool,
    ) -> IngressResponse {
        // Classify first, while the outcome is still intact.
        let ok = matches!(&result, Ok(resp) if (200..300).contains(&resp.status));
        let response = match result {
            Ok(resp) => ingress_from(resp),
            Err(err) => IngressResponse::json(status_for(&err), error_body(&err)),
        };
        self.after_response(req, &response, request_id, ok, should_capture);
        response.with_header("x-request-id", request_id.as_str())
    }
}

impl<A: Authenticator, Z: Authorizer> IngressHandler for AppHandler<A, Z> {
    async fn handle(&self, req: IngressRequest) -> IngressResponse {
        let request_id = self.next_request_id();

        // Introspection + admin surfaces short-circuit before auth; the data plane
        // continues below.
        if let Some(resp) = self.introspection_route(&req) {
            return resp;
        }

        let principal = match self.gate(&req, &request_id).await {
            Ok(principal) => principal,
            Err(resp) => return resp,
        };

        // The credentials were consumed above; strip the `Authorization` header so
        // the bearer token never reaches the pipeline, observability, or logs. The
        // partition header, `traceparent`, and `x-debug-directive` are preserved,
        // the engine still needs them.
        let safe_headers = crate::bearer::without_authorization(&req.headers);
        // The forwarded set is computed from the *raw* headers (so the client
        // `Authorization` is available per the pass-all default), independent of
        // the auth-stripped view the pipeline routes on.
        let forward = self.forward_policy.forward_set(&req.headers);
        let ctx = build_ctx(&req, &principal, &request_id, &safe_headers, &forward);

        // Echo the request id so a client (or LLM) can fetch its
        // /debug/explain/{id} afterward. `should_capture` is the live per-request
        // capture decision, applied to both success and error responses.
        let (result, should_capture) = self.pipeline.handle_with_capture(&ctx).await;
        let (response, ok) = match result {
            Ok(resp) => {
                let ok = (200..300).contains(&resp.status);
                (ingress_from(resp), ok)
            }
            Err(err) => (
                IngressResponse::json(status_for(&err), error_body(&err)),
                false,
            ),
        };
        self.after_response(&req, &response, &request_id, ok, should_capture);
        response.with_header("x-request-id", request_id.as_str())
    }

    /// Decides whether a request should take the stream-forward path. Returns
    /// `true` only when capture is off, `path` is not a proxy-internal
    /// surface, and the pipeline treats `logical_index` as passthrough.
    fn forward_plan(&self, path: &str, logical_index: &str) -> bool {
        // Full-fidelity capture tees the raw exchange, which needs the body in
        // memory, so when capture is wired the request is buffered (the
        // `handle` path) rather than streamed.
        if self.capture.enabled() {
            return false;
        }
        // The proxy-internal surfaces are served pre-auth in `handle` and must
        // never be forwarded to a cluster, even under a whole-instance
        // passthrough policy (which matches every index).
        let proxy_internal =
            path.starts_with("/debug/") || matches!(path, "/metrics" | "/admin/directives");
        !proxy_internal && self.pipeline.is_passthrough(logical_index)
    }

    /// The stream-forward entry point: gates the request, pipes the streamed
    /// request body upstream, and streams the upstream response back. A gate
    /// refusal or a pipeline error is returned as a small buffered response.
    /// The response carries `x-request-id`.
    async fn handle_forward(&self, req: IngressRequest, body: Incoming) -> StreamingResponse {
        let request_id = self.next_request_id();
        // `forward_plan` already excluded the introspection routes; apply the
        // same TLS + auth + authz gate as the buffered path before forwarding.
        let principal = match self.gate(&req, &request_id).await {
            // The gate's refusal is a small buffered error; carry it as a
            // streaming response so both arms share one return type.
            Err(refusal) => return to_streaming(refusal),
            Ok(principal) => principal,
        };

        let safe_headers = crate::bearer::without_authorization(&req.headers);
        // The forwarded set is computed from the *raw* headers (so the client
        // `Authorization` is available per the pass-all default), independent
        // of the auth-stripped view the pipeline routes on.
        let forward_headers = self.forward_policy.forward_set(&req.headers);
        // The body is the streamed `body` argument, not `req.body` (empty here).
        let ctx = build_ctx(&req, &principal, &request_id, &safe_headers, &forward_headers);

        // Both directions stream: the request body pipes upstream, the
        // upstream response pipes back, neither buffered.
        let upstream = osproxy_sink::stream_body(body);
        let (result, _capture) = self.pipeline.forward_streamed(&ctx, upstream).await;
        let response = match result {
            Err(err) => {
                self.after_streamed(&request_id, false);
                StreamingResponse::buffered(status_for(&err), error_body(&err))
            }
            Ok(reply) => {
                let ok = (200..300).contains(&reply.status);
                self.after_streamed(&request_id, ok);
                let streamed = StreamingResponse::stream(reply.status, reply.body);
                // Carry the upstream content type so a non-JSON verbatim
                // forward (e.g. a passed-through `_cat`) is not relabeled
                // `application/json`.
                match reply.content_type {
                    Some(content_type) => streamed.with_header("content-type", content_type),
                    None => streamed,
                }
            }
        };
        response.with_header("x-request-id", request_id.as_str())
    }

    /// Whether a request should take the streamed-search path: only a `Search`
    /// endpoint, only while capture is off, and only when the query string does
    /// not open a scroll.
    fn wants_search_stream(&self, endpoint: EndpointKind, query: Option<&str>) -> bool {
        if endpoint != EndpointKind::Search {
            return false;
        }
        // Capture must tee the buffered response, so streaming and capture are
        // mutually exclusive.
        if self.capture.enabled() {
            return false;
        }
        // A scroll-opening search keeps the buffered path: its `_scroll_id`
        // affinity wrap needs the whole response body. A PIT-pinned search is
        // detected from the body inside the engine, which falls back to
        // buffered there.
        !opens_scroll(query)
    }

    /// The streamed-search entry point: gates the request, runs the search
    /// with the buffered query body, and streams the response back. A gate
    /// refusal or a pipeline error is returned as a small buffered response.
    /// The response carries `x-request-id`.
    async fn handle_search_stream(&self, req: IngressRequest) -> StreamingResponse {
        let request_id = self.next_request_id();
        // `wants_search_stream` does not gate; apply the same TLS + auth +
        // authz gate as the buffered path before dispatching.
        let principal = match self.gate(&req, &request_id).await {
            Err(refusal) => return to_streaming(refusal),
            Ok(principal) => principal,
        };

        let safe_headers = crate::bearer::without_authorization(&req.headers);
        // The forwarded set is computed from the *raw* headers (so the client
        // `Authorization` is available per the pass-all default), independent
        // of the auth-stripped view the pipeline routes on.
        let forward_headers = self.forward_policy.forward_set(&req.headers);
        // Unlike the forward/bulk streams, the search query body *is* needed
        // and is the buffered `req.body`; only the response streams.
        let ctx = build_ctx(&req, &principal, &request_id, &safe_headers, &forward_headers);

        let (result, _capture) = self.pipeline.search_streamed(&ctx).await;
        let response = match result {
            Err(err) => {
                self.after_streamed(&request_id, false);
                StreamingResponse::buffered(status_for(&err), error_body(&err))
            }
            Ok(reply) => {
                let ok = (200..300).contains(&reply.status);
                self.after_streamed(&request_id, ok);
                StreamingResponse::stream(reply.status, reply.body)
            }
        };
        response.with_header("x-request-id", request_id.as_str())
    }

    /// Whether a request should take the streamed-bulk path: only an
    /// `IngestBulk` endpoint, only while capture is off, and only when the
    /// pipeline classifies the request (from its headers) as a sync write.
    fn wants_bulk_stream(&self, endpoint: EndpointKind, headers: &[(String, String)]) -> bool {
        if endpoint != EndpointKind::IngestBulk {
            return false;
        }
        // Capture must tee the buffered body, so streaming and capture are
        // mutually exclusive.
        if self.capture.enabled() {
            return false;
        }
        // Only sync `_bulk` streams; async fan-out keeps the buffered path,
        // which enqueues per item.
        self.pipeline.is_sync_write(headers)
    }

    /// The streamed-bulk entry point: gates the request, feeds the streamed
    /// NDJSON body to the pipeline, and returns a buffered response built by
    /// `finish_streamed`. A gate refusal is returned verbatim.
    async fn handle_bulk_stream(&self, req: IngressRequest, body: Incoming) -> IngressResponse {
        let request_id = self.next_request_id();
        let principal = match self.gate(&req, &request_id).await {
            Err(refusal) => return refusal,
            Ok(principal) => principal,
        };

        let safe_headers = crate::bearer::without_authorization(&req.headers);
        // The forwarded set is computed from the *raw* headers (so the client
        // `Authorization` is available per the pass-all default), independent
        // of the auth-stripped view the pipeline routes on.
        let forward_headers = self.forward_policy.forward_set(&req.headers);
        // The body is the streamed NDJSON batch, not `req.body` (empty here).
        let ctx = build_ctx(&req, &principal, &request_id, &safe_headers, &forward_headers);

        let batch = osproxy_sink::stream_body(body);
        let (outcome, should_capture) = self.pipeline.handle_bulk_streamed(&ctx, batch).await;
        self.finish_streamed(&req, &request_id, outcome, should_capture)
    }
}

impl<A, Z> AppHandler<A, Z> {
    /// Side effects that follow a buffered response, in order: tally the
    /// outcome in the metrics (shape-only), emit the structured log when the
    /// logger is enabled, then hand the exchange to `tee_capture`.
    fn after_response(
        &self,
        req: &IngressRequest,
        response: &IngressResponse,
        request_id: &RequestId,
        ok: bool,
        should_capture: bool,
    ) {
        self.metrics.record(ok);

        // The structured log is the shape-only explain document, which carries
        // the request's trace_id, so logs join the trace/spans. It is looked up
        // only when logging is enabled.
        let record = if self.request_log.enabled() {
            self.pipeline.explain(request_id)
        } else {
            None
        };
        if let Some(record) = record {
            self.request_log.emit(&record);
        }

        self.tee_capture(req, response, request_id, should_capture);
    }

    /// Side effects that follow a **streamed** response (no body retained):
    /// tally the outcome in the metrics and, when the logger is enabled, emit
    /// the request's explain document. Capture is never available on a
    /// streamed path (there is no buffered body to tee), so it is not
    /// attempted.
    fn after_streamed(&self, request_id: &RequestId, ok: bool) {
        self.metrics.record(ok);
        if !self.request_log.enabled() {
            return;
        }
        if let Some(record) = self.pipeline.explain(request_id) {
            self.request_log.emit(&record);
        }
    }

    /// Full-fidelity capture: tees the raw exchange for replay/audit, but only
    /// when `should_capture` (the live directive decision) selected this
    /// request *and* the capture sink reports itself enabled, so capture is on
    /// demand rather than whenever a sink exists. The original request headers
    /// pass through untouched; redaction (e.g. dropping `Authorization`) is
    /// composed via `RedactingCapture`.
    fn tee_capture(
        &self,
        req: &IngressRequest,
        response: &IngressResponse,
        request_id: &RequestId,
        should_capture: bool,
    ) {
        // The sink is consulted only for a request that was selected.
        let selected = should_capture && self.capture.enabled();
        if selected {
            let record = CaptureRecord {
                request_id: request_id.as_str(),
                method: req.method,
                path: &req.path,
                query: req.query.as_deref(),
                headers: &req.headers,
                body: &req.body,
                response_status: response.status,
                response_body: &response.body,
            };
            self.capture.capture(&record);
        }
    }
}

/// Reports whether the query string opens a scroll: true when any
/// `&`-separated parameter is exactly `scroll` or begins with `scroll=`. Such
/// a search returns a `_scroll_id` that must be affinity-wrapped against the
/// whole response body, so it keeps the buffered path rather than streaming.
/// A missing query string never opens a scroll.
fn opens_scroll(query: Option<&str>) -> bool {
    let Some(raw) = query else {
        return false;
    };
    // The text before the first `=` (or the whole parameter when there is no
    // `=`) is the key; only the exact key `scroll` counts.
    raw.split('&')
        .any(|pair| pair.split('=').next() == Some("scroll"))
}

/// Carries a small buffered [`IngressResponse`] (e.g. an auth refusal) as a
/// [`StreamingResponse`], preserving its status and headers, so the streamed
/// forward path has one return type for both the gate refusal and the stream.
fn to_streaming(resp: IngressResponse) -> StreamingResponse {
    let mut streaming = StreamingResponse::buffered(resp.status, resp.body);
    streaming.headers = resp.headers;
    streaming
}

/// Assembles the engine [`RequestCtx`] for an authenticated request. This is
/// the one place the four data-plane entry points (`handle`, `handle_forward`,
/// `handle_search_stream`, `handle_bulk_stream`) share, so they cannot drift in
/// which fields they wire.
///
/// `principal`, `request_id`, `safe_headers`, and `forward_headers` are
/// caller-owned locals borrowed for the context's lifetime. `safe_headers` is
/// the header view the pipeline sees; `forward_headers` is the set relayed
/// upstream. The body always rides as `req.body` (empty on the streamed paths,
/// where the real body travels beside the ctx).
fn build_ctx<'a>(
    req: &'a IngressRequest,
    principal: &'a Principal,
    request_id: &'a RequestId,
    safe_headers: &'a [(String, String)],
    forward_headers: &'a [(String, String)],
) -> RequestCtx<'a> {
    let header_view = HeaderView::new(safe_headers);
    let base = RequestCtx::new(
        principal,
        request_id,
        req.method,
        req.endpoint,
        req.protocol,
        &req.logical_index,
        header_view,
        &req.body,
    );
    // Optional request parts, then the upstream-forwarded header set.
    base.with_doc_id(req.doc_id.as_deref())
        .with_query(req.query.as_deref())
        .with_path(&req.path)
        .with_forward_headers(forward_headers)
}

/// Converts a pipeline response into the ingress response, carrying its
/// content type when one is set, so a verbatim admin/passthrough body (e.g.
/// `_cat` `text/plain`) is not mislabeled `application/json`. A shaped
/// response leaves the content type `None`, and no `content-type` header is
/// added here.
fn ingress_from(resp: PipelineResponse) -> IngressResponse {
    let content_type = resp.content_type;
    let out = IngressResponse::json(resp.status, resp.body);
    if let Some(content_type) = content_type {
        out.with_header("content-type", content_type)
    } else {
        out
    }
}

/// Collects the caller's credentials from a request: the bearer token parsed
/// out of the headers by `crate::bearer::parse` (if any) and a copy of the
/// request's verified mTLS client-certificate subject (if any).
fn credentials_from(req: &IngressRequest) -> ClientCredentials {
    let bearer_token = crate::bearer::parse(&req.headers).map(|token| token.to_owned());
    let client_cert_subject = req.client_cert_subject.clone();
    ClientCredentials {
        bearer_token,
        client_cert_subject,
    }
}

/// Renders the value-free JSON body for an auth failure: `{"error":"<slug>"}`,
/// where the slug is that of the error's code.
fn auth_error_body(err: &AuthError) -> Vec<u8> {
    let code = err.code();
    let slug = code.as_slug();
    format!(r#"{{"error":"{slug}"}}"#).into_bytes()
}

/// Maps a request-path error to an HTTP status, by its stable code.
fn status_for(err: &RequestError) -> u16 {
    match err.code() {
        ErrorCode::PartitionUnresolved | ErrorCode::UnsupportedEndpoint => 400,
        ErrorCode::AuthFailed => 401,
        ErrorCode::Unauthorized => 403,
        ErrorCode::PlacementMissing => 404,
        ErrorCode::StaleEpoch => 409,
        ErrorCode::PayloadTooLarge => 413,
        ErrorCode::UpstreamFailed => 502,
        ErrorCode::PlacementBackendUnavailable | ErrorCode::Overloaded => 503,
        // ErrorCode is non-exhaustive; an unmapped code is an internal fault.
        _ => 500,
    }
}

/// Renders the value-free JSON error body,
/// `{"error":"<slug>","retryable":<retryable>}`, carrying the stable code and
/// retryability so a client or LLM can act on it without any tenant data
/// leaking (NFR-S2).
fn error_body(err: &RequestError) -> Vec<u8> {
    let code = err.code();
    let slug = code.as_slug();
    let retryable = err.retryable();
    format!(r#"{{"error":"{slug}","retryable":{retryable}}}"#).into_bytes()
}

#[cfg(test)]
#[path = "handler_tests.rs"]
mod tests;