arrow-zerobus-sdk-wrapper 0.8.1

Cross-platform Rust SDK wrapper for Databricks Zerobus with Python bindings
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
"""Comprehensive integration tests for Python bindings

These tests verify Python API functionality including async operations,
error translation, and RecordBatch conversion.
"""

import pytest
import pyarrow as pa
import asyncio

# Skip all tests if the module is not available
try:
    import arrow_zerobus_sdk_wrapper
    from arrow_zerobus_sdk_wrapper import (
        ZerobusWrapper,
        WrapperConfiguration,
        TransmissionResult,
        ConfigurationError,
        AuthenticationError,
        ConnectionError,
        ConversionError,
        TransmissionError,
        RetryExhausted,
        TokenRefreshError,
    )
except ImportError:
    pytestmark = pytest.mark.skip("arrow_zerobus_sdk_wrapper not available")


def test_error_translation():
    """Test that Rust errors are properly translated to Python exceptions."""
    # Test that all error types can be imported and are exception classes
    assert issubclass(ConfigurationError, Exception)
    assert issubclass(AuthenticationError, Exception)
    assert issubclass(ConnectionError, Exception)
    assert issubclass(ConversionError, Exception)
    assert issubclass(TransmissionError, Exception)
    assert issubclass(RetryExhausted, Exception)
    assert issubclass(TokenRefreshError, Exception)

    # Test that error instances can be created
    config_error = ConfigurationError("test config error")
    assert isinstance(config_error, Exception)
    assert str(config_error) == "test config error"

    auth_error = AuthenticationError("test auth error")
    assert isinstance(auth_error, Exception)
    assert str(auth_error) == "test auth error"


def test_error_translation_all_types():
    """Test error translation for all error types."""
    error_messages = {
        ConfigurationError: "Configuration error",
        AuthenticationError: "Authentication error",
        ConnectionError: "Connection error",
        ConversionError: "Conversion error",
        TransmissionError: "Transmission error",
        RetryExhausted: "Retry exhausted",
        TokenRefreshError: "Token refresh error",
    }

    for error_class, message in error_messages.items():
        error = error_class(message)
        assert isinstance(error, Exception)
        assert str(error) == message


@pytest.mark.asyncio
async def test_async_context_manager():
    """Test async context manager (if implemented)."""
    # Test that ZerobusWrapper can be used as an async context manager

    try:
        config = WrapperConfiguration(
            endpoint="https://test.cloud.databricks.com",
            table_name="test_table",
        )

        # Create wrapper using configuration
        wrapper = ZerobusWrapper(config)

        # Test async context manager
        # The context manager should handle entry and exit gracefully
        async with wrapper:
            # Context manager should work - wrapper is available in context
            assert wrapper is not None
            # After context manager exits, shutdown() is called automatically

        # After context manager, wrapper should be shut down
        # (shutdown is called in __aexit__)

    except Exception as e:
        # Expected if credentials are required or SDK not available
        assert isinstance(e, (ConfigurationError, ConnectionError, ImportError))


@pytest.mark.asyncio
async def test_concurrent_python_operations():
    """Test concurrent operations from Python."""
    # Test that multiple async operations can run concurrently
    # This verifies thread safety from Python side

    async def create_wrapper():
        """Helper to create wrapper."""
        try:
            config = WrapperConfiguration(
                endpoint="https://test.cloud.databricks.com",
                table_name="test_table",
            )
            wrapper = ZerobusWrapper(config)
            return wrapper
        except Exception:
            return None

    # Create multiple wrappers concurrently
    tasks = [create_wrapper() for _ in range(5)]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # All should complete (may fail, but shouldn't deadlock)
    assert len(results) == 5

    # Verify no exceptions were raised (or all are expected exceptions)
    for result in results:
        if isinstance(result, Exception):
            assert isinstance(
                result, (ConfigurationError, ConnectionError, ImportError)
            )


