use std::collections::HashMap;
use futures_util::stream::StreamExt as _;
use osproxy_rewrite::{parse_mget, MgetItem};
use osproxy_sink::{ReadOp, ReadOutcome, Reader, SinkError};
use osproxy_spi::{BodyDoc, RequestCtx};
use osproxy_tenancy::{Resolved, Router};
use serde_json::{json, Value};
use crate::error::RequestError;
use crate::pipeline::PipelineResponse;
use crate::read::{build_read_op, not_found_body, shape_found, ReadShape};
use crate::retry::with_retry;
/// Upper bound on the number of sink reads that `fetch_all` keeps in flight
/// at the same time (it is the limit handed to `buffer_unordered`).
const MAX_FETCH_CONCURRENCY: usize = 8;
/// Serves a multi-get request and answers with a single `{"docs": [...]}` body.
///
/// Every requested item is first prepared (placement resolved and a read
/// operation built), then all prepared items are fetched from `sink`. The
/// response keeps one slot per requested item, in request order. An item that
/// cannot be prepared gets its error document placed directly in its slot and
/// is never sent to the sink.
///
/// # Errors
///
/// Propagates a failure from parsing the request body or from resolving the
/// partition, and returns `RequestError::Internal` when the response cannot be
/// serialized. Per-item failures are reported inside the response body, not as
/// an `Err`.
pub(crate) async fn multi_get<R: Router, S: Reader>(
    router: &R,
    sink: &S,
    ctx: &RequestCtx<'_>,
    retry: crate::RetryPolicy,
    up_trace: Option<osproxy_core::TraceContext>,
) -> Result<PipelineResponse, RequestError> {
    let items = parse_mget(ctx.body())?;
    let count = items.len();
    // The partition is resolved once for the whole request, against an empty
    // body document.
    let partition = router.resolve_partition(ctx, BodyDoc::new(&[]))?;

    // `docs` and `prepared` are filled in lockstep so that position `i` in
    // both always refers to the `i`-th requested item.
    let mut docs: Vec<Value> = Vec::with_capacity(count);
    let mut prepared: Vec<Option<Prepared>> = Vec::with_capacity(count);
    // Placement lookups are shared between items that target the same index.
    let mut cache: HashMap<String, Resolved> = HashMap::new();
    let trace = up_trace.as_ref();

    for item in items {
        let outcome = prepare(router, ctx, &partition, &mut cache, &item, retry, trace).await;
        let (slot, doc) = match outcome {
            // Placeholder until the fetch stage overwrites it.
            Ok(ready) => (Some(ready), Value::Null),
            // Preparation failed: the error document is the final answer.
            Err(line) => (None, line),
        };
        prepared.push(slot);
        docs.push(doc);
    }

    fetch_all(sink, &prepared, &mut docs).await;

    let payload = serde_json::to_vec(&json!({ "docs": docs })).map_err(|_| {
        RequestError::Internal {
            reason: "serializing mget response",
        }
    })?;
    Ok(PipelineResponse {
        status: 200,
        body: payload,
        content_type: None,
    })
}
/// A multi-get item that was prepared successfully and is ready to be fetched.
struct Prepared {
// Read operation handed to the sink (cloned for each fetch in `fetch_all`).
op: ReadOp,
// Shaping information; `shape_result` reads its `inject_names` when a
// document is found.
shape: ReadShape,
// Index name reported back as `_index` in the item's response document.
logical_index: String,
// Document id reported back as `_id` in the item's response document.
logical_id: String,
}
/// Turns one multi-get item into a ready-to-send read operation.
///
/// The item's own index is used when present; otherwise the request's logical
/// index is used. Placement for that index is taken from `cache` when already
/// known, and otherwise resolved through the router (under `retry`) and stored
/// in `cache` for later items.
///
/// # Errors
///
/// Returns the per-item error document to place in the response:
/// `placement_missing` when placement resolution fails, `irreversible_id` when
/// the read operation cannot be built for the item's id.
#[allow(
    clippy::too_many_arguments,
    reason = "a per-document prepare genuinely needs the router, request, resolved \
              partition, placement cache, the item, the retry policy, and the gated \
              upstream trace; bundling them would only shuffle the same values."
)]
async fn prepare<R: Router>(
    router: &R,
    ctx: &RequestCtx<'_>,
    partition: &osproxy_core::PartitionId,
    cache: &mut HashMap<String, Resolved>,
    item: &MgetItem,
    retry: crate::RetryPolicy,
    up_trace: Option<&osproxy_core::TraceContext>,
) -> Result<Prepared, Value> {
    let logical_index = match &item.index {
        Some(name) => name.clone(),
        None => ctx.logical_index().to_owned(),
    };

    // Clone the cached entry up front so the map is free to be mutated on a miss.
    let cached = cache.get(&logical_index).cloned();
    let resolved = match cached {
        Some(hit) => hit,
        None => {
            let lookup = with_retry(retry, || {
                router.resolve_placement(ctx, partition.clone(), &logical_index)
            })
            .await;
            let fresh = match lookup {
                Ok(found) => found,
                Err(_) => return Err(error_doc(&logical_index, &item.id, "placement_missing")),
            };
            cache.insert(logical_index.clone(), fresh.clone());
            fresh
        }
    };

    let (op, shape) = match build_read_op(&resolved, &item.id) {
        Ok(built) => built,
        Err(_) => return Err(error_doc(&logical_index, &item.id, "irreversible_id")),
    };

    // Attach the upstream trace (if any) and the request's forward headers.
    let op = op
        .with_trace(up_trace.cloned())
        .with_forward_headers(ctx.forward_headers().to_vec());

    Ok(Prepared {
        op,
        shape,
        logical_index,
        logical_id: item.id.clone(),
    })
}
/// Fetches every prepared item from `sink` and writes the shaped result into
/// the matching position of `docs`.
///
/// At most `MAX_FETCH_CONCURRENCY` reads are in flight at once. Reads may
/// complete in any order, so each result carries the position it belongs to.
/// Positions whose `prepared` entry is `None` are left untouched.
///
/// `prepared` and `docs` are indexed with the same positions, so `docs` must
/// be at least as long as `prepared`.
async fn fetch_all<S: Reader>(sink: &S, prepared: &[Option<Prepared>], docs: &mut [Value]) {
    // Owned (position, op) pairs are what travel through the stream.
    let mut ops: Vec<(usize, ReadOp)> = Vec::new();
    for (slot, entry) in prepared.iter().enumerate() {
        if let Some(ready) = entry {
            ops.push((slot, ready.op.clone()));
        }
    }

    let in_flight = futures_util::stream::iter(ops)
        .map(|(slot, op)| async move { (slot, sink.get(op).await) })
        .buffer_unordered(MAX_FETCH_CONCURRENCY);
    let fetched: Vec<(usize, Result<ReadOutcome, SinkError>)> = in_flight.collect().await;

    for (slot, outcome) in fetched {
        // Only positions holding a prepared item were queued above.
        let Some(ready) = prepared[slot].as_ref() else {
            continue;
        };
        docs[slot] = shape_result(ready, outcome);
    }
}
fn shape_result(p: &Prepared, result: Result<ReadOutcome, SinkError>) -> Value {
match result {
Ok(outcome) if outcome.found => {
let shaped = shape_found(
&outcome.body,
&p.logical_index,
&p.logical_id,
&p.shape.inject_names,
);
shaped
.ok()
.and_then(|bytes| serde_json::from_slice(&bytes).ok())
.unwrap_or_else(|| error_doc(&p.logical_index, &p.logical_id, "malformed_upstream"))
}
Ok(_) => serde_json::from_slice(¬_found_body(&p.logical_index, &p.logical_id))
.unwrap_or_else(|_| error_doc(&p.logical_index, &p.logical_id, "malformed_upstream")),
Err(_) => error_doc(&p.logical_index, &p.logical_id, "upstream_failed"),
}
}
fn error_doc(logical_index: &str, logical_id: &str, error: &'static str) -> Value {
json!({
"_index": logical_index,
"_id": logical_id,
"error": { "type": error },
})
}