Skip to main content

osproxy_rewrite/
bulk.rs

//! Parsing the `_bulk` NDJSON body into structured per-document actions.
//!
//! `_bulk` is newline-delimited JSON: each operation is an *action* line
//! (`{"index":{"_id":"1"}}`) optionally followed by a *source* line (the
//! document, for index/create/update; absent for delete). This module turns the
//! raw bytes into a `Vec<BulkItem>` the engine demuxes by partition (`docs/04`
//! §3). It is a pure parse that attaches no tenancy meaning to what it reads,
//! and it is held to the same coverage bar as the other transforms.
9
10use osproxy_core::json;
11use serde_json::Value;
12
13use crate::error::RewriteError;
14
/// The verb of a single bulk operation, mirroring OpenSearch's verbs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BulkAction {
    /// `index`: create the document, or replace it if it already exists.
    Index,
    /// `create`: add the document, failing if it already exists.
    Create,
    /// `update`: partial or scripted update of a document.
    Update,
    /// `delete`: remove a document by id.
    Delete,
}

impl BulkAction {
    /// Reports whether a source line follows this action's action line.
    ///
    /// Every verb except `delete` carries one.
    #[must_use]
    pub fn has_source(self) -> bool {
        // Spelled out per variant so a newly added verb has to make an explicit
        // choice here instead of silently inheriting a default.
        match self {
            Self::Index | Self::Create | Self::Update => true,
            Self::Delete => false,
        }
    }

