1use crate::route::{SinkRoute, SourceRoute};
4use crate::{ConnectorError, ConnectorResult};
5use danube_client::SubType;
6use serde::de::DeserializeOwned;
7use serde::{Deserialize, Serialize};
8use std::env;
9use std::fs;
10use std::path::{Path, PathBuf};
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct ConnectorConfig {
24 pub danube_service_url: String,
26
27 pub connector_name: String,
29
30 #[serde(default)]
32 pub retry: RetrySettings,
33
34 #[serde(default)]
36 pub processing: ProcessingSettings,
37
38 #[serde(default)]
40 pub schemas: Vec<SchemaMapping>,
41}
42
43pub trait ConfigEnvOverrides {
48 fn apply_env_overrides(&mut self) -> ConnectorResult<()> {
50 Ok(())
51 }
52}
53
54pub trait ConfigValidate {
59 fn validate_config(&self) -> ConnectorResult<()> {
61 Ok(())
62 }
63}
64
/// Generic loader that reads a TOML configuration file, applies environment
/// overrides and validates the result.
#[derive(Debug, Clone)]
pub struct ConnectorConfigLoader {
    /// Name of the environment variable that holds the config file path.
    config_path_env: String,
}
72
73impl Default for ConnectorConfigLoader {
74 fn default() -> Self {
75 Self {
76 config_path_env: "CONNECTOR_CONFIG_PATH".to_string(),
77 }
78 }
79}
80
81impl ConnectorConfigLoader {
82 pub fn new() -> Self {
84 Self::default()
85 }
86
87 pub fn with_path_env(mut self, env_var: impl Into<String>) -> Self {
89 self.config_path_env = env_var.into();
90 self
91 }
92
93 pub fn load<T>(&self) -> ConnectorResult<T>
95 where
96 T: DeserializeOwned + ConfigEnvOverrides + ConfigValidate,
97 {
98 let config_path = env::var(&self.config_path_env).map_err(|_| {
99 ConnectorError::config(format!(
100 "{} environment variable must be set to the path of the TOML configuration file",
101 self.config_path_env
102 ))
103 })?;
104
105 self.from_file(config_path)
106 }
107
108 pub fn from_file<T>(&self, path: impl AsRef<Path>) -> ConnectorResult<T>
110 where
111 T: DeserializeOwned + ConfigEnvOverrides + ConfigValidate,
112 {
113 let path = path.as_ref();
114 let content = fs::read_to_string(path).map_err(|e| {
115 ConnectorError::config(format!(
116 "Failed to read config file {}: {}",
117 path.display(),
118 e
119 ))
120 })?;
121
122 let source_name = path.display().to_string();
123 self.parse_str(&content, &source_name)
124 }
125
126 pub fn parse_str<T>(&self, content: &str, source_name: &str) -> ConnectorResult<T>
128 where
129 T: DeserializeOwned + ConfigEnvOverrides + ConfigValidate,
130 {
131 let mut config: T = toml::from_str(content).map_err(|e| {
132 ConnectorError::config(format!(
133 "Failed to parse config file {}: {}",
134 source_name, e
135 ))
136 })?;
137
138 config.apply_env_overrides()?;
139 config.validate_config()?;
140 Ok(config)
141 }
142}
143
144impl ConnectorConfig {
145 pub fn load() -> ConnectorResult<Self> {
147 ConnectorConfigLoader::new().load()
148 }
149
150 pub fn from_env() -> ConnectorResult<Self> {
159 let danube_service_url = env::var("DANUBE_SERVICE_URL")
160 .map_err(|_| ConnectorError::config("DANUBE_SERVICE_URL is required"))?;
161
162 let connector_name = env::var("CONNECTOR_NAME")
163 .map_err(|_| ConnectorError::config("CONNECTOR_NAME is required"))?;
164
165 Ok(Self {
166 danube_service_url,
167 connector_name,
168 retry: RetrySettings::default(),
169 processing: ProcessingSettings::default(),
170 schemas: Vec::new(),
171 })
172 }
173
174 pub fn from_file(path: &str) -> ConnectorResult<Self> {
176 let content = std::fs::read_to_string(path).map_err(|e| {
177 ConnectorError::config(format!("Failed to read config file {}: {}", path, e))
178 })?;
179
180 toml::from_str(&content).map_err(|e| {
181 ConnectorError::config(format!("Failed to parse config file {}: {}", path, e))
182 })
183 }
184
185 pub fn apply_env_overrides(&mut self) {
190 if let Ok(val) = env::var("DANUBE_SERVICE_URL") {
191 self.danube_service_url = val;
192 }
193 if let Ok(val) = env::var("CONNECTOR_NAME") {
194 self.connector_name = val;
195 }
196 }
197
198 pub fn validate(&self) -> ConnectorResult<()> {
202 if self.danube_service_url.is_empty() {
203 return Err(ConnectorError::config("danube_service_url cannot be empty"));
204 }
205
206 if self.connector_name.is_empty() {
207 return Err(ConnectorError::config("connector_name cannot be empty"));
208 }
209
210 if self.retry.max_retries > 100 {
211 return Err(ConnectorError::config("max_retries too high (max 100)"));
212 }
213
214 if self.processing.batch_size == 0 {
215 return Err(ConnectorError::config("batch_size must be > 0"));
216 }
217
218 Ok(())
219 }
220}
221
222impl ConfigEnvOverrides for ConnectorConfig {
223 fn apply_env_overrides(&mut self) -> ConnectorResult<()> {
224 ConnectorConfig::apply_env_overrides(self);
225 Ok(())
226 }
227}
228
229impl ConfigValidate for ConnectorConfig {
230 fn validate_config(&self) -> ConnectorResult<()> {
231 self.validate()
232 }
233}
234
235impl Default for ConnectorConfig {
236 fn default() -> Self {
237 Self {
238 danube_service_url: "http://localhost:6650".to_string(),
239 connector_name: "default-connector".to_string(),
240 retry: RetrySettings::default(),
241 processing: ProcessingSettings::default(),
242 schemas: Vec::new(),
243 }
244 }
245}
246
247#[derive(Debug, Clone, Serialize, Deserialize)]
253pub struct RetrySettings {
254 #[serde(default = "default_max_retries")]
256 pub max_retries: u32,
257
258 #[serde(default = "default_retry_backoff_ms")]
260 pub retry_backoff_ms: u64,
261
262 #[serde(default = "default_max_backoff_ms")]
264 pub max_backoff_ms: u64,
265}
266
/// Serde fallback for `RetrySettings::max_retries`.
fn default_max_retries() -> u32 {
    3_u32
}
270
/// Serde fallback for `RetrySettings::retry_backoff_ms` (one second).
fn default_retry_backoff_ms() -> u64 {
    1_000
}
274
/// Serde fallback for `RetrySettings::max_backoff_ms` (thirty seconds).
fn default_max_backoff_ms() -> u64 {
    30_000
}
278
279impl Default for RetrySettings {
280 fn default() -> Self {
281 Self {
282 max_retries: 3,
283 retry_backoff_ms: 1000,
284 max_backoff_ms: 30000,
285 }
286 }
287}
288
289#[derive(Debug, Clone, Serialize, Deserialize)]
295pub struct ProcessingSettings {
296 #[serde(default = "default_batch_size")]
298 pub batch_size: usize,
299
300 #[serde(default = "default_batch_timeout_ms")]
302 pub batch_timeout_ms: u64,
303
304 #[serde(default = "default_poll_interval_ms")]
306 pub poll_interval_ms: u64,
307
308 #[serde(default = "default_metrics_port")]
310 pub metrics_port: u16,
311
312 #[serde(default = "default_log_level")]
314 pub log_level: String,
315
316 #[serde(default = "default_health_check_interval_ms")]
318 pub health_check_interval_ms: u64,
319
320 #[serde(default = "default_health_check_failure_threshold")]
322 pub health_check_failure_threshold: usize,
323}
324
/// Serde fallback for `ProcessingSettings::batch_size`.
fn default_batch_size() -> usize {
    1_000
}
328
/// Serde fallback for `ProcessingSettings::batch_timeout_ms` (one second).
fn default_batch_timeout_ms() -> u64 {
    1_000
}
332
/// Serde fallback for `ProcessingSettings::poll_interval_ms`.
fn default_poll_interval_ms() -> u64 {
    100_u64
}
336
/// Serde fallback for `ProcessingSettings::metrics_port`.
fn default_metrics_port() -> u16 {
    9090_u16
}
340
/// Serde fallback for `ProcessingSettings::log_level`.
fn default_log_level() -> String {
    String::from("info")
}
344
/// Serde fallback for `ProcessingSettings::health_check_interval_ms`
/// (thirty seconds).
fn default_health_check_interval_ms() -> u64 {
    30_000
}
348
/// Serde fallback for `ProcessingSettings::health_check_failure_threshold`.
fn default_health_check_failure_threshold() -> usize {
    3_usize
}
352
353impl Default for ProcessingSettings {
354 fn default() -> Self {
355 Self {
356 batch_size: 1000,
357 batch_timeout_ms: 1000,
358 poll_interval_ms: 100,
359 metrics_port: 9090,
360 log_level: "info".to_string(),
361 health_check_interval_ms: 30000,
362 health_check_failure_threshold: 3,
363 }
364 }
365}
366
367#[derive(Debug, Clone, Serialize, Deserialize)]
375pub struct SchemaMapping {
376 pub topic: String,
378
379 pub subject: String,
381
382 pub schema_type: String,
384
385 pub schema_file: PathBuf,
387
388 #[serde(default = "default_auto_register")]
390 pub auto_register: bool,
391
392 #[serde(default)]
394 pub version_strategy: VersionStrategy,
395}
396
/// Serde fallback for `SchemaMapping::auto_register`: enabled.
fn default_auto_register() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
400
401#[derive(Debug, Clone, Serialize, Deserialize)]
407pub enum SubscriptionType {
408 Exclusive,
410 Shared,
412 FailOver,
414}
415
416impl From<SubscriptionType> for SubType {
417 fn from(st: SubscriptionType) -> Self {
418 match st {
419 SubscriptionType::Exclusive => SubType::Exclusive,
420 SubscriptionType::Shared => SubType::Shared,
421 SubscriptionType::FailOver => SubType::FailOver,
422 }
423 }
424}
425
426#[derive(Debug, Clone)]
432pub struct ConsumerConfig {
433 pub topic: String,
435 pub consumer_name: String,
437 pub subscription: String,
439 pub subscription_type: SubscriptionType,
441 pub expected_schema_subject: Option<String>,
444}
445
446impl ConsumerConfig {
447 pub fn route(&self) -> SinkRoute {
449 self.clone().into()
450 }
451
452 pub fn from_route(route: SinkRoute) -> Self {
454 route.into()
455 }
456}
457
458#[derive(Debug, Clone)]
468pub struct ProducerConfig {
469 pub topic: String,
471 pub partitions: usize,
473 pub reliable_dispatch: bool,
475 pub schema_config: Option<SchemaConfig>,
478}
479
480impl ProducerConfig {
481 pub fn new(topic: impl Into<String>, partitions: usize, reliable_dispatch: bool) -> Self {
483 Self {
484 topic: topic.into(),
485 partitions,
486 reliable_dispatch,
487 schema_config: None,
488 }
489 }
490
491 pub fn route(&self) -> SourceRoute {
493 self.clone().into()
494 }
495
496 pub fn from_route(route: SourceRoute) -> Self {
498 route.into()
499 }
500}
501
502#[derive(Debug, Clone, Serialize, Deserialize)]
512pub struct SchemaConfig {
513 pub subject: String,
515
516 pub schema_type: String,
518
519 pub schema_file: PathBuf,
521
522 #[serde(default = "default_schema_auto_register")]
524 pub auto_register: bool,
525
526 #[serde(default)]
528 pub version_strategy: VersionStrategy,
529}
530
/// Serde fallback for `SchemaConfig::auto_register`: enabled.
fn default_schema_auto_register() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
534
535#[derive(Debug, Clone, Default, Serialize, Deserialize)]
543#[serde(rename_all = "lowercase")]
544pub enum VersionStrategy {
545 #[default]
547 Latest,
548
549 Pinned(u32),
551
552 Minimum(u32),
554}
555
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Minimal config type used to exercise the generic loader.
    #[derive(Debug, Deserialize)]
    struct LoaderTestConfig {
        value: String,
    }

    // No environment overrides: rely on the provided no-op.
    impl ConfigEnvOverrides for LoaderTestConfig {}

    impl ConfigValidate for LoaderTestConfig {
        fn validate_config(&self) -> ConnectorResult<()> {
            if !self.value.is_empty() {
                return Ok(());
            }

            Err(ConnectorError::config("value cannot be empty"))
        }
    }

    /// `Default` yields the documented out-of-the-box values.
    #[test]
    fn test_config_default() {
        let config = ConnectorConfig::default();
        let processing = &config.processing;

        assert_eq!(config.danube_service_url, "http://localhost:6650");
        assert_eq!(config.connector_name, "default-connector");
        assert_eq!(config.retry.max_retries, 3);
        assert_eq!(processing.batch_size, 1000);
        assert_eq!(processing.health_check_interval_ms, 30000);
        assert_eq!(processing.health_check_failure_threshold, 3);
    }

    /// Validation accepts the defaults and rejects an empty URL and a zero
    /// batch size.
    #[test]
    fn test_config_validation() {
        let mut config = ConnectorConfig::default();
        assert!(config.validate().is_ok());

        config.danube_service_url = String::new();
        assert!(config.validate().is_err());

        // Restore the URL so the next failure can only come from the batch size.
        config.danube_service_url = String::from("http://localhost:6650");
        config.processing.batch_size = 0;
        assert!(config.validate().is_err());
    }

    /// The loader reads, parses and validates a TOML file from disk.
    #[test]
    fn test_config_loader_from_file() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, "value = \"loaded\"").unwrap();

        let loader = ConnectorConfigLoader::new();
        let config: LoaderTestConfig = loader.from_file(file.path()).unwrap();

        assert_eq!(config.value, "loaded");
    }
}