1use rdkafka::ClientConfig;
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6use std::path::{Path, PathBuf};
7
/// SCRAM hash variants selectable for [`KafkaAuth::SaslScram`].
///
/// Serialized in `snake_case` (`sha256` / `sha512`). `ScramMechanism::as_str`
/// yields the matching value for the `sasl.mechanism` client property.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ScramMechanism {
    /// Rendered as `SCRAM-SHA-256`.
    Sha256,
    /// Rendered as `SCRAM-SHA-512`.
    Sha512,
}
17
/// A plain username/password credential pair.
// NOTE(review): nothing else visible in this file references `BasicAuth`;
// presumably it is consumed by another module — confirm before changing it.
// NOTE(review): the derived `Debug` prints `password` verbatim, so avoid
// logging values of this type with `{:?}`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct BasicAuth {
    /// Account name.
    pub username: String,
    /// Secret paired with `username`.
    pub password: String,
}
24
/// Authentication / transport-security mode for a Kafka client.
///
/// Serialized as an adjacently tagged object: the variant name (in
/// `snake_case`, e.g. `sasl_plain`) goes under `type` and the variant's
/// fields under `config`. [`KafkaAuth::None`] is the `Default`.
// NOTE(review): the derived `Debug` prints passwords verbatim; avoid logging
// values of this type with `{:?}`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type", content = "config", rename_all = "snake_case")]
pub enum KafkaAuth {
    /// No authentication; `apply` selects the `PLAINTEXT` protocol.
    #[default]
    None,
    /// SASL with the `PLAIN` mechanism over `SASL_PLAINTEXT`.
    SaslPlain { username: String, password: String },
    /// SASL with a SCRAM mechanism over `SASL_PLAINTEXT`.
    SaslScram {
        /// Which SCRAM hash to advertise as `sasl.mechanism`.
        mechanism: ScramMechanism,
        username: String,
        password: String,
    },
    /// TLS with a CA file plus client certificate and key (`SSL` protocol).
    Ssl {
        /// Written to `ssl.ca.location`; must exist when `apply` runs.
        ca_path: PathBuf,
        /// Written to `ssl.certificate.location`; must exist when `apply` runs.
        cert_path: PathBuf,
        /// Written to `ssl.key.location`; must exist when `apply` runs.
        key_path: PathBuf,
        /// Written to `ssl.key.password` when present. May be omitted on
        /// input and is left out of the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        key_password: Option<String>,
    },
    /// SASL credentials combined with TLS (`SASL_SSL` protocol).
    ///
    /// The halves are boxed because the type refers to itself.
    SaslSsl {
        /// The SASL half; `apply` processes it after `ssl`.
        sasl: Box<KafkaAuth>,
        /// The TLS half; `apply` processes it first.
        ssl: Box<KafkaAuth>,
    },
}
61
62impl KafkaAuth {
63 pub fn apply(&self, config: &mut ClientConfig) -> Result<(), faucet_core::FaucetError> {
67 match self {
68 KafkaAuth::None => {
69 config.set("security.protocol", "PLAINTEXT");
70 }
71 KafkaAuth::SaslPlain { username, password } => {
72 Self::require_nonempty("username", username)?;
73 Self::require_nonempty("password", password)?;
74 config.set("security.protocol", "SASL_PLAINTEXT");
75 config.set("sasl.mechanism", "PLAIN");
76 config.set("sasl.username", username);
77 config.set("sasl.password", password);
78 }
79 KafkaAuth::SaslScram {
80 mechanism,
81 username,
82 password,
83 } => {
84 Self::require_nonempty("username", username)?;
85 Self::require_nonempty("password", password)?;
86 config.set("security.protocol", "SASL_PLAINTEXT");
87 config.set("sasl.mechanism", mechanism.as_str());
88 config.set("sasl.username", username);
89 config.set("sasl.password", password);
90 }
91 KafkaAuth::Ssl {
92 ca_path,
93 cert_path,
94 key_path,
95 key_password,
96 } => {
97 Self::require_path("ca_path", ca_path)?;
98 Self::require_path("cert_path", cert_path)?;
99 Self::require_path("key_path", key_path)?;
100 config.set("security.protocol", "SSL");
101 config.set("ssl.ca.location", path_str(ca_path));
102 config.set("ssl.certificate.location", path_str(cert_path));
103 config.set("ssl.key.location", path_str(key_path));
104 if let Some(pw) = key_password {
105 config.set("ssl.key.password", pw);
106 }
107 }
108 KafkaAuth::SaslSsl { sasl, ssl } => {
109 ssl.apply(config)?;
112 sasl.apply(config)?;
113 config.set("security.protocol", "SASL_SSL");
114 }
115 }
116 Ok(())
117 }
118
119 fn require_nonempty(field: &str, value: &str) -> Result<(), faucet_core::FaucetError> {
120 if value.is_empty() {
121 Err(faucet_core::FaucetError::Config(format!(
122 "kafka auth field '{field}' must not be empty"
123 )))
124 } else {
125 Ok(())
126 }
127 }
128
129 fn require_path(field: &str, path: &Path) -> Result<(), faucet_core::FaucetError> {
130 if !path.exists() {
131 return Err(faucet_core::FaucetError::Config(format!(
132 "kafka auth path '{field}' does not exist: {}",
133 path.display()
134 )));
135 }
136 Ok(())
137 }
138}
139
140impl ScramMechanism {
141 pub fn as_str(&self) -> &'static str {
143 match self {
144 ScramMechanism::Sha256 => "SCRAM-SHA-256",
145 ScramMechanism::Sha512 => "SCRAM-SHA-512",
146 }
147 }
148}
149
/// Renders `p` as an owned `String` suitable for a client property value.
///
/// The conversion is lossy: any non-UTF-8 sequence in the path is replaced
/// with U+FFFD rather than reported as an error.
fn path_str(p: &Path) -> String {
    String::from(p.to_string_lossy())
}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// Applies `auth` to a fresh config and hands the populated config back,
    /// panicking if `apply` reports an error.
    fn configured(auth: &KafkaAuth) -> ClientConfig {
        let mut cfg = ClientConfig::new();
        auth.apply(&mut cfg).unwrap();
        cfg
    }

    /// Applies `auth` to a fresh config, expects failure, and returns the
    /// rendered error message.
    fn rejection(auth: &KafkaAuth) -> String {
        let mut cfg = ClientConfig::new();
        let err = auth.apply(&mut cfg).unwrap_err();
        err.to_string()
    }

    /// A path that is guaranteed to exist while the tests run: the crate's
    /// own manifest. Used wherever `apply` only checks for existence.
    fn existing_file() -> PathBuf {
        PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("Cargo.toml")
    }

    /// SCRAM credentials for user `bob` with the given hash variant.
    fn scram(mechanism: ScramMechanism) -> KafkaAuth {
        KafkaAuth::SaslScram {
            mechanism,
            username: String::from("bob"),
            password: String::from("pw"),
        }
    }

    #[test]
    fn apply_none_sets_plaintext() {
        let cfg = configured(&KafkaAuth::None);
        assert_eq!(cfg.get("security.protocol"), Some("PLAINTEXT"));
    }

    #[test]
    fn apply_sasl_plain_sets_expected_keys() {
        let auth = KafkaAuth::SaslPlain {
            username: String::from("alice"),
            password: String::from("secret"),
        };
        let cfg = configured(&auth);
        assert_eq!(cfg.get("security.protocol"), Some("SASL_PLAINTEXT"));
        assert_eq!(cfg.get("sasl.mechanism"), Some("PLAIN"));
        assert_eq!(cfg.get("sasl.username"), Some("alice"));
        assert_eq!(cfg.get("sasl.password"), Some("secret"));
    }

    #[test]
    fn apply_sasl_scram_sha512() {
        let cfg = configured(&scram(ScramMechanism::Sha512));
        assert_eq!(cfg.get("sasl.mechanism"), Some("SCRAM-SHA-512"));
    }

    #[test]
    fn apply_sasl_scram_sha256() {
        let cfg = configured(&scram(ScramMechanism::Sha256));
        assert_eq!(cfg.get("sasl.mechanism"), Some("SCRAM-SHA-256"));
    }

    #[test]
    fn apply_ssl_sets_key_password_when_provided() {
        let pem = existing_file();
        let auth = KafkaAuth::Ssl {
            ca_path: pem.clone(),
            cert_path: pem.clone(),
            key_path: pem,
            key_password: Some(String::from("topsecret")),
        };
        let cfg = configured(&auth);
        assert_eq!(cfg.get("ssl.key.password"), Some("topsecret"));
    }

    #[test]
    fn apply_sasl_plain_rejects_empty_username() {
        let msg = rejection(&KafkaAuth::SaslPlain {
            username: String::new(),
            password: String::from("x"),
        });
        assert!(msg.contains("username"));
    }

    #[test]
    fn apply_ssl_rejects_missing_ca_path() {
        let msg = rejection(&KafkaAuth::Ssl {
            ca_path: PathBuf::from("/nonexistent/ca.pem"),
            cert_path: PathBuf::from("/nonexistent/cert.pem"),
            key_path: PathBuf::from("/nonexistent/key.pem"),
            key_password: None,
        });
        assert!(msg.contains("ca_path"));
    }

    #[test]
    fn sasl_ssl_overrides_protocol() {
        let pem = existing_file();
        let sasl_half = KafkaAuth::SaslPlain {
            username: String::from("u"),
            password: String::from("p"),
        };
        let tls_half = KafkaAuth::Ssl {
            ca_path: pem.clone(),
            cert_path: pem.clone(),
            key_path: pem,
            key_password: None,
        };
        let cfg = configured(&KafkaAuth::SaslSsl {
            sasl: Box::new(sasl_half),
            ssl: Box::new(tls_half),
        });

        // The combined protocol must win over the one each half wrote.
        assert_eq!(cfg.get("security.protocol"), Some("SASL_SSL"));
        assert_eq!(cfg.get("sasl.username"), Some("u"));
        // The TLS half's keys must survive the SASL half being applied.
        for key in [
            "ssl.ca.location",
            "ssl.certificate.location",
            "ssl.key.location",
        ] {
            assert!(cfg.get(key).is_some());
        }
    }

    #[test]
    fn serde_round_trip_sasl_plain() {
        let original = KafkaAuth::SaslPlain {
            username: String::from("alice"),
            password: String::from("secret"),
        };
        let json = serde_json::to_value(&original).unwrap();
        assert_eq!(json["type"], "sasl_plain");

        let restored: KafkaAuth = serde_json::from_value(json).unwrap();
        if let KafkaAuth::SaslPlain { username, .. } = restored {
            assert_eq!(username, "alice");
        } else {
            panic!("wrong variant");
        }
    }

    #[test]
    fn schema_for_kafka_auth_compiles() {
        // Only checks that schema generation runs without panicking.
        let _ = schemars::schema_for!(KafkaAuth);
    }
}