use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use core_api::{
ServiceRequestId, SnapshotConfig, UdpServiceClient, UdpServiceIncoming,
UdpServiceServer,
};
use introspection_core::{
MiddlewareLoadReport, ResourceCatalogReport, RuntimeLoadReport, StatusSnapshot,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
// ---- Service identity -------------------------------------------------------
/// API version string written into every request/response built in this file and
/// compared by `validate_request` / `validate_response` (via the `STATUS_*` alias).
pub const GATEWAY_SERVICE_API_VERSION: &str = "robotrt.gateway.service.v1";
/// Service name; `fetch_snapshot_from_endpoint` passes it (via the `STATUS_*` alias)
/// to the client's `call_json`.
pub const GATEWAY_SERVICE_NAME: &str = "/robotrt/gateway/query";
/// Alias of `GATEWAY_SERVICE_API_VERSION`.
pub const STATUS_SERVICE_API_VERSION: &str = GATEWAY_SERVICE_API_VERSION;
/// Alias of `GATEWAY_SERVICE_NAME`.
pub const STATUS_SERVICE_NAME: &str = GATEWAY_SERVICE_NAME;
// ---- Operation identifiers --------------------------------------------------
// Values carried in `StatusServiceRequest::op`. `validate_request` rejects any op
// string that is not one of the constants below.
pub const STATUS_OP_SNAPSHOT: &str = "snapshot";
pub const STATUS_OP_RUNTIME: &str = "runtime";
pub const STATUS_OP_MIDDLEWARE: &str = "middleware";
pub const STATUS_OP_RESOURCE: &str = "resource";
// Requires `StatusServiceRequest::service_call`; traffic kind "service".
pub const STATUS_OP_SERVICE_CALL: &str = "service_call";
// Topic ops: require `op_payload`; traffic kind "topic".
pub const STATUS_OP_TOPIC_SUBSCRIBE: &str = "topic_subscribe";
pub const STATUS_OP_TOPIC_POLL: &str = "topic_poll";
pub const STATUS_OP_TOPIC_UNSUBSCRIBE: &str = "topic_unsubscribe";
pub const STATUS_OP_TOPIC_PUBLISH: &str = "topic_publish";
// Action ops: require `op_payload`; traffic kind "action".
pub const STATUS_OP_ACTION_SEND: &str = "action_send";
pub const STATUS_OP_ACTION_CANCEL: &str = "action_cancel";
pub const STATUS_OP_ACTION_WATCH: &str = "action_watch";
// Mission ops: require `op_payload`; traffic kind "mission".
pub const STATUS_OP_MISSION_REQUEST: &str = "mission_request";
pub const STATUS_OP_MISSION_REPLY: &str = "mission_reply";
pub const STATUS_OP_MISSION_WATCH: &str = "mission_watch";
// Gateway ops: traffic kind "governance". Of these, only `gateway_policy_set` and
// `gateway_audit_list` require `op_payload` in `validate_request`.
pub const STATUS_OP_GATEWAY_OBSERVE: &str = "gateway_observe";
pub const STATUS_OP_GATEWAY_POLICY_GET: &str = "gateway_policy_get";
pub const STATUS_OP_GATEWAY_POLICY_SET: &str = "gateway_policy_set";
pub const STATUS_OP_GATEWAY_AUDIT_LIST: &str = "gateway_audit_list";
pub const STATUS_OP_GATEWAY_SHUTDOWN: &str = "gateway_shutdown";
// Snapshot-config ops: `snapshot_config_set` requires
// `StatusServiceRequest::snapshot_config`.
pub const STATUS_OP_SNAPSHOT_CONFIG_GET: &str = "snapshot_config_get";
pub const STATUS_OP_SNAPSHOT_CONFIG_SET: &str = "snapshot_config_set";
// ---- Other API version identifiers ------------------------------------------
// Not referenced by the code visible in this part of the file.
pub const NODE_INFO_API_VERSION: &str = "robotrt.node.info.v1";
pub const TOPIC_INFO_API_VERSION: &str = "robotrt.topic.info.v1";
pub const ACTION_INFO_API_VERSION: &str = "robotrt.action.info.v1";
pub const TOPIC_HZ_API_VERSION: &str = "robotrt.topic.hz.v1";
pub const TOPIC_ECHO_API_VERSION: &str = "robotrt.topic.echo.v1";
// ---- Gateway metadata values ------------------------------------------------
/// The only `plane` value accepted by `validate_request` and `validate_response`.
pub const GATEWAY_PLANE_GOVERNANCE: &str = "governance";
/// The only `route_mode` value accepted by `validate_request` and `validate_response`.
pub const GATEWAY_ROUTE_MODE_DIRECT_OR_NEARBY: &str = "direct_or_nearby";
// ---- Error codes ------------------------------------------------------------
// Used as the `code` prefix of strings produced by `gateway_error`.
pub const GATEWAY_ERR_UNSUPPORTED_API_VERSION: &str = "GW_ERR_UNSUPPORTED_API_VERSION";
pub const GATEWAY_ERR_UNSUPPORTED_OP: &str = "GW_ERR_UNSUPPORTED_OP";
pub const GATEWAY_ERR_MISSING_FIELD: &str = "GW_ERR_MISSING_FIELD";
pub const GATEWAY_ERR_INVALID_META: &str = "GW_ERR_INVALID_META";
pub const GATEWAY_ERR_DATA_PROXY_FORBIDDEN: &str = "GW_ERR_DATA_PROXY_FORBIDDEN";
// NOTE(review): not used by any function visible in this part of the file.
pub const GATEWAY_ERR_DATA_PLANE_BLOCKED: &str = "GW_ERR_DATA_PLANE_BLOCKED";
/// Counter backing `next_request_id`; the first id handed out is 1.
static NEXT_STATUS_REQUEST_ID: AtomicU64 = AtomicU64::new(1);
/// Gateway metadata a client attaches to a `StatusServiceRequest`.
///
/// `validate_request` accepts a request only when `plane` is the governance plane,
/// `route_mode` is `direct_or_nearby`, `allow_gateway_data_proxy` is `false`, and
/// `traffic_kind` matches the kind derived from the request's op.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GatewayRequestMeta {
    /// Plane the request targets; must equal `GATEWAY_PLANE_GOVERNANCE` to validate.
    pub plane: String,
    /// Routing mode; must equal `GATEWAY_ROUTE_MODE_DIRECT_OR_NEARBY` to validate.
    pub route_mode: String,
    /// Traffic category ("topic", "service", "action", "mission" or "governance").
    pub traffic_kind: String,
    /// Requests with this flag set to `true` are rejected by `validate_request`.
    pub allow_gateway_data_proxy: bool,
}

