// krafka 0.11.0
//
// A pure Rust, async-native Apache Kafka client.
// Documentation
use std::fmt;

use bytes::{Buf, BufMut};
use zeroize::Zeroizing;

use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::auth::scram::{MAX_PBKDF2_ITERATIONS, MIN_PBKDF2_ITERATIONS, ScramMechanism};
use crate::error::{ErrorCode, KrafkaError, ProtocolErrorKind, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{Decode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{check_compact_array_len, encode_compact_array_len};

// ============================================================================
// AlterUserScramCredentials API (Key 51)
//
// v0 baseline. All versions use flexible encoding.
// ============================================================================

/// A SCRAM credential deletion.
///
/// Identifies one credential to remove by user name and mechanism. Each value
/// becomes one entry of the request's deletions array: the name is written as
/// a compact string, followed by the mechanism's wire byte.
#[derive(Debug, Clone)]
pub struct ScramCredentialDeletion {
    /// Name of the user whose credential is deleted.
    pub name: String,
    /// SCRAM mechanism of the credential to delete.
    pub mechanism: ScramMechanism,
}

/// A SCRAM credential upsert (create or update).
///
/// Secret fields (`salt`, `salted_password`) are stored in zeroizing
/// containers and redacted in `Debug` output.
///
/// `Debug` is implemented by hand rather than derived so that the secret
/// byte fields never reach formatted output.
#[derive(Clone)]
pub struct ScramCredentialUpsertion {
    /// Name of the user whose credential is created or updated.
    pub name: String,
    /// SCRAM mechanism the credential applies to.
    pub mechanism: ScramMechanism,
    /// Iteration count (must be in [`MIN_PBKDF2_ITERATIONS`]..=[`MAX_PBKDF2_ITERATIONS`]).
    ///
    /// The range is enforced when the request is encoded, not when this
    /// struct is constructed.
    pub iterations: i32,
    /// Random salt generated by the client; sent as compact bytes.
    pub salt: Zeroizing<Vec<u8>>,
    /// The salted password; sent as compact bytes.
    pub salted_password: Zeroizing<Vec<u8>>,
}

impl fmt::Debug for ScramCredentialUpsertion {
    /// Formats the upsertion for diagnostics without exposing secrets.
    ///
    /// `name`, `mechanism` and `iterations` are printed as-is; `salt` and
    /// `salted_password` are replaced by a fixed placeholder.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Placeholder shown instead of the secret byte fields.
        const REDACTED: &str = "[REDACTED]";

        let mut out = f.debug_struct("ScramCredentialUpsertion");
        out.field("name", &self.name);
        out.field("mechanism", &self.mechanism);
        out.field("iterations", &self.iterations);
        out.field("salt", &REDACTED);
        out.field("salted_password", &REDACTED);
        out.finish()
    }
}

/// AlterUserScramCredentials request.
///
/// Carries two independent lists; on the wire the deletions array is
/// written first, followed by the upsertions array.
#[derive(Debug, Clone)]
pub struct AlterUserScramCredentialsRequest {
    /// Credentials to delete.
    pub deletions: Vec<ScramCredentialDeletion>,
    /// Credentials to upsert (create/update).
    pub upsertions: Vec<ScramCredentialUpsertion>,
}

impl AlterUserScramCredentialsRequest {
    /// Returns the [`ApiKey`] that identifies this request type.
    pub fn api_key() -> ApiKey {
        ApiKey::AlterUserScramCredentials
    }

    /// Serializes the request body using the version 0 (flexible) layout.
    ///
    /// All upsertion iteration counts are checked up front, so an
    /// out-of-range value is reported before any byte is written to `buf`.
    ///
    /// # Errors
    ///
    /// Returns an error built with `KrafkaError::config` when an upsertion's
    /// `iterations` lies outside
    /// [`MIN_PBKDF2_ITERATIONS`]..=[`MAX_PBKDF2_ITERATIONS`], and forwards any
    /// error raised while encoding an array length, string, byte field or
    /// tagged-field section.
    pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
        let lowest = MIN_PBKDF2_ITERATIONS as i32;
        let highest = MAX_PBKDF2_ITERATIONS as i32;

        // Report the first upsertion (in list order) whose count is out of range.
        let offender = self
            .upsertions
            .iter()
            .find(|entry| !(lowest..=highest).contains(&entry.iterations));
        if let Some(entry) = offender {
            return Err(KrafkaError::config(format!(
                "SCRAM iteration count {} for user '{}' is outside the accepted range [{}, {}]",
                entry.iterations, entry.name, lowest, highest,
            )));
        }

        // Deletions array: name, mechanism byte, then the entry's tagged fields.
        encode_compact_array_len(self.deletions.len(), buf)?;
        for removal in &self.deletions {
            let user = KafkaString::new(&removal.name);
            user.try_encode_compact(buf)?;
            buf.put_i8(removal.mechanism.to_wire_byte());
            TaggedFields::default().try_encode(buf)?;
        }

        // Upsertions array: name, mechanism byte, iteration count, salt,
        // salted password, then the entry's tagged fields.
        encode_compact_array_len(self.upsertions.len(), buf)?;
        for entry in &self.upsertions {
            let user = KafkaString::new(&entry.name);
            user.try_encode_compact(buf)?;
            buf.put_i8(entry.mechanism.to_wire_byte());
            buf.put_i32(entry.iterations);
            Self::encode_compact_bytes(&entry.salt, buf)?;
            Self::encode_compact_bytes(&entry.salted_password, buf)?;
            TaggedFields::default().try_encode(buf)?;
        }

        // Request-level tagged fields close the body.
        TaggedFields::default().try_encode(buf)?;
        Ok(())
    }

    /// Writes `data` as compact bytes: an unsigned varint holding `len + 1`,
    /// followed by the raw bytes.
    ///
    /// # Errors
    ///
    /// Returns a protocol error of kind `InvalidLength` when `len + 1`
    /// overflows `usize` or does not fit in a `u32`.
    fn encode_compact_bytes(data: &[u8], buf: &mut impl BufMut) -> Result<()> {
        let len_plus_one = data.len().checked_add(1).ok_or_else(|| {
            KrafkaError::protocol_kind(
                ProtocolErrorKind::InvalidLength,
                "compact bytes length overflow",
            )
        })?;
        let wire_len = u32::try_from(len_plus_one).map_err(|_| {
            KrafkaError::protocol_kind(
                ProtocolErrorKind::InvalidLength,
                "compact bytes length exceeds u32",
            )
        })?;

        crate::util::varint::encode_unsigned_varint(wire_len, buf);
        buf.put_slice(data);
        Ok(())
    }
}

