use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use crate::error::OtlpConfigError;
use secrecy::SecretString;
use url::Url;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ForwardingProtocol {
#[default]
Protobuf,
ArrowFlight,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ProtocolConfig {
#[serde(default = "default_protobuf_enabled")]
pub protobuf_enabled: bool,
#[serde(default = "default_protobuf_port")]
pub protobuf_port: u16,
#[serde(default = "default_arrow_flight_enabled")]
pub arrow_flight_enabled: bool,
#[serde(default = "default_arrow_flight_port")]
pub arrow_flight_port: u16,
#[serde(default = "default_sdk_extraction_enabled")]
pub sdk_extraction_enabled: bool,
}
impl Default for ProtocolConfig {
fn default() -> Self {
Self {
protobuf_enabled: default_protobuf_enabled(),
protobuf_port: default_protobuf_port(),
arrow_flight_enabled: default_arrow_flight_enabled(),
arrow_flight_port: default_arrow_flight_port(),
sdk_extraction_enabled: default_sdk_extraction_enabled(),
}
}
}
impl ProtocolConfig {
pub fn validate(&self) -> Result<(), OtlpConfigError> {
if !self.protobuf_enabled && !self.arrow_flight_enabled {
return Err(OtlpConfigError::ValidationFailed(
"At least one protocol must be enabled".to_string(),
));
}
if self.protobuf_port == 0 {
return Err(OtlpConfigError::ValidationFailed(
"Protobuf port must be between 1 and 65535".to_string(),
));
}
if self.arrow_flight_port == 0 {
return Err(OtlpConfigError::ValidationFailed(
"Arrow Flight port must be between 1 and 65535".to_string(),
));
}
if self.protobuf_enabled
&& self.arrow_flight_enabled
&& self.protobuf_port == self.arrow_flight_port
{
return Err(OtlpConfigError::ValidationFailed(
"Protobuf and Arrow Flight ports must be different when both protocols are enabled"
.to_string(),
));
}
Ok(())
}
}
fn default_protobuf_enabled() -> bool {
true
}
fn default_protobuf_port() -> u16 {
4317
}
fn default_arrow_flight_enabled() -> bool {
true
}
fn default_arrow_flight_port() -> u16 {
4318
}
fn default_sdk_extraction_enabled() -> bool {
true
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DashboardConfig {
#[serde(default = "default_dashboard_enabled")]
pub enabled: bool,
#[serde(default = "default_dashboard_port")]
pub port: u16,
#[serde(default = "default_dashboard_static_dir")]
pub static_dir: PathBuf,
#[serde(default = "default_dashboard_bind_address")]
pub bind_address: String,
#[serde(default)]
pub x_frame_options: Option<String>,
}
impl Default for DashboardConfig {
fn default() -> Self {
Self {
enabled: default_dashboard_enabled(),
port: default_dashboard_port(),
static_dir: default_dashboard_static_dir(),
bind_address: default_dashboard_bind_address(),
x_frame_options: None, }
}
}
impl DashboardConfig {
pub fn validate(&self) -> Result<(), OtlpConfigError> {
if self.enabled {
if self.port == 0 {
return Err(OtlpConfigError::ValidationFailed(
"Dashboard port must be between 1 and 65535".to_string(),
));
}
if self.port == 4317 || self.port == 4318 {
return Err(OtlpConfigError::ValidationFailed(
"Dashboard port conflicts with gRPC port (4317 or 4318)".to_string(),
));
}
if !self.bind_address.is_empty() {
if self.bind_address.parse::<std::net::IpAddr>().is_err() {
return Err(OtlpConfigError::ValidationFailed(format!(
"Dashboard bind_address must be a valid IP address: {}",
self.bind_address
)));
}
}
if !self.static_dir.exists() {
return Err(OtlpConfigError::InvalidOutputDir(format!(
"Dashboard static directory does not exist: {}",
self.static_dir.display()
)));
}
if !self.static_dir.is_dir() {
return Err(OtlpConfigError::InvalidOutputDir(format!(
"Dashboard static directory is not a directory: {}",
self.static_dir.display()
)));
}
if let Some(ref xfo) = self.x_frame_options
&& xfo != "DENY"
&& xfo != "SAMEORIGIN"
{
return Err(OtlpConfigError::ValidationFailed(format!(
"x_frame_options must be 'DENY' or 'SAMEORIGIN' (got: {})",
xfo
)));
}
}
Ok(())
}
}
fn default_dashboard_enabled() -> bool {
false
}
fn default_dashboard_port() -> u16 {
8080
}
fn default_dashboard_static_dir() -> PathBuf {
PathBuf::from("./dashboard/dist")
}
fn default_dashboard_bind_address() -> String {
"127.0.0.1".to_string()
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Config {
#[serde(default = "default_output_dir")]
pub output_dir: PathBuf,
#[serde(default = "default_write_interval_secs")]
pub write_interval_secs: u64,
#[serde(default = "default_trace_cleanup_interval_secs")]
pub trace_cleanup_interval_secs: u64,
#[serde(default = "default_metric_cleanup_interval_secs")]
pub metric_cleanup_interval_secs: u64,
#[serde(default = "default_max_trace_buffer_size")]
pub max_trace_buffer_size: usize,
#[serde(default = "default_max_metric_buffer_size")]
pub max_metric_buffer_size: usize,
#[serde(default)]
pub protocols: ProtocolConfig,
#[serde(default)]
pub forwarding: Option<ForwardingConfig>,
#[serde(default)]
pub dashboard: DashboardConfig,
#[serde(skip)]
pub metric_temporality: Option<opentelemetry_sdk::metrics::Temporality>,
}
impl Default for Config {
fn default() -> Self {
Self {
output_dir: default_output_dir(),
write_interval_secs: default_write_interval_secs(),
trace_cleanup_interval_secs: default_trace_cleanup_interval_secs(),
metric_cleanup_interval_secs: default_metric_cleanup_interval_secs(),
max_trace_buffer_size: default_max_trace_buffer_size(),
max_metric_buffer_size: default_max_metric_buffer_size(),
protocols: ProtocolConfig::default(),
forwarding: None,
dashboard: DashboardConfig::default(),
metric_temporality: None, }
}
}
impl Config {
pub fn validate(&self) -> Result<(), OtlpConfigError> {
if self.output_dir.to_string_lossy().is_empty() {
return Err(OtlpConfigError::InvalidOutputDir(
"Output directory cannot be empty".to_string(),
));
}
let path_str = self.output_dir.to_string_lossy();
if path_str.len() > 4096 {
return Err(OtlpConfigError::InvalidOutputDir(format!(
"Output directory path is too long ({} characters, max 4096)",
path_str.len()
)));
}
if path_str.contains('\0') {
return Err(OtlpConfigError::InvalidOutputDir(
"Output directory path cannot contain null bytes".to_string(),
));
}
if self.write_interval_secs == 0 {
return Err(OtlpConfigError::InvalidInterval(
"Write interval must be greater than 0".to_string(),
));
}
if self.write_interval_secs > 3600 {
return Err(OtlpConfigError::InvalidInterval(
"Write interval must be less than 3600 seconds (1 hour)".to_string(),
));
}
if self.trace_cleanup_interval_secs == 0 {
return Err(OtlpConfigError::InvalidInterval(
"Trace cleanup interval must be greater than 0".to_string(),
));
}
if self.metric_cleanup_interval_secs == 0 {
return Err(OtlpConfigError::InvalidInterval(
"Metric cleanup interval must be greater than 0".to_string(),
));
}
if self.max_trace_buffer_size == 0 || self.max_trace_buffer_size > 1_000_000 {
return Err(OtlpConfigError::ValidationFailed(format!(
"max_trace_buffer_size must be between 1 and 1,000,000 (got {})",
self.max_trace_buffer_size
)));
}
if self.max_metric_buffer_size == 0 || self.max_metric_buffer_size > 1_000_000 {
return Err(OtlpConfigError::ValidationFailed(format!(
"max_metric_buffer_size must be between 1 and 1,000,000 (got {})",
self.max_metric_buffer_size
)));
}
if self.trace_cleanup_interval_secs > 86400 {
return Err(OtlpConfigError::InvalidInterval(
"Trace cleanup interval must be less than 86400 seconds (1 day)".to_string(),
));
}
if self.metric_cleanup_interval_secs > 86400 {
return Err(OtlpConfigError::InvalidInterval(
"Metric cleanup interval must be less than 86400 seconds (1 day)".to_string(),
));
}
if self.trace_cleanup_interval_secs < 60 {
return Err(OtlpConfigError::InvalidInterval(
"Trace cleanup interval must be at least 60 seconds".to_string(),
));
}
if self.metric_cleanup_interval_secs < 60 {
return Err(OtlpConfigError::InvalidInterval(
"Metric cleanup interval must be at least 60 seconds".to_string(),
));
}
self.protocols.validate()?;
if let Some(ref forwarding) = self.forwarding {
forwarding.validate()?;
}
self.dashboard.validate()?;
Ok(())
}
}
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct ForwardingConfig {
#[serde(default)]
pub enabled: bool,
pub endpoint_url: Option<String>,
#[serde(default)]
pub protocol: ForwardingProtocol,
#[serde(default)]
pub authentication: Option<AuthConfig>,
}
impl ForwardingConfig {
pub fn validate(&self) -> Result<(), OtlpConfigError> {
if self.enabled {
if let Some(ref url) = self.endpoint_url {
if url.is_empty() {
return Err(OtlpConfigError::InvalidUrl(
"Endpoint URL cannot be empty when forwarding is enabled".to_string(),
));
}
let parsed_url = Url::parse(url).map_err(|e| {
OtlpConfigError::InvalidUrl(format!(
"Invalid endpoint URL format: {} (error: {})",
url, e
))
})?;
match parsed_url.scheme() {
"http" | "https" => {}
scheme => {
return Err(OtlpConfigError::InvalidUrl(format!(
"Endpoint URL must use http or https scheme (got: {}): {}",
scheme, url
)));
}
}
if parsed_url.host().is_none() {
return Err(OtlpConfigError::InvalidUrl(format!(
"Endpoint URL must include a host: {}",
url
)));
}
} else {
return Err(OtlpConfigError::MissingRequiredField(
"endpoint_url is required when forwarding is enabled".to_string(),
));
}
}
Ok(())
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AuthConfig {
pub auth_type: String,
#[serde(skip_serializing, deserialize_with = "deserialize_secret_credentials")]
pub credentials: HashMap<String, SecretString>,
}
fn deserialize_secret_credentials<'de, D>(
deserializer: D,
) -> Result<HashMap<String, SecretString>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::Deserialize;
let map: HashMap<String, String> = HashMap::deserialize(deserializer)?;
Ok(map
.into_iter()
.map(|(k, v)| (k, SecretString::new(v)))
.collect())
}
impl AuthConfig {
pub fn validate(&self) -> Result<(), OtlpConfigError> {
if self.auth_type.is_empty() {
return Err(OtlpConfigError::ValidationFailed(
"Authentication type cannot be empty".to_string(),
));
}
match self.auth_type.as_str() {
"api_key" => {
if !self.credentials.contains_key("key") {
return Err(OtlpConfigError::MissingRequiredField(
"key required for api_key authentication".to_string(),
));
}
}
"bearer_token" => {
if !self.credentials.contains_key("token") {
return Err(OtlpConfigError::MissingRequiredField(
"token required for bearer_token authentication".to_string(),
));
}
}
"basic" => {
if !self.credentials.contains_key("username")
|| !self.credentials.contains_key("password")
{
return Err(OtlpConfigError::MissingRequiredField(
"username and password required for basic auth".to_string(),
));
}
}
_ => {
return Err(OtlpConfigError::ValidationFailed(format!(
"Unsupported authentication type: {}",
self.auth_type
)));
}
}
Ok(())
}
}
#[derive(Debug, Default)]
pub struct ConfigBuilder {
config: Config,
}
impl ConfigBuilder {
pub fn new() -> Self {
Self {
config: Config::default(),
}
}
pub fn output_dir(mut self, dir: impl Into<PathBuf>) -> Self {
self.config.output_dir = dir.into();
self
}
pub fn write_interval_secs(mut self, secs: u64) -> Self {
self.config.write_interval_secs = secs;
self
}
pub fn trace_cleanup_interval_secs(mut self, secs: u64) -> Self {
self.config.trace_cleanup_interval_secs = secs;
self
}
pub fn metric_cleanup_interval_secs(mut self, secs: u64) -> Self {
self.config.metric_cleanup_interval_secs = secs;
self
}
pub fn max_trace_buffer_size(mut self, size: usize) -> Self {
self.config.max_trace_buffer_size = size;
self
}
pub fn max_metric_buffer_size(mut self, size: usize) -> Self {
self.config.max_metric_buffer_size = size;
self
}
pub fn with_temporality(
mut self,
temporality: opentelemetry_sdk::metrics::Temporality,
) -> Self {
self.config.metric_temporality = Some(temporality);
self
}
pub fn protocols(mut self, protocols: ProtocolConfig) -> Self {
self.config.protocols = protocols;
self
}
pub fn protobuf_enabled(mut self, enabled: bool) -> Self {
self.config.protocols.protobuf_enabled = enabled;
self
}
pub fn protobuf_port(mut self, port: u16) -> Self {
self.config.protocols.protobuf_port = port;
self
}
pub fn arrow_flight_enabled(mut self, enabled: bool) -> Self {
self.config.protocols.arrow_flight_enabled = enabled;
self
}
pub fn arrow_flight_port(mut self, port: u16) -> Self {
self.config.protocols.arrow_flight_port = port;
self
}
pub fn enable_forwarding(mut self, forwarding: ForwardingConfig) -> Self {
self.config.forwarding = Some(forwarding);
self
}
pub fn forwarding(mut self, forwarding: Option<ForwardingConfig>) -> Self {
self.config.forwarding = forwarding;
self
}
pub fn dashboard(mut self, dashboard: DashboardConfig) -> Self {
self.config.dashboard = dashboard;
self
}
pub fn dashboard_enabled(mut self, enabled: bool) -> Self {
self.config.dashboard.enabled = enabled;
self
}
pub fn dashboard_port(mut self, port: u16) -> Self {
self.config.dashboard.port = port;
self
}
pub fn dashboard_static_dir(mut self, dir: impl Into<PathBuf>) -> Self {
self.config.dashboard.static_dir = dir.into();
self
}
pub fn build(self) -> Result<Config, OtlpConfigError> {
self.config.validate()?;
Ok(self.config)
}
}
fn default_output_dir() -> PathBuf {
PathBuf::from("./output_dir")
}
fn default_write_interval_secs() -> u64 {
5
}
fn default_trace_cleanup_interval_secs() -> u64 {
600
}
fn default_metric_cleanup_interval_secs() -> u64 {
3600
}
fn default_max_trace_buffer_size() -> usize {
10000
}
fn default_max_metric_buffer_size() -> usize {
10000
}