use arrow::array::{Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_zerobus_sdk_wrapper::{
TransmissionResult, WrapperConfiguration, ZerobusError, ZerobusWrapper,
};
use std::sync::Arc;
/// Builds the five-row, three-column `RecordBatch` used as a fixture by the tests in this file.
///
/// Columns, in order: `id` (Int64, non-nullable), `name` (Utf8, non-nullable) and
/// `score` (Float64, nullable). The third `score` value is null.
///
/// # Panics
///
/// Panics if `RecordBatch::try_new` returns an error for this schema/column combination.
fn create_test_record_batch() -> RecordBatch {
    let fields = vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("score", DataType::Float64, true),
    ];
    let schema = Arc::new(Schema::new(fields));

    let ids = Int64Array::from(vec![1, 2, 3, 4, 5]);
    let names = StringArray::from(vec!["Alice", "Bob", "Charlie", "David", "Eve"]);
    // `None` marks the single null entry in the nullable column.
    let scores = Float64Array::from(vec![Some(95.5), Some(87.0), None, Some(92.5), Some(88.0)]);

    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(ids), Arc::new(names), Arc::new(scores)],
    );
    batch.expect("Failed to create test RecordBatch")
}
/// Walks through the full wrapper workflow: configure, construct, send one batch, shut down.
///
/// Marked `#[ignore]`, so it only runs when requested explicitly. Credentials and the
/// Unity Catalog URL are read from `ZEROBUS_CLIENT_ID`, `ZEROBUS_CLIENT_SECRET` and
/// `UNITY_CATALOG_URL`, falling back to placeholder values when a variable is unset.
///
/// Initialization, transmission and shutdown failures are printed rather than treated
/// as test failures; only the internal consistency of a returned `TransmissionResult`
/// is asserted.
#[tokio::test]
#[ignore]
async fn test_complete_user_journey() {
    let config = WrapperConfiguration::new(
        "https://test-workspace.cloud.databricks.com".to_string(),
        "test_table".to_string(),
    )
    .with_credentials(
        std::env::var("ZEROBUS_CLIENT_ID").unwrap_or_else(|_| "test_client_id".to_string()),
        std::env::var("ZEROBUS_CLIENT_SECRET").unwrap_or_else(|_| "test_client_secret".to_string()),
    )
    .with_unity_catalog(
        std::env::var("UNITY_CATALOG_URL")
            .unwrap_or_else(|_| "https://test.cloud.databricks.com".to_string()),
    )
    // NOTE(review): presumably (max attempts, base delay ms, max delay ms) — confirm
    // against `with_retry_config`.
    .with_retry_config(3, 100, 1000);

    // Without a usable wrapper there is nothing further to exercise, so bail out early.
    let wrapper = match ZerobusWrapper::new(config).await {
        Ok(wrapper) => wrapper,
        Err(e) => {
            println!(
                "⚠️ Wrapper initialization failed (expected in test): {}",
                e
            );
            return;
        }
    };

    let batch = create_test_record_batch();
    assert_eq!(batch.num_rows(), 5);
    assert_eq!(batch.num_columns(), 3);

    let result: Result<TransmissionResult, ZerobusError> = wrapper.send_batch(batch).await;
    match result {
        Ok(transmission_result) => {
            assert!(transmission_result.batch_size_bytes > 0);
            assert!(transmission_result.attempts >= 1);
            if transmission_result.success {
                // A successful transmission carries a latency and no error.
                assert!(transmission_result.error.is_none());
                assert!(transmission_result.latency_ms.is_some());
                println!(
                    "✅ Batch sent successfully! Latency: {}ms, Size: {} bytes",
                    transmission_result.latency_ms.unwrap_or(0),
                    transmission_result.batch_size_bytes
                );
            } else {
                // A failed transmission must explain itself.
                assert!(transmission_result.error.is_some());
                println!("❌ Transmission failed: {:?}", transmission_result.error);
            }
        }
        Err(e) => {
            println!("⚠️ Transmission error (expected in test): {}", e);
        }
    }

    // Shutdown is best-effort here: either outcome is tolerated, but a failure is
    // reported so it is visible in the test output.
    if let Err(e) = wrapper.shutdown().await {
        println!("⚠️ Wrapper shutdown failed: {:?}", e);
    }
}
/// Checks `WrapperConfiguration::validate` against one endpoint expected to be rejected
/// (`"invalid-endpoint"`) and one expected to be accepted
/// (`"https://test.cloud.databricks.com"`).
#[test]
fn test_user_journey_configuration_validation() {
    // Both configurations differ only in the endpoint string.
    let config_for = |endpoint: &str| {
        WrapperConfiguration::new(endpoint.to_string(), "test_table".to_string())
    };

    let rejected = config_for("invalid-endpoint");
    assert!(rejected.validate().is_err());

    let accepted = config_for("https://test.cloud.databricks.com");
    assert!(accepted.validate().is_ok());
}
/// Covers two error paths a caller can hit before any data is sent:
///
/// 1. `validate` returns an error for the endpoint string `"invalid"`.
/// 2. `ZerobusWrapper::new` returns an error for a configuration built from only an
///    endpoint and a table name (no credentials or Unity Catalog URL are set here).
#[tokio::test]
async fn test_user_journey_error_handling() {
    let table_name = || "test_table".to_string();

    let malformed = WrapperConfiguration::new("invalid".to_string(), table_name());
    assert!(malformed.validate().is_err());

    let incomplete = WrapperConfiguration::new(
        "https://test.cloud.databricks.com".to_string(),
        table_name(),
    );
    let construction = ZerobusWrapper::new(incomplete).await;
    assert!(construction.is_err());
}
/// Sanity-checks the fixture returned by `create_test_record_batch`: row and column
/// counts, field names in schema order, and the length of every column.
#[test]
fn test_user_journey_record_batch_creation() {
    let batch = create_test_record_batch();
    assert_eq!(batch.num_rows(), 5);
    assert_eq!(batch.num_columns(), 3);

    let schema = batch.schema();
    assert_eq!(schema.fields().len(), 3);

    // Field names must appear in exactly this order.
    let expected_names = ["id", "name", "score"];
    for (index, expected) in expected_names.iter().enumerate() {
        assert_eq!(schema.field(index).name(), *expected);
    }

    // Every column holds one value per row.
    for index in 0..expected_names.len() {
        assert_eq!(batch.column(index).len(), 5);
    }
}
/// Verifies that `RetryConfig::execute_with_retry` re-invokes an operation that fails
/// once: the operation returns a `ConnectionError` on its first call and succeeds on
/// the second, and the test asserts an `Ok` result after exactly two calls.
#[tokio::test]
async fn test_user_journey_retry_behavior() {
    use arrow_zerobus_sdk_wrapper::wrapper::retry::RetryConfig;
    use arrow_zerobus_sdk_wrapper::ZerobusError;

    // NOTE(review): presumably (max attempts, base delay ms, max delay ms) — confirm
    // against `RetryConfig::new`.
    let policy = RetryConfig::new(3, 10, 1000);

    // Shared call counter; each invocation of the operation gets its own handle.
    let call_count = std::sync::Arc::new(std::sync::Mutex::new(0));
    let shared_count = call_count.clone();

    let outcome = policy
        .execute_with_retry(|| {
            let counter = shared_count.clone();
            async move {
                // Scope the guard so the lock is released before the result is built.
                let nth_call = {
                    let mut guard = counter.lock().unwrap();
                    *guard += 1;
                    *guard
                };
                if nth_call >= 2 {
                    Ok("success".to_string())
                } else {
                    Err::<String, _>(ZerobusError::ConnectionError("transient".to_string()))
                }
            }
        })
        .await;

    assert!(outcome.is_ok());
    assert_eq!(*call_count.lock().unwrap(), 2);
}
/// Exercises overlapping use of a wrapper and its clone by flushing both at once.
///
/// The two `flush` futures are driven together with `tokio::join!`, so they are
/// polled concurrently on the test task instead of one after the other. The flush
/// results themselves are not inspected, and the test is a no-op when the wrapper
/// cannot be constructed.
#[tokio::test]
async fn test_user_journey_concurrent_access() {
    let config = WrapperConfiguration::new(
        "https://test.cloud.databricks.com".to_string(),
        "test_table".to_string(),
    )
    .with_credentials("client_id".to_string(), "client_secret".to_string())
    .with_unity_catalog("https://unity-catalog-url".to_string());

    let wrapper_result = ZerobusWrapper::new(config).await;
    if let Ok(wrapper) = wrapper_result {
        let wrapper_clone = wrapper.clone();
        // Both flushes are in flight at the same time; outcomes are intentionally ignored.
        let (_flush1, _flush2) = tokio::join!(wrapper.flush(), wrapper_clone.flush());
    }
}