1use crate::{ConnectorError, ConnectorResult};
4use danube_client::SubType;
5use serde::{Deserialize, Serialize};
6use std::env;
7use std::path::PathBuf;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct ConnectorConfig {
21 pub danube_service_url: String,
23
24 pub connector_name: String,
26
27 #[serde(default)]
29 pub retry: RetrySettings,
30
31 #[serde(default)]
33 pub processing: ProcessingSettings,
34
35 #[serde(default)]
37 pub schemas: Vec<SchemaMapping>,
38}
39
40impl ConnectorConfig {
41 pub fn from_env() -> ConnectorResult<Self> {
50 let danube_service_url = env::var("DANUBE_SERVICE_URL")
51 .map_err(|_| ConnectorError::config("DANUBE_SERVICE_URL is required"))?;
52
53 let connector_name = env::var("CONNECTOR_NAME")
54 .map_err(|_| ConnectorError::config("CONNECTOR_NAME is required"))?;
55
56 Ok(Self {
57 danube_service_url,
58 connector_name,
59 retry: RetrySettings::default(),
60 processing: ProcessingSettings::default(),
61 schemas: Vec::new(),
62 })
63 }
64
65 pub fn from_file(path: &str) -> ConnectorResult<Self> {
67 let content = std::fs::read_to_string(path).map_err(|e| {
68 ConnectorError::config(format!("Failed to read config file {}: {}", path, e))
69 })?;
70
71 toml::from_str(&content).map_err(|e| {
72 ConnectorError::config(format!("Failed to parse config file {}: {}", path, e))
73 })
74 }
75
76 pub fn apply_env_overrides(&mut self) {
81 if let Ok(val) = env::var("DANUBE_SERVICE_URL") {
82 self.danube_service_url = val;
83 }
84 if let Ok(val) = env::var("CONNECTOR_NAME") {
85 self.connector_name = val;
86 }
87 }
88
89 pub(crate) fn validate(&self) -> ConnectorResult<()> {
93 if self.danube_service_url.is_empty() {
94 return Err(ConnectorError::config("danube_service_url cannot be empty"));
95 }
96
97 if self.connector_name.is_empty() {
98 return Err(ConnectorError::config("connector_name cannot be empty"));
99 }
100
101 if self.retry.max_retries > 100 {
102 return Err(ConnectorError::config("max_retries too high (max 100)"));
103 }
104
105 if self.processing.batch_size == 0 {
106 return Err(ConnectorError::config("batch_size must be > 0"));
107 }
108
109 Ok(())
110 }
111}
112
113impl Default for ConnectorConfig {
114 fn default() -> Self {
115 Self {
116 danube_service_url: "http://localhost:6650".to_string(),
117 connector_name: "default-connector".to_string(),
118 retry: RetrySettings::default(),
119 processing: ProcessingSettings::default(),
120 schemas: Vec::new(),
121 }
122 }
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct RetrySettings {
132 #[serde(default = "default_max_retries")]
134 pub max_retries: u32,
135
136 #[serde(default = "default_retry_backoff_ms")]
138 pub retry_backoff_ms: u64,
139
140 #[serde(default = "default_max_backoff_ms")]
142 pub max_backoff_ms: u64,
143}
144
145fn default_max_retries() -> u32 {
146 3
147}
148fn default_retry_backoff_ms() -> u64 {
149 1000
150}
151fn default_max_backoff_ms() -> u64 {
152 30000
153}
154
155impl Default for RetrySettings {
156 fn default() -> Self {
157 Self {
158 max_retries: 3,
159 retry_backoff_ms: 1000,
160 max_backoff_ms: 30000,
161 }
162 }
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct ProcessingSettings {
172 #[serde(default = "default_batch_size")]
174 pub batch_size: usize,
175
176 #[serde(default = "default_batch_timeout_ms")]
178 pub batch_timeout_ms: u64,
179
180 #[serde(default = "default_poll_interval_ms")]
182 pub poll_interval_ms: u64,
183
184 #[serde(default = "default_metrics_port")]
186 pub metrics_port: u16,
187
188 #[serde(default = "default_log_level")]
190 pub log_level: String,
191}
192
193fn default_batch_size() -> usize {
194 1000
195}
196fn default_batch_timeout_ms() -> u64 {
197 1000
198}
199fn default_poll_interval_ms() -> u64 {
200 100
201}
202fn default_metrics_port() -> u16 {
203 9090
204}
205fn default_log_level() -> String {
206 "info".to_string()
207}
208
209impl Default for ProcessingSettings {
210 fn default() -> Self {
211 Self {
212 batch_size: 1000,
213 batch_timeout_ms: 1000,
214 poll_interval_ms: 100,
215 metrics_port: 9090,
216 log_level: "info".to_string(),
217 }
218 }
219}
220
221#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct SchemaMapping {
230 pub topic: String,
232
233 pub subject: String,
235
236 pub schema_type: String,
238
239 pub schema_file: PathBuf,
241
242 #[serde(default = "default_auto_register")]
244 pub auto_register: bool,
245
246 #[serde(default)]
248 pub version_strategy: VersionStrategy,
249}
250
251fn default_auto_register() -> bool {
252 true
253}
254
255#[derive(Debug, Clone, Serialize, Deserialize)]
261pub enum SubscriptionType {
262 Exclusive,
263 Shared,
264 FailOver,
265}
266
267impl From<SubscriptionType> for SubType {
268 fn from(st: SubscriptionType) -> Self {
269 match st {
270 SubscriptionType::Exclusive => SubType::Exclusive,
271 SubscriptionType::Shared => SubType::Shared,
272 SubscriptionType::FailOver => SubType::FailOver,
273 }
274 }
275}
276
277#[derive(Debug, Clone)]
283pub struct ConsumerConfig {
284 pub topic: String,
286 pub consumer_name: String,
288 pub subscription: String,
290 pub subscription_type: SubscriptionType,
292 pub expected_schema_subject: Option<String>,
295}
296
297#[derive(Debug, Clone)]
307pub struct ProducerConfig {
308 pub topic: String,
310 pub partitions: usize,
312 pub reliable_dispatch: bool,
314 pub schema_config: Option<SchemaConfig>,
317}
318
319impl ProducerConfig {
320 pub fn new(topic: impl Into<String>, partitions: usize, reliable_dispatch: bool) -> Self {
322 Self {
323 topic: topic.into(),
324 partitions,
325 reliable_dispatch,
326 schema_config: None,
327 }
328 }
329}
330
331#[derive(Debug, Clone, Serialize, Deserialize)]
341pub struct SchemaConfig {
342 pub subject: String,
344
345 pub schema_type: String,
347
348 pub schema_file: PathBuf,
350
351 #[serde(default = "default_schema_auto_register")]
353 pub auto_register: bool,
354
355 #[serde(default)]
357 pub version_strategy: VersionStrategy,
358}
359
360fn default_schema_auto_register() -> bool {
361 true
362}
363
364#[derive(Debug, Clone, Default, Serialize, Deserialize)]
372#[serde(rename_all = "lowercase")]
373pub enum VersionStrategy {
374 #[default]
376 Latest,
377
378 Pinned(u32),
380
381 Minimum(u32),
383}
384
385#[cfg(test)]
386mod tests {
387 use super::*;
388
389 #[test]
390 fn test_config_default() {
391 let config = ConnectorConfig::default();
392 assert_eq!(config.danube_service_url, "http://localhost:6650");
393 assert_eq!(config.connector_name, "default-connector");
394 assert_eq!(config.retry.max_retries, 3);
395 assert_eq!(config.processing.batch_size, 1000);
396 }
397
398 #[test]
399 fn test_config_validation() {
400 let mut config = ConnectorConfig::default();
401 assert!(config.validate().is_ok());
402
403 config.danube_service_url = "".to_string();
404 assert!(config.validate().is_err());
405
406 config.danube_service_url = "http://localhost:6650".to_string();
407 config.processing.batch_size = 0;
408 assert!(config.validate().is_err());
409 }
410}