1use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::Arc;
12
13use crate::acl::{Acl, AclEntry, AclOp};
14use crate::auth::{AuthError, AuthInput, AuthMode, AuthSubject};
15use crate::tls::{TlsConfigError, load_server_config, load_server_config_with_client_auth};
16
17#[derive(Clone)]
19pub struct SecurityCtx {
20 pub tls: Option<Arc<rustls::ServerConfig>>,
22 pub auth: Arc<AuthMode>,
24 pub acl: Arc<Acl>,
26}
27
28impl core::fmt::Debug for SecurityCtx {
29 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
30 f.debug_struct("SecurityCtx")
31 .field("tls_enabled", &self.tls.is_some())
32 .field("auth", &self.auth)
33 .finish_non_exhaustive()
34 }
35}
36
37#[derive(Debug, Clone, Default)]
39pub struct SecurityConfig {
40 pub tls_cert: Option<PathBuf>,
42 pub tls_key: Option<PathBuf>,
44 pub client_ca: Option<PathBuf>,
46 pub auth_mode: String,
48 pub bearer_tokens: HashMap<String, String>,
50 pub jwt_pubkey_der: Option<Vec<u8>>,
52 pub jwt_expected_iss: Option<String>,
54 pub sasl_users: HashMap<String, String>,
56 pub topic_acl: HashMap<String, AclEntry>,
58 pub topic_acl_default: Option<AclEntry>,
60}
61
62#[derive(Debug)]
64pub enum SecurityError {
65 Tls(TlsConfigError),
67 UnknownAuthMode(String),
69 MissingAuthInput(&'static str),
71}
72
73impl core::fmt::Display for SecurityError {
74 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
75 match self {
76 Self::Tls(e) => write!(f, "tls: {e}"),
77 Self::UnknownAuthMode(m) => write!(f, "unknown auth-mode: {m}"),
78 Self::MissingAuthInput(s) => write!(f, "missing auth input: {s}"),
79 }
80 }
81}
82
83impl std::error::Error for SecurityError {}
84
85impl From<TlsConfigError> for SecurityError {
86 fn from(e: TlsConfigError) -> Self {
87 Self::Tls(e)
88 }
89}
90
91pub fn build_ctx(cfg: &SecurityConfig) -> Result<SecurityCtx, SecurityError> {
97 let tls = match (&cfg.tls_cert, &cfg.tls_key) {
98 (Some(c), Some(k)) => match &cfg.client_ca {
99 Some(ca) => Some(load_server_config_with_client_auth(c, k, ca)?),
100 None => Some(load_server_config(c, k)?),
101 },
102 (None, None) => None,
103 _ => {
104 return Err(SecurityError::Tls(TlsConfigError::Rustls(
105 "tls_cert and tls_key must be set together".to_string(),
106 )));
107 }
108 };
109
110 let auth = match cfg.auth_mode.as_str() {
111 "" | "none" => AuthMode::None,
112 "bearer" => {
113 let mut tokens: HashMap<String, AuthSubject> = HashMap::new();
114 for (tok, name) in &cfg.bearer_tokens {
115 tokens.insert(tok.clone(), AuthSubject::new(name.clone()));
116 }
117 AuthMode::Bearer { tokens }
118 }
119 "jwt" => {
120 let der = cfg
121 .jwt_pubkey_der
122 .clone()
123 .ok_or(SecurityError::MissingAuthInput("jwt_pubkey_der"))?;
124 AuthMode::Jwt {
125 pkcs1_pubkey_der: der,
126 expected_issuer: cfg.jwt_expected_iss.clone(),
127 }
128 }
129 "mtls" => AuthMode::Mtls,
130 "sasl" => {
131 if cfg.sasl_users.is_empty() {
132 return Err(SecurityError::MissingAuthInput("sasl_users"));
133 }
134 AuthMode::SaslPlain {
135 users: cfg.sasl_users.clone(),
136 }
137 }
138 other => return Err(SecurityError::UnknownAuthMode(other.to_string())),
139 };
140
141 let mut acl = if cfg.topic_acl.is_empty() && cfg.topic_acl_default.is_none() {
142 if matches!(auth, AuthMode::None) {
143 Acl::allow_all()
144 } else {
145 Acl::deny_all()
146 }
147 } else {
148 Acl::deny_all()
149 };
150 for (topic, entry) in &cfg.topic_acl {
151 acl.set(topic.clone(), entry.clone());
152 }
153 if let Some(d) = &cfg.topic_acl_default {
154 acl.set_default(d.clone());
155 }
156
157 Ok(SecurityCtx {
158 tls,
159 auth: Arc::new(auth),
160 acl: Arc::new(acl),
161 })
162}
163
164pub fn authenticate(
173 auth: &AuthMode,
174 authorization_header: Option<&str>,
175 sasl_plain_blob: Option<&[u8]>,
176 mtls_subject: Option<AuthSubject>,
177) -> Result<AuthSubject, AuthError> {
178 let input = AuthInput {
179 authorization_header,
180 sasl_plain_blob,
181 mtls_subject,
182 };
183 auth.validate(&input)
184}
185
186#[must_use]
188pub fn authorize(acl: &Acl, subject: &AuthSubject, op: AclOp, topic: &str) -> bool {
189 acl.check(subject, op, topic)
190}
191
192#[must_use]
199pub fn extract_mtls_subject(conn: &rustls::ServerConnection) -> Option<AuthSubject> {
200 let certs = conn.peer_certificates()?;
201 let leaf = certs.first()?;
202 let hash = sha256_hex(leaf.as_ref());
206 Some(AuthSubject::new(format!("mtls:{hash}")))
207}
208
209fn sha256_hex(data: &[u8]) -> String {
210 use ring::digest::{Context, SHA256};
211 let mut ctx = Context::new(&SHA256);
212 ctx.update(data);
213 let d = ctx.finish();
214 let mut s = String::with_capacity(64);
215 for b in d.as_ref() {
216 s.push_str(&format!("{b:02x}"));
217 }
218 s
219}
220
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Default configuration with only `auth_mode` overridden.
    fn cfg_with_mode(mode: &str) -> SecurityConfig {
        SecurityConfig {
            auth_mode: mode.to_owned(),
            ..SecurityConfig::default()
        }
    }

    /// An empty config means no TLS, no auth, and an open ACL.
    #[test]
    fn build_default_yields_none_auth_allow_all_acl() {
        let ctx = build_ctx(&SecurityConfig::default()).unwrap();
        assert!(ctx.tls.is_none());
        assert!(matches!(*ctx.auth, AuthMode::None));
        assert!(ctx.acl.check(&AuthSubject::anonymous(), AclOp::Read, "X"));
    }

    /// A configured bearer token resolves to its subject name.
    #[test]
    fn build_bearer_with_tokens() {
        let mut cfg = cfg_with_mode("bearer");
        cfg.bearer_tokens.insert("t".to_owned(), "alice".to_owned());
        let ctx = build_ctx(&cfg).unwrap();
        let subject = authenticate(&ctx.auth, Some("Bearer t"), None, None).unwrap();
        assert_eq!(subject.name, "alice");
    }

    /// A configured SASL user authenticates with a PLAIN blob.
    #[test]
    fn build_sasl_with_users() {
        let mut cfg = cfg_with_mode("sasl");
        cfg.sasl_users.insert("u".to_owned(), "p".to_owned());
        let ctx = build_ctx(&cfg).unwrap();
        let subject = authenticate(&ctx.auth, None, Some(b"\0u\0p"), None).unwrap();
        assert_eq!(subject.name, "u");
    }

    /// SASL mode with an empty user table is a configuration error.
    #[test]
    fn build_sasl_without_users_rejected() {
        let failure = build_ctx(&cfg_with_mode("sasl")).unwrap_err();
        assert!(matches!(failure, SecurityError::MissingAuthInput(_)));
    }

    /// Unrecognized mode names are rejected.
    #[test]
    fn unknown_auth_mode_rejected() {
        let failure = build_ctx(&cfg_with_mode("weird")).unwrap_err();
        assert!(matches!(failure, SecurityError::UnknownAuthMode(_)));
    }

    /// JWT mode requires a public key.
    #[test]
    fn jwt_without_key_rejected() {
        let failure = build_ctx(&cfg_with_mode("jwt")).unwrap_err();
        assert!(matches!(failure, SecurityError::MissingAuthInput(_)));
    }

    /// Any explicit topic entry switches the base policy to deny-all.
    #[test]
    fn explicit_acl_overrides_open_default() {
        let mut cfg = SecurityConfig::default();
        let alice_only = AclEntry {
            read: vec!["alice".into()],
            write: vec!["alice".into()],
        };
        cfg.topic_acl.insert("T".to_owned(), alice_only);

        let ctx = build_ctx(&cfg).unwrap();
        let alice = AuthSubject::new("alice");
        let bob = AuthSubject::new("bob");
        assert!(authorize(&ctx.acl, &alice, AclOp::Read, "T"));
        assert!(!authorize(&ctx.acl, &bob, AclOp::Read, "T"));
        assert!(!authorize(&ctx.acl, &alice, AclOp::Read, "Other"));
    }

    /// The default entry governs topics without an explicit entry.
    #[test]
    fn explicit_acl_default_used_for_unknown() {
        let world_readable = AclEntry {
            read: vec!["*".into()],
            write: Vec::new(),
        };
        let cfg = SecurityConfig {
            topic_acl_default: Some(world_readable),
            ..SecurityConfig::default()
        };

        let ctx = build_ctx(&cfg).unwrap();
        let bob = AuthSubject::new("bob");
        assert!(authorize(&ctx.acl, &bob, AclOp::Read, "Anything"));
        assert!(!authorize(&ctx.acl, &bob, AclOp::Write, "Anything"));
    }

    /// A certificate without a key (or vice versa) is rejected.
    #[test]
    fn partial_tls_paths_rejected() {
        let cfg = SecurityConfig {
            tls_cert: Some(PathBuf::from("/x")),
            tls_key: None,
            ..SecurityConfig::default()
        };
        let failure = build_ctx(&cfg).unwrap_err();
        assert!(matches!(failure, SecurityError::Tls(_)));
    }
}
332}