osproxy_sink/read.rs
1//! The [`Reader`] trait: fetching a single document by physical id.
2//!
//! Reads are always direct-to-cluster: unlike writes, they cannot be served by
4//! a queue, so the read seam is separate from [`Sink`](crate::Sink). The same
5//! backend type may implement both (`OpenSearchSink` does, sharing its pooled
6//! connection), while a write-only `QueueSink` implements only [`Sink`].
7//!
8//! [`Sink`]: crate::Sink
9//
// JUSTIFY(file-length): one cohesive family of read-path value types: the
// `Reader` trait plus the op (`ReadOp`/`SearchOp`/`CursorOp`/`ForwardOp`) and
// outcome (`ReadOutcome`/`SearchOutcome`/`CountOutcome`/`CursorOutcome`/
// `StreamingSearch`/`StreamingForward`) structs they exchange. They share the
// same builders and conventions; splitting them would scatter one small
// vocabulary across files for no real separation.
15
16use osproxy_core::{ClusterId, Target, TraceContext};
17use osproxy_spi::{HttpMethod, Protocol};
18
19use crate::error::SinkError;
20
21/// A read-by-id operation against a resolved [`Target`].
22///
23/// The id is already the **physical** id (the tenancy adapter mapped the
24/// client's logical id, `docs/04` §5); the reader does no rewriting.
25#[derive(Clone, PartialEq, Eq, Debug)]
26pub struct ReadOp {
27 /// The physical destination to read from.
28 pub target: Target,
29 /// The physical document id to fetch.
30 pub id: String,
31 /// The `_routing` value (the partition id), if the placement routes.
32 pub routing: Option<String>,
33 /// The upstream wire protocol this read is dispatched over. Defaults to
34 /// [`Protocol::Http1`].
35 pub protocol: Protocol,
36 /// The W3C trace context to forward downstream (`traceparent`), so the
37 /// upstream's spans join this request's distributed trace. `None` = no
38 /// propagation header is sent.
39 pub trace: Option<TraceContext>,
40 /// Client headers to relay verbatim to the upstream (the forwarding policy's
41 /// output). Applied before [`trace`](Self::trace). Empty by default.
42 pub forward_headers: Vec<(String, String)>,
43}
44
45impl ReadOp {
46 /// Constructs a read operation (defaulting to HTTP/1.1 upstream).
47 #[must_use]
48 pub fn new(target: Target, id: impl Into<String>, routing: Option<String>) -> Self {
49 Self {
50 target,
51 id: id.into(),
52 routing,
53 protocol: Protocol::Http1,
54 trace: None,
55 forward_headers: Vec::new(),
56 }
57 }
58
59 /// Sets the upstream protocol for this op (builder style).
60 #[must_use]
61 pub fn with_protocol(mut self, protocol: Protocol) -> Self {
62 self.protocol = protocol;
63 self
64 }
65
66 /// Sets the trace context to propagate downstream (builder style).
67 #[must_use]
68 pub fn with_trace(mut self, trace: Option<TraceContext>) -> Self {
69 self.trace = trace;
70 self
71 }
72
73 /// Sets the client headers to relay verbatim to the upstream (builder style).
74 #[must_use]
75 pub fn with_forward_headers(mut self, headers: Vec<(String, String)>) -> Self {
76 self.forward_headers = headers;
77 self
78 }
79}
80
/// Result of a read: whether the document exists, plus the raw upstream body
/// (the document as stored, prior to the read-path field strip).
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct ReadOutcome {
    /// HTTP status returned by the upstream.
    pub status: u16,
    /// `true` when the document exists.
    pub found: bool,
    /// Raw upstream response body; holds the stored document when `found`.
    pub body: Vec<u8>,
    /// `true` when the dispatch rode a reused pooled connection (NFR-P
    /// telemetry).
    pub pool_reuse: bool,
}

