1use osproxy_core::json;
11use serde_json::Value;
12
13use crate::error::RewriteError;
14
/// The verb of a single bulk operation, taken from the key of its action line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BulkAction {
    /// The `index` verb.
    Index,
    /// The `create` verb.
    Create,
    /// The `update` verb.
    Update,
    /// The `delete` verb.
    Delete,
}

impl BulkAction {
    /// Reports whether this action is followed by a source line in the bulk body.
    ///
    /// Every action except [`BulkAction::Delete`] carries one.
    #[must_use]
    pub fn has_source(self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Index | Self::Create | Self::Update => true,
            Self::Delete => false,
        }
    }

    /// Returns the lowercase keyword that names this action on an action line.
    #[must_use]
    pub fn keyword(self) -> &'static str {
        match self {
            Self::Delete => "delete",
            Self::Update => "update",
            Self::Create => "create",
            Self::Index => "index",
        }
    }
}
47
48#[derive(Clone, PartialEq, Eq, Debug)]
51pub struct BulkItem {
52 pub action: BulkAction,
54 pub index: Option<String>,
56 pub id: Option<String>,
58 pub routing: Option<String>,
60 pub concurrency_control: bool,
65 pub source: Option<Vec<u8>>,
69}
70
71pub fn parse_bulk(body: &[u8]) -> Result<Vec<BulkItem>, RewriteError> {
93 let mut items = Vec::new();
94 let mut lines = body
95 .split(|&b| b == b'\n')
96 .filter(|l| !l.iter().all(u8::is_ascii_whitespace));
97 while let Some(action_line) = lines.next() {
98 let (action, meta) = parse_action_line(action_line)?;
99 let source = if action.has_source() {
100 let source_line = lines.next().ok_or(RewriteError::MalformedBulkAction)?;
101 json::validate(source_line).map_err(|_| RewriteError::InvalidJson)?;
104 Some(source_line.to_vec())
105 } else {
106 None
107 };
108 items.push(BulkItem {
109 action,
110 index: meta.index,
111 id: meta.id,
112 routing: meta.routing,
113 concurrency_control: meta.concurrency_control,
114 source,
115 });
116 }
117 Ok(items)
118}
119
120pub fn parse_bulk_action(line: &[u8]) -> Result<BulkAction, RewriteError> {
130 parse_action_line(line).map(|(action, _)| action)
131}
132
133pub fn parse_bulk_op(
143 action_line: &[u8],
144 source_line: Option<&[u8]>,
145) -> Result<BulkItem, RewriteError> {
146 let (action, meta) = parse_action_line(action_line)?;
147 let source = if action.has_source() {
148 let line = source_line.ok_or(RewriteError::MalformedBulkAction)?;
149 json::validate(line).map_err(|_| RewriteError::InvalidJson)?;
150 Some(line.to_vec())
151 } else {
152 None
153 };
154 Ok(BulkItem {
155 action,
156 index: meta.index,
157 id: meta.id,
158 routing: meta.routing,
159 concurrency_control: meta.concurrency_control,
160 source,
161 })
162}
163
/// Metadata read from the object nested under the verb of an action line.
struct ActionMeta {
    /// The `_index` value, when present as a JSON string.
    index: Option<String>,
    /// The `_id` value, when present as a JSON string.
    id: Option<String>,
    /// The `routing` value, when present as a JSON string.
    routing: Option<String>,
    /// `true` when any of `if_seq_no`, `if_primary_term`, `version` or `version_type` is present.
    concurrency_control: bool,
}
171
172fn parse_action_line(line: &[u8]) -> Result<(BulkAction, ActionMeta), RewriteError> {
174 let value: Value = serde_json::from_slice(line).map_err(|_| RewriteError::InvalidJson)?;
175 let obj = value.as_object().ok_or(RewriteError::MalformedBulkAction)?;
176 let mut entries = obj.iter();
178 let (verb, meta) = entries.next().ok_or(RewriteError::MalformedBulkAction)?;
179 if entries.next().is_some() {
180 return Err(RewriteError::MalformedBulkAction);
181 }
182 let action = match verb.as_str() {
183 "index" => BulkAction::Index,
184 "create" => BulkAction::Create,
185 "update" => BulkAction::Update,
186 "delete" => BulkAction::Delete,
187 _ => return Err(RewriteError::MalformedBulkAction),
188 };
189 Ok((action, action_meta(meta)))
190}
191
192fn action_meta(meta: &Value) -> ActionMeta {
195 let str_field = |name: &str| meta.get(name).and_then(Value::as_str).map(str::to_owned);
196 let concurrency_control = ["if_seq_no", "if_primary_term", "version", "version_type"]
197 .iter()
198 .any(|k| meta.get(*k).is_some());
199 ActionMeta {
200 index: str_field("_index"),
201 id: str_field("_id"),
202 routing: str_field("routing"),
203 concurrency_control,
204 }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Parses a body the test expects to be accepted.
    fn parse_ok(body: &str) -> Vec<BulkItem> {
        parse_bulk(body.as_bytes()).unwrap()
    }

    /// Parses a body the test expects to be rejected and returns the error.
    fn parse_err(body: &[u8]) -> RewriteError {
        parse_bulk(body).unwrap_err()
    }

    /// Decodes an item's stored source bytes; panics when the item has no source.
    fn source_json(item: &BulkItem) -> Value {
        let raw = item.source.as_ref().unwrap();
        serde_json::from_slice(raw).unwrap()
    }

    #[test]
    fn parses_index_create_delete_in_order() {
        let items = parse_ok(concat!(
            "{\"index\":{\"_index\":\"a\",\"_id\":\"1\"}}\n",
            "{\"msg\":\"one\"}\n",
            "{\"create\":{\"_id\":\"2\"}}\n",
            "{\"msg\":\"two\"}\n",
            "{\"delete\":{\"_id\":\"3\"}}\n",
        ));
        assert_eq!(items.len(), 3);
        let (first, second, third) = (&items[0], &items[1], &items[2]);

        assert_eq!(first.action, BulkAction::Index);
        assert_eq!(first.index.as_deref(), Some("a"));
        assert_eq!(first.id.as_deref(), Some("1"));
        assert_eq!(source_json(first)["msg"], json!("one"));

        assert_eq!(second.action, BulkAction::Create);
        assert_eq!(source_json(second)["msg"], json!("two"));

        assert_eq!(third.action, BulkAction::Delete);
        assert_eq!(third.id.as_deref(), Some("3"));
        assert!(third.source.is_none());
    }

    #[test]
    fn optimistic_concurrency_metadata_is_flagged() {
        let items = parse_ok(concat!(
            "{\"index\":{\"_id\":\"1\",\"if_seq_no\":3,\"if_primary_term\":1}}\n{\"k\":1}\n",
            "{\"index\":{\"_id\":\"2\",\"version\":7}}\n{\"k\":2}\n",
            "{\"index\":{\"_id\":\"3\"}}\n{\"k\":3}\n",
        ));
        assert!(items[0].concurrency_control, "if_seq_no/if_primary_term");
        assert!(items[1].concurrency_control, "version");
        assert!(!items[2].concurrency_control, "plain index");
    }

    #[test]
    fn routing_is_read_from_the_action_line() {
        let items = parse_ok("{\"index\":{\"_id\":\"1\",\"routing\":\"r\"}}\n{\"k\":1}\n");
        assert_eq!(items[0].routing.as_deref(), Some("r"));
    }

    #[test]
    fn blank_lines_are_skipped() {
        let items = parse_ok("\n{\"delete\":{\"_id\":\"9\"}}\n\n");
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].action, BulkAction::Delete);
    }

    #[test]
    fn missing_source_line_is_rejected() {
        // An `index` action with nothing after it has no source line to consume.
        let body = "{\"index\":{\"_id\":\"1\"}}\n";
        assert_eq!(
            parse_err(body.as_bytes()),
            RewriteError::MalformedBulkAction
        );
    }

    #[test]
    fn unknown_verb_and_multikey_action_are_rejected() {
        assert_eq!(
            parse_err(b"{\"frobnicate\":{}}\n"),
            RewriteError::MalformedBulkAction
        );
        assert_eq!(
            parse_err(b"{\"index\":{},\"delete\":{}}\n"),
            RewriteError::MalformedBulkAction
        );
    }

    #[test]
    fn invalid_json_action_is_rejected() {
        assert_eq!(parse_err(b"not json\n"), RewriteError::InvalidJson);
    }

    #[test]
    fn streaming_parse_op_matches_whole_body_parse() {
        let action = br#"{"index":{"_index":"a","_id":"1"}}"#;
        let source = br#"{"msg":"hi"}"#;
        assert_eq!(parse_bulk_action(action).unwrap(), BulkAction::Index);
        let indexed = parse_bulk_op(action, Some(source)).unwrap();
        assert_eq!(indexed.action, BulkAction::Index);
        assert_eq!(indexed.index.as_deref(), Some("a"));
        assert_eq!(indexed.id.as_deref(), Some("1"));
        assert_eq!(indexed.source.as_deref(), Some(&source[..]));

        let del = br#"{"delete":{"_id":"2"}}"#;
        assert_eq!(parse_bulk_action(del).unwrap(), BulkAction::Delete);
        let deleted = parse_bulk_op(del, None).unwrap();
        assert_eq!(deleted.action, BulkAction::Delete);
        assert!(deleted.source.is_none());
    }

    #[test]
    fn streaming_parse_op_rejects_missing_source_and_bad_json() {
        let action = br#"{"index":{"_id":"1"}}"#;

        let missing = parse_bulk_op(action, None).unwrap_err();
        assert_eq!(missing, RewriteError::MalformedBulkAction);

        let invalid = parse_bulk_op(action, Some(b"not json")).unwrap_err();
        assert_eq!(invalid, RewriteError::InvalidJson);
    }

    #[test]
    fn has_source_and_keyword_match_the_action() {
        assert!(BulkAction::Index.has_source());
        assert!(!BulkAction::Delete.has_source());
        assert_eq!(BulkAction::Create.keyword(), "create");
        assert_eq!(BulkAction::Update.keyword(), "update");
    }
}