1#![allow(clippy::must_use_candidate)]
38
39use std::collections::HashMap;
40use std::sync::atomic::{AtomicBool, Ordering};
41
42use tracing::warn;
43
/// Process-wide flag recording whether a deprecated-variable lookup has already
/// been reported at `warn` level.
///
/// It is shared by every variable: once the first legacy lookup has flipped it
/// to `true`, later legacy lookups are only logged at `debug` level.
44static DEPRECATION_WARNED: AtomicBool = AtomicBool::new(false);
46
/// Definition of an environment variable that has one preferred ("standard")
/// name and any number of deprecated ("legacy") aliases.
///
/// `PartialEq`/`Eq` are derived so that definitions can be compared directly,
/// e.g. with `assert_eq!` in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EnvVar {
    /// Preferred variable name; lookups always try this name first.
    pub standard: String,
    /// Deprecated aliases, tried in order when the standard name is not set.
    pub legacy: Vec<String>,
    /// Optional human-readable description of what the variable configures.
    pub description: Option<String>,
}
57
58impl EnvVar {
59 #[must_use]
61 pub fn new(standard: &str) -> Self {
62 Self {
63 standard: standard.to_string(),
64 legacy: Vec::new(),
65 description: None,
66 }
67 }
68
69 #[must_use]
71 pub fn with_legacy(mut self, name: &str) -> Self {
72 self.legacy.push(name.to_string());
73 self
74 }
75
76 #[must_use]
78 pub fn with_legacy_names(mut self, names: &[&str]) -> Self {
79 for name in names {
80 self.legacy.push((*name).to_string());
81 }
82 self
83 }
84
85 #[must_use]
87 pub fn with_description(mut self, desc: &str) -> Self {
88 self.description = Some(desc.to_string());
89 self
90 }
91
92 #[must_use]
96 pub fn get(&self) -> Option<String> {
97 if let Ok(value) = std::env::var(&self.standard) {
99 return Some(value);
100 }
101
102 for legacy_name in &self.legacy {
104 if let Ok(value) = std::env::var(legacy_name) {
105 log_deprecation_warning(legacy_name, &self.standard);
106 return Some(value);
107 }
108 }
109
110 None
111 }
112
113 #[must_use]
115 pub fn get_or(&self, default: &str) -> String {
116 self.get().unwrap_or_else(|| default.to_string())
117 }
118
119 pub fn get_parsed<T: std::str::FromStr>(&self) -> Option<T> {
121 self.get().and_then(|v| v.parse().ok())
122 }
123
124 #[must_use]
128 pub fn get_bool(&self) -> Option<bool> {
129 self.get().map(|v| {
130 let v = v.to_lowercase();
131 v == "true" || v == "1" || v == "yes" || v == "on"
132 })
133 }
134
135 #[must_use]
137 pub fn get_list(&self) -> Option<Vec<String>> {
138 self.get()
139 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
140 }
141
142 #[must_use]
144 pub fn which_name_used(&self) -> Option<&str> {
145 if std::env::var(&self.standard).is_ok() {
146 return Some(&self.standard);
147 }
148 self.legacy
149 .iter()
150 .find(|name| std::env::var(name).is_ok())
151 .map(String::as_str)
152 }
153}
154
155fn log_deprecation_warning(legacy_name: &str, standard_name: &str) {
157 let already_warned = DEPRECATION_WARNED.swap(true, Ordering::Relaxed);
159
160 if already_warned {
161 tracing::debug!(
162 legacy = %legacy_name,
163 standard = %standard_name,
164 "Deprecated environment variable used"
165 );
166 } else {
167 warn!(
168 legacy = %legacy_name,
169 standard = %standard_name,
170 "Using deprecated environment variable. Please migrate to the standard name."
171 );
172 }
173}
174
/// Test-only hook that clears the "already warned" flag so the next legacy
/// lookup is logged at `warn` level again.
#[cfg(test)]
pub fn reset_deprecation_warnings() {
    AtomicBool::store(&DEPRECATION_WARNED, false, Ordering::Relaxed);
}
180
181pub mod postgres {
190 use super::EnvVar;
191
192 pub fn host() -> EnvVar {
194 EnvVar::new("PGHOST")
195 .with_legacy_names(&["POSTGRESQL_HOST", "PG_HOST", "POSTGRES_HOST"])
196 .with_description("PostgreSQL server hostname")
197 }
198
199 pub fn port() -> EnvVar {
201 EnvVar::new("PGPORT")
202 .with_legacy_names(&["POSTGRESQL_PORT", "PG_PORT", "POSTGRES_PORT"])
203 .with_description("PostgreSQL server port")
204 }
205
206 pub fn user() -> EnvVar {
208 EnvVar::new("PGUSER")
209 .with_legacy_names(&["POSTGRESQL_USER", "PG_USER", "POSTGRES_USER"])
210 .with_description("PostgreSQL username")
211 }
212
213 pub fn password() -> EnvVar {
215 EnvVar::new("PGPASSWORD")
216 .with_legacy_names(&["POSTGRESQL_PASSWORD", "PG_PASSWORD", "POSTGRES_PASSWORD"])
217 .with_description("PostgreSQL password")
218 }
219
220 pub fn database() -> EnvVar {
222 EnvVar::new("PGDATABASE")
223 .with_legacy_names(&[
224 "POSTGRESQL_DATABASE",
225 "PG_DATABASE",
226 "POSTGRES_DATABASE",
227 "POSTGRES_DB",
228 ])
229 .with_description("PostgreSQL database name")
230 }
231
232 pub fn sslmode() -> EnvVar {
234 EnvVar::new("PGSSLMODE")
235 .with_legacy_names(&["POSTGRESQL_SSLMODE", "PG_SSLMODE"])
236 .with_description("PostgreSQL SSL mode")
237 }
238}
239
240pub mod kafka {
244 use super::EnvVar;
245
246 fn kafka_var(name: &str, legacy: &[&str]) -> EnvVar {
248 let standard = format!("KAFKA_{name}");
249 let mut var = EnvVar::new(&standard);
250 for l in legacy {
251 var = var.with_legacy(l);
252 }
253 var
254 }
255
256 pub fn bootstrap_servers() -> EnvVar {
258 kafka_var("BOOTSTRAP_SERVERS", &["KAFKA_BROKERS"])
259 .with_description("Kafka broker addresses (comma-separated)")
260 }
261
262 pub fn security_protocol() -> EnvVar {
264 kafka_var("SECURITY_PROTOCOL", &[])
265 .with_description("Security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL)")
266 }
267
268 pub fn sasl_mechanism() -> EnvVar {
270 kafka_var("SASL_MECHANISM", &[])
271 .with_description("SASL mechanism (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512)")
272 }
273
274 pub fn sasl_username() -> EnvVar {
276 kafka_var("SASL_USERNAME", &["KAFKA_SASL_USER"]).with_description("SASL username")
277 }
278
279 pub fn sasl_password() -> EnvVar {
281 kafka_var("SASL_PASSWORD", &[]).with_description("SASL password")
282 }
283
284 pub fn group_id() -> EnvVar {
286 kafka_var("GROUP_ID", &["KAFKA_GROUP", "KAFKA_CONSUMER_GROUP"])
287 .with_description("Consumer group ID")
288 }
289
290 pub fn client_id() -> EnvVar {
292 kafka_var("CLIENT_ID", &[]).with_description("Client ID for broker logs")
293 }
294
295 pub fn topics() -> EnvVar {
297 kafka_var("TOPICS", &["KAFKA_TOPIC"])
298 .with_description("Topics to subscribe to (comma-separated)")
299 }
300
301 pub fn ssl_ca_location() -> EnvVar {
303 kafka_var("SSL_CA_LOCATION", &["KAFKA_CA_CERT", "KAFKA_SSL_CA"])
304 .with_description("Path to SSL CA certificate")
305 }
306
307 pub fn ssl_skip_verify() -> EnvVar {
309 kafka_var("SSL_SKIP_VERIFY", &["KAFKA_SSL_INSECURE", "KAFKA_INSECURE"])
310 .with_description("Skip SSL certificate verification")
311 }
312
313 pub fn profile() -> EnvVar {
315 kafka_var("PROFILE", &[]).with_description("Kafka profile (production, devtest)")
316 }
317
318 pub fn with_prefix(prefix: &str, name: &str) -> EnvVar {
322 EnvVar::new(&format!("{prefix}_KAFKA_{name}")).with_legacy(&format!("{prefix}_{name}"))
323 }
324}
325
326pub mod vault {
330 use super::EnvVar;
331
332 pub fn addr() -> EnvVar {
334 EnvVar::new("VAULT_ADDR")
335 .with_legacy_names(&["OPENBAO_ADDR", "BAO_ADDR"])
336 .with_description("Vault/OpenBao server address")
337 }
338
339 pub fn token() -> EnvVar {
341 EnvVar::new("VAULT_TOKEN")
342 .with_legacy_names(&["OPENBAO_TOKEN", "BAO_TOKEN", "OPENBAO_ROOT_TOKEN"])
343 .with_description("Vault/OpenBao authentication token")
344 }
345
346 pub fn namespace() -> EnvVar {
348 EnvVar::new("VAULT_NAMESPACE")
349 .with_legacy_names(&["OPENBAO_NAMESPACE", "BAO_NAMESPACE"])
350 .with_description("Vault namespace (Enterprise)")
351 }
352
353 pub fn skip_verify() -> EnvVar {
355 EnvVar::new("VAULT_SKIP_VERIFY")
356 .with_legacy_names(&[
357 "OPENBAO_SKIP_VERIFY",
358 "BAO_SKIP_VERIFY",
359 "VAULT_TLS_SKIP_VERIFY",
360 ])
361 .with_description("Skip TLS certificate verification")
362 }
363
364 pub fn ca_cert() -> EnvVar {
366 EnvVar::new("VAULT_CACERT")
367 .with_legacy_names(&["OPENBAO_CACERT", "BAO_CACERT", "VAULT_CA_CERT"])
368 .with_description("Path to CA certificate for Vault TLS")
369 }
370
371 pub fn approle_role_id() -> EnvVar {
373 EnvVar::new("VAULT_ROLE_ID")
374 .with_legacy_names(&["OPENBAO_ROLE_ID", "BAO_ROLE_ID"])
375 .with_description("AppRole role ID")
376 }
377
378 pub fn approle_secret_id() -> EnvVar {
380 EnvVar::new("VAULT_SECRET_ID")
381 .with_legacy_names(&["OPENBAO_SECRET_ID", "BAO_SECRET_ID"])
382 .with_description("AppRole secret ID")
383 }
384
385 pub fn k8s_role() -> EnvVar {
387 EnvVar::new("VAULT_K8S_ROLE")
388 .with_legacy_names(&["OPENBAO_K8S_ROLE", "BAO_K8S_ROLE"])
389 .with_description("Kubernetes auth role name")
390 }
391}
392
393pub mod aws {
395 use super::EnvVar;
396
397 pub fn access_key_id() -> EnvVar {
399 EnvVar::new("AWS_ACCESS_KEY_ID")
400 .with_legacy_names(&["AWS_ACCESS_KEY"])
401 .with_description("AWS access key ID")
402 }
403
404 pub fn secret_access_key() -> EnvVar {
406 EnvVar::new("AWS_SECRET_ACCESS_KEY")
407 .with_legacy_names(&["AWS_SECRET_KEY"])
408 .with_description("AWS secret access key")
409 }
410
411 pub fn session_token() -> EnvVar {
413 EnvVar::new("AWS_SESSION_TOKEN")
414 .with_legacy_names(&["AWS_SECURITY_TOKEN"])
415 .with_description("AWS session token (for temporary credentials)")
416 }
417
418 pub fn region() -> EnvVar {
420 EnvVar::new("AWS_DEFAULT_REGION")
421 .with_legacy_names(&["AWS_REGION"])
422 .with_description("AWS region")
423 }
424
425 pub fn endpoint_url() -> EnvVar {
427 EnvVar::new("AWS_ENDPOINT_URL")
428 .with_legacy_names(&["AWS_ENDPOINT", "LOCALSTACK_ENDPOINT"])
429 .with_description("Custom AWS endpoint URL")
430 }
431}
432
433pub mod clickhouse {
435 use super::EnvVar;
436
437 pub fn host() -> EnvVar {
439 EnvVar::new("CLICKHOUSE_HOST")
440 .with_legacy_names(&["CH_HOST"])
441 .with_description("ClickHouse server hostname")
442 }
443
444 pub fn native_port() -> EnvVar {
446 EnvVar::new("CLICKHOUSE_NATIVE_PORT")
447 .with_legacy_names(&["CLICKHOUSE_PORT", "CH_PORT"])
448 .with_description("ClickHouse native protocol port (default: 9000)")
449 }
450
451 pub fn http_port() -> EnvVar {
453 EnvVar::new("CLICKHOUSE_HTTP_PORT")
454 .with_legacy_names(&["CH_HTTP_PORT"])
455 .with_description("ClickHouse HTTP port (default: 8123)")
456 }
457
458 pub fn user() -> EnvVar {
460 EnvVar::new("CLICKHOUSE_USER")
461 .with_legacy_names(&["CH_USER", "CLICKHOUSE_USERNAME"])
462 .with_description("ClickHouse username")
463 }
464
465 pub fn password() -> EnvVar {
467 EnvVar::new("CLICKHOUSE_PASSWORD")
468 .with_legacy_names(&["CH_PASSWORD"])
469 .with_description("ClickHouse password")
470 }
471
472 pub fn database() -> EnvVar {
474 EnvVar::new("CLICKHOUSE_DATABASE")
475 .with_legacy_names(&["CH_DATABASE", "CLICKHOUSE_DB"])
476 .with_description("ClickHouse database name")
477 }
478}
479
480#[must_use]
483pub fn load_all_standard() -> HashMap<String, Option<String>> {
484 let mut vars = HashMap::new();
485
486 vars.insert("pg.host".into(), postgres::host().get());
488 vars.insert("pg.port".into(), postgres::port().get());
489 vars.insert("pg.user".into(), postgres::user().get());
490 vars.insert("pg.database".into(), postgres::database().get());
491
492 vars.insert(
494 "kafka.bootstrap_servers".into(),
495 kafka::bootstrap_servers().get(),
496 );
497 vars.insert(
498 "kafka.security_protocol".into(),
499 kafka::security_protocol().get(),
500 );
501 vars.insert("kafka.sasl_mechanism".into(), kafka::sasl_mechanism().get());
502 vars.insert("kafka.sasl_username".into(), kafka::sasl_username().get());
503
504 vars.insert("vault.addr".into(), vault::addr().get());
506 vars.insert("vault.namespace".into(), vault::namespace().get());
507
508 vars.insert("aws.region".into(), aws::region().get());
510
511 vars.insert("clickhouse.host".into(), clickhouse::host().get());
513 vars.insert("clickhouse.database".into(), clickhouse::database().get());
514
515 vars
516}
517
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes the tests in this module, since they all mutate the
    /// process-wide environment and the shared deprecation flag.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires the environment lock and resets the deprecation flag.
    ///
    /// A poisoned lock is recovered instead of unwrapped: the guarded value is
    /// `()`, so there is no state to corrupt, and unwrapping would make every
    /// remaining test fail after a single assertion failure elsewhere.
    fn setup() -> MutexGuard<'static, ()> {
        let guard = ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner);
        reset_deprecation_warnings();
        guard
    }

    #[test]
    fn test_env_var_standard_name() {
        let _lock = setup();
        temp_env::with_var("TEST_STANDARD_VAR", Some("standard_value"), || {
            let var = EnvVar::new("TEST_STANDARD_VAR").with_legacy("TEST_LEGACY_VAR");
            assert_eq!(var.get(), Some("standard_value".to_string()));
        });
    }

    #[test]
    fn test_env_var_legacy_fallback() {
        let _lock = setup();
        temp_env::with_var("TEST_LEGACY_VAR2", Some("legacy_value"), || {
            let var = EnvVar::new("TEST_STANDARD_VAR2").with_legacy("TEST_LEGACY_VAR2");
            assert_eq!(var.get(), Some("legacy_value".to_string()));
        });
    }

    #[test]
    fn test_env_var_standard_takes_precedence() {
        let _lock = setup();
        temp_env::with_vars(
            [
                ("TEST_STANDARD_VAR3", Some("standard")),
                ("TEST_LEGACY_VAR3", Some("legacy")),
            ],
            || {
                let var = EnvVar::new("TEST_STANDARD_VAR3").with_legacy("TEST_LEGACY_VAR3");
                assert_eq!(var.get(), Some("standard".to_string()));
            },
        );
    }

    #[test]
    fn test_env_var_missing() {
        let _lock = setup();
        let var = EnvVar::new("NONEXISTENT_VAR").with_legacy("ALSO_NONEXISTENT");
        assert_eq!(var.get(), None);
    }

    #[test]
    fn test_env_var_get_bool() {
        let _lock = setup();
        temp_env::with_vars(
            [
                ("TEST_BOOL_TRUE", Some("true")),
                ("TEST_BOOL_ONE", Some("1")),
                ("TEST_BOOL_YES", Some("YES")),
                ("TEST_BOOL_ON", Some("On")),
                ("TEST_BOOL_FALSE", Some("false")),
            ],
            || {
                assert_eq!(EnvVar::new("TEST_BOOL_TRUE").get_bool(), Some(true));
                assert_eq!(EnvVar::new("TEST_BOOL_ONE").get_bool(), Some(true));
                assert_eq!(EnvVar::new("TEST_BOOL_YES").get_bool(), Some(true));
                assert_eq!(EnvVar::new("TEST_BOOL_ON").get_bool(), Some(true));
                assert_eq!(EnvVar::new("TEST_BOOL_FALSE").get_bool(), Some(false));
                assert_eq!(EnvVar::new("TEST_BOOL_UNSET").get_bool(), None);
            },
        );
    }

    #[test]
    fn test_env_var_get_list() {
        let _lock = setup();
        temp_env::with_var("TEST_LIST", Some("a, b, c"), || {
            let var = EnvVar::new("TEST_LIST");
            assert_eq!(
                var.get_list(),
                Some(vec!["a".to_string(), "b".to_string(), "c".to_string()])
            );
        });
    }

    #[test]
    fn test_postgres_env_vars() {
        let _lock = setup();
        temp_env::with_var("PGHOST", Some("localhost"), || {
            assert_eq!(postgres::host().get(), Some("localhost".to_string()));
        });
    }

    #[test]
    fn test_postgres_legacy_fallback() {
        let _lock = setup();
        temp_env::with_vars(
            [
                ("PGHOST", None::<&str>),
                ("POSTGRESQL_HOST", Some("legacy-host")),
            ],
            || assert_eq!(postgres::host().get(), Some("legacy-host".to_string())),
        );
    }

    #[test]
    fn test_kafka_env_vars() {
        let _lock = setup();
        temp_env::with_var("KAFKA_BOOTSTRAP_SERVERS", Some("kafka:9092"), || {
            assert_eq!(
                kafka::bootstrap_servers().get(),
                Some("kafka:9092".to_string())
            );
        });
    }

    #[test]
    fn test_vault_env_vars() {
        let _lock = setup();
        temp_env::with_var("VAULT_ADDR", Some("https://vault:8200"), || {
            assert_eq!(vault::addr().get(), Some("https://vault:8200".to_string()));
        });
    }

    #[test]
    fn test_vault_openbao_fallback() {
        let _lock = setup();
        temp_env::with_vars(
            [
                ("VAULT_ADDR", None::<&str>),
                ("OPENBAO_ADDR", Some("https://openbao:8200")),
            ],
            || {
                assert_eq!(
                    vault::addr().get(),
                    Some("https://openbao:8200".to_string())
                );
            },
        );
    }

    #[test]
    fn test_which_name_used() {
        let _lock = setup();
        temp_env::with_var("TEST_WHICH_LEGACY", Some("value"), || {
            let var = EnvVar::new("TEST_WHICH_STANDARD").with_legacy("TEST_WHICH_LEGACY");
            assert_eq!(var.which_name_used(), Some("TEST_WHICH_LEGACY"));
        });
    }
}