#![allow(dead_code)]
use std::collections::HashMap;
use std::hash::BuildHasher;
use crabka_protocol::owned::sasl_authenticate_request::SaslAuthenticateRequest;
use crabka_protocol::owned::sasl_authenticate_response::SaslAuthenticateResponse;
use crabka_protocol::owned::sasl_handshake_request::SaslHandshakeRequest;
use crabka_protocol::owned::sasl_handshake_response::SaslHandshakeResponse;
use crabka_security::{Principal, SaslMechanism, ScramServerExchange};
#[derive(Debug)]
pub enum ConnectionAuth {
Anonymous,
Negotiating {
mechanism: SaslMechanism,
exchange: SaslExchange,
pending_token_expiry_ms: Option<i64>,
},
Authenticated {
principal: Principal,
mechanism: SaslMechanism,
expires_at_ms: Option<i64>,
authenticated_via_token: bool,
},
Reauthenticating {
previous: AuthenticatedSnapshot,
exchange: SaslExchange,
},
}
#[derive(Debug, Clone)]
pub struct AuthenticatedSnapshot {
pub principal: Principal,
pub mechanism: SaslMechanism,
pub expires_at_ms: Option<i64>,
}
#[derive(Debug)]
pub enum SaslExchange {
Plain,
ScramPending,
Scram(Box<ScramServerExchange>),
OAuthBearer,
OAuthBearerFailed,
GssapiPending,
Gssapi(Box<crabka_security::gssapi::server::GssapiServerExchange>),
}
impl ConnectionAuth {
#[must_use]
pub fn is_authenticated(&self) -> bool {
matches!(self, Self::Authenticated { .. })
}
#[must_use]
pub fn principal(&self) -> Option<&Principal> {
if let Self::Authenticated { principal, .. } = self {
Some(principal)
} else {
None
}
}
#[must_use]
pub fn authenticated_via_token(&self) -> bool {
matches!(
self,
Self::Authenticated {
authenticated_via_token: true,
..
}
)
}
#[must_use]
pub fn allows_request(&self, api_key: i16) -> bool {
match self {
Self::Anonymous | Self::Negotiating { .. } => is_pre_auth_allowed(api_key),
Self::Reauthenticating { .. } => api_key == 36,
Self::Authenticated { .. } => true,
}
}
}
#[must_use]
pub fn is_pre_auth_allowed(api_key: i16) -> bool {
matches!(api_key, 17 | 36 | 18)
}
const UNSUPPORTED_SASL_MECHANISM: i16 = 33;
const SASL_AUTHENTICATION_FAILED: i16 = 58;
const GSSAPI_MAX_RECV_SIZE: u32 = 0x1_0000;
pub fn handle_handshake(
req: &SaslHandshakeRequest,
auth: &mut ConnectionAuth,
enabled: &[SaslMechanism],
) -> SaslHandshakeResponse {
let enabled_names: Vec<String> = enabled.iter().map(|m| m.wire_name().to_string()).collect();
let requested = SaslMechanism::from_wire(&req.mechanism);
if let ConnectionAuth::Authenticated {
mechanism: current, ..
} = auth
{
let current = *current;
match requested {
Some(m) if m == current => {
let prev = std::mem::replace(auth, ConnectionAuth::Anonymous);
let ConnectionAuth::Authenticated {
principal,
mechanism,
expires_at_ms,
authenticated_via_token: _,
} = prev
else {
unreachable!("matched Authenticated above");
};
let exchange = exchange_for_mechanism(m);
*auth = ConnectionAuth::Reauthenticating {
previous: AuthenticatedSnapshot {
principal,
mechanism,
expires_at_ms,
},
exchange,
};
return SaslHandshakeResponse {
error_code: 0,
mechanisms: enabled_names,
..Default::default()
};
}
_ => {
tracing::debug!(
requested = %req.mechanism,
"SaslHandshake: mechanism switch on authenticated connection (ILLEGAL_SASL_STATE)"
);
return SaslHandshakeResponse {
error_code: 34,
mechanisms: enabled_names,
..Default::default()
};
}
}
}
match requested {
Some(m) if enabled.contains(&m) => {
let exchange = exchange_for_mechanism(m);
*auth = ConnectionAuth::Negotiating {
mechanism: m,
exchange,
pending_token_expiry_ms: None,
};
SaslHandshakeResponse {
error_code: 0,
mechanisms: enabled_names,
..Default::default()
}
}
_ => {
tracing::debug!(
requested = %req.mechanism,
"SaslHandshake: unsupported mechanism"
);
SaslHandshakeResponse {
error_code: UNSUPPORTED_SASL_MECHANISM,
mechanisms: enabled_names,
..Default::default()
}
}
}
}
fn exchange_for_mechanism(m: SaslMechanism) -> SaslExchange {
match m {
SaslMechanism::Plain => SaslExchange::Plain,
SaslMechanism::ScramSha256 | SaslMechanism::ScramSha512 => SaslExchange::ScramPending,
SaslMechanism::OAuthBearer => SaslExchange::OAuthBearer,
SaslMechanism::Gssapi => SaslExchange::GssapiPending,
}
}
pub fn handle_authenticate_plain<S: BuildHasher>(
req: &SaslAuthenticateRequest,
auth: &mut ConnectionAuth,
plain_credentials: &HashMap<String, String, S>,
) -> SaslAuthenticateResponse {
let parts: Vec<&[u8]> = req.auth_bytes.split(|&b| b == 0).collect();
if parts.len() != 3 {
return fail_authenticate("malformed PLAIN payload");
}
let Ok(user) = std::str::from_utf8(parts[1]) else {
return fail_authenticate("non-utf8 username");
};
let password = parts[2];
match crabka_security::verify_plain(plain_credentials, user, password) {
Ok(p) => {
*auth = ConnectionAuth::Authenticated {
principal: p,
mechanism: SaslMechanism::Plain,
expires_at_ms: None,
authenticated_via_token: false,
};
SaslAuthenticateResponse {
error_code: 0,
error_message: None,
auth_bytes: bytes::Bytes::new(),
session_lifetime_ms: 0,
..Default::default()
}
}
Err(_) => fail_authenticate("authentication failed"),
}
}
pub fn handle_authenticate_scram(
req: &SaslAuthenticateRequest,
auth: &mut ConnectionAuth,
controller: &dyn crate::metadata_source::MetadataSource,
) -> SaslAuthenticateResponse {
if let ConnectionAuth::Negotiating {
exchange: SaslExchange::ScramPending,
mechanism,
pending_token_expiry_ms: _,
} = auth
{
let mech = *mechanism;
let Some(username) = parse_scram_username(&req.auth_bytes) else {
return fail_authenticate("malformed SCRAM client-first");
};
let image = controller.current_image();
let (cred, principal_override, token_expiry_ms) =
if let Some(scram_cred) = image.scram_credential(&username, mech) {
(scram_cred.clone(), None, None)
} else if mech == SaslMechanism::ScramSha256 {
if let Some(token) = image.delegation_token_by_id(&username) {
let synth = synthesize_token_scram_credential(token);
let owner = Principal {
name: token.owner.name.clone(),
auth_method: crabka_security::AuthMethod::SaslScramSha256,
groups: vec![],
};
(synth, Some(owner), Some(token.expiry_timestamp_ms))
} else {
return fail_authenticate("unknown user");
}
} else {
return fail_authenticate("unknown user");
};
let mut server = match principal_override {
Some(p) => ScramServerExchange::new_with_principal(username, cred, p),
None => ScramServerExchange::new(username, cred),
};
match server.step(&req.auth_bytes) {
crabka_security::StepResult::Continue(bytes) => {
*auth = ConnectionAuth::Negotiating {
mechanism: mech,
exchange: SaslExchange::Scram(Box::new(server)),
pending_token_expiry_ms: token_expiry_ms,
};
SaslAuthenticateResponse {
error_code: 0,
error_message: None,
auth_bytes: bytes::Bytes::from(bytes),
session_lifetime_ms: 0,
..Default::default()
}
}
crabka_security::StepResult::Done(_, _) => {
fail_authenticate("SCRAM server completed in one round")
}
crabka_security::StepResult::Failed(_) => fail_authenticate("SCRAM step failed"),
}
} else if let ConnectionAuth::Negotiating {
exchange: SaslExchange::Scram(server),
mechanism,
pending_token_expiry_ms,
} = auth
{
let mech = *mechanism;
let pending_token_expiry_ms = *pending_token_expiry_ms;
match server.step(&req.auth_bytes) {
crabka_security::StepResult::Continue(_) => {
fail_authenticate("SCRAM second round expected Done")
}
crabka_security::StepResult::Done(principal, bytes) => {
let session_lifetime_ms =
pending_token_expiry_ms.map_or(0, |e| (e - crate::time_util::now_ms()).max(0));
*auth = ConnectionAuth::Authenticated {
principal,
mechanism: mech,
expires_at_ms: pending_token_expiry_ms,
authenticated_via_token: pending_token_expiry_ms.is_some(),
};
SaslAuthenticateResponse {
error_code: 0,
error_message: None,
auth_bytes: bytes::Bytes::from(bytes),
session_lifetime_ms,
..Default::default()
}
}
crabka_security::StepResult::Failed(_) => fail_authenticate("SCRAM proof failed"),
}
} else {
fail_authenticate("not in SCRAM negotiation")
}
}
const TOKEN_SCRAM_ITERS: u32 = 4096;
fn synthesize_token_scram_credential(
token: &crabka_metadata::DelegationToken,
) -> crabka_security::ScramCredential {
use base64::Engine;
let password = base64::engine::general_purpose::STANDARD.encode(&token.hmac);
let salt = token.token_id.as_bytes().to_vec();
crabka_security::scram::hash_scram_password_with_salt(
password.as_bytes(),
SaslMechanism::ScramSha256,
TOKEN_SCRAM_ITERS,
salt,
)
}
#[cfg(feature = "sspi-keytab")]
pub fn handle_authenticate_gssapi(
req: &SaslAuthenticateRequest,
auth: &mut ConnectionAuth,
config: &crabka_security::gssapi::GssapiConfig,
) -> SaslAuthenticateResponse {
use crabka_security::gssapi::server::{GssapiServerExchange, ServerStep};
if let ConnectionAuth::Negotiating {
exchange: SaslExchange::GssapiPending,
mechanism,
pending_token_expiry_ms: _,
} = auth
{
let mech = *mechanism;
let keytab = config.keytab_path.to_string_lossy();
let acceptor = match crabka_security::gssapi::provider::SspiAcceptor::new(
&keytab,
&config.service_name,
) {
Ok(a) => a,
Err(e) => return fail_authenticate(&format!("GSSAPI acceptor init failed: {e}")),
};
let mut exchange = GssapiServerExchange::new(Box::new(acceptor), GSSAPI_MAX_RECV_SIZE);
let step = match exchange.step(&req.auth_bytes) {
Ok(s) => s,
Err(e) => return fail_authenticate(&format!("GSSAPI accept failed: {e}")),
};
return match step {
ServerStep::Challenge(token) => {
*auth = ConnectionAuth::Negotiating {
mechanism: mech,
exchange: SaslExchange::Gssapi(Box::new(exchange)),
pending_token_expiry_ms: None,
};
gssapi_challenge_response(token)
}
ServerStep::Done { principal } => finish_gssapi(&principal, mech, config, auth),
};
}
if let ConnectionAuth::Negotiating {
exchange: SaslExchange::Gssapi(exchange),
mechanism,
pending_token_expiry_ms: _,
} = auth
{
let mech = *mechanism;
let step = match exchange.step(&req.auth_bytes) {
Ok(s) => s,
Err(e) => return fail_authenticate(&format!("GSSAPI step failed: {e}")),
};
return match step {
ServerStep::Challenge(token) => gssapi_challenge_response(token),
ServerStep::Done { principal } => finish_gssapi(&principal, mech, config, auth),
};
}
fail_authenticate("not in GSSAPI negotiation")
}
#[cfg(not(feature = "sspi-keytab"))]
pub fn handle_authenticate_gssapi(
_req: &SaslAuthenticateRequest,
_auth: &mut ConnectionAuth,
_config: &crabka_security::gssapi::GssapiConfig,
) -> SaslAuthenticateResponse {
fail_authenticate("GSSAPI support requires the sspi-keytab feature")
}
fn gssapi_challenge_response(token: Vec<u8>) -> SaslAuthenticateResponse {
SaslAuthenticateResponse {
error_code: 0,
error_message: None,
auth_bytes: bytes::Bytes::from(token),
session_lifetime_ms: 0,
..Default::default()
}
}
fn finish_gssapi(
raw_principal: &str,
mech: SaslMechanism,
config: &crabka_security::gssapi::GssapiConfig,
auth: &mut ConnectionAuth,
) -> SaslAuthenticateResponse {
let short = match map_gssapi_principal(raw_principal, config) {
Ok(s) => s,
Err(e) => return fail_authenticate(&format!("GSSAPI principal mapping failed: {e}")),
};
*auth = ConnectionAuth::Authenticated {
principal: Principal {
name: short,
auth_method: crabka_security::AuthMethod::SaslGssapi,
groups: vec![],
},
mechanism: mech,
expires_at_ms: None,
authenticated_via_token: false,
};
SaslAuthenticateResponse {
error_code: 0,
error_message: None,
auth_bytes: bytes::Bytes::new(),
session_lifetime_ms: 0,
..Default::default()
}
}
fn map_gssapi_principal(
raw: &str,
config: &crabka_security::gssapi::GssapiConfig,
) -> Result<String, crabka_security::gssapi::name::NameError> {
let (head, realm_raw) = raw.rsplit_once('@').unwrap_or((raw, ""));
let realm = realm_raw.to_uppercase();
let components: Vec<&str> = head.split('/').collect();
let default_realm = config.realm.as_deref().unwrap_or(&realm);
crabka_security::gssapi::name::apply(
&config.principal_to_local_rules,
&realm,
&components,
default_realm,
)
}
#[allow(clippy::too_many_lines)]
pub async fn handle_authenticate_oauthbearer(
req: &SaslAuthenticateRequest,
auth: &mut ConnectionAuth,
validator: &crabka_security::OAuthBearerValidator,
now_ms: i64,
max_session_lifetime_seconds: Option<u32>,
) -> SaslAuthenticateResponse {
match auth {
ConnectionAuth::Negotiating {
exchange: SaslExchange::OAuthBearer,
mechanism,
pending_token_expiry_ms: _,
} => {
let mech = *mechanism;
match validate_bearer(&req.auth_bytes, validator, now_ms).await {
Ok(outcome) => {
let raw_session_ms = outcome.expires_at_ms.map_or(0, |e| (e - now_ms).max(0));
let session_lifetime_ms = match max_session_lifetime_seconds {
Some(cap) => raw_session_ms.min(i64::from(cap) * 1000),
None => raw_session_ms,
};
let effective_expires_at_ms = Some(now_ms + session_lifetime_ms);
*auth = ConnectionAuth::Authenticated {
principal: outcome.principal,
mechanism: mech,
expires_at_ms: effective_expires_at_ms,
authenticated_via_token: false,
};
SaslAuthenticateResponse {
error_code: 0,
error_message: None,
auth_bytes: bytes::Bytes::new(),
session_lifetime_ms,
..Default::default()
}
}
Err(reason) => {
tracing::debug!(reason, "OAUTHBEARER token rejected");
*auth = ConnectionAuth::Negotiating {
mechanism: mech,
exchange: SaslExchange::OAuthBearerFailed,
pending_token_expiry_ms: None,
};
SaslAuthenticateResponse {
error_code: 0,
error_message: None,
auth_bytes: bytes::Bytes::from(
crabka_security::invalid_token_json().into_bytes(),
),
session_lifetime_ms: 0,
..Default::default()
}
}
}
}
ConnectionAuth::Negotiating {
exchange: SaslExchange::OAuthBearerFailed,
..
} => fail_authenticate("oauthbearer token rejected"),
ConnectionAuth::Reauthenticating {
previous,
exchange: SaslExchange::OAuthBearer,
} => {
let prev_mech = previous.mechanism;
let prev_name = previous.principal.name.clone();
match validate_bearer(&req.auth_bytes, validator, now_ms).await {
Ok(outcome) => {
if outcome.principal.name != prev_name {
tracing::debug!(
previous = %prev_name,
attempted = %outcome.principal.name,
"OAUTHBEARER re-auth principal mismatch"
);
return SaslAuthenticateResponse {
error_code: SASL_AUTHENTICATION_FAILED,
error_message: Some(
"re-authentication may not change the principal".to_string(),
),
auth_bytes: bytes::Bytes::new(),
session_lifetime_ms: 0,
..Default::default()
};
}
let raw_session_ms = outcome.expires_at_ms.map_or(0, |e| (e - now_ms).max(0));
let session_lifetime_ms = match max_session_lifetime_seconds {
Some(cap) => raw_session_ms.min(i64::from(cap) * 1000),
None => raw_session_ms,
};
let effective_expires_at_ms = Some(now_ms + session_lifetime_ms);
*auth = ConnectionAuth::Authenticated {
principal: outcome.principal,
mechanism: prev_mech,
expires_at_ms: effective_expires_at_ms,
authenticated_via_token: false,
};
SaslAuthenticateResponse {
error_code: 0,
error_message: None,
auth_bytes: bytes::Bytes::new(),
session_lifetime_ms,
..Default::default()
}
}
Err(reason) => {
tracing::debug!(reason, "OAUTHBEARER re-auth token rejected");
SaslAuthenticateResponse {
error_code: SASL_AUTHENTICATION_FAILED,
error_message: Some("re-authentication failed".to_string()),
auth_bytes: bytes::Bytes::new(),
session_lifetime_ms: 0,
..Default::default()
}
}
}
}
_ => fail_authenticate("not in oauthbearer negotiation"),
}
}
async fn validate_bearer(
auth_bytes: &[u8],
validator: &crabka_security::OAuthBearerValidator,
now_ms: i64,
) -> Result<crabka_security::AuthOutcome, &'static str> {
let parsed = crabka_security::parse_client_initial_response(auth_bytes)
.map_err(|_| "malformed OAUTHBEARER client response")?;
let outcome = validator
.validate(&parsed.token, now_ms)
.await
.map_err(|_| "token validation failed")?;
if let Some(authzid) = parsed.authzid
&& authzid != outcome.principal.name
{
return Err("authzid does not match token principal");
}
Ok(outcome)
}
fn parse_scram_username(bytes: &[u8]) -> Option<String> {
let s = std::str::from_utf8(bytes).ok()?;
let bare = s.strip_prefix("n,,")?;
for attr in bare.split(',') {
if let Some(v) = attr.strip_prefix("n=") {
return Some(v.to_string());
}
}
None
}
fn fail_authenticate(reason: &str) -> SaslAuthenticateResponse {
tracing::debug!(reason, "SASL authenticate failed");
SaslAuthenticateResponse {
error_code: SASL_AUTHENTICATION_FAILED,
error_message: Some("authentication failed".to_string()),
auth_bytes: bytes::Bytes::new(),
session_lifetime_ms: 0,
..Default::default()
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
#[test]
fn pre_auth_allowlist_accepts_handshake_authenticate_apiversions() {
assert!(is_pre_auth_allowed(17), "SaslHandshake");
assert!(is_pre_auth_allowed(36), "SaslAuthenticate");
assert!(is_pre_auth_allowed(18), "ApiVersions");
}
#[test]
fn pre_auth_allowlist_rejects_data_plane_apis() {
assert!(!is_pre_auth_allowed(0), "Produce");
assert!(!is_pre_auth_allowed(1), "Fetch");
assert!(!is_pre_auth_allowed(3), "Metadata");
assert!(!is_pre_auth_allowed(19), "CreateTopics");
}
#[test]
fn anonymous_is_not_authenticated() {
let a = ConnectionAuth::Anonymous;
assert!(!a.is_authenticated());
assert!(a.principal().is_none());
}
#[test]
fn negotiating_is_not_authenticated() {
let a = ConnectionAuth::Negotiating {
mechanism: SaslMechanism::Plain,
exchange: SaslExchange::Plain,
pending_token_expiry_ms: None,
};
assert!(!a.is_authenticated());
assert!(a.principal().is_none());
}
#[test]
fn negotiating_scram_pending_is_not_authenticated() {
let a = ConnectionAuth::Negotiating {
mechanism: SaslMechanism::ScramSha512,
exchange: SaslExchange::ScramPending,
pending_token_expiry_ms: None,
};
assert!(!a.is_authenticated());
assert!(a.principal().is_none());
}
fn unsecured_token(sub: &str, exp_s: i64) -> String {
use base64::Engine;
use base64::engine::general_purpose::URL_SAFE_NO_PAD as B64;
format!(
"{}.{}.",
B64.encode(b"{\"alg\":\"none\"}"),
B64.encode(format!("{{\"sub\":\"{sub}\",\"exp\":{exp_s}}}").as_bytes())
)
}
fn oauthbearer_client_response(token: &str) -> SaslAuthenticateRequest {
SaslAuthenticateRequest {
auth_bytes: bytes::Bytes::from(
format!("n,,\u{1}auth=Bearer {token}\u{1}\u{1}").into_bytes(),
),
..Default::default()
}
}
#[test]
fn handshake_oauthbearer_transitions_to_negotiating() {
let mut auth = ConnectionAuth::Anonymous;
let req = SaslHandshakeRequest {
mechanism: "OAUTHBEARER".to_string(),
..Default::default()
};
let resp = handle_handshake(&req, &mut auth, &[SaslMechanism::OAuthBearer]);
assert!(resp.error_code == 0);
assert!(matches!(
auth,
ConnectionAuth::Negotiating {
mechanism: SaslMechanism::OAuthBearer,
exchange: SaslExchange::OAuthBearer,
..
}
));
}
#[tokio::test]
async fn oauthbearer_valid_token_authenticates() {
let validator = crabka_security::OAuthBearerValidator::default();
let now_ms = 1_000_000_000_000;
let token = unsecured_token("svc-account", 1_000_000_900); let mut auth = ConnectionAuth::Negotiating {
mechanism: SaslMechanism::OAuthBearer,
exchange: SaslExchange::OAuthBearer,
pending_token_expiry_ms: None,
};
let resp = handle_authenticate_oauthbearer(
&oauthbearer_client_response(&token),
&mut auth,
&validator,
now_ms,
None,
)
.await;
assert!(resp.error_code == 0);
assert!(resp.auth_bytes.is_empty());
let p = auth.principal().expect("authenticated");
assert!(p.name == "svc-account");
assert!(p.auth_method == crabka_security::AuthMethod::SaslOAuthBearer);
}
#[tokio::test]
async fn oauthbearer_invalid_token_returns_error_json_then_fails_on_dummy() {
let validator = crabka_security::OAuthBearerValidator::Unsecured(
crabka_security::UnsecuredJwsValidator {
allowable_clock_skew_ms: 0,
..Default::default()
},
);
let now_ms = 5_000_000_000_000;
let token = unsecured_token("admin", 1_000_000_000);
let mut auth = ConnectionAuth::Negotiating {
mechanism: SaslMechanism::OAuthBearer,
exchange: SaslExchange::OAuthBearer,
pending_token_expiry_ms: None,
};
let resp = handle_authenticate_oauthbearer(
&oauthbearer_client_response(&token),
&mut auth,
&validator,
now_ms,
None,
)
.await;
assert!(resp.error_code == 0);
assert!(&resp.auth_bytes[..] == br#"{"status":"invalid_token"}"#);
assert!(matches!(
auth,
ConnectionAuth::Negotiating {
exchange: SaslExchange::OAuthBearerFailed,
..
}
));
let dummy = SaslAuthenticateRequest {
auth_bytes: bytes::Bytes::from_static(&[1u8]),
..Default::default()
};
let resp2 =
handle_authenticate_oauthbearer(&dummy, &mut auth, &validator, now_ms, None).await;
assert!(resp2.error_code == SASL_AUTHENTICATION_FAILED);
assert!(!auth.is_authenticated());
}
#[tokio::test]
async fn oauthbearer_malformed_response_returns_error_json() {
let validator = crabka_security::OAuthBearerValidator::default();
let mut auth = ConnectionAuth::Negotiating {
mechanism: SaslMechanism::OAuthBearer,
exchange: SaslExchange::OAuthBearer,
pending_token_expiry_ms: None,
};
let req = SaslAuthenticateRequest {
auth_bytes: bytes::Bytes::from_static(b"not-a-valid-gs2-message"),
..Default::default()
};
let resp =
handle_authenticate_oauthbearer(&req, &mut auth, &validator, 1_000_000_000_000, None)
.await;
assert!(resp.error_code == 0);
assert!(&resp.auth_bytes[..] == br#"{"status":"invalid_token"}"#);
}
#[tokio::test]
async fn oauthbearer_authzid_mismatch_fails() {
let validator = crabka_security::OAuthBearerValidator::default();
let now_ms = 1_000_000_000_000;
let token = unsecured_token("alice", 1_000_000_900);
let mut auth = ConnectionAuth::Negotiating {
mechanism: SaslMechanism::OAuthBearer,
exchange: SaslExchange::OAuthBearer,
pending_token_expiry_ms: None,
};
let req = SaslAuthenticateRequest {
auth_bytes: bytes::Bytes::from(
format!("n,a=bob,\u{1}auth=Bearer {token}\u{1}\u{1}").into_bytes(),
),
..Default::default()
};
let resp = handle_authenticate_oauthbearer(&req, &mut auth, &validator, now_ms, None).await;
assert!(resp.error_code == 0);
assert!(&resp.auth_bytes[..] == br#"{"status":"invalid_token"}"#);
assert!(!auth.is_authenticated());
}
#[test]
fn authenticated_returns_principal() {
let a = ConnectionAuth::Authenticated {
principal: Principal {
name: "alice".into(),
auth_method: crabka_security::AuthMethod::SaslScramSha512,
groups: vec![],
},
mechanism: SaslMechanism::ScramSha512,
expires_at_ms: None,
authenticated_via_token: false,
};
assert!(a.is_authenticated());
let p = a.principal().expect("principal");
assert!(p.name == "alice");
assert!(p.auth_method == crabka_security::AuthMethod::SaslScramSha512);
}
#[test]
fn authenticated_state_carries_mechanism_and_expires_at_ms() {
let auth = ConnectionAuth::Authenticated {
principal: Principal {
name: "alice".to_string(),
auth_method: crabka_security::AuthMethod::SaslOAuthBearer,
groups: vec![],
},
mechanism: SaslMechanism::OAuthBearer,
expires_at_ms: Some(2_000_000),
authenticated_via_token: false,
};
match auth {
ConnectionAuth::Authenticated {
principal,
mechanism,
expires_at_ms,
authenticated_via_token: _,
} => {
assert!(principal.name == "alice");
assert!(mechanism == SaslMechanism::OAuthBearer);
assert!(expires_at_ms == Some(2_000_000));
}
_ => panic!("expected Authenticated"),
}
}
#[test]
fn handshake_from_authenticated_with_same_mechanism_transitions_to_reauthenticating() {
let mut auth = ConnectionAuth::Authenticated {
principal: Principal {
name: "alice".to_string(),
auth_method: crabka_security::AuthMethod::SaslOAuthBearer,
groups: vec![],
},
mechanism: SaslMechanism::OAuthBearer,
expires_at_ms: Some(2_000_000),
authenticated_via_token: false,
};
let req = SaslHandshakeRequest {
mechanism: "OAUTHBEARER".to_string(),
..Default::default()
};
let resp = handle_handshake(&req, &mut auth, &[SaslMechanism::OAuthBearer]);
assert!(resp.error_code == 0);
assert!(matches!(
auth,
ConnectionAuth::Reauthenticating {
previous: AuthenticatedSnapshot {
mechanism: SaslMechanism::OAuthBearer,
..
},
exchange: SaslExchange::OAuthBearer,
}
));
}
#[test]
fn handshake_from_authenticated_with_different_mechanism_rejected_with_illegal_sasl_state() {
let mut auth = ConnectionAuth::Authenticated {
principal: Principal {
name: "alice".to_string(),
auth_method: crabka_security::AuthMethod::SaslOAuthBearer,
groups: vec![],
},
mechanism: SaslMechanism::OAuthBearer,
expires_at_ms: Some(2_000_000),
authenticated_via_token: false,
};
let req = SaslHandshakeRequest {
mechanism: "SCRAM-SHA-512".to_string(),
..Default::default()
};
let resp = handle_handshake(
&req,
&mut auth,
&[SaslMechanism::OAuthBearer, SaslMechanism::ScramSha512],
);
assert!(resp.error_code == 34);
assert!(matches!(auth, ConnectionAuth::Authenticated { .. }));
}
#[tokio::test]
async fn authenticate_during_reauth_same_principal_transitions_back_to_authenticated() {
let validator = crabka_security::OAuthBearerValidator::default();
let now_ms = 1_000_000_000_000;
let new_token_exp_seconds: i64 = 1_000_000_900;
let new_token_exp_millis: i64 = new_token_exp_seconds * 1000;
let token = unsecured_token("alice", new_token_exp_seconds);
let mut auth = ConnectionAuth::Reauthenticating {
previous: AuthenticatedSnapshot {
principal: Principal {
name: "alice".to_string(),
auth_method: crabka_security::AuthMethod::SaslOAuthBearer,
groups: vec![],
},
mechanism: SaslMechanism::OAuthBearer,
expires_at_ms: Some(now_ms + 1_000), },
exchange: SaslExchange::OAuthBearer,
};
let resp = handle_authenticate_oauthbearer(
&oauthbearer_client_response(&token),
&mut auth,
&validator,
now_ms,
None,
)
.await;
assert!(resp.error_code == 0);
assert!(resp.session_lifetime_ms == new_token_exp_millis - now_ms);
assert!(matches!(
auth,
ConnectionAuth::Authenticated {
mechanism: SaslMechanism::OAuthBearer,
expires_at_ms: Some(_),
..
}
));
if let ConnectionAuth::Authenticated {
principal,
expires_at_ms,
..
} = &auth
{
assert!(principal.name == "alice");
assert!(*expires_at_ms == Some(new_token_exp_millis));
} else {
panic!("expected Authenticated");
}
}
#[tokio::test]
async fn authenticate_during_reauth_different_principal_rejected_with_sasl_auth_failed() {
let validator = crabka_security::OAuthBearerValidator::default();
let now_ms = 1_000_000_000_000;
let token = unsecured_token("bob", 1_000_000_900);
let mut auth = ConnectionAuth::Reauthenticating {
previous: AuthenticatedSnapshot {
principal: Principal {
name: "alice".to_string(),
auth_method: crabka_security::AuthMethod::SaslOAuthBearer,
groups: vec![],
},
mechanism: SaslMechanism::OAuthBearer,
expires_at_ms: Some(now_ms + 1_000),
},
exchange: SaslExchange::OAuthBearer,
};
let resp = handle_authenticate_oauthbearer(
&oauthbearer_client_response(&token),
&mut auth,
&validator,
now_ms,
None,
)
.await;
assert!(resp.error_code == SASL_AUTHENTICATION_FAILED);
assert!(
resp.error_message
.as_deref()
.unwrap_or("")
.contains("principal")
);
assert!(matches!(auth, ConnectionAuth::Reauthenticating { .. }));
}
#[test]
fn allows_request_during_reauthenticating_only_sasl_authenticate() {
let auth = ConnectionAuth::Reauthenticating {
previous: AuthenticatedSnapshot {
principal: Principal {
name: "alice".to_string(),
auth_method: crabka_security::AuthMethod::SaslOAuthBearer,
groups: vec![],
},
mechanism: SaslMechanism::OAuthBearer,
expires_at_ms: Some(2_000_000),
},
exchange: SaslExchange::OAuthBearer,
};
assert!(auth.allows_request(36)); assert!(!auth.allows_request(17)); assert!(!auth.allows_request(18)); assert!(!auth.allows_request(3)); }
#[test]
fn allows_request_anonymous_uses_pre_auth_allowlist() {
let auth = ConnectionAuth::Anonymous;
assert!(auth.allows_request(17));
assert!(auth.allows_request(36));
assert!(auth.allows_request(18));
assert!(!auth.allows_request(0));
assert!(!auth.allows_request(3));
}
#[test]
fn allows_request_authenticated_allows_all() {
let auth = ConnectionAuth::Authenticated {
principal: Principal {
name: "alice".into(),
auth_method: crabka_security::AuthMethod::SaslScramSha512,
groups: vec![],
},
mechanism: SaslMechanism::ScramSha512,
expires_at_ms: None,
authenticated_via_token: false,
};
assert!(auth.allows_request(0));
assert!(auth.allows_request(3));
assert!(auth.allows_request(17));
assert!(auth.allows_request(36));
}
#[tokio::test]
async fn handle_authenticate_oauthbearer_clamps_session_lifetime_when_cap_set_below_exp() {
let mut auth = ConnectionAuth::Negotiating {
mechanism: SaslMechanism::OAuthBearer,
exchange: SaslExchange::OAuthBearer,
pending_token_expiry_ms: None,
};
let validator = crabka_security::OAuthBearerValidator::Unsecured(
crabka_security::UnsecuredJwsValidator {
allowable_clock_skew_ms: 0,
..Default::default()
},
);
let now_ms = 1_000_000_i64;
let exp_ms = now_ms + 60_000; let token = unsecured_token("alice", exp_ms / 1000);
let req = oauthbearer_client_response(&token);
let resp = handle_authenticate_oauthbearer(
&req,
&mut auth,
&validator,
now_ms,
Some(30), )
.await;
assert!(resp.error_code == 0);
assert!(resp.session_lifetime_ms == 30_000);
match auth {
ConnectionAuth::Authenticated { expires_at_ms, .. } => {
assert!(
expires_at_ms == Some(now_ms + 30_000),
"expires_at_ms must reflect the clamped value (not raw token exp)"
);
}
_ => panic!("expected Authenticated"),
}
}
#[tokio::test]
async fn handle_authenticate_oauthbearer_no_clamp_when_cap_unset() {
let mut auth = ConnectionAuth::Negotiating {
mechanism: SaslMechanism::OAuthBearer,
exchange: SaslExchange::OAuthBearer,
pending_token_expiry_ms: None,
};
let validator = crabka_security::OAuthBearerValidator::Unsecured(
crabka_security::UnsecuredJwsValidator {
allowable_clock_skew_ms: 0,
..Default::default()
},
);
let now_ms = 1_000_000_i64;
let exp_ms = now_ms + 60_000;
let token = unsecured_token("alice", exp_ms / 1000);
let req = oauthbearer_client_response(&token);
let resp = handle_authenticate_oauthbearer(&req, &mut auth, &validator, now_ms, None).await;
assert!(resp.error_code == 0);
assert!(resp.session_lifetime_ms == 60_000);
match auth {
ConnectionAuth::Authenticated { expires_at_ms, .. } => {
assert!(expires_at_ms == Some(exp_ms), "unset cap = raw token exp");
}
_ => panic!("expected Authenticated"),
}
}
#[tokio::test]
async fn handle_authenticate_oauthbearer_no_clamp_when_cap_above_exp() {
let mut auth = ConnectionAuth::Negotiating {
mechanism: SaslMechanism::OAuthBearer,
exchange: SaslExchange::OAuthBearer,
pending_token_expiry_ms: None,
};
let validator = crabka_security::OAuthBearerValidator::Unsecured(
crabka_security::UnsecuredJwsValidator {
allowable_clock_skew_ms: 0,
..Default::default()
},
);
let now_ms = 1_000_000_i64;
let exp_ms = now_ms + 60_000; let token = unsecured_token("alice", exp_ms / 1000);
let req = oauthbearer_client_response(&token);
let resp = handle_authenticate_oauthbearer(
&req,
&mut auth,
&validator,
now_ms,
Some(600), )
.await;
assert!(resp.error_code == 0);
assert!(
resp.session_lifetime_ms == 60_000,
"cap above exp = no effect"
);
match auth {
ConnectionAuth::Authenticated { expires_at_ms, .. } => {
assert!(
expires_at_ms == Some(exp_ms),
"cap above exp = raw token exp"
);
}
_ => panic!("expected Authenticated"),
}
}
mod token_scram_fallback {
use super::*;
use assert2::assert;
use crabka_metadata::{DelegationTokenRecord, MetadataRecord};
use crabka_security::scram::hash_scram_password_with_salt;
use crabka_security::{KafkaPrincipal, ScramClientExchange};
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
async fn test_controller(
log_dir: std::path::PathBuf,
) -> Arc<crabka_raft::ControllerHandle> {
let cfg = crabka_raft::ControllerConfig {
election_timeout: Duration::from_millis(200),
heartbeat_interval: Duration::from_millis(50),
client_id: "test".into(),
..crabka_raft::ControllerConfig::for_tests(1, log_dir)
};
let handle = Arc::new(crabka_raft::Controller::start(cfg).await.unwrap());
let mut rx = handle.watch_leader();
let deadline = std::time::Instant::now() + Duration::from_secs(5);
while rx.borrow().is_none() {
assert!(std::time::Instant::now() < deadline, "no leader in 5s");
let _ = tokio::time::timeout(Duration::from_millis(100), rx.changed()).await;
}
handle
}
async fn append_token(
controller: &crabka_raft::ControllerHandle,
token_id: &str,
owner_name: &str,
hmac: Vec<u8>,
expiry_timestamp_ms: i64,
) {
let rec = MetadataRecord::V1DelegationToken(DelegationTokenRecord {
token_id: token_id.into(),
owner: KafkaPrincipal {
principal_type: "User".into(),
name: owner_name.into(),
},
hmac,
issue_timestamp_ms: 0,
expiry_timestamp_ms,
max_timestamp_ms: expiry_timestamp_ms,
renewers: vec![],
});
controller.submit_change(vec![rec]).await.unwrap();
}
fn drive_scram_to_done(
controller: &crabka_raft::ControllerHandle,
scram_username: &str,
password: &[u8],
mechanism: SaslMechanism,
) -> (ConnectionAuth, SaslAuthenticateResponse) {
let mut auth = ConnectionAuth::Negotiating {
mechanism,
exchange: SaslExchange::ScramPending,
pending_token_expiry_ms: None,
};
let mut client =
ScramClientExchange::new(scram_username.into(), password.to_vec(), mechanism);
let c1 = client.client_first().expect("client first");
let resp1 = handle_authenticate_scram(
&SaslAuthenticateRequest {
auth_bytes: bytes::Bytes::from(c1),
..Default::default()
},
&mut auth,
controller,
);
assert!(resp1.error_code == 0, "round 1 must succeed for happy path");
let c2 = client.step(&resp1.auth_bytes).expect("client final");
let resp2 = handle_authenticate_scram(
&SaslAuthenticateRequest {
auth_bytes: bytes::Bytes::from(c2),
..Default::default()
},
&mut auth,
controller,
);
(auth, resp2)
}
#[tokio::test]
async fn scram_sha256_falls_back_to_delegation_token_when_no_scram_user() {
let dir = TempDir::new().unwrap();
let controller = test_controller(dir.path().into()).await;
let hmac = vec![0xABu8; 32];
let expiry_ms = crate::time_util::now_ms() + 60_000;
append_token(&controller, "tok-uuid", "alice", hmac.clone(), expiry_ms).await;
let password = {
use base64::Engine;
base64::engine::general_purpose::STANDARD.encode(&hmac)
};
let mut auth = ConnectionAuth::Negotiating {
mechanism: SaslMechanism::ScramSha256,
exchange: SaslExchange::ScramPending,
pending_token_expiry_ms: None,
};
let mut client = ScramClientExchange::new(
"tok-uuid".into(),
password.as_bytes().to_vec(),
SaslMechanism::ScramSha256,
);
let c1 = client.client_first().unwrap();
let resp1 = handle_authenticate_scram(
&SaslAuthenticateRequest {
auth_bytes: bytes::Bytes::from(c1),
..Default::default()
},
&mut auth,
&*controller,
);
assert!(
resp1.error_code == 0,
"round 1 must succeed: token-fallback synthesizes the credential"
);
match &auth {
ConnectionAuth::Negotiating {
pending_token_expiry_ms,
..
} => {
assert!(
*pending_token_expiry_ms == Some(expiry_ms),
"round 1 must thread the token expiry through"
);
}
other => panic!("expected Negotiating, got {other:?}"),
}
controller.cancel().await;
}
#[tokio::test]
async fn token_authed_connection_has_authenticated_via_token_true_and_owner_principal() {
let dir = TempDir::new().unwrap();
let controller = test_controller(dir.path().into()).await;
let hmac = vec![0x42u8; 32];
let expiry_ms = crate::time_util::now_ms() + 60_000;
append_token(&controller, "tok-xyz", "alice", hmac.clone(), expiry_ms).await;
let password = {
use base64::Engine;
base64::engine::general_purpose::STANDARD.encode(&hmac)
};
let (auth, resp2) = drive_scram_to_done(
&controller,
"tok-xyz",
password.as_bytes(),
SaslMechanism::ScramSha256,
);
assert!(resp2.error_code == 0, "round 2 must succeed");
match auth {
ConnectionAuth::Authenticated {
principal,
mechanism,
expires_at_ms,
authenticated_via_token,
} => {
assert!(
principal.name == "alice",
"principal is the token OWNER, not the tokenId"
);
assert!(mechanism == SaslMechanism::ScramSha256);
assert!(
expires_at_ms == Some(expiry_ms),
"expires_at_ms = token expiry (KIP-368 ceiling)"
);
assert!(
authenticated_via_token,
"token-fallback path must mark the session as token-authed"
);
}
other => panic!("expected Authenticated, got {other:?}"),
}
controller.cancel().await;
}
#[tokio::test]
async fn scram_sha256_token_fallback_does_not_fire_for_unknown_token_id() {
let dir = TempDir::new().unwrap();
let controller = test_controller(dir.path().into()).await;
let mut auth = ConnectionAuth::Negotiating {
mechanism: SaslMechanism::ScramSha256,
exchange: SaslExchange::ScramPending,
pending_token_expiry_ms: None,
};
let mut client = ScramClientExchange::new(
"no-such-token".into(),
b"whatever".to_vec(),
SaslMechanism::ScramSha256,
);
let c1 = client.client_first().unwrap();
let resp = handle_authenticate_scram(
&SaslAuthenticateRequest {
auth_bytes: bytes::Bytes::from(c1),
..Default::default()
},
&mut auth,
&*controller,
);
assert!(
resp.error_code == SASL_AUTHENTICATION_FAILED,
"no SCRAM user + no token = unknown-user failure"
);
controller.cancel().await;
}
#[tokio::test]
async fn scram_sha512_does_not_fall_back_to_token() {
let dir = TempDir::new().unwrap();
let controller = test_controller(dir.path().into()).await;
let hmac = vec![0x55u8; 32];
let expiry_ms = crate::time_util::now_ms() + 60_000;
append_token(&controller, "tok-xyz", "alice", hmac, expiry_ms).await;
let mut auth = ConnectionAuth::Negotiating {
mechanism: SaslMechanism::ScramSha512,
exchange: SaslExchange::ScramPending,
pending_token_expiry_ms: None,
};
let mut client = ScramClientExchange::new(
"tok-xyz".into(),
b"whatever".to_vec(),
SaslMechanism::ScramSha512,
);
let c1 = client.client_first().unwrap();
let resp = handle_authenticate_scram(
&SaslAuthenticateRequest {
auth_bytes: bytes::Bytes::from(c1),
..Default::default()
},
&mut auth,
&*controller,
);
assert!(
resp.error_code == SASL_AUTHENTICATION_FAILED,
"SCRAM-SHA-512 must not consult the delegation-token table"
);
controller.cancel().await;
}
#[tokio::test]
async fn regular_scram_user_authentication_does_not_set_token_flag() {
let dir = TempDir::new().unwrap();
let controller = test_controller(dir.path().into()).await;
let salt = (0..16).collect::<Vec<u8>>();
let cred = hash_scram_password_with_salt(
b"alice-password",
SaslMechanism::ScramSha256,
4096,
salt.clone(),
);
let scram_rec =
MetadataRecord::V1ScramCredential(crabka_metadata::ScramCredentialRecord {
user: "alice".into(),
mechanism: SaslMechanism::ScramSha256,
salt,
stored_key: cred.stored_key.clone(),
server_key: cred.server_key.clone(),
iterations: cred.iterations,
});
controller.submit_change(vec![scram_rec]).await.unwrap();
let (auth, resp2) = drive_scram_to_done(
&controller,
"alice",
b"alice-password",
SaslMechanism::ScramSha256,
);
assert!(resp2.error_code == 0);
assert!(
resp2.session_lifetime_ms == 0,
"regular SCRAM has no session lifetime"
);
match auth {
ConnectionAuth::Authenticated {
principal,
expires_at_ms,
authenticated_via_token,
..
} => {
assert!(principal.name == "alice");
assert!(expires_at_ms == None);
assert!(
!authenticated_via_token,
"regular SCRAM is NOT a token-authed session"
);
}
other => panic!("expected Authenticated, got {other:?}"),
}
controller.cancel().await;
}
}
}