# Arrow Zerobus SDK Wrapper
Cross-platform Rust SDK wrapper for Databricks Zerobus with Python bindings. Provides a unified API for sending Arrow RecordBatch data to Zerobus with automatic protocol conversion, authentication, retry logic, and observability.
## Features
- Rust SDK: Native Rust API for sending Arrow RecordBatch data to Zerobus
- Python Bindings: Python 3.11+ support via PyO3 with zero-copy data transfer
- Automatic Retry: Exponential backoff with jitter for transient failures (see the sketch after this list)
- Token Refresh: Automatic authentication token refresh for long-running operations
- Observability: OpenTelemetry metrics and traces integration
- Debug Output: Optional Arrow and Protobuf file output for debugging
- Writer Disabled Mode: Disable Zerobus SDK transmission while maintaining debug file output for local development and testing
- Per-Row Error Tracking: Identify which specific rows failed, enabling partial batch success and efficient quarantine workflows
- Error Analysis: Group errors by type, track statistics, and analyze patterns for debugging
- Thread-Safe: Concurrent operations from multiple threads/async tasks
- Cross-Platform: Linux, macOS, Windows support
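The retry behavior named above is standard exponential backoff with full jitter; a minimal sketch of the idea (the constants and function are illustrative, not the wrapper's actual defaults):

```rust
use rand::Rng;
use std::time::Duration;

/// Delay before retry `attempt`: base * 2^attempt, capped, with full jitter.
/// (Base and cap values are assumptions for illustration.)
fn backoff_delay(attempt: u32) -> Duration {
    let base_ms: u64 = 100;
    let cap_ms: u64 = 10_000;
    let exp = base_ms.saturating_mul(1u64 << attempt.min(16)).min(cap_ms);
    // Sample uniformly in [0, exp] so concurrent clients don't retry in lockstep
    let jittered = rand::thread_rng().gen_range(0..=exp);
    Duration::from_millis(jittered)
}
```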
## Requirements
- Rust 1.75+ (edition 2021)
- Python 3.11+ (for Python bindings)
- Databricks workspace with Zerobus enabled
- OAuth2 credentials (`client_id`, `client_secret`)
- Unity Catalog URL
## Installation

### Rust

Add to your `Cargo.toml`:

```toml
[dependencies]
# Dependency names below are assumed; version and path are from this README
arrow-zerobus-sdk-wrapper = { version = "0.1.0", path = "../arrow-zerobus-sdk-wrapper" }
arrow = "57"
tokio = { version = "1.35", features = ["full"] }
```
### Python

Install the published wheel with `pip`, or from source:
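A minimal sketch of a source build, assuming the bindings are built with maturin (the standard PyO3 build tool) and gated behind the `python` Cargo feature mentioned in the Testing section:

```bash
# Build and install the Python extension into the current environment
maturin develop --release --features python
```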
## Quick Start

### Rust
A minimal sketch (crate path, type, and constructor names are assumptions based on this README's API surface):

```rust
use arrow_zerobus_sdk_wrapper::{ZerobusConfig, ZerobusWrapper}; // names assumed
use arrow::record_batch::RecordBatch;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure and connect (constructor arguments are illustrative)
    let config = ZerobusConfig::new(/* UC URL, table name, OAuth2 credentials */);
    let wrapper = ZerobusWrapper::new(config).await?;

    // Build or receive an Arrow RecordBatch, then send it
    let batch: RecordBatch = todo!("build an Arrow RecordBatch");
    wrapper.send_batch(batch).await?;
    Ok(())
}
```
### Python
See examples/python_example.py for a complete example.
A hedged sketch of the flow (class and result-method names are assumptions; the example file shows the real API):

```python
import pyarrow as pa
from arrow_zerobus_sdk_wrapper import ZerobusWrapper  # module/class names assumed

wrapper = ZerobusWrapper(config)

# Create Arrow RecordBatch
ids = pa.array([1, 2, 3])
scores = pa.array([95.0, 87.5, 91.2])
batch = pa.record_batch([ids, scores], names=["id", "score"])

# Send batch
result = await wrapper.send_batch(batch)

# Handle per-row errors (if any)
if result.has_failures():  # method names below are assumed
    # Extract and quarantine failed rows
    failed_rows = result.failed_rows(batch)
    # Extract and write successful rows
    successful_rows = result.successful_rows(batch)
    # Analyze error patterns
    errors_by_type = result.errors_by_type()
    error_stats = result.error_statistics()

await wrapper.close()
```
## Writer Disabled Mode
The wrapper supports a "writer disabled" mode that allows you to test data conversion logic and write debug files without making network calls to Zerobus. This is useful for:
- Local Development: Test data transformations without Databricks workspace access
- CI/CD Testing: Validate data format without requiring credentials
- Performance Testing: Benchmark conversion logic without network overhead
### Usage

Rust:
```rust
use arrow_zerobus_sdk_wrapper::{ZerobusConfig, ZerobusWrapper}; // type names assumed
use std::path::PathBuf;

let config = ZerobusConfig::new(/* table name, etc.; builder arguments assumed */)
    .with_debug_output(PathBuf::from("debug_output"))
    .with_zerobus_writer_disabled(true); // Enable disabled mode

// No credentials required when writer is disabled
let wrapper = ZerobusWrapper::new(config).await?;

let result = wrapper.send_batch(batch).await?;
// Debug files written, no network calls made
```
Python:

```python
# Keyword argument names are assumptions mirroring the Rust builder above
wrapper = ZerobusWrapper(config, debug_enabled=True, zerobus_writer_disabled=True)
result = await wrapper.send_batch(batch)
# Debug files written, no network calls made
```
Note: When `zerobus_writer_disabled` is true, `debug_enabled` must also be true. Credentials are optional when the writer is disabled.
## Building
### Rust
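Standard Cargo invocations apply:

```bash
# Debug and release builds
cargo build
cargo build --release
```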
### Python Bindings
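A sketch assuming the bindings are built with maturin, using the `python` feature from the Testing section below:

```bash
# Build a release wheel for the PyO3 extension module
maturin build --release --features python
```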
## Testing

### Rust
Note: When running tests with the `python` feature enabled, PyO3 needs to link against the Python library. You can either:

- Use the helper script (recommended); it automatically detects and uses Python 3.11+ from your `PATH`.
- Set `PYO3_PYTHON` manually:

```bash
# Find your Python executable
which python3

# Run tests with the Python feature
PYO3_PYTHON=/path/to/python3 cargo test --features python
```
### Rust Tests

```bash
# Run all tests (without Python feature)
cargo test

# Run all tests including Python bindings
cargo test --features python

# Run with coverage (requires cargo-tarpaulin)
cargo tarpaulin
```
### Python Tests

Note: Python tests require the Python extension to be built first. The tests use pytest-forked to work around PyO3 GIL issues that can cause pytest to hang after tests.

A sketch of the manual setup (the build tool and exact commands are assumptions):

```bash
# Recommended: Use the helper script (handles setup automatically)

# Manual setup:
# 1. Build Python extension (build tool assumed)
maturin develop --features python

# 2. Install test dependencies
pip install pytest pytest-forked pytest-cov

# 3. Run tests with PyO3 workaround
pytest --forked

# Run with coverage
pytest --forked --cov
```

PyO3 Pytest Workaround: The `--forked` flag ensures each test runs in a separate process, preventing GIL (Global Interpreter Lock) deadlocks that can cause pytest to hang. The `conftest.py` file includes additional fixtures to ensure proper Python initialization and cleanup.
## Performance Benchmarks

A sketch (bench target names are assumptions):

```bash
# Run latency benchmarks
cargo bench --bench latency

# Run throughput benchmarks
cargo bench --bench throughput
```
## Performance
- Latency: p95 latency under 150ms for batches up to 10MB
- Success Rate: 99.999% under normal network conditions
- Concurrency: Thread-safe, supports concurrent operations
## Debug File Inspection with DuckDB
When debug output is enabled, the wrapper writes Arrow and Protobuf files to disk for inspection. You can use DuckDB to read and analyze these files using the Arrow IPC support in DuckDB.
### Installing the Arrow Extension

First, install and load the DuckDB Arrow community extension:

```sql
INSTALL arrow FROM community;
LOAD arrow;
```
### Reading Arrow IPC Files

Arrow files are written in Arrow IPC stream format (`.arrow` or `.arrows` extension) and can be read directly by DuckDB:

```sql
-- Read Arrow IPC file using the read_arrow() function
SELECT * FROM read_arrow('debug_output/zerobus/arrow/table.arrow');

-- DuckDB supports replacement scans - you can omit read_arrow()
-- if the filename ends with .arrow or .arrows
SELECT * FROM 'debug_output/zerobus/arrow/table.arrow';

-- Read multiple Arrow files (including rotated files)
SELECT * FROM read_arrow('debug_output/zerobus/arrow/*.arrow');

-- Query specific columns
SELECT id, name, score
FROM 'debug_output/zerobus/arrow/table.arrow'
WHERE score > 90;

-- Aggregate data
SELECT
    COUNT(*) AS total_rows,
    AVG(score) AS avg_score,
    MAX(score) AS max_score
FROM 'debug_output/zerobus/arrow/table.arrow';
```
### Reading from HTTP

You can also read Arrow IPC files over HTTP using DuckDB's httpfs extension:

```sql
INSTALL httpfs;
LOAD httpfs;
LOAD arrow;

-- Read Arrow IPC file from HTTP server
SELECT * FROM read_arrow('http://localhost:8008/table.arrow');
```
### Reading from stdin

You can pipe Arrow IPC data directly to DuckDB:

```bash
# Using curl to fetch and pipe to DuckDB
URL="http://localhost:8008/table.arrow"
SQL="LOAD arrow; FROM read_arrow('/dev/stdin') SELECT count(*);"
curl -s "$URL" | duckdb -c "$SQL"
```
### Example: Analyzing Debug Files

```sql
-- Load the Arrow extension
LOAD arrow;

-- Read all Arrow files from debug output
SELECT
    COUNT(*) AS total_rows,
    COUNT(DISTINCT file_name) AS num_files
FROM read_arrow('debug_output/zerobus/arrow/*.arrow');

-- Analyze specific data
SELECT
    id,
    name,
    score,
    COUNT(*) OVER () AS total_count
FROM 'debug_output/zerobus/arrow/table.arrow'
WHERE score > 90
ORDER BY score DESC;
```
### Python Example with DuckDB

A sketch using the duckdb Python package:

```python
import duckdb

# Connect to DuckDB
con = duckdb.connect()

# Install and load Arrow extension
con.execute("INSTALL arrow FROM community;")
con.execute("LOAD arrow;")

# Read Arrow debug files (using replacement scan)
df = con.execute("SELECT * FROM 'debug_output/zerobus/arrow/table.arrow'").df()

# Analyze data across multiple files
stats = con.execute(
    "SELECT COUNT(*) AS total_rows FROM read_arrow('debug_output/zerobus/arrow/*.arrow')"
).df()
```
### Reading Protobuf Files

Protobuf files contain binary Protobuf messages. To read them with DuckDB, you need to either:

- Convert Protobuf to Arrow IPC first (using a Protobuf parser), or
- Use Python with PyArrow to convert Protobuf to Arrow format

For Protobuf files, you can use Python to convert to Arrow IPC format, then read with DuckDB:
```python
import duckdb
import pyarrow as pa

# Read Protobuf file and convert to Arrow IPC
# (This requires knowledge of your Protobuf schema)
def protobuf_to_arrow(proto_path: str, arrow_path: str) -> None:
    # Parse Protobuf messages and convert to Arrow,
    # then write as Arrow IPC stream format.
    # Implementation depends on your specific Protobuf schema.
    pass

# Convert Protobuf to Arrow IPC file (function name and paths are illustrative)
protobuf_to_arrow("debug_output/zerobus/protobuf/table.pb", "table.arrows")

# Then use DuckDB to read the converted Arrow IPC file
con = duckdb.connect()
df = con.execute("SELECT * FROM read_arrow('table.arrows')").df()
```
Alternatively, if you have the Protobuf schema definition, you can use tools like `protoc` to generate code for parsing, then convert to Arrow IPC format that DuckDB can read.
### Notes

- Arrow IPC Format: Files use Arrow IPC stream format (recommended extension: `.arrows`)
- Extension Support: DuckDB can read files with `.arrow` or `.arrows` extensions directly (replacement scans)
- Multi-file Reading: Supports reading multiple files using glob patterns (`*.arrow`)
- Performance: Arrow IPC format is optimized for fast encoding/decoding and zero-copy data transfer
- Protobuf Files: Require conversion to Arrow IPC format before reading with DuckDB
- File Rotation: Rotated files (with timestamp suffixes) can be read using glob patterns
- Flush Before Reading: Debug files are written incrementally, so you may need to call `wrapper.flush()` before reading
For more details, see the official DuckDB Arrow IPC support documentation.
## Development

### Pre-commit Hooks
The repository includes a pre-commit hook to ensure version consistency across all configuration files. Before each commit, the hook verifies that version numbers match in:
- `Cargo.toml`
- `pyproject.toml`
- `CHANGELOG.md` (latest release)
To install the pre-commit hook:
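A sketch, assuming the hook ships in a hypothetical `.githooks/` directory in the repository:

```bash
# Point Git at the repo's hook directory (path hypothetical)
git config core.hooksPath .githooks
```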
To manually check version consistency:
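Absent the repository's own check script, a quick manual comparison:

```bash
# Print the version fields from both manifests side by side
grep '^version' Cargo.toml pyproject.toml
```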
### Version Management
When releasing a new version, ensure all version numbers are updated:
- Update `Cargo.toml`: `version = "X.Y.Z"`
- Update `pyproject.toml`: `version = "X.Y.Z"`
- Update `CHANGELOG.md`: add a new release section `## [X.Y.Z] - YYYY-MM-DD`
The pre-commit hook and CI pipeline will verify version consistency automatically.
For more details, see Version Management Guide.
## Documentation
## License
MIT OR Apache-2.0