Zerobus Rust SDK
A high-performance Rust client for streaming data ingestion into Databricks Delta tables using the Zerobus service.
Table of Contents
- Overview
- Features
- Installation
- Quick Start
- Repository Structure
- How It Works
- Usage Guide
- Configuration Options
- Error Handling
- Examples
- Best Practices
- API Reference
- Language Bindings
- Building from Source
- Community and Contributing
- License
Overview
The Zerobus Rust SDK provides a robust, async-first interface for ingesting large volumes of data into Databricks Delta tables. It abstracts the complexity of the Zerobus service and handles authentication, retries, stream recovery, and acknowledgment tracking automatically.
What is Zerobus? See the project overview for details on the Zerobus service.
Features
- Async/Await Support - Built on Tokio for efficient concurrent I/O operations
- Automatic OAuth 2.0 Authentication - Seamless token management with Unity Catalog
- Built-in Recovery - Automatic retry and reconnection for transient failures
- High Throughput - Configurable inflight record limits for optimal performance
- Batch Ingestion - Ingest multiple records at once with all-or-nothing semantics for maximum throughput
- Flexible Serialization - Support for both JSON (simple) and Protocol Buffers (type-safe) data formats
- Type Safety - Protocol Buffers ensure schema validation at compile time
- Schema Generation - CLI tool to generate protobuf schemas from Unity Catalog tables
- Flexible Configuration - Fine-tune timeouts, retries, and recovery behavior
- Graceful Stream Management - Proper flushing and acknowledgment tracking
- Acknowledgment Callbacks - Receive notifications when records are acknowledged or encounter errors
Installation
Add the SDK to your Cargo.toml:
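For example (the SDK version below is an assumption — this README references v0.4.0 elsewhere — so check crates.io for the latest release; the prost and tokio versions match the local-development snippet later in this section):

```toml
[dependencies]
# Version is illustrative; use the latest release from crates.io.
databricks-zerobus-ingest-sdk = "0.4"
prost = "0.13.3"
prost-types = "0.13.3"
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] }
```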
Why these dependencies?
- `databricks-zerobus-ingest-sdk` - The SDK itself
- `prost` and `prost-types` - Required for encoding your data to Protocol Buffers and loading schema descriptors
- `tokio` - Async runtime required for running async functions (the SDK is fully async)
What's in the crates.io package? The published crate contains only the core Zerobus ingestion SDK. Tools for schema generation (`tools/generate_files`) and working examples (`examples/`) are only available in the GitHub repository. You'll need to clone the repo to generate protobuf schemas from your Unity Catalog tables.
For Local Development
Clone the repository and use a path dependency:
Then in your Cargo.toml:
```toml
[dependencies]
databricks-zerobus-ingest-sdk = { path = "../zerobus-sdk/rust/sdk" }
prost = "0.13.3"
prost-types = "0.13.3"
tokio = { version = "1.42.0", features = ["macros", "rt-multi-thread"] }
```
Quick Start
The SDK supports two serialization formats and two ingestion methods:
Serialization:
- JSON (Recommended for getting started): Simpler approach using JSON strings, no schema generation required
- Protocol Buffers (Recommended for production): Type-safe approach with schema validation at compile time
Ingestion Methods:
- Single-record (`ingest_record_offset`): Ingest records one at a time with per-record acknowledgment
- Batch (`ingest_records_offset`): Ingest multiple records at once with all-or-nothing semantics for higher throughput

Note: The older `ingest_record()` and `ingest_records()` methods are deprecated as of v0.4.0. Use the `_offset` variants instead.
See examples/README.md for detailed setup instructions and examples for all combinations.
Repository Structure
zerobus_rust_sdk/
├── sdk/ # Core SDK library
│ ├── src/
│ │ ├── lib.rs # Main SDK and stream implementation
│ │ ├── default_token_factory.rs # OAuth 2.0 token handling
│ │ ├── errors.rs # Error types and retryable logic
│ │ ├── headers_provider.rs # Trait for custom authentication headers
│ │ ├── builder/ # Builder pattern for SDK initialization
│ │ ├── tls_config.rs # TLS configuration strategies
│ │ ├── stream_configuration.rs # Stream options
│ │ ├── landing_zone.rs # Inflight record buffer
│ │ └── offset_generator.rs # Logical offset tracking
│ ├── zerobus_service.proto # gRPC protocol definition
│ ├── build.rs # Build script for protobuf compilation
│ └── Cargo.toml
│
├── ffi/ # C FFI bindings for other languages
│ ├── src/
│ │ ├── lib.rs # FFI implementation
│ │ └── tests.rs # FFI unit tests
│ ├── zerobus.h # Generated C header
│ ├── cbindgen.toml # Header generation config
│ ├── build.rs # Build script for header generation
│ └── Cargo.toml
│
├── jni/ # JNI bindings for Java SDK
│ ├── src/
│ │ └── lib.rs # JNI implementation
│ └── Cargo.toml
│
├── tools/
│ └── generate_files/ # Schema generation CLI tool
│ ├── src/
│ │ ├── main.rs # CLI entry point
│ │ └── generate.rs # Unity Catalog -> Proto conversion
│ ├── README.md # Tool documentation
│ └── Cargo.toml
│
├── examples/
│ ├── README.md # Examples documentation
│ ├── json/
│ │ ├── README.md # JSON examples documentation
│ │ ├── single/ # JSON single-record example
│ │ │ ├── src/main.rs
│ │ │ └── Cargo.toml
│ │ └── batch/ # JSON batch ingestion example
│ │ ├── src/main.rs
│ │ └── Cargo.toml
│ └── proto/
│ ├── README.md # Protocol Buffers examples documentation
│ ├── single/ # Protocol Buffers single-record example
│ │ ├── src/main.rs
│ │ ├── output/ # Generated schema files
│ │ └── Cargo.toml
│ └── batch/ # Protocol Buffers batch ingestion example
│ ├── src/main.rs
│ ├── output/ # Generated schema files
│ └── Cargo.toml
│
├── tests/ # Integration tests crate
│ ├── src/
│ │ ├── mock_grpc.rs # Mock Zerobus gRPC server
│ │ └── rust_tests.rs # Test suite
│ ├── build.rs
│ └── Cargo.toml
│
├── Cargo.toml # Workspace configuration
└── README.md # This file
Key Components
- `sdk/` - The main library crate containing all SDK functionality
- `ffi/` - C FFI bindings for building language wrappers (Go, C#, C++, etc.)
- `jni/` - JNI bindings for the Java SDK
- `tools/` - CLI tool for generating Protocol Buffer schemas from Unity Catalog tables
- `examples/` - Complete working examples demonstrating SDK usage
- Workspace - Root `Cargo.toml` defines a Cargo workspace for unified builds
How It Works
Architecture Overview
+-----------------+
| Your App |
+-----------------+
| 1. create_stream()
v
+-----------------+
| ZerobusSdk |
| - Manages TLS |
| - Creates |
| channels |
+-----------------+
| 2. Opens bidirectional gRPC stream
v
+--------------------------------------+
| ZerobusStream |
| +----------------------------------+ |
| | Supervisor | | Manages lifecycle, recovery
| +----------------------------------+ |
| | |
| +-----------+-----------+ |
| v v |
| +----------+ +----------+ |
| | Sender | | Receiver | | Parallel tasks
| | Task | | Task | |
| +----------+ +----------+ |
| ^ | |
| | v |
| +----------------------------------+ |
| | Landing Zone | | Inflight buffer
| +----------------------------------+ |
+--------------------------------------+
| 3. gRPC stream
v
+-----------------------+
| Databricks |
| Zerobus Service |
+-----------------------+
Data Flow
- Ingestion - Your app calls `stream.ingest_record_offset(data)` or `stream.ingest_records_offset(batch)`
- Buffering - Records are placed in the landing zone with logical offsets
- Sending - Sender task sends records over gRPC with physical offsets
- Acknowledgment - Receiver task gets server ack and resolves the future
- Recovery - If connection fails, supervisor reconnects and resends unacked records
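The buffering and recovery steps can be sketched with a toy model (an illustration only, not the SDK's actual `landing_zone.rs`): records receive increasing logical offsets on ingest, a server ack releases every record at or below the acked offset, and whatever is still buffered is what recovery resends.

```rust
use std::collections::VecDeque;

/// Toy in-flight buffer; NOT the SDK's real implementation.
struct LandingZone {
    next_offset: u64,
    inflight: VecDeque<(u64, Vec<u8>)>, // (logical offset, record bytes)
}

impl LandingZone {
    fn new() -> Self {
        Self { next_offset: 0, inflight: VecDeque::new() }
    }

    /// Assign the next logical offset and buffer the record.
    fn ingest(&mut self, record: Vec<u8>) -> u64 {
        let offset = self.next_offset;
        self.next_offset += 1;
        self.inflight.push_back((offset, record));
        offset
    }

    /// A server ack at `acked` releases every record with offset <= acked;
    /// returns how many records were released.
    fn ack(&mut self, acked: u64) -> usize {
        let before = self.inflight.len();
        self.inflight.retain(|(o, _)| *o > acked);
        before - self.inflight.len()
    }

    /// Offsets that recovery would resend after a reconnect.
    fn unacked_offsets(&self) -> Vec<u64> {
        self.inflight.iter().map(|(o, _)| *o).collect()
    }
}
```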
Authentication Flow
The SDK uses OAuth 2.0 client credentials flow:
- SDK constructs authorization request with Unity Catalog privileges
- Sends request to `{uc_endpoint}/oidc/v1/token` with client credentials
- Token includes scoped permissions for the specific table
- Token is attached to gRPC metadata as Bearer token
- Fresh tokens are fetched automatically on each connection
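As a small illustration of step 2, the token endpoint is derived from the Unity Catalog endpoint. This helper is hypothetical (the SDK does this internally in `default_token_factory.rs`):

```rust
/// Build the OAuth token URL from the Unity Catalog endpoint
/// (illustrative helper, not an SDK function).
fn token_url(uc_endpoint: &str) -> String {
    format!("{}/oidc/v1/token", uc_endpoint.trim_end_matches('/'))
}
```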
Custom Authentication
For advanced use cases, you can implement the HeadersProvider trait to supply your own authentication headers. This is useful for integrating with a different OAuth provider, using a centralized token caching service, or implementing alternative authentication mechanisms.
Note: The headers you provide must still conform to the authentication protocol expected by the Zerobus service. The default implementation, `OAuthHeadersProvider`, serves as the reference for the required headers (`authorization` and `x-databricks-zerobus-table-name`). This feature provides flexibility in how you source your credentials, not in changing the authentication protocol itself.
Example:
```rust
use databricks_zerobus_ingest_sdk::*;
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;

// Sketch only: the authoritative trait definition lives in
// headers_provider.rs; this illustrates the general shape.
struct MyHeadersProvider {
    table_name: String,
}

#[async_trait]
impl HeadersProvider for MyHeadersProvider {
    async fn get_headers(&self) -> Result<HashMap<String, String>, ZerobusError> {
        let mut headers = HashMap::new();
        // Source the token however you like (custom OAuth provider,
        // centralized cache, ...); `fetch_token` is a hypothetical helper.
        let token = fetch_token().await?;
        headers.insert("authorization".to_string(), format!("Bearer {token}"));
        headers.insert(
            "x-databricks-zerobus-table-name".to_string(),
            self.table_name.clone(),
        );
        Ok(headers)
    }
}
```
Usage Guide
The SDK supports two approaches for data serialization:
- JSON - Simpler approach that uses JSON strings. No schema generation required, making it ideal for quick prototyping. See `examples/README.md` for a complete example.
- Protocol Buffers - Type-safe approach with schema validation at compile time. Recommended for production use cases. This guide focuses on the Protocol Buffers approach.

For JSON-based ingestion, you can skip the schema generation step and directly pass JSON strings to `ingest_record_offset()`.
1. Generate Protocol Buffer Schema (Protocol Buffers approach only)
Important Note: The schema generation tool and examples are only available in the GitHub repository. The crate published on crates.io contains only the core Zerobus ingestion SDK logic. To generate protobuf schemas or see working examples, clone the repository:
Use the included tool to generate schema files from your Unity Catalog table:
# For AWS
# For Azure
This generates three files:
- `{table}.proto` - Protocol Buffer schema definition
- `{table}.rs` - Rust structs with serialization code
- `{table}.descriptor` - Binary descriptor for runtime validation
See tools/generate_files/README.md for supported data types and limitations.
See examples/README.md for more information on how to get OAuth credentials.
2. Initialize the SDK
Create an SDK instance using the builder pattern:
```rust
// The AWS and Azure setups differ only in the endpoint and workspace
// URLs; the values below are placeholders, and the builder argument
// types are a sketch.
let sdk = ZerobusSdk::builder()
    .endpoint("your-zerobus-endpoint".to_string())
    .unity_catalog_url("https://your-workspace-url".to_string())
    .build()?;
```
Note: The workspace ID is automatically extracted from the Zerobus endpoint. The https:// scheme is optional — if omitted, the SDK automatically prepends https://.
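The scheme handling described above amounts to something like the following (illustrative sketch, not the SDK's actual code):

```rust
/// Prepend https:// when no scheme is present (illustrative sketch
/// of the endpoint normalization described above).
fn normalize_endpoint(endpoint: &str) -> String {
    if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
        endpoint.to_string()
    } else {
        format!("https://{endpoint}")
    }
}
```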
TLS Configuration
By default, the SDK uses SecureTlsConfig which enables TLS with the operating system's trusted CA certificates. For testing against a local http:// server, use NoTlsConfig (requires the testing feature):
```rust
use databricks_zerobus_ingest_sdk::{NoTlsConfig, ZerobusSdk};
use std::sync::Arc;

// Sketch: requires the `testing` feature; the exact constructor and
// builder argument types may differ.
let sdk = ZerobusSdk::builder()
    .endpoint("http://localhost:50051".to_string())
    .tls_config(Arc::new(NoTlsConfig))
    .build()?;
```
For custom certificate handling, implement the TlsConfig trait:
```rust
use databricks_zerobus_ingest_sdk::{TlsConfig, ZerobusError};
use tonic::transport::Endpoint;

// Sketch only: see tls_config.rs for the actual trait definition;
// the method name and signature here are illustrative.
struct CustomTlsConfig {
    // e.g. a custom CA certificate in PEM form
    ca_pem: Vec<u8>,
}

impl TlsConfig for CustomTlsConfig {
    fn configure(&self, endpoint: Endpoint) -> Result<Endpoint, ZerobusError> {
        // Apply a tonic ClientTlsConfig built from `ca_pem` here.
        unimplemented!()
    }
}
```
3. Configure Authentication
The SDK handles authentication automatically. You just need to provide:
- Client ID - Your OAuth client ID
- Client Secret - Your OAuth client secret
- Unity Catalog Endpoint - Passed to SDK constructor
- Table Name - Included in table properties
```rust
let client_id = "your-client-id".to_string();
let client_secret = "your-client-secret".to_string();
```
See examples/README.md for more information on how to get these credentials.
4. Create a Stream
Configure table properties and stream options:
```rust
use std::fs;
use prost::Message;
use prost_types::DescriptorProto;
use databricks_zerobus_ingest_sdk::{StreamConfigurationOptions, TableProperties};

// Load the descriptor from the generated files. `load_descriptor` is an
// illustrative helper and the path is a placeholder.
fn load_descriptor(path: &str) -> std::io::Result<DescriptorProto> {
    let bytes = fs::read(path)?;
    Ok(DescriptorProto::decode(bytes.as_slice()).expect("valid descriptor"))
}

let descriptor_proto = load_descriptor("output/your_table.descriptor")?;

let table_properties = TableProperties {
    table_name: "catalog.schema.table".to_string(),
    descriptor_proto: Some(descriptor_proto),
};

// Defaults are listed under Configuration Options; override as needed.
let options = StreamConfigurationOptions::default();

// Sketch: create_stream's exact argument list may differ.
let mut stream = sdk.create_stream(table_properties, options).await?;
```
5. Ingest Data
The SDK provides flexible ways to ingest data with different levels of abstraction:
| Wrapper | Format | Description |
|---|---|---|
| `ProtoMessage<T>` | Proto | Auto-encoding: pass structs, SDK handles encoding |
| `ProtoBytes` | Proto | Pre-encoded: pass bytes with explicit wrapper |
| `Vec<u8>` | Proto | Backward-compatible: raw bytes without wrapper |
| `JsonValue<T>` | JSON | Auto-serializing: pass structs, SDK handles JSON conversion |
| `JsonString` | JSON | Pre-serialized: pass JSON strings with explicit wrapper |
| `String` | JSON | Backward-compatible: raw strings without wrapper |
Single Record Ingestion
```rust
use databricks_zerobus_ingest_sdk::ProtoMessage;

// `YourMessage` is the struct generated from your table schema; the
// tuple-struct wrapper construction shown here is a sketch.
let record = YourMessage {
    // ...populate fields...
};

// Ingest and get the logical offset (returned after queuing)
let offset = stream.ingest_record_offset(ProtoMessage(record)).await?;

// Wait for server acknowledgment
stream.wait_for_offset(offset).await?;
```
Batch Ingestion
Ingest multiple records at once for higher throughput with all-or-nothing semantics.
```rust
use databricks_zerobus_ingest_sdk::ProtoMessage;

// Sketch: wrapper construction may differ.
let records: Vec<ProtoMessage<YourMessage>> = vec![
    ProtoMessage(YourMessage { /* ... */ }),
    ProtoMessage(YourMessage { /* ... */ }),
];

// Returns Some(offset) for non-empty batches, None for empty batches
if let Some(offset) = stream.ingest_records_offset(records).await? {
    stream.wait_for_offset(offset).await?;
}
```
High Throughput Pattern
Ingest many records without waiting for each acknowledgment, then flush periodically:
```rust
// `make_record` is a hypothetical constructor for your record type.
for i in 0..100_000 {
    stream.ingest_record_offset(make_record(i)).await?;
}
// Wait for all pending acknowledgments at once
stream.flush().await?;
```
See examples/ for complete working examples with all wrapper types, serialization formats, and ingestion patterns.
6. Handle Acknowledgments
The recommended ingest_record_offset() and ingest_records_offset() methods return offsets directly (after queuing):
- `ingest_record_offset()` returns `OffsetId` (the logical offset)
- `ingest_records_offset()` returns `Option<OffsetId>` (`None` if the batch is empty)
```rust
// `record`, `record_a`, `record_b`, and `make_record` are placeholders
// for your own data.

// Ingest and get the offset once the record is queued.
let offset_id = stream.ingest_record_offset(record).await?;
println!("queued at offset {offset_id:?}");

// Wait for acknowledgment when needed.
stream.wait_for_offset(offset_id).await?;
println!("offset {offset_id:?} acknowledged");

// For batches, the method returns Option<OffsetId>:
// None if the batch is empty.
let batch = vec![record_a, record_b];
if let Some(offset_id) = stream.ingest_records_offset(batch).await? {
    stream.wait_for_offset(offset_id).await?;
}

// High-throughput: collect offsets and wait selectively.
let mut offsets = Vec::new();
for i in 0..1000 {
    offsets.push(stream.ingest_record_offset(make_record(i)).await?);
}
// Wait for specific offsets as needed...
for offset in offsets {
    stream.wait_for_offset(offset).await?;
}
// ...or use flush() to wait for all pending acknowledgments at once.
stream.flush().await?;
```
Using Acknowledgment Callbacks
For scenarios where you need to track acknowledgments without explicitly waiting (e.g., for metrics or logging), you can use callbacks:
```rust
use databricks_zerobus_ingest_sdk::{
    AckCallback, OffsetId, StreamConfigurationOptions, ZerobusError,
};
use std::sync::Arc;

// Define a callback that implements the AckCallback trait.
// Sketch: the exact on_ack/on_error signatures may differ.
struct MetricsCallback;

impl AckCallback for MetricsCallback {
    fn on_ack(&self, offset: OffsetId) {
        println!("acked offset {offset:?}");
    }
    fn on_error(&self, offset: OffsetId, error: ZerobusError) {
        eprintln!("error at offset {offset:?}: {error}");
    }
}

// Configure the stream with the callback
let options = StreamConfigurationOptions {
    ack_callback: Some(Arc::new(MetricsCallback)),
    ..Default::default()
};
let mut stream = sdk.create_stream(table_properties, options).await?;

for i in 0..1000 {
    stream.ingest_record_offset(make_record(i)).await?;
}
stream.flush().await?;
```
Important: Callbacks run synchronously in a dedicated callback handler task. Keep them lightweight (simple logging, metrics increment) to avoid callback backlog. For heavy work like database writes or network calls, send data to a channel for processing in a separate task:
```rust
use std::sync::Arc;
use tokio::sync::mpsc;

// `ChannelCallback` and `AckEvent` are hypothetical: a thin AckCallback
// impl that forwards ack events into the channel instead of doing heavy
// work inline.
let (tx, mut rx) = mpsc::unbounded_channel::<AckEvent>();
let callback = Arc::new(ChannelCallback::new(tx));

// Heavy processing in a separate task
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        // Database writes, network calls, etc.
    }
});
```
7. Close the Stream
Always close streams to ensure data is flushed:
```rust
// Close gracefully (flushes automatically)
stream.close().await?;
```
If the stream fails, retrieve unacknowledged records:
```rust
match stream.close().await {
    Ok(()) => { /* all records acknowledged */ }
    Err(e) => {
        // On failure, retrieve unacknowledged records via the methods
        // described in the API Reference and re-ingest them.
        eprintln!("stream failed: {e}");
    }
}
```
Configuration Options
StreamConfigurationOptions
| Field | Type | Default | Description |
|---|---|---|---|
| `max_inflight_requests` | `usize` | 1,000,000 | Maximum unacknowledged requests in flight |
| `recovery` | `bool` | `true` | Enable automatic stream recovery on failure |
| `recovery_timeout_ms` | `u64` | 15,000 | Timeout for recovery operations (ms) |
| `recovery_backoff_ms` | `u64` | 2,000 | Delay between recovery retry attempts (ms) |
| `recovery_retries` | `u32` | 4 | Maximum number of recovery attempts |
| `server_lack_of_ack_timeout_ms` | `u64` | 60,000 | Timeout waiting for server acks (ms) |
| `flush_timeout_ms` | `u64` | 300,000 | Timeout for flush operations (ms) |
| `record_type` | `RecordType` | `RecordType::Proto` | Record serialization format (Proto or Json) |
| `stream_paused_max_wait_time_ms` | `Option<u64>` | `None` | Max time to wait during graceful close (`None` = full server duration, `Some(0)` = immediate, `Some(x)` = min(x, server_duration)) |
| `ack_callback` | `Option<Arc<dyn AckCallback>>` | `None` | Optional callback for acknowledgment notifications |
| `callback_max_wait_time_ms` | `Option<u64>` | `None` | Maximum time to wait for callback processing to complete after closing the stream (`None` = wait indefinitely, `Some(x)` = wait up to x ms) |
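The `stream_paused_max_wait_time_ms` semantics from the table can be expressed as a small function (illustrative only, not SDK code; `server_duration_ms` stands for the duration the server reports):

```rust
/// Effective wait during graceful close, per the configuration table:
/// None = full server duration, Some(0) = immediate,
/// Some(x) = min(x, server duration).
fn effective_wait_ms(configured: Option<u64>, server_duration_ms: u64) -> u64 {
    match configured {
        None => server_duration_ms,
        Some(x) => x.min(server_duration_ms),
    }
}
```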
Example:
```rust
// Sketch: assumes a Default impl supplying the values in the table above.
let options = StreamConfigurationOptions {
    max_inflight_requests: 50_000,
    recovery: true,
    ..Default::default()
};
```
Error Handling
The SDK categorizes errors as retryable or non-retryable:
Retryable Errors
Auto-recovered if recovery is enabled:
- Network failures
- Connection timeouts
- Temporary server errors
- Stream closed by server
Non-Retryable Errors
Require manual intervention:
- `InvalidUCTokenError` - Invalid OAuth credentials
- `InvalidTableName` - Table doesn't exist or invalid format
- `InvalidArgument` - Invalid parameters or schema mismatch
- `Code::Unauthenticated` - Authentication failure
- `Code::PermissionDenied` - Insufficient table permissions
- `ChannelCreationError` - Failed to establish TLS connection
Check if an error is retryable:
```rust
// Sketch: the retryability check's method name follows errors.rs and
// may differ from what is shown here.
match stream.ingest_record_offset(record).await {
    Ok(offset) => { /* queued */ }
    Err(e) if e.is_retryable() => {
        // Transient failure: with recovery enabled, the SDK normally
        // handles these automatically.
    }
    Err(e) => {
        // Non-retryable: fix credentials, table name, or schema.
        return Err(e.into());
    }
}
```
Examples
Complete Working Examples
The examples/ directory contains four working examples covering different serialization formats and ingestion patterns:
| Example | Serialization | Ingestion | Package |
|---|---|---|---|
| `json/single/` | JSON | Single-record | `example_json_single` |
| `json/batch/` | JSON | Batch | `example_json_batch` |
| `proto/single/` | Protocol Buffers | Single-record | `example_proto_single` |
| `proto/batch/` | Protocol Buffers | Batch | `example_proto_batch` |
Check examples/README.md for setup instructions and detailed comparisons.
Stream Recovery
```rust
// Placeholder endpoint values; builder arguments are a sketch.
let sdk = ZerobusSdk::builder()
    .endpoint("your-zerobus-endpoint".to_string())
    .unity_catalog_url("https://your-workspace-url".to_string())
    .build()?;

let mut stream = sdk.create_stream(table_properties, options).await?;

// Ingest data...

match stream.close().await {
    Ok(()) => println!("all records acknowledged"),
    Err(e) => {
        // Recovery already retried transient failures; anything left
        // here needs manual handling (see Error Handling).
        eprintln!("stream failed: {e}");
    }
}
```
Tests
Integration tests live in the tests/ crate and run against a lightweight mock Zerobus gRPC server.
- Mock server: `tests/src/mock_grpc.rs`
- Test suite: `tests/src/rust_tests.rs`
Run tests with logs:
Best Practices
- Reuse SDK Instances - Create one `ZerobusSdk` per application and reuse it for multiple streams
- Always Close Streams - Use `stream.close().await?` to ensure all data is flushed
- Choose the Right Ingestion Method:
  - Use `ingest_records_offset()` for high-throughput batch ingestion
  - Use `ingest_record_offset()` when processing records individually
  - Both return offsets directly; use `wait_for_offset()` to explicitly wait for acknowledgments
  - The older `ingest_record()` and `ingest_records()` methods are deprecated
- Tune Inflight Limits - Adjust `max_inflight_requests` based on memory and throughput needs
- Enable Recovery - Always set `recovery: true` in production environments
- Handle Ack Futures - Use `tokio::spawn` for fire-and-forget or batch-wait for verification
- Monitor Errors - Log and alert on non-retryable errors
- Validate Schemas - Use the schema generation tool to ensure type safety (for Protocol Buffers)
- Secure Credentials - Never hardcode secrets; use environment variables or secret managers
- Test Recovery - Simulate failures to verify your error handling logic
API Reference
ZerobusSdk
Main entry point for the SDK.
Builder:
```rust
let sdk = ZerobusSdk::builder()
    .endpoint(endpoint)              // Required
    .unity_catalog_url(uc_url)       // Optional when using custom headers
    .tls_config(tls_config)          // Optional, defaults to SecureTlsConfig
    .build()?;
```
Methods:
- `create_stream(...)` - Creates a new ingestion stream for a table.
- A stream-recreation method that recreates a failed stream, preserving and re-ingesting unacknowledged records.
- A `create_stream` variant that accepts a custom headers provider for advanced authentication.
ZerobusStream
Represents an active ingestion stream.
Methods:

- `ingest_record_offset()` - Ingests a single encoded record (Protocol Buffers or JSON). The await queues the record for sending and returns the logical offset ID directly. Use `wait_for_offset()` to explicitly wait for server acknowledgment of this offset.
- `ingest_records_offset()` - Ingests multiple encoded records as a batch with all-or-nothing semantics. The entire batch either succeeds or fails as a unit. The await queues the batch for sending and returns the logical offset ID directly (or `None` for empty batches). Use `wait_for_offset()` to explicitly wait for server acknowledgment.
- `ingest_record()` - Deprecated: use `ingest_record_offset()` instead. Returns a future that resolves to the offset ID.
- `ingest_records()` - Deprecated: use `ingest_records_offset()` instead. Returns a future that resolves to `Some(offset_id)` for non-empty batches, or `None` if the batch is empty.
- `wait_for_offset()` - Waits for acknowledgment of a specific logical offset. Use this with offsets returned from `ingest_record_offset()` or `ingest_records_offset()`.
- `flush()` - Flushes all pending records and waits for acknowledgment.
- `close()` - Flushes and closes the stream gracefully.
- A method that returns an iterator over all unacknowledged records as individual `EncodedRecord` items, flattening batches into individual records. Only call after stream failure.
- A method that returns unacknowledged records grouped by batch, preserving the original batch structure: each `ingest_record()` call creates a batch containing one record, and each `ingest_records()` call creates a batch containing multiple records. Only call after stream failure.
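The difference between the grouped and flattened views can be illustrated with plain vectors (a toy example, not the SDK's types):

```rust
/// Grouped view: one inner Vec per ingest call.
/// Flattened view: individual records in ingest order.
fn flatten_batches(batches: &[Vec<String>]) -> Vec<String> {
    batches.iter().flatten().cloned().collect()
}
```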
TableProperties
Configuration for the target table.
Fields:
- `table_name` - Full table name (e.g., "catalog.schema.table")
- `descriptor_proto` - Optional Protocol Buffer descriptor loaded from generated files (required for the Proto record type, `None` for JSON)
StreamConfigurationOptions
Stream behavior configuration.
Fields:
See Configuration Options for details.
AckCallback
Trait for receiving acknowledgment notifications.
Methods:
- `on_ack()` - Called when a record/batch is successfully acknowledged
- `on_error()` - Called when a record/batch encounters an error
HeadersProvider
Trait for custom authentication header providers.
Methods:
Implement this trait to provide custom authentication headers. The default implementation (OAuthHeadersProvider) handles OAuth 2.0 token management. Use this for:
- Custom token caching strategies
- Alternative authentication mechanisms
- Integration with centralized credential services
See Custom Authentication section for usage examples.
TlsConfig
Trait for TLS configuration strategies.
Implementations:
- `SecureTlsConfig` (default) - Production TLS with system CA certificates
- `NoTlsConfig` - No-op TLS for testing with `http://` endpoints (requires the `testing` feature)
ZerobusError
Error type for all SDK operations.
Methods:
- Returns `true` if the error can be automatically recovered by the SDK.
Language Bindings
This repository includes bindings for building SDKs in other languages.
C FFI (ffi/)
C Foreign Function Interface for languages that can call C functions (Go, C#, C++, etc.).
# Build static and dynamic libraries
# Output:
# target/release/libzerobus_ffi.a (static library for Go, C++)
# target/release/libzerobus_ffi.so (Linux dynamic library for C#)
# target/release/libzerobus_ffi.dylib (macOS dynamic library for C#)
# target/release/zerobus_ffi.dll (Windows dynamic library for C#)
# ffi/zerobus.h (C header file)
Pre-built binaries for all platforms are available in GitHub Releases with tags ffi-vX.X.X.
JNI (jni/)
Java Native Interface bindings for the Zerobus Java SDK.
# Build JNI library
# Output:
# target/release/libzerobus_jni.so (Linux)
# target/release/libzerobus_jni.dylib (macOS)
# target/release/zerobus_jni.dll (Windows)
Pre-built binaries are available in GitHub Releases with tags jni-vX.X.X.
Building from Source
For contributors or those who want to build and test the SDK:
Build specific components:
# Build only SDK
# Build only schema tool
# Build and run JSON single-record example
# Build and run JSON batch example
# Build and run Protocol Buffers single-record example
# Build and run Protocol Buffers batch example
Community and Contributing
This is an open source project. We welcome contributions, feedback, and bug reports.
- Contributing Guide: Rust-specific development setup and workflow.
- General Contributing Guide: Pull request process, commit requirements, and policies.
- Changelog: See the history of changes in the SDK.
- Security Policy: Read about our security process and how to report vulnerabilities.
- Developer Certificate of Origin (DCO): Understand the agreement for contributions.
- Open Source Attributions: See a list of the open source libraries we use.
License
This SDK is licensed under the Databricks License. See the LICENSE file for the full license text. The license is also available online at https://www.databricks.com/legal/db-license.
Requirements
- Rust 1.70 or higher (2021 edition)
- TLS - Uses the native OS certificate store by default (configurable via the `TlsConfig` trait)
- See prerequisites for Databricks workspace and credential requirements
For issues, questions, or contributions, please visit the GitHub repository. See the monorepo README for an overview of all available SDKs.