osproxy_rewrite/
msearch.rs1use serde_json::Value;
11
12use crate::error::RewriteError;
13
/// One header/body pair taken from a newline-delimited msearch request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MsearchItem {
    /// Index named by the pair's header line, or `None` when the header does
    /// not carry a usable `index` entry.
    pub index: Option<String>,
    /// Bytes of the pair's body line, copied verbatim from the request rather
    /// than re-serialized.
    pub query: Vec<u8>,
}
23
24pub fn parse_msearch(body: &[u8]) -> Result<Vec<MsearchItem>, RewriteError> {
47 let mut items = Vec::new();
48 let mut lines = body
49 .split(|&b| b == b'\n')
50 .filter(|l| !l.iter().all(u8::is_ascii_whitespace));
51 while let Some(header_line) = lines.next() {
52 let header: Value =
53 serde_json::from_slice(header_line).map_err(|_| RewriteError::InvalidJson)?;
54 let body_line = lines.next().ok_or(RewriteError::MalformedBulkAction)?;
55 serde_json::from_slice::<Value>(body_line).map_err(|_| RewriteError::InvalidJson)?;
57 items.push(MsearchItem {
58 index: header_index(&header),
59 query: body_line.to_vec(),
60 });
61 }
62 Ok(items)
63}
64
65fn header_index(header: &Value) -> Option<String> {
68 match header.get("index") {
69 Some(Value::String(s)) => Some(s.clone()),
70 Some(Value::Array(a)) => a.first().and_then(Value::as_str).map(str::to_owned),
71 _ => None,
72 }
73}
74
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;

    /// Parses a fixture that is expected to be accepted.
    fn parse_ok(fixture: &str) -> Vec<MsearchItem> {
        parse_msearch(fixture.as_bytes()).unwrap()
    }

    /// Decodes an item's stored body bytes back into JSON for inspection.
    fn query_json(item: &MsearchItem) -> Value {
        serde_json::from_slice(&item.query).unwrap()
    }

    /// Asserts that parsing `input` fails with exactly `expected`.
    fn assert_rejected(input: &[u8], expected: RewriteError) {
        assert_eq!(parse_msearch(input).unwrap_err(), expected);
    }

    #[test]
    fn parses_header_body_pairs_in_order() {
        let items = parse_ok(concat!(
            "{\"index\":\"a\"}\n",
            "{\"query\":{\"match_all\":{}}}\n",
            "{}\n",
            "{\"query\":{\"term\":{\"k\":1}}}\n",
        ));
        assert_eq!(items.len(), 2);

        // First pair: explicit index with a match_all query.
        assert_eq!(items[0].index.as_deref(), Some("a"));
        assert_eq!(
            query_json(&items[0])["query"]["match_all"],
            serde_json::json!({})
        );

        // Second pair: empty header, so no index, with a term query.
        assert_eq!(items[1].index, None);
        assert_eq!(query_json(&items[1])["query"]["term"]["k"], 1);
    }

    #[test]
    fn index_array_takes_the_first_entry() {
        let items = parse_ok("{\"index\":[\"a\",\"b\"]}\n{\"query\":{\"match_all\":{}}}\n");
        assert_eq!(items[0].index.as_deref(), Some("a"));
    }

    #[test]
    fn header_without_body_is_rejected() {
        assert_rejected(b"{\"index\":\"a\"}\n", RewriteError::MalformedBulkAction);
    }

    #[test]
    fn invalid_json_header_or_body_is_rejected() {
        // Bad header line.
        assert_rejected(b"not json\n{}\n", RewriteError::InvalidJson);
        // Bad body line.
        assert_rejected(b"{}\nnot json\n", RewriteError::InvalidJson);
    }

    #[test]
    fn blank_lines_between_pairs_are_skipped() {
        let items = parse_ok("\n{}\n{\"query\":{\"match_all\":{}}}\n\n");
        assert_eq!(items.len(), 1);
    }
}