arrow-zerobus-sdk-wrapper 0.8.0

Cross-platform Rust SDK wrapper for Databricks Zerobus with Python bindings
Documentation
//! End-to-end integration test for Rust API
//!
//! This test verifies the complete user journey:
//! 1. Create configuration
//! 2. Initialize wrapper
//! 3. Create Arrow RecordBatch
//! 4. Send batch to Zerobus
//! 5. Verify result
//! 6. Shutdown wrapper

use arrow::array::{Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_zerobus_sdk_wrapper::{
    TransmissionResult, WrapperConfiguration, ZerobusError, ZerobusWrapper,
};
use std::sync::Arc;

/// Create a test RecordBatch with sample data
fn create_test_record_batch() -> RecordBatch {
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("score", DataType::Float64, true),
    ]);

    let id_array = Int64Array::from(vec![1, 2, 3, 4, 5]);
    let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie", "David", "Eve"]);
    let score_array =
        Float64Array::from(vec![Some(95.5), Some(87.0), None, Some(92.5), Some(88.0)]);

    RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(id_array),
            Arc::new(name_array),
            Arc::new(score_array),
        ],
    )
    .expect("Failed to create test RecordBatch")
}

/// Test complete user journey with mock configuration
#[tokio::test]
#[ignore] // Requires actual Zerobus SDK and credentials
async fn test_complete_user_journey() {
    // Step 1: Create configuration
    let config = WrapperConfiguration::new(
        "https://test-workspace.cloud.databricks.com".to_string(),
        "test_table".to_string(),
    )
    .with_credentials(
        std::env::var("ZEROBUS_CLIENT_ID").unwrap_or_else(|_| "test_client_id".to_string()),
        std::env::var("ZEROBUS_CLIENT_SECRET").unwrap_or_else(|_| "test_client_secret".to_string()),
    )
    .with_unity_catalog(
        std::env::var("UNITY_CATALOG_URL")
            .unwrap_or_else(|_| "https://test.cloud.databricks.com".to_string()),
    )
    .with_retry_config(3, 100, 1000); // Reduced retries for testing

    // Step 2: Initialize wrapper
    let wrapper_result = ZerobusWrapper::new(config).await;

    // Without real credentials, this will fail, but we can test the flow
    match wrapper_result {
        Ok(wrapper) => {
            // Step 3: Create Arrow RecordBatch
            let batch = create_test_record_batch();
            assert_eq!(batch.num_rows(), 5);
            assert_eq!(batch.num_columns(), 3);

            // Step 4: Send batch to Zerobus
            let result: Result<TransmissionResult, ZerobusError> = wrapper.send_batch(batch).await;

            // Step 5: Verify result
            match result {
                Ok(transmission_result) => {
                    // Verify TransmissionResult structure
                    assert!(transmission_result.batch_size_bytes > 0);
                    assert!(transmission_result.attempts >= 1);

                    if transmission_result.success {
                        assert!(transmission_result.error.is_none());
                        assert!(transmission_result.latency_ms.is_some());
                        println!(
                            "✅ Batch sent successfully! Latency: {}ms, Size: {} bytes",
                            transmission_result.latency_ms.unwrap_or(0),
                            transmission_result.batch_size_bytes
                        );
                    } else {
                        assert!(transmission_result.error.is_some());
                        println!("❌ Transmission failed: {:?}", transmission_result.error);
                    }
                }
                Err(e) => {
                    // Error is acceptable in test environment
                    println!("⚠️  Transmission error (expected in test): {}", e);
                }
            }

            // Step 6: Shutdown wrapper
            let shutdown_result = wrapper.shutdown().await;
            assert!(shutdown_result.is_ok() || shutdown_result.is_err());
        }
        Err(e) => {
            // Initialization failure is expected without real credentials
            println!(
                "⚠️  Wrapper initialization failed (expected in test): {}",
                e
            );
        }
    }
}

