use std::collections::{BTreeMap, HashMap};
use futures_util::stream::StreamExt as _;
use osproxy_rewrite::{parse_msearch, MsearchItem};
use osproxy_sink::{Reader, SearchOp, SearchOutcome, SinkError};
use osproxy_spi::{BodyDoc, RequestCtx};
use osproxy_tenancy::{Resolved, Router};
use serde_json::json;
use serde_json::value::RawValue;
use crate::error::RequestError;
use crate::pipeline::PipelineResponse;
use crate::read::{shape_hits, ReadShape};
use crate::retry::with_retry;
/// Upper bound on how many sink searches `run_all` keeps in flight at once
/// (it is the limit handed to `buffer_unordered`).
const MAX_SEARCH_CONCURRENCY: usize = 8;
/// Executes a multi-search request and returns one combined response.
///
/// The request body is parsed into individual searches, each one is prepared
/// (placement resolved, search op built), the prepared ones are run against
/// `sink`, and every per-search result line is wrapped into a single
/// `{"responses":[...]}` body in request order.
///
/// # Errors
///
/// Fails only when the body cannot be parsed by `parse_msearch` or when the
/// router cannot resolve the partition. A failure of an individual search is
/// reported inside that search's own response entry, and the overall status
/// stays 200.
pub(crate) async fn multi_search<R: Router, S: Reader>(
    router: &R,
    sink: &S,
    ctx: &RequestCtx<'_>,
    retry: crate::RetryPolicy,
    up_trace: Option<osproxy_core::TraceContext>,
) -> Result<PipelineResponse, RequestError> {
    let items = parse_msearch(ctx.body())?;
    let total = items.len();
    let partition = router.resolve_partition(ctx, BodyDoc::new(&[]))?;

    // The two vectors are grown in lock-step, one slot per request item:
    // an item that prepared successfully gets a `null` placeholder line (to be
    // overwritten once its search has run), an item that failed gets its
    // error line right away and no prepared op.
    let mut responses: Vec<Vec<u8>> = Vec::with_capacity(total);
    let mut prepared: Vec<Option<Prepared>> = Vec::with_capacity(total);
    // Placements already resolved during this request, keyed by logical index.
    let mut placements: HashMap<String, Resolved> = HashMap::new();

    for item in items {
        let outcome = prepare(
            router,
            ctx,
            &partition,
            &mut placements,
            &item,
            retry,
            up_trace.as_ref(),
        )
        .await;
        match outcome {
            Ok(ready) => {
                responses.push(b"null".to_vec());
                prepared.push(Some(ready));
            }
            Err(line) => {
                responses.push(line);
                prepared.push(None);
            }
        }
    }

    run_all(sink, &prepared, &mut responses).await;

    let body = assemble_responses(&responses);
    Ok(PipelineResponse {
        status: 200,
        body,
        content_type: None,
    })
}
/// Wraps the per-search response lines into a single `{"responses":[...]}`
/// JSON document.
///
/// Each entry is copied verbatim, in slice order, separated by commas; the
/// entries are expected to already be serialized JSON values. An empty slice
/// yields `{"responses":[]}`.
fn assemble_responses(responses: &[Vec<u8>]) -> Vec<u8> {
    const PREFIX: &[u8] = b"{\"responses\":[";
    const SUFFIX: &[u8] = b"]}";

    let payload_len: usize = responses.iter().map(Vec::len).sum();
    // One comma between each pair of neighbouring entries. Counting them in the
    // reservation keeps the buffer from outgrowing its capacity (and being
    // reallocated and copied) whenever there are two or more entries.
    let separator_len = responses.len().saturating_sub(1);

    let mut body = Vec::with_capacity(PREFIX.len() + SUFFIX.len() + payload_len + separator_len);
    body.extend_from_slice(PREFIX);
    for (i, entry) in responses.iter().enumerate() {
        if i > 0 {
            body.push(b',');
        }
        body.extend_from_slice(entry);
    }
    body.extend_from_slice(SUFFIX);
    body
}
/// One search of a multi-search request that is ready to be sent to the sink,
/// plus what is needed to shape its result afterwards.
struct Prepared {
/// The search operation handed to the sink.
op: SearchOp,
/// Read shape returned alongside the op by `build_search_op`; passed to
/// `shape_hits` when the result comes back.
shape: ReadShape,
/// Index name this search targets: the item's own index when it names one,
/// otherwise the request's logical index.
logical_index: String,
/// String form of the resolved placement's partition.
partition: String,
}
/// Turns one multi-search item into a [`Prepared`] search.
///
/// The target index is the item's own index when present, otherwise the
/// request's logical index. The placement for that index is taken from
/// `cache` when it has been resolved before, and otherwise resolved through
/// the router (under `retry`) and stored in `cache` for later items.
///
/// On failure the `Err` value is the already-serialized response line for
/// this item: status 400 with type `placement_missing` when the placement
/// cannot be resolved, or `invalid_query` when the search op cannot be built.
#[allow(
    clippy::too_many_arguments,
    reason = "a per-search prepare genuinely needs the router, request, resolved \
              partition, placement cache, the item, the retry policy, and the gated \
              upstream trace; bundling them would only shuffle the same values."
)]
async fn prepare<R: Router>(
    router: &R,
    ctx: &RequestCtx<'_>,
    partition: &osproxy_core::PartitionId,
    cache: &mut HashMap<String, Resolved>,
    item: &MsearchItem,
    retry: crate::RetryPolicy,
    up_trace: Option<&osproxy_core::TraceContext>,
) -> Result<Prepared, Vec<u8>> {
    let logical_index = match &item.index {
        Some(name) => name.clone(),
        None => ctx.logical_index().to_owned(),
    };

    let cached = cache.get(&logical_index).cloned();
    let resolved = match cached {
        Some(hit) => hit,
        None => {
            let lookup = with_retry(retry, || {
                router.resolve_placement(ctx, partition.clone(), &logical_index)
            })
            .await;
            let fresh = match lookup {
                Ok(fresh) => fresh,
                Err(_) => return Err(error_response(400, "placement_missing")),
            };
            // Remember the placement so later items for the same index skip
            // the router round-trip.
            cache.insert(logical_index.clone(), fresh.clone());
            fresh
        }
    };

    let (op, shape) = match crate::read::build_search_op(&resolved, &item.query) {
        Ok(built) => built,
        Err(_) => return Err(error_response(400, "invalid_query")),
    };

    let op = op
        .with_trace(up_trace.cloned())
        .with_forward_headers(ctx.forward_headers().to_vec());
    let partition_name = resolved.partition.as_str().to_owned();

    Ok(Prepared {
        op,
        shape,
        logical_index,
        partition: partition_name,
    })
}
/// Runs every prepared search against `sink` and writes each shaped result
/// into the matching slot of `responses`.
///
/// `prepared` and `responses` are indexed by the item's position in the
/// request. Slots whose `prepared` entry is `None` are left untouched. At most
/// `MAX_SEARCH_CONCURRENCY` searches are in flight at a time; because results
/// may complete out of order, each one carries its position along.
async fn run_all<S: Reader>(sink: &S, prepared: &[Option<Prepared>], responses: &mut [Vec<u8>]) {
    // Pair each op with its request position so the result can be routed back.
    let mut ops: Vec<(usize, SearchOp)> = Vec::new();
    for (ordinal, slot) in prepared.iter().enumerate() {
        if let Some(ready) = slot {
            ops.push((ordinal, ready.op.clone()));
        }
    }

    let searches = futures_util::stream::iter(ops)
        .map(|(ordinal, op)| async move { (ordinal, sink.search(op).await) })
        .buffer_unordered(MAX_SEARCH_CONCURRENCY);
    let results: Vec<(usize, Result<SearchOutcome, SinkError>)> = searches.collect().await;

    for (ordinal, result) in results {
        let Some(ready) = prepared[ordinal].as_ref() else {
            continue;
        };
        responses[ordinal] = shape_response(ready, result);
    }
}
/// Converts the sink's result for one search into its serialized response
/// line.
///
/// A sink error becomes a 502 `upstream_failed` line. Otherwise the body is
/// run through `shape_hits` and the upstream status is merged in; if either
/// step fails the line is a 502 `malformed_upstream` error.
fn shape_response(p: &Prepared, result: Result<SearchOutcome, SinkError>) -> Vec<u8> {
    let outcome = match result {
        Ok(outcome) => outcome,
        Err(_) => return error_response(502, "upstream_failed"),
    };

    // Both failure modes below collapse to the same error line, so fold them
    // into a single `Option` before choosing the fallback.
    shape_hits(&outcome.body, &p.logical_index, &p.partition, &p.shape)
        .ok()
        .and_then(|shaped| with_status(&shaped, outcome.status))
        .unwrap_or_else(|| error_response(502, "malformed_upstream"))
}
/// Returns `shaped` re-serialized with a top-level `"status"` member set to
/// `status`.
///
/// `shaped` must be a JSON object; its member values are carried over as raw
/// JSON, and an existing `"status"` member is replaced. Returns `None` when
/// `shaped` does not parse as an object or when serialization fails.
fn with_status(shaped: &[u8], status: u16) -> Option<Vec<u8>> {
    let mut fields = match serde_json::from_slice::<BTreeMap<String, Box<RawValue>>>(shaped) {
        Ok(fields) => fields,
        Err(_) => return None,
    };
    let status_value = serde_json::value::to_raw_value(&status).ok()?;
    fields.insert(String::from("status"), status_value);
    serde_json::to_vec(&fields).ok()
}
fn error_response(status: u16, error: &'static str) -> Vec<u8> {
serde_json::to_vec(&json!({ "error": { "type": error }, "status": status }))
.unwrap_or_else(|_| br#"{"error":{"type":"internal"},"status":500}"#.to_vec())
}