use std::collections::{HashMap, VecDeque};
use std::time::{SystemTime, UNIX_EPOCH};
use core_api::{SnapshotIncludeFlags, SnapshotConfig};
use serde_json::Value;
use crate::constants::{
DEFAULT_LOAD_REPORT_PATH, DEFAULT_RESOURCE_REPORT_PATH, DEFAULT_RUNTIME_REPORT_PATH,
DEFAULT_STATUS_REPORT_PATH,
};
use crate::demo::{
demo_middleware_report, demo_resource_report, demo_runtime_report, demo_status_snapshot,
};
use crate::helpers::{has_flag, option_value, parse_report_path};
use crate::gateway::{
STATUS_OP_ACTION_CANCEL, STATUS_OP_ACTION_SEND, STATUS_OP_ACTION_WATCH,
STATUS_OP_GATEWAY_AUDIT_LIST, STATUS_OP_GATEWAY_OBSERVE, STATUS_OP_GATEWAY_POLICY_GET,
STATUS_OP_GATEWAY_POLICY_SET, STATUS_OP_GATEWAY_SHUTDOWN,
STATUS_OP_MIDDLEWARE, STATUS_OP_MISSION_REPLY, STATUS_OP_MISSION_REQUEST,
STATUS_OP_MISSION_WATCH, STATUS_OP_RESOURCE, STATUS_OP_RUNTIME, STATUS_OP_SNAPSHOT,
STATUS_OP_SERVICE_CALL, STATUS_OP_SNAPSHOT_CONFIG_GET, STATUS_OP_SNAPSHOT_CONFIG_SET,
STATUS_OP_TOPIC_POLL, STATUS_OP_TOPIC_PUBLISH, STATUS_OP_TOPIC_SUBSCRIBE,
STATUS_OP_TOPIC_UNSUBSCRIBE,
STATUS_SERVICE_API_VERSION, STATUS_SERVICE_NAME, ServiceCallResult, make_udp_service_server,
GATEWAY_ERR_DATA_PLANE_BLOCKED, GATEWAY_ERR_INVALID_META, GATEWAY_ERR_MISSING_FIELD,
gateway_error,
recv_status_request,
success_op_result_response,
success_middleware_response, success_resource_response, success_runtime_response,
success_service_call_response, success_snapshot_config_response, validate_request,
};
use super::DEFAULT_STATUS_BIND_ADDR;
#[derive(Clone, Debug)]
struct TopicStreamState {
topic: String,
cursor: u64,
next_sequence: u64,
max_batch: usize,
}
#[derive(Clone, Debug)]
struct LiveTopicFrame {
sequence: u64,
captured_at_unix_nanos: u128,
transport: String,
payload: Value,
payload_base64: Option<String>,
external_ref: Option<Value>,
packet_kind: Option<String>,
control_label: Option<String>,
schema_id: Option<String>,
schema_version: Option<u16>,
}
#[derive(Clone, Debug)]
struct ActionGoalState {
action: String,
goal: String,
state: String,
feedback: Option<String>,
result: Option<String>,
error: Option<String>,
updated_at_unix_nanos: u128,
}
#[derive(Clone, Debug)]
struct MissionEntry {
direction: String,
message: String,
captured_at_unix_nanos: u128,
}
#[derive(Clone, Debug)]
struct GatewayPolicyState {
route_preference: String,
rate_limit_per_sec: u64,
namespace_isolation: Option<String>,
rollback_enabled: bool,
topic_max_retry: u8,
topic_retry_timeout_ms: u64,
service_retry_timeout_ms: u64,
action_retry_timeout_ms: u64,
mission_retry_timeout_ms: u64,
topic_max_inflight: usize,
topic_replay_window: Option<usize>,
topic_dedupe_window: Option<usize>,
topic_replay_strategy: String,
updated_at_unix_nanos: u128,
}
impl Default for GatewayPolicyState {
fn default() -> Self {
Self {
route_preference: String::from("local_first"),
rate_limit_per_sec: 500,
namespace_isolation: None,
rollback_enabled: true,
topic_max_retry: 3,
topic_retry_timeout_ms: 200,
service_retry_timeout_ms: 200,
action_retry_timeout_ms: 200,
mission_retry_timeout_ms: 200,
topic_max_inflight: 16,
topic_replay_window: None,
topic_dedupe_window: Some(64),
topic_replay_strategy: String::from("block_publisher"),
updated_at_unix_nanos: now_unix_nanos(),
}
}
}
#[derive(Clone, Debug)]
struct GatewayAuditEvent {
sequence: u64,
captured_at_unix_nanos: u128,
category: String,
action: String,
status: String,
detail: Value,
}
fn now_unix_nanos() -> u128 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_nanos())
.unwrap_or(0)
}
fn parse_required_string(payload: &Value, field: &str) -> Result<String, String> {
payload
.get(field)
.and_then(Value::as_str)
.map(|value| value.to_string())
.ok_or_else(|| format!("missing string field op_payload.{field}"))
}
fn parse_bool(value: &str) -> Result<bool, String> {
match value.to_ascii_lowercase().as_str() {
"true" | "1" | "yes" | "on" => Ok(true),
"false" | "0" | "no" | "off" => Ok(false),
_ => Err(format!("invalid bool value: {value}")),
}
}
fn append_audit(
log: &mut VecDeque<GatewayAuditEvent>,
next_seq: &mut u64,
max_items: usize,
enabled: bool,
category: &str,
action: &str,
status: &str,
detail: Value,
) -> Option<u64> {
if !enabled {
return None;
}
let sequence = *next_seq;
*next_seq = next_seq.saturating_add(1);
log.push_back(GatewayAuditEvent {
sequence,
captured_at_unix_nanos: now_unix_nanos(),
category: category.to_string(),
action: action.to_string(),
status: status.to_string(),
detail,
});
while log.len() > max_items {
log.pop_front();
}
Some(sequence)
}
fn policy_to_json(policy: &GatewayPolicyState) -> Value {
serde_json::json!({
"route_preference": policy.route_preference,
"rate_limit_per_sec": policy.rate_limit_per_sec,
"namespace_isolation": policy.namespace_isolation,
"rollback_enabled": policy.rollback_enabled,
"topic_max_retry": policy.topic_max_retry,
"topic_retry_timeout_ms": policy.topic_retry_timeout_ms,
"service_retry_timeout_ms": policy.service_retry_timeout_ms,
"action_retry_timeout_ms": policy.action_retry_timeout_ms,
"mission_retry_timeout_ms": policy.mission_retry_timeout_ms,
"topic_max_inflight": policy.topic_max_inflight,
"topic_replay_window": policy.topic_replay_window,
"topic_dedupe_window": policy.topic_dedupe_window,
"topic_replay_strategy": policy.topic_replay_strategy,
"updated_at_unix_nanos": policy.updated_at_unix_nanos,
})
}
fn summarize_snapshot(snapshot: &introspection_core::StatusSnapshot) -> Value {
serde_json::json!({
"captured_at_unix_nanos": snapshot.captured_at_unix_nanos,
"nodes": snapshot.nodes.len(),
"topics": snapshot.topics.len(),
"services": snapshot.services.len(),
"actions": snapshot.actions.len(),
"missions": snapshot.missions.len(),
"plugins": snapshot.plugins.len(),
"edges": snapshot.edges.len(),
"non_healthy_components": snapshot
.health
.iter()
.filter(|item| !item.status.eq_ignore_ascii_case("healthy"))
.count(),
})
}
fn summarize_runtime(report: &introspection_core::RuntimeLoadReport) -> Value {
serde_json::json!({
"queue_depth": report.queue_depth,
"retransmit_drivers": report.retransmit_drivers,
"backpressure": report.backpressure,
"health": {
"status": report.health.status,
"reason": report.health.reason,
},
"metric_count": report.metrics.len(),
})
}
fn summarize_middleware(report: &introspection_core::MiddlewareLoadReport) -> Value {
serde_json::json!({
"captured_at_unix_nanos": report.captured_at_unix_nanos,
"discovery": report.discovery,
"sessions": report.sessions,
"totals": report.totals,
"topic_items": report.topics.len(),
"service_items": report.services.len(),
})
}
fn summarize_resource(report: &introspection_core::ResourceCatalogReport) -> Value {
serde_json::json!({
"captured_at_unix_nanos": report.captured_at_unix_nanos,
"nodes": report.nodes.len(),
"topics": report.topics.len(),
"services": report.services.len(),
"actions": report.actions.len(),
"missions": report.missions.len(),
})
}
pub(super) fn gateway_serve(args: &[String]) -> Result<(), String> {
let bind = option_value(args, "--bind").unwrap_or_else(|| DEFAULT_STATUS_BIND_ADDR.to_string());
let source = option_value(args, "--source").unwrap_or_else(|| "live".to_string());
let report_path = parse_report_path(args, DEFAULT_STATUS_REPORT_PATH)?;
let runtime_report_path = option_value(args, "--runtime-report")
.unwrap_or_else(|| DEFAULT_RUNTIME_REPORT_PATH.to_string());
let middleware_report_path = option_value(args, "--middleware-report")
.unwrap_or_else(|| DEFAULT_LOAD_REPORT_PATH.to_string());
let resource_report_path = option_value(args, "--resource-report")
.unwrap_or_else(|| DEFAULT_RESOURCE_REPORT_PATH.to_string());
let audit_enabled = option_value(args, "--audit-enabled")
.map(|value| parse_bool(&value))
.transpose()?
.unwrap_or(true);
let mut snapshot_config = SnapshotConfig {
enabled: true,
status_report_path: report_path.to_string_lossy().to_string(),
runtime_report_path: runtime_report_path.clone(),
middleware_report_path: middleware_report_path.clone(),
resource_report_path: resource_report_path.clone(),
include_flags: SnapshotIncludeFlags {
status: true,
runtime: true,
middleware: true,
resource: true,
},
..SnapshotConfig::default()
};
let once = has_flag(args, "--once");
let json = has_flag(args, "--json");
let mut topic_streams: HashMap<String, TopicStreamState> = HashMap::new();
let mut next_topic_stream_id = 1u64;
let mut topic_buffers: HashMap<String, VecDeque<LiveTopicFrame>> = HashMap::new();
let mut topic_next_sequence: HashMap<String, u64> = HashMap::new();
let mut action_goals: HashMap<String, ActionGoalState> = HashMap::new();
let mut next_action_goal_id = 1u64;
let mut mission_logs: HashMap<String, VecDeque<MissionEntry>> = HashMap::new();
let mut gateway_policy = GatewayPolicyState::default();
let mut gateway_audit: VecDeque<GatewayAuditEvent> = VecDeque::new();
let mut next_audit_seq = 1u64;
append_audit(
&mut gateway_audit,
&mut next_audit_seq,
1024,
audit_enabled,
"lifecycle",
"gateway_boot",
"ok",
serde_json::json!({
"bind": bind.clone(),
"source": source.clone(),
"mode": "gateway_serve",
}),
);
if source != "demo" && source != "file" && source != "live" {
return Err(format!(
"unsupported --source value: {source} (expected live|demo|file)"
));
}
let server = make_udp_service_server(&bind)?;
if json {
let boot = serde_json::json!({
"api_version": STATUS_SERVICE_API_VERSION,
"mode": "gateway_serve",
"service": STATUS_SERVICE_NAME,
"bind": bind,
"source": source,
"status_report": report_path,
"runtime_report": runtime_report_path,
"middleware_report": middleware_report_path,
"resource_report": resource_report_path,
"audit_enabled": audit_enabled,
"once": once,
});
println!(
"{}",
serde_json::to_string_pretty(&boot)
.map_err(|err| format!("serialize gateway server bootstrap failed: {err}"))?
);
} else {
println!("RobotRT Gateway Serve");
println!("bind: {}", bind);
println!("source: {}", source);
println!("status_report: {}", report_path.display());
println!("runtime_report: {}", runtime_report_path);
println!("middleware_report: {}", middleware_report_path);
println!("resource_report: {}", resource_report_path);
println!("audit_enabled: {}", audit_enabled);
println!("once: {}", once);
}
loop {
let incoming = recv_status_request(&server)?;
let req = incoming.request;
let request_id = incoming.request_id;
let peer = incoming.peer;
let service_name = incoming.service;
if let Err(reason) = validate_request(&req) {
server
.reply_error(&service_name, request_id, peer, reason)
.map_err(|err| format!("gateway server send error response failed: {err}"))?;
if once {
return Ok(());
}
continue;
}
if req.op == STATUS_OP_SNAPSHOT_CONFIG_GET {
let payload = success_snapshot_config_response(&req.op, snapshot_config.clone());
server
.reply_json(&service_name, request_id, peer, &payload)
.map_err(|err| format!("gateway server send response failed: {err}"))?;
if once {
return Ok(());
}
continue;
}
if req.op == STATUS_OP_SNAPSHOT_CONFIG_SET {
let config = req.snapshot_config.ok_or_else(|| {
String::from(
"missing snapshot_config payload for op=snapshot_config_set",
)
})?;
snapshot_config = config;
let payload = success_snapshot_config_response(&req.op, snapshot_config.clone());
server
.reply_json(&service_name, request_id, peer, &payload)
.map_err(|err| format!("gateway server send response failed: {err}"))?;
if once {
return Ok(());
}
continue;
}
let mut shutdown_after_response = false;
let response = match req.op.as_str() {
STATUS_OP_SNAPSHOT => {
if !snapshot_config.include_flags.status {
Err(String::from("status report is disabled by include_flags.status"))
} else {
let snapshot = if source == "demo" {
Ok(demo_status_snapshot())
} else {
introspection_core::read_status_snapshot(&snapshot_config.status_report_path)
.map_err(|err| {
format!(
"read status snapshot from {} failed: {err}",
snapshot_config.status_report_path
)
})
};
snapshot.map(|snapshot| crate::gateway::success_response(request_id, snapshot))
}
}
STATUS_OP_RUNTIME => {
if !snapshot_config.include_flags.runtime {
Err(String::from(
"runtime report is disabled by include_flags.runtime",
))
} else {
let runtime = if source == "demo" {
Ok(demo_runtime_report())
} else {
introspection_core::read_runtime_load_report(
&snapshot_config.runtime_report_path,
)
.map_err(|err| {
format!(
"read runtime report from {} failed: {err}",
snapshot_config.runtime_report_path
)
})
};
runtime.map(success_runtime_response)
}
}
STATUS_OP_MIDDLEWARE => {
if !snapshot_config.include_flags.middleware {
Err(String::from(
"middleware report is disabled by include_flags.middleware",
))
} else {
let middleware = if source == "demo" {
Ok(demo_middleware_report())
} else {
introspection_core::read_middleware_load_report(
&snapshot_config.middleware_report_path,
)
.map_err(|err| {
format!(
"read middleware report from {} failed: {err}",
snapshot_config.middleware_report_path
)
})
};
middleware.map(success_middleware_response)
}
}
STATUS_OP_RESOURCE => {
if !snapshot_config.include_flags.resource {
Err(String::from(
"resource report is disabled by include_flags.resource",
))
} else {
let resource = if source == "demo" {
Ok(demo_resource_report())
} else {
introspection_core::read_resource_catalog_report(
&snapshot_config.resource_report_path,
)
.map_err(|err| {
format!(
"read resource report from {} failed: {err}",
snapshot_config.resource_report_path
)
})
};
resource.map(success_resource_response)
}
}
STATUS_OP_GATEWAY_OBSERVE => {
let payload = req.op_payload.unwrap_or(Value::Null);
let audit_limit = payload
.get("audit_limit")
.and_then(Value::as_u64)
.unwrap_or(20)
.clamp(1, 200) as usize;
let snapshot = if snapshot_config.include_flags.status {
if source == "demo" {
Some(demo_status_snapshot())
} else {
Some(
introspection_core::read_status_snapshot(
&snapshot_config.status_report_path,
)
.map_err(|err| {
format!(
"read status snapshot from {} failed: {err}",
snapshot_config.status_report_path
)
})?,
)
}
} else {
None
};
let runtime = if snapshot_config.include_flags.runtime {
if source == "demo" {
Some(demo_runtime_report())
} else {
Some(
introspection_core::read_runtime_load_report(
&snapshot_config.runtime_report_path,
)
.map_err(|err| {
format!(
"read runtime report from {} failed: {err}",
snapshot_config.runtime_report_path
)
})?,
)
}
} else {
None
};
let middleware = if snapshot_config.include_flags.middleware {
if source == "demo" {
Some(demo_middleware_report())
} else {
Some(
introspection_core::read_middleware_load_report(
&snapshot_config.middleware_report_path,
)
.map_err(|err| {
format!(
"read middleware report from {} failed: {err}",
snapshot_config.middleware_report_path
)
})?,
)
}
} else {
None
};
let resource = if snapshot_config.include_flags.resource {
if source == "demo" {
Some(demo_resource_report())
} else {
Some(
introspection_core::read_resource_catalog_report(
&snapshot_config.resource_report_path,
)
.map_err(|err| {
format!(
"read resource report from {} failed: {err}",
snapshot_config.resource_report_path
)
})?,
)
}
} else {
None
};
let snapshot_summary = snapshot.as_ref().map(summarize_snapshot);
let runtime_summary = runtime.as_ref().map(summarize_runtime);
let middleware_summary = middleware.as_ref().map(summarize_middleware);
let resource_summary = resource.as_ref().map(summarize_resource);
let mut audit_tail = gateway_audit
.iter()
.rev()
.take(audit_limit)
.map(|item| {
serde_json::json!({
"sequence": item.sequence,
"captured_at_unix_nanos": item.captured_at_unix_nanos,
"category": item.category,
"action": item.action,
"status": item.status,
"detail": item.detail,
})
})
.collect::<Vec<_>>();
audit_tail.reverse();
let observe = serde_json::json!({
"policy": policy_to_json(&gateway_policy),
"topology": {
"nodes": snapshot.as_ref().map(|item| item.nodes.len()).unwrap_or(0),
"edges": snapshot.as_ref().map(|item| item.edges.len()).unwrap_or(0),
},
"sessions": {
"topic_streams": topic_streams.len(),
"topic_buffer_depth_total": topic_buffers.values().map(VecDeque::len).sum::<usize>(),
"action_goals": action_goals.len(),
"mission_queues": mission_logs.len(),
},
"audit": {
"enabled": audit_enabled,
"total": gateway_audit.len(),
"tail": audit_tail,
},
"reports": {
"snapshot": snapshot_summary,
"runtime": runtime_summary,
"middleware": middleware_summary,
"resource": resource_summary,
}
});
append_audit(
&mut gateway_audit,
&mut next_audit_seq,
1024,
audit_enabled,
"observe",
"gateway_observe",
"ok",
serde_json::json!({"audit_limit": audit_limit}),
);
Ok(success_op_result_response(STATUS_OP_GATEWAY_OBSERVE, observe))
}
STATUS_OP_GATEWAY_POLICY_GET => Ok(success_op_result_response(
STATUS_OP_GATEWAY_POLICY_GET,
serde_json::json!({
"policy": policy_to_json(&gateway_policy),
}),
)),
STATUS_OP_GATEWAY_POLICY_SET => {
let payload = req.op_payload.ok_or_else(|| {
String::from("missing op_payload for op=gateway_policy_set")
})?;
let mut changes = serde_json::Map::new();
if let Some(route_preference) = payload
.get("route_preference")
.and_then(Value::as_str)
{
if !matches!(route_preference, "local_first" | "network_first" | "adaptive") {
return Err(gateway_error(
GATEWAY_ERR_INVALID_META,
format!(
"unsupported route_preference={route_preference} expected local_first|network_first|adaptive"
),
));
}
gateway_policy.route_preference = route_preference.to_string();
changes.insert(
"route_preference".to_string(),
Value::String(route_preference.to_string()),
);
}
if let Some(rate_limit_per_sec) = payload
.get("rate_limit_per_sec")
.and_then(Value::as_u64)
{
gateway_policy.rate_limit_per_sec = rate_limit_per_sec.max(1);
changes.insert(
"rate_limit_per_sec".to_string(),
Value::from(gateway_policy.rate_limit_per_sec),
);
}
if let Some(namespace_isolation) = payload.get("namespace_isolation") {
gateway_policy.namespace_isolation = namespace_isolation
.as_str()
.map(ToString::to_string)
.filter(|value| !value.trim().is_empty());
changes.insert(
"namespace_isolation".to_string(),
gateway_policy
.namespace_isolation
.as_ref()
.map(|value| Value::String(value.clone()))
.unwrap_or(Value::Null),
);
}
if let Some(rollback_enabled) = payload
.get("rollback_enabled")
.and_then(Value::as_bool)
{
gateway_policy.rollback_enabled = rollback_enabled;
changes.insert(
"rollback_enabled".to_string(),
Value::Bool(rollback_enabled),
);
}
if let Some(topic_max_retry) = payload.get("topic_max_retry") {
let value = topic_max_retry.as_u64().ok_or_else(|| {
gateway_error(
GATEWAY_ERR_INVALID_META,
"topic_max_retry must be an unsigned integer",
)
})?;
if value > u8::MAX as u64 {
return Err(gateway_error(
GATEWAY_ERR_INVALID_META,
format!("topic_max_retry={value} exceeds u8 max={}", u8::MAX),
));
}
gateway_policy.topic_max_retry = value as u8;
changes.insert("topic_max_retry".to_string(), Value::from(value));
}
if let Some(topic_retry_timeout_ms) = payload.get("topic_retry_timeout_ms") {
let value = topic_retry_timeout_ms.as_u64().ok_or_else(|| {
gateway_error(
GATEWAY_ERR_INVALID_META,
"topic_retry_timeout_ms must be an unsigned integer",
)
})?;
gateway_policy.topic_retry_timeout_ms = value.max(1);
changes.insert(
"topic_retry_timeout_ms".to_string(),
Value::from(gateway_policy.topic_retry_timeout_ms),
);
}
if let Some(service_retry_timeout_ms) = payload.get("service_retry_timeout_ms") {
let value = service_retry_timeout_ms.as_u64().ok_or_else(|| {
gateway_error(
GATEWAY_ERR_INVALID_META,
"service_retry_timeout_ms must be an unsigned integer",
)
})?;
gateway_policy.service_retry_timeout_ms = value.max(1);
changes.insert(
"service_retry_timeout_ms".to_string(),
Value::from(gateway_policy.service_retry_timeout_ms),
);
}
if let Some(action_retry_timeout_ms) = payload.get("action_retry_timeout_ms") {
let value = action_retry_timeout_ms.as_u64().ok_or_else(|| {
gateway_error(
GATEWAY_ERR_INVALID_META,
"action_retry_timeout_ms must be an unsigned integer",
)
})?;
gateway_policy.action_retry_timeout_ms = value.max(1);
changes.insert(
"action_retry_timeout_ms".to_string(),
Value::from(gateway_policy.action_retry_timeout_ms),
);
}
if let Some(mission_retry_timeout_ms) = payload.get("mission_retry_timeout_ms") {
let value = mission_retry_timeout_ms.as_u64().ok_or_else(|| {
gateway_error(
GATEWAY_ERR_INVALID_META,
"mission_retry_timeout_ms must be an unsigned integer",
)
})?;
gateway_policy.mission_retry_timeout_ms = value.max(1);
changes.insert(
"mission_retry_timeout_ms".to_string(),
Value::from(gateway_policy.mission_retry_timeout_ms),
);
}
if let Some(topic_max_inflight) = payload.get("topic_max_inflight") {
let value = topic_max_inflight.as_u64().ok_or_else(|| {
gateway_error(
GATEWAY_ERR_INVALID_META,
"topic_max_inflight must be an unsigned integer",
)
})?;
gateway_policy.topic_max_inflight = (value as usize).max(1);
changes.insert(
"topic_max_inflight".to_string(),
Value::from(gateway_policy.topic_max_inflight as u64),
);
}
if let Some(topic_replay_window) = payload.get("topic_replay_window") {
gateway_policy.topic_replay_window = if topic_replay_window.is_null() {
None
} else {
let value = topic_replay_window.as_u64().ok_or_else(|| {
gateway_error(
GATEWAY_ERR_INVALID_META,
"topic_replay_window must be an unsigned integer or null",
)
})?;
Some(value as usize)
};
changes.insert(
"topic_replay_window".to_string(),
gateway_policy
.topic_replay_window
.map(|value| Value::from(value as u64))
.unwrap_or(Value::Null),
);
}
if let Some(topic_dedupe_window) = payload.get("topic_dedupe_window") {
gateway_policy.topic_dedupe_window = if topic_dedupe_window.is_null() {
None
} else {
let value = topic_dedupe_window.as_u64().ok_or_else(|| {
gateway_error(
GATEWAY_ERR_INVALID_META,
"topic_dedupe_window must be an unsigned integer or null",
)
})?;
Some((value as usize).max(1))
};
changes.insert(
"topic_dedupe_window".to_string(),
gateway_policy
.topic_dedupe_window
.map(|value| Value::from(value as u64))
.unwrap_or(Value::Null),
);
}
if let Some(topic_replay_strategy) = payload
.get("topic_replay_strategy")
.and_then(Value::as_str)
{
if !matches!(topic_replay_strategy, "block_publisher" | "drop_oldest") {
return Err(gateway_error(
GATEWAY_ERR_INVALID_META,
format!(
"unsupported topic_replay_strategy={topic_replay_strategy} expected block_publisher|drop_oldest"
),
));
}
gateway_policy.topic_replay_strategy = topic_replay_strategy.to_string();
changes.insert(
"topic_replay_strategy".to_string(),
Value::String(topic_replay_strategy.to_string()),
);
}
gateway_policy.updated_at_unix_nanos = now_unix_nanos();
let receipt_id = format!("receipt-{}", gateway_policy.updated_at_unix_nanos);
let audit_seq = append_audit(
&mut gateway_audit,
&mut next_audit_seq,
1024,
audit_enabled,
"control",
"gateway_policy_set",
"ok",
serde_json::json!({
"receipt_id": receipt_id,
"changes": Value::Object(changes.clone()),
}),
);
Ok(success_op_result_response(
STATUS_OP_GATEWAY_POLICY_SET,
serde_json::json!({
"receipt_id": receipt_id,
"audit_sequence": audit_seq,
"applied": Value::Object(changes),
"policy": policy_to_json(&gateway_policy),
}),
))
}
STATUS_OP_GATEWAY_AUDIT_LIST => {
let payload = req
.op_payload
.ok_or_else(|| String::from("missing op_payload for op=gateway_audit_list"))?;
let limit = payload
.get("limit")
.and_then(Value::as_u64)
.unwrap_or(20)
.clamp(1, 200) as usize;
let category = payload
.get("category")
.and_then(Value::as_str)
.map(ToString::to_string);
let mut entries = gateway_audit
.iter()
.rev()
.filter(|item| {
category
.as_ref()
.map(|wanted| item.category == *wanted)
.unwrap_or(true)
})
.take(limit)
.map(|item| {
serde_json::json!({
"sequence": item.sequence,
"captured_at_unix_nanos": item.captured_at_unix_nanos,
"category": item.category,
"action": item.action,
"status": item.status,
"detail": item.detail,
})
})
.collect::<Vec<_>>();
entries.reverse();
Ok(success_op_result_response(
STATUS_OP_GATEWAY_AUDIT_LIST,
serde_json::json!({
"enabled": audit_enabled,
"total": gateway_audit.len(),
"entries": entries,
}),
))
}
STATUS_OP_GATEWAY_SHUTDOWN => {
shutdown_after_response = true;
let receipt_id = format!("shutdown-{}", now_unix_nanos());
let audit_seq = append_audit(
&mut gateway_audit,
&mut next_audit_seq,
1024,
audit_enabled,
"lifecycle",
"gateway_shutdown",
"ok",
serde_json::json!({"receipt_id": receipt_id}),
);
Ok(success_op_result_response(
STATUS_OP_GATEWAY_SHUTDOWN,
serde_json::json!({
"receipt_id": receipt_id,
"accepted": true,
"audit_sequence": audit_seq,
}),
))
}
STATUS_OP_SERVICE_CALL => {
let call = req.service_call.ok_or_else(|| {
String::from("missing service_call payload for op=service_call")
})?;
if source != "demo" {
Err(String::from(
"service_call is only supported in demo source for gateway serve",
))
} else {
let accepted = call.service == "/demo/echo" || call.service.ends_with("/echo");
let result = if accepted {
ServiceCallResult {
service: call.service,
accepted: true,
response: Some(format!("echo:{}", call.request)),
error: None,
}
} else {
ServiceCallResult {
service: call.service,
accepted: false,
response: None,
error: Some(String::from(
"demo service router only supports /demo/echo",
)),
}
};
Ok(success_service_call_response(result))
}
}
STATUS_OP_TOPIC_SUBSCRIBE => {
if source == "file" {
Err(String::from(
"topic_subscribe is not supported in file source for gateway serve",
))
} else {
let payload = req.op_payload.ok_or_else(|| {
String::from("missing op_payload for op=topic_subscribe")
})?;
let topic = parse_required_string(&payload, "topic")?;
let max_batch = payload
.get("max_batch")
.and_then(Value::as_u64)
.unwrap_or(16)
.clamp(1, 256) as usize;
let cursor = topic_buffers
.get(&topic)
.and_then(|queue| queue.back())
.map(|frame| frame.sequence)
.unwrap_or(0);
let stream_id = format!("ts-{next_topic_stream_id}");
next_topic_stream_id = next_topic_stream_id.saturating_add(1);
topic_streams.insert(
stream_id.clone(),
TopicStreamState {
topic: topic.clone(),
cursor,
next_sequence: 1,
max_batch,
},
);
Ok(success_op_result_response(
STATUS_OP_TOPIC_SUBSCRIBE,
serde_json::json!({
"stream_id": stream_id,
"topic": topic,
"accepted": true,
"governance_only": true,
"control_plane": "open_stream",
"data_plane": {
"mode": "governance_sample",
"supports_external_ref": false,
"default_transport": "inline_control"
}
}),
))
}
}
STATUS_OP_TOPIC_POLL => {
if source == "file" {
Err(String::from(
"topic_poll is not supported in file source for gateway serve",
))
} else {
let payload = req
.op_payload
.ok_or_else(|| String::from("missing op_payload for op=topic_poll"))?;
let stream_id = parse_required_string(&payload, "stream_id")?;
let state = topic_streams.get_mut(&stream_id).ok_or_else(|| {
format!("unknown topic stream_id={stream_id}")
})?;
let max_items = payload
.get("max_items")
.and_then(Value::as_u64)
.unwrap_or(state.max_batch as u64)
.clamp(1, 256) as usize;
let mut frames = Vec::with_capacity(max_items);
let mut cursor = state.cursor;
if source == "demo" {
for _ in 0..max_items {
let sequence = state.next_sequence;
state.next_sequence = state.next_sequence.saturating_add(1);
frames.push(serde_json::json!({
"sequence": sequence,
"captured_at_unix_nanos": now_unix_nanos(),
"transport": "inline",
"payload": format!("payload:{}:{}", state.topic, sequence),
"payload_base64": null,
"packet_kind": "data",
"control_label": null,
"schema_id": null,
"schema_version": null,
"external_ref": null,
}));
cursor = sequence;
}
} else if let Some(queue) = topic_buffers.get(&state.topic) {
for frame in queue.iter() {
if frame.sequence <= cursor {
continue;
}
if frames.len() >= max_items {
break;
}
frames.push(serde_json::json!({
"sequence": frame.sequence,
"captured_at_unix_nanos": frame.captured_at_unix_nanos,
"transport": frame.transport,
"payload": frame.payload,
"payload_base64": frame.payload_base64,
"packet_kind": frame.packet_kind,
"control_label": frame.control_label,
"schema_id": frame.schema_id,
"schema_version": frame.schema_version,
"external_ref": frame.external_ref,
}));
cursor = frame.sequence;
}
}
state.cursor = cursor;
Ok(success_op_result_response(
STATUS_OP_TOPIC_POLL,
serde_json::json!({
"stream_id": stream_id,
"topic": state.topic.clone(),
"frames": frames,
}),
))
}
}
STATUS_OP_TOPIC_UNSUBSCRIBE => {
if source == "file" {
Err(String::from(
"topic_unsubscribe is not supported in file source for gateway serve",
))
} else {
let payload = req.op_payload.ok_or_else(|| {
String::from("missing op_payload for op=topic_unsubscribe")
})?;
let stream_id = parse_required_string(&payload, "stream_id")?;
let removed = topic_streams.remove(&stream_id).is_some();
Ok(success_op_result_response(
STATUS_OP_TOPIC_UNSUBSCRIBE,
serde_json::json!({
"stream_id": stream_id,
"removed": removed,
}),
))
}
}
STATUS_OP_TOPIC_PUBLISH => {
if source == "file" {
Err(String::from(
"topic_publish is not supported in file source for gateway serve",
))
} else {
let payload = req
.op_payload
.ok_or_else(|| String::from("missing op_payload for op=topic_publish"))?;
let topic = parse_required_string(&payload, "topic")?;
let transport = payload
.get("transport")
.and_then(Value::as_str)
.unwrap_or("local")
.to_string();
let body = payload.get("payload").cloned().unwrap_or(Value::Null);
let payload_base64 = payload
.get("payload_base64")
.and_then(Value::as_str)
.map(ToString::to_string);
let external_ref = payload.get("external_ref").cloned();
let packet_kind = payload
.get("packet_kind")
.and_then(Value::as_str)
.map(ToString::to_string)
.unwrap_or_else(|| String::from("data"));
let control_label = payload
.get("control_label")
.and_then(Value::as_str)
.map(ToString::to_string);
let schema_id = payload
.get("schema_id")
.and_then(Value::as_str)
.map(ToString::to_string);
let schema_version = payload
.get("schema_version")
.and_then(Value::as_u64)
.and_then(|value| u16::try_from(value).ok());
let captured = payload
.get("captured_at_unix_nanos")
.and_then(Value::as_u64)
.map(u128::from)
.unwrap_or_else(now_unix_nanos);
if packet_kind != "control" {
Err(gateway_error(
GATEWAY_ERR_DATA_PLANE_BLOCKED,
"gateway only accepts packet_kind=control for topic_publish",
))
} else if payload_base64.is_some() || external_ref.is_some() {
Err(gateway_error(
GATEWAY_ERR_DATA_PLANE_BLOCKED,
"gateway topic_publish forbids payload_base64/external_ref forwarding",
))
} else if control_label.is_none() {
Err(gateway_error(
GATEWAY_ERR_MISSING_FIELD,
"missing op_payload.control_label for control packet",
))
} else {
let body_size = serde_json::to_vec(&body)
.map(|bytes| bytes.len())
.unwrap_or(usize::MAX);
if body_size > 2048 {
Err(gateway_error(
GATEWAY_ERR_DATA_PLANE_BLOCKED,
format!(
"gateway control payload too large: {body_size} bytes (max=2048)"
),
))
} else {
let next_sequence = topic_next_sequence.entry(topic.clone()).or_insert(1);
let sequence = payload
.get("sequence")
.and_then(Value::as_u64)
.unwrap_or(*next_sequence);
*next_sequence = (*next_sequence).max(sequence.saturating_add(1));
let queue = topic_buffers.entry(topic.clone()).or_default();
queue.push_back(LiveTopicFrame {
sequence,
captured_at_unix_nanos: captured,
transport,
payload: body,
payload_base64,
external_ref,
packet_kind: Some(packet_kind),
control_label,
schema_id,
schema_version,
});
while queue.len() > 4096 {
queue.pop_front();
}
Ok(success_op_result_response(
STATUS_OP_TOPIC_PUBLISH,
serde_json::json!({
"topic": topic,
"sequence": sequence,
"accepted": true,
"queue_depth": queue.len(),
"governance_only": true,
}),
))
}
}
}
}
STATUS_OP_ACTION_SEND => {
if source != "demo" {
Err(String::from(
"action_send is only supported in demo source for gateway serve",
))
} else {
let payload = req
.op_payload
.ok_or_else(|| String::from("missing op_payload for op=action_send"))?;
let action = parse_required_string(&payload, "action")?;
let goal = parse_required_string(&payload, "goal")?;
let goal_id = format!("goal-{next_action_goal_id}");
next_action_goal_id = next_action_goal_id.saturating_add(1);
action_goals.insert(
goal_id.clone(),
ActionGoalState {
action: action.clone(),
goal: goal.clone(),
state: String::from("executing"),
feedback: Some(format!("accepted goal: {goal}")),
result: None,
error: None,
updated_at_unix_nanos: now_unix_nanos(),
},
);
Ok(success_op_result_response(
STATUS_OP_ACTION_SEND,
serde_json::json!({
"action": action,
"goal_id": goal_id,
"accepted": true,
"state": "executing",
}),
))
}
}
STATUS_OP_ACTION_CANCEL => {
if source != "demo" {
Err(String::from(
"action_cancel is only supported in demo source for gateway serve",
))
} else {
let payload = req
.op_payload
.ok_or_else(|| String::from("missing op_payload for op=action_cancel"))?;
let action = parse_required_string(&payload, "action")?;
let goal_id = parse_required_string(&payload, "goal_id")?;
let mut canceled = false;
let mut state_name = String::from("unknown");
let mut error: Option<String> = None;
if let Some(goal) = action_goals.get_mut(&goal_id) {
if goal.action != action {
error = Some(format!(
"goal_id={goal_id} belongs to action={} not action={action}",
goal.action
));
state_name = goal.state.clone();
} else if goal.state == "executing" {
goal.state = String::from("canceled");
goal.result = None;
goal.feedback = Some(String::from("goal canceled by operator"));
goal.updated_at_unix_nanos = now_unix_nanos();
canceled = true;
state_name = goal.state.clone();
} else {
state_name = goal.state.clone();
}
} else {
error = Some(format!("unknown goal_id={goal_id}"));
}
Ok(success_op_result_response(
STATUS_OP_ACTION_CANCEL,
serde_json::json!({
"action": action,
"goal_id": goal_id,
"canceled": canceled,
"state": state_name,
"error": error,
}),
))
}
}
STATUS_OP_ACTION_WATCH => {
if source != "demo" {
Err(String::from(
"action_watch is only supported in demo source for gateway serve",
))
} else {
let payload = req
.op_payload
.ok_or_else(|| String::from("missing op_payload for op=action_watch"))?;
let action_name = payload
.get("action")
.and_then(Value::as_str)
.map(|value| value.to_string());
let goal_id = payload
.get("goal_id")
.and_then(Value::as_str)
.map(|value| value.to_string());
let now = now_unix_nanos();
for goal in action_goals.values_mut() {
let stale_ns = now.saturating_sub(goal.updated_at_unix_nanos);
if goal.state == "executing" && stale_ns > 1_200_000_000 {
goal.state = String::from("succeeded");
goal.result = Some(format!("done:{}", goal.goal));
goal.feedback = Some(String::from("goal completed"));
goal.updated_at_unix_nanos = now;
}
}
let goals = action_goals
.iter()
.filter(|(id, goal)| {
action_name
.as_ref()
.map(|name| goal.action == *name)
.unwrap_or(true)
&& goal_id
.as_ref()
.map(|wanted| *id == wanted)
.unwrap_or(true)
})
.map(|(id, goal)| {
serde_json::json!({
"goal_id": id,
"action": goal.action.clone(),
"goal": goal.goal.clone(),
"state": goal.state.clone(),
"feedback": goal.feedback.clone(),
"result": goal.result.clone(),
"error": goal.error.clone(),
"updated_at_unix_nanos": goal.updated_at_unix_nanos,
})
})
.collect::<Vec<_>>();
Ok(success_op_result_response(
STATUS_OP_ACTION_WATCH,
serde_json::json!({
"goals": goals,
}),
))
}
}
STATUS_OP_MISSION_REQUEST => {
if source != "demo" {
Err(String::from(
"mission_request is only supported in demo source for gateway serve",
))
} else {
let payload = req.op_payload.ok_or_else(|| {
String::from("missing op_payload for op=mission_request")
})?;
let mission = parse_required_string(&payload, "mission")?;
let message = parse_required_string(&payload, "message")?;
let queue = mission_logs.entry(mission.clone()).or_default();
queue.push_back(MissionEntry {
direction: String::from("request"),
message: message.clone(),
captured_at_unix_nanos: now_unix_nanos(),
});
let auto_reply = format!("ack:{message}");
queue.push_back(MissionEntry {
direction: String::from("response"),
message: auto_reply.clone(),
captured_at_unix_nanos: now_unix_nanos(),
});
Ok(success_op_result_response(
STATUS_OP_MISSION_REQUEST,
serde_json::json!({
"mission": mission,
"accepted": true,
"response": auto_reply,
}),
))
}
}
STATUS_OP_MISSION_REPLY => {
if source != "demo" {
Err(String::from(
"mission_reply is only supported in demo source for gateway serve",
))
} else {
let payload = req
.op_payload
.ok_or_else(|| String::from("missing op_payload for op=mission_reply"))?;
let mission = parse_required_string(&payload, "mission")?;
let message = parse_required_string(&payload, "message")?;
let queue = mission_logs.entry(mission.clone()).or_default();
queue.push_back(MissionEntry {
direction: String::from("reply"),
message,
captured_at_unix_nanos: now_unix_nanos(),
});
Ok(success_op_result_response(
STATUS_OP_MISSION_REPLY,
serde_json::json!({
"mission": mission,
"accepted": true,
}),
))
}
}
STATUS_OP_MISSION_WATCH => {
if source != "demo" {
Err(String::from(
"mission_watch is only supported in demo source for gateway serve",
))
} else {
let payload = req
.op_payload
.ok_or_else(|| String::from("missing op_payload for op=mission_watch"))?;
let mission = parse_required_string(&payload, "mission")?;
let max_items = payload
.get("max_items")
.and_then(Value::as_u64)
.unwrap_or(10)
.clamp(1, 256) as usize;
let queue = mission_logs.entry(mission.clone()).or_default();
let mut items = Vec::new();
while items.len() < max_items {
let Some(item) = queue.pop_front() else {
break;
};
items.push(serde_json::json!({
"direction": item.direction,
"message": item.message,
"captured_at_unix_nanos": item.captured_at_unix_nanos,
}));
}
Ok(success_op_result_response(
STATUS_OP_MISSION_WATCH,
serde_json::json!({
"mission": mission,
"entries": items,
}),
))
}
}
_ => Err(format!(
"gateway serve supports op={STATUS_OP_SNAPSHOT}|{STATUS_OP_RUNTIME}|{STATUS_OP_MIDDLEWARE}|{STATUS_OP_RESOURCE}|{STATUS_OP_GATEWAY_OBSERVE}|{STATUS_OP_GATEWAY_POLICY_GET}|{STATUS_OP_GATEWAY_POLICY_SET}|{STATUS_OP_GATEWAY_AUDIT_LIST}|{STATUS_OP_GATEWAY_SHUTDOWN}|{STATUS_OP_SERVICE_CALL}|{STATUS_OP_TOPIC_SUBSCRIBE}|{STATUS_OP_TOPIC_POLL}|{STATUS_OP_TOPIC_UNSUBSCRIBE}|{STATUS_OP_TOPIC_PUBLISH}|{STATUS_OP_ACTION_SEND}|{STATUS_OP_ACTION_CANCEL}|{STATUS_OP_ACTION_WATCH}|{STATUS_OP_MISSION_REQUEST}|{STATUS_OP_MISSION_REPLY}|{STATUS_OP_MISSION_WATCH}|{STATUS_OP_SNAPSHOT_CONFIG_GET}|{STATUS_OP_SNAPSHOT_CONFIG_SET}"
)),
};
match response {
Ok(payload) => {
server
.reply_json(&service_name, request_id, peer, &payload)
.map_err(|err| format!("gateway server send response failed: {err}"))?;
}
Err(err) => {
server
.reply_error(&service_name, request_id, peer, err)
.map_err(|err| format!("gateway server send error response failed: {err}"))?;
}
}
if shutdown_after_response {
return Ok(());
}
if once {
return Ok(());
}
}
}