# danube-connect-core
Core SDK for building high-performance connectors for the Danube messaging system.
## Overview

`danube-connect-core` is a production-ready framework for building connectors that integrate Danube with external systems. Whether you're building a sink connector to export messages or a source connector to import data, this SDK provides everything you need.
## Key Features

- 🔌 Simple Trait-Based API - Implement just the `SinkConnector` or `SourceConnector` traits
- 📋 Schema Registry Integration - Automatic schema-aware serialization/deserialization
- 🔄 Multi-Topic Support - Handle multiple topics with different schemas per connector
- 🎯 Zero Schema Boilerplate - Runtime handles all schema operations transparently
- ⚙️ Automatic Runtime Management - Lifecycle, message loops, and graceful shutdown
- 🔁 Built-in Retry Logic - Exponential backoff with jitter for resilient integrations
- 📊 Observability - Prometheus metrics, structured logging, and health checks
- 🛠️ Message Utilities - Batching, type conversion, and validation helpers
- ⚡ High Performance - Async/await with Tokio, connection pooling, schema caching
## Core Concepts

### Connector Traits

#### `SinkConnector` (Danube → External System)

- Consume messages from Danube topics
- Receive typed `serde_json::Value` data (already deserialized by the runtime)
- Write to external databases, APIs, or services

#### `SourceConnector` (External System → Danube)

- Read from external systems
- Provide typed `serde_json::Value` data
- Runtime handles schema-aware serialization before publishing
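As a rough sketch of what an implementation looks like (the exact trait method names, import paths, and error types are defined in the API docs on docs.rs; the ones below are illustrative assumptions):

```rust
use async_trait::async_trait;
// Import paths and the trait's method signature are assumptions for
// illustration; see the docs.rs API reference for the real definitions.
use danube_connect_core::{SinkConnector, SinkRecord};

/// A toy sink that writes every record to stdout.
struct StdoutSink;

#[async_trait]
impl SinkConnector for StdoutSink {
    // Hypothetical signature: the runtime delivers an already-deserialized
    // record, so the connector only has to write it somewhere.
    async fn write(
        &mut self,
        record: SinkRecord,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        println!("{}: {}", record.topic(), record.payload());
        Ok(())
    }
}
```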
### Schema Registry Integration

The runtime handles all schema operations automatically; your connector works with typed data:

**For Sink Connectors:**

- Runtime deserializes messages based on their schema
- You receive a `SinkRecord` with a typed `serde_json::Value` payload
- Access data directly, or deserialize to structs with `as_type<T>()`
- Schema info is available via `record.schema()`

**For Source Connectors:**

- Create a `SourceRecord` with typed data using helper methods
- Runtime serializes based on the topic's configured schema
- Automatic schema registration and validation
- Support for JSON Schema, String, Bytes, Number, and Avro (Protobuf coming soon)
**Benefits:**
- ✅ Zero schema boilerplate in connector code
- ✅ Type safety and data validation
- ✅ Managed schema evolution with version strategies
- ✅ Automatic caching for performance
- ✅ Centralized schema management
### Runtime Management

Both `SinkRuntime` and `SourceRuntime` handle all of the following; a bootstrap sketch follows the list:
- Schema registry client initialization and caching
- Schema-aware serialization/deserialization
- Message polling and processing loops
- Automatic retry with exponential backoff
- Graceful shutdown and signal handling
- Prometheus metrics and health checks
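Putting it together, a standalone connector binary can be as small as the following sketch, reusing the `StdoutSink` from the sketch above (entry-point names like `SinkRuntime::new` and `ConnectorConfig::from_file` are assumptions; the Development Guide shows the canonical bootstrap):

```rust
use danube_connect_core::{ConnectorConfig, SinkRuntime}; // paths illustrative

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical constructors; check the API docs for the real entry points.
    let config = ConnectorConfig::from_file("connector.toml")?;
    // The runtime owns the message loop, retries, metrics, and graceful
    // shutdown described above; the connector only implements the trait.
    SinkRuntime::new(config, StdoutSink).run().await?;
    Ok(())
}
```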
## Configuration
Connectors can be configured in two ways:
1. **TOML Configuration** (recommended for standalone connectors):

```toml
# NOTE: key names in this sketch are illustrative; consult the guides below
# for the exact configuration schema.
danube_uri = "http://localhost:6650"
connector_name = "my-connector"

[[topics]]
topic = "/events/users"
schema_subject = "user-events-schema"
schema_type = "json_schema"
schema_file = "schemas/user-event.json"
version_strategy = "latest" # or "pinned" or "minimum"

# Include connector-specific settings
[connector]
broker_url = "mqtt://localhost:1883"
```
2. **Programmatic Configuration** (for embedding in applications):

```rust
let config = ConnectorConfig { /* ... */ };
```
See the Programmatic Configuration Guide for complete details.
The runtime automatically:
- Loads schema definitions from files
- Registers schemas with Danube's schema registry
- Caches schemas for performance
- Handles schema versioning and evolution
## Message Types

### `SinkRecord` - Messages from Danube

- `payload()` - Returns `&serde_json::Value` (typed data, already deserialized)
- `as_type<T>()` - Deserialize to a specific Rust type
- `schema()` - Schema metadata (subject, version, type)
- `topic()`, `offset()`, `attributes()`, `publish_time()` - Message metadata
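For example, a handler might use these accessors like this (the `UserEvent` struct and the error plumbing are made up for illustration; the accessors are the ones listed above):

```rust
use danube_connect_core::SinkRecord; // path illustrative
use serde::Deserialize;

#[derive(Deserialize)]
struct UserEvent {
    user_id: String,
    action: String,
}

fn handle(record: &SinkRecord) -> Result<(), Box<dyn std::error::Error>> {
    // Inspect the raw JSON payload (already deserialized by the runtime)...
    let raw: &serde_json::Value = record.payload();
    println!("raw payload on {}: {}", record.topic(), raw);

    // ...or map it straight onto a typed struct.
    let event: UserEvent = record.as_type()?;
    println!("user {} performed {}", event.user_id, event.action);
    Ok(())
}
```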
### `SourceRecord` - Messages to Danube

- `new(topic, payload)` - Create from `serde_json::Value`
- `from_json(topic, data)` - Create from any serializable type
- `from_string(topic, text)` - Create from a string
- `from_number(topic, number)` - Create from a numeric value
- `from_avro(topic, data)` - Create from an Avro-compatible struct
- `from_bytes(topic, data)` - Create from binary data
- `with_attribute()`, `with_key()` - Add metadata
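A quick sketch of these helpers in use (the topic names and the `Reading` struct are invented, and whether each helper returns a `Result` is an assumption; check the API docs):

```rust
use danube_connect_core::SourceRecord; // path illustrative
use serde::Serialize;

#[derive(Serialize)]
struct Reading {
    sensor: String,
    value: f64,
}

fn poll_once() -> Result<Vec<SourceRecord>, Box<dyn std::error::Error>> {
    let reading = Reading { sensor: "t1".into(), value: 21.5 };
    Ok(vec![
        // From any serializable type; the runtime serializes it against the
        // topic's configured schema before publishing.
        SourceRecord::from_json("/iot/readings", &reading)?.with_key("t1"),
        // Plain string and numeric payloads have dedicated helpers.
        SourceRecord::from_string("/iot/logs", "sensor t1 online"),
        SourceRecord::from_number("/iot/metrics", 42),
    ])
}
```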
## Utilities

- `Batcher<T>` - Message batching with size/timeout-based flushing
- `HealthChecker` - Health tracking with failure thresholds
- `ConnectorMetrics` - Prometheus metrics integration
## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
danube-connect-core = "0.3"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1" # assumed here; used to implement the async connector traits
```
## Documentation

### Getting Started
- 📖 Connector Overview - Introduction and key concepts
- 🏗️ Architecture Guide - Design, patterns, and schema registry
- 🛠️ Development Guide - Build your own connector with schema registry
- ⚙️ Programmatic Configuration - Use connectors in your application without TOML files
### API Reference
- 📚 API Documentation - Complete API reference on docs.rs
### Examples & Source Code

- 🔌 Production Connectors - Reference implementations:
  - MQTT Source Connector (IoT integration)
  - HTTP/Webhook Source Connector (API ingestion)
  - Qdrant Sink Connector (Vector embeddings)
  - SurrealDB Sink Connector (Multi-model database)
  - Delta Lake Sink Connector (Data lake)
- 💻 SDK Source Code - Core framework implementation
## Performance & Best Practices

`danube-connect-core` is optimized for production workloads:
- Async/await - Built on Tokio for efficient async I/O
- Schema caching - Automatic in-memory schema cache with LRU eviction
- Batching - Process messages in configurable batches for higher throughput
- Connection pooling - Reuse connections to external systems and Danube
- Metrics - Built-in Prometheus metrics for monitoring and alerting
- Health checks - HTTP endpoint for Kubernetes liveness/readiness probes
See the Development Guide for schema evolution strategies and production deployment patterns.
## Contributing
Contributions are welcome! Here's how you can help:
- Core SDK Improvements: Enhance the connector framework, add new features, or improve performance
- Documentation: Improve guides, add examples, or clarify concepts
- Bug Fixes: Report and fix issues
- Testing: Add test coverage and integration tests
Please open an issue or pull request on GitHub.
## License
Licensed under the Apache License, Version 2.0. See LICENSE for details.
## Ecosystem
- Danube Messaging - High-performance message broker
- Danube Connectors - Connector implementations
- Danube Documentation - Complete documentation portal
## Resources
- 📦 Crates.io: danube-connect-core
- 📚 API Docs: docs.rs/danube-connect-core
- 🐛 Issues: GitHub Issues
- 💬 Discussions: GitHub Discussions