1#![allow(clippy::must_use_candidate)]
45
46use std::collections::HashMap;
47use std::sync::atomic::{AtomicBool, Ordering};
48
49use tracing::warn;
50
51static DEPRECATION_WARNED: AtomicBool = AtomicBool::new(false);
54
55#[derive(Debug, Clone)]
57pub struct EnvVar {
58 pub standard: String,
60 pub legacy: Vec<String>,
62 pub description: Option<String>,
64}
65
66impl EnvVar {
67 #[must_use]
69 pub fn new(standard: &str) -> Self {
70 Self {
71 standard: standard.to_string(),
72 legacy: Vec::new(),
73 description: None,
74 }
75 }
76
77 #[must_use]
79 pub fn with_legacy(mut self, name: &str) -> Self {
80 self.legacy.push(name.to_string());
81 self
82 }
83
84 #[must_use]
86 pub fn with_legacy_names(mut self, names: &[&str]) -> Self {
87 for name in names {
88 self.legacy.push((*name).to_string());
89 }
90 self
91 }
92
93 #[must_use]
95 pub fn with_description(mut self, desc: &str) -> Self {
96 self.description = Some(desc.to_string());
97 self
98 }
99
100 #[must_use]
104 pub fn get(&self) -> Option<String> {
105 if let Ok(value) = std::env::var(&self.standard) {
107 return Some(value);
108 }
109
110 for legacy_name in &self.legacy {
112 if let Ok(value) = std::env::var(legacy_name) {
113 log_deprecation_warning(legacy_name, &self.standard);
114 return Some(value);
115 }
116 }
117
118 None
119 }
120
121 #[must_use]
123 pub fn get_or(&self, default: &str) -> String {
124 self.get().unwrap_or_else(|| default.to_string())
125 }
126
127 pub fn get_parsed<T: std::str::FromStr>(&self) -> Option<T> {
129 self.get().and_then(|v| v.parse().ok())
130 }
131
132 #[must_use]
136 pub fn get_bool(&self) -> Option<bool> {
137 self.get().map(|v| {
138 let v = v.to_lowercase();
139 v == "true" || v == "1" || v == "yes" || v == "on"
140 })
141 }
142
143 #[must_use]
145 pub fn get_list(&self) -> Option<Vec<String>> {
146 self.get()
147 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
148 }
149
150 #[must_use]
152 pub fn which_name_used(&self) -> Option<&str> {
153 if std::env::var(&self.standard).is_ok() {
154 return Some(&self.standard);
155 }
156 self.legacy
157 .iter()
158 .find(|name| std::env::var(name).is_ok())
159 .map(String::as_str)
160 }
161}
162
163fn log_deprecation_warning(legacy_name: &str, standard_name: &str) {
165 let already_warned = DEPRECATION_WARNED.swap(true, Ordering::Relaxed);
168
169 if already_warned {
170 tracing::debug!(
172 legacy = %legacy_name,
173 standard = %standard_name,
174 "Deprecated environment variable used"
175 );
176 } else {
177 warn!(
179 legacy = %legacy_name,
180 standard = %standard_name,
181 "Using deprecated environment variable. Please migrate to the standard name."
182 );
183 }
184}
185
186#[cfg(test)]
188pub fn reset_deprecation_warnings() {
189 DEPRECATION_WARNED.store(false, Ordering::Relaxed);
190}
191
192pub mod postgres {
201 use super::EnvVar;
202
203 pub fn host() -> EnvVar {
205 EnvVar::new("PGHOST")
206 .with_legacy_names(&["POSTGRESQL_HOST", "PG_HOST", "POSTGRES_HOST"])
207 .with_description("PostgreSQL server hostname")
208 }
209
210 pub fn port() -> EnvVar {
212 EnvVar::new("PGPORT")
213 .with_legacy_names(&["POSTGRESQL_PORT", "PG_PORT", "POSTGRES_PORT"])
214 .with_description("PostgreSQL server port")
215 }
216
217 pub fn user() -> EnvVar {
219 EnvVar::new("PGUSER")
220 .with_legacy_names(&["POSTGRESQL_USER", "PG_USER", "POSTGRES_USER"])
221 .with_description("PostgreSQL username")
222 }
223
224 pub fn password() -> EnvVar {
226 EnvVar::new("PGPASSWORD")
227 .with_legacy_names(&["POSTGRESQL_PASSWORD", "PG_PASSWORD", "POSTGRES_PASSWORD"])
228 .with_description("PostgreSQL password")
229 }
230
231 pub fn database() -> EnvVar {
233 EnvVar::new("PGDATABASE")
234 .with_legacy_names(&[
235 "POSTGRESQL_DATABASE",
236 "PG_DATABASE",
237 "POSTGRES_DATABASE",
238 "POSTGRES_DB",
239 ])
240 .with_description("PostgreSQL database name")
241 }
242
243 pub fn sslmode() -> EnvVar {
245 EnvVar::new("PGSSLMODE")
246 .with_legacy_names(&["POSTGRESQL_SSLMODE", "PG_SSLMODE"])
247 .with_description("PostgreSQL SSL mode")
248 }
249}
250
251pub mod kafka {
255 use super::EnvVar;
256
257 fn kafka_var(name: &str, legacy: &[&str]) -> EnvVar {
259 let standard = format!("KAFKA_{name}");
260 let mut var = EnvVar::new(&standard);
261 for l in legacy {
262 var = var.with_legacy(l);
263 }
264 var
265 }
266
267 pub fn bootstrap_servers() -> EnvVar {
269 kafka_var("BOOTSTRAP_SERVERS", &["KAFKA_BROKERS"])
270 .with_description("Kafka broker addresses (comma-separated)")
271 }
272
273 pub fn security_protocol() -> EnvVar {
275 kafka_var("SECURITY_PROTOCOL", &[])
276 .with_description("Security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL)")
277 }
278
279 pub fn sasl_mechanism() -> EnvVar {
281 kafka_var("SASL_MECHANISM", &[])
282 .with_description("SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512)")
283 }
284
285 pub fn sasl_username() -> EnvVar {
287 kafka_var("SASL_USERNAME", &["KAFKA_SASL_USER"]).with_description("SASL username")
288 }
289
290 pub fn sasl_password() -> EnvVar {
292 kafka_var("SASL_PASSWORD", &[]).with_description("SASL password")
293 }
294
295 pub fn group_id() -> EnvVar {
297 kafka_var("GROUP_ID", &["KAFKA_GROUP", "KAFKA_CONSUMER_GROUP"])
298 .with_description("Consumer group ID")
299 }
300
301 pub fn client_id() -> EnvVar {
303 kafka_var("CLIENT_ID", &[]).with_description("Client ID for broker logs")
304 }
305
306 pub fn topics() -> EnvVar {
308 kafka_var("TOPICS", &["KAFKA_TOPIC"])
309 .with_description("Topics to subscribe to (comma-separated)")
310 }
311
312 pub fn ssl_ca_location() -> EnvVar {
314 kafka_var("SSL_CA_LOCATION", &["KAFKA_CA_CERT", "KAFKA_SSL_CA"])
315 .with_description("Path to SSL CA certificate")
316 }
317
318 pub fn ssl_skip_verify() -> EnvVar {
320 kafka_var("SSL_SKIP_VERIFY", &["KAFKA_SSL_INSECURE", "KAFKA_INSECURE"])
321 .with_description("Skip SSL certificate verification")
322 }
323
324 pub fn profile() -> EnvVar {
326 kafka_var("PROFILE", &[]).with_description("Kafka profile (production, devtest)")
327 }
328
329 pub fn with_prefix(prefix: &str, name: &str) -> EnvVar {
333 EnvVar::new(&format!("{prefix}_KAFKA_{name}")).with_legacy(&format!("{prefix}_{name}"))
334 }
335}
336
337pub mod vault {
341 use super::EnvVar;
342
343 pub fn addr() -> EnvVar {
345 EnvVar::new("VAULT_ADDR")
346 .with_legacy_names(&["OPENBAO_ADDR", "BAO_ADDR"])
347 .with_description("Vault/OpenBao server address")
348 }
349
350 pub fn token() -> EnvVar {
352 EnvVar::new("VAULT_TOKEN")
353 .with_legacy_names(&["OPENBAO_TOKEN", "BAO_TOKEN", "OPENBAO_ROOT_TOKEN"])
354 .with_description("Vault/OpenBao authentication token")
355 }
356
357 pub fn namespace() -> EnvVar {
359 EnvVar::new("VAULT_NAMESPACE")
360 .with_legacy_names(&["OPENBAO_NAMESPACE", "BAO_NAMESPACE"])
361 .with_description("Vault namespace (Enterprise)")
362 }
363
364 pub fn skip_verify() -> EnvVar {
366 EnvVar::new("VAULT_SKIP_VERIFY")
367 .with_legacy_names(&[
368 "OPENBAO_SKIP_VERIFY",
369 "BAO_SKIP_VERIFY",
370 "VAULT_TLS_SKIP_VERIFY",
371 ])
372 .with_description("Skip TLS certificate verification")
373 }
374
375 pub fn ca_cert() -> EnvVar {
377 EnvVar::new("VAULT_CACERT")
378 .with_legacy_names(&["OPENBAO_CACERT", "BAO_CACERT", "VAULT_CA_CERT"])
379 .with_description("Path to CA certificate for Vault TLS")
380 }
381
382 pub fn approle_role_id() -> EnvVar {
384 EnvVar::new("VAULT_ROLE_ID")
385 .with_legacy_names(&["OPENBAO_ROLE_ID", "BAO_ROLE_ID"])
386 .with_description("AppRole role ID")
387 }
388
389 pub fn approle_secret_id() -> EnvVar {
391 EnvVar::new("VAULT_SECRET_ID")
392 .with_legacy_names(&["OPENBAO_SECRET_ID", "BAO_SECRET_ID"])
393 .with_description("AppRole secret ID")
394 }
395
396 pub fn k8s_role() -> EnvVar {
398 EnvVar::new("VAULT_K8S_ROLE")
399 .with_legacy_names(&["OPENBAO_K8S_ROLE", "BAO_K8S_ROLE"])
400 .with_description("Kubernetes auth role name")
401 }
402}
403
404pub mod aws {
406 use super::EnvVar;
407
408 pub fn access_key_id() -> EnvVar {
410 EnvVar::new("AWS_ACCESS_KEY_ID")
411 .with_legacy_names(&["AWS_ACCESS_KEY"])
412 .with_description("AWS access key ID")
413 }
414
415 pub fn secret_access_key() -> EnvVar {
417 EnvVar::new("AWS_SECRET_ACCESS_KEY")
418 .with_legacy_names(&["AWS_SECRET_KEY"])
419 .with_description("AWS secret access key")
420 }
421
422 pub fn session_token() -> EnvVar {
424 EnvVar::new("AWS_SESSION_TOKEN")
425 .with_legacy_names(&["AWS_SECURITY_TOKEN"])
426 .with_description("AWS session token (for temporary credentials)")
427 }
428
429 pub fn region() -> EnvVar {
431 EnvVar::new("AWS_DEFAULT_REGION")
432 .with_legacy_names(&["AWS_REGION"])
433 .with_description("AWS region")
434 }
435
436 pub fn endpoint_url() -> EnvVar {
438 EnvVar::new("AWS_ENDPOINT_URL")
439 .with_legacy_names(&["AWS_ENDPOINT", "LOCALSTACK_ENDPOINT"])
440 .with_description("Custom AWS endpoint URL")
441 }
442}
443
444pub mod clickhouse {
446 use super::EnvVar;
447
448 pub fn host() -> EnvVar {
450 EnvVar::new("CLICKHOUSE_HOST")
451 .with_legacy_names(&["CH_HOST"])
452 .with_description("ClickHouse server hostname")
453 }
454
455 pub fn native_port() -> EnvVar {
457 EnvVar::new("CLICKHOUSE_NATIVE_PORT")
458 .with_legacy_names(&["CLICKHOUSE_PORT", "CH_PORT"])
459 .with_description("ClickHouse native protocol port (default: 9000)")
460 }
461
462 pub fn http_port() -> EnvVar {
464 EnvVar::new("CLICKHOUSE_HTTP_PORT")
465 .with_legacy_names(&["CH_HTTP_PORT"])
466 .with_description("ClickHouse HTTP port (default: 8123)")
467 }
468
469 pub fn user() -> EnvVar {
471 EnvVar::new("CLICKHOUSE_USER")
472 .with_legacy_names(&["CH_USER", "CLICKHOUSE_USERNAME"])
473 .with_description("ClickHouse username")
474 }
475
476 pub fn password() -> EnvVar {
478 EnvVar::new("CLICKHOUSE_PASSWORD")
479 .with_legacy_names(&["CH_PASSWORD"])
480 .with_description("ClickHouse password")
481 }
482
483 pub fn database() -> EnvVar {
485 EnvVar::new("CLICKHOUSE_DATABASE")
486 .with_legacy_names(&["CH_DATABASE", "CLICKHOUSE_DB"])
487 .with_description("ClickHouse database name")
488 }
489}
490
491#[must_use]
495pub fn load_all_standard() -> HashMap<String, Option<String>> {
496 let mut vars = HashMap::new();
497
498 vars.insert("pg.host".into(), postgres::host().get());
500 vars.insert("pg.port".into(), postgres::port().get());
501 vars.insert("pg.user".into(), postgres::user().get());
502 vars.insert("pg.database".into(), postgres::database().get());
503
504 vars.insert(
506 "kafka.bootstrap_servers".into(),
507 kafka::bootstrap_servers().get(),
508 );
509 vars.insert(
510 "kafka.security_protocol".into(),
511 kafka::security_protocol().get(),
512 );
513 vars.insert("kafka.sasl_mechanism".into(), kafka::sasl_mechanism().get());
514 vars.insert("kafka.sasl_username".into(), kafka::sasl_username().get());
515
516 vars.insert("vault.addr".into(), vault::addr().get());
518 vars.insert("vault.namespace".into(), vault::namespace().get());
519
520 vars.insert("aws.region".into(), aws::region().get());
522
523 vars.insert("clickhouse.host".into(), clickhouse::host().get());
525 vars.insert("clickhouse.database".into(), clickhouse::database().get());
526
527 vars
528}
529
530#[cfg(test)]
531mod tests {
532 use super::*;
533 use std::sync::Mutex;
534
535 static ENV_LOCK: Mutex<()> = Mutex::new(());
538
539 fn setup() {
540 reset_deprecation_warnings();
541 }
542
543 #[test]
544 fn test_env_var_standard_name() {
545 let _lock = ENV_LOCK.lock().unwrap();
546 setup();
547 temp_env::with_var("TEST_STANDARD_VAR", Some("standard_value"), || {
548 let var = EnvVar::new("TEST_STANDARD_VAR").with_legacy("TEST_LEGACY_VAR");
549 assert_eq!(var.get(), Some("standard_value".to_string()));
550 });
551 }
552
553 #[test]
554 fn test_env_var_legacy_fallback() {
555 let _lock = ENV_LOCK.lock().unwrap();
556 setup();
557 temp_env::with_var("TEST_LEGACY_VAR2", Some("legacy_value"), || {
558 let var = EnvVar::new("TEST_STANDARD_VAR2").with_legacy("TEST_LEGACY_VAR2");
559 assert_eq!(var.get(), Some("legacy_value".to_string()));
560 });
561 }
562
563 #[test]
564 fn test_env_var_standard_takes_precedence() {
565 let _lock = ENV_LOCK.lock().unwrap();
566 setup();
567 temp_env::with_vars(
568 [
569 ("TEST_STANDARD_VAR3", Some("standard")),
570 ("TEST_LEGACY_VAR3", Some("legacy")),
571 ],
572 || {
573 let var = EnvVar::new("TEST_STANDARD_VAR3").with_legacy("TEST_LEGACY_VAR3");
574 assert_eq!(var.get(), Some("standard".to_string()));
575 },
576 );
577 }
578
579 #[test]
580 fn test_env_var_missing() {
581 let _lock = ENV_LOCK.lock().unwrap();
582 setup();
583 let var = EnvVar::new("NONEXISTENT_VAR").with_legacy("ALSO_NONEXISTENT");
584 assert_eq!(var.get(), None);
585 }
586
587 #[test]
588 fn test_env_var_get_bool() {
589 let _lock = ENV_LOCK.lock().unwrap();
590 setup();
591 temp_env::with_vars(
592 [
593 ("TEST_BOOL_TRUE", Some("true")),
594 ("TEST_BOOL_ONE", Some("1")),
595 ("TEST_BOOL_YES", Some("YES")),
596 ("TEST_BOOL_FALSE", Some("false")),
597 ],
598 || {
599 assert_eq!(EnvVar::new("TEST_BOOL_TRUE").get_bool(), Some(true));
600 assert_eq!(EnvVar::new("TEST_BOOL_ONE").get_bool(), Some(true));
601 assert_eq!(EnvVar::new("TEST_BOOL_YES").get_bool(), Some(true));
602 assert_eq!(EnvVar::new("TEST_BOOL_FALSE").get_bool(), Some(false));
603 },
604 );
605 }
606
607 #[test]
608 fn test_env_var_get_list() {
609 let _lock = ENV_LOCK.lock().unwrap();
610 setup();
611 temp_env::with_var("TEST_LIST", Some("a, b, c"), || {
612 let var = EnvVar::new("TEST_LIST");
613 assert_eq!(
614 var.get_list(),
615 Some(vec!["a".to_string(), "b".to_string(), "c".to_string()])
616 );
617 });
618 }
619
620 #[test]
621 fn test_postgres_env_vars() {
622 let _lock = ENV_LOCK.lock().unwrap();
623 setup();
624 temp_env::with_var("PGHOST", Some("localhost"), || {
625 assert_eq!(postgres::host().get(), Some("localhost".to_string()));
626 });
627 }
628
629 #[test]
630 fn test_postgres_legacy_fallback() {
631 let _lock = ENV_LOCK.lock().unwrap();
632 setup();
633 temp_env::with_vars(
634 [
635 ("PGHOST", None::<&str>),
636 ("POSTGRESQL_HOST", Some("legacy-host")),
637 ],
638 || assert_eq!(postgres::host().get(), Some("legacy-host".to_string())),
639 );
640 }
641
642 #[test]
643 fn test_kafka_env_vars() {
644 let _lock = ENV_LOCK.lock().unwrap();
645 setup();
646 temp_env::with_var("KAFKA_BOOTSTRAP_SERVERS", Some("kafka:9092"), || {
647 assert_eq!(
648 kafka::bootstrap_servers().get(),
649 Some("kafka:9092".to_string())
650 );
651 });
652 }
653
654 #[test]
655 fn test_vault_env_vars() {
656 let _lock = ENV_LOCK.lock().unwrap();
657 setup();
658 temp_env::with_var("VAULT_ADDR", Some("https://vault:8200"), || {
659 assert_eq!(vault::addr().get(), Some("https://vault:8200".to_string()));
660 });
661 }
662
663 #[test]
664 fn test_vault_openbao_fallback() {
665 let _lock = ENV_LOCK.lock().unwrap();
666 setup();
667 temp_env::with_vars(
668 [
669 ("VAULT_ADDR", None::<&str>),
670 ("OPENBAO_ADDR", Some("https://openbao:8200")),
671 ],
672 || {
673 assert_eq!(
674 vault::addr().get(),
675 Some("https://openbao:8200".to_string())
676 );
677 },
678 );
679 }
680
681 #[test]
682 fn test_which_name_used() {
683 let _lock = ENV_LOCK.lock().unwrap();
684 setup();
685 temp_env::with_var("TEST_WHICH_LEGACY", Some("value"), || {
686 let var = EnvVar::new("TEST_WHICH_STANDARD").with_legacy("TEST_WHICH_LEGACY");
687 assert_eq!(var.which_name_used(), Some("TEST_WHICH_LEGACY"));
688 });
689 }
690}