use crate::error::ZerobusError;
use secrecy::SecretString;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct OtlpConfig {
pub endpoint: Option<String>,
#[serde(default = "default_log_level")]
pub log_level: String,
#[serde(flatten)]
pub extra: std::collections::HashMap<String, serde_json::Value>,
}
/// Returns the log level used when a `log_level` key is missing: `"info"`.
///
/// Referenced by name from the `#[serde(default = "default_log_level")]`
/// field attributes in this file.
fn default_log_level() -> String {
    String::from("info")
}
/// OTLP SDK settings, checked by `OtlpSdkConfig::validate`.
///
/// `write_interval_secs` and `log_level` take their serde defaults from
/// `default_write_interval()` and `default_log_level()` when the keys are
/// missing from the input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OtlpSdkConfig {
/// Optional endpoint; when present, `validate` requires it to start with
/// `https://` or `http://`.
pub endpoint: Option<String>,
/// Optional output directory; when present, `validate` rejects an empty path.
pub output_dir: Option<PathBuf>,
/// Write interval in seconds; `validate` requires a value greater than zero.
#[serde(default = "default_write_interval")]
pub write_interval_secs: u64,
/// Log level name; `validate` accepts `trace`, `debug`, `info`, `warn` and
/// `error`, compared after lowercasing.
#[serde(default = "default_log_level")]
pub log_level: String,
}
/// Returns the write interval, in seconds, used when `write_interval_secs`
/// is missing from the input.
///
/// Referenced by name from the `#[serde(default = "default_write_interval")]`
/// field attribute on `OtlpSdkConfig`.
fn default_write_interval() -> u64 {
    const DEFAULT_WRITE_INTERVAL_SECS: u64 = 5;
    DEFAULT_WRITE_INTERVAL_SECS
}
impl Default for OtlpSdkConfig {
fn default() -> Self {
Self {
endpoint: None,
output_dir: None,
write_interval_secs: 5,
log_level: "info".to_string(),
}
}
}
/// Configuration assembled through `WrapperConfiguration::new` and the
/// `with_*` builder methods, and checked by `WrapperConfiguration::validate`.
///
/// All fields are public, so values can also be set directly; `validate` is
/// the only place the constraints noted on the fields are enforced.
#[derive(Debug, Clone)]
pub struct WrapperConfiguration {
/// Zerobus endpoint; `validate` requires it to start with `https://` or
/// `http://`.
pub zerobus_endpoint: String,
/// Optional Unity Catalog URL; `new` leaves it `None`, `with_unity_catalog`
/// sets it.
pub unity_catalog_url: Option<String>,
/// Client ID wrapped in `SecretString`; `new` leaves it `None`,
/// `with_credentials` sets it.
pub client_id: Option<SecretString>,
/// Client secret wrapped in `SecretString`; `new` leaves it `None`,
/// `with_credentials` sets it.
pub client_secret: Option<SecretString>,
/// Table name passed to `new`.
pub table_name: String,
/// Observability flag; `false` after `new`, set to `true` by
/// `with_observability`.
pub observability_enabled: bool,
/// OTLP SDK settings stored by `with_observability`; `None` after `new`.
pub observability_config: Option<OtlpSdkConfig>,
/// Debug output flag; `false` after `new`, set to `true` by
/// `with_debug_output`. When `true`, `validate` requires `debug_output_dir`.
pub debug_enabled: bool,
/// Directory for debug output; `None` after `new`, set by
/// `with_debug_output`.
pub debug_output_dir: Option<PathBuf>,
/// Debug flush interval in seconds; `new` sets 5 and `validate` requires a
/// value greater than zero.
pub debug_flush_interval_secs: u64,
/// Optional size limit for debug files; `None` after `new`.
/// NOTE(review): the unit is not visible in this file (presumably bytes) —
/// confirm against the code that consumes it.
pub debug_max_file_size: Option<u64>,
/// Maximum number of retry attempts; `new` sets 5 and `validate` requires a
/// value greater than zero.
pub retry_max_attempts: u32,
/// Base retry delay in milliseconds; `new` sets 100.
pub retry_base_delay_ms: u64,
/// Maximum retry delay in milliseconds; `new` sets 30000 and `validate`
/// requires it to be at least `retry_base_delay_ms`.
pub retry_max_delay_ms: u64,
/// Writer-disabled flag; `false` after `new`. When `true`, `validate`
/// requires `debug_enabled` to be `true` as well.
pub zerobus_writer_disabled: bool,
}
impl WrapperConfiguration {
    /// Creates a configuration for the given endpoint and table name.
    ///
    /// Every other setting starts at its default: no Unity Catalog URL, no
    /// credentials, observability and debug output disabled, a 5 second debug
    /// flush interval, no debug file size limit, 5 retry attempts with a
    /// 100 ms base delay and a 30000 ms maximum delay, and the Zerobus writer
    /// not disabled.
    ///
    /// Nothing is checked here; call `validate` once the configuration has
    /// been fully built.
    pub fn new(endpoint: String, table_name: String) -> Self {
        Self {
            zerobus_endpoint: endpoint,
            table_name,
            unity_catalog_url: None,
            client_id: None,
            client_secret: None,
            observability_enabled: false,
            observability_config: None,
            debug_enabled: false,
            debug_output_dir: None,
            debug_flush_interval_secs: 5,
            debug_max_file_size: None,
            retry_max_attempts: 5,
            retry_base_delay_ms: 100,
            retry_max_delay_ms: 30000,
            zerobus_writer_disabled: false,
        }
    }

