§Danube Connect Core
Core SDK for building Danube connectors.
This library provides the foundational framework for creating connectors that integrate external systems with Danube Messaging. It handles broker communication, lifecycle management, retries, and observability, so connector developers can focus on the integration logic.
§Overview
Connectors are standalone processes that either:
- Sink: Consume messages from Danube and write to an external system
- Source: Read from an external system and publish to Danube
§Quick Start
use danube_connect_core::{
    ConnectorConfig, ConnectorResult, ConsumerConfig, SinkConnector, SinkRecord, SubscriptionType,
};
use async_trait::async_trait;

pub struct MyConnector;

#[async_trait]
impl SinkConnector for MyConnector {
    async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> {
        // Setup your connector
        Ok(())
    }

    async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> {
        Ok(vec![ConsumerConfig {
            topic: "/default/my-topic".to_string(),
            consumer_name: "my-consumer".to_string(),
            subscription: "my-sub".to_string(),
            subscription_type: SubscriptionType::Exclusive,
            expected_schema_subject: None,
        }])
    }

    async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> {
        // Process message batch
        for record in records {
            println!("Got message: {:?}", record.payload());
        }
        Ok(())
    }
}
§Features
- Automatic Lifecycle Management: The runtime handles initialization, message loops, and shutdown
- Built-in Retry Logic: Configurable exponential backoff for transient failures
- Message Transformation: Helpers for JSON, binary, and schema-based transformations
- Observability: Automatic metrics, structured logging, and health checks
- Configuration: Standard environment variable and file-based configuration
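The "Built-in Retry Logic" item above mentions configurable exponential backoff. The following is a minimal, standard-library-only sketch of that technique, not the crate's implementation: `BackoffSketch`, `delay_for`, and `retry_with_backoff` are hypothetical names chosen for illustration and do not correspond to `RetrySettings` or any other item in this crate. It assumes a simple doubling schedule with an upper bound and no jitter.

```rust
use std::time::Duration;

/// Hypothetical backoff settings for illustration only;
/// this is NOT the crate's `RetrySettings`.
#[derive(Debug, Clone)]
pub struct BackoffSketch {
    /// Number of retries allowed after the first attempt.
    pub max_retries: u32,
    /// Delay before the first retry.
    pub base_delay: Duration,
    /// Upper bound applied to every computed delay.
    pub max_delay: Duration,
}

impl BackoffSketch {
    /// Delay before retry number `attempt` (0-based):
    /// `base_delay * 2^attempt`, capped at `max_delay`.
    pub fn delay_for(&self, attempt: u32) -> Duration {
        let factor = 2u32.saturating_pow(attempt);
        self.base_delay.saturating_mul(factor).min(self.max_delay)
    }
}

/// Calls `op` until it succeeds or the retries are exhausted,
/// sleeping for the computed delay between attempts.
/// `op` receives the 0-based attempt number.
pub fn retry_with_backoff<T, E>(
    settings: &BackoffSketch,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of retries: surface the last error to the caller.
            Err(err) if attempt >= settings.max_retries => return Err(err),
            Err(_) => {
                std::thread::sleep(settings.delay_for(attempt));
                attempt += 1;
            }
        }
    }
}
```

The sketch blocks the current thread with `std::thread::sleep` to stay dependency-free. A connector running on an async runtime would presumably await a timer instead, and might add jitter so that many connectors do not retry in lockstep.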
Re-exports§
pub use utils::Batcher;
pub use utils::HealthChecker;
pub use utils::HealthStatus;
Modules§
- utils - Utility modules for connector development.
Structs§
- ConnectorConfig - Main configuration for connectors
- ConnectorConfigLoader - Helper for loading connector configuration from TOML files and environment variables.
- ConnectorMetrics - Metrics collector for connectors
- ConsumerConfig - Configuration for a Danube consumer
- DanubeMetadata - Danube-specific metadata for observability and debugging
- Offset - Checkpoint/offset information for source connectors
- ProcessingSettings - Processing and runtime configuration settings
- ProducerConfig - Configuration for a Danube producer
- RecordContext - Full record context combining routing data with source metadata.
- RetrySettings - Retry configuration settings
- RouteDispatchPolicy - Dispatch behavior for a source route.
- RouteSchemaPolicy - Schema-related routing policy shared by sink and source routes.
- RouteSubscriptionPolicy - Subscription behavior for a sink route.
- RoutingContext - Routing metadata shared by source and sink records.
- SchemaConfig - Schema configuration for a topic
- SchemaInfo - Information about a schema retrieved from the registry
- SchemaMapping - Schema mapping configuration for topics
- SinkRecord - Record passed to sink connectors (from Danube → External System)
- SinkRoute - Complete sink-side routing definition derived from ConsumerConfig.
- SinkRuntime - Runtime for sink connectors that consume from Danube and write to external systems (Danube → External System).
- SourceEnvelope - A source record together with optional checkpoint information.
- SourceRecord - Record passed from source connectors (External System → Danube)
- SourceRoute - Complete source-side routing definition derived from ProducerConfig.
- SourceRuntime - Runtime for source connectors that read from external systems and publish to Danube (External System → Danube).
- SourceSender - Handle used by streaming connectors to emit records into the runtime.
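The `SourceEnvelope` and `Offset` entries above describe a source record paired with optional checkpoint information. The sketch below illustrates one way such a pairing can be used: a checkpoint is recorded only after the matching record has been published, which gives at-least-once delivery across restarts. This is an assumption about usage, not a description of `SourceRuntime`. All types and the `publish_and_checkpoint` function are hypothetical stand-ins written against the standard library only; their fields are not taken from this crate.

```rust
use std::collections::HashMap;

/// Hypothetical checkpoint information (NOT the crate's `Offset`).
#[derive(Debug, Clone, PartialEq)]
pub struct OffsetSketch {
    pub partition: String,
    pub position: u64,
}

/// Hypothetical record bound for Danube (NOT the crate's `SourceRecord`).
#[derive(Debug, Clone)]
pub struct RecordSketch {
    pub topic: String,
    pub payload: Vec<u8>,
}

/// A record plus an optional checkpoint, mirroring the one-line
/// description of `SourceEnvelope` (field layout is assumed).
#[derive(Debug, Clone)]
pub struct EnvelopeSketch {
    pub record: RecordSketch,
    pub offset: Option<OffsetSketch>,
}

/// Publishes envelopes in order and stores a checkpoint only after the
/// corresponding publish succeeded. Stops at the first failure, so
/// `committed` never advances past an unpublished record.
/// Returns the number of records published.
pub fn publish_and_checkpoint(
    envelopes: Vec<EnvelopeSketch>,
    mut publish: impl FnMut(&RecordSketch) -> Result<(), String>,
    committed: &mut HashMap<String, u64>,
) -> Result<usize, String> {
    let mut published = 0;
    for envelope in envelopes {
        publish(&envelope.record)?;
        published += 1;
        // Envelopes without an offset are published but not checkpointed.
        if let Some(offset) = envelope.offset {
            committed.insert(offset.partition, offset.position);
        }
    }
    Ok(published)
}
```

Committing after publishing means a crash between the two steps can cause a record to be published again on restart. The opposite order would risk losing records instead, so duplicates are usually the safer failure mode for a connector.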
Enums§
- ConnectorError - Error types for connector operations
- SchemaType - Canonical schema type enum used by connector configuration and schema registration. Schema types supported by the registry.
- SourceConnectorMode - Execution model for a source connector.
- SubType - Represents the type of subscription
- SubscriptionType - Subscription type for configuration
- VersionStrategy - Strategy for selecting schema version
Constants§
- NAME - Crate name for the currently built danube-connect-core package.
- VERSION - Crate version for the currently built danube-connect-core package.
Traits§
- ConfigEnvOverrides - Trait for applying environment-variable overrides after deserialization.
- ConfigValidate - Trait for validating configuration values after loading.
- SinkConnector - Trait for implementing Sink Connectors (Danube → External System)
- SourceConnector - Trait for implementing Source Connectors (External System → Danube)
Type Aliases§
- ConnectorResult - Result type for connector operations