use std::collections::HashMap;
use osproxy_core::PartitionId;
use osproxy_rewrite::{
construct_id_bytes, inject_fields_bytes, inject_update, map_logical_to_physical,
map_physical_to_logical, BulkAction, BulkItem,
};
use osproxy_sink::{DocOp, WriteOp};
use osproxy_spi::{BodyDoc, BodyTransform, InjectedField, InjectedValue, RequestCtx};
use osproxy_tenancy::{Resolved, Router};
use serde_json::Value;
pub(crate) struct Prepared {
pub(crate) op: WriteOp,
pub(crate) partition: PartitionId,
pub(crate) action: &'static str,
pub(crate) logical_index: String,
pub(crate) logical_id: String,
}
pub(crate) struct ItemFailure {
action: &'static str,
logical_index: String,
logical_id: Option<String>,
status: u16,
error: &'static str,
}
impl ItemFailure {
pub(crate) fn into_line(self) -> Value {
serde_json::json!({ self.action: {
"_index": self.logical_index,
"_id": self.logical_id,
"status": self.status,
"error": { "type": self.error },
}})
}
}
pub(crate) async fn prepare<R: Router>(
router: &R,
ctx: &RequestCtx<'_>,
cache: &mut HashMap<(PartitionId, String), Resolved>,
item: BulkItem,
retry: crate::RetryPolicy,
up_trace: Option<&osproxy_core::TraceContext>,
) -> Result<Prepared, ItemFailure> {
let action = item.action.keyword();
let logical_index = item
.index
.clone()
.unwrap_or_else(|| ctx.logical_index().to_owned());
let partition = router
.resolve_partition(
ctx,
BodyDoc::new(item.source.as_deref().unwrap_or_default()),
)
.map_err(|_| {
fail(
action,
&logical_index,
item.id.clone(),
400,
"partition_unresolved",
)
})?;
let key = (partition.clone(), logical_index.clone());
let resolved = if let Some(r) = cache.get(&key) {
r.clone()
} else {
let r = crate::retry::with_retry(retry, || {
router.resolve_placement(ctx, partition.clone(), &logical_index)
})
.await
.map_err(|_| {
fail(
action,
&logical_index,
item.id.clone(),
404,
"placement_missing",
)
})?;
cache.insert(key, r.clone());
r
};
let mut prepared = build_op(&resolved, &item, action, logical_index)?;
prepared.op = prepared
.op
.with_trace(up_trace.cloned())
.with_forward_headers(ctx.forward_headers().to_vec());
Ok(prepared)
}
fn build_op(
resolved: &Resolved,
item: &BulkItem,
action: &'static str,
logical_index: String,
) -> Result<Prepared, ItemFailure> {
let partition = resolved.partition.as_str();
let (inject, id_rule) = transform_parts(&resolved.decision.body_transform, partition);
let rule = id_rule.as_ref();
let bad = |code| fail(action, &logical_index, item.id.clone(), 400, code);
let (doc, logical_id) = match item.action {
BulkAction::Delete => {
let logical = item.id.clone().ok_or_else(|| bad("delete_without_id"))?;
let id =
physical_id(rule, partition, &logical).ok_or_else(|| bad("irreversible_id"))?;
(
DocOp::Delete {
id,
routing: routing_for(rule, partition),
},
logical,
)
}
BulkAction::Update => build_update(item, &inject, rule, partition, bad)?,
BulkAction::Index | BulkAction::Create => {
let source = item
.source
.as_deref()
.ok_or_else(|| bad("missing_source"))?;
let body = inject_fields_bytes(source, &inject)
.map_err(|_| bad("reserved_field_collision"))?;
let (id, logical) =
index_id(rule, partition, item, source).ok_or_else(|| bad("id_construction"))?;
let routing = id.as_ref().and_then(|_| routing_for(rule, partition));
let doc = if item.action == BulkAction::Create {
DocOp::Create { id, routing, body }
} else {
DocOp::Index { id, routing, body }
};
(doc, logical)
}
};
Ok(Prepared {
op: WriteOp::new(
resolved.decision.target.clone(),
doc,
resolved.decision.epoch,
)
.with_protocol(resolved.decision.upstream_protocol),
partition: resolved.partition.clone(),
action,
logical_index,
logical_id,
})
}
fn build_update<F: Fn(&'static str) -> ItemFailure>(
item: &BulkItem,
inject: &[(osproxy_core::FieldName, Value)],
rule: Option<&IdRule<'_>>,
partition: &str,
bad: F,
) -> Result<(DocOp, String), ItemFailure> {
let logical = item.id.clone().ok_or_else(|| bad("update_without_id"))?;
let id = physical_id(rule, partition, &logical).ok_or_else(|| bad("irreversible_id"))?;
let source = item
.source
.as_deref()
.ok_or_else(|| bad("missing_source"))?;
let mut update: Value = serde_json::from_slice(source).map_err(|_| bad("invalid_json"))?;
inject_update(&mut update, inject).map_err(|_| bad("reserved_field_collision"))?;
let body = serde_json::to_vec(&update).map_err(|_| bad("serialize"))?;
Ok((
DocOp::Update {
id,
routing: routing_for(rule, partition),
body,
},
logical,
))
}
fn index_id(
id_rule: Option<&IdRule<'_>>,
partition: &str,
item: &BulkItem,
source: &[u8],
) -> Option<(Option<String>, String)> {
match (id_rule, item.id.as_deref()) {
(Some(rule), Some(logical)) => {
let physical = map_logical_to_physical(rule.template, partition, logical).ok()?;
Some((Some(physical), logical.to_owned()))
}
(Some(rule), None) => {
let physical = construct_id_bytes(rule.template, partition, source).ok()?;
let logical = map_physical_to_logical(rule.template, partition, &physical)
.ok()
.flatten()
.unwrap_or_else(|| physical.clone());
Some((Some(physical), logical))
}
(None, id) => Some((id.map(str::to_owned), id.unwrap_or("").to_owned())),
}
}
fn physical_id(id_rule: Option<&IdRule<'_>>, partition: &str, logical: &str) -> Option<String> {
match id_rule {
Some(rule) => map_logical_to_physical(rule.template, partition, logical).ok(),
None => Some(logical.to_owned()),
}
}
fn routing_for(id_rule: Option<&IdRule<'_>>, partition: &str) -> Option<String> {
id_rule
.filter(|r| r.set_routing)
.map(|_| partition.to_owned())
}
struct IdRule<'a> {
template: &'a str,
set_routing: bool,
}
fn transform_parts<'a>(
transform: &'a BodyTransform,
partition: &str,
) -> (Vec<(osproxy_core::FieldName, Value)>, Option<IdRule<'a>>) {
let (fields, rule): (&[InjectedField], Option<&osproxy_spi::DocIdRule>) = match transform {
BodyTransform::None => (&[], None),
BodyTransform::Inject(f) => (f, None),
BodyTransform::ConstructId(r) => (&[], Some(r)),
BodyTransform::Both { inject, id } => (inject, Some(id)),
};
let inject = fields
.iter()
.map(|f| (f.name.clone(), constant(&f.value, partition)))
.collect();
let id_rule = rule.map(|r| IdRule {
template: r.template.as_str(),
set_routing: r.set_routing,
});
(inject, id_rule)
}
fn constant(value: &InjectedValue, partition: &str) -> Value {
match value {
InjectedValue::Constant(v) => v.clone(),
InjectedValue::PartitionId
| InjectedValue::FromPrincipal(_)
| InjectedValue::FromHeader(_) => Value::String(partition.to_owned()),
}
}
fn fail(
action: &'static str,
logical_index: &str,
logical_id: Option<String>,
status: u16,
error: &'static str,
) -> ItemFailure {
ItemFailure {
action,
logical_index: logical_index.to_owned(),
logical_id,
status,
error,
}
}