import pytest
import pyarrow as pa
# Probe for the wrapper package once, at module import time. When the import
# fails, the module-level ``pytestmark`` below skips every test in this file.
try:
    import arrow_zerobus_sdk_wrapper  # noqa: F401 - imported only as an availability probe
except ImportError:
    pytestmark = pytest.mark.skip(reason="arrow_zerobus_sdk_wrapper not available")
def test_import_module():
    """Check that the wrapper package imports and exposes its core names.

    Both ``ZerobusWrapper`` and ``ZerobusError`` must be attributes of the
    top-level ``arrow_zerobus_sdk_wrapper`` module.
    """
    import arrow_zerobus_sdk_wrapper as sdk

    for public_name in ("ZerobusWrapper", "ZerobusError"):
        assert hasattr(sdk, public_name)
def test_configuration_creation():
    """Construct a ``WrapperConfiguration`` with endpoint, table and credentials.

    The test only checks that construction with these keyword arguments
    returns an object; it does not validate or use the configuration.
    """
    from arrow_zerobus_sdk_wrapper import WrapperConfiguration

    settings = {
        "endpoint": "https://test.cloud.databricks.com",
        "table_name": "test_table",
        "client_id": "test_client_id",
        "client_secret": "test_client_secret",
        "unity_catalog_url": "https://unity-catalog-url",
    }
    config = WrapperConfiguration(**settings)

    assert config is not None
def test_configuration_validation():
    """Exercise ``WrapperConfiguration.validate`` on a good and a bad endpoint.

    A configuration with an ``https://`` endpoint must validate without
    raising; one whose endpoint is ``"invalid-endpoint"`` must raise.
    """
    from arrow_zerobus_sdk_wrapper import WrapperConfiguration

    table = "test_table"

    # Happy path: any exception from validate() is reported as a test failure.
    accepted = WrapperConfiguration(
        endpoint="https://test.cloud.databricks.com", table_name=table
    )
    try:
        accepted.validate()
    except Exception as err:
        pytest.fail(f"Valid configuration should not raise error: {err}")

    # Failure path: a malformed endpoint must be rejected by validate().
    rejected = WrapperConfiguration(endpoint="invalid-endpoint", table_name=table)
    with pytest.raises(Exception):
        rejected.validate()
def test_transmission_result():
    """Placeholder test: it currently performs no checks and always passes."""
def test_error_classes():
    """Check that every expected error class can be imported from the package.

    A missing name makes the ``from ... import`` itself fail; the loop then
    confirms that none of the imported objects is ``None``.
    """
    # NOTE: ``ConnectionError`` here is the package's own class; inside this
    # function it shadows the Python builtin of the same name.
    from arrow_zerobus_sdk_wrapper import (
        AuthenticationError,
        ConfigurationError,
        ConnectionError,
        ConversionError,
        RetryExhausted,
        TokenRefreshError,
        TransmissionError,
        ZerobusError,
    )

    exported = (
        ZerobusError,
        ConfigurationError,
        AuthenticationError,
        ConnectionError,
        ConversionError,
        TransmissionError,
        RetryExhausted,
        TokenRefreshError,
    )
    for error_class in exported:
        assert error_class is not None
@pytest.mark.skip(reason="Requires actual Zerobus SDK and credentials")
def test_wrapper_initialization():
    """Create a ``ZerobusWrapper`` from a fully populated configuration.

    Skipped unconditionally by the decorator above.
    """
    from arrow_zerobus_sdk_wrapper import WrapperConfiguration, ZerobusWrapper

    credentials = {
        "client_id": "test_client_id",
        "client_secret": "test_client_secret",
        "unity_catalog_url": "https://unity-catalog-url",
    }
    config = WrapperConfiguration(
        endpoint="https://test.cloud.databricks.com",
        table_name="test_table",
        **credentials,
    )

    wrapper = ZerobusWrapper(config)
    assert wrapper is not None
@pytest.mark.skip(reason="Requires actual Zerobus SDK and credentials")
def test_send_batch():
    """Send a small two-column record batch and inspect the result's shape.

    Skipped unconditionally by the decorator above. When run, it checks that
    the object returned by ``send_batch`` carries the expected attributes.
    """
    from arrow_zerobus_sdk_wrapper import WrapperConfiguration, ZerobusWrapper

    # Three-row batch with an int64 "id" column and a string "name" column.
    schema = pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())])
    id_column = pa.array([1, 2, 3], type=pa.int64())
    name_column = pa.array(["Alice", "Bob", "Charlie"], type=pa.string())
    batch = pa.RecordBatch.from_arrays([id_column, name_column], schema=schema)

    config = WrapperConfiguration(
        endpoint="https://test.cloud.databricks.com",
        table_name="test_table",
        client_id="test_client_id",
        client_secret="test_client_secret",
        unity_catalog_url="https://unity-catalog-url",
    )
    result = ZerobusWrapper(config).send_batch(batch)

    expected_attributes = (
        "success",
        "error",
        "attempts",
        "latency_ms",
        "batch_size_bytes",
    )
    for attribute in expected_attributes:
        assert hasattr(result, attribute)