    /// Stores the client ID and client secret, each wrapped in a
    /// `SecretString`.
    #[must_use]
    pub fn with_credentials(mut self, client_id: String, client_secret: String) -> Self {
        self.client_id = Some(SecretString::new(client_id));
        self.client_secret = Some(SecretString::new(client_secret));
        self
    }

    /// Sets the Unity Catalog URL.
    #[must_use]
    pub fn with_unity_catalog(mut self, url: String) -> Self {
        self.unity_catalog_url = Some(url);
        self
    }

    /// Enables observability and stores the given OTLP SDK settings.
    #[must_use]
    pub fn with_observability(mut self, config: OtlpSdkConfig) -> Self {
        self.observability_enabled = true;
        self.observability_config = Some(config);
        self
    }

    /// Enables debug output and sets the directory it uses.
    #[must_use]
    pub fn with_debug_output(mut self, output_dir: PathBuf) -> Self {
        self.debug_enabled = true;
        self.debug_output_dir = Some(output_dir);
        self
    }

    /// Sets the debug flush interval in seconds.
    ///
    /// The value is stored as given; `validate` rejects zero.
    #[must_use]
    pub fn with_debug_flush_interval_secs(mut self, interval_secs: u64) -> Self {
        self.debug_flush_interval_secs = interval_secs;
        self
    }

    /// Sets the optional debug file size limit; `None` clears it.
    #[must_use]
    pub fn with_debug_max_file_size(mut self, max_size: Option<u64>) -> Self {
        self.debug_max_file_size = max_size;
        self
    }

    /// Sets the retry attempt limit and the base and maximum retry delays
    /// (both in milliseconds).
    ///
    /// The values are stored as given; `validate` rejects a zero
    /// `max_attempts` and a `max_delay_ms` smaller than `base_delay_ms`.
    #[must_use]
    pub fn with_retry_config(
        mut self,
        max_attempts: u32,
        base_delay_ms: u64,
        max_delay_ms: u64,
    ) -> Self {
        self.retry_max_attempts = max_attempts;
        self.retry_base_delay_ms = base_delay_ms;
        self.retry_max_delay_ms = max_delay_ms;
        self
    }

    /// Sets whether the Zerobus writer is disabled.
    ///
    /// `validate` only accepts `true` here when debug output is enabled.
    #[must_use]
    pub fn with_zerobus_writer_disabled(mut self, disabled: bool) -> Self {
        self.zerobus_writer_disabled = disabled;
        self
    }

