use bytes::{Buf, BufMut, Bytes};
use super::{VersionedDecode, VersionedEncode, non_nullable_bytes, non_nullable_string};
use crate::error::{ErrorCode, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{
Decode, Encode, KafkaBytes, KafkaString, TaggedFields, TryEncode,
};
use crate::protocol::{
array_len_i32, check_compact_array_len, check_decode_array_len, encode_compact_array_len,
};
/// A renewer principal attached to a `CreateDelegationTokenRequest`.
///
/// The request encoders write `principal_type` followed by `principal_name`
/// for every entry. Derives `PartialEq`/`Eq` so values can be compared
/// directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreatableRenewer {
    /// Type of the renewer principal (the tests in this file use `"User"`).
    pub principal_type: String,
    /// Name of the renewer principal.
    pub principal_name: String,
}

/// Request body for the `CreateDelegationToken` API (encodable as versions 1-3).
///
/// Derives `PartialEq`/`Eq` so whole requests can be compared by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateDelegationTokenRequest {
    /// Renewer principals, written as an array by every encoder version.
    pub renewers: Vec<CreatableRenewer>,
    /// Maximum token lifetime in milliseconds, written as a raw `i64`.
    /// The tests in this file pass `-1`; presumably that selects the broker
    /// default — confirm against the Kafka protocol documentation.
    pub max_lifetime_ms: i64,
    /// Owner principal type. Only serialized by `encode_v3`; ignored by v1/v2.
    pub owner_principal_type: Option<String>,
    /// Owner principal name. Only serialized by `encode_v3`; ignored by v1/v2.
    pub owner_principal_name: Option<String>,
}
impl CreateDelegationTokenRequest {
pub fn api_key() -> ApiKey {
ApiKey::CreateDelegationToken
}
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
buf.put_i32(array_len_i32(self.renewers.len())?);
for renewer in &self.renewers {
KafkaString::new(&renewer.principal_type).try_encode(buf)?;
KafkaString::new(&renewer.principal_name).try_encode(buf)?;
}
self.max_lifetime_ms.encode(buf);
Ok(())
}
pub fn encode_v2(&self, buf: &mut impl BufMut) -> Result<()> {
encode_compact_array_len(self.renewers.len(), buf)?;
for renewer in &self.renewers {
KafkaString::new(&renewer.principal_type).try_encode_compact(buf)?;
KafkaString::new(&renewer.principal_name).try_encode_compact(buf)?;
TaggedFields::default().try_encode(buf)?;
}
self.max_lifetime_ms.encode(buf);
TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v3(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString(self.owner_principal_type.clone()).try_encode_compact(buf)?;
KafkaString(self.owner_principal_name.clone()).try_encode_compact(buf)?;
encode_compact_array_len(self.renewers.len(), buf)?;
for renewer in &self.renewers {
KafkaString::new(&renewer.principal_type).try_encode_compact(buf)?;
KafkaString::new(&renewer.principal_name).try_encode_compact(buf)?;
TaggedFields::default().try_encode(buf)?;
}
self.max_lifetime_ms.encode(buf);
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
impl VersionedEncode for CreateDelegationTokenRequest {
    /// Routes to the encoder for `version` (1, 2 or 3).
    ///
    /// # Errors
    /// Any other version yields the result of `unsupported_encode!`; failures
    /// of the selected encoder are returned unchanged.
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            1 => self.encode_v1(buf),
            2 => self.encode_v2(buf),
            3 => self.encode_v3(buf),
            _ => unsupported_encode!("CreateDelegationTokenRequest", version),
        }
    }
}
/// Decoded `CreateDelegationToken` response (versions 1 through 3).
///
/// Fields are declared in the same order the decoders read them from the wire.
#[derive(Debug, Clone)]
pub struct CreateDelegationTokenResponse {
/// Error code, built with `ErrorCode::from_i16` from the leading `i16`.
pub error_code: ErrorCode,
/// Principal type of the token owner; decoded as a non-nullable string.
pub principal_type: String,
/// Principal name of the token owner; decoded as a non-nullable string.
pub principal_name: String,
/// Requester principal type. Only `decode_v3` reads it; `decode_v1` and
/// `decode_v2` set it to `None`.
pub token_requester_principal_type: Option<String>,
/// Requester principal name. Only `decode_v3` reads it; `decode_v1` and
/// `decode_v2` set it to `None`.
pub token_requester_principal_name: Option<String>,
/// Issue timestamp, read as an `i64` (milliseconds per the field name).
pub issue_timestamp_ms: i64,
/// Expiry timestamp, read as an `i64` (milliseconds per the field name).
pub expiry_timestamp_ms: i64,
/// Maximum timestamp, read as an `i64` (milliseconds per the field name).
pub max_timestamp_ms: i64,
/// Token identifier; decoded as a non-nullable string.
pub token_id: String,
/// Token HMAC; decoded as non-nullable bytes.
pub hmac: Bytes,
/// Throttle time, read as an `i32` after the HMAC (milliseconds per the field name).
pub throttle_time_ms: i32,
}
impl CreateDelegationTokenResponse {
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let principal_type = non_nullable_string("principal_type", KafkaString::decode(buf)?.0)?;
let principal_name = non_nullable_string("principal_name", KafkaString::decode(buf)?.0)?;
let issue_timestamp_ms = i64::decode(buf)?;
let expiry_timestamp_ms = i64::decode(buf)?;
let max_timestamp_ms = i64::decode(buf)?;
let token_id = non_nullable_string("token_id", KafkaString::decode(buf)?.0)?;
let hmac = non_nullable_bytes("hmac", KafkaBytes::decode(buf)?.0)?;
let throttle_time_ms = i32::decode(buf)?;
Ok(Self {
error_code,
principal_type,
principal_name,
token_requester_principal_type: None,
token_requester_principal_name: None,
issue_timestamp_ms,
expiry_timestamp_ms,
max_timestamp_ms,
token_id,
hmac,
throttle_time_ms,
})
}
pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let principal_type =
non_nullable_string("principal_type", KafkaString::decode_compact(buf)?.0)?;
let principal_name =
non_nullable_string("principal_name", KafkaString::decode_compact(buf)?.0)?;
let issue_timestamp_ms = i64::decode(buf)?;
let expiry_timestamp_ms = i64::decode(buf)?;
let max_timestamp_ms = i64::decode(buf)?;
let token_id = non_nullable_string("token_id", KafkaString::decode_compact(buf)?.0)?;
let hmac = non_nullable_bytes("hmac", KafkaBytes::decode_compact(buf)?.0)?;
let throttle_time_ms = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
Ok(Self {
error_code,
principal_type,
principal_name,
token_requester_principal_type: None,
token_requester_principal_name: None,
issue_timestamp_ms,
expiry_timestamp_ms,
max_timestamp_ms,
token_id,
hmac,
throttle_time_ms,
})
}
pub fn decode_v3(buf: &mut impl Buf) -> Result<Self> {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let principal_type =
non_nullable_string("principal_type", KafkaString::decode_compact(buf)?.0)?;
let principal_name =
non_nullable_string("principal_name", KafkaString::decode_compact(buf)?.0)?;
let token_requester_principal_type =
non_nullable_string("requester_type", KafkaString::decode_compact(buf)?.0)?;
let token_requester_principal_name =
non_nullable_string("requester_name", KafkaString::decode_compact(buf)?.0)?;
let issue_timestamp_ms = i64::decode(buf)?;
let expiry_timestamp_ms = i64::decode(buf)?;
let max_timestamp_ms = i64::decode(buf)?;
let token_id = non_nullable_string("token_id", KafkaString::decode_compact(buf)?.0)?;
let hmac = non_nullable_bytes("hmac", KafkaBytes::decode_compact(buf)?.0)?;
let throttle_time_ms = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
Ok(Self {
error_code,
principal_type,
principal_name,
token_requester_principal_type: Some(token_requester_principal_type),
token_requester_principal_name: Some(token_requester_principal_name),
issue_timestamp_ms,
expiry_timestamp_ms,
max_timestamp_ms,
token_id,
hmac,
throttle_time_ms,
})
}
}
impl VersionedDecode for CreateDelegationTokenResponse {
    /// Routes to the decoder for `version` (1, 2 or 3).
    ///
    /// # Errors
    /// Any other version yields the result of `unsupported_decode!`; failures
    /// of the selected decoder are returned unchanged.
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        // Newest layout first; the arms are disjoint so order does not matter.
        match version {
            3 => Self::decode_v3(buf),
            2 => Self::decode_v2(buf),
            1 => Self::decode_v1(buf),
            _ => unsupported_decode!("CreateDelegationTokenResponse", version),
        }
    }
}
/// Request body for the `RenewDelegationToken` API (encodable as versions 1-2).
///
/// Derives `PartialEq`/`Eq` so requests can be compared by value; both field
/// types (`Bytes`, `i64`) support equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RenewDelegationTokenRequest {
    /// HMAC of the token to renew; written first by both encoders.
    pub hmac: Bytes,
    /// Renewal period in milliseconds, written as a raw `i64`.
    pub renew_period_ms: i64,
}
impl RenewDelegationTokenRequest {
    /// API key identifying this request type.
    pub fn api_key() -> ApiKey {
        ApiKey::RenewDelegationToken
    }

