use std::sync::Arc;
use base64::Engine;
use base64::engine::general_purpose::URL_SAFE_NO_PAD as B64URL;
use jsonpath_rust::parser::model::JpQuery;
use jsonpath_rust::query::js_path_process;
use serde_json::Value;
use crate::jwks::JwksHandle;
use crate::{AuthError, AuthMethod, Principal};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthOutcome {
pub principal: Principal,
pub expires_at_ms: Option<i64>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientInitialResponse {
pub token: String,
pub authzid: Option<String>,
}
pub fn parse_client_initial_response(bytes: &[u8]) -> Result<ClientInitialResponse, AuthError> {
let s = std::str::from_utf8(bytes).map_err(|_| AuthError::MalformedMessage)?;
let (gs2, rest) = s.split_once('\u{1}').ok_or(AuthError::MalformedMessage)?;
let authzid = parse_gs2_header(gs2)?;
let mut token = None;
for pair in rest.split('\u{1}') {
if pair.is_empty() {
continue;
}
let (key, value) = pair.split_once('=').ok_or(AuthError::MalformedMessage)?;
if key == "auth" {
let t = value
.strip_prefix("Bearer ")
.ok_or(AuthError::MalformedMessage)?;
token = Some(t.to_string());
}
}
let token = token.ok_or(AuthError::MalformedMessage)?;
Ok(ClientInitialResponse { token, authzid })
}
fn parse_gs2_header(gs2: &str) -> Result<Option<String>, AuthError> {
let rest = if let Some(r) = gs2.strip_prefix("n,") {
r
} else if let Some(r) = gs2.strip_prefix("y,") {
r
} else {
return Err(AuthError::MalformedMessage);
};
let authzid = rest.strip_suffix(',').ok_or(AuthError::MalformedMessage)?;
if authzid.is_empty() {
Ok(None)
} else {
Ok(Some(
authzid.strip_prefix("a=").unwrap_or(authzid).to_string(),
))
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct UnsecuredJwsValidator {
pub principal_claim_name: String,
pub allowable_clock_skew_ms: i64,
pub custom_claim_check: Option<JpQuery>,
pub valid_token_type: Option<String>,
pub fallback_user_name_claim: Option<String>,
pub fallback_user_name_prefix: Option<String>,
pub groups_claim: Option<JpQuery>,
pub groups_claim_delimiter: Option<String>,
}
impl Default for UnsecuredJwsValidator {
fn default() -> Self {
Self {
principal_claim_name: "sub".to_string(),
allowable_clock_skew_ms: 30_000,
custom_claim_check: None,
valid_token_type: None,
fallback_user_name_claim: None,
fallback_user_name_prefix: None,
groups_claim: None,
groups_claim_delimiter: None,
}
}
}
impl UnsecuredJwsValidator {
pub fn validate(&self, token: &str, now_ms: i64) -> Result<AuthOutcome, AuthError> {
let mut segs = token.split('.');
let header_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
let payload_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
let sig = segs.next().ok_or(AuthError::InvalidToken)?;
if segs.next().is_some() {
return Err(AuthError::InvalidToken);
}
if !sig.is_empty() {
return Err(AuthError::InvalidToken);
}
let header: Value = decode_json_segment(header_b64)?;
if header.get("alg").and_then(Value::as_str) != Some("none") {
return Err(AuthError::InvalidToken);
}
if let Some(expected_typ) = &self.valid_token_type
&& header.get("typ").and_then(Value::as_str) != Some(expected_typ.as_str())
{
return Err(AuthError::InvalidToken);
}
let claims: Value = decode_json_segment(payload_b64)?;
let exp_ms = numeric_date_ms(&claims, "exp").ok_or(AuthError::InvalidToken)?;
if exp_ms + self.allowable_clock_skew_ms <= now_ms {
return Err(AuthError::InvalidToken);
}
if let Some(iat_ms) = numeric_date_ms(&claims, "iat")
&& iat_ms - self.allowable_clock_skew_ms > now_ms
{
return Err(AuthError::InvalidToken);
}
if let Some(path) = &self.custom_claim_check
&& !evaluate_custom_claim_check(path, &claims)
{
return Err(AuthError::InvalidToken);
}
let (raw_name, used_fallback) = if let Some(n) = claims
.get(&self.principal_claim_name)
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
{
(n.to_string(), false)
} else {
let fallback_claim = self
.fallback_user_name_claim
.as_deref()
.ok_or(AuthError::InvalidToken)?;
let raw = claims
.get(fallback_claim)
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.ok_or(AuthError::InvalidToken)?;
(raw.to_string(), true)
};
let name = if used_fallback {
match &self.fallback_user_name_prefix {
Some(prefix) => format!("{prefix}{raw_name}"),
None => raw_name,
}
} else {
raw_name
};
let groups = match &self.groups_claim {
Some(path) => extract_groups(path, &claims, self.groups_claim_delimiter.as_deref()),
None => Vec::new(),
};
Ok(AuthOutcome {
principal: Principal {
name,
auth_method: AuthMethod::SaslOAuthBearer,
groups,
},
expires_at_ms: Some(exp_ms),
})
}
}
fn extract_groups(path: &JpQuery, claims: &Value, delimiter: Option<&str>) -> Vec<String> {
let Ok(refs) = js_path_process(path, claims) else {
return Vec::new();
};
let mut out = Vec::new();
for r in refs {
match r.val() {
Value::String(s) => match delimiter {
Some(d) => out.extend(
s.split(d)
.map(str::trim)
.filter(|s| !s.is_empty())
.map(String::from),
),
None => out.push(s.clone()),
},
Value::Array(items) => {
out.extend(items.iter().filter_map(Value::as_str).map(String::from));
}
_ => {} }
}
out
}
fn evaluate_custom_claim_check(path: &JpQuery, claims: &Value) -> bool {
let Ok(refs) = js_path_process(path, claims) else {
return false;
};
if refs.is_empty() {
return false;
}
for r in refs {
match r.val() {
Value::Null | Value::Bool(false) => return false,
_ => {}
}
}
true
}
fn audience_contains(claims: &Value, expected: &str) -> bool {
match claims.get("aud") {
Some(Value::String(s)) => s == expected,
Some(Value::Array(items)) => items
.iter()
.filter_map(Value::as_str)
.any(|a| a == expected),
_ => false,
}
}
fn decode_json_segment(seg: &str) -> Result<Value, AuthError> {
let bytes = B64URL.decode(seg).map_err(|_| AuthError::InvalidToken)?;
serde_json::from_slice(&bytes).map_err(|_| AuthError::InvalidToken)
}
fn numeric_date_ms(claims: &Value, key: &str) -> Option<i64> {
let v = claims.get(key)?;
if let Some(secs) = v.as_i64() {
return secs.checked_mul(1000);
}
let ms = v.as_f64()? * 1000.0;
if ms.is_finite() {
#[allow(clippy::cast_possible_truncation)]
Some(ms as i64)
} else {
None
}
}
#[must_use]
pub fn invalid_token_json() -> String {
"{\"status\":\"invalid_token\"}".to_string()
}
#[derive(Debug, Clone)]
pub struct SignedJwsValidator {
pub principal_claim_name: String,
pub allowable_clock_skew_ms: i64,
pub valid_issuer: Option<String>,
pub expected_audience: Option<String>,
pub custom_claim_check: Option<JpQuery>,
pub valid_token_type: Option<String>,
pub fallback_user_name_claim: Option<String>,
pub fallback_user_name_prefix: Option<String>,
pub groups_claim: Option<JpQuery>,
pub groups_claim_delimiter: Option<String>,
pub expiry_ms: Option<i64>,
keys: JwksHandle,
}
impl SignedJwsValidator {
#[must_use]
pub fn new(keys: JwksHandle) -> Self {
Self {
principal_claim_name: "sub".to_string(),
allowable_clock_skew_ms: 30_000,
valid_issuer: None,
expected_audience: None,
custom_claim_check: None,
valid_token_type: None,
fallback_user_name_claim: None,
fallback_user_name_prefix: None,
groups_claim: None,
groups_claim_delimiter: None,
expiry_ms: None,
keys,
}
}
#[must_use]
pub fn key_handle(&self) -> JwksHandle {
self.keys.clone()
}
pub fn validate(&self, token: &str, now_ms: i64) -> Result<AuthOutcome, AuthError> {
let mut segs = token.split('.');
let header_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
let payload_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
let sig_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
if segs.next().is_some() || sig_b64.is_empty() {
return Err(AuthError::InvalidToken);
}
let header: Value = decode_json_segment(header_b64)?;
let alg = header
.get("alg")
.and_then(Value::as_str)
.ok_or(AuthError::InvalidToken)?;
if alg != "RS256" && alg != "ES256" {
return Err(AuthError::InvalidToken);
}
if let Some(expected_typ) = &self.valid_token_type
&& header.get("typ").and_then(Value::as_str) != Some(expected_typ.as_str())
{
return Err(AuthError::InvalidToken);
}
if let Some(expiry_ms) = self.expiry_ms {
let last_fetch = self.keys.last_successful_fetch_ms();
if last_fetch > 0 && now_ms.saturating_sub(last_fetch) > expiry_ms {
tracing::debug!(
last_fetch_ms = last_fetch,
now_ms,
expiry_ms,
"JWKS cache expired; rejecting token until next successful refresh",
);
return Err(AuthError::InvalidToken);
}
}
let kid = header.get("kid").and_then(Value::as_str);
let signing_input = format!("{header_b64}.{payload_b64}");
let sig = B64URL
.decode(sig_b64)
.map_err(|_| AuthError::InvalidToken)?;
if let Err(e) = self
.keys
.load()
.verify(kid, alg, signing_input.as_bytes(), &sig)
{
self.keys.signal_refresh();
return Err(e);
}
let claims: Value = decode_json_segment(payload_b64)?;
self.check_claims(&claims, now_ms)
}
fn check_claims(&self, claims: &Value, now_ms: i64) -> Result<AuthOutcome, AuthError> {
let exp_ms = numeric_date_ms(claims, "exp").ok_or(AuthError::InvalidToken)?;
if exp_ms + self.allowable_clock_skew_ms <= now_ms {
return Err(AuthError::InvalidToken);
}
if let Some(iat_ms) = numeric_date_ms(claims, "iat")
&& iat_ms - self.allowable_clock_skew_ms > now_ms
{
return Err(AuthError::InvalidToken);
}
if let Some(nbf_ms) = numeric_date_ms(claims, "nbf")
&& nbf_ms - self.allowable_clock_skew_ms > now_ms
{
return Err(AuthError::InvalidToken);
}
if let Some(expected) = &self.valid_issuer
&& claims.get("iss").and_then(Value::as_str) != Some(expected.as_str())
{
return Err(AuthError::InvalidToken);
}
if let Some(expected) = &self.expected_audience
&& !audience_contains(claims, expected)
{
return Err(AuthError::InvalidToken);
}
if let Some(path) = &self.custom_claim_check
&& !evaluate_custom_claim_check(path, claims)
{
return Err(AuthError::InvalidToken);
}
let (raw_name, used_fallback) = if let Some(n) = claims
.get(&self.principal_claim_name)
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
{
(n.to_string(), false)
} else {
let fallback_claim = self
.fallback_user_name_claim
.as_deref()
.ok_or(AuthError::InvalidToken)?;
let raw = claims
.get(fallback_claim)
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.ok_or(AuthError::InvalidToken)?;
(raw.to_string(), true)
};
let name = if used_fallback {
match &self.fallback_user_name_prefix {
Some(prefix) => format!("{prefix}{raw_name}"),
None => raw_name,
}
} else {
raw_name
};
let groups = match &self.groups_claim {
Some(path) => extract_groups(path, claims, self.groups_claim_delimiter.as_deref()),
None => Vec::new(),
};
Ok(AuthOutcome {
principal: Principal {
name,
auth_method: AuthMethod::SaslOAuthBearer,
groups,
},
expires_at_ms: Some(exp_ms),
})
}
}
#[derive(Debug, Clone)]
pub enum OAuthBearerValidator {
Unsecured(UnsecuredJwsValidator),
Signed(SignedJwsValidator),
Introspection(IntrospectionValidator),
}
impl Default for OAuthBearerValidator {
fn default() -> Self {
Self::Unsecured(UnsecuredJwsValidator::default())
}
}
impl OAuthBearerValidator {
pub async fn validate(&self, token: &str, now_ms: i64) -> Result<AuthOutcome, AuthError> {
match self {
Self::Unsecured(v) => v.validate(token, now_ms),
Self::Signed(v) => v.validate(token, now_ms),
Self::Introspection(v) => v.validate(token, now_ms).await,
}
}
#[must_use]
pub fn jwks_handle(&self) -> Option<JwksHandle> {
match self {
Self::Unsecured(_) | Self::Introspection(_) => None,
Self::Signed(v) => Some(v.key_handle()),
}
}
}
#[async_trait::async_trait]
pub trait IntrospectionClient: Send + Sync + std::fmt::Debug {
async fn introspect(&self, token: &str) -> Result<serde_json::Value, IntrospectionError>;
async fn userinfo(&self, token: &str) -> Result<Option<serde_json::Value>, IntrospectionError>;
}
#[derive(Debug, thiserror::Error)]
pub enum IntrospectionError {
#[error("transport: {0}")]
Transport(String),
#[error("non-2xx response: {0}")]
Status(u16),
#[error("invalid JSON body")]
Parse,
}
#[derive(Debug, Clone)]
pub struct IntrospectionValidator {
pub client: Arc<dyn IntrospectionClient>,
pub principal_claim_name: String,
pub custom_claim_check: Option<JpQuery>,
pub call_userinfo: bool,
pub allowable_clock_skew_ms: i64,
pub expected_audience: Option<String>,
pub fallback_user_name_claim: Option<String>,
pub fallback_user_name_prefix: Option<String>,
pub groups_claim: Option<JpQuery>,
pub groups_claim_delimiter: Option<String>,
}
impl IntrospectionValidator {
pub async fn validate(&self, token: &str, now_ms: i64) -> Result<AuthOutcome, AuthError> {
let mut claims = self
.client
.introspect(token)
.await
.map_err(|e| AuthError::IntrospectionTransport(e.to_string()))?;
if claims.get("active").and_then(Value::as_bool) != Some(true) {
return Err(AuthError::InvalidToken);
}
if let Some(expected) = &self.expected_audience
&& !audience_contains(&claims, expected)
{
return Err(AuthError::InvalidToken);
}
check_temporal_claims(&claims, now_ms, self.allowable_clock_skew_ms)?;
let exp_ms = numeric_date_ms(&claims, "exp").ok_or(AuthError::InvalidToken)?;
if self.call_userinfo
&& let Some(ui) = self
.client
.userinfo(token)
.await
.map_err(|e| AuthError::IntrospectionTransport(e.to_string()))?
{
merge_userinfo_over_introspection(&mut claims, ui);
}
if let Some(path) = &self.custom_claim_check
&& !evaluate_custom_claim_check(path, &claims)
{
return Err(AuthError::InvalidToken);
}
let (raw_name, used_fallback) = if let Some(n) = claims
.get(&self.principal_claim_name)
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
{
(n.to_string(), false)
} else {
let fallback_claim = self
.fallback_user_name_claim
.as_deref()
.ok_or(AuthError::InvalidToken)?;
let raw = claims
.get(fallback_claim)
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.ok_or(AuthError::InvalidToken)?;
(raw.to_string(), true)
};
let name = if used_fallback {
match &self.fallback_user_name_prefix {
Some(prefix) => format!("{prefix}{raw_name}"),
None => raw_name,
}
} else {
raw_name
};
let groups = match &self.groups_claim {
Some(path) => extract_groups(path, &claims, self.groups_claim_delimiter.as_deref()),
None => Vec::new(),
};
Ok(AuthOutcome {
principal: Principal {
name,
auth_method: AuthMethod::SaslOAuthBearer,
groups,
},
expires_at_ms: Some(exp_ms),
})
}
}
fn check_temporal_claims(claims: &Value, now_ms: i64, skew_ms: i64) -> Result<(), AuthError> {
if let Some(exp_s) = claims.get("exp").and_then(Value::as_i64) {
let exp_ms = exp_s.saturating_mul(1000);
if now_ms.saturating_sub(skew_ms) > exp_ms {
return Err(AuthError::InvalidToken);
}
}
if let Some(iat_s) = claims.get("iat").and_then(Value::as_i64) {
let iat_ms = iat_s.saturating_mul(1000);
if iat_ms.saturating_sub(skew_ms) > now_ms {
return Err(AuthError::InvalidToken);
}
}
if let Some(nbf_s) = claims.get("nbf").and_then(Value::as_i64) {
let nbf_ms = nbf_s.saturating_mul(1000);
if nbf_ms.saturating_sub(skew_ms) > now_ms {
return Err(AuthError::InvalidToken);
}
}
Ok(())
}
fn merge_userinfo_over_introspection(introspection: &mut Value, userinfo: Value) {
const RESERVED: &[&str] = &["active", "exp", "iat", "nbf", "scope", "client_id", "sub"];
let (Some(obj), Value::Object(ui_map)) = (introspection.as_object_mut(), userinfo) else {
return;
};
for (k, v) in ui_map {
if !RESERVED.contains(&k.as_str()) {
obj.insert(k, v);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use jsonpath_rust::parser::parse_json_path;
fn jws(header: &str, claims: &str) -> String {
format!(
"{}.{}.",
B64URL.encode(header.as_bytes()),
B64URL.encode(claims.as_bytes())
)
}
fn unsecured(sub: &str, iat_s: i64, exp_s: i64) -> String {
jws(
"{\"alg\":\"none\"}",
&format!("{{\"sub\":\"{sub}\",\"iat\":{iat_s},\"exp\":{exp_s}}}"),
)
}
fn make_unsecured_jws_with_header(
header: &serde_json::Value,
claims: &serde_json::Value,
) -> String {
format!(
"{}.{}.",
B64URL.encode(serde_json::to_vec(header).unwrap()),
B64URL.encode(serde_json::to_vec(claims).unwrap()),
)
}
fn make_unsecured_jws(claims: &serde_json::Value) -> String {
make_unsecured_jws_with_header(&serde_json::json!({"alg": "none"}), claims)
}
fn parse_jp(expr: &str) -> JpQuery {
parse_json_path(expr).expect("expression compiles")
}
fn client_resp(token: &str) -> Vec<u8> {
format!("n,,\u{1}auth=Bearer {token}\u{1}\u{1}").into_bytes()
}
#[test]
fn parse_happy_path_empty_authzid() {
let r = parse_client_initial_response(&client_resp("tok.en.")).unwrap();
assert!(r.token == "tok.en.");
assert!(r.authzid == None);
}
#[test]
fn parse_extracts_authzid_and_ignores_extra_kvpairs() {
let bytes =
b"n,a=alice,\x01host=example.com\x01auth=Bearer abc\x01port=443\x01\x01".to_vec();
let r = parse_client_initial_response(&bytes).unwrap();
assert!(r.token == "abc");
assert!(r.authzid == Some("alice".to_string()));
}
#[test]
fn parse_rejects_missing_auth_kvpair() {
let bytes = b"n,,\x01host=example.com\x01\x01".to_vec();
assert!(parse_client_initial_response(&bytes) == Err(AuthError::MalformedMessage));
}
#[test]
fn parse_rejects_missing_bearer_prefix() {
let bytes = b"n,,\x01auth=Basic abc\x01\x01".to_vec();
assert!(parse_client_initial_response(&bytes) == Err(AuthError::MalformedMessage));
}
#[test]
fn parse_rejects_bad_gs2_header() {
let bytes = b"z,,\x01auth=Bearer abc\x01\x01".to_vec();
assert!(parse_client_initial_response(&bytes) == Err(AuthError::MalformedMessage));
}
#[test]
fn validate_accepts_fresh_unsecured_token() {
let v = UnsecuredJwsValidator::default();
let now = 1_000_000_000_000;
let token = unsecured("admin", 999_999_000, 1_000_000_900); let outcome = v.validate(&token, now).unwrap();
assert!(outcome.principal.name == "admin");
assert!(outcome.principal.auth_method == AuthMethod::SaslOAuthBearer);
}
#[test]
fn unsecured_validate_surfaces_exp_in_auth_outcome() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = unsecured("alice", 999, exp_secs);
let v = UnsecuredJwsValidator::default();
let outcome = v.validate(&token, now_ms).expect("token valid");
assert!(outcome.principal.name == "alice");
assert!(outcome.expires_at_ms == Some(exp_secs * 1000));
}
#[test]
fn validate_rejects_expired_token() {
let v = UnsecuredJwsValidator {
allowable_clock_skew_ms: 0,
..Default::default()
};
let now = 2_000_000_000_000;
let token = unsecured("admin", 1_000_000_000, 1_000_000_100);
assert!(v.validate(&token, now) == Err(AuthError::InvalidToken));
}
#[test]
fn validate_rejects_future_iat() {
let v = UnsecuredJwsValidator {
allowable_clock_skew_ms: 0,
..Default::default()
};
let now = 1_000_000_000_000;
let token = unsecured("admin", 5_000_000_000, 5_000_000_100);
assert!(v.validate(&token, now) == Err(AuthError::InvalidToken));
}
#[test]
fn validate_rejects_signed_token() {
let v = UnsecuredJwsValidator::default();
let now = 1_000_000_000_000;
let token = format!(
"{}.{}.{}",
B64URL.encode(b"{\"alg\":\"RS256\"}"),
B64URL.encode(b"{\"sub\":\"admin\",\"exp\":1000000900}"),
B64URL.encode(b"sig")
);
assert!(v.validate(&token, now) == Err(AuthError::InvalidToken));
}
#[test]
fn validate_rejects_missing_exp() {
let v = UnsecuredJwsValidator::default();
let token = jws("{\"alg\":\"none\"}", "{\"sub\":\"admin\"}");
assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
}
#[test]
fn validate_rejects_missing_principal_claim() {
let v = UnsecuredJwsValidator::default();
let token = jws("{\"alg\":\"none\"}", "{\"exp\":5000000000}");
assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
}
#[test]
fn unsecured_validate_rejects_when_custom_claim_check_fails() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws(&serde_json::json!({
"sub": "alice",
"exp": exp_secs,
"scope": ["kafka.read"],
}));
let v = UnsecuredJwsValidator {
custom_claim_check: Some(parse_jp("$.scope[?@ == 'kafka.admin']")),
..Default::default()
};
let result = v.validate(&token, now_ms);
assert!(result.unwrap_err() == AuthError::InvalidToken);
}
#[test]
fn unsecured_validate_accepts_when_custom_claim_check_passes() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws(&serde_json::json!({
"sub": "alice",
"exp": exp_secs,
"scope": ["kafka.admin", "kafka.read"],
}));
let v = UnsecuredJwsValidator {
custom_claim_check: Some(parse_jp("$.scope[?@ == 'kafka.admin']")),
..Default::default()
};
let outcome = v.validate(&token, now_ms).expect("valid token");
assert!(outcome.principal.name == "alice");
}
#[test]
fn unsecured_validate_rejects_when_valid_token_type_mismatch() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws_with_header(
&serde_json::json!({"alg": "none", "typ": "OPAQUE"}),
&serde_json::json!({"sub": "alice", "exp": exp_secs}),
);
let v = UnsecuredJwsValidator {
valid_token_type: Some("JWT".into()),
..Default::default()
};
let result = v.validate(&token, now_ms);
assert!(result.unwrap_err() == AuthError::InvalidToken);
}
#[test]
fn unsecured_validate_accepts_when_valid_token_type_match() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws_with_header(
&serde_json::json!({"alg": "none", "typ": "JWT"}),
&serde_json::json!({"sub": "alice", "exp": exp_secs}),
);
let v = UnsecuredJwsValidator {
valid_token_type: Some("JWT".into()),
..Default::default()
};
assert!(v.validate(&token, now_ms).is_ok());
}
#[test]
fn unsecured_validate_accepts_when_valid_token_type_unset_regardless_of_header() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws_with_header(
&serde_json::json!({"alg": "none", "typ": "OPAQUE"}),
&serde_json::json!({"sub": "alice", "exp": exp_secs}),
);
let v = UnsecuredJwsValidator::default();
assert!(v.validate(&token, now_ms).is_ok());
}
#[test]
fn unsecured_validate_uses_primary_principal_claim_when_present() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws_with_header(
&serde_json::json!({"alg": "none", "typ": "JWT"}),
&serde_json::json!({"sub": "alice", "exp": exp_secs}),
);
let v = UnsecuredJwsValidator {
fallback_user_name_claim: Some("client_id".into()),
fallback_user_name_prefix: Some("service-account-".into()),
..Default::default()
};
let outcome = v.validate(&token, now_ms).expect("valid");
assert!(outcome.principal.name == "alice"); }
#[test]
fn unsecured_validate_falls_back_to_alt_claim_when_primary_absent() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws_with_header(
&serde_json::json!({"alg": "none", "typ": "JWT"}),
&serde_json::json!({"client_id": "svc1", "exp": exp_secs}),
);
let v = UnsecuredJwsValidator {
fallback_user_name_claim: Some("client_id".into()),
..Default::default()
};
let outcome = v.validate(&token, now_ms).expect("valid");
assert!(outcome.principal.name == "svc1"); }
#[test]
fn unsecured_validate_applies_fallback_prefix_only_on_fallback() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws_with_header(
&serde_json::json!({"alg": "none", "typ": "JWT"}),
&serde_json::json!({"client_id": "svc1", "exp": exp_secs}),
);
let v = UnsecuredJwsValidator {
fallback_user_name_claim: Some("client_id".into()),
fallback_user_name_prefix: Some("service-account-".into()),
..Default::default()
};
let outcome = v.validate(&token, now_ms).expect("valid");
assert!(outcome.principal.name == "service-account-svc1");
}
#[test]
fn unsecured_validate_rejects_when_neither_primary_nor_fallback_present() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws_with_header(
&serde_json::json!({"alg": "none", "typ": "JWT"}),
&serde_json::json!({"exp": exp_secs}),
);
let v = UnsecuredJwsValidator {
fallback_user_name_claim: Some("client_id".into()),
..Default::default()
};
assert!(v.validate(&token, now_ms) == Err(AuthError::InvalidToken));
}
#[test]
fn unsecured_validate_extracts_groups_from_array_claim() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws_with_header(
&serde_json::json!({"alg": "none", "typ": "JWT"}),
&serde_json::json!({
"sub": "alice",
"exp": exp_secs,
"groups": ["admin", "ops"],
}),
);
let v = UnsecuredJwsValidator {
groups_claim: Some(parse_jp("$.groups")),
..Default::default()
};
let outcome = v.validate(&token, now_ms).expect("valid");
assert!(outcome.principal.groups == vec!["admin".to_string(), "ops".to_string()]);
}
#[test]
fn unsecured_validate_extracts_groups_from_delimited_string() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws_with_header(
&serde_json::json!({"alg": "none", "typ": "JWT"}),
&serde_json::json!({
"sub": "alice",
"exp": exp_secs,
"groups": "admin,ops, kafka",
}),
);
let v = UnsecuredJwsValidator {
groups_claim: Some(parse_jp("$.groups")),
groups_claim_delimiter: Some(",".into()),
..Default::default()
};
let outcome = v.validate(&token, now_ms).expect("valid");
assert!(
outcome.principal.groups
== vec!["admin".to_string(), "ops".to_string(), "kafka".to_string()]
);
}
#[test]
fn unsecured_validate_extracts_groups_from_nested_claim_via_jsonpath() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws_with_header(
&serde_json::json!({"alg": "none", "typ": "JWT"}),
&serde_json::json!({
"sub": "alice",
"exp": exp_secs,
"realm_access": { "roles": ["admin", "ops"] },
}),
);
let v = UnsecuredJwsValidator {
groups_claim: Some(parse_jp("$.realm_access.roles[*]")),
..Default::default()
};
let outcome = v.validate(&token, now_ms).expect("valid");
assert!(outcome.principal.groups == vec!["admin".to_string(), "ops".to_string()]);
}
#[test]
fn unsecured_validate_returns_empty_groups_when_claim_unset() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws_with_header(
&serde_json::json!({"alg": "none", "typ": "JWT"}),
&serde_json::json!({
"sub": "alice",
"exp": exp_secs,
"groups": ["admin"],
}),
);
let v = UnsecuredJwsValidator::default(); let outcome = v.validate(&token, now_ms).expect("valid");
assert!(outcome.principal.groups == Vec::<String>::new());
}
#[test]
fn unsecured_validate_returns_empty_groups_when_claim_resolves_to_empty() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let token = make_unsecured_jws_with_header(
&serde_json::json!({"alg": "none", "typ": "JWT"}),
&serde_json::json!({
"sub": "alice",
"exp": exp_secs,
}),
);
let v = UnsecuredJwsValidator {
groups_claim: Some(parse_jp("$.nonexistent")),
..Default::default()
};
let outcome = v.validate(&token, now_ms).expect("valid");
assert!(outcome.principal.groups == Vec::<String>::new());
}
#[test]
fn validate_custom_principal_claim() {
let v = UnsecuredJwsValidator {
principal_claim_name: "client_id".to_string(),
..Default::default()
};
let token = jws(
"{\"alg\":\"none\"}",
"{\"client_id\":\"svc-1\",\"exp\":5000000000}",
);
let outcome = v.validate(&token, 1_000_000_000_000).unwrap();
assert!(outcome.principal.name == "svc-1");
}
#[test]
fn invalid_token_json_is_rfc7628_shape() {
assert!(invalid_token_json() == "{\"status\":\"invalid_token\"}");
}
use crate::jwks::{Jwks, JwksHandle, mint_es256, mint_rs256, mint_rs256_with_header};
fn signed(jwks_json: &str) -> (SignedJwsValidator, JwksHandle) {
let handle = JwksHandle::new(Jwks::from_json(jwks_json, false).unwrap());
(SignedJwsValidator::new(handle.clone()), handle)
}
#[test]
fn signed_accepts_fresh_rs256_token() {
let (token, jwks) = mint_rs256("k1", "{\"sub\":\"admin\",\"exp\":9999999999}");
let (v, _h) = signed(&jwks);
let outcome = v.validate(&token, 1_000_000_000_000).unwrap();
assert!(outcome.principal.name == "admin");
assert!(outcome.principal.auth_method == AuthMethod::SaslOAuthBearer);
}
#[test]
fn signed_validate_surfaces_exp_in_auth_outcome() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
let (v, _h) = signed(&jwks);
let outcome = v.validate(&token, now_ms).expect("token valid");
assert!(outcome.principal.name == "alice");
assert!(outcome.expires_at_ms == Some(exp_secs * 1000));
}
#[test]
fn signed_rejects_unsecured_alg_none() {
let (_token, jwks) = mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999}");
let (v, _h) = signed(&jwks);
let unsecured = jws("{\"alg\":\"none\"}", "{\"sub\":\"a\",\"exp\":9999999999}");
assert!(v.validate(&unsecured, 1_000_000_000_000) == Err(AuthError::InvalidToken));
}
#[test]
fn signed_rejects_expired() {
let (token, jwks) = mint_rs256("k1", "{\"sub\":\"a\",\"exp\":1000}");
let (mut v, _h) = signed(&jwks);
v.allowable_clock_skew_ms = 0;
assert!(v.validate(&token, 5_000_000_000_000) == Err(AuthError::InvalidToken));
}
#[test]
fn signed_rejects_future_nbf() {
let (token, jwks) = mint_rs256(
"k1",
"{\"sub\":\"a\",\"exp\":9999999999,\"nbf\":5000000000}",
);
let (mut v, _h) = signed(&jwks);
v.allowable_clock_skew_ms = 0;
assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
}
#[test]
fn signed_honors_issuer() {
let (token, jwks) = mint_rs256(
"k1",
"{\"sub\":\"a\",\"exp\":9999999999,\"iss\":\"https://idp\"}",
);
let (mut v, _h) = signed(&jwks);
v.valid_issuer = Some("https://idp".to_string());
assert!(v.validate(&token, 1_000_000_000_000).is_ok());
v.valid_issuer = Some("https://other".to_string());
assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
}
#[test]
fn signed_rejects_missing_issuer_when_required() {
let (token, jwks) = mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999}");
let (mut v, _h) = signed(&jwks);
v.valid_issuer = Some("https://idp".to_string());
assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
}
#[test]
fn signed_honors_audience_string_and_array() {
let (tok_str, jwks) =
mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999,\"aud\":\"kafka\"}");
let (mut v, _h) = signed(&jwks);
v.expected_audience = Some("kafka".to_string());
assert!(v.validate(&tok_str, 1_000_000_000_000).is_ok());
let (tok_arr, jwks2) = mint_rs256(
"k1",
"{\"sub\":\"a\",\"exp\":9999999999,\"aud\":[\"other\",\"kafka\"]}",
);
let (mut v2, _h2) = signed(&jwks2);
v2.expected_audience = Some("kafka".to_string());
assert!(v2.validate(&tok_arr, 1_000_000_000_000).is_ok());
let (tok_bad, jwks3) =
mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999,\"aud\":\"web\"}");
let (mut v3, _h3) = signed(&jwks3);
v3.expected_audience = Some("kafka".to_string());
assert!(v3.validate(&tok_bad, 1_000_000_000_000) == Err(AuthError::InvalidToken));
}
#[test]
fn signed_validate_rejects_when_custom_claim_check_fails() {
let (token, jwks) = mint_rs256(
"k1",
"{\"sub\":\"alice\",\"exp\":9999999999,\"scope\":[\"kafka.read\"]}",
);
let (mut v, _h) = signed(&jwks);
v.custom_claim_check = Some(parse_jp("$.scope[?@ == 'kafka.admin']"));
let result = v.validate(&token, 1_000_000_000_000);
assert!(result.unwrap_err() == AuthError::InvalidToken);
}
#[test]
fn signed_validate_accepts_when_custom_claim_check_passes() {
let (token, jwks) = mint_rs256(
"k1",
"{\"sub\":\"alice\",\"exp\":9999999999,\"scope\":[\"kafka.admin\",\"kafka.read\"]}",
);
let (mut v, _h) = signed(&jwks);
v.custom_claim_check = Some(parse_jp("$.scope[?@ == 'kafka.admin']"));
let outcome = v.validate(&token, 1_000_000_000_000).expect("valid token");
assert!(outcome.principal.name == "alice");
}
#[test]
fn signed_validate_rejects_when_valid_token_type_mismatch() {
let (token, jwks) = mint_rs256_with_header(
"{\"alg\":\"RS256\",\"kid\":\"k1\",\"typ\":\"OPAQUE\"}",
"{\"sub\":\"alice\",\"exp\":9999999999}",
);
let (mut v, _h) = signed(&jwks);
v.valid_token_type = Some("JWT".into());
let result = v.validate(&token, 1_000_000_000_000);
assert!(result.unwrap_err() == AuthError::InvalidToken);
}
#[test]
fn signed_validate_accepts_when_valid_token_type_match() {
let (token, jwks) = mint_rs256_with_header(
"{\"alg\":\"RS256\",\"kid\":\"k1\",\"typ\":\"JWT\"}",
"{\"sub\":\"alice\",\"exp\":9999999999}",
);
let (mut v, _h) = signed(&jwks);
v.valid_token_type = Some("JWT".into());
assert!(v.validate(&token, 1_000_000_000_000).is_ok());
}
#[test]
fn signed_validate_accepts_when_valid_token_type_unset_regardless_of_header() {
let (token, jwks) = mint_rs256_with_header(
"{\"alg\":\"RS256\",\"kid\":\"k1\",\"typ\":\"OPAQUE\"}",
"{\"sub\":\"alice\",\"exp\":9999999999}",
);
let (v, _h) = signed(&jwks);
assert!(v.validate(&token, 1_000_000_000_000).is_ok());
}
#[test]
fn signed_rejects_missing_principal() {
let (token, jwks) = mint_rs256("k1", "{\"exp\":9999999999}");
let (v, _h) = signed(&jwks);
assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
}
#[test]
fn signed_custom_principal_claim() {
let (token, jwks) = mint_rs256("k1", "{\"client_id\":\"svc-1\",\"exp\":9999999999}");
let (mut v, _h) = signed(&jwks);
v.principal_claim_name = "client_id".to_string();
assert!(
v.validate(&token, 1_000_000_000_000)
.unwrap()
.principal
.name
== "svc-1"
);
}
#[test]
fn signed_key_rotation_via_handle() {
let (token_a, jwks_a) = mint_es256("k1", "{\"sub\":\"a\",\"exp\":9999999999}");
let (v, handle) = signed(&jwks_a);
assert!(v.validate(&token_a, 1_000_000_000_000).is_ok());
let (token_b, jwks_b) = mint_es256("k1", "{\"sub\":\"b\",\"exp\":9999999999}");
handle.store(Jwks::from_json(&jwks_b, false).unwrap());
assert!(v.validate(&token_a, 1_000_000_000_000) == Err(AuthError::InvalidToken));
assert!(
v.validate(&token_b, 1_000_000_000_000)
.unwrap()
.principal
.name
== "b"
);
}
#[test]
fn signed_rejects_when_keyset_empty() {
let (token, _jwks) = mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999}");
let v = SignedJwsValidator::new(JwksHandle::default());
assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
}
#[test]
fn signed_validate_falls_back_to_alt_claim_when_primary_absent() {
let (token, jwks) = mint_rs256_with_header(
"{\"alg\":\"RS256\",\"kid\":\"k1\",\"typ\":\"JWT\"}",
"{\"client_id\":\"svc1\",\"exp\":9999999999,\"iss\":\"https://test.example\"}",
);
let (mut v, _h) = signed(&jwks);
v.fallback_user_name_claim = Some("client_id".into());
v.fallback_user_name_prefix = Some("service-account-".into());
let outcome = v.validate(&token, 1_000_000_000_000).expect("valid");
assert!(outcome.principal.name == "service-account-svc1");
}
#[test]
fn signed_validate_extracts_groups_from_array_claim() {
let (token, jwks) = mint_rs256(
"k1",
"{\"sub\":\"alice\",\"exp\":9999999999,\"groups\":[\"admin\",\"ops\"]}",
);
let (mut v, _h) = signed(&jwks);
v.groups_claim = Some(parse_jp("$.groups"));
let outcome = v.validate(&token, 1_000_000_000_000).expect("valid");
assert!(outcome.principal.groups == vec!["admin".to_string(), "ops".to_string()]);
}
fn signed_with_handles(
jwks_json: &str,
last_successful_fetch_ms: i64,
) -> (SignedJwsValidator, tokio::sync::mpsc::Receiver<()>) {
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
let ts = Arc::new(AtomicI64::new(last_successful_fetch_ms));
let handle = JwksHandle::new_with_refresher_handles(
Jwks::from_json(jwks_json, false).unwrap(),
ts,
tx,
);
(SignedJwsValidator::new(handle), rx)
}
#[test]
fn signed_validate_rejects_when_jwks_cache_expired() {
let now_ms: i64 = 10_000_000;
let exp_secs: i64 = (now_ms / 1000) + 60;
let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
let (mut v, _rx) = signed_with_handles(&jwks, now_ms - 2_000);
v.expiry_ms = Some(1_000);
assert!(v.validate(&token, now_ms) == Err(AuthError::InvalidToken));
}
#[test]
fn signed_validate_accepts_when_jwks_cache_within_expiry() {
let now_ms: i64 = 10_000_000;
let exp_secs: i64 = (now_ms / 1000) + 60;
let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
let (mut v, _rx) = signed_with_handles(&jwks, now_ms - 500);
v.expiry_ms = Some(1_000);
let outcome = v.validate(&token, now_ms).expect("valid");
assert!(outcome.principal.name == "alice");
}
#[test]
fn signed_validate_accepts_when_expiry_unset_regardless_of_cache_age() {
let now_ms: i64 = 10_000_000;
let exp_secs: i64 = (now_ms / 1000) + 60;
let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
let (v, _rx) = signed_with_handles(&jwks, now_ms - 999_999_999);
assert!(v.validate(&token, now_ms).is_ok());
}
#[test]
fn signed_validate_skips_expiry_check_when_never_fetched() {
let now_ms: i64 = 10_000_000;
let exp_secs: i64 = (now_ms / 1000) + 60;
let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
let (mut v, _rx) = signed_with_handles(&jwks, 0);
v.expiry_ms = Some(1);
assert!(v.validate(&token, now_ms).is_ok());
}
#[test]
fn signed_validate_signals_refresh_on_unknown_kid() {
let now_ms: i64 = 10_000_000;
let exp_secs: i64 = (now_ms / 1000) + 60;
let (token, _jwks_with_k1) =
mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
let mismatched_jwks =
r#"{"keys":[{"kty":"RSA","kid":"other","n":"AQAB","e":"AQAB"}]}"#.to_string();
let (v, mut rx) = signed_with_handles(&mismatched_jwks, now_ms);
assert!(v.validate(&token, now_ms) == Err(AuthError::InvalidToken));
assert!(
rx.try_recv().is_ok(),
"validator should signal refresh on verify failure",
);
}
#[test]
fn signed_validate_does_not_signal_when_verification_succeeds() {
let now_ms: i64 = 10_000_000;
let exp_secs: i64 = (now_ms / 1000) + 60;
let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
let (v, mut rx) = signed_with_handles(&jwks, now_ms);
assert!(v.validate(&token, now_ms).is_ok());
assert!(
rx.try_recv().is_err(),
"happy-path verification should not signal a refresh",
);
}
#[tokio::test]
async fn enum_dispatches_unsecured_and_signed() {
let unsecured = OAuthBearerValidator::default();
assert!(unsecured.jwks_handle().is_none());
let tok = unsecured_token("admin", 999_999_000, 9_999_999_999);
assert!(unsecured.validate(&tok, 1_000_000_000_000).await.is_ok());
let (token, jwks) = mint_rs256("k1", "{\"sub\":\"x\",\"exp\":9999999999}");
let (sv, _h) = signed(&jwks);
let signed_enum = OAuthBearerValidator::Signed(sv);
assert!(signed_enum.jwks_handle().is_some());
assert!(
signed_enum
.validate(&token, 1_000_000_000_000)
.await
.unwrap()
.principal
.name
== "x"
);
}
fn unsecured_token(sub: &str, iat_s: i64, exp_s: i64) -> String {
jws(
"{\"alg\":\"none\"}",
&format!("{{\"sub\":\"{sub}\",\"iat\":{iat_s},\"exp\":{exp_s}}}"),
)
}
#[test]
fn custom_claim_check_compile_error_at_validator_construction() {
let result = parse_json_path("@.unterminated");
assert!(result.is_err(), "malformed expression must fail to parse");
}
}
#[cfg(test)]
mod introspection_tests {
use super::*;
use crate::{AuthError, AuthMethod};
use assert2::assert;
use jsonpath_rust::parser::parse_json_path;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::sync::Mutex;
#[derive(Debug, Default)]
struct MockIntrospectionClient {
introspect_responses: Mutex<HashMap<String, Result<Value, IntrospectionError>>>,
userinfo_responses: Mutex<HashMap<String, Result<Option<Value>, IntrospectionError>>>,
}
impl MockIntrospectionClient {
fn arc() -> Arc<Self> {
Arc::new(Self::default())
}
fn set_introspect(&self, token: &str, resp: Result<Value, IntrospectionError>) {
self.introspect_responses
.lock()
.unwrap()
.insert(token.into(), resp);
}
fn set_userinfo(&self, token: &str, resp: Result<Option<Value>, IntrospectionError>) {
self.userinfo_responses
.lock()
.unwrap()
.insert(token.into(), resp);
}
}
#[async_trait::async_trait]
impl IntrospectionClient for MockIntrospectionClient {
async fn introspect(&self, token: &str) -> Result<Value, IntrospectionError> {
self.introspect_responses
.lock()
.unwrap()
.remove(token)
.unwrap_or(Err(IntrospectionError::Transport(
"no canned response".into(),
)))
}
async fn userinfo(&self, token: &str) -> Result<Option<Value>, IntrospectionError> {
self.userinfo_responses
.lock()
.unwrap()
.remove(token)
.unwrap_or(Ok(None))
}
}
fn validator(client: Arc<MockIntrospectionClient>) -> IntrospectionValidator {
IntrospectionValidator {
client,
principal_claim_name: "sub".into(),
custom_claim_check: None,
call_userinfo: false,
allowable_clock_skew_ms: 30_000,
expected_audience: None,
fallback_user_name_claim: None,
fallback_user_name_prefix: None,
groups_claim: None,
groups_claim_delimiter: None,
}
}
fn parse_jp(expr: &str) -> JpQuery {
parse_json_path(expr).expect("expression compiles")
}
const NOW_MS: i64 = 1_700_000_000_000;
#[tokio::test]
async fn introspection_active_true_with_principal_returns_ok() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS/1000 + 60})),
);
let v = validator(mock.clone());
let outcome = v.validate("tok", NOW_MS).await.unwrap();
assert!(outcome.principal.name == "alice");
assert!(outcome.principal.auth_method == AuthMethod::SaslOAuthBearer);
}
#[tokio::test]
async fn introspection_active_false_rejected() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect("tok", Ok(json!({"active": false})));
let v = validator(mock.clone());
assert!(matches!(
v.validate("tok", NOW_MS).await,
Err(AuthError::InvalidToken)
));
}
#[tokio::test]
async fn introspection_missing_active_field_rejected() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect("tok", Ok(json!({"sub": "alice"})));
let v = validator(mock.clone());
assert!(matches!(
v.validate("tok", NOW_MS).await,
Err(AuthError::InvalidToken)
));
}
#[tokio::test]
async fn introspection_expired_exp_rejected() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS/1000 - 3600})),
);
let v = validator(mock.clone());
assert!(matches!(
v.validate("tok", NOW_MS).await,
Err(AuthError::InvalidToken)
));
}
#[tokio::test]
async fn introspection_validate_rejects_when_custom_claim_check_fails() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({
"active": true,
"sub": "alice",
"exp": NOW_MS/1000 + 60,
"scope": ["kafka.read"],
})),
);
let mut v = validator(mock.clone());
v.custom_claim_check = Some(parse_jp("$.scope[?@ == 'kafka.admin']"));
let result = v.validate("tok", NOW_MS).await;
assert!(result.unwrap_err() == AuthError::InvalidToken);
}
#[tokio::test]
async fn introspection_validate_accepts_when_custom_claim_check_passes() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({
"active": true,
"sub": "alice",
"exp": NOW_MS/1000 + 60,
"scope": ["kafka.admin", "kafka.read"],
})),
);
let mut v = validator(mock.clone());
v.custom_claim_check = Some(parse_jp("$.scope[?@ == 'kafka.admin']"));
let outcome = v.validate("tok", NOW_MS).await.expect("valid");
assert!(outcome.principal.name == "alice");
}
#[tokio::test]
async fn introspection_honors_audience_string_and_array() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({"active": true, "sub": "a", "exp": NOW_MS/1000 + 60, "aud": "kafka"})),
);
let mut v = validator(mock.clone());
v.expected_audience = Some("kafka".to_string());
assert!(v.validate("tok", NOW_MS).await.is_ok());
let mock2 = MockIntrospectionClient::arc();
mock2.set_introspect(
"tok",
Ok(json!({
"active": true, "sub": "a", "exp": NOW_MS/1000 + 60,
"aud": ["other", "kafka"],
})),
);
let mut v2 = validator(mock2.clone());
v2.expected_audience = Some("kafka".to_string());
assert!(v2.validate("tok", NOW_MS).await.is_ok());
}
#[tokio::test]
async fn introspection_rejects_non_matching_audience() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({"active": true, "sub": "a", "exp": NOW_MS/1000 + 60, "aud": "web"})),
);
let mut v = validator(mock.clone());
v.expected_audience = Some("kafka".to_string());
assert!(v.validate("tok", NOW_MS).await == Err(AuthError::InvalidToken));
}
#[tokio::test]
async fn introspection_rejects_missing_audience_when_expected() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({"active": true, "sub": "a", "exp": NOW_MS/1000 + 60})),
);
let mut v = validator(mock.clone());
v.expected_audience = Some("kafka".to_string());
assert!(v.validate("tok", NOW_MS).await == Err(AuthError::InvalidToken));
}
#[tokio::test]
async fn introspection_ignores_audience_when_unset() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({"active": true, "sub": "a", "exp": NOW_MS/1000 + 60, "aud": "web"})),
);
let v = validator(mock.clone());
assert!(v.validate("tok", NOW_MS).await.is_ok());
}
#[tokio::test]
async fn introspection_validate_does_not_check_valid_token_type() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({
"active": true,
"sub": "alice",
"exp": NOW_MS/1000 + 60,
})),
);
let v = validator(mock.clone());
let outcome = v.validate("tok", NOW_MS).await.expect("valid");
assert!(outcome.principal.name == "alice");
}
#[tokio::test]
async fn introspection_userinfo_claims_override_introspection_for_profile_keys() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({
"active": true,
"sub": "alice",
"exp": NOW_MS/1000 + 60,
"preferred_username": "intros-name",
})),
);
mock.set_userinfo(
"tok",
Ok(Some(
json!({"preferred_username": "userinfo-name", "email": "a@b.c"}),
)),
);
let mut v = validator(mock.clone());
v.call_userinfo = true;
v.principal_claim_name = "preferred_username".into();
let outcome = v.validate("tok", NOW_MS).await.unwrap();
assert!(outcome.principal.name == "userinfo-name");
}
#[tokio::test]
async fn introspection_userinfo_does_not_override_authorization_keys() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS/1000 + 60})),
);
mock.set_userinfo("tok", Ok(Some(json!({"active": false, "sub": "mallory"}))));
let mut v = validator(mock.clone());
v.call_userinfo = true;
let outcome = v.validate("tok", NOW_MS).await.unwrap();
assert!(
outcome.principal.name == "alice",
"sub from introspection wins over userinfo"
);
}
#[tokio::test]
async fn introspection_userinfo_disabled_when_call_userinfo_false() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS/1000 + 60})),
);
mock.set_userinfo("tok", Ok(Some(json!({"preferred_username": "ignored"}))));
let v = validator(mock.clone()); let outcome = v.validate("tok", NOW_MS).await.unwrap();
assert!(outcome.principal.name == "alice");
}
#[tokio::test]
async fn introspection_transport_error_becomes_introspection_transport() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Err(IntrospectionError::Transport("connection refused".into())),
);
let v = validator(mock.clone());
let err = v.validate("tok", NOW_MS).await.unwrap_err();
assert!(
matches!(err, AuthError::IntrospectionTransport(ref msg) if msg.contains("connection refused")),
"got {err:?}",
);
}
#[tokio::test]
async fn introspection_default_principal_claim_sub() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({"active": true, "sub": "sub-name", "exp": NOW_MS/1000 + 60})),
);
let v = validator(mock.clone());
assert!(v.validate("tok", NOW_MS).await.unwrap().principal.name == "sub-name");
}
#[tokio::test]
async fn introspection_custom_principal_claim_client_id() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({
"active": true,
"sub": "sub-name",
"exp": NOW_MS/1000 + 60,
"client_id": "my-client",
})),
);
let mut v = validator(mock.clone());
v.principal_claim_name = "client_id".into();
assert!(v.validate("tok", NOW_MS).await.unwrap().principal.name == "my-client");
}
#[tokio::test]
async fn enum_dispatch_introspection_async() {
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"tok",
Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS/1000 + 60})),
);
let v = validator(mock.clone());
let enum_v = OAuthBearerValidator::Introspection(v);
let outcome = enum_v.validate("tok", NOW_MS).await.unwrap();
assert!(outcome.principal.name == "alice");
}
#[tokio::test]
async fn introspection_validate_surfaces_exp_from_introspection_response() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"opaque-token",
Ok(json!({
"active": true,
"sub": "alice",
"exp": exp_secs,
"scope": "kafka.write",
})),
);
let v = validator(mock.clone());
let outcome = v
.validate("opaque-token", now_ms)
.await
.expect("token valid");
assert!(outcome.principal.name == "alice");
assert!(outcome.expires_at_ms == Some(exp_secs * 1000));
}
#[tokio::test]
async fn introspection_validate_extracts_groups_from_introspection_response() {
let exp_secs: i64 = 2_000;
let now_ms: i64 = 1_000_000;
let mock = MockIntrospectionClient::arc();
mock.set_introspect(
"opaque-token",
Ok(json!({
"active": true,
"sub": "alice",
"exp": exp_secs,
"groups": ["admin", "ops"],
})),
);
let mut v = validator(mock.clone());
v.groups_claim = Some(parse_jp("$.groups"));
let outcome = v.validate("opaque-token", now_ms).await.expect("valid");
assert!(outcome.principal.groups == vec!["admin".to_string(), "ops".to_string()]);
}
}