use crate::error::ZerobusError;
use secrecy::SecretString;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct OtlpConfig {
pub endpoint: Option<String>,
#[serde(default = "default_log_level")]
pub log_level: String,
#[serde(flatten)]
pub extra: std::collections::HashMap<String, serde_json::Value>,
}
fn default_log_level() -> String {
"info".to_string()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OtlpSdkConfig {
pub endpoint: Option<String>,
pub output_dir: Option<PathBuf>,
#[serde(default = "default_write_interval")]
pub write_interval_secs: u64,
#[serde(default = "default_log_level")]
pub log_level: String,
}
fn default_write_interval() -> u64 {
5
}
impl Default for OtlpSdkConfig {
fn default() -> Self {
Self {
endpoint: None,
output_dir: None,
write_interval_secs: 5,
log_level: "info".to_string(),
}
}
}
#[derive(Debug, Clone)]
pub struct WrapperConfiguration {
pub zerobus_endpoint: String,
pub unity_catalog_url: Option<String>,
pub client_id: Option<SecretString>,
pub client_secret: Option<SecretString>,
pub table_name: String,
pub observability_enabled: bool,
pub observability_config: Option<OtlpSdkConfig>,
pub debug_enabled: bool,
pub debug_arrow_enabled: bool,
pub debug_protobuf_enabled: bool,
pub debug_output_dir: Option<PathBuf>,
pub debug_flush_interval_secs: u64,
pub debug_max_file_size: Option<u64>,
pub debug_max_files_retained: Option<usize>,
pub retry_max_attempts: u32,
pub retry_base_delay_ms: u64,
pub retry_max_delay_ms: u64,
pub zerobus_writer_disabled: bool,
}
impl WrapperConfiguration {
pub fn new(endpoint: String, table_name: String) -> Self {
Self {
zerobus_endpoint: endpoint,
table_name,
unity_catalog_url: None,
client_id: None,
client_secret: None,
observability_enabled: false,
observability_config: None,
debug_enabled: false,
debug_arrow_enabled: false,
debug_protobuf_enabled: false,
debug_output_dir: None,
debug_flush_interval_secs: 5,
debug_max_file_size: None,
debug_max_files_retained: Some(10),
retry_max_attempts: 5,
retry_base_delay_ms: 100,
retry_max_delay_ms: 30000,
zerobus_writer_disabled: false,
}
}
pub fn with_credentials(mut self, client_id: String, client_secret: String) -> Self {
self.client_id = Some(SecretString::new(client_id));
self.client_secret = Some(SecretString::new(client_secret));
self
}
pub fn with_unity_catalog(mut self, url: String) -> Self {
self.unity_catalog_url = Some(url);
self
}
pub fn with_observability(mut self, config: OtlpSdkConfig) -> Self {
self.observability_enabled = true;
self.observability_config = Some(config);
self
}
pub fn with_debug_output(mut self, output_dir: PathBuf) -> Self {
self.debug_enabled = true;
self.debug_output_dir = Some(output_dir);
self
}
pub fn with_debug_flush_interval_secs(mut self, interval_secs: u64) -> Self {
self.debug_flush_interval_secs = interval_secs;
self
}
pub fn with_debug_max_file_size(mut self, max_size: Option<u64>) -> Self {
self.debug_max_file_size = max_size;
self
}
pub fn with_debug_arrow_enabled(mut self, enabled: bool) -> Self {
self.debug_arrow_enabled = enabled;
self
}
pub fn with_debug_protobuf_enabled(mut self, enabled: bool) -> Self {
self.debug_protobuf_enabled = enabled;
self
}
pub fn with_debug_max_files_retained(mut self, max_files: Option<usize>) -> Self {
self.debug_max_files_retained = max_files;
self
}
pub fn with_retry_config(
mut self,
max_attempts: u32,
base_delay_ms: u64,
max_delay_ms: u64,
) -> Self {
self.retry_max_attempts = max_attempts;
self.retry_base_delay_ms = base_delay_ms;
self.retry_max_delay_ms = max_delay_ms;
self
}
pub fn with_zerobus_writer_disabled(mut self, disabled: bool) -> Self {
self.zerobus_writer_disabled = disabled;
self
}
pub fn validate(&self) -> Result<(), ZerobusError> {
if !self.zerobus_endpoint.starts_with("https://")
&& !self.zerobus_endpoint.starts_with("http://")
{
return Err(ZerobusError::ConfigurationError(format!(
"zerobus_endpoint must start with 'https://' or 'http://', got: '{}'",
self.zerobus_endpoint
)));
}
let parts: Vec<&str> = self.table_name.split('.').collect();
let num_parts = parts.len();
if num_parts > 3 {
return Err(ZerobusError::ConfigurationError(format!(
"table_name has too many parts ({}). Must be in format 'table', 'schema.table', or 'catalog.schema.table'. Got: '{}'",
num_parts, self.table_name
)));
}
let get_part_name = |idx: usize, total: usize| -> &'static str {
match total {
1 => match idx {
0 => "table",
_ => "part",
},
2 => match idx {
0 => "schema",
1 => "table",
_ => "part",
},
3 => match idx {
0 => "catalog",
1 => "schema",
2 => "table",
_ => "part",
},
_ => "part",
}
};
for (idx, part) in parts.iter().enumerate() {
let part_name = get_part_name(idx, num_parts);
if part.is_empty() {
return Err(ZerobusError::ConfigurationError(format!(
"table_name {} part cannot be empty. Got: '{}'",
part_name, self.table_name
)));
}
if !part.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
return Err(ZerobusError::ConfigurationError(format!(
"table_name {} part '{}' must contain only ASCII letters, digits, and underscores (Zerobus requirement). Got: '{}'",
part_name, part, self.table_name
)));
}
}
let any_debug_enabled =
self.debug_arrow_enabled || self.debug_protobuf_enabled || self.debug_enabled;
if any_debug_enabled && self.debug_output_dir.is_none() {
return Err(ZerobusError::ConfigurationError(
"debug_output_dir is required when any debug format is enabled".to_string(),
));
}
if self.zerobus_writer_disabled && !any_debug_enabled {
return Err(ZerobusError::ConfigurationError(
"At least one debug format must be enabled when zerobus_writer_disabled is true. Use with_debug_arrow_enabled() or with_debug_protobuf_enabled() to enable debug output.".to_string(),
));
}
if self.retry_max_attempts == 0 {
return Err(ZerobusError::ConfigurationError(
"retry_max_attempts must be > 0".to_string(),
));
}
if self.debug_flush_interval_secs == 0 {
return Err(ZerobusError::ConfigurationError(
"debug_flush_interval_secs must be > 0".to_string(),
));
}
if self.retry_max_delay_ms < self.retry_base_delay_ms {
return Err(ZerobusError::ConfigurationError(format!(
"retry_max_delay_ms ({}) must be >= retry_base_delay_ms ({})",
self.retry_max_delay_ms, self.retry_base_delay_ms
)));
}
Ok(())
}
}
impl OtlpSdkConfig {
pub fn validate(&self) -> Result<(), ZerobusError> {
if let Some(endpoint) = &self.endpoint {
if !endpoint.starts_with("https://") && !endpoint.starts_with("http://") {
return Err(ZerobusError::ConfigurationError(format!(
"endpoint must start with 'https://' or 'http://', got: '{}'",
endpoint
)));
}
}
if let Some(output_dir) = &self.output_dir {
if output_dir.as_os_str().is_empty() {
return Err(ZerobusError::ConfigurationError(
"output_dir must not be empty".to_string(),
));
}
}
if self.write_interval_secs == 0 {
return Err(ZerobusError::ConfigurationError(
"write_interval_secs must be > 0".to_string(),
));
}
let valid_levels = ["trace", "debug", "info", "warn", "error"];
if !valid_levels.contains(&self.log_level.to_lowercase().as_str()) {
return Err(ZerobusError::ConfigurationError(format!(
"log_level must be one of {:?}, got: '{}'",
valid_levels, self.log_level
)));
}
Ok(())
}
}