impl ReadOutcome {
    /// Hit: the document exists and `body` carries it.
    #[must_use]
    pub fn found(status: u16, body: Vec<u8>) -> Self {
        Self::fresh(status, true, body)
    }

    /// Miss: there is no such document.
    #[must_use]
    pub fn not_found(status: u16, body: Vec<u8>) -> Self {
        Self::fresh(status, false, body)
    }

    /// Builder: records whether the dispatch reused a pooled connection.
    #[must_use]
    pub fn with_pool_reuse(self, reused: bool) -> Self {
        Self {
            pool_reuse: reused,
            ..self
        }
    }

    /// Shared constructor body for hits and misses. `pool_reuse` starts as
    /// `false` until the dispatcher reports otherwise via
    /// [`with_pool_reuse`](Self::with_pool_reuse).
    fn fresh(status: u16, found: bool, body: Vec<u8>) -> Self {
        Self {
            status,
            found,
            body,
            pool_reuse: false,
        }
    }
}
125
126/// A search operation against a resolved [`Target`].
127///
128/// The body is the **already-wrapped** query (the tenancy partition filter has
129/// been applied, `docs/04` §4); the reader forwards it verbatim.
130#[derive(Clone, PartialEq, Eq, Debug)]
131pub struct SearchOp {
132 /// The physical destination to search.
133 pub target: Target,
134 /// The query body to forward upstream (already partition-filtered).
135 pub body: Vec<u8>,
136 /// The upstream wire protocol this search is dispatched over. Defaults to
137 /// [`Protocol::Http1`].
138 pub protocol: Protocol,
139 /// An already-allow-listed query string (without the `?`) to append to the
140 /// upstream URL, e.g. `scroll=1m` to open a scroll. The engine filters this
141 /// to cursor-safe params before it reaches here; the sink appends it verbatim.
142 pub query: Option<String>,
143 /// The W3C trace context to forward downstream (`traceparent`).
144 pub trace: Option<TraceContext>,
145 /// Client headers to relay verbatim to the upstream (the forwarding policy's
146 /// output). Applied before [`trace`](Self::trace). Empty by default.
147 pub forward_headers: Vec<(String, String)>,
148}
149
150impl SearchOp {
151 /// Constructs a search operation (defaulting to HTTP/1.1 upstream).
152 #[must_use]
153 pub fn new(target: Target, body: Vec<u8>) -> Self {
154 Self {
155 target,
156 body,
157 protocol: Protocol::Http1,
158 query: None,
159 trace: None,
160 forward_headers: Vec::new(),
161 }
162 }
163
164 /// Sets the upstream protocol for this op (builder style).
165 #[must_use]
166 pub fn with_protocol(mut self, protocol: Protocol) -> Self {
167 self.protocol = protocol;
168 self
169 }
170
171 /// Sets the (already allow-listed) upstream query string (builder style).
172 #[must_use]
173 pub fn with_query(mut self, query: Option<String>) -> Self {
174 self.query = query;
175 self
176 }
177
178 /// Sets the trace context to propagate downstream (builder style).
179 #[must_use]
180 pub fn with_trace(mut self, trace: Option<TraceContext>) -> Self {
181 self.trace = trace;
182 self
183 }
184
185 /// Sets the client headers to relay verbatim to the upstream (builder style).
186 #[must_use]
187 pub fn with_forward_headers(mut self, headers: Vec<(String, String)>) -> Self {
188 self.forward_headers = headers;
189 self
190 }
191}
192
/// Result of a search: the upstream status plus the raw response body (the
/// hits, prior to the read-path field strip).
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct SearchOutcome {
    /// HTTP status returned by the upstream.
    pub status: u16,
    /// Raw upstream response body (the hits envelope).
    pub body: Vec<u8>,
    /// `true` when the dispatch rode a reused pooled connection (NFR-P
    /// telemetry).
    pub pool_reuse: bool,
}

impl SearchOutcome {
    /// Builds a search outcome; `pool_reuse` starts as `false`.
    #[must_use]
    pub fn new(status: u16, body: Vec<u8>) -> Self {
        let pool_reuse = false;
        Self {
            status,
            body,
            pool_reuse,
        }
    }

