Skip to main content

danube_connect_core/
config.rs

1//! Configuration management for connectors.
2
3use crate::route::{SinkRoute, SourceRoute};
4use crate::{ConnectorError, ConnectorResult};
5use danube_client::SubType;
6use serde::de::DeserializeOwned;
7use serde::{Deserialize, Serialize};
8use std::env;
9use std::fs;
10use std::path::{Path, PathBuf};
11
12/// Main configuration for connectors
13///
14/// Can be created via:
15/// - `ConnectorConfig::from_env()` - load from environment variables
16/// - `ConnectorConfig::from_file()` - load from TOML file
17/// - Direct construction in code for full programmatic control
18///
19/// # Structure
20/// - **Mandatory fields**: `danube_service_url`, `connector_name`
21/// - **Optional fields**: `retry`, `processing`, `schemas` (with defaults)
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct ConnectorConfig {
24    /// Danube broker service URL (mandatory, from DANUBE_SERVICE_URL env var)
25    pub danube_service_url: String,
26
27    /// Connector name (mandatory, from CONNECTOR_NAME env var, must be unique)
28    pub connector_name: String,
29
30    /// Retry settings (optional, from config file or defaults)
31    #[serde(default)]
32    pub retry: RetrySettings,
33
34    /// Processing and runtime settings (optional, from config file or defaults)
35    #[serde(default)]
36    pub processing: ProcessingSettings,
37
38    /// Schema mappings for topics (optional, for source connectors using schema registry)
39    #[serde(default)]
40    pub schemas: Vec<SchemaMapping>,
41}
42
43/// Trait for applying environment-variable overrides after deserialization.
44///
45/// Implementors can override this trait to apply environment-specific settings
46/// to their configuration values.
47pub trait ConfigEnvOverrides {
48    /// Apply environment-specific overrides to a deserialized configuration value.
49    fn apply_env_overrides(&mut self) -> ConnectorResult<()> {
50        Ok(())
51    }
52}
53
54/// Trait for validating configuration values after loading.
55///
56/// Implementors can override this trait to validate their configuration values
57/// and return an error for invalid settings.
58pub trait ConfigValidate {
59    /// Validate the loaded configuration and return an error for invalid settings.
60    fn validate_config(&self) -> ConnectorResult<()> {
61        Ok(())
62    }
63}
64
/// Loads any deserializable configuration type from a TOML document.
///
/// The document can come from a file named by an environment variable, from an
/// explicit path, or from an in-memory string. After parsing, the loader runs
/// the type's environment overrides and then its validation.
#[derive(Clone, Debug)]
pub struct ConnectorConfigLoader {
    /// Name of the environment variable that holds the configuration file path.
    config_path_env: String,
}
72
73impl Default for ConnectorConfigLoader {
74    fn default() -> Self {
75        Self {
76            config_path_env: "CONNECTOR_CONFIG_PATH".to_string(),
77        }
78    }
79}
80
81impl ConnectorConfigLoader {
82    /// Create a loader that reads the configuration path from `CONNECTOR_CONFIG_PATH`.
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    /// Override the environment variable used to locate the configuration file.
88    pub fn with_path_env(mut self, env_var: impl Into<String>) -> Self {
89        self.config_path_env = env_var.into();
90        self
91    }
92
93    /// Load a configuration value from the configured path environment variable.
94    pub fn load<T>(&self) -> ConnectorResult<T>
95    where
96        T: DeserializeOwned + ConfigEnvOverrides + ConfigValidate,
97    {
98        let config_path = env::var(&self.config_path_env).map_err(|_| {
99            ConnectorError::config(format!(
100                "{} environment variable must be set to the path of the TOML configuration file",
101                self.config_path_env
102            ))
103        })?;
104
105        self.from_file(config_path)
106    }
107
108    /// Load a configuration value from a TOML file path.
109    pub fn from_file<T>(&self, path: impl AsRef<Path>) -> ConnectorResult<T>
110    where
111        T: DeserializeOwned + ConfigEnvOverrides + ConfigValidate,
112    {
113        let path = path.as_ref();
114        let content = fs::read_to_string(path).map_err(|e| {
115            ConnectorError::config(format!(
116                "Failed to read config file {}: {}",
117                path.display(),
118                e
119            ))
120        })?;
121
122        let source_name = path.display().to_string();
123        self.parse_str(&content, &source_name)
124    }
125
126    /// Parse a configuration value directly from TOML content.
127    pub fn parse_str<T>(&self, content: &str, source_name: &str) -> ConnectorResult<T>
128    where
129        T: DeserializeOwned + ConfigEnvOverrides + ConfigValidate,
130    {
131        let mut config: T = toml::from_str(content).map_err(|e| {
132            ConnectorError::config(format!(
133                "Failed to parse config file {}: {}",
134                source_name, e
135            ))
136        })?;
137
138        config.apply_env_overrides()?;
139        config.validate_config()?;
140        Ok(config)
141    }
142}
143
144impl ConnectorConfig {
145    /// Load `ConnectorConfig` using `ConnectorConfigLoader` defaults.
146    pub fn load() -> ConnectorResult<Self> {
147        ConnectorConfigLoader::new().load()
148    }
149
150    /// Load mandatory configuration from environment variables
151    ///
152    /// Only reads mandatory fields:
153    /// - `DANUBE_SERVICE_URL`: Danube broker URL (required)
154    /// - `CONNECTOR_NAME`: Unique connector name (required)
155    ///
156    /// All retry and processing settings use defaults.
157    /// To customize these, load from a config file or set them explicitly.
158    pub fn from_env() -> ConnectorResult<Self> {
159        let danube_service_url = env::var("DANUBE_SERVICE_URL")
160            .map_err(|_| ConnectorError::config("DANUBE_SERVICE_URL is required"))?;
161
162        let connector_name = env::var("CONNECTOR_NAME")
163            .map_err(|_| ConnectorError::config("CONNECTOR_NAME is required"))?;
164
165        Ok(Self {
166            danube_service_url,
167            connector_name,
168            retry: RetrySettings::default(),
169            processing: ProcessingSettings::default(),
170            schemas: Vec::new(),
171        })
172    }
173
174    /// Load configuration from a TOML file
175    pub fn from_file(path: &str) -> ConnectorResult<Self> {
176        let content = std::fs::read_to_string(path).map_err(|e| {
177            ConnectorError::config(format!("Failed to read config file {}: {}", path, e))
178        })?;
179
180        toml::from_str(&content).map_err(|e| {
181            ConnectorError::config(format!("Failed to parse config file {}: {}", path, e))
182        })
183    }
184
185    /// Apply environment variable overrides to mandatory fields only
186    ///
187    /// This only overrides `danube_service_url` and `connector_name`.
188    /// Retry and processing settings should come from config files, not env vars.
189    pub fn apply_env_overrides(&mut self) {
190        if let Ok(val) = env::var("DANUBE_SERVICE_URL") {
191            self.danube_service_url = val;
192        }
193        if let Ok(val) = env::var("CONNECTOR_NAME") {
194            self.connector_name = val;
195        }
196    }
197
198    /// Validate the configuration
199    ///
200    /// Called internally by the runtime. Users can also call this for early validation.
201    pub fn validate(&self) -> ConnectorResult<()> {
202        if self.danube_service_url.is_empty() {
203            return Err(ConnectorError::config("danube_service_url cannot be empty"));
204        }
205
206        if self.connector_name.is_empty() {
207            return Err(ConnectorError::config("connector_name cannot be empty"));
208        }
209
210        if self.retry.max_retries > 100 {
211            return Err(ConnectorError::config("max_retries too high (max 100)"));
212        }
213
214        if self.processing.batch_size == 0 {
215            return Err(ConnectorError::config("batch_size must be > 0"));
216        }
217
218        Ok(())
219    }
220}
221
222impl ConfigEnvOverrides for ConnectorConfig {
223    fn apply_env_overrides(&mut self) -> ConnectorResult<()> {
224        ConnectorConfig::apply_env_overrides(self);
225        Ok(())
226    }
227}
228
229impl ConfigValidate for ConnectorConfig {
230    fn validate_config(&self) -> ConnectorResult<()> {
231        self.validate()
232    }
233}
234
235impl Default for ConnectorConfig {
236    fn default() -> Self {
237        Self {
238            danube_service_url: "http://localhost:6650".to_string(),
239            connector_name: "default-connector".to_string(),
240            retry: RetrySettings::default(),
241            processing: ProcessingSettings::default(),
242            schemas: Vec::new(),
243        }
244    }
245}
246
247/// Retry configuration settings
248///
249/// Can be used via:
250/// - TOML config file: `[retry]` section
251/// - Direct construction in code for programmatic control
252#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct RetrySettings {
254    /// Maximum number of retries for failed operations
255    #[serde(default = "default_max_retries")]
256    pub max_retries: u32,
257
258    /// Base backoff duration in milliseconds
259    #[serde(default = "default_retry_backoff_ms")]
260    pub retry_backoff_ms: u64,
261
262    /// Maximum backoff duration in milliseconds
263    #[serde(default = "default_max_backoff_ms")]
264    pub max_backoff_ms: u64,
265}
266
/// Serde default for `RetrySettings::max_retries`.
fn default_max_retries() -> u32 {
    3_u32
}
270
/// Serde default for `RetrySettings::retry_backoff_ms` (one second).
fn default_retry_backoff_ms() -> u64 {
    1_000
}
274
/// Serde default for `RetrySettings::max_backoff_ms` (thirty seconds).
fn default_max_backoff_ms() -> u64 {
    30_000
}
278
279impl Default for RetrySettings {
280    fn default() -> Self {
281        Self {
282            max_retries: 3,
283            retry_backoff_ms: 1000,
284            max_backoff_ms: 30000,
285        }
286    }
287}
288
289/// Processing and runtime configuration settings
290///
291/// Can be used via:
292/// - TOML config file: `[processing]` section
293/// - Direct construction in code for programmatic control
294#[derive(Debug, Clone, Serialize, Deserialize)]
295pub struct ProcessingSettings {
296    /// Batch size for batch processing
297    #[serde(default = "default_batch_size")]
298    pub batch_size: usize,
299
300    /// Batch timeout in milliseconds
301    #[serde(default = "default_batch_timeout_ms")]
302    pub batch_timeout_ms: u64,
303
304    /// Poll interval in milliseconds for source connectors
305    #[serde(default = "default_poll_interval_ms")]
306    pub poll_interval_ms: u64,
307
308    /// Metrics export port
309    #[serde(default = "default_metrics_port")]
310    pub metrics_port: u16,
311
312    /// Log level
313    #[serde(default = "default_log_level")]
314    pub log_level: String,
315
316    /// Interval between health checks, in milliseconds.
317    #[serde(default = "default_health_check_interval_ms")]
318    pub health_check_interval_ms: u64,
319
320    /// Number of consecutive failed health checks before reporting unhealthy status.
321    #[serde(default = "default_health_check_failure_threshold")]
322    pub health_check_failure_threshold: usize,
323}
324
/// Serde default for `ProcessingSettings::batch_size`.
fn default_batch_size() -> usize {
    1_000
}
328
/// Serde default for `ProcessingSettings::batch_timeout_ms` (one second).
fn default_batch_timeout_ms() -> u64 {
    1_000
}
332
/// Serde default for `ProcessingSettings::poll_interval_ms`.
fn default_poll_interval_ms() -> u64 {
    100_u64
}
336
/// Serde default for `ProcessingSettings::metrics_port`.
fn default_metrics_port() -> u16 {
    9090_u16
}
340
/// Serde default for `ProcessingSettings::log_level`.
fn default_log_level() -> String {
    String::from("info")
}
344
/// Serde default for `ProcessingSettings::health_check_interval_ms` (thirty seconds).
fn default_health_check_interval_ms() -> u64 {
    30_000
}
348
/// Serde default for `ProcessingSettings::health_check_failure_threshold`.
fn default_health_check_failure_threshold() -> usize {
    3_usize
}
352
353impl Default for ProcessingSettings {
354    fn default() -> Self {
355        Self {
356            batch_size: 1000,
357            batch_timeout_ms: 1000,
358            poll_interval_ms: 100,
359            metrics_port: 9090,
360            log_level: "info".to_string(),
361            health_check_interval_ms: 30000,
362            health_check_failure_threshold: 3,
363        }
364    }
365}
366
367/// Schema mapping configuration for topics
368///
369/// Maps a topic to its schema definition for source connectors using schema registry.
370///
371/// Can be used via:
372/// - TOML config file: `[[schemas]]` array
373/// - Direct construction in code for programmatic control
374#[derive(Debug, Clone, Serialize, Deserialize)]
375pub struct SchemaMapping {
376    /// Danube topic name (format: /{namespace}/{topic_name})
377    pub topic: String,
378
379    /// Schema subject name in the registry
380    pub subject: String,
381
382    /// Schema type (e.g., "json_schema", "avro", "protobuf")
383    pub schema_type: String,
384
385    /// Path to schema definition file
386    pub schema_file: PathBuf,
387
388    /// Auto-register schema on startup if it doesn't exist
389    #[serde(default = "default_auto_register")]
390    pub auto_register: bool,
391
392    /// Version strategy for this schema
393    #[serde(default)]
394    pub version_strategy: VersionStrategy,
395}
396
/// Serde default for `SchemaMapping::auto_register`: auto-registration is on.
fn default_auto_register() -> bool {
    true
}
400
401/// Subscription type for configuration
402///
403/// **Mandatory public API** - required for `ConsumerConfig`.
404///
405/// Mirrors `SubType` from danube-client but with Serialize/Deserialize for config files.
406#[derive(Debug, Clone, Serialize, Deserialize)]
407pub enum SubscriptionType {
408    /// Only one consumer on the subscription receives messages.
409    Exclusive,
410    /// Multiple consumers can share message delivery.
411    Shared,
412    /// A standby consumer takes over when the active consumer fails.
413    FailOver,
414}
415
416impl From<SubscriptionType> for SubType {
417    fn from(st: SubscriptionType) -> Self {
418        match st {
419            SubscriptionType::Exclusive => SubType::Exclusive,
420            SubscriptionType::Shared => SubType::Shared,
421            SubscriptionType::FailOver => SubType::FailOver,
422        }
423    }
424}
425
426/// Configuration for a Danube consumer
427///
428/// **Mandatory public API** - required by `SinkConnector::consumer_configs()` trait.
429///
430/// Specifies how to create a consumer for a specific topic, including subscription settings.
431#[derive(Debug, Clone)]
432pub struct ConsumerConfig {
433    /// Danube topic to consume from (format: /{namespace}/{topic_name})
434    pub topic: String,
435    /// Consumer name (for identification)
436    pub consumer_name: String,
437    /// Subscription name (shared across consumer instances)
438    pub subscription: String,
439    /// Subscription type (Exclusive, Shared, FailOver)
440    pub subscription_type: SubscriptionType,
441    /// Optional: Expected schema subject for validation
442    /// If set, runtime will validate that incoming messages match this schema
443    pub expected_schema_subject: Option<String>,
444}
445
446impl ConsumerConfig {
447    /// Convert this consumer configuration into the route representation used by the runtime.
448    pub fn route(&self) -> SinkRoute {
449        self.clone().into()
450    }
451
452    /// Build a consumer configuration from a sink route definition.
453    pub fn from_route(route: SinkRoute) -> Self {
454        route.into()
455    }
456}
457
458/// Configuration for a Danube producer
459///
460/// **Mandatory public API** - required by `SourceConnector::producer_configs()` trait.
461///
462/// Specifies how to create a producer for a specific topic, including partitioning
463/// and reliability settings.
464///
465/// Note: The `schema_config` field is internal and populated by the runtime from `SchemaMapping`.
466/// Always use `ProducerConfig::new()` or set `schema_config` to `None` when constructing manually.
467#[derive(Debug, Clone)]
468pub struct ProducerConfig {
469    /// Danube topic name (format: /{namespace}/{topic_name})
470    pub topic: String,
471    /// Number of partitions (0 = non-partitioned)
472    pub partitions: usize,
473    /// Use reliable dispatch (WAL + Cloud persistence)
474    pub reliable_dispatch: bool,
475    /// Internal: Schema configuration (populated by runtime from SchemaMapping)
476    /// Users should not set this - always use None
477    pub schema_config: Option<SchemaConfig>,
478}
479
480impl ProducerConfig {
481    /// Create a new ProducerConfig
482    pub fn new(topic: impl Into<String>, partitions: usize, reliable_dispatch: bool) -> Self {
483        Self {
484            topic: topic.into(),
485            partitions,
486            reliable_dispatch,
487            schema_config: None,
488        }
489    }
490
491    /// Convert this producer configuration into the route representation used by the runtime.
492    pub fn route(&self) -> SourceRoute {
493        self.clone().into()
494    }
495
496    /// Build a producer configuration from a source route definition.
497    pub fn from_route(route: SourceRoute) -> Self {
498        route.into()
499    }
500}
501
502/// Schema configuration for a topic
503///
504/// Can be used via:
505/// - TOML config file: Use `SchemaMapping` in `[[schemas]]` (recommended)
506/// - Direct construction in code for advanced programmatic control
507///
508/// Note: Usually populated automatically from `SchemaMapping` by the runtime.
509///
510/// The runtime converts `SchemaMapping` → `SchemaConfig` automatically when loading from files.
511#[derive(Debug, Clone, Serialize, Deserialize)]
512pub struct SchemaConfig {
513    /// Schema subject name in the registry
514    pub subject: String,
515
516    /// Schema type (JsonSchema, Avro, Protobuf, etc.)
517    pub schema_type: String,
518
519    /// Path to schema definition file
520    pub schema_file: PathBuf,
521
522    /// Auto-register schema on startup if it doesn't exist
523    #[serde(default = "default_schema_auto_register")]
524    pub auto_register: bool,
525
526    /// Version strategy for producers
527    #[serde(default)]
528    pub version_strategy: VersionStrategy,
529}
530
/// Serde default for `SchemaConfig::auto_register`: auto-registration is on.
fn default_schema_auto_register() -> bool {
    true
}
534
535/// Strategy for selecting schema version
536///
537/// Used in `SchemaMapping` and `SchemaConfig` to control which schema version producers use.
538///
539/// Can be used via:
540/// - TOML config file: `version_strategy` field in `[[schemas]]`
541/// - Direct construction in code for programmatic control
542#[derive(Debug, Clone, Default, Serialize, Deserialize)]
543#[serde(rename_all = "lowercase")]
544pub enum VersionStrategy {
545    /// Use the latest schema version (default)
546    #[default]
547    Latest,
548
549    /// Pin to a specific schema version
550    Pinned(u32),
551
552    /// Use minimum version or newer
553    Minimum(u32),
554}
555
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// The default configuration exposes the documented fallback values.
    #[test]
    fn test_config_default() {
        let config = ConnectorConfig::default();
        assert_eq!(config.danube_service_url, "http://localhost:6650");
        assert_eq!(config.connector_name, "default-connector");
        assert_eq!(config.retry.max_retries, 3);
        assert_eq!(config.processing.batch_size, 1000);
        assert_eq!(config.processing.health_check_interval_ms, 30000);
        assert_eq!(config.processing.health_check_failure_threshold, 3);
    }

    /// Every rule in `ConnectorConfig::validate` is exercised in isolation,
    /// each starting from an otherwise valid default configuration.
    #[test]
    fn test_config_validation() {
        assert!(ConnectorConfig::default().validate().is_ok());

        let empty_url = ConnectorConfig {
            danube_service_url: String::new(),
            ..ConnectorConfig::default()
        };
        assert!(empty_url.validate().is_err());

        let empty_name = ConnectorConfig {
            connector_name: String::new(),
            ..ConnectorConfig::default()
        };
        assert!(empty_name.validate().is_err());

        let mut retries = ConnectorConfig::default();
        retries.retry.max_retries = 101;
        assert!(retries.validate().is_err());
        // 100 is the highest accepted value.
        retries.retry.max_retries = 100;
        assert!(retries.validate().is_ok());

        let mut batch = ConnectorConfig::default();
        batch.processing.batch_size = 0;
        assert!(batch.validate().is_err());
    }

    /// Minimal config type used to exercise the generic loader.
    #[derive(Debug, Deserialize)]
    struct LoaderTestConfig {
        value: String,
    }

    impl ConfigValidate for LoaderTestConfig {
        fn validate_config(&self) -> ConnectorResult<()> {
            if self.value.is_empty() {
                return Err(ConnectorError::config("value cannot be empty"));
            }

            Ok(())
        }
    }

    // No environment-driven settings: the trait's no-op default is enough.
    impl ConfigEnvOverrides for LoaderTestConfig {}

    /// A well-formed file on disk is read, parsed and returned.
    #[test]
    fn test_config_loader_from_file() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, "value = \"loaded\"").unwrap();

        let config: LoaderTestConfig = ConnectorConfigLoader::new().from_file(file.path()).unwrap();

        assert_eq!(config.value, "loaded");
    }

    /// The loader runs `validate_config` after parsing and propagates its error.
    #[test]
    fn test_config_loader_rejects_invalid_value() {
        let result =
            ConnectorConfigLoader::new().parse_str::<LoaderTestConfig>("value = \"\"", "inline");

        assert!(result.is_err());
    }

    /// Content that does not deserialize into the target type is reported as an error.
    #[test]
    fn test_config_loader_rejects_unparsable_content() {
        let loader = ConnectorConfigLoader::new();

        // Missing required field.
        assert!(loader.parse_str::<LoaderTestConfig>("", "inline").is_err());
        // Syntactically invalid TOML.
        assert!(loader
            .parse_str::<LoaderTestConfig>("value = ", "inline")
            .is_err());
    }
}