#![deny(unsafe_code)]
use serde::Deserialize;
use std::collections::BTreeMap;
/// Summary of a decoded JWT: selected header fields, selected claims, and any
/// anomalies noticed while inspecting them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JwtAnalysis {
/// Value of the header's `alg` field, or the placeholder `"<missing>"` when
/// the header carries none.
pub alg: String,
/// Value of the header's `typ` field, if present.
pub typ: Option<String>,
/// Value of the header's `kid` field, if present.
pub kid: Option<String>,
/// `iss` claim from the payload, if present.
pub iss: Option<String>,
/// `sub` claim from the payload, if present.
pub sub: Option<String>,
/// `aud` claim from the payload. When the claim is an array, its string
/// entries are joined with `,`.
pub aud: Option<String>,
/// Expiry time taken from the `exp` claim, when `analyze` could extract one.
pub exp: Option<i64>,
/// Whether the current time was at or past `exp` when the token was
/// analysed; `None` when `exp` is `None`.
pub expired: Option<bool>,
/// Anomalies detected during analysis, in the order they were found.
pub anomalies: Vec<JwtAnomaly>,
}
/// Something unusual noticed about a JWT's header or claims.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum JwtAnomaly {
/// The header's `alg` equals `none` (compared ASCII case-insensitively).
AlgNone,
/// The header's `alg` is not in the list of recognised algorithms; carries
/// the offending value (or the `"<missing>"` placeholder).
UnknownAlg(String),
/// The header's `typ` is present but not one of the recognised values;
/// carries the offending value.
NonStandardTyp(String),
/// The `exp` claim was at or before the current time during analysis.
Expired,
}
/// Flattens the anomalies recorded on `analysis` into `jwt.*` metadata entries.
///
/// Returns `None` when the analysis carries no anomalies. Otherwise returns a
/// map with one entry per anomaly kind; if the same kind appears more than
/// once, the later occurrence overwrites the earlier one.
pub fn anomalies_to_metadata(analysis: &JwtAnalysis) -> Option<BTreeMap<String, String>> {
    if analysis.anomalies.is_empty() {
        return None;
    }

    let mut metadata = BTreeMap::new();
    for found in &analysis.anomalies {
        // Work out the key/value pair first so there is a single insert site.
        let (key, value) = match found {
            JwtAnomaly::AlgNone => (
                "jwt.alg_none",
                String::from("true (unsigned token: RFC 7519 §6 risk)"),
            ),
            JwtAnomaly::UnknownAlg(name) => ("jwt.unknown_alg", name.clone()),
            JwtAnomaly::NonStandardTyp(name) => ("jwt.non_standard_typ", name.clone()),
            JwtAnomaly::Expired => ("jwt.expired", String::from("true")),
        };
        metadata.insert(String::from(key), value);
    }
    Some(metadata)
}
/// Builds a flat `jwt.*` metadata map for a credential that parses as a JWT.
///
/// Returns `None` when `analyze` rejects the credential. The map always
/// contains `jwt.alg`; `jwt.iss`, `jwt.sub`, `jwt.aud` and `jwt.exp` are added
/// when the corresponding value is available, followed by any anomaly entries
/// produced by `anomalies_to_metadata`.
pub fn finding_metadata(credential: &str) -> Option<std::collections::HashMap<String, String>> {
    let analysis = analyze(credential)?;

    let mut meta = std::collections::HashMap::new();
    meta.insert(String::from("jwt.alg"), analysis.alg.clone());

    // Optional claims share the same "insert only when present" handling.
    let optional_claims = [
        ("jwt.iss", analysis.iss.clone()),
        ("jwt.sub", analysis.sub.clone()),
        ("jwt.aud", analysis.aud.clone()),
        ("jwt.exp", analysis.exp.map(|secs| secs.to_string())),
    ];
    for (key, claim) in optional_claims {
        if let Some(value) = claim {
            meta.insert(String::from(key), value);
        }
    }

    // `None` (no anomalies) flattens to an empty iterator.
    meta.extend(anomalies_to_metadata(&analysis).into_iter().flatten());
    Some(meta)
}
pub fn looks_like_jwt(s: &str) -> bool {
let s = s.trim();
const MAX_JWT_SEGMENT_LEN: usize = 16 * 1024;
let mut parts = s.split('.');
let (Some(h), Some(p), Some(sig), None) =
(parts.next(), parts.next(), parts.next(), parts.next())
else {
return false;
};
if h.len() > MAX_JWT_SEGMENT_LEN
|| p.len() > MAX_JWT_SEGMENT_LEN
|| sig.len() > MAX_JWT_SEGMENT_LEN
{
return false;
}
!h.is_empty()
&& !p.is_empty()
&& !sig.is_empty()
&& h.bytes().all(is_base64url_byte)
&& p.bytes().all(is_base64url_byte)
&& sig.bytes().all(is_base64url_byte)
}
/// Decodes a compact JWT and summarises its header, selected claims and any
/// anomalies. The signature is never verified.
///
/// Returns `None` when the input does not pass `looks_like_jwt`, when the
/// header or payload segment is not valid base64url, when either decoded
/// document nests deeper than 15 levels, or when either fails to deserialize.
///
/// Anomalies are recorded in this order: algorithm (`AlgNone` or
/// `UnknownAlg`), then `NonStandardTyp`, then `Expired`.
///
/// The `exp` claim is accepted as any JSON number. RFC 7519 defines
/// NumericDate as a numeric value that may be non-integer, so fractional
/// values are floored to whole seconds instead of being discarded.
pub fn analyze(s: &str) -> Option<JwtAnalysis> {
    /// Maximum bracket/brace nesting tolerated before JSON parsing is attempted.
    const MAX_JSON_DEPTH: usize = 15;

    /// Converts a JSON `exp` value into whole seconds since the Unix epoch.
    fn numeric_date_secs(value: &serde_json::Value) -> Option<i64> {
        let serde_json::Value::Number(n) = value else {
            return None;
        };
        n.as_i64().or_else(|| {
            n.as_f64()
                .filter(|secs| secs.is_finite())
                // Float-to-int `as` saturates, so out-of-range values clamp to
                // the i64 bounds rather than wrapping.
                .map(|secs| secs.floor() as i64)
        })
    }

    let s = s.trim();
    if !looks_like_jwt(s) {
        return None;
    }

    // `looks_like_jwt` has already established that there are exactly three
    // segments; only the first two are decoded.
    let mut parts = s.split('.');
    let header_b64 = parts.next()?;
    let payload_b64 = parts.next()?;

    let header_json = decode_b64url(header_b64)?;
    let payload_json = decode_b64url(payload_b64)?;

    // Reject pathologically nested documents before handing them to the parser.
    if !check_nesting_depth(&header_json, MAX_JSON_DEPTH)
        || !check_nesting_depth(&payload_json, MAX_JSON_DEPTH)
    {
        return None;
    }

    let header: JwtHeader = serde_json::from_slice(&header_json).ok()?;
    let mut payload: JwtPayload = serde_json::from_slice(&payload_json).ok()?;

    let aud = payload.take_aud();
    let iss = payload.iss.take();
    let sub = payload.sub.take();

    let mut anomalies = Vec::new();

    let alg = header.alg.unwrap_or_else(|| "<missing>".to_string());
    if alg.eq_ignore_ascii_case("none") {
        anomalies.push(JwtAnomaly::AlgNone);
    } else if !is_known_alg(&alg) {
        anomalies.push(JwtAnomaly::UnknownAlg(alg.clone()));
    }

    if let Some(typ) = header.typ.as_deref() {
        if !is_standard_typ(typ) {
            anomalies.push(JwtAnomaly::NonStandardTyp(typ.to_string()));
        }
    }

    let exp = payload.exp.as_ref().and_then(numeric_date_secs);
    let expired = exp.map(|deadline| {
        // If the system clock reads earlier than the Unix epoch, fall back to
        // 0 so that tokens with a positive `exp` are reported as not expired.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX))
            .unwrap_or(0);
        now >= deadline
    });
    if expired == Some(true) {
        anomalies.push(JwtAnomaly::Expired);
    }

    Some(JwtAnalysis {
        alg,
        typ: header.typ,
        kid: header.kid,
        iss,
        sub,
        aud,
        exp,
        expired,
        anomalies,
    })
}
/// Returns `true` for bytes that may appear in a base64url segment: ASCII
/// letters, ASCII digits, `-`, `_`, and the padding character `=`.
#[inline]
fn is_base64url_byte(b: u8) -> bool {
    matches!(
        b,
        b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'='
    )
}
/// Decodes a base64url segment, tolerating trailing `=` padding.
///
/// Trailing `=` characters are stripped before the input is handed to the
/// unpadded URL-safe engine. Returns `None` when decoding fails.
fn decode_b64url(s: &str) -> Option<Vec<u8>> {
    use base64::engine::general_purpose::URL_SAFE_NO_PAD;
    use base64::Engine;

    let unpadded = s.trim_end_matches('=');
    match URL_SAFE_NO_PAD.decode(unpadded) {
        Ok(bytes) => Some(bytes),
        Err(_) => None,
    }
}
/// Returns `true` when `alg` exactly matches (case-sensitively) one of the
/// signature algorithm names this module recognises.
fn is_known_alg(alg: &str) -> bool {
    const KNOWN_ALGS: [&str; 14] = [
        "RS256", "RS384", "RS512", "HS256", "HS384", "HS512", "ES256", "ES384", "ES512",
        "ES256K", "PS256", "PS384", "PS512", "EdDSA",
    ];
    KNOWN_ALGS.iter().any(|known| *known == alg)
}
/// Returns `true` when `typ` names one of the JWT media types this module
/// treats as standard: `JWT`, `at+jwt`, `id+jwt`, `dpop+jwt` or `logout+jwt`.
///
/// The `typ` header is a media type (RFC 7515 §4.1.9). A value without a `/`
/// is to be treated as if `application/` were prepended, and media type names
/// are case-insensitive. So a single leading `application/` is ignored and the
/// comparison is ASCII case-insensitive: `application/jwt` and `jwt` are
/// accepted just like `JWT`.
fn is_standard_typ(typ: &str) -> bool {
    const APPLICATION_PREFIX: &str = "application/";
    const STANDARD_TYPS: [&str; 5] = ["JWT", "at+jwt", "id+jwt", "dpop+jwt", "logout+jwt"];

    // `get` returns `None` for inputs that are too short or would be split
    // inside a multi-byte character, so the slice below cannot panic.
    let subtype = match typ.get(..APPLICATION_PREFIX.len()) {
        Some(prefix) if prefix.eq_ignore_ascii_case(APPLICATION_PREFIX) => {
            &typ[APPLICATION_PREFIX.len()..]
        }
        _ => typ,
    };

    STANDARD_TYPS
        .iter()
        .any(|known| known.eq_ignore_ascii_case(subtype))
}
/// Subset of the JWT header fields that this module deserializes. Every field
/// is optional.
#[derive(Deserialize)]
struct JwtHeader {
/// `alg` header field, if present.
alg: Option<String>,
/// `typ` header field, if present.
typ: Option<String>,
/// `kid` header field, if present.
kid: Option<String>,
}
/// Subset of the JWT claims that this module deserializes from the payload.
#[derive(Deserialize)]
struct JwtPayload {
/// `iss` claim, if present.
iss: Option<String>,
/// `sub` claim, if present.
sub: Option<String>,
/// Raw `aud` claim, kept untyped because `take_aud` handles both a string and
/// an array. `#[serde(default)]` supplies `Value`'s default when it is absent.
#[serde(default)]
aud: serde_json::Value,
/// Raw `exp` claim, kept as an untyped JSON value; `analyze` interprets it.
exp: Option<serde_json::Value>,
}
impl JwtPayload {
fn take_aud(&mut self) -> Option<String> {
match std::mem::take(&mut self.aud) {
serde_json::Value::String(s) if !s.is_empty() => Some(s),
serde_json::Value::Array(items) if !items.is_empty() => {
let joined: Vec<String> = items
.into_iter()
.filter_map(|v| match v {
serde_json::Value::String(s) => Some(s),
_ => None,
})
.collect();
if joined.is_empty() {
None
} else {
Some(joined.join(","))
}
}
_ => None,
}
}
}
/// Scans raw JSON bytes and reports whether bracket/brace nesting stays within
/// `max_depth`.
///
/// Returns `false` as soon as an opening `{` or `[` outside a string literal
/// would push the depth above `max_depth`, and `true` otherwise. Bytes inside
/// string literals are ignored, and a backslash inside a string causes the
/// following byte to be skipped. The input is not validated as JSON: surplus
/// closers simply leave the depth at zero.
fn check_nesting_depth(json: &[u8], max_depth: usize) -> bool {
    /// Position of the scanner relative to JSON string literals.
    #[derive(Clone, Copy)]
    enum Scan {
        /// Outside any string literal; brackets count towards the depth.
        Structural,
        /// Inside a string literal.
        InString,
        /// Inside a string literal, immediately after a backslash.
        AfterBackslash,
    }

    let mut state = Scan::Structural;
    let mut open: usize = 0;

    for &byte in json {
        state = match (state, byte) {
            // The byte following a backslash is consumed unconditionally.
            (Scan::AfterBackslash, _) => Scan::InString,
            (Scan::InString, b'\\') => Scan::AfterBackslash,
            (Scan::InString, b'"') => Scan::Structural,
            (Scan::InString, _) => Scan::InString,
            (Scan::Structural, b'"') => Scan::InString,
            (Scan::Structural, b'{' | b'[') => {
                open += 1;
                if open > max_depth {
                    return false;
                }
                Scan::Structural
            }
            (Scan::Structural, b'}' | b']') => {
                open = open.saturating_sub(1);
                Scan::Structural
            }
            // Anything else outside a string, including a stray backslash,
            // has no effect on depth.
            (Scan::Structural, _) => Scan::Structural,
        };
    }
    true
}