# danube-connect-core

Core SDK for building high-performance connectors for the Danube messaging system.

## Overview

`danube-connect-core` is a production-ready framework for building connectors that integrate Danube with external systems. Whether you're building a sink connector to export messages or a source connector to import data, this SDK provides everything you need.

## Key Features
- **Simple Trait-Based API** - Implement just the `SinkConnector` or `SourceConnector` traits
- **Multi-Topic Support** - Sink connectors can consume from multiple topics; source connectors can publish to multiple topics dynamically
- **Flexible Topic Configuration** - Connectors specify their own topic/subscription settings
- **Automatic Runtime Management** - Lifecycle handling, message loops, and graceful shutdown
- **Built-in Retry Logic** - Exponential backoff with jitter for resilient integrations
- **Observability** - Prometheus metrics, structured logging, and health checks
- **Message Utilities** - Batching, serialization, and format conversion helpers
- **Hybrid Configuration** - Mandatory fields from env vars, optional settings from TOML files
- **Error Handling** - Comprehensive error types and recovery strategies
- **Async/Await** - Built on Tokio for high-performance async operations
## Core Concepts

### Traits
- `SinkConnector` - Consumes messages from Danube and sends them to an external system (see the sketch after this list)
  - Implement `consumer_configs()` to specify which topics to consume from
  - Implement `process()` to handle individual messages
  - Supports multiple topic consumption
- `SourceConnector` - Reads data from an external system and publishes to Danube
  - Implement `poll()` to read data and create `SourceRecord`s
  - Use `ProducerConfig` to specify per-record topic and partitioning
  - Supports dynamic multi-topic publishing
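To make the trait shape concrete, here is a minimal sink sketch. The exact method signatures, the error type, and the `ConsumerConfig` contents are assumptions based on the descriptions above, not verified API:

```rust
use async_trait::async_trait;
use danube_connect_core::{ConsumerConfig, SinkConnector, SinkRecord};

struct StdoutSink;

#[async_trait]
impl SinkConnector for StdoutSink {
    // Declare the topics/subscriptions this sink consumes from.
    async fn consumer_configs(&self) -> Vec<ConsumerConfig> {
        vec![/* one ConsumerConfig per topic/subscription */]
    }

    // Handle one message; an Err return lets the runtime retry with backoff.
    async fn process(
        &mut self,
        record: SinkRecord,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        println!("{}: {} bytes", record.topic, record.payload.len());
        Ok(())
    }
}
```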
### Runtime

- `SinkRuntime` - Manages the lifecycle of a sink connector
  - Creates multiple consumers based on `consumer_configs()`
  - Handles message processing with retry logic
  - Manages acknowledgments and dead-letter queues
- `SourceRuntime` - Manages the lifecycle of a source connector
  - Creates producers dynamically based on `ProducerConfig`
  - Handles batching and publishing
  - Manages offsets and checkpointing
Both runtimes handle:
- Danube client connection management
- Message polling/processing loops
- Error recovery and automatic retries
- Graceful shutdown
- Signal handling (SIGTERM, SIGINT)
- Prometheus metrics and health checks
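In practice, a connector binary wires its implementation into the matching runtime and runs it. A minimal sketch, reusing the `StdoutSink` from the traits sketch above; the `SinkRuntime::new(...)` and `run()` entry points shown here are assumed names, not verified API:

```rust
use danube_connect_core::{ConnectorConfig, SinkRuntime};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Mandatory fields (DANUBE_SERVICE_URL, CONNECTOR_NAME) come from env vars;
    // retry/processing settings fall back to defaults.
    let config = ConnectorConfig::from_env()?;

    // The runtime connects to Danube, creates consumers from consumer_configs(),
    // drives the process() loop with retries, and shuts down on SIGTERM/SIGINT.
    SinkRuntime::new(StdoutSink, config).run().await?;
    Ok(())
}
```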
### Configuration
Configuration uses a hybrid approach:
- Mandatory fields from environment variables
- Optional settings from TOML config files with sensible defaults
#### Environment Variables
Only mandatory deployment-specific fields:

```bash
# Required
DANUBE_SERVICE_URL=http://localhost:6650   # Danube broker URL
CONNECTOR_NAME=my-connector                # Unique connector name
```
#### TOML Configuration File
Optional settings for retry and processing behavior:

```toml
# Mandatory fields (can also be set via env vars)
danube_service_url = "http://localhost:6650"
connector_name = "my-connector"

# Retry settings (optional, these are defaults)
[retry]
max_retries = 3
initial_backoff_ms = 1000
max_backoff_ms = 30000

# Processing settings (optional, these are defaults)
[processing]
batch_size = 1000
poll_timeout_ms = 1000
channel_capacity = 100
metrics_port = 9090
log_level = "info"
```
Load config:

```rust
// From environment only (uses defaults for retry/processing)
let config = ConnectorConfig::from_env()?;

// From TOML file (with env var overrides)
let mut config = ConnectorConfig::from_file("config.toml")?;
config.apply_env_overrides();

// Programmatically
let config = ConnectorConfig { /* ... */ };
```
### Topic Configuration

Sink connectors specify topics via the `consumer_configs()` method:

```rust
async fn consumer_configs(&self) -> Vec<ConsumerConfig> {
    // One ConsumerConfig per topic/subscription to consume from
    vec![/* ... */]
}
```

Source connectors specify topics per record via `ProducerConfig`:

```rust
let record = SourceRecord::from_string(payload)
    .with_producer_config(producer_config);
```
### Message Types

- `SinkRecord` - Message received from Danube (topic, offset, payload, attributes)
- `SourceRecord` - Message to publish to Danube (topic, payload, attributes); see the `poll()` sketch below
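A source connector's `poll()` produces these records. A minimal sketch, assuming a `poll()` signature and error type along these lines (the struct and counter are illustrative):

```rust
use async_trait::async_trait;
use danube_connect_core::{SourceConnector, SourceRecord};

struct CounterSource {
    n: u64,
}

#[async_trait]
impl SourceConnector for CounterSource {
    // SourceRuntime calls poll() in a loop and publishes every returned record.
    async fn poll(
        &mut self,
    ) -> Result<Vec<SourceRecord>, Box<dyn std::error::Error + Send + Sync>> {
        self.n += 1;
        Ok(vec![SourceRecord::from_string(format!("tick {}", self.n))])
    }
}
```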
### Utilities

The SDK includes helpful utilities in the `utils` module:

- `Batcher<T>` - Collect messages with size/timeout-based flushing
- `HealthChecker` - Track connector health with failure thresholds
- `serialization` - JSON and string conversion helpers
```rust
use danube_connect_core::utils::{Batcher, HealthChecker};
use std::time::Duration;

// Batch messages (capacity and timeout shown are illustrative)
let mut batcher = Batcher::new(100, Duration::from_secs(5));
batcher.add(message);
if batcher.should_flush() {
    // ... flush and send the batch to the external system
}

// Track health
let mut health = HealthChecker::new(3); // Unhealthy after 3 failures
health.record_failure();
if !health.is_healthy() {
    // ... report unhealthy status and pause processing
}
```
## Quick Start

### Installation

Add to your `Cargo.toml`:
```toml
[dependencies]
danube-connect-core = "0.3"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
```
### Building Connectors
For detailed implementation guides, see:
- Connector Development Guide - Comprehensive guide on implementing sink and source connectors
- Complete MQTT Example - Full reference implementation with all patterns
### Examples

The repository includes working examples:

- `simple_sink` - Basic sink connector that prints messages
- `simple_source` - Basic source connector that generates test messages
Run them with Docker:

```bash
# Start Danube cluster
docker compose up -d

# Run examples
cargo run --example simple_sink
cargo run --example simple_source
```
## Testing

Test your connector with the included Docker Compose setup:

```bash
# Start the cluster
docker compose up -d

# Run your connector
cargo run

# Monitor metrics
curl http://localhost:9090/metrics
```

For complete testing examples, see `examples/source-mqtt`.
## Documentation

### Connector Documentation
- Connector Development Guide - Conceptual guide for implementing connectors
- Architecture Overview - Connector framework architecture and design
- Message Patterns - Message transformation and data flow patterns
- Configuration Guide - Configuration architecture and best practices
### Examples
- MQTT Source Connector - Complete reference implementation
- MQTT Example Setup - Full deployment with Docker Compose
- Docker Test Cluster - Local Danube cluster for testing
## Performance
danube-connect-core is designed for high-performance integrations:
- Async/await - Built on Tokio for efficient async operations
- Batching - Process messages in batches for better throughput
- Connection pooling - Reuse connections to external systems
- Metrics - Monitor performance with Prometheus
## Contributing
Contributions are welcome! Please see the main danube-connect repository for guidelines.
## License
Licensed under the Apache License, Version 2.0. See LICENSE for details.
## Related Projects
- Danube - The core messaging system
- danube-connect - Connector repository
- Danube Docs - Official documentation