1use bytes::Bytes;
15use crabka_protocol::owned::{
16 alter_user_scram_credentials_request::{
17 AlterUserScramCredentialsRequest, ScramCredentialDeletion, ScramCredentialUpsertion,
18 },
19 create_acls_request::{AclCreation, CreateAclsRequest},
20 delete_acls_request::{DeleteAclsFilter, DeleteAclsRequest},
21 describe_acls_request::DescribeAclsRequest,
22};
23use crabka_security::SaslMechanism;
24use ring::rand::{SecureRandom, SystemRandom};
25
26use crate::{AdminClient, AdminError, KafkaError, kafka_error_name};
27
28const SCRAM_SHA_512_WIRE: i8 = 2;
30const SCRAM_SHA_256_WIRE: i8 = 1;
33
34pub const DEFAULT_SCRAM_ITERATIONS: i32 = 8192;
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
40pub enum ResourceType {
41 Topic,
42 Group,
43 Cluster,
44 TransactionalId,
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
48pub enum PatternType {
49 Literal,
50 Prefixed,
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
54pub enum PermissionType {
55 Allow,
56 Deny,
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
60pub enum AclOperation {
61 All,
62 Read,
63 Write,
64 Create,
65 Delete,
66 Alter,
67 Describe,
68 ClusterAction,
69 DescribeConfigs,
70 AlterConfigs,
71 IdempotentWrite,
72}
73
74#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
77pub struct AclEntry {
78 pub resource_type: ResourceType,
79 pub resource_name: String,
80 pub pattern_type: PatternType,
81 pub principal: String,
82 pub host: String,
83 pub operation: AclOperation,
84 pub permission_type: PermissionType,
85}
86
87#[derive(Debug, Clone, Default, PartialEq, Eq)]
90pub struct AclEntryFilter {
91 pub resource_type: Option<ResourceType>,
92 pub resource_name: Option<String>,
93 pub pattern_type: Option<PatternType>,
94 pub principal: Option<String>,
95 pub host: Option<String>,
96 pub operation: Option<AclOperation>,
97 pub permission_type: Option<PermissionType>,
98}
99
100#[derive(Debug, Clone)]
101pub struct ScramUpsertion {
102 pub username: String,
103 pub password: String,
104 pub iterations: i32,
105}
106
107#[derive(Debug, Clone)]
108pub struct ScramDeletion {
109 pub username: String,
110}
111
112#[derive(Debug, Clone)]
113pub struct ScramUserOutcome {
114 pub username: String,
115 pub error: Option<KafkaError>,
116}
117
118#[derive(Debug, Clone)]
119pub struct CreateAclOutcome {
120 pub error: Option<KafkaError>,
121}
122
123#[derive(Debug, Clone)]
124pub struct DeleteAclFilterOutcome {
125 pub error: Option<KafkaError>,
126 pub matched: Vec<AclEntry>,
127}
128
129impl AdminClient {
130 pub async fn alter_user_scram_credentials_sha512(
138 &mut self,
139 upsertions: &[ScramUpsertion],
140 deletions: &[ScramDeletion],
141 ) -> Result<Vec<ScramUserOutcome>, AdminError> {
142 let rng = SystemRandom::new();
143 let req = build_alter_scram_request_sha512(upsertions, deletions, &rng)?;
144 let resp = self.conn.send(req).await?;
145 Ok(parse_alter_scram_results(resp))
146 }
147
148 pub async fn alter_user_scram_credentials_sha256(
160 &mut self,
161 upsertions: &[ScramUpsertion],
162 deletions: &[ScramDeletion],
163 ) -> Result<Vec<ScramUserOutcome>, AdminError> {
164 let rng = SystemRandom::new();
165 let req = build_alter_scram_request_sha256(upsertions, deletions, &rng)?;
166 let resp = self.conn.send(req).await?;
167 Ok(parse_alter_scram_results(resp))
168 }
169
170 pub async fn describe_acls(
175 &mut self,
176 filter: &AclEntryFilter,
177 ) -> Result<Vec<AclEntry>, AdminError> {
178 let req = filter_to_describe_request(filter);
179 let resp = self.conn.send(req).await?;
180 parse_describe_acls(resp)
181 }
182
183 pub async fn create_acls(
185 &mut self,
186 creations: &[AclEntry],
187 ) -> Result<Vec<CreateAclOutcome>, AdminError> {
188 let req = CreateAclsRequest {
189 creations: creations.iter().map(acl_to_creation).collect(),
190 ..Default::default()
191 };
192 let resp = self.conn.send(req).await?;
193 Ok(resp
194 .results
195 .into_iter()
196 .map(|r| CreateAclOutcome {
197 error: error_if(r.error_code, r.error_message),
198 })
199 .collect())
200 }
201
202 pub async fn delete_acls(
206 &mut self,
207 filters: &[AclEntryFilter],
208 ) -> Result<Vec<DeleteAclFilterOutcome>, AdminError> {
209 let req = DeleteAclsRequest {
210 filters: filters.iter().map(acl_filter_to_wire).collect(),
211 ..Default::default()
212 };
213 let resp = self.conn.send(req).await?;
214 let mut out = Vec::with_capacity(resp.filter_results.len());
215 for fr in resp.filter_results {
216 if let Some(err) = error_if(fr.error_code, fr.error_message) {
217 out.push(DeleteAclFilterOutcome {
218 error: Some(err),
219 matched: Vec::new(),
220 });
221 continue;
222 }
223 let mut matched = Vec::with_capacity(fr.matching_acls.len());
224 for m in fr.matching_acls {
225 if m.error_code != 0 {
226 return Err(AdminError::Broker {
229 api: "DeleteAcls",
230 code: m.error_code,
231 name: kafka_error_name(m.error_code),
232 message: m.error_message,
233 });
234 }
235 matched.push(AclEntry {
236 resource_type: wire_to_resource_type(m.resource_type)?,
237 resource_name: m.resource_name,
238 pattern_type: wire_to_pattern_type(m.pattern_type)?,
239 principal: m.principal,
240 host: m.host,
241 operation: wire_to_operation(m.operation)?,
242 permission_type: wire_to_permission(m.permission_type)?,
243 });
244 }
245 out.push(DeleteAclFilterOutcome {
246 error: None,
247 matched,
248 });
249 }
250 Ok(out)
251 }
252}
253
254fn error_if(code: i16, message: Option<String>) -> Option<KafkaError> {
255 if code == 0 {
256 None
257 } else {
258 Some(KafkaError {
259 code,
260 name: kafka_error_name(code),
261 message,
262 })
263 }
264}
265
266fn build_alter_scram_request_sha512(
267 upsertions: &[ScramUpsertion],
268 deletions: &[ScramDeletion],
269 rng: &SystemRandom,
270) -> Result<AlterUserScramCredentialsRequest, AdminError> {
271 build_alter_scram_request(
272 upsertions,
273 deletions,
274 rng,
275 SaslMechanism::ScramSha512,
276 SCRAM_SHA_512_WIRE,
277 )
278}
279
280fn build_alter_scram_request_sha256(
284 upsertions: &[ScramUpsertion],
285 deletions: &[ScramDeletion],
286 rng: &SystemRandom,
287) -> Result<AlterUserScramCredentialsRequest, AdminError> {
288 build_alter_scram_request(
289 upsertions,
290 deletions,
291 rng,
292 SaslMechanism::ScramSha256,
293 SCRAM_SHA_256_WIRE,
294 )
295}
296
297fn build_alter_scram_request(
298 upsertions: &[ScramUpsertion],
299 deletions: &[ScramDeletion],
300 rng: &SystemRandom,
301 mechanism: SaslMechanism,
302 wire_mechanism: i8,
303) -> Result<AlterUserScramCredentialsRequest, AdminError> {
304 let mut wire_upserts = Vec::with_capacity(upsertions.len());
305 for u in upsertions {
306 let mut salt = vec![0u8; 16];
307 rng.fill(&mut salt)
308 .map_err(|_| AdminError::Protocol("system RNG failure".into()))?;
309 let salted = crabka_security::pbkdf2_salted(
310 u.password.as_bytes(),
311 mechanism,
312 u32::try_from(u.iterations.max(0)).unwrap_or(0),
313 &salt,
314 );
315 wire_upserts.push(ScramCredentialUpsertion {
316 name: u.username.clone(),
317 mechanism: wire_mechanism,
318 iterations: u.iterations,
319 salt: Bytes::from(salt),
320 salted_password: Bytes::from(salted),
321 ..Default::default()
322 });
323 }
324 let wire_deletes = deletions
325 .iter()
326 .map(|d| ScramCredentialDeletion {
327 name: d.username.clone(),
328 mechanism: wire_mechanism,
329 ..Default::default()
330 })
331 .collect();
332 Ok(AlterUserScramCredentialsRequest {
333 upsertions: wire_upserts,
334 deletions: wire_deletes,
335 ..Default::default()
336 })
337}
338
339fn parse_alter_scram_results(
340 resp: <AlterUserScramCredentialsRequest as crabka_protocol::ProtocolRequest>::Response,
341) -> Vec<ScramUserOutcome> {
342 resp.results
343 .into_iter()
344 .map(|r| ScramUserOutcome {
345 username: r.user,
346 error: error_if(r.error_code, r.error_message),
347 })
348 .collect()
349}
350
351fn filter_to_describe_request(f: &AclEntryFilter) -> DescribeAclsRequest {
352 DescribeAclsRequest {
353 resource_type_filter: f.resource_type.map_or(WIRE_ANY, resource_type_to_wire),
354 resource_name_filter: f.resource_name.clone(),
355 pattern_type_filter: f.pattern_type.map_or(WIRE_ANY, pattern_type_to_wire),
356 principal_filter: f.principal.clone(),
357 host_filter: f.host.clone(),
358 operation: f.operation.map_or(WIRE_ANY, operation_to_wire),
359 permission_type: f.permission_type.map_or(WIRE_ANY, permission_to_wire),
360 ..Default::default()
361 }
362}
363
364fn parse_describe_acls(
365 resp: <DescribeAclsRequest as crabka_protocol::ProtocolRequest>::Response,
366) -> Result<Vec<AclEntry>, AdminError> {
367 if resp.error_code != 0 {
368 return Err(AdminError::Broker {
369 api: "DescribeAcls",
370 code: resp.error_code,
371 name: kafka_error_name(resp.error_code),
372 message: resp.error_message,
373 });
374 }
375 let mut out = Vec::new();
376 for resource in resp.resources {
377 let rt = wire_to_resource_type(resource.resource_type)?;
378 let pt = wire_to_pattern_type(resource.pattern_type)?;
379 for desc in resource.acls {
380 out.push(AclEntry {
381 resource_type: rt,
382 resource_name: resource.resource_name.clone(),
383 pattern_type: pt,
384 principal: desc.principal,
385 host: desc.host,
386 operation: wire_to_operation(desc.operation)?,
387 permission_type: wire_to_permission(desc.permission_type)?,
388 });
389 }
390 }
391 Ok(out)
392}
393
394pub(crate) fn acl_to_creation(e: &AclEntry) -> AclCreation {
397 AclCreation {
398 resource_type: resource_type_to_wire(e.resource_type),
399 resource_name: e.resource_name.clone(),
400 resource_pattern_type: pattern_type_to_wire(e.pattern_type),
401 principal: e.principal.clone(),
402 host: e.host.clone(),
403 operation: operation_to_wire(e.operation),
404 permission_type: permission_to_wire(e.permission_type),
405 ..Default::default()
406 }
407}
408
409pub(crate) fn acl_filter_to_wire(f: &AclEntryFilter) -> DeleteAclsFilter {
412 DeleteAclsFilter {
413 resource_type_filter: f.resource_type.map_or(WIRE_ANY, resource_type_to_wire),
414 resource_name_filter: f.resource_name.clone(),
415 pattern_type_filter: f.pattern_type.map_or(WIRE_ANY, pattern_type_to_wire),
416 principal_filter: f.principal.clone(),
417 host_filter: f.host.clone(),
418 operation: f.operation.map_or(WIRE_ANY, operation_to_wire),
419 permission_type: f.permission_type.map_or(WIRE_ANY, permission_to_wire),
420 ..Default::default()
421 }
422}
423
424const WIRE_ANY: i8 = 1;
433
434fn resource_type_to_wire(rt: ResourceType) -> i8 {
435 match rt {
436 ResourceType::Topic => 2,
437 ResourceType::Group => 3,
438 ResourceType::Cluster => 4,
439 ResourceType::TransactionalId => 5,
440 }
441}
442
443fn wire_to_resource_type(b: i8) -> Result<ResourceType, AdminError> {
444 match b {
445 2 => Ok(ResourceType::Topic),
446 3 => Ok(ResourceType::Group),
447 4 => Ok(ResourceType::Cluster),
448 5 => Ok(ResourceType::TransactionalId),
449 _ => Err(AdminError::Protocol(format!(
450 "unknown ACL resource_type discriminant: {b}",
451 ))),
452 }
453}
454
455fn pattern_type_to_wire(pt: PatternType) -> i8 {
456 match pt {
457 PatternType::Literal => 3,
458 PatternType::Prefixed => 4,
459 }
460}
461
462fn wire_to_pattern_type(b: i8) -> Result<PatternType, AdminError> {
463 match b {
464 3 => Ok(PatternType::Literal),
465 4 => Ok(PatternType::Prefixed),
466 _ => Err(AdminError::Protocol(format!(
467 "unknown ACL pattern_type discriminant: {b}",
468 ))),
469 }
470}
471
472fn permission_to_wire(pt: PermissionType) -> i8 {
473 match pt {
474 PermissionType::Deny => 2,
475 PermissionType::Allow => 3,
476 }
477}
478
479fn wire_to_permission(b: i8) -> Result<PermissionType, AdminError> {
480 match b {
481 2 => Ok(PermissionType::Deny),
482 3 => Ok(PermissionType::Allow),
483 _ => Err(AdminError::Protocol(format!(
484 "unknown ACL permission discriminant: {b}",
485 ))),
486 }
487}
488
489fn operation_to_wire(op: AclOperation) -> i8 {
490 match op {
491 AclOperation::All => 2,
492 AclOperation::Read => 3,
493 AclOperation::Write => 4,
494 AclOperation::Create => 5,
495 AclOperation::Delete => 6,
496 AclOperation::Alter => 7,
497 AclOperation::Describe => 8,
498 AclOperation::ClusterAction => 9,
499 AclOperation::DescribeConfigs => 10,
500 AclOperation::AlterConfigs => 11,
501 AclOperation::IdempotentWrite => 12,
502 }
503}
504
505fn wire_to_operation(b: i8) -> Result<AclOperation, AdminError> {
506 match b {
507 2 => Ok(AclOperation::All),
508 3 => Ok(AclOperation::Read),
509 4 => Ok(AclOperation::Write),
510 5 => Ok(AclOperation::Create),
511 6 => Ok(AclOperation::Delete),
512 7 => Ok(AclOperation::Alter),
513 8 => Ok(AclOperation::Describe),
514 9 => Ok(AclOperation::ClusterAction),
515 10 => Ok(AclOperation::DescribeConfigs),
516 11 => Ok(AclOperation::AlterConfigs),
517 12 => Ok(AclOperation::IdempotentWrite),
518 _ => Err(AdminError::Protocol(format!(
519 "unknown ACL operation discriminant: {b}",
520 ))),
521 }
522}
523
524#[cfg(test)]
525mod tests {
526 use super::*;
527 use assert2::assert;
528
529 fn sample_entry() -> AclEntry {
530 AclEntry {
531 resource_type: ResourceType::Topic,
532 resource_name: "orders".into(),
533 pattern_type: PatternType::Literal,
534 principal: "User:alice".into(),
535 host: "*".into(),
536 operation: AclOperation::Read,
537 permission_type: PermissionType::Allow,
538 }
539 }
540
541 #[test]
542 fn resource_type_round_trips() {
543 for rt in [
544 ResourceType::Topic,
545 ResourceType::Group,
546 ResourceType::Cluster,
547 ResourceType::TransactionalId,
548 ] {
549 assert!(wire_to_resource_type(resource_type_to_wire(rt)).unwrap() == rt);
550 }
551 }
552
553 #[test]
554 fn pattern_type_round_trips() {
555 for pt in [PatternType::Literal, PatternType::Prefixed] {
556 assert!(wire_to_pattern_type(pattern_type_to_wire(pt)).unwrap() == pt);
557 }
558 }
559
560 #[test]
561 fn permission_round_trips() {
562 for p in [PermissionType::Allow, PermissionType::Deny] {
563 assert!(wire_to_permission(permission_to_wire(p)).unwrap() == p);
564 }
565 }
566
567 #[test]
568 fn operation_round_trips() {
569 for op in [
570 AclOperation::All,
571 AclOperation::Read,
572 AclOperation::Write,
573 AclOperation::Create,
574 AclOperation::Delete,
575 AclOperation::Alter,
576 AclOperation::Describe,
577 AclOperation::ClusterAction,
578 AclOperation::DescribeConfigs,
579 AclOperation::AlterConfigs,
580 AclOperation::IdempotentWrite,
581 ] {
582 assert!(wire_to_operation(operation_to_wire(op)).unwrap() == op);
583 }
584 }
585
586 #[test]
587 fn wire_to_unknown_resource_type_errors() {
588 assert!(matches!(
589 wire_to_resource_type(99),
590 Err(AdminError::Protocol(_))
591 ));
592 assert!(matches!(
597 wire_to_resource_type(1),
598 Err(AdminError::Protocol(_))
599 ));
600 }
601
602 #[test]
603 fn acl_to_creation_matches_discriminants() {
604 let e = sample_entry();
605 let c = acl_to_creation(&e);
606 assert!(c.resource_type == 2);
607 assert!(c.resource_name == "orders");
608 assert!(c.resource_pattern_type == 3);
609 assert!(c.principal == "User:alice");
610 assert!(c.host == "*");
611 assert!(c.operation == 3);
612 assert!(c.permission_type == 3);
613 }
614
615 #[test]
616 fn acl_filter_to_wire_uses_any_for_none_axes() {
617 let f = AclEntryFilter::default();
618 let w = acl_filter_to_wire(&f);
619 assert!(w.resource_type_filter == WIRE_ANY);
620 assert!(w.pattern_type_filter == WIRE_ANY);
621 assert!(w.operation == WIRE_ANY);
622 assert!(w.permission_type == WIRE_ANY);
623 assert!(w.resource_name_filter.is_none());
624 assert!(w.principal_filter.is_none());
625 assert!(w.host_filter.is_none());
626 }
627
628 #[test]
629 fn acl_filter_to_wire_passes_concrete_axes_through() {
630 let f = AclEntryFilter {
631 resource_type: Some(ResourceType::Topic),
632 resource_name: Some("orders".into()),
633 pattern_type: Some(PatternType::Literal),
634 principal: Some("User:alice".into()),
635 host: Some("10.0.0.0".into()),
636 operation: Some(AclOperation::Read),
637 permission_type: Some(PermissionType::Allow),
638 };
639 let w = acl_filter_to_wire(&f);
640 assert!(w.resource_type_filter == 2);
641 assert!(w.resource_name_filter.as_deref() == Some("orders"));
642 assert!(w.pattern_type_filter == 3);
643 assert!(w.principal_filter.as_deref() == Some("User:alice"));
644 assert!(w.host_filter.as_deref() == Some("10.0.0.0"));
645 assert!(w.operation == 3);
646 assert!(w.permission_type == 3);
647 }
648
649 #[test]
650 fn scram_request_carries_pbkdf2_intermediate_not_password() {
651 let rng = SystemRandom::new();
652 let upserts = [ScramUpsertion {
653 username: "alice".into(),
654 password: "hunter2".into(),
655 iterations: 4096,
656 }];
657 let req = build_alter_scram_request_sha512(&upserts, &[], &rng).unwrap();
658 assert!(req.upsertions.len() == 1);
659 let u = &req.upsertions[0];
660 assert!(u.name == "alice");
661 assert!(u.mechanism == SCRAM_SHA_512_WIRE);
662 assert!(u.iterations == 4096);
663 assert!(u.salt.len() == 16);
664 assert!(u.salted_password.len() == 64);
667 assert!(u.salted_password.as_ref() != b"hunter2");
668 }
669
670 #[test]
671 fn scram_request_deletions_use_sha512_mechanism() {
672 let rng = SystemRandom::new();
673 let dels = [ScramDeletion {
674 username: "alice".into(),
675 }];
676 let req = build_alter_scram_request_sha512(&[], &dels, &rng).unwrap();
677 assert!(req.deletions.len() == 1);
678 assert!(req.deletions[0].name == "alice");
679 assert!(req.deletions[0].mechanism == SCRAM_SHA_512_WIRE);
680 }
681
682 #[test]
683 fn scram_request_two_upserts_get_distinct_salts() {
684 let rng = SystemRandom::new();
685 let upserts = [
686 ScramUpsertion {
687 username: "alice".into(),
688 password: "p".into(),
689 iterations: 4096,
690 },
691 ScramUpsertion {
692 username: "bob".into(),
693 password: "p".into(),
694 iterations: 4096,
695 },
696 ];
697 let req = build_alter_scram_request_sha512(&upserts, &[], &rng).unwrap();
698 assert!(req.upsertions[0].salt != req.upsertions[1].salt);
699 }
700
701 #[test]
702 fn describe_request_uses_any_for_unspecified_axes() {
703 let f = AclEntryFilter {
704 principal: Some("User:alice".into()),
705 ..Default::default()
706 };
707 let r = filter_to_describe_request(&f);
708 assert!(r.principal_filter.as_deref() == Some("User:alice"));
709 assert!(r.resource_type_filter == WIRE_ANY);
710 assert!(r.pattern_type_filter == WIRE_ANY);
711 assert!(r.operation == WIRE_ANY);
712 assert!(r.permission_type == WIRE_ANY);
713 }
714}