use osproxy_observe::{DispatchInfo, RequestTrace};
use osproxy_sink::{CursorOp, Reader, Sink};
use osproxy_spi::RequestCtx;
use osproxy_tenancy::Router;
use std::collections::BTreeMap;
use osproxy_core::ClusterId;
use crate::cursor::{forwardable_query, rewrite_pit_id, wrap_pit_id_in_response};
use crate::error::RequestError;
use crate::observe::resolve_info;
use crate::pipeline::{Pipeline, PipelineResponse};
use crate::read::{build_search_op, shape_hits};
impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
pub(crate) async fn pit_search(
&self,
ctx: &RequestCtx<'_>,
trace: &mut RequestTrace,
wrapped_pit: &str,
) -> Result<PipelineResponse, RequestError> {
let signer = self.cursor_signer.as_ref().ok_or(RequestError::Cursor {
reason: "cursor affinity is not enabled",
})?;
let (cluster, real_pit) = osproxy_core::cursor::unwrap(signer.as_ref(), wrapped_pit)
.ok_or(RequestError::Cursor {
reason: "pit envelope is invalid or expired",
})?;
let resolved = self.resolve_with_retry(ctx).await?;
trace.record_resolve(resolve_info(&resolved));
let (search_op, shape) = build_search_op(&resolved, ctx.body())?;
let body = rewrite_pit_id(search_op.body, &real_pit);
let op = CursorOp::new(cluster.clone(), ctx.method(), "/_search", body)
.with_endpoint(self.router.cluster_endpoint(&cluster))
.with_trace(self.upstream_trace(ctx))
.with_forward_headers(ctx.forward_headers().to_vec());
let outcome = self.sink.cursor(op).await?;
trace.record_dispatch(DispatchInfo {
cluster: cluster.clone(),
upstream_status: outcome.status,
pool_reuse: outcome.pool_reuse,
});
let stripped = shape_hits(
&outcome.body,
ctx.logical_index(),
resolved.partition.as_str(),
&shape,
)?;
let body = wrap_pit_id_in_response(stripped, signer.as_ref(), &cluster);
Ok(PipelineResponse {
status: outcome.status,
body,
content_type: None,
})
}
pub(crate) async fn pit_create(
&self,
ctx: &RequestCtx<'_>,
trace: &mut RequestTrace,
) -> Result<PipelineResponse, RequestError> {
let resolved = self.resolve_with_retry(ctx).await?;
trace.record_resolve(resolve_info(&resolved));
let target = &resolved.decision.target;
let op = CursorOp::new(
target.cluster.clone(),
ctx.method(),
format!("/{}/_search/point_in_time", target.index.as_str()),
ctx.body().to_vec(),
)
.with_endpoint(target.endpoint.clone())
.with_query(forwardable_query(ctx.query()))
.with_trace(self.upstream_trace(ctx))
.with_forward_headers(ctx.forward_headers().to_vec());
let outcome = self.sink.cursor(op).await?;
let body = match &self.cursor_signer {
Some(signer) => wrap_pit_id_in_response(outcome.body, signer.as_ref(), &target.cluster),
None => outcome.body,
};
trace.record_dispatch(DispatchInfo {
cluster: target.cluster.clone(),
upstream_status: outcome.status,
pool_reuse: outcome.pool_reuse,
});
Ok(PipelineResponse {
status: outcome.status,
body,
content_type: None,
})
}
pub(crate) async fn pit_delete(
&self,
ctx: &RequestCtx<'_>,
trace: &mut RequestTrace,
wrapped_ids: &[String],
) -> Result<PipelineResponse, RequestError> {
let signer = self.cursor_signer.as_ref().ok_or(RequestError::Cursor {
reason: "cursor affinity is not enabled",
})?;
let mut by_cluster: BTreeMap<ClusterId, Vec<String>> = BTreeMap::new();
for wrapped in wrapped_ids {
let (cluster, real) = osproxy_core::cursor::unwrap(signer.as_ref(), wrapped).ok_or(
RequestError::Cursor {
reason: "pit envelope is invalid or expired",
},
)?;
by_cluster.entry(cluster).or_default().push(real);
}
let mut merged_pits: Vec<serde_json::Value> = Vec::new();
let mut status = 200;
for (cluster, real_ids) in by_cluster {
let body = serde_json::json!({ "pit_id": real_ids });
let op = CursorOp::new(
cluster.clone(),
ctx.method(),
"/_search/point_in_time",
serde_json::to_vec(&body).unwrap_or_default(),
)
.with_endpoint(self.router.cluster_endpoint(&cluster))
.with_trace(self.upstream_trace(ctx))
.with_forward_headers(ctx.forward_headers().to_vec());
let outcome = self.sink.cursor(op).await?;
if outcome.status >= 400 {
status = outcome.status;
}
trace.record_dispatch(DispatchInfo {
cluster,
upstream_status: outcome.status,
pool_reuse: outcome.pool_reuse,
});
if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&outcome.body) {
if let Some(pits) = v.get("pits").and_then(serde_json::Value::as_array) {
merged_pits.extend(pits.iter().cloned());
}
}
}
let body = serde_json::to_vec(&serde_json::json!({ "pits": merged_pits }))
.unwrap_or_else(|_| br#"{"pits":[]}"#.to_vec());
Ok(PipelineResponse {
status,
body,
content_type: None,
})
}
}