use std::fmt;
use bytes::{Buf, BufMut};
use zeroize::Zeroizing;
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::auth::scram::{MAX_PBKDF2_ITERATIONS, MIN_PBKDF2_ITERATIONS, ScramMechanism};
use crate::error::{ErrorCode, KrafkaError, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{Decode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{check_compact_array_len, encode_compact_array_len};
/// One entry of the request's deletion list: names a single SCRAM credential
/// to remove by owner and mechanism.
#[derive(Debug, Clone)]
pub struct ScramCredentialDeletion {
/// Owner of the credential (presumably the user name, mirroring the
/// upsertion entry); written to the wire as a compact string.
pub name: String,
/// SCRAM mechanism of the credential to delete; written as a single
/// signed byte obtained from `ScramMechanism::to_wire_byte`.
pub mechanism: ScramMechanism,
}
/// One entry of the request's upsertion list: the full SCRAM credential
/// material to store for a user under one mechanism.
///
/// `Debug` is implemented by hand for this type so that `salt` and
/// `salted_password` are never printed.
#[derive(Clone)]
pub struct ScramCredentialUpsertion {
/// User the credential belongs to; written to the wire as a compact string.
pub name: String,
/// SCRAM mechanism of the credential; written as a single signed byte
/// obtained from `ScramMechanism::to_wire_byte`.
pub mechanism: ScramMechanism,
/// PBKDF2 iteration count. `encode_v0` rejects the whole request unless
/// this lies within `[MIN_PBKDF2_ITERATIONS, MAX_PBKDF2_ITERATIONS]`.
pub iterations: i32,
/// Salt bytes, wrapped in `Zeroizing` (which wipes the buffer on drop).
pub salt: Zeroizing<Vec<u8>>,
/// Salted password bytes, wrapped in `Zeroizing` (which wipes the buffer
/// on drop).
pub salted_password: Zeroizing<Vec<u8>>,
}
/// Hand-written `Debug` that prints `name`, `mechanism` and `iterations`
/// as-is but substitutes a fixed placeholder for `salt` and
/// `salted_password`, so the secret bytes never appear in debug output.
impl fmt::Debug for ScramCredentialUpsertion {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Printed in place of both secret byte buffers.
        const REDACTED: &str = "[REDACTED]";

        // Only the non-secret fields are bound; the secrets are never read.
        let Self {
            name,
            mechanism,
            iterations,
            ..
        } = self;

        let mut out = f.debug_struct("ScramCredentialUpsertion");
        out.field("name", name);
        out.field("mechanism", mechanism);
        out.field("iterations", iterations);
        out.field("salt", &REDACTED);
        out.field("salted_password", &REDACTED);
        out.finish()
    }
}
/// Request body for the `AlterUserScramCredentials` API: a batch of
/// credential deletions plus a batch of credential upsertions.
#[derive(Debug, Clone)]
pub struct AlterUserScramCredentialsRequest {
/// Credentials to remove; encoded first on the wire.
pub deletions: Vec<ScramCredentialDeletion>,
/// Credentials to create or replace; encoded after the deletions.
pub upsertions: Vec<ScramCredentialUpsertion>,
}
impl AlterUserScramCredentialsRequest {
pub fn api_key() -> ApiKey {
ApiKey::AlterUserScramCredentials
}
pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
for ups in &self.upsertions {
let min = MIN_PBKDF2_ITERATIONS as i32;
let max = MAX_PBKDF2_ITERATIONS as i32;
if ups.iterations < min || ups.iterations > max {
return Err(KrafkaError::config(format!(
"SCRAM iteration count {} for user '{}' is outside the accepted range [{}, {}]",
ups.iterations, ups.name, min, max,
)));
}
}
encode_compact_array_len(self.deletions.len(), buf)?;
for del in &self.deletions {
KafkaString::new(&del.name).try_encode_compact(buf)?;
buf.put_i8(del.mechanism.to_wire_byte());
TaggedFields::default().try_encode(buf)?;
}
encode_compact_array_len(self.upsertions.len(), buf)?;
for ups in &self.upsertions {
KafkaString::new(&ups.name).try_encode_compact(buf)?;
buf.put_i8(ups.mechanism.to_wire_byte());
buf.put_i32(ups.iterations);
Self::encode_compact_bytes(&ups.salt, buf)?;
Self::encode_compact_bytes(&ups.salted_password, buf)?;
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
fn encode_compact_bytes(data: &[u8], buf: &mut impl BufMut) -> Result<()> {
let wire =
u32::try_from(data.len().checked_add(1).ok_or_else(|| {
crate::error::KrafkaError::protocol("compact bytes length overflow")
})?)
.map_err(|_| crate::error::KrafkaError::protocol("compact bytes length exceeds u32"))?;
crate::util::varint::encode_unsigned_varint(wire, buf);
buf.put_slice(data);
Ok(())
}
}
/// Version dispatch for encoding: only version 0 is implemented.
impl VersionedEncode for AlterUserScramCredentialsRequest {
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        if version == 0 {
            return self.encode_v0(buf);
        }
        // Every other version, negative ones included, is handed to the
        // crate's unsupported-version macro.
        unsupported_encode!("AlterUserScramCredentialsRequest", version)
    }
}
/// Per-user outcome carried in an `AlterUserScramCredentialsResponse`.
#[derive(Debug, Clone)]
pub struct AlterUserScramCredentialsResultEntry {
/// User this result refers to; decoded from a compact string that is
/// passed through `non_nullable_string`.
pub user: String,
/// Result code for this user, built from the wire `i16` via
/// `ErrorCode::from_i16`.
pub error_code: ErrorCode,
/// Optional error text; `None` when the wire string is null.
pub error_message: Option<String>,
}
/// Decoded body of an `AlterUserScramCredentials` response.
#[derive(Debug, Clone)]
pub struct AlterUserScramCredentialsResponse {
/// First field of the response body, read as an `i32`; presumably the
/// broker-imposed throttle time in milliseconds (TODO confirm).
pub throttle_time_ms: i32,
/// One entry per element of the response's result array, in wire order.
pub results: Vec<AlterUserScramCredentialsResultEntry>,
}
impl AlterUserScramCredentialsResponse {
pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let result_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut results = Vec::with_capacity(result_count);
for _ in 0..result_count {
let user = non_nullable_string("user", KafkaString::decode_compact(buf)?.0)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let _ = TaggedFields::decode(buf)?;
results.push(AlterUserScramCredentialsResultEntry {
user,
error_code,
error_message,
});
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
results,
})
}
}
/// Version dispatch for decoding: only version 0 is implemented.
impl VersionedDecode for AlterUserScramCredentialsResponse {
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        if version == 0 {
            return Self::decode_v0(buf);
        }
        // Every other version, negative ones included, is handed to the
        // crate's unsupported-version macro.
        unsupported_decode!("AlterUserScramCredentialsResponse", version)
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::auth::scram::{MIN_PBKDF2_ITERATIONS, ScramMechanism};
    use bytes::BytesMut;

