use std::{
env,
process::Command,
time::{SystemTime, UNIX_EPOCH},
};
use assert_cmd::prelude::*;
use cosmian_kms_cli_actions::{
actions::symmetric::keys::create_key::CreateKeyAction,
reexport::cosmian_kms_client::reexport::cosmian_kms_client_utils::symmetric_utils::DataEncryptionAlgorithm,
};
use cosmian_logger::{log_init, trace};
use test_kms_server::start_default_test_kms_server_with_cert_auth;
#[cfg(feature = "non-fips")]
use test_kms_server::start_default_test_kms_server_with_privileged_users;
#[cfg(feature = "non-fips")]
use super::rsa::create_key_pair::{RsaKeyPairOptions, create_rsa_key_pair};
use super::{symmetric::create_key::create_symmetric_key, utils::recover_cmd_logs};
#[cfg(feature = "non-fips")]
use crate::tests::shared::{ImportKeyParams, import_key};
use crate::{
config::CKMS_CONF_ENV,
error::{CosmianError, result::CosmianResult},
tests::{
PROG_NAME, save_kms_cli_config,
shared::{ExportKeyParams, destroy, export_key, revoke},
symmetric::encrypt_decrypt::run_encrypt_decrypt_test,
},
};
/// Name of the CLI sub-command driven by the helpers in this file; the helpers pass
/// `grant`, `revoke`, `list`, `owned` or `obtained` right after it.
pub(crate) const SUB_COMMAND: &str = "access-rights";
/// Builds a path inside the system temporary directory that differs on every call.
///
/// The last path component is `file_name` followed by the current process id, the
/// number of nanoseconds elapsed since the Unix epoch and a process-wide call counter,
/// separated by dots.
///
/// The timestamp alone is not a reliable discriminator: two calls may land within the
/// clock's resolution, and a clock set before the epoch collapses to `0` through
/// `unwrap_or_default`. The counter guarantees distinct paths within this process and
/// the process id keeps concurrently running processes apart.
///
/// The function only computes the path; nothing is created on disk. Non-UTF-8 parts of
/// the temporary directory are replaced lossily when converting to `String`.
fn unique_temp_path(file_name: &str) -> String {
    // Shared by every caller in this process; each call takes the next value.
    static CALL_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    // Only atomicity of the increment matters here, not ordering with other memory.
    let sequence = CALL_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let pid = std::process::id();

    env::temp_dir()
        .join(format!("{file_name}.{pid}.{now}.{sequence}"))
        .to_string_lossy()
        .into_owned()
}
/// Creates a symmetric key through the CLI configured by `cli_conf_path`, using a
/// default `CreateKeyAction`, and returns the result of `create_symmetric_key`.
fn gen_key(cli_conf_path: &str) -> CosmianResult<String> {
    let default_action = CreateKeyAction::default();
    create_symmetric_key(cli_conf_path, default_action)
}
/// Exports the symmetric key `key_id` to a fresh temporary file and imports that file
/// again, both through the CLI configured by `cli_conf_path`.
///
/// # Errors
/// Returns the export error if the export step fails (the import is then not
/// attempted), otherwise whatever `import_key` returns.
#[cfg(feature = "non-fips")]
fn export_import_sym_key(key_id: &str, cli_conf_path: &str) -> Result<String, CosmianError> {
    // One file is shared by both steps: written by the export, read by the import.
    let transfer_file = unique_temp_path("output.export");

    let export_params = ExportKeyParams {
        cli_conf_path: cli_conf_path.to_owned(),
        sub_command: "sym".to_owned(),
        key_id: key_id.to_owned(),
        key_file: transfer_file.clone(),
        ..Default::default()
    };
    export_key(export_params)?;

    import_key(ImportKeyParams {
        cli_conf_path: cli_conf_path.to_owned(),
        sub_command: "sym".to_owned(),
        key_file: transfer_file,
        ..Default::default()
    })
}
/// Runs `access-rights grant <user> <operations...> [--object-uid <uid>]` with the CLI
/// configuration found at `cli_conf_path`.
///
/// `--object-uid` is only appended when `object_id` is `Some`.
///
/// # Errors
/// When the command exits unsuccessfully, its standard error is returned inside
/// `CosmianError::Default`.
pub(crate) fn grant_access(
    cli_conf_path: &str,
    object_id: Option<&str>,
    user: &str,
    operations: &[&str],
) -> CosmianResult<()> {
    let mut cmd = Command::cargo_bin(PROG_NAME)?;
    cmd.env(CKMS_CONF_ENV, cli_conf_path)
        .arg(SUB_COMMAND)
        .args(["grant", user])
        .args(operations);
    if let Some(uid) = object_id {
        cmd.args(["--object-uid", uid]);
    }

    let output = recover_cmd_logs(&mut cmd);
    if !output.status.success() {
        let stderr = std::str::from_utf8(&output.stderr)?;
        return Err(CosmianError::Default(stderr.to_owned()));
    }
    Ok(())
}
/// Runs `access-rights revoke <user> <operations...> [--object-uid <uid>]` with the CLI
/// configuration found at `cli_conf_path`.
///
/// `--object-uid` is only appended when `object_id` is `Some`.
///
/// # Errors
/// When the command exits unsuccessfully, its standard error is returned inside
/// `CosmianError::Default`.
pub(crate) fn revoke_access(
    cli_conf_path: &str,
    object_id: Option<&str>,
    user: &str,
    operations: &[&str],
) -> CosmianResult<()> {
    let mut cmd = Command::cargo_bin(PROG_NAME)?;
    cmd.env(CKMS_CONF_ENV, cli_conf_path)
        .arg(SUB_COMMAND)
        .args(["revoke", user])
        .args(operations);
    if let Some(uid) = object_id {
        cmd.args(["--object-uid", uid]);
    }

    let output = recover_cmd_logs(&mut cmd);
    if !output.status.success() {
        let stderr = std::str::from_utf8(&output.stderr)?;
        return Err(CosmianError::Default(stderr.to_owned()));
    }
    Ok(())
}
/// Runs `access-rights list <object_id>` with the CLI configuration found at
/// `cli_conf_path` and returns the command's standard output.
///
/// # Errors
/// When the command exits unsuccessfully, its standard error is returned inside
/// `CosmianError::Default`.
fn list_access(cli_conf_path: &str, object_id: &str) -> CosmianResult<String> {
    let mut cmd = Command::cargo_bin(PROG_NAME)?;
    cmd.env(CKMS_CONF_ENV, cli_conf_path)
        .arg(SUB_COMMAND)
        .args(["list", object_id]);

    let output = recover_cmd_logs(&mut cmd);
    if !output.status.success() {
        let stderr = std::str::from_utf8(&output.stderr)?;
        return Err(CosmianError::Default(stderr.to_owned()));
    }
    Ok(String::from_utf8(output.stdout)?)
}
/// Runs `access-rights owned` with the CLI configuration found at `cli_conf_path` and
/// returns the command's standard output.
///
/// # Errors
/// When the command exits unsuccessfully, its standard error is returned inside
/// `CosmianError::Default`.
fn list_owned_objects(cli_conf_path: &str) -> CosmianResult<String> {
    let mut cmd = Command::cargo_bin(PROG_NAME)?;
    cmd.env(CKMS_CONF_ENV, cli_conf_path)
        .arg(SUB_COMMAND)
        .args(["owned"]);

    let output = recover_cmd_logs(&mut cmd);
    if !output.status.success() {
        let stderr = std::str::from_utf8(&output.stderr)?;
        return Err(CosmianError::Default(stderr.to_owned()));
    }
    Ok(String::from_utf8(output.stdout)?)
}
/// Runs `access-rights obtained` with the CLI configuration found at `cli_conf_path`
/// and returns the command's standard output.
///
/// # Errors
/// When the command exits unsuccessfully, its standard error is returned inside
/// `CosmianError::Default`.
fn list_accesses_rights_obtained(cli_conf_path: &str) -> CosmianResult<String> {
    let mut cmd = Command::cargo_bin(PROG_NAME)?;
    cmd.env(CKMS_CONF_ENV, cli_conf_path)
        .arg(SUB_COMMAND)
        .args(["obtained"]);

    let output = recover_cmd_logs(&mut cmd);
    if !output.status.success() {
        let stderr = std::str::from_utf8(&output.stderr)?;
        return Err(CosmianError::Default(stderr.to_owned()));
    }
    Ok(String::from_utf8(output.stdout)?)
}
/// Walks one symmetric key through successive grants to `user.client@acme.com` and
/// checks after each grant which operations succeed with the second CLI configuration.
#[tokio::test]
pub(crate) async fn test_ownership_and_grant() -> CosmianResult<()> {
    let ctx = start_default_test_kms_server_with_cert_auth().await;
    let (owner_client_conf_path, user_client_conf_path) = save_kms_cli_config(ctx);
    let key_id = gen_key(&owner_client_conf_path)?;

    // Shorthands for the calls repeated below; each export targets a fresh temp file.
    let export_as = |conf_path: &str| {
        export_key(ExportKeyParams {
            cli_conf_path: conf_path.to_owned(),
            sub_command: "sym".to_owned(),
            key_id: key_id.clone(),
            key_file: unique_temp_path("output.json"),
            ..Default::default()
        })
    };
    let encrypt_decrypt_as = |conf_path: &str| {
        run_encrypt_decrypt_test(
            conf_path,
            &key_id,
            DataEncryptionAlgorithm::AesGcm,
            None,
            0,
        )
    };
    let grant_to_user = |operations: &[&str]| {
        grant_access(
            &owner_client_conf_path,
            Some(&key_id),
            "user.client@acme.com",
            operations,
        )
    };
    let revoke_as_user = || revoke(&user_client_conf_path, "sym", &key_id, "failed revoke");
    let destroy_as_user = || destroy(&user_client_conf_path, "sym", &key_id, false);

    // The owner configuration can export the key and encrypt/decrypt with it.
    export_as(&owner_client_conf_path)?;
    encrypt_decrypt_as(&owner_client_conf_path)?;

    // Before any grant, every operation fails with the user configuration.
    assert!(export_as(&user_client_conf_path).is_err());
    assert!(encrypt_decrypt_as(&user_client_conf_path).is_err());
    assert!(revoke_as_user().is_err());
    assert!(destroy_as_user().is_err());

    // After granting decrypt + encrypt, only encryption/decryption works.
    grant_to_user(&["decrypt", "encrypt"])?;
    assert!(export_as(&user_client_conf_path).is_err());
    encrypt_decrypt_as(&user_client_conf_path)?;
    assert!(revoke_as_user().is_err());
    assert!(destroy_as_user().is_err());

    // After granting get, export works too.
    grant_to_user(&["get"])?;
    export_as(&user_client_conf_path)?;
    assert!(revoke_as_user().is_err());
    assert!(destroy_as_user().is_err());

    // After granting revoke, the user configuration can revoke the key.
    grant_to_user(&["revoke"])?;
    revoke(&user_client_conf_path, "sym", &key_id, "user revoke")?;

    // After granting destroy, the user configuration can destroy the key.
    grant_to_user(&["destroy"])?;
    destroy_as_user()?;

    Ok(())
}
/// Checks three grant invocations that are expected to be rejected: an unknown
/// operation, an unknown object id, and a grant addressed to `owner.client@acme.com`
/// (presumably the key owner itself — confirm against `save_kms_cli_config`).
#[tokio::test]
pub(crate) async fn test_grant_error() -> CosmianResult<()> {
    let ctx = start_default_test_kms_server_with_cert_auth().await;
    let (owner_client_conf_path, _) = save_kms_cli_config(ctx);
    let key_id = gen_key(&owner_client_conf_path)?;

    let unknown_operation = grant_access(
        &owner_client_conf_path,
        Some(&key_id),
        "user.client@acme.com",
        &["BAD_OP"],
    );
    assert!(unknown_operation.is_err());

    let unknown_object = grant_access(
        &owner_client_conf_path,
        Some("BAD ID"),
        "user.client@acme.com",
        &["get"],
    );
    assert!(unknown_object.is_err());

    let grant_to_owner = grant_access(
        &owner_client_conf_path,
        Some(&key_id),
        "owner.client@acme.com",
        &["get"],
    );
    assert!(grant_to_owner.is_err());

    Ok(())
}
/// Grants `get` to `user.client@acme.com`, checks that an export with the user
/// configuration works, revokes the right and checks that the export then fails.
/// Also covers revocations with an unknown operation, object id and user.
#[tokio::test]
pub(crate) async fn test_revoke_access() -> CosmianResult<()> {
    log_init(option_env!("RUST_LOG"));
    let ctx = start_default_test_kms_server_with_cert_auth().await;
    let (owner_client_conf_path, user_client_conf_path) = save_kms_cli_config(ctx);
    let key_id = gen_key(&owner_client_conf_path)?;

    // Export with the user configuration into a fresh temporary file.
    let export_as_user = || {
        export_key(ExportKeyParams {
            cli_conf_path: user_client_conf_path.clone(),
            sub_command: "sym".to_owned(),
            key_id: key_id.clone(),
            key_file: unique_temp_path("output.json"),
            ..Default::default()
        })
    };
    let revoke_as_owner = |object_id: &str, user: &str, operations: &[&str]| {
        revoke_access(&owner_client_conf_path, Some(object_id), user, operations)
    };

    grant_access(
        &owner_client_conf_path,
        Some(&key_id),
        "user.client@acme.com",
        &["get"],
    )?;
    export_as_user()?;

    revoke_as_owner(&key_id, "user.client@acme.com", &["get"])?;
    assert!(export_as_user().is_err());

    // Unknown operation and unknown object id are both rejected...
    assert!(revoke_as_owner(&key_id, "user.client@acme.com", &["BAD"]).is_err());
    assert!(revoke_as_owner("BAD KEY", "user.client@acme.com", &["get"]).is_err());
    // ...whereas revoking from an unknown user is expected to succeed.
    revoke_as_owner(&key_id, "BAD USER", &["get"])?;

    Ok(())
}
/// Checks that listing the access rights of a key with the owner configuration shows
/// the `get` grant, and that the same listing fails with the user configuration.
#[tokio::test]
pub(crate) async fn test_list_access_rights() -> CosmianResult<()> {
    let ctx = start_default_test_kms_server_with_cert_auth().await;
    let (owner_client_conf_path, user_client_conf_path) = save_kms_cli_config(ctx);
    let key_id = gen_key(&owner_client_conf_path)?;

    grant_access(
        &owner_client_conf_path,
        Some(&key_id),
        "user.client@acme.com",
        &["get"],
    )?;

    let owner_list = list_access(&owner_client_conf_path, &key_id)?;
    trace!("owner list {owner_list}");
    let grant_is_listed = owner_list.contains("user.client@acme.com: {get}");
    assert!(grant_is_listed);

    let listing_as_user = list_access(&user_client_conf_path, &key_id);
    assert!(listing_as_user.is_err());

    Ok(())
}
/// Checks that listing the access rights of an unknown object id fails.
#[tokio::test]
pub(crate) async fn test_list_access_rights_error() -> CosmianResult<()> {
    let ctx = start_default_test_kms_server_with_cert_auth().await;
    let (_, user_client_conf_path) = save_kms_cli_config(ctx);

    let listing = list_access(&user_client_conf_path, "BAD KEY");
    assert!(listing.is_err());

    Ok(())
}
/// Checks that the `owned` listing of each configuration contains only the keys
/// created with that configuration, even when a right on a key was granted to the
/// other user.
#[tokio::test]
pub(crate) async fn test_list_owned_objects() -> CosmianResult<()> {
    log_init(option_env!("RUST_LOG"));
    let ctx = start_default_test_kms_server_with_cert_auth().await;
    let (owner_client_conf_path, user_client_conf_path) = save_kms_cli_config(ctx);

    let owner_key_id = gen_key(&owner_client_conf_path)?;
    grant_access(
        &owner_client_conf_path,
        Some(&owner_key_id),
        "user.client@acme.com",
        &["get"],
    )?;

    // A granted right does not make the key appear in the user's owned listing.
    let owned_by_user = list_owned_objects(&user_client_conf_path)?;
    assert!(!owned_by_user.contains(&owner_key_id));
    let owned_by_owner = list_owned_objects(&owner_client_conf_path)?;
    assert!(owned_by_owner.contains(&owner_key_id));

    // A key created with the user configuration shows up for the user only.
    let user_key_id = gen_key(&user_client_conf_path)?;
    let owned_by_user = list_owned_objects(&user_client_conf_path)?;
    assert!(owned_by_user.contains(&user_key_id));
    let owned_by_owner = list_owned_objects(&owner_client_conf_path)?;
    assert!(!owned_by_owner.contains(&user_key_id));
    assert!(owned_by_owner.contains(&owner_key_id));

    Ok(())
}
/// Checks that the `obtained` listing of the user configuration shows a key once `get`
/// was granted on it, while the owner's `obtained` listing never contains that key.
#[tokio::test]
pub(crate) async fn test_access_right_obtained() -> CosmianResult<()> {
    log_init(option_env!("RUST_LOG"));
    let ctx = start_default_test_kms_server_with_cert_auth().await;
    let (owner_client_conf_path, user_client_conf_path) = save_kms_cli_config(ctx);
    let key_id = gen_key(&owner_client_conf_path)?;

    // Before the grant the key is absent from the owner's obtained listing.
    let obtained_by_owner = list_accesses_rights_obtained(&owner_client_conf_path)?;
    assert!(!obtained_by_owner.contains(&key_id));

    grant_access(
        &owner_client_conf_path,
        Some(&key_id),
        "user.client@acme.com",
        &["get"],
    )?;

    let obtained_by_user = list_accesses_rights_obtained(&user_client_conf_path)?;
    trace!("user list {obtained_by_user}");
    assert!(obtained_by_user.contains(&key_id));
    assert!(obtained_by_user.contains("get"));

    // The owner's listing is queried twice after the grant; both must omit the key.
    let obtained_by_owner = list_accesses_rights_obtained(&owner_client_conf_path)?;
    assert!(!obtained_by_owner.contains(&key_id));
    let obtained_by_owner_again = list_accesses_rights_obtained(&owner_client_conf_path)?;
    assert!(!obtained_by_owner_again.contains(&key_id));

    Ok(())
}
/// Same scenario as the per-user ownership test, but every right is granted to the
/// wildcard user `*`; after each grant the test checks which operations succeed with
/// the second CLI configuration.
#[tokio::test]
pub(crate) async fn test_ownership_and_grant_wildcard_user() -> CosmianResult<()> {
    let ctx = start_default_test_kms_server_with_cert_auth().await;
    let (owner_client_conf_path, user_client_conf_path) = save_kms_cli_config(ctx);
    let key_id = gen_key(&owner_client_conf_path)?;

    // Shorthands for the calls repeated below; each export targets a fresh temp file.
    let export_as = |conf_path: &str| {
        export_key(ExportKeyParams {
            cli_conf_path: conf_path.to_owned(),
            sub_command: "sym".to_owned(),
            key_id: key_id.clone(),
            key_file: unique_temp_path("output.json"),
            ..Default::default()
        })
    };
    let encrypt_decrypt_as = |conf_path: &str| {
        run_encrypt_decrypt_test(
            conf_path,
            &key_id,
            DataEncryptionAlgorithm::AesGcm,
            None,
            0,
        )
    };
    let grant_to_everyone = |operations: &[&str]| {
        grant_access(&owner_client_conf_path, Some(&key_id), "*", operations)
    };
    let revoke_as_user = || revoke(&user_client_conf_path, "sym", &key_id, "failed revoke");
    let destroy_as_user = || destroy(&user_client_conf_path, "sym", &key_id, false);

    // The owner configuration can export the key and encrypt/decrypt with it.
    export_as(&owner_client_conf_path)?;
    encrypt_decrypt_as(&owner_client_conf_path)?;

    // Before any grant, every operation fails with the user configuration.
    assert!(export_as(&user_client_conf_path).is_err());
    assert!(encrypt_decrypt_as(&user_client_conf_path).is_err());
    assert!(revoke_as_user().is_err());
    assert!(destroy_as_user().is_err());

    // encrypt and decrypt are granted through two separate commands.
    grant_to_everyone(&["encrypt"])?;
    grant_to_everyone(&["decrypt"])?;
    assert!(export_as(&user_client_conf_path).is_err());
    encrypt_decrypt_as(&user_client_conf_path)?;
    assert!(revoke_as_user().is_err());
    assert!(destroy_as_user().is_err());

    // After granting get, export works too.
    grant_to_everyone(&["get"])?;
    export_as(&user_client_conf_path)?;
    assert!(revoke_as_user().is_err());
    assert!(destroy_as_user().is_err());

    // After granting revoke, the user configuration can revoke the key.
    grant_to_everyone(&["revoke"])?;
    revoke(&user_client_conf_path, "sym", &key_id, "user revoke")?;

    // After granting destroy, the user configuration can destroy the key.
    grant_to_everyone(&["destroy"])?;
    destroy_as_user()?;

    Ok(())
}
/// Grants `get` to the wildcard user `*` and `encrypt` to `user.client@acme.com`, then
/// checks that the user's `obtained` listing mentions the key with both operations
/// while the owner's `obtained` listing never contains the key.
#[tokio::test]
pub(crate) async fn test_access_right_obtained_using_wildcard() -> CosmianResult<()> {
    let ctx = start_default_test_kms_server_with_cert_auth().await;
    let (owner_client_conf_path, user_client_conf_path) = save_kms_cli_config(ctx);
    let key_id = gen_key(&owner_client_conf_path)?;

    // Before any grant the key is absent from the owner's obtained listing.
    let obtained_by_owner = list_accesses_rights_obtained(&owner_client_conf_path)?;
    assert!(!obtained_by_owner.contains(&key_id));

    grant_access(&owner_client_conf_path, Some(&key_id), "*", &["get"])?;
    grant_access(
        &owner_client_conf_path,
        Some(&key_id),
        "user.client@acme.com",
        &["encrypt"],
    )?;

    let obtained_by_user = list_accesses_rights_obtained(&user_client_conf_path)?;
    trace!("user list {obtained_by_user}");
    assert!(obtained_by_user.contains(&key_id));
    assert!(obtained_by_user.contains("get"));
    assert!(obtained_by_user.contains("encrypt"));

    // The owner's listing is queried twice after the grants; both must omit the key.
    let obtained_by_owner = list_accesses_rights_obtained(&owner_client_conf_path)?;
    assert!(!obtained_by_owner.contains(&key_id));
    let obtained_by_owner_again = list_accesses_rights_obtained(&owner_client_conf_path)?;
    assert!(!obtained_by_owner_again.contains(&key_id));

    Ok(())
}
/// Grants and revokes several operations in a single command, with duplicated
/// operation names on the command line, and checks the resulting access listing.
#[tokio::test]
pub(crate) async fn test_grant_multiple_operations() -> CosmianResult<()> {
    let ctx = start_default_test_kms_server_with_cert_auth().await;
    let (owner_client_conf_path, _) = save_kms_cli_config(ctx);
    let key_id = gen_key(&owner_client_conf_path)?;

    let rights_listing = || list_access(&owner_client_conf_path, &key_id);
    let revoke_from_user = |operations: &[&str]| {
        revoke_access(
            &owner_client_conf_path,
            Some(&key_id),
            "user.client@acme.com",
            operations,
        )
    };

    // `encrypt` is passed twice; the listing is expected to show it once.
    grant_access(
        &owner_client_conf_path,
        Some(&key_id),
        "user.client@acme.com",
        &["get", "revoke", "encrypt", "encrypt"],
    )?;
    let owner_list = rights_listing()?;
    assert!(owner_list.contains("user.client@acme.com: {encrypt, get, revoke}"));

    // `get` is passed twice; only `encrypt` is expected to remain.
    revoke_from_user(&["get", "revoke", "get"])?;
    let owner_list = rights_listing()?;
    assert!(owner_list.contains("user.client@acme.com: {encrypt}"));

    // Revoking the same operations again succeeds and leaves the listing unchanged.
    revoke_from_user(&["get", "revoke", "get"])?;
    let owner_list = rights_listing()?;
    assert!(owner_list.contains("user.client@acme.com: {encrypt}"));

    Ok(())
}
/// Checks grants issued without `--object-uid`: granting `create` alone is expected to
/// succeed, granting `create` together with `get` is expected to fail.
#[tokio::test]
pub(crate) async fn test_grant_with_without_object_uid() -> CosmianResult<()> {
    let ctx = start_default_test_kms_server_with_cert_auth().await;
    let (owner_client_conf_path, _) = save_kms_cli_config(ctx);

    let grant_without_uid = |operations: &[&str]| {
        grant_access(
            &owner_client_conf_path,
            None,
            "user.client@acme.com",
            operations,
        )
    };

    assert!(grant_without_uid(&["create"]).is_ok());
    assert!(grant_without_uid(&["create", "get"]).is_err());

    Ok(())
}
/// Exercises the `create` right on a server started with `owner.client@acme.com` and
/// `user.privileged@acme.com` as its privileged-user list.
///
/// With the second CLI configuration (the one `user.client@acme.com` rights are
/// granted to), the test checks that symmetric key creation, RSA key-pair creation and
/// key import:
/// 1. fail before `create` is granted,
/// 2. succeed once the owner configuration grants `create`,
/// 3. fail again after `create` is revoked.
///
/// It also checks that granting `create` with the user configuration fails, and that
/// granting or revoking `create` for `user.privileged@acme.com` fails.
///
/// NOTE(review): the key-pair checks call `create_rsa_key_pair`; they previously called
/// `gen_key` a second time, so key-pair creation was never exercised. This assumes the
/// server gates key-pair creation on the `create` right — confirm against the server.
#[cfg(feature = "non-fips")]
#[tokio::test]
pub(crate) async fn test_privileged_users() -> CosmianResult<()> {
    let ctx = start_default_test_kms_server_with_privileged_users(vec![
        "owner.client@acme.com".to_owned(),
        "user.privileged@acme.com".to_owned(),
    ])
    .await;
    let (owner_client_conf_path, user_client_conf_path) = save_kms_cli_config(ctx);

    // The owner configuration can create, and lets the user export/get this key so the
    // export half of `export_import_sym_key` can succeed with the user configuration.
    let initial_key_id = gen_key(&owner_client_conf_path)?;
    grant_access(
        &owner_client_conf_path,
        Some(&initial_key_id),
        "user.client@acme.com",
        &["export", "get"],
    )?;

    let keypair_id = create_rsa_key_pair(&owner_client_conf_path, &RsaKeyPairOptions::default());
    assert!(
        keypair_id.is_ok(),
        "owner configuration must be able to create a key pair"
    );
    let imported_key_id = export_import_sym_key(&initial_key_id, &owner_client_conf_path);
    assert!(
        imported_key_id.is_ok(),
        "owner configuration must be able to import a key"
    );

    // Without `create`, the user configuration can neither create nor import.
    let key_id_user = gen_key(&user_client_conf_path);
    assert!(
        key_id_user.is_err(),
        "symmetric key creation must fail without `create`"
    );
    let keypair_id_user =
        create_rsa_key_pair(&user_client_conf_path, &RsaKeyPairOptions::default());
    assert!(
        keypair_id_user.is_err(),
        "key pair creation must fail without `create`"
    );
    let imported_key_id = export_import_sym_key(&initial_key_id, &user_client_conf_path);
    assert!(
        imported_key_id.is_err(),
        "key import must fail without `create`"
    );

    // Once `create` is granted, the same three operations succeed.
    let result_grant_create = grant_access(
        &owner_client_conf_path,
        None,
        "user.client@acme.com",
        &["create"],
    );
    assert!(
        result_grant_create.is_ok(),
        "owner configuration must be able to grant `create`"
    );
    let key_id_user = gen_key(&user_client_conf_path);
    assert!(
        key_id_user.is_ok(),
        "symmetric key creation must succeed once `create` is granted"
    );
    let keypair_id_user =
        create_rsa_key_pair(&user_client_conf_path, &RsaKeyPairOptions::default());
    assert!(
        keypair_id_user.is_ok(),
        "key pair creation must succeed once `create` is granted"
    );
    let imported_key_id = export_import_sym_key(&initial_key_id, &user_client_conf_path);
    assert!(
        imported_key_id.is_ok(),
        "key import must succeed once `create` is granted"
    );

    // Holding `create` does not allow the user configuration to grant it further.
    let result_grant_create = grant_access(
        &user_client_conf_path,
        None,
        "user2.client@acme.com",
        &["create"],
    );
    assert!(
        result_grant_create.is_err(),
        "user configuration must not be able to grant `create`"
    );

    // Granting `create` to a user from the privileged list is rejected.
    let result_grant_create = grant_access(
        &owner_client_conf_path,
        None,
        "user.privileged@acme.com",
        &["create"],
    );
    assert!(
        result_grant_create.is_err(),
        "granting `create` to a privileged user must fail"
    );

    // After `create` is revoked, the three operations fail again.
    let result_revoke_create = revoke_access(
        &owner_client_conf_path,
        None,
        "user.client@acme.com",
        &["create"],
    );
    assert!(
        result_revoke_create.is_ok(),
        "owner configuration must be able to revoke `create`"
    );
    let key_id_user = gen_key(&user_client_conf_path);
    assert!(
        key_id_user.is_err(),
        "symmetric key creation must fail after `create` is revoked"
    );
    let keypair_id_user =
        create_rsa_key_pair(&user_client_conf_path, &RsaKeyPairOptions::default());
    assert!(
        keypair_id_user.is_err(),
        "key pair creation must fail after `create` is revoked"
    );
    let imported_key_id = export_import_sym_key(&initial_key_id, &user_client_conf_path);
    assert!(
        imported_key_id.is_err(),
        "key import must fail after `create` is revoked"
    );

    // Revoking `create` from a user in the privileged list is rejected.
    let result_revoke_create = revoke_access(
        &owner_client_conf_path,
        None,
        "user.privileged@acme.com",
        &["create"],
    );
    assert!(
        result_revoke_create.is_err(),
        "revoking `create` from a privileged user must fail"
    );

    Ok(())
}