danube_connect_core/config.rs

//! Configuration management for connectors.

use crate::{ConnectorError, ConnectorResult};
use danube_client::SubType;
use serde::{Deserialize, Serialize};
use std::env;
use std::path::PathBuf;

/// Main configuration for connectors
///
/// Can be created via:
/// - `ConnectorConfig::from_env()` - load from environment variables
/// - `ConnectorConfig::from_file()` - load from TOML file
/// - Direct construction in code for full programmatic control
///
/// # Structure
/// - **Mandatory fields**: `danube_service_url`, `connector_name`
/// - **Optional fields**: `retry`, `processing`, `schemas` (with defaults)
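///
/// # Example
///
/// A minimal sketch of direct construction; the URL and name are placeholders and
/// the import path is assumed from the crate layout:
///
/// ```ignore
/// use danube_connect_core::config::ConnectorConfig; // path assumed
///
/// let config = ConnectorConfig {
///     danube_service_url: "http://localhost:6650".to_string(),
///     connector_name: "my-connector".to_string(),
///     ..Default::default()
/// };
/// ```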
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectorConfig {
    /// Danube broker service URL (mandatory, from DANUBE_SERVICE_URL env var)
    pub danube_service_url: String,

    /// Connector name (mandatory, from CONNECTOR_NAME env var, must be unique)
    pub connector_name: String,

    /// Retry settings (optional, from config file or defaults)
    #[serde(default)]
    pub retry: RetrySettings,

    /// Processing and runtime settings (optional, from config file or defaults)
    #[serde(default)]
    pub processing: ProcessingSettings,

    /// Schema mappings for topics (optional, for source connectors using schema registry)
    #[serde(default)]
    pub schemas: Vec<SchemaMapping>,
}

impl ConnectorConfig {
    /// Load mandatory configuration from environment variables
    ///
    /// Only reads mandatory fields:
    /// - `DANUBE_SERVICE_URL`: Danube broker URL (required)
    /// - `CONNECTOR_NAME`: Unique connector name (required)
    ///
    /// All retry and processing settings use defaults.
    /// To customize these, load from a config file or set them explicitly.
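    ///
    /// # Example
    ///
    /// A minimal sketch; the environment variable values are placeholders and the
    /// import path is assumed from the crate layout:
    ///
    /// ```ignore
    /// use danube_connect_core::config::ConnectorConfig; // path assumed
    ///
    /// // Assuming the process environment contains, for example:
    /// //   DANUBE_SERVICE_URL=http://localhost:6650
    /// //   CONNECTOR_NAME=my-connector
    /// let config = ConnectorConfig::from_env().expect("mandatory env vars are set");
    /// assert_eq!(config.retry.max_retries, 3); // optional settings fall back to defaults
    /// ```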
    pub fn from_env() -> ConnectorResult<Self> {
        let danube_service_url = env::var("DANUBE_SERVICE_URL")
            .map_err(|_| ConnectorError::config("DANUBE_SERVICE_URL is required"))?;

        let connector_name = env::var("CONNECTOR_NAME")
            .map_err(|_| ConnectorError::config("CONNECTOR_NAME is required"))?;

        Ok(Self {
            danube_service_url,
            connector_name,
            retry: RetrySettings::default(),
            processing: ProcessingSettings::default(),
            schemas: Vec::new(),
        })
    }

    /// Load configuration from a TOML file
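    ///
    /// # Example
    ///
    /// A minimal sketch; the file name is a placeholder and the import path is
    /// assumed from the crate layout:
    ///
    /// ```ignore
    /// use danube_connect_core::config::ConnectorConfig; // path assumed
    ///
    /// // The file must define the mandatory `danube_service_url` and `connector_name`
    /// // keys; `[retry]`, `[processing]`, and `[[schemas]]` sections are optional.
    /// let config = ConnectorConfig::from_file("connector.toml").expect("readable, valid TOML");
    /// ```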
    pub fn from_file(path: &str) -> ConnectorResult<Self> {
        let content = std::fs::read_to_string(path).map_err(|e| {
            ConnectorError::config(format!("Failed to read config file {}: {}", path, e))
        })?;

        toml::from_str(&content).map_err(|e| {
            ConnectorError::config(format!("Failed to parse config file {}: {}", path, e))
        })
    }

    /// Apply environment variable overrides to mandatory fields only
    ///
    /// This only overrides `danube_service_url` and `connector_name`.
    /// Retry and processing settings should come from config files, not env vars.
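    ///
    /// # Example
    ///
    /// A typical sketch: load settings from a file, then let the deployment
    /// environment override the mandatory fields (file name is a placeholder):
    ///
    /// ```ignore
    /// use danube_connect_core::config::ConnectorConfig; // path assumed
    ///
    /// let mut config = ConnectorConfig::from_file("connector.toml").expect("readable, valid TOML");
    /// config.apply_env_overrides(); // DANUBE_SERVICE_URL / CONNECTOR_NAME win when set
    /// ```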
    pub fn apply_env_overrides(&mut self) {
        if let Ok(val) = env::var("DANUBE_SERVICE_URL") {
            self.danube_service_url = val;
        }
        if let Ok(val) = env::var("CONNECTOR_NAME") {
            self.connector_name = val;
        }
    }

    /// Validate the configuration
    ///
    /// Called internally by the runtime before the connector starts; crate-internal, not part of the public API.
    pub(crate) fn validate(&self) -> ConnectorResult<()> {
        if self.danube_service_url.is_empty() {
            return Err(ConnectorError::config("danube_service_url cannot be empty"));
        }

        if self.connector_name.is_empty() {
            return Err(ConnectorError::config("connector_name cannot be empty"));
        }

        if self.retry.max_retries > 100 {
            return Err(ConnectorError::config("max_retries too high (max 100)"));
        }

        if self.processing.batch_size == 0 {
            return Err(ConnectorError::config("batch_size must be > 0"));
        }

        Ok(())
    }
}

impl Default for ConnectorConfig {
    fn default() -> Self {
        Self {
            danube_service_url: "http://localhost:6650".to_string(),
            connector_name: "default-connector".to_string(),
            retry: RetrySettings::default(),
            processing: ProcessingSettings::default(),
            schemas: Vec::new(),
        }
    }
}

/// Retry configuration settings
///
/// Can be used via:
/// - TOML config file: `[retry]` section
/// - Direct construction in code for programmatic control
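///
/// # Example
///
/// A sketch of deserializing a `[retry]` section; the values are placeholders and
/// omitted keys fall back to their serde defaults:
///
/// ```ignore
/// use danube_connect_core::config::RetrySettings; // path assumed
///
/// let retry: RetrySettings = toml::from_str(r#"
///     max_retries = 5
///     retry_backoff_ms = 500
/// "#).expect("valid retry settings");
///
/// assert_eq!(retry.max_backoff_ms, 30_000); // default applied for the omitted key
/// ```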
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetrySettings {
    /// Maximum number of retries for failed operations
    #[serde(default = "default_max_retries")]
    pub max_retries: u32,

    /// Base backoff duration in milliseconds
    #[serde(default = "default_retry_backoff_ms")]
    pub retry_backoff_ms: u64,

    /// Maximum backoff duration in milliseconds
    #[serde(default = "default_max_backoff_ms")]
    pub max_backoff_ms: u64,
}

fn default_max_retries() -> u32 {
    3
}
fn default_retry_backoff_ms() -> u64 {
    1000
}
fn default_max_backoff_ms() -> u64 {
    30000
}

impl Default for RetrySettings {
    fn default() -> Self {
        Self {
            max_retries: 3,
            retry_backoff_ms: 1000,
            max_backoff_ms: 30000,
        }
    }
}

/// Processing and runtime configuration settings
///
/// Can be used via:
/// - TOML config file: `[processing]` section
/// - Direct construction in code for programmatic control
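///
/// # Example
///
/// A sketch of programmatic construction, overriding only the fields that differ
/// from the defaults (values are placeholders):
///
/// ```ignore
/// use danube_connect_core::config::ProcessingSettings; // path assumed
///
/// let processing = ProcessingSettings {
///     batch_size: 500,
///     poll_interval_ms: 250,
///     ..Default::default()
/// };
/// ```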
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingSettings {
    /// Batch size for batch processing
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,

    /// Batch timeout in milliseconds
    #[serde(default = "default_batch_timeout_ms")]
    pub batch_timeout_ms: u64,

    /// Poll interval in milliseconds for source connectors
    #[serde(default = "default_poll_interval_ms")]
    pub poll_interval_ms: u64,

    /// Metrics export port
    #[serde(default = "default_metrics_port")]
    pub metrics_port: u16,

    /// Log level
    #[serde(default = "default_log_level")]
    pub log_level: String,
}

fn default_batch_size() -> usize {
    1000
}
fn default_batch_timeout_ms() -> u64 {
    1000
}
fn default_poll_interval_ms() -> u64 {
    100
}
fn default_metrics_port() -> u16 {
    9090
}
fn default_log_level() -> String {
    "info".to_string()
}

impl Default for ProcessingSettings {
    fn default() -> Self {
        Self {
            batch_size: 1000,
            batch_timeout_ms: 1000,
            poll_interval_ms: 100,
            metrics_port: 9090,
            log_level: "info".to_string(),
        }
    }
}

/// Schema mapping configuration for topics
///
/// Maps a topic to its schema definition for source connectors using schema registry.
///
/// Can be used via:
/// - TOML config file: `[[schemas]]` array
/// - Direct construction in code for programmatic control
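///
/// # Example
///
/// A sketch of one `[[schemas]]` entry; topic, subject, and file path are placeholders
/// and the import path is assumed from the crate layout:
///
/// ```ignore
/// use danube_connect_core::config::SchemaMapping; // path assumed
///
/// let mapping: SchemaMapping = toml::from_str(r#"
///     topic = "/default/orders"
///     subject = "orders-value"
///     schema_type = "json_schema"
///     schema_file = "schemas/orders.json"
/// "#).expect("valid schema mapping");
///
/// assert!(mapping.auto_register); // defaults to true when omitted
/// ```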
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaMapping {
    /// Danube topic name (format: /{namespace}/{topic_name})
    pub topic: String,

    /// Schema subject name in the registry
    pub subject: String,

    /// Schema type (e.g., "json_schema", "avro", "protobuf")
    pub schema_type: String,

    /// Path to schema definition file
    pub schema_file: PathBuf,

    /// Auto-register schema on startup if it doesn't exist
    #[serde(default = "default_auto_register")]
    pub auto_register: bool,

    /// Version strategy for this schema
    #[serde(default)]
    pub version_strategy: VersionStrategy,
}

fn default_auto_register() -> bool {
    true
}

/// Subscription type for configuration
///
/// **Mandatory public API** - required for `ConsumerConfig`.
///
/// Mirrors `SubType` from danube-client but with Serialize/Deserialize for config files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SubscriptionType {
    Exclusive,
    Shared,
    FailOver,
}

impl From<SubscriptionType> for SubType {
    fn from(st: SubscriptionType) -> Self {
        match st {
            SubscriptionType::Exclusive => SubType::Exclusive,
            SubscriptionType::Shared => SubType::Shared,
            SubscriptionType::FailOver => SubType::FailOver,
        }
    }
}

/// Configuration for a Danube consumer
///
/// **Mandatory public API** - required by the `SinkConnector::consumer_configs()` trait method.
///
/// Specifies how to create a consumer for a specific topic, including subscription settings.
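///
/// # Example
///
/// A sketch of the configuration a sink connector might return; topic and names are
/// placeholders and the import paths are assumed from the crate layout:
///
/// ```ignore
/// use danube_connect_core::config::{ConsumerConfig, SubscriptionType}; // paths assumed
///
/// let consumer = ConsumerConfig {
///     topic: "/default/orders".to_string(),
///     consumer_name: "orders-sink".to_string(),
///     subscription: "orders-sink-sub".to_string(),
///     subscription_type: SubscriptionType::Shared,
///     expected_schema_subject: None,
/// };
/// ```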
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
    /// Danube topic to consume from (format: /{namespace}/{topic_name})
    pub topic: String,
    /// Consumer name (for identification)
    pub consumer_name: String,
    /// Subscription name (shared across consumer instances)
    pub subscription: String,
    /// Subscription type (Exclusive, Shared, FailOver)
    pub subscription_type: SubscriptionType,
    /// Optional: Expected schema subject for validation
    /// If set, runtime will validate that incoming messages match this schema
    pub expected_schema_subject: Option<String>,
}

/// Configuration for a Danube producer
///
/// **Mandatory public API** - required by the `SourceConnector::producer_configs()` trait method.
///
/// Specifies how to create a producer for a specific topic, including partitioning
/// and reliability settings.
///
/// Note: The `schema_config` field is internal and populated by the runtime from `SchemaMapping`.
/// Always use `ProducerConfig::new()` or set `schema_config` to `None` when constructing manually.
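///
/// # Example
///
/// A sketch of the configuration a source connector might return; the topic is a
/// placeholder and the import path is assumed from the crate layout:
///
/// ```ignore
/// use danube_connect_core::config::ProducerConfig; // path assumed
///
/// // Non-partitioned topic with reliable dispatch enabled.
/// let producer = ProducerConfig::new("/default/events", 0, true);
/// assert!(producer.schema_config.is_none()); // filled in later by the runtime
/// ```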
#[derive(Debug, Clone)]
pub struct ProducerConfig {
    /// Danube topic name (format: /{namespace}/{topic_name})
    pub topic: String,
    /// Number of partitions (0 = non-partitioned)
    pub partitions: usize,
    /// Use reliable dispatch (WAL + Cloud persistence)
    pub reliable_dispatch: bool,
    /// Internal: Schema configuration (populated by runtime from SchemaMapping)
    /// Users should not set this - always use None
    pub schema_config: Option<SchemaConfig>,
}

impl ProducerConfig {
    /// Create a new ProducerConfig
    pub fn new(topic: impl Into<String>, partitions: usize, reliable_dispatch: bool) -> Self {
        Self {
            topic: topic.into(),
            partitions,
            reliable_dispatch,
            schema_config: None,
        }
    }
}

/// Schema configuration for a topic
///
/// Can be used via:
/// - TOML config file: Use `SchemaMapping` in `[[schemas]]` (recommended)
/// - Direct construction in code for advanced programmatic control
///
/// Note: Usually populated automatically from `SchemaMapping` by the runtime.
///
/// The runtime converts `SchemaMapping` → `SchemaConfig` automatically when loading from files.
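///
/// # Example
///
/// A sketch of direct construction for advanced programmatic use; subject, type, and
/// file path are placeholders and the import paths are assumed from the crate layout:
///
/// ```ignore
/// use danube_connect_core::config::{SchemaConfig, VersionStrategy}; // paths assumed
/// use std::path::PathBuf;
///
/// let schema = SchemaConfig {
///     subject: "orders-value".to_string(),
///     schema_type: "json_schema".to_string(),
///     schema_file: PathBuf::from("schemas/orders.json"),
///     auto_register: true,
///     version_strategy: VersionStrategy::Latest,
/// };
/// ```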
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchemaConfig {
    /// Schema subject name in the registry
    pub subject: String,

    /// Schema type (JsonSchema, Avro, Protobuf, etc.)
    pub schema_type: String,

    /// Path to schema definition file
    pub schema_file: PathBuf,

    /// Auto-register schema on startup if it doesn't exist
    #[serde(default = "default_schema_auto_register")]
    pub auto_register: bool,

    /// Version strategy for producers
    #[serde(default)]
    pub version_strategy: VersionStrategy,
}

fn default_schema_auto_register() -> bool {
    true
}

/// Strategy for selecting schema version
///
/// Used in `SchemaMapping` and `SchemaConfig` to control which schema version producers use.
///
/// Can be used via:
/// - TOML config file: `version_strategy` field in `[[schemas]]`
/// - Direct construction in code for programmatic control
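///
/// # Example
///
/// With serde's default externally tagged representation and the `lowercase` rename,
/// the variants would typically appear in TOML as `version_strategy = "latest"`,
/// `version_strategy = { pinned = 3 }`, or `version_strategy = { minimum = 2 }`.
///
/// ```ignore
/// use danube_connect_core::config::VersionStrategy; // path assumed
///
/// let strategy = VersionStrategy::default();
/// assert!(matches!(strategy, VersionStrategy::Latest)); // Latest is the default
/// ```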
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum VersionStrategy {
    /// Use the latest schema version (default)
    #[default]
    Latest,

    /// Pin to a specific schema version
    Pinned(u32),

    /// Use minimum version or newer
    Minimum(u32),
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_config_default() {
        let config = ConnectorConfig::default();
        assert_eq!(config.danube_service_url, "http://localhost:6650");
        assert_eq!(config.connector_name, "default-connector");
        assert_eq!(config.retry.max_retries, 3);
        assert_eq!(config.processing.batch_size, 1000);
    }

    #[test]
    fn test_config_validation() {
        let mut config = ConnectorConfig::default();
        assert!(config.validate().is_ok());

        config.danube_service_url = "".to_string();
        assert!(config.validate().is_err());

        config.danube_service_url = "http://localhost:6650".to_string();
        config.processing.batch_size = 0;
        assert!(config.validate().is_err());
    }
}