use pmcp_code_mode::{
canonicalize_code, hash_code, ApprovalToken, CodeModeConfig, GraphQLValidator,
HmacTokenGenerator, NoopPolicyEvaluator, PolicyEvaluator, RiskLevel, TokenGenerator,
TokenSecret, ValidationContext, ValidationPipeline,
};
use proptest::prelude::*;
use std::sync::Arc;
proptest! {
#[test]
fn hmac_roundtrip_any_code(
secret in prop::collection::vec(any::<u8>(), 16..64),
code in "[ -~]{1,500}"
) {
let token_secret = TokenSecret::new(secret);
let generator = HmacTokenGenerator::new(token_secret).unwrap();
let token = generator.generate(
&code,
"user-prop",
"session-prop",
"server-prop",
"context-prop",
RiskLevel::Low,
300,
);
prop_assert!(generator.verify(&token).is_ok(),
"verify() failed for generated token");
prop_assert!(generator.verify_code(&code, &token).is_ok(),
"verify_code() failed for original code");
}
#[test]
fn hmac_detects_code_modification(
secret in prop::collection::vec(any::<u8>(), 16..64),
code in "[ -~]{1,200}",
suffix in "[ -~]{1,50}"
) {
let token_secret = TokenSecret::new(secret);
let generator = HmacTokenGenerator::new(token_secret).unwrap();
let token = generator.generate(
&code,
"user-prop",
"session-prop",
"server-prop",
"context-prop",
RiskLevel::Low,
300,
);
let modified_code = format!("{}{}", code, suffix);
if hash_code(&code) != hash_code(&modified_code) {
prop_assert!(generator.verify_code(&modified_code, &token).is_err(),
"verify_code() should fail for modified code");
}
}
#[test]
fn hash_code_deterministic(code in "[ -~]{0,1000}") {
let hash1 = hash_code(&code);
let hash2 = hash_code(&code);
prop_assert_eq!(hash1, hash2);
}
#[test]
fn canonicalize_idempotent(code in "[ -~]{0,500}") {
let c1 = canonicalize_code(&code);
let c2 = canonicalize_code(&c1);
prop_assert_eq!(c1, c2);
}
}
proptest! {
#[test]
fn hmac_rejects_bitflipped_tokens(
secret in prop::collection::vec(any::<u8>(), 32..64),
code in "[ -~]{10,200}",
flip_position in 0usize..200,
) {
let token_secret = TokenSecret::new(secret);
let generator = HmacTokenGenerator::new(token_secret).unwrap();
let token = generator.generate(
&code,
"user-prop",
"session-prop",
"server-prop",
"context-prop",
RiskLevel::Low,
300,
);
let encoded = token.encode().unwrap();
let mut bytes = encoded.into_bytes();
if !bytes.is_empty() {
let idx = flip_position % bytes.len();
bytes[idx] ^= 0x01; let flipped_str = String::from_utf8_lossy(&bytes);
if let Ok(flipped_token) = ApprovalToken::decode(&flipped_str) {
let verify_result = generator.verify(&flipped_token);
let code_result = generator.verify_code(&code, &flipped_token);
prop_assert!(
verify_result.is_err() || code_result.is_err(),
"Bit-flipped token should not pass both verify and verify_code"
);
}
}
}
#[test]
fn hmac_rejects_wrong_secret(
secret1 in prop::collection::vec(any::<u8>(), 32..64),
secret2 in prop::collection::vec(any::<u8>(), 32..64),
code in "[ -~]{10,200}",
) {
prop_assume!(secret1 != secret2);
let gen1 = HmacTokenGenerator::new(TokenSecret::new(secret1)).unwrap();
let gen2 = HmacTokenGenerator::new(TokenSecret::new(secret2)).unwrap();
let token = gen1.generate(
&code,
"user-prop",
"session-prop",
"server-prop",
"context-prop",
RiskLevel::Low,
300,
);
prop_assert!(gen2.verify(&token).is_err(),
"Token generated with one secret should not verify with a different secret");
}
}
proptest! {
#[test]
fn graphql_validation_deterministic(
field_name in "[a-z]{1,15}"
) {
let query = format!("query {{ {} {{ id }} }}", field_name);
let validator = GraphQLValidator::default();
let result1 = validator.validate(&query);
let result2 = validator.validate(&query);
match (result1, result2) {
(Ok(info1), Ok(info2)) => {
prop_assert_eq!(info1.operation_type, info2.operation_type);
prop_assert_eq!(info1.root_fields, info2.root_fields);
prop_assert_eq!(info1.max_depth, info2.max_depth);
}
(Err(_), Err(_)) => {
}
_ => {
prop_assert!(false, "Non-deterministic: one call succeeded and the other failed");
}
}
}
}
#[cfg(test)]
mod token_secret_trait_denials {
use super::*;
#[test]
fn token_secret_is_consumed_on_move() {
let secret = TokenSecret::new(b"test".to_vec());
let _moved = secret;
}
#[test]
fn token_secret_not_debug_canary() {
let secret = TokenSecret::new(b"canary-secret".to_vec());
let _ = secret;
}
#[test]
fn token_secret_not_display_canary() {
let secret = TokenSecret::new(b"canary-secret".to_vec());
let _ = secret;
}
#[test]
fn token_secret_not_serialize_canary() {
let _secret = TokenSecret::new(b"canary-secret".to_vec());
}
#[test]
fn token_secret_expose_secret_returns_correct_bytes() {
let input = b"test-bytes-123";
let secret = TokenSecret::new(input.to_vec());
assert_eq!(secret.expose_secret(), input);
}
}
struct DenyAllEvaluator;
#[pmcp_code_mode::async_trait]
impl PolicyEvaluator for DenyAllEvaluator {
async fn evaluate_operation(
&self,
_operation: &pmcp_code_mode::OperationEntity,
_server_config: &pmcp_code_mode::ServerConfigEntity,
) -> Result<pmcp_code_mode::AuthorizationDecision, pmcp_code_mode::PolicyEvaluationError> {
Ok(pmcp_code_mode::AuthorizationDecision {
allowed: false,
determining_policies: vec!["deny_all_test".to_string()],
errors: vec![],
})
}
fn name(&self) -> &str {
"deny_all_test"
}
}
#[tokio::test]
async fn default_deny_without_noop() {
let config = CodeModeConfig::enabled();
let pipeline = ValidationPipeline::with_policy_evaluator(
config,
b"test-secret-key!".to_vec(),
Arc::new(DenyAllEvaluator),
)
.unwrap();
let ctx = ValidationContext::new("user-123", "session-456", "schema-hash", "perms-hash");
let result = pipeline
.validate_graphql_query_async("query { users { id name } }", &ctx)
.await
.unwrap();
assert!(
!result.is_valid,
"Query should be rejected when DenyAllEvaluator is configured"
);
assert!(
!result.violations.is_empty(),
"There should be policy violations from the deny-all evaluator"
);
}
#[tokio::test]
async fn noop_evaluator_allows_query() {
let config = CodeModeConfig::enabled();
let pipeline = ValidationPipeline::with_policy_evaluator(
config,
b"test-secret-key!".to_vec(),
Arc::new(NoopPolicyEvaluator::new()),
)
.unwrap();
let ctx = ValidationContext::new("user-123", "session-456", "schema-hash", "perms-hash");
let result = pipeline
.validate_graphql_query_async("query { users { id name } }", &ctx)
.await
.unwrap();
assert!(
result.is_valid,
"Query should be allowed when NoopPolicyEvaluator is configured"
);
}