    // ---- wire-format helpers used to hand-build response bodies ----

    /// Appends `value` as an unsigned varint.
    fn put_uvarint(buf: &mut BytesMut, value: u32) {
        crate::util::varint::encode_unsigned_varint(value, buf);
    }

    /// Appends a non-null compact string: varint(len + 1) then the bytes.
    fn put_compact_string(buf: &mut BytesMut, s: &str) {
        put_uvarint(buf, (s.len() + 1) as u32);
        buf.put_slice(s.as_bytes());
    }

    /// Appends a null compact string (a lone zero varint).
    fn put_compact_null_string(buf: &mut BytesMut) {
        put_uvarint(buf, 0);
    }

    /// Appends an empty tagged-field section (a lone zero varint).
    fn put_empty_tagged_fields(buf: &mut BytesMut) {
        put_uvarint(buf, 0);
    }

    /// Appends a compact array length: varint(count + 1).
    fn put_compact_array_len(buf: &mut BytesMut, count: usize) {
        put_uvarint(buf, (count + 1) as u32);
    }

    /// Appends one response result entry followed by empty tagged fields.
    fn put_result_entry(buf: &mut BytesMut, user: &str, code: i16, message: Option<&str>) {
        put_compact_string(buf, user);
        buf.put_i16(code);
        match message {
            Some(text) => put_compact_string(buf, text),
            None => put_compact_null_string(buf),
        }
        put_empty_tagged_fields(buf);
    }

    // ---- fixture builders ----

    /// Builds an upsertion entry from plain byte slices.
    fn upsertion(
        name: &str,
        mechanism: ScramMechanism,
        iterations: i32,
        salt: &[u8],
        salted_password: &[u8],
    ) -> ScramCredentialUpsertion {
        ScramCredentialUpsertion {
            name: name.to_string(),
            mechanism,
            iterations,
            salt: Zeroizing::new(salt.to_vec()),
            salted_password: Zeroizing::new(salted_password.to_vec()),
        }
    }

