use kavach::scanning::ExternalizationPolicy;
use kavach::*;
#[cfg(target_os = "linux")]
#[tokio::test]
async fn seccomp_blocks_ptrace() {
let config = SandboxConfig::builder()
.backend(Backend::Process)
.policy_seccomp("basic")
.timeout_ms(5_000)
.build();
let mut sandbox = Sandbox::create(config).await.unwrap();
sandbox.transition(SandboxState::Running).unwrap();
let result = sandbox.exec("cat /proc/self/status").await.unwrap();
assert_eq!(result.exit_code, 0);
let has_seccomp = result.stdout.contains("Seccomp:");
assert!(has_seccomp, "should be able to read /proc/self/status");
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn seccomp_blocks_mount() {
let config = SandboxConfig::builder()
.backend(Backend::Process)
.policy_seccomp("basic")
.timeout_ms(5_000)
.build();
let mut sandbox = Sandbox::create(config).await.unwrap();
sandbox.transition(SandboxState::Running).unwrap();
let result = sandbox
.exec("mount -t tmpfs none /mnt 2>&1; echo $?")
.await
.unwrap();
assert_ne!(result.exit_code, 0, "mount should fail under seccomp");
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn seccomp_allows_basic_ops() {
let config = SandboxConfig::builder()
.backend(Backend::Process)
.policy_seccomp("basic")
.timeout_ms(5_000)
.build();
let mut sandbox = Sandbox::create(config).await.unwrap();
sandbox.transition(SandboxState::Running).unwrap();
let result = sandbox.exec("echo hello && ls /tmp").await.unwrap();
assert_eq!(result.exit_code, 0);
assert!(result.stdout.contains("hello"));
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn gate_blocks_private_key() {
let config = SandboxConfig::builder()
.backend(Backend::Process)
.externalization(ExternalizationPolicy::default())
.timeout_ms(5_000)
.build();
let mut sandbox = Sandbox::create(config).await.unwrap();
sandbox.transition(SandboxState::Running).unwrap();
let result = sandbox.exec("echo '-----BEGIN RSA PRIVATE KEY-----'").await;
assert!(result.is_err(), "private key should be blocked");
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn gate_blocks_aws_key() {
let config = SandboxConfig::builder()
.backend(Backend::Process)
.externalization(ExternalizationPolicy::default())
.timeout_ms(5_000)
.build();
let mut sandbox = Sandbox::create(config).await.unwrap();
sandbox.transition(SandboxState::Running).unwrap();
let result = sandbox.exec("echo 'AKIAIOSFODNN7EXAMPLE'").await;
assert!(result.is_err(), "AWS key should be blocked");
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn gate_passes_clean_output() {
let config = SandboxConfig::builder()
.backend(Backend::Process)
.externalization(ExternalizationPolicy::default())
.timeout_ms(5_000)
.build();
let mut sandbox = Sandbox::create(config).await.unwrap();
sandbox.transition(SandboxState::Running).unwrap();
let result = sandbox.exec("echo 'hello world'").await.unwrap();
assert_eq!(result.exit_code, 0);
assert!(result.stdout.contains("hello world"));
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn gate_redacts_api_key() {
let policy = ExternalizationPolicy {
quarantine_threshold: Severity::High,
block_threshold: Severity::Critical,
..Default::default()
};
let config = SandboxConfig::builder()
.backend(Backend::Process)
.externalization(policy)
.timeout_ms(5_000)
.build();
let mut sandbox = Sandbox::create(config).await.unwrap();
sandbox.transition(SandboxState::Running).unwrap();
let result = sandbox
.exec(r#"echo 'api_key = "abcdefghijklmnopqrstuvwxyz"'"#)
.await
.unwrap();
assert!(result.stdout.contains("[REDACTED:"));
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn gate_blocks_oversized() {
let policy = ExternalizationPolicy {
max_artifact_size_bytes: 50,
..Default::default()
};
let config = SandboxConfig::builder()
.backend(Backend::Process)
.externalization(policy)
.timeout_ms(5_000)
.build();
let mut sandbox = Sandbox::create(config).await.unwrap();
sandbox.transition(SandboxState::Running).unwrap();
let result = sandbox.exec("head -c 200 /dev/urandom | base64").await;
assert!(result.is_err(), "oversized output should be blocked");
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn gate_disabled_passes_secrets() {
let policy = ExternalizationPolicy {
enabled: false,
..Default::default()
};
let config = SandboxConfig::builder()
.backend(Backend::Process)
.externalization(policy)
.timeout_ms(5_000)
.build();
let mut sandbox = Sandbox::create(config).await.unwrap();
sandbox.transition(SandboxState::Running).unwrap();
let result = sandbox
.exec("echo '-----BEGIN RSA PRIVATE KEY-----'")
.await
.unwrap();
assert!(result.stdout.contains("BEGIN RSA PRIVATE KEY"));
}
mod gate_patterns {
use kavach::scanning::secrets::SecretsScanner;
fn scanner() -> SecretsScanner {
SecretsScanner::new()
}
#[test]
fn detect_rsa_private_key() {
let findings = scanner().scan("-----BEGIN RSA PRIVATE KEY-----");
assert!(!findings.is_empty(), "should detect RSA key");
}
#[test]
fn detect_ec_private_key() {
let findings = scanner().scan("-----BEGIN EC PRIVATE KEY-----");
assert!(!findings.is_empty(), "should detect EC key");
}
#[test]
fn detect_generic_private_key() {
let findings = scanner().scan("-----BEGIN PRIVATE KEY-----");
assert!(!findings.is_empty(), "should detect generic private key");
}
#[test]
fn detect_openssh_private_key() {
let findings = scanner().scan("-----BEGIN OPENSSH PRIVATE KEY-----");
assert!(!findings.is_empty(), "should detect OpenSSH key");
}
#[test]
fn detect_aws_access_key() {
let findings = scanner().scan("AKIAIOSFODNN7EXAMPLE");
assert!(!findings.is_empty(), "should detect AWS key");
}
#[test]
fn detect_github_token() {
let ghp = format!("ghp_{}", "a".repeat(36));
let findings = scanner().scan(&ghp);
assert!(!findings.is_empty(), "should detect GitHub PAT");
}
#[test]
fn detect_github_app_token() {
let ghs = format!("ghs_{}", "a".repeat(36));
let findings = scanner().scan(&ghs);
assert!(!findings.is_empty(), "should detect GitHub App token");
}
#[test]
fn detect_slack_token() {
let token = format!("xoxb-{}-{}", "1234567890", "abcdefghijklmnopqrstuvwx");
let findings = scanner().scan(&token);
assert!(!findings.is_empty(), "should detect Slack token");
}
#[test]
fn detect_stripe_secret() {
let key = format!("sk_live_{}", "abcdefghijklmnopqrstuvwxyz0123456789");
let findings = scanner().scan(&key);
assert!(!findings.is_empty(), "should detect Stripe key");
}
#[test]
fn detect_jwt() {
let jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.dozjgNryP4J3jVmNHl0w5N_XgL0n3I9PlFUP0THsR8U";
let findings = scanner().scan(jwt);
assert!(!findings.is_empty(), "should detect JWT");
}
#[test]
fn detect_postgres_uri() {
let findings = scanner().scan("postgres://user:password@host:5432/db");
assert!(!findings.is_empty(), "should detect postgres URI");
}
#[test]
fn detect_mysql_uri() {
let findings = scanner().scan("mysql://user:password@host:3306/db");
assert!(!findings.is_empty(), "should detect mysql URI");
}
#[test]
fn detect_mongodb_uri() {
let findings = scanner().scan("mongodb://user:password@host:27017/db");
assert!(!findings.is_empty(), "should detect mongodb URI");
}
#[test]
fn detect_redis_uri() {
let findings = scanner().scan("redis://user:password@host:6379/0");
assert!(!findings.is_empty(), "should detect redis URI");
}
#[test]
fn detect_ssn() {
let findings = scanner().scan("SSN: 123-45-6789");
assert!(!findings.is_empty(), "should detect SSN");
}
#[test]
fn detect_bearer_token() {
let findings =
scanner().scan("Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.a.b");
assert!(!findings.is_empty(), "should detect bearer token");
}
#[test]
fn detect_generic_api_key() {
let findings = scanner().scan("api_key = \"abcdefghijklmnopqrstuvwxyz\"");
assert!(!findings.is_empty(), "should detect generic API key");
}
#[test]
fn clean_output_no_findings() {
let findings = scanner().scan("hello world\nthis is a test\n42\n");
assert!(findings.is_empty(), "clean output should not trigger");
}
#[test]
fn clean_base64_no_false_positive() {
let findings = scanner().scan("dGhpcyBpcyBhIHRlc3Q=");
assert!(findings.is_empty(), "short base64 should not trigger");
}
#[test]
fn redact_does_not_leak_secret() {
let s = scanner();
let redacted = s.redact("my key is AKIAIOSFODNN7EXAMPLE ok");
assert!(
!redacted.contains("AKIAIOSFODNN7EXAMPLE"),
"AWS key should be redacted"
);
assert!(redacted.contains("[REDACTED:"));
}
#[test]
fn redact_preserves_context() {
let s = scanner();
let redacted = s.redact("before AKIAIOSFODNN7EXAMPLE after");
assert!(redacted.contains("before"));
assert!(redacted.contains("after"));
}
#[test]
fn multiple_secrets_all_detected() {
let input = "key=AKIAIOSFODNN7EXAMPLE postgres://user:password@host:5432/db -----BEGIN RSA PRIVATE KEY-----";
let findings = scanner().scan(input);
assert!(
findings.len() >= 3,
"should detect all 3 secrets, got {}",
findings.len()
);
}
#[test]
fn multiple_secrets_all_redacted() {
let input = "key=AKIAIOSFODNN7EXAMPLE and postgres://user:password@host:5432/db";
let redacted = scanner().redact(input);
assert!(!redacted.contains("AKIAIOSFODNN7EXAMPLE"));
assert!(!redacted.contains("postgres://user:password"));
}
#[test]
fn secret_in_json() {
let json = r#"{"aws_key": "AKIAIOSFODNN7EXAMPLE", "data": "safe"}"#;
let findings = scanner().scan(json);
assert!(!findings.is_empty(), "should detect AWS key in JSON");
}
#[test]
fn secret_in_stderr_like_output() {
let output = "Error: connection string postgres://admin:s3cret@db:5432/prod leaked";
let findings = scanner().scan(output);
assert!(
!findings.is_empty(),
"should detect connection string in error output"
);
}
#[test]
fn empty_input_no_crash() {
assert!(scanner().scan("").is_empty());
assert_eq!(scanner().redact(""), "");
}
#[test]
fn very_long_input_no_crash() {
let long = "a".repeat(1_000_000);
let findings = scanner().scan(&long);
assert!(findings.is_empty());
}
#[test]
fn binary_like_input_no_crash() {
let binary: String = (0..=255u8).map(|b| b as char).collect();
let _ = scanner().scan(&binary);
let _ = scanner().redact(&binary);
}
}
#[cfg(feature = "sy-agnos")]
mod tpm {
use kavach::backend::sy_agnos::AttestationReport;
use std::collections::HashMap;
fn valid_report() -> AttestationReport {
let mut pcrs = HashMap::new();
pcrs.insert(8, "abcdef0123456789abcdef0123456789".into());
pcrs.insert(9, "1234567890abcdef1234567890abcdef".into());
pcrs.insert(10, "fedcba9876543210fedcba9876543210".into());
AttestationReport {
pcr_values: pcrs,
hmac_signature: Some("a".repeat(64)),
algorithm: Some("SHA-256".into()),
timestamp: Some("2026-03-21T00:00:00Z".into()),
}
}
#[test]
fn valid_passes() {
assert!(valid_report().verify());
}
#[test]
fn missing_pcr_fails() {
let mut r = valid_report();
r.pcr_values.remove(&9);
assert!(!r.verify());
}
#[test]
fn bad_hex_fails() {
let mut r = valid_report();
r.pcr_values.insert(8, "ZZZZ_not_hex_at_all!".into());
assert!(!r.verify());
}
#[test]
fn short_pcr_fails() {
let mut r = valid_report();
r.pcr_values.insert(8, "abcdef".into());
assert!(!r.verify());
}
#[test]
fn no_hmac_fails() {
let mut r = valid_report();
r.hmac_signature = None;
assert!(!r.verify());
}
#[test]
fn short_hmac_fails() {
let mut r = valid_report();
r.hmac_signature = Some("short".into());
assert!(!r.verify());
}
#[test]
fn empty_report_fails() {
let r = AttestationReport {
pcr_values: HashMap::new(),
hmac_signature: None,
algorithm: None,
timestamp: None,
};
assert!(!r.verify());
}
#[test]
fn extra_pcrs_still_valid() {
let mut r = valid_report();
r.pcr_values.insert(0, "0".repeat(32));
r.pcr_values.insert(7, "7".repeat(32));
assert!(r.verify(), "extra PCRs should not invalidate report");
}
#[test]
fn pcr_case_insensitive() {
let mut r = valid_report();
r.pcr_values
.insert(8, "ABCDEF0123456789ABCDEF0123456789".into());
assert!(r.verify(), "uppercase hex should be valid");
}
#[test]
fn max_length_pcr_valid() {
let mut r = valid_report();
r.pcr_values.insert(8, "a".repeat(128));
assert!(r.verify(), "128-char PCR should be valid");
}
#[test]
fn too_long_pcr_fails() {
let mut r = valid_report();
r.pcr_values.insert(8, "a".repeat(129));
assert!(!r.verify(), "129-char PCR should be invalid");
}
}
#[cfg(feature = "sev")]
mod sev_attestation {
use kavach::backend::sev::{SevAttestationPolicy, SevAttestationReport, SevGuestPolicy};
fn valid_report() -> SevAttestationReport {
SevAttestationReport {
report_version: 2,
guest_svn: 1,
policy: SevGuestPolicy::default_hardened().to_bits(),
measurement: "a".repeat(96),
host_data: "b".repeat(64),
id_key_digest: "c".repeat(64),
report_id: "d".repeat(64),
vmpl: 0,
signature: vec![0xAB; 96],
}
}
#[test]
fn valid_passes() {
assert!(valid_report().verify());
}
#[test]
fn forged_measurement_wrong_length() {
let mut r = valid_report();
r.measurement = "a".repeat(64); assert!(!r.verify());
}
#[test]
fn forged_measurement_nonhex() {
let mut r = valid_report();
r.measurement = "z".repeat(96);
assert!(!r.verify());
}
#[test]
fn forged_report_id() {
let mut r = valid_report();
r.report_id = "short".into();
assert!(!r.verify());
}
#[test]
fn truncated_signature() {
let mut r = valid_report();
r.signature = vec![0; 10];
assert!(!r.verify());
}
#[test]
fn nonzero_vmpl_rejected() {
let mut r = valid_report();
r.vmpl = 1;
assert!(!r.verify());
}
#[test]
fn policy_measurement_mismatch() {
let r = valid_report();
let policy = SevAttestationPolicy {
expected_measurement: Some("f".repeat(96)),
..SevAttestationPolicy::strict()
};
assert!(!policy.verify_against(&r));
}
#[test]
fn policy_svn_too_low() {
let mut r = valid_report();
r.guest_svn = 0;
let policy = SevAttestationPolicy {
min_guest_svn: 5,
..SevAttestationPolicy::strict()
};
assert!(!policy.verify_against(&r));
}
#[test]
fn policy_accepts_valid() {
let r = valid_report();
assert!(SevAttestationPolicy::strict().verify_against(&r));
}
#[test]
fn guest_policy_debug_bit() {
let mut gp = SevGuestPolicy::default_hardened();
assert_eq!(gp.to_bits() & (1 << 19), 0, "debug should be off");
gp.debug_allowed = true;
assert_ne!(gp.to_bits() & (1 << 19), 0, "debug should be on");
}
#[test]
fn guest_policy_roundtrip() {
let gp = SevGuestPolicy {
smt_allowed: true,
migration_agent_allowed: true,
debug_allowed: false,
single_socket_only: true,
min_abi_major: 5,
min_abi_minor: 2,
};
let back = SevGuestPolicy::from_bits(gp.to_bits());
assert_eq!(gp.smt_allowed, back.smt_allowed);
assert_eq!(gp.migration_agent_allowed, back.migration_agent_allowed);
assert_eq!(gp.debug_allowed, back.debug_allowed);
assert_eq!(gp.single_socket_only, back.single_socket_only);
}
#[test]
fn empty_signature_rejected() {
let mut r = valid_report();
r.signature = vec![];
assert!(!r.verify());
}
}
#[cfg(feature = "sgx")]
mod sgx_attestation {
use kavach::backend::sgx::{
SealKeyPolicy, SealedData, SgxAttestationPolicy, SgxAttestationReport,
};
fn valid_report() -> SgxAttestationReport {
SgxAttestationReport {
mrenclave: "a".repeat(64),
mrsigner: "b".repeat(64),
isv_prod_id: 1,
isv_svn: 2,
report_data: vec![0; 64],
ias_signature: Some("c".repeat(64)),
timestamp: Some("2026-03-25T00:00:00Z".into()),
}
}
#[test]
fn valid_passes() {
assert!(valid_report().verify());
}
#[test]
fn forged_mrenclave_wrong_length() {
let mut r = valid_report();
r.mrenclave = "a".repeat(32);
assert!(!r.verify());
}
#[test]
fn forged_mrenclave_nonhex() {
let mut r = valid_report();
r.mrenclave = "z".repeat(64);
assert!(!r.verify());
}
#[test]
fn forged_mrsigner() {
let mut r = valid_report();
r.mrsigner = "short".into();
assert!(!r.verify());
}
#[test]
fn no_ias_signature() {
let mut r = valid_report();
r.ias_signature = None;
assert!(!r.verify());
}
#[test]
fn short_ias_signature() {
let mut r = valid_report();
r.ias_signature = Some("short".into());
assert!(!r.verify());
}
#[test]
fn policy_mrenclave_mismatch() {
let r = valid_report();
let policy = SgxAttestationPolicy {
expected_mrenclave: Some("f".repeat(64)),
..SgxAttestationPolicy::permissive()
};
assert!(!policy.verify_against(&r));
}
#[test]
fn policy_svn_too_low() {
let mut r = valid_report();
r.isv_svn = 0;
let policy = SgxAttestationPolicy {
min_isv_svn: 5,
..SgxAttestationPolicy::permissive()
};
assert!(!policy.verify_against(&r));
}
#[test]
fn sealed_data_roundtrip() {
let data = SealedData {
ciphertext: vec![0xDE, 0xAD, 0xBE, 0xEF],
tag: vec![0x01],
aad: vec![],
key_policy: SealKeyPolicy::MrEnclave,
};
let json = serde_json::to_string(&data).unwrap();
let back: SealedData = serde_json::from_str(&json).unwrap();
assert_eq!(data.ciphertext, back.ciphertext);
assert_eq!(data.key_policy, back.key_policy);
}
#[test]
fn empty_sealed_data_invalid() {
let data = SealedData {
ciphertext: vec![],
tag: vec![],
aad: vec![],
key_policy: SealKeyPolicy::MrSigner,
};
assert!(!data.is_valid());
}
}
#[test]
fn strength_increases_with_policy() {
let minimal = scoring::score_backend(Backend::Process, &SandboxPolicy::minimal());
let basic = scoring::score_backend(Backend::Process, &SandboxPolicy::basic());
let strict = scoring::score_backend(Backend::Process, &SandboxPolicy::strict());
assert!(minimal < basic);
assert!(basic < strict);
}
#[test]
fn backend_strength_ordering() {
let noop = scoring::base_score(Backend::Noop);
let process = scoring::base_score(Backend::Process);
let gvisor = scoring::base_score(Backend::GVisor);
let firecracker = scoring::base_score(Backend::Firecracker);
assert!(noop < process);
assert!(process < gvisor);
assert!(gvisor < firecracker);
}
#[test]
fn firecracker_strict_near_max() {
let score = scoring::score_backend(Backend::Firecracker, &SandboxPolicy::strict());
assert!(score.value() >= 95, "got {}", score.value());
}
#[test]
fn all_backends_serde_roundtrip() {
for backend in Backend::all() {
let json = serde_json::to_string(backend).unwrap();
let back: Backend = serde_json::from_str(&json).unwrap();
assert_eq!(*backend, back);
}
}
#[test]
fn noop_base_is_zero() {
assert_eq!(scoring::base_score(Backend::Noop).value(), 0);
}
#[test]
fn process_base_score() {
let score = scoring::base_score(Backend::Process);
assert!(
score.value() >= 40 && score.value() <= 60,
"got {}",
score.value()
);
}
#[test]
fn all_backends_score_within_range() {
let policy = SandboxPolicy::strict();
for backend in Backend::all() {
let score = scoring::score_backend(*backend, &policy);
assert!(
score.value() <= 100,
"{:?} scored {} > 100",
backend,
score.value()
);
}
}
#[test]
fn score_labels_valid() {
for v in [0u8, 10, 25, 40, 50, 60, 70, 80, 90, 100] {
let score = StrengthScore(v);
let label = score.label();
assert!(!label.is_empty(), "label for {v} should not be empty");
assert_ne!(label, "unknown", "label for {v} should not be unknown");
}
}
#[test]
fn score_above_100_is_unknown() {
assert_eq!(StrengthScore(101).label(), "unknown");
assert_eq!(StrengthScore(255).label(), "unknown");
}
#[test]
fn strict_always_higher_than_minimal() {
for backend in Backend::all() {
let minimal = scoring::score_backend(*backend, &SandboxPolicy::minimal());
let strict = scoring::score_backend(*backend, &SandboxPolicy::strict());
assert!(
strict >= minimal,
"{:?}: strict {} < minimal {}",
backend,
strict.value(),
minimal.value()
);
}
}
#[test]
fn score_is_deterministic() {
let policy = SandboxPolicy::strict();
let s1 = scoring::score_backend(Backend::Process, &policy);
let s2 = scoring::score_backend(Backend::Process, &policy);
assert_eq!(s1.value(), s2.value(), "scoring must be deterministic");
}
mod lifecycle_fsm {
use kavach::SandboxState;
const ALL_STATES: &[SandboxState] = &[
SandboxState::Created,
SandboxState::Running,
SandboxState::Paused,
SandboxState::Stopped,
SandboxState::Destroyed,
];
#[test]
fn created_to_running_valid() {
assert!(SandboxState::Created.valid_transition(&SandboxState::Running));
}
#[test]
fn running_to_paused_valid() {
assert!(SandboxState::Running.valid_transition(&SandboxState::Paused));
}
#[test]
fn running_to_stopped_valid() {
assert!(SandboxState::Running.valid_transition(&SandboxState::Stopped));
}
#[test]
fn paused_to_running_valid() {
assert!(SandboxState::Paused.valid_transition(&SandboxState::Running));
}
#[test]
fn stopped_to_destroyed_valid() {
assert!(SandboxState::Stopped.valid_transition(&SandboxState::Destroyed));
}
#[test]
fn created_to_stopped_invalid() {
assert!(!SandboxState::Created.valid_transition(&SandboxState::Stopped));
}
#[test]
fn created_to_destroyed_invalid() {
assert!(!SandboxState::Created.valid_transition(&SandboxState::Destroyed));
}
#[test]
fn destroyed_is_terminal() {
for state in ALL_STATES {
assert!(
!SandboxState::Destroyed.valid_transition(state),
"Destroyed -> {:?} should be invalid",
state
);
}
}
#[test]
fn self_transitions_invalid() {
for state in ALL_STATES {
assert!(
!state.valid_transition(state),
"{:?} -> {:?} self-transition should be invalid",
state,
state
);
}
}
#[test]
fn all_invalid_transitions_from_created() {
let invalid = [
SandboxState::Created,
SandboxState::Paused,
SandboxState::Stopped,
SandboxState::Destroyed,
];
for to in &invalid {
assert!(
!SandboxState::Created.valid_transition(to),
"Created -> {:?} should be invalid",
to
);
}
}
#[test]
fn all_invalid_transitions_from_stopped() {
let invalid = [
SandboxState::Created,
SandboxState::Running,
SandboxState::Paused,
SandboxState::Stopped,
];
for to in &invalid {
assert!(
!SandboxState::Stopped.valid_transition(to),
"Stopped -> {:?} should be invalid",
to
);
}
}
#[test]
fn running_to_destroyed_valid() {
assert!(SandboxState::Running.valid_transition(&SandboxState::Destroyed));
}
#[test]
fn paused_to_stopped_valid() {
assert!(SandboxState::Paused.valid_transition(&SandboxState::Stopped));
}
#[test]
fn paused_to_destroyed_valid() {
assert!(SandboxState::Paused.valid_transition(&SandboxState::Destroyed));
}
#[test]
fn transition_count() {
let mut valid_count = 0;
for from in ALL_STATES {
for to in ALL_STATES {
if from.valid_transition(to) {
valid_count += 1;
}
}
}
assert_eq!(
valid_count, 8,
"FSM should have exactly 8 valid transitions"
);
}
#[test]
fn state_display_roundtrip() {
for state in ALL_STATES {
let display = state.to_string();
assert!(!display.is_empty());
}
}
#[test]
fn state_serde_roundtrip() {
for state in ALL_STATES {
let json = serde_json::to_string(state).unwrap();
let back: SandboxState = serde_json::from_str(&json).unwrap();
assert_eq!(*state, back);
}
}
}
mod credential {
use kavach::credential::{CredentialProxy, InjectionMethod, SecretRef};
#[test]
fn register_and_resolve() {
let mut proxy = CredentialProxy::new();
proxy.register("db_pass", "s3cret");
assert_eq!(
proxy.resolve(&SecretRef {
name: "db_pass".into(),
inject_via: InjectionMethod::Stdin
}),
Some("s3cret")
);
}
#[test]
fn resolve_missing_returns_none() {
let proxy = CredentialProxy::new();
assert_eq!(
proxy.resolve(&SecretRef {
name: "nonexistent".into(),
inject_via: InjectionMethod::Stdin
}),
None
);
}
#[test]
fn env_var_injection() {
let mut proxy = CredentialProxy::new();
proxy.register("key", "val");
let refs = vec![SecretRef {
name: "key".into(),
inject_via: InjectionMethod::EnvVar {
var_name: "MY_KEY".into(),
},
}];
let env = proxy.env_vars(&refs);
assert_eq!(env.len(), 1);
assert_eq!(env[0], ("MY_KEY".into(), "val".into()));
}
#[test]
fn file_injection() {
let mut proxy = CredentialProxy::new();
proxy.register("cert", "CERT_DATA");
let refs = vec![SecretRef {
name: "cert".into(),
inject_via: InjectionMethod::File {
path: "/run/secrets/cert".into(),
mode: 0o400,
},
}];
let files = proxy.file_injections(&refs);
assert_eq!(files.len(), 1);
assert_eq!(files[0].content, "CERT_DATA");
assert_eq!(files[0].mode, 0o400);
}
#[test]
fn stdin_injection() {
let mut proxy = CredentialProxy::new();
proxy.register("pass", "hunter2");
let refs = vec![SecretRef {
name: "pass".into(),
inject_via: InjectionMethod::Stdin,
}];
let payload = proxy.stdin_payload(&refs);
assert!(payload.is_some());
assert!(payload.unwrap().contains("hunter2"));
}
#[test]
fn missing_secret_skipped_in_env() {
let proxy = CredentialProxy::new();
let refs = vec![SecretRef {
name: "nonexistent".into(),
inject_via: InjectionMethod::EnvVar {
var_name: "X".into(),
},
}];
let env = proxy.env_vars(&refs);
assert!(env.is_empty(), "missing secret should be skipped");
}
#[test]
fn len_and_is_empty() {
let mut proxy = CredentialProxy::new();
assert!(proxy.is_empty());
assert_eq!(proxy.len(), 0);
proxy.register("a", "b");
assert!(!proxy.is_empty());
assert_eq!(proxy.len(), 1);
}
#[test]
fn overwrite_secret() {
let mut proxy = CredentialProxy::new();
proxy.register("key", "old");
proxy.register("key", "new");
assert_eq!(
proxy.resolve(&SecretRef {
name: "key".into(),
inject_via: InjectionMethod::Stdin
}),
Some("new")
);
assert_eq!(proxy.len(), 1);
}
#[test]
fn many_secrets() {
let mut proxy = CredentialProxy::new();
for i in 0..100 {
proxy.register(format!("key_{i}"), format!("val_{i}"));
}
assert_eq!(proxy.len(), 100);
assert_eq!(
proxy.resolve(&SecretRef {
name: "key_50".into(),
inject_via: InjectionMethod::Stdin
}),
Some("val_50")
);
}
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn externalization_composes_with_seccomp() {
let config = SandboxConfig::builder()
.backend(Backend::Process)
.policy_seccomp("basic")
.externalization(ExternalizationPolicy::default())
.timeout_ms(5_000)
.build();
let mut sandbox = Sandbox::create(config).await.unwrap();
sandbox.transition(SandboxState::Running).unwrap();
let result = sandbox.exec("echo composed").await.unwrap();
assert_eq!(result.exit_code, 0);
assert!(result.stdout.contains("composed"));
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn basic_policy_with_externalization() {
let config = SandboxConfig::builder()
.backend(Backend::Process)
.policy(SandboxPolicy::basic())
.externalization(ExternalizationPolicy {
enabled: false,
..Default::default()
})
.timeout_ms(5_000)
.build();
let mut sandbox = Sandbox::create(config).await.unwrap();
sandbox.transition(SandboxState::Running).unwrap();
let result = sandbox.exec("echo basic").await.unwrap();
assert!(result.stdout.contains("basic"));
}
#[cfg(target_os = "linux")]
#[tokio::test]
async fn timeout_enforcement() {
let config = SandboxConfig::builder()
.backend(Backend::Process)
.timeout_ms(1_000)
.build();
let mut sandbox = Sandbox::create(config).await.unwrap();
sandbox.transition(SandboxState::Running).unwrap();
let result = sandbox.exec("sleep 30").await.unwrap();
assert!(result.timed_out, "should time out");
}
#[test]
fn policy_minimal_defaults() {
let p = SandboxPolicy::minimal();
assert!(!p.network.enabled, "minimal should disable network");
assert!(!p.seccomp_enabled, "minimal should have no seccomp");
}
#[test]
fn policy_basic_has_seccomp() {
let p = SandboxPolicy::basic();
assert!(p.seccomp_enabled, "basic should have seccomp");
assert!(!p.network.enabled, "basic should disable network");
}
#[test]
fn policy_strict_full_lockdown() {
let p = SandboxPolicy::strict();
assert!(p.seccomp_enabled);
assert!(!p.network.enabled);
assert!(p.read_only_rootfs);
}
#[test]
fn policy_serde_roundtrip() {
for policy in [
SandboxPolicy::minimal(),
SandboxPolicy::basic(),
SandboxPolicy::strict(),
] {
let json = serde_json::to_string(&policy).unwrap();
let back: SandboxPolicy = serde_json::from_str(&json).unwrap();
assert_eq!(policy.network.enabled, back.network.enabled);
}
}
#[cfg(feature = "sy-agnos")]
mod phylax {
use kavach::backend::sy_agnos::{PhylaxScanner, PhylaxSeverity};
#[test]
fn detects_verity_violation() {
let scanner = PhylaxScanner::new();
let findings = scanner.scan("dm-verity: device corruption");
assert!(findings.iter().any(|f| f.category == "VERITY"));
}
#[test]
fn detects_nftables_bypass() {
let scanner = PhylaxScanner::new();
let findings = scanner.scan("iptables -F");
assert!(findings.iter().any(|f| f.category == "NFTABLES_BYPASS"));
}
#[test]
fn detects_namespace_escape() {
let scanner = PhylaxScanner::new();
let findings = scanner.scan("nsenter --target 1");
assert!(findings.iter().any(|f| f.category == "NAMESPACE_ESCAPE"));
}
#[test]
fn detects_mount_escape() {
let scanner = PhylaxScanner::new();
let findings = scanner.scan("chroot /host");
assert!(findings.iter().any(|f| f.category == "MOUNT_ESCAPE"));
}
#[test]
fn clean_output_ok() {
let scanner = PhylaxScanner::new();
assert!(scanner.scan("all good").is_empty());
}
#[test]
fn severity_ordering() {
assert!(PhylaxSeverity::Info < PhylaxSeverity::High);
assert!(PhylaxSeverity::High < PhylaxSeverity::Critical);
}
#[test]
fn combined_secrets_and_escape() {
let scanner = PhylaxScanner::new();
let input = "AKIAIOSFODNN7EXAMPLE nsenter --target 1";
let findings = scanner.scan(input);
let has_secret = findings.iter().any(|f| f.severity == PhylaxSeverity::High);
let has_escape = findings
.iter()
.any(|f| f.severity == PhylaxSeverity::Critical);
assert!(
has_secret && has_escape,
"should detect both secret and escape"
);
}
}