Skip to main content

osproxy_rewrite/
msearch.rs

1//! Parsing the `_msearch` NDJSON body into structured per-search requests.
2//!
3//! `_msearch` (multi-search) is newline-delimited JSON in *header/body* pairs:
4//! a header line (`{"index":"a"}`, possibly empty) followed by the search body
5//! line (`{"query":{…}}`). This module turns the raw bytes into a
6//! `Vec<MsearchItem>` the engine wraps in the partition filter and demuxes by
7//! target (`docs/04` §4). Like [`parse_bulk`](crate::parse_bulk) it is a pure
8//! parse with no tenancy meaning.
9
10use serde_json::Value;
11
12use crate::error::RewriteError;
13
/// A single search extracted from an `_msearch` body.
///
/// `index` carries the target named by the search's header line, or `None`
/// when the header names none (the URL's default index applies instead);
/// `query` holds the search body line exactly as it arrived.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MsearchItem {
    /// Target index taken from the header line; `None` means "use the URL default".
    pub index: Option<String>,
    /// Unmodified bytes of the search body line, forwarded once wrapped.
    pub query: Vec<u8>,
}
23
24/// Parses an `_msearch` NDJSON body into its ordered searches.
25///
26/// Each search is a header line followed by a body line. The header's `index`
27/// may be a string or the first entry of an array (OpenSearch accepts both);
28/// any other shape leaves the index defaulted to the URL.
29///
30/// # Errors
31///
32/// Returns [`RewriteError::InvalidJson`] if a header or body line is not valid
33/// JSON, or [`RewriteError::MalformedBulkAction`] if a header line has no
34/// following body line.
35///
36/// # Examples
37///
38/// ```
39/// use osproxy_rewrite::parse_msearch;
40///
41/// let body = b"{\"index\":\"a\"}\n{\"query\":{\"match_all\":{}}}\n";
42/// let items = parse_msearch(body).unwrap();
43/// assert_eq!(items.len(), 1);
44/// assert_eq!(items[0].index.as_deref(), Some("a"));
45/// ```
46pub fn parse_msearch(body: &[u8]) -> Result<Vec<MsearchItem>, RewriteError> {
47    let mut items = Vec::new();
48    let mut lines = body
49        .split(|&b| b == b'\n')
50        .filter(|l| !l.iter().all(u8::is_ascii_whitespace));
51    while let Some(header_line) = lines.next() {
52        let header: Value =
53            serde_json::from_slice(header_line).map_err(|_| RewriteError::InvalidJson)?;
54        let body_line = lines.next().ok_or(RewriteError::MalformedBulkAction)?;
55        // Validate the body is JSON, but forward the original bytes verbatim.
56        serde_json::from_slice::<Value>(body_line).map_err(|_| RewriteError::InvalidJson)?;
57        items.push(MsearchItem {
58            index: header_index(&header),
59            query: body_line.to_vec(),
60        });
61    }
62    Ok(items)
63}
64
65/// The `index` named in a header line, accepting a string or an array's first
66/// string entry (a missing or other-shaped value defaults to the URL index).
67fn header_index(header: &Value) -> Option<String> {
68    match header.get("index") {
69        Some(Value::String(s)) => Some(s.clone()),
70        Some(Value::Array(a)) => a.first().and_then(Value::as_str).map(str::to_owned),
71        _ => None,
72    }
73}
74
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;

    /// Joins `lines` into an NDJSON body, terminating every line with `\n`.
    fn ndjson(lines: &[&str]) -> String {
        let mut out = String::new();
        for line in lines {
            out.push_str(line);
            out.push('\n');
        }
        out
    }

    /// Decodes a forwarded query body so assertions can inspect it as JSON.
    fn decode(raw: &[u8]) -> Value {
        serde_json::from_slice(raw).expect("forwarded query must be valid JSON")
    }

    #[test]
    fn parses_header_body_pairs_in_order() {
        let body = ndjson(&[
            r#"{"index":"a"}"#,
            r#"{"query":{"match_all":{}}}"#,
            "{}",
            r#"{"query":{"term":{"k":1}}}"#,
        ]);
        let items = parse_msearch(body.as_bytes()).unwrap();
        assert_eq!(items.len(), 2);

        let (first, second) = (&items[0], &items[1]);
        assert_eq!(first.index.as_deref(), Some("a"));
        assert_eq!(
            decode(&first.query)["query"]["match_all"],
            serde_json::json!({})
        );
        assert_eq!(second.index, None);
        assert_eq!(decode(&second.query)["query"]["term"]["k"], 1);
    }

    #[test]
    fn index_array_takes_the_first_entry() {
        let body = ndjson(&[r#"{"index":["a","b"]}"#, r#"{"query":{"match_all":{}}}"#]);
        let items = parse_msearch(body.as_bytes()).unwrap();
        assert_eq!(items[0].index.as_deref(), Some("a"));
    }

    #[test]
    fn header_without_body_is_rejected() {
        let body = ndjson(&[r#"{"index":"a"}"#]);
        assert_eq!(
            parse_msearch(body.as_bytes()).unwrap_err(),
            RewriteError::MalformedBulkAction
        );
    }

    #[test]
    fn invalid_json_header_or_body_is_rejected() {
        let cases: [&[u8]; 2] = [b"not json\n{}\n", b"{}\nnot json\n"];
        for bad in cases.iter() {
            assert_eq!(
                parse_msearch(*bad).unwrap_err(),
                RewriteError::InvalidJson,
                "input: {:?}",
                String::from_utf8_lossy(bad)
            );
        }
    }

    #[test]
    fn blank_lines_between_pairs_are_skipped() {
        let body = ndjson(&["", "{}", r#"{"query":{"match_all":{}}}"#, ""]);
        let items = parse_msearch(body.as_bytes()).unwrap();
        assert_eq!(items.len(), 1);
    }
}
127        let body = "\n{}\n{\"query\":{\"match_all\":{}}}\n\n";
128        let items = parse_msearch(body.as_bytes()).unwrap();
129        assert_eq!(items.len(), 1);
130    }
131}