use crate::bridge::envelope::{Payload, PhysicalPlan, Response, Status};
use crate::control::gateway::GatewayErrorMap;
use crate::control::gateway::core::QueryContext;
use crate::control::server::dispatch_utils;
use crate::control::server::wal_dispatch;
use crate::control::state::SharedState;
use crate::types::{DatabaseId, Lsn, RequestId, TraceId, VShardId};
use super::session::RespSession;
pub(super) async fn dispatch_kv(
state: &SharedState,
session: &RespSession,
plan: PhysicalPlan,
) -> crate::Result<Response> {
match state.gateway.as_ref() {
Some(gw) => {
let gw_ctx = QueryContext {
tenant_id: session.tenant_id,
trace_id: TraceId::generate(),
database_id: DatabaseId::DEFAULT,
};
gw.execute(&gw_ctx, plan)
.await
.map_err(|e| crate::Error::Bridge {
detail: GatewayErrorMap::to_resp(&e),
})
.map(gateway_payloads_to_response)
}
None => {
let vshard =
VShardId::from_collection_in_database(DatabaseId::DEFAULT, &session.collection);
dispatch_utils::dispatch_to_data_plane(
state,
session.tenant_id,
vshard,
plan,
TraceId::ZERO,
)
.await
.map_err(map_busy_error)
}
}
}
pub(super) async fn dispatch_kv_write(
state: &SharedState,
session: &RespSession,
plan: PhysicalPlan,
) -> crate::Result<Response> {
let vshard = VShardId::from_collection_in_database(DatabaseId::DEFAULT, &session.collection);
wal_dispatch::wal_append_if_write(
&state.wal,
session.tenant_id,
vshard,
DatabaseId::DEFAULT,
&plan,
)?;
match state.gateway.as_ref() {
Some(gw) => {
let gw_ctx = QueryContext {
tenant_id: session.tenant_id,
trace_id: TraceId::generate(),
database_id: DatabaseId::DEFAULT,
};
gw.execute(&gw_ctx, plan)
.await
.map_err(|e| crate::Error::Bridge {
detail: GatewayErrorMap::to_resp(&e),
})
.map(gateway_payloads_to_response)
}
None => dispatch_utils::dispatch_to_data_plane(
state,
session.tenant_id,
vshard,
plan,
TraceId::ZERO,
)
.await
.map_err(map_busy_error),
}
}
/// Wraps the payloads returned by the gateway into a single `Response`.
///
/// Only the first element of `payloads` is used as the response payload; any
/// further elements are dropped. An empty vector yields `Payload::empty()`.
/// Every other field is fixed: request id 0, `Status::Ok`, attempt 0, not
/// partial, watermark LSN 0 and no error code.
fn gateway_payloads_to_response(payloads: Vec<Vec<u8>>) -> Response {
    let first_payload = match payloads.into_iter().next() {
        Some(bytes) => Payload::from_vec(bytes),
        None => Payload::empty(),
    };

    Response {
        request_id: RequestId::new(0),
        status: Status::Ok,
        attempt: 0,
        partial: false,
        payload: first_payload,
        watermark_lsn: Lsn::new(0),
        error_code: None,
    }
}
fn map_busy_error(e: crate::Error) -> crate::Error {
match &e {
crate::Error::Bridge { .. } | crate::Error::Dispatch { .. } => crate::Error::Bridge {
detail: "BUSY NodeDB is processing requests, retry later".into(),
},
_ => e,
}
}
/// Reads `field` from a JSON-encoded payload as an `i64`.
///
/// The payload bytes are parsed with `sonic_rs::from_slice` into a
/// `serde_json::Value`, and `field` is then looked up on that value.
///
/// Returns `None` when parsing fails, when `get(field)` finds nothing, or
/// when `as_i64` cannot produce a value for the field.
pub(super) fn parse_json_field_i64(
    payload: &crate::bridge::envelope::Payload,
    field: &str,
) -> Option<i64> {
    let document: serde_json::Value = match sonic_rs::from_slice(payload) {
        Ok(parsed) => parsed,
        // The parse error itself is not reported; callers only see `None`.
        Err(_) => return None,
    };

    document.get(field).and_then(|value| value.as_i64())
}