Skip to main content

osproxy_engine/
search_stream.rs

1//! The streaming search response: a live upstream body piped back to the client
2//! through the hit transform, never buffered (ADR-014, final stage).
3//!
4//! [`shape_hits_stream`] wraps the upstream [`ByteBody`] in a [`SearchHitsScanner`]
5//! and produces a new [`ByteBody`] that emits transformed bytes as the upstream
6//! flows. It is built with [`futures_util::stream::unfold`] + a
7//! [`StreamBody`], a spawn-free combinator (no
8//! `tokio::spawn`, satisfying the spawn-discipline gate) that carries the
9//! upstream body and the scanner as its state.
10
11use bytes::Bytes;
12use futures_util::{stream, StreamExt as _};
13use http_body::Frame;
14use http_body_util::{BodyExt as _, BodyStream, StreamBody};
15use osproxy_observe::RequestTrace;
16use osproxy_sink::{buffered, BodyError, ByteBody, Reader, Sink};
17use osproxy_spi::RequestCtx;
18use osproxy_tenancy::Router;
19
20use crate::cursor::{forwardable_query, pit_id_in_body};
21use crate::error::RequestError;
22use crate::observe::{read_dispatch_info, resolve_info};
23use crate::pipeline::Pipeline;
24use crate::read::build_search_op;
25use crate::search_scan::{HitShaper, SearchHitsScanner};
26
27impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
28    /// The **streaming** search path (ADR-014, final stage): like
29    /// [`search`](Self::search) but the upstream response is piped back through
30    /// the hit transform without buffering, each hit shaped incrementally, every
31    /// sibling (`aggregations` especially) forwarded verbatim. A PIT-pinned search
32    /// falls back to the buffered path: its `_scroll_id` affinity wrap needs the
33    /// whole body. The caller (transport) already excluded scroll-opening
34    /// searches, which also need the buffered body. The trace lifecycle is owned
35    /// by [`search_streamed`](Self::search_streamed).
36    pub(crate) async fn run_search_stream(
37        &self,
38        ctx: &RequestCtx<'_>,
39        trace: &mut RequestTrace,
40    ) -> Result<StreamSearch, RequestError> {
41        if self.cursor_signer.is_some() {
42            if let Some(wrapped) = pit_id_in_body(ctx.body()) {
43                let resp = self.pit_search(ctx, trace, &wrapped).await?;
44                return Ok(StreamSearch::buffered(resp.status, resp.body));
45            }
46        }
47        let resolved = self.resolve_with_retry(ctx).await?;
48        trace.record_resolve(resolve_info(&resolved));
49
50        let (search_op, shape) = build_search_op(&resolved, ctx.body())?;
51        let stream = self
52            .sink()
53            .search_stream(
54                search_op
55                    .with_query(forwardable_query(ctx.query()))
56                    .with_trace(self.upstream_trace(ctx))
57                    .with_forward_headers(ctx.forward_headers().to_vec()),
58            )
59            .await?;
60        trace.record_dispatch(read_dispatch_info(
61            &resolved,
62            stream.status,
63            stream.pool_reuse,
64        ));
65
66        let shaper = HitShaper {
67            logical_index: ctx.logical_index().to_owned(),
68            partition: resolved.partition.as_str().to_owned(),
69            shape,
70        };
71        Ok(StreamSearch::stream(
72            stream.status,
73            shape_hits_stream(stream.body, shaper),
74        ))
75    }
76}
77
78/// The outcome of a streaming search: the upstream status and the response body
79/// as a live [`ByteBody`], the hits transformed incrementally, all siblings
80/// (including `aggregations`) passed through verbatim, none of it buffered.
81pub struct StreamSearch {
82    /// The upstream HTTP status.
83    pub status: u16,
84    /// The transformed response body, streamed back.
85    pub body: ByteBody,
86}
87
88impl std::fmt::Debug for StreamSearch {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        // The streamed body is not `Debug`; show the rest of the shape.
91        f.debug_struct("StreamSearch")
92            .field("status", &self.status)
93            .finish_non_exhaustive()
94    }
95}
96
97impl StreamSearch {
98    /// A streaming response whose body is transformed on the fly.
99    #[must_use]
100    pub fn stream(status: u16, body: ByteBody) -> Self {
101        Self { status, body }
102    }
103
104    /// A buffered response (the PIT/affinity fallback, or an error), boxed into
105    /// the same body type so both arms share one return type.
106    #[must_use]
107    pub fn buffered(status: u16, body: Vec<u8>) -> Self {
108        Self {
109            status,
110            body: buffered(Bytes::from(body)),
111        }
112    }
113}
114
115/// The live state of the transforming stream: the upstream frames and the
116/// scanner driving them.
117struct Active {
118    frames: BodyStream<ByteBody>,
119    scanner: SearchHitsScanner,
120}
121
122/// The per-step state of the transforming stream. The active state is boxed (it
123/// dwarfs the terminal `Done`) so the `unfold` state stays small.
124enum Stage {
125    /// Still pulling upstream frames through the scanner.
126    Active(Box<Active>),
127    /// Exhausted.
128    Done,
129}
130
131/// Wraps the upstream search-response body so its `hits.hits` are transformed to
132/// the client's logical view incrementally, peak memory is one hit plus one
133/// upstream frame, independent of the response size (INV-MEM).
134#[must_use]
135pub(crate) fn shape_hits_stream(upstream: ByteBody, shaper: HitShaper) -> ByteBody {
136    let init = Stage::Active(Box::new(Active {
137        frames: BodyStream::new(upstream),
138        scanner: SearchHitsScanner::new(shaper),
139    }));
140    let stream = stream::unfold(init, |stage| async move { next_frame(stage).await });
141    StreamBody::new(stream).boxed_unsync()
142}
143
144/// Produces the next output frame from the stream stage, or `None` at end.
145/// Pulls upstream frames, feeds the scanner, and yields only non-empty output
146/// (an upstream frame consumed entirely into a partial hit yields nothing, so we
147/// loop to the next frame rather than emit an empty frame).
148async fn next_frame(stage: Stage) -> Option<(Result<Frame<Bytes>, BodyError>, Stage)> {
149    let Stage::Active(mut active) = stage else {
150        return None;
151    };
152    loop {
153        match active.frames.next().await {
154            Some(Ok(frame)) => {
155                let Ok(data) = frame.into_data() else {
156                    continue; // a non-data frame (trailers): ignore
157                };
158                let out = active.scanner.feed(&data);
159                if !out.is_empty() {
160                    return Some((Ok(Frame::data(Bytes::from(out))), Stage::Active(active)));
161                }
162            }
163            Some(Err(err)) => return Some((Err(err), Stage::Done)),
164            None => {
165                // Upstream ended cleanly: emit the scanner's final bytes, then stop.
166                // For well-formed input this tail is always empty, every hit and
167                // sibling is emitted incrementally as it closes, so the non-empty
168                // arm is defensive (it would carry trailing bytes only if a future
169                // change deferred emission). A truncated body leaves its partial hit
170                // unparsed and unemitted, so it is dropped, never leaked unshaped.
171                let tail = active.scanner.finish();
172                return if tail.is_empty() {
173                    None
174                } else {
175                    Some((Ok(Frame::data(Bytes::from(tail))), Stage::Done))
176                };
177            }
178        }
179    }
180}
181
182#[cfg(test)]
183#[path = "search_stream_tests.rs"]
184mod tests;