use std::net::SocketAddr;
use crate::raw::Raw;
use crate::{
AdminPassthroughConfig, CaptureTlsConfig, Config, ConfigError, DiagBaseline, EtcdConfig,
FanoutBodyEncoding, FanoutConfig, HeaderForwardingConfig, ObservabilityConfig,
PassthroughConfig, TlsConfig,
};
mod resolve_capture;
const DEFAULT_ADMIN_PREFIXES: &str = "/_cat/,/_cluster/,/_nodes/";
pub(crate) fn resolve(raw: &Raw) -> Result<Config, ConfigError> {
Ok(Config {
bind: socket_addr(raw, "bind", "127.0.0.1:8080")?,
grpc_bind: optional_socket_addr(raw, "grpc_bind")?,
upstream: string_or(raw, "upstream", "http://127.0.0.1:9200"),
index: string_or(raw, "index", "osproxy-shared"),
tokens: tokens(raw)?,
require_tls_for_mutation: !bool_or(raw, "allow_cleartext_mutation", false)?,
tls: tls(raw)?,
observability: observability(raw)?,
admin_passthrough: admin_passthrough(raw),
cursor_affinity_key: opt(raw, "cursor_affinity_key"),
passthrough: passthrough(raw)?,
header_forwarding: HeaderForwardingConfig {
enabled: bool_or(raw, "forward_client_headers", true)?,
deny: csv(raw, "forward_header_deny"),
},
capture: resolve_capture::capture(raw)?,
capture_default: bool_or(raw, "capture_default", false)?,
fanout: fanout(raw)?,
etcd: etcd(raw)?,
})
}
fn etcd(raw: &Raw) -> Result<Option<EtcdConfig>, ConfigError> {
let endpoints = csv(raw, "etcd_endpoints");
if endpoints.is_empty() {
if raw.get("etcd_directives_key").is_some() {
return Err(ConfigError::invalid(
"etcd_endpoints",
"set etcd_endpoints when configuring an etcd directive store",
));
}
return Ok(None);
}
Ok(Some(EtcdConfig {
endpoints,
directives_key: string_or(raw, "etcd_directives_key", "osproxy/directives"),
}))
}
fn fanout(raw: &Raw) -> Result<Option<FanoutConfig>, ConfigError> {
match (opt(raw, "fanout_kafka_brokers"), opt(raw, "fanout_topic")) {
(None, None) => {
if raw.get("fanout_kafka_ca").is_some() {
return Err(ConfigError::invalid(
"fanout_kafka_ca",
"set fanout_kafka_brokers and fanout_topic to enable fan-out",
));
}
Ok(None)
}
(Some(brokers), Some(topic)) => {
let brokers: Vec<String> = brokers
.split(',')
.map(str::trim)
.filter(|b| !b.is_empty())
.map(str::to_owned)
.collect();
if brokers.is_empty() {
return Err(ConfigError::invalid(
"fanout_kafka_brokers",
"expected at least one `host:port` broker",
));
}
Ok(Some(FanoutConfig {
brokers,
topic,
tls: fanout_tls(raw)?,
body_encoding: fanout_body_encoding(raw)?,
async_default: bool_or(raw, "fanout_async_default", false)?,
expand_delete_by_query: bool_or(raw, "fanout_expand_delete_by_query", false)?,
}))
}
_ => Err(ConfigError::invalid(
"fanout_kafka_brokers",
"set both fanout_kafka_brokers and fanout_topic, or neither",
)),
}
}
fn fanout_body_encoding(raw: &Raw) -> Result<FanoutBodyEncoding, ConfigError> {
match raw.get("fanout_body_encoding").map(str::trim) {
None => Ok(FanoutBodyEncoding::default()),
Some(v) if v.eq_ignore_ascii_case("cbor") => Ok(FanoutBodyEncoding::Cbor),
Some(v) if v.eq_ignore_ascii_case("json") => Ok(FanoutBodyEncoding::Json),
Some(_) => Err(ConfigError::invalid(
"fanout_body_encoding",
"expected `cbor` or `json`",
)),
}
}
fn fanout_tls(raw: &Raw) -> Result<Option<CaptureTlsConfig>, ConfigError> {
let client = match (
opt(raw, "fanout_kafka_client_cert"),
opt(raw, "fanout_kafka_client_key"),
) {
(None, None) => None,
(Some(cert), Some(key)) => Some((cert, key)),
_ => {
return Err(ConfigError::invalid(
"fanout_kafka_client_cert",
"set both fanout_kafka_client_cert and fanout_kafka_client_key, or neither",
))
}
};
let Some(ca_path) = opt(raw, "fanout_kafka_ca") else {
if client.is_some() {
return Err(ConfigError::invalid(
"fanout_kafka_ca",
"client-cert mTLS to the brokers requires fanout_kafka_ca",
));
}
return Ok(None);
};
let (client_cert_path, client_key_path) = match client {
Some((cert, key)) => (Some(cert), Some(key)),
None => (None, None),
};
Ok(Some(CaptureTlsConfig {
ca_path,
client_cert_path,
client_key_path,
}))
}
fn u64_or(raw: &Raw, key: &'static str, default: u64) -> Result<u64, ConfigError> {
match raw.get(key) {
None => Ok(default),
Some(value) => match value.trim().parse::<u64>() {
Ok(n) if n > 0 => Ok(n),
_ => Err(ConfigError::invalid(key, "expected a positive integer")),
},
}
}
fn passthrough(raw: &Raw) -> Result<Option<PassthroughConfig>, ConfigError> {
match (
opt(raw, "passthrough_cluster"),
opt(raw, "passthrough_endpoint"),
) {
(None, None) => Ok(None),
(Some(cluster), Some(endpoint)) => Ok(Some(PassthroughConfig {
cluster,
endpoint,
index_prefixes: csv(raw, "passthrough_indices"),
})),
_ => Err(ConfigError::invalid(
"passthrough_cluster",
"set both passthrough_cluster and passthrough_endpoint, or neither",
)),
}
}
fn csv(raw: &Raw, key: &'static str) -> Vec<String> {
raw.get(key)
.unwrap_or_default()
.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_owned)
.collect()
}
fn opt(raw: &Raw, key: &'static str) -> Option<String> {
raw.get(key).map(str::to_owned)
}
fn string_or(raw: &Raw, key: &'static str, default: &str) -> String {
raw.get(key).unwrap_or(default).to_owned()
}
fn socket_addr(raw: &Raw, key: &'static str, default: &str) -> Result<SocketAddr, ConfigError> {
raw.get(key)
.unwrap_or(default)
.parse()
.map_err(|_| ConfigError::invalid(key, "expected a `host:port` socket address"))
}
fn optional_socket_addr(raw: &Raw, key: &'static str) -> Result<Option<SocketAddr>, ConfigError> {
match raw.get(key) {
Some(value) => value
.parse()
.map(Some)
.map_err(|_| ConfigError::invalid(key, "expected a `host:port` socket address")),
None => Ok(None),
}
}
fn bool_or(raw: &Raw, key: &'static str, default: bool) -> Result<bool, ConfigError> {
match raw.get(key) {
None => Ok(default),
Some(value) => match value.trim().to_ascii_lowercase().as_str() {
"1" | "true" | "yes" => Ok(true),
"0" | "false" | "no" => Ok(false),
_ => Err(ConfigError::invalid(
key,
"expected a boolean (1/true/yes or 0/false/no)",
)),
},
}
}
fn tokens(raw: &Raw) -> Result<Vec<(String, String)>, ConfigError> {
let Some(spec) = raw.get("tokens") else {
return Ok(Vec::new());
};
let mut out = Vec::new();
for entry in spec.split(',').map(str::trim).filter(|e| !e.is_empty()) {
let (token, principal) = entry.split_once('=').ok_or_else(|| {
ConfigError::invalid(
"tokens",
format!("entry {entry:?} is not `token=principal`"),
)
})?;
let (token, principal) = (token.trim(), principal.trim());
if token.is_empty() || principal.is_empty() {
return Err(ConfigError::invalid(
"tokens",
format!("entry {entry:?} has an empty token or principal"),
));
}
out.push((token.to_owned(), principal.to_owned()));
}
Ok(out)
}
fn tls(raw: &Raw) -> Result<Option<TlsConfig>, ConfigError> {
match (opt(raw, "tls_cert"), opt(raw, "tls_key")) {
(None, None) => {
if raw.get("tls_client_ca").is_some() {
return Err(ConfigError::invalid(
"tls_client_ca",
"set tls_cert and tls_key before configuring a client CA (mTLS)",
));
}
Ok(None)
}
(Some(cert_path), Some(key_path)) => Ok(Some(TlsConfig {
cert_path,
key_path,
client_ca_path: opt(raw, "tls_client_ca"),
})),
_ => Err(ConfigError::invalid(
"tls_cert",
"set both tls_cert and tls_key, or neither",
)),
}
}
fn observability(raw: &Raw) -> Result<ObservabilityConfig, ConfigError> {
Ok(ObservabilityConfig {
log_requests: bool_or(raw, "log_requests", false)?,
otlp_endpoint: opt(raw, "otlp_endpoint"),
service_name: string_or(raw, "service_name", "osproxy"),
diag_baseline: diag_baseline(raw)?,
debug_directive_key: opt(raw, "debug_directive_key"),
directive_admin_token: opt(raw, "directive_admin_token"),
debug_endpoints: bool_or(raw, "debug_endpoints", true)?,
log_diagnostic_captures: bool_or(raw, "log_diagnostic_captures", false)?,
})
}
fn diag_baseline(raw: &Raw) -> Result<DiagBaseline, ConfigError> {
match raw.get("diag_baseline") {
None => Ok(DiagBaseline::Shape),
Some(value) => match value.to_ascii_lowercase().as_str() {
"off" => Ok(DiagBaseline::Off),
"shape" => Ok(DiagBaseline::Shape),
"shape-timing" => Ok(DiagBaseline::ShapeTiming),
"shape-rewrite-diff" => Ok(DiagBaseline::ShapeRewriteDiff),
_ => Err(ConfigError::invalid(
"diag_baseline",
"expected off|shape|shape-timing|shape-rewrite-diff",
)),
},
}
}
fn admin_passthrough(raw: &Raw) -> Option<AdminPassthroughConfig> {
let cluster = raw.get("admin_passthrough_cluster")?.to_owned();
let prefixes = raw
.get("admin_passthrough_prefixes")
.unwrap_or(DEFAULT_ADMIN_PREFIXES)
.split(',')
.map(str::trim)
.filter(|p| !p.is_empty())
.map(str::to_owned)
.collect();
Some(AdminPassthroughConfig {
cluster,
prefixes,
endpoint: opt(raw, "admin_passthrough_endpoint"),
})
}