/// Test configuration validation in user journey
#[test]
fn test_user_journey_configuration_validation() {
    // Test that invalid configuration is caught early
    let invalid_config = WrapperConfiguration::new(
        "invalid-endpoint".to_string(), // Invalid endpoint
        "test_table".to_string(),
    );

    let validation_result = invalid_config.validate();
    assert!(validation_result.is_err());

    // Test that valid configuration passes validation
    let valid_config = WrapperConfiguration::new(
        "https://test.cloud.databricks.com".to_string(),
        "test_table".to_string(),
    );

    let validation_result = valid_config.validate();
    assert!(validation_result.is_ok());
}

/// Test error handling in user journey
#[tokio::test]
async fn test_user_journey_error_handling() {
    // Test that configuration errors are properly returned
    let config = WrapperConfiguration::new("invalid".to_string(), "test_table".to_string());

    // Validation should fail
    assert!(config.validate().is_err());

    // Test that missing credentials are detected
    let config = WrapperConfiguration::new(
        "https://test.cloud.databricks.com".to_string(),
        "test_table".to_string(),
    );
    // No credentials set

    // Wrapper initialization should fail without credentials
    let wrapper_result = ZerobusWrapper::new(config).await;
    assert!(wrapper_result.is_err());
}

/// Test that RecordBatch conversion works in user journey
#[test]
fn test_user_journey_record_batch_creation() {
    // Test that we can create a valid RecordBatch
    let batch = create_test_record_batch();

    // Verify batch structure
    assert_eq!(batch.num_rows(), 5);
    assert_eq!(batch.num_columns(), 3);

    // Verify schema
    let schema = batch.schema();
    assert_eq!(schema.fields().len(), 3);
    assert_eq!(schema.field(0).name(), "id");
    assert_eq!(schema.field(1).name(), "name");
    assert_eq!(schema.field(2).name(), "score");

    // Verify data
    let id_array = batch.column(0);
    let name_array = batch.column(1);
    let score_array = batch.column(2);

    // Check that arrays are not empty
    assert_eq!(id_array.len(), 5);
    assert_eq!(name_array.len(), 5);
    assert_eq!(score_array.len(), 5);
}

/// Test retry behavior in user journey
#[tokio::test]
async fn test_user_journey_retry_behavior() {
    use arrow_zerobus_sdk_wrapper::wrapper::retry::RetryConfig;
    use arrow_zerobus_sdk_wrapper::ZerobusError;

    // Test that retry config works as expected
    let retry_config = RetryConfig::new(3, 10, 1000);

    let attempts = std::sync::Arc::new(std::sync::Mutex::new(0));
    let attempts_clone = attempts.clone();
    let result = retry_config
        .execute_with_retry(|| {
            let attempts = attempts_clone.clone();
            async move {
                let mut count = attempts.lock().unwrap();
                *count += 1;
                let current = *count;
                drop(count);

                if current < 2 {
                    Err::<String, _>(ZerobusError::ConnectionError("transient".to_string()))
                } else {
                    Ok("success".to_string())
                }
            }
        })
        .await;

    assert!(result.is_ok());
    assert_eq!(*attempts.lock().unwrap(), 2);
}

/// Test that wrapper can be cloned for concurrent use
#[tokio::test]
async fn test_user_journey_concurrent_access() {
    let config = WrapperConfiguration::new(
        "https://test.cloud.databricks.com".to_string(),
        "test_table".to_string(),
    )
    .with_credentials("client_id".to_string(), "client_secret".to_string())
    .with_unity_catalog("https://unity-catalog-url".to_string());

    let wrapper_result = ZerobusWrapper::new(config).await;

    if let Ok(wrapper) = wrapper_result {
        // Test that wrapper can be cloned (for concurrent access)
        let wrapper_clone = wrapper.clone();

        // Both should be usable (though will fail without real SDK)
        let _flush1 = wrapper.flush().await;
        let _flush2 = wrapper_clone.flush().await;
    }
}