Skip to main content

osproxy_rewrite/
query.rs

1//! Wrapping a client search body in the mandatory partition filter.
2//!
3//! The shared-index isolation guarantee (`docs/03` §5) is enforced by nesting
4//! the *entire* client query inside a `bool` whose `filter` pins the partition
//! field(s). Because the client query becomes the `must` clause of a bool the
//! proxy constructs, there is no syntactic way for it to escape the sibling
//! `filter`: the filter is not a suggestion the client can override but a
//! structural enclosure. This is the read-path counterpart of the write-path
9//! field injection.
10
11use std::collections::BTreeMap;
12
13use osproxy_core::FieldName;
14use serde_json::value::RawValue;
15use serde_json::{Map, Value};
16
17use crate::error::RewriteError;
18
19/// Wraps the `query` of a client search body so every match is additionally
20/// constrained by `filter` term(s) the client cannot remove.
21///
22/// The client's original query (or an implicit `match_all` when absent) becomes
23/// the `must` clause of a freshly constructed `bool`; the partition `filter`
24/// terms become its `filter` clause. All other top-level keys (`size`, `sort`,
25/// `_source`, `aggs`, …) are preserved untouched.
26///
27/// When `filter` is non-empty (a shared index, where isolation depends on the
28/// filter), the body is also screened for constructs that escape it, a `global`
29/// aggregation or a `suggest` block, and rejected with [`RewriteError::Unfilterable`]
30/// (`docs/03` §5, NFR-S4). With an empty `filter` (a dedicated index/cluster, the
31/// whole target belongs to the partition) nothing is screened.
32///
33/// # Errors
34///
35/// Returns [`RewriteError::InvalidJson`] if `body` is non-empty but not valid
36/// JSON, [`RewriteError::NotAnObject`] if it is not a JSON object, or
37/// [`RewriteError::Unfilterable`] if a partition filter is in force and the body
38/// carries a construct that would bypass it.
39///
40/// # Examples
41///
42/// ```
43/// use osproxy_core::FieldName;
44/// use serde_json::{json, Value};
45/// use osproxy_rewrite::wrap_query;
46///
47/// let wrapped = wrap_query(
48///     br#"{"query":{"match":{"msg":"hi"}}}"#,
49///     &[(FieldName::from("_tenant"), Value::from("acme"))],
50/// )
51/// .unwrap();
52/// let doc: Value = serde_json::from_slice(&wrapped).unwrap();
53/// assert_eq!(doc["query"]["bool"]["filter"][0]["term"]["_tenant"], "acme");
54/// assert_eq!(doc["query"]["bool"]["must"][0]["match"]["msg"], "hi");
55/// ```
56pub fn wrap_query(body: &[u8], filter: &[(FieldName, Value)]) -> Result<Vec<u8>, RewriteError> {
57    // Parse only the top level. Untouched sibling keys (`size`, `sort`, `aggs`, …)
58    // and the client's own query stay as raw byte spans rather than being
59    // materialized into `Value` trees, serde still fully validates the JSON and
60    // proves the body is an object, so the isolation guarantee is unchanged; we
61    // only avoid re-allocating subtrees the proxy does not inspect.
62    let mut top = parse_top(body)?;
63
64    // Isolation depends on the filter only when there is one (a shared index). If
65    // so, refuse any sibling construct that OpenSearch evaluates outside the
66    // query, a `global` aggregation or a `suggest` block, since the wrapping
67    // `bool.filter` cannot constrain it (NFR-S4, `docs/03` §5).
68    if !filter.is_empty() {
69        reject_unfilterable(&top)?;
70    }
71
72    // The client's query becomes the inner `must`; absent means match-all. It is
73    // re-embedded verbatim (its raw bytes), never re-serialized.
74    let client_query = top.remove("query");
75    let query = build_filtered_query(client_query.as_deref(), filter)?;
76    top.insert("query".to_owned(), query);
77
78    // RawValue values serialize as their raw bytes, so the preserved siblings are
79    // copied out verbatim without a second parse.
80    serde_json::to_vec(&top).map_err(|_| RewriteError::InvalidJson)
81}
82
83/// Builds the `{"bool":{"must":[…],"filter":[…]}}` subtree, embedding the client
84/// query (if any) verbatim and the partition `filter` terms, as one [`RawValue`].
85fn build_filtered_query(
86    client_query: Option<&RawValue>,
87    filter: &[(FieldName, Value)],
88) -> Result<Box<RawValue>, RewriteError> {
89    let mut q = Vec::with_capacity(64 + client_query.map_or(0, |q| q.get().len()));
90    q.extend_from_slice(br#"{"bool":{"must":"#);
91    match client_query {
92        // The client query is a single `must` clause, embedded byte-for-byte.
93        Some(raw) => {
94            q.push(b'[');
95            q.extend_from_slice(raw.get().as_bytes());
96            q.push(b']');
97        }
98        None => q.extend_from_slice(b"[]"),
99    }
100    q.extend_from_slice(br#","filter":["#);
101    for (i, (name, value)) in filter.iter().enumerate() {
102        if i > 0 {
103            q.push(b',');
104        }
105        q.extend_from_slice(br#"{"term":"#);
106        // Serialize `{<name>: <value>}` with serde so the field name and value are
107        // correctly quoted/escaped, never hand-rolled.
108        let mut term = Map::with_capacity(1);
109        term.insert(name.as_str().to_owned(), value.clone());
110        serde_json::to_writer(&mut q, &term).map_err(|_| RewriteError::InvalidJson)?;
111        q.push(b'}');
112    }
113    q.extend_from_slice(b"]}}");
114
115    let s = String::from_utf8(q).map_err(|_| RewriteError::InvalidJson)?;
116    RawValue::from_string(s).map_err(|_| RewriteError::InvalidJson)
117}
118
119/// Parses the search body's top-level object, with each value left as a raw byte
120/// span. An empty body is an empty object (a bare `_search` is a match-all). A
121/// valid-but-non-object body is [`RewriteError::NotAnObject`]; malformed JSON is
122/// [`RewriteError::InvalidJson`].
123fn parse_top(body: &[u8]) -> Result<BTreeMap<String, Box<RawValue>>, RewriteError> {
124    if body.iter().all(u8::is_ascii_whitespace) {
125        return Ok(BTreeMap::new());
126    }
127    match serde_json::from_slice::<BTreeMap<String, Box<RawValue>>>(body) {
128        Ok(map) => Ok(map),
129        // The map parse fails both for non-object JSON and for malformed JSON;
130        // re-validate (cold path) as a raw value to tell the two apart so the
131        // caller still gets a precise error.
132        Err(_) => match serde_json::from_slice::<&RawValue>(body) {
133            Ok(_) => Err(RewriteError::NotAnObject),
134            Err(_) => Err(RewriteError::InvalidJson),
135        },
136    }
137}
138
139/// Rejects search bodies whose top level carries a construct that escapes the
140/// partition filter (`docs/03` §5): a `suggest` block, or an `aggs`/`aggregations`
141/// tree containing a `global` aggregation. Only the small request-side agg
142/// *definition* is parsed (never the response), so the no-materialization posture
143/// holds. Fail-closed: a malformed agg subtree is itself rejected.
144fn reject_unfilterable(top: &BTreeMap<String, Box<RawValue>>) -> Result<(), RewriteError> {
145    if top.contains_key("suggest") {
146        return Err(RewriteError::Unfilterable {
147            construct: "suggest",
148        });
149    }
150    for key in ["aggs", "aggregations"] {
151        if let Some(raw) = top.get(key) {
152            let aggs: Value = serde_json::from_slice(raw.get().as_bytes())
153                .map_err(|_| RewriteError::InvalidJson)?;
154            if contains_global_agg(&aggs) {
155                return Err(RewriteError::Unfilterable {
156                    construct: "global aggregation",
157                });
158            }
159        }
160    }
161    Ok(())
162}
163
164/// Whether an aggregations object contains a `global` bucket at any nesting
165/// depth. An aggregation is `{"<name>": {"<type>": …, "aggs": {…}}}`; a `global`
166/// agg is the one whose body has a `"global"` key. Recurses through nested
167/// `aggs`/`aggregations` so a `global` buried under other buckets is still found.
168fn contains_global_agg(aggs: &Value) -> bool {
169    let Some(obj) = aggs.as_object() else {
170        return false;
171    };
172    obj.values().any(|agg| {
173        agg.as_object().is_some_and(|agg| {
174            agg.contains_key("global")
175                || ["aggs", "aggregations"]
176                    .iter()
177                    .filter_map(|k| agg.get(*k))
178                    .any(contains_global_agg)
179        })
180    })
181}
182
#[cfg(test)]
mod tests {
    use super::*;

    fn filter() -> Vec<(FieldName, Value)> {
        vec![(FieldName::from("_tenant"), Value::from("acme"))]
    }

    #[test]
    fn client_query_is_nested_under_must_with_filter_sibling() {
        let wrapped = wrap_query(br#"{"query":{"match":{"msg":"hi"}}}"#, &filter()).unwrap();
        let doc: Value = serde_json::from_slice(&wrapped).unwrap();
        assert_eq!(doc["query"]["bool"]["must"][0]["match"]["msg"], "hi");
        assert_eq!(doc["query"]["bool"]["filter"][0]["term"]["_tenant"], "acme");
    }

    #[test]
    fn absent_query_becomes_filtered_match_all() {
        let wrapped = wrap_query(br#"{"size":5}"#, &filter()).unwrap();
        let doc: Value = serde_json::from_slice(&wrapped).unwrap();
        // No client query => empty `must`, but the filter still pins the tenant.
        assert_eq!(doc["query"]["bool"]["must"].as_array().unwrap().len(), 0);
        assert_eq!(doc["query"]["bool"]["filter"][0]["term"]["_tenant"], "acme");
        // Unrelated top-level keys survive.
        assert_eq!(doc["size"], 5);
    }

    #[test]
    fn empty_body_is_a_filtered_match_all() {
        let wrapped = wrap_query(b"", &filter()).unwrap();
        let doc: Value = serde_json::from_slice(&wrapped).unwrap();
        assert_eq!(doc["query"]["bool"]["filter"][0]["term"]["_tenant"], "acme");
    }

    #[test]
    fn whitespace_only_body_is_a_filtered_match_all() {
        // JSON whitespace around nothing is the same as an empty body.
        let wrapped = wrap_query(b" \r\n\t ", &filter()).unwrap();
        let doc: Value = serde_json::from_slice(&wrapped).unwrap();
        assert_eq!(doc["query"]["bool"]["must"].as_array().unwrap().len(), 0);
        assert_eq!(doc["query"]["bool"]["filter"][0]["term"]["_tenant"], "acme");
    }

    #[test]
    fn multiple_filter_terms_are_all_applied() {
        let wrapped = wrap_query(
            b"{}",
            &[
                (FieldName::from("_tenant"), Value::from("acme")),
                (FieldName::from("_region"), Value::from("eu")),
            ],
        )
        .unwrap();
        let doc: Value = serde_json::from_slice(&wrapped).unwrap();
        let terms = doc["query"]["bool"]["filter"].as_array().unwrap();
        assert_eq!(terms.len(), 2);
    }

    #[test]
    fn a_nested_query_key_is_not_confused_with_the_top_level_one() {
        // Only the *top-level* `query` is lifted into the bool. A `query` key
        // nested inside a sibling subtree must ride along untouched.
        let wrapped = wrap_query(
            br#"{"query":{"match":{"msg":"hi"}},"aggs":{"q":{"terms":{"field":"query"}}}}"#,
            &filter(),
        )
        .unwrap();
        let doc: Value = serde_json::from_slice(&wrapped).unwrap();
        assert_eq!(doc["query"]["bool"]["must"][0]["match"]["msg"], "hi");
        // The nested aggregation (which itself mentions "query") survives verbatim.
        assert_eq!(doc["aggs"]["q"]["terms"]["field"], "query");
    }

    #[test]
    fn a_duplicated_top_level_query_cannot_smuggle_an_unwrapped_clause() {
        // Duplicate top-level keys collapse during parsing (last wins) and the
        // output is re-serialized from that map, so exactly one `query`, the
        // wrapped one, reaches the cluster.
        let body = br#"{"query":{"match_all":{}},"query":{"term":{"k":"v"}}}"#;
        let wrapped = wrap_query(body, &filter()).unwrap();
        let text = std::str::from_utf8(&wrapped).unwrap();
        assert_eq!(text.matches(r#""query""#).count(), 1);
        let doc: Value = serde_json::from_slice(&wrapped).unwrap();
        assert_eq!(doc["query"]["bool"]["must"][0]["term"]["k"], "v");
        assert_eq!(doc["query"]["bool"]["filter"][0]["term"]["_tenant"], "acme");
    }

    #[test]
    fn complex_sibling_subtrees_survive_verbatim() {
        let body = br#"{"size":5,"sort":[{"ts":"desc"},"_score"],"_source":["a","b"],"query":{"term":{"k":"v"}}}"#;
        let wrapped = wrap_query(body, &filter()).unwrap();
        let doc: Value = serde_json::from_slice(&wrapped).unwrap();
        assert_eq!(doc["size"], 5);
        assert_eq!(doc["sort"][0]["ts"], "desc");
        assert_eq!(doc["sort"][1], "_score");
        assert_eq!(doc["_source"][1], "b");
        assert_eq!(doc["query"]["bool"]["must"][0]["term"]["k"], "v");
        assert_eq!(doc["query"]["bool"]["filter"][0]["term"]["_tenant"], "acme");
    }

    #[test]
    fn escaped_and_unicode_content_in_the_client_query_is_preserved() {
        // Embedding the query verbatim must not corrupt escapes or non-ASCII.
        let body = "{\"query\":{\"match\":{\"msg\":\"a\\\"b\\\\c\\té \u{4e2d}\"}}}";
        let wrapped = wrap_query(body.as_bytes(), &filter()).unwrap();
        let doc: Value = serde_json::from_slice(&wrapped).unwrap();
        assert_eq!(
            doc["query"]["bool"]["must"][0]["match"]["msg"],
            "a\"b\\c\t\u{e9} \u{4e2d}"
        );
    }

    #[test]
    fn a_non_string_filter_value_is_embedded_correctly() {
        let wrapped = wrap_query(
            br#"{"query":{"match_all":{}}}"#,
            &[
                (FieldName::from("_active"), Value::from(true)),
                (FieldName::from("_shard"), Value::from(7)),
            ],
        )
        .unwrap();
        let doc: Value = serde_json::from_slice(&wrapped).unwrap();
        assert_eq!(doc["query"]["bool"]["filter"][0]["term"]["_active"], true);
        assert_eq!(doc["query"]["bool"]["filter"][1]["term"]["_shard"], 7);
    }

    #[test]
    fn a_global_aggregation_is_rejected_under_a_partition_filter() {
        // `global` ignores the query, so under a shared-index filter it would read
        // across partitions (NFR-S4). Reject it, even nested under other aggs.
        let body = br#"{"size":0,"aggs":{"outer":{"terms":{"field":"k"},"aggs":{"leak":{"global":{},"aggs":{"hits":{"top_hits":{"size":50}}}}}}}}"#;
        assert_eq!(
            wrap_query(body, &filter()).unwrap_err(),
            RewriteError::Unfilterable {
                construct: "global aggregation"
            }
        );
        // The `aggregations` spelling is caught too.
        let body = br#"{"aggregations":{"g":{"global":{}}}}"#;
        assert!(matches!(
            wrap_query(body, &filter()).unwrap_err(),
            RewriteError::Unfilterable { .. }
        ));
    }

    #[test]
    fn a_suggest_block_is_rejected_under_a_partition_filter() {
        let body = br#"{"suggest":{"s":{"text":"x","term":{"field":"msg"}}}}"#;
        assert_eq!(
            wrap_query(body, &filter()).unwrap_err(),
            RewriteError::Unfilterable {
                construct: "suggest"
            }
        );
    }

    #[test]
    fn escaped_key_spellings_do_not_bypass_the_screen() {
        // JSON escapes in keys are decoded before the check, so these spellings
        // are recognized exactly like their plain forms.
        let body = br#"{"sugg\u0065st":{"s":{"text":"x"}}}"#;
        assert_eq!(
            wrap_query(body, &filter()).unwrap_err(),
            RewriteError::Unfilterable {
                construct: "suggest"
            }
        );
        let body = br#"{"aggs":{"g":{"gl\u006fbal":{}}}}"#;
        assert_eq!(
            wrap_query(body, &filter()).unwrap_err(),
            RewriteError::Unfilterable {
                construct: "global aggregation"
            }
        );
    }

    #[test]
    fn ordinary_query_scoped_aggregations_are_allowed() {
        // A normal aggregation respects the wrapping `bool.filter`, so it stays.
        let body = br#"{"aggs":{"by_k":{"terms":{"field":"k"}}}}"#;
        let wrapped = wrap_query(body, &filter()).unwrap();
        let doc: Value = serde_json::from_slice(&wrapped).unwrap();
        assert_eq!(doc["aggs"]["by_k"]["terms"]["field"], "k");
        assert_eq!(doc["query"]["bool"]["filter"][0]["term"]["_tenant"], "acme");
    }

    #[test]
    fn unfilterable_constructs_are_allowed_without_a_partition_filter() {
        // A dedicated index/cluster has no filter (the whole target is the
        // partition's), so a `global` agg or `suggest` is harmless and passes.
        let body = br#"{"aggs":{"g":{"global":{}}},"suggest":{"s":{"text":"x"}}}"#;
        let wrapped = wrap_query(body, &[]).unwrap();
        let doc: Value = serde_json::from_slice(&wrapped).unwrap();
        assert_eq!(doc["aggs"]["g"]["global"], serde_json::json!({}));
    }

    #[test]
    fn non_object_body_is_rejected() {
        assert_eq!(
            wrap_query(b"[1,2,3]", &filter()).unwrap_err(),
            RewriteError::NotAnObject
        );
        assert_eq!(
            wrap_query(b"not json", &filter()).unwrap_err(),
            RewriteError::InvalidJson
        );
    }
}