use anyhow::{anyhow, Result};
use kdl::KdlNode;
use std::collections::HashMap;
use std::path::PathBuf;
use zentinel_common::TraceIdFormat;
use crate::namespace::ExportConfig;
use crate::{
AgentConfig, Limits, ListenerConfig, NamespaceConfig, ObservabilityConfig, RouteConfig,
ServerConfig, ServiceConfig, UpstreamConfig, WafConfig,
};
/// Looks up `name` on `node` and returns its value as an owned `String`.
///
/// The node's own entries are consulted first: the first entry whose name equals
/// `name` decides the result, so a non-string value there yields `None` without
/// looking at the children. Only when no entry carries that name is the child
/// node called `name` examined, and then the value of its first entry is used.
pub(super) fn get_string_entry(node: &KdlNode, name: &str) -> Option<String> {
    let named_entry = node
        .entries()
        .iter()
        .find(|entry| matches!(entry.name(), Some(ident) if ident.value() == name));
    if let Some(entry) = named_entry {
        return entry.value().as_string().map(str::to_owned);
    }

    // Fallback form: `name "value"` written as a child node.
    let child = node.children()?.get(name)?;
    let first = child.entries().first()?;
    first.value().as_string().map(str::to_owned)
}
/// Looks up `name` on `node` and returns its value as an `i64`.
///
/// Entries named `name` are scanned in order and the first one holding an
/// integer wins. Unlike the string/bool lookups, a matching entry with a
/// non-integer value does not end the search: later entries and then the child
/// node called `name` (via its first entry) are still tried.
pub(super) fn get_int_entry(node: &KdlNode, name: &str) -> Option<i64> {
    let from_entries = node
        .entries()
        .iter()
        .filter(|entry| matches!(entry.name(), Some(ident) if ident.value() == name))
        .find_map(|entry| entry.value().as_integer());
    if let Some(found) = from_entries {
        return Some(found as i64);
    }

    // Fallback form: `name 123` written as a child node.
    let child = node.children()?.get(name)?;
    let value = child.entries().first()?.value().as_integer()?;
    Some(value as i64)
}
/// Looks up `name` on `node` and returns its value as a `bool`.
///
/// The first entry whose name equals `name` decides the result (a non-boolean
/// value there yields `None`). If no entry has that name, the first entry of the
/// child node called `name` is used instead.
pub(super) fn get_bool_entry(node: &KdlNode, name: &str) -> Option<bool> {
    let named_entry = node
        .entries()
        .iter()
        .find(|entry| matches!(entry.name(), Some(ident) if ident.value() == name));
    if let Some(entry) = named_entry {
        return entry.value().as_bool();
    }

    // Fallback form: `name true` written as a child node.
    let child = node.children()?.get(name)?;
    let first = child.entries().first()?;
    first.value().as_bool()
}
/// Returns the value of the node's first entry as an owned `String`.
///
/// Yields `None` when the node has no entries or when the first entry's value is
/// not a string. The first entry is taken as-is; whether it is named is not
/// checked.
pub(super) fn get_first_arg_string(node: &KdlNode) -> Option<String> {
    let first = node.entries().first()?;
    let text = first.value().as_string()?;
    Some(text.to_string())
}
/// Builds a [`ServerConfig`] from a KDL node.
///
/// Every setting is optional. Defaults applied here: `worker-threads` 0,
/// `max-connections` 10000, `graceful-shutdown-timeout-secs` 30, `daemon` false,
/// `auto-reload` false, and the default `TraceIdFormat` when `trace-id-format`
/// is absent.
///
/// Integer settings are converted with checked conversions: a value that does
/// not fit the target type (for example a negative number) is treated like a
/// missing value and replaced by the default, instead of wrapping around to a
/// huge number as an `as` cast would.
///
/// # Errors
///
/// Currently never fails; the `Result` matches the other `parse_*` functions.
pub(super) fn parse_server(node: &KdlNode) -> Result<ServerConfig> {
    let worker_threads = get_int_entry(node, "worker-threads")
        .and_then(|v| usize::try_from(v).ok())
        .unwrap_or(0);
    let max_connections = get_int_entry(node, "max-connections")
        .and_then(|v| usize::try_from(v).ok())
        .unwrap_or(10000);
    let graceful_shutdown_timeout_secs = get_int_entry(node, "graceful-shutdown-timeout-secs")
        .and_then(|v| u64::try_from(v).ok())
        .unwrap_or(30);

    Ok(ServerConfig {
        worker_threads,
        max_connections,
        graceful_shutdown_timeout_secs,
        daemon: get_bool_entry(node, "daemon").unwrap_or(false),
        pid_file: get_string_entry(node, "pid-file").map(PathBuf::from),
        user: get_string_entry(node, "user"),
        group: get_string_entry(node, "group"),
        working_directory: get_string_entry(node, "working-directory").map(PathBuf::from),
        trace_id_format: get_string_entry(node, "trace-id-format")
            .map(|s| TraceIdFormat::from_str_loose(&s))
            .unwrap_or_default(),
        auto_reload: get_bool_entry(node, "auto-reload").unwrap_or(false),
    })
}
/// Builds a [`ListenerConfig`] from a KDL node whose first entry is the listener ID.
///
/// Defaults applied here: `address` "0.0.0.0:8080", `request-timeout-secs` 60,
/// `keepalive-timeout-secs` 75, `max-concurrent-streams` 100. `protocol` is
/// HTTPS only for the exact string "https"; anything else (or nothing) is HTTP.
/// `tls` is always left as `None` by this function.
///
/// Integer settings use checked conversions: a value that does not fit the
/// target type (negative, or too large for `u32`) is treated like a missing
/// value rather than being wrapped or truncated by an `as` cast.
///
/// # Errors
///
/// Returns an error when the node has no string ID as its first entry.
pub(super) fn parse_listener(node: &KdlNode) -> Result<ListenerConfig> {
    let id = get_first_arg_string(node).ok_or_else(|| anyhow!("Listener requires an ID"))?;

    // Timeout in seconds, falling back to `default` when absent or out of range.
    let seconds = |key: &str, default: u64| {
        get_int_entry(node, key)
            .and_then(|v| u64::try_from(v).ok())
            .unwrap_or(default)
    };
    // Optional `u32` counter; absent or out-of-range values give `None`.
    let count = |key: &str| get_int_entry(node, key).and_then(|v| u32::try_from(v).ok());

    let protocol = match get_string_entry(node, "protocol").as_deref() {
        Some("https") => crate::ListenerProtocol::Https,
        _ => crate::ListenerProtocol::Http,
    };

    Ok(ListenerConfig {
        id,
        address: get_string_entry(node, "address").unwrap_or_else(|| "0.0.0.0:8080".to_string()),
        protocol,
        tls: None,
        default_route: get_string_entry(node, "default-route"),
        request_timeout_secs: seconds("request-timeout-secs", 60),
        keepalive_timeout_secs: seconds("keepalive-timeout-secs", 75),
        max_concurrent_streams: count("max-concurrent-streams").unwrap_or(100),
        keepalive_max_requests: count("keepalive-max-requests"),
    })
}
/// Builds a [`RouteConfig`] from a KDL node whose first entry is the route ID.
///
/// Both the `path` and the `path-prefix` settings are turned into
/// `MatchCondition::PathPrefix` conditions (in that order). When neither is
/// given, the route matches the prefix "/". `waf-enabled`, `websocket` and
/// `websocket-inspection` default to `false`; all remaining fields are filled
/// with fixed values (`None`, empty, or the type's default).
///
/// # Errors
///
/// Returns an error when the node has no string ID as its first entry.
pub(super) fn parse_route(node: &KdlNode) -> Result<RouteConfig> {
    let id = get_first_arg_string(node).ok_or_else(|| anyhow!("Route requires an ID"))?;

    let mut matches: Vec<_> = ["path", "path-prefix"]
        .iter()
        .filter_map(|key| get_string_entry(node, key))
        .map(crate::MatchCondition::PathPrefix)
        .collect();
    if matches.is_empty() {
        // No explicit matcher: catch everything.
        matches.push(crate::MatchCondition::PathPrefix("/".to_string()));
    }

    let flag = |key: &str| get_bool_entry(node, key).unwrap_or(false);

    Ok(RouteConfig {
        id,
        priority: crate::Priority::NORMAL,
        matches,
        upstream: get_string_entry(node, "upstream"),
        service_type: crate::ServiceType::Web,
        policies: crate::RoutePolicies::default(),
        filters: Vec::new(),
        builtin_handler: None,
        waf_enabled: flag("waf-enabled"),
        circuit_breaker: None,
        retry_policy: None,
        static_files: None,
        api_schema: None,
        inference: None,
        error_pages: None,
        websocket: flag("websocket"),
        websocket_inspection: flag("websocket-inspection"),
        shadow: None,
        fallback: None,
    })
}
/// Builds an [`UpstreamConfig`] from a KDL node and returns it with its name.
///
/// Targets come from `target` nodes nested under a `targets` child; each target
/// defaults to address "127.0.0.1:8080" and weight 1. If that yields no targets,
/// a single weight-1 target is created from the node's own `address` setting
/// when present. `load-balancing` accepts "round_robin", "least_connections",
/// "ip_hash" and "random"; anything else (or nothing) means round robin.
///
/// A `weight` that does not fit in `u32` (negative or too large) is treated
/// like a missing weight and becomes 1, rather than being wrapped by an `as`
/// cast.
///
/// # Errors
///
/// Returns an error when the node has no string name as its first entry.
pub(super) fn parse_upstream(node: &KdlNode) -> Result<(String, UpstreamConfig)> {
    let name = get_first_arg_string(node).ok_or_else(|| anyhow!("Upstream requires a name"))?;

    let mut targets = Vec::new();
    let target_nodes = node
        .children()
        .and_then(|children| children.get("targets"))
        .and_then(|targets_node| targets_node.children());
    if let Some(target_nodes) = target_nodes {
        for target_node in target_nodes.nodes() {
            if target_node.name().value() != "target" {
                continue;
            }
            let address = get_string_entry(target_node, "address")
                .unwrap_or_else(|| "127.0.0.1:8080".to_string());
            let weight = get_int_entry(target_node, "weight")
                .and_then(|v| u32::try_from(v).ok())
                .unwrap_or(1);
            targets.push(crate::UpstreamTarget {
                address,
                weight,
                max_requests: None,
                metadata: HashMap::new(),
            });
        }
    }

    // Shorthand form: a single `address` directly on the upstream node.
    if targets.is_empty() {
        if let Some(address) = get_string_entry(node, "address") {
            targets.push(crate::UpstreamTarget {
                address,
                weight: 1,
                max_requests: None,
                metadata: HashMap::new(),
            });
        }
    }

    let load_balancing = match get_string_entry(node, "load-balancing").as_deref() {
        Some("least_connections") => crate::LoadBalancingAlgorithm::LeastConnections,
        Some("ip_hash") => crate::LoadBalancingAlgorithm::IpHash,
        Some("random") => crate::LoadBalancingAlgorithm::Random,
        // "round_robin", unknown names and a missing setting all mean round robin.
        _ => crate::LoadBalancingAlgorithm::RoundRobin,
    };

    Ok((
        name.clone(),
        UpstreamConfig {
            id: name,
            targets,
            load_balancing,
            sticky_session: None,
            health_check: None,
            connection_pool: crate::ConnectionPoolConfig::default(),
            timeouts: crate::UpstreamTimeouts::default(),
            tls: None,
            http_version: crate::HttpVersionConfig::default(),
        },
    ))
}
/// Builds an [`AgentConfig`] from a KDL node whose first entry is the agent ID.
///
/// `type` selects the agent type ("auth", "rate_limit", "waf"); any other
/// string, or the default "custom", becomes `AgentType::Custom` carrying that
/// string. The transport is always a Unix socket at `socket-path`, defaulting to
/// `/var/run/zentinel/<id>.sock`. Defaults applied here: `timeout-ms` 100,
/// `max-concurrent-calls` 100, failure mode open unless `failure-mode` is
/// exactly "closed".
///
/// Integer settings use checked conversions: a value that does not fit the
/// target type (for example a negative number) is treated like a missing value
/// instead of wrapping around to a huge timeout or limit.
///
/// # Errors
///
/// Returns an error when the node has no string ID as its first entry.
pub(super) fn parse_agent(node: &KdlNode) -> Result<AgentConfig> {
    let id = get_first_arg_string(node).ok_or_else(|| anyhow!("Agent requires an ID"))?;

    let type_str = get_string_entry(node, "type").unwrap_or_else(|| "custom".to_string());
    let agent_type = match type_str.as_str() {
        "auth" => crate::AgentType::Auth,
        "rate_limit" => crate::AgentType::RateLimit,
        "waf" => crate::AgentType::Waf,
        other => crate::AgentType::Custom(other.to_string()),
    };

    let socket_path = get_string_entry(node, "socket-path")
        .unwrap_or_else(|| format!("/var/run/zentinel/{}.sock", id));
    let transport = crate::AgentTransport::UnixSocket {
        path: PathBuf::from(socket_path),
    };

    let timeout_ms = get_int_entry(node, "timeout-ms")
        .and_then(|v| u64::try_from(v).ok())
        .unwrap_or(100);
    let max_request_body_bytes =
        get_int_entry(node, "max-request-body-bytes").and_then(|v| usize::try_from(v).ok());
    let max_concurrent_calls = get_int_entry(node, "max-concurrent-calls")
        .and_then(|v| usize::try_from(v).ok())
        .unwrap_or(100);

    let failure_mode = match get_string_entry(node, "failure-mode").as_deref() {
        Some("closed") => crate::FailureMode::Closed,
        _ => crate::FailureMode::Open,
    };

    Ok(AgentConfig {
        id,
        agent_type,
        transport,
        events: vec![crate::AgentEvent::RequestHeaders],
        pool: None,
        timeout_ms,
        failure_mode,
        max_request_body_bytes,
        max_response_body_bytes: None,
        circuit_breaker: None,
        request_body_mode: Default::default(),
        response_body_mode: Default::default(),
        chunk_timeout_ms: 5000,
        config: None,
        max_concurrent_calls,
    })
}
/// Builds a [`WafConfig`] from a KDL node.
///
/// `engine` is ModSecurity when absent or "modsecurity", Coraza for "coraza",
/// and `WafEngine::Custom` carrying the string otherwise. Defaults applied here:
/// empty `crs-version`, `paranoia-level` 1, `anomaly-threshold` 5, prevention
/// mode unless `mode` is exactly "detection", and `audit-log` true.
///
/// `paranoia-level` and `anomaly-threshold` use checked conversions: a value
/// that does not fit the target type falls back to the default instead of being
/// truncated (an `as u8` cast would turn 256 into 0) or wrapped.
///
/// # Errors
///
/// Currently never fails; the `Result` matches the other `parse_*` functions.
pub(super) fn parse_waf(node: &KdlNode) -> Result<WafConfig> {
    let engine = match get_string_entry(node, "engine").as_deref() {
        Some("modsecurity") | None => crate::WafEngine::ModSecurity,
        Some("coraza") => crate::WafEngine::Coraza,
        Some(other) => crate::WafEngine::Custom(other.to_string()),
    };

    let paranoia_level = get_int_entry(node, "paranoia-level")
        .and_then(|v| u8::try_from(v).ok())
        .unwrap_or(1);
    let anomaly_threshold = get_int_entry(node, "anomaly-threshold")
        .and_then(|v| u32::try_from(v).ok())
        .unwrap_or(5);

    let mode = match get_string_entry(node, "mode").as_deref() {
        Some("detection") => crate::WafMode::Detection,
        _ => crate::WafMode::Prevention,
    };

    Ok(WafConfig {
        engine,
        ruleset: crate::WafRuleset {
            crs_version: get_string_entry(node, "crs-version").unwrap_or_default(),
            paranoia_level,
            anomaly_threshold,
            custom_rules_dir: get_string_entry(node, "custom-rules-dir").map(PathBuf::from),
            exclusions: Vec::new(),
        },
        mode,
        audit_log: get_bool_entry(node, "audit-log").unwrap_or(true),
        body_inspection: crate::BodyInspectionPolicy::default(),
    })
}
/// Builds a [`Limits`] value from a KDL node, starting from `Limits::default()`.
///
/// Each `usize` limit is read from the setting with the same name in kebab-case
/// and keeps its default when the setting is absent. Two settings accept a
/// shorter alias that is consulted only when the primary name yields no integer:
/// `max-header-size` for `max-header-size-bytes`, and `max-connections` for
/// `max-connections-per-client`. The per-second rate limits and
/// `max-memory-bytes` are optional and stay `None` when absent.
/// `max_decompression_ratio` always takes its default and `max_memory_percent`
/// is always `None`; neither is read from the node.
///
/// All integers use checked conversions: a value that does not fit the target
/// type (for example a negative number) is treated like a missing value.
/// An `as` cast would instead wrap a negative limit into an enormous one and
/// silently disable the limit.
///
/// # Errors
///
/// Currently never fails; the `Result` matches the other `parse_*` functions.
pub(super) fn parse_limits(node: &KdlNode) -> Result<Limits> {
    let defaults = Limits::default();

    // Converts an optional raw integer to `usize`, using `default` when it is
    // missing or out of range.
    let size_or = |raw: Option<i64>, default: usize| -> usize {
        raw.and_then(|v| usize::try_from(v).ok()).unwrap_or(default)
    };
    let size = |key: &str, default: usize| -> usize { size_or(get_int_entry(node, key), default) };
    // Optional `u32` rate; missing or out-of-range values give `None`.
    let rate = |key: &str| -> Option<u32> {
        get_int_entry(node, key).and_then(|v| u32::try_from(v).ok())
    };

    Ok(Limits {
        max_header_size_bytes: size_or(
            get_int_entry(node, "max-header-size-bytes")
                .or_else(|| get_int_entry(node, "max-header-size")),
            defaults.max_header_size_bytes,
        ),
        max_header_count: size("max-header-count", defaults.max_header_count),
        max_header_name_bytes: size("max-header-name-bytes", defaults.max_header_name_bytes),
        max_header_value_bytes: size("max-header-value-bytes", defaults.max_header_value_bytes),
        max_body_size_bytes: size("max-body-size-bytes", defaults.max_body_size_bytes),
        max_body_buffer_bytes: size("max-body-buffer-bytes", defaults.max_body_buffer_bytes),
        max_body_inspection_bytes: size(
            "max-body-inspection-bytes",
            defaults.max_body_inspection_bytes,
        ),
        max_decompression_ratio: defaults.max_decompression_ratio,
        max_decompressed_size_bytes: size(
            "max-decompressed-size-bytes",
            defaults.max_decompressed_size_bytes,
        ),
        max_connections_per_client: size_or(
            get_int_entry(node, "max-connections-per-client")
                .or_else(|| get_int_entry(node, "max-connections")),
            defaults.max_connections_per_client,
        ),
        max_connections_per_route: size(
            "max-connections-per-route",
            defaults.max_connections_per_route,
        ),
        max_total_connections: size("max-total-connections", defaults.max_total_connections),
        max_idle_connections_per_upstream: size(
            "max-idle-connections-per-upstream",
            defaults.max_idle_connections_per_upstream,
        ),
        max_in_flight_requests: size("max-in-flight-requests", defaults.max_in_flight_requests),
        max_in_flight_requests_per_worker: size(
            "max-in-flight-requests-per-worker",
            defaults.max_in_flight_requests_per_worker,
        ),
        max_queued_requests: size("max-queued-requests", defaults.max_queued_requests),
        max_agent_queue_depth: size("max-agent-queue-depth", defaults.max_agent_queue_depth),
        max_agent_body_bytes: size("max-agent-body-bytes", defaults.max_agent_body_bytes),
        max_agent_response_bytes: size(
            "max-agent-response-bytes",
            defaults.max_agent_response_bytes,
        ),
        max_requests_per_second_global: rate("max-requests-per-second-global"),
        max_requests_per_second_per_client: rate("max-requests-per-second-per-client"),
        max_requests_per_second_per_route: rate("max-requests-per-second-per-route"),
        max_memory_bytes: get_int_entry(node, "max-memory-bytes")
            .and_then(|v| usize::try_from(v).ok()),
        max_memory_percent: None,
    })
}
/// Builds an [`ObservabilityConfig`] from a KDL node, starting from the default.
///
/// A `metrics` child sets `metrics.enabled` (true unless `enabled` says
/// otherwise) and optionally overrides `address` and `path`. A `logging` child
/// optionally overrides `level` and `format`. Without those children the
/// corresponding defaults are left untouched.
///
/// # Errors
///
/// Currently never fails; the `Result` matches the other `parse_*` functions.
pub(super) fn parse_observability(node: &KdlNode) -> Result<ObservabilityConfig> {
    let mut config = ObservabilityConfig::default();
    let section = |name: &str| node.children().and_then(|doc| doc.get(name));

    if let Some(metrics) = section("metrics") {
        // Presence of the block alone switches metrics on.
        config.metrics.enabled = get_bool_entry(metrics, "enabled").unwrap_or(true);
        if let Some(address) = get_string_entry(metrics, "address") {
            config.metrics.address = address;
        }
        if let Some(endpoint) = get_string_entry(metrics, "path") {
            config.metrics.path = endpoint;
        }
    }

    if let Some(logging) = section("logging") {
        if let Some(level) = get_string_entry(logging, "level") {
            config.logging.level = level;
        }
        if let Some(format) = get_string_entry(logging, "format") {
            config.logging.format = format;
        }
    }

    Ok(config)
}
/// Builds a [`NamespaceConfig`] from a KDL node whose first entry is the ID.
///
/// Child nodes are dispatched by name: `limits` and `exports` replace the
/// namespace's value, `upstream` is inserted under its name, and `route`,
/// `agent`, `listener` and `service` are appended. Any other child is logged at
/// debug level and skipped.
///
/// # Errors
///
/// Returns an error when the ID is missing, when it contains ':' (reserved for
/// qualified names), or when parsing any recognised child fails.
pub(super) fn parse_namespace(node: &KdlNode) -> Result<NamespaceConfig> {
    let id =
        get_first_arg_string(node).ok_or_else(|| anyhow!("namespace requires an ID argument"))?;
    if id.contains(':') {
        return Err(anyhow!(
            "Namespace ID '{}' cannot contain ':' character (reserved for qualified names)",
            id
        ));
    }

    let mut namespace = NamespaceConfig::new(id);
    for child in node.children().into_iter().flat_map(|doc| doc.nodes()) {
        let kind = child.name().value();
        match kind {
            "limits" => namespace.limits = Some(parse_limits(child)?),
            "upstream" => {
                let (name, upstream) = parse_upstream(child)?;
                namespace.upstreams.insert(name, upstream);
            }
            "route" => namespace.routes.push(parse_route(child)?),
            "agent" => namespace.agents.push(parse_agent(child)?),
            "listener" => namespace.listeners.push(parse_listener(child)?),
            "service" => namespace.services.push(parse_service(child)?),
            "exports" => namespace.exports = parse_exports(child)?,
            _ => {
                tracing::debug!("Ignoring unknown node in namespace: {}", kind);
            }
        }
    }
    Ok(namespace)
}
/// Builds a [`ServiceConfig`] from a KDL node whose first entry is the ID.
///
/// Child nodes are dispatched by name: `limits` and `listener` replace the
/// service's value (a later `listener` overwrites an earlier one), `upstream` is
/// inserted under its name, and `route` and `agent` are appended. Any other
/// child is logged at debug level and skipped.
///
/// # Errors
///
/// Returns an error when the ID is missing, when it contains ':' (reserved for
/// qualified names), or when parsing any recognised child fails.
pub(super) fn parse_service(node: &KdlNode) -> Result<ServiceConfig> {
    let id =
        get_first_arg_string(node).ok_or_else(|| anyhow!("service requires an ID argument"))?;
    if id.contains(':') {
        return Err(anyhow!(
            "Service ID '{}' cannot contain ':' character (reserved for qualified names)",
            id
        ));
    }

    let mut service = ServiceConfig::new(id);
    for child in node.children().into_iter().flat_map(|doc| doc.nodes()) {
        let kind = child.name().value();
        match kind {
            "limits" => service.limits = Some(parse_limits(child)?),
            "upstream" => {
                let (name, upstream) = parse_upstream(child)?;
                service.upstreams.insert(name, upstream);
            }
            "route" => service.routes.push(parse_route(child)?),
            "agent" => service.agents.push(parse_agent(child)?),
            "listener" => service.listener = Some(parse_listener(child)?),
            _ => {
                tracing::debug!("Ignoring unknown node in service: {}", kind);
            }
        }
    }
    Ok(service)
}
/// Builds an [`ExportConfig`] from a KDL node, starting from the default.
///
/// The children `upstreams`, `agents` and `filters` each contribute the string
/// values of all their entries to the list of the same name; non-string entries
/// are skipped. Any other child is logged at debug level and ignored.
///
/// # Errors
///
/// Currently never fails; the `Result` matches the other `parse_*` functions.
fn parse_exports(node: &KdlNode) -> Result<ExportConfig> {
    /// Yields every string-valued entry of `node` as an owned `String`.
    fn string_values(node: &KdlNode) -> impl Iterator<Item = String> + '_ {
        node.entries()
            .iter()
            .filter_map(|entry| entry.value().as_string())
            .map(str::to_owned)
    }

    let mut exports = ExportConfig::default();
    for child in node.children().into_iter().flat_map(|doc| doc.nodes()) {
        let kind = child.name().value();
        match kind {
            "upstreams" => exports.upstreams.extend(string_values(child)),
            "agents" => exports.agents.extend(string_values(child)),
            "filters" => exports.filters.extend(string_values(child)),
            _ => {
                tracing::debug!("Ignoring unknown node in exports: {}", kind);
            }
        }
    }
    Ok(exports)
}