def test_record_batch_conversion():
    """Test PyArrow RecordBatch conversion."""
    # Test zero-copy conversion from PyArrow to Rust

    # Create a simple RecordBatch
    schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("name", pa.string()),
            pa.field("score", pa.float64()),
        ]
    )

    arrays = [
        pa.array([1, 2, 3], type=pa.int64()),
        pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
        pa.array([95.5, 87.0, 92.5], type=pa.float64()),
    ]

    batch = pa.RecordBatch.from_arrays(arrays, schema=schema)

    # Verify batch structure
    assert batch.num_rows == 3
    assert batch.num_columns == 3
    assert len(batch.schema) == 3

    # Test that batch can be passed to wrapper (if available)
    # This is a structural test - actual conversion happens in send_batch
    try:
        config = WrapperConfiguration(
            endpoint="https://test.cloud.databricks.com",
            table_name="test_table",
        )
        wrapper = ZerobusWrapper(config)

        # The batch should be convertible (actual conversion tested in send_batch)
        assert batch is not None
        assert wrapper is not None

    except Exception:
        # Expected if credentials required
        pass


def test_record_batch_various_types():
    """Test RecordBatch with various Arrow types."""
    # Test that different Arrow types can be converted

    # Test with different data types
    schema = pa.schema(
        [
            pa.field("int32", pa.int32()),
            pa.field("int64", pa.int64()),
            pa.field("float32", pa.float32()),
            pa.field("float64", pa.float64()),
            pa.field("string", pa.string()),
            pa.field("bool", pa.bool_()),
        ]
    )

    arrays = [
        pa.array([1, 2, 3], type=pa.int32()),
        pa.array([10, 20, 30], type=pa.int64()),
        pa.array([1.5, 2.5, 3.5], type=pa.float32()),
        pa.array([10.5, 20.5, 30.5], type=pa.float64()),
        pa.array(["a", "b", "c"], type=pa.string()),
        pa.array([True, False, True], type=pa.bool_()),
    ]

    batch = pa.RecordBatch.from_arrays(arrays, schema=schema)

    assert batch.num_rows == 3
    assert batch.num_columns == 6


def test_record_batch_with_nulls():
    """Test RecordBatch with null values."""
    # Test that null values are handled correctly

    schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("name", pa.string()),
        ]
    )

    arrays = [
        pa.array([1, None, 3], type=pa.int64()),
        pa.array(["Alice", "Bob", None], type=pa.string()),
    ]

    batch = pa.RecordBatch.from_arrays(arrays, schema=schema)

    assert batch.num_rows == 3
    assert batch.num_columns == 2


def test_wrapper_configuration_methods():
    """Test WrapperConfiguration builder methods."""
    # Test that configuration can be built using builder pattern

    config = WrapperConfiguration(
        endpoint="https://test.cloud.databricks.com",
        table_name="test_table",
    )

    # Test that configuration has expected attributes
    assert hasattr(config, "endpoint")
    assert hasattr(config, "table_name")

    # Test with credentials
    config_with_creds = WrapperConfiguration(
        endpoint="https://test.cloud.databricks.com",
        table_name="test_table",
        client_id="test_id",
        client_secret="test_secret",
    )

    assert config_with_creds is not None


def test_transmission_result_structure():
    """Test TransmissionResult structure."""
    # Test that TransmissionResult has expected attributes
    # Note: We can't easily create a TransmissionResult without sending a batch
    # But we can verify the class exists and has expected structure

    # TransmissionResult is returned by send_batch
    # We verify it exists as a class
    assert TransmissionResult is not None

    # If we could create an instance, it should have these attributes:
    # - success: bool
    # - error: Optional[ZerobusError]
    # - attempts: int
    # - latency_ms: Optional[int]
    # - batch_size_bytes: int


def test_wrapper_initialization_with_options():
    """Test wrapper initialization with various options."""
    # Test that wrapper can be initialized with different configurations

    # Basic configuration
    try:
        config1 = WrapperConfiguration(
            endpoint="https://test.cloud.databricks.com",
            table_name="test_table",
        )
        wrapper1 = ZerobusWrapper(config1)
        assert wrapper1 is not None
    except Exception:
        pass  # Expected if credentials required

    # With credentials
    try:
        config2 = WrapperConfiguration(
            endpoint="https://test.cloud.databricks.com",
            table_name="test_table",
            client_id="test_id",
            client_secret="test_secret",
            unity_catalog_url="https://unity-catalog-url",
        )
        wrapper2 = ZerobusWrapper(config2)
        assert wrapper2 is not None
    except Exception:
        pass  # Expected without real credentials