    /// Returns the verb as written on the wire (`index`/`create`/`update`/
    /// `delete`); the same word is the per-item key in the bulk response.
    #[must_use]
    pub fn keyword(self) -> &'static str {
        match self {
            Self::Delete => "delete",
            Self::Update => "update",
            Self::Create => "create",
            Self::Index => "index",
        }
    }
}
47
48/// One parsed bulk operation: its action, the optional explicit `_index`/`_id`/
49/// `routing` from the action line, and the source document (if any).
50#[derive(Clone, PartialEq, Eq, Debug)]
51pub struct BulkItem {
52    /// The operation verb.
53    pub action: BulkAction,
54    /// The explicit `_index` from the action line, if any (else the URL default).
55    pub index: Option<String>,
56    /// The explicit `_id` from the action line, if any.
57    pub id: Option<String>,
58    /// The explicit `routing` from the action line, if any.
59    pub routing: Option<String>,
60    /// Whether the action line carries an optimistic-concurrency precondition
61    /// (`if_seq_no`/`if_primary_term`/`version`/`version_type`). The async
62    /// fan-out path rejects such items: the precondition is evaluated against the
63    /// live version, which does not exist at enqueue time (`docs/04` §9).
64    pub concurrency_control: bool,
65    /// The source document as **raw bytes** (for index/create/update; `None` for
66    /// delete). Kept verbatim, not parsed into a `Value`, so the per-item
67    /// transform can scan and splice it without materializing a tree (ADR-014).
68    pub source: Option<Vec<u8>>,
69}
70
71/// Parses an NDJSON `_bulk` body into its ordered operations.
72///
73/// # Errors
74///
75/// Returns [`RewriteError::InvalidJson`] if an action or source line is not
76/// valid JSON, or [`RewriteError::MalformedBulkAction`] if an action line is not
77/// a single-key `{verb: {…}}` object, names an unknown verb, or a source line is
78/// missing for an action that requires one.
79///
80/// # Examples
81///
82/// ```
83/// use osproxy_rewrite::{parse_bulk, BulkAction};
84///
85/// let body = b"{\"index\":{\"_id\":\"1\"}}\n{\"msg\":\"hi\"}\n";
86/// let items = parse_bulk(body).unwrap();
87/// assert_eq!(items.len(), 1);
88/// assert_eq!(items[0].action, BulkAction::Index);
89/// assert_eq!(items[0].id.as_deref(), Some("1"));
90/// assert_eq!(items[0].source.as_deref(), Some(&b"{\"msg\":\"hi\"}"[..]));
91/// ```
92pub fn parse_bulk(body: &[u8]) -> Result<Vec<BulkItem>, RewriteError> {
93    let mut items = Vec::new();
94    let mut lines = body
95        .split(|&b| b == b'\n')
96        .filter(|l| !l.iter().all(u8::is_ascii_whitespace));
97    while let Some(action_line) = lines.next() {
98        let (action, meta) = parse_action_line(action_line)?;
99        let source = if action.has_source() {
100            let source_line = lines.next().ok_or(RewriteError::MalformedBulkAction)?;
101            // Validate the line is well-formed JSON (no alloc), but keep the raw
102            // bytes, the transform splices them later without a `Value` tree.
103            json::validate(source_line).map_err(|_| RewriteError::InvalidJson)?;
104            Some(source_line.to_vec())
105        } else {
106            None
107        };
108        items.push(BulkItem {
109            action,
110            index: meta.index,
111            id: meta.id,
112            routing: meta.routing,
113            concurrency_control: meta.concurrency_control,
114            source,
115        });
116    }
117    Ok(items)
118}
119
120/// Parses just the **action** of a bulk action line, the verb, so a streaming
121/// reader can decide whether a source line follows ([`BulkAction::has_source`])
122/// before it frames the next line (ADR-014 stage 4). Validates the single-key
123/// `{verb: {…}}` shape like [`parse_bulk`].
124///
125/// # Errors
126///
127/// [`RewriteError::InvalidJson`] if the line is not JSON, or
128/// [`RewriteError::MalformedBulkAction`] if it is not a single-key action object.
129pub fn parse_bulk_action(line: &[u8]) -> Result<BulkAction, RewriteError> {
130    parse_action_line(line).map(|(action, _)| action)
131}
132
133/// Parses one bulk operation from its already-framed action line and (for
134/// source-bearing verbs) source line, the per-op entry point for the streaming
135/// demux (ADR-014 stage 4). The source line is kept as raw bytes, validated but
136/// not materialized, exactly as [`parse_bulk`] does.
137///
138/// # Errors
139///
140/// [`RewriteError::MalformedBulkAction`] if a source-bearing action has no source
141/// line, [`RewriteError::InvalidJson`] if a line is not valid JSON.
142pub fn parse_bulk_op(
143    action_line: &[u8],
144    source_line: Option<&[u8]>,
145) -> Result<BulkItem, RewriteError> {
146    let (action, meta) = parse_action_line(action_line)?;
147    let source = if action.has_source() {
148        let line = source_line.ok_or(RewriteError::MalformedBulkAction)?;
149        json::validate(line).map_err(|_| RewriteError::InvalidJson)?;
150        Some(line.to_vec())
151    } else {
152        None
153    };
154    Ok(BulkItem {
155        action,
156        index: meta.index,
157        id: meta.id,
158        routing: meta.routing,
159        concurrency_control: meta.concurrency_control,
160        source,
161    })
162}
163
/// What an action line's metadata object contributes to a [`BulkItem`]: the
/// explicit `_index`/`_id`/`routing` values and the concurrency-control flag.
struct ActionMeta {
    /// Explicit `_index`, if the metadata supplied one.
    index: Option<String>,
    /// Explicit `_id`, if the metadata supplied one.
    id: Option<String>,
    /// Explicit `routing`, if the metadata supplied one.
    routing: Option<String>,
    /// Whether the metadata names an optimistic-concurrency key.
    concurrency_control: bool,
}
171
172/// Parses one action line into its action and metadata.
173fn parse_action_line(line: &[u8]) -> Result<(BulkAction, ActionMeta), RewriteError> {
174    let value: Value = serde_json::from_slice(line).map_err(|_| RewriteError::InvalidJson)?;
175    let obj = value.as_object().ok_or(RewriteError::MalformedBulkAction)?;
176    // Exactly one key: the action verb mapping to its metadata object.
177    let mut entries = obj.iter();
178    let (verb, meta) = entries.next().ok_or(RewriteError::MalformedBulkAction)?;
179    if entries.next().is_some() {
180        return Err(RewriteError::MalformedBulkAction);
181    }
182    let action = match verb.as_str() {
183        "index" => BulkAction::Index,
184        "create" => BulkAction::Create,
185        "update" => BulkAction::Update,
186        "delete" => BulkAction::Delete,
187        _ => return Err(RewriteError::MalformedBulkAction),
188    };
189    Ok((action, action_meta(meta)))
190}
191
192/// Extracts `_index`/`_id`/`routing` from an action's metadata object (lenient:
193/// a missing or non-object meta yields all-`None`).
194fn action_meta(meta: &Value) -> ActionMeta {
195    let str_field = |name: &str| meta.get(name).and_then(Value::as_str).map(str::to_owned);
196    let concurrency_control = ["if_seq_no", "if_primary_term", "version", "version_type"]
197        .iter()
198        .any(|k| meta.get(*k).is_some());
199    ActionMeta {
200        index: str_field("_index"),
201        id: str_field("_id"),
202        routing: str_field("routing"),
203        concurrency_control,
204    }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Parses an item's raw source bytes back into a `Value` for assertions.
    /// Panics if the item has no source or the bytes are not JSON.
    fn source_json(item: &BulkItem) -> Value {
        serde_json::from_slice(item.source.as_ref().unwrap()).unwrap()
    }

    #[test]
    fn parses_index_create_delete_in_order() {
        let body = concat!(
            "{\"index\":{\"_index\":\"a\",\"_id\":\"1\"}}\n",
            "{\"msg\":\"one\"}\n",
            "{\"create\":{\"_id\":\"2\"}}\n",
            "{\"msg\":\"two\"}\n",
            "{\"delete\":{\"_id\":\"3\"}}\n",
        );
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert_eq!(items.len(), 3);

        assert_eq!(items[0].action, BulkAction::Index);
        assert_eq!(items[0].index.as_deref(), Some("a"));
        assert_eq!(items[0].id.as_deref(), Some("1"));
        assert_eq!(source_json(&items[0])["msg"], json!("one"));

        assert_eq!(items[1].action, BulkAction::Create);
        assert_eq!(source_json(&items[1])["msg"], json!("two"));

        assert_eq!(items[2].action, BulkAction::Delete);
        assert_eq!(items[2].id.as_deref(), Some("3"));
        assert!(items[2].source.is_none());
    }

    #[test]
    fn optimistic_concurrency_metadata_is_flagged() {
        let body = concat!(
            "{\"index\":{\"_id\":\"1\",\"if_seq_no\":3,\"if_primary_term\":1}}\n{\"k\":1}\n",
            "{\"index\":{\"_id\":\"2\",\"version\":7}}\n{\"k\":2}\n",
            "{\"index\":{\"_id\":\"3\"}}\n{\"k\":3}\n",
        );
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert!(items[0].concurrency_control, "if_seq_no/if_primary_term");
        assert!(items[1].concurrency_control, "version");
        assert!(!items[2].concurrency_control, "plain index");
    }

    #[test]
    fn routing_is_read_from_the_action_line() {
        let body = "{\"index\":{\"_id\":\"1\",\"routing\":\"r\"}}\n{\"k\":1}\n";
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert_eq!(items[0].routing.as_deref(), Some("r"));
    }

    #[test]
    fn blank_lines_are_skipped() {
        let body = "\n{\"delete\":{\"_id\":\"9\"}}\n\n";
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].action, BulkAction::Delete);
    }

    #[test]
    fn missing_source_line_is_rejected() {
        let body = "{\"index\":{\"_id\":\"1\"}}\n"; // no source follows
        assert_eq!(
            parse_bulk(body.as_bytes()).unwrap_err(),
            RewriteError::MalformedBulkAction
        );
    }

    #[test]
    fn invalid_json_source_line_is_rejected() {
        // Whole-body counterpart of the streaming bad-source check below.
        let body = "{\"index\":{\"_id\":\"1\"}}\nnot json\n";
        assert_eq!(
            parse_bulk(body.as_bytes()).unwrap_err(),
            RewriteError::InvalidJson
        );
    }

    #[test]
    fn unknown_verb_and_multikey_action_are_rejected() {
        assert_eq!(
            parse_bulk(b"{\"frobnicate\":{}}\n").unwrap_err(),
            RewriteError::MalformedBulkAction
        );
        assert_eq!(
            parse_bulk(b"{\"index\":{},\"delete\":{}}\n").unwrap_err(),
            RewriteError::MalformedBulkAction
        );
    }

    #[test]
    fn invalid_json_action_is_rejected() {
        assert_eq!(
            parse_bulk(b"not json\n").unwrap_err(),
            RewriteError::InvalidJson
        );
    }

    #[test]
    fn streaming_parse_op_matches_whole_body_parse() {
        // The per-op streaming API yields the same items as parse_bulk for the
        // same lines: an index (with source) and a delete (no source).
        let action = br#"{"index":{"_index":"a","_id":"1"}}"#;
        let source = br#"{"msg":"hi"}"#;
        // `assert_eq!` rather than `assert!(a == b)` so a failure prints both sides.
        assert_eq!(parse_bulk_action(action).unwrap(), BulkAction::Index);
        let op = parse_bulk_op(action, Some(source)).unwrap();
        assert_eq!(op.action, BulkAction::Index);
        assert_eq!(op.index.as_deref(), Some("a"));
        assert_eq!(op.id.as_deref(), Some("1"));
        assert_eq!(op.source.as_deref(), Some(&source[..]));

        let del = br#"{"delete":{"_id":"2"}}"#;
        assert_eq!(parse_bulk_action(del).unwrap(), BulkAction::Delete);
        let op = parse_bulk_op(del, None).unwrap();
        assert_eq!(op.action, BulkAction::Delete);
        assert!(op.source.is_none());
    }

    #[test]
    fn streaming_parse_op_rejects_missing_source_and_bad_json() {
        let action = br#"{"index":{"_id":"1"}}"#;
        assert_eq!(
            parse_bulk_op(action, None).unwrap_err(),
            RewriteError::MalformedBulkAction
        );
        assert_eq!(
            parse_bulk_op(action, Some(b"not json")).unwrap_err(),
            RewriteError::InvalidJson
        );
    }

    #[test]
    fn has_source_and_keyword_match_the_action() {
        // Every verb is covered, so a change to either table is caught here.
        assert!(BulkAction::Index.has_source());
        assert!(BulkAction::Create.has_source());
        assert!(BulkAction::Update.has_source());
        assert!(!BulkAction::Delete.has_source());

        assert_eq!(BulkAction::Index.keyword(), "index");
        assert_eq!(BulkAction::Create.keyword(), "create");
        assert_eq!(BulkAction::Update.keyword(), "update");
        assert_eq!(BulkAction::Delete.keyword(), "delete");
    }
}