pub use zerodds_bridge_security::{
Acl, AclEntry, AclOp, AuthError, AuthMode, AuthSubject, SecurityConfig, SecurityCtx,
SecurityError, authorize, build_client_tls_connector, build_ctx, extract_mtls_subject,
parse_server_name,
};
use std::path::Path;
use std::sync::Arc;
/// Security settings for the AMQP side of the bridge.
///
/// This is the flat, string-oriented form of the configuration;
/// [`ctx_from_amqp_config`] turns it into a `SecurityCtx` plus an optional
/// client TLS configuration.
///
/// `Debug` is implemented by hand so that `bearer_token` and `sasl_password`
/// never appear in log or panic output; only their presence is shown.
#[derive(Clone, Default)]
pub struct AmqpSecurityConfig {
    /// When `true`, a client TLS connector is built from the `tls_*` paths.
    pub tls_enabled: bool,
    /// Path of the CA file; an empty string means "not configured".
    pub tls_ca_file: String,
    /// Path of the client certificate file; empty means "not configured".
    pub tls_cert_file: String,
    /// Path of the client private-key file; empty means "not configured".
    pub tls_key_file: String,
    /// Server name for the TLS session.
    // NOTE(review): not read by any code visible in this file section —
    // presumably consumed by the connection setup; confirm against callers.
    pub tls_server_name: String,
    /// Authentication mode name, copied verbatim into the bridge
    /// `SecurityConfig`. `"sasl"` and `"sasl_plain"` enable the SASL PLAIN
    /// initial response.
    pub auth_mode: String,
    /// Bearer token to register; secret, redacted in `Debug` output.
    pub bearer_token: Option<String>,
    /// Subject name the bearer token maps to; when absent the token is
    /// registered under `"anonymous"`.
    pub bearer_subject: Option<String>,
    /// SASL PLAIN user name.
    pub sasl_username: Option<String>,
    /// SASL PLAIN password; secret, redacted in `Debug` output.
    pub sasl_password: Option<String>,
    /// Per-topic access lists: topic -> (subjects allowed to read,
    /// subjects allowed to write).
    pub topic_acl: std::collections::HashMap<String, (Vec<String>, Vec<String>)>,
}

