crabka-broker 0.3.6

Single-node Apache Kafka-compatible broker (MVP)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
//! KIP-48: `RenewDelegationToken` (`api_key` 39).
//!
//! Per spec ยง1.3: caller must be SASL-authenticated; the request's
//! `hmac` selects an existing token by HMAC bytes; only the owner, a
//! `renewers` entry, or a configured super-user may extend it; the new
//! expiry is clamped to the token's `max_timestamp_ms`; a fresh
//! `V1DelegationToken` record is appended with the same `token_id`
//! (image semantics: replace).
//!
//! The super-user bypass matches Kafka's `DelegationTokenManager.
//! isAuthorizedToOperateOnToken` (via `SecurityUtils.isAuthorized`),
//! and is what the operator relies on: the operator is a
//! super-user that mints tokens on behalf of `KafkaUser` principals
//! via act-as, then must be able to renew/expire them despite being
//! neither the owner nor a listed renewer.

use std::collections::HashSet;
use std::hash::BuildHasher;

use crabka_metadata::{DelegationTokenRecord, MetadataRecord};
use crabka_protocol::owned::renew_delegation_token_request::RenewDelegationTokenRequest;
use crabka_protocol::owned::renew_delegation_token_response::RenewDelegationTokenResponse;
use crabka_security::SecretBytes;

use crate::network::auth::ConnectionAuth;
use crate::time_util::now_ms;

pub(crate) async fn handle<S: BuildHasher>(
    req: &RenewDelegationTokenRequest,
    auth: &ConnectionAuth,
    secret_key: Option<&SecretBytes>,
    default_renew_period_ms: i64,
    controller: &dyn crate::metadata_source::MetadataSource,
    super_users: &HashSet<String, S>,
) -> RenewDelegationTokenResponse {
    if secret_key.is_none() {
        return err_response(crate::codes::DELEGATION_TOKEN_AUTH_DISABLED);
    }
    let ConnectionAuth::Authenticated { principal, .. } = auth else {
        return err_response(crate::codes::INVALID_REQUEST);
    };
    let caller = principal.to_kafka();

    let image = controller.current_image();
    // KIP-48/KIP-778: KRaft delegation tokens require metadata.version >= 3.6-IV2.
    if crate::features::require_feature(
        &image,
        crate::features::METADATA_VERSION,
        crabka_metadata::metadata_version::DELEGATION_TOKEN_MIN_LEVEL,
    )
    .is_err()
    {
        return err_response(crate::codes::UNSUPPORTED_VERSION);
    }
    let Some(token) = image.delegation_token_by_hmac(req.hmac.as_ref()).cloned() else {
        return err_response(crate::codes::DELEGATION_TOKEN_NOT_FOUND);
    };

    // KIP-48: super-users bypass the owner/renewer gate. The
    // operator-driven issuance flow depends on this โ€” the operator is
    // a super-user that mints tokens via act-as for other principals,
    // so it is neither the owner nor a listed renewer when it later
    // needs to renew them ahead of expiry.
    let is_super_user = super_users.contains(&principal.name);
    if !is_super_user && token.owner != caller && !token.renewers.contains(&caller) {
        return err_response(crate::codes::DELEGATION_TOKEN_OWNER_MISMATCH);
    }

    let now = now_ms();
    let renew_period_ms = if req.renew_period_ms == -1 {
        default_renew_period_ms
    } else {
        req.renew_period_ms
    };
    let new_expiry = (now + renew_period_ms).min(token.max_timestamp_ms);

    let record = DelegationTokenRecord {
        token_id: token.token_id.clone(),
        owner: token.owner.clone(),
        hmac: token.hmac.clone(),
        issue_timestamp_ms: token.issue_timestamp_ms,
        expiry_timestamp_ms: new_expiry,
        max_timestamp_ms: token.max_timestamp_ms,
        renewers: token.renewers.clone(),
    };
    if let Err(e) = controller
        .submit_change(vec![MetadataRecord::V1DelegationToken(record)])
        .await
    {
        tracing::warn!(error = %e, "RenewDelegationToken: submit_change failed");
        return err_response(crate::codes::INVALID_REQUEST);
    }

    RenewDelegationTokenResponse {
        error_code: 0,
        expiry_timestamp_ms: new_expiry,
        ..Default::default()
    }
}

