Skip to main content

osproxy_engine/
asyncwrite.rs

//! The asynchronous fan-out write mode (`docs/04` §9).
//!
//! In sync mode the proxy forwards a mutation to the upstream and returns its
//! real result. In **async mode** it instead durably enqueues the fully-resolved,
//! epoch-stamped op onto a [`WriteQueue`] and returns `202 Accepted` with an
//! `op_id` handle. A separate downstream component consumes the queue and applies
//! each op to one or more destinations, so the proxy's only promise is *durable
//! acceptance into the pipeline*, never application or its result.
//!
//! This is deliberately narrow:
//!
//! * The `202` is returned **only after** the queue acknowledges the enqueue, so a
//!   client that got `202` knows the op will not be silently dropped. A queue that
//!   cannot accept the op fails the request rather than lying.
//! * The op carries an **`op_id`**, client-supplied via the `X-Op-Id` header
//!   (validated) or proxy-minted from the request id otherwise, that is both the
//!   correlation handle and the idempotency key the downstream applier dedups on.
//! * The proxy hosts **no status surface**: whether and how a failed apply is
//!   reported back is the downstream's responsibility, out of scope here.
//!
//! Mode is negotiated per request (`X-Write-Mode`) over a deployment baseline; see
//! [`crate::Pipeline::with_baseline_write_mode`].
23
24use std::future::Future;
25use std::pin::Pin;
26
27use osproxy_core::RequestId;
28use osproxy_sink::{Reader, Sink, WriteBatch};
29use osproxy_spi::RequestCtx;
30use osproxy_tenancy::{Resolved, Router};
31use serde_json::json;
32
33use crate::pipeline::{Pipeline, PipelineResponse};
34
/// The dispatch strategy applied to a mutation.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum WriteMode {
    /// The mutation is forwarded to the upstream and the upstream's real result
    /// is returned. This is the default mode.
    #[default]
    Sync,
    /// The op is durably enqueued for downstream fan-out; the caller receives a
    /// `202` carrying a handle instead of a result.
    Async,
}

impl WriteMode {
    /// Interprets the value of an `X-Write-Mode` header.
    ///
    /// The comparison ignores ASCII case and is otherwise exact (no trimming).
    /// Anything that is not a known mode produces `None`, leaving the decision
    /// to reject with the caller instead of guessing a mode.
    #[must_use]
    pub fn parse(value: &str) -> Option<Self> {
        // Table of accepted spellings; the first case-insensitive match wins.
        [("sync", Self::Sync), ("async", Self::Async)]
            .iter()
            .find(|(name, _)| value.eq_ignore_ascii_case(name))
            .map(|&(_, mode)| mode)
    }
}
59
/// Upper bound, in bytes, on a client-supplied `X-Op-Id`. Because the id is
/// keyed into the queue and logged, it is length-bounded and charset-restricted
/// so it cannot inject into a downstream keyspace.
const MAX_OP_ID_LEN: usize = 128;

