osproxy_engine/search_stream.rs
1//! The streaming search response: a live upstream body piped back to the client
2//! through the hit transform, never buffered (ADR-014, final stage).
3//!
4//! [`shape_hits_stream`] wraps the upstream [`ByteBody`] in a [`SearchHitsScanner`]
5//! and produces a new [`ByteBody`] that emits transformed bytes as the upstream
6//! flows. It is built with [`futures_util::stream::unfold`] + a
7//! [`StreamBody`], a spawn-free combinator (no
8//! `tokio::spawn`, satisfying the spawn-discipline gate) that carries the
9//! upstream body and the scanner as its state.
10
11use bytes::Bytes;
12use futures_util::{stream, StreamExt as _};
13use http_body::Frame;
14use http_body_util::{BodyExt as _, BodyStream, StreamBody};
15use osproxy_observe::RequestTrace;
16use osproxy_sink::{buffered, BodyError, ByteBody, Reader, Sink};
17use osproxy_spi::RequestCtx;
18use osproxy_tenancy::Router;
19
20use crate::cursor::{forwardable_query, pit_id_in_body};
21use crate::error::RequestError;
22use crate::observe::{read_dispatch_info, resolve_info};
23use crate::pipeline::Pipeline;
24use crate::read::build_search_op;
25use crate::search_scan::{HitShaper, SearchHitsScanner};
26
27impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
28 /// The **streaming** search path (ADR-014, final stage): like
29 /// [`search`](Self::search) but the upstream response is piped back through
30 /// the hit transform without buffering, each hit shaped incrementally, every
31 /// sibling (`aggregations` especially) forwarded verbatim. A PIT-pinned search
32 /// falls back to the buffered path: its `_scroll_id` affinity wrap needs the
33 /// whole body. The caller (transport) already excluded scroll-opening
34 /// searches, which also need the buffered body. The trace lifecycle is owned
35 /// by [`search_streamed`](Self::search_streamed).
36 pub(crate) async fn run_search_stream(
37 &self,
38 ctx: &RequestCtx<'_>,
39 trace: &mut RequestTrace,
40 ) -> Result<StreamSearch, RequestError> {
41 if self.cursor_signer.is_some() {
42 if let Some(wrapped) = pit_id_in_body(ctx.body()) {
43 let resp = self.pit_search(ctx, trace, &wrapped).await?;
44 return Ok(StreamSearch::buffered(resp.status, resp.body));
45 }
46 }
47 let resolved = self.resolve_with_retry(ctx).await?;
48 trace.record_resolve(resolve_info(&resolved));
49
50 let (search_op, shape) = build_search_op(&resolved, ctx.body())?;
51 let stream = self
52 .sink()
53 .search_stream(
54 search_op
55 .with_query(forwardable_query(ctx.query()))
56 .with_trace(self.upstream_trace(ctx))
57 .with_forward_headers(ctx.forward_headers().to_vec()),
58 )
59 .await?;
60 trace.record_dispatch(read_dispatch_info(
61 &resolved,
62 stream.status,
63 stream.pool_reuse,
64 ));
65
66 let shaper = HitShaper {
67 logical_index: ctx.logical_index().to_owned(),
68 partition: resolved.partition.as_str().to_owned(),
69 shape,
70 };
71 Ok(StreamSearch::stream(
72 stream.status,
73 shape_hits_stream(stream.body, shaper),
74 ))
75 }
76}
77
78/// The outcome of a streaming search: the upstream status and the response body
79/// as a live [`ByteBody`], the hits transformed incrementally, all siblings
80/// (including `aggregations`) passed through verbatim, none of it buffered.
81pub struct StreamSearch {
82 /// The upstream HTTP status.
83 pub status: u16,
84 /// The transformed response body, streamed back.
85 pub body: ByteBody,
86}
87
88impl std::fmt::Debug for StreamSearch {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 // The streamed body is not `Debug`; show the rest of the shape.
91 f.debug_struct("StreamSearch")
92 .field("status", &self.status)
93 .finish_non_exhaustive()
94 }
95}
96
97impl StreamSearch {
98 /// A streaming response whose body is transformed on the fly.
99 #[must_use]
100 pub fn stream(status: u16, body: ByteBody) -> Self {
101 Self { status, body }
102 }
103
104 /// A buffered response (the PIT/affinity fallback, or an error), boxed into
105 /// the same body type so both arms share one return type.
106 #[must_use]
107 pub fn buffered(status: u16, body: Vec<u8>) -> Self {
108 Self {
109 status,
110 body: buffered(Bytes::from(body)),
111 }
112 }
113}
114
115/// The live state of the transforming stream: the upstream frames and the
116/// scanner driving them.
117struct Active {
118 frames: BodyStream<ByteBody>,
119 scanner: SearchHitsScanner,
120}
121
122/// The per-step state of the transforming stream. The active state is boxed (it
123/// dwarfs the terminal `Done`) so the `unfold` state stays small.
124enum Stage {
125 /// Still pulling upstream frames through the scanner.
126 Active(Box<Active>),
127 /// Exhausted.
128 Done,
129}
130
131/// Wraps the upstream search-response body so its `hits.hits` are transformed to
132/// the client's logical view incrementally, peak memory is one hit plus one
133/// upstream frame, independent of the response size (INV-MEM).
134#[must_use]
135pub(crate) fn shape_hits_stream(upstream: ByteBody, shaper: HitShaper) -> ByteBody {
136 let init = Stage::Active(Box::new(Active {
137 frames: BodyStream::new(upstream),
138 scanner: SearchHitsScanner::new(shaper),
139 }));
140 let stream = stream::unfold(init, |stage| async move { next_frame(stage).await });
141 StreamBody::new(stream).boxed_unsync()
142}
143
144/// Produces the next output frame from the stream stage, or `None` at end.
145/// Pulls upstream frames, feeds the scanner, and yields only non-empty output
146/// (an upstream frame consumed entirely into a partial hit yields nothing, so we
147/// loop to the next frame rather than emit an empty frame).
148async fn next_frame(stage: Stage) -> Option<(Result<Frame<Bytes>, BodyError>, Stage)> {
149 let Stage::Active(mut active) = stage else {
150 return None;
151 };
152 loop {
153 match active.frames.next().await {
154 Some(Ok(frame)) => {
155 let Ok(data) = frame.into_data() else {
156 continue; // a non-data frame (trailers): ignore
157 };
158 let out = active.scanner.feed(&data);
159 if !out.is_empty() {
160 return Some((Ok(Frame::data(Bytes::from(out))), Stage::Active(active)));
161 }
162 }
163 Some(Err(err)) => return Some((Err(err), Stage::Done)),
164 None => {
165 // Upstream ended cleanly: emit the scanner's final bytes, then stop.
166 // For well-formed input this tail is always empty, every hit and
167 // sibling is emitted incrementally as it closes, so the non-empty
168 // arm is defensive (it would carry trailing bytes only if a future
169 // change deferred emission). A truncated body leaves its partial hit
170 // unparsed and unemitted, so it is dropped, never leaked unshaped.
171 let tail = active.scanner.finish();
172 return if tail.is_empty() {
173 None
174 } else {
175 Some((Ok(Frame::data(Bytes::from(tail))), Stage::Done))
176 };
177 }
178 }
179 }
180}
181
// Unit tests for this module live in `search_stream_tests.rs`, loaded via
// `#[path]` and compiled only for test builds.
#[cfg(test)]
#[path = "search_stream_tests.rs"]
mod tests;