Rivven Kubernetes Operator
Production-grade Kubernetes operator for deploying and managing Rivven clusters and connectors.
Features
- Custom Resource Definitions:
  - RivvenCluster for declarative cluster management
  - RivvenConnect for declarative connector pipeline management
  - RivvenTopic for declarative topic management (GitOps-friendly)
- Automated Reconciliation: Continuous state management with eventual consistency
- StatefulSet Management: Ordered deployment, scaling, and rolling updates
- Service Discovery: Automatic headless service for broker discovery
- Configuration Management: ConfigMaps for broker and connector configuration
- Secret Management: Secure credential handling for sources and sinks
- Metrics: Prometheus-compatible operator metrics
- Finalizers: Clean resource cleanup on deletion
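Automated reconciliation comes down to repeatedly comparing the state declared in a custom resource with what actually exists, then acting on the difference. The Python sketch below illustrates the idea under several assumptions:

- `reconcile` and `Action` are hypothetical names, not the operator's API.
- Resources are reduced to plain dictionaries keyed by name.
- `observed` holds only objects the operator owns, so extra entries are safe to delete.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Action:
    verb: str  # "create", "update" or "delete"
    name: str


def reconcile(desired: dict, observed: dict) -> list:
    """Return the actions that move `observed` toward `desired`.

    Both arguments map resource names to specs. `observed` is assumed to hold
    only resources owned by the operator, so entries missing from `desired`
    are scheduled for deletion.
    """
    actions = []
    for name, spec in sorted(desired.items()):
        if name not in observed:
            actions.append(Action("create", name))
        elif observed[name] != spec:
            actions.append(Action("update", name))
    for name in sorted(observed):
        if name not in desired:
            actions.append(Action("delete", name))
    return actions


desired = {"rivven-production": {"replicas": 3}, "rivven-production-config": {"retention": "168h"}}
observed = {"rivven-production": {"replicas": 2}, "rivven-stale": {}}
print(reconcile(desired, observed))
```

The diff is recomputed from scratch each time. Once the actions have been applied, running it again returns an empty list. That idempotence lets a controller rerun the loop on every watch event or periodic resync and still converge.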
Quick Start
Prerequisites
- Kubernetes 1.28+
- kubectl configured for your cluster
- Helm 3.x (optional, for Helm-based deployment)
Install CRDs
Deploy Operator
# Using kubectl
# Or using Helm
Create a Rivven Cluster
```yaml
apiVersion: rivven.io/v1alpha1
kind: RivvenCluster
metadata:
  name: production
  namespace: default
spec:
  replicas: 3
  version: "0.0.1"
  storage:
    size: 100Gi
    storageClassName: fast-ssd
  resources:
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "4"
      memory: 8Gi
  config:
    defaultPartitions: 3
    defaultReplicationFactor: 2
    logRetentionHours: 168
  tls:
    enabled: true
    certSecretName: rivven-tls
  metrics:
    enabled: true
    port: 9090
```
Create a RivvenConnect Instance
```yaml
apiVersion: rivven.io/v1alpha1
kind: RivvenConnect
metadata:
  name: cdc-pipeline
  namespace: default
spec:
  clusterRef:
    name: production
  replicas: 2
  version: "0.0.1"

  # PostgreSQL CDC source with TYPED config
  sources:
    - name: postgres-cdc
      connector: postgres-cdc
      topic: cdc.events
      enabled: true
      configSecretRef: postgres-credentials # Secret with host/port/user/pass
      # Typed PostgreSQL CDC configuration (validated by CRD)
      postgresCdc:
        slotName: rivven_cdc_slot
        publication: rivven_pub
        snapshotMode: initial
        decodingPlugin: pgoutput
        heartbeatIntervalMs: 10000
        # Table selection with column filtering (inside CDC config)
        tables:
          - schema: public
            table: orders
            columns:
            excludeColumns:
          - schema: public
            table: customers
            columnMasks:
              email: "***@***.***"
              phone: "***-***-****"
      topicConfig:
        partitions: 6
        replicationFactor: 2

  # S3 sink with TYPED config
  sinks:
    - name: s3-archive
      connector: s3
      topics:
        - "cdc.*"
      consumerGroup: s3-archiver
      enabled: true
      startOffset: earliest
      configSecretRef: s3-credentials # Secret with AWS keys
      # Typed S3 configuration (validated by CRD)
      s3:
        bucket: my-data-lake
        region: us-east-1
        prefix: cdc/events
        format: parquet
        compression: zstd
        batchSize: 10000
        flushIntervalSeconds: 60
      rateLimit:
        eventsPerSecond: 10000
        burstCapacity: 1000

    # Debug stdout sink with TYPED config
    - name: debug-output
      connector: stdout
      topics:
        - "cdc.events"
      consumerGroup: debug
      enabled: true
      stdout:
        format: json
        pretty: true
        includeMetadata: true

    # Custom connector with GENERIC config
    - name: custom-webhook
      connector: my-custom-sink
      topics:
        - "cdc.events"
      consumerGroup: custom
      enabled: true
      # Generic config for custom connectors
      config:
        customField: value
        nested:
          option: true

  settings:
    topic:
      autoCreate: true
      defaultPartitions: 3
      defaultReplicationFactor: 2
    retry:
      maxRetries: 10
      initialBackoffMs: 100
      maxBackoffMs: 30000
    health:
      enabled: true
      port: 8080
    metrics:
      enabled: true
      port: 9091

  tls:
    enabled: true
    certSecretName: rivven-tls
```
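The s3-archive sink above subscribes with the pattern "cdc.*". This README does not specify the wildcard syntax, so the sketch below assumes shell-style glob matching (Python's `fnmatch`), in which `*` matches any run of characters, dots included. `matching_topics` is a hypothetical helper for previewing which existing topics a sink would pick up.

```python
from fnmatch import fnmatchcase


def matching_topics(patterns: list, topics: list) -> list:
    """Return the topics matched by any of the sink's patterns (glob semantics assumed)."""
    return [t for t in topics if any(fnmatchcase(t, p) for p in patterns)]


existing = ["cdc.events", "cdc.orders", "metrics.raw"]
print(matching_topics(["cdc.*"], existing))       # s3-archive sink
print(matching_topics(["cdc.events"], existing))  # debug-output sink
```

If Rivven actually uses regular expressions, the result changes. For example, a literal dot would need escaping.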
Create a RivvenTopic
Manage topics declaratively for GitOps workflows:
```yaml
apiVersion: rivven.io/v1alpha1
kind: RivvenTopic
metadata:
  name: orders-events
  namespace: default
spec:
  clusterRef:
    name: production
  partitions: 12
  replicationFactor: 3
  config:
    # 7 days retention
    retentionMs: 604800000
    cleanupPolicy: delete
    compressionType: lz4
    minInsyncReplicas: 2
  # Access control
  acls:
    - principal: "user:order-service"
      operations:
    - principal: "user:analytics"
      operations:
  # Keep the topic in the cluster when this RivvenTopic resource is deleted
  deleteOnRemove: false
```
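Two settings in this example are easy to get wrong:

- `retentionMs` is expressed in milliseconds.
- `minInsyncReplicas` only makes sense relative to `replicationFactor`.

The sketch below computes the 7-day value used above. It also computes how many broker failures the topic tolerates for fully acknowledged writes, assuming Kafka-style semantics: such writes fail once fewer than `minInsyncReplicas` replicas are in sync. The helper names are hypothetical.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000


def retention_ms(days: int) -> int:
    """Convert a retention period in days to milliseconds."""
    return days * MS_PER_DAY


def tolerated_failures(replication_factor: int, min_insync_replicas: int) -> int:
    """Replicas that can be unavailable while acks=all-style writes still succeed."""
    if not 1 <= min_insync_replicas <= replication_factor:
        raise ValueError("minInsyncReplicas must be between 1 and replicationFactor")
    return replication_factor - min_insync_replicas


print(retention_ms(7))           # value for retentionMs
print(tolerated_failures(3, 2))  # orders-events topic
```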
# Check topic status
Check Status
# Check cluster status
# Check connect status
# Check topic status
Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│                         RIVVEN OPERATOR                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐     ┌─────────────────────────────────┐    │
│  │   Controller    │────►│         Kubernetes API          │    │
│  │                 │     │                                 │    │
│  │ • Watch CRDs    │◄────│ • RivvenCluster events          │    │
│  │ • Reconcile     │     │ • RivvenConnect events          │    │
│  │ • Update status │     │ • StatefulSet/Deployment status │    │
│  └─────────────────┘     └─────────────────────────────────┘    │
│           │                                                     │
│           ▼                                                     │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Managed Resources (RivvenCluster)                      │    │
│  │                                                         │    │
│  │  • StatefulSet (rivven-{name})                          │    │
│  │  • Headless Service (rivven-{name}-headless)            │    │
│  │  • Client Service (rivven-{name})                       │    │
│  │  • ConfigMap (rivven-{name}-config)                     │    │
│  │  • PodDisruptionBudget (rivven-{name}-pdb)              │    │
│  └─────────────────────────────────────────────────────────┘    │
│           │                                                     │
│           ▼                                                     │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │  Managed Resources (RivvenConnect)                      │    │
│  │                                                         │    │
│  │  • Deployment (rivven-connect-{name})                   │    │
│  │  • ConfigMap (rivven-connect-{name}-config)             │    │
│  │  • Service (rivven-connect-{name})                      │    │
│  │  • ServiceMonitor (optional, for Prometheus)            │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```
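The object names in the diagram follow a fixed pattern derived from the resource name. The sketch below (hypothetical helper names) spells out that pattern. It also derives the stable per-broker DNS names that Kubernetes gives StatefulSet pods, `<pod>.<service>.<namespace>.svc.<cluster-domain>`. This assumes two things: the StatefulSet's `serviceName` points at the `rivven-{name}-headless` Service, and the cluster domain is the common default, `cluster.local`.

```python
def cluster_resource_names(name: str) -> dict:
    """Objects created for a RivvenCluster, per the architecture diagram."""
    return {
        "statefulset": f"rivven-{name}",
        "headless_service": f"rivven-{name}-headless",
        "client_service": f"rivven-{name}",
        "configmap": f"rivven-{name}-config",
        "pdb": f"rivven-{name}-pdb",
    }


def broker_dns_names(name: str, namespace: str, replicas: int,
                     cluster_domain: str = "cluster.local") -> list:
    """Per-pod DNS names; StatefulSet pods are named <statefulset>-<ordinal>."""
    names = cluster_resource_names(name)
    sts, svc = names["statefulset"], names["headless_service"]
    return [f"{sts}-{i}.{svc}.{namespace}.svc.{cluster_domain}" for i in range(replicas)]


for host in broker_dns_names("production", "default", 3):
    print(host)
```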
Custom Resource Definitions
RivvenCluster
Manages Rivven broker clusters with StatefulSets for data persistence.
| Field | Type | Default | Description |
|---|---|---|---|
| replicas | int | 3 | Number of broker replicas |
| version | string | latest | Rivven image version |
| image | string | ghcr.io/hupe1980/rivven | Container image |
| storage.size | string | 10Gi | PVC size per broker |
| storage.storageClassName | string | "" | Storage class |
| resources | ResourceRequirements | - | CPU/memory requests/limits |
| config | BrokerConfig | - | Broker configuration |
| tls.enabled | bool | false | Enable TLS |
| metrics.enabled | bool | true | Enable Prometheus metrics |
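Fields omitted from a RivvenCluster spec take the defaults listed in the table above. As an illustration only, the sketch below merges a user spec over those defaults. Two details are our assumptions, not documented operator behavior: the recursive merge, and the image tag being taken from `version`. `with_defaults` and `image_ref` are hypothetical helpers.

```python
import copy

# Defaults copied from the RivvenCluster table.
CLUSTER_DEFAULTS = {
    "replicas": 3,
    "version": "latest",
    "image": "ghcr.io/hupe1980/rivven",
    "storage": {"size": "10Gi", "storageClassName": ""},
    "tls": {"enabled": False},
    "metrics": {"enabled": True},
}


def with_defaults(spec: dict, defaults: dict = CLUSTER_DEFAULTS) -> dict:
    """Recursively fill fields missing from `spec` with the documented defaults."""
    merged = copy.deepcopy(defaults)  # never mutate the shared defaults
    for key, value in spec.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = with_defaults(value, merged[key])
        else:
            merged[key] = value
    return merged


def image_ref(spec: dict) -> str:
    """Hypothetical: assumes the container tag is taken from spec.version."""
    full = with_defaults(spec)
    return f"{full['image']}:{full['version']}"


print(image_ref({"version": "0.0.1"}))
```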
RivvenConnect
Manages Rivven Connect instances for data pipelines.
| Field | Type | Default | Description |
|---|---|---|---|
| clusterRef.name | string | required | RivvenCluster to connect to |
| clusterRef.namespace | string | same namespace | Cluster namespace |
| replicas | int | 1 | Number of connect workers |
| version | string | latest | Connect image version |
| sources | []SourceConnectorSpec | [] | Source connectors |
| sinks | []SinkConnectorSpec | [] | Sink connectors |
| settings.topic.autoCreate | bool | true | Auto-create topics |
| settings.retry.maxRetries | int | 10 | Max retry attempts |
| tls.enabled | bool | false | Enable TLS for broker |
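`settings.retry` controls how connectors retry failed operations. The README does not state the backoff curve. This sketch assumes capped exponential backoff with no jitter: the delay doubles from `initialBackoffMs` until it reaches `maxBackoffMs`. The `multiplier` parameter is hypothetical. With the values from the RivvenConnect example, the sketch lists the delay before each of the 10 retries.

```python
def backoff_schedule(max_retries: int = 10, initial_backoff_ms: int = 100,
                     max_backoff_ms: int = 30_000, multiplier: int = 2) -> list:
    """Delay in ms before each retry, assuming capped exponential backoff."""
    delays = []
    delay = initial_backoff_ms
    for _ in range(max_retries):
        delays.append(min(delay, max_backoff_ms))
        delay *= multiplier
    return delays


schedule = backoff_schedule()
print(schedule)
print(f"total wait before giving up: {sum(schedule) / 1000:.1f} s")
```

Production retry loops usually add random jitter as well, so that many workers failing at the same moment do not retry in lockstep.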
Source Connector Spec
| Field | Type | Default | Description |
|---|---|---|---|
| name | string | required | Unique connector name |
| connector | string | required | Connector type (e.g., postgres-cdc, mysql-cdc, http, datagen) |
| topic | string | required | Target topic |
| topicRouting | string | - | Topic pattern with placeholders |
| enabled | bool | true | Enable/disable connector |
| postgresCdc | PostgresCdcConfig | - | Typed PostgreSQL CDC config |
| mysqlCdc | MysqlCdcConfig | - | Typed MySQL CDC config |
| http | HttpSourceConfig | - | Typed HTTP source config |
| datagen | DatagenConfig | - | Typed Datagen config |
| config | object | {} | Generic config (for custom connectors) |
| configSecretRef | string | - | Secret for sensitive config |
| topicConfig | SourceTopicConfigSpec | - | Topic auto-creation settings |
Note: CDC connectors (postgres-cdc, mysql-cdc) define their tables field inside the typed config (e.g., postgresCdc.tables), not at the connector level.
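Each built-in source type has its own typed block, while custom connectors fall back to the generic `config` map. The sketch below shows one plausible way to resolve and sanity-check that choice. It is an illustration, not the CRD's actual validation logic. The resolution order is an assumption, and so is the hypothetical `TYPED_SOURCE_FIELDS` mapping, which covers only the four typed fields in the Source Connector Spec table.

```python
# Typed config field for each built-in source type (from the Source Connector Spec table).
TYPED_SOURCE_FIELDS = {
    "postgres-cdc": "postgresCdc",
    "mysql-cdc": "mysqlCdc",
    "http": "http",
    "datagen": "datagen",
}


def connector_config(source: dict) -> dict:
    """Return the config block a source would use: its typed block if present,
    otherwise the generic `config` map (hypothetical resolution order)."""
    typed_field = TYPED_SOURCE_FIELDS.get(source["connector"])
    # Reject typed blocks that belong to a different connector type.
    for field in TYPED_SOURCE_FIELDS.values():
        if field in source and field != typed_field:
            raise ValueError(f"{field} is not valid for connector {source['connector']!r}")
    if typed_field and typed_field in source:
        return source[typed_field]
    return source.get("config", {})


print(connector_config({"connector": "postgres-cdc", "postgresCdc": {"slotName": "rivven_cdc_slot"}}))
```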
Sink Connector Spec
| Field | Type | Default | Description |
|---|---|---|---|
| name | string | required | Unique connector name |
| connector | string | required | Connector type (e.g., stdout, s3, http, elasticsearch) |
| topics | []string | required | Topics to consume (supports wildcards) |
| consumerGroup | string | required | Consumer group for offsets |
| enabled | bool | true | Enable/disable connector |
| startOffset | string | latest | Starting offset (earliest, latest, timestamp) |
| s3 | S3SinkConfig | - | Typed S3 sink config |
| http | HttpSinkConfig | - | Typed HTTP sink config |
| stdout | StdoutSinkConfig | - | Typed stdout sink config |
| config | object | {} | Generic config (for custom connectors) |
| configSecretRef | string | - | Secret for sensitive config |
| rateLimit.eventsPerSecond | int | 0 | Rate limit (0 = unlimited) |
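The S3 sink example pairs `rateLimit.eventsPerSecond` with a `burstCapacity`. A token bucket is a common way to implement that pair, and the sketch below assumes that interpretation: tokens refill at `eventsPerSecond` up to `burstCapacity`, and each event consumes one. The `TokenBucket` class is hypothetical. The injectable clock only makes the behavior deterministic to test.

```python
import time


class TokenBucket:
    """Allow bursts of up to `capacity` events, refilled at `rate` events/s.

    rate == 0 disables limiting, matching "0 = unlimited" in the table.
    """

    def __init__(self, rate: float, capacity: float, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = capacity
        self.last = clock()

    def try_acquire(self, n: int = 1) -> bool:
        if self.rate == 0:
            return True
        now = self.clock()
        # Refill for the elapsed time, never beyond the burst capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


fake_now = [0.0]
bucket = TokenBucket(rate=10_000, capacity=1_000, clock=lambda: fake_now[0])
print(sum(bucket.try_acquire() for _ in range(1_500)))  # burst at t=0
```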
Typed Source Configs
PostgresCdcConfig (for connector: postgres-cdc):
| Field | Type | Default | Description |
|---|---|---|---|
| slotName | string | auto | Replication slot name |
| publication | string | auto | PostgreSQL publication name |
| snapshotMode | string | initial | initial, never, when_needed, exported |
| decodingPlugin | string | pgoutput | pgoutput, wal2json, decoderbufs |
| includeTransactionMetadata | bool | false | Include transaction info |
| heartbeatIntervalMs | int | 0 | Heartbeat interval (0 = disabled) |
| signalTable | string | - | Signal table for runtime control |
| tables | []TableSpec | [] | Tables to capture from PostgreSQL |
MysqlCdcConfig (for connector: mysql-cdc):
| Field | Type | Default | Description |
|---|---|---|---|
| serverId | int | auto | Unique server ID for binlog replication |
| snapshotMode | string | initial | initial, never, when_needed, schema_only |
| includeGtid | bool | false | Include GTID in events |
| heartbeatIntervalMs | int | 0 | Heartbeat interval |
| databaseHistoryTopic | string | auto | Topic for schema changes |
| tables | []TableSpec | [] | Tables to capture from MySQL |
DatagenConfig (for connector: datagen):
| Field | Type | Default | Description |
|---|---|---|---|
| eventsPerSecond | int | 1 | Events per second to generate |
| maxEvents | int | 0 | Total events (0 = unlimited) |
| schemaType | string | json | Output schema type |
| seed | int | - | Random seed for reproducibility |
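`seed` and `maxEvents` decide whether a datagen run is reproducible and whether it ends. The sketch below is a hypothetical stand-in for a datagen source, not Rivven's generator:

- The same seed produces the same sequence.
- `maxEvents: 0` yields an endless stream.
- Rate pacing (`eventsPerSecond`) is deliberately left out.
- The event shape is invented.

```python
import itertools
import json
import random
from typing import Iterator, Optional


def datagen(seed: Optional[int] = None, max_events: int = 0) -> Iterator[str]:
    """Yield synthetic JSON events; max_events == 0 means unlimited.

    A fixed seed makes the sequence reproducible. The event shape is
    hypothetical, and pacing to eventsPerSecond is left to the caller.
    """
    rng = random.Random(seed)
    ids = itertools.count() if max_events == 0 else range(max_events)
    for i in ids:
        yield json.dumps({"id": i, "value": rng.randint(0, 1000)})


first = list(datagen(seed=42, max_events=3))
again = list(datagen(seed=42, max_events=3))
print(first == again)
```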
KafkaSourceConfig (for connector: kafka-source; provided by the rivven-queue crate):
| Field | Type | Default | Description |
|---|---|---|---|
| brokers | []string | required | Kafka broker addresses |
| topic | string | required | Kafka topic to consume from |
| consumerGroup | string | required | Consumer group ID |
| startOffset | string | latest | earliest, latest |
| securityProtocol | string | plaintext | plaintext, ssl, sasl_plaintext, sasl_ssl |
| saslMechanism | string | - | plain, scram-sha-256, scram-sha-512 |
| saslUsername | string | - | SASL username (use secret for password) |
MqttSourceConfig (for connector: mqtt-source; provided by the rivven-queue crate):
| Field | Type | Default | Description |
|---|---|---|---|
| brokerUrl | string | required | MQTT broker URL (e.g., mqtt://broker:1883) |
| topics | []string | required | MQTT topics (supports wildcards: +, #) |
| clientId | string | - | MQTT client ID |
| qos | string | at_least_once | at_most_once, at_least_once, exactly_once |
| cleanSession | bool | true | Clean session on connect |
| mqttVersion | string | v311 | v3, v311, v5 |
| username | string | - | Auth username (use secret for password) |
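MQTT topic filters use `+` for exactly one topic level and `#` for all remaining levels. The function below sketches those standard MQTT matching rules, including the rule that filters starting with a wildcard do not match `$`-prefixed system topics. It is useful for checking what a `topics` entry will subscribe to. It is not taken from the Rivven code base.

```python
def mqtt_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a topic filter.

    '+' matches exactly one level; '#' must be the last level and matches the
    parent level plus any number of child levels.
    """
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    # Filters starting with a wildcard must not match "$"-prefixed topics ($SYS/...).
    if topic.startswith("$") and f_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(f_levels):
        if level == "#":
            return i == len(f_levels) - 1  # '#' anywhere else is invalid
        if i >= len(t_levels):
            return False
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


print(mqtt_matches("sensors/+/temperature", "sensors/room1/temperature"))
```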
SqsSourceConfig (for connector: sqs-source; provided by the rivven-queue crate):
| Field | Type | Default | Description |
|---|---|---|---|
| queueUrl | string | required | SQS queue URL |
| region | string | required | AWS region (e.g., us-east-1) |
| maxMessages | int | 10 | Max messages per poll (1-10) |
| waitTimeSeconds | int | 20 | Long polling wait time |
| visibilityTimeout | int | 30 | Visibility timeout in seconds |
| awsProfile | string | - | AWS profile name |
| roleArn | string | - | IAM role ARN to assume |
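Several SqsSourceConfig values are bounded by the SQS ReceiveMessage API itself:

- At most 10 messages per call.
- A long-polling wait of 0 to 20 seconds.
- A visibility timeout of up to 12 hours.

The hypothetical `validate_sqs` helper below fills in the defaults from the table above and checks those ranges. This README does not say whether the operator rejects bad values at admission time or the connector fails at runtime.

```python
# Bounds enforced by the SQS ReceiveMessage API.
SQS_LIMITS = {
    "maxMessages": (1, 10),
    "waitTimeSeconds": (0, 20),
    "visibilityTimeout": (0, 43_200),  # 12 hours
}
# Defaults from the SqsSourceConfig table.
SQS_DEFAULTS = {"maxMessages": 10, "waitTimeSeconds": 20, "visibilityTimeout": 30}


def validate_sqs(cfg: dict) -> dict:
    """Return cfg with defaults applied, or raise ValueError on invalid input."""
    for field in ("queueUrl", "region"):
        if not cfg.get(field):
            raise ValueError(f"{field} is required")
    merged = {**SQS_DEFAULTS, **cfg}
    for key, (low, high) in SQS_LIMITS.items():
        if not low <= merged[key] <= high:
            raise ValueError(f"{key}={merged[key]} is outside [{low}, {high}]")
    return merged


# Placeholder queue URL for illustration.
print(validate_sqs({"queueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/orders",
                    "region": "us-east-1"}))
```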
PubSubSourceConfig (for connector: pubsub-source; provided by the rivven-queue crate):
| Field | Type | Default | Description |
|---|---|---|---|
| projectId | string | required | GCP project ID |
| subscription | string | required | Pub/Sub subscription name |
| topic | string | - | Topic name (for auto-created subscriptions) |
| maxMessages | int | 100 | Max messages per pull (1-1000) |
| ackDeadlineSeconds | int | 30 | Ack deadline (10-600 seconds) |
| credentialsFile | string | - | Path to service account credentials |
| useAdc | bool | true | Use Application Default Credentials |
Typed Sink Configs
S3SinkConfig (for connector: s3; provided by the rivven-storage crate):
| Field | Type | Default | Description |
|---|---|---|---|
| bucket | string | required | S3 bucket name |
| region | string | - | AWS region |
| endpointUrl | string | - | Custom endpoint (MinIO, etc.) |
| prefix | string | - | Object key prefix |
| format | string | json | json, jsonl, parquet, avro |
| compression | string | none | none, gzip, snappy, lz4, zstd |
| batchSize | int | 1000 | Events per file |
| flushIntervalSeconds | int | 60 | Flush interval |
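`batchSize` and `flushIntervalSeconds` together decide when an object is written. Assuming the usual "whichever comes first" policy, the buffer below flushes once it holds `batchSize` events. A periodic `tick()` flushes a partial batch once `flushIntervalSeconds` have passed since the last flush. `BatchBuffer` and `flush_fn` are hypothetical names; `flush_fn` stands in for uploading one object.

```python
import time


class BatchBuffer:
    """Flush when batch_size events are buffered or flush_interval_s has
    elapsed since the last flush, whichever comes first (assumed policy)."""

    def __init__(self, flush_fn, batch_size=1000, flush_interval_s=60.0, clock=time.monotonic):
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.flush_interval_s = flush_interval_s
        self.clock = clock
        self.events = []
        self.last_flush = clock()

    def add(self, event) -> None:
        self.events.append(event)
        if len(self.events) >= self.batch_size:
            self.flush()

    def tick(self) -> None:
        """Call periodically so a slow trickle of events still gets written."""
        if self.events and self.clock() - self.last_flush >= self.flush_interval_s:
            self.flush()

    def flush(self) -> None:
        if self.events:
            self.flush_fn(self.events)
        self.events = []
        self.last_flush = self.clock()


uploaded = []
buf = BatchBuffer(uploaded.append, batch_size=3)
for i in range(7):
    buf.add(i)
print([len(batch) for batch in uploaded])
```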
HttpSinkConfig (for connector: http):
| Field | Type | Default | Description |
|---|---|---|---|
| url | string | required | Target URL |
| method | string | POST | POST, PUT, PATCH |
| contentType | string | application/json | Content-Type header |
| timeoutMs | int | 30000 | Request timeout |
| batchSize | int | 100 | Events per request |
KafkaSinkConfig (for connector: kafka-sink; provided by the rivven-queue crate):
| Field | Type | Default | Description |
|---|---|---|---|
| brokers | []string | required | Kafka broker addresses |
| topic | string | required | Target Kafka topic |
| acks | string | all | none, leader, all, 0, 1, -1 |
| compression | string | none | none, gzip, snappy, lz4, zstd |
| batchSize | int | 16384 | Batch size in bytes |
| lingerMs | int | 0 | Linger time in milliseconds |
| securityProtocol | string | plaintext | plaintext, ssl, sasl_plaintext, sasl_ssl |
| saslMechanism | string | - | plain, scram-sha-256, scram-sha-512 |
| saslUsername | string | - | SASL username (use secret for password) |
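The `acks` field accepts both named and numeric spellings. This sketch assumes they correspond to Kafka's producer acks levels:

- 0: no acknowledgement.
- 1: leader only.
- all or -1: all in-sync replicas.

`normalize_acks` is a hypothetical helper that maps every accepted spelling to one canonical value.

```python
# Kafka producer acks levels: 0 = none, 1 = leader only, all/-1 = all in-sync replicas.
ACKS_ALIASES = {
    "none": "0", "0": "0",
    "leader": "1", "1": "1",
    "all": "all", "-1": "all",
}


def normalize_acks(value) -> str:
    """Map the acks spellings listed for KafkaSinkConfig to Kafka's canonical values."""
    key = str(value).strip().lower()
    if key not in ACKS_ALIASES:
        raise ValueError(f"unsupported acks value: {value!r}")
    return ACKS_ALIASES[key]


print([normalize_acks(v) for v in ("none", "leader", "all", 0, 1, -1)])
```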
GcsSinkConfig (for connector: gcs; provided by the rivven-storage crate):
| Field | Type | Default | Description |
|---|---|---|---|
| bucket | string | required | GCS bucket name |
| prefix | string | - | Object key prefix |
| format | string | jsonl | json, jsonl, parquet, avro |
| compression | string | none | none, gzip |
| partitioning | string | none | none, daily, hourly |
| batchSize | int | 1000 | Events per file |
| flushIntervalSeconds | int | 60 | Flush interval |
| credentialsFile | string | - | Path to service account JSON |
| useAdc | bool | true | Use Application Default Credentials |
AzureBlobSinkConfig (for connector: azure-blob; provided by the rivven-storage crate):
| Field | Type | Default | Description |
|---|---|---|---|
| accountName | string | required | Storage account name |
| container | string | required | Container name |
| prefix | string | - | Blob prefix |
| format | string | jsonl | json, jsonl, parquet, avro |
| compression | string | none | none, gzip |
| partitioning | string | none | none, daily, hourly |
| batchSize | int | 1000 | Events per file |
| flushIntervalSeconds | int | 60 | Flush interval |
SnowflakeSinkConfig (for connector: snowflake; provided by the rivven-warehouse crate):
| Field | Type | Default | Description |
|---|---|---|---|
| account | string | required | Snowflake account identifier |
| user | string | required | Snowflake user name |
| privateKeyPath | string | required | Path to PKCS#8 private key (use secret) |
| database | string | required | Target database |
| schema | string | required | Target schema |
| table | string | required | Target table |
| warehouse | string | - | Snowflake warehouse name |
| role | string | - | Snowflake role name |
| batchSize | int | 1000 | Rows per batch insert |
BigQuerySinkConfig (for connector: bigquery; provided by the rivven-warehouse crate):
| Field | Type | Default | Description |
|---|---|---|---|
| projectId | string | required | GCP project ID |
| datasetId | string | required | BigQuery dataset ID |
| tableId | string | required | BigQuery table ID |
| credentialsFile | string | - | Path to service account JSON |
| useAdc | bool | true | Use Application Default Credentials |
| batchSize | int | 500 | Rows per insert request |
| autoCreateTable | bool | false | Auto-create table if not exists |
RedshiftSinkConfig (for connector: redshift; provided by the rivven-warehouse crate):
| Field | Type | Default | Description |
|---|---|---|---|
| host | string | required | Redshift cluster endpoint |
| port | int | 5439 | Redshift port |
| database | string | required | Database name |
| user | string | required | Redshift user name |
| schema | string | public | Target schema |
| table | string | required | Target table |
| sslMode | string | prefer | disable, prefer, require, verify-ca, verify-full |
| batchSize | int | 1000 | Rows per batch insert |
TableSpec (CDC Tables)
| Field | Type | Default | Description |
|---|---|---|---|
| schema | string | - | Schema/namespace (e.g., "public") |
| table | string | required | Table name |
| topic | string | - | Override topic for this table |
| columns | []string | [] | Columns to include (empty = all) |
| excludeColumns | []string | [] | Columns to exclude |
| columnMasks | map[string]string | {} | Column masking rules |
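The three column fields compose naturally: select (`columns`), drop (`excludeColumns`), then mask (`columnMasks`). The sketch below applies them to a single captured row in that order. Two details are assumptions: the order of operations, and that a mask string replaces the whole value (as the customers example in the RivvenConnect manifest suggests). `apply_table_spec` is a hypothetical helper.

```python
def apply_table_spec(row: dict, columns=(), exclude_columns=(), column_masks=None) -> dict:
    """Filter and mask one row: keep `columns` (empty = all), drop
    `exclude_columns`, then replace masked values with their mask string."""
    column_masks = column_masks or {}
    kept = {
        name: value
        for name, value in row.items()
        if (not columns or name in columns) and name not in exclude_columns
    }
    return {name: column_masks.get(name, value) for name, value in kept.items()}


customer = {"id": 7, "email": "ada@example.com", "phone": "555-123-4567"}
masks = {"email": "***@***.***", "phone": "***-***-****"}
print(apply_table_spec(customer, column_masks=masks))
```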
Configuration
Operator Flags
| Flag | Env Var | Default | Description |
|---|---|---|---|
| --metrics-addr | METRICS_ADDR | 0.0.0.0:8080 | Metrics server address |
| --health-addr | HEALTH_ADDR | 0.0.0.0:8081 | Health probe address |
| --leader-election | LEADER_ELECTION | true | Enable leader election |
| --namespace | WATCH_NAMESPACE | "" | Namespace to watch (empty = all) |
Supported Connectors
Built-in Sources (rivven-connect core)
- postgres-cdc - PostgreSQL Change Data Capture (logical replication)
- mysql-cdc - MySQL Change Data Capture (binlog replication)
- http - HTTP/Webhook source
- datagen - Data generator for testing
Queue Sources (rivven-queue crate)
- kafka-source - Apache Kafka consumer (migration from Kafka)
- mqtt-source - MQTT broker subscriber (IoT data ingestion)
- sqs-source - AWS SQS queue consumer
- pubsub-source - Google Cloud Pub/Sub subscriber
Built-in Sinks (rivven-connect core)
- stdout - Standard output (debugging)
- http-webhook - HTTP webhooks
Queue Sinks (rivven-queue crate)
- kafka-sink - Apache Kafka producer (hybrid deployments)
Storage Sinks (rivven-storage crate)
- s3 - Amazon S3 / MinIO object storage
- gcs - Google Cloud Storage
- azure-blob - Azure Blob Storage
Warehouse Sinks (rivven-warehouse crate)
- snowflake - Snowflake Data Warehouse (Snowpipe Streaming)
- bigquery - Google BigQuery
- redshift - Amazon Redshift
Development
Build
Run Locally (against kind/minikube)
# Install CRDs
# Run operator locally
Run Tests
Generate CRD YAML
License
See root LICENSE file.