use std::fs;
use std::path::Path;
use serde::Deserialize;
use serde_json::{Map, Value};
use crate::helpers::option_value;
/// The only `api_version` string accepted in a policy file.
///
/// A policy file may omit `api_version`; when it is present and differs from
/// this value, `apply_policy_file` rejects the file.
const GATEWAY_POLICY_API_VERSION: &str = "robotrt.gateway.policy.v1";
/// Typed view of the policy file fields that are either absent or hold a
/// concrete value.
///
/// Every field is optional; `apply_policy_file` copies a field into the
/// payload only when it is `Some`. The keys `namespace_isolation`,
/// `topic_replay_window` and `topic_dedupe_window` are deliberately not part of
/// this struct: `apply_policy_file` reads them from the raw JSON object so that
/// an explicit `null` can be forwarded instead of being treated as "absent".
#[derive(Debug, Deserialize)]
struct GatewayPolicyFile {
/// Schema version; when present it must equal `GATEWAY_POLICY_API_VERSION`.
api_version: Option<String>,
/// Routing preference; checked with `validate_route_preference` before use.
route_preference: Option<String>,
/// Copied to the payload key `rate_limit_per_sec`.
rate_limit_per_sec: Option<u64>,
/// Copied to the payload key `rollback_enabled`.
rollback_enabled: Option<bool>,
/// Copied to the payload key `topic_max_retry`.
topic_max_retry: Option<u64>,
/// Copied to the payload key `topic_retry_timeout_ms`.
topic_retry_timeout_ms: Option<u64>,
/// Copied to the payload key `service_retry_timeout_ms`.
service_retry_timeout_ms: Option<u64>,
/// Copied to the payload key `action_retry_timeout_ms`.
action_retry_timeout_ms: Option<u64>,
/// Copied to the payload key `mission_retry_timeout_ms`.
mission_retry_timeout_ms: Option<u64>,
/// Copied to the payload key `topic_max_inflight`.
topic_max_inflight: Option<u64>,
/// Replay strategy; checked with `validate_topic_replay_strategy` before use.
topic_replay_strategy: Option<String>,
}
pub(super) fn parse_gateway_policy_set_payload(args: &[String]) -> Result<Map<String, Value>, String> {
let mut payload = Map::new();
if let Some(path) = option_value(args, "--policy") {
apply_policy_file(&mut payload, Path::new(&path))?;
}
if let Some(route_preference) = option_value(args, "--route-preference") {
validate_route_preference(&route_preference)?;
payload.insert(
String::from("route_preference"),
Value::String(route_preference),
);
}
if let Some(raw) = option_value(args, "--rate-limit") {
let value = raw
.parse::<u64>()
.map_err(|err| format!("invalid --rate-limit value: {raw} ({err})"))?;
payload.insert(String::from("rate_limit_per_sec"), Value::from(value));
}
if option_value(args, "--namespace-isolation").is_some() {
let namespace = option_value(args, "--namespace-isolation").unwrap_or_default();
if namespace.trim().is_empty() {
payload.insert(String::from("namespace_isolation"), Value::Null);
} else {
payload.insert(
String::from("namespace_isolation"),
Value::String(namespace),
);
}
}
if let Some(raw) = option_value(args, "--rollback-enabled") {
let value = parse_bool(&raw)?;
payload.insert(String::from("rollback_enabled"), Value::Bool(value));
}
if let Some(raw) = option_value(args, "--topic-max-retry") {
let value = parse_unsigned_option(raw.as_str(), "--topic-max-retry")?;
payload.insert(String::from("topic_max_retry"), Value::from(value));
}
if let Some(raw) = option_value(args, "--topic-retry-timeout-ms") {
let value = parse_unsigned_option(raw.as_str(), "--topic-retry-timeout-ms")?;
payload.insert(String::from("topic_retry_timeout_ms"), Value::from(value));
}
if let Some(raw) = option_value(args, "--service-retry-timeout-ms") {
let value = parse_unsigned_option(raw.as_str(), "--service-retry-timeout-ms")?;
payload.insert(String::from("service_retry_timeout_ms"), Value::from(value));
}
if let Some(raw) = option_value(args, "--action-retry-timeout-ms") {
let value = parse_unsigned_option(raw.as_str(), "--action-retry-timeout-ms")?;
payload.insert(String::from("action_retry_timeout_ms"), Value::from(value));
}
if let Some(raw) = option_value(args, "--mission-retry-timeout-ms") {
let value = parse_unsigned_option(raw.as_str(), "--mission-retry-timeout-ms")?;
payload.insert(String::from("mission_retry_timeout_ms"), Value::from(value));
}
if let Some(raw) = option_value(args, "--topic-max-inflight") {
let value = parse_unsigned_option(raw.as_str(), "--topic-max-inflight")?;
payload.insert(String::from("topic_max_inflight"), Value::from(value));
}
if let Some(raw) = option_value(args, "--topic-replay-window") {
if raw.eq_ignore_ascii_case("none") || raw.eq_ignore_ascii_case("null") {
payload.insert(String::from("topic_replay_window"), Value::Null);
} else {
let value = parse_unsigned_option(raw.as_str(), "--topic-replay-window")?;
payload.insert(String::from("topic_replay_window"), Value::from(value));
}
}
if let Some(raw) = option_value(args, "--topic-replay-strategy") {
validate_topic_replay_strategy(&raw)?;
payload.insert(String::from("topic_replay_strategy"), Value::String(raw));
}
if let Some(raw) = option_value(args, "--topic-dedupe-window") {
if raw.eq_ignore_ascii_case("none") || raw.eq_ignore_ascii_case("null") {
payload.insert(String::from("topic_dedupe_window"), Value::Null);
} else {
let value = parse_unsigned_option(raw.as_str(), "--topic-dedupe-window")?;
payload.insert(String::from("topic_dedupe_window"), Value::from(value));
}
}
Ok(payload)
}
fn apply_policy_file(payload: &mut Map<String, Value>, path: &Path) -> Result<(), String> {
let content = fs::read_to_string(path)
.map_err(|err| format!("read policy file {} failed: {err}", path.display()))?;
let raw: Value = serde_json::from_str(&content)
.map_err(|err| format!("parse policy file {} failed: {err}", path.display()))?;
let raw_object = raw.as_object().ok_or_else(|| {
format!(
"parse policy file {} failed: root must be a JSON object",
path.display()
)
})?;
let parsed: GatewayPolicyFile = serde_json::from_str(&content)
.map_err(|err| format!("parse policy file {} failed: {err}", path.display()))?;
if let Some(version) = parsed.api_version.as_deref()
&& version != GATEWAY_POLICY_API_VERSION
{
return Err(format!(
"unsupported policy api_version in {}: {version} (expected {GATEWAY_POLICY_API_VERSION})",
path.display()
));
}
if let Some(route_preference) = parsed.route_preference {
validate_route_preference(&route_preference)?;
payload.insert(
String::from("route_preference"),
Value::String(route_preference),
);
}
if let Some(rate_limit_per_sec) = parsed.rate_limit_per_sec {
payload.insert(String::from("rate_limit_per_sec"), Value::from(rate_limit_per_sec));
}
if let Some(namespace_isolation) = raw_object.get("namespace_isolation") {
match namespace_isolation {
Value::Null => {
payload.insert(String::from("namespace_isolation"), Value::Null);
}
Value::String(value) => {
payload.insert(
String::from("namespace_isolation"),
Value::String(value.clone()),
);
}
_ => {
return Err(format!(
"invalid namespace_isolation in {}: expected string or null",
path.display()
));
}
}
}
if let Some(rollback_enabled) = parsed.rollback_enabled {
payload.insert(String::from("rollback_enabled"), Value::Bool(rollback_enabled));
}
insert_optional_u64(payload, "topic_max_retry", parsed.topic_max_retry);
insert_optional_u64(
payload,
"topic_retry_timeout_ms",
parsed.topic_retry_timeout_ms,
);
insert_optional_u64(
payload,
"service_retry_timeout_ms",
parsed.service_retry_timeout_ms,
);
insert_optional_u64(
payload,
"action_retry_timeout_ms",
parsed.action_retry_timeout_ms,
);
insert_optional_u64(
payload,
"mission_retry_timeout_ms",
parsed.mission_retry_timeout_ms,
);
insert_optional_u64(payload, "topic_max_inflight", parsed.topic_max_inflight);
if let Some(topic_replay_window) = raw_object.get("topic_replay_window") {
match topic_replay_window {
Value::Null => {
payload.insert(String::from("topic_replay_window"), Value::Null);
}
Value::Number(number) => {
let value = number.as_u64().ok_or_else(|| {
format!(
"invalid topic_replay_window in {}: expected unsigned integer or null",
path.display()
)
})?;
payload.insert(String::from("topic_replay_window"), Value::from(value));
}
_ => {
return Err(format!(
"invalid topic_replay_window in {}: expected unsigned integer or null",
path.display()
));
}
}
}
if let Some(topic_replay_strategy) = parsed.topic_replay_strategy {
validate_topic_replay_strategy(&topic_replay_strategy)?;
payload.insert(
String::from("topic_replay_strategy"),
Value::String(topic_replay_strategy),
);
}
if let Some(topic_dedupe_window) = raw_object.get("topic_dedupe_window") {
match topic_dedupe_window {
Value::Null => {
payload.insert(String::from("topic_dedupe_window"), Value::Null);
}
Value::Number(number) => {
let value = number.as_u64().ok_or_else(|| {
format!(
"invalid topic_dedupe_window in {}: expected unsigned integer or null",
path.display()
)
})?;
payload.insert(String::from("topic_dedupe_window"), Value::from(value));
}
_ => {
return Err(format!(
"invalid topic_dedupe_window in {}: expected unsigned integer or null",
path.display()
));
}
}
}
Ok(())
}
/// Writes `value` into `payload` under `key`; does nothing when `value` is `None`.
fn insert_optional_u64(payload: &mut Map<String, Value>, key: &str, value: Option<u64>) {
    let Some(number) = value else {
        return;
    };
    payload.insert(String::from(key), Value::from(number));
}
/// Parses `raw` as a `u64`.
///
/// `option` is the flag name and is only used to label the error message.
///
/// # Errors
///
/// Returns `invalid <option> value: <raw> (<parse error>)` when `raw` is not a
/// valid `u64`.
fn parse_unsigned_option(raw: &str, option: &str) -> Result<u64, String> {
    match raw.parse::<u64>() {
        Ok(parsed) => Ok(parsed),
        Err(reason) => Err(format!("invalid {option} value: {raw} ({reason})")),
    }
}
/// Accepts exactly `local_first`, `network_first` or `adaptive` (case-sensitive).
///
/// # Errors
///
/// Any other input yields a message that echoes the value and lists the
/// accepted spellings.
fn validate_route_preference(raw: &str) -> Result<(), String> {
    match raw {
        "local_first" | "network_first" | "adaptive" => Ok(()),
        rejected => Err(format!(
            "invalid route_preference value: {rejected} (expected local_first|network_first|adaptive)"
        )),
    }
}
/// Accepts exactly `block_publisher` or `drop_oldest` (case-sensitive).
///
/// # Errors
///
/// Any other input yields a message that echoes the value and lists the
/// accepted spellings.
fn validate_topic_replay_strategy(raw: &str) -> Result<(), String> {
    const ACCEPTED: &[&str] = &["block_publisher", "drop_oldest"];

    if !ACCEPTED.contains(&raw) {
        return Err(format!(
            "invalid topic_replay_strategy value: {raw} (expected block_publisher|drop_oldest)"
        ));
    }
    Ok(())
}
/// Interprets a boolean flag value, ignoring ASCII case.
///
/// `true`, `1`, `yes`, `on` map to `true`; `false`, `0`, `no`, `off` map to
/// `false`. The input is not trimmed.
///
/// # Errors
///
/// Returns `invalid bool value: <raw>` for anything else.
fn parse_bool(raw: &str) -> Result<bool, String> {
    const TRUTHY: &[&str] = &["true", "1", "yes", "on"];
    const FALSY: &[&str] = &["false", "0", "no", "off"];

    let is_one_of = |spellings: &[&str]| {
        spellings
            .iter()
            .any(|spelling| raw.eq_ignore_ascii_case(spelling))
    };

    if is_one_of(TRUTHY) {
        Ok(true)
    } else if is_one_of(FALSY) {
        Ok(false)
    } else {
        Err(format!("invalid bool value: {raw}"))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh path in the system temp directory for a policy fixture.
    ///
    /// The process id and a nanosecond timestamp are both part of the name so
    /// that concurrently running test processes do not pick the same file.
    fn temp_policy_path(prefix: &str) -> std::path::PathBuf {
        let mut path = std::env::temp_dir();
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("unix epoch")
            .as_nanos();
        let pid = std::process::id();
        path.push(format!("robotrt-gateway-policy-{prefix}-{pid}-{nanos}.json"));
        path
    }

    /// Policy fixture on disk that is deleted when the guard is dropped, so a
    /// failing assertion no longer leaves the file behind.
    struct TempPolicyFile {
        path: std::path::PathBuf,
    }

    impl TempPolicyFile {
        /// Serializes `body` to a new temp file and returns the guard owning it.
        fn create(prefix: &str, body: &Value) -> Self {
            let path = temp_policy_path(prefix);
            fs::write(
                &path,
                serde_json::to_vec_pretty(body).expect("serialize policy"),
            )
            .expect("write policy");
            Self { path }
        }

        /// Returns `["--policy", <path>]`, ready to be extended with more flags.
        fn policy_args(&self) -> Vec<String> {
            vec!["--policy".to_string(), self.path.display().to_string()]
        }
    }

    impl Drop for TempPolicyFile {
        fn drop(&mut self) {
            // Best effort: a cleanup failure must not mask the test result.
            let _ = fs::remove_file(&self.path);
        }
    }

    #[test]
    fn policy_set_payload_uses_cli_over_file_precedence() {
        let policy = TempPolicyFile::create(
            "precedence",
            &serde_json::json!({
                "api_version": "robotrt.gateway.policy.v1",
                "route_preference": "local_first",
                "topic_retry_timeout_ms": 180,
                "topic_dedupe_window": 32,
                "service_retry_timeout_ms": 220
            }),
        );
        let mut args = policy.policy_args();
        args.extend(vec![
            "--route-preference".to_string(),
            "network_first".to_string(),
            "--service-retry-timeout-ms".to_string(),
            "300".to_string(),
        ]);

        let payload = parse_gateway_policy_set_payload(&args).expect("parse payload");

        // Flags override the file ...
        assert_eq!(
            payload.get("route_preference").and_then(Value::as_str),
            Some("network_first")
        );
        assert_eq!(
            payload
                .get("service_retry_timeout_ms")
                .and_then(Value::as_u64),
            Some(300)
        );
        // ... while file-only keys are kept as written.
        assert_eq!(
            payload
                .get("topic_retry_timeout_ms")
                .and_then(Value::as_u64),
            Some(180)
        );
        assert_eq!(
            payload
                .get("topic_dedupe_window")
                .and_then(Value::as_u64),
            Some(32)
        );
    }

    #[test]
    fn policy_set_payload_supports_null_fields() {
        let policy = TempPolicyFile::create(
            "nulls",
            &serde_json::json!({
                "api_version": "robotrt.gateway.policy.v1",
                "topic_replay_window": null,
                "topic_dedupe_window": null,
                "namespace_isolation": null
            }),
        );
        let args = policy.policy_args();

        let payload = parse_gateway_policy_set_payload(&args).expect("parse payload");

        // Explicit nulls must be forwarded, not dropped.
        assert!(payload.get("topic_replay_window").is_some_and(Value::is_null));
        assert!(payload.get("topic_dedupe_window").is_some_and(Value::is_null));
        assert!(payload.get("namespace_isolation").is_some_and(Value::is_null));
    }
}