impl VersionedEncode for AlterUserScramCredentialsRequest {
    /// Encodes the request for the given API `version`.
    ///
    /// Only version 0 is implemented; every other version yields whatever
    /// `unsupported_encode!` produces for this request name and version.
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        if version != 0 {
            return unsupported_encode!("AlterUserScramCredentialsRequest", version);
        }
        self.encode_v0(buf)
    }
}

// ── Response ─────────────────────────────────────────────────────────────

/// Per-user result from an AlterUserScramCredentials request.
///
/// One entry is decoded for each element of the response's results array.
#[derive(Debug, Clone)]
pub struct AlterUserScramCredentialsResultEntry {
    /// User name; decoding rejects a null value for this field.
    pub user: String,
    /// Error code, converted from the 16-bit value read off the wire.
    pub error_code: ErrorCode,
    /// Error message, or `None` if no error.
    pub error_message: Option<String>,
}

/// AlterUserScramCredentials response.
///
/// Holds the throttle time followed by one result per entry of the
/// response's results array, in wire order.
#[derive(Debug, Clone)]
pub struct AlterUserScramCredentialsResponse {
    /// Throttle time in milliseconds.
    pub throttle_time_ms: i32,
    /// Per-user results.
    pub results: Vec<AlterUserScramCredentialsResultEntry>,
}

