Crate danube_connect_core


§Danube Connect Core

Core SDK for building Danube connectors.

This library provides the foundational framework for building connectors that integrate external systems with Danube Messaging. It handles communication with Danube brokers, lifecycle management, retries, and observability, so connector developers can focus on the integration logic.

§Overview

Connectors are standalone processes of one of two kinds:

  • Sink: Consume messages from Danube and write to an external system
  • Source: Read from an external system and publish to Danube
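The two directions can be pictured with a small, self-contained toy model. This is only a sketch for intuition: `ToyTopic`, `ToySource`, `ToySink`, and the helper functions are hypothetical names invented here, the code is synchronous, and none of it is part of the `danube_connect_core` API (the real traits are `SinkConnector` and `SourceConnector`).

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a Danube topic: an in-memory FIFO of payloads.
#[derive(Default)]
struct ToyTopic {
    messages: VecDeque<Vec<u8>>,
}

/// Source direction: external system -> topic (illustrative trait only).
trait ToySource {
    /// Returns the payloads read from the external system since the last poll.
    fn poll(&mut self) -> Vec<Vec<u8>>;
}

/// Sink direction: topic -> external system (illustrative trait only).
trait ToySink {
    /// Writes one payload to the external system.
    fn write(&mut self, payload: &[u8]);
}

/// A "source" whose external system is a list of text lines.
struct LinesSource {
    lines: Vec<String>,
}

impl ToySource for LinesSource {
    fn poll(&mut self) -> Vec<Vec<u8>> {
        // Hand over every pending line exactly once.
        self.lines.drain(..).map(String::into_bytes).collect()
    }
}

/// A "sink" whose external system is an in-memory list of strings.
#[derive(Default)]
struct CollectingSink {
    written: Vec<String>,
}

impl ToySink for CollectingSink {
    fn write(&mut self, payload: &[u8]) {
        self.written.push(String::from_utf8_lossy(payload).into_owned());
    }
}

/// Publishes everything the source currently has onto the topic.
fn run_source(source: &mut impl ToySource, topic: &mut ToyTopic) {
    topic.messages.extend(source.poll());
}

/// Drains the topic in order and hands each payload to the sink.
fn run_sink(sink: &mut impl ToySink, topic: &mut ToyTopic) {
    while let Some(payload) = topic.messages.pop_front() {
        sink.write(&payload);
    }
}
```

In the real SDK the broker connection and message loop are owned by the runtime, so a connector implements only the equivalent of `poll` or `write`.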

§Quick Start

use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig, SubscriptionType};
use async_trait::async_trait;

pub struct MyConnector;

#[async_trait]
impl SinkConnector for MyConnector {
    async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> {
        // Set up your connector
        Ok(())
    }

    async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> {
        Ok(vec![ConsumerConfig {
            topic: "/default/my-topic".to_string(),
            consumer_name: "my-consumer".to_string(),
            subscription: "my-sub".to_string(),
            subscription_type: SubscriptionType::Exclusive,
        }])
    }

    async fn process(&mut self, record: SinkRecord) -> ConnectorResult<()> {
        // Process message
        println!("Got message: {:?}", record.payload());
        Ok(())
    }
}

§Features

  • Automatic Lifecycle Management: The runtime handles initialization, message loops, and shutdown
  • Built-in Retry Logic: Configurable exponential backoff for transient failures
  • Message Transformation: Helpers for JSON, binary, and schema-based transformations
  • Observability: Automatic metrics, structured logging, and health checks
  • Configuration: Standard environment variable and file-based configuration
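To make the retry bullet concrete, here is a minimal sketch of capped exponential backoff using only the standard library. `BackoffPolicy`, its fields, and `retry_with_backoff` are hypothetical names chosen for illustration; they are not the fields or methods of this crate's `RetryConfig`, `RetrySettings`, or `RetryStrategy`, whose actual shape should be checked in their own documentation.

```rust
use std::time::Duration;

/// Hypothetical retry settings (illustrative only, not the crate's types).
#[derive(Debug, Clone)]
pub struct BackoffPolicy {
    /// Number of retries allowed after the first failed attempt.
    pub max_retries: u32,
    /// Delay before the first retry.
    pub base_delay: Duration,
    /// Upper bound applied to every computed delay.
    pub max_delay: Duration,
    /// Growth factor applied per retry.
    pub multiplier: u32,
}

impl BackoffPolicy {
    /// Delay before retry number `attempt` (0-based):
    /// `base_delay * multiplier^attempt`, capped at `max_delay`.
    pub fn delay_for(&self, attempt: u32) -> Duration {
        // Saturate instead of overflowing for large attempt numbers.
        let factor = self.multiplier.checked_pow(attempt).unwrap_or(u32::MAX);
        self.base_delay
            .checked_mul(factor)
            .unwrap_or(self.max_delay)
            .min(self.max_delay)
    }
}

/// Calls `op` until it succeeds or `max_retries` retries have been used.
/// `sleep` is injected so callers can pass `std::thread::sleep` in production
/// code and a recording closure in tests.
pub fn retry_with_backoff<T, E>(
    policy: &BackoffPolicy,
    mut op: impl FnMut() -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of retries: surface the last error to the caller.
            Err(err) if attempt >= policy.max_retries => return Err(err),
            Err(_) => {
                sleep(policy.delay_for(attempt));
                attempt += 1;
            }
        }
    }
}
```

This sketch retries on every error. A real connector would normally retry only failures it classifies as transient and fail fast on the rest.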

Re-exports§

pub use utils::Batcher;
pub use utils::HealthChecker;
pub use utils::HealthStatus;

Modules§

utils
Utility modules for connector development.

Structs§

ConnectorConfig
Main configuration for connectors
ConnectorMetrics
Metrics collector for connectors
ConsumerConfig
Configuration for a Danube consumer
DanubeMetadata
Danube-specific metadata for observability and debugging
MessageID
Offset
Checkpoint/offset information for source connectors
ProcessingSettings
Processing and runtime configuration settings
ProducerConfig
Configuration for a Danube producer
RetryConfig
Configuration for retry behavior
RetrySettings
Retry configuration settings
RetryStrategy
Retry strategy implementation
SinkRecord
Record passed to sink connectors (from Danube → External System)
SinkRuntime
Runtime for Sink Connectors (Danube → External System)
SourceRecord
Record passed from source connectors (External System → Danube)
SourceRuntime
Runtime for Source Connectors (External System → Danube)
StreamMessage

Enums§

ConnectorError
Error types for connector operations
SchemaType
Schema type for Danube messages
SubType
Represents the type of subscription
SubscriptionType
Subscription type for configuration (mirrors SubType but with Serialize/Deserialize)

Constants§

NAME
VERSION

Traits§

SinkConnector
Trait for implementing Sink Connectors (Danube → External System)
SourceConnector
Trait for implementing Source Connectors (External System → Danube)

Type Aliases§

ConnectorResult
Result type for connector operations