Skip to main content

osproxy_sink/
batch.rs

1//! The unit of work handed to a [`Sink`](crate::Sink): epoch-stamped writes
2//! against a single target.
3
4use osproxy_core::{Epoch, Target, TraceContext};
5use osproxy_spi::Protocol;
6
7/// A single write operation against a resolved [`Target`].
8///
9/// Carries the epoch the routing decision was derived from, stamped here so the
10/// sink (or a future migration-aware backend) can reject a stale-epoch write
11/// (`docs/06` §2). For M1 the epoch is recorded and forwarded; stale-epoch
12/// rejection arrives with migration in M5.
13#[derive(Clone, PartialEq, Eq, Debug)]
14pub struct WriteOp {
15    /// The physical destination of this operation.
16    pub target: Target,
17    /// The document operation to perform.
18    pub doc: DocOp,
19    /// The placement epoch this write was resolved against.
20    pub epoch: Epoch,
21    /// The upstream wire protocol this op is dispatched over (per-request,
22    /// `docs/04` §7). Defaults to [`Protocol::Http1`].
23    pub protocol: Protocol,
24    /// The W3C trace context to forward downstream (`traceparent`), so the
25    /// upstream's spans join this request's distributed trace.
26    pub trace: Option<TraceContext>,
27    /// Client headers to relay verbatim to the upstream (the forwarding policy's
28    /// output). Applied before [`trace`](Self::trace). Empty by default.
29    pub forward_headers: Vec<(String, String)>,
30}
31
32impl WriteOp {
33    /// Constructs a write operation (defaulting to HTTP/1.1 upstream).
34    #[must_use]
35    pub fn new(target: Target, doc: DocOp, epoch: Epoch) -> Self {
36        Self {
37            target,
38            doc,
39            epoch,
40            protocol: Protocol::Http1,
41            trace: None,
42            forward_headers: Vec::new(),
43        }
44    }
45
46    /// Sets the upstream protocol for this op (builder style).
47    #[must_use]
48    pub fn with_protocol(mut self, protocol: Protocol) -> Self {
49        self.protocol = protocol;
50        self
51    }
52
53    /// Sets the trace context to propagate downstream (builder style).
54    #[must_use]
55    pub fn with_trace(mut self, trace: Option<TraceContext>) -> Self {
56        self.trace = trace;
57        self
58    }
59
60    /// Sets the client headers to relay verbatim to the upstream (builder style).
61    #[must_use]
62    pub fn with_forward_headers(mut self, headers: Vec<(String, String)>) -> Self {
63        self.forward_headers = headers;
64        self
65    }
66}
67
68/// A document-level operation: the already-transformed body plus the
69/// constructed id/routing (the tenancy rewrite has already run, `docs/04`).
70///
71/// Not `#[non_exhaustive]`: every sink must handle every op kind, so adding one
72/// should force sinks to be updated.
73#[derive(Clone, PartialEq, Eq, Debug)]
74pub enum DocOp {
75    /// Index (create-or-replace) a document.
76    Index {
77        /// The constructed document id, or `None` to let OpenSearch auto-assign.
78        id: Option<String>,
79        /// The `_routing` value (the partition id), if routing is enabled.
80        routing: Option<String>,
81        /// The transformed document body (injected fields applied).
82        body: bytes::Bytes,
83    },
84    /// Create a document, failing with a conflict if the id already exists
85    /// (`op_type=create`). Distinct from [`DocOp::Index`] so the sink can target
86    /// the `_create` endpoint and surface the 409 (`docs/04` §3).
87    Create {
88        /// The constructed document id, or `None` to let OpenSearch auto-assign.
89        id: Option<String>,
90        /// The `_routing` value (the partition id), if routing is enabled.
91        routing: Option<String>,
92        /// The transformed document body (injected fields applied).
93        body: bytes::Bytes,
94    },
95    /// Partial-update a document by id (`_update`): the body carries the
96    /// already-transformed `doc`/`upsert`/`script` (`docs/04` §3).
97    Update {
98        /// The constructed document id to update.
99        id: String,
100        /// The `_routing` value, if routing is enabled.
101        routing: Option<String>,
102        /// The transformed update body (injected into `doc`/`upsert`).
103        body: bytes::Bytes,
104    },
105    /// Delete a document by id.
106    Delete {
107        /// The constructed document id to delete.
108        id: String,
109        /// The `_routing` value, if routing is enabled.
110        routing: Option<String>,
111    },
112}
113
114/// A batch of operations destined for one target.
115///
116/// M1 single-doc ingest produces a one-operation batch; the same type carries a
117/// demultiplexed per-target slice of a `_bulk` request in M3 (`docs/04` §3).
118#[derive(Clone, PartialEq, Eq, Debug, Default)]
119pub struct WriteBatch {
120    ops: Vec<WriteOp>,
121}
122
123impl WriteBatch {
124    /// An empty batch.
125    #[must_use]
126    pub fn new() -> Self {
127        Self::default()
128    }
129
130    /// A batch of a single operation (the M1 single-doc case).
131    #[must_use]
132    pub fn single(op: WriteOp) -> Self {
133        Self { ops: vec![op] }
134    }
135
136    /// Appends an operation (builder style).
137    #[must_use]
138    pub fn with(mut self, op: WriteOp) -> Self {
139        self.ops.push(op);
140        self
141    }
142
143    /// Tags every operation in the batch with the same downstream trace context
144    /// (builder style), so all upstream requests for this batch propagate it.
145    #[must_use]
146    pub fn with_trace(mut self, trace: Option<&TraceContext>) -> Self {
147        for op in &mut self.ops {
148            // Clone per op: one context fans across the whole batch (it carries an
149            // owned tracestate, so it is no longer `Copy`). Borrowed, not consumed.
150            op.trace = trace.cloned();
151        }
152        self
153    }
154
155    /// Tags every operation in the batch with the same forwarded client headers
156    /// (builder style), so all upstream requests for this batch relay them.
157    #[must_use]
158    pub fn with_forward_headers(mut self, headers: &[(String, String)]) -> Self {
159        for op in &mut self.ops {
160            op.forward_headers = headers.to_vec();
161        }
162        self
163    }
164
165    /// The operations in this batch, in order.
166    #[must_use]
167    pub fn ops(&self) -> &[WriteOp] {
168        &self.ops
169    }
170
171    /// Whether the batch has no operations.
172    #[must_use]
173    pub fn is_empty(&self) -> bool {
174        self.ops.is_empty()
175    }
176
177    /// The number of operations.
178    #[must_use]
179    pub fn len(&self) -> usize {
180        self.ops.len()
181    }
182}
183
#[cfg(test)]
mod tests {
    use super::*;
    use osproxy_core::{ClusterId, IndexName};

