// This file contains property tests: a panic from `unwrap`/`expect`/indexing
// simply fails the test case, so the panic-related lints are relaxed here.
#![allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::panic,
    clippy::indexing_slicing
)]
use proptest::prelude::*;
use rmcp_server_kit::{
auth::{ApiKeyEntry, generate_api_key, verify_bearer_token},
rbac::{ArgumentAllowlist, RbacConfig, RbacPolicy, RoleConfig},
};
/// Number of generated cases for the RBAC properties in this file
/// (`argument_allowed_membership` and `glob_pattern_never_panics`).
const PROPTEST_CASES: u32 = 1024;
/// Reduced case count used by the API-key round-trip property.
///
/// NOTE(review): the name suggests key hashing is Argon2-based and therefore
/// expensive per case — confirm against `rmcp_server_kit::auth`.
const ARGON2_PROPTEST_CASES: u32 = 64;
proptest! {
    #![proptest_config(ProptestConfig::with_cases(ARGON2_PROPTEST_CASES))]

    // Round-trip property: a token returned by `generate_api_key` must be
    // accepted by `verify_bearer_token` and resolve to the entry built from its
    // own hash, regardless of how many unrelated ("decoy") entries follow it.
    #[test]
    fn api_key_generate_verify_roundtrip(extra_keys in 0usize..4) {
        let (token, hash) = generate_api_key().expect("generate_api_key");

        // The real entry goes first; each decoy gets its own freshly generated
        // hash whose matching token is deliberately thrown away.
        let mut keys = vec![ApiKeyEntry::new("primary", hash, "viewer")];
        keys.extend((0..extra_keys).map(|i| {
            let (_decoy_token, decoy_hash) =
                generate_api_key().expect("generate_api_key decoy");
            ApiKeyEntry::new(format!("decoy-{i}"), decoy_hash, "viewer")
        }));

        let verified = verify_bearer_token(&token, &keys);
        prop_assert!(verified.is_some(), "freshly generated token must verify");

        // The identity must be the primary entry, not one of the decoys.
        let identity = verified.expect("verified identity");
        prop_assert_eq!(identity.name, "primary");
        prop_assert_eq!(identity.role, "viewer");
    }
}
/// Strategy producing identifier-like tokens: one ASCII letter followed by up
/// to 15 ASCII letters, digits, or underscores (1–16 characters in total).
fn token_strategy() -> impl Strategy<Value = String> {
    // A string literal used as a strategy is interpreted by proptest as a regex.
    const TOKEN_REGEX: &str = "[a-zA-Z][a-zA-Z0-9_]{0,15}";
    TOKEN_REGEX.prop_map(String::from)
}
proptest! {
    #![proptest_config(ProptestConfig::with_cases(PROPTEST_CASES))]

    // Oracle property: for a role with a single argument allowlist on
    // (`run_query`, `cmd`), `RbacPolicy::argument_allowed` must return exactly
    // whether the candidate value is a member of the allowlist.
    #[test]
    fn argument_allowed_membership(
        allowed in proptest::collection::vec(token_strategy(), 1..8),
        candidate in token_strategy(),
    ) {
        // Evaluate the reference answer first so the generated list can be
        // moved into the allowlist afterwards.
        let expected = allowed.contains(&candidate);

        let allowlist = ArgumentAllowlist::new("run_query", "cmd", allowed);
        let role = RoleConfig::new("viewer", vec!["run_query".into()], vec!["*".into()])
            .with_argument_allowlists(vec![allowlist]);

        // The test switches RBAC on explicitly before building the policy.
        let mut config = RbacConfig::with_roles(vec![role]);
        config.enabled = true;
        let policy = RbacPolicy::new(&config);

        let actual = policy.argument_allowed("viewer", "run_query", "cmd", &candidate);
        prop_assert_eq!(
            actual,
            expected,
            "argument_allowed disagrees with set membership"
        );
    }
}
/// Strategy producing glob-like patterns: one to four segments of 1–6
/// lowercase ASCII letters each, joined with `*` (for example `ab*cde*f`).
/// A single-segment pattern contains no `*` at all.
fn glob_pattern_strategy() -> impl Strategy<Value = String> {
    let segments = proptest::collection::vec("[a-z]{1,6}", 1..5);
    segments.prop_map(|parts| parts.join("*"))
}
proptest! {
    #![proptest_config(ProptestConfig::with_cases(PROPTEST_CASES))]

    // Robustness property: using an arbitrary generated glob pattern as the
    // tool name of an argument allowlist must not make `argument_allowed`
    // panic. The boolean results are discarded on purpose; only a panic can
    // fail this test.
    #[test]
    fn glob_pattern_never_panics(
        pattern in glob_pattern_strategy(),
        tool in "[a-z]{1,12}".prop_map(String::from),
    ) {
        let allowlist = ArgumentAllowlist::new(pattern, "cmd", vec!["ls".into()]);
        let role = RoleConfig::new("viewer", vec!["*".into()], vec!["*".into()])
            .with_argument_allowlists(vec![allowlist]);

        let mut config = RbacConfig::with_roles(vec![role]);
        config.enabled = true;
        let policy = RbacPolicy::new(&config);

        // Probe with a value that is on the allowlist ("ls") and one that is
        // not ("rm"), in that order.
        for value in ["ls", "rm"] {
            let _ = policy.argument_allowed("viewer", &tool, "cmd", value);
        }
    }
}