def test_record_batch_creation():
    """Build a three-column Arrow record batch and check its dimensions.

    Uses only ``pyarrow``; the wrapper package is not touched by this test.
    """
    # (column name, Arrow type, values) for each column of the batch.
    column_specs = [
        ("id", pa.int64(), [1, 2, 3]),
        ("name", pa.string(), ["Alice", "Bob", "Charlie"]),
        ("score", pa.float64(), [95.5, 87.0, 92.5]),
    ]

    schema = pa.schema([pa.field(name, dtype) for name, dtype, _ in column_specs])
    arrays = [pa.array(values, type=dtype) for _, dtype, values in column_specs]
    batch = pa.RecordBatch.from_arrays(arrays, schema=schema)

    assert batch.num_rows == 3
    assert batch.num_columns == 3
    assert batch.schema == schema
def test_writer_disabled_parameter():
    """Check that ``zerobus_writer_disabled=True`` is readable on the config.

    The configuration is created with debug output enabled and a debug
    output directory, then the flag is read back as an attribute.
    """
    from arrow_zerobus_sdk_wrapper import WrapperConfiguration

    options = {
        "endpoint": "https://test.cloud.databricks.com",
        "table_name": "test_table",
        "debug_enabled": True,
        "debug_output_dir": "./test_debug",
        "zerobus_writer_disabled": True,
    }
    config = WrapperConfiguration(**options)

    assert config.zerobus_writer_disabled is True
def test_writer_disabled_validation():
    """Check validation of ``zerobus_writer_disabled`` against debug settings.

    Disabling the writer with ``debug_enabled=False`` must raise
    ``ConfigurationError``; disabling it with debug enabled and an output
    directory must validate cleanly.
    """
    from arrow_zerobus_sdk_wrapper import ConfigurationError, WrapperConfiguration

    common = {
        "endpoint": "https://test.cloud.databricks.com",
        "table_name": "test_table",
        "zerobus_writer_disabled": True,
    }

    # Construction and validate() both sit inside the context manager, so the
    # error is accepted from either step.
    with pytest.raises(ConfigurationError):
        rejected = WrapperConfiguration(debug_enabled=False, **common)
        rejected.validate()

    accepted = WrapperConfiguration(
        debug_enabled=True, debug_output_dir="./test_debug", **common
    )
    try:
        accepted.validate()
    except Exception as err:
        pytest.fail(f"Valid configuration should not raise error: {err}")
def test_debug_enabled_requires_output_dir():
    """Check how ``debug_enabled`` interacts with ``debug_output_dir``.

    Three cases are covered:
    * debug enabled without an output directory: construction must raise,
      and the message must mention the output directory;
    * debug enabled with an output directory: ``debug_enabled`` reads ``True``;
    * debug disabled without an output directory: ``debug_enabled`` reads
      ``False``.
    """
    from arrow_zerobus_sdk_wrapper import WrapperConfiguration

    target = {
        "endpoint": "https://test.cloud.databricks.com",
        "table_name": "test_table",
    }

    # Case 1: the constructor itself is expected to reject this combination.
    with pytest.raises(Exception) as exc_info:
        WrapperConfiguration(debug_enabled=True, debug_output_dir=None, **target)

    error_msg = str(exc_info.value)
    lowered = error_msg.lower()
    mentions_output_dir = "debug_output_dir" in lowered or "output" in lowered
    assert (
        mentions_output_dir
    ), f"Error message should mention debug_output_dir requirement, got: {error_msg}"

    # Case 2: debug enabled together with an output directory is accepted.
    with_dir = WrapperConfiguration(
        debug_enabled=True, debug_output_dir="./test_debug", **target
    )
    assert (
        with_dir.debug_enabled is True
    ), "debug_enabled should be True when output_dir is provided"

    # Case 3: debug disabled needs no output directory.
    without_debug = WrapperConfiguration(
        debug_enabled=False, debug_output_dir=None, **target
    )
    assert (
        without_debug.debug_enabled is False
    ), "debug_enabled should be False when not enabled"
@pytest.mark.asyncio
async def test_wrapper_works_without_credentials_when_disabled():
    """Send a batch with the Zerobus writer disabled and no credentials set.

    The configuration enables debug output into a temporary directory, sets
    ``zerobus_writer_disabled=True`` and passes no client id or secret.
    ``send_batch`` must then report success.

    The wrapper is shut down and the temporary directory removed even when
    ``send_batch`` raises or the assertion fails.
    """
    import os
    import shutil
    import tempfile

    from arrow_zerobus_sdk_wrapper import WrapperConfiguration, ZerobusWrapper

    temp_dir = tempfile.mkdtemp()
    debug_output_dir = os.path.join(temp_dir, "debug")
    try:
        config = WrapperConfiguration(
            endpoint="https://test.cloud.databricks.com",
            table_name="test_table",
            debug_enabled=True,
            debug_output_dir=debug_output_dir,
            zerobus_writer_disabled=True,
        )
        wrapper = ZerobusWrapper(config)
        try:
            schema = pa.schema(
                [
                    pa.field("id", pa.int64()),
                    pa.field("name", pa.string()),
                ]
            )
            arrays = [
                pa.array([1, 2, 3], type=pa.int64()),
                pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
            ]
            batch = pa.RecordBatch.from_arrays(arrays, schema=schema)

            result = wrapper.send_batch(batch)
            assert result.success, "send_batch should succeed when writer disabled"
        finally:
            # Always release the wrapper; previously a failing assertion or
            # an exception from send_batch skipped this call.
            wrapper.shutdown()
    finally:
        # Best-effort cleanup: errors while deleting the directory are ignored.
        shutil.rmtree(temp_dir, ignore_errors=True)