    /// Builds an epoch-1 `Index` op for document `id`.
    ///
    /// It targets cluster `c` / index `i` and is routed to partition `p`.
    fn op(id: &str) -> WriteOp {
        WriteOp::new(
            Target::new(ClusterId::from("c"), IndexName::from("i")),
            DocOp::Index {
                id: Some(id.to_owned()),
                routing: Some("p".to_owned()),
                body: bytes::Bytes::from_static(b"{}"),
            },
            Epoch::new(1),
        )
    }

    /// A small, fixed set of client headers for the forwarding tests.
    fn headers() -> Vec<(String, String)> {
        vec![("x-tenant".to_owned(), "acme".to_owned())]
    }

    #[test]
    fn single_batch_has_one_op() {
        let b = WriteBatch::single(op("x"));
        assert_eq!(b.len(), 1);
        assert!(!b.is_empty());
        assert_eq!(b.ops()[0].epoch, Epoch::new(1));
    }

    #[test]
    fn empty_and_builder() {
        let b = WriteBatch::new();
        assert!(b.is_empty());
        let b = b.with(op("a")).with(op("b"));
        assert_eq!(b.len(), 2);
        // `with` appends: insertion order is preserved.
        assert_eq!(b.ops(), [op("a"), op("b")]);
    }

    #[test]
    fn write_op_defaults_to_http1_without_trace_or_headers() {
        let o = op("x");
        assert_eq!(o.protocol, Protocol::Http1);
        assert_eq!(o.trace, None);
        assert!(o.forward_headers.is_empty());
    }

    #[test]
    fn write_op_builders_set_only_their_field() {
        let o = op("x")
            .with_protocol(Protocol::Http1)
            .with_trace(None)
            .with_forward_headers(headers());
        assert_eq!(o.protocol, Protocol::Http1);
        assert_eq!(o.trace, None);
        assert_eq!(o.forward_headers, headers());
        // Resetting the headers restores an op identical to a fresh one, so
        // no other field was disturbed along the way.
        assert_eq!(o.with_forward_headers(Vec::new()), op("x"));
    }

    #[test]
    fn batch_forward_headers_fan_out_to_existing_ops_only() {
        let b = WriteBatch::new()
            .with(op("a"))
            .with(op("b"))
            .with_forward_headers(&headers())
            .with(op("c"));
        let tagged = |id: &str| op(id).with_forward_headers(headers());
        // Ops present at tagging time get the headers; later ones do not.
        assert_eq!(b.ops(), [tagged("a"), tagged("b"), op("c")]);
        // Tagging an empty batch is a no-op.
        assert_eq!(
            WriteBatch::new().with_forward_headers(&headers()),
            WriteBatch::new()
        );
    }

    #[test]
    fn batch_with_trace_none_clears_every_op() {
        let b = WriteBatch::new()
            .with(op("a"))
            .with(op("b"))
            .with_trace(None);
        assert_eq!(b.len(), 2);
        assert!(b.ops().iter().all(|o| o.trace.is_none()));
    }
}