use crate::bridge::envelope::{Payload, Response, Status};
use crate::control::gateway::GatewayErrorMap;
use crate::control::gateway::core::QueryContext as GatewayQueryContext;
use crate::control::planner::physical::PhysicalTask;
use crate::control::server::{dispatch_utils, wal_dispatch};
use crate::types::{Lsn, RequestId, TraceId};
use super::DispatchCtx;
pub(super) async fn dispatch_task_via_gateway(
ctx: &DispatchCtx<'_>,
task: PhysicalTask,
) -> crate::Result<Response> {
let vshard_id = task.vshard_id;
let tenant_id = task.tenant_id;
let plan = task.plan;
match ctx.state.gateway.as_ref() {
Some(gw) => {
let gw_ctx = GatewayQueryContext {
tenant_id,
trace_id: TraceId::generate(),
database_id: nodedb_types::id::DatabaseId::DEFAULT,
};
gw.execute(&gw_ctx, plan)
.await
.map_err(|e| {
let (code, msg) = GatewayErrorMap::to_native(&e);
crate::Error::Internal {
detail: format!("gateway error {code}: {msg}"),
}
})
.map(payloads_to_response)
}
None => {
wal_dispatch::wal_append_if_write(
&ctx.state.wal,
tenant_id,
vshard_id,
crate::types::DatabaseId::DEFAULT,
&plan,
)?;
dispatch_utils::dispatch_to_data_plane(
ctx.state,
tenant_id,
vshard_id,
plan,
TraceId::ZERO,
)
.await
}
}
}
fn payloads_to_response(payloads: Vec<Vec<u8>>) -> Response {
let payload = payloads
.into_iter()
.next()
.map(Payload::from_vec)
.unwrap_or_else(Payload::empty);
Response {
request_id: RequestId::new(0),
status: Status::Ok,
attempt: 0,
partial: false,
payload,
watermark_lsn: Lsn::new(0),
error_code: None,
}
}