    /// Encodes the version 1 layout: the HMAC as bytes, then `renew_period_ms`.
    ///
    /// # Errors
    /// Propagates a failure from the bytes encoder.
    pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
        let hmac_field = KafkaBytes::new(self.hmac.clone());
        hmac_field.try_encode(buf)?;
        self.renew_period_ms.encode(buf);
        Ok(())
    }

    /// Encodes the version 2 (flexible) layout: the HMAC as compact bytes,
    /// `renew_period_ms`, then an empty tagged-fields section.
    ///
    /// # Errors
    /// Propagates failures from the compact bytes and tagged-field encoders.
    pub fn encode_v2(&self, buf: &mut impl BufMut) -> Result<()> {
        let hmac_field = KafkaBytes::new(self.hmac.clone());
        hmac_field.try_encode_compact(buf)?;
        self.renew_period_ms.encode(buf);
        let no_tagged_fields = TaggedFields::default();
        no_tagged_fields.try_encode(buf)?;
        Ok(())
    }
}
impl VersionedEncode for RenewDelegationTokenRequest {
    /// Routes to the encoder for `version` (1 or 2).
    ///
    /// # Errors
    /// Any other version yields the result of `unsupported_encode!`; failures
    /// of the selected encoder are returned unchanged.
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            1 => self.encode_v1(buf),
            2 => self.encode_v2(buf),
            _ => unsupported_encode!("RenewDelegationTokenRequest", version),
        }
    }
}
/// Decoded `RenewDelegationToken` response (versions 1 and 2).
///
/// Fields are declared in the same order the decoders read them.
#[derive(Debug, Clone)]
pub struct RenewDelegationTokenResponse {
/// Error code, built with `ErrorCode::from_i16` from the leading `i16`.
pub error_code: ErrorCode,
/// Expiry timestamp, read as an `i64` (milliseconds per the field name).
pub expiry_timestamp_ms: i64,
/// Throttle time, read as an `i32` (milliseconds per the field name).
pub throttle_time_ms: i32,
}
impl RenewDelegationTokenResponse {
    /// Decodes a version 1 response: `i16` error code, `i64` expiry timestamp,
    /// `i32` throttle time.
    ///
    /// # Errors
    /// Propagates any primitive decode failure.
    pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers are evaluated in written order, i.e. wire order.
        Ok(Self {
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
            expiry_timestamp_ms: i64::decode(buf)?,
            throttle_time_ms: i32::decode(buf)?,
        })
    }

    /// Decodes a version 2 response: the version 1 fields followed by a
    /// tagged-fields section, which is consumed and discarded.
    ///
    /// # Errors
    /// Propagates failures from the version 1 fields and from tagged-field
    /// decoding.
    pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
        let response = Self::decode_v1(buf)?;
        let _ = TaggedFields::decode(buf)?;
        Ok(response)
    }
}
impl VersionedDecode for RenewDelegationTokenResponse {
    /// Routes to the decoder for `version` (1 or 2).
    ///
    /// # Errors
    /// Any other version yields the result of `unsupported_decode!`; failures
    /// of the selected decoder are returned unchanged.
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        // Newest layout first; the arms are disjoint so order does not matter.
        match version {
            2 => Self::decode_v2(buf),
            1 => Self::decode_v1(buf),
            _ => unsupported_decode!("RenewDelegationTokenResponse", version),
        }
    }
}
/// Request body for the `ExpireDelegationToken` API (encodable as versions 1-2).
///
/// Derives `PartialEq`/`Eq` so requests can be compared by value; both field
/// types (`Bytes`, `i64`) support equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExpireDelegationTokenRequest {
    /// HMAC of the token to expire; written first by both encoders.
    pub hmac: Bytes,
    /// Expiry period in milliseconds, written as a raw `i64`. The tests in this
    /// file pass `-1`; its exact meaning is defined by the Kafka protocol.
    pub expiry_period_ms: i64,
}
impl ExpireDelegationTokenRequest {
    /// API key identifying this request type.
    pub fn api_key() -> ApiKey {
        ApiKey::ExpireDelegationToken
    }

