Skip to main content

osproxy_sink/
read.rs

//! The [`Reader`] trait: the direct-to-cluster read path (get-by-id, search,
//! count, cursor passthrough, and streaming search/forward).
2//!
//! Reads are always direct-to-cluster: unlike writes, they cannot be served by
//! a queue, so the read seam is separate from [`Sink`](crate::Sink). The same
//! backend type may implement both (`OpenSearchSink` does, sharing its pooled
//! connection), while a write-only `QueueSink` implements only [`Sink`].
7//!
8//! [`Sink`]: crate::Sink
9//
// JUSTIFY(file-length): one cohesive family of read-path value types: the
// `Reader` trait plus the op (`ReadOp`/`SearchOp`/`CursorOp`/`ForwardOp`) and
// outcome (`ReadOutcome`/`SearchOutcome`/`CountOutcome`/`CursorOutcome`/
// `StreamingSearch`/`StreamingForward`) structs they exchange. They share the
// same builders and conventions; splitting them would scatter one small
// vocabulary across files for no real separation.
15
16use osproxy_core::{ClusterId, Target, TraceContext};
17use osproxy_spi::{HttpMethod, Protocol};
18
19use crate::error::SinkError;
20
21/// A read-by-id operation against a resolved [`Target`].
22///
23/// The id is already the **physical** id (the tenancy adapter mapped the
24/// client's logical id, `docs/04` §5); the reader does no rewriting.
25#[derive(Clone, PartialEq, Eq, Debug)]
26pub struct ReadOp {
27    /// The physical destination to read from.
28    pub target: Target,
29    /// The physical document id to fetch.
30    pub id: String,
31    /// The `_routing` value (the partition id), if the placement routes.
32    pub routing: Option<String>,
33    /// The upstream wire protocol this read is dispatched over. Defaults to
34    /// [`Protocol::Http1`].
35    pub protocol: Protocol,
36    /// The W3C trace context to forward downstream (`traceparent`), so the
37    /// upstream's spans join this request's distributed trace. `None` = no
38    /// propagation header is sent.
39    pub trace: Option<TraceContext>,
40    /// Client headers to relay verbatim to the upstream (the forwarding policy's
41    /// output). Applied before [`trace`](Self::trace). Empty by default.
42    pub forward_headers: Vec<(String, String)>,
43}
44
45impl ReadOp {
46    /// Constructs a read operation (defaulting to HTTP/1.1 upstream).
47    #[must_use]
48    pub fn new(target: Target, id: impl Into<String>, routing: Option<String>) -> Self {
49        Self {
50            target,
51            id: id.into(),
52            routing,
53            protocol: Protocol::Http1,
54            trace: None,
55            forward_headers: Vec::new(),
56        }
57    }
58
59    /// Sets the upstream protocol for this op (builder style).
60    #[must_use]
61    pub fn with_protocol(mut self, protocol: Protocol) -> Self {
62        self.protocol = protocol;
63        self
64    }
65
66    /// Sets the trace context to propagate downstream (builder style).
67    #[must_use]
68    pub fn with_trace(mut self, trace: Option<TraceContext>) -> Self {
69        self.trace = trace;
70        self
71    }
72
73    /// Sets the client headers to relay verbatim to the upstream (builder style).
74    #[must_use]
75    pub fn with_forward_headers(mut self, headers: Vec<(String, String)>) -> Self {
76        self.forward_headers = headers;
77        self
78    }
79}
80
/// Result of a get-by-id: whether the document exists, together with the raw
/// upstream body (the document as stored, before the read-path field strip).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadOutcome {
    /// HTTP status returned by the upstream.
    pub status: u16,
    /// `true` when the document exists.
    pub found: bool,
    /// Raw upstream response body (the stored document when `found`).
    pub body: Vec<u8>,
    /// `true` when the read reused a pooled connection (NFR-P telemetry).
    pub pool_reuse: bool,
}