/// Reports whether `candidate` may be used as an op id.
///
/// An acceptable id is non-empty, at most `MAX_OP_ID_LEN` bytes long, and made
/// up solely of the safe key charset `A-Za-z0-9-_.:`.
#[must_use]
pub fn valid_op_id(candidate: &str) -> bool {
    // Length gate first: reject the empty string and anything over the bound.
    if candidate.is_empty() || candidate.len() > MAX_OP_ID_LEN {
        return false;
    }
    // Every character must be ASCII alphanumeric or one of the four separators;
    // any non-ASCII character fails both tests.
    candidate
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.' | ':'))
}
75
76/// Resolves the op id for a request: the validated client-supplied `X-Op-Id`, or
77/// the proxy's own request id when the header is absent or malformed. Always
78/// returns a usable id so the loop can always be correlated.
79#[must_use]
80pub fn op_id_for(ctx: &RequestCtx<'_>, request_id: &RequestId) -> String {
81    ctx.headers()
82        .get("x-op-id")
83        .filter(|h| valid_op_id(h))
84        .map_or_else(|| request_id.as_str().to_owned(), ToOwned::to_owned)
85}
86
87/// Why a mutation cannot be honored in async mode, if it cannot, a short,
88/// value-free reason for the `400`. These all need read-modify-write against the
89/// document's current state, which does not exist at enqueue time, so async
90/// rejects them rather than silently mis-applying or dropping the precondition.
91#[must_use]
92pub fn unsupported_async(ctx: &RequestCtx<'_>) -> Option<&'static str> {
93    // Optimistic concurrency: the precondition is checked against the live
94    // version, so dropping it (the only async option) would corrupt the contract.
95    if let Some(query) = ctx.query() {
96        let has_cas = query.split('&').any(|pair| {
97            let key = pair.split('=').next().unwrap_or(pair);
98            matches!(key, "if_seq_no" | "if_primary_term" | "version")
99        });
100        if has_cas {
101            return Some("optimistic concurrency (if_seq_no/if_primary_term/version) is not supported in async write mode");
102        }
103    }
104    // A scripted/partial `_update` merges into the current document; async fan-out
105    // has no single authoritative document to merge against. The path is
106    // `/{index}/_update/{id}`, so match the `_update` path segment, not a suffix.
107    if ctx.path().split('/').any(|seg| seg == "_update") {
108        return Some("scripted/partial _update is not supported in async write mode");
109    }
110    None
111}
112
113/// A mutation accepted for asynchronous fan-out: the fully-resolved,
114/// epoch-stamped [`WriteBatch`] the sync path would have dispatched, plus the
115/// correlation/idempotency id and the ordering key.
116#[derive(Clone, Debug)]
117pub struct QueuedWrite {
118    /// Correlation handle and downstream idempotency key.
119    pub op_id: String,
120    /// The partition id, used as the queue partition key so all ops for one
121    /// logical partition stay ordered through the fan-out.
122    pub partition_key: String,
123    /// The resolved op(s), identical to what the sync path would deliver.
124    pub batch: WriteBatch,
125}
126
127/// A queue that durably accepts resolved write ops for downstream fan-out.
128///
129/// The implementor (a Kafka producer with a write-ahead log, in the shipped
130/// binary) must only resolve the future `Ok` once the op is durably accepted, so
131/// the `202` the pipeline returns is truthful. Implementations MUST NOT panic.
132pub trait WriteQueue: Send + Sync {
133    /// Whether async writes can be served. `false` (the default [`NoQueue`])
134    /// means an async request is refused rather than dropped.
135    fn enabled(&self) -> bool {
136        false
137    }
138
139    /// Durably enqueues `write`, resolving `Ok` only once it is accepted.
140    fn enqueue<'a>(
141        &'a self,
142        write: QueuedWrite,
143    ) -> Pin<Box<dyn Future<Output = Result<(), QueueError>> + Send + 'a>>;
144}
145
/// A failure to enqueue an async write. Carries a value-free reason only.
///
/// The type implements [`std::fmt::Display`] (rendering exactly `reason`) and
/// [`std::error::Error`], so it composes with `?`, `Box<dyn Error>` and
/// error-reporting helpers like any other error type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct QueueError {
    /// A short, value-free description (e.g. `"broker unavailable"`).
    pub reason: &'static str,
}

impl std::fmt::Display for QueueError {
    /// Writes the value-free `reason` verbatim; nothing else is added, so the
    /// rendered message stays free of request data.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.reason)
    }
}

