danube_connect_core/
lib.rs

1//! # Danube Connect Core
2//!
3//! Core SDK for building Danube connectors.
4//!
5//! This library provides the foundational framework for creating connectors that integrate
6//! external systems with Danube Messaging. It handles all the complexity of communicating
7//! with Danube brokers, managing lifecycles, retries, and observability, allowing connector
8//! developers to focus solely on the integration logic.
9//!
10//! ## Overview
11//!
12//! Connectors are standalone processes that either:
13//! - **Sink**: Consume messages from Danube and write to an external system
14//! - **Source**: Read from an external system and publish to Danube
15//!
16//! ## Quick Start
17//!
18//! ```rust,no_run
19//! use danube_connect_core::{SinkConnector, SinkRecord, ConnectorConfig, ConnectorResult, ConsumerConfig, SubscriptionType};
20//! use async_trait::async_trait;
21//!
22//! pub struct MyConnector;
23//!
24//! #[async_trait]
25//! impl SinkConnector for MyConnector {
26//!     async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()> {
27//!         // Setup your connector
28//!         Ok(())
29//!     }
30//!     
31//!     async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>> {
32//!         Ok(vec![ConsumerConfig {
33//!             topic: "/default/my-topic".to_string(),
34//!             consumer_name: "my-consumer".to_string(),
35//!             subscription: "my-sub".to_string(),
36//!             subscription_type: SubscriptionType::Exclusive,
37//!             expected_schema_subject: None,
38//!         }])
39//!     }
40//!     
41//!     async fn process(&mut self, record: SinkRecord) -> ConnectorResult<()> {
42//!         // Process message
43//!         println!("Got message: {:?}", record.payload());
44//!         Ok(())
45//!     }
46//! }
47//! ```
48//!
49//! ## Features
50//!
51//! - **Automatic Lifecycle Management**: The runtime handles initialization, message loops, and shutdown
52//! - **Built-in Retry Logic**: Configurable exponential backoff for transient failures
53//! - **Message Transformation**: Helpers for JSON, binary, and schema-based transformations
54//! - **Observability**: Automatic metrics, structured logging, and health checks
55//! - **Configuration**: Standard environment variable and file-based configuration
56
57mod config;
58mod error;
59mod message;
60mod metrics;
61mod retry;
62mod runtime;
63mod schema;
64mod traits;
65pub mod utils;
66
67// Re-export public API
68pub use config::{
69    ConnectorConfig, ConsumerConfig, ProcessingSettings, ProducerConfig, RetrySettings,
70    SchemaConfig, SchemaMapping, SubscriptionType, VersionStrategy,
71};
72pub use error::{ConnectorError, ConnectorResult};
73pub use message::{DanubeMetadata, SinkRecord, SourceRecord};
74pub use metrics::ConnectorMetrics;
75pub use runtime::{SinkRuntime, SourceRuntime};
76pub use schema::SchemaType;
77pub use traits::{Offset, SinkConnector, SourceConnector};
78pub use utils::{Batcher, HealthChecker, HealthStatus};
79
80// Re-export commonly used types from danube-client
81pub use danube_client::{SchemaInfo, SubType};
82
83// Version info
84pub const VERSION: &str = env!("CARGO_PKG_VERSION");
85pub const NAME: &str = env!("CARGO_PKG_NAME");