use std::collections::HashMap;
use bytes::Bytes;
use osproxy_core::{FieldName, PartitionId};
use osproxy_rewrite::{
construct_id_bytes, inject_fields_bytes, inject_update, map_logical_to_physical,
map_physical_to_logical, BulkAction, BulkItem,
};
use osproxy_sink::{DocOp, WriteOp};
use osproxy_spi::{BodyDoc, BodyTransform, InjectedField, InjectedValue, RequestCtx};
use osproxy_tenancy::{Resolved, Router};
use serde_json::Value;
/// One memoized placement lookup, stored in a [`ResolutionCache`].
///
/// `prepare` creates an entry the first time a `(partition, logical index)`
/// pair is seen and reuses it for later items with the same pair.
pub(crate) struct CachedResolution {
/// The value returned by `Router::resolve_placement` for this pair.
resolved: Resolved,
/// Field/value pairs computed once by `inject_pairs` from the placement's
/// body transform and partition.
inject: Vec<(FieldName, Value)>,
}
/// Cache of placement lookups keyed by `(partition, logical index name)`.
///
/// `prepare` takes it by `&mut` and fills it on a miss.
pub(crate) type ResolutionCache = HashMap<(PartitionId, String), CachedResolution>;
/// A bulk item that has been resolved and turned into a write operation.
pub(crate) struct Prepared {
/// The write to send; `prepare` attaches the trace context and forward headers.
pub(crate) op: WriteOp,
/// Partition taken from the resolved placement.
pub(crate) partition: PartitionId,
/// Keyword of the item's bulk action (from `BulkAction::keyword`).
pub(crate) action: &'static str,
/// Logical index: the item's own index, or the request's when the item has none.
pub(crate) logical_index: String,
/// Logical document id. Empty when the item has no id and the placement has
/// no id rule (see `index_id`).
pub(crate) logical_id: String,
}
/// Why a single bulk item could not be prepared.
///
/// Built by `fail` and converted into a response line with
/// [`ItemFailure::into_line`].
pub(crate) struct ItemFailure {
/// Keyword of the item's bulk action.
action: &'static str,
/// Logical index the item was addressed to.
logical_index: String,
/// The item's id, when it carried one.
logical_id: Option<String>,
/// Status code reported for this item (this file uses 400 and 404).
status: u16,
/// Short machine-readable error code, e.g. `"placement_missing"`.
error: &'static str,
}
impl ItemFailure {
pub(crate) fn into_line(self) -> crate::bulkline::Line {
crate::bulkline::Line::error(
self.action,
self.logical_index,
self.logical_id,
self.status,
self.error,
)
}
}
/// Resolves one bulk item into a ready-to-send [`Prepared`] write.
///
/// Steps, in order:
/// 1. Pick the logical index: the item's own `index` if present, otherwise
///    `ctx.logical_index()`.
/// 2. Ask the router for the partition, handing it the item's source (an
///    empty body when the item has none).
/// 3. Look up the placement for `(partition, logical_index)` in `cache`; on a
///    miss, resolve it through `with_retry` under `retry`, precompute the
///    injected field pairs, and store both in the cache.
/// 4. Build the write op and attach `up_trace` and the request's forward
///    headers.
///
/// # Errors
///
/// Returns an [`ItemFailure`] with
/// - status 400 / `partition_unresolved` when the router cannot resolve a
///   partition,
/// - status 404 / `placement_missing` when the retried placement lookup
///   returns an error (nothing is cached in that case),
/// - whatever `build_op` reports for the item itself.
pub(crate) async fn prepare<R: Router>(
    router: &R,
    ctx: &RequestCtx<'_>,
    cache: &mut ResolutionCache,
    item: BulkItem,
    retry: crate::RetryPolicy,
    up_trace: Option<&osproxy_core::TraceContext>,
) -> Result<Prepared, ItemFailure> {
    let action = item.action.keyword();
    let logical_index = item
        .index
        .clone()
        .unwrap_or_else(|| ctx.logical_index().to_owned());
    let partition = router
        .resolve_partition(
            ctx,
            BodyDoc::new(item.source.as_deref().unwrap_or_default()),
        )
        .map_err(|_| {
            fail(
                action,
                &logical_index,
                item.id.clone(),
                400,
                "partition_unresolved",
            )
        })?;

    // The entry API hashes the key once and hands back the stored value
    // directly, so there is no second lookup that could "miss" after an insert.
    let entry = match cache.entry((partition.clone(), logical_index.clone())) {
        std::collections::hash_map::Entry::Occupied(slot) => slot.into_mut(),
        std::collections::hash_map::Entry::Vacant(slot) => {
            let resolved = crate::retry::with_retry(retry, || {
                router.resolve_placement(ctx, partition.clone(), &logical_index)
            })
            .await
            .map_err(|_| {
                fail(
                    action,
                    &logical_index,
                    item.id.clone(),
                    404,
                    "placement_missing",
                )
            })?;
            let inject = inject_pairs(
                &resolved.decision.body_transform,
                resolved.partition.as_str(),
            );
            slot.insert(CachedResolution { resolved, inject })
        }
    };

    let mut prepared = build_op(&entry.resolved, &entry.inject, &item, action, logical_index)?;
    prepared.op = prepared
        .op
        .with_trace(up_trace.cloned())
        .with_forward_headers(ctx.forward_headers().to_vec());
    Ok(prepared)
}
/// Turns one bulk item into a [`Prepared`] write against an already-resolved
/// placement.
///
/// `inject` holds the field pairs to add to document bodies; `action` is the
/// item's action keyword and is copied into the result and into any failure.
///
/// # Errors
///
/// Every failure raised here carries status 400. Codes produced directly:
/// `delete_without_id`, `irreversible_id`, `missing_source`,
/// `reserved_field_collision`, `id_construction`; update items add the codes
/// from `build_update`.
fn build_op(
    resolved: &Resolved,
    inject: &[(FieldName, Value)],
    item: &BulkItem,
    action: &'static str,
    logical_index: String,
) -> Result<Prepared, ItemFailure> {
    let decision = &resolved.decision;
    let partition = resolved.partition.as_str();
    let id_rule = id_rule_of(&decision.body_transform);
    let rule = id_rule.as_ref();
    // Shorthand for a 400 failure on this item with the given error code.
    let bad = |code: &'static str| fail(action, &logical_index, item.id.clone(), 400, code);

    let (doc, logical_id) = match item.action {
        BulkAction::Delete => {
            let Some(logical) = item.id.clone() else {
                return Err(bad("delete_without_id"));
            };
            let Some(id) = physical_id(rule, partition, &logical) else {
                return Err(bad("irreversible_id"));
            };
            let routing = routing_for(rule, partition);
            (DocOp::Delete { id, routing }, logical)
        }
        BulkAction::Update => build_update(item, inject, rule, partition, bad)?,
        BulkAction::Index | BulkAction::Create => {
            let Some(source) = item.source.as_deref() else {
                return Err(bad("missing_source"));
            };
            let injected = match inject_fields_bytes(source, inject) {
                Ok(bytes) => bytes,
                Err(_) => return Err(bad("reserved_field_collision")),
            };
            let body = Bytes::from(injected);
            let Some((id, logical)) = index_id(rule, partition, item, source) else {
                return Err(bad("id_construction"));
            };
            // Routing is only set when the write carries an explicit id.
            let routing = if id.is_some() {
                routing_for(rule, partition)
            } else {
                None
            };
            let doc = if item.action == BulkAction::Create {
                DocOp::Create { id, routing, body }
            } else {
                DocOp::Index { id, routing, body }
            };
            (doc, logical)
        }
    };

    let op = WriteOp::new(decision.target.clone(), doc, decision.epoch)
        .with_protocol(decision.upstream_protocol);
    Ok(Prepared {
        op,
        partition: resolved.partition.clone(),
        action,
        logical_index,
        logical_id,
    })
}
/// Builds the `DocOp::Update` for an update item and returns it together with
/// the item's logical id.
///
/// The item's source is parsed as JSON, `inject` is applied to it through
/// `inject_update`, and the result is serialized again as the request body.
///
/// # Errors
///
/// Uses `bad` to build the failure, with one of these codes (checked in this
/// order): `update_without_id`, `irreversible_id`, `missing_source`,
/// `invalid_json`, `reserved_field_collision`, `serialize`.
fn build_update<F: Fn(&'static str) -> ItemFailure>(
    item: &BulkItem,
    inject: &[(osproxy_core::FieldName, Value)],
    rule: Option<&IdRule<'_>>,
    partition: &str,
    bad: F,
) -> Result<(DocOp, String), ItemFailure> {
    let Some(logical) = item.id.clone() else {
        return Err(bad("update_without_id"));
    };
    let Some(id) = physical_id(rule, partition, &logical) else {
        return Err(bad("irreversible_id"));
    };
    let Some(source) = item.source.as_deref() else {
        return Err(bad("missing_source"));
    };

    let mut update = match serde_json::from_slice::<Value>(source) {
        Ok(parsed) => parsed,
        Err(_) => return Err(bad("invalid_json")),
    };
    if inject_update(&mut update, inject).is_err() {
        return Err(bad("reserved_field_collision"));
    }
    let encoded = match serde_json::to_vec(&update) {
        Ok(bytes) => bytes,
        Err(_) => return Err(bad("serialize")),
    };

    let body = Bytes::from(encoded);
    let routing = routing_for(rule, partition);
    Ok((DocOp::Update { id, routing, body }, logical))
}
/// Works out the `(physical id, logical id)` pair for an index/create item.
///
/// - No id rule: the item's id (if any) is used as-is; the logical id is the
///   empty string when the item has none.
/// - Id rule and an item id: the physical id is mapped from the logical one.
/// - Id rule and no item id: the physical id is constructed from `source`,
///   and the logical id is mapped back from it, falling back to the physical
///   id when the reverse mapping fails or yields nothing.
///
/// Returns `None` when mapping or constructing the physical id fails.
fn index_id(
    id_rule: Option<&IdRule<'_>>,
    partition: &str,
    item: &BulkItem,
    source: &[u8],
) -> Option<(Option<String>, String)> {
    let supplied = item.id.as_deref();

    let Some(rule) = id_rule else {
        let logical = supplied.unwrap_or("").to_owned();
        return Some((supplied.map(str::to_owned), logical));
    };

    if let Some(logical) = supplied {
        let physical = map_logical_to_physical(rule.template, partition, logical).ok()?;
        return Some((Some(physical), logical.to_owned()));
    }

    let physical = construct_id_bytes(rule.template, partition, source).ok()?;
    let logical = match map_physical_to_logical(rule.template, partition, &physical) {
        Ok(Some(mapped)) => mapped,
        _ => physical.clone(),
    };
    Some((Some(physical), logical))
}
/// Maps a logical id to the physical id for `partition`.
///
/// Without an id rule the logical id is returned unchanged. With one, the
/// result of `map_logical_to_physical` is returned, or `None` if it fails.
fn physical_id(id_rule: Option<&IdRule<'_>>, partition: &str, logical: &str) -> Option<String> {
    let Some(rule) = id_rule else {
        return Some(logical.to_owned());
    };
    map_logical_to_physical(rule.template, partition, logical).ok()
}
/// Returns the routing value for a write: the partition id when an id rule is
/// present and asks for routing, otherwise `None`.
fn routing_for(id_rule: Option<&IdRule<'_>>, partition: &str) -> Option<String> {
    match id_rule {
        Some(rule) if rule.set_routing => Some(partition.to_owned()),
        _ => None,
    }
}

