use serde_json::Value;
use crate::error::RewriteError;
/// One search request taken from an msearch body: the index named by its
/// header line (if any) together with the raw bytes of its query line.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MsearchItem {
    /// Index named by the header line, or `None` when the header names none.
    pub index: Option<String>,
    /// Bytes of the query (body) line, kept exactly as they appeared in the input.
    pub query: Vec<u8>,
}
/// Splits an msearch request body into its header/query pairs.
///
/// The input is cut on `\n`; lines made up solely of ASCII whitespace
/// (including empty ones) are dropped. The remaining lines are consumed two
/// at a time: a header line followed by a query line. Both must be valid
/// JSON. The query line is stored as its original bytes rather than being
/// re-serialized.
///
/// # Errors
///
/// * [`RewriteError::InvalidJson`] when a header or query line does not
///   parse as JSON.
/// * [`RewriteError::MalformedBulkAction`] when a header line has no query
///   line after it.
pub fn parse_msearch(body: &[u8]) -> Result<Vec<MsearchItem>, RewriteError> {
    // A line is kept as soon as it holds one non-whitespace byte.
    let mut non_blank = body
        .split(|byte| *byte == b'\n')
        .filter(|line| line.iter().any(|byte| !byte.is_ascii_whitespace()));

    let mut parsed = Vec::new();
    loop {
        // Running out of lines at a pair boundary is the normal way to finish.
        let raw_header = match non_blank.next() {
            Some(line) => line,
            None => break,
        };
        let header = match serde_json::from_slice::<Value>(raw_header) {
            Ok(value) => value,
            Err(_) => return Err(RewriteError::InvalidJson),
        };

        // Every header must be followed by its query line.
        let raw_query = match non_blank.next() {
            Some(line) => line,
            None => return Err(RewriteError::MalformedBulkAction),
        };
        // The query is parsed only to validate it; the parsed value is discarded.
        if serde_json::from_slice::<Value>(raw_query).is_err() {
            return Err(RewriteError::InvalidJson);
        }

        parsed.push(MsearchItem {
            index: header_index(&header),
            query: raw_query.to_vec(),
        });
    }
    Ok(parsed)
}
/// Reads the `"index"` entry of a parsed header line.
///
/// A string value is returned as-is. For an array, the first element is
/// returned if it is a string. A missing key, an empty array, a non-string
/// first element, or any other value type yields `None`.
fn header_index(header: &Value) -> Option<String> {
    let name = match header.get("index")? {
        Value::String(single) => single.as_str(),
        // Only the first array entry is considered; later entries are ignored.
        Value::Array(entries) => entries.first()?.as_str()?,
        _ => return None,
    };
    Some(name.to_owned())
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;

    /// Parses `body`, panicking if the parser reports an error.
    fn parse_ok(body: &str) -> Vec<MsearchItem> {
        parse_msearch(body.as_bytes()).unwrap()
    }

    /// Decodes the stored query bytes of `item` back into JSON.
    fn query_json(item: &MsearchItem) -> Value {
        serde_json::from_slice(&item.query).unwrap()
    }

    #[test]
    fn parses_header_body_pairs_in_order() {
        let items = parse_ok(concat!(
            "{\"index\":\"a\"}\n",
            "{\"query\":{\"match_all\":{}}}\n",
            "{}\n",
            "{\"query\":{\"term\":{\"k\":1}}}\n",
        ));
        assert_eq!(items.len(), 2);

        let (first, second) = (&items[0], &items[1]);

        assert_eq!(first.index.as_deref(), Some("a"));
        let first_query = query_json(first);
        assert_eq!(first_query["query"]["match_all"], serde_json::json!({}));

        assert_eq!(second.index, None);
        let second_query = query_json(second);
        assert_eq!(second_query["query"]["term"]["k"], 1);
    }

    #[test]
    fn index_array_takes_the_first_entry() {
        let items = parse_ok("{\"index\":[\"a\",\"b\"]}\n{\"query\":{\"match_all\":{}}}\n");
        assert_eq!(items[0].index.as_deref(), Some("a"));
    }

    #[test]
    fn header_without_body_is_rejected() {
        let outcome = parse_msearch(b"{\"index\":\"a\"}\n");
        assert_eq!(outcome, Err(RewriteError::MalformedBulkAction));
    }

    #[test]
    fn invalid_json_header_or_body_is_rejected() {
        // First case breaks the header line, second breaks the query line.
        let cases = ["not json\n{}\n", "{}\nnot json\n"];
        for body in cases.iter() {
            assert_eq!(
                parse_msearch(body.as_bytes()),
                Err(RewriteError::InvalidJson),
                "input: {:?}",
                body
            );
        }
    }

    #[test]
    fn blank_lines_between_pairs_are_skipped() {
        let items = parse_ok("\n{}\n{\"query\":{\"match_all\":{}}}\n\n");
        assert_eq!(items.len(), 1);
    }
}