rivven-operator
Kubernetes operator for deploying and managing Rivven clusters and connectors.
Overview
The Rivven Operator provides Custom Resource Definitions (CRDs) for declarative, GitOps-friendly cluster management.
Features
| Resource | Description |
|---|---|
| RivvenCluster | Declarative cluster management with StatefulSets |
| RivvenConnect | Declarative connector pipeline management |
| RivvenTopic | GitOps-friendly topic management |
| RivvenSchemaRegistry | Declarative Schema Registry deployment |
Additional capabilities:
- Automated reconciliation with eventual consistency
- Ordered deployment, scaling, and rolling updates
- Secure credential handling for sources and sinks
- Prometheus-compatible operator metrics
Quick Start
Prerequisites
- Kubernetes 1.28+
- kubectl configured for your cluster
- Helm 3.x (optional, for Helm-based deployment)
Install CRDs
Deploy Operator
# Using kubectl
# Or using Helm
Create a Rivven Cluster
apiVersion: rivven.hupe1980.github.io/v1alpha1
kind: RivvenCluster
metadata:
  name: production
  namespace: default
spec:
  replicas: 3
  version: "0.0.1"
  storage:
    size: 100Gi
    storageClassName: fast-ssd
  resources:
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "4"
      memory: 8Gi
  config:
    defaultPartitions: 3
    defaultReplicationFactor: 2
    logRetentionHours: 168
  tls:
    enabled: true
    certSecretName: rivven-tls
  metrics:
    enabled: true
    port: 9090
Create a RivvenConnect Instance
RivvenConnect uses a Kafka Connect-style generic configuration approach, allowing any connector to be configured without CRD schema changes. All connector-specific parameters go in the config field and are validated at runtime by the controller.
apiVersion: rivven.hupe1980.github.io/v1alpha1
kind: RivvenConnect
metadata:
  name: cdc-pipeline
  namespace: default
spec:
  clusterRef:
    name: production
  replicas: 2
  version: "0.0.1"
  # PostgreSQL CDC source with generic config
  sources:
    - name: postgres-cdc
      connector: postgres-cdc
      topic: cdc.events
      enabled: true
      configSecretRef: postgres-credentials # Secret with host/port/user/pass
      # All connector-specific config goes here (Kafka Connect style)
      config:
        slotName: rivven_cdc_slot
        publication: rivven_pub
        snapshotMode: initial
        decodingPlugin: pgoutput
        heartbeatIntervalMs: 10000
        # Table selection with column filtering
        tables:
          - schema: public
            table: orders
            columns:
            excludeColumns:
          - schema: public
            table: customers
            columnMasks:
              email: "***@***.***"
              phone: "***-***-****"
        # Advanced CDC features
        snapshot:
          batchSize: 20000
          parallelTables: 8
          queryTimeoutSecs: 600
        incrementalSnapshot:
          enabled: true
          chunkSize: 2048
          watermarkStrategy: update_and_insert
        heartbeat:
          enabled: true
          intervalSecs: 5
          maxLagSecs: 60
          emitEvents: true
        deduplication:
          enabled: true
          bloomExpectedInsertions: 500000
          windowSecs: 7200
        parallel:
          enabled: true
          concurrency: 8
          workStealing: true
        health:
          enabled: true
          checkIntervalSecs: 5
          maxLagMs: 10000
          autoRecovery: true
      topicConfig:
        partitions: 6
        replicationFactor: 2
  # S3 sink with generic config
  sinks:
    - name: s3-archive
      connector: s3
      topics:
        - "cdc.*"
      consumerGroup: s3-archiver
      enabled: true
      startOffset: earliest
      configSecretRef: s3-credentials # Secret with AWS keys
      # All connector-specific config goes here
      config:
        bucket: my-data-lake
        region: us-east-1
        prefix: cdc/events
        format: parquet
        compression: zstd
        batchSize: 10000
        flushIntervalSeconds: 60
      rateLimit:
        eventsPerSecond: 10000
        burstCapacity: 1000
    # Apache Iceberg lakehouse sink
    - name: iceberg-lakehouse
      connector: iceberg
      topics:
        - "cdc.events"
      consumerGroup: iceberg-writer
      enabled: true
      startOffset: earliest
      configSecretRef: iceberg-credentials # Secret with S3/catalog keys
      config:
        catalog:
          type: rest
          rest:
            uri: http://polaris:8181
          warehouse: s3://my-lakehouse/warehouse
        namespace: analytics
        table: cdc_events
        partitioning: day
        partitionFields:
        commitMode: append
        batchSize: 50000
        flushIntervalSeconds: 300
        targetFileSizeMb: 128
        compression: zstd
        storage:
          s3:
            region: us-east-1
    # Debug stdout sink
    - name: debug-output
      connector: stdout
      topics:
        - "cdc.events"
      consumerGroup: debug
      enabled: true
      config:
        format: json
        pretty: true
        includeMetadata: true
    # Custom connector with generic config
    - name: custom-webhook
      connector: my-custom-sink
      topics:
        - "cdc.events"
      consumerGroup: custom
      enabled: true
      config:
        url: https://api.example.com/webhook
        method: POST
        headers:
          Content-Type: application/json
        batchSize: 100
  settings:
    topic:
      autoCreate: true
      defaultPartitions: 3
      defaultReplicationFactor: 2
    retry:
      maxRetries: 10
      initialBackoffMs: 100
      maxBackoffMs: 30000
    health:
      enabled: true
      port: 8080
    metrics:
      enabled: true
      port: 9091
  tls:
    enabled: true
    certSecretName: rivven-tls
Create an Advanced CDC Pipeline
Production-grade CDC with parallel processing, deduplication, and health monitoring:
apiVersion: rivven.hupe1980.github.io/v1alpha1
kind: RivvenConnect
metadata:
  name: production-cdc
  namespace: default
spec:
  clusterRef:
    name: production
  replicas: 3
  version: "0.0.1"
  sources:
    - name: postgres-cdc-advanced
      connector: postgres-cdc
      topic: cdc.events
      enabled: true
      configSecretRef: postgres-credentials
      # All connector config in generic 'config' field (Kafka Connect style)
      config:
        slotName: rivven_production_slot
        publication: rivven_all_tables
        snapshotMode: initial
        decodingPlugin: pgoutput
        tables:
          - schema: public
            table: orders
          - schema: public
            table: customers
        # Advanced snapshot configuration
        snapshot:
          batchSize: 50000
          parallelTables: 8
          queryTimeoutSecs: 600
          maxRetries: 5
        # Incremental snapshot for zero-downtime re-snapshots
        incrementalSnapshot:
          enabled: true
          chunkSize: 5000
          watermarkStrategy: insert
          maxConcurrentChunks: 4
        # Signal table for ad-hoc snapshot control
        signal:
          enabled: true
          dataCollection: cdc.signals
          enabledChannels:
        # Heartbeat monitoring
        heartbeat:
          enabled: true
          intervalSecs: 5
          maxLagSecs: 60
          emitEvents: true
          topic: __cdc_heartbeat
        # Deduplication with bloom filter
        deduplication:
          enabled: true
          bloomExpectedInsertions: 1000000
          bloomFpp: 0.001
          lruSize: 100000
          windowSecs: 7200
        # Transaction metadata
        transactionTopic:
          enabled: true
          topicName: __cdc_transactions
          includeDataCollections: true
        # Event routing
        router:
          enabled: true
          defaultDestination: cdc.default
          deadLetterQueue: cdc.dlq
          rules:
            - conditionType: table
              conditionValue: orders
              destination: cdc.orders
              priority: 10
            - conditionType: operation
              conditionValue: DELETE
              destination: cdc.deletes
              priority: 5
        # Custom partitioning
        partitioner:
          enabled: true
          numPartitions: 32
          strategy: key_hash
          keyColumns:
        # SMT transforms
        transforms:
          - type: extract_new_record_state
            config:
              addFields: "op,source.ts_ms"
              dropTombstones: false
          - type: mask_field
            config:
              fields: "email,phone"
              replacement: "***MASKED***"
        # Parallel processing
        parallel:
          enabled: true
          concurrency: 8
          perTableBuffer: 5000
          outputBuffer: 50000
          workStealing: true
        # Health monitoring
        health:
          enabled: true
          checkIntervalSecs: 5
          maxLagMs: 10000
          failureThreshold: 3
          autoRecovery: true
  settings:
    topic:
      autoCreate: true
      defaultPartitions: 6
      defaultReplicationFactor: 3
    retry:
      maxRetries: 10
      initialBackoffMs: 100
      maxBackoffMs: 30000
    health:
      enabled: true
      port: 8080
    metrics:
      enabled: true
      port: 9091
Create a RivvenTopic
Manage topics declaratively for GitOps workflows:
apiVersion: rivven.hupe1980.github.io/v1alpha1
kind: RivvenTopic
metadata:
  name: orders-events
  namespace: default
spec:
  clusterRef:
    name: production
  partitions: 12
  replicationFactor: 3
  config:
    # 7 days retention
    retentionMs: 604800000
    cleanupPolicy: delete
    compressionType: lz4
    minInsyncReplicas: 2
  # Access control
  acls:
    - principal: "user:order-service"
      operations:
    - principal: "user:analytics"
      operations:
  # Keep topic when CRD is deleted
  deleteOnRemove: false
# Check topic status
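The retentionMs value in the topic example is a plain millisecond count. As a quick sanity check when choosing other retention periods, the conversion can be sketched as follows. The helper names are hypothetical and not part of Rivven.

```python
# Hypothetical helpers for converting between days and the millisecond
# values used by retentionMs. Not part of the operator.
MS_PER_DAY = 24 * 60 * 60 * 1000


def days_to_ms(days: float) -> int:
    """Return the number of milliseconds in the given number of days."""
    return int(days * MS_PER_DAY)


def ms_to_days(ms: int) -> float:
    """Return the number of days represented by a millisecond count."""
    return ms / MS_PER_DAY


print(days_to_ms(7))          # 604800000, the value used in the example
print(ms_to_days(604800000))  # 7.0
```

The same period expressed in hours is 168, which matches the logRetentionHours value in the RivvenCluster example.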
Create a RivvenSchemaRegistry
Deploy and manage a high-performance Schema Registry:
apiVersion: rivven.hupe1980.github.io/v1alpha1
kind: RivvenSchemaRegistry
metadata:
  name: schema-registry
  namespace: default
spec:
  clusterRef:
    name: production
  replicas: 2
  version: "0.0.1"
  # Server configuration
  server:
    port: 8081
    bindAddress: "0.0.0.0"
    requestTimeoutMs: 30000
    corsEnabled: true
  # Schema storage
  storage:
    mode: broker # or "memory"
    topic: _schemas
    replicationFactor: 3
    partitions: 1
  # Compatibility settings
  compatibility:
    defaultLevel: BACKWARD
    perSubject:
      "order-events-value": FULL
      "user-profile-value": FORWARD
  # Supported schema formats
  formats:
    avro: true
    jsonSchema: true
    protobuf: true
  # Authentication
  auth:
    enabled: true
    method: jwt
    jwt:
      issuerUrl: "https://auth.example.com"
      audience: "schema-registry"
      usernameClaim: "sub"
      rolesClaim: "groups"
  # TLS
  tls:
    enabled: true
    certSecretName: schema-registry-tls
    mtlsEnabled: false
  # Metrics
  metrics:
    enabled: true
    port: 9090
    path: /metrics
    serviceMonitorEnabled: true
  # Resource requests
  resources:
    requests:
      cpu: "500m"
      memory: 1Gi
    limits:
      cpu: "2"
      memory: 4Gi
# Check schema registry status
Check Status
# Check cluster status
# Check connect status
# Check topic status
# Check schema registry status
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ RIVVEN OPERATOR │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
│ │ Controller │────►│ Kubernetes API │ │
│ │ │ │ │ │
│ │ • Watch CRDs │◄────│ • RivvenCluster events │ │
│ │ • Reconcile │ │ • RivvenConnect events │ │
│ │ • Update status│ │ • RivvenSchemaRegistry events │ │
│ └─────────────────┘ │ • StatefulSet/Deployment status │ │
│ │ └─────────────────────────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Managed Resources (RivvenCluster) │ │
│ │ │ │
│ │ • StatefulSet (rivven-{name}) │ │
│ │ • Headless Service (rivven-{name}-headless) │ │
│ │ • Client Service (rivven-{name}) │ │
│ │ • ConfigMap (rivven-{name}-config) │ │
│ │ • PodDisruptionBudget (rivven-{name}-pdb) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Managed Resources (RivvenConnect) │ │
│ │ │ │
│ │ • Deployment (rivven-connect-{name}) │ │
│ │ • ConfigMap (rivven-connect-{name}-config) │ │
│ │ • Service (rivven-connect-{name}) │ │
│ │ • ServiceMonitor (optional, for Prometheus) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Managed Resources (RivvenSchemaRegistry) │ │
│ │ │ │
│ │ • Deployment (rivven-schema-{name}) │ │
│ │ • ConfigMap (rivven-schema-{name}-config) │ │
│ │ • Service (rivven-schema-{name}) │ │
│ │ • PodDisruptionBudget (rivven-schema-{name}-pdb) │ │
│ │ • ServiceMonitor (optional, for Prometheus) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
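The naming patterns in the diagram substitute the custom resource's metadata.name for {name}. A minimal sketch of that substitution, useful when looking for the objects a given resource should own, might look like this. The patterns are copied from the diagram; the helper and its role keys are hypothetical and are not the operator's code.

```python
# Naming patterns copied from the architecture diagram. The helper and the
# role keys are hypothetical and only illustrate how {name} is substituted.
NAME_PATTERNS = {
    "RivvenCluster": {
        "statefulset": "rivven-{name}",
        "headless_service": "rivven-{name}-headless",
        "client_service": "rivven-{name}",
        "configmap": "rivven-{name}-config",
        "pdb": "rivven-{name}-pdb",
    },
    "RivvenConnect": {
        "deployment": "rivven-connect-{name}",
        "configmap": "rivven-connect-{name}-config",
        "service": "rivven-connect-{name}",
    },
    "RivvenSchemaRegistry": {
        "deployment": "rivven-schema-{name}",
        "configmap": "rivven-schema-{name}-config",
        "service": "rivven-schema-{name}",
        "pdb": "rivven-schema-{name}-pdb",
    },
}


def managed_resource_names(kind: str, name: str) -> dict[str, str]:
    """Expand every naming pattern of a kind for one custom resource name."""
    return {
        role: pattern.format(name=name)
        for role, pattern in NAME_PATTERNS[kind].items()
    }


for role, resource in managed_resource_names("RivvenCluster", "production").items():
    print(f"{role}: {resource}")
```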
Custom Resource Definitions
RivvenCluster
Manages Rivven broker clusters with StatefulSets for data persistence.
| Field | Type | Default | Description |
|---|---|---|---|
| replicas | int | 3 | Number of broker replicas |
| version | string | latest | Rivven image version |
| image | string | ghcr.io/hupe1980/rivven | Container image |
| storage.size | string | 10Gi | PVC size per broker |
| storage.storageClassName | string | "" | Storage class |
| resources | ResourceRequirements | - | CPU/memory requests/limits |
| config | BrokerConfig | - | Broker configuration |
| tls.enabled | bool | false | Enable TLS |
| metrics.enabled | bool | true | Enable Prometheus metrics |
RivvenConnect
Manages Rivven Connect instances for data pipelines.
| Field | Type | Default | Description |
|---|---|---|---|
| clusterRef.name | string | required | RivvenCluster to connect to |
| clusterRef.namespace | string | same namespace | Cluster namespace |
| replicas | int | 1 | Number of connect workers |
| version | string | latest | Connect image version |
| sources | []SourceConnectorSpec | [] | Source connectors |
| sinks | []SinkConnectorSpec | [] | Sink connectors |
| settings.topic.autoCreate | bool | true | Auto-create topics |
| settings.retry.maxRetries | int | 10 | Max retry attempts |
| tls.enabled | bool | false | Enable TLS for broker |
Source Connector Spec
| Field | Type | Default | Description |
|---|---|---|---|
| name | string | required | Unique connector name |
| connector | string | required | Type (postgres-cdc, mysql-cdc, http, datagen) |
| topic | string | required | Target topic |
| topicRouting | string | - | Topic pattern with placeholders |
| enabled | bool | true | Enable/disable connector |
| postgresCdc | PostgresCdcConfig | - | Typed PostgreSQL CDC config |
| mysqlCdc | MysqlCdcConfig | - | Typed MySQL CDC config |
| http | HttpSourceConfig | - | Typed HTTP source config |
| datagen | DatagenConfig | - | Typed Datagen config |
| config | object | {} | Generic config (for custom connectors) |
| configSecretRef | string | - | Secret for sensitive config |
| topicConfig | SourceTopicConfigSpec | - | Topic auto-creation settings |
Note: CDC connectors (postgres-cdc, mysql-cdc) define the tables field inside their typed configs (for example, postgresCdc.tables).
Sink Connector Spec
| Field | Type | Default | Description |
|---|---|---|---|
| name | string | required | Unique connector name |
| connector | string | required | Type (stdout, s3, http, elasticsearch) |
| topics | []string | required | Topics to consume (supports wildcards) |
| consumerGroup | string | required | Consumer group for offsets |
| enabled | bool | true | Enable/disable connector |
| startOffset | string | latest | Starting offset (earliest, latest, timestamp) |
| s3 | S3SinkConfig | - | Typed S3 sink config |
| http | HttpSinkConfig | - | Typed HTTP sink config |
| stdout | StdoutSinkConfig | - | Typed stdout sink config |
| config | object | {} | Generic config (for custom connectors) |
| configSecretRef | string | - | Secret for sensitive config |
| rateLimit.eventsPerSecond | int | 0 | Rate limit (0 = unlimited) |
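The topics field accepts wildcard patterns such as "cdc.*". The exact matching rules are not spelled out here, so the sketch below assumes shell-style glob semantics and uses Python's fnmatch module to show which topics a sink would pick up under that assumption. The function name and topic list are hypothetical.

```python
from fnmatch import fnmatchcase


def matching_topics(patterns: list[str], available: list[str]) -> list[str]:
    """Return the available topics matched by at least one pattern.

    Assumes shell-style globbing ('*' matches any run of characters);
    the matcher actually used by Rivven Connect may differ.
    """
    return [
        topic
        for topic in available
        if any(fnmatchcase(topic, pattern) for pattern in patterns)
    ]


topics = ["cdc.events", "cdc.orders", "cdc.deletes", "orders", "__cdc_heartbeat"]
print(matching_topics(["cdc.*"], topics))
# ['cdc.events', 'cdc.orders', 'cdc.deletes']
```

Under these semantics a literal topic name such as "cdc.events" is simply a pattern that matches only itself.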
Typed Source Configs
PostgresCdcConfig (for connector: postgres-cdc):
| Field | Type | Default | Description |
|---|---|---|---|
| slotName | string | auto | Replication slot name |
| publication | string | auto | PostgreSQL publication name |
| snapshotMode | string | initial | initial, always, never, when_needed, initial_only, schema_only, recovery, exported, custom |
| decodingPlugin | string | pgoutput | pgoutput, wal2json, decoderbufs |
| includeTransactionMetadata | bool | false | Include transaction info |
| heartbeatIntervalMs | int | 0 | Heartbeat interval (0 = disabled) |
| signalTable | string | - | Signal table for runtime control |
| tables | []TableSpec | [] | Tables to capture from PostgreSQL |
| snapshot | SnapshotCdcConfigSpec | - | Advanced snapshot configuration |
| incrementalSnapshot | IncrementalSnapshotSpec | - | Non-blocking incremental snapshot |
| signal | SignalTableSpec | - | Signal table for ad-hoc snapshots |
| heartbeat | HeartbeatCdcSpec | - | Heartbeat monitoring configuration |
| deduplication | DeduplicationCdcSpec | - | Event deduplication (bloom filter/LRU) |
| transactionTopic | TransactionTopicSpec | - | Transaction metadata topic |
| schemaChangeTopic | SchemaChangeTopicSpec | - | Schema change capture |
| tombstone | TombstoneCdcSpec | - | Tombstone event handling |
| fieldEncryption | FieldEncryptionSpec | - | Field-level encryption |
| readOnlyReplica | ReadOnlyReplicaSpec | - | PostgreSQL read replica support |
| router | EventRouterSpec | - | Event routing configuration |
| partitioner | PartitionerSpec | - | Custom partitioning strategy |
| transforms | []SmtTransformSpec | [] | SMT (Single Message Transforms) |
| parallel | ParallelCdcSpec | - | Parallel processing configuration |
| outbox | OutboxSpec | - | Outbox pattern configuration |
| health | HealthMonitorSpec | - | Health monitoring configuration |
MysqlCdcConfig (for connector: mysql-cdc):
| Field | Type | Default | Description |
|---|---|---|---|
| serverId | int | auto | Unique server ID for binlog replication |
| snapshotMode | string | initial | initial, always, never, when_needed, initial_only, schema_only, recovery, exported, custom |
| includeGtid | bool | false | Include GTID in events |
| heartbeatIntervalMs | int | 0 | Heartbeat interval |
| databaseHistoryTopic | string | auto | Topic for schema changes |
| tables | []TableSpec | [] | Tables to capture from MySQL |
| snapshot | SnapshotCdcConfigSpec | - | Advanced snapshot configuration |
| incrementalSnapshot | IncrementalSnapshotSpec | - | Non-blocking incremental snapshot |
| signal | SignalTableSpec | - | Signal table for ad-hoc snapshots |
| heartbeat | HeartbeatCdcSpec | - | Heartbeat monitoring configuration |
| deduplication | DeduplicationCdcSpec | - | Event deduplication (bloom filter/LRU) |
| transactionTopic | TransactionTopicSpec | - | Transaction metadata topic |
| schemaChangeTopic | SchemaChangeTopicSpec | - | Schema change capture |
| tombstone | TombstoneCdcSpec | - | Tombstone event handling |
| fieldEncryption | FieldEncryptionSpec | - | Field-level encryption |
| router | EventRouterSpec | - | Event routing configuration |
| partitioner | PartitionerSpec | - | Custom partitioning strategy |
| transforms | []SmtTransformSpec | [] | SMT (Single Message Transforms) |
| parallel | ParallelCdcSpec | - | Parallel processing configuration |
| outbox | OutboxSpec | - | Outbox pattern configuration |
| health | HealthMonitorSpec | - | Health monitoring configuration |
DatagenConfig (for connector: datagen):
| Field | Type | Default | Description |
|---|---|---|---|
| eventsPerSecond | int | 1 | Events per second to generate |
| maxEvents | int | 0 | Total events (0 = unlimited) |
| schemaType | string | json | Output schema type |
| seed | int | - | Random seed for reproducibility |
ExternalSourceConfig (for connector: external-source, rivven-queue):
| Field | Type | Default | Description |
|---|---|---|---|
| brokers | []string | required | External broker addresses |
| topic | string | required | External topic to consume from |
| consumerGroup | string | required | Consumer group ID |
| startOffset | string | latest | earliest, latest |
| securityProtocol | string | plaintext | plaintext, ssl, sasl_plaintext, sasl_ssl |
| saslMechanism | string | - | plain, scram-sha-256, scram-sha-512 |
| saslUsername | string | - | SASL username (use secret for password) |
MqttSourceConfig (for connector: mqtt-source, rivven-queue):
| Field | Type | Default | Description |
|---|---|---|---|
| brokerUrl | string | required | MQTT broker URL (e.g., mqtt://broker:1883) |
| topics | []string | required | MQTT topics (supports wildcards: +, #) |
| clientId | string | - | MQTT client ID |
| qos | string | at_least_once | at_most_once, at_least_once, exactly_once |
| cleanSession | bool | true | Clean session on connect |
| mqttVersion | string | v311 | v3, v311, v5 |
| username | string | - | Auth username (use secret for password) |
SqsSourceConfig (for connector: sqs-source, rivven-queue):
| Field | Type | Default | Description |
|---|---|---|---|
| queueUrl | string | required | SQS queue URL |
| region | string | required | AWS region (e.g., us-east-1) |
| maxMessages | int | 10 | Max messages per poll (1-10) |
| waitTimeSeconds | int | 20 | Long polling wait time |
| visibilityTimeout | int | 30 | Visibility timeout in seconds |
| awsProfile | string | - | AWS profile name |
| roleArn | string | - | IAM role ARN to assume |
PubSubSourceConfig (for connector: pubsub-source, rivven-queue):
| Field | Type | Default | Description |
|---|---|---|---|
| projectId | string | required | GCP project ID |
| subscription | string | required | Pub/Sub subscription name |
| topic | string | - | Topic name (for auto-created subscriptions) |
| maxMessages | int | 100 | Max messages per pull (1-1000) |
| ackDeadlineSeconds | int | 30 | Ack deadline (10-600 seconds) |
| credentialsFile | string | - | Path to service account credentials |
| useAdc | bool | true | Use Application Default Credentials |
Typed Sink Configs
S3SinkConfig (for connector: s3, rivven-storage):
| Field | Type | Default | Description |
|---|---|---|---|
| bucket | string | required | S3 bucket name |
| region | string | - | AWS region |
| endpointUrl | string | - | Custom endpoint (MinIO, etc.) |
| prefix | string | - | Object key prefix |
| format | string | json | json, jsonl, parquet, avro |
| compression | string | none | none, gzip, snappy, lz4, zstd |
| batchSize | int | 1000 | Events per file |
| flushIntervalSeconds | int | 60 | Flush interval |
HttpSinkConfig (for connector: http):
| Field | Type | Default | Description |
|---|---|---|---|
| url | string | required | Target URL |
| method | string | POST | POST, PUT, PATCH |
| contentType | string | application/json | Content-Type header |
| timeoutMs | int | 30000 | Request timeout |
| batchSize | int | 100 | Events per request |
ExternalSinkConfig (for connector: external-sink, rivven-queue):
| Field | Type | Default | Description |
|---|---|---|---|
| brokers | []string | required | External broker addresses |
| topic | string | required | Target external topic |
| acks | string | all | none, leader, all, 0, 1, -1 |
| compression | string | none | none, gzip, snappy, lz4, zstd |
| batchSize | int | 16384 | Batch size in bytes |
| lingerMs | int | 0 | Linger time in milliseconds |
| securityProtocol | string | plaintext | plaintext, ssl, sasl_plaintext, sasl_ssl |
| saslMechanism | string | - | plain, scram-sha-256, scram-sha-512 |
| saslUsername | string | - | SASL username (use secret for password) |
GcsSinkConfig (for connector: gcs, rivven-storage):
| Field | Type | Default | Description |
|---|---|---|---|
| bucket | string | required | GCS bucket name |
| prefix | string | - | Object key prefix |
| format | string | jsonl | json, jsonl, parquet, avro |
| compression | string | none | none, gzip |
| partitioning | string | none | none, daily, hourly |
| batchSize | int | 1000 | Events per file |
| flushIntervalSeconds | int | 60 | Flush interval |
| credentialsFile | string | - | Path to service account JSON |
| useAdc | bool | true | Use Application Default Credentials |
AzureBlobSinkConfig (for connector: azure-blob, rivven-storage):
| Field | Type | Default | Description |
|---|---|---|---|
| accountName | string | required | Storage account name |
| container | string | required | Container name |
| prefix | string | - | Blob prefix |
| format | string | jsonl | json, jsonl, parquet, avro |
| compression | string | none | none, gzip |
| partitioning | string | none | none, daily, hourly |
| batchSize | int | 1000 | Events per file |
| flushIntervalSeconds | int | 60 | Flush interval |
SnowflakeSinkConfig (for connector: snowflake, rivven-warehouse):
| Field | Type | Default | Description |
|---|---|---|---|
| account | string | required | Snowflake account identifier |
| user | string | required | Snowflake user name |
| privateKeyPath | string | required | Path to PKCS#8 private key (use secret) |
| database | string | required | Target database |
| schema | string | required | Target schema |
| table | string | required | Target table |
| warehouse | string | - | Snowflake warehouse name |
| role | string | - | Snowflake role name |
| batchSize | int | 1000 | Rows per batch insert |
BigQuerySinkConfig (for connector: bigquery, rivven-warehouse):
| Field | Type | Default | Description |
|---|---|---|---|
| projectId | string | required | GCP project ID |
| datasetId | string | required | BigQuery dataset ID |
| tableId | string | required | BigQuery table ID |
| credentialsFile | string | - | Path to service account JSON |
| useAdc | bool | true | Use Application Default Credentials |
| batchSize | int | 500 | Rows per insert request |
| autoCreateTable | bool | false | Auto-create table if not exists |
RedshiftSinkConfig (for connector: redshift, rivven-warehouse):
| Field | Type | Default | Description |
|---|---|---|---|
| host | string | required | Redshift cluster endpoint |
| port | int | 5439 | Redshift port |
| database | string | required | Database name |
| user | string | required | Redshift user name |
| schema | string | public | Target schema |
| table | string | required | Target table |
| sslMode | string | prefer | disable, prefer, require, verify-ca, verify-full |
| batchSize | int | 1000 | Rows per batch insert |
IcebergSinkConfig (for connector: iceberg, rivven-connect lakehouse):
| Field | Type | Default | Description |
|---|---|---|---|
| catalog.type | string | rest | rest, hive, glue, jdbc, nessie, memory |
| catalog.rest.uri | string | - | REST catalog URI (Polaris, Tabular, Lakekeeper) |
| catalog.rest.credential | string | - | Client credentials (client_id:client_secret) |
| catalog.rest.token | string | - | Bearer token for authentication |
| catalog.warehouse | string | - | Warehouse location (s3://bucket/warehouse) |
| namespace | string | required | Iceberg namespace/database |
| table | string | required | Iceberg table name |
| partitioning | string | table_default | none, table_default, identity, bucket, year, month, day, hour |
| partitionFields | []string | [] | Fields to partition by |
| bucketCount | int | - | Number of buckets for bucket partitioning |
| commitMode | string | append | append, overwrite, upsert |
| upsertKeyFields | []string | [] | Key fields for upsert/merge operations |
| batchSize | int | 10000 | Events per commit |
| flushIntervalSeconds | int | 60 | Flush interval |
| targetFileSizeMb | int | 128 | Target Parquet file size (rolling files) |
| compression | string | snappy | none, snappy, gzip, lz4, zstd, brotli |
| schemaEvolution | string | strict | strict, add_columns, full |
| storage.s3.region | string | - | AWS region |
| storage.s3.endpointUrl | string | - | Custom endpoint (MinIO) |
| storage.s3.pathStyleAccess | bool | false | Use path-style access (required for MinIO) |
DeltaLakeSinkConfig (for connector: delta, rivven-connect lakehouse):
| Field | Type | Default | Description |
|---|---|---|---|
| tablePath | string | required | Delta table path (s3://bucket/delta/table) |
| partitionBy | []string | [] | Partition columns |
| writeMode | string | append | append, overwrite, merge |
| mergeKeyFields | []string | [] | Key fields for merge operations |
| targetFileSizeMb | int | 128 | Target Parquet file size |
| compression | string | snappy | none, snappy, gzip, lz4, zstd |
| batchSize | int | 10000 | Events per commit |
| flushIntervalSeconds | int | 60 | Flush interval |
| storage.s3.region | string | - | AWS region |
| storage.s3.endpointUrl | string | - | Custom endpoint (MinIO) |
TableSpec (CDC Tables)
| Field | Type | Default | Description |
|---|---|---|---|
| schema | string | - | Schema/namespace (e.g., "public") |
| table | string | required | Table name |
| topic | string | - | Override topic for this table |
| columns | []string | [] | Columns to include (empty = all) |
| excludeColumns | []string | [] | Columns to exclude |
| columnMasks | map[string]string | {} | Column masking rules |
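To make the interaction of columns, excludeColumns, and columnMasks concrete, here is a minimal sketch of how one captured row could be filtered and masked. It assumes that an empty columns list means "all columns", that exclusions are applied after inclusions, and that a mask replaces the whole value with the configured string. These semantics and the function name are assumptions for illustration, not the connector's implementation.

```python
def apply_table_spec(row: dict, spec: dict) -> dict:
    """Filter and mask one row according to a TableSpec-like dict.

    Assumed semantics: empty 'columns' keeps every column, 'excludeColumns'
    is applied afterwards, and 'columnMasks' replaces the full value.
    """
    include = spec.get("columns") or []
    exclude = set(spec.get("excludeColumns") or [])
    masks = spec.get("columnMasks") or {}

    result = {}
    for column, value in row.items():
        if include and column not in include:
            continue  # not in the allow-list
        if column in exclude:
            continue  # explicitly excluded
        result[column] = masks[column] if column in masks else value
    return result


customer = {"id": 1, "name": "Ann", "email": "ann@example.com", "phone": "555-123-4567"}
spec = {
    "schema": "public",
    "table": "customers",
    "columnMasks": {"email": "***@***.***", "phone": "***-***-****"},
}
print(apply_table_spec(customer, spec))
# {'id': 1, 'name': 'Ann', 'email': '***@***.***', 'phone': '***-***-****'}
```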
Advanced CDC Configuration Types
These types provide production-grade CDC features for both PostgreSQL and MySQL connectors.
SnapshotCdcConfigSpec (initial data capture settings):
| Field | Type | Default | Description |
|---|---|---|---|
| batchSize | int | 10000 | SELECT query batch size (rows) |
| parallelTables | int | 4 | Tables to snapshot in parallel (1-32) |
| queryTimeoutSecs | int | 300 | Query timeout seconds (10-3600) |
| throttleDelayMs | int | 0 | Delay between batches for backpressure |
| maxRetries | int | 3 | Max retries per batch on failure |
| includeTables | []string | [] | Tables to include (empty = all) |
| excludeTables | []string | [] | Tables to exclude |
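The defaults and ranges in the snapshot table can be checked before a manifest is applied. The sketch below merges user-supplied values over the documented defaults and rejects values outside the documented ranges. The function and constant names are hypothetical; the controller performs its own validation, and this is only a local pre-flight check.

```python
# Defaults and ranges copied from the SnapshotCdcConfigSpec table.
# The helper is a hypothetical pre-flight check, not the controller's logic.
SNAPSHOT_DEFAULTS = {
    "batchSize": 10000,
    "parallelTables": 4,
    "queryTimeoutSecs": 300,
    "throttleDelayMs": 0,
    "maxRetries": 3,
}
SNAPSHOT_RANGES = {
    "parallelTables": (1, 32),
    "queryTimeoutSecs": (10, 3600),
}


def resolve_snapshot_config(user_config: dict) -> dict:
    """Merge user values over the defaults and enforce documented ranges."""
    config = {**SNAPSHOT_DEFAULTS, **user_config}
    for field, (low, high) in SNAPSHOT_RANGES.items():
        if not low <= config[field] <= high:
            raise ValueError(f"{field}={config[field]} is outside {low}-{high}")
    return config


# Values from the advanced CDC example; unspecified fields fall back to defaults.
print(resolve_snapshot_config({"batchSize": 50000, "parallelTables": 8, "maxRetries": 5}))
```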
IncrementalSnapshotSpec (non-blocking re-snapshot):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable incremental snapshots |
| chunkSize | int | 1024 | Rows per chunk (100-100000) |
| watermarkStrategy | string | - | insert or update_and_insert |
| watermarkSignalTable | string | - | Signal table for watermark tracking |
| maxConcurrentChunks | int | 1 | Max concurrent chunks (1-16) |
| chunkDelayMs | int | 0 | Delay between chunks |
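To get a feel for how chunkSize affects an incremental snapshot, the following sketch splits a table into key ranges of at most chunkSize rows. It assumes a dense integer primary key, which is a simplification: a real implementation would more likely page through the table in key order. The function name is hypothetical.

```python
def chunk_ranges(min_key: int, max_key: int, chunk_size: int) -> list[tuple[int, int]]:
    """Split the inclusive key range [min_key, max_key] into chunks.

    Assumes a dense integer primary key, so each range holds at most
    chunk_size rows. This only illustrates the effect of chunkSize.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    ranges = []
    start = min_key
    while start <= max_key:
        end = min(start + chunk_size - 1, max_key)
        ranges.append((start, end))
        start = end + 1
    return ranges


# 5000 rows with chunkSize 2048 produce three chunks.
print(chunk_ranges(1, 5000, 2048))
# [(1, 2048), (2049, 4096), (4097, 5000)]
```

With maxConcurrentChunks greater than 1, several of these ranges could be read at the same time, and chunkDelayMs would space them out.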
SignalTableSpec (ad-hoc snapshot control):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable signal processing |
| dataCollection | string | - | Signal table name (schema.table) |
| topic | string | - | Topic for signal messages |
| enabledChannels | []string | [] | Channels: source, topic, file, api |
| pollIntervalMs | int | 1000 | Poll interval for file channel |
HeartbeatCdcSpec (connection health monitoring):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable heartbeat monitoring |
| intervalSecs | int | 10 | Heartbeat interval (1-3600s) |
| maxLagSecs | int | 300 | Max allowed lag before unhealthy |
| emitEvents | bool | false | Emit heartbeat events to topic |
| topic | string | - | Topic for heartbeat events |
| actionQuery | string | - | SQL to execute on each heartbeat |
DeduplicationCdcSpec (duplicate event prevention):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable deduplication |
| bloomExpectedInsertions | int | 100000 | Bloom filter expected items |
| bloomFpp | float | 0.01 | Bloom filter false positive rate |
| lruSize | int | 10000 | LRU cache size |
| windowSecs | int | 3600 | Deduplication window (60-604800s) |
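The bloomExpectedInsertions and bloomFpp settings together determine how much memory a bloom filter needs. The sketch below uses the standard textbook sizing formulas (bits = -n ln p / (ln 2)^2, hashes = (bits / n) ln 2) to estimate that cost. Whether Rivven sizes its filter exactly this way is an assumption, so treat the numbers as rough guidance only.

```python
import math


def bloom_size(expected_insertions: int, fpp: float) -> tuple[int, int]:
    """Return (bits, hash_functions) from the standard bloom filter formulas.

    These are the textbook optimal-size formulas; the connector's actual
    filter implementation may round or size differently.
    """
    bits = math.ceil(-expected_insertions * math.log(fpp) / (math.log(2) ** 2))
    hashes = max(1, round(bits / expected_insertions * math.log(2)))
    return bits, hashes


# Documented defaults, then the values from the advanced CDC example.
for n, p in [(100_000, 0.01), (1_000_000, 0.001)]:
    bits, hashes = bloom_size(n, p)
    print(f"n={n}, fpp={p}: {bits / 8 / 1024:.0f} KiB, {hashes} hash functions")
```

With the documented defaults this comes to about 117 KiB. Lowering bloomFpp or raising bloomExpectedInsertions increases the size.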
TransactionTopicSpec (transaction metadata):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable transaction topic |
| topicName | string | - | Transaction metadata topic |
| includeDataCollections | bool | true | Include affected tables list |
| minEventsThreshold | int | 0 | Min events to emit transaction |
SchemaChangeTopicSpec (DDL change capture):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable schema change capture |
| topicName | string | - | Schema change topic |
| includeColumns | bool | true | Include column details |
| schemas | []string | [] | Schemas to monitor (empty = all) |
TombstoneCdcSpec (delete event handling):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable tombstone handling |
| afterDelete | bool | true | Emit tombstone after delete |
| behavior | string | - | emit_null or emit_with_key |
FieldEncryptionSpec (field-level encryption):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable field encryption |
| keySecretRef | string | - | Secret containing encryption key |
| fields | []string | [] | Fields to encrypt |
| algorithm | string | aes-256-gcm | Encryption algorithm |
ReadOnlyReplicaSpec (PostgreSQL read replica support):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable read replica support |
| lagThresholdMs | int | 5000 | Max acceptable replication lag |
| deduplicate | bool | true | Deduplicate events across replicas |
| watermarkSource | string | - | primary or replica |
EventRouterSpec (dynamic event routing):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable event routing |
| defaultDestination | string | - | Default destination topic |
| deadLetterQueue | string | - | DLQ for unroutable events |
| dropUnroutable | bool | false | Drop events with no route |
| rules | []RouteRuleSpec | [] | Routing rules |
RouteRuleSpec (routing rule definition):
| Field | Type | Default | Description |
|---|---|---|---|
| conditionType | string | required | always, table, table_pattern, schema, operation, field_equals, field_exists |
| conditionValue | string | - | Condition value/pattern |
| destination | string | required | Destination topic |
| priority | int | 0 | Rule priority (higher = first) |
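Because higher priority values are evaluated first, the order of rules in the manifest does not decide which rule wins. The sketch below illustrates this with the rules from the advanced CDC example. It handles only the always, table, and operation condition types, and the event shape (a dict with table and operation keys) is an assumption made for illustration.

```python
def route(event: dict, rules: list[dict], default_destination: str) -> str:
    """Return the destination topic for an event.

    Rules are tried from highest to lowest priority and the first match wins.
    Only 'always', 'table', and 'operation' conditions are sketched here.
    """
    ordered = sorted(rules, key=lambda rule: rule.get("priority", 0), reverse=True)
    for rule in ordered:
        kind = rule["conditionType"]
        value = rule.get("conditionValue")
        if (
            kind == "always"
            or (kind == "table" and event.get("table") == value)
            or (kind == "operation" and event.get("operation") == value)
        ):
            return rule["destination"]
    return default_destination


rules = [
    {"conditionType": "operation", "conditionValue": "DELETE", "destination": "cdc.deletes", "priority": 5},
    {"conditionType": "table", "conditionValue": "orders", "destination": "cdc.orders", "priority": 10},
]

# A delete on 'orders' matches both rules; the priority 10 rule wins.
print(route({"table": "orders", "operation": "DELETE"}, rules, "cdc.default"))     # cdc.orders
print(route({"table": "customers", "operation": "DELETE"}, rules, "cdc.default"))  # cdc.deletes
print(route({"table": "customers", "operation": "INSERT"}, rules, "cdc.default"))  # cdc.default
```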
PartitionerSpec (custom partitioning):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable custom partitioning |
| numPartitions | int | 16 | Number of partitions (1-1000) |
| strategy | string | key_hash | round_robin, key_hash, table_hash, full_table_hash, sticky |
| keyColumns | []string | [] | Columns for key-based partitioning |
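The key_hash strategy keeps all events for the same key on one partition, which preserves per-key ordering. A minimal sketch of the idea is shown below. The hash function (CRC32) and the way key columns are joined are assumptions chosen for a deterministic example, and the partitions Rivven actually assigns may differ.

```python
import zlib


def key_hash_partition(row: dict, key_columns: list[str], num_partitions: int) -> int:
    """Map a row to a partition by hashing its key columns.

    CRC32 and the '|' separator are illustrative choices only; the
    connector's real hash function is not documented here.
    """
    key = "|".join(str(row[column]) for column in key_columns)
    return zlib.crc32(key.encode("utf-8")) % num_partitions


first = {"order_id": 42, "status": "created"}
second = {"order_id": 42, "status": "shipped"}

# Both events share order_id, so they land on the same partition.
print(key_hash_partition(first, ["order_id"], 32) == key_hash_partition(second, ["order_id"], 32))  # True
```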
SmtTransformSpec (Single Message Transform):
| Field | Type | Default | Description |
|---|---|---|---|
| type | string | required | Transform type (see below) |
| config | object | {} | Transform-specific configuration |
Supported transform types:
- extract_new_record_state - Extract after state from change events
- mask_field - Mask sensitive fields
- filter - Filter events by condition
- flatten - Flatten nested structures
- cast - Cast field types
- insert_field - Add static fields
- replace_field - Replace field values
- value_to_key - Promote value fields to key
- regex_router - Route by regex pattern
- content_router - Route by content
- timestamp_router - Route by timestamp
- header_from - Copy value to header
- drop_headers - Remove headers
- message_timestamp - Set message timestamp
- unwrap_envelope - Unwrap CDC envelope
- convert_timezone - Convert timestamps
- drop_null - Drop null fields
ParallelCdcSpec (parallel processing):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable parallel processing |
| concurrency | int | 4 | Worker thread count (1-64) |
| perTableBuffer | int | 1000 | Buffer per table (100-100000) |
| outputBuffer | int | 10000 | Output buffer size |
| workStealing | bool | true | Enable work stealing |
| perTableRateLimit | int | - | Events/sec per table |
| shutdownTimeoutSecs | int | 30 | Graceful shutdown timeout (seconds) |
OutboxSpec (transactional outbox pattern):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable outbox pattern |
| tableName | string | outbox | Outbox table name |
| pollIntervalMs | int | 100 | Poll interval (10-60000 ms) |
| batchSize | int | 100 | Batch size (1-10000) |
| maxRetries | int | 3 | Max delivery retries |
| deliveryTimeoutSecs | int | 30 | Delivery timeout (seconds) |
| orderedDelivery | bool | true | Maintain message order |
| retentionSecs | int | 86400 | Message retention (1 hour to 30 days) |
| maxConcurrency | int | 10 | Max concurrent deliveries |
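As a sketch, an OutboxSpec that polls less often, uses larger batches, and keeps messages for a week could look like this. The table name is hypothetical, and fields not listed keep their defaults.

```yaml
# OutboxSpec sketch; the table name is an assumption.
enabled: true
tableName: outbox_events   # hypothetical table name
pollIntervalMs: 250        # within 10-60000 ms
batchSize: 500             # within 1-10000
maxRetries: 5
orderedDelivery: true
retentionSecs: 604800      # 7 days (7 * 86400)
```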
HealthMonitorSpec (connector health monitoring):
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | bool | false | Enable health monitoring |
| checkIntervalSecs | int | 10 | Health check interval (1-3600 s) |
| maxLagMs | int | 30000 | Max acceptable lag (ms) |
| failureThreshold | int | 3 | Failures before unhealthy |
| successThreshold | int | 2 | Successes before healthy |
| checkTimeoutSecs | int | 5 | Health check timeout (seconds) |
| autoRecovery | bool | true | Auto-recovery on failure |
| recoveryDelaySecs | int | 1 | Initial recovery delay (seconds) |
| maxRecoveryDelaySecs | int | 60 | Max recovery delay (exponential backoff) |
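A HealthMonitorSpec sketch with a slower check interval and a longer backoff ceiling is shown below. Assuming the recovery delay doubles on each attempt (the table only states that backoff is exponential), the delays here would run 2, 4, 8, 16, 32, 64 seconds and then stay capped at 120.

```yaml
# HealthMonitorSpec sketch; values are illustrative.
enabled: true
checkIntervalSecs: 15      # within 1-3600 s
maxLagMs: 60000
failureThreshold: 3        # failures before unhealthy
successThreshold: 2        # successes before healthy
autoRecovery: true
recoveryDelaySecs: 2       # initial recovery delay
maxRecoveryDelaySecs: 120  # upper bound for the backoff
```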
RivvenSchemaRegistry
Manages a high-performance Schema Registry for schema validation and evolution.
| Field | Type | Default | Description |
|---|---|---|---|
| clusterRef.name | string | required | RivvenCluster to connect to |
| clusterRef.namespace | string | same namespace | Cluster namespace |
| replicas | int | 1 | Number of registry replicas |
| version | string | latest | Registry image version |
| server.port | int | 8081 | HTTP server port |
| server.bindAddress | string | 0.0.0.0 | Bind address |
| server.requestTimeoutMs | int | 30000 | Request timeout (ms) |
| server.corsEnabled | bool | false | Enable CORS |
| storage.mode | string | broker | Storage: memory or broker |
| storage.topic | string | _schemas | Schema storage topic |
| storage.replicationFactor | int | 1 | Topic replication factor |
| compatibility.defaultLevel | string | BACKWARD | Default compatibility level |
| compatibility.perSubject | map[string]string | {} | Per-subject compatibility overrides |
| formats.avro | bool | true | Enable Avro schemas |
| formats.jsonSchema | bool | true | Enable JSON Schema |
| formats.protobuf | bool | true | Enable Protobuf schemas |
| auth.enabled | bool | false | Enable authentication |
| auth.method | string | basic | Auth method: basic, jwt, cedar |
| tls.enabled | bool | false | Enable TLS |
| tls.certSecretName | string | - | TLS certificate secret |
| tls.mtlsEnabled | bool | false | Enable mutual TLS |
| metrics.enabled | bool | true | Enable Prometheus metrics |
| metrics.port | int | 9090 | Metrics port |
| metrics.serviceMonitorEnabled | bool | false | Create ServiceMonitor |
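Putting these fields together, a RivvenSchemaRegistry manifest might look like the sketch below. It assumes the resource uses the same apiVersion as the other kinds in this README and that each dotted field name maps to a nested key under spec. The resource name and the subject name are hypothetical. The cluster and TLS secret names are reused from the Quick Start example.

```yaml
# Sketch only; apiVersion and nesting are assumptions (see above).
apiVersion: rivven.hupe1980.github.io/v1alpha1
kind: RivvenSchemaRegistry
metadata:
  name: schema-registry        # hypothetical name
  namespace: default
spec:
  clusterRef:
    name: production           # RivvenCluster from the Quick Start
  replicas: 2
  server:
    port: 8081
  storage:
    mode: broker
    topic: _schemas
    replicationFactor: 2
  compatibility:
    defaultLevel: BACKWARD
    perSubject:
      orders-value: FULL       # hypothetical subject override
  formats:
    avro: true
    jsonSchema: true
    protobuf: false
  tls:
    enabled: true
    certSecretName: rivven-tls
  metrics:
    enabled: true
    port: 9090
```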
Schema Compatibility Levels
| Level | Description |
|---|---|
| NONE | No compatibility checking |
| BACKWARD | New schema can read data written with the previous schema |
| BACKWARD_TRANSITIVE | New schema can read data written with all previous schemas |
| FORWARD | Previous schema can read data written with the new schema |
| FORWARD_TRANSITIVE | All previous schemas can read data written with the new schema |
| FULL | Backward and forward compatible with the previous schema |
| FULL_TRANSITIVE | Backward and forward compatible with all previous schemas |
Authentication Methods
Basic Auth (auth.method: basic):
auth:
  enabled: true
  method: basic
  users:
    - username: admin
      passwordSecretKey: admin-password
      role: admin
    - username: readonly
      passwordSecretKey: ro-password
      role: reader
JWT/OIDC Auth (auth.method: jwt):
auth:
  enabled: true
  method: jwt
  jwt:
    issuerUrl: "https://auth.example.com"
    jwksUrl: "https://auth.example.com/.well-known/jwks.json"
    audience: "schema-registry"
    usernameClaim: "sub"
    rolesClaim: "groups"
Cedar Policy Auth (auth.method: cedar):
auth:
  enabled: true
  method: cedar
  cedar:
    policySecretRef: cedar-policies
External Registry Sync
Sync schemas with external registries:
externalRegistry:
  enabled: true
  registryType: compatible   # or "glue"
  registryUrl: "https://external-sr.example.com"
  syncMode: mirror           # "mirror", "push", "bidirectional"
  syncSubjects:
    - "orders-*"
    - "users-*"
  syncIntervalSeconds: 300
  credentialsSecretRef: external-creds
Configuration
Operator Flags
| Flag | Env Var | Default | Description |
|---|---|---|---|
| --metrics-addr | METRICS_ADDR | 0.0.0.0:8080 | Metrics server address |
| --health-addr | HEALTH_ADDR | 0.0.0.0:8081 | Health probe address |
| --leader-election | LEADER_ELECTION | true | Enable leader election |
| --namespace | WATCH_NAMESPACE | "" | Namespace to watch (empty = all) |
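When the operator runs as a Deployment, these settings can be supplied through the environment variables listed above. The fragment below is a sketch of the env section of the operator container. The namespace value is hypothetical, and the rest of the container spec is omitted.

```yaml
# Fragment of the operator container spec; values are illustrative.
env:
  - name: WATCH_NAMESPACE
    value: rivven-system     # hypothetical; empty string watches all namespaces
  - name: METRICS_ADDR
    value: "0.0.0.0:8080"
  - name: HEALTH_ADDR
    value: "0.0.0.0:8081"
  - name: LEADER_ELECTION
    value: "true"
```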
Supported Connectors
Built-in Sources (rivven-connect core)
- postgres-cdc - PostgreSQL Change Data Capture (logical replication)
- mysql-cdc - MySQL Change Data Capture (binlog replication)
- http - HTTP/Webhook source
- datagen - Data generator for testing
Queue Sources (rivven-queue crate)
- external-source - External message queue consumer (for migrations)
- mqtt-source - MQTT broker subscriber (IoT data ingestion)
- sqs-source - AWS SQS queue consumer
- pubsub-source - Google Cloud Pub/Sub subscriber
Built-in Sinks (rivven-connect core)
- stdout - Standard output (debugging)
- http-webhook - HTTP webhooks
Queue Sinks (rivven-queue crate)
- external-sink - External message queue producer (hybrid deployments)
Storage Sinks (rivven-storage crate)
- s3 - Amazon S3 / MinIO object storage
- gcs - Google Cloud Storage
- azure-blob - Azure Blob Storage
Warehouse Sinks (rivven-warehouse crate)
- snowflake - Snowflake Data Warehouse (Snowpipe Streaming)
- bigquery - Google BigQuery
- redshift - Amazon Redshift
Development
Build
Run Locally (against kind/minikube)
# Install CRDs
# Run operator locally
Run Tests
Generate CRD YAML
Documentation
License
Apache-2.0. See LICENSE.