impl ReadOutcome {
    /// Shared constructor behind [`found`](Self::found) and
    /// [`not_found`](Self::not_found); a fresh outcome never claims pool reuse.
    fn from_parts(status: u16, found: bool, body: Vec<u8>) -> Self {
        Self {
            status,
            found,
            body,
            pool_reuse: false,
        }
    }

    /// Outcome for a hit, carrying the stored document body.
    #[must_use]
    pub fn found(status: u16, body: Vec<u8>) -> Self {
        Self::from_parts(status, true, body)
    }

    /// Outcome for a miss (the document does not exist).
    #[must_use]
    pub fn not_found(status: u16, body: Vec<u8>) -> Self {
        Self::from_parts(status, false, body)
    }

    /// Builder: records whether the dispatch reused a pooled connection.
    #[must_use]
    pub fn with_pool_reuse(self, reused: bool) -> Self {
        Self {
            pool_reuse: reused,
            ..self
        }
    }
}
125
126/// A search operation against a resolved [`Target`].
127///
128/// The body is the **already-wrapped** query (the tenancy partition filter has
129/// been applied, `docs/04` §4); the reader forwards it verbatim.
130#[derive(Clone, PartialEq, Eq, Debug)]
131pub struct SearchOp {
132    /// The physical destination to search.
133    pub target: Target,
134    /// The query body to forward upstream (already partition-filtered).
135    pub body: Vec<u8>,
136    /// The upstream wire protocol this search is dispatched over. Defaults to
137    /// [`Protocol::Http1`].
138    pub protocol: Protocol,
139    /// An already-allow-listed query string (without the `?`) to append to the
140    /// upstream URL, e.g. `scroll=1m` to open a scroll. The engine filters this
141    /// to cursor-safe params before it reaches here; the sink appends it verbatim.
142    pub query: Option<String>,
143    /// The W3C trace context to forward downstream (`traceparent`).
144    pub trace: Option<TraceContext>,
145    /// Client headers to relay verbatim to the upstream (the forwarding policy's
146    /// output). Applied before [`trace`](Self::trace). Empty by default.
147    pub forward_headers: Vec<(String, String)>,
148}
149
150impl SearchOp {
151    /// Constructs a search operation (defaulting to HTTP/1.1 upstream).
152    #[must_use]
153    pub fn new(target: Target, body: Vec<u8>) -> Self {
154        Self {
155            target,
156            body,
157            protocol: Protocol::Http1,
158            query: None,
159            trace: None,
160            forward_headers: Vec::new(),
161        }
162    }
163
164    /// Sets the upstream protocol for this op (builder style).
165    #[must_use]
166    pub fn with_protocol(mut self, protocol: Protocol) -> Self {
167        self.protocol = protocol;
168        self
169    }
170
171    /// Sets the (already allow-listed) upstream query string (builder style).
172    #[must_use]
173    pub fn with_query(mut self, query: Option<String>) -> Self {
174        self.query = query;
175        self
176    }
177
178    /// Sets the trace context to propagate downstream (builder style).
179    #[must_use]
180    pub fn with_trace(mut self, trace: Option<TraceContext>) -> Self {
181        self.trace = trace;
182        self
183    }
184
185    /// Sets the client headers to relay verbatim to the upstream (builder style).
186    #[must_use]
187    pub fn with_forward_headers(mut self, headers: Vec<(String, String)>) -> Self {
188        self.forward_headers = headers;
189        self
190    }
191}
192
/// Result of a search: the upstream status plus the raw response body (the
/// hits, before the read-path field strip).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchOutcome {
    /// HTTP status returned by the upstream.
    pub status: u16,
    /// Raw upstream response body (the hits envelope).
    pub body: Vec<u8>,
    /// `true` when the search reused a pooled connection (NFR-P telemetry).
    pub pool_reuse: bool,
}

