use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_zerobus_sdk_wrapper::{
OtlpSdkConfig, TransmissionResult, WrapperConfiguration, ZerobusError, ZerobusWrapper,
};
use std::sync::Arc;
#[test]
fn test_config_contract_required_fields() {
let config = WrapperConfiguration::new(
"https://test.cloud.databricks.com".to_string(),
"test_table".to_string(),
);
assert_eq!(config.zerobus_endpoint, "https://test.cloud.databricks.com");
assert_eq!(config.table_name, "test_table");
}
#[test]
fn test_config_contract_builder_methods() {
let config = WrapperConfiguration::new(
"https://test.cloud.databricks.com".to_string(),
"test_table".to_string(),
)
.with_credentials("client_id".to_string(), "client_secret".to_string())
.with_unity_catalog("https://unity-catalog-url".to_string());
use secrecy::ExposeSecret;
assert_eq!(
config
.client_id
.as_ref()
.map(|s| s.expose_secret().as_str()),
Some("client_id")
);
assert_eq!(
config
.client_secret
.as_ref()
.map(|s| s.expose_secret().as_str()),
Some("client_secret")
);
assert_eq!(
config.unity_catalog_url,
Some("https://unity-catalog-url".to_string())
);
}
#[tokio::test]
async fn test_wrapper_new_contract() {
let config = WrapperConfiguration::new(
"https://test.cloud.databricks.com".to_string(),
"test_table".to_string(),
)
.with_credentials("client_id".to_string(), "client_secret".to_string())
.with_unity_catalog("https://unity-catalog-url".to_string());
let result = config.validate();
assert!(result.is_ok());
let _wrapper_result = ZerobusWrapper::new(config).await;
}
#[tokio::test]
#[ignore] async fn test_send_batch_contract() {
let config = WrapperConfiguration::new(
"https://test.cloud.databricks.com".to_string(),
"test_table".to_string(),
)
.with_credentials("client_id".to_string(), "client_secret".to_string())
.with_unity_catalog("https://unity-catalog-url".to_string());
let wrapper = ZerobusWrapper::new(config).await.unwrap();
let schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
]);
let id_array = Int64Array::from(vec![1, 2, 3]);
let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(id_array), Arc::new(name_array)],
)
.unwrap();
let result: Result<TransmissionResult, ZerobusError> = wrapper.send_batch(batch).await;
match result {
Ok(transmission_result) => {
assert!(transmission_result.batch_size_bytes > 0);
if transmission_result.success {
assert!(transmission_result.error.is_none());
} else {
assert!(transmission_result.error.is_some());
}
}
Err(_) => {
}
}
}
#[test]
fn test_transmission_result_contract() {
let result = TransmissionResult {
success: true,
error: None,
attempts: 1,
latency_ms: Some(100),
batch_size_bytes: 1024,
failed_rows: None,
successful_rows: None,
total_rows: 0,
successful_count: 0,
failed_count: 0,
};
assert!(result.success);
assert!(result.error.is_none());
assert_eq!(result.attempts, 1);
assert_eq!(result.latency_ms, Some(100));
assert_eq!(result.batch_size_bytes, 1024);
}
#[test]
fn test_error_contract() {
let _config = ZerobusError::ConfigurationError("test".to_string());
let _auth = ZerobusError::AuthenticationError("test".to_string());
let _conn = ZerobusError::ConnectionError("test".to_string());
let _conv = ZerobusError::ConversionError("test".to_string());
let _trans = ZerobusError::TransmissionError("test".to_string());
let _retry = ZerobusError::RetryExhausted("test".to_string());
let _token = ZerobusError::TokenRefreshError("test".to_string());
}
#[tokio::test]
async fn test_wrapper_lifecycle_contract() {
let config = WrapperConfiguration::new(
"https://test.cloud.databricks.com".to_string(),
"test_table".to_string(),
)
.with_credentials("client_id".to_string(), "client_secret".to_string())
.with_unity_catalog("https://unity-catalog-url".to_string());
let wrapper_result = ZerobusWrapper::new(config).await;
if let Ok(wrapper) = wrapper_result {
let flush_result: Result<(), ZerobusError> = wrapper.flush().await;
assert!(flush_result.is_ok() || flush_result.is_err());
let shutdown_result: Result<(), ZerobusError> = wrapper.shutdown().await;
assert!(shutdown_result.is_ok() || shutdown_result.is_err());
}
}
#[test]
fn test_observability_contract() {
use std::path::PathBuf;
let otlp_config = OtlpSdkConfig {
endpoint: Some("http://localhost:4317".to_string()),
output_dir: Some(PathBuf::from("/tmp/otlp")),
write_interval_secs: 5,
log_level: "info".to_string(),
};
let config = WrapperConfiguration::new(
"https://test.cloud.databricks.com".to_string(),
"test_table".to_string(),
)
.with_observability(otlp_config);
assert!(config.observability_enabled);
assert!(config.observability_config.is_some());
}
#[test]
fn test_debug_output_contract() {
use std::path::PathBuf;
let config = WrapperConfiguration::new(
"https://test.cloud.databricks.com".to_string(),
"test_table".to_string(),
)
.with_debug_output(PathBuf::from("/tmp/debug"));
assert!(config.debug_enabled);
assert_eq!(config.debug_output_dir, Some(PathBuf::from("/tmp/debug")));
}
#[test]
fn test_retry_config_contract() {
let config = WrapperConfiguration::new(
"https://test.cloud.databricks.com".to_string(),
"test_table".to_string(),
)
.with_retry_config(10, 200, 60000);
assert_eq!(config.retry_max_attempts, 10);
assert_eq!(config.retry_base_delay_ms, 200);
assert_eq!(config.retry_max_delay_ms, 60000);
}