Crate danube_connect_core

§Danube Connect Core

Core SDK for building Danube connectors.

This library provides the foundational framework for creating connectors that integrate external systems with Danube Messaging. It handles communication with Danube brokers, lifecycle management, retries, and observability, so connector developers can focus on the integration logic.

§Overview

Connectors are standalone processes of one of two kinds:

  • Sink: Consume messages from Danube and write to an external system
  • Source: Read from an external system and publish to Danube

§Quick Start

use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig, SubscriptionType};
use async_trait::async_trait;

pub struct MyConnector;

#[async_trait]
impl SinkConnector for MyConnector {
    async fn initialize(&mut self, _config: ConnectorConfig) -> ConnectorResult<()> {
        // Set up your connector (open clients, validate settings, etc.)
        Ok(())
    }
     
    async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> {
        Ok(vec![ConsumerConfig {
            topic: "/default/my-topic".to_string(),
            consumer_name: "my-consumer".to_string(),
            subscription: "my-sub".to_string(),
            subscription_type: SubscriptionType::Exclusive,
            expected_schema_subject: None,
        }])
    }
     
    async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> {
        // Process message batch
        for record in records {
            println!("Got message: {:?}", record.payload());
        }
        Ok(())
    }
}

§Features

  • Automatic Lifecycle Management: The runtime handles initialization, message loops, and shutdown
  • Built-in Retry Logic: Configurable exponential backoff for transient failures
  • Message Transformation: Helpers for JSON, binary, and schema-based transformations
  • Observability: Automatic metrics, structured logging, and health checks
  • Configuration: Standard environment variable and file-based configuration
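
To make the retry feature above more concrete, here is a minimal sketch of capped exponential backoff using only the standard library. It is not the crate's implementation: `BackoffPolicy`, its fields, and `retry_with_backoff` are hypothetical names chosen for illustration and do not reflect the actual shape of `RetrySettings`. The sleep function is passed in as a parameter so the logic can be exercised without real delays.

```rust
use std::time::Duration;

/// Hypothetical retry policy for illustration only.
/// These fields are assumptions, not the fields of the crate's `RetrySettings`.
#[derive(Debug, Clone, Copy)]
pub struct BackoffPolicy {
    /// Delay before the first retry.
    pub initial: Duration,
    /// Factor applied to the delay after each failed attempt.
    pub multiplier: u32,
    /// Upper bound on any single delay.
    pub max_delay: Duration,
    /// Number of retries allowed after the first attempt.
    pub max_retries: u32,
}

impl BackoffPolicy {
    /// Delay before retry number `attempt` (0-based):
    /// `initial * multiplier^attempt`, capped at `max_delay`.
    /// Saturating arithmetic keeps large attempt numbers from overflowing.
    pub fn delay(&self, attempt: u32) -> Duration {
        let factor = self.multiplier.saturating_pow(attempt);
        self.initial.saturating_mul(factor).min(self.max_delay)
    }
}

/// Runs `op` until it succeeds or the retry budget is exhausted.
/// `op` receives the 0-based attempt number; `sleep` is called with the
/// backoff delay between attempts.
pub fn retry_with_backoff<T, E>(
    policy: &BackoffPolicy,
    mut op: impl FnMut(u32) -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of retries: surface the last error to the caller.
            Err(err) if attempt >= policy.max_retries => return Err(err),
            // Otherwise wait with a growing delay and try again.
            Err(_) => {
                sleep(policy.delay(attempt));
                attempt += 1;
            }
        }
    }
}
```

In a blocking context, `std::thread::sleep` can be passed as the `sleep` argument. An async connector would await a timer from its runtime instead, and a production policy would typically add jitter and retry only on errors classified as transient.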

Re-exports§

pub use utils::Batcher;
pub use utils::HealthChecker;
pub use utils::HealthStatus;

Modules§

utils
Utility modules for connector development.

Structs§

ConnectorConfig
Main configuration for connectors
ConnectorConfigLoader
Helper for loading connector configuration from TOML files and environment variables.
ConnectorMetrics
Metrics collector for connectors
ConsumerConfig
Configuration for a Danube consumer
DanubeMetadata
Danube-specific metadata for observability and debugging
Offset
Checkpoint/offset information for source connectors
ProcessingSettings
Processing and runtime configuration settings
ProducerConfig
Configuration for a Danube producer
RecordContext
Full record context combining routing data with source metadata.
RetrySettings
Retry configuration settings
RouteDispatchPolicy
Dispatch behavior for a source route.
RouteSchemaPolicy
Schema-related routing policy shared by sink and source routes.
RouteSubscriptionPolicy
Subscription behavior for a sink route.
RoutingContext
Routing metadata shared by source and sink records.
SchemaConfig
Schema configuration for a topic
SchemaInfo
Information about a schema retrieved from the registry
SchemaMapping
Schema mapping configuration for topics
SinkRecord
Record passed to sink connectors (from Danube → External System)
SinkRoute
Complete sink-side routing definition derived from ConsumerConfig.
SinkRuntime
Runtime for sink connectors that consume from Danube and write to external systems (Danube → External System).
SourceEnvelope
A source record together with optional checkpoint information.
SourceRecord
Record passed from source connectors (External System → Danube)
SourceRoute
Complete source-side routing definition derived from ProducerConfig.
SourceRuntime
Runtime for source connectors that read from external systems and publish to Danube (External System → Danube).
SourceSender
Handle used by streaming connectors to emit records into the runtime.

Enums§

ConnectorError
Error types for connector operations
SchemaType
Canonical schema type enum used by connector configuration and schema registration; covers the schema types supported by the registry.
SourceConnectorMode
Execution model for a source connector.
SubType
Represents the type of subscription
SubscriptionType
Subscription type for configuration
VersionStrategy
Strategy for selecting schema version

Constants§

NAME
Crate name for the currently built danube-connect-core package.
VERSION
Crate version for the currently built danube-connect-core package.

Traits§

ConfigEnvOverrides
Trait for applying environment-variable overrides after deserialization.
ConfigValidate
Trait for validating configuration values after loading.
SinkConnector
Trait for implementing Sink Connectors (Danube → External System)
SourceConnector
Trait for implementing Source Connectors (External System → Danube)

Type Aliases§

ConnectorResult
Result type for connector operations