use bytes::Bytes;
use crabka_metadata::DelegationToken;
use crabka_protocol::owned::create_delegation_token_request::{
CreatableRenewers, CreateDelegationTokenRequest,
};
use crabka_protocol::owned::create_delegation_token_response::CreateDelegationTokenResponse;
use crabka_protocol::owned::describe_delegation_token_request::{
DescribeDelegationTokenOwner, DescribeDelegationTokenRequest,
};
use crabka_protocol::owned::describe_delegation_token_response::{
DescribeDelegationTokenResponse, DescribedDelegationToken,
};
use crabka_protocol::owned::expire_delegation_token_request::ExpireDelegationTokenRequest;
use crabka_protocol::owned::renew_delegation_token_request::RenewDelegationTokenRequest;
use crabka_security::KafkaPrincipal;
use crate::{AdminClient, AdminError, kafka_error_name};
impl AdminClient {
    /// Creates a delegation token owned by `owner_principal_name`.
    ///
    /// The owner principal type is always sent as `"User"`. Each entry in `renewers` may be
    /// written as `"Type:name"` or as a bare name, which is sent with type `"User"`.
    ///
    /// # Errors
    ///
    /// Propagates any failure from sending the request, and returns [`AdminError::Broker`]
    /// when the response carries a non-zero error code.
    pub async fn create_delegation_token_as_owner(
        &mut self,
        owner_principal_name: &str,
        renewers: &[String],
        max_lifetime_ms: i64,
    ) -> Result<CreateDelegationTokenResponse, AdminError> {
        let request =
            build_create_delegation_token(owner_principal_name, renewers, max_lifetime_ms);
        let response = self.conn.send(request).await?;
        let code = response.error_code;
        if code == 0 {
            Ok(response)
        } else {
            Err(broker_err("CreateDelegationToken", code, None))
        }
    }

    /// Renews the token identified by `hmac` and returns the expiry timestamp (in
    /// milliseconds) reported by the broker.
    ///
    /// # Errors
    ///
    /// Propagates any failure from sending the request, and returns [`AdminError::Broker`]
    /// when the response carries a non-zero error code.
    pub async fn renew_delegation_token(&mut self, hmac: &[u8]) -> Result<i64, AdminError> {
        let response = self.conn.send(build_renew_delegation_token(hmac)).await?;
        match response.error_code {
            0 => Ok(response.expiry_timestamp_ms),
            code => Err(broker_err("RenewDelegationToken", code, None)),
        }
    }

    /// Expires the token identified by `hmac`.
    ///
    /// # Errors
    ///
    /// Propagates any failure from sending the request, and returns [`AdminError::Broker`]
    /// when the response carries a non-zero error code.
    pub async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError> {
        let response = self.conn.send(build_expire_delegation_token(hmac)).await?;
        match response.error_code {
            0 => Ok(()),
            code => Err(broker_err("ExpireDelegationToken", code, None)),
        }
    }

