use std::collections::BTreeMap;
use osproxy_core::FieldName;
use osproxy_rewrite::{map_logical_to_physical, map_physical_to_logical, strip_fields, wrap_query};
use osproxy_sink::{DocOp, ReadOp, SearchOp, WriteOp};
use osproxy_spi::{BodyTransform, DocIdRule, InjectedValue};
use osproxy_tenancy::Resolved;
use serde_json::value::RawValue;
use serde_json::Value;
use crate::error::RequestError;
pub(crate) struct ReadShape {
pub inject_names: Vec<FieldName>,
pub id_rule: Option<DocIdRule>,
}
pub(crate) fn build_read_op(
resolved: &Resolved,
logical_id: &str,
) -> Result<(ReadOp, ReadShape), RequestError> {
let shape = read_shape(&resolved.decision.body_transform);
let (physical_id, routing) = physical_id_and_routing(resolved, logical_id, &shape)?;
let op = ReadOp::new(resolved.decision.target.clone(), physical_id, routing)
.with_protocol(resolved.decision.upstream_protocol);
Ok((op, shape))
}
pub(crate) fn build_delete_op(
resolved: &Resolved,
logical_id: &str,
) -> Result<WriteOp, RequestError> {
let shape = read_shape(&resolved.decision.body_transform);
let (physical_id, routing) = physical_id_and_routing(resolved, logical_id, &shape)?;
Ok(WriteOp::new(
resolved.decision.target.clone(),
DocOp::Delete {
id: physical_id,
routing,
},
resolved.decision.epoch,
)
.with_protocol(resolved.decision.upstream_protocol))
}
pub(crate) fn build_delete_op_physical(resolved: &Resolved, physical_id: String) -> WriteOp {
let shape = read_shape(&resolved.decision.body_transform);
let routing = shape
.id_rule
.as_ref()
.filter(|r| r.set_routing)
.map(|_| resolved.partition.as_str().to_owned());
WriteOp::new(
resolved.decision.target.clone(),
DocOp::Delete {
id: physical_id,
routing,
},
resolved.decision.epoch,
)
.with_protocol(resolved.decision.upstream_protocol)
}
fn physical_id_and_routing(
resolved: &Resolved,
logical_id: &str,
shape: &ReadShape,
) -> Result<(String, Option<String>), RequestError> {
let partition = resolved.partition.as_str();
let physical_id = match &shape.id_rule {
Some(rule) => map_logical_to_physical(rule.template.as_str(), partition, logical_id)?,
None => logical_id.to_owned(),
};
let routing = shape
.id_rule
.as_ref()
.filter(|r| r.set_routing)
.map(|_| partition.to_owned());
Ok((physical_id, routing))
}
pub(crate) fn logical_write_id(resolved: &Resolved, physical_id: &str) -> String {
match read_shape(&resolved.decision.body_transform).id_rule {
Some(rule) => map_physical_to_logical(
rule.template.as_str(),
resolved.partition.as_str(),
physical_id,
)
.ok()
.flatten()
.unwrap_or_else(|| physical_id.to_owned()),
None => physical_id.to_owned(),
}
}
pub(crate) fn shape_found(
upstream_body: &[u8],
logical_index: &str,
logical_id: &str,
inject_names: &[FieldName],
) -> Result<Vec<u8>, RequestError> {
let mut doc: Value = serde_json::from_slice(upstream_body)
.map_err(|_| osproxy_rewrite::RewriteError::InvalidJson)?;
if let Some(obj) = doc.as_object_mut() {
obj.insert("_index".to_owned(), Value::String(logical_index.to_owned()));
obj.insert("_id".to_owned(), Value::String(logical_id.to_owned()));
obj.remove("_routing");
if let Some(source) = obj.get_mut("_source") {
strip_fields(source, inject_names);
}
}
serde_json::to_vec(&doc).map_err(|_| RequestError::Internal {
reason: "serializing read response",
})
}
#[must_use]
pub(crate) fn shape_delete(logical_index: &str, logical_id: &str, status: u16) -> Vec<u8> {
let result = ["deleted", "not_found"][usize::from(status == 404)];
let doc = serde_json::json!({
"_index": logical_index,
"_id": logical_id,
"result": result,
});
serde_json::to_vec(&doc).unwrap_or_else(|_| b"{}".to_vec())
}
#[must_use]
pub(crate) fn not_found_body(logical_index: &str, logical_id: &str) -> Vec<u8> {
let doc = serde_json::json!({
"_index": logical_index,
"_id": logical_id,
"found": false,
});
serde_json::to_vec(&doc).unwrap_or_else(|_| b"{\"found\":false}".to_vec())
}
pub(crate) fn build_search_op(
resolved: &Resolved,
body: &[u8],
) -> Result<(SearchOp, ReadShape), RequestError> {
let partition = resolved.partition.as_str();
let shape = read_shape(&resolved.decision.body_transform);
let filter = filter_terms(&resolved.decision.body_transform, partition);
let wrapped = wrap_query(body, &filter)?;
let op = SearchOp::new(resolved.decision.target.clone(), wrapped)
.with_protocol(resolved.decision.upstream_protocol);
Ok((op, shape))
}
pub(crate) fn shape_hits(
upstream_body: &[u8],
logical_index: &str,
partition: &str,
shape: &ReadShape,
) -> Result<Vec<u8>, RequestError> {
let internal = || RequestError::Internal {
reason: "serializing search response",
};
let mut top: BTreeMap<String, Box<RawValue>> = match serde_json::from_slice(upstream_body) {
Ok(top) => top,
Err(_) => {
return if serde_json::from_slice::<&RawValue>(upstream_body).is_ok() {
Ok(upstream_body.to_vec())
} else {
Err(osproxy_rewrite::RewriteError::InvalidJson.into())
};
}
};
if let Some(hits_raw) = top.remove("hits") {
let mut hits: Value = serde_json::from_slice(hits_raw.get().as_bytes())
.map_err(|_| osproxy_rewrite::RewriteError::InvalidJson)?;
if let Some(arr) = hits.get_mut("hits").and_then(Value::as_array_mut) {
for hit in arr.iter_mut() {
shape_hit(hit, logical_index, partition, shape);
}
}
top.insert(
"hits".to_owned(),
serde_json::value::to_raw_value(&hits).map_err(|_| internal())?,
);
}
serde_json::to_vec(&top).map_err(|_| internal())
}
pub(crate) fn shape_hit(hit: &mut Value, logical_index: &str, partition: &str, shape: &ReadShape) {
let Some(obj) = hit.as_object_mut() else {
return;
};
obj.insert("_index".to_owned(), Value::String(logical_index.to_owned()));
obj.remove("_routing");
if let Some(rule) = &shape.id_rule {
if let Some(Value::String(physical)) = obj.get("_id") {
if let Ok(Some(logical)) =
map_physical_to_logical(rule.template.as_str(), partition, physical)
{
obj.insert("_id".to_owned(), Value::String(logical));
}
}
}
if let Some(source) = obj.get_mut("_source") {
strip_fields(source, &shape.inject_names);
}
}
fn filter_terms(transform: &BodyTransform, partition: &str) -> Vec<(FieldName, Value)> {
let fields = match transform {
BodyTransform::Inject(fields) | BodyTransform::Both { inject: fields, .. } => {
fields.as_slice()
}
BodyTransform::None | BodyTransform::ConstructId(_) => &[],
};
fields
.iter()
.filter(|field| matches!(field.value, InjectedValue::PartitionId))
.map(|field| (field.name.clone(), Value::String(partition.to_owned())))
.collect()
}
fn read_shape(transform: &BodyTransform) -> ReadShape {
match transform {
BodyTransform::None => ReadShape {
inject_names: Vec::new(),
id_rule: None,
},
BodyTransform::Inject(fields) => ReadShape {
inject_names: field_names(fields),
id_rule: None,
},
BodyTransform::ConstructId(rule) => ReadShape {
inject_names: Vec::new(),
id_rule: Some(rule.clone()),
},
BodyTransform::Both { inject, id } => ReadShape {
inject_names: field_names(inject),
id_rule: Some(id.clone()),
},
}
}
fn field_names(fields: &[osproxy_spi::InjectedField]) -> Vec<FieldName> {
fields.iter().map(|f| f.name.clone()).collect()
}
#[cfg(test)]
#[path = "read_tests.rs"]
mod tests;