rivven-connect
Config-driven connector runtime for the Rivven event streaming platform.
Overview
rivven-connect provides a single binary that handles all data movement operations:
- Sources: Extract data from external systems (PostgreSQL CDC, MySQL, APIs)
- Sinks: Load data into destinations (Rivven broker, S3, HTTP APIs)
- Transforms: Process events in-flight (filter, mask PII, enrich)
Scalable Architecture (300+ Connectors)
rivven-connect is designed to scale to 300+ connectors with:
- Hierarchical Categories: Connectors organized by type (database, messaging, storage, warehouse, AI/ML)
- Rich Metadata: Tags and search capabilities
- Connector Inventory: Auto-registration with metadata indexing for fast discovery
- Feature Gating: Compile only the connectors you need
┌────────────────────────────────────────────────────────────────────────────┐
│ Connector Inventory (Scalable)                                             │
├────────────────────────────────────────────────────────────────────────────┤
│ Categories                                                                 │
│ ├── Database  (postgres_cdc, mysql_cdc, sqlserver_cdc, ...)                │
│ ├── Messaging (mqtt, sqs, pubsub, nats, ...)                               │
│ ├── Storage   (s3, gcs, azure, minio, ...)                                 │
│ ├── Warehouse (snowflake, bigquery, redshift, databricks, clickhouse, ...) │
│ ├── AI/ML Vector (qdrant, pinecone, weaviate, ...)                         │
│ └── Utility   (datagen, stdout, http, ...)                                 │
└────────────────────────────────────────────────────────────────────────────┘
Runtime Registry Dispatch
When rivven-connect encounters a connector type that isn't hardcoded (e.g., a community or custom connector), the SourceRunner and SinkRunner automatically fall back to the connector registry. The registry uses type-erased AnySource/AnySink dispatch so third-party connectors get the same pipeline features as built-in ones:
- Check → Discover → Read/Write lifecycle via SourceFactory/SinkFactory
- Same rate limiting, backpressure, and transform pipeline as built-in connectors
- Zero code changes needed — just register your factory in create_source_registry()/create_sink_registry()
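For illustration, the fallback boils down to a name-to-factory lookup over type-erased trait objects. The sketch below uses simplified stand-ins for the crate's AnySource/SourceFactory types; it is not the actual API.

```rust
use std::collections::HashMap;

// Stand-in for the crate's type-erased source trait (e.g. AnySource).
trait AnySource {
    fn name(&self) -> &str;
}

// Stand-in factory trait: builds a boxed source from raw connector config.
trait SourceFactory: Send + Sync {
    fn create(&self, config: &serde_json::Value) -> Box<dyn AnySource>;
}

#[derive(Default)]
struct SourceRegistry {
    factories: HashMap<String, Box<dyn SourceFactory>>,
}

impl SourceRegistry {
    fn register(&mut self, connector_type: &str, factory: Box<dyn SourceFactory>) {
        self.factories.insert(connector_type.to_string(), factory);
    }

    // The runner falls back to this lookup for connector types it does not know.
    fn create(&self, connector_type: &str, config: &serde_json::Value) -> Option<Box<dyn AnySource>> {
        self.factories.get(connector_type).map(|f| f.create(config))
    }
}
```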
Lock-Free Rate Limiter
The token-bucket rate limiter on source and sink hot paths is fully lock-free. It uses AtomicU64 with CAS loops for both refill timestamp tracking and token management — zero RwLock or Mutex contention under concurrent workloads.
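As a sketch of the pattern only (not the crate's actual code), a CAS-based token bucket looks roughly like this:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Minimal lock-free token bucket; field names and the refill policy are
/// simplified for illustration.
pub struct TokenBucket {
    tokens: AtomicU64,
    last_refill_ns: AtomicU64,
    refill_per_sec: u64,
    capacity: u64,
}

impl TokenBucket {
    /// Try to take one token; `now_ns` is a monotonic timestamp in nanoseconds.
    pub fn try_acquire(&self, now_ns: u64) -> bool {
        // Refill step: a single CAS on the timestamp elects one thread to add tokens.
        let last = self.last_refill_ns.load(Ordering::Acquire);
        let elapsed_secs = now_ns.saturating_sub(last) / 1_000_000_000;
        if elapsed_secs > 0
            && self
                .last_refill_ns
                .compare_exchange(last, now_ns, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
        {
            let add = (elapsed_secs * self.refill_per_sec).min(self.capacity);
            self.tokens.fetch_add(add, Ordering::AcqRel); // capacity clamp elided for brevity
        }

        // Spend step: CAS loop that decrements one token if any are available.
        loop {
            let current = self.tokens.load(Ordering::Acquire);
            if current == 0 {
                return false;
            }
            if self
                .tokens
                .compare_exchange(current, current - 1, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                return true;
            }
        }
    }
}
```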
Zero-Allocation Batch Sizing
Batch size estimation uses SourceEvent::estimated_size() — a heuristic that walks field lengths without serialization. This eliminates the serde_json::to_vec() allocation that was previously required per event during batch splitting, keeping the hot path allocation-free.
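The idea, sketched here against a serde_json::Value stand-in (the real SourceEvent::estimated_size() may weigh fields differently):

```rust
use serde_json::Value;

/// Rough per-event size estimate: walk the value and sum field lengths
/// without serializing. The weights below are illustrative only.
fn estimated_size(value: &Value) -> usize {
    match value {
        Value::Null => 4,
        Value::Bool(_) => 5,
        Value::Number(_) => 8,           // fixed guess instead of formatting the number
        Value::String(s) => s.len() + 2, // quotes
        Value::Array(items) => 2 + items.iter().map(estimated_size).sum::<usize>(),
        Value::Object(map) => {
            2 + map
                .iter()
                .map(|(k, v)| k.len() + 3 + estimated_size(v))
                .sum::<usize>()
        }
    }
}
```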
Security Architecture
rivven-connect runs as a separate process from the Rivven broker. This design:
- Security: Database credentials are isolated - if the broker is compromised, attackers don't get DB access
- Scalability: CDC connectors can run on different machines, closer to source databases
- Simplicity: Broker stays lean and focused on messaging
┌─────────────────┐      ┌─────────────────┐      ┌─────────────────┐
│   PostgreSQL    │─────▶│ rivven-connect  │─────▶│     rivvend     │
│   MySQL         │      │ (config-driven) │      │    (broker)     │
│   HTTP API      │      │                 │      │                 │
└─────────────────┘      └─────────────────┘      └─────────────────┘
Quick Start
1. Generate Example Configuration
2. Edit Configuration
version: "1.0"
broker:
  bootstrap_servers:
    - localhost:9092
sources:
  postgres_db:
    connector: postgres-cdc
    topic: cdc.postgres.events # Required: destination topic
    config:
      host: localhost
      port: 5432
      database: mydb
      user: postgres
      password: ${POSTGRES_PASSWORD}
      slot_name: rivven_slot
      publication: rivven_pub
      topic_routing: "cdc.{schema}.{table}" # Optional: dynamic routing
      schemas:
        - public
      tables:
        - users
        - orders
sinks:
  s3_archive:
    connector: s3
    topics: # Required: topics to consume
    consumer_group: s3-archive # Required: for offset tracking
    config:
      bucket: my-data-lake
      region: us-east-1
      format: parquet
3. Check Configuration
4. Run Pipelines
Commands
| Command | Description |
|---|---|
| `run` | Run all enabled pipelines |
| `check` | Validate config and test connectivity |
| `discover` | Discover available streams from a source |
| `spec` | Show connector specifications |
| `init` | Generate example configuration |
| `status` | Show status of running pipelines |
| `postgres` | Legacy mode: direct PostgreSQL CDC |
Available Connectors
Sources
| Connector | Feature | Description |
|---|---|---|
| `datagen` | (default) | Synthetic data generator for testing |
| `postgres-cdc` | `postgres` | PostgreSQL logical replication CDC |
| `mysql-cdc` | `mysql` | MySQL/MariaDB binlog CDC |
| `sqlserver-cdc` | `sqlserver` | SQL Server Change Data Capture |
| `rdbc-source` | `rdbc` | Query-based polling source (PostgreSQL, MySQL, SQL Server) |
| `kafka-source` | `kafka` | Apache Kafka consumer (pure Rust, zero C deps) |
| `external-queue` | `external-queue` | External message queue consumer |
| `mqtt` | `mqtt` | MQTT broker subscriber (rumqttc, TLS, exponential backoff) |
| `sqs` | `sqs` | AWS SQS queue consumer |
| `pubsub` | `pubsub` | Google Cloud Pub/Sub subscriber |
Sinks
| Connector | Feature | Description |
|---|---|---|
| `stdout` | (default) | Console output for debugging |
| `http-webhook` | `http` | HTTP/HTTPS webhooks with signing |
| `rdbc-sink` | `rdbc` | Batch database sink (PostgreSQL, MySQL, SQL Server) |
| `kafka-sink` | `kafka` | Apache Kafka producer (pure Rust, zero C deps) |
| `sqs-sink` | `sqs` | AWS SQS queue producer (batch sending, FIFO support) |
| `external-queue` | `external-queue` | External message queue producer |
| `object-storage` | `cloud-storage` | Unified object storage (S3, GCS, Azure, local) |
| `s3` | `s3` | Amazon S3 / MinIO / R2 (via object-storage) |
| `gcs` | `gcs` | Google Cloud Storage (via object-storage) |
| `azure-blob` | `azure` | Azure Blob Storage (via object-storage) |
| `iceberg` | `iceberg` | Apache Iceberg lakehouse format |
| `delta-lake` | `delta-lake` | Delta Lake with ACID transactions |
| `snowflake` | `snowflake` | Snowflake Data Cloud |
| `bigquery` | `bigquery` | Google BigQuery |
| `redshift` | `redshift` | Amazon Redshift |
| `databricks` | `databricks` | Databricks Delta tables via Zerobus |
| `clickhouse` | `clickhouse` | ClickHouse OLAP via native RowBinary + circuit breaker + metrics |
| `qdrant` | `qdrant` | Qdrant vector DB via gRPC + circuit breaker + metrics |
| `pinecone` | `pinecone` | Pinecone managed vector DB via gRPC + circuit breaker + metrics |
Unified Object Storage
All object storage connectors (S3, GCS, Azure Blob) now use a single unified implementation
powered by the object_store crate. Configure with the provider field:
sinks:
  events:
    connector: object-storage
    topics: # Required: topics to consume
    consumer_group: object-storage # Required: for offset tracking
    config:
      provider: s3 # s3 | gcs | azure | local
      bucket: my-bucket
      prefix: events/
      format: jsonl
      compression: gzip
      partitioning: day
      s3:
        region: us-east-1
Apache Iceberg
The Iceberg sink writes events to Apache Iceberg tables using the official Apache Iceberg Rust SDK (iceberg crate v0.8.0). Features include:
- Full Writer Stack: Uses `ParquetWriterBuilder`, `RollingFileWriterBuilder`, and `DataFileWriterBuilder`
- Catalog Management: REST (Polaris, Tabular, Lakekeeper) and Memory catalogs
- Table Management: Automatic namespace and table creation with schema inference
- Atomic Transactions: Fast append via Transaction API with snapshot isolation
- Configurable Compression: Snappy (default), Gzip, LZ4, Zstd, Brotli, or None
- Production-Ready: Arrow/Parquet v57.x with structured logging
sinks:
  lakehouse:
    connector: iceberg
    topics: # Required: topics to consume
    consumer_group: iceberg-sink # Required: for offset tracking
    config:
      catalog:
        type: rest # rest | glue | hive | memory
        rest:
          uri: http://polaris:8181
          warehouse: s3://bucket/warehouse
      namespace: analytics
      table: events
      batch_size: 10000
      target_file_size_mb: 128 # Rolling file size
      compression: snappy # none | snappy | gzip | lz4 | zstd | brotli
      partitioning: time # none | identity | bucket | time
      partition_fields:
      commit_mode: append # append | overwrite | upsert
      s3:
        region: us-west-2
        endpoint: http://minio:9000 # For MinIO
        path_style_access: true
See docs/ICEBERG_SINK.md for complete configuration reference.
Delta Lake
The Delta Lake sink writes events to Delta Lake tables using the delta-rs native Rust implementation (deltalake crate v0.30). Features include:
- ACID Transactions: Every write is atomic with snapshot isolation via the Delta log
- No JVM Required: Pure Rust via delta-rs (3k+ stars)
- Storage Backends: S3, GCS, Azure, local filesystem
- RecordBatchWriter: Arrow-native write path with Parquet compression
- Auto Table Creation: Creates tables on first write with configurable schema
- Commit Retry: Exponential backoff on transaction conflicts
sinks:
  delta:
    connector: delta-lake
    topics:
    consumer_group: delta-sink
    config:
      table_uri: s3://my-bucket/warehouse/events
      auto_create_table: true
      batch_size: 10000
      flush_interval_secs: 60
      compression: snappy
      partition_columns:
        - event_type
      s3:
        region: us-east-1
See docs/delta-lake-sink.md for complete configuration reference.
Apache Kafka (Pure Rust)
The Kafka connector provides bidirectional Kafka integration using rskafka, a pure Rust Kafka client with zero C dependencies:
- Pure Rust: No librdkafka, no C compiler needed
- Lock-free Metrics: Atomic counters with Prometheus export
- Exponential Backoff: Configurable retry with backoff
- Compression: None, Gzip, Snappy, LZ4, Zstd
- Security: SASL PLAIN with SSL/TLS
# Source: Consume from Kafka
sources:
  kafka_orders:
    connector: kafka-source
    topic: kafka-orders # Rivven topic (destination)
    config:
      brokers:
      topic: orders # Kafka topic (external source)
      consumer_group: rivven-migration
      start_offset: earliest
      fetch_max_messages: 500
      retry_initial_ms: 100
      retry_max_ms: 10000
      security:
        protocol: sasl_ssl
        mechanism: PLAIN
        username: ${KAFKA_USER}
        password: ${KAFKA_PASSWORD}

# Sink: Produce to Kafka
sinks:
  kafka_replica:
    connector: kafka-sink
    topics: # Rivven topics to consume from
    consumer_group: kafka-producer
    config:
      brokers:
      topic: orders-replica # Kafka topic (external destination)
      compression: lz4
      acks: all
      batch_size_bytes: 16384
      linger_ms: 5
See docs/kafka-connector.md for complete configuration reference.
HTTP Webhook Sink
Best-in-class HTTP webhook sink with enterprise features:
- Methods: POST, PUT, PATCH, DELETE
- Authentication: Bearer, Basic, API Key
- Signing: HMAC-SHA256 webhook signatures
- Compression: gzip for payloads above threshold
- Circuit Breaker: Fail fast when endpoint is unavailable
- Retry Strategy: Exponential backoff with jitter
- URL Templates: Dynamic URLs with `{stream}`, `{event_type}`, `{namespace}`
- Batching: Configurable size and timeout
- TLS: Custom CA certificates
sinks:
  events:
    connector: http-webhook
    topics: # Required: topics to consume
    consumer_group: http-webhook # Required: for offset tracking
    config:
      url: https://api.example.com/{stream}/events
      method: POST
      batch_size: 100
      batch_timeout_ms: 5000
      # Authentication
      auth:
        type: bearer
        token: ${WEBHOOK_TOKEN}
      # HMAC Signing (for webhook verification)
      signing:
        secret: ${WEBHOOK_SECRET}
        header_name: X-Webhook-Signature
        include_timestamp: true
      # Compression (for large payloads)
      compression_threshold_bytes: 1024
      # Resilience
      max_retries: 3
      retry_backoff_ms: 1000
      retry_jitter: true
      circuit_breaker_threshold: 5
      circuit_breaker_reset_secs: 30
      # Advanced
      max_concurrency: 4
      timeout_secs: 30
      health_check_url: https://api.example.com/health
      headers:
        X-Custom-Header: value
Metrics (via `/metrics`):
- `requests_sent`, `requests_success`, `requests_failed`
- `circuit_breaker_rejections`, `rate_limited`
- `bytes_sent`, `bytes_compressed`, `avg_latency_ms`
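For consumers of these webhooks, verifying the HMAC-SHA256 signature described above might look roughly like the sketch below (using the `hmac`, `sha2`, and `hex` crates). The exact signed payload, header format, and timestamp handling depend on the sink configuration, so treat this as an assumption-laden example rather than the definitive scheme:

```rust
use hmac::{Hmac, Mac};
use sha2::Sha256;

type HmacSha256 = Hmac<Sha256>;

/// Verify a hex-encoded HMAC-SHA256 signature over the raw request body.
/// (Whether a timestamp is prepended depends on `include_timestamp` in the
/// sink configuration; this sketch signs the body alone.)
fn verify_webhook(secret: &[u8], body: &[u8], signature_hex: &str) -> bool {
    let mut mac = HmacSha256::new_from_slice(secret).expect("HMAC accepts any key length");
    mac.update(body);
    match hex::decode(signature_hex) {
        Ok(sig) => mac.verify_slice(&sig).is_ok(), // constant-time comparison
        Err(_) => false,
    }
}
```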
RDBC Connectors
Query-based connectors powered by rivven-rdbc. They feature hot-path optimizations: true batch operations (10-100x the throughput of per-row writes), connection lifecycle management, and optional transactional writes for exactly-once semantics.
RDBC Source
Query-based polling source supporting bulk, incrementing, timestamp, and combined modes:
sources:
  users:
    connector: rdbc-source
    topic: user-events # Required: destination topic
    config:
      connection_url: postgres://user:pass@localhost/db
      schema: public
      table: users
      mode: incrementing # bulk | incrementing | timestamp | timestamp_incrementing
      incrementing_column: id
      poll_interval_ms: 1000
      batch_size: 1000
      # Pool configuration
      pool_size: 1 # Optional: max connections (default: 1)
      min_pool_size: 1 # Optional: warm-up connections (default: 1)
      max_lifetime_secs: 3600 # Optional: max connection age (default: 1 hour)
      idle_timeout_secs: 600 # Optional: idle timeout (default: 10 minutes)
      acquire_timeout_ms: 30000 # Optional: pool wait timeout (default: 30 seconds)
RDBC Sink
High-performance batch writer with connection pooling, upsert, insert, and delete support:
sinks:
  users_copy:
    connector: rdbc-sink
    topics: # Required: topics to consume
    consumer_group: rdbc-warehouse # Required: for offset tracking
    config:
      connection_url: postgres://user:pass@localhost/warehouse
      schema: public
      table: users_snapshot
      write_mode: upsert # insert | update | upsert
      pk_columns:
      batch_size: 1000
      batch_timeout_ms: 5000
      delete_enabled: true
      transactional: true # Optional: exactly-once semantics
      # Pool configuration with lifecycle management
      pool_size: 4 # Optional: max connections (default: 4)
      min_pool_size: 2 # Optional: warm-up connections (default: 1)
      max_lifetime_secs: 3600 # Optional: max connection age (default: 1 hour)
      idle_timeout_secs: 600 # Optional: idle timeout (default: 10 minutes)
      acquire_timeout_ms: 30000 # Optional: pool wait timeout (default: 30 seconds)
Pool Metrics
RDBC connectors expose pool metrics via the Prometheus endpoint (/metrics):
| Metric | Description |
|---|---|
| `rivven_connect_pool_connections_total` | Total connections created |
| `rivven_connect_pool_acquisitions_total` | Total pool acquisitions |
| `rivven_connect_pool_reuse_ratio` | Connection reuse ratio (0-1) |
| `rivven_connect_pool_avg_wait_ms` | Average acquisition wait time (ms) |
| `rivven_connect_pool_recycled_total` | Connections recycled (by reason) |
| `rivven_connect_pool_health_failures_total` | Failed health checks |
Multi-Database Support
Use the rdbc-* features to enable database drivers:
rivven-connect = { version = "0.0.15", features = ["rdbc-postgres"] }  # PostgreSQL
rivven-connect = { version = "0.0.15", features = ["rdbc-mysql"] }     # MySQL/MariaDB
rivven-connect = { version = "0.0.15", features = ["rdbc-sqlserver"] } # SQL Server
rivven-connect = { version = "0.0.15", features = ["rdbc-full"] }      # All databases
Feature Bundles
# In Cargo.toml
rivven-connect = { version = "0.0.15", features = ["full"] }

# Or selective features
rivven-connect = { version = "0.0.15", features = ["postgres", "s3"] }
| Bundle | Includes |
|---|---|
| `rdbc-full` | `rdbc-postgres`, `rdbc-mysql`, `rdbc-sqlserver` |
| `queue-full` | `mqtt`, `sqs`, `pubsub` |
| `storage-full` | `s3`, `gcs`, `azure`, `parquet` |
| `lakehouse-full` | `iceberg`, `delta-lake` |
| `warehouse-full` | `snowflake`, `bigquery`, `redshift`, `databricks`, `clickhouse` |
| `vectordb-full` | `qdrant`, `pinecone` |
| `full` | All connectors (including `rdbc-full`) |
Single Message Transforms (SMT)
CDC connectors support 17 built-in transforms applied via YAML configuration. No code required - fully configurable at deployment time.
| Transform | Description |
|---|---|
| `extract_new_record_state` | Flatten envelope, extract "after" state |
| `value_to_key` | Extract key fields from value |
| `mask_field` | Mask sensitive fields (PII, credit cards) |
| `insert_field` | Add static or computed fields |
| `replace_field` / `rename_field` | Rename, include, or exclude fields |
| `regex_router` | Route events based on regex patterns |
| `timestamp_converter` | Convert timestamp formats |
| `timezone_converter` | Convert between timezones |
| `filter` | Filter events based on conditions |
| `cast` | Convert field types |
| `flatten` | Flatten nested JSON structures |
| `content_router` | Route based on field content |
| `header_to_value` | Copy envelope metadata into record |
| `unwrap` | Extract nested field to top level |
| `set_null` | Set fields to null conditionally |
| `compute_field` | Compute new fields (concat, hash, etc.) |
| `externalize_blob` | Store large blobs in object storage (S3/GCS/Azure/local) |
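To make the transform model concrete, here is a rough sketch of what a field-masking transform does to a JSON record. It is illustrative only; the built-in `mask_field` transform is configured purely in YAML and supports more options:

```rust
use serde_json::Value;

/// Replace the named fields of a JSON object with a fixed mask string.
/// Roughly what a `mask_field` transform does to each record.
fn mask_fields(record: &mut Value, fields: &[&str], mask: &str) {
    if let Value::Object(map) = record {
        for field in fields {
            if map.contains_key(*field) {
                map.insert((*field).to_string(), Value::String(mask.to_string()));
            }
        }
    }
}

// Example: mask_fields(&mut event, &["ssn", "credit_card"], "****");
```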
Predicates (Conditional Transforms)
Apply transforms only to events matching specific conditions:
transforms:
  # Mask SSN only for users table
  - type: mask_field
    predicate:
      table: "users"
    config:
      fields:
  # Externalize blobs only for documents table
  - type: externalize_blob
    predicate:
      table: "documents"
      operations:
    config:
      storage_type: s3
      bucket: my-blobs
Predicate Options:
- `table` / `tables` - Filter by table name (regex)
- `schema` / `schemas` - Filter by schema name (regex)
- `operations` - Filter by operation (insert, update, delete, snapshot)
- `field_exists` - Apply only if field exists
- `field_value` - Apply only if field matches value
- `negate` - Invert the predicate
Example Configuration:
sources:
  postgres_cdc:
    connector: postgres-cdc
    config:
      host: localhost
      port: 5432
      database: mydb
      user: replicator
      password: ${DB_PASSWORD}
      # Column-level filtering
      column_filters:
        public.users:
          include:
          exclude:
      # Transform pipeline
      transforms:
        - type: mask_field
          config:
            fields:
        - type: timestamp_converter
          config:
            fields:
            format: iso8601
        - type: filter
          config:
            condition: "status = active"
            drop: false
        - type: extract_new_record_state
          config:
            add_table: true
            add_op: true
See the full CDC Configuration Reference for all options.
CDC Features
CDC connectors include advanced built-in features for production workloads:
| Feature | Description |
|---|---|
| Field-Level Encryption | AES-256-GCM / ChaCha20-Poly1305 encryption for sensitive fields (PII, PCI) |
| Deduplication | Hash-based event deduplication with configurable TTL |
| Tombstone Handling | Proper CDC tombstone emission for compacted topics |
| Column Masking | Real-time masking of sensitive data |
| Event Filtering | SQL-like filter expressions |
Field-Level Encryption
Encrypt sensitive fields at the source using AES-256-GCM or ChaCha20-Poly1305 encryption with versioned key rotation. Keys are managed via environment variables or external key providers.
sources:
  postgres_cdc:
    connector: postgres-cdc
    config:
      # ... connection settings ...
      cdc_features:
        encryption:
          enabled: true
          fields:
            - credit_card
            - ssn
            - bank_account
Encrypted fields are transparently decrypted by downstream consumers with the same key configuration. Supports key rotation via versioned encryption keys.
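For context, encrypting a single field value with AES-256-GCM looks roughly like the sketch below (using the `aes-gcm` crate). Key versioning, rotation, and the on-wire field encoding used by rivven-connect are not shown; this is an assumption-level illustration of the primitive, not the connector's implementation:

```rust
use aes_gcm::{
    aead::{Aead, AeadCore, KeyInit, OsRng},
    Aes256Gcm,
};

/// Encrypt one field value; the result is nonce || ciphertext.
fn encrypt_field(key: &aes_gcm::Key<Aes256Gcm>, plaintext: &[u8]) -> Vec<u8> {
    let cipher = Aes256Gcm::new(key);
    // A fresh 96-bit nonce per value; it must be stored alongside the ciphertext.
    let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
    let mut out = nonce.as_slice().to_vec();
    out.extend(cipher.encrypt(&nonce, plaintext).expect("encryption failure"));
    out
}
```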
Deduplication
Prevent duplicate events caused by network retries, replication lag, or connector restarts:
sources:
  postgres_cdc:
    connector: postgres-cdc
    config:
      cdc_features:
        deduplication:
          enabled: true
          ttl_seconds: 3600 # How long to remember seen events
          max_entries: 100000 # Max cache size (LRU eviction)
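Conceptually, deduplication keeps a bounded, time-limited cache of event hashes; a minimal sketch of the idea (not the connector's implementation, which uses LRU eviction rather than the wholesale clear below):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::time::{Duration, Instant};

struct Deduplicator {
    seen: HashMap<u64, Instant>,
    ttl: Duration,
    max_entries: usize,
}

impl Deduplicator {
    /// Returns true if this payload was already seen within the TTL.
    fn is_duplicate(&mut self, payload: &[u8]) -> bool {
        let mut hasher = DefaultHasher::new();
        payload.hash(&mut hasher);
        let key = hasher.finish();

        let now = Instant::now();
        let ttl = self.ttl;
        // Drop expired entries.
        self.seen.retain(|_, seen_at| now.duration_since(*seen_at) < ttl);

        if self.seen.contains_key(&key) {
            return true;
        }
        if self.seen.len() >= self.max_entries {
            self.seen.clear(); // simplified stand-in for LRU eviction
        }
        self.seen.insert(key, now);
        false
    }
}
```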
Tombstone Handling
Properly emit tombstones (null-value records) for DELETE operations, enabling log compaction:
sources:
  postgres_cdc:
    connector: postgres-cdc
    config:
      cdc_features:
        tombstones:
          enabled: true
          emit_tombstone_after_delete: true # Emit tombstone after delete event
Datagen Source (Testing & Demos)
Generate synthetic data without any external dependencies. Perfect for testing and demos.
Available Patterns
| Pattern | Description |
|---|---|
| `sequence` | Sequential integers (0, 1, 2, ...) |
| `random` | Random JSON objects |
| `users` | Fake user records with names, emails |
| `orders` | E-commerce order data |
| `events` | Generic event stream (page_view, click, etc.) |
| `metrics` | Time-series metric data |
| `key_value` | Simple key-value pairs |
Example Configuration
version: "1.0"
broker:
  address: localhost:9092
sources:
  demo-data:
    connector: datagen
    topic: demo-events
    config:
      pattern: orders # Data pattern
      events_per_second: 100 # Rate limit (0 = unlimited)
      max_events: 10000 # Stop after N events (0 = unlimited)
      stream_name: orders # Stream name in events
      seed: 42 # For reproducible data
      cdc_mode: true # Generate INSERT/UPDATE/DELETE cycles
CDC Simulation Mode
When cdc_mode: true, the datagen connector simulates realistic CDC event streams:
- 60% INSERT: New records added to the stream
- 30% UPDATE: Existing records modified (with before/after values)
- 10% DELETE: Records removed
This is perfect for testing CDC pipelines without setting up a real database.
All Configuration Options
config:
  # Data pattern: sequence, random, users, orders, events, metrics, key_value
  pattern: orders
  # Events per second (0 = as fast as possible)
  events_per_second: 10
  # Maximum events (0 = unlimited)
  max_events: 0
  # Stream name for events
  stream_name: datagen
  # Namespace (optional)
  namespace: demo
  # Include sequence number in data
  include_sequence: true
  # Include timestamp in data
  include_timestamp: true
  # Add custom fields to every event
  custom_fields:
    env: "test"
    version: 1
  # Seed for reproducibility (optional)
  seed: 12345
  # CDC mode: generate INSERT/UPDATE/DELETE cycles (default: false)
  cdc_mode: false
PostgreSQL CDC Setup
Configure your PostgreSQL database:
-- Create publication for tables you want to capture
CREATE PUBLICATION rivven_pub FOR TABLE users, orders, products;
-- Or for all tables
CREATE PUBLICATION rivven_pub FOR ALL TABLES;
The replication slot is created automatically on first run.
SQL Server CDC Setup
SQL Server CDC uses the native Change Data Capture feature which tracks changes via CDC tables.
Enable CDC on Database and Tables
-- Enable CDC on the database
USE mydb;
GO
EXEC sys.sp_cdc_enable_db;
GO
-- Enable CDC on tables (creates capture instances)
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'users',
@role_name = NULL,
@supports_net_changes = 1;
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'orders',
@role_name = NULL,
@supports_net_changes = 1;
GO
Example Configuration
sources:
  sqlserver_db:
    connector: sqlserver-cdc
    config:
      host: localhost
      port: 1433
      database: mydb
      username: sa
      password: ${MSSQL_PASSWORD}
      schema: dbo
      poll_interval_ms: 500
      snapshot_mode: initial # initial, always, never, when_needed
      encrypt: false
      trust_server_certificate: true
      include_tables:
        - users
        - orders
      streams:
        - name: users
          namespace: dbo
          sync_mode: incremental
        - name: orders
          namespace: dbo
          sync_mode: incremental
Supported Versions
- SQL Server 2016 SP1+ (on-premises)
- SQL Server 2019 (recommended)
- SQL Server 2022
- Azure SQL Database (with CDC enabled)
Snowflake Sink Setup
The Snowflake sink uses the Snowpipe Streaming API with JWT (RSA key-pair) authentication and production-grade resilience features:
- Exponential backoff retry with configurable jitter and Retry-After support
- Circuit breaker pattern to prevent cascading failures
- Request tracing with unique `X-Request-ID` headers for end-to-end debugging
- Metrics integration for observability (success/failed/retried counts, latencies)
- Compression support for large payloads (gzip)
Generate RSA Key Pair
# Generate RSA private key (unencrypted PKCS#8 format)
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

# Extract public key for Snowflake
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Configure Snowflake User
-- As ACCOUNTADMIN
ALTER USER RIVVEN_USER SET RSA_PUBLIC_KEY='MIIBIjANBgkqhki...';

-- Grant privileges
GRANT USAGE ON DATABASE ANALYTICS TO ROLE RIVVEN_ROLE;
GRANT INSERT, SELECT ON TABLE ANALYTICS.CDC.EVENTS TO ROLE RIVVEN_ROLE;
Example Configuration
sinks:
  snowflake:
    connector: snowflake
    topics: # Required: topics to consume
    consumer_group: snowflake-sink # Required: for offset tracking
    config:
      account: myorg-account123
      user: RIVVEN_USER
      private_key_path: /secrets/rsa_key.p8
      role: RIVVEN_ROLE
      database: ANALYTICS
      schema: CDC
      table: ORDERS
      warehouse: COMPUTE_WH
      batch_size: 5000
      # Compression (recommended for large batches)
      compression_enabled: true
      compression_threshold_bytes: 8192
      # Retry configuration (optional)
      retry:
        max_retries: 3
        initial_backoff_ms: 1000
        max_backoff_ms: 30000
        jitter_factor: 0.1 # 10% randomization
      # Circuit breaker (optional)
      circuit_breaker:
        enabled: true
        failure_threshold: 5 # Failures before opening circuit
        reset_timeout_secs: 30 # Seconds before testing recovery
Note: The private key must be in unencrypted PKCS#8 PEM format (it begins with `-----BEGIN PRIVATE KEY-----`).
Retry Behavior
Transient failures are automatically retried with exponential backoff and jitter:
- Retryable: 408, 429, 500-504, network errors
- Non-retryable: 400, 401, 403, 404
- Jitter: Adds randomization to prevent thundering herd
- Retry-After: Respects Snowflake's `Retry-After` header for 429 responses
Circuit Breaker
The circuit breaker prevents cascading failures when Snowflake is unavailable:
- Closed: Normal operation
- Open: After 5 consecutive failures, requests fail fast
- Half-Open: After 30 seconds, allows test request to check recovery
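Conceptually, the breaker is a small state machine driven by consecutive failures and a reset timeout; a minimal sketch (not the connector's actual implementation):

```rust
use std::time::{Duration, Instant};

#[derive(Clone, Copy)]
enum State {
    Closed,
    Open { since: Instant },
    HalfOpen,
}

struct CircuitBreaker {
    state: State,
    consecutive_failures: u32,
    failure_threshold: u32, // e.g. 5
    reset_timeout: Duration, // e.g. 30 seconds
}

impl CircuitBreaker {
    /// Should the next request be attempted?
    fn allow_request(&mut self) -> bool {
        match self.state {
            State::Closed | State::HalfOpen => true,
            State::Open { since } => {
                if since.elapsed() >= self.reset_timeout {
                    self.state = State::HalfOpen; // let one test request through
                    true
                } else {
                    false // fail fast
                }
            }
        }
    }

    fn record_success(&mut self) {
        self.consecutive_failures = 0;
        self.state = State::Closed;
    }

    fn record_failure(&mut self) {
        self.consecutive_failures += 1;
        if self.consecutive_failures >= self.failure_threshold {
            self.state = State::Open { since: Instant::now() };
        }
    }
}
```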
Metrics
The connector exports metrics via the metrics crate facade:
- `snowflake.requests.success` - Successful API requests
- `snowflake.requests.failed` - Failed API requests
- `snowflake.requests.retried` - Retried requests
- `snowflake.request.duration_ms` - Request latency histogram
- `snowflake.circuit_breaker.rejected` - Requests rejected by the circuit breaker
Configuration Reference
See examples/connect-config.yaml for a complete example.
Sources
sources:
  <name>:
    connector: <connector-type>
    topic: <output-topic> # Required: destination topic
    config:
      # Connector-specific configuration
      topic_routing: <pattern> # Optional: dynamic topic routing (CDC only)
      schemas: # Optional: schemas to include
        - <schema-name>
      tables: # Optional: tables to include
        - <table-name>
Topic Routing (CDC Connectors)
Route CDC events to different topics based on source metadata. The topic_routing option is configured inside the connector's config section since it uses CDC-specific placeholders.
sources:
  postgres:
    connector: postgres-cdc
    topic: cdc.default # Fallback topic
    config:
      # ... postgres config
      topic_routing: "cdc.{schema}.{table}"
Supported Placeholders:
| Placeholder | Description | Example |
|---|---|---|
| `{database}` | Database name | `mydb` |
| `{schema}` | Schema name | `public` |
| `{table}` | Table name | `users` |
Examples:
- `"cdc.{schema}.{table}"` → `cdc.public.users`
- `"{database}.{schema}.{table}"` → `mydb.public.users`
- `"events.{table}"` → `events.orders`
Topic Name Normalization:
Rivven provides comprehensive normalization following industry standards:
| Original | Normalized | Reason |
|---|---|---|
| `my schema` | `my_schema` | Spaces → underscore |
| `user@data` | `user_data` | Special chars → underscore |
| `schema.with.dots` | `schema_with_dots` | Dots replaced |
| `UserProfiles` | `user_profiles` | snake_case conversion |
| `my-table` | `my_table` | Avro-compatible mode |
| `dbo_users` | `users` | Prefix stripping |
Normalization Options:
sources:
  postgres:
    connector: postgres-cdc
    topic: cdc.default
    config:
      topic_routing: "cdc.{schema}.{table}"
      normalization:
        case_conversion: snake_case # none, lower, upper, snake_case, kebab-case
        avro_compatible: true # Stricter naming for Avro
        strip_prefixes: # Remove common prefixes
        strip_suffixes: # Remove common suffixes
Topic names longer than 249 characters are truncated with a hash suffix for uniqueness.
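A rough sketch of that truncation rule (illustrative only; the exact hashing scheme used by rivven-connect may differ):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const MAX_TOPIC_LEN: usize = 249;

/// Keep a prefix of an over-long topic name and append a short hash so that
/// distinct originals stay distinct after truncation.
fn truncate_topic(name: &str) -> String {
    if name.len() <= MAX_TOPIC_LEN {
        return name.to_string();
    }
    let mut hasher = DefaultHasher::new();
    name.hash(&mut hasher);
    let suffix = format!("_{:08x}", hasher.finish() as u32);
    let prefix: String = name.chars().take(MAX_TOPIC_LEN - suffix.len()).collect();
    format!("{prefix}{suffix}")
}
```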
Programmatic Usage:
// Illustrative reconstruction: the type names and arguments below were stripped
// during rendering, so TopicResolver, TableMetadata, NormalizationConfig, and
// CaseConversion are assumptions; consult the crate docs for the exact API.
use rivven_connect::topic::{CaseConversion, NormalizationConfig, TableMetadata, TopicResolver};

// Simple snake_case conversion
let resolver = TopicResolver::builder("{database}.{schema}.{table}")
    .snake_case()
    .build()
    .unwrap();

// Production configuration with all options
let resolver = TopicResolver::builder("{database}.{schema}.{table}")
    .snake_case()
    .avro_compatible()
    .strip_prefixes(["dbo_"])
    .build()
    .unwrap();

// Resolve topic name
let metadata = TableMetadata::new("Sales DB", "My Schema", "UserProfiles");
let topic = resolver.resolve(&metadata); // "sales_db.my_schema.user_profiles"

// Custom normalization config
let config = NormalizationConfig::new()
    .with_case_conversion(CaseConversion::SnakeCase)
    .with_avro_compatible(true)
    .with_strip_prefixes(["dbo_"]);

let resolver = TopicResolver::builder("{database}.{schema}.{table}")
    .normalization(config)
    .build()
    .unwrap();
Sinks
sinks:
  <name>:
    connector: <connector-type>
    topics: # Required: topics to consume
    consumer_group: <group-id> # Required: for offset tracking
    config:
      # Connector-specific configuration
Security
- Database credentials are isolated to rivven-connect - the broker never sees them
- Supports environment variable substitution: `${VAR_NAME}`
- TLS support for sink connections
Schema Registry
rivven-connect provides 3 schema registry modes for maximum deployment flexibility, with native support for JSON Schema, Apache Avro, and Protocol Buffers.
Note: The broker (rivvend) is schema-agnostic. All schema operations are handled by rivven-schema or external registries.
Type Architecture: Schema types are re-exported from rivven-schema for consistency:
// All types come from rivven-schema (re-exported; the exact import path below is an assumption)
use rivven_connect::schema::*;
| Mode | Description | Use Case |
|---|---|---|
| External | Connect to external Schema Registry (incl. rivven-schema) | Production, multi-cluster |
| External (AWS Glue) | Connect to AWS Glue | AWS-native deployments |
| Disabled | No schema validation | Development, plain JSON |
Mode 1: External
Connect to an external Schema Registry or any compatible implementation:
schema_registry:
  mode: external
  external:
    url: http://schema-registry:8081
    username: ${SCHEMA_REGISTRY_USER}
    password: ${SCHEMA_REGISTRY_PASSWORD}
Mode 2: AWS Glue
Connect to AWS Glue Schema Registry for AWS-native deployments:
schema_registry:
  mode: glue
  glue:
    region: us-east-1
    registry_name: my-registry # optional, defaults to "default-registry"
    cache_ttl_secs: 300
Features:
- Full AWS Glue Schema Registry API support
- IAM authentication (uses default AWS credential chain)
- Schema versioning and compatibility checking
- Automatic UUID-to-integer ID mapping for standard wire format
- Schema caching for performance
Mode 3: Disabled (No Registry)
For development or simple use cases, disable schema validation:
schema_registry:
  mode: disabled
Events are sent as plain JSON without the 5-byte schema ID header.
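When a registry mode is enabled, encoded payloads carry that 5-byte prefix: a magic byte followed by a big-endian 4-byte schema ID. A minimal sketch of the framing:

```rust
/// Prefix an encoded payload with the standard wire-format header:
/// one magic byte (0x00) followed by a 4-byte big-endian schema ID.
fn encode_wire_format(schema_id: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + payload.len());
    out.push(0x00);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(payload);
    out
}
```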
Native Avro Support
Full Apache Avro support with standard wire format compatibility:
// Illustrative reconstruction: type names and arguments are assumptions.
use rivven_connect::schema::{AvroCodec, AvroSchema};

let schema = AvroSchema::parse(avro_schema_json)?;
let codec = AvroCodec::new(schema);
let avro_bytes = codec.encode(&event)?;                        // raw Avro body
let wire_bytes = codec.encode_wire_format(&event, schema_id)?; // 5-byte header + body
Native Protobuf Support
Full Protocol Buffers support with dynamic schema parsing:
// Illustrative reconstruction: type names and arguments are assumptions.
use rivven_connect::schema::{ProtobufCodec, ProtobufSchema};

let schema = ProtobufSchema::parse(proto_definition)?;
let codec = ProtobufCodec::new(schema);
let proto_bytes = codec.encode(&event)?;
Features
- Schema Evolution: Backward/forward/full compatibility checking
- Multiple Formats: JSON Schema, Avro, Protobuf (all ✅ supported)
- Standard Wire Format: Compatible with common Schema Registries
- Schema Fingerprinting: MD5 and SHA-256 fingerprints for caching
- Auto-inference: Automatically infer schemas from data samples
- Caching: Built-in schema caching for performance
Output Formats
The format module provides pluggable serialization for storage sinks.
Architecture
Formats are how data is serialized; storage connectors are where it is written:
┌──────────────────────────────────────────────────┐
│       Storage Connectors (WHERE to write)        │
│          S3Sink, GcsSink, AzureBlobSink          │
└─────────────────────────┬────────────────────────┘
                          │ uses
                          ▼
┌──────────────────────────────────────────────────┐
│        Format Writers (HOW to serialize)         │
│ JsonWriter, ParquetWriter, AvroWriter, CsvWriter │
└──────────────────────────────────────────────────┘
Available Formats
| Format | Extension | Best For | Append |
|---|---|---|---|
| `jsonl` | `.jsonl` | Streaming, logs, real-time | ✅ |
| `json` | `.json` | Human-readable, debugging | ❌ |
| `parquet` | `.parquet` | Analytics, data lakes | ❌ |
| `avro` | `.avro` | Schema Registry integration | ❌ |
| `csv` | `.csv` | Spreadsheets, exports | ✅ |
Using Format Writers
// Illustrative reconstruction: constructor arguments and method names were
// stripped in rendering, so the exact signatures below are assumptions.
use rivven_connect::format::{AvroWriter, CsvWriter, FormatWriter, ParquetWriter};

// Create a Parquet writer
let writer = ParquetWriter::new(parquet_options);

// Create an Avro writer with schema inference
let avro_writer = AvroWriter::new(&sample_events)?;

// Create a CSV writer
let csv_writer = CsvWriter::new(csv_options);

// Use via the FormatWriter trait
println!("{}", writer.format_name());    // "parquet"
println!("{}", writer.file_extension()); // ".parquet"
let bytes = writer.write_batch(&events)?;
Configuration
sinks:
  s3:
    connector: s3
    config:
      bucket: my-bucket
      format: parquet # Options: json, jsonl, parquet, avro, csv
      parquet:
        compression: zstd
        row_group_size: 10000
Output Compression
Text-based formats (JSON, JSONL, CSV) support output file compression:
sinks:
  s3:
    connector: s3
    config:
      bucket: my-bucket
      format: jsonl
      compression: gzip # Options: none, gzip, zstd, snappy, lz4
| Algorithm | Extension | Speed | Use Case |
|---|---|---|---|
| `none` | - | N/A | No compression |
| `gzip` | `.gz` | Medium | Universal compatibility |
| `zstd` | `.zst` | Fast | Best balance |
| `snappy` | `.snappy` | Very Fast | Interoperability |
| `lz4` | `.lz4` | Fastest | Real-time streaming |
Note: Parquet and Avro have built-in compression - use their internal compression settings instead of output compression.
Distributed Mode
Run rivven-connect across multiple nodes for scalability and high availability:
distributed:
  enabled: true
  node_id: node-1
  cluster_topic: _connect_status
  # Singleton connectors (like CDC) get automatic failover
  failover:
    heartbeat_interval_ms: 1000
    failure_timeout_ms: 10000
Connector Modes
| Mode | Description | Example |
|---|---|---|
| `singleton` | One instance with failover | CDC connectors |
| `scalable` | Multiple instances, load balanced | HTTP sources |
| `partitioned` | One instance per partition | Topic consumers |
Singleton Enforcement (CDC)
CDC connectors cannot run in parallel because:
- Database replication slots allow only one active consumer
- Multiple instances would cause data duplication
When a singleton leader fails, the coordinator automatically promotes a standby:
Node-1 (Leader) ──fails──► Node-2 (Standby) becomes Leader
Node-3 (Standby) watches
Health Monitoring
Monitor connector health with automatic failure detection and auto-recovery:
sources:
  postgres_db:
    connector: postgres-cdc
    config:
      # ... database config ...
      health:
        enabled: true
        check_interval_secs: 10
        max_lag_ms: 30000
        failure_threshold: 3
        success_threshold: 2
        auto_recovery: true
        recovery_delay_secs: 1
        max_recovery_delay_secs: 60
Health States
| State | Description |
|---|---|
| `Healthy` | All checks passing, lag within threshold |
| `Degraded` | Some checks passing with warnings |
| `Unhealthy` | Failure threshold exceeded |
| `Recovering` | Auto-recovery with exponential backoff |
Kubernetes Health Probes
The CdcFeatureProcessor provides K8s-compatible health endpoints:
use rivven_connect::cdc::CdcFeatureProcessor; // assumed import path

// Liveness probe (is the process alive?)
let (status, body) = processor.liveness_probe().await; // (200, "OK") or (503, "Service Unavailable")

// Readiness probe (can we serve traffic?)
let (status, body) = processor.readiness_probe().await;

// Detailed health JSON
let json = processor.health_json().await;
// {"status":"healthy","ready":true,"lag_ms":100,"uptime_secs":3600,...}
Programmatic Health Monitoring
// Illustrative reconstruction: builder arguments and callback signatures were
// stripped in rendering, so the shapes below are assumptions.
use rivven_connect::cdc::{CdcFeatureProcessor, CdcFeaturesConfig, HealthConfig};

let config = CdcFeaturesConfig::builder()
    .health(HealthConfig::default())
    .build();
let processor = CdcFeatureProcessor::new(config);

// Register custom health checks
processor.register_health_check("db_ping", || async { true }).await;

// Monitor health
processor.mark_connected();
let is_healthy = processor.is_healthy();

// Auto-recovery with custom function
let recovered = processor.attempt_recovery(|| async { reconnect().await }).await;

// Unregister health check when no longer needed
processor.unregister_health_check("db_ping").await;
Health Monitoring Prometheus Metrics
The health monitor emits comprehensive Prometheus metrics for observability:
| Metric | Type | Description |
|---|---|---|
| `rivven_cdc_health_monitoring_enabled` | Gauge | Health monitoring enabled status |
| `rivven_cdc_health_state_healthy` | Gauge | Current health state (1/0) |
| `rivven_cdc_health_state_ready` | Gauge | Current readiness state (1/0) |
| `rivven_cdc_health_checks_passed_total` | Counter | Passed health checks |
| `rivven_cdc_health_checks_failed_total` | Counter | Failed health checks |
| `rivven_cdc_health_state_transitions_total` | Counter | State transitions |
| `rivven_cdc_health_recoveries_succeeded_total` | Counter | Successful recoveries |
| `rivven_cdc_health_recoveries_failed_total` | Counter | Failed recoveries |
| `rivven_cdc_health_unhealthy_time_ms_total` | Counter | Time spent unhealthy (ms) |
Legacy Mode
For simple PostgreSQL CDC without a config file:
Development
Building Custom Connectors
Use the derive macros to reduce boilerplate when building custom connectors:
use rivven_connect::prelude::*;         // assumed prelude path
use rivven_connect::SourceConfigDerive; // derive macro; exact import path may differ

// #[derive(SourceConfigDerive, serde::Deserialize)]
// struct MySourceConfig { /* connector settings */ }
// Generates: MySourceConfigSpec with spec(), name(), version() methods
// The spec() includes automatic JSON Schema generation
For sinks with batching support:
use rivven_connect::prelude::*;       // assumed prelude path
use rivven_connect::SinkConfigDerive; // derive macro; exact import path may differ

// Generates: MySinkConfigSpec with spec(), name(), version(), batch_config() methods
See Connector Development Guide for complete documentation.
Build Commands
# Build
# Run with verbose logging
# Run unit tests
# Run integration tests (requires Docker)
Documentation
License
Apache-2.0. See LICENSE.