Skip to main content

faucet_common_kafka/
auth.rs

1//! Kafka authentication modes.
2
3use rdkafka::ClientConfig;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use std::path::{Path, PathBuf};
7
8/// SCRAM hash algorithm used for SASL/SCRAM authentication.
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
10#[serde(rename_all = "snake_case")]
11pub enum ScramMechanism {
12    /// SCRAM-SHA-256
13    Sha256,
14    /// SCRAM-SHA-512
15    Sha512,
16}
17
18/// Basic username/password credentials reused across auth modes.
19#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
20pub struct BasicAuth {
21    pub username: String,
22    pub password: String,
23}
24
25/// Kafka broker authentication configuration.
26#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
27#[serde(tag = "type", content = "config", rename_all = "snake_case")]
28pub enum KafkaAuth {
29    /// No authentication — plaintext brokers only.
30    #[default]
31    None,
32    /// SASL/PLAIN username + password.
33    SaslPlain { username: String, password: String },
34    /// SASL/SCRAM-SHA-256 or SCRAM-SHA-512.
35    SaslScram {
36        /// Hash algorithm variant.
37        mechanism: ScramMechanism,
38        username: String,
39        password: String,
40    },
41    /// SSL/TLS client-certificate authentication (path-based).
42    Ssl {
43        /// Path to the CA certificate file.
44        ca_path: PathBuf,
45        /// Path to the client certificate file.
46        cert_path: PathBuf,
47        /// Path to the client private key file.
48        key_path: PathBuf,
49        /// Optional passphrase for the private key.
50        #[serde(default, skip_serializing_if = "Option::is_none")]
51        key_password: Option<String>,
52    },
53    /// SASL over SSL — combines a SASL mechanism with TLS transport.
54    SaslSsl {
55        /// Inner SASL auth (must be `SaslPlain` or `SaslScram`).
56        sasl: Box<KafkaAuth>,
57        /// TLS layer (must be `Ssl`).
58        ssl: Box<KafkaAuth>,
59    },
60}
61
62impl KafkaAuth {
63    /// Apply this authentication configuration to an `rdkafka::ClientConfig`.
64    ///
65    /// Returns `FaucetError::Config` if SSL paths are missing or SASL fields are empty.
66    pub fn apply(&self, config: &mut ClientConfig) -> Result<(), faucet_core::FaucetError> {
67        match self {
68            KafkaAuth::None => {
69                config.set("security.protocol", "PLAINTEXT");
70            }
71            KafkaAuth::SaslPlain { username, password } => {
72                Self::require_nonempty("username", username)?;
73                Self::require_nonempty("password", password)?;
74                config.set("security.protocol", "SASL_PLAINTEXT");
75                config.set("sasl.mechanism", "PLAIN");
76                config.set("sasl.username", username);
77                config.set("sasl.password", password);
78            }
79            KafkaAuth::SaslScram {
80                mechanism,
81                username,
82                password,
83            } => {
84                Self::require_nonempty("username", username)?;
85                Self::require_nonempty("password", password)?;
86                config.set("security.protocol", "SASL_PLAINTEXT");
87                config.set("sasl.mechanism", mechanism.as_str());
88                config.set("sasl.username", username);
89                config.set("sasl.password", password);
90            }
91            KafkaAuth::Ssl {
92                ca_path,
93                cert_path,
94                key_path,
95                key_password,
96            } => {
97                Self::require_path("ca_path", ca_path)?;
98                Self::require_path("cert_path", cert_path)?;
99                Self::require_path("key_path", key_path)?;
100                config.set("security.protocol", "SSL");
101                config.set("ssl.ca.location", path_str(ca_path));
102                config.set("ssl.certificate.location", path_str(cert_path));
103                config.set("ssl.key.location", path_str(key_path));
104                if let Some(pw) = key_password {
105                    config.set("ssl.key.password", pw);
106                }
107            }
108            KafkaAuth::SaslSsl { sasl, ssl } => {
109                // Apply SSL settings first, then SASL settings, then override
110                // security.protocol to SASL_SSL.
111                ssl.apply(config)?;
112                sasl.apply(config)?;
113                config.set("security.protocol", "SASL_SSL");
114            }
115        }
116        Ok(())
117    }
118
119    fn require_nonempty(field: &str, value: &str) -> Result<(), faucet_core::FaucetError> {
120        if value.is_empty() {
121            Err(faucet_core::FaucetError::Config(format!(
122                "kafka auth field '{field}' must not be empty"
123            )))
124        } else {
125            Ok(())
126        }
127    }
128
129    fn require_path(field: &str, path: &Path) -> Result<(), faucet_core::FaucetError> {
130        if !path.exists() {
131            return Err(faucet_core::FaucetError::Config(format!(
132                "kafka auth path '{field}' does not exist: {}",
133                path.display()
134            )));
135        }
136        Ok(())
137    }
138}
139
140impl ScramMechanism {
141    /// Returns the librdkafka mechanism string for this SCRAM variant.
142    pub fn as_str(&self) -> &'static str {
143        match self {
144            ScramMechanism::Sha256 => "SCRAM-SHA-256",
145            ScramMechanism::Sha512 => "SCRAM-SHA-512",
146        }
147    }
148}
149
150fn path_str(p: &Path) -> String {
151    p.to_string_lossy().into_owned()
152}
153
154#[cfg(test)]
155mod tests {
156    use super::*;
157
158    #[test]
159    fn apply_none_sets_plaintext() {
160        let mut cfg = ClientConfig::new();
161        KafkaAuth::None.apply(&mut cfg).unwrap();
162        assert_eq!(cfg.get("security.protocol"), Some("PLAINTEXT"));
163    }
164
165    #[test]
166    fn apply_sasl_plain_sets_expected_keys() {
167        let mut cfg = ClientConfig::new();
168        KafkaAuth::SaslPlain {
169            username: "alice".into(),
170            password: "secret".into(),
171        }
172        .apply(&mut cfg)
173        .unwrap();
174        assert_eq!(cfg.get("security.protocol"), Some("SASL_PLAINTEXT"));
175        assert_eq!(cfg.get("sasl.mechanism"), Some("PLAIN"));
176        assert_eq!(cfg.get("sasl.username"), Some("alice"));
177        assert_eq!(cfg.get("sasl.password"), Some("secret"));
178    }
179
180    #[test]
181    fn apply_sasl_scram_sha512() {
182        let mut cfg = ClientConfig::new();
183        KafkaAuth::SaslScram {
184            mechanism: ScramMechanism::Sha512,
185            username: "bob".into(),
186            password: "pw".into(),
187        }
188        .apply(&mut cfg)
189        .unwrap();
190        assert_eq!(cfg.get("sasl.mechanism"), Some("SCRAM-SHA-512"));
191    }
192
193    #[test]
194    fn apply_sasl_scram_sha256() {
195        let mut cfg = ClientConfig::new();
196        KafkaAuth::SaslScram {
197            mechanism: ScramMechanism::Sha256,
198            username: "bob".into(),
199            password: "pw".into(),
200        }
201        .apply(&mut cfg)
202        .unwrap();
203        assert_eq!(cfg.get("sasl.mechanism"), Some("SCRAM-SHA-256"));
204    }
205
206    #[test]
207    fn apply_ssl_sets_key_password_when_provided() {
208        let manifest = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("Cargo.toml");
209        let mut cfg = ClientConfig::new();
210        KafkaAuth::Ssl {
211            ca_path: manifest.clone(),
212            cert_path: manifest.clone(),
213            key_path: manifest,
214            key_password: Some("topsecret".into()),
215        }
216        .apply(&mut cfg)
217        .unwrap();
218        assert_eq!(cfg.get("ssl.key.password"), Some("topsecret"));
219    }
220
221    #[test]
222    fn apply_sasl_plain_rejects_empty_username() {
223        let mut cfg = ClientConfig::new();
224        let err = KafkaAuth::SaslPlain {
225            username: String::new(),
226            password: "x".into(),
227        }
228        .apply(&mut cfg)
229        .unwrap_err();
230        let msg = format!("{err}");
231        assert!(msg.contains("username"));
232    }
233
234    #[test]
235    fn apply_ssl_rejects_missing_ca_path() {
236        let mut cfg = ClientConfig::new();
237        let err = KafkaAuth::Ssl {
238            ca_path: PathBuf::from("/nonexistent/ca.pem"),
239            cert_path: PathBuf::from("/nonexistent/cert.pem"),
240            key_path: PathBuf::from("/nonexistent/key.pem"),
241            key_password: None,
242        }
243        .apply(&mut cfg)
244        .unwrap_err();
245        assert!(format!("{err}").contains("ca_path"));
246    }
247
248    #[test]
249    fn sasl_ssl_overrides_protocol() {
250        // Use the crate's Cargo.toml which is guaranteed to exist.
251        let manifest = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("Cargo.toml");
252        let mut cfg = ClientConfig::new();
253        KafkaAuth::SaslSsl {
254            sasl: Box::new(KafkaAuth::SaslPlain {
255                username: "u".into(),
256                password: "p".into(),
257            }),
258            ssl: Box::new(KafkaAuth::Ssl {
259                ca_path: manifest.clone(),
260                cert_path: manifest.clone(),
261                key_path: manifest,
262                key_password: None,
263            }),
264        }
265        .apply(&mut cfg)
266        .unwrap();
267        assert_eq!(cfg.get("security.protocol"), Some("SASL_SSL"));
268        assert_eq!(cfg.get("sasl.username"), Some("u"));
269        // Confirm the inner ssl.apply() actually wrote the SSL location properties
270        // before SaslSsl flipped the protocol — guards against a dropped ssl.apply() call.
271        assert!(cfg.get("ssl.ca.location").is_some());
272        assert!(cfg.get("ssl.certificate.location").is_some());
273        assert!(cfg.get("ssl.key.location").is_some());
274    }
275
276    #[test]
277    fn serde_round_trip_sasl_plain() {
278        let auth = KafkaAuth::SaslPlain {
279            username: "alice".into(),
280            password: "secret".into(),
281        };
282        let serialized = serde_json::to_value(&auth).unwrap();
283        assert_eq!(serialized["type"], "sasl_plain");
284        let parsed: KafkaAuth = serde_json::from_value(serialized).unwrap();
285        match parsed {
286            KafkaAuth::SaslPlain { username, .. } => assert_eq!(username, "alice"),
287            _ => panic!("wrong variant"),
288        }
289    }
290
291    #[test]
292    fn schema_for_kafka_auth_compiles() {
293        let _ = schemars::schema_for!(KafkaAuth);
294    }
295}