    /// Encodes the version 1 layout: the HMAC as bytes, then `expiry_period_ms`.
    ///
    /// # Errors
    /// Propagates a failure from the bytes encoder.
    pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
        let hmac_field = KafkaBytes::new(self.hmac.clone());
        hmac_field.try_encode(buf)?;
        self.expiry_period_ms.encode(buf);
        Ok(())
    }

    /// Encodes the version 2 (flexible) layout: the HMAC as compact bytes,
    /// `expiry_period_ms`, then an empty tagged-fields section.
    ///
    /// # Errors
    /// Propagates failures from the compact bytes and tagged-field encoders.
    pub fn encode_v2(&self, buf: &mut impl BufMut) -> Result<()> {
        let hmac_field = KafkaBytes::new(self.hmac.clone());
        hmac_field.try_encode_compact(buf)?;
        self.expiry_period_ms.encode(buf);
        let no_tagged_fields = TaggedFields::default();
        no_tagged_fields.try_encode(buf)?;
        Ok(())
    }
}
impl VersionedEncode for ExpireDelegationTokenRequest {
    /// Routes to the encoder for `version` (1 or 2).
    ///
    /// # Errors
    /// Any other version yields the result of `unsupported_encode!`; failures
    /// of the selected encoder are returned unchanged.
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            1 => self.encode_v1(buf),
            2 => self.encode_v2(buf),
            _ => unsupported_encode!("ExpireDelegationTokenRequest", version),
        }
    }
}
/// Decoded `ExpireDelegationToken` response (versions 1 and 2).
///
/// Fields are declared in the same order the decoders read them.
#[derive(Debug, Clone)]
pub struct ExpireDelegationTokenResponse {
/// Error code, built with `ErrorCode::from_i16` from the leading `i16`.
pub error_code: ErrorCode,
/// Expiry timestamp, read as an `i64` (milliseconds per the field name).
pub expiry_timestamp_ms: i64,
/// Throttle time, read as an `i32` (milliseconds per the field name).
pub throttle_time_ms: i32,
}
impl ExpireDelegationTokenResponse {
    /// Decodes a version 1 response: `i16` error code, `i64` expiry timestamp,
    /// `i32` throttle time.
    ///
    /// # Errors
    /// Propagates any primitive decode failure.
    pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
        // Field initializers are evaluated in written order, i.e. wire order.
        Ok(Self {
            error_code: ErrorCode::from_i16(i16::decode(buf)?),
            expiry_timestamp_ms: i64::decode(buf)?,
            throttle_time_ms: i32::decode(buf)?,
        })
    }

    /// Decodes a version 2 response: the version 1 fields followed by a
    /// tagged-fields section, which is consumed and discarded.
    ///
    /// # Errors
    /// Propagates failures from the version 1 fields and from tagged-field
    /// decoding.
    pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
        let response = Self::decode_v1(buf)?;
        let _ = TaggedFields::decode(buf)?;
        Ok(response)
    }
}
impl VersionedDecode for ExpireDelegationTokenResponse {
    /// Routes to the decoder for `version` (1 or 2).
    ///
    /// # Errors
    /// Any other version yields the result of `unsupported_decode!`; failures
    /// of the selected decoder are returned unchanged.
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        // Newest layout first; the arms are disjoint so order does not matter.
        match version {
            2 => Self::decode_v2(buf),
            1 => Self::decode_v1(buf),
            _ => unsupported_decode!("ExpireDelegationTokenResponse", version),
        }
    }
}
/// An owner principal used to filter a `DescribeDelegationTokenRequest`.
///
/// The request encoders write `principal_type` followed by `principal_name`
/// for every entry. Derives `PartialEq`/`Eq` so values can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DescribeDelegationTokenOwner {
    /// Type of the owner principal (the tests in this file use `"User"`).
    pub principal_type: String,
    /// Name of the owner principal.
    pub principal_name: String,
}