    /// Lists the delegation tokens owned by `owner_principal`.
    ///
    /// `owner_principal` may be written as `"Type:name"` or as a bare name, which is sent
    /// with type `"User"`.
    ///
    /// # Errors
    ///
    /// Propagates any failure from sending the request, and returns [`AdminError::Broker`]
    /// when the response carries a non-zero error code.
    pub async fn describe_delegation_tokens_owned_by(
        &mut self,
        owner_principal: &str,
    ) -> Result<Vec<DelegationToken>, AdminError> {
        let request = build_describe_owner_filter(owner_principal);
        parse_describe_delegation_tokens(self.conn.send(request).await?)
    }
}
/// Builds a `CreateDelegationToken` request for the given owner.
///
/// The owner principal type is fixed to `"User"`; every renewer string is converted with
/// `renewer_str_to_wire`. All remaining request fields keep their `Default` values.
fn build_create_delegation_token(
    owner_principal_name: &str,
    renewers: &[String],
    max_lifetime_ms: i64,
) -> CreateDelegationTokenRequest {
    let wire_renewers = renewers
        .iter()
        .map(|entry| renewer_str_to_wire(entry))
        .collect();

    CreateDelegationTokenRequest {
        owner_principal_type: Some("User".into()),
        owner_principal_name: Some(owner_principal_name.into()),
        renewers: wire_renewers,
        max_lifetime_ms,
        ..Default::default()
    }
}
/// Builds a `RenewDelegationToken` request for the token identified by `hmac`.
///
/// The renew period is always sent as `-1`; the HMAC bytes are copied into the request.
fn build_renew_delegation_token(hmac: &[u8]) -> RenewDelegationTokenRequest {
    let token_hmac = Bytes::copy_from_slice(hmac);
    RenewDelegationTokenRequest {
        hmac: token_hmac,
        renew_period_ms: -1,
        ..Default::default()
    }
}
/// Builds an `ExpireDelegationToken` request for the token identified by `hmac`.
///
/// The expiry time period is always sent as `-1`; the HMAC bytes are copied into the request.
fn build_expire_delegation_token(hmac: &[u8]) -> ExpireDelegationTokenRequest {
    let token_hmac = Bytes::copy_from_slice(hmac);
    ExpireDelegationTokenRequest {
        hmac: token_hmac,
        expiry_time_period_ms: -1,
        ..Default::default()
    }
}
/// Converts a renewer string into its wire representation.
///
/// Input of the form `"Type:name"` is split at the first `':'`; a string without a colon is
/// treated as a bare name with principal type `"User"`.
fn renewer_str_to_wire(s: &str) -> CreatableRenewers {
    let (kind, name) = match s.split_once(':') {
        Some(parts) => parts,
        None => ("User", s),
    };
    let principal_type = kind.to_string();
    let principal_name = name.to_string();

    CreatableRenewers {
        principal_type,
        principal_name,
        ..Default::default()
    }
}
/// Builds a `DescribeDelegationToken` request filtered to a single owner.
///
/// `owner_principal` of the form `"Type:name"` is split at the first `':'`; a string without
/// a colon is treated as a bare name with principal type `"User"`.
fn build_describe_owner_filter(owner_principal: &str) -> DescribeDelegationTokenRequest {
    let (kind, name) = match owner_principal.split_once(':') {
        Some(parts) => parts,
        None => ("User", owner_principal),
    };
    let owner = DescribeDelegationTokenOwner {
        principal_type: kind.to_string(),
        principal_name: name.to_string(),
        ..Default::default()
    };

    DescribeDelegationTokenRequest {
        owners: Some(vec![owner]),
        ..Default::default()
    }
}
fn parse_describe_delegation_tokens(
resp: DescribeDelegationTokenResponse,
) -> Result<Vec<DelegationToken>, AdminError> {
if resp.error_code != 0 {
return Err(broker_err("DescribeDelegationToken", resp.error_code, None));
}
Ok(resp.tokens.into_iter().map(described_to_image).collect())
}
fn described_to_image(t: DescribedDelegationToken) -> DelegationToken {
DelegationToken {
token_id: t.token_id,
owner: KafkaPrincipal {
principal_type: t.principal_type,
name: t.principal_name,
},
hmac: t.hmac.to_vec(),
issue_timestamp_ms: t.issue_timestamp,
expiry_timestamp_ms: t.expiry_timestamp,
max_timestamp_ms: t.max_timestamp,
renewers: t
.renewers
.into_iter()
.map(|r| KafkaPrincipal {
principal_type: r.principal_type,
name: r.principal_name,
})
.collect(),
}
}
/// Builds an [`AdminError::Broker`] for `api`, attaching the name that `kafka_error_name`
/// reports for `code` together with the optional broker `message`.
fn broker_err(api: &'static str, code: i16, message: Option<String>) -> AdminError {
    let name = kafka_error_name(code);
    AdminError::Broker {
        api,
        code,
        name,
        message,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use bytes::Bytes;
    use crabka_protocol::owned::describe_delegation_token_response::{
        DescribeDelegationTokenResponse, DescribedDelegationToken, DescribedDelegationTokenRenewer,
    };

    /// The create request carries the owner as a `User` principal and parses both
    /// `Type:name` and bare-name renewers.
    #[test]
    fn build_create_populates_act_as_owner_and_renewers() {
        let renewers = ["User:bob".to_string(), "carol".to_string()];
        let request = build_create_delegation_token("alice", &renewers, 60_000);

        assert!(request.owner_principal_type.as_deref() == Some("User"));
        assert!(request.owner_principal_name.as_deref() == Some("alice"));
        assert!(request.max_lifetime_ms == 60_000);

        assert!(request.renewers.len() == 2);
        assert!(request.renewers[0].principal_type == "User");
        assert!(request.renewers[0].principal_name == "bob");
        assert!(request.renewers[1].principal_type == "User");
        assert!(request.renewers[1].principal_name == "carol");
    }

    /// The renew request copies the HMAC and sends `-1` as the renew period.
    #[test]
    fn build_renew_uses_minus_one_for_broker_default() {
        let request = build_renew_delegation_token(b"\x01\x02\x03");

        assert!(request.hmac.as_ref() == &[0x01, 0x02, 0x03]);
        assert!(request.renew_period_ms == -1);
    }

    /// The expire request copies the HMAC and sends `-1` as the expiry period.
    #[test]
    fn build_expire_uses_minus_one_for_immediate_tombstone() {
        let request = build_expire_delegation_token(b"\xaa\xbb");

        assert!(request.hmac.as_ref() == &[0xaa, 0xbb]);
        assert!(request.expiry_time_period_ms == -1);
    }

    /// The describe filter holds exactly one owner, with `User` as the fallback type.
    #[test]
    fn build_describe_owner_filter_sets_single_entry() {
        let typed = build_describe_owner_filter("User:alice");
        let typed_owners = typed.owners.as_ref().expect("owners filter populated");
        assert!(typed_owners.len() == 1);
        assert!(typed_owners[0].principal_type == "User");
        assert!(typed_owners[0].principal_name == "alice");

        let bare = build_describe_owner_filter("solo");
        let bare_owners = bare.owners.as_ref().unwrap();
        assert!(bare_owners[0].principal_type == "User");
        assert!(bare_owners[0].principal_name == "solo");
    }

    /// A successful describe response is mapped field-by-field onto `DelegationToken`.
    #[test]
    fn parse_describe_maps_wire_fields_to_image() {
        let renewer = DescribedDelegationTokenRenewer {
            principal_type: "User".into(),
            principal_name: "bob".into(),
            ..Default::default()
        };
        let described = DescribedDelegationToken {
            principal_type: "User".into(),
            principal_name: "alice".into(),
            token_requester_principal_type: "User".into(),
            token_requester_principal_name: "operator".into(),
            issue_timestamp: 1_000,
            expiry_timestamp: 2_000,
            max_timestamp: 9_000,
            token_id: "tok-1".into(),
            hmac: Bytes::from_static(b"\xde\xad\xbe\xef"),
            renewers: vec![renewer],
            ..Default::default()
        };
        let response = DescribeDelegationTokenResponse {
            error_code: 0,
            tokens: vec![described],
            ..Default::default()
        };

        let tokens = parse_describe_delegation_tokens(response).expect("ok response");
        assert!(tokens.len() == 1);

        let token = &tokens[0];
        assert!(token.token_id == "tok-1");
        assert!(token.owner.principal_type == "User");
        assert!(token.owner.name == "alice");
        assert!(token.hmac == vec![0xde, 0xad, 0xbe, 0xef]);
        assert!(token.issue_timestamp_ms == 1_000);
        assert!(token.expiry_timestamp_ms == 2_000);
        assert!(token.max_timestamp_ms == 9_000);
        assert!(token.renewers.len() == 1);
        assert!(token.renewers[0].principal_type == "User");
        assert!(token.renewers[0].name == "bob");
    }

    /// `broker_err` forwards the API label, code and message, and fills in the code name.
    #[test]
    fn broker_err_carries_api_and_kafka_code_name() {
        let error = broker_err("CreateDelegationToken", 65, Some("not super-user".into()));
        let (api, code, name, message) = match error {
            AdminError::Broker {
                api,
                code,
                name,
                message,
            } => (api, code, name, message),
            other => panic!("expected AdminError::Broker, got {other:?}"),
        };

        assert!(api == "CreateDelegationToken");
        assert!(code == 65);
        // NOTE(review): the Kafka protocol error table names code 65
        // DELEGATION_TOKEN_AUTHORIZATION_FAILED; confirm that `kafka_error_name`
        // intentionally leaves it unmapped so that "UNKNOWN" is the expected result.
        assert!(name == "UNKNOWN");
        assert!(message.as_deref() == Some("not super-user"));
    }

    /// A non-zero error code in the describe response surfaces as `AdminError::Broker`.
    #[test]
    fn parse_describe_propagates_nonzero_error_code() {
        let response = DescribeDelegationTokenResponse {
            error_code: 61,
            ..Default::default()
        };

        let error = parse_describe_delegation_tokens(response).unwrap_err();
        let (api, code) = match error {
            AdminError::Broker { api, code, .. } => (api, code),
            other => panic!("expected AdminError::Broker, got {other:?}"),
        };

        assert!(api == "DescribeDelegationToken");
        assert!(code == 61);
    }
}