1use std::sync::Arc;
16
17use base64::Engine;
18use base64::engine::general_purpose::URL_SAFE_NO_PAD as B64URL;
19use jsonpath_rust::parser::model::JpQuery;
20use jsonpath_rust::query::js_path_process;
21use serde_json::Value;
22
23use crate::jwks::JwksHandle;
24use crate::{AuthError, AuthMethod, Principal};
25
26#[derive(Debug, Clone, PartialEq, Eq)]
31pub struct AuthOutcome {
32 pub principal: Principal,
33 pub expires_at_ms: Option<i64>,
38}
39
/// Fields extracted from a client's initial SASL response by
/// `parse_client_initial_response`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ClientInitialResponse {
    /// The bearer token: the text following `Bearer ` in the `auth` key/value pair.
    pub token: String,
    /// Authorization identity from the GS2 header, or `None` when the header
    /// carried none.
    pub authzid: Option<String>,
}
48
49pub fn parse_client_initial_response(bytes: &[u8]) -> Result<ClientInitialResponse, AuthError> {
64 let s = std::str::from_utf8(bytes).map_err(|_| AuthError::MalformedMessage)?;
65
66 let (gs2, rest) = s.split_once('\u{1}').ok_or(AuthError::MalformedMessage)?;
68 let authzid = parse_gs2_header(gs2)?;
69
70 let mut token = None;
73 for pair in rest.split('\u{1}') {
74 if pair.is_empty() {
75 continue;
76 }
77 let (key, value) = pair.split_once('=').ok_or(AuthError::MalformedMessage)?;
78 if key == "auth" {
79 let t = value
80 .strip_prefix("Bearer ")
81 .ok_or(AuthError::MalformedMessage)?;
82 token = Some(t.to_string());
83 }
84 }
86
87 let token = token.ok_or(AuthError::MalformedMessage)?;
88 Ok(ClientInitialResponse { token, authzid })
89}
90
91fn parse_gs2_header(gs2: &str) -> Result<Option<String>, AuthError> {
94 let rest = if let Some(r) = gs2.strip_prefix("n,") {
97 r
98 } else if let Some(r) = gs2.strip_prefix("y,") {
99 r
100 } else {
101 return Err(AuthError::MalformedMessage);
102 };
103 let authzid = rest.strip_suffix(',').ok_or(AuthError::MalformedMessage)?;
105 if authzid.is_empty() {
106 Ok(None)
107 } else {
108 Ok(Some(
110 authzid.strip_prefix("a=").unwrap_or(authzid).to_string(),
111 ))
112 }
113}
114
115#[derive(Debug, Clone, PartialEq)]
119pub struct UnsecuredJwsValidator {
120 pub principal_claim_name: String,
122 pub allowable_clock_skew_ms: i64,
125 pub custom_claim_check: Option<JpQuery>,
129 pub valid_token_type: Option<String>,
132 pub fallback_user_name_claim: Option<String>,
137 pub fallback_user_name_prefix: Option<String>,
140 pub groups_claim: Option<JpQuery>,
143 pub groups_claim_delimiter: Option<String>,
146}
147
148impl Default for UnsecuredJwsValidator {
149 fn default() -> Self {
150 Self {
151 principal_claim_name: "sub".to_string(),
152 allowable_clock_skew_ms: 30_000,
153 custom_claim_check: None,
154 valid_token_type: None,
155 fallback_user_name_claim: None,
156 fallback_user_name_prefix: None,
157 groups_claim: None,
158 groups_claim_delimiter: None,
159 }
160 }
161}
162
163impl UnsecuredJwsValidator {
164 pub fn validate(&self, token: &str, now_ms: i64) -> Result<AuthOutcome, AuthError> {
173 let mut segs = token.split('.');
176 let header_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
177 let payload_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
178 let sig = segs.next().ok_or(AuthError::InvalidToken)?;
179 if segs.next().is_some() {
180 return Err(AuthError::InvalidToken);
181 }
182 if !sig.is_empty() {
183 return Err(AuthError::InvalidToken);
185 }
186
187 let header: Value = decode_json_segment(header_b64)?;
188 if header.get("alg").and_then(Value::as_str) != Some("none") {
189 return Err(AuthError::InvalidToken);
190 }
191 if let Some(expected_typ) = &self.valid_token_type
193 && header.get("typ").and_then(Value::as_str) != Some(expected_typ.as_str())
194 {
195 return Err(AuthError::InvalidToken);
196 }
197
198 let claims: Value = decode_json_segment(payload_b64)?;
199
200 let exp_ms = numeric_date_ms(&claims, "exp").ok_or(AuthError::InvalidToken)?;
202 if exp_ms + self.allowable_clock_skew_ms <= now_ms {
203 return Err(AuthError::InvalidToken);
204 }
205 if let Some(iat_ms) = numeric_date_ms(&claims, "iat")
207 && iat_ms - self.allowable_clock_skew_ms > now_ms
208 {
209 return Err(AuthError::InvalidToken);
210 }
211
212 if let Some(path) = &self.custom_claim_check
214 && !evaluate_custom_claim_check(path, &claims)
215 {
216 return Err(AuthError::InvalidToken);
217 }
218
219 let (raw_name, used_fallback) = if let Some(n) = claims
222 .get(&self.principal_claim_name)
223 .and_then(Value::as_str)
224 .filter(|s| !s.is_empty())
225 {
226 (n.to_string(), false)
227 } else {
228 let fallback_claim = self
229 .fallback_user_name_claim
230 .as_deref()
231 .ok_or(AuthError::InvalidToken)?;
232 let raw = claims
233 .get(fallback_claim)
234 .and_then(Value::as_str)
235 .filter(|s| !s.is_empty())
236 .ok_or(AuthError::InvalidToken)?;
237 (raw.to_string(), true)
238 };
239 let name = if used_fallback {
240 match &self.fallback_user_name_prefix {
241 Some(prefix) => format!("{prefix}{raw_name}"),
242 None => raw_name,
243 }
244 } else {
245 raw_name
246 };
247
248 let groups = match &self.groups_claim {
250 Some(path) => extract_groups(path, &claims, self.groups_claim_delimiter.as_deref()),
251 None => Vec::new(),
252 };
253
254 Ok(AuthOutcome {
255 principal: Principal {
256 name,
257 auth_method: AuthMethod::SaslOAuthBearer,
258 groups,
259 },
260 expires_at_ms: Some(exp_ms),
261 })
262 }
263}
264
265fn extract_groups(path: &JpQuery, claims: &Value, delimiter: Option<&str>) -> Vec<String> {
276 let Ok(refs) = js_path_process(path, claims) else {
277 return Vec::new();
278 };
279 let mut out = Vec::new();
280 for r in refs {
281 match r.val() {
282 Value::String(s) => match delimiter {
283 Some(d) => out.extend(
284 s.split(d)
285 .map(str::trim)
286 .filter(|s| !s.is_empty())
287 .map(String::from),
288 ),
289 None => out.push(s.clone()),
290 },
291 Value::Array(items) => {
292 out.extend(items.iter().filter_map(Value::as_str).map(String::from));
293 }
294 _ => {} }
296 }
297 out
298}
299
300fn evaluate_custom_claim_check(path: &JpQuery, claims: &Value) -> bool {
305 let Ok(refs) = js_path_process(path, claims) else {
306 return false;
307 };
308 if refs.is_empty() {
309 return false;
310 }
311 for r in refs {
312 match r.val() {
313 Value::Null | Value::Bool(false) => return false,
314 _ => {}
315 }
316 }
317 true
318}
319
320fn audience_contains(claims: &Value, expected: &str) -> bool {
323 match claims.get("aud") {
324 Some(Value::String(s)) => s == expected,
325 Some(Value::Array(items)) => items
326 .iter()
327 .filter_map(Value::as_str)
328 .any(|a| a == expected),
329 _ => false,
330 }
331}
332
333fn decode_json_segment(seg: &str) -> Result<Value, AuthError> {
335 let bytes = B64URL.decode(seg).map_err(|_| AuthError::InvalidToken)?;
336 serde_json::from_slice(&bytes).map_err(|_| AuthError::InvalidToken)
337}
338
339fn numeric_date_ms(claims: &Value, key: &str) -> Option<i64> {
342 let v = claims.get(key)?;
343 if let Some(secs) = v.as_i64() {
345 return secs.checked_mul(1000);
346 }
347 let ms = v.as_f64()? * 1000.0;
349 if ms.is_finite() {
350 #[allow(clippy::cast_possible_truncation)]
351 Some(ms as i64)
352 } else {
353 None
354 }
355}
356
/// Returns the JSON error document `{"status":"invalid_token"}` as an owned string.
#[must_use]
pub fn invalid_token_json() -> String {
    String::from(r#"{"status":"invalid_token"}"#)
}
365
366#[derive(Debug, Clone)]
374pub struct SignedJwsValidator {
375 pub principal_claim_name: String,
377 pub allowable_clock_skew_ms: i64,
379 pub valid_issuer: Option<String>,
381 pub expected_audience: Option<String>,
383 pub custom_claim_check: Option<JpQuery>,
386 pub valid_token_type: Option<String>,
388 pub fallback_user_name_claim: Option<String>,
390 pub fallback_user_name_prefix: Option<String>,
392 pub groups_claim: Option<JpQuery>,
394 pub groups_claim_delimiter: Option<String>,
396 pub expiry_ms: Option<i64>,
403 keys: JwksHandle,
405}
406
407impl SignedJwsValidator {
408 #[must_use]
411 pub fn new(keys: JwksHandle) -> Self {
412 Self {
413 principal_claim_name: "sub".to_string(),
414 allowable_clock_skew_ms: 30_000,
415 valid_issuer: None,
416 expected_audience: None,
417 custom_claim_check: None,
418 valid_token_type: None,
419 fallback_user_name_claim: None,
420 fallback_user_name_prefix: None,
421 groups_claim: None,
422 groups_claim_delimiter: None,
423 expiry_ms: None,
424 keys,
425 }
426 }
427
428 #[must_use]
431 pub fn key_handle(&self) -> JwksHandle {
432 self.keys.clone()
433 }
434
435 pub fn validate(&self, token: &str, now_ms: i64) -> Result<AuthOutcome, AuthError> {
442 let mut segs = token.split('.');
444 let header_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
445 let payload_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
446 let sig_b64 = segs.next().ok_or(AuthError::InvalidToken)?;
447 if segs.next().is_some() || sig_b64.is_empty() {
448 return Err(AuthError::InvalidToken);
449 }
450
451 let header: Value = decode_json_segment(header_b64)?;
452 let alg = header
453 .get("alg")
454 .and_then(Value::as_str)
455 .ok_or(AuthError::InvalidToken)?;
456 if alg != "RS256" && alg != "ES256" {
457 return Err(AuthError::InvalidToken);
458 }
459 if let Some(expected_typ) = &self.valid_token_type
461 && header.get("typ").and_then(Value::as_str) != Some(expected_typ.as_str())
462 {
463 return Err(AuthError::InvalidToken);
464 }
465
466 if let Some(expiry_ms) = self.expiry_ms {
473 let last_fetch = self.keys.last_successful_fetch_ms();
474 if last_fetch > 0 && now_ms.saturating_sub(last_fetch) > expiry_ms {
475 tracing::debug!(
476 last_fetch_ms = last_fetch,
477 now_ms,
478 expiry_ms,
479 "JWKS cache expired; rejecting token until next successful refresh",
480 );
481 return Err(AuthError::InvalidToken);
482 }
483 }
484
485 let kid = header.get("kid").and_then(Value::as_str);
486
487 let signing_input = format!("{header_b64}.{payload_b64}");
488 let sig = B64URL
489 .decode(sig_b64)
490 .map_err(|_| AuthError::InvalidToken)?;
491 if let Err(e) = self
498 .keys
499 .load()
500 .verify(kid, alg, signing_input.as_bytes(), &sig)
501 {
502 self.keys.signal_refresh();
503 return Err(e);
504 }
505
506 let claims: Value = decode_json_segment(payload_b64)?;
507 self.check_claims(&claims, now_ms)
508 }
509
510 fn check_claims(&self, claims: &Value, now_ms: i64) -> Result<AuthOutcome, AuthError> {
514 let exp_ms = numeric_date_ms(claims, "exp").ok_or(AuthError::InvalidToken)?;
516 if exp_ms + self.allowable_clock_skew_ms <= now_ms {
517 return Err(AuthError::InvalidToken);
518 }
519 if let Some(iat_ms) = numeric_date_ms(claims, "iat")
521 && iat_ms - self.allowable_clock_skew_ms > now_ms
522 {
523 return Err(AuthError::InvalidToken);
524 }
525 if let Some(nbf_ms) = numeric_date_ms(claims, "nbf")
527 && nbf_ms - self.allowable_clock_skew_ms > now_ms
528 {
529 return Err(AuthError::InvalidToken);
530 }
531
532 if let Some(expected) = &self.valid_issuer
533 && claims.get("iss").and_then(Value::as_str) != Some(expected.as_str())
534 {
535 return Err(AuthError::InvalidToken);
536 }
537
538 if let Some(expected) = &self.expected_audience
539 && !audience_contains(claims, expected)
540 {
541 return Err(AuthError::InvalidToken);
542 }
543
544 if let Some(path) = &self.custom_claim_check
546 && !evaluate_custom_claim_check(path, claims)
547 {
548 return Err(AuthError::InvalidToken);
549 }
550
551 let (raw_name, used_fallback) = if let Some(n) = claims
553 .get(&self.principal_claim_name)
554 .and_then(Value::as_str)
555 .filter(|s| !s.is_empty())
556 {
557 (n.to_string(), false)
558 } else {
559 let fallback_claim = self
560 .fallback_user_name_claim
561 .as_deref()
562 .ok_or(AuthError::InvalidToken)?;
563 let raw = claims
564 .get(fallback_claim)
565 .and_then(Value::as_str)
566 .filter(|s| !s.is_empty())
567 .ok_or(AuthError::InvalidToken)?;
568 (raw.to_string(), true)
569 };
570 let name = if used_fallback {
571 match &self.fallback_user_name_prefix {
572 Some(prefix) => format!("{prefix}{raw_name}"),
573 None => raw_name,
574 }
575 } else {
576 raw_name
577 };
578
579 let groups = match &self.groups_claim {
580 Some(path) => extract_groups(path, claims, self.groups_claim_delimiter.as_deref()),
581 None => Vec::new(),
582 };
583
584 Ok(AuthOutcome {
585 principal: Principal {
586 name,
587 auth_method: AuthMethod::SaslOAuthBearer,
588 groups,
589 },
590 expires_at_ms: Some(exp_ms),
591 })
592 }
593}
594
595#[derive(Debug, Clone)]
600pub enum OAuthBearerValidator {
601 Unsecured(UnsecuredJwsValidator),
603 Signed(SignedJwsValidator),
605 Introspection(IntrospectionValidator),
607}
608
609impl Default for OAuthBearerValidator {
610 fn default() -> Self {
611 Self::Unsecured(UnsecuredJwsValidator::default())
612 }
613}
614
615impl OAuthBearerValidator {
616 pub async fn validate(&self, token: &str, now_ms: i64) -> Result<AuthOutcome, AuthError> {
624 match self {
625 Self::Unsecured(v) => v.validate(token, now_ms),
626 Self::Signed(v) => v.validate(token, now_ms),
627 Self::Introspection(v) => v.validate(token, now_ms).await,
628 }
629 }
630
631 #[must_use]
634 pub fn jwks_handle(&self) -> Option<JwksHandle> {
635 match self {
636 Self::Unsecured(_) | Self::Introspection(_) => None,
637 Self::Signed(v) => Some(v.key_handle()),
638 }
639 }
640}
641
642#[async_trait::async_trait]
647pub trait IntrospectionClient: Send + Sync + std::fmt::Debug {
648 async fn introspect(&self, token: &str) -> Result<serde_json::Value, IntrospectionError>;
651
652 async fn userinfo(&self, token: &str) -> Result<Option<serde_json::Value>, IntrospectionError>;
656}
657
658#[derive(Debug, thiserror::Error)]
662pub enum IntrospectionError {
663 #[error("transport: {0}")]
664 Transport(String),
665 #[error("non-2xx response: {0}")]
666 Status(u16),
667 #[error("invalid JSON body")]
668 Parse,
669}
670
671#[derive(Debug, Clone)]
678pub struct IntrospectionValidator {
679 pub client: Arc<dyn IntrospectionClient>,
680 pub principal_claim_name: String,
684 pub custom_claim_check: Option<JpQuery>,
688 pub call_userinfo: bool,
692 pub allowable_clock_skew_ms: i64,
695 pub expected_audience: Option<String>,
700 pub fallback_user_name_claim: Option<String>,
702 pub fallback_user_name_prefix: Option<String>,
704 pub groups_claim: Option<JpQuery>,
708 pub groups_claim_delimiter: Option<String>,
710}
711
712impl IntrospectionValidator {
713 pub async fn validate(&self, token: &str, now_ms: i64) -> Result<AuthOutcome, AuthError> {
724 let mut claims = self
725 .client
726 .introspect(token)
727 .await
728 .map_err(|e| AuthError::IntrospectionTransport(e.to_string()))?;
729 if claims.get("active").and_then(Value::as_bool) != Some(true) {
730 return Err(AuthError::InvalidToken);
731 }
732 if let Some(expected) = &self.expected_audience
738 && !audience_contains(&claims, expected)
739 {
740 return Err(AuthError::InvalidToken);
741 }
742 check_temporal_claims(&claims, now_ms, self.allowable_clock_skew_ms)?;
743 let exp_ms = numeric_date_ms(&claims, "exp").ok_or(AuthError::InvalidToken)?;
751 if self.call_userinfo
752 && let Some(ui) = self
753 .client
754 .userinfo(token)
755 .await
756 .map_err(|e| AuthError::IntrospectionTransport(e.to_string()))?
757 {
758 merge_userinfo_over_introspection(&mut claims, ui);
759 }
760 if let Some(path) = &self.custom_claim_check
763 && !evaluate_custom_claim_check(path, &claims)
764 {
765 return Err(AuthError::InvalidToken);
766 }
767 let (raw_name, used_fallback) = if let Some(n) = claims
769 .get(&self.principal_claim_name)
770 .and_then(Value::as_str)
771 .filter(|s| !s.is_empty())
772 {
773 (n.to_string(), false)
774 } else {
775 let fallback_claim = self
776 .fallback_user_name_claim
777 .as_deref()
778 .ok_or(AuthError::InvalidToken)?;
779 let raw = claims
780 .get(fallback_claim)
781 .and_then(Value::as_str)
782 .filter(|s| !s.is_empty())
783 .ok_or(AuthError::InvalidToken)?;
784 (raw.to_string(), true)
785 };
786 let name = if used_fallback {
787 match &self.fallback_user_name_prefix {
788 Some(prefix) => format!("{prefix}{raw_name}"),
789 None => raw_name,
790 }
791 } else {
792 raw_name
793 };
794
795 let groups = match &self.groups_claim {
796 Some(path) => extract_groups(path, &claims, self.groups_claim_delimiter.as_deref()),
797 None => Vec::new(),
798 };
799
800 Ok(AuthOutcome {
801 principal: Principal {
802 name,
803 auth_method: AuthMethod::SaslOAuthBearer,
804 groups,
805 },
806 expires_at_ms: Some(exp_ms),
807 })
808 }
809}
810
811fn check_temporal_claims(claims: &Value, now_ms: i64, skew_ms: i64) -> Result<(), AuthError> {
814 if let Some(exp_s) = claims.get("exp").and_then(Value::as_i64) {
815 let exp_ms = exp_s.saturating_mul(1000);
816 if now_ms.saturating_sub(skew_ms) > exp_ms {
817 return Err(AuthError::InvalidToken);
818 }
819 }
820 if let Some(iat_s) = claims.get("iat").and_then(Value::as_i64) {
821 let iat_ms = iat_s.saturating_mul(1000);
822 if iat_ms.saturating_sub(skew_ms) > now_ms {
823 return Err(AuthError::InvalidToken);
824 }
825 }
826 if let Some(nbf_s) = claims.get("nbf").and_then(Value::as_i64) {
827 let nbf_ms = nbf_s.saturating_mul(1000);
828 if nbf_ms.saturating_sub(skew_ms) > now_ms {
829 return Err(AuthError::InvalidToken);
830 }
831 }
832 Ok(())
833}
834
835fn merge_userinfo_over_introspection(introspection: &mut Value, userinfo: Value) {
840 const RESERVED: &[&str] = &["active", "exp", "iat", "nbf", "scope", "client_id", "sub"];
841 let (Some(obj), Value::Object(ui_map)) = (introspection.as_object_mut(), userinfo) else {
842 return;
843 };
844 for (k, v) in ui_map {
845 if !RESERVED.contains(&k.as_str()) {
846 obj.insert(k, v);
847 }
848 }
849}
850
851#[cfg(test)]
852mod tests {
853 use super::*;
854 use assert2::assert;
855 use jsonpath_rust::parser::parse_json_path;
856
857 fn jws(header: &str, claims: &str) -> String {
858 format!(
859 "{}.{}.",
860 B64URL.encode(header.as_bytes()),
861 B64URL.encode(claims.as_bytes())
862 )
863 }
864
865 fn unsecured(sub: &str, iat_s: i64, exp_s: i64) -> String {
866 jws(
867 "{\"alg\":\"none\"}",
868 &format!("{{\"sub\":\"{sub}\",\"iat\":{iat_s},\"exp\":{exp_s}}}"),
869 )
870 }
871
872 fn make_unsecured_jws_with_header(
876 header: &serde_json::Value,
877 claims: &serde_json::Value,
878 ) -> String {
879 format!(
880 "{}.{}.",
881 B64URL.encode(serde_json::to_vec(header).unwrap()),
882 B64URL.encode(serde_json::to_vec(claims).unwrap()),
883 )
884 }
885
886 fn make_unsecured_jws(claims: &serde_json::Value) -> String {
887 make_unsecured_jws_with_header(&serde_json::json!({"alg": "none"}), claims)
888 }
889
890 fn parse_jp(expr: &str) -> JpQuery {
891 parse_json_path(expr).expect("expression compiles")
892 }
893
894 fn client_resp(token: &str) -> Vec<u8> {
895 format!("n,,\u{1}auth=Bearer {token}\u{1}\u{1}").into_bytes()
896 }
897
898 #[test]
899 fn parse_happy_path_empty_authzid() {
900 let r = parse_client_initial_response(&client_resp("tok.en.")).unwrap();
901 assert!(r.token == "tok.en.");
902 assert!(r.authzid == None);
903 }
904
905 #[test]
906 fn parse_extracts_authzid_and_ignores_extra_kvpairs() {
907 let bytes =
908 b"n,a=alice,\x01host=example.com\x01auth=Bearer abc\x01port=443\x01\x01".to_vec();
909 let r = parse_client_initial_response(&bytes).unwrap();
910 assert!(r.token == "abc");
911 assert!(r.authzid == Some("alice".to_string()));
912 }
913
914 #[test]
915 fn parse_rejects_missing_auth_kvpair() {
916 let bytes = b"n,,\x01host=example.com\x01\x01".to_vec();
917 assert!(parse_client_initial_response(&bytes) == Err(AuthError::MalformedMessage));
918 }
919
920 #[test]
921 fn parse_rejects_missing_bearer_prefix() {
922 let bytes = b"n,,\x01auth=Basic abc\x01\x01".to_vec();
923 assert!(parse_client_initial_response(&bytes) == Err(AuthError::MalformedMessage));
924 }
925
926 #[test]
927 fn parse_rejects_bad_gs2_header() {
928 let bytes = b"z,,\x01auth=Bearer abc\x01\x01".to_vec();
929 assert!(parse_client_initial_response(&bytes) == Err(AuthError::MalformedMessage));
930 }
931
932 #[test]
933 fn validate_accepts_fresh_unsecured_token() {
934 let v = UnsecuredJwsValidator::default();
935 let now = 1_000_000_000_000;
936 let token = unsecured("admin", 999_999_000, 1_000_000_900); let outcome = v.validate(&token, now).unwrap();
938 assert!(outcome.principal.name == "admin");
939 assert!(outcome.principal.auth_method == AuthMethod::SaslOAuthBearer);
940 }
941
942 #[test]
943 fn unsecured_validate_surfaces_exp_in_auth_outcome() {
944 let exp_secs: i64 = 2_000;
946 let now_ms: i64 = 1_000_000;
947 let token = unsecured("alice", 999, exp_secs);
948 let v = UnsecuredJwsValidator::default();
949 let outcome = v.validate(&token, now_ms).expect("token valid");
950 assert!(outcome.principal.name == "alice");
951 assert!(outcome.expires_at_ms == Some(exp_secs * 1000));
952 }
953
954 #[test]
955 fn validate_rejects_expired_token() {
956 let v = UnsecuredJwsValidator {
957 allowable_clock_skew_ms: 0,
958 ..Default::default()
959 };
960 let now = 2_000_000_000_000;
961 let token = unsecured("admin", 1_000_000_000, 1_000_000_100);
962 assert!(v.validate(&token, now) == Err(AuthError::InvalidToken));
963 }
964
965 #[test]
966 fn validate_rejects_future_iat() {
967 let v = UnsecuredJwsValidator {
968 allowable_clock_skew_ms: 0,
969 ..Default::default()
970 };
971 let now = 1_000_000_000_000;
972 let token = unsecured("admin", 5_000_000_000, 5_000_000_100);
974 assert!(v.validate(&token, now) == Err(AuthError::InvalidToken));
975 }
976
977 #[test]
978 fn validate_rejects_signed_token() {
979 let v = UnsecuredJwsValidator::default();
980 let now = 1_000_000_000_000;
981 let token = format!(
983 "{}.{}.{}",
984 B64URL.encode(b"{\"alg\":\"RS256\"}"),
985 B64URL.encode(b"{\"sub\":\"admin\",\"exp\":1000000900}"),
986 B64URL.encode(b"sig")
987 );
988 assert!(v.validate(&token, now) == Err(AuthError::InvalidToken));
989 }
990
991 #[test]
992 fn validate_rejects_missing_exp() {
993 let v = UnsecuredJwsValidator::default();
994 let token = jws("{\"alg\":\"none\"}", "{\"sub\":\"admin\"}");
995 assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
996 }
997
998 #[test]
999 fn validate_rejects_missing_principal_claim() {
1000 let v = UnsecuredJwsValidator::default();
1001 let token = jws("{\"alg\":\"none\"}", "{\"exp\":5000000000}");
1002 assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1003 }
1004
1005 #[test]
1008 fn unsecured_validate_rejects_when_custom_claim_check_fails() {
1009 let exp_secs: i64 = 2_000;
1010 let now_ms: i64 = 1_000_000;
1011 let token = make_unsecured_jws(&serde_json::json!({
1012 "sub": "alice",
1013 "exp": exp_secs,
1014 "scope": ["kafka.read"],
1015 }));
1016 let v = UnsecuredJwsValidator {
1017 custom_claim_check: Some(parse_jp("$.scope[?@ == 'kafka.admin']")),
1018 ..Default::default()
1019 };
1020 let result = v.validate(&token, now_ms);
1021 assert!(result.unwrap_err() == AuthError::InvalidToken);
1022 }
1023
1024 #[test]
1025 fn unsecured_validate_accepts_when_custom_claim_check_passes() {
1026 let exp_secs: i64 = 2_000;
1027 let now_ms: i64 = 1_000_000;
1028 let token = make_unsecured_jws(&serde_json::json!({
1029 "sub": "alice",
1030 "exp": exp_secs,
1031 "scope": ["kafka.admin", "kafka.read"],
1032 }));
1033 let v = UnsecuredJwsValidator {
1034 custom_claim_check: Some(parse_jp("$.scope[?@ == 'kafka.admin']")),
1035 ..Default::default()
1036 };
1037 let outcome = v.validate(&token, now_ms).expect("valid token");
1038 assert!(outcome.principal.name == "alice");
1039 }
1040
1041 #[test]
1042 fn unsecured_validate_rejects_when_valid_token_type_mismatch() {
1043 let exp_secs: i64 = 2_000;
1044 let now_ms: i64 = 1_000_000;
1045 let token = make_unsecured_jws_with_header(
1047 &serde_json::json!({"alg": "none", "typ": "OPAQUE"}),
1048 &serde_json::json!({"sub": "alice", "exp": exp_secs}),
1049 );
1050 let v = UnsecuredJwsValidator {
1051 valid_token_type: Some("JWT".into()),
1052 ..Default::default()
1053 };
1054 let result = v.validate(&token, now_ms);
1055 assert!(result.unwrap_err() == AuthError::InvalidToken);
1056 }
1057
1058 #[test]
1059 fn unsecured_validate_accepts_when_valid_token_type_match() {
1060 let exp_secs: i64 = 2_000;
1061 let now_ms: i64 = 1_000_000;
1062 let token = make_unsecured_jws_with_header(
1063 &serde_json::json!({"alg": "none", "typ": "JWT"}),
1064 &serde_json::json!({"sub": "alice", "exp": exp_secs}),
1065 );
1066 let v = UnsecuredJwsValidator {
1067 valid_token_type: Some("JWT".into()),
1068 ..Default::default()
1069 };
1070 assert!(v.validate(&token, now_ms).is_ok());
1071 }
1072
1073 #[test]
1074 fn unsecured_validate_accepts_when_valid_token_type_unset_regardless_of_header() {
1075 let exp_secs: i64 = 2_000;
1076 let now_ms: i64 = 1_000_000;
1077 let token = make_unsecured_jws_with_header(
1078 &serde_json::json!({"alg": "none", "typ": "OPAQUE"}),
1079 &serde_json::json!({"sub": "alice", "exp": exp_secs}),
1080 );
1081 let v = UnsecuredJwsValidator::default();
1082 assert!(v.validate(&token, now_ms).is_ok());
1084 }
1085
1086 #[test]
1089 fn unsecured_validate_uses_primary_principal_claim_when_present() {
1090 let exp_secs: i64 = 2_000;
1092 let now_ms: i64 = 1_000_000;
1093 let token = make_unsecured_jws_with_header(
1094 &serde_json::json!({"alg": "none", "typ": "JWT"}),
1095 &serde_json::json!({"sub": "alice", "exp": exp_secs}),
1096 );
1097 let v = UnsecuredJwsValidator {
1098 fallback_user_name_claim: Some("client_id".into()),
1099 fallback_user_name_prefix: Some("service-account-".into()),
1100 ..Default::default()
1101 };
1102 let outcome = v.validate(&token, now_ms).expect("valid");
1103 assert!(outcome.principal.name == "alice"); }
1105
1106 #[test]
1107 fn unsecured_validate_falls_back_to_alt_claim_when_primary_absent() {
1108 let exp_secs: i64 = 2_000;
1109 let now_ms: i64 = 1_000_000;
1110 let token = make_unsecured_jws_with_header(
1111 &serde_json::json!({"alg": "none", "typ": "JWT"}),
1112 &serde_json::json!({"client_id": "svc1", "exp": exp_secs}),
1114 );
1115 let v = UnsecuredJwsValidator {
1116 fallback_user_name_claim: Some("client_id".into()),
1117 ..Default::default()
1118 };
1119 let outcome = v.validate(&token, now_ms).expect("valid");
1120 assert!(outcome.principal.name == "svc1"); }
1122
1123 #[test]
1124 fn unsecured_validate_applies_fallback_prefix_only_on_fallback() {
1125 let exp_secs: i64 = 2_000;
1126 let now_ms: i64 = 1_000_000;
1127 let token = make_unsecured_jws_with_header(
1128 &serde_json::json!({"alg": "none", "typ": "JWT"}),
1129 &serde_json::json!({"client_id": "svc1", "exp": exp_secs}),
1130 );
1131 let v = UnsecuredJwsValidator {
1132 fallback_user_name_claim: Some("client_id".into()),
1133 fallback_user_name_prefix: Some("service-account-".into()),
1134 ..Default::default()
1135 };
1136 let outcome = v.validate(&token, now_ms).expect("valid");
1137 assert!(outcome.principal.name == "service-account-svc1");
1138 }
1139
1140 #[test]
1141 fn unsecured_validate_rejects_when_neither_primary_nor_fallback_present() {
1142 let exp_secs: i64 = 2_000;
1143 let now_ms: i64 = 1_000_000;
1144 let token = make_unsecured_jws_with_header(
1145 &serde_json::json!({"alg": "none", "typ": "JWT"}),
1146 &serde_json::json!({"exp": exp_secs}),
1148 );
1149 let v = UnsecuredJwsValidator {
1150 fallback_user_name_claim: Some("client_id".into()),
1151 ..Default::default()
1152 };
1153 assert!(v.validate(&token, now_ms) == Err(AuthError::InvalidToken));
1154 }
1155
1156 #[test]
1157 fn unsecured_validate_extracts_groups_from_array_claim() {
1158 let exp_secs: i64 = 2_000;
1159 let now_ms: i64 = 1_000_000;
1160 let token = make_unsecured_jws_with_header(
1161 &serde_json::json!({"alg": "none", "typ": "JWT"}),
1162 &serde_json::json!({
1163 "sub": "alice",
1164 "exp": exp_secs,
1165 "groups": ["admin", "ops"],
1166 }),
1167 );
1168 let v = UnsecuredJwsValidator {
1169 groups_claim: Some(parse_jp("$.groups")),
1170 ..Default::default()
1171 };
1172 let outcome = v.validate(&token, now_ms).expect("valid");
1173 assert!(outcome.principal.groups == vec!["admin".to_string(), "ops".to_string()]);
1174 }
1175
1176 #[test]
1177 fn unsecured_validate_extracts_groups_from_delimited_string() {
1178 let exp_secs: i64 = 2_000;
1179 let now_ms: i64 = 1_000_000;
1180 let token = make_unsecured_jws_with_header(
1181 &serde_json::json!({"alg": "none", "typ": "JWT"}),
1182 &serde_json::json!({
1183 "sub": "alice",
1184 "exp": exp_secs,
1185 "groups": "admin,ops, kafka",
1186 }),
1187 );
1188 let v = UnsecuredJwsValidator {
1189 groups_claim: Some(parse_jp("$.groups")),
1190 groups_claim_delimiter: Some(",".into()),
1191 ..Default::default()
1192 };
1193 let outcome = v.validate(&token, now_ms).expect("valid");
1194 assert!(
1195 outcome.principal.groups
1196 == vec!["admin".to_string(), "ops".to_string(), "kafka".to_string()]
1197 );
1198 }
1199
1200 #[test]
1201 fn unsecured_validate_extracts_groups_from_nested_claim_via_jsonpath() {
1202 let exp_secs: i64 = 2_000;
1203 let now_ms: i64 = 1_000_000;
1204 let token = make_unsecured_jws_with_header(
1205 &serde_json::json!({"alg": "none", "typ": "JWT"}),
1206 &serde_json::json!({
1207 "sub": "alice",
1208 "exp": exp_secs,
1209 "realm_access": { "roles": ["admin", "ops"] },
1210 }),
1211 );
1212 let v = UnsecuredJwsValidator {
1213 groups_claim: Some(parse_jp("$.realm_access.roles[*]")),
1214 ..Default::default()
1215 };
1216 let outcome = v.validate(&token, now_ms).expect("valid");
1217 assert!(outcome.principal.groups == vec!["admin".to_string(), "ops".to_string()]);
1218 }
1219
1220 #[test]
1221 fn unsecured_validate_returns_empty_groups_when_claim_unset() {
1222 let exp_secs: i64 = 2_000;
1223 let now_ms: i64 = 1_000_000;
1224 let token = make_unsecured_jws_with_header(
1225 &serde_json::json!({"alg": "none", "typ": "JWT"}),
1226 &serde_json::json!({
1227 "sub": "alice",
1228 "exp": exp_secs,
1229 "groups": ["admin"],
1230 }),
1231 );
1232 let v = UnsecuredJwsValidator::default(); let outcome = v.validate(&token, now_ms).expect("valid");
1234 assert!(outcome.principal.groups == Vec::<String>::new());
1235 }
1236
1237 #[test]
1238 fn unsecured_validate_returns_empty_groups_when_claim_resolves_to_empty() {
1239 let exp_secs: i64 = 2_000;
1240 let now_ms: i64 = 1_000_000;
1241 let token = make_unsecured_jws_with_header(
1242 &serde_json::json!({"alg": "none", "typ": "JWT"}),
1243 &serde_json::json!({
1244 "sub": "alice",
1245 "exp": exp_secs,
1246 }),
1247 );
1248 let v = UnsecuredJwsValidator {
1249 groups_claim: Some(parse_jp("$.nonexistent")),
1250 ..Default::default()
1251 };
1252 let outcome = v.validate(&token, now_ms).expect("valid");
1253 assert!(outcome.principal.groups == Vec::<String>::new());
1254 }
1255
1256 #[test]
1257 fn validate_custom_principal_claim() {
1258 let v = UnsecuredJwsValidator {
1259 principal_claim_name: "client_id".to_string(),
1260 ..Default::default()
1261 };
1262 let token = jws(
1263 "{\"alg\":\"none\"}",
1264 "{\"client_id\":\"svc-1\",\"exp\":5000000000}",
1265 );
1266 let outcome = v.validate(&token, 1_000_000_000_000).unwrap();
1267 assert!(outcome.principal.name == "svc-1");
1268 }
1269
1270 #[test]
1271 fn invalid_token_json_is_rfc7628_shape() {
1272 assert!(invalid_token_json() == "{\"status\":\"invalid_token\"}");
1273 }
1274
1275 use crate::jwks::{Jwks, JwksHandle, mint_es256, mint_rs256, mint_rs256_with_header};
1278
1279 fn signed(jwks_json: &str) -> (SignedJwsValidator, JwksHandle) {
1281 let handle = JwksHandle::new(Jwks::from_json(jwks_json, false).unwrap());
1282 (SignedJwsValidator::new(handle.clone()), handle)
1283 }
1284
1285 #[test]
1286 fn signed_accepts_fresh_rs256_token() {
1287 let (token, jwks) = mint_rs256("k1", "{\"sub\":\"admin\",\"exp\":9999999999}");
1288 let (v, _h) = signed(&jwks);
1289 let outcome = v.validate(&token, 1_000_000_000_000).unwrap();
1290 assert!(outcome.principal.name == "admin");
1291 assert!(outcome.principal.auth_method == AuthMethod::SaslOAuthBearer);
1292 }
1293
1294 #[test]
1295 fn signed_validate_surfaces_exp_in_auth_outcome() {
1296 let exp_secs: i64 = 2_000;
1297 let now_ms: i64 = 1_000_000;
1298 let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
1299 let (v, _h) = signed(&jwks);
1300 let outcome = v.validate(&token, now_ms).expect("token valid");
1301 assert!(outcome.principal.name == "alice");
1302 assert!(outcome.expires_at_ms == Some(exp_secs * 1000));
1303 }
1304
1305 #[test]
1306 fn signed_rejects_unsecured_alg_none() {
1307 let (_token, jwks) = mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999}");
1310 let (v, _h) = signed(&jwks);
1311 let unsecured = jws("{\"alg\":\"none\"}", "{\"sub\":\"a\",\"exp\":9999999999}");
1312 assert!(v.validate(&unsecured, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1313 }
1314
1315 #[test]
1316 fn signed_rejects_expired() {
1317 let (token, jwks) = mint_rs256("k1", "{\"sub\":\"a\",\"exp\":1000}");
1318 let (mut v, _h) = signed(&jwks);
1319 v.allowable_clock_skew_ms = 0;
1320 assert!(v.validate(&token, 5_000_000_000_000) == Err(AuthError::InvalidToken));
1322 }
1323
1324 #[test]
1325 fn signed_rejects_future_nbf() {
1326 let (token, jwks) = mint_rs256(
1327 "k1",
1328 "{\"sub\":\"a\",\"exp\":9999999999,\"nbf\":5000000000}",
1329 );
1330 let (mut v, _h) = signed(&jwks);
1331 v.allowable_clock_skew_ms = 0;
1332 assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1334 }
1335
1336 #[test]
1337 fn signed_honors_issuer() {
1338 let (token, jwks) = mint_rs256(
1339 "k1",
1340 "{\"sub\":\"a\",\"exp\":9999999999,\"iss\":\"https://idp\"}",
1341 );
1342 let (mut v, _h) = signed(&jwks);
1343 v.valid_issuer = Some("https://idp".to_string());
1344 assert!(v.validate(&token, 1_000_000_000_000).is_ok());
1345 v.valid_issuer = Some("https://other".to_string());
1346 assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1347 }
1348
1349 #[test]
1350 fn signed_rejects_missing_issuer_when_required() {
1351 let (token, jwks) = mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999}");
1352 let (mut v, _h) = signed(&jwks);
1353 v.valid_issuer = Some("https://idp".to_string());
1354 assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1355 }
1356
1357 #[test]
1358 fn signed_honors_audience_string_and_array() {
1359 let (tok_str, jwks) =
1360 mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999,\"aud\":\"kafka\"}");
1361 let (mut v, _h) = signed(&jwks);
1362 v.expected_audience = Some("kafka".to_string());
1363 assert!(v.validate(&tok_str, 1_000_000_000_000).is_ok());
1364
1365 let (tok_arr, jwks2) = mint_rs256(
1366 "k1",
1367 "{\"sub\":\"a\",\"exp\":9999999999,\"aud\":[\"other\",\"kafka\"]}",
1368 );
1369 let (mut v2, _h2) = signed(&jwks2);
1370 v2.expected_audience = Some("kafka".to_string());
1371 assert!(v2.validate(&tok_arr, 1_000_000_000_000).is_ok());
1372
1373 let (tok_bad, jwks3) =
1374 mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999,\"aud\":\"web\"}");
1375 let (mut v3, _h3) = signed(&jwks3);
1376 v3.expected_audience = Some("kafka".to_string());
1377 assert!(v3.validate(&tok_bad, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1378 }
1379
1380 #[test]
1383 fn signed_validate_rejects_when_custom_claim_check_fails() {
1384 let (token, jwks) = mint_rs256(
1385 "k1",
1386 "{\"sub\":\"alice\",\"exp\":9999999999,\"scope\":[\"kafka.read\"]}",
1387 );
1388 let (mut v, _h) = signed(&jwks);
1389 v.custom_claim_check = Some(parse_jp("$.scope[?@ == 'kafka.admin']"));
1390 let result = v.validate(&token, 1_000_000_000_000);
1391 assert!(result.unwrap_err() == AuthError::InvalidToken);
1392 }
1393
1394 #[test]
1395 fn signed_validate_accepts_when_custom_claim_check_passes() {
1396 let (token, jwks) = mint_rs256(
1397 "k1",
1398 "{\"sub\":\"alice\",\"exp\":9999999999,\"scope\":[\"kafka.admin\",\"kafka.read\"]}",
1399 );
1400 let (mut v, _h) = signed(&jwks);
1401 v.custom_claim_check = Some(parse_jp("$.scope[?@ == 'kafka.admin']"));
1402 let outcome = v.validate(&token, 1_000_000_000_000).expect("valid token");
1403 assert!(outcome.principal.name == "alice");
1404 }
1405
1406 #[test]
1407 fn signed_validate_rejects_when_valid_token_type_mismatch() {
1408 let (token, jwks) = mint_rs256_with_header(
1409 "{\"alg\":\"RS256\",\"kid\":\"k1\",\"typ\":\"OPAQUE\"}",
1410 "{\"sub\":\"alice\",\"exp\":9999999999}",
1411 );
1412 let (mut v, _h) = signed(&jwks);
1413 v.valid_token_type = Some("JWT".into());
1414 let result = v.validate(&token, 1_000_000_000_000);
1415 assert!(result.unwrap_err() == AuthError::InvalidToken);
1416 }
1417
1418 #[test]
1419 fn signed_validate_accepts_when_valid_token_type_match() {
1420 let (token, jwks) = mint_rs256_with_header(
1421 "{\"alg\":\"RS256\",\"kid\":\"k1\",\"typ\":\"JWT\"}",
1422 "{\"sub\":\"alice\",\"exp\":9999999999}",
1423 );
1424 let (mut v, _h) = signed(&jwks);
1425 v.valid_token_type = Some("JWT".into());
1426 assert!(v.validate(&token, 1_000_000_000_000).is_ok());
1427 }
1428
1429 #[test]
1430 fn signed_validate_accepts_when_valid_token_type_unset_regardless_of_header() {
1431 let (token, jwks) = mint_rs256_with_header(
1432 "{\"alg\":\"RS256\",\"kid\":\"k1\",\"typ\":\"OPAQUE\"}",
1433 "{\"sub\":\"alice\",\"exp\":9999999999}",
1434 );
1435 let (v, _h) = signed(&jwks);
1436 assert!(v.validate(&token, 1_000_000_000_000).is_ok());
1438 }
1439
1440 #[test]
1441 fn signed_rejects_missing_principal() {
1442 let (token, jwks) = mint_rs256("k1", "{\"exp\":9999999999}");
1443 let (v, _h) = signed(&jwks);
1444 assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1445 }
1446
1447 #[test]
1448 fn signed_custom_principal_claim() {
1449 let (token, jwks) = mint_rs256("k1", "{\"client_id\":\"svc-1\",\"exp\":9999999999}");
1450 let (mut v, _h) = signed(&jwks);
1451 v.principal_claim_name = "client_id".to_string();
1452 assert!(
1453 v.validate(&token, 1_000_000_000_000)
1454 .unwrap()
1455 .principal
1456 .name
1457 == "svc-1"
1458 );
1459 }
1460
1461 #[test]
1462 fn signed_key_rotation_via_handle() {
1463 let (token_a, jwks_a) = mint_es256("k1", "{\"sub\":\"a\",\"exp\":9999999999}");
1466 let (v, handle) = signed(&jwks_a);
1467 assert!(v.validate(&token_a, 1_000_000_000_000).is_ok());
1468
1469 let (token_b, jwks_b) = mint_es256("k1", "{\"sub\":\"b\",\"exp\":9999999999}");
1472 handle.store(Jwks::from_json(&jwks_b, false).unwrap());
1473 assert!(v.validate(&token_a, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1474 assert!(
1475 v.validate(&token_b, 1_000_000_000_000)
1476 .unwrap()
1477 .principal
1478 .name
1479 == "b"
1480 );
1481 }
1482
1483 #[test]
1484 fn signed_rejects_when_keyset_empty() {
1485 let (token, _jwks) = mint_rs256("k1", "{\"sub\":\"a\",\"exp\":9999999999}");
1486 let v = SignedJwsValidator::new(JwksHandle::default());
1487 assert!(v.validate(&token, 1_000_000_000_000) == Err(AuthError::InvalidToken));
1488 }
1489
1490 #[test]
1493 fn signed_validate_falls_back_to_alt_claim_when_primary_absent() {
1494 let (token, jwks) = mint_rs256_with_header(
1497 "{\"alg\":\"RS256\",\"kid\":\"k1\",\"typ\":\"JWT\"}",
1498 "{\"client_id\":\"svc1\",\"exp\":9999999999,\"iss\":\"https://test.example\"}",
1499 );
1500 let (mut v, _h) = signed(&jwks);
1501 v.fallback_user_name_claim = Some("client_id".into());
1502 v.fallback_user_name_prefix = Some("service-account-".into());
1503 let outcome = v.validate(&token, 1_000_000_000_000).expect("valid");
1504 assert!(outcome.principal.name == "service-account-svc1");
1505 }
1506
1507 #[test]
1508 fn signed_validate_extracts_groups_from_array_claim() {
1509 let (token, jwks) = mint_rs256(
1510 "k1",
1511 "{\"sub\":\"alice\",\"exp\":9999999999,\"groups\":[\"admin\",\"ops\"]}",
1512 );
1513 let (mut v, _h) = signed(&jwks);
1514 v.groups_claim = Some(parse_jp("$.groups"));
1515 let outcome = v.validate(&token, 1_000_000_000_000).expect("valid");
1516 assert!(outcome.principal.groups == vec!["admin".to_string(), "ops".to_string()]);
1517 }
1518
1519 fn signed_with_handles(
1525 jwks_json: &str,
1526 last_successful_fetch_ms: i64,
1527 ) -> (SignedJwsValidator, tokio::sync::mpsc::Receiver<()>) {
1528 use std::sync::Arc;
1529 use std::sync::atomic::AtomicI64;
1530 let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1531 let ts = Arc::new(AtomicI64::new(last_successful_fetch_ms));
1532 let handle = JwksHandle::new_with_refresher_handles(
1533 Jwks::from_json(jwks_json, false).unwrap(),
1534 ts,
1535 tx,
1536 );
1537 (SignedJwsValidator::new(handle), rx)
1538 }
1539
1540 #[test]
1541 fn signed_validate_rejects_when_jwks_cache_expired() {
1542 let now_ms: i64 = 10_000_000;
1543 let exp_secs: i64 = (now_ms / 1000) + 60;
1544 let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
1545 let (mut v, _rx) = signed_with_handles(&jwks, now_ms - 2_000);
1547 v.expiry_ms = Some(1_000);
1548 assert!(v.validate(&token, now_ms) == Err(AuthError::InvalidToken));
1549 }
1550
1551 #[test]
1552 fn signed_validate_accepts_when_jwks_cache_within_expiry() {
1553 let now_ms: i64 = 10_000_000;
1554 let exp_secs: i64 = (now_ms / 1000) + 60;
1555 let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
1556 let (mut v, _rx) = signed_with_handles(&jwks, now_ms - 500);
1558 v.expiry_ms = Some(1_000);
1559 let outcome = v.validate(&token, now_ms).expect("valid");
1560 assert!(outcome.principal.name == "alice");
1561 }
1562
1563 #[test]
1564 fn signed_validate_accepts_when_expiry_unset_regardless_of_cache_age() {
1565 let now_ms: i64 = 10_000_000;
1566 let exp_secs: i64 = (now_ms / 1000) + 60;
1567 let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
1568 let (v, _rx) = signed_with_handles(&jwks, now_ms - 999_999_999);
1570 assert!(v.validate(&token, now_ms).is_ok());
1572 }
1573
1574 #[test]
1575 fn signed_validate_skips_expiry_check_when_never_fetched() {
1576 let now_ms: i64 = 10_000_000;
1581 let exp_secs: i64 = (now_ms / 1000) + 60;
1582 let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
1583 let (mut v, _rx) = signed_with_handles(&jwks, 0);
1584 v.expiry_ms = Some(1);
1585 assert!(v.validate(&token, now_ms).is_ok());
1588 }
1589
1590 #[test]
1591 fn signed_validate_signals_refresh_on_unknown_kid() {
1592 let now_ms: i64 = 10_000_000;
1593 let exp_secs: i64 = (now_ms / 1000) + 60;
1594 let (token, _jwks_with_k1) =
1598 mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
1599 let mismatched_jwks =
1601 r#"{"keys":[{"kty":"RSA","kid":"other","n":"AQAB","e":"AQAB"}]}"#.to_string();
1602 let (v, mut rx) = signed_with_handles(&mismatched_jwks, now_ms);
1603 assert!(v.validate(&token, now_ms) == Err(AuthError::InvalidToken));
1604 assert!(
1605 rx.try_recv().is_ok(),
1606 "validator should signal refresh on verify failure",
1607 );
1608 }
1609
1610 #[test]
1611 fn signed_validate_does_not_signal_when_verification_succeeds() {
1612 let now_ms: i64 = 10_000_000;
1613 let exp_secs: i64 = (now_ms / 1000) + 60;
1614 let (token, jwks) = mint_rs256("k1", &format!("{{\"sub\":\"alice\",\"exp\":{exp_secs}}}"));
1615 let (v, mut rx) = signed_with_handles(&jwks, now_ms);
1616 assert!(v.validate(&token, now_ms).is_ok());
1617 assert!(
1618 rx.try_recv().is_err(),
1619 "happy-path verification should not signal a refresh",
1620 );
1621 }
1622
1623 #[tokio::test]
1624 async fn enum_dispatches_unsecured_and_signed() {
1625 let unsecured = OAuthBearerValidator::default();
1627 assert!(unsecured.jwks_handle().is_none());
1628 let tok = unsecured_token("admin", 999_999_000, 9_999_999_999);
1629 assert!(unsecured.validate(&tok, 1_000_000_000_000).await.is_ok());
1630
1631 let (token, jwks) = mint_rs256("k1", "{\"sub\":\"x\",\"exp\":9999999999}");
1633 let (sv, _h) = signed(&jwks);
1634 let signed_enum = OAuthBearerValidator::Signed(sv);
1635 assert!(signed_enum.jwks_handle().is_some());
1636 assert!(
1637 signed_enum
1638 .validate(&token, 1_000_000_000_000)
1639 .await
1640 .unwrap()
1641 .principal
1642 .name
1643 == "x"
1644 );
1645 }
1646
1647 fn unsecured_token(sub: &str, iat_s: i64, exp_s: i64) -> String {
1648 jws(
1649 "{\"alg\":\"none\"}",
1650 &format!("{{\"sub\":\"{sub}\",\"iat\":{iat_s},\"exp\":{exp_s}}}"),
1651 )
1652 }
1653
1654 #[test]
1655 fn custom_claim_check_compile_error_at_validator_construction() {
1656 let result = parse_json_path("@.unterminated");
1659 assert!(result.is_err(), "malformed expression must fail to parse");
1660 }
1661}
1662
/// Tests for [`IntrospectionValidator`], driven by an in-memory mock of the
/// introspection and userinfo calls.
#[cfg(test)]
mod introspection_tests {
    use super::*;
    use crate::{AuthError, AuthMethod};
    use assert2::assert;
    use jsonpath_rust::parser::parse_json_path;
    use serde_json::{Value, json};
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// In-memory [`IntrospectionClient`] serving canned responses keyed by
    /// token.
    ///
    /// Each canned response is removed from its map when served, so it is
    /// returned at most once. A response that is still present after a test
    /// run was therefore never requested.
    #[derive(Debug, Default)]
    struct MockIntrospectionClient {
        introspect_responses: Mutex<HashMap<String, Result<Value, IntrospectionError>>>,
        userinfo_responses: Mutex<HashMap<String, Result<Option<Value>, IntrospectionError>>>,
    }

    impl MockIntrospectionClient {
        /// Creates an empty mock wrapped in an `Arc`.
        fn arc() -> Arc<Self> {
            Arc::new(Self::default())
        }

        /// Registers the response `introspect` will return for `token`.
        fn set_introspect(&self, token: &str, resp: Result<Value, IntrospectionError>) {
            self.introspect_responses
                .lock()
                .unwrap()
                .insert(token.into(), resp);
        }

        /// Registers the response `userinfo` will return for `token`.
        fn set_userinfo(&self, token: &str, resp: Result<Option<Value>, IntrospectionError>) {
            self.userinfo_responses
                .lock()
                .unwrap()
                .insert(token.into(), resp);
        }
    }

    #[async_trait::async_trait]
    impl IntrospectionClient for MockIntrospectionClient {
        /// Serves (and removes) the canned introspection response for `token`;
        /// a token with no canned response yields a `Transport` error.
        async fn introspect(&self, token: &str) -> Result<Value, IntrospectionError> {
            self.introspect_responses
                .lock()
                .unwrap()
                .remove(token)
                // Build the fallback error lazily: it allocates a String and
                // is only needed when no response was registered.
                .unwrap_or_else(|| {
                    Err(IntrospectionError::Transport(
                        "no canned response".into(),
                    ))
                })
        }

        /// Serves (and removes) the canned userinfo response for `token`;
        /// a token with no canned response yields `Ok(None)`.
        async fn userinfo(&self, token: &str) -> Result<Option<Value>, IntrospectionError> {
            self.userinfo_responses
                .lock()
                .unwrap()
                .remove(token)
                .unwrap_or(Ok(None))
        }
    }

    /// Builds an [`IntrospectionValidator`] over `client` with the baseline
    /// test configuration: principal claim `sub`, userinfo disabled, 30 s of
    /// allowable clock skew, and every optional check unset.
    fn validator(client: Arc<MockIntrospectionClient>) -> IntrospectionValidator {
        IntrospectionValidator {
            client,
            principal_claim_name: "sub".into(),
            custom_claim_check: None,
            call_userinfo: false,
            allowable_clock_skew_ms: 30_000,
            expected_audience: None,
            fallback_user_name_claim: None,
            fallback_user_name_prefix: None,
            groups_claim: None,
            groups_claim_delimiter: None,
        }
    }

    /// Parses a JSONPath expression, panicking if it does not compile.
    fn parse_jp(expr: &str) -> JpQuery {
        parse_json_path(expr).expect("expression compiles")
    }

    /// Fixed "current time" (milliseconds) shared by most tests in this module.
    const NOW_MS: i64 = 1_700_000_000_000;

    /// An active response with a `sub` and a future `exp` yields that
    /// principal, tagged as `SaslOAuthBearer`.
    #[tokio::test]
    async fn introspection_active_true_with_principal_returns_ok() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS / 1000 + 60})),
        );
        let v = validator(mock.clone());
        let outcome = v.validate("tok", NOW_MS).await.unwrap();
        assert!(outcome.principal.name == "alice");
        assert!(outcome.principal.auth_method == AuthMethod::SaslOAuthBearer);
    }

    /// A response with `active: false` is rejected.
    #[tokio::test]
    async fn introspection_active_false_rejected() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect("tok", Ok(json!({"active": false})));
        let v = validator(mock.clone());
        assert!(v.validate("tok", NOW_MS).await == Err(AuthError::InvalidToken));
    }

    /// A response with no `active` field at all is rejected.
    #[tokio::test]
    async fn introspection_missing_active_field_rejected() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect("tok", Ok(json!({"sub": "alice"})));
        let v = validator(mock.clone());
        assert!(v.validate("tok", NOW_MS).await == Err(AuthError::InvalidToken));
    }

    /// An active response whose `exp` is an hour in the past is rejected.
    #[tokio::test]
    async fn introspection_expired_exp_rejected() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS / 1000 - 3600})),
        );
        let v = validator(mock.clone());
        assert!(v.validate("tok", NOW_MS).await == Err(AuthError::InvalidToken));
    }

    /// A response whose `scope` array lacks `kafka.admin` fails the configured
    /// custom claim check.
    #[tokio::test]
    async fn introspection_validate_rejects_when_custom_claim_check_fails() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({
                "active": true,
                "sub": "alice",
                "exp": NOW_MS / 1000 + 60,
                "scope": ["kafka.read"],
            })),
        );
        let mut v = validator(mock.clone());
        v.custom_claim_check = Some(parse_jp("$.scope[?@ == 'kafka.admin']"));
        let result = v.validate("tok", NOW_MS).await;
        assert!(result.unwrap_err() == AuthError::InvalidToken);
    }

    /// A response whose `scope` array contains `kafka.admin` passes the
    /// configured custom claim check.
    #[tokio::test]
    async fn introspection_validate_accepts_when_custom_claim_check_passes() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({
                "active": true,
                "sub": "alice",
                "exp": NOW_MS / 1000 + 60,
                "scope": ["kafka.admin", "kafka.read"],
            })),
        );
        let mut v = validator(mock.clone());
        v.custom_claim_check = Some(parse_jp("$.scope[?@ == 'kafka.admin']"));
        let outcome = v.validate("tok", NOW_MS).await.expect("valid");
        assert!(outcome.principal.name == "alice");
    }

    /// `expected_audience` is satisfied by an `aud` that is either the
    /// matching string or an array containing it.
    #[tokio::test]
    async fn introspection_honors_audience_string_and_array() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({"active": true, "sub": "a", "exp": NOW_MS / 1000 + 60, "aud": "kafka"})),
        );
        let mut v = validator(mock.clone());
        v.expected_audience = Some("kafka".to_string());
        assert!(v.validate("tok", NOW_MS).await.is_ok());

        let mock2 = MockIntrospectionClient::arc();
        mock2.set_introspect(
            "tok",
            Ok(json!({
                "active": true, "sub": "a", "exp": NOW_MS / 1000 + 60,
                "aud": ["other", "kafka"],
            })),
        );
        let mut v2 = validator(mock2.clone());
        v2.expected_audience = Some("kafka".to_string());
        assert!(v2.validate("tok", NOW_MS).await.is_ok());
    }

    /// An `aud` that differs from `expected_audience` is rejected.
    #[tokio::test]
    async fn introspection_rejects_non_matching_audience() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({"active": true, "sub": "a", "exp": NOW_MS / 1000 + 60, "aud": "web"})),
        );
        let mut v = validator(mock.clone());
        v.expected_audience = Some("kafka".to_string());
        assert!(v.validate("tok", NOW_MS).await == Err(AuthError::InvalidToken));
    }

    /// A response with no `aud` is rejected when `expected_audience` is set.
    #[tokio::test]
    async fn introspection_rejects_missing_audience_when_expected() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({"active": true, "sub": "a", "exp": NOW_MS / 1000 + 60})),
        );
        let mut v = validator(mock.clone());
        v.expected_audience = Some("kafka".to_string());
        assert!(v.validate("tok", NOW_MS).await == Err(AuthError::InvalidToken));
    }

    /// With `expected_audience` unset, any `aud` value is accepted.
    #[tokio::test]
    async fn introspection_ignores_audience_when_unset() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({"active": true, "sub": "a", "exp": NOW_MS / 1000 + 60, "aud": "web"})),
        );
        let v = validator(mock.clone());
        assert!(v.validate("tok", NOW_MS).await.is_ok());
    }

    /// A plain active response validates under the baseline configuration,
    /// which (per the exhaustive struct literal in `validator`) has no
    /// token-type setting.
    #[tokio::test]
    async fn introspection_validate_does_not_check_valid_token_type() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({
                "active": true,
                "sub": "alice",
                "exp": NOW_MS / 1000 + 60,
            })),
        );
        let v = validator(mock.clone());
        let outcome = v.validate("tok", NOW_MS).await.expect("valid");
        assert!(outcome.principal.name == "alice");
    }

    /// With userinfo enabled, a `preferred_username` from userinfo replaces
    /// the one from the introspection response.
    #[tokio::test]
    async fn introspection_userinfo_claims_override_introspection_for_profile_keys() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({
                "active": true,
                "sub": "alice",
                "exp": NOW_MS / 1000 + 60,
                "preferred_username": "intros-name",
            })),
        );
        mock.set_userinfo(
            "tok",
            Ok(Some(
                json!({"preferred_username": "userinfo-name", "email": "a@b.c"}),
            )),
        );
        let mut v = validator(mock.clone());
        v.call_userinfo = true;
        v.principal_claim_name = "preferred_username".into();
        let outcome = v.validate("tok", NOW_MS).await.unwrap();
        assert!(outcome.principal.name == "userinfo-name");
    }

    /// With userinfo enabled, `active` and `sub` from userinfo do not replace
    /// the values from the introspection response.
    #[tokio::test]
    async fn introspection_userinfo_does_not_override_authorization_keys() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS / 1000 + 60})),
        );
        mock.set_userinfo("tok", Ok(Some(json!({"active": false, "sub": "mallory"}))));
        let mut v = validator(mock.clone());
        v.call_userinfo = true;
        let outcome = v.validate("tok", NOW_MS).await.unwrap();
        assert!(
            outcome.principal.name == "alice",
            "sub from introspection wins over userinfo"
        );
    }

    /// With `call_userinfo` left at `false`, the userinfo call is never made.
    #[tokio::test]
    async fn introspection_userinfo_disabled_when_call_userinfo_false() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS / 1000 + 60})),
        );
        mock.set_userinfo("tok", Ok(Some(json!({"preferred_username": "ignored"}))));
        let v = validator(mock.clone());
        let outcome = v.validate("tok", NOW_MS).await.unwrap();
        assert!(outcome.principal.name == "alice");
        // The principal assertion alone cannot detect a stray userinfo call,
        // because the canned userinfo payload carries no `sub`. The mock
        // removes a response when it serves it, so the entry still being
        // present shows userinfo was not invoked.
        assert!(
            mock.userinfo_responses.lock().unwrap().contains_key("tok"),
            "userinfo must not be called when call_userinfo is false"
        );
    }

    /// A transport failure from the client surfaces as
    /// `AuthError::IntrospectionTransport` carrying the original message.
    #[tokio::test]
    async fn introspection_transport_error_becomes_introspection_transport() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Err(IntrospectionError::Transport("connection refused".into())),
        );
        let v = validator(mock.clone());
        let err = v.validate("tok", NOW_MS).await.unwrap_err();
        assert!(
            matches!(err, AuthError::IntrospectionTransport(ref msg) if msg.contains("connection refused")),
            "got {err:?}",
        );
    }

    /// Under the baseline configuration the principal name comes from `sub`.
    #[tokio::test]
    async fn introspection_default_principal_claim_sub() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({"active": true, "sub": "sub-name", "exp": NOW_MS / 1000 + 60})),
        );
        let v = validator(mock.clone());
        assert!(v.validate("tok", NOW_MS).await.unwrap().principal.name == "sub-name");
    }

    /// With `principal_claim_name` set to `client_id`, that claim wins over
    /// `sub`.
    #[tokio::test]
    async fn introspection_custom_principal_claim_client_id() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({
                "active": true,
                "sub": "sub-name",
                "exp": NOW_MS / 1000 + 60,
                "client_id": "my-client",
            })),
        );
        let mut v = validator(mock.clone());
        v.principal_claim_name = "client_id".into();
        assert!(v.validate("tok", NOW_MS).await.unwrap().principal.name == "my-client");
    }

    /// `OAuthBearerValidator::Introspection` forwards `validate` to the
    /// wrapped validator.
    #[tokio::test]
    async fn enum_dispatch_introspection_async() {
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "tok",
            Ok(json!({"active": true, "sub": "alice", "exp": NOW_MS / 1000 + 60})),
        );
        let v = validator(mock.clone());
        let enum_v = OAuthBearerValidator::Introspection(v);
        let outcome = enum_v.validate("tok", NOW_MS).await.unwrap();
        assert!(outcome.principal.name == "alice");
    }

    /// The response's `exp` (seconds) is reported as `expires_at_ms`
    /// (milliseconds) in the outcome.
    #[tokio::test]
    async fn introspection_validate_surfaces_exp_from_introspection_response() {
        let exp_secs: i64 = 2_000;
        let now_ms: i64 = 1_000_000;
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "opaque-token",
            Ok(json!({
                "active": true,
                "sub": "alice",
                "exp": exp_secs,
                "scope": "kafka.write",
            })),
        );
        let v = validator(mock.clone());
        let outcome = v
            .validate("opaque-token", now_ms)
            .await
            .expect("token valid");
        assert!(outcome.principal.name == "alice");
        assert!(outcome.expires_at_ms == Some(exp_secs * 1000));
    }

    /// With `groups_claim` set to `$.groups`, the array elements of that claim
    /// become the principal's groups, in order.
    #[tokio::test]
    async fn introspection_validate_extracts_groups_from_introspection_response() {
        let exp_secs: i64 = 2_000;
        let now_ms: i64 = 1_000_000;
        let mock = MockIntrospectionClient::arc();
        mock.set_introspect(
            "opaque-token",
            Ok(json!({
                "active": true,
                "sub": "alice",
                "exp": exp_secs,
                "groups": ["admin", "ops"],
            })),
        );
        let mut v = validator(mock.clone());
        v.groups_claim = Some(parse_jp("$.groups"));
        let outcome = v.validate("opaque-token", now_ms).await.expect("valid");
        assert!(outcome.principal.groups == vec!["admin".to_string(), "ops".to_string()]);
    }
}