Skip to main content

osproxy_rewrite/
bulk.rs

1//! Parsing the `_bulk` NDJSON body into structured per-document actions.
2//!
3//! `_bulk` is newline-delimited JSON: each operation is an *action* line
4//! (`{"index":{"_id":"1"}}`) optionally followed by a *source* line (the
5//! document, for index/create/update; absent for delete). This module turns the
6//! raw bytes into a `Vec<BulkItem>` the engine demuxes by partition (`docs/04`
7//! §3). It is a pure parse, no tenancy meaning, held to the same coverage bar
8//! as the other transforms.
9
10use osproxy_core::json;
11use serde_json::Value;
12
13use crate::error::RewriteError;
14
/// A bulk operation verb, one per OpenSearch `_bulk` action keyword.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BulkAction {
    /// `index`: create the document, or replace it if it already exists.
    Index,
    /// `create`: create the document, failing if it already exists.
    Create,
    /// `update`: apply a partial or scripted update to the document.
    Update,
    /// `delete`: remove the document with the given id.
    Delete,
}

impl BulkAction {
    /// Reports whether this verb's action line is followed by a source
    /// (document) line in the NDJSON body. Every verb except `delete` is.
    #[must_use]
    pub fn has_source(self) -> bool {
        match self {
            Self::Index | Self::Create | Self::Update => true,
            Self::Delete => false,
        }
    }