impl AlterUserScramCredentialsResponse {
    /// Parses a version 0 (flexible encoding) response body from `buf`.
    ///
    /// Fields are read in wire order: the throttle time, then a compact array
    /// of per-user results (user name, error code, nullable error message and
    /// the entry's tagged fields), then the response-level tagged fields.
    /// Tagged fields are decoded and discarded.
    ///
    /// # Errors
    ///
    /// Propagates any error from the primitive decoders, from
    /// `check_compact_array_len`, or from the `non_nullable_string` check on
    /// the user name.
    pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
        let throttle_time_ms = i32::decode(buf)?;

        let raw_len = crate::util::varint::decode_unsigned_varint(buf)?;
        let expected = check_compact_array_len(raw_len)?;

        let mut results = Vec::with_capacity(expected);
        while results.len() < expected {
            // Field initializers run top to bottom, matching the wire order.
            let entry = AlterUserScramCredentialsResultEntry {
                user: non_nullable_string("user", KafkaString::decode_compact(buf)?.0)?,
                error_code: ErrorCode::from_i16(i16::decode(buf)?),
                error_message: KafkaString::decode_compact(buf)?.0,
            };
            // Consume the entry's tagged fields before storing the entry.
            let _ = TaggedFields::decode(buf)?;
            results.push(entry);
        }

        // Consume the response-level tagged fields.
        let _ = TaggedFields::decode(buf)?;

        Ok(Self {
            throttle_time_ms,
            results,
        })
    }
}

impl VersionedDecode for AlterUserScramCredentialsResponse {
    /// Decodes the response for the given API `version`.
    ///
    /// Only version 0 is implemented; every other version yields whatever
    /// `unsupported_decode!` produces for this response name and version.
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        if version != 0 {
            return unsupported_decode!("AlterUserScramCredentialsResponse", version);
        }
        Self::decode_v0(buf)
    }
}

