osproxy_sink/batch.rs
1//! The unit of work handed to a [`Sink`](crate::Sink): epoch-stamped writes
2//! against a single target.
3
4use osproxy_core::{Epoch, Target, TraceContext};
5use osproxy_spi::Protocol;
6
7/// A single write operation against a resolved [`Target`].
8///
9/// Carries the epoch the routing decision was derived from, stamped here so the
10/// sink (or a future migration-aware backend) can reject a stale-epoch write
11/// (`docs/06` §2). For M1 the epoch is recorded and forwarded; stale-epoch
12/// rejection arrives with migration in M5.
13#[derive(Clone, PartialEq, Eq, Debug)]
14pub struct WriteOp {
15 /// The physical destination of this operation.
16 pub target: Target,
17 /// The document operation to perform.
18 pub doc: DocOp,
19 /// The placement epoch this write was resolved against.
20 pub epoch: Epoch,
21 /// The upstream wire protocol this op is dispatched over (per-request,
22 /// `docs/04` §7). Defaults to [`Protocol::Http1`].
23 pub protocol: Protocol,
24 /// The W3C trace context to forward downstream (`traceparent`), so the
25 /// upstream's spans join this request's distributed trace.
26 pub trace: Option<TraceContext>,
27 /// Client headers to relay verbatim to the upstream (the forwarding policy's
28 /// output). Applied before [`trace`](Self::trace). Empty by default.
29 pub forward_headers: Vec<(String, String)>,
30}
31
32impl WriteOp {
33 /// Constructs a write operation (defaulting to HTTP/1.1 upstream).
34 #[must_use]
35 pub fn new(target: Target, doc: DocOp, epoch: Epoch) -> Self {
36 Self {
37 target,
38 doc,
39 epoch,
40 protocol: Protocol::Http1,
41 trace: None,
42 forward_headers: Vec::new(),
43 }
44 }
45
46 /// Sets the upstream protocol for this op (builder style).
47 #[must_use]
48 pub fn with_protocol(mut self, protocol: Protocol) -> Self {
49 self.protocol = protocol;
50 self
51 }
52
53 /// Sets the trace context to propagate downstream (builder style).
54 #[must_use]
55 pub fn with_trace(mut self, trace: Option<TraceContext>) -> Self {
56 self.trace = trace;
57 self
58 }
59
60 /// Sets the client headers to relay verbatim to the upstream (builder style).
61 #[must_use]
62 pub fn with_forward_headers(mut self, headers: Vec<(String, String)>) -> Self {
63 self.forward_headers = headers;
64 self
65 }
66}
67
/// What to do to one document: the body has already been transformed and the
/// id/routing already constructed (the tenancy rewrite ran earlier, `docs/04`).
///
/// Deliberately not `#[non_exhaustive]`: each sink has to handle every kind of
/// op, so introducing a new variant should force all sinks to be updated.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DocOp {
    /// Create-or-replace (index) a document.
    Index {
        /// Constructed document id; `None` lets OpenSearch auto-assign one.
        id: Option<String>,
        /// `_routing` value (the partition id) when routing is enabled.
        routing: Option<String>,
        /// Transformed document body, with injected fields already applied.
        body: Vec<u8>,
    },
    /// Create a document (`op_type=create`), failing with a conflict when the
    /// id already exists. Kept apart from [`DocOp::Index`] so the sink can hit
    /// the `_create` endpoint and surface the 409 (`docs/04` §3).
    Create {
        /// Constructed document id; `None` lets OpenSearch auto-assign one.
        id: Option<String>,
        /// `_routing` value (the partition id) when routing is enabled.
        routing: Option<String>,
        /// Transformed document body, with injected fields already applied.
        body: Vec<u8>,
    },
    /// Partially update a document by id (`_update`). The body holds the
    /// already-transformed `doc`/`upsert`/`script` (`docs/04` §3).
    Update {
        /// Constructed id of the document to update.
        id: String,
        /// `_routing` value when routing is enabled.
        routing: Option<String>,
        /// Transformed update body (fields injected into `doc`/`upsert`).
        body: Vec<u8>,
    },
    /// Remove a document by id.
    Delete {
        /// Constructed id of the document to delete.
        id: String,
        /// `_routing` value when routing is enabled.
        routing: Option<String>,
    },
}
113
114/// A batch of operations destined for one target.
115///
116/// M1 single-doc ingest produces a one-operation batch; the same type carries a
117/// demultiplexed per-target slice of a `_bulk` request in M3 (`docs/04` §3).
118#[derive(Clone, PartialEq, Eq, Debug, Default)]
119pub struct WriteBatch {
120 ops: Vec<WriteOp>,
121}
122
123impl WriteBatch {
124 /// An empty batch.
125 #[must_use]
126 pub fn new() -> Self {
127 Self::default()
128 }
129
130 /// A batch of a single operation (the M1 single-doc case).
131 #[must_use]
132 pub fn single(op: WriteOp) -> Self {
133 Self { ops: vec![op] }
134 }
135
136 /// Appends an operation (builder style).
137 #[must_use]
138 pub fn with(mut self, op: WriteOp) -> Self {
139 self.ops.push(op);
140 self
141 }
142
143 /// Tags every operation in the batch with the same downstream trace context
144 /// (builder style), so all upstream requests for this batch propagate it.
145 #[must_use]
146 pub fn with_trace(mut self, trace: Option<&TraceContext>) -> Self {
147 for op in &mut self.ops {
148 // Clone per op: one context fans across the whole batch (it carries an
149 // owned tracestate, so it is no longer `Copy`). Borrowed, not consumed.
150 op.trace = trace.cloned();
151 }
152 self
153 }
154
155 /// Tags every operation in the batch with the same forwarded client headers
156 /// (builder style), so all upstream requests for this batch relay them.
157 #[must_use]
158 pub fn with_forward_headers(mut self, headers: &[(String, String)]) -> Self {
159 for op in &mut self.ops {
160 op.forward_headers = headers.to_vec();
161 }
162 self
163 }
164
165 /// The operations in this batch, in order.
166 #[must_use]
167 pub fn ops(&self) -> &[WriteOp] {
168 &self.ops
169 }
170
171 /// Whether the batch has no operations.
172 #[must_use]
173 pub fn is_empty(&self) -> bool {
174 self.ops.is_empty()
175 }
176
177 /// The number of operations.
178 #[must_use]
179 pub fn len(&self) -> usize {
180 self.ops.len()
181 }
182}
183
#[cfg(test)]
mod tests {
    use super::*;
    use osproxy_core::{ClusterId, IndexName};