impl SearchOutcome {
    /// Builds a search outcome that does not (yet) claim pool reuse.
    #[must_use]
    pub fn new(status: u16, body: Vec<u8>) -> Self {
        let pool_reuse = false;
        Self {
            status,
            body,
            pool_reuse,
        }
    }

    /// Builder: records whether the dispatch reused a pooled connection.
    #[must_use]
    pub fn with_pool_reuse(self, reused: bool) -> Self {
        Self {
            pool_reuse: reused,
            ..self
        }
    }
}
223
/// Result of a count: the upstream status plus the number of matching
/// documents.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CountOutcome {
    /// HTTP status returned by the upstream.
    pub status: u16,
    /// How many documents matched.
    pub count: u64,
    /// `true` when the count reused a pooled connection (NFR-P telemetry).
    pub pool_reuse: bool,
}

impl CountOutcome {
    /// Builds a count outcome that does not (yet) claim pool reuse.
    #[must_use]
    pub fn new(status: u16, count: u64) -> Self {
        let pool_reuse = false;
        Self {
            status,
            count,
            pool_reuse,
        }
    }

    /// Builder: records whether the dispatch reused a pooled connection.
    #[must_use]
    pub fn with_pool_reuse(self, reused: bool) -> Self {
        Self {
            pool_reuse: reused,
            ..self
        }
    }
}
253
254/// A raw cursor passthrough op (`docs/03` §6): forward `method path` with `body`
255/// to the specific `cluster` the cursor is pinned to, scroll/PIT continue,
256/// clear, or close. Unlike the typed ops, the destination is *already resolved*
257/// (the engine recovered it from the cursor's signed envelope), so this carries
258/// the cluster directly rather than a partition.
259#[derive(Clone, PartialEq, Eq, Debug)]
260pub struct CursorOp {
261    /// The cluster the cursor is pinned to.
262    pub cluster: ClusterId,
263    /// The HTTP method to forward (continue is `POST`/`GET`, clear/close `DELETE`).
264    pub method: HttpMethod,
265    /// The upstream path (e.g. `/_search/scroll`), already with the real cursor id.
266    pub path: String,
267    /// The request body to forward (the real, unwrapped cursor id substituted in).
268    pub body: Vec<u8>,
269    /// An already-allow-listed query string (without the `?`) to append to the
270    /// upstream URL, e.g. `keep_alive=1m` on PIT create. Filtered by the engine.
271    pub query: Option<String>,
272    /// The pinned cluster's base URL, when the engine knows it (the placement that
273    /// opened the cursor supplied it). `None` for an affinity continue recovered
274    /// from the envelope alone: the sink then reuses the pool the opening request
275    /// already built for this cluster, erroring only if none exists.
276    pub endpoint: Option<String>,
277    /// The upstream wire protocol. Defaults to [`Protocol::Http1`].
278    pub protocol: Protocol,
279    /// The W3C trace context to forward downstream.
280    pub trace: Option<TraceContext>,
281    /// Client headers to relay verbatim to the upstream (the forwarding policy's
282    /// output). Applied before [`trace`](Self::trace), so a proxy-injected trace
283    /// header wins when span export is on. Empty by default.
284    pub forward_headers: Vec<(String, String)>,
285}
286
287impl CursorOp {
288    /// Constructs a cursor passthrough op (defaulting to HTTP/1.1 upstream).
289    #[must_use]
290    pub fn new(
291        cluster: ClusterId,
292        method: HttpMethod,
293        path: impl Into<String>,
294        body: Vec<u8>,
295    ) -> Self {
296        Self {
297            cluster,
298            method,
299            path: path.into(),
300            body,
301            query: None,
302            endpoint: None,
303            protocol: Protocol::Http1,
304            trace: None,
305            forward_headers: Vec::new(),
306        }
307    }
308
309    /// Sets the client headers to relay verbatim to the upstream (builder style).
310    #[must_use]
311    pub fn with_forward_headers(mut self, headers: Vec<(String, String)>) -> Self {
312        self.forward_headers = headers;
313        self
314    }
315
316    /// Sets the pinned cluster's base URL (builder style), when the engine knows
317    /// it. Lets the sink build the pool for an affinity request even on an
318    /// instance that did not serve the opening call.
319    #[must_use]
320    pub fn with_endpoint(mut self, endpoint: Option<String>) -> Self {
321        self.endpoint = endpoint;
322        self
323    }
324
325    /// Sets the upstream wire protocol (builder style).
326    #[must_use]
327    pub fn with_protocol(mut self, protocol: Protocol) -> Self {
328        self.protocol = protocol;
329        self
330    }
331
332    /// Sets the (already allow-listed) upstream query string (builder style).
333    #[must_use]
334    pub fn with_query(mut self, query: Option<String>) -> Self {
335        self.query = query;
336        self
337    }
338
339    /// Sets the trace context to propagate downstream (builder style).
340    #[must_use]
341    pub fn with_trace(mut self, trace: Option<TraceContext>) -> Self {
342        self.trace = trace;
343        self
344    }
345}
346
347/// A verbatim forward whose request body is a **stream**, not buffered bytes
348/// (ADR-014 stage 2): the same destination shape as [`CursorOp`] but the body is
349/// supplied separately as a [`ByteBody`](crate::ByteBody) so it can be
350/// piped from the downstream connection straight to the upstream without ever
351/// being collected. Used by the tenant-agnostic passthrough path.
352#[derive(Clone, PartialEq, Eq, Debug)]
353pub struct ForwardOp {
354    /// The cluster to forward to.
355    pub cluster: ClusterId,
356    /// The HTTP method to forward.
357    pub method: HttpMethod,
358    /// The upstream path, forwarded verbatim.
359    pub path: String,
360    /// An already-allow-listed query string (without the `?`) to append upstream.
361    pub query: Option<String>,
362    /// The cluster's base URL, when known (so the pool can be built on any
363    /// instance). `None` reuses an existing pool, erroring if none exists.
364    pub endpoint: Option<String>,
365    /// The upstream wire protocol. Defaults to [`Protocol::Http1`].
366    pub protocol: Protocol,
367    /// The W3C trace context to forward downstream.
368    pub trace: Option<TraceContext>,
369    /// Client headers to relay verbatim to the upstream (the forwarding policy's
370    /// output). Applied before [`trace`](Self::trace). Empty by default.
371    pub forward_headers: Vec<(String, String)>,
372}
373
374impl ForwardOp {
375    /// Constructs a streaming forward op (defaulting to HTTP/1.1 upstream).
376    #[must_use]
377    pub fn new(cluster: ClusterId, method: HttpMethod, path: impl Into<String>) -> Self {
378        Self {
379            cluster,
380            method,
381            path: path.into(),
382            query: None,
383            endpoint: None,
384            protocol: Protocol::Http1,
385            trace: None,
386            forward_headers: Vec::new(),
387        }
388    }
389
390    /// Sets the client headers to relay verbatim to the upstream (builder style).
391    #[must_use]
392    pub fn with_forward_headers(mut self, headers: Vec<(String, String)>) -> Self {
393        self.forward_headers = headers;
394        self
395    }
396
397    /// Sets the cluster's base URL (builder style).
398    #[must_use]
399    pub fn with_endpoint(mut self, endpoint: Option<String>) -> Self {
400        self.endpoint = endpoint;
401        self
402    }
403
404    /// Sets the (already allow-listed) upstream query string (builder style).
405    #[must_use]
406    pub fn with_query(mut self, query: Option<String>) -> Self {
407        self.query = query;
408        self
409    }
410
411    /// Sets the upstream wire protocol (builder style).
412    #[must_use]
413    pub fn with_protocol(mut self, protocol: Protocol) -> Self {
414        self.protocol = protocol;
415        self
416    }
417
418    /// Sets the trace context to propagate downstream (builder style).
419    #[must_use]
420    pub fn with_trace(mut self, trace: Option<TraceContext>) -> Self {
421        self.trace = trace;
422        self
423    }
424}
425
426/// The outcome of a **streaming** verbatim forward (ADR-014): the upstream status
427/// and its response body as a live [`ByteBody`](crate::ByteBody) stream, piped
428/// back to the client without ever being collected. Unlike [`CursorOutcome`], the
429/// body is not materialized here, so this carries no derives (the stream is
430/// one-shot).
431pub struct StreamingForward {
432    /// The upstream HTTP status.
433    pub status: u16,
434    /// The upstream response body, streamed back verbatim.
435    pub body: crate::ByteBody,
436    /// The upstream `Content-Type`, forwarded verbatim so a non-JSON passthrough
437    /// body is not mislabeled `application/json`. `None` ⇒ the caller defaults to
438    /// JSON.
439    pub content_type: Option<String>,
440    /// Whether this op rode a reused pooled connection (NFR-P telemetry).
441    pub pool_reuse: bool,
442}
443
444impl std::fmt::Debug for StreamingForward {
445    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
446        // The streamed body is not `Debug`; show the shape.
447        f.debug_struct("StreamingForward")
448            .field("status", &self.status)
449            .field("pool_reuse", &self.pool_reuse)
450            .finish_non_exhaustive()
451    }
452}
453
454/// The outcome of a **streaming** search (ADR-014, final stage): the upstream
455/// status and its response body as a live [`ByteBody`](crate::ByteBody), piped
456/// back through the engine's hit transform without ever being collected. Like
457/// [`StreamingForward`], the body is one-shot, so this carries no derives.
458pub struct StreamingSearch {
459    /// The upstream HTTP status.
460    pub status: u16,
461    /// The upstream response body, streamed back to be transformed on the fly.
462    pub body: crate::ByteBody,
463    /// Whether this search rode a reused pooled connection (NFR-P telemetry).
464    pub pool_reuse: bool,
465}
466
467impl std::fmt::Debug for StreamingSearch {
468    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469        f.debug_struct("StreamingSearch")
470            .field("status", &self.status)
471            .field("pool_reuse", &self.pool_reuse)
472            .finish_non_exhaustive()
473    }
474}
475
/// Result of a cursor passthrough: the upstream status and raw body, handed
/// back to the client verbatim.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorOutcome {
    /// HTTP status returned by the upstream.
    pub status: u16,
    /// Raw upstream response body.
    pub body: Vec<u8>,
    /// Upstream `Content-Type`, forwarded verbatim so an admin/cursor
    /// passthrough body (e.g. a `_cat` `text/plain`) is not mislabeled
    /// `application/json`. `None` ⇒ the caller defaults to JSON.
    pub content_type: Option<String>,
    /// `true` when the op reused a pooled connection (NFR-P telemetry).
    pub pool_reuse: bool,
}

