use bytes::Bytes;
use crabka_protocol::owned::{
alter_user_scram_credentials_request::{
AlterUserScramCredentialsRequest, ScramCredentialDeletion, ScramCredentialUpsertion,
},
create_acls_request::{AclCreation, CreateAclsRequest},
delete_acls_request::{DeleteAclsFilter, DeleteAclsRequest},
describe_acls_request::DescribeAclsRequest,
};
use crabka_security::SaslMechanism;
use ring::rand::{SecureRandom, SystemRandom};
use crate::{AdminClient, AdminError, KafkaError, kafka_error_name};
/// Value written to the `mechanism` field of SCRAM upsertions/deletions when
/// the SCRAM-SHA-512 request builder is used.
const SCRAM_SHA_512_WIRE: i8 = 2;
/// Value written to the `mechanism` field of SCRAM upsertions/deletions when
/// the SCRAM-SHA-256 request builder is used.
const SCRAM_SHA_256_WIRE: i8 = 1;
/// Exported iteration-count constant. Nothing in this file reads it;
/// presumably meant as a default for `ScramUpsertion::iterations` — confirm
/// against callers.
pub const DEFAULT_SCRAM_ITERATIONS: i32 = 8192;
/// Resource kind an ACL entry or filter refers to.
///
/// Converted to and from its single-byte wire value by
/// `resource_type_to_wire` and `wire_to_resource_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ResourceType {
/// Encoded as wire value 2.
Topic,
/// Encoded as wire value 3.
Group,
/// Encoded as wire value 4.
Cluster,
/// Encoded as wire value 5.
TransactionalId,
}
/// Pattern type of an ACL entry or filter.
///
/// Converted to and from its single-byte wire value by
/// `pattern_type_to_wire` and `wire_to_pattern_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum PatternType {
/// Encoded as wire value 3.
Literal,
/// Encoded as wire value 4.
Prefixed,
}
/// Permission type of an ACL entry or filter.
///
/// Note that the wire values are not in declaration order: `Deny` is 2 and
/// `Allow` is 3 (see `permission_to_wire`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum PermissionType {
/// Encoded as wire value 3.
Allow,
/// Encoded as wire value 2.
Deny,
}
/// Operation an ACL entry or filter refers to.
///
/// Variants are encoded as consecutive wire values 2 through 12 in
/// declaration order (see `operation_to_wire`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum AclOperation {
/// Encoded as wire value 2.
All,
/// Encoded as wire value 3.
Read,
/// Encoded as wire value 4.
Write,
/// Encoded as wire value 5.
Create,
/// Encoded as wire value 6.
Delete,
/// Encoded as wire value 7.
Alter,
/// Encoded as wire value 8.
Describe,
/// Encoded as wire value 9.
ClusterAction,
/// Encoded as wire value 10.
DescribeConfigs,
/// Encoded as wire value 11.
AlterConfigs,
/// Encoded as wire value 12.
IdempotentWrite,
}
/// One fully specified ACL binding.
///
/// Used as the input to `AdminClient::create_acls` and as the element type
/// returned by `AdminClient::describe_acls` and in
/// `DeleteAclFilterOutcome::matched`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct AclEntry {
/// Kind of resource the binding applies to.
pub resource_type: ResourceType,
/// Resource name, copied verbatim to and from the wire.
pub resource_name: String,
/// How `resource_name` is to be interpreted (literal or prefixed).
pub pattern_type: PatternType,
/// Principal string, copied verbatim to and from the wire.
pub principal: String,
/// Host string, copied verbatim to and from the wire.
pub host: String,
/// Operation the binding covers.
pub operation: AclOperation,
/// Whether the binding allows or denies the operation.
pub permission_type: PermissionType,
}
/// Filter over ACL bindings, used by `AdminClient::describe_acls` and
/// `AdminClient::delete_acls`.
///
/// Every axis is optional. When converted to the wire form, an enum axis left
/// as `None` is sent as `WIRE_ANY`, and a string axis left as `None` is sent
/// as `None`. `Default` yields a filter with every axis unset.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AclEntryFilter {
/// Resource type to match; `None` is sent as `WIRE_ANY`.
pub resource_type: Option<ResourceType>,
/// Resource name to match; passed through unchanged.
pub resource_name: Option<String>,
/// Pattern type to match; `None` is sent as `WIRE_ANY`.
pub pattern_type: Option<PatternType>,
/// Principal to match; passed through unchanged.
pub principal: Option<String>,
/// Host to match; passed through unchanged.
pub host: Option<String>,
/// Operation to match; `None` is sent as `WIRE_ANY`.
pub operation: Option<AclOperation>,
/// Permission type to match; `None` is sent as `WIRE_ANY`.
pub permission_type: Option<PermissionType>,
}
/// A SCRAM credential to create or replace for one user.
///
/// The plaintext `password` never goes on the wire: the request builder
/// derives a salted password from it and sends only the salt, the iteration
/// count and the derived bytes.
///
/// `Debug` is implemented by hand so that the plaintext password is never
/// written to logs or panic messages; the field is rendered as `<redacted>`.
#[derive(Clone)]
pub struct ScramUpsertion {
    /// User whose credential is upserted.
    pub username: String,
    /// Plaintext password; used only as input to the key derivation.
    pub password: String,
    /// Iteration count sent on the wire and used for key derivation.
    pub iterations: i32,
}