    /// Builds an `Index` op for document `id` against a fixed target, with
    /// routing `"p"`, an empty JSON body and epoch 1.
    fn op(id: &str) -> WriteOp {
        WriteOp::new(
            Target::new(ClusterId::from("c"), IndexName::from("i")),
            DocOp::Index {
                id: Some(id.to_owned()),
                routing: Some("p".to_owned()),
                body: b"{}".to_vec(),
            },
            Epoch::new(1),
        )
    }

    /// A small header list used by the forwarding tests.
    fn headers() -> Vec<(String, String)> {
        vec![("x-request-id".to_owned(), "abc".to_owned())]
    }

    #[test]
    fn single_batch_has_one_op() {
        let b = WriteBatch::single(op("x"));
        assert_eq!(b.len(), 1);
        assert!(!b.is_empty());
        assert_eq!(b.ops()[0].epoch, Epoch::new(1));
    }

    #[test]
    fn empty_and_builder() {
        let b = WriteBatch::new();
        assert!(b.is_empty());
        assert_eq!(b.len(), 0);
        let b = b.with(op("a")).with(op("b"));
        assert_eq!(b.len(), 2);
        // `with` appends, so insertion order is preserved.
        assert_eq!(b.ops()[0], op("a"));
        assert_eq!(b.ops()[1], op("b"));
    }

    #[test]
    fn new_op_uses_documented_defaults() {
        let o = op("a");
        assert_eq!(o.protocol, Protocol::Http1);
        assert!(o.trace.is_none());
        assert!(o.forward_headers.is_empty());
    }

    #[test]
    fn op_builders_replace_their_field_only() {
        let o = op("a")
            .with_protocol(Protocol::Http1)
            .with_trace(None)
            .with_forward_headers(headers());
        assert_eq!(o.forward_headers, headers());
        assert_eq!(o.protocol, Protocol::Http1);
        assert!(o.trace.is_none());
        // The required parts are carried over untouched.
        assert_eq!(o.epoch, Epoch::new(1));
        assert_eq!(o.doc, op("a").doc);
        assert_eq!(o.target, op("a").target);
    }

    #[test]
    fn batch_forward_headers_reach_every_op() {
        let b = WriteBatch::new()
            .with(op("a"))
            .with(op("b"))
            .with_forward_headers(&headers());
        assert_eq!(b.len(), 2);
        for o in b.ops() {
            assert_eq!(o.forward_headers, headers());
        }
    }

    #[test]
    fn batch_tagging_skips_ops_appended_later() {
        // Tagging only touches ops already in the batch at the time of the call.
        let b = WriteBatch::single(op("a"))
            .with_forward_headers(&headers())
            .with(op("b"));
        assert_eq!(b.ops()[0].forward_headers, headers());
        assert!(b.ops()[1].forward_headers.is_empty());
    }

    #[test]
    fn batch_trace_none_leaves_ops_untraced() {
        let b = WriteBatch::new()
            .with(op("a"))
            .with(op("b"))
            .with_trace(None);
        assert!(b.ops().iter().all(|o| o.trace.is_none()));
    }
}
216}