    /// Builds a request that carries a single upsertion and no deletions.
    fn upsert_only(entry: ScramCredentialUpsertion) -> AlterUserScramCredentialsRequest {
        AlterUserScramCredentialsRequest {
            deletions: Vec::new(),
            upsertions: vec![entry],
        }
    }

    #[test]
    fn test_alter_scram_request_api_key() {
        let key = AlterUserScramCredentialsRequest::api_key();
        assert_eq!(key, ApiKey::AlterUserScramCredentials);
    }

    #[test]
    fn test_alter_scram_request_encode_v0() {
        let deletion = ScramCredentialDeletion {
            name: "bob".to_string(),
            mechanism: ScramMechanism::Sha256,
        };
        let request = AlterUserScramCredentialsRequest {
            deletions: vec![deletion],
            upsertions: vec![upsertion(
                "alice",
                ScramMechanism::Sha256,
                8192,
                b"random-salt",
                b"salted-pw",
            )],
        };

        let mut encoded = BytesMut::new();
        request.encode_v0(&mut encoded).unwrap();
        assert!(!encoded.is_empty());
    }

    #[test]
    fn test_alter_scram_versioned_unsupported() {
        let request = AlterUserScramCredentialsRequest {
            deletions: Vec::new(),
            upsertions: Vec::new(),
        };

        // The same sink is reused for both attempts.
        let mut sink = BytesMut::new();
        for &version in &[-1i16, 1] {
            assert!(request.encode_versioned(version, &mut sink).is_err());
        }
    }

    #[test]
    fn test_alter_scram_response_decode_v0() {
        let mut wire = BytesMut::new();
        wire.put_i32(0);
        put_compact_array_len(&mut wire, 2);
        put_result_entry(&mut wire, "alice", 0, None);
        put_result_entry(&mut wire, "bob", 69, Some("Not found"));
        put_empty_tagged_fields(&mut wire);

        let decoded = AlterUserScramCredentialsResponse::decode_v0(&mut wire.freeze()).unwrap();
        assert_eq!(decoded.results.len(), 2);

        let first = &decoded.results[0];
        assert_eq!(first.user, "alice");
        assert!(first.error_code.is_ok());

        let second = &decoded.results[1];
        assert_eq!(second.user, "bob");
        assert!(!second.error_code.is_ok());
        assert_eq!(second.error_message.as_deref(), Some("Not found"));
    }

    #[test]
    fn test_alter_scram_versioned_decode_unsupported() {
        for &version in &[-1i16, 1] {
            // A fresh, empty input for every attempt.
            let mut empty = BytesMut::new().freeze();
            assert!(
                AlterUserScramCredentialsResponse::decode_versioned(version, &mut empty).is_err()
            );
        }
    }

    #[test]
    fn test_upsertion_debug_redacts_secrets() {
        let entry = upsertion(
            "alice",
            ScramMechanism::Sha256,
            8192,
            b"super-secret-salt",
            b"super-secret-pw",
        );

        let rendered = format!("{:?}", entry);
        assert!(rendered.contains("alice"));
        assert!(rendered.contains("[REDACTED]"));
        assert!(!rendered.contains("super-secret"));
    }

    #[test]
    fn test_encode_v0_rejects_low_iterations() {
        let request = upsert_only(upsertion(
            "alice",
            ScramMechanism::Sha256,
            100,
            &[1, 2, 3],
            &[4, 5, 6],
        ));

        let mut sink = BytesMut::new();
        let failure = request.encode_v0(&mut sink).unwrap_err();
        assert!(failure.to_string().contains("outside the accepted range"));
    }

    #[test]
    fn test_encode_v0_rejects_high_iterations() {
        let request = upsert_only(upsertion(
            "bob",
            ScramMechanism::Sha512,
            2_000_000,
            &[1],
            &[2],
        ));

        let mut sink = BytesMut::new();
        assert!(request.encode_v0(&mut sink).is_err());
    }

    #[test]
    fn test_encode_v0_accepts_boundary_iterations() {
        let request = upsert_only(upsertion(
            "carol",
            ScramMechanism::Sha256,
            MIN_PBKDF2_ITERATIONS as i32,
            &[1],
            &[2],
        ));

        let mut sink = BytesMut::new();
        assert!(request.encode_v0(&mut sink).is_ok());
    }
}