use clasp_caps::{CapError, CapabilityToken, CapabilityValidator};
use clasp_core::security::{Action, TokenValidator, ValidationResult};
use ed25519_dalek::SigningKey;
use std::collections::HashSet;
use std::time::{SystemTime, UNIX_EPOCH};
fn future_timestamp() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
+ 3600
}
fn root_key() -> SigningKey {
SigningKey::from_bytes(&[1u8; 32])
}
fn make_validator() -> CapabilityValidator {
let key = root_key();
let pub_key = key.verifying_key().to_bytes().to_vec();
CapabilityValidator::new(vec![pub_key], 5)
}
#[test]
fn test_cap_01_token_forgery() {
let validator = make_validator();
let untrusted_key = SigningKey::from_bytes(&[99u8; 32]);
let token = CapabilityToken::create_root(
&untrusted_key,
vec!["admin:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let encoded = token.encode().unwrap();
match validator.validate(&encoded) {
ValidationResult::Invalid(msg) => {
assert!(
msg.contains("untrusted") || msg.contains("Untrusted"),
"expected UntrustedIssuer, got: {}",
msg
);
}
other => panic!("expected Invalid(UntrustedIssuer), got: {:?}", other),
}
}
#[test]
fn test_cap_02_scope_attenuation_bypass() {
use base64::Engine;
let key = root_key();
let child_key = SigningKey::from_bytes(&[2u8; 32]);
let validator = make_validator();
let token = CapabilityToken::create_root(
&key,
vec!["read:/lights/*".to_string()],
future_timestamp(),
None,
)
.unwrap();
let encoded = token.encode().unwrap();
let b64 = encoded.strip_prefix("cap_").unwrap();
let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
.decode(b64)
.unwrap();
let mut decoded: CapabilityToken = rmp_serde::from_slice(&bytes).unwrap();
let original_scopes = decoded.scopes.clone();
decoded.scopes = vec!["admin:/**".to_string()]; assert_ne!(
decoded.scopes, original_scopes,
"scopes must differ after mutation"
);
let tampered_bytes = rmp_serde::to_vec_named(&decoded).unwrap();
let tampered = format!(
"cap_{}",
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&tampered_bytes)
);
match validator.validate(&tampered) {
ValidationResult::Invalid(msg) => {
assert!(
msg.contains("signature") || msg.contains("Signature"),
"expected signature error for tampered scopes, got: {}",
msg
);
}
other => panic!(
"expected Invalid(signature) for tampered scopes, got: {:?}",
other
),
}
let root = CapabilityToken::create_root(
&key,
vec!["read:/lights/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let result = root.delegate(
&child_key,
vec!["admin:/**".to_string()],
future_timestamp(),
None,
);
assert!(
result.is_err(),
"delegating wider scopes (read -> admin) must be rejected as AttenuationViolation"
);
let err_msg = format!("{}", result.unwrap_err());
assert!(
err_msg.contains("attenuation")
|| err_msg.contains("Attenuation")
|| err_msg.contains("subset"),
"error should mention attenuation violation, got: {}",
err_msg
);
}
#[test]
fn test_cap_03_chain_depth_bypass() {
let key_a = SigningKey::from_bytes(&[1u8; 32]);
let key_b = SigningKey::from_bytes(&[2u8; 32]);
let key_c = SigningKey::from_bytes(&[3u8; 32]);
let key_d = SigningKey::from_bytes(&[4u8; 32]);
let key_e = SigningKey::from_bytes(&[5u8; 32]);
let pub_a = key_a.verifying_key().to_bytes().to_vec();
let validator = CapabilityValidator::new(vec![pub_a], 3);
let t0 = CapabilityToken::create_root(
&key_a,
vec!["admin:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let t1 = t0
.delegate(
&key_b,
vec!["write:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let t2 = t1
.delegate(
&key_c,
vec!["write:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let t3 = t2
.delegate(
&key_d,
vec!["write:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let t4 = t3
.delegate(
&key_e,
vec!["read:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
assert_eq!(t4.chain_depth(), 4);
let encoded = t4.encode().unwrap();
match validator.validate(&encoded) {
ValidationResult::Invalid(msg) => {
assert!(
msg.contains("deep") || msg.contains("chain") || msg.contains("Chain"),
"expected ChainTooDeep, got: {}",
msg
);
}
other => panic!("expected Invalid(ChainTooDeep), got: {:?}", other),
}
let encoded_3 = t3.encode().unwrap();
assert!(
matches!(validator.validate(&encoded_3), ValidationResult::Valid(_)),
"depth 3 should be accepted with max_depth=3"
);
}
#[test]
fn test_cap_04_proof_chain_manipulation() {
use base64::Engine;
let validator = make_validator();
let key_a = root_key(); let key_b = SigningKey::from_bytes(&[2u8; 32]);
let key_c = SigningKey::from_bytes(&[3u8; 32]);
let t_a = CapabilityToken::create_root(
&key_a,
vec!["admin:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let t_b = t_a
.delegate(
&key_b,
vec!["write:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let t_c = t_b
.delegate(
&key_c,
vec!["read:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let legit_encoded = t_c.encode().unwrap();
assert!(
matches!(
validator.validate(&legit_encoded),
ValidationResult::Valid(_)
),
"legitimate 3-level chain should validate"
);
let b64 = legit_encoded.strip_prefix("cap_").unwrap();
let bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
.decode(b64)
.unwrap();
let mut decoded: CapabilityToken = rmp_serde::from_slice(&bytes).unwrap();
assert!(
!decoded.proofs.is_empty(),
"delegated token must have proof links"
);
decoded.proofs[0].scopes = vec!["read:/narrow".to_string()];
let tampered_bytes = rmp_serde::to_vec_named(&decoded).unwrap();
let tampered = format!(
"cap_{}",
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&tampered_bytes)
);
match validator.validate(&tampered) {
ValidationResult::Invalid(msg) => {
assert!(
msg.contains("signature") || msg.contains("Signature"),
"proof chain tamper should produce signature error, got: {}",
msg
);
}
other => panic!(
"expected Invalid for tampered proof chain, got: {:?}",
other
),
}
let mut decoded2: CapabilityToken = rmp_serde::from_slice(&bytes).unwrap();
decoded2.proofs.remove(0); let stripped_bytes = rmp_serde::to_vec_named(&decoded2).unwrap();
let stripped = format!(
"cap_{}",
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&stripped_bytes)
);
match validator.validate(&stripped) {
ValidationResult::Invalid(msg) => {
assert!(
msg.contains("untrusted")
|| msg.contains("Untrusted")
|| msg.contains("signature")
|| msg.contains("Signature"),
"stripped proof should produce untrusted/signature error, got: {}",
msg
);
}
other => panic!(
"expected Invalid for stripped proof chain, got: {:?}",
other
),
}
}
#[test]
fn test_cap_05_replay_attack() {
let validator = make_validator();
let key = root_key();
let token_1 = CapabilityToken::create_root(
&key,
vec!["write:/lights/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
assert!(!token_1.nonce.is_empty(), "token must have a nonce");
let encoded_1 = token_1.encode().unwrap();
let info_1 = match validator.validate(&encoded_1) {
ValidationResult::Valid(info) => info,
other => panic!("expected Valid on first validation, got: {:?}", other),
};
assert!(
info_1.has_scope(Action::Write, "/lights/room1"),
"validated token must grant write:/lights/room1"
);
let info_2 = match validator.validate(&encoded_1) {
ValidationResult::Valid(info) => info,
other => panic!("expected Valid on replay, got: {:?}", other),
};
assert!(
info_2.has_scope(Action::Write, "/lights/room1"),
"replayed token must still grant same scopes (stateless = no degradation)"
);
let token_2 = CapabilityToken::create_root(
&key,
vec!["write:/lights/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
assert_ne!(
token_1.nonce, token_2.nonce,
"two tokens must have distinct nonces"
);
let encoded_2 = token_2.encode().unwrap();
let info_3 = match validator.validate(&encoded_2) {
ValidationResult::Valid(info) => info,
other => panic!("expected Valid for second token, got: {:?}", other),
};
assert!(
info_3.has_scope(Action::Write, "/lights/room1"),
"second token with same scopes must also validate"
);
}
#[test]
fn test_cap_06_nonce_collision() {
let key = root_key();
let mut nonces = HashSet::new();
for _ in 0..100_000 {
let token = CapabilityToken::create_root(
&key,
vec!["read:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
assert!(
nonces.insert(token.nonce.clone()),
"nonce collision detected: {}",
token.nonce
);
}
assert_eq!(nonces.len(), 100_000);
}
#[test]
fn test_cap_07_trust_anchor_runtime() {
let key_a = root_key();
let key_b = SigningKey::from_bytes(&[2u8; 32]);
let pub_a = key_a.verifying_key().to_bytes().to_vec();
let validator = CapabilityValidator::new(vec![pub_a], 5);
let token_b = CapabilityToken::create_root(
&key_b,
vec!["admin:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let encoded_b = token_b.encode().unwrap();
assert!(
matches!(validator.validate(&encoded_b), ValidationResult::Invalid(_)),
"key_b must not be accepted as trust anchor"
);
let root_a = CapabilityToken::create_root(
&key_a,
vec!["admin:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let delegated = root_a
.delegate(
&key_b,
vec!["admin:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let encoded_del = delegated.encode().unwrap();
assert!(matches!(
validator.validate(&encoded_del),
ValidationResult::Valid(_)
));
assert!(matches!(
validator.validate(&encoded_b),
ValidationResult::Invalid(_)
));
}
#[test]
fn test_cap_08_malformed_msgpack() {
use base64::Engine;
let validator = make_validator();
let random = base64::engine::general_purpose::URL_SAFE_NO_PAD
.encode([0xDE, 0xAD, 0xBE, 0xEF, 0x01, 0x02, 0x03, 0x04]);
assert!(matches!(
validator.validate(&format!("cap_{}", random)),
ValidationResult::Invalid(_)
));
let truncated = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode([0x92]);
assert!(matches!(
validator.validate(&format!("cap_{}", truncated)),
ValidationResult::Invalid(_)
));
assert!(matches!(
validator.validate("cap_!!!not-valid!!!"),
ValidationResult::Invalid(_)
));
assert!(matches!(
validator.validate("cpsk_something"),
ValidationResult::NotMyToken
));
assert!(matches!(
validator.validate("cap_"),
ValidationResult::Invalid(_)
));
let long_garbage = "A".repeat(10000);
let long_b64 = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(long_garbage.as_bytes());
assert!(matches!(
validator.validate(&format!("cap_{}", long_b64)),
ValidationResult::Invalid(_)
));
let int_bytes = rmp_serde::to_vec_named(&42u32).unwrap();
let int_b64 = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&int_bytes);
assert!(matches!(
validator.validate(&format!("cap_{}", int_b64)),
ValidationResult::Invalid(_)
));
}
#[test]
fn test_cap_09_action_hierarchy_escalation() {
let key_a = root_key();
let key_b = SigningKey::from_bytes(&[2u8; 32]);
let read_root = CapabilityToken::create_root(
&key_a,
vec!["read:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let result = read_root.delegate(
&key_b,
vec!["write:/**".to_string()],
future_timestamp(),
None,
);
assert!(result.is_err(), "read -> write delegation must be rejected");
let write_root = CapabilityToken::create_root(
&key_a,
vec!["write:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let result = write_root.delegate(
&key_b,
vec!["admin:/**".to_string()],
future_timestamp(),
None,
);
assert!(
result.is_err(),
"write -> admin delegation must be rejected"
);
let admin_root = CapabilityToken::create_root(
&key_a,
vec!["admin:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let write_child = admin_root
.delegate(
&key_b,
vec!["write:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
let key_c = SigningKey::from_bytes(&[3u8; 32]);
let read_grandchild = write_child
.delegate(
&key_c,
vec!["read:/**".to_string()],
future_timestamp(),
None,
)
.unwrap();
assert!(read_grandchild.verify_signature().is_ok());
}
#[test]
fn test_cap_10_pattern_wildcard_injection() {
use clasp_caps::token::pattern_is_subset;
let key_a = root_key();
let key_b = SigningKey::from_bytes(&[2u8; 32]);
let validator = make_validator();
assert!(
!pattern_is_subset("/lights/**", "/lights/*"),
"/lights/** is wider than /lights/*, not a subset"
);
let narrow_root = CapabilityToken::create_root(
&key_a,
vec!["write:/lights/*".to_string()],
future_timestamp(),
None,
)
.unwrap();
let result = narrow_root.delegate(
&key_b,
vec!["write:/lights/**".to_string()],
future_timestamp(),
None,
);
assert!(
result.is_err(),
"delegation from /* to /** must be rejected"
);
assert!(
matches!(result.unwrap_err(), CapError::AttenuationViolation(_)),
"error must be AttenuationViolation"
);
let parent_encoded = narrow_root.encode().unwrap();
let parent_info = match validator.validate(&parent_encoded) {
ValidationResult::Valid(info) => info,
other => panic!("parent should validate, got: {:?}", other),
};
assert!(
!parent_info.has_scope(Action::Write, "/lights/room1/brightness"),
"parent with /lights/* must NOT grant multi-level access"
);
let result = narrow_root.delegate(
&key_b,
vec!["write:/**".to_string()],
future_timestamp(),
None,
);
assert!(result.is_err(), "/** must not be allowed under /lights/*");
}