    /// Builder: records whether the dispatch reused a pooled connection.
    #[must_use]
    pub fn with_pool_reuse(self, reused: bool) -> Self {
        Self {
            pool_reuse: reused,
            ..self
        }
    }
}
223
/// Result of a count: the upstream status plus the number of matching
/// documents.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct CountOutcome {
    /// HTTP status returned by the upstream.
    pub status: u16,
    /// How many documents matched.
    pub count: u64,
    /// `true` when the dispatch rode a reused pooled connection (NFR-P
    /// telemetry).
    pub pool_reuse: bool,
}

impl CountOutcome {
    /// Builds a count outcome; `pool_reuse` starts as `false`.
    #[must_use]
    pub fn new(status: u16, count: u64) -> Self {
        let pool_reuse = false;
        Self {
            status,
            count,
            pool_reuse,
        }
    }

    /// Builder: records whether the dispatch reused a pooled connection.
    #[must_use]
    pub fn with_pool_reuse(self, reused: bool) -> Self {
        Self {
            pool_reuse: reused,
            ..self
        }
    }
}
253
254/// A raw cursor passthrough op (`docs/03` §6): forward `method path` with `body`
255/// to the specific `cluster` the cursor is pinned to, scroll/PIT continue,
256/// clear, or close. Unlike the typed ops, the destination is *already resolved*
257/// (the engine recovered it from the cursor's signed envelope), so this carries
258/// the cluster directly rather than a partition.
259#[derive(Clone, PartialEq, Eq, Debug)]
260pub struct CursorOp {
261 /// The cluster the cursor is pinned to.
262 pub cluster: ClusterId,
263 /// The HTTP method to forward (continue is `POST`/`GET`, clear/close `DELETE`).
264 pub method: HttpMethod,
265 /// The upstream path (e.g. `/_search/scroll`), already with the real cursor id.
266 pub path: String,
267 /// The request body to forward (the real, unwrapped cursor id substituted in).
268 pub body: Vec<u8>,
269 /// An already-allow-listed query string (without the `?`) to append to the
270 /// upstream URL, e.g. `keep_alive=1m` on PIT create. Filtered by the engine.
271 pub query: Option<String>,
272 /// The pinned cluster's base URL, when the engine knows it (the placement that
273 /// opened the cursor supplied it). `None` for an affinity continue recovered
274 /// from the envelope alone: the sink then reuses the pool the opening request
275 /// already built for this cluster, erroring only if none exists.
276 pub endpoint: Option<String>,
277 /// The upstream wire protocol. Defaults to [`Protocol::Http1`].
278 pub protocol: Protocol,
279 /// The W3C trace context to forward downstream.
280 pub trace: Option<TraceContext>,
281 /// Client headers to relay verbatim to the upstream (the forwarding policy's
282 /// output). Applied before [`trace`](Self::trace), so a proxy-injected trace
283 /// header wins when span export is on. Empty by default.
284 pub forward_headers: Vec<(String, String)>,
285}
286
287impl CursorOp {
288 /// Constructs a cursor passthrough op (defaulting to HTTP/1.1 upstream).
289 #[must_use]
290 pub fn new(
291 cluster: ClusterId,
292 method: HttpMethod,
293 path: impl Into<String>,
294 body: Vec<u8>,
295 ) -> Self {
296 Self {
297 cluster,
298 method,
299 path: path.into(),
300 body,
301 query: None,
302 endpoint: None,
303 protocol: Protocol::Http1,
304 trace: None,
305 forward_headers: Vec::new(),
306 }
307 }
308
309 /// Sets the client headers to relay verbatim to the upstream (builder style).
310 #[must_use]
311 pub fn with_forward_headers(mut self, headers: Vec<(String, String)>) -> Self {
312 self.forward_headers = headers;
313 self
314 }
315
316 /// Sets the pinned cluster's base URL (builder style), when the engine knows
317 /// it. Lets the sink build the pool for an affinity request even on an
318 /// instance that did not serve the opening call.
319 #[must_use]
320 pub fn with_endpoint(mut self, endpoint: Option<String>) -> Self {
321 self.endpoint = endpoint;
322 self
323 }
324
325 /// Sets the upstream wire protocol (builder style).
326 #[must_use]
327 pub fn with_protocol(mut self, protocol: Protocol) -> Self {
328 self.protocol = protocol;
329 self
330 }
331
332 /// Sets the (already allow-listed) upstream query string (builder style).
333 #[must_use]
334 pub fn with_query(mut self, query: Option<String>) -> Self {
335 self.query = query;
336 self
337 }
338
339 /// Sets the trace context to propagate downstream (builder style).
340 #[must_use]
341 pub fn with_trace(mut self, trace: Option<TraceContext>) -> Self {
342 self.trace = trace;
343 self
344 }
345}
346
347/// A verbatim forward whose request body is a **stream**, not buffered bytes
348/// (ADR-014 stage 2): the same destination shape as [`CursorOp`] but the body is
349/// supplied separately as a [`ByteBody`](crate::ByteBody) so it can be
350/// piped from the downstream connection straight to the upstream without ever
351/// being collected. Used by the tenant-agnostic passthrough path.
352#[derive(Clone, PartialEq, Eq, Debug)]
353pub struct ForwardOp {
354 /// The cluster to forward to.
355 pub cluster: ClusterId,
356 /// The HTTP method to forward.
357 pub method: HttpMethod,
358 /// The upstream path, forwarded verbatim.
359 pub path: String,
360 /// An already-allow-listed query string (without the `?`) to append upstream.
361 pub query: Option<String>,
362 /// The cluster's base URL, when known (so the pool can be built on any
363 /// instance). `None` reuses an existing pool, erroring if none exists.
364 pub endpoint: Option<String>,
365 /// The upstream wire protocol. Defaults to [`Protocol::Http1`].
366 pub protocol: Protocol,
367 /// The W3C trace context to forward downstream.
368 pub trace: Option<TraceContext>,
369 /// Client headers to relay verbatim to the upstream (the forwarding policy's
370 /// output). Applied before [`trace`](Self::trace). Empty by default.
371 pub forward_headers: Vec<(String, String)>,
372}
373
374impl ForwardOp {
375 /// Constructs a streaming forward op (defaulting to HTTP/1.1 upstream).
376 #[must_use]
377 pub fn new(cluster: ClusterId, method: HttpMethod, path: impl Into<String>) -> Self {
378 Self {
379 cluster,
380 method,
381 path: path.into(),
382 query: None,
383 endpoint: None,
384 protocol: Protocol::Http1,
385 trace: None,
386 forward_headers: Vec::new(),
387 }
388 }
389
390 /// Sets the client headers to relay verbatim to the upstream (builder style).
391 #[must_use]
392 pub fn with_forward_headers(mut self, headers: Vec<(String, String)>) -> Self {
393 self.forward_headers = headers;
394 self
395 }
396
397 /// Sets the cluster's base URL (builder style).
398 #[must_use]
399 pub fn with_endpoint(mut self, endpoint: Option<String>) -> Self {
400 self.endpoint = endpoint;
401 self
402 }
403
404 /// Sets the (already allow-listed) upstream query string (builder style).
405 #[must_use]
406 pub fn with_query(mut self, query: Option<String>) -> Self {
407 self.query = query;
408 self
409 }
410
411 /// Sets the upstream wire protocol (builder style).
412 #[must_use]
413 pub fn with_protocol(mut self, protocol: Protocol) -> Self {
414 self.protocol = protocol;
415 self
416 }
417
418 /// Sets the trace context to propagate downstream (builder style).
419 #[must_use]
420 pub fn with_trace(mut self, trace: Option<TraceContext>) -> Self {
421 self.trace = trace;
422 self
423 }
424}
425
426/// The outcome of a **streaming** verbatim forward (ADR-014): the upstream status
427/// and its response body as a live [`ByteBody`](crate::ByteBody) stream, piped
428/// back to the client without ever being collected. Unlike [`CursorOutcome`], the
429/// body is not materialized here, so this carries no derives (the stream is
430/// one-shot).
431pub struct StreamingForward {
432 /// The upstream HTTP status.
433 pub status: u16,
434 /// The upstream response body, streamed back verbatim.
435 pub body: crate::ByteBody,
436 /// The upstream `Content-Type`, forwarded verbatim so a non-JSON passthrough
437 /// body is not mislabeled `application/json`. `None` ⇒ the caller defaults to
438 /// JSON.
439 pub content_type: Option<String>,
440 /// Whether this op rode a reused pooled connection (NFR-P telemetry).
441 pub pool_reuse: bool,
442}
443
444impl std::fmt::Debug for StreamingForward {
445 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
446 // The streamed body is not `Debug`; show the shape.
447 f.debug_struct("StreamingForward")
448 .field("status", &self.status)
449 .field("pool_reuse", &self.pool_reuse)
450 .finish_non_exhaustive()
451 }
452}
453
454/// The outcome of a **streaming** search (ADR-014, final stage): the upstream
455/// status and its response body as a live [`ByteBody`](crate::ByteBody), piped
456/// back through the engine's hit transform without ever being collected. Like
457/// [`StreamingForward`], the body is one-shot, so this carries no derives.
458pub struct StreamingSearch {
459 /// The upstream HTTP status.
460 pub status: u16,
461 /// The upstream response body, streamed back to be transformed on the fly.
462 pub body: crate::ByteBody,
463 /// Whether this search rode a reused pooled connection (NFR-P telemetry).
464 pub pool_reuse: bool,
465}
466
467impl std::fmt::Debug for StreamingSearch {
468 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469 f.debug_struct("StreamingSearch")
470 .field("status", &self.status)
471 .field("pool_reuse", &self.pool_reuse)
472 .finish_non_exhaustive()
473 }
474}
475
/// Result of a cursor passthrough: the upstream status and raw body, relayed
/// back to the client verbatim.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct CursorOutcome {
    /// HTTP status returned by the upstream.
    pub status: u16,
    /// Raw upstream response body.
    pub body: Vec<u8>,
    /// Upstream `Content-Type`, relayed verbatim so an admin/cursor passthrough
    /// body (e.g. a `_cat` `text/plain`) is not mislabeled `application/json`.
    /// With `None`, the caller defaults to JSON.
    pub content_type: Option<String>,
    /// `true` when the dispatch rode a reused pooled connection (NFR-P
    /// telemetry).
    pub pool_reuse: bool,
}

