Skip to main content

crabka_client_admin/
delegation_tokens.rs

1//! KIP-48: delegation-token RPCs on `AdminClient`.
2//!
3//! Four operations the `KafkaUser` (`authentication.type: delegation-token`)
4//! reconciler drives via the cluster's admin connection:
5//! `CreateDelegationToken` (`api_key` 38) with the act-as owner field
6//! populated, `RenewDelegationToken` (39), `ExpireDelegationToken` (40),
7//! and `DescribeDelegationToken` (41) filtered to a single owner.
8//!
9//! Like `users.rs`, this module keeps wire concerns local: requests are
10//! built via small `build_*` helpers, responses are mapped via
11//! `parse_*` helpers, and unit tests cover each end of the pipe.
12
13use bytes::Bytes;
14use crabka_metadata::DelegationToken;
15use crabka_protocol::owned::create_delegation_token_request::{
16    CreatableRenewers, CreateDelegationTokenRequest,
17};
18use crabka_protocol::owned::create_delegation_token_response::CreateDelegationTokenResponse;
19use crabka_protocol::owned::describe_delegation_token_request::{
20    DescribeDelegationTokenOwner, DescribeDelegationTokenRequest,
21};
22use crabka_protocol::owned::describe_delegation_token_response::{
23    DescribeDelegationTokenResponse, DescribedDelegationToken,
24};
25use crabka_protocol::owned::expire_delegation_token_request::ExpireDelegationTokenRequest;
26use crabka_protocol::owned::renew_delegation_token_request::RenewDelegationTokenRequest;
27use crabka_security::KafkaPrincipal;
28
29use crate::{AdminClient, AdminError, kafka_error_name};
30
31impl AdminClient {
32    /// KIP-48 act-as create: mint a delegation token whose owner is
33    /// `owner_principal_name` (type always `"User"`).
34    ///
35    /// The caller MUST be a broker super-user (per broker
36    /// semantics) for the act-as path to take effect; non-super callers
37    /// get `DELEGATION_TOKEN_AUTHORIZATION_FAILED` (65). The full
38    /// response is returned so callers can pluck out `token_id`, `hmac`,
39    /// and the issue/expiry/max timestamps for downstream persistence.
40    ///
41    /// `renewers` items are `"User:bob"` form; entries without a `:`
42    /// are interpreted with type `"User"`.
43    pub async fn create_delegation_token_as_owner(
44        &mut self,
45        owner_principal_name: &str,
46        renewers: &[String],
47        max_lifetime_ms: i64,
48    ) -> Result<CreateDelegationTokenResponse, AdminError> {
49        let req = build_create_delegation_token(owner_principal_name, renewers, max_lifetime_ms);
50        let resp = self.conn.send(req).await?;
51        if resp.error_code != 0 {
52            return Err(broker_err("CreateDelegationToken", resp.error_code, None));
53        }
54        Ok(resp)
55    }
56
57    /// KIP-48 renew: bump the token's `expiry_timestamp_ms` capped by
58    /// `max_timestamp_ms`. `renew_period_ms = -1` tells the broker to
59    /// use its configured default. Returns the new expiry.
60    pub async fn renew_delegation_token(&mut self, hmac: &[u8]) -> Result<i64, AdminError> {
61        let req = build_renew_delegation_token(hmac);
62        let resp = self.conn.send(req).await?;
63        if resp.error_code != 0 {
64            return Err(broker_err("RenewDelegationToken", resp.error_code, None));
65        }
66        Ok(resp.expiry_timestamp_ms)
67    }
68
69    /// KIP-48 expire: tombstone the token immediately
70    /// (`expiry_time_period_ms = -1`).
71    pub async fn expire_delegation_token(&mut self, hmac: &[u8]) -> Result<(), AdminError> {
72        let req = build_expire_delegation_token(hmac);
73        let resp = self.conn.send(req).await?;
74        if resp.error_code != 0 {
75            return Err(broker_err("ExpireDelegationToken", resp.error_code, None));
76        }
77        Ok(())
78    }
79
80    /// KIP-48 describe filtered to a single owner. `owner_principal` is
81    /// a canonical `"Type:Name"` string (e.g. `"User:alice"`); entries
82    /// without a `:` default to type `"User"`.
83    pub async fn describe_delegation_tokens_owned_by(
84        &mut self,
85        owner_principal: &str,
86    ) -> Result<Vec<DelegationToken>, AdminError> {
87        let req = build_describe_owner_filter(owner_principal);
88        let resp = self.conn.send(req).await?;
89        parse_describe_delegation_tokens(resp)
90    }
91}
92
93/// Pure: build a `CreateDelegationTokenRequest` with the act-as owner
94/// fields populated and renewer strings split into the wire principal
95/// pair (`User` default for items lacking a `:`).
96fn build_create_delegation_token(
97    owner_principal_name: &str,
98    renewers: &[String],
99    max_lifetime_ms: i64,
100) -> CreateDelegationTokenRequest {
101    CreateDelegationTokenRequest {
102        owner_principal_type: Some("User".into()),
103        owner_principal_name: Some(owner_principal_name.into()),
104        renewers: renewers
105            .iter()
106            .map(|s| renewer_str_to_wire(s.as_str()))
107            .collect(),
108        max_lifetime_ms,
109        ..Default::default()
110    }
111}
112
113/// Pure: build a `RenewDelegationTokenRequest`. `renew_period_ms = -1`
114/// signals "use broker default" per KIP-48.
115fn build_renew_delegation_token(hmac: &[u8]) -> RenewDelegationTokenRequest {
116    RenewDelegationTokenRequest {
117        hmac: Bytes::copy_from_slice(hmac),
118        renew_period_ms: -1,
119        ..Default::default()
120    }
121}
122
123/// Pure: build an `ExpireDelegationTokenRequest`.
124/// `expiry_time_period_ms = -1` requests immediate tombstoning.
125fn build_expire_delegation_token(hmac: &[u8]) -> ExpireDelegationTokenRequest {
126    ExpireDelegationTokenRequest {
127        hmac: Bytes::copy_from_slice(hmac),
128        expiry_time_period_ms: -1,
129        ..Default::default()
130    }
131}
132
133/// Pure: split `"Type:Name"` (default type `User`) into a wire
134/// `CreatableRenewers`.
135fn renewer_str_to_wire(s: &str) -> CreatableRenewers {
136    let (pt, pn) = s.split_once(':').unwrap_or(("User", s));
137    CreatableRenewers {
138        principal_type: pt.to_string(),
139        principal_name: pn.to_string(),
140        ..Default::default()
141    }
142}
143
144/// Pure: build a `DescribeDelegationTokenRequest` whose `owners` filter
145/// is a single-entry list for the given `"Type:Name"` principal.
146fn build_describe_owner_filter(owner_principal: &str) -> DescribeDelegationTokenRequest {
147    let (pt, pn) = owner_principal
148        .split_once(':')
149        .unwrap_or(("User", owner_principal));
150    DescribeDelegationTokenRequest {
151        owners: Some(vec![DescribeDelegationTokenOwner {
152            principal_type: pt.to_string(),
153            principal_name: pn.to_string(),
154            ..Default::default()
155        }]),
156        ..Default::default()
157    }
158}
159
160/// Pure: response → `Vec<DelegationToken>` (the in-memory image type).
161/// Maps an `error_code != 0` to `AdminError::Broker` so callers get
162/// the same Kafka-error surface that the other admin methods use.
163fn parse_describe_delegation_tokens(
164    resp: DescribeDelegationTokenResponse,
165) -> Result<Vec<DelegationToken>, AdminError> {
166    if resp.error_code != 0 {
167        return Err(broker_err("DescribeDelegationToken", resp.error_code, None));
168    }
169    Ok(resp.tokens.into_iter().map(described_to_image).collect())
170}
171
172/// Pure: wire `DescribedDelegationToken` → `crabka_metadata::DelegationToken`.
173///
174/// Field rename notes:
175/// - wire `issue_timestamp` → image `issue_timestamp_ms`
176/// - wire `expiry_timestamp` → image `expiry_timestamp_ms`
177/// - wire `max_timestamp` → image `max_timestamp_ms`
178/// - wire `hmac: bytes::Bytes` → image `hmac: Vec<u8>`
179/// - wire `renewers: Vec<DescribedDelegationTokenRenewer>` →
180///   image `renewers: Vec<KafkaPrincipal>`
181fn described_to_image(t: DescribedDelegationToken) -> DelegationToken {
182    DelegationToken {
183        token_id: t.token_id,
184        owner: KafkaPrincipal {
185            principal_type: t.principal_type,
186            name: t.principal_name,
187        },
188        hmac: t.hmac.to_vec(),
189        issue_timestamp_ms: t.issue_timestamp,
190        expiry_timestamp_ms: t.expiry_timestamp,
191        max_timestamp_ms: t.max_timestamp,
192        renewers: t
193            .renewers
194            .into_iter()
195            .map(|r| KafkaPrincipal {
196                principal_type: r.principal_type,
197                name: r.principal_name,
198            })
199            .collect(),
200    }
201}
202
203fn broker_err(api: &'static str, code: i16, message: Option<String>) -> AdminError {
204    AdminError::Broker {
205        api,
206        code,
207        name: kafka_error_name(code),
208        message,
209    }
210}
211
212#[cfg(test)]
213mod tests {
214    use super::*;
215    use assert2::assert;
216    use bytes::Bytes;
217    use crabka_protocol::owned::describe_delegation_token_response::{
218        DescribeDelegationTokenResponse, DescribedDelegationToken, DescribedDelegationTokenRenewer,
219    };
220
221    // ── build_create_delegation_token ─────────────────────────────────
222    //
223    // The act-as wire contract: `owner_principal_type` must be `"User"`
224    // and `owner_principal_name` must carry the target user, so the
225    // broker's act-as branch fires (otherwise the broker falls back to
226    // self-mint and the caller's principal owns the token). Renewer
227    // strings are split on the first `:`; bare names default to
228    // `principal_type = "User"`.
229
230    #[test]
231    fn build_create_populates_act_as_owner_and_renewers() {
232        let req = build_create_delegation_token(
233            "alice",
234            &["User:bob".to_string(), "carol".to_string()],
235            60_000,
236        );
237        assert!(req.owner_principal_type.as_deref() == Some("User"));
238        assert!(req.owner_principal_name.as_deref() == Some("alice"));
239        assert!(req.max_lifetime_ms == 60_000);
240        assert!(req.renewers.len() == 2);
241        // "User:bob" → type=User, name=bob
242        assert!(req.renewers[0].principal_type == "User");
243        assert!(req.renewers[0].principal_name == "bob");
244        // "carol" → default type=User, name=carol
245        assert!(req.renewers[1].principal_type == "User");
246        assert!(req.renewers[1].principal_name == "carol");
247    }
248
249    // ── build_renew_delegation_token / build_expire_delegation_token ─
250
251    /// Renew uses the broker default by passing `renew_period_ms = -1`;
252    /// the hmac is copied verbatim into the wire bytes field.
253    #[test]
254    fn build_renew_uses_minus_one_for_broker_default() {
255        let req = build_renew_delegation_token(b"\x01\x02\x03");
256        assert!(req.hmac.as_ref() == &[0x01, 0x02, 0x03]);
257        assert!(req.renew_period_ms == -1);
258    }
259
260    /// Expire-immediately is signaled by `expiry_time_period_ms = -1`;
261    /// the hmac is copied verbatim.
262    #[test]
263    fn build_expire_uses_minus_one_for_immediate_tombstone() {
264        let req = build_expire_delegation_token(b"\xaa\xbb");
265        assert!(req.hmac.as_ref() == &[0xaa, 0xbb]);
266        assert!(req.expiry_time_period_ms == -1);
267    }
268
269    // ── build_describe_owner_filter ──────────────────────────────────
270
271    #[test]
272    fn build_describe_owner_filter_sets_single_entry() {
273        let req = build_describe_owner_filter("User:alice");
274        let owners = req.owners.as_ref().expect("owners filter populated");
275        assert!(owners.len() == 1);
276        assert!(owners[0].principal_type == "User");
277        assert!(owners[0].principal_name == "alice");
278
279        // Bare-name default-type=User path.
280        let req2 = build_describe_owner_filter("solo");
281        let owners2 = req2.owners.as_ref().unwrap();
282        assert!(owners2[0].principal_type == "User");
283        assert!(owners2[0].principal_name == "solo");
284    }
285
286    // ── parse_describe_delegation_tokens ─────────────────────────────
287
288    #[test]
289    fn parse_describe_maps_wire_fields_to_image() {
290        let resp = DescribeDelegationTokenResponse {
291            error_code: 0,
292            tokens: vec![DescribedDelegationToken {
293                principal_type: "User".into(),
294                principal_name: "alice".into(),
295                // act-as: token_requester != owner — but the image type
296                // tracks only the owner; requester is not surfaced.
297                token_requester_principal_type: "User".into(),
298                token_requester_principal_name: "operator".into(),
299                issue_timestamp: 1_000,
300                expiry_timestamp: 2_000,
301                max_timestamp: 9_000,
302                token_id: "tok-1".into(),
303                hmac: Bytes::from_static(b"\xde\xad\xbe\xef"),
304                renewers: vec![DescribedDelegationTokenRenewer {
305                    principal_type: "User".into(),
306                    principal_name: "bob".into(),
307                    ..Default::default()
308                }],
309                ..Default::default()
310            }],
311            ..Default::default()
312        };
313        let out = parse_describe_delegation_tokens(resp).expect("ok response");
314        assert!(out.len() == 1);
315        let t = &out[0];
316        assert!(t.token_id == "tok-1");
317        assert!(t.owner.principal_type == "User");
318        assert!(t.owner.name == "alice");
319        assert!(t.hmac == vec![0xde, 0xad, 0xbe, 0xef]);
320        assert!(t.issue_timestamp_ms == 1_000);
321        assert!(t.expiry_timestamp_ms == 2_000);
322        assert!(t.max_timestamp_ms == 9_000);
323        assert!(t.renewers.len() == 1);
324        assert!(t.renewers[0].principal_type == "User");
325        assert!(t.renewers[0].name == "bob");
326    }
327
328    // ── broker_err ─────────────────────────────────────────────────────
329    //
330    // The pure error-construction helper shared by all four methods.
331    // A non-zero `error_code` surfaces as `AdminError::Broker` carrying
332    // the Kafka-error name string the reconciler may inspect.
333
334    /// Spec-style coverage for the non-zero-error path on
335    /// `create_delegation_token_as_owner` (and by symmetry the other
336    /// three methods, which use the same `broker_err` helper): a
337    /// `DELEGATION_TOKEN_AUTHORIZATION_FAILED` (65) response is mapped
338    /// to `AdminError::Broker { code: 65, .. }`.
339    #[test]
340    fn broker_err_carries_api_and_kafka_code_name() {
341        let e = broker_err("CreateDelegationToken", 65, Some("not super-user".into()));
342        match e {
343            AdminError::Broker {
344                api,
345                code,
346                name,
347                message,
348            } => {
349                assert!(api == "CreateDelegationToken");
350                assert!(code == 65);
351                // 65 is not in `kafka_error_name`'s match table yet, so
352                // it falls through to "UNKNOWN" — locking that behavior
353                // so a later edit to the table doesn't silently change
354                // the surfaced name.
355                assert!(name == "UNKNOWN");
356                assert!(message.as_deref() == Some("not super-user"));
357            }
358            other => panic!("expected AdminError::Broker, got {other:?}"),
359        }
360    }
361
362    /// Non-zero `error_code` on the top-level response surfaces as
363    /// `AdminError::Broker` so the reconciler can branch on the Kafka
364    /// error code (e.g. `DELEGATION_TOKEN_AUTH_DISABLED = 61`).
365    #[test]
366    fn parse_describe_propagates_nonzero_error_code() {
367        let resp = DescribeDelegationTokenResponse {
368            error_code: 61, // DELEGATION_TOKEN_AUTH_DISABLED
369            ..Default::default()
370        };
371        let err = parse_describe_delegation_tokens(resp).unwrap_err();
372        match err {
373            AdminError::Broker { api, code, .. } => {
374                assert!(api == "DescribeDelegationToken");
375                assert!(code == 61);
376            }
377            other => panic!("expected AdminError::Broker, got {other:?}"),
378        }
379    }
380}