/// Request body for the `DescribeDelegationToken` API (encodable as versions 1-3).
///
/// Derives `PartialEq`/`Eq` so whole requests can be compared by value; note
/// that `None` and `Some(vec![])` are distinct both here and on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DescribeDelegationTokenRequest {
    /// Owners to describe. `None` is encoded as a null array (`-1` length in
    /// v1, `0` varint in v2+), while `Some(vec![])` is encoded as an empty
    /// array.
    pub owners: Option<Vec<DescribeDelegationTokenOwner>>,
}
impl DescribeDelegationTokenRequest {
pub fn api_key() -> ApiKey {
ApiKey::DescribeDelegationToken
}
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
match &self.owners {
None => (-1i32).encode(buf),
Some(owners) => {
buf.put_i32(array_len_i32(owners.len())?);
for owner in owners {
KafkaString::new(&owner.principal_type).try_encode(buf)?;
KafkaString::new(&owner.principal_name).try_encode(buf)?;
}
}
}
Ok(())
}
pub fn encode_v2(&self, buf: &mut impl BufMut) -> Result<()> {
match &self.owners {
None => crate::util::varint::encode_unsigned_varint(0, buf),
Some(owners) => {
encode_compact_array_len(owners.len(), buf)?;
for owner in owners {
KafkaString::new(&owner.principal_type).try_encode_compact(buf)?;
KafkaString::new(&owner.principal_name).try_encode_compact(buf)?;
TaggedFields::default().try_encode(buf)?;
}
}
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
impl VersionedEncode for DescribeDelegationTokenRequest {
    /// Routes to the encoder for `version`; versions 2 and 3 share the
    /// `encode_v2` layout.
    ///
    /// # Errors
    /// Any version outside 1-3 yields the result of `unsupported_encode!`;
    /// failures of the selected encoder are returned unchanged.
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            1 => self.encode_v1(buf),
            2 | 3 => self.encode_v2(buf),
            _ => unsupported_encode!("DescribeDelegationTokenRequest", version),
        }
    }
}
/// A renewer principal listed on a described delegation token.
///
/// Derives `PartialEq`/`Eq` so decoded renewers can be compared by value
/// (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DelegationTokenRenewer {
    /// Type of the renewer principal (the tests in this file use `"User"`).
    pub principal_type: String,
    /// Name of the renewer principal.
    pub principal_name: String,
}
/// One token entry inside a `DescribeDelegationTokenResponse`.
///
/// Fields are declared in the same order the decoders read them for each token.
#[derive(Debug, Clone)]
pub struct DelegationTokenInfo {
/// Principal type of the token owner; decoded as a non-nullable string.
pub principal_type: String,
/// Principal name of the token owner; decoded as a non-nullable string.
pub principal_name: String,
/// Requester principal type. Only `decode_v3` reads it; `decode_v1` and
/// `decode_v2` set it to `None`.
pub token_requester_principal_type: Option<String>,
/// Requester principal name. Only `decode_v3` reads it; `decode_v1` and
/// `decode_v2` set it to `None`.
pub token_requester_principal_name: Option<String>,
/// Issue timestamp, read as an `i64` (milliseconds per the field name).
pub issue_timestamp_ms: i64,
/// Expiry timestamp, read as an `i64` (milliseconds per the field name).
pub expiry_timestamp_ms: i64,
/// Maximum timestamp, read as an `i64` (milliseconds per the field name).
pub max_timestamp_ms: i64,
/// Token identifier; decoded as a non-nullable string.
pub token_id: String,
/// Token HMAC; decoded as non-nullable bytes.
pub hmac: Bytes,
/// Renewers listed for this token, in wire order.
pub renewers: Vec<DelegationTokenRenewer>,
}
/// Decoded `DescribeDelegationToken` response (versions 1 through 3).
///
/// Fields are declared in the same order the decoders read them.
#[derive(Debug, Clone)]
pub struct DescribeDelegationTokenResponse {
/// Error code, built with `ErrorCode::from_i16` from the leading `i16`.
pub error_code: ErrorCode,
/// Decoded token entries, in wire order.
pub tokens: Vec<DelegationTokenInfo>,
/// Throttle time, read as an `i32` after the token array (milliseconds per
/// the field name).
pub throttle_time_ms: i32,
}
impl DescribeDelegationTokenResponse {
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let token_count = check_decode_array_len(i32::decode(buf)?)?;
let mut tokens = Vec::with_capacity(token_count);
for _ in 0..token_count {
let principal_type =
non_nullable_string("principal_type", KafkaString::decode(buf)?.0)?;
let principal_name =
non_nullable_string("principal_name", KafkaString::decode(buf)?.0)?;
let issue_timestamp_ms = i64::decode(buf)?;
let expiry_timestamp_ms = i64::decode(buf)?;
let max_timestamp_ms = i64::decode(buf)?;
let token_id = non_nullable_string("token_id", KafkaString::decode(buf)?.0)?;
let hmac = non_nullable_bytes("hmac", KafkaBytes::decode(buf)?.0)?;
let renewer_count = check_decode_array_len(i32::decode(buf)?)?;
let mut renewers = Vec::with_capacity(renewer_count);
for _ in 0..renewer_count {
let renewer_type =
non_nullable_string("renewer_type", KafkaString::decode(buf)?.0)?;
let renewer_name =
non_nullable_string("renewer_name", KafkaString::decode(buf)?.0)?;
renewers.push(DelegationTokenRenewer {
principal_type: renewer_type,
principal_name: renewer_name,
});
}
tokens.push(DelegationTokenInfo {
principal_type,
principal_name,
token_requester_principal_type: None,
token_requester_principal_name: None,
issue_timestamp_ms,
expiry_timestamp_ms,
max_timestamp_ms,
token_id,
hmac,
renewers,
});
}
let throttle_time_ms = i32::decode(buf)?;
Ok(Self {
error_code,
tokens,
throttle_time_ms,
})
}
pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let token_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut tokens = Vec::with_capacity(token_count);
for _ in 0..token_count {
let principal_type =
non_nullable_string("principal_type", KafkaString::decode_compact(buf)?.0)?;
let principal_name =
non_nullable_string("principal_name", KafkaString::decode_compact(buf)?.0)?;
let issue_timestamp_ms = i64::decode(buf)?;
let expiry_timestamp_ms = i64::decode(buf)?;
let max_timestamp_ms = i64::decode(buf)?;
let token_id = non_nullable_string("token_id", KafkaString::decode_compact(buf)?.0)?;
let hmac = non_nullable_bytes("hmac", KafkaBytes::decode_compact(buf)?.0)?;
let renewer_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut renewers = Vec::with_capacity(renewer_count);
for _ in 0..renewer_count {
let renewer_type =
non_nullable_string("renewer_type", KafkaString::decode_compact(buf)?.0)?;
let renewer_name =
non_nullable_string("renewer_name", KafkaString::decode_compact(buf)?.0)?;
let _ = TaggedFields::decode(buf)?;
renewers.push(DelegationTokenRenewer {
principal_type: renewer_type,
principal_name: renewer_name,
});
}
let _ = TaggedFields::decode(buf)?;
tokens.push(DelegationTokenInfo {
principal_type,
principal_name,
token_requester_principal_type: None,
token_requester_principal_name: None,
issue_timestamp_ms,
expiry_timestamp_ms,
max_timestamp_ms,
token_id,
hmac,
renewers,
});
}
let throttle_time_ms = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
Ok(Self {
error_code,
tokens,
throttle_time_ms,
})
}
pub fn decode_v3(buf: &mut impl Buf) -> Result<Self> {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let token_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut tokens = Vec::with_capacity(token_count);
for _ in 0..token_count {
let principal_type =
non_nullable_string("principal_type", KafkaString::decode_compact(buf)?.0)?;
let principal_name =
non_nullable_string("principal_name", KafkaString::decode_compact(buf)?.0)?;
let token_requester_principal_type =
non_nullable_string("requester_type", KafkaString::decode_compact(buf)?.0)?;
let token_requester_principal_name =
non_nullable_string("requester_name", KafkaString::decode_compact(buf)?.0)?;
let issue_timestamp_ms = i64::decode(buf)?;
let expiry_timestamp_ms = i64::decode(buf)?;
let max_timestamp_ms = i64::decode(buf)?;
let token_id = non_nullable_string("token_id", KafkaString::decode_compact(buf)?.0)?;
let hmac = non_nullable_bytes("hmac", KafkaBytes::decode_compact(buf)?.0)?;
let renewer_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut renewers = Vec::with_capacity(renewer_count);
for _ in 0..renewer_count {
let renewer_type =
non_nullable_string("renewer_type", KafkaString::decode_compact(buf)?.0)?;
let renewer_name =
non_nullable_string("renewer_name", KafkaString::decode_compact(buf)?.0)?;
let _ = TaggedFields::decode(buf)?;
renewers.push(DelegationTokenRenewer {
principal_type: renewer_type,
principal_name: renewer_name,
});
}
let _ = TaggedFields::decode(buf)?;
tokens.push(DelegationTokenInfo {
principal_type,
principal_name,
token_requester_principal_type: Some(token_requester_principal_type),
token_requester_principal_name: Some(token_requester_principal_name),
issue_timestamp_ms,
expiry_timestamp_ms,
max_timestamp_ms,
token_id,
hmac,
renewers,
});
}
let throttle_time_ms = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
Ok(Self {
error_code,
tokens,
throttle_time_ms,
})
}
}
impl VersionedDecode for DescribeDelegationTokenResponse {
    /// Routes to the decoder for `version` (1, 2 or 3).
    ///
    /// # Errors
    /// Any other version yields the result of `unsupported_decode!`; failures
    /// of the selected decoder are returned unchanged.
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        // Newest layout first; the arms are disjoint so order does not matter.
        match version {
            3 => Self::decode_v3(buf),
            2 => Self::decode_v2(buf),
            1 => Self::decode_v1(buf),
            _ => unsupported_decode!("DescribeDelegationTokenResponse", version),
        }
    }
}
// Unit tests for the delegation-token request encoders and response decoders.
// Request tests check encoded lengths / leading bytes; response tests decode
// hand-built byte buffers. NOTE(review): several tests are named `*_roundtrip`
// but only exercise one direction (encode or decode).
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use bytes::BytesMut;
// `encode_v1` produces output, and `encode_versioned(1, ..)` yields identical bytes.
#[test]
fn test_create_delegation_token_request_roundtrip() {
let request = CreateDelegationTokenRequest {
renewers: vec![CreatableRenewer {
principal_type: "User".to_string(),
principal_name: "alice".to_string(),
}],
max_lifetime_ms: 86_400_000,
owner_principal_type: None,
owner_principal_name: None,
};
let mut buf = BytesMut::new();
request.encode_v1(&mut buf).unwrap();
assert!(!buf.is_empty());
let mut buf2 = BytesMut::new();
request.encode_versioned(1, &mut buf2).unwrap();
assert_eq!(buf, buf2);
}
// With no renewers, v1 is a 4-byte zero count followed by the 8-byte lifetime.
#[test]
fn test_create_delegation_token_request_empty_renewers() {
let request = CreateDelegationTokenRequest {
renewers: vec![],
max_lifetime_ms: -1,
owner_principal_type: None,
owner_principal_name: None,
};
let mut buf = BytesMut::new();
request.encode_v1(&mut buf).unwrap();
assert_eq!(buf.len(), 4 + 8);
assert_eq!(i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]), 0);
}
// Decodes a hand-built v1 response: i16-prefixed strings, i32-prefixed hmac.
#[test]
fn test_create_delegation_token_response_roundtrip() {
let mut buf = BytesMut::new();
buf.put_i16(0);
buf.put_i16(4);
buf.put_slice(b"User");
buf.put_i16(5);
buf.put_slice(b"alice");
buf.put_i64(1000);
buf.put_i64(2000);
buf.put_i64(3000);
buf.put_i16(8);
buf.put_slice(b"token-01");
buf.put_i32(4);
buf.put_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
buf.put_i32(0);
let mut frozen = buf.freeze();
let resp = CreateDelegationTokenResponse::decode_v1(&mut frozen).unwrap();
assert!(resp.error_code.is_ok());
assert_eq!(resp.principal_name, "alice");
assert_eq!(resp.token_id, "token-01");
assert_eq!(&resp.hmac[..], &[0xDE, 0xAD, 0xBE, 0xEF]);
assert_eq!(resp.issue_timestamp_ms, 1000);
assert_eq!(resp.expiry_timestamp_ms, 2000);
}
// For every request type, `encode_versioned(1, ..)` must match `encode_v1`;
// also checks that `decode_versioned(1, ..)` works for the renew response.
#[test]
fn test_delegation_token_v1_versioned_dispatch() {
let create_req = CreateDelegationTokenRequest {
renewers: vec![CreatableRenewer {
principal_type: "User".to_string(),
principal_name: "alice".to_string(),
}],
max_lifetime_ms: 60_000,
owner_principal_type: None,
owner_principal_name: None,
};
let mut buf_direct = BytesMut::new();
let mut buf_dispatch = BytesMut::new();
create_req.encode_v1(&mut buf_direct).unwrap();
create_req.encode_versioned(1, &mut buf_dispatch).unwrap();
assert_eq!(buf_direct, buf_dispatch);
let renew_req = RenewDelegationTokenRequest {
hmac: Bytes::from_static(&[0x01, 0x02]),
renew_period_ms: 30_000,
};
let mut buf_direct = BytesMut::new();
let mut buf_dispatch = BytesMut::new();
renew_req.encode_v1(&mut buf_direct).unwrap();
renew_req.encode_versioned(1, &mut buf_dispatch).unwrap();
assert_eq!(buf_direct, buf_dispatch);
let expire_req = ExpireDelegationTokenRequest {
hmac: Bytes::from_static(&[0xAB]),
expiry_period_ms: -1,
};
let mut buf_direct = BytesMut::new();
let mut buf_dispatch = BytesMut::new();
expire_req.encode_v1(&mut buf_direct).unwrap();
expire_req.encode_versioned(1, &mut buf_dispatch).unwrap();
assert_eq!(buf_direct, buf_dispatch);
let describe_req = DescribeDelegationTokenRequest { owners: None };
let mut buf_direct = BytesMut::new();
let mut buf_dispatch = BytesMut::new();
describe_req.encode_v1(&mut buf_direct).unwrap();
describe_req.encode_versioned(1, &mut buf_dispatch).unwrap();
assert_eq!(buf_direct, buf_dispatch);
// Renew response bytes: error code 0, expiry 42_000, throttle 5.
let mut resp_buf = BytesMut::new();
resp_buf.put_i16(0); resp_buf.put_i64(42_000); resp_buf.put_i32(5); let frozen = resp_buf.freeze();
let resp = RenewDelegationTokenResponse::decode_versioned(1, &mut frozen.clone()).unwrap();
assert_eq!(resp.expiry_timestamp_ms, 42_000);
assert_eq!(resp.throttle_time_ms, 5);
}
// v1 renew request: 4-byte length prefix + 3 hmac bytes + 8-byte period.
#[test]
fn test_renew_delegation_token_request_roundtrip() {
let request = RenewDelegationTokenRequest {
hmac: Bytes::from_static(&[0x01, 0x02, 0x03]),
renew_period_ms: 60_000,
};
let mut buf = BytesMut::new();
request.encode_v1(&mut buf).unwrap();
assert_eq!(buf.len(), 4 + 3 + 8);
}
// Decodes a v1 renew response: error code 0, expiry 999_999, throttle 0.
#[test]
fn test_renew_delegation_token_response_roundtrip() {
let mut buf = BytesMut::new();
buf.put_i16(0); buf.put_i64(999_999); buf.put_i32(0);
let mut frozen = buf.freeze();
let resp = RenewDelegationTokenResponse::decode_v1(&mut frozen).unwrap();
assert!(resp.error_code.is_ok());
assert_eq!(resp.expiry_timestamp_ms, 999_999);
}
// v1 expire request: 4-byte length prefix + 1 hmac byte + 8-byte period.
#[test]
fn test_expire_delegation_token_request_roundtrip() {
let request = ExpireDelegationTokenRequest {
hmac: Bytes::from_static(&[0xAB]),
expiry_period_ms: -1,
};
let mut buf = BytesMut::new();
request.encode_v1(&mut buf).unwrap();
assert_eq!(buf.len(), 4 + 1 + 8);
}
// Decodes a v1 expire response: error code 0, expiry 500_000, throttle 10.
#[test]
fn test_expire_delegation_token_response_roundtrip() {
let mut buf = BytesMut::new();
buf.put_i16(0); buf.put_i64(500_000); buf.put_i32(10);
let mut frozen = buf.freeze();
let resp = ExpireDelegationTokenResponse::decode_v1(&mut frozen).unwrap();
assert!(resp.error_code.is_ok());
assert_eq!(resp.expiry_timestamp_ms, 500_000);
assert_eq!(resp.throttle_time_ms, 10);
}
// `owners: None` is written in v1 as a single i32 equal to -1.
#[test]
fn test_describe_delegation_token_request_null_owners() {
let request = DescribeDelegationTokenRequest { owners: None };
let mut buf = BytesMut::new();
request.encode_v1(&mut buf).unwrap();
assert_eq!(buf.len(), 4);
assert_eq!(i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]), -1);
}
// One owner in v1: 4-byte count + (2 + 4) for "User" + (2 + 3) for "bob".
#[test]
fn test_describe_delegation_token_request_with_owners() {
let request = DescribeDelegationTokenRequest {
owners: Some(vec![DescribeDelegationTokenOwner {
principal_type: "User".to_string(),
principal_name: "bob".to_string(),
}]),
};
let mut buf = BytesMut::new();
request.encode_v1(&mut buf).unwrap();
assert_eq!(buf.len(), 4 + 6 + 5);
}
// `owners: Some(vec![])` is written in v1 as a zero count, distinct from null (-1).
#[test]
fn test_describe_delegation_token_request_empty_owners() {
let request = DescribeDelegationTokenRequest {
owners: Some(vec![]),
};
let mut buf = BytesMut::new();
request.encode_v1(&mut buf).unwrap();
assert_eq!(buf.len(), 4);
assert_eq!(i32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]), 0);
}
// Decodes a v1 describe response with one token ("bob", id "t1") that has
// two renewers ("alice" and "eve"), followed by a throttle time of 0.
#[test]
fn test_describe_delegation_token_response_roundtrip() {
// NOTE(review): `BufMut` methods are already used by other tests without
// this local import, so it looks redundant — confirm before removing.
use bytes::BufMut;
let mut buf = BytesMut::new();
buf.put_i16(0); buf.put_i32(1); buf.put_i16(4);
buf.put_slice(b"User"); buf.put_i16(3);
buf.put_slice(b"bob"); buf.put_i64(100); buf.put_i64(200); buf.put_i64(300); buf.put_i16(2);
buf.put_slice(b"t1"); buf.put_i32(2);
buf.put_slice(&[0xAA, 0xBB]); buf.put_i32(2); buf.put_i16(4);
buf.put_slice(b"User"); buf.put_i16(5);
buf.put_slice(b"alice"); buf.put_i16(4);
buf.put_slice(b"User"); buf.put_i16(3);
buf.put_slice(b"eve"); buf.put_i32(0);
let mut frozen = buf.freeze();
let resp = DescribeDelegationTokenResponse::decode_v1(&mut frozen).unwrap();
assert!(resp.error_code.is_ok());
assert_eq!(resp.tokens.len(), 1);
assert_eq!(resp.tokens[0].principal_name, "bob");
assert_eq!(resp.tokens[0].token_id, "t1");
assert_eq!(&resp.tokens[0].hmac[..], &[0xAA, 0xBB]);
assert_eq!(resp.tokens[0].renewers.len(), 2);
assert_eq!(resp.tokens[0].renewers[0].principal_name, "alice");
assert_eq!(resp.tokens[0].renewers[1].principal_name, "eve");
}
// The v2 (flexible) create request must differ in length from the v1 encoding.
#[test]
fn test_create_delegation_token_v2_flexible() {
let request = CreateDelegationTokenRequest {
renewers: vec![CreatableRenewer {
principal_type: "User".to_string(),
principal_name: "alice".to_string(),
}],
max_lifetime_ms: 86_400_000,
owner_principal_type: None,
owner_principal_name: None,
};
let mut v1 = BytesMut::new();
request.encode_v1(&mut v1).unwrap();
let mut v2 = BytesMut::new();
request.encode_v2(&mut v2).unwrap();
assert_ne!(v1.len(), v2.len());
assert!(!v2.is_empty());
}
// A v3 request carrying owner fields must be longer than the v2 encoding of
// the same request without them.
#[test]
fn test_create_delegation_token_v3_owner_override() {
let request = CreateDelegationTokenRequest {
renewers: vec![],
max_lifetime_ms: -1,
owner_principal_type: Some("User".to_string()),
owner_principal_name: Some("admin".to_string()),
};
let mut buf = BytesMut::new();
request.encode_v3(&mut buf).unwrap();
assert!(!buf.is_empty());
let mut v2 = BytesMut::new();
let req_no_owner = CreateDelegationTokenRequest {
renewers: vec![],
max_lifetime_ms: -1,
owner_principal_type: None,
owner_principal_name: None,
};
req_no_owner.encode_v2(&mut v2).unwrap();
assert!(buf.len() > v2.len());
}
// Decodes a hand-built v2 response. The single-byte prefixes are one greater
// than the payload length (e.g. 5 before the 4-byte "User"); the final 0 byte
// is the empty tagged-fields section.
#[test]
fn test_create_delegation_token_response_v2_roundtrip() {
let mut buf = BytesMut::new();
buf.put_i16(0); buf.put_u8(5); buf.put_slice(b"User");
buf.put_u8(6); buf.put_slice(b"alice");
buf.put_i64(1000); buf.put_i64(2000); buf.put_i64(3000); buf.put_u8(9); buf.put_slice(b"token-01");
buf.put_u8(5); buf.put_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
buf.put_i32(0); buf.put_u8(0);
let mut frozen = buf.freeze();
let resp = CreateDelegationTokenResponse::decode_v2(&mut frozen).unwrap();
assert!(resp.error_code.is_ok());
assert_eq!(resp.principal_name, "alice");
assert_eq!(resp.token_id, "token-01");
assert!(resp.token_requester_principal_type.is_none());
}
// Decodes a hand-built v3 response: owner "admin" followed by requester
// "alice", both of type "User".
#[test]
fn test_create_delegation_token_response_v3_with_requester() {
let mut buf = BytesMut::new();
buf.put_i16(0); buf.put_u8(5); buf.put_slice(b"User");
buf.put_u8(6); buf.put_slice(b"admin");
buf.put_u8(5); buf.put_slice(b"User");
buf.put_u8(6); buf.put_slice(b"alice");
buf.put_i64(1000); buf.put_i64(2000); buf.put_i64(3000); buf.put_u8(4); buf.put_slice(b"t-1");
buf.put_u8(3); buf.put_slice(&[0xAB, 0xCD]);
buf.put_i32(0); buf.put_u8(0);
let mut frozen = buf.freeze();
let resp = CreateDelegationTokenResponse::decode_v3(&mut frozen).unwrap();
assert_eq!(resp.principal_name, "admin");
assert_eq!(resp.token_requester_principal_type.as_deref(), Some("User"));
assert_eq!(
resp.token_requester_principal_name.as_deref(),
Some("alice")
);
}
// The v2 (flexible) renew request must differ in length from the v1 encoding.
#[test]
fn test_renew_delegation_token_v2_flexible() {
let request = RenewDelegationTokenRequest {
hmac: Bytes::from_static(&[0x01, 0x02, 0x03]),
renew_period_ms: 60_000,
};
let mut v1 = BytesMut::new();
request.encode_v1(&mut v1).unwrap();
let mut v2 = BytesMut::new();
request.encode_v2(&mut v2).unwrap();
assert_ne!(v1.len(), v2.len());
}
// Decodes a v2 renew response; the trailing 0 byte is the empty tagged-fields section.
#[test]
fn test_renew_delegation_token_response_v2_roundtrip() {
let mut buf = BytesMut::new();
buf.put_i16(0); buf.put_i64(42_000); buf.put_i32(5); buf.put_u8(0);
let mut frozen = buf.freeze();
let resp = RenewDelegationTokenResponse::decode_v2(&mut frozen).unwrap();
assert!(resp.error_code.is_ok());
assert_eq!(resp.expiry_timestamp_ms, 42_000);
}
// The v2 (flexible) expire request must differ in length from the v1 encoding.
#[test]
fn test_expire_delegation_token_v2_flexible() {
let request = ExpireDelegationTokenRequest {
hmac: Bytes::from_static(&[0xAB]),
expiry_period_ms: -1,
};
let mut v1 = BytesMut::new();
request.encode_v1(&mut v1).unwrap();
let mut v2 = BytesMut::new();
request.encode_v2(&mut v2).unwrap();
assert_ne!(v1.len(), v2.len());
}
// Decodes a v2 expire response; the trailing 0 byte is the empty tagged-fields section.
#[test]
fn test_expire_delegation_token_response_v2_roundtrip() {
let mut buf = BytesMut::new();
buf.put_i16(0); buf.put_i64(500_000); buf.put_i32(0); buf.put_u8(0);
let mut frozen = buf.freeze();
let resp = ExpireDelegationTokenResponse::decode_v2(&mut frozen).unwrap();
assert!(resp.error_code.is_ok());
assert_eq!(resp.expiry_timestamp_ms, 500_000);
}
// v2 describe request differs in length from v1, and `encode_versioned(3, ..)`
// produces exactly the `encode_v2` bytes.
#[test]
fn test_describe_delegation_token_v2_flexible() {
let request = DescribeDelegationTokenRequest { owners: None };
let mut v1 = BytesMut::new();
request.encode_v1(&mut v1).unwrap();
let mut v2 = BytesMut::new();
request.encode_v2(&mut v2).unwrap();
assert_ne!(v1.len(), v2.len());
let mut v3 = BytesMut::new();
request.encode_versioned(3, &mut v3).unwrap();
assert_eq!(v2, v3);
}
// Decodes a v2 describe response whose token-array length byte is 1, which
// the decoder is expected to read as an empty token list.
#[test]
fn test_describe_delegation_token_response_v2_roundtrip() {
let mut buf = BytesMut::new();
buf.put_i16(0); buf.put_u8(1); buf.put_i32(0); buf.put_u8(0);
let mut frozen = buf.freeze();
let resp = DescribeDelegationTokenResponse::decode_v2(&mut frozen).unwrap();
assert!(resp.error_code.is_ok());
assert!(resp.tokens.is_empty());
}
}