Skip to main content

osproxy_rewrite/
mget.rs

1//! Parsing the `_mget` body into structured per-document fetches.
2//!
3//! `_mget` (multi-get) carries either a `docs` array of `{_index,_id,routing}`
4//! objects or a bare `ids` array (with the index taken from the URL). Like
5//! [`parse_bulk`](crate::parse_bulk) this is a pure parse with no tenancy
6//! meaning: the engine resolves each item's partition and demuxes by target
7//! (`docs/04` §5). Held to the same coverage bar as the other transforms.
8
9use serde_json::Value;
10
11use crate::error::RewriteError;
12
/// A single document fetch extracted from an `_mget` body.
///
/// Entries from the `docs` form may name their own `_index` and `routing`;
/// entries from the bare `ids` form carry only an id, leaving the index to be
/// taken from the request URL.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MgetItem {
    /// Index named explicitly by the entry; `None` means "use the URL index".
    pub index: Option<String>,
    /// Logical id of the document to fetch.
    pub id: String,
    /// Routing value named explicitly by the entry, when one was supplied.
    pub routing: Option<String>,
}
24
25/// Parses an `_mget` body into its ordered fetches.
26///
27/// Accepts both shapes OpenSearch supports: `{"docs":[{"_index":…,"_id":…},…]}`
28/// and `{"ids":["1","2"]}` (index defaulted from the URL).
29///
30/// # Errors
31///
32/// Returns [`RewriteError::InvalidJson`] if the body is not valid JSON,
33/// [`RewriteError::NotAnObject`] if it is not an object carrying `docs` or
34/// `ids`, or [`RewriteError::MalformedBulkAction`] if a `docs` entry is not an
35/// object with a string `_id` (or an `ids` entry is not a string).
36///
37/// # Examples
38///
39/// ```
40/// use osproxy_rewrite::parse_mget;
41///
42/// let body = br#"{"docs":[{"_index":"a","_id":"1"},{"_id":"2"}]}"#;
43/// let items = parse_mget(body).unwrap();
44/// assert_eq!(items.len(), 2);
45/// assert_eq!(items[0].index.as_deref(), Some("a"));
46/// assert_eq!(items[1].id, "2");
47/// ```
48pub fn parse_mget(body: &[u8]) -> Result<Vec<MgetItem>, RewriteError> {
49    let value: Value = serde_json::from_slice(body).map_err(|_| RewriteError::InvalidJson)?;
50    let obj = value.as_object().ok_or(RewriteError::NotAnObject)?;
51
52    if let Some(docs) = obj.get("docs").and_then(Value::as_array) {
53        docs.iter().map(parse_doc_entry).collect()
54    } else if let Some(ids) = obj.get("ids").and_then(Value::as_array) {
55        ids.iter().map(parse_id_entry).collect()
56    } else {
57        Err(RewriteError::NotAnObject)
58    }
59}
60
61/// Parses one `docs` entry into an [`MgetItem`].
62fn parse_doc_entry(entry: &Value) -> Result<MgetItem, RewriteError> {
63    let obj = entry.as_object().ok_or(RewriteError::MalformedBulkAction)?;
64    let str_field = |name: &str| obj.get(name).and_then(Value::as_str).map(str::to_owned);
65    let id = str_field("_id").ok_or(RewriteError::MalformedBulkAction)?;
66    Ok(MgetItem {
67        index: str_field("_index"),
68        id,
69        routing: str_field("routing"),
70    })
71}
72
73/// Parses one bare `ids` entry (a string id, index from the URL).
74fn parse_id_entry(entry: &Value) -> Result<MgetItem, RewriteError> {
75    let id = entry
76        .as_str()
77        .ok_or(RewriteError::MalformedBulkAction)?
78        .to_owned();
79    Ok(MgetItem {
80        index: None,
81        id,
82        routing: None,
83    })
84}
85
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building owned strings in expected-value literals.
    fn s(text: &str) -> String {
        text.to_owned()
    }

    /// Asserts that parsing `body` fails with exactly `expected`.
    fn assert_rejected(body: &[u8], expected: RewriteError) {
        assert_eq!(parse_mget(body).unwrap_err(), expected);
    }

    #[test]
    fn parses_docs_form_in_order() {
        let body = br#"{"docs":[
            {"_index":"a","_id":"1","routing":"r"},
            {"_id":"2"}
        ]}"#;
        let expected = vec![
            MgetItem {
                index: Some(s("a")),
                id: s("1"),
                routing: Some(s("r")),
            },
            MgetItem {
                index: None,
                id: s("2"),
                routing: None,
            },
        ];
        assert_eq!(parse_mget(body).unwrap(), expected);
    }

    #[test]
    fn parses_ids_form() {
        let expected: Vec<MgetItem> = ["7", "8"]
            .iter()
            .map(|id| MgetItem {
                index: None,
                id: s(id),
                routing: None,
            })
            .collect();
        assert_eq!(parse_mget(br#"{"ids":["7","8"]}"#).unwrap(), expected);
    }

    #[test]
    fn doc_entry_without_id_is_rejected() {
        assert_rejected(
            br#"{"docs":[{"_index":"a"}]}"#,
            RewriteError::MalformedBulkAction,
        );
    }

    #[test]
    fn non_string_id_entry_is_rejected() {
        assert_rejected(br#"{"ids":[1]}"#, RewriteError::MalformedBulkAction);
    }

    #[test]
    fn body_without_docs_or_ids_is_not_an_object_request() {
        let bodies: [&[u8]; 2] = [br#"{"other":1}"#, br"[1,2]"];
        for body in bodies {
            assert_rejected(body, RewriteError::NotAnObject);
        }
    }

    #[test]
    fn invalid_json_is_rejected() {
        assert_rejected(b"not json", RewriteError::InvalidJson);
    }
}