# danube-connect-core
Core SDK for building high-performance connectors for the Danube messaging system.
## Overview

`danube-connect-core` is a production-ready framework for building connectors that integrate Danube with external systems. Whether you're building a sink connector to export messages or a source connector to import data, this SDK provides everything you need.
## Key Features

- 🔌 **Simple Trait-Based API** - Implement `SinkConnector` or `SourceConnector`
- 📦 **Batch-First Sink Runtime** - Sink connectors process `Vec<SinkRecord>` batches
- 🌊 **Polling or Streaming Sources** - Source connectors can use `poll()` or `start_streaming()`
- 📋 **Schema Registry Integration** - Automatic schema-aware serialization/deserialization
- 🔄 **Multi-Topic Routing** - Handle multiple topics with per-route dispatch and schema policy
- ⚙️ **Automatic Runtime Management** - Lifecycle, retry, metrics, and graceful shutdown
- 📊 **Observability** - Prometheus metrics plus health-check hooks for connectors
- 🛠️ **Message Utilities** - Batching, type conversion, routing, and record context helpers
- ⚡ **Async Runtime** - Built on Tokio for efficient connector execution
## Core Concepts

### Connector Traits

#### SinkConnector (Danube → External System)

- Consume messages from Danube topics
- Receive batches of `SinkRecord`
- Work with typed `serde_json::Value` payloads already deserialized by the runtime
- Write to external databases, APIs, or services using `process_batch()`
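To make the shape of a sink concrete, here is a minimal sketch using a simplified, synchronous stand-in for the SDK's trait. The real `SinkConnector` trait is async and the exact signatures differ; all names below (`LoggingSink`, the stand-in `SinkRecord` fields) are illustrative, not the crate's actual API.

```rust
/// Stand-in for the SDK's record type: a topic plus an already-deserialized
/// payload (the real type exposes `serde_json::Value`).
struct SinkRecord {
    topic: String,
    payload: String,
}

/// Simplified stand-in trait: the runtime hands the connector whole batches.
trait SinkConnector {
    fn process_batch(&mut self, batch: Vec<SinkRecord>) -> Result<(), String>;
}

/// Example sink that "writes" records to an in-memory store.
struct LoggingSink {
    written: Vec<String>,
}

impl SinkConnector for LoggingSink {
    fn process_batch(&mut self, batch: Vec<SinkRecord>) -> Result<(), String> {
        for record in batch {
            // A real connector would write to a database or API here.
            self.written.push(format!("{}: {}", record.topic, record.payload));
        }
        Ok(())
    }
}

fn main() {
    let mut sink = LoggingSink { written: Vec::new() };
    let batch = vec![
        SinkRecord { topic: "/events/users".into(), payload: "{\"id\":1}".into() },
        SinkRecord { topic: "/events/users".into(), payload: "{\"id\":2}".into() },
    ];
    sink.process_batch(batch).unwrap();
    assert_eq!(sink.written.len(), 2);
    println!("wrote {} records", sink.written.len());
}
```

The batch-first signature is the key point: the runtime amortizes I/O by delivering `Vec<SinkRecord>` rather than one record at a time.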
#### SourceConnector (External System → Danube)

- Read from external systems
- Emit `SourceRecord` values wrapped in `SourceEnvelope`
- Support both polling and streaming execution modes
- Let the runtime handle schema-aware serialization before publishing
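A polling source can be sketched the same way, again with simplified synchronous stand-ins for `SourceRecord` and `SourceEnvelope` (the real trait is async and the field layout is illustrative; `CounterSource` is a hypothetical example):

```rust
struct SourceRecord {
    topic: String,
    payload: String,
}

/// Record plus optional checkpoint offset, mirroring `SourceEnvelope`.
struct SourceEnvelope {
    record: SourceRecord,
    offset: Option<u64>,
}

trait SourceConnector {
    /// Called repeatedly by the runtime; an empty batch means "nothing new yet".
    fn poll(&mut self) -> Vec<SourceEnvelope>;
}

/// Example source that emits a fixed number of counter events.
struct CounterSource {
    next: u64,
    max: u64,
}

impl SourceConnector for CounterSource {
    fn poll(&mut self) -> Vec<SourceEnvelope> {
        if self.next >= self.max {
            return Vec::new();
        }
        let n = self.next;
        self.next += 1;
        vec![SourceEnvelope {
            record: SourceRecord {
                topic: "/events/counter".into(),
                payload: format!("{{\"n\":{}}}", n),
            },
            // Checkpoint data lets the runtime resume after a restart.
            offset: Some(n),
        }]
    }
}

fn main() {
    let mut source = CounterSource { next: 0, max: 3 };
    let mut emitted = 0;
    loop {
        let batch = source.poll();
        if batch.is_empty() {
            break;
        }
        emitted += batch.len();
    }
    assert_eq!(emitted, 3);
    println!("emitted {} records", emitted);
}
```

Streaming sources invert the control flow (the connector pushes into a sender instead of being polled), but the envelope-plus-offset shape stays the same.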
### Schema Registry Integration
The runtime automatically handles schema operations - your connector works with typed data:
**For Sink Connectors:**

- Runtime deserializes broker payloads using schema metadata
- `SinkRecord` exposes typed `serde_json::Value` payloads plus schema/context helpers
- You can deserialize to your domain type with `record.as_type::<T>()`
**For Source Connectors:**

- Runtime matches the configured schema policy to the target topic
- Source payloads are serialized according to the configured schema type
- Schemas can be auto-registered and versioned through `SchemaMapping`/`SchemaConfig`
**Benefits:**
- ✅ Zero schema boilerplate in connector code
- ✅ Centralized schema registration and validation
- ✅ Generic record/context APIs that stay platform-agnostic
- ✅ Cached schema lookups for repeated use
## Configuration
Connectors can be configured in two ways:
**1. TOML Configuration** (recommended for standalone connectors):

```toml
# NOTE: key and section names below are illustrative stand-ins - the original
# key names were lost from this document. See the configuration guides for the
# authoritative schema.
broker_url = "http://localhost:6650"
connector_name = "my-connector"

[batch]
max_size = 500
timeout_ms = 1000

[[routes]]
topic = "/events/users"
subject = "user-events-schema"
schema_type = "json_schema"
schema_file = "schemas/user-event.json"
version = "latest"

# Include connector-specific settings
[connector]
server_url = "mqtt://localhost:1883"
```
**2. Programmatic Configuration** (for embedding in applications):

```rust
// Struct fields were elided in the original; fill them in per the
// Programmatic Configuration Guide.
let config = ConnectorConfig { /* ... */ };
```

For file-based loading in standalone connectors, use `ConnectorConfigLoader`.
See Programmatic Configuration Guide for embedding-oriented examples.
The runtime automatically:
- Loads schema definitions from files
- Registers schemas with Danube's schema registry
- Caches schemas for performance
- Handles schema versioning and evolution
## Message Types

### `SinkRecord` - Messages from Danube

- `payload()` - Returns `&serde_json::Value` (typed data, already deserialized)
- `as_type::<T>()` - Deserialize to a specific Rust type
- `schema()` - Schema metadata (subject, version, type)
- `topic()`, `partition()`, `attributes()`, `publish_time()` - Message metadata
- `routing_context()` / `context()` - Generic routing and record metadata helpers
### `SourceRecord` - Messages to Danube

- `new(topic, payload)` - Create from `serde_json::Value`
- `from_json(topic, data)` - Create from any serializable type
- `from_string(topic, text)` - Create from a string
- `from_number(topic, number)` - Create from a numeric value
- `from_avro(topic, data)` - Create from an Avro-compatible struct
- `from_bytes(topic, data)` - Create from binary data
- `with_attribute()`, `with_key()` - Add metadata
- `routing_context()` / `context()` - Inspect generic routing metadata
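The constructor-plus-builder style above can be illustrated with a small std-only stand-in. The field layout here is hypothetical; only the method names (`from_string`, `with_key`, `with_attribute`) mirror the documented API:

```rust
use std::collections::HashMap;

/// Illustrative stand-in mirroring the documented `SourceRecord` builders.
struct SourceRecord {
    topic: String,
    payload: String,
    key: Option<String>,
    attributes: HashMap<String, String>,
}

impl SourceRecord {
    fn from_string(topic: &str, text: &str) -> Self {
        SourceRecord {
            topic: topic.to_string(),
            payload: text.to_string(),
            key: None,
            attributes: HashMap::new(),
        }
    }

    /// Builder helpers consume and return `self`, so calls chain naturally.
    fn with_key(mut self, key: &str) -> Self {
        self.key = Some(key.to_string());
        self
    }

    fn with_attribute(mut self, name: &str, value: &str) -> Self {
        self.attributes.insert(name.to_string(), value.to_string());
        self
    }
}

fn main() {
    let record = SourceRecord::from_string("/events/users", "hello")
        .with_key("user-42")
        .with_attribute("origin", "mqtt");
    assert_eq!(record.topic, "/events/users");
    assert_eq!(record.key.as_deref(), Some("user-42"));
    assert_eq!(record.attributes["origin"], "mqtt");
    println!("built record for {} ({} bytes)", record.topic, record.payload.len());
}
```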
### `SourceEnvelope` - Source records plus optional checkpoint information

- `SourceEnvelope::new(record)` - Emit a record with no offset
- `SourceEnvelope::with_offset(record, offset)` - Emit a record plus checkpoint data
- Used by `SourceConnector::poll()` and `SourceSender`
## Utilities

- `Batcher<T>` - Message batching with size/timeout-based flushing
- `HealthChecker` - Health tracking with failure thresholds
- `ConnectorMetrics` - Prometheus metrics integration
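The size/timeout flushing behavior of `Batcher<T>` can be sketched in a few lines of std-only Rust. This is a simplified illustration of the pattern, not the crate's actual implementation:

```rust
use std::time::{Duration, Instant};

/// Minimal sketch of a size/timeout batcher (illustrative, not the SDK type).
struct Batcher<T> {
    items: Vec<T>,
    max_size: usize,
    max_wait: Duration,
    started: Instant,
}

impl<T> Batcher<T> {
    fn new(max_size: usize, max_wait: Duration) -> Self {
        Batcher { items: Vec::new(), max_size, max_wait, started: Instant::now() }
    }

    /// Add an item; returns a full batch once the size threshold is reached.
    fn push(&mut self, item: T) -> Option<Vec<T>> {
        self.items.push(item);
        if self.items.len() >= self.max_size { self.flush() } else { None }
    }

    /// Flush early if buffered items have waited longer than `max_wait`.
    fn poll_timeout(&mut self) -> Option<Vec<T>> {
        if !self.items.is_empty() && self.started.elapsed() >= self.max_wait {
            self.flush()
        } else {
            None
        }
    }

    fn flush(&mut self) -> Option<Vec<T>> {
        if self.items.is_empty() {
            return None;
        }
        self.started = Instant::now();
        Some(std::mem::take(&mut self.items))
    }
}

fn main() {
    let mut batcher = Batcher::new(3, Duration::from_millis(500));
    assert!(batcher.push("a").is_none());
    assert!(batcher.push("b").is_none());
    let batch = batcher.push("c").expect("size threshold reached");
    assert_eq!(batch, vec!["a", "b", "c"]);
    println!("flushed {} items", batch.len());
}
```

Size-triggered flushes keep throughput high under load, while the timeout path bounds latency when traffic is sparse; a real connector loop would call both.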
## Installation

Add to your `Cargo.toml`:
```toml
[dependencies]
danube-connect-core = "0.5"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
## Documentation

### Getting Started
- 📖 Connector Overview - Introduction and key concepts
- 🏗️ Architecture Guide - Design, patterns, and schema registry
- 🛠️ Sink Connector Development Guide - Build batch-first sink connectors
- 🛠️ Source Connector Development Guide - Build polling or streaming source connectors
- ⚙️ Programmatic Configuration - Embed runtimes in your own application
### API Reference
- 📚 API Documentation - Complete API reference on docs.rs
### Examples & Source Code

- 🔌 **Production Connectors** - Reference implementations:
  - MQTT Source Connector (IoT integration)
  - HTTP/Webhook Source Connector (API ingestion)
  - Qdrant Sink Connector (Vector embeddings)
  - SurrealDB Sink Connector (Multi-model database)
  - Delta Lake Sink Connector (Data lake)
- 💻 **SDK Source Code** - Core framework implementation
## Performance & Best Practices
danube-connect-core is optimized for production workloads:
- Async/await - Built on Tokio for efficient async I/O
- Schema caching - Automatic in-memory schema lookups for efficient reuse
- Batching - Configurable sink batching and source publish loops
- Runtime reuse - Shared runtime logic for retries, metrics, and lifecycle
- Metrics - Built-in Prometheus metrics for monitoring and alerting
- Context helpers - Generic routing and record context without platform-specific coupling
See the connector development guides above for schema evolution strategies and deployment patterns.
## Contributing
Contributions are welcome! Here's how you can help:
- Core SDK Improvements: Enhance the connector framework, add new features, or improve performance
- Documentation: Improve guides, add examples, or clarify concepts
- Bug Fixes: Report and fix issues
- Testing: Add test coverage and integration tests
Please open an issue or pull request on GitHub.
## License
Licensed under the Apache License, Version 2.0. See LICENSE for details.
## Ecosystem
- Danube Messaging - High-performance message broker
- Danube Connectors - Connector implementations
- Danube Documentation - Complete documentation portal
## Resources
- 📦 Crates.io: danube-connect-core
- 📚 API Docs: docs.rs/danube-connect-core
- 🐛 Issues: GitHub Issues
- 💬 Discussions: GitHub Discussions