1use bytes::Bytes;
14use crabka_metadata::DelegationToken;
15use crabka_protocol::owned::create_delegation_token_request::{
16 CreatableRenewers, CreateDelegationTokenRequest,
17};
18use crabka_protocol::owned::create_delegation_token_response::CreateDelegationTokenResponse;
19use crabka_protocol::owned::describe_delegation_token_request::{
20 DescribeDelegationTokenOwner, DescribeDelegationTokenRequest,
21};
22use crabka_protocol::owned::describe_delegation_token_response::{
23 DescribeDelegationTokenResponse, DescribedDelegationToken,
24};
25use crabka_protocol::owned::expire_delegation_token_request::ExpireDelegationTokenRequest;
26use crabka_protocol::owned::renew_delegation_token_request::RenewDelegationTokenRequest;
27use crabka_security::KafkaPrincipal;
28
29use crate::{AdminClient, AdminError, kafka_error_name};
30
31impl AdminClient {
32 pub async fn create_delegation_token_as_owner(
44 &mut self,
45 owner_principal_name: &str,
46 renewers: &[String],
47 max_lifetime_ms: i64,
48 ) -> Result<CreateDelegationTokenResponse, AdminError> {
49 let req = build_create_delegation_token(owner_principal_name, renewers, max_lifetime_ms);
50 let resp = self.conn.send(req).await?;
51 if resp.error_code != 0 {
52 return Err(broker_err("CreateDelegationToken", resp.error_code, None));
53 }
54 Ok(resp)
55 }
56
57 pub async fn renew_delegation_token(&mut self, hmac: &[u8]) -> Result<i64, AdminError> {
61 let req = build_renew_delegation_token(hmac);
62 let resp = self.conn.send(req).await?;
63 if resp.error_code != 0 {
64 return Err(broker_err("RenewDelegationToken", resp.error_code, None));
65 }
66 Ok(resp.expiry_timestamp_ms)
67 }
68
69 pub async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError> {
72 let req = build_expire_delegation_token(hmac);
73 let resp = self.conn.send(req).await?;
74 if resp.error_code != 0 {
75 return Err(broker_err("ExpireDelegationToken", resp.error_code, None));
76 }
77 Ok(())
78 }
79
80 pub async fn describe_delegation_tokens_owned_by(
84 &mut self,
85 owner_principal: &str,
86 ) -> Result<Vec<DelegationToken>, AdminError> {
87 let req = build_describe_owner_filter(owner_principal);
88 let resp = self.conn.send(req).await?;
89 parse_describe_delegation_tokens(resp)
90 }
91}
92
93fn build_create_delegation_token(
97 owner_principal_name: &str,
98 renewers: &[String],
99 max_lifetime_ms: i64,
100) -> CreateDelegationTokenRequest {
101 CreateDelegationTokenRequest {
102 owner_principal_type: Some("User".into()),
103 owner_principal_name: Some(owner_principal_name.into()),
104 renewers: renewers
105 .iter()
106 .map(|s| renewer_str_to_wire(s.as_str()))
107 .collect(),
108 max_lifetime_ms,
109 ..Default::default()
110 }
111}
112
113fn build_renew_delegation_token(hmac: &[u8]) -> RenewDelegationTokenRequest {
116 RenewDelegationTokenRequest {
117 hmac: Bytes::copy_from_slice(hmac),
118 renew_period_ms: -1,
119 ..Default::default()
120 }
121}
122
123fn build_expire_delegation_token(hmac: &[u8]) -> ExpireDelegationTokenRequest {
126 ExpireDelegationTokenRequest {
127 hmac: Bytes::copy_from_slice(hmac),
128 expiry_time_period_ms: -1,
129 ..Default::default()
130 }
131}
132
133fn renewer_str_to_wire(s: &str) -> CreatableRenewers {
136 let (pt, pn) = s.split_once(':').unwrap_or(("User", s));
137 CreatableRenewers {
138 principal_type: pt.to_string(),
139 principal_name: pn.to_string(),
140 ..Default::default()
141 }
142}
143
144fn build_describe_owner_filter(owner_principal: &str) -> DescribeDelegationTokenRequest {
147 let (pt, pn) = owner_principal
148 .split_once(':')
149 .unwrap_or(("User", owner_principal));
150 DescribeDelegationTokenRequest {
151 owners: Some(vec![DescribeDelegationTokenOwner {
152 principal_type: pt.to_string(),
153 principal_name: pn.to_string(),
154 ..Default::default()
155 }]),
156 ..Default::default()
157 }
158}
159
160fn parse_describe_delegation_tokens(
164 resp: DescribeDelegationTokenResponse,
165) -> Result<Vec<DelegationToken>, AdminError> {
166 if resp.error_code != 0 {
167 return Err(broker_err("DescribeDelegationToken", resp.error_code, None));
168 }
169 Ok(resp.tokens.into_iter().map(described_to_image).collect())
170}
171
172fn described_to_image(t: DescribedDelegationToken) -> DelegationToken {
182 DelegationToken {
183 token_id: t.token_id,
184 owner: KafkaPrincipal {
185 principal_type: t.principal_type,
186 name: t.principal_name,
187 },
188 hmac: t.hmac.to_vec(),
189 issue_timestamp_ms: t.issue_timestamp,
190 expiry_timestamp_ms: t.expiry_timestamp,
191 max_timestamp_ms: t.max_timestamp,
192 renewers: t
193 .renewers
194 .into_iter()
195 .map(|r| KafkaPrincipal {
196 principal_type: r.principal_type,
197 name: r.principal_name,
198 })
199 .collect(),
200 }
201}
202
203fn broker_err(api: &'static str, code: i16, message: Option<String>) -> AdminError {
204 AdminError::Broker {
205 api,
206 code,
207 name: kafka_error_name(code),
208 message,
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use bytes::Bytes;
    use crabka_protocol::owned::describe_delegation_token_response::{
        DescribeDelegationTokenResponse, DescribedDelegationToken, DescribedDelegationTokenRenewer,
    };

    /// The create request carries a `User` owner, the given lifetime, and one wire
    /// renewer per input string (a bare name gets principal type `User`).
    #[test]
    fn build_create_populates_act_as_owner_and_renewers() {
        let renewer_inputs = ["User:bob".to_string(), "carol".to_string()];
        let request = build_create_delegation_token("alice", &renewer_inputs, 60_000);

        assert!(request.owner_principal_type.as_deref() == Some("User"));
        assert!(request.owner_principal_name.as_deref() == Some("alice"));
        assert!(request.max_lifetime_ms == 60_000);

        let renewer_pairs: Vec<(&str, &str)> = request
            .renewers
            .iter()
            .map(|r| (r.principal_type.as_str(), r.principal_name.as_str()))
            .collect();
        assert!(renewer_pairs == [("User", "bob"), ("User", "carol")]);
    }

    /// The renew request copies the HMAC and always sends a period of `-1`.
    #[test]
    fn build_renew_uses_minus_one_for_broker_default() {
        let request = build_renew_delegation_token(b"\x01\x02\x03");
        assert!(request.hmac == Bytes::from_static(b"\x01\x02\x03"));
        assert!(request.renew_period_ms == -1);
    }

    /// The expire request copies the HMAC and always sends a period of `-1`.
    #[test]
    fn build_expire_uses_minus_one_for_immediate_tombstone() {
        let request = build_expire_delegation_token(b"\xaa\xbb");
        assert!(request.hmac == Bytes::from_static(b"\xaa\xbb"));
        assert!(request.expiry_time_period_ms == -1);
    }

    /// The describe filter holds exactly one owner, for both `Type:name` and bare
    /// name inputs.
    #[test]
    fn build_describe_owner_filter_sets_single_entry() {
        let typed = build_describe_owner_filter("User:alice");
        let typed_owners = typed.owners.as_ref().expect("owners filter populated");
        assert!(typed_owners.len() == 1);
        assert!(typed_owners[0].principal_type == "User");
        assert!(typed_owners[0].principal_name == "alice");

        // A bare name falls back to the `User` principal type.
        let bare = build_describe_owner_filter("solo");
        let bare_owners = bare.owners.as_ref().unwrap();
        assert!(bare_owners[0].principal_type == "User");
        assert!(bare_owners[0].principal_name == "solo");
    }

    /// A successful describe response is mapped field by field onto the image type.
    #[test]
    fn parse_describe_maps_wire_fields_to_image() {
        let wire_renewer = DescribedDelegationTokenRenewer {
            principal_type: "User".into(),
            principal_name: "bob".into(),
            ..Default::default()
        };
        let wire_token = DescribedDelegationToken {
            principal_type: "User".into(),
            principal_name: "alice".into(),
            token_requester_principal_type: "User".into(),
            token_requester_principal_name: "operator".into(),
            issue_timestamp: 1_000,
            expiry_timestamp: 2_000,
            max_timestamp: 9_000,
            token_id: "tok-1".into(),
            hmac: Bytes::from_static(b"\xde\xad\xbe\xef"),
            renewers: vec![wire_renewer],
            ..Default::default()
        };
        let response = DescribeDelegationTokenResponse {
            error_code: 0,
            tokens: vec![wire_token],
            ..Default::default()
        };

        let images = parse_describe_delegation_tokens(response).expect("ok response");
        assert!(images.len() == 1);

        let image = &images[0];
        assert!(image.token_id == "tok-1");
        assert!(image.owner.principal_type == "User");
        assert!(image.owner.name == "alice");
        assert!(image.hmac == vec![0xde, 0xad, 0xbe, 0xef]);
        assert!(image.issue_timestamp_ms == 1_000);
        assert!(image.expiry_timestamp_ms == 2_000);
        assert!(image.max_timestamp_ms == 9_000);
        assert!(image.renewers.len() == 1);
        assert!(image.renewers[0].principal_type == "User");
        assert!(image.renewers[0].name == "bob");
    }

    /// `broker_err` keeps the API label, code and message, and fills `name` from
    /// `kafka_error_name`.
    #[test]
    fn broker_err_carries_api_and_kafka_code_name() {
        let error = broker_err("CreateDelegationToken", 65, Some("not super-user".into()));
        match error {
            AdminError::Broker {
                api,
                code,
                name,
                message,
            } => {
                assert!(api == "CreateDelegationToken");
                assert!(code == 65);
                // Pins what `kafka_error_name` currently returns for code 65.
                assert!(name == "UNKNOWN");
                assert!(message.as_deref() == Some("not super-user"));
            }
            other => panic!("expected AdminError::Broker, got {other:?}"),
        }
    }

    /// A non-zero top-level error code becomes an `AdminError::Broker` tagged with
    /// the describe API name.
    #[test]
    fn parse_describe_propagates_nonzero_error_code() {
        let response = DescribeDelegationTokenResponse {
            error_code: 61,
            ..Default::default()
        };
        let error = parse_describe_delegation_tokens(response).unwrap_err();
        match error {
            AdminError::Broker { api, code, .. } => {
                assert!(api == "DescribeDelegationToken");
                assert!(code == 61);
            }
            other => panic!("expected AdminError::Broker, got {other:?}"),
        }
    }
}