    /// Returns the verb as spelled on the wire. The bulk response reuses this
    /// exact string as the key of each per-item result.
    #[must_use]
    pub fn keyword(self) -> &'static str {
        let word = match self {
            Self::Index => "index",
            Self::Create => "create",
            Self::Update => "update",
            Self::Delete => "delete",
        };
        word
    }
}
47
48/// One parsed bulk operation: its action, the optional explicit `_index`/`_id`/
49/// `routing` from the action line, and the source document (if any).
50#[derive(Clone, PartialEq, Eq, Debug)]
51pub struct BulkItem {
52    /// The operation verb.
53    pub action: BulkAction,
54    /// The explicit `_index` from the action line, if any (else the URL default).
55    pub index: Option<String>,
56    /// The explicit `_id` from the action line, if any.
57    pub id: Option<String>,
58    /// The explicit `routing` from the action line, if any.
59    pub routing: Option<String>,
60    /// Whether the action line carries an optimistic-concurrency precondition
61    /// (`if_seq_no`/`if_primary_term`/`version`/`version_type`). The async
62    /// fan-out path rejects such items: the precondition is evaluated against the
63    /// live version, which does not exist at enqueue time (`docs/04` §9).
64    pub concurrency_control: bool,
65    /// The source document as **raw bytes** (for index/create/update; `None` for
66    /// delete). Kept verbatim, not parsed into a `Value`, so the per-item
67    /// transform can scan and splice it without materializing a tree (ADR-014).
68    pub source: Option<Vec<u8>>,
69}
70
71/// Parses an NDJSON `_bulk` body into its ordered operations.
72///
73/// # Errors
74///
75/// Returns [`RewriteError::InvalidJson`] if an action or source line is not
76/// valid JSON, or [`RewriteError::MalformedBulkAction`] if an action line is not
77/// a single-key `{verb: {…}}` object, names an unknown verb, or a source line is
78/// missing for an action that requires one.
79///
80/// # Examples
81///
82/// ```
83/// use osproxy_rewrite::{parse_bulk, BulkAction};
84///
85/// let body = b"{\"index\":{\"_id\":\"1\"}}\n{\"msg\":\"hi\"}\n";
86/// let items = parse_bulk(body).unwrap();
87/// assert_eq!(items.len(), 1);
88/// assert_eq!(items[0].action, BulkAction::Index);
89/// assert_eq!(items[0].id.as_deref(), Some("1"));
90/// assert_eq!(items[0].source.as_deref(), Some(&b"{\"msg\":\"hi\"}"[..]));
91/// ```
92pub fn parse_bulk(body: &[u8]) -> Result<Vec<BulkItem>, RewriteError> {
93    let mut items = Vec::new();
94    let mut lines = body
95        .split(|&b| b == b'\n')
96        .filter(|l| !l.iter().all(u8::is_ascii_whitespace));
97    while let Some(action_line) = lines.next() {
98        let (action, meta) = parse_action_line(action_line)?;
99        let source = if action.has_source() {
100            let source_line = lines.next().ok_or(RewriteError::MalformedBulkAction)?;
101            // Validate the line is well-formed JSON (no alloc), but keep the raw
102            // bytes, the transform splices them later without a `Value` tree.
103            json::validate(source_line).map_err(|_| RewriteError::InvalidJson)?;
104            Some(source_line.to_vec())
105        } else {
106            None
107        };
108        items.push(BulkItem {
109            action,
110            index: meta.index,
111            id: meta.id,
112            routing: meta.routing,
113            concurrency_control: meta.concurrency_control,
114            source,
115        });
116    }
117    Ok(items)
118}
119
120/// A bulk action line parsed **once**: its verb plus the `_index`/`_id`/`routing`
121/// metadata. The streaming demux parses each action line a single time into this,
122/// uses [`ParsedAction::has_source`] to decide whether a source line follows
123/// before framing the next line, then [`ParsedAction::into_item`] to finalize the
124/// op — so the action line's JSON is never parsed twice (ADR-014 stage 4). The
125/// buffered [`parse_bulk`] already parses each line once.
126#[derive(Debug)]
127pub struct ParsedAction {
128    action: BulkAction,
129    meta: ActionMeta,
130}
131
132impl ParsedAction {
133    /// The parsed verb.
134    #[must_use]
135    pub fn action(&self) -> BulkAction {
136        self.action
137    }
138
139    /// Whether a source line follows this action (index/create/update do, delete
140    /// does not), so the streaming reader knows to frame one before finalizing.
141    #[must_use]
142    pub fn has_source(&self) -> bool {
143        self.action.has_source()
144    }
145
146    /// Finalizes the op from the already-parsed action plus, for source-bearing
147    /// verbs, its source line. The source is kept as raw bytes, validated but not
148    /// materialized, exactly as [`parse_bulk`] does.
149    ///
150    /// # Errors
151    ///
152    /// [`RewriteError::MalformedBulkAction`] if a source-bearing action has no
153    /// source line, [`RewriteError::InvalidJson`] if the source line is not valid
154    /// JSON.
155    pub fn into_item(self, source_line: Option<&[u8]>) -> Result<BulkItem, RewriteError> {
156        let source = if self.action.has_source() {
157            let line = source_line.ok_or(RewriteError::MalformedBulkAction)?;
158            json::validate(line).map_err(|_| RewriteError::InvalidJson)?;
159            Some(line.to_vec())
160        } else {
161            None
162        };
163        Ok(BulkItem {
164            action: self.action,
165            index: self.meta.index,
166            id: self.meta.id,
167            routing: self.meta.routing,
168            concurrency_control: self.meta.concurrency_control,
169            source,
170        })
171    }
172}
173
174/// Parses one bulk **action line** into a [`ParsedAction`] — its verb and
175/// metadata — without consuming the source line, the per-op entry point for the
176/// streaming demux (ADR-014 stage 4). Validates the single-key `{verb: {…}}`
177/// shape like [`parse_bulk`].
178///
179/// # Errors
180///
181/// [`RewriteError::InvalidJson`] if the line is not JSON, or
182/// [`RewriteError::MalformedBulkAction`] if it is not a single-key action object.
183pub fn parse_bulk_action(line: &[u8]) -> Result<ParsedAction, RewriteError> {
184    let (action, meta) = parse_action_line(line)?;
185    Ok(ParsedAction { action, meta })
186}
187
/// The metadata read from an action line's `{verb: {…}}` object: the explicit
/// `_index`/`_id`/`routing` values plus whether any optimistic-concurrency
/// precondition key was present.
#[derive(Debug)]
struct ActionMeta {
    /// Explicit `_index`, if given on the action line.
    index: Option<String>,
    /// Explicit `_id`, if given on the action line.
    id: Option<String>,
    /// Explicit `routing`, if given on the action line.
    routing: Option<String>,
    /// Whether any of `if_seq_no`, `if_primary_term`, `version` or
    /// `version_type` appeared in the metadata.
    concurrency_control: bool,
}
196
197/// Parses one action line into its action and metadata.
198fn parse_action_line(line: &[u8]) -> Result<(BulkAction, ActionMeta), RewriteError> {
199    let value: Value = serde_json::from_slice(line).map_err(|_| RewriteError::InvalidJson)?;
200    let obj = value.as_object().ok_or(RewriteError::MalformedBulkAction)?;
201    // Exactly one key: the action verb mapping to its metadata object.
202    let mut entries = obj.iter();
203    let (verb, meta) = entries.next().ok_or(RewriteError::MalformedBulkAction)?;
204    if entries.next().is_some() {
205        return Err(RewriteError::MalformedBulkAction);
206    }
207    let action = match verb.as_str() {
208        "index" => BulkAction::Index,
209        "create" => BulkAction::Create,
210        "update" => BulkAction::Update,
211        "delete" => BulkAction::Delete,
212        _ => return Err(RewriteError::MalformedBulkAction),
213    };
214    Ok((action, action_meta(meta)))
215}
216
217/// Extracts `_index`/`_id`/`routing` from an action's metadata object (lenient:
218/// a missing or non-object meta yields all-`None`).
219fn action_meta(meta: &Value) -> ActionMeta {
220    let str_field = |name: &str| meta.get(name).and_then(Value::as_str).map(str::to_owned);
221    let concurrency_control = ["if_seq_no", "if_primary_term", "version", "version_type"]
222        .iter()
223        .any(|k| meta.get(*k).is_some());
224    ActionMeta {
225        index: str_field("_index"),
226        id: str_field("_id"),
227        routing: str_field("routing"),
228        concurrency_control,
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Parses an item's raw source bytes back into a `Value` for assertions.
    fn source_json(item: &BulkItem) -> Value {
        serde_json::from_slice(item.source.as_ref().unwrap()).unwrap()
    }

    #[test]
    fn parses_index_create_delete_in_order() {
        let body = concat!(
            "{\"index\":{\"_index\":\"a\",\"_id\":\"1\"}}\n",
            "{\"msg\":\"one\"}\n",
            "{\"create\":{\"_id\":\"2\"}}\n",
            "{\"msg\":\"two\"}\n",
            "{\"delete\":{\"_id\":\"3\"}}\n",
        );
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert_eq!(items.len(), 3);

        assert_eq!(items[0].action, BulkAction::Index);
        assert_eq!(items[0].index.as_deref(), Some("a"));
        assert_eq!(items[0].id.as_deref(), Some("1"));
        assert_eq!(source_json(&items[0])["msg"], json!("one"));

        assert_eq!(items[1].action, BulkAction::Create);
        assert_eq!(source_json(&items[1])["msg"], json!("two"));

        assert_eq!(items[2].action, BulkAction::Delete);
        assert_eq!(items[2].id.as_deref(), Some("3"));
        assert!(items[2].source.is_none());
    }

    #[test]
    fn update_verb_carries_a_source() {
        let body = "{\"update\":{\"_id\":\"1\"}}\n{\"doc\":{\"a\":1}}\n";
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].action, BulkAction::Update);
        assert_eq!(items[0].id.as_deref(), Some("1"));
        assert_eq!(source_json(&items[0])["doc"]["a"], json!(1));
    }

    #[test]
    fn optimistic_concurrency_metadata_is_flagged() {
        let body = concat!(
            "{\"index\":{\"_id\":\"1\",\"if_seq_no\":3,\"if_primary_term\":1}}\n{\"k\":1}\n",
            "{\"index\":{\"_id\":\"2\",\"version\":7}}\n{\"k\":2}\n",
            "{\"index\":{\"_id\":\"3\"}}\n{\"k\":3}\n",
        );
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert!(items[0].concurrency_control, "if_seq_no/if_primary_term");
        assert!(items[1].concurrency_control, "version");
        assert!(!items[2].concurrency_control, "plain index");
    }

    #[test]
    fn version_type_alone_is_flagged() {
        let body = "{\"index\":{\"_id\":\"1\",\"version_type\":\"external\"}}\n{\"k\":1}\n";
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert!(items[0].concurrency_control);
    }

    #[test]
    fn routing_is_read_from_the_action_line() {
        let body = "{\"index\":{\"_id\":\"1\",\"routing\":\"r\"}}\n{\"k\":1}\n";
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert_eq!(items[0].routing.as_deref(), Some("r"));
    }

    #[test]
    fn blank_lines_are_skipped() {
        let body = "\n{\"delete\":{\"_id\":\"9\"}}\n\n";
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].action, BulkAction::Delete);
    }

    #[test]
    fn empty_and_whitespace_only_bodies_yield_no_items() {
        assert!(parse_bulk(b"").unwrap().is_empty());
        assert!(parse_bulk(b"\n  \n\t\n").unwrap().is_empty());
    }

    #[test]
    fn missing_source_line_is_rejected() {
        let body = "{\"index\":{\"_id\":\"1\"}}\n"; // no source follows
        assert_eq!(
            parse_bulk(body.as_bytes()).unwrap_err(),
            RewriteError::MalformedBulkAction
        );
    }

    #[test]
    fn invalid_json_source_is_rejected() {
        let body = "{\"index\":{\"_id\":\"1\"}}\nnot json\n";
        assert_eq!(
            parse_bulk(body.as_bytes()).unwrap_err(),
            RewriteError::InvalidJson
        );
    }

    #[test]
    fn unknown_verb_and_multikey_action_are_rejected() {
        assert_eq!(
            parse_bulk(b"{\"frobnicate\":{}}\n").unwrap_err(),
            RewriteError::MalformedBulkAction
        );
        assert_eq!(
            parse_bulk(b"{\"index\":{},\"delete\":{}}\n").unwrap_err(),
            RewriteError::MalformedBulkAction
        );
    }

    #[test]
    fn non_object_and_empty_action_lines_are_rejected() {
        assert_eq!(
            parse_bulk(b"[1]\n").unwrap_err(),
            RewriteError::MalformedBulkAction
        );
        assert_eq!(
            parse_bulk(b"{}\n").unwrap_err(),
            RewriteError::MalformedBulkAction
        );
    }

    #[test]
    fn non_object_meta_is_tolerated() {
        let items = parse_bulk(b"{\"delete\":5}\n").unwrap();
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].action, BulkAction::Delete);
        assert!(items[0].index.is_none());
        assert!(items[0].id.is_none());
        assert!(items[0].routing.is_none());
        assert!(!items[0].concurrency_control);
    }

    #[test]
    fn invalid_json_action_is_rejected() {
        assert_eq!(
            parse_bulk(b"not json\n").unwrap_err(),
            RewriteError::InvalidJson
        );
    }

    #[test]
    fn streaming_parse_op_matches_whole_body_parse() {
        // The per-op streaming API yields the same items as parse_bulk for the
        // same lines: an index (with source) and a delete (no source).
        let action = br#"{"index":{"_index":"a","_id":"1"}}"#;
        let source = br#"{"msg":"hi"}"#;
        let parsed = parse_bulk_action(action).unwrap();
        assert_eq!(parsed.action(), BulkAction::Index);
        assert!(parsed.has_source());
        let op = parsed.into_item(Some(source)).unwrap();
        assert_eq!(op.action, BulkAction::Index);
        assert_eq!(op.index.as_deref(), Some("a"));
        assert_eq!(op.id.as_deref(), Some("1"));
        assert_eq!(op.source.as_deref(), Some(&source[..]));

        let del = br#"{"delete":{"_id":"2"}}"#;
        let parsed = parse_bulk_action(del).unwrap();
        assert_eq!(parsed.action(), BulkAction::Delete);
        assert!(!parsed.has_source());
        let op = parsed.into_item(None).unwrap();
        assert_eq!(op.action, BulkAction::Delete);
        assert!(op.source.is_none());
    }

    #[test]
    fn streaming_parse_op_rejects_missing_source_and_bad_json() {
        let action = br#"{"index":{"_id":"1"}}"#;
        assert_eq!(
            parse_bulk_action(action)
                .unwrap()
                .into_item(None)
                .unwrap_err(),
            RewriteError::MalformedBulkAction
        );
        assert_eq!(
            parse_bulk_action(action)
                .unwrap()
                .into_item(Some(b"not json"))
                .unwrap_err(),
            RewriteError::InvalidJson
        );
    }

    #[test]
    fn has_source_and_keyword_match_the_action() {
        assert!(BulkAction::Index.has_source());
        assert!(!BulkAction::Delete.has_source());
        assert_eq!(BulkAction::Index.keyword(), "index");
        assert_eq!(BulkAction::Create.keyword(), "create");
        assert_eq!(BulkAction::Update.keyword(), "update");
        assert_eq!(BulkAction::Delete.keyword(), "delete");
    }
}