fn err_response(code: i16) -> RenewDelegationTokenResponse {
    RenewDelegationTokenResponse {
        error_code: code,
        ..Default::default()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use crabka_raft::ControllerHandle;
    use crabka_security::{AuthMethod, KafkaPrincipal, Principal, SaslMechanism};
    use std::collections::HashSet;
    use std::sync::Arc;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Helper: empty super-users set for the pre-existing tests, which
    /// all exercise the owner/renewer path.
    fn empty_super_users() -> HashSet<String> {
        HashSet::new()
    }

    /// Helper: super-users set containing the given names (for the new
    /// super-user-bypass tests).
    fn super_users_with(names: &[&str]) -> HashSet<String> {
        names.iter().map(|s| (*s).to_string()).collect()
    }

    /// Spin up a single-voter `Controller` for tests, wait for leader.
    async fn test_controller(log_dir: std::path::PathBuf) -> Arc<ControllerHandle> {
        let cfg = crabka_raft::ControllerConfig {
            election_timeout: Duration::from_millis(200),
            heartbeat_interval: Duration::from_millis(50),
            client_id: "test".into(),
            ..crabka_raft::ControllerConfig::for_tests(1, log_dir)
        };
        let handle = Arc::new(crabka_raft::Controller::start(cfg).await.unwrap());
        let mut rx = handle.watch_leader();
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        while rx.borrow().is_none() {
            assert!(std::time::Instant::now() < deadline, "no leader in 5s");
            let _ = tokio::time::timeout(Duration::from_millis(100), rx.changed()).await;
        }
        handle
    }

    fn authed(name: &str) -> ConnectionAuth {
        ConnectionAuth::Authenticated {
            principal: Principal {
                name: name.into(),
                auth_method: AuthMethod::SaslScramSha256,
                groups: vec![],
            },
            mechanism: SaslMechanism::ScramSha256,
            expires_at_ms: None,
            authenticated_via_token: false,
        }
    }

    fn kp(name: &str) -> KafkaPrincipal {
        KafkaPrincipal {
            principal_type: "User".into(),
            name: name.into(),
        }
    }

    #[allow(clippy::too_many_arguments)]
    async fn seed_token(
        controller: &ControllerHandle,
        token_id: &str,
        hmac: Vec<u8>,
        owner: KafkaPrincipal,
        renewers: Vec<KafkaPrincipal>,
        issue_ms: i64,
        expiry_ms: i64,
        max_ms: i64,
    ) {
        let rec = DelegationTokenRecord {
            token_id: token_id.into(),
            owner,
            hmac,
            issue_timestamp_ms: issue_ms,
            expiry_timestamp_ms: expiry_ms,
            max_timestamp_ms: max_ms,
            renewers,
        };
        controller
            .submit_change(vec![MetadataRecord::V1DelegationToken(rec)])
            .await
            .expect("seed token");
    }

    #[tokio::test]
    async fn returns_auth_disabled_when_no_secret_key() {
        // Auth-disabled gate fires before anything else (no controller needed).
        let req = RenewDelegationTokenRequest::default();
        let auth = authed("alice");
        let dir = TempDir::new().unwrap();
        let controller = test_controller(dir.path().into()).await;
        let resp = handle(&req, &auth, None, 1_000, &*controller, &empty_super_users()).await;
        assert!(resp.error_code == crate::codes::DELEGATION_TOKEN_AUTH_DISABLED);
        controller.cancel().await;
    }

    #[tokio::test]
    async fn success_as_owner_extends_expiry() {
        let dir = TempDir::new().unwrap();
        let controller = test_controller(dir.path().into()).await;
        let secret = SecretBytes::new(b"k".to_vec());
        let hmac = vec![0xAA; 32];
        let now = now_ms();
        seed_token(
            &controller,
            "tok-1",
            hmac.clone(),
            kp("alice"),
            vec![],
            now - 1_000,
            now + 60_000,
            now + 7 * 24 * 60 * 60 * 1_000,
        )
        .await;

        let req = RenewDelegationTokenRequest {
            hmac: hmac.into(),
            renew_period_ms: 3_600_000, // +1h
            ..Default::default()
        };
        let resp = handle(
            &req,
            &authed("alice"),
            Some(&secret),
            1_000,
            &*controller,
            &empty_super_users(),
        )
        .await;
        assert!(resp.error_code == 0);
        // expiry should be roughly now + 1h (within a small slop window).
        let slop = 60_000;
        let target = now_ms() + 3_600_000;
        assert!(
            (resp.expiry_timestamp_ms - target).abs() < slop,
            "expiry {} far from {target}",
            resp.expiry_timestamp_ms
        );
        // Persisted in image.
        let img = controller.current_image();
        let stored = img.delegation_token_by_id("tok-1").expect("present");
        assert!(stored.expiry_timestamp_ms == resp.expiry_timestamp_ms);
        controller.cancel().await;
    }

    #[tokio::test]
    async fn success_as_renewer_extends_expiry() {
        let dir = TempDir::new().unwrap();
        let controller = test_controller(dir.path().into()).await;
        let secret = SecretBytes::new(b"k".to_vec());
        let hmac = vec![0xBB; 32];
        let now = now_ms();
        seed_token(
            &controller,
            "tok-2",
            hmac.clone(),
            kp("alice"),
            vec![kp("bob")],
            now - 1_000,
            now + 60_000,
            now + 7 * 24 * 60 * 60 * 1_000,
        )
        .await;

        let req = RenewDelegationTokenRequest {
            hmac: hmac.into(),
            renew_period_ms: 60_000, // +1m
            ..Default::default()
        };
        let resp = handle(
            &req,
            &authed("bob"),
            Some(&secret),
            1_000,
            &*controller,
            &empty_super_users(),
        )
        .await;
        assert!(resp.error_code == 0);
        assert!(resp.expiry_timestamp_ms > now + 30_000);
        controller.cancel().await;
    }

    #[tokio::test]
    async fn unknown_hmac_returns_not_found() {
        let dir = TempDir::new().unwrap();
        let controller = test_controller(dir.path().into()).await;
        let secret = SecretBytes::new(b"k".to_vec());
        let req = RenewDelegationTokenRequest {
            hmac: vec![0xFF; 32].into(),
            renew_period_ms: 60_000,
            ..Default::default()
        };
        let resp = handle(
            &req,
            &authed("alice"),
            Some(&secret),
            1_000,
            &*controller,
            &empty_super_users(),
        )
        .await;
        assert!(resp.error_code == crate::codes::DELEGATION_TOKEN_NOT_FOUND);
        controller.cancel().await;
    }

    #[tokio::test]
    async fn non_owner_non_renewer_returns_owner_mismatch() {
        let dir = TempDir::new().unwrap();
        let controller = test_controller(dir.path().into()).await;
        let secret = SecretBytes::new(b"k".to_vec());
        let hmac = vec![0xCC; 32];
        let now = now_ms();
        seed_token(
            &controller,
            "tok-3",
            hmac.clone(),
            kp("alice"),
            vec![kp("bob")],
            now - 1_000,
            now + 60_000,
            now + 7 * 24 * 60 * 60 * 1_000,
        )
        .await;

        let req = RenewDelegationTokenRequest {
            hmac: hmac.into(),
            renew_period_ms: 60_000,
            ..Default::default()
        };
        let resp = handle(
            &req,
            &authed("eve"),
            Some(&secret),
            1_000,
            &*controller,
            &empty_super_users(),
        )
        .await;
        assert!(resp.error_code == crate::codes::DELEGATION_TOKEN_OWNER_MISMATCH);
        controller.cancel().await;
    }

    /// A super-user caller may renew a token they
    /// neither own nor are listed as a renewer on. This mirrors Kafka's
    /// `DelegationTokenManager.isAuthorizedToOperateOnToken` and is the
    /// load-bearing gate for the operator flow โ€” the operator
    /// is a super-user that act-as-mints tokens on behalf of `KafkaUser`
    /// principals, then must be able to renew them ahead of expiry.
    #[tokio::test]
    async fn super_user_can_renew_any_token() {
        let dir = TempDir::new().unwrap();
        let controller = test_controller(dir.path().into()).await;
        let secret = SecretBytes::new(b"k".to_vec());
        let hmac = vec![0xDD; 32];
        let now = now_ms();
        // Token owned by `alice`, no renewers โ€” operator (`admin`) is
        // neither, but it IS in `super_users`, so renew must succeed.
        seed_token(
            &controller,
            "tok-super",
            hmac.clone(),
            kp("alice"),
            vec![],
            now - 1_000,
            now + 60_000,
            now + 7 * 24 * 60 * 60 * 1_000,
        )
        .await;

        let req = RenewDelegationTokenRequest {
            hmac: hmac.into(),
            renew_period_ms: 3_600_000, // +1h
            ..Default::default()
        };
        let resp = handle(
            &req,
            &authed("admin"),
            Some(&secret),
            1_000,
            &*controller,
            &super_users_with(&["admin"]),
        )
        .await;
        assert!(
            resp.error_code == 0,
            "super-user must be able to renew any token regardless of owner/renewers"
        );
        // Persisted in image with the new expiry.
        let img = controller.current_image();
        let stored = img.delegation_token_by_id("tok-super").expect("present");
        assert!(stored.expiry_timestamp_ms == resp.expiry_timestamp_ms);
        controller.cancel().await;
    }

    /// A non-super-user caller who is also not the
    /// owner and not a listed renewer must still be rejected with
    /// `DELEGATION_TOKEN_OWNER_MISMATCH`. Guards against accidentally
    /// widening the bypass beyond `super_users`.
    #[tokio::test]
    async fn non_super_user_non_owner_non_renewer_still_rejected() {
        let dir = TempDir::new().unwrap();
        let controller = test_controller(dir.path().into()).await;
        let secret = SecretBytes::new(b"k".to_vec());
        let hmac = vec![0xEE; 32];
        let now = now_ms();
        seed_token(
            &controller,
            "tok-eve",
            hmac.clone(),
            kp("alice"),
            vec![kp("bob")],
            now - 1_000,
            now + 60_000,
            now + 7 * 24 * 60 * 60 * 1_000,
        )
        .await;

        let req = RenewDelegationTokenRequest {
            hmac: hmac.into(),
            renew_period_ms: 60_000,
            ..Default::default()
        };
        // `eve` is not in the super-users set (only `admin` is) and is
        // neither owner nor renewer โ€” must still get the mismatch error.
        let resp = handle(
            &req,
            &authed("eve"),
            Some(&secret),
            1_000,
            &*controller,
            &super_users_with(&["admin"]),
        )
        .await;
        assert!(resp.error_code == crate::codes::DELEGATION_TOKEN_OWNER_MISMATCH);
        controller.cancel().await;
    }
}