use assert_cmd::Command;
use predicates::prelude::PredicateBooleanExt;
use tempfile::tempdir;
use tsafe_core::baseline_contracts::{
baseline_contracts, ci_deploy_contract, ops_emergency_contract, read_only_contract,
};
/// Builds an `assert_cmd` [`Command`] for the `tsafe` binary.
///
/// # Panics
///
/// Panics if `Command::cargo_bin` cannot locate the `tsafe` binary.
fn tsafe() -> Command {
    let located = Command::cargo_bin("tsafe");
    located.unwrap()
}
/// Argument vector of a child command that does nothing (non-Windows variant: `true`).
#[cfg(not(target_os = "windows"))]
fn portable_noop_child() -> &'static [&'static str] {
    const NOOP_ARGV: &[&str] = &["true"];
    NOOP_ARGV
}
/// Argument vector of a child command that does nothing (Windows variant: `cmd /c exit 0`).
#[cfg(target_os = "windows")]
fn portable_noop_child() -> &'static [&'static str] {
    const NOOP_ARGV: &[&str] = &["cmd", "/c", "exit", "0"];
    NOOP_ARGV
}
/// Program name of the no-op child, for use as an `allowed_targets` entry (non-Windows: `true`).
#[cfg(not(target_os = "windows"))]
fn portable_noop_target() -> &'static str {
    const TARGET: &str = "true";
    TARGET
}
/// Program name of the no-op child, for use as an `allowed_targets` entry (Windows: `cmd`).
#[cfg(target_os = "windows")]
fn portable_noop_target() -> &'static str {
    const TARGET: &str = "cmd";
    TARGET
}
/// Argument vector of a child command that prints the environment variable `name`
/// (non-Windows variant: `printenv <name>`).
#[cfg(not(target_os = "windows"))]
fn portable_env_value_child(name: &str) -> Vec<String> {
    ["printenv", name]
        .iter()
        .map(|part| part.to_string())
        .collect()
}
/// Argument vector of a child command that prints the environment variable `name`
/// (Windows variant: `cmd /c echo %<name>%`).
#[cfg(target_os = "windows")]
fn portable_env_value_child(name: &str) -> Vec<String> {
    let mut argv: Vec<String> = ["cmd", "/c", "echo"]
        .iter()
        .map(|part| part.to_string())
        .collect();
    // `%NAME%` is the final argument handed to `cmd /c echo`.
    argv.push(format!("%{name}%"));
    argv
}
/// Assembles a full argument list: every item of `prefix`, then a literal `--` separator,
/// then every item of `child_cmd`, in that order.
fn exec_args(prefix: &[&str], child_cmd: Vec<String>) -> Vec<String> {
    prefix
        .iter()
        .map(|part| part.to_string())
        .chain(std::iter::once("--".to_string()))
        .chain(child_cmd)
        .collect()
}
/// The `read_only` baseline contract exposes the expected name, access level and trust profile.
#[test]
fn test_read_only_baseline_contract_is_accessible() {
    let baseline = read_only_contract();
    let access = baseline.access_level.to_string();

    assert_eq!(baseline.name, "read_only");
    assert_eq!(access, "read_only");
    assert_eq!(baseline.required_trust_profile, "standard");
}
/// The `read_only` baseline contract has no target constraints and an empty `allowed_secrets`.
#[test]
fn test_read_only_contract_allows_any_target() {
    let baseline = read_only_contract();

    let targets_unconstrained = baseline.target_constraints.is_none();
    assert!(targets_unconstrained);

    let allowed_secrets_empty = baseline.secret_constraints.allowed_secrets.is_empty();
    assert!(allowed_secrets_empty);
}
/// `ci_deploy_contract` keeps the target list it was given and reports the expected
/// name, trust profile and access level.
#[test]
fn test_ci_deploy_baseline_contract_restricts_targets() {
    let expected_targets: Vec<String> = ["terraform", "ansible"]
        .iter()
        .map(|target| target.to_string())
        .collect();
    let deploy = ci_deploy_contract(expected_targets.clone());

    assert_eq!(deploy.name, "ci_deploy");
    assert_eq!(deploy.required_trust_profile, "hardened");
    assert_eq!(deploy.access_level.to_string(), "read_only");

    let actual_targets = deploy.target_constraints.unwrap();
    assert_eq!(actual_targets, expected_targets);
}
/// The `ci_deploy` contract lists both AWS credential names among its required secrets.
#[test]
fn test_ci_deploy_contract_requires_aws_credentials() {
    let deploy = ci_deploy_contract(vec![String::from("terraform")]);
    let required_secrets = &deploy.secret_constraints.required_secrets;

    for secret in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] {
        assert!(required_secrets.contains(&secret.to_string()));
    }
}
/// The `ops_emergency` baseline contract is named as expected, grants `read_write` access
/// and requires the `hardened` trust profile.
#[test]
fn test_ops_emergency_baseline_contract_allows_write_access() {
    let emergency = ops_emergency_contract();
    let access = emergency.access_level.to_string();

    assert_eq!(emergency.name, "ops_emergency");
    assert_eq!(access, "read_write");
    assert_eq!(emergency.required_trust_profile, "hardened");
}
/// The `ops_emergency` contract's target constraints are exactly `ssh` and `sudo`.
#[test]
fn test_ops_emergency_contract_restricts_to_ssh_and_sudo() {
    let emergency = ops_emergency_contract();
    let permitted = emergency.target_constraints.unwrap();

    for program in ["ssh", "sudo"] {
        assert!(permitted.contains(&program.to_string()));
    }
    // Both names present plus a length of two means nothing else is permitted.
    assert_eq!(permitted.len(), 2);
}
/// The baseline registry holds exactly the three known template names.
#[test]
fn test_baseline_contracts_registry_contains_all_templates() {
    let templates = baseline_contracts();

    for template_name in ["read_only", "ci_deploy", "ops_emergency"] {
        assert!(templates.contains_key(template_name));
    }
    assert_eq!(templates.len(), 3);
}
/// A baseline contract survives a JSON round trip with its name and access level intact.
#[test]
fn test_baseline_contracts_are_serializable() {
    let original = read_only_contract();

    let json = serde_json::to_string(&original).expect("should serialize");
    let restored =
        serde_json::from_str::<tsafe_core::baseline_contracts::BaselineContract>(&json)
            .expect("should deserialize");

    assert_eq!(restored.name, original.name);
    assert_eq!(
        restored.access_level.to_string(),
        original.access_level.to_string()
    );
}
/// The `ci_deploy` contract description mentions its hardened, target-restricted,
/// read-only-vault nature.
#[test]
fn test_ci_deploy_contract_metadata_is_accurate() {
    let deploy = ci_deploy_contract(vec![String::from("terraform")]);

    for fragment in ["hardened env, target-restricted", "read-only vault"] {
        assert!(deploy.description.contains(fragment));
    }
}
/// The `ops_emergency` contract's allowed secrets include the three admin credential names.
#[test]
fn test_ops_emergency_contract_has_admin_credentials() {
    let emergency = ops_emergency_contract();
    let allowed_secrets = &emergency.secret_constraints.allowed_secrets;

    for secret in ["ADMIN_SSH_KEY", "SUDO_PASSWORD", "INCIDENT_TICKET_TOKEN"] {
        assert!(allowed_secrets.contains(&secret.to_string()));
    }
}
/// End-to-end: with a `viewer` contract declared via `template: read_only`,
/// `tsafe exec --contract viewer --allow-dangerous-env` runs a no-op child successfully.
#[test]
fn template_read_only_inherits_standard_trust() {
    let workspace = tempdir().unwrap();
    let vault_dir = workspace.path();
    let manifest_path = vault_dir.join(".tsafe.yml");
    std::fs::write(
        manifest_path,
        r#"
contracts:
viewer:
template: read_only
"#,
    )
    .unwrap();

    // Run `init`, then `set APP_KEY`, each of which must succeed.
    for setup in [&["init"][..], &["set", "APP_KEY", "secret-value"][..]] {
        tsafe()
            .args(setup)
            .env("TSAFE_VAULT_DIR", vault_dir)
            .env("TSAFE_PASSWORD", "pw")
            .assert()
            .success();
    }

    let mut exec_argv = vec![
        "exec",
        "--contract",
        "viewer",
        "--allow-dangerous-env",
        "--",
    ];
    exec_argv.extend_from_slice(portable_noop_child());

    // `exec` runs from the directory holding `.tsafe.yml`.
    tsafe()
        .args(&exec_argv)
        .current_dir(vault_dir)
        .env("TSAFE_VAULT_DIR", vault_dir)
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .success();
}
/// End-to-end: under a `read_only`-templated contract, a child that prints `APP_KEY`
/// emits the stored secret value on stdout.
#[test]
fn template_read_only_injects_secret_into_portable_child_env() {
    let workspace = tempdir().unwrap();
    let vault_dir = workspace.path();
    let manifest_path = vault_dir.join(".tsafe.yml");
    std::fs::write(
        manifest_path,
        r#"
contracts:
viewer:
template: read_only
"#,
    )
    .unwrap();

    // Run `init`, then `set APP_KEY`, each of which must succeed.
    for setup in [&["init"][..], &["set", "APP_KEY", "secret-value"][..]] {
        tsafe()
            .args(setup)
            .env("TSAFE_VAULT_DIR", vault_dir)
            .env("TSAFE_PASSWORD", "pw")
            .assert()
            .success();
    }

    let exec_argv = exec_args(
        &["exec", "--contract", "viewer"],
        portable_env_value_child("APP_KEY"),
    );
    let prints_secret = predicates::str::contains("secret-value");

    tsafe()
        .args(exec_argv)
        .current_dir(vault_dir)
        .env("TSAFE_VAULT_DIR", vault_dir)
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .success()
        .stdout(prints_secret);
}
/// End-to-end: a `ci_deploy`-templated contract whose `allowed_targets` are
/// `terraform` and `ansible` refuses to run the no-op child, reporting a target error.
#[test]
fn template_ci_deploy_enforces_target_restriction() {
    let workspace = tempdir().unwrap();
    let vault_dir = workspace.path();
    let manifest_path = vault_dir.join(".tsafe.yml");
    std::fs::write(
        manifest_path,
        r#"
contracts:
deploy:
template: ci_deploy
allowed_secrets: [AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY]
required_secrets: [AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY]
allowed_targets: [terraform, ansible]
"#,
    )
    .unwrap();

    // Run `init`, then store both AWS secrets; every step must succeed.
    for setup in [
        &["init"][..],
        &["set", "AWS_ACCESS_KEY_ID", "AKIA123"][..],
        &["set", "AWS_SECRET_ACCESS_KEY", "secret123"][..],
    ] {
        tsafe()
            .args(setup)
            .env("TSAFE_VAULT_DIR", vault_dir)
            .env("TSAFE_PASSWORD", "pw")
            .assert()
            .success();
    }

    let mut exec_argv = vec!["exec", "--contract", "deploy", "--"];
    exec_argv.extend_from_slice(portable_noop_child());

    // Either wording of the rejection is accepted.
    let target_rejected = predicates::str::contains("TARGET_NOT_ALLOWED")
        .or(predicates::str::contains("not in allowed_targets"));

    tsafe()
        .args(&exec_argv)
        .current_dir(vault_dir)
        .env("TSAFE_VAULT_DIR", vault_dir)
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .failure()
        .stderr(target_rejected);
}
/// End-to-end: referencing a template name that does not exist makes `tsafe exec` fail
/// with the offending template name in stderr.
#[test]
fn template_unknown_name_gives_clear_error() {
    let workspace = tempdir().unwrap();
    let vault_dir = workspace.path();
    let manifest_path = vault_dir.join(".tsafe.yml");
    std::fs::write(
        manifest_path,
        r#"
contracts:
bad:
template: nonexistent_template
trust_level: standard
"#,
    )
    .unwrap();

    tsafe()
        .args(["init"])
        .env("TSAFE_VAULT_DIR", vault_dir)
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .success();

    let mut exec_argv = vec!["exec", "--contract", "bad", "--"];
    exec_argv.extend_from_slice(portable_noop_child());
    let names_template = predicates::str::contains("nonexistent_template");

    tsafe()
        .args(&exec_argv)
        .current_dir(vault_dir)
        .env("TSAFE_VAULT_DIR", vault_dir)
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .failure()
        .stderr(names_template);
}
/// End-to-end: a `ci_deploy`-templated contract can override `allowed_targets`.
///
/// The override names the platform's env-printing program, so the child that prints
/// `AWS_ACCESS_KEY_ID` is permitted to run. The test expects the run to succeed and the
/// child's stdout to contain `[REDACTED]`.
/// NOTE(review): presumably tsafe masks the secret value in child output under this
/// contract — confirm against the CLI's redaction behavior.
#[test]
fn template_fields_can_be_overridden() {
    let dir = tempdir().unwrap();

    // Platform-specific child command comes from the shared helper; its first element is
    // the program name that must appear in `allowed_targets`.
    let child_cmd = portable_env_value_child("AWS_ACCESS_KEY_ID");
    let override_target = child_cmd
        .first()
        .expect("child command starts with a program name")
        .clone();

    std::fs::write(
        dir.path().join(".tsafe.yml"),
        format!(
            r#"
contracts:
deploy:
template: ci_deploy
allowed_secrets: [AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY]
required_secrets: [AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY]
allowed_targets: [{override_target}]
"#
        ),
    )
    .unwrap();

    tsafe()
        .args(["init"])
        .env("TSAFE_VAULT_DIR", dir.path())
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .success();
    tsafe()
        .args(["set", "AWS_ACCESS_KEY_ID", "AKIA123"])
        .env("TSAFE_VAULT_DIR", dir.path())
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .success();
    tsafe()
        .args(["set", "AWS_SECRET_ACCESS_KEY", "secret123"])
        .env("TSAFE_VAULT_DIR", dir.path())
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .success();

    // `exec --contract deploy -- <child_cmd>`, run from the directory holding `.tsafe.yml`.
    tsafe()
        .args(exec_args(&["exec", "--contract", "deploy"], child_cmd))
        .current_dir(dir.path())
        .env("TSAFE_VAULT_DIR", dir.path())
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .success()
        .stdout(predicates::str::contains("[REDACTED]"));
}
/// End-to-end: overriding `allowed_targets` on a `ci_deploy`-templated contract does not
/// turn off the dangerous-environment-variable check.
///
/// `NODE_OPTIONS` is stored as a secret and listed as both allowed and required. Running
/// the (permitted) no-op child must still fail, with stderr mentioning both
/// `DANGEROUS_ENV_VARIABLE` and `NODE_OPTIONS`.
#[test]
fn template_ci_deploy_target_override_does_not_disable_hardened_dangerous_env_denial() {
    let dir = tempdir().unwrap();

    // The shared helpers supply the platform-specific no-op program and its argv.
    let override_target = portable_noop_target();

    std::fs::write(
        dir.path().join(".tsafe.yml"),
        format!(
            r#"
contracts:
deploy:
template: ci_deploy
allowed_secrets: [NODE_OPTIONS]
required_secrets: [NODE_OPTIONS]
allowed_targets: [{override_target}]
"#
        ),
    )
    .unwrap();

    tsafe()
        .args(["init"])
        .env("TSAFE_VAULT_DIR", dir.path())
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .success();
    // The `--` keeps `--inspect` from being parsed as an option of `set`.
    tsafe()
        .args(["set", "NODE_OPTIONS", "--", "--inspect"])
        .env("TSAFE_VAULT_DIR", dir.path())
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .success();

    tsafe()
        .args(["exec", "--contract", "deploy", "--"])
        .args(portable_noop_child())
        .current_dir(dir.path())
        .env("TSAFE_VAULT_DIR", dir.path())
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .failure()
        .stderr(predicates::str::contains("DANGEROUS_ENV_VARIABLE"))
        .stderr(predicates::str::contains("NODE_OPTIONS"));
}
/// End-to-end: overriding only `allowed_targets` on a `ci_deploy`-templated contract leaves
/// the required AWS secrets in force.
///
/// Only `AWS_ACCESS_KEY_ID` is stored, so `exec` must fail with stderr mentioning
/// `required secret` and `AWS_SECRET_ACCESS_KEY`, and must not mention `allowed_targets`.
#[test]
fn template_ci_deploy_target_override_still_requires_default_aws_secret_pair() {
    let workspace = tempdir().unwrap();
    let vault_dir = workspace.path();
    let manifest = format!(
        r#"
contracts:
deploy:
template: ci_deploy
allowed_targets: [{override_target}]
"#,
        override_target = portable_noop_target()
    );
    std::fs::write(vault_dir.join(".tsafe.yml"), manifest).unwrap();

    // Run `init`, then store only the access-key half of the AWS pair.
    for setup in [&["init"][..], &["set", "AWS_ACCESS_KEY_ID", "AKIA123"][..]] {
        tsafe()
            .args(setup)
            .env("TSAFE_VAULT_DIR", vault_dir)
            .env("TSAFE_PASSWORD", "pw")
            .assert()
            .success();
    }

    let mut exec_argv = vec!["exec", "--contract", "deploy", "--"];
    exec_argv.extend_from_slice(portable_noop_child());

    tsafe()
        .args(&exec_argv)
        .current_dir(vault_dir)
        .env("TSAFE_VAULT_DIR", vault_dir)
        .env("TSAFE_PASSWORD", "pw")
        .assert()
        .failure()
        .stderr(predicates::str::contains("required secret"))
        .stderr(predicates::str::contains("AWS_SECRET_ACCESS_KEY"))
        .stderr(predicates::str::contains("allowed_targets").not());
}