impl CursorOutcome {
    /// Builds a cursor outcome with no `Content-Type` and `pool_reuse` set to
    /// `false`.
    #[must_use]
    pub fn new(status: u16, body: Vec<u8>) -> Self {
        let content_type = None;
        Self {
            status,
            body,
            content_type,
            pool_reuse: false,
        }
    }

    /// Builder: carries the upstream `Content-Type`.
    #[must_use]
    pub fn with_content_type(self, content_type: Option<String>) -> Self {
        Self {
            content_type,
            ..self
        }
    }

    /// Builder: records whether the dispatch reused a pooled connection.
    #[must_use]
    pub fn with_pool_reuse(self, reused: bool) -> Self {
        Self {
            pool_reuse: reused,
            ..self
        }
    }
}
518
519/// Where reads come from.
520///
521/// The read counterpart of [`Sink`](crate::Sink). Kept separate because a read
522/// is inherently direct-to-cluster: a redundancy `QueueSink` can absorb writes
523/// but cannot answer a get-by-id or a search.
524///
525/// # Invariants
526///
527/// - MUST NOT panic; return [`SinkError`] for every transport/upstream failure
528/// (NFR-R1). A missing document is *not* an error, it is a
529/// [`ReadOutcome`] with `found == false`.
530pub trait Reader: Send + Sync {
531 /// Fetches a single document by physical id.
532 ///
533 /// # Errors
534 ///
535 /// Returns [`SinkError`] if the upstream cannot be reached or returns a
536 /// server error (a 404 for a missing document is a normal not-found
537 /// outcome, not an error).
538 fn get(
539 &self,
540 op: ReadOp,
541 ) -> impl std::future::Future<Output = Result<ReadOutcome, SinkError>> + Send;
542
543 /// Runs a search, returning the raw hits envelope.
544 ///
545 /// # Errors
546 ///
547 /// Returns [`SinkError`] if the upstream cannot be reached or returns a
548 /// server error.
549 fn search(
550 &self,
551 op: SearchOp,
552 ) -> impl std::future::Future<Output = Result<SearchOutcome, SinkError>> + Send;
553
554 /// Counts the documents matching a (partition-filtered) query.
555 ///
556 /// Takes the same [`SearchOp`] as [`Reader::search`], the wrapped query is
557 /// identical, but hits the count endpoint, returning only the total.
558 ///
559 /// # Errors
560 ///
561 /// Returns [`SinkError`] if the upstream cannot be reached or returns a
562 /// server error.
563 fn count(
564 &self,
565 op: SearchOp,
566 ) -> impl std::future::Future<Output = Result<CountOutcome, SinkError>> + Send;
567
568 /// Forwards a raw cursor request to its pinned cluster (scroll/PIT continue,
569 /// clear, close). The default is **unsupported**, a sink that cannot
570 /// passthrough (the in-memory test sink, a write-only queue) rejects it;
571 /// `OpenSearchSink` overrides it with a real upstream call.
572 ///
573 /// # Errors
574 ///
575 /// Returns [`SinkError`] if the sink does not support passthrough or the
576 /// upstream cannot be reached.
577 fn cursor(
578 &self,
579 _op: CursorOp,
580 ) -> impl std::future::Future<Output = Result<CursorOutcome, SinkError>> + Send {
581 async {
582 Err(SinkError::Transport {
583 kind: "cursor passthrough not supported by this sink",
584 })
585 }
586 }
587
588 /// Runs a search whose **response** streams back (ADR-014, final stage): the
589 /// upstream hits envelope is piped to the engine's hit transform without being
590 /// collected, so a large response (e.g. heavy `aggregations`) never lands in
591 /// memory. The default is **unsupported**; `OpenSearchSink` overrides it with a
592 /// real streamed upstream call.
593 ///
594 /// # Errors
595 ///
596 /// Returns [`SinkError`] if the sink does not support streaming search or the
597 /// upstream cannot be reached or returns a server error.
598 fn search_stream(
599 &self,
600 _op: SearchOp,
601 ) -> impl std::future::Future<Output = Result<StreamingSearch, SinkError>> + Send {
602 async {
603 Err(SinkError::Transport {
604 kind: "streaming search not supported by this sink",
605 })
606 }
607 }
608
609 /// Forwards a request to a cluster with the body supplied as a **stream**
610 /// (ADR-014 stage 2): the verbatim-passthrough path pipes the downstream body
611 /// straight upstream without buffering. The default is **unsupported**;
612 /// `OpenSearchSink` overrides it with a real streamed upstream call.
613 ///
614 /// # Errors
615 ///
616 /// Returns [`SinkError`] if the sink does not support streaming forward or the
617 /// upstream cannot be reached.
618 fn forward_stream(
619 &self,
620 _op: ForwardOp,
621 _body: crate::opensearch::ByteBody,
622 ) -> impl std::future::Future<Output = Result<StreamingForward, SinkError>> + Send {
623 async {
624 Err(SinkError::Transport {
625 kind: "streaming forward not supported by this sink",
626 })
627 }
628 }
629}