    /// Checks the configuration and reports the first problem found.
    ///
    /// # Errors
    ///
    /// Returns `ZerobusError::ConfigurationError` when, in this order:
    ///
    /// 1. `zerobus_endpoint` starts with neither `https://` nor `http://`;
    /// 2. `debug_enabled` is set but `debug_output_dir` is `None`;
    /// 3. `zerobus_writer_disabled` is set but `debug_enabled` is not;
    /// 4. `retry_max_attempts` is zero;
    /// 5. `debug_flush_interval_secs` is zero;
    /// 6. `retry_max_delay_ms` is smaller than `retry_base_delay_ms`.
    ///
    /// After those checks, if `observability_enabled` is set and an
    /// `observability_config` is present, any error from
    /// `OtlpSdkConfig::validate` is returned unchanged.
    pub fn validate(&self) -> Result<(), ZerobusError> {
        let has_http_scheme = self.zerobus_endpoint.starts_with("https://")
            || self.zerobus_endpoint.starts_with("http://");
        if !has_http_scheme {
            return Err(ZerobusError::ConfigurationError(format!(
                "zerobus_endpoint must start with 'https://' or 'http://', got: '{}'",
                self.zerobus_endpoint
            )));
        }

        if self.debug_enabled && self.debug_output_dir.is_none() {
            return Err(ZerobusError::ConfigurationError(
                "debug_output_dir is required when debug_enabled is true".to_string(),
            ));
        }

        if self.zerobus_writer_disabled && !self.debug_enabled {
            return Err(ZerobusError::ConfigurationError(
                "debug_enabled must be true when zerobus_writer_disabled is true. Use with_debug_output() to enable debug output.".to_string(),
            ));
        }

        if self.retry_max_attempts == 0 {
            return Err(ZerobusError::ConfigurationError(
                "retry_max_attempts must be > 0".to_string(),
            ));
        }

        if self.debug_flush_interval_secs == 0 {
            return Err(ZerobusError::ConfigurationError(
                "debug_flush_interval_secs must be > 0".to_string(),
            ));
        }

        if self.retry_max_delay_ms < self.retry_base_delay_ms {
            return Err(ZerobusError::ConfigurationError(format!(
                "retry_max_delay_ms ({}) must be >= retry_base_delay_ms ({})",
                self.retry_max_delay_ms, self.retry_base_delay_ms
            )));
        }

        // The nested OTLP settings have their own validator, which this method
        // would otherwise never reach. It runs last so the checks above keep
        // their precedence, and only when observability is enabled so that a
        // stored-but-unused config cannot fail an otherwise valid setup.
        if self.observability_enabled {
            if let Some(observability) = &self.observability_config {
                observability.validate()?;
            }
        }

        Ok(())
    }
}
impl OtlpSdkConfig {
    /// Checks the OTLP SDK settings and reports the first problem found.
    ///
    /// # Errors
    ///
    /// Returns `ZerobusError::ConfigurationError` when, in this order:
    ///
    /// 1. `endpoint` is present and starts with neither `https://` nor
    ///    `http://`;
    /// 2. `output_dir` is present but is an empty path;
    /// 3. `write_interval_secs` is zero;
    /// 4. `log_level`, after lowercasing, is not one of `trace`, `debug`,
    ///    `info`, `warn` or `error`.
    pub fn validate(&self) -> Result<(), ZerobusError> {
        const VALID_LEVELS: [&str; 5] = ["trace", "debug", "info", "warn", "error"];

        // Keep the endpoint only if it is present *and* lacks an HTTP(S) scheme.
        let bad_endpoint = self
            .endpoint
            .as_deref()
            .filter(|url| !(url.starts_with("https://") || url.starts_with("http://")));
        if let Some(url) = bad_endpoint {
            return Err(ZerobusError::ConfigurationError(format!(
                "endpoint must start with 'https://' or 'http://', got: '{}'",
                url
            )));
        }

        if matches!(&self.output_dir, Some(dir) if dir.as_os_str().is_empty()) {
            return Err(ZerobusError::ConfigurationError(
                "output_dir must not be empty".to_string(),
            ));
        }

        if self.write_interval_secs == 0 {
            return Err(ZerobusError::ConfigurationError(
                "write_interval_secs must be > 0".to_string(),
            ));
        }

        // The comparison ignores case, but the error echoes the value as given.
        let normalized_level = self.log_level.to_lowercase();
        let is_known_level = VALID_LEVELS
            .iter()
            .any(|known| *known == normalized_level.as_str());
        if !is_known_level {
            return Err(ZerobusError::ConfigurationError(format!(
                "log_level must be one of {:?}, got: '{}'",
                VALID_LEVELS, self.log_level
            )));
        }

        Ok(())
    }
}