Skip to main content

crabka_security/
oauthbearer.rs

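//! SASL/OAUTHBEARER (KIP-255 / RFC 7628) — pure logic.
//!
//! Nothing in this module opens a socket itself, so the broker can unit-test
//! every piece in isolation:
//!
//! 1. [`parse_client_initial_response`] — decode the RFC 7628 client initial
//!    response (`n,,\x01auth=Bearer <token>\x01\x01`) into its bearer token
//!    and optional authzid.
//! 2. [`UnsecuredJwsValidator`] — validate the bearer token as an *unsecured*
//!    JWS (`alg: none`) and extract the connection principal from a claim.
//!    This mirrors Kafka's `OAuthBearerUnsecuredValidatorCallbackHandler`,
//!    the built-in development/testing validator.
//! 3. [`SignedJwsValidator`] — validate a signed JWS (`RS256` / `ES256`)
//!    against the JWKS key set held in a [`JwksHandle`].
//! 4. [`IntrospectionValidator`] — RFC 7662 opaque-token introspection,
//!    performed through the [`IntrospectionClient`] trait; the concrete HTTP
//!    client lives in the broker crate.
//!
//! [`OAuthBearerValidator`] wraps whichever of the three validators the
//! broker is configured with.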
14
15use std::sync::Arc;
16
17use base64::Engine;
18use base64::engine::general_purpose::URL_SAFE_NO_PAD as B64URL;
19use jsonpath_rust::parser::model::JpQuery;
20use jsonpath_rust::query::js_path_process;
21use serde_json::Value;
22
23use crate::jwks::JwksHandle;
24use crate::{AuthError, AuthMethod, Principal};
25
/// Outcome of an OAUTHBEARER validation: the authenticated principal plus the
/// token's expiry. The expiry populates
/// `SaslAuthenticateResponse.session_lifetime_ms` and is what the dispatch
/// loop uses to schedule per-connection re-auth deadlines (KIP-368).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthOutcome {
    /// The principal derived from the validated token.
    pub principal: Principal,
    /// Token expiry as Unix epoch milliseconds. `None` means "no expiry / no
    /// re-auth required" — reserved for future non-OAuth paths. For
    /// OAUTHBEARER this is always `Some` (validators reject tokens without
    /// `exp`).
    pub expires_at_ms: Option<i64>,
}
39
/// Parsed RFC 7628 client initial response, as produced by
/// [`parse_client_initial_response`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientInitialResponse {
    /// The bearer token (the value after `auth=Bearer `). Stored as received;
    /// parsing does not decode or validate it.
    pub token: String,
    /// The GS2 authorization id, if the client supplied a non-empty one.
    /// Any leading `a=` has already been removed.
    pub authzid: Option<String>,
}
48
49/// Parse the SASL/OAUTHBEARER client initial response (RFC 7628 §3.1).
50///
51/// Wire shape (`^A` = `\x01`):
52/// `gs2-header ^A auth=Bearer <token> [^A key=value ...] ^A ^A`
53///
54/// The GS2 header is `gs2-cb-flag "," [authzid] ","` — for OAUTHBEARER over a
55/// TLS / plaintext listener the channel-binding flag is `n` (none). We accept
56/// `n` / `y` and ignore everything else in the header except the authzid.
57/// Non-`auth` kvpairs (host / port / extensions) are ignored.
58///
59/// # Errors
60///
61/// [`AuthError::MalformedMessage`] when the GS2 header or the `auth=Bearer`
62/// kvpair is missing or malformed.
63pub fn parse_client_initial_response(bytes: &[u8]) -> Result<ClientInitialResponse, AuthError> {
64    let s = std::str::from_utf8(bytes).map_err(|_| AuthError::MalformedMessage)?;
65
66    // Split off the GS2 header at the first kvsep.
67    let (gs2, rest) = s.split_once('\u{1}').ok_or(AuthError::MalformedMessage)?;
68    let authzid = parse_gs2_header(gs2)?;
69
70    // The remainder is kvsep-separated kvpairs terminated by an empty pair
71    // (the trailing `\x01\x01`). Find the `auth` kvpair.
72    let mut token = None;
73    for pair in rest.split('\u{1}') {
74        if pair.is_empty() {
75            continue;
76        }
77        let (key, value) = pair.split_once('=').ok_or(AuthError::MalformedMessage)?;
78        if key == "auth" {
79            let t = value
80                .strip_prefix("Bearer ")
81                .ok_or(AuthError::MalformedMessage)?;
82            token = Some(t.to_string());
83        }
84        // Other keys (host, port, SASL extensions) are not used here.
85    }
86
87    let token = token.ok_or(AuthError::MalformedMessage)?;
88    Ok(ClientInitialResponse { token, authzid })
89}
90
91/// Parse a GS2 header `cb-flag "," [authzid] ","`, returning the authzid if
92/// non-empty. The `a=` prefix RFC 5801 puts on the authzid is stripped.
93fn parse_gs2_header(gs2: &str) -> Result<Option<String>, AuthError> {
94    // cb-flag is one of "n", "y", or "p=<name>". OAUTHBEARER never negotiates
95    // channel binding, so a "p=" flag is malformed here.
96    let rest = if let Some(r) = gs2.strip_prefix("n,") {
97        r
98    } else if let Some(r) = gs2.strip_prefix("y,") {
99        r
100    } else {
101        return Err(AuthError::MalformedMessage);
102    };
103    // `rest` is `[authzid] ","` — must end with the trailing comma.
104    let authzid = rest.strip_suffix(',').ok_or(AuthError::MalformedMessage)?;
105    if authzid.is_empty() {
106        Ok(None)
107    } else {
108        // RFC 5801 prefixes the authzid with `a=`.
109        Ok(Some(
110            authzid.strip_prefix("a=").unwrap_or(authzid).to_string(),
111        ))
112    }
113}
114
/// Validates an *unsecured* JWS bearer token (`alg: none`) and derives the
/// connection principal. Mirrors Kafka's
/// `OAuthBearerUnsecuredValidatorCallbackHandler`.
///
/// No signature is verified on this path, so it is intended for development
/// and testing only.
#[derive(Debug, Clone, PartialEq)]
pub struct UnsecuredJwsValidator {
    /// Claim whose string value becomes the principal name. Default `sub`.
    pub principal_claim_name: String,
    /// Tolerance, in milliseconds, applied to the `exp` / `iat` temporal
    /// checks to absorb clock drift between the client and broker.
    /// Default 30 000 (30 seconds).
    pub allowable_clock_skew_ms: i64,
    /// Precompiled `JsonPath` expression evaluated against the
    /// token's claim set. The token is rejected when the expression matches
    /// nothing, or when any matched value is `null` or `false`. Compile once
    /// at validator construction.
    pub custom_claim_check: Option<JpQuery>,
    /// When set, the JWT `typ` header field must equal this
    /// string. Ignored when unset.
    pub valid_token_type: Option<String>,
    /// Alternate claim name to read the principal name from
    /// when `principal_claim_name` is absent or empty. Strimzi's
    /// "service-account fallback" — `sub` typically holds a UUID,
    /// `client_id` is the readable name.
    pub fallback_user_name_claim: Option<String>,
    /// Prepended to the resolved principal name ONLY when
    /// the fallback claim fires. Strimzi convention: "service-account-".
    pub fallback_user_name_prefix: Option<String>,
    /// Precompiled `JsonPath` expression extracting group
    /// memberships from the token claims. Compile-once-at-startup.
    pub groups_claim: Option<JpQuery>,
    /// When `groups_claim` resolves to a string (not an
    /// array), split on this delimiter. Common: "," or " ".
    pub groups_claim_delimiter: Option<String>,
}
147
148impl Default for UnsecuredJwsValidator {
149    fn default() -> Self {
150        Self {
151            principal_claim_name: "sub".to_string(),
152            allowable_clock_skew_ms: 30_000,
153            custom_claim_check: None,
154            valid_token_type: None,
155            fallback_user_name_claim: None,
156            fallback_user_name_prefix: None,
157            groups_claim: None,
158            groups_claim_delimiter: None,
159        }
160    }
161}
162
163impl UnsecuredJwsValidator {
164    /// Validate `token` against `now_ms` (Unix epoch milliseconds), returning
165    /// the authenticated [`Principal`] on success.
166    ///
167    /// # Errors
168    ///
169    /// [`AuthError::InvalidToken`] for any structural, signature, temporal,
170    /// scope, or principal-claim failure. The caller maps this onto the RFC
171    /// 7628 `invalid_token` server error status.
172    pub fn validate(&self, token: &str, now_ms: i64) -> Result<AuthOutcome, AuthError> {
173        // JWS compact serialization: header.payload.signature. For `alg:none`
174        // the signature segment is empty.
175        let mut segs = token.split('.');
176        let header_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
177        let payload_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
178        let sig = segs.next().ok_or(AuthError::InvalidToken)?;
179        if segs.next().is_some() {
180            return Err(AuthError::InvalidToken);
181        }
182        if !sig.is_empty() {
183            // Signed token — needs JWKS signature verification.
184            return Err(AuthError::InvalidToken);
185        }
186
187        let header: Value = decode_json_segment(header_b64)?;
188        if header.get("alg").and_then(Value::as_str) != Some("none") {
189            return Err(AuthError::InvalidToken);
190        }
191        // Optional JWT `typ` header check (JWT-mode validator only).
192        if let Some(expected_typ) = &self.valid_token_type
193            && header.get("typ").and_then(Value::as_str) != Some(expected_typ.as_str())
194        {
195            return Err(AuthError::InvalidToken);
196        }
197
198        let claims: Value = decode_json_segment(payload_b64)?;
199
200        // `exp` is required and must be in the future (within skew).
201        let exp_ms = numeric_date_ms(&claims, "exp").ok_or(AuthError::InvalidToken)?;
202        if exp_ms + self.allowable_clock_skew_ms <= now_ms {
203            return Err(AuthError::InvalidToken);
204        }
205        // `iat` is optional; if present it must not be in the future (within skew).
206        if let Some(iat_ms) = numeric_date_ms(&claims, "iat")
207            && iat_ms - self.allowable_clock_skew_ms > now_ms
208        {
209            return Err(AuthError::InvalidToken);
210        }
211
212        // Optional JsonPath custom_claim_check.
213        if let Some(path) = &self.custom_claim_check
214            && !evaluate_custom_claim_check(path, &claims)
215        {
216            return Err(AuthError::InvalidToken);
217        }
218
219        // Primary → fallback → reject. Prefix applied only
220        // when fallback fires.
221        let (raw_name, used_fallback) = if let Some(n) = claims
222            .get(&self.principal_claim_name)
223            .and_then(Value::as_str)
224            .filter(|s| !s.is_empty())
225        {
226            (n.to_string(), false)
227        } else {
228            let fallback_claim = self
229                .fallback_user_name_claim
230                .as_deref()
231                .ok_or(AuthError::InvalidToken)?;
232            let raw = claims
233                .get(fallback_claim)
234                .and_then(Value::as_str)
235                .filter(|s| !s.is_empty())
236                .ok_or(AuthError::InvalidToken)?;
237            (raw.to_string(), true)
238        };
239        let name = if used_fallback {
240            match &self.fallback_user_name_prefix {
241                Some(prefix) => format!("{prefix}{raw_name}"),
242                None => raw_name,
243            }
244        } else {
245            raw_name
246        };
247
248        // Groups extraction.
249        let groups = match &self.groups_claim {
250            Some(path) => extract_groups(path, &claims, self.groups_claim_delimiter.as_deref()),
251            None => Vec::new(),
252        };
253
254        Ok(AuthOutcome {
255            principal: Principal {
256                name,
257                auth_method: AuthMethod::SaslOAuthBearer,
258                groups,
259            },
260            expires_at_ms: Some(exp_ms),
261        })
262    }
263}
264
265/// Extract group memberships from token claims using a
266/// precompiled `JsonPath`. Each result element is interpreted per its
267/// JSON type:
268/// - `String`: if `delimiter` is set, split + trim + drop empty;
269///   otherwise the whole string becomes one group.
270/// - `Array`: each string element becomes a group.
271/// - `Number` / `Object` / `Null`: ignored (no error).
272///
273/// Returns `vec![]` for empty matches (no groups extracted is not an
274/// error — the token may legitimately have no groups).
275fn extract_groups(path: &JpQuery, claims: &Value, delimiter: Option<&str>) -> Vec<String> {
276    let Ok(refs) = js_path_process(path, claims) else {
277        return Vec::new();
278    };
279    let mut out = Vec::new();
280    for r in refs {
281        match r.val() {
282            Value::String(s) => match delimiter {
283                Some(d) => out.extend(
284                    s.split(d)
285                        .map(str::trim)
286                        .filter(|s| !s.is_empty())
287                        .map(String::from),
288                ),
289                None => out.push(s.clone()),
290            },
291            Value::Array(items) => {
292                out.extend(items.iter().filter_map(Value::as_str).map(String::from));
293            }
294            _ => {} // ignore numbers, objects, nulls
295        }
296    }
297    out
298}
299
300/// Evaluate a precompiled `JsonPath` expression against the token claims.
301/// Returns true when the result is truthy (non-empty, with no element being
302/// null or false); false otherwise. Matches Strimzi's "expression yields
303/// truthy" semantics. Errors during evaluation count as falsy (rejection).
304fn evaluate_custom_claim_check(path: &JpQuery, claims: &Value) -> bool {
305    let Ok(refs) = js_path_process(path, claims) else {
306        return false;
307    };
308    if refs.is_empty() {
309        return false;
310    }
311    for r in refs {
312        match r.val() {
313            Value::Null | Value::Bool(false) => return false,
314            _ => {}
315        }
316    }
317    true
318}
319
320/// Whether the JWT `aud` claim contains `expected`. `aud` is a single string
321/// or an array of strings (RFC 7519 §4.1.3).
322fn audience_contains(claims: &Value, expected: &str) -> bool {
323    match claims.get("aud") {
324        Some(Value::String(s)) => s == expected,
325        Some(Value::Array(items)) => items
326            .iter()
327            .filter_map(Value::as_str)
328            .any(|a| a == expected),
329        _ => false,
330    }
331}
332
333/// base64url-decode a JWS segment and parse it as JSON.
334fn decode_json_segment(seg: &str) -> Result<Value, AuthError> {
335    let bytes = B64URL.decode(seg).map_err(|_| AuthError::InvalidToken)?;
336    serde_json::from_slice(&bytes).map_err(|_| AuthError::InvalidToken)
337}
338
339/// Read a JWT `NumericDate` claim (seconds since the epoch, possibly
340/// fractional) and convert it to integer milliseconds.
341fn numeric_date_ms(claims: &Value, key: &str) -> Option<i64> {
342    let v = claims.get(key)?;
343    // The common case: an integer second count. Avoids any float rounding.
344    if let Some(secs) = v.as_i64() {
345        return secs.checked_mul(1000);
346    }
347    // Fractional NumericDate (rare): truncate to whole milliseconds.
348    let ms = v.as_f64()? * 1000.0;
349    if ms.is_finite() {
350        #[allow(clippy::cast_possible_truncation)]
351        Some(ms as i64)
352    } else {
353        None
354    }
355}
356
/// Body of the RFC 7628 server error response sent when a token is rejected.
///
/// The JVM `OAuthBearerSaslClient` treats any non-empty first server message
/// as an error and answers with a single `\x01` kvsep; the broker then
/// completes the failure handshake.
#[must_use]
pub fn invalid_token_json() -> String {
    const BODY: &str = "{\"status\":\"invalid_token\"}";
    BODY.to_owned()
}
365
/// Validates a *signed* JWS bearer token (`RS256` / `ES256`) against a JWKS
/// key set fetched from the identity provider, then checks the standard JWT
/// claims and derives the connection principal.
///
/// The key set lives behind a [`JwksHandle`] so the broker's background
/// refresher can rotate keys without restarting the broker or taking a lock;
/// each [`validate`](Self::validate) reads the current set.
#[derive(Debug, Clone)]
pub struct SignedJwsValidator {
    /// Claim whose string value becomes the principal name. Default `sub`.
    pub principal_claim_name: String,
    /// Tolerance, in milliseconds, applied to `exp` / `iat` / `nbf`.
    /// Default 30 000 (30 seconds).
    pub allowable_clock_skew_ms: i64,
    /// When set, the token `iss` claim must equal this exactly.
    pub valid_issuer: Option<String>,
    /// When set, the token `aud` claim must contain this value.
    pub expected_audience: Option<String>,
    /// Precompiled `JsonPath` `custom_claim_check`. See
    /// [`UnsecuredJwsValidator`] for semantics.
    pub custom_claim_check: Option<JpQuery>,
    /// JWT `typ` header check. Ignored when unset.
    pub valid_token_type: Option<String>,
    /// Alternate principal claim. See [`UnsecuredJwsValidator`].
    pub fallback_user_name_claim: Option<String>,
    /// Prepended to the principal name only on fallback.
    pub fallback_user_name_prefix: Option<String>,
    /// Precompiled `JsonPath` extracting group memberships.
    pub groups_claim: Option<JpQuery>,
    /// Delimiter when `groups_claim` resolves to a string.
    pub groups_claim_delimiter: Option<String>,
    /// Hard cache-expiry threshold, in milliseconds. When set,
    /// the validator rejects tokens if the paired refresher has not had a
    /// successful fetch within this window (using
    /// [`JwksHandle::last_successful_fetch_ms`]). `None` = no expiry check.
    /// Fails closed on prolonged `IdP` outage so a
    /// rotated-out key can't keep signing valid tokens indefinitely.
    pub expiry_ms: Option<i64>,
    /// The live JWKS, swapped in by the broker's refresher. Private; a clone
    /// of the handle is available through `key_handle`.
    keys: JwksHandle,
}
406
407impl SignedJwsValidator {
408    /// A validator backed by `keys`, with the same claim/skew defaults as the
409    /// unsecured validator and no issuer / audience constraint.
410    #[must_use]
411    pub fn new(keys: JwksHandle) -> Self {
412        Self {
413            principal_claim_name: "sub".to_string(),
414            allowable_clock_skew_ms: 30_000,
415            valid_issuer: None,
416            expected_audience: None,
417            custom_claim_check: None,
418            valid_token_type: None,
419            fallback_user_name_claim: None,
420            fallback_user_name_prefix: None,
421            groups_claim: None,
422            groups_claim_delimiter: None,
423            expiry_ms: None,
424            keys,
425        }
426    }
427
428    /// The shared key-set handle, so the broker can hand the same cell to its
429    /// JWKS refresher task.
430    #[must_use]
431    pub fn key_handle(&self) -> JwksHandle {
432        self.keys.clone()
433    }
434
435    /// Validate a signed bearer token against `now_ms` (Unix epoch ms).
436    ///
437    /// # Errors
438    ///
439    /// [`AuthError::InvalidToken`] for any structural, signature, temporal,
440    /// issuer, audience, scope, or principal-claim failure.
441    pub fn validate(&self, token: &str, now_ms: i64) -> Result<AuthOutcome, AuthError> {
442        // JWS compact serialization: header.payload.signature, all non-empty.
443        let mut segs = token.split('.');
444        let header_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
445        let payload_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
446        let sig_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
447        if segs.next().is_some() || sig_b64.is_empty() {
448            return Err(AuthError::InvalidToken);
449        }
450
451        let header: Value = decode_json_segment(header_b64)?;
452        let alg = header
453            .get("alg")
454            .and_then(Value::as_str)
455            .ok_or(AuthError::InvalidToken)?;
456        if alg != "RS256" && alg != "ES256" {
457            return Err(AuthError::InvalidToken);
458        }
459        // Optional JWT `typ` check (JWT-mode validator only).
460        if let Some(expected_typ) = &self.valid_token_type
461            && header.get("typ").and_then(Value::as_str) != Some(expected_typ.as_str())
462        {
463            return Err(AuthError::InvalidToken);
464        }
465
466        // Hard cache-expiry. If the last successful refresh is
467        // older than `expiry_ms`, reject all tokens until the refresher
468        // succeeds again. The `last_fetch > 0` guard skips this on a
469        // never-fetched handle so the "broker is still starting
470        // up" path stays open (the verify-level check below will reject
471        // anyway because the key set is empty).
472        if let Some(expiry_ms) = self.expiry_ms {
473            let last_fetch = self.keys.last_successful_fetch_ms();
474            if last_fetch > 0 && now_ms.saturating_sub(last_fetch) > expiry_ms {
475                tracing::debug!(
476                    last_fetch_ms = last_fetch,
477                    now_ms,
478                    expiry_ms,
479                    "JWKS cache expired; rejecting token until next successful refresh",
480                );
481                return Err(AuthError::InvalidToken);
482            }
483        }
484
485        let kid = header.get("kid").and_then(Value::as_str);
486
487        let signing_input = format!("{header_b64}.{payload_b64}");
488        let sig = B64URL
489            .decode(sig_b64)
490            .map_err(|_| AuthError::InvalidToken)?;
491        // On any verify failure (unknown kid or bad signature)
492        // signal the refresher to attempt an on-demand JWKS fetch — the
493        // signing key may have rotated since the last periodic refresh.
494        // The current token still rejects; a subsequent reconnect will
495        // see the rotated keys. The refresher's `min_on_demand_pause`
496        // caps the signal-storm cost.
497        if let Err(e) = self
498            .keys
499            .load()
500            .verify(kid, alg, signing_input.as_bytes(), &sig)
501        {
502            self.keys.signal_refresh();
503            return Err(e);
504        }
505
506        let claims: Value = decode_json_segment(payload_b64)?;
507        self.check_claims(&claims, now_ms)
508    }
509
510    /// Apply the claim policy (temporal, issuer, audience, scope, principal) to
511    /// already-signature-verified `claims`. Split out so the policy is
512    /// unit-testable without minting signed tokens.
513    fn check_claims(&self, claims: &Value, now_ms: i64) -> Result<AuthOutcome, AuthError> {
514        // `exp` required and in the future (within skew).
515        let exp_ms = numeric_date_ms(claims, "exp").ok_or(AuthError::InvalidToken)?;
516        if exp_ms + self.allowable_clock_skew_ms <= now_ms {
517            return Err(AuthError::InvalidToken);
518        }
519        // `iat` optional: must not be in the future (within skew).
520        if let Some(iat_ms) = numeric_date_ms(claims, "iat")
521            && iat_ms - self.allowable_clock_skew_ms > now_ms
522        {
523            return Err(AuthError::InvalidToken);
524        }
525        // `nbf` optional: token not valid before it (within skew).
526        if let Some(nbf_ms) = numeric_date_ms(claims, "nbf")
527            && nbf_ms - self.allowable_clock_skew_ms > now_ms
528        {
529            return Err(AuthError::InvalidToken);
530        }
531
532        if let Some(expected) = &self.valid_issuer
533            && claims.get("iss").and_then(Value::as_str) != Some(expected.as_str())
534        {
535            return Err(AuthError::InvalidToken);
536        }
537
538        if let Some(expected) = &self.expected_audience
539            && !audience_contains(claims, expected)
540        {
541            return Err(AuthError::InvalidToken);
542        }
543
544        // Optional JsonPath custom_claim_check.
545        if let Some(path) = &self.custom_claim_check
546            && !evaluate_custom_claim_check(path, claims)
547        {
548            return Err(AuthError::InvalidToken);
549        }
550
551        // Primary → fallback → reject. Prefix on fallback only.
552        let (raw_name, used_fallback) = if let Some(n) = claims
553            .get(&self.principal_claim_name)
554            .and_then(Value::as_str)
555            .filter(|s| !s.is_empty())
556        {
557            (n.to_string(), false)
558        } else {
559            let fallback_claim = self
560                .fallback_user_name_claim
561                .as_deref()
562                .ok_or(AuthError::InvalidToken)?;
563            let raw = claims
564                .get(fallback_claim)
565                .and_then(Value::as_str)
566                .filter(|s| !s.is_empty())
567                .ok_or(AuthError::InvalidToken)?;
568            (raw.to_string(), true)
569        };
570        let name = if used_fallback {
571            match &self.fallback_user_name_prefix {
572                Some(prefix) => format!("{prefix}{raw_name}"),
573                None => raw_name,
574            }
575        } else {
576            raw_name
577        };
578
579        let groups = match &self.groups_claim {
580            Some(path) => extract_groups(path, claims, self.groups_claim_delimiter.as_deref()),
581            None => Vec::new(),
582        };
583
584        Ok(AuthOutcome {
585            principal: Principal {
586                name,
587                auth_method: AuthMethod::SaslOAuthBearer,
588                groups,
589            },
590            expires_at_ms: Some(exp_ms),
591        })
592    }
593}
594
/// The broker's configured OAUTHBEARER token validator: the
/// development-only unsecured-JWS path, production signed-JWT
/// validation against a JWKS endpoint, or RFC 7662 opaque-token
/// introspection. Defaults to unsecured (see the `Default` impl).
#[derive(Debug, Clone)]
pub enum OAuthBearerValidator {
    /// Unsecured JWS (`alg:none`) — development / testing only.
    Unsecured(UnsecuredJwsValidator),
    /// Signed JWS verified against a JWKS key set.
    Signed(SignedJwsValidator),
    /// RFC 7662 opaque-token introspection. The only variant whose
    /// validation is asynchronous.
    Introspection(IntrospectionValidator),
}
608
609impl Default for OAuthBearerValidator {
610    fn default() -> Self {
611        Self::Unsecured(UnsecuredJwsValidator::default())
612    }
613}
614
615impl OAuthBearerValidator {
616    /// Validate `token` against `now_ms`, dispatching to the configured path.
617    ///
618    /// # Errors
619    ///
620    /// - [`AuthError::InvalidToken`] when the token fails validation.
621    /// - [`AuthError::IntrospectionTransport`] when the introspection variant's
622    ///   HTTP call fails at the transport layer.
623    pub async fn validate(&self, token: &str, now_ms: i64) -> Result<AuthOutcome, AuthError> {
624        match self {
625            Self::Unsecured(v) => v.validate(token, now_ms),
626            Self::Signed(v) => v.validate(token, now_ms),
627            Self::Introspection(v) => v.validate(token, now_ms).await,
628        }
629    }
630
631    /// The JWKS handle when this is a signed validator, so the broker can wire
632    /// a refresher to the same key cell. `None` for the unsecured + introspection paths.
633    #[must_use]
634    pub fn jwks_handle(&self) -> Option<JwksHandle> {
635        match self {
636            Self::Unsecured(_) | Self::Introspection(_) => None,
637            Self::Signed(v) => Some(v.key_handle()),
638        }
639    }
640}
641
/// HTTP transport contract for RFC 7662 introspection + OIDC userinfo.
/// Lives in this crate to keep `crates/security` as the validator surface;
/// the concrete reqwest-backed impl lives in `crates/broker`
/// (`oauth_introspection.rs`) so this crate stays I/O-free.
///
/// Implementations must be `Send + Sync + Debug`; the validator stores the
/// client as `Arc<dyn IntrospectionClient>`.
#[async_trait::async_trait]
pub trait IntrospectionClient: Send + Sync + std::fmt::Debug {
    /// POST the `IdP`'s introspection endpoint with `token` in a
    /// form-encoded body. Returns the response JSON as-is; the caller checks
    /// `active` + claims.
    async fn introspect(&self, token: &str) -> Result<serde_json::Value, IntrospectionError>;

    /// GET the `IdP`'s userinfo endpoint with `Authorization: Bearer
    /// <token>`. `Ok(None)` when the validator is configured without
    /// userinfo enrichment.
    async fn userinfo(&self, token: &str) -> Result<Option<serde_json::Value>, IntrospectionError>;
}
657
/// Transport-layer failures surfaced by [`IntrospectionClient`]. The
/// validator maps these onto [`AuthError::IntrospectionTransport`] for
/// the SASL handler, carrying the `Display` text of the variant.
#[derive(Debug, thiserror::Error)]
pub enum IntrospectionError {
    /// The request failed at the transport level; the payload is a
    /// human-readable description.
    #[error("transport: {0}")]
    Transport(String),
    /// The endpoint answered with a non-2xx status; the payload is that
    /// status code.
    #[error("non-2xx response: {0}")]
    Status(u16),
    /// The response body was not valid JSON.
    #[error("invalid JSON body")]
    Parse,
}
670
/// RFC 7662 opaque-token introspection validator. Calls the
/// introspection endpoint per token (no caching — RFC 7662 §4 discourages
/// caching without explicit lifetime info; SASL is once per connection so
/// the cost is acceptable). Optionally calls OIDC userinfo after a
/// successful introspection and merges the profile claims over the
/// introspection claims.
#[derive(Debug, Clone)]
pub struct IntrospectionValidator {
    /// Transport used for the introspection call and, when `call_userinfo`
    /// is set, the userinfo call.
    pub client: Arc<dyn IntrospectionClient>,
    /// Claim whose string value becomes the principal name. Default `sub`
    /// for generic OAuth flows; commonly `client_id` for Keycloak
    /// client-credentials.
    pub principal_claim_name: String,
    /// Precompiled `JsonPath` `custom_claim_check`. See
    /// [`UnsecuredJwsValidator`] for semantics. Introspection has no JWT
    /// header, so there is no `valid_token_type` field here.
    pub custom_claim_check: Option<JpQuery>,
    /// `true` iff a `userinfo_endpoint_uri` is configured; the validator
    /// calls `client.userinfo(token)` after a successful introspection and
    /// merges the response over the introspection claims.
    pub call_userinfo: bool,
    /// Clock-skew tolerance, in milliseconds, for `exp`/`iat`/`nbf` checks on
    /// introspection-response timestamps (when present).
    pub allowable_clock_skew_ms: i64,
    /// When set, the introspection-response `aud` claim (RFC 7662 §2.2) must
    /// contain this value. Defaults to `None` (no audience restriction).
    /// Prevents a token minted for another resource server of the same `IdP`,
    /// which still introspects as `active: true`, from authenticating here.
    pub expected_audience: Option<String>,
    /// Alternate principal claim. See [`UnsecuredJwsValidator`].
    pub fallback_user_name_claim: Option<String>,
    /// Prepended to the principal name only on fallback.
    pub fallback_user_name_prefix: Option<String>,
    /// Precompiled `JsonPath` extracting group memberships,
    /// evaluated against the merged claims (introspection + optional
    /// userinfo).
    pub groups_claim: Option<JpQuery>,
    /// Delimiter when `groups_claim` resolves to a string.
    pub groups_claim_delimiter: Option<String>,
}
711
712impl IntrospectionValidator {
713    /// Validate a bearer token via RFC 7662 introspection + optional
714    /// userinfo enrichment.
715    ///
716    /// # Errors
717    ///
718    /// - [`AuthError::IntrospectionTransport`] on HTTP transport / parse failures.
719    /// - [`AuthError::InvalidToken`] on `active != true`, missing `exp`,
720    ///   missing principal claim, scope mismatch, or temporal-claim failure.
721    ///   `exp` is required so the SASL handler can populate
722    ///   `session_lifetime_ms` for KIP-368 re-authentication.
723    pub async fn validate(&self, token: &str, now_ms: i64) -> Result<AuthOutcome, AuthError> {
724        let mut claims = self
725            .client
726            .introspect(token)
727            .await
728            .map_err(|e| AuthError::IntrospectionTransport(e.to_string()))?;
729        if claims.get("active").and_then(Value::as_bool) != Some(true) {
730            return Err(AuthError::InvalidToken);
731        }
732        // Audience restriction. When configured, the introspection-response
733        // `aud` claim (RFC 7662 §2.2) must contain the expected value, mirroring
734        // the signed-JWS path. Guards against an `IdP` that serves multiple
735        // resource servers handing out a token that introspects as active here
736        // but was minted for a different audience.
737        if let Some(expected) = &self.expected_audience
738            && !audience_contains(&claims, expected)
739        {
740            return Err(AuthError::InvalidToken);
741        }
742        check_temporal_claims(&claims, now_ms, self.allowable_clock_skew_ms)?;
743        // Capture `exp_ms` from the introspection response BEFORE any userinfo
744        // merge. Introspection's `exp` is the authoritative session expiry
745        // (RFC 7662); userinfo typically doesn't carry `exp`, and the
746        // `merge_userinfo_over_introspection` precedence already reserves
747        // `exp` to introspection, but pulling it out here makes the
748        // ordering explicit. Required for OAUTHBEARER (validators reject
749        // tokens without `exp`).
750        let exp_ms = numeric_date_ms(&claims, "exp").ok_or(AuthError::InvalidToken)?;
751        if self.call_userinfo
752            && let Some(ui) = self
753                .client
754                .userinfo(token)
755                .await
756                .map_err(|e| AuthError::IntrospectionTransport(e.to_string()))?
757        {
758            merge_userinfo_over_introspection(&mut claims, ui);
759        }
760        // Optional JsonPath custom_claim_check. Evaluated against the merged
761        // claims (introspection plus optional userinfo).
762        if let Some(path) = &self.custom_claim_check
763            && !evaluate_custom_claim_check(path, &claims)
764        {
765            return Err(AuthError::InvalidToken);
766        }
767        // Primary → fallback → reject. Prefix on fallback only.
768        let (raw_name, used_fallback) = if let Some(n) = claims
769            .get(&self.principal_claim_name)
770            .and_then(Value::as_str)
771            .filter(|s| !s.is_empty())
772        {
773            (n.to_string(), false)
774        } else {
775            let fallback_claim = self
776                .fallback_user_name_claim
777                .as_deref()
778                .ok_or(AuthError::InvalidToken)?;
779            let raw = claims
780                .get(fallback_claim)
781                .and_then(Value::as_str)
782                .filter(|s| !s.is_empty())
783                .ok_or(AuthError::InvalidToken)?;
784            (raw.to_string(), true)
785        };
786        let name = if used_fallback {
787            match &self.fallback_user_name_prefix {
788                Some(prefix) => format!("{prefix}{raw_name}"),
789                None => raw_name,
790            }
791        } else {
792            raw_name
793        };
794
795        let groups = match &self.groups_claim {
796            Some(path) => extract_groups(path, &claims, self.groups_claim_delimiter.as_deref()),
797            None => Vec::new(),
798        };
799
800        Ok(AuthOutcome {
801            principal: Principal {
802                name,
803                auth_method: AuthMethod::SaslOAuthBearer,
804                groups,
805            },
806            expires_at_ms: Some(exp_ms),
807        })
808    }
809}
810
811/// Skew-tolerant temporal-claims check for introspection responses.
812/// RFC 7662 doesn't mandate exp/iat/nbf, but honor them when present.
813fn check_temporal_claims(claims: &Value, now_ms: i64, skew_ms: i64) -> Result<(), AuthError> {
814    if let Some(exp_s) = claims.get("exp").and_then(Value::as_i64) {
815        let exp_ms = exp_s.saturating_mul(1000);
816        if now_ms.saturating_sub(skew_ms) > exp_ms {
817            return Err(AuthError::InvalidToken);
818        }
819    }
820    if let Some(iat_s) = claims.get("iat").and_then(Value::as_i64) {
821        let iat_ms = iat_s.saturating_mul(1000);
822        if iat_ms.saturating_sub(skew_ms) > now_ms {
823            return Err(AuthError::InvalidToken);
824        }
825    }
826    if let Some(nbf_s) = claims.get("nbf").and_then(Value::as_i64) {
827        let nbf_ms = nbf_s.saturating_mul(1000);
828        if nbf_ms.saturating_sub(skew_ms) > now_ms {
829            return Err(AuthError::InvalidToken);
830        }
831    }
832    Ok(())
833}
834
835/// Merge userinfo response over introspection claims. Userinfo wins for
836/// profile-style claims (`preferred_username`, email, name, `given_name`,
837/// `family_name`, ...); introspection wins for the small set of
838/// authorization claims listed in `RESERVED`.
839fn merge_userinfo_over_introspection(introspection: &mut Value, userinfo: Value) {
840    const RESERVED: &[&str] = &["active", "exp", "iat", "nbf", "scope", "client_id", "sub"];
841    let (Some(obj), Value::Object(ui_map)) = (introspection.as_object_mut(), userinfo) else {
842        return;
843    };
844    for (k, v) in ui_map {
845        if !RESERVED.contains(&k.as_str()) {
846            obj.insert(k, v);
847        }
848    }
849}
850
851#[cfg(test)]
852mod tests {
853    use super::*;
854    use assert2::assert;
855    use jsonpath_rust::parser::parse_json_path;
856
857    fn jws(header: &str, claims: &str) -> String {
858        format!(
859            "{}.{}.",
860            B64URL.encode(header.as_bytes()),
861            B64URL.encode(claims.as_bytes())
862        )
863    }
864
865    fn unsecured(sub: &str, iat_s: i64, exp_s: i64) -> String {
866        jws(
867            "{\"alg\":\"none\"}",
868            &format!("{{\"sub\":\"{sub}\",\"iat\":{iat_s},\"exp\":{exp_s}}}"),
869        )
870    }
871
872    /// Build an unsecured-JWS from an explicit header + claim object (so
873    /// callers can drive the `typ` header for `typ`-check tests). The
874    /// signature segment is left empty per `alg:none`.
875    fn make_unsecured_jws_with_header(
876        header: &serde_json::Value,
877        claims: &serde_json::Value,
878    ) -> String {
879        format!(
880            "{}.{}.",
881            B64URL.encode(serde_json::to_vec(header).unwrap()),
882            B64URL.encode(serde_json::to_vec(claims).unwrap()),
883        )
884    }
885
886    fn make_unsecured_jws(claims: &serde_json::Value) -> String {
887        make_unsecured_jws_with_header(&serde_json::json!({"alg": "none"}), claims)
888    }
889
890    fn parse_jp(expr: &str) -> JpQuery {
891        parse_json_path(expr).expect("expression compiles")
892    }
893
894    fn client_resp(token: &str) -> Vec<u8> {
895        format!("n,,\u{1}auth=Bearer {token}\u{1}\u{1}").into_bytes()
896    }
897
898    #[test]
899    fn parse_happy_path_empty_authzid() {
900        let r = parse_client_initial_response(&client_resp("tok.en.")).unwrap();
901        assert!(r.token == "tok.en.");
902        assert!(r.authzid == None);
903    }
904
905    #[test]
906    fn parse_extracts_authzid_and_ignores_extra_kvpairs() {
907        let bytes =
908            b"n,a=alice,\x01host=example.com\x01auth=Bearer abc\x01port=443\x01\x01".to_vec();
909        let r = parse_client_initial_response(&bytes).unwrap();
910        assert!(r.token == "abc");
911        assert!(r.authzid == Some("alice".to_string()));
912    }
913
914    #[test]
915    fn parse_rejects_missing_auth_kvpair() {
916        let bytes = b"n,,\x01host=example.com\x01\x01".to_vec();
917        assert!(parse_client_initial_response(&bytes) == Err(AuthError::MalformedMessage));
918    }
919
920    #[test]
921    fn parse_rejects_missing_bearer_prefix() {
922        let bytes = b"n,,\x01auth=Basic abc\x01\x01".to_vec();
923        assert!(parse_client_initial_response(&bytes) == Err(AuthError::MalformedMessage));
924    }
925
926    #[test]
927    fn parse_rejects_bad_gs2_header() {
928        let bytes = b"z,,\x01auth=Bearer abc\x01\x01".to_vec();
929        assert!(parse_client_initial_response(&bytes) == Err(AuthError::MalformedMessage));
930    }
931
932    #[test]
933    fn validate_accepts_fresh_unsecured_token() {
934        let v = UnsecuredJwsValidator::default();
935        let now = 1_000_000_000_000;
936        let token = unsecured("admin", 999_999_000, 1_000_000_900); // seconds
937        let outcome = v.validate(&token, now).unwrap();
938        assert!(outcome.principal.name == "admin");
939        assert!(outcome.principal.auth_method == AuthMethod::SaslOAuthBearer);
940    }
941
942    #[test]
943    fn unsecured_validate_surfaces_exp_in_auth_outcome() {
944        // exp = 2000 sec = 2_000_000 ms; now = 1_000_000 ms.
945        let exp_secs: i64 = 2_000;
946        let now_ms: i64 = 1_000_000;
947        let token = unsecured("alice", 999, exp_secs);
948        let v = UnsecuredJwsValidator::default();
949        let outcome = v.validate(&token, now_ms).expect("token valid");
950        assert!(outcome.principal.name == "alice");
951        assert!(outcome.expires_at_ms == Some(exp_secs * 1000));
952    }
953
954    #[test]
955    fn validate_rejects_expired_token() {
956        let v = UnsecuredJwsValidator {
957            allowable_clock_skew_ms: 0,
958            ..Default::default()
959        };
960        let now = 2_000_000_000_000;
961        let token = unsecured("admin", 1_000_000_000, 1_000_000_100);
962        assert!(v.validate(&token, now) == Err(AuthError::InvalidToken));
963    }
964
965    #[test]
966    fn validate_rejects_future_iat() {
967        let v = UnsecuredJwsValidator {
968            allowable_clock_skew_ms: 0,
969            ..Default::default()
970        };
971        let now = 1_000_000_000_000;
972        // iat far in the future, exp even further.
973        let token = unsecured("admin", 5_000_000_000, 5_000_000_100);
974        assert!(v.validate(&token, now) == Err(AuthError::InvalidToken));
975    }
976
977    #[test]
978    fn validate_rejects_signed_token() {
979        let v = UnsecuredJwsValidator::default();
980        let now = 1_000_000_000_000;
981        // alg RS256 + a non-empty signature segment.
982        let token = format!(
983            "{}.{}.{}",
984            B64URL.encode(b"{\"alg\":\"RS256\"}"),
985            B64URL.encode(b"{\"sub\":\"admin\",\"exp\":1000000900}"),
986            B64URL.encode(b"sig")
987        );
988        assert!(v.validate(&token, now) == Err(AuthError::InvalidToken));
989    }
990
991    #[test]
992    fn validate_rejects_missing_exp() {
993        let v = UnsecuredJwsValidator::default();
994        let token = jws("{\"alg\":\"none\"}", "{\"sub\":\"admin\"}");
995        assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
996    }
997
998    #[test]
999    fn validate_rejects_missing_principal_claim() {
1000        let v = UnsecuredJwsValidator::default();
1001        let token = jws("{\"alg\":\"none\"}", "{\"exp\":5000000000}");
1002        assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1003    }
1004
1005    // ---- custom_claim_check (JsonPath) + valid_token_type ---
1006
1007    #[test]
1008    fn unsecured_validate_rejects_when_custom_claim_check_fails() {
1009        let exp_secs: i64 = 2_000;
1010        let now_ms: i64 = 1_000_000;
1011        let token = make_unsecured_jws(&serde_json::json!({
1012            "sub": "alice",
1013            "exp": exp_secs,
1014            "scope": ["kafka.read"],
1015        }));
1016        let v = UnsecuredJwsValidator {
1017            custom_claim_check: Some(parse_jp("$.scope[?@ == 'kafka.admin']")),
1018            ..Default::default()
1019        };
1020        let result = v.validate(&token, now_ms);
1021        assert!(result.unwrap_err() == AuthError::InvalidToken);
1022    }
1023
1024    #[test]
1025    fn unsecured_validate_accepts_when_custom_claim_check_passes() {
1026        let exp_secs: i64 = 2_000;
1027        let now_ms: i64 = 1_000_000;
1028        let token = make_unsecured_jws(&serde_json::json!({
1029            "sub": "alice",
1030            "exp": exp_secs,
1031            "scope": ["kafka.admin", "kafka.read"],
1032        }));
1033        let v = UnsecuredJwsValidator {
1034            custom_claim_check: Some(parse_jp("$.scope[?@ == 'kafka.admin']")),
1035            ..Default::default()
1036        };
1037        let outcome = v.validate(&token, now_ms).expect("valid token");
1038        assert!(outcome.principal.name == "alice");
1039    }
1040
1041    #[test]
1042    fn unsecured_validate_rejects_when_valid_token_type_mismatch() {
1043        let exp_secs: i64 = 2_000;
1044        let now_ms: i64 = 1_000_000;
1045        // Token with typ=OPAQUE in the header.
1046        let token = make_unsecured_jws_with_header(
1047            &serde_json::json!({"alg": "none", "typ": "OPAQUE"}),
1048            &serde_json::json!({"sub": "alice", "exp": exp_secs}),
1049        );
1050        let v = UnsecuredJwsValidator {
1051            valid_token_type: Some("JWT".into()),
1052            ..Default::default()
1053        };
1054        let result = v.validate(&token, now_ms);
1055        assert!(result.unwrap_err() == AuthError::InvalidToken);
1056    }
1057
1058    #[test]
1059    fn unsecured_validate_accepts_when_valid_token_type_match() {
1060        let exp_secs: i64 = 2_000;
1061        let now_ms: i64 = 1_000_000;
1062        let token = make_unsecured_jws_with_header(
1063            &serde_json::json!({"alg": "none", "typ": "JWT"}),
1064            &serde_json::json!({"sub": "alice", "exp": exp_secs}),
1065        );
1066        let v = UnsecuredJwsValidator {
1067            valid_token_type: Some("JWT".into()),
1068            ..Default::default()
1069        };
1070        assert!(v.validate(&token, now_ms).is_ok());
1071    }
1072
1073    #[test]
1074    fn unsecured_validate_accepts_when_valid_token_type_unset_regardless_of_header() {
1075        let exp_secs: i64 = 2_000;
1076        let now_ms: i64 = 1_000_000;
1077        let token = make_unsecured_jws_with_header(
1078            &serde_json::json!({"alg": "none", "typ": "OPAQUE"}),
1079            &serde_json::json!({"sub": "alice", "exp": exp_secs}),
1080        );
1081        let v = UnsecuredJwsValidator::default();
1082        // No valid_token_type set → header `typ` ignored.
1083        assert!(v.validate(&token, now_ms).is_ok());
1084    }
1085
1086    // ---- name fallback chain + groups extraction --------------
1087
1088    #[test]
1089    fn unsecured_validate_uses_primary_principal_claim_when_present() {
1090        // Regression: primary claim present → use primary, no prefix.
1091        let exp_secs: i64 = 2_000;
1092        let now_ms: i64 = 1_000_000;
1093        let token = make_unsecured_jws_with_header(
1094            &serde_json::json!({"alg": "none", "typ": "JWT"}),
1095            &serde_json::json!({"sub": "alice", "exp": exp_secs}),
1096        );
1097        let v = UnsecuredJwsValidator {
1098            fallback_user_name_claim: Some("client_id".into()),
1099            fallback_user_name_prefix: Some("service-account-".into()),
1100            ..Default::default()
1101        };
1102        let outcome = v.validate(&token, now_ms).expect("valid");
1103        assert!(outcome.principal.name == "alice"); // primary, no prefix
1104    }
1105
1106    #[test]
1107    fn unsecured_validate_falls_back_to_alt_claim_when_primary_absent() {
1108        let exp_secs: i64 = 2_000;
1109        let now_ms: i64 = 1_000_000;
1110        let token = make_unsecured_jws_with_header(
1111            &serde_json::json!({"alg": "none", "typ": "JWT"}),
1112            // No `sub` — primary lookup fails.
1113            &serde_json::json!({"client_id": "svc1", "exp": exp_secs}),
1114        );
1115        let v = UnsecuredJwsValidator {
1116            fallback_user_name_claim: Some("client_id".into()),
1117            ..Default::default()
1118        };
1119        let outcome = v.validate(&token, now_ms).expect("valid");
1120        assert!(outcome.principal.name == "svc1"); // fallback, no prefix
1121    }
1122
1123    #[test]
1124    fn unsecured_validate_applies_fallback_prefix_only_on_fallback() {
1125        let exp_secs: i64 = 2_000;
1126        let now_ms: i64 = 1_000_000;
1127        let token = make_unsecured_jws_with_header(
1128            &serde_json::json!({"alg": "none", "typ": "JWT"}),
1129            &serde_json::json!({"client_id": "svc1", "exp": exp_secs}),
1130        );
1131        let v = UnsecuredJwsValidator {
1132            fallback_user_name_claim: Some("client_id".into()),
1133            fallback_user_name_prefix: Some("service-account-".into()),
1134            ..Default::default()
1135        };
1136        let outcome = v.validate(&token, now_ms).expect("valid");
1137        assert!(outcome.principal.name == "service-account-svc1");
1138    }
1139
1140    #[test]
1141    fn unsecured_validate_rejects_when_neither_primary_nor_fallback_present() {
1142        let exp_secs: i64 = 2_000;
1143        let now_ms: i64 = 1_000_000;
1144        let token = make_unsecured_jws_with_header(
1145            &serde_json::json!({"alg": "none", "typ": "JWT"}),
1146            // Neither sub nor client_id.
1147            &serde_json::json!({"exp": exp_secs}),
1148        );
1149        let v = UnsecuredJwsValidator {
1150            fallback_user_name_claim: Some("client_id".into()),
1151            ..Default::default()
1152        };
1153        assert!(v.validate(&token, now_ms) == Err(AuthError::InvalidToken));
1154    }
1155
1156    #[test]
1157    fn unsecured_validate_extracts_groups_from_array_claim() {
1158        let exp_secs: i64 = 2_000;
1159        let now_ms: i64 = 1_000_000;
1160        let token = make_unsecured_jws_with_header(
1161            &serde_json::json!({"alg": "none", "typ": "JWT"}),
1162            &serde_json::json!({
1163                "sub": "alice",
1164                "exp": exp_secs,
1165                "groups": ["admin", "ops"],
1166            }),
1167        );
1168        let v = UnsecuredJwsValidator {
1169            groups_claim: Some(parse_jp("$.groups")),
1170            ..Default::default()
1171        };
1172        let outcome = v.validate(&token, now_ms).expect("valid");
1173        assert!(outcome.principal.groups == vec!["admin".to_string(), "ops".to_string()]);
1174    }
1175
1176    #[test]
1177    fn unsecured_validate_extracts_groups_from_delimited_string() {
1178        let exp_secs: i64 = 2_000;
1179        let now_ms: i64 = 1_000_000;
1180        let token = make_unsecured_jws_with_header(
1181            &serde_json::json!({"alg": "none", "typ": "JWT"}),
1182            &serde_json::json!({
1183                "sub": "alice",
1184                "exp": exp_secs,
1185                "groups": "admin,ops, kafka",
1186            }),
1187        );
1188        let v = UnsecuredJwsValidator {
1189            groups_claim: Some(parse_jp("$.groups")),
1190            groups_claim_delimiter: Some(",".into()),
1191            ..Default::default()
1192        };
1193        let outcome = v.validate(&token, now_ms).expect("valid");
1194        assert!(
1195            outcome.principal.groups
1196                == vec!["admin".to_string(), "ops".to_string(), "kafka".to_string()]
1197        );
1198    }
1199
1200    #[test]
1201    fn unsecured_validate_extracts_groups_from_nested_claim_via_jsonpath() {
1202        let exp_secs: i64 = 2_000;
1203        let now_ms: i64 = 1_000_000;
1204        let token = make_unsecured_jws_with_header(
1205            &serde_json::json!({"alg": "none", "typ": "JWT"}),
1206            &serde_json::json!({
1207                "sub": "alice",
1208                "exp": exp_secs,
1209                "realm_access": { "roles": ["admin", "ops"] },
1210            }),
1211        );
1212        let v = UnsecuredJwsValidator {
1213            groups_claim: Some(parse_jp("$.realm_access.roles[*]")),
1214            ..Default::default()
1215        };
1216        let outcome = v.validate(&token, now_ms).expect("valid");
1217        assert!(outcome.principal.groups == vec!["admin".to_string(), "ops".to_string()]);
1218    }
1219
1220    #[test]
1221    fn unsecured_validate_returns_empty_groups_when_claim_unset() {
1222        let exp_secs: i64 = 2_000;
1223        let now_ms: i64 = 1_000_000;
1224        let token = make_unsecured_jws_with_header(
1225            &serde_json::json!({"alg": "none", "typ": "JWT"}),
1226            &serde_json::json!({
1227                "sub": "alice",
1228                "exp": exp_secs,
1229                "groups": ["admin"],
1230            }),
1231        );
1232        let v = UnsecuredJwsValidator::default(); // no groups_claim
1233        let outcome = v.validate(&token, now_ms).expect("valid");
1234        assert!(outcome.principal.groups == Vec::<String>::new());
1235    }
1236
1237    #[test]
1238    fn unsecured_validate_returns_empty_groups_when_claim_resolves_to_empty() {
1239        let exp_secs: i64 = 2_000;
1240        let now_ms: i64 = 1_000_000;
1241        let token = make_unsecured_jws_with_header(
1242            &serde_json::json!({"alg": "none", "typ": "JWT"}),
1243            &serde_json::json!({
1244                "sub": "alice",
1245                "exp": exp_secs,
1246            }),
1247        );
1248        let v = UnsecuredJwsValidator {
1249            groups_claim: Some(parse_jp("$.nonexistent")),
1250            ..Default::default()
1251        };
1252        let outcome = v.validate(&token, now_ms).expect("valid");
1253        assert!(outcome.principal.groups == Vec::<String>::new());
1254    }
1255
1256    #[test]
1257    fn validate_custom_principal_claim() {
1258        let v = UnsecuredJwsValidator {
1259            principal_claim_name: "client_id".to_string(),
1260            ..Default::default()
1261        };
1262        let token = jws(
1263            "{\"alg\":\"none\"}",
1264            "{\"client_id\":\"svc-1\",\"exp\":5000000000}",
1265        );
1266        let outcome = v.validate(&token, 1_000_000_000_000).unwrap();
1267        assert!(outcome.principal.name == "svc-1");
1268    }
1269
1270    #[test]
1271    fn invalid_token_json_is_rfc7628_shape() {
1272        assert!(invalid_token_json() == "{\"status\":\"invalid_token\"}");
1273    }
1274
1275    // ---- SignedJwsValidator -------------------------------------
1276
1277    use crate::jwks::{Jwks, JwksHandle, mint_es256, mint_rs256, mint_rs256_with_header};
1278
1279    /// Build a `SignedJwsValidator` whose key set is populated from `jwks_json`.
1280    fn signed(jwks_json: &str) -> (SignedJwsValidator, JwksHandle) {
1281        let handle = JwksHandle::new(Jwks::from_json(jwks_json, false).unwrap());
1282        (SignedJwsValidator::new(handle.clone()), handle)
1283    }
1284
1285    #[test]
1286    fn signed_accepts_fresh_rs256_token() {
1287        let (token, jwks) = mint_rs256("k1", "{\"sub\":\"admin\",\"exp\":9999999999}");
1288        let (v, _h) = signed(&jwks);
1289        let outcome = v.validate(&token, 1_000_000_000_000).unwrap();
1290        assert!(outcome.principal.name == "admin");
1291        assert!(outcome.principal.auth_method == AuthMethod::SaslOAuthBearer);
1292    }
1293
1294    #[test]
1295    fn signed_validate_surfaces_exp_in_auth_outcome() {
1296        let exp_secs: i64 = 2_000;
1297        let now_ms: i64 = 1_000_000;
1298        let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
1299        let (v, _h) = signed(&jwks);
1300        let outcome = v.validate(&token, now_ms).expect("token valid");
1301        assert!(outcome.principal.name == "alice");
1302        assert!(outcome.expires_at_ms == Some(exp_secs * 1000));
1303    }
1304
1305    #[test]
1306    fn signed_rejects_unsecured_alg_none() {
1307        // An `alg:none` token must never pass the signed validator even if a
1308        // key happens to be present.
1309        let (_token, jwks) = mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999}");
1310        let (v, _h) = signed(&jwks);
1311        let unsecured = jws("{\"alg\":\"none\"}", "{\"sub\":\"a\",\"exp\":9999999999}");
1312        assert!(v.validate(&unsecured, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1313    }
1314
1315    #[test]
1316    fn signed_rejects_expired() {
1317        let (token, jwks) = mint_rs256("k1", "{\"sub\":\"a\",\"exp\":1000}");
1318        let (mut v, _h) = signed(&jwks);
1319        v.allowable_clock_skew_ms = 0;
1320        // now (ms) far past exp (1000 s).
1321        assert!(v.validate(&token, 5_000_000_000_000) == Err(AuthError::InvalidToken));
1322    }
1323
1324    #[test]
1325    fn signed_rejects_future_nbf() {
1326        let (token, jwks) = mint_rs256(
1327            "k1",
1328            "{\"sub\":\"a\",\"exp\":9999999999,\"nbf\":5000000000}",
1329        );
1330        let (mut v, _h) = signed(&jwks);
1331        v.allowable_clock_skew_ms = 0;
1332        // now = 1e12 ms = 1e9 s, which is before nbf (5e9 s).
1333        assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1334    }
1335
1336    #[test]
1337    fn signed_honors_issuer() {
1338        let (token, jwks) = mint_rs256(
1339            "k1",
1340            "{\"sub\":\"a\",\"exp\":9999999999,\"iss\":\"https://idp\"}",
1341        );
1342        let (mut v, _h) = signed(&jwks);
1343        v.valid_issuer = Some("https://idp".to_string());
1344        assert!(v.validate(&token, 1_000_000_000_000).is_ok());
1345        v.valid_issuer = Some("https://other".to_string());
1346        assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1347    }
1348
1349    #[test]
1350    fn signed_rejects_missing_issuer_when_required() {
1351        let (token, jwks) = mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999}");
1352        let (mut v, _h) = signed(&jwks);
1353        v.valid_issuer = Some("https://idp".to_string());
1354        assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1355    }
1356
1357    #[test]
1358    fn signed_honors_audience_string_and_array() {
1359        let (tok_str, jwks) =
1360            mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999,\"aud\":\"kafka\"}");
1361        let (mut v, _h) = signed(&jwks);
1362        v.expected_audience = Some("kafka".to_string());
1363        assert!(v.validate(&tok_str, 1_000_000_000_000).is_ok());
1364
1365        let (tok_arr, jwks2) = mint_rs256(
1366            "k1",
1367            "{\"sub\":\"a\",\"exp\":9999999999,\"aud\":[\"other\",\"kafka\"]}",
1368        );
1369        let (mut v2, _h2) = signed(&jwks2);
1370        v2.expected_audience = Some("kafka".to_string());
1371        assert!(v2.validate(&tok_arr, 1_000_000_000_000).is_ok());
1372
1373        let (tok_bad, jwks3) =
1374            mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999,\"aud\":\"web\"}");
1375        let (mut v3, _h3) = signed(&jwks3);
1376        v3.expected_audience = Some("kafka".to_string());
1377        assert!(v3.validate(&tok_bad, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1378    }
1379
1380    // ---- SignedJwsValidator custom_claim_check + valid_token_type
1381
1382    #[test]
1383    fn signed_validate_rejects_when_custom_claim_check_fails() {
1384        let (token, jwks) = mint_rs256(
1385            "k1",
1386            "{\"sub\":\"alice\",\"exp\":9999999999,\"scope\":[\"kafka.read\"]}",
1387        );
1388        let (mut v, _h) = signed(&jwks);
1389        v.custom_claim_check = Some(parse_jp("$.scope[?@ == 'kafka.admin']"));
1390        let result = v.validate(&token, 1_000_000_000_000);
1391        assert!(result.unwrap_err() == AuthError::InvalidToken);
1392    }
1393
1394    #[test]
1395    fn signed_validate_accepts_when_custom_claim_check_passes() {
1396        let (token, jwks) = mint_rs256(
1397            "k1",
1398            "{\"sub\":\"alice\",\"exp\":9999999999,\"scope\":[\"kafka.admin\",\"kafka.read\"]}",
1399        );
1400        let (mut v, _h) = signed(&jwks);
1401        v.custom_claim_check = Some(parse_jp("$.scope[?@ == 'kafka.admin']"));
1402        let outcome = v.validate(&token, 1_000_000_000_000).expect("valid token");
1403        assert!(outcome.principal.name == "alice");
1404    }
1405
1406    #[test]
1407    fn signed_validate_rejects_when_valid_token_type_mismatch() {
1408        let (token, jwks) = mint_rs256_with_header(
1409            "{\"alg\":\"RS256\",\"kid\":\"k1\",\"typ\":\"OPAQUE\"}",
1410            "{\"sub\":\"alice\",\"exp\":9999999999}",
1411        );
1412        let (mut v, _h) = signed(&jwks);
1413        v.valid_token_type = Some("JWT".into());
1414        let result = v.validate(&token, 1_000_000_000_000);
1415        assert!(result.unwrap_err() == AuthError::InvalidToken);
1416    }
1417
1418    #[test]
1419    fn signed_validate_accepts_when_valid_token_type_match() {
1420        let (token, jwks) = mint_rs256_with_header(
1421            "{\"alg\":\"RS256\",\"kid\":\"k1\",\"typ\":\"JWT\"}",
1422            "{\"sub\":\"alice\",\"exp\":9999999999}",
1423        );
1424        let (mut v, _h) = signed(&jwks);
1425        v.valid_token_type = Some("JWT".into());
1426        assert!(v.validate(&token, 1_000_000_000_000).is_ok());
1427    }
1428
1429    #[test]
1430    fn signed_validate_accepts_when_valid_token_type_unset_regardless_of_header() {
1431        let (token, jwks) = mint_rs256_with_header(
1432            "{\"alg\":\"RS256\",\"kid\":\"k1\",\"typ\":\"OPAQUE\"}",
1433            "{\"sub\":\"alice\",\"exp\":9999999999}",
1434        );
1435        let (v, _h) = signed(&jwks);
1436        // No valid_token_type set → header `typ` ignored.
1437        assert!(v.validate(&token, 1_000_000_000_000).is_ok());
1438    }
1439
1440    #[test]
1441    fn signed_rejects_missing_principal() {
1442        let (token, jwks) = mint_rs256("k1", "{\"exp\":9999999999}");
1443        let (v, _h) = signed(&jwks);
1444        assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1445    }
1446
1447    #[test]
1448    fn signed_custom_principal_claim() {
1449        let (token, jwks) = mint_rs256("k1", "{\"client_id\":\"svc-1\",\"exp\":9999999999}");
1450        let (mut v, _h) = signed(&jwks);
1451        v.principal_claim_name = "client_id".to_string();
1452        assert!(
1453            v.validate(&token, 1_000_000_000_000)
1454                .unwrap()
1455                .principal
1456                .name
1457                == "svc-1"
1458        );
1459    }
1460
1461    #[test]
1462    fn signed_key_rotation_via_handle() {
1463        // Token A verifies under key set A. ES256 with a fresh key per mint, so
1464        // set B is a genuinely different key (RS256's fixed test key can't be).
1465        let (token_a, jwks_a) = mint_es256("k1", "{\"sub\":\"a\",\"exp\":9999999999}");
1466        let (v, handle) = signed(&jwks_a);
1467        assert!(v.validate(&token_a, 1_000_000_000_000).is_ok());
1468
1469        // Rotate to a fresh key set (same kid, new key). Token A no longer
1470        // verifies; a token under the new key does. Same validator instance.
1471        let (token_b, jwks_b) = mint_es256("k1", "{\"sub\":\"b\",\"exp\":9999999999}");
1472        handle.store(Jwks::from_json(&jwks_b, false).unwrap());
1473        assert!(v.validate(&token_a, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1474        assert!(
1475            v.validate(&token_b, 1_000_000_000_000)
1476                .unwrap()
1477                .principal
1478                .name
1479                == "b"
1480        );
1481    }
1482
1483    #[test]
1484    fn signed_rejects_when_keyset_empty() {
1485        let (token, _jwks) = mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999}");
1486        let v = SignedJwsValidator::new(JwksHandle::default());
1487        assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1488    }
1489
1490    // ---- signed-validator parity --------------------------------
1491
/// Signed-validator counterpart of the unsecured fallback test: the token has
/// no `sub`, so the principal comes from `client_id` with the prefix applied.
#[test]
fn signed_validate_falls_back_to_alt_claim_when_primary_absent() {
    let header = "{\"alg\":\"RS256\",\"kid\":\"k1\",\"typ\":\"JWT\"}";
    let claims = "{\"client_id\":\"svc1\",\"exp\":9999999999,\"iss\":\"https://test.example\"}";
    let (bearer, jwks_json) = mint_rs256_with_header(header, claims);

    let (mut validator, _handle) = signed(&jwks_json);
    validator.fallback_user_name_prefix = Some("service-account-".into());
    validator.fallback_user_name_claim = Some("client_id".into());

    let resolved = validator
        .validate(&bearer, 1_000_000_000_000)
        .expect("valid");
    assert!(resolved.principal.name == "service-account-svc1");
}
1506
/// With `groups_claim` set to `$.groups`, an array-valued claim becomes the
/// principal's group list, in claim order.
#[test]
fn signed_validate_extracts_groups_from_array_claim() {
    let claims = "{\"sub\":\"alice\",\"exp\":9999999999,\"groups\":[\"admin\",\"ops\"]}";
    let (bearer, jwks_json) = mint_rs256("k1", claims);

    let (mut validator, _handle) = signed(&jwks_json);
    validator.groups_claim = Some(parse_jp("$.groups"));

    let resolved = validator
        .validate(&bearer, 1_000_000_000_000)
        .expect("valid");
    let expected_groups = vec!["admin".to_string(), "ops".to_string()];
    assert!(resolved.principal.groups == expected_groups);
}
1518
1519    // ---- cache expiry + signal-on-verify-failure ----------------
1520
1521    /// Build a signed validator whose paired `JwksHandle` carries explicit
1522    /// `last_successful_fetch_ms` and a fresh signal channel. Returns the
1523    /// validator and the receiver so tests can assert on emitted signals.
1524    fn signed_with_handles(
1525        jwks_json: &str,
1526        last_successful_fetch_ms: i64,
1527    ) -> (SignedJwsValidator, tokio::sync::mpsc::Receiver<()>) {
1528        use std::sync::Arc;
1529        use std::sync::atomic::AtomicI64;
1530        let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1531        let ts = Arc::new(AtomicI64::new(last_successful_fetch_ms));
1532        let handle = JwksHandle::new_with_refresher_handles(
1533            Jwks::from_json(jwks_json, false).unwrap(),
1534            ts,
1535            tx,
1536        );
1537        (SignedJwsValidator::new(handle), rx)
1538    }
1539
/// A key set last fetched longer ago than `expiry_ms` causes rejection.
#[test]
fn signed_validate_rejects_when_jwks_cache_expired() {
    let now_ms: i64 = 10_000_000;
    let exp_secs: i64 = now_ms / 1000 + 60;
    let claims = format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}");
    let (bearer, jwks_json) = mint_rs256("k1", &claims);

    // Fetched 2s ago against a 1s threshold: the cache counts as expired.
    let last_fetch_ms = now_ms - 2_000;
    let (mut validator, _signal_rx) = signed_with_handles(&jwks_json, last_fetch_ms);
    validator.expiry_ms = Some(1_000);

    let verdict = validator.validate(&bearer, now_ms);
    assert!(verdict == Err(AuthError::InvalidToken));
}
1550
/// A key set fetched more recently than `expiry_ms` ago is still usable.
#[test]
fn signed_validate_accepts_when_jwks_cache_within_expiry() {
    let now_ms: i64 = 10_000_000;
    let exp_secs: i64 = now_ms / 1000 + 60;
    let claims = format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}");
    let (bearer, jwks_json) = mint_rs256("k1", &claims);

    // Fetched 500ms ago against a 1s threshold: the cache is still fresh.
    let last_fetch_ms = now_ms - 500;
    let (mut validator, _signal_rx) = signed_with_handles(&jwks_json, last_fetch_ms);
    validator.expiry_ms = Some(1_000);

    let resolved = validator.validate(&bearer, now_ms).expect("valid");
    assert!(resolved.principal.name == "alice");
}
1562
/// With `expiry_ms` left unset, the age of the key set does not matter.
#[test]
fn signed_validate_accepts_when_expiry_unset_regardless_of_cache_age() {
    let now_ms: i64 = 10_000_000;
    let exp_secs: i64 = now_ms / 1000 + 60;
    let claims = format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}");
    let (bearer, jwks_json) = mint_rs256("k1", &claims);

    // A very old fetch timestamp; `expiry_ms` is not assigned, so the
    // validator keeps whatever `SignedJwsValidator::new` gave it.
    let very_old_fetch_ms = now_ms - 999_999_999;
    let (validator, _signal_rx) = signed_with_handles(&jwks_json, very_old_fetch_ms);

    let verdict = validator.validate(&bearer, now_ms);
    assert!(verdict.is_ok());
}
1573
/// A fetch timestamp of 0 ("never fetched") must not trip the expiry check.
#[test]
fn signed_validate_skips_expiry_check_when_never_fetched() {
    let now_ms: i64 = 10_000_000;
    let exp_secs: i64 = now_ms / 1000 + 60;
    let claims = format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}");
    let (bearer, jwks_json) = mint_rs256("k1", &claims);

    // `last_successful_fetch_ms` is 0 and the threshold is 1ms, so plain age
    // arithmetic would call the cache long expired. The validator is expected
    // to skip the expiry check for the zero value rather than reject.
    let (mut validator, _signal_rx) = signed_with_handles(&jwks_json, 0);
    validator.expiry_ms = Some(1);

    // The handle here holds the key set the token was minted with, so
    // validation is expected to succeed.
    let verdict = validator.validate(&bearer, now_ms);
    assert!(verdict.is_ok());
}
1589
/// A token whose `kid` is missing from the served key set is rejected, and a
/// signal is sent on the handle's channel.
#[test]
fn signed_validate_signals_refresh_on_unknown_kid() {
    let now_ms: i64 = 10_000_000;
    let exp_secs: i64 = now_ms / 1000 + 60;

    // The token is minted under kid "k1"; its matching key set is discarded.
    let claims = format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}");
    let (bearer, _jwks_with_k1) = mint_rs256("k1", &claims);

    // The served key set only knows kid "other", so "k1" cannot be found.
    let mismatched_jwks = r#"{"keys":[{"kty":"RSA","kid":"other","n":"AQAB","e":"AQAB"}]}"#;
    let (validator, mut signal_rx) = signed_with_handles(mismatched_jwks, now_ms);

    let verdict = validator.validate(&bearer, now_ms);
    assert!(verdict == Err(AuthError::InvalidToken));

    let signalled = signal_rx.try_recv().is_ok();
    assert!(
        signalled,
        "validator should signal refresh on verify failure",
    );
}
1609
/// A successful validation leaves the signal channel empty.
#[test]
fn signed_validate_does_not_signal_when_verification_succeeds() {
    let now_ms: i64 = 10_000_000;
    let exp_secs: i64 = now_ms / 1000 + 60;
    let claims = format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}");
    let (bearer, jwks_json) = mint_rs256("k1", &claims);
    let (validator, mut signal_rx) = signed_with_handles(&jwks_json, now_ms);

    let verdict = validator.validate(&bearer, now_ms);
    assert!(verdict.is_ok());

    let channel_empty = signal_rx.try_recv().is_err();
    assert!(
        channel_empty,
        "happy-path verification should not signal a refresh",
    );
}
1622
1623    #[tokio::test]
1624    async fn enum_dispatches_unsecured_and_signed() {
1625        // Unsecured default.
1626        let unsecured = OAuthBearerValidator::default();
1627        assert!(unsecured.jwks_handle().is_none());
1628        let tok = unsecured_token("admin", 999_999_000, 9_999_999_999);
1629        assert!(unsecured.validate(&tok, 1_000_000_000_000).await.is_ok());
1630
1631        // Signed.
1632        let (token, jwks) = mint_rs256("k1", "{\"sub\":\"x\",\"exp\":9999999999}");
1633        let (sv, _h) = signed(&jwks);
1634        let signed_enum = OAuthBearerValidator::Signed(sv);
1635        assert!(signed_enum.jwks_handle().is_some());
1636        assert!(
1637            signed_enum
1638                .validate(&token, 1_000_000_000_000)
1639                .await
1640                .unwrap()
1641                .principal
1642                .name
1643                == "x"
1644        );
1645    }
1646
1647    fn unsecured_token(sub: &str, iat_s: i64, exp_s: i64) -> String {
1648        jws(
1649            "{\"alg\":\"none\"}",
1650            &format!("{{\"sub\":\"{sub}\",\"iat\":{iat_s},\"exp\":{exp_s}}}"),
1651        )
1652    }
1653
/// A malformed expression is rejected by `parse_json_path` itself, i.e. when
/// the expression is compiled rather than when a token is validated.
#[test]
fn custom_claim_check_compile_error_at_validator_construction() {
    let rejected = parse_json_path("@.unterminated").is_err();
    assert!(rejected, "malformed expression must fail to parse");
}
1661}
1662
1663#[cfg(test)]
1664mod introspection_tests {
1665    use super::*;
1666    use crate::{AuthError, AuthMethod};
1667    use assert2::assert;
1668    use jsonpath_rust::parser::parse_json_path;
1669    use serde_json::{Value, json};
1670    use std::collections::HashMap;
1671    use std::sync::Mutex;
1672
1673    /// Per-token canned responses. `introspect` returns the entry for
1674    /// the matching token (or a Transport error if absent so a test can
1675    /// exercise the transport-error path).
1676    #[derive(Debug, Default)]
1677    struct MockIntrospectionClient {
1678        introspect_responses: Mutex<HashMap<String, Result<Value, IntrospectionError>>>,
1679        userinfo_responses: Mutex<HashMap<String, Result<Option<Value>, IntrospectionError>>>,
1680    }
1681
1682    impl MockIntrospectionClient {
1683        fn arc() -> Arc<Self> {
1684            Arc::new(Self::default())
1685        }
1686        fn set_introspect(&self, token: &str, resp: Result<Value, IntrospectionError>) {
1687            self.introspect_responses
1688                .lock()
1689                .unwrap()
1690                .insert(token.into(), resp);
1691        }
1692        fn set_userinfo(&self, token: &str, resp: Result<Option<Value>, IntrospectionError>) {
1693            self.userinfo_responses
1694                .lock()
1695                .unwrap()
1696                .insert(token.into(), resp);
1697        }
1698    }
1699
1700    #[async_trait::async_trait]
1701    impl IntrospectionClient for MockIntrospectionClient {
1702        async fn introspect(&self, token: &str) -> Result<Value, IntrospectionError> {
1703            self.introspect_responses
1704                .lock()
1705                .unwrap()
1706                .remove(token)
1707                .unwrap_or(Err(IntrospectionError::Transport(
1708                    "no canned response".into(),
1709                )))
1710        }
1711        async fn userinfo(&self, token: &str) -> Result<Option<Value>, IntrospectionError> {
1712            self.userinfo_responses
1713                .lock()
1714                .unwrap()
1715                .remove(token)
1716                .unwrap_or(Ok(None))
1717        }
1718    }
1719
1720    fn validator(client: Arc<MockIntrospectionClient>) -> IntrospectionValidator {
1721        IntrospectionValidator {
1722            client,
1723            principal_claim_name: "sub".into(),
1724            custom_claim_check: None,
1725            call_userinfo: false,
1726            allowable_clock_skew_ms: 30_000,
1727            expected_audience: None,
1728            fallback_user_name_claim: None,
1729            fallback_user_name_prefix: None,
1730            groups_claim: None,
1731            groups_claim_delimiter: None,
1732        }
1733    }
1734
1735    fn parse_jp(expr: &str) -> JpQuery {
1736        parse_json_path(expr).expect("expression compiles")
1737    }
1738
/// Fixed "current time" for these tests: passed as the second argument to
/// `validate`, and divided by 1000 when building `exp` claims.
const NOW_MS: i64 = 1_700_000_000_000;
1740
1741    #[tokio::test]
1742    async fn introspection_active_true_with_principal_returns_ok() {
1743        let mock = MockIntrospectionClient::arc();
1744        mock.set_introspect(
1745            "tok",
1746            Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS/1000 + 60})),
1747        );
1748        let v = validator(mock.clone());
1749        let outcome = v.validate("tok", NOW_MS).await.unwrap();
1750        assert!(outcome.principal.name == "alice");
1751        assert!(outcome.principal.auth_method == AuthMethod::SaslOAuthBearer);
1752    }
1753
1754    #[tokio::test]
1755    async fn introspection_active_false_rejected() {
1756        let mock = MockIntrospectionClient::arc();
1757        mock.set_introspect("tok", Ok(json!({"active": false})));
1758        let v = validator(mock.clone());
1759        assert!(matches!(
1760            v.validate("tok", NOW_MS).await,
1761            Err(AuthError::InvalidToken)
1762        ));
1763    }
1764
1765    #[tokio::test]
1766    async fn introspection_missing_active_field_rejected() {
1767        let mock = MockIntrospectionClient::arc();
1768        mock.set_introspect("tok", Ok(json!({"sub": "alice"})));
1769        let v = validator(mock.clone());
1770        assert!(matches!(
1771            v.validate("tok", NOW_MS).await,
1772            Err(AuthError::InvalidToken)
1773        ));
1774    }
1775
1776    #[tokio::test]
1777    async fn introspection_expired_exp_rejected() {
1778        let mock = MockIntrospectionClient::arc();
1779        mock.set_introspect(
1780            "tok",
1781            Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS/1000 - 3600})),
1782        );
1783        let v = validator(mock.clone());
1784        assert!(matches!(
1785            v.validate("tok", NOW_MS).await,
1786            Err(AuthError::InvalidToken)
1787        ));
1788    }
1789
1790    // ---- IntrospectionValidator custom_claim_check -------------
1791
1792    #[tokio::test]
1793    async fn introspection_validate_rejects_when_custom_claim_check_fails() {
1794        let mock = MockIntrospectionClient::arc();
1795        mock.set_introspect(
1796            "tok",
1797            Ok(json!({
1798                "active": true,
1799                "sub": "alice",
1800                "exp": NOW_MS/1000 + 60,
1801                "scope": ["kafka.read"],
1802            })),
1803        );
1804        let mut v = validator(mock.clone());
1805        v.custom_claim_check = Some(parse_jp("$.scope[?@ == 'kafka.admin']"));
1806        let result = v.validate("tok", NOW_MS).await;
1807        assert!(result.unwrap_err() == AuthError::InvalidToken);
1808    }
1809
1810    #[tokio::test]
1811    async fn introspection_validate_accepts_when_custom_claim_check_passes() {
1812        let mock = MockIntrospectionClient::arc();
1813        mock.set_introspect(
1814            "tok",
1815            Ok(json!({
1816                "active": true,
1817                "sub": "alice",
1818                "exp": NOW_MS/1000 + 60,
1819                "scope": ["kafka.admin", "kafka.read"],
1820            })),
1821        );
1822        let mut v = validator(mock.clone());
1823        v.custom_claim_check = Some(parse_jp("$.scope[?@ == 'kafka.admin']"));
1824        let outcome = v.validate("tok", NOW_MS).await.expect("valid");
1825        assert!(outcome.principal.name == "alice");
1826    }
1827
1828    // ---- IntrospectionValidator expected_audience --------------
1829
1830    #[tokio::test]
1831    async fn introspection_honors_audience_string_and_array() {
1832        // Matching string `aud`.
1833        let mock = MockIntrospectionClient::arc();
1834        mock.set_introspect(
1835            "tok",
1836            Ok(json!({"active": true, "sub": "a", "exp": NOW_MS/1000 + 60, "aud": "kafka"})),
1837        );
1838        let mut v = validator(mock.clone());
1839        v.expected_audience = Some("kafka".to_string());
1840        assert!(v.validate("tok", NOW_MS).await.is_ok());
1841
1842        // Matching value inside an `aud` array.
1843        let mock2 = MockIntrospectionClient::arc();
1844        mock2.set_introspect(
1845            "tok",
1846            Ok(json!({
1847                "active": true, "sub": "a", "exp": NOW_MS/1000 + 60,
1848                "aud": ["other", "kafka"],
1849            })),
1850        );
1851        let mut v2 = validator(mock2.clone());
1852        v2.expected_audience = Some("kafka".to_string());
1853        assert!(v2.validate("tok", NOW_MS).await.is_ok());
1854    }
1855
1856    #[tokio::test]
1857    async fn introspection_rejects_non_matching_audience() {
1858        let mock = MockIntrospectionClient::arc();
1859        mock.set_introspect(
1860            "tok",
1861            Ok(json!({"active": true, "sub": "a", "exp": NOW_MS/1000 + 60, "aud": "web"})),
1862        );
1863        let mut v = validator(mock.clone());
1864        v.expected_audience = Some("kafka".to_string());
1865        assert!(v.validate("tok", NOW_MS).await == Err(AuthError::InvalidToken));
1866    }
1867
1868    #[tokio::test]
1869    async fn introspection_rejects_missing_audience_when_expected() {
1870        let mock = MockIntrospectionClient::arc();
1871        mock.set_introspect(
1872            "tok",
1873            Ok(json!({"active": true, "sub": "a", "exp": NOW_MS/1000 + 60})),
1874        );
1875        let mut v = validator(mock.clone());
1876        v.expected_audience = Some("kafka".to_string());
1877        assert!(v.validate("tok", NOW_MS).await == Err(AuthError::InvalidToken));
1878    }
1879
1880    #[tokio::test]
1881    async fn introspection_ignores_audience_when_unset() {
1882        // Default (expected_audience == None): any `aud` is accepted.
1883        let mock = MockIntrospectionClient::arc();
1884        mock.set_introspect(
1885            "tok",
1886            Ok(json!({"active": true, "sub": "a", "exp": NOW_MS/1000 + 60, "aud": "web"})),
1887        );
1888        let v = validator(mock.clone());
1889        assert!(v.validate("tok", NOW_MS).await.is_ok());
1890    }
1891
1892    #[tokio::test]
1893    async fn introspection_validate_does_not_check_valid_token_type() {
1894        // Introspection responses have no JWT header → typ check is N/A.
1895        // The struct doesn't even expose a valid_token_type field; this
1896        // is a regression test that validation passes regardless of any
1897        // hypothetical typ in the response (introspection responses
1898        // don't carry `typ`).
1899        let mock = MockIntrospectionClient::arc();
1900        mock.set_introspect(
1901            "tok",
1902            Ok(json!({
1903                "active": true,
1904                "sub": "alice",
1905                "exp": NOW_MS/1000 + 60,
1906            })),
1907        );
1908        let v = validator(mock.clone());
1909        let outcome = v.validate("tok", NOW_MS).await.expect("valid");
1910        assert!(outcome.principal.name == "alice");
1911    }
1912
1913    #[tokio::test]
1914    async fn introspection_userinfo_claims_override_introspection_for_profile_keys() {
1915        let mock = MockIntrospectionClient::arc();
1916        mock.set_introspect(
1917            "tok",
1918            Ok(json!({
1919                "active": true,
1920                "sub": "alice",
1921                "exp": NOW_MS/1000 + 60,
1922                "preferred_username": "intros-name",
1923            })),
1924        );
1925        mock.set_userinfo(
1926            "tok",
1927            Ok(Some(
1928                json!({"preferred_username": "userinfo-name", "email": "a@b.c"}),
1929            )),
1930        );
1931        let mut v = validator(mock.clone());
1932        v.call_userinfo = true;
1933        v.principal_claim_name = "preferred_username".into();
1934        let outcome = v.validate("tok", NOW_MS).await.unwrap();
1935        assert!(outcome.principal.name == "userinfo-name");
1936    }
1937
1938    #[tokio::test]
1939    async fn introspection_userinfo_does_not_override_authorization_keys() {
1940        let mock = MockIntrospectionClient::arc();
1941        mock.set_introspect(
1942            "tok",
1943            Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS/1000 + 60})),
1944        );
1945        mock.set_userinfo("tok", Ok(Some(json!({"active": false, "sub": "mallory"}))));
1946        let mut v = validator(mock.clone());
1947        v.call_userinfo = true;
1948        let outcome = v.validate("tok", NOW_MS).await.unwrap();
1949        assert!(
1950            outcome.principal.name == "alice",
1951            "sub from introspection wins over userinfo"
1952        );
1953    }
1954
1955    #[tokio::test]
1956    async fn introspection_userinfo_disabled_when_call_userinfo_false() {
1957        let mock = MockIntrospectionClient::arc();
1958        mock.set_introspect(
1959            "tok",
1960            Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS/1000 + 60})),
1961        );
1962        // Deliberately set a userinfo response — should be ignored.
1963        mock.set_userinfo("tok", Ok(Some(json!({"preferred_username": "ignored"}))));
1964        let v = validator(mock.clone()); // call_userinfo: false (default)
1965        let outcome = v.validate("tok", NOW_MS).await.unwrap();
1966        assert!(outcome.principal.name == "alice");
1967    }
1968
1969    #[tokio::test]
1970    async fn introspection_transport_error_becomes_introspection_transport() {
1971        let mock = MockIntrospectionClient::arc();
1972        mock.set_introspect(
1973            "tok",
1974            Err(IntrospectionError::Transport("connection refused".into())),
1975        );
1976        let v = validator(mock.clone());
1977        let err = v.validate("tok", NOW_MS).await.unwrap_err();
1978        assert!(
1979            matches!(err, AuthError::IntrospectionTransport(ref msg) if msg.contains("connection refused")),
1980            "got {err:?}",
1981        );
1982    }
1983
1984    #[tokio::test]
1985    async fn introspection_default_principal_claim_sub() {
1986        let mock = MockIntrospectionClient::arc();
1987        mock.set_introspect(
1988            "tok",
1989            Ok(json!({"active": true, "sub": "sub-name", "exp": NOW_MS/1000 + 60})),
1990        );
1991        let v = validator(mock.clone());
1992        assert!(v.validate("tok", NOW_MS).await.unwrap().principal.name == "sub-name");
1993    }
1994
1995    #[tokio::test]
1996    async fn introspection_custom_principal_claim_client_id() {
1997        let mock = MockIntrospectionClient::arc();
1998        mock.set_introspect(
1999            "tok",
2000            Ok(json!({
2001                "active": true,
2002                "sub": "sub-name",
2003                "exp": NOW_MS/1000 + 60,
2004                "client_id": "my-client",
2005            })),
2006        );
2007        let mut v = validator(mock.clone());
2008        v.principal_claim_name = "client_id".into();
2009        assert!(v.validate("tok", NOW_MS).await.unwrap().principal.name == "my-client");
2010    }
2011
2012    #[tokio::test]
2013    async fn enum_dispatch_introspection_async() {
2014        let mock = MockIntrospectionClient::arc();
2015        mock.set_introspect(
2016            "tok",
2017            Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS/1000 + 60})),
2018        );
2019        let v = validator(mock.clone());
2020        let enum_v = OAuthBearerValidator::Introspection(v);
2021        let outcome = enum_v.validate("tok", NOW_MS).await.unwrap();
2022        assert!(outcome.principal.name == "alice");
2023    }
2024
2025    #[tokio::test]
2026    async fn introspection_validate_surfaces_exp_from_introspection_response() {
2027        let exp_secs: i64 = 2_000;
2028        let now_ms: i64 = 1_000_000;
2029        let mock = MockIntrospectionClient::arc();
2030        mock.set_introspect(
2031            "opaque-token",
2032            Ok(json!({
2033                "active": true,
2034                "sub": "alice",
2035                "exp": exp_secs,
2036                "scope": "kafka.write",
2037            })),
2038        );
2039        let v = validator(mock.clone());
2040        let outcome = v
2041            .validate("opaque-token", now_ms)
2042            .await
2043            .expect("token valid");
2044        assert!(outcome.principal.name == "alice");
2045        assert!(outcome.expires_at_ms == Some(exp_secs * 1000));
2046    }
2047
2048    // ---- introspection parity -----------------------------------
2049
2050    #[tokio::test]
2051    async fn introspection_validate_extracts_groups_from_introspection_response() {
2052        let exp_secs: i64 = 2_000;
2053        let now_ms: i64 = 1_000_000;
2054        let mock = MockIntrospectionClient::arc();
2055        mock.set_introspect(
2056            "opaque-token",
2057            Ok(json!({
2058                "active": true,
2059                "sub": "alice",
2060                "exp": exp_secs,
2061                "groups": ["admin", "ops"],
2062            })),
2063        );
2064        let mut v = validator(mock.clone());
2065        v.groups_claim = Some(parse_jp("$.groups"));
2066        let outcome = v.validate("opaque-token", now_ms).await.expect("valid");
2067        assert!(outcome.principal.groups == vec!["admin".to_string(), "ops".to_string()]);
2068    }
2069}