def test_error_hierarchy():
    """Test that error classes form a proper hierarchy."""
    # Verify error classes are properly structured

    # All should be exceptions
    assert issubclass(ConfigurationError, Exception)
    assert issubclass(AuthenticationError, Exception)
    assert issubclass(ConnectionError, Exception)
    assert issubclass(ConversionError, Exception)
    assert issubclass(TransmissionError, Exception)
    assert issubclass(RetryExhausted, Exception)
    assert issubclass(TokenRefreshError, Exception)

    # All should be subclasses of ZerobusError (if it's a base class)
    # Note: In PyO3, all exceptions extend PyException directly
    # but they're logically grouped as ZerobusError exceptions


def test_configuration_validation_from_python():
    """Test configuration validation from Python."""
    # Test that validation works from Python

    # Valid configuration
    valid_config = WrapperConfiguration(
        endpoint="https://test.cloud.databricks.com",
        table_name="test_table",
    )

    try:
        valid_config.validate()
        # Should not raise
    except Exception as e:
        # May raise if validation is strict
        assert isinstance(e, ConfigurationError)

    # Invalid configuration
    invalid_config = WrapperConfiguration(
        endpoint="invalid-endpoint",
        table_name="test_table",
    )

    try:
        invalid_config.validate()
        # May or may not raise depending on implementation
    except Exception as e:
        assert isinstance(e, ConfigurationError)


@pytest.mark.asyncio
async def test_async_send_batch():
    """Test async send_batch operation."""
    # Test that send_batch can be called asynchronously

    try:
        config = WrapperConfiguration(
            endpoint="https://test.cloud.databricks.com",
            table_name="test_table",
        )
        wrapper = ZerobusWrapper(config)

        # Create test batch
        schema = pa.schema(
            [
                pa.field("id", pa.int64()),
                pa.field("name", pa.string()),
            ]
        )
        arrays = [
            pa.array([1, 2, 3], type=pa.int64()),
            pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
        ]
        batch = pa.RecordBatch.from_arrays(arrays, schema=schema)

        # Try to send (will fail without credentials, but tests async pattern)
        try:
            result = await wrapper.send_batch(batch)
            # If successful, verify result structure
            assert hasattr(result, "success")
            assert hasattr(result, "attempts")
            assert hasattr(result, "batch_size_bytes")
        except Exception as e:
            # Expected without real credentials
            assert isinstance(
                e, (ConfigurationError, AuthenticationError, ConnectionError)
            )

    except Exception:
        # Expected if wrapper creation fails
        pass


def test_module_imports():
    """Test that all expected modules and classes can be imported."""
    # Verify all public API is accessible

    assert hasattr(arrow_zerobus_sdk_wrapper, "ZerobusWrapper")
    assert hasattr(arrow_zerobus_sdk_wrapper, "WrapperConfiguration")
    assert hasattr(arrow_zerobus_sdk_wrapper, "TransmissionResult")
    assert hasattr(arrow_zerobus_sdk_wrapper, "ZerobusError")
    assert hasattr(arrow_zerobus_sdk_wrapper, "ConfigurationError")
    assert hasattr(arrow_zerobus_sdk_wrapper, "AuthenticationError")
    assert hasattr(arrow_zerobus_sdk_wrapper, "ConnectionError")
    assert hasattr(arrow_zerobus_sdk_wrapper, "ConversionError")
    assert hasattr(arrow_zerobus_sdk_wrapper, "TransmissionError")
    assert hasattr(arrow_zerobus_sdk_wrapper, "RetryExhausted")
    assert hasattr(arrow_zerobus_sdk_wrapper, "TokenRefreshError")


def test_pyarrow_compatibility():
    """Test PyArrow compatibility and zero-copy."""
    # Test that PyArrow RecordBatch can be used directly

    # Create various PyArrow structures
    schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("name", pa.string()),
        ]
    )

    # Test that we can create arrays and batches
    id_array = pa.array([1, 2, 3], type=pa.int64())
    name_array = pa.array(["Alice", "Bob", "Charlie"], type=pa.string())

    batch = pa.RecordBatch.from_arrays([id_array, name_array], schema=schema)

    # Verify batch is valid PyArrow RecordBatch
    assert isinstance(batch, pa.RecordBatch)
    assert batch.num_rows == 3
    assert batch.num_columns == 2

    # Test that batch can be serialized (for zero-copy transfer)
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)
    sink_bytes = sink.getvalue()

    # Verify serialization works
    assert len(sink_bytes) > 0