/// Unit tests for the AlterUserScramCredentials request encoder and response
/// decoder.
///
/// Expected wire images are assembled with the small `put_*` helpers below so
/// that the assertions pin the exact byte layout rather than just "something
/// was written".
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    use bytes::BytesMut;

    use crate::auth::scram::{MAX_PBKDF2_ITERATIONS, MIN_PBKDF2_ITERATIONS, ScramMechanism};

    /// Build compact bytes: varint(len+1) followed by raw bytes.
    fn put_compact_bytes(buf: &mut BytesMut, data: &[u8]) {
        crate::util::varint::encode_unsigned_varint((data.len() + 1) as u32, buf);
        buf.put_slice(data);
    }

    /// Build a compact (flexible) string: varint(len+1) followed by raw bytes.
    fn put_compact_string(buf: &mut BytesMut, s: &str) {
        put_compact_bytes(buf, s.as_bytes());
    }

    /// Build a null compact string: a single zero varint.
    fn put_compact_null_string(buf: &mut BytesMut) {
        crate::util::varint::encode_unsigned_varint(0, buf);
    }

    /// Build an empty tagged-field section: a single zero varint.
    fn put_empty_tagged_fields(buf: &mut BytesMut) {
        crate::util::varint::encode_unsigned_varint(0, buf);
    }

    /// Build a compact array length: varint(count+1).
    fn put_compact_array_len(buf: &mut BytesMut, count: usize) {
        crate::util::varint::encode_unsigned_varint((count + 1) as u32, buf);
    }

    /// Build a request holding a single upsertion with the given iteration count.
    fn request_with_iterations(iterations: i32) -> AlterUserScramCredentialsRequest {
        AlterUserScramCredentialsRequest {
            deletions: Vec::new(),
            upsertions: vec![ScramCredentialUpsertion {
                name: "carol".to_string(),
                mechanism: ScramMechanism::Sha256,
                iterations,
                salt: Zeroizing::new(vec![1]),
                salted_password: Zeroizing::new(vec![2]),
            }],
        }
    }

    #[test]
    fn test_alter_scram_request_api_key() {
        assert_eq!(
            AlterUserScramCredentialsRequest::api_key(),
            ApiKey::AlterUserScramCredentials
        );
    }

    #[test]
    fn test_alter_scram_request_encode_v0() {
        let request = AlterUserScramCredentialsRequest {
            deletions: vec![ScramCredentialDeletion {
                name: "bob".to_string(),
                mechanism: ScramMechanism::Sha256,
            }],
            upsertions: vec![ScramCredentialUpsertion {
                name: "alice".to_string(),
                mechanism: ScramMechanism::Sha256,
                iterations: 8192,
                salt: Zeroizing::new(b"random-salt".to_vec()),
                salted_password: Zeroizing::new(b"salted-pw".to_vec()),
            }],
        };
        let mut buf = BytesMut::new();
        request.encode_v0(&mut buf).unwrap();

        // Expected wire image, field by field.
        let mut expected = BytesMut::new();
        // deletions: 1 entry
        put_compact_array_len(&mut expected, 1);
        put_compact_string(&mut expected, "bob");
        expected.put_i8(ScramMechanism::Sha256.to_wire_byte());
        put_empty_tagged_fields(&mut expected); // deletion tagged fields
        // upsertions: 1 entry
        put_compact_array_len(&mut expected, 1);
        put_compact_string(&mut expected, "alice");
        expected.put_i8(ScramMechanism::Sha256.to_wire_byte());
        expected.put_i32(8192); // iterations
        put_compact_bytes(&mut expected, b"random-salt");
        put_compact_bytes(&mut expected, b"salted-pw");
        put_empty_tagged_fields(&mut expected); // upsertion tagged fields
        put_empty_tagged_fields(&mut expected); // top-level tagged fields

        assert_eq!(&buf[..], &expected[..]);
    }

    #[test]
    fn test_alter_scram_versioned_encode_v0_matches_encode_v0() {
        let request = request_with_iterations(MIN_PBKDF2_ITERATIONS as i32);

        let mut direct = BytesMut::new();
        request.encode_v0(&mut direct).unwrap();
        let mut versioned = BytesMut::new();
        request.encode_versioned(0, &mut versioned).unwrap();

        assert_eq!(&versioned[..], &direct[..]);
    }

    #[test]
    fn test_alter_scram_versioned_unsupported() {
        let request = AlterUserScramCredentialsRequest {
            deletions: Vec::new(),
            upsertions: Vec::new(),
        };
        let mut buf = BytesMut::new();
        assert!(request.encode_versioned(-1, &mut buf).is_err());
        assert!(request.encode_versioned(1, &mut buf).is_err());
    }

    #[test]
    fn test_alter_scram_response_decode_v0() {
        let mut buf = BytesMut::new();
        buf.put_i32(25); // throttle_time_ms
        // results: 2 users
        put_compact_array_len(&mut buf, 2);
        // user 1: success
        put_compact_string(&mut buf, "alice");
        buf.put_i16(0); // error_code
        put_compact_null_string(&mut buf); // error_message
        put_empty_tagged_fields(&mut buf); // user tagged fields
        // user 2: error
        put_compact_string(&mut buf, "bob");
        buf.put_i16(69); // error_code
        put_compact_string(&mut buf, "Not found");
        put_empty_tagged_fields(&mut buf); // user tagged fields
        put_empty_tagged_fields(&mut buf); // top-level tagged fields

        let resp = AlterUserScramCredentialsResponse::decode_v0(&mut buf.freeze()).unwrap();
        assert_eq!(resp.throttle_time_ms, 25);
        assert_eq!(resp.results.len(), 2);
        assert_eq!(resp.results[0].user, "alice");
        assert!(resp.results[0].error_code.is_ok());
        assert!(resp.results[0].error_message.is_none());
        assert_eq!(resp.results[1].user, "bob");
        assert!(!resp.results[1].error_code.is_ok());
        assert_eq!(resp.results[1].error_message.as_deref(), Some("Not found"));
    }

    #[test]
    fn test_alter_scram_versioned_decode_unsupported() {
        let buf = BytesMut::new();
        assert!(
            AlterUserScramCredentialsResponse::decode_versioned(-1, &mut buf.clone().freeze())
                .is_err()
        );
        assert!(AlterUserScramCredentialsResponse::decode_versioned(1, &mut buf.freeze()).is_err());
    }

    #[test]
    fn test_upsertion_debug_redacts_secrets() {
        let ups = ScramCredentialUpsertion {
            name: "alice".to_string(),
            mechanism: ScramMechanism::Sha256,
            iterations: 8192,
            salt: Zeroizing::new(b"super-secret-salt".to_vec()),
            salted_password: Zeroizing::new(b"super-secret-pw".to_vec()),
        };
        let debug = format!("{:?}", ups);
        assert!(debug.contains("alice"));
        assert!(debug.contains("[REDACTED]"));
        assert!(!debug.contains("super-secret"));
    }

    #[test]
    fn test_encode_v0_rejects_low_iterations() {
        let request = AlterUserScramCredentialsRequest {
            deletions: Vec::new(),
            upsertions: vec![ScramCredentialUpsertion {
                name: "alice".to_string(),
                mechanism: ScramMechanism::Sha256,
                iterations: 100, // below minimum 4096
                salt: Zeroizing::new(vec![1, 2, 3]),
                salted_password: Zeroizing::new(vec![4, 5, 6]),
            }],
        };
        let mut buf = BytesMut::new();
        let err = request.encode_v0(&mut buf).unwrap_err();
        assert!(err.to_string().contains("outside the accepted range"));
        // Validation runs before any field is written.
        assert!(buf.is_empty());
    }

    #[test]
    fn test_encode_v0_rejects_high_iterations() {
        let request = AlterUserScramCredentialsRequest {
            deletions: Vec::new(),
            upsertions: vec![ScramCredentialUpsertion {
                name: "bob".to_string(),
                mechanism: ScramMechanism::Sha512,
                iterations: 2_000_000, // above maximum 1_000_000
                salt: Zeroizing::new(vec![1]),
                salted_password: Zeroizing::new(vec![2]),
            }],
        };
        let mut buf = BytesMut::new();
        assert!(request.encode_v0(&mut buf).is_err());
        // Validation runs before any field is written.
        assert!(buf.is_empty());
    }

    #[test]
    fn test_encode_v0_rejects_invalid_upsertion_after_valid_one() {
        // The second upsertion is invalid; nothing from the first may leak
        // into the buffer because all counts are validated up front.
        let mut request = request_with_iterations(MIN_PBKDF2_ITERATIONS as i32);
        request.upsertions.push(ScramCredentialUpsertion {
            name: "dave".to_string(),
            mechanism: ScramMechanism::Sha512,
            iterations: 100,
            salt: Zeroizing::new(vec![1]),
            salted_password: Zeroizing::new(vec![2]),
        });
        let mut buf = BytesMut::new();
        let err = request.encode_v0(&mut buf).unwrap_err();
        assert!(err.to_string().contains("dave"));
        assert!(buf.is_empty());
    }

    #[test]
    fn test_encode_v0_accepts_boundary_iterations() {
        // Both ends of the accepted range are inclusive.
        for iterations in [
            MIN_PBKDF2_ITERATIONS as i32,
            MAX_PBKDF2_ITERATIONS as i32,
        ] {
            let request = request_with_iterations(iterations);
            let mut buf = BytesMut::new();
            assert!(request.encode_v0(&mut buf).is_ok());
            assert!(!buf.is_empty());
        }
    }

    #[test]
    fn test_encode_v0_rejects_just_outside_boundaries() {
        for iterations in [
            MIN_PBKDF2_ITERATIONS as i32 - 1,
            MAX_PBKDF2_ITERATIONS as i32 + 1,
        ] {
            let request = request_with_iterations(iterations);
            let mut buf = BytesMut::new();
            assert!(request.encode_v0(&mut buf).is_err());
            assert!(buf.is_empty());
        }
    }
}