use chrono::{Duration as ChronoDuration, Utc};
use std::collections::{BTreeMap, HashMap};
use std::time::Duration;
use tenuo::{
constraints::{All, Constraint, ConstraintSet, ConstraintValue, Exact, OneOf, Pattern},
crypto::SigningKey,
planes::{Authorizer, DataPlane},
warrant::{Clearance, Warrant, WarrantType},
wire, Range, RegexConstraint, MAX_DELEGATION_DEPTH, MAX_WARRANT_TTL_SECS,
};
/// Verifies that attenuation links a child warrant to its parent by hash.
///
/// A root warrant is self-issued, a child is attenuated from it for a second
/// key, and the child's `parent_hash()` is compared against a SHA-256 digest
/// of the parent's `payload_bytes()` computed directly in the test.
#[test]
fn test_parent_hash_linkage() {
    use sha2::{Digest, Sha256};

    let root_key = SigningKey::generate();
    let delegate_key = SigningKey::generate();

    let root = Warrant::builder()
        .capability("read", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(root_key.public_key())
        .build(&root_key)
        .unwrap();

    // The child inherits everything and is signed with the root's key.
    let delegated = root
        .attenuate()
        .inherit_all()
        .holder(delegate_key.public_key())
        .build(&root_key)
        .unwrap();

    // Recompute the digest independently of the warrant API.
    let mut sha = Sha256::new();
    sha.update(root.payload_bytes());
    let digest: [u8; 32] = sha.finalize().into();

    assert_eq!(
        delegated.parent_hash(),
        Some(&digest),
        "Child's parent_hash should be SHA256 of parent's payload_bytes"
    );

    println!("✅ parent_hash correctly links child to parent");
    println!(" (Chain verification requires WarrantStack)");
}
/// Encodes a warrant, decodes it, re-encodes the decoded value, and requires
/// both encodings to be byte-identical.
#[test]
fn test_cbor_payload_canonical_binding() {
    let signer = SigningKey::generate();

    let mut path_rules = ConstraintSet::new();
    path_rules.insert("path", Pattern::new("/data/*").unwrap());

    let original = Warrant::builder()
        .capability("read", path_rules)
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();

    let first_pass = wire::encode(&original).unwrap();
    let round_tripped = wire::decode(&first_pass).unwrap();
    let second_pass = wire::encode(&round_tripped).unwrap();

    assert_eq!(
        first_pass, second_pass,
        "Serialization must be deterministic (canonical binding)"
    );

    println!("✅ CBOR canonical binding verified (round-trip deterministic)");
}
/// Round-trips a warrant through `wire::encode` / `wire::decode` and checks
/// that re-encoding reproduces the original bytes.
///
/// NOTE(review): despite the name, this body never tampers with the encoded
/// bytes; it only exercises the happy-path round trip. The mismatch check it
/// refers to is not visible in this file.
#[test]
fn test_payload_bytes_mismatch_detection() {
    let signer = SigningKey::generate();

    let mut path_rules = ConstraintSet::new();
    path_rules.insert("path", Pattern::new("/data/*").unwrap());

    let original = Warrant::builder()
        .capability("read", path_rules)
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();

    let encoded_once = wire::encode(&original).unwrap();
    let reparsed = wire::decode(&encoded_once).unwrap();
    let encoded_twice = wire::encode(&reparsed).unwrap();

    assert_eq!(
        encoded_once, encoded_twice,
        "Round-trip must preserve canonical bytes"
    );

    println!("✅ CBOR payload_bytes canonical binding enforced at deserialization");
    println!(" (See Warrant::deserialize for the check)");
}
/// Checks that the signature of one warrant does not verify against the
/// payload of a different warrant issued by the same key.
#[test]
fn test_signature_reuse_across_warrants() {
    let signer = SigningKey::generate();

    // Both warrants are identical except for the tool name.
    let issue_for = |tool: &'static str| {
        Warrant::builder()
            .capability(tool, ConstraintSet::new())
            .ttl(Duration::from_secs(3600))
            .holder(signer.public_key())
            .build(&signer)
            .unwrap()
    };
    let warrant_a = issue_for("read");
    let warrant_b = issue_for("write");

    assert_ne!(
        warrant_a.payload_bytes(),
        warrant_b.payload_bytes(),
        "Different warrants must have different payload_bytes"
    );

    // Each warrant is valid on its own.
    assert!(warrant_a.verify_signature().is_ok());
    assert!(warrant_b.verify_signature().is_ok());

    // Cross-check: A's signature over B's payload must be rejected.
    let cross_check = warrant_b
        .issuer()
        .verify(warrant_b.payload_bytes(), warrant_a.signature());
    assert!(
        cross_check.is_err(),
        "Signature from warrant A should not verify warrant B's payload"
    );

    println!("✅ Signature reuse blocked (payload_bytes binding enforced)");
}
/// Checks that a child attenuated from a self-issued parent carries a
/// `parent_hash()` equal to the SHA-256 digest of the parent's
/// `payload_bytes()`.
#[test]
fn test_parent_child_relationship_integrity() {
    use sha2::{Digest, Sha256};

    let signer = SigningKey::generate();

    let root = Warrant::builder()
        .capability("read", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();
    let derived = root.attenuate().inherit_all().build(&signer).unwrap();

    let expected: [u8; 32] = {
        let mut sha = Sha256::new();
        sha.update(root.payload_bytes());
        sha.finalize().into()
    };
    assert_eq!(derived.parent_hash(), Some(&expected));

    // Implied by the equality above; kept as an explicit, named check.
    assert!(
        derived.parent_hash().is_some(),
        "Child should have parent_hash"
    );

    println!("✅ Parent-child relationship correctly maintained (cycles prevented by design)");
}
/// Checks that an issuer warrant with `Clearance::INTERNAL` cannot issue an
/// execution warrant that requests `Clearance::SYSTEM`.
///
/// The failure may come from either `issue_execution_warrant()` or the final
/// `build`; the test only requires that the combined result is an error.
#[test]
fn test_clearance_level_escalation() {
    let issuer_key = SigningKey::generate();
    let worker_key = SigningKey::generate();

    let issuer_warrant = Warrant::builder()
        .r#type(WarrantType::Issuer)
        .issuable_tools(vec!["read".to_string()])
        .clearance(Clearance::INTERNAL)
        .ttl(Duration::from_secs(3600))
        .holder(issuer_key.public_key())
        .build(&issuer_key)
        .unwrap();

    let attempt = issuer_warrant
        .issue_execution_warrant()
        .and_then(|exec_builder| {
            exec_builder
                .capability("read", ConstraintSet::new())
                .ttl(Duration::from_secs(3600))
                .clearance(Clearance::SYSTEM)
                .holder(worker_key.public_key())
                .build(&issuer_key)
        });

    let err = match attempt {
        Err(e) => e,
        Ok(_) => panic!("Clearance level should not exceed issuer's ceiling"),
    };
    println!("✅ Clearance ceiling violation blocked: {}", err);
}
/// Checks that attenuating a `Clearance::INTERNAL` warrant while requesting
/// `Clearance::PRIVILEGED` fails, and that the error text mentions either
/// "clearance cannot increase" or "monotonicity".
#[test]
fn test_clearance_escalation_via_attenuation() {
    let owner_key = SigningKey::generate();
    let delegate_key = SigningKey::generate();

    let base = Warrant::builder()
        .capability("read", ConstraintSet::new())
        .clearance(Clearance::INTERNAL)
        .ttl(Duration::from_secs(3600))
        .holder(owner_key.public_key())
        .build(&owner_key)
        .unwrap();

    let attempt = base
        .attenuate()
        .inherit_all()
        .clearance(Clearance::PRIVILEGED)
        .holder(delegate_key.public_key())
        .build(&owner_key);

    assert!(
        attempt.is_err(),
        "Clearance escalation via attenuation must be rejected"
    );

    let err = attempt.unwrap_err().to_string();
    let is_monotonicity_error =
        err.contains("clearance cannot increase") || err.contains("monotonicity");
    assert!(
        is_monotonicity_error,
        "Expected monotonicity error, got: {err}"
    );

    println!("✅ Clearance escalation via attenuation blocked: {err}");
}
/// Checks that a child cannot introduce a clearance when its parent has none.
///
/// The parent is built without calling `clearance(..)`; the test first
/// confirms `parent.clearance()` is `None`, then expects attenuation with
/// `Clearance::SYSTEM` to fail with a message mentioning either
/// "cannot introduce clearance" or "monotonicity".
#[test]
fn test_clearance_introduction_from_none_parent() {
    let owner_key = SigningKey::generate();
    let delegate_key = SigningKey::generate();

    let base = Warrant::builder()
        .capability("read", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(owner_key.public_key())
        .build(&owner_key)
        .unwrap();

    assert!(
        base.clearance().is_none(),
        "precondition: parent has no clearance"
    );

    let attempt = base
        .attenuate()
        .inherit_all()
        .clearance(Clearance::SYSTEM)
        .holder(delegate_key.public_key())
        .build(&owner_key);

    assert!(
        attempt.is_err(),
        "Introducing clearance when parent has none must be rejected"
    );

    let err = attempt.unwrap_err().to_string();
    let is_monotonicity_error =
        err.contains("cannot introduce clearance") || err.contains("monotonicity");
    assert!(
        is_monotonicity_error,
        "Expected monotonicity error, got: {err}"
    );

    println!("✅ Clearance introduction from unclearanced parent blocked: {err}");
}
/// Expects authorization to fail for a proof-of-possession signature made
/// over a challenge dated one hour in the future.
///
/// The challenge is assembled by hand as a CBOR-encoded tuple of
/// (warrant id, tool name, key-sorted args, window start), where the window
/// start is the future Unix timestamp rounded down to a multiple of 30.
/// NOTE(review): this presumably mirrors the library's own challenge layout
/// and window size — confirm against the PoP implementation.
#[test]
fn test_pop_future_timestamp() {
    const WINDOW_SECS: i64 = 30;

    let signer = SigningKey::generate();
    let warrant = Warrant::builder()
        .capability("transfer", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();

    let args: HashMap<String, ConstraintValue> =
        HashMap::from([("amount".to_string(), ConstraintValue::Integer(100))]);

    // Round the future timestamp down to the start of its window.
    let one_hour_ahead = Utc::now() + ChronoDuration::hours(1);
    let window_start = one_hour_ahead.timestamp() / WINDOW_SECS * WINDOW_SECS;

    let mut sorted_args: Vec<(&String, &ConstraintValue)> = args.iter().collect();
    sorted_args.sort_by(|left, right| left.0.cmp(right.0));

    let challenge = (
        warrant.id().to_string(),
        "transfer",
        &sorted_args,
        window_start,
    );
    let mut encoded_challenge = Vec::new();
    ciborium::ser::into_writer(&challenge, &mut encoded_challenge).unwrap();
    let future_sig = signer.sign(&encoded_challenge);

    let authorizer = Authorizer::new().with_trusted_root(signer.public_key());
    let outcome = authorizer.authorize_one(&warrant, "transfer", &args, Some(&future_sig), &[]);

    assert!(outcome.is_err(), "Future timestamp PoP should be rejected");
    let err = outcome.unwrap_err();
    println!("✅ Future timestamp PoP blocked: {}", err);
}
/// Expects authorization to fail for a proof-of-possession signature made
/// over a challenge dated three minutes in the past.
///
/// The challenge is assembled by hand as a CBOR-encoded tuple of
/// (warrant id, tool name, key-sorted args, window start), where the window
/// start is the past Unix timestamp rounded down to a multiple of 30.
/// NOTE(review): this presumably mirrors the library's own challenge layout
/// and window size — confirm against the PoP implementation.
#[test]
fn test_pop_old_timestamp_replay() {
    const WINDOW_SECS: i64 = 30;

    let signer = SigningKey::generate();
    let warrant = Warrant::builder()
        .capability("transfer", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();

    let args: HashMap<String, ConstraintValue> =
        HashMap::from([("amount".to_string(), ConstraintValue::Integer(100))]);

    // Round the stale timestamp down to the start of its window.
    let three_minutes_ago = Utc::now() - ChronoDuration::minutes(3);
    let window_start = three_minutes_ago.timestamp() / WINDOW_SECS * WINDOW_SECS;

    let mut sorted_args: Vec<(&String, &ConstraintValue)> = args.iter().collect();
    sorted_args.sort_by(|left, right| left.0.cmp(right.0));

    let challenge = (
        warrant.id().to_string(),
        "transfer",
        &sorted_args,
        window_start,
    );
    let mut encoded_challenge = Vec::new();
    ciborium::ser::into_writer(&challenge, &mut encoded_challenge).unwrap();
    let stale_sig = signer.sign(&encoded_challenge);

    let authorizer = Authorizer::new().with_trusted_root(signer.public_key());
    let outcome = authorizer.authorize_one(&warrant, "transfer", &args, Some(&stale_sig), &[]);

    assert!(
        outcome.is_err(),
        "Old timestamp PoP should be rejected (outside window)"
    );
    let err = outcome.unwrap_err();
    println!("✅ Old timestamp PoP replay blocked: {}", err);
}
/// Runs the same authorization from ten threads at once and requires the
/// outcomes to be unanimous.
///
/// NOTE(review): the assertion is satisfied by ten failures as well as ten
/// successes, so this test checks consistency only, not that a freshly signed
/// PoP is accepted.
#[test]
fn test_pop_concurrent_window_boundary() {
    use std::sync::Arc;
    use std::thread;

    let signer = SigningKey::generate();
    let warrant = Warrant::builder()
        .capability("transfer", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();

    let args: HashMap<String, ConstraintValue> =
        HashMap::from([("amount".to_string(), ConstraintValue::Integer(100))]);
    let authorizer = Arc::new(Authorizer::new().with_trusted_root(signer.public_key()));
    let sig = warrant.sign(&signer, "transfer", &args).unwrap();

    // Share the read-only inputs across worker threads.
    let warrant = Arc::new(warrant);
    let args = Arc::new(args);
    let sig = Arc::new(sig);

    // Spawn every worker before joining any of them.
    let workers: Vec<_> = (0..10)
        .map(|_| {
            let w = Arc::clone(&warrant);
            let a = Arc::clone(&args);
            let s = Arc::clone(&sig);
            let auth = Arc::clone(&authorizer);
            thread::spawn(move || auth.authorize_one(&w, "transfer", &a, Some(&s), &[]))
        })
        .collect();

    let outcomes: Vec<_> = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .collect();

    let successes = outcomes.iter().filter(|o| o.is_ok()).count();
    let failures = outcomes.iter().filter(|o| o.is_err()).count();

    assert!(
        successes == 10 || failures == 10,
        "Concurrent PoP verification should be consistent: {} success, {} failure",
        successes,
        failures
    );

    println!(
        "✅ Concurrent PoP verification is consistent ({} success, {} failure)",
        successes, failures
    );
}
/// Repeatedly attenuates a warrant until the library refuses, and fails if
/// `MAX_DELEGATION_DEPTH + 5` attenuations all succeed.
///
/// On the first refusal the error text must contain one of "depth", "chain",
/// "exceed" or "maximum".
#[test]
fn test_delegation_depth_limit() {
    const LIMIT_KEYWORDS: [&str; 4] = ["depth", "chain", "exceed", "maximum"];

    let signer = SigningKey::generate();
    let mut tip = Warrant::builder()
        .capability("read", ConstraintSet::new())
        .ttl(Duration::from_secs(36000))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();

    // Number of successful attenuations so far.
    let mut reached = 0;

    for step in 0..MAX_DELEGATION_DEPTH + 5 {
        let refusal = match tip.attenuate().inherit_all().build(&signer) {
            Ok(next) => {
                tip = next;
                reached = step + 1;
                continue;
            }
            Err(e) => e,
        };

        let err_str = refusal.to_string();
        assert!(
            LIMIT_KEYWORDS.iter().any(|word| err_str.contains(*word)),
            "Error should mention limit: {}",
            err_str
        );
        println!(
            "✅ Delegation limit enforced at iteration {}: {}",
            reached + 1,
            refusal
        );
        return;
    }

    panic!(
        "Should have hit MAX_DELEGATION_DEPTH ({}), but reached depth {}",
        MAX_DELEGATION_DEPTH, reached
    );
}
/// Checks that a child attenuated with `inherit_all()` from a "read"-only
/// parent exposes exactly the "read" capability, and that authorizing the
/// child for "write" fails.
#[test]
fn test_execution_warrant_tool_addition() {
    let signer = SigningKey::generate();

    let root = Warrant::builder()
        .capability("read", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();

    // Parent: exactly one capability, "read".
    let root_caps = root.capabilities().expect("Should have capabilities");
    assert!(
        root_caps.contains_key("read"),
        "Parent should have 'read' capability"
    );
    assert!(
        !root_caps.contains_key("write"),
        "Parent should NOT have 'write' capability"
    );
    assert_eq!(
        root_caps.len(),
        1,
        "Parent should have exactly 1 capability"
    );
    assert_eq!(root.tools(), vec!["read".to_string()]);

    // Child: same single capability after inheriting everything.
    let derived = root.attenuate().inherit_all().build(&signer).unwrap();
    let derived_caps = derived
        .capabilities()
        .expect("Child should have capabilities");
    assert!(
        derived_caps.contains_key("read"),
        "Child should have 'read' capability"
    );
    assert!(
        !derived_caps.contains_key("write"),
        "Child should NOT have 'write' capability"
    );
    assert_eq!(
        derived_caps.len(),
        1,
        "Child should have exactly 1 capability"
    );
    assert_eq!(derived.tools(), vec!["read".to_string()]);

    // A correctly signed request for a tool the warrant lacks must be denied.
    let no_args = HashMap::<String, ConstraintValue>::new();
    let write_sig = derived.sign(&signer, "write", &no_args).unwrap();
    let authorizer = Authorizer::new().with_trusted_root(signer.public_key());
    let outcome = authorizer.authorize_one(&derived, "write", &no_args, Some(&write_sig), &[]);
    assert!(
        outcome.is_err(),
        "Child should not have tools parent didn't have"
    );

    println!("✅ Tool addition prevented (capabilities map enforces monotonicity)");
}
/// Checks that a child attenuated with `inherit_all()` from an issuer warrant
/// whose only issuable tool is "read" still lists exactly "read" and does not
/// list "write".
#[test]
fn test_issuer_warrant_tool_addition() {
    let signer = SigningKey::generate();

    let root = Warrant::builder()
        .r#type(WarrantType::Issuer)
        .issuable_tools(vec!["read".to_string()])
        .clearance(Clearance::INTERNAL)
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();

    let read = "read".to_string();
    let write = "write".to_string();

    // Parent side.
    let root_tools = root.issuable_tools().expect("Should have issuable_tools");
    assert_eq!(
        root_tools.len(),
        1,
        "Parent should have exactly 1 issuable tool"
    );
    assert!(
        root_tools.contains(&read),
        "Parent should have 'read' as issuable"
    );

    // Child side.
    let derived = root.attenuate().inherit_all().build(&signer).unwrap();
    let derived_tools = derived
        .issuable_tools()
        .expect("Child should have issuable_tools");
    assert_eq!(
        derived_tools.len(),
        1,
        "Child should have exactly 1 issuable tool"
    );
    assert!(
        derived_tools.contains(&read),
        "Child should have 'read' as issuable"
    );
    assert!(
        !derived_tools.contains(&write),
        "Child should NOT have 'write' as issuable"
    );

    println!("✅ Issuable tool addition prevented (monotonic attenuation)");
}
/// Exercises a capability declared with an empty `ConstraintSet`.
///
/// Expectations checked, in order:
/// 1. "ping" is authorized with arbitrary arguments.
/// 2. "ping" is authorized with no arguments.
/// 3. "pong", a tool the warrant does not list, is denied.
/// 4. After an encode/decode round trip the "ping" entry still exists and its
///    constraint set is still empty.
#[test]
fn test_empty_capabilities_semantics() {
    let signer = SigningKey::generate();
    let warrant = Warrant::builder()
        .capability("ping", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();
    let authorizer = Authorizer::new().with_trusted_root(signer.public_key());

    // 1. Arbitrary arguments.
    let arbitrary_args: HashMap<String, ConstraintValue> = HashMap::from([
        (
            "any".to_string(),
            ConstraintValue::String("thing".to_string()),
        ),
        ("foo".to_string(), ConstraintValue::Integer(42)),
    ]);
    let arbitrary_sig = warrant.sign(&signer, "ping", &arbitrary_args).unwrap();
    let with_args =
        authorizer.authorize_one(&warrant, "ping", &arbitrary_args, Some(&arbitrary_sig), &[]);
    assert!(
        with_args.is_ok(),
        "Empty constraints should ALLOW any arguments: {:?}",
        with_args
    );
    println!("✅ Empty constraints ({{}}) = ALLOWED for any args");

    // 2. No arguments at all.
    let no_args = HashMap::<String, ConstraintValue>::new();
    let no_args_sig = warrant.sign(&signer, "ping", &no_args).unwrap();
    let without_args =
        authorizer.authorize_one(&warrant, "ping", &no_args, Some(&no_args_sig), &[]);
    assert!(
        without_args.is_ok(),
        "Empty constraints should ALLOW empty args too: {:?}",
        without_args
    );
    println!("✅ Empty constraints ({{}}) = ALLOWED for empty args");

    // 3. A tool that is not in the warrant.
    let pong_sig = warrant.sign(&signer, "pong", &no_args).unwrap();
    let unknown_tool = authorizer.authorize_one(&warrant, "pong", &no_args, Some(&pong_sig), &[]);
    assert!(unknown_tool.is_err(), "Missing tool should be DENIED");
    println!("✅ Missing tool = DENIED");

    // 4. Serialization round trip keeps the empty constraint set.
    let encoded = wire::encode(&warrant).unwrap();
    let reparsed: Warrant = wire::decode(&encoded).unwrap();
    let reparsed_caps = reparsed.capabilities().expect("Should have capabilities");
    assert!(
        reparsed_caps.contains_key("ping"),
        "ping should exist after deserialization"
    );
    let ping_rules = reparsed_caps.get("ping").unwrap();
    assert!(
        ping_rules.is_empty(),
        "ping constraints should remain empty after deserialization"
    );
    println!("✅ Empty constraints preserved through serialization round-trip");
}
/// Checks that a proof-of-possession signed by a key other than the warrant's
/// holder is rejected, and that the error text mentions "holder" or
/// "signature".
#[test]
fn test_holder_mismatch_pop_fails() {
    let issuer_key = SigningKey::generate();
    let holder_key = SigningKey::generate();
    let attacker_key = SigningKey::generate();

    // Issued by `issuer_key`, bound to `holder_key`.
    let warrant = Warrant::builder()
        .capability("transfer", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(holder_key.public_key())
        .build(&issuer_key)
        .unwrap();

    let args: HashMap<String, ConstraintValue> =
        HashMap::from([("amount".to_string(), ConstraintValue::Integer(100))]);

    // The attacker signs the request with their own key.
    let forged_sig = warrant.sign(&attacker_key, "transfer", &args).unwrap();

    let authorizer = Authorizer::new().with_trusted_root(issuer_key.public_key());
    let outcome = authorizer.authorize_one(&warrant, "transfer", &args, Some(&forged_sig), &[]);
    assert!(
        outcome.is_err(),
        "Warrant should reject PoP from wrong keypair"
    );

    let err = outcome.unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("holder") || rendered.contains("signature"),
        "Error should mention holder mismatch: {}",
        err
    );

    println!("✅ Holder mismatch blocked: {}", err);
}
/// Checks that building a warrant fails when a constraint is wrapped in 40
/// nested `All` layers, and that the error text mentions "depth" or
/// "recursion".
#[test]
fn test_constraint_depth_dos() {
    // Start from a leaf and wrap it in `All` forty times.
    let deeply_nested = (0..40).fold(Constraint::Exact(Exact::new("value")), |inner, _| {
        Constraint::All(All::new(vec![inner]))
    });

    let signer = SigningKey::generate();
    let mut rules = ConstraintSet::new();
    rules.insert("key", deeply_nested);

    let attempt = Warrant::builder()
        .capability("test", rules)
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer);
    assert!(
        attempt.is_err(),
        "Deeply nested constraints should be rejected"
    );

    let err = attempt.unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("depth") || rendered.contains("recursion"),
        "Error should mention depth limit: {}",
        err
    );

    println!("✅ Constraint depth DoS blocked: {}", err);
}
/// Checks two ways of rejecting a constraint wrapped in 40 nested `All`
/// layers: `Warrant` building must fail, and `ConstraintSet::validate_depth()`
/// must return an error.
///
/// NOTE(review): nothing is deserialized here despite the test name; both
/// checks run on in-memory values.
#[test]
fn test_constraint_depth_deserialization_limit() {
    let deeply_nested = (0..40).fold(Constraint::Exact(Exact::new("value")), |inner, _| {
        Constraint::All(All::new(vec![inner]))
    });
    let signer = SigningKey::generate();

    // Path 1: the warrant builder.
    let mut builder_rules = ConstraintSet::new();
    builder_rules.insert("deep", deeply_nested.clone());
    let attempt = Warrant::builder()
        .capability("test", builder_rules)
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer);
    let err = match attempt {
        Err(e) => e,
        Ok(_) => panic!("Deeply nested constraint should fail at build"),
    };
    println!("✅ Constraint depth limit enforced at build: {}", err);

    // Path 2: direct validation of the constraint set.
    let mut standalone_rules = ConstraintSet::new();
    standalone_rules.insert("test", deeply_nested);
    let depth_check = standalone_rules.validate_depth();
    assert!(
        depth_check.is_err(),
        "ConstraintSet.validate_depth() should catch deep nesting"
    );
    println!("✅ ConstraintSet.validate_depth() catches deep nesting");
}
/// Tries to build a warrant with 2000 capabilities (`tool_0` … `tool_1999`).
///
/// Either outcome is accepted: if the build succeeds, the encoded warrant
/// must not exceed `tenuo::MAX_WARRANT_SIZE`; if the build fails, the error
/// is only printed.
#[test]
fn test_warrant_size_limit() {
    let signer = SigningKey::generate();

    // Thread the builder through one `capability` call per generated tool.
    let loaded_builder = (0..2000).map(|i| format!("tool_{}", i)).fold(
        Warrant::builder().ttl(Duration::from_secs(3600)),
        |builder, tool| builder.capability(tool, ConstraintSet::new()),
    );

    match loaded_builder.holder(signer.public_key()).build(&signer) {
        Err(e) => {
            println!("✅ Large warrant blocked at build time: {}", e);
        }
        Ok(warrant) => {
            let encoded = wire::encode(&warrant).unwrap();
            let tool_count = warrant.tools().len();
            assert!(
                encoded.len() <= tenuo::MAX_WARRANT_SIZE,
                "Warrant size {} exceeds MAX_WARRANT_SIZE {}",
                encoded.len(),
                tenuo::MAX_WARRANT_SIZE
            );
            println!(
                "✅ Large warrant under size limit ({} tools, {} bytes, max {})",
                tool_count,
                encoded.len(),
                tenuo::MAX_WARRANT_SIZE
            );
        }
    }
}
/// Happy-path check for a two-link chain: the child carries a parent hash,
/// `verify_chain(&[parent, child])` succeeds with the parent's key trusted,
/// and the child authorizes "read" when its own holder signs the request.
#[test]
fn test_child_warrant_with_parent_hash() {
    let root_key = SigningKey::generate();
    let delegate_key = SigningKey::generate();

    let root = Warrant::builder()
        .capability("read", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(root_key.public_key())
        .build(&root_key)
        .unwrap();
    let delegated = root
        .attenuate()
        .inherit_all()
        .holder(delegate_key.public_key())
        .build(&root_key)
        .unwrap();

    assert!(
        delegated.parent_hash().is_some(),
        "Child should have parent_hash"
    );

    let authorizer = Authorizer::new().with_trusted_root(root_key.public_key());

    // Chain is supplied parent-first.
    let chain_check = authorizer.verify_chain(&[root.clone(), delegated.clone()]);
    assert!(
        chain_check.is_ok(),
        "Chain verification should work with WarrantStack: {:?}",
        chain_check
    );

    // The delegate, as holder of the child, signs the request.
    let no_args = HashMap::<String, ConstraintValue>::new();
    let read_sig = delegated.sign(&delegate_key, "read", &no_args).unwrap();
    let auth_check = authorizer.authorize_one(&delegated, "read", &no_args, Some(&read_sig), &[]);
    assert!(
        auth_check.is_ok(),
        "Authorization should work on child warrant: {:?}",
        auth_check
    );

    println!("✅ Chain verification works with WarrantStack");
    println!(" (parent_hash links child to parent, verify_chain traces ancestry)");
}
/// Expects `DataPlane::verify_chain` to fail when the chain is supplied
/// child-first.
///
/// NOTE(review): the `DataPlane` is created with `DataPlane::new()` and no
/// trusted root is configured in this test, so a failure could stem from that
/// rather than from the ordering — confirm which check actually fires.
#[test]
fn test_chain_wrong_order() {
    let root_key = SigningKey::generate();
    let delegate_key = SigningKey::generate();

    let root = Warrant::builder()
        .capability("read", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(root_key.public_key())
        .build(&root_key)
        .unwrap();
    let delegated = root
        .attenuate()
        .inherit_all()
        .holder(delegate_key.public_key())
        .build(&root_key)
        .unwrap();

    // Deliberately reversed: child first, then parent.
    let reversed_chain = [delegated.clone(), root.clone()];
    let outcome = DataPlane::new().verify_chain(&reversed_chain);

    let err = match outcome {
        Err(e) => e,
        Ok(_) => panic!("Chain in wrong order should fail verification"),
    };
    println!("✅ Wrong chain order blocked: {}", err);
}
/// Checks that a proof-of-possession signed for one argument set is rejected
/// when presented with a different argument set for the same tool.
#[test]
fn test_pop_args_binding() {
    let signer = SigningKey::generate();
    let warrant = Warrant::builder()
        .capability("read_file", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();
    let authorizer = Authorizer::new().with_trusted_root(signer.public_key());

    // Signature is produced over the benign arguments…
    let benign_args: HashMap<String, ConstraintValue> = HashMap::from([(
        "file".to_string(),
        ConstraintValue::String("safe.txt".to_string()),
    )]);
    let benign_sig = warrant.sign(&signer, "read_file", &benign_args).unwrap();

    // …but presented alongside different ones.
    let swapped_args: HashMap<String, ConstraintValue> = HashMap::from([(
        "file".to_string(),
        ConstraintValue::String("/etc/passwd".to_string()),
    )]);
    let outcome =
        authorizer.authorize_one(&warrant, "read_file", &swapped_args, Some(&benign_sig), &[]);

    assert!(
        outcome.is_err(),
        "PoP signature should not verify for different args"
    );
    let err = outcome.unwrap_err();
    println!("✅ PoP args swap blocked: {}", err);
}
/// Checks that a proof-of-possession signed for "read" is rejected when used
/// to authorize "write", even though the warrant grants both tools.
#[test]
fn test_pop_tool_binding() {
    let signer = SigningKey::generate();
    let warrant = Warrant::builder()
        .capability("read", ConstraintSet::new())
        .capability("write", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();
    let authorizer = Authorizer::new().with_trusted_root(signer.public_key());

    let args: HashMap<String, ConstraintValue> = HashMap::from([(
        "file".to_string(),
        ConstraintValue::String("test.txt".to_string()),
    )]);

    // Sign for one tool, present for the other.
    let read_sig = warrant.sign(&signer, "read", &args).unwrap();
    let outcome = authorizer.authorize_one(&warrant, "write", &args, Some(&read_sig), &[]);

    assert!(
        outcome.is_err(),
        "PoP signature should not verify for different tool"
    );
    let err = outcome.unwrap_err();
    println!("✅ PoP tool swap blocked: {}", err);
}
/// Checks that a child attenuated with `inherit_all()` reports the same
/// clearance (`Clearance::INTERNAL`) as its parent.
#[test]
fn test_clearance_amplification() {
    let signer = SigningKey::generate();

    let root = Warrant::builder()
        .capability("query", ConstraintSet::new())
        .clearance(Clearance::INTERNAL)
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();
    let derived = root.attenuate().inherit_all().build(&signer).unwrap();

    let expected = Some(Clearance::INTERNAL);
    assert_eq!(root.clearance(), expected);
    assert_eq!(derived.clearance(), expected);

    println!("✅ Trust level amplification prevented (monotonic attenuation)");
}
/// Checks that a warrant built with `max_depth(1)` can be attenuated once
/// (yielding a child at depth 1) but that the child cannot be attenuated
/// again; the error text must mention "depth" or "terminal".
#[test]
fn test_terminal_warrant_delegation() {
    let signer = SigningKey::generate();

    let root = Warrant::builder()
        .capability("read", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .max_depth(1)
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();

    // First hop is allowed.
    let terminal = root.attenuate().inherit_all().build(&signer).unwrap();
    assert_eq!(terminal.depth(), 1);

    // Second hop must be refused.
    let second_hop = terminal.attenuate().inherit_all().build(&signer);
    assert!(second_hop.is_err(), "Terminal warrant should not delegate");

    let err = second_hop.unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("depth") || rendered.contains("terminal"),
        "Error should mention depth limit: {}",
        err
    );

    println!("✅ Terminal warrant delegation blocked: {}", err);
}
/// Round-trips a warrant carrying a `OneOf` constraint through
/// `wire::encode` / `wire::decode` and requires the re-encoded bytes to match
/// the first encoding.
#[test]
fn test_non_deterministic_cbor() {
    let signer = SigningKey::generate();

    let mut rules = ConstraintSet::new();
    rules.insert("key", OneOf::new(vec!["a", "b", "c"]));

    let original = Warrant::builder()
        .capability("read", rules)
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();

    let first_pass = wire::encode(&original).unwrap();
    let reparsed = wire::decode(&first_pass).unwrap();
    let second_pass = wire::encode(&reparsed).unwrap();

    assert_eq!(
        first_pass, second_pass,
        "Serialization should be deterministic (round-trip)"
    );

    println!("✅ CBOR serialization is deterministic");
}
/// Expects `DataPlane::verify_chain` to fail for a chain that pairs a root
/// warrant from one key with a child attenuated from a different root.
///
/// NOTE(review): the `DataPlane` is created with `DataPlane::new()` and no
/// trusted root is configured in this test, so a failure could stem from that
/// rather than from the mismatched linkage — confirm which check fires.
#[test]
fn test_mixed_chain_attack() {
    let first_root_key = SigningKey::generate();
    let second_root_key = SigningKey::generate();
    let delegate_key = SigningKey::generate();

    // Two unrelated self-issued roots.
    let first_root = Warrant::builder()
        .capability("read", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(first_root_key.public_key())
        .build(&first_root_key)
        .unwrap();
    let second_root = Warrant::builder()
        .capability("write", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(second_root_key.public_key())
        .build(&second_root_key)
        .unwrap();

    // The child belongs to the first root only.
    let first_child = first_root
        .attenuate()
        .inherit_all()
        .holder(delegate_key.public_key())
        .build(&first_root_key)
        .unwrap();

    // Splice the child under the wrong parent.
    let spliced_chain = [second_root, first_child];
    let outcome = DataPlane::new().verify_chain(&spliced_chain);

    assert!(outcome.is_err(), "Mixed chains should fail verification");
    let err = outcome.unwrap_err();
    println!("✅ Mixed chain attack blocked: {}", err);
}
/// Inserts constraint keys in non-alphabetical order, then checks that an
/// encode → decode → encode round trip is byte-identical and that the decoded
/// warrant's signature still verifies.
#[test]
fn test_cbor_canonical_map_key_ordering() {
    let signer = SigningKey::generate();

    // Insertion order is intentionally z, a, m.
    let mut rules = ConstraintSet::new();
    rules.insert("z_last", Exact::new("value"));
    rules.insert("a_first", Exact::new("value"));
    rules.insert("m_middle", Exact::new("value"));

    let original = Warrant::builder()
        .capability("test", rules)
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();

    let first_pass = wire::encode(&original).unwrap();
    let reparsed = wire::decode(&first_pass).unwrap();
    let second_pass = wire::encode(&reparsed).unwrap();

    assert_eq!(
        first_pass, second_pass,
        "Re-serialization must be byte-identical (deterministic CBOR)"
    );
    println!("✅ CBOR serialization is deterministic (sorted map keys)");

    assert!(reparsed.verify_signature().is_ok());
    println!("✅ Signature verifies after round-trip (canonical bytes preserved)");
}
/// Builds a warrant with three tools and per-tool constraints, all inserted
/// in non-alphabetical order, then checks that:
/// - all three tools are present before and after a wire round trip,
/// - encode → decode → encode is byte-identical,
/// - `z_tool` keeps its three constraints, and
/// - the decoded warrant's signature verifies.
#[test]
fn test_cbor_canonical_nested_map_ordering() {
    let signer = SigningKey::generate();

    // Inner keys inserted as z, m, a.
    let mut z_rules = ConstraintSet::new();
    z_rules.insert("z_arg", Pattern::new("*").unwrap());
    z_rules.insert("m_arg", Pattern::new("*").unwrap());
    z_rules.insert("a_arg", Pattern::new("*").unwrap());

    let a_rules = ConstraintSet::new();

    let mut m_rules = ConstraintSet::new();
    m_rules.insert("single", Exact::new("value"));

    // Outer keys registered as z, a, m.
    let original = Warrant::builder()
        .capability("z_tool", z_rules)
        .capability("a_tool", a_rules)
        .capability("m_tool", m_rules)
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();

    let built_caps = original.capabilities().expect("Should have capabilities");
    assert_eq!(built_caps.len(), 3, "Should have 3 tools");
    assert!(built_caps.contains_key("a_tool"), "Should have a_tool");
    assert!(built_caps.contains_key("m_tool"), "Should have m_tool");
    assert!(built_caps.contains_key("z_tool"), "Should have z_tool");

    let first_pass = wire::encode(&original).unwrap();
    let reparsed = wire::decode(&first_pass).expect("Deserialization should succeed with BTreeMap");
    let second_pass = wire::encode(&reparsed).unwrap();
    assert_eq!(
        first_pass, second_pass,
        "Canonicalization MUST enforce sorting at ALL nesting levels"
    );

    let reparsed_caps = reparsed.capabilities().expect("Should have capabilities");
    assert!(reparsed_caps.contains_key("a_tool"), "Should have a_tool");
    assert!(reparsed_caps.contains_key("m_tool"), "Should have m_tool");
    assert!(reparsed_caps.contains_key("z_tool"), "Should have z_tool");
    assert_eq!(reparsed_caps.len(), 3, "Should have exactly 3 tools");

    let z_after = reparsed_caps.get("z_tool").unwrap();
    assert_eq!(z_after.len(), 3, "z_tool should have 3 constraints");

    assert!(reparsed.verify_signature().is_ok());

    println!("✅ Nested map ordering is deterministic (capabilities sorted at all levels)");
    println!(" Tools: a_tool < m_tool < z_tool");
    println!(" Args in z_tool: a_arg < m_arg < z_arg");
}
/// Presents a validly self-signed warrant from an attacker key to an
/// `Authorizer` that trusts only a different key, and reports the outcome.
///
/// NOTE(review): this test cannot fail on the authorization result — an
/// accepted warrant only prints a warning. If untrusted roots are meant to be
/// rejected, the `Ok` branch should probably become an assertion failure;
/// confirm the intended `Authorizer` behaviour first.
#[test]
fn test_untrusted_root_rejection() {
    let trusted_key = SigningKey::generate();
    let attacker_key = SigningKey::generate();

    let rogue_warrant = Warrant::builder()
        .capability("admin", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(attacker_key.public_key())
        .build(&attacker_key)
        .unwrap();

    // The warrant is internally consistent; only its root is untrusted.
    assert!(rogue_warrant.verify_signature().is_ok());

    let authorizer = Authorizer::new().with_trusted_root(trusted_key.public_key());
    let no_args = HashMap::<String, ConstraintValue>::new();
    let rogue_sig = rogue_warrant.sign(&attacker_key, "admin", &no_args).unwrap();
    let outcome =
        authorizer.authorize_one(&rogue_warrant, "admin", &no_args, Some(&rogue_sig), &[]);

    if let Err(e) = outcome {
        println!("✅ Untrusted root rejected: {}", e);
    } else {
        println!(
            "⚠️ Untrusted root was accepted (check Authorizer.authorize root trust logic)"
        );
        println!(" This might be expected if the warrant is self-signed and has no chain");
        println!(" Applications MUST configure trusted_roots in production");
    }
}
/// Checks that a warrant self-issued by a new key authorizes successfully
/// once that key is added via `add_trusted_root`.
///
/// The authorization attempt made before the key is trusted is informational
/// only: its outcome is printed, not asserted.
#[test]
fn test_dynamic_trusted_root_addition() {
    let initial_root_key = SigningKey::generate();
    let late_root_key = SigningKey::generate();

    let mut authorizer = Authorizer::new().with_trusted_root(initial_root_key.public_key());

    let warrant = Warrant::builder()
        .capability("test", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(late_root_key.public_key())
        .build(&late_root_key)
        .unwrap();

    let no_args = HashMap::<String, ConstraintValue>::new();
    let sig = warrant.sign(&late_root_key, "test", &no_args).unwrap();

    // Before: report whichever way the authorizer decides.
    let before = authorizer.authorize_one(&warrant, "test", &no_args, Some(&sig), &[]);
    match before.err() {
        None => {
            println!("⚠️ Warrant accepted before root trusted (self-signed root verification)");
            println!(" Note: Root trust enforcement depends on Authorizer configuration");
        }
        rejection => {
            println!(
                "✅ Warrant rejected before root trusted: {:?}",
                rejection
            );
        }
    }

    // After: the same request must succeed.
    authorizer.add_trusted_root(late_root_key.public_key());
    let after = authorizer.authorize_one(&warrant, "test", &no_args, Some(&sig), &[]);
    assert!(after.is_ok(), "Warrant should verify after root added");

    println!("✅ Dynamic trusted root addition works (root can be added at runtime)");
}
/// Checks that a path whose first letter is the lookalike character `ⅆ`
/// instead of ASCII `d` is not authorized by the pattern `/data/*`.
#[test]
fn test_unicode_lookalike_bypass() {
    let signer = SigningKey::generate();

    let mut path_rules = ConstraintSet::new();
    path_rules.insert("path", Pattern::new("/data/*").unwrap());

    let warrant = Warrant::builder()
        .capability("read", path_rules)
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();
    let authorizer = Authorizer::new().with_trusted_root(signer.public_key());

    // "ⅆata" renders like "data" but is a different code point sequence.
    let lookalike_args: HashMap<String, ConstraintValue> = HashMap::from([(
        "path".to_string(),
        ConstraintValue::String("/ⅆata/passwd".to_string()),
    )]);
    let sig = warrant.sign(&signer, "read", &lookalike_args).unwrap();
    let outcome = authorizer.authorize_one(&warrant, "read", &lookalike_args, Some(&sig), &[]);

    assert!(
        outcome.is_err(),
        "Unicode lookalike should not match /data/*"
    );

    println!("✅ Unicode lookalike blocked (byte-wise matching)");
}
/// Checks that `Staging-web` (capital S) is not authorized by the pattern
/// `staging-*`.
#[test]
fn test_case_sensitivity_bypass() {
    let signer = SigningKey::generate();

    let mut cluster_rules = ConstraintSet::new();
    cluster_rules.insert("cluster", Pattern::new("staging-*").unwrap());

    let warrant = Warrant::builder()
        .capability("deploy", cluster_rules)
        .ttl(Duration::from_secs(3600))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();
    let authorizer = Authorizer::new().with_trusted_root(signer.public_key());

    // Differs from the pattern prefix only in letter case.
    let cased_args: HashMap<String, ConstraintValue> = HashMap::from([(
        "cluster".to_string(),
        ConstraintValue::String("Staging-web".to_string()),
    )]);
    let sig = warrant.sign(&signer, "deploy", &cased_args).unwrap();
    let outcome = authorizer.authorize_one(&warrant, "deploy", &cased_args, Some(&sig), &[]);

    assert!(outcome.is_err(), "Case variation should not match pattern");

    println!("✅ Case variation blocked (case-sensitive matching)");
}
/// Prints an overview banner listing the attack categories covered by this
/// red-team suite, followed by the command for running it with output shown.
///
/// This test asserts nothing; it only writes to stdout.
// NOTE(review): the `000` in the name presumably makes it sort first in test
// listings — confirm that is the intent.
#[test]
fn test_000_red_team_summary() {
    // One entry per printed line. An empty entry yields a blank line.
    const BANNER: &[&str] = &[
        "\n╔══════════════════════════════════════════════════════════════╗",
        "║ Tenuo Red Team Test Suite - Binary-Level Security Tests ║",
        "╠══════════════════════════════════════════════════════════════╣",
        "║ ║",
        "║ ChainLink Tampering: ║",
        "║ • issuer_tools modification ║",
        "║ • issuer_constraints modification ║",
        "║ • issuer_expires_at extension ║",
        "║ ║",
        "║ CBOR Payload Tampering: ║",
        "║ • payload vs payload_bytes mismatch ║",
        "║ • Extra fields injection ║",
        "║ • Constraint removal ║",
        "║ • Non-deterministic encoding ║",
        "║ ║",
        "║ Signature Attacks: ║",
        "║ • Signature reuse across warrants ║",
        "║ • Untrusted root acceptance ║",
        "║ ║",
        "║ PoP Binding: ║",
        "║ • Tool swap (sign for A, use for B) ║",
        "║ • Args swap (sign for args A, use for args B) ║",
        "║ • Holder mismatch (stolen warrant) ║",
        "║ • Future/old timestamp exploitation ║",
        "║ ║",
        "║ Delegation Limits: ║",
        "║ • MAX_DELEGATION_DEPTH (16) enforcement ║",
        "║ • Terminal warrant delegation ║",
        "║ ║",
        "║ Monotonicity: ║",
        "║ • Tool addition ║",
        "║ • Trust level amplification ║",
        "║ • Constraint type substitution ║",
        "║ ║",
        "║ Chain Verification: ║",
        "║ • Wrong order (child before parent) ║",
        "║ • Mixed chains from different roots ║",
        "║ • Orphaned child warrants ║",
        "║ ║",
        "║ Constraint Bypasses: ║",
        "║ • Unicode lookalike characters ║",
        "║ • Case variation ║",
        "║ • Constraint depth DoS ║",
        "║ • Warrant size DoS ║",
        "║ ║",
        "║ Parser & Protocol Attacks: ║",
        "║ • CBOR duplicate key injection ║",
        "║ • Unknown field injection ║",
        "║ • TTL bypass (time traveler) ║",
        "║ • ReDoS via regex constraints ║",
        "║ • Type confusion (NaN, string to Range) ║",
        "║ • Missing chain link ║",
        "║ • Shuffled chain order ║",
        "║ ║",
        "╚══════════════════════════════════════════════════════════════╝",
        "",
        "Run all tests: cargo test --test red_team -- --nocapture",
        "",
    ];

    for line in BANNER {
        println!("{}", line);
    }
}
/// Probes how `ciborium` handles a CBOR map that repeats the same key.
///
/// A hand-assembled map declaring two entries, both keyed by the integer `3`,
/// is decoded into a `BTreeMap<u8, ciborium::Value>`. The test only reports
/// which way the decoder behaves (accept or reject); it asserts nothing.
#[test]
fn test_attack_cbor_duplicate_key_injection() {
    println!("\n--- Attack: CBOR Duplicate Key Injection ---");

    let duplicate_payload: [u8; 12] = [
        0xA2, // map with two key/value pairs
        0x03, 0xA0, // key 3 -> empty map
        0x03, // key 3 again
        0xA1, 0x65, b'a', b'd', b'm', b'i', b'n', 0xA0, // -> { "admin": {} }
    ];

    let decoded: Result<BTreeMap<u8, ciborium::Value>, _> =
        ciborium::de::from_reader(&duplicate_payload[..]);

    let map = match decoded {
        Ok(map) => map,
        Err(e) => {
            println!(" [PASS] Duplicate keys rejected: {}", e);
            return;
        }
    };

    println!(" [WARNING] ciborium accepted duplicate keys!");
    println!(" Map contents: {:?}", map);
    println!(" [MITIGATION] BTreeMap keeps last value; signature prevents tampering");
}
/// Feeds the payload decoder a CBOR map containing only an unrecognised key.
///
/// The map `{ 99: true }` is serialized with `ciborium` and then decoded as a
/// `tenuo::payload::WarrantPayload`. The outcome is reported either way; the
/// test asserts nothing beyond successful serialization of the probe bytes.
#[test]
fn test_attack_cbor_unknown_field_trojan() {
    println!("\n--- Attack: CBOR Unknown Field Trojan ---");

    // Serialize a single-entry map whose only key is the integer 99.
    let stray_map = ciborium::Value::Map(vec![(
        ciborium::Value::Integer(99.into()),
        ciborium::Value::Bool(true),
    )]);
    let mut payload_with_unknown: Vec<u8> = Vec::new();
    ciborium::ser::into_writer(&stray_map, &mut payload_with_unknown).unwrap();

    let decode_result: Result<tenuo::payload::WarrantPayload, _> =
        ciborium::de::from_reader(&payload_with_unknown[..]);

    if let Err(e) = decode_result {
        println!(" [PASS] Invalid payload rejected: {}", e);
        println!(" [INFO] Defense: signature binding prevents field injection");
    } else {
        println!(" [INFO] Minimal payload decoded (missing required fields)");
    }

    println!(" [MITIGATION] Signature covers original bytes - tampering detected");
}
/// Red-team check: a warrant with an absurdly long TTL must be refused.
///
/// First requests a TTL of 1000 × 365 days and requires the builder to fail
/// with an error mentioning "exceeds protocol maximum". Then confirms that a
/// TTL of exactly `MAX_WARRANT_TTL_SECS` is still accepted.
#[test]
fn test_attack_ttl_time_traveler() {
    println!("\n--- Attack: TTL Time Traveler ---");
    let signer = SigningKey::generate();

    // 1000 (365-day) years expressed in seconds.
    let thousand_years_secs: u64 = 1000 * 365 * 24 * 60 * 60;
    let oversized = Warrant::builder()
        .capability("test", ConstraintSet::new())
        .ttl(Duration::from_secs(thousand_years_secs))
        .holder(signer.public_key())
        .build(&signer);

    let rejection = match oversized {
        Err(e) => e,
        Ok(warrant) => {
            println!(
                " [FAIL] Created warrant expiring at: {}",
                warrant.expires_at()
            );
            panic!("Excessive TTL should have been rejected");
        }
    };
    println!(" [PASS] Excessive TTL rejected: {}", rejection);
    assert!(rejection.to_string().contains("exceeds protocol maximum"));

    // The boundary value itself must remain usable.
    let at_limit = Warrant::builder()
        .capability("test", ConstraintSet::new())
        .ttl(Duration::from_secs(MAX_WARRANT_TTL_SECS))
        .holder(signer.public_key())
        .build(&signer);
    assert!(at_limit.is_ok(), "90-day TTL should be accepted");
    println!(" [PASS] Protocol max TTL (90 days) accepted");
}
/// Red-team check: a pathological regex constraint must not stall authorization.
///
/// Uses the nested-quantifier pattern `(a+)+$` against a run of `a`s ending in
/// `X`. The authorization result itself is ignored; the test only fails if
/// building the arguments, signing, and authorizing together exceed one second.
#[test]
fn test_attack_redos_resistance() {
    println!("\n--- Attack: ReDoS Resistance ---");
    let pathological_pattern = "(a+)+$";
    let pathological_input = "aaaaaaaaaaaaaaaaaaaaaaaaaX";

    let signer = SigningKey::generate();
    let mut regex_rules = ConstraintSet::new();
    regex_rules.insert("data", RegexConstraint::new(pathological_pattern).unwrap());

    let warrant = Warrant::builder()
        .capability("process", regex_rules)
        .ttl(Duration::from_secs(300))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();
    let authorizer = Authorizer::new().with_trusted_root(signer.public_key());

    // The clock starts before argument construction and signing, so those are
    // included in the measured time as well.
    let budget = Duration::from_secs(1);
    let started = std::time::Instant::now();

    let args: HashMap<String, ConstraintValue> = [(
        "data".to_string(),
        ConstraintValue::String(pathological_input.to_string()),
    )]
    .into_iter()
    .collect();
    let signature = warrant.sign(&signer, "process", &args).unwrap();
    let _ = authorizer.authorize_one(&warrant, "process", &args, Some(&signature), &[]);

    let elapsed = started.elapsed();
    if elapsed > budget {
        println!(" [FAIL] Regex check took {:?} - ReDoS!", elapsed);
        panic!("ReDoS vulnerability detected");
    }

    println!(" [PASS] Regex check completed in {:?}", elapsed);
    println!(" [INFO] Rust regex crate uses Thompson NFA - ReDoS resistant");
}
/// Red-team check: a string value must not satisfy a numeric `Range` constraint.
///
/// The warrant limits `amount` to the range 0.0..=100.0 for `transfer`; the
/// request passes the string `"not a number"`. Authorization must fail.
#[test]
fn test_attack_type_confusion_range_string() {
    println!("\n--- Attack: Type Confusion (Range + String) ---");
    let signer = SigningKey::generate();

    let mut amount_rules = ConstraintSet::new();
    amount_rules.insert("amount", Range::new(Some(0.0), Some(100.0)).unwrap());

    let warrant = Warrant::builder()
        .capability("transfer", amount_rules)
        .ttl(Duration::from_secs(300))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();
    let authorizer = Authorizer::new().with_trusted_root(signer.public_key());

    // Supply a string where the constraint expects a number.
    let args: HashMap<String, ConstraintValue> = [(
        "amount".to_string(),
        ConstraintValue::String("not a number".to_string()),
    )]
    .into_iter()
    .collect();

    let signature = warrant.sign(&signer, "transfer", &args).unwrap();
    let outcome = authorizer.authorize_one(&warrant, "transfer", &args, Some(&signature), &[]);

    if let Err(e) = outcome {
        println!(" [PASS] Type mismatch rejected: {}", e);
    } else {
        println!(" [FAIL] Type mismatch was accepted!");
        panic!("Type confusion vulnerability");
    }
}
/// Red-team check: `NaN` must not slip through a numeric `Range` constraint.
///
/// The warrant limits `value` to the range 0.0..=100.0 for `process`; the
/// request passes `f64::NAN`. Authorization must fail.
#[test]
fn test_attack_type_confusion_nan() {
    println!("\n--- Attack: Type Confusion (NaN) ---");
    let signer = SigningKey::generate();

    let mut value_rules = ConstraintSet::new();
    value_rules.insert("value", Range::new(Some(0.0), Some(100.0)).unwrap());

    let warrant = Warrant::builder()
        .capability("process", value_rules)
        .ttl(Duration::from_secs(300))
        .holder(signer.public_key())
        .build(&signer)
        .unwrap();
    let authorizer = Authorizer::new().with_trusted_root(signer.public_key());

    let args: HashMap<String, ConstraintValue> =
        [("value".to_string(), ConstraintValue::Float(f64::NAN))]
            .into_iter()
            .collect();

    let signature = warrant.sign(&signer, "process", &args).unwrap();
    let outcome = authorizer.authorize_one(&warrant, "process", &args, Some(&signature), &[]);

    if let Err(e) = outcome {
        println!(" [PASS] NaN rejected: {}", e);
    } else {
        println!(" [FAIL] NaN was accepted!");
        panic!("NaN vulnerability");
    }
}
/// Red-team check: a delegation chain with its middle warrant removed must be
/// rejected.
///
/// Builds root -> middle -> leaf, confirms the full chain verifies, then
/// submits only `[root, leaf]` and requires `verify_chain` to return an error.
#[test]
fn test_attack_chain_missing_link() {
    println!("\n--- Attack: Missing Link in Chain ---");
    let root_kp = SigningKey::generate();
    let middle_kp = SigningKey::generate();
    let leaf_kp = SigningKey::generate();

    let root = Warrant::builder()
        .capability("test", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(root_kp.public_key())
        .build(&root_kp)
        .unwrap();

    // Built with the root key, held by the middle key.
    let middle = root
        .attenuate()
        .capability("test", ConstraintSet::new())
        .holder(middle_kp.public_key())
        .build(&root_kp)
        .unwrap();

    // Built with the middle key, held by the leaf key.
    let leaf = middle
        .attenuate()
        .capability("test", ConstraintSet::new())
        .holder(leaf_kp.public_key())
        .build(&middle_kp)
        .unwrap();

    let authorizer = Authorizer::new().with_trusted_root(root_kp.public_key());

    // Sanity check: the unbroken chain is valid.
    let full_chain = vec![root.clone(), middle, leaf.clone()];
    assert!(authorizer.verify_chain(&full_chain).is_ok());

    // Attack: drop the middle warrant.
    let gapped_chain = vec![root, leaf];
    if let Err(e) = authorizer.verify_chain(&gapped_chain) {
        println!(" [PASS] Missing link rejected: {}", e);
    } else {
        panic!("Incomplete chain was accepted!");
    }
}
/// Red-team check: a delegation chain presented child-first must be rejected.
///
/// Builds root -> child, confirms `[root, child]` verifies, then submits
/// `[child, root]` and requires `verify_chain` to return an error.
#[test]
fn test_attack_chain_shuffled_order() {
    println!("\n--- Attack: Shuffled Chain Order ---");
    let root_kp = SigningKey::generate();
    let child_kp = SigningKey::generate();

    let root = Warrant::builder()
        .capability("test", ConstraintSet::new())
        .ttl(Duration::from_secs(3600))
        .holder(root_kp.public_key())
        .build(&root_kp)
        .unwrap();

    // Built with the root key, held by the child key.
    let child = root
        .attenuate()
        .capability("test", ConstraintSet::new())
        .holder(child_kp.public_key())
        .build(&root_kp)
        .unwrap();

    let authorizer = Authorizer::new().with_trusted_root(root_kp.public_key());

    // Sanity check: parent-then-child order is valid.
    let in_order = vec![root.clone(), child.clone()];
    assert!(authorizer.verify_chain(&in_order).is_ok());

    // Attack: present the same warrants child-first.
    let out_of_order = vec![child, root];
    if let Err(e) = authorizer.verify_chain(&out_of_order) {
        println!(" [PASS] Shuffled chain rejected: {}", e);
    } else {
        panic!("Reversed chain was accepted!");
    }
}