§Danube Connect Core
Core SDK for building Danube connectors.
This library provides the foundational framework for creating connectors that integrate external systems with Danube Messaging. It handles communication with Danube brokers, lifecycle management, retries, and observability, so connector developers can focus on the integration logic.
§Overview
Connectors are standalone processes that either:
- Sink: Consume messages from Danube and write to an external system
- Source: Read from an external system and publish to Danube
§Quick Start
use danube_connect_core::{
    ConnectorConfig, ConnectorResult, ConsumerConfig, SinkConnector, SinkRecord,
    SubscriptionType,
};
use async_trait::async_trait;

pub struct MyConnector;

#[async_trait]
impl SinkConnector for MyConnector {
    async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> {
        // Setup your connector
        Ok(())
    }

    async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> {
        Ok(vec![ConsumerConfig {
            topic: "/default/my-topic".to_string(),
            consumer_name: "my-consumer".to_string(),
            subscription: "my-sub".to_string(),
            subscription_type: SubscriptionType::Exclusive,
        }])
    }

    async fn process(&mut self, record: SinkRecord) -> ConnectorResult<()> {
        // Process message
        println!("Got message: {:?}", record.payload());
        Ok(())
    }
}
§Features
- Automatic Lifecycle Management: The runtime handles initialization, message loops, and shutdown
- Built-in Retry Logic: Configurable exponential backoff for transient failures
- Message Transformation: Helpers for JSON, binary, and schema-based transformations
- Observability: Automatic metrics, structured logging, and health checks
- Configuration: Standard environment variable and file-based configuration
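To make the "configurable exponential backoff" item above more concrete, here is a minimal, standard-library-only sketch of the general technique. It is not the crate's implementation: `BackoffPolicy`, its fields, and `retry_with_backoff` are hypothetical names chosen for illustration and are not taken from `RetryConfig`, `RetrySettings`, or `RetryStrategy`. The sketch also retries on every error, whereas a real connector would presumably retry only failures it classifies as transient.

```rust
use std::time::Duration;

/// Hypothetical retry policy. Field names are illustrative assumptions,
/// not the fields of danube_connect_core's retry types.
#[derive(Debug, Clone)]
pub struct BackoffPolicy {
    /// Number of retries allowed after the first failed attempt.
    pub max_retries: u32,
    /// Delay before the first retry.
    pub base_delay: Duration,
    /// Upper bound applied to every computed delay.
    pub max_delay: Duration,
    /// Growth factor applied per retry (2 doubles the delay each time).
    pub multiplier: u32,
}

impl BackoffPolicy {
    /// Delay before retry number `attempt` (0-based):
    /// base_delay * multiplier^attempt, capped at max_delay.
    /// Saturating arithmetic keeps large attempt numbers from overflowing.
    pub fn delay_for(&self, attempt: u32) -> Duration {
        let factor = self.multiplier.saturating_pow(attempt);
        self.base_delay.saturating_mul(factor).min(self.max_delay)
    }
}

/// Runs `op` until it succeeds or the retry budget is spent.
/// `sleep` is injected so callers choose how to wait
/// (for example `std::thread::sleep`, or a recording closure in tests).
pub fn retry_with_backoff<T, E>(
    policy: &BackoffPolicy,
    mut op: impl FnMut() -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Budget exhausted: surface the last error to the caller.
            Err(err) if attempt >= policy.max_retries => return Err(err),
            // Otherwise wait for the computed delay and try again.
            Err(_) => {
                sleep(policy.delay_for(attempt));
                attempt += 1;
            }
        }
    }
}
```

With a base delay of 100 ms, a multiplier of 2, and a 1 s cap, the first retries wait 100 ms, 200 ms, 400 ms, and 800 ms, after which every delay is held at 1 s. In an async connector the blocking `sleep` closure would be replaced by the async runtime's timer.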
Re-exports§
pub use utils::Batcher;
pub use utils::HealthChecker;
pub use utils::HealthStatus;
Modules§
- utils - Utility modules for connector development.
Structs§
- ConnectorConfig - Main configuration for connectors
- ConnectorMetrics - Metrics collector for connectors
- ConsumerConfig - Configuration for a Danube consumer
- DanubeMetadata - Danube-specific metadata for observability and debugging
- MessageID
- Offset - Checkpoint/offset information for source connectors
- ProcessingSettings - Processing and runtime configuration settings
- ProducerConfig - Configuration for a Danube producer
- RetryConfig - Configuration for retry behavior
- RetrySettings - Retry configuration settings
- RetryStrategy - Retry strategy implementation
- SinkRecord - Record passed to sink connectors (from Danube → External System)
- SinkRuntime - Runtime for Sink Connectors (Danube → External System)
- SourceRecord - Record passed from source connectors (External System → Danube)
- SourceRuntime - Runtime for Source Connectors (External System → Danube)
- StreamMessage
Enums§
- ConnectorError - Error types for connector operations
- SchemaType - Schema type for Danube messages
- SubType - Represents the type of subscription
- SubscriptionType - Subscription type for configuration (mirrors SubType but with Serialize/Deserialize)
Constants§
Traits§
- SinkConnector - Trait for implementing Sink Connectors (Danube → External System)
- SourceConnector - Trait for implementing Source Connectors (External System → Danube)
Type Aliases§
- ConnectorResult - Result type for connector operations