impl std::fmt::Debug for AmqpSecurityConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None shape visible for diagnostics, but never the value.
        let redact = |secret: &Option<String>| secret.as_ref().map(|_| "<redacted>");
        f.debug_struct("AmqpSecurityConfig")
            .field("tls_enabled", &self.tls_enabled)
            .field("tls_ca_file", &self.tls_ca_file)
            .field("tls_cert_file", &self.tls_cert_file)
            .field("tls_key_file", &self.tls_key_file)
            .field("tls_server_name", &self.tls_server_name)
            .field("auth_mode", &self.auth_mode)
            .field("bearer_token", &redact(&self.bearer_token))
            .field("bearer_subject", &self.bearer_subject)
            .field("sasl_username", &self.sasl_username)
            .field("sasl_password", &redact(&self.sasl_password))
            .field("topic_acl", &self.topic_acl)
            .finish()
    }
}
/// Translates an [`AmqpSecurityConfig`] into the bridge security context and,
/// when TLS is enabled, a client TLS configuration.
///
/// Mapping rules:
/// * `auth_mode` is copied verbatim.
/// * A bearer token is registered under `bearer_subject`, or under
///   `"anonymous"` when no subject is given. A subject without a token is
///   ignored.
/// * SASL credentials are registered only when both user name and password
///   are present.
/// * Every `topic_acl` entry becomes an `AclEntry` with the same read/write
///   lists.
/// * Empty TLS path strings are passed to the TLS builder as `None`.
///
/// # Errors
///
/// Returns whatever `build_ctx` reports for the assembled configuration, or
/// `SecurityError::Tls` wrapping the TLS builder's error. The context is built
/// first, so its error takes precedence.
pub fn ctx_from_amqp_config(
    cfg: &AmqpSecurityConfig,
) -> Result<(SecurityCtx, Option<Arc<rustls::ClientConfig>>), SecurityError> {
    // An empty string in the config means "no path configured".
    fn optional_path(raw: &str) -> Option<&Path> {
        Some(raw)
            .filter(|candidate| !candidate.is_empty())
            .map(Path::new)
    }

    let mut security = SecurityConfig {
        auth_mode: cfg.auth_mode.clone(),
        ..Default::default()
    };

    if let Some(token) = &cfg.bearer_token {
        let subject = cfg
            .bearer_subject
            .clone()
            .unwrap_or_else(|| "anonymous".into());
        security.bearer_tokens.insert(token.clone(), subject);
    }

    if let Some((user, password)) = cfg.sasl_username.as_ref().zip(cfg.sasl_password.as_ref()) {
        security.sasl_users.insert(user.clone(), password.clone());
    }

    for (topic, grants) in &cfg.topic_acl {
        let (readers, writers) = grants;
        let entry = AclEntry {
            read: readers.clone(),
            write: writers.clone(),
        };
        security.topic_acl.insert(topic.clone(), entry);
    }

    let ctx = build_ctx(&security)?;

    if !cfg.tls_enabled {
        return Ok((ctx, None));
    }

    let connector = build_client_tls_connector(
        optional_path(&cfg.tls_ca_file),
        optional_path(&cfg.tls_cert_file),
        optional_path(&cfg.tls_key_file),
    )
    .map_err(SecurityError::Tls)?;
    Ok((ctx, Some(connector)))
}
/// Builds the SASL PLAIN initial response for the configured credentials.
///
/// The result is `NUL user NUL password`: an empty authorization identity
/// followed by the user name and password, each preceded by a zero byte.
///
/// Returns `None` when `auth_mode` is neither `"sasl"` nor `"sasl_plain"`, or
/// when the user name or the password is missing.
#[must_use]
pub fn sasl_plain_init_response(cfg: &AmqpSecurityConfig) -> Option<Vec<u8>> {
    match cfg.auth_mode.as_str() {
        "sasl" | "sasl_plain" => {}
        _ => return None,
    }

    let (user, pass) = cfg
        .sasl_username
        .as_deref()
        .zip(cfg.sasl_password.as_deref())?;

    // Two separator bytes plus both fields.
    let mut response = Vec::with_capacity(user.len() + pass.len() + 2);
    for field in [user, pass] {
        response.push(0);
        response.extend_from_slice(field.as_bytes());
    }
    Some(response)
}
/// Authenticates an AMQP connection from its SASL initial response and/or an
/// mTLS-derived subject.
///
/// Delegates to `zerodds_bridge_security::authenticate`, always passing `None`
/// for the bearer header argument.
///
/// # Errors
///
/// Returns the `AuthError` produced by the shared authenticator.
pub fn authenticate_amqp_sasl(
    auth: &AuthMode,
    sasl_init_response: Option<&[u8]>,
    mtls_subject: Option<AuthSubject>,
) -> Result<AuthSubject, AuthError> {
    // This entry point never supplies a bearer header.
    let bearer_header: Option<&str> = None;
    zerodds_bridge_security::authenticate(
        auth,
        bearer_header,
        sasl_init_response,
        mtls_subject,
    )
}
/// Authenticates an AMQP peer from a raw bearer token value.
///
/// The token is prefixed with `"Bearer "` and handed to
/// `zerodds_bridge_security::authenticate` as the header argument; no SASL
/// response or mTLS subject is supplied. When `bearer_value` is `None`, the
/// header argument is `None` as well.
///
/// # Errors
///
/// Returns the `AuthError` produced by the shared authenticator.
pub fn authenticate_amqp_bearer(
    auth: &AuthMode,
    bearer_value: Option<&str>,
) -> Result<AuthSubject, AuthError> {
    let authorization = bearer_value.map(|token| {
        let mut value = String::with_capacity("Bearer ".len() + token.len());
        value.push_str("Bearer ");
        value.push_str(token);
        value
    });
    zerodds_bridge_security::authenticate(auth, authorization.as_deref(), None, None)
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    // Matching PLAIN credentials resolve to the named user.
    #[test]
    fn amqp_sasl_plain_accepts() {
        let users = HashMap::from([("alice".to_string(), "wonderland".to_string())]);
        let mode = AuthMode::SaslPlain { users };

        let subject = authenticate_amqp_sasl(&mode, Some(b"\0alice\0wonderland"), None).unwrap();

        assert_eq!(subject.name, "alice");
    }

    // A user that is not in the table is rejected.
    #[test]
    fn amqp_sasl_plain_rejects_unknown_user() {
        let mode = AuthMode::SaslPlain {
            users: HashMap::new(),
        };

        let failure = authenticate_amqp_sasl(&mode, Some(b"\0bob\0xx"), None).unwrap_err();

        assert!(matches!(failure, AuthError::Rejected(_)));
    }

    // A bare token value is accepted through the bearer entry point.
    #[test]
    fn amqp_bearer_via_application_properties() {
        let tokens = HashMap::from([("tk".into(), AuthSubject::new("alice"))]);
        let mode = AuthMode::Bearer { tokens };

        let subject = authenticate_amqp_bearer(&mode, Some("tk")).unwrap();

        assert_eq!(subject.name, "alice");
    }

    // ACL entries are enforced per address and per subject.
    #[test]
    fn amqp_acl_address_check() {
        let mut security = SecurityConfig::default();
        let orders_acl = AclEntry {
            read: vec!["alice".into()],
            write: vec!["alice".into()],
        };
        security.topic_acl.insert("queue/orders".into(), orders_acl);
        let ctx = build_ctx(&security).unwrap();

        let allowed = AuthSubject::new("alice");
        let denied = AuthSubject::new("bob");

        assert!(authorize(&ctx.acl, &allowed, AclOp::Read, "queue/orders"));
        assert!(!authorize(&ctx.acl, &denied, AclOp::Read, "queue/orders"));
    }

    // With authentication disabled, a peer without credentials is "anonymous".
    #[test]
    fn amqp_none_mode_yields_anonymous_for_sasl_anonymous() {
        let subject = authenticate_amqp_sasl(&AuthMode::None, None, None).unwrap();

        assert_eq!(subject.name, "anonymous");
    }
}