osproxy_engine/asyncwrite.rs
1//! The asynchronous fan-out write mode (`docs/04` §9).
2//!
3//! In sync mode the proxy forwards a mutation to the upstream and returns its
4//! real result. In **async mode** it instead durably enqueues the fully-resolved,
5//! epoch-stamped op onto a [`WriteQueue`] and returns `202 Accepted` with an
6//! `op_id` handle. A separate downstream component consumes the queue and applies
7//! each op to one or more destinations, so the proxy's only promise is *durable
8//! acceptance into the pipeline*, never application or its result.
9//!
10//! This is deliberately narrow:
11//!
12//! * The `202` is returned **only after** the queue acknowledges the enqueue, so a
13//! client that got `202` knows the op will not be silently dropped. A queue that
14//! cannot accept the op fails the request rather than lying.
//! * The op carries an **`op_id`** that is both the correlation handle and the
//!   idempotency key the downstream applier dedups on. It is the client's
//!   `X-Op-Id` header when that value passes validation, and is otherwise minted
//!   from the proxy's request id.
//! * The proxy hosts **no status surface**: whether and how a failed apply is
//!   reported back is the downstream's responsibility and is out of scope here.
20//!
21//! Mode is negotiated per request (`X-Write-Mode`) over a deployment baseline; see
22//! [`crate::Pipeline::with_baseline_write_mode`].
23
24use std::future::Future;
25use std::pin::Pin;
26
27use osproxy_core::RequestId;
28use osproxy_sink::{Reader, Sink, WriteBatch};
29use osproxy_spi::RequestCtx;
30use osproxy_tenancy::{Resolved, Router};
31use serde_json::json;
32
33use crate::pipeline::{Pipeline, PipelineResponse};
34
35/// How a mutation is dispatched.
36#[derive(Clone, Copy, PartialEq, Eq, Debug, Default)]
37pub enum WriteMode {
38 /// Forward to the upstream and return its real result. The default.
39 #[default]
40 Sync,
41 /// Durably enqueue the op for downstream fan-out and return `202` + a handle.
42 Async,
43}
44
45impl WriteMode {
46 /// Parses an `X-Write-Mode` header value (ASCII-case-insensitive). Unknown
47 /// values yield `None` so the caller can reject rather than guess.
48 #[must_use]
49 pub fn parse(value: &str) -> Option<Self> {
50 if value.eq_ignore_ascii_case("sync") {
51 Some(Self::Sync)
52 } else if value.eq_ignore_ascii_case("async") {
53 Some(Self::Async)
54 } else {
55 None
56 }
57 }
58}
59
60/// The maximum accepted length of a client-supplied `X-Op-Id`. The id is keyed
61/// into the queue and logged, so it is bounded and charset-restricted to keep it
62/// from injecting into a downstream keyspace.
63const MAX_OP_ID_LEN: usize = 128;
64
65/// Whether `candidate` is an acceptable op id: non-empty, within
66/// `MAX_OP_ID_LEN`, and limited to a safe key charset (`A-Za-z0-9-_.:`).
67#[must_use]
68pub fn valid_op_id(candidate: &str) -> bool {
69 !candidate.is_empty()
70 && candidate.len() <= MAX_OP_ID_LEN
71 && candidate
72 .bytes()
73 .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b':'))
74}
75
76/// Resolves the op id for a request: the validated client-supplied `X-Op-Id`, or
77/// the proxy's own request id when the header is absent or malformed. Always
78/// returns a usable id so the loop can always be correlated.
79#[must_use]
80pub fn op_id_for(ctx: &RequestCtx<'_>, request_id: &RequestId) -> String {
81 ctx.headers()
82 .get("x-op-id")
83 .filter(|h| valid_op_id(h))
84 .map_or_else(|| request_id.as_str().to_owned(), ToOwned::to_owned)
85}
86
87/// Why a mutation cannot be honored in async mode, if it cannot, a short,
88/// value-free reason for the `400`. These all need read-modify-write against the
89/// document's current state, which does not exist at enqueue time, so async
90/// rejects them rather than silently mis-applying or dropping the precondition.
91#[must_use]
92pub fn unsupported_async(ctx: &RequestCtx<'_>) -> Option<&'static str> {
93 // Optimistic concurrency: the precondition is checked against the live
94 // version, so dropping it (the only async option) would corrupt the contract.
95 if let Some(query) = ctx.query() {
96 let has_cas = query.split('&').any(|pair| {
97 let key = pair.split('=').next().unwrap_or(pair);
98 matches!(key, "if_seq_no" | "if_primary_term" | "version")
99 });
100 if has_cas {
101 return Some("optimistic concurrency (if_seq_no/if_primary_term/version) is not supported in async write mode");
102 }
103 }
104 // A scripted/partial `_update` merges into the current document; async fan-out
105 // has no single authoritative document to merge against. The path is
106 // `/{index}/_update/{id}`, so match the `_update` path segment, not a suffix.
107 if ctx.path().split('/').any(|seg| seg == "_update") {
108 return Some("scripted/partial _update is not supported in async write mode");
109 }
110 None
111}
112
113/// A mutation accepted for asynchronous fan-out: the fully-resolved,
114/// epoch-stamped [`WriteBatch`] the sync path would have dispatched, plus the
115/// correlation/idempotency id and the ordering key.
116#[derive(Clone, Debug)]
117pub struct QueuedWrite {
118 /// Correlation handle and downstream idempotency key.
119 pub op_id: String,
120 /// The partition id, used as the queue partition key so all ops for one
121 /// logical partition stay ordered through the fan-out.
122 pub partition_key: String,
123 /// The resolved op(s), identical to what the sync path would deliver.
124 pub batch: WriteBatch,
125}
126
127/// A queue that durably accepts resolved write ops for downstream fan-out.
128///
129/// The implementor (a Kafka producer with a write-ahead log, in the shipped
130/// binary) must only resolve the future `Ok` once the op is durably accepted, so
131/// the `202` the pipeline returns is truthful. Implementations MUST NOT panic.
132pub trait WriteQueue: Send + Sync {
133 /// Whether async writes can be served. `false` (the default [`NoQueue`])
134 /// means an async request is refused rather than dropped.
135 fn enabled(&self) -> bool {
136 false
137 }
138
139 /// Durably enqueues `write`, resolving `Ok` only once it is accepted.
140 fn enqueue<'a>(
141 &'a self,
142 write: QueuedWrite,
143 ) -> Pin<Box<dyn Future<Output = Result<(), QueueError>> + Send + 'a>>;
144}
145
146/// A failure to enqueue an async write. Carries a value-free reason only.
147#[derive(Clone, Copy, Debug, PartialEq, Eq)]
148pub struct QueueError {
149 /// A short, value-free description (e.g. `"broker unavailable"`).
150 pub reason: &'static str,
151}
152
153/// The default queue: async writes are unavailable. An async request against a
154/// pipeline with no queue is refused with `422`, never accepted-and-dropped.
155#[derive(Clone, Copy, Debug, Default)]
156pub struct NoQueue;
157
158impl WriteQueue for NoQueue {
159 fn enqueue<'a>(
160 &'a self,
161 _write: QueuedWrite,
162 ) -> Pin<Box<dyn Future<Output = Result<(), QueueError>> + Send + 'a>> {
163 Box::pin(async {
164 Err(QueueError {
165 reason: "async write queue is not configured",
166 })
167 })
168 }
169}
170
171/// The `202 Accepted` envelope returned once an async write is durably enqueued.
172///
173/// A generic async handle, not a synthetic OpenSearch result: `result:"queued"`
174/// is honest about what happened (the op was accepted, not applied), and `op_id`
175/// is the handle the client correlates any downstream outcome against.
176#[must_use]
177pub(crate) fn accepted_response(op_id: &str, index: &str) -> PipelineResponse {
178 PipelineResponse {
179 status: 202,
180 body: serde_json::to_vec(&json!({
181 "op_id": op_id,
182 "status": "accepted",
183 "result": "queued",
184 "_index": index,
185 }))
186 .unwrap_or_else(|_| b"{}".to_vec()),
187 content_type: None,
188 }
189}
190
191/// The `400` returned when an op cannot be honored in async mode (see
192/// [`unsupported_async`]). The `reason` is value-free.
193#[must_use]
194pub(crate) fn unsupported_response(reason: &str, index: &str) -> PipelineResponse {
195 PipelineResponse {
196 status: 400,
197 body: serde_json::to_vec(&json!({
198 "status": "rejected",
199 "error": reason,
200 "_index": index,
201 }))
202 .unwrap_or_else(|_| b"{}".to_vec()),
203 content_type: None,
204 }
205}
206
207/// The `422` returned when async mode was requested but no queue is configured,
208/// the op is refused, never accepted-and-dropped.
209#[must_use]
210pub(crate) fn unavailable_response(index: &str) -> PipelineResponse {
211 PipelineResponse {
212 status: 422,
213 body: serde_json::to_vec(&json!({
214 "status": "rejected",
215 "error": "async write mode is not available on this proxy",
216 "_index": index,
217 }))
218 .unwrap_or_else(|_| b"{}".to_vec()),
219 content_type: None,
220 }
221}
222
223/// The `503` returned when the queue refused the op. Retryable: the same `op_id`
224/// makes the retry idempotent downstream.
225#[must_use]
226pub(crate) fn enqueue_failed_response(op_id: &str, index: &str) -> PipelineResponse {
227 PipelineResponse {
228 status: 503,
229 body: serde_json::to_vec(&json!({
230 "op_id": op_id,
231 "status": "rejected",
232 "error": "async write could not be enqueued",
233 "_index": index,
234 }))
235 .unwrap_or_else(|_| b"{}".to_vec()),
236 content_type: None,
237 }
238}
239
240impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
241 /// Durably enqueues a resolved write for downstream fan-out and returns the
242 /// `202` handle (`docs/04` §9). The `202` is produced **only after** the queue
243 /// acknowledges the enqueue; a missing queue is refused (`422`) and an enqueue
244 /// failure is reported (`503`), the op is never accepted-and-dropped. No live
245 /// epoch gate runs here: the op carries its epoch and the downstream applier
246 /// owns staleness, since there is no synchronous upstream to hold.
247 pub(crate) async fn enqueue_async(
248 &self,
249 ctx: &RequestCtx<'_>,
250 resolved: &Resolved,
251 batch: WriteBatch,
252 ) -> PipelineResponse {
253 let index = ctx.logical_index();
254 // A client error takes precedence over a missing queue: an op that cannot
255 // be honored async is rejected (`400`) whether or not a queue is wired.
256 if let Some(reason) = unsupported_async(ctx) {
257 return unsupported_response(reason, index);
258 }
259 if !self.write_queue.enabled() {
260 return unavailable_response(index);
261 }
262 let op_id = op_id_for(ctx, ctx.request_id());
263 let write = QueuedWrite {
264 op_id: op_id.clone(),
265 partition_key: resolved.partition.as_str().to_owned(),
266 batch,
267 };
268 match self.write_queue.enqueue(write).await {
269 Ok(()) => accepted_response(&op_id, index),
270 Err(_) => enqueue_failed_response(&op_id, index),
271 }
272 }
273}
274
275#[cfg(test)]
276mod tests {
277 use super::*;
278
279 #[test]
280 fn parses_known_modes_case_insensitively_and_rejects_unknown() {
281 assert_eq!(WriteMode::parse("sync"), Some(WriteMode::Sync));
282 assert_eq!(WriteMode::parse("ASYNC"), Some(WriteMode::Async));
283 assert_eq!(WriteMode::parse("queue"), None);
284 assert_eq!(WriteMode::parse(""), None);
285 }
286
287 #[test]
288 fn op_id_validation_bounds_length_and_charset() {
289 assert!(valid_op_id("a-b_c.d:1"));
290 assert!(!valid_op_id(""));
291 assert!(!valid_op_id("has space"));
292 assert!(!valid_op_id("inject\nkey"));
293 assert!(!valid_op_id(&"x".repeat(MAX_OP_ID_LEN + 1)));
294 assert!(valid_op_id(&"x".repeat(MAX_OP_ID_LEN)));
295 }
296
297 #[tokio::test]
298 async fn no_queue_is_disabled_and_refuses() {
299 assert!(!NoQueue.enabled());
300 let write = QueuedWrite {
301 op_id: "op-1".to_owned(),
302 partition_key: "acme".to_owned(),
303 batch: WriteBatch::single(test_op()),
304 };
305 let err = NoQueue.enqueue(write).await.unwrap_err();
306 assert_eq!(err.reason, "async write queue is not configured");
307 }
308
309 fn test_op() -> osproxy_sink::WriteOp {
310 use osproxy_core::{ClusterId, Epoch, IndexName, Target};
311 use osproxy_sink::{DocOp, WriteOp};
312 WriteOp::new(
313 Target::new(ClusterId::from("c"), IndexName::from("i")),
314 DocOp::Index {
315 id: Some("p:1".to_owned()),
316 routing: None,
317 body: b"{}".to_vec(),
318 },
319 Epoch::new(1),
320 )
321 }
322}