use osproxy_core::{ClusterId, CursorSigner};
use osproxy_spi::RequestCtx;
const FORWARDABLE_PARAMS: &[&str] = &["scroll", "keep_alive"];
pub(crate) fn forwardable_query(raw: Option<&str>) -> Option<String> {
let kept: Vec<&str> = raw?
.split('&')
.filter(|pair| {
let key = pair.split('=').next().unwrap_or(pair);
FORWARDABLE_PARAMS.contains(&key)
})
.collect();
if kept.is_empty() {
None
} else {
Some(kept.join("&"))
}
}
pub(crate) fn has_scroll_id(body: &[u8]) -> bool {
const NEEDLE: &[u8] = b"_scroll_id";
body.windows(NEEDLE.len()).any(|w| w == NEEDLE)
}
pub(crate) fn wrap_scroll_id_in_response(
body: Vec<u8>,
signer: &dyn CursorSigner,
cluster: &ClusterId,
) -> Vec<u8> {
let Ok(mut v) = serde_json::from_slice::<serde_json::Value>(&body) else {
return body;
};
let Some(id) = v.get("_scroll_id").and_then(serde_json::Value::as_str) else {
return body;
};
let wrapped = osproxy_core::cursor::wrap(signer, cluster, id);
v["_scroll_id"] = serde_json::Value::String(wrapped);
serde_json::to_vec(&v).unwrap_or(body)
}
pub(crate) fn pit_id_in_body(body: &[u8]) -> Option<String> {
let v: serde_json::Value = serde_json::from_slice(body).ok()?;
v.get("pit")?
.get("id")?
.as_str()
.map(std::borrow::ToOwned::to_owned)
}
pub(crate) fn rewrite_pit_id(body: Vec<u8>, real_id: &str) -> Vec<u8> {
let Ok(mut v) = serde_json::from_slice::<serde_json::Value>(&body) else {
return body;
};
let Some(pit) = v.get_mut("pit").and_then(serde_json::Value::as_object_mut) else {
return body;
};
pit.insert(
"id".to_owned(),
serde_json::Value::String(real_id.to_owned()),
);
serde_json::to_vec(&v).unwrap_or(body)
}
pub(crate) fn wrap_pit_id_in_response(
body: Vec<u8>,
signer: &dyn CursorSigner,
cluster: &ClusterId,
) -> Vec<u8> {
let Ok(mut v) = serde_json::from_slice::<serde_json::Value>(&body) else {
return body;
};
let Some(id) = v.get("pit_id").and_then(serde_json::Value::as_str) else {
return body;
};
let wrapped = osproxy_core::cursor::wrap(signer, cluster, id);
v["pit_id"] = serde_json::Value::String(wrapped);
serde_json::to_vec(&v).unwrap_or(body)
}
pub(crate) fn pit_ids_in_delete_body(body: &[u8]) -> Option<Vec<String>> {
let v: serde_json::Value = serde_json::from_slice(body).ok()?;
let ids: Vec<String> = v
.get("pit_id")?
.as_array()?
.iter()
.filter_map(|x| x.as_str().map(std::borrow::ToOwned::to_owned))
.collect();
(!ids.is_empty()).then_some(ids)
}
pub(crate) struct CursorRequest {
pub(crate) wrapped: String,
pub(crate) upstream_path: &'static str,
pub(crate) id_field: &'static str,
}
pub(crate) fn cursor_request(ctx: &RequestCtx<'_>) -> Option<CursorRequest> {
if let Some(id) = ctx.doc_id() {
return Some(CursorRequest {
wrapped: id.to_owned(),
upstream_path: "/_search/scroll",
id_field: "scroll_id",
});
}
let v: serde_json::Value = serde_json::from_slice(ctx.body()).ok()?;
if let Some(id) = v.get("scroll_id").and_then(serde_json::Value::as_str) {
return Some(CursorRequest {
wrapped: id.to_owned(),
upstream_path: "/_search/scroll",
id_field: "scroll_id",
});
}
None
}
pub(crate) fn rewrite_cursor_body(client_body: &[u8], id_field: &str, real_id: &str) -> Vec<u8> {
let mut v = serde_json::from_slice::<serde_json::Value>(client_body)
.ok()
.filter(serde_json::Value::is_object)
.unwrap_or_else(|| serde_json::json!({}));
v[id_field] = serde_json::Value::String(real_id.to_owned());
serde_json::to_vec(&v).unwrap_or_else(|_| {
serde_json::to_vec(&serde_json::json!({ id_field: real_id }))
.unwrap_or_else(|_| b"{}".to_vec())
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn only_cursor_params_are_forwarded_not_query_affecting_ones() {
assert_eq!(
forwardable_query(Some("scroll=1m")).as_deref(),
Some("scroll=1m")
);
assert_eq!(
forwardable_query(Some("keep_alive=5m")).as_deref(),
Some("keep_alive=5m")
);
assert_eq!(
forwardable_query(Some("q=*&scroll=1m")).as_deref(),
Some("scroll=1m"),
"a query-string search param must never reach the upstream"
);
assert_eq!(forwardable_query(Some("q=*")), None);
assert_eq!(forwardable_query(Some("source={}&analyzer=x")), None);
assert_eq!(forwardable_query(None), None);
assert_eq!(forwardable_query(Some("")), None);
}
#[test]
fn pit_delete_body_yields_the_wrapped_id_array() {
assert_eq!(
pit_ids_in_delete_body(br#"{"pit_id":["a","b"]}"#),
Some(vec!["a".to_owned(), "b".to_owned()])
);
assert_eq!(pit_ids_in_delete_body(br#"{"pit_id":[]}"#), None);
assert_eq!(pit_ids_in_delete_body(br#"{"scroll_id":"x"}"#), None);
assert_eq!(pit_ids_in_delete_body(b"not json"), None);
}
}