/// Borrowed view of a document-id rule, as produced by `id_rule_of`.
struct IdRule<'a> {
    /// Id template handed to the logical/physical id mapping helpers.
    template: &'a str,
    /// Whether writes under this rule carry the partition as routing.
    set_routing: bool,
}
/// Expands the injected-field declarations of `transform` into concrete
/// `(field name, JSON value)` pairs for `partition`.
///
/// Transforms without an inject part (`None`, `ConstructId`) yield an empty
/// list.
fn inject_pairs(transform: &BodyTransform, partition: &str) -> Vec<(FieldName, Value)> {
    let declared: &[InjectedField] = match transform {
        BodyTransform::None | BodyTransform::ConstructId(_) => return Vec::new(),
        BodyTransform::Inject(list) | BodyTransform::Both { inject: list, .. } => list,
    };

    let mut pairs = Vec::with_capacity(declared.len());
    for field in declared {
        pairs.push((field.name.clone(), constant(&field.value, partition)));
    }
    pairs
}
/// Extracts the id rule from `transform` as a borrowed [`IdRule`].
///
/// Only `ConstructId` and `Both` carry one; `None` and `Inject` give `None`.
fn id_rule_of(transform: &BodyTransform) -> Option<IdRule<'_>> {
    match transform {
        BodyTransform::ConstructId(doc_rule) | BodyTransform::Both { id: doc_rule, .. } => {
            Some(IdRule {
                template: doc_rule.template.as_str(),
                set_routing: doc_rule.set_routing,
            })
        }
        BodyTransform::None | BodyTransform::Inject(_) => None,
    }
}
/// Resolves an injected-value declaration to the JSON value written into the
/// document.
///
/// A `Constant` is cloned as-is; every other variant becomes the partition id
/// as a JSON string.
// NOTE(review): `FromPrincipal` and `FromHeader` resolve to the partition id
// here rather than reading the principal/header — presumably because the
// partition was itself derived from them; confirm against the router.
fn constant(value: &InjectedValue, partition: &str) -> Value {
    match value {
        InjectedValue::PartitionId
        | InjectedValue::FromPrincipal(_)
        | InjectedValue::FromHeader(_) => Value::String(String::from(partition)),
        InjectedValue::Constant(literal) => literal.clone(),
    }
}
/// Assembles an [`ItemFailure`] from its parts, taking an owned copy of the
/// logical index name.
fn fail(
    action: &'static str,
    logical_index: &str,
    logical_id: Option<String>,
    status: u16,
    error: &'static str,
) -> ItemFailure {
    let logical_index = String::from(logical_index);
    ItemFailure {
        action,
        logical_index,
        logical_id,
        status,
        error,
    }
}