use osproxy_core::{ClusterId, TraceContext};
use osproxy_observe::DispatchInfo;
use osproxy_sink::{CursorOp, Reader, Sink, WriteAck, WriteBatch};
use osproxy_spi::RequestCtx;
use osproxy_tenancy::{Resolved, Router};
use crate::asyncwrite::WriteMode;
use crate::cursor::{
cursor_request, forwardable_query, has_scroll_id, pit_id_in_body, rewrite_cursor_body,
wrap_scroll_id_in_response,
};
use crate::error::RequestError;
use crate::observe::{dispatch_info, read_dispatch_info, resolve_info, rewrite_info};
use crate::pipeline::{Pipeline, PipelineResponse};
use crate::plan::build_write_batch;
use crate::read::{
build_delete_op, build_read_op, build_search_op, not_found_body, shape_delete, shape_found,
shape_hits,
};
use crate::retry::with_retry;
use osproxy_observe::RequestTrace;
impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
/// Ingests a single document write.
///
/// Resolves the request, builds the write batch from the request body and
/// records both steps on `trace`. In async write mode the batch is handed to
/// `enqueue_async` and its response is returned directly; otherwise the
/// write is gated on the resolved epoch, sent through the sink, and the
/// acknowledgement is recorded and shaped into the response.
///
/// # Errors
///
/// Propagates failures from resolution, batch construction, the write gate
/// and the sink write.
pub(crate) async fn ingest_doc(
    &self,
    ctx: &RequestCtx<'_>,
    trace: &mut RequestTrace,
) -> Result<PipelineResponse, RequestError> {
    let route = self.resolve_with_retry(ctx).await?;
    trace.record_resolve(resolve_info(&route));

    let batch = build_write_batch(&route, ctx.body())?;
    trace.record_rewrite(rewrite_info(&route, &batch));

    let deferred = self.write_mode(ctx) == WriteMode::Async;
    if deferred {
        // Async mode: the gate and the sink write below are not run here.
        let queued = self.enqueue_async(ctx, &route, batch).await;
        return Ok(queued);
    }

    self.gate_write(&route).await?;

    let up_trace = self.upstream_trace(ctx);
    let outbound = batch
        .with_trace(up_trace.as_ref())
        .with_forward_headers(ctx.forward_headers());
    let receipt = self.sink.write(outbound).await?;

    trace.record_dispatch(dispatch_info(&route, &receipt));
    Ok(response_for(&route, &receipt))
}
/// Resolves `ctx` through the router, running the call under `with_retry`
/// with this pipeline's retry setting.
///
/// # Errors
///
/// The error left after `with_retry` returns is converted into a
/// [`RequestError`].
pub(crate) async fn resolve_with_retry(
    &self,
    ctx: &RequestCtx<'_>,
) -> Result<Resolved, RequestError> {
    match with_retry(self.retry, || self.router.resolve(ctx)).await {
        Ok(resolved) => Ok(resolved),
        Err(failure) => Err(failure.into()),
    }
}
/// Asks the router whether a write stamped with the resolved epoch may
/// proceed for the resolved partition.
///
/// # Errors
///
/// Returns [`RequestError::StaleEpoch`] carrying the stamped epoch when the
/// router does not admit the write.
async fn gate_write(&self, resolved: &Resolved) -> Result<(), RequestError> {
    let stamped = resolved.decision.epoch;
    let admitted = self.router.admit_write(&resolved.partition, stamped).await;
    if !admitted {
        return Err(RequestError::StaleEpoch { stamped });
    }
    Ok(())
}
/// Ingests a bulk request by delegating to the `crate::bulk` module.
///
/// In async write mode the request goes to `ingest_bulk_async` with the
/// write queue; otherwise it goes to `ingest_bulk` with the sink. The
/// per-request trace is not used by this method.
///
/// # Errors
///
/// Returns whatever error the delegated bulk function produces.
pub(crate) async fn ingest_bulk(
    &self,
    ctx: &RequestCtx<'_>,
    _trace: &mut RequestTrace,
) -> Result<PipelineResponse, RequestError> {
    // Write mode is evaluated before the upstream trace, as both branches
    // need the trace but only the mode decides the destination.
    let deferred = self.write_mode(ctx) == WriteMode::Async;
    let up_trace = self.upstream_trace(ctx);

    if deferred {
        crate::bulk::ingest_bulk_async(
            &self.router,
            self.write_queue.as_ref(),
            ctx,
            self.retry,
            up_trace,
        )
        .await
    } else {
        crate::bulk::ingest_bulk(&self.router, &self.sink, ctx, self.retry, up_trace).await
    }
}
/// Fetches a single document by its id.
///
/// Resolves the request, builds the read operation for the document id and
/// issues it through the sink, recording the resolve and dispatch steps on
/// `trace`. A found document is shaped with `shape_found` and returned with
/// status 200; otherwise a not-found body is returned with status 404.
///
/// # Errors
///
/// Returns [`RequestError::Internal`] when the request carries no document
/// id, and propagates failures from resolution, read-op construction, the
/// sink read and response shaping.
pub(crate) async fn get_by_id(
    &self,
    ctx: &RequestCtx<'_>,
    trace: &mut RequestTrace,
) -> Result<PipelineResponse, RequestError> {
    let route = self.resolve_with_retry(ctx).await?;
    trace.record_resolve(resolve_info(&route));

    let Some(logical_id) = ctx.doc_id() else {
        return Err(RequestError::Internal {
            reason: "get-by-id reached the engine without a document id",
        });
    };

    let (read_op, shape) = build_read_op(&route, logical_id)?;
    let request = read_op
        .with_trace(self.upstream_trace(ctx))
        .with_forward_headers(ctx.forward_headers().to_vec());
    let fetched = self.sink.get(request).await?;

    trace.record_dispatch(read_dispatch_info(
        &route,
        fetched.status,
        fetched.pool_reuse,
    ));

    // The response status is derived from `found`, not from the upstream status.
    let (status, body) = if fetched.found {
        let shaped = shape_found(
            &fetched.body,
            ctx.logical_index(),
            logical_id,
            &shape.inject_names,
        )?;
        (200, shaped)
    } else {
        (404, not_found_body(ctx.logical_index(), logical_id))
    };

    Ok(PipelineResponse {
        status,
        body,
        content_type: None,
    })
}
/// Handles a multi-get request by delegating to `crate::mget::multi_get`
/// with this pipeline's router, sink, retry setting and upstream trace.
/// The per-request trace is not used by this method.
///
/// # Errors
///
/// Returns whatever error the delegated function produces.
pub(crate) async fn multi_get(
    &self,
    ctx: &RequestCtx<'_>,
    _trace: &mut RequestTrace,
) -> Result<PipelineResponse, RequestError> {
    let up_trace = self.upstream_trace(ctx);
    crate::mget::multi_get(&self.router, &self.sink, ctx, self.retry, up_trace).await
}
/// Deletes a single document by its id.
///
/// Resolves the request and builds a delete operation wrapped in a
/// single-op write batch. In async write mode the batch is handed to
/// `enqueue_async`; otherwise the write is gated on the resolved epoch and
/// sent through the sink. The response status is the status of the first
/// result in the acknowledgement, or 200 when the acknowledgement has no
/// results.
///
/// # Errors
///
/// Returns [`RequestError::Internal`] when the request carries no document
/// id, and propagates failures from resolution, delete-op construction, the
/// write gate and the sink write.
pub(crate) async fn delete_by_id(
    &self,
    ctx: &RequestCtx<'_>,
    trace: &mut RequestTrace,
) -> Result<PipelineResponse, RequestError> {
    let route = self.resolve_with_retry(ctx).await?;
    trace.record_resolve(resolve_info(&route));

    let Some(logical_id) = ctx.doc_id() else {
        return Err(RequestError::Internal {
            reason: "delete-by-id reached the engine without a document id",
        });
    };

    let delete_op = build_delete_op(&route, logical_id)?;
    let batch = WriteBatch::single(delete_op);

    let deferred = self.write_mode(ctx) == WriteMode::Async;
    if deferred {
        let queued = self.enqueue_async(ctx, &route, batch).await;
        return Ok(queued);
    }

    self.gate_write(&route).await?;

    let up_trace = self.upstream_trace(ctx);
    let outbound = batch
        .with_trace(up_trace.as_ref())
        .with_forward_headers(ctx.forward_headers());
    let receipt = self.sink.write(outbound).await?;
    trace.record_dispatch(dispatch_info(&route, &receipt));

    let status = match receipt.results().first() {
        Some(first) => first.status,
        None => 200,
    };
    let body = shape_delete(ctx.logical_index(), logical_id, status);

    Ok(PipelineResponse {
        status,
        body,
        content_type: None,
    })
}
/// Runs a search request.
///
/// When a cursor signer is configured and `pit_id_in_body` finds an id in
/// the request body, the request is handed to `pit_search`. Otherwise the
/// request is resolved, a search operation is built from the body and sent
/// through the sink with the forwardable query, upstream trace and forward
/// headers attached. The hits are shaped, any scroll id in the response is
/// wrapped via `wrap_scroll_id`, and the upstream status is returned.
///
/// # Errors
///
/// Propagates failures from resolution, search-op construction, the sink
/// search and hit shaping, as well as any error from `pit_search`.
pub(crate) async fn search(
    &self,
    ctx: &RequestCtx<'_>,
    trace: &mut RequestTrace,
) -> Result<PipelineResponse, RequestError> {
    // The body is only inspected for a PIT id when a signer is configured.
    let pit = self
        .cursor_signer
        .as_ref()
        .and_then(|_| pit_id_in_body(ctx.body()));
    if let Some(wrapped) = pit {
        return self.pit_search(ctx, trace, &wrapped).await;
    }

    let route = self.resolve_with_retry(ctx).await?;
    trace.record_resolve(resolve_info(&route));

    let (search_op, shape) = build_search_op(&route, ctx.body())?;
    let request = search_op
        .with_query(forwardable_query(ctx.query()))
        .with_trace(self.upstream_trace(ctx))
        .with_forward_headers(ctx.forward_headers().to_vec());
    let found = self.sink.search(request).await?;

    trace.record_dispatch(read_dispatch_info(
        &route,
        found.status,
        found.pool_reuse,
    ));

    let shaped = shape_hits(
        &found.body,
        ctx.logical_index(),
        route.partition.as_str(),
        &shape,
    )?;

    Ok(PipelineResponse {
        status: found.status,
        body: self.wrap_scroll_id(shaped, &route.decision.target.cluster),
        content_type: None,
    })
}
/// Wraps the scroll id in a response body for the given cluster.
///
/// The body is returned unchanged when no cursor signer is configured or
/// when `has_scroll_id` reports no scroll id in it.
fn wrap_scroll_id(&self, body: Vec<u8>, cluster: &ClusterId) -> Vec<u8> {
    match &self.cursor_signer {
        Some(signer) if has_scroll_id(&body) => {
            wrap_scroll_id_in_response(body, signer.as_ref(), cluster)
        }
        _ => body,
    }
}
/// Handles a multi-search request by delegating to
/// `crate::msearch::multi_search` with this pipeline's router, sink, retry
/// setting and upstream trace. The per-request trace is not used by this
/// method.
///
/// # Errors
///
/// Returns whatever error the delegated function produces.
pub(crate) async fn multi_search(
    &self,
    ctx: &RequestCtx<'_>,
    _trace: &mut RequestTrace,
) -> Result<PipelineResponse, RequestError> {
    let up_trace = self.upstream_trace(ctx);
    crate::msearch::multi_search(&self.router, &self.sink, ctx, self.retry, up_trace).await
}
/// Runs a count request.
///
/// Resolves the request, builds a search operation from the body (its
/// response shape is not needed here) and issues it through the sink's
/// `count`. The response body is a JSON object with a single `count` field,
/// returned with the upstream status.
///
/// # Errors
///
/// Propagates failures from resolution, search-op construction and the sink
/// count call.
pub(crate) async fn count(
    &self,
    ctx: &RequestCtx<'_>,
    trace: &mut RequestTrace,
) -> Result<PipelineResponse, RequestError> {
    let route = self.resolve_with_retry(ctx).await?;
    trace.record_resolve(resolve_info(&route));

    let (search_op, _shape) = build_search_op(&route, ctx.body())?;
    let request = search_op
        .with_trace(self.upstream_trace(ctx))
        .with_forward_headers(ctx.forward_headers().to_vec());
    let tally = self.sink.count(request).await?;

    trace.record_dispatch(read_dispatch_info(
        &route,
        tally.status,
        tally.pool_reuse,
    ));

    let rendered = format!(r#"{{"count":{}}}"#, tally.count);
    Ok(PipelineResponse {
        status: tally.status,
        body: rendered.into_bytes(),
        content_type: None,
    })
}
/// Handles cursor requests.
///
/// Requires a configured cursor signer. Dispatch order:
/// 1. a non-empty logical index sends the request to `pit_create`;
/// 2. ids found by `pit_ids_in_delete_body` send it to `pit_delete`;
/// 3. otherwise the cursor id from the request is unwrapped into a cluster
///    and an upstream id, the body is rewritten with that id, and the
///    operation is sent through the sink's `cursor` to that cluster.
///
/// On the third path any scroll id in the response is wrapped again for the
/// same cluster and the dispatch is recorded on `trace`.
///
/// # Errors
///
/// Returns [`RequestError::Cursor`] when no signer is configured, when the
/// request has no cursor id, or when the envelope cannot be unwrapped.
/// Propagates failures from the sink and from the PIT handlers.
pub(crate) async fn cursor(
    &self,
    ctx: &RequestCtx<'_>,
    trace: &mut RequestTrace,
) -> Result<PipelineResponse, RequestError> {
    let signer = self.cursor_signer.as_ref().ok_or(RequestError::Cursor {
        reason: "cursor affinity is not enabled",
    })?;

    if !ctx.logical_index().is_empty() {
        return self.pit_create(ctx, trace).await;
    }
    if let Some(pit_ids) = crate::cursor::pit_ids_in_delete_body(ctx.body()) {
        return self.pit_delete(ctx, trace, &pit_ids).await;
    }

    let Some(req) = cursor_request(ctx) else {
        return Err(RequestError::Cursor {
            reason: "no cursor id in the request",
        });
    };
    let Some((cluster, real_id)) = osproxy_core::cursor::unwrap(signer.as_ref(), &req.wrapped)
    else {
        return Err(RequestError::Cursor {
            reason: "cursor envelope is invalid or expired",
        });
    };

    let upstream_body = rewrite_cursor_body(ctx.body(), req.id_field, &real_id);
    let op = CursorOp::new(
        cluster.clone(),
        ctx.method(),
        req.upstream_path,
        upstream_body,
    )
    .with_endpoint(self.router.cluster_endpoint(&cluster))
    .with_trace(self.upstream_trace(ctx))
    .with_forward_headers(ctx.forward_headers().to_vec());

    let reply = self.sink.cursor(op).await?;

    // Wrap before recording: `cluster` is moved into the dispatch record.
    let body = self.wrap_scroll_id(reply.body, &cluster);
    let status = reply.status;
    trace.record_dispatch(DispatchInfo {
        cluster,
        upstream_status: status,
        pool_reuse: reply.pool_reuse,
    });

    Ok(PipelineResponse {
        status,
        body,
        content_type: None,
    })
}
}
/// Builds the [`TraceContext`] for a request from its headers.
///
/// Passes the `traceparent` and `tracestate` headers, the B3 value produced
/// by `b3_single`, and the request id to `TraceContext::propagate_with_b3`.
pub(crate) fn wire_trace(ctx: &RequestCtx<'_>) -> TraceContext {
    let b3 = b3_single(ctx);
    let headers = ctx.headers();
    let traceparent = headers.get("traceparent");
    let tracestate = headers.get("tracestate");
    TraceContext::propagate_with_b3(traceparent, tracestate, b3.as_deref(), ctx.request_id())
}
/// Produces a single-string B3 value from the request headers.
///
/// A `b3` header, when present, is returned as-is. Otherwise the value is
/// assembled as `{x-b3-traceid}-{x-b3-spanid}`, with `-{x-b3-sampled}`
/// appended when that header is present. Returns `None` when there is no
/// `b3` header and either the trace id or the span id header is missing.
///
/// Only these four headers are read; `x-b3-parentspanid` and `x-b3-flags`
/// are not consulted here.
fn b3_single(ctx: &RequestCtx<'_>) -> Option<String> {
    let headers = ctx.headers();
    match headers.get("b3") {
        Some(single) => Some(single.to_owned()),
        None => {
            let trace_id = headers.get("x-b3-traceid")?;
            let span_id = headers.get("x-b3-spanid")?;
            let joined = match headers.get("x-b3-sampled") {
                Some(sampled) => format!("{trace_id}-{span_id}-{sampled}"),
                None => format!("{trace_id}-{span_id}"),
            };
            Some(joined)
        }
    }
}
/// Shapes the client response for a single-document write from its
/// acknowledgement.
///
/// Uses the first result of `ack`: the body is a JSON object with the
/// logical `_id` (via `crate::read::logical_write_id`) and a `result` of
/// `"created"` or `"updated"`, and the status is that result's status. When
/// the acknowledgement has no results, or the body fails to serialize, the
/// body is `{}`; the no-results case also uses status 200.
fn response_for(resolved: &Resolved, ack: &WriteAck) -> PipelineResponse {
    match ack.results().first() {
        None => PipelineResponse {
            status: 200,
            body: b"{}".to_vec(),
            content_type: None,
        },
        Some(first) => {
            let verb = if first.created { "created" } else { "updated" };
            let logical_id = crate::read::logical_write_id(resolved, &first.id);
            let payload = serde_json::json!({
                "_id": logical_id,
                "result": verb,
            });
            let body = serde_json::to_vec(&payload).unwrap_or_else(|_| b"{}".to_vec());
            PipelineResponse {
                status: first.status,
                body,
                content_type: None,
            }
        }
    }
}