impl Default for GatewayRequestMeta {
    /// Governance-plane metadata with the data proxy disabled.
    fn default() -> Self {
        let plane = String::from(GATEWAY_PLANE_GOVERNANCE);
        let route_mode = String::from(GATEWAY_ROUTE_MODE_DIRECT_OR_NEARBY);
        Self {
            plane,
            route_mode,
            traffic_kind: "governance".to_owned(),
            allow_gateway_data_proxy: false,
        }
    }
}
/// Gateway metadata attached to every `StatusServiceResponse`.
///
/// `validate_response` rejects a response whose `plane` or `route_mode` differs
/// from the governance defaults, or whose `data_plane_forwarding` is `true`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GatewayResponseMeta {
    /// Plane reported by the responder; expected to be `GATEWAY_PLANE_GOVERNANCE`.
    pub plane: String,
    /// Routing mode reported by the responder; expected to be
    /// `GATEWAY_ROUTE_MODE_DIRECT_OR_NEARBY`.
    pub route_mode: String,
    /// `true` makes `validate_response` fail with `GATEWAY_ERR_DATA_PROXY_FORBIDDEN`.
    pub data_plane_forwarding: bool,
}

impl Default for GatewayResponseMeta {
    /// Governance-plane metadata with data-plane forwarding switched off.
    fn default() -> Self {
        let plane = String::from(GATEWAY_PLANE_GOVERNANCE);
        let route_mode = String::from(GATEWAY_ROUTE_MODE_DIRECT_OR_NEARBY);
        Self {
            plane,
            route_mode,
            data_plane_forwarding: false,
        }
    }
}
/// Payload of a `service_call` request.
///
/// `validate_request` requires `StatusServiceRequest::service_call` to hold one of
/// these when the op is `STATUS_OP_SERVICE_CALL`; `build_service_call_request`
/// constructs it.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ServiceCallPayload {
/// Service identifier — presumably the name of the service to invoke; confirm
/// against the request handler.
pub service: String,
/// Request body as a string; its encoding is not established in this file.
pub request: String,
/// Optional timeout, in milliseconds judging by the field name. How `None` is
/// interpreted is decided by the handler, not here.
pub timeout_ms: Option<u64>,
}
/// Result of a `service_call` op, returned in `StatusServiceResponse::service_call`
/// by `success_service_call_response`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ServiceCallResult {
/// Service identifier — presumably echoes `ServiceCallPayload::service`; confirm
/// against the handler.
pub service: String,
/// Acceptance flag; the exact meaning is defined by the handler that fills it in.
pub accepted: bool,
/// Response body, when one is available.
pub response: Option<String>,
/// Error description, when one is available.
pub error: Option<String>,
}
/// Request envelope for the status/gateway service.
///
/// Built by the `build_*` helpers in this file and checked by `validate_request`.
#[derive(Debug, Serialize, Deserialize)]
pub struct StatusServiceRequest {
/// Must equal `STATUS_SERVICE_API_VERSION` to pass validation.
pub api_version: String,
/// Operation identifier; must be one of the `STATUS_OP_*` constants.
pub op: String,
/// Required when `op` is `STATUS_OP_SNAPSHOT_CONFIG_SET`.
pub snapshot_config: Option<SnapshotConfig>,
/// Required when `op` is `STATUS_OP_SERVICE_CALL`.
pub service_call: Option<ServiceCallPayload>,
/// Free-form JSON payload; required for the topic, action and mission ops and
/// for `gateway_policy_set` / `gateway_audit_list`.
pub op_payload: Option<Value>,
/// Gateway metadata. When absent, `validate_request` substitutes the default
/// metadata derived from `op`.
#[serde(default)]
pub gateway_meta: Option<GatewayRequestMeta>,
}
/// Response envelope for the status/gateway service.
///
/// Each `success_*` builder in this file fills exactly one of the optional payload
/// fields and leaves the others `None`.
#[derive(Debug, Serialize, Deserialize)]
pub struct StatusServiceResponse {
/// Checked against `STATUS_SERVICE_API_VERSION` by `validate_response`.
pub api_version: String,
/// Operation this response answers; checked against the expected op by
/// `validate_response`.
pub op: String,
/// Set by `success_response` (op `snapshot`).
pub snapshot: Option<StatusSnapshot>,
/// Set by `success_runtime_response`.
pub runtime: Option<RuntimeLoadReport>,
/// Set by `success_middleware_response`.
pub middleware: Option<MiddlewareLoadReport>,
/// Set by `success_resource_response`.
pub resource: Option<ResourceCatalogReport>,
/// Set by `success_snapshot_config_response`.
pub snapshot_config: Option<SnapshotConfig>,
/// Set by `success_service_call_response`.
pub service_call: Option<ServiceCallResult>,
/// Set by `success_op_result_response`.
pub op_result: Option<Value>,
/// Gateway metadata; a response that omits the field deserializes with
/// `GatewayResponseMeta::default()`.
#[serde(default)]
pub gateway_meta: GatewayResponseMeta,
}
/// Formats a gateway error string as `"<code>: <detail>"`.
///
/// `code` is normally one of the `GATEWAY_ERR_*` constants; `detail` may be any
/// string-like value.
pub fn gateway_error(code: &str, detail: impl AsRef<str>) -> String {
    let detail = detail.as_ref();
    format!("{code}: {detail}")
}
fn op_traffic_kind(op: &str) -> &'static str {
match op {
STATUS_OP_TOPIC_SUBSCRIBE | STATUS_OP_TOPIC_POLL | STATUS_OP_TOPIC_UNSUBSCRIBE
| STATUS_OP_TOPIC_PUBLISH => "topic",
STATUS_OP_SERVICE_CALL => "service",
STATUS_OP_ACTION_SEND | STATUS_OP_ACTION_CANCEL | STATUS_OP_ACTION_WATCH => "action",
STATUS_OP_MISSION_REQUEST | STATUS_OP_MISSION_REPLY | STATUS_OP_MISSION_WATCH => "mission",
_ => "governance",
}
}
/// Default request metadata for `op`: the `GatewayRequestMeta` defaults with
/// `traffic_kind` replaced by the kind that `op_traffic_kind` derives from `op`.
fn default_gateway_request_meta(op: &str) -> GatewayRequestMeta {
    let traffic_kind = String::from(op_traffic_kind(op));
    GatewayRequestMeta {
        traffic_kind,
        ..Default::default()
    }
}
/// Returns the metadata to validate for `req`: a copy of the metadata the request
/// carries, or the op-derived default when the request has none.
fn request_gateway_meta(req: &StatusServiceRequest) -> GatewayRequestMeta {
    match &req.gateway_meta {
        Some(meta) => meta.clone(),
        None => default_gateway_request_meta(&req.op),
    }
}
fn default_gateway_response_meta() -> GatewayResponseMeta {
GatewayResponseMeta::default()
}
/// Hands out the next request id from the process-wide counter.
///
/// Each call returns the counter's current value and increments it by one.
pub fn next_request_id() -> ServiceRequestId {
    let raw_id = NEXT_STATUS_REQUEST_ID.fetch_add(1, Ordering::Relaxed);
    ServiceRequestId(raw_id)
}
/// Builds a payload-free request for `op`.
///
/// The request carries the current API version and the default gateway metadata
/// for `op`; `snapshot_config`, `service_call` and `op_payload` are all `None`.
pub fn build_request(op: &str) -> StatusServiceRequest {
    let gateway_meta = Some(default_gateway_request_meta(op));
    StatusServiceRequest {
        api_version: String::from(STATUS_SERVICE_API_VERSION),
        op: String::from(op),
        gateway_meta,
        snapshot_config: None,
        service_call: None,
        op_payload: None,
    }
}
pub fn build_config_request(op: &str, config: SnapshotConfig) -> StatusServiceRequest {
StatusServiceRequest {
api_version: STATUS_SERVICE_API_VERSION.to_string(),
op: op.to_string(),
snapshot_config: Some(config),
service_call: None,
op_payload: None,
gateway_meta: Some(default_gateway_request_meta(op)),
}
}
pub fn build_op_payload_request(op: &str, payload: Value) -> StatusServiceRequest {
StatusServiceRequest {
api_version: STATUS_SERVICE_API_VERSION.to_string(),
op: op.to_string(),
snapshot_config: None,
service_call: None,
op_payload: Some(payload),
gateway_meta: Some(default_gateway_request_meta(op)),
}
}
pub fn build_service_call_request(
service: impl Into<String>,
request: impl Into<String>,
timeout_ms: Option<u64>,
) -> StatusServiceRequest {
StatusServiceRequest {
api_version: STATUS_SERVICE_API_VERSION.to_string(),
op: STATUS_OP_SERVICE_CALL.to_string(),
snapshot_config: None,
service_call: Some(ServiceCallPayload {
service: service.into(),
request: request.into(),
timeout_ms,
}),
op_payload: None,
gateway_meta: Some(default_gateway_request_meta(STATUS_OP_SERVICE_CALL)),
}
}
/// Builds a `snapshot` request.
///
/// `_request_id` is accepted but not used: the request body has no id field, so
/// the result is identical to `build_request(STATUS_OP_SNAPSHOT)`.
pub fn build_snapshot_request(_request_id: ServiceRequestId) -> StatusServiceRequest {
build_request(STATUS_OP_SNAPSHOT)
}
pub fn validate_request(req: &StatusServiceRequest) -> Result<(), String> {
if req.api_version != STATUS_SERVICE_API_VERSION {
return Err(gateway_error(
GATEWAY_ERR_UNSUPPORTED_API_VERSION,
format!(
"unsupported api_version={} expected={}",
req.api_version, STATUS_SERVICE_API_VERSION
),
));
}
if !matches!(
req.op.as_str(),
STATUS_OP_SNAPSHOT
| STATUS_OP_RUNTIME
| STATUS_OP_MIDDLEWARE
| STATUS_OP_RESOURCE
| STATUS_OP_SERVICE_CALL
| STATUS_OP_TOPIC_SUBSCRIBE
| STATUS_OP_TOPIC_POLL
| STATUS_OP_TOPIC_UNSUBSCRIBE
| STATUS_OP_TOPIC_PUBLISH
| STATUS_OP_ACTION_SEND
| STATUS_OP_ACTION_CANCEL
| STATUS_OP_ACTION_WATCH
| STATUS_OP_MISSION_REQUEST
| STATUS_OP_MISSION_REPLY
| STATUS_OP_MISSION_WATCH
| STATUS_OP_GATEWAY_OBSERVE
| STATUS_OP_GATEWAY_POLICY_GET
| STATUS_OP_GATEWAY_POLICY_SET
| STATUS_OP_GATEWAY_AUDIT_LIST
| STATUS_OP_GATEWAY_SHUTDOWN
| STATUS_OP_SNAPSHOT_CONFIG_GET
| STATUS_OP_SNAPSHOT_CONFIG_SET
) {
return Err(gateway_error(
GATEWAY_ERR_UNSUPPORTED_OP,
format!("unsupported op={}", req.op),
));
}
let meta = request_gateway_meta(req);
if meta.plane != GATEWAY_PLANE_GOVERNANCE {
return Err(gateway_error(
GATEWAY_ERR_INVALID_META,
format!(
"unsupported gateway_meta.plane={} expected={}",
meta.plane, GATEWAY_PLANE_GOVERNANCE
),
));
}
if meta.route_mode != GATEWAY_ROUTE_MODE_DIRECT_OR_NEARBY {
return Err(gateway_error(
GATEWAY_ERR_INVALID_META,
format!(
"unsupported gateway_meta.route_mode={} expected={}",
meta.route_mode, GATEWAY_ROUTE_MODE_DIRECT_OR_NEARBY
),
));
}
if meta.allow_gateway_data_proxy {
return Err(gateway_error(
GATEWAY_ERR_DATA_PROXY_FORBIDDEN,
"gateway_meta.allow_gateway_data_proxy=true is forbidden",
));
}
let expected_kind = op_traffic_kind(&req.op);
if meta.traffic_kind != expected_kind {
return Err(gateway_error(
GATEWAY_ERR_INVALID_META,
format!(
"mismatched gateway_meta.traffic_kind={} expected={expected_kind}",
meta.traffic_kind
),
));
}
if req.op == STATUS_OP_SNAPSHOT_CONFIG_SET && req.snapshot_config.is_none() {
return Err(gateway_error(
GATEWAY_ERR_MISSING_FIELD,
"missing snapshot_config payload for op=snapshot_config_set",
));
}
if req.op == STATUS_OP_SERVICE_CALL && req.service_call.is_none() {
return Err(gateway_error(
GATEWAY_ERR_MISSING_FIELD,
"missing service_call payload for op=service_call",
));
}
if matches!(
req.op.as_str(),
STATUS_OP_TOPIC_SUBSCRIBE
| STATUS_OP_TOPIC_POLL
| STATUS_OP_TOPIC_UNSUBSCRIBE
| STATUS_OP_TOPIC_PUBLISH
| STATUS_OP_ACTION_SEND
| STATUS_OP_ACTION_CANCEL
| STATUS_OP_ACTION_WATCH
| STATUS_OP_MISSION_REQUEST
| STATUS_OP_MISSION_REPLY
| STATUS_OP_MISSION_WATCH
| STATUS_OP_GATEWAY_POLICY_SET
| STATUS_OP_GATEWAY_AUDIT_LIST
) && req.op_payload.is_none()
{
return Err(gateway_error(
GATEWAY_ERR_MISSING_FIELD,
format!("missing op_payload for op={}", req.op),
));
}
Ok(())
}
/// Builds a successful `snapshot` response carrying `snapshot`.
///
/// `_request_id` is accepted but not used; the response body has no id field.
pub fn success_response(
    _request_id: ServiceRequestId,
    snapshot: StatusSnapshot,
) -> StatusServiceResponse {
    let api_version = String::from(STATUS_SERVICE_API_VERSION);
    let op = String::from(STATUS_OP_SNAPSHOT);
    let gateway_meta = default_gateway_response_meta();
    StatusServiceResponse {
        api_version,
        op,
        gateway_meta,
        // The one payload this response carries.
        snapshot: Some(snapshot),
        runtime: None,
        middleware: None,
        resource: None,
        snapshot_config: None,
        service_call: None,
        op_result: None,
    }
}
/// Builds a successful `runtime` response carrying `report`.
pub fn success_runtime_response(report: RuntimeLoadReport) -> StatusServiceResponse {
    let api_version = String::from(STATUS_SERVICE_API_VERSION);
    let op = String::from(STATUS_OP_RUNTIME);
    let gateway_meta = default_gateway_response_meta();
    StatusServiceResponse {
        api_version,
        op,
        gateway_meta,
        // The one payload this response carries.
        runtime: Some(report),
        snapshot: None,
        middleware: None,
        resource: None,
        snapshot_config: None,
        service_call: None,
        op_result: None,
    }
}
/// Builds a successful `middleware` response carrying `report`.
pub fn success_middleware_response(report: MiddlewareLoadReport) -> StatusServiceResponse {
    let api_version = String::from(STATUS_SERVICE_API_VERSION);
    let op = String::from(STATUS_OP_MIDDLEWARE);
    let gateway_meta = default_gateway_response_meta();
    StatusServiceResponse {
        api_version,
        op,
        gateway_meta,
        // The one payload this response carries.
        middleware: Some(report),
        snapshot: None,
        runtime: None,
        resource: None,
        snapshot_config: None,
        service_call: None,
        op_result: None,
    }
}
/// Builds a successful `resource` response carrying `report`.
pub fn success_resource_response(report: ResourceCatalogReport) -> StatusServiceResponse {
    let api_version = String::from(STATUS_SERVICE_API_VERSION);
    let op = String::from(STATUS_OP_RESOURCE);
    let gateway_meta = default_gateway_response_meta();
    StatusServiceResponse {
        api_version,
        op,
        gateway_meta,
        // The one payload this response carries.
        resource: Some(report),
        snapshot: None,
        runtime: None,
        middleware: None,
        snapshot_config: None,
        service_call: None,
        op_result: None,
    }
}
/// Builds a successful `service_call` response carrying `result`.
pub fn success_service_call_response(result: ServiceCallResult) -> StatusServiceResponse {
    let api_version = String::from(STATUS_SERVICE_API_VERSION);
    let op = String::from(STATUS_OP_SERVICE_CALL);
    let gateway_meta = default_gateway_response_meta();
    StatusServiceResponse {
        api_version,
        op,
        gateway_meta,
        // The one payload this response carries.
        service_call: Some(result),
        snapshot: None,
        runtime: None,
        middleware: None,
        resource: None,
        snapshot_config: None,
        op_result: None,
    }
}
/// Builds a successful response for `op` carrying a free-form JSON `result` in
/// `op_result`.
pub fn success_op_result_response(op: &str, result: Value) -> StatusServiceResponse {
    let api_version = String::from(STATUS_SERVICE_API_VERSION);
    let op = String::from(op);
    let gateway_meta = default_gateway_response_meta();
    StatusServiceResponse {
        api_version,
        op,
        gateway_meta,
        // The one payload this response carries.
        op_result: Some(result),
        snapshot: None,
        runtime: None,
        middleware: None,
        resource: None,
        snapshot_config: None,
        service_call: None,
    }
}
/// Builds a successful response for `op` carrying `config` in `snapshot_config`.
pub fn success_snapshot_config_response(
    op: &str,
    config: SnapshotConfig,
) -> StatusServiceResponse {
    let api_version = String::from(STATUS_SERVICE_API_VERSION);
    let op = String::from(op);
    let gateway_meta = default_gateway_response_meta();
    StatusServiceResponse {
        api_version,
        op,
        gateway_meta,
        // The one payload this response carries.
        snapshot_config: Some(config),
        snapshot: None,
        runtime: None,
        middleware: None,
        resource: None,
        service_call: None,
        op_result: None,
    }
}
/// Validates a response received from the status service.
///
/// Checks, in order: the API version, that `op` equals `expected_op`, and that the
/// gateway metadata reports the governance plane, the `direct_or_nearby` route
/// mode and no data-plane forwarding. `_request_id` is accepted but not used.
///
/// # Errors
/// Returns a description of the first failed check. Version and op mismatches are
/// plain messages; metadata failures go through `gateway_error` and carry a
/// `GATEWAY_ERR_*` code prefix.
pub fn validate_response(
    resp: &StatusServiceResponse,
    _request_id: ServiceRequestId,
    expected_op: &str,
) -> Result<(), String> {
    if resp.api_version != STATUS_SERVICE_API_VERSION {
        let actual = &resp.api_version;
        return Err(format!(
            "unsupported response api_version={actual} expected={STATUS_SERVICE_API_VERSION}"
        ));
    }

    if resp.op != expected_op {
        let actual = &resp.op;
        return Err(format!(
            "unexpected response op={actual} expected={expected_op}"
        ));
    }

    let meta = &resp.gateway_meta;
    let invalid_meta = |detail: String| gateway_error(GATEWAY_ERR_INVALID_META, detail);

    if meta.plane != GATEWAY_PLANE_GOVERNANCE {
        return Err(invalid_meta(format!(
            "unexpected response gateway_meta.plane={} expected={}",
            meta.plane, GATEWAY_PLANE_GOVERNANCE
        )));
    }
    if meta.route_mode != GATEWAY_ROUTE_MODE_DIRECT_OR_NEARBY {
        return Err(invalid_meta(format!(
            "unexpected response gateway_meta.route_mode={} expected={}",
            meta.route_mode, GATEWAY_ROUTE_MODE_DIRECT_OR_NEARBY
        )));
    }
    if meta.data_plane_forwarding {
        return Err(gateway_error(
            GATEWAY_ERR_DATA_PROXY_FORBIDDEN,
            "response indicates gateway data plane forwarding is enabled",
        ));
    }

    Ok(())
}
/// Creates a `UdpServiceClient` for `endpoint`, passing `timeout_ms` to the
/// constructor as a `Duration` in milliseconds.
///
/// # Errors
/// Returns the constructor's error prefixed with
/// `"create udp service client failed: "`.
pub fn make_udp_service_client(
    endpoint: String,
    timeout_ms: u64,
) -> Result<UdpServiceClient, String> {
    let timeout = Duration::from_millis(timeout_ms);
    let created = UdpServiceClient::new(endpoint, timeout);
    created.map_err(|cause| format!("create udp service client failed: {cause}"))
}
/// Binds a `UdpServiceServer` to the `bind` address.
///
/// # Errors
/// Returns the bind error prefixed with `"bind udp service server failed: "`.
pub fn make_udp_service_server(bind: &str) -> Result<UdpServiceServer, String> {
    let bound = UdpServiceServer::bind(bind);
    bound.map_err(|cause| format!("bind udp service server failed: {cause}"))
}
/// Receives the next incoming message from `server`, decoded as a
/// `StatusServiceRequest` via `recv_json`.
///
/// # Errors
/// Returns the receive error prefixed with
/// `"recv status service request failed: "`.
pub fn recv_status_request(
    server: &UdpServiceServer,
) -> Result<UdpServiceIncoming<StatusServiceRequest>, String> {
    let received = server.recv_json::<StatusServiceRequest>();
    received.map_err(|cause| format!("recv status service request failed: {cause}"))
}
/// Normalizes a user-supplied endpoint into the bare address form.
///
/// * `udp://ADDR` becomes `ADDR`.
/// * `ADDR` without any scheme is returned unchanged.
///
/// # Errors
/// * The address part is empty or whitespace-only. This covers `""`, `"   "`,
///   `"udp://"` and `"udp://   "`, so a blank endpoint is rejected the same way
///   whether or not it carries the `udp://` prefix.
/// * The input uses a scheme other than `udp://`.
pub fn normalize_udp_endpoint(raw: &str) -> Result<String, String> {
    let address = match raw.strip_prefix("udp://") {
        Some(rest) => rest,
        None if raw.contains("://") => {
            return Err(format!("unsupported endpoint scheme: {raw}"));
        }
        None => raw,
    };

    if address.trim().is_empty() {
        // Echoing a blank input would produce a message that ends in nothing, so
        // describe that case explicitly; otherwise keep the historical wording.
        return Err(if raw.trim().is_empty() {
            String::from("invalid --endpoint value: endpoint is empty")
        } else {
            format!("invalid --endpoint value: {raw}")
        });
    }

    Ok(address.to_string())
}
/// Queries the status service at `endpoint` for a snapshot.
///
/// The endpoint is normalized with `normalize_udp_endpoint`, a client is created
/// with `timeout_ms`, a `snapshot` request is sent under a fresh request id, and
/// the response is checked with `validate_response`.
///
/// Returns the snapshot together with the normalized endpoint string.
///
/// # Errors
/// Returns a message when normalization, client creation, the call itself or
/// response validation fails, or when the response has no snapshot payload.
pub fn fetch_snapshot_from_endpoint(
    endpoint: &str,
    timeout_ms: u64,
) -> Result<(StatusSnapshot, String), String> {
    let endpoint = normalize_udp_endpoint(endpoint)?;
    // The client is created before an id is drawn, so a failed connection setup
    // does not advance the request-id counter.
    let client = make_udp_service_client(endpoint.clone(), timeout_ms)?;
    let request_id = next_request_id();
    let request = build_snapshot_request(request_id);

    let call_result: Result<StatusServiceResponse, _> =
        client.call_json(STATUS_SERVICE_NAME, request_id, &request);
    let response = match call_result {
        Ok(response) => response,
        Err(err) => return Err(format!("status query to {endpoint} failed: {err}")),
    };

    validate_response(&response, request_id, STATUS_OP_SNAPSHOT)?;

    match response.snapshot {
        Some(snapshot) => Ok((snapshot, endpoint)),
        None => Err(format!(
            "status response from {endpoint} missing snapshot payload for op={STATUS_OP_SNAPSHOT}"
        )),
    }
}