// No underlying source error is carried, so the default trait methods suffice.
impl std::error::Error for QueueError {}
152
153/// The default queue: async writes are unavailable. An async request against a
154/// pipeline with no queue is refused with `422`, never accepted-and-dropped.
155#[derive(Clone, Copy, Debug, Default)]
156pub struct NoQueue;
157
158impl WriteQueue for NoQueue {
159    fn enqueue<'a>(
160        &'a self,
161        _write: QueuedWrite,
162    ) -> Pin<Box<dyn Future<Output = Result<(), QueueError>> + Send + 'a>> {
163        Box::pin(async {
164            Err(QueueError {
165                reason: "async write queue is not configured",
166            })
167        })
168    }
169}
170
171/// The `202 Accepted` envelope returned once an async write is durably enqueued.
172///
173/// A generic async handle, not a synthetic OpenSearch result: `result:"queued"`
174/// is honest about what happened (the op was accepted, not applied), and `op_id`
175/// is the handle the client correlates any downstream outcome against.
176#[must_use]
177pub(crate) fn accepted_response(op_id: &str, index: &str) -> PipelineResponse {
178    PipelineResponse {
179        status: 202,
180        body: serde_json::to_vec(&json!({
181            "op_id": op_id,
182            "status": "accepted",
183            "result": "queued",
184            "_index": index,
185        }))
186        .unwrap_or_else(|_| b"{}".to_vec()),
187        content_type: None,
188    }
189}
190
191/// The `400` returned when an op cannot be honored in async mode (see
192/// [`unsupported_async`]). The `reason` is value-free.
193#[must_use]
194pub(crate) fn unsupported_response(reason: &str, index: &str) -> PipelineResponse {
195    PipelineResponse {
196        status: 400,
197        body: serde_json::to_vec(&json!({
198            "status": "rejected",
199            "error": reason,
200            "_index": index,
201        }))
202        .unwrap_or_else(|_| b"{}".to_vec()),
203        content_type: None,
204    }
205}
206
207/// The `422` returned when async mode was requested but no queue is configured,
208/// the op is refused, never accepted-and-dropped.
209#[must_use]
210pub(crate) fn unavailable_response(index: &str) -> PipelineResponse {
211    PipelineResponse {
212        status: 422,
213        body: serde_json::to_vec(&json!({
214            "status": "rejected",
215            "error": "async write mode is not available on this proxy",
216            "_index": index,
217        }))
218        .unwrap_or_else(|_| b"{}".to_vec()),
219        content_type: None,
220    }
221}
222
223/// The `503` returned when the queue refused the op. Retryable: the same `op_id`
224/// makes the retry idempotent downstream.
225#[must_use]
226pub(crate) fn enqueue_failed_response(op_id: &str, index: &str) -> PipelineResponse {
227    PipelineResponse {
228        status: 503,
229        body: serde_json::to_vec(&json!({
230            "op_id": op_id,
231            "status": "rejected",
232            "error": "async write could not be enqueued",
233            "_index": index,
234        }))
235        .unwrap_or_else(|_| b"{}".to_vec()),
236        content_type: None,
237    }
238}
239
240impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
241    /// Durably enqueues a resolved write for downstream fan-out and returns the
242    /// `202` handle (`docs/04` §9). The `202` is produced **only after** the queue
243    /// acknowledges the enqueue; a missing queue is refused (`422`) and an enqueue
244    /// failure is reported (`503`), the op is never accepted-and-dropped. No live
245    /// epoch gate runs here: the op carries its epoch and the downstream applier
246    /// owns staleness, since there is no synchronous upstream to hold.
247    pub(crate) async fn enqueue_async(
248        &self,
249        ctx: &RequestCtx<'_>,
250        resolved: &Resolved,
251        batch: WriteBatch,
252    ) -> PipelineResponse {
253        let index = ctx.logical_index();
254        // A client error takes precedence over a missing queue: an op that cannot
255        // be honored async is rejected (`400`) whether or not a queue is wired.
256        if let Some(reason) = unsupported_async(ctx) {
257            return unsupported_response(reason, index);
258        }
259        if !self.write_queue.enabled() {
260            return unavailable_response(index);
261        }
262        let op_id = op_id_for(ctx, ctx.request_id());
263        let write = QueuedWrite {
264            op_id: op_id.clone(),
265            partition_key: resolved.partition.as_str().to_owned(),
266            batch,
267        };
268        match self.write_queue.enqueue(write).await {
269            Ok(()) => accepted_response(&op_id, index),
270            Err(_) => enqueue_failed_response(&op_id, index),
271        }
272    }
273}
274
#[cfg(test)]
mod tests {
    use super::*;

    /// Known modes parse regardless of ASCII case; anything else is `None`.
    #[test]
    fn parses_known_modes_case_insensitively_and_rejects_unknown() {
        let cases = [
            ("sync", Some(WriteMode::Sync)),
            ("ASYNC", Some(WriteMode::Async)),
            ("queue", None),
            ("", None),
        ];
        for (input, expected) in cases {
            assert_eq!(WriteMode::parse(input), expected);
        }
    }

    /// Op ids are bounded in length and restricted to the safe charset.
    #[test]
    fn op_id_validation_bounds_length_and_charset() {
        let at_limit = "x".repeat(MAX_OP_ID_LEN);
        let over_limit = "x".repeat(MAX_OP_ID_LEN + 1);

        assert!(valid_op_id("a-b_c.d:1"));
        assert!(valid_op_id(&at_limit));

        assert!(!valid_op_id(""));
        assert!(!valid_op_id("has space"));
        assert!(!valid_op_id("inject\nkey"));
        assert!(!valid_op_id(&over_limit));
    }

    /// The default queue reports itself disabled and refuses every enqueue.
    #[tokio::test]
    async fn no_queue_is_disabled_and_refuses() {
        assert!(!NoQueue.enabled());

        let write = QueuedWrite {
            op_id: "op-1".to_owned(),
            partition_key: "acme".to_owned(),
            batch: WriteBatch::single(test_op()),
        };
        let outcome = NoQueue.enqueue(write).await;
        assert_eq!(
            outcome,
            Err(QueueError {
                reason: "async write queue is not configured",
            })
        );
    }

    /// A minimal index op used as the queued payload in the tests above.
    fn test_op() -> osproxy_sink::WriteOp {
        use osproxy_core::{ClusterId, Epoch, IndexName, Target};
        use osproxy_sink::{DocOp, WriteOp};

        let target = Target::new(ClusterId::from("c"), IndexName::from("i"));
        let doc = DocOp::Index {
            id: Some("p:1".to_owned()),
            routing: None,
            body: b"{}".to_vec(),
        };
        WriteOp::new(target, doc, Epoch::new(1))
    }
}