use osproxy_core::json;
use serde_json::Value;
use crate::error::RewriteError;
/// The operation requested by one action/metadata line of a bulk body.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BulkAction {
    /// `index`: followed by a source line.
    Index,
    /// `create`: followed by a source line.
    Create,
    /// `update`: followed by a source line.
    Update,
    /// `delete`: stands alone, with no source line.
    Delete,
}

impl BulkAction {
    /// Reports whether a source line follows this action in the bulk body.
    ///
    /// Every action except [`BulkAction::Delete`] carries one.
    #[must_use]
    pub fn has_source(self) -> bool {
        match self {
            BulkAction::Index | BulkAction::Create | BulkAction::Update => true,
            BulkAction::Delete => false,
        }
    }

    /// Returns the JSON key that introduces this action on its action line
    /// (for example `"index"` in `{"index":{...}}`).
    #[must_use]
    pub fn keyword(self) -> &'static str {
        match self {
            BulkAction::Index => "index",
            BulkAction::Create => "create",
            BulkAction::Update => "update",
            BulkAction::Delete => "delete",
        }
    }
}
/// One fully parsed bulk operation: its action, the metadata read from the
/// action line, and (when the action has one) the raw source line.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BulkItem {
    /// Which operation the action line requested.
    pub action: BulkAction,
    /// The `_index` metadata value, when present as a JSON string.
    pub index: Option<String>,
    /// The `_id` metadata value, when present as a JSON string.
    pub id: Option<String>,
    /// The `routing` metadata value, when present as a JSON string.
    pub routing: Option<String>,
    /// `true` when the action line carried any of `if_seq_no`,
    /// `if_primary_term`, `version` or `version_type`.
    pub concurrency_control: bool,
    /// Raw bytes of the source line that followed the action line, without the
    /// line-terminating `\n`. `None` for actions without a source (`delete`).
    pub source: Option<Vec<u8>>,
}
pub fn parse_bulk(body: &[u8]) -> Result<Vec<BulkItem>, RewriteError> {
let mut items = Vec::new();
let mut lines = body
.split(|&b| b == b'\n')
.filter(|l| !l.iter().all(u8::is_ascii_whitespace));
while let Some(action_line) = lines.next() {
let (action, meta) = parse_action_line(action_line)?;
let source = if action.has_source() {
let source_line = lines.next().ok_or(RewriteError::MalformedBulkAction)?;
json::validate(source_line).map_err(|_| RewriteError::InvalidJson)?;
Some(source_line.to_vec())
} else {
None
};
items.push(BulkItem {
action,
index: meta.index,
id: meta.id,
routing: meta.routing,
concurrency_control: meta.concurrency_control,
source,
});
}
Ok(items)
}
#[derive(Debug)]
pub struct ParsedAction {
action: BulkAction,
meta: ActionMeta,
}
impl ParsedAction {
#[must_use]
pub fn action(&self) -> BulkAction {
self.action
}
#[must_use]
pub fn has_source(&self) -> bool {
self.action.has_source()
}
pub fn into_item(self, source_line: Option<&[u8]>) -> Result<BulkItem, RewriteError> {
let source = if self.action.has_source() {
let line = source_line.ok_or(RewriteError::MalformedBulkAction)?;
json::validate(line).map_err(|_| RewriteError::InvalidJson)?;
Some(line.to_vec())
} else {
None
};
Ok(BulkItem {
action: self.action,
index: self.meta.index,
id: self.meta.id,
routing: self.meta.routing,
concurrency_control: self.meta.concurrency_control,
source,
})
}
}
/// Parses a single action/metadata line, for callers that read the bulk body
/// line by line.
///
/// Pair the result with its source line (if [`ParsedAction::has_source`]) via
/// [`ParsedAction::into_item`].
///
/// # Errors
///
/// - [`RewriteError::InvalidJson`] if the line is not valid JSON.
/// - [`RewriteError::MalformedBulkAction`] if it is not a single-key object
///   naming a known verb.
pub fn parse_bulk_action(line: &[u8]) -> Result<ParsedAction, RewriteError> {
    parse_action_line(line).map(|(action, meta)| ParsedAction { action, meta })
}
/// Metadata extracted from the object that follows an action verb.
#[derive(Debug)]
struct ActionMeta {
    // `_index`, if given as a JSON string.
    index: Option<String>,
    // `_id`, if given as a JSON string.
    id: Option<String>,
    // `routing`, if given as a JSON string.
    routing: Option<String>,
    // Set when any concurrency-control key is present on the action line.
    concurrency_control: bool,
}
/// Parses one action/metadata line such as `{"index":{"_index":"a","_id":"1"}}`.
///
/// The line must be a JSON object with exactly one key. That key must be a
/// known verb, and its value must itself be a JSON object holding the metadata.
///
/// # Errors
///
/// - [`RewriteError::InvalidJson`] if the line is not valid JSON.
/// - [`RewriteError::MalformedBulkAction`] if the line is not an object, does
///   not have exactly one key, names an unknown verb, or its metadata value is
///   not an object.
fn parse_action_line(line: &[u8]) -> Result<(BulkAction, ActionMeta), RewriteError> {
    let value: Value = serde_json::from_slice(line).map_err(|_| RewriteError::InvalidJson)?;
    let obj = value.as_object().ok_or(RewriteError::MalformedBulkAction)?;
    let mut entries = obj.iter();
    let (verb, meta) = entries.next().ok_or(RewriteError::MalformedBulkAction)?;
    if entries.next().is_some() {
        return Err(RewriteError::MalformedBulkAction);
    }
    let action = match verb.as_str() {
        "index" => BulkAction::Index,
        "create" => BulkAction::Create,
        "update" => BulkAction::Update,
        "delete" => BulkAction::Delete,
        _ => return Err(RewriteError::MalformedBulkAction),
    };
    // `Value::get` returns `None` on non-objects. Without this check, lines like
    // `{"index":5}` or `{"delete":null}` would be accepted with empty metadata,
    // even though the cluster rejects them.
    if !meta.is_object() {
        return Err(RewriteError::MalformedBulkAction);
    }
    Ok((action, action_meta(meta)))
}
/// Extracts the fields the proxy tracks from an action line's metadata value.
///
/// String fields are only picked up when the JSON value is a string. Any other
/// JSON type yields `None`.
fn action_meta(meta: &Value) -> ActionMeta {
    // Presence of any of these keys marks the item as using concurrency control.
    const CONCURRENCY_KEYS: [&str; 4] = ["if_seq_no", "if_primary_term", "version", "version_type"];

    let owned_str = |key: &str| -> Option<String> {
        let field = meta.get(key)?;
        field.as_str().map(String::from)
    };

    ActionMeta {
        index: owned_str("_index"),
        id: owned_str("_id"),
        routing: owned_str("routing"),
        concurrency_control: CONCURRENCY_KEYS.iter().any(|&key| meta.get(key).is_some()),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Decodes an item's raw source line so tests can inspect its fields.
    fn source_json(item: &BulkItem) -> Value {
        serde_json::from_slice(item.source.as_ref().unwrap()).unwrap()
    }

    #[test]
    fn parses_index_create_delete_in_order() {
        let body = concat!(
            "{\"index\":{\"_index\":\"a\",\"_id\":\"1\"}}\n",
            "{\"msg\":\"one\"}\n",
            "{\"create\":{\"_id\":\"2\"}}\n",
            "{\"msg\":\"two\"}\n",
            "{\"delete\":{\"_id\":\"3\"}}\n",
        );
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert_eq!(items.len(), 3);
        assert_eq!(items[0].action, BulkAction::Index);
        assert_eq!(items[0].index.as_deref(), Some("a"));
        assert_eq!(items[0].id.as_deref(), Some("1"));
        assert_eq!(source_json(&items[0])["msg"], json!("one"));
        assert_eq!(items[1].action, BulkAction::Create);
        assert_eq!(source_json(&items[1])["msg"], json!("two"));
        assert_eq!(items[2].action, BulkAction::Delete);
        assert_eq!(items[2].id.as_deref(), Some("3"));
        assert!(items[2].source.is_none());
    }

    #[test]
    fn update_action_carries_its_source_line() {
        let body = "{\"update\":{\"_index\":\"a\",\"_id\":\"1\"}}\n{\"doc\":{\"k\":1}}\n";
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].action, BulkAction::Update);
        assert_eq!(items[0].index.as_deref(), Some("a"));
        assert_eq!(source_json(&items[0])["doc"]["k"], json!(1));
    }

    #[test]
    fn optimistic_concurrency_metadata_is_flagged() {
        let body = concat!(
            "{\"index\":{\"_id\":\"1\",\"if_seq_no\":3,\"if_primary_term\":1}}\n{\"k\":1}\n",
            "{\"index\":{\"_id\":\"2\",\"version\":7}}\n{\"k\":2}\n",
            "{\"index\":{\"_id\":\"3\"}}\n{\"k\":3}\n",
            "{\"index\":{\"_id\":\"4\",\"version_type\":\"external\"}}\n{\"k\":4}\n",
        );
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert!(items[0].concurrency_control, "if_seq_no/if_primary_term");
        assert!(items[1].concurrency_control, "version");
        assert!(!items[2].concurrency_control, "plain index");
        assert!(items[3].concurrency_control, "version_type");
    }

    #[test]
    fn routing_is_read_from_the_action_line() {
        let body = "{\"index\":{\"_id\":\"1\",\"routing\":\"r\"}}\n{\"k\":1}\n";
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert_eq!(items[0].routing.as_deref(), Some("r"));
    }

    #[test]
    fn blank_lines_are_skipped() {
        let body = "\n{\"delete\":{\"_id\":\"9\"}}\n\n";
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].action, BulkAction::Delete);
    }

    #[test]
    fn crlf_terminated_action_lines_are_accepted() {
        let body = "{\"delete\":{\"_id\":\"1\"}}\r\n{\"delete\":{\"_id\":\"2\"}}\r\n";
        let items = parse_bulk(body.as_bytes()).unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].id.as_deref(), Some("1"));
        assert_eq!(items[1].id.as_deref(), Some("2"));
    }

    #[test]
    fn missing_source_line_is_rejected() {
        let body = "{\"index\":{\"_id\":\"1\"}}\n";
        assert_eq!(
            parse_bulk(body.as_bytes()).unwrap_err(),
            RewriteError::MalformedBulkAction
        );
    }

    #[test]
    fn unknown_verb_and_multikey_action_are_rejected() {
        assert_eq!(
            parse_bulk(b"{\"frobnicate\":{}}\n").unwrap_err(),
            RewriteError::MalformedBulkAction
        );
        assert_eq!(
            parse_bulk(b"{\"index\":{},\"delete\":{}}\n").unwrap_err(),
            RewriteError::MalformedBulkAction
        );
    }

    #[test]
    fn invalid_json_action_is_rejected() {
        assert_eq!(
            parse_bulk(b"not json\n").unwrap_err(),
            RewriteError::InvalidJson
        );
    }

    #[test]
    fn streaming_parse_op_matches_whole_body_parse() {
        let action = br#"{"index":{"_index":"a","_id":"1"}}"#;
        let source = br#"{"msg":"hi"}"#;
        let parsed = parse_bulk_action(action).unwrap();
        assert_eq!(parsed.action(), BulkAction::Index);
        assert!(parsed.has_source());
        let op = parsed.into_item(Some(source)).unwrap();
        assert_eq!(op.action, BulkAction::Index);
        assert_eq!(op.index.as_deref(), Some("a"));
        assert_eq!(op.id.as_deref(), Some("1"));
        assert_eq!(op.source.as_deref(), Some(&source[..]));

        let del = br#"{"delete":{"_id":"2"}}"#;
        let parsed = parse_bulk_action(del).unwrap();
        assert_eq!(parsed.action(), BulkAction::Delete);
        assert!(!parsed.has_source());
        let op = parsed.into_item(None).unwrap();
        assert_eq!(op.action, BulkAction::Delete);
        assert!(op.source.is_none());
    }

    #[test]
    fn streaming_parse_op_rejects_missing_source_and_bad_json() {
        let action = br#"{"index":{"_id":"1"}}"#;
        assert_eq!(
            parse_bulk_action(action)
                .unwrap()
                .into_item(None)
                .unwrap_err(),
            RewriteError::MalformedBulkAction
        );
        assert_eq!(
            parse_bulk_action(action)
                .unwrap()
                .into_item(Some(b"not json"))
                .unwrap_err(),
            RewriteError::InvalidJson
        );
    }

    #[test]
    fn has_source_and_keyword_match_the_action() {
        assert!(BulkAction::Index.has_source());
        assert!(!BulkAction::Delete.has_source());
        assert_eq!(BulkAction::Create.keyword(), "create");
        assert_eq!(BulkAction::Update.keyword(), "update");
    }

    #[test]
    fn keyword_round_trips_through_the_parser() {
        for action in [
            BulkAction::Index,
            BulkAction::Create,
            BulkAction::Update,
            BulkAction::Delete,
        ] {
            let line = format!("{{\"{}\":{{}}}}", action.keyword());
            assert_eq!(parse_bulk_action(line.as_bytes()).unwrap().action(), action);
        }
    }
}