impl std::fmt::Debug for ScramUpsertion {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScramUpsertion")
            .field("username", &self.username)
            // Deliberately hide the secret: a derived Debug would print it verbatim.
            .field("password", &"<redacted>")
            .field("iterations", &self.iterations)
            .finish()
    }
}
/// Request to delete one user's SCRAM credential.
///
/// The mechanism is not stored here; the request builder stamps each deletion
/// with the mechanism code of the builder that was called.
#[derive(Debug, Clone)]
pub struct ScramDeletion {
/// User whose credential is deleted.
pub username: String,
}
/// Per-user result of an alter-SCRAM-credentials call.
#[derive(Debug, Clone)]
pub struct ScramUserOutcome {
/// User name as reported in the response entry.
pub username: String,
/// `None` when the entry's error code was 0, otherwise the decoded error.
pub error: Option<KafkaError>,
}
/// Result of one entry in a `create_acls` response.
#[derive(Debug, Clone)]
pub struct CreateAclOutcome {
/// `None` when the entry's error code was 0, otherwise the decoded error.
pub error: Option<KafkaError>,
}
/// Result for one filter in a `delete_acls` response.
#[derive(Debug, Clone)]
pub struct DeleteAclFilterOutcome {
/// Filter-level error; `None` when the filter's error code was 0.
pub error: Option<KafkaError>,
/// ACL bindings reported as matching the filter. Always empty when `error`
/// is `Some`.
pub matched: Vec<AclEntry>,
}
impl AdminClient {
pub async fn alter_user_scram_credentials_sha512(
&mut self,
upsertions: &[ScramUpsertion],
deletions: &[ScramDeletion],
) -> Result<Vec<ScramUserOutcome>, AdminError> {
let rng = SystemRandom::new();
let req = build_alter_scram_request_sha512(upsertions, deletions, &rng)?;
let resp = self.conn.send(req).await?;
Ok(parse_alter_scram_results(resp))
}
pub async fn alter_user_scram_credentials_sha256(
&mut self,
upsertions: &[ScramUpsertion],
deletions: &[ScramDeletion],
) -> Result<Vec<ScramUserOutcome>, AdminError> {
let rng = SystemRandom::new();
let req = build_alter_scram_request_sha256(upsertions, deletions, &rng)?;
let resp = self.conn.send(req).await?;
Ok(parse_alter_scram_results(resp))
}
pub async fn describe_acls(
&mut self,
filter: &AclEntryFilter,
) -> Result<Vec<AclEntry>, AdminError> {
let req = filter_to_describe_request(filter);
let resp = self.conn.send(req).await?;
parse_describe_acls(resp)
}
pub async fn create_acls(
&mut self,
creations: &[AclEntry],
) -> Result<Vec<CreateAclOutcome>, AdminError> {
let req = CreateAclsRequest {
creations: creations.iter().map(acl_to_creation).collect(),
..Default::default()
};
let resp = self.conn.send(req).await?;
Ok(resp
.results
.into_iter()
.map(|r| CreateAclOutcome {
error: error_if(r.error_code, r.error_message),
})
.collect())
}
pub async fn delete_acls(
&mut self,
filters: &[AclEntryFilter],
) -> Result<Vec<DeleteAclFilterOutcome>, AdminError> {
let req = DeleteAclsRequest {
filters: filters.iter().map(acl_filter_to_wire).collect(),
..Default::default()
};
let resp = self.conn.send(req).await?;
let mut out = Vec::with_capacity(resp.filter_results.len());
for fr in resp.filter_results {
if let Some(err) = error_if(fr.error_code, fr.error_message) {
out.push(DeleteAclFilterOutcome {
error: Some(err),
matched: Vec::new(),
});
continue;
}
let mut matched = Vec::with_capacity(fr.matching_acls.len());
for m in fr.matching_acls {
if m.error_code != 0 {
return Err(AdminError::Broker {
api: "DeleteAcls",
code: m.error_code,
name: kafka_error_name(m.error_code),
message: m.error_message,
});
}
matched.push(AclEntry {
resource_type: wire_to_resource_type(m.resource_type)?,
resource_name: m.resource_name,
pattern_type: wire_to_pattern_type(m.pattern_type)?,
principal: m.principal,
host: m.host,
operation: wire_to_operation(m.operation)?,
permission_type: wire_to_permission(m.permission_type)?,
});
}
out.push(DeleteAclFilterOutcome {
error: None,
matched,
});
}
Ok(out)
}
}
/// Turns a wire error code plus optional message into an optional error.
///
/// Code 0 means success and yields `None`; any other code yields a
/// `KafkaError` carrying the code, its name from `kafka_error_name`, and the
/// message unchanged.
fn error_if(code: i16, message: Option<String>) -> Option<KafkaError> {
    (code != 0).then(|| KafkaError {
        code,
        name: kafka_error_name(code),
        message,
    })
}
/// Builds an alter-SCRAM-credentials request in which every entry uses
/// SCRAM-SHA-512.
///
/// Delegates to `build_alter_scram_request` with the SHA-512 mechanism and
/// its wire code.
fn build_alter_scram_request_sha512(
    upsertions: &[ScramUpsertion],
    deletions: &[ScramDeletion],
    rng: &SystemRandom,
) -> Result<AlterUserScramCredentialsRequest, AdminError> {
    let mechanism = SaslMechanism::ScramSha512;
    let wire_code = SCRAM_SHA_512_WIRE;
    build_alter_scram_request(upsertions, deletions, rng, mechanism, wire_code)
}
/// Builds an alter-SCRAM-credentials request in which every entry uses
/// SCRAM-SHA-256.
///
/// Delegates to `build_alter_scram_request` with the SHA-256 mechanism and
/// its wire code.
fn build_alter_scram_request_sha256(
    upsertions: &[ScramUpsertion],
    deletions: &[ScramDeletion],
    rng: &SystemRandom,
) -> Result<AlterUserScramCredentialsRequest, AdminError> {
    let mechanism = SaslMechanism::ScramSha256;
    let wire_code = SCRAM_SHA_256_WIRE;
    build_alter_scram_request(upsertions, deletions, rng, mechanism, wire_code)
}
fn build_alter_scram_request(
upsertions: &[ScramUpsertion],
deletions: &[ScramDeletion],
rng: &SystemRandom,
mechanism: SaslMechanism,
wire_mechanism: i8,
) -> Result<AlterUserScramCredentialsRequest, AdminError> {
let mut wire_upserts = Vec::with_capacity(upsertions.len());
for u in upsertions {
let mut salt = vec![0u8; 16];
rng.fill(&mut salt)
.map_err(|_| AdminError::Protocol("system RNG failure".into()))?;
let salted = crabka_security::pbkdf2_salted(
u.password.as_bytes(),
mechanism,
u32::try_from(u.iterations.max(0)).unwrap_or(0),
&salt,
);
wire_upserts.push(ScramCredentialUpsertion {
name: u.username.clone(),
mechanism: wire_mechanism,
iterations: u.iterations,
salt: Bytes::from(salt),
salted_password: Bytes::from(salted),
..Default::default()
});
}
let wire_deletes = deletions
.iter()
.map(|d| ScramCredentialDeletion {
name: d.username.clone(),
mechanism: wire_mechanism,
..Default::default()
})
.collect();
Ok(AlterUserScramCredentialsRequest {
upsertions: wire_upserts,
deletions: wire_deletes,
..Default::default()
})
}
fn parse_alter_scram_results(
resp: <AlterUserScramCredentialsRequest as crabka_protocol::ProtocolRequest>::Response,
) -> Vec<ScramUserOutcome> {
resp.results
.into_iter()
.map(|r| ScramUserOutcome {
username: r.user,
error: error_if(r.error_code, r.error_message),
})
.collect()
}
fn filter_to_describe_request(f: &AclEntryFilter) -> DescribeAclsRequest {
DescribeAclsRequest {
resource_type_filter: f.resource_type.map_or(WIRE_ANY, resource_type_to_wire),
resource_name_filter: f.resource_name.clone(),
pattern_type_filter: f.pattern_type.map_or(WIRE_ANY, pattern_type_to_wire),
principal_filter: f.principal.clone(),
host_filter: f.host.clone(),
operation: f.operation.map_or(WIRE_ANY, operation_to_wire),
permission_type: f.permission_type.map_or(WIRE_ANY, permission_to_wire),
..Default::default()
}
}
/// Flattens a DescribeAcls response into a list of ACL entries.
///
/// The response groups ACLs by resource; each ACL becomes one `AclEntry`
/// that repeats its resource's type, name and pattern type.
///
/// # Errors
/// Returns `AdminError::Broker` when the response's top-level error code is
/// non-zero, and `AdminError::Protocol` when any discriminant is unknown.
/// A resource's type and pattern type are validated even if it lists no ACLs.
fn parse_describe_acls(
    resp: <DescribeAclsRequest as crabka_protocol::ProtocolRequest>::Response,
) -> Result<Vec<AclEntry>, AdminError> {
    let code = resp.error_code;
    if code != 0 {
        return Err(AdminError::Broker {
            api: "DescribeAcls",
            code,
            name: kafka_error_name(code),
            message: resp.error_message,
        });
    }

    let mut entries = Vec::new();
    for resource in resp.resources {
        let resource_type = wire_to_resource_type(resource.resource_type)?;
        let pattern_type = wire_to_pattern_type(resource.pattern_type)?;
        let resource_name = resource.resource_name;
        for acl in resource.acls {
            let operation = wire_to_operation(acl.operation)?;
            let permission_type = wire_to_permission(acl.permission_type)?;
            entries.push(AclEntry {
                resource_type,
                resource_name: resource_name.clone(),
                pattern_type,
                principal: acl.principal,
                host: acl.host,
                operation,
                permission_type,
            });
        }
    }
    Ok(entries)
}
pub(crate) fn acl_to_creation(e: &AclEntry) -> AclCreation {
AclCreation {
resource_type: resource_type_to_wire(e.resource_type),
resource_name: e.resource_name.clone(),
resource_pattern_type: pattern_type_to_wire(e.pattern_type),
principal: e.principal.clone(),
host: e.host.clone(),
operation: operation_to_wire(e.operation),
permission_type: permission_to_wire(e.permission_type),
..Default::default()
}
}
/// Converts an `AclEntryFilter` into the wire-level `DeleteAclsFilter`.
///
/// Enum axes left unset are sent as `WIRE_ANY`; string axes are copied
/// through as they are, including `None`.
pub(crate) fn acl_filter_to_wire(f: &AclEntryFilter) -> DeleteAclsFilter {
    let resource_type_filter = f
        .resource_type
        .map(resource_type_to_wire)
        .unwrap_or(WIRE_ANY);
    let pattern_type_filter = f
        .pattern_type
        .map(pattern_type_to_wire)
        .unwrap_or(WIRE_ANY);
    let operation = f.operation.map(operation_to_wire).unwrap_or(WIRE_ANY);
    let permission_type = f
        .permission_type
        .map(permission_to_wire)
        .unwrap_or(WIRE_ANY);

    DeleteAclsFilter {
        resource_type_filter,
        resource_name_filter: f.resource_name.clone(),
        pattern_type_filter,
        principal_filter: f.principal.clone(),
        host_filter: f.host.clone(),
        operation,
        permission_type,
        ..Default::default()
    }
}
/// Wire value sent on an enum filter axis (resource type, pattern type,
/// operation, permission type) that the caller left as `None`.
/// None of the `wire_to_*` decoders accept it as a concrete value.
const WIRE_ANY: i8 = 1;
fn resource_type_to_wire(rt: ResourceType) -> i8 {
match rt {
ResourceType::Topic => 2,
ResourceType::Group => 3,
ResourceType::Cluster => 4,
ResourceType::TransactionalId => 5,
}
}
/// Decodes a wire byte into a `ResourceType`.
///
/// # Errors
/// Returns `AdminError::Protocol` for any value other than 2 through 5,
/// including the `WIRE_ANY` value 1.
fn wire_to_resource_type(b: i8) -> Result<ResourceType, AdminError> {
    let decoded = match b {
        2 => Some(ResourceType::Topic),
        3 => Some(ResourceType::Group),
        4 => Some(ResourceType::Cluster),
        5 => Some(ResourceType::TransactionalId),
        _ => None,
    };
    decoded.ok_or_else(|| {
        AdminError::Protocol(format!("unknown ACL resource_type discriminant: {b}",))
    })
}
fn pattern_type_to_wire(pt: PatternType) -> i8 {
match pt {
PatternType::Literal => 3,
PatternType::Prefixed => 4,
}
}
/// Decodes a wire byte into a `PatternType`.
///
/// # Errors
/// Returns `AdminError::Protocol` for any value other than 3 or 4.
fn wire_to_pattern_type(b: i8) -> Result<PatternType, AdminError> {
    let decoded = match b {
        3 => Some(PatternType::Literal),
        4 => Some(PatternType::Prefixed),
        _ => None,
    };
    decoded.ok_or_else(|| {
        AdminError::Protocol(format!("unknown ACL pattern_type discriminant: {b}",))
    })
}
fn permission_to_wire(pt: PermissionType) -> i8 {
match pt {
PermissionType::Deny => 2,
PermissionType::Allow => 3,
}
}
/// Decodes a wire byte into a `PermissionType`.
///
/// # Errors
/// Returns `AdminError::Protocol` for any value other than 2 or 3.
fn wire_to_permission(b: i8) -> Result<PermissionType, AdminError> {
    let decoded = match b {
        2 => Some(PermissionType::Deny),
        3 => Some(PermissionType::Allow),
        _ => None,
    };
    decoded.ok_or_else(|| {
        AdminError::Protocol(format!("unknown ACL permission discriminant: {b}",))
    })
}
fn operation_to_wire(op: AclOperation) -> i8 {
match op {
AclOperation::All => 2,
AclOperation::Read => 3,
AclOperation::Write => 4,
AclOperation::Create => 5,
AclOperation::Delete => 6,
AclOperation::Alter => 7,
AclOperation::Describe => 8,
AclOperation::ClusterAction => 9,
AclOperation::DescribeConfigs => 10,
AclOperation::AlterConfigs => 11,
AclOperation::IdempotentWrite => 12,
}
}
/// Decodes a wire byte into an `AclOperation`.
///
/// # Errors
/// Returns `AdminError::Protocol` for any value other than 2 through 12.
fn wire_to_operation(b: i8) -> Result<AclOperation, AdminError> {
    let decoded = match b {
        2 => Some(AclOperation::All),
        3 => Some(AclOperation::Read),
        4 => Some(AclOperation::Write),
        5 => Some(AclOperation::Create),
        6 => Some(AclOperation::Delete),
        7 => Some(AclOperation::Alter),
        8 => Some(AclOperation::Describe),
        9 => Some(AclOperation::ClusterAction),
        10 => Some(AclOperation::DescribeConfigs),
        11 => Some(AclOperation::AlterConfigs),
        12 => Some(AclOperation::IdempotentWrite),
        _ => None,
    };
    decoded.ok_or_else(|| {
        AdminError::Protocol(format!("unknown ACL operation discriminant: {b}",))
    })
}
// Unit tests for the wire conversions and request builders in this file.
// None of them talk to a broker; they only exercise the pure helpers.
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
// Fixture: an Allow/Read ACL on literal topic "orders" for User:alice from any host string "*".
fn sample_entry() -> AclEntry {
AclEntry {
resource_type: ResourceType::Topic,
resource_name: "orders".into(),
pattern_type: PatternType::Literal,
principal: "User:alice".into(),
host: "*".into(),
operation: AclOperation::Read,
permission_type: PermissionType::Allow,
}
}
// Every ResourceType variant must survive encode -> decode unchanged.
#[test]
fn resource_type_round_trips() {
for rt in [
ResourceType::Topic,
ResourceType::Group,
ResourceType::Cluster,
ResourceType::TransactionalId,
] {
assert!(wire_to_resource_type(resource_type_to_wire(rt)).unwrap() == rt);
}
}
// Every PatternType variant must survive encode -> decode unchanged.
#[test]
fn pattern_type_round_trips() {
for pt in [PatternType::Literal, PatternType::Prefixed] {
assert!(wire_to_pattern_type(pattern_type_to_wire(pt)).unwrap() == pt);
}
}
// Every PermissionType variant must survive encode -> decode unchanged.
#[test]
fn permission_round_trips() {
for p in [PermissionType::Allow, PermissionType::Deny] {
assert!(wire_to_permission(permission_to_wire(p)).unwrap() == p);
}
}
// Every AclOperation variant must survive encode -> decode unchanged.
#[test]
fn operation_round_trips() {
for op in [
AclOperation::All,
AclOperation::Read,
AclOperation::Write,
AclOperation::Create,
AclOperation::Delete,
AclOperation::Alter,
AclOperation::Describe,
AclOperation::ClusterAction,
AclOperation::DescribeConfigs,
AclOperation::AlterConfigs,
AclOperation::IdempotentWrite,
] {
assert!(wire_to_operation(operation_to_wire(op)).unwrap() == op);
}
}
// Unknown bytes are rejected, and so is 1 (the WIRE_ANY value), which is
// only valid in filters and never as a concrete resource type.
#[test]
fn wire_to_unknown_resource_type_errors() {
assert!(matches!(
wire_to_resource_type(99),
Err(AdminError::Protocol(_))
));
assert!(matches!(
wire_to_resource_type(1),
Err(AdminError::Protocol(_))
));
}
// Pins the exact wire values produced for the sample entry.
#[test]
fn acl_to_creation_matches_discriminants() {
let e = sample_entry();
let c = acl_to_creation(&e);
assert!(c.resource_type == 2);
assert!(c.resource_name == "orders");
assert!(c.resource_pattern_type == 3);
assert!(c.principal == "User:alice");
assert!(c.host == "*");
assert!(c.operation == 3);
assert!(c.permission_type == 3);
}
// A default (all-None) filter maps enum axes to WIRE_ANY and string axes to None.
#[test]
fn acl_filter_to_wire_uses_any_for_none_axes() {
let f = AclEntryFilter::default();
let w = acl_filter_to_wire(&f);
assert!(w.resource_type_filter == WIRE_ANY);
assert!(w.pattern_type_filter == WIRE_ANY);
assert!(w.operation == WIRE_ANY);
assert!(w.permission_type == WIRE_ANY);
assert!(w.resource_name_filter.is_none());
assert!(w.principal_filter.is_none());
assert!(w.host_filter.is_none());
}
// A fully specified filter passes every axis through with its concrete wire value.
#[test]
fn acl_filter_to_wire_passes_concrete_axes_through() {
let f = AclEntryFilter {
resource_type: Some(ResourceType::Topic),
resource_name: Some("orders".into()),
pattern_type: Some(PatternType::Literal),
principal: Some("User:alice".into()),
host: Some("10.0.0.0".into()),
operation: Some(AclOperation::Read),
permission_type: Some(PermissionType::Allow),
};
let w = acl_filter_to_wire(&f);
assert!(w.resource_type_filter == 2);
assert!(w.resource_name_filter.as_deref() == Some("orders"));
assert!(w.pattern_type_filter == 3);
assert!(w.principal_filter.as_deref() == Some("User:alice"));
assert!(w.host_filter.as_deref() == Some("10.0.0.0"));
assert!(w.operation == 3);
assert!(w.permission_type == 3);
}
// The SHA-512 request must carry a 16-byte salt and a 64-byte derived value,
// never the plaintext password.
#[test]
fn scram_request_carries_pbkdf2_intermediate_not_password() {
let rng = SystemRandom::new();
let upserts = [ScramUpsertion {
username: "alice".into(),
password: "hunter2".into(),
iterations: 4096,
}];
let req = build_alter_scram_request_sha512(&upserts, &[], &rng).unwrap();
assert!(req.upsertions.len() == 1);
let u = &req.upsertions[0];
assert!(u.name == "alice");
assert!(u.mechanism == SCRAM_SHA_512_WIRE);
assert!(u.iterations == 4096);
assert!(u.salt.len() == 16);
assert!(u.salted_password.len() == 64);
assert!(u.salted_password.as_ref() != b"hunter2");
}
// Deletions built by the SHA-512 builder are stamped with the SHA-512 mechanism code.
#[test]
fn scram_request_deletions_use_sha512_mechanism() {
let rng = SystemRandom::new();
let dels = [ScramDeletion {
username: "alice".into(),
}];
let req = build_alter_scram_request_sha512(&[], &dels, &rng).unwrap();
assert!(req.deletions.len() == 1);
assert!(req.deletions[0].name == "alice");
assert!(req.deletions[0].mechanism == SCRAM_SHA_512_WIRE);
}
// Each upsertion draws its own salt, even when passwords are identical.
#[test]
fn scram_request_two_upserts_get_distinct_salts() {
let rng = SystemRandom::new();
let upserts = [
ScramUpsertion {
username: "alice".into(),
password: "p".into(),
iterations: 4096,
},
ScramUpsertion {
username: "bob".into(),
password: "p".into(),
iterations: 4096,
},
];
let req = build_alter_scram_request_sha512(&upserts, &[], &rng).unwrap();
assert!(req.upsertions[0].salt != req.upsertions[1].salt);
}
// A describe filter with only the principal set leaves every enum axis at WIRE_ANY.
#[test]
fn describe_request_uses_any_for_unspecified_axes() {
let f = AclEntryFilter {
principal: Some("User:alice".into()),
..Default::default()
};
let r = filter_to_describe_request(&f);
assert!(r.principal_filter.as_deref() == Some("User:alice"));
assert!(r.resource_type_filter == WIRE_ANY);
assert!(r.pattern_type_filter == WIRE_ANY);
assert!(r.operation == WIRE_ANY);
assert!(r.permission_type == WIRE_ANY);
}
}