1use osproxy_core::json;
11use serde_json::Value;
12
13use crate::error::RewriteError;
14
/// The verb of one action line in a `_bulk` request body.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum BulkAction {
    /// `index` action; followed by a source line.
    Index,
    /// `create` action; followed by a source line.
    Create,
    /// `update` action; followed by a source line.
    Update,
    /// `delete` action; stands alone with no source line.
    Delete,
}

impl BulkAction {
    /// Whether this action is followed by a source (document body) line.
    ///
    /// Only [`BulkAction::Delete`] has no source line.
    #[must_use]
    pub fn has_source(self) -> bool {
        match self {
            BulkAction::Delete => false,
            BulkAction::Index | BulkAction::Create | BulkAction::Update => true,
        }
    }

    /// The JSON key naming this action on an action line (e.g. `"index"`).
    #[must_use]
    pub fn keyword(self) -> &'static str {
        match self {
            BulkAction::Delete => "delete",
            BulkAction::Update => "update",
            BulkAction::Create => "create",
            BulkAction::Index => "index",
        }
    }
}
47
48#[derive(Clone, PartialEq, Eq, Debug)]
51pub struct BulkItem {
52 pub action: BulkAction,
54 pub index: Option<String>,
56 pub id: Option<String>,
58 pub routing: Option<String>,
60 pub concurrency_control: bool,
65 pub source: Option<Vec<u8>>,
69}
70
71pub fn parse_bulk(body: &[u8]) -> Result<Vec<BulkItem>, RewriteError> {
93 let mut items = Vec::new();
94 let mut lines = body
95 .split(|&b| b == b'\n')
96 .filter(|l| !l.iter().all(u8::is_ascii_whitespace));
97 while let Some(action_line) = lines.next() {
98 let (action, meta) = parse_action_line(action_line)?;
99 let source = if action.has_source() {
100 let source_line = lines.next().ok_or(RewriteError::MalformedBulkAction)?;
101 json::validate(source_line).map_err(|_| RewriteError::InvalidJson)?;
104 Some(source_line.to_vec())
105 } else {
106 None
107 };
108 items.push(BulkItem {
109 action,
110 index: meta.index,
111 id: meta.id,
112 routing: meta.routing,
113 concurrency_control: meta.concurrency_control,
114 source,
115 });
116 }
117 Ok(items)
118}
119
120#[derive(Debug)]
127pub struct ParsedAction {
128 action: BulkAction,
129 meta: ActionMeta,
130}
131
132impl ParsedAction {
133 #[must_use]
135 pub fn action(&self) -> BulkAction {
136 self.action
137 }
138
139 #[must_use]
142 pub fn has_source(&self) -> bool {
143 self.action.has_source()
144 }
145
146 pub fn into_item(self, source_line: Option<&[u8]>) -> Result<BulkItem, RewriteError> {
156 let source = if self.action.has_source() {
157 let line = source_line.ok_or(RewriteError::MalformedBulkAction)?;
158 json::validate(line).map_err(|_| RewriteError::InvalidJson)?;
159 Some(line.to_vec())
160 } else {
161 None
162 };
163 Ok(BulkItem {
164 action: self.action,
165 index: self.meta.index,
166 id: self.meta.id,
167 routing: self.meta.routing,
168 concurrency_control: self.meta.concurrency_control,
169 source,
170 })
171 }
172}
173
174pub fn parse_bulk_action(line: &[u8]) -> Result<ParsedAction, RewriteError> {
184 let (action, meta) = parse_action_line(line)?;
185 Ok(ParsedAction { action, meta })
186}
187
/// Metadata taken from the object that follows an action verb.
#[derive(Debug)]
struct ActionMeta {
    /// `_index`, as extracted by `action_meta`.
    index: Option<String>,
    /// `_id`, as extracted by `action_meta`.
    id: Option<String>,
    /// `routing`, as extracted by `action_meta`.
    routing: Option<String>,
    /// Whether an optimistic-concurrency field was present.
    concurrency_control: bool,
}
196
197fn parse_action_line(line: &[u8]) -> Result<(BulkAction, ActionMeta), RewriteError> {
199 let value: Value = serde_json::from_slice(line).map_err(|_| RewriteError::InvalidJson)?;
200 let obj = value.as_object().ok_or(RewriteError::MalformedBulkAction)?;
201 let mut entries = obj.iter();
203 let (verb, meta) = entries.next().ok_or(RewriteError::MalformedBulkAction)?;
204 if entries.next().is_some() {
205 return Err(RewriteError::MalformedBulkAction);
206 }
207 let action = match verb.as_str() {
208 "index" => BulkAction::Index,
209 "create" => BulkAction::Create,
210 "update" => BulkAction::Update,
211 "delete" => BulkAction::Delete,
212 _ => return Err(RewriteError::MalformedBulkAction),
213 };
214 Ok((action, action_meta(meta)))
215}
216
217fn action_meta(meta: &Value) -> ActionMeta {
220 let str_field = |name: &str| meta.get(name).and_then(Value::as_str).map(str::to_owned);
221 let concurrency_control = ["if_seq_no", "if_primary_term", "version", "version_type"]
222 .iter()
223 .any(|k| meta.get(*k).is_some());
224 ActionMeta {
225 index: str_field("_index"),
226 id: str_field("_id"),
227 routing: str_field("routing"),
228 concurrency_control,
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build an NDJSON body from `lines`, terminating every line with `\n`.
    fn ndjson(lines: &[&str]) -> Vec<u8> {
        let mut body = String::new();
        for line in lines {
            body.push_str(line);
            body.push('\n');
        }
        body.into_bytes()
    }

    /// Decode an item's source line back into a JSON value.
    fn source_json(item: &BulkItem) -> Value {
        let raw = item.source.as_deref().unwrap();
        serde_json::from_slice(raw).unwrap()
    }

    #[test]
    fn parses_index_create_delete_in_order() {
        let body = ndjson(&[
            r#"{"index":{"_index":"a","_id":"1"}}"#,
            r#"{"msg":"one"}"#,
            r#"{"create":{"_id":"2"}}"#,
            r#"{"msg":"two"}"#,
            r#"{"delete":{"_id":"3"}}"#,
        ]);
        let items = parse_bulk(&body).unwrap();
        assert_eq!(items.len(), 3);
        let (indexed, created, deleted) = (&items[0], &items[1], &items[2]);

        assert_eq!(indexed.action, BulkAction::Index);
        assert_eq!(indexed.index.as_deref(), Some("a"));
        assert_eq!(indexed.id.as_deref(), Some("1"));
        assert_eq!(source_json(indexed)["msg"], json!("one"));

        assert_eq!(created.action, BulkAction::Create);
        assert_eq!(source_json(created)["msg"], json!("two"));

        assert_eq!(deleted.action, BulkAction::Delete);
        assert_eq!(deleted.id.as_deref(), Some("3"));
        assert!(deleted.source.is_none());
    }

    #[test]
    fn optimistic_concurrency_metadata_is_flagged() {
        let body = ndjson(&[
            r#"{"index":{"_id":"1","if_seq_no":3,"if_primary_term":1}}"#,
            r#"{"k":1}"#,
            r#"{"index":{"_id":"2","version":7}}"#,
            r#"{"k":2}"#,
            r#"{"index":{"_id":"3"}}"#,
            r#"{"k":3}"#,
        ]);
        let items = parse_bulk(&body).unwrap();
        assert!(items[0].concurrency_control, "if_seq_no/if_primary_term");
        assert!(items[1].concurrency_control, "version");
        assert!(!items[2].concurrency_control, "plain index");
    }

    #[test]
    fn routing_is_read_from_the_action_line() {
        let body = ndjson(&[r#"{"index":{"_id":"1","routing":"r"}}"#, r#"{"k":1}"#]);
        let items = parse_bulk(&body).unwrap();
        assert_eq!(items[0].routing.as_deref(), Some("r"));
    }

    #[test]
    fn blank_lines_are_skipped() {
        let body = ndjson(&["", r#"{"delete":{"_id":"9"}}"#, ""]);
        let items = parse_bulk(&body).unwrap();
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].action, BulkAction::Delete);
    }

    #[test]
    fn missing_source_line_is_rejected() {
        let body = ndjson(&[r#"{"index":{"_id":"1"}}"#]);
        let err = parse_bulk(&body).unwrap_err();
        assert_eq!(err, RewriteError::MalformedBulkAction);
    }

    #[test]
    fn unknown_verb_and_multikey_action_are_rejected() {
        let unknown = ndjson(&[r#"{"frobnicate":{}}"#]);
        assert_eq!(
            parse_bulk(&unknown).unwrap_err(),
            RewriteError::MalformedBulkAction
        );
        let multikey = ndjson(&[r#"{"index":{},"delete":{}}"#]);
        assert_eq!(
            parse_bulk(&multikey).unwrap_err(),
            RewriteError::MalformedBulkAction
        );
    }

    #[test]
    fn invalid_json_action_is_rejected() {
        let body = ndjson(&["not json"]);
        assert_eq!(parse_bulk(&body).unwrap_err(), RewriteError::InvalidJson);
    }

    #[test]
    fn streaming_parse_op_matches_whole_body_parse() {
        let doc: &[u8] = br#"{"msg":"hi"}"#;
        let parsed = parse_bulk_action(br#"{"index":{"_index":"a","_id":"1"}}"#).unwrap();
        assert_eq!(parsed.action(), BulkAction::Index);
        assert!(parsed.has_source());
        let item = parsed.into_item(Some(doc)).unwrap();
        assert_eq!(item.action, BulkAction::Index);
        assert_eq!(item.index.as_deref(), Some("a"));
        assert_eq!(item.id.as_deref(), Some("1"));
        assert_eq!(item.source.as_deref(), Some(doc));

        let parsed = parse_bulk_action(br#"{"delete":{"_id":"2"}}"#).unwrap();
        assert_eq!(parsed.action(), BulkAction::Delete);
        assert!(!parsed.has_source());
        let item = parsed.into_item(None).unwrap();
        assert_eq!(item.action, BulkAction::Delete);
        assert!(item.source.is_none());
    }

    #[test]
    fn streaming_parse_op_rejects_missing_source_and_bad_json() {
        let action: &[u8] = br#"{"index":{"_id":"1"}}"#;
        let missing = parse_bulk_action(action)
            .unwrap()
            .into_item(None)
            .unwrap_err();
        assert_eq!(missing, RewriteError::MalformedBulkAction);
        let garbage = parse_bulk_action(action)
            .unwrap()
            .into_item(Some(b"not json"))
            .unwrap_err();
        assert_eq!(garbage, RewriteError::InvalidJson);
    }

    #[test]
    fn has_source_and_keyword_match_the_action() {
        assert!(BulkAction::Index.has_source());
        assert!(!BulkAction::Delete.has_source());
        assert_eq!(BulkAction::Create.keyword(), "create");
        assert_eq!(BulkAction::Update.keyword(), "update");
    }
}
374}