impl CursorOutcome {
    /// Builds a cursor outcome with no content type and no claimed pool reuse.
    #[must_use]
    pub fn new(status: u16, body: Vec<u8>) -> Self {
        let (content_type, pool_reuse) = (None, false);
        Self {
            status,
            body,
            content_type,
            pool_reuse,
        }
    }

    /// Builder: records whether the dispatch reused a pooled connection.
    #[must_use]
    pub fn with_pool_reuse(self, reused: bool) -> Self {
        Self {
            pool_reuse: reused,
            ..self
        }
    }

    /// Builder: carries the upstream `Content-Type`.
    #[must_use]
    pub fn with_content_type(self, content_type: Option<String>) -> Self {
        Self {
            content_type,
            ..self
        }
    }
}
518
519/// Where reads come from.
520///
521/// The read counterpart of [`Sink`](crate::Sink). Kept separate because a read
522/// is inherently direct-to-cluster: a redundancy `QueueSink` can absorb writes
523/// but cannot answer a get-by-id or a search.
524///
525/// # Invariants
526///
527/// - MUST NOT panic; return [`SinkError`] for every transport/upstream failure
528///   (NFR-R1). A missing document is *not* an error, it is a
529///   [`ReadOutcome`] with `found == false`.
530pub trait Reader: Send + Sync {
531    /// Fetches a single document by physical id.
532    ///
533    /// # Errors
534    ///
535    /// Returns [`SinkError`] if the upstream cannot be reached or returns a
536    /// server error (a 404 for a missing document is a normal not-found
537    /// outcome, not an error).
538    fn get(
539        &self,
540        op: ReadOp,
541    ) -> impl std::future::Future<Output = Result<ReadOutcome, SinkError>> + Send;
542
543    /// Runs a search, returning the raw hits envelope.
544    ///
545    /// # Errors
546    ///
547    /// Returns [`SinkError`] if the upstream cannot be reached or returns a
548    /// server error.
549    fn search(
550        &self,
551        op: SearchOp,
552    ) -> impl std::future::Future<Output = Result<SearchOutcome, SinkError>> + Send;
553
554    /// Counts the documents matching a (partition-filtered) query.
555    ///
556    /// Takes the same [`SearchOp`] as [`Reader::search`], the wrapped query is
557    /// identical, but hits the count endpoint, returning only the total.
558    ///
559    /// # Errors
560    ///
561    /// Returns [`SinkError`] if the upstream cannot be reached or returns a
562    /// server error.
563    fn count(
564        &self,
565        op: SearchOp,
566    ) -> impl std::future::Future<Output = Result<CountOutcome, SinkError>> + Send;
567
568    /// Forwards a raw cursor request to its pinned cluster (scroll/PIT continue,
569    /// clear, close). The default is **unsupported**, a sink that cannot
570    /// passthrough (the in-memory test sink, a write-only queue) rejects it;
571    /// `OpenSearchSink` overrides it with a real upstream call.
572    ///
573    /// # Errors
574    ///
575    /// Returns [`SinkError`] if the sink does not support passthrough or the
576    /// upstream cannot be reached.
577    fn cursor(
578        &self,
579        _op: CursorOp,
580    ) -> impl std::future::Future<Output = Result<CursorOutcome, SinkError>> + Send {
581        async {
582            Err(SinkError::Transport {
583                kind: "cursor passthrough not supported by this sink",
584            })
585        }
586    }
587
588    /// Runs a search whose **response** streams back (ADR-014, final stage): the
589    /// upstream hits envelope is piped to the engine's hit transform without being
590    /// collected, so a large response (e.g. heavy `aggregations`) never lands in
591    /// memory. The default is **unsupported**; `OpenSearchSink` overrides it with a
592    /// real streamed upstream call.
593    ///
594    /// # Errors
595    ///
596    /// Returns [`SinkError`] if the sink does not support streaming search or the
597    /// upstream cannot be reached or returns a server error.
598    fn search_stream(
599        &self,
600        _op: SearchOp,
601    ) -> impl std::future::Future<Output = Result<StreamingSearch, SinkError>> + Send {
602        async {
603            Err(SinkError::Transport {
604                kind: "streaming search not supported by this sink",
605            })
606        }
607    }
608
609    /// Forwards a request to a cluster with the body supplied as a **stream**
610    /// (ADR-014 stage 2): the verbatim-passthrough path pipes the downstream body
611    /// straight upstream without buffering. The default is **unsupported**;
612    /// `OpenSearchSink` overrides it with a real streamed upstream call.
613    ///
614    /// # Errors
615    ///
616    /// Returns [`SinkError`] if the sink does not support streaming forward or the
617    /// upstream cannot be reached.
618    fn forward_stream(
619        &self,
620        _op: ForwardOp,
621        _body: crate::opensearch::ByteBody,
622    ) -> impl std::future::Future<Output = Result<StreamingForward, SinkError>> + Send {
623        async {
624            Err(SinkError::Transport {
625                kind: "streaming